refactor(backend): update expense claims service and tests
- services/expense_claims.py: update expense claims service - tests/test_expense_claim_service.py: update expense claim service tests
This commit is contained in:
@@ -535,7 +535,6 @@ class ExpenseClaimService:
|
||||
if is_new_claim:
|
||||
existing_draft_count = self._count_draft_claims_for_owner(
|
||||
employee=employee,
|
||||
employee_name=draft_owner_name,
|
||||
user_id=user_id,
|
||||
)
|
||||
if existing_draft_count >= MAX_DRAFT_CLAIMS_PER_USER:
|
||||
@@ -733,12 +732,10 @@ class ExpenseClaimService:
|
||||
self,
|
||||
*,
|
||||
employee: Employee | None,
|
||||
employee_name: str,
|
||||
user_id: str | None,
|
||||
) -> int:
|
||||
owner_filters = self._build_draft_owner_filters(
|
||||
employee=employee,
|
||||
employee_name=employee_name,
|
||||
user_id=user_id,
|
||||
)
|
||||
if not owner_filters:
|
||||
@@ -752,11 +749,10 @@ class ExpenseClaimService:
|
||||
)
|
||||
return int(self.db.scalar(stmt) or 0)
|
||||
|
||||
@staticmethod
|
||||
def _build_draft_owner_filters(
|
||||
self,
|
||||
*,
|
||||
employee: Employee | None,
|
||||
employee_name: str,
|
||||
user_id: str | None,
|
||||
) -> list[Any]:
|
||||
conditions: list[Any] = []
|
||||
@@ -779,10 +775,10 @@ class ExpenseClaimService:
|
||||
|
||||
if employee is not None:
|
||||
add_condition("employee_id", employee.id)
|
||||
add_condition("employee_name", employee.name)
|
||||
add_condition("employee_name", employee.email)
|
||||
if self._employee_name_is_unique(employee):
|
||||
add_condition("employee_name", employee.name)
|
||||
|
||||
add_condition("employee_name", employee_name)
|
||||
add_condition("employee_name", user_id)
|
||||
return conditions
|
||||
|
||||
@@ -1607,7 +1603,25 @@ class ExpenseClaimService:
|
||||
|
||||
@staticmethod
|
||||
def _has_privileged_claim_access(current_user: CurrentUserContext) -> bool:
|
||||
return bool(set(current_user.role_codes) & PRIVILEGED_CLAIM_ROLE_CODES)
|
||||
role_codes = {
|
||||
str(item).strip().lower()
|
||||
for item in current_user.role_codes
|
||||
if str(item).strip()
|
||||
}
|
||||
return bool(role_codes & PRIVILEGED_CLAIM_ROLE_CODES)
|
||||
|
||||
def _employee_name_is_unique(self, employee: Employee) -> bool:
|
||||
normalized_name = str(employee.name or "").strip()
|
||||
if not normalized_name:
|
||||
return False
|
||||
|
||||
same_name_count = int(
|
||||
self.db.scalar(
|
||||
select(func.count()).select_from(Employee).where(Employee.name == normalized_name)
|
||||
)
|
||||
or 0
|
||||
)
|
||||
return same_name_count == 1
|
||||
|
||||
def _apply_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any:
|
||||
if self._has_privileged_claim_access(current_user):
|
||||
@@ -1635,8 +1649,9 @@ class ExpenseClaimService:
|
||||
|
||||
if employee is not None:
|
||||
add_condition("employee_id", employee.id)
|
||||
add_condition("employee_name", employee.name)
|
||||
add_condition("employee_name", employee.email)
|
||||
if self._employee_name_is_unique(employee):
|
||||
add_condition("employee_name", employee.name)
|
||||
else:
|
||||
add_condition("employee_id", username)
|
||||
add_condition("employee_name", username)
|
||||
|
||||
Reference in New Issue
Block a user