Files
X-Financial/server/src/app/services/steward_graph_planner.py

650 lines
27 KiB
Python
Raw Normal View History

from __future__ import annotations
import re
from datetime import date
from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from sqlalchemy.orm import Session
from app.schemas.steward import StewardPlanRequest, StewardPlanResponse
from app.services import steward_intent_bootstrap # noqa: F401 导入即注册全部业务意图
from app.services.agent_conversations import AgentConversationService
from app.services.scenes import REGISTRY as SCENE_REGISTRY
from app.services.scenes.gate_rules import GateRule, SceneRoute
from app.services.steward_action_contracts import StewardActionPlanBuilder
from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER
from app.services.steward_context_resume import attach_resumed_task, should_resume_recent_task
from app.services.steward_intent_agent import StewardIntentAgent, StewardIntentAgentResult
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
from app.services.steward_off_topic_agent import StewardOffTopicAgent
from app.services.steward_planner_extraction import StewardPlannerExtractionMixin
from app.services.steward_planner_fallback import StewardPlannerFallbackMixin
# ---- 模块级辅助函数:gate_classify 的判断逻辑 ----
def _compact_text(text: str) -> str:
    """Return ``text`` with every run of whitespace removed.

    Falsy input (``None``, ``""``, ``0``) yields an empty string; any other
    value is converted with ``str()`` before whitespace is stripped.
    """
    normalized = str(text) if text else ""
    return re.sub(r"\s+", "", normalized)
def _scene_route_to_gate_decision(route: SceneRoute) -> str:
    """Translate a ``SceneRoute`` into the ``gate_decision`` string used by the graph.

    Routes are compared with ``==`` in a fixed order; anything that is not
    HANDLER_ONLY, OFF_TOPIC, RESUME or AMBIGUOUS maps to ``"model_intent"``.
    """
    route_to_decision = (
        (SceneRoute.HANDLER_ONLY, "handler_only"),
        (SceneRoute.OFF_TOPIC, "off_topic"),
        (SceneRoute.RESUME, "resume"),
        (SceneRoute.AMBIGUOUS, "ambiguous"),
    )
    for known_route, decision in route_to_decision:
        if route == known_route:
            return decision
    return "model_intent"
class StewardGraphState(TypedDict, total=False):
    """State dictionary shared by the nodes of the steward planner graph.

    Declared with ``total=False``, so every key is optional; nodes return
    partial dictionaries containing only the keys they set.
    """

    request: StewardPlanRequest
    db: Session
    message: str
    base_date: date
    scenario: str | None
    should_use_model: bool
    intent_result: StewardIntentAgentResult | None
    plan: StewardPlanResponse
    model_call_traces: list[dict[str, Any]]
    fallback_reason: str
    # Added: conversation context state
    recent_history: list[dict[str, Any]]
    steward_state: dict[str, Any]
    # Added: gate decision
    gate_decision: str # "off_topic" | "handler_only" | "resume" | "ambiguous" | "model_intent" | "fallback"
    gate_scene_id: str | None
    # Added: back-filled conversation_id
    conversation_id: str | None
class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractionMixin):
    """Orchestrate steward intent recognition, flow decisions and fallback paths with LangGraph.

    Phase 1 (P1.3-P1.7): LangGraph is the only orchestrator; the endpoint is
    reduced to 3 ... (the original sentence is cut off after "3" - TODO restore).

    Graph topology:
        load_context -> gate_classify
            -> {off_topic / handler_only / resume / ambiguous / model_intent}
            -> {build_off_topic_plan / execute_scene_handler / resume_recent_task / detect_model_intent}
            -> attach_action_steps -> persist_state
    """
