from __future__ import annotations import base64 import binascii import json import mimetypes import re import shutil from collections import defaultdict from datetime import UTC, date, datetime, timedelta from decimal import Decimal, InvalidOperation from pathlib import Path from types import SimpleNamespace from typing import Any from urllib.parse import quote from sqlalchemy import and_, func, or_, select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, selectinload from app.api.deps import CurrentUserContext from app.core.agent_enums import AgentAssetDomain, AgentAssetStatus, AgentAssetType from app.core.config import get_settings from app.models.agent_asset import AgentAsset from app.models.employee import Employee from app.models.financial_record import ExpenseClaim, ExpenseClaimItem from app.models.organization import OrganizationUnit from app.schemas.ontology import OntologyEntity, OntologyParseResult from app.schemas.reimbursement import ExpenseClaimItemCreate, ExpenseClaimItemUpdate from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService from app.services.document_intelligence import build_document_insight from app.services.expense_rule_runtime import ( DEFAULT_SCENE_RULE_ASSET_CODE, ExpenseRuleRuntimeService, RuntimeTravelPolicy, build_default_expense_rule_catalog, resolve_document_type_label, ) from app.services.ocr import OcrService EXPENSE_TYPE_LABELS = { "travel": "差旅", "hotel": "住宿", "transport": "交通", "meal": "餐费", "meeting": "会务", "entertainment": "招待", "office": "办公", "training": "培训", "communication": "通讯", "welfare": "福利", } PRIVILEGED_CLAIM_ROLE_CODES = {"finance", "executive"} APPROVAL_VISIBLE_CLAIM_ROLE_CODES = {"manager", "approver"} MAX_DRAFT_CLAIMS_PER_USER = 3 LOCATION_REQUIRED_EXPENSE_TYPES = { "travel", 
"meeting", "entertainment", } class ExpenseClaimSubmissionBlockedError(ValueError): def __init__(self, issues: list[str]) -> None: self.issues = [str(issue or "").strip() for issue in issues if str(issue or "").strip()] super().__init__("提交前请先补全信息:" + ";".join(self.issues)) EXPENSE_SCENE_KEYWORDS = { "travel": ("差旅", "出差", "行程"), "hotel": ("酒店", "住宿", "房费", "客房", "入住", "离店"), "transport": ( "交通", "打车", "出租车", "网约车", "滴滴", "出行", "高铁", "动车", "火车", "机票", "航班", "行程单", "登机", "客票", "公交", "地铁", "过路费", "通行费", "停车", ), "meal": ("餐饮", "餐费", "用餐", "外卖", "快餐", "酒楼", "饭店", "饭馆", "食品", "咖啡"), "entertainment": ("招待", "宴请", "接待", "客户餐", "商务餐", "业务招待"), "office": ("办公", "办公用品", "文具", "耗材", "打印", "纸张", "硒鼓", "墨盒", "鼠标", "键盘", "电脑"), "meeting": ("会议", "会务", "会展", "会议室", "会场", "场地费", "论坛"), "training": ("培训", "课程", "讲师", "教材", "学费", "认证"), } EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES = { "travel": {"travel", "hotel", "transport", "meal"}, "hotel": {"hotel"}, "transport": {"transport", "travel"}, "meal": {"meal", "entertainment"}, "entertainment": {"entertainment", "meal"}, "office": {"office"}, "meeting": {"meeting"}, "training": {"training"}, } DOCUMENT_SCENE_LABELS = { "travel": "差旅", "hotel": "住宿", "transport": "交通", "meal": "餐饮", "entertainment": "业务招待", "office": "办公用品", "meeting": "会务", "training": "培训", "other": "其他票据", } DOCUMENT_ASSOCIATION_REVIEW_ACTIONS = { "link_to_existing_draft", "create_new_claim_from_documents", } MAX_CLAIM_NO_RETRY_ATTEMPTS = 3 DOCUMENT_AMOUNT_PATTERNS = ( re.compile( r"(?:价税合计|合计金额|费用合计|订单(?:总)?金额|支付(?:金额)?|实付(?:金额)?|实收(?:金额)?|总(?:额|计|价)|票价|金额|车费|消费金额)" r"[::\s¥¥人民币]*([0-9]+(?:[.,][0-9]{1,2})?)" ), re.compile(r"[¥¥]\s*([0-9]+(?:[.,][0-9]{1,2})?)"), re.compile(r"([0-9]+(?:[.,][0-9]{1,2})?)\s*元"), ) DOCUMENT_DATE_PATTERN = re.compile(r"((?:20\d{2}|19\d{2})[-/年.](?:1[0-2]|0?[1-9])[-/月.](?:3[01]|[12]\d|0?[1-9])日?)") SYSTEM_GENERATED_REASON_PREFIXES = ( "我上传了", "请按当前已识别信息", "请把当前上传的票据", "请基于当前上传的多张票据", "我已核对右侧识别结果", "请同步修正逐票据识别结果", "我已修改识别信息", "查看报销草稿", 
"请解释一下当前这笔报销的合规风险和待补充项", ) AI_REVIEW_LOOKBACK_DAYS = 90 AI_REVIEW_REPEAT_RISK_WARNING_COUNT = 1 AI_REVIEW_REPEAT_RISK_BLOCK_COUNT = 2 TRAVEL_REVIEW_RELEVANT_EXPENSE_TYPES = {"travel", "hotel", "transport"} TRAVEL_REVIEW_LONG_DISTANCE_DOCUMENT_TYPES = {"flight_itinerary", "train_ticket"} TRAVEL_POLICY_CITY_TIERS = { "北京": "tier_1", "上海": "tier_1", "广州": "tier_1", "深圳": "tier_1", "杭州": "tier_2", "南京": "tier_2", "苏州": "tier_2", "武汉": "tier_2", "成都": "tier_2", "重庆": "tier_2", "西安": "tier_2", "天津": "tier_2", "宁波": "tier_2", "厦门": "tier_2", "青岛": "tier_2", "长沙": "tier_2", "郑州": "tier_2", "合肥": "tier_2", "济南": "tier_2", "沈阳": "tier_2", "大连": "tier_2", "福州": "tier_2", "昆明": "tier_2", "海口": "tier_2", "三亚": "tier_2", "无锡": "tier_2", "东莞": "tier_2", "佛山": "tier_2", } TRAVEL_POLICY_CITY_MATCH_ORDER = tuple( sorted(TRAVEL_POLICY_CITY_TIERS.keys(), key=lambda item: len(item), reverse=True) ) TRAVEL_POLICY_BAND_LABELS = { "junior": "P1-P3", "mid": "P4-P5", "senior": "P6-P7", "manager": "M1-M2", "executive": "M3及以上 / D序列", } TRAVEL_POLICY_HOTEL_LIMITS = { "junior": { "tier_1": Decimal("450.00"), "tier_2": Decimal("380.00"), "tier_3": Decimal("320.00"), }, "mid": { "tier_1": Decimal("550.00"), "tier_2": Decimal("480.00"), "tier_3": Decimal("380.00"), }, "senior": { "tier_1": Decimal("700.00"), "tier_2": Decimal("620.00"), "tier_3": Decimal("520.00"), }, "manager": { "tier_1": Decimal("900.00"), "tier_2": Decimal("820.00"), "tier_3": Decimal("720.00"), }, "executive": { "tier_1": Decimal("1200.00"), "tier_2": Decimal("1000.00"), "tier_3": Decimal("900.00"), }, } TRAVEL_POLICY_ALLOWED_TRANSPORT_LEVELS = { "junior": {"flight": 1, "train": 1}, "mid": {"flight": 1, "train": 1}, "senior": {"flight": 2, "train": 2}, "manager": {"flight": 3, "train": 3}, "executive": {"flight": 4, "train": 3}, } TRAVEL_POLICY_ROUTE_EXCEPTION_KEYWORDS = ( "中转", "转机", "经停", "改签", "多地出差", "多城市", "多站", "异地返程", "异地结束", "临时变更", "继续前往", "第二站", ) TRAVEL_POLICY_STANDARD_EXCEPTION_KEYWORDS = ( "超标说明", "无直达", "展会高峰", 
"会议高峰", "协议酒店满房", "客户指定", "临时改签", "行程变更", "红眼航班", "晚到店", ) TRAVEL_POLICY_FLIGHT_CLASS_PATTERNS = ( ("头等舱", 4), ("公务舱", 3), ("商务舱", 3), ("超级经济舱", 2), ("高端经济舱", 2), ("明珠经济舱", 2), ("经济舱", 1), ) TRAVEL_POLICY_TRAIN_CLASS_PATTERNS = ( ("商务座", 3), ("一等座", 2), ("软卧", 2), ("二等座", 1), ("二等卧", 1), ("硬卧", 1), ) TRAVEL_POLICY_HOTEL_NIGHT_PATTERN = re.compile(r"(\d+)\s*(?:晚|间夜)") class ExpenseClaimService: def __init__(self, db: Session) -> None: self.db = db self.audit_service = AuditLogService(db) def list_claims(self, current_user: CurrentUserContext) -> list[ExpenseClaim]: stmt = ( select(ExpenseClaim) .options( selectinload(ExpenseClaim.items), selectinload(ExpenseClaim.employee).selectinload(Employee.manager), selectinload(ExpenseClaim.employee).selectinload(Employee.roles), ) .order_by(ExpenseClaim.created_at.desc(), ExpenseClaim.occurred_at.desc()) ) stmt = self._apply_claim_scope(stmt, current_user) return list(self.db.scalars(stmt).all()) def get_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None: stmt = ( select(ExpenseClaim) .options( selectinload(ExpenseClaim.items), selectinload(ExpenseClaim.employee).selectinload(Employee.manager), selectinload(ExpenseClaim.employee).selectinload(Employee.roles), ) .where(ExpenseClaim.id == claim_id) ) stmt = self._apply_claim_scope(stmt, current_user) return self.db.scalar(stmt) def update_claim_item( self, *, claim_id: str, item_id: str, payload: ExpenseClaimItemUpdate, current_user: CurrentUserContext, ) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) item = next((entry for entry in claim.items if entry.id == item_id), None) if item is None: raise LookupError("Item not found") before_json = self._serialize_claim(claim) if payload.item_date is not None: item.item_date = payload.item_date if payload.item_type is not None: item.item_type = self._normalize_optional_text(payload.item_type, fallback=item.item_type) 
or item.item_type if payload.item_reason is not None: item.item_reason = ( self._normalize_optional_text(payload.item_reason, fallback=item.item_reason) or item.item_reason ) if payload.item_location is not None: item.item_location = ( self._normalize_optional_text(payload.item_location, fallback=item.item_location) or item.item_location ) if payload.item_amount is not None: amount = payload.item_amount.quantize(Decimal("0.01")) if amount <= Decimal("0.00"): raise ValueError("费用金额必须大于 0。") item.item_amount = amount if payload.invoice_id is not None: item.invoice_id = self._normalize_optional_text(payload.invoice_id, allow_empty=True) self._refresh_item_attachment_analysis(item) self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.item_update", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return claim def create_claim_item( self, *, claim_id: str, payload: ExpenseClaimItemCreate | None, current_user: CurrentUserContext, ) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) payload = payload or ExpenseClaimItemCreate() occurred_at = claim.occurred_at if claim.occurred_at is not None else datetime.now(UTC) item_amount = Decimal("0.00") if payload.item_amount is not None: item_amount = payload.item_amount.quantize(Decimal("0.01")) if item_amount < Decimal("0.00"): raise ValueError("费用金额不能小于 0。") item = ExpenseClaimItem( claim_id=claim.id, item_date=payload.item_date or occurred_at.date(), item_type=self._normalize_optional_text( payload.item_type, fallback=str(claim.expense_type or "").strip() or "other", ) or "other", item_reason=self._normalize_optional_text(payload.item_reason, fallback="") or "", 
def upload_claim_item_attachment(
    self,
    *,
    claim_id: str,
    item_id: str,
    filename: str,
    content: bytes,
    media_type: str | None,
    current_user: CurrentUserContext,
) -> dict[str, Any] | None:
    """Store an uploaded file as the attachment of one claim item and run OCR on it.

    The item's attachment directory is wiped and recreated before the new
    file is written, so after this call it contains only the new upload plus
    whatever the metadata/preview helpers write. OCR is best-effort: any
    exception from the OCR call or the analysis helpers is recorded in the
    metadata (``ocr_status="failed"``) and the upload still succeeds.

    Args:
        claim_id: Identifier of the claim that owns the item.
        item_id: Identifier of the item receiving the attachment.
        filename: Client-supplied file name; normalised before use.
        content: Raw file bytes; must not be empty.
        media_type: Client-declared media type, used as a fallback when
            resolving the stored media type and passed to OCR.
        current_user: Caller context used for claim lookup and auditing.

    Returns:
        A dict with a user-facing message, the claim/item ids, the stored
        ``invoice_id`` and the attachment payload, or ``None`` when the
        lookup helper returns no claim.

    Raises:
        ValueError: ``content`` is empty.
    """
    claim, item = self._get_claim_item_or_raise(
        claim_id=claim_id,
        item_id=item_id,
        current_user=current_user,
    )
    if claim is None:
        return None
    self._ensure_draft_claim(claim)
    normalized_name = self._normalize_attachment_filename(filename)
    if not content:
        raise ValueError("上传文件不能为空。")
    before_json = self._serialize_claim(claim)
    attachment_dir = self._build_item_attachment_dir(claim.id, item.id)
    # Replace semantics: drop whatever was stored for this item before
    # writing the new file.
    shutil.rmtree(attachment_dir, ignore_errors=True)
    attachment_dir.mkdir(parents=True, exist_ok=True)
    file_path = attachment_dir / normalized_name
    file_path.write_bytes(content)
    resolved_media_type = self._resolve_attachment_media_type(
        normalized_name,
        fallback=media_type,
    )
    # Start from the fallback analysis; it is overwritten below when OCR
    # returns a document or fails.
    attachment_analysis = self._build_fallback_attachment_analysis(
        media_type=media_type,
        item=item,
    )
    ocr_document = None
    document_info = None
    requirement_check = None
    ocr_status = "empty"
    ocr_error = ""
    try:
        ocr_result = OcrService(self.db).recognize_files(
            [(normalized_name, content, media_type or "application/octet-stream")]
        )
        documents = list(ocr_result.documents or [])
        if documents:
            # Only the first recognised document is used for this item.
            ocr_document = documents[0]
            ocr_status = "recognized"
            document_info = self._build_attachment_document_info(ocr_document)
            requirement_check = self._build_attachment_requirement_check(
                item=item,
                document_info=document_info,
            )
            attachment_analysis = self._build_attachment_analysis(
                document=ocr_document,
                item=item,
                document_info=document_info,
                requirement_check=requirement_check,
            )
    except Exception as exc:  # pragma: no cover - fallback path depends on OCR runtime
        # Deliberate best-effort: an OCR failure must not lose the upload.
        ocr_status = "failed"
        ocr_error = str(exc)
        attachment_analysis = self._build_failed_ocr_attachment_analysis(
            media_type=media_type,
            error_message=ocr_error,
            item=item,
        )
    # The item's invoice_id doubles as the storage key of the stored file.
    item.invoice_id = self._to_attachment_storage_key(file_path)
    preview_meta = self._build_attachment_preview_meta(
        file_path=file_path,
        media_type=resolved_media_type,
        ocr_document=ocr_document,
    )
    # ocr_document may still be None here, hence the getattr defaults below.
    meta = {
        "file_name": normalized_name,
        "storage_key": item.invoice_id,
        "media_type": resolved_media_type,
        "size_bytes": len(content),
        "uploaded_at": datetime.now(UTC).isoformat(),
        "previewable": bool(preview_meta["previewable"]),
        "preview_kind": str(preview_meta["preview_kind"]),
        "preview_storage_key": str(preview_meta["preview_storage_key"]),
        "preview_media_type": str(preview_meta["preview_media_type"]),
        "preview_file_name": str(preview_meta["preview_file_name"]),
        "analysis": attachment_analysis,
        "document_info": document_info,
        "requirement_check": requirement_check,
        "ocr_status": ocr_status,
        "ocr_error": ocr_error,
        "ocr_text": str(getattr(ocr_document, "text", "") or ""),
        "ocr_summary": str(getattr(ocr_document, "summary", "") or ""),
        "ocr_avg_score": float(getattr(ocr_document, "avg_score", 0.0) or 0.0),
        "ocr_line_count": int(getattr(ocr_document, "line_count", 0) or 0),
        "ocr_classification_source": str(getattr(ocr_document, "classification_source", "") or ""),
        "ocr_classification_confidence": float(getattr(ocr_document, "classification_confidence", 0.0) or 0.0),
        # Blank evidence entries are dropped; warnings are kept verbatim.
        "ocr_classification_evidence": [
            str(item)
            for item in getattr(ocr_document, "classification_evidence", []) or []
            if str(item).strip()
        ],
        "ocr_warnings": [str(item) for item in getattr(ocr_document, "warnings", []) or []],
    }
    self._write_attachment_meta(file_path, meta)
    self._sync_claim_from_items(claim)
    self.db.commit()
    self.db.refresh(claim)
    self.audit_service.log_action(
        actor=current_user.name or current_user.username,
        action="expense_claim.attachment_upload",
        resource_type="expense_claim",
        resource_id=claim.id,
        before_json=before_json,
        after_json=self._serialize_claim(claim),
    )
    return {
        "message": f"{normalized_name} 已上传并关联到当前费用明细。",
        "claim_id": claim.id,
        "item_id": item.id,
        "invoice_id": item.invoice_id,
        "attachment": self._build_attachment_payload(item),
    }
