Files
X-Financial/server/src/app/services/steward_planner.py

1005 lines
43 KiB
Python
Raw Normal View History

from __future__ import annotations
import re
import uuid
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from typing import Any
from app.schemas.steward import (
StewardAttachmentGroup,
StewardAttachmentInput,
StewardCandidateFlow,
StewardConfirmationAction,
StewardPendingFlowConfirmation,
StewardPlanRequest,
StewardPlanResponse,
StewardTask,
StewardThinkingEvent,
)
from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER, BUSINESS_CANONICAL_FIELDS
from app.services.ontology_field_registry import normalize_ontology_form_values
from app.services.steward_intent_agent import StewardIntentAgent
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
# Cities the rule-based extractors recognise as travel destinations.
CITY_NAMES = (
    "北京",
    "上海",
    "广州",
    "深圳",
    "杭州",
    "南京",
    "苏州",
    "成都",
    "重庆",
    "天津",
    "武汉",
    "西安",
    "长沙",
    "郑州",
    "青岛",
    "厦门",
    "福州",
    "合肥",
    "济南",
    "沈阳",
    "大连",
    "宁波",
    "无锡",
)
# Business-signal keywords: used to decide whether the input relates to the
# financial matters the steward supports. A cleaned message that contains any
# of these keywords is treated as business-related; otherwise it is
# intercepted as off_topic.
#
# Every entry MUST be a non-empty string: `"" in text` is always True, so a
# single empty keyword makes every message look business-related and silently
# disables the off_topic interception.
STEWARD_BUSINESS_SIGNAL_KEYWORDS: tuple[str, ...] = (
    # Action words
    "申请", "报销", "草稿", "提交", "审批", "保存", "发起", "创建", "核对", "归集",
    # Travel scenarios
    "出差", "差旅", "费用", "交通", "住宿", "招待", "酒店", "机票", "航班", "高铁",
    "动车", "火车", "出租车", "的士", "网约车", "打车", "地铁", "公交", "用餐", "餐饮", "宴请",
    # Receipts / vouchers
    "票据", "发票", "凭证", "行程单", "付款截图", "付款", "小票", "收据",
    # Business objects
    "客户", "项目", "拜访", "会议", "培训", "部署", "实施", "支撑", "支持", "协助",
    "调研", "驻场", "上线", "验收", "审核",
    # Time signals
    "昨天", "前天", "明天", "后天", "下周", "下月", "近期", "月底", "今天", "上周", "上月",
    # Amount / quantity ("天" covers phrasings such as "出差3天").
    # NOTE(review): this line previously carried blank entries; only "天" is
    # restored because it is the one documented here — re-add any other
    # single-character signals explicitly if they are needed.
    "金额", "天",
    # Reuse city names as signals
    *CITY_NAMES,
)
# Matches one clause (text between clause separators) that mentions an
# application keyword. NOTE(review): not referenced in the visible part of
# this file — confirm whether it is still used elsewhere.
APPLICATION_SPLIT_PATTERN = re.compile(r"(?:^|[,。;;])[^,。;;]*?(?:申请|出差申请|差旅申请)[^,。;;]*")
# Matches a reimbursement verb and captures (group 1) the text that follows it
# up to the next punctuation mark or newline.
REIMBURSEMENT_PATTERN = re.compile(r"(?:我要报销|还需要报销|需要报销|报销)([^,。;;?!\n]+)")
# Matches a month/day expression such as "5月10日" or "5月10号"; the year is not part of the match.
MONTH_DAY_PATTERN = re.compile(r"(?P<month>\d{1,2})\s*月\s*(?P<day>\d{1,2})\s*(?:日|号)?")
# Matches a full date with a 4-digit year, using "-", "/" or 年/月/日 as separators.
ISO_DATE_PATTERN = re.compile(r"(?P<year>\d{4})[-/年](?P<month>\d{1,2})[-/月](?P<day>\d{1,2})(?:日)?")
# Display labels for business field keys, used when summarising recognised
# and missing fields in thinking events.
BUSINESS_FIELD_LABELS = {
    "expense_type": "费用类型",
    "time_range": "时间",
    "location": "地点",
    "reason": "事由",
    "amount": "金额",
    "transport_mode": "出行方式",
    "attachments": "附件/凭证",
    "customer_name": "客户或项目对象",
    "merchant_name": "商户/开票方",
    "department_name": "所属部门",
    "employee_name": "申请人",
    "employee_no": "员工编号",
}
# Display labels for the `expense_type` field values.
EXPENSE_TYPE_LABELS = {
    "travel": "差旅",
    "transport": "交通费",
    "entertainment": "业务招待费",
    "office": "办公用品",
    "meeting": "会议费",
    "training": "培训费",
    "other": "其他费用",
}
# Display labels for the `transport_mode` field values.
TRANSPORT_MODE_LABELS = {
    "train": "火车/高铁",
    "flight": "飞机",
    "taxi": "出租车/网约车",
    "subway": "地铁",
    "other": "其他交通方式",
}
@dataclass(frozen=True)
class PlannedTaskDraft:
    """Immutable intermediate record for one task segment split out of the user message."""

    # Task kind; the planner creates drafts with "expense_application" or "reimbursement".
    task_type: str
    # Fragment of the user message this draft was derived from.
    segment: str
    # 1-based position of the draft in extraction order.
    index: int
