#!/usr/bin/env python3
"""Mock attachment files and OCR metadata for half-year simulated expense claims.

The script selects every expense claim whose ``claim_no`` starts with
``SIM_CLAIM_PREFIX`` and, for each of its items, either leaves the item
without material, or writes a plain-text mock "receipt" plus a metadata
payload through ``ExpenseClaimAttachmentStorage``.

Without ``--apply`` the script only counts what it would do (dry run).
"""
from __future__ import annotations

import argparse
import json
import sys
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any

from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Make ``<server>/src`` importable so the ``app.*`` imports below resolve when
# this file is executed directly as a script.
SERVER_DIR = Path(__file__).resolve().parents[1]
SRC_DIR = SERVER_DIR / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

from app.db.session import get_session_factory  # noqa: E402
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem  # noqa: E402
from app.services.demo_company_simulation_catalog import SIM_CLAIM_PREFIX  # noqa: E402
from app.services.expense_claim_attachment_storage import (  # noqa: E402
    ExpenseClaimAttachmentStorage,
)

# Maps a normalized item type to
# (document_type, document_label, scene_code, scene_label).
DOCUMENT_BY_ITEM_TYPE = {
    "hotel": ("hotel_invoice", "酒店住宿票据", "hotel", "住宿票据"),
    "hotel_ticket": ("hotel_invoice", "酒店住宿票据", "hotel", "住宿票据"),
    "transport": ("transport_receipt", "乘车票据", "transport", "交通票据"),
    "train_ticket": ("train_ticket", "火车/高铁票", "travel", "差旅票据"),
    "flight_ticket": ("flight_itinerary", "航空行程单", "travel", "差旅票据"),
    "ride_ticket": ("taxi_receipt", "出租车/网约车票据", "transport", "交通票据"),
    "meal": ("meal_receipt", "餐饮发票", "meal", "餐饮票据"),
    "entertainment": ("meal_receipt", "餐饮发票", "meal", "餐饮票据"),
    "office": ("office_invoice", "办公用品发票", "office", "办公票据"),
    "communication": ("telecom_invoice", "通信服务发票", "communication", "通信票据"),
    "travel_allowance": ("allowance_sheet", "差旅补贴测算单", "travel", "差旅测算"),
}


@dataclass(frozen=True, slots=True)
class MockAttachmentSummary:
    """Counters describing one run of :func:`mock_attachments`."""

    mode: str  # "apply" or "dry-run"
    sim_claims: int  # number of simulated claims selected
    sim_items: int  # total items across those claims
    attachments_to_mock: int  # items that get (or would get) a mock attachment
    missing_material_items: int  # items deliberately left without an attachment
    compliant_attachments: int  # mocked attachments generated as compliant samples
    violation_attachments: int  # mocked attachments generated as violation samples
    already_mocked: int  # items skipped because a mock attachment already exists

    def to_dict(self) -> dict[str, Any]:
        """Return the summary as a plain dictionary."""
        return asdict(self)


def main() -> None:
    """Parse CLI arguments, run the mock pass and print a JSON summary.

    The database session is committed only when ``--apply`` is given; any
    exception triggers a rollback and is re-raised.
    """
    parser = argparse.ArgumentParser(
        description="Mock attachment files and OCR metadata for half-year simulated claims."
    )
    parser.add_argument("--apply", action="store_true", help="Write mock attachment files.")
    args = parser.parse_args()

    session_factory = get_session_factory()
    with session_factory() as db:
        try:
            # NOTE(review): in apply mode the mock files are written to storage
            # before the commit below; a failed commit rolls back the database
            # changes but does not remove files that were already written.
            summary = mock_attachments(db, apply=args.apply)
            if args.apply:
                db.commit()
            print(json.dumps(summary.to_dict(), ensure_ascii=False, indent=2))
            if not args.apply:
                print("dry-run only; pass --apply after confirmation to write mock attachments.")
        except Exception:
            db.rollback()
            raise