def submit_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None:
    """Validate a draft claim, run the AI submission review and persist the outcome.

    The review result decides the new status, approval stage and risk flags;
    ``submitted_at`` is stamped only when the resulting status is
    ``"submitted"`` and cleared otherwise.

    Returns:
        The refreshed claim, or ``None`` when ``get_claim`` finds no claim
        for the current user.

    Raises:
        ExpenseClaimSubmissionBlockedError: Validation reported missing
            information.
    """
    claim = self.get_claim(claim_id, current_user)
    if claim is None:
        return None

    self._ensure_draft_claim(claim)
    self._backfill_claim_identity_from_current_user(claim, current_user)
    self._sync_claim_from_items(claim)

    blocking_issues = self._validate_claim_for_submission(claim)
    if blocking_issues:
        raise ExpenseClaimSubmissionBlockedError(blocking_issues)

    snapshot = self._serialize_claim(claim)
    review = self._run_ai_submission_review(claim)

    # Missing review fields fall back to the "needs supplement" state.
    claim.status = str(review.get("status") or "supplement")
    claim.approval_stage = str(review.get("approval_stage") or "待补充")
    claim.risk_flags_json = list(review.get("risk_flags") or [])
    if claim.status == "submitted":
        claim.submitted_at = datetime.now(UTC)
    else:
        claim.submitted_at = None

    self.db.commit()
    self.db.refresh(claim)

    actor = current_user.name or current_user.username
    self.audit_service.log_action(
        actor=actor,
        action="expense_claim.submit",
        resource_type="expense_claim",
        resource_id=claim.id,
        before_json=snapshot,
        after_json=self._serialize_claim(claim),
    )
    return claim
"submission_blocked": True, "submission_blocked_reasons": [message] if message else [], "missing_fields": [message] if message else [], "draft_only": False, } if claim is None: return { **result, "message": "未找到可提交的报销单,请刷新后重试。", "submission_blocked": True, "draft_only": False, } if str(claim.status or "").strip().lower() != "submitted": review_message = "" for flag in list(claim.risk_flags_json or []): if not isinstance(flag, dict): continue if str(flag.get("source") or "").strip() != "submission_review": continue review_message = str(flag.get("message") or "").strip() if review_message: break return { "message": review_message or f"报销单 {claim.claim_no} 经 AI预审后转为待补充,请先修正后再提交。", "submission_blocked": True, "draft_only": False, "claim_id": claim.id, "claim_no": claim.claim_no, "status": claim.status, "approval_stage": claim.approval_stage, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), } return { "message": ( f"报销单 {claim.claim_no} 已完成 AI预审," f"当前节点为 {claim.approval_stage or '审批中'}。" ), "draft_only": False, "claim_id": claim.id, "claim_no": claim.claim_no, "status": claim.status, "approval_stage": claim.approval_stage, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), } def delete_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None if not self._has_privileged_claim_access(current_user): self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) resource_id = claim.id self._delete_claim_attachment_root(claim.id) self.db.delete(claim) self.db.commit() self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.delete", resource_type="expense_claim", resource_id=resource_id, before_json=before_json, after_json=None, ) return claim def return_claim( self, claim_id: str, current_user: CurrentUserContext, *, reason: str | None = None, ) -> 
def upsert_draft_from_ontology(
    self,
    *,
    run_id: str,
    user_id: str | None,
    message: str,
    ontology: OntologyParseResult,
    context_json: dict[str, Any],
) -> dict[str, Any]:
    """Create or update a draft expense claim from a parsed message and its context.

    Flow, as implemented below:

    1. Resolve the owner, attachments and OCR documents from the context.
    2. If an existing draft is an association candidate and several
       documents were uploaded without an explicit association action,
       return a "pending_association_decision" payload and change nothing.
    3. Locate the target draft; when none is found a new claim is created,
       subject to ``MAX_DRAFT_CLAIMS_PER_USER``.
    4. Resolve each field, falling back to the existing claim's value and
       then to a placeholder.
    5. Write the claim and its items, commit, audit and return a summary.

    A claim-number conflict on a *new* claim is retried by calling this
    method again with an incremented ``_claim_no_retry_count`` in the
    context, up to ``MAX_CLAIM_NO_RETRY_ATTEMPTS``.

    Args:
        run_id: Request/run identifier, forwarded to the audit log.
        user_id: Identifier of the acting user, if known.
        message: Original user message; used for location/reason resolution.
        ontology: Parsed entities and risk flags.
        context_json: Request context; copied before use, so the caller's
            dict is not mutated here.

    Returns:
        A result dict. Depending on the path it describes the saved draft,
        a pending association decision, or a reached draft limit.

    Raises:
        IntegrityError: A database integrity error that is not a retryable
            claim-number conflict.
    """
    self._ensure_ready()
    context_json = dict(context_json or {})
    retry_count = self._resolve_claim_no_retry_count(context_json)
    review_action = str(context_json.get("review_action") or "").strip()
    attachment_names = self._resolve_attachment_names(context_json)
    context_documents = self._resolve_context_documents(context_json)
    employee = self._resolve_employee(
        ontology=ontology,
        context_json=context_json,
        user_id=user_id,
    )
    draft_owner_name = (
        employee.name
        if employee is not None
        else self._resolve_employee_name(
            ontology=ontology,
            context_json=context_json,
            user_id=user_id,
        )
    )
    association_candidate = self._find_association_candidate(
        ontology=ontology,
        context_json=context_json,
        user_id=user_id,
        employee=employee,
    )
    # Ask the user to choose before touching an existing draft with a
    # multi-document upload.
    if self._should_defer_multi_document_association(
        context_json=context_json,
        review_action=review_action,
        association_candidate=association_candidate,
        context_documents=context_documents,
    ):
        document_count = max(len(context_documents), len(attachment_names), self._resolve_attachment_count(context_json))
        return {
            "message": (
                f"检测到你已有草稿 {association_candidate.claim_no},"
                f"当前新上传了 {document_count} 张票据,请先选择关联到现有草稿,或单独建立新的报销单。"
            ),
            "draft_only": False,
            "status": "pending_association_decision",
            "pending_association_decision": True,
            "association_candidate_claim_id": association_candidate.id,
            "association_candidate_claim_no": association_candidate.claim_no,
        }
    claim = self._find_target_claim(
        ontology=ontology,
        context_json=context_json,
        review_action=review_action,
        association_candidate=association_candidate,
    )
    is_new_claim = claim is None
    before_json = self._serialize_claim(claim) if claim is not None else None
    if is_new_claim:
        # The per-owner draft cap applies only when a new draft would be created.
        existing_draft_count = self._count_draft_claims_for_owner(
            employee=employee,
            user_id=user_id,
        )
        if existing_draft_count >= MAX_DRAFT_CLAIMS_PER_USER:
            return {
                "message": (
                    f"你当前已保存 {MAX_DRAFT_CLAIMS_PER_USER} 个草稿,请先完成已保存的草稿,"
                    "才能再次新建草稿。"
                ),
                "draft_limit_reached": True,
                "draft_only": False,
                "status": "blocked",
                "draft_count": existing_draft_count,
                "max_draft_count": MAX_DRAFT_CLAIMS_PER_USER,
            }
    amount = self._resolve_amount(ontology.entities, context_json=context_json)
    occurred_at = self._resolve_occurred_at(ontology, context_json=context_json)
    expense_type = self._resolve_expense_type(ontology.entities, context_json=context_json)
    location = self._resolve_location(message=message, context_json=context_json)
    reason = self._resolve_reason(
        message=message,
        context_json=context_json,
        allow_message_fallback=is_new_claim,
    )
    attachment_count = len(attachment_names) or self._resolve_attachment_count(context_json)
    # Each final_* value prefers the freshly resolved value, then the
    # existing claim's value, then a placeholder default.
    final_amount = amount if amount is not None else (claim.amount if claim is not None else Decimal("0.00"))
    final_occurred_at = (
        occurred_at
        if occurred_at is not None
        else (claim.occurred_at if claim is not None else datetime.now(UTC))
    )
    final_expense_type = expense_type or (claim.expense_type if claim is not None else "other")
    final_location = location or (claim.location if claim is not None else "待补充")
    final_reason = reason or (claim.reason if claim is not None else "待补充")
    final_attachment_count = (
        attachment_count
        if attachment_count > 0
        else int(claim.invoice_count or 0)
        if claim is not None
        else 0
    )
    final_risk_flags = list(ontology.risk_flags) or (
        list(claim.risk_flags_json or []) if claim is not None else []
    )
    try:
        if claim is None:
            claim = ExpenseClaim(
                claim_no=self._generate_claim_no(final_occurred_at),
                employee_id=employee.id if employee is not None else None,
                employee_name=draft_owner_name,
                department_id=employee.organization_unit_id if employee is not None else None,
                department_name=self._resolve_department_name(
                    employee=employee,
                    context_json=context_json,
                ),
                project_code=self._resolve_project_code(ontology.entities),
                expense_type=final_expense_type,
                reason=final_reason,
                location=final_location,
                amount=final_amount,
                currency="CNY",
                invoice_count=final_attachment_count,
                occurred_at=final_occurred_at,
                status="draft",
                approval_stage="待提交",
                risk_flags_json=final_risk_flags,
            )
            self.db.add(claim)
        else:
            # Existing draft: keep stored identity fields when no employee
            # record was resolved for this request.
            claim.employee_id = employee.id if employee is not None else claim.employee_id
            claim.employee_name = (
                employee.name
                if employee is not None
                else self._resolve_employee_name(
                    ontology=ontology,
                    context_json=context_json,
                    user_id=user_id,
                    fallback=claim.employee_name,
                )
            )
            claim.department_id = employee.organization_unit_id if employee is not None else claim.department_id
            claim.department_name = self._resolve_department_name(
                employee=employee,
                context_json=context_json,
                fallback=claim.department_name,
            )
            claim.project_code = self._resolve_project_code(ontology.entities) or claim.project_code
            claim.expense_type = final_expense_type
            claim.reason = final_reason
            claim.location = final_location
            claim.amount = final_amount
            claim.invoice_count = final_attachment_count
            claim.occurred_at = final_occurred_at
            claim.status = "draft"
            claim.approval_stage = "待提交"
            claim.risk_flags_json = final_risk_flags
        # Flush before building items, which reference claim.id.
        # NOTE(review): presumably the id is assigned at flush time — confirm
        # against the model's primary-key default.
        self.db.flush()
        if context_documents or attachment_names:
            document_specs = self._build_context_item_specs(
                context_documents=context_documents,
                attachment_names=attachment_names,
                occurred_at=final_occurred_at,
                expense_type=final_expense_type,
                amount=final_amount,
                reason=final_reason,
                location=final_location,
            )
        else:
            document_specs = []
        # Per-document items are written only for new claims or explicit
        # association actions; otherwise a single primary item is upserted.
        if document_specs and (is_new_claim or review_action in DOCUMENT_ASSOCIATION_REVIEW_ACTIONS):
            if review_action == "link_to_existing_draft" and claim.items:
                self._append_document_items(
                    claim=claim,
                    item_specs=document_specs,
                )
            else:
                self._replace_claim_items(
                    claim=claim,
                    item_specs=document_specs,
                )
            self._sync_claim_from_items(claim)
        else:
            self._upsert_primary_item(
                claim=claim,
                occurred_at=final_occurred_at,
                expense_type=final_expense_type,
                amount=final_amount,
                reason=final_reason,
                location=final_location,
                attachment_names=attachment_names,
            )
            self._sync_claim_from_items(claim)
        self.db.commit()
        self.db.refresh(claim)
    except IntegrityError as exc:
        self.db.rollback()
        # Retry only for a new claim whose generated claim number collided.
        if (
            is_new_claim
            and retry_count < MAX_CLAIM_NO_RETRY_ATTEMPTS
            and self._is_claim_no_conflict_error(exc)
        ):
            retry_context = dict(context_json)
            retry_context["_claim_no_retry_count"] = retry_count + 1
            return self.upsert_draft_from_ontology(
                run_id=run_id,
                user_id=user_id,
                message=message,
                ontology=ontology,
                context_json=retry_context,
            )
        raise
    except Exception:
        self.db.rollback()
        raise
    self.audit_service.log_action(
        actor=user_id or claim.employee_name or "anonymous",
        action="expense_claim.draft_upsert",
        resource_type="expense_claim",
        resource_id=claim.id,
        before_json=before_json,
        after_json=self._serialize_claim(claim),
        request_id=run_id,
    )
    return {
        "message": (
            f"已{'创建' if is_new_claim else '更新'}报销草稿 {claim.claim_no},当前状态为 draft。"
            "你可以继续补充费用明细、客户单位和票据附件。"
        ),
        "draft_only": True,
        "claim_id": claim.id,
        "claim_no": claim.claim_no,
        "status": claim.status,
        "amount": float(claim.amount),
        "invoice_count": int(claim.invoice_count or 0),
    }
