Files
X-Financial/server/src/app/services/user_agent.py
caoxiaozhu 002bf4f756 feat: 完善报销单审批流程及退回原因追踪
新增直属领导审批通过接口和审批待办列表查询,报销单退回
支持原因码分类和审批环节标记,优化票据附件去重和路径
回退查找,前端新增退回原因对话框、审批收件箱和工作台
图标组件,补充工具函数和单元测试覆盖。
2026-05-20 21:00:47 +08:00

3918 lines
164 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from datetime import UTC, datetime, timedelta
from decimal import Decimal, InvalidOperation
from typing import Any
from sqlalchemy import or_, select
from sqlalchemy.orm import Session, selectinload
from app.core.agent_enums import AgentAssetStatus, AgentAssetType
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.schemas.agent_asset import AgentAssetListItem
from app.schemas.user_agent import (
UserAgentCitation,
UserAgentDraftPayload,
UserAgentExpenseQueryRecord,
UserAgentQueryPayload,
UserAgentQueryStatusGroup,
UserAgentReviewAction,
UserAgentReviewEditField,
UserAgentReviewClaimGroup,
UserAgentReviewDocumentCard,
UserAgentReviewDocumentField,
UserAgentReviewPayload,
UserAgentReviewRiskBrief,
UserAgentReviewSlotCard,
UserAgentRequest,
UserAgentResponse,
UserAgentSuggestedAction,
)
from app.services.agent_assets import AgentAssetService
from app.services.agent_foundation import AgentFoundationService
from app.services.expense_claims import ExpenseClaimService
from app.services.risk_ontology_bridge import resolve_rule_codes_for_risk_check
from app.services.runtime_chat import RuntimeChatService
# Display labels keyed by ontology scenario code.
SCENARIO_LABELS = {
    "expense": "报销",
    "accounts_receivable": "应收",
    "accounts_payable": "应付",
    "knowledge": "知识",
    "unknown": "通用",
}
# Human-readable explanation for each risk code.
RISK_REASON_MAP = {
    "duplicate_expense": "检测到同员工、同金额或近似单据存在重复提交迹象。",
    "location_mismatch": "申报出差地点与票据识别地点可能不一致,需要核对行程或补充说明。",
    "amount_over_limit": "金额超过当前制度或预算阈值,需要补充例外说明。",
    "invoice_anomaly": "票据或附件完整性不满足当前规则要求,需要补件或人工复核。",
    "ar_overdue": "应收账款已出现逾期,存在回款延迟风险。",
    "ap_overdue": "应付付款已出现逾期,可能影响供应商履约或合作关系。",
}
# Short reimbursement phrases that carry no detail beyond "I want to claim".
# NOTE(review): the consumer of this set is outside the visible section.
GENERIC_EXPENSE_PROMPTS = {
    "报销",
    "我要报销",
    "我想报销",
    "帮我报销",
    "我要申请报销",
    "发起报销",
    "提交报销",
}
# Verbs/nouns listed as explicit draft keywords (generate, draft, create, ...).
EXPLICIT_DRAFT_KEYWORDS = ("生成", "草稿", "起草", "创建", "发起", "准备")
# Display labels keyed by expense type code.
EXPENSE_TYPE_LABELS = {
    "travel": "差旅费",
    "hotel": "住宿费",
    "transport": "交通费",
    "meal": "餐费",
    "meeting": "会务费",
    "entertainment": "业务招待费",
    "office": "办公费",
    "training": "培训费",
    "communication": "通讯费",
    "welfare": "福利费",
    "other": "其他费用",
}
# Scene labels for grouped claims. Unlike EXPENSE_TYPE_LABELS this table has
# no "meeting" entry and labels "meal" as 伙食费 rather than 餐费.
GROUP_SCENE_LABELS = {
    "travel": "差旅费",
    "entertainment": "业务招待费",
    "meal": "伙食费",
    "transport": "交通费",
    "hotel": "住宿费",
    "office": "办公费",
    "training": "培训费",
    "communication": "通讯费",
    "welfare": "福利费",
    "other": "其他费用",
}
# Per-slot model timeouts (seconds) applied to knowledge-scenario completions.
KNOWLEDGE_MODEL_MAIN_TIMEOUT_SECONDS = 3
KNOWLEDGE_MODEL_BACKUP_TIMEOUT_SECONDS = 5
# Overall knowledge timeout; aliases the backup slot value.
KNOWLEDGE_MODEL_TIMEOUT_SECONDS = KNOWLEDGE_MODEL_BACKUP_TIMEOUT_SECONDS
# Substrings that mark a question as a structured knowledge query
# ("what is", "limit", "how much", ...).
KNOWLEDGE_DIRECT_ANSWER_HINTS = (
    "是什么",
    "标准",
    "限额",
    "流程",
    "条件",
    "规则",
    "怎么",
    "如何",
    "哪些",
    "需要",
    "是否",
    "区别",
    "范围",
    "额度",
    "金额",
    "多少",
    "多少钱",
    "上限",
)
# Filler words for knowledge queries.
# NOTE(review): the consumer of this set is outside the visible section.
KNOWLEDGE_QUERY_STOPWORDS = {
    "什么",
    "多少",
    "哪些",
    "怎么",
    "如何",
    "请问",
    "一下",
    "关于",
    "规定",
    "标准",
    "可以",
    "是否",
    "一个",
    "哪些人",
    "目前",
    "当前",
    "一下子",
}
# Caps on query terms, direct-answer evidence items and hits sent to the model.
MAX_KNOWLEDGE_QUERY_TERMS = 12
MAX_KNOWLEDGE_DIRECT_EVIDENCE = 4
MAX_KNOWLEDGE_MODEL_HITS = 5
# Whole-line match for Markdown headings and Chinese chapter/section numbering.
KNOWLEDGE_SECTION_HEADING_PATTERN = re.compile(
    r"^(#\s*.+|##\s*.+|###\s*.+|第[一二三四五六七八九十百零0-9]+[章节条]\s*.*|[一二三四五六七八九十]+、.*|[一二三四五六七八九十]+.*|\([一二三四五六七八九十]+\).*)$"
)
# Whole-line match for bullet list items (-, *, •).
KNOWLEDGE_LIST_ITEM_PATTERN = re.compile(r"^[-*•]\s+.+$")
# Whole-line match for numbered items: "1." / "(一)" / circled digits.
KNOWLEDGE_NUMBERED_ITEM_PATTERN = re.compile(
    r"^(?:(?:\d+[.)、])|(?:[(][一二三四五六七八九十百零0-9]+[)])|[①②③④⑤⑥⑦⑧⑨⑩])\s*.+$"
)
# Whole-line match for "第N条" article clauses; group 1 is the article label.
KNOWLEDGE_ARTICLE_PATTERN = re.compile(r"^(第[一二三四五六七八九十百零0-9]+条)\s*.*$")
# Display labels keyed by expense claim status code.
EXPENSE_STATUS_LABELS = {
    "draft": "草稿",
    "submitted": "已提交",
    "review": "审核中",
    "approved": "已通过",
    "rejected": "已驳回",
    "paid": "已付款",
}
# Display labels keyed by coarse status group.
EXPENSE_STATUS_GROUP_LABELS = {
    "draft": "草稿",
    "in_progress": "审批中",
    "completed": "审批完成",
    "other": "其他状态",
}
# Display labels keyed by claim slot name.
SLOT_LABELS = {
    "expense_type": "报销类型",
    "customer_name": "客户名称",
    "time_range": "发生时间",
    "location": "地点",
    "merchant_name": "酒店/商户",
    "amount": "金额",
    "reason": "事由说明",
    "participants": "参与人员",
    "attachments": "票据附件",
}
# Captures a full date such as 2026年5月20日 or 2026-05-20.
DATE_TEXT_PATTERN = re.compile(r"(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)")
# Captures the numeric part of an amount followed by a currency unit,
# including common mis-typed homophones of the unit.
AMOUNT_TEXT_PATTERN = re.compile(
    r"(\d+(?:\.\d+)?)\s*(?:万元|万员|万圆|万园|万块|万元整|元整|块钱|块|元|员|圆|园|万)"
)
# Captures an amount that follows a total/payment label on a document.
DOCUMENT_AMOUNT_PATTERN = re.compile(
    r"(?:价税合计|合计金额|费用合计|订单(?:总)?金额|支付(?:金额)?|实付(?:金额)?|实收(?:金额)?|总(?:额|计|价)|票价|金额|车费|消费金额)"
    r"[:\s¥¥人民币]*([0-9]+(?:[.,][0-9]{1,2})?)"
)
# Captures an amount that follows a yen/yuan currency sign.
DOCUMENT_CURRENCY_AMOUNT_PATTERN = re.compile(r"[¥¥]\s*([0-9]+(?:[.,][0-9]{1,2})?)")
# Display labels keyed by the origin of a slot value.
SOURCE_LABELS = {
    "user_text": "用户描述",
    "user_form": "用户修改",
    "ocr": "票据识别",
    "upload": "上传附件",
    "detail_context": "关联单据",
    "system_context": "系统上下文",
    "inferred": "语义推断",
    "system": "系统判断",
}
# Slot keys listed as required for specific expense scenes.
SCENE_REQUIRED_SLOT_KEYS = {
    "hotel": {"merchant_name"},
    "meeting": {"location"},
    "entertainment": {"location", "customer_name", "participants"},
}
# Reason text keyed by expense type.
# NOTE(review): name suggests a default when no reason is given — confirm at the call site.
INFERRED_REASON_LABELS = {
    "travel": "出差行程",
    "hotel": "住宿报销",
    "transport": "交通出行",
    "meal": "餐饮用餐",
    "meeting": "会务活动",
    "entertainment": "客户接待",
    "office": "办公采购",
    "training": "培训学习",
    "communication": "通讯使用",
    "welfare": "员工福利",
    "other": "其他费用",
}
# Leading phrases of system-generated messages.
# NOTE(review): presumably excluded from user-written reasons — consumer not visible here.
SYSTEM_GENERATED_REASON_PREFIXES = (
    "我上传了",
    "请按当前已识别信息",
    "请把当前上传的票据",
    "请基于当前上传的多张票据",
    "我已核对右侧识别结果",
    "请同步修正逐票据识别结果",
    "我已修改识别信息",
    "查看报销草稿",
    "请解释一下当前这笔报销的合规风险和待补充项",
)
# Patterns matching a date (optionally preceded by a time label) at the very
# start of a reason string.
LEADING_REASON_TIME_PATTERNS = (
    re.compile(
        r"^\s*(?:识别事项(?:有)?[:]\s*)?"
        r"(?:业务发生(?:时间|日期)|费用发生(?:时间|日期)|发生(?:时间|日期)|报销(?:时间|日期)|时间)[:]?\s*"
        r"(?:19|20)\d{2}[-/年.]\d{1,2}[-/月.]\d{1,2}日?\s*[,。;;、]?\s*"
    ),
    re.compile(
        r"^\s*(?:19|20)\d{2}[-/年.]\d{1,2}[-/月.]\d{1,2}日?\s*[,。;;、]\s*"
    ),
)
# Maps amount-unit spellings to a canonical unit.
# NOTE(review): the first four entries and two further values appear as empty
# strings in this listing, which makes the keys collide. They look like
# characters lost in transit — confirm against the repository copy.
AMOUNT_UNIT_ALIASES = {
    "": "",
    "": "",
    "": "",
    "": "",
    "块钱": "",
    "元整": "",
    "万员": "万元",
    "万圆": "万元",
    "万园": "万元",
    "万块": "万元",
    "万元整": "万元",
}
class UserAgentService:
def __init__(self, db: Session) -> None:
    """Bind the service to a database session and create its collaborators.

    Args:
        db: SQLAlchemy session; stored on the instance and also passed to
            the asset service and the runtime chat service.
    """
    self.db = db
    self.asset_service = AgentAssetService(db)
    self.runtime_chat_service = RuntimeChatService(db)
def respond(self, payload: UserAgentRequest) -> UserAgentResponse:
    """Produce the final agent response for one request.

    The structured parts (citations, actions, risk flags, query/draft/review
    payloads) are built once and shared by every exit. The answer text is
    taken from the first source that yields one, in this order: degraded tool
    message, review answer, guided answer, fast knowledge answer, model
    answer, fallback answer.

    Args:
        payload: The routed user request with ontology and tool results.

    Returns:
        The response carrying the chosen answer and all structured payloads.
    """
    AgentFoundationService(self.db).ensure_foundation_ready()

    citations = self._build_citations(payload)
    suggested_actions = self._build_suggested_actions(payload)
    risk_flags = self._resolve_risk_flags(payload)
    query_payload = self._build_query_payload(payload)

    draft_payload = None
    if self._should_build_draft_payload(payload):
        draft_payload = self._build_draft_payload(payload)

    review_payload = self._build_review_payload(
        payload,
        citations=citations,
        draft_payload=draft_payload,
    )
    review_answer = self._build_review_body_answer(
        payload,
        review_payload=review_payload,
        draft_payload=draft_payload,
    )

    def reply(text: str) -> UserAgentResponse:
        # Every exit returns the same structured payloads; only the text varies.
        return UserAgentResponse(
            answer=text,
            citations=citations,
            suggested_actions=suggested_actions,
            query_payload=query_payload,
            draft_payload=draft_payload,
            review_payload=review_payload,
            risk_flags=risk_flags,
            requires_confirmation=payload.requires_confirmation,
        )

    # Degraded runs surface the tool's own message unless a review answer exists.
    if payload.degraded and payload.tool_payload.get("message"):
        return reply(review_answer or str(payload.tool_payload["message"]))

    if review_answer:
        return reply(review_answer)

    # Guidance is only offered while no persisted draft claim exists.
    if draft_payload is None or draft_payload.claim_id is None:
        guided_answer = self._build_guided_answer(payload)
        if guided_answer:
            return reply(guided_answer)

    fast_knowledge_answer = self._build_fast_knowledge_answer(
        payload,
        citations=citations,
    )
    if fast_knowledge_answer:
        return reply(fast_knowledge_answer)

    fallback_answer = self._build_fallback_answer(
        payload,
        citations=citations,
        draft_payload=draft_payload,
    )
    model_answer = None
    if not self._should_skip_model_answer(payload, review_payload):
        model_answer = self._generate_answer_with_model(
            payload,
            citations=citations,
            suggested_actions=suggested_actions,
            risk_flags=risk_flags,
            draft_payload=draft_payload,
            fallback_answer=fallback_answer,
        )
    return reply(model_answer or fallback_answer)
def _build_fallback_answer(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
draft_payload: UserAgentDraftPayload | None,
) -> str:
if str(payload.tool_payload.get("result_type") or "").strip() == "knowledge_search":
return self._build_explain_answer(payload, citations)
if payload.ontology.intent in {"query", "compare"}:
return self._build_query_answer(payload)
if payload.ontology.intent == "risk_check":
return self._build_risk_answer(payload, citations)
if payload.ontology.intent == "draft":
tool_message = str(payload.tool_payload.get("message") or "").strip()
if payload.tool_payload.get("draft_limit_reached"):
return tool_message or "你当前已保存 3 个草稿,请先完成已保存的草稿,才能再次新建草稿。"
if tool_message and (
str(payload.tool_payload.get("claim_id") or "").strip()
or str(payload.tool_payload.get("claim_no") or "").strip()
):
return tool_message
if payload.ontology.intent == "draft" and draft_payload is not None:
return (
f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容,"
"仍需人工确认后再进入正式流程。"
)
return self._build_explain_answer(payload, citations)
def _build_guided_answer(self, payload: UserAgentRequest) -> str | None:
    """Return a guiding prompt for reimbursement requests that lack detail.

    Messages that are not a generic reimbursement prompt are handed to the
    implicit-draft guidance builder. Generic prompts get a fixed request for
    the missing details, followed by a hint about the OCR summary or the
    attachment names already carried in the context.
    """
    if not self._is_generic_expense_prompt(payload):
        return self._build_implicit_expense_draft_guidance(payload)

    attachment_names = self._resolve_attachment_names(payload)
    ocr_summary = str(payload.context_json.get("ocr_summary") or "").strip()

    # An OCR summary takes precedence over a bare list of attachment names.
    if ocr_summary:
        attachment_hint = f" 我已读取附件 OCR 摘要:{ocr_summary}"
    elif attachment_names:
        attachment_hint = (
            f" 我已带入 {len(attachment_names)} 份附件名称,但目前还不能直接读取附件内容,"
            "仍需要你补充关键信息。"
        )
    else:
        attachment_hint = ""

    return (
        "可以帮你发起报销。请补充费用类型、发生时间、金额、事由和相关对象,"
        "或者直接上传票据附件,我再继续帮你判断能否报、缺什么材料以及生成报销草稿。"
        f"{attachment_hint}"
    )
def _build_implicit_expense_draft_guidance(
    self,
    payload: UserAgentRequest,
) -> str | None:
    """Describe the recognised expense and list what a draft still needs.

    Returns None unless the request counts as an implicit draft request.
    Otherwise the first amount entity, the first expense-type entity (shown
    with its display label) and the raw time range are echoed back.
    """
    if not self._is_implicit_expense_draft_request(payload):
        return None

    entities = payload.ontology.entities

    amount_text = ""
    for entity in entities:
        if entity.type == "amount":
            amount_text = entity.value
            break

    expense_type = "报销"
    for entity in entities:
        if entity.type == "expense_type":
            # Fall back to the raw entity text when the code has no label.
            expense_type = EXPENSE_TYPE_LABELS.get(entity.normalized_value, entity.value)
            break

    time_text = payload.ontology.time_range.raw or "本次"
    amount_hint = f",金额 {amount_text}" if amount_text else ""
    return (
        f"已识别到一笔{time_text}{expense_type}支出{amount_hint}"
        "如果要继续生成报销草稿,还需要补充客户单位、参与人员、费用明细和票据附件。"
        "你也可以继续上传发票或图片,我会把这些信息带入后续对话。"
    )
def _generate_answer_with_model(
    self,
    payload: UserAgentRequest,
    *,
    citations: list[UserAgentCitation],
    suggested_actions: list[UserAgentSuggestedAction],
    risk_flags: list[str],
    draft_payload: UserAgentDraftPayload | None,
    fallback_answer: str,
) -> str | None:
    """Ask the runtime chat model for an answer and sanitise the result.

    Knowledge-scenario requests get a larger token budget, explicit
    main/backup slot timeouts and a single attempt. Other scenarios pass
    None for those options.

    Returns:
        The cleaned model answer, or None when nothing usable came back.
    """
    messages = self._build_model_messages(
        payload,
        citations=citations,
        suggested_actions=suggested_actions,
        risk_flags=risk_flags,
        draft_payload=draft_payload,
        fallback_answer=fallback_answer,
    )

    if payload.ontology.scenario == "knowledge":
        max_tokens = 800
        timeout_seconds = KNOWLEDGE_MODEL_TIMEOUT_SECONDS
        slot_timeouts = {
            "main": KNOWLEDGE_MODEL_MAIN_TIMEOUT_SECONDS,
            "backup": KNOWLEDGE_MODEL_BACKUP_TIMEOUT_SECONDS,
        }
        max_attempts = 1
    else:
        max_tokens = 420
        timeout_seconds = None
        slot_timeouts = None
        max_attempts = None

    raw_answer = self.runtime_chat_service.complete(
        messages,
        max_tokens=max_tokens,
        temperature=0.2,
        timeout_seconds=timeout_seconds,
        slot_timeouts=slot_timeouts,
        max_attempts=max_attempts,
    )
    answer = self._sanitize_model_answer(raw_answer)
    return self._reject_unsupported_location_inference(payload, answer)
