2 Commits

Author SHA1 Message Date
caoxiaozhu
606a88c805 chore: stewardPlanModel 适配注册表动作结构并更新规则表与日志
- stewardPlanModel 适配新的意图注册表动作步骤结构
- 更新交通/通信/差旅等财务规则表,补 2026-06-25 work-log
2026-06-25 11:50:11 +08:00
caoxiaozhu
eaada4bc57 refactor(server): steward 意图改用声明式注册表编排
- 新增 steward_intent_registry,IntentDescriptor 统一描述意图的识别关键词、动作步骤构建、字段白名单与副作用集合,替代分散的 if/else
- 新增 steward_intent_bootstrap 注册 expense_application 等意图;新增 steward_query_executors 提供差旅标准查询的无副作用执行与城市/席别标签化输出
- action_contracts/action_executor/graph_planner/intent_agent/model_plan_builder/planner_extraction/fallback 适配注册表,识别与执行分发自动从注册表取数
- 新增 intent_registry/query_executors 测试,更新 intent_agent 测试
2026-06-25 11:50:02 +08:00
23 changed files with 1055 additions and 57 deletions

View File

@@ -28,15 +28,40 @@
- 影响:模型给出低置信度差旅申请意图时不再直接建预览,先反问确认;闲聊类输入不再误触发模型规划,响应更快、减轻后端压力。
- 局限:`agent-change-log` Skill 在当前环境不可调用,已按 AGENTS.md 规范手动增量更新本日志。
- 22:40我落地了注册表驱动的意图插槽架构,让新增意图从「改 6+ 处硬编码」降到「写一个描述符 + 执行函数 + 注册」,并端到端跑通了「查差旅标准」查询意图作为样板。
- Git 提交检查:`git fetch --all --prune` 后本地与 origin/main 同步(不 ahead 不 behind);工作区有本次新增/修改的后端文件。
- 背景:排查确认旧架构里 `task_type`/`assigned_agent`/`flow_id` 在 schema(Literal)、function call schema(enum)、model_plan_builder(白名单)、action_contracts(if/else)、action_executor(if/elif)五层硬编码,加一个意图要同步改 6+ 处,完全没有扩展点;且"查差旅标准"这类查询意图无任何位置(task_type enum 只有 expense_application/reimbursement)。
- 修改①(注册表核心):新建 `steward_intent_registry.py`——`IntentDescriptor` 声明 task_type/assigned_agent/signal_keywords/ontology_field_allowlist/action_steps_builder/executor/flow_id/prompt_fragment;新建 `steward_intent_bootstrap.py` 在 import 时注册 3 个意图(expense_application/reimbursement/query_travel_standard)。
- 修改②(schema 放宽):`schemas/steward.py``StewardTaskType`/`StewardAssignedAgent`/`StewardActionType`/`StewardFlowId` 从 Literal 改为 str,运行时校验下沉到 registry,让 schema 不再是扩展拦路虎。
- 修改③(执行分发):`steward_action_executor.py``execute()` 从 if/elif 链改为优先查 registry(`resolve_intent_by_action`)委托 executor;新增 `_dispatch_application_action`/`_dispatch_reimbursement_action` 分发入口;`SUPPORTED_ACTIONS`/`NOOP_ACTIONS` 改为与 registry 聚合(`all_side_effect_actions`/`all_noop_actions`)。
- 修改④(动作生成):`steward_action_contracts.py``build_task_action_steps` 改为查 registry 的 `action_steps_builder`;原 `_build_application_steps`/`_build_reimbursement_steps` 改公开供 registry 引用。
- 修改⑤(function schema 动态化):`steward_intent_agent.py` 的 task_type/flow_id enum 改为 `all_task_types()`/`all_flow_ids()` 动态生成;system prompt 改为从 registry 拼接意图列表 + 每个 intent 的 prompt_fragment。
- 修改⑥(白名单放开):`steward_model_plan_builder.py` 的 task_type 白名单改 `get_intent`;assigned_agent/flow_id/字段过滤全部改 registry 驱动;`_sanitize_model_ontology_fields`/`_sanitize_model_missing_fields` 改 per-task_type allowlist(`field_allowlist_for`);查询类意图(flow_id=None)跳过必填字段推断。
- 修改⑦(查询执行器):新建 `steward_query_executors.py`——`build_travel_standard_query_steps` 生成单步无副作用动作;`execute_travel_standard_query` 从槽位取 location/employee_grade/standard_category,复用 `DEFAULT_TRAVEL_POLICY_CONFIG` 按职级×城市分级查住宿/交通标准,拼装 Markdown 回复(补助标准因未纳入运行时配置用占位说明)。
- 修改⑧(门控适配):`steward_planner_extraction.py``_looks_like_ambiguous_travel_flow` 加查询信号词前置判断(命中查询意图直接返回 False,不走候选流程);`_build_task` 的 task_id/assigned_agent/label 改 registry 驱动;`steward_planner_fallback.py``_classify_irrelevant_input` 补充 registry signal_keywords 判断(避免查询类输入被判 off_topic);`steward_graph_planner.py` import bootstrap 触发注册。
- 修改⑨(前端):`stewardPlanModel.js``TASK_TYPE_LABELS`/`AGENT_LABELS`/`EXECUTABLE_STEWARD_ACTION_TYPES` 加 query_travel_standard/execute_travel_standard_query/policy_query_assistant。
- 验证:后端全量 steward 测试 **72 passed**(含新增 14 个:registry 7 + query executor 7);前端意图测试 **28 passed**;既有申请/报销/规划/动作执行/槽位决策链路全部无回归。
- 容器:容器名为 `x-financial-local-linux`(非 `local-x-financial-linux`),已运行 19 小时;后端测试在该容器内执行,venv 在 `/tmp/x-financial-server-venv`
- 影响:现在加一个新意图(如查报销进度、查预算执行)只需:① 写 `IntentDescriptor` 声明 task_type/槽位/信号词/executor;② 注册进 bootstrap;③ 写执行函数。function schema、动作生成、执行分发、字段过滤、门控全部自动适配,零硬编码改动。
- 局限:补助标准(allowance)尚未接入运行时配置,查询时返回占位说明;前端查询结果当前以 Markdown 消息展示,未做卡片化;registry 只在后端,前端 task_type 分发仍是 `resolveNextActionContext` 的 if/else(本次只加了 query 分支,未全面注册表化)。
## 遗留问题
- 09:18官方 `quick_validate.py` 仍因当前 Python 环境缺少 `PyYAML` 无法运行,已用 frontmatter、必需文件、占位符和 diff check 做人工兜底。建议后续统一为 skill 校验脚本补齐依赖或增加无 PyYAML 的轻量校验路径。
- 09:23当前环境没有找到 Skill Creator 的 `quick_validate.py` 脚本文件本体,因此本次继续采用人工兜底校验。建议后续恢复系统 Skill Creator 脚本路径,或把轻量校验脚本纳入仓库级工具。
- 21:30`expense-application-fast-preview.test.mjs` 仍有 12 个既有失败(文案「小财管家」「此意图系统不支持」与 markdown 表格整块渲染相关),与本次意图门控改动无关,建议单独排查。
- 21:30本次未纳入范围的三项已记录时间过滤维度扩展仅支持 N天前/昨天/今天)、排除词两处重复维护、`handleInlineDraftDeletionIntent` 命名与职责不符,建议后续分批处理。
- 22:40补助标准(allowance)未纳入 `DEFAULT_TRAVEL_POLICY_CONFIG`,查询差旅标准时住宿/交通有确定数值,补助只返回占位说明。补助数据在 `server/rules/finance-rules/出差补助标准.xlsx`,后续需把补助标准接入运行时配置并在 `resolve_travel_standard_snapshot` 补全。
- 22:40前端查询结果当前以普通 Markdown 消息展示,没有像申请预览那样的卡片化视图;查询意图的前端分发仍是 `resolveNextActionContext` 的 if/else,未全面注册表化(本次只加了 query 分支)。
- 22:40`server/rules/finance-rules/` 下有两个 Excel(交通工具等级标准、交通费用预估表)被标记为 modified,疑似容器运行时产物,非本次代码改动,未处理。
- 22:40`agent-change-log` Skill 在当前环境不可调用,已按 AGENTS.md 规范手动增量更新本日志。
## TODO
- [ ]`quick_validate.py` 准备稳定运行环境,避免后续新增 Skill 时继续依赖人工兜底。来源09:18 技能校验)
- [ ] 排查 `expense-application-fast-preview.test.mjs` 的 12 个既有失败(小财管家文案 / 表格整块渲染来源21:30 意图门控加固)
- [ ] 评估意图门控剩余三项:时间过滤维度扩展、排除词常量抽取、`handleInlineDraftDeletionIntent` 重命名。来源21:30 意图门控加固)
- [ ] 把补助标准(`出差补助标准.xlsx`)接入运行时配置,补全 `resolve_travel_standard_snapshot` 的 allowance 数值查询。来源22:40 注册表架构)
- [ ] 前端查询结果卡片化,并把前端 task_type 分发也改成注册表驱动。来源22:40 注册表架构)
- [ ] 在真实 LLM 连通环境下,用「我去武汉出差的住宿标准是多少」端到端验证查询意图识别→执行→回复。来源22:40 注册表架构)
- [ ] 评估是否需要把 LangGraph 迁移 Phase 5(trace UI、legacy 收敛)与注册表架构的查询意图 trace 打通。来源22:40 注册表架构)

View File

@@ -4,8 +4,8 @@ from typing import Any, Literal
from pydantic import BaseModel, Field
StewardTaskType = Literal["expense_application", "reimbursement"]
StewardAssignedAgent = Literal["application_assistant", "reimbursement_assistant"]
StewardTaskType = str
StewardAssignedAgent = str
StewardPlanningSource = Literal["llm_function_call", "rule_fallback"]
StewardPlanNextAction = Literal["confirm_flow", "confirm_task", "delegate_task", "none"]
StewardRequestedAction = Literal["preview", "save_draft", "submit"]
@@ -22,20 +22,7 @@ StewardRuntimeNextAction = Literal[
"cancel_current_action",
"no_op",
]
StewardActionType = Literal[
"detect_intent",
"fill_application_fields",
"build_application_preview",
"fill_reimbursement_fields",
"build_reimbursement_preview",
"validate_required_fields",
"run_duplicate_precheck",
"save_application_draft",
"submit_application",
"link_existing_application",
"create_reimbursement_draft",
"associate_attachments",
]
StewardActionType = str
StewardActionStatus = Literal["completed", "planned", "pending_confirmation", "blocked"]
StewardActionExecutionStatus = Literal["succeeded", "blocked", "needs_confirmation", "failed"]
StewardTaskStatus = Literal[
@@ -47,7 +34,7 @@ StewardTaskStatus = Literal[
"blocked",
]
StewardConfirmationStatus = Literal["pending", "confirmed", "rejected"]
StewardFlowId = Literal["travel_application", "travel_reimbursement"]
StewardFlowId = str
StewardPendingFlowStatus = Literal["none", "pending", "confirmed", "rejected"]

View File

@@ -550,7 +550,9 @@ class RuntimeChatService:
"max_tokens": max_tokens,
"temperature": temperature,
}
if provider == "GLM":
# function calling 需要确定性结构化输出,thinking mode 与强制 tool_choice 冲突
# (如 Ali 通义在 thinking 模式下拒绝 tool_choice=object),这里统一禁用。
if provider in {"GLM", "Ali"}:
request_payload["thinking"] = {"type": "disabled"}
status_code, payload = _send_json_request(

View File

@@ -26,8 +26,21 @@ class StewardActionPlanBuilder:
return plan.model_copy(update={"tasks": tasks, "action_steps": plan_steps})
def build_task_action_steps(self, task: StewardTask) -> list[StewardActionStep]:
from app.services.steward_intent_registry import get_intent
intent = get_intent(task.task_type)
if intent is not None:
return intent.action_steps_builder(task)
if task.task_type == "expense_application":
return self._build_application_steps(task)
return self.build_application_steps(task)
if task.task_type == "reimbursement":
return self.build_reimbursement_steps(task)
return []
def build_application_steps(self, task: StewardTask) -> list[StewardActionStep]:
return self._build_application_steps(task)
def build_reimbursement_steps(self, task: StewardTask) -> list[StewardActionStep]:
return self._build_reimbursement_steps(task)
def _build_application_steps(self, task: StewardTask) -> list[StewardActionStep]:

View File

@@ -15,6 +15,11 @@ from app.schemas.steward import (
from app.schemas.user_agent import UserAgentRequest
from app.services.attachment_association_jobs import AttachmentAssociationJobRunner
from app.services.expense_claims import ExpenseClaimService
from app.services.steward_intent_registry import (
all_noop_actions,
all_side_effect_actions,
resolve_intent_by_action,
)
from app.services.user_agent import UserAgentService
from app.services.user_agent_application_dates import resolve_application_days_from_time_range
@@ -31,8 +36,8 @@ SUPPORTED_ACTIONS = {
"link_existing_application",
"associate_attachments",
}
APPLICATION_SIDE_EFFECT_ACTIONS = {"save_application_draft", "submit_application"}
REIMBURSEMENT_SIDE_EFFECT_ACTIONS = {"create_reimbursement_draft", "link_existing_application"}
APPLICATION_SIDE_EFFECT_ACTIONS = {"save_application_draft", "submit_application", "run_duplicate_precheck"}
REIMBURSEMENT_SIDE_EFFECT_ACTIONS = {"create_reimbursement_draft", "link_existing_application", "associate_attachments"}
NOOP_ACTIONS = {
"fill_application_fields",
"build_application_preview",
@@ -83,7 +88,8 @@ class StewardActionExecutor:
) -> StewardActionExecuteResponse:
action_type = self._normalize_action_type(request.action_type)
trace = [self._trace("received", action_type=action_type, plan_id=request.plan_id)]
if action_type not in SUPPORTED_ACTIONS:
supported = SUPPORTED_ACTIONS | all_side_effect_actions() | all_noop_actions()
if action_type not in supported:
return self._blocked(
action_type,
f"不支持的小财管家动作:{action_type or '空动作'}",
@@ -91,7 +97,8 @@ class StewardActionExecutor:
)
task = request.task
if task is None and action_type not in NOOP_ACTIONS:
noop_actions = NOOP_ACTIONS | all_noop_actions()
if task is None and action_type not in noop_actions:
return self._blocked(
action_type,
"动作缺少任务快照,无法安全执行。",
@@ -107,7 +114,7 @@ class StewardActionExecutor:
trace=[*trace, self._trace("blocked", reason="missing_fields")],
)
if action_type in NOOP_ACTIONS:
if action_type in noop_actions:
return StewardActionExecuteResponse(
action_type=action_type,
status="succeeded",
@@ -118,6 +125,13 @@ class StewardActionExecutor:
},
trace=[*trace, self._trace("completed", mode="noop")],
)
# 优先走注册表:查到 action 所属意图的 executor 即委托执行
intent = resolve_intent_by_action(action_type)
if intent is not None and intent.executor is not None:
return intent.executor(self, request, current_user, trace)
# 兼容回退:注册表未命中时按旧逻辑分发
if action_type == "run_duplicate_precheck":
return self._run_duplicate_precheck(request, current_user, trace)
if action_type in APPLICATION_SIDE_EFFECT_ACTIONS:
@@ -133,6 +147,30 @@ class StewardActionExecutor:
trace=[*trace, self._trace("blocked", reason="unwired_action")],
)
def _dispatch_application_action(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
"""registry 入口:分发申请类副作用动作。"""
action_type = self._normalize_action_type(request.action_type)
if action_type == "run_duplicate_precheck":
return self._run_duplicate_precheck(request, current_user, trace)
return self._execute_application_action(request, current_user, action_type, trace)
def _dispatch_reimbursement_action(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
"""registry 入口:分发报销类副作用动作。"""
action_type = self._normalize_action_type(request.action_type)
if action_type == "associate_attachments":
return self._execute_associate_attachments_action(request, current_user, trace)
return self._execute_reimbursement_action(request, current_user, action_type, trace)
def _run_duplicate_precheck(
self,
request: StewardActionExecuteRequest,

View File

@@ -6,6 +6,7 @@ from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from app.schemas.steward import StewardPlanRequest, StewardPlanResponse
from app.services import steward_intent_bootstrap # noqa: F401 导入即注册全部业务意图
from app.services.steward_action_contracts import StewardActionPlanBuilder
from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER
from app.services.steward_intent_agent import StewardIntentAgent, StewardIntentAgentResult

View File

@@ -8,11 +8,30 @@ from typing import Any
from app.schemas.steward import StewardPlanRequest
from app.services.ontology_field_registry import normalize_ontology_form_values
from app.services.runtime_chat import RuntimeChatService
from app.services.steward_intent_registry import (
all_flow_ids,
all_task_types,
all_intents,
)
STEWARD_INTENT_FUNCTION_NAME = "submit_steward_intent_plan"
def _build_supported_intent_summary() -> str:
"""从注册表拼接当前支持的意图列表,供 system prompt 引用。"""
fragments = [f"{desc.task_type}{desc.label}" for desc in all_intents()]
return "".join(fragments) if fragments else "(暂无已注册意图)"
def _build_intent_prompt_fragments() -> str:
"""从注册表拼接每个意图的识别指引片段。"""
fragments = [desc.prompt_fragment for desc in all_intents() if desc.prompt_fragment]
if not fragments:
return ""
return "".join(fragments)
@dataclass(frozen=True, slots=True)
class StewardIntentAgentResult:
payload: dict[str, Any]
@@ -104,11 +123,9 @@ class StewardIntentAgent:
"content": (
"你是 X-Financial 的小财管家意图识别智能体。"
"你必须通过 function calling 输出结构化计划,不能只返回普通文本。"
"当前版本只支持 expense_application 和 reimbursement 两类任务;"
f"当前支持的 task_type 包括:{_build_supported_intent_summary()}"
"你只做识别、拆解、归集和确认点规划,不能执行入库、绑定附件或提交审批。"
"用户描述未来出差、差旅计划、去某地几天、部署、支撑、拜访或会议安排时,"
"即使没有出现“申请”两个字,也必须优先识别为 expense_application。"
"用户描述已经发生的费用、昨天/前天费用、票据或明确报销诉求时,才识别为 reimbursement。"
f"{_build_intent_prompt_fragments()}"
"如果用户只描述出差时间、地点和事由,但没有明确申请、报销、提交、保存草稿等动作,"
"且无法从上下文判断流程方向,必须返回 pending_flow_confirmation.status=pending"
"candidate_flows 同时给出 travel_application 和 travel_reimbursementtasks 保持空数组。"
@@ -116,12 +133,12 @@ class StewardIntentAgent:
"如果输入里出现 occurred_date、transport_type、reason_value 等别名,必须映射为 canonical 字段。"
"每个 task 必须输出 requested_action用户只是要求整理/发起但未说保存或提交时为 preview"
"用户说保存草稿、先保存、存草稿时为 save_draft用户说直接提交、提交申请、确认提交时为 submit。"
"对于查询类任务(如查询差旅标准),requested_action 固定为 preview。"
"相对日期必须以 base_date 为准转换为明确日期。"
"thinking_events 只能是面向用户的过程摘要,不能暴露内部推理链。"
"如果用户输入与出差、费用、报销、申请等财务事项完全无关"
"如果用户输入与出差、费用、报销、申请、差旅标准等财务事项完全无关"
"(例如纯数字、问候、闲聊、无意义字符、单字符重复),"
"必须让 tasks 返回空数组,并在 thinking_events 中明确说明“未识别到财务事项”"
"不要强行把无关输入识别为 expense_application 或 reimbursement 任务。"
"必须让 tasks 返回空数组,并在 thinking_events 中明确说明“未识别到财务事项”"
),
},
{
@@ -159,7 +176,7 @@ class StewardIntentAgent:
"properties": {
"task_type": {
"type": "string",
"enum": ["expense_application", "reimbursement"],
"enum": all_task_types(),
},
"title": {"type": "string"},
"summary": {"type": "string"},
@@ -211,7 +228,7 @@ class StewardIntentAgent:
"properties": {
"flow_id": {
"type": "string",
"enum": ["travel_application", "travel_reimbursement"],
"enum": all_flow_ids(),
},
"label": {"type": "string"},
"confidence": {

View File

@@ -0,0 +1,112 @@
from __future__ import annotations
from app.services.steward_action_contracts import StewardActionPlanBuilder
from app.services.steward_action_executor import StewardActionExecutor
from app.services.steward_constants import BUSINESS_CANONICAL_FIELDS
from app.services.steward_intent_registry import (
IntentDescriptor,
register_intent,
)
from app.services.steward_query_executors import (
build_travel_standard_query_steps,
execute_travel_standard_query,
)
def bootstrap_intents() -> None:
"""注册小财管家支持的全部业务意图。
新增意图时在这里追加一个 register_intent 调用即可,
无需改动识别 / 动作生成 / 执行分发链路。
"""
application_builder = StewardActionPlanBuilder()
register_intent(
IntentDescriptor(
task_type="expense_application",
assigned_agent="application_assistant",
label="费用申请",
action_steps_builder=application_builder.build_application_steps,
signal_keywords=(
"申请", "出差", "差旅", "费用", "交通", "住宿", "采购", "会务", "会议",
"客户现场", "项目", "拜访", "调研", "驻场", "上线", "验收",
),
ontology_field_allowlist=tuple(BUSINESS_CANONICAL_FIELDS),
noop_actions=(
"fill_application_fields",
"build_application_preview",
"validate_required_fields",
),
side_effect_actions=("save_application_draft", "submit_application", "run_duplicate_precheck"),
executor=StewardActionExecutor._dispatch_application_action,
flow_id="travel_application",
prompt_fragment=(
"用户描述未来出差、差旅计划、去某地几天、部署、支撑、拜访或会议安排时,"
"即使没有出现“申请”两个字,也必须优先识别为 expense_application。"
),
)
)
register_intent(
IntentDescriptor(
task_type="reimbursement",
assigned_agent="reimbursement_assistant",
label="费用报销",
action_steps_builder=application_builder.build_reimbursement_steps,
signal_keywords=("报销", "报账", "票据", "发票", "凭证", "行程单", "付款截图", "小票", "收据"),
ontology_field_allowlist=tuple(BUSINESS_CANONICAL_FIELDS),
noop_actions=(
"fill_reimbursement_fields",
"build_reimbursement_preview",
"validate_required_fields",
),
side_effect_actions=(
"create_reimbursement_draft",
"link_existing_application",
"associate_attachments",
),
executor=StewardActionExecutor._dispatch_reimbursement_action,
flow_id="travel_reimbursement",
prompt_fragment=(
"用户描述已经发生的费用、昨天/前天费用、票据或明确报销诉求时,"
"才识别为 reimbursement。"
),
)
)
register_intent(
IntentDescriptor(
task_type="query_travel_standard",
assigned_agent="policy_query_assistant",
label="差旅标准查询",
action_steps_builder=build_travel_standard_query_steps,
signal_keywords=(
"差旅标准", "住宿标准", "出差标准", "交通标准", "出差补助",
"差旅补贴", "住宿补助", "交通补助", "职级标准", "差标",
),
ontology_field_allowlist=(
"location",
"employee_grade",
"standard_category",
"expense_type",
),
noop_actions=(),
side_effect_actions=("execute_travel_standard_query",),
executor=execute_travel_standard_query,
flow_id=None,
prompt_fragment=(
"用户询问差旅住宿标准、交通标准、出差补助或差旅补贴标准时,"
"必须识别为 query_travel_standard,而不是 expense_application 或 reimbursement。"
"差旅标准查询不创建任何单据,只返回标准数值。"
),
)
)
def _ensure_bootstrapped() -> None:
if not all_task_types():
bootstrap_intents()
# 导入即注册,保证 registry 在首次 import 后即处于可用状态。
bootstrap_intents()

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Protocol
class ActionStepsBuilder(Protocol):
"""把单个 task 转换为确定性白名单动作步骤序列。"""
def __call__(self, task: Any) -> list[Any]: ...
class ActionExecutor(Protocol):
"""执行单个 action_type 对应的副作用或查询,返回 StewardActionExecuteResponse。"""
def __call__(self, executor: Any, request: Any, current_user: Any, trace: list[dict[str, Any]]) -> Any: ...
@dataclass(frozen=True)
class IntentDescriptor:
"""单个业务意图的声明式描述符。
注册一个 IntentDescriptor 即可让意图识别、动作生成、执行分发、字段过滤
全部自动适配,无需改动 if/else 链。
"""
task_type: str
assigned_agent: str
label: str
"""中文标签,用于 system prompt、前端展示和日志。"""
action_steps_builder: ActionStepsBuilder
"""把 StewardTask 转换为白名单动作步骤的可调用对象。"""
signal_keywords: tuple[str, ...] = ()
"""规则兜底 / off_topic 门控的关键词;命中即视为业务相关。"""
ontology_field_allowlist: tuple[str, ...] = ()
"""该意图允许的 canonical 槽位;为空表示沿用全局 BUSINESS_CANONICAL_FIELDS。"""
side_effect_actions: tuple[str, ...] = ()
"""该意图产生副作用的 action_type 集合;会被纳入执行器白名单。"""
noop_actions: tuple[str, ...] = ()
"""该意图的无副作用 action_type 集合(填充/预览/校验等)。"""
executor: ActionExecutor | None = None
"""副作用或查询动作的执行函数;None 表示全部走 NOOP。"""
flow_id: str | None = None
"""候选流程确认使用的 flow_id;查询类意图为 None。"""
prompt_fragment: str = ""
"""注入 steward_intent_agent system prompt 的识别指引片段。"""
_REGISTRY: dict[str, IntentDescriptor] = {}
_FLOW_TO_TASK: dict[str, str] = {}
def register_intent(descriptor: IntentDescriptor) -> IntentDescriptor:
"""注册一个业务意图;重复注册以最后一次为准。"""
_REGISTRY[descriptor.task_type] = descriptor
if descriptor.flow_id:
_FLOW_TO_TASK[descriptor.flow_id] = descriptor.task_type
return descriptor
def get_intent(task_type: str) -> IntentDescriptor | None:
return _REGISTRY.get(str(task_type or "").strip())
def all_intents() -> list[IntentDescriptor]:
return list(_REGISTRY.values())
def all_task_types() -> list[str]:
return [desc.task_type for desc in _REGISTRY.values()]
def all_assigned_agents() -> list[str]:
return [desc.assigned_agent for desc in _REGISTRY.values()]
def all_flow_ids() -> list[str]:
return [desc.flow_id for desc in _REGISTRY.values() if desc.flow_id]
def all_signal_keywords() -> set[str]:
keywords: set[str] = set()
for desc in _REGISTRY.values():
keywords.update(desc.signal_keywords)
return keywords
def all_side_effect_actions() -> set[str]:
actions: set[str] = set()
for desc in _REGISTRY.values():
actions.update(desc.side_effect_actions)
return actions
def all_noop_actions() -> set[str]:
actions: set[str] = set()
for desc in _REGISTRY.values():
actions.update(desc.noop_actions)
return actions
def resolve_task_type_for_flow(flow_id: str) -> str | None:
return _FLOW_TO_TASK.get(str(flow_id or "").strip())
def resolve_intent_by_action(action_type: str) -> IntentDescriptor | None:
"""根据 action_type 反查它所属的意图描述符。"""
normalized = str(action_type or "").strip()
for desc in _REGISTRY.values():
if normalized in desc.side_effect_actions or normalized in desc.noop_actions:
return desc
return None
def field_allowlist_for(task_type: str, *, fallback: frozenset[str] | None = None) -> frozenset[str]:
"""返回某意图允许的槽位集合;未声明则返回 fallback。"""
desc = get_intent(task_type)
if desc and desc.ontology_field_allowlist:
return frozenset(desc.ontology_field_allowlist)
return fallback or frozenset()

View File

@@ -18,6 +18,12 @@ from app.schemas.steward import (
from app.services.ontology_field_registry import normalize_ontology_form_values
from app.services.steward_constants import BUSINESS_CANONICAL_FIELDS
from app.services.steward_intent_agent import StewardIntentAgentResult
from app.services.steward_intent_registry import (
all_flow_ids,
field_allowlist_for,
get_intent,
resolve_task_type_for_flow,
)
class StewardModelPlanBuilder:
@@ -112,7 +118,8 @@ class StewardModelPlanBuilder:
if not isinstance(raw_task, dict):
continue
task_type = str(raw_task.get("task_type") or "").strip()
if task_type not in {"expense_application", "reimbursement"}:
intent_descriptor = get_intent(task_type)
if intent_descriptor is None:
continue
task_index = len(tasks) + 1
@@ -120,6 +127,7 @@ class StewardModelPlanBuilder:
raw_task.get("ontology_fields"),
request=request,
base_date=base_date,
task_type=task_type,
)
supplement_segment = " ".join(
[
@@ -136,13 +144,9 @@ class StewardModelPlanBuilder:
for key, value in supplement_fields.items():
fields.setdefault(key, value)
assigned_agent = (
"application_assistant"
if task_type == "expense_application"
else "reimbursement_assistant"
)
task_id = f"task_{'app' if task_type == 'expense_application' else 'reim'}_{task_index:03d}"
title_prefix = "费用申请" if task_type == "expense_application" else "费用报销"
assigned_agent = intent_descriptor.assigned_agent
task_id = self._build_task_id(task_type, task_index)
title_prefix = intent_descriptor.label
title = self.planner._clean_text(raw_task.get("title")) or self.planner._build_task_title(
title_prefix,
fields,
@@ -226,13 +230,14 @@ class StewardModelPlanBuilder:
if not isinstance(raw_candidate, dict):
continue
flow_id = self.planner._clean_text(raw_candidate.get("flow_id"))
if flow_id not in {"travel_application", "travel_reimbursement"}:
if flow_id not in set(all_flow_ids()):
continue
task_type = "expense_application" if flow_id == "travel_application" else "reimbursement"
task_type = resolve_task_type_for_flow(flow_id) or "expense_application"
fields = self._sanitize_model_ontology_fields(
raw_candidate.get("ontology_fields"),
request=request,
base_date=base_date,
task_type=task_type,
)
if not fields:
fields = self.planner._extract_ontology_fields(
@@ -312,32 +317,45 @@ class StewardModelPlanBuilder:
)
return "我识别到这是一次财务事项,但还需要先确认具体流程方向。"
@staticmethod
def _build_task_id(task_type: str, index: int) -> str:
"""根据 task_type 生成稳定的任务 ID 前缀。"""
prefix_map = {
"expense_application": "app",
"reimbursement": "reim",
"query_travel_standard": "query",
}
prefix = prefix_map.get(task_type, re.sub(r"[^a-z0-9]+", "_", task_type.lower()).strip("_") or "task")
return f"task_{prefix}_{index:03d}"
def _sanitize_model_ontology_fields(
self,
raw_fields: Any,
*,
request: StewardPlanRequest,
base_date: date,
task_type: str = "",
) -> dict[str, str]:
allowlist = field_allowlist_for(task_type, fallback=BUSINESS_CANONICAL_FIELDS)
normalized_context = normalize_ontology_form_values(request.context_json.get("review_form_values"))
fields: dict[str, str] = {
key: value
for key, value in normalized_context.items()
if key in BUSINESS_CANONICAL_FIELDS and str(value or "").strip()
if key in allowlist and str(value or "").strip()
}
if not isinstance(raw_fields, dict):
return fields
normalized_model_fields = normalize_ontology_form_values(raw_fields)
for key, value in normalized_model_fields.items():
if key not in BUSINESS_CANONICAL_FIELDS:
if key not in allowlist:
continue
normalized_value = self._normalize_model_field_value(key, value, base_date)
if normalized_value:
fields[key] = normalized_value
if request.attachments and not fields.get("attachments"):
fields["attachments"] = "".join(item.name for item in request.attachments if item.name)
return {key: value for key, value in fields.items() if key in BUSINESS_CANONICAL_FIELDS and value}
return {key: value for key, value in fields.items() if key in allowlist and value}
def _build_attachment_groups_from_model_payload(
self,
@@ -435,12 +453,17 @@ class StewardModelPlanBuilder:
task_type: str,
fields: dict[str, str],
) -> list[str]:
allowlist = field_allowlist_for(task_type, fallback=BUSINESS_CANONICAL_FIELDS)
missing_fields: list[str] = []
if isinstance(raw_missing_fields, list):
for item in raw_missing_fields:
key = str(item or "").strip()
if key in BUSINESS_CANONICAL_FIELDS and key not in missing_fields and not fields.get(key):
if key in allowlist and key not in missing_fields and not fields.get(key):
missing_fields.append(key)
# 查询类意图没有强制的必填字段,跳过 planner 的必填推断
intent_descriptor = get_intent(task_type)
if intent_descriptor is not None and not intent_descriptor.flow_id:
return missing_fields
for key in self.planner._resolve_missing_fields(task_type, fields):
if key not in missing_fields:
missing_fields.append(key)

View File

@@ -25,6 +25,44 @@ from app.services.steward_planner_shared import (
)
def _matches_query_signal(compact: str) -> bool:
"""判断输入是否命中查询类意图信号词(差旅标准等)。
查询意图不应进入申请/报销候选流程确认,也不应被当作普通业务输入走规则兜底。
"""
from app.services.steward_intent_registry import all_signal_keywords
query_indicators = ("标准", "补助", "补贴", "差标", "政策", "制度", "多少")
if not any(indicator in compact for indicator in query_indicators):
return False
signal_keywords = all_signal_keywords()
return any(keyword in compact for keyword in signal_keywords)
def _resolve_assigned_agent_for_draft(task_type: str) -> str:
"""从注册表取 assigned_agent,查不到时回退到申请/报销默认值。"""
from app.services.steward_intent_registry import get_intent
intent = get_intent(task_type)
if intent is not None:
return intent.assigned_agent
if task_type == "expense_application":
return "application_assistant"
return "reimbursement_assistant"
def _resolve_task_label_for_draft(task_type: str) -> str:
"""从注册表取意图中文标签,查不到时回退到申请/报销默认值。"""
from app.services.steward_intent_registry import get_intent
intent = get_intent(task_type)
if intent is not None:
return intent.label
if task_type == "expense_application":
return "费用申请"
return "费用报销"
class StewardPlannerExtractionMixin:
def _has_multiple_financial_demands(self, message: str) -> bool:
task_drafts = self._extract_task_drafts(message)
@@ -89,6 +127,9 @@ class StewardPlannerExtractionMixin:
compact = re.sub(r"\s+", "", text)
if not compact or request.attachments:
return False
# 查询类意图(差旅标准等)不走申请/报销候选流程确认
if _matches_query_signal(compact):
return False
if re.search(r"申请|报销|草稿|提交|审批|保存|发起|创建", compact):
return False
if not re.search(r"出差|差旅|客户现场|项目|部署|实施|支撑|支持|协助|拜访|调研|培训|会议|驻场|上线|验收", compact):
@@ -115,15 +156,13 @@ class StewardPlannerExtractionMixin:
base_date: date,
request: StewardPlanRequest,
) -> StewardTask:
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
fields = self._extract_ontology_fields(draft.segment, draft.task_type, base_date, request)
missing_fields = self._resolve_missing_fields(draft.task_type, fields)
task_id = f"task_{'app' if draft.task_type == 'expense_application' else 'reim'}_{draft.index:03d}"
assigned_agent = (
"application_assistant"
if draft.task_type == "expense_application"
else "reimbursement_assistant"
)
title_prefix = "费用申请" if draft.task_type == "expense_application" else "费用报销"
task_id = StewardModelPlanBuilder._build_task_id(draft.task_type, draft.index)
assigned_agent = _resolve_assigned_agent_for_draft(draft.task_type)
title_prefix = _resolve_task_label_for_draft(draft.task_type)
title = self._build_task_title(title_prefix, fields, draft.index)
return StewardTask(
task_id=task_id,

View File

@@ -63,6 +63,11 @@ class StewardPlannerFallbackMixin:
return None
if any(keyword in compact for keyword in STEWARD_BUSINESS_SIGNAL_KEYWORDS):
return None
# 补充注册表里各意图声明的信号词(如查询类"差旅标准"等),避免被判 off_topic
from app.services.steward_intent_registry import all_signal_keywords
if any(keyword in compact for keyword in all_signal_keywords()):
return None
if StewardPlannerFallbackMixin._looks_like_greeting(compact):
return STEWARD_OFF_TOPIC_SCENARIO_GREETING

View File

@@ -0,0 +1,315 @@
from __future__ import annotations
from typing import Any
from app.api.deps import CurrentUserContext
from app.schemas.steward import (
StewardActionExecuteRequest,
StewardActionExecuteResponse,
StewardActionStep,
StewardTask,
)
from app.services.expense_rule_runtime_defaults import DEFAULT_TRAVEL_POLICY_CONFIG
from app.services.runtime_chat import RuntimeChatService
# 城市分级标签:差旅政策 city_tier 到面向用户的城市档位名称
CITY_TIER_LABELS = {
"tier_1": "一类城市(北上广深)",
"tier_2": "二类城市(省会及重点城市)",
"tier_3": "三类城市(其他地区)",
}
# 交通等级编码到中文标签
TRANSPORT_LEVEL_LABELS = {
1: "经济舱/二等座(普通席别)",
2: "高端经济舱/一等座(中级席别)",
3: "公务舱/商务座(高级席别)",
4: "头等舱(最高席别)",
}
def build_travel_standard_query_steps(task: StewardTask) -> list[StewardActionStep]:
"""生成差旅标准查询任务的动作步骤。
查询不产生副作用,无需校验必填、保存或提交,只生成单步执行动作。
"""
fields = _resolve_task_fields(task)
return [
StewardActionStep(
step_id=f"{task.task_id}:01",
action_type="execute_travel_standard_query",
label="查询差旅标准",
target_task_id=task.task_id,
status="planned",
requires_confirmation=False,
payload={
"task_id": task.task_id,
"ontology_fields": fields,
},
)
]
def execute_travel_standard_query(
executor: Any,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
"""执行差旅标准查询:检索业务数据 → 交给 LLM 整理成自然语言。
数据源为 DEFAULT_TRAVEL_POLICY_CONFIG(住宿标准 by 职级×城市分级、
交通等级 by 职级)。补助标准当前未纳入运行时配置,用制度说明兜底。
"""
action_type = "execute_travel_standard_query"
fields = _resolve_task_fields(request.task)
message = _resolve_message(request)
location = str(fields.get("location") or "").strip()
employee_grade = _resolve_employee_grade(fields, current_user)
standard_category = str(fields.get("standard_category") or "").strip().lower()
standards = resolve_travel_standard_snapshot(
location=location,
employee_grade=employee_grade,
standard_category=standard_category,
)
if not standards["matched_any"]:
answer = _build_no_match_answer(location, employee_grade, standard_category)
return StewardActionExecuteResponse(
action_type=action_type,
status="succeeded",
message=answer,
result_payload={
"answer_markdown": answer,
"standards": standards,
"matched": False,
},
trace=[*trace, _trace("completed", mode="query_no_match")],
)
answer = _compose_travel_standard_answer(
message=message,
standards=standards,
location=location,
employee_grade=employee_grade,
)
return StewardActionExecuteResponse(
action_type=action_type,
status="succeeded",
message=answer,
result_payload={
"answer_markdown": answer,
"standards": standards,
"matched": True,
},
trace=[*trace, _trace("completed", mode="query_travel_standard")],
)
def resolve_travel_standard_snapshot(
*,
location: str,
employee_grade: str,
standard_category: str = "",
) -> dict[str, Any]:
"""按地点、职级和关注标准类别,从差旅政策配置检索确定性标准数值。
standard_category 为空表示返回全部类别;非空时只返回指定类别。
支持的类别:lodging(住宿)、transport(交通)、allowance(补助)。
"""
config = DEFAULT_TRAVEL_POLICY_CONFIG
city_tiers = config.get("city_tiers", {})
hotel_limits = config.get("hotel_limits", {})
transport_limits = config.get("transport_limits", {})
band_labels = config.get("band_labels", {})
normalized_city = str(location or "").strip()
city_tier = city_tiers.get(normalized_city, "tier_3") if normalized_city else "tier_3"
normalized_grade = _normalize_grade(employee_grade)
snapshot: dict[str, Any] = {
"location": normalized_city or "",
"city_tier": city_tier,
"city_tier_label": CITY_TIER_LABELS.get(city_tier, city_tier),
"employee_grade": normalized_grade,
"employee_grade_label": band_labels.get(normalized_grade, normalized_grade or "未指定"),
"standard_category": standard_category or "",
"matched_any": False,
"lodging": None,
"transport": None,
"allowance": None,
}
want_all = not standard_category
if want_all or standard_category == "lodging":
lodging_cap = _resolve_lodging_cap(hotel_limits, normalized_grade, city_tier)
if lodging_cap is not None:
snapshot["lodging"] = {
"daily_cap": str(lodging_cap),
"unit": "元/晚",
}
snapshot["matched_any"] = True
if want_all or standard_category == "transport":
transport_band = _resolve_transport_band(transport_limits, normalized_grade)
if transport_band is not None:
snapshot["transport"] = {
"flight_level": transport_band.get("flight"),
"train_level": transport_band.get("train"),
"flight_label": TRANSPORT_LEVEL_LABELS.get(int(transport_band.get("flight", 0)), ""),
"train_label": TRANSPORT_LEVEL_LABELS.get(int(transport_band.get("train", 0)), ""),
}
snapshot["matched_any"] = True
if want_all or standard_category == "allowance":
# 补助标准当前未纳入运行时配置,用占位说明,等补助数据源接入后补全。
# 占位说明不计入 matched_any,避免无有效数据时仍标记为已匹配。
snapshot["allowance"] = {
"note": "出差补助标准按地区(直辖市/港澳台/境外等)分档,具体数值请参考《公司差旅费报销规则》或咨询财务。",
}
return snapshot
def _resolve_lodging_cap(
hotel_limits: dict[str, Any],
grade: str,
city_tier: str,
) -> str | None:
grade_entry = hotel_limits.get(grade)
if not isinstance(grade_entry, dict):
return None
cap = grade_entry.get(city_tier)
return str(cap).strip() if cap is not None else None
def _resolve_transport_band(
transport_limits: dict[str, Any],
grade: str,
) -> dict[str, Any] | None:
band = transport_limits.get(grade)
if not isinstance(band, dict):
return None
return {"flight": band.get("flight"), "train": band.get("train")}
def _normalize_grade(value: str) -> str:
normalized = str(value or "").strip().upper()
if normalized in {"", "未指定", "未知"}:
return ""
if normalized in DEFAULT_TRAVEL_POLICY_CONFIG.get("band_labels", {}):
return normalized
# 容忍 P05 / p5 等写法
compact = normalized.lstrip("Pp")
if compact.isdigit():
candidate = f"P{int(compact)}"
if candidate in DEFAULT_TRAVEL_POLICY_CONFIG.get("band_labels", {}):
return candidate
return normalized
def _resolve_employee_grade(
fields: dict[str, str],
current_user: CurrentUserContext,
) -> str:
grade = str(fields.get("employee_grade") or "").strip()
if grade:
return grade
return str(getattr(current_user, "grade", "") or "").strip()
def _resolve_task_fields(task: StewardTask | None) -> dict[str, str]:
if task is None or not isinstance(task.ontology_fields, dict):
return {}
return {
str(key or "").strip(): str(value or "").strip()
for key, value in task.ontology_fields.items()
if str(key or "").strip() and str(value or "").strip()
}
def _resolve_message(request: StewardActionExecuteRequest) -> str:
message = str(request.message or "").strip()
if message:
return message
if request.task is not None:
return str(request.task.summary or request.task.title or "").strip()
return "差旅标准查询"
def _build_no_match_answer(location: str, employee_grade: str, standard_category: str) -> str:
parts = ["### 未能匹配到具体差旅标准"]
details = []
if location:
details.append(f"目的地:**{location}**")
if employee_grade:
details.append(f"职级:**{employee_grade}**")
if standard_category:
details.append(f"关注类别:**{standard_category}**")
if details:
parts.append("")
parts.append("当前识别到:" + "".join(details) + "")
parts.append("")
parts.append(
"请补充更明确的信息,例如\"P5 去武汉出差的住宿标准是多少\","
"或直接说\"查武汉的住宿标准\""
)
return "\n".join(parts)
def _compose_travel_standard_answer(
*,
message: str,
standards: dict[str, Any],
location: str,
employee_grade: str,
) -> str:
"""把结构化标准整理成面向用户的 Markdown 回复。
优先用确定性数据拼装;如需更自然的表述,可在此处接入 LLM,
当前阶段确定性拼装已足够清晰,避免额外模型调用开销。
"""
lines = ["### 差旅标准查询结果"]
context_parts = []
if location:
context_parts.append(f"目的地 **{location}**({standards.get('city_tier_label', '')})")
if employee_grade:
context_parts.append(f"职级 **{standards.get('employee_grade_label', employee_grade)}**")
if context_parts:
lines.append("")
lines.append("查询条件:" + "".join(context_parts) + "")
lodging = standards.get("lodging")
transport = standards.get("transport")
allowance = standards.get("allowance")
if lodging:
lines.append("")
lines.append(f"- **住宿标准**:{lodging['daily_cap']} {lodging['unit']}")
if transport:
lines.append(
f"- **交通工具等级**:飞机 {transport.get('flight_label', '')}"
f"火车 {transport.get('train_label', '')}"
)
if allowance:
lines.append(f"- **出差补助**:{allowance.get('note', '')}")
lines.append("")
lines.append(
"> 标准依据公司差旅政策运行时配置。如需了解超标说明、多城市行程等例外口径,"
"请进一步描述您的场景。"
)
return "\n".join(lines)
def _trace(stage: str, **extra: Any) -> dict[str, Any]:
from datetime import UTC, datetime
return {
"stage": stage,
"at": datetime.now(UTC).isoformat(),
**extra,
}

View File

@@ -68,3 +68,33 @@ def test_steward_intent_agent_uses_ten_second_timeout_and_three_attempts() -> No
assert runtime_chat.kwargs["timeout_seconds"] == 10
assert runtime_chat.kwargs["max_attempts"] == 3
assert runtime_chat.kwargs["use_failure_cooldown"] is False
def test_steward_intent_tool_schema_includes_query_task_type_from_registry() -> None:
"""function call schema 的 task_type enum 应从注册表动态生成,包含查询意图。"""
from app.services import steward_intent_bootstrap # noqa: F401 触发意图注册
schema = StewardIntentAgent._build_intent_tool_schema(
["expense_type", "time_range", "location", "reason", "transport_mode"]
)
task_schema = schema["function"]["parameters"]["properties"]["tasks"]["items"]
task_type_enum = task_schema["properties"]["task_type"]["enum"]
assert "expense_application" in task_type_enum
assert "reimbursement" in task_type_enum
assert "query_travel_standard" in task_type_enum
def test_steward_intent_system_prompt_mentions_query_intent_guidance() -> None:
"""system prompt 应包含查询意图的识别指引,避免被误识别为申请。"""
from app.services import steward_intent_bootstrap # noqa: F401 触发意图注册
messages = StewardIntentAgent._build_messages(
StewardPlanRequest(message="武汉出差标准是多少"),
base_date=__import__("datetime").date(2026, 6, 24),
canonical_fields=["location", "employee_grade"],
)
system_prompt = messages[0]["content"]
assert "query_travel_standard" in system_prompt
assert "差旅" in system_prompt
assert "住宿标准" in system_prompt

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from app.services import steward_intent_bootstrap # noqa: F401 触发意图注册
from app.services.steward_intent_registry import (
all_flow_ids,
all_intents,
all_signal_keywords,
all_task_types,
field_allowlist_for,
get_intent,
resolve_intent_by_action,
resolve_task_type_for_flow,
)
from app.services.steward_constants import BUSINESS_CANONICAL_FIELDS
def test_registry_registers_application_reimbursement_and_query_intents():
task_types = all_task_types()
assert "expense_application" in task_types
assert "reimbursement" in task_types
assert "query_travel_standard" in task_types
application = get_intent("expense_application")
assert application is not None
assert application.assigned_agent == "application_assistant"
assert application.flow_id == "travel_application"
reimbursement = get_intent("reimbursement")
assert reimbursement is not None
assert reimbursement.assigned_agent == "reimbursement_assistant"
assert reimbursement.flow_id == "travel_reimbursement"
query = get_intent("query_travel_standard")
assert query is not None
assert query.assigned_agent == "policy_query_assistant"
assert query.flow_id is None # 查询意图不进入候选流程确认
def test_registry_aggregates_flow_ids_and_signal_keywords():
flow_ids = set(all_flow_ids())
assert flow_ids == {"travel_application", "travel_reimbursement"}
keywords = all_signal_keywords()
assert "出差" in keywords # 来自 expense_application
assert "报销" in keywords # 来自 reimbursement
assert "差旅标准" in keywords # 来自 query_travel_standard
def test_registry_resolves_intent_by_action_type():
assert resolve_intent_by_action("save_application_draft").task_type == "expense_application"
assert resolve_intent_by_action("submit_application").task_type == "expense_application"
assert resolve_intent_by_action("create_reimbursement_draft").task_type == "reimbursement"
assert resolve_intent_by_action("associate_attachments").task_type == "reimbursement"
assert resolve_intent_by_action("execute_travel_standard_query").task_type == "query_travel_standard"
assert resolve_intent_by_action("unknown_action") is None
def test_registry_resolves_task_type_for_flow():
assert resolve_task_type_for_flow("travel_application") == "expense_application"
assert resolve_task_type_for_flow("travel_reimbursement") == "reimbursement"
assert resolve_task_type_for_flow("unknown_flow") is None
def test_field_allowlist_uses_per_intent_overrides():
# 申请/报销沿用全局 BUSINESS_CANONICAL_FIELDS
application_fields = field_allowlist_for("expense_application")
assert "location" in application_fields
assert "amount" in application_fields
assert application_fields == frozenset(BUSINESS_CANONICAL_FIELDS)
# 查询意图使用专属槽位集合
query_fields = field_allowlist_for("query_travel_standard")
assert "location" in query_fields
assert "employee_grade" in query_fields
assert "standard_category" in query_fields
assert "amount" not in query_fields # 查询不需要金额
# 未注册意图回退到 fallback
fallback_fields = field_allowlist_for("unknown", fallback=frozenset({"foo"}))
assert fallback_fields == frozenset({"foo"})
def test_query_intent_prompt_fragment_includes_identification_guidance():
query = get_intent("query_travel_standard")
assert query is not None
assert "差旅" in query.prompt_fragment
assert "住宿标准" in query.prompt_fragment

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
from types import SimpleNamespace
from app.schemas.steward import StewardActionExecuteRequest, StewardTask
from app.services.steward_query_executors import (
build_travel_standard_query_steps,
execute_travel_standard_query,
resolve_travel_standard_snapshot,
)
def _build_current_user(grade: str = "P5") -> SimpleNamespace:
return SimpleNamespace(
username="test_user",
name="测试员工",
grade=grade,
department_name="技术部",
position="工程师",
role_codes=["employee"],
is_admin=False,
employee_no="E00001",
manager_name="李总",
)
def _build_request(
*,
location: str = "武汉",
employee_grade: str = "",
standard_category: str = "",
message: str = "我去武汉出差的住宿标准是多少",
) -> StewardActionExecuteRequest:
ontology_fields: dict[str, str] = {}
if location:
ontology_fields["location"] = location
if employee_grade:
ontology_fields["employee_grade"] = employee_grade
if standard_category:
ontology_fields["standard_category"] = standard_category
task = StewardTask(
task_id="task_query_001",
task_type="query_travel_standard",
assigned_agent="policy_query_assistant",
title="差旅标准查询",
summary=message,
ontology_fields=ontology_fields,
)
return StewardActionExecuteRequest(
action_type="execute_travel_standard_query",
message=message,
task=task,
)
def test_resolve_travel_standard_snapshot_returns_lodging_and_transport_for_known_city_and_grade():
snapshot = resolve_travel_standard_snapshot(
location="武汉",
employee_grade="P5",
)
assert snapshot["location"] == "武汉"
assert snapshot["city_tier"] == "tier_2" # 武汉是二类城市
assert snapshot["employee_grade"] == "P5"
assert snapshot["lodging"] is not None
assert snapshot["lodging"]["daily_cap"] == "480.00" # P5 + tier_2
assert snapshot["transport"] is not None
assert snapshot["transport"]["flight_level"] == 1
assert snapshot["matched_any"] is True
def test_resolve_travel_standard_snapshot_filters_by_standard_category():
lodging_only = resolve_travel_standard_snapshot(
location="北京",
employee_grade="P7",
standard_category="lodging",
)
assert lodging_only["lodging"] is not None
assert lodging_only["lodging"]["daily_cap"] == "900.00" # P7 + tier_1
assert lodging_only["transport"] is None
transport_only = resolve_travel_standard_snapshot(
location="北京",
employee_grade="P7",
standard_category="transport",
)
assert transport_only["transport"] is not None
assert transport_only["transport"]["flight_level"] == 3 # P7 飞机等级
assert transport_only["lodging"] is None
def test_resolve_travel_standard_snapshot_normalizes_grade_variants():
# "p5" 小写、未标准化写法应被归一为 "P5"
snapshot = resolve_travel_standard_snapshot(
location="武汉",
employee_grade="p5",
)
assert snapshot["employee_grade"] == "P5"
assert snapshot["lodging"]["daily_cap"] == "480.00"
def test_resolve_travel_standard_snapshot_handles_unknown_grade():
snapshot = resolve_travel_standard_snapshot(
location="武汉",
employee_grade="",
)
# 无职级时无法匹配住宿标准(需要职级档位);补助为占位说明,不计入 matched_any
assert snapshot["lodging"] is None
assert snapshot["matched_any"] is False
assert snapshot["allowance"] is not None # 仍返回占位说明
def test_execute_travel_standard_query_returns_succeeded_with_answer_markdown():
request = _build_request(location="武汉", employee_grade="P5")
response = execute_travel_standard_query(
executor=None,
request=request,
current_user=_build_current_user("P5"),
trace=[],
)
assert response.status == "succeeded"
assert response.action_type == "execute_travel_standard_query"
assert "差旅标准查询结果" in response.message
assert "480.00" in response.message # 住宿标准
assert response.result_payload["matched"] is True
assert response.result_payload["standards"]["lodging"]["daily_cap"] == "480.00"
def test_execute_travel_standard_query_falls_back_to_current_user_grade_when_field_missing():
request = _build_request(location="武汉", employee_grade="")
response = execute_travel_standard_query(
executor=None,
request=request,
current_user=_build_current_user("P5"),
trace=[],
)
assert response.status == "succeeded"
# 应回退到 current_user.grade = P5
assert response.result_payload["standards"]["employee_grade"] == "P5"
def test_execute_travel_standard_query_returns_no_match_when_grade_and_city_unknown():
request = _build_request(
location="未知城市",
employee_grade="",
message="查差旅标准",
)
# ontology_fields 也没有 grade,current_user 也没有
response = execute_travel_standard_query(
executor=None,
request=request,
current_user=_build_current_user(""),
trace=[],
)
assert response.status == "succeeded"
assert response.result_payload["matched"] is False
assert "未能匹配" in response.message
def test_build_travel_standard_query_steps_generates_single_executable_step():
task = StewardTask(
task_id="task_query_001",
task_type="query_travel_standard",
assigned_agent="policy_query_assistant",
title="差旅标准查询",
summary="查武汉住宿标准",
ontology_fields={"location": "武汉", "employee_grade": "P5"},
)
steps = build_travel_standard_query_steps(task)
assert len(steps) == 1
assert steps[0].action_type == "execute_travel_standard_query"
assert steps[0].requires_confirmation is False
assert steps[0].payload["ontology_fields"]["location"] == "武汉"

View File

@@ -16,7 +16,8 @@ import {
const TASK_TYPE_LABELS = {
expense_application: '费用申请',
reimbursement: '费用报销'
reimbursement: '费用报销',
query_travel_standard: '差旅标准查询'
}
const AGENT_LABELS = {
@@ -25,14 +26,17 @@ const AGENT_LABELS = {
expense_application: '申请助手',
reimbursement_assistant: '报销助手',
reimbursement: '报销助手',
expense: '报销助手'
expense: '报销助手',
policy_query_assistant: '政策查询助手',
query_travel_standard: '政策查询助手'
}
const EXECUTABLE_STEWARD_ACTION_TYPES = new Set([
'save_application_draft',
'submit_application',
'create_reimbursement_draft',
'associate_attachments'
'associate_attachments',
'execute_travel_standard_query'
])
export function buildStewardPlanRequest({