from __future__ import annotations from app.schemas.ontology import OntologyParseResult RISK_SIGNAL_TO_RULE_CODES: dict[str, list[str]] = { "location_mismatch": ["risk.travel.destination_receipt_location"], "base_location_overlap": ["risk.travel.base_location_overlap"], "intracity_travel": ["risk.travel.intracity_travel_claim"], "multi_city_itinerary": ["risk.travel.multi_city_reason_required"], "hotel_itinerary_mismatch": ["risk.travel.hotel_without_itinerary"], "duplicate_invoice": ["risk.invoice.duplicate_invoice"], "buyer_name_mismatch": ["risk.invoice.claimant_buyer_name_match"], "document_expense_mismatch": ["risk.invoice.document_expense_mismatch"], "cross_year_invoice": ["risk.invoice.cross_year_invoice"], "void_or_red_invoice": ["risk.invoice.void_or_red_invoice"], "vague_goods_description": ["risk.invoice.vague_goods_description"], "entertainment_missing_detail": ["risk.expense.entertainment_missing_detail"], "meal_as_travel": ["risk.expense.meal_localized_as_travel"], "consecutive_transport_receipts": ["risk.expense.consecutive_transport_receipts"], "reason_too_brief": ["risk.expense.reason_too_brief"], } TEXT_SIGNAL_KEYWORDS: dict[str, tuple[str, ...]] = { "location_mismatch": ("地点", "行程", "出差地", "票据地", "城市不一致"), "duplicate_invoice": ("重复", "同一张票", "重复报销", "发票重复"), "buyer_name_mismatch": ("购买方", "抬头", "开票单位"), "document_expense_mismatch": ("附件", "票据", "单据", "材料不一致"), "cross_year_invoice": ("跨年", "以前年度", "去年发票"), "void_or_red_invoice": ("作废", "红冲", "红字"), "vague_goods_description": ("商品名称", "品名", "笼统"), "entertainment_missing_detail": ("招待", "宴请", "陪同", "客户餐"), "meal_as_travel": ("餐费", "差旅餐", "本地餐"), "consecutive_transport_receipts": ("连续交通", "多张车票", "打车"), "reason_too_brief": ("事由", "说明太短", "理由不足"), } def list_all_platform_risk_rule_codes() -> list[str]: return sorted({code for codes in RISK_SIGNAL_TO_RULE_CODES.values() for code in codes}) def resolve_rule_codes_from_ontology(ontology: OntologyParseResult) -> list[str]: resolved: list[str] = [] for signal in ontology.risk_flags: for rule_code in RISK_SIGNAL_TO_RULE_CODES.get(str(signal or "").strip(), []): if rule_code not in resolved: resolved.append(rule_code) return resolved def infer_risk_signals_from_text(text: str) -> list[str]: normalized = str(text or "").strip().lower() if not normalized: return [] signals: list[str] = [] for signal, keywords in TEXT_SIGNAL_KEYWORDS.items(): if any(keyword.lower() in normalized for keyword in keywords): signals.append(signal) return signals def resolve_rule_codes_for_risk_check( ontology: OntologyParseResult, *, query_text: str = "", ) -> list[str]: if ontology.intent != "risk_check": return [] resolved = resolve_rule_codes_from_ontology(ontology) for signal in infer_risk_signals_from_text(query_text): for rule_code in RISK_SIGNAL_TO_RULE_CODES.get(signal, []): if rule_code not in resolved: resolved.append(rule_code) return resolved or list_all_platform_risk_rule_codes()