class StewardPlannerService:
    """First-version planning service of the steward: it only produces plans and never executes persistence actions."""
def __init__(self, intent_agent: StewardIntentAgent | None = None) -> None:
    """Store the optional model-backed intent agent.

    When `intent_agent` is None, `build_plan` never takes the model path and
    goes straight to the rule-based planner.
    """
    self.intent_agent = intent_agent
def build_plan(self, request: StewardPlanRequest) -> StewardPlanResponse:
    """Build a steward plan for one user request.

    Flow:
      1. Clean the message; an empty message raises ValueError.
      2. Return an off_topic plan when the input carries no business signal.
      3. If an intent agent is configured and the message qualifies, try the
         model path and return its plan when one is produced.
      4. Otherwise (no plan, or the model call raised) fall back to the
         rule-based planner, passing along any collected call traces and a
         human-readable fallback reason.

    Raises:
        ValueError: when the cleaned message is empty.
    """
    message = self._clean_text(request.message)
    if not message:
        raise ValueError("小财管家需要一段任务描述。")
    base_date = self._resolve_base_date(request.client_now_iso, request.context_json)
    # Intercept business-irrelevant input (pure digits, greetings, chit-chat,
    # gibberish, ...): return an off_topic plan before the LLM / rule fallback.
    if self._is_business_irrelevant_input(message, request):
        return self._build_off_topic_plan(request)
    model_call_traces: list[dict[str, Any]] = []
    fallback_reason = ""
    if self.intent_agent is not None and self._should_use_model_intent_recognition(message, base_date, request):
        try:
            intent_result = self.intent_agent.detect(
                request,
                base_date=base_date,
                canonical_fields=list(BUSINESS_CANONICAL_FIELD_ORDER),
            )
            if intent_result is not None:
                model_call_traces = intent_result.model_call_traces
                llm_plan = StewardModelPlanBuilder(self).build(
                    intent_result,
                    request=request,
                    base_date=base_date,
                )
                if llm_plan is not None:
                    # Server-side guard: even if the model produced direct
                    # tasks, an utterance without an explicit action is
                    # downgraded to a flow-confirmation plan.
                    if self._looks_like_ambiguous_travel_flow(message, base_date, request):
                        return self._build_pending_flow_fallback_plan(
                            request,
                            base_date=base_date,
                            model_call_traces=model_call_traces,
                            fallback_reason=(
                                "主模型返回了直接任务,但当前话术没有明确申请或报销动作;"
                                "服务端已改为候选流程确认,避免误入申请流程。"
                            ),
                            planning_source="llm_function_call",
                        )
                    return llm_plan
            # NOTE(review): the source's indentation was lost; these two lines
            # are placed at try-level on the assumption that they cover both
            # "no intent result" and "no plan built" — confirm against the repo.
            model_call_traces = getattr(self.intent_agent, "last_call_traces", []) or model_call_traces
            fallback_reason = "主模型未返回可用的 function calling 计划,已切换到规则兜底。"
        except Exception as exc:
            # Any model-path failure falls through to the rule-based planner;
            # the exception text is surfaced in the fallback reason.
            model_call_traces = getattr(self.intent_agent, "last_call_traces", []) or model_call_traces
            fallback_reason = f"主模型 function calling 调用失败,已切换到规则兜底:{exc}"
    return self._build_rule_fallback_plan(
        request,
        base_date=base_date,
        model_call_traces=model_call_traces,
        fallback_reason=fallback_reason,
    )
def _should_use_model_intent_recognition(
    self,
    message: str,
    base_date: date,
    request: StewardPlanRequest,
) -> bool:
    """Decide whether the model-backed intent agent should handle this message.

    Ambiguous travel descriptions never go to the model; otherwise the model
    is used only when the message carries more than one financial demand.
    """
    is_ambiguous = self._looks_like_ambiguous_travel_flow(message, base_date, request)
    return False if is_ambiguous else self._has_multiple_financial_demands(message)
@staticmethod
def _is_business_irrelevant_input(message: str, request: StewardPlanRequest) -> bool:
"""判断输入是否与小财管家支持的财务事项完全无关。
判定规则消息去除所有空白后不含任何业务信号关键词且没有上传附件
即视为业务无关输入如纯数字问候闲聊乱码
"""
if request.attachments:
return False
compact = re.sub(r"\s+", "", message)
if not compact:
return False
return not any(keyword in compact for keyword in STEWARD_BUSINESS_SIGNAL_KEYWORDS)
def _build_off_topic_plan(self, request: StewardPlanRequest) -> StewardPlanResponse:
    """Return the fixed plan for business-irrelevant input.

    The plan states that no financial intent was recognised and offers
    example prompts; it carries no tasks, groups or confirmations.
    """
    notice = StewardThinkingEvent(
        event_id="intent_agent_off_topic",
        stage="off_topic",
        title="未识别到财务事项",
        content=(
            "我检查了这句话,没有发现费用申请、报销、出差、交通、招待等财务线索。"
            "如果你确实是要处理财务任务,可以参考下面的示例换一种说法。"
        ),
    )
    example_prompts = [
        "我想要申请明天去北京出差3天支撑客户现场实施",
        "我要报销昨天的交通费",
        "报销上周出差上海的费用",
    ]
    plan_id = f"steward_plan_{uuid.uuid4().hex[:12]}"
    return StewardPlanResponse(
        plan_id=plan_id,
        plan_status="off_topic",
        planning_source="rule_fallback",
        next_action="none",
        summary="这看起来跟财务任务没什么关系,小财管家没识别到费用申请或费用报销的意图。",
        thinking_events=[notice],
        tasks=[],
        attachment_groups=[],
        confirmation_groups=[],
        candidate_flows=[],
        suggested_prompts=example_prompts,
        model_call_traces=[],
    )