def mock_attachments(db, *, apply: bool) -> MockAttachmentSummary:
    """Walk all simulated claims and mock (or count) their item attachments.

    Args:
        db: Database session used to load the simulated claims. The caller is
            responsible for committing or rolling back.
        apply: When true, write attachment files/metadata and update
            ``item.invoice_id`` / ``claim.invoice_count``; when false, only
            count what would happen.

    Returns:
        A :class:`MockAttachmentSummary` with the counters for this run.
    """
    claims = _sim_claims(db)
    storage = ExpenseClaimAttachmentStorage()
    attachments_to_mock = 0
    missing_material_items = 0
    compliant_attachments = 0
    violation_attachments = 0
    already_mocked = 0
    sim_items = 0

    # Indices are 1-based; they drive the deterministic sampling rules below
    # and the generated mock invoice numbers.
    for claim_index, claim in enumerate(claims, start=1):
        items = list(claim.items or [])
        sim_items += len(items)
        for item_index, item in enumerate(items, start=1):
            if _has_existing_mock(storage, item):
                already_mocked += 1
                continue
            if _should_leave_missing(claim_index, item_index, claim):
                missing_material_items += 1
                if apply:
                    item.invoice_id = None
                continue
            violated = _is_violation_sample(claim_index, item_index, claim)
            attachments_to_mock += 1
            violation_attachments += int(violated)
            compliant_attachments += int(not violated)
            if apply:
                _write_mock_attachment(
                    storage=storage,
                    claim=claim,
                    item=item,
                    claim_index=claim_index,
                    item_index=item_index,
                    violated=violated,
                )
        if apply:
            # Recount from the items so the claim reflects every non-blank
            # invoice_id, including attachments mocked by earlier runs.
            claim.invoice_count = sum(
                1 for claim_item in items if str(claim_item.invoice_id or "").strip()
            )

    return MockAttachmentSummary(
        mode="apply" if apply else "dry-run",
        sim_claims=len(claims),
        sim_items=sim_items,
        attachments_to_mock=attachments_to_mock,
        missing_material_items=missing_material_items,
        compliant_attachments=compliant_attachments,
        violation_attachments=violation_attachments,
        already_mocked=already_mocked,
    )


def _sim_claims(db) -> list[ExpenseClaim]:
    """Load all claims whose number starts with ``SIM_CLAIM_PREFIX``.

    Items are eagerly loaded and the result is ordered by ``claim_no``
    ascending, which fixes the ``claim_index`` each claim receives.
    """
    return list(
        db.scalars(
            select(ExpenseClaim)
            .options(selectinload(ExpenseClaim.items))
            .where(ExpenseClaim.claim_no.like(f"{SIM_CLAIM_PREFIX}%"))
            .order_by(ExpenseClaim.claim_no.asc())
        ).all()
    )


def _has_existing_mock(storage: ExpenseClaimAttachmentStorage, item: ExpenseClaimItem) -> bool:
    """Return whether *item* already points at a file written by this script.

    An attachment counts as an existing mock only when its file exists and
    its metadata ``source`` equals ``"half_year_expense_demo_mock"``.
    """
    file_path = storage.resolve_item_path(item)
    if file_path is None or not file_path.exists():
        return False
    # read_meta's contract is not visible from this file; tolerate a falsy
    # result (e.g. missing metadata) instead of failing on ``.get``.
    metadata = storage.read_meta(file_path) or {}
    return str(metadata.get("source") or "") == "half_year_expense_demo_mock"


def _should_leave_missing(claim_index: int, item_index: int, claim: ExpenseClaim) -> bool:
    """Decide deterministically whether an item is left without an attachment.

    Claims in status ``draft`` or ``returned`` leave every item whose
    ``claim_index + item_index`` is divisible by 4 without material; all
    other claims use a divisor of 19.
    """
    if str(claim.status or "").strip().lower() in {"draft", "returned"}:
        return (claim_index + item_index) % 4 == 0
    return (claim_index + item_index) % 19 == 0


