Files
X-Financial/document/development/AI意图规划器/UNIFIED_GATE_PIPELINE.md
caoxiaozhu 54356ba81a refactor(server): scene 注册表骨架 + 统一门控管道设计文档
Phase 1 P1.1-P1.2:为后端门控收口提供声明式场景注册基础设施。

- 新建 scenes/ 目录:gate_rules(GateRule/SceneRoute 枚举)、scene_descriptor(SceneDescriptor dataclass)、scene_registry(SceneRegistry 单例)
- 3 个场景迁入 descriptor:expense_application / reimbursement / query_travel_standard
- __init__.py 的 bootstrap_scenes 在 import 时注册 + 运行时绑定 handler/builder/executor(解决循环 import)
- 查询场景 priority=50 优先于 MODEL_ONLY 场景,确保规则匹配先于 LLM
- 落地 UNIFIED_GATE_PIPELINE.md 架构文档:目标架构 / 验收标准(接入 O(1))/ 3 阶段迁移路径
- 76 passed,scene 注册表未破坏现有代码;与 intent_registry 暂时并存,P1.3-P1.8 会统一迁移
2026-06-25 15:09:16 +08:00

12 KiB
Raw Permalink Blame History

统一门控管道Unified Gate Pipeline

状态:设计定稿,待实施 创建2026-06-25 关联:CONCEPT.mdLANGGRAPH_RUNTIME_MIGRATION.md

1. 为什么要做这件事

1.1 现状的致命问题

小财管家的门控(决定用户输入走哪条路)目前散落在 7 个位置,互相不知道对方的结论,每加一个场景要找 n 个地方改:

# 位置 文件 做的门控
1 前端 7 层 if/else usePersonalWorkbenchAiMode.js:858-913 startInlineConversation 命令→文本动作→草稿→模型规划→报销→闲聊,每层各自 return
2 前端业务词预筛 workbenchAiIntentPlannerModel.js:shouldRequestWorkbenchAiIntentPlan 不含业务词的输入不发给后端
3 后端 endpoint 补丁群 steward.py:create_steward_plan _hydrate_required_application_gate / _inject_recent_conversation_history / _apply_context_resume 三个补丁串在 build_plan 前后
4 图条件边路由 steward_graph_planner.py:_route_after_prepare_context off_topic / model / fallback 三路
5 off_topic 关键词 steward_planner_fallback.py:_classify_irrelevant_input + STEWARD_BUSINESS_SIGNAL_KEYWORDS 写死的信号词元组
6 候选流程歧义 steward_planner_extraction.py:_looks_like_ambiguous_travel_flow 独立的正则判定
7 图后意图处理 usePersonalWorkbenchAiMode.js:813-835 executeModelPlannedWorkbenchIntent 前端再判一遍 task_type 决定渲染申请预览还是报销

根因:没有单一的决策点。 LangGraph 图只承担了"意图识别"这一个职责,控制流泄漏到了 endpoint 层和前端 composable形成两个影子编排器。

1.2 不持久的判据

加一个场景(如"查报销进度")的成本是 O(n)——必须同步改前端门控、后端补丁、图条件边、off_topic 关键词、候选流程判定等多处,漏一处就静默出错。本次会话已经在不断验证这个痛点:每个新场景(查询、低置信度、上下文恢复)都是往不同位置打补丁。

2. 目标:接入成本 O(1)

加一个新场景,全文改动只有一处

# server/src/app/services/scenes/scene_query_reimbursement_progress.py
register_scene(SceneDescriptor(
    scene_id="query_reimbursement_progress",
    label="报销进度查询",
    signal_keywords=("报销进度", "报销状态", "审批进度", "审批到哪了"),
    ontology_fields=("claim_no", "time_range"),
    gate=GateRule.CHOICE,           # 不走候选流程、不走 off_topic
    can_resume=False,               # 不参与上下文恢复
    route=SceneRoute.HANDLER_ONLY,  # 不走 LLM,直接执行 handler
    handler=execute_progress_query, # 纯函数:检索 + 拼装
    prompt_fragment="用户询问报销审批进度/状态时,识别为 query_reimbursement_progress。",
))

不改图、不改 endpoint、不改前端门控、不改 extraction。 判断规则、路由、执行、槽位、恢复能力在同一个 descriptor 里声明,不会割裂。

3. 目标架构