def _build_rule_fallback_plan(
    self,
    request: StewardPlanRequest,
    *,
    base_date: date,
    model_call_traces: list[dict[str, Any]] | None = None,
    fallback_reason: str = "",
) -> StewardPlanResponse:
    """Build a plan using only the rule-based extractors.

    Ambiguous travel descriptions are redirected to the flow-confirmation
    plan. Otherwise the message is split into task drafts (one low-confidence
    fallback task when nothing is extracted), attachments are grouped, and
    confirmation actions are generated. A non-empty `fallback_reason` is
    reported as the first thinking event.
    """
    message = self._clean_text(request.message)
    if self._looks_like_ambiguous_travel_flow(message, base_date, request):
        return self._build_pending_flow_fallback_plan(
            request,
            base_date=base_date,
            model_call_traces=model_call_traces,
            fallback_reason=fallback_reason,
        )

    tasks = [
        self._build_task(draft, base_date, request)
        for draft in self._extract_task_drafts(message)
    ]
    if not tasks:
        tasks = [self._build_fallback_task(message, base_date, request)]

    attachment_groups = self._build_attachment_groups(request.attachments, tasks)
    confirmation_groups = self._build_confirmation_actions(tasks, attachment_groups)
    events = self._build_thinking_events(tasks, attachment_groups, request.attachments)
    if fallback_reason:
        fallback_event = StewardThinkingEvent(
            event_id="intent_agent_rule_fallback",
            stage="rule_fallback",
            title="意图识别智能体进入兜底模式",
            content=fallback_reason,
        )
        events = [fallback_event, *events]

    if confirmation_groups:
        status, next_step = "needs_confirmation", "confirm_task"
    else:
        status, next_step = "ready_to_delegate", "delegate_task"

    return StewardPlanResponse(
        plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}",
        plan_status=status,
        planning_source="rule_fallback",
        next_action=next_step,
        summary=self._build_summary(tasks, attachment_groups),
        thinking_events=events,
        tasks=tasks,
        attachment_groups=attachment_groups,
        confirmation_groups=confirmation_groups,
        model_call_traces=model_call_traces or [],
    )
def _build_pending_flow_fallback_plan(
    self,
    request: StewardPlanRequest,
    *,
    base_date: date,
    model_call_traces: list[dict[str, Any]] | None = None,
    fallback_reason: str = "",
    planning_source: str = "rule_fallback",
) -> StewardPlanResponse:
    """Build a plan that asks the user to pick between application and reimbursement.

    Used when the message describes a trip without an explicit action. The
    plan carries both candidate flows and a pending flow confirmation; a
    non-empty `fallback_reason` is reported as the first thinking event.
    """
    candidates = self._build_rule_candidate_flows(request, base_date)

    fallback_events = (
        [
            StewardThinkingEvent(
                event_id="intent_agent_rule_fallback",
                stage="rule_fallback",
                title="意图识别智能体进入兜底模式",
                content=fallback_reason,
            )
        ]
        if fallback_reason
        else []
    )
    confirmation_event = StewardThinkingEvent(
        event_id="intent_pending_flow_confirmation",
        stage="flow_confirmation",
        title="需要确认流程方向",
        content="我识别到时间、地点和出差事由,但没有识别到明确的申请或报销动作,需要先请你选择流程方向。",
    )

    pending = StewardPendingFlowConfirmation(
        status="pending",
        source_message=request.message,
        reason="当前话术描述了出差事项,但没有明确说明要补办申请还是发起报销。",
        candidate_flows=candidates,
    )
    summary_text = (
        "我识别到这是一次出差事项,但还不能确定你要做的是"
        "**补办出差申请**还是**发起费用报销**。请先选择一个方向。"
    )
    return StewardPlanResponse(
        plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}",
        plan_status="needs_flow_confirmation",
        planning_source=planning_source,  # type: ignore[arg-type]
        next_action="confirm_flow",
        summary=summary_text,
        thinking_events=[*fallback_events, confirmation_event],
        pending_flow_confirmation=pending,
        candidate_flows=candidates,
        model_call_traces=model_call_traces or [],
    )
def _build_rule_candidate_flows(
    self,
    request: StewardPlanRequest,
    base_date: date,
) -> list[StewardCandidateFlow]:
    """Return the two candidate flows (application first, reimbursement second).

    Each candidate carries the fields extracted from the raw request message
    for its task type, plus the fields still missing for that type.
    """
    flow_specs = (
        (
            "travel_application",
            "补办出差申请",
            0.52,
            "用户描述了出差时间、地点和事由,但没有明确说要报销。",
            "expense_application",
        ),
        (
            "travel_reimbursement",
            "发起费用报销",
            0.48,
            "用户描述的也可能是已发生出差事项,需要进入报销材料整理。",
            "reimbursement",
        ),
    )
    flows: list[StewardCandidateFlow] = []
    for flow_id, label, confidence, reason, task_type in flow_specs:
        extracted = self._extract_ontology_fields(request.message, task_type, base_date, request)
        flows.append(
            StewardCandidateFlow(
                flow_id=flow_id,
                label=label,
                confidence=confidence,
                reason=reason,
                ontology_fields=extracted,
                missing_fields=self._resolve_missing_fields(task_type, extracted),
            )
        )
    return flows