def __init__(
    self,
    intent_agent: StewardIntentAgent | None = None,
    off_topic_agent: StewardOffTopicAgent | None = None,
) -> None:
    """Store the optional agents and compile the planner graph.

    Args:
        intent_agent: Model-backed intent recognizer. When ``None`` the
            model branch of the graph is never taken.
        off_topic_agent: Stored on the instance as ``off_topic_agent``.
    """
    self.intent_agent, self.off_topic_agent = intent_agent, off_topic_agent
    # Compile once per service instance; build_plan() reuses the compiled graph.
    self._graph = self._build_graph()
# ---- 上下文加载 + 门控裁决(P1.3-P1.4 新增) ----
def _load_context(self, state: StewardGraphState) -> dict[str, Any]:
"""读最近 10 条对话历史 + steward_state,注入 state。
替代 endpoint _hydrate_required_application_gate + _inject_recent_conversation_history
"""
request = state.get("request")
if request is None:
return {}
db = state.get("db")
# 1. 读对话历史(只给模型用,不返回前端)
recent_history: list[dict[str, Any]] = []
context_json = dict(request.context_json or {})
conversation_id = self._extract_conversation_id(context_json)
if db is not None and conversation_id:
try:
recent_history = AgentConversationService(db).list_message_history(
conversation_id, limit=10
)
except Exception:
recent_history = []
# 2. 读 steward_state(DB 优先,前端传次之)
steward_state: dict[str, Any] = {}
if db is not None and conversation_id:
try:
conversation = AgentConversationService(db).get_conversation(conversation_id)
if conversation is not None and isinstance(conversation.state_json, dict):
stored = conversation.state_json.get("steward_state")
if isinstance(stored, dict):
steward_state = stored
except Exception:
steward_state = {}
if not steward_state:
incoming = context_json.get("steward_state") or context_json.get("stewardState")
if isinstance(incoming, dict):
steward_state = incoming
# 3. 注入 recent_history 到 context_json(供 LLM 使用)
if recent_history:
request = request.model_copy(
update={
"context_json": {
**context_json,
"recent_history": recent_history,
}
}
)
return {
"request": request,
"recent_history": recent_history,
"steward_state": steward_state,
"message": str(request.message or "").strip(),
"base_date": self._resolve_base_date_from_request(request),
}
def _gate_classify(self, state: StewardGraphState) -> dict[str, Any]:
    """Decide, in one place, which branch of the graph handles this request.

    Gates are evaluated in a fixed order and the first hit wins:

    1. resume: ``should_resume_recent_task`` returns a scene id for the
       message and the current ``steward_state``.
    2. off_topic: ``_classify_irrelevant_input`` returns a scenario.
    3. rule match: the first scene (registry priority order) whose gate is
       ``GateRule.CHOICE`` and one of whose signal keywords occurs in the
       whitespace-stripped message.
    4. model: nothing matched; ``"ambiguous"`` when the message looks like
       an ambiguous travel flow, otherwise ``"model_intent"``.

    Returns:
        A partial state update with ``gate_decision`` and ``gate_scene_id``.
        The off-topic branch also carries ``scenario`` so that the off-topic
        plan builder receives the classification instead of an empty string.
    """
    from app.services.scenes import REGISTRY
    from app.services.scenes.gate_rules import GateRule

    request = state.get("request")
    if request is None:
        return {"gate_decision": "off_topic", "gate_scene_id": None}
    message = str(state.get("message") or "").strip()
    steward_state = state.get("steward_state") or {}

    # (1) Resume gate: checked first so confirmation-style messages can
    # restore a task even when they contain no business keyword.
    resume_scene = should_resume_recent_task(message, steward_state)
    if resume_scene:
        return {"gate_decision": "resume", "gate_scene_id": resume_scene}

    # (2) Off-topic gate. The scenario must travel with the decision:
    # _build_off_topic_graph_plan reads state["scenario"], and this path does
    # not pass through _prepare_context, which is the only other writer.
    scenario = self._classify_irrelevant_input(message, request)
    if scenario is not None:
        return {
            "gate_decision": "off_topic",
            "gate_scene_id": None,
            "scenario": scenario,
        }

    # (3) Rule gate: the compact message is loop-invariant, so compute it once.
    compact = _compact_text(message)
    for scene in REGISTRY.scenes_sorted_by_priority():
        if scene.gate != GateRule.CHOICE or not scene.signal_keywords:
            continue
        if any(keyword in compact for keyword in scene.signal_keywords):
            return {
                "gate_decision": _scene_route_to_gate_decision(scene.route),
                "gate_scene_id": scene.scene_id,
            }

    # (4) Model gate: no rule matched.
    if self._looks_like_ambiguous_travel_flow(
        message, self._resolve_base_date_from_request(request), request
    ):
        return {"gate_decision": "ambiguous", "gate_scene_id": None}
    return {"gate_decision": "model_intent", "gate_scene_id": None}