def _sanitize_model_answer(self, answer: str | None) -> str | None:
if not answer:
return None
cleaned = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL | re.IGNORECASE)
cleaned = cleaned.strip()
leaked_reasoning_markers = (
"用户问的是",
"让我分析一下",
"实体识别",
"从对话历史来看",
"从tool_payload来看",
"现在问题是",
"我需要:",
"关键是我",
)
if any(marker in cleaned[:500] for marker in leaked_reasoning_markers):
return None
return cleaned or None
@staticmethod
def _extract_query_location(message: str) -> str:
match = re.search(r"(?:去|到|前往)([\u4e00-\u9fff]{2,8})(?:出差|开会|培训)", str(message or ""))
return match.group(1) if match else ""
def _reject_unsupported_location_inference(
self,
payload: UserAgentRequest,
answer: str | None,
) -> str | None:
del payload
return answer
def _build_model_messages(
    self,
    payload: UserAgentRequest,
    *,
    citations: list[UserAgentCitation],
    suggested_actions: list[UserAgentSuggestedAction],
    risk_flags: list[str],
    draft_payload: UserAgentDraftPayload | None,
    fallback_answer: str,
) -> list[dict[str, str]]:
    """Assemble the system and user messages sent to the chat model.

    The user message is a JSON dump of a ``facts`` dictionary holding the
    request, selected context fields, the trimmed tool payload and the
    already-built response parts. Knowledge-scenario requests additionally
    receive evidence blocks, short answer evidence and a stricter,
    evidence-only answering instruction.

    Returns:
        A two-item list: the system message followed by the user message.
    """
    # The knowledge question is only resolved for knowledge-scenario requests.
    knowledge_question = (
        self._resolve_knowledge_question(payload)
        if payload.ontology.scenario == "knowledge"
        else ""
    )
    facts = {
        "run_id": payload.run_id,
        "user_message": payload.message,
        "ontology": payload.ontology.model_dump(mode="json"),
        "context": {
            "entry_source": payload.context_json.get("entry_source"),
            "user_name": payload.context_json.get("name"),
            "user_role": payload.context_json.get("role"),
            "user_department": payload.context_json.get("department_name")
            or payload.context_json.get("department"),
            "user_position": payload.context_json.get("position"),
            "user_grade": payload.context_json.get("grade"),
            "employee_no": payload.context_json.get("employee_no"),
            "manager_name": payload.context_json.get("manager_name"),
            "employee_location": payload.context_json.get("employee_location"),
            "cost_center": payload.context_json.get("cost_center"),
            "finance_owner_name": payload.context_json.get("finance_owner_name"),
            "employee_risk_profile": payload.context_json.get("employee_risk_profile", {}),
            "user_role_codes": payload.context_json.get("role_codes", []),
            "is_admin": bool(payload.context_json.get("is_admin")),
            "request_context": payload.context_json.get("request_context"),
            "attachment_count": payload.context_json.get("attachment_count"),
            "attachment_names": self._resolve_attachment_names(payload),
            "ocr_summary": payload.context_json.get("ocr_summary", ""),
            "ocr_documents": payload.context_json.get("ocr_documents", []),
            "conversation_id": payload.context_json.get("conversation_id"),
            "conversation_scenario": payload.context_json.get("conversation_scenario"),
            "conversation_intent": payload.context_json.get("conversation_intent"),
            "draft_claim_id": payload.context_json.get("draft_claim_id"),
            "conversation_history": self._resolve_conversation_history(payload),
        },
        # Hits are re-ranked and trimmed before they reach the model.
        "tool_payload": self._build_model_tool_payload(
            payload.tool_payload,
            question=knowledge_question,
        ),
        "citations": [item.model_dump(mode="json") for item in citations],
        "suggested_actions": [item.model_dump(mode="json") for item in suggested_actions],
        "risk_flags": risk_flags,
        "draft_payload": draft_payload.model_dump(mode="json") if draft_payload is not None else None,
        "selected_capability_codes": payload.selected_capability_codes,
        "requires_confirmation": payload.requires_confirmation,
        "fallback_answer": fallback_answer,
    }
    if payload.ontology.scenario == "knowledge":
        # Raw evidence text in fenced blocks, referenced by the prompt below.
        facts["knowledge_evidence_blocks"] = self._build_knowledge_evidence_blocks(
            payload.tool_payload,
            question=knowledge_question,
        )
        # Shorter, scored evidence; only display fields are forwarded.
        facts["knowledge_answer_evidence"] = [
            {
                "title": str(item.get("title") or "").strip(),
                "heading": str(item.get("heading") or "").strip(),
                "kind": str(item.get("kind") or "").strip(),
                "content": str(item.get("content") or "").strip(),
            }
            for item in self._build_knowledge_answer_evidence(payload)
        ]
    if payload.ontology.scenario == "knowledge":
        # Knowledge answers must be grounded strictly in the supplied evidence.
        answer_style_instruction = (
            "你是财务制度知识问答助手。只能依据 facts.tool_payload.hits、facts.knowledge_answer_evidence、citations 与 conversation_history 回答,"
            "不要扩展成通用助手。优先直接回答,不要复述思考过程,不要输出 JSON、代码块或 <think>。"
            "回答风格要像一位真正熟悉制度的财务伙伴:先直接回应用户的核心问题,再用一张简洁表格或短段落说明依据,"
            "最后补充最重要的注意事项。不要写成“已检索到内容”的系统回执,也不要把命中片段连缀成答案。"
            "必须优先回答用户当前这句话本身,不能把制度标题、制度全文或完整标准表当成主答案。"
            "如果用户问的是某次具体行程“一共能报多少”,就先给“当前已能确认的金额”,再用一张很短的表说明项目、"
            "适用标准、计算式和结果;如果总额还缺少住宿晚数、实际票据或其他必要条件,就明确写出“暂不能确认的部分”。"
            "只有用户明确在问“标准有哪些”或“制度全文怎么规定”时,才展开完整标准表。"
            "如果命中的知识已经足够支持计算、比较或归纳,就直接给出结论;金额、标准、天数、补贴等问题要把计算过程写清楚。"
            "适合时请使用 Markdown 二级标题、短段落和表格,让回答更清晰;表格必须保证每一行列数一致,不要出现空白残列。"
            "只能陈述 hits 中明确出现的事实,不能用常识、外部知识或主观推断补齐缺失条件。"
            "回答前先在全部 hits 中寻找与问题最直接相关的章节、表格或条目,不能只依赖排在最前面的片段。"
            "如果 facts.knowledge_answer_evidence 中已经给出更短的高相关证据,优先基于这些证据组织答案,再回看原始 hits 补上下文。"
            "如果某个表格在检索片段中已经被摊平成连续文本,只有在行、列和数值对应关系能够从片段本身明确确认时才能据此计算;"
            "如果列对应关系不清楚,必须说明表格结构在当前片段中不够清晰,不能把第一列或相邻数字想当然套给用户。"
            "如果 hits 中出现“结构化表格补充”,它表示知识归纳阶段已经把原文表格重新整理过,"
            "优先使用这类结构化表格来理解行列关系,再回看原文确认上下文。"
            "facts.knowledge_evidence_blocks 中保留了原始换行和定宽排版;遇到表格时,优先按这些证据块阅读,"
            "必须按表头从左到右逐列对应数值,不能把第一列的数值直接套给后面的列名。"
            "如果完成计算或归纳仍缺少某个关键映射关系、适用条件或数值依据,必须明确说明当前知识库还缺哪一项信息,再给出已能确认的部分。"
            "如果用户问题里没有明确给出某个套用条件,而 hits 或 evidence 里也没有明确出现,就不能自己补一个默认值。"
            "当问题涉及追问时,必须结合 conversation_history 延续上一轮上下文,而不是重新泛化成制度全文摘录。"
            "不要大段粘贴原始命中文本;只提炼与问题直接相关的规则、条件、金额和注意事项。"
            "如果依据仍然不足,明确指出缺少哪一项信息,再给出当前能确认的部分。"
        )
    else:
        answer_style_instruction = "用 2 到 4 段完成回答,优先给结论,再补充最关键的依据与下一步建议。"
    # Greeting by name is limited to questions that depend on the employee's own entitlements.
    personalization_instruction = (
        "如果 context.user_name 存在,并且当前问题与员工本人适用标准、报销额度、审批权限、职级待遇有关,"
        "开头应自然称呼一次用户,例如“曹笑竹,您好”。"
        "如果需要根据员工身份判断标准,优先参考 context.user_grade 与 context.user_position。"
        "如果问题与用户身份无关,就不要生硬加入姓名、职级或岗位。"
    )
    system_prompt = (
        "你是 X-Financial 的专业财务 AI 助手。"
        "回答必须准确、自然、可执行,不要泄露中间推理。"
        "当知识问题有命中依据时,先给结论,再给结构化说明。"
        "不要把制度全文原样搬出来,不要把检索片段当作最终答案直接粘贴。"
        "如果使用表格,确保列名简洁、数值明确。"
        f"{personalization_instruction}"
        f"{answer_style_instruction}"
    )
    # ensure_ascii=False keeps Chinese text readable in the prompt.
    user_prompt = (
        "请严格依据下面的 facts 生成最终答复:\n"
        f"{json.dumps(facts, ensure_ascii=False, indent=2)}"
    )
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
@staticmethod
def _build_model_tool_payload(
tool_payload: dict[str, Any],
*,
question: str | None = None,
) -> dict[str, Any]:
normalized = dict(tool_payload or {})
hits = []
for item in UserAgentService._select_knowledge_model_hits(
tool_payload,
question=question,
):
if not isinstance(item, dict):
continue
hits.append(
{
"title": str(item.get("title") or "").strip(),
"document_name": str(item.get("document_name") or "").strip(),
"excerpt": str(item.get("excerpt") or "").strip(),
"content": str(item.get("content") or "").strip()[:1200],
"tags": list(item.get("tags") or [])[:5],
"evidence": list(item.get("evidence") or [])[:3],
"code": str(item.get("code") or "").strip(),
}
)
normalized["hits"] = hits
return normalized
@staticmethod
def _build_knowledge_evidence_blocks(
    tool_payload: dict[str, Any],
    *,
    question: str | None = None,
) -> str:
    """Render up to three selected hits as fenced plain-text evidence blocks.

    Each block has a header line with the evidence number, title and optional
    code, followed by the hit content (capped at 1200 characters) inside a
    ``text`` fence. Hits without content are skipped but keep their number.

    Returns:
        The blocks joined by blank lines; an empty string when none qualify.
    """
    top_hits = UserAgentService._select_knowledge_model_hits(
        tool_payload,
        question=question,
    )[:3]

    rendered: list[str] = []
    for position, hit in enumerate(top_hits, start=1):
        if not isinstance(hit, dict):
            continue
        body = str(hit.get("content") or "").strip()
        if not body:
            continue
        label = str(hit.get("title") or hit.get("document_name") or f"证据 {position}").strip()
        code = str(hit.get("code") or "").strip()
        header = f"[证据 {position}] {label}"
        if code:
            header += f" ({code})"
        rendered.append(f"{header}\n```text\n{body[:1200]}\n```")
    return "\n\n".join(rendered)
@staticmethod
def _select_knowledge_model_hits(
tool_payload: dict[str, Any],
*,
question: str | None = None,
) -> list[dict[str, Any]]:
raw_hits = [
item
for item in list(tool_payload.get("hits") or [])
if isinstance(item, dict)
][: max(MAX_KNOWLEDGE_MODEL_HITS + 1, 6)]
if not raw_hits:
return []
query_terms = UserAgentService._extract_knowledge_query_terms(question or "")
if not query_terms:
return raw_hits[:MAX_KNOWLEDGE_MODEL_HITS]
ranked_hits = sorted(
enumerate(raw_hits),
key=lambda value: (
UserAgentService._score_knowledge_model_hit(
value[1],
query_terms=query_terms,
rank_index=value[0],
),
-value[0],
),
reverse=True,
)
return [item for _, item in ranked_hits[:MAX_KNOWLEDGE_MODEL_HITS]]
@staticmethod
def _score_knowledge_model_hit(
item: dict[str, Any],
*,
query_terms: list[str],
rank_index: int,
) -> int:
title = str(item.get("title") or item.get("document_name") or "").lower()
excerpt = str(item.get("excerpt") or "").lower()
content = str(item.get("content") or "").lower()
haystack = "\n".join([title, excerpt, content[:1400]])
matched_terms = [term for term in query_terms if term in haystack]
score = max(1, 48 - rank_index * 4)
score += len(matched_terms) * 10
score += sum(1 for term in matched_terms if term in title) * 8
leading_marker = UserAgentService._leading_knowledge_appendix_marker(content)
if leading_marker == "# 章节导航":
score -= 22
elif leading_marker == "# 问答线索补充":
score += 6 if matched_terms else -8
elif leading_marker == "# 重点章节摘录":
score += 4 if matched_terms else -4
elif leading_marker == "# 结构化表格补充":
score += 8 if matched_terms else -3
if matched_terms and "|" in content:
score += 8
if matched_terms and any(marker in content for marker in ("", ":")):
score += 10
if matched_terms and "\n" in content:
score += 4
if matched_terms and any(marker in content for marker in ("附表", "", "")):
score += 4
if matched_terms and any(marker in content for marker in ("", "", "", "-", "")):
score += 4
if re.search(r"没有.{0,8}(信息|规定|说明|依据)", content):
score -= 12
return score
@staticmethod
def _leading_knowledge_appendix_marker(content: str) -> str:
normalized = str(content or "").lstrip()
for marker in ("# 章节导航", "# 重点章节摘录", "# 问答线索补充", "# 结构化表格补充"):
index = normalized.find(marker)
if 0 <= index <= 220:
return marker
return ""
def _build_query_answer(self, payload: UserAgentRequest) -> str:
    """Summarise a query result as text for the current scenario.

    Expense queries report count, total amount, status-group breakdown and
    hints about previewed or older claims. Receivable/payable queries report
    the hit count and outstanding amount. Other scenarios get a generic
    completion notice.
    """
    scenario = payload.ontology.scenario
    data = payload.tool_payload
    subject = self._resolve_subject(payload)

    if scenario == "accounts_receivable":
        record_count = int(data.get("record_count") or 0)
        outstanding_amount = float(data.get("outstanding_amount") or 0)
        return (
            f"{subject}共命中 {record_count} 条应收,未回款金额 {outstanding_amount:.2f} 元。"
            "建议结合账龄和客户分布继续排查逾期风险。"
        )

    if scenario == "accounts_payable":
        record_count = int(data.get("record_count") or 0)
        outstanding_amount = float(data.get("outstanding_amount") or 0)
        return (
            f"{subject}共命中 {record_count} 条应付,待付金额 {outstanding_amount:.2f} 元。"
            "如需推进动作,建议先生成付款建议草稿并发起人工确认。"
        )

    if scenario != "expense":
        return "已完成当前查询,但暂时没有更多结构化结果可展示。"

    query_payload = self._build_query_payload(payload)
    scope_label = str(data.get("scope_label") or subject).strip() or subject
    if query_payload is None:
        return f"当前没有查到{scope_label}。你可以补充时间范围、单号或状态继续筛选。"

    # Describe the time window: explicit dates first, then a day count, then a generic phrase.
    windowed = query_payload.recent_window_applied
    if windowed and query_payload.window_start_date and query_payload.window_end_date:
        window_prefix = f"{query_payload.window_start_date}{query_payload.window_end_date}"
    elif windowed and query_payload.window_days:
        window_prefix = f"{query_payload.window_days} 日内"
    else:
        window_prefix = "当前条件下"

    has_older_records = bool(
        query_payload.older_record_count > 0 and query_payload.window_days
    )

    if query_payload.record_count <= 0:
        if has_older_records:
            return (
                f"{window_prefix}没有查到{query_payload.scope_label}"
                f"另有 {query_payload.older_record_count} 笔超过 {query_payload.window_days} 日的单据,"
                "请前往个人报销中心查看。"
            )
        return f"{window_prefix}没有查到{query_payload.scope_label}。你可以补充时间范围、单号或状态继续筛选。"

    sentences = [
        f"我先为你列出{window_prefix}{query_payload.scope_label}"
        f"{query_payload.record_count} 笔,金额合计 {query_payload.total_amount:.2f} 元。"
    ]

    group_lines = [
        f"{group.label} {group.count}"
        for group in query_payload.status_groups
        if group.count > 0
    ]
    if group_lines:
        sentences.append(f"其中包括:{''.join(group_lines)}")

    if query_payload.has_more_in_window and query_payload.preview_count < query_payload.record_count:
        sentences.append(
            f"下方先展示最近 {query_payload.preview_count} 笔,你可以直接点击单据查看详情。"
        )
    elif query_payload.records:
        sentences.append("下方已列出本次命中的真实单据,可直接点击查看详情。")

    if has_older_records:
        sentences.append(
            f"另有 {query_payload.older_record_count} 笔超过 {query_payload.window_days} 日的单据,"
            "请前往个人报销中心查看。"
        )
    return " ".join(sentences).strip()