def _extract_task_drafts(self, message: str) -> list[PlannedTaskDraft]:
    """Split the message into task drafts.

    Text before the first reimbursement phrase (or the whole message when
    there is none) becomes one application draft if it looks like an
    application. Every reimbursement phrase then becomes its own draft.
    Draft indexes are 1-based in extraction order.
    """
    punctuation = ",。;; "
    cut = self._find_first_reimbursement_index(message)
    head = message if cut < 0 else message[:cut]

    drafts: list[PlannedTaskDraft] = []
    wants_application = (
        self._looks_like_application(head)
        or self._looks_like_future_travel_application(head)
    )
    if wants_application:
        drafts.append(
            PlannedTaskDraft(
                task_type="expense_application",
                segment=head.strip(punctuation),
                index=1,
            )
        )

    first_index = len(drafts) + 1
    for position, found in enumerate(REIMBURSEMENT_PATTERN.finditer(message), start=first_index):
        drafts.append(
            PlannedTaskDraft(
                task_type="reimbursement",
                segment=f"报销{found.group(1)}".strip(punctuation),
                index=position,
            )
        )
    return drafts
def _has_multiple_financial_demands(self, message: str) -> bool:
    """Tell whether the message carries more than one financial demand.

    True when more than one task draft is extracted, when both an application
    signal and a reimbursement signal are present, or when a connector word
    appears together with more than one reimbursement phrase.
    """
    if len(self._extract_task_drafts(message)) > 1:
        return True

    compact = re.sub(r"\s+", "", message)
    if not compact:
        return False

    has_application = (
        self._looks_like_application(compact)
        or self._looks_like_future_travel_application(compact)
    )
    has_reimbursement = self._find_first_reimbursement_index(compact) >= 0
    if has_application and has_reimbursement:
        return True

    if re.search(r"并且|同时|另外|还有|还要|以及|再", compact) is None:
        return False
    reimbursement_hits = sum(1 for _ in REIMBURSEMENT_PATTERN.finditer(compact))
    return reimbursement_hits > 1
@staticmethod
def _find_first_reimbursement_index(message: str) -> int:
candidates = [message.find(item) for item in ("我要报销", "还需要报销", "需要报销", "报销")]
positives = [item for item in candidates if item >= 0]
return min(positives) if positives else -1
@staticmethod
def _looks_like_application(text: str) -> bool:
compact = re.sub(r"\s+", "", text)
return bool(compact) and "申请" in compact and bool(re.search(r"出差|差旅|费用|交通|住宿|采购|会务|会议", compact))
@staticmethod
def _looks_like_future_travel_application(text: str) -> bool:
compact = re.sub(r"\s+", "", text)
if not compact or "报销" in compact:
return False
business_signal = re.search(
r"出差|差旅|客户现场|项目|部署|实施|支撑|支持|协助|拜访|调研|培训|会议|驻场|上线|验收",
compact,
)
route_signal = re.search(
fr"(?:去|到|赴|前往)({'|'.join(CITY_NAMES)})",
compact,
)
time_signal = re.search(
r"明天|后天|下周|下月|近期|月底|\d{1,2}月\d{1,2}(?:日|号)?|"
r"\d{4}[-/年]\d{1,2}[-/月]\d{1,2}(?:日)?|[0-9一二两三四五六七八九十]+天",
compact,
)
planned_route_signal = re.search(
r"(?:去|到|赴|前往).{0,24}(?:出差|差旅|客户|现场|项目|部署|实施|支撑|支持|协助|拜访|调研|培训|会议|驻场|上线|验收)|"
r"(?:出差|差旅).{0,24}(?:[0-9一二两三四五六七八九十]+天|客户|现场|项目|部署|实施|支撑|支持|协助|拜访|调研|培训|会议|驻场|上线|验收)",
compact,
)
return bool((business_signal or route_signal) and (time_signal or planned_route_signal))
def _looks_like_ambiguous_travel_flow(
self,
text: str,
base_date: date,
request: StewardPlanRequest,
) -> bool:
compact = re.sub(r"\s+", "", text)
if not compact or request.attachments:
return False
if re.search(r"申请|报销|草稿|提交|审批|保存|发起|创建", compact):
return False
if not re.search(r"出差|差旅|客户现场|项目|部署|实施|支撑|支持|协助|拜访|调研|培训|会议|驻场|上线|验收", compact):
return False
if not self._extract_time_range(compact, base_date):
return False
if not self._extract_location(compact):
return False
return not self._is_future_or_current_time_range(compact, base_date)
def _is_future_or_current_time_range(self, segment: str, base_date: date) -> bool:
normalized = self._extract_time_range(segment, base_date)
if not normalized:
return False
try:
parsed = date.fromisoformat(normalized)
except ValueError:
return False
return parsed >= base_date
def _build_task(
    self,
    draft: PlannedTaskDraft,
    base_date: date,
    request: StewardPlanRequest,
) -> StewardTask:
    """Turn one draft into a StewardTask that always requires user confirmation.

    Application drafts go to the application assistant; every other draft
    type goes to the reimbursement assistant.
    """
    is_application = draft.task_type == "expense_application"
    if is_application:
        id_tag, agent, title_prefix = "app", "application_assistant", "费用申请"
    else:
        id_tag, agent, title_prefix = "reim", "reimbursement_assistant", "费用报销"

    fields = self._extract_ontology_fields(draft.segment, draft.task_type, base_date, request)
    return StewardTask(
        task_id=f"task_{id_tag}_{draft.index:03d}",
        task_type=draft.task_type,  # type: ignore[arg-type]
        assigned_agent=agent,  # type: ignore[arg-type]
        title=self._build_task_title(title_prefix, fields, draft.index),
        summary=self._build_task_summary(draft.segment, fields),
        status="needs_confirmation",
        confidence=self._resolve_task_confidence(draft.segment, fields, draft.task_type),
        ontology_fields=fields,
        missing_fields=self._resolve_missing_fields(draft.task_type, fields),
        confirmation_required=True,
    )