def _is_violation_sample(claim_index: int, item_index: int, claim: ExpenseClaim) -> bool:
    """Decide deterministically whether an item gets a violation sample.

    Any claim with a truthy ``hermes_risk_flag`` or ``risk_flags_json`` always
    yields violation samples; otherwise the choice depends on the indices.
    """
    if claim.hermes_risk_flag or claim.risk_flags_json:
        return True
    return (claim_index * 7 + item_index * 3) % 11 == 0


def _write_mock_attachment(
    *,
    storage: ExpenseClaimAttachmentStorage,
    claim: ExpenseClaim,
    item: ExpenseClaimItem,
    claim_index: int,
    item_index: int,
    violated: bool,
) -> None:
    """Write one mock attachment file plus its metadata for *item*.

    Side effects: creates the item's attachment directory, writes the mock
    OCR text as a UTF-8 ``.txt`` file, sets ``item.invoice_id`` to the
    file's storage key and writes the metadata payload via ``storage``.
    """
    document_type, document_label, scene_code, scene_label = _document_meta(item.item_type)
    filename = f"{claim.claim_no}-{item_index:02d}-{document_type}.txt"
    attachment_dir = storage.build_item_dir(claim.id, item.id)
    attachment_dir.mkdir(parents=True, exist_ok=True)
    file_path = attachment_dir / filename

    # Compute the invoice number once so the OCR text and the structured
    # ``document_info`` field always carry the same value.
    invoice_no = _mock_invoice_no(claim_index, item_index)
    ocr_text = _ocr_text(
        claim=claim,
        item=item,
        document_label=document_label,
        claim_index=claim_index,
        item_index=item_index,
        violated=violated,
        invoice_no=invoice_no,
    )
    file_path.write_text(ocr_text, encoding="utf-8")
    item.invoice_id = storage.to_storage_key(file_path)
    storage.write_meta(
        file_path,
        _meta_payload(
            storage_key=item.invoice_id,
            filename=filename,
            file_path=file_path,
            claim=claim,
            item=item,
            document_type=document_type,
            document_label=document_label,
            scene_code=scene_code,
            scene_label=scene_label,
            ocr_text=ocr_text,
            violated=violated,
            invoice_no=invoice_no,
        ),
    )


def _document_meta(item_type: str) -> tuple[str, str, str, str]:
    """Look up document/scene metadata for an item type.

    The lookup is case- and whitespace-insensitive; unknown or empty types
    fall back to a generic invoice entry.
    """
    return DOCUMENT_BY_ITEM_TYPE.get(
        str(item_type or "").strip().lower(),
        ("invoice", "费用发票", "other", "其他票据"),
    )


def _mock_invoice_no(claim_index: int, item_index: int) -> str:
    """Build the mock invoice number printed on the generated receipt."""
    return f"MOCK{claim_index:04d}{item_index:02d}"


def _date_text(value: Any) -> str:
    """Return ``value.isoformat()``, or an empty string when *value* is ``None``."""
    return "" if value is None else value.isoformat()


def _ocr_text(
    *,
    claim: ExpenseClaim,
    item: ExpenseClaimItem,
    document_label: str,
    claim_index: int,
    item_index: int,
    violated: bool,
    invoice_no: str | None = None,
) -> str:
    """Render the newline-separated mock OCR text for one item.

    Args:
        claim: Claim the item belongs to.
        item: Item the receipt is generated for.
        document_label: Human-readable document type shown on the receipt.
        claim_index: 1-based claim position, used for the default invoice number.
        item_index: 1-based item position, used for the default invoice number.
        violated: Whether to render the "needs manual review" hint.
        invoice_no: Invoice number to print; defaults to the number derived
            from ``claim_index`` and ``item_index``.

    Returns:
        The receipt text, one field per line.
    """
    if invoice_no is None:
        invoice_no = _mock_invoice_no(claim_index, item_index)
    amount = _display_amount(item.item_amount)
    merchant = _merchant_name(item.item_type, violated)
    violation_line = (
        "校验提示:票据金额或场景需要人工复核。"
        if violated
        else "校验提示:票据字段与报销明细一致。"
    )
    return "\n".join(
        [
            f"票据类型:{document_label}",
            f"发票号码:{invoice_no}",
            f"开票方:{merchant}",
            # Missing values render as empty text rather than the literal
            # "None", matching how the metadata payload treats the location.
            f"购买方:{claim.department_name or ''}",
            f"发生日期:{_date_text(item.item_date)}",
            f"发生地点:{item.item_location or ''}",
            f"金额:{amount}",
            f"关联报销单:{claim.claim_no}",
            violation_line,
        ]
    )