def _build_query_payload(
self,
payload: UserAgentRequest,
) -> UserAgentQueryPayload | None:
if payload.ontology.scenario != "expense" or payload.ontology.intent not in {"query", "compare"}:
return None
result_type = str(payload.tool_payload.get("result_type") or "").strip()
if result_type and result_type != "expense_claim_list":
return None
records: list[UserAgentExpenseQueryRecord] = []
for item in payload.tool_payload.get("records") or []:
if not isinstance(item, dict):
continue
amount = float(item.get("amount") or 0)
records.append(
UserAgentExpenseQueryRecord(
claim_id=str(item.get("claim_id") or "").strip(),
claim_no=str(item.get("claim_no") or "").strip() or "未编号",
employee_name=str(item.get("employee_name") or "").strip(),
expense_type=str(item.get("expense_type") or "").strip(),
expense_type_label=str(item.get("expense_type_label") or "").strip()
or EXPENSE_TYPE_LABELS.get(str(item.get("expense_type") or "").strip(), "报销"),
amount=round(amount, 2),
status=str(item.get("status") or "").strip(),
status_label=str(item.get("status_label") or "").strip()
or EXPENSE_STATUS_LABELS.get(str(item.get("status") or "").strip(), "处理中"),
status_group=str(item.get("status_group") or "").strip() or "other",
status_group_label=str(item.get("status_group_label") or "").strip()
or EXPENSE_STATUS_GROUP_LABELS.get(str(item.get("status_group") or "").strip(), "其他状态"),
approval_stage=str(item.get("approval_stage") or "").strip() or None,
document_date=str(item.get("document_date") or "").strip(),
occurred_at=str(item.get("occurred_at") or "").strip(),
reason=str(item.get("reason") or "").strip(),
location=str(item.get("location") or "").strip(),
)
)
status_groups: list[UserAgentQueryStatusGroup] = []
for item in payload.tool_payload.get("status_groups") or []:
if not isinstance(item, dict):
continue
status_groups.append(
UserAgentQueryStatusGroup(
key=str(item.get("key") or "").strip() or "other",
label=str(item.get("label") or "").strip() or "其他状态",
count=max(0, int(item.get("count") or 0)),
)
)
return UserAgentQueryPayload(
result_type="expense_claim_list",
scope_label=str(payload.tool_payload.get("scope_label") or self._resolve_subject(payload)).strip() or "报销单",
recent_window_applied=bool(payload.tool_payload.get("recent_window_applied")),
window_days=(
int(payload.tool_payload["window_days"])
if payload.tool_payload.get("window_days") not in {None, ""}
else None
),
window_start_date=(
str(payload.tool_payload.get("window_start_date") or "").strip() or None
),
window_end_date=(
str(payload.tool_payload.get("window_end_date") or "").strip() or None
),
record_count=max(0, int(payload.tool_payload.get("record_count") or 0)),
preview_count=max(0, int(payload.tool_payload.get("preview_count") or len(records))),
older_record_count=max(0, int(payload.tool_payload.get("older_record_count") or 0)),
has_more_in_window=bool(payload.tool_payload.get("has_more_in_window") or payload.tool_payload.get("has_more")),
total_amount=round(float(payload.tool_payload.get("total_amount") or 0), 2),
status_groups=status_groups,
records=records,
)
def _build_fast_knowledge_answer(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
) -> str | None:
if payload.ontology.scenario != "knowledge":
return None
if str(payload.tool_payload.get("result_type") or "").strip() != "knowledge_search":
return None
evidence_items = self._build_knowledge_answer_evidence(payload)
if not evidence_items:
return None
question = self._resolve_knowledge_question(payload)
if not self._should_use_direct_knowledge_answer(question, evidence_items):
return None
return self._render_knowledge_direct_answer(
payload,
citations=citations,
evidence_items=evidence_items,
)
def _render_knowledge_direct_answer(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
evidence_items: list[dict[str, Any]],
) -> str | None:
if not evidence_items:
return None
title = str(
(citations[0].title if citations else "")
or evidence_items[0].get("title")
or "相关制度"
).strip()
user_name = str(payload.context_json.get("name") or "").strip()
question = self._resolve_knowledge_question(payload)
query_terms = self._extract_knowledge_query_terms(question)
ordered_evidence_items = self._prioritize_knowledge_evidence_items(question, evidence_items)
primary_item = ordered_evidence_items[0]
primary_heading = self._format_knowledge_heading_label(
str(primary_item.get("heading") or "").strip()
)
primary_lines = self._collect_direct_knowledge_answer_lines(ordered_evidence_items)
lines: list[str] = []
if user_name:
lines.append(f"{user_name},您好。")
source_prefix = f"根据《{title}"
if primary_heading:
source_prefix = f"{source_prefix}{primary_heading}"
if str(primary_item.get("kind") or "") == "table":
lines.append(f"{source_prefix},当前能直接确认的是:")
lines.append(self._extract_relevant_table_preview(str(primary_item.get("content") or ""), query_terms))
else:
if not primary_lines:
lines.append(
f"{source_prefix},当前能直接确认的是:"
f"{self._summarize_knowledge_evidence_content(primary_item, query_terms)}"
)
elif len(primary_lines) == 1:
lines.append(f"{source_prefix},当前能直接确认的是:{primary_lines[0].strip()}")
else:
lines.append(f"{source_prefix},当前能直接确认的是:")
lines.extend(primary_lines)
notes: list[str] = []
location_note = self._build_missing_location_grounding_note(question, evidence_items)
if location_note:
notes.append(location_note)
if self._question_requires_explicit_condition(question) and not self._answer_evidence_has_numeric_or_condition(evidence_items):
notes.append("当前命中的证据更偏规则说明或流程约束,还没有直接给出可立即套用的数值或完整条件。")
if notes:
lines.append("")
lines.append("说明:")
lines.extend(f"- {note}" for note in notes)
return "\n".join(line for line in lines if line is not None).strip()
def _prioritize_knowledge_evidence_items(
self,
question: str,
evidence_items: list[dict[str, Any]],
) -> list[dict[str, Any]]:
if not evidence_items or not self._question_requires_explicit_condition(question):
return evidence_items
for preferred_kind in ("table", "kv", "clause", "list"):
for index, item in enumerate(evidence_items):
if str(item.get("kind") or "") != preferred_kind:
continue
return [item, *evidence_items[:index], *evidence_items[index + 1 :]]
for index, item in enumerate(evidence_items):
if re.search(r"\d", str(item.get("content") or "")):
return [item, *evidence_items[:index], *evidence_items[index + 1 :]]
return evidence_items
@staticmethod
def _resolve_knowledge_question(payload: UserAgentRequest) -> str:
return str(payload.context_json.get("user_input_text") or payload.message or "").strip()
@staticmethod
def _looks_like_structured_knowledge_query(question: str) -> bool:
normalized = str(question or "").strip()
if not normalized:
return False
return any(keyword in normalized for keyword in KNOWLEDGE_DIRECT_ANSWER_HINTS)
def _should_use_direct_knowledge_answer(
self,
question: str,
evidence_items: list[dict[str, Any]],
) -> bool:
if not evidence_items:
return False
if self._looks_like_structured_knowledge_query(question):
return True
return str(evidence_items[0].get("kind") or "") in {"table", "kv", "list", "clause"}
def _build_knowledge_answer_evidence(
    self,
    payload: UserAgentRequest,
) -> list[dict[str, Any]]:
    """Collect the best-scoring, de-duplicated evidence segments for a question.

    Candidates from every selected hit are ranked by score (shorter content
    first on ties). After the first pick, candidates scoring below
    ``max(6, top_score - 14)`` are ignored. Duplicates are detected by title,
    heading and the first 180 characters of the cleaned content.

    Returns:
        At most ``MAX_KNOWLEDGE_DIRECT_EVIDENCE`` candidate dicts, best first.
    """
    question = self._resolve_knowledge_question(payload)
    query_terms = self._extract_knowledge_query_terms(question)

    candidates: list[dict[str, Any]] = []
    for hit in self._select_knowledge_model_hits(payload.tool_payload, question=question):
        if isinstance(hit, dict):
            candidates.extend(self._extract_knowledge_evidence_candidates(hit, query_terms))

    def ranking_key(candidate: dict[str, Any]) -> tuple[float, int]:
        return (
            float(candidate.get("score") or 0),
            -len(str(candidate.get("content") or "")),
        )

    ranked_candidates = sorted(candidates, key=ranking_key, reverse=True)
    top_score = float(ranked_candidates[0].get("score") or 0) if ranked_candidates else 0.0
    score_floor = max(6.0, top_score - 14)

    selected: list[dict[str, Any]] = []
    fingerprints: set[tuple[str, str, str]] = set()
    for candidate in ranked_candidates:
        # The floor only applies once at least one item has been accepted.
        if selected and float(candidate.get("score") or 0) < score_floor:
            continue
        fingerprint = (
            str(candidate.get("title") or "").strip(),
            str(candidate.get("heading") or "").strip(),
            self._clean_knowledge_segment_text(str(candidate.get("content") or ""))[:180],
        )
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        selected.append(candidate)
        if len(selected) >= MAX_KNOWLEDGE_DIRECT_EVIDENCE:
            break
    return selected
def _extract_knowledge_evidence_candidates(
    self,
    hit: dict[str, Any],
    query_terms: list[str],
) -> list[dict[str, Any]]:
    """Turn one knowledge hit into scored evidence candidates.

    The hit content is split into segments, lead-in segments are merged with
    their followers, and each segment is scored against ``query_terms``.
    When query terms exist, non-positive scores are dropped. If nothing
    survives, a single paragraph candidate built from the hit excerpt is
    returned instead.
    """
    title = str(hit.get("title") or hit.get("document_name") or "相关制度").strip()
    content = str(hit.get("content") or "").strip()
    if not content:
        return []

    segments = self._merge_knowledge_lead_in_segments(
        self._split_knowledge_hit_into_segments(content)
    )
    scored: list[dict[str, Any]] = []
    for segment in segments:
        segment_score = self._score_knowledge_evidence_candidate(segment, query_terms)
        if query_terms and segment_score <= 0:
            continue
        scored.append({**segment, "title": title, "score": segment_score})
    if scored:
        return scored

    excerpt = str(hit.get("excerpt") or "").strip() or self._extract_excerpt(content)
    if excerpt:
        return [
            {
                "title": title,
                "heading": "",
                "kind": "paragraph",
                "content": excerpt,
                "score": 1,
            }
        ]
    return []