def _build_fallback_task(
    self,
    message: str,
    base_date: date,
    request: StewardPlanRequest,
) -> StewardTask:
    """Build a single task from the whole message when no draft was extracted.

    The task is a reimbursement when the message mentions "报销" or carries
    attachments, otherwise an application. Its confidence is capped at 0.58.
    """
    looks_like_reimbursement = "报销" in message or request.attachments
    kind = "reimbursement" if looks_like_reimbursement else "expense_application"
    whole_message_draft = PlannedTaskDraft(task_type=kind, segment=message, index=1)
    built = self._build_task(whole_message_draft, base_date, request)
    capped = min(built.confidence, 0.58)
    return built.model_copy(update={"confidence": capped})
def _extract_ontology_fields(
    self,
    segment: str,
    task_type: str,
    base_date: date,
    request: StewardPlanRequest,
) -> dict[str, str]:
    """Collect the canonical business fields for one message segment.

    Values already present in `request.context_json["review_form_values"]`
    (after normalisation) take precedence; rule-based extraction only fills
    fields that are still empty. When the request has attachments, the
    `attachments` field is always overwritten with their names.

    Returns:
        A dict restricted to keys in BUSINESS_CANONICAL_FIELDS with truthy values.
    """
    normalized_context = normalize_ontology_form_values(request.context_json.get("review_form_values"))
    fields: dict[str, str] = {
        key: value
        for key, value in normalized_context.items()
        if key in BUSINESS_CANONICAL_FIELDS and str(value or "").strip()
    }

    # Insertion order matches the order the fields are reported in.
    extracted = {
        "expense_type": self._infer_expense_type(segment, task_type),
        "time_range": self._extract_time_range(segment, base_date),
        "location": self._extract_location(segment),
        "reason": self._extract_reason(segment, task_type),
        "transport_mode": self._extract_transport_mode(segment),
    }
    for key, value in extracted.items():
        if value and not fields.get(key):
            fields[key] = value

    if request.attachments:
        # Join with an enumeration comma: concatenating file names with no
        # separator makes the individual names impossible to tell apart.
        fields["attachments"] = "、".join(item.name for item in request.attachments if item.name)
    return {key: value for key, value in fields.items() if key in BUSINESS_CANONICAL_FIELDS and value}
@staticmethod
def _infer_expense_type(segment: str, task_type: str) -> str:
compact = re.sub(r"\s+", "", segment)
if re.search(r"招待|接待|餐饮|宴请|客户吃饭|业务餐", compact):
return "entertainment"
if re.search(r"出差|差旅|住宿|酒店|机票|航班|高铁|火车", compact):
return "travel"
if re.search(r"交通|出租车|的士|网约车|打车|地铁|公交", compact):
return "transport" if task_type == "reimbursement" else "travel"
return "travel" if task_type == "expense_application" else "other"
def _extract_time_range(self, segment: str, base_date: date) -> str:
compact = re.sub(r"\s+", "", segment)
if "昨天" in compact:
return (base_date - timedelta(days=1)).isoformat()
if "前天" in compact:
return (base_date - timedelta(days=2)).isoformat()
if "明天" in compact:
return (base_date + timedelta(days=1)).isoformat()
if "后天" in compact:
return (base_date + timedelta(days=2)).isoformat()
iso_match = ISO_DATE_PATTERN.search(compact)
if iso_match:
return self._safe_date(
int(iso_match.group("year")),
int(iso_match.group("month")),
int(iso_match.group("day")),
)
month_day = MONTH_DAY_PATTERN.search(compact)
if month_day:
return self._safe_date(
base_date.year,
int(month_day.group("month")),
int(month_day.group("day")),
)
return ""
@staticmethod
def _safe_date(year: int, month: int, day: int) -> str:
try:
return date(year, month, day).isoformat()
except ValueError:
return ""
@staticmethod
def _extract_location(segment: str) -> str:
    """Extract the destination city from `segment`.

    A known city that directly follows a travel verb (去 / 到 / 赴 / 前往)
    is preferred, so "从上海去北京" resolves to the destination rather than
    the origin. Without such a verb, the first entry of CITY_NAMES that
    occurs anywhere in the text is returned.

    Returns:
        The city name, or "" when no known city is mentioned.
    """
    compact = re.sub(r"\s+", "", segment)
    cities = "|".join(re.escape(city) for city in CITY_NAMES)
    # The travel verbs mirror the route regexes used elsewhere in this class.
    # Blank prefixes would match any city and make this priority pass useless.
    for prefix in ("去", "到", "赴", "前往"):
        match = re.search(fr"{prefix}({cities})", compact)
        if match:
            return match.group(1)
    for city in CITY_NAMES:
        if city in compact:
            return city
    return ""
