from __future__ import annotations from datetime import date from typing import Any, TypedDict from langgraph.graph import END, START, StateGraph from app.schemas.steward import StewardPlanRequest, StewardPlanResponse from app.services.steward_action_contracts import StewardActionPlanBuilder from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER from app.services.steward_intent_agent import StewardIntentAgent, StewardIntentAgentResult from app.services.steward_model_plan_builder import StewardModelPlanBuilder from app.services.steward_off_topic_agent import StewardOffTopicAgent from app.services.steward_planner_extraction import StewardPlannerExtractionMixin from app.services.steward_planner_fallback import StewardPlannerFallbackMixin class StewardGraphState(TypedDict, total=False): request: StewardPlanRequest message: str base_date: date scenario: str | None should_use_model: bool intent_result: StewardIntentAgentResult | None plan: StewardPlanResponse model_call_traces: list[dict[str, Any]] fallback_reason: str class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractionMixin): """用 LangGraph 编排小财管家的意图识别、流程判断和兜底路径。""" def __init__( self, intent_agent: StewardIntentAgent | None = None, off_topic_agent: StewardOffTopicAgent | None = None, ) -> None: self.intent_agent = intent_agent self.off_topic_agent = off_topic_agent self._graph = self._build_graph() def build_plan(self, request: StewardPlanRequest) -> StewardPlanResponse: final_state = self._graph.invoke( { "request": request, "model_call_traces": [], "fallback_reason": "", } ) plan = final_state.get("plan") if not isinstance(plan, StewardPlanResponse): raise RuntimeError("LangGraph 小财管家规划未生成有效计划。") return plan def _build_graph(self): graph = StateGraph(StewardGraphState) graph.add_node("prepare_context", self._prepare_context) graph.add_node("detect_model_intent", self._detect_model_intent) graph.add_node("build_off_topic_plan", self._build_off_topic_graph_plan) graph.add_node("build_rule_fallback_plan", self._build_rule_fallback_graph_plan) graph.add_node("attach_action_steps", self._attach_action_steps) graph.add_edge(START, "prepare_context") graph.add_conditional_edges( "prepare_context", self._route_after_prepare_context, { "model": "detect_model_intent", "off_topic": "build_off_topic_plan", "fallback": "build_rule_fallback_plan", }, ) graph.add_conditional_edges( "detect_model_intent", self._route_after_model_intent, { "done": "attach_action_steps", "off_topic": "build_off_topic_plan", "fallback": "build_rule_fallback_plan", }, ) graph.add_edge("build_off_topic_plan", "attach_action_steps") graph.add_edge("build_rule_fallback_plan", "attach_action_steps") graph.add_edge("attach_action_steps", END) return graph.compile() def _prepare_context(self, state: StewardGraphState) -> dict[str, Any]: request = state["request"] message = self._clean_text(request.message) if not message: raise ValueError("小财管家需要一段任务描述。") base_date = self._resolve_base_date(request.client_now_iso, request.context_json) return { "message": message, "base_date": base_date, "scenario": self._classify_irrelevant_input(message, request), "should_use_model": bool( self.intent_agent is not None and self._should_use_model_intent_recognition(message, base_date, request) ), } @staticmethod def _route_after_prepare_context(state: StewardGraphState) -> str: if state.get("should_use_model"): return "model" if state.get("scenario") is not None: return "off_topic" return "fallback" def _detect_model_intent(self, state: StewardGraphState) -> dict[str, Any]: request = state["request"] message = state["message"] base_date = state["base_date"] model_call_traces: list[dict[str, Any]] = [] if self.intent_agent is None: return {} try: intent_result = self.intent_agent.detect( request, base_date=base_date, canonical_fields=list(BUSINESS_CANONICAL_FIELD_ORDER), ) if intent_result is None: return { "model_call_traces": self._last_intent_call_traces(model_call_traces), "fallback_reason": ( "主模型未返回可用的 function calling 计划,已切换到规则兜底。" ), } model_call_traces = intent_result.model_call_traces llm_plan = StewardModelPlanBuilder(self).build( intent_result, request=request, base_date=base_date, ) if llm_plan is None: return { "model_call_traces": self._last_intent_call_traces(model_call_traces), "fallback_reason": ( "主模型未返回可用的 function calling 计划,已切换到规则兜底。" ), } if self._looks_like_ambiguous_travel_flow(message, base_date, request): return { "plan": self._build_pending_flow_fallback_plan( request, base_date=base_date, model_call_traces=model_call_traces, fallback_reason=( "主模型返回了直接任务,但当前话术没有明确申请或报销动作;" "服务端已改为候选流程确认,避免误入申请流程。" ), planning_source="llm_function_call", ), "model_call_traces": model_call_traces, } return { "intent_result": intent_result, "plan": llm_plan, "model_call_traces": model_call_traces, } except Exception as exc: return { "model_call_traces": self._last_intent_call_traces(model_call_traces), "fallback_reason": f"主模型 function calling 调用失败,已切换到规则兜底:{exc}", } @staticmethod def _route_after_model_intent(state: StewardGraphState) -> str: if isinstance(state.get("plan"), StewardPlanResponse): return "done" if state.get("scenario") is not None: return "off_topic" return "fallback" def _build_off_topic_graph_plan( self, state: StewardGraphState, ) -> dict[str, StewardPlanResponse]: return { "plan": self._build_off_topic_plan( state["request"], scenario=str(state["scenario"] or ""), model_call_traces=state.get("model_call_traces"), fallback_reason=str(state.get("fallback_reason") or ""), ) } def _build_rule_fallback_graph_plan( self, state: StewardGraphState, ) -> dict[str, StewardPlanResponse]: return { "plan": self._build_rule_fallback_plan( state["request"], base_date=state["base_date"], model_call_traces=state.get("model_call_traces"), fallback_reason=str(state.get("fallback_reason") or ""), ) } @staticmethod def _attach_action_steps(state: StewardGraphState) -> dict[str, StewardPlanResponse]: plan = state.get("plan") if not isinstance(plan, StewardPlanResponse): raise RuntimeError("LangGraph 小财管家动作规划缺少有效计划。") return {"plan": StewardActionPlanBuilder().attach_action_steps(plan)} def _last_intent_call_traces( self, fallback_traces: list[dict[str, Any]], ) -> list[dict[str, Any]]: return getattr(self.intent_agent, "last_call_traces", []) or fallback_traces