@staticmethod
def _is_knowledge_lead_in_segment(item: dict[str, str]) -> bool:
kind = str(item.get("kind") or "").strip()
content = str(item.get("content") or "").strip()
return kind in {"kv", "list", "clause"} and content.endswith(("", ":"))
@staticmethod
def _extract_knowledge_marker_family(content: str) -> str:
normalized = str(content or "").strip()
if not normalized:
return ""
if KNOWLEDGE_ARTICLE_PATTERN.match(normalized):
return "article"
if re.match(r"^\d+[.)、]\s*", normalized):
return "arabic"
if re.match(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", normalized):
return "paren"
if re.match(r"^[①②③④⑤⑥⑦⑧⑨⑩]\s*", normalized):
return "circled"
if KNOWLEDGE_LIST_ITEM_PATTERN.match(normalized):
return "bullet"
return ""
@staticmethod
def _format_knowledge_heading_label(heading: str) -> str:
parts = [item.strip() for item in str(heading or "").split(">") if item.strip()]
return " / ".join(parts)
def _merge_knowledge_lead_in_segments(
    self,
    segments: list[dict[str, str]],
) -> list[dict[str, str]]:
    """Fold each lead-in segment together with the segments it introduces.

    Walks ``segments`` in order. A segment recognised by
    ``_is_knowledge_lead_in_segment`` absorbs the following segments that
    share its heading until one of the stop conditions below is hit. The
    merged segment joins the contents with newlines and becomes kind
    ``"list"`` when any absorbed segment is a list. Input dicts are copied,
    not mutated.
    """
    if not segments:
        return []
    merged: list[dict[str, str]] = []
    index = 0
    while index < len(segments):
        current = dict(segments[index])
        if not self._is_knowledge_lead_in_segment(current):
            merged.append(current)
            index += 1
            continue
        base_heading = str(current.get("heading") or "").strip()
        current_marker = self._extract_knowledge_marker_family(str(current.get("content") or ""))
        follow_segments: list[dict[str, str]] = []
        next_index = index + 1
        while next_index < len(segments):
            candidate = segments[next_index]
            # Stop as soon as the heading changes.
            if str(candidate.get("heading") or "").strip() != base_heading:
                break
            candidate_kind = str(candidate.get("kind") or "").strip()
            candidate_content = str(candidate.get("content") or "").strip()
            candidate_marker = self._extract_knowledge_marker_family(candidate_content)
            # Empty segments and tables are never absorbed.
            if not candidate_content or candidate_kind == "table":
                break
            # A follower with the same marker family as the lead-in is a sibling, not a child.
            if current_marker and candidate_marker == current_marker:
                break
            # A second lead-in ends the group once something has been collected.
            if self._is_knowledge_lead_in_segment(candidate) and follow_segments:
                break
            if candidate_kind not in {"list", "paragraph", "kv", "clause"}:
                break
            follow_segments.append(candidate)
            next_index += 1
            # At most four followers are merged into one lead-in.
            if len(follow_segments) >= 4:
                break
            # A long paragraph is absorbed but ends the group.
            if candidate_kind == "paragraph" and len(candidate_content) >= 200:
                break
        if follow_segments:
            current["content"] = "\n".join(
                [str(current.get("content") or "").strip()]
                + [str(item.get("content") or "").strip() for item in follow_segments]
            )
            if any(str(item.get("kind") or "").strip() == "list" for item in follow_segments):
                current["kind"] = "list"
            merged.append(current)
            # Resume after the last absorbed follower.
            index = next_index
            continue
        merged.append(current)
        index += 1
    return merged
def _split_knowledge_hit_into_segments(self, content: str) -> list[dict[str, str]]:
    """Split raw knowledge text into typed segments.

    Each segment is a dict with ``heading`` (markdown heading path plus the
    current section heading, joined by ``" > "``), ``kind`` (``paragraph``,
    ``table``, ``list``, ``clause`` or ``kv``) and ``content``. Consecutive
    plain lines form one paragraph; consecutive lines with at least two
    ``|`` characters form one table. Blank lines and headings end both.
    """
    segments: list[dict[str, str]] = []
    markdown_headings: list[str] = []
    section_heading = ""
    paragraph_lines: list[str] = []
    table_lines: list[str] = []

    def current_heading() -> str:
        heading_parts = [item for item in markdown_headings if item]
        if section_heading:
            heading_parts.append(section_heading)
        return " > ".join(heading_parts)

    def emit(kind: str, text: str) -> None:
        segments.append({"heading": current_heading(), "kind": kind, "content": text})

    def flush_paragraph() -> None:
        nonlocal paragraph_lines
        if not paragraph_lines:
            return
        merged = " ".join(line.strip() for line in paragraph_lines if line.strip()).strip()
        paragraph_lines = []
        if merged:
            emit("paragraph", merged)

    def flush_table() -> None:
        nonlocal table_lines
        if not table_lines:
            return
        merged = "\n".join(line.rstrip() for line in table_lines if line.strip()).strip()
        table_lines = []
        if merged:
            emit("table", merged)

    normalized_text = str(content or "").replace("\r\n", "\n").replace("\r", "\n")
    for raw_line in normalized_text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            flush_paragraph()
            flush_table()
            continue

        markdown_heading_match = re.match(r"^(#{1,6})\s+(.+)$", stripped)
        if markdown_heading_match:
            flush_paragraph()
            flush_table()
            level = len(markdown_heading_match.group(1))
            # Drop deeper (and same-level) headings before pushing the new one.
            markdown_headings = markdown_headings[: max(0, level - 1)]
            markdown_headings.append(markdown_heading_match.group(2).strip())
            section_heading = ""
            continue

        if KNOWLEDGE_SECTION_HEADING_PATTERN.match(stripped) and len(stripped) <= 90:
            flush_paragraph()
            flush_table()
            section_heading = stripped.lstrip("#").strip()
            continue

        if stripped.count("|") >= 2:
            flush_paragraph()
            table_lines.append(stripped)
            continue

        # Any non-table line terminates a pending table.
        flush_table()

        if KNOWLEDGE_LIST_ITEM_PATTERN.match(stripped) or KNOWLEDGE_NUMBERED_ITEM_PATTERN.match(
            stripped
        ):
            flush_paragraph()
            emit("list", stripped)
            continue

        if KNOWLEDGE_ARTICLE_PATTERN.match(stripped):
            flush_paragraph()
            emit("clause", stripped)
            continue

        # Key/value lines must actually contain a colon; testing for an empty
        # substring would classify every short line as ``kv``.
        if (":" in stripped or ":" in stripped) and len(stripped) <= 180:
            flush_paragraph()
            emit("kv", stripped)
            continue

        paragraph_lines.append(stripped)

    flush_paragraph()
    flush_table()
    return segments
def _score_knowledge_evidence_candidate(
self,
item: dict[str, str],
query_terms: list[str],
) -> int:
heading = str(item.get("heading") or "").lower()
content = str(item.get("content") or "").lower()
kind = str(item.get("kind") or "").strip()
haystack = "\n".join([heading, content])
matched_terms = [term for term in query_terms if term in haystack]
score = len(matched_terms) * 10
score += sum(1 for term in matched_terms if term in heading) * 6
if kind == "table":
score += 10
elif kind in {"kv", "clause", "list"}:
score += 8
elif kind == "paragraph":
score += 4
if "问答线索补充" in heading or "重点章节摘录" in heading:
score += 8
if "结构化表格补充" in heading:
score += 10
if "章节导航" in heading or "目录" in heading:
score -= 16
if re.search(r"[.。…]{6,}", content):
score -= 12
if any(hint in content for hint in ("", "", "不得", "可以", "标准", "条件", "材料", "审批", "流程", "包括")):
score += 3
content_length = len(content)
if content_length > 220:
score -= min(8, (content_length - 220) // 40)
return score
@staticmethod
def _extract_knowledge_query_terms(question: str) -> list[str]:
normalized_question = str(question or "").strip().lower()
if not normalized_question:
return []
terms: list[str] = []
seen: set[str] = set()
def remember(term: str) -> None:
normalized = str(term or "").strip().lower()
if (
not normalized
or normalized in seen
or normalized in KNOWLEDGE_QUERY_STOPWORDS
):
return
seen.add(normalized)
terms.append(normalized)
for item in re.findall(r"[a-z0-9][a-z0-9_\-]{1,}", normalized_question):
remember(item)
for block in re.findall(r"[\u4e00-\u9fff]{2,20}", normalized_question):
if len(block) <= 4:
remember(block)
continue
for size in (4, 3, 2):
for start in range(0, len(block) - size + 1):
remember(block[start : start + size])
if len(terms) >= MAX_KNOWLEDGE_QUERY_TERMS:
return terms
return terms[:MAX_KNOWLEDGE_QUERY_TERMS]
@staticmethod
def _clean_knowledge_segment_text(content: str) -> str:
normalized = str(content or "").strip()
normalized = re.sub(r"^[-*•]\s*", "", normalized)
normalized = re.sub(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*", "", normalized)
normalized = re.sub(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", "", normalized)
normalized = re.sub(r"\s+", " ", normalized)
if len(normalized) <= 180:
return normalized
return f"{normalized[:177].rstrip()}..."
@staticmethod
def _normalize_knowledge_line(content: str, *, preserve_marker: bool) -> str:
normalized = str(content or "").strip()
normalized = re.sub(r"^[-*•]\s*", "", normalized)
if not preserve_marker:
normalized = re.sub(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*", "", normalized)
normalized = re.sub(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", "", normalized)
normalized = re.sub(r"\s+", " ", normalized)
return normalized
def _split_clean_knowledge_lines(
self,
content: str,
*,
preserve_marker: bool,
) -> list[str]:
return [
line
for line in (
self._normalize_knowledge_line(item, preserve_marker=preserve_marker)
for item in str(content or "").splitlines()
)
if line
]
def _render_knowledge_evidence_text(self, item: dict[str, Any]) -> str:
    """Render an evidence item's content as display text.

    A single line is returned in cleaned form; several lines are returned
    one per row, each with a one-space prefix; no lines gives ``""``.
    """
    cleaned = self._split_clean_knowledge_lines(
        str(item.get("content") or ""),
        preserve_marker=True,
    )
    if len(cleaned) > 1:
        return "\n".join([f" {entry}" for entry in cleaned])
    if cleaned:
        return self._clean_knowledge_segment_text(cleaned[0])
    return ""
def _collect_direct_knowledge_answer_lines(
self,
ordered_evidence_items: list[dict[str, Any]],
) -> list[str]:
if not ordered_evidence_items:
return []
primary_item = ordered_evidence_items[0]
primary_title = str(primary_item.get("title") or "").strip()
primary_heading = str(primary_item.get("heading") or "").strip()
primary_kind = str(primary_item.get("kind") or "").strip()
related_items = [primary_item]
if primary_kind != "table":
for item in ordered_evidence_items[1:]:
if len(related_items) >= 3:
break
if str(item.get("kind") or "").strip() != primary_kind:
continue
if str(item.get("title") or "").strip() != primary_title:
continue
if str(item.get("heading") or "").strip() != primary_heading:
continue
related_items.append(item)
lines: list[str] = []
seen: set[str] = set()
for item in related_items:
rendered = self._render_knowledge_evidence_text(item)
for line in rendered.splitlines():
normalized = str(line or "").strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
lines.append(line)
return lines
def _summarize_knowledge_evidence_content(
    self,
    item: dict[str, Any],
    query_terms: list[str],
) -> str:
    """Produce a one-line summary of an evidence item.

    Tables get a fixed sentence, whose wording depends on whether the
    relevant preview has at least three rows. Other kinds are summarised
    from their first line plus up to three following lines.
    """
    kind = str(item.get("kind") or "").strip()
    content = str(item.get("content") or "").strip()

    if kind != "table":
        lines = self._split_clean_knowledge_lines(content, preserve_marker=True)
        if len(lines) < 2:
            return self._clean_knowledge_segment_text(content)
        return self._clean_knowledge_segment_text(f"{lines[0]} {' '.join(lines[1:4])}")

    preview = self._extract_relevant_table_preview(content, query_terms)
    non_empty_rows = [row for row in preview.splitlines() if row.strip()][:4]
    if len(non_empty_rows) >= 3:
        return "当前命中的直接依据是一张与问题强相关的标准表,已摘出最相关的表头和行。"
    return "当前命中的直接依据是一张与问题强相关的标准表。"
@staticmethod
def _extract_relevant_table_preview(content: str, query_terms: list[str]) -> str:
lines = [line.strip() for line in str(content or "").splitlines() if line.strip()]
if len(lines) <= 3:
return "\n".join(lines)
header = lines[0]
divider = lines[1] if len(lines) > 1 else ""
body = lines[2:] if divider.count("|") >= 2 else lines[1:]
matched_rows = [
row
for row in body
if any(term in row.lower() for term in query_terms)
]
selected_rows = matched_rows[:3] or body[:2]
preview_lines = [header]
if divider:
preview_lines.append(divider)
preview_lines.extend(selected_rows)
return "\n".join(preview_lines).strip()
@staticmethod
def _question_requires_explicit_condition(question: str) -> bool:
normalized = str(question or "").strip()
return any(keyword in normalized for keyword in ("多少", "金额", "上限", "限额", "标准", "条件", "需要"))
def _build_missing_location_grounding_note(
    self,
    question: str,
    evidence_items: list[dict[str, Any]],
) -> str:
    """Return a caveat when the queried location is absent from the evidence.

    Yields ``""`` when the question names no location, or when the location
    appears in any evidence heading or content.
    """
    location = self._extract_query_location(question)
    if not location:
        return ""

    evidence_text = "\n".join(
        f"{str(entry.get('heading') or '')}\n{str(entry.get('content') or '')}"
        for entry in evidence_items
    )
    if location not in evidence_text:
        return (
            f"当前命中的制度依据没有直接写出“{location}”对应的地区档位或映射关系,"
            "因此不能直接把它套用到表格中的某一列。"
        )
    return ""
@staticmethod
def _answer_evidence_has_numeric_or_condition(evidence_items: list[dict[str, Any]]) -> bool:
for item in evidence_items:
content = str(item.get("content") or "")
if re.search(r"\d", content):
return True
if any(
keyword in content
for keyword in ("", "", "不得", "可以", "条件", "材料", "审批", "流程", "标准", "适用")
):
return True
return False
def _build_explain_answer(
    self,
    payload: UserAgentRequest,
    citations: list[UserAgentCitation],
) -> str:
    """Compose the answer text for an explain-style request.

    Knowledge-search results with citations go to the knowledge answer
    builder; without citations the tool's own message is used when present.
    Otherwise the first two citation titles and the first excerpt are
    summarised, or a no-match notice is returned.
    """
    result_type = str(payload.tool_payload.get("result_type") or "").strip()
    # NOTE(review): indentation was lost in the reviewed copy; the tool-message
    # fallback is assumed to sit inside the knowledge_search branch — confirm.
    if result_type == "knowledge_search":
        if citations:
            return self._build_knowledge_search_answer(payload, citations)
        fallback_message = str(payload.tool_payload.get("message") or "").strip()
        if fallback_message:
            return fallback_message

    if not citations:
        return (
            f"当前还没有与“{SCENARIO_LABELS.get(payload.ontology.scenario, '当前问题')}"
            "强匹配的已上线规则引用,建议先人工复核或补充更具体的单据上下文。"
        )

    titles = "".join(citation.title for citation in citations[:2])
    summary = citations[0].excerpt or "请结合制度全文进一步确认。"
    return f"已检索到相关依据:{titles}。核心说明:{summary}"
def _build_knowledge_search_answer(
    self,
    payload: UserAgentRequest,
    citations: list[UserAgentCitation],
) -> str:
    """Build the fallback answer for a knowledge search from raw evidence.

    Lists up to three evidence items (tables as a relevant preview, other
    kinds as rendered text). When no evidence line can be produced, up to
    two hit excerpts are used instead. With no hits or no usable text, a
    retry notice is returned.
    """
    hits = [hit for hit in list(payload.tool_payload.get("hits") or []) if isinstance(hit, dict)]
    evidence_items = self._build_knowledge_answer_evidence(payload)

    lead_citation = citations[0] if citations else None
    title = str(
        (lead_citation.title if lead_citation else "")
        or (hits[0].get("title") if hits else "")
        or "相关制度"
    ).strip()
    user_name = str(payload.context_json.get("name") or "").strip()
    prefix = f"{user_name},您好。\n" if user_name else ""

    def unavailable_notice() -> str:
        return (
            f"{prefix}我已经从《{title}》中检索到与你这次问题相关的制度依据,"
            "但本次答案生成环节暂时没有成功返回。请稍后重试一次;如果仍然失败,"
            "建议先检查主对话模型的连通性。"
        )

    if not hits:
        return unavailable_notice()

    evidence_lines: list[str] = []
    for entry in evidence_items[:3]:
        heading = str(entry.get("heading") or "").strip()
        heading_text = f" > {heading}" if heading else ""
        if str(entry.get("kind") or "") == "table":
            preview = self._extract_relevant_table_preview(
                str(entry.get("content") or ""),
                self._extract_knowledge_query_terms(self._resolve_knowledge_question(payload)),
            )
            evidence_lines.append(f"- 《{entry.get('title') or title}{heading_text}\n{preview}")
            continue
        rendered = self._render_knowledge_evidence_text(entry)
        if not rendered:
            continue
        # Multi-line renderings start on their own row below the source label.
        separator = "\n" if "\n" in rendered else ""
        evidence_lines.append(
            f"- 《{entry.get('title') or title}{heading_text}{separator}{rendered}"
        )

    if not evidence_lines:
        for hit in hits[:2]:
            hit_title = str(hit.get("title") or hit.get("document_name") or "相关制度").strip()
            excerpt = (
                str(hit.get("excerpt") or "").strip()
                or self._extract_excerpt(str(hit.get("content") or ""))
            )
            if excerpt:
                evidence_lines.append(f"- 《{hit_title}》:{excerpt}")

    if not evidence_lines:
        return unavailable_notice()

    answer_lines = [
        f"{prefix}我已经命中与你这次问题最相关的制度依据,但答案整理阶段本轮没有及时返回。",
        "先给你当前最直接的依据:",
    ]
    answer_lines.extend(evidence_lines)
    answer_lines.append(
        "如果你希望我继续把这些依据整理成更完整的结论、步骤或对比说明,可以继续缩小问题范围后再问一次。"
    )
    return "\n".join(answer_lines).strip()
def _build_risk_answer(
    self,
    payload: UserAgentRequest,
    citations: list[UserAgentCitation],
) -> str:
    """Compose the risk-check answer from risk flags and platform rule messages.

    Platform messages count as a single signal regardless of how many there
    are. The first two citation titles are appended as reference rules.
    """
    risk_flags = self._resolve_risk_flags(payload)
    platform_messages = self._evaluate_platform_risk_messages(payload)
    if not (risk_flags or platform_messages):
        return "当前未识别到明确风险标签,建议继续查看原始明细或补充更多上下文。"

    reasons: list[str] = []
    for flag in risk_flags:
        explanation = RISK_REASON_MAP.get(flag, f"{flag} 需要人工进一步确认。")
        reasons.append(f"{flag}{explanation}")
    if platform_messages:
        reasons.extend(platform_messages)

    if citations:
        citation_text = f" 参考规则:{''.join(item.title for item in citations[:2])}"
    else:
        citation_text = ""

    signal_count = len(risk_flags)
    if platform_messages:
        signal_count += 1

    return (
        f"本次识别到 {signal_count} 类风险信号。"
        f"触发原因:{''.join(reasons)}"
        "建议先复核明细、附件和审批链,再决定是否继续处理。"
        f"{citation_text}"
    )
def _evaluate_platform_risk_messages(self, payload: UserAgentRequest) -> list[str]:
    """Run the platform risk rules for the referenced claim and return their messages.

    Returns an empty list when the payload names no claim or the claim is
    not found. Messages are de-duplicated in first-seen order.
    """
    claim_id = str(payload.tool_payload.get("claim_id") or "").strip()
    if not claim_id:
        return []

    claim_query = (
        select(ExpenseClaim)
        .where(ExpenseClaim.id == claim_id)
        .options(selectinload(ExpenseClaim.items))
    )
    claim = self.db.scalar(claim_query)
    if claim is None:
        return []

    rule_codes = resolve_rule_codes_for_risk_check(
        payload.ontology,
        query_text=payload.message,
    )
    review = ExpenseClaimService(self.db).evaluate_platform_risk_rules(
        claim,
        rule_codes=rule_codes,
    )

    # A dict keeps insertion order, giving ordered de-duplication.
    unique_messages: dict[str, None] = {}
    for flag in review.get("flags") or []:
        if isinstance(flag, dict):
            text = str(flag.get("message") or "").strip()
            if text:
                unique_messages.setdefault(text, None)
    return list(unique_messages)
def _build_draft_payload(self, payload: UserAgentRequest) -> UserAgentDraftPayload:
    """Build the draft payload describing the claim or opinion draft.

    A claim whose status is ``submitted`` gets a progress-style body and
    needs no confirmation; everything else is presented as a draft that
    still requires manual confirmation.
    """

    def tool_text(key: str) -> str | None:
        return str(payload.tool_payload.get(key) or "").strip() or None

    scenario_label = SCENARIO_LABELS.get(payload.ontology.scenario, "业务")
    subject = self._resolve_subject(payload)
    claim_no = tool_text("claim_no")
    claim_status = tool_text("status")
    approval_stage = tool_text("approval_stage")
    is_submitted = claim_status == "submitted"

    if claim_no:
        title = f"{scenario_label}{'报销单' if is_submitted else '草稿'} {claim_no}"
    else:
        title = f"{scenario_label}处理意见草稿"

    body_lines = [f"主题:{subject}"]
    if is_submitted:
        body_lines.append(f"结论:报销单已提交,当前节点为 {approval_stage or '审批中'}")
        body_lines.append("建议:后续可在个人报销列表中跟踪审批进度,必要时再补充说明或附件。")
    else:
        body_lines.append("结论:已根据当前语义解析结果生成草稿,尚未自动执行。")
        body_lines.append("建议:请先核对明细、规则命中和所需附件,再由人工确认是否提交正式流程。")
    body_lines.append(f"原始问题:{payload.message}")

    return UserAgentDraftPayload(
        draft_type=payload.ontology.scenario,
        title=title,
        body="\n".join(body_lines),
        confirmation_required=not is_submitted,
        claim_id=tool_text("claim_id"),
        claim_no=claim_no,
        status=claim_status,
        approval_stage=approval_stage,
    )
@staticmethod
def _should_build_draft_payload(payload: UserAgentRequest) -> bool:
if payload.ontology.intent == "draft":
return True
if payload.ontology.scenario != "expense":
return False
return any(
str(payload.tool_payload.get(key) or "").strip()
for key in ("claim_id", "claim_no", "status")
)
def _build_suggested_actions(
    self,
    payload: UserAgentRequest,
) -> list[UserAgentSuggestedAction]:
    """Pick the follow-up actions offered alongside the answer.

    Knowledge requests get none. Generic expense prompts get upload and
    supplement actions. Otherwise the pair is chosen by intent, with a
    rule-reading pair as the default.
    """
    if payload.ontology.scenario == "knowledge":
        return []

    # Each spec is (label, action_type, description).
    if self._is_generic_expense_prompt(payload):
        specs = (
            ("上传票据", "ask_clarification", "上传发票、行程单或付款截图,继续识别报销内容。"),
            ("补充报销信息", "ask_clarification", "补充费用类型、金额、时间和事由后继续处理。"),
        )
    else:
        intent = payload.ontology.intent
        if intent in {"query", "compare"}:
            specs = (
                ("查看明细", "open_detail", "继续查看命中记录和过滤条件。"),
                ("生成处理意见", "create_draft", "把当前查询结果整理成可确认草稿。"),
            )
        elif intent == "risk_check":
            specs = (
                ("人工复核风险", "manual_review", "优先检查明细、附件和规则命中原因。"),
                ("生成整改建议", "create_draft", "把风险说明整理成处理意见草稿。"),
            )
        elif intent == "draft":
            specs = (
                ("复制草稿", "copy_draft", "复制当前草稿后交由人工确认。"),
                ("补充上下文", "ask_clarification", "补充单据编号、客户或供应商信息以完善草稿。"),
            )
        else:
            specs = (
                ("查看规则全文", "open_rule", "继续查看引用规则或知识内容。"),
                ("补充问题上下文", "ask_clarification", "补充业务对象、时间或单据范围,提升回答准确度。"),
            )

    return [
        UserAgentSuggestedAction(
            label=label,
            action_type=action_type,
            description=description,
        )
        for label, action_type, description in specs
    ]
def _build_review_payload(
    self,
    payload: UserAgentRequest,
    *,
    citations: list[UserAgentCitation],
    draft_payload: UserAgentDraftPayload | None,
) -> UserAgentReviewPayload | None:
    """Assemble the pre-submission review payload for an expense request.

    Returns ``None`` for non-expense scenarios, and for expense requests
    that are neither ``draft`` nor ``operate`` and carry no attachments or
    OCR documents. Proceeding is disallowed while an association choice is
    pending or the tool payload marks the submission as blocked.
    """
    attachment_count = self._resolve_attachment_count(payload)
    ocr_documents = self._resolve_ocr_documents(payload)

    ontology = payload.ontology
    if ontology.scenario != "expense" or (
        ontology.intent not in {"draft", "operate"}
        and attachment_count <= 0
        and not ocr_documents
    ):
        return None

    document_cards = self._build_review_document_cards(payload, ocr_documents=ocr_documents)
    claim_groups = self._build_review_claim_groups(payload, document_cards=document_cards)
    slot_cards = self._build_review_slot_cards(
        payload,
        ocr_documents=ocr_documents,
        claim_groups=claim_groups,
    )
    missing_slot_keys = self._resolve_review_missing_slot_keys(payload, slot_cards=slot_cards)
    submission_blocked = bool(payload.tool_payload.get("submission_blocked"))
    risk_briefs = self._build_review_risk_briefs(
        payload,
        citations=citations,
        document_cards=document_cards,
        claim_groups=claim_groups,
    )

    association_choice_pending = self._is_review_association_choice_pending(payload)
    if association_choice_pending or submission_blocked:
        can_proceed = False
    else:
        can_proceed = self._can_proceed_review(
            payload,
            missing_slot_keys=missing_slot_keys,
            claim_groups=claim_groups,
        )

    confirmation_actions = self._build_review_confirmation_actions(
        payload,
        can_proceed=can_proceed,
        claim_groups=claim_groups,
        draft_payload=draft_payload,
    )
    edit_fields = self._build_review_edit_fields(
        payload,
        draft_payload=draft_payload,
        slot_cards=slot_cards,
    )
    intent_summary = self._build_review_intent_summary(
        payload,
        slot_cards=slot_cards,
        claim_groups=claim_groups,
    )
    body_message = self._build_review_body_message(
        payload,
        slot_cards=slot_cards,
        risk_briefs=risk_briefs,
        can_proceed=can_proceed,
        document_cards=document_cards,
    )
    missing_slot_labels = [SLOT_LABELS.get(key, key) for key in missing_slot_keys]

    return UserAgentReviewPayload(
        intent_summary=intent_summary,
        body_message=body_message,
        scenario=ontology.scenario,
        intent=ontology.intent,
        can_proceed=can_proceed,
        missing_slots=missing_slot_labels,
        risk_briefs=risk_briefs,
        slot_cards=slot_cards,
        document_cards=document_cards,
        claim_groups=claim_groups,
        confirmation_actions=confirmation_actions,
        edit_fields=edit_fields,
    )
def _build_review_slot_cards(
    self,
    payload: UserAgentRequest,
    *,
    ocr_documents: list[dict[str, object]],
    claim_groups: list[UserAgentReviewClaimGroup],
) -> list[UserAgentReviewSlotCard]:
    """Build one review slot card per expense field, in display order.

    Each slot is resolved by its own builder; a card is marked required when
    its key is among the keys returned by ``_resolve_required_review_keys``.
    """
    entity_map = self._collect_entity_values(payload)
    time_slot = self._build_time_slot(payload)
    location_slot = self._build_location_slot(payload)
    customer_slot = self._build_customer_slot(payload, entity_map=entity_map)
    participants_slot = self._build_participants_slot(payload, entity_map=entity_map)
    amount_slot = self._build_amount_slot(
        payload,
        entity_map=entity_map,
        ocr_documents=ocr_documents,
    )
    expense_type_slot = self._build_expense_type_slot(
        payload,
        entity_map=entity_map,
        ocr_documents=ocr_documents,
    )
    merchant_slot = self._build_merchant_slot(payload, ocr_documents=ocr_documents)
    reason_slot = self._build_reason_slot(payload, claim_groups=claim_groups)
    attachment_slot = self._build_attachment_slot(payload)

    required_keys = self._resolve_required_review_keys(
        payload,
        primary_expense_type=str(expense_type_slot["normalized_value"] or ""),
        claim_groups=claim_groups,
    )

    # The tuple order is the order in which cards are returned.
    ordered_slots = (
        ("expense_type", expense_type_slot),
        ("customer_name", customer_slot),
        ("time_range", time_slot),
        ("location", location_slot),
        ("merchant_name", merchant_slot),
        ("amount", amount_slot),
        ("reason", reason_slot),
        ("participants", participants_slot),
        ("attachments", attachment_slot),
    )
    return [
        self._make_slot_card(
            key=slot_key,
            value=slot["value"],
            raw_value=slot["raw_value"],
            normalized_value=slot["normalized_value"],
            source=slot["source"],
            confidence=slot["confidence"],
            evidence=slot["evidence"],
            required=slot_key in required_keys,
        )
        for slot_key, slot in ordered_slots
    ]
def _build_review_document_cards(
    self,
    payload: UserAgentRequest,
    *,
    ocr_documents: list[dict[str, object]],
) -> list[UserAgentReviewDocumentCard]:
    """Convert OCR documents into review document cards.

    Cards are numbered from 1 in input order. Each card carries the
    classification result, the OCR summary, the preview data, the non-blank
    warnings and the non-blank extracted fields.
    """
    cards: list[UserAgentReviewDocumentCard] = []
    for index, item in enumerate(ocr_documents, start=1):
        classified = self._classify_document(item, payload)
        fields = self._extract_document_fields(item)
        # ``or []`` also covers an explicit ``"warnings": None``, which a
        # ``.get`` default does not and which would fail on iteration.
        raw_warnings = item.get("warnings") or []
        warnings = [str(warning) for warning in raw_warnings if str(warning).strip()]
        document_fields = [
            UserAgentReviewDocumentField(
                label=label,
                value=value,
                source="ocr",
            )
            for label, value in fields.items()
            if str(value).strip()
        ]
        cards.append(
            UserAgentReviewDocumentCard(
                index=index,
                filename=str(item.get("filename") or f"document-{index}"),
                document_type=classified["document_type"],
                suggested_expense_type=classified["expense_type"],
                scene_label=GROUP_SCENE_LABELS.get(
                    classified["group_code"],
                    classified["scene_label"],
                ),
                summary=str(item.get("summary") or item.get("text") or "").strip(),
                avg_score=float(item.get("avg_score") or 0.0),
                preview_kind=str(item.get("preview_kind") or "").strip(),
                preview_data_url=str(item.get("preview_data_url") or "").strip(),
                warnings=warnings,
                fields=document_fields,
            )
        )
    return cards
def _build_review_claim_groups(
    self,
    payload: UserAgentRequest,
    *,
    document_cards: list[UserAgentReviewDocumentCard],
) -> list[UserAgentReviewClaimGroup]:
    """Group document cards into suggested claims by normalised expense group.

    Amounts are summed per group. When cards in one group disagree on the
    expense type, the group falls back to the group code and its label.
    With no cards, a single group is built from the request's entities and
    amount.
    """
    groups: dict[str, dict[str, object]] = {}
    for card in document_cards:
        group_code = self._normalize_group_code(card.suggested_expense_type)
        card_type = str(card.suggested_expense_type or "").strip()
        if group_code not in groups:
            initial_type = str(card.suggested_expense_type or group_code).strip() or group_code
            groups[group_code] = {
                "document_indexes": [],
                "amount_total": 0.0,
                "expense_type": initial_type,
                "scene_label": GROUP_SCENE_LABELS.get(
                    initial_type,
                    GROUP_SCENE_LABELS.get(group_code, "其他费用"),
                ),
                "reasons": [],
            }
        bucket = groups[group_code]
        bucket["document_indexes"].append(card.index)
        bucket["amount_total"] = float(bucket["amount_total"]) + self._extract_amount_from_card(card)
        bucket["reasons"].append(f"{card.filename} 识别为 {card.scene_label}")

        bucket_type = str(bucket["expense_type"] or "").strip()
        if bucket_type and card_type and bucket_type != card_type:
            # Mixed expense types inside one group: fall back to the group level.
            bucket["expense_type"] = group_code
            bucket["scene_label"] = GROUP_SCENE_LABELS.get(group_code, "其他费用")

    if not groups:
        expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "other")
        group_code = self._normalize_group_code(expense_type_code)
        groups[group_code] = {
            "document_indexes": [],
            "amount_total": self._resolve_amount_value(payload),
            "expense_type": expense_type_code or "other",
            "scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
            "reasons": ["当前主要依据用户文本和页面上下文进行分单建议。"],
        }

    claim_groups: list[UserAgentReviewClaimGroup] = []
    for position, (group_code, bucket) in enumerate(groups.items(), start=1):
        if bucket["reasons"]:
            # NOTE(review): reasons are joined with an empty separator in the
            # reviewed copy; a delimiter may have been lost — confirm.
            rationale = "".join(dict.fromkeys(str(item) for item in bucket["reasons"]))
        else:
            rationale = "当前仅有单一场景,无需拆单。"
        claim_groups.append(
            UserAgentReviewClaimGroup(
                group_code=group_code,
                title=f"建议报销单 {position}{bucket['scene_label']}",
                expense_type=str(bucket["expense_type"]),
                scene_label=str(bucket["scene_label"]),
                document_indexes=list(bucket["document_indexes"]),
                amount_total=round(float(bucket["amount_total"]), 2),
                rationale=rationale,
            )
        )
    return claim_groups
def _build_review_risk_briefs(
    self,
    payload: UserAgentRequest,
    *,
    citations: list[UserAgentCitation],
    document_cards: list[UserAgentReviewDocumentCard],
    claim_groups: list[UserAgentReviewClaimGroup],
) -> list[UserAgentReviewRiskBrief]:
    """Collect the risk briefs shown on the review card (at most four).

    Briefs are appended in this order: submission-blocking reasons, the
    employee's 90-day claim profile, same-amount duplicates, the first
    citation, OCR warnings, and the split-claim suggestion. Only the first
    four are returned.
    """
    briefs: list[UserAgentReviewRiskBrief] = [
        UserAgentReviewRiskBrief(
            title="AI预审未通过",
            level="high",
            content=reason,
            detail=(
                "该项属于提交审批前的阻断条件。系统会先要求补齐基础字段、附件或业务说明,"
                "否则审批人无法判断成本归属、业务真实性或票据有效性。"
            ),
            suggestion="按提示补齐对应信息;如果业务场景本身合理,请补充说明或佐证附件后再提交。",
        )
        for reason in self._resolve_submission_blocked_reasons(payload)
    ]

    employee = self._resolve_employee_profile(payload)
    if employee is not None and employee.name:
        employee_name = str(employee.name).strip()
    else:
        employee_name = (
            self._collect_entity_values(payload).get("employee_name")
            or str(payload.context_json.get("name") or "").strip()
        )

    if employee_name:
        window_start = datetime.now(UTC) - timedelta(days=90)
        if employee is None:
            identity_conditions = [ExpenseClaim.employee_name == employee_name]
        else:
            # Match by id, or by any identifier stored in the name column.
            identifiers = {
                str(employee.name or "").strip(),
                str(employee.email or "").strip(),
                str(employee.employee_no or "").strip(),
            }
            identifiers.discard("")
            identity_conditions = [
                ExpenseClaim.employee_id == employee.id,
                ExpenseClaim.employee_name.in_(list(identifiers)),
            ]
        stmt = select(ExpenseClaim).where(
            or_(*identity_conditions),
            ExpenseClaim.occurred_at >= window_start,
        )
        recent_claims = list(self.db.scalars(stmt).all())

        if recent_claims:
            risky_count = len([claim for claim in recent_claims if claim.risk_flags_json])
            draft_count = len([claim for claim in recent_claims if claim.status == "draft"])
            briefs.append(
                UserAgentReviewRiskBrief(
                    title="历史报销画像",
                    level="info",
                    content=(
                        f"{employee_name} 最近 90 天共有 {len(recent_claims)} 笔报销,"
                        f"其中 {risky_count} 笔带风险标记,{draft_count} 笔仍处于草稿态。"
                    ),
                    detail=(
                        "该画像来自员工近 90 天报销记录,用于辅助判断是否存在频繁草稿、"
                        "历史风险或异常重复报销倾向,不会单独阻断审批。"
                    ),
                    suggestion="如历史记录中存在风险标记,本次提交时建议主动补充业务背景和票据说明。",
                )
            )

        current_amount = self._resolve_amount_value(payload)
        if current_amount > 0:
            same_amount_claims = [
                claim
                for claim in recent_claims
                if abs(float(claim.amount) - current_amount) < 0.01
            ]
            duplicate_count = len(same_amount_claims)
            if duplicate_count:
                briefs.append(
                    UserAgentReviewRiskBrief(
                        title="金额重复预警",
                        level="warning",
                        content=(
                            f"近 90 天发现 {duplicate_count} 笔金额相同的报销记录,"
                            "提交前建议核对是否为重复报销或拆分不当。"
                        ),
                        detail=(
                            "系统将当前金额与近 90 天历史报销金额进行比对。金额完全一致不一定违规,"
                            "但在交通、餐饮、办公采购等场景中可能提示重复票据或拆分报销。"
                        ),
                        suggestion="核对历史单据与当前票据是否对应同一业务;如不是重复,请在事由中说明差异。",
                    )
                )

    if citations:
        lead_citation = citations[0]
        briefs.append(
            UserAgentReviewRiskBrief(
                title="制度注意事项",
                level="info",
                content=lead_citation.excerpt or f"请先核对 {lead_citation.title} 的制度要求。",
                detail=f"本条来自规则或知识库引用:{lead_citation.title}。提交前应确认当前单据符合该条口径。",
                suggestion="如当前场景与制度口径存在差异,请补充审批说明或选择更准确的报销分类。",
            )
        )

    warning_count = sum(len(card.warnings) for card in document_cards)
    if warning_count:
        briefs.append(
            UserAgentReviewRiskBrief(
                title="票据识别提醒",
                level="warning",
                content=f"当前共有 {warning_count} 条票据识别提示,建议逐张确认 OCR 识别字段。",
                detail="票据 OCR 识别存在字段缺失、置信度偏低或类型判断不稳定时,会生成该提醒。",
                suggestion="打开票据明细逐张核对日期、金额、商户和票据类型,必要时更正后再提交。",
            )
        )

    group_count = len(claim_groups)
    if group_count > 1:
        briefs.append(
            UserAgentReviewRiskBrief(
                title="建议拆单",
                level="high",
                content=f"系统检测到 {group_count} 类费用场景,建议拆成多张报销单后再提交。",
                detail="同一批附件中包含多类费用场景时,混在一张报销单里会影响规则匹配、附件核验和审批归口。",
                suggestion="按费用场景拆成多张报销单,分别确认金额、事由和附件归属。",
            )
        )

    return briefs[:4]
@staticmethod
def _resolve_submission_blocked_reasons(payload: UserAgentRequest) -> list[str]:
    """Collect the de-duplicated reasons why submission is currently blocked.

    Sources are consulted in this order:

    1. ``tool_payload["submission_blocked_reasons"]`` (a list, or a string
       delimited by ``;`` / newlines);
    2. ``tool_payload["missing_fields"]`` when the submission is blocked and
       no explicit reasons were supplied;
    3. ``tool_payload["message"]`` with a known lead-in prefix removed, when
       the submission is blocked and the sources above produced nothing usable.

    Args:
        payload: Request whose ``tool_payload`` mapping is inspected.

    Returns:
        Non-empty reason strings in first-seen order, without duplicates.
    """
    tool_payload = payload.tool_payload
    raw_reasons = tool_payload.get("submission_blocked_reasons")
    submission_blocked = bool(tool_payload.get("submission_blocked"))
    if raw_reasons is None:
        if not submission_blocked:
            return []
        raw_reasons = tool_payload.get("missing_fields")

    reasons: list[str] = []
    if isinstance(raw_reasons, list):
        # Drop blank entries right away: a list such as ["", None] must count
        # as "no reasons" so that the message fallback below still runs.
        cleaned = (str(item or "").strip() for item in raw_reasons)
        reasons.extend(text for text in cleaned if text)
    elif isinstance(raw_reasons, str):
        reasons.extend(
            part.strip()
            for part in re.split(r"[;\n]+", raw_reasons)
            if part.strip()
        )

    if not reasons and submission_blocked:
        message = str(tool_payload.get("message") or "").strip()
        # Only the first matching lead-in is removed; longer variants are
        # listed before the shorter ones they start with.
        for prefix in (
            "提交前请先补全信息:",
            "AI预审暂未通过原因如下",
            "AI预审未通过原因如下",
            "AI预审暂未通过",
            "AI预审未通过",
        ):
            if message.startswith(prefix):
                message = message[len(prefix):].strip()
                break
        if message:
            reasons.extend(
                part.strip()
                for part in re.split(r"[;\n]+", message)
                if part.strip() and not part.strip().startswith("AI预审暂未通过")
            )

    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(reason for reason in reasons if reason))
def _build_review_confirmation_actions(
    self,
    payload: UserAgentRequest,
    *,
    can_proceed: bool,
    claim_groups: list[UserAgentReviewClaimGroup],
    draft_payload: UserAgentDraftPayload | None,
) -> list[UserAgentReviewAction]:
    """Build the buttons offered under a review card.

    While an association decision is pending the user chooses between
    linking to the candidate draft and creating a new claim. Otherwise the
    primary button is "next step" when ``can_proceed`` is true and
    "save draft" when it is not. "Cancel" and "edit" are always offered first.
    """
    cancel_action = UserAgentReviewAction(
        label="取消",
        action_type="cancel_review",
        description="放弃当前识别结果,并退出本次核对流程。",
        emphasis="secondary",
    )
    edit_action = UserAgentReviewAction(
        label="修改识别信息",
        action_type="edit_review",
        description="打开结构化模板,按已识别字段逐项修改。",
        emphasis="secondary",
    )

    if self._is_review_association_choice_pending(payload):
        claim_no = str(payload.tool_payload.get("association_candidate_claim_no") or "").strip()
        if claim_no:
            link_label = f"关联到草稿 {claim_no}"
            link_description = f"把本次上传票据并入现有草稿 {claim_no}"
        else:
            link_label = "关联到现有草稿"
            link_description = "把本次上传票据并入现有草稿。"
        link_action = UserAgentReviewAction(
            label=link_label,
            action_type="link_to_existing_draft",
            description=link_description,
            emphasis="primary",
        )
        new_claim_action = UserAgentReviewAction(
            label="单独建立报销单",
            action_type="create_new_claim_from_documents",
            description="基于当前上传的多张票据,新建一张独立的报销草稿。",
            emphasis="secondary",
        )
        return [cancel_action, edit_action, link_action, new_claim_action]

    # Pick the primary description; the two overrides are mutually exclusive
    # because one requires can_proceed and the other requires its negation.
    if can_proceed:
        primary_label = "继续下一步"
        primary_type = "next_step"
        primary_description = "当前识别信息已满足继续处理条件,确认后进入下一步。"
        if len(claim_groups) > 1:
            primary_description = f"系统建议拆分为 {len(claim_groups)} 张报销单,确认后继续下一步。"
    else:
        primary_label = "保存为草稿"
        primary_type = "save_draft"
        primary_description = "暂存当前识别结果,后续可以继续补充或修改。"
        if draft_payload is not None and draft_payload.claim_no:
            primary_description = f"保存后会生成草稿 {draft_payload.claim_no},后续仍可继续补充。"

    primary_action = UserAgentReviewAction(
        label=primary_label,
        action_type=primary_type,
        description=primary_description,
        emphasis="primary",
    )
    return [cancel_action, edit_action, primary_action]
def _build_review_intent_summary(
    self,
    payload: UserAgentRequest,
    *,
    slot_cards: list[UserAgentReviewSlotCard],
    claim_groups: list[UserAgentReviewClaimGroup],
) -> str:
    """Summarise the recognised slots as one sentence.

    The headline names the expense type when that slot has a value. The
    customer, time, location, amount and reason slots are appended in that
    order when filled. ``payload`` and ``claim_groups`` are accepted but not
    read here.
    """
    cards_by_key = {card.key: card for card in slot_cards}

    def filled_value(key: str):
        # Returns the slot's value only when the card exists and is non-empty.
        card = cards_by_key.get(key)
        return card.value if card and card.value else None

    expense_type = filled_value("expense_type")
    if expense_type:
        headline = f"识别到您希望报销一笔“{expense_type}”费用。"
    else:
        headline = "我先根据您当前提供的信息整理出一笔报销。"

    labelled_slots = (
        ("customer_name", "客户为"),
        ("time_range", "时间为"),
        ("location", "地点为"),
        ("amount", "金额为"),
        ("reason", "事由是"),
    )
    details: list[str] = []
    for key, prefix in labelled_slots:
        text = filled_value(key)
        if text:
            details.append(f"{prefix} {text}")

    if not details:
        return headline
    return f"{headline} {''.join(details)}"
def _build_review_body_answer(
    self,
    payload: UserAgentRequest,
    *,
    review_payload: UserAgentReviewPayload | None,
    draft_payload: UserAgentDraftPayload | None,
) -> str | None:
    """Produce the fixed answer text for an expense review turn.

    Returns ``None`` when there is no review payload, the scenario is not
    ``expense``, or the intent is neither ``draft`` nor ``operate``.
    Otherwise the text depends on ``context_json["review_action"]``; an
    unrecognised action falls back to ``review_payload.body_message``.
    """
    if review_payload is None:
        return None
    ontology = payload.ontology
    if ontology.scenario != "expense" or ontology.intent not in {"draft", "operate"}:
        return None

    tool_payload = payload.tool_payload
    if tool_payload.get("draft_limit_reached"):
        limit_message = str(tool_payload.get("message") or "").strip()
        return limit_message or "你当前已保存 3 个草稿,请先完成已保存的草稿,才能再次新建草稿。"

    def intent_summary() -> str:
        return self._build_review_intent_summary(
            payload,
            slot_cards=review_payload.slot_cards,
            claim_groups=review_payload.claim_groups,
        )

    def has_claim_no() -> bool:
        return draft_payload is not None and bool(draft_payload.claim_no)

    review_action = str(payload.context_json.get("review_action") or "").strip()

    if review_action == "save_draft":
        if has_claim_no():
            return (
                f"已按您当前确认的信息保存为草稿 {draft_payload.claim_no}"
                "后续您可以继续补充缺失项,或修改识别结果后再继续提交。"
            )
        return "已按您当前确认的信息保存为草稿。后续您可以继续补充缺失项,或修改识别结果后再继续提交。"

    if review_action == "link_to_existing_draft":
        # The count is resolved before the draft check, as in the original flow.
        document_count = self._resolve_review_document_count(payload)
        if has_claim_no():
            return (
                f"已将本次上传的 {document_count} 张票据关联到草稿 {draft_payload.claim_no}"
                "您可以继续补充识别字段,确认无误后再提交审批。"
            )
        return "已将本次上传的票据关联到现有草稿。您可以继续补充识别字段,确认无误后再提交审批。"

    if review_action == "create_new_claim_from_documents":
        document_count = self._resolve_review_document_count(payload)
        if has_claim_no():
            return (
                f"已按当前上传的 {document_count} 张票据新建报销草稿 {draft_payload.claim_no}"
                "您可以继续补充识别字段,确认无误后再提交审批。"
            )
        return "已按当前上传票据新建报销草稿。您可以继续补充识别字段,确认无误后再提交审批。"

    if review_action == "next_step":
        if draft_payload is not None and draft_payload.status == "submitted":
            stage_text = draft_payload.approval_stage or "审批中"
            return f"报销单 {draft_payload.claim_no or ''} 已提交,当前节点为 {stage_text}".strip()
        if tool_payload.get("submission_blocked"):
            reasons = self._resolve_submission_blocked_reasons(payload)
            if not reasons:
                fallback = str(tool_payload.get("message") or "").strip()
                return fallback or "当前报销单暂时还不能提交审批。"
            numbered = [f"{position}. {reason}" for position, reason in enumerate(reasons, start=1)]
            reason_lines = "\n".join(numbered)
            return (
                "AI预审暂未通过所以还没有提交到审批人。\n"
                f"{reason_lines}\n"
                "请先处理以上项目;处理完成后再点继续下一步。"
            )
        return (
            f"{intent_summary()} "
            "当前关键信息已基本齐全,您确认无误后可以继续下一步。"
        )

    if review_action == "edit_review":
        return (
            f"{intent_summary()} "
            f"{self._build_review_guidance_copy(review_payload, mention_save_draft=True)}"
        )

    return review_payload.body_message or None
def _build_review_body_message(
    self,
    payload: UserAgentRequest,
    *,
    slot_cards: list[UserAgentReviewSlotCard],
    risk_briefs: list[UserAgentReviewRiskBrief],
    can_proceed: bool,
    document_cards: list[UserAgentReviewDocumentCard],
) -> str:
    """Compose the body text shown above the review cards.

    While an association decision is pending, the text asks the user to link
    the uploaded documents to an existing draft or create a new claim.
    Otherwise it is the intent summary followed by the guidance copy.
    """
    if self._is_review_association_choice_pending(payload):
        claim_no = str(payload.tool_payload.get("association_candidate_claim_no") or "").strip()
        # Prefer the number of rendered cards; fall back to the resolved count.
        document_count = len(document_cards) or self._resolve_review_document_count(payload)
        recognised = f"已识别出本次上传的 {document_count} 张票据。"
        if claim_no:
            return recognised + f"系统检测到你已有草稿 {claim_no},请选择关联到该草稿,或单独建立一张新的报销单。"
        return recognised + "系统检测到你已有可用草稿,请先选择关联到现有草稿,或单独建立一张新的报销单。"

    # A throwaway payload carrying only the fields passed to the guidance helper.
    guidance_source = UserAgentReviewPayload(
        intent_summary="",
        body_message="",
        scenario=payload.ontology.scenario,
        intent=payload.ontology.intent,
        can_proceed=can_proceed,
        missing_slots=self._resolve_review_missing_slot_labels(slot_cards),
        risk_briefs=risk_briefs,
        slot_cards=slot_cards,
        document_cards=[],
        claim_groups=[],
        confirmation_actions=[],
        edit_fields=[],
    )
    summary = self._build_review_intent_summary(payload, slot_cards=slot_cards, claim_groups=[])
    guidance = self._build_review_guidance_copy(guidance_source, mention_save_draft=not can_proceed)
    return f"{summary} {guidance}"
@staticmethod
def _resolve_review_missing_slot_labels(
    slot_cards: list[UserAgentReviewSlotCard],
) -> list[str]:
    """Return the labels of all slot cards whose status is ``"missing"``."""
    missing_labels: list[str] = []
    for card in slot_cards:
        if card.status == "missing":
            missing_labels.append(card.label)
    return missing_labels
@staticmethod
def _build_review_guidance_copy(
    review_payload: UserAgentReviewPayload,
    *,
    mention_save_draft: bool,
) -> str:
    """Return the guidance sentence that follows the intent summary.

    Args:
        review_payload: Supplies ``can_proceed``, ``missing_slots`` and
            ``risk_briefs``.
        mention_save_draft: When true and the review cannot proceed, a hint
            about saving a draft is appended.
    """
    reminder_count = len(review_payload.risk_briefs)
    missing_count = len(review_payload.missing_slots)

    if review_payload.can_proceed:
        if not reminder_count:
            return "当前关键信息已基本齐全,您确认无误后可以继续下一步。"
        return (
            f"当前关键信息已基本齐全,但还有 {reminder_count} 条提醒。"
            "您可以展开下方卡片查看详情,确认无误后继续下一步。"
        )

    outstanding = [
        text
        for count, text in (
            (missing_count, f"{missing_count} 项信息待补充"),
            (reminder_count, f"{reminder_count} 条提醒"),
        )
        if count
    ]
    if outstanding:
        issue_summary = "".join(outstanding)
    else:
        issue_summary = "一些细节还需要进一步确认"

    suffix = ""
    if mention_save_draft:
        suffix = ";如果想先暂存,也可以点击下方按钮保存草稿。"
    return (
        f"当前还有 {issue_summary}"
        f"您可以展开下方卡片查看详情,继续补充或修改{suffix}"
    )
@staticmethod
def _can_proceed_review(
    payload: UserAgentRequest,
    *,
    missing_slot_keys: list[str],
    claim_groups: list[UserAgentReviewClaimGroup],
) -> bool:
    """Tell whether the review may move on to the next step.

    Proceeding requires that the ontology reports no ambiguity, that no slot
    keys are missing, and that at least one claim group exists.
    """
    blocked = payload.ontology.ambiguity or missing_slot_keys or not claim_groups
    return not blocked
def _build_review_edit_fields(
    self,
    payload: UserAgentRequest,
    *,
    draft_payload: UserAgentDraftPayload | None,
    slot_cards: list[UserAgentReviewSlotCard],
) -> list[UserAgentReviewEditField]:
    """Build the editable form fields pre-filled from the recognised slots.

    Fields are emitted in a fixed order across the ``basic``, ``business``
    and ``attachments`` groups. The time, location and amount fields prefer
    the slot's normalized value over its display value.
    """
    cards = {card.key: card for card in slot_cards}

    def display_value(key: str) -> str:
        # Display value of a slot, or "" when the slot is absent.
        card = cards.get(key)
        return card.value if card else ""

    def preferred_value(key: str) -> str:
        # Normalized value when present, otherwise the display value.
        card = cards.get(key)
        if not card:
            return ""
        return card.normalized_value if card.normalized_value else card.value

    employee = self._resolve_employee_profile(payload)
    reporter_card = cards.get("reporter_name")
    if reporter_card:
        reporter_name = reporter_card.value
    else:
        reporter_name = str(payload.context_json.get("name") or "").strip()
    manager_name = self._resolve_manager_name(employee)
    reason = display_value("reason")
    attachments = "".join(self._resolve_attachment_names(payload))

    if draft_payload is not None and draft_payload.claim_no:
        claim_no_text = str(draft_payload.claim_no)
    else:
        claim_no_text = str("待生成")

    return [
        UserAgentReviewEditField(
            key="claim_no",
            label="报销单据编号",
            value=claim_no_text,
            placeholder="保存草稿后自动生成",
            required=False,
            group="basic",
        ),
        UserAgentReviewEditField(
            key="expense_type",
            label="报销类型",
            value=display_value("expense_type"),
            placeholder="例如:业务招待费 / 差旅费",
            group="basic",
        ),
        UserAgentReviewEditField(
            key="occurred_date",
            label="业务发生时间",
            value=preferred_value("time_range"),
            placeholder="例如2026-05-11",
            group="basic",
        ),
        UserAgentReviewEditField(
            key="reporter_name",
            label="报销人",
            value=reporter_name,
            placeholder="请输入报销人姓名",
            group="basic",
        ),
        UserAgentReviewEditField(
            key="manager_name",
            label="直属上司姓名",
            value=manager_name,
            placeholder="请输入直属上司姓名",
            required=False,
            group="basic",
        ),
        UserAgentReviewEditField(
            key="customer_name",
            label="客户名称",
            value=display_value("customer_name"),
            placeholder="请输入客户名称",
            group="business",
        ),
        UserAgentReviewEditField(
            key="business_location",
            label="业务地点",
            value=preferred_value("location"),
            placeholder="例如:北京 / 客户现场",
            required=False,
            group="business",
        ),
        UserAgentReviewEditField(
            key="merchant_name",
            label="酒店/商户",
            value=display_value("merchant_name"),
            placeholder="请输入酒店或商户名称",
            required=False,
            group="business",
        ),
        UserAgentReviewEditField(
            key="amount",
            label="金额",
            value=preferred_value("amount"),
            placeholder="例如200.00元",
            group="business",
        ),
        UserAgentReviewEditField(
            key="participants",
            label="参与人员",
            value=display_value("participants"),
            placeholder="例如:客户 2 人,我方 1 人",
            group="business",
        ),
        UserAgentReviewEditField(
            key="reason",
            label="事由",
            value=reason,
            placeholder="请输入报销事由",
            field_type="textarea",
            group="business",
        ),
        UserAgentReviewEditField(
            key="attachment_names",
            label="附件清单",
            value=attachments,
            placeholder="例如:发票.jpg、行程单.png",
            required=False,
            field_type="textarea",
            group="attachments",
        ),
    ]
def _resolve_employee_profile(self, payload: UserAgentRequest) -> Employee | None:
    """Look up the employee the request refers to.

    Candidate identifiers are the context ``name``, the request ``user_id``
    and the first employee entity. One employee whose name, employee number
    or email matches any candidate is returned, with its organization unit
    and manager loaded via ``selectinload``. ``None`` is returned when no
    candidate is available or nothing matches.
    """
    raw_candidates = (
        str(payload.context_json.get("name") or "").strip(),
        str(payload.user_id or "").strip(),
        self._collect_entity_values(payload).get("employee_name", ""),
    )
    # dict.fromkeys removes duplicates while keeping the candidate order.
    identifiers = [value for value in dict.fromkeys(raw_candidates) if value]
    if not identifiers:
        return None

    identity_filter = or_(
        Employee.name.in_(identifiers),
        Employee.employee_no.in_(identifiers),
        Employee.email.in_(identifiers),
    )
    query = select(Employee).options(
        selectinload(Employee.organization_unit),
        selectinload(Employee.manager),
    )
    query = query.where(identity_filter).limit(1)
    return self.db.scalar(query)
@staticmethod
def _resolve_manager_name(employee: Employee | None) -> str:
    """Return the employee's manager name, or ``""`` when unknown.

    The direct manager's name is preferred; the organization unit's
    ``manager_name`` is the fallback.
    """
    if employee is None:
        return ""
    direct_manager = employee.manager
    if direct_manager is not None and direct_manager.name:
        return direct_manager.name
    unit = employee.organization_unit
    if unit is None or not unit.manager_name:
        return ""
    return unit.manager_name
@staticmethod
def _extract_message_reason(message: str) -> str:
    """Return the first meaningful line of ``message``, capped at 300 characters.

    Blank lines and lines starting with one of the attachment/OCR/linked-claim
    labels are skipped. ``""`` is returned when no line qualifies.
    """
    skipped_prefixes = ("附件名称:", "OCR摘要", "关联单号:")
    stripped_lines = (line.strip() for line in str(message or "").splitlines())
    for text in stripped_lines:
        if text and not text.startswith(skipped_prefixes):
            return text[:300]
    return ""
@staticmethod
def _looks_like_system_generated_reason_message(message: str) -> bool:
    """Tell whether ``message`` starts with a system-generated prefix.

    All whitespace is removed before comparing against
    ``SYSTEM_GENERATED_REASON_PREFIXES``. Empty input yields ``False``.
    """
    text = str(message or "").strip()
    if text:
        without_whitespace = re.sub(r"\s+", "", text)
        return without_whitespace.startswith(SYSTEM_GENERATED_REASON_PREFIXES)
    return False
def _resolve_reason_source_text(self, payload: UserAgentRequest) -> str:
    """Pick the text from which the expense reason should be extracted.

    A string in ``context_json["user_input_text"]`` wins, even when empty.
    Otherwise the request message is used, unless it looks system-generated,
    in which case ``""`` is returned.
    """
    supplied = payload.context_json.get("user_input_text")
    if isinstance(supplied, str):
        return supplied.strip()
    message = payload.message
    if self._looks_like_system_generated_reason_message(message):
        return ""
    return str(message or "").strip()
@classmethod
def _resolve_reason_text(cls, message: str) -> str:
    """Extract the expense reason from a free-text message.

    The first meaningful line is taken and a leading time expression is
    removed. Generic prompts yield ``""``. When the text starts with an
    instruction phrase, only the part after the first separator found is
    kept (capped at 300 characters); with no usable trailing part the
    result is ``""``.

    Args:
        message: Raw user text.

    Returns:
        The reason text, or ``""`` when none can be determined.
    """
    reason = cls._strip_leading_time_from_reason(cls._extract_message_reason(message))
    if not reason:
        return ""
    compact = re.sub(r"\s+", "", reason)
    if compact in GENERIC_EXPENSE_PROMPTS:
        return ""
    instruction_prefixes = (
        "帮我生成",
        "请帮我生成",
        "生成",
        "起草",
        "创建",
        "发起",
        "准备",
        "帮我报销",
        "我要报销",
        "我想报销",
    )
    if not compact.startswith(instruction_prefixes):
        return reason
    for separator in ("", ",", "", "", ";", "", ":"):
        # An empty string is "in" every string, and str.split() raises
        # ValueError for an empty separator, so blank entries are skipped.
        if not separator:
            continue
        if separator in reason:
            trailing = reason.split(separator, 1)[1].strip()
            if trailing:
                return trailing[:300]
    return ""
@staticmethod
def _strip_leading_time_from_reason(value: str) -> str:
    """Remove a leading time expression from a reason string.

    Patterns in ``LEADING_REASON_TIME_PATTERNS`` are tried in order; the
    result of the first one that changes the text is returned. The trimmed
    input is returned when none applies.
    """
    text = str(value or "").strip()
    # Lazy generator: patterns after the first effective one are never applied.
    candidates = (pattern.sub("", text).strip() for pattern in LEADING_REASON_TIME_PATTERNS)
    return next((candidate for candidate in candidates if candidate != text), text)
@staticmethod
def _should_skip_model_answer(
    payload: UserAgentRequest,
    review_payload: UserAgentReviewPayload | None,
) -> bool:
    """Decide whether the model-generated answer should be skipped.

    Only the ``expense`` scenario ever skips. Within it, ``query`` and
    ``compare`` intents always skip. Otherwise a review payload must exist
    and either the intent is ``draft`` or the context reports at least one
    attachment.

    Args:
        payload: Current request.
        review_payload: Review payload built for this turn, if any.

    Returns:
        ``True`` when the model answer should be skipped.
    """
    ontology = payload.ontology
    if ontology.scenario != "expense":
        return False
    if ontology.intent in {"query", "compare"}:
        return True
    if review_payload is None:
        return False
    if ontology.intent == "draft":
        return True
    # A non-numeric attachment_count counts as zero instead of raising,
    # matching how _resolve_attachment_count parses the same field.
    try:
        attachment_count = int(payload.context_json.get("attachment_count") or 0)
    except (TypeError, ValueError):
        attachment_count = 0
    return attachment_count > 0
def _build_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]:
    """Assemble the citations attached to a response.

    The ``knowledge`` scenario uses knowledge hits only (at most three).
    Other scenarios put knowledge hits before rule citations and cap the
    merged list at three; with no knowledge hits the rule citations are
    returned as they are.
    """
    knowledge = self._build_knowledge_citations(payload)
    if payload.ontology.scenario == "knowledge":
        return knowledge[:3]
    rules = self._build_rule_asset_citations(payload)
    if not knowledge:
        return rules
    merged = knowledge + rules
    return merged[:3]
@staticmethod
def _build_knowledge_citations(payload: UserAgentRequest) -> list[UserAgentCitation]:
    """Turn the first three ``tool_payload["hits"]`` into knowledge citations.

    A hit is skipped when it is not a dict or lacks a title or a code.
    The title falls back to ``document_name`` and the code to
    ``candidate_id``.
    """

    def text_of(hit: dict, key: str) -> str:
        return str(hit.get(key) or "").strip()

    citations: list[UserAgentCitation] = []
    for hit in list(payload.tool_payload.get("hits") or [])[:3]:
        if not isinstance(hit, dict):
            continue
        title = str(hit.get("title") or hit.get("document_name") or "").strip()
        code = str(hit.get("code") or hit.get("candidate_id") or "").strip()
        if not (title and code):
            continue
        citation = UserAgentCitation(
            source_type="knowledge",
            code=code,
            title=title,
            version=text_of(hit, "version") or None,
            updated_at=text_of(hit, "updated_at") or None,
            excerpt=text_of(hit, "excerpt") or text_of(hit, "content") or None,
        )
        citations.append(citation)
    return citations
def _build_rule_asset_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]:
    """Cite up to two active rule assets ranked for the current request.

    Active rule assets of the scenario's domain are ranked with
    ``_rank_rule_assets``. The top two are loaded in full and the excerpt is
    taken from their current version content. Assets whose detail cannot be
    loaded are skipped.
    """
    domain = self._resolve_domain(payload.ontology.scenario)
    candidates = self.asset_service.list_assets(
        asset_type=AgentAssetType.RULE.value,
        status=AgentAssetStatus.ACTIVE.value,
        domain=domain,
    )
    top_ranked = self._rank_rule_assets(candidates, payload)[:2]

    citations: list[UserAgentCitation] = []
    for asset in top_ranked:
        detail = self.asset_service.get_asset(asset.id)
        if detail is not None:
            excerpt = self._extract_excerpt(str(detail.current_version_content or ""))
            citations.append(
                UserAgentCitation(
                    source_type="rule",
                    code=detail.code,
                    title=detail.name,
                    version=detail.current_version,
                    updated_at=detail.updated_at.date().isoformat(),
                    excerpt=excerpt,
                )
            )
    return citations
@staticmethod
def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]:
    """Return risk flags as strings.

    A non-empty list in ``tool_payload["risk_flags"]`` is preferred;
    otherwise the ontology's flags are used.
    """
    tool_flags = payload.tool_payload.get("risk_flags")
    if not isinstance(tool_flags, list) or not tool_flags:
        return [str(flag) for flag in payload.ontology.risk_flags]
    return [str(flag) for flag in tool_flags]
@staticmethod
def _resolve_subject(payload: UserAgentRequest) -> str:
    """Describe what data the request is about.

    The values of employee/customer/vendor/project entities are joined when
    any exist; otherwise the description is built from the scenario label.
    """
    subject_types = {"employee", "customer", "vendor", "project"}
    subject_names = []
    for entity in payload.ontology.entities:
        if entity.type in subject_types:
            subject_names.append(entity.value)
    if not subject_names:
        return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据"
    return f"{''.join(subject_names)} 相关数据"
@staticmethod
def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool:
    """Tell whether an expense message is one of the generic prompts.

    The message is compared with all whitespace removed. Requests outside
    the ``expense`` scenario never qualify.
    """
    if payload.ontology.scenario == "expense":
        without_whitespace = re.sub(r"\s+", "", payload.message)
        return without_whitespace in GENERIC_EXPENSE_PROMPTS
    return False
@staticmethod
def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool:
    """Tell whether an expense draft request lacks explicit draft wording.

    The request must be an ``expense``/``draft`` one, and its message, with
    whitespace removed, must contain none of ``EXPLICIT_DRAFT_KEYWORDS``.
    """
    ontology = payload.ontology
    if not (ontology.scenario == "expense" and ontology.intent == "draft"):
        return False
    without_whitespace = re.sub(r"\s+", "", payload.message)
    return not any(keyword in without_whitespace for keyword in EXPLICIT_DRAFT_KEYWORDS)
@staticmethod
def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]:
    """Return the attachment names from the request context as strings.

    Entries whose string form is blank are dropped. A missing or non-list
    ``attachment_names`` value yields an empty list.
    """
    names = payload.context_json.get("attachment_names")
    if isinstance(names, list):
        return [text for text in map(str, names) if text.strip()]
    return []