3.1 后端:图成为唯一编排者

POST /api/v1/steward/plans
  ↓
endpoint: 纯 IO (收请求 → graph.invoke → 返响应,零编排)
  ↓
LangGraph StateGraph (唯一编排者):
  START
    → load_context        读最近10条历史 + steward_state + hydrate
    → gate_classify       统一门控:按 registry 规则裁决 scene + route
    → route 分支
         ├─ off_topic        → off_topic_reply
         ├─ handler_only     → execute_scene_handler  (查询/命令类,不走 LLM)
         ├─ resume           → resume_recent_task     ("再提交"确定性恢复)
         ├─ ambiguous_flow   → pending_flow_confirmation
         └─ model_intent     → detect_model_intent → {done | fallback}
    → attach_action_steps
    → persist_state       写 message + steward_state
    → END

endpoint 层只剩 3 行planner = build(db); plan = planner.build_plan(payload); return plan。所有 hydrate/inject/resume 全部搬进图节点。

3.2 前端:退化为纯渲染

用户输入
  ↓
前端: 不再自己决策,统一发给后端
  POST /steward/plans { message, conversation_id }
  ↓
后端返回 StewardPlanResponse:
  - plan.next_action 告诉前端该渲染什么
  - plan.tasks[].task_type 告诉前端该用哪个渲染器
  - plan.suggested_actions 告诉前端该显示哪些按钮
  ↓
前端: 按 response 的指令渲染(申请预览/报销预览/查询结果/纯文本回复)

前端的 7 层 if/else 全部移除,替换为:

async function startInlineConversation(prompt) {
  const plan = await fetchStewardPlan({ message: prompt, conversation_id: conversationId.value })
  renderPlanResponse(plan)  // 按 plan.next_action / task_type 分发到对应渲染器
}

3.3 SceneDescriptor场景的唯一声明

@dataclass(frozen=True)
class SceneDescriptor:
    scene_id: str                          # 唯一标识,等同 task_type
    label: str                             # 中文标签
    signal_keywords: tuple[str, ...]       # 规则识别的关键词(聚合进 off_topic 信号池)
    ontology_fields: tuple[str, ...]       # 该场景允许的槽位
    gate: GateRule                         # 门控规则(见 3.4)
    route: SceneRoute                      # 路由策略(见 3.5)
    handler: Callable | None               # 执行函数(handler_only 路由用)
    can_resume: bool = False               # 是否参与"再提交"上下文恢复
    action_steps_builder: Callable = ...   # 动作步骤生成
    prompt_fragment: str = ""              # 注入 LLM system prompt 的识别指引
    priority: int = 100                    # gate_classify 的匹配优先级(小优先)
    flow_id: str | None = None             # 候选流程用;查询/命令类为 None

3.4 GateRule门控规则枚举

class GateRule(Enum):
    OFF_TOPIC = "off_topic"           # 非业务输入,走 off_topic_reply
    CHOICE = "choice"                 # 明确的业务选择,走 handler/model
    AMBIGUOUS_FLOW = "ambiguous_flow" # 话术歧义,走候选流程确认
    MODEL_ONLY = "model_only"         # 只走 LLM function call,不参与规则匹配

3.5 SceneRoute路由策略枚举

class SceneRoute(Enum):
    HANDLER_ONLY = "handler_only"     # 不走 LLM,直接执行 handler(查询/命令类)
    MODEL_INTENT = "model_intent"     # 走 LLM function call(申请/报销类)
    OFF_TOPIC = "off_topic"           # 走 off_topic 回复
    RESUME = "resume"                 # 走确定性上下文恢复
    AMBIGUOUS = "ambiguous"           # 走候选流程确认

4. gate_classify 节点的裁决逻辑(唯一决策点)

def gate_classify(state) -> dict:
    """统一门控:按优先级遍历 registry,输出 scene_id + route。"""
    message = state["message"]
    steward_state = state["steward_state"]
    history = state["recent_history"]

    # ① off_topic 门:聚合所有场景的 signal_keywords,无命中 → off_topic
    if not _matches_any_signal(message):
        return {"scene_id": "off_topic", "route": SceneRoute.OFF_TOPIC}

    # ② resume 门:用户说"再提交"+ state 有可恢复 flow
    resume_scene = _check_resume(message, steward_state)
    if resume_scene:
        return {"scene_id": resume_scene, "route": SceneRoute.RESUME}

    # ③ 规则匹配门:按 priority 遍历,命中 signal_keywords 的场景
    for scene in registry.scenes_sorted_by_priority():
        if scene.gate == GateRule.CHOICE and _matches_keywords(message, scene.signal_keywords):
            return {"scene_id": scene.scene_id, "route": scene.route}

    # ④ LLM 门:规则未命中,走 model function call
    return {"scene_id": None, "route": SceneRoute.MODEL_INTENT}

