From 68a448a551cfdde76dc5a39e6729572426edb246 Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Fri, 15 May 2026 06:56:00 +0000 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E4=BC=98=E5=8C=96=E8=B4=B9?= =?UTF-8?q?=E7=94=A8=E6=8A=A5=E9=94=80=E6=9C=8D=E5=8A=A1=EF=BC=8C=E6=94=B9?= =?UTF-8?q?=E8=BF=9B=E6=8A=A5=E9=94=80=E5=8D=95=E5=88=9B=E5=BB=BA=E5=92=8C?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=A0=A1=E9=AA=8C=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=E8=A6=86?= =?UTF-8?q?=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/app/services/expense_claims.py | 1040 +++++++++++++++++++- server/tests/test_expense_claim_service.py | 484 +++++++++ 2 files changed, 1496 insertions(+), 28 deletions(-) diff --git a/server/src/app/services/expense_claims.py b/server/src/app/services/expense_claims.py index 2be6061..e673b8b 100644 --- a/server/src/app/services/expense_claims.py +++ b/server/src/app/services/expense_claims.py @@ -6,14 +6,15 @@ import json import mimetypes import re import shutil -from datetime import UTC, date, datetime +from collections import defaultdict +from datetime import UTC, date, datetime, timedelta from decimal import Decimal, InvalidOperation from pathlib import Path from types import SimpleNamespace from typing import Any from urllib.parse import quote -from sqlalchemy import func, or_, select +from sqlalchemy import and_, func, or_, select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, selectinload @@ -21,11 +22,19 @@ from app.api.deps import CurrentUserContext from app.core.config import get_settings from app.models.employee import Employee from app.models.financial_record import ExpenseClaim, ExpenseClaimItem +from app.models.organization import OrganizationUnit from app.schemas.ontology import OntologyEntity, OntologyParseResult from app.schemas.reimbursement import ExpenseClaimItemCreate, ExpenseClaimItemUpdate from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService from app.services.document_intelligence import build_document_insight +from app.services.expense_rule_runtime import ( + DEFAULT_SCENE_RULE_ASSET_CODE, + ExpenseRuleRuntimeService, + RuntimeTravelPolicy, + build_default_expense_rule_catalog, + resolve_document_type_label, +) from app.services.ocr import OcrService EXPENSE_TYPE_LABELS = { @@ -42,6 +51,7 @@ EXPENSE_TYPE_LABELS = { } PRIVILEGED_CLAIM_ROLE_CODES = {"finance"} +APPROVAL_VISIBLE_CLAIM_ROLE_CODES = {"manager", "approver"} MAX_DRAFT_CLAIMS_PER_USER = 3 LOCATION_REQUIRED_EXPENSE_TYPES = { "travel", @@ -131,6 +141,129 @@ SYSTEM_GENERATED_REASON_PREFIXES = ( "查看报销草稿", "请解释一下当前这笔报销的合规风险和待补充项", ) +AI_REVIEW_LOOKBACK_DAYS = 90 +AI_REVIEW_REPEAT_RISK_WARNING_COUNT = 1 +AI_REVIEW_REPEAT_RISK_BLOCK_COUNT = 2 +TRAVEL_REVIEW_RELEVANT_EXPENSE_TYPES = {"travel", "hotel", "transport"} +TRAVEL_REVIEW_LONG_DISTANCE_DOCUMENT_TYPES = {"flight_itinerary", "train_ticket"} +TRAVEL_POLICY_CITY_TIERS = { + "北京": "tier_1", + "上海": "tier_1", + "广州": "tier_1", + "深圳": "tier_1", + "杭州": "tier_2", + "南京": "tier_2", + "苏州": "tier_2", + "武汉": "tier_2", + "成都": "tier_2", + "重庆": "tier_2", + "西安": "tier_2", + "天津": "tier_2", + "宁波": "tier_2", + "厦门": "tier_2", + "青岛": "tier_2", + "长沙": "tier_2", + "郑州": "tier_2", + "合肥": "tier_2", + "济南": "tier_2", + "沈阳": "tier_2", + "大连": "tier_2", + "福州": "tier_2", + "昆明": "tier_2", + "海口": "tier_2", + "三亚": "tier_2", + "无锡": "tier_2", + "东莞": "tier_2", + "佛山": "tier_2", +} +TRAVEL_POLICY_CITY_MATCH_ORDER = tuple( + sorted(TRAVEL_POLICY_CITY_TIERS.keys(), key=lambda item: len(item), reverse=True) +) +TRAVEL_POLICY_BAND_LABELS = { + "junior": "P1-P3", + "mid": "P4-P5", + "senior": "P6-P7", + "manager": "M1-M2", + "executive": "M3及以上 / D序列", +} +TRAVEL_POLICY_HOTEL_LIMITS = { + "junior": { + "tier_1": Decimal("450.00"), + "tier_2": Decimal("380.00"), + "tier_3": Decimal("320.00"), + }, + "mid": { + "tier_1": Decimal("550.00"), + "tier_2": Decimal("480.00"), + "tier_3": Decimal("380.00"), + }, + "senior": { + "tier_1": Decimal("700.00"), + "tier_2": Decimal("620.00"), + "tier_3": Decimal("520.00"), + }, + "manager": { + "tier_1": Decimal("900.00"), + "tier_2": Decimal("820.00"), + "tier_3": Decimal("720.00"), + }, + "executive": { + "tier_1": Decimal("1200.00"), + "tier_2": Decimal("1000.00"), + "tier_3": Decimal("900.00"), + }, +} +TRAVEL_POLICY_ALLOWED_TRANSPORT_LEVELS = { + "junior": {"flight": 1, "train": 1}, + "mid": {"flight": 1, "train": 1}, + "senior": {"flight": 2, "train": 2}, + "manager": {"flight": 3, "train": 3}, + "executive": {"flight": 4, "train": 3}, +} +TRAVEL_POLICY_ROUTE_EXCEPTION_KEYWORDS = ( + "中转", + "转机", + "经停", + "改签", + "多地出差", + "多城市", + "多站", + "异地返程", + "异地结束", + "临时变更", + "继续前往", + "第二站", +) +TRAVEL_POLICY_STANDARD_EXCEPTION_KEYWORDS = ( + "超标说明", + "无直达", + "展会高峰", + "会议高峰", + "协议酒店满房", + "客户指定", + "临时改签", + "行程变更", + "红眼航班", + "晚到店", +) +TRAVEL_POLICY_FLIGHT_CLASS_PATTERNS = ( + ("头等舱", 4), + ("公务舱", 3), + ("商务舱", 3), + ("超级经济舱", 2), + ("高端经济舱", 2), + ("明珠经济舱", 2), + ("经济舱", 1), +) +TRAVEL_POLICY_TRAIN_CLASS_PATTERNS = ( + ("商务座", 3), + ("一等座", 2), + ("软卧", 2), + ("二等座", 1), + ("二等卧", 1), + ("硬卧", 1), +) +TRAVEL_POLICY_HOTEL_NIGHT_PATTERN = re.compile(r"(\d+)\s*(?:晚|间夜)") class ExpenseClaimService: @@ -549,9 +682,11 @@ class ExpenseClaimService: raise ValueError("提交前请先补全信息:" + ";".join(missing_fields)) before_json = self._serialize_claim(claim) - claim.status = "submitted" - claim.approval_stage = "AI验审" - claim.submitted_at = datetime.now(UTC) + review_result = self._run_ai_submission_review(claim) + claim.status = str(review_result["status"]) + claim.approval_stage = str(review_result["approval_stage"]) + claim.risk_flags_json = list(review_result["risk_flags"]) + claim.submitted_at = datetime.now(UTC) if claim.status == "submitted" else None self.db.commit() self.db.refresh(claim) @@ -621,8 +756,33 @@ class ExpenseClaimService: "draft_only": False, } + if str(claim.status or "").strip().lower() != "submitted": + review_message = "" + for flag in list(claim.risk_flags_json or []): + if not isinstance(flag, dict): + continue + if str(flag.get("source") or "").strip() != "submission_review": + continue + review_message = str(flag.get("message") or "").strip() + if review_message: + break + return { + "message": review_message or f"报销单 {claim.claim_no} 经 AI验审后转为待补充,请先修正后再提交。", + "submission_blocked": True, + "draft_only": False, + "claim_id": claim.id, + "claim_no": claim.claim_no, + "status": claim.status, + "approval_stage": claim.approval_stage, + "amount": float(claim.amount), + "invoice_count": int(claim.invoice_count or 0), + } + return { - "message": f"报销单 {claim.claim_no} 已提交审批,当前节点为 {claim.approval_stage or '审批中'}。", + "message": ( + f"报销单 {claim.claim_no} 已完成 AI验审," + f"当前节点为 {claim.approval_stage or '审批中'}。" + ), "draft_only": False, "claim_id": claim.id, "claim_no": claim.claim_no, @@ -2019,19 +2179,29 @@ class ExpenseClaimService: document_info: dict[str, Any], ) -> dict[str, Any]: expense_type = str(item.item_type or "").strip().lower() or "other" - expense_label = self._resolve_expense_type_label(expense_type) - allowed_scenes = EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES.get(expense_type, set()) + policy = self._get_expense_scene_policy(expense_type) + expense_label = policy.label if policy is not None else self._resolve_expense_type_label(expense_type) + allowed_scenes = set(policy.allowed_scene_codes) if policy is not None else set() + allowed_document_types = set(policy.allowed_document_types) if policy is not None else set() allowed_scene_labels = [self._resolve_document_scene_label(code) for code in sorted(allowed_scenes)] + allowed_document_type_labels = [ + resolve_document_type_label(document_type) + for document_type in sorted(allowed_document_types) + ] recognized_scene_code = str(document_info.get("scene_code") or "other").strip() or "other" recognized_scene_label = str( document_info.get("scene_label") or self._resolve_document_scene_label(recognized_scene_code) ).strip() recognized_document_type = str(document_info.get("document_type") or "other").strip() or "other" recognized_document_type_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" - matches = not allowed_scenes or recognized_scene_code in allowed_scenes + matches = ( + (not allowed_scenes and not allowed_document_types) + or recognized_scene_code in allowed_scenes + or recognized_document_type in allowed_document_types + ) if matches: - if allowed_scene_labels: + if allowed_scene_labels or allowed_document_type_labels: message = ( f"当前费用项目为{expense_label},已识别为{recognized_document_type_label}," f"符合当前{expense_label}场景的附件要求。" @@ -2039,7 +2209,9 @@ class ExpenseClaimService: else: message = f"当前费用项目为{expense_label},已识别为{recognized_document_type_label}。" else: - expected_text = "、".join(label + "相关票据" for label in allowed_scene_labels) or "对应场景票据" + expected_parts = [label + "相关票据" for label in allowed_scene_labels] + expected_parts.extend(allowed_document_type_labels) + expected_text = "、".join(dict.fromkeys(part for part in expected_parts if part)) or "对应场景票据" message = ( f"当前费用项目为{expense_label},要求上传{expected_text};" f"当前识别为{recognized_document_type_label},不符合当前场景,建议过滤或更换附件。" @@ -2050,10 +2222,14 @@ class ExpenseClaimService: "current_expense_type": expense_type, "current_expense_type_label": expense_label, "allowed_scene_labels": allowed_scene_labels, + "allowed_document_type_labels": allowed_document_type_labels, "recognized_scene_code": recognized_scene_code, "recognized_scene_label": recognized_scene_label, "recognized_document_type": recognized_document_type, "recognized_document_type_label": recognized_document_type_label, + "mismatch_severity": policy.attachment_mismatch_severity if policy is not None else "high", + "rule_code": policy.rule_code if policy is not None else DEFAULT_SCENE_RULE_ASSET_CODE, + "rule_name": policy.rule_name if policy is not None else "报销场景提交与附件标准", "message": message, } @@ -2108,10 +2284,10 @@ class ExpenseClaimService: normalized = str(expense_type or "").strip().lower() return EXPENSE_TYPE_LABELS.get(normalized, "其他") - @staticmethod - def _resolve_allowed_document_scenes(expense_type: str | None) -> set[str]: + def _resolve_allowed_document_scenes(self, expense_type: str | None) -> set[str]: normalized = str(expense_type or "").strip().lower() - return set(EXPENSE_TYPE_ALLOWED_DOCUMENT_SCENES.get(normalized, set())) + policy = self._get_expense_scene_policy(normalized) + return set(policy.allowed_scene_codes) if policy is not None else set() def _detect_expense_scenes(self, text: str) -> dict[str, list[str]]: normalized = self._normalize_match_text(text) @@ -2225,6 +2401,7 @@ class ExpenseClaimService: recognized_document_type = str(document_info.get("document_type") or "other").strip().lower() or "other" recognized_document_label = str(document_info.get("document_type_label") or "其他单据").strip() or "其他单据" requirement_matches = bool(requirement_check.get("matches")) + mismatch_severity = str(requirement_check.get("mismatch_severity") or "high").strip().lower() or "high" has_ticket_keyword = any( keyword in compact_text @@ -2293,14 +2470,21 @@ class ExpenseClaimService: line_count == 0 or not compact_text or (recognized_document_type == "other" and not has_ticket_keyword and issue_count >= 2) - or not requirement_matches + or (not requirement_matches and mismatch_severity == "high") or (purpose_mismatch_point and amount_mismatch) ): severity = "high" label = "高风险" headline = "AI提示:附件不符合票据校验条件" summary = "当前附件存在明显异常,票据类型与当前费用场景不匹配,或无法作为有效报销材料。" - elif purpose_mismatch_point or amount_mismatch or issue_count >= 2 or warnings or (avg_score and avg_score < 0.72): + elif ( + purpose_mismatch_point + or amount_mismatch + or issue_count >= 2 + or warnings + or (avg_score and avg_score < 0.72) + or (not requirement_matches and mismatch_severity in {"medium", "low"}) + ): severity = "medium" label = "中风险" headline = "AI提示:附件存在明显待整改项" @@ -2365,8 +2549,743 @@ class ExpenseClaimService: return compact in {"待补充", "暂无", "无", "未知", "处理中"} def _ensure_draft_claim(self, claim: ExpenseClaim) -> None: - if str(claim.status or "").strip().lower() != "draft": - raise ValueError("只有草稿状态的报销单才允许执行该操作。") + normalized_status = str(claim.status or "").strip().lower() + if normalized_status not in {"draft", "supplement"}: + raise ValueError("只有草稿或待补充状态的报销单才允许执行该操作。") + + def _run_ai_submission_review(self, claim: ExpenseClaim) -> dict[str, Any]: + base_flags = list(claim.risk_flags_json or []) + attachment_flags = [ + flag + for flag in base_flags + if isinstance(flag, dict) and str(flag.get("source") or "").strip() == "attachment_analysis" + ] + preserved_flags = [ + flag + for flag in base_flags + if not (isinstance(flag, dict) and str(flag.get("source") or "").strip() == "submission_review") + ] + + review_flags: list[dict[str, Any]] = [] + blocking_reasons: list[str] = [] + + high_attachment_flags = [ + flag + for flag in attachment_flags + if str(flag.get("severity") or "").strip().lower() == "high" + ] + medium_attachment_flags = [ + flag + for flag in attachment_flags + if str(flag.get("severity") or "").strip().lower() == "medium" + ] + if high_attachment_flags: + blocking_reasons.append("存在高风险票据,需先补充或更换附件后再提交。") + review_flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "AI验审拦截", + "message": f"AI验审发现 {len(high_attachment_flags)} 条高风险附件,已退回待补充。", + } + ) + elif medium_attachment_flags: + review_flags.append( + { + "source": "submission_review", + "severity": "medium", + "label": "AI验审提醒", + "message": f"AI验审发现 {len(medium_attachment_flags)} 条中风险附件,已随单流转给审批人复核。", + } + ) + + manager_name = self._resolve_claim_manager_name(claim) + if not manager_name: + blocking_reasons.append("未识别到该员工的直属领导,暂时无法流转到领导审批。") + review_flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "审批链缺失", + "message": "AI验审通过前检查到直属领导缺失,当前无法继续流转审批链。", + } + ) + + historical_risk_count = self._count_recent_risky_claims(claim) + if historical_risk_count >= AI_REVIEW_REPEAT_RISK_BLOCK_COUNT: + review_flags.append( + { + "source": "submission_review", + "severity": "medium", + "label": "历史风险偏高", + "message": ( + f"近 {AI_REVIEW_LOOKBACK_DAYS} 天内该员工已有 {historical_risk_count} 笔带风险标记的报销," + "本次已追加到审批链重点关注。" + ), + } + ) + elif historical_risk_count >= AI_REVIEW_REPEAT_RISK_WARNING_COUNT: + review_flags.append( + { + "source": "submission_review", + "severity": "low", + "label": "历史风险提醒", + "message": ( + f"近 {AI_REVIEW_LOOKBACK_DAYS} 天内该员工已有 {historical_risk_count} 笔带风险标记的报销," + "建议直属领导重点复核。" + ), + } + ) + + travel_review = self._run_travel_policy_review(claim) + blocking_reasons.extend(travel_review["blocking_reasons"]) + review_flags.extend(travel_review["flags"]) + + scene_policy_review = self._run_scene_policy_review(claim) + blocking_reasons.extend(scene_policy_review["blocking_reasons"]) + review_flags.extend(scene_policy_review["flags"]) + + if blocking_reasons: + summary_message = "AI验审未通过:" + ";".join(dict.fromkeys(blocking_reasons)) + review_flags.insert( + 0, + { + "source": "submission_review", + "severity": "high", + "label": "AI验审未通过", + "message": summary_message, + }, + ) + return { + "status": "supplement", + "approval_stage": "待补充", + "risk_flags": preserved_flags + review_flags, + "message": summary_message, + "passed": False, + } + + return { + "status": "submitted", + "approval_stage": "直属领导审批", + "risk_flags": preserved_flags + review_flags, + "message": ( + f"报销单 {claim.claim_no} 已完成 AI验审," + f"现已提交给直属领导 {manager_name or '审批人'} 审批。" + ), + "passed": True, + } + + @staticmethod + def _resolve_claim_manager_name(claim: ExpenseClaim) -> str: + if claim.employee is not None: + if claim.employee.manager is not None and claim.employee.manager.name: + return str(claim.employee.manager.name).strip() + if claim.employee.organization_unit is not None and claim.employee.organization_unit.manager_name: + return str(claim.employee.organization_unit.manager_name).strip() + return "" + + def _count_recent_risky_claims(self, claim: ExpenseClaim) -> int: + filters = [] + if claim.employee_id: + filters.append(ExpenseClaim.employee_id == claim.employee_id) + elif claim.employee_name: + filters.append(ExpenseClaim.employee_name == claim.employee_name) + if not filters: + return 0 + + since = datetime.now(UTC) - timedelta(days=AI_REVIEW_LOOKBACK_DAYS) + stmt = ( + select(ExpenseClaim) + .where(or_(*filters)) + .where(ExpenseClaim.id != claim.id) + .where(ExpenseClaim.occurred_at >= since) + ) + recent_claims = list(self.db.scalars(stmt).all()) + return sum(1 for item in recent_claims if list(item.risk_flags_json or [])) + + def _run_scene_policy_review(self, claim: ExpenseClaim) -> dict[str, list[Any]]: + catalog = self._get_expense_rule_catalog() + flags: list[dict[str, Any]] = [] + blocking_reasons: list[str] = [] + reason_corpus = self._build_scene_reason_corpus(claim) + scene_totals: dict[str, Decimal] = defaultdict(lambda: Decimal("0.00")) + scene_warned: set[str] = set() + + for item in claim.items: + item_type = str(item.item_type or claim.expense_type or "other").strip().lower() or "other" + policy = catalog.get_scene_policy(item_type) + if policy is None: + continue + + scene_totals[item_type] += Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) + + if policy.always_warn and item_type not in scene_warned: + scene_warned.add(item_type) + flags.append( + { + "source": "submission_review", + "severity": "medium", + "label": f"{policy.label}人工重点复核", + "message": policy.always_warn_message or f"{policy.label}默认需要人工重点复核。", + "rule_code": policy.rule_code, + } + ) + + item_limit = policy.item_amount_limit + item_amount = Decimal(item.item_amount or Decimal("0.00")).quantize(Decimal("0.01")) + if item_limit is not None and item_amount > Decimal("0.00"): + exceeded = self._evaluate_amount_limit( + amount=item_amount, + limit_config=item_limit, + reason_text="\n".join( + part + for part in [reason_corpus, str(item.item_reason or "").strip()] + if part + ), + ) + if exceeded is not None: + severity, threshold = exceeded + label = ( + f"{policy.label}金额超标待说明" + if severity == "high" + else f"{policy.label}金额超标提醒" + ) + message = ( + f"{policy.label}当前识别金额为 {item_amount} 元," + f"已超过制度阈值 {threshold} 元。" + ) + if severity == "high": + message += " 当前未识别到例外说明,请先补充原因。" + blocking_reasons.append(f"{policy.label}金额超出制度阈值,且未补充例外说明。") + else: + message += " 已识别到例外说明,请审批人重点复核。" + flags.append( + { + "source": "submission_review", + "severity": severity, + "label": label, + "message": message, + "rule_code": policy.rule_code, + } + ) + + for scene_code, total_amount in scene_totals.items(): + policy = catalog.get_scene_policy(scene_code) + if policy is None or policy.claim_amount_limit is None or total_amount <= Decimal("0.00"): + continue + exceeded = self._evaluate_amount_limit( + amount=total_amount, + limit_config=policy.claim_amount_limit, + reason_text=reason_corpus, + ) + if exceeded is None: + continue + + severity, threshold = exceeded + label = f"{policy.label}合计超标待说明" if severity == "high" else f"{policy.label}合计超标提醒" + message = ( + f"{policy.label}当前合计金额为 {total_amount} 元," + f"已超过制度阈值 {threshold} 元。" + ) + if severity == "high": + message += " 当前未识别到例外说明,请先补充原因。" + blocking_reasons.append(f"{policy.label}合计金额超出制度阈值,且未补充例外说明。") + else: + message += " 已识别到例外说明,请审批人重点复核。" + flags.append( + { + "source": "submission_review", + "severity": severity, + "label": label, + "message": message, + "rule_code": policy.rule_code, + } + ) + + return { + "flags": flags, + "blocking_reasons": list(dict.fromkeys(reason for reason in blocking_reasons if reason)), + } + + @staticmethod + def _evaluate_amount_limit( + *, + amount: Decimal, + limit_config: Any, + reason_text: str, + ) -> tuple[str, Decimal] | None: + block_amount = getattr(limit_config, "block_amount", None) + warn_amount = getattr(limit_config, "warn_amount", None) + exception_keywords = list(getattr(limit_config, "exception_keywords", []) or []) + has_exception = ExpenseClaimService._text_contains_keywords(reason_text, exception_keywords) + + if block_amount is not None and amount > Decimal(block_amount): + return ("medium" if has_exception else "high", Decimal(block_amount)) + if warn_amount is not None and amount > Decimal(warn_amount): + return ("medium", Decimal(warn_amount)) + return None + + def _run_travel_policy_review(self, claim: ExpenseClaim) -> dict[str, list[Any]]: + policy = self._get_expense_rule_catalog().travel_policy + if policy is None: + return {"flags": [], "blocking_reasons": []} + contexts = [ + context + for context in self._build_claim_attachment_contexts(claim) + if self._is_travel_policy_relevant_context(context, policy) + ] + if not contexts: + return {"flags": [], "blocking_reasons": []} + + reason_corpus = self._build_travel_reason_corpus(claim) + has_route_exception = self._text_contains_keywords( + reason_corpus, + policy.route_exception_keywords, + ) + has_standard_exception = self._text_contains_keywords( + reason_corpus, + policy.standard_exception_keywords, + ) + grade_band = self._resolve_travel_policy_band(claim.employee_grade) + band_label = policy.band_labels.get(grade_band or "", str(claim.employee_grade or "").strip() or "当前职级") + + itinerary_segments: list[dict[str, Any]] = [] + itinerary_cities: list[str] = [] + hotel_contexts: list[dict[str, Any]] = [] + flags: list[dict[str, Any]] = [] + blocking_reasons: list[str] = [] + + for context in contexts: + route_segment = self._extract_route_segment(context, policy) + if route_segment and self._is_long_distance_travel_context(context, policy): + itinerary_segments.append( + { + "item": context["item"], + "origin": route_segment[0], + "destination": route_segment[1], + } + ) + itinerary_cities.extend([route_segment[0], route_segment[1]]) + + scene_code = str(context["document_info"].get("scene_code") or "").strip().lower() + document_type = str(context["document_info"].get("document_type") or "").strip().lower() + item_type = str(context["item"].item_type or "").strip().lower() + if "hotel" in {scene_code, document_type, item_type} or document_type == "hotel_invoice": + hotel_contexts.append(context) + + unique_itinerary_cities = list(dict.fromkeys(city for city in itinerary_cities if city)) + expected_destination_city = self._resolve_expected_travel_city( + claim, + contexts, + unique_itinerary_cities, + policy, + ) + + if itinerary_segments: + unique_destinations = list( + dict.fromkeys(segment["destination"] for segment in itinerary_segments if segment["destination"]) + ) + first_origin = str(itinerary_segments[0]["origin"] or "").strip() + last_destination = str(itinerary_segments[-1]["destination"] or "").strip() + + for previous, current in zip(itinerary_segments, itinerary_segments[1:]): + previous_destination = str(previous["destination"] or "").strip() + current_origin = str(current["origin"] or "").strip() + if previous_destination and current_origin and previous_destination != current_origin: + message = ( + f"差旅行程未形成连续链路:上一段到达 {previous_destination}," + f"下一段却从 {current_origin} 出发,请补充中转或改签说明。" + ) + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "行程闭环异常", + "message": message, + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("差旅行程未形成连续闭环,请补充中转、改签或异地出发原因。") + break + + if ( + expected_destination_city + and last_destination + and last_destination not in {expected_destination_city, first_origin} + ): + message = ( + f"差旅行程终点识别为 {last_destination}," + f"与申报目的地 {expected_destination_city} 不一致,请补充多地出差或后续行程说明。" + ) + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "行程终点异常", + "message": message, + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("差旅行程终点与申报目的地不一致,请补充多地出差说明或补齐后续票据。") + + expected_city_set = { + city + for city in (expected_destination_city, first_origin) + if city + } + extra_destinations = [ + city + for city in unique_destinations + if city and city not in expected_city_set + ] + if extra_destinations and not has_route_exception: + destinations_text = "、".join(extra_destinations[:3]) + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "多城市行程待说明", + "message": ( + f"检测到本次差旅涉及 {destinations_text} 多个目的地," + "但当前报销事由未说明中转、多地拜访或改签原因。" + ), + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("检测到多城市差旅行程,但当前未补充中转或多地出差说明。") + + allowed_hotel_cities = { + city + for city in [expected_destination_city, *unique_itinerary_cities] + if city + } + for context in hotel_contexts: + hotel_city = self._extract_hotel_city(context, policy) + if hotel_city and allowed_hotel_cities and hotel_city not in allowed_hotel_cities: + expected_text = "、".join(sorted(allowed_hotel_cities)) + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "酒店地点异常", + "message": ( + f"酒店票据识别城市为 {hotel_city}," + f"与当前差旅目的地/行程城市 {expected_text} 不一致,请补充异地住宿原因。" + ), + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("酒店票据地点与差旅目的地不一致,请补充异地住宿原因或更换附件。") + + if grade_band is None: + continue + + baseline_city = hotel_city or expected_destination_city + city_tier = policy.city_tiers.get(str(baseline_city or "").strip(), "tier_3") + cap = Decimal(policy.hotel_limits[grade_band][city_tier]) + night_count = self._extract_hotel_night_count(context) + item_amount = Decimal(context["item"].item_amount or Decimal("0.00")).quantize(Decimal("0.01")) + nightly_amount = (item_amount / Decimal(max(night_count, 1))).quantize(Decimal("0.01")) + + if nightly_amount <= cap: + continue + + city_tier_label = { + "tier_1": "一线城市", + "tier_2": "重点城市", + "tier_3": "其他城市", + }.get(city_tier, "当前城市") + hotel_message = ( + f"{band_label} 职级在{city_tier_label}的住宿标准为 {cap} 元/晚," + f"当前酒店识别金额约 {nightly_amount} 元/晚。" + ) + item_reason = str(context["item"].item_reason or "").strip() + item_has_exception = self._text_contains_keywords(item_reason, policy.standard_exception_keywords) + if has_standard_exception or item_has_exception: + flags.append( + { + "source": "submission_review", + "severity": "medium", + "label": "住宿超标提醒", + "message": hotel_message + " 已识别到补充说明,请直属领导重点复核。", + "rule_code": policy.rule_code, + } + ) + else: + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "住宿超标待说明", + "message": hotel_message + " 当前未识别到超标说明,请先补充原因。", + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("住宿金额超出当前职级差标,且未补充超标说明。") + + if grade_band is not None: + for context in contexts: + transport_class = self._detect_transport_class(context, policy) + if transport_class is None: + continue + + transport_kind, class_label, class_level = transport_class + allowed_level = policy.transport_limits.get(grade_band, {}).get(transport_kind) + if allowed_level is None or class_level <= allowed_level: + continue + + item_reason = str(context["item"].item_reason or "").strip() + item_has_exception = self._text_contains_keywords(item_reason, policy.standard_exception_keywords) + message = f"{band_label} 职级当前默认不可报销 {class_label}。" + if has_standard_exception or item_has_exception: + flags.append( + { + "source": "submission_review", + "severity": "medium", + "label": "交通舱位超标提醒", + "message": message + " 已识别到补充说明,请审批人重点复核。", + "rule_code": policy.rule_code, + } + ) + else: + flags.append( + { + "source": "submission_review", + "severity": "high", + "label": "交通舱位超标待说明", + "message": message + " 当前未识别到例外说明,请先补充原因。", + "rule_code": policy.rule_code, + } + ) + blocking_reasons.append("交通舱位或席别超出当前职级差标,且未补充例外说明。") + + return { + "flags": flags, + "blocking_reasons": list(dict.fromkeys(reason for reason in blocking_reasons if reason)), + } + + def _build_claim_attachment_contexts(self, claim: ExpenseClaim) -> list[dict[str, Any]]: + contexts: list[dict[str, Any]] = [] + ordered_items = sorted( + claim.items, + key=lambda item: ( + item.item_date or date.max, + self._normalize_sort_datetime(item.created_at), + ), + ) + for index, item in enumerate(ordered_items, start=1): + file_path = self._resolve_attachment_path(item.invoice_id) + if file_path is None or not file_path.exists(): + continue + + metadata = self._read_attachment_meta(file_path) + document_info = metadata.get("document_info") + contexts.append( + { + "index": index, + "item": item, + "document_info": document_info if isinstance(document_info, dict) else {}, + "ocr_text": str(metadata.get("ocr_text") or ""), + "ocr_summary": str(metadata.get("ocr_summary") or ""), + } + ) + return contexts + + def _is_travel_policy_relevant_context( + self, + context: dict[str, Any], + policy: RuntimeTravelPolicy, + ) -> bool: + item = context.get("item") + document_info = context.get("document_info") or {} + item_type = str(getattr(item, "item_type", "") or "").strip().lower() + scene_code = str(document_info.get("scene_code") or "").strip().lower() + document_type = str(document_info.get("document_type") or "").strip().lower() + return ( + item_type in set(policy.relevant_expense_types) + or scene_code in set(policy.relevant_expense_types) + or document_type in {"hotel_invoice", *set(policy.long_distance_document_types)} + ) + + @staticmethod + def _resolve_document_field_value(document_info: dict[str, Any], key: str) -> str: + normalized_key = str(key or "").strip().lower() + for field in list(document_info.get("fields") or []): + if not isinstance(field, dict): + continue + field_key = str(field.get("key") or "").strip().lower() + if field_key == normalized_key: + return str(field.get("value") or "").strip() + return "" + + @staticmethod + def _text_contains_keywords(text: str, keywords: tuple[str, ...] | list[str]) -> bool: + compact = re.sub(r"\s+", "", str(text or "")) + if not compact: + return False + return any(keyword in compact for keyword in keywords) + + def _build_travel_reason_corpus(self, claim: ExpenseClaim) -> str: + parts = [str(claim.reason or "").strip(), str(claim.location or "").strip()] + for item in claim.items: + parts.append(str(item.item_reason or "").strip()) + parts.append(str(item.item_location or "").strip()) + return "\n".join(part for part in parts if part) + + @staticmethod + def _resolve_travel_policy_band(grade: str | None) -> str | None: + normalized = str(grade or "").strip().upper() + if not normalized: + return None + + p_match = re.search(r"P(\d+)", normalized) + if p_match: + level = int(p_match.group(1)) + if level <= 3: + return "junior" + if level <= 5: + return "mid" + return "senior" + + m_match = re.search(r"M(\d+)", normalized) + if m_match: + level = int(m_match.group(1)) + if level <= 2: + return "manager" + return "executive" + + if normalized.startswith("D"): + return "executive" + return None + + def _resolve_expected_travel_city( + self, + claim: ExpenseClaim, + contexts: list[dict[str, Any]], + itinerary_cities: list[str], + policy: RuntimeTravelPolicy, + ) -> str: + claim_city = self._extract_city_from_text(str(claim.location or ""), policy) + if claim_city: + return claim_city + + for context in contexts: + hotel_city = self._extract_hotel_city(context, policy) + if hotel_city: + return hotel_city + + if len(itinerary_cities) >= 2 and itinerary_cities[1]: + return itinerary_cities[1] + for city in itinerary_cities: + if city: + return city + return "" + + def _extract_route_segment( + self, + context: dict[str, Any], + policy: RuntimeTravelPolicy, + ) -> tuple[str, str] | None: + document_info = context["document_info"] + route_value = self._resolve_document_field_value(document_info, "route") + if not route_value or "-" not in route_value: + return None + + origin_text, destination_text = [segment.strip() for segment in route_value.split("-", 1)] + origin_city = self._extract_city_from_text(origin_text, policy) + destination_city = self._extract_city_from_text(destination_text, policy) + if not origin_city or not destination_city or origin_city == destination_city: + return None + return origin_city, destination_city + + def _extract_hotel_city(self, context: dict[str, Any], policy: RuntimeTravelPolicy) -> str: + document_info = context["document_info"] + item = context["item"] + merchant_name = self._resolve_document_field_value(document_info, "merchant_name") + for candidate in ( + merchant_name, + str(item.item_location or ""), + str(context.get("ocr_summary") or ""), + str(context.get("ocr_text") or ""), + ): + city = self._extract_city_from_text(candidate, policy) + if city: + return city + return "" + + @staticmethod + def _extract_city_from_text(text: str, policy: RuntimeTravelPolicy) -> str: + normalized = str(text or "").strip() + if not normalized: + return "" + city_match_order = sorted(policy.city_tiers.keys(), key=lambda item: len(item), reverse=True) + for city in city_match_order: + if city in normalized: + return city + return "" + + @staticmethod + def _extract_hotel_night_count(context: dict[str, Any]) -> int: + text = " ".join( + [ + str(context.get("ocr_summary") or "").strip(), + str(context.get("ocr_text") or "").strip(), + ] + ).strip() + match = TRAVEL_POLICY_HOTEL_NIGHT_PATTERN.search(text) + if not match: + return 1 + try: + return max(1, int(match.group(1))) + except (TypeError, ValueError): + return 1 + + def _detect_transport_class( + self, + context: dict[str, Any], + policy: RuntimeTravelPolicy, + ) -> tuple[str, str, int] | None: + document_info = context["document_info"] + document_type = str(document_info.get("document_type") or "").strip().lower() + text = " ".join( + [ + str(context.get("ocr_summary") or "").strip(), + str(context.get("ocr_text") or "").strip(), + ] + ).strip() + compact_text = re.sub(r"\s+", "", text) + if not compact_text: + return None + + if document_type == "flight_itinerary": + for config in policy.flight_classes: + label = str(config.keyword or "").strip() + level = int(config.level) + if label in compact_text: + return "flight", label, level + return None + + if document_type == "train_ticket": + for config in policy.train_classes: + label = str(config.keyword or "").strip() + level = int(config.level) + if label in compact_text: + return "train", label, level + return None + + return None + + def _is_long_distance_travel_context( + self, + context: dict[str, Any], + policy: RuntimeTravelPolicy, + ) -> bool: + document_info = context["document_info"] + document_type = str(document_info.get("document_type") or "").strip().lower() + scene_code = str(document_info.get("scene_code") or "").strip().lower() + if document_type in set(policy.long_distance_document_types): + return True + return scene_code == "travel" def _sync_claim_from_items(self, claim: ExpenseClaim) -> None: if not claim.items: @@ -2495,6 +3414,35 @@ class ExpenseClaimService: ) return derived_flags + def _get_expense_rule_catalog(self) -> Any: + cached = getattr(self, "_expense_rule_catalog", None) + if cached is not None: + return cached + + db = getattr(self, "db", None) + if db is None: + catalog = build_default_expense_rule_catalog() + else: + catalog = ExpenseRuleRuntimeService(db).load_catalog() + setattr(self, "_expense_rule_catalog", catalog) + return catalog + + def _get_expense_scene_policy(self, expense_type: str | None) -> Any | None: + return self._get_expense_rule_catalog().get_scene_policy(expense_type) + + def _resolve_min_attachment_count(self, expense_type: str | None) -> int: + policy = self._get_expense_scene_policy(expense_type) + if policy is None: + return 1 + return max(0, int(policy.min_attachment_count or 0)) + + def _build_scene_reason_corpus(self, claim: ExpenseClaim) -> str: + parts = [str(claim.reason or "").strip(), str(claim.location or "").strip()] + for item in claim.items: + parts.append(str(item.item_reason or "").strip()) + parts.append(str(item.item_location or "").strip()) + return "\n".join(part for part in parts if part) + @staticmethod def _merge_claim_attachment_risk_flags( claim: ExpenseClaim, @@ -2510,6 +3458,7 @@ class ExpenseClaimService: def _validate_claim_for_submission(self, claim: ExpenseClaim) -> list[str]: issues: list[str] = [] claim_location_required = self._is_location_required_expense_type(claim.expense_type) + claim_min_attachment_count = self._resolve_min_attachment_count(claim.expense_type) if self._is_missing_value(claim.employee_name): issues.append("申请人未完善") @@ -2525,6 +3474,8 @@ class ExpenseClaimService: issues.append("报销金额未完善") if claim.occurred_at is None: issues.append("发生时间未完善") + if int(claim.invoice_count or 0) < claim_min_attachment_count: + issues.append("票据附件数量不足") if not claim.items: issues.append("费用明细不能为空") @@ -2546,12 +3497,16 @@ class ExpenseClaimService: return issues - @staticmethod def _is_location_required_expense_type(expense_type: str | None) -> bool: - return str(expense_type or "").strip().lower() in LOCATION_REQUIRED_EXPENSE_TYPES + policy = self._get_expense_scene_policy(expense_type) + if policy is None: + return str(expense_type or "").strip().lower() in LOCATION_REQUIRED_EXPENSE_TYPES + return bool(policy.location_required) @staticmethod def _has_privileged_claim_access(current_user: CurrentUserContext) -> bool: + if current_user.is_admin: + return True role_codes = { str(item).strip().lower() for item in current_user.role_codes @@ -2559,6 +3514,24 @@ class ExpenseClaimService: } return bool(role_codes & PRIVILEGED_CLAIM_ROLE_CODES) + @staticmethod + def _normalize_role_codes(current_user: CurrentUserContext) -> set[str]: + return { + str(item).strip().lower() + for item in current_user.role_codes + if str(item).strip() + } + + def _resolve_current_employee(self, current_user: CurrentUserContext) -> Employee | None: + username = str(current_user.username or "").strip() + if not username: + return None + return self.db.scalar( + select(Employee) + .where(func.lower(Employee.email) == username.lower()) + .limit(1) + ) + def _employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: @@ -2578,14 +3551,7 @@ class ExpenseClaimService: conditions = [] username = str(current_user.username or "").strip() - - employee = None - if username: - employee = self.db.scalar( - select(Employee) - .where(func.lower(Employee.email) == username.lower()) - .limit(1) - ) + employee = self._resolve_current_employee(current_user) def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() @@ -2608,6 +3574,24 @@ class ExpenseClaimService: if not conditions: return stmt.where(ExpenseClaim.id == "__no_visible_claim__") + role_codes = self._normalize_role_codes(current_user) + if role_codes & APPROVAL_VISIBLE_CLAIM_ROLE_CODES: + pending_leader_approval = and_( + ExpenseClaim.status == "submitted", + ExpenseClaim.approval_stage == "直属领导审批", + ) + if employee is not None: + subordinate_ids = select(Employee.id).where(Employee.manager_id == employee.id) + conditions.append(and_(pending_leader_approval, ExpenseClaim.employee_id.in_(subordinate_ids))) + manager_name = str( + employee.name if employee is not None and employee.name else current_user.name or "" + ).strip() + if manager_name: + managed_department_ids = select(OrganizationUnit.id).where(OrganizationUnit.manager_name == manager_name) + managed_department_names = select(OrganizationUnit.name).where(OrganizationUnit.manager_name == manager_name) + conditions.append(and_(pending_leader_approval, ExpenseClaim.department_id.in_(managed_department_ids))) + conditions.append(and_(pending_leader_approval, ExpenseClaim.department_name.in_(managed_department_names))) + return stmt.where(or_(*conditions)) def _ensure_ready(self) -> None: diff --git a/server/tests/test_expense_claim_service.py b/server/tests/test_expense_claim_service.py index 93053f5..474dfae 100644 --- a/server/tests/test_expense_claim_service.py +++ b/server/tests/test_expense_claim_service.py @@ -631,6 +631,409 @@ def test_delete_claim_item_removes_row_and_attachment_files(monkeypatch, tmp_pat assert not attachment_root.exists() +def test_submit_claim_runs_ai_review_and_routes_to_direct_manager() -> None: + current_user = CurrentUserContext( + username="emp-submit@example.com", + name="张三", + role_codes=[], + is_admin=False, + ) + + with build_session() as db: + manager = Employee( + employee_no="E7000", + name="李经理", + email="manager@example.com", + ) + employee = Employee( + employee_no="E7001", + name="张三", + email="emp-submit@example.com", + manager=manager, + ) + claim = build_claim(expense_type="transport", location="上海") + claim.employee = employee + claim.employee_id = employee.id + claim.items[0].invoice_id = "taxi-ticket.png" + db.add_all([manager, employee, claim]) + db.commit() + + submitted = ExpenseClaimService(db).submit_claim(claim.id, current_user) + + assert submitted is not None + assert submitted.status == "submitted" + assert submitted.approval_stage == "直属领导审批" + assert submitted.submitted_at is not None + + +def test_submit_claim_blocks_high_risk_attachment_at_ai_review(monkeypatch, tmp_path) -> None: + current_user = CurrentUserContext( + username="emp-risk@example.com", + name="张三", + role_codes=[], + is_admin=False, + ) + + def fake_recognize( + self, + files: list[tuple[str, bytes, str | None]], + ) -> OcrRecognizeBatchRead: + return OcrRecognizeBatchRead( + total_file_count=1, + success_count=1, + documents=[ + OcrRecognizeDocumentRead( + filename="taxi-note.png", + media_type="image/png", + text="滴滴出行电子发票 金额120元 2026-05-13", + summary="识别到交通出行发票,金额 120 元。", + avg_score=0.97, + line_count=1, + page_count=1, + warnings=[], + ) + ], + ) + + monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) + monkeypatch.setattr(ExpenseClaimService, "_get_attachment_storage_root", lambda self: tmp_path) + + with build_session() as db: + manager = Employee( + employee_no="E7100", + name="李经理", + email="manager2@example.com", + ) + employee = Employee( + employee_no="E7101", + name="张三", + email="emp-risk@example.com", + manager=manager, + ) + claim = build_claim(expense_type="office", location="深圳南山") + claim.employee = employee + claim.employee_id = employee.id + claim.invoice_count = 0 + claim.items[0].invoice_id = None + claim.items[0].item_reason = "办公用品采购" + db.add_all([manager, employee, claim]) + db.commit() + + service = ExpenseClaimService(db) + service.upload_claim_item_attachment( + claim_id=claim.id, + item_id=claim.items[0].id, + filename="taxi-note.png", + content=b"fake-image-bytes", + media_type="image/png", + current_user=current_user, + ) + + submitted = service.submit_claim(claim.id, current_user) + + assert submitted is not None + assert submitted.status == "supplement" + assert submitted.approval_stage == "待补充" + assert submitted.submitted_at is None + assert any( + isinstance(flag, dict) and str(flag.get("source") or "").strip() == "submission_review" + for flag in list(submitted.risk_flags_json or []) + ) + + +def test_submit_claim_blocks_travel_route_mismatch_without_explanation(monkeypatch, tmp_path) -> None: + current_user = CurrentUserContext( + username="emp-travel@example.com", + name="张三", + role_codes=[], + is_admin=False, + ) + + def fake_recognize( + self, + files: list[tuple[str, bytes, str | None]], + ) -> OcrRecognizeBatchRead: + documents: list[OcrRecognizeDocumentRead] = [] + for filename, _, media_type in files: + if filename == "outbound.png": + documents.append( + OcrRecognizeDocumentRead( + filename=filename, + media_type=media_type or "image/png", + text="电子行程单 2026-05-13 经济舱 武汉-上海 金额 480元 航班号 MU5101", + summary="武汉到上海机票", + avg_score=0.98, + line_count=1, + page_count=1, + document_type="flight_itinerary", + document_type_label="机票/航班行程单", + scene_code="travel", + scene_label="差旅票据", + document_fields=[ + {"key": "route", "label": "行程", "value": "武汉-上海"}, + {"key": "amount", "label": "金额", "value": "480元"}, + {"key": "date", "label": "日期", "value": "2026-05-13"}, + ], + warnings=[], + ) + ) + elif filename == "onward.png": + documents.append( + OcrRecognizeDocumentRead( + filename=filename, + media_type=media_type or "image/png", + text="电子行程单 2026-05-14 经济舱 上海-成都 金额 360元 航班号 MU5402", + summary="上海到成都机票", + avg_score=0.98, + line_count=1, + page_count=1, + document_type="flight_itinerary", + document_type_label="机票/航班行程单", + scene_code="travel", + scene_label="差旅票据", + document_fields=[ + {"key": "route", "label": "行程", "value": "上海-成都"}, + {"key": "amount", "label": "金额", "value": "360元"}, + {"key": "date", "label": "日期", "value": "2026-05-14"}, + ], + warnings=[], + ) + ) + return OcrRecognizeBatchRead( + total_file_count=len(files), + success_count=len(documents), + documents=documents, + ) + + monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) + monkeypatch.setattr(ExpenseClaimService, "_get_attachment_storage_root", lambda self: tmp_path) + + with build_session() as db: + manager = Employee( + employee_no="E7200", + name="李经理", + email="manager-travel@example.com", + ) + employee = Employee( + employee_no="E7201", + name="张三", + email="emp-travel@example.com", + grade="P4", + manager=manager, + ) + db.add_all([manager, employee]) + db.flush() + + claim = build_claim(expense_type="travel", location="上海") + claim.reason = "上海客户现场出差" + claim.employee = employee + claim.employee_id = employee.id + claim.items = [ + ExpenseClaimItem( + id="travel-item-1", + claim_id=claim.id, + item_date=date(2026, 5, 13), + item_type="travel", + item_reason="赴上海客户现场", + item_location="上海", + item_amount=Decimal("480.00"), + invoice_id=None, + ), + ExpenseClaimItem( + id="travel-item-2", + claim_id=claim.id, + item_date=date(2026, 5, 14), + item_type="travel", + item_reason="赴上海客户现场", + item_location="上海", + item_amount=Decimal("360.00"), + invoice_id=None, + ), + ] + claim.amount = Decimal("840.00") + claim.invoice_count = 0 + db.add(claim) + db.commit() + + service = ExpenseClaimService(db) + service.upload_claim_item_attachment( + claim_id=claim.id, + item_id="travel-item-1", + filename="outbound.png", + content=b"outbound-image", + media_type="image/png", + current_user=current_user, + ) + service.upload_claim_item_attachment( + claim_id=claim.id, + item_id="travel-item-2", + filename="onward.png", + content=b"onward-image", + media_type="image/png", + current_user=current_user, + ) + + submitted = service.submit_claim(claim.id, current_user) + + assert submitted is not None + assert submitted.status == "supplement" + assert submitted.approval_stage == "待补充" + assert any( + isinstance(flag, dict) + and str(flag.get("source") or "").strip() == "submission_review" + and ( + "多城市" in str(flag.get("message") or "") + or "终点" in str(flag.get("message") or "") + ) + for flag in list(submitted.risk_flags_json or []) + ) + + +def test_submit_claim_blocks_hotel_amount_over_travel_policy_without_explanation(monkeypatch, tmp_path) -> None: + current_user = CurrentUserContext( + username="emp-hotel@example.com", + name="张三", + role_codes=[], + is_admin=False, + ) + + def fake_recognize( + self, + files: list[tuple[str, bytes, str | None]], + ) -> OcrRecognizeBatchRead: + documents: list[OcrRecognizeDocumentRead] = [] + for filename, _, media_type in files: + if filename == "beijing-trip.png": + documents.append( + OcrRecognizeDocumentRead( + filename=filename, + media_type=media_type or "image/png", + text="电子行程单 2026-05-13 经济舱 武汉-北京 金额 520元 航班号 MU6101", + summary="武汉到北京机票", + avg_score=0.97, + line_count=1, + page_count=1, + document_type="flight_itinerary", + document_type_label="机票/航班行程单", + scene_code="travel", + scene_label="差旅票据", + document_fields=[ + {"key": "route", "label": "行程", "value": "武汉-北京"}, + {"key": "amount", "label": "金额", "value": "520元"}, + {"key": "date", "label": "日期", "value": "2026-05-13"}, + ], + warnings=[], + ) + ) + elif filename == "beijing-hotel.png": + documents.append( + OcrRecognizeDocumentRead( + filename=filename, + media_type=media_type or "image/png", + text="北京全季酒店 1晚 金额 880元 2026-05-13", + summary="北京全季酒店住宿发票", + avg_score=0.98, + line_count=1, + page_count=1, + document_type="hotel_invoice", + document_type_label="酒店住宿票据", + scene_code="hotel", + scene_label="住宿票据", + document_fields=[ + {"key": "merchant_name", "label": "商户", "value": "北京全季酒店"}, + {"key": "amount", "label": "金额", "value": "880元"}, + {"key": "date", "label": "日期", "value": "2026-05-13"}, + ], + warnings=[], + ) + ) + return OcrRecognizeBatchRead( + total_file_count=len(files), + success_count=len(documents), + documents=documents, + ) + + monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) + monkeypatch.setattr(ExpenseClaimService, "_get_attachment_storage_root", lambda self: tmp_path) + + with build_session() as db: + manager = Employee( + employee_no="E7300", + name="李经理", + email="manager-hotel@example.com", + ) + employee = Employee( + employee_no="E7301", + name="张三", + email="emp-hotel@example.com", + grade="P4", + manager=manager, + ) + db.add_all([manager, employee]) + db.flush() + + claim = build_claim(expense_type="travel", location="北京") + claim.reason = "北京客户现场出差" + claim.employee = employee + claim.employee_id = employee.id + claim.items = [ + ExpenseClaimItem( + id="hotel-trip-item", + claim_id=claim.id, + item_date=date(2026, 5, 13), + item_type="travel", + item_reason="赴北京客户现场", + item_location="北京", + item_amount=Decimal("520.00"), + invoice_id=None, + ), + ExpenseClaimItem( + id="hotel-item", + claim_id=claim.id, + item_date=date(2026, 5, 13), + item_type="hotel", + item_reason="北京住宿", + item_location="北京", + item_amount=Decimal("880.00"), + invoice_id=None, + ), + ] + claim.amount = Decimal("1400.00") + claim.invoice_count = 0 + db.add(claim) + db.commit() + + service = ExpenseClaimService(db) + service.upload_claim_item_attachment( + claim_id=claim.id, + item_id="hotel-trip-item", + filename="beijing-trip.png", + content=b"travel-image", + media_type="image/png", + current_user=current_user, + ) + service.upload_claim_item_attachment( + claim_id=claim.id, + item_id="hotel-item", + filename="beijing-hotel.png", + content=b"hotel-image", + media_type="image/png", + current_user=current_user, + ) + + submitted = service.submit_claim(claim.id, current_user) + + assert submitted is not None + assert submitted.status == "supplement" + assert submitted.approval_stage == "待补充" + assert any( + isinstance(flag, dict) + and str(flag.get("source") or "").strip() == "submission_review" + and "住宿标准" in str(flag.get("message") or "") + for flag in list(submitted.risk_flags_json or []) + ) + + def test_list_claims_scopes_to_current_user_id_even_when_names_duplicate() -> None: current_user = CurrentUserContext( username="zhangsan1@example.com", @@ -753,3 +1156,84 @@ def test_list_claims_allows_finance_to_view_all_records() -> None: assert len(claims) == 2 assert {claim.claim_no for claim in claims} == {"EXP-FIN-101", "EXP-FIN-102"} + + +def test_list_claims_allows_direct_manager_to_view_pending_claims_for_approval() -> None: + current_user = CurrentUserContext( + username="manager@example.com", + name="李经理", + role_codes=["manager"], + is_admin=False, + ) + + with build_session() as db: + manager = Employee( + employee_no="E8000", + name="李经理", + email="manager@example.com", + ) + employee = Employee( + employee_no="E8001", + name="张三", + email="zhangsan@example.com", + manager=manager, + ) + outsider_manager = Employee( + employee_no="E8002", + name="王经理", + email="other-manager@example.com", + ) + outsider = Employee( + employee_no="E8003", + name="李四", + email="lisi@example.com", + manager=outsider_manager, + ) + db.add_all([manager, employee, outsider_manager, outsider]) + db.flush() + db.add_all( + [ + ExpenseClaim( + claim_no="EXP-MGR-201", + employee_id=employee.id, + employee_name="张三", + department_name="市场部", + project_code="PRJ-MGR", + expense_type="transport", + reason="滴滴报销", + location="上海", + amount=Decimal("66.00"), + currency="CNY", + invoice_count=1, + occurred_at=datetime(2026, 5, 12, 9, 0, tzinfo=UTC), + submitted_at=datetime(2026, 5, 12, 10, 0, tzinfo=UTC), + status="submitted", + approval_stage="直属领导审批", + risk_flags_json=[], + ), + ExpenseClaim( + claim_no="EXP-MGR-202", + employee_id=outsider.id, + employee_name="李四", + department_name="销售部", + project_code="PRJ-OTHER", + expense_type="meal", + reason="客户用餐", + location="杭州", + amount=Decimal("188.00"), + currency="CNY", + invoice_count=1, + occurred_at=datetime(2026, 5, 12, 12, 0, tzinfo=UTC), + submitted_at=datetime(2026, 5, 12, 13, 0, tzinfo=UTC), + status="submitted", + approval_stage="直属领导审批", + risk_flags_json=[], + ), + ] + ) + db.commit() + + claims = ExpenseClaimService(db).list_claims(current_user) + + assert len(claims) == 1 + assert claims[0].claim_no == "EXP-MGR-201"