def _execute_scene_handler(self, state: StewardGraphState) -> dict[str, Any]:
    """Run a HANDLER_ONLY scene directly, without calling the model.

    The scene named by ``gate_scene_id`` is looked up in the registry and its
    handler is invoked as
    ``handler(self, action_request, current_user=None, trace=[])``.
    A ``StewardActionExecuteResponse`` result is converted into an
    answer-only ``StewardPlanResponse``.

    Falls back to the rule-based plan when the scene, its handler or the
    request is missing, and when the handler raises (in which case an extra
    thinking event describing the error is appended to the fallback plan).

    Returns:
        A partial state update of the form ``{"plan": StewardPlanResponse}``.
    """
    import time

    from app.schemas.steward import (
        StewardActionExecuteRequest,
        StewardActionExecuteResponse,
        StewardActionStep,
        StewardPlanResponse,
        StewardTask,
        StewardThinkingEvent,
    )
    from app.services.scenes import REGISTRY

    def _now_ms() -> int:
        # Millisecond timestamp used as a suffix for generated ids.
        return int(time.time() * 1000)

    scene_id = state.get("gate_scene_id")
    scene = REGISTRY.get(scene_id or "") if scene_id else None
    request = state.get("request")
    if scene is None or scene.handler is None or request is None:
        # _build_rule_fallback_graph_plan already returns {"plan": ...};
        # wrapping it again would put a dict where a plan is expected.
        return self._build_rule_fallback_graph_plan(state)

    message = str(request.message or "").strip()

    # Build the task and action steps the handler request is derived from.
    task = None
    action_steps = []
    if scene.action_steps_builder is not None:
        task = StewardTask(
            task_id=f"task_handler_{_now_ms()}",
            task_type=scene.scene_id,
            assigned_agent=scene.assigned_agent or "policy_query_assistant",
            title=scene.label,
            summary=message,
            status="delegated",
            requested_action="preview",
            ontology_fields={},
            missing_fields=[],
            confirmation_required=False,
        )
        action_steps = scene.action_steps_builder(task)
    if not action_steps:
        # Minimal step, used only to derive the handler request's action_type.
        action_steps = [
            StewardActionStep(
                step_id=f"handler_{_now_ms()}",
                action_type=(
                    scene.side_effect_actions[0]
                    if scene.side_effect_actions
                    else scene.scene_id
                ),
                label=scene.label,
                target_task_id="",
                status="planned",
                requires_confirmation=False,
                payload={},
            )
        ]

    action_request = StewardActionExecuteRequest(
        action_type=action_steps[0].action_type,
        message=message,
        task=task,
    )

    try:
        response = scene.handler(self, action_request, current_user=None, trace=[])
    except Exception as exc:
        # Unwrap the node-style result before touching the plan itself.
        fallback_plan = self._build_rule_fallback_graph_plan(state)["plan"]
        error_event = StewardThinkingEvent(
            event_id=f"handler_error_{scene.scene_id}",
            stage="llm_function_call",
            title=f"{scene.label}执行失败",
            content=f"handler 抛错: {type(exc).__name__}: {str(exc)[:80]}",
            status="completed",
        )
        thinking_events = [*(fallback_plan.thinking_events or []), error_event]
        return {
            "plan": fallback_plan.model_copy(
                update={"thinking_events": thinking_events}
            )
        }

    # Convert the handler response into the plan's summary text.
    answer = ""
    if isinstance(response, StewardActionExecuteResponse):
        payload = response.result_payload or {}
        answer = response.message or payload.get("answer_markdown", "")

    plan = StewardPlanResponse(
        plan_id=f"steward_handler_{_now_ms()}",
        planning_source=f"scene_handler:{scene.scene_id}",
        summary=answer or scene.label,
        next_action="answer_only",
        tasks=[task] if task is not None else [],
        thinking_events=[
            StewardThinkingEvent(
                event_id=f"handler_{scene.scene_id}_done",
                stage="llm_function_call",
                title=f"已执行{scene.label}",
                content=answer or f"场景 {scene.scene_id} 已执行",
                status="completed",
            )
        ],
        pending_flow_confirmation={"status": "none"},
        conversation_id=state.get("conversation_id") or "",
        steward_state=state.get("steward_state") or {},
        action_steps=[],
    )
    return {"plan": plan}