@staticmethod
def _extract_reason(segment: str, task_type: str) -> str:
cleaned = re.sub(r"\s+", "", segment).strip(",。;; ")
if task_type == "expense_application":
match = re.search(r"(辅助|支持|协助|支撑|参加|拜访|调研|实施|部署|审核).+", cleaned)
if match:
return StewardPlannerService._strip_trailing_connectors(match.group(0))
reason = re.sub(r"^.*?(?:出差|差旅)", "", cleaned).strip(",。;;的费用")
return StewardPlannerService._strip_trailing_connectors(reason) or cleaned
cleaned = re.sub(r"^报销", "", cleaned)
cleaned = re.sub(r"^(?:昨天|前天|明天|后天|\d{1,2}月\d{1,2}(?:日|号)?)的?", "", cleaned)
return cleaned.strip(",。;; ") or segment.strip()
@staticmethod
def _strip_trailing_connectors(value: str) -> str:
cleaned = str(value or "").strip(",。;; ")
return re.sub(r"(?:并且|而且|同时|另外|还需要|需要)$", "", cleaned).strip(",。;; ")
@staticmethod
def _extract_transport_mode(segment: str) -> str:
compact = re.sub(r"\s+", "", segment)
if re.search(r"高铁|动车|火车", compact):
return "train"
if re.search(r"飞机|机票|航班", compact):
return "flight"
if re.search(r"出租车|的士|网约车|打车", compact):
return "taxi"
if "交通" in compact:
return "other"
return ""
@staticmethod
def _resolve_missing_fields(task_type: str, fields: dict[str, str]) -> list[str]:
required = ["expense_type", "time_range", "reason"]
if task_type == "expense_application":
required.append("location")
if fields.get("expense_type") in {"travel", "transport"}:
required.append("transport_mode")
return [key for key in required if not str(fields.get(key) or "").strip()]
@staticmethod
def _resolve_task_confidence(segment: str, fields: dict[str, str], task_type: str) -> float:
compact = re.sub(r"\s+", "", segment)
if task_type == "expense_application":
intent_score = 1.0 if (
"申请" in compact or StewardPlannerService._looks_like_future_travel_application(compact)
) else 0.45
else:
intent_score = 1.0 if "报销" in compact else 0.45
time_score = 1.0 if fields.get("time_range") else 0.0
location_score = 1.0 if fields.get("location") else 0.2
scene_score = 1.0 if fields.get("expense_type") and fields["expense_type"] != "other" else 0.35
confidence = min(1.0, 0.35 * intent_score + 0.25 * time_score + 0.2 * location_score + 0.2 * scene_score)
return round(max(0.45, confidence), 2)
def _build_attachment_groups(
self,
attachments: list[StewardAttachmentInput],
tasks: list[StewardTask],
) -> list[StewardAttachmentGroup]:
if not attachments:
return []
classified = [(item, self._classify_attachment(item)) for item in attachments if item.name]
travel_related = [item.name for item, scene in classified if scene in {"travel", "transport"}]
excluded = [item.name for item, scene in classified if scene not in {"travel", "transport"}]
target_task = self._resolve_attachment_target_task(tasks)
groups: list[StewardAttachmentGroup] = []
if travel_related:
confidence = 0.72 + min(0.18, len(travel_related) * 0.04)
groups.append(
StewardAttachmentGroup(
group_id="ag_travel_001",
target_task_id=target_task.task_id if target_task else None,
scene="travel",
scene_label="差旅相关费用",
attachment_names=travel_related,
excluded_attachment_names=excluded,
confidence=round(confidence, 2),
rationale="附件名称或 OCR 摘要中包含差旅、交通、住宿、火车、机票等线索。",
confirmation_required=True,
)
)
elif excluded:
groups.append(
StewardAttachmentGroup(
group_id="ag_other_001",
target_task_id=None,
scene="other",
scene_label="待人工确认费用",
attachment_names=excluded,
excluded_attachment_names=[],
confidence=0.5,
rationale="当前附件缺少可稳定归属到申请或报销任务的差旅线索。",
confirmation_required=True,
)
)
return groups
@staticmethod
def _resolve_attachment_target_task(tasks: list[StewardTask]) -> StewardTask | None:
reimbursement_tasks = [item for item in tasks if item.task_type == "reimbursement"]
for task in reimbursement_tasks:
if task.ontology_fields.get("expense_type") == "travel":
return task
return reimbursement_tasks[0] if reimbursement_tasks else None
@staticmethod
def _classify_attachment(attachment: StewardAttachmentInput) -> str:
text = " ".join(
[
attachment.name,
attachment.media_type,
attachment.ocr_summary,
" ".join(f"{key}:{value}" for key, value in attachment.ocr_fields.items()),
]
)
compact = re.sub(r"\s+", "", text).lower()
if re.search(r"招待|接待|餐饮|宴请|客户|meal|entertainment", compact):
return "entertainment"
if re.search(r"酒店|住宿|差旅|出差|高铁|火车|动车|机票|航班|train|flight|hotel|travel", compact):
return "travel"
if re.search(r"出租车|的士|网约车|打车|交通|taxi|transport", compact):
return "transport"
return "other"
def _build_confirmation_actions(
self,
tasks: list[StewardTask],
attachment_groups: list[StewardAttachmentGroup],
) -> list[StewardConfirmationAction]:
actions: list[StewardConfirmationAction] = []
for task in tasks:
if task.task_type == "expense_application":
action_type = "confirm_create_application"
label = "确认创建申请单"
else:
action_type = "confirm_create_reimbursement_draft"
label = "确认创建报销草稿"
actions.append(
StewardConfirmationAction(
confirmation_id=f"confirm_{task.task_id}",
action_type=action_type,
label=label,
description=f"确认后把“{task.title}”交给{self._agent_label(task.assigned_agent)}继续核对。",
target_task_id=task.task_id,
payload={
"task_id": task.task_id,
"task_type": task.task_type,
"assigned_agent": task.assigned_agent,
"ontology_fields": task.ontology_fields,
},
)
)
for group in attachment_groups:
actions.append(
StewardConfirmationAction(
confirmation_id=f"confirm_{group.group_id}",
action_type="confirm_attachment_group",
label="确认附件归集",
description=f"确认后将 {len(group.attachment_names)} 份附件按“{group.scene_label}”归集。",
target_task_id=group.target_task_id,
attachment_group_id=group.group_id,
payload={
"attachment_group_id": group.group_id,
"target_task_id": group.target_task_id,
"attachment_names": group.attachment_names,
"excluded_attachment_names": group.excluded_attachment_names,
},
)
)
return actions
@staticmethod
def _agent_label(assigned_agent: str) -> str:
return "申请助手" if assigned_agent == "application_assistant" else "报销助手"
def _build_thinking_events(
    self,
    tasks: list[StewardTask],
    attachment_groups: list[StewardAttachmentGroup],
    attachments: list[StewardAttachmentInput],
) -> list[StewardThinkingEvent]:
    """Assemble the ordered thinking events shown for a rule-based plan.

    Order: entry, task split, ontology mapping, optional gap check, optional
    attachment correlation (only when attachments exist), delegation gate.
    """
    application_count = len([task for task in tasks if task.task_type == "expense_application"])
    reimbursement_count = len([task for task in tasks if task.task_type == "reimbursement"])

    events = [
        StewardThinkingEvent(
            event_id="intent_agent_entry",
            stage="intent_agent",
            title="意图识别智能体接管",
            content=(
                "检测到复合财务话术,当前不是单一助手会话;"
                f"已进入小财管家编排模式,候选任务共 {len(tasks)} 个。"
            ),
        ),
        StewardThinkingEvent(
            event_id="intent_task_split",
            stage="task_split",
            title=f"拆分申请 {application_count} 个、报销 {reimbursement_count}",
            content=self._summarize_task_intents(tasks),
        ),
        StewardThinkingEvent(
            event_id="intent_ontology_mapping",
            stage="ontology_mapping",
            title="核对业务要素",
            content=self._summarize_ontology_coverage(tasks),
        ),
    ]

    gap_event = self._build_business_gap_thinking_event(tasks)
    if gap_event:
        events.append(gap_event)

    if attachments:
        correlation = self._summarize_attachment_correlation(attachment_groups, len(attachments))
        events.append(
            StewardThinkingEvent(
                event_id="intent_attachment_correlation",
                stage="attachment_correlation",
                title="关联附件与任务线索",
                content=correlation,
            )
        )

    delegation_summary = self._summarize_delegation_targets(tasks)
    events.append(
        StewardThinkingEvent(
            event_id="intent_delegation_gate",
            stage="delegation_gate",
            title="生成确认点并准备分派",
            content=f"{delegation_summary} 创建单据、生成草稿、绑定附件和提交审批都会等待用户确认。",
        )
    )
    return events
