402 lines
16 KiB
Python
402 lines
16 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import re
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from sqlalchemy import and_, func, or_, select
|
||
|
|
from sqlalchemy.orm import Session, selectinload
|
||
|
|
|
||
|
|
from app.api.deps import CurrentUserContext
|
||
|
|
from app.models.employee import Employee
|
||
|
|
from app.models.financial_record import ExpenseClaim
|
||
|
|
from app.models.organization import OrganizationUnit
|
||
|
|
|
||
|
|
|
||
|
|
PRIVILEGED_CLAIM_ROLE_CODES = {"finance", "executive"}
|
||
|
|
APPROVAL_VISIBLE_CLAIM_ROLE_CODES = {"manager", "approver"}
|
||
|
|
CLAIM_DELETE_ROLE_CODES = {"executive"}
|
||
|
|
|
||
|
|
|
||
|
|
class ExpenseClaimAccessPolicy:
|
||
|
|
def __init__(self, db: Session) -> None:
|
||
|
|
self.db = db
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def has_privileged_claim_access(current_user: CurrentUserContext) -> bool:
|
||
|
|
if current_user.is_admin:
|
||
|
|
return True
|
||
|
|
return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & PRIVILEGED_CLAIM_ROLE_CODES)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def has_claim_delete_access(current_user: CurrentUserContext) -> bool:
|
||
|
|
if current_user.is_admin:
|
||
|
|
return True
|
||
|
|
return bool(ExpenseClaimAccessPolicy.normalize_role_codes(current_user) & CLAIM_DELETE_ROLE_CODES)
|
||
|
|
|
||
|
|
def can_return_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool:
|
||
|
|
if self.has_privileged_claim_access(current_user):
|
||
|
|
return True
|
||
|
|
|
||
|
|
role_codes = self.normalize_role_codes(current_user)
|
||
|
|
if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES):
|
||
|
|
return False
|
||
|
|
if str(claim.status or "").strip().lower() != "submitted":
|
||
|
|
return False
|
||
|
|
if str(claim.approval_stage or "").strip() != "直属领导审批":
|
||
|
|
return False
|
||
|
|
|
||
|
|
current_employee = self.resolve_current_employee(current_user)
|
||
|
|
if current_employee is not None and str(claim.employee_id or "").strip() == current_employee.id:
|
||
|
|
return False
|
||
|
|
|
||
|
|
claim_employee = claim.employee
|
||
|
|
if current_employee is not None and claim_employee is not None:
|
||
|
|
if claim_employee.manager_id == current_employee.id:
|
||
|
|
return True
|
||
|
|
if claim_employee.manager is not None and claim_employee.manager.id == current_employee.id:
|
||
|
|
return True
|
||
|
|
|
||
|
|
approver_name = str(
|
||
|
|
current_employee.name if current_employee is not None and current_employee.name else current_user.name or ""
|
||
|
|
).strip()
|
||
|
|
if not approver_name:
|
||
|
|
return False
|
||
|
|
|
||
|
|
return self.resolve_claim_manager_name(claim) == approver_name
|
||
|
|
|
||
|
|
def can_approve_claim(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool:
|
||
|
|
stage = str(claim.approval_stage or "").strip()
|
||
|
|
if stage == "直属领导审批":
|
||
|
|
return self.is_current_direct_manager_approver(current_user, claim)
|
||
|
|
if stage == "财务审批":
|
||
|
|
role_codes = self.normalize_role_codes(current_user)
|
||
|
|
return current_user.is_admin or "finance" in role_codes
|
||
|
|
return False
|
||
|
|
|
||
|
|
def is_current_direct_manager_approver(self, current_user: CurrentUserContext, claim: ExpenseClaim) -> bool:
|
||
|
|
role_codes = self.normalize_role_codes(current_user)
|
||
|
|
if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES):
|
||
|
|
return False
|
||
|
|
if str(claim.status or "").strip().lower() != "submitted":
|
||
|
|
return False
|
||
|
|
if str(claim.approval_stage or "").strip() != "直属领导审批":
|
||
|
|
return False
|
||
|
|
|
||
|
|
current_employee = self.resolve_current_employee(current_user)
|
||
|
|
if current_employee is not None and str(claim.employee_id or "").strip() == current_employee.id:
|
||
|
|
return False
|
||
|
|
|
||
|
|
claim_employee = claim.employee
|
||
|
|
if current_employee is not None and claim_employee is not None:
|
||
|
|
if claim_employee.manager_id == current_employee.id:
|
||
|
|
return True
|
||
|
|
if claim_employee.manager is not None and claim_employee.manager.id == current_employee.id:
|
||
|
|
return True
|
||
|
|
|
||
|
|
approver_name = str(
|
||
|
|
current_employee.name if current_employee is not None and current_employee.name else current_user.name or ""
|
||
|
|
).strip()
|
||
|
|
if not approver_name:
|
||
|
|
return False
|
||
|
|
|
||
|
|
return self.resolve_claim_manager_name(claim) == approver_name
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def normalize_role_codes(current_user: CurrentUserContext) -> set[str]:
|
||
|
|
return {
|
||
|
|
str(item).strip().lower()
|
||
|
|
for item in current_user.role_codes
|
||
|
|
if str(item).strip()
|
||
|
|
}
|
||
|
|
|
||
|
|
def resolve_current_employee(self, current_user: CurrentUserContext) -> Employee | None:
|
||
|
|
return self.resolve_employee_by_identity_candidates(
|
||
|
|
[
|
||
|
|
str(current_user.username or "").strip(),
|
||
|
|
str(current_user.name or "").strip(),
|
||
|
|
]
|
||
|
|
)
|
||
|
|
|
||
|
|
def resolve_current_user_display_name(self, current_user: CurrentUserContext) -> str:
|
||
|
|
current_employee = self.resolve_current_employee(current_user)
|
||
|
|
if current_employee is not None and str(current_employee.name or "").strip():
|
||
|
|
return str(current_employee.name).strip()
|
||
|
|
|
||
|
|
for candidate in (current_user.name, current_user.username):
|
||
|
|
normalized = str(candidate or "").strip()
|
||
|
|
if normalized and not self.is_email_like(normalized):
|
||
|
|
return normalized
|
||
|
|
|
||
|
|
return str(current_user.username or current_user.name or "anonymous").strip() or "anonymous"
|
||
|
|
|
||
|
|
def is_claim_owned_by_current_user(self, claim: ExpenseClaim, current_user: CurrentUserContext) -> bool:
|
||
|
|
current_employee = self.resolve_current_employee(current_user)
|
||
|
|
if current_employee is not None:
|
||
|
|
if str(claim.employee_id or "").strip() == current_employee.id:
|
||
|
|
return True
|
||
|
|
identity_values = {
|
||
|
|
str(current_employee.name or "").strip(),
|
||
|
|
str(current_employee.email or "").strip(),
|
||
|
|
str(current_employee.employee_no or "").strip(),
|
||
|
|
}
|
||
|
|
else:
|
||
|
|
identity_values = set()
|
||
|
|
|
||
|
|
identity_values.update(
|
||
|
|
{
|
||
|
|
str(current_user.username or "").strip(),
|
||
|
|
str(current_user.name or "").strip(),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
identity_values.discard("")
|
||
|
|
return str(claim.employee_name or "").strip() in identity_values
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def is_email_like(value: str) -> bool:
|
||
|
|
return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", str(value or "").strip()))
|
||
|
|
|
||
|
|
def resolve_claim_employee_for_backfill(self, claim: ExpenseClaim) -> Employee | None:
|
||
|
|
if claim.employee is not None:
|
||
|
|
employee = self.db.scalar(
|
||
|
|
select(Employee)
|
||
|
|
.options(
|
||
|
|
selectinload(Employee.organization_unit),
|
||
|
|
selectinload(Employee.manager),
|
||
|
|
selectinload(Employee.roles),
|
||
|
|
)
|
||
|
|
.where(Employee.id == claim.employee.id)
|
||
|
|
.limit(1)
|
||
|
|
)
|
||
|
|
return employee or claim.employee
|
||
|
|
|
||
|
|
employee_id = str(claim.employee_id or "").strip()
|
||
|
|
if employee_id:
|
||
|
|
employee = self.db.scalar(
|
||
|
|
select(Employee)
|
||
|
|
.options(
|
||
|
|
selectinload(Employee.organization_unit),
|
||
|
|
selectinload(Employee.manager),
|
||
|
|
selectinload(Employee.roles),
|
||
|
|
)
|
||
|
|
.where(Employee.id == employee_id)
|
||
|
|
.limit(1)
|
||
|
|
)
|
||
|
|
if employee is not None:
|
||
|
|
return employee
|
||
|
|
|
||
|
|
return self.resolve_employee_by_identity_candidates([str(claim.employee_name or "").strip()])
|
||
|
|
|
||
|
|
def resolve_employee_by_identity_candidates(self, candidates: list[str]) -> Employee | None:
|
||
|
|
normalized_candidates = [
|
||
|
|
item
|
||
|
|
for item in dict.fromkeys(str(candidate or "").strip() for candidate in candidates)
|
||
|
|
if item
|
||
|
|
]
|
||
|
|
if not normalized_candidates:
|
||
|
|
return None
|
||
|
|
|
||
|
|
load_options = (
|
||
|
|
selectinload(Employee.organization_unit),
|
||
|
|
selectinload(Employee.manager),
|
||
|
|
selectinload(Employee.roles),
|
||
|
|
)
|
||
|
|
|
||
|
|
for candidate in normalized_candidates:
|
||
|
|
employee = self.db.scalar(
|
||
|
|
select(Employee)
|
||
|
|
.options(*load_options)
|
||
|
|
.where(
|
||
|
|
or_(
|
||
|
|
func.lower(Employee.email) == candidate.lower(),
|
||
|
|
func.lower(Employee.employee_no) == candidate.lower(),
|
||
|
|
)
|
||
|
|
)
|
||
|
|
.limit(1)
|
||
|
|
)
|
||
|
|
if employee is not None:
|
||
|
|
return employee
|
||
|
|
|
||
|
|
for candidate in normalized_candidates:
|
||
|
|
matches = list(
|
||
|
|
self.db.scalars(
|
||
|
|
select(Employee)
|
||
|
|
.options(*load_options)
|
||
|
|
.where(Employee.name == candidate)
|
||
|
|
.limit(2)
|
||
|
|
).all()
|
||
|
|
)
|
||
|
|
if len(matches) == 1:
|
||
|
|
return matches[0]
|
||
|
|
|
||
|
|
return None
|
||
|
|
|
||
|
|
def backfill_claim_identity_from_current_user(
|
||
|
|
self,
|
||
|
|
claim: ExpenseClaim,
|
||
|
|
current_user: CurrentUserContext,
|
||
|
|
) -> None:
|
||
|
|
employee = self.resolve_claim_employee_for_backfill(claim) or self.resolve_current_employee(current_user)
|
||
|
|
|
||
|
|
if employee is not None:
|
||
|
|
claim_employee_id = str(claim.employee_id or "").strip()
|
||
|
|
claim_employee_name = str(claim.employee_name or "").strip()
|
||
|
|
employee_names = {
|
||
|
|
str(employee.name or "").strip(),
|
||
|
|
str(employee.email or "").strip(),
|
||
|
|
str(employee.employee_no or "").strip(),
|
||
|
|
}
|
||
|
|
employee_names.discard("")
|
||
|
|
|
||
|
|
can_apply_employee = (
|
||
|
|
not claim_employee_id
|
||
|
|
or claim_employee_id == employee.id
|
||
|
|
or self.is_missing_value(claim_employee_name)
|
||
|
|
or claim_employee_name in employee_names
|
||
|
|
)
|
||
|
|
|
||
|
|
if can_apply_employee:
|
||
|
|
claim.employee = employee
|
||
|
|
claim.employee_id = employee.id
|
||
|
|
if employee.name:
|
||
|
|
claim.employee_name = employee.name
|
||
|
|
if employee.organization_unit is not None:
|
||
|
|
claim.department_id = employee.organization_unit_id
|
||
|
|
claim.department_name = employee.organization_unit.name
|
||
|
|
return
|
||
|
|
|
||
|
|
context_department = str(
|
||
|
|
getattr(current_user, "department_name", "")
|
||
|
|
or getattr(current_user, "department", "")
|
||
|
|
or getattr(current_user, "departmentName", "")
|
||
|
|
or ""
|
||
|
|
).strip()
|
||
|
|
if context_department and self.is_missing_value(claim.department_name):
|
||
|
|
claim.department_name = context_department
|
||
|
|
|
||
|
|
context_name = str(current_user.name or current_user.username or "").strip()
|
||
|
|
if context_name and self.is_missing_value(claim.employee_name):
|
||
|
|
claim.employee_name = context_name
|
||
|
|
|
||
|
|
def employee_name_is_unique(self, employee: Employee) -> bool:
|
||
|
|
normalized_name = str(employee.name or "").strip()
|
||
|
|
if not normalized_name:
|
||
|
|
return False
|
||
|
|
|
||
|
|
same_name_count = int(
|
||
|
|
self.db.scalar(
|
||
|
|
select(func.count()).select_from(Employee).where(Employee.name == normalized_name)
|
||
|
|
)
|
||
|
|
or 0
|
||
|
|
)
|
||
|
|
return same_name_count == 1
|
||
|
|
|
||
|
|
def build_personal_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]:
|
||
|
|
conditions = []
|
||
|
|
username = str(current_user.username or "").strip()
|
||
|
|
employee = self.resolve_current_employee(current_user)
|
||
|
|
|
||
|
|
def add_condition(field_name: str, value: str | None) -> None:
|
||
|
|
normalized = str(value or "").strip()
|
||
|
|
if not normalized:
|
||
|
|
return
|
||
|
|
if field_name == "employee_id":
|
||
|
|
conditions.append(ExpenseClaim.employee_id == normalized)
|
||
|
|
return
|
||
|
|
conditions.append(ExpenseClaim.employee_name == normalized)
|
||
|
|
|
||
|
|
if employee is not None:
|
||
|
|
add_condition("employee_id", employee.id)
|
||
|
|
add_condition("employee_name", employee.email)
|
||
|
|
if self.employee_name_is_unique(employee):
|
||
|
|
add_condition("employee_name", employee.name)
|
||
|
|
else:
|
||
|
|
add_condition("employee_id", username)
|
||
|
|
add_condition("employee_name", username)
|
||
|
|
|
||
|
|
return conditions
|
||
|
|
|
||
|
|
def build_approval_claim_conditions(self, current_user: CurrentUserContext) -> list[Any]:
|
||
|
|
role_codes = self.normalize_role_codes(current_user)
|
||
|
|
if not (role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES):
|
||
|
|
return []
|
||
|
|
|
||
|
|
employee = self.resolve_current_employee(current_user)
|
||
|
|
manager_name = str(
|
||
|
|
employee.name if employee is not None and employee.name else current_user.name or ""
|
||
|
|
).strip()
|
||
|
|
pending_leader_approval_parts = [
|
||
|
|
ExpenseClaim.status == "submitted",
|
||
|
|
ExpenseClaim.approval_stage == "直属领导审批",
|
||
|
|
]
|
||
|
|
if employee is not None:
|
||
|
|
pending_leader_approval_parts.append(
|
||
|
|
or_(ExpenseClaim.employee_id.is_(None), ExpenseClaim.employee_id != employee.id)
|
||
|
|
)
|
||
|
|
if manager_name:
|
||
|
|
pending_leader_approval_parts.append(ExpenseClaim.employee_name != manager_name)
|
||
|
|
|
||
|
|
pending_leader_approval = and_(*pending_leader_approval_parts)
|
||
|
|
conditions = []
|
||
|
|
|
||
|
|
if employee is not None:
|
||
|
|
subordinate_ids = select(Employee.id).where(Employee.manager_id == employee.id)
|
||
|
|
conditions.append(and_(pending_leader_approval, ExpenseClaim.employee_id.in_(subordinate_ids)))
|
||
|
|
|
||
|
|
if manager_name:
|
||
|
|
managed_department_ids = select(OrganizationUnit.id).where(OrganizationUnit.manager_name == manager_name)
|
||
|
|
managed_department_names = select(OrganizationUnit.name).where(OrganizationUnit.manager_name == manager_name)
|
||
|
|
conditions.append(and_(pending_leader_approval, ExpenseClaim.department_id.in_(managed_department_ids)))
|
||
|
|
conditions.append(and_(pending_leader_approval, ExpenseClaim.department_name.in_(managed_department_names)))
|
||
|
|
|
||
|
|
return conditions
|
||
|
|
|
||
|
|
def apply_approval_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any:
|
||
|
|
role_codes = self.normalize_role_codes(current_user)
|
||
|
|
if current_user.is_admin or "executive" in role_codes:
|
||
|
|
return stmt.where(ExpenseClaim.status == "submitted")
|
||
|
|
if "finance" in role_codes:
|
||
|
|
return stmt.where(
|
||
|
|
ExpenseClaim.status == "submitted",
|
||
|
|
ExpenseClaim.approval_stage == "财务审批",
|
||
|
|
)
|
||
|
|
|
||
|
|
conditions = self.build_approval_claim_conditions(current_user)
|
||
|
|
if not conditions:
|
||
|
|
return stmt.where(ExpenseClaim.id == "__no_visible_claim__")
|
||
|
|
|
||
|
|
return stmt.where(or_(*conditions))
|
||
|
|
|
||
|
|
def apply_claim_scope(
|
||
|
|
self,
|
||
|
|
stmt: Any,
|
||
|
|
current_user: CurrentUserContext,
|
||
|
|
*,
|
||
|
|
include_approval_scope: bool = False,
|
||
|
|
) -> Any:
|
||
|
|
if self.has_privileged_claim_access(current_user):
|
||
|
|
return stmt
|
||
|
|
|
||
|
|
conditions = self.build_personal_claim_conditions(current_user)
|
||
|
|
|
||
|
|
if not conditions:
|
||
|
|
return stmt.where(ExpenseClaim.id == "__no_visible_claim__")
|
||
|
|
|
||
|
|
if include_approval_scope:
|
||
|
|
conditions.extend(self.build_approval_claim_conditions(current_user))
|
||
|
|
|
||
|
|
return stmt.where(or_(*conditions))
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def resolve_claim_manager_name(claim: ExpenseClaim) -> str:
|
||
|
|
if claim.employee is not None:
|
||
|
|
if claim.employee.manager is not None and claim.employee.manager.name:
|
||
|
|
return str(claim.employee.manager.name).strip()
|
||
|
|
if claim.employee.organization_unit is not None and claim.employee.organization_unit.manager_name:
|
||
|
|
return str(claim.employee.organization_unit.manager_name).strip()
|
||
|
|
return ""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def is_missing_value(value: Any) -> bool:
|
||
|
|
normalized = str(value or "").strip()
|
||
|
|
return not normalized or normalized in {"待补充", "待确认", "N/A", "n/a", "无"}
|