@staticmethod
def _resolve_attachment_count(payload: UserAgentRequest) -> int:
    """Return how many attachments the request carries.

    The number of resolved attachment names wins when any exist. Otherwise
    ``context_json["attachment_count"]`` is parsed; unparseable values count
    as 0 and negative values are clamped to 0.
    """
    named = UserAgentService._resolve_attachment_names(payload)
    if named:
        return len(named)
    try:
        declared = int(payload.context_json.get("attachment_count") or 0)
    except (TypeError, ValueError):
        return 0
    return max(0, declared)
@staticmethod
def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]:
    """Return up to eight OCR documents with user form overrides applied.

    Documents come from ``context_json["ocr_documents"]``. Overrides come
    from ``context_json["review_document_form_values"]`` and are matched by
    ``(1-based position, filename)`` first, then by ``(position, "")``.
    A matching override may replace ``summary`` and ``scene_label`` and
    supplies ``document_fields`` from its ``fields`` list.

    Args:
        payload: Request whose ``context_json`` is inspected.

    Returns:
        Shallow copies of the dict documents, in their original order.
    """
    documents = payload.context_json.get("ocr_documents")
    if not isinstance(documents, list):
        return []

    overrides = payload.context_json.get("review_document_form_values")
    override_map: dict[tuple[int, str], dict[str, object]] = {}
    if isinstance(overrides, list):
        for item in overrides:
            if not isinstance(item, dict):
                continue
            filename = str(item.get("filename") or "").strip()
            # The index is client-supplied; an unparseable value is treated
            # as "no index" rather than aborting the whole resolution.
            try:
                index = int(item.get("index") or 0)
            except (TypeError, ValueError):
                index = 0
            if not filename and index <= 0:
                continue
            # NOTE: an override with a filename but no positive index is
            # stored under index 0, which the lookups below never query.
            override_map[(index, filename)] = item

    normalized: list[dict[str, object]] = []
    for index, item in enumerate(documents[:8], start=1):
        if not isinstance(item, dict):
            continue
        normalized_item = dict(item)
        override = override_map.get((index, str(normalized_item.get("filename") or "").strip()))
        if override is None:
            override = override_map.get((index, ""))
        if override is not None:
            summary = str(override.get("summary") or "").strip()
            scene_label = str(override.get("scene_label") or "").strip()
            fields = override.get("fields")
            if summary:
                normalized_item["summary"] = summary
            if scene_label:
                normalized_item["scene_label"] = scene_label
            if isinstance(fields, list):
                # Keep only fields that carry both a label and a value.
                normalized_item["document_fields"] = [
                    {
                        "key": str(field.get("key") or field.get("label") or "").strip(),
                        "label": str(field.get("label") or "").strip(),
                        "value": str(field.get("value") or "").strip(),
                    }
                    for field in fields
                    if isinstance(field, dict)
                    and str(field.get("label") or "").strip()
                    and str(field.get("value") or "").strip()
                ]
        normalized.append(normalized_item)
    return normalized
