refactor(server): steward 决策链路改用 LangGraph 编排

- 新增 StewardGraphPlannerService,用 LangGraph 状态图编排意图识别→流程判断→模型/规则分支→兜底,替代原 planner 内线性调用
- 新增 StewardGraphRuntimeService 编排运行时决策与槽位决策;StewardActionContracts/Executor 统一动作合约与执行
- steward_intent_agent/application_fact_resolver/runtime_chat 适配图执行器,config 暴露图相关开关
- pyproject/uv.lock 新增 langgraph 依赖
- 新增 graph_planner/graph_runtime/action_executor 测试,更新 intent_agent/planner/fact_resolver/runtime_chat/reimbursement 测试
This commit is contained in:
caoxiaozhu
2026-06-24 21:58:35 +08:00
parent 545b31d32f
commit 5311c99d69
25 changed files with 3580 additions and 104 deletions

View File

@@ -10,10 +10,13 @@ from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.api.deps import CurrentUserContext, get_current_user, get_db
from app.core.config import get_settings
from app.models.financial_record import ExpenseClaim
from app.schemas.common import ErrorResponse
from app.schemas.steward import (
StewardActionExecuteRequest,
StewardActionExecuteResponse,
StewardPlanRequest,
StewardPlanResponse,
StewardRuntimeDecisionRequest,
@@ -27,6 +30,9 @@ from app.services.expense_claim_draft_flow import APPROVED_APPLICATION_LINK_STAT
from app.services.expense_claims import ExpenseClaimService
from app.services.runtime_chat import RuntimeChatService
from app.services.steward_flow_state import StewardFlowStateService
from app.services.steward_graph_action_runtime import StewardGraphActionRuntime
from app.services.steward_graph_planner import StewardGraphPlannerService
from app.services.steward_graph_runtime import StewardGraphRuntime
from app.services.steward_intent_agent import StewardIntentAgent
from app.services.steward_off_topic_agent import StewardOffTopicAgent
from app.services.steward_planner import StewardPlannerService
@@ -35,6 +41,8 @@ from app.services.steward_slot_decision_agent import StewardSlotDecisionAgent
router = APIRouter(prefix="/steward")
DbSession = Annotated[Session, Depends(get_db)]
CurrentUser = Annotated[CurrentUserContext, Depends(get_current_user)]
StewardPlannerLike = StewardPlannerService | StewardGraphPlannerService
@router.post(
@@ -69,7 +77,7 @@ def create_steward_slot_decision(
payload: StewardSlotDecisionRequest,
db: DbSession,
) -> StewardSlotDecisionResponse:
return StewardSlotDecisionAgent(RuntimeChatService(db)).decide(payload)
return _decide_steward_slot(payload, RuntimeChatService(db))
@router.post(
@@ -83,10 +91,27 @@ def create_steward_runtime_decision(
db: DbSession,
) -> StewardRuntimeDecisionResponse:
hydrated_payload = _hydrate_runtime_decision_payload(db, payload)
decision = StewardRuntimeDecisionAgent(RuntimeChatService(db)).decide(hydrated_payload)
decision = _decide_steward_runtime(hydrated_payload, RuntimeChatService(db))
return _attach_runtime_conversation_state(db, hydrated_payload, decision)
@router.post(
"/actions/execute",
response_model=StewardActionExecuteResponse,
summary="执行小财管家白名单动作",
description=(
"按 LangGraph 规划出的 action step 执行确定性业务动作;"
"当 action 未知、缺字段、缺确认或预检查未通过时直接阻断并返回结构化兜底结果。"
),
)
def execute_steward_action(
payload: StewardActionExecuteRequest,
db: DbSession,
current_user: CurrentUser,
) -> StewardActionExecuteResponse:
return StewardGraphActionRuntime(db).execute(payload, current_user)
@router.post(
"/plans/stream",
summary="流式生成小财管家任务计划",
@@ -101,7 +126,7 @@ async def stream_steward_plan(payload: StewardPlanRequest, db: DbSession) -> Str
async def _iter_steward_plan_events(
payload: StewardPlanRequest,
planner: StewardPlannerService,
planner: StewardPlannerLike,
db: Session,
) -> AsyncIterator[str]:
yield _encode_stream_event(
@@ -135,18 +160,53 @@ def _encode_stream_event(event: str, data: dict[str, Any]) -> str:
return json.dumps({"event": event, "data": data}, ensure_ascii=False) + "\n"
def _build_steward_planner(db: Session) -> StewardPlannerService:
def _build_steward_planner(db: Session) -> StewardPlannerLike:
runtime_chat = RuntimeChatService(db)
if get_settings().steward_agent_runtime.strip().lower() == "langgraph":
return StewardGraphPlannerService(
intent_agent=StewardIntentAgent(runtime_chat),
off_topic_agent=StewardOffTopicAgent(runtime_chat),
)
return StewardPlannerService(
intent_agent=StewardIntentAgent(runtime_chat),
off_topic_agent=StewardOffTopicAgent(runtime_chat),
)
def _decide_steward_slot(
payload: StewardSlotDecisionRequest,
runtime_chat: Any,
) -> StewardSlotDecisionResponse:
legacy_agent = StewardSlotDecisionAgent(runtime_chat)
if not _should_use_langgraph_runtime():
return legacy_agent.decide(payload)
try:
return StewardGraphRuntime(runtime_chat).decide_slot(payload)
except Exception:
return legacy_agent.decide(payload)
def _decide_steward_runtime(
payload: StewardRuntimeDecisionRequest,
runtime_chat: Any,
) -> StewardRuntimeDecisionResponse:
legacy_agent = StewardRuntimeDecisionAgent(runtime_chat)
if not _should_use_langgraph_runtime():
return legacy_agent.decide(payload)
try:
return StewardGraphRuntime(runtime_chat).decide_runtime(payload)
except Exception:
return legacy_agent.decide(payload)
def _should_use_langgraph_runtime() -> bool:
return get_settings().steward_agent_runtime.strip().lower() == "langgraph"
def _hydrate_required_application_gate(
db: Session,
payload: StewardPlanRequest,
planner: StewardPlannerService,
planner: StewardPlannerLike,
) -> StewardPlanRequest:
context_json = dict(payload.context_json or {})
required_gate = context_json.get("required_application_gate")

View File

@@ -73,6 +73,7 @@ class Settings(BaseSettings):
onlyoffice_backend_url: str = Field(default="", alias="ONLYOFFICE_BACKEND_URL")
onlyoffice_jwt_secret: str = Field(default="", alias="ONLYOFFICE_JWT_SECRET")
hermes_agent_shared_token: str = Field(default="", alias="HERMES_AGENT_SHARED_TOKEN")
steward_agent_runtime: str = Field(default="langgraph", alias="STEWARD_AGENT_RUNTIME")
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
log_dir: str = Field(default="logs", alias="LOG_DIR")

View File

@@ -4,11 +4,11 @@ from typing import Any, Literal
from pydantic import BaseModel, Field
StewardTaskType = Literal["expense_application", "reimbursement"]
StewardAssignedAgent = Literal["application_assistant", "reimbursement_assistant"]
StewardPlanningSource = Literal["llm_function_call", "rule_fallback"]
StewardPlanNextAction = Literal["confirm_flow", "confirm_task", "delegate_task", "none"]
StewardRequestedAction = Literal["preview", "save_draft", "submit"]
StewardSlotDecisionSource = Literal["llm_function_call", "rule_fallback"]
StewardSlotNextAction = Literal["ask_user", "render_preview"]
StewardRuntimeDecisionSource = Literal["llm_function_call", "rule_fallback"]
@@ -22,6 +22,22 @@ StewardRuntimeNextAction = Literal[
"cancel_current_action",
"no_op",
]
StewardActionType = Literal[
"detect_intent",
"fill_application_fields",
"build_application_preview",
"fill_reimbursement_fields",
"build_reimbursement_preview",
"validate_required_fields",
"run_duplicate_precheck",
"save_application_draft",
"submit_application",
"link_existing_application",
"create_reimbursement_draft",
"associate_attachments",
]
StewardActionStatus = Literal["completed", "planned", "pending_confirmation", "blocked"]
StewardActionExecutionStatus = Literal["succeeded", "blocked", "needs_confirmation", "failed"]
StewardTaskStatus = Literal[
"planned",
"needs_confirmation",
@@ -58,6 +74,17 @@ class StewardThinkingEvent(BaseModel):
status: str = Field(default="completed", description="事件状态。")
class StewardActionStep(BaseModel):
step_id: str = Field(description="动作步骤 ID。")
action_type: StewardActionType = Field(description="白名单动作类型。")
label: str = Field(description="用户可见动作名称。")
target_task_id: str = Field(default="", description="关联的小财管家任务 ID。")
status: StewardActionStatus = Field(default="planned", description="动作规划状态。")
requires_confirmation: bool = Field(default=False, description="执行前是否需要用户确认。")
depends_on: list[str] = Field(default_factory=list, description="前置动作步骤 ID。")
payload: dict[str, Any] = Field(default_factory=dict, description="动作执行需要的确定性载荷。")
class StewardTask(BaseModel):
task_id: str = Field(description="小财管家任务 ID。")
task_type: StewardTaskType = Field(description="任务类型。")
@@ -66,9 +93,11 @@ class StewardTask(BaseModel):
summary: str = Field(description="任务摘要。")
status: StewardTaskStatus = Field(default="needs_confirmation", description="任务状态。")
confidence: float = Field(default=0.0, ge=0.0, le=1.0, description="识别置信度。")
requested_action: StewardRequestedAction = Field(default="preview", description="用户希望执行的动作:预览、保存草稿或提交。")
ontology_fields: dict[str, str] = Field(default_factory=dict, description="归一化后的业务本体字段。")
missing_fields: list[str] = Field(default_factory=list, description="仍缺失的本体字段。")
confirmation_required: bool = Field(default=True, description="执行前是否需要用户确认。")
action_steps: list[StewardActionStep] = Field(default_factory=list, description="当前任务的白名单动作步骤。")
class StewardAttachmentGroup(BaseModel):
@@ -120,6 +149,7 @@ class StewardPlanResponse(BaseModel):
summary: str = Field(description="计划摘要。")
thinking_events: list[StewardThinkingEvent] = Field(default_factory=list, description="过程摘要事件。")
tasks: list[StewardTask] = Field(default_factory=list, description="拆解后的任务。")
action_steps: list[StewardActionStep] = Field(default_factory=list, description="本计划的全量白名单动作步骤。")
attachment_groups: list[StewardAttachmentGroup] = Field(default_factory=list, description="附件归集建议。")
confirmation_groups: list[StewardConfirmationAction] = Field(default_factory=list, description="等待用户确认的动作。")
pending_flow_confirmation: StewardPendingFlowConfirmation = Field(
@@ -134,6 +164,30 @@ class StewardPlanResponse(BaseModel):
)
class StewardActionExecuteRequest(BaseModel):
action_type: str = Field(description="待执行的白名单动作类型。未知动作会被 executor 阻断。")
message: str = Field(default="", description="触发该动作的用户原始话术。")
plan_id: str = Field(default="", description="关联的小财管家计划 ID。")
conversation_id: str | None = Field(default=None, description="关联会话 ID。")
task: StewardTask | None = Field(default=None, description="动作所属任务快照。")
action_step: StewardActionStep | None = Field(default=None, description="规划侧生成的动作步骤快照。")
confirmed: bool = Field(default=False, description="用户是否已确认执行该动作。")
context_json: dict[str, Any] = Field(default_factory=dict, description="前端或运行时补充上下文。")
client_trace_id: str = Field(default="", description="前端幂等或追踪 ID。")
class StewardActionExecuteResponse(BaseModel):
action_type: str = Field(description="实际处理的动作类型。")
status: StewardActionExecutionStatus = Field(description="动作执行状态。")
execution_source: str = Field(default="langgraph_action", description="执行来源或兜底来源。")
fallback_used: bool = Field(default=False, description="是否走了规则兜底执行。")
requires_confirmation: bool = Field(default=False, description="是否仍需用户确认。")
message: str = Field(description="面向用户展示的执行结果。")
blocked_reasons: list[str] = Field(default_factory=list, description="阻断原因。")
result_payload: dict[str, Any] = Field(default_factory=dict, description="业务服务返回的结构化结果。")
trace: list[dict[str, Any]] = Field(default_factory=list, description="动作执行轨迹。")
class StewardSlotOption(BaseModel):
label: str = Field(description="用户可见选项文案。")
value: str = Field(description="写回本体字段的选项值。")

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import re
from datetime import date, timedelta
CITY_NAMES = (
"北京",
"上海",
@@ -34,6 +33,16 @@ MONTH_DAY_PATTERN = re.compile(r"(?P<month>\d{1,2})\s*月\s*(?P<day>\d{1,2})\s*(
ISO_DATE_PATTERN = re.compile(
r"(?P<year>\d{4})[-/年](?P<month>\d{1,2})[-/月](?P<day>\d{1,2})(?:日)?"
)
ISO_DATE_RANGE_PATTERN = re.compile(
r"(?P<start_year>\d{4})[-/年](?P<start_month>\d{1,2})[-/月](?P<start_day>\d{1,2})(?:日)?"
r"(?:至|到|~||—|--)"
r"(?P<end_year>\d{4})[-/年](?P<end_month>\d{1,2})[-/月](?P<end_day>\d{1,2})(?:日)?"
)
MONTH_DAY_RANGE_PATTERN = re.compile(
r"(?P<start_month>\d{1,2})\s*月\s*(?P<start_day>\d{1,2})\s*(?:日|号)?"
r"(?:至|到|~||—|--|-)"
r"(?:(?P<end_month>\d{1,2})\s*月\s*)?(?P<end_day>\d{1,2})\s*(?:日|号)?"
)
class ApplicationFactResolver:
@@ -60,6 +69,38 @@ class ApplicationFactResolver:
if "后天" in compact:
return (base_date + timedelta(days=2)).isoformat()
iso_range = ISO_DATE_RANGE_PATTERN.search(compact)
if iso_range:
start_date = ApplicationFactResolver.safe_date(
int(iso_range.group("start_year")),
int(iso_range.group("start_month")),
int(iso_range.group("start_day")),
)
end_date = ApplicationFactResolver.safe_date(
int(iso_range.group("end_year")),
int(iso_range.group("end_month")),
int(iso_range.group("end_day")),
)
if start_date and end_date:
return f"{start_date}{end_date}"
month_day_range = MONTH_DAY_RANGE_PATTERN.search(compact)
if month_day_range:
start_month = int(month_day_range.group("start_month"))
end_month = int(month_day_range.group("end_month") or start_month)
start_date = ApplicationFactResolver.safe_date(
base_date.year,
start_month,
int(month_day_range.group("start_day")),
)
end_date = ApplicationFactResolver.safe_date(
base_date.year,
end_month,
int(month_day_range.group("end_day")),
)
if start_date and end_date:
return f"{start_date}{end_date}"
iso_match = ISO_DATE_PATTERN.search(compact)
if iso_match:
return ApplicationFactResolver.safe_date(
@@ -100,6 +141,9 @@ class ApplicationFactResolver:
def extract_reason(segment: str, task_type: str) -> str:
cleaned = re.sub(r"\s+", "", segment).strip(",。;; ")
if task_type == "expense_application":
normalized = strip_application_non_reason_tokens(cleaned)
if normalized:
return normalized
match = re.search(r"(辅助|支持|协助|支撑|参加|拜访|调研|实施|部署|审核).+", cleaned)
if match:
return strip_trailing_connectors(match.group(0))
@@ -130,6 +174,47 @@ def strip_trailing_connectors(value: str) -> str:
return re.sub(r"(?:并且|而且|同时|另外|还需要|需要)$", "", cleaned).strip(",。;; ")
def strip_application_non_reason_tokens(value: str) -> str:
cleaned = str(value or "").strip(",。;; ")
if not cleaned:
return ""
preserved_transport_phrases: dict[str, str] = {}
def preserve_transport_phrase(match: re.Match[str]) -> str:
placeholder = f"__TRANSPORT_REASON_{len(preserved_transport_phrases)}__"
preserved_transport_phrases[placeholder] = match.group(0)
return placeholder
cleaned = re.sub(
r"(?:高铁|动车|火车|飞机|机票|航班|轮船|出租车|的士|网约车|打车|地铁|公交)往返",
preserve_transport_phrase,
cleaned,
)
cleaned = ISO_DATE_RANGE_PATTERN.sub("", cleaned)
cleaned = MONTH_DAY_RANGE_PATTERN.sub("", cleaned)
cleaned = ISO_DATE_PATTERN.sub("", cleaned)
cleaned = MONTH_DAY_PATTERN.sub("", cleaned)
cleaned = re.sub(r"昨天|前天|明天|后天|今天|上周|上月|下周|下月|近期|月底", "", cleaned)
cleaned = re.sub(r"[0-9一二两三四五六七八九十]+天", "", cleaned)
cleaned = re.sub(fr"(?:去|到|赴|前往)(?:{'|'.join(CITY_NAMES)})", "", cleaned)
cleaned = re.sub(fr"(?:出差|差旅)(?:{'|'.join(CITY_NAMES)})", "", cleaned)
cleaned = re.sub(fr"(?:{'|'.join(CITY_NAMES)})(?=出差|差旅)", "", cleaned)
cleaned = re.sub(r"出差|差旅|费用申请|出差申请|差旅申请", "", cleaned)
cleaned = re.sub(r"交通方式?|出行方式?", "", cleaned)
cleaned = re.sub(r"高铁|动车|火车|飞机|机票|航班|轮船|出租车|的士|网约车|打车|地铁|公交", "", cleaned)
cleaned = re.sub(
r"保存草稿|存草稿|先保存|直接提交|提交申请|确认提交|提交审批|保存|提交|申请|发起|创建|我想要|我想|我要|请帮我|帮我",
"",
cleaned,
)
cleaned = re.sub(r"(^|[,。;;、])(?:交通|出行)(?=$|[,。;;、])", r"\1", cleaned)
for placeholder, phrase in preserved_transport_phrases.items():
cleaned = cleaned.replace(placeholder, phrase)
cleaned = re.sub(r"[,。;;、]+", "", cleaned).strip(",。;;、的 ")
return strip_trailing_connectors(cleaned)
def resolve_application_facts(segment: str, task_type: str, base_date: date) -> dict[str, str]:
fields = {
"expense_type": ApplicationFactResolver.infer_expense_type(segment, task_type),

View File

@@ -244,6 +244,7 @@ class RuntimeChatService:
timeout_seconds: int | None = None,
slot_timeouts: dict[str, int] | None = None,
max_attempts: int | None = None,
use_failure_cooldown: bool = True,
) -> RuntimeToolCallResult:
configs: list[dict[str, str]] = []
calls: list[RuntimeChatCallTrace] = []
@@ -272,7 +273,7 @@ class RuntimeChatService:
for attempt in range(1, resolved_max_attempts + 1):
for config in configs:
cache_key = self._build_slot_cache_key(config)
if _slot_failure_until.get(cache_key, 0.0) > monotonic():
if use_failure_cooldown and _slot_failure_until.get(cache_key, 0.0) > monotonic():
logger.info(
"Skip runtime chat tool slot=%s provider=%s because it is in cooldown",
config["slot"],
@@ -330,9 +331,10 @@ class RuntimeChatService:
)
except Exception as exc:
duration_ms = int((monotonic() - started) * 1000)
_slot_failure_until[cache_key] = (
monotonic() + DEFAULT_RUNTIME_CHAT_FAILURE_COOLDOWN_SECONDS
)
if use_failure_cooldown:
_slot_failure_until[cache_key] = (
monotonic() + DEFAULT_RUNTIME_CHAT_FAILURE_COOLDOWN_SECONDS
)
calls.append(
RuntimeChatCallTrace(
slot=config["slot"],

View File

@@ -0,0 +1,224 @@
from __future__ import annotations
from typing import Any
from app.schemas.steward import (
StewardActionStatus,
StewardActionStep,
StewardPlanResponse,
StewardTask,
)
class StewardActionPlanBuilder:
"""把小财管家任务转换为确定性的白名单动作步骤。"""
def attach_action_steps(self, plan: StewardPlanResponse) -> StewardPlanResponse:
if not plan.tasks:
return plan.model_copy(update={"action_steps": []})
tasks: list[StewardTask] = []
plan_steps = [self._build_detect_intent_step(plan)]
for task in plan.tasks:
task_steps = self.build_task_action_steps(task)
tasks.append(task.model_copy(update={"action_steps": task_steps}))
plan_steps.extend(task_steps)
return plan.model_copy(update={"tasks": tasks, "action_steps": plan_steps})
def build_task_action_steps(self, task: StewardTask) -> list[StewardActionStep]:
if task.task_type == "expense_application":
return self._build_application_steps(task)
return self._build_reimbursement_steps(task)
def _build_application_steps(self, task: StewardTask) -> list[StewardActionStep]:
steps = [
self._build_task_step(
task,
1,
"fill_application_fields",
"填充申请字段",
payload=self._field_payload(task),
),
self._build_task_step(
task,
2,
"build_application_preview",
"展示申请核对表",
depends_on_index=1,
payload={"task_id": task.task_id, "ontology_fields": task.ontology_fields},
),
self._build_task_step(
task,
3,
"validate_required_fields",
"校验申请必填字段",
status=self._validation_status(task),
depends_on_index=2,
payload={"missing_fields": task.missing_fields},
),
]
if task.requested_action == "save_draft":
steps.append(
self._build_task_step(
task,
4,
"save_application_draft",
"保存申请草稿",
status=self._side_effect_status(task, requires_confirmation=False),
depends_on_index=3,
payload={
"task_id": task.task_id,
"requested_action": task.requested_action,
"ontology_fields": task.ontology_fields,
},
)
)
elif task.requested_action == "submit":
steps.append(
self._build_task_step(
task,
4,
"run_duplicate_precheck",
"检查重复或冲突申请",
status=self._side_effect_status(task, requires_confirmation=False),
depends_on_index=3,
payload={"task_id": task.task_id, "precheck_type": "travel_overlap"},
)
)
steps.append(
self._build_task_step(
task,
5,
"submit_application",
"提交申请审批",
status=self._side_effect_status(task, requires_confirmation=True),
requires_confirmation=not bool(task.missing_fields),
depends_on_index=4,
payload={
"task_id": task.task_id,
"requested_action": task.requested_action,
"confirmation_required": True,
},
)
)
return steps
def _build_reimbursement_steps(self, task: StewardTask) -> list[StewardActionStep]:
steps = [
self._build_task_step(
task,
1,
"fill_reimbursement_fields",
"填充报销字段",
payload=self._field_payload(task),
),
self._build_task_step(
task,
2,
"build_reimbursement_preview",
"展示报销核对表",
depends_on_index=1,
payload={"task_id": task.task_id, "ontology_fields": task.ontology_fields},
),
self._build_task_step(
task,
3,
"validate_required_fields",
"校验报销必填字段",
status=self._validation_status(task),
depends_on_index=2,
payload={"missing_fields": task.missing_fields},
),
self._build_task_step(
task,
4,
"create_reimbursement_draft",
"创建报销草稿",
status=self._side_effect_status(task, requires_confirmation=False),
depends_on_index=3,
payload={
"task_id": task.task_id,
"requested_action": task.requested_action,
"ontology_fields": task.ontology_fields,
},
),
]
if task.ontology_fields.get("attachments"):
steps.append(
self._build_task_step(
task,
5,
"associate_attachments",
"关联报销附件",
status=self._side_effect_status(task, requires_confirmation=False),
depends_on_index=4,
payload={"task_id": task.task_id, "attachments": task.ontology_fields["attachments"]},
)
)
return steps
@staticmethod
def _build_detect_intent_step(plan: StewardPlanResponse) -> StewardActionStep:
return StewardActionStep(
step_id="plan:00:detect_intent",
action_type="detect_intent",
label="识别业务意图",
status="completed",
payload={
"planning_source": plan.planning_source,
"plan_status": plan.plan_status,
},
)
def _build_task_step(
self,
task: StewardTask,
index: int,
action_type: str,
label: str,
*,
status: StewardActionStatus = "planned",
requires_confirmation: bool = False,
depends_on_index: int | None = None,
payload: dict[str, Any] | None = None,
) -> StewardActionStep:
step_id = self._step_id(task, index)
depends_on = [self._step_id(task, depends_on_index)] if depends_on_index is not None else []
return StewardActionStep(
step_id=step_id,
action_type=action_type, # type: ignore[arg-type]
label=label,
target_task_id=task.task_id,
status=status,
requires_confirmation=requires_confirmation,
depends_on=depends_on,
payload=payload or {},
)
@staticmethod
def _field_payload(task: StewardTask) -> dict[str, Any]:
return {
"task_id": task.task_id,
"task_type": task.task_type,
"requested_action": task.requested_action,
"ontology_fields": task.ontology_fields,
"missing_fields": task.missing_fields,
}
@staticmethod
def _validation_status(task: StewardTask) -> StewardActionStatus:
return "blocked" if task.missing_fields else "planned"
@staticmethod
def _side_effect_status(
task: StewardTask,
*,
requires_confirmation: bool,
) -> StewardActionStatus:
if task.missing_fields:
return "blocked"
return "pending_confirmation" if requires_confirmation else "planned"
@staticmethod
def _step_id(task: StewardTask, index: int) -> str:
return f"{task.task_id}:{index:02d}"

View File

@@ -0,0 +1,570 @@
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.orm import Session
from app.api.deps import CurrentUserContext
from app.schemas.ontology import OntologyParseResult, OntologyPermission
from app.schemas.steward import (
StewardActionExecuteRequest,
StewardActionExecuteResponse,
StewardTask,
)
from app.schemas.user_agent import UserAgentRequest
from app.services.attachment_association_jobs import AttachmentAssociationJobRunner
from app.services.expense_claims import ExpenseClaimService
from app.services.user_agent import UserAgentService
from app.services.user_agent_application_dates import resolve_application_days_from_time_range
SUPPORTED_ACTIONS = {
"fill_application_fields",
"build_application_preview",
"fill_reimbursement_fields",
"build_reimbursement_preview",
"validate_required_fields",
"run_duplicate_precheck",
"save_application_draft",
"submit_application",
"create_reimbursement_draft",
"link_existing_application",
"associate_attachments",
}
APPLICATION_SIDE_EFFECT_ACTIONS = {"save_application_draft", "submit_application"}
REIMBURSEMENT_SIDE_EFFECT_ACTIONS = {"create_reimbursement_draft", "link_existing_application"}
NOOP_ACTIONS = {
"fill_application_fields",
"build_application_preview",
"fill_reimbursement_fields",
"build_reimbursement_preview",
"validate_required_fields",
}
TRANSPORT_MODE_LABELS = {
"train": "火车",
"rail": "火车",
"high_speed_rail": "火车",
"flight": "飞机",
"airplane": "飞机",
"plane": "飞机",
"ship": "轮船",
"boat": "轮船",
"taxi": "出租车",
}
EXPENSE_TYPE_LABELS = {
"travel": "差旅费",
"transport": "交通费",
"hotel": "住宿费",
"meal": "业务招待费",
"meeting": "会务费",
"office": "办公用品费",
"other": "其他费用",
}
APPLICATION_TYPE_LABELS = {
"travel": "差旅费用申请",
"travel_application": "差旅费用申请",
"transport": "交通费用申请",
"hotel": "住宿费用申请",
"meeting": "会务费用申请",
"office": "办公费用申请",
}
class StewardActionExecutor:
"""执行 LangGraph 规划出的确定性白名单动作。"""
def __init__(self, db: Session) -> None:
self.db = db
def execute(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
) -> StewardActionExecuteResponse:
action_type = self._normalize_action_type(request.action_type)
trace = [self._trace("received", action_type=action_type, plan_id=request.plan_id)]
if action_type not in SUPPORTED_ACTIONS:
return self._blocked(
action_type,
f"不支持的小财管家动作:{action_type or '空动作'}",
trace=[*trace, self._trace("blocked", reason="unsupported_action")],
)
task = request.task
if task is None and action_type not in NOOP_ACTIONS:
return self._blocked(
action_type,
"动作缺少任务快照,无法安全执行。",
trace=[*trace, self._trace("blocked", reason="missing_task")],
)
blocked_reasons = self._resolve_task_blockers(task)
if blocked_reasons:
return self._blocked(
action_type,
"当前任务仍有必填信息缺失,已停止执行动作。",
blocked_reasons=blocked_reasons,
trace=[*trace, self._trace("blocked", reason="missing_fields")],
)
if action_type in NOOP_ACTIONS:
return StewardActionExecuteResponse(
action_type=action_type,
status="succeeded",
message="动作已确认,无需调用外部业务服务。",
result_payload={
"task_id": task.task_id if task is not None else "",
"action_type": action_type,
},
trace=[*trace, self._trace("completed", mode="noop")],
)
if action_type == "run_duplicate_precheck":
return self._run_duplicate_precheck(request, current_user, trace)
if action_type in APPLICATION_SIDE_EFFECT_ACTIONS:
return self._execute_application_action(request, current_user, action_type, trace)
if action_type in REIMBURSEMENT_SIDE_EFFECT_ACTIONS:
return self._execute_reimbursement_action(request, current_user, action_type, trace)
if action_type == "associate_attachments":
return self._execute_associate_attachments_action(request, current_user, trace)
return self._blocked(
action_type,
f"动作 {action_type} 暂未接入执行器。",
trace=[*trace, self._trace("blocked", reason="unwired_action")],
)
def _run_duplicate_precheck(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
payload = self._build_application_user_agent_request(
request,
current_user,
action_type="submit_application",
force_submit_message=False,
)
service = UserAgentService(self.db)
facts = service._resolve_expense_application_facts(payload)
duplicate = service._find_duplicate_expense_application_record(payload, facts)
if duplicate is not None:
result_payload = {
"status": "blocked",
"blocking": True,
"duplicate_claim_id": str(duplicate.id or ""),
"duplicate_claim_no": str(duplicate.claim_no or ""),
"duplicate_stage": str(duplicate.approval_stage or ""),
}
return StewardActionExecuteResponse(
action_type="run_duplicate_precheck",
status="blocked",
message="检测到同一申请人、同一申请类型、同一时间段已有申请单,已停止直接提交。",
blocked_reasons=["duplicate_application"],
result_payload=result_payload,
trace=[*trace, self._trace("blocked", reason="duplicate_application")],
)
result_payload = {"status": "ok", "blocking": False}
return StewardActionExecuteResponse(
action_type="run_duplicate_precheck",
status="succeeded",
message="未发现重复或冲突申请,可以继续提交。",
result_payload=result_payload,
trace=[*trace, self._trace("completed", mode="duplicate_precheck")],
)
def _execute_application_action(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
action_type: str,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
if action_type == "submit_application" and not request.confirmed:
return StewardActionExecuteResponse(
action_type=action_type,
status="needs_confirmation",
requires_confirmation=True,
message="提交申请前需要用户确认。请确认申请核对表无误后再提交。",
trace=[*trace, self._trace("needs_confirmation")],
)
if action_type == "submit_application":
precheck_blocker = self._resolve_submit_precheck_blocker(request.context_json)
if precheck_blocker:
return self._blocked(
action_type,
precheck_blocker,
blocked_reasons=["precheck_not_passed"],
trace=[*trace, self._trace("blocked", reason="precheck_not_passed")],
)
payload = self._build_application_user_agent_request(
request,
current_user,
action_type=action_type,
force_submit_message=action_type == "submit_application",
)
try:
user_agent_response = UserAgentService(self.db)._build_expense_application_response(
payload,
risk_flags=[],
)
except ValueError as exc:
return self._failed(action_type, str(exc), trace)
draft_payload = (
user_agent_response.draft_payload.model_dump(mode="json")
if user_agent_response.draft_payload is not None
else None
)
result_payload = {
"answer": user_agent_response.answer,
"suggested_actions": [
action.model_dump(mode="json")
for action in user_agent_response.suggested_actions
],
"requires_confirmation": user_agent_response.requires_confirmation,
"draft_payload": draft_payload,
}
status = "succeeded" if draft_payload is not None else "blocked"
blocked_reasons = [] if draft_payload is not None else ["application_not_persisted"]
return StewardActionExecuteResponse(
action_type=action_type,
status=status,
message=user_agent_response.answer,
blocked_reasons=blocked_reasons,
result_payload=result_payload,
trace=[*trace, self._trace("completed", service="UserAgentService")],
)
def _execute_reimbursement_action(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
action_type: str,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
context_json = self._build_reimbursement_context_json(request, current_user)
run_id = self._build_run_id(request, action_type)
ontology = OntologyParseResult(
scenario="expense",
intent="draft",
permission=OntologyPermission(
level="draft_write",
allowed=True,
reason="小财管家白名单动作创建报销草稿。",
),
confidence=1.0,
run_id=run_id,
)
try:
result = ExpenseClaimService(self.db).save_or_submit_from_ontology(
run_id=run_id,
user_id=current_user.username,
message=self._resolve_message(request),
ontology=ontology,
context_json=context_json,
)
except ValueError as exc:
return self._failed(action_type, str(exc), trace)
persisted = str(result.get("claim_id") or "").strip() and str(result.get("status") or "").strip() == "draft"
return StewardActionExecuteResponse(
action_type=action_type,
status="succeeded" if persisted else "blocked",
message=str(result.get("message") or "报销草稿已生成。").strip(),
blocked_reasons=[] if persisted else ["reimbursement_not_persisted"],
result_payload=result,
trace=[*trace, self._trace("completed", service="ExpenseClaimService")],
)
def _execute_associate_attachments_action(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
receipt_ids = self._resolve_receipt_ids(request)
if not receipt_ids:
return self._blocked(
"associate_attachments",
"关联附件前需要先提供票据夹 receipt_ids。",
blocked_reasons=["missing_receipt_ids"],
trace=[*trace, self._trace("blocked", reason="missing_receipt_ids")],
)
try:
result = AttachmentAssociationJobRunner(self.db).run(
receipt_ids=receipt_ids,
current_user=current_user,
)
except ValueError as exc:
return self._blocked(
"associate_attachments",
str(exc),
blocked_reasons=["attachment_association_blocked"],
trace=[*trace, self._trace("blocked", reason="attachment_association_blocked")],
)
except Exception as exc:
return self._failed("associate_attachments", str(exc), trace)
return StewardActionExecuteResponse(
action_type="associate_attachments",
status="succeeded",
message=str(
result.get("message")
or f"已自动关联到 {result.get('claim_no') or '报销草稿'}"
).strip(),
result_payload={
**dict(result or {}),
"receipt_ids": receipt_ids,
},
trace=[*trace, self._trace("completed", service="AttachmentAssociationJobRunner")],
)
def _build_application_user_agent_request(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
*,
action_type: str,
force_submit_message: bool,
) -> UserAgentRequest:
run_id = self._build_run_id(request, action_type)
context_json = self._build_application_context_json(request, current_user, action_type)
message = self._resolve_message(request)
if force_submit_message and "确认提交" not in message and "直接提交" not in message:
message = "\n".join([message, "确认提交"]).strip()
ontology = OntologyParseResult(
scenario="expense",
intent="operate",
permission=OntologyPermission(
level="approval_required",
allowed=True,
reason="小财管家白名单动作执行申请操作。",
),
confidence=1.0,
run_id=run_id,
)
return UserAgentRequest(
run_id=run_id,
user_id=current_user.username,
message=message,
ontology=ontology,
context_json=context_json,
tool_payload={},
selected_capability_codes=[],
degraded=False,
requires_confirmation=False,
)
def _build_application_context_json(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
action_type: str,
) -> dict[str, Any]:
fields = self._resolve_ontology_fields(request.task)
preview_fields = {
"applicationType": self._resolve_application_type_label(fields.get("expense_type")),
"time": str(fields.get("time_range") or ""),
"location": str(fields.get("location") or ""),
"reason": str(fields.get("reason") or ""),
"days": self._resolve_days_text(fields.get("time_range")),
"transportMode": self._resolve_transport_label(fields.get("transport_mode")),
"amount": str(fields.get("amount") or ""),
"applicant": current_user.name,
"department": current_user.department_name,
"position": current_user.position,
"grade": current_user.grade,
"managerName": current_user.manager_name,
}
context_json = {
**dict(request.context_json or {}),
"session_type": "application",
"entry_source": "steward_action_executor",
"document_type": "expense_application",
"application_stage": "expense_application",
"role_codes": current_user.role_codes,
"is_admin": current_user.is_admin,
"username": current_user.username,
"name": current_user.name,
"department_name": current_user.department_name,
"position": current_user.position,
"grade": current_user.grade,
"employee_no": current_user.employee_no,
"manager_name": current_user.manager_name,
"application_preview": {"fields": preview_fields},
}
if action_type == "save_application_draft":
context_json["application_action"] = "save_draft"
context_json["application_save_mode"] = True
return context_json
def _build_reimbursement_context_json(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
) -> dict[str, Any]:
fields = self._resolve_ontology_fields(request.task)
review_form_values = {
"expense_type": self._resolve_expense_type_label(fields.get("expense_type")),
"time_range": str(fields.get("time_range") or ""),
"occurred_date": str(fields.get("time_range") or ""),
"location": str(fields.get("location") or ""),
"reason": str(fields.get("reason") or ""),
"amount": str(fields.get("amount") or ""),
"transport_mode": self._resolve_transport_label(fields.get("transport_mode")),
"attachments": str(fields.get("attachments") or ""),
}
return {
**dict(request.context_json or {}),
"session_type": "expense",
"entry_source": "steward_action_executor",
"review_action": "save_draft",
"review_form_values": review_form_values,
"user_input_text": self._resolve_message(request),
"role_codes": current_user.role_codes,
"is_admin": current_user.is_admin,
"username": current_user.username,
"name": current_user.name,
"department_name": current_user.department_name,
"position": current_user.position,
"grade": current_user.grade,
"employee_no": current_user.employee_no,
"manager_name": current_user.manager_name,
}
@staticmethod
def _resolve_receipt_ids(request: StewardActionExecuteRequest) -> list[str]:
context_json = dict(request.context_json or {})
raw_values = context_json.get("receipt_ids") or context_json.get("receiptIds") or []
if isinstance(raw_values, str):
raw_values = [item.strip() for item in raw_values.split(",")]
if not isinstance(raw_values, list):
raw_values = []
receipt_ids = [
str(item or "").strip()
for item in raw_values
if str(item or "").strip()
]
if receipt_ids:
return list(dict.fromkeys(receipt_ids))
fields = StewardActionExecutor._resolve_ontology_fields(request.task)
raw_receipt = str(fields.get("receipt_ids") or fields.get("receiptIds") or "").strip()
if not raw_receipt:
return []
return list(dict.fromkeys(item.strip() for item in raw_receipt.split(",") if item.strip()))
@staticmethod
def _resolve_submit_precheck_blocker(context_json: dict[str, Any]) -> str:
precheck = context_json.get("precheck_result") or context_json.get("precheckResult")
if not isinstance(precheck, dict):
return "提交申请前需要先完成重复/冲突预检查。"
if bool(precheck.get("blocking")):
return str(precheck.get("summary") or "重复/冲突预检查未通过,已停止提交。").strip()
status = str(precheck.get("status") or "").strip().lower()
if status not in {"ok", "passed", "succeeded"}:
return "重复/冲突预检查未通过,已停止提交。"
return ""
@staticmethod
def _resolve_task_blockers(task: StewardTask | None) -> list[str]:
if task is None:
return []
return [
str(field or "").strip()
for field in list(task.missing_fields or [])
if str(field or "").strip()
]
@staticmethod
def _resolve_ontology_fields(task: StewardTask | None) -> dict[str, str]:
if task is None or not isinstance(task.ontology_fields, dict):
return {}
return {
str(key or "").strip(): str(value or "").strip()
for key, value in task.ontology_fields.items()
if str(key or "").strip() and str(value or "").strip()
}
@staticmethod
def _resolve_message(request: StewardActionExecuteRequest) -> str:
message = str(request.message or "").strip()
if message:
return message
if request.task is not None:
return str(request.task.summary or request.task.title or "").strip()
return "小财管家动作执行"
@staticmethod
def _resolve_transport_label(value: Any) -> str:
text = str(value or "").strip()
return TRANSPORT_MODE_LABELS.get(text.lower(), text)
@staticmethod
def _resolve_application_type_label(value: Any) -> str:
text = str(value or "").strip()
return APPLICATION_TYPE_LABELS.get(text.lower(), text or "费用申请")
@staticmethod
def _resolve_expense_type_label(value: Any) -> str:
text = str(value or "").strip()
return EXPENSE_TYPE_LABELS.get(text.lower(), text or "其他费用")
@staticmethod
def _resolve_days_text(value: Any) -> str:
days = resolve_application_days_from_time_range(str(value or ""))
return f"{days}" if days else ""
@staticmethod
def _normalize_action_type(value: str) -> str:
return str(value or "").strip()
@staticmethod
def _build_run_id(request: StewardActionExecuteRequest, action_type: str) -> str:
trace_id = str(request.client_trace_id or "").strip()
if trace_id:
return f"steward-action:{trace_id}"
task_id = str(request.task.task_id if request.task is not None else "").strip()
suffix = task_id or datetime.now(UTC).strftime("%Y%m%d%H%M%S%f")
return f"steward-action:{action_type}:{suffix}"
@staticmethod
def _trace(stage: str, **extra: Any) -> dict[str, Any]:
return {
"stage": stage,
"at": datetime.now(UTC).isoformat(),
**extra,
}
@staticmethod
def _blocked(
action_type: str,
message: str,
*,
blocked_reasons: list[str] | None = None,
trace: list[dict[str, Any]] | None = None,
) -> StewardActionExecuteResponse:
return StewardActionExecuteResponse(
action_type=action_type,
status="blocked",
execution_source="rule_fallback",
fallback_used=True,
message=message,
blocked_reasons=blocked_reasons or [message],
trace=trace or [],
)
def _failed(
self,
action_type: str,
message: str,
trace: list[dict[str, Any]],
) -> StewardActionExecuteResponse:
return StewardActionExecuteResponse(
action_type=action_type,
status="failed",
message=message or "动作执行失败。",
blocked_reasons=[message] if message else [],
trace=[*trace, self._trace("failed", reason=message)],
)

View File

@@ -0,0 +1,204 @@
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from sqlalchemy.orm import Session
from app.api.deps import CurrentUserContext
from app.models.agent_conversation import AgentConversation
from app.schemas.steward import StewardActionExecuteRequest, StewardActionExecuteResponse
from app.services.agent_conversations import AgentConversationService
from app.services.steward_action_executor import StewardActionExecutor
ACTION_CHECKPOINT_KEY = "steward_action_checkpoint"
TERMINAL_ACTION_STATUSES = {"succeeded", "blocked", "failed"}
class StewardGraphActionState(TypedDict, total=False):
request: StewardActionExecuteRequest
current_user: CurrentUserContext
conversation: AgentConversation | None
trace_id: str
existing_result: dict[str, Any]
response: StewardActionExecuteResponse
class StewardGraphActionRuntime:
"""用 LangGraph 包装小财管家白名单动作执行、checkpoint 和幂等重放。"""
def __init__(self, db: Session) -> None:
self.db = db
self.executor = StewardActionExecutor(db)
self._graph = self._build_graph()
def execute(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
) -> StewardActionExecuteResponse:
final_state = self._graph.invoke(
{
"request": request,
"current_user": current_user,
}
)
response = final_state.get("response")
if not isinstance(response, StewardActionExecuteResponse):
raise RuntimeError("LangGraph action runtime 未生成有效执行结果。")
return response
def _build_graph(self):
graph = StateGraph(StewardGraphActionState)
graph.add_node("action_checkpoint_load", self._load_checkpoint)
graph.add_node("action_execute_node", self._execute_action)
graph.add_node("action_checkpoint_persist", self._persist_checkpoint)
graph.add_edge(START, "action_checkpoint_load")
graph.add_conditional_edges(
"action_checkpoint_load",
self._route_after_checkpoint_load,
{
"replay": "action_checkpoint_persist",
"execute": "action_execute_node",
},
)
graph.add_edge("action_execute_node", "action_checkpoint_persist")
graph.add_edge("action_checkpoint_persist", END)
return graph.compile()
def _load_checkpoint(self, state: StewardGraphActionState) -> dict[str, Any]:
request = state["request"]
conversation = self._get_or_create_conversation(request, state["current_user"])
trace_id = self._resolve_trace_id(request)
if conversation is None or not trace_id:
return {
"conversation": conversation,
"trace_id": trace_id,
}
checkpoint = self._resolve_checkpoint(conversation)
existing = dict(checkpoint.get("actions", {}).get(trace_id) or {})
existing_response = existing.get("response")
status = str(existing.get("status") or "").strip()
if isinstance(existing_response, dict) and (
status in TERMINAL_ACTION_STATUSES
or (status == "needs_confirmation" and not request.confirmed)
):
response = StewardActionExecuteResponse.model_validate(existing_response)
response.result_payload = {
**dict(response.result_payload or {}),
"idempotent_replay": True,
}
response.trace = [
*list(response.trace or []),
self._trace("checkpoint_replay", client_trace_id=trace_id),
]
return {
"conversation": conversation,
"trace_id": trace_id,
"existing_result": existing,
"response": response,
}
return {
"conversation": conversation,
"trace_id": trace_id,
"existing_result": existing,
}
@staticmethod
def _route_after_checkpoint_load(state: StewardGraphActionState) -> str:
if isinstance(state.get("response"), StewardActionExecuteResponse):
return "replay"
return "execute"
def _execute_action(self, state: StewardGraphActionState) -> dict[str, StewardActionExecuteResponse]:
response = self.executor.execute(state["request"], state["current_user"])
return {"response": response}
def _persist_checkpoint(self, state: StewardGraphActionState) -> dict[str, StewardActionExecuteResponse]:
response = state["response"]
conversation = state.get("conversation")
trace_id = str(state.get("trace_id") or "").strip()
if conversation is None or not trace_id:
return {"response": response}
checkpoint = self._resolve_checkpoint(conversation)
actions = dict(checkpoint.get("actions") or {})
request = state["request"]
response_payload = response.model_dump(mode="json")
actions[trace_id] = {
"client_trace_id": trace_id,
"action_type": response.action_type,
"status": response.status,
"request": request.model_dump(mode="json"),
"response": response_payload,
"updated_at": datetime.now(UTC).isoformat(),
}
checkpoint["actions"] = actions
if response.status == "needs_confirmation":
checkpoint["pending_interrupt"] = {
"client_trace_id": trace_id,
"action_type": response.action_type,
"message": response.message,
"requires_confirmation": True,
"updated_at": datetime.now(UTC).isoformat(),
}
elif str(checkpoint.get("pending_interrupt", {}).get("client_trace_id") or "") == trace_id:
checkpoint["pending_interrupt"] = {}
conversation.state_json = {
**dict(conversation.state_json or {}),
ACTION_CHECKPOINT_KEY: checkpoint,
}
conversation.updated_at = datetime.now(UTC)
self.db.add(conversation)
self.db.commit()
return {"response": response}
def _get_or_create_conversation(
self,
request: StewardActionExecuteRequest,
current_user: CurrentUserContext,
) -> AgentConversation | None:
conversation_id = str(request.conversation_id or "").strip()
if not conversation_id:
return None
return AgentConversationService(self.db).get_or_create_conversation(
conversation_id=conversation_id,
user_id=current_user.username,
source="user_message",
context_json={
"session_type": "steward",
"entry_source": "steward_action_executor",
"steward_state": dict((request.context_json or {}).get("steward_state") or {}),
},
)
@staticmethod
def _resolve_checkpoint(conversation: AgentConversation) -> dict[str, Any]:
checkpoint = dict((conversation.state_json or {}).get(ACTION_CHECKPOINT_KEY) or {})
checkpoint.setdefault("actions", {})
checkpoint.setdefault("pending_interrupt", {})
return checkpoint
@staticmethod
def _resolve_trace_id(request: StewardActionExecuteRequest) -> str:
trace_id = str(request.client_trace_id or "").strip()
if trace_id:
return trace_id
action_type = str(request.action_type or "").strip()
task_id = str(request.task.task_id if request.task is not None else "").strip()
if action_type and task_id:
return f"{action_type}:{task_id}"
return ""
@staticmethod
def _trace(stage: str, **extra: Any) -> dict[str, Any]:
return {
"stage": stage,
"at": datetime.now(UTC).isoformat(),
**extra,
}

View File

@@ -0,0 +1,220 @@
from __future__ import annotations
from datetime import date
from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from app.schemas.steward import StewardPlanRequest, StewardPlanResponse
from app.services.steward_action_contracts import StewardActionPlanBuilder
from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER
from app.services.steward_intent_agent import StewardIntentAgent, StewardIntentAgentResult
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
from app.services.steward_off_topic_agent import StewardOffTopicAgent
from app.services.steward_planner_extraction import StewardPlannerExtractionMixin
from app.services.steward_planner_fallback import StewardPlannerFallbackMixin
class StewardGraphState(TypedDict, total=False):
request: StewardPlanRequest
message: str
base_date: date
scenario: str | None
should_use_model: bool
intent_result: StewardIntentAgentResult | None
plan: StewardPlanResponse
model_call_traces: list[dict[str, Any]]
fallback_reason: str
class StewardGraphPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractionMixin):
"""用 LangGraph 编排小财管家的意图识别、流程判断和兜底路径。"""
def __init__(
self,
intent_agent: StewardIntentAgent | None = None,
off_topic_agent: StewardOffTopicAgent | None = None,
) -> None:
self.intent_agent = intent_agent
self.off_topic_agent = off_topic_agent
self._graph = self._build_graph()
def build_plan(self, request: StewardPlanRequest) -> StewardPlanResponse:
final_state = self._graph.invoke(
{
"request": request,
"model_call_traces": [],
"fallback_reason": "",
}
)
plan = final_state.get("plan")
if not isinstance(plan, StewardPlanResponse):
raise RuntimeError("LangGraph 小财管家规划未生成有效计划。")
return plan
def _build_graph(self):
graph = StateGraph(StewardGraphState)
graph.add_node("prepare_context", self._prepare_context)
graph.add_node("detect_model_intent", self._detect_model_intent)
graph.add_node("build_off_topic_plan", self._build_off_topic_graph_plan)
graph.add_node("build_rule_fallback_plan", self._build_rule_fallback_graph_plan)
graph.add_node("attach_action_steps", self._attach_action_steps)
graph.add_edge(START, "prepare_context")
graph.add_conditional_edges(
"prepare_context",
self._route_after_prepare_context,
{
"model": "detect_model_intent",
"off_topic": "build_off_topic_plan",
"fallback": "build_rule_fallback_plan",
},
)
graph.add_conditional_edges(
"detect_model_intent",
self._route_after_model_intent,
{
"done": "attach_action_steps",
"off_topic": "build_off_topic_plan",
"fallback": "build_rule_fallback_plan",
},
)
graph.add_edge("build_off_topic_plan", "attach_action_steps")
graph.add_edge("build_rule_fallback_plan", "attach_action_steps")
graph.add_edge("attach_action_steps", END)
return graph.compile()
def _prepare_context(self, state: StewardGraphState) -> dict[str, Any]:
request = state["request"]
message = self._clean_text(request.message)
if not message:
raise ValueError("小财管家需要一段任务描述。")
base_date = self._resolve_base_date(request.client_now_iso, request.context_json)
return {
"message": message,
"base_date": base_date,
"scenario": self._classify_irrelevant_input(message, request),
"should_use_model": bool(
self.intent_agent is not None
and self._should_use_model_intent_recognition(message, base_date, request)
),
}
@staticmethod
def _route_after_prepare_context(state: StewardGraphState) -> str:
if state.get("should_use_model"):
return "model"
if state.get("scenario") is not None:
return "off_topic"
return "fallback"
def _detect_model_intent(self, state: StewardGraphState) -> dict[str, Any]:
request = state["request"]
message = state["message"]
base_date = state["base_date"]
model_call_traces: list[dict[str, Any]] = []
if self.intent_agent is None:
return {}
try:
intent_result = self.intent_agent.detect(
request,
base_date=base_date,
canonical_fields=list(BUSINESS_CANONICAL_FIELD_ORDER),
)
if intent_result is None:
return {
"model_call_traces": self._last_intent_call_traces(model_call_traces),
"fallback_reason": (
"主模型未返回可用的 function calling 计划,已切换到规则兜底。"
),
}
model_call_traces = intent_result.model_call_traces
llm_plan = StewardModelPlanBuilder(self).build(
intent_result,
request=request,
base_date=base_date,
)
if llm_plan is None:
return {
"model_call_traces": self._last_intent_call_traces(model_call_traces),
"fallback_reason": (
"主模型未返回可用的 function calling 计划,已切换到规则兜底。"
),
}
if self._looks_like_ambiguous_travel_flow(message, base_date, request):
return {
"plan": self._build_pending_flow_fallback_plan(
request,
base_date=base_date,
model_call_traces=model_call_traces,
fallback_reason=(
"主模型返回了直接任务,但当前话术没有明确申请或报销动作;"
"服务端已改为候选流程确认,避免误入申请流程。"
),
planning_source="llm_function_call",
),
"model_call_traces": model_call_traces,
}
return {
"intent_result": intent_result,
"plan": llm_plan,
"model_call_traces": model_call_traces,
}
except Exception as exc:
return {
"model_call_traces": self._last_intent_call_traces(model_call_traces),
"fallback_reason": f"主模型 function calling 调用失败,已切换到规则兜底:{exc}",
}
@staticmethod
def _route_after_model_intent(state: StewardGraphState) -> str:
if isinstance(state.get("plan"), StewardPlanResponse):
return "done"
if state.get("scenario") is not None:
return "off_topic"
return "fallback"
def _build_off_topic_graph_plan(
self,
state: StewardGraphState,
) -> dict[str, StewardPlanResponse]:
return {
"plan": self._build_off_topic_plan(
state["request"],
scenario=str(state["scenario"] or ""),
model_call_traces=state.get("model_call_traces"),
fallback_reason=str(state.get("fallback_reason") or ""),
)
}
def _build_rule_fallback_graph_plan(
self,
state: StewardGraphState,
) -> dict[str, StewardPlanResponse]:
return {
"plan": self._build_rule_fallback_plan(
state["request"],
base_date=state["base_date"],
model_call_traces=state.get("model_call_traces"),
fallback_reason=str(state.get("fallback_reason") or ""),
)
}
@staticmethod
def _attach_action_steps(state: StewardGraphState) -> dict[str, StewardPlanResponse]:
plan = state.get("plan")
if not isinstance(plan, StewardPlanResponse):
raise RuntimeError("LangGraph 小财管家动作规划缺少有效计划。")
return {"plan": StewardActionPlanBuilder().attach_action_steps(plan)}
def _last_intent_call_traces(
self,
fallback_traces: list[dict[str, Any]],
) -> list[dict[str, Any]]:
return getattr(self.intent_agent, "last_call_traces", []) or fallback_traces

View File

@@ -0,0 +1,214 @@
from __future__ import annotations
from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from app.schemas.steward import (
StewardRuntimeDecisionRequest,
StewardRuntimeDecisionResponse,
StewardSlotDecisionRequest,
StewardSlotDecisionResponse,
)
from app.services.runtime_chat import RuntimeChatService
from app.services.steward_runtime_decision_agent import StewardRuntimeDecisionAgent
from app.services.steward_slot_decision_agent import StewardSlotDecisionAgent
class StewardSlotGraphState(TypedDict, total=False):
request: StewardSlotDecisionRequest
normalized_request: StewardSlotDecisionRequest
decision: StewardSlotDecisionResponse
model_call_traces: list[dict[str, Any]]
fallback_reason: str
class StewardRuntimeGraphState(TypedDict, total=False):
request: StewardRuntimeDecisionRequest
normalized_request: StewardRuntimeDecisionRequest
action_decision: StewardRuntimeDecisionResponse
decision: StewardRuntimeDecisionResponse
model_call_traces: list[dict[str, Any]]
fallback_reason: str
class StewardGraphRuntime:
"""用 LangGraph 编排会话内槽位、记忆和行动决策。"""
def __init__(self, runtime_chat_service: RuntimeChatService) -> None:
self.slot_agent = StewardSlotDecisionAgent(runtime_chat_service)
self.runtime_agent = StewardRuntimeDecisionAgent(runtime_chat_service)
self._slot_graph = self._build_slot_graph()
self._runtime_graph = self._build_runtime_graph()
def decide_slot(self, request: StewardSlotDecisionRequest) -> StewardSlotDecisionResponse:
final_state = self._slot_graph.invoke(
{
"request": request,
"model_call_traces": [],
"fallback_reason": "",
}
)
decision = final_state.get("decision")
if not isinstance(decision, StewardSlotDecisionResponse):
raise RuntimeError("LangGraph 槽位决策未生成有效结果。")
return decision
def decide_runtime(self, request: StewardRuntimeDecisionRequest) -> StewardRuntimeDecisionResponse:
final_state = self._runtime_graph.invoke(
{
"request": request,
"model_call_traces": [],
"fallback_reason": "",
}
)
decision = final_state.get("decision")
if not isinstance(decision, StewardRuntimeDecisionResponse):
raise RuntimeError("LangGraph 运行时决策未生成有效结果。")
return decision
def _build_slot_graph(self):
graph = StateGraph(StewardSlotGraphState)
graph.add_node("slot_prepare_context", self._slot_prepare_context)
graph.add_node("slot_tool_decision", self._slot_tool_decision)
graph.add_node("slot_rule_fallback", self._slot_rule_fallback)
graph.add_edge(START, "slot_prepare_context")
graph.add_edge("slot_prepare_context", "slot_tool_decision")
graph.add_conditional_edges(
"slot_tool_decision",
self._route_after_slot_tool_decision,
{
"done": END,
"fallback": "slot_rule_fallback",
},
)
graph.add_edge("slot_rule_fallback", END)
return graph.compile()
def _build_runtime_graph(self):
graph = StateGraph(StewardRuntimeGraphState)
graph.add_node("runtime_memory_context", self._runtime_memory_context)
graph.add_node("runtime_action_decision", self._runtime_action_decision)
graph.add_node("runtime_tool_decision", self._runtime_tool_decision)
graph.add_node("runtime_rule_fallback", self._runtime_rule_fallback)
graph.add_edge(START, "runtime_memory_context")
graph.add_conditional_edges(
"runtime_memory_context",
self._route_after_runtime_memory_context,
{
"action": "runtime_action_decision",
"tool": "runtime_tool_decision",
},
)
graph.add_edge("runtime_action_decision", END)
graph.add_conditional_edges(
"runtime_tool_decision",
self._route_after_runtime_tool_decision,
{
"done": END,
"fallback": "runtime_rule_fallback",
},
)
graph.add_edge("runtime_rule_fallback", END)
return graph.compile()
def _slot_prepare_context(self, state: StewardSlotGraphState) -> dict[str, Any]:
return {
"normalized_request": self.slot_agent._normalize_request(state["request"]),
}
def _slot_tool_decision(self, state: StewardSlotGraphState) -> dict[str, Any]:
request = state.get("normalized_request") or state["request"]
try:
return {"decision": self.slot_agent.decide(request)}
except Exception as exc:
return {
"model_call_traces": [
*state.get("model_call_traces", []),
self._build_failure_trace("langgraph_slot_decision", exc),
],
"fallback_reason": f"LangGraph 槽位工具节点失败,已切换规则兜底:{exc}",
}
@staticmethod
def _route_after_slot_tool_decision(state: StewardSlotGraphState) -> str:
if isinstance(state.get("decision"), StewardSlotDecisionResponse):
return "done"
return "fallback"
def _slot_rule_fallback(self, state: StewardSlotGraphState) -> dict[str, StewardSlotDecisionResponse]:
request = state.get("normalized_request") or self.slot_agent._normalize_request(state["request"])
return {
"decision": self.slot_agent._build_rule_fallback(
request,
state.get("model_call_traces", []),
)
}
def _runtime_memory_context(self, state: StewardRuntimeGraphState) -> dict[str, Any]:
request = self.runtime_agent._normalize_request(state["request"])
action_decision = self.runtime_agent._build_selected_flow_decision(request, [])
update: dict[str, Any] = {"normalized_request": request}
if action_decision is not None:
update["action_decision"] = action_decision
return update
@staticmethod
def _route_after_runtime_memory_context(state: StewardRuntimeGraphState) -> str:
if isinstance(state.get("action_decision"), StewardRuntimeDecisionResponse):
return "action"
return "tool"
def _runtime_action_decision(
self,
state: StewardRuntimeGraphState,
) -> dict[str, StewardRuntimeDecisionResponse]:
return {"decision": state["action_decision"]}
def _runtime_tool_decision(self, state: StewardRuntimeGraphState) -> dict[str, Any]:
request = state.get("normalized_request") or state["request"]
try:
return {"decision": self.runtime_agent.decide(request)}
except Exception as exc:
return {
"model_call_traces": [
*state.get("model_call_traces", []),
self._build_failure_trace("langgraph_runtime_decision", exc),
],
"fallback_reason": f"LangGraph 运行时工具节点失败,已切换规则兜底:{exc}",
}
@staticmethod
def _route_after_runtime_tool_decision(state: StewardRuntimeGraphState) -> str:
if isinstance(state.get("decision"), StewardRuntimeDecisionResponse):
return "done"
return "fallback"
def _runtime_rule_fallback(
self,
state: StewardRuntimeGraphState,
) -> dict[str, StewardRuntimeDecisionResponse]:
request = state.get("normalized_request") or self.runtime_agent._normalize_request(state["request"])
decision = self.runtime_agent._build_rule_fallback(
request,
state.get("model_call_traces", []),
)
return {
"decision": self.runtime_agent._attach_updated_steward_state(
decision,
request,
)
}
@staticmethod
def _build_failure_trace(slot: str, exc: Exception) -> dict[str, Any]:
return {
"slot": slot,
"provider": "langgraph",
"model": "",
"attempt": 1,
"status": "failed",
"error": str(exc),
}

View File

@@ -42,8 +42,9 @@ class StewardIntentAgent:
},
max_tokens=1800,
temperature=0.1,
timeout_seconds=45,
max_attempts=1,
timeout_seconds=10,
max_attempts=3,
use_failure_cooldown=False,
)
self.last_call_traces = result.calls_as_dicts()
if result.tool_call is None or result.tool_call.name != STEWARD_INTENT_FUNCTION_NAME:
@@ -113,6 +114,8 @@ class StewardIntentAgent:
"candidate_flows 同时给出 travel_application 和 travel_reimbursementtasks 保持空数组。"
"所有 ontology_fields 只能使用调用方给出的 canonical_ontology_fields"
"如果输入里出现 occurred_date、transport_type、reason_value 等别名,必须映射为 canonical 字段。"
"每个 task 必须输出 requested_action用户只是要求整理/发起但未说保存或提交时为 preview"
"用户说保存草稿、先保存、存草稿时为 save_draft用户说直接提交、提交申请、确认提交时为 submit。"
"相对日期必须以 base_date 为准转换为明确日期。"
"thinking_events 只能是面向用户的过程摘要,不能暴露内部推理链。"
"如果用户输入与出差、费用、报销、申请等财务事项完全无关"
@@ -165,6 +168,10 @@ class StewardIntentAgent:
"minimum": 0,
"maximum": 1,
},
"requested_action": {
"type": "string",
"enum": ["preview", "save_draft", "submit"],
},
"ontology_fields": {
"type": "object",
"additionalProperties": {"type": "string"},
@@ -182,6 +189,7 @@ class StewardIntentAgent:
"title",
"summary",
"confidence",
"requested_action",
"ontology_fields",
"missing_fields",
],

View File

@@ -157,6 +157,10 @@ class StewardModelPlanBuilder:
task_type=task_type,
fields=fields,
)
requested_action = self._sanitize_requested_action(
raw_task.get("requested_action"),
fallback_text=request.message,
)
tasks.append(
StewardTask(
task_id=task_id,
@@ -171,6 +175,7 @@ class StewardModelPlanBuilder:
fields=fields,
task_type=task_type,
),
requested_action=requested_action, # type: ignore[arg-type]
ontology_fields=fields,
missing_fields=missing_fields,
confirmation_required=True,
@@ -441,6 +446,17 @@ class StewardModelPlanBuilder:
missing_fields.append(key)
return missing_fields
def _sanitize_requested_action(self, raw_action: Any, *, fallback_text: str = "") -> str:
action = self.planner._clean_text(raw_action)
if action in {"preview", "save_draft", "submit"}:
return action
compact = self.planner._clean_text(fallback_text).replace(" ", "")
if any(keyword in compact for keyword in ("直接提交", "提交申请", "确认提交", "提交审批")):
return "submit"
if any(keyword in compact for keyword in ("保存草稿", "存草稿", "先保存")):
return "save_draft"
return "preview"
def _resolve_model_confidence(
self,
value: Any,
@@ -459,6 +475,8 @@ class StewardModelPlanBuilder:
if not cleaned:
return ""
if key == "time_range":
if re.search(r"(?:至|到|~||—|--|-).*\d", cleaned):
return cleaned
return self.planner._extract_time_range(cleaned, base_date) or cleaned
if key == "expense_type":
return self._normalize_expense_type_value(cleaned)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import Any
from app.schemas.steward import StewardPlanRequest, StewardPlanResponse
from app.services.steward_action_contracts import StewardActionPlanBuilder
from app.services.steward_constants import BUSINESS_CANONICAL_FIELD_ORDER
from app.services.steward_intent_agent import StewardIntentAgent
from app.services.steward_model_plan_builder import StewardModelPlanBuilder
@@ -28,10 +29,7 @@ class StewardPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractio
raise ValueError("小财管家需要一段任务描述。")
base_date = self._resolve_base_date(request.client_now_iso, request.context_json)
# 业务无关输入拦截(纯数字、问候、闲聊、乱码等):在进入 LLM/规则兜底之前直接返回 off_topic 计划。
scenario = self._classify_irrelevant_input(message, request)
if scenario is not None:
return self._build_off_topic_plan(request, scenario=scenario)
model_call_traces: list[dict[str, Any]] = []
fallback_reason = ""
if self.intent_agent is not None and self._should_use_model_intent_recognition(message, base_date, request):
@@ -50,26 +48,44 @@ class StewardPlannerService(StewardPlannerFallbackMixin, StewardPlannerExtractio
)
if llm_plan is not None:
if self._looks_like_ambiguous_travel_flow(message, base_date, request):
return self._build_pending_flow_fallback_plan(
request,
base_date=base_date,
model_call_traces=model_call_traces,
fallback_reason=(
"主模型返回了直接任务,但当前话术没有明确申请或报销动作;"
"服务端已改为候选流程确认,避免误入申请流程。"
),
planning_source="llm_function_call",
return self._with_action_steps(
self._build_pending_flow_fallback_plan(
request,
base_date=base_date,
model_call_traces=model_call_traces,
fallback_reason=(
"主模型返回了直接任务,但当前话术没有明确申请或报销动作;"
"服务端已改为候选流程确认,避免误入申请流程。"
),
planning_source="llm_function_call",
)
)
return llm_plan
return self._with_action_steps(llm_plan)
model_call_traces = getattr(self.intent_agent, "last_call_traces", []) or model_call_traces
fallback_reason = "主模型未返回可用的 function calling 计划,已切换到规则兜底。"
except Exception as exc:
model_call_traces = getattr(self.intent_agent, "last_call_traces", []) or model_call_traces
fallback_reason = f"主模型 function calling 调用失败,已切换到规则兜底:{exc}"
return self._build_rule_fallback_plan(
request,
base_date=base_date,
model_call_traces=model_call_traces,
fallback_reason=fallback_reason,
if scenario is not None:
return self._with_action_steps(
self._build_off_topic_plan(
request,
scenario=scenario,
model_call_traces=model_call_traces,
fallback_reason=fallback_reason,
)
)
return self._with_action_steps(
self._build_rule_fallback_plan(
request,
base_date=base_date,
model_call_traces=model_call_traces,
fallback_reason=fallback_reason,
)
)
@staticmethod
def _with_action_steps(plan: StewardPlanResponse) -> StewardPlanResponse:
return StewardActionPlanBuilder().attach_action_steps(plan)

View File

@@ -133,6 +133,7 @@ class StewardPlannerExtractionMixin:
summary=self._build_task_summary(draft.segment, fields),
status="needs_confirmation",
confidence=self._resolve_task_confidence(draft.segment, fields, draft.task_type),
requested_action=self._resolve_requested_action(draft.segment), # type: ignore[arg-type]
ontology_fields=fields,
missing_fields=missing_fields,
confirmation_required=True,
@@ -205,6 +206,15 @@ class StewardPlannerExtractionMixin:
def _extract_transport_mode(segment: str) -> str:
return ApplicationFactResolver.extract_transport_mode(segment)
@staticmethod
def _resolve_requested_action(segment: str) -> str:
compact = re.sub(r"\s+", "", segment)
if re.search(r"直接提交|提交申请|确认提交|提交审批", compact):
return "submit"
if re.search(r"保存草稿|存草稿|先保存|保存", compact):
return "save_draft"
return "preview"
@staticmethod
def _resolve_missing_fields(task_type: str, fields: dict[str, str]) -> list[str]:
required = ["expense_type", "time_range", "reason"]
@@ -575,4 +585,3 @@ class StewardPlannerExtractionMixin:
@staticmethod
def _clean_text(value: Any) -> str:
return re.sub(r"\s+", " ", str(value or "")).strip()

View File

@@ -34,9 +34,8 @@ class StewardPlannerFallbackMixin:
base_date: date,
request: StewardPlanRequest,
) -> bool:
if self._looks_like_ambiguous_travel_flow(message, base_date, request):
return False
return self._has_multiple_financial_demands(message)
return bool(message.strip())
@staticmethod
def _is_business_irrelevant_input(message: str, request: StewardPlanRequest) -> bool:
@@ -105,12 +104,14 @@ class StewardPlannerFallbackMixin:
request: StewardPlanRequest,
*,
scenario: str,
model_call_traces: list[dict[str, Any]] | None = None,
fallback_reason: str = "",
) -> StewardPlanResponse:
"""业务无关输入的兜底计划根据场景给出对应引导off_business 场景可由 LLM 增强。"""
base_summary = self._default_off_topic_summary(scenario)
thinking_event = self._build_off_topic_thinking_event(scenario)
suggested_prompts = self._off_topic_suggested_prompts(scenario)
model_call_traces: list[dict[str, Any]] = []
traces = list(model_call_traces or [])
# 仅对 off_business 场景尝试让 LLM 生成多样化引导;问候/无意义场景用规则模板即可。
if (
@@ -121,24 +122,36 @@ class StewardPlannerFallbackMixin:
llm_result = self.off_topic_agent.generate(request, scenario=scenario)
if llm_result is not None and llm_result.response_text:
base_summary = llm_result.response_text
model_call_traces = llm_result.model_call_traces
traces = llm_result.model_call_traces
except Exception:
# 失败时静默回退到规则模板
pass
thinking_events = [thinking_event]
if fallback_reason:
thinking_events.insert(
0,
StewardThinkingEvent(
event_id="intent_agent_rule_fallback",
stage="rule_fallback",
title="意图识别智能体进入兜底模式",
content=fallback_reason,
),
)
return StewardPlanResponse(
plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}",
plan_status="off_topic",
planning_source="rule_fallback",
next_action="none",
summary=base_summary,
thinking_events=[thinking_event],
thinking_events=thinking_events,
tasks=[],
attachment_groups=[],
confirmation_groups=[],
candidate_flows=[],
suggested_prompts=suggested_prompts,
model_call_traces=model_call_traces,
model_call_traces=traces,
)
@staticmethod
@@ -435,4 +448,3 @@ class StewardPlannerFallbackMixin:
)
return drafts