@staticmethod
def _summarize_task_intents(tasks: list[StewardTask]) -> str:
if not tasks:
return "当前输入尚未形成稳定任务,先保留为待确认财务事项。"
parts = []
for task in tasks:
task_label = "申请" if task.task_type == "expense_application" else "报销"
fields = task.ontology_fields
anchors = []
if fields.get("time_range"):
anchors.append(fields["time_range"])
if fields.get("location"):
anchors.append(fields["location"])
if fields.get("expense_type"):
anchors.append(StewardPlannerService._format_business_field_value("expense_type", fields["expense_type"]))
anchor_text = "".join(anchors) if anchors else "待补充关键字段"
parts.append(f"{task_label}{task.title}{anchor_text}")
return "".join(parts)
@staticmethod
def _summarize_ontology_coverage(tasks: list[StewardTask]) -> str:
mapped_labels = []
missing_labels = []
for task in tasks:
mapped_labels.extend(StewardPlannerService._business_field_label(key) for key in task.ontology_fields.keys())
missing_labels.extend(StewardPlannerService._business_field_label(key) for key in task.missing_fields)
mapped = "".join(dict.fromkeys(label for label in mapped_labels if label)) or "暂无稳定业务要素"
missing = ";还缺少:" + "".join(dict.fromkeys(label for label in missing_labels if label)) if missing_labels else ""
return f"已把用户输入归一为业务要素:{mapped}{missing}。后续执行仍会先让用户确认。"
@staticmethod
def _build_business_gap_thinking_event(tasks: list[StewardTask]) -> StewardThinkingEvent | None:
gap_lines = []
for task in tasks:
if not task.missing_fields:
continue
missing_labels = [
StewardPlannerService._business_field_label(key)
for key in task.missing_fields
if key
]
if not missing_labels:
continue
if task.task_type == "expense_application" and "transport_mode" in task.missing_fields:
gap_lines.append(
(
f"{task.title}已识别到{StewardPlannerService._summarize_known_business_points(task)}"
"但用户没有说明出行方式;出行方式会影响交通费用测算,进入申请单核对后需要先追问火车、飞机或轮船。"
)
)
else:
gap_lines.append(
(
f"{task.title}还缺少{''.join(dict.fromkeys(missing_labels))}"
"需要在对应步骤里继续向用户确认,不能直接执行入库或提交。"
)
)
if not gap_lines:
return None
return StewardThinkingEvent(
event_id="intent_business_gap_check",
stage="business_gap_check",
title="判断待补充信息",
content="".join(gap_lines),
)
@staticmethod
def _summarize_known_business_points(task: StewardTask) -> str:
parts = []
for key in ("time_range", "location", "reason", "expense_type"):
value = str(task.ontology_fields.get(key) or "").strip()
if value:
parts.append(
f"{StewardPlannerService._business_field_label(key)}"
f"{StewardPlannerService._format_business_field_value(key, value)}"
)
return "".join(parts) or "部分业务要素"
@staticmethod
def _business_field_label(key: str) -> str:
    """Return the display label for a field key, or the stripped key itself when no label is registered."""
    normalized = str(key or "").strip()
    return BUSINESS_FIELD_LABELS.get(normalized, normalized)