@staticmethod
def _is_review_association_choice_pending(payload: UserAgentRequest) -> bool:
    """Tell whether ``tool_payload["pending_association_decision"]`` is truthy."""
    pending = payload.tool_payload.get("pending_association_decision")
    return True if pending else False
def _resolve_review_document_count(self, payload: UserAgentRequest) -> int:
    """Return the larger of the OCR document count and the attachment count."""
    recognised = len(self._resolve_ocr_documents(payload))
    declared = self._resolve_attachment_count(payload)
    return recognised if recognised >= declared else declared
@staticmethod
def _resolve_conversation_history(payload: UserAgentRequest) -> list[dict[str, object]]:
    """Return the last eight usable conversation turns.

    Only the final eight entries of ``context_json["conversation_history"]``
    are considered. Non-dict entries and entries with a blank role or
    content are dropped; the rest are reduced to trimmed ``role`` and
    ``content``.
    """
    history = payload.context_json.get("conversation_history")
    if not isinstance(history, list):
        return []

    turns: list[dict[str, object]] = []
    for entry in history[-8:]:
        if isinstance(entry, dict):
            role = str(entry.get("role") or "").strip()
            content = str(entry.get("content") or "").strip()
            if role and content:
                turns.append({"role": role, "content": content})
    return turns