def _merchant_name(item_type: str, violated: bool) -> str:
    """Return the mock issuer name for an item type.

    Violation samples and compliant samples use separate lookup tables,
    each with its own fallback for item types it does not list.
    """
    normalized = str(item_type or "").strip().lower()
    if violated:
        return {
            "hotel": "上海云栖酒店有限公司",
            "transport": "跨城交通服务商",
            "office": "综合采购供应商",
            "meal": "高端商务餐饮有限公司",
        }.get(normalized, "异常样本供应商")
    return {
        "hotel": "合规住宿服务有限公司",
        "transport": "合规出行服务有限公司",
        "travel_allowance": "系统差旅补贴测算",
        "office": "合规办公用品有限公司",
        "communication": "合规通信服务有限公司",
        "meal": "合规餐饮服务有限公司",
    }.get(normalized, "合规票据供应商")


def _meta_payload(
    *,
    storage_key: str,
    filename: str,
    file_path: Path,
    claim: ExpenseClaim,
    item: ExpenseClaimItem,
    document_type: str,
    document_label: str,
    scene_code: str,
    scene_label: str,
    ocr_text: str,
    violated: bool,
    invoice_no: str | None = None,
) -> dict[str, Any]:
    """Build the metadata dictionary stored next to a mock attachment.

    Args:
        storage_key: Storage key of the attachment file.
        filename: Attachment file name.
        file_path: Path of the already-written attachment (its size is read).
        claim: Claim the item belongs to.
        item: Item the attachment was generated for.
        document_type: Machine-readable document type.
        document_label: Human-readable document type.
        scene_code: Machine-readable scene code.
        scene_label: Human-readable scene label.
        ocr_text: The mock OCR text written to the file.
        violated: Whether this attachment is a violation sample.
        invoice_no: Invoice number for the structured field; when omitted it
            is derived from *filename*.

    Returns:
        The metadata payload, tagged with
        ``source == "half_year_expense_demo_mock"``.
    """
    if invoice_no is None:
        invoice_no = _invoice_no(filename)
    amount_text = _display_amount(item.item_amount)
    document_info = {
        "document_type": document_type,
        "document_type_label": document_label,
        "scene_code": scene_code,
        "scene_label": scene_label,
        "fields": [
            {"key": "invoice_no", "label": "发票号码", "value": invoice_no},
            {"key": "invoice_date", "label": "开票日期", "value": _date_text(item.item_date)},
            {"key": "amount", "label": "金额", "value": amount_text},
            {"key": "location", "label": "地点", "value": str(item.item_location or "")},
            {
                "key": "merchant",
                "label": "开票方",
                "value": _merchant_name(item.item_type, violated),
            },
        ],
    }
    requirement_check = _requirement_payload(
        violated,
        item,
        document_type,
        document_label,
        scene_code,
        scene_label,
    )
    review_status = "需复核" if violated else "字段匹配"
    ocr_summary = f"{document_label},金额 {amount_text},{review_status}。"
    return {
        "source": "half_year_expense_demo_mock",
        "file_name": filename,
        "storage_key": storage_key,
        "media_type": "text/plain",
        "size_bytes": file_path.stat().st_size,
        "uploaded_at": datetime.now(UTC).isoformat(),
        "previewable": False,
        "preview_kind": "",
        "preview_storage_key": "",
        "preview_media_type": "",
        "preview_file_name": "",
        "analysis": _analysis_payload(violated, claim, item),
        "document_info": document_info,
        "requirement_check": requirement_check,
        "ocr_status": "mocked",
        "ocr_error": "",
        "ocr_text": ocr_text,
        "ocr_summary": ocr_summary,
        "ocr_avg_score": 0.97 if not violated else 0.81,
        "ocr_line_count": len(ocr_text.splitlines()),
        "ocr_classification_source": "mock_rule",
        "ocr_classification_confidence": 0.96 if not violated else 0.78,
        "ocr_classification_evidence": [document_label, scene_label],
        "ocr_warnings": ["mock违规样本"] if violated else [],
    }


