refactor(server): scene 注册表骨架 + 统一门控管道设计文档
Phase 1 P1.1-P1.2:为后端门控收口提供声明式场景注册基础设施。 - 新建 scenes/ 目录:gate_rules(GateRule/SceneRoute 枚举)、scene_descriptor(SceneDescriptor dataclass)、scene_registry(SceneRegistry 单例) - 3 个场景迁入 descriptor:expense_application / reimbursement / query_travel_standard - __init__.py 的 bootstrap_scenes 在 import 时注册 + 运行时绑定 handler/builder/executor(解决循环 import) - 查询场景 priority=50 优先于 MODEL_ONLY 场景,确保规则匹配先于 LLM - 落地 UNIFIED_GATE_PIPELINE.md 架构文档:目标架构 / 验收标准(接入 O(1))/ 3 阶段迁移路径 - 76 passed,scene 注册表未破坏现有代码;与 intent_registry 暂时并存,P1.3-P1.8 会统一迁移
This commit is contained in:
84
server/src/app/services/scenes/__init__.py
Normal file
84
server/src/app/services/scenes/__init__.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from app.services.scenes.scene_descriptor import SceneDescriptor
|
||||
from app.services.scenes.scene_registry import REGISTRY, register_scene # noqa: F401
|
||||
|
||||
|
||||
def bootstrap_scenes() -> None:
|
||||
"""注册全部业务场景,并运行时绑定 handler/builder/executor。
|
||||
|
||||
descriptor 声明时 handler/builder/executor 为 None(避免循环 import),
|
||||
这里在运行时从各自的服务模块取回实际可调用对象并回填到 descriptor。
|
||||
|
||||
新增场景时:
|
||||
1. 新建 scenes/scene_xxx.py,声明 SceneDescriptor(handler 留 None)
|
||||
2. 在这里加一行 register 调用
|
||||
3. 如有 handler,在 _bind_runtime_callbacks 里加绑定
|
||||
"""
|
||||
# 声明式注册(不依赖任何服务模块)
|
||||
from app.services.scenes import (
|
||||
scene_expense_application,
|
||||
scene_query_travel_standard,
|
||||
scene_reimbursement,
|
||||
)
|
||||
|
||||
if REGISTRY.all_scene_ids():
|
||||
return # 已注册,避免重复
|
||||
|
||||
scene_expense_application.register()
|
||||
scene_reimbursement.register()
|
||||
scene_query_travel_standard.register()
|
||||
|
||||
_bind_runtime_callbacks()
|
||||
|
||||
|
||||
def _bind_runtime_callbacks() -> None:
|
||||
"""运行时把 handler/builder/executor 绑定到 descriptor。
|
||||
|
||||
因为 SceneDescriptor 是 frozen dataclass,这里用替换的方式回填。
|
||||
"""
|
||||
from app.services.steward_action_contracts import StewardActionPlanBuilder
|
||||
from app.services.steward_action_executor import StewardActionExecutor
|
||||
from app.services.steward_query_executors import (
|
||||
build_travel_standard_query_steps,
|
||||
execute_travel_standard_query,
|
||||
)
|
||||
|
||||
application_builder = StewardActionPlanBuilder()
|
||||
|
||||
# expense_application
|
||||
_update_scene(
|
||||
"expense_application",
|
||||
action_steps_builder=application_builder.build_application_steps,
|
||||
executor=StewardActionExecutor._dispatch_application_action,
|
||||
)
|
||||
|
||||
# reimbursement
|
||||
_update_scene(
|
||||
"reimbursement",
|
||||
action_steps_builder=application_builder.build_reimbursement_steps,
|
||||
executor=StewardActionExecutor._dispatch_reimbursement_action,
|
||||
)
|
||||
|
||||
# query_travel_standard
|
||||
_update_scene(
|
||||
"query_travel_standard",
|
||||
action_steps_builder=build_travel_standard_query_steps,
|
||||
handler=execute_travel_standard_query,
|
||||
executor=execute_travel_standard_query,
|
||||
)
|
||||
|
||||
|
||||
def _update_scene(scene_id: str, **overrides: Any) -> None:
|
||||
"""替换 REGISTRY 里的 descriptor 字段(frozen dataclass 需重建)。"""
|
||||
scene = REGISTRY.get(scene_id)
|
||||
if scene is None:
|
||||
return
|
||||
updated = SceneDescriptor(**{**scene.__dict__, **overrides})
|
||||
REGISTRY.register(updated)
|
||||
|
||||
|
||||
# import 即注册
|
||||
bootstrap_scenes()
|
||||
38
server/src/app/services/scenes/gate_rules.py
Normal file
38
server/src/app/services/scenes/gate_rules.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class GateRule(str, Enum):
|
||||
"""门控规则:决定场景如何参与 gate_classify 的裁决。"""
|
||||
|
||||
OFF_TOPIC = "off_topic"
|
||||
"""非业务输入,走 off_topic_reply。"""
|
||||
|
||||
CHOICE = "choice"
|
||||
"""明确的业务选择,命中 signal_keywords 即生效。"""
|
||||
|
||||
AMBIGUOUS_FLOW = "ambiguous_flow"
|
||||
"""话术歧义,走候选流程确认。"""
|
||||
|
||||
MODEL_ONLY = "model_only"
|
||||
"""只走 LLM function call,不参与规则匹配(如申请/报销的复杂识别)。"""
|
||||
|
||||
|
||||
class SceneRoute(str, Enum):
|
||||
"""路由策略:gate_classify 裁决后决定走图的哪条边。"""
|
||||
|
||||
HANDLER_ONLY = "handler_only"
|
||||
"""不走 LLM,直接执行 handler(查询/命令类场景)。"""
|
||||
|
||||
MODEL_INTENT = "model_intent"
|
||||
"""走 LLM function call(申请/报销类场景)。"""
|
||||
|
||||
OFF_TOPIC = "off_topic"
|
||||
"""走 off_topic 回复。"""
|
||||
|
||||
RESUME = "resume"
|
||||
"""走确定性上下文恢复。"""
|
||||
|
||||
AMBIGUOUS = "ambiguous"
|
||||
"""走候选流程确认。"""
|
||||
63
server/src/app/services/scenes/scene_descriptor.py
Normal file
63
server/src/app/services/scenes/scene_descriptor.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable
|
||||
|
||||
from app.services.scenes.gate_rules import GateRule, SceneRoute
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SceneDescriptor:
|
||||
"""单个业务场景的声明式描述符。
|
||||
|
||||
一个场景的"如何识别""走哪条路""做什么""要什么槽位""能否恢复上下文"
|
||||
全部在这里声明,实现接入成本 O(1)。
|
||||
"""
|
||||
|
||||
scene_id: str
|
||||
"""唯一标识,等同 task_type(如 expense_application / query_travel_standard)。"""
|
||||
|
||||
label: str
|
||||
"""中文标签,用于 system prompt、前端展示、日志。"""
|
||||
|
||||
signal_keywords: tuple[str, ...] = ()
|
||||
"""规则识别的关键词;聚合进 off_topic 信号池,也用于 CHOICE 门控规则匹配。"""
|
||||
|
||||
ontology_fields: tuple[str, ...] = ()
|
||||
"""该场景允许的 canonical 槽位;为空表示沿用全局 BUSINESS_CANONICAL_FIELDS。"""
|
||||
|
||||
gate: GateRule = GateRule.MODEL_ONLY
|
||||
"""门控规则,决定场景如何参与 gate_classify 裁决。"""
|
||||
|
||||
route: SceneRoute = SceneRoute.MODEL_INTENT
|
||||
"""路由策略,gate_classify 命中后决定走图的哪条边。"""
|
||||
|
||||
handler: Callable[..., Any] | None = None
|
||||
"""执行函数;HANDLER_ONLY 路由必填,其他路由可选。"""
|
||||
|
||||
action_steps_builder: Callable[[Any], list[Any]] | None = None
|
||||
"""动作步骤生成函数;把 StewardTask 转换为白名单 action steps。"""
|
||||
|
||||
can_resume: bool = False
|
||||
"""是否参与"再提交"上下文恢复。"""
|
||||
|
||||
flow_id: str | None = None
|
||||
"""候选流程确认使用的 flow_id;查询/命令类为 None。"""
|
||||
|
||||
prompt_fragment: str = ""
|
||||
"""注入 steward_intent_agent system prompt 的识别指引片段。"""
|
||||
|
||||
priority: int = 100
|
||||
"""gate_classify 的匹配优先级;数字小的优先。"""
|
||||
|
||||
side_effect_actions: tuple[str, ...] = ()
|
||||
"""该场景产生副作用的 action_type 集合。"""
|
||||
|
||||
noop_actions: tuple[str, ...] = ()
|
||||
"""该场景的无副作用 action_type 集合(填充/预览/校验等)。"""
|
||||
|
||||
assigned_agent: str = ""
|
||||
"""该场景对应的执行 agent 标识。"""
|
||||
|
||||
executor: Callable[..., Any] | None = None
|
||||
"""副作用/查询动作的执行器;供 action_executor 通过 registry 分发。"""
|
||||
38
server/src/app/services/scenes/scene_expense_application.py
Normal file
38
server/src/app/services/scenes/scene_expense_application.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.services.scenes.gate_rules import GateRule, SceneRoute
|
||||
from app.services.scenes.scene_descriptor import SceneDescriptor
|
||||
from app.services.scenes.scene_registry import register_scene
|
||||
|
||||
|
||||
def register() -> None:
|
||||
register_scene(
|
||||
SceneDescriptor(
|
||||
scene_id="expense_application",
|
||||
label="费用申请",
|
||||
assigned_agent="application_assistant",
|
||||
signal_keywords=(
|
||||
"申请", "出差", "差旅", "费用", "交通", "住宿", "采购", "会务", "会议",
|
||||
"客户现场", "项目", "拜访", "调研", "驻场", "上线", "验收",
|
||||
),
|
||||
ontology_fields=(), # 沿用全局 BUSINESS_CANONICAL_FIELDS,运行时 fallback
|
||||
gate=GateRule.MODEL_ONLY,
|
||||
route=SceneRoute.MODEL_INTENT,
|
||||
handler=None,
|
||||
action_steps_builder=None, # 运行时从 StewardActionPlanBuilder 取
|
||||
can_resume=True,
|
||||
flow_id="travel_application",
|
||||
side_effect_actions=("save_application_draft", "submit_application", "run_duplicate_precheck"),
|
||||
noop_actions=(
|
||||
"fill_application_fields",
|
||||
"build_application_preview",
|
||||
"validate_required_fields",
|
||||
),
|
||||
executor=None, # 运行时从 StewardActionExecutor 取
|
||||
prompt_fragment=(
|
||||
"用户描述未来出差、差旅计划、去某地几天、部署、支撑、拜访或会议安排时,"
|
||||
"即使没有出现“申请”两个字,也必须优先识别为 expense_application。"
|
||||
),
|
||||
priority=100,
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.services.scenes.gate_rules import GateRule, SceneRoute
|
||||
from app.services.scenes.scene_descriptor import SceneDescriptor
|
||||
from app.services.scenes.scene_registry import register_scene
|
||||
|
||||
|
||||
def register() -> None:
|
||||
register_scene(
|
||||
SceneDescriptor(
|
||||
scene_id="query_travel_standard",
|
||||
label="差旅标准查询",
|
||||
assigned_agent="policy_query_assistant",
|
||||
signal_keywords=(
|
||||
"差旅标准", "住宿标准", "出差标准", "交通标准", "出差补助",
|
||||
"差旅补贴", "住宿补助", "交通补助", "职级标准", "差标",
|
||||
),
|
||||
ontology_fields=(
|
||||
"location",
|
||||
"employee_grade",
|
||||
"standard_category",
|
||||
"expense_type",
|
||||
),
|
||||
gate=GateRule.CHOICE,
|
||||
route=SceneRoute.HANDLER_ONLY,
|
||||
handler=None, # 运行时从 steward_query_executors 取
|
||||
action_steps_builder=None, # 运行时从 steward_query_executors 取
|
||||
can_resume=False,
|
||||
flow_id=None,
|
||||
side_effect_actions=("execute_travel_standard_query",),
|
||||
noop_actions=(),
|
||||
executor=None, # 运行时从 steward_query_executors 取
|
||||
prompt_fragment=(
|
||||
"用户询问差旅住宿标准、交通标准、出差补助或差旅补贴标准时,"
|
||||
"必须识别为 query_travel_standard,而不是 expense_application 或 reimbursement。"
|
||||
"差旅标准查询不创建任何单据,只返回标准数值。"
|
||||
),
|
||||
priority=50, # 比 MODEL_ONLY 场景优先,确保查询类先被规则命中
|
||||
)
|
||||
)
|
||||
110
server/src/app/services/scenes/scene_registry.py
Normal file
110
server/src/app/services/scenes/scene_registry.py
Normal file
@@ -0,0 +1,110 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable
|
||||
|
||||
from app.services.scenes.gate_rules import GateRule, SceneRoute
|
||||
from app.services.scenes.scene_descriptor import SceneDescriptor
|
||||
|
||||
|
||||
class SceneRegistry:
|
||||
"""场景注册表单例。
|
||||
|
||||
所有场景在 import 时注册,门控/路由/执行/字段过滤全部从这里查询。
|
||||
gate_classify 节点是它的唯一消费者(单一决策点)。
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._scenes: dict[str, SceneDescriptor] = {}
|
||||
self._flow_to_scene: dict[str, str] = {}
|
||||
|
||||
# ---- 注册 ----
|
||||
|
||||
def register(self, descriptor: SceneDescriptor) -> SceneDescriptor:
|
||||
self._scenes[descriptor.scene_id] = descriptor
|
||||
if descriptor.flow_id:
|
||||
self._flow_to_scene[descriptor.flow_id] = descriptor.scene_id
|
||||
return descriptor
|
||||
|
||||
# ---- 查询 ----
|
||||
|
||||
def get(self, scene_id: str) -> SceneDescriptor | None:
|
||||
return self._scenes.get(str(scene_id or "").strip())
|
||||
|
||||
def all_scenes(self) -> list[SceneDescriptor]:
|
||||
return list(self._scenes.values())
|
||||
|
||||
def scenes_sorted_by_priority(self) -> list[SceneDescriptor]:
|
||||
"""按 priority 升序排列(数字小优先)。"""
|
||||
return sorted(self._scenes.values(), key=lambda s: s.priority)
|
||||
|
||||
def all_scene_ids(self) -> list[str]:
|
||||
return [s.scene_id for s in self._scenes.values()]
|
||||
|
||||
def all_assigned_agents(self) -> list[str]:
|
||||
return [s.assigned_agent for s in self._scenes.values() if s.assigned_agent]
|
||||
|
||||
def all_flow_ids(self) -> list[str]:
|
||||
return [s.flow_id for s in self._scenes.values() if s.flow_id]
|
||||
|
||||
def all_signal_keywords(self) -> set[str]:
|
||||
keywords: set[str] = set()
|
||||
for scene in self._scenes.values():
|
||||
keywords.update(scene.signal_keywords)
|
||||
return keywords
|
||||
|
||||
def all_side_effect_actions(self) -> set[str]:
|
||||
actions: set[str] = set()
|
||||
for scene in self._scenes.values():
|
||||
actions.update(scene.side_effect_actions)
|
||||
return actions
|
||||
|
||||
def all_noop_actions(self) -> set[str]:
|
||||
actions: set[str] = set()
|
||||
for scene in self._scenes.values():
|
||||
actions.update(scene.noop_actions)
|
||||
return actions
|
||||
|
||||
def resolve_scene_by_action(self, action_type: str) -> SceneDescriptor | None:
|
||||
normalized = str(action_type or "").strip()
|
||||
for scene in self._scenes.values():
|
||||
if normalized in scene.side_effect_actions or normalized in scene.noop_actions:
|
||||
return scene
|
||||
return None
|
||||
|
||||
def resolve_scene_by_flow(self, flow_id: str) -> SceneDescriptor | None:
|
||||
scene_id = self._flow_to_scene.get(str(flow_id or "").strip())
|
||||
return self.get(scene_id) if scene_id else None
|
||||
|
||||
def field_allowlist_for(
|
||||
self,
|
||||
scene_id: str,
|
||||
*,
|
||||
fallback: frozenset[str] | None = None,
|
||||
) -> frozenset[str]:
|
||||
scene = self.get(scene_id)
|
||||
if scene and scene.ontology_fields:
|
||||
return frozenset(scene.ontology_fields)
|
||||
return fallback or frozenset()
|
||||
|
||||
def resumable_scenes(self) -> list[SceneDescriptor]:
|
||||
"""返回所有声明了 can_resume=True 的场景。"""
|
||||
return [s for s in self._scenes.values() if s.can_resume]
|
||||
|
||||
def prompt_fragments(self) -> str:
|
||||
"""拼接所有场景的 prompt_fragment,供 system prompt 注入。"""
|
||||
fragments = [s.prompt_fragment for s in self._scenes.values() if s.prompt_fragment]
|
||||
return "".join(fragments)
|
||||
|
||||
def intent_summary(self) -> str:
|
||||
"""拼接场景列表摘要,供 system prompt 引用。"""
|
||||
fragments = [f"{s.scene_id}({s.label})" for s in self._scenes.values()]
|
||||
return "、".join(fragments) if fragments else "(暂无已注册场景)"
|
||||
|
||||
|
||||
# 全局单例
|
||||
REGISTRY = SceneRegistry()
|
||||
|
||||
|
||||
def register_scene(descriptor: SceneDescriptor) -> SceneDescriptor:
|
||||
"""注册场景到全局单例。"""
|
||||
return REGISTRY.register(descriptor)
|
||||
41
server/src/app/services/scenes/scene_reimbursement.py
Normal file
41
server/src/app/services/scenes/scene_reimbursement.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.services.scenes.gate_rules import GateRule, SceneRoute
|
||||
from app.services.scenes.scene_descriptor import SceneDescriptor
|
||||
from app.services.scenes.scene_registry import register_scene
|
||||
|
||||
|
||||
def register() -> None:
|
||||
register_scene(
|
||||
SceneDescriptor(
|
||||
scene_id="reimbursement",
|
||||
label="费用报销",
|
||||
assigned_agent="reimbursement_assistant",
|
||||
signal_keywords=(
|
||||
"报销", "报账", "票据", "发票", "凭证", "行程单", "付款截图", "小票", "收据",
|
||||
),
|
||||
ontology_fields=(), # 沿用全局 BUSINESS_CANONICAL_FIELDS
|
||||
gate=GateRule.MODEL_ONLY,
|
||||
route=SceneRoute.MODEL_INTENT,
|
||||
handler=None,
|
||||
action_steps_builder=None,
|
||||
can_resume=False,
|
||||
flow_id="travel_reimbursement",
|
||||
side_effect_actions=(
|
||||
"create_reimbursement_draft",
|
||||
"link_existing_application",
|
||||
"associate_attachments",
|
||||
),
|
||||
noop_actions=(
|
||||
"fill_reimbursement_fields",
|
||||
"build_reimbursement_preview",
|
||||
"validate_required_fields",
|
||||
),
|
||||
executor=None,
|
||||
prompt_fragment=(
|
||||
"用户描述已经发生的费用、昨天/前天费用、票据或明确报销诉求时,"
|
||||
"才识别为 reimbursement。"
|
||||
),
|
||||
priority=100,
|
||||
)
|
||||
)
|
||||
Reference in New Issue
Block a user