@staticmethod
def _resolve_domain(scenario: str) -> str | None:
    """Map a scenario name to its asset domain code, or ``None`` if unmapped."""
    domain_by_scenario = {
        "expense": "expense",
        "accounts_receivable": "ar",
        "accounts_payable": "ap",
    }
    return domain_by_scenario.get(scenario)
@staticmethod
def _rank_rule_assets(
    items: list[AgentAssetListItem],
    payload: UserAgentRequest,
) -> list[AgentAssetListItem]:
    """Rank rule assets by how well their tags match the request.

    An asset scores 3 for a scenario tag, 2 for an intent tag and 4 for
    each matching risk flag. Assets are ordered by ``(score, code)``
    descending, and assets scoring zero are removed.
    """
    scored: list[tuple[tuple[int, str], AgentAssetListItem]] = []
    for asset in items:
        tags = {str(value) for value in asset.scenario_json or []}
        weight = 0
        if payload.ontology.scenario in tags:
            weight += 3
        if payload.ontology.intent in tags:
            weight += 2
        weight += 4 * sum(1 for flag in payload.ontology.risk_flags if flag in tags)
        scored.append(((weight, asset.code), asset))

    # Sort on the key only, so assets themselves are never compared.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [asset for (weight, _code), asset in scored if weight > 0]
@staticmethod
def _extract_excerpt(content: str) -> str:
    """Join the first two non-empty text lines of ``content``.

    Leading markdown-style markers, digits, dots, backticks and whitespace
    are removed from each line first; lines left empty are ignored.
    """
    leading_markup = re.compile(r"^[#>\-\*\d\.\s`]+")
    picked: list[str] = []
    for raw_line in str(content).splitlines():
        line = raw_line.strip()
        if not line:
            continue
        text = leading_markup.sub("", line).strip()
        if not text:
            continue
        picked.append(text)
        if len(picked) >= 2:
            break
    return "".join(picked[:2])
def _collect_entity_values(self, payload: UserAgentRequest) -> dict[str, str]:
    """Flatten ontology entities into a dict of display strings.

    The first employee, customer, non-threshold amount and expense type win.
    Participant and person values are de-duplicated in order and joined.
    Keys with no matching entity stay ``""``.
    """
    values = dict.fromkeys(
        ("employee_name", "customer", "participants", "amount", "expense_type", "expense_type_code"),
        "",
    )
    participant_names: list[str] = []

    for entity in payload.ontology.entities:
        kind = entity.type
        if kind == "employee":
            if not values["employee_name"]:
                values["employee_name"] = entity.value
        elif kind == "customer":
            if not values["customer"]:
                values["customer"] = entity.value
        elif kind == "amount":
            if entity.role != "threshold" and not values["amount"]:
                normalized_amount = str(entity.normalized_value or "").strip()
                values["amount"] = f"{normalized_amount}" if normalized_amount else entity.value
        elif kind == "expense_type":
            if not values["expense_type_code"]:
                values["expense_type_code"] = entity.normalized_value
                values["expense_type"] = EXPENSE_TYPE_LABELS.get(
                    entity.normalized_value,
                    entity.value,
                )
        elif kind in {"participant", "person"}:
            trimmed = entity.value.strip()
            if trimmed:
                participant_names.append(trimmed)

    if participant_names:
        values["participants"] = "".join(dict.fromkeys(participant_names))
    return values
def _format_time_range(self, payload: UserAgentRequest) -> str:
    """Render the ontology time range as text.

    With both dates present, a single date is returned when they are equal,
    otherwise start and end concatenated. Without both dates the raw text is
    returned, or ``""`` when there is none.
    """
    time_range = payload.ontology.time_range
    start, end = time_range.start_date, time_range.end_date
    if start and end:
        return start if start == end else f"{start}{end}"
    return time_range.raw if time_range.raw else ""
def _resolve_location_value(self, payload: UserAgentRequest) -> str:
    """Resolve the business location as plain text.

    Sources are tried in order: the edited review form, the linked detail
    context (only when the entry source is ``detail``), a labelled location
    in the message, a "go to <city> for ..." phrase, and finally a literal
    mention of the customer site. ``""`` is returned when nothing matches.
    """
    form_values = self._resolve_review_form_values(payload)
    for key in ("business_location", "location"):
        edited = str(form_values.get(key) or "").strip()
        if edited:
            return edited

    context = payload.context_json
    if str(context.get("entry_source") or "").strip() == "detail":
        request_context = context.get("request_context")
        if isinstance(request_context, dict):
            for key in ("city", "location"):
                linked = str(request_context.get(key) or "").strip()
                if linked:
                    return linked

    message = payload.message
    labelled = re.search(r"(?:业务地点|发生地点|地点)[:]\s*(?P<value>[^\n]+)", message)
    if labelled:
        return labelled.group("value").strip()
    city = re.search(r"去(?P<city>[\u4e00-\u9fa5]{2,8})(?:出差|拜访|参会|见客户|客户现场)", message)
    if city:
        return city.group("city").strip()
    return "客户现场" if "客户现场" in message.replace(" ", "") else ""
@staticmethod
def _resolve_review_form_values(payload: UserAgentRequest) -> dict[str, str]:
    """Return the user-edited review form as trimmed string pairs.

    Entries with a blank key are dropped. Falsy values become ``""``.
    A missing or non-dict ``review_form_values`` yields an empty dict.
    """
    raw_values = payload.context_json.get("review_form_values")
    if not isinstance(raw_values, dict):
        return {}
    trimmed_pairs = (
        (str(key or "").strip(), str(value or "").strip())
        for key, value in raw_values.items()
    )
    return {key: value for key, value in trimmed_pairs if key}
@staticmethod
def _build_slot_value(
    *,
    value: str = "",
    raw_value: str = "",
    normalized_value: str = "",
    source: str = "system",
    confidence: float = 0.0,
    evidence: str = "",
) -> dict[str, str | float]:
    """Build a slot dict from the given parts.

    Text parts are trimmed, with falsy input becoming ``""``. A blank
    ``source`` becomes ``"system"`` and ``confidence`` is coerced to float.
    """

    def trimmed(text: object) -> str:
        return str(text or "").strip()

    slot: dict[str, str | float] = {}
    slot["value"] = trimmed(value)
    slot["raw_value"] = trimmed(raw_value)
    slot["normalized_value"] = trimmed(normalized_value)
    slot["source"] = trimmed(source or "system") or "system"
    slot["confidence"] = float(confidence)
    slot["evidence"] = trimmed(evidence)
    return slot
def _build_time_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
    """Build the time slot.

    A value edited in the review form wins (``occurred_date``, then
    ``time_range``, then ``business_time``). Otherwise the ontology time
    range is used when it has both dates. An empty slot is returned when
    neither source applies.
    """
    form_values = self._resolve_review_form_values(payload)
    edited_value = ""
    for key in ("occurred_date", "time_range", "business_time"):
        candidate = form_values.get(key)
        if candidate:
            edited_value = str(candidate).strip()
            break
    if edited_value:
        return self._build_slot_value(
            value=edited_value,
            raw_value=str(form_values.get("time_range_raw") or edited_value).strip(),
            normalized_value=edited_value,
            source="user_form",
            confidence=1.0,
            evidence="来源于用户修改后的结构化表单。",
        )

    time_range = payload.ontology.time_range
    start, end = time_range.start_date, time_range.end_date
    if not (start and end):
        return self._build_slot_value()

    resolved_dates = start if start == end else f"{start}{end}"
    return self._build_slot_value(
        value=resolved_dates,
        raw_value=str(time_range.raw or "").strip(),
        normalized_value=resolved_dates,
        source="user_text",
        confidence=0.92,
        evidence="系统已根据当前日期将相对时间换算为标准日期。",
    )
def _build_location_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
    """Build the location slot.

    Sources are tried in order: the edited review form, the linked detail
    context (only when the entry source is ``detail``), then
    ``_resolve_location_value``. Each source carries its own source tag,
    confidence and evidence text.
    """

    def located(text: str, *, source: str, confidence: float, evidence: str) -> dict[str, str | float]:
        return self._build_slot_value(
            value=text,
            normalized_value=text,
            source=source,
            confidence=confidence,
            evidence=evidence,
        )

    form_values = self._resolve_review_form_values(payload)
    for key in ("business_location", "location"):
        edited = str(form_values.get(key) or "").strip()
        if edited:
            return located(
                edited,
                source="user_form",
                confidence=1.0,
                evidence="来源于用户修改后的结构化表单。",
            )

    context = payload.context_json
    if str(context.get("entry_source") or "").strip() == "detail":
        request_context = context.get("request_context")
        if isinstance(request_context, dict):
            for key in ("city", "location"):
                linked = str(request_context.get(key) or "").strip()
                if linked:
                    return located(
                        linked,
                        source="detail_context",
                        confidence=0.68,
                        evidence="来源于当前关联单据,仅作为辅助上下文,需要用户再次核对。",
                    )

    resolved = self._resolve_location_value(payload)
    if not resolved:
        return self._build_slot_value()
    if resolved == "客户现场":
        evidence = "用户明确提到“客户现场”,但未提供具体城市或地址。"
    else:
        evidence = "用户在文本中明确描述了业务地点。"
    return located(resolved, source="user_text", confidence=0.82, evidence=evidence)
def _build_customer_slot(
    self,
    payload: UserAgentRequest,
    *,
    entity_map: dict[str, str],
) -> dict[str, str | float]:
    """Build the customer slot from the edited form, else from the entity map."""
    form_values = self._resolve_review_form_values(payload)
    edited_customer = str(form_values.get("customer_name") or "").strip()
    if edited_customer:
        source, confidence = "user_form", 1.0
        customer = edited_customer
        evidence = "来源于用户修改后的结构化表单。"
    else:
        customer = entity_map.get("customer", "")
        if not customer:
            return self._build_slot_value()
        source, confidence = "user_text", 0.88
        evidence = "用户在原始描述中直接提到了客户对象。"
    return self._build_slot_value(
        value=customer,
        normalized_value=customer,
        source=source,
        confidence=confidence,
        evidence=evidence,
    )
def _build_participants_slot(
    self,
    payload: UserAgentRequest,
    *,
    entity_map: dict[str, str],
) -> dict[str, str | float]:
    """Build the participants slot from the edited form, else from the entity map."""
    form_values = self._resolve_review_form_values(payload)
    edited_participants = str(form_values.get("participants") or "").strip()
    if edited_participants:
        source, confidence = "user_form", 1.0
        participants = edited_participants
        evidence = "来源于用户修改后的结构化表单。"
    else:
        participants = entity_map.get("participants", "")
        if not participants:
            return self._build_slot_value()
        source, confidence = "user_text", 0.8
        evidence = "用户在当前描述中补充了参与人员。"
    return self._build_slot_value(
        value=participants,
        normalized_value=participants,
        source=source,
        confidence=confidence,
        evidence=evidence,
    )
def _build_reason_slot(
    self,
    payload: UserAgentRequest,
    *,
    claim_groups: list[UserAgentReviewClaimGroup],
) -> dict[str, str | float]:
    """Build the reason slot.

    An edited form value wins. Next comes a reason inferred from the claim
    groups, keeping the user's own text as the raw value when there is one.
    Last comes the reason extracted from the user's text alone.
    """
    form_values = self._resolve_review_form_values(payload)
    edited_reason = str(form_values.get("reason") or "").strip()
    if edited_reason:
        return self._build_slot_value(
            value=edited_reason,
            raw_value=edited_reason,
            normalized_value=edited_reason,
            source="user_form",
            confidence=1.0,
            evidence="来源于用户修改后的结构化表单。",
        )

    inferred_reason = self._infer_reason_from_claim_groups(claim_groups=claim_groups)
    typed_reason = self._resolve_reason_text(self._resolve_reason_source_text(payload))

    if inferred_reason:
        if typed_reason:
            evidence = "系统已根据票据识别结果预置场景类型;原始描述仍保留为补充说明。"
        else:
            evidence = "系统已根据票据识别场景补全通用事由,若需更具体说明可继续修改。"
        return self._build_slot_value(
            value=inferred_reason,
            raw_value=typed_reason or inferred_reason,
            normalized_value=inferred_reason,
            source="ocr",
            confidence=0.82,
            evidence=evidence,
        )

    if not typed_reason:
        return self._build_slot_value()
    return self._build_slot_value(
        value=typed_reason,
        raw_value=typed_reason,
        normalized_value=typed_reason,
        source="user_text",
        confidence=0.76,
        evidence="系统从用户原始描述中提取了本次费用事由,建议继续核对。",
    )