def _analysis_payload(
    violated: bool,
    claim: ExpenseClaim,
    item: ExpenseClaimItem,
) -> dict[str, Any]:
    """Build the ``analysis`` section of the metadata payload.

    Returns a "warning" analysis for violation samples and a "success"
    analysis otherwise.
    """
    amount_text = _display_amount(item.item_amount)
    # Same fallback label the requirement payload uses, so a missing reason
    # never shows up as the literal text "None".
    reason_text = str(item.item_reason or "费用明细")
    if violated:
        return {
            "severity": "warning",
            "label": "需复核",
            "headline": "票据字段存在合规疑点",
            "summary": "系统 mock 的 OCR 字段与报销场景存在偏差,用于演示违规样本。",
            "points": [
                f"报销单 {claim.claim_no} 金额或场景需要人工复核。",
                f"费用明细:{reason_text},金额 {amount_text}。",
            ],
            "rule_basis": ["票据金额与费用明细一致性", "票据场景与费用科目匹配"],
            "suggestion": "请核对票据原件、业务事由和费用归口后再提交或付款。",
        }
    return {
        "severity": "success",
        "label": "合规",
        "headline": "票据字段与报销明细一致",
        "summary": "系统 mock 的 OCR 字段已覆盖金额、日期、地点和票据类型。",
        "points": [
            f"金额 {amount_text} 与费用明细一致。",
            f"票据类型匹配 {reason_text}。",
        ],
        "rule_basis": ["基础票据完整性", "金额一致性"],
        "suggestion": "当前材料可作为演示合规样本。",
    }


def _requirement_payload(
    violated: bool,
    item: ExpenseClaimItem,
    document_type: str,
    document_label: str,
    scene_code: str,
    scene_label: str,
) -> dict[str, Any]:
    """Build the ``requirement_check`` section of the metadata payload.

    ``matches`` is true exactly when the attachment is not a violation sample.
    """
    return {
        "matches": not violated,
        "current_expense_type": str(item.item_type or "other"),
        "current_expense_type_label": str(item.item_reason or "费用明细"),
        "allowed_scene_labels": [scene_label],
        "recognized_scene_code": scene_code,
        "recognized_scene_label": scene_label,
        "recognized_document_type": document_type,
        "recognized_document_type_label": document_label,
        "message": "材料匹配,可继续处理。" if not violated else "材料存在疑点,建议人工复核。",
    }


def _invoice_no(filename: str) -> str:
    """Derive an invoice number from a file name.

    Takes the file stem, removes dashes, upper-cases it and keeps at most
    the last 20 characters.
    """
    return Path(filename).stem.replace("-", "").upper()[-20:]


def _display_amount(value: Decimal | float | int | str | None) -> str:
    """Format an amount with exactly two decimal places.

    Falsy values (``None``, ``0``, ``""``) are rendered as ``"0.00"``.
    """
    amount = Decimal(str(value or "0")).quantize(Decimal("0.01"))
    return f"{amount:.2f}"


if __name__ == "__main__":
    main()