@staticmethod
def _format_business_field_value(key: str, value: str) -> str:
cleaned = str(value or "").strip()
if key == "expense_type":
return EXPENSE_TYPE_LABELS.get(cleaned, cleaned)
if key == "transport_mode":
return TRANSPORT_MODE_LABELS.get(cleaned, cleaned)
return cleaned
@staticmethod
def _summarize_attachment_correlation(
attachment_groups: list[StewardAttachmentGroup],
total_attachment_count: int,
) -> str:
grouped_names = []
excluded_names = []
for group in attachment_groups:
grouped_names.extend(group.attachment_names)
excluded_names.extend(group.excluded_attachment_names)
grouped_text = "".join(grouped_names) if grouped_names else "暂无可稳定归集附件"
excluded_text = ";排除或单独确认:" + "".join(excluded_names) if excluded_names else ""
return f"已核对 {total_attachment_count} 份附件,建议归集:{grouped_text}{excluded_text}"
@staticmethod
def _summarize_delegation_targets(tasks: list[StewardTask]) -> str:
application_count = sum(1 for item in tasks if item.assigned_agent == "application_assistant")
reimbursement_count = sum(1 for item in tasks if item.assigned_agent == "reimbursement_assistant")
parts = []
if application_count:
parts.append(f"{application_count} 个申请任务交给申请助手")
if reimbursement_count:
parts.append(f"{reimbursement_count} 个报销任务交给报销助手")
return "".join(parts) + "" if parts else "尚无可分派任务。"
@staticmethod
def _build_summary(tasks: list[StewardTask], attachment_groups: list[StewardAttachmentGroup]) -> str:
parts = [f"我识别到 {len(tasks)} 个待处理任务"]
if attachment_groups:
grouped = sum(len(item.attachment_names) for item in attachment_groups)
parts.append(f"并形成 {grouped} 份附件的归集建议")
parts.append(",请确认后我再分派给对应助手执行。")
return "".join(parts)
@staticmethod
def _build_task_title(prefix: str, fields: dict[str, str], index: int) -> str:
location = fields.get("location", "")
time_range = fields.get("time_range", "")
expense_type = fields.get("expense_type", "")
subject = location or {"travel": "差旅", "transport": "交通", "entertainment": "招待"}.get(expense_type, "")
if subject and time_range:
return f"{prefix} {time_range} {subject}"
if subject:
return f"{prefix} {subject}"
return f"{prefix} {index}"
@staticmethod
def _build_task_summary(segment: str, fields: dict[str, str]) -> str:
field_parts = []
for key, label in (
("time_range", "时间"),
("location", "地点"),
("expense_type", "费用类型"),
("reason", "事由"),
("transport_mode", "交通方式"),
):
value = fields.get(key)
if value:
field_parts.append(f"{label}{value}")
return "".join(field_parts) or segment
@staticmethod
def _resolve_base_date(client_now_iso: str | None, context_json: dict[str, Any]) -> date:
raw_value = client_now_iso or str(context_json.get("client_now_iso") or "").strip()
if raw_value:
try:
parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
return parsed.date()
except ValueError:
pass
return datetime.now(UTC).date()
@staticmethod
def _clean_text(value: Any) -> str:
return re.sub(r"\s+", " ", str(value or "")).strip()