#!/usr/bin/env python3 from __future__ import annotations import argparse import json import re import sys import uuid from dataclasses import asdict, dataclass from pathlib import Path from typing import Any from sqlalchemy import select from sqlalchemy.orm import selectinload SERVER_DIR = Path(__file__).resolve().parents[1] SRC_DIR = SERVER_DIR / "src" if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) from app.db.session import get_session_factory # noqa: E402 from app.models.budget import BudgetReservation, BudgetTransaction # noqa: E402 from app.models.financial_record import ExpenseClaim # noqa: E402 from app.models.risk_observation import RiskObservation # noqa: E402 from app.services.demo_company_simulation_catalog import ( # noqa: E402 SIM_CLAIM_ID_NAMESPACE, SIM_PROJECT_CODE, build_simulation_reimbursement_no, ) from app.services.expense_claim_attachment_storage import ( # noqa: E402 ExpenseClaimAttachmentStorage, ) LEGACY_CLAIM_PATTERN = re.compile(r"^SIM-EXP-2026-(\d+)$", flags=re.IGNORECASE) @dataclass(frozen=True, slots=True) class RenameSummary: mode: str legacy_claims: int renamed_claims: int budget_transactions_updated: int budget_reservations_updated: int risk_observations_updated: int attachment_files_updated: int attachment_items_updated: int residual_attachment_texts_updated: int samples: list[dict[str, str]] def to_dict(self) -> dict[str, Any]: return asdict(self) def main() -> None: parser = argparse.ArgumentParser( description="Rename legacy half-year demo claim numbers to canonical RE numbers." ) parser.add_argument("--apply", action="store_true", help="write changes to the database") parser.add_argument("--sample-limit", type=int, default=12) args = parser.parse_args() session_factory = get_session_factory() with session_factory() as db: summary = rename_demo_claim_numbers( db, apply=args.apply, sample_limit=max(args.sample_limit, 0), ) if args.apply: db.commit() else: db.rollback() print(json.dumps(summary.to_dict(), ensure_ascii=False, indent=2)) def rename_demo_claim_numbers(db, *, apply: bool, sample_limit: int) -> RenameSummary: claims = _legacy_demo_claims(db) rename_map = _build_rename_map(db, claims) storage = ExpenseClaimAttachmentStorage() transaction_updates = 0 reservation_updates = 0 risk_updates = 0 attachment_file_updates = 0 attachment_item_updates = 0 samples: list[dict[str, str]] = [] for claim in claims: old_no = str(claim.claim_no or "").strip() new_no = rename_map.get(old_no) if not new_no: continue if len(samples) < sample_limit: samples.append({"old": old_no, "new": new_no}) transaction_updates += _update_budget_transactions(db, old_no, new_no, apply=apply) reservation_updates += _update_budget_reservations(db, old_no, new_no, apply=apply) risk_updates += _update_risk_observations(db, claim, old_no, new_no, apply=apply) file_count, item_count = _update_attachments( storage, claim, old_no, new_no, apply=apply, ) attachment_file_updates += file_count attachment_item_updates += item_count if apply: claim.claim_no = new_no residual_text_updates = _repair_residual_attachment_texts( storage, _demo_claims(db), apply=apply, ) return RenameSummary( mode="apply" if apply else "dry-run", legacy_claims=len(claims), renamed_claims=len(rename_map), budget_transactions_updated=transaction_updates, budget_reservations_updated=reservation_updates, risk_observations_updated=risk_updates, attachment_files_updated=attachment_file_updates, attachment_items_updated=attachment_item_updates, residual_attachment_texts_updated=residual_text_updates, samples=samples, ) def _legacy_demo_claims(db) -> list[ExpenseClaim]: return list( db.scalars( select(ExpenseClaim) .options(selectinload(ExpenseClaim.items)) .where(ExpenseClaim.project_code == SIM_PROJECT_CODE) .where(ExpenseClaim.claim_no.like("SIM-EXP-2026-%")) .order_by(ExpenseClaim.created_at.asc(), ExpenseClaim.claim_no.asc()) ).all() ) def _demo_claims(db) -> list[ExpenseClaim]: return list( db.scalars( select(ExpenseClaim) .options(selectinload(ExpenseClaim.items)) .where(ExpenseClaim.project_code == SIM_PROJECT_CODE) .order_by(ExpenseClaim.created_at.asc(), ExpenseClaim.claim_no.asc()) ).all() ) def _build_rename_map(db, claims: list[ExpenseClaim]) -> dict[str, str]: legacy_numbers = {str(claim.claim_no or "").strip() for claim in claims} existing_numbers = set(db.scalars(select(ExpenseClaim.claim_no)).all()) - legacy_numbers rename_map: dict[str, str] = {} for fallback_index, claim in enumerate(claims, start=1): old_no = str(claim.claim_no or "").strip() sequence = _legacy_sequence(old_no) or fallback_index timestamp = claim.occurred_at or claim.created_at or claim.submitted_at new_no = build_simulation_reimbursement_no(timestamp, sequence) if new_no in existing_numbers: raise RuntimeError(f"canonical claim number already exists: {new_no}") existing_numbers.add(new_no) rename_map[old_no] = new_no return rename_map def _legacy_sequence(claim_no: str) -> int | None: match = LEGACY_CLAIM_PATTERN.match(claim_no) if not match: return None return int(match.group(1)) def _update_budget_transactions(db, old_no: str, new_no: str, *, apply: bool) -> int: rows = list( db.scalars( select(BudgetTransaction).where(BudgetTransaction.source_no == old_no) ).all() ) if apply: for row in rows: row.source_no = new_no return len(rows) def _update_budget_reservations(db, old_no: str, new_no: str, *, apply: bool) -> int: rows = list( db.scalars( select(BudgetReservation).where(BudgetReservation.source_no == old_no) ).all() ) if apply: for row in rows: row.source_no = new_no return len(rows) def _update_risk_observations( db, claim: ExpenseClaim, old_no: str, new_no: str, *, apply: bool, ) -> int: rows = list( db.scalars( select(RiskObservation).where( (RiskObservation.claim_id == claim.id) | (RiskObservation.claim_no == old_no) | (RiskObservation.subject_key == old_no) ) ).all() ) if apply: for row in rows: row.claim_no = new_no if row.claim_no == old_no else row.claim_no row.subject_key = new_no if row.subject_key == old_no else row.subject_key row.subject_label = new_no if row.subject_label == old_no else row.subject_label row.evidence_json = _replace_value(row.evidence_json, old_no, new_no) row.ontology_json = _replace_value(row.ontology_json, old_no, new_no) row.decision_trace_json = _replace_value(row.decision_trace_json, old_no, new_no) return len(rows) def _update_attachments( storage: ExpenseClaimAttachmentStorage, claim: ExpenseClaim, old_no: str, new_no: str, *, apply: bool, ) -> tuple[int, int]: file_updates = 0 item_updates = 0 for item in list(claim.items or []): invoice_id = str(item.invoice_id or "").strip() if old_no not in invoice_id: continue new_invoice_id = invoice_id.replace(old_no, new_no) item_updates += 1 if not apply: file_updates += 1 continue file_path = storage.resolve_item_path(item) if file_path is not None and file_path.exists(): file_updates += 1 meta_payload = _replace_value(storage.read_meta(file_path), old_no, new_no) new_file_path = file_path.with_name(file_path.name.replace(old_no, new_no)) meta_path = storage.meta_path(file_path) new_meta_path = storage.meta_path(new_file_path) file_path.rename(new_file_path) if meta_path.exists(): meta_path.rename(new_meta_path) storage.write_meta(new_file_path, meta_payload) item.invoice_id = new_invoice_id return file_updates, item_updates def _repair_residual_attachment_texts( storage: ExpenseClaimAttachmentStorage, claims: list[ExpenseClaim], *, apply: bool, ) -> int: sequence_by_claim_id = _simulation_sequence_by_claim_id(max(3000, len(claims) + 500)) updated = 0 for claim in claims: sequence = sequence_by_claim_id.get(str(claim.id)) if sequence is None: continue old_no = f"SIM-EXP-2026-{sequence:04d}" new_no = str(claim.claim_no or "").strip() if not old_no or not new_no or old_no == new_no: continue for item in list(claim.items or []): file_path = storage.resolve_item_path(item) if file_path is None or not file_path.exists(): continue if _replace_file_text(file_path, old_no, new_no, apply=apply): updated += 1 if _replace_meta_text(storage, file_path, old_no, new_no, apply=apply): updated += 1 return updated def _simulation_sequence_by_claim_id(limit: int) -> dict[str, int]: return { str( uuid.uuid5( uuid.NAMESPACE_DNS, f"x-financial:{SIM_CLAIM_ID_NAMESPACE}:{sequence}", ) ): sequence for sequence in range(1, limit + 1) } def _replace_file_text(file_path: Path, old_no: str, new_no: str, *, apply: bool) -> bool: try: content = file_path.read_text(encoding="utf-8") except UnicodeDecodeError: return False if old_no not in content: return False if apply: file_path.write_text(content.replace(old_no, new_no), encoding="utf-8") return True def _replace_meta_text( storage: ExpenseClaimAttachmentStorage, file_path: Path, old_no: str, new_no: str, *, apply: bool, ) -> bool: payload = storage.read_meta(file_path) if not payload: return False replaced = _replace_value(payload, old_no, new_no) if replaced == payload: return False if apply: storage.write_meta(file_path, replaced) return True def _replace_value(value: Any, old_no: str, new_no: str) -> Any: if isinstance(value, str): return value.replace(old_no, new_no) if isinstance(value, list): return [_replace_value(item, old_no, new_no) for item in value] if isinstance(value, dict): return {key: _replace_value(item, old_no, new_no) for key, item in value.items()} return value if __name__ == "__main__": main()