"""User-facing agent service.

Turns a parsed ``UserAgentRequest`` into a ``UserAgentResponse``: rule
citations, suggested actions, an optional draft payload, an optional expense
review payload, and the final answer text (model-generated or a deterministic
fallback).
"""

from __future__ import annotations

import json
import re
from datetime import UTC, datetime, timedelta

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.core.agent_enums import AgentAssetStatus, AgentAssetType
from app.models.financial_record import ExpenseClaim
from app.schemas.agent_asset import AgentAssetListItem
from app.schemas.user_agent import (
    UserAgentCitation,
    UserAgentDraftPayload,
    UserAgentReviewAction,
    UserAgentReviewClaimGroup,
    UserAgentReviewDocumentCard,
    UserAgentReviewDocumentField,
    UserAgentReviewPayload,
    UserAgentReviewRiskBrief,
    UserAgentReviewSlotCard,
    UserAgentRequest,
    UserAgentResponse,
    UserAgentSuggestedAction,
)
from app.services.agent_assets import AgentAssetService
from app.services.agent_foundation import AgentFoundationService
from app.services.runtime_chat import RuntimeChatService

# Display label for each ontology scenario code.
SCENARIO_LABELS = {
    "expense": "报销",
    "accounts_receivable": "应收",
    "accounts_payable": "应付",
    "knowledge": "知识",
    "unknown": "通用",
}

# User-facing explanation for each known risk flag.
RISK_REASON_MAP = {
    "duplicate_expense": "检测到同员工、同金额或近似单据存在重复提交迹象。",
    "amount_over_limit": "金额超过当前制度或预算阈值,需要补充例外说明。",
    "invoice_anomaly": "票据或附件完整性不满足当前规则要求,需要补件或人工复核。",
    "ar_overdue": "应收账款已出现逾期,存在回款延迟风险。",
    "ap_overdue": "应付付款已出现逾期,可能影响供应商履约或合作关系。",
}

# Whitespace-stripped messages that count as a bare "I want to file an expense".
GENERIC_EXPENSE_PROMPTS = {
    "报销",
    "我要报销",
    "我想报销",
    "帮我报销",
    "我要申请报销",
    "发起报销",
    "提交报销",
}

# Keywords whose presence means the user explicitly asked for a draft.
EXPLICIT_DRAFT_KEYWORDS = ("生成", "草稿", "起草", "创建", "发起", "准备")

EXPENSE_TYPE_LABELS = {
    "travel": "差旅",
    "hotel": "住宿",
    "transport": "交通",
    "meal": "餐费",
    "meeting": "会务",
    "entertainment": "招待",
    "other": "其他",
}

GROUP_SCENE_LABELS = {
    "travel": "差旅费",
    "entertainment": "业务招待费",
    "meal": "伙食费",
    "transport": "交通费",
    "hotel": "住宿费",
    "other": "其他费用",
}

SLOT_LABELS = {
    "expense_type": "报销类型",
    "customer_name": "客户名称",
    "time_range": "发生时间",
    "location": "地点",
    "merchant_name": "酒店/商户",
    "amount": "金额",
    "participants": "参与人员",
    "attachments": "票据附件",
}

DATE_TEXT_PATTERN = re.compile(r"(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)")
AMOUNT_TEXT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(?:元|万元|万)")

# NOTE(review): the reviewed source had the bare pattern ``.*?`` here, which
# matches at every position and therefore erased the whole model answer. The
# ``<think>`` tag pair is the presumed intent (the system prompt forbids tags
# and intermediate reasoning) -- confirm against the model's real tag format.
REASONING_TAG_PATTERN = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)

# "去<city><verb>" -- lazy quantifier so the shortest place name preceding a
# trigger word wins ("去上海出差见客户" -> "上海", not "上海出差").
LOCATION_TEXT_PATTERN = re.compile(
    r"去(?P<city>[\u4e00-\u9fa5]{2,8}?)(?:出差|拜访|参会|见客户|客户现场)"
)


