diff --git a/document/development/AI意图规划器/UNIFIED_GATE_PIPELINE.md b/document/development/AI意图规划器/UNIFIED_GATE_PIPELINE.md new file mode 100644 index 0000000..9349741 --- /dev/null +++ b/document/development/AI意图规划器/UNIFIED_GATE_PIPELINE.md @@ -0,0 +1,243 @@ +# 统一门控管道(Unified Gate Pipeline) + +> 状态:**设计定稿,待实施** +> 创建:2026-06-25 +> 关联:`CONCEPT.md`、`LANGGRAPH_RUNTIME_MIGRATION.md` + +## 1. 为什么要做这件事 + +### 1.1 现状的致命问题 + +小财管家的门控(决定用户输入走哪条路)目前散落在 **7 个位置**,互相不知道对方的结论,每加一个场景要找 n 个地方改: + +| # | 位置 | 文件 | 做的门控 | +|---|---|---|---| +| 1 | 前端 7 层 if/else | `usePersonalWorkbenchAiMode.js:858-913` `startInlineConversation` | 命令→文本动作→草稿→模型规划→报销→闲聊,每层各自 return | +| 2 | 前端业务词预筛 | `workbenchAiIntentPlannerModel.js:shouldRequestWorkbenchAiIntentPlan` | 不含业务词的输入不发给后端 | +| 3 | 后端 endpoint 补丁群 | `steward.py:create_steward_plan` | `_hydrate_required_application_gate` / `_inject_recent_conversation_history` / `_apply_context_resume` 三个补丁串在 `build_plan` 前后 | +| 4 | 图条件边路由 | `steward_graph_planner.py:_route_after_prepare_context` | off_topic / model / fallback 三路 | +| 5 | off_topic 关键词 | `steward_planner_fallback.py:_classify_irrelevant_input` + `STEWARD_BUSINESS_SIGNAL_KEYWORDS` | 写死的信号词元组 | +| 6 | 候选流程歧义 | `steward_planner_extraction.py:_looks_like_ambiguous_travel_flow` | 独立的正则判定 | +| 7 | 图后意图处理 | `usePersonalWorkbenchAiMode.js:813-835` `executeModelPlannedWorkbenchIntent` | 前端再判一遍 task_type 决定渲染申请预览还是报销 | + +**根因:没有单一的决策点。** LangGraph 图只承担了"意图识别"这一个职责,控制流泄漏到了 endpoint 层和前端 composable,形成两个影子编排器。 + +### 1.2 不持久的判据 + +加一个场景(如"查报销进度")的成本是 **O(n)**——必须同步改前端门控、后端补丁、图条件边、off_topic 关键词、候选流程判定等多处,漏一处就静默出错。本次会话已经在不断验证这个痛点:每个新场景(查询、低置信度、上下文恢复)都是往不同位置打补丁。 + +## 2. 目标:接入成本 O(1) + +加一个新场景,**全文改动只有一处**: + +```python +# server/src/app/services/scenes/scene_query_reimbursement_progress.py +register_scene(SceneDescriptor( + scene_id="query_reimbursement_progress", + label="报销进度查询", + signal_keywords=("报销进度", "报销状态", "审批进度", "审批到哪了"), + ontology_fields=("claim_no", "time_range"), + gate=GateRule.CHOICE, # 不走候选流程、不走 off_topic + can_resume=False, # 不参与上下文恢复 + route=SceneRoute.HANDLER_ONLY, # 不走 LLM,直接执行 handler + handler=execute_progress_query, # 纯函数:检索 + 拼装 + prompt_fragment="用户询问报销审批进度/状态时,识别为 query_reimbursement_progress。", +)) +``` + +**不改图、不改 endpoint、不改前端门控、不改 extraction。** 判断规则、路由、执行、槽位、恢复能力在同一个 descriptor 里声明,不会割裂。 + +## 3. 目标架构 + +### 3.1 后端:图成为唯一编排者 + +``` +POST /api/v1/steward/plans + ↓ +endpoint: 纯 IO (收请求 → graph.invoke → 返响应,零编排) + ↓ +LangGraph StateGraph (唯一编排者): + START + → load_context 读最近10条历史 + steward_state + hydrate + → gate_classify 统一门控:按 registry 规则裁决 scene + route + → route 分支 + ├─ off_topic → off_topic_reply + ├─ handler_only → execute_scene_handler (查询/命令类,不走 LLM) + ├─ resume → resume_recent_task ("再提交"确定性恢复) + ├─ ambiguous_flow → pending_flow_confirmation + └─ model_intent → detect_model_intent → {done | fallback} + → attach_action_steps + → persist_state 写 message + steward_state + → END +``` + +**endpoint 层只剩 3 行**:`planner = build(db); plan = planner.build_plan(payload); return plan`。所有 hydrate/inject/resume 全部搬进图节点。 + +### 3.2 前端:退化为纯渲染 + +``` +用户输入 + ↓ +前端: 不再自己决策,统一发给后端 + POST /steward/plans { message, conversation_id } + ↓ +后端返回 StewardPlanResponse: + - plan.next_action 告诉前端该渲染什么 + - plan.tasks[].task_type 告诉前端该用哪个渲染器 + - plan.suggested_actions 告诉前端该显示哪些按钮 + ↓ +前端: 按 response 的指令渲染(申请预览/报销预览/查询结果/纯文本回复) +``` + +前端的 7 层 if/else **全部移除**,替换为: +```js +async function startInlineConversation(prompt) { + const plan = await fetchStewardPlan({ message: prompt, conversation_id: conversationId.value }) + renderPlanResponse(plan) // 按 plan.next_action / task_type 分发到对应渲染器 +} +``` + +### 3.3 SceneDescriptor:场景的唯一声明 + +```python +@dataclass(frozen=True) +class SceneDescriptor: + scene_id: str # 唯一标识,等同 task_type + label: str # 中文标签 + signal_keywords: tuple[str, ...] # 规则识别的关键词(聚合进 off_topic 信号池) + ontology_fields: tuple[str, ...] # 该场景允许的槽位 + gate: GateRule # 门控规则(见 3.4) + route: SceneRoute # 路由策略(见 3.5) + handler: Callable | None # 执行函数(handler_only 路由用) + can_resume: bool = False # 是否参与"再提交"上下文恢复 + action_steps_builder: Callable = ... # 动作步骤生成 + prompt_fragment: str = "" # 注入 LLM system prompt 的识别指引 + priority: int = 100 # gate_classify 的匹配优先级(小优先) + flow_id: str | None = None # 候选流程用;查询/命令类为 None +``` + +### 3.4 GateRule:门控规则枚举 + +```python +class GateRule(Enum): + OFF_TOPIC = "off_topic" # 非业务输入,走 off_topic_reply + CHOICE = "choice" # 明确的业务选择,走 handler/model + AMBIGUOUS_FLOW = "ambiguous_flow" # 话术歧义,走候选流程确认 + MODEL_ONLY = "model_only" # 只走 LLM function call,不参与规则匹配 +``` + +### 3.5 SceneRoute:路由策略枚举 + +```python +class SceneRoute(Enum): + HANDLER_ONLY = "handler_only" # 不走 LLM,直接执行 handler(查询/命令类) + MODEL_INTENT = "model_intent" # 走 LLM function call(申请/报销类) + OFF_TOPIC = "off_topic" # 走 off_topic 回复 + RESUME = "resume" # 走确定性上下文恢复 + AMBIGUOUS = "ambiguous" # 走候选流程确认 +``` + +## 4. gate_classify 节点的裁决逻辑(唯一决策点) + +```python +def gate_classify(state) -> dict: + """统一门控:按优先级遍历 registry,输出 scene_id + route。""" + message = state["message"] + steward_state = state["steward_state"] + history = state["recent_history"] + + # ① off_topic 门:聚合所有场景的 signal_keywords,无命中 → off_topic + if not _matches_any_signal(message): + return {"scene_id": "off_topic", "route": SceneRoute.OFF_TOPIC} + + # ② resume 门:用户说"再提交"+ state 有可恢复 flow + resume_scene = _check_resume(message, steward_state) + if resume_scene: + return {"scene_id": resume_scene, "route": SceneRoute.RESUME} + + # ③ 规则匹配门:按 priority 遍历,命中 signal_keywords 的场景 + for scene in registry.scenes_sorted_by_priority(): + if scene.gate == GateRule.CHOICE and _matches_keywords(message, scene.signal_keywords): + return {"scene_id": scene.scene_id, "route": scene.route} + + # ④ LLM 门:规则未命中,走 model function call + return {"scene_id": None, "route": SceneRoute.MODEL_INTENT} +``` + +**所有门控收敛到这一个函数。** off_topic 信号词、resume 判断、规则匹配、LLM 兜底,全部在这里按固定顺序裁决。 + +## 5. 文件结构 + +``` +server/src/app/services/ + scenes/ # 场景声明(每个场景一个文件) + __init__.py # 注册所有场景 + scene_registry.py # SceneRegistry 单例 + 查询方法 + scene_descriptor.py # SceneDescriptor dataclass + scene_expense_application.py # 出差申请场景 + scene_reimbursement.py # 报销场景 + scene_query_travel_standard.py# 差旅标准查询场景 + gate_rules.py # GateRule / SceneRoute 枚举 + steward_graph_planner.py # 图:load_context/gate_classify/.../persist_state + steward_scene_handlers.py # 各场景的 handler 纯函数 +``` + +## 6. 迁移路径(分阶段,每阶段可独立验证) + +### Phase 1:建场景注册表 + 收口后端门控(后端自闭环) + +**目标**:后端 endpoint 零编排,图成为唯一编排者。 + +1. 新建 `scenes/` 目录,实现 `SceneDescriptor` / `SceneRegistry` / `GateRule` / `SceneRoute` +2. 把现有 3 个场景(expense_application / reimbursement / query_travel_standard)迁入 descriptor +3. 新增图节点:`load_context`、`gate_classify`、`resume_recent_task`、`persist_state` +4. 把 endpoint 的 4 个补丁函数搬进图节点 +5. endpoint 退化为 3 行 + +**验证**:后端全量测试绿 + 端到端(上海出差/再提交/查差旅标准)通过 + +### Phase 2:前端退化为纯渲染 + +**目标**:前端移除 7 层 if/else,统一发给后端。 + +1. `startInlineConversation` 改为:`fetchStewardPlan → renderPlanResponse` +2. 按 `plan.next_action` / `task_type` 分发到渲染器(申请预览/报销预览/查询结果/纯文本) +3. 移除 `shouldRequestWorkbenchAiIntentPlan`、`isReimbursementCreationIntent`、`isLowConfidenceTravelApplicationPlan` 等前端门控函数 +4. 保留并复用现有渲染组件(applicationPreview、stewardPlan 渲染逻辑不重写) + +**验证**:前端测试绿 + 人工验证各场景渲染正确 + +### Phase 3:清理冗余 + +1. 删除 `steward_planner_fallback.py` 的 `_classify_irrelevant_input` 独立门控 +2. 删除 `_looks_like_ambiguous_travel_flow` 独立判定(收进 gate_classify) +3. 统一 signal_keywords 来源(registry 唯一) +4. 删除旧的 endpoint 补丁函数 + +## 7. 验证标准(持久性的可衡量判据) + +接入一个新场景(如"查报销进度")时,**改动文件清单必须且仅限于**: + +| 文件 | 改动 | +|---|---| +| `scenes/scene_query_reimbursement_progress.py` | 新建:1 个 SceneDescriptor + 1 个 handler 函数 | +| `scenes/__init__.py` | 加 1 行 import + register | + +**如果接入时需要动 `steward_graph_planner.py` / `steward.py` / 前端 composable / extraction.py / fallback.py 中任何一个,说明架构没有收口成功。** 这是验收的硬标准。 + +## 8. 不改变的东西 + +- `RuntimeChatService`(模型供应商抽象):不动 +- `StewardActionExecutor`(执行分发):已在 registry 驱动,不动 +- `AgentConversationService`(消息持久化):不动,只是调用点从 endpoint 搬进图节点 +- LangGraph 的 `StateGraph` / `interrupt` / checkpoint:继续用,只是节点职责更完整 +- 现有渲染组件(applicationPreview 表格、stewardPlan 消息):复用,不重写 + +## 9. 风险与对策 + +| 风险 | 对策 | +|---|---| +| 图重构引入回归 | Phase 1 每搬一个节点跑一次全量测试 | +| 前端去掉门控后某些场景渲染不出 | Phase 2 先保留渲染器映射,只改"谁决策"不改"怎么渲染" | +| gate_classify 性能(遍历 registry) | 场景数 <20,关键词正则匹配 O(1),无性能问题 | +| LLM 历史注入搬进图后 token 超限 | 保持 limit=10 不变 | diff --git a/document/work-log/2026-06-25.md b/document/work-log/2026-06-25.md index 759bed4..099f14d 100644 --- a/document/work-log/2026-06-25.md +++ b/document/work-log/2026-06-25.md @@ -56,6 +56,33 @@ - 22:40:`server/rules/finance-rules/` 下有两个 Excel(交通工具等级标准、交通费用预估表)被标记为 modified,疑似容器运行时产物,非本次代码改动,未处理。 - 22:40:`agent-change-log` Skill 在当前环境不可调用,已按 AGENTS.md 规范手动增量更新本日志。 +- 23:30:我落地了会话上下文保留机制(LLM + 确定性双保险),解决了"用户删除草稿后说'再提交'丢失上下文"的问题。 + - Git 提交检查:`git fetch --all --prune` 后本地与 origin/main 同步(不 ahead 不 behind)。 + - 背景:排查确认对话消息和 steward_state 虽已持久化在 DB,但 plan 接口的 `build_plan` 从不读历史 task,且"再提交"被路由到 plan 接口(而非能恢复 task 的 runtime-decision 接口),导致系统无法把"再提交"和之前被拦的出差申请关联起来。 + - 修改①(LLM 历史关联·保险②):`steward.py` 新增 `_inject_recent_conversation_history`,在 build_plan 前用 `AgentConversationService.list_message_history(conversation_id, limit=10)` 读出最近 10 条对话,注入 `context_json.recent_history`。`steward_intent_agent.py` 的 `_build_messages` 把 recent_history 暴露为 context_payload 顶层结构化字段,并在 system prompt 加引导:"当用户说'再提交''继续''重新提交'等确认类话术时,必须结合 recent_history 里最近一次提到的出差/报销申请来理解"。 + - 修改②(确定性兜底·保险①):新建 `steward_context_resume.py`——`should_resume_recent_task` 检测"再提交"类话术(12 个关键词)+ `steward_state.flows` 有可恢复 flow;`resume_task_from_flow` 从 flow.fields 恢复 StewardTask(复用 runtime-decision 的恢复逻辑);`attach_resumed_task` 把恢复的 task 挂回 plan,planning_source 标记为 `context_resume`。`steward.py` 新增 `_apply_context_resume`,在 build_plan 后、plan 无 task 时触发确定性兜底。两个入口(`/plans` 和 `/plans/stream`)都已接入。 + - 验证:后端全量测试 **67 passed**(含新增 11 个:context_resume 8 + intent_agent history 3);端到端验证两轮对话——"上海出差火车"→"再提交",LLM 历史关联成功恢复 expense_application task(fields 完整);纯函数验证确定性兜底在模型返回空 task 时从 state 恢复(planning_source=context_resume)。 + - 影响:会话上下文保留到用户清理会话;行为处理只看最近 10 条,超长会话不爆 token;"再提交"类话术现在能恢复之前被拦的申请 task。正常 plan 产生的 task 已通过 `merge_plan` 写进 `steward_state.flows`,重复检查不改 state,所以 task 在 state 里一直存活到会话结束。 + +## 遗留问题(补充) + +- 23:30:历史条数固定为 10,未做 token 感知裁剪;极端情况下单条消息很长(如粘贴大段文本)可能导致 token 超限,但实测正常对话不会触发。 + +- 00:10:我完成了统一门控管道的架构设计文档,作为后续重构的唯一事实来源。 + - 文档路径:`document/development/AI意图规划器/UNIFIED_GATE_PIPELINE.md` + - 核心判断:当前门控散落在 7 处(前端 7 层 if/else + 后端 endpoint 4 个补丁 + 图条件边 + off_topic 关键词 + 候选流程判定),每加一个场景成本 O(n),漏一处静默出错。这是"不持久"的根因。 + - 目标架构:LangGraph 图成为唯一编排者(load_context → gate_classify → route 分支 → attach_action_steps → persist_state),endpoint 退化为 3 行纯 IO,前端退化为纯渲染(fetchStewardPlan → renderPlanResponse)。 + - 接入成本 O(1) 的硬验收标准:加场景只需新建 1 个 SceneDescriptor + 1 个 handler 函数 + 注册,不动图/endpoint/前端/extraction。 + - 迁移分 3 阶段:Phase 1 后端收口(建 scenes 注册表 + endpoint 补丁搬进图节点)、Phase 2 前端退化纯渲染(移除 7 层 if/else)、Phase 3 清理冗余。 + - Git 提交检查:本地与 origin/main 同步。 + +- 00:50:我完成了统一门控管道 Phase 1 的 scene 注册表骨架(P1.1-P1.2),作为后端收口的基础设施。 + - Git 提交检查:本地与 origin/main 同步。 + - 修改:新建 `server/src/app/services/scenes/` 目录——`gate_rules.py`(GateRule/SceneRoute 枚举)、`scene_descriptor.py`(SceneDescriptor dataclass,声明 scene_id/label/signal_keywords/ontology_fields/gate/route/handler/can_resume/flow_id/prompt_fragment/priority 等)、`scene_registry.py`(SceneRegistry 单例 + 查询方法)、3 个场景文件(expense_application/reimbursement/query_travel_standard)、`__init__.py`(bootstrap + 运行时绑定 handler/builder/executor)。 + - 验证:冒烟测试 3 个场景注册成功、优先级排序正确(query 在前,priority=50)、35 个 signal_keywords 聚合、handler/builder/executor 运行时绑定成功、无循环 import;后端全量 76 passed,scene 注册表的加入未破坏任何现有代码。 + - 影响:为后续图拓扑重构(P1.3-P1.8)提供了声明式场景注册基础设施。当前 scene_registry 与现有 intent_registry 并存,后续 P1.3-P1.7 会把 intent_registry 的消费者逐步迁移到 scene_registry。 + - 下一步:P1.3-P1.8 图拓扑重构(新增 load_context/gate_classify/resume/persist 节点、endpoint 退化、registry 消费者迁移)。 + ## TODO - [ ] 为 `quick_validate.py` 准备稳定运行环境,避免后续新增 Skill 时继续依赖人工兜底。(来源:09:18 技能校验) diff --git a/server/src/app/services/scenes/__init__.py b/server/src/app/services/scenes/__init__.py new file mode 100644 index 0000000..bf07d3d --- /dev/null +++ b/server/src/app/services/scenes/__init__.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from typing import Any + +from app.services.scenes.scene_descriptor import SceneDescriptor +from app.services.scenes.scene_registry import REGISTRY, register_scene # noqa: F401 + + +def bootstrap_scenes() -> None: + """注册全部业务场景,并运行时绑定 handler/builder/executor。 + + descriptor 声明时 handler/builder/executor 为 None(避免循环 import), + 这里在运行时从各自的服务模块取回实际可调用对象并回填到 descriptor。 + + 新增场景时: + 1. 新建 scenes/scene_xxx.py,声明 SceneDescriptor(handler 留 None) + 2. 在这里加一行 register 调用 + 3. 如有 handler,在 _bind_runtime_callbacks 里加绑定 + """ + # 声明式注册(不依赖任何服务模块) + from app.services.scenes import ( + scene_expense_application, + scene_query_travel_standard, + scene_reimbursement, + ) + + if REGISTRY.all_scene_ids(): + return # 已注册,避免重复 + + scene_expense_application.register() + scene_reimbursement.register() + scene_query_travel_standard.register() + + _bind_runtime_callbacks() + + +def _bind_runtime_callbacks() -> None: + """运行时把 handler/builder/executor 绑定到 descriptor。 + + 因为 SceneDescriptor 是 frozen dataclass,这里用替换的方式回填。 + """ + from app.services.steward_action_contracts import StewardActionPlanBuilder + from app.services.steward_action_executor import StewardActionExecutor + from app.services.steward_query_executors import ( + build_travel_standard_query_steps, + execute_travel_standard_query, + ) + + application_builder = StewardActionPlanBuilder() + + # expense_application + _update_scene( + "expense_application", + action_steps_builder=application_builder.build_application_steps, + executor=StewardActionExecutor._dispatch_application_action, + ) + + # reimbursement + _update_scene( + "reimbursement", + action_steps_builder=application_builder.build_reimbursement_steps, + executor=StewardActionExecutor._dispatch_reimbursement_action, + ) + + # query_travel_standard + _update_scene( + "query_travel_standard", + action_steps_builder=build_travel_standard_query_steps, + handler=execute_travel_standard_query, + executor=execute_travel_standard_query, + ) + + +def _update_scene(scene_id: str, **overrides: Any) -> None: + """替换 REGISTRY 里的 descriptor 字段(frozen dataclass 需重建)。""" + scene = REGISTRY.get(scene_id) + if scene is None: + return + updated = SceneDescriptor(**{**scene.__dict__, **overrides}) + REGISTRY.register(updated) + + +# import 即注册 +bootstrap_scenes() diff --git a/server/src/app/services/scenes/gate_rules.py b/server/src/app/services/scenes/gate_rules.py new file mode 100644 index 0000000..92ab5c6 --- /dev/null +++ b/server/src/app/services/scenes/gate_rules.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from enum import Enum + + +class GateRule(str, Enum): + """门控规则:决定场景如何参与 gate_classify 的裁决。""" + + OFF_TOPIC = "off_topic" + """非业务输入,走 off_topic_reply。""" + + CHOICE = "choice" + """明确的业务选择,命中 signal_keywords 即生效。""" + + AMBIGUOUS_FLOW = "ambiguous_flow" + """话术歧义,走候选流程确认。""" + + MODEL_ONLY = "model_only" + """只走 LLM function call,不参与规则匹配(如申请/报销的复杂识别)。""" + + +class SceneRoute(str, Enum): + """路由策略:gate_classify 裁决后决定走图的哪条边。""" + + HANDLER_ONLY = "handler_only" + """不走 LLM,直接执行 handler(查询/命令类场景)。""" + + MODEL_INTENT = "model_intent" + """走 LLM function call(申请/报销类场景)。""" + + OFF_TOPIC = "off_topic" + """走 off_topic 回复。""" + + RESUME = "resume" + """走确定性上下文恢复。""" + + AMBIGUOUS = "ambiguous" + """走候选流程确认。""" diff --git a/server/src/app/services/scenes/scene_descriptor.py b/server/src/app/services/scenes/scene_descriptor.py new file mode 100644 index 0000000..2d7e822 --- /dev/null +++ b/server/src/app/services/scenes/scene_descriptor.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Callable + +from app.services.scenes.gate_rules import GateRule, SceneRoute + + +@dataclass(frozen=True) +class SceneDescriptor: + """单个业务场景的声明式描述符。 + + 一个场景的"如何识别""走哪条路""做什么""要什么槽位""能否恢复上下文" + 全部在这里声明,实现接入成本 O(1)。 + """ + + scene_id: str + """唯一标识,等同 task_type(如 expense_application / query_travel_standard)。""" + + label: str + """中文标签,用于 system prompt、前端展示、日志。""" + + signal_keywords: tuple[str, ...] = () + """规则识别的关键词;聚合进 off_topic 信号池,也用于 CHOICE 门控规则匹配。""" + + ontology_fields: tuple[str, ...] = () + """该场景允许的 canonical 槽位;为空表示沿用全局 BUSINESS_CANONICAL_FIELDS。""" + + gate: GateRule = GateRule.MODEL_ONLY + """门控规则,决定场景如何参与 gate_classify 裁决。""" + + route: SceneRoute = SceneRoute.MODEL_INTENT + """路由策略,gate_classify 命中后决定走图的哪条边。""" + + handler: Callable[..., Any] | None = None + """执行函数;HANDLER_ONLY 路由必填,其他路由可选。""" + + action_steps_builder: Callable[[Any], list[Any]] | None = None + """动作步骤生成函数;把 StewardTask 转换为白名单 action steps。""" + + can_resume: bool = False + """是否参与"再提交"上下文恢复。""" + + flow_id: str | None = None + """候选流程确认使用的 flow_id;查询/命令类为 None。""" + + prompt_fragment: str = "" + """注入 steward_intent_agent system prompt 的识别指引片段。""" + + priority: int = 100 + """gate_classify 的匹配优先级;数字小的优先。""" + + side_effect_actions: tuple[str, ...] = () + """该场景产生副作用的 action_type 集合。""" + + noop_actions: tuple[str, ...] = () + """该场景的无副作用 action_type 集合(填充/预览/校验等)。""" + + assigned_agent: str = "" + """该场景对应的执行 agent 标识。""" + + executor: Callable[..., Any] | None = None + """副作用/查询动作的执行器;供 action_executor 通过 registry 分发。""" diff --git a/server/src/app/services/scenes/scene_expense_application.py b/server/src/app/services/scenes/scene_expense_application.py new file mode 100644 index 0000000..f4dd61b --- /dev/null +++ b/server/src/app/services/scenes/scene_expense_application.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from app.services.scenes.gate_rules import GateRule, SceneRoute +from app.services.scenes.scene_descriptor import SceneDescriptor +from app.services.scenes.scene_registry import register_scene + + +def register() -> None: + register_scene( + SceneDescriptor( + scene_id="expense_application", + label="费用申请", + assigned_agent="application_assistant", + signal_keywords=( + "申请", "出差", "差旅", "费用", "交通", "住宿", "采购", "会务", "会议", + "客户现场", "项目", "拜访", "调研", "驻场", "上线", "验收", + ), + ontology_fields=(), # 沿用全局 BUSINESS_CANONICAL_FIELDS,运行时 fallback + gate=GateRule.MODEL_ONLY, + route=SceneRoute.MODEL_INTENT, + handler=None, + action_steps_builder=None, # 运行时从 StewardActionPlanBuilder 取 + can_resume=True, + flow_id="travel_application", + side_effect_actions=("save_application_draft", "submit_application", "run_duplicate_precheck"), + noop_actions=( + "fill_application_fields", + "build_application_preview", + "validate_required_fields", + ), + executor=None, # 运行时从 StewardActionExecutor 取 + prompt_fragment=( + "用户描述未来出差、差旅计划、去某地几天、部署、支撑、拜访或会议安排时," + "即使没有出现“申请”两个字,也必须优先识别为 expense_application。" + ), + priority=100, + ) + ) diff --git a/server/src/app/services/scenes/scene_query_travel_standard.py b/server/src/app/services/scenes/scene_query_travel_standard.py new file mode 100644 index 0000000..7ae709f --- /dev/null +++ b/server/src/app/services/scenes/scene_query_travel_standard.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from app.services.scenes.gate_rules import GateRule, SceneRoute +from app.services.scenes.scene_descriptor import SceneDescriptor +from app.services.scenes.scene_registry import register_scene + + +def register() -> None: + register_scene( + SceneDescriptor( + scene_id="query_travel_standard", + label="差旅标准查询", + assigned_agent="policy_query_assistant", + signal_keywords=( + "差旅标准", "住宿标准", "出差标准", "交通标准", "出差补助", + "差旅补贴", "住宿补助", "交通补助", "职级标准", "差标", + ), + ontology_fields=( + "location", + "employee_grade", + "standard_category", + "expense_type", + ), + gate=GateRule.CHOICE, + route=SceneRoute.HANDLER_ONLY, + handler=None, # 运行时从 steward_query_executors 取 + action_steps_builder=None, # 运行时从 steward_query_executors 取 + can_resume=False, + flow_id=None, + side_effect_actions=("execute_travel_standard_query",), + noop_actions=(), + executor=None, # 运行时从 steward_query_executors 取 + prompt_fragment=( + "用户询问差旅住宿标准、交通标准、出差补助或差旅补贴标准时," + "必须识别为 query_travel_standard,而不是 expense_application 或 reimbursement。" + "差旅标准查询不创建任何单据,只返回标准数值。" + ), + priority=50, # 比 MODEL_ONLY 场景优先,确保查询类先被规则命中 + ) + ) diff --git a/server/src/app/services/scenes/scene_registry.py b/server/src/app/services/scenes/scene_registry.py new file mode 100644 index 0000000..3234dda --- /dev/null +++ b/server/src/app/services/scenes/scene_registry.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from typing import Any, Callable + +from app.services.scenes.gate_rules import GateRule, SceneRoute +from app.services.scenes.scene_descriptor import SceneDescriptor + + +class SceneRegistry: + """场景注册表单例。 + + 所有场景在 import 时注册,门控/路由/执行/字段过滤全部从这里查询。 + gate_classify 节点是它的唯一消费者(单一决策点)。 + """ + + def __init__(self) -> None: + self._scenes: dict[str, SceneDescriptor] = {} + self._flow_to_scene: dict[str, str] = {} + + # ---- 注册 ---- + + def register(self, descriptor: SceneDescriptor) -> SceneDescriptor: + self._scenes[descriptor.scene_id] = descriptor + if descriptor.flow_id: + self._flow_to_scene[descriptor.flow_id] = descriptor.scene_id + return descriptor + + # ---- 查询 ---- + + def get(self, scene_id: str) -> SceneDescriptor | None: + return self._scenes.get(str(scene_id or "").strip()) + + def all_scenes(self) -> list[SceneDescriptor]: + return list(self._scenes.values()) + + def scenes_sorted_by_priority(self) -> list[SceneDescriptor]: + """按 priority 升序排列(数字小优先)。""" + return sorted(self._scenes.values(), key=lambda s: s.priority) + + def all_scene_ids(self) -> list[str]: + return [s.scene_id for s in self._scenes.values()] + + def all_assigned_agents(self) -> list[str]: + return [s.assigned_agent for s in self._scenes.values() if s.assigned_agent] + + def all_flow_ids(self) -> list[str]: + return [s.flow_id for s in self._scenes.values() if s.flow_id] + + def all_signal_keywords(self) -> set[str]: + keywords: set[str] = set() + for scene in self._scenes.values(): + keywords.update(scene.signal_keywords) + return keywords + + def all_side_effect_actions(self) -> set[str]: + actions: set[str] = set() + for scene in self._scenes.values(): + actions.update(scene.side_effect_actions) + return actions + + def all_noop_actions(self) -> set[str]: + actions: set[str] = set() + for scene in self._scenes.values(): + actions.update(scene.noop_actions) + return actions + + def resolve_scene_by_action(self, action_type: str) -> SceneDescriptor | None: + normalized = str(action_type or "").strip() + for scene in self._scenes.values(): + if normalized in scene.side_effect_actions or normalized in scene.noop_actions: + return scene + return None + + def resolve_scene_by_flow(self, flow_id: str) -> SceneDescriptor | None: + scene_id = self._flow_to_scene.get(str(flow_id or "").strip()) + return self.get(scene_id) if scene_id else None + + def field_allowlist_for( + self, + scene_id: str, + *, + fallback: frozenset[str] | None = None, + ) -> frozenset[str]: + scene = self.get(scene_id) + if scene and scene.ontology_fields: + return frozenset(scene.ontology_fields) + return fallback or frozenset() + + def resumable_scenes(self) -> list[SceneDescriptor]: + """返回所有声明了 can_resume=True 的场景。""" + return [s for s in self._scenes.values() if s.can_resume] + + def prompt_fragments(self) -> str: + """拼接所有场景的 prompt_fragment,供 system prompt 注入。""" + fragments = [s.prompt_fragment for s in self._scenes.values() if s.prompt_fragment] + return "".join(fragments) + + def intent_summary(self) -> str: + """拼接场景列表摘要,供 system prompt 引用。""" + fragments = [f"{s.scene_id}({s.label})" for s in self._scenes.values()] + return "、".join(fragments) if fragments else "(暂无已注册场景)" + + +# 全局单例 +REGISTRY = SceneRegistry() + + +def register_scene(descriptor: SceneDescriptor) -> SceneDescriptor: + """注册场景到全局单例。""" + return REGISTRY.register(descriptor) diff --git a/server/src/app/services/scenes/scene_reimbursement.py b/server/src/app/services/scenes/scene_reimbursement.py new file mode 100644 index 0000000..768977e --- /dev/null +++ b/server/src/app/services/scenes/scene_reimbursement.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from app.services.scenes.gate_rules import GateRule, SceneRoute +from app.services.scenes.scene_descriptor import SceneDescriptor +from app.services.scenes.scene_registry import register_scene + + +def register() -> None: + register_scene( + SceneDescriptor( + scene_id="reimbursement", + label="费用报销", + assigned_agent="reimbursement_assistant", + signal_keywords=( + "报销", "报账", "票据", "发票", "凭证", "行程单", "付款截图", "小票", "收据", + ), + ontology_fields=(), # 沿用全局 BUSINESS_CANONICAL_FIELDS + gate=GateRule.MODEL_ONLY, + route=SceneRoute.MODEL_INTENT, + handler=None, + action_steps_builder=None, + can_resume=False, + flow_id="travel_reimbursement", + side_effect_actions=( + "create_reimbursement_draft", + "link_existing_application", + "associate_attachments", + ), + noop_actions=( + "fill_reimbursement_fields", + "build_reimbursement_preview", + "validate_required_fields", + ), + executor=None, + prompt_fragment=( + "用户描述已经发生的费用、昨天/前天费用、票据或明确报销诉求时," + "才识别为 reimbursement。" + ), + priority=100, + ) + )