100 lines
3.7 KiB
Python
100 lines
3.7 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
from typing import Iterable
|
||
|
|
|
||
|
|
from app.api.deps import CurrentUserContext
|
||
|
|
from app.db.session import get_session_factory
|
||
|
|
from app.models.employee import Employee
|
||
|
|
from app.models.financial_record import ExpenseClaim
|
||
|
|
from app.services.expense_claim_workflow_constants import BUDGET_MANAGER_APPROVAL_STAGE
|
||
|
|
from app.services.expense_claims import ExpenseClaimService
|
||
|
|
|
||
|
|
|
||
|
|
def _role_codes(employee: Employee) -> list[str]:
|
||
|
|
return [
|
||
|
|
str(role.role_code or "").strip().lower()
|
||
|
|
for role in list(employee.roles or [])
|
||
|
|
if str(role.role_code or "").strip()
|
||
|
|
]
|
||
|
|
|
||
|
|
|
||
|
|
def _is_direct_manager(employee: Employee | None, budget_manager: Employee | None) -> bool:
|
||
|
|
if employee is None or budget_manager is None:
|
||
|
|
return False
|
||
|
|
if employee.manager_id and employee.manager_id == budget_manager.id:
|
||
|
|
return True
|
||
|
|
return employee.manager is not None and employee.manager.id == budget_manager.id
|
||
|
|
|
||
|
|
|
||
|
|
def _budget_manager_context(budget_manager: Employee) -> CurrentUserContext:
    """Build the acting-user context used to approve a claim as ``budget_manager``.

    Every text field is normalised the same way: a missing (falsy) value
    becomes an empty string and surrounding whitespace is stripped.

    Args:
        budget_manager: Employee record the context is populated from.

    Returns:
        A ``CurrentUserContext`` with ``is_admin=False``. ``username`` is the
        first non-empty of email, employee number and name; ``role_codes``
        comes from ``_role_codes``.
    """
    unit = budget_manager.organization_unit
    # Guard the unit name with ``or ""`` like every other field; without it a
    # unit whose name is None would be rendered as the literal string "None".
    department_name = str((unit.name if unit is not None else "") or "").strip()

    return CurrentUserContext(
        username=str(budget_manager.email or budget_manager.employee_no or budget_manager.name or "").strip(),
        name=str(budget_manager.name or "").strip(),
        role_codes=_role_codes(budget_manager),
        is_admin=False,
        department_name=department_name,
        grade=str(budget_manager.grade or "").strip(),
        employee_no=str(budget_manager.employee_no or "").strip(),
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _iter_candidates(service: ExpenseClaimService) -> Iterable[tuple[ExpenseClaim, Employee]]:
    """Yield ``(claim, budget_manager)`` pairs that qualify for the repair.

    Starts from every claim with status ``"submitted"`` whose approval stage
    is ``BUDGET_MANAGER_APPROVAL_STAGE`` and keeps only those that:

    * the service classifies as an expense application claim,
    * have a resolvable department budget manager, and
    * have that budget manager as the claimant's direct manager.
    """
    stuck_query = service.db.query(ExpenseClaim).filter(
        ExpenseClaim.status == "submitted",
        ExpenseClaim.approval_stage == BUDGET_MANAGER_APPROVAL_STAGE,
    )
    for pending in stuck_query.all():
        if service._is_expense_application_claim(pending):
            approver = service._access_policy.resolve_department_budget_manager(pending)
            # Check for None first so the claimant is only loaded when an
            # approver was actually resolved.
            if approver is not None and _is_direct_manager(pending.employee, approver):
                yield pending, approver
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """Command-line entry point for the stuck-claim repair.

    Prints the number of candidate claims followed by one line per
    candidate. With ``--apply`` each candidate is also passed to
    ``ExpenseClaimService.approve_claim`` using a context built from its
    budget manager, and the resulting claim number, status and stage are
    printed. Without the flag nothing beyond the listing happens.
    """
    cli = argparse.ArgumentParser(
        description="Repair application claims stuck at budget approval when the direct manager is the budget approver.",
    )
    cli.add_argument("--apply", action="store_true", help="Apply the repair. Without it, only prints candidates.")
    options = cli.parse_args()

    open_session = get_session_factory()
    with open_session() as session:
        claim_service = ExpenseClaimService(session)
        # Materialise the candidates up front so the total can be reported first.
        pending = list(_iter_candidates(claim_service))
        print(f"candidates={len(pending)}")

        for stuck_claim, approver in pending:
            print(
                f"candidate claim_no={stuck_claim.claim_no} claim_id={stuck_claim.id} "
                f"employee={stuck_claim.employee_name} budget_manager={approver.name}"
            )
            if options.apply:
                outcome = claim_service.approve_claim(
                    stuck_claim.id,
                    _budget_manager_context(approver),
                    opinion="历史流程修复:直属领导与预算审批人为同一人,合并预算审批。",
                )
                if outcome is None:
                    # No claim returned: report the original number with blank status/stage.
                    claim_no, status, stage = stuck_claim.claim_no, "", ""
                else:
                    claim_no, status, stage = outcome.claim_no, outcome.status, outcome.approval_stage
                print(f"repaired claim_no={claim_no} status={status} stage={stage}")
|
||
|
|
|
||
|
|
|
||
|
|
# Run the repair only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|