from __future__ import annotations import re from typing import Any from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session, selectinload from app.api.deps import CurrentUserContext from app.models.employee import Employee from app.models.financial_record import ExpenseClaim from app.models.organization import OrganizationUnit PRIVILEGED_CLAIM_ROLE_CODES = {"finance", "executive"} ARCHIVE_CENTER_ROLE_CODES = {"finance", "executive"} APPROVAL_VISIBLE_CLAIM_ROLE_CODES = {"manager", "approver"} CLAIM_DELETE_ROLE_CODES = {"executive"} ARCHIVED_CLAIM_STATUSES = ("approved", "completed", "paid") APPLICATION_ARCHIVED_STAGES = ("审批完成", "申请归档", "completed") class ExpenseClaimAccessPolicy: def __init__(self, db: Session) -> None: self.db = db @staticmethod def has_privileged_claim_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & PRIVILEGED_CLAIM_ROLE_CODES) @staticmethod def has_archive_center_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & ARCHIVE_CENTER_ROLE_CODES) @staticmethod def build_archived_claim_condition() -> Any: normalized_status = func.lower(func.coalesce(ExpenseClaim.status, "")) stage = func.coalesce(ExpenseClaim.approval_stage, "") normalized_type = func.lower(func.coalesce(ExpenseClaim.expense_type, "")) claim_no = func.upper(func.coalesce(ExpenseClaim.claim_no, "")) application_condition = or_( claim_no.like("AP-%"), claim_no.like("APP-%"), normalized_type == "application", normalized_type.like("%\\_application", escape="\\"), ) return or_( stage == "归档入账", stage == "completed", and_( application_condition, normalized_status.in_(ARCHIVED_CLAIM_STATUSES), stage.in_(APPLICATION_ARCHIVED_STAGES), ), and_( normalized_status.in_(ARCHIVED_CLAIM_STATUSES), or_( stage == "", stage.is_(None), stage == "归档入账", stage == "completed", ), ), ) @staticmethod def has_claim_delete_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & CLAIM_DELETE_ROLE_CODES) @staticmethod def is_archived_claim(claim: ExpenseClaim) -> bool: normalized_status = str(claim.status or "").strip().lower() stage = str(claim.approval_stage or "").strip() if stage in {"归档入账", "completed"}: return True normalized_type = str(claim.expense_type or "").strip().lower() claim_no = str(claim.claim_no or "").strip().upper() is_application_claim = ( claim_no.startswith(("AP-", "APP-")) or normalized_type == "application" or normalized_type.endswith("_application") ) if ( is_application_claim and normalized_status in ARCHIVED_CLAIM_STATUSES and stage in APPLICATION_ARCHIVED_STAGES ): return True return normalized_status in ARCHIVED_CLAIM_STATUSES and stage in {"", "归档入账", "completed"} def can_return_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: normalized_status = str(claim.status or "").strip().lower() if normalized_status != "submitted": return False stage = str(claim.approval_stage or "").strip() if stage == "直属领导审批": return self.is_current_direct_manager_approver(current_user, claim) if stage == "财务审批": return self.has_privileged_claim_access(current_user) and not self.is_claim_owned_by_current_user( claim, current_user, ) return False def can_approve_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: stage = str(claim.approval_stage or "").strip() if stage == "直属领导审批": return self.is_current_direct_manager_approver(current_user, claim) if stage == "财务审批": role_codes = self.normalize_role_codes(current_user) return ( (current_user.is_admin or "finance" in role_codes) and not self.is_claim_owned_by_current_user(claim, current_user) ) return False def is_current_direct_manager_approver(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool: role_codes = self.normalize_role_codes(current_user) if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES): return False if str(claim.status or "").strip().lower() != "submitted": return False if str(claim.approval_stage or "").strip() != "直属领导审批": return False current_employee = self.resolve_current_employee(current_user) if current_employee is not None and str(claim.employee_id or "").strip() == current_employee.id: return False claim_employee = claim.employee if current_employee is not None and claim_employee is not None: if claim_employee.manager_id == current_employee.id: return True if claim_employee.manager is not None and claim_employee.manager.id == current_employee.id: return True approver_name = str( current_employee.name if current_employee is not None and current_employee.name else current_user.name or "" ).strip() if not approver_name: return False return self.resolve_claim_manager_name(claim) == approver_name @staticmethod def normalize_role_codes(current_user: CurrentUserContext) -> set[str]: return { str(item).strip().lower() for item in current_user.role_codes if str(item).strip() } def resolve_current_employee(self, current_user: CurrentUserContext) -> Employee | None: return self.resolve_employee_by_identity_candidates( [ str(current_user.username or "").strip(), str(current_user.name or "").strip(), ] ) def resolve_current_user_display_name(self, current_user: CurrentUserContext) -> str: current_employee = self.resolve_current_employee(current_user) if current_employee is not None and str(current_employee.name or "").strip(): return str(current_employee.name).strip() for candidate in (current_user.name, current_user.username): normalized = str(candidate or "").strip() if normalized and not self.is_email_like(normalized): return normalized return str(current_user.username or current_user.name or "anonymous").strip() or "anonymous" def is_claim_owned_by_current_user(self, claim: ExpenseClaim, current_user: CurrentUserContext) -> bool: current_employee = self.resolve_current_employee(current_user) if current_employee is not None: if str(claim.employee_id or "").strip() == current_employee.id: return True identity_values = { str(current_employee.name or "").strip(), str(current_employee.email or "").strip(), str(current_employee.employee_no or "").strip(), } else: identity_values = set() identity_values.update( { str(current_user.username or "").strip(), str(current_user.name or "").strip(), } ) identity_values.discard("") return str(claim.employee_name or "").strip() in identity_values @staticmethod def is_email_like(value: str) -> bool: return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", str(value or "").strip())) def resolve_claim_employee_for_backfill(self, claim: ExpenseClaim) -> Employee | None: if claim.employee is not None: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) .where(Employee.id == claim.employee.id) .limit(1) ) return employee or claim.employee employee_id = str(claim.employee_id or "").strip() if employee_id: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) .where(Employee.id == employee_id) .limit(1) ) if employee is not None: return employee return self.resolve_employee_by_identity_candidates([str(claim.employee_name or "").strip()]) def resolve_employee_by_identity_candidates(self, candidates: list[str]) -> Employee | None: normalized_candidates = [ item for item in dict.fromkeys(str(candidate or "").strip() for candidate in candidates) if item ] if not normalized_candidates: return None load_options = ( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) for candidate in normalized_candidates: employee = self.db.scalar( select(Employee) .options(*load_options) .where( or_( func.lower(Employee.email) == candidate.lower(), func.lower(Employee.employee_no) == candidate.lower(), ) ) .limit(1) ) if employee is not None: return employee for candidate in normalized_candidates: matches = list( self.db.scalars( select(Employee) .options(*load_options) .where(Employee.name == candidate) .limit(2) ).all() ) if len(matches) == 1: return matches[0] return None def backfill_claim_identity_from_current_user( self, claim: ExpenseClaim, current_user: CurrentUserContext, ) -> None: employee = self.resolve_claim_employee_for_backfill(claim) or self.resolve_current_employee(current_user) if employee is not None: claim_employee_id = str(claim.employee_id or "").strip() claim_employee_name = str(claim.employee_name or "").strip() employee_names = { str(employee.name or "").strip(), str(employee.email or "").strip(), str(employee.employee_no or "").strip(), } employee_names.discard("") can_apply_employee = ( not claim_employee_id or claim_employee_id == employee.id or self.is_missing_value(claim_employee_name) or claim_employee_name in employee_names ) if can_apply_employee: claim.employee = employee claim.employee_id = employee.id if employee.name: claim.employee_name = employee.name if employee.organization_unit is not None: claim.department_id = employee.organization_unit_id claim.department_name = employee.organization_unit.name return context_department = str( getattr(current_user, "department_name", "") or getattr(current_user, "department", "") or getattr(current_user, "departmentName", "") or "" ).strip() if context_department and self.is_missing_value(claim.department_name): claim.department_name = context_department context_name = str(current_user.name or current_user.username or "").strip() if context_name and self.is_missing_value(claim.employee_name): claim.employee_name = context_name def employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: return False same_name_count = int( self.db.scalar( select(func.count()).select_from(Employee).where(Employee.name == normalized_name) ) or 0 ) return same_name_count == 1 def build_personal_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]: conditions = [] username = str(current_user.username or "").strip() employee = self.resolve_current_employee(current_user) def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized: return if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self.employee_name_is_unique(employee): add_condition("employee_name", employee.name) else: add_condition("employee_id", username) add_condition("employee_name", username) add_condition("employee_name", str(current_user.name or "").strip()) return conditions def build_approval_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]: role_codes = self.normalize_role_codes(current_user) if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES): return [] employee = self.resolve_current_employee(current_user) manager_name = str( employee.name if employee is not None and employee.name else current_user.name or "" ).strip() pending_leader_approval_parts = [ ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == "直属领导审批", ] if employee is not None: pending_leader_approval_parts.append( or_(ExpenseClaim.employee_id.is_(None), ExpenseClaim.employee_id != employee.id) ) if manager_name: pending_leader_approval_parts.append(ExpenseClaim.employee_name != manager_name) pending_leader_approval = and_(*pending_leader_approval_parts) conditions = [] if employee is not None: subordinate_ids = select(Employee.id).where(Employee.manager_id == employee.id) conditions.append(and_(pending_leader_approval, ExpenseClaim.employee_id.in_(subordinate_ids))) if manager_name: managed_department_ids = select(OrganizationUnit.id).where(OrganizationUnit.manager_name == manager_name) managed_department_names = select(OrganizationUnit.name).where(OrganizationUnit.manager_name == manager_name) conditions.append(and_(pending_leader_approval, ExpenseClaim.department_id.in_(managed_department_ids))) conditions.append(and_(pending_leader_approval, ExpenseClaim.department_name.in_(managed_department_names))) return conditions def apply_approval_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: role_codes = self.normalize_role_codes(current_user) if current_user.is_admin or "executive" in role_codes: return stmt.where(ExpenseClaim.status == "submitted") if "finance" in role_codes: return stmt.where( ExpenseClaim.status == "submitted", ExpenseClaim.approval_stage == "财务审批", ) conditions = self.build_approval_claim_conditions(current_user) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(or_(*conditions)) def apply_claim_scope( self, stmt: Any, current_user: CurrentUserContext, *, include_approval_scope: bool = False, ) -> Any: if self.has_privileged_claim_access(current_user): owned_conditions = self.build_personal_claim_conditions(current_user) archived_condition = self.build_archived_claim_condition() if owned_conditions: return stmt.where( or_( ~archived_condition, and_(archived_condition, or_(*owned_conditions)), ) ) return stmt.where(~archived_condition) conditions = self.build_personal_claim_conditions(current_user) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") if include_approval_scope: conditions.extend(self.build_approval_claim_conditions(current_user)) return stmt.where(or_(*conditions)) def apply_archived_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: archived_condition = self.build_archived_claim_condition() if not self.has_archive_center_access(current_user): owned_conditions = self.build_personal_claim_conditions(current_user) if not owned_conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(archived_condition, or_(*owned_conditions)) return stmt.where(archived_condition) @staticmethod def resolve_claim_manager_name(claim: ExpenseClaim) -> str: if claim.employee is not None: if claim.employee.manager is not None and claim.employee.manager.name: return str(claim.employee.manager.name).strip() if claim.employee.organization_unit is not None and claim.employee.organization_unit.manager_name: return str(claim.employee.organization_unit.manager_name).strip() return "" @staticmethod def is_missing_value(value: Any) -> bool: normalized = str(value or "").strip() return not normalized or normalized in {"待补充", "待确认", "N/A", "n/a", "无"}