.where(or_(*owner_filters)) .order_by(ExpenseClaim.updated_at.desc(), ExpenseClaim.created_at.desc()) .limit(1) ) return self.db.scalar(stmt) def _should_defer_multi_document_association( self, *, context_json: dict[str, Any], review_action: str, association_candidate: ExpenseClaim | None, context_documents: list[dict[str, Any]], ) -> bool: if association_candidate is None: return False if review_action in DOCUMENT_ASSOCIATION_REVIEW_ACTIONS: return False document_count = max( len(context_documents), len(self._resolve_attachment_names(context_json)), self._resolve_attachment_count(context_json), ) return document_count > 1 def _resolve_context_documents(self, context_json: dict[str, Any]) -> list[dict[str, Any]]: documents = context_json.get("ocr_documents") if not isinstance(documents, list): documents = [] normalized: list[dict[str, Any]] = [] for index, item in enumerate(documents[:10], start=1): if not isinstance(item, dict): continue normalized.append( { "index": index, "filename": str(item.get("filename") or "").strip(), "summary": str(item.get("summary") or "").strip(), "text": str(item.get("text") or "").strip(), "document_type": str(item.get("document_type") or "").strip(), "scene_code": str(item.get("scene_code") or "").strip(), "scene_label": str(item.get("scene_label") or "").strip(), "document_fields": self._normalize_document_fields(item.get("document_fields")), } ) overrides = context_json.get("review_document_form_values") if not isinstance(overrides, list) or not normalized: return normalized override_map: dict[tuple[int, str], dict[str, Any]] = {} for item in overrides: if not isinstance(item, dict): continue filename = str(item.get("filename") or "").strip() index = int(item.get("index") or 0) if not filename and index <= 0: continue override_map[(index, filename)] = item for item in normalized: override = override_map.get((int(item["index"]), str(item["filename"]))) if override is None: override = override_map.get((int(item["index"]), "")) if 
override is None: continue summary = str(override.get("summary") or "").strip() scene_label = str(override.get("scene_label") or "").strip() fields = override.get("fields") if summary: item["summary"] = summary if scene_label: item["scene_label"] = scene_label if isinstance(fields, list): item["document_fields"] = self._normalize_document_fields(fields) return normalized @staticmethod def _normalize_document_fields(raw_fields: Any) -> list[dict[str, str]]: if not isinstance(raw_fields, list): return [] normalized: list[dict[str, str]] = [] for field in raw_fields: if not isinstance(field, dict): continue label = str(field.get("label") or "").strip() value = str(field.get("value") or "").strip() key = str(field.get("key") or label or "").strip() if not label or not value: continue normalized.append( { "key": key, "label": label, "value": value, } ) return normalized def _build_context_item_specs( self, *, context_documents: list[dict[str, Any]], attachment_names: list[str], occurred_at: datetime, expense_type: str, amount: Decimal, reason: str, location: str, ) -> list[dict[str, Any]]: specs: list[dict[str, Any]] = [] if context_documents: for document in context_documents: specs.append( { "item_date": self._resolve_document_item_date(document, fallback=occurred_at.date()), "item_type": self._resolve_document_item_type(document, fallback=expense_type), "item_reason": reason, "item_location": location, "item_amount": self._resolve_document_item_amount(document), "invoice_id": str(document.get("filename") or "").strip() or None, } ) elif attachment_names: for attachment_name in attachment_names: specs.append( { "item_date": occurred_at.date(), "item_type": expense_type, "item_reason": reason, "item_location": location, "item_amount": None, "invoice_id": attachment_name, } ) if not specs: return [] total_recognized = sum( spec["item_amount"] for spec in specs if isinstance(spec.get("item_amount"), Decimal) ) missing_specs = [spec for spec in specs if 
spec.get("item_amount") is None] if missing_specs: remaining = (amount - total_recognized).quantize(Decimal("0.01")) if remaining > Decimal("0.00"): missing_specs[0]["item_amount"] = remaining for spec in specs: if spec.get("item_amount") is None: spec["item_amount"] = Decimal("0.00") return specs def _replace_claim_items( self, *, claim: ExpenseClaim, item_specs: list[dict[str, Any]], ) -> None: existing_items = sorted( list(claim.items), key=lambda item: ( item.item_date or date.max, self._normalize_sort_datetime(item.created_at), ), ) for index, spec in enumerate(item_specs): item = existing_items[index] if index < len(existing_items) else None if item is None: item = ExpenseClaimItem(claim_id=claim.id) claim.items.append(item) self.db.add(item) item.item_date = spec["item_date"] item.item_type = spec["item_type"] item.item_reason = spec["item_reason"] item.item_location = spec["item_location"] item.item_amount = spec["item_amount"] item.invoice_id = spec["invoice_id"] for stale_item in existing_items[len(item_specs) :]: claim.items.remove(stale_item) self.db.delete(stale_item) def _append_document_items( self, *, claim: ExpenseClaim, item_specs: list[dict[str, Any]], ) -> None: existing_invoice_ids = { str(item.invoice_id or "").strip() for item in claim.items if str(item.invoice_id or "").strip() } for spec in item_specs: invoice_id = str(spec.get("invoice_id") or "").strip() if invoice_id and invoice_id in existing_invoice_ids: continue claim.items.append( ExpenseClaimItem( claim_id=claim.id, item_date=spec["item_date"], item_type=spec["item_type"], item_reason=spec["item_reason"], item_location=spec["item_location"], item_amount=spec["item_amount"], invoice_id=spec["invoice_id"], ) ) self.db.add(claim.items[-1]) if invoice_id: existing_invoice_ids.add(invoice_id) def _resolve_document_item_type(self, document: dict[str, Any], *, fallback: str) -> str: scene_code = str(document.get("scene_code") or "").strip() if scene_code in {"travel", "hotel", "transport", 
"meal", "office", "meeting", "training"}: return scene_code document_type = str(document.get("document_type") or "").strip() if document_type in {"flight_itinerary", "train_ticket"}: return "travel" if document_type in {"taxi_receipt", "parking_toll_receipt", "transport_receipt"}: return "transport" if document_type == "hotel_invoice": return "hotel" if document_type == "meal_receipt": return "meal" if document_type == "office_invoice": return "office" if document_type == "meeting_invoice": return "meeting" if document_type == "training_invoice": return "training" scene_label = str(document.get("scene_label") or "").strip() if "交通" in scene_label: return "transport" if "住宿" in scene_label: return "hotel" if "餐" in scene_label: return "meal" if "会务" in scene_label or "会议" in scene_label: return "meeting" if "培训" in scene_label: return "training" return fallback or "other" def _resolve_document_item_amount(self, document: dict[str, Any]) -> Decimal | None: for field in list(document.get("document_fields") or []): if not isinstance(field, dict): continue key = str(field.get("key") or "").strip().lower().replace("_", "") label = str(field.get("label") or "").replace(" ", "") value = self._parse_document_amount_value(str(field.get("value") or "")) if value is None: continue if key in { "amount", "totalamount", "paymentamount", "paidamount", "actualamount", } or any( token in label for token in ("金额", "价税合计", "合计", "总额", "总计", "票价", "支付金额", "实付金额", "实收金额") ): return value text = " ".join( [ str(document.get("summary") or "").strip(), str(document.get("text") or "").strip(), ] ).strip() return self._parse_document_amount_value(text) def _parse_document_amount_value(self, value: str) -> Decimal | None: raw_value = str(value or "").strip() if not raw_value: return None for pattern in DOCUMENT_AMOUNT_PATTERNS: match = pattern.search(raw_value) if not match: continue numeric = str(match.group(1) or "").replace(",", ".").strip() try: amount = 
Decimal(numeric).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): continue if amount > Decimal("0.00"): return amount return None def _resolve_document_item_date(self, document: dict[str, Any], *, fallback: date) -> date: for field in list(document.get("document_fields") or []): if not isinstance(field, dict): continue key = str(field.get("key") or "").strip().lower().replace("_", "") label = str(field.get("label") or "").replace(" ", "") value = str(field.get("value") or "").strip() if not value: continue if key in {"date", "time", "issuedat", "invoicedate"} or any( token in label for token in ("日期", "时间", "开票日期", "发生时间") ): parsed = self._parse_document_date(value) if parsed is not None: return parsed parsed = self._parse_document_date( " ".join( [ str(document.get("summary") or "").strip(), str(document.get("text") or "").strip(), ] ).strip() ) return parsed or fallback @staticmethod def _parse_document_date(value: str) -> date | None: match = DOCUMENT_DATE_PATTERN.search(str(value or "")) if not match: return None raw_value = str(match.group(1) or "").strip() normalized = raw_value.replace("年", "-").replace("月", "-").replace("日", "") normalized = normalized.replace("/", "-").replace(".", "-") parts = [part for part in normalized.split("-") if part] if len(parts) != 3: return None try: return date(int(parts[0]), int(parts[1]), int(parts[2])) except ValueError: return None def _upsert_primary_item( self, *, claim: ExpenseClaim, occurred_at: datetime, expense_type: str, amount: Decimal, reason: str, location: str, attachment_names: list[str], ) -> None: item = claim.items[0] if claim.items else None if item is None: item = ExpenseClaimItem( claim_id=claim.id, item_date=occurred_at.date(), item_type=expense_type, item_reason=reason, item_location=location, item_amount=amount, invoice_id=attachment_names[0] if attachment_names else None, ) claim.items.append(item) self.db.add(item) return item.item_date = occurred_at.date() item.item_type = 
expense_type item.item_reason = reason item.item_location = location item.item_amount = amount item.invoice_id = attachment_names[0] if attachment_names else item.invoice_id def _generate_claim_no(self, occurred_at: datetime) -> str: month_code = occurred_at.strftime("%Y%m") prefix = f"EXP-{month_code}-" existing_claim_nos = list( self.db.scalars( select(ExpenseClaim.claim_no).where(ExpenseClaim.claim_no.like(f"{prefix}%")) ) ) max_suffix = 0 for claim_no in existing_claim_nos: normalized = str(claim_no or "").strip() if not normalized.startswith(prefix): continue suffix = normalized[len(prefix):] if not suffix.isdigit(): continue max_suffix = max(max_suffix, int(suffix)) return f"{prefix}{max_suffix + 1:03d}" @staticmethod def _resolve_claim_no_retry_count(context_json: dict[str, Any]) -> int: try: return max(0, int(context_json.get("_claim_no_retry_count") or 0)) except (TypeError, ValueError): return 0 @staticmethod def _is_claim_no_conflict_error(exc: IntegrityError) -> bool: message = str(exc).lower() return ( "claim_no" in message and ( "unique" in message or "duplicate key" in message or "ix_expense_claims_claim_no" in message or "expense_claims.claim_no" in message ) ) def _count_draft_claims_for_owner( self, *, employee: Employee | None, user_id: str | None, ) -> int: owner_filters = self._build_draft_owner_filters( employee=employee, user_id=user_id, ) if not owner_filters: return 0 stmt = ( select(func.count()) .select_from(ExpenseClaim) .where(ExpenseClaim.status == "draft") .where(or_(*owner_filters)) ) return int(self.db.scalar(stmt) or 0) def _build_draft_owner_filters( self, *, employee: Employee | None, user_id: str | None, ) -> list[Any]: conditions: list[Any] = [] seen: set[tuple[str, str]] = set() def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized or normalized == "待补充": return marker = (field_name, normalized.lower()) if marker in seen: return seen.add(marker) if field_name == 
"employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self._employee_name_is_unique(employee): add_condition("employee_name", employee.name) add_condition("employee_name", user_id) return conditions def _resolve_employee( self, *, ontology: OntologyParseResult, context_json: dict[str, Any], user_id: str | None, ) -> Employee | None: normalized_user_id = str(user_id or "").strip() if normalized_user_id: stmt = ( select(Employee) .options(selectinload(Employee.organization_unit), selectinload(Employee.manager)) .where(func.lower(Employee.email) == normalized_user_id.lower()) .limit(1) ) employee = self.db.scalar(stmt) if employee is not None: return employee employee_name = self._resolve_employee_name( ontology=ontology, context_json=context_json, user_id=None, ) if not employee_name: return None stmt = ( select(Employee) .options(selectinload(Employee.organization_unit), selectinload(Employee.manager)) .where(Employee.name == employee_name) .limit(1) ) return self.db.scalar(stmt) @staticmethod def _resolve_employee_name( *, ontology: OntologyParseResult, context_json: dict[str, Any], user_id: str | None, fallback: str = "待补充", ) -> str: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("reporter_name", "employee_name", "claimant_name"): value = str(review_form_values.get(key) or "").strip() if value: return value for item in ontology.entities: if item.type == "employee" and item.value.strip(): return item.value.strip() for key in ("name", "user_name", "employee_name"): value = str(context_json.get(key) or "").strip() if value: return value return str(user_id or fallback).strip() or fallback @staticmethod def _resolve_department_name( *, employee: Employee | None, context_json: dict[str, Any], fallback: 
str = "待补充", ) -> str: if employee is not None and employee.organization_unit is not None: return employee.organization_unit.name request_context = context_json.get("request_context") if isinstance(request_context, dict): for key in ("department", "department_name", "deptName"): value = str(request_context.get(key) or "").strip() if value: return value for key in ("department_name", "department"): value = str(context_json.get(key) or "").strip() if value: return value return fallback @staticmethod def _resolve_project_code(entities: list[OntologyEntity]) -> str | None: for item in entities: if item.type == "project" and item.normalized_value.strip(): return item.normalized_value.strip() return None @staticmethod def _resolve_expense_type( entities: list[OntologyEntity], *, context_json: dict[str, Any], ) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): compact = str( review_form_values.get("expense_type") or review_form_values.get("reimbursement_type") or "" ).replace(" ", "") if compact: if "招待" in compact or ("客户" in compact and any(word in compact for word in ("吃饭", "宴请", "请客", "用餐"))): return "entertainment" if any(word in compact for word in ("差旅", "出差", "机票", "行程")): return "travel" if any(word in compact for word in ("住宿", "酒店", "宾馆")): return "hotel" if any(word in compact for word in ("交通", "打车", "网约车", "出租车", "停车", "车费")): return "transport" if any(word in compact for word in ("餐费", "用餐", "午餐", "晚餐", "早餐", "伙食")): return "meal" if "会务" in compact: return "meeting" if any(word in compact for word in ("办公费", "办公用品", "文具", "耗材", "办公耗材", "打印纸", "办公设备", "键盘", "鼠标", "白板")): return "office" if any(word in compact for word in ("培训费", "培训", "讲师费", "课时费", "课程费")): return "training" if any(word in compact for word in ("通讯费", "话费", "流量费", "宽带费")): return "communication" if any(word in compact for word in ("福利费", "团建", "慰问", "节日福利", "体检费")): return "welfare" for item in entities: if item.type == 
"expense_type": normalized = item.normalized_value.strip() if normalized: return normalized return None @staticmethod def _resolve_reason( *, message: str, context_json: dict[str, Any], allow_message_fallback: bool, ) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("reason", "business_reason"): value = str(review_form_values.get(key) or "").strip() if value: return value explicit_text = context_json.get("user_input_text") if isinstance(explicit_text, str): normalized_explicit_text = explicit_text.strip() if normalized_explicit_text: return normalized_explicit_text[:500] return None request_context = context_json.get("request_context") if ( isinstance(request_context, dict) and str(context_json.get("entry_source") or "").strip() == "detail" ): for key in ("reason", "title"): value = str(request_context.get(key) or "").strip() if value: return value if not allow_message_fallback: return None normalized_message = str(message or "").strip() compact_message = re.sub(r"\s+", "", normalized_message) if compact_message.startswith(SYSTEM_GENERATED_REASON_PREFIXES): return None return normalized_message[:500] or None @staticmethod def _resolve_location(*, message: str, context_json: dict[str, Any]) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("business_location", "location"): value = str(review_form_values.get(key) or "").strip() if value: return value request_context = context_json.get("request_context") if ( isinstance(request_context, dict) and str(context_json.get("entry_source") or "").strip() == "detail" ): for key in ("city", "location"): value = str(request_context.get(key) or "").strip() if value: return value compact = str(message or "").replace(" ", "") if "客户现场" in compact: return "客户现场" return None @staticmethod def _resolve_occurred_at( ontology: OntologyParseResult, *, context_json: dict[str, Any], ) 
-> datetime | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("occurred_date", "time_range", "business_time"): value = str(review_form_values.get(key) or "").strip() if not value: continue try: parsed = date.fromisoformat(value) return datetime(parsed.year, parsed.month, parsed.day, tzinfo=UTC) except ValueError: continue start_date = ontology.time_range.start_date if start_date: try: parsed = date.fromisoformat(start_date) return datetime(parsed.year, parsed.month, parsed.day, tzinfo=UTC) except ValueError: pass return None @staticmethod def _resolve_amount( entities: list[OntologyEntity], *, context_json: dict[str, Any], ) -> Decimal | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): raw_value = str(review_form_values.get("amount") or "").strip() if raw_value: compact = raw_value.replace("元", "").replace(",", "").strip() try: return Decimal(compact).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): pass for item in entities: if item.type != "amount" or item.role == "threshold": continue try: return Decimal(item.normalized_value).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): continue return None @staticmethod def _resolve_attachment_names(context_json: dict[str, Any]) -> list[str]: names = context_json.get("attachment_names") if not isinstance(names, list): return [] return [str(name).strip() for name in names if str(name).strip()] def _resolve_attachment_count(self, context_json: dict[str, Any]) -> int: names = self._resolve_attachment_names(context_json) if names: return len(names) try: return max(0, int(context_json.get("attachment_count") or 0)) except (TypeError, ValueError): return 0 def _get_claim_item_or_raise( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> tuple[ExpenseClaim | None, ExpenseClaimItem]: claim = self.get_claim(claim_id, current_user) if claim is None: 
return None, None # type: ignore[return-value] item = next((entry for entry in claim.items if entry.id == item_id), None) if item is None: raise LookupError("Item not found") return claim, item def _get_attachment_storage_root(self) -> Path: return (get_settings().resolved_storage_root_dir / "expense_claims").resolve() def _build_item_attachment_dir(self, claim_id: str, item_id: str) -> Path: return (self._get_attachment_storage_root() / claim_id / item_id).resolve() def _delete_claim_attachment_root(self, claim_id: str) -> None: shutil.rmtree((self._get_attachment_storage_root() / claim_id).resolve(), ignore_errors=True) @staticmethod def _normalize_attachment_filename(filename: str | None) -> str: normalized = Path(str(filename or "").strip()).name normalized = re.sub(r"[^\w.\-\u4e00-\u9fff]+", "_", normalized).strip("._") suffix = Path(normalized).suffix if normalized: return normalized return f"attachment{suffix or '.bin'}" def _resolve_attachment_path(self, storage_key: str | None) -> Path | None: normalized = str(storage_key or "").strip() if not normalized: return None root = self._get_attachment_storage_root() path = (root / normalized).resolve() try: path.relative_to(root) except ValueError as exc: raise FileNotFoundError("Attachment path is invalid") from exc return path def _to_attachment_storage_key(self, file_path: Path) -> str: root = self._get_attachment_storage_root() return file_path.resolve().relative_to(root).as_posix() def _resolve_item_attachment_content(self, item: ExpenseClaimItem) -> tuple[Path, str, str]: file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): raise FileNotFoundError("Attachment not found") metadata = self._read_attachment_meta(file_path) filename = str(metadata.get("file_name") or file_path.name) media_type = self._resolve_attachment_media_type( filename, fallback=str(metadata.get("media_type") or ""), ) return file_path, media_type, filename def 
_delete_item_attachment_files(self, item: ExpenseClaimItem) -> None: file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None: return root = self._get_attachment_storage_root() if file_path.parent == root: file_path.unlink(missing_ok=True) self._attachment_meta_path(file_path).unlink(missing_ok=True) return shutil.rmtree(file_path.parent, ignore_errors=True) @staticmethod def _attachment_meta_path(file_path: Path) -> Path: return file_path.with_name(f"{file_path.name}.meta.json") def _write_attachment_meta(self, file_path: Path, payload: dict[str, Any]) -> None: meta_path = self._attachment_meta_path(file_path) meta_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def _read_attachment_meta(self, file_path: Path) -> dict[str, Any]: meta_path = self._attachment_meta_path(file_path) if not meta_path.exists(): return {} try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {} return payload if isinstance(payload, dict) else {} def _build_attachment_preview_meta( self, *, file_path: Path, media_type: str, ocr_document: Any | None, ) -> dict[str, Any]: filename = file_path.name storage_key = self._to_attachment_storage_key(file_path) preview_kind = self._resolve_preview_kind(media_type, filename) preview_data_url = str(getattr(ocr_document, "preview_data_url", "") or "").strip() preview_source_kind = str(getattr(ocr_document, "preview_kind", "") or "").strip() if preview_source_kind == "image" and preview_data_url: preview_asset = self._write_preview_asset_from_data_url( attachment_dir=file_path.parent, original_filename=filename, preview_data_url=preview_data_url, ) if preview_asset is not None: preview_path, preview_media_type, preview_file_name = preview_asset return { "previewable": True, "preview_kind": "image", "preview_storage_key": self._to_attachment_storage_key(preview_path), "preview_media_type": preview_media_type, "preview_file_name": 
preview_file_name, } if preview_kind: return { "previewable": True, "preview_kind": preview_kind, "preview_storage_key": storage_key, "preview_media_type": media_type, "preview_file_name": filename, } return { "previewable": False, "preview_kind": "", "preview_storage_key": "", "preview_media_type": "", "preview_file_name": "", } def _resolve_item_attachment_preview_content(self, item: ExpenseClaimItem) -> tuple[Path, str, str]: file_path, media_type, filename = self._resolve_item_attachment_content(item) metadata = self._read_attachment_meta(file_path) preview_storage_key = str(metadata.get("preview_storage_key") or "").strip() preview_file_name = str(metadata.get("preview_file_name") or "").strip() preview_media_type = str(metadata.get("preview_media_type") or "").strip() if preview_storage_key: preview_path = self._resolve_attachment_path(preview_storage_key) if preview_path is not None and preview_path.exists(): resolved_name = preview_file_name or preview_path.name resolved_media_type = self._resolve_attachment_media_type( resolved_name, fallback=preview_media_type, ) return preview_path, resolved_media_type, resolved_name if self._is_previewable_media_type(media_type, filename): return file_path, media_type, filename raise FileNotFoundError("Attachment preview not found") def _build_attachment_payload(self, item: ExpenseClaimItem) -> dict[str, Any]: file_path, media_type, filename = self._resolve_item_attachment_content(item) metadata = self._read_attachment_meta(file_path) uploaded_at_value = metadata.get("uploaded_at") uploaded_at = None if isinstance(uploaded_at_value, str) and uploaded_at_value.strip(): try: uploaded_at = datetime.fromisoformat(uploaded_at_value) except ValueError: uploaded_at = None analysis = metadata.get("analysis") if not isinstance(analysis, dict): analysis = None document_info = metadata.get("document_info") if not isinstance(document_info, dict): document_info = None requirement_check = metadata.get("requirement_check") if not 
isinstance(requirement_check, dict): requirement_check = None preview_kind = str(metadata.get("preview_kind") or "").strip() previewable = bool(metadata.get("previewable", self._is_previewable_media_type(media_type, filename))) preview_url = self._build_attachment_preview_client_path(item.claim_id, item.id) if previewable else "" return { "file_name": str(metadata.get("file_name") or filename), "storage_key": str(item.invoice_id or ""), "media_type": str(metadata.get("media_type") or media_type), "size_bytes": int(metadata.get("size_bytes") or file_path.stat().st_size), "uploaded_at": uploaded_at, "previewable": previewable, "preview_kind": preview_kind or self._resolve_preview_kind(media_type, filename), "preview_url": preview_url, "analysis": analysis, "document_info": document_info, "requirement_check": requirement_check, } @staticmethod def _resolve_preview_kind(media_type: str | None, filename: str) -> str: resolved = str(media_type or "").strip() or (mimetypes.guess_type(filename)[0] or "") if resolved.startswith("image/"): return "image" if resolved == "application/pdf": return "pdf" return "" @staticmethod def _decode_data_url(payload: str) -> tuple[str, bytes] | None: normalized = str(payload or "").strip() matched = re.match(r"^data:(?P[\w.+-]+/[\w.+-]+);base64,(?P.+)$", normalized, flags=re.DOTALL) if not matched: return None try: content = base64.b64decode(matched.group("body"), validate=True) except (binascii.Error, ValueError): return None return matched.group("media"), content def _write_preview_asset_from_data_url( self, *, attachment_dir: Path, original_filename: str, preview_data_url: str, ) -> tuple[Path, str, str] | None: decoded = self._decode_data_url(preview_data_url) if decoded is None: return None preview_media_type, preview_content = decoded suffix = mimetypes.guess_extension(preview_media_type) or ".bin" preview_name = f"{Path(original_filename).stem}.preview{suffix}" preview_path = attachment_dir / preview_name 
preview_path.write_bytes(preview_content) return preview_path, preview_media_type, preview_name @staticmethod def _build_attachment_preview_client_path(claim_id: str, item_id: str) -> str: return ( "/reimbursements/claims/" f"{quote(str(claim_id or '').strip(), safe='')}" f"/items/{quote(str(item_id or '').strip(), safe='')}/attachment/preview" ) @staticmethod def _resolve_attachment_media_type(filename: str, *, fallback: str | None = None) -> str: guessed = mimetypes.guess_type(filename)[0] return str(guessed or fallback or "application/octet-stream") @staticmethod def _is_previewable_media_type(media_type: str | None, filename: str) -> bool: resolved = str(media_type or "").strip() or (mimetypes.guess_type(filename)[0] or "") return resolved.startswith("image/") or resolved == "application/pdf" @staticmethod def _resolve_attachment_display_name(storage_key: str | None) -> str: return Path(str(storage_key or "").strip()).name def _build_attachment_document_info(self, document: Any) -> dict[str, Any]: insight = build_document_insight( filename=str(getattr(document, "filename", "") or ""), summary=str(getattr(document, "summary", "") or ""), text=str(getattr(document, "text", "") or ""), ) raw_fields = list(getattr(document, "document_fields", []) or []) normalized_fields: list[dict[str, str]] = [] for item in raw_fields: key = "" label = "" value = "" if isinstance(item, dict): key = str(item.get("key") or "").strip() label = str(item.get("label") or "").strip() value = str(item.get("value") or "").strip() else: key = str(getattr(item, "key", "") or "").strip() label = str(getattr(item, "label", "") or "").strip() value = str(getattr(item, "value", "") or "").strip() if key and label and value: normalized_fields.append( { "key": key, "label": label, "value": value, } ) if not normalized_fields: normalized_fields = [ { "key": field.key, "label": field.label, "value": field.value, } for field in insight.fields if field.value ] document_type = str(getattr(document, 
"document_type", "") or "").strip() if document_type in {"", "other"}: document_type = insight.document_type document_type_label = str(getattr(document, "document_type_label", "") or "").strip() if not document_type_label or document_type_label == "其他单据": document_type_label = insight.document_type_label scene_code = str(getattr(document, "scene_code", "") or "").strip() if scene_code in {"", "other"}: scene_code = insight.scene_code scene_label = str(getattr(document, "scene_label", "") or "").strip() if not scene_label or scene_label == "其他票据": scene_label = insight.scene_label return { "document_type": document_type, "document_type_label": document_type_label, "scene_code": scene_code, "scene_label": scene_label, "fields": normalized_fields, } def _build_attachment_requirement_check( self, *, item: ExpenseClaimItem, document_info: dict[str, Any], ) -> dict[str, Any]: expense_type = str(item.item_type or "").strip().lower() or "other" policy = self._get_expense_scene_policy(expense_type) expense_label = policy.label if policy is not None else self._resolve_expense_type_label(expense_type) allowed_scenes = set(policy.allowed_scene_codes) if policy is not None else set() allowed_document_types = set(policy.allowed_document_types) if policy is not None else set() allowed_scene_labels = [self._resolve_document_scene_label(code) for code in sorted(allowed_scenes)] allowed_document_type_labels = [ resolve_document_type_label(document_type) for document_type in sorted(allowed_document_types) ] recognized_scene_code = str(document_info.get("scene_code") or "other").strip() or "other" recognized_scene_label = str( document_info.get("scene_label") or self._resolve_document_scene_label(recognized_scene_code) ).strip() recognized_document_type = str(document_info.get("document_type") or "other").strip() or "other" recognized_document_type_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" matches = ( (not allowed_scenes and not 
@staticmethod
def _resolve_document_scene_label(scene_code: str) -> str:
    """Return the display label for a document scene code.

    The code is trimmed and lower-cased before the lookup in
    ``DOCUMENT_SCENE_LABELS``; unknown or empty codes yield "其他票据".
    """
    lookup_key = str(scene_code or "").strip().lower()
    try:
        return DOCUMENT_SCENE_LABELS[lookup_key]
    except KeyError:
        return "其他票据"
seen.add(candidate) values.append(candidate) for pattern in ( r"(?:金额|价税合计|合计|小写|实收金额|支付金额|订单金额|总额|票价|房费|餐费)[::\s¥¥]*([0-9]{1,6}(?:[.,][0-9]{1,2})?)", r"[¥¥]\s*([0-9]{1,6}(?:[.,][0-9]{1,2})?)", r"([0-9]{1,6}(?:[.,][0-9]{1,2})?)\s*元", ): for raw in re.findall(pattern, text, flags=re.IGNORECASE): append_candidate(raw) if values: return values for raw in re.findall(r"(? bool: return bool(re.search(r"(20\d{2}[年/\-.]\d{1,2}[月/\-.]\d{1,2}日?)", text)) @staticmethod def _normalize_match_text(text: str) -> str: return re.sub(r"\s+", "", str(text or "")).lower() @staticmethod def _resolve_expense_type_label(expense_type: str | None) -> str: normalized = str(expense_type or "").strip().lower() return EXPENSE_TYPE_LABELS.get(normalized, "其他") def _resolve_allowed_document_scenes(self, expense_type: str | None) -> set[str]: normalized = str(expense_type or "").strip().lower() policy = self._get_expense_scene_policy(normalized) return set(policy.allowed_scene_codes) if policy is not None else set() def _detect_expense_scenes(self, text: str) -> dict[str, list[str]]: normalized = self._normalize_match_text(text) if not normalized: return {} matches: dict[str, list[str]] = {} for scene, keywords in EXPENSE_SCENE_KEYWORDS.items(): matched = [keyword for keyword in keywords if keyword in normalized] if matched: matches[scene] = matched[:3] return matches def _format_scene_labels(self, scene_codes: set[str]) -> str: labels = [self._resolve_expense_type_label(code) for code in scene_codes] unique_labels = list(dict.fromkeys(label for label in labels if label)) return "、".join(unique_labels) if unique_labels else "其他" def _build_purpose_mismatch_point( self, *, item: ExpenseClaimItem, document_scenes: set[str], ) -> str | None: if not document_scenes: return None allowed_scenes = self._resolve_allowed_document_scenes(item.item_type) reason_text = str(item.item_reason or "").strip() reason_scenes = set(self._detect_expense_scenes(reason_text).keys()) document_scene_labels = 
self._format_scene_labels(document_scenes) if reason_scenes and document_scenes.isdisjoint(reason_scenes): return ( f"用途字段:用户填写用途“{reason_text[:24]}”与票据内容不一致," f"当前附件更像{document_scene_labels}相关材料。" ) if allowed_scenes and document_scenes.isdisjoint(allowed_scenes): expense_label = self._resolve_expense_type_label(item.item_type) return f"用途字段:当前费用项目为{expense_label},但附件内容更像{document_scene_labels}相关票据。" return None def _build_fallback_attachment_analysis( self, *, media_type: str | None, item: ExpenseClaimItem, ) -> dict[str, Any]: return { "severity": "medium", "label": "中风险", "headline": "AI提示:附件已上传,待识别结果", "summary": "附件已成功保存,但当前尚未拿到有效识别结果,建议人工先核对票据内容。", "points": [ f"附件格式:{self._resolve_attachment_media_type('attachment', fallback=media_type)}", f"费用金额:当前明细金额为 {item.item_amount} 元", ], "suggestion": "建议打开附件确认金额、日期和票据类型是否完整,再继续提交审批。", } def _build_failed_ocr_attachment_analysis( self, *, media_type: str | None, error_message: str, item: ExpenseClaimItem, ) -> dict[str, Any]: return { "severity": "medium", "label": "中风险", "headline": "AI提示:附件已上传,但识别失败", "summary": "文件已经保存成功,但本次 AI 识别未完成,因此无法给出完整票据核验结论。", "points": [ f"识别异常:{error_message or 'OCR 服务暂不可用'}", f"费用金额:当前明细金额为 {item.item_amount} 元", f"附件格式:{self._resolve_attachment_media_type('attachment', fallback=media_type)}", ], "suggestion": "建议重新上传更清晰的票据图片,或稍后重试识别后再提交。", } def _build_attachment_analysis( self, *, document: Any, item: ExpenseClaimItem, document_info: dict[str, Any] | None = None, requirement_check: dict[str, Any] | None = None, ) -> dict[str, Any]: warnings = [str(value).strip() for value in list(getattr(document, "warnings", []) or []) if str(value).strip()] text = " ".join( [ str(getattr(document, "summary", "") or "").strip(), str(getattr(document, "text", "") or "").strip(), ] ).strip() compact_text = text.replace(" ", "") avg_score = float(getattr(document, "avg_score", 0.0) or 0.0) line_count = int(getattr(document, "line_count", 0) or 0) document_info = document_info or 
self._build_attachment_document_info(document) requirement_check = requirement_check or self._build_attachment_requirement_check( item=item, document_info=document_info, ) document_scene_matches = self._detect_expense_scenes(text) purpose_mismatch_point = self._build_purpose_mismatch_point( item=item, document_scenes=set(document_scene_matches.keys()), ) recognized_document_type = str(document_info.get("document_type") or "other").strip().lower() or "other" recognized_document_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" requirement_matches = bool(requirement_check.get("matches")) mismatch_severity = str(requirement_check.get("mismatch_severity") or "high").strip().lower() or "high" has_ticket_keyword = any( keyword in compact_text for keyword in ( "发票", "票据", "增值税", "电子行程单", "购买方", "销售方", "税额", "价税", "票号", "发票代码", "凭证", ) ) amount_candidates = self._extract_amount_candidates(text) item_amount = Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) has_matching_amount = any(abs(candidate - item_amount) <= Decimal("1.00") for candidate in amount_candidates) has_date_text = self._has_date_like_text(text) amount_mismatch = bool(amount_candidates) and item_amount > Decimal("0.00") and not has_matching_amount points: list[str] = [] if warnings: points.append(f"识别提示:{warnings[0]}") if line_count == 0 or not compact_text: points.append("附件内容:未识别到有效文字,当前附件更像普通图片或内容过于模糊。") if recognized_document_type == "other" and not has_ticket_keyword: points.append("票据类型:未识别到发票、票据、电子行程单等关键字,暂无法判断票据类型。") if not amount_candidates: points.append("金额字段:未识别到可用于核对的金额。") elif amount_mismatch: candidate_text = "、".join(str(candidate) for candidate in amount_candidates[:3]) points.append(f"金额字段:附件识别金额 {candidate_text} 元与报销金额 {item_amount} 元不一致。") if not has_date_text: points.append("日期字段:未识别到开票日期或业务发生日期。") if not requirement_matches: points.append(f"附件类型要求:{requirement_check.get('message')}") if purpose_mismatch_point: 
points.append(purpose_mismatch_point) if avg_score and avg_score < 0.72: points.append(f"识别质量:OCR 置信度偏低({avg_score:.0%}),可能影响票据核验准确性。") issue_count = len(points) if issue_count == 0: return { "severity": "pass", "label": "AI提示符合条件", "headline": "AI提示:附件符合基础校验条件", "summary": "已识别到票据类型和关键字段,且符合当前费用场景的附件要求。", "points": [ f"票据类型:已识别为{recognized_document_label}。", f"附件类型要求:{requirement_check.get('message')}", f"金额字段:已识别到与当前明细接近的金额 {item_amount} 元。", ], "suggestion": "建议继续核对报销分类、费用说明和业务场景是否一致。", } severity = "low" label = "低风险" headline = "AI提示:附件存在轻微待核对项" summary = "当前附件已识别出部分票据要素,但仍建议人工继续复核。" if ( line_count == 0 or not compact_text or (recognized_document_type == "other" and not has_ticket_keyword and issue_count >= 2) or (not requirement_matches and mismatch_severity == "high") or (purpose_mismatch_point and amount_mismatch) ): severity = "high" label = "高风险" headline = "AI提示:附件不符合票据校验条件" summary = "当前附件存在明显异常,票据类型与当前费用场景不匹配,或无法作为有效报销材料。" elif ( purpose_mismatch_point or amount_mismatch or issue_count >= 2 or warnings or (avg_score and avg_score < 0.72) or (not requirement_matches and mismatch_severity in {"medium", "low"}) ): severity = "medium" label = "中风险" headline = "AI提示:附件存在明显待整改项" summary = "当前附件可见部分内容,但金额、用途、日期或附件类型仍有缺失或不一致。" suggestion = { "high": "建议过滤当前不匹配的票据,重新上传符合当前费用场景的清晰原件。", "medium": "建议根据风险点补齐清晰票据,或修正金额、日期、费用说明后再提交。", "low": "建议人工再次核对金额和业务说明,确认后可继续流转。", }[severity] return { "severity": severity, "label": label, "headline": headline, "summary": summary, "points": points, "suggestion": suggestion, } @staticmethod def _serialize_claim(claim: ExpenseClaim) -> dict[str, Any]: return { "id": claim.id, "claim_no": claim.claim_no, "employee_name": claim.employee_name, "department_name": claim.department_name, "project_code": claim.project_code, "expense_type": claim.expense_type, "reason": claim.reason, "location": claim.location, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), "status": claim.status, "approval_stage": 
claim.approval_stage, "risk_flags_json": list(claim.risk_flags_json or []), } @staticmethod def _normalize_optional_text(value: str | None, *, fallback: str = "", allow_empty: bool = False) -> str | None: normalized = str(value or "").strip() if normalized: return normalized if allow_empty: return None return fallback @staticmethod def _normalize_sort_datetime(value: datetime | None) -> datetime: if value is None: return datetime.max.replace(tzinfo=UTC) if value.tzinfo is None: return value.replace(tzinfo=UTC) return value @staticmethod def _is_missing_value(value: Any) -> bool: text = str(value or "").strip() if not text: return True compact = text.replace(" ", "") return compact in {"待补充", "暂无", "无", "未知", "处理中"} def _ensure_draft_claim(self, claim: ExpenseClaim) -> None: normalized_status = str(claim.status or "").strip().lower() if normalized_status not in {"draft", "supplement", "returned"}: raise ValueError("只有草稿、待补充或退回待提交状态的报销单才允许执行该操作。") def _run_ai_submission_review(self, claim: ExpenseClaim) -> dict[str, Any]: base_flags = list(claim.risk_flags_json or []) attachment_flags = [ flag for flag in base_flags if isinstance(flag, dict) and str(flag.get("source") or "").strip() == "attachment_analysis" ] preserved_flags = [ flag for flag in base_flags if not (isinstance(flag, dict) and str(flag.get("source") or "").strip() == "submission_review") ] review_flags: list[dict[str, Any]] = [] attention_reasons: list[str] = [] high_attachment_flags = [ flag for flag in attachment_flags if str(flag.get("severity") or "").strip().lower() == "high" ] medium_attachment_flags = [ flag for flag in attachment_flags if str(flag.get("severity") or "").strip().lower() == "medium" ] if high_attachment_flags: attention_reasons.append("存在高风险票据,需审批人重点复核。") review_flags.append( { "source": "submission_review", "severity": "high", "label": "AI预审重点复核", "message": ( f"AI预审发现 {len(high_attachment_flags)} 条高风险附件," "已随单流转给审批人重点复核。" ), } ) elif medium_attachment_flags: review_flags.append( 
{ "source": "submission_review", "severity": "medium", "label": "AI预审提醒", "message": f"AI预审发现 {len(medium_attachment_flags)} 条中风险附件,已随单流转给审批人复核。", } ) manager_name = self._resolve_claim_manager_name(claim) if not manager_name: attention_reasons.append("未识别到该员工的直属领导,需审批环节补充分配。") review_flags.append( { "source": "submission_review", "severity": "medium", "label": "审批链待分配", "message": "AI预审发现直属领导缺失,已提交到审批环节等待分配或复核。", } ) historical_risk_count = self._count_recent_risky_claims(claim) if historical_risk_count >= AI_REVIEW_REPEAT_RISK_BLOCK_COUNT: review_flags.append( { "source": "submission_review", "severity": "medium", "label": "历史风险偏高", "message": ( f"近 {AI_REVIEW_LOOKBACK_DAYS} 天内该员工已有 {historical_risk_count} 笔带风险标记的报销," "本次已追加到审批链重点关注。" ), } ) elif historical_risk_count >= AI_REVIEW_REPEAT_RISK_WARNING_COUNT: review_flags.append( { "source": "submission_review", "severity": "low", "label": "历史风险提醒", "message": ( f"近 {AI_REVIEW_LOOKBACK_DAYS} 天内该员工已有 {historical_risk_count} 笔带风险标记的报销," "建议直属领导重点复核。" ), } ) travel_review = self._run_travel_policy_review(claim) attention_reasons.extend(travel_review["blocking_reasons"]) review_flags.extend(travel_review["flags"]) scene_policy_review = self._run_scene_policy_review(claim) attention_reasons.extend(scene_policy_review["blocking_reasons"]) review_flags.extend(scene_policy_review["flags"]) platform_risk_review = self.evaluate_platform_risk_rules(claim) attention_reasons.extend(platform_risk_review["blocking_reasons"]) review_flags.extend(platform_risk_review["flags"]) if attention_reasons: summary_message = "AI预审发现需审批重点关注事项:" + ";".join( dict.fromkeys(attention_reasons) ) review_flags.insert( 0, { "source": "submission_review", "severity": "medium", "label": "AI预审重点复核", "message": summary_message, }, ) return { "status": "submitted", "approval_stage": "直属领导审批", "risk_flags": preserved_flags + review_flags, "message": ( f"报销单 {claim.claim_no} 已完成 AI预审," f"现已提交给直属领导 {manager_name or '审批人'} 审批。" ), "passed": True, } 
@staticmethod def _resolve_claim_manager_name(claim: ExpenseClaim) -> str: if claim.employee is not None: if claim.employee.manager is not None and claim.employee.manager.name: return str(claim.employee.manager.name).strip() if claim.employee.organization_unit is not None and claim.employee.organization_unit.manager_name: return str(claim.employee.organization_unit.manager_name).strip() return "" def _count_recent_risky_claims(self, claim: ExpenseClaim) -> int: filters = [] if claim.employee_id: filters.append(ExpenseClaim.employee_id == claim.employee_id) elif claim.employee_name: filters.append(ExpenseClaim.employee_name == claim.employee_name) if not filters: return 0 since = datetime.now(UTC) - timedelta(days=AI_REVIEW_LOOKBACK_DAYS) stmt = ( select(ExpenseClaim) .where(or_(*filters)) .where(ExpenseClaim.id != claim.id) .where(ExpenseClaim.occurred_at >= since) ) recent_claims = list(self.db.scalars(stmt).all()) return sum(1 for item in recent_claims if list(item.risk_flags_json or [])) def evaluate_platform_risk_rules( self, claim: ExpenseClaim, *, rule_codes: list[str] | None = None, ) -> dict[str, list[Any]]: manifests = self._load_platform_risk_rule_manifests(rule_codes=rule_codes) if not manifests: return {"flags": [], "blocking_reasons": []} contexts = self._build_claim_attachment_contexts(claim) flags: list[dict[str, Any]] = [] blocking_reasons: list[str] = [] for manifest in manifests: if not self._risk_manifest_applies_to_claim(manifest, claim=claim, contexts=contexts): continue flag = self._evaluate_platform_risk_manifest( manifest, claim=claim, contexts=contexts, ) if flag is None: continue flags.append(flag) severity = str(flag.get("severity") or "").strip().lower() action = str(flag.get("action") or "").strip().lower() if severity == "high" or action == "block": blocking_reasons.append(str(flag.get("message") or flag.get("label") or "").strip()) deduplicated_reasons = list( dict.fromkeys(reason for reason in blocking_reasons if reason) ) return 
def _load_platform_risk_rule_manifests(
    self,
    *,
    rule_codes: list[str] | None,
) -> list[dict[str, Any]]:
    """Load the enabled risk-rule manifests, optionally limited to ``rule_codes``.

    Two sources are consulted in order:

    1. Active expense-domain rule assets from the database whose
       ``config_json["detail_mode"]`` is ``"json_risk"``. Each points at a
       JSON manifest through ``config_json["rule_document"]["file_name"]``,
       read from ``config_json["rule_library"]`` (default
       ``RISK_RULES_LIBRARY``).
    2. Every JSON file of ``RISK_RULES_LIBRARY``, consulted when no filter was
       given or when some requested codes were not found in step 1. Codes
       already loaded in step 1 are never replaced.

    Unreadable files, payloads that are not JSON objects and manifests with
    ``"enabled": false`` are skipped. Asset-backed manifests get
    ``_rule_version`` and ``_rule_asset_id`` attached; library-only manifests
    get ``_rule_version`` "v1.0.0".

    Args:
        rule_codes: Codes to restrict the result to; ``None`` or empty means
            no restriction.

    Returns:
        One manifest dict per rule code.
    """
    code_filter = {
        str(code or "").strip()
        for code in list(rule_codes or [])
        if str(code or "").strip()
    }
    manifests_by_code: dict[str, dict[str, Any]] = {}
    assets = list(
        self.db.scalars(
            select(AgentAsset)
            .where(AgentAsset.asset_type == AgentAssetType.RULE.value)
            .where(AgentAsset.status == AgentAssetStatus.ACTIVE.value)
            .where(AgentAsset.domain == AgentAssetDomain.EXPENSE.value)
            .order_by(AgentAsset.updated_at.desc(), AgentAsset.created_at.desc())
        ).all()
    )
    library_manager = AgentAssetRuleLibraryManager()
    for asset in assets:
        config_json = asset.config_json if isinstance(asset.config_json, dict) else {}
        if str(config_json.get("detail_mode") or "").strip().lower() != "json_risk":
            continue
        rule_code = str(asset.code or "").strip()
        if code_filter and rule_code not in code_filter:
            continue
        rule_document = config_json.get("rule_document")
        if not isinstance(rule_document, dict):
            continue
        file_name = str(rule_document.get("file_name") or "").strip()
        rule_library = (
            str(config_json.get("rule_library") or RISK_RULES_LIBRARY).strip()
            or RISK_RULES_LIBRARY
        )
        if not file_name:
            continue
        try:
            payload = library_manager.read_rule_library_json(
                library=rule_library,
                file_name=file_name,
            )
        except (FileNotFoundError, ValueError):
            continue
        # A manifest whose top level is not an object has no usable fields;
        # skip it rather than fail on ``.get`` and abort the whole review.
        if not isinstance(payload, dict):
            continue
        manifest_code = str(payload.get("rule_code") or rule_code).strip()
        if not manifest_code or (code_filter and manifest_code not in code_filter):
            continue
        # Assets arrive newest-first, so the first manifest seen for a code is
        # the most recently updated one; an older asset must not replace it.
        if manifest_code in manifests_by_code:
            continue
        if payload.get("enabled") is False:
            continue
        payload = dict(payload)
        payload.setdefault("rule_code", manifest_code)
        payload["_rule_version"] = str(
            asset.published_version or asset.current_version or "v1.0.0"
        )
        payload["_rule_asset_id"] = asset.id
        manifests_by_code[manifest_code] = payload

    missing_codes = code_filter - set(manifests_by_code)
    should_load_fallback = not code_filter or bool(missing_codes)
    if should_load_fallback:
        try:
            files = library_manager.list_rule_library_json_files(library=RISK_RULES_LIBRARY)
        except ValueError:
            files = []
        for file_name in files:
            try:
                payload = library_manager.read_rule_library_json(
                    library=RISK_RULES_LIBRARY,
                    file_name=file_name,
                )
            except (FileNotFoundError, ValueError):
                continue
            if not isinstance(payload, dict):
                continue
            rule_code = str(payload.get("rule_code") or "").strip()
            if not rule_code or rule_code in manifests_by_code:
                continue
            if code_filter and rule_code not in missing_codes:
                continue
            if payload.get("enabled") is False:
                continue
            payload = dict(payload)
            payload["_rule_version"] = "v1.0.0"
            manifests_by_code[rule_code] = payload
    return list(manifests_by_code.values())
document_info = context.get("document_info") or {} normalized_contexts.append( { "scene_code": str(document_info.get("scene_code") or "").strip().lower(), "document_type": str( document_info.get("document_type") or "" ).strip().lower(), "item_type": str( getattr(context.get("item"), "item_type", "") or "" ).strip().lower(), } ) if "travel" in domains: if expense_types & {"travel", "hotel", "transport"}: return True if any( item["scene_code"] in {"travel", "hotel", "transport"} or item["document_type"] in { "flight_itinerary", "train_ticket", "hotel_invoice", "taxi_receipt", } for item in normalized_contexts ): return True if "meal" in domains: if expense_types & {"meal", "entertainment"}: return True if any( item["scene_code"] == "meal" or item["document_type"] == "meal_receipt" for item in normalized_contexts ): return True return bool(domains & expense_types) def _evaluate_platform_risk_manifest( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: evaluator = str(manifest.get("evaluator") or "").strip().lower() if evaluator == "reason_too_brief": return self._evaluate_reason_too_brief_risk(manifest, claim=claim) if evaluator == "entertainment_reason_missing": return self._evaluate_entertainment_reason_missing_risk(manifest, claim=claim) if evaluator == "document_expense_mismatch": return self._evaluate_document_expense_mismatch_risk( manifest, claim=claim, contexts=contexts, ) if evaluator == "location_consistency": return self._evaluate_location_consistency_risk( manifest, claim=claim, contexts=contexts, ) if evaluator == "duplicate_invoice": return self._evaluate_duplicate_invoice_risk(manifest, claim=claim, contexts=contexts) if evaluator == "identity_consistency": return self._evaluate_identity_consistency_risk( manifest, claim=claim, contexts=contexts, ) if evaluator == "cross_year_invoice": return self._evaluate_cross_year_invoice_risk(manifest, claim=claim, contexts=contexts) if evaluator == 
"void_or_red_invoice": return self._evaluate_text_keyword_risk( manifest, contexts=contexts, keywords=["作废", "红冲", "红字", "冲红"], fallback_message="票据文本中出现作废、红冲或红字发票相关信息,建议退回补充或人工复核。", ) if evaluator == "vague_goods_description": return self._evaluate_text_keyword_risk( manifest, contexts=contexts, keywords=["详见清单", "服务费", "咨询费", "其他", "办公用品"], fallback_message="票据商品或服务描述较笼统,建议审批人核对真实用途和明细清单。", ) if evaluator == "multi_city_reason_required": return self._evaluate_multi_city_reason_required_risk( manifest, claim=claim, contexts=contexts, ) return None def _evaluate_reason_too_brief_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, ) -> dict[str, Any] | None: params = manifest.get("params") if isinstance(manifest.get("params"), dict) else {} try: min_reason_length = max(1, int(params.get("min_reason_length") or 6)) except (TypeError, ValueError): min_reason_length = 6 reason_corpus = re.sub(r"\s+", "", self._build_scene_reason_corpus(claim)) if len(reason_corpus) >= min_reason_length: return None return self._build_platform_risk_flag( manifest, message=f"报销事由有效描述不足 {min_reason_length} 个字符,暂不足以支撑真实性判断。", evidence={"reason_length": len(reason_corpus), "min_reason_length": min_reason_length}, ) def _evaluate_entertainment_reason_missing_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, ) -> dict[str, Any] | None: expense_types = { str(claim.expense_type or "").strip().lower(), *{str(item.item_type or "").strip().lower() for item in list(claim.items or [])}, } reason_corpus = self._build_scene_reason_corpus(claim) compact_reason = re.sub(r"\s+", "", reason_corpus) looks_like_entertainment = ( "entertainment" in expense_types or "招待" in compact_reason or "客户" in compact_reason ) if not looks_like_entertainment: return None required_keywords = ("客户", "项目", "参与", "人员", "对象", "商务", "会议") has_detail = any(keyword in compact_reason for keyword in required_keywords) if has_detail: return None return self._build_platform_risk_flag( manifest, 
message="招待或餐饮类费用未识别到客户、项目、参与人员等必要说明,建议补充后再流转。", evidence={"reason": reason_corpus[:300]}, ) def _evaluate_document_expense_mismatch_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: mismatches: list[str] = [] for context in contexts: item = context["item"] item_type = ( str(item.item_type or claim.expense_type or "other").strip().lower() or "other" ) policy = self._get_expense_scene_policy(item_type) if policy is None: continue document_info = context.get("document_info") or {} recognized_scene_code = ( str(document_info.get("scene_code") or "other").strip().lower() or "other" ) recognized_document_type = ( str(document_info.get("document_type") or "other").strip().lower() or "other" ) if ( recognized_scene_code in set(policy.allowed_scene_codes) or recognized_document_type in set(policy.allowed_document_types) ): continue recognized_label = str( document_info.get("document_type_label") or recognized_document_type or "未知票据" ) mismatches.append(f"第 {context['index']} 条明细为{policy.label},附件识别为{recognized_label}") if not mismatches: return None return self._build_platform_risk_flag( manifest, message=";".join(mismatches[:3]) + ",与当前费用场景不匹配。", evidence={"mismatches": mismatches[:5]}, ) def _evaluate_location_consistency_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: policy = self._get_expense_rule_catalog().travel_policy if policy is None: return None declared_cities = self._extract_known_cities_from_text( " ".join( [ str(claim.location or ""), *[str(item.item_location or "") for item in list(claim.items or [])], ] ), policy, ) evidence_cities = self._collect_attachment_cities(contexts, policy) if not declared_cities or not evidence_cities: return None if set(declared_cities) & set(evidence_cities): return None declared_text = "、".join(declared_cities) evidence_text = "、".join(evidence_cities[:5]) return 
self._build_platform_risk_flag( manifest, message=f"申报地点 {declared_text} 与票据识别地点 {evidence_text} 不一致,建议补充异地说明或更换附件。", evidence={"declared_cities": declared_cities, "evidence_cities": evidence_cities}, ) def _evaluate_duplicate_invoice_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: invoice_keys = self._collect_invoice_keys_from_contexts(contexts) duplicate_keys = [ key for key, count in self._count_values(invoice_keys).items() if count > 1 ] if duplicate_keys: return self._build_platform_risk_flag( manifest, message=f"当前报销单内存在重复票据号码:{'、'.join(duplicate_keys[:3])}。", evidence={"duplicate_invoice_keys": duplicate_keys[:5]}, ) if not invoice_keys: return None other_items = list( self.db.scalars( select(ExpenseClaimItem) .where(ExpenseClaimItem.claim_id != claim.id) .where(ExpenseClaimItem.invoice_id.is_not(None)) ).all() ) matched_claim_ids: set[str] = set() for other_item in other_items: other_path = self._resolve_attachment_path(other_item.invoice_id) if other_path is None or not other_path.exists(): continue other_meta = self._read_attachment_meta(other_path) other_document_info = other_meta.get("document_info") if not isinstance(other_document_info, dict): continue other_keys = self._collect_invoice_keys_from_document_info(other_document_info) if set(invoice_keys) & set(other_keys): matched_claim_ids.add(str(other_item.claim_id or "")) if not matched_claim_ids: return None return self._build_platform_risk_flag( manifest, message=f"票据号码已在其他报销单中出现,疑似重复报销:{'、'.join(invoice_keys[:3])}。", evidence={ "invoice_keys": invoice_keys[:5], "matched_claim_ids": sorted(matched_claim_ids)[:5], }, ) def _evaluate_identity_consistency_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: params = manifest.get("params") if isinstance(manifest.get("params"), dict) else {} allow_keywords = [ str(value) for value in 
list(params.get("allow_keywords") or []) if str(value).strip() ] claimant = str(claim.employee_name or "").strip() if not claimant: return None mismatched_buyers: list[str] = [] for context in contexts: buyer = self._resolve_first_document_field_value( context.get("document_info") or {}, keys={"buyer_name", "buyer", "purchaser_name", "claimant"}, labels={"购买方", "抬头", "买方", "购方"}, ) if not buyer: continue if claimant in buyer or any(keyword in buyer for keyword in allow_keywords): continue mismatched_buyers.append(buyer) if not mismatched_buyers: return None return self._build_platform_risk_flag( manifest, message=f"发票抬头 {mismatched_buyers[0]} 与报销人 {claimant} 不一致,建议人工复核。", evidence={"claimant": claimant, "buyers": mismatched_buyers[:5]}, ) def _evaluate_cross_year_invoice_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: claim_year = claim.occurred_at.year if claim.occurred_at is not None else None if claim_year is None: return None issue_years: list[int] = [] for context in contexts: text = " ".join( [ self._resolve_first_document_field_value( context.get("document_info") or {}, keys={"date", "issue_date", "invoice_date"}, labels={"日期", "开票日期", "发生时间"}, ), str(context.get("ocr_summary") or ""), str(context.get("ocr_text") or ""), ] ) for match in re.findall(r"(20\d{2}|19\d{2})[年/\-.]", text): try: issue_years.append(int(match)) except ValueError: continue mismatch_years = sorted({year for year in issue_years if year != claim_year}) if not mismatch_years: return None return self._build_platform_risk_flag( manifest, message=f"票据年份 {mismatch_years[0]} 与费用发生年份 {claim_year} 不一致,建议确认是否跨年报销。", evidence={"claim_year": claim_year, "invoice_years": mismatch_years}, ) def _evaluate_text_keyword_risk( self, manifest: dict[str, Any], *, contexts: list[dict[str, Any]], keywords: list[str], fallback_message: str, ) -> dict[str, Any] | None: matched: list[str] = [] for context in contexts: text = 
f"{context.get('ocr_summary') or ''}\n{context.get('ocr_text') or ''}" for keyword in keywords: if keyword in text and keyword not in matched: matched.append(keyword) if not matched: return None return self._build_platform_risk_flag( manifest, message=fallback_message, evidence={"matched_keywords": matched}, ) def _evaluate_multi_city_reason_required_risk( self, manifest: dict[str, Any], *, claim: ExpenseClaim, contexts: list[dict[str, Any]], ) -> dict[str, Any] | None: policy = self._get_expense_rule_catalog().travel_policy if policy is None: return None cities = self._collect_attachment_cities(contexts, policy) for item in list(claim.items or []): for city in self._extract_known_cities_from_text(str(item.item_location or ""), policy): if city not in cities: cities.append(city) if len(cities) <= 2: return None reason_corpus = self._build_travel_reason_corpus(claim) if self._text_contains_keywords(reason_corpus, policy.route_exception_keywords): return None return self._build_platform_risk_flag( manifest, message=f"本次报销识别到多城市行程({'、'.join(cities[:5])}),但事由中未说明中转、多地拜访或改签原因。", evidence={"cities": cities[:8]}, ) def _build_platform_risk_flag( self, manifest: dict[str, Any], *, message: str, evidence: dict[str, Any], ) -> dict[str, Any]: outcomes = manifest.get("outcomes") if isinstance(manifest.get("outcomes"), dict) else {} fail_outcome = outcomes.get("fail") if isinstance(outcomes.get("fail"), dict) else {} severity = str(fail_outcome.get("severity") or "medium").strip().lower() or "medium" default_action = "block" if severity == "high" else "manual_review" action = str(fail_outcome.get("action") or default_action).strip() label = str(manifest.get("name") or manifest.get("rule_code") or "风险规则命中").strip() return { "source": "submission_review", "hit_source": "rule_center", "rule_type": "risk", "rule_code": str(manifest.get("rule_code") or "").strip(), "rule_version": str(manifest.get("_rule_version") or "v1.0.0").strip(), "severity": severity, "action": action, 
"label": label, "message": message, "evidence": evidence, } @staticmethod def _count_values(values: list[str]) -> dict[str, int]: counts: dict[str, int] = {} for value in values: normalized = str(value or "").strip() if not normalized: continue counts[normalized] = counts.get(normalized, 0) + 1 return counts def _collect_invoice_keys_from_contexts(self, contexts: list[dict[str, Any]]) -> list[str]: invoice_keys: list[str] = [] for context in contexts: document_info = context.get("document_info") or {} for key in self._collect_invoice_keys_from_document_info(document_info): if key not in invoice_keys: invoice_keys.append(key) return invoice_keys def _collect_invoice_keys_from_document_info(self, document_info: dict[str, Any]) -> list[str]: keys: list[str] = [] for field in list(document_info.get("fields") or []): if not isinstance(field, dict): continue field_key = str(field.get("key") or "").strip().lower().replace("_", "") label = str(field.get("label") or "").replace(" ", "") value = str(field.get("value") or "").strip() if not value: continue if field_key in {"invoiceno", "invoicenumber", "number", "code"} or any( token in label for token in ("发票号码", "票号", "发票代码", "号码") ): normalized = re.sub(r"\s+", "", value) if normalized and normalized not in keys: keys.append(normalized) return keys def _collect_attachment_cities( self, contexts: list[dict[str, Any]], policy: RuntimeTravelPolicy, ) -> list[str]: cities: list[str] = [] for context in contexts: document_info = context.get("document_info") or {} parts = [ str(context.get("ocr_summary") or ""), str(context.get("ocr_text") or ""), str(context.get("item").item_location if context.get("item") is not None else ""), ] for field in list(document_info.get("fields") or []): if isinstance(field, dict): parts.append(str(field.get("value") or "")) for city in self._extract_known_cities_from_text(" ".join(parts), policy): if city not in cities: cities.append(city) return cities @staticmethod def 
_extract_known_cities_from_text(text: str, policy: RuntimeTravelPolicy) -> list[str]: normalized = str(text or "").strip() if not normalized: return [] cities: list[str] = [] for city in sorted(policy.city_tiers.keys(), key=lambda item: len(item), reverse=True): if city in normalized and city not in cities: cities.append(city) return cities @staticmethod def _resolve_first_document_field_value( document_info: dict[str, Any], *, keys: set[str], labels: set[str], ) -> str: normalized_keys = {key.replace("_", "").lower() for key in keys} for field in list(document_info.get("fields") or []): if not isinstance(field, dict): continue field_key = str(field.get("key") or "").strip().lower().replace("_", "") label = str(field.get("label") or "").replace(" ", "") value = str(field.get("value") or "").strip() if not value: continue if field_key in normalized_keys or any(token in label for token in labels): return value return "" def _run_scene_policy_review(self, claim: ExpenseClaim) -> dict[str, list[Any]]: catalog = self._get_expense_rule_catalog() flags: list[dict[str, Any]] = [] blocking_reasons: list[str] = [] reason_corpus = self._build_scene_reason_corpus(claim) scene_totals: dict[str, Decimal] = defaultdict(lambda: Decimal("0.00")) scene_warned: set[str] = set() for item in claim.items: item_type = str(item.item_type or claim.expense_type or "other").strip().lower() or "other" policy = catalog.get_scene_policy(item_type) if policy is None: continue scene_totals[item_type] += Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) if policy.always_warn and item_type not in scene_warned: scene_warned.add(item_type) flags.append( { "source": "submission_review", "severity": "medium", "label": f"{policy.label}人工重点复核", "message": policy.always_warn_message or f"{policy.label}默认需要人工重点复核。", "rule_code": policy.rule_code, } ) item_limit = policy.item_amount_limit item_amount = Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) if item_limit 
@staticmethod
def _evaluate_amount_limit(
    *,
    amount: Decimal,
    limit_config: Any,
    reason_text: str,
) -> tuple[str, Decimal] | None:
    """Compare ``amount`` against the block/warn thresholds of ``limit_config``.

    Args:
        amount: The amount being checked.
        limit_config: Object optionally exposing ``block_amount``,
            ``warn_amount`` and ``exception_keywords`` attributes.
        reason_text: Free text searched for exception keywords.

    Returns:
        ``(severity, threshold)`` when a threshold is exceeded, else ``None``.
        Exceeding the block threshold yields ``"high"`` unless an exception
        keyword is found in ``reason_text`` (then ``"medium"``); exceeding
        only the warn threshold always yields ``"medium"``.
    """
    hard_cap = getattr(limit_config, "block_amount", None)
    soft_cap = getattr(limit_config, "warn_amount", None)
    keywords = list(getattr(limit_config, "exception_keywords", []) or [])
    # The keyword scan runs up front, whether or not a threshold is exceeded.
    excused = ExpenseClaimService._text_contains_keywords(reason_text, keywords)

    # The block threshold takes precedence over the warn threshold.
    if hard_cap is not None:
        hard_threshold = Decimal(hard_cap)
        if amount > hard_threshold:
            severity = "medium" if excused else "high"
            return (severity, hard_threshold)

    if soft_cap is not None:
        soft_threshold = Decimal(soft_cap)
        if amount > soft_threshold:
            return ("medium", soft_threshold)

    return None
