Files
X-Financial/server/src/app/services/user_agent.py
caoxiaozhu 6317fc0ccd refactor(backend): update reimbursement and related services
- endpoints/reimbursements.py: update reimbursement API endpoint
- schemas/reimbursement.py: update reimbursement data schemas
- services/expense_claims.py: update expense claims service
- services/ontology.py: update ontology service
- services/user_agent.py: update user agent service
2026-05-13 06:45:04 +00:00

2060 lines
84 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from datetime import UTC, datetime, timedelta
from sqlalchemy import or_, select
from sqlalchemy.orm import Session
from app.core.agent_enums import AgentAssetStatus, AgentAssetType
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.schemas.agent_asset import AgentAssetListItem
from app.schemas.user_agent import (
UserAgentCitation,
UserAgentDraftPayload,
UserAgentReviewAction,
UserAgentReviewEditField,
UserAgentReviewClaimGroup,
UserAgentReviewDocumentCard,
UserAgentReviewDocumentField,
UserAgentReviewPayload,
UserAgentReviewRiskBrief,
UserAgentReviewSlotCard,
UserAgentRequest,
UserAgentResponse,
UserAgentSuggestedAction,
)
from app.services.agent_assets import AgentAssetService
from app.services.agent_foundation import AgentFoundationService
from app.services.runtime_chat import RuntimeChatService
SCENARIO_LABELS = {
"expense": "报销",
"accounts_receivable": "应收",
"accounts_payable": "应付",
"knowledge": "知识",
"unknown": "通用",
}
RISK_REASON_MAP = {
"duplicate_expense": "检测到同员工、同金额或近似单据存在重复提交迹象。",
"amount_over_limit": "金额超过当前制度或预算阈值,需要补充例外说明。",
"invoice_anomaly": "票据或附件完整性不满足当前规则要求,需要补件或人工复核。",
"ar_overdue": "应收账款已出现逾期,存在回款延迟风险。",
"ap_overdue": "应付付款已出现逾期,可能影响供应商履约或合作关系。",
}
GENERIC_EXPENSE_PROMPTS = {
"报销",
"我要报销",
"我想报销",
"帮我报销",
"我要申请报销",
"发起报销",
"提交报销",
}
EXPLICIT_DRAFT_KEYWORDS = ("生成", "草稿", "起草", "创建", "发起", "准备")
EXPENSE_TYPE_LABELS = {
"travel": "差旅费",
"hotel": "住宿费",
"transport": "交通费",
"meal": "餐费",
"meeting": "会务费",
"entertainment": "业务招待费",
"office": "办公费",
"training": "培训费",
"communication": "通讯费",
"welfare": "福利费",
"other": "其他费用",
}
GROUP_SCENE_LABELS = {
"travel": "差旅费",
"entertainment": "业务招待费",
"meal": "伙食费",
"transport": "交通费",
"hotel": "住宿费",
"office": "办公费",
"training": "培训费",
"communication": "通讯费",
"welfare": "福利费",
"other": "其他费用",
}
SLOT_LABELS = {
"expense_type": "报销类型",
"customer_name": "客户名称",
"time_range": "发生时间",
"location": "地点",
"merchant_name": "酒店/商户",
"amount": "金额",
"reason": "事由说明",
"participants": "参与人员",
"attachments": "票据附件",
}
DATE_TEXT_PATTERN = re.compile(r"(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)")
AMOUNT_TEXT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(?:元|万元|万)")
SOURCE_LABELS = {
"user_text": "用户描述",
"user_form": "用户修改",
"ocr": "票据识别",
"upload": "上传附件",
"detail_context": "关联单据",
"system_context": "系统上下文",
"inferred": "语义推断",
"system": "系统判断",
}
class UserAgentService:
def __init__(self, db: Session) -> None:
self.db = db
self.asset_service = AgentAssetService(db)
self.runtime_chat_service = RuntimeChatService(db)
def respond(self, payload: UserAgentRequest) -> UserAgentResponse:
AgentFoundationService(self.db).ensure_foundation_ready()
citations = self._build_rule_citations(payload)
suggested_actions = self._build_suggested_actions(payload)
risk_flags = self._resolve_risk_flags(payload)
draft_payload = (
self._build_draft_payload(payload)
if payload.ontology.intent == "draft"
else None
)
review_payload = self._build_review_payload(
payload,
citations=citations,
draft_payload=draft_payload,
)
review_answer = self._build_review_body_answer(
payload,
review_payload=review_payload,
draft_payload=draft_payload,
)
if payload.degraded and payload.tool_payload.get("message"):
return UserAgentResponse(
answer=review_answer or str(payload.tool_payload["message"]),
citations=citations,
suggested_actions=suggested_actions,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
if review_answer:
return UserAgentResponse(
answer=review_answer,
citations=citations,
suggested_actions=suggested_actions,
draft_payload=draft_payload,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
guided_answer = None
if draft_payload is None or draft_payload.claim_id is None:
guided_answer = self._build_guided_answer(payload)
if guided_answer:
return UserAgentResponse(
answer=guided_answer,
citations=citations,
suggested_actions=suggested_actions,
draft_payload=draft_payload,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
fallback_answer = self._build_fallback_answer(
payload,
citations=citations,
draft_payload=draft_payload,
)
answer = None
if not self._should_skip_model_answer(payload, review_payload):
answer = self._generate_answer_with_model(
payload,
citations=citations,
suggested_actions=suggested_actions,
risk_flags=risk_flags,
draft_payload=draft_payload,
fallback_answer=fallback_answer,
)
return UserAgentResponse(
answer=answer or fallback_answer,
citations=citations,
suggested_actions=suggested_actions,
draft_payload=draft_payload,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
def _build_fallback_answer(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
draft_payload: UserAgentDraftPayload | None,
) -> str:
if payload.ontology.intent in {"query", "compare"}:
return self._build_query_answer(payload)
if payload.ontology.intent == "risk_check":
return self._build_risk_answer(payload, citations)
if payload.ontology.intent == "draft":
tool_message = str(payload.tool_payload.get("message") or "").strip()
if payload.tool_payload.get("draft_limit_reached"):
return tool_message or "你当前已保存 3 个草稿,请先完成已保存的草稿,才能再次新建草稿。"
if tool_message and (
str(payload.tool_payload.get("claim_id") or "").strip()
or str(payload.tool_payload.get("claim_no") or "").strip()
):
return tool_message
if payload.ontology.intent == "draft" and draft_payload is not None:
return (
f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容,"
"仍需人工确认后再进入正式流程。"
)
return self._build_explain_answer(payload, citations)
def _build_guided_answer(self, payload: UserAgentRequest) -> str | None:
if not self._is_generic_expense_prompt(payload):
return self._build_implicit_expense_draft_guidance(payload)
attachment_names = self._resolve_attachment_names(payload)
ocr_summary = str(payload.context_json.get("ocr_summary") or "").strip()
attachment_hint = ""
if ocr_summary:
attachment_hint = f" 我已读取附件 OCR 摘要:{ocr_summary}"
elif attachment_names:
attachment_hint = (
f" 我已带入 {len(attachment_names)} 份附件名称,但目前还不能直接读取附件内容,"
"仍需要你补充关键信息。"
)
return (
"可以帮你发起报销。请补充费用类型、发生时间、金额、事由和相关对象,"
"或者直接上传票据附件,我再继续帮你判断能否报、缺什么材料以及生成报销草稿。"
f"{attachment_hint}"
)
def _build_implicit_expense_draft_guidance(
self,
payload: UserAgentRequest,
) -> str | None:
if not self._is_implicit_expense_draft_request(payload):
return None
amount_text = next(
(item.value for item in payload.ontology.entities if item.type == "amount"),
"",
)
expense_type = next(
(
EXPENSE_TYPE_LABELS.get(item.normalized_value, item.value)
for item in payload.ontology.entities
if item.type == "expense_type"
),
"报销",
)
time_text = payload.ontology.time_range.raw or "本次"
amount_hint = f",金额 {amount_text}" if amount_text else ""
return (
f"已识别到一笔{time_text}{expense_type}支出{amount_hint}"
"如果要继续生成报销草稿,还需要补充客户单位、参与人员、费用明细和票据附件。"
"你也可以继续上传发票或图片,我会把这些信息带入后续对话。"
)
def _generate_answer_with_model(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
suggested_actions: list[UserAgentSuggestedAction],
risk_flags: list[str],
draft_payload: UserAgentDraftPayload | None,
fallback_answer: str,
) -> str | None:
messages = self._build_model_messages(
payload,
citations=citations,
suggested_actions=suggested_actions,
risk_flags=risk_flags,
draft_payload=draft_payload,
fallback_answer=fallback_answer,
)
return self._sanitize_model_answer(
self.runtime_chat_service.complete(
messages,
max_tokens=420,
temperature=0.2,
)
)
def _sanitize_model_answer(self, answer: str | None) -> str | None:
if not answer:
return None
cleaned = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL | re.IGNORECASE)
cleaned = cleaned.strip()
return cleaned or None
def _build_model_messages(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
suggested_actions: list[UserAgentSuggestedAction],
risk_flags: list[str],
draft_payload: UserAgentDraftPayload | None,
fallback_answer: str,
) -> list[dict[str, str]]:
facts = {
"run_id": payload.run_id,
"user_message": payload.message,
"ontology": payload.ontology.model_dump(mode="json"),
"context": {
"entry_source": payload.context_json.get("entry_source"),
"user_name": payload.context_json.get("name"),
"user_role": payload.context_json.get("role"),
"request_context": payload.context_json.get("request_context"),
"attachment_count": payload.context_json.get("attachment_count"),
"attachment_names": self._resolve_attachment_names(payload),
"ocr_summary": payload.context_json.get("ocr_summary", ""),
"ocr_documents": payload.context_json.get("ocr_documents", []),
"conversation_id": payload.context_json.get("conversation_id"),
"conversation_scenario": payload.context_json.get("conversation_scenario"),
"conversation_intent": payload.context_json.get("conversation_intent"),
"draft_claim_id": payload.context_json.get("draft_claim_id"),
"conversation_history": self._resolve_conversation_history(payload),
},
"tool_payload": payload.tool_payload,
"citations": [item.model_dump(mode="json") for item in citations],
"suggested_actions": [
item.model_dump(mode="json") for item in suggested_actions
],
"risk_flags": risk_flags,
"draft_payload": (
draft_payload.model_dump(mode="json")
if draft_payload is not None
else None
),
"selected_capability_codes": payload.selected_capability_codes,
"requires_confirmation": payload.requires_confirmation,
"fallback_answer": fallback_answer,
}
system_prompt = (
"你是企业财务共享场景中的中文智能助手,负责和最终用户直接对话。"
"你只能基于提供的事实回答,不能编造制度、流程结果或附件内容。"
"如果用户问题很笼统,例如“我要报销”,优先告诉用户你可以协助什么,"
"并明确要求补充费用类型、金额、时间、事由、参与对象或上传票据。"
"如果上下文里只有附件名称,必须明确说明你只拿到了附件名称,"
"不能假装已看过图片、PDF 或发票内容。"
"如果提供了 conversation_history必须结合最近轮次理解追问、代词、省略字段和补充信息。"
"不要声称已经提交、审批、付款、入账或真正执行了任何动作;如果只是建议、草稿或待确认,要明确说清楚。"
"若给出了风险标签、制度引用或建议动作,可以简洁吸收进回答,但不要新增未提供的事实。"
"只输出最终给用户看的自然语言,不要输出 JSON、Markdown、标题、"
"<think> 标签或任何中间推理。"
"使用简体中文,控制在 2 到 4 句。"
)
user_prompt = (
"请根据以下事实生成最终答复,优先保持准确、具体、可执行:\n"
f"{json.dumps(facts, ensure_ascii=False, indent=2)}"
)
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
def _build_query_answer(self, payload: UserAgentRequest) -> str:
scenario = payload.ontology.scenario
data = payload.tool_payload
subject = self._resolve_subject(payload)
if scenario == "expense":
record_count = int(data.get("record_count") or 0)
total_amount = float(data.get("total_amount") or 0)
return (
f"{subject}共命中 {record_count} 笔报销,金额合计 {total_amount:.2f} 元。"
"如需继续处理,可以查看明细或生成处理意见草稿。"
)
if scenario == "accounts_receivable":
record_count = int(data.get("record_count") or 0)
outstanding_amount = float(data.get("outstanding_amount") or 0)
return (
f"{subject}共命中 {record_count} 条应收,未回款金额 {outstanding_amount:.2f} 元。"
"建议结合账龄和客户分布继续排查逾期风险。"
)
if scenario == "accounts_payable":
record_count = int(data.get("record_count") or 0)
outstanding_amount = float(data.get("outstanding_amount") or 0)
return (
f"{subject}共命中 {record_count} 条应付,待付金额 {outstanding_amount:.2f} 元。"
"如需推进动作,建议先生成付款建议草稿并发起人工确认。"
)
return "已完成当前查询,但暂时没有更多结构化结果可展示。"
def _build_explain_answer(
self,
payload: UserAgentRequest,
citations: list[UserAgentCitation],
) -> str:
if citations:
titles = "".join(item.title for item in citations[:2])
summary = citations[0].excerpt or "请结合制度全文进一步确认。"
return f"已检索到相关依据:{titles}。核心说明:{summary}"
return (
f"当前还没有与“{SCENARIO_LABELS.get(payload.ontology.scenario, '当前问题')}"
"强匹配的已上线规则引用,建议先人工复核或补充更具体的单据上下文。"
)
def _build_risk_answer(
self,
payload: UserAgentRequest,
citations: list[UserAgentCitation],
) -> str:
risk_flags = self._resolve_risk_flags(payload)
if not risk_flags:
return "当前未识别到明确风险标签,建议继续查看原始明细或补充更多上下文。"
reasons = [RISK_REASON_MAP.get(flag, f"{flag} 需要人工进一步确认。") for flag in risk_flags]
citation_text = (
f" 参考规则:{''.join(item.title for item in citations[:2])}"
if citations
else ""
)
return (
f"本次识别到 {len(risk_flags)} 类风险:{''.join(risk_flags)}"
f"触发原因:{''.join(reasons)}"
"建议先复核明细、附件和审批链,再决定是否继续处理。"
f"{citation_text}"
)
def _build_draft_payload(self, payload: UserAgentRequest) -> UserAgentDraftPayload:
scenario_label = SCENARIO_LABELS.get(payload.ontology.scenario, "业务")
subject = self._resolve_subject(payload)
claim_no = str(payload.tool_payload.get("claim_no") or "").strip() or None
claim_status = str(payload.tool_payload.get("status") or "").strip() or None
title = f"{scenario_label}处理意见草稿"
if claim_no:
title = f"{scenario_label}草稿 {claim_no}"
body = (
f"主题:{subject}\n"
"结论:已根据当前语义解析结果生成草稿,尚未自动执行。\n"
"建议:请先核对明细、规则命中和所需附件,再由人工确认是否提交正式流程。\n"
f"原始问题:{payload.message}"
)
return UserAgentDraftPayload(
draft_type=payload.ontology.scenario,
title=title,
body=body,
confirmation_required=True,
claim_id=str(payload.tool_payload.get("claim_id") or "").strip() or None,
claim_no=claim_no,
status=claim_status,
)
def _build_suggested_actions(
self,
payload: UserAgentRequest,
) -> list[UserAgentSuggestedAction]:
if self._is_generic_expense_prompt(payload):
return [
UserAgentSuggestedAction(
label="上传票据",
action_type="ask_clarification",
description="上传发票、行程单或付款截图,继续识别报销内容。",
),
UserAgentSuggestedAction(
label="补充报销信息",
action_type="ask_clarification",
description="补充费用类型、金额、时间和事由后继续处理。",
),
]
if payload.ontology.intent in {"query", "compare"}:
return [
UserAgentSuggestedAction(
label="查看明细",
action_type="open_detail",
description="继续查看命中记录和过滤条件。",
),
UserAgentSuggestedAction(
label="生成处理意见",
action_type="create_draft",
description="把当前查询结果整理成可确认草稿。",
),
]
if payload.ontology.intent == "risk_check":
return [
UserAgentSuggestedAction(
label="人工复核风险",
action_type="manual_review",
description="优先检查明细、附件和规则命中原因。",
),
UserAgentSuggestedAction(
label="生成整改建议",
action_type="create_draft",
description="把风险说明整理成处理意见草稿。",
),
]
if payload.ontology.intent == "draft":
return [
UserAgentSuggestedAction(
label="复制草稿",
action_type="copy_draft",
description="复制当前草稿后交由人工确认。",
),
UserAgentSuggestedAction(
label="补充上下文",
action_type="ask_clarification",
description="补充单据编号、客户或供应商信息以完善草稿。",
),
]
return [
UserAgentSuggestedAction(
label="查看规则全文",
action_type="open_rule",
description="继续查看引用规则或知识内容。",
),
UserAgentSuggestedAction(
label="补充问题上下文",
action_type="ask_clarification",
description="补充业务对象、时间或单据范围,提升回答准确度。",
),
]
def _build_review_payload(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
draft_payload: UserAgentDraftPayload | None,
) -> UserAgentReviewPayload | None:
attachment_count = self._resolve_attachment_count(payload)
ocr_documents = self._resolve_ocr_documents(payload)
if payload.ontology.scenario != "expense":
return None
if payload.ontology.intent not in {"draft", "operate"} and attachment_count <= 0 and not ocr_documents:
return None
document_cards = self._build_review_document_cards(payload, ocr_documents=ocr_documents)
claim_groups = self._build_review_claim_groups(
payload,
document_cards=document_cards,
)
slot_cards = self._build_review_slot_cards(
payload,
ocr_documents=ocr_documents,
claim_groups=claim_groups,
)
missing_slot_keys = self._resolve_review_missing_slot_keys(
payload,
slot_cards=slot_cards,
)
risk_briefs = self._build_review_risk_briefs(
payload,
citations=citations,
document_cards=document_cards,
claim_groups=claim_groups,
)
can_proceed = self._can_proceed_review(
payload,
missing_slot_keys=missing_slot_keys,
claim_groups=claim_groups,
)
confirmation_actions = self._build_review_confirmation_actions(
payload,
can_proceed=can_proceed,
claim_groups=claim_groups,
draft_payload=draft_payload,
)
edit_fields = self._build_review_edit_fields(
payload,
draft_payload=draft_payload,
slot_cards=slot_cards,
)
intent_summary = self._build_review_intent_summary(
payload,
slot_cards=slot_cards,
claim_groups=claim_groups,
)
body_message = self._build_review_body_message(
payload,
can_proceed=can_proceed,
draft_payload=draft_payload,
missing_slot_labels=[SLOT_LABELS.get(key, key) for key in missing_slot_keys],
)
return UserAgentReviewPayload(
intent_summary=intent_summary,
body_message=body_message,
scenario=payload.ontology.scenario,
intent=payload.ontology.intent,
can_proceed=can_proceed,
missing_slots=[SLOT_LABELS.get(key, key) for key in missing_slot_keys],
risk_briefs=risk_briefs,
slot_cards=slot_cards,
document_cards=document_cards,
claim_groups=claim_groups,
confirmation_actions=confirmation_actions,
edit_fields=edit_fields,
)
def _build_review_slot_cards(
self,
payload: UserAgentRequest,
*,
ocr_documents: list[dict[str, object]],
claim_groups: list[UserAgentReviewClaimGroup],
) -> list[UserAgentReviewSlotCard]:
entity_map = self._collect_entity_values(payload)
time_slot = self._build_time_slot(payload)
location_slot = self._build_location_slot(payload)
customer_slot = self._build_customer_slot(payload, entity_map=entity_map)
participants_slot = self._build_participants_slot(payload, entity_map=entity_map)
amount_slot = self._build_amount_slot(payload, entity_map=entity_map, ocr_documents=ocr_documents)
expense_type_slot = self._build_expense_type_slot(
payload,
entity_map=entity_map,
ocr_documents=ocr_documents,
)
merchant_slot = self._build_merchant_slot(payload, ocr_documents=ocr_documents)
reason_slot = self._build_reason_slot(payload)
attachment_slot = self._build_attachment_slot(payload)
required_keys = self._resolve_required_review_keys(
payload,
primary_expense_type=str(expense_type_slot["normalized_value"] or ""),
claim_groups=claim_groups,
)
cards = [
self._make_slot_card(
key="expense_type",
value=expense_type_slot["value"],
raw_value=expense_type_slot["raw_value"],
normalized_value=expense_type_slot["normalized_value"],
source=expense_type_slot["source"],
confidence=expense_type_slot["confidence"],
evidence=expense_type_slot["evidence"],
required="expense_type" in required_keys,
),
self._make_slot_card(
key="customer_name",
value=customer_slot["value"],
raw_value=customer_slot["raw_value"],
normalized_value=customer_slot["normalized_value"],
source=customer_slot["source"],
confidence=customer_slot["confidence"],
evidence=customer_slot["evidence"],
required="customer_name" in required_keys,
),
self._make_slot_card(
key="time_range",
value=time_slot["value"],
raw_value=time_slot["raw_value"],
normalized_value=time_slot["normalized_value"],
source=time_slot["source"],
confidence=time_slot["confidence"],
evidence=time_slot["evidence"],
required="time_range" in required_keys,
),
self._make_slot_card(
key="location",
value=location_slot["value"],
raw_value=location_slot["raw_value"],
normalized_value=location_slot["normalized_value"],
source=location_slot["source"],
confidence=location_slot["confidence"],
evidence=location_slot["evidence"],
required="location" in required_keys,
),
self._make_slot_card(
key="merchant_name",
value=merchant_slot["value"],
raw_value=merchant_slot["raw_value"],
normalized_value=merchant_slot["normalized_value"],
source=merchant_slot["source"],
confidence=merchant_slot["confidence"],
evidence=merchant_slot["evidence"],
required="merchant_name" in required_keys,
),
self._make_slot_card(
key="amount",
value=amount_slot["value"],
raw_value=amount_slot["raw_value"],
normalized_value=amount_slot["normalized_value"],
source=amount_slot["source"],
confidence=amount_slot["confidence"],
evidence=amount_slot["evidence"],
required="amount" in required_keys,
),
self._make_slot_card(
key="reason",
value=reason_slot["value"],
raw_value=reason_slot["raw_value"],
normalized_value=reason_slot["normalized_value"],
source=reason_slot["source"],
confidence=reason_slot["confidence"],
evidence=reason_slot["evidence"],
required="reason" in required_keys,
),
self._make_slot_card(
key="participants",
value=participants_slot["value"],
raw_value=participants_slot["raw_value"],
normalized_value=participants_slot["normalized_value"],
source=participants_slot["source"],
confidence=participants_slot["confidence"],
evidence=participants_slot["evidence"],
required="participants" in required_keys,
),
self._make_slot_card(
key="attachments",
value=attachment_slot["value"],
raw_value=attachment_slot["raw_value"],
normalized_value=attachment_slot["normalized_value"],
source=attachment_slot["source"],
confidence=attachment_slot["confidence"],
evidence=attachment_slot["evidence"],
required="attachments" in required_keys,
),
]
return cards
def _build_review_document_cards(
self,
payload: UserAgentRequest,
*,
ocr_documents: list[dict[str, object]],
) -> list[UserAgentReviewDocumentCard]:
cards: list[UserAgentReviewDocumentCard] = []
for index, item in enumerate(ocr_documents, start=1):
classified = self._classify_document(item, payload)
fields = self._extract_document_fields(item)
cards.append(
UserAgentReviewDocumentCard(
index=index,
filename=str(item.get("filename") or f"document-{index}"),
document_type=classified["document_type"],
suggested_expense_type=classified["expense_type"],
scene_label=GROUP_SCENE_LABELS.get(
classified["group_code"],
classified["scene_label"],
),
summary=str(item.get("summary") or item.get("text") or "").strip(),
avg_score=float(item.get("avg_score") or 0.0),
warnings=[str(warning) for warning in item.get("warnings", []) if str(warning).strip()],
fields=[
UserAgentReviewDocumentField(
label=label,
value=value,
source="ocr",
)
for label, value in fields.items()
if str(value).strip()
],
)
)
return cards
def _build_review_claim_groups(
self,
payload: UserAgentRequest,
*,
document_cards: list[UserAgentReviewDocumentCard],
) -> list[UserAgentReviewClaimGroup]:
groups: dict[str, dict[str, object]] = {}
for card in document_cards:
group_code = self._normalize_group_code(card.suggested_expense_type)
bucket = groups.setdefault(
group_code,
{
"document_indexes": [],
"amount_total": 0.0,
"expense_type": group_code,
"scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
"reasons": [],
},
)
bucket["document_indexes"].append(card.index)
bucket["amount_total"] = float(bucket["amount_total"]) + self._extract_amount_from_card(card)
bucket["reasons"].append(f"{card.filename} 识别为 {card.scene_label}")
if not groups:
expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "other")
group_code = self._normalize_group_code(expense_type_code)
groups[group_code] = {
"document_indexes": [],
"amount_total": self._resolve_amount_value(payload),
"expense_type": expense_type_code or "other",
"scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
"reasons": ["当前主要依据用户文本和页面上下文进行分单建议。"],
}
claim_groups: list[UserAgentReviewClaimGroup] = []
for index, (group_code, bucket) in enumerate(groups.items(), start=1):
title = f"建议报销单 {index}{bucket['scene_label']}"
rationale = (
"".join(dict.fromkeys(str(item) for item in bucket["reasons"]))
if bucket["reasons"]
else "当前仅有单一场景,无需拆单。"
)
claim_groups.append(
UserAgentReviewClaimGroup(
group_code=group_code,
title=title,
expense_type=str(bucket["expense_type"]),
scene_label=str(bucket["scene_label"]),
document_indexes=list(bucket["document_indexes"]),
amount_total=round(float(bucket["amount_total"]), 2),
rationale=rationale,
)
)
return claim_groups
def _build_review_risk_briefs(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
document_cards: list[UserAgentReviewDocumentCard],
claim_groups: list[UserAgentReviewClaimGroup],
) -> list[UserAgentReviewRiskBrief]:
briefs: list[UserAgentReviewRiskBrief] = []
employee_name = self._collect_entity_values(payload).get("employee_name") or str(
payload.context_json.get("name") or ""
).strip()
if employee_name:
since = datetime.now(UTC) - timedelta(days=90)
stmt = select(ExpenseClaim).where(
ExpenseClaim.employee_name == employee_name,
ExpenseClaim.occurred_at >= since,
)
recent_claims = list(self.db.scalars(stmt).all())
if recent_claims:
risky_count = sum(1 for item in recent_claims if item.risk_flags_json)
draft_count = sum(1 for item in recent_claims if item.status == "draft")
briefs.append(
UserAgentReviewRiskBrief(
title="历史报销画像",
level="info",
content=(
f"{employee_name} 最近 90 天共有 {len(recent_claims)} 笔报销,"
f"其中 {risky_count} 笔带风险标记,{draft_count} 笔仍处于草稿态。"
),
)
)
current_amount = self._resolve_amount_value(payload)
if current_amount > 0:
duplicate_count = sum(
1
for item in recent_claims
if abs(float(item.amount) - current_amount) < 0.01
)
if duplicate_count:
briefs.append(
UserAgentReviewRiskBrief(
title="金额重复预警",
level="warning",
content=(
f"近 90 天发现 {duplicate_count} 笔金额相同的报销记录,"
"提交前建议核对是否为重复报销或拆分不当。"
),
)
)
if citations:
briefs.append(
UserAgentReviewRiskBrief(
title="制度注意事项",
level="info",
content=citations[0].excerpt or f"请先核对 {citations[0].title} 的制度要求。",
)
)
warning_count = sum(len(item.warnings) for item in document_cards)
if warning_count:
briefs.append(
UserAgentReviewRiskBrief(
title="票据识别提醒",
level="warning",
content=f"当前共有 {warning_count} 条票据识别提示,建议逐张确认 OCR 识别字段。",
)
)
if len(claim_groups) > 1:
briefs.append(
UserAgentReviewRiskBrief(
title="建议拆单",
level="high",
content=f"系统检测到 {len(claim_groups)} 类费用场景,建议拆成多张报销单后再提交。",
)
)
return briefs[:4]
def _build_review_confirmation_actions(
self,
payload: UserAgentRequest,
*,
can_proceed: bool,
claim_groups: list[UserAgentReviewClaimGroup],
draft_payload: UserAgentDraftPayload | None,
) -> list[UserAgentReviewAction]:
primary_action = UserAgentReviewAction(
label="下一步" if can_proceed else "保存草稿",
action_type="next_step" if can_proceed else "save_draft",
description=(
"当前识别信息已满足继续流转条件,确认后进入下一步。"
if can_proceed
else "当前信息仍未补齐,先保存为草稿,后续可继续补充。"
),
emphasis="primary",
)
if len(claim_groups) > 1 and can_proceed:
primary_action.description = f"系统建议拆分为 {len(claim_groups)} 张报销单,确认后进入下一步。"
if draft_payload is not None and draft_payload.claim_no and not can_proceed:
primary_action.description = f"会先保存到草稿 {draft_payload.claim_no},缺失信息后续再补。"
return [
UserAgentReviewAction(
label="取消",
action_type="cancel_review",
description="放弃当前识别结果,并退出本次核对流程。",
emphasis="secondary",
),
UserAgentReviewAction(
label="修改识别信息",
action_type="edit_review",
description="打开结构化模板,按已识别字段逐项修改。",
emphasis="secondary",
),
primary_action,
]
def _build_review_intent_summary(
self,
payload: UserAgentRequest,
*,
slot_cards: list[UserAgentReviewSlotCard],
claim_groups: list[UserAgentReviewClaimGroup],
) -> str:
slots = {item.key: item for item in slot_cards}
expense_type = slots.get("expense_type")
amount = slots.get("amount")
time_range = slots.get("time_range")
location = slots.get("location")
customer = slots.get("customer_name")
summary = "我先按你当前提供的信息整理出一笔报销。"
if expense_type and expense_type.value:
summary = f"我理解你这次想报销{expense_type.value}"
details: list[str] = []
if customer and customer.value:
details.append(f"客户名称:{customer.value}")
if time_range and time_range.value:
details.append(f"时间:{time_range.value}")
if location and location.value:
details.append(f"地点:{location.value}")
if amount and amount.value:
details.append(f"金额:{amount.value}")
if details:
return f"{summary} {''.join(details)}"
return summary
def _build_review_body_answer(
self,
payload: UserAgentRequest,
*,
review_payload: UserAgentReviewPayload | None,
draft_payload: UserAgentDraftPayload | None,
) -> str | None:
if review_payload is None:
return None
if payload.ontology.scenario != "expense":
return None
if payload.ontology.intent not in {"draft", "operate"}:
return None
if payload.tool_payload.get("draft_limit_reached"):
return (
str(payload.tool_payload.get("message") or "").strip()
or "你当前已保存 3 个草稿,请先完成已保存的草稿,才能再次新建草稿。"
)
review_action = str(payload.context_json.get("review_action") or "").strip()
if review_action == "save_draft":
if draft_payload is not None and draft_payload.claim_no:
return (
f"我已经把本轮识别结果整理好了,右侧可以继续核对。"
f"当前先替你保存到草稿 {draft_payload.claim_no},后面把缺的信息补齐就可以继续。"
)
return "我已经把本轮识别结果整理好了,右侧可以继续核对。当前信息还没补全,我先按你的要求保存为草稿。"
if review_action == "next_step":
return "我已经把识别到的关键信息整理好了,右侧是本轮识别结果。你确认无误后,可以直接进入下一步。"
if review_action == "edit_review":
return "我已经按你修改后的内容重新识别了一遍。右侧是最新结果,下方还有待补信息和注意事项,你继续确认即可。"
return review_payload.body_message or None
def _build_review_body_message(
self,
payload: UserAgentRequest,
*,
can_proceed: bool,
draft_payload: UserAgentDraftPayload | None,
missing_slot_labels: list[str],
) -> str:
if can_proceed:
return "我已经把识别结果整理在右侧了。当前关键信息基本齐全,你核对无误后可以直接点“下一步”继续处理。"
missing_hint = "".join(missing_slot_labels[:4])
missing_message = f"当前还缺少 {missing_hint}" if missing_hint else "当前仍有信息待补充。"
if draft_payload is not None and draft_payload.claim_no:
return (
f"我先根据你当前提供的信息完成了初步识别,右侧是识别结果。{missing_message}"
f"如果现在还拿不全,也可以先保存到草稿 {draft_payload.claim_no},后面再补。"
)
return (
f"我先根据你当前提供的信息完成了初步识别,右侧是识别结果。{missing_message}"
"你可以继续补充;如果暂时不方便提供,也可以先保存草稿。"
)
@staticmethod
def _can_proceed_review(
payload: UserAgentRequest,
*,
missing_slot_keys: list[str],
claim_groups: list[UserAgentReviewClaimGroup],
) -> bool:
if payload.ontology.ambiguity:
return False
if missing_slot_keys:
return False
if not claim_groups:
return False
return True
def _build_review_edit_fields(
self,
payload: UserAgentRequest,
*,
draft_payload: UserAgentDraftPayload | None,
slot_cards: list[UserAgentReviewSlotCard],
) -> list[UserAgentReviewEditField]:
slot_map = {item.key: item for item in slot_cards}
employee = self._resolve_employee_profile(payload)
reporter_name = (
slot_map.get("reporter_name").value
if slot_map.get("reporter_name")
else str(payload.context_json.get("name") or "").strip()
)
manager_name = self._resolve_manager_name(employee)
reason = slot_map.get("reason").value if slot_map.get("reason") else ""
attachments = "".join(self._resolve_attachment_names(payload))
fields = [
UserAgentReviewEditField(
key="claim_no",
label="报销单据编号",
value=str(draft_payload.claim_no if draft_payload is not None and draft_payload.claim_no else "待生成"),
placeholder="保存草稿后自动生成",
required=False,
group="basic",
),
UserAgentReviewEditField(
key="expense_type",
label="报销类型",
value=slot_map.get("expense_type").value if slot_map.get("expense_type") else "",
placeholder="例如:业务招待费 / 差旅费",
group="basic",
),
UserAgentReviewEditField(
key="occurred_date",
label="业务发生时间",
value=slot_map.get("time_range").normalized_value if slot_map.get("time_range") and slot_map.get("time_range").normalized_value else slot_map.get("time_range").value if slot_map.get("time_range") else "",
placeholder="例如2026-05-11",
group="basic",
),
UserAgentReviewEditField(
key="reporter_name",
label="报销人",
value=reporter_name,
placeholder="请输入报销人姓名",
group="basic",
),
UserAgentReviewEditField(
key="manager_name",
label="直属上司姓名",
value=manager_name,
placeholder="请输入直属上司姓名",
required=False,
group="basic",
),
UserAgentReviewEditField(
key="customer_name",
label="客户名称",
value=slot_map.get("customer_name").value if slot_map.get("customer_name") else "",
placeholder="请输入客户名称",
group="business",
),
UserAgentReviewEditField(
key="business_location",
label="业务地点",
value=slot_map.get("location").normalized_value if slot_map.get("location") and slot_map.get("location").normalized_value else slot_map.get("location").value if slot_map.get("location") else "",
placeholder="例如:北京 / 客户现场",
required=False,
group="business",
),
UserAgentReviewEditField(
key="merchant_name",
label="酒店/商户",
value=slot_map.get("merchant_name").value if slot_map.get("merchant_name") else "",
placeholder="请输入酒店或商户名称",
required=False,
group="business",
),
UserAgentReviewEditField(
key="amount",
label="金额",
value=slot_map.get("amount").normalized_value if slot_map.get("amount") and slot_map.get("amount").normalized_value else slot_map.get("amount").value if slot_map.get("amount") else "",
placeholder="例如200.00元",
group="business",
),
UserAgentReviewEditField(
key="participants",
label="参与人员",
value=slot_map.get("participants").value if slot_map.get("participants") else "",
placeholder="例如:客户 2 人,我方 1 人",
group="business",
),
UserAgentReviewEditField(
key="reason",
label="事由",
value=reason,
placeholder="请输入报销事由",
field_type="textarea",
group="business",
),
UserAgentReviewEditField(
key="attachment_names",
label="附件清单",
value=attachments,
placeholder="例如:发票.jpg、行程单.png",
required=False,
field_type="textarea",
group="attachments",
),
]
return fields
def _resolve_employee_profile(self, payload: UserAgentRequest) -> Employee | None:
candidates = [
str(payload.context_json.get("name") or "").strip(),
str(payload.user_id or "").strip(),
self._collect_entity_values(payload).get("employee_name", ""),
]
normalized = [item for item in dict.fromkeys(candidates) if item]
if not normalized:
return None
stmt = (
select(Employee)
.where(
or_(
Employee.name.in_(normalized),
Employee.employee_no.in_(normalized),
Employee.email.in_(normalized),
)
)
.limit(1)
)
return self.db.scalar(stmt)
@staticmethod
def _resolve_manager_name(employee: Employee | None) -> str:
if employee is None:
return ""
if employee.manager is not None and employee.manager.name:
return employee.manager.name
if employee.organization_unit is not None and employee.organization_unit.manager_name:
return employee.organization_unit.manager_name
return ""
@staticmethod
def _extract_message_reason(message: str) -> str:
for line in str(message or "").splitlines():
cleaned = line.strip()
if not cleaned:
continue
if cleaned.startswith(("附件名称:", "OCR摘要", "关联单号:")):
continue
return cleaned[:300]
return ""
@classmethod
def _resolve_reason_text(cls, message: str) -> str:
reason = cls._extract_message_reason(message)
if not reason:
return ""
compact = re.sub(r"\s+", "", reason)
if compact in GENERIC_EXPENSE_PROMPTS:
return ""
instruction_prefixes = (
"帮我生成",
"请帮我生成",
"生成",
"起草",
"创建",
"发起",
"准备",
"帮我报销",
"我要报销",
"我想报销",
)
if compact.startswith(instruction_prefixes):
for separator in ("", ",", "", "", ";", "", ":"):
if separator in reason:
trailing = reason.split(separator, 1)[1].strip()
if trailing:
return trailing[:300]
return ""
return reason
@staticmethod
def _should_skip_model_answer(
payload: UserAgentRequest,
review_payload: UserAgentReviewPayload | None,
) -> bool:
if review_payload is None:
return False
return payload.ontology.scenario == "expense" and (
payload.ontology.intent == "draft"
or int(payload.context_json.get("attachment_count") or 0) > 0
)
def _build_rule_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]:
domain = self._resolve_domain(payload.ontology.scenario)
items = self.asset_service.list_assets(
asset_type=AgentAssetType.RULE.value,
status=AgentAssetStatus.ACTIVE.value,
domain=domain,
)
ranked = self._rank_rule_assets(items, payload)
citations: list[UserAgentCitation] = []
for item in ranked[:2]:
detail = self.asset_service.get_asset(item.id)
if detail is None:
continue
excerpt = self._extract_excerpt(str(detail.current_version_content or ""))
citations.append(
UserAgentCitation(
source_type="rule",
code=detail.code,
title=detail.name,
version=detail.current_version,
updated_at=detail.updated_at.date().isoformat(),
excerpt=excerpt,
)
)
return citations
@staticmethod
def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]:
tool_flags = payload.tool_payload.get("risk_flags")
if isinstance(tool_flags, list) and tool_flags:
return [str(item) for item in tool_flags]
return [str(item) for item in payload.ontology.risk_flags]
@staticmethod
def _resolve_subject(payload: UserAgentRequest) -> str:
named_entities = [
item.value
for item in payload.ontology.entities
if item.type in {"employee", "customer", "vendor", "project"}
]
if named_entities:
return f"{''.join(named_entities)} 相关数据"
return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据"
@staticmethod
def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool:
if payload.ontology.scenario != "expense":
return False
normalized_message = re.sub(r"\s+", "", payload.message)
return normalized_message in GENERIC_EXPENSE_PROMPTS
@staticmethod
def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool:
if payload.ontology.scenario != "expense" or payload.ontology.intent != "draft":
return False
compact_message = re.sub(r"\s+", "", payload.message)
if any(keyword in compact_message for keyword in EXPLICIT_DRAFT_KEYWORDS):
return False
return True
@staticmethod
def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]:
names = payload.context_json.get("attachment_names")
if not isinstance(names, list):
return []
return [str(name) for name in names if str(name).strip()]
@staticmethod
def _resolve_attachment_count(payload: UserAgentRequest) -> int:
names = UserAgentService._resolve_attachment_names(payload)
if names:
return len(names)
try:
return max(0, int(payload.context_json.get("attachment_count") or 0))
except (TypeError, ValueError):
return 0
@staticmethod
def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]:
documents = payload.context_json.get("ocr_documents")
if not isinstance(documents, list):
return []
normalized: list[dict[str, object]] = []
for item in documents[:8]:
if not isinstance(item, dict):
continue
normalized.append(item)
return normalized
@staticmethod
def _resolve_conversation_history(payload: UserAgentRequest) -> list[dict[str, object]]:
history = payload.context_json.get("conversation_history")
if not isinstance(history, list):
return []
normalized: list[dict[str, object]] = []
for item in history[-8:]:
if not isinstance(item, dict):
continue
role = str(item.get("role") or "").strip()
content = str(item.get("content") or "").strip()
if not role or not content:
continue
normalized.append({"role": role, "content": content})
return normalized
@staticmethod
def _resolve_domain(scenario: str) -> str | None:
if scenario == "expense":
return "expense"
if scenario == "accounts_receivable":
return "ar"
if scenario == "accounts_payable":
return "ap"
return None
@staticmethod
def _rank_rule_assets(
items: list[AgentAssetListItem],
payload: UserAgentRequest,
) -> list[AgentAssetListItem]:
def score(item: AgentAssetListItem) -> tuple[int, str]:
tags = {str(value) for value in item.scenario_json or []}
weight = 0
if payload.ontology.scenario in tags:
weight += 3
if payload.ontology.intent in tags:
weight += 2
for risk_flag in payload.ontology.risk_flags:
if risk_flag in tags:
weight += 4
return weight, item.code
ranked = sorted(items, key=score, reverse=True)
return [item for item in ranked if score(item)[0] > 0]
@staticmethod
def _extract_excerpt(content: str) -> str:
lines = [line.strip() for line in str(content).splitlines() if line.strip()]
cleaned: list[str] = []
for line in lines:
normalized = re.sub(r"^[#>\-\*\d\.\s`]+", "", line).strip()
if normalized:
cleaned.append(normalized)
if len(cleaned) >= 2:
break
return "".join(cleaned[:2])
def _collect_entity_values(self, payload: UserAgentRequest) -> dict[str, str]:
values = {
"employee_name": "",
"customer": "",
"participants": "",
"amount": "",
"expense_type": "",
"expense_type_code": "",
}
participants: list[str] = []
for item in payload.ontology.entities:
if item.type == "employee" and not values["employee_name"]:
values["employee_name"] = item.value
elif item.type == "customer" and not values["customer"]:
values["customer"] = item.value
elif item.type == "amount" and item.role != "threshold" and not values["amount"]:
values["amount"] = f"{item.value}" if "" not in item.value else item.value
elif item.type == "expense_type" and not values["expense_type_code"]:
values["expense_type_code"] = item.normalized_value
values["expense_type"] = EXPENSE_TYPE_LABELS.get(
item.normalized_value,
item.value,
)
elif item.type in {"participant", "person"} and item.value.strip():
participants.append(item.value.strip())
if participants:
values["participants"] = "".join(dict.fromkeys(participants))
return values
def _format_time_range(self, payload: UserAgentRequest) -> str:
time_range = payload.ontology.time_range
if time_range.start_date and time_range.end_date:
if time_range.start_date == time_range.end_date:
return time_range.start_date
normalized = f"{time_range.start_date}{time_range.end_date}"
return normalized
if time_range.raw:
return time_range.raw
return ""
def _resolve_location_value(self, payload: UserAgentRequest) -> str:
review_form_values = self._resolve_review_form_values(payload)
for key in ("business_location", "location"):
value = str(review_form_values.get(key) or "").strip()
if value:
return value
if str(payload.context_json.get("entry_source") or "").strip() == "detail":
request_context = payload.context_json.get("request_context")
if isinstance(request_context, dict):
for key in ("city", "location"):
value = str(request_context.get(key) or "").strip()
if value:
return value
labeled_match = re.search(r"(?:业务地点|发生地点|地点)[:]\s*(?P<value>[^\n]+)", payload.message)
if labeled_match:
return labeled_match.group("value").strip()
city_match = re.search(r"去(?P<city>[\u4e00-\u9fa5]{2,8})(?:出差|拜访|参会|见客户|客户现场)", payload.message)
if city_match:
return city_match.group("city").strip()
if "客户现场" in payload.message.replace(" ", ""):
return "客户现场"
return ""
@staticmethod
def _resolve_review_form_values(payload: UserAgentRequest) -> dict[str, str]:
values = payload.context_json.get("review_form_values")
if not isinstance(values, dict):
return {}
normalized: dict[str, str] = {}
for key, value in values.items():
cleaned_key = str(key or "").strip()
if not cleaned_key:
continue
normalized[cleaned_key] = str(value or "").strip()
return normalized
@staticmethod
def _build_slot_value(
*,
value: str = "",
raw_value: str = "",
normalized_value: str = "",
source: str = "system",
confidence: float = 0.0,
evidence: str = "",
) -> dict[str, str | float]:
return {
"value": str(value or "").strip(),
"raw_value": str(raw_value or "").strip(),
"normalized_value": str(normalized_value or "").strip(),
"source": str(source or "system").strip() or "system",
"confidence": float(confidence),
"evidence": str(evidence or "").strip(),
}
def _build_time_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
edited_value = str(
review_form_values.get("occurred_date")
or review_form_values.get("time_range")
or review_form_values.get("business_time")
or ""
).strip()
if edited_value:
raw_value = str(review_form_values.get("time_range_raw") or edited_value).strip()
return self._build_slot_value(
value=edited_value,
raw_value=raw_value,
normalized_value=edited_value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
time_range = payload.ontology.time_range
if time_range.start_date and time_range.end_date:
normalized_value = (
time_range.start_date
if time_range.start_date == time_range.end_date
else f"{time_range.start_date}{time_range.end_date}"
)
raw_value = str(time_range.raw or "").strip()
return self._build_slot_value(
value=normalized_value,
raw_value=raw_value,
normalized_value=normalized_value,
source="user_text",
confidence=0.92,
evidence="系统已根据当前日期将相对时间换算为标准日期。",
)
return self._build_slot_value()
def _build_location_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
for key in ("business_location", "location"):
value = str(review_form_values.get(key) or "").strip()
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
if str(payload.context_json.get("entry_source") or "").strip() == "detail":
request_context = payload.context_json.get("request_context")
if isinstance(request_context, dict):
for key in ("city", "location"):
value = str(request_context.get(key) or "").strip()
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="detail_context",
confidence=0.68,
evidence="来源于当前关联单据,仅作为辅助上下文,需要用户再次核对。",
)
value = self._resolve_location_value(payload)
if value:
evidence = "用户在文本中明确描述了业务地点。"
if value == "客户现场":
evidence = "用户明确提到“客户现场”,但未提供具体城市或地址。"
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_text",
confidence=0.82,
evidence=evidence,
)
return self._build_slot_value()
def _build_customer_slot(
self,
payload: UserAgentRequest,
*,
entity_map: dict[str, str],
) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
value = str(review_form_values.get("customer_name") or "").strip()
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
value = entity_map.get("customer", "")
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_text",
confidence=0.88,
evidence="用户在原始描述中直接提到了客户对象。",
)
return self._build_slot_value()
def _build_participants_slot(
self,
payload: UserAgentRequest,
*,
entity_map: dict[str, str],
) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
value = str(review_form_values.get("participants") or "").strip()
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
value = entity_map.get("participants", "")
if value:
return self._build_slot_value(
value=value,
normalized_value=value,
source="user_text",
confidence=0.8,
evidence="用户在当前描述中补充了参与人员。",
)
return self._build_slot_value()
def _build_reason_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
edited_value = str(review_form_values.get("reason") or "").strip()
if edited_value:
return self._build_slot_value(
value=edited_value,
raw_value=edited_value,
normalized_value=edited_value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
reason_value = self._resolve_reason_text(payload.message)
if reason_value:
return self._build_slot_value(
value=reason_value,
raw_value=reason_value,
normalized_value=reason_value,
source="user_text",
confidence=0.76,
evidence="系统从用户原始描述中提取了本次费用事由,建议继续核对。",
)
return self._build_slot_value()
def _build_amount_slot(
self,
payload: UserAgentRequest,
*,
entity_map: dict[str, str],
ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
edited_amount = str(review_form_values.get("amount") or "").strip()
if edited_amount:
normalized = self._normalize_amount_text(edited_amount)
return self._build_slot_value(
value=normalized,
raw_value=edited_amount,
normalized_value=normalized,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
amount_value = entity_map.get("amount", "")
if amount_value:
normalized = self._normalize_amount_text(amount_value)
return self._build_slot_value(
value=normalized,
raw_value=amount_value,
normalized_value=normalized,
source="user_text",
confidence=0.92,
evidence="用户在原始描述中直接给出了金额。",
)
ocr_total_amount = self._sum_ocr_amounts(ocr_documents)
if ocr_total_amount > 0:
normalized = f"{ocr_total_amount:.2f}"
return self._build_slot_value(
value=normalized,
normalized_value=normalized,
source="ocr",
confidence=0.76,
evidence="金额来自 OCR 汇总结果,仍建议用户核对票据原文。",
)
return self._build_slot_value()
def _build_expense_type_slot(
self,
payload: UserAgentRequest,
*,
entity_map: dict[str, str],
ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
edited_value = str(review_form_values.get("expense_type") or review_form_values.get("reimbursement_type") or "").strip()
if edited_value:
normalized_code, normalized_label = self._normalize_expense_type_input(edited_value)
return self._build_slot_value(
value=normalized_label,
raw_value=edited_value,
normalized_value=normalized_code,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
expense_type_code = entity_map.get("expense_type_code", "")
expense_type_value = EXPENSE_TYPE_LABELS.get(expense_type_code, entity_map.get("expense_type", ""))
if expense_type_value:
return self._build_slot_value(
value=expense_type_value,
raw_value=expense_type_value,
normalized_value=expense_type_code,
source="user_text",
confidence=0.9,
evidence="系统根据用户描述中的业务场景判断费用类型。",
)
inferred_label = self._infer_expense_type_from_documents(payload, ocr_documents) if ocr_documents else ""
if inferred_label:
normalized_code, normalized_label = self._normalize_expense_type_input(inferred_label)
return self._build_slot_value(
value=normalized_label,
raw_value=inferred_label,
normalized_value=normalized_code,
source="ocr",
confidence=0.74,
evidence="系统根据票据内容推断费用类型,仍建议用户确认。",
)
return self._build_slot_value()
def _build_merchant_slot(
self,
payload: UserAgentRequest,
*,
ocr_documents: list[dict[str, object]],
) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
edited_value = str(review_form_values.get("merchant_name") or "").strip()
if edited_value:
return self._build_slot_value(
value=edited_value,
normalized_value=edited_value,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
merchant_value = self._extract_document_merchant_name(ocr_documents[0]) if ocr_documents else ""
if merchant_value:
return self._build_slot_value(
value=merchant_value,
normalized_value=merchant_value,
source="ocr",
confidence=0.72,
evidence="商户名称来自 OCR 票据识别结果,仍建议用户核对。",
)
return self._build_slot_value()
def _build_attachment_slot(self, payload: UserAgentRequest) -> dict[str, str | float]:
review_form_values = self._resolve_review_form_values(payload)
attachment_names = str(review_form_values.get("attachment_names") or "").strip()
if attachment_names:
return self._build_slot_value(
value=attachment_names,
normalized_value=attachment_names,
source="user_form",
confidence=1.0,
evidence="来源于用户修改后的结构化表单。",
)
count = self._resolve_attachment_count(payload)
if count > 0:
names = self._resolve_attachment_names(payload)
value = "".join(names) if names else f"{count} 份附件"
return self._build_slot_value(
value=value,
raw_value=value,
normalized_value=str(count),
source="upload",
confidence=1.0,
evidence="系统已接收到用户上传的附件。",
)
return self._build_slot_value()
@staticmethod
def _normalize_amount_text(value: str) -> str:
cleaned = str(value or "").strip()
if not cleaned:
return ""
match = AMOUNT_TEXT_PATTERN.search(cleaned)
if not match:
return cleaned
number = float(match.group(1))
return f"{number:.2f}"
@staticmethod
def _normalize_expense_type_input(value: str) -> tuple[str, str]:
compact = str(value or "").replace(" ", "")
if "招待" in compact or ("客户" in compact and any(keyword in compact for keyword in ("吃饭", "用餐", "宴请", "请客"))):
return "entertainment", "业务招待费"
if any(keyword in compact for keyword in ("差旅", "出差", "机票", "行程")):
return "travel", "差旅费"
if any(keyword in compact for keyword in ("住宿", "酒店", "宾馆")):
return "hotel", "住宿费"
if any(keyword in compact for keyword in ("交通", "打车", "网约车", "出租车", "车费", "停车")):
return "transport", "交通费"
if any(keyword in compact for keyword in ("餐费", "用餐", "午餐", "晚餐", "早餐", "伙食")):
return "meal", "餐费"
if "会务" in compact:
return "meeting", "会务费"
if any(keyword in compact for keyword in ("办公费", "办公用品", "文具", "耗材", "办公耗材", "打印纸", "办公设备", "键盘", "鼠标", "白板")):
return "office", "办公费"
if any(keyword in compact for keyword in ("培训费", "培训", "讲师费", "课时费", "课程费")):
return "training", "培训费"
if any(keyword in compact for keyword in ("通讯费", "话费", "流量费", "宽带费")):
return "communication", "通讯费"
if any(keyword in compact for keyword in ("福利费", "团建", "慰问", "节日福利", "体检费")):
return "welfare", "福利费"
return "other", str(value or "").strip() or "其他费用"
def _resolve_required_review_keys(
self,
payload: UserAgentRequest,
*,
primary_expense_type: str,
claim_groups: list[UserAgentReviewClaimGroup],
) -> set[str]:
required = {"expense_type", "time_range", "amount", "reason", "attachments"}
scene_codes = {
str(item.group_code or "").strip()
for item in claim_groups
if str(item.group_code or "").strip()
}
if primary_expense_type:
scene_codes.add(primary_expense_type)
compact_message = re.sub(r"\s+", "", payload.message)
if "entertainment" in scene_codes or (
"客户" in compact_message and any(keyword in compact_message for keyword in ("招待", "吃饭", "用餐", "宴请", "请客"))
):
required.update({"customer_name", "participants"})
return required
@staticmethod
def _resolve_review_missing_slot_keys(
payload: UserAgentRequest,
*,
slot_cards: list[UserAgentReviewSlotCard],
) -> list[str]:
required_keys = {item.key for item in slot_cards if item.required}
missing_keys = {
item.key
for item in slot_cards
if item.required and (item.status == "missing" or not str(item.value).strip())
}
for key in payload.ontology.missing_slots:
normalized_key = str(key or "").strip()
if normalized_key and normalized_key in required_keys:
missing_keys.add(normalized_key)
ordered_keys: list[str] = []
for item in slot_cards:
if item.required and item.key in missing_keys and item.key not in ordered_keys:
ordered_keys.append(item.key)
return ordered_keys
def _make_slot_card(
self,
*,
key: str,
value: str,
raw_value: str,
normalized_value: str,
source: str,
confidence: float,
evidence: str,
required: bool = True,
) -> UserAgentReviewSlotCard:
is_missing = required and not str(value).strip()
source_key = source if source in SOURCE_LABELS else "system"
return UserAgentReviewSlotCard(
key=key,
label=SLOT_LABELS.get(key, key),
value=str(value or "").strip(),
raw_value=str(raw_value or "").strip(),
normalized_value=str(normalized_value or "").strip(),
source=source,
source_label=SOURCE_LABELS.get(source_key, "系统判断"),
confidence=confidence,
required=required,
confirmed=not is_missing and source in {"user_text", "user_form"},
status="missing" if is_missing else "identified" if source in {"user_text", "user_form"} else "inferred",
hint=f"建议补充 {SLOT_LABELS.get(key, key)}"
if is_missing and required
else ("该字段来自系统辅助上下文,建议你再核对一次。" if source in {"detail_context", "ocr"} else ""),
evidence=evidence,
)
def _classify_document(
self,
item: dict[str, object],
payload: UserAgentRequest,
) -> dict[str, str]:
text = " ".join(
[
str(item.get("filename") or ""),
str(item.get("summary") or ""),
str(item.get("text") or ""),
]
).lower()
compact = text.replace(" ", "")
expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "")
has_customer = bool(self._collect_entity_values(payload).get("customer"))
if any(keyword in compact for keyword in ("机票", "航班", "火车", "高铁", "行程单")):
return {
"document_type": "travel_ticket",
"expense_type": "travel",
"group_code": "travel",
"scene_label": "差旅票据",
}
if any(keyword in compact for keyword in ("酒店", "住宿", "宾馆")):
return {
"document_type": "hotel_invoice",
"expense_type": "hotel",
"group_code": "travel",
"scene_label": "住宿票据",
}
if any(keyword in compact for keyword in ("打车", "出租车", "滴滴", "网约车", "过路费", "停车")):
return {
"document_type": "transport_receipt",
"expense_type": "transport",
"group_code": "travel",
"scene_label": "交通票据",
}
if any(keyword in compact for keyword in ("", "饭店", "酒楼", "酒家", "餐饮", "meal")):
group_code = "entertainment" if expense_type_code == "entertainment" or has_customer else "meal"
return {
"document_type": "meal_receipt",
"expense_type": group_code,
"group_code": group_code,
"scene_label": "餐饮票据",
}
if any(keyword in compact for keyword in ("办公用品", "文具", "耗材", "办公耗材", "打印纸", "键盘", "鼠标", "白板", "墨盒", "硒鼓")):
return {
"document_type": "other",
"expense_type": "office",
"group_code": "office",
"scene_label": "办公用品票据",
}
return {
"document_type": "other",
"expense_type": expense_type_code or "other",
"group_code": self._normalize_group_code(expense_type_code or "other"),
"scene_label": "其他票据",
}
@staticmethod
def _normalize_group_code(expense_type_code: str) -> str:
if expense_type_code in {"travel", "hotel", "transport"}:
return "travel"
if expense_type_code in {"entertainment", "meal", "office", "training", "communication", "welfare"}:
return expense_type_code
return "other"
def _extract_document_fields(self, item: dict[str, object]) -> dict[str, str]:
text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()
fields: dict[str, str] = {}
amount_match = AMOUNT_TEXT_PATTERN.search(text)
if amount_match:
fields["金额"] = f"{amount_match.group(1)}"
date_match = DATE_TEXT_PATTERN.search(text)
if date_match:
fields["时间"] = date_match.group(1)
merchant = self._extract_document_merchant_name(item)
if merchant:
fields["商户/酒店"] = merchant
return fields
@staticmethod
def _extract_document_merchant_name(item: dict[str, object]) -> str:
text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()
for keyword in ("酒店", "宾馆", "饭店", "酒楼", "餐厅", "航空", "铁路", "滴滴"):
if keyword in text:
return keyword
return ""
@staticmethod
def _extract_amount_from_card(card: UserAgentReviewDocumentCard) -> float:
for item in card.fields:
if item.label != "金额":
continue
try:
return float(str(item.value).replace("", "").strip())
except ValueError:
return 0.0
return 0.0
def _resolve_amount_value(self, payload: UserAgentRequest) -> float:
for item in payload.ontology.entities:
if item.type == "amount" and item.role != "threshold":
try:
return float(item.normalized_value)
except ValueError:
return 0.0
return 0.0
def _sum_ocr_amounts(self, ocr_documents: list[dict[str, object]]) -> float:
total = 0.0
for item in ocr_documents:
fields = self._extract_document_fields(item)
amount_text = str(fields.get("金额") or "").replace("", "").strip()
if not amount_text:
continue
try:
total += float(amount_text)
except ValueError:
continue
return total
def _infer_expense_type_from_documents(
self,
payload: UserAgentRequest,
ocr_documents: list[dict[str, object]],
) -> str:
labels: list[str] = []
for item in ocr_documents:
classified = self._classify_document(item, payload)
label = GROUP_SCENE_LABELS.get(classified["group_code"], "")
if label and label not in labels:
labels.append(label)
return " + ".join(labels[:3])