def _resume_recent_task(self, state: StewardGraphState) -> dict[str, Any]:
    """RESUME route: restore a previously blocked or interrupted task.

    A task-less placeholder plan is created first; when ``gate_scene_id`` is
    set, ``attach_resumed_task`` attaches the task restored from
    ``steward_state`` to it. Without a scene id the placeholder is returned
    unchanged.
    """
    from app.schemas.steward import StewardPlanResponse

    placeholder_plan = StewardPlanResponse(
        plan_id="steward_resume_pending",
        planning_source="rule_fallback",
        summary="恢复上下文中的待办任务。",
        next_action="confirm_task",
        tasks=[],
        thinking_events=[],
        pending_flow_confirmation={"status": "none"},
    )

    target_scene = state.get("gate_scene_id")
    if target_scene:
        saved_state = state.get("steward_state") or {}
        return {"plan": attach_resumed_task(placeholder_plan, saved_state, target_scene)}
    return {"plan": placeholder_plan}
def _persist_state(
self,
db: Session,
request: StewardPlanRequest,
plan: StewardPlanResponse,
final_state: StewardGraphState,
) -> StewardPlanResponse:
"""图执行后的副作用:写 message + steward_state 到 DB。
替代 endpoint _attach_conversation_state
"""
if db is None:
return plan
try:
context_json = dict(request.context_json or {})
context_json["session_type"] = str(context_json.get("session_type") or "steward").strip() or "steward"
conversation_id = self._extract_conversation_id(context_json)
conversation_service = AgentConversationService(db)
conversation = conversation_service.get_or_create_conversation(
conversation_id=conversation_id,
user_id=request.user_id,
source="user_message",
context_json=context_json,
)
current_state = self._resolve_steward_state_for_persist(conversation.state_json, final_state)
from app.services.steward_flow_state import StewardFlowStateService
steward_state = StewardFlowStateService().merge_plan(current_state, plan)
conversation = conversation_service.update_state(
conversation_id=conversation.conversation_id,
run_id=None,
scenario="steward",
intent="plan",
context_json={**context_json, "steward_state": steward_state},
) or conversation
conversation_service.append_message(
conversation_id=conversation.conversation_id,
role="user",
content=request.message,
message_json={"source": "steward_plan_request"},
)
conversation_service.append_message(
conversation_id=conversation.conversation_id,
role="assistant",
content=plan.summary,
message_json={
"source": "steward_plan_response",
"plan_id": plan.plan_id,
"steward_state": steward_state,
},
)
return plan.model_copy(
update={
"conversation_id": conversation.conversation_id,
"steward_state": steward_state,
}
)
except Exception:
return plan
# ---- 路由函数 ----
def _route_after_gate_classify(self, state: StewardGraphState) -> str:
"""gate_classify 后的路由:把 gate_decision 映射到下一个节点。"""
decision = str(state.get("gate_decision") or "model_intent")
return decision
@staticmethod
def _extract_conversation_id(context_json: dict[str, Any]) -> str | None:
return str(
context_json.get("conversation_id")
or context_json.get("conversationId")
or ""
).strip() or None
@staticmethod
def _resolve_base_date_from_request(request: StewardPlanRequest | None) -> date:
if request is None:
return date.today()
from app.services.steward_planner_extraction import StewardPlannerExtractionMixin
return StewardPlannerExtractionMixin._resolve_base_date(
request.client_now_iso,
dict(request.context_json or {}),
)
@staticmethod
def _resolve_steward_state_for_persist(
conversation_state: Any,
final_state: StewardGraphState,
) -> dict[str, Any]:
state_json = conversation_state if isinstance(conversation_state, dict) else {}
stored = state_json.get("steward_state")
if isinstance(stored, dict) and stored:
return stored
graph_state = final_state.get("steward_state")
if isinstance(graph_state, dict) and graph_state:
return graph_state
return {}
def build_plan(
    self,
    request: StewardPlanRequest,
    db: Session | None = None,
) -> StewardPlanResponse:
    """Run the compiled graph for one steward plan request and return its plan.

    ``db`` is optional: it is placed in the initial state as given, and
    ``_load_context`` skips its history/state reads when it is ``None``.
    Per the original note, persistence is still performed by the endpoint
    (via ``_attach_conversation_state``) rather than inside the graph.

    Raises:
        RuntimeError: If the final state does not contain a
            ``StewardPlanResponse`` under ``plan``.
    """
    initial_state = {
        "request": request,
        "db": db,
        "model_call_traces": [],
        "fallback_reason": "",
    }
    outcome = self._graph.invoke(initial_state)
    produced_plan = outcome.get("plan")
    if isinstance(produced_plan, StewardPlanResponse):
        return produced_plan
    raise RuntimeError("LangGraph 小财管家规划未生成有效计划。")
