#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sys import uuid from collections import defaultdict from dataclasses import asdict, dataclass from datetime import UTC, date, datetime from decimal import Decimal from pathlib import Path from typing import Any from sqlalchemy import select from sqlalchemy.orm import selectinload SERVER_DIR = Path(__file__).resolve().parents[1] SRC_DIR = SERVER_DIR / "src" if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) from app.db.session import get_session_factory # noqa: E402 from app.models.budget import BudgetAllocation, BudgetReservation, BudgetTransaction # noqa: E402 from app.models.employee import Employee # noqa: E402 from app.models.financial_record import ExpenseClaim # noqa: E402 from app.models.organization import OrganizationUnit # noqa: E402 from app.services.demo_company_simulation_catalog import ( # noqa: E402 BUDGETED_STATUSES, PENDING_STATUSES, SIM_BUDGET_PREFIX, SIM_EMPLOYEE_PREFIX, SIM_PROJECT_CODE, SIM_RESERVATION_PREFIX, SIM_TRANSACTION_PREFIX, SUBJECT_LABELS, SUCCESS_STATUSES, target_budget_usage, ) from app.services.demo_company_simulation_filters import is_admin_employee_like # noqa: E402 from app.services.employee_behavior_profile_service import ( # noqa: E402 EmployeeBehaviorProfileService, ) from app.services.expense_claim_status_registry import ( # noqa: E402 normalize_expense_claim_state, ) DEPARTMENT_PLAN = ( ("TECH-DEPT", Decimal("0.30")), ("MARKET-DEPT", Decimal("0.24")), ("PRODUCTION-DEPT", Decimal("0.18")), ("FINANCE-DEPT", Decimal("0.12")), ("HR-DEPT", Decimal("0.10")), ("PRESIDENT-OFFICE", Decimal("0.06")), ) RECENT_PENDING_PER_DEPARTMENT = 3 RECENT_DATES = ( datetime(2026, 6, 1, 10, 0, tzinfo=UTC), datetime(2026, 6, 1, 15, 0, tzinfo=UTC), datetime(2026, 6, 2, 6, 0, tzinfo=UTC), ) PERIOD_START = date(2026, 1, 1) PERIOD_END = date(2026, 6, 2) @dataclass(frozen=True, slots=True) class RepairSummary: mode: str sim_employees: int sim_claims: int employee_department_plan: dict[str, int] claim_department_plan: dict[str, int] recent_pending_plan: dict[str, int] rebuilt_budget_allocations: int rebuilt_budget_transactions: int rebuilt_budget_reservations: int before_all_department_amounts: dict[str, str] before_recent_pending_amounts: dict[str, str] after_all_department_amounts: dict[str, str] after_recent_pending_amounts: dict[str, str] def to_dict(self) -> dict[str, Any]: return asdict(self) def main() -> None: parser = argparse.ArgumentParser( description="Repair simulated half-year demo data distribution." ) parser.add_argument("--apply", action="store_true", help="Apply repair. Default is dry-run.") parser.add_argument( "--refresh-profiles", action="store_true", help="After --apply, refresh employee behavior profile snapshots for simulated employees.", ) parser.add_argument("--profile-limit", type=int, default=120) args = parser.parse_args() session_factory = get_session_factory() with session_factory() as db: try: summary = repair_distribution(db, apply=args.apply) profile_refresh = None if args.apply and args.refresh_profiles: profile_refresh = _refresh_company_profiles(db, limit=args.profile_limit) if args.apply: db.commit() payload = summary.to_dict() if profile_refresh is not None: payload["profile_refresh"] = profile_refresh print(json.dumps(payload, ensure_ascii=False, indent=2)) if not args.apply: print("dry-run only; pass --apply after confirmation to repair simulated data.") elif not args.refresh_profiles: print("pass --refresh-profiles to generate employee behavior profile snapshots.") except Exception: db.rollback() raise def repair_distribution(db, *, apply: bool) -> RepairSummary: departments = _canonical_departments(db) if len(departments) < len(DEPARTMENT_PLAN): missing = [code for code, _ in DEPARTMENT_PLAN if code not in departments] raise RuntimeError(f"missing canonical departments: {missing}") sim_employees = _sim_employees(db) sim_claims = _sim_claims(db) before_all = _department_amounts(sim_claims) before_recent = _recent_pending_amounts(sim_claims) employee_plan = _counts_by_weight(len(sim_employees)) claim_plan = _counts_by_weight(len(sim_claims)) recent_claims = _recent_claims(sim_claims) fixed_recent_plan = {code: RECENT_PENDING_PER_DEPARTMENT for code, _ in DEPARTMENT_PLAN} regular_plan = { code: max(claim_plan.get(code, 0) - fixed_recent_plan.get(code, 0), 0) for code, _ in DEPARTMENT_PLAN } if apply: _normalize_sim_claim_workflow(sim_claims) _clamp_sim_claim_dates(sim_claims) _redistribute_employees(sim_employees, departments, employee_plan) db.flush() employees_by_dept = _employees_by_department(db) _redistribute_regular_claims( [claim for claim in sim_claims if claim not in set(recent_claims)], departments, employees_by_dept, regular_plan, ) _repair_recent_pending_claims(recent_claims, departments, employees_by_dept) db.flush() _rebuild_sim_budget(db, sim_claims, departments) db.flush() after_claims = ( _sim_claims(db) if apply else _preview_claims(sim_claims, departments, claim_plan) ) after_all = _department_amounts(after_claims) after_recent = _recent_pending_amounts(after_claims) allocation_count, transaction_count, reservation_count = _planned_budget_counts(after_claims) return RepairSummary( mode="apply" if apply else "dry-run", sim_employees=len(sim_employees), sim_claims=len(sim_claims), employee_department_plan=employee_plan, claim_department_plan=claim_plan, recent_pending_plan=fixed_recent_plan, rebuilt_budget_allocations=allocation_count, rebuilt_budget_transactions=transaction_count, rebuilt_budget_reservations=reservation_count, before_all_department_amounts=before_all, before_recent_pending_amounts=before_recent, after_all_department_amounts=after_all, after_recent_pending_amounts=after_recent, ) def _refresh_company_profiles(db, *, limit: int) -> dict[str, object]: capped_limit = max(1, min(int(limit or 120), 500)) employees = list( db.scalars(select(Employee).order_by(Employee.employee_no.asc())).all() ) employee_ids = [ employee.id for employee in employees if not is_admin_employee_like(employee) ][:capped_limit] service = EmployeeBehaviorProfileService(db) snapshot_count = 0 for employee_id in employee_ids: snapshots = service.refresh_employee_profiles( employee_id=employee_id, window_days=(30, 90, 180), expense_type_scope="overall", source_task_type="half_year_expense_demo_repair", commit=False, ) snapshot_count += len(snapshots) db.commit() return { "target_employee_count": len(employee_ids), "snapshot_count": snapshot_count, "window_days": [30, 90, 180], "source_task_type": "half_year_expense_demo_repair", "scope": "all_non_admin_employees", } def _canonical_departments(db) -> dict[str, OrganizationUnit]: department_codes = [code for code, _weight in DEPARTMENT_PLAN] rows = db.scalars( select(OrganizationUnit).where(OrganizationUnit.unit_code.in_(department_codes)) ).all() return {row.unit_code: row for row in rows} def _sim_employees(db) -> list[Employee]: return list( db.scalars( select(Employee) .options(selectinload(Employee.organization_unit)) .where(Employee.employee_no.like(f"{SIM_EMPLOYEE_PREFIX}%")) .order_by(Employee.employee_no.asc()) ).all() ) def _sim_claims(db) -> list[ExpenseClaim]: return list( db.scalars( select(ExpenseClaim) .options(selectinload(ExpenseClaim.items)) .where(ExpenseClaim.project_code == SIM_PROJECT_CODE) .order_by(ExpenseClaim.created_at.asc(), ExpenseClaim.claim_no.asc()) ).all() ) def _normalize_sim_claim_workflow(claims: list[ExpenseClaim]) -> None: for claim in claims: normalized = normalize_expense_claim_state( claim.status, claim.approval_stage, claim_no=claim.claim_no, expense_type=claim.expense_type, is_application_claim=False, ) claim.status = normalized.status claim.approval_stage = normalized.approval_stage def _clamp_sim_claim_dates(claims: list[ExpenseClaim]) -> None: for index, claim in enumerate(claims): occurred_at = claim.occurred_at or claim.submitted_at if occurred_at is None: continue if PERIOD_START <= occurred_at.date() <= PERIOD_END: continue anchor = RECENT_DATES[index % len(RECENT_DATES)] claim.occurred_at = anchor - _hours(2) if claim.submitted_at is not None or claim.status != "draft": claim.submitted_at = anchor claim.created_at = claim.occurred_at claim.updated_at = anchor + _hours(1) for item in claim.items or []: item.item_date = claim.occurred_at.date() def _counts_by_weight(total: int) -> dict[str, int]: raw = [(code, total * weight) for code, weight in DEPARTMENT_PLAN] counts = {code: int(value) for code, value in raw} remainder = total - sum(counts.values()) remainder_order = sorted( raw, key=lambda item: item[1] - int(item[1]), reverse=True, ) for code, _value in remainder_order[:remainder]: counts[code] += 1 return counts def _redistribute_employees( employees: list[Employee], departments: dict[str, OrganizationUnit], plan: dict[str, int], ) -> None: index = 0 for code, _weight in DEPARTMENT_PLAN: department = departments[code] for employee in employees[index : index + plan.get(code, 0)]: employee.organization_unit = department employee.cost_center = department.cost_center employee.location = department.location employee.finance_owner_name = f"{department.name}财务BP" index += plan.get(code, 0) def _employees_by_department(db) -> dict[str, list[Employee]]: rows = db.scalars( select(Employee) .options(selectinload(Employee.organization_unit)) .where(Employee.organization_unit_id.is_not(None)) .order_by(Employee.employee_no.asc()) ).all() grouped: dict[str, list[Employee]] = defaultdict(list) for employee in rows: unit = employee.organization_unit if unit is not None and unit.unit_code: grouped[unit.unit_code].append(employee) return grouped def _redistribute_regular_claims( claims: list[ExpenseClaim], departments: dict[str, OrganizationUnit], employees_by_dept: dict[str, list[Employee]], plan: dict[str, int], ) -> None: index = 0 for code, _weight in DEPARTMENT_PLAN: department = departments[code] employees = employees_by_dept.get(code) or [] for offset, claim in enumerate(claims[index : index + plan.get(code, 0)]): employee = employees[offset % len(employees)] if employees else None _assign_claim_department(claim, department, employee) index += plan.get(code, 0) def _repair_recent_pending_claims( claims: list[ExpenseClaim], departments: dict[str, OrganizationUnit], employees_by_dept: dict[str, list[Employee]], ) -> None: index = 0 for code, _weight in DEPARTMENT_PLAN: department = departments[code] employees = employees_by_dept.get(code) or [] for offset in range(RECENT_PENDING_PER_DEPARTMENT): claim = claims[index] employee = employees[offset % len(employees)] if employees else None _assign_claim_department(claim, department, employee) claim.status = "submitted" claim.approval_stage = "财务审批" if offset % 2 == 0 else "直属领导审批" claim.occurred_at = RECENT_DATES[offset] - _hours(2) claim.submitted_at = RECENT_DATES[offset] claim.updated_at = RECENT_DATES[offset] + _hours(1) index += 1 def _assign_claim_department( claim: ExpenseClaim, department: OrganizationUnit, employee: Employee | None, ) -> None: claim.department_id = department.id claim.department_name = department.name if employee is not None: claim.employee_id = employee.id claim.employee_name = employee.name claim.location = department.location or claim.location def _rebuild_sim_budget( db, claims: list[ExpenseClaim], departments: dict[str, OrganizationUnit], ) -> None: for model, field, prefix in ( (BudgetTransaction, BudgetTransaction.transaction_no, SIM_TRANSACTION_PREFIX), (BudgetReservation, BudgetReservation.reservation_no, SIM_RESERVATION_PREFIX), (BudgetAllocation, BudgetAllocation.budget_no, SIM_BUDGET_PREFIX), ): for row in db.scalars(select(model).where(field.like(f"{prefix}%"))).all(): db.delete(row) db.flush() groups: dict[tuple[int, str, str, str, str], list[ExpenseClaim]] = defaultdict(list) for claim in claims: if claim.status not in BUDGETED_STATUSES: continue subject_code = "meal" if claim.expense_type == "entertainment" else claim.expense_type quarter = ((claim.occurred_at.month - 1) // 3) + 1 period_key = f"{claim.occurred_at.year}Q{quarter}" cost_center = _claim_cost_center(claim, departments) key = (claim.occurred_at.year, period_key, claim.department_id, cost_center, subject_code) groups[key].append(claim) allocation_index = 1 transaction_index = 1 for key, group_claims in sorted(groups.items()): year, period_key, department_id, cost_center, subject_code = key total_used = sum((Decimal(claim.amount or 0) for claim in group_claims), Decimal("0.00")) original_amount = ( total_used / target_budget_usage(period_key, subject_code, allocation_index) ).quantize(Decimal("0.01")) allocation = BudgetAllocation( id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"repair:{SIM_BUDGET_PREFIX}:{key}")), budget_no=f"{SIM_BUDGET_PREFIX}-R{allocation_index:04d}", fiscal_year=year, period_type="quarter", period_key=period_key, department_id=department_id, department_name=group_claims[0].department_name, cost_center=cost_center, project_code=SIM_PROJECT_CODE, subject_code=subject_code, subject_name=SUBJECT_LABELS.get(subject_code, subject_code), original_amount=max(original_amount, Decimal("3000.00")), adjusted_amount=Decimal("0.00"), status="active", warning_threshold=Decimal("80.00"), control_action="warn", description="半年报销模拟数据部门分布修复预算池", created_by="simulation", updated_by="simulation", ) db.add(allocation) db.flush() for claim in group_claims: db.add(_budget_transaction(allocation.id, claim, transaction_index)) if claim.status in PENDING_STATUSES: db.add(_budget_reservation(allocation.id, claim, transaction_index)) transaction_index += 1 allocation_index += 1 def _budget_transaction(allocation_id: str, claim: ExpenseClaim, index: int) -> BudgetTransaction: transaction_no = f"{SIM_TRANSACTION_PREFIX}-R{index:04d}" transaction_type = "consume" if claim.status in SUCCESS_STATUSES else "reserve" return BudgetTransaction( id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"repair:{transaction_no}")), transaction_no=transaction_no, allocation_id=allocation_id, source_type="claim", source_id=claim.id, source_no=claim.claim_no, transaction_type=transaction_type, amount=Decimal(claim.amount or 0), before_available_amount=Decimal("0.00"), after_available_amount=Decimal("0.00"), operator="simulation", reason="修复后模拟数据预算台账", context_json={"project_code": SIM_PROJECT_CODE, "simulated": True, "repair": True}, created_at=claim.submitted_at or claim.occurred_at, ) def _budget_reservation(allocation_id: str, claim: ExpenseClaim, index: int) -> BudgetReservation: reservation_no = f"{SIM_RESERVATION_PREFIX}-R{index:04d}" return BudgetReservation( id=str(uuid.uuid5(uuid.NAMESPACE_DNS, f"repair:{reservation_no}")), reservation_no=reservation_no, allocation_id=allocation_id, source_type="claim", source_id=claim.id, source_no=claim.claim_no, source_status="active", amount=Decimal(claim.amount or 0), context_json={"project_code": SIM_PROJECT_CODE, "simulated": True, "repair": True}, created_at=claim.submitted_at or claim.occurred_at, ) def _recent_claims(claims: list[ExpenseClaim]) -> list[ExpenseClaim]: needed = RECENT_PENDING_PER_DEPARTMENT * len(DEPARTMENT_PLAN) return sorted(claims, key=lambda claim: Decimal(claim.amount or 0), reverse=True)[:needed] def _department_amounts(claims: list[ExpenseClaim]) -> dict[str, str]: buckets: dict[str, Decimal] = defaultdict(Decimal) for claim in claims: buckets[claim.department_name or "待补充"] += Decimal(claim.amount or 0) return _format_amounts(buckets) def _recent_pending_amounts(claims: list[ExpenseClaim]) -> dict[str, str]: buckets: dict[str, Decimal] = defaultdict(Decimal) for claim in claims: if claim.status not in PENDING_STATUSES: continue submitted_at = claim.submitted_at or claim.occurred_at if not submitted_at: continue day = submitted_at.date() if date(2026, 6, 1) <= day <= date(2026, 6, 2): buckets[claim.department_name or "待补充"] += Decimal(claim.amount or 0) return _format_amounts(buckets) def _preview_claims( claims: list[ExpenseClaim], departments: dict[str, OrganizationUnit], claim_plan: dict[str, int], ) -> list[ExpenseClaim]: preview: list[ExpenseClaim] = [] recent_claims = _recent_claims(claims) recent_claim_set = set(recent_claims) regular_claims = [claim for claim in claims if claim not in recent_claim_set] index = 0 for code, _weight in DEPARTMENT_PLAN: department = departments[code] count = max(claim_plan.get(code, 0) - RECENT_PENDING_PER_DEPARTMENT, 0) for claim in regular_claims[index : index + count]: preview.append(_clone_claim(claim, department.name, claim.status, claim.submitted_at)) index += count recent_index = 0 for code, _weight in DEPARTMENT_PLAN: department = departments[code] for offset in range(RECENT_PENDING_PER_DEPARTMENT): preview.append( _clone_claim( recent_claims[recent_index], department.name, "submitted", RECENT_DATES[offset], ) ) recent_index += 1 return preview def _clone_claim( claim: ExpenseClaim, department_name: str, status: str, submitted_at: datetime | None, ) -> Any: return type( "ClaimPreview", (), { "department_name": department_name, "status": status, "submitted_at": submitted_at, "occurred_at": claim.occurred_at, "expense_type": claim.expense_type, "amount": claim.amount, }, )() def _planned_budget_counts(claims: list[Any]) -> tuple[int, int, int]: allocation_keys = set() transaction_count = 0 reservation_count = 0 for claim in claims: if claim.status not in BUDGETED_STATUSES: continue submitted_at = claim.submitted_at or claim.occurred_at period_key = f"{submitted_at.year}Q{((submitted_at.month - 1) // 3) + 1}" allocation_keys.add((period_key, claim.department_name, getattr(claim, "expense_type", ""))) transaction_count += 1 reservation_count += int(claim.status in PENDING_STATUSES) return len(allocation_keys), transaction_count, reservation_count def _claim_cost_center( claim: ExpenseClaim, departments: dict[str, OrganizationUnit], ) -> str | None: for department in departments.values(): if department.id == claim.department_id: return department.cost_center return None def _format_amounts(buckets: dict[str, Decimal]) -> dict[str, str]: return { key: str(value.quantize(Decimal("0.01"))) for key, value in sorted(buckets.items(), key=lambda item: item[1], reverse=True) } def _hours(value: int): from datetime import timedelta return timedelta(hours=value) if __name__ == "__main__": main()