from __future__ import annotations import json import re from datetime import UTC, datetime, timedelta from sqlalchemy import or_, select from sqlalchemy.orm import Session from app.core.agent_enums import AgentAssetStatus, AgentAssetType from app.models.employee import Employee from app.models.financial_record import ExpenseClaim from app.schemas.agent_asset import AgentAssetListItem from app.schemas.user_agent import ( UserAgentCitation, UserAgentDraftPayload, UserAgentReviewAction, UserAgentReviewEditField, UserAgentReviewClaimGroup, UserAgentReviewDocumentCard, UserAgentReviewDocumentField, UserAgentReviewPayload, UserAgentReviewRiskBrief, UserAgentReviewSlotCard, UserAgentRequest, UserAgentResponse, UserAgentSuggestedAction, ) from app.services.agent_assets import AgentAssetService from app.services.agent_foundation import AgentFoundationService from app.services.runtime_chat import RuntimeChatService SCENARIO_LABELS = { "expense": "报销", "accounts_receivable": "应收", "accounts_payable": "应付", "knowledge": "知识", "unknown": "通用", } RISK_REASON_MAP = { "duplicate_expense": "检测到同员工、同金额或近似单据存在重复提交迹象。", "amount_over_limit": "金额超过当前制度或预算阈值,需要补充例外说明。", "invoice_anomaly": "票据或附件完整性不满足当前规则要求,需要补件或人工复核。", "ar_overdue": "应收账款已出现逾期,存在回款延迟风险。", "ap_overdue": "应付付款已出现逾期,可能影响供应商履约或合作关系。", } GENERIC_EXPENSE_PROMPTS = { "报销", "我要报销", "我想报销", "帮我报销", "我要申请报销", "发起报销", "提交报销", } EXPLICIT_DRAFT_KEYWORDS = ("生成", "草稿", "起草", "创建", "发起", "准备") EXPENSE_TYPE_LABELS = { "travel": "差旅费", "hotel": "住宿费", "transport": "交通费", "meal": "餐费", "meeting": "会务费", "entertainment": "业务招待费", "other": "其他费用", } GROUP_SCENE_LABELS = { "travel": "差旅费", "entertainment": "业务招待费", "meal": "伙食费", "transport": "交通费", "hotel": "住宿费", "other": "其他费用", } SLOT_LABELS = { "expense_type": "报销类型", "customer_name": "客户名称", "time_range": "发生时间", "location": "地点", "merchant_name": "酒店/商户", "amount": "金额", "reason": "事由说明", "participants": "参与人员", "attachments": "票据附件", } DATE_TEXT_PATTERN = re.compile(r"(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)") AMOUNT_TEXT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(?:元|万元|万)") SOURCE_LABELS = { "user_text": "用户描述", "user_form": "用户修改", "ocr": "票据识别", "upload": "上传附件", "detail_context": "关联单据", "system_context": "系统上下文", "inferred": "语义推断", "system": "系统判断", } class UserAgentService: def __init__(self, db: Session) -> None: self.db = db self.asset_service = AgentAssetService(db) self.runtime_chat_service = RuntimeChatService(db) def respond(self, payload: UserAgentRequest) -> UserAgentResponse: AgentFoundationService(self.db).ensure_foundation_ready() citations = self._build_rule_citations(payload) suggested_actions = self._build_suggested_actions(payload) risk_flags = self._resolve_risk_flags(payload) draft_payload = ( self._build_draft_payload(payload) if payload.ontology.intent == "draft" else None ) review_payload = self._build_review_payload( payload, citations=citations, draft_payload=draft_payload, ) review_answer = self._build_review_body_answer( payload, review_payload=review_payload, draft_payload=draft_payload, ) if payload.degraded and payload.tool_payload.get("message"): return UserAgentResponse( answer=review_answer or str(payload.tool_payload["message"]), citations=citations, suggested_actions=suggested_actions, review_payload=review_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) if review_answer: return UserAgentResponse( answer=review_answer, citations=citations, suggested_actions=suggested_actions, draft_payload=draft_payload, review_payload=review_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) guided_answer = None if draft_payload is None or draft_payload.claim_id is None: guided_answer = self._build_guided_answer(payload) if guided_answer: return UserAgentResponse( answer=guided_answer, citations=citations, suggested_actions=suggested_actions, draft_payload=draft_payload, review_payload=review_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) fallback_answer = self._build_fallback_answer( payload, citations=citations, draft_payload=draft_payload, ) answer = None if not self._should_skip_model_answer(payload, review_payload): answer = self._generate_answer_with_model( payload, citations=citations, suggested_actions=suggested_actions, risk_flags=risk_flags, draft_payload=draft_payload, fallback_answer=fallback_answer, ) return UserAgentResponse( answer=answer or fallback_answer, citations=citations, suggested_actions=suggested_actions, draft_payload=draft_payload, review_payload=review_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) def _build_fallback_answer( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], draft_payload: UserAgentDraftPayload | None, ) -> str: if payload.ontology.intent in {"query", "compare"}: return self._build_query_answer(payload) if payload.ontology.intent == "risk_check": return self._build_risk_answer(payload, citations) if payload.ontology.intent == "draft": tool_message = str(payload.tool_payload.get("message") or "").strip() if tool_message and ( str(payload.tool_payload.get("claim_id") or "").strip() or str(payload.tool_payload.get("claim_no") or "").strip() ): return tool_message if payload.ontology.intent == "draft" and draft_payload is not None: return ( f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容," "仍需人工确认后再进入正式流程。" ) return self._build_explain_answer(payload, citations) def _build_guided_answer(self, payload: UserAgentRequest) -> str | None: if not self._is_generic_expense_prompt(payload): return self._build_implicit_expense_draft_guidance(payload) attachment_names = self._resolve_attachment_names(payload) ocr_summary = str(payload.context_json.get("ocr_summary") or "").strip() attachment_hint = "" if ocr_summary: attachment_hint = f" 我已读取附件 OCR 摘要:{ocr_summary}" elif attachment_names: attachment_hint = ( f" 我已带入 {len(attachment_names)} 份附件名称,但目前还不能直接读取附件内容," "仍需要你补充关键信息。" ) return ( "可以帮你发起报销。请补充费用类型、发生时间、金额、事由和相关对象," "或者直接上传票据附件,我再继续帮你判断能否报、缺什么材料以及生成报销草稿。" f"{attachment_hint}" ) def _build_implicit_expense_draft_guidance( self, payload: UserAgentRequest, ) -> str | None: if not self._is_implicit_expense_draft_request(payload): return None amount_text = next( (item.value for item in payload.ontology.entities if item.type == "amount"), "", ) expense_type = next( ( EXPENSE_TYPE_LABELS.get(item.normalized_value, item.value) for item in payload.ontology.entities if item.type == "expense_type" ), "报销", ) time_text = payload.ontology.time_range.raw or "本次" amount_hint = f",金额 {amount_text}" if amount_text else "" return ( f"已识别到一笔{time_text}的{expense_type}支出{amount_hint}。" "如果要继续生成报销草稿,还需要补充客户单位、参与人员、费用明细和票据附件。" "你也可以继续上传发票或图片,我会把这些信息带入后续对话。" ) def _generate_answer_with_model( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], suggested_actions: list[UserAgentSuggestedAction], risk_flags: list[str], draft_payload: UserAgentDraftPayload | None, fallback_answer: str, ) -> str | None: messages = self._build_model_messages( payload, citations=citations, suggested_actions=suggested_actions, risk_flags=risk_flags, draft_payload=draft_payload, fallback_answer=fallback_answer, ) return self._sanitize_model_answer( self.runtime_chat_service.complete( messages, max_tokens=420, temperature=0.2, ) ) def _sanitize_model_answer(self, answer: str | None) -> str | None: if not answer: return None cleaned = re.sub(r".*?", "", answer, flags=re.DOTALL | re.IGNORECASE) cleaned = cleaned.strip() return cleaned or None def _build_model_messages( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], suggested_actions: list[UserAgentSuggestedAction], risk_flags: list[str], draft_payload: UserAgentDraftPayload | None, fallback_answer: str, ) -> list[dict[str, str]]: facts = { "run_id": payload.run_id, "user_message": payload.message, "ontology": payload.ontology.model_dump(mode="json"), "context": { "entry_source": payload.context_json.get("entry_source"), "user_name": payload.context_json.get("name"), "user_role": payload.context_json.get("role"), "request_context": payload.context_json.get("request_context"), "attachment_count": payload.context_json.get("attachment_count"), "attachment_names": self._resolve_attachment_names(payload), "ocr_summary": payload.context_json.get("ocr_summary", ""), "ocr_documents": payload.context_json.get("ocr_documents", []), "conversation_id": payload.context_json.get("conversation_id"), "conversation_scenario": payload.context_json.get("conversation_scenario"), "conversation_intent": payload.context_json.get("conversation_intent"), "draft_claim_id": payload.context_json.get("draft_claim_id"), "conversation_history": self._resolve_conversation_history(payload), }, "tool_payload": payload.tool_payload, "citations": [item.model_dump(mode="json") for item in citations], "suggested_actions": [ item.model_dump(mode="json") for item in suggested_actions ], "risk_flags": risk_flags, "draft_payload": ( draft_payload.model_dump(mode="json") if draft_payload is not None else None ), "selected_capability_codes": payload.selected_capability_codes, "requires_confirmation": payload.requires_confirmation, "fallback_answer": fallback_answer, } system_prompt = ( "你是企业财务共享场景中的中文智能助手,负责和最终用户直接对话。" "你只能基于提供的事实回答,不能编造制度、流程结果或附件内容。" "如果用户问题很笼统,例如“我要报销”,优先告诉用户你可以协助什么," "并明确要求补充费用类型、金额、时间、事由、参与对象或上传票据。" "如果上下文里只有附件名称,必须明确说明你只拿到了附件名称," "不能假装已看过图片、PDF 或发票内容。" "如果提供了 conversation_history,必须结合最近轮次理解追问、代词、省略字段和补充信息。" "不要声称已经提交、审批、付款、入账或真正执行了任何动作;如果只是建议、草稿或待确认,要明确说清楚。" "若给出了风险标签、制度引用或建议动作,可以简洁吸收进回答,但不要新增未提供的事实。" "只输出最终给用户看的自然语言,不要输出 JSON、Markdown、标题、" " 标签或任何中间推理。" "使用简体中文,控制在 2 到 4 句。" ) user_prompt = ( "请根据以下事实生成最终答复,优先保持准确、具体、可执行:\n" f"{json.dumps(facts, ensure_ascii=False, indent=2)}" ) return [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] def _build_query_answer(self, payload: UserAgentRequest) -> str: scenario = payload.ontology.scenario data = payload.tool_payload subject = self._resolve_subject(payload) if scenario == "expense": record_count = int(data.get("record_count") or 0) total_amount = float(data.get("total_amount") or 0) return ( f"{subject}共命中 {record_count} 笔报销,金额合计 {total_amount:.2f} 元。" "如需继续处理,可以查看明细或生成处理意见草稿。" ) if scenario == "accounts_receivable": record_count = int(data.get("record_count") or 0) outstanding_amount = float(data.get("outstanding_amount") or 0) return ( f"{subject}共命中 {record_count} 条应收,未回款金额 {outstanding_amount:.2f} 元。" "建议结合账龄和客户分布继续排查逾期风险。" ) if scenario == "accounts_payable": record_count = int(data.get("record_count") or 0) outstanding_amount = float(data.get("outstanding_amount") or 0) return ( f"{subject}共命中 {record_count} 条应付,待付金额 {outstanding_amount:.2f} 元。" "如需推进动作,建议先生成付款建议草稿并发起人工确认。" ) return "已完成当前查询,但暂时没有更多结构化结果可展示。" def _build_explain_answer( self, payload: UserAgentRequest, citations: list[UserAgentCitation], ) -> str: if citations: titles = "、".join(item.title for item in citations[:2]) summary = citations[0].excerpt or "请结合制度全文进一步确认。" return f"已检索到相关依据:{titles}。核心说明:{summary}" return ( f"当前还没有与“{SCENARIO_LABELS.get(payload.ontology.scenario, '当前问题')}”" "强匹配的已上线规则引用,建议先人工复核或补充更具体的单据上下文。" ) def _build_risk_answer( self, payload: UserAgentRequest, citations: list[UserAgentCitation], ) -> str: risk_flags = self._resolve_risk_flags(payload) if not risk_flags: return "当前未识别到明确风险标签,建议继续查看原始明细或补充更多上下文。" reasons = [RISK_REASON_MAP.get(flag, f"{flag} 需要人工进一步确认。") for flag in risk_flags] citation_text = ( f" 参考规则:{'、'.join(item.title for item in citations[:2])}。" if citations else "" ) return ( f"本次识别到 {len(risk_flags)} 类风险:{'、'.join(risk_flags)}。" f"触发原因:{';'.join(reasons)}。" "建议先复核明细、附件和审批链,再决定是否继续处理。" f"{citation_text}" ) def _build_draft_payload(self, payload: UserAgentRequest) -> UserAgentDraftPayload: scenario_label = SCENARIO_LABELS.get(payload.ontology.scenario, "业务") subject = self._resolve_subject(payload) claim_no = str(payload.tool_payload.get("claim_no") or "").strip() or None claim_status = str(payload.tool_payload.get("status") or "").strip() or None title = f"{scenario_label}处理意见草稿" if claim_no: title = f"{scenario_label}草稿 {claim_no}" body = ( f"主题:{subject}\n" "结论:已根据当前语义解析结果生成草稿,尚未自动执行。\n" "建议:请先核对明细、规则命中和所需附件,再由人工确认是否提交正式流程。\n" f"原始问题:{payload.message}" ) return UserAgentDraftPayload( draft_type=payload.ontology.scenario, title=title, body=body, confirmation_required=True, claim_id=str(payload.tool_payload.get("claim_id") or "").strip() or None, claim_no=claim_no, status=claim_status, ) def _build_suggested_actions( self, payload: UserAgentRequest, ) -> list[UserAgentSuggestedAction]: if self._is_generic_expense_prompt(payload): return [ UserAgentSuggestedAction( label="上传票据", action_type="ask_clarification", description="上传发票、行程单或付款截图,继续识别报销内容。", ), UserAgentSuggestedAction( label="补充报销信息", action_type="ask_clarification", description="补充费用类型、金额、时间和事由后继续处理。", ), ] if payload.ontology.intent in {"query", "compare"}: return [ UserAgentSuggestedAction( label="查看明细", action_type="open_detail", description="继续查看命中记录和过滤条件。", ), UserAgentSuggestedAction( label="生成处理意见", action_type="create_draft", description="把当前查询结果整理成可确认草稿。", ), ] if payload.ontology.intent == "risk_check": return [ UserAgentSuggestedAction( label="人工复核风险", action_type="manual_review", description="优先检查明细、附件和规则命中原因。", ), UserAgentSuggestedAction( label="生成整改建议", action_type="create_draft", description="把风险说明整理成处理意见草稿。", ), ] if payload.ontology.intent == "draft": return [ UserAgentSuggestedAction( label="复制草稿", action_type="copy_draft", description="复制当前草稿后交由人工确认。", ), UserAgentSuggestedAction( label="补充上下文", action_type="ask_clarification", description="补充单据编号、客户或供应商信息以完善草稿。", ), ] return [ UserAgentSuggestedAction( label="查看规则全文", action_type="open_rule", description="继续查看引用规则或知识内容。", ), UserAgentSuggestedAction( label="补充问题上下文", action_type="ask_clarification", description="补充业务对象、时间或单据范围,提升回答准确度。", ), ] def _build_review_payload( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], draft_payload: UserAgentDraftPayload | None, ) -> UserAgentReviewPayload | None: attachment_count = self._resolve_attachment_count(payload) ocr_documents = self._resolve_ocr_documents(payload) if payload.ontology.scenario != "expense": return None if payload.ontology.intent not in {"draft", "operate"} and attachment_count <= 0 and not ocr_documents: return None document_cards = self._build_review_document_cards(payload, ocr_documents=ocr_documents) claim_groups = self._build_review_claim_groups( payload, document_cards=document_cards, ) slot_cards = self._build_review_slot_cards( payload, ocr_documents=ocr_documents, claim_groups=claim_groups, ) missing_slot_keys = self._resolve_review_missing_slot_keys( payload, slot_cards=slot_cards, ) risk_briefs = self._build_review_risk_briefs( payload, citations=citations, document_cards=document_cards, claim_groups=claim_groups, ) can_proceed = self._can_proceed_review( payload, missing_slot_keys=missing_slot_keys, claim_groups=claim_groups, ) confirmation_actions = self._build_review_confirmation_actions( payload, can_proceed=can_proceed, claim_groups=claim_groups, draft_payload=draft_payload, ) edit_fields = self._build_review_edit_fields( payload, draft_payload=draft_payload, slot_cards=slot_cards, ) intent_summary = self._build_review_intent_summary( payload, slot_cards=slot_cards, claim_groups=claim_groups, ) body_message = self._build_review_body_message( payload, can_proceed=can_proceed, draft_payload=draft_payload, ) return UserAgentReviewPayload( intent_summary=intent_summary, body_message=body_message, scenario=payload.ontology.scenario, intent=payload.ontology.intent, can_proceed=can_proceed, missing_slots=[SLOT_LABELS.get(key, key) for key in missing_slot_keys], risk_briefs=risk_briefs, slot_cards=slot_cards, document_cards=document_cards, claim_groups=claim_groups, confirmation_actions=confirmation_actions, edit_fields=edit_fields, ) def _build_review_slot_cards( self, payload: UserAgentRequest, *, ocr_documents: list[dict[str, object]], claim_groups: list[UserAgentReviewClaimGroup], ) -> list[UserAgentReviewSlotCard]: entity_map = self._collect_entity_values(payload) time_slot = self._build_time_slot(payload) location_slot = self._build_location_slot(payload) customer_slot = self._build_customer_slot(payload, entity_map=entity_map) participants_slot = self._build_participants_slot(payload, entity_map=entity_map) amount_slot = self._build_amount_slot(payload, entity_map=entity_map, ocr_documents=ocr_documents) expense_type_slot = self._build_expense_type_slot( payload, entity_map=entity_map, ocr_documents=ocr_documents, ) merchant_slot = self._build_merchant_slot(payload, ocr_documents=ocr_documents) reason_slot = self._build_reason_slot(payload) attachment_slot = self._build_attachment_slot(payload) required_keys = self._resolve_required_review_keys( payload, primary_expense_type=str(expense_type_slot["normalized_value"] or ""), claim_groups=claim_groups, ) cards = [ self._make_slot_card( key="expense_type", value=expense_type_slot["value"], raw_value=expense_type_slot["raw_value"], normalized_value=expense_type_slot["normalized_value"], source=expense_type_slot["source"], confidence=expense_type_slot["confidence"], evidence=expense_type_slot["evidence"], required="expense_type" in required_keys, ), self._make_slot_card( key="customer_name", value=customer_slot["value"], raw_value=customer_slot["raw_value"], normalized_value=customer_slot["normalized_value"], source=customer_slot["source"], confidence=customer_slot["confidence"], evidence=customer_slot["evidence"], required="customer_name" in required_keys, ), self._make_slot_card( key="time_range", value=time_slot["value"], raw_value=time_slot["raw_value"], normalized_value=time_slot["normalized_value"], source=time_slot["source"], confidence=time_slot["confidence"], evidence=time_slot["evidence"], required="time_range" in required_keys, ), self._make_slot_card( key="location", value=location_slot["value"], raw_value=location_slot["raw_value"], normalized_value=location_slot["normalized_value"], source=location_slot["source"], confidence=location_slot["confidence"], evidence=location_slot["evidence"], required="location" in required_keys, ), self._make_slot_card( key="merchant_name", value=merchant_slot["value"], raw_value=merchant_slot["raw_value"], normalized_value=merchant_slot["normalized_value"], source=merchant_slot["source"], confidence=merchant_slot["confidence"], evidence=merchant_slot["evidence"], required="merchant_name" in required_keys, ), self._make_slot_card( key="amount", value=amount_slot["value"], raw_value=amount_slot["raw_value"], normalized_value=amount_slot["normalized_value"], source=amount_slot["source"], confidence=amount_slot["confidence"], evidence=amount_slot["evidence"], required="amount" in required_keys, ), self._make_slot_card( key="reason", value=reason_slot["value"], raw_value=reason_slot["raw_value"], normalized_value=reason_slot["normalized_value"], source=reason_slot["source"], confidence=reason_slot["confidence"], evidence=reason_slot["evidence"], required="reason" in required_keys, ), self._make_slot_card( key="participants", value=participants_slot["value"], raw_value=participants_slot["raw_value"], normalized_value=participants_slot["normalized_value"], source=participants_slot["source"], confidence=participants_slot["confidence"], evidence=participants_slot["evidence"], required="participants" in required_keys, ), self._make_slot_card( key="attachments", value=attachment_slot["value"], raw_value=attachment_slot["raw_value"], normalized_value=attachment_slot["normalized_value"], source=attachment_slot["source"], confidence=attachment_slot["confidence"], evidence=attachment_slot["evidence"], required="attachments" in required_keys, ), ] return cards def _build_review_document_cards( self, payload: UserAgentRequest, *, ocr_documents: list[dict[str, object]], ) -> list[UserAgentReviewDocumentCard]: cards: list[UserAgentReviewDocumentCard] = [] for index, item in enumerate(ocr_documents, start=1): classified = self._classify_document(item, payload) fields = self._extract_document_fields(item) cards.append( UserAgentReviewDocumentCard( index=index, filename=str(item.get("filename") or f"document-{index}"), document_type=classified["document_type"], suggested_expense_type=classified["expense_type"], scene_label=GROUP_SCENE_LABELS.get( classified["group_code"], classified["scene_label"], ), summary=str(item.get("summary") or item.get("text") or "").strip(), avg_score=float(item.get("avg_score") or 0.0), warnings=[str(warning) for warning in item.get("warnings", []) if str(warning).strip()], fields=[ UserAgentReviewDocumentField( label=label, value=value, source="ocr", ) for label, value in fields.items() if str(value).strip() ], ) ) return cards def _build_review_claim_groups( self, payload: UserAgentRequest, *, document_cards: list[UserAgentReviewDocumentCard], ) -> list[UserAgentReviewClaimGroup]: groups: dict[str, dict[str, object]] = {} for card in document_cards: group_code = self._normalize_group_code(card.suggested_expense_type) bucket = groups.setdefault( group_code, { "document_indexes": [], "amount_total": 0.0, "expense_type": group_code, "scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"), "reasons": [], }, ) bucket["document_indexes"].append(card.index) bucket["amount_total"] = float(bucket["amount_total"]) + self._extract_amount_from_card(card) bucket["reasons"].append(f"{card.filename} 识别为 {card.scene_label}") if not groups: expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "other") group_code = self._normalize_group_code(expense_type_code) groups[group_code] = { "document_indexes": [], "amount_total": self._resolve_amount_value(payload), "expense_type": expense_type_code or "other", "scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"), "reasons": ["当前主要依据用户文本和页面上下文进行分单建议。"], } claim_groups: list[UserAgentReviewClaimGroup] = [] for index, (group_code, bucket) in enumerate(groups.items(), start=1): title = f"建议报销单 {index}:{bucket['scene_label']}" rationale = ( ";".join(dict.fromkeys(str(item) for item in bucket["reasons"])) if bucket["reasons"] else "当前仅有单一场景,无需拆单。" ) claim_groups.append( UserAgentReviewClaimGroup( group_code=group_code, title=title, expense_type=str(bucket["expense_type"]), scene_label=str(bucket["scene_label"]), document_indexes=list(bucket["document_indexes"]), amount_total=round(float(bucket["amount_total"]), 2), rationale=rationale, ) ) return claim_groups def _build_review_risk_briefs( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], document_cards: list[UserAgentReviewDocumentCard], claim_groups: list[UserAgentReviewClaimGroup], ) -> list[UserAgentReviewRiskBrief]: briefs: list[UserAgentReviewRiskBrief] = [] employee_name = self._collect_entity_values(payload).get("employee_name") or str( payload.context_json.get("name") or "" ).strip() if employee_name: since = datetime.now(UTC) - timedelta(days=90) stmt = select(ExpenseClaim).where( ExpenseClaim.employee_name == employee_name, ExpenseClaim.occurred_at >= since, ) recent_claims = list(self.db.scalars(stmt).all()) if recent_claims: risky_count = sum(1 for item in recent_claims if item.risk_flags_json) draft_count = sum(1 for item in recent_claims if item.status == "draft") briefs.append( UserAgentReviewRiskBrief( title="历史报销画像", level="info", content=( f"{employee_name} 最近 90 天共有 {len(recent_claims)} 笔报销," f"其中 {risky_count} 笔带风险标记,{draft_count} 笔仍处于草稿态。" ), ) ) current_amount = self._resolve_amount_value(payload) if current_amount > 0: duplicate_count = sum( 1 for item in recent_claims if abs(float(item.amount) - current_amount) < 0.01 ) if duplicate_count: briefs.append( UserAgentReviewRiskBrief( title="金额重复预警", level="warning", content=( f"近 90 天发现 {duplicate_count} 笔金额相同的报销记录," "提交前建议核对是否为重复报销或拆分不当。" ), ) ) if citations: briefs.append( UserAgentReviewRiskBrief( title="制度注意事项", level="info", content=citations[0].excerpt or f"请先核对 {citations[0].title} 的制度要求。", ) ) warning_count = sum(len(item.warnings) for item in document_cards) if warning_count: briefs.append( UserAgentReviewRiskBrief( title="票据识别提醒", level="warning", content=f"当前共有 {warning_count} 条票据识别提示,建议逐张确认 OCR 识别字段。", ) ) if len(claim_groups) > 1: briefs.append( UserAgentReviewRiskBrief( title="建议拆单", level="high", content=f"系统检测到 {len(claim_groups)} 类费用场景,建议拆成多张报销单后再提交。", ) ) return briefs[:4] def _build_review_confirmation_actions( self, payload: UserAgentRequest, *, can_proceed: bool, claim_groups: list[UserAgentReviewClaimGroup], draft_payload: UserAgentDraftPayload | None, ) -> list[UserAgentReviewAction]: primary_action = UserAgentReviewAction( label="下一步" if can_proceed else "保存草稿", action_type="next_step" if can_proceed else "save_draft", description=( "当前识别信息已满足继续流转条件,确认后进入下一步。" if can_proceed else "当前信息仍未补齐,先保存为草稿,后续可继续补充。" ), emphasis="primary", ) if len(claim_groups) > 1 and can_proceed: primary_action.description = f"系统建议拆分为 {len(claim_groups)} 张报销单,确认后进入下一步。" if draft_payload is not None and draft_payload.claim_no and not can_proceed: primary_action.description = f"会先保存到草稿 {draft_payload.claim_no},缺失信息后续再补。" return [ UserAgentReviewAction( label="取消", action_type="cancel_review", description="放弃当前识别结果,并退出本次核对流程。", emphasis="secondary", ), UserAgentReviewAction( label="修改", action_type="edit_review", description="打开结构化模板,按已识别字段逐项修改。", emphasis="secondary", ), primary_action, ] def _build_review_intent_summary( self, payload: UserAgentRequest, *, slot_cards: list[UserAgentReviewSlotCard], claim_groups: list[UserAgentReviewClaimGroup], ) -> str: slots = {item.key: item for item in slot_cards} expense_type = slots.get("expense_type") amount = slots.get("amount") time_range = slots.get("time_range") location = slots.get("location") customer = slots.get("customer_name") summary = "系统识别出您想要发起一笔报销。" if expense_type and expense_type.value: summary = f"系统识别出您想要报销{expense_type.value}。" details: list[str] = [] if customer and customer.value: details.append(f"客户名称:{customer.value}") if time_range and time_range.value: details.append(f"时间:{time_range.value}") if location and location.value: details.append(f"地点:{location.value}") if amount and amount.value: details.append(f"金额:{amount.value}") if claim_groups and len(claim_groups) > 1: details.append(f"建议拆分为 {len(claim_groups)} 张报销单") if details: return f"{summary} {';'.join(details)}。" return summary def _build_review_body_answer( self, payload: UserAgentRequest, *, review_payload: UserAgentReviewPayload | None, draft_payload: UserAgentDraftPayload | None, ) -> str | None: if review_payload is None: return None if payload.ontology.scenario != "expense": return None if payload.ontology.intent not in {"draft", "operate"}: return None review_action = str(payload.context_json.get("review_action") or "").strip() if review_action == "save_draft": if draft_payload is not None and draft_payload.claim_no: return f"相关识别信息已在右侧展示,请核对。当前已先保存到草稿 {draft_payload.claim_no},缺失信息后续可继续补充。" return "相关识别信息已在右侧展示,请核对。当前信息未补齐,已按你的要求先保存草稿。" if review_action == "next_step": return "相关识别信息已在右侧展示,请核对。当前信息已满足继续流转条件,可进入下一步。" if review_action == "edit_review": return "相关识别信息已在右侧展示,请核对。我已根据你的修改更新识别结果,请继续确认。" return review_payload.body_message or None def _build_review_body_message( self, payload: UserAgentRequest, *, can_proceed: bool, draft_payload: UserAgentDraftPayload | None, ) -> str: if can_proceed: return "相关识别信息已在右侧展示,请核对。确认无误后可点击“下一步”。" if draft_payload is not None and draft_payload.claim_no: return f"相关识别信息已在右侧展示,请核对。当前信息还未补齐,可修改后继续,或先保存到草稿 {draft_payload.claim_no}。" return "相关识别信息已在右侧展示,请核对。当前信息还未补齐,可点击“修改”继续补充,或先“保存草稿”。" @staticmethod def _can_proceed_review( payload: UserAgentRequest, *, missing_slot_keys: list[str], claim_groups: list[UserAgentReviewClaimGroup], ) -> bool: if payload.ontology.ambiguity: return False if missing_slot_keys: return False if not claim_groups: return False return True def _build_review_edit_fields( self, payload: UserAgentRequest, *, draft_payload: UserAgentDraftPayload | None, slot_cards: list[UserAgentReviewSlotCard], ) -> list[UserAgentReviewEditField]: slot_map = {item.key: item for item in slot_cards} employee = self._resolve_employee_profile(payload) reporter_name = ( slot_map.get("reporter_name").value if slot_map.get("reporter_name") else str(payload.context_json.get("name") or "").strip() ) manager_name = self._resolve_manager_name(employee) reason = slot_map.get("reason").value if slot_map.get("reason") else "" attachments = "、".join(self._resolve_attachment_names(payload)) fields = [ UserAgentReviewEditField( key="claim_no", label="报销单据编号", value=str(draft_payload.claim_no if draft_payload is not None and draft_payload.claim_no else "待生成"), placeholder="保存草稿后自动生成", required=False, group="basic", ), UserAgentReviewEditField( key="expense_type", label="报销类型", value=slot_map.get("expense_type").value if slot_map.get("expense_type") else "", placeholder="例如:业务招待费 / 差旅费", group="basic", ), UserAgentReviewEditField( key="occurred_date", label="业务发生时间", value=slot_map.get("time_range").normalized_value if slot_map.get("time_range") and slot_map.get("time_range").normalized_value else slot_map.get("time_range").value if slot_map.get("time_range") else "", placeholder="例如:2026-05-11", group="basic", ), UserAgentReviewEditField( key="reporter_name", label="报销人", value=reporter_name, placeholder="请输入报销人姓名", group="basic", ), UserAgentReviewEditField( key="manager_name", label="直属上司姓名", value=manager_name, placeholder="请输入直属上司姓名", required=False, group="basic", ), UserAgentReviewEditField( key="customer_name", label="客户名称", value=slot_map.get("customer_name").value if slot_map.get("customer_name") else "", placeholder="请输入客户名称", group="business", ), UserAgentReviewEditField( key="business_location", label="业务地点", value=slot_map.get("location").normalized_value if slot_map.get("location") and slot_map.get("location").normalized_value else slot_map.get("location").value if slot_map.get("location") else "", placeholder="例如:北京 / 客户现场", required=False, group="business", ), UserAgentReviewEditField( key="merchant_name", label="酒店/商户", value=slot_map.get("merchant_name").value if slot_map.get("merchant_name") else "", placeholder="请输入酒店或商户名称", required=False, group="business", ), UserAgentReviewEditField( key="amount", label="金额", value=slot_map.get("amount").normalized_value if slot_map.get("amount") and slot_map.get("amount").normalized_value else slot_map.get("amount").value if slot_map.get("amount") else "", placeholder="例如:200.00元", group="business", ), UserAgentReviewEditField( key="participants", label="参与人员", value=slot_map.get("participants").value if slot_map.get("participants") else "", placeholder="例如:客户 2 人,我方 1 人", group="business", ), UserAgentReviewEditField( key="reason", label="事由", value=reason, placeholder="请输入报销事由", field_type="textarea", group="business", ), UserAgentReviewEditField( key="attachment_names", label="附件清单", value=attachments, placeholder="例如:发票.jpg、行程单.png", required=False, field_type="textarea", group="attachments", ), ] return fields def _resolve_employee_profile(self, payload: UserAgentRequest) -> Employee | None: candidates = [ str(payload.context_json.get("name") or "").strip(), str(payload.user_id or "").strip(), self._collect_entity_values(payload).get("employee_name", ""), ] normalized = [item for item in dict.fromkeys(candidates) if item] if not normalized: return None stmt = ( select(Employee) .where( or_( Employee.name.in_(normalized), Employee.employee_no.in_(normalized), Employee.email.in_(normalized), ) ) .limit(1) ) return self.db.scalar(stmt) @staticmethod def _resolve_manager_name(employee: Employee | None) -> str: if employee is None: return "" if employee.manager is not None and employee.manager.name: return employee.manager.name if employee.organization_unit is not None and employee.organization_unit.manager_name: return employee.organization_unit.manager_name return "" @staticmethod def _extract_message_reason(message: str) -> str: for line in str(message or "").splitlines(): cleaned = line.strip() if not cleaned: continue if cleaned.startswith(("附件名称:", "OCR摘要:", "关联单号:")): continue return cleaned[:300] return "" @classmethod def _resolve_reason_text(cls, message: str) -> str: reason = cls._extract_message_reason(message) if not reason: return "" compact = re.sub(r"\s+", "", reason) if compact in GENERIC_EXPENSE_PROMPTS: return "" instruction_prefixes = ( "帮我生成", "请帮我生成", "生成", "起草", "创建", "发起", "准备", "帮我报销", "我要报销", "我想报销", ) if compact.startswith(instruction_prefixes): for separator in (",", ",", "。", ";", ";", ":", ":"): if separator in reason: trailing = reason.split(separator, 1)[1].strip() if trailing: return trailing[:300] return "" return reason @staticmethod def _should_skip_model_answer( payload: UserAgentRequest, review_payload: UserAgentReviewPayload | None, ) -> bool: if review_payload is None: return False return payload.ontology.scenario == "expense" and ( payload.ontology.intent == "draft" or int(payload.context_json.get("attachment_count") or 0) > 0 ) def _build_rule_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]: domain = self._resolve_domain(payload.ontology.scenario) items = self.asset_service.list_assets( asset_type=AgentAssetType.RULE.value, status=AgentAssetStatus.ACTIVE.value, domain=domain, ) ranked = self._rank_rule_assets(items, payload) citations: list[UserAgentCitation] = [] for item in ranked[:2]: detail = self.asset_service.get_asset(item.id) if detail is None: continue excerpt = self._extract_excerpt(str(detail.current_version_content or "")) citations.append( UserAgentCitation( source_type="rule", code=detail.code, title=detail.name, version=detail.current_version, updated_at=detail.updated_at.date().isoformat(), excerpt=excerpt, ) ) return citations @staticmethod def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]: tool_flags = payload.tool_payload.get("risk_flags") if isinstance(tool_flags, list) and tool_flags: return [str(item) for item in tool_flags] return [str(item) for item in payload.ontology.risk_flags] @staticmethod def _resolve_subject(payload: UserAgentRequest) -> str: named_entities = [ item.value for item in payload.ontology.entities if item.type in {"employee", "customer", "vendor", "project"} ] if named_entities: return f"{'、'.join(named_entities)} 相关数据" return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据" @staticmethod def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense": return False normalized_message = re.sub(r"\s+", "", payload.message) return normalized_message in GENERIC_EXPENSE_PROMPTS @staticmethod def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense" or payload.ontology.intent != "draft": return False compact_message = re.sub(r"\s+", "", payload.message) if any(keyword in compact_message for keyword in EXPLICIT_DRAFT_KEYWORDS): return False return True @staticmethod def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]: names = payload.context_json.get("attachment_names") if not isinstance(names, list): return [] return [str(name) for name in names if str(name).strip()] @staticmethod def _resolve_attachment_count(payload: UserAgentRequest) -> int: names = UserAgentService._resolve_attachment_names(payload) if names: return len(names) try: return max(0, int(payload.context_json.get("attachment_count") or 0)) except (TypeError, ValueError): return 0 @staticmethod def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]: documents = payload.context_json.get("ocr_documents") if not isinstance(documents, list): return [] normalized: list[dict[str, object]] = [] for item in documents[:8]: if not isinstance(item, dict): continue normalized.append(item) return normalized @staticmethod def _resolve_conversation_history(payload: UserAgentRequest) -> list[dict[str, object]]: history = payload.context_json.get("conversation_history") if not isinstance(history, list): return [] normalized: list[dict[str, object]] = [] for item in history[-8:]: if not isinstance(item, dict): continue role = str(item.get("role") or "").strip() content = str(item.get("content") or "").strip() if not role or not content: continue normalized.append({"role": role, "content": content}) return normalized @staticmethod def _resolve_domain(scenario: str) -> str | None: if scenario == "expense": return "expense" if scenario == "accounts_receivable": return "ar" if scenario == "accounts_payable": return "ap" return None @staticmethod def _rank_rule_assets( items: list[AgentAssetListItem], payload: UserAgentRequest, ) -> list[AgentAssetListItem]: def score(item: AgentAssetListItem) -> tuple[int, str]: tags = {str(value) for value in item.scenario_json or []} weight = 0 if payload.ontology.scenario in tags: weight += 3 if payload.ontology.intent in tags: weight += 2 for risk_flag in payload.ontology.risk_flags: if risk_flag in tags: weight += 4 return weight, item.code ranked = sorted(items, key=score, reverse=True) return [item for item in ranked if score(item)[0] > 0] @staticmethod def _extract_excerpt(content: str) -> str: lines = [line.strip() for line in str(content).splitlines() if line.strip()] cleaned: list[str] = [] for line in lines: normalized = re.sub(r"^[#>\-\*\d\.\s`]+", "", line).strip() if normalized: cleaned.append(normalized) if len(cleaned) >= 2: break return ";".join(cleaned[:2]) def _collect_entity_values(self, payload: UserAgentRequest) -> dict[str, str]: values = { "employee_name": "", "customer": "", "participants": "", "amount": "", "expense_type": "", "expense_type_code": "", } participants: list[str] = [] for item in payload.ontology.entities: if item.type == "employee" and not values["employee_name"]: values["employee_name"] = item.value elif item.type == "customer" and not values["customer"]: values["customer"] = item.value elif item.type == "amount" and item.role != "threshold" and not values["amount"]: values["amount"] = f"{item.value}元" if "元" not in item.value else item.value elif item.type == "expense_type" and not values["expense_type_code"]: values["expense_type_code"] = item.normalized_value values["expense_type"] = EXPENSE_TYPE_LABELS.get( item.normalized_value, item.value, ) elif item.type in {"participant", "person"} and item.value.strip(): participants.append(item.value.strip()) if participants: values["participants"] = "、".join(dict.fromkeys(participants)) return values def _format_time_range(self, payload: UserAgentRequest) -> str: time_range = payload.ontology.time_range if time_range.start_date and time_range.end_date: if time_range.start_date == time_range.end_date: if time_range.raw and time_range.raw != time_range.start_date: return f"{time_range.start_date}(原文:{time_range.raw})" return time_range.start_date normalized = f"{time_range.start_date} 至 {time_range.end_date}" if time_range.raw and time_range.raw != normalized: return f"{normalized}(原文:{time_range.raw})" return normalized if time_range.raw: return time_range.raw return "" def _resolve_location_value(self, payload: UserAgentRequest) -> str: review_form_values = self._resolve_review_form_values(payload) for key in ("business_location", "location"): value = str(review_form_values.get(key) or "").strip() if value: return value if str(payload.context_json.get("entry_source") or "").strip() == "detail": request_context = payload.context_json.get("request_context") if isinstance(request_context, dict): for key in ("city", "location"): value = str(request_context.get(key) or "").strip() if value: return value labeled_match = re.search(r"(?:业务地点|发生地点|地点)[::]\s*(?P[^\n,。;]+)", payload.message) if labeled_match: return labeled_match.group("value").strip() city_match = re.search(r"去(?P[\u4e00-\u9fa5]{2,8})(?:出差|拜访|参会|见客户|客户现场)", payload.message) if city_match: return city_match.group("city").strip() if "客户现场" in payload.message.replace(" ", ""): return "客户现场" return "" @staticmethod def _resolve_review_form_values(payload: UserAgentRequest) -> dict[str, str]: values = payload.context_json.get("review_form_values") if not isinstance(values, dict): return {} normalized: dict[str, str] = {} for key, value in values.items(): cleaned_key = str(key or "").strip() if not cleaned_key: continue normalized[cleaned_key] = str(value or "").strip() return normalized @staticmethod def _build_slot_value( *, value: str = "", raw_value: str = "", normalized_value: str = "", source: str = "system", confidence: float = 0.0, evidence: str = "", ) -> dict[str, str | float]: return { "value": str(value or "").strip(), "raw_value": str(raw_value or "").strip(), "normalized_value": str(normalized_value or "").strip(), "source": str(source or "system").strip() or "system", "confidence": float(confidence), "evidence": str(evidence or "").strip(), } def _build_time_slot(self, payload: UserAgentRequest) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) edited_value = str( review_form_values.get("occurred_date") or review_form_values.get("time_range") or review_form_values.get("business_time") or "" ).strip() if edited_value: raw_value = str(review_form_values.get("time_range_raw") or edited_value).strip() return self._build_slot_value( value=edited_value if raw_value == edited_value else f"{edited_value}(原文:{raw_value})", raw_value=raw_value, normalized_value=edited_value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) time_range = payload.ontology.time_range if time_range.start_date and time_range.end_date: normalized_value = ( time_range.start_date if time_range.start_date == time_range.end_date else f"{time_range.start_date} 至 {time_range.end_date}" ) raw_value = str(time_range.raw or "").strip() value = normalized_value if not raw_value or raw_value == normalized_value else f"{normalized_value}(原文:{raw_value})" return self._build_slot_value( value=value, raw_value=raw_value, normalized_value=normalized_value, source="user_text", confidence=0.92, evidence="系统已根据当前日期将相对时间换算为标准日期。", ) return self._build_slot_value() def _build_location_slot(self, payload: UserAgentRequest) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) for key in ("business_location", "location"): value = str(review_form_values.get(key) or "").strip() if value: return self._build_slot_value( value=value, normalized_value=value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) if str(payload.context_json.get("entry_source") or "").strip() == "detail": request_context = payload.context_json.get("request_context") if isinstance(request_context, dict): for key in ("city", "location"): value = str(request_context.get(key) or "").strip() if value: return self._build_slot_value( value=value, normalized_value=value, source="detail_context", confidence=0.68, evidence="来源于当前关联单据,仅作为辅助上下文,需要用户再次核对。", ) value = self._resolve_location_value(payload) if value: evidence = "用户在文本中明确描述了业务地点。" if value == "客户现场": evidence = "用户明确提到“客户现场”,但未提供具体城市或地址。" return self._build_slot_value( value=value, normalized_value=value, source="user_text", confidence=0.82, evidence=evidence, ) return self._build_slot_value() def _build_customer_slot( self, payload: UserAgentRequest, *, entity_map: dict[str, str], ) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) value = str(review_form_values.get("customer_name") or "").strip() if value: return self._build_slot_value( value=value, normalized_value=value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) value = entity_map.get("customer", "") if value: return self._build_slot_value( value=value, normalized_value=value, source="user_text", confidence=0.88, evidence="用户在原始描述中直接提到了客户对象。", ) return self._build_slot_value() def _build_participants_slot( self, payload: UserAgentRequest, *, entity_map: dict[str, str], ) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) value = str(review_form_values.get("participants") or "").strip() if value: return self._build_slot_value( value=value, normalized_value=value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) value = entity_map.get("participants", "") if value: return self._build_slot_value( value=value, normalized_value=value, source="user_text", confidence=0.8, evidence="用户在当前描述中补充了参与人员。", ) return self._build_slot_value() def _build_reason_slot(self, payload: UserAgentRequest) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) edited_value = str(review_form_values.get("reason") or "").strip() if edited_value: return self._build_slot_value( value=edited_value, raw_value=edited_value, normalized_value=edited_value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) reason_value = self._resolve_reason_text(payload.message) if reason_value: return self._build_slot_value( value=reason_value, raw_value=reason_value, normalized_value=reason_value, source="user_text", confidence=0.76, evidence="系统从用户原始描述中提取了本次费用事由,建议继续核对。", ) return self._build_slot_value() def _build_amount_slot( self, payload: UserAgentRequest, *, entity_map: dict[str, str], ocr_documents: list[dict[str, object]], ) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) edited_amount = str(review_form_values.get("amount") or "").strip() if edited_amount: normalized = self._normalize_amount_text(edited_amount) return self._build_slot_value( value=normalized, raw_value=edited_amount, normalized_value=normalized, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) amount_value = entity_map.get("amount", "") if amount_value: normalized = self._normalize_amount_text(amount_value) return self._build_slot_value( value=normalized, raw_value=amount_value, normalized_value=normalized, source="user_text", confidence=0.92, evidence="用户在原始描述中直接给出了金额。", ) ocr_total_amount = self._sum_ocr_amounts(ocr_documents) if ocr_total_amount > 0: normalized = f"{ocr_total_amount:.2f}元" return self._build_slot_value( value=normalized, normalized_value=normalized, source="ocr", confidence=0.76, evidence="金额来自 OCR 汇总结果,仍建议用户核对票据原文。", ) return self._build_slot_value() def _build_expense_type_slot( self, payload: UserAgentRequest, *, entity_map: dict[str, str], ocr_documents: list[dict[str, object]], ) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) edited_value = str(review_form_values.get("expense_type") or review_form_values.get("reimbursement_type") or "").strip() if edited_value: normalized_code, normalized_label = self._normalize_expense_type_input(edited_value) return self._build_slot_value( value=normalized_label, raw_value=edited_value, normalized_value=normalized_code, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) expense_type_code = entity_map.get("expense_type_code", "") expense_type_value = EXPENSE_TYPE_LABELS.get(expense_type_code, entity_map.get("expense_type", "")) if expense_type_value: return self._build_slot_value( value=expense_type_value, raw_value=expense_type_value, normalized_value=expense_type_code, source="user_text", confidence=0.9, evidence="系统根据用户描述中的业务场景判断费用类型。", ) inferred_label = self._infer_expense_type_from_documents(payload, ocr_documents) if ocr_documents else "" if inferred_label: normalized_code, normalized_label = self._normalize_expense_type_input(inferred_label) return self._build_slot_value( value=normalized_label, raw_value=inferred_label, normalized_value=normalized_code, source="ocr", confidence=0.74, evidence="系统根据票据内容推断费用类型,仍建议用户确认。", ) return self._build_slot_value() def _build_merchant_slot( self, payload: UserAgentRequest, *, ocr_documents: list[dict[str, object]], ) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) edited_value = str(review_form_values.get("merchant_name") or "").strip() if edited_value: return self._build_slot_value( value=edited_value, normalized_value=edited_value, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) merchant_value = self._extract_document_merchant_name(ocr_documents[0]) if ocr_documents else "" if merchant_value: return self._build_slot_value( value=merchant_value, normalized_value=merchant_value, source="ocr", confidence=0.72, evidence="商户名称来自 OCR 票据识别结果,仍建议用户核对。", ) return self._build_slot_value() def _build_attachment_slot(self, payload: UserAgentRequest) -> dict[str, str | float]: review_form_values = self._resolve_review_form_values(payload) attachment_names = str(review_form_values.get("attachment_names") or "").strip() if attachment_names: return self._build_slot_value( value=attachment_names, normalized_value=attachment_names, source="user_form", confidence=1.0, evidence="来源于用户修改后的结构化表单。", ) count = self._resolve_attachment_count(payload) if count > 0: names = self._resolve_attachment_names(payload) value = "、".join(names) if names else f"{count} 份附件" return self._build_slot_value( value=value, raw_value=value, normalized_value=str(count), source="upload", confidence=1.0, evidence="系统已接收到用户上传的附件。", ) return self._build_slot_value() @staticmethod def _normalize_amount_text(value: str) -> str: cleaned = str(value or "").strip() if not cleaned: return "" match = AMOUNT_TEXT_PATTERN.search(cleaned) if not match: return cleaned number = float(match.group(1)) return f"{number:.2f}元" @staticmethod def _normalize_expense_type_input(value: str) -> tuple[str, str]: compact = str(value or "").replace(" ", "") if "招待" in compact or ("客户" in compact and any(keyword in compact for keyword in ("吃饭", "用餐", "宴请", "请客"))): return "entertainment", "业务招待费" if any(keyword in compact for keyword in ("差旅", "出差", "机票", "行程")): return "travel", "差旅费" if any(keyword in compact for keyword in ("住宿", "酒店", "宾馆")): return "hotel", "住宿费" if any(keyword in compact for keyword in ("交通", "打车", "网约车", "出租车", "车费", "停车")): return "transport", "交通费" if any(keyword in compact for keyword in ("餐费", "用餐", "午餐", "晚餐", "早餐", "伙食")): return "meal", "餐费" if "会务" in compact: return "meeting", "会务费" return "other", str(value or "").strip() or "其他费用" def _resolve_required_review_keys( self, payload: UserAgentRequest, *, primary_expense_type: str, claim_groups: list[UserAgentReviewClaimGroup], ) -> set[str]: required = {"expense_type", "time_range", "amount", "reason", "attachments"} scene_codes = { str(item.group_code or "").strip() for item in claim_groups if str(item.group_code or "").strip() } if primary_expense_type: scene_codes.add(primary_expense_type) compact_message = re.sub(r"\s+", "", payload.message) if "entertainment" in scene_codes or ( "客户" in compact_message and any(keyword in compact_message for keyword in ("招待", "吃饭", "用餐", "宴请", "请客")) ): required.update({"customer_name", "participants"}) return required @staticmethod def _resolve_review_missing_slot_keys( payload: UserAgentRequest, *, slot_cards: list[UserAgentReviewSlotCard], ) -> list[str]: required_keys = {item.key for item in slot_cards if item.required} missing_keys = { item.key for item in slot_cards if item.required and (item.status == "missing" or not str(item.value).strip()) } for key in payload.ontology.missing_slots: normalized_key = str(key or "").strip() if normalized_key and normalized_key in required_keys: missing_keys.add(normalized_key) ordered_keys: list[str] = [] for item in slot_cards: if item.required and item.key in missing_keys and item.key not in ordered_keys: ordered_keys.append(item.key) return ordered_keys def _make_slot_card( self, *, key: str, value: str, raw_value: str, normalized_value: str, source: str, confidence: float, evidence: str, required: bool = True, ) -> UserAgentReviewSlotCard: is_missing = required and not str(value).strip() source_key = source if source in SOURCE_LABELS else "system" return UserAgentReviewSlotCard( key=key, label=SLOT_LABELS.get(key, key), value=str(value or "").strip(), raw_value=str(raw_value or "").strip(), normalized_value=str(normalized_value or "").strip(), source=source, source_label=SOURCE_LABELS.get(source_key, "系统判断"), confidence=confidence, required=required, confirmed=not is_missing and source in {"user_text", "user_form"}, status="missing" if is_missing else "identified" if source in {"user_text", "user_form"} else "inferred", hint=f"建议补充 {SLOT_LABELS.get(key, key)}。" if is_missing and required else ("该字段来自系统辅助上下文,建议你再核对一次。" if source in {"detail_context", "ocr"} else ""), evidence=evidence, ) def _classify_document( self, item: dict[str, object], payload: UserAgentRequest, ) -> dict[str, str]: text = " ".join( [ str(item.get("filename") or ""), str(item.get("summary") or ""), str(item.get("text") or ""), ] ).lower() compact = text.replace(" ", "") expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "") has_customer = bool(self._collect_entity_values(payload).get("customer")) if any(keyword in compact for keyword in ("机票", "航班", "火车", "高铁", "行程单")): return { "document_type": "travel_ticket", "expense_type": "travel", "group_code": "travel", "scene_label": "差旅票据", } if any(keyword in compact for keyword in ("酒店", "住宿", "宾馆")): return { "document_type": "hotel_invoice", "expense_type": "hotel", "group_code": "travel", "scene_label": "住宿票据", } if any(keyword in compact for keyword in ("打车", "出租车", "滴滴", "网约车", "过路费", "停车")): return { "document_type": "transport_receipt", "expense_type": "transport", "group_code": "travel", "scene_label": "交通票据", } if any(keyword in compact for keyword in ("餐", "饭店", "酒楼", "酒家", "餐饮", "meal")): group_code = "entertainment" if expense_type_code == "entertainment" or has_customer else "meal" return { "document_type": "meal_receipt", "expense_type": group_code, "group_code": group_code, "scene_label": "餐饮票据", } return { "document_type": "other", "expense_type": expense_type_code or "other", "group_code": self._normalize_group_code(expense_type_code or "other"), "scene_label": "其他票据", } @staticmethod def _normalize_group_code(expense_type_code: str) -> str: if expense_type_code in {"travel", "hotel", "transport"}: return "travel" if expense_type_code in {"entertainment", "meal"}: return expense_type_code return "other" def _extract_document_fields(self, item: dict[str, object]) -> dict[str, str]: text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip() fields: dict[str, str] = {} amount_match = AMOUNT_TEXT_PATTERN.search(text) if amount_match: fields["金额"] = f"{amount_match.group(1)}元" date_match = DATE_TEXT_PATTERN.search(text) if date_match: fields["时间"] = date_match.group(1) merchant = self._extract_document_merchant_name(item) if merchant: fields["商户/酒店"] = merchant return fields @staticmethod def _extract_document_merchant_name(item: dict[str, object]) -> str: text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip() for keyword in ("酒店", "宾馆", "饭店", "酒楼", "餐厅", "航空", "铁路", "滴滴"): if keyword in text: return keyword return "" @staticmethod def _extract_amount_from_card(card: UserAgentReviewDocumentCard) -> float: for item in card.fields: if item.label != "金额": continue try: return float(str(item.value).replace("元", "").strip()) except ValueError: return 0.0 return 0.0 def _resolve_amount_value(self, payload: UserAgentRequest) -> float: for item in payload.ontology.entities: if item.type == "amount" and item.role != "threshold": try: return float(item.normalized_value) except ValueError: return 0.0 return 0.0 def _sum_ocr_amounts(self, ocr_documents: list[dict[str, object]]) -> float: total = 0.0 for item in ocr_documents: fields = self._extract_document_fields(item) amount_text = str(fields.get("金额") or "").replace("元", "").strip() if not amount_text: continue try: total += float(amount_text) except ValueError: continue return total def _infer_expense_type_from_documents( self, payload: UserAgentRequest, ocr_documents: list[dict[str, object]], ) -> str: labels: list[str] = [] for item in ocr_documents: classified = self._classify_document(item, payload) label = GROUP_SCENE_LABELS.get(classified["group_code"], "") if label and label not in labels: labels.append(label) return " + ".join(labels[:3])