def _build_graph(self):
    """Assemble and compile the planner state graph.

    Every branch ends in ``attach_action_steps``, which is the only node
    connected to ``END``.
    """
    workflow = StateGraph(StewardGraphState)

    # Nodes, registered in a fixed order.
    node_handlers = (
        ("load_context", self._load_context),
        ("gate_classify", self._gate_classify),
        ("execute_scene_handler", self._execute_scene_handler),
        ("resume_recent_task", self._resume_recent_task),
        ("prepare_context", self._prepare_context),
        ("detect_model_intent", self._detect_model_intent),
        ("build_off_topic_plan", self._build_off_topic_graph_plan),
        ("build_rule_fallback_plan", self._build_rule_fallback_graph_plan),
        ("build_pending_flow_plan", self._build_pending_flow_fallback_graph_plan),
        ("attach_action_steps", self._attach_action_steps),
    )
    for node_name, handler in node_handlers:
        workflow.add_node(node_name, handler)

    # Entry: load the context, then let the gate choose a branch.
    workflow.add_edge(START, "load_context")
    workflow.add_edge("load_context", "gate_classify")
    gate_targets = {
        "off_topic": "build_off_topic_plan",
        "handler_only": "execute_scene_handler",
        "resume": "resume_recent_task",
        "ambiguous": "build_pending_flow_plan",
        "model_intent": "prepare_context",
    }
    workflow.add_conditional_edges(
        "gate_classify", self._route_after_gate_classify, gate_targets
    )

    for source in ("execute_scene_handler", "resume_recent_task"):
        workflow.add_edge(source, "attach_action_steps")

    # Model branch: prepare_context decides whether the model is consulted.
    prepare_targets = {
        "model": "detect_model_intent",
        "off_topic": "build_off_topic_plan",
        "fallback": "build_rule_fallback_plan",
    }
    workflow.add_conditional_edges(
        "prepare_context", self._route_after_prepare_context, prepare_targets
    )
    model_targets = {
        "done": "attach_action_steps",
        "off_topic": "build_off_topic_plan",
        "fallback": "build_rule_fallback_plan",
    }
    workflow.add_conditional_edges(
        "detect_model_intent", self._route_after_model_intent, model_targets
    )

    # All plan builders converge on attach_action_steps.
    for source in (
        "build_off_topic_plan",
        "build_rule_fallback_plan",
        "build_pending_flow_plan",
    ):
        workflow.add_edge(source, "attach_action_steps")
    workflow.add_edge("attach_action_steps", END)
    return workflow.compile()