class UserAgentService:
    """Builds the end-user response for one agent request."""

    def __init__(self, db: Session) -> None:
        """Store the session and create the asset and chat helper services."""
        self.db = db
        self.asset_service = AgentAssetService(db)
        self.runtime_chat_service = RuntimeChatService(db)

    def respond(self, payload: UserAgentRequest) -> UserAgentResponse:
        """Produce the full response for ``payload``.

        Answer precedence: the degraded tool message, then a guided
        clarification answer, then the model answer, then the deterministic
        fallback answer.
        """
        AgentFoundationService(self.db).ensure_foundation_ready()
        citations = self._build_rule_citations(payload)
        suggested_actions = self._build_suggested_actions(payload)
        risk_flags = self._resolve_risk_flags(payload)
        draft_payload = (
            self._build_draft_payload(payload)
            if payload.ontology.intent == "draft"
            else None
        )
        review_payload = self._build_review_payload(
            payload,
            citations=citations,
            draft_payload=draft_payload,
        )

        if payload.degraded and payload.tool_payload.get("message"):
            # Degraded run: surface the tool's own message verbatim.
            return UserAgentResponse(
                answer=str(payload.tool_payload["message"]),
                citations=citations,
                suggested_actions=suggested_actions,
                review_payload=review_payload,
                risk_flags=risk_flags,
                requires_confirmation=payload.requires_confirmation,
            )

        guided_answer = None
        if draft_payload is None or draft_payload.claim_id is None:
            # No persisted claim yet, so a clarification prompt may apply.
            guided_answer = self._build_guided_answer(payload)
        if guided_answer:
            return UserAgentResponse(
                answer=guided_answer,
                citations=citations,
                suggested_actions=suggested_actions,
                draft_payload=draft_payload,
                review_payload=review_payload,
                risk_flags=risk_flags,
                requires_confirmation=payload.requires_confirmation,
            )

        fallback_answer = self._build_fallback_answer(
            payload,
            citations=citations,
            draft_payload=draft_payload,
        )
        answer = None
        if not self._should_skip_model_answer(payload, review_payload):
            answer = self._generate_answer_with_model(
                payload,
                citations=citations,
                suggested_actions=suggested_actions,
                risk_flags=risk_flags,
                draft_payload=draft_payload,
                fallback_answer=fallback_answer,
            )
        return UserAgentResponse(
            answer=answer or fallback_answer,
            citations=citations,
            suggested_actions=suggested_actions,
            draft_payload=draft_payload,
            review_payload=review_payload,
            risk_flags=risk_flags,
            requires_confirmation=payload.requires_confirmation,
        )

    def _build_fallback_answer(
        self,
        payload: UserAgentRequest,
        *,
        citations: list[UserAgentCitation],
        draft_payload: UserAgentDraftPayload | None,
    ) -> str:
        """Return the deterministic answer used when no model answer exists."""
        intent = payload.ontology.intent
        if intent in {"query", "compare"}:
            return self._build_query_answer(payload)
        if intent == "risk_check":
            return self._build_risk_answer(payload, citations)
        if intent == "draft":
            tool_message = str(payload.tool_payload.get("message") or "").strip()
            # Only trust the tool message when it refers to a concrete claim.
            if tool_message and (
                str(payload.tool_payload.get("claim_id") or "").strip()
                or str(payload.tool_payload.get("claim_no") or "").strip()
            ):
                return tool_message
        if intent == "draft" and draft_payload is not None:
            return (
                f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容,"
                "仍需人工确认后再进入正式流程。"
            )
        return self._build_explain_answer(payload, citations)

    def _build_guided_answer(self, payload: UserAgentRequest) -> str | None:
        """Return a clarification prompt, or ``None`` when none applies.

        A bare expense prompt gets the generic guidance text; any other
        message is delegated to the implicit-draft guidance.
        """
        if not self._is_generic_expense_prompt(payload):
            return self._build_implicit_expense_draft_guidance(payload)

        attachment_names = self._resolve_attachment_names(payload)
        ocr_summary = str(payload.context_json.get("ocr_summary") or "").strip()
        attachment_hint = ""
        if ocr_summary:
            attachment_hint = f" 我已读取附件 OCR 摘要:{ocr_summary}"
        elif attachment_names:
            attachment_hint = (
                f" 我已带入 {len(attachment_names)} 份附件名称,但目前还不能直接读取附件内容,"
                "仍需要你补充关键信息。"
            )
        return (
            "可以帮你发起报销。请补充费用类型、发生时间、金额、事由和相关对象,"
            "或者直接上传票据附件,我再继续帮你判断能否报、缺什么材料以及生成报销草稿。"
            f"{attachment_hint}"
        )

    def _build_implicit_expense_draft_guidance(
        self,
        payload: UserAgentRequest,
    ) -> str | None:
        """Summarise a recognised expense and list what a draft still needs.

        Returns ``None`` unless the request is an expense draft that carries
        no explicit draft keyword.
        """
        if not self._is_implicit_expense_draft_request(payload):
            return None
        amount_text = next(
            (item.value for item in payload.ontology.entities if item.type == "amount"),
            "",
        )
        expense_type = next(
            (
                EXPENSE_TYPE_LABELS.get(item.normalized_value, item.value)
                for item in payload.ontology.entities
                if item.type == "expense_type"
            ),
            "报销",
        )
        time_text = payload.ontology.time_range.raw or "本次"
        amount_hint = f",金额 {amount_text}" if amount_text else ""
        return (
            f"已识别到一笔{time_text}的{expense_type}支出{amount_hint}。"
            "如果要继续生成报销草稿,还需要补充客户单位、参与人员、费用明细和票据附件。"
            "你也可以继续上传发票或图片,我会把这些信息带入后续对话。"
        )

    def _generate_answer_with_model(
        self,
        payload: UserAgentRequest,
        *,
        citations: list[UserAgentCitation],
        suggested_actions: list[UserAgentSuggestedAction],
        risk_flags: list[str],
        draft_payload: UserAgentDraftPayload | None,
        fallback_answer: str,
    ) -> str | None:
        """Ask the runtime chat service for an answer and sanitise it."""
        messages = self._build_model_messages(
            payload,
            citations=citations,
            suggested_actions=suggested_actions,
            risk_flags=risk_flags,
            draft_payload=draft_payload,
            fallback_answer=fallback_answer,
        )
        return self._sanitize_model_answer(
            self.runtime_chat_service.complete(
                messages,
                max_tokens=420,
                temperature=0.2,
            )
        )

    def _sanitize_model_answer(self, answer: str | None) -> str | None:
        """Strip reasoning-tag blocks and surrounding whitespace.

        Returns ``None`` when nothing remains, so the caller falls back to
        the deterministic answer.
        """
        if not answer:
            return None
        cleaned = REASONING_TAG_PATTERN.sub("", answer).strip()
        return cleaned or None

    def _build_model_messages(
        self,
        payload: UserAgentRequest,
        *,
        citations: list[UserAgentCitation],
        suggested_actions: list[UserAgentSuggestedAction],
        risk_flags: list[str],
        draft_payload: UserAgentDraftPayload | None,
        fallback_answer: str,
    ) -> list[dict[str, str]]:
        """Build the system and user messages sent to the chat model.

        The user message embeds every known fact as pretty-printed JSON.
        """
        context = payload.context_json
        facts = {
            "run_id": payload.run_id,
            "user_message": payload.message,
            "ontology": payload.ontology.model_dump(mode="json"),
            "context": {
                "entry_source": context.get("entry_source"),
                "user_name": context.get("name"),
                "user_role": context.get("role"),
                "request_context": context.get("request_context"),
                "attachment_count": context.get("attachment_count"),
                "attachment_names": self._resolve_attachment_names(payload),
                "ocr_summary": context.get("ocr_summary", ""),
                "ocr_documents": context.get("ocr_documents", []),
                "conversation_id": context.get("conversation_id"),
                "conversation_scenario": context.get("conversation_scenario"),
                "conversation_intent": context.get("conversation_intent"),
                "draft_claim_id": context.get("draft_claim_id"),
                "conversation_history": self._resolve_conversation_history(payload),
            },
            "tool_payload": payload.tool_payload,
            "citations": [item.model_dump(mode="json") for item in citations],
            "suggested_actions": [
                item.model_dump(mode="json") for item in suggested_actions
            ],
            "risk_flags": risk_flags,
            "draft_payload": (
                draft_payload.model_dump(mode="json")
                if draft_payload is not None
                else None
            ),
            "selected_capability_codes": payload.selected_capability_codes,
            "requires_confirmation": payload.requires_confirmation,
            "fallback_answer": fallback_answer,
        }
        system_prompt = (
            "你是企业财务共享场景中的中文智能助手,负责和最终用户直接对话。"
            "你只能基于提供的事实回答,不能编造制度、流程结果或附件内容。"
            "如果用户问题很笼统,例如“我要报销”,优先告诉用户你可以协助什么,"
            "并明确要求补充费用类型、金额、时间、事由、参与对象或上传票据。"
            "如果上下文里只有附件名称,必须明确说明你只拿到了附件名称,"
            "不能假装已看过图片、PDF 或发票内容。"
            "如果提供了 conversation_history,必须结合最近轮次理解追问、代词、省略字段和补充信息。"
            "不要声称已经提交、审批、付款、入账或真正执行了任何动作;如果只是建议、草稿或待确认,要明确说清楚。"
            "若给出了风险标签、制度引用或建议动作,可以简洁吸收进回答,但不要新增未提供的事实。"
            "只输出最终给用户看的自然语言,不要输出 JSON、Markdown、标题、"
            " 标签或任何中间推理。"
            "使用简体中文,控制在 2 到 4 句。"
        )
        user_prompt = (
            "请根据以下事实生成最终答复,优先保持准确、具体、可执行:\n"
            f"{json.dumps(facts, ensure_ascii=False, indent=2)}"
        )
        return [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

    def _build_query_answer(self, payload: UserAgentRequest) -> str:
        """Summarise tool query results for the expense, AR and AP scenarios."""
        scenario = payload.ontology.scenario
        data = payload.tool_payload
        subject = self._resolve_subject(payload)
        if scenario == "expense":
            record_count = int(data.get("record_count") or 0)
            total_amount = float(data.get("total_amount") or 0)
            return (
                f"{subject}共命中 {record_count} 笔报销,金额合计 {total_amount:.2f} 元。"
                "如需继续处理,可以查看明细或生成处理意见草稿。"
            )
        if scenario == "accounts_receivable":
            record_count = int(data.get("record_count") or 0)
            outstanding_amount = float(data.get("outstanding_amount") or 0)
            return (
                f"{subject}共命中 {record_count} 条应收,未回款金额 {outstanding_amount:.2f} 元。"
                "建议结合账龄和客户分布继续排查逾期风险。"
            )
        if scenario == "accounts_payable":
            record_count = int(data.get("record_count") or 0)
            outstanding_amount = float(data.get("outstanding_amount") or 0)
            return (
                f"{subject}共命中 {record_count} 条应付,待付金额 {outstanding_amount:.2f} 元。"
                "如需推进动作,建议先生成付款建议草稿并发起人工确认。"
            )
        return "已完成当前查询,但暂时没有更多结构化结果可展示。"

    def _build_explain_answer(
        self,
        payload: UserAgentRequest,
        citations: list[UserAgentCitation],
    ) -> str:
        """Explain using up to two cited rules, or report that none matched."""
        if citations:
            titles = "、".join(item.title for item in citations[:2])
            summary = citations[0].excerpt or "请结合制度全文进一步确认。"
            return f"已检索到相关依据:{titles}。核心说明:{summary}"
        return (
            f"当前还没有与“{SCENARIO_LABELS.get(payload.ontology.scenario, '当前问题')}”"
            "强匹配的已上线规则引用,建议先人工复核或补充更具体的单据上下文。"
        )

    def _build_risk_answer(
        self,
        payload: UserAgentRequest,
        citations: list[UserAgentCitation],
    ) -> str:
        """Describe the resolved risk flags, their reasons and cited rules."""
        risk_flags = self._resolve_risk_flags(payload)
        if not risk_flags:
            return "当前未识别到明确风险标签,建议继续查看原始明细或补充更多上下文。"
        reasons = [
            RISK_REASON_MAP.get(flag, f"{flag} 需要人工进一步确认。")
            for flag in risk_flags
        ]
        citation_text = (
            f" 参考规则:{'、'.join(item.title for item in citations[:2])}。"
            if citations
            else ""
        )
        return (
            f"本次识别到 {len(risk_flags)} 类风险:{'、'.join(risk_flags)}。"
            f"触发原因:{';'.join(reasons)}。"
            "建议先复核明细、附件和审批链,再决定是否继续处理。"
            f"{citation_text}"
        )

    def _build_draft_payload(self, payload: UserAgentRequest) -> UserAgentDraftPayload:
        """Build the draft payload; claim id, number and status come from the tool payload."""
        scenario_label = SCENARIO_LABELS.get(payload.ontology.scenario, "业务")
        subject = self._resolve_subject(payload)
        claim_no = str(payload.tool_payload.get("claim_no") or "").strip() or None
        claim_status = str(payload.tool_payload.get("status") or "").strip() or None
        title = f"{scenario_label}处理意见草稿"
        if claim_no:
            title = f"{scenario_label}草稿 {claim_no}"
        body = (
            f"主题:{subject}\n"
            "结论:已根据当前语义解析结果生成草稿,尚未自动执行。\n"
            "建议:请先核对明细、规则命中和所需附件,再由人工确认是否提交正式流程。\n"
            f"原始问题:{payload.message}"
        )
        return UserAgentDraftPayload(
            draft_type=payload.ontology.scenario,
            title=title,
            body=body,
            confirmation_required=True,
            claim_id=str(payload.tool_payload.get("claim_id") or "").strip() or None,
            claim_no=claim_no,
            status=claim_status,
        )

    def _build_suggested_actions(
        self,
        payload: UserAgentRequest,
    ) -> list[UserAgentSuggestedAction]:
        """Return two follow-up actions chosen by prompt type and intent."""
        if self._is_generic_expense_prompt(payload):
            return [
                UserAgentSuggestedAction(
                    label="上传票据",
                    action_type="ask_clarification",
                    description="上传发票、行程单或付款截图,继续识别报销内容。",
                ),
                UserAgentSuggestedAction(
                    label="补充报销信息",
                    action_type="ask_clarification",
                    description="补充费用类型、金额、时间和事由后继续处理。",
                ),
            ]
        intent = payload.ontology.intent
        if intent in {"query", "compare"}:
            return [
                UserAgentSuggestedAction(
                    label="查看明细",
                    action_type="open_detail",
                    description="继续查看命中记录和过滤条件。",
                ),
                UserAgentSuggestedAction(
                    label="生成处理意见",
                    action_type="create_draft",
                    description="把当前查询结果整理成可确认草稿。",
                ),
            ]
        if intent == "risk_check":
            return [
                UserAgentSuggestedAction(
                    label="人工复核风险",
                    action_type="manual_review",
                    description="优先检查明细、附件和规则命中原因。",
                ),
                UserAgentSuggestedAction(
                    label="生成整改建议",
                    action_type="create_draft",
                    description="把风险说明整理成处理意见草稿。",
                ),
            ]
        if intent == "draft":
            return [
                UserAgentSuggestedAction(
                    label="复制草稿",
                    action_type="copy_draft",
                    description="复制当前草稿后交由人工确认。",
                ),
                UserAgentSuggestedAction(
                    label="补充上下文",
                    action_type="ask_clarification",
                    description="补充单据编号、客户或供应商信息以完善草稿。",
                ),
            ]
        return [
            UserAgentSuggestedAction(
                label="查看规则全文",
                action_type="open_rule",
                description="继续查看引用规则或知识内容。",
            ),
            UserAgentSuggestedAction(
                label="补充问题上下文",
                action_type="ask_clarification",
                description="补充业务对象、时间或单据范围,提升回答准确度。",
            ),
        ]

    def _build_review_payload(
        self,
        payload: UserAgentRequest,
        *,
        citations: list[UserAgentCitation],
        draft_payload: UserAgentDraftPayload | None,
    ) -> UserAgentReviewPayload | None:
        """Build the expense review payload.

        Returns ``None`` outside the expense scenario, and also when the
        intent is neither ``draft`` nor ``operate`` and there are no
        attachments or OCR documents to review.
        """
        attachment_count = self._resolve_attachment_count(payload)
        ocr_documents = self._resolve_ocr_documents(payload)
        if payload.ontology.scenario != "expense":
            return None
        if (
            payload.ontology.intent not in {"draft", "operate"}
            and attachment_count <= 0
            and not ocr_documents
        ):
            return None

        slot_cards = self._build_review_slot_cards(payload, ocr_documents=ocr_documents)
        document_cards = self._build_review_document_cards(
            payload,
            ocr_documents=ocr_documents,
        )
        claim_groups = self._build_review_claim_groups(
            payload,
            document_cards=document_cards,
        )
        risk_briefs = self._build_review_risk_briefs(
            payload,
            citations=citations,
            document_cards=document_cards,
            claim_groups=claim_groups,
        )
        confirmation_actions = self._build_review_confirmation_actions(
            payload,
            claim_groups=claim_groups,
            draft_payload=draft_payload,
        )
        intent_summary = self._build_review_intent_summary(
            payload,
            slot_cards=slot_cards,
            claim_groups=claim_groups,
        )
        return UserAgentReviewPayload(
            intent_summary=intent_summary,
            scenario=payload.ontology.scenario,
            intent=payload.ontology.intent,
            missing_slots=list(payload.ontology.missing_slots),
            risk_briefs=risk_briefs,
            slot_cards=slot_cards,
            document_cards=document_cards,
            claim_groups=claim_groups,
            confirmation_actions=confirmation_actions,
        )

    def _build_review_slot_cards(
        self,
        payload: UserAgentRequest,
        *,
        ocr_documents: list[dict[str, object]],
    ) -> list[UserAgentReviewSlotCard]:
        """Build one slot card per review field, in a fixed order.

        Values come from ontology entities first, with OCR documents as the
        fallback for amount, merchant and expense type.
        """
        missing_slots = set(payload.ontology.missing_slots)
        entity_map = self._collect_entity_values(payload)
        time_value = self._format_time_range(payload)
        location_value = self._resolve_location_value(payload)
        merchant_value = (
            self._extract_document_merchant_name(ocr_documents[0])
            if ocr_documents
            else ""
        )
        customer_value = entity_map.get("customer", "")
        participants_value = entity_map.get("participants", "")

        amount_value = entity_map.get("amount")
        if not amount_value:
            ocr_total_amount = self._sum_ocr_amounts(ocr_documents)
            amount_value = f"{ocr_total_amount:.2f}元" if ocr_total_amount > 0 else ""

        expense_type_code = entity_map.get("expense_type_code", "")
        entity_expense_type = EXPENSE_TYPE_LABELS.get(
            expense_type_code,
            entity_map.get("expense_type", ""),
        )
        expense_type_value = entity_expense_type
        if not expense_type_value and ocr_documents:
            expense_type_value = self._infer_expense_type_from_documents(
                payload,
                ocr_documents,
            )
        # A type inferred from documents is labelled "ocr" (as the amount card
        # does) so it is not reported as user-confirmed text.
        if entity_expense_type:
            expense_type_source = "user_text"
        elif expense_type_value:
            expense_type_source = "ocr"
        else:
            expense_type_source = "system"

        attachment_count = self._resolve_attachment_count(payload)
        attachment_value = f"{attachment_count} 份附件" if attachment_count else ""

        return [
            self._make_slot_card(
                key="expense_type",
                value=expense_type_value,
                source=expense_type_source,
                confidence=0.9 if expense_type_value else 0.0,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="customer_name",
                value=customer_value,
                source="user_text" if customer_value else "system",
                confidence=0.88 if customer_value else 0.0,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="time_range",
                value=time_value,
                source="user_text" if time_value else "system",
                confidence=0.9 if time_value else 0.0,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="location",
                value=location_value,
                source=(
                    "page_context"
                    if location_value and location_value != "客户现场"
                    else "user_text"
                ),
                confidence=0.82 if location_value else 0.0,
                required=False,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="merchant_name",
                value=merchant_value,
                source="ocr" if merchant_value else "system",
                confidence=0.72 if merchant_value else 0.0,
                required=False,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="amount",
                value=amount_value,
                source=(
                    "user_text"
                    if entity_map.get("amount")
                    else "ocr" if amount_value else "system"
                ),
                confidence=0.92 if amount_value else 0.0,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="participants",
                value=participants_value,
                source="user_text" if participants_value else "system",
                confidence=0.8 if participants_value else 0.0,
                missing_slots=missing_slots,
            ),
            self._make_slot_card(
                key="attachments",
                value=attachment_value,
                source="upload" if attachment_value else "system",
                confidence=1.0 if attachment_value else 0.0,
                missing_slots=missing_slots,
            ),
        ]

    def _build_review_document_cards(
        self,
        payload: UserAgentRequest,
        *,
        ocr_documents: list[dict[str, object]],
    ) -> list[UserAgentReviewDocumentCard]:
        """Turn each OCR document into a classified card, indexed from 1."""
        cards: list[UserAgentReviewDocumentCard] = []
        for index, item in enumerate(ocr_documents, start=1):
            classified = self._classify_document(item, payload)
            fields = self._extract_document_fields(item)
            raw_warnings = item.get("warnings")
            if not isinstance(raw_warnings, (list, tuple)):
                # Missing, None or malformed warnings are treated as "none".
                raw_warnings = []
            cards.append(
                UserAgentReviewDocumentCard(
                    index=index,
                    filename=str(item.get("filename") or f"document-{index}"),
                    document_type=classified["document_type"],
                    suggested_expense_type=classified["expense_type"],
                    scene_label=GROUP_SCENE_LABELS.get(
                        classified["group_code"],
                        classified["scene_label"],
                    ),
                    summary=str(item.get("summary") or item.get("text") or "").strip(),
                    avg_score=float(item.get("avg_score") or 0.0),
                    warnings=[
                        str(warning) for warning in raw_warnings if str(warning).strip()
                    ],
                    fields=[
                        UserAgentReviewDocumentField(
                            label=label,
                            value=value,
                            source="ocr",
                        )
                        for label, value in fields.items()
                        if str(value).strip()
                    ],
                )
            )
        return cards

    def _build_review_claim_groups(
        self,
        payload: UserAgentRequest,
        *,
        document_cards: list[UserAgentReviewDocumentCard],
    ) -> list[UserAgentReviewClaimGroup]:
        """Group document cards into suggested claims by normalised expense type.

        With no documents, a single group is derived from the ontology
        entities, so the result always has at least one group.
        """
        groups: dict[str, dict[str, object]] = {}
        for card in document_cards:
            group_code = self._normalize_group_code(card.suggested_expense_type)
            bucket = groups.setdefault(
                group_code,
                {
                    "document_indexes": [],
                    "amount_total": 0.0,
                    "expense_type": group_code,
                    "scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
                    "reasons": [],
                },
            )
            bucket["document_indexes"].append(card.index)
            bucket["amount_total"] = (
                float(bucket["amount_total"]) + self._extract_amount_from_card(card)
            )
            bucket["reasons"].append(f"{card.filename} 识别为 {card.scene_label}")

        if not groups:
            expense_type_code = self._collect_entity_values(payload).get(
                "expense_type_code",
                "other",
            )
            group_code = self._normalize_group_code(expense_type_code)
            groups[group_code] = {
                "document_indexes": [],
                "amount_total": self._resolve_amount_value(payload),
                "expense_type": expense_type_code or "other",
                "scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
                "reasons": ["当前主要依据用户文本和页面上下文进行分单建议。"],
            }

        claim_groups: list[UserAgentReviewClaimGroup] = []
        for index, (group_code, bucket) in enumerate(groups.items(), start=1):
            title = f"建议报销单 {index}:{bucket['scene_label']}"
            rationale = (
                # dict.fromkeys de-duplicates while keeping first-seen order.
                ";".join(dict.fromkeys(str(item) for item in bucket["reasons"]))
                if bucket["reasons"]
                else "当前仅有单一场景,无需拆单。"
            )
            claim_groups.append(
                UserAgentReviewClaimGroup(
                    group_code=group_code,
                    title=title,
                    expense_type=str(bucket["expense_type"]),
                    scene_label=str(bucket["scene_label"]),
                    document_indexes=list(bucket["document_indexes"]),
                    amount_total=round(float(bucket["amount_total"]), 2),
                    rationale=rationale,
                )
            )
        return claim_groups

    def _build_review_risk_briefs(
        self,
        payload: UserAgentRequest,
        *,
        citations: list[UserAgentCitation],
        document_cards: list[UserAgentReviewDocumentCard],
        claim_groups: list[UserAgentReviewClaimGroup],
    ) -> list[UserAgentReviewRiskBrief]:
        """Collect at most four risk briefs for the review panel.

        Sources, in order: the employee's claims from the last 90 days
        (profile and same-amount check), the first cited rule, OCR warnings,
        and a split suggestion when there is more than one claim group.
        """
        briefs: list[UserAgentReviewRiskBrief] = []
        employee_name = self._collect_entity_values(payload).get("employee_name") or str(
            payload.context_json.get("name") or ""
        ).strip()

        # Always bound, so the duplicate check below is safe with no employee.
        # NOTE(review): the collapsed source does not show whether that check
        # was nested under the employee branch; both readings behave the same
        # once this list defaults to empty.
        recent_claims: list[ExpenseClaim] = []
        if employee_name:
            since = datetime.now(UTC) - timedelta(days=90)
            stmt = select(ExpenseClaim).where(
                ExpenseClaim.employee_name == employee_name,
                ExpenseClaim.occurred_at >= since,
            )
            recent_claims = list(self.db.scalars(stmt).all())
            if recent_claims:
                risky_count = sum(1 for item in recent_claims if item.risk_flags_json)
                draft_count = sum(1 for item in recent_claims if item.status == "draft")
                briefs.append(
                    UserAgentReviewRiskBrief(
                        title="历史报销画像",
                        level="info",
                        content=(
                            f"{employee_name} 最近 90 天共有 {len(recent_claims)} 笔报销,"
                            f"其中 {risky_count} 笔带风险标记,{draft_count} 笔仍处于草稿态。"
                        ),
                    )
                )

        current_amount = self._resolve_amount_value(payload)
        if current_amount > 0:
            duplicate_count = sum(
                1
                for item in recent_claims
                if item.amount is not None
                and abs(float(item.amount) - current_amount) < 0.01
            )
            if duplicate_count:
                briefs.append(
                    UserAgentReviewRiskBrief(
                        title="金额重复预警",
                        level="warning",
                        content=(
                            f"近 90 天发现 {duplicate_count} 笔金额相同的报销记录,"
                            "提交前建议核对是否为重复报销或拆分不当。"
                        ),
                    )
                )

        if citations:
            briefs.append(
                UserAgentReviewRiskBrief(
                    title="制度注意事项",
                    level="info",
                    content=citations[0].excerpt
                    or f"请先核对 {citations[0].title} 的制度要求。",
                )
            )

        warning_count = sum(len(item.warnings) for item in document_cards)
        if warning_count:
            briefs.append(
                UserAgentReviewRiskBrief(
                    title="票据识别提醒",
                    level="warning",
                    content=f"当前共有 {warning_count} 条票据识别提示,建议逐张确认 OCR 识别字段。",
                )
            )

        if len(claim_groups) > 1:
            briefs.append(
                UserAgentReviewRiskBrief(
                    title="建议拆单",
                    level="high",
                    content=f"系统检测到 {len(claim_groups)} 类费用场景,建议拆成多张报销单后再提交。",
                )
            )
        return briefs[:4]

    def _build_review_confirmation_actions(
        self,
        payload: UserAgentRequest,
        *,
        claim_groups: list[UserAgentReviewClaimGroup],
        draft_payload: UserAgentDraftPayload | None,
    ) -> list[UserAgentReviewAction]:
        """Return at most five confirmation actions, primary action first."""
        actions: list[UserAgentReviewAction] = []
        if claim_groups:
            # NOTE(review): the collapsed source leaves the ``else`` pairing
            # ambiguous; it is read as belonging to the inner length check,
            # so a single group offers "confirm".
            if len(claim_groups) > 1:
                actions.append(
                    UserAgentReviewAction(
                        label=f"按 {len(claim_groups)} 张报销单生成",
                        action_type="split_claims",
                        description="保留当前识别结果,并按费用场景拆分生成多张报销草稿。",
                        emphasis="primary",
                    )
                )
            else:
                actions.append(
                    UserAgentReviewAction(
                        label="确认并继续生成草稿",
                        action_type="confirm_review",
                        description="确认当前识别字段无误后,继续生成或覆盖当前报销草稿。",
                        emphasis="primary",
                    )
                )

        for slot in payload.ontology.missing_slots[:3]:
            label = SLOT_LABELS.get(slot, slot)
            actions.append(
                UserAgentReviewAction(
                    label=f"补充{label}",
                    action_type="fill_slot",
                    description=f"当前还缺少 {label},补充后可提升分单和建单准确度。",
                    emphasis="secondary",
                )
            )

        if self._resolve_attachment_count(payload) <= 0:
            actions.append(
                UserAgentReviewAction(
                    label="继续上传票据",
                    action_type="upload_more",
                    description="上传发票、行程单或电子票据后,系统会重新识别并完善报销分组。",
                    emphasis="secondary",
                )
            )

        if draft_payload is not None and draft_payload.claim_no:
            actions.append(
                UserAgentReviewAction(
                    label=f"查看草稿 {draft_payload.claim_no}",
                    action_type="open_claim",
                    description="查看当前已创建的报销草稿,并继续补充字段或附件。",
                    emphasis="secondary",
                )
            )
        return actions[:5]

    def _build_review_intent_summary(
        self,
        payload: UserAgentRequest,
        *,
        slot_cards: list[UserAgentReviewSlotCard],
        claim_groups: list[UserAgentReviewClaimGroup],
    ) -> str:
        """Compose a one-line summary of the recognised expense intent."""
        slots = {item.key: item for item in slot_cards}
        expense_type = slots.get("expense_type")
        amount = slots.get("amount")
        time_range = slots.get("time_range")
        location = slots.get("location")
        customer = slots.get("customer_name")

        summary = "系统识别出您想要发起一笔报销。"
        if expense_type and expense_type.value:
            summary = f"系统识别出您想要报销{expense_type.value}。"

        details: list[str] = []
        if customer and customer.value:
            details.append(f"客户名称:{customer.value}")
        if time_range and time_range.value:
            details.append(f"时间:{time_range.value}")
        if location and location.value:
            details.append(f"地点:{location.value}")
        if amount and amount.value:
            details.append(f"金额:{amount.value}")
        if claim_groups and len(claim_groups) > 1:
            details.append(f"建议拆分为 {len(claim_groups)} 张报销单")
        if details:
            return f"{summary} {';'.join(details)}。"
        return summary

    @staticmethod
    def _should_skip_model_answer(
        payload: UserAgentRequest,
        review_payload: UserAgentReviewPayload | None,
    ) -> bool:
        """Return True when the deterministic answer should be used as-is.

        That is the case when a review payload exists for an expense request
        that is either a draft or carries a positive ``attachment_count``.
        """
        if review_payload is None:
            return False
        return payload.ontology.scenario == "expense" and (
            payload.ontology.intent == "draft"
            or int(payload.context_json.get("attachment_count") or 0) > 0
        )

    def _build_rule_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]:
        """Cite the top two active rule assets ranked for this request."""
        domain = self._resolve_domain(payload.ontology.scenario)
        items = self.asset_service.list_assets(
            asset_type=AgentAssetType.RULE.value,
            status=AgentAssetStatus.ACTIVE.value,
            domain=domain,
        )
        ranked = self._rank_rule_assets(items, payload)
        citations: list[UserAgentCitation] = []
        for item in ranked[:2]:
            detail = self.asset_service.get_asset(item.id)
            if detail is None:
                continue
            excerpt = self._extract_excerpt(str(detail.current_version_content or ""))
            citations.append(
                UserAgentCitation(
                    source_type="rule",
                    code=detail.code,
                    title=detail.name,
                    version=detail.current_version,
                    updated_at=detail.updated_at.date().isoformat(),
                    excerpt=excerpt,
                )
            )
        return citations

    @staticmethod
    def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]:
        """Prefer a non-empty tool ``risk_flags`` list over the ontology flags."""
        tool_flags = payload.tool_payload.get("risk_flags")
        if isinstance(tool_flags, list) and tool_flags:
            return [str(item) for item in tool_flags]
        return [str(item) for item in payload.ontology.risk_flags]

    @staticmethod
    def _resolve_subject(payload: UserAgentRequest) -> str:
        """Name the subject of the answer from named entities or the scenario."""
        named_entities = [
            item.value
            for item in payload.ontology.entities
            if item.type in {"employee", "customer", "vendor", "project"}
        ]
        if named_entities:
            return f"{'、'.join(named_entities)} 相关数据"
        return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据"

    @staticmethod
    def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool:
        """True when the whitespace-stripped message is a bare expense prompt."""
        if payload.ontology.scenario != "expense":
            return False
        normalized_message = re.sub(r"\s+", "", payload.message)
        return normalized_message in GENERIC_EXPENSE_PROMPTS

    @staticmethod
    def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool:
        """True for an expense draft whose message has no explicit draft keyword."""
        if payload.ontology.scenario != "expense" or payload.ontology.intent != "draft":
            return False
        compact_message = re.sub(r"\s+", "", payload.message)
        return not any(keyword in compact_message for keyword in EXPLICIT_DRAFT_KEYWORDS)

    @staticmethod
    def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]:
        """Return the non-blank attachment names, or ``[]`` if not a list."""
        names = payload.context_json.get("attachment_names")
        if not isinstance(names, list):
            return []
        return [str(name) for name in names if str(name).strip()]

    @staticmethod
    def _resolve_attachment_count(payload: UserAgentRequest) -> int:
        """Count attachments: named ones first, else ``attachment_count``."""
        names = UserAgentService._resolve_attachment_names(payload)
        if names:
            return len(names)
        try:
            return max(0, int(payload.context_json.get("attachment_count") or 0))
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]:
        """Return the dict entries among the first eight OCR documents."""
        documents = payload.context_json.get("ocr_documents")
        if not isinstance(documents, list):
            return []
        return [item for item in documents[:8] if isinstance(item, dict)]

    @staticmethod
    def _resolve_conversation_history(payload: UserAgentRequest) -> list[dict[str, object]]:
        """Return the last eight history turns that have both role and content."""
        history = payload.context_json.get("conversation_history")
        if not isinstance(history, list):
            return []
        normalized: list[dict[str, object]] = []
        for item in history[-8:]:
            if not isinstance(item, dict):
                continue
            role = str(item.get("role") or "").strip()
            content = str(item.get("content") or "").strip()
            if not role or not content:
                continue
            normalized.append({"role": role, "content": content})
        return normalized

    @staticmethod
    def _resolve_domain(scenario: str) -> str | None:
        """Map a scenario code to its asset domain, or ``None`` if unmapped."""
        if scenario == "expense":
            return "expense"
        if scenario == "accounts_receivable":
            return "ar"
        if scenario == "accounts_payable":
            return "ap"
        return None

    @staticmethod
    def _rank_rule_assets(
        items: list[AgentAssetListItem],
        payload: UserAgentRequest,
    ) -> list[AgentAssetListItem]:
        """Rank rule assets by tag overlap with the request.

        Weights: scenario +3, intent +2, each matching risk flag +4. Assets
        with zero weight are dropped; the asset code breaks ties.
        """

        def score(item: AgentAssetListItem) -> tuple[int, str]:
            tags = {str(value) for value in item.scenario_json or []}
            weight = 0
            if payload.ontology.scenario in tags:
                weight += 3
            if payload.ontology.intent in tags:
                weight += 2
            for risk_flag in payload.ontology.risk_flags:
                if risk_flag in tags:
                    weight += 4
            return weight, item.code

        # Score each asset once, then sort on the cached score.
        scored = [(score(item), item) for item in items]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [item for (weight, _code), item in scored if weight > 0]

    @staticmethod
    def _extract_excerpt(content: str) -> str:
        """Join the first two non-empty lines after stripping Markdown-style prefixes."""
        lines = [line.strip() for line in str(content).splitlines() if line.strip()]
        cleaned: list[str] = []
        for line in lines:
            normalized = re.sub(r"^[#>\-\*\d\.\s`]+", "", line).strip()
            if normalized:
                cleaned.append(normalized)
            if len(cleaned) >= 2:
                break
        return ";".join(cleaned[:2])

    def _collect_entity_values(self, payload: UserAgentRequest) -> dict[str, str]:
        """Flatten ontology entities into a dict of display values.

        The first entity of each kind wins. Amounts whose role is
        ``threshold`` are ignored. Participants are de-duplicated in order.
        """
        values = {
            "employee_name": "",
            "customer": "",
            "participants": "",
            "amount": "",
            "expense_type": "",
            "expense_type_code": "",
        }
        participants: list[str] = []
        for item in payload.ontology.entities:
            if item.type == "employee" and not values["employee_name"]:
                values["employee_name"] = item.value
            elif item.type == "customer" and not values["customer"]:
                values["customer"] = item.value
            elif item.type == "amount" and item.role != "threshold" and not values["amount"]:
                values["amount"] = (
                    f"{item.value}元" if "元" not in item.value else item.value
                )
            elif item.type == "expense_type" and not values["expense_type_code"]:
                values["expense_type_code"] = item.normalized_value
                values["expense_type"] = EXPENSE_TYPE_LABELS.get(
                    item.normalized_value,
                    item.value,
                )
            elif item.type in {"participant", "person"} and item.value.strip():
                participants.append(item.value.strip())
        if participants:
            values["participants"] = "、".join(dict.fromkeys(participants))
        return values

    def _format_time_range(self, payload: UserAgentRequest) -> str:
        """Return the raw time text, else a formatted start/end range, else ''."""
        time_range = payload.ontology.time_range
        if time_range.raw:
            return time_range.raw
        if time_range.start_date and time_range.end_date:
            if time_range.start_date == time_range.end_date:
                return time_range.start_date
            return f"{time_range.start_date} 至 {time_range.end_date}"
        return ""

    def _resolve_location_value(self, payload: UserAgentRequest) -> str:
        """Resolve the expense location.

        Order: ``request_context`` city/location, then a "去<place><verb>"
        match in the message, then the literal "客户现场", then ''.
        """
        request_context = payload.context_json.get("request_context")
        if isinstance(request_context, dict):
            for key in ("city", "location"):
                value = str(request_context.get(key) or "").strip()
                if value:
                    return value
        city_match = LOCATION_TEXT_PATTERN.search(payload.message)
        if city_match:
            return city_match.group("city").strip()
        if "客户现场" in payload.message.replace(" ", ""):
            return "客户现场"
        return ""

    def _make_slot_card(
        self,
        *,
        key: str,
        value: str,
        source: str,
        confidence: float,
        missing_slots: set[str],
        required: bool = True,
    ) -> UserAgentReviewSlotCard:
        """Build a slot card.

        A slot is missing when it is listed in ``missing_slots`` or its value
        is blank. Only user text, page context and upload sources count as
        confirmed.
        """
        is_missing = key in missing_slots or not str(value).strip()
        label = SLOT_LABELS.get(key, key)
        if is_missing:
            status = "missing"
        elif source == "user_text":
            status = "identified"
        else:
            status = "inferred"
        return UserAgentReviewSlotCard(
            key=key,
            label=label,
            value=str(value or "").strip(),
            source=source,
            confidence=confidence,
            required=required,
            confirmed=not is_missing and source in {"user_text", "page_context", "upload"},
            status=status,
            hint=f"建议补充 {label}。" if is_missing and required else "",
        )

    def _classify_document(
        self,
        item: dict[str, object],
        payload: UserAgentRequest,
    ) -> dict[str, str]:
        """Classify an OCR document by keywords in its filename, summary and text.

        Returns ``document_type``, ``expense_type``, ``group_code`` and
        ``scene_label``. Meal receipts become entertainment when the request
        names a customer or already has the entertainment type.
        """
        text = " ".join(
            [
                str(item.get("filename") or ""),
                str(item.get("summary") or ""),
                str(item.get("text") or ""),
            ]
        ).lower()
        compact = text.replace(" ", "")
        entity_map = self._collect_entity_values(payload)
        expense_type_code = entity_map.get("expense_type_code", "")
        has_customer = bool(entity_map.get("customer"))

        if any(keyword in compact for keyword in ("机票", "航班", "火车", "高铁", "行程单")):
            return {
                "document_type": "travel_ticket",
                "expense_type": "travel",
                "group_code": "travel",
                "scene_label": "差旅票据",
            }
        if any(keyword in compact for keyword in ("酒店", "住宿", "宾馆")):
            return {
                "document_type": "hotel_invoice",
                "expense_type": "hotel",
                "group_code": "travel",
                "scene_label": "住宿票据",
            }
        if any(
            keyword in compact
            for keyword in ("打车", "出租车", "滴滴", "网约车", "过路费", "停车")
        ):
            return {
                "document_type": "transport_receipt",
                "expense_type": "transport",
                "group_code": "travel",
                "scene_label": "交通票据",
            }
        if any(
            keyword in compact
            for keyword in ("餐", "饭店", "酒楼", "酒家", "餐饮", "meal")
        ):
            group_code = (
                "entertainment"
                if expense_type_code == "entertainment" or has_customer
                else "meal"
            )
            return {
                "document_type": "meal_receipt",
                "expense_type": group_code,
                "group_code": group_code,
                "scene_label": "餐饮票据",
            }
        return {
            "document_type": "other",
            "expense_type": expense_type_code or "other",
            "group_code": self._normalize_group_code(expense_type_code or "other"),
            "scene_label": "其他票据",
        }

    @staticmethod
    def _normalize_group_code(expense_type_code: str) -> str:
        """Collapse an expense type to travel, entertainment, meal or other."""
        if expense_type_code in {"travel", "hotel", "transport"}:
            return "travel"
        if expense_type_code in {"entertainment", "meal"}:
            return expense_type_code
        return "other"

    def _extract_document_fields(self, item: dict[str, object]) -> dict[str, str]:
        """Extract amount, date and merchant fields from a document's summary and text.

        Amounts written in 万/万元 are converted to 元 so that later numeric
        parsing sees the real value.
        """
        text = " ".join(
            [str(item.get("summary") or ""), str(item.get("text") or "")]
        ).strip()
        fields: dict[str, str] = {}
        amount_match = AMOUNT_TEXT_PATTERN.search(text)
        if amount_match:
            number_text = amount_match.group(1)
            if "万" in amount_match.group(0):
                # The unit is ten-thousand yuan: scale instead of relabelling.
                fields["金额"] = f"{float(number_text) * 10000:.2f}元"
            else:
                fields["金额"] = f"{number_text}元"
        date_match = DATE_TEXT_PATTERN.search(text)
        if date_match:
            fields["时间"] = date_match.group(1)
        merchant = self._extract_document_merchant_name(item)
        if merchant:
            fields["商户/酒店"] = merchant
        return fields

    @staticmethod
    def _extract_document_merchant_name(item: dict[str, object]) -> str:
        """Return the first merchant keyword found in the document text, or ''.

        The value returned is the matched keyword itself, not a full
        merchant name.
        """
        text = " ".join(
            [str(item.get("summary") or ""), str(item.get("text") or "")]
        ).strip()
        for keyword in ("酒店", "宾馆", "饭店", "酒楼", "餐厅", "航空", "铁路", "滴滴"):
            if keyword in text:
                return keyword
        return ""

    @staticmethod
    def _extract_amount_from_card(card: UserAgentReviewDocumentCard) -> float:
        """Parse the card's "金额" field as a float; 0.0 if absent or invalid."""
        for item in card.fields:
            if item.label != "金额":
                continue
            try:
                return float(str(item.value).replace("元", "").strip())
            except ValueError:
                return 0.0
        return 0.0

    def _resolve_amount_value(self, payload: UserAgentRequest) -> float:
        """Return the first non-threshold amount entity as a float, else 0.0."""
        for item in payload.ontology.entities:
            if item.type == "amount" and item.role != "threshold":
                try:
                    return float(item.normalized_value)
                except (TypeError, ValueError):
                    # A missing (None) or non-numeric value means "no amount".
                    return 0.0
        return 0.0

    def _sum_ocr_amounts(self, ocr_documents: list[dict[str, object]]) -> float:
        """Sum the amount fields extracted from every OCR document."""
        total = 0.0
        for item in ocr_documents:
            fields = self._extract_document_fields(item)
            amount_text = str(fields.get("金额") or "").replace("元", "").strip()
            if not amount_text:
                continue
            try:
                total += float(amount_text)
            except ValueError:
                continue
        return total

    def _infer_expense_type_from_documents(
        self,
        payload: UserAgentRequest,
        ocr_documents: list[dict[str, object]],
    ) -> str:
        """Join up to three distinct group labels found across the documents."""
        labels: list[str] = []
        for item in ocr_documents:
            classified = self._classify_document(item, payload)
            label = GROUP_SCENE_LABELS.get(classified["group_code"], "")
            if label and label not in labels:
                labels.append(label)
        return " + ".join(labels[:3])