refactor(backend): update and add service layers

- services/ontology.py: update ontology service
- services/orchestrator.py: update orchestrator service
- services/user_agent.py: update user agent service
- services/settings.py: update settings service
- services/expense_claims.py: update expense claims service
- services/agent_conversations.py: add new agent conversations service
This commit is contained in:
caoxiaozhu
2026-05-12 06:36:09 +00:00
parent a6a28ba865
commit 01df3452fd
6 changed files with 1442 additions and 80 deletions

View File

@@ -2,14 +2,24 @@ from __future__ import annotations
import json
import re
from datetime import UTC, datetime, timedelta
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.agent_enums import AgentAssetStatus, AgentAssetType
from app.models.financial_record import ExpenseClaim
from app.schemas.agent_asset import AgentAssetListItem
from app.schemas.user_agent import (
UserAgentCitation,
UserAgentDraftPayload,
UserAgentReviewAction,
UserAgentReviewClaimGroup,
UserAgentReviewDocumentCard,
UserAgentReviewDocumentField,
UserAgentReviewPayload,
UserAgentReviewRiskBrief,
UserAgentReviewSlotCard,
UserAgentRequest,
UserAgentResponse,
UserAgentSuggestedAction,
@@ -53,8 +63,32 @@ EXPENSE_TYPE_LABELS = {
"meal": "餐费",
"meeting": "会务",
"entertainment": "招待",
"other": "其他",
}
GROUP_SCENE_LABELS = {
"travel": "差旅费",
"entertainment": "业务招待费",
"meal": "伙食费",
"transport": "交通费",
"hotel": "住宿费",
"other": "其他费用",
}
SLOT_LABELS = {
"expense_type": "报销类型",
"customer_name": "客户名称",
"time_range": "发生时间",
"location": "地点",
"merchant_name": "酒店/商户",
"amount": "金额",
"participants": "参与人员",
"attachments": "票据附件",
}
DATE_TEXT_PATTERN = re.compile(r"(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)")
AMOUNT_TEXT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(?:元|万元|万)")
class UserAgentService:
def __init__(self, db: Session) -> None:
@@ -72,23 +106,32 @@ class UserAgentService:
if payload.ontology.intent == "draft"
else None
)
review_payload = self._build_review_payload(
payload,
citations=citations,
draft_payload=draft_payload,
)
if payload.degraded and payload.tool_payload.get("message"):
return UserAgentResponse(
answer=str(payload.tool_payload["message"]),
citations=citations,
suggested_actions=suggested_actions,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
guided_answer = self._build_guided_answer(payload)
guided_answer = None
if draft_payload is None or draft_payload.claim_id is None:
guided_answer = self._build_guided_answer(payload)
if guided_answer:
return UserAgentResponse(
answer=guided_answer,
citations=citations,
suggested_actions=suggested_actions,
draft_payload=draft_payload,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
@@ -98,20 +141,23 @@ class UserAgentService:
citations=citations,
draft_payload=draft_payload,
)
answer = self._generate_answer_with_model(
payload,
citations=citations,
suggested_actions=suggested_actions,
risk_flags=risk_flags,
draft_payload=draft_payload,
fallback_answer=fallback_answer,
)
answer = None
if not self._should_skip_model_answer(payload, review_payload):
answer = self._generate_answer_with_model(
payload,
citations=citations,
suggested_actions=suggested_actions,
risk_flags=risk_flags,
draft_payload=draft_payload,
fallback_answer=fallback_answer,
)
return UserAgentResponse(
answer=answer or fallback_answer,
citations=citations,
suggested_actions=suggested_actions,
draft_payload=draft_payload,
review_payload=review_payload,
risk_flags=risk_flags,
requires_confirmation=payload.requires_confirmation,
)
@@ -129,6 +175,13 @@ class UserAgentService:
if payload.ontology.intent == "risk_check":
return self._build_risk_answer(payload, citations)
if payload.ontology.intent == "draft":
tool_message = str(payload.tool_payload.get("message") or "").strip()
if tool_message and (
str(payload.tool_payload.get("claim_id") or "").strip()
or str(payload.tool_payload.get("claim_no") or "").strip()
):
return tool_message
if payload.ontology.intent == "draft" and draft_payload is not None:
return (
f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容,"
@@ -243,6 +296,11 @@ class UserAgentService:
"attachment_names": self._resolve_attachment_names(payload),
"ocr_summary": payload.context_json.get("ocr_summary", ""),
"ocr_documents": payload.context_json.get("ocr_documents", []),
"conversation_id": payload.context_json.get("conversation_id"),
"conversation_scenario": payload.context_json.get("conversation_scenario"),
"conversation_intent": payload.context_json.get("conversation_intent"),
"draft_claim_id": payload.context_json.get("draft_claim_id"),
"conversation_history": self._resolve_conversation_history(payload),
},
"tool_payload": payload.tool_payload,
"citations": [item.model_dump(mode="json") for item in citations],
@@ -267,6 +325,7 @@ class UserAgentService:
"并明确要求补充费用类型、金额、时间、事由、参与对象或上传票据。"
"如果上下文里只有附件名称,必须明确说明你只拿到了附件名称,"
"不能假装已看过图片、PDF 或发票内容。"
"如果提供了 conversation_history必须结合最近轮次理解追问、代词、省略字段和补充信息。"
"不要声称已经提交、审批、付款、入账或真正执行了任何动作;如果只是建议、草稿或待确认,要明确说清楚。"
"若给出了风险标签、制度引用或建议动作,可以简洁吸收进回答,但不要新增未提供的事实。"
"只输出最终给用户看的自然语言,不要输出 JSON、Markdown、标题、"
@@ -447,6 +506,424 @@ class UserAgentService:
),
]
def _build_review_payload(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
draft_payload: UserAgentDraftPayload | None,
) -> UserAgentReviewPayload | None:
attachment_count = self._resolve_attachment_count(payload)
ocr_documents = self._resolve_ocr_documents(payload)
if payload.ontology.scenario != "expense":
return None
if payload.ontology.intent not in {"draft", "operate"} and attachment_count <= 0 and not ocr_documents:
return None
slot_cards = self._build_review_slot_cards(payload, ocr_documents=ocr_documents)
document_cards = self._build_review_document_cards(payload, ocr_documents=ocr_documents)
claim_groups = self._build_review_claim_groups(
payload,
document_cards=document_cards,
)
risk_briefs = self._build_review_risk_briefs(
payload,
citations=citations,
document_cards=document_cards,
claim_groups=claim_groups,
)
confirmation_actions = self._build_review_confirmation_actions(
payload,
claim_groups=claim_groups,
draft_payload=draft_payload,
)
intent_summary = self._build_review_intent_summary(
payload,
slot_cards=slot_cards,
claim_groups=claim_groups,
)
return UserAgentReviewPayload(
intent_summary=intent_summary,
scenario=payload.ontology.scenario,
intent=payload.ontology.intent,
missing_slots=list(payload.ontology.missing_slots),
risk_briefs=risk_briefs,
slot_cards=slot_cards,
document_cards=document_cards,
claim_groups=claim_groups,
confirmation_actions=confirmation_actions,
)
def _build_review_slot_cards(
self,
payload: UserAgentRequest,
*,
ocr_documents: list[dict[str, object]],
) -> list[UserAgentReviewSlotCard]:
first_doc_fields = self._extract_document_fields(ocr_documents[0]) if ocr_documents else {}
missing_slots = set(payload.ontology.missing_slots)
entity_map = self._collect_entity_values(payload)
time_value = self._format_time_range(payload)
location_value = self._resolve_location_value(payload)
merchant_value = self._extract_document_merchant_name(ocr_documents[0]) if ocr_documents else ""
customer_value = entity_map.get("customer", "")
participants_value = entity_map.get("participants", "")
amount_value = entity_map.get("amount")
if not amount_value:
ocr_total_amount = self._sum_ocr_amounts(ocr_documents)
amount_value = f"{ocr_total_amount:.2f}" if ocr_total_amount > 0 else ""
expense_type_code = entity_map.get("expense_type_code", "")
expense_type_value = EXPENSE_TYPE_LABELS.get(expense_type_code, entity_map.get("expense_type", ""))
if not expense_type_value and ocr_documents:
expense_type_value = self._infer_expense_type_from_documents(payload, ocr_documents)
attachment_value = (
f"{self._resolve_attachment_count(payload)} 份附件"
if self._resolve_attachment_count(payload)
else ""
)
cards = [
self._make_slot_card(
key="expense_type",
value=expense_type_value,
source="user_text" if expense_type_value else "system",
confidence=0.9 if expense_type_value else 0.0,
missing_slots=missing_slots,
),
self._make_slot_card(
key="customer_name",
value=customer_value,
source="user_text" if customer_value else "system",
confidence=0.88 if customer_value else 0.0,
missing_slots=missing_slots,
),
self._make_slot_card(
key="time_range",
value=time_value,
source="user_text" if time_value else "system",
confidence=0.9 if time_value else 0.0,
missing_slots=missing_slots,
),
self._make_slot_card(
key="location",
value=location_value,
source="page_context" if location_value and location_value != "客户现场" else "user_text",
confidence=0.82 if location_value else 0.0,
required=False,
missing_slots=missing_slots,
),
self._make_slot_card(
key="merchant_name",
value=merchant_value,
source="ocr" if merchant_value else "system",
confidence=0.72 if merchant_value else 0.0,
required=False,
missing_slots=missing_slots,
),
self._make_slot_card(
key="amount",
value=amount_value,
source="user_text" if entity_map.get("amount") else "ocr" if amount_value else "system",
confidence=0.92 if amount_value else 0.0,
missing_slots=missing_slots,
),
self._make_slot_card(
key="participants",
value=participants_value,
source="user_text" if participants_value else "system",
confidence=0.8 if participants_value else 0.0,
missing_slots=missing_slots,
),
self._make_slot_card(
key="attachments",
value=attachment_value,
source="upload" if attachment_value else "system",
confidence=1.0 if attachment_value else 0.0,
missing_slots=missing_slots,
),
]
return cards
def _build_review_document_cards(
self,
payload: UserAgentRequest,
*,
ocr_documents: list[dict[str, object]],
) -> list[UserAgentReviewDocumentCard]:
cards: list[UserAgentReviewDocumentCard] = []
for index, item in enumerate(ocr_documents, start=1):
classified = self._classify_document(item, payload)
fields = self._extract_document_fields(item)
cards.append(
UserAgentReviewDocumentCard(
index=index,
filename=str(item.get("filename") or f"document-{index}"),
document_type=classified["document_type"],
suggested_expense_type=classified["expense_type"],
scene_label=GROUP_SCENE_LABELS.get(
classified["group_code"],
classified["scene_label"],
),
summary=str(item.get("summary") or item.get("text") or "").strip(),
avg_score=float(item.get("avg_score") or 0.0),
warnings=[str(warning) for warning in item.get("warnings", []) if str(warning).strip()],
fields=[
UserAgentReviewDocumentField(
label=label,
value=value,
source="ocr",
)
for label, value in fields.items()
if str(value).strip()
],
)
)
return cards
def _build_review_claim_groups(
self,
payload: UserAgentRequest,
*,
document_cards: list[UserAgentReviewDocumentCard],
) -> list[UserAgentReviewClaimGroup]:
groups: dict[str, dict[str, object]] = {}
for card in document_cards:
group_code = self._normalize_group_code(card.suggested_expense_type)
bucket = groups.setdefault(
group_code,
{
"document_indexes": [],
"amount_total": 0.0,
"expense_type": group_code,
"scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
"reasons": [],
},
)
bucket["document_indexes"].append(card.index)
bucket["amount_total"] = float(bucket["amount_total"]) + self._extract_amount_from_card(card)
bucket["reasons"].append(f"{card.filename} 识别为 {card.scene_label}")
if not groups:
expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "other")
group_code = self._normalize_group_code(expense_type_code)
groups[group_code] = {
"document_indexes": [],
"amount_total": self._resolve_amount_value(payload),
"expense_type": expense_type_code or "other",
"scene_label": GROUP_SCENE_LABELS.get(group_code, "其他费用"),
"reasons": ["当前主要依据用户文本和页面上下文进行分单建议。"],
}
claim_groups: list[UserAgentReviewClaimGroup] = []
for index, (group_code, bucket) in enumerate(groups.items(), start=1):
title = f"建议报销单 {index}{bucket['scene_label']}"
rationale = (
"".join(dict.fromkeys(str(item) for item in bucket["reasons"]))
if bucket["reasons"]
else "当前仅有单一场景,无需拆单。"
)
claim_groups.append(
UserAgentReviewClaimGroup(
group_code=group_code,
title=title,
expense_type=str(bucket["expense_type"]),
scene_label=str(bucket["scene_label"]),
document_indexes=list(bucket["document_indexes"]),
amount_total=round(float(bucket["amount_total"]), 2),
rationale=rationale,
)
)
return claim_groups
def _build_review_risk_briefs(
self,
payload: UserAgentRequest,
*,
citations: list[UserAgentCitation],
document_cards: list[UserAgentReviewDocumentCard],
claim_groups: list[UserAgentReviewClaimGroup],
) -> list[UserAgentReviewRiskBrief]:
briefs: list[UserAgentReviewRiskBrief] = []
employee_name = self._collect_entity_values(payload).get("employee_name") or str(
payload.context_json.get("name") or ""
).strip()
if employee_name:
since = datetime.now(UTC) - timedelta(days=90)
stmt = select(ExpenseClaim).where(
ExpenseClaim.employee_name == employee_name,
ExpenseClaim.occurred_at >= since,
)
recent_claims = list(self.db.scalars(stmt).all())
if recent_claims:
risky_count = sum(1 for item in recent_claims if item.risk_flags_json)
draft_count = sum(1 for item in recent_claims if item.status == "draft")
briefs.append(
UserAgentReviewRiskBrief(
title="历史报销画像",
level="info",
content=(
f"{employee_name} 最近 90 天共有 {len(recent_claims)} 笔报销,"
f"其中 {risky_count} 笔带风险标记,{draft_count} 笔仍处于草稿态。"
),
)
)
current_amount = self._resolve_amount_value(payload)
if current_amount > 0:
duplicate_count = sum(
1
for item in recent_claims
if abs(float(item.amount) - current_amount) < 0.01
)
if duplicate_count:
briefs.append(
UserAgentReviewRiskBrief(
title="金额重复预警",
level="warning",
content=(
f"近 90 天发现 {duplicate_count} 笔金额相同的报销记录,"
"提交前建议核对是否为重复报销或拆分不当。"
),
)
)
if citations:
briefs.append(
UserAgentReviewRiskBrief(
title="制度注意事项",
level="info",
content=citations[0].excerpt or f"请先核对 {citations[0].title} 的制度要求。",
)
)
warning_count = sum(len(item.warnings) for item in document_cards)
if warning_count:
briefs.append(
UserAgentReviewRiskBrief(
title="票据识别提醒",
level="warning",
content=f"当前共有 {warning_count} 条票据识别提示,建议逐张确认 OCR 识别字段。",
)
)
if len(claim_groups) > 1:
briefs.append(
UserAgentReviewRiskBrief(
title="建议拆单",
level="high",
content=f"系统检测到 {len(claim_groups)} 类费用场景,建议拆成多张报销单后再提交。",
)
)
return briefs[:4]
def _build_review_confirmation_actions(
self,
payload: UserAgentRequest,
*,
claim_groups: list[UserAgentReviewClaimGroup],
draft_payload: UserAgentDraftPayload | None,
) -> list[UserAgentReviewAction]:
actions: list[UserAgentReviewAction] = []
if claim_groups:
if len(claim_groups) > 1:
actions.append(
UserAgentReviewAction(
label=f"{len(claim_groups)} 张报销单生成",
action_type="split_claims",
description="保留当前识别结果,并按费用场景拆分生成多张报销草稿。",
emphasis="primary",
)
)
else:
actions.append(
UserAgentReviewAction(
label="确认并继续生成草稿",
action_type="confirm_review",
description="确认当前识别字段无误后,继续生成或覆盖当前报销草稿。",
emphasis="primary",
)
)
for slot in payload.ontology.missing_slots[:3]:
label = SLOT_LABELS.get(slot, slot)
actions.append(
UserAgentReviewAction(
label=f"补充{label}",
action_type="fill_slot",
description=f"当前还缺少 {label},补充后可提升分单和建单准确度。",
emphasis="secondary",
)
)
if self._resolve_attachment_count(payload) <= 0:
actions.append(
UserAgentReviewAction(
label="继续上传票据",
action_type="upload_more",
description="上传发票、行程单或电子票据后,系统会重新识别并完善报销分组。",
emphasis="secondary",
)
)
if draft_payload is not None and draft_payload.claim_no:
actions.append(
UserAgentReviewAction(
label=f"查看草稿 {draft_payload.claim_no}",
action_type="open_claim",
description="查看当前已创建的报销草稿,并继续补充字段或附件。",
emphasis="secondary",
)
)
return actions[:5]
def _build_review_intent_summary(
self,
payload: UserAgentRequest,
*,
slot_cards: list[UserAgentReviewSlotCard],
claim_groups: list[UserAgentReviewClaimGroup],
) -> str:
slots = {item.key: item for item in slot_cards}
expense_type = slots.get("expense_type")
amount = slots.get("amount")
time_range = slots.get("time_range")
location = slots.get("location")
customer = slots.get("customer_name")
summary = "系统识别出您想要发起一笔报销。"
if expense_type and expense_type.value:
summary = f"系统识别出您想要报销{expense_type.value}"
details: list[str] = []
if customer and customer.value:
details.append(f"客户名称:{customer.value}")
if time_range and time_range.value:
details.append(f"时间:{time_range.value}")
if location and location.value:
details.append(f"地点:{location.value}")
if amount and amount.value:
details.append(f"金额:{amount.value}")
if claim_groups and len(claim_groups) > 1:
details.append(f"建议拆分为 {len(claim_groups)} 张报销单")
if details:
return f"{summary} {''.join(details)}"
return summary
@staticmethod
def _should_skip_model_answer(
payload: UserAgentRequest,
review_payload: UserAgentReviewPayload | None,
) -> bool:
if review_payload is None:
return False
return payload.ontology.scenario == "expense" and (
payload.ontology.intent == "draft"
or int(payload.context_json.get("attachment_count") or 0) > 0
)
def _build_rule_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]:
domain = self._resolve_domain(payload.ontology.scenario)
items = self.asset_service.list_assets(
@@ -516,6 +993,45 @@ class UserAgentService:
return []
return [str(name) for name in names if str(name).strip()]
@staticmethod
def _resolve_attachment_count(payload: UserAgentRequest) -> int:
names = UserAgentService._resolve_attachment_names(payload)
if names:
return len(names)
try:
return max(0, int(payload.context_json.get("attachment_count") or 0))
except (TypeError, ValueError):
return 0
@staticmethod
def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]:
documents = payload.context_json.get("ocr_documents")
if not isinstance(documents, list):
return []
normalized: list[dict[str, object]] = []
for item in documents[:8]:
if not isinstance(item, dict):
continue
normalized.append(item)
return normalized
@staticmethod
def _resolve_conversation_history(payload: UserAgentRequest) -> list[dict[str, object]]:
history = payload.context_json.get("conversation_history")
if not isinstance(history, list):
return []
normalized: list[dict[str, object]] = []
for item in history[-8:]:
if not isinstance(item, dict):
continue
role = str(item.get("role") or "").strip()
content = str(item.get("content") or "").strip()
if not role or not content:
continue
normalized.append({"role": role, "content": content})
return normalized
@staticmethod
def _resolve_domain(scenario: str) -> str | None:
if scenario == "expense":
@@ -557,3 +1073,210 @@ class UserAgentService:
if len(cleaned) >= 2:
break
return "".join(cleaned[:2])
def _collect_entity_values(self, payload: UserAgentRequest) -> dict[str, str]:
values = {
"employee_name": "",
"customer": "",
"participants": "",
"amount": "",
"expense_type": "",
"expense_type_code": "",
}
participants: list[str] = []
for item in payload.ontology.entities:
if item.type == "employee" and not values["employee_name"]:
values["employee_name"] = item.value
elif item.type == "customer" and not values["customer"]:
values["customer"] = item.value
elif item.type == "amount" and item.role != "threshold" and not values["amount"]:
values["amount"] = f"{item.value}" if "" not in item.value else item.value
elif item.type == "expense_type" and not values["expense_type_code"]:
values["expense_type_code"] = item.normalized_value
values["expense_type"] = EXPENSE_TYPE_LABELS.get(
item.normalized_value,
item.value,
)
elif item.type in {"participant", "person"} and item.value.strip():
participants.append(item.value.strip())
if participants:
values["participants"] = "".join(dict.fromkeys(participants))
return values
def _format_time_range(self, payload: UserAgentRequest) -> str:
time_range = payload.ontology.time_range
if time_range.raw:
return time_range.raw
if time_range.start_date and time_range.end_date:
if time_range.start_date == time_range.end_date:
return time_range.start_date
return f"{time_range.start_date}{time_range.end_date}"
return ""
def _resolve_location_value(self, payload: UserAgentRequest) -> str:
request_context = payload.context_json.get("request_context")
if isinstance(request_context, dict):
for key in ("city", "location"):
value = str(request_context.get(key) or "").strip()
if value:
return value
city_match = re.search(r"去(?P<city>[\u4e00-\u9fa5]{2,8})(?:出差|拜访|参会|见客户|客户现场)", payload.message)
if city_match:
return city_match.group("city").strip()
if "客户现场" in payload.message.replace(" ", ""):
return "客户现场"
return ""
def _make_slot_card(
self,
*,
key: str,
value: str,
source: str,
confidence: float,
missing_slots: set[str],
required: bool = True,
) -> UserAgentReviewSlotCard:
is_missing = key in missing_slots or not str(value).strip()
return UserAgentReviewSlotCard(
key=key,
label=SLOT_LABELS.get(key, key),
value=str(value or "").strip(),
source=source,
confidence=confidence,
required=required,
confirmed=not is_missing and source in {"user_text", "page_context", "upload"},
status="missing" if is_missing else "identified" if source == "user_text" else "inferred",
hint=f"建议补充 {SLOT_LABELS.get(key, key)}"
if is_missing and required
else "",
)
def _classify_document(
self,
item: dict[str, object],
payload: UserAgentRequest,
) -> dict[str, str]:
text = " ".join(
[
str(item.get("filename") or ""),
str(item.get("summary") or ""),
str(item.get("text") or ""),
]
).lower()
compact = text.replace(" ", "")
expense_type_code = self._collect_entity_values(payload).get("expense_type_code", "")
has_customer = bool(self._collect_entity_values(payload).get("customer"))
if any(keyword in compact for keyword in ("机票", "航班", "火车", "高铁", "行程单")):
return {
"document_type": "travel_ticket",
"expense_type": "travel",
"group_code": "travel",
"scene_label": "差旅票据",
}
if any(keyword in compact for keyword in ("酒店", "住宿", "宾馆")):
return {
"document_type": "hotel_invoice",
"expense_type": "hotel",
"group_code": "travel",
"scene_label": "住宿票据",
}
if any(keyword in compact for keyword in ("打车", "出租车", "滴滴", "网约车", "过路费", "停车")):
return {
"document_type": "transport_receipt",
"expense_type": "transport",
"group_code": "travel",
"scene_label": "交通票据",
}
if any(keyword in compact for keyword in ("", "饭店", "酒楼", "酒家", "餐饮", "meal")):
group_code = "entertainment" if expense_type_code == "entertainment" or has_customer else "meal"
return {
"document_type": "meal_receipt",
"expense_type": group_code,
"group_code": group_code,
"scene_label": "餐饮票据",
}
return {
"document_type": "other",
"expense_type": expense_type_code or "other",
"group_code": self._normalize_group_code(expense_type_code or "other"),
"scene_label": "其他票据",
}
@staticmethod
def _normalize_group_code(expense_type_code: str) -> str:
if expense_type_code in {"travel", "hotel", "transport"}:
return "travel"
if expense_type_code in {"entertainment", "meal"}:
return expense_type_code
return "other"
def _extract_document_fields(self, item: dict[str, object]) -> dict[str, str]:
text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()
fields: dict[str, str] = {}
amount_match = AMOUNT_TEXT_PATTERN.search(text)
if amount_match:
fields["金额"] = f"{amount_match.group(1)}"
date_match = DATE_TEXT_PATTERN.search(text)
if date_match:
fields["时间"] = date_match.group(1)
merchant = self._extract_document_merchant_name(item)
if merchant:
fields["商户/酒店"] = merchant
return fields
@staticmethod
def _extract_document_merchant_name(item: dict[str, object]) -> str:
text = " ".join([str(item.get("summary") or ""), str(item.get("text") or "")]).strip()
for keyword in ("酒店", "宾馆", "饭店", "酒楼", "餐厅", "航空", "铁路", "滴滴"):
if keyword in text:
return keyword
return ""
@staticmethod
def _extract_amount_from_card(card: UserAgentReviewDocumentCard) -> float:
for item in card.fields:
if item.label != "金额":
continue
try:
return float(str(item.value).replace("", "").strip())
except ValueError:
return 0.0
return 0.0
def _resolve_amount_value(self, payload: UserAgentRequest) -> float:
for item in payload.ontology.entities:
if item.type == "amount" and item.role != "threshold":
try:
return float(item.normalized_value)
except ValueError:
return 0.0
return 0.0
def _sum_ocr_amounts(self, ocr_documents: list[dict[str, object]]) -> float:
total = 0.0
for item in ocr_documents:
fields = self._extract_document_fields(item)
amount_text = str(fields.get("金额") or "").replace("", "").strip()
if not amount_text:
continue
try:
total += float(amount_text)
except ValueError:
continue
return total
def _infer_expense_type_from_documents(
self,
payload: UserAgentRequest,
ocr_documents: list[dict[str, object]],
) -> str:
labels: list[str] = []
for item in ocr_documents:
classified = self._classify_document(item, payload)
label = GROUP_SCENE_LABELS.get(classified["group_code"], "")
if label and label not in labels:
labels.append(label)
return " + ".join(labels[:3])