def _build_amount_slot(
    self,
    payload: UserAgentRequest,
    *,
    entity_map: dict[str, str],
    ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
    """Build the amount slot.

    Sources in priority order: the edited form amount, the amount entity
    from the user's text, then the sum of OCR amounts when positive.
    """
    form_values = self._resolve_review_form_values(payload)

    edited_amount = str(form_values.get("amount") or "").strip()
    if edited_amount:
        normalized_edit = self._normalize_amount_text(edited_amount)
        return self._build_slot_value(
            value=normalized_edit,
            raw_value=edited_amount,
            normalized_value=normalized_edit,
            source="user_form",
            confidence=1.0,
            evidence="来源于用户修改后的结构化表单。",
        )

    typed_amount = entity_map.get("amount", "")
    if typed_amount:
        normalized_typed = self._normalize_amount_text(typed_amount)
        return self._build_slot_value(
            value=normalized_typed,
            raw_value=typed_amount,
            normalized_value=normalized_typed,
            source="user_text",
            confidence=0.92,
            evidence="用户在原始描述中直接给出了金额。",
        )

    ocr_total_amount = self._sum_ocr_amounts(ocr_documents)
    if not ocr_total_amount > 0:
        return self._build_slot_value()
    formatted_total = f"{ocr_total_amount:.2f}"
    return self._build_slot_value(
        value=formatted_total,
        normalized_value=formatted_total,
        source="ocr",
        confidence=0.76,
        evidence="金额来自 OCR 汇总结果,仍建议用户核对票据原文。",
    )
def _build_expense_type_slot(
    self,
    payload: UserAgentRequest,
    *,
    entity_map: dict[str, str],
    ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
    """Build the expense-type slot.

    Sources in priority order: the edited form (``expense_type`` or
    ``reimbursement_type``), the expense type from the entity map, then a
    type inferred from OCR documents when any are present.
    """
    form_values = self._resolve_review_form_values(payload)

    edited_type = str(
        form_values.get("expense_type") or form_values.get("reimbursement_type") or ""
    ).strip()
    if edited_type:
        edited_code, edited_label = self._normalize_expense_type_input(edited_type)
        return self._build_slot_value(
            value=edited_label,
            raw_value=edited_type,
            normalized_value=edited_code,
            source="user_form",
            confidence=1.0,
            evidence="来源于用户修改后的结构化表单。",
        )

    entity_code = entity_map.get("expense_type_code", "")
    entity_label = EXPENSE_TYPE_LABELS.get(entity_code, entity_map.get("expense_type", ""))
    if entity_label:
        return self._build_slot_value(
            value=entity_label,
            raw_value=entity_label,
            normalized_value=entity_code,
            source="user_text",
            confidence=0.9,
            evidence="系统根据用户描述中的业务场景判断费用类型。",
        )

    inferred_label = ""
    if ocr_documents:
        inferred_label = self._infer_expense_type_from_documents(payload, ocr_documents)
    if not inferred_label:
        return self._build_slot_value()
    inferred_code, display_label = self._normalize_expense_type_input(inferred_label)
    return self._build_slot_value(
        value=display_label,
        raw_value=inferred_label,
        normalized_value=inferred_code,
        source="ocr",
        confidence=0.74,
        evidence="系统根据票据内容推断费用类型,仍建议用户确认。",
    )
def _build_merchant_slot(
    self,
    payload: UserAgentRequest,
    *,
    ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
    """Build the merchant slot from the review form or the first OCR document.

    A ``merchant_name`` entered in the review form takes precedence.
    Otherwise only the first entry of ``ocr_documents`` is inspected.
    When neither yields a name, ``_build_slot_value()`` is called without
    arguments.
    """
    form_values = self._resolve_review_form_values(payload)
    form_merchant = str(form_values.get("merchant_name") or "").strip()
    if form_merchant:
        return self._build_slot_value(
            value=form_merchant,
            normalized_value=form_merchant,
            source="user_form",
            confidence=1.0,
            evidence="来源于用户修改后的结构化表单。",
        )

    if ocr_documents:
        ocr_merchant = self._extract_document_merchant_name(ocr_documents[0])
        if ocr_merchant:
            return self._build_slot_value(
                value=ocr_merchant,
                normalized_value=ocr_merchant,
                source="ocr",
                confidence=0.72,
                evidence="商户名称来自 OCR 票据识别结果,仍建议用户核对。",
            )

    return self._build_slot_value()
def _build_attachment_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
attachment_names = str(review_form_values.get("attachment_names") or "").strip()
if attachment_names:
return self._build_slot_value(
value=attachment_names,
normalized_value=attachment_names,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
count = self._resolve_attachment_count(payload)
if count > 0:
names = self._resolve_attachment_names(payload)
value = "".join(names) if names else f"{count} 份附件"
return self._build_slot_value(
value=value,
raw_value=value,
normalized_value=str(count),
source="upload",
confidence=1.0,
evidence="系统已接收到用户上传的附件。",
)
return self._build_slot_value()
@staticmethod
def _normalize_amount_text(value: str) -> str:
cleaned = str(value or "").strip()
if not cleaned:
return ""
for alias, canonical in sorted(AMOUNT_UNIT_ALIASES.items(), key=lambda item: len(item[0]), reverse=True):
cleaned = cleaned.replace(alias, canonical)
match = AMOUNT_TEXT_PATTERN.search(cleaned)
if not match:
return cleaned
number = float(match.group(1))
return f"{number:.2f}"
@staticmethod
def _normalize_expense_type_input(value: str) -> tuple[str, str]:
compact = str(value or "").replace(" ", "")
if "招待" in compact or ("客户" in compact and any(keyword in compact for keyword in ("吃饭", "用餐", "宴请", "请客"))):
return "entertainment", "业务招待费"
if any(keyword in compact for keyword in ("差旅", "出差", "机票", "行程")):
return "travel", "差旅费"
if any(keyword in compact for keyword in ("住宿", "酒店", "宾馆")):
return "hotel", "住宿费"
if any(keyword in compact for keyword in ("交通", "打车", "网约车", "出租车", "乘车", "用车", "叫车", "车费", "车资", "的士", "停车")):
return "transport", "交通费"
if any(keyword in compact for keyword in ("餐费", "用餐", "午餐", "晚餐", "早餐", "伙食")):
return "meal", "餐费"
if "会务" in compact:
return "meeting", "会务费"
if any(keyword in compact for keyword in ("办公费", "办公用品", "文具", "耗材", "办公耗材", "打印纸", "办公设备", "键盘", "鼠标", "白板")):
return "office", "办公费"
if any(keyword in compact for keyword in ("培训费", "培训", "讲师费", "课时费", "课程费")):
return "training", "培训费"
if any(keyword in compact for keyword in ("通讯费", "话费", "流量费", "宽带费")):
return "communication", "通讯费"
if any(keyword in compact for keyword in ("福利费", "团建", "慰问", "节日福利", "体检费")):
return "welfare", "福利费"
return "other", str(value or "").strip() or "其他费用"
def _resolve_required_review_keys(
    self,
    payload: UserAgentRequest,
    *,
    primary_expense_type: str,
    claim_groups: list[UserAgentReviewClaimGroup],
) -> set[str]:
    """Collect the slot keys that must be filled for this review.

    Starts from a fixed base set, adds the keys that
    ``SCENE_REQUIRED_SLOT_KEYS`` lists for every scene code found in
    ``claim_groups`` and for ``primary_expense_type``, and finally requires
    ``customer_name`` and ``participants`` when the request is an
    entertainment scene or its text mentions hosting a customer.
    """
    required_keys = {"expense_type", "time_range", "amount", "reason", "attachments"}

    scenes: set[str] = set()
    for group in claim_groups:
        code = str(group.group_code or "").strip()
        if code:
            scenes.add(code)
    if primary_expense_type:
        scenes.add(primary_expense_type)

    for scene in scenes:
        required_keys.update(SCENE_REQUIRED_SLOT_KEYS.get(scene, set()))

    # Whitespace is removed so keywords split by spaces or newlines still match.
    message_text = re.sub(r"\s+", "", self._resolve_reason_source_text(payload) or payload.message)
    hosting_words = ("招待", "吃饭", "用餐", "宴请", "请客")
    hosts_customer = "客户" in message_text and any(word in message_text for word in hosting_words)
    if "entertainment" in scenes or hosts_customer:
        required_keys.update({"customer_name", "participants"})

    return required_keys
@staticmethod
def _infer_reason_from_claim_groups(
*,
claim_groups: list[UserAgentReviewClaimGroup],
) -> str:
if len(claim_groups) == 1:
document_indexes = list(claim_groups[0].document_indexes or [])
if not document_indexes:
return ""
expense_type = str(claim_groups[0].expense_type or "").strip()
group_code = str(claim_groups[0].group_code or "").strip()
if expense_type:
return INFERRED_REASON_LABELS.get(expense_type, "") or str(claim_groups[0].scene_label or "").strip()
if group_code:
return INFERRED_REASON_LABELS.get(group_code, "") or str(claim_groups[0].scene_label or "").strip()
return ""
@staticmethod
def _resolve_review_missing_slot_keys(
payload: UserAgentRequest,
*,
slot_cards: list[UserAgentReviewSlotCard],
) -> list[str]:
required_keys = {item.key for item in slot_cards if item.required}
slot_map = {item.key: item for item in slot_cards}
missing_keys = {
item.key
for item in slot_cards
if item.required and (item.status == "missing" or not str(item.value).strip())
}
for key in payload.ontology.missing_slots:
normalized_key = str(key or "").strip()
if (
normalized_key
and normalized_key in required_keys
and (
normalized_key not in slot_map
or slot_map[normalized_key].status == "missing"
or not str(slot_map[normalized_key].value).strip()
)
):
missing_keys.add(normalized_key)
ordered_keys: list[str] = []
for item in slot_cards:
if item.required and item.key in missing_keys and item.key not in ordered_keys:
ordered_keys.append(item.key)
return ordered_keys
def _make_slot_card(
    self,
    *,
    key: str,
    value: str,
    raw_value: str,
    normalized_value: str,
    source: str,
    confidence: float,
    evidence: str,
    required: bool = True,
) -> UserAgentReviewSlotCard:
    """Assemble a review slot card, deriving status, confirmation and hint.

    A required slot with a blank value is "missing" and gets a hint asking
    for it. Otherwise the slot is "identified" when it came from the user
    (text or form) and "inferred" for every other source; slots sourced
    from ``detail_context`` or ``ocr`` carry a reminder to double-check.
    """
    label = SLOT_LABELS.get(key, key)
    is_missing = required and not str(value).strip()
    from_user = source in {"user_text", "user_form"}

    if is_missing:
        status = "missing"
        hint = f"建议补充 {label}"
    else:
        status = "identified" if from_user else "inferred"
        needs_recheck = source in {"detail_context", "ocr"}
        hint = "该字段来自系统辅助上下文,建议你再核对一次。" if needs_recheck else ""

    # Unknown sources are displayed under the generic "system" label.
    label_source = source if source in SOURCE_LABELS else "system"

    return UserAgentReviewSlotCard(
        key=key,
        label=label,
        value=str(value or "").strip(),
        raw_value=str(raw_value or "").strip(),
        normalized_value=str(normalized_value or "").strip(),
        source=source,
        source_label=SOURCE_LABELS.get(label_source, "系统判断"),
        confidence=confidence,
        required=required,
        confirmed=not is_missing and from_user,
        status=status,
        hint=hint,
        evidence=evidence,
    )
def _classify_document(
self,
item: dict[str, object],
payload: UserAgentRequest,
) -> dict[str, str]:
provided_type = str(item.get("document_type") or "").strip().lower()
expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "")
has_customer = bool(self._collect_entity_values(payload).get("customer"))
if provided_type:
if provided_type in {"flight_itinerary", "train_ticket"}:
return {
"document_type": provided_type,
"expense_type": "travel",
"group_code": "travel",
"scene_label": "差旅票据",
}
if provided_type == "hotel_invoice":
return {
"document_type": provided_type,
"expense_type": "hotel",
"group_code": "travel",
"scene_label": "住宿票据",
}
if provided_type in {"taxi_receipt", "parking_toll_receipt"}:
return {
"document_type": provided_type,
"expense_type": "transport",
"group_code": "travel",
"scene_label": "交通票据",
}
if provided_type == "meal_receipt":
group_code = "entertainment" if expense_type_code == "entertainment" or has_customer else "meal"
return {
"document_type": provided_type,
"expense_type": group_code,
"group_code": group_code,
"scene_label": "餐饮票据",
}
if provided_type == "office_invoice":
return {
"document_type": provided_type,
"expense_type": "office",
"group_code": "office",
"scene_label": "办公用品票据",
}
if provided_type == "meeting_invoice":
return {
"document_type": provided_type,
"expense_type": "meeting",
"group_code": "meeting",
"scene_label": "会务票据",
}
if provided_type == "training_invoice":
return {
"document_type": provided_type,
"expense_type": "training",
"group_code": "training",
"scene_label": "培训票据",
}
text = " ".join(
[
str(item.get("filename") or ""),
str(item.get("summary") or ""),
str(item.get("text") or ""),
]
).lower()
compact = text.replace(" ", "")
if any(keyword in compact for keyword in ("机票", "航班", "火车", "高铁", "行程单")):
return {
"document_type": "travel_ticket",
"expense_type": "travel",
"group_code": "travel",
"scene_label": "差旅票据",
}
if any(keyword in compact for keyword in ("酒店", "住宿", "宾馆")):
return {
"document_type": "hotel_invoice",
"expense_type": "hotel",
"group_code": "travel",
"scene_label": "住宿票据",
}
if any(keyword in compact for keyword in ("打车", "出租车", "滴滴", "网约车", "乘车", "用车", "叫车", "车费", "车资", "的士", "过路费", "停车")):
return {
"document_type": "transport_receipt",
"expense_type": "transport",
"group_code": "travel",
"scene_label": "交通票据",
}
if any(keyword in compact for keyword in ("", "饭店", "酒楼", "酒家", "餐饮", "meal")):
group_code = "entertainment" if expense_type_code == "entertainment" or has_customer else "meal"
return {
"document_type": "meal_receipt",
"expense_type": group_code,
"group_code": group_code,
"scene_label": "餐饮票据",
}
if any(keyword in compact for keyword in ("办公用品", "文具", "耗材", "办公耗材", "打印纸", "键盘", "鼠标", "白板", "墨盒", "硒鼓")):
return {
"document_type": "other",
"expense_type": "office",
"group_code": "office",
"scene_label": "办公用品票据",
}
return {
"document_type": "other",
"expense_type": expense_type_code or "other",
"group_code": self._normalize_group_code(expense_type_code or "other"),
"scene_label": "其他票据",
}
@staticmethod
def _normalize_group_code(expense_type_code: str) -> str:
if expense_type_code in {"travel", "hotel", "transport"}:
return "travel"
if expense_type_code in {"entertainment", "meal", "office", "training", "communication", "welfare"}:
return expense_type_code
return "other"
def _extract_document_fields(self, item: dict[str, object]) -> dict[str, str]:
    """Collect a document's fields under normalized display labels.

    Structured entries in ``item["document_fields"]`` are taken first; for
    a repeated label the first value wins. The summary and text are then
    scanned to fill in "金额", "时间" and "商户/酒店" when the structured
    fields did not already provide them.
    """
    fields: dict[str, str] = {}

    declared = item.get("document_fields")
    for entry in declared if isinstance(declared, list) else ():
        if not isinstance(entry, dict):
            continue
        entry_key = str(entry.get("key") or "").strip()
        entry_label = str(entry.get("label") or "").strip()
        entry_value = str(entry.get("value") or "").strip()
        if not entry_value:
            continue
        display_label = self._normalize_document_field_label(key=entry_key, label=entry_label) or entry_label
        clean_value = self._normalize_document_field_value(label=display_label, value=entry_value)
        if display_label and clean_value:
            fields.setdefault(display_label, clean_value)

    # Free-text fallbacks never override a structured field.
    free_text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()

    text_amount = self._extract_amount_text_from_value(free_text)
    if text_amount:
        fields.setdefault("金额", text_amount)

    text_date = DATE_TEXT_PATTERN.search(free_text)
    if text_date:
        fields.setdefault("时间", text_date.group(1))

    text_merchant = self._extract_document_merchant_name_from_text(free_text)
    if text_merchant:
        fields.setdefault("商户/酒店", text_merchant)

    return fields
@staticmethod
def _normalize_document_field_label(*, key: str, label: str) -> str:
compact_key = str(key or "").strip().lower().replace("_", "")
compact_label = str(label or "").replace(" ", "")
if compact_key in {
"amount",
"totalamount",
"paymentamount",
"paidamount",
"actualamount",
} or any(
token in compact_label
for token in ("金额", "价税合计", "合计", "总额", "总计", "票价", "支付金额", "实付金额", "实收金额")
):
return "金额"
if compact_key in {"date", "time", "issuedat", "invoicedate"} or any(
token in compact_label for token in ("日期", "时间", "开票日期", "发生时间")
):
return "时间"
if compact_key in {"merchant", "merchantname", "sellername", "vendorname"} or any(
token in compact_label for token in ("商户", "酒店", "销售方", "开票方", "收款方")
):
return "商户/酒店"
return label
def _normalize_document_field_value(self, *, label: str, value: str) -> str:
normalized_label = str(label or "").strip()
raw_value = str(value or "").strip()
if not normalized_label or not raw_value:
return ""
if normalized_label == "金额":
return self._extract_amount_text_from_value(raw_value) or raw_value
if normalized_label == "时间":
match = DATE_TEXT_PATTERN.search(raw_value)
return match.group(1) if match else raw_value
return raw_value
def _extract_amount_text_from_value(self, value: str) -> str:
raw_value = str(value or "").strip()
if not raw_value:
return ""
best_amount: Decimal | None = None
for pattern in (DOCUMENT_AMOUNT_PATTERN, DOCUMENT_CURRENCY_AMOUNT_PATTERN, AMOUNT_TEXT_PATTERN):
for match in pattern.finditer(raw_value):
try:
candidate = Decimal(str(match.group(1)).replace(",", "."))
except (InvalidOperation, TypeError):
continue
if candidate <= Decimal("0.00"):
continue
if best_amount is None or candidate > best_amount:
best_amount = candidate
if best_amount is None:
return ""
return f"{best_amount.quantize(Decimal('0.01')):.2f}"
def _extract_document_merchant_name(self, item: dict[str, object]) -> str:
fields = self._extract_document_fields(item)
merchant = str(fields.get("商户/酒店") or "").strip()
if merchant:
return merchant
text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()
return self._extract_document_merchant_name_from_text(text)
@staticmethod
def _extract_document_merchant_name_from_text(text: str) -> str:
for keyword in ("酒店", "宾馆", "饭店", "酒楼", "餐厅", "航空", "铁路", "滴滴"):
if keyword in text:
return keyword
return ""
@staticmethod
def _extract_amount_from_card(card: UserAgentReviewDocumentCard) -> float:
for item in card.fields:
if item.label != "金额":
continue
try:
normalized_value = str(item.value).replace("", "").replace("", "").replace("¥", "").strip()
return float(normalized_value)
except ValueError:
return 0.0
return 0.0
def _resolve_amount_value(self, payload: UserAgentRequest) -> float:
for item in payload.ontology.entities:
if item.type == "amount" and item.role != "threshold":
try:
return float(item.normalized_value)
except ValueError:
return 0.0
return 0.0
def _sum_ocr_amounts(self, ocr_documents: list[dict[str, object]]) -> float:
total = 0.0
for item in ocr_documents:
fields = self._extract_document_fields(item)
amount_text = str(fields.get("金额") or "").replace("", "").replace("", "").replace("¥", "").strip()
if not amount_text:
continue
try:
total += float(amount_text)
except ValueError:
continue
return total
def _infer_expense_type_from_documents(
self,
payload: UserAgentRequest,
ocr_documents: list[dict[str, object]],
) -> str:
labels: list[str] = []
for item in ocr_documents:
classified = self._classify_document(item, payload)
label = GROUP_SCENE_LABELS.get(classified["group_code"], "")
if label and label not in labels:
labels.append(label)
return " + ".join(labels[:3])