feat(steward): off_topic 场景细分与引导回复

- 将业务无关输入细分为 greeting / meaningless / off_business 三类场景
- 新增 StewardOffTopicAgent,用 function calling 生成管家语气引导回复
- steward endpoint 与 user_agent_application 串联 off_topic 引导话术
- 补充 planner 与 user agent 的 off_topic 覆盖测试
This commit is contained in:
caoxiaozhu
2026-06-18 22:12:10 +08:00
parent 127d603e7d
commit a6674a1e76
6 changed files with 952 additions and 50 deletions

View File

@@ -7,9 +7,11 @@ from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.models.financial_record import ExpenseClaim
from app.schemas.common import ErrorResponse
from app.schemas.steward import (
StewardPlanRequest,
@@ -21,9 +23,12 @@ from app.schemas.steward import (
StewardThinkingEvent,
)
from app.services.agent_conversations import AgentConversationService
from app.services.expense_claim_draft_flow import APPROVED_APPLICATION_LINK_STATUSES
from app.services.expense_claims import ExpenseClaimService
from app.services.runtime_chat import RuntimeChatService
from app.services.steward_flow_state import StewardFlowStateService
from app.services.steward_intent_agent import StewardIntentAgent
from app.services.steward_off_topic_agent import StewardOffTopicAgent
from app.services.steward_planner import StewardPlannerService
from app.services.steward_runtime_decision_agent import StewardRuntimeDecisionAgent
from app.services.steward_slot_decision_agent import StewardSlotDecisionAgent
@@ -46,8 +51,10 @@ DbSession = Annotated[Session, Depends(get_db)]
)
def create_steward_plan(payload: StewardPlanRequest, db: DbSession) -> StewardPlanResponse:
try:
plan = _build_steward_planner(db).build_plan(payload)
return _attach_conversation_state(db, payload, plan)
planner = _build_steward_planner(db)
hydrated_payload = _hydrate_required_application_gate(db, payload, planner)
plan = planner.build_plan(hydrated_payload)
return _attach_conversation_state(db, hydrated_payload, plan)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@@ -103,15 +110,16 @@ async def _iter_steward_plan_events(
event_id="intent_agent_stream_start",
stage="stream_start",
title="读取用户输入",
content="我先判断这句话里是否同时包含申请报销或附件归集事项,再决定处理顺序",
content="我先识别申请/报销边界;如果是历史差旅描述,会先查询可关联申请单再决定流程",
status="running",
).model_dump(mode="json"),
)
await asyncio.sleep(0)
try:
plan = planner.build_plan(payload)
plan = _attach_conversation_state(db, payload, plan)
hydrated_payload = _hydrate_required_application_gate(db, payload, planner)
plan = planner.build_plan(hydrated_payload)
plan = _attach_conversation_state(db, hydrated_payload, plan)
except ValueError as exc:
yield _encode_stream_event("error", {"message": str(exc)})
return
@@ -128,11 +136,179 @@ def _encode_stream_event(event: str, data: dict[str, Any]) -> str:
def _build_steward_planner(db: Session) -> StewardPlannerService:
runtime_chat = RuntimeChatService(db)
return StewardPlannerService(
intent_agent=StewardIntentAgent(RuntimeChatService(db)),
intent_agent=StewardIntentAgent(runtime_chat),
off_topic_agent=StewardOffTopicAgent(runtime_chat),
)
def _hydrate_required_application_gate(
db: Session,
payload: StewardPlanRequest,
planner: StewardPlannerService,
) -> StewardPlanRequest:
context_json = dict(payload.context_json or {})
required_gate = context_json.get("required_application_gate")
if isinstance(required_gate, dict):
travel_gate = required_gate.get("travel")
if isinstance(travel_gate, dict) and travel_gate.get("checked") is True:
return payload
message = planner._clean_text(payload.message)
base_date = planner._resolve_base_date(payload.client_now_iso, context_json)
if not planner._looks_like_ambiguous_travel_flow(message, base_date, payload):
return payload
candidates = _query_required_application_gate_candidates(db, payload, context_json)
next_required_gate = dict(required_gate) if isinstance(required_gate, dict) else {}
next_required_gate["travel"] = {
"checked": True,
"candidate_count": len(candidates),
"candidates": candidates[:5],
}
return payload.model_copy(
update={
"context_json": {
**context_json,
"required_application_gate": next_required_gate,
}
}
)
def _query_required_application_gate_candidates(
db: Session,
payload: StewardPlanRequest,
context_json: dict[str, Any],
) -> list[dict[str, Any]]:
identities = _resolve_required_application_gate_identities(payload, context_json)
stmt = (
select(ExpenseClaim)
.order_by(ExpenseClaim.submitted_at.desc(), ExpenseClaim.updated_at.desc())
.limit(200)
)
candidates: list[dict[str, Any]] = []
for claim in db.scalars(stmt).all():
if not ExpenseClaimService._is_expense_application_claim(claim):
continue
if str(claim.status or "").strip().lower() not in APPROVED_APPLICATION_LINK_STATUSES:
continue
if identities and not _claim_matches_required_application_identity(claim, identities):
continue
if not _claim_matches_required_travel_application(claim, payload.message):
continue
candidates.append(_serialize_required_application_gate_candidate(claim))
return candidates
def _resolve_required_application_gate_identities(
payload: StewardPlanRequest,
context_json: dict[str, Any],
) -> set[str]:
raw_values = [
payload.user_id,
context_json.get("user_id"),
context_json.get("username"),
context_json.get("name"),
context_json.get("employee_id"),
context_json.get("employee_no"),
context_json.get("employee_name"),
]
identities: set[str] = set()
for value in raw_values:
normalized = _normalize_required_application_identity(value)
if normalized:
identities.add(normalized)
return identities
def _normalize_required_application_identity(value: Any) -> str:
return str(value or "").strip().casefold()
def _claim_matches_required_application_identity(claim: ExpenseClaim, identities: set[str]) -> bool:
claim_identities = {
_normalize_required_application_identity(claim.employee_id),
_normalize_required_application_identity(claim.employee_name),
}
claim_identities.discard("")
return bool(claim_identities.intersection(identities))
def _claim_matches_required_travel_application(claim: ExpenseClaim, message: str) -> bool:
expense_type = str(claim.expense_type or "").strip().casefold()
if any(token in expense_type for token in ("travel", "trip", "差旅", "出差")):
return True
claim_text = "".join(
[
str(claim.reason or ""),
str(claim.location or ""),
str(claim.claim_no or ""),
]
)
if "差旅" in claim_text or "出差" in claim_text:
return True
compact_message = str(message or "").replace(" ", "")
location = str(claim.location or "").strip()
return bool(location and location in compact_message and "出差" in compact_message)
def _serialize_required_application_gate_candidate(claim: ExpenseClaim) -> dict[str, Any]:
business_time = _resolve_required_application_business_time(claim)
status_label = _resolve_required_application_status_label(claim.status)
return {
"id": str(claim.id or "").strip(),
"claim_no": str(claim.claim_no or "").strip(),
"reason": str(claim.reason or "").strip(),
"location": str(claim.location or "").strip(),
"business_time": business_time,
"status_label": status_label,
"application_claim_id": str(claim.id or "").strip(),
"application_claim_no": str(claim.claim_no or "").strip(),
"application_reason": str(claim.reason or "").strip(),
"application_location": str(claim.location or "").strip(),
"application_business_time": business_time,
"application_status_label": status_label,
}
def _resolve_required_application_business_time(claim: ExpenseClaim) -> str:
for flag in list(claim.risk_flags_json or []):
if not isinstance(flag, dict):
continue
for source in (
flag,
flag.get("application_detail"),
flag.get("applicationDetail"),
flag.get("review_form_values"),
flag.get("reviewFormValues"),
):
if not isinstance(source, dict):
continue
value = (
source.get("application_business_time")
or source.get("applicationBusinessTime")
or source.get("business_time")
or source.get("businessTime")
)
if str(value or "").strip():
return str(value).strip()
if claim.occurred_at is not None:
return claim.occurred_at.date().isoformat()
return ""
def _resolve_required_application_status_label(status: Any) -> str:
normalized = str(status or "").strip().lower()
return {
"approved": "已审批",
"completed": "已完成",
}.get(normalized, normalized)
def _attach_conversation_state(
db: Session,
payload: StewardPlanRequest,

View File

@@ -0,0 +1,157 @@
"""小财管家业务无关输入的引导生成 agent。
当用户的输入被识别为与财务任务无关时(问候、纯数字、闲聊等),
由该 agent 用 function calling 让主模型生成一句管家对主人语气的引导回复。
LLM 不可用或调用失败时,由调用方 fallback 到规则模板。
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
from app.schemas.steward import StewardPlanRequest
from app.services.runtime_chat import RuntimeChatService
STEWARD_OFF_TOPIC_FUNCTION_NAME = "submit_steward_off_topic_response"
STEWARD_OFF_TOPIC_SCENARIO_PROMPTS: dict[str, str] = {
"greeting": (
"用户发起了礼貌问候,例如「你好」「您好」「早上好」。"
"请像管家一样礼貌地回应主人的问候,温和询问主人今天要办理什么业务,"
"并顺势说明小财管家能帮主人整理费用申请和费用报销两类事项。"
"可以再提示主人试试用具体的话术来表达需求。"
),
"off_business": (
"用户说了有意义的话但与财务无关,例如问天气、聊生活、聊工作日常。"
"请温和地告诉主人:这句话里没有识别到财务事项,"
"小财管家目前只能帮主人整理费用申请和费用报销。"
"再请主人用具体的话术告诉小财管家他/她想办什么业务。"
),
"meaningless": (
"用户输入了与财务无关且难以理解的内容,例如纯数字、纯标点、重复字符。"
"请温和地告诉主人:这句话里好像没有出现费用申请、报销、出差、交通、招待这些关键词,"
"请主人换种说法再告诉小财管家一次。"
),
}
@dataclass(frozen=True, slots=True)
class StewardOffTopicAgentResult:
response_text: str
model_call_traces: list[dict[str, Any]]
class StewardOffTopicAgent:
"""使用大模型 function calling 生成小财管家对业务无关输入的多样化引导。"""
def __init__(self, runtime_chat_service: RuntimeChatService) -> None:
self.runtime_chat_service = runtime_chat_service
self.last_call_traces: list[dict[str, Any]] = []
def generate(
self,
request: StewardPlanRequest,
*,
scenario: str,
) -> StewardOffTopicAgentResult | None:
messages = self._build_messages(request, scenario=scenario)
try:
result = self.runtime_chat_service.complete_with_tool_call(
messages,
tools=[self._build_tool_schema()],
tool_choice={
"type": "function",
"function": {"name": STEWARD_OFF_TOPIC_FUNCTION_NAME},
},
max_tokens=400,
temperature=0.7,
timeout_seconds=15,
max_attempts=1,
)
except Exception:
return None
self.last_call_traces = result.calls_as_dicts()
if result.tool_call is None or result.tool_call.name != STEWARD_OFF_TOPIC_FUNCTION_NAME:
return None
arguments = result.tool_call.arguments or {}
response_text = str(arguments.get("response_text") or "").strip()
if not response_text:
return None
return StewardOffTopicAgentResult(
response_text=response_text,
model_call_traces=self.last_call_traces,
)
@staticmethod
def _build_messages(
request: StewardPlanRequest,
*,
scenario: str,
) -> list[dict[str, Any]]:
scenario_hint = STEWARD_OFF_TOPIC_SCENARIO_PROMPTS.get(
scenario,
STEWARD_OFF_TOPIC_SCENARIO_PROMPTS["off_business"],
)
return [
{
"role": "system",
"content": (
"你是 X-Financial 的小财管家,要像一位贴身的管家那样为主人服务。"
"用户输入的内容与财务任务无关,你需要生成一段温和、尊敬的引导回复。"
"要求:\n"
"1. 始终称呼用户为「主人」「您」,不要用「你」。\n"
"2. 语气尊敬、温和、主动,体现管家的服务意识。\n"
"3. 不要每次都用相同的句式,要根据用户的输入和当前场景变化表达。\n"
"4. 控制在 2-4 句,不要过长。\n"
"5. 必须使用 function calling 输出 response_text不要返回普通文本。\n"
"6. response_text 使用 Markdown第一行用 ### 标题,正文与引导句之间留空行。\n"
"7. 如果主人是在问候,先礼貌回应再引导;"
"如果主人说的是非财务话题,温和说明小财管家能做什么;"
"如果主人的内容没有意义,请他/她换种说法。\n"
f"当前场景:{scenario_hint}"
),
},
{
"role": "user",
"content": json.dumps(
{
"message": request.message,
"scenario": scenario,
},
ensure_ascii=False,
),
},
]
@staticmethod
def _build_tool_schema() -> dict[str, Any]:
return {
"type": "function",
"function": {
"name": STEWARD_OFF_TOPIC_FUNCTION_NAME,
"description": (
"提交小财管家对业务无关输入的引导回复。"
"response_text 是完整 Markdown 文本:"
"第一行 ### 标题(基于场景),空行,正文(歉意或回应 + 能力范围 + 引导句)。"
),
"parameters": {
"type": "object",
"properties": {
"response_text": {
"type": "string",
"description": (
"面向用户的引导回复 Markdown 文本。"
"称呼用户为「主人」「您」,不要用「你」。"
),
},
},
"required": ["response_text"],
},
},
}

View File

@@ -21,6 +21,7 @@ from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER, BUSIN
from app.services.ontology_field_registry import normalize_ontology_form_values
from app.services.steward_intent_agent import StewardIntentAgent
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
from app.services.steward_off_topic_agent import StewardOffTopicAgent
CITY_NAMES = (
@@ -70,6 +71,20 @@ STEWARD_BUSINESS_SIGNAL_KEYWORDS: tuple[str, ...] = (
*CITY_NAMES,
)
# 业务无关输入的场景分类
STEWARD_OFF_TOPIC_SCENARIO_GREETING = "greeting"
STEWARD_OFF_TOPIC_SCENARIO_MEANINGLESS = "meaningless"
STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS = "off_business"
# 问候词:用于将"你好"等礼貌问候单独归类为 greeting 场景
STEWARD_GREETING_KEYWORDS: tuple[str, ...] = (
"你好", "您好", "hi", "hello", "hey", "", "哈喽",
"早上好", "上午好", "中午好", "下午好", "晚上好", "早安", "晚安",
"您好呀", "你好呀", "在吗", "在么", "在不在",
)
APPLICATION_SPLIT_PATTERN = re.compile(r"(?:^|[,。;;])[^,。;;]*?(?:申请|出差申请|差旅申请)[^,。;;]*")
REIMBURSEMENT_PATTERN = re.compile(r"(?:我要报销|还需要报销|需要报销|报销)([^,。;;?!\n]+)")
MONTH_DAY_PATTERN = re.compile(r"(?P<month>\d{1,2})\s*月\s*(?P<day>\d{1,2})\s*(?:日|号)?")
@@ -119,8 +134,13 @@ class PlannedTaskDraft:
class StewardPlannerService:
"""小财管家第一版规划服务:只生成计划,不执行入库类动作。"""
def __init__(self, intent_agent: StewardIntentAgent | None = None) -> None:
def __init__(
self,
intent_agent: StewardIntentAgent | None = None,
off_topic_agent: StewardOffTopicAgent | None = None,
) -> None:
self.intent_agent = intent_agent
self.off_topic_agent = off_topic_agent
def build_plan(self, request: StewardPlanRequest) -> StewardPlanResponse:
message = self._clean_text(request.message)
@@ -129,8 +149,9 @@ class StewardPlannerService:
base_date = self._resolve_base_date(request.client_now_iso, request.context_json)
# 业务无关输入拦截(纯数字、问候、闲聊、乱码等):在进入 LLM/规则兜底之前直接返回 off_topic 计划。
if self._is_business_irrelevant_input(message, request):
return self._build_off_topic_plan(request)
scenario = self._classify_irrelevant_input(message, request)
if scenario is not None:
return self._build_off_topic_plan(request, scenario=scenario)
model_call_traces: list[dict[str, Any]] = []
fallback_reason = ""
if self.intent_agent is not None and self._should_use_model_intent_recognition(message, base_date, request):
@@ -185,48 +206,179 @@ class StewardPlannerService:
@staticmethod
def _is_business_irrelevant_input(message: str, request: StewardPlanRequest) -> bool:
"""判断输入是否与小财管家支持的财务事项完全无关。
"""判断输入是否与小财管家支持的财务事项完全无关(向后兼容包装)
判定规则:消息去除所有空白后不含任何业务信号关键词,且没有上传附件
即视为业务无关输入(如纯数字、问候、闲聊、乱码)
判定规则:消息去除所有空白后不含任何业务信号关键词,且没有上传附件
实际判定逻辑由 _classify_irrelevant_input 负责,命中任何场景即视为业务无关。
"""
return StewardPlannerService._classify_irrelevant_input(message, request) is not None
@staticmethod
def _classify_irrelevant_input(message: str, request: StewardPlanRequest) -> str | None:
"""把业务无关输入细分为三个场景,便于给出更贴切的引导。
返回值:
- "greeting":礼貌问候("你好"等),无业务关键词
- "meaningless":完全无意义内容(纯数字、纯标点、单字符重复、纯字母数字乱码)
- "off_business":有意义但与财务无关(问天气、聊生活等)
- None消息与业务相关无需走 off_topic 路径
"""
if request.attachments:
return False
return None
compact = re.sub(r"\s+", "", message)
if not compact:
return False
return not any(keyword in compact for keyword in STEWARD_BUSINESS_SIGNAL_KEYWORDS)
return None
if any(keyword in compact for keyword in STEWARD_BUSINESS_SIGNAL_KEYWORDS):
return None
if StewardPlannerService._looks_like_greeting(compact):
return STEWARD_OFF_TOPIC_SCENARIO_GREETING
if StewardPlannerService._looks_like_meaningless(compact):
return STEWARD_OFF_TOPIC_SCENARIO_MEANINGLESS
return STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS
@staticmethod
def _looks_like_greeting(compact_message: str) -> bool:
"""判断消息是否只是礼貌问候(无其他有意义内容)。"""
normalized = compact_message.lower()
for keyword in STEWARD_GREETING_KEYWORDS:
if normalized == keyword.lower() or normalized.startswith(keyword.lower()):
# 整句只是问候词(允许少量标点)
tail = normalized[len(keyword.lower()):]
if not tail or re.fullmatch(r"[!。.?,~\s]+", tail):
return True
return False
@staticmethod
def _looks_like_meaningless(compact_message: str) -> bool:
"""判断消息是否完全没有语义价值(纯数字、纯标点、单字符重复等)。"""
if re.fullmatch(r"\d+", compact_message):
return True
# 纯标点
if re.fullmatch(r"[\W_]+", compact_message):
return True
# 单字符重复(例如 "啊啊啊啊啊"
if len(compact_message) >= 2 and len(set(compact_message)) == 1:
return True
# 短字母数字组合但没有任何业务意义,例如 "abc"、"test123"
# 注意:必须排除已经被关键词命中的情况(前面的判定已保证不命中关键词)
if re.fullmatch(r"[a-zA-Z0-9]+", compact_message) and len(compact_message) <= 12:
return True
return False
def _build_off_topic_plan(
self,
request: StewardPlanRequest,
*,
scenario: str,
) -> StewardPlanResponse:
"""业务无关输入的兜底计划根据场景给出对应引导off_business 场景可由 LLM 增强。"""
base_summary = self._default_off_topic_summary(scenario)
thinking_event = self._build_off_topic_thinking_event(scenario)
suggested_prompts = self._off_topic_suggested_prompts(scenario)
model_call_traces: list[dict[str, Any]] = []
# 仅对 off_business 场景尝试让 LLM 生成多样化引导;问候/无意义场景用规则模板即可。
if (
scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS
and self.off_topic_agent is not None
):
try:
llm_result = self.off_topic_agent.generate(request, scenario=scenario)
if llm_result is not None and llm_result.response_text:
base_summary = llm_result.response_text
model_call_traces = llm_result.model_call_traces
except Exception:
# 失败时静默回退到规则模板
pass
def _build_off_topic_plan(self, request: StewardPlanRequest) -> StewardPlanResponse:
"""业务无关输入的兜底计划:明确告知用户未识别到财务事项,并给出话术示例。"""
return StewardPlanResponse(
plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}",
plan_status="off_topic",
planning_source="rule_fallback",
next_action="none",
summary="这看起来跟财务任务没什么关系,小财管家没识别到费用申请或费用报销的意图。",
thinking_events=[
StewardThinkingEvent(
event_id="intent_agent_off_topic",
stage="off_topic",
title="未识别到财务事项",
content=(
"我检查了这句话,没有发现费用申请、报销、出差、交通、招待等财务线索。"
"如果你确实是要处理财务任务,可以参考下面的示例换一种说法。"
),
)
],
summary=base_summary,
thinking_events=[thinking_event],
tasks=[],
attachment_groups=[],
confirmation_groups=[],
candidate_flows=[],
suggested_prompts=[
suggested_prompts=suggested_prompts,
model_call_traces=model_call_traces,
)
@staticmethod
def _default_off_topic_summary(scenario: str) -> str:
"""off_topic 场景的默认引导文案LLM 不可用时使用。"""
if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING:
return (
"### 您好主人,很高兴为您服务\n\n"
"请问您今天要办理什么业务?目前小财管家能帮您整理"
"**费用申请**和**费用报销**这两类事项。\n\n"
"要不您换种说法告诉我:"
)
if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS:
return (
"### 抱歉主人,这句话我暂时帮不上忙\n\n"
"我看了您刚才说的这句话,里面聊的不是财务事项。"
"小财管家目前只能帮您整理**费用申请**和**费用报销**这两类业务。\n\n"
"要不您换种说法告诉我:"
)
# meaningless
return (
"### 这句话我暂时没识别到财务事项\n\n"
"很抱歉主人,目前小财管家只能帮您整理**费用申请**和**费用报销**这两类事项。\n\n"
"要不您换种说法告诉我:"
)
@staticmethod
def _build_off_topic_thinking_event(scenario: str) -> StewardThinkingEvent:
"""off_topic 场景下向用户展示的思考过程摘要。"""
if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING:
return StewardThinkingEvent(
event_id="intent_agent_off_topic_greeting",
stage="off_topic",
title="先回应主人的问候",
content="主人向我打了个招呼,我先礼貌回应一下,再引导他/她说出具体想办什么业务。",
)
if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS:
return StewardThinkingEvent(
event_id="intent_agent_off_topic_non_business",
stage="off_topic",
title="这句话不在服务范围内",
content="我看了您刚才说的这句话,里面聊的不是财务事项。小财管家目前只能帮您整理费用申请和费用报销。",
)
return StewardThinkingEvent(
event_id="intent_agent_off_topic_meaningless",
stage="off_topic",
title="未识别到财务事项",
content=(
"我仔细看了看您刚才说的这句话,里面好像没有出现"
"费用申请、报销、出差、交通、招待这些财务关键词。"
),
)
@staticmethod
def _off_topic_suggested_prompts(scenario: str) -> list[str]:
"""off_topic 场景下展示给用户的推荐话术。"""
if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING:
return [
"我想要申请明天去北京出差3天支撑客户现场实施",
"我要报销昨天的交通费",
"报销上周出差上海的费用",
],
model_call_traces=[],
)
"上周出差上海的费用需要报销",
]
if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS:
return [
"我想要申请明天去北京出差3天支撑客户现场实施",
"我要报销昨天的交通费",
"我需要整理上周出差的发票",
]
# meaningless
return [
"我想要申请明天去北京出差3天支撑客户现场实施",
"我要报销昨天的交通费",
"我上周出差去上海的费用需要报销",
]
def _build_rule_fallback_plan(
self,
@@ -287,10 +439,12 @@ class StewardPlannerService:
planning_source: str = "rule_fallback",
) -> StewardPlanResponse:
candidates = self._build_rule_candidate_flows(request, base_date)
gate = self._resolve_required_application_gate(request, "travel")
pending_reason = self._build_pending_flow_reason(gate)
pending = StewardPendingFlowConfirmation(
status="pending",
source_message=request.message,
reason="当前话术描述了出差事项,但没有明确说明要补办申请还是发起报销。",
reason=pending_reason,
candidate_flows=candidates,
)
thinking_events = []
@@ -308,7 +462,7 @@ class StewardPlannerService:
event_id="intent_pending_flow_confirmation",
stage="flow_confirmation",
title="需要确认流程方向",
content="我识别到时间、地点和出差事由,但没有识别到明确的申请或报销动作,需要先请你选择流程方向。",
content=pending_reason,
)
)
return StewardPlanResponse(
@@ -316,10 +470,7 @@ class StewardPlannerService:
plan_status="needs_flow_confirmation",
planning_source=planning_source, # type: ignore[arg-type]
next_action="confirm_flow",
summary=(
"我识别到这是一次出差事项,但还不能确定你要做的是"
"**补办出差申请**还是**发起费用报销**。请先选择一个方向。"
),
summary=self._build_pending_flow_summary(gate),
thinking_events=thinking_events,
pending_flow_confirmation=pending,
candidate_flows=candidates,
@@ -343,6 +494,24 @@ class StewardPlannerService:
base_date,
request,
)
gate = self._resolve_required_application_gate(request, "travel")
if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0:
return [
StewardCandidateFlow(
flow_id="travel_application",
label="先发起出差申请",
confidence=0.86,
reason="已先查询你名下可关联的差旅申请单,暂未查到可关联单据,因此应先申请单据。",
ontology_fields=application_fields,
missing_fields=self._resolve_missing_fields("expense_application", application_fields),
)
]
reimbursement_label = "发起费用报销"
reimbursement_reason = "用户描述的也可能是已发生出差事项,需要进入报销材料整理。"
if gate.get("checked"):
candidate_count = int(gate.get("candidate_count") or 0)
reimbursement_label = "关联已有申请单并发起报销"
reimbursement_reason = f"已先查到 {candidate_count} 个可关联申请单,选择后会先请你关联具体单据。"
return [
StewardCandidateFlow(
flow_id="travel_application",
@@ -354,14 +523,60 @@ class StewardPlannerService:
),
StewardCandidateFlow(
flow_id="travel_reimbursement",
label="发起费用报销",
label=reimbursement_label,
confidence=0.48,
reason="用户描述的也可能是已发生出差事项,需要进入报销材料整理。",
reason=reimbursement_reason,
ontology_fields=reimbursement_fields,
missing_fields=self._resolve_missing_fields("reimbursement", reimbursement_fields),
),
]
@staticmethod
def _resolve_required_application_gate(
request: StewardPlanRequest,
expense_type: str,
) -> dict[str, Any]:
context = request.context_json if isinstance(request.context_json, dict) else {}
gates = context.get("required_application_gate")
if not isinstance(gates, dict):
return {}
gate = gates.get(expense_type)
if not isinstance(gate, dict) or not gate.get("checked"):
return {}
try:
candidate_count = max(0, int(gate.get("candidate_count") or 0))
except (TypeError, ValueError):
candidate_count = 0
return {
**gate,
"candidate_count": candidate_count,
"checked": True,
}
@staticmethod
def _build_pending_flow_reason(gate: dict[str, Any]) -> str:
if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0:
return "我已经先查询你名下可关联的差旅申请单,未查到可关联单据,所以当前应先申请单据。"
if gate.get("checked"):
candidate_count = int(gate.get("candidate_count") or 0)
return f"我已经先查询你名下的差旅申请单,查到 {candidate_count} 个可关联申请单,需要你确认是否关联单据后发起报销。"
return "当前话术描述了出差事项,但没有明确说明要补办申请还是发起报销。"
@staticmethod
def _build_pending_flow_summary(gate: dict[str, Any]) -> str:
if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0:
return "我已先查询可关联申请单,暂未查到可关联单据;这次应先申请单据,再进入后续报销。"
if gate.get("checked"):
candidate_count = int(gate.get("candidate_count") or 0)
return (
f"我已先查询可关联申请单,查到 {candidate_count} 个可关联申请单;"
"你可以选择关联已有申请单发起报销,或改为补办新的出差申请。"
)
return (
"我识别到这是一次出差事项,但还不能确定你要做的是"
"**补办出差申请**还是**发起费用报销**。请先选择一个方向。"
)
def _extract_task_drafts(self, message: str) -> list[PlannedTaskDraft]:
drafts: list[PlannedTaskDraft] = []
first_reimbursement = self._find_first_reimbursement_index(message)
@@ -610,9 +825,11 @@ class StewardPlannerService:
return StewardPlannerService._strip_trailing_connectors(match.group(0))
reason = re.sub(r"^.*?(?:出差|差旅)", "", cleaned).strip(",。;;的费用")
return StewardPlannerService._strip_trailing_connectors(reason) or cleaned
cleaned = re.sub(r"^报销", "", cleaned)
cleaned = re.sub(r"^(?:我想要|我想|我要|还需要|需要|请帮我|帮我)?报销", "", cleaned)
if not cleaned or cleaned in {"费用", "报销单", "报销流程"}:
return ""
cleaned = re.sub(r"^(?:昨天|前天|明天|后天|\d{1,2}月\d{1,2}(?:日|号)?)的?", "", cleaned)
return cleaned.strip(",。;; ") or segment.strip()
return cleaned.strip(",。;; ")
@staticmethod
def _strip_trailing_connectors(value: str) -> str:

View File

@@ -86,6 +86,25 @@ APPLICATION_TRANSPORT_KEYWORDS = {
"火车": ("火车", "高铁", "动车", "铁路", "列车"),
"轮船": ("轮船", "", "客轮", "邮轮", "坐船"),
}
APPLICATION_TYPE_DISPLAY_LABELS = {
"travel": "差旅费用申请",
"travel_application": "差旅费用申请",
"expense_application": "费用申请",
"application": "费用申请",
"transport": "交通费用申请",
"transportation": "交通费用申请",
"traffic": "交通费用申请",
"hotel": "住宿费用申请",
"accommodation": "住宿费用申请",
"meeting": "会务费用申请",
"conference": "会务费用申请",
"purchase": "采购费用申请",
"procurement": "采购费用申请",
"training": "培训费用申请",
"business_entertainment": "业务招待申请",
"entertainment": "业务招待申请",
"office": "办公费用申请",
}
APPLICATION_REASON_VERBS = (
"支撑",
"支持",
@@ -316,6 +335,7 @@ class UserAgentApplicationMixin:
if value:
facts[key] = value
facts["application_type"] = self._normalize_application_type_label(facts.get("application_type", ""))
context_json = payload.context_json or {}
context_time = self._resolve_application_time_from_context(context_json)
if context_time and self._should_prefer_context_application_time(facts.get("time", ""), context_time):
@@ -476,7 +496,9 @@ class UserAgentApplicationMixin:
reason = UserAgentApplicationMixin._cleanup_application_reason_candidate(pick("reason"))
return {
"application_type": pick("applicationType", "application_type"),
"application_type": UserAgentApplicationMixin._normalize_application_type_label(
pick("applicationType", "application_type")
),
"time": pick("time", "timeRange", "time_range"),
"location": pick("location"),
"reason": reason,
@@ -842,10 +864,39 @@ class UserAgentApplicationMixin:
@staticmethod
def _resolve_application_type_from_text(message: str) -> str:
return UserAgentApplicationMixin._resolve_application_labeled_value(
return UserAgentApplicationMixin._normalize_application_type_label(
UserAgentApplicationMixin._resolve_application_labeled_value(
message,
("申请类型", "费用类型"),
)
)
@staticmethod
def _normalize_application_type_label(value: object, fallback: str = "") -> str:
raw_value = str(value or "").strip()
if not raw_value:
return str(fallback or "").strip()
normalized_key = raw_value.lower()
if normalized_key in APPLICATION_TYPE_DISPLAY_LABELS:
return APPLICATION_TYPE_DISPLAY_LABELS[normalized_key]
if re.fullmatch(r"(差旅费|差旅|出差)", raw_value):
return "差旅费用申请"
if re.fullmatch(r"(交通费|交通)", raw_value):
return "交通费用申请"
if re.fullmatch(r"(住宿费|住宿|酒店)", raw_value):
return "住宿费用申请"
if re.fullmatch(r"(会务|会议|会务费)", raw_value):
return "会务费用申请"
if re.fullmatch(r"(采购|采购费|办公用品)", raw_value):
return "采购费用申请"
if raw_value.endswith("费用申请") or raw_value.endswith("申请"):
return raw_value
if raw_value.endswith("费用"):
return f"{raw_value}申请"
if raw_value.endswith(""):
return f"{raw_value[:-1]}费用申请"
return raw_value
@staticmethod
def _resolve_application_missing_slots(payload: UserAgentRequest) -> list[str]:

View File

@@ -1,10 +1,18 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from decimal import Decimal
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.api.deps import get_db
from app.db.base import Base
from app.main import create_app
from app.models.financial_record import ExpenseClaim
from app.schemas.steward import StewardAttachmentInput, StewardPlanRequest
from app.services.steward_intent_agent import StewardIntentAgentResult
from app.services.steward_planner import StewardPlannerService
@@ -226,6 +234,61 @@ class AmbiguousApplicationFunctionCallingIntentAgent:
)
def _create_steward_test_client_with_db():
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
TestingSessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
app = create_app()
def override_db():
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_db
return TestClient(app), TestingSessionLocal, app
def _build_endpoint_application_claim(
*,
claim_no: str = "AP-202602-001",
employee_name: str = "张小青",
status: str = "approved",
) -> ExpenseClaim:
return ExpenseClaim(
id=claim_no.lower().replace("-", "_"),
claim_no=claim_no,
employee_name=employee_name,
department_name="产品交付部",
expense_type="travel_application",
reason="辅助国网仿生产服务器部署",
location="上海",
amount=Decimal("1800.00"),
currency="CNY",
invoice_count=0,
occurred_at=datetime(2026, 2, 20, tzinfo=UTC),
submitted_at=datetime(2026, 2, 19, tzinfo=UTC),
status=status,
approval_stage="关联单据状态",
risk_flags_json=[
{
"source": "application_detail",
"application_detail": {
"application_business_time": "2026-02-20 至 2026-02-23",
"location": "上海",
"reason": "辅助国网仿生产服务器部署",
},
}
],
)
def test_steward_planner_uses_llm_function_calling_plan_when_available() -> None:
payload = StewardPlanRequest(
message="\u6211\u60f3\u7533\u8bf7\u0037\u6708\u0032\u65e5\u53bb\u5317\u4eac\u51fa\u5dee\uff0c\u5e76\u4e14\u6211\u8981\u62a5\u9500\u6628\u5929\u5ba2\u6237\u73b0\u573a\u6c9f\u901a\u7684\u4ea4\u901a\u8d39",
@@ -393,6 +456,61 @@ def test_steward_planner_rule_fallback_confirms_ambiguous_travel_flow() -> None:
assert result.confirmation_groups == []
def test_steward_planner_prefers_application_when_checked_required_application_missing() -> None:
payload = StewardPlanRequest(
message="2月20-23日去上海出差辅助国网仿生产服务器部署",
client_now_iso="2026-06-15T09:30:00+08:00",
context_json={
"required_application_gate": {
"travel": {
"checked": True,
"candidate_count": 0,
"candidates": [],
}
}
},
)
result = StewardPlannerService(intent_agent=EmptyFunctionCallingIntentAgent()).build_plan(payload)
assert result.planning_source == "rule_fallback"
assert result.next_action == "confirm_flow"
assert result.pending_flow_confirmation.status == "pending"
assert [item.flow_id for item in result.candidate_flows] == ["travel_application"]
assert result.candidate_flows[0].label == "先发起出差申请"
assert "未查到可关联" in result.pending_flow_confirmation.reason
assert "先申请" in result.summary
def test_steward_planner_asks_to_link_application_when_checked_required_application_exists() -> None:
payload = StewardPlanRequest(
message="2月20-23日去上海出差辅助国网仿生产服务器部署",
client_now_iso="2026-06-15T09:30:00+08:00",
context_json={
"required_application_gate": {
"travel": {
"checked": True,
"candidate_count": 2,
"candidates": [
{"claim_no": "AP-202602-001"},
{"claim_no": "AP-202602-002"},
],
}
}
},
)
result = StewardPlannerService(intent_agent=EmptyFunctionCallingIntentAgent()).build_plan(payload)
assert [item.flow_id for item in result.candidate_flows] == [
"travel_application",
"travel_reimbursement",
]
assert result.candidate_flows[1].label == "关联已有申请单并发起报销"
assert "查到 2 个可关联申请单" in result.pending_flow_confirmation.reason
assert "关联已有申请单" in result.summary
def test_steward_planner_splits_application_and_reimbursement_tasks() -> None:
payload = StewardPlanRequest(
message=(
@@ -423,6 +541,24 @@ def test_steward_planner_splits_application_and_reimbursement_tasks() -> None:
assert all(action.status == "pending" for action in result.confirmation_groups)
def test_steward_planner_keeps_bare_reimbursement_intent_generic() -> None:
payload = StewardPlanRequest(
message="我要报销",
user_id="u001",
client_now_iso="2026-06-04T09:30:00+08:00",
)
result = StewardPlannerService().build_plan(payload)
assert len(result.tasks) == 1
task = result.tasks[0]
assert task.task_type == "reimbursement"
assert task.assigned_agent == "reimbursement_assistant"
assert task.ontology_fields.get("expense_type") == "other"
assert "reason" not in task.ontology_fields
assert task.missing_fields == ["time_range", "reason"]
def test_steward_planner_treats_future_travel_without_apply_word_as_application() -> None:
payload = StewardPlanRequest(
message="明天出差北京3天支撑国网仿生产部署并且报销昨天业务招待费",
@@ -549,6 +685,59 @@ def test_steward_plan_endpoint_persists_application_and_reimbursement_state() ->
assert all("invented_field" not in flow["fields"] for flow in state["flows"].values())
def test_steward_plan_endpoint_queries_applications_before_ambiguous_travel_choice() -> None:
client, SessionLocal, app = _create_steward_test_client_with_db()
try:
response = client.post(
"/api/v1/steward/plans",
json={
"message": "2月20-23日去上海出差辅助国网仿生产服务器部署",
"user_id": "zhang.xiaoqing",
"client_now_iso": "2026-06-15T09:30:00+08:00",
"context_json": {
"session_type": "steward",
"entry_source": "workbench_ai_inline",
"name": "张小青",
"username": "zhang.xiaoqing",
},
},
)
assert response.status_code == 200
payload = response.json()
assert [item["flow_id"] for item in payload["candidate_flows"]] == ["travel_application"]
assert payload["candidate_flows"][0]["label"] == "先发起出差申请"
assert "未查到可关联单据" in payload["pending_flow_confirmation"]["reason"]
with SessionLocal() as db:
db.add(_build_endpoint_application_claim())
db.commit()
response = client.post(
"/api/v1/steward/plans",
json={
"message": "2月20-23日去上海出差辅助国网仿生产服务器部署",
"user_id": "zhang.xiaoqing",
"client_now_iso": "2026-06-15T09:30:00+08:00",
"context_json": {
"session_type": "steward",
"entry_source": "workbench_ai_inline",
"name": "张小青",
"username": "zhang.xiaoqing",
},
},
)
assert response.status_code == 200
payload = response.json()
assert [item["flow_id"] for item in payload["candidate_flows"]] == [
"travel_application",
"travel_reimbursement",
]
assert payload["candidate_flows"][1]["label"] == "关联已有申请单并发起报销"
assert "查到 1 个可关联申请单" in payload["pending_flow_confirmation"]["reason"]
finally:
app.dependency_overrides.clear()
def test_steward_planner_returns_off_topic_for_business_irrelevant_input() -> None:
payload = StewardPlanRequest(
message="123",
@@ -566,9 +755,11 @@ def test_steward_planner_returns_off_topic_for_business_irrelevant_input() -> No
assert result.planning_source == "rule_fallback"
assert len(result.suggested_prompts) == 3
assert result.thinking_events[0].stage == "off_topic"
# 纯数字应归类为 meaningless 场景
assert "未识别到财务事项" in result.thinking_events[0].title
def test_steward_planner_returns_off_topic_for_pure_greeting() -> None:
def test_steward_planner_returns_off_topic_with_friendly_greeting_reply() -> None:
payload = StewardPlanRequest(
message="你好",
client_now_iso="2026-06-04T09:30:00+08:00",
@@ -582,7 +773,10 @@ def test_steward_planner_returns_off_topic_for_pure_greeting() -> None:
assert result.candidate_flows == []
assert result.planning_source == "rule_fallback"
assert len(result.suggested_prompts) == 3
assert result.thinking_events[0].stage == "off_topic"
# 问候场景应礼貌回应主人,不使用"抱歉/没识别到"等生硬措辞
assert "您好主人" in result.summary
assert "很高兴为您服务" in result.summary
assert "先回应主人的问候" in result.thinking_events[0].title
def test_steward_planner_returns_off_topic_for_pure_punctuation() -> None:
@@ -602,6 +796,86 @@ def test_steward_planner_returns_off_topic_for_pure_punctuation() -> None:
assert result.thinking_events[0].stage == "off_topic"
def test_steward_planner_returns_off_topic_for_off_business_with_llm_response() -> None:
"""有内容但与业务无关的场景:应优先使用 LLM 生成的引导文案。"""
llm_text = (
"### 抱歉主人,这句话我暂时帮不上忙\n\n"
"主人聊的是天气,目前小财管家只能帮您整理**费用申请**和**费用报销**。"
"要不您把想办的财务事项告诉我?"
)
class _FakeOffTopicAgent:
def __init__(self) -> None:
self.calls = 0
self.last_call_traces: list[dict[str, object]] = []
def generate(self, request, *, scenario):
self.calls += 1
from app.services.steward_off_topic_agent import StewardOffTopicAgentResult
return StewardOffTopicAgentResult(
response_text=llm_text,
model_call_traces=[{"slot": "main", "status": "succeeded", "model": "gpt-test"}],
)
agent = _FakeOffTopicAgent()
payload = StewardPlanRequest(
message="想问候您一下",
client_now_iso="2026-06-04T09:30:00+08:00",
)
result = StewardPlannerService(off_topic_agent=agent).build_plan(payload)
assert agent.calls == 1
assert result.plan_status == "off_topic"
assert result.summary == llm_text
assert result.model_call_traces and result.model_call_traces[0]["status"] == "succeeded"
# 思考事件应是 off_business 场景对应文案
assert "不在服务范围内" in result.thinking_events[0].title
def test_steward_planner_falls_back_to_template_when_off_topic_agent_raises() -> None:
"""LLM 失败时静默 fallback 到规则模板,不阻断业务无关拦截。"""
class _ExplodingOffTopicAgent:
def generate(self, request, *, scenario):
raise RuntimeError("模型供应商不可用")
payload = StewardPlanRequest(
message="想问候您一下",
client_now_iso="2026-06-04T09:30:00+08:00",
)
result = StewardPlannerService(off_topic_agent=_ExplodingOffTopicAgent()).build_plan(payload)
assert result.plan_status == "off_topic"
# 仍使用 off_business 场景的默认模板
assert "抱歉主人" in result.summary
assert "不在服务范围内" in result.thinking_events[0].title
assert result.model_call_traces == []
def test_steward_planner_skips_off_topic_agent_for_greeting_and_meaningless() -> None:
"""问候与无意义场景不走 LLM节省调用。"""
class _CallCounterOffTopicAgent:
def __init__(self) -> None:
self.calls = 0
def generate(self, request, *, scenario):
self.calls += 1
return None
agent = _CallCounterOffTopicAgent()
service = StewardPlannerService(off_topic_agent=agent)
for message in ("你好", "123", "???"):
result = service.build_plan(StewardPlanRequest(message=message))
assert result.plan_status == "off_topic"
assert agent.calls == 0
def test_steward_planner_preserves_normal_business_flow_after_guard() -> None:
payload = StewardPlanRequest(
message="我要报销昨天的交通费",

View File

@@ -753,6 +753,33 @@ def test_user_agent_application_submit_blocks_overlapping_travel_dates() -> None
assert response.draft_payload is None
def test_user_agent_application_maps_preview_travel_type_label() -> None:
session_factory = build_session_factory()
with session_factory() as db:
response = build_application_user_agent_response(
db,
"申请出差2月20-23日上海火车",
context_overrides={
"name": "曹笑竹",
"department_name": "技术部",
"grade": "P5",
"application_preview": {
"fields": {
"applicationType": "travel",
"time": "2026-02-20 至 2026-02-23",
"location": "上海市",
"reason": "",
"days": "4天",
"transportMode": "火车",
}
},
},
)
assert "| 申请类型 | 差旅费用申请 |" in response.answer
assert "| 申请类型 | travel |" not in response.answer
def test_user_agent_application_edit_resubmits_returned_application_claim() -> None:
session_factory = build_session_factory()
with session_factory() as db: