348 lines
11 KiB
Python
348 lines
11 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
import uuid
|
||
|
|
from dataclasses import asdict, dataclass
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from sqlalchemy import select
|
||
|
|
from sqlalchemy.orm import selectinload
|
||
|
|
|
||
|
|
# Locate the server root (the parent of this script's directory) and put its
# ``src`` tree at the front of ``sys.path`` so the ``app.*`` packages can be
# imported when this file is run directly as a script.
SERVER_DIR = Path(__file__).resolve().parents[1]
SRC_DIR = SERVER_DIR.joinpath("src")
if str(SRC_DIR) not in sys.path:
    # Prepend rather than append so this checkout's sources are found first.
    sys.path.insert(0, str(SRC_DIR))
|
||
|
|
|
||
|
|
from app.db.session import get_session_factory # noqa: E402
|
||
|
|
from app.models.budget import BudgetReservation, BudgetTransaction # noqa: E402
|
||
|
|
from app.models.financial_record import ExpenseClaim # noqa: E402
|
||
|
|
from app.models.risk_observation import RiskObservation # noqa: E402
|
||
|
|
from app.services.demo_company_simulation_catalog import ( # noqa: E402
|
||
|
|
SIM_CLAIM_ID_NAMESPACE,
|
||
|
|
SIM_PROJECT_CODE,
|
||
|
|
build_simulation_reimbursement_no,
|
||
|
|
)
|
||
|
|
from app.services.expense_claim_attachment_storage import ( # noqa: E402
|
||
|
|
ExpenseClaimAttachmentStorage,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Legacy demo claim numbers have the form ``SIM-EXP-2026-<digits>`` (matched
# case-insensitively); group 1 captures the digit run.
LEGACY_CLAIM_PATTERN = re.compile(r"^SIM-EXP-2026-(\d+)$", re.IGNORECASE)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True, slots=True)
|
||
|
|
class RenameSummary:
|
||
|
|
mode: str
|
||
|
|
legacy_claims: int
|
||
|
|
renamed_claims: int
|
||
|
|
budget_transactions_updated: int
|
||
|
|
budget_reservations_updated: int
|
||
|
|
risk_observations_updated: int
|
||
|
|
attachment_files_updated: int
|
||
|
|
attachment_items_updated: int
|
||
|
|
residual_attachment_texts_updated: int
|
||
|
|
samples: list[dict[str, str]]
|
||
|
|
|
||
|
|
def to_dict(self) -> dict[str, Any]:
|
||
|
|
return asdict(self)
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """Parse the CLI flags, run the rename in one session, and print a JSON summary.

    With ``--apply`` the session is committed; without it the session is
    rolled back so no database change is persisted. ``--sample-limit`` caps
    how many old/new pairs appear in the printed summary.
    """
    cli = argparse.ArgumentParser(
        description="Rename legacy half-year demo claim numbers to canonical RE numbers."
    )
    cli.add_argument("--apply", action="store_true", help="write changes to the database")
    cli.add_argument("--sample-limit", type=int, default=12)
    options = cli.parse_args()

    # A negative limit is clamped to zero, i.e. no samples are collected.
    sample_limit = max(options.sample_limit, 0)

    open_session = get_session_factory()
    with open_session() as db:
        summary = rename_demo_claim_numbers(
            db,
            apply=options.apply,
            sample_limit=sample_limit,
        )
        # Persist only on an explicit --apply; otherwise discard pending changes.
        finish = db.commit if options.apply else db.rollback
        finish()
    print(json.dumps(summary.to_dict(), ensure_ascii=False, indent=2))
|
||
|
|
|
||
|
|
|
||
|
|
def rename_demo_claim_numbers(db, *, apply: bool, sample_limit: int) -> RenameSummary:
    """Rename legacy demo claims and everything that references their numbers.

    For every legacy claim that has an entry in the rename map, the budget
    transactions, budget reservations, risk observations and attachments are
    processed by the helpers in this module. Afterwards all demo claims are
    scanned once more for legacy numbers left inside attachment files and
    their metadata. The session is never committed here; that is left to the
    caller.

    Args:
        db: Database session used for all queries.
        apply: When true, rows, files and the claim numbers themselves are
            modified; when false the helpers only count what they find.
        sample_limit: Maximum number of old/new pairs kept in ``samples``.

    Returns:
        A ``RenameSummary`` with the per-category totals.

    Raises:
        RuntimeError: Propagated from ``_build_rename_map`` when a canonical
            number is already taken.
    """
    legacy_claims = _legacy_demo_claims(db)
    canonical_by_legacy = _build_rename_map(db, legacy_claims)
    attachment_storage = ExpenseClaimAttachmentStorage()

    transaction_total = 0
    reservation_total = 0
    risk_total = 0
    file_total = 0
    item_total = 0
    preview: list[dict[str, str]] = []

    for claim in legacy_claims:
        legacy_no = str(claim.claim_no or "").strip()
        canonical_no = canonical_by_legacy.get(legacy_no)
        if not canonical_no:
            continue
        if len(preview) < sample_limit:
            preview.append({"old": legacy_no, "new": canonical_no})

        transaction_total += _update_budget_transactions(
            db, legacy_no, canonical_no, apply=apply
        )
        reservation_total += _update_budget_reservations(
            db, legacy_no, canonical_no, apply=apply
        )
        risk_total += _update_risk_observations(
            db, claim, legacy_no, canonical_no, apply=apply
        )
        files_touched, items_touched = _update_attachments(
            attachment_storage,
            claim,
            legacy_no,
            canonical_no,
            apply=apply,
        )
        file_total += files_touched
        item_total += items_touched

        # The claim itself is renamed last, after its references were handled.
        if apply:
            claim.claim_no = canonical_no

    residual_total = _repair_residual_attachment_texts(
        attachment_storage,
        _demo_claims(db),
        apply=apply,
    )

    return RenameSummary(
        mode="apply" if apply else "dry-run",
        legacy_claims=len(legacy_claims),
        renamed_claims=len(canonical_by_legacy),
        budget_transactions_updated=transaction_total,
        budget_reservations_updated=reservation_total,
        risk_observations_updated=risk_total,
        attachment_files_updated=file_total,
        attachment_items_updated=item_total,
        residual_attachment_texts_updated=residual_total,
        samples=preview,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _legacy_demo_claims(db) -> list[ExpenseClaim]:
    """Load the simulation project's claims whose number starts with ``SIM-EXP-2026-``.

    Claim items are loaded eagerly (select-in loading) and the result is
    ordered by creation time, then claim number.
    """
    query = (
        select(ExpenseClaim)
        .options(selectinload(ExpenseClaim.items))
        .where(ExpenseClaim.project_code == SIM_PROJECT_CODE)
        .where(ExpenseClaim.claim_no.like("SIM-EXP-2026-%"))
        .order_by(ExpenseClaim.created_at.asc(), ExpenseClaim.claim_no.asc())
    )
    rows = db.scalars(query).all()
    return list(rows)
|
||
|
|
|
||
|
|
|
||
|
|
def _demo_claims(db) -> list[ExpenseClaim]:
    """Load every claim of the simulation project, whatever its number format.

    Claim items are loaded eagerly (select-in loading) and the result is
    ordered by creation time, then claim number.
    """
    query = (
        select(ExpenseClaim)
        .options(selectinload(ExpenseClaim.items))
        .where(ExpenseClaim.project_code == SIM_PROJECT_CODE)
        .order_by(ExpenseClaim.created_at.asc(), ExpenseClaim.claim_no.asc())
    )
    rows = db.scalars(query).all()
    return list(rows)
|
||
|
|
|
||
|
|
|
||
|
|
def _build_rename_map(db, claims: list[ExpenseClaim]) -> dict[str, str]:
    """Map each legacy claim number to its canonical replacement.

    Args:
        db: Database session used to read the claim numbers already in use.
        claims: Legacy claims, in the order used for the positional fallback.

    Returns:
        A dictionary from stripped legacy number to canonical number.

    Raises:
        RuntimeError: If a canonical number is already used by a non-legacy
            claim or was already produced earlier in this run.
    """
    legacy_numbers = {str(claim.claim_no or "").strip() for claim in claims}
    all_numbers = db.scalars(select(ExpenseClaim.claim_no)).all()
    # Legacy numbers are about to disappear, so they do not count as taken.
    taken = set(all_numbers).difference(legacy_numbers)

    mapping: dict[str, str] = {}
    for position, claim in enumerate(claims, start=1):
        legacy_no = str(claim.claim_no or "").strip()
        parsed = _legacy_sequence(legacy_no)
        # Falls back to the 1-based position when the number cannot be parsed;
        # note that a parsed sequence of 0 is falsy and takes the fallback too.
        sequence = parsed if parsed else position
        stamp = claim.occurred_at or claim.created_at or claim.submitted_at
        canonical_no = build_simulation_reimbursement_no(stamp, sequence)
        if canonical_no in taken:
            raise RuntimeError(f"canonical claim number already exists: {canonical_no}")
        taken.add(canonical_no)
        mapping[legacy_no] = canonical_no
    return mapping
|
||
|
|
|
||
|
|
|
||
|
|
def _legacy_sequence(claim_no: str) -> int | None:
    """Return the numeric suffix of a legacy claim number.

    Returns ``None`` when ``claim_no`` does not match ``LEGACY_CLAIM_PATTERN``.
    """
    found = LEGACY_CLAIM_PATTERN.match(claim_no)
    return int(found.group(1)) if found else None
|
||
|
|
|
||
|
|
|
||
|
|
def _update_budget_transactions(db, old_no: str, new_no: str, *, apply: bool) -> int:
    """Point budget transactions whose ``source_no`` is ``old_no`` at ``new_no``.

    Returns the number of matching rows; they are only modified when
    ``apply`` is true.
    """
    query = select(BudgetTransaction).where(BudgetTransaction.source_no == old_no)
    matched = list(db.scalars(query).all())
    if not apply:
        return len(matched)
    for transaction in matched:
        transaction.source_no = new_no
    return len(matched)
|
||
|
|
|
||
|
|
|
||
|
|
def _update_budget_reservations(db, old_no: str, new_no: str, *, apply: bool) -> int:
    """Point budget reservations whose ``source_no`` is ``old_no`` at ``new_no``.

    Returns the number of matching rows; they are only modified when
    ``apply`` is true.
    """
    query = select(BudgetReservation).where(BudgetReservation.source_no == old_no)
    matched = list(db.scalars(query).all())
    if not apply:
        return len(matched)
    for reservation in matched:
        reservation.source_no = new_no
    return len(matched)
|
||
|
|
|
||
|
|
|
||
|
|
def _update_risk_observations(
    db,
    claim: ExpenseClaim,
    old_no: str,
    new_no: str,
    *,
    apply: bool,
) -> int:
    """Rewrite the claim number inside risk observations linked to ``claim``.

    An observation is selected when its ``claim_id`` is the claim's id, or
    when its ``claim_no`` or ``subject_key`` equals ``old_no``.

    Returns the number of selected observations; they are only modified when
    ``apply`` is true.
    """
    linked_to_claim = (
        (RiskObservation.claim_id == claim.id)
        | (RiskObservation.claim_no == old_no)
        | (RiskObservation.subject_key == old_no)
    )
    observations = list(db.scalars(select(RiskObservation).where(linked_to_claim)).all())
    if not apply:
        return len(observations)

    for observation in observations:
        # Plain text columns are swapped only on an exact match.
        for column in ("claim_no", "subject_key", "subject_label"):
            current = getattr(observation, column)
            setattr(observation, column, new_no if current == old_no else current)
        # JSON columns may embed the number anywhere in their string values.
        for column in ("evidence_json", "ontology_json", "decision_trace_json"):
            rewritten = _replace_value(getattr(observation, column), old_no, new_no)
            setattr(observation, column, rewritten)
    return len(observations)
|
||
|
|
|
||
|
|
|
||
|
|
def _update_attachments(
    storage: ExpenseClaimAttachmentStorage,
    claim: ExpenseClaim,
    old_no: str,
    new_no: str,
    *,
    apply: bool,
) -> tuple[int, int]:
    """Rename attachment files and invoice ids that embed ``old_no``.

    Only claim items whose ``invoice_id`` contains ``old_no`` are considered.
    A file is counted when ``storage.resolve_item_path`` yields an existing
    path. The count is computed the same way in dry-run and apply mode, so
    the dry-run figure matches what an apply run would rename.

    Args:
        storage: Attachment storage used to resolve files and metadata.
        claim: Claim whose items are inspected.
        old_no: Legacy claim number to look for.
        new_no: Canonical claim number to substitute.
        apply: When true, files are renamed and ``invoice_id`` is rewritten.

    Returns:
        ``(file_updates, item_updates)``: the number of existing attachment
        files and the number of matching items.
    """
    file_updates = 0
    item_updates = 0
    for item in claim.items or []:
        invoice_id = str(item.invoice_id or "").strip()
        if old_no not in invoice_id:
            continue
        item_updates += 1

        # Resolve while the item still carries its old invoice id.
        # NOTE(review): resolve_item_path is assumed to be read-only, so it is
        # also called in dry-run mode -- confirm against the storage class.
        file_path = storage.resolve_item_path(item)
        if file_path is not None and file_path.exists():
            file_updates += 1
            if apply:
                _rename_attachment_file(storage, file_path, old_no, new_no)

        if apply:
            item.invoice_id = invoice_id.replace(old_no, new_no)
    return file_updates, item_updates


def _rename_attachment_file(
    storage: ExpenseClaimAttachmentStorage,
    file_path: Path,
    old_no: str,
    new_no: str,
) -> None:
    """Move one attachment file and its metadata sidecar to the ``new_no`` name."""
    # Read the metadata before anything is moved so the rewritten payload is
    # derived from the original sidecar.
    meta_payload = _replace_value(storage.read_meta(file_path), old_no, new_no)
    new_file_path = file_path.with_name(file_path.name.replace(old_no, new_no))
    meta_path = storage.meta_path(file_path)
    new_meta_path = storage.meta_path(new_file_path)

    file_path.rename(new_file_path)
    if meta_path.exists():
        meta_path.rename(new_meta_path)
        # Rewrite only when a sidecar existed, so files without metadata do
        # not gain a new metadata file.
        storage.write_meta(new_file_path, meta_payload)
|
||
|
|
|
||
|
|
|
||
|
|
def _repair_residual_attachment_texts(
    storage: ExpenseClaimAttachmentStorage,
    claims: list[ExpenseClaim],
    *,
    apply: bool,
) -> int:
    """Replace legacy numbers still embedded in attachment files and metadata.

    The legacy number of a claim is reconstructed from its id through the
    table built by ``_simulation_sequence_by_claim_id``; claims whose id is
    not in that table, or whose current number already equals the
    reconstructed one, are skipped.

    Returns the number of files plus metadata payloads that contain the
    legacy number; they are only rewritten when ``apply`` is true.
    """
    # Cover at least 3000 sequences, with headroom above the claim count.
    sequence_lookup = _simulation_sequence_by_claim_id(max(3000, len(claims) + 500))
    touched = 0
    for claim in claims:
        sequence = sequence_lookup.get(str(claim.id))
        if sequence is None:
            continue
        legacy_no = f"SIM-EXP-2026-{sequence:04d}"
        current_no = str(claim.claim_no or "").strip()
        if not (legacy_no and current_no) or legacy_no == current_no:
            continue
        for item in list(claim.items or []):
            attachment = storage.resolve_item_path(item)
            if attachment is None or not attachment.exists():
                continue
            # The file body and its metadata are counted independently.
            if _replace_file_text(attachment, legacy_no, current_no, apply=apply):
                touched += 1
            if _replace_meta_text(storage, attachment, legacy_no, current_no, apply=apply):
                touched += 1
    return touched
|
||
|
|
|
||
|
|
|
||
|
|
def _simulation_sequence_by_claim_id(limit: int) -> dict[str, int]:
    """Build a reverse lookup from claim id string to sequence number.

    For each sequence in ``1..limit`` the id is the UUIDv5 of
    ``x-financial:<SIM_CLAIM_ID_NAMESPACE>:<sequence>`` in the DNS namespace.
    """
    lookup: dict[str, int] = {}
    for sequence in range(1, limit + 1):
        claim_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS,
            f"x-financial:{SIM_CLAIM_ID_NAMESPACE}:{sequence}",
        )
        lookup[str(claim_uuid)] = sequence
    return lookup
|
||
|
|
|
||
|
|
|
||
|
|
def _replace_file_text(file_path: Path, old_no: str, new_no: str, *, apply: bool) -> bool:
    """Replace ``old_no`` with ``new_no`` inside a UTF-8 text file.

    The file is read and written with ``newline=""`` so existing line endings
    (for example CRLF) are preserved byte-for-byte; only the claim number
    changes.

    Args:
        file_path: File to inspect.
        old_no: Text to look for.
        new_no: Replacement text.
        apply: When true the file is rewritten; when false it is only checked.

    Returns:
        ``True`` if the file contains ``old_no`` (and was rewritten when
        ``apply`` is true); ``False`` if it does not, or if the file is not
        valid UTF-8 and is therefore left untouched.
    """
    try:
        # newline="" disables universal-newline translation on read.
        with file_path.open("r", encoding="utf-8", newline="") as handle:
            content = handle.read()
    except UnicodeDecodeError:
        # Not UTF-8 text, so there is nothing that can safely be rewritten.
        return False
    if old_no not in content:
        return False
    if apply:
        # newline="" writes the content back without translating line endings.
        with file_path.open("w", encoding="utf-8", newline="") as handle:
            handle.write(content.replace(old_no, new_no))
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def _replace_meta_text(
    storage: ExpenseClaimAttachmentStorage,
    file_path: Path,
    old_no: str,
    new_no: str,
    *,
    apply: bool,
) -> bool:
    """Replace ``old_no`` inside the metadata stored for ``file_path``.

    Returns ``True`` when the metadata contains ``old_no`` (it is written
    back only when ``apply`` is true), and ``False`` when the metadata is
    empty or unaffected.
    """
    original = storage.read_meta(file_path)
    if not original:
        return False
    rewritten = _replace_value(original, old_no, new_no)
    unchanged = rewritten == original
    if unchanged:
        return False
    if apply:
        storage.write_meta(file_path, rewritten)
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def _replace_value(value: Any, old_no: str, new_no: str) -> Any:
    """Recursively replace ``old_no`` with ``new_no`` in a JSON-like value.

    Strings have every occurrence replaced; lists and dictionaries are
    rebuilt with their elements / values processed recursively. Dictionary
    keys are left as they are, and any other type (numbers, ``None``,
    tuples, ...) is returned unchanged. The input is never mutated.

    Args:
        value: The value to process.
        old_no: Text to look for. An empty string is treated as "nothing to
            replace" and ``value`` is returned as-is.
        new_no: Replacement text.

    Returns:
        The value with all occurrences replaced.
    """
    if not old_no:
        # str.replace("", x) would insert x between every character.
        return value
    if isinstance(value, str):
        return value.replace(old_no, new_no)
    if isinstance(value, list):
        return [_replace_value(element, old_no, new_no) for element in value]
    if isinstance(value, dict):
        return {key: _replace_value(entry, old_no, new_no) for key, entry in value.items()}
    return value
|
||
|
|
|
||
|
|
|
||
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|