def _prepare_context(self, state: StewardGraphState) -> dict[str, Any]:
    """Normalize the message and decide whether the model should be consulted.

    Returns:
        A partial state update with the cleaned ``message``, ``base_date``,
        the off-topic ``scenario`` (``None`` when the input is relevant) and
        ``should_use_model``.

    Raises:
        ValueError: If the cleaned message is empty.
    """
    request = state["request"]
    cleaned = self._clean_text(request.message)
    if not cleaned:
        raise ValueError("小财管家需要一段任务描述。")

    base_date = self._resolve_base_date(request.client_now_iso, request.context_json)
    scenario = self._classify_irrelevant_input(cleaned, request)

    # The model is only an option when an intent agent was configured.
    use_model = False
    if self.intent_agent is not None:
        use_model = bool(
            self._should_use_model_intent_recognition(cleaned, base_date, request)
        )

    return {
        "message": cleaned,
        "base_date": base_date,
        "scenario": scenario,
        "should_use_model": use_model,
    }
@staticmethod
def _route_after_prepare_context(state: StewardGraphState) -> str:
if state.get("should_use_model"):
return "model"
if state.get("scenario") is not None:
return "off_topic"
return "fallback"
def _detect_model_intent(self, state: StewardGraphState) -> dict[str, Any]:
    """Ask the intent agent for a plan and translate the outcome into state.

    Returns:
        * ``{}`` when no intent agent is configured.
        * Traces plus a ``fallback_reason`` when the agent returns nothing,
          no plan can be built from its result, or any step raises.
        * A pending-flow confirmation plan when the model produced a plan but
          the message still looks like an ambiguous travel flow.
        * Otherwise ``intent_result``, the model-built ``plan`` and the traces.
    """
    request = state["request"]
    message = state["message"]
    base_date = state["base_date"]
    traces: list[dict[str, Any]] = []
    if self.intent_agent is None:
        return {}

    def _no_usable_plan(current_traces: list[dict[str, Any]]) -> dict[str, Any]:
        # Shared update for "the model gave us nothing we can use".
        return {
            "model_call_traces": self._last_intent_call_traces(current_traces),
            "fallback_reason": (
                "主模型未返回可用的 function calling 计划,已切换到规则兜底。"
            ),
        }

    try:
        intent_result = self.intent_agent.detect(
            request,
            base_date=base_date,
            canonical_fields=list(BUSINESS_CANONICAL_FIELD_ORDER),
        )
        if intent_result is None:
            return _no_usable_plan(traces)

        traces = intent_result.model_call_traces
        model_plan = StewardModelPlanBuilder(self).build(
            intent_result,
            request=request,
            base_date=base_date,
        )
        if model_plan is None:
            return _no_usable_plan(traces)

        if self._looks_like_ambiguous_travel_flow(message, base_date, request):
            # The model chose a direct task, but the wording does not clearly
            # ask for an application or a reimbursement: ask the user to confirm.
            confirmation_plan = self._build_pending_flow_fallback_plan(
                request,
                base_date=base_date,
                model_call_traces=traces,
                fallback_reason=(
                    "主模型返回了直接任务,但当前话术没有明确申请或报销动作;"
                    "服务端已改为候选流程确认,避免误入申请流程。"
                ),
                planning_source="llm_function_call",
            )
            return {"plan": confirmation_plan, "model_call_traces": traces}

        return {
            "intent_result": intent_result,
            "plan": model_plan,
            "model_call_traces": traces,
        }
    except Exception as exc:
        return {
            "model_call_traces": self._last_intent_call_traces(traces),
            "fallback_reason": f"主模型 function calling 调用失败,已切换到规则兜底:{exc}",
        }
@staticmethod
def _route_after_model_intent(state: StewardGraphState) -> str:
    """Choose the branch after ``detect_model_intent``.

    Returns ``"done"`` when the state holds a ``StewardPlanResponse``,
    otherwise ``"off_topic"`` when a scenario was classified, otherwise
    ``"fallback"``.
    """
    has_plan = isinstance(state.get("plan"), StewardPlanResponse)
    if has_plan:
        return "done"
    return "off_topic" if state.get("scenario") is not None else "fallback"
def _build_off_topic_graph_plan(
    self,
    state: StewardGraphState,
) -> dict[str, StewardPlanResponse]:
    """Graph node: build the off-topic plan from the current state.

    Missing ``scenario`` / ``fallback_reason`` become empty strings and
    missing traces become an empty list before ``_build_off_topic_plan`` is
    called.
    """
    request = state["request"]
    scenario = str(state.get("scenario") or "")
    traces = state.get("model_call_traces") or []
    reason = str(state.get("fallback_reason") or "")
    off_topic_plan = self._build_off_topic_plan(
        request,
        scenario=scenario,
        model_call_traces=traces,
        fallback_reason=reason,
    )
    return {"plan": off_topic_plan}
def _build_pending_flow_fallback_graph_plan(
    self,
    state: StewardGraphState,
) -> dict[str, StewardPlanResponse]:
    """Graph node: build the pending-flow confirmation plan from the state.

    Requires ``request`` and ``base_date`` in the state; missing traces
    default to an empty list and a missing reason to an empty string.
    """
    request = state["request"]
    base_date = state["base_date"]
    traces = state.get("model_call_traces") or []
    reason = str(state.get("fallback_reason") or "")
    pending_plan = self._build_pending_flow_fallback_plan(
        request,
        base_date=base_date,
        model_call_traces=traces,
        fallback_reason=reason,
    )
    return {"plan": pending_plan}
def _build_rule_fallback_graph_plan(
    self,
    state: StewardGraphState,
) -> dict[str, StewardPlanResponse]:
    """Graph node: build the rule-based fallback plan from the state.

    Requires ``request`` and ``base_date`` in the state. The traces are
    passed through exactly as stored (possibly ``None``); a missing reason
    becomes an empty string. The result is already wrapped as
    ``{"plan": ...}``.
    """
    request = state["request"]
    base_date = state["base_date"]
    traces = state.get("model_call_traces")
    reason = str(state.get("fallback_reason") or "")
    fallback_plan = self._build_rule_fallback_plan(
        request,
        base_date=base_date,
        model_call_traces=traces,
        fallback_reason=reason,
    )
    return {"plan": fallback_plan}
@staticmethod
def _attach_action_steps(state: StewardGraphState) -> dict[str, StewardPlanResponse]:
    """Final graph node: attach action steps to the plan held in the state.

    Raises:
        RuntimeError: If the state does not hold a ``StewardPlanResponse``
            under ``plan``.
    """
    candidate = state.get("plan")
    if isinstance(candidate, StewardPlanResponse):
        return {"plan": StewardActionPlanBuilder().attach_action_steps(candidate)}
    raise RuntimeError("LangGraph 小财管家动作规划缺少有效计划。")
def _last_intent_call_traces(
self,
fallback_traces: list[dict[str, Any]],
) -> list[dict[str, Any]]:
return getattr(self.intent_agent, "last_call_traces", []) or fallback_traces