"").strip().lower() if "hotel" in {scene_code, document_type, item_type} or document_type == "hotel_invoice": hotel_contexts.append(context) unique_itinerary_cities = list(dict.fromkeys(city for city in itinerary_cities if city)) expected_destination_city = self._resolve_expected_travel_city( claim, contexts, unique_itinerary_cities, policy, ) if itinerary_segments: unique_destinations = list( dict.fromkeys(segment["destination"] for segment in itinerary_segments if segment["destination"]) ) first_origin = str(itinerary_segments[0]["origin"] or "").strip() last_destination = str(itinerary_segments[-1]["destination"] or "").strip() for previous, current in zip(itinerary_segments, itinerary_segments[1:]): previous_destination = str(previous["destination"] or "").strip() current_origin = str(current["origin"] or "").strip() if previous_destination and current_origin and previous_destination != current_origin: message = ( f"差旅行程未形成连续链路:上一段到达 {previous_destination}," f"下一段却从 {current_origin} 出发,请补充中转或改签说明。" ) flags.append( { "source": "submission_review", "severity": "high", "label": "行程闭环异常", "message": message, "rule_code": policy.rule_code, } ) blocking_reasons.append("差旅行程未形成连续闭环,请补充中转、改签或异地出发原因。") break if ( expected_destination_city and last_destination and last_destination not in {expected_destination_city, first_origin} ): message = ( f"差旅行程终点识别为 {last_destination}," f"与申报目的地 {expected_destination_city} 不一致,请补充多地出差或后续行程说明。" ) flags.append( { "source": "submission_review", "severity": "high", "label": "行程终点异常", "message": message, "rule_code": policy.rule_code, } ) blocking_reasons.append("差旅行程终点与申报目的地不一致,请补充多地出差说明或补齐后续票据。") expected_city_set = { city for city in (expected_destination_city, first_origin) if city } extra_destinations = [ city for city in unique_destinations if city and city not in expected_city_set ] if extra_destinations and not has_route_exception: destinations_text = "、".join(extra_destinations[:3]) flags.append( { "source": 
"submission_review", "severity": "high", "label": "多城市行程待说明", "message": ( f"检测到本次差旅涉及 {destinations_text} 多个目的地," "但当前报销事由未说明中转、多地拜访或改签原因。" ), "rule_code": policy.rule_code, } ) blocking_reasons.append("检测到多城市差旅行程,但当前未补充中转或多地出差说明。") allowed_hotel_cities = { city for city in [expected_destination_city, *unique_itinerary_cities] if city } for context in hotel_contexts: hotel_city = self._extract_hotel_city(context, policy) if hotel_city and allowed_hotel_cities and hotel_city not in allowed_hotel_cities: expected_text = "、".join(sorted(allowed_hotel_cities)) flags.append( { "source": "submission_review", "severity": "high", "label": "酒店地点异常", "message": ( f"酒店票据识别城市为 {hotel_city}," f"与当前差旅目的地/行程城市 {expected_text} 不一致,请补充异地住宿原因。" ), "rule_code": policy.rule_code, } ) blocking_reasons.append("酒店票据地点与差旅目的地不一致,请补充异地住宿原因或更换附件。") if grade_band is None: continue baseline_city = hotel_city or expected_destination_city city_tier = policy.city_tiers.get(str(baseline_city or "").strip(), "tier_3") cap = Decimal(policy.hotel_limits[grade_band][city_tier]) night_count = self._extract_hotel_night_count(context) item_amount = Decimal(context["item"].item_amount or Decimal("0.00")).quantize(Decimal("0.01")) nightly_amount = (item_amount / Decimal(max(night_count, 1))).quantize(Decimal("0.01")) if nightly_amount <= cap: continue city_tier_label = { "tier_1": "一线城市", "tier_2": "重点城市", "tier_3": "其他城市", }.get(city_tier, "当前城市") hotel_message = ( f"{band_label} 职级在{city_tier_label}的住宿标准为 {cap} 元/晚," f"当前酒店识别金额约 {nightly_amount} 元/晚。" ) item_reason = str(context["item"].item_reason or "").strip() item_has_exception = self._text_contains_keywords(item_reason, policy.standard_exception_keywords) if has_standard_exception or item_has_exception: flags.append( { "source": "submission_review", "severity": "medium", "label": "住宿超标提醒", "message": hotel_message + " 已识别到补充说明,请直属领导重点复核。", "rule_code": policy.rule_code, } ) else: flags.append( { "source": "submission_review", "severity": "high", 
"label": "住宿超标待说明", "message": hotel_message + " 当前未识别到超标说明,请先补充原因。", "rule_code": policy.rule_code, } ) blocking_reasons.append("住宿金额超出当前职级差标,且未补充超标说明。") if grade_band is not None: for context in contexts: transport_class = self._detect_transport_class(context, policy) if transport_class is None: continue transport_kind, class_label, class_level = transport_class allowed_level = policy.transport_limits.get(grade_band, {}).get(transport_kind) if allowed_level is None or class_level <= allowed_level: continue item_reason = str(context["item"].item_reason or "").strip() item_has_exception = self._text_contains_keywords(item_reason, policy.standard_exception_keywords) message = f"{band_label} 职级当前默认不可报销 {class_label}。" if has_standard_exception or item_has_exception: flags.append( { "source": "submission_review", "severity": "medium", "label": "交通舱位超标提醒", "message": message + " 已识别到补充说明,请审批人重点复核。", "rule_code": policy.rule_code, } ) else: flags.append( { "source": "submission_review", "severity": "high", "label": "交通舱位超标待说明", "message": message + " 当前未识别到例外说明,请先补充原因。", "rule_code": policy.rule_code, } ) blocking_reasons.append("交通舱位或席别超出当前职级差标,且未补充例外说明。") return { "flags": flags, "blocking_reasons": list(dict.fromkeys(reason for reason in blocking_reasons if reason)), } def _build_claim_attachment_contexts(self, claim: ExpenseClaim) -> list[dict[str, Any]]: contexts: list[dict[str, Any]] = [] ordered_items = sorted( claim.items, key=lambda item: ( item.item_date or date.max, self._normalize_sort_datetime(item.created_at), ), ) for index, item in enumerate(ordered_items, start=1): file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): continue metadata = self._read_attachment_meta(file_path) document_info = metadata.get("document_info") contexts.append( { "index": index, "item": item, "document_info": document_info if isinstance(document_info, dict) else {}, "ocr_text": str(metadata.get("ocr_text") or ""), 
"ocr_summary": str(metadata.get("ocr_summary") or ""), } ) return contexts def _is_travel_policy_relevant_context( self, context: dict[str, Any], policy: RuntimeTravelPolicy, ) -> bool: item = context.get("item") document_info = context.get("document_info") or {} item_type = str(getattr(item, "item_type", "") or "").strip().lower() scene_code = str(document_info.get("scene_code") or "").strip().lower() document_type = str(document_info.get("document_type") or "").strip().lower() return ( item_type in set(policy.relevant_expense_types) or scene_code in set(policy.relevant_expense_types) or document_type in {"hotel_invoice", *set(policy.long_distance_document_types)} ) @staticmethod def _resolve_document_field_value(document_info: dict[str, Any], key: str) -> str: normalized_key = str(key or "").strip().lower() for field in list(document_info.get("fields") or []): if not isinstance(field, dict): continue field_key = str(field.get("key") or "").strip().lower() if field_key == normalized_key: return str(field.get("value") or "").strip() return "" @staticmethod def _text_contains_keywords(text: str, keywords: tuple[str, ...] 
| list[str]) -> bool: compact = re.sub(r"\s+", "", str(text or "")) if not compact: return False return any(keyword in compact for keyword in keywords) def _build_travel_reason_corpus(self, claim: ExpenseClaim) -> str: parts = [str(claim.reason or "").strip(), str(claim.location or "").strip()] for item in claim.items: parts.append(str(item.item_reason or "").strip()) parts.append(str(item.item_location or "").strip()) return "\n".join(part for part in parts if part) @staticmethod def _resolve_travel_policy_band(grade: str | None) -> str | None: normalized = str(grade or "").strip().upper() if not normalized: return None p_match = re.search(r"P(\d+)", normalized) if p_match: level = int(p_match.group(1)) if level <= 3: return "junior" if level <= 5: return "mid" return "senior" m_match = re.search(r"M(\d+)", normalized) if m_match: level = int(m_match.group(1)) if level <= 2: return "manager" return "executive" if normalized.startswith("D"): return "executive" return None def _resolve_expected_travel_city( self, claim: ExpenseClaim, contexts: list[dict[str, Any]], itinerary_cities: list[str], policy: RuntimeTravelPolicy, ) -> str: claim_city = self._extract_city_from_text(str(claim.location or ""), policy) if claim_city: return claim_city for context in contexts: hotel_city = self._extract_hotel_city(context, policy) if hotel_city: return hotel_city if len(itinerary_cities) >= 2 and itinerary_cities[1]: return itinerary_cities[1] for city in itinerary_cities: if city: return city return "" def _extract_route_segment( self, context: dict[str, Any], policy: RuntimeTravelPolicy, ) -> tuple[str, str] | None: document_info = context["document_info"] route_value = self._resolve_document_field_value(document_info, "route") if not route_value or "-" not in route_value: return None origin_text, destination_text = [segment.strip() for segment in route_value.split("-", 1)] origin_city = self._extract_city_from_text(origin_text, policy) destination_city = 
self._extract_city_from_text(destination_text, policy) if not origin_city or not destination_city or origin_city == destination_city: return None return origin_city, destination_city def _extract_hotel_city(self, context: dict[str, Any], policy: RuntimeTravelPolicy) -> str: document_info = context["document_info"] item = context["item"] merchant_name = self._resolve_document_field_value(document_info, "merchant_name") for candidate in ( merchant_name, str(item.item_location or ""), str(context.get("ocr_summary") or ""), str(context.get("ocr_text") or ""), ): city = self._extract_city_from_text(candidate, policy) if city: return city return "" @staticmethod def _extract_city_from_text(text: str, policy: RuntimeTravelPolicy) -> str: normalized = str(text or "").strip() if not normalized: return "" city_match_order = sorted(policy.city_tiers.keys(), key=lambda item: len(item), reverse=True) for city in city_match_order: if city in normalized: return city return "" @staticmethod def _extract_hotel_night_count(context: dict[str, Any]) -> int: text = " ".join( [ str(context.get("ocr_summary") or "").strip(), str(context.get("ocr_text") or "").strip(), ] ).strip() match = TRAVEL_POLICY_HOTEL_NIGHT_PATTERN.search(text) if not match: return 1 try: return max(1, int(match.group(1))) except (TypeError, ValueError): return 1 def _detect_transport_class( self, context: dict[str, Any], policy: RuntimeTravelPolicy, ) -> tuple[str, str, int] | None: document_info = context["document_info"] document_type = str(document_info.get("document_type") or "").strip().lower() text = " ".join( [ str(context.get("ocr_summary") or "").strip(), str(context.get("ocr_text") or "").strip(), ] ).strip() compact_text = re.sub(r"\s+", "", text) if not compact_text: return None if document_type == "flight_itinerary": for config in policy.flight_classes: label = str(config.keyword or "").strip() level = int(config.level) if label in compact_text: return "flight", label, level return None if 
document_type == "train_ticket": for config in policy.train_classes: label = str(config.keyword or "").strip() level = int(config.level) if label in compact_text: return "train", label, level return None return None def _is_long_distance_travel_context( self, context: dict[str, Any], policy: RuntimeTravelPolicy, ) -> bool: document_info = context["document_info"] document_type = str(document_info.get("document_type") or "").strip().lower() scene_code = str(document_info.get("scene_code") or "").strip().lower() if document_type in set(policy.long_distance_document_types): return True return scene_code == "travel" def _sync_claim_from_items(self, claim: ExpenseClaim) -> None: if not claim.items: claim.amount = Decimal("0.00") claim.invoice_count = 0 claim.risk_flags_json = self._merge_claim_attachment_risk_flags(claim, []) return ordered_items = sorted( claim.items, key=lambda item: ( item.item_date or date.max, self._normalize_sort_datetime(item.created_at), ), ) primary_item = ordered_items[0] total_amount = sum((item.item_amount for item in ordered_items), Decimal("0.00")) claim.amount = total_amount.quantize(Decimal("0.01")) claim.invoice_count = sum(1 for item in ordered_items if str(item.invoice_id or "").strip()) claim.occurred_at = datetime( primary_item.item_date.year, primary_item.item_date.month, primary_item.item_date.day, tzinfo=UTC, ) claim.expense_type = str(primary_item.item_type or claim.expense_type or "other").strip() or "other" claim.reason = ( self._normalize_optional_text(primary_item.item_reason, fallback=claim.reason or "待补充") or "待补充" ) claim.location = ( self._normalize_optional_text(primary_item.item_location, fallback=claim.location or "待补充") or "待补充" ) claim.risk_flags_json = self._merge_claim_attachment_risk_flags( claim, self._build_claim_attachment_risk_flags(ordered_items), ) if str(claim.status or "").strip().lower() == "draft": claim.approval_stage = "待提交" def _refresh_item_attachment_analysis(self, item: ExpenseClaimItem) -> None: 
file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): return metadata = self._read_attachment_meta(file_path) media_type = str(metadata.get("media_type") or self._resolve_attachment_media_type(file_path.name)).strip() ocr_status = str(metadata.get("ocr_status") or "").strip().lower() if ocr_status == "failed": analysis = self._build_failed_ocr_attachment_analysis( media_type=media_type, error_message=str(metadata.get("ocr_error") or ""), item=item, ) elif ocr_status == "recognized" or any( ( str(metadata.get("ocr_text") or "").strip(), str(metadata.get("ocr_summary") or "").strip(), int(metadata.get("ocr_line_count") or 0), list(metadata.get("ocr_warnings") or []), ) ): stored_document_info = metadata.get("document_info") if not isinstance(stored_document_info, dict): stored_document_info = {} document = SimpleNamespace( filename=str(metadata.get("file_name") or file_path.name), text=str(metadata.get("ocr_text") or ""), summary=str(metadata.get("ocr_summary") or ""), avg_score=float(metadata.get("ocr_avg_score") or 0.0), line_count=int(metadata.get("ocr_line_count") or 0), document_type=str(stored_document_info.get("document_type") or ""), document_type_label=str(stored_document_info.get("document_type_label") or ""), scene_code=str(stored_document_info.get("scene_code") or ""), scene_label=str(stored_document_info.get("scene_label") or ""), document_fields=list(stored_document_info.get("fields") or []), warnings=[str(value) for value in list(metadata.get("ocr_warnings") or []) if str(value).strip()], ) document_info = self._build_attachment_document_info(document) requirement_check = self._build_attachment_requirement_check( item=item, document_info=document_info, ) analysis = self._build_attachment_analysis( document=document, item=item, document_info=document_info, requirement_check=requirement_check, ) metadata["document_info"] = document_info metadata["requirement_check"] = requirement_check else: analysis = 
self._build_fallback_attachment_analysis(media_type=media_type, item=item) metadata["analysis"] = analysis self._write_attachment_meta(file_path, metadata) def _build_claim_attachment_risk_flags(self, ordered_items: list[ExpenseClaimItem]) -> list[dict[str, Any]]: derived_flags: list[dict[str, Any]] = [] for index, item in enumerate(ordered_items, start=1): file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): continue metadata = self._read_attachment_meta(file_path) analysis = metadata.get("analysis") if not isinstance(analysis, dict): continue severity = str(analysis.get("severity") or "").strip().lower() if severity in {"", "pass", "low"}: continue summary = str(analysis.get("summary") or analysis.get("headline") or "").strip() or "附件存在待核对风险。" label = str(analysis.get("label") or ("高风险" if severity == "high" else "中风险")).strip() derived_flags.append( { "source": "attachment_analysis", "item_id": item.id, "severity": severity, "label": label, "message": f"费用明细第 {index} 条:{summary}", } ) return derived_flags def _get_expense_rule_catalog(self) -> Any: cached = getattr(self, "_expense_rule_catalog", None) if cached is not None: return cached db = getattr(self, "db", None) if db is None: catalog = build_default_expense_rule_catalog() else: catalog = ExpenseRuleRuntimeService(db).load_catalog() setattr(self, "_expense_rule_catalog", catalog) return catalog def _get_expense_scene_policy(self, expense_type: str | None) -> Any | None: return self._get_expense_rule_catalog().get_scene_policy(expense_type) def _resolve_min_attachment_count(self, expense_type: str | None) -> int: policy = self._get_expense_scene_policy(expense_type) if policy is None: return 1 return max(0, int(policy.min_attachment_count or 0)) def _build_scene_reason_corpus(self, claim: ExpenseClaim) -> str: parts = [str(claim.reason or "").strip(), str(claim.location or "").strip()] for item in claim.items: parts.append(str(item.item_reason or 
"").strip()) parts.append(str(item.item_location or "").strip()) return "\n".join(part for part in parts if part) @staticmethod def _merge_claim_attachment_risk_flags( claim: ExpenseClaim, attachment_risk_flags: list[dict[str, Any]], ) -> list[Any]: preserved_flags = [ flag for flag in list(claim.risk_flags_json or []) if not (isinstance(flag, dict) and str(flag.get("source") or "").strip() == "attachment_analysis") ] return preserved_flags + attachment_risk_flags @staticmethod def _format_submission_blocked_message(issues: list[str]) -> str: normalized_issues = [str(issue or "").strip() for issue in issues if str(issue or "").strip()] if not normalized_issues: return "AI预审未通过,但没有返回明确原因,请刷新草稿后重试。" return "AI预审暂未通过,原因如下:\n" + "\n".join( f"{index}. {issue}" for index, issue in enumerate(normalized_issues, start=1) ) def _validate_claim_for_submission(self, claim: ExpenseClaim) -> list[str]: issues: list[str] = [] claim_location_required = self._is_location_required_expense_type(claim.expense_type) claim_min_attachment_count = self._resolve_min_attachment_count(claim.expense_type) if self._is_missing_value(claim.employee_name): issues.append("申请人未完善") if self._is_missing_value(claim.department_name): issues.append("所属部门未完善") if self._is_missing_value(claim.expense_type): issues.append("报销类型未完善") if self._is_missing_value(claim.reason): issues.append("报销事由未完善") if claim_location_required and self._is_missing_value(claim.location): issues.append("业务地点未完善") if claim.amount is None or claim.amount <= Decimal("0.00"): issues.append("报销金额未完善") if claim.occurred_at is None: issues.append("发生时间未完善") if int(claim.invoice_count or 0) < claim_min_attachment_count: issues.append("票据附件数量不足") if not claim.items: issues.append("费用明细不能为空") for index, item in enumerate(claim.items, start=1): prefix = f"费用明细第 {index} 条" item_location_required = self._is_location_required_expense_type(item.item_type or claim.expense_type) if item.item_date is None: issues.append(f"{prefix}缺少日期") if 
self._is_missing_value(item.item_type): issues.append(f"{prefix}缺少费用项目") if self._is_missing_value(item.item_reason): issues.append(f"{prefix}缺少说明") if item_location_required and self._is_missing_value(item.item_location): issues.append(f"{prefix}缺少地点") if item.item_amount is None or item.item_amount <= Decimal("0.00"): issues.append(f"{prefix}缺少金额") if self._is_missing_value(item.invoice_id): issues.append(f"{prefix}缺少票据标识") return issues def _is_location_required_expense_type(self, expense_type: str | None) -> bool: policy = self._get_expense_scene_policy(expense_type) if policy is None: return str(expense_type or "").strip().lower() in LOCATION_REQUIRED_EXPENSE_TYPES return bool(policy.location_required) @staticmethod def _has_privileged_claim_access(current_user: CurrentUserContext) -> bool: if current_user.is_admin: return True role_codes = { str(item).strip().lower() for item in current_user.role_codes if str(item).strip() } return bool(role_codes & PRIVILEGED_CLAIM_ROLE_CODES) @staticmethod def _normalize_role_codes(current_user: CurrentUserContext) -> set[str]: return { str(item).strip().lower() for item in current_user.role_codes if str(item).strip() } def _resolve_current_employee(self, current_user: CurrentUserContext) -> Employee | None: return self._resolve_employee_by_identity_candidates( [ str(current_user.username or "").strip(), str(current_user.name or "").strip(), ] ) def _resolve_claim_employee_for_backfill(self, claim: ExpenseClaim) -> Employee | None: if claim.employee is not None: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) .where(Employee.id == claim.employee.id) .limit(1) ) return employee or claim.employee employee_id = str(claim.employee_id or "").strip() if employee_id: employee = self.db.scalar( select(Employee) .options( selectinload(Employee.organization_unit), selectinload(Employee.manager), 
selectinload(Employee.roles), ) .where(Employee.id == employee_id) .limit(1) ) if employee is not None: return employee return self._resolve_employee_by_identity_candidates([str(claim.employee_name or "").strip()]) def _resolve_employee_by_identity_candidates(self, candidates: list[str]) -> Employee | None: normalized_candidates = [ item for item in dict.fromkeys(str(candidate or "").strip() for candidate in candidates) if item ] if not normalized_candidates: return None load_options = ( selectinload(Employee.organization_unit), selectinload(Employee.manager), selectinload(Employee.roles), ) for candidate in normalized_candidates: employee = self.db.scalar( select(Employee) .options(*load_options) .where( or_( func.lower(Employee.email) == candidate.lower(), func.lower(Employee.employee_no) == candidate.lower(), ) ) .limit(1) ) if employee is not None: return employee for candidate in normalized_candidates: matches = list( self.db.scalars( select(Employee) .options(*load_options) .where(Employee.name == candidate) .limit(2) ).all() ) if len(matches) == 1: return matches[0] return None def _backfill_claim_identity_from_current_user( self, claim: ExpenseClaim, current_user: CurrentUserContext, ) -> None: employee = self._resolve_claim_employee_for_backfill(claim) or self._resolve_current_employee(current_user) if employee is not None: claim_employee_id = str(claim.employee_id or "").strip() claim_employee_name = str(claim.employee_name or "").strip() employee_names = { str(employee.name or "").strip(), str(employee.email or "").strip(), str(employee.employee_no or "").strip(), } employee_names.discard("") can_apply_employee = ( not claim_employee_id or claim_employee_id == employee.id or self._is_missing_value(claim_employee_name) or claim_employee_name in employee_names ) if can_apply_employee: claim.employee = employee claim.employee_id = employee.id if employee.name: claim.employee_name = employee.name if employee.organization_unit is not None: claim.department_id 
= employee.organization_unit_id claim.department_name = employee.organization_unit.name return context_department = str( getattr(current_user, "department_name", "") or getattr(current_user, "department", "") or getattr(current_user, "departmentName", "") or "" ).strip() if context_department and self._is_missing_value(claim.department_name): claim.department_name = context_department context_name = str(current_user.name or current_user.username or "").strip() if context_name and self._is_missing_value(claim.employee_name): claim.employee_name = context_name def _employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: return False same_name_count = int( self.db.scalar( select(func.count()).select_from(Employee).where(Employee.name == normalized_name) ) or 0 ) return same_name_count == 1 def _apply_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: if self._has_privileged_claim_access(current_user): return stmt conditions = [] username = str(current_user.username or "").strip() employee = self._resolve_current_employee(current_user) def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized: return if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self._employee_name_is_unique(employee): add_condition("employee_name", employee.name) else: add_condition("employee_id", username) add_condition("employee_name", username) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") role_codes = self._normalize_role_codes(current_user) if role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES: pending_leader_approval = and_( ExpenseClaim.status == "submitted", 
def _ensure_ready(self) -> None:
    """Ask ``AgentFoundationService`` to ensure the foundation is ready.

    Builds the service from this instance's ``db`` and calls its
    ``ensure_foundation_ready()``.
    """
    foundation = AgentFoundationService(self.db)
    foundation.ensure_foundation_ready()