from __future__ import annotations import json import mimetypes import re import shutil from datetime import UTC, date, datetime from decimal import Decimal, InvalidOperation from pathlib import Path from types import SimpleNamespace from typing import Any from sqlalchemy import func, or_, select from sqlalchemy.orm import Session, selectinload from app.api.deps import CurrentUserContext from app.core.config import get_settings from app.models.employee import Employee from app.models.financial_record import ExpenseClaim, ExpenseClaimItem from app.schemas.ontology import OntologyEntity, OntologyParseResult from app.schemas.reimbursement import ExpenseClaimItemCreate, ExpenseClaimItemUpdate from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService from app.services.document_intelligence import build_document_insight from app.services.ocr import OcrService EXPENSE_TYPE_LABELS = { "travel": "差旅", "hotel": "住宿", "transport": "交通", "meal": "餐费", "meeting": "会务", "entertainment": "招待", "office": "办公", "training": "培训", "communication": "通讯", "welfare": "福利", } PRIVILEGED_CLAIM_ROLE_CODES = {"finance"} MAX_DRAFT_CLAIMS_PER_USER = 3 LOCATION_REQUIRED_EXPENSE_TYPES = { "travel", "hotel", "transport", "meal", "meeting", "entertainment", } EXPENSE_SCENE_KEYWORDS = { "travel": ("差旅", "出差", "行程"), "hotel": ("酒店", "住宿", "房费", "客房", "入住", "离店"), "transport": ( "交通", "打车", "出租车", "网约车", "滴滴", "出行", "高铁", "动车", "火车", "机票", "航班", "行程单", "登机", "客票", "公交", "地铁", "过路费", "通行费", "停车", ), "meal": ("餐饮", "餐费", "用餐", "外卖", "快餐", "酒楼", "饭店", "饭馆", "食品", "咖啡"), "entertainment": ("招待", "宴请", "接待", "客户餐", "商务餐", "业务招待"), "office": ("办公", "办公用品", "文具", "耗材", "打印", "纸张", "硒鼓", "墨盒", "鼠标", "键盘", "电脑"), "meeting": ("会议", "会务", "会展", "会议室", "会场", "场地费", "论坛"), "training": ("培训", "课程", "讲师", "教材", "学费", "认证"), } EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES = { "travel": {"travel", "hotel", "transport", "meal"}, "hotel": {"hotel"}, "transport": {"transport", "travel"}, "meal": {"meal", "entertainment"}, "entertainment": {"entertainment", "meal"}, "office": {"office"}, "meeting": {"meeting"}, "training": {"training"}, } DOCUMENT_SCENE_LABELS = { "travel": "差旅", "hotel": "住宿", "transport": "交通", "meal": "餐饮", "entertainment": "业务招待", "office": "办公用品", "meeting": "会务", "training": "培训", "other": "其他票据", } class ExpenseClaimService: def __init__(self, db: Session) -> None: self.db = db self.audit_service = AuditLogService(db) def list_claims(self, current_user: CurrentUserContext) -> list[ExpenseClaim]: stmt = ( select(ExpenseClaim) .options( selectinload(ExpenseClaim.items), selectinload(ExpenseClaim.employee).selectinload(Employee.manager), selectinload(ExpenseClaim.employee).selectinload(Employee.roles), ) .order_by(ExpenseClaim.created_at.desc(), ExpenseClaim.occurred_at.desc()) ) stmt = self._apply_claim_scope(stmt, current_user) return list(self.db.scalars(stmt).all()) def get_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None: stmt = ( select(ExpenseClaim) .options( selectinload(ExpenseClaim.items), selectinload(ExpenseClaim.employee).selectinload(Employee.manager), selectinload(ExpenseClaim.employee).selectinload(Employee.roles), ) .where(ExpenseClaim.id == claim_id) ) stmt = self._apply_claim_scope(stmt, current_user) return self.db.scalar(stmt) def update_claim_item( self, *, claim_id: str, item_id: str, payload: ExpenseClaimItemUpdate, current_user: CurrentUserContext, ) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) item = next((entry for entry in claim.items if entry.id == item_id), None) if item is None: raise LookupError("Item not found") before_json = self._serialize_claim(claim) if payload.item_date is not None: item.item_date = payload.item_date if payload.item_type is not None: item.item_type = self._normalize_optional_text(payload.item_type, fallback=item.item_type) or item.item_type if payload.item_reason is not None: item.item_reason = ( self._normalize_optional_text(payload.item_reason, fallback=item.item_reason) or item.item_reason ) if payload.item_location is not None: item.item_location = ( self._normalize_optional_text(payload.item_location, fallback=item.item_location) or item.item_location ) if payload.item_amount is not None: amount = payload.item_amount.quantize(Decimal("0.01")) if amount <= Decimal("0.00"): raise ValueError("费用金额必须大于 0。") item.item_amount = amount if payload.invoice_id is not None: item.invoice_id = self._normalize_optional_text(payload.invoice_id, allow_empty=True) self._refresh_item_attachment_analysis(item) self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.item_update", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return claim def create_claim_item( self, *, claim_id: str, payload: ExpenseClaimItemCreate | None, current_user: CurrentUserContext, ) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) payload = payload or ExpenseClaimItemCreate() occurred_at = claim.occurred_at if claim.occurred_at is not None else datetime.now(UTC) item_amount = Decimal("0.00") if payload.item_amount is not None: item_amount = payload.item_amount.quantize(Decimal("0.01")) if item_amount < Decimal("0.00"): raise ValueError("费用金额不能小于 0。") item = ExpenseClaimItem( claim_id=claim.id, item_date=payload.item_date or occurred_at.date(), item_type=self._normalize_optional_text( payload.item_type, fallback=str(claim.expense_type or "").strip() or "other", ) or "other", item_reason=self._normalize_optional_text(payload.item_reason, fallback="") or "", item_location=self._normalize_optional_text(payload.item_location, fallback="") or "", item_amount=item_amount, invoice_id=self._normalize_optional_text(payload.invoice_id, allow_empty=True), ) claim.items.append(item) self.db.add(item) self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.item_create", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return claim def delete_claim_item( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> dict[str, Any] | None: claim, item = self._get_claim_item_or_raise( claim_id=claim_id, item_id=item_id, current_user=current_user, ) if claim is None: return None self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) item_label = str(item.item_reason or "").strip() or self._resolve_expense_type_label(item.item_type) self._delete_item_attachment_files(item) claim.items = [entry for entry in claim.items if entry.id != item.id] self.db.delete(item) self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.item_delete", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return { "message": f"费用明细“{item_label}”已删除。", "claim_id": claim.id, "item_id": item.id, } def upload_claim_item_attachment( self, *, claim_id: str, item_id: str, filename: str, content: bytes, media_type: str | None, current_user: CurrentUserContext, ) -> dict[str, Any] | None: claim, item = self._get_claim_item_or_raise( claim_id=claim_id, item_id=item_id, current_user=current_user, ) if claim is None: return None self._ensure_draft_claim(claim) normalized_name = self._normalize_attachment_filename(filename) if not content: raise ValueError("上传文件不能为空。") before_json = self._serialize_claim(claim) attachment_dir = self._build_item_attachment_dir(claim.id, item.id) shutil.rmtree(attachment_dir, ignore_errors=True) attachment_dir.mkdir(parents=True, exist_ok=True) file_path = attachment_dir / normalized_name file_path.write_bytes(content) attachment_analysis = self._build_fallback_attachment_analysis( media_type=media_type, item=item, ) ocr_document = None document_info = None requirement_check = None ocr_status = "empty" ocr_error = "" try: ocr_result = OcrService(self.db).recognize_files( [(normalized_name, content, media_type or "application/octet-stream")] ) documents = list(ocr_result.documents or []) if documents: ocr_document = documents[0] ocr_status = "recognized" document_info = self._build_attachment_document_info(ocr_document) requirement_check = self._build_attachment_requirement_check( item=item, document_info=document_info, ) attachment_analysis = self._build_attachment_analysis( document=ocr_document, item=item, document_info=document_info, requirement_check=requirement_check, ) except Exception as exc: # pragma: no cover - fallback path depends on OCR runtime ocr_status = "failed" ocr_error = str(exc) attachment_analysis = self._build_failed_ocr_attachment_analysis( media_type=media_type, error_message=ocr_error, item=item, ) item.invoice_id = self._to_attachment_storage_key(file_path) meta = { "file_name": normalized_name, "storage_key": item.invoice_id, "media_type": self._resolve_attachment_media_type( normalized_name, fallback=media_type, ), "size_bytes": len(content), "uploaded_at": datetime.now(UTC).isoformat(), "previewable": self._is_previewable_media_type(media_type, normalized_name), "analysis": attachment_analysis, "document_info": document_info, "requirement_check": requirement_check, "ocr_status": ocr_status, "ocr_error": ocr_error, "ocr_text": str(getattr(ocr_document, "text", "") or ""), "ocr_summary": str(getattr(ocr_document, "summary", "") or ""), "ocr_avg_score": float(getattr(ocr_document, "avg_score", 0.0) or 0.0), "ocr_line_count": int(getattr(ocr_document, "line_count", 0) or 0), "ocr_classification_source": str(getattr(ocr_document, "classification_source", "") or ""), "ocr_classification_confidence": float(getattr(ocr_document, "classification_confidence", 0.0) or 0.0), "ocr_classification_evidence": [ str(item) for item in getattr(ocr_document, "classification_evidence", []) or [] if str(item).strip() ], "ocr_warnings": [str(item) for item in getattr(ocr_document, "warnings", []) or []], } self._write_attachment_meta(file_path, meta) self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.attachment_upload", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return { "message": f"{normalized_name} 已上传并关联到当前费用明细。", "claim_id": claim.id, "item_id": item.id, "invoice_id": item.invoice_id, "attachment": self._build_attachment_payload(item), } def get_claim_item_attachment_meta( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> dict[str, Any] | None: claim, item = self._get_claim_item_or_raise( claim_id=claim_id, item_id=item_id, current_user=current_user, ) if claim is None: return None return self._build_attachment_payload(item) def get_claim_item_attachment_content( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> tuple[Path, str, str] | None: claim, item = self._get_claim_item_or_raise( claim_id=claim_id, item_id=item_id, current_user=current_user, ) if claim is None: return None return self._resolve_item_attachment_content(item) def delete_claim_item_attachment( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> dict[str, Any] | None: claim, item = self._get_claim_item_or_raise( claim_id=claim_id, item_id=item_id, current_user=current_user, ) if claim is None: return None self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) previous_name = self._resolve_attachment_display_name(item.invoice_id) self._delete_item_attachment_files(item) item.invoice_id = None self._sync_claim_from_items(claim) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.attachment_delete", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return { "message": f"{previous_name or '附件'} 已删除。", "claim_id": claim.id, "item_id": item.id, "invoice_id": item.invoice_id, "attachment": None, } def submit_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) self._sync_claim_from_items(claim) missing_fields = self._validate_claim_for_submission(claim) if missing_fields: raise ValueError("提交前请先补全信息:" + ";".join(missing_fields)) before_json = self._serialize_claim(claim) claim.status = "submitted" claim.approval_stage = "AI验审" claim.submitted_at = datetime.now(UTC) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.submit", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), ) return claim def save_or_submit_from_ontology( self, *, run_id: str, user_id: str | None, message: str, ontology: OntologyParseResult, context_json: dict[str, Any], ) -> dict[str, Any]: result = self.upsert_draft_from_ontology( run_id=run_id, user_id=user_id, message=message, ontology=ontology, context_json=context_json, ) review_action = str(context_json.get("review_action") or "").strip() if review_action != "next_step": return result claim_id = str(result.get("claim_id") or "").strip() if not claim_id or result.get("draft_limit_reached"): return result current_user = CurrentUserContext( username=str(user_id or context_json.get("name") or "anonymous").strip() or "anonymous", name=str(context_json.get("name") or user_id or "anonymous").strip() or "anonymous", role_codes=[ str(item).strip() for item in list(context_json.get("role_codes") or []) if str(item).strip() ], is_admin=bool(context_json.get("is_admin")), ) try: claim = self.submit_claim(claim_id, current_user) except ValueError as exc: return { **result, "message": str(exc), "submission_blocked": True, "draft_only": False, } if claim is None: return { **result, "message": "未找到可提交的报销单,请刷新后重试。", "submission_blocked": True, "draft_only": False, } return { "message": f"报销单 {claim.claim_no} 已提交审批,当前节点为 {claim.approval_stage or '审批中'}。", "draft_only": False, "claim_id": claim.id, "claim_no": claim.claim_no, "status": claim.status, "approval_stage": claim.approval_stage, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), } def delete_claim(self, claim_id: str, current_user: CurrentUserContext) -> ExpenseClaim | None: claim = self.get_claim(claim_id, current_user) if claim is None: return None self._ensure_draft_claim(claim) before_json = self._serialize_claim(claim) resource_id = claim.id self._delete_claim_attachment_root(claim.id) self.db.delete(claim) self.db.commit() self.audit_service.log_action( actor=current_user.name or current_user.username, action="expense_claim.delete", resource_type="expense_claim", resource_id=resource_id, before_json=before_json, after_json=None, ) return claim def upsert_draft_from_ontology( self, *, run_id: str, user_id: str | None, message: str, ontology: OntologyParseResult, context_json: dict[str, Any], ) -> dict[str, Any]: self._ensure_ready() claim = self._find_target_claim(ontology=ontology, context_json=context_json) is_new_claim = claim is None before_json = self._serialize_claim(claim) if claim is not None else None employee = self._resolve_employee( ontology=ontology, context_json=context_json, user_id=user_id, ) draft_owner_name = ( employee.name if employee is not None else self._resolve_employee_name( ontology=ontology, context_json=context_json, user_id=user_id, ) ) if is_new_claim: existing_draft_count = self._count_draft_claims_for_owner( employee=employee, user_id=user_id, ) if existing_draft_count >= MAX_DRAFT_CLAIMS_PER_USER: return { "message": ( f"你当前已保存 {MAX_DRAFT_CLAIMS_PER_USER} 个草稿,请先完成已保存的草稿," "才能再次新建草稿。" ), "draft_limit_reached": True, "draft_only": False, "status": "blocked", "draft_count": existing_draft_count, "max_draft_count": MAX_DRAFT_CLAIMS_PER_USER, } amount = self._resolve_amount(ontology.entities, context_json=context_json) occurred_at = self._resolve_occurred_at(ontology, context_json=context_json) expense_type = self._resolve_expense_type(ontology.entities, context_json=context_json) location = self._resolve_location(message=message, context_json=context_json) reason = self._resolve_reason( message=message, context_json=context_json, allow_message_fallback=is_new_claim, ) attachment_count = self._resolve_attachment_count(context_json) final_amount = amount if amount is not None else (claim.amount if claim is not None else Decimal("0.00")) final_occurred_at = ( occurred_at if occurred_at is not None else (claim.occurred_at if claim is not None else datetime.now(UTC)) ) final_expense_type = expense_type or (claim.expense_type if claim is not None else "other") final_location = location or (claim.location if claim is not None else "待补充") final_reason = reason or (claim.reason if claim is not None else "待补充") final_attachment_count = ( attachment_count if attachment_count > 0 else int(claim.invoice_count or 0) if claim is not None else 0 ) final_risk_flags = list(ontology.risk_flags) or ( list(claim.risk_flags_json or []) if claim is not None else [] ) if claim is None: claim = ExpenseClaim( claim_no=self._generate_claim_no(final_occurred_at), employee_id=employee.id if employee is not None else None, employee_name=draft_owner_name, department_id=employee.organization_unit_id if employee is not None else None, department_name=self._resolve_department_name( employee=employee, context_json=context_json, ), project_code=self._resolve_project_code(ontology.entities), expense_type=final_expense_type, reason=final_reason, location=final_location, amount=final_amount, currency="CNY", invoice_count=final_attachment_count, occurred_at=final_occurred_at, status="draft", approval_stage="待提交", risk_flags_json=final_risk_flags, ) self.db.add(claim) else: claim.employee_id = employee.id if employee is not None else claim.employee_id claim.employee_name = ( employee.name if employee is not None else self._resolve_employee_name( ontology=ontology, context_json=context_json, user_id=user_id, fallback=claim.employee_name, ) ) claim.department_id = employee.organization_unit_id if employee is not None else claim.department_id claim.department_name = self._resolve_department_name( employee=employee, context_json=context_json, fallback=claim.department_name, ) claim.project_code = self._resolve_project_code(ontology.entities) or claim.project_code claim.expense_type = final_expense_type claim.reason = final_reason claim.location = final_location claim.amount = final_amount claim.invoice_count = final_attachment_count claim.occurred_at = final_occurred_at claim.status = "draft" claim.approval_stage = "待提交" claim.risk_flags_json = final_risk_flags self.db.flush() self._upsert_primary_item( claim=claim, occurred_at=final_occurred_at, expense_type=final_expense_type, amount=final_amount, reason=final_reason, location=final_location, attachment_names=self._resolve_attachment_names(context_json), ) self.db.commit() self.db.refresh(claim) self.audit_service.log_action( actor=user_id or claim.employee_name or "anonymous", action="expense_claim.draft_upsert", resource_type="expense_claim", resource_id=claim.id, before_json=before_json, after_json=self._serialize_claim(claim), request_id=run_id, ) return { "message": ( f"已{'创建' if is_new_claim else '更新'}报销草稿 {claim.claim_no},当前状态为 draft。" "你可以继续补充费用明细、客户单位和票据附件。" ), "draft_only": True, "claim_id": claim.id, "claim_no": claim.claim_no, "status": claim.status, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), } def _find_target_claim( self, *, ontology: OntologyParseResult, context_json: dict[str, Any], ) -> ExpenseClaim | None: draft_claim_id = str(context_json.get("draft_claim_id") or "").strip() if draft_claim_id: return self.db.get(ExpenseClaim, draft_claim_id) claim_codes = [ item.normalized_value for item in ontology.entities if item.type == "expense_claim" and item.normalized_value ] if not claim_codes: return None stmt = select(ExpenseClaim).where(ExpenseClaim.claim_no.in_(claim_codes)).limit(1) return self.db.scalar(stmt) def _upsert_primary_item( self, *, claim: ExpenseClaim, occurred_at: datetime, expense_type: str, amount: Decimal, reason: str, location: str, attachment_names: list[str], ) -> None: item = claim.items[0] if claim.items else None if item is None: item = ExpenseClaimItem( claim_id=claim.id, item_date=occurred_at.date(), item_type=expense_type, item_reason=reason, item_location=location, item_amount=amount, invoice_id=attachment_names[0] if attachment_names else None, ) claim.items.append(item) self.db.add(item) return item.item_date = occurred_at.date() item.item_type = expense_type item.item_reason = reason item.item_location = location item.item_amount = amount item.invoice_id = attachment_names[0] if attachment_names else item.invoice_id def _generate_claim_no(self, occurred_at: datetime) -> str: month_code = occurred_at.strftime("%Y%m") prefix = f"EXP-{month_code}-" existing = int( self.db.scalar( select(func.count()).select_from(ExpenseClaim).where(ExpenseClaim.claim_no.like(f"{prefix}%")) ) or 0 ) return f"{prefix}{existing + 1:03d}" def _count_draft_claims_for_owner( self, *, employee: Employee | None, user_id: str | None, ) -> int: owner_filters = self._build_draft_owner_filters( employee=employee, user_id=user_id, ) if not owner_filters: return 0 stmt = ( select(func.count()) .select_from(ExpenseClaim) .where(ExpenseClaim.status == "draft") .where(or_(*owner_filters)) ) return int(self.db.scalar(stmt) or 0) def _build_draft_owner_filters( self, *, employee: Employee | None, user_id: str | None, ) -> list[Any]: conditions: list[Any] = [] seen: set[tuple[str, str]] = set() def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized or normalized == "待补充": return marker = (field_name, normalized.lower()) if marker in seen: return seen.add(marker) if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self._employee_name_is_unique(employee): add_condition("employee_name", employee.name) add_condition("employee_name", user_id) return conditions def _resolve_employee( self, *, ontology: OntologyParseResult, context_json: dict[str, Any], user_id: str | None, ) -> Employee | None: normalized_user_id = str(user_id or "").strip() if normalized_user_id: stmt = select(Employee).where(func.lower(Employee.email) == normalized_user_id.lower()).limit(1) employee = self.db.scalar(stmt) if employee is not None: return employee employee_name = self._resolve_employee_name( ontology=ontology, context_json=context_json, user_id=None, ) if not employee_name: return None stmt = select(Employee).where(Employee.name == employee_name).limit(1) return self.db.scalar(stmt) @staticmethod def _resolve_employee_name( *, ontology: OntologyParseResult, context_json: dict[str, Any], user_id: str | None, fallback: str = "待补充", ) -> str: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("reporter_name", "employee_name", "claimant_name"): value = str(review_form_values.get(key) or "").strip() if value: return value for item in ontology.entities: if item.type == "employee" and item.value.strip(): return item.value.strip() for key in ("name", "user_name", "employee_name"): value = str(context_json.get(key) or "").strip() if value: return value return str(user_id or fallback).strip() or fallback @staticmethod def _resolve_department_name( *, employee: Employee | None, context_json: dict[str, Any], fallback: str = "待补充", ) -> str: if employee is not None and employee.organization_unit is not None: return employee.organization_unit.name request_context = context_json.get("request_context") if isinstance(request_context, dict): for key in ("department", "department_name", "deptName"): value = str(request_context.get(key) or "").strip() if value: return value for key in ("department_name", "department"): value = str(context_json.get(key) or "").strip() if value: return value return fallback @staticmethod def _resolve_project_code(entities: list[OntologyEntity]) -> str | None: for item in entities: if item.type == "project" and item.normalized_value.strip(): return item.normalized_value.strip() return None @staticmethod def _resolve_expense_type( entities: list[OntologyEntity], *, context_json: dict[str, Any], ) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): compact = str( review_form_values.get("expense_type") or review_form_values.get("reimbursement_type") or "" ).replace(" ", "") if compact: if "招待" in compact or ("客户" in compact and any(word in compact for word in ("吃饭", "宴请", "请客", "用餐"))): return "entertainment" if any(word in compact for word in ("差旅", "出差", "机票", "行程")): return "travel" if any(word in compact for word in ("住宿", "酒店", "宾馆")): return "hotel" if any(word in compact for word in ("交通", "打车", "网约车", "出租车", "停车", "车费")): return "transport" if any(word in compact for word in ("餐费", "用餐", "午餐", "晚餐", "早餐", "伙食")): return "meal" if "会务" in compact: return "meeting" if any(word in compact for word in ("办公费", "办公用品", "文具", "耗材", "办公耗材", "打印纸", "办公设备", "键盘", "鼠标", "白板")): return "office" if any(word in compact for word in ("培训费", "培训", "讲师费", "课时费", "课程费")): return "training" if any(word in compact for word in ("通讯费", "话费", "流量费", "宽带费")): return "communication" if any(word in compact for word in ("福利费", "团建", "慰问", "节日福利", "体检费")): return "welfare" for item in entities: if item.type == "expense_type": normalized = item.normalized_value.strip() if normalized: return normalized return None @staticmethod def _resolve_reason( *, message: str, context_json: dict[str, Any], allow_message_fallback: bool, ) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("reason", "business_reason"): value = str(review_form_values.get(key) or "").strip() if value: return value request_context = context_json.get("request_context") if ( isinstance(request_context, dict) and str(context_json.get("entry_source") or "").strip() == "detail" ): for key in ("reason", "title"): value = str(request_context.get(key) or "").strip() if value: return value if not allow_message_fallback: return None return str(message or "").strip()[:500] or None @staticmethod def _resolve_location(*, message: str, context_json: dict[str, Any]) -> str | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("business_location", "location"): value = str(review_form_values.get(key) or "").strip() if value: return value request_context = context_json.get("request_context") if ( isinstance(request_context, dict) and str(context_json.get("entry_source") or "").strip() == "detail" ): for key in ("city", "location"): value = str(request_context.get(key) or "").strip() if value: return value compact = str(message or "").replace(" ", "") if "客户现场" in compact: return "客户现场" return None @staticmethod def _resolve_occurred_at( ontology: OntologyParseResult, *, context_json: dict[str, Any], ) -> datetime | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): for key in ("occurred_date", "time_range", "business_time"): value = str(review_form_values.get(key) or "").strip() if not value: continue try: parsed = date.fromisoformat(value) return datetime(parsed.year, parsed.month, parsed.day, tzinfo=UTC) except ValueError: continue start_date = ontology.time_range.start_date if start_date: try: parsed = date.fromisoformat(start_date) return datetime(parsed.year, parsed.month, parsed.day, tzinfo=UTC) except ValueError: pass return None @staticmethod def _resolve_amount( entities: list[OntologyEntity], *, context_json: dict[str, Any], ) -> Decimal | None: review_form_values = context_json.get("review_form_values") if isinstance(review_form_values, dict): raw_value = str(review_form_values.get("amount") or "").strip() if raw_value: compact = raw_value.replace("元", "").replace(",", "").strip() try: return Decimal(compact).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): pass for item in entities: if item.type != "amount" or item.role == "threshold": continue try: return Decimal(item.normalized_value).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): continue return None @staticmethod def _resolve_attachment_names(context_json: dict[str, Any]) -> list[str]: names = context_json.get("attachment_names") if not isinstance(names, list): return [] return [str(name).strip() for name in names if str(name).strip()] def _resolve_attachment_count(self, context_json: dict[str, Any]) -> int: names = self._resolve_attachment_names(context_json) if names: return len(names) try: return max(0, int(context_json.get("attachment_count") or 0)) except (TypeError, ValueError): return 0 def _get_claim_item_or_raise( self, *, claim_id: str, item_id: str, current_user: CurrentUserContext, ) -> tuple[ExpenseClaim | None, ExpenseClaimItem]: claim = self.get_claim(claim_id, current_user) if claim is None: return None, None # type: ignore[return-value] item = next((entry for entry in claim.items if entry.id == item_id), None) if item is None: raise LookupError("Item not found") return claim, item def _get_attachment_storage_root(self) -> Path: return (get_settings().resolved_storage_root_dir / "expense_claims").resolve() def _build_item_attachment_dir(self, claim_id: str, item_id: str) -> Path: return (self._get_attachment_storage_root() / claim_id / item_id).resolve() def _delete_claim_attachment_root(self, claim_id: str) -> None: shutil.rmtree((self._get_attachment_storage_root() / claim_id).resolve(), ignore_errors=True) @staticmethod def _normalize_attachment_filename(filename: str | None) -> str: normalized = Path(str(filename or "").strip()).name normalized = re.sub(r"[^\w.\-\u4e00-\u9fff]+", "_", normalized).strip("._") suffix = Path(normalized).suffix if normalized: return normalized return f"attachment{suffix or '.bin'}" def _resolve_attachment_path(self, storage_key: str | None) -> Path | None: normalized = str(storage_key or "").strip() if not normalized: return None root = self._get_attachment_storage_root() path = (root / normalized).resolve() try: path.relative_to(root) except ValueError as exc: raise FileNotFoundError("Attachment path is invalid") from exc return path def _to_attachment_storage_key(self, file_path: Path) -> str: root = self._get_attachment_storage_root() return file_path.resolve().relative_to(root).as_posix() def _resolve_item_attachment_content(self, item: ExpenseClaimItem) -> tuple[Path, str, str]: file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): raise FileNotFoundError("Attachment not found") metadata = self._read_attachment_meta(file_path) filename = str(metadata.get("file_name") or file_path.name) media_type = self._resolve_attachment_media_type( filename, fallback=str(metadata.get("media_type") or ""), ) return file_path, media_type, filename def _delete_item_attachment_files(self, item: ExpenseClaimItem) -> None: file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None: return root = self._get_attachment_storage_root() if file_path.parent == root: file_path.unlink(missing_ok=True) self._attachment_meta_path(file_path).unlink(missing_ok=True) return shutil.rmtree(file_path.parent, ignore_errors=True) @staticmethod def _attachment_meta_path(file_path: Path) -> Path: return file_path.with_name(f"{file_path.name}.meta.json") def _write_attachment_meta(self, file_path: Path, payload: dict[str, Any]) -> None: meta_path = self._attachment_meta_path(file_path) meta_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def _read_attachment_meta(self, file_path: Path) -> dict[str, Any]: meta_path = self._attachment_meta_path(file_path) if not meta_path.exists(): return {} try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {} return payload if isinstance(payload, dict) else {} def _build_attachment_payload(self, item: ExpenseClaimItem) -> dict[str, Any]: file_path, media_type, filename = self._resolve_item_attachment_content(item) metadata = self._read_attachment_meta(file_path) uploaded_at_value = metadata.get("uploaded_at") uploaded_at = None if isinstance(uploaded_at_value, str) and uploaded_at_value.strip(): try: uploaded_at = datetime.fromisoformat(uploaded_at_value) except ValueError: uploaded_at = None analysis = metadata.get("analysis") if not isinstance(analysis, dict): analysis = None document_info = metadata.get("document_info") if not isinstance(document_info, dict): document_info = None requirement_check = metadata.get("requirement_check") if not isinstance(requirement_check, dict): requirement_check = None return { "file_name": str(metadata.get("file_name") or filename), "storage_key": str(item.invoice_id or ""), "media_type": str(metadata.get("media_type") or media_type), "size_bytes": int(metadata.get("size_bytes") or file_path.stat().st_size), "uploaded_at": uploaded_at, "previewable": bool(metadata.get("previewable", self._is_previewable_media_type(media_type, filename))), "analysis": analysis, "document_info": document_info, "requirement_check": requirement_check, } @staticmethod def _resolve_attachment_media_type(filename: str, *, fallback: str | None = None) -> str: guessed = mimetypes.guess_type(filename)[0] return str(guessed or fallback or "application/octet-stream") @staticmethod def _is_previewable_media_type(media_type: str | None, filename: str) -> bool: resolved = str(media_type or "").strip() or (mimetypes.guess_type(filename)[0] or "") return resolved.startswith("image/") or resolved == "application/pdf" @staticmethod def _resolve_attachment_display_name(storage_key: str | None) -> str: return Path(str(storage_key or "").strip()).name def _build_attachment_document_info(self, document: Any) -> dict[str, Any]: insight = build_document_insight( filename=str(getattr(document, "filename", "") or ""), summary=str(getattr(document, "summary", "") or ""), text=str(getattr(document, "text", "") or ""), ) raw_fields = list(getattr(document, "document_fields", []) or []) normalized_fields: list[dict[str, str]] = [] for item in raw_fields: key = "" label = "" value = "" if isinstance(item, dict): key = str(item.get("key") or "").strip() label = str(item.get("label") or "").strip() value = str(item.get("value") or "").strip() else: key = str(getattr(item, "key", "") or "").strip() label = str(getattr(item, "label", "") or "").strip() value = str(getattr(item, "value", "") or "").strip() if key and label and value: normalized_fields.append( { "key": key, "label": label, "value": value, } ) if not normalized_fields: normalized_fields = [ { "key": field.key, "label": field.label, "value": field.value, } for field in insight.fields if field.value ] document_type = str(getattr(document, "document_type", "") or "").strip() if document_type in {"", "other"}: document_type = insight.document_type document_type_label = str(getattr(document, "document_type_label", "") or "").strip() if not document_type_label or document_type_label == "其他单据": document_type_label = insight.document_type_label scene_code = str(getattr(document, "scene_code", "") or "").strip() if scene_code in {"", "other"}: scene_code = insight.scene_code scene_label = str(getattr(document, "scene_label", "") or "").strip() if not scene_label or scene_label == "其他票据": scene_label = insight.scene_label return { "document_type": document_type, "document_type_label": document_type_label, "scene_code": scene_code, "scene_label": scene_label, "fields": normalized_fields, } def _build_attachment_requirement_check( self, *, item: ExpenseClaimItem, document_info: dict[str, Any], ) -> dict[str, Any]: expense_type = str(item.item_type or "").strip().lower() or "other" expense_label = self._resolve_expense_type_label(expense_type) allowed_scenes = EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES.get(expense_type, set()) allowed_scene_labels = [self._resolve_document_scene_label(code) for code in sorted(allowed_scenes)] recognized_scene_code = str(document_info.get("scene_code") or "other").strip() or "other" recognized_scene_label = str( document_info.get("scene_label") or self._resolve_document_scene_label(recognized_scene_code) ).strip() recognized_document_type = str(document_info.get("document_type") or "other").strip() or "other" recognized_document_type_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" matches = not allowed_scenes or recognized_scene_code in allowed_scenes if matches: if allowed_scene_labels: message = ( f"当前费用项目为{expense_label},已识别为{recognized_document_type_label}," f"符合当前{expense_label}场景的附件要求。" ) else: message = f"当前费用项目为{expense_label},已识别为{recognized_document_type_label}。" else: expected_text = "、".join(label + "相关票据" for label in allowed_scene_labels) or "对应场景票据" message = ( f"当前费用项目为{expense_label},要求上传{expected_text};" f"当前识别为{recognized_document_type_label},不符合当前场景,建议过滤或更换附件。" ) return { "matches": matches, "current_expense_type": expense_type, "current_expense_type_label": expense_label, "allowed_scene_labels": allowed_scene_labels, "recognized_scene_code": recognized_scene_code, "recognized_scene_label": recognized_scene_label, "recognized_document_type": recognized_document_type, "recognized_document_type_label": recognized_document_type_label, "message": message, } @staticmethod def _resolve_document_scene_label(scene_code: str) -> str: normalized = str(scene_code or "").strip().lower() return DOCUMENT_SCENE_LABELS.get(normalized, "其他票据") @staticmethod def _extract_amount_candidates(text: str) -> list[Decimal]: values: list[Decimal] = [] seen: set[Decimal] = set() def append_candidate(raw: str) -> None: compact = str(raw or "").replace(",", ".").strip() if not compact: return try: candidate = Decimal(compact).quantize(Decimal("0.01")) except (InvalidOperation, ValueError): return if candidate in seen: return seen.add(candidate) values.append(candidate) for pattern in ( r"(?:金额|价税合计|合计|小写|实收金额|支付金额|订单金额|总额|票价|房费|餐费)[::\s¥¥]*([0-9]{1,6}(?:[.,][0-9]{1,2})?)", r"[¥¥]\s*([0-9]{1,6}(?:[.,][0-9]{1,2})?)", r"([0-9]{1,6}(?:[.,][0-9]{1,2})?)\s*元", ): for raw in re.findall(pattern, text, flags=re.IGNORECASE): append_candidate(raw) if values: return values for raw in re.findall(r"(? bool: return bool(re.search(r"(20\d{2}[年/\-.]\d{1,2}[月/\-.]\d{1,2}日?)", text)) @staticmethod def _normalize_match_text(text: str) -> str: return re.sub(r"\s+", "", str(text or "")).lower() @staticmethod def _resolve_expense_type_label(expense_type: str | None) -> str: normalized = str(expense_type or "").strip().lower() return EXPENSE_TYPE_LABELS.get(normalized, "其他") @staticmethod def _resolve_allowed_document_scenes(expense_type: str | None) -> set[str]: normalized = str(expense_type or "").strip().lower() return set(EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES.get(normalized, set())) def _detect_expense_scenes(self, text: str) -> dict[str, list[str]]: normalized = self._normalize_match_text(text) if not normalized: return {} matches: dict[str, list[str]] = {} for scene, keywords in EXPENSE_SCENE_KEYWORDS.items(): matched = [keyword for keyword in keywords if keyword in normalized] if matched: matches[scene] = matched[:3] return matches def _format_scene_labels(self, scene_codes: set[str]) -> str: labels = [self._resolve_expense_type_label(code) for code in scene_codes] unique_labels = list(dict.fromkeys(label for label in labels if label)) return "、".join(unique_labels) if unique_labels else "其他" def _build_purpose_mismatch_point( self, *, item: ExpenseClaimItem, document_scenes: set[str], ) -> str | None: if not document_scenes: return None allowed_scenes = self._resolve_allowed_document_scenes(item.item_type) reason_text = str(item.item_reason or "").strip() reason_scenes = set(self._detect_expense_scenes(reason_text).keys()) document_scene_labels = self._format_scene_labels(document_scenes) if reason_scenes and document_scenes.isdisjoint(reason_scenes): return ( f"用途字段:用户填写用途“{reason_text[:24]}”与票据内容不一致," f"当前附件更像{document_scene_labels}相关材料。" ) if allowed_scenes and document_scenes.isdisjoint(allowed_scenes): expense_label = self._resolve_expense_type_label(item.item_type) return f"用途字段:当前费用项目为{expense_label},但附件内容更像{document_scene_labels}相关票据。" return None def _build_fallback_attachment_analysis( self, *, media_type: str | None, item: ExpenseClaimItem, ) -> dict[str, Any]: return { "severity": "medium", "label": "中风险", "headline": "AI提示:附件已上传,待识别结果", "summary": "附件已成功保存,但当前尚未拿到有效识别结果,建议人工先核对票据内容。", "points": [ f"附件格式:{self._resolve_attachment_media_type('attachment', fallback=media_type)}", f"费用金额:当前明细金额为 {item.item_amount} 元", ], "suggestion": "建议打开附件确认金额、日期和票据类型是否完整,再继续提交审批。", } def _build_failed_ocr_attachment_analysis( self, *, media_type: str | None, error_message: str, item: ExpenseClaimItem, ) -> dict[str, Any]: return { "severity": "medium", "label": "中风险", "headline": "AI提示:附件已上传,但识别失败", "summary": "文件已经保存成功,但本次 AI 识别未完成,因此无法给出完整票据核验结论。", "points": [ f"识别异常:{error_message or 'OCR 服务暂不可用'}", f"费用金额:当前明细金额为 {item.item_amount} 元", f"附件格式:{self._resolve_attachment_media_type('attachment', fallback=media_type)}", ], "suggestion": "建议重新上传更清晰的票据图片,或稍后重试识别后再提交。", } def _build_attachment_analysis( self, *, document: Any, item: ExpenseClaimItem, document_info: dict[str, Any] | None = None, requirement_check: dict[str, Any] | None = None, ) -> dict[str, Any]: warnings = [str(value).strip() for value in list(getattr(document, "warnings", []) or []) if str(value).strip()] text = " ".join( [ str(getattr(document, "summary", "") or "").strip(), str(getattr(document, "text", "") or "").strip(), ] ).strip() compact_text = text.replace(" ", "") avg_score = float(getattr(document, "avg_score", 0.0) or 0.0) line_count = int(getattr(document, "line_count", 0) or 0) document_info = document_info or self._build_attachment_document_info(document) requirement_check = requirement_check or self._build_attachment_requirement_check( item=item, document_info=document_info, ) document_scene_matches = self._detect_expense_scenes(text) purpose_mismatch_point = self._build_purpose_mismatch_point( item=item, document_scenes=set(document_scene_matches.keys()), ) recognized_document_type = str(document_info.get("document_type") or "other").strip().lower() or "other" recognized_document_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" requirement_matches = bool(requirement_check.get("matches")) has_ticket_keyword = any( keyword in compact_text for keyword in ( "发票", "票据", "增值税", "电子行程单", "购买方", "销售方", "税额", "价税", "票号", "发票代码", "凭证", ) ) amount_candidates = self._extract_amount_candidates(text) item_amount = Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) has_matching_amount = any(abs(candidate - item_amount) <= Decimal("1.00") for candidate in amount_candidates) has_date_text = self._has_date_like_text(text) amount_mismatch = bool(amount_candidates) and item_amount > Decimal("0.00") and not has_matching_amount points: list[str] = [] if warnings: points.append(f"识别提示:{warnings[0]}") if line_count == 0 or not compact_text: points.append("附件内容:未识别到有效文字,当前附件更像普通图片或内容过于模糊。") if recognized_document_type == "other" and not has_ticket_keyword: points.append("票据类型:未识别到发票、票据、电子行程单等关键字,暂无法判断票据类型。") if not amount_candidates: points.append("金额字段:未识别到可用于核对的金额。") elif amount_mismatch: candidate_text = "、".join(str(candidate) for candidate in amount_candidates[:3]) points.append(f"金额字段:附件识别金额 {candidate_text} 元与报销金额 {item_amount} 元不一致。") if not has_date_text: points.append("日期字段:未识别到开票日期或业务发生日期。") if not requirement_matches: points.append(f"附件类型要求:{requirement_check.get('message')}") if purpose_mismatch_point: points.append(purpose_mismatch_point) if avg_score and avg_score < 0.72: points.append(f"识别质量:OCR 置信度偏低({avg_score:.0%}),可能影响票据核验准确性。") issue_count = len(points) if issue_count == 0: return { "severity": "pass", "label": "AI提示符合条件", "headline": "AI提示:附件符合基础校验条件", "summary": "已识别到票据类型和关键字段,且符合当前费用场景的附件要求。", "points": [ f"票据类型:已识别为{recognized_document_label}。", f"附件类型要求:{requirement_check.get('message')}", f"金额字段:已识别到与当前明细接近的金额 {item_amount} 元。", ], "suggestion": "建议继续核对报销分类、费用说明和业务场景是否一致。", } severity = "low" label = "低风险" headline = "AI提示:附件存在轻微待核对项" summary = "当前附件已识别出部分票据要素,但仍建议人工继续复核。" if ( line_count == 0 or not compact_text or (recognized_document_type == "other" and not has_ticket_keyword and issue_count >= 2) or not requirement_matches or (purpose_mismatch_point and amount_mismatch) ): severity = "high" label = "高风险" headline = "AI提示:附件不符合票据校验条件" summary = "当前附件存在明显异常,票据类型与当前费用场景不匹配,或无法作为有效报销材料。" elif purpose_mismatch_point or amount_mismatch or issue_count >= 2 or warnings or (avg_score and avg_score < 0.72): severity = "medium" label = "中风险" headline = "AI提示:附件存在明显待整改项" summary = "当前附件可见部分内容,但金额、用途、日期或附件类型仍有缺失或不一致。" suggestion = { "high": "建议过滤当前不匹配的票据,重新上传符合当前费用场景的清晰原件。", "medium": "建议根据风险点补齐清晰票据,或修正金额、日期、费用说明后再提交。", "low": "建议人工再次核对金额和业务说明,确认后可继续流转。", }[severity] return { "severity": severity, "label": label, "headline": headline, "summary": summary, "points": points, "suggestion": suggestion, } @staticmethod def _serialize_claim(claim: ExpenseClaim) -> dict[str, Any]: return { "id": claim.id, "claim_no": claim.claim_no, "employee_name": claim.employee_name, "department_name": claim.department_name, "project_code": claim.project_code, "expense_type": claim.expense_type, "reason": claim.reason, "location": claim.location, "amount": float(claim.amount), "invoice_count": int(claim.invoice_count or 0), "status": claim.status, "approval_stage": claim.approval_stage, "risk_flags_json": list(claim.risk_flags_json or []), } @staticmethod def _normalize_optional_text(value: str | None, *, fallback: str = "", allow_empty: bool = False) -> str | None: normalized = str(value or "").strip() if normalized: return normalized if allow_empty: return None return fallback @staticmethod def _normalize_sort_datetime(value: datetime | None) -> datetime: if value is None: return datetime.max.replace(tzinfo=UTC) if value.tzinfo is None: return value.replace(tzinfo=UTC) return value @staticmethod def _is_missing_value(value: Any) -> bool: text = str(value or "").strip() if not text: return True compact = text.replace(" ", "") return compact in {"待补充", "暂无", "无", "未知", "处理中"} def _ensure_draft_claim(self, claim: ExpenseClaim) -> None: if str(claim.status or "").strip().lower() != "draft": raise ValueError("只有草稿状态的报销单才允许执行该操作。") def _sync_claim_from_items(self, claim: ExpenseClaim) -> None: if not claim.items: claim.amount = Decimal("0.00") claim.invoice_count = 0 claim.risk_flags_json = self._merge_claim_attachment_risk_flags(claim, []) return ordered_items = sorted( claim.items, key=lambda item: ( item.item_date or date.max, self._normalize_sort_datetime(item.created_at), ), ) primary_item = ordered_items[0] total_amount = sum((item.item_amount for item in ordered_items), Decimal("0.00")) claim.amount = total_amount.quantize(Decimal("0.01")) claim.invoice_count = sum(1 for item in ordered_items if str(item.invoice_id or "").strip()) claim.occurred_at = datetime( primary_item.item_date.year, primary_item.item_date.month, primary_item.item_date.day, tzinfo=UTC, ) claim.expense_type = str(primary_item.item_type or claim.expense_type or "other").strip() or "other" claim.reason = ( self._normalize_optional_text(primary_item.item_reason, fallback=claim.reason or "待补充") or "待补充" ) claim.location = ( self._normalize_optional_text(primary_item.item_location, fallback=claim.location or "待补充") or "待补充" ) claim.risk_flags_json = self._merge_claim_attachment_risk_flags( claim, self._build_claim_attachment_risk_flags(ordered_items), ) if str(claim.status or "").strip().lower() == "draft": claim.approval_stage = "待提交" def _refresh_item_attachment_analysis(self, item: ExpenseClaimItem) -> None: file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): return metadata = self._read_attachment_meta(file_path) media_type = str(metadata.get("media_type") or self._resolve_attachment_media_type(file_path.name)).strip() ocr_status = str(metadata.get("ocr_status") or "").strip().lower() if ocr_status == "failed": analysis = self._build_failed_ocr_attachment_analysis( media_type=media_type, error_message=str(metadata.get("ocr_error") or ""), item=item, ) elif ocr_status == "recognized" or any( ( str(metadata.get("ocr_text") or "").strip(), str(metadata.get("ocr_summary") or "").strip(), int(metadata.get("ocr_line_count") or 0), list(metadata.get("ocr_warnings") or []), ) ): stored_document_info = metadata.get("document_info") if not isinstance(stored_document_info, dict): stored_document_info = {} document = SimpleNamespace( filename=str(metadata.get("file_name") or file_path.name), text=str(metadata.get("ocr_text") or ""), summary=str(metadata.get("ocr_summary") or ""), avg_score=float(metadata.get("ocr_avg_score") or 0.0), line_count=int(metadata.get("ocr_line_count") or 0), document_type=str(stored_document_info.get("document_type") or ""), document_type_label=str(stored_document_info.get("document_type_label") or ""), scene_code=str(stored_document_info.get("scene_code") or ""), scene_label=str(stored_document_info.get("scene_label") or ""), document_fields=list(stored_document_info.get("fields") or []), warnings=[str(value) for value in list(metadata.get("ocr_warnings") or []) if str(value).strip()], ) document_info = self._build_attachment_document_info(document) requirement_check = self._build_attachment_requirement_check( item=item, document_info=document_info, ) analysis = self._build_attachment_analysis( document=document, item=item, document_info=document_info, requirement_check=requirement_check, ) metadata["document_info"] = document_info metadata["requirement_check"] = requirement_check else: analysis = self._build_fallback_attachment_analysis(media_type=media_type, item=item) metadata["analysis"] = analysis self._write_attachment_meta(file_path, metadata) def _build_claim_attachment_risk_flags(self, ordered_items: list[ExpenseClaimItem]) -> list[dict[str, Any]]: derived_flags: list[dict[str, Any]] = [] for index, item in enumerate(ordered_items, start=1): file_path = self._resolve_attachment_path(item.invoice_id) if file_path is None or not file_path.exists(): continue metadata = self._read_attachment_meta(file_path) analysis = metadata.get("analysis") if not isinstance(analysis, dict): continue severity = str(analysis.get("severity") or "").strip().lower() if severity in {"", "pass", "low"}: continue summary = str(analysis.get("summary") or analysis.get("headline") or "").strip() or "附件存在待核对风险。" label = str(analysis.get("label") or ("高风险" if severity == "high" else "中风险")).strip() derived_flags.append( { "source": "attachment_analysis", "item_id": item.id, "severity": severity, "label": label, "message": f"费用明细第 {index} 条:{summary}", } ) return derived_flags @staticmethod def _merge_claim_attachment_risk_flags( claim: ExpenseClaim, attachment_risk_flags: list[dict[str, Any]], ) -> list[Any]: preserved_flags = [ flag for flag in list(claim.risk_flags_json or []) if not (isinstance(flag, dict) and str(flag.get("source") or "").strip() == "attachment_analysis") ] return preserved_flags + attachment_risk_flags def _validate_claim_for_submission(self, claim: ExpenseClaim) -> list[str]: issues: list[str] = [] claim_location_required = self._is_location_required_expense_type(claim.expense_type) if self._is_missing_value(claim.employee_name): issues.append("申请人未完善") if self._is_missing_value(claim.department_name): issues.append("所属部门未完善") if self._is_missing_value(claim.expense_type): issues.append("报销类型未完善") if self._is_missing_value(claim.reason): issues.append("报销事由未完善") if claim_location_required and self._is_missing_value(claim.location): issues.append("业务地点未完善") if claim.amount is None or claim.amount <= Decimal("0.00"): issues.append("报销金额未完善") if claim.occurred_at is None: issues.append("发生时间未完善") if not claim.items: issues.append("费用明细不能为空") for index, item in enumerate(claim.items, start=1): prefix = f"费用明细第 {index} 条" item_location_required = self._is_location_required_expense_type(item.item_type or claim.expense_type) if item.item_date is None: issues.append(f"{prefix}缺少日期") if self._is_missing_value(item.item_type): issues.append(f"{prefix}缺少费用项目") if self._is_missing_value(item.item_reason): issues.append(f"{prefix}缺少说明") if item_location_required and self._is_missing_value(item.item_location): issues.append(f"{prefix}缺少地点") if item.item_amount is None or item.item_amount <= Decimal("0.00"): issues.append(f"{prefix}缺少金额") if self._is_missing_value(item.invoice_id): issues.append(f"{prefix}缺少票据标识") return issues @staticmethod def _is_location_required_expense_type(expense_type: str | None) -> bool: return str(expense_type or "").strip().lower() in LOCATION_REQUIRED_EXPENSE_TYPES @staticmethod def _has_privileged_claim_access(current_user: CurrentUserContext) -> bool: role_codes = { str(item).strip().lower() for item in current_user.role_codes if str(item).strip() } return bool(role_codes & PRIVILEGED_CLAIM_ROLE_CODES) def _employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: return False same_name_count = int( self.db.scalar( select(func.count()).select_from(Employee).where(Employee.name == normalized_name) ) or 0 ) return same_name_count == 1 def _apply_claim_scope(self, stmt: Any, current_user: CurrentUserContext) -> Any: if self._has_privileged_claim_access(current_user): return stmt conditions = [] username = str(current_user.username or "").strip() employee = None if username: employee = self.db.scalar( select(Employee) .where(func.lower(Employee.email) == username.lower()) .limit(1) ) def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized: return if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self._employee_name_is_unique(employee): add_condition("employee_name", employee.name) else: add_condition("employee_id", username) add_condition("employee_name", username) if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") return stmt.where(or_(*conditions)) def _ensure_ready(self) -> None: AgentFoundationService(self.db).ensure_foundation_ready()