from __future__ import annotations import re from typing import Any from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session, selectinload from app.api.deps import CurrentUserContext from app.models.employee import Employee from app.models.financial_record import ExpenseClaim from app.models.organization import OrganizationUnit from app.models.role import Role from app.services.expense_claim_workflow_constants import ( APPROVAL_DONE_STAGE, ARCHIVE_ACCOUNTING_STAGE, BUDGET_MANAGER_APPROVAL_STAGE, DIRECT_MANAGER_APPROVAL_STAGE, FINANCE_APPROVAL_STAGE, PAYMENT_PAID_STAGE, PAYMENT_PENDING_STATUS, ) PRIVILEGED_CLAIM_ROLE_CODES = {"finance", "executive"} ARCHIVE_CENTER_ROLE_CODES = {"finance", "executive"} APPROVAL_VISIBLE_CLAIM_ROLE_CODES = {"manager", "approver"} BUDGET_APPROVAL_ROLE_CODES = {"budget_monitor", "executive"} BUDGET_MONITOR_ROLE_CODE = "budget_monitor" BUDGET_MONITOR_APPROVAL_GRADE = "P8" CLAIM_DELETE_ROLE_CODES = {"executive"} ARCHIVED_CLAIM_STATUSES = ("approved", "completed", "paid") APPLICATION_ARCHIVED_STAGES = (APPROVAL_DONE_STAGE, "申请归档", "completed") ARCHIVED_REIMBURSEMENT_STAGES = (ARCHIVE_ACCOUNTING_STAGE, PAYMENT_PAID_STAGE, "completed") class ExpenseClaimAccessPolicy: def __init__(self, db: Session) -> None: self.db = db @staticmethod def has_privileged_claim_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & PRIVILEGED_CLAIM_ROLE_CODES) @staticmethod def has_archive_center_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & ARCHIVE_CENTER_ROLE_CODES) @staticmethod def build_archived_claim_condition() -> Any: normalized_status = func.lower(func.coalesce(ExpenseClaim.status, "")) stage = func.coalesce(ExpenseClaim.approval_stage, "") normalized_type = func.lower(func.coalesce(ExpenseClaim.expense_type, "")) claim_no = func.upper(func.coalesce(ExpenseClaim.claim_no, "")) application_condition = or_( claim_no.like("AP-%"), claim_no.like("APP-%"), normalized_type == "application", normalized_type.like("%\\_application", escape="\\"), ) return or_( stage.in_(ARCHIVED_REIMBURSEMENT_STAGES), stage == "completed", and_( application_condition, normalized_status.in_(ARCHIVED_CLAIM_STATUSES), stage.in_(APPLICATION_ARCHIVED_STAGES), ), and_( normalized_status.in_(ARCHIVED_CLAIM_STATUSES), or_( stage == "", stage.is_(None), stage.in_(ARCHIVED_REIMBURSEMENT_STAGES), stage == "completed", ), ), ) @staticmethod def has_claim_delete_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & CLAIM_DELETE_ROLE_CODES) @staticmethod def is_archived_claim(claim: ExpenseClaim) -> bool: normalized_status = str(claim.status or "").strip().lower() stage = str(claim.approval_stage or "").strip() if stage in set(ARCHIVED_REIMBURSEMENT_STAGES): return True normalized_type = str(claim.expense_type or "").strip().lower() claim_no = str(claim.claim_no or "").strip().upper() is_application_claim = ( claim_no.startswith(("AP-", "APP-")) or normalized_type == "application" or normalized_type.endswith("_application") ) if ( is_application_claim and normalized_status in ARCHIVED_CLAIM_STATUSES and stage in APPLICATION_ARCHIVED_STAGES ): return True return normalized_status in ARCHIVED_CLAIM_STATUSES and stage in {"", *ARCHIVED_REIMBURSEMENT_STAGES} def can_return_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: normalized_status = str(claim.status or "").strip().lower() if normalized_status != "submitted": return False stage = str(claim.approval_stage or "").strip() if stage == DIRECT_MANAGER_APPROVAL_STAGE: return self.is_current_direct_manager_approver(current_user, claim) if stage == BUDGET_MANAGER_APPROVAL_STAGE: return self.is_budget_manager_approver(current_user, claim) if stage == FINANCE_APPROVAL_STAGE: return self.has_privileged_claim_access(current_user) and not self.is_claim_owned_by_current_user( claim, current_user, ) return False def can_approve_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: stage = str(claim.approval_stage or "").strip() if stage == DIRECT_MANAGER_APPROVAL_STAGE: return self.is_current_direct_manager_approver(current_user, claim) if stage == BUDGET_MANAGER_APPROVAL_STAGE: return self.is_budget_manager_approver(current_user, claim) if stage == FINANCE_APPROVAL_STAGE: role_codes = self.normalize_role_codes(current_user) return ( (current_user.is_admin or "finance" in role_codes) and not self.is_claim_owned_by_current_user(claim, current_user) ) return False def can_mark_claim_paid(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: if str(claim.status or "").strip().lower() != PAYMENT_PENDING_STATUS: return False if self.is_claim_owned_by_current_user(claim, current_user): return False if current_user.is_admin: return True return bool(self.normalize_role_codes(current_user) & PRIVILEGED_CLAIM_ROLE_CODES) def is_current_direct_manager_approver(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: role_codes = self.normalize_role_codes(current_user) if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES): return False if str(claim.status or "").strip().lower() != "submitted": return False if str(claim.approval_stage or "").strip() != DIRECT_MANAGER_APPROVAL_STAGE: return False current_employee = self.resolve_current_employee(current_user) if current_employee is not None and str(claim.employee_id or "").strip() == current_employee.id: return False claim_employee = claim.employee if current_employee is not None and claim_employee is not None: if claim_employee.manager_id == current_employee.id: return True if claim_employee.manager is not None and claim_employee.manager.id == current_employee.id: return True approver_name = str( current_employee.name if current_employee is not None and current_employee.name else current_user.name or "" ).strip() if not approver_name: return False return self.resolve_claim_manager_name(claim) == approver_name def is_budget_manager_approver(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: if str(claim.status or "").strip().lower() != "submitted": return False if str(claim.approval_stage or "").strip() != BUDGET_MANAGER_APPROVAL_STAGE: return False if self.is_claim_owned_by_current_user(claim, current_user): return False if current_user.is_admin: return True return self.is_department_budget_approver(current_user, claim) def is_budget_manager_user(self, current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True role_codes = self.normalize_role_codes(current_user) return bool(role_codes & BUDGET_APPROVAL_ROLE_CODES) def is_department_p8_budget_monitor(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: return self.is_department_budget_approver(current_user, claim) def is_department_budget_approver(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: role_codes = self.normalize_role_codes(current_user) current_employee = self.resolve_current_employee(current_user) if current_employee is None: return False role_codes |= self._collect_employee_role_codes(current_employee) if not role_codes & BUDGET_APPROVAL_ROLE_CODES: return False if not self._employee_has_budget_approval_grade(current_employee): return False return self._employee_matches_claim_department(current_employee, current_user, claim) def resolve_department_budget_manager(self, claim: ExpenseClaim) -> Employee | None: department_ids, department_names = self._collect_claim_department_identity(claim) department_conditions = [] if department_ids: department_conditions.append(Employee.organization_unit_id.in_(department_ids)) if department_names: department_conditions.append(Employee.organization_unit.has(OrganizationUnit.name.in_(department_names))) if not department_conditions: return None stmt = ( select(Employee) .options(selectinload(Employee.organization_unit), selectinload(Employee.roles)) .where( func.upper(func.coalesce(Employee.grade, "")) == BUDGET_MONITOR_APPROVAL_GRADE, Employee.roles.any(Role.role_code.in_(BUDGET_APPROVAL_ROLE_CODES)), or_(*department_conditions), ) .order_by(Employee.name.asc(), Employee.employee_no.asc()) .limit(1) ) claim_employee_id = str(claim.employee_id or "").strip() if claim_employee_id: stmt = stmt.where(Employee.id != claim_employee_id) return self.db.scalar(stmt) def resolve_budget_approval_role_code(self, employee: Employee | None) -> str: role_codes = self._collect_employee_role_codes(employee) for role_code in ("budget_monitor", "executive"): if role_code in role_codes: return role_code return BUDGET_MONITOR_ROLE_CODE def attach_budget_approval_snapshot(self, claim: ExpenseClaim | None) -> ExpenseClaim | None: if claim is None: return None if str(claim.approval_stage or "").strip() != BUDGET_MANAGER_APPROVAL_STAGE: return claim budget_manager = self.resolve_department_budget_manager(claim) if budget_manager is None: return claim setattr(claim, "budget_approver_name", str(budget_manager.name or "").strip()) setattr(claim, "budget_approver_grade", str(budget_manager.grade or "").strip()) setattr( claim, "budget_approver_role_code", self.resolve_budget_approval_role_code(budget_manager), ) return claim def attach_budget_approval_snapshots(self, claims: list[ExpenseClaim]) -> list[ExpenseClaim]: for claim in claims: self.attach_budget_approval_snapshot(claim) return claims @staticmethod def normalize_role_codes(current_user: CurrentUserContext) -> set[str]: return { str(item).strip().lower() for item in current_user.role_codes if str(item).strip() } @staticmethod def _collect_employee_role_codes(employee: Employee | None) -> set[str]: if employee is None: return set() return { str(role.role_code or "").strip().lower() for role in list(employee.roles or []) if str(role.role_code or "").strip() } @staticmethod def _employee_has_budget_approval_grade(employee: Employee) -> bool: return str(employee.grade or "").strip().upper() == BUDGET_MONITOR_APPROVAL_GRADE def _employee_matches_claim_department( self, employee: Employee, current_user: CurrentUserContext, claim: ExpenseClaim, ) -> bool: claim_department_ids, claim_department_names = self._collect_claim_department_identity(claim) employee_department_ids = { str(employee.organization_unit_id or "").strip(), } employee_department_names = { str(current_user.department_name or "").strip(), } if employee.organization_unit is not None: employee_department_names.add(str(employee.organization_unit.name or "").strip()) employee_department_ids.discard("") employee_department_names.discard("") return bool( (claim_department_ids and employee_department_ids & claim_department_ids) or (claim_department_names and employee_department_names & claim_department_names) ) def _collect_claim_department_identity(self, claim: ExpenseClaim) -> tuple[set[str], set[str]]: department_ids = { str(claim.department_id or "").strip(), } department_names = { str(claim.department_name or "").strip(), } claim_employee = self.resolve_claim_employee_for_backfill(claim) if claim_employee is not None: department_ids.add(str(claim_employee.organization_unit_id or "").strip()) if claim_employee.organization_unit is not None: department_names.add(str(claim_employee.organization_unit.name or "").strip()) department_ids.discard("") department_names.discard("") return department_ids, department_names def resolve_current_employee(self, current_user: CurrentUserContext) -> Employee | None: return self.resolve_employee_by_identity_candidates( [ str(current_user.username or "").strip(), str(current_user.name or "").strip(), str(current_user.employee_no or "").strip(), ] ) def resolve_current_user_display_name(self, current_user: CurrentUserContext) -> str: current_employee = self.resolve_current_employee(current_user) if current_employee is not None and str(current_employee.name or "").strip(): return str(current_employee.name).strip() for candidate in (current_user.name, current_user.username): normalized = str(candidate or "").strip() if normalized and not self.is_email_like(normalized): return normalized return str(current_user.username or current_user.name or "anonymous").strip() or "anonymous" def is_claim_owned_by_current_user(self, claim: ExpenseClaim, current_user: CurrentUserContext) -> bool: claim_employee_id = str(claim.employee_id or "").strip() current_employee = self.resolve_current_employee(current_user) if current_employee is not None: if claim_employee_id == current_employee.id: return True identity_values = { str(current_employee.name or "").strip(), str(current_employee.email or "").strip(), str(current_employee.employee_no or "").strip(), } else: identity_values = set() identity_values.update( { str(current_user.username or "").strip(), str(current_user.name or "").strip(), str(current_user.employee_no or "").strip(), } ) identity_values.discard("") if claim_employee_id and claim_employee_id in identity_values: return True return str(claim.employee_name or "").strip() in identity_values @staticmethod def is_email_like(value: str) -> bool: return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", str(value or "").strip())) def resolve_claim_employee_for_backfill(self, claim: ExpenseClaim) -> Employee | None: if claim.employee is not None: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) .where(Employee.id == claim.employee.id) .limit(1) ) return employee or claim.employee employee_id = str(claim.employee_id or "").strip() if employee_id: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) .where(Employee.id == employee_id) .limit(1) ) if employee is not None: return employee return self.resolve_employee_by_identity_candidates([str(claim.employee_name or "").strip()]) def resolve_employee_by_identity_candidates(self, candidates: list[str]) -> Employee | None: normalized_candidates = [ item for item in dict.fromkeys(str(candidate or "").strip() for candidate in candidates) if item ] if not normalized_candidates: return None load_options = ( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) for candidate in normalized_candidates: employee = self.db.scalar( select(Employee) .options(*load_options) .where( or_( func.lower(Employee.email) == candidate.lower(), func.lower(Employee.employee_no) == candidate.lower(), ) ) .limit(1) ) if employee is not None: return employee for candidate in normalized_candidates: matches = list( self.db.scalars( select(Employee) .options(*load_options) .where(Employee.name == candidate) .limit(2) ).all() ) if len(matches) == 1: return matches[0] return None def backfill_claim_identity_from_current_user( self, claim: ExpenseClaim, current_user: CurrentUserContext, ) -> None: employee = self.resolve_claim_employee_for_backfill(claim) or self.resolve_current_employee(current_user) if employee is not None: claim_employee_id = str(claim.employee_id or "").strip() claim_employee_name = str(claim.employee_name or "").strip() employee_names = { str(employee.name or "").strip(), str(employee.email or "").strip(), str(employee.employee_no or "").strip(), } employee_names.discard("") can_apply_employee = ( not claim_employee_id or claim_employee_id == employee.id or self.is_missing_value(claim_employee_name) or claim_employee_name in employee_names ) if can_apply_employee: claim.employee = employee claim.employee_id = employee.id if employee.name: claim.employee_name = employee.name if employee.organization_unit is not None: claim.department_id = employee.organization_unit_id claim.department_name = employee.organization_unit.name return context_department = str( getattr(current_user, "department_name", "") or getattr(current_user, "department", "") or getattr(current_user, "departmentName", "") or "" ).strip() if context_department and self.is_missing_value(claim.department_name): claim.department_name = context_department context_name = str(current_user.name or current_user.username or "").strip() if context_name and self.is_missing_value(claim.employee_name): claim.employee_name = context_name def employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: return False same_name_count = int( self.db.scalar( select(func.count()).select_from(Employee).where(Employee.name == normalized_name) ) or 0 ) return same_name_count == 1 def build_personal_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]: conditions = [] username = str(current_user.username or "").strip() employee = self.resolve_current_employee(current_user) def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized: return if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self.employee_name_is_unique(employee): add_condition("employee_name", employee.name) else: add_condition("employee_id", username) add_condition("employee_id", str(current_user.employee_no or "").strip()) add_condition("employee_name", username) add_condition("employee_name", str(current_user.name or "").strip()) add_condition("employee_name", str(current_user.employee_no or "").strip()) return conditions def build_approval_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]: role_codes = self.normalize_role_codes(current_user) if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES): return [] employee = self.resolve_current_employee(current_user) manager_name = str( employee.name if employee is not None and employee.name else current_user.name or "" ).strip() pending_leader_approval_parts = [ ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == DIRECT_MANAGER_APPROVAL_STAGE, ] if employee is not None: pending_leader_approval_parts.append( or_(ExpenseClaim.employee_id.is_(None), ExpenseClaim.employee_id != employee.id) ) if manager_name: pending_leader_approval_parts.append(ExpenseClaim.employee_name != manager_name) pending_leader_approval = and_(*pending_leader_approval_parts) conditions = [] if employee is not None: subordinate_ids = select(Employee.id).where(Employee.manager_id == employee.id) conditions.append(and_(pending_leader_approval, ExpenseClaim.employee_id.in_(subordinate_ids))) if manager_name: managed_department_ids = select(OrganizationUnit.id).where(OrganizationUnit.manager_name == manager_name) managed_department_names = select(OrganizationUnit.name).where(OrganizationUnit.manager_name == manager_name) conditions.append(and_(pending_leader_approval, ExpenseClaim.department_id.in_(managed_department_ids))) conditions.append(and_(pending_leader_approval, ExpenseClaim.department_name.in_(managed_department_names))) return conditions def build_budget_approval_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]: employee = self.resolve_current_employee(current_user) role_codes = self.normalize_role_codes(current_user) | self._collect_employee_role_codes(employee) if not role_codes & BUDGET_APPROVAL_ROLE_CODES: return [] if employee is None or not self._employee_has_budget_approval_grade(employee): return [] pending_budget_approval_parts = [ ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == BUDGET_MANAGER_APPROVAL_STAGE, ] pending_budget_approval_parts.append( or_(ExpenseClaim.employee_id.is_(None), ExpenseClaim.employee_id != employee.id) ) if employee.name: pending_budget_approval_parts.append(ExpenseClaim.employee_name != employee.name) department_conditions = [] department_name = str(current_user.department_name or "").strip() if employee.organization_unit_id: department_conditions.append(ExpenseClaim.department_id == employee.organization_unit_id) subordinate_department_employee_ids = select(Employee.id).where( Employee.organization_unit_id == employee.organization_unit_id ) department_conditions.append(ExpenseClaim.employee_id.in_(subordinate_department_employee_ids)) if employee.organization_unit is not None and employee.organization_unit.name: department_conditions.append(ExpenseClaim.department_name == employee.organization_unit.name) if department_name: department_conditions.append(ExpenseClaim.department_name == department_name) if not department_conditions: return [] pending_budget_approval_parts.append(or_(*department_conditions)) return [and_(*pending_budget_approval_parts)] def apply_approval_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: role_codes = self.normalize_role_codes(current_user) if current_user.is_admin: return stmt.where(ExpenseClaim.status == "submitted") conditions = [] if "finance" in role_codes: conditions.append(and_( ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == FINANCE_APPROVAL_STAGE, )) conditions.extend(self.build_budget_approval_claim_conditions(current_user)) conditions.extend(self.build_approval_claim_conditions(current_user)) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(or_(*conditions)) def apply_claim_scope( self, stmt: Any, current_user: CurrentUserContext, *, include_approval_scope: bool = False, ) -> Any: conditions = self.build_personal_claim_conditions(current_user) if include_approval_scope: role_codes = self.normalize_role_codes(current_user) if current_user.is_admin or "executive" in role_codes: conditions.append(ExpenseClaim.status.in_(("submitted", PAYMENT_PENDING_STATUS, "returned"))) elif "finance" in role_codes: conditions.append( or_( and_( ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == FINANCE_APPROVAL_STAGE, ), ExpenseClaim.status.in_((PAYMENT_PENDING_STATUS, "returned")), ) ) conditions.extend(self.build_budget_approval_claim_conditions(current_user)) conditions.extend(self.build_approval_claim_conditions(current_user)) if self.has_archive_center_access(current_user): conditions.append(self.build_archived_claim_condition()) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(or_(*conditions)) def apply_archived_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: archived_condition = self.build_archived_claim_condition() if not self.has_archive_center_access(current_user): owned_conditions = self.build_personal_claim_conditions(current_user) if not owned_conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(archived_condition, or_(*owned_conditions)) return stmt.where(archived_condition) @staticmethod def resolve_claim_manager_name(claim: ExpenseClaim) -> str: if claim.employee is not None: if claim.employee.manager is not None and claim.employee.manager.name: return str(claim.employee.manager.name).strip() if claim.employee.organization_unit is not None and claim.employee.organization_unit.manager_name: return str(claim.employee.organization_unit.manager_name).strip() return "" @staticmethod def is_missing_value(value: Any) -> bool: normalized = str(value or "").strip() return not normalized or normalized in {"待补充", "待确认", "N/A", "n/a", "无"}