From 992cf71fa1e12a5e805e27576b5456273e397dc1 Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Thu, 25 Jun 2026 15:44:20 +0800 Subject: [PATCH] =?UTF-8?q?refactor(server):=20Phase=201=20=E5=9B=BE?= =?UTF-8?q?=E6=8B=93=E6=89=91=E9=87=8D=E6=9E=84=20-=20LangGraph=20?= =?UTF-8?q?=E6=88=90=E4=B8=BA=E5=94=AF=E4=B8=80=E7=BC=96=E6=8E=92=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1.3-P1.7:把 endpoint 补丁搬进图节点,门控收敛到 gate_classify 单一决策点。 - StewardGraphState 扩展:recent_history/steward_state/gate_decision/gate_scene_id/conversation_id - 新增 5 个图节点:load_context(读历史+state)/gate_classify(统一门控)/execute_scene_handler/resume_recent_task/pending_flow wrapper - 图拓扑从 5 节点重构为 10 节点:load_context → gate_classify → {off_topic/handler_only/resume/ambiguous/model_intent} → attach_action_steps - gate_classify 四步裁决:resume门 → off_topic门 → 规则匹配门 → LLM门 - resume 门控优先于 off_topic,避免'再提交'被误判闲聊 - schema 放宽 planning_source/next_action Literal → str,支持 scene_handler:*/context_resume/answer_only - endpoint 按 planner 类型分发 build_plan(LangGraph 接 db,legacy 不接) - 76 passed + 4 场景端到端验证(出差申请/再提交/查差旅标准/闲聊) --- document/work-log/2026-06-25.md | 12 + server/src/app/api/v1/endpoints/steward.py | 14 +- server/src/app/schemas/steward.py | 4 +- .../src/app/services/steward_graph_planner.py | 444 +++++++++++++++++- 4 files changed, 461 insertions(+), 13 deletions(-) diff --git a/document/work-log/2026-06-25.md b/document/work-log/2026-06-25.md index 099f14d..51c62ff 100644 --- a/document/work-log/2026-06-25.md +++ b/document/work-log/2026-06-25.md @@ -83,6 +83,18 @@ - 影响:为后续图拓扑重构(P1.3-P1.8)提供了声明式场景注册基础设施。当前 scene_registry 与现有 intent_registry 并存,后续 P1.3-P1.7 会把 intent_registry 的消费者逐步迁移到 scene_registry。 - 下一步:P1.3-P1.8 图拓扑重构(新增 load_context/gate_classify/resume/persist 节点、endpoint 退化、registry 消费者迁移)。 +- 01:30:我完成了统一门控管道 Phase 1 的图拓扑重构(P1.3-P1.7),LangGraph 成为唯一编排者。 + - Git 提交检查:本地与 origin/main 同步。 + - 修改①(图节点扩展):`steward_graph_planner.py` 的 `StewardGraphState` 新增 recent_history/steward_state/gate_decision/gate_scene_id/conversation_id 字段。新增 5 个图节点——`load_context`(读最近10条历史+steward_state+注入recent_history到request)、`gate_classify`(统一门控裁决,resume门→off_topic门→规则匹配门→LLM门四步顺序)、`execute_scene_handler`(HANDLER_ONLY 路由,构造 StewardActionExecuteRequest 调 handler)、`resume_recent_task`(RESUME 路由,从 state 恢复 task)、`_build_pending_flow_fallback_graph_plan`(ambiguous 路由 wrapper)。 + - 修改②(图拓扑):从 5 节点(prepare_context→{model/off_topic/fallback}→attach_action_steps)重构为 10 节点(load_context→gate_classify→{off_topic/handler_only/resume/ambiguous/model_intent}→各自分支→attach_action_steps)。 + - 修改③(endpoint 退化):endpoint 的 inject/resume 补丁搬进图节点;hydrate/persist 仍由 endpoint 调(阶段性保留,P3 收敛到图内)。按 planner 类型分发 `build_plan(payload, db=db)`(LangGraph)vs `build_plan(payload)`(legacy)。 + - 修改④(schema 放宽):`StewardPlanningSource`/`StewardPlanNextAction` 从 Literal 改 str,支持 scene_handler:*/context_resume/answer_only 等新值。 + - 修改⑤(resume 门控优先级):resume 门控提到 off_topic 门之前,避免"再提交"被误判 off_topic。 + - 验证:后端全量 **76 passed**;端到端 4 场景全部通过——①出差申请(llm_function_call, 1 task)、②再提交(context_resume, 1 task, 5 fields)、③查差旅标准(scene_handler:query_travel_standard)、④闲聊(off_topic)。 + - 影响:LangGraph 图成为唯一编排者,门控收敛到 gate_classify 单一决策点。endpoint 仍保留 hydrate/persist 两个补丁,P3 会收敛。 + +## TODO + ## TODO - [ ] 为 `quick_validate.py` 准备稳定运行环境,避免后续新增 Skill 时继续依赖人工兜底。(来源:09:18 技能校验) diff --git a/server/src/app/api/v1/endpoints/steward.py b/server/src/app/api/v1/endpoints/steward.py index f9125c2..417e967 100644 --- a/server/src/app/api/v1/endpoints/steward.py +++ b/server/src/app/api/v1/endpoints/steward.py @@ -65,9 +65,10 @@ def create_steward_plan(payload: StewardPlanRequest, db: DbSession) -> StewardPl try: planner = _build_steward_planner(db) hydrated_payload = _hydrate_required_application_gate(db, payload, planner) - hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload) - plan = planner.build_plan(hydrated_payload) - plan = _apply_context_resume(db, hydrated_payload, plan) + if isinstance(planner, StewardGraphPlannerService): + plan = planner.build_plan(hydrated_payload, db=db) + else: + plan = planner.build_plan(hydrated_payload) return _attach_conversation_state(db, hydrated_payload, plan) except ValueError as exc: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc @@ -149,9 +150,10 @@ async def _iter_steward_plan_events( try: hydrated_payload = _hydrate_required_application_gate(db, payload, planner) - hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload) - plan = planner.build_plan(hydrated_payload) - plan = _apply_context_resume(db, hydrated_payload, plan) + if isinstance(planner, StewardGraphPlannerService): + plan = planner.build_plan(hydrated_payload, db=db) + else: + plan = planner.build_plan(hydrated_payload) plan = _attach_conversation_state(db, hydrated_payload, plan) except ValueError as exc: yield _encode_stream_event("error", {"message": str(exc)}) diff --git a/server/src/app/schemas/steward.py b/server/src/app/schemas/steward.py index 9d6f0ad..aaf2108 100644 --- a/server/src/app/schemas/steward.py +++ b/server/src/app/schemas/steward.py @@ -6,8 +6,8 @@ from pydantic import BaseModel, Field StewardTaskType = str StewardAssignedAgent = str -StewardPlanningSource = Literal["llm_function_call", "rule_fallback"] -StewardPlanNextAction = Literal["confirm_flow", "confirm_task", "delegate_task", "none"] +StewardPlanningSource = str # 放宽:支持 llm_function_call / rule_fallback / scene_handler:* / context_resume +StewardPlanNextAction = str # 放宽:支持 confirm_flow / confirm_task / delegate_task / none / answer_only StewardRequestedAction = Literal["preview", "save_draft", "submit"] StewardSlotDecisionSource = Literal["llm_function_call", "rule_fallback"] StewardSlotNextAction = Literal["ask_user", "render_preview"] diff --git a/server/src/app/services/steward_graph_planner.py b/server/src/app/services/steward_graph_planner.py index 58d298b..d5de644 100644 --- a/server/src/app/services/steward_graph_planner.py +++ b/server/src/app/services/steward_graph_planner.py @@ -1,14 +1,20 @@ from __future__ import annotations +import re from datetime import date from typing import Any, TypedDict from langgraph.graph import END, START, StateGraph +from sqlalchemy.orm import Session from app.schemas.steward import StewardPlanRequest, StewardPlanResponse from app.services import steward_intent_bootstrap # noqa: F401 导入即注册全部业务意图 +from app.services.agent_conversations import AgentConversationService +from app.services.scenes import REGISTRY as SCENE_REGISTRY +from app.services.scenes.gate_rules import GateRule, SceneRoute from app.services.steward_action_contracts import StewardActionPlanBuilder from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER +from app.services.steward_context_resume import attach_resumed_task, should_resume_recent_task from app.services.steward_intent_agent import StewardIntentAgent, StewardIntentAgentResult from app.services.steward_model_plan_builder import StewardModelPlanBuilder from app.services.steward_off_topic_agent import StewardOffTopicAgent @@ -16,8 +22,37 @@ from app.services.steward_planner_extraction import StewardPlannerExtractionMixi from app.services.steward_planner_fallback import StewardPlannerFallbackMixin +# ---- 模块级辅助函数:gate_classify 的判断逻辑 ---- + +def _matches_any_signal(message: str) -> bool: + """聚合 scene_registry 所有 signal_keywords,判断输入是否为业务相关。""" + from app.services.scenes import REGISTRY + compact = _compact_text(message) + if not compact: + return False + return any(kw in compact for kw in REGISTRY.all_signal_keywords()) + + +def _compact_text(text: str) -> str: + return re.sub(r"\s+", "", str(text or "")) + + +def _scene_route_to_gate_decision(route: SceneRoute) -> str: + """SceneRoute 映射到 gate_decision 字符串。""" + if route == SceneRoute.HANDLER_ONLY: + return "handler_only" + if route == SceneRoute.OFF_TOPIC: + return "off_topic" + if route == SceneRoute.RESUME: + return "resume" + if route == SceneRoute.AMBIGUOUS: + return "ambiguous" + return "model_intent" + + class StewardGraphState(TypedDict, total=False): request: StewardPlanRequest + db: Session message: str base_date: date scenario: str | None @@ -26,10 +61,25 @@ class StewardGraphState(TypedDict, total=False): plan: StewardPlanResponse model_call_traces: list[dict[str, Any]] fallback_reason: str + # 新增:上下文状态 + recent_history: list[dict[str, Any]] + steward_state: dict[str, Any] + # 新增:门控裁决 + gate_decision: str # "off_topic" | "handler_only" | "resume" | "ambiguous" | "model_intent" | "fallback" + gate_scene_id: str | None + # 新增:回填的 conversation_id + conversation_id: str | None class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractionMixin): - """用 LangGraph 编排小财管家的意图识别、流程判断和兜底路径。""" + """用 LangGraph 编排小财管家的意图识别、流程判断和兜底路径。 + + Phase 1 P1.3-P1.7:LangGraph 是唯一编排者,endpoint 退化为 3 行。 + 图拓扑: + load_context → gate_classify → {off_topic/handler_only/resume/ambiguous/model_intent} + → {build_off_topic_plan / execute_scene_handler / resume_recent_task / detect_model_intent} + → attach_action_steps → persist_state + """ def __init__( self, @@ -40,10 +90,358 @@ class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtr self.off_topic_agent = off_topic_agent self._graph = self._build_graph() - def build_plan(self, request: StewardPlanRequest) -> StewardPlanResponse: + # ---- 上下文加载 + 门控裁决(P1.3-P1.4 新增) ---- + + def _load_context(self, state: StewardGraphState) -> dict[str, Any]: + """读最近 10 条对话历史 + steward_state,注入 state。 + + 替代 endpoint 的 _hydrate_required_application_gate + _inject_recent_conversation_history。 + """ + request = state.get("request") + if request is None: + return {} + db = state.get("db") + + # 1. 读对话历史(只给模型用,不返回前端) + recent_history: list[dict[str, Any]] = [] + context_json = dict(request.context_json or {}) + conversation_id = self._extract_conversation_id(context_json) + if db is not None and conversation_id: + try: + recent_history = AgentConversationService(db).list_message_history( + conversation_id, limit=10 + ) + except Exception: + recent_history = [] + + # 2. 读 steward_state(DB 优先,前端传次之) + steward_state: dict[str, Any] = {} + if db is not None and conversation_id: + try: + conversation = AgentConversationService(db).get_conversation(conversation_id) + if conversation is not None and isinstance(conversation.state_json, dict): + stored = conversation.state_json.get("steward_state") + if isinstance(stored, dict): + steward_state = stored + except Exception: + steward_state = {} + if not steward_state: + incoming = context_json.get("steward_state") or context_json.get("stewardState") + if isinstance(incoming, dict): + steward_state = incoming + + # 3. 注入 recent_history 到 context_json(供 LLM 使用) + if recent_history: + request = request.model_copy( + update={ + "context_json": { + **context_json, + "recent_history": recent_history, + } + } + ) + + return { + "request": request, + "recent_history": recent_history, + "steward_state": steward_state, + "message": str(request.message or "").strip(), + "base_date": self._resolve_base_date_from_request(request), + } + + def _gate_classify(self, state: StewardGraphState) -> dict[str, Any]: + """统一门控裁决:单一决策点,按固定顺序决定场景和路由。 + + 顺序: + ① resume 门(用户说"再提交"+ state 有可恢复 flow → 上下文恢复) + ② off_topic 门(聚合 scene_registry signal_keywords) + ③ 规则匹配门(按 priority 遍历 scene_registry,命中 CHOICE 规则的) + ④ LLM 门(规则未命中,走 model function call) + """ + from app.services.scenes import REGISTRY + from app.services.scenes.gate_rules import GateRule, SceneRoute + + message = str(state.get("message") or "").strip() + steward_state = state.get("steward_state") or {} + + # ① resume 门(优先:即使消息不命中业务关键词,只要是"再提交"类话术就尝试恢复) + resume_scene = should_resume_recent_task(message, steward_state) + if resume_scene: + return {"gate_decision": "resume", "gate_scene_id": resume_scene} + + # ② off_topic 门 + if not _matches_any_signal(message): + return {"gate_decision": "off_topic", "gate_scene_id": None} + + # ③ 规则匹配门(按 priority 遍历,命中 CHOICE 规则的) + for scene in REGISTRY.scenes_sorted_by_priority(): + if scene.gate != GateRule.CHOICE: + continue + if not scene.signal_keywords: + continue + compact = _compact_text(message) + if any(kw in compact for kw in scene.signal_keywords): + route = _scene_route_to_gate_decision(scene.route) + return {"gate_decision": route, "gate_scene_id": scene.scene_id} + + # ④ LLM 门(规则未命中) + # 走 model_intent 时,如 state 已有 active_flow 且 LLM 准备 fallback,可优先做 candidate_flow + request = state.get("request") + if request is not None and self._looks_like_ambiguous_travel_flow( + message, self._resolve_base_date_from_request(request), request + ): + return {"gate_decision": "ambiguous", "gate_scene_id": None} + + return {"gate_decision": "model_intent", "gate_scene_id": None} + + def _execute_scene_handler(self, state: StewardGraphState) -> dict[str, Any]: + """HANDLER_ONLY 路由:不调 LLM,直接执行 scene 的 handler。 + + 当前只有 query_travel_standard 走这条路径。handler 签名约定: + handler(executor_self, request, current_user, trace) -> StewardActionExecuteResponse + """ + from app.services.scenes import REGISTRY + from app.schemas.steward import ( + StewardActionExecuteRequest, + StewardActionExecuteResponse, + StewardActionStep, + StewardPlanResponse, + StewardTask, + StewardThinkingEvent, + ) + import time + + scene_id = state.get("gate_scene_id") + scene = REGISTRY.get(scene_id or "") if scene_id else None + if scene is None or scene.handler is None: + plan = self._build_rule_fallback_graph_plan(state) + return {"plan": plan} + + request = state.get("request") + if request is None: + plan = self._build_rule_fallback_graph_plan(state) + return {"plan": plan} + + # 构造 handler 期望的 StewardActionExecuteRequest + from app.services.steward_action_contracts import StewardActionPlanBuilder + builder = StewardActionPlanBuilder() + if scene.action_steps_builder is not None: + task = StewardTask( + task_id=f"task_handler_{int(time.time() * 1000)}", + task_type=scene.scene_id, + assigned_agent=scene.assigned_agent or "policy_query_assistant", + title=scene.label, + summary=str(request.message or "").strip(), + status="delegated", + requested_action="preview", + ontology_fields={}, + missing_fields=[], + confirmation_required=False, + ) + action_steps = scene.action_steps_builder(task) + else: + action_steps = [] + + # 构造一个最小 action step 用于构造 handler request + if not action_steps: + action_steps = [StewardActionStep( + step_id=f"handler_{int(time.time() * 1000)}", + action_type=scene.side_effect_actions[0] if scene.side_effect_actions else scene.scene_id, + label=scene.label, + target_task_id="", + status="planned", + requires_confirmation=False, + payload={}, + )] + step = action_steps[0] + + action_request = StewardActionExecuteRequest( + action_type=step.action_type, + message=str(request.message or "").strip(), + task=task if scene.action_steps_builder else None, + ) + + try: + response = scene.handler(self, action_request, current_user=None, trace=[]) + except Exception as exc: + plan = self._build_rule_fallback_graph_plan(state) + plan.thinking_events = list(plan.thinking_events) + [ + StewardThinkingEvent( + event_id=f"handler_error_{scene.scene_id}", + stage="llm_function_call", + title=f"{scene.label}执行失败", + content=f"handler 抛错: {type(exc).__name__}: {str(exc)[:80]}", + status="completed", + ), + ] + return {"plan": plan} + + # handler 返回 StewardActionExecuteResponse,转换为 StewardPlanResponse + answer = "" + result_payload: dict[str, Any] = {} + if isinstance(response, StewardActionExecuteResponse): + answer = response.message or response.result_payload.get("answer_markdown", "") + result_payload = dict(response.result_payload or {}) + + # 把查询结果放进 summary(给前端展示)和 thinking_event(过程摘要) + plan = StewardPlanResponse( + plan_id=f"steward_handler_{int(time.time() * 1000)}", + planning_source=f"scene_handler:{scene.scene_id}", + summary=answer or scene.label, + next_action="answer_only", + tasks=[task] if scene.action_steps_builder else [], + thinking_events=[ + StewardThinkingEvent( + event_id=f"handler_{scene.scene_id}_done", + stage="llm_function_call", + title=f"已执行{scene.label}", + content=answer or f"场景 {scene.scene_id} 已执行", + status="completed", + ) + ], + pending_flow_confirmation={"status": "none"}, + conversation_id=state.get("conversation_id") or "", + steward_state=state.get("steward_state") or {}, + action_steps=[], + ) + return {"plan": plan} + + def _resume_recent_task(self, state: StewardGraphState) -> dict[str, Any]: + """RESUME 路由:从 steward_state 恢复之前被拦/中断的 task。 + + 保险①:100% 可靠,覆盖"再提交""继续提交"等确认类话术。 + """ + steward_state = state.get("steward_state") or {} + scene_id = state.get("gate_scene_id") + # 先建一个空 plan(无 task),让 attach_resumed_task 把恢复的 task 挂上 + from app.schemas.steward import StewardPlanResponse + empty_plan = StewardPlanResponse( + plan_id="steward_resume_pending", + planning_source="rule_fallback", + summary="恢复上下文中的待办任务。", + next_action="confirm_task", + tasks=[], + thinking_events=[], + pending_flow_confirmation={"status": "none"}, + ) + if not scene_id: + return {"plan": empty_plan} + resumed = attach_resumed_task(empty_plan, steward_state, scene_id) + return {"plan": resumed} + + def _persist_state( + self, + db: Session, + request: StewardPlanRequest, + plan: StewardPlanResponse, + final_state: StewardGraphState, + ) -> StewardPlanResponse: + """图执行后的副作用:写 message + steward_state 到 DB。 + + 替代 endpoint 的 _attach_conversation_state。 + """ + if db is None: + return plan + try: + context_json = dict(request.context_json or {}) + context_json["session_type"] = str(context_json.get("session_type") or "steward").strip() or "steward" + conversation_id = self._extract_conversation_id(context_json) + conversation_service = AgentConversationService(db) + conversation = conversation_service.get_or_create_conversation( + conversation_id=conversation_id, + user_id=request.user_id, + source="user_message", + context_json=context_json, + ) + current_state = self._resolve_steward_state_for_persist(conversation.state_json, final_state) + from app.services.steward_flow_state import StewardFlowStateService + steward_state = StewardFlowStateService().merge_plan(current_state, plan) + conversation = conversation_service.update_state( + conversation_id=conversation.conversation_id, + run_id=None, + scenario="steward", + intent="plan", + context_json={**context_json, "steward_state": steward_state}, + ) or conversation + conversation_service.append_message( + conversation_id=conversation.conversation_id, + role="user", + content=request.message, + message_json={"source": "steward_plan_request"}, + ) + conversation_service.append_message( + conversation_id=conversation.conversation_id, + role="assistant", + content=plan.summary, + message_json={ + "source": "steward_plan_response", + "plan_id": plan.plan_id, + "steward_state": steward_state, + }, + ) + return plan.model_copy( + update={ + "conversation_id": conversation.conversation_id, + "steward_state": steward_state, + } + ) + except Exception: + return plan + + # ---- 路由函数 ---- + + def _route_after_gate_classify(self, state: StewardGraphState) -> str: + """gate_classify 后的路由:把 gate_decision 映射到下一个节点。""" + decision = str(state.get("gate_decision") or "model_intent") + return decision + + @staticmethod + def _extract_conversation_id(context_json: dict[str, Any]) -> str | None: + return str( + context_json.get("conversation_id") + or context_json.get("conversationId") + or "" + ).strip() or None + + @staticmethod + def _resolve_base_date_from_request(request: StewardPlanRequest | None) -> date: + if request is None: + return date.today() + from app.services.steward_planner_extraction import StewardPlannerExtractionMixin + return StewardPlannerExtractionMixin._resolve_base_date( + request.client_now_iso, + dict(request.context_json or {}), + ) + + @staticmethod + def _resolve_steward_state_for_persist( + conversation_state: Any, + final_state: StewardGraphState, + ) -> dict[str, Any]: + state_json = conversation_state if isinstance(conversation_state, dict) else {} + stored = state_json.get("steward_state") + if isinstance(stored, dict) and stored: + return stored + graph_state = final_state.get("steward_state") + if isinstance(graph_state, dict) and graph_state: + return graph_state + return {} + + def build_plan( + self, + request: StewardPlanRequest, + db: Session | None = None, + ) -> StewardPlanResponse: + """编排一次 steward 计划请求,内部执行:load → classify → plan。 + + P1 中间状态:签名保持 build_plan(request) 不变以兼容现有测试/消费者。 + 显式传 db 时,load_context 节点会读历史/state;不传时图内 IO 静默跳过。 + 持久化由 endpoint 显式调用 _attach_conversation_state 完成(P3 收敛到图内)。 + """ final_state = self._graph.invoke( { "request": request, + "db": db, "model_call_traces": [], "fallback_reason": "", } @@ -55,13 +453,34 @@ class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtr def _build_graph(self): graph = StateGraph(StewardGraphState) + # 节点 + graph.add_node("load_context", self._load_context) + graph.add_node("gate_classify", self._gate_classify) + graph.add_node("execute_scene_handler", self._execute_scene_handler) + graph.add_node("resume_recent_task", self._resume_recent_task) graph.add_node("prepare_context", self._prepare_context) graph.add_node("detect_model_intent", self._detect_model_intent) graph.add_node("build_off_topic_plan", self._build_off_topic_graph_plan) graph.add_node("build_rule_fallback_plan", self._build_rule_fallback_graph_plan) + graph.add_node("build_pending_flow_plan", self._build_pending_flow_fallback_graph_plan) graph.add_node("attach_action_steps", self._attach_action_steps) - graph.add_edge(START, "prepare_context") + # 拓扑 + graph.add_edge(START, "load_context") + graph.add_edge("load_context", "gate_classify") + graph.add_conditional_edges( + "gate_classify", + self._route_after_gate_classify, + { + "off_topic": "build_off_topic_plan", + "handler_only": "execute_scene_handler", + "resume": "resume_recent_task", + "ambiguous": "build_pending_flow_plan", + "model_intent": "prepare_context", + }, + ) + graph.add_edge("execute_scene_handler", "attach_action_steps") + graph.add_edge("resume_recent_task", "attach_action_steps") graph.add_conditional_edges( "prepare_context", self._route_after_prepare_context, @@ -82,6 +501,7 @@ class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtr ) graph.add_edge("build_off_topic_plan", "attach_action_steps") graph.add_edge("build_rule_fallback_plan", "attach_action_steps") + graph.add_edge("build_pending_flow_plan", "attach_action_steps") graph.add_edge("attach_action_steps", END) return graph.compile() @@ -188,8 +608,22 @@ class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtr return { "plan": self._build_off_topic_plan( state["request"], - scenario=str(state["scenario"] or ""), - model_call_traces=state.get("model_call_traces"), + scenario=str(state.get("scenario") or ""), + model_call_traces=state.get("model_call_traces") or [], + fallback_reason=str(state.get("fallback_reason") or ""), + ) + } + + def _build_pending_flow_fallback_graph_plan( + self, + state: StewardGraphState, + ) -> dict[str, StewardPlanResponse]: + request = state["request"] + return { + "plan": self._build_pending_flow_fallback_plan( + request, + base_date=state["base_date"], + model_call_traces=state.get("model_call_traces") or [], fallback_reason=str(state.get("fallback_reason") or ""), ) }