所有门控收敛到这一个函数。 off_topic 信号词、resume 判断、规则匹配、LLM 兜底,全部在这里按固定顺序裁决。

5. 文件结构

server/src/app/services/
  scenes/                         # 场景声明(每个场景一个文件)
    __init__.py                   # 注册所有场景
    scene_registry.py             # SceneRegistry 单例 + 查询方法
    scene_descriptor.py           # SceneDescriptor dataclass
    scene_expense_application.py  # 出差申请场景
    scene_reimbursement.py        # 报销场景
    scene_query_travel_standard.py# 差旅标准查询场景
    gate_rules.py                 # GateRule / SceneRoute 枚举
  steward_graph_planner.py        # 图:load_context/gate_classify/.../persist_state
  steward_scene_handlers.py       # 各场景的 handler 纯函数

6. 迁移路径(分阶段,每阶段可独立验证)

Phase 1建场景注册表 + 收口后端门控(后端自闭环)

目标:后端 endpoint 零编排,图成为唯一编排者。

  1. 新建 scenes/ 目录,实现 SceneDescriptor / SceneRegistry / GateRule / SceneRoute
  2. 把现有 3 个场景expense_application / reimbursement / query_travel_standard迁入 descriptor
  3. 新增图节点:load_contextgate_classifyresume_recent_taskpersist_state
  4. 把 endpoint 的 4 个补丁函数搬进图节点
  5. endpoint 退化为 3 行

验证:后端全量测试绿 + 端到端(上海出差/再提交/查差旅标准)通过

Phase 2前端退化为纯渲染

目标:前端移除 7 层 if/else统一发给后端。

  1. startInlineConversation 改为:fetchStewardPlan → renderPlanResponse
  2. plan.next_action / task_type 分发到渲染器(申请预览/报销预览/查询结果/纯文本)
  3. 移除 shouldRequestWorkbenchAiIntentPlanisReimbursementCreationIntentisLowConfidenceTravelApplicationPlan 等前端门控函数
  4. 保留并复用现有渲染组件applicationPreview、stewardPlan 渲染逻辑不重写)

验证:前端测试绿 + 人工验证各场景渲染正确

Phase 3清理冗余

  1. 删除 steward_planner_fallback.py_classify_irrelevant_input 独立门控
  2. 删除 _looks_like_ambiguous_travel_flow 独立判定(收进 gate_classify
  3. 统一 signal_keywords 来源registry 唯一)
  4. 删除旧的 endpoint 补丁函数

7. 验证标准(持久性的可衡量判据)

接入一个新场景(如"查报销进度")时,改动文件清单必须且仅限于

文件 改动
scenes/scene_query_reimbursement_progress.py 新建1 个 SceneDescriptor + 1 个 handler 函数
scenes/__init__.py 加 1 行 import + register

如果接入时需要动 steward_graph_planner.py / steward.py / 前端 composable / extraction.py / fallback.py 中任何一个,说明架构没有收口成功。 这是验收的硬标准。

8. 不改变的东西

  • RuntimeChatService(模型供应商抽象):不动
  • StewardActionExecutor(执行分发):已在 registry 驱动,不动
  • AgentConversationService(消息持久化):不动,只是调用点从 endpoint 搬进图节点
  • LangGraph 的 StateGraph / interrupt / checkpoint继续用只是节点职责更完整
  • 现有渲染组件applicationPreview 表格、stewardPlan 消息):复用,不重写

9. 风险与对策

风险 对策
图重构引入回归 Phase 1 每搬一个节点跑一次全量测试
前端去掉门控后某些场景渲染不出 Phase 2 先保留渲染器映射,只改"谁决策"不改"怎么渲染"
gate_classify 性能(遍历 registry 场景数 <20关键词正则匹配 O(1),无性能问题
LLM 历史注入搬进图后 token 超限 保持 limit=10 不变