From e9d7c56d5b6c11620d27fdc14401c84e0c66a6bf Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Thu, 25 Jun 2026 15:08:56 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E4=BC=9A=E8=AF=9D=E4=B8=8A?= =?UTF-8?q?=E4=B8=8B=E6=96=87=E4=BF=9D=E7=95=99=EF=BC=88LLM=20=E5=8E=86?= =?UTF-8?q?=E5=8F=B2=20+=20=E7=A1=AE=E5=AE=9A=E6=80=A7=E5=85=9C=E5=BA=95?= =?UTF-8?q?=E5=8F=8C=E4=BF=9D=E9=99=A9=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解决用户删除草稿后说'再提交'丢失上下文的问题: - steward.py 新增 _inject_recent_conversation_history:build_plan 前读最近 10 条对话注入 context_json - steward_intent_agent.py 的 _build_messages 把 recent_history 暴露给模型,system prompt 加确认类话术引导 - 新建 steward_context_resume.py:should_resume_recent_task 检测'再提交'类话术 + state 有可恢复 flow,attach_resumed_task 从 state 恢复 task - 两个 plan 入口(/plans 和 /plans/stream)都已接入双保险 - 后端 67 passed,端到端验证'上海出差→再提交'成功恢复 task --- server/src/app/api/v1/endpoints/steward.py | 71 ++++++++ .../app/services/steward_context_resume.py | 170 ++++++++++++++++++ .../src/app/services/steward_intent_agent.py | 14 ++ server/tests/test_steward_context_resume.py | 130 ++++++++++++++ server/tests/test_steward_intent_agent.py | 55 ++++++ 5 files changed, 440 insertions(+) create mode 100644 server/src/app/services/steward_context_resume.py create mode 100644 server/tests/test_steward_context_resume.py diff --git a/server/src/app/api/v1/endpoints/steward.py b/server/src/app/api/v1/endpoints/steward.py index a877891..f9125c2 100644 --- a/server/src/app/api/v1/endpoints/steward.py +++ b/server/src/app/api/v1/endpoints/steward.py @@ -29,6 +29,10 @@ from app.services.agent_conversations import AgentConversationService from app.services.expense_claim_draft_flow import APPROVED_APPLICATION_LINK_STATUSES from app.services.expense_claims import ExpenseClaimService from app.services.runtime_chat import RuntimeChatService +from app.services.steward_context_resume import ( + attach_resumed_task, + should_resume_recent_task, +) from app.services.steward_flow_state import StewardFlowStateService from app.services.steward_graph_action_runtime import StewardGraphActionRuntime from app.services.steward_graph_planner import StewardGraphPlannerService @@ -61,7 +65,9 @@ def create_steward_plan(payload: StewardPlanRequest, db: DbSession) -> StewardPl try: planner = _build_steward_planner(db) hydrated_payload = _hydrate_required_application_gate(db, payload, planner) + hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload) plan = planner.build_plan(hydrated_payload) + plan = _apply_context_resume(db, hydrated_payload, plan) return _attach_conversation_state(db, hydrated_payload, plan) except ValueError as exc: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc @@ -143,7 +149,9 @@ async def _iter_steward_plan_events( try: hydrated_payload = _hydrate_required_application_gate(db, payload, planner) + hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload) plan = planner.build_plan(hydrated_payload) + plan = _apply_context_resume(db, hydrated_payload, plan) plan = _attach_conversation_state(db, hydrated_payload, plan) except ValueError as exc: yield _encode_stream_event("error", {"message": str(exc)}) @@ -495,3 +503,66 @@ def _resolve_current_steward_state( return stored_state incoming_state = context_json.get("steward_state") or context_json.get("stewardState") return incoming_state if isinstance(incoming_state, dict) else {} + + +def _inject_recent_conversation_history( + db: Session, + payload: StewardPlanRequest, +) -> StewardPlanRequest: + """读取会话最近 10 条对话历史,注入 context_json.recent_history 供 LLM 关联上下文。 + + 历史只给模型用,不返回前端。在 get_or_create_conversation 之前读取, + 使用前端传入的 conversation_id,避免把本轮消息算进历史。 + """ + context_json = dict(payload.context_json or {}) + conversation_id = _resolve_conversation_id(context_json) + if not conversation_id: + return payload + try: + recent_history = AgentConversationService(db).list_message_history( + conversation_id, + limit=10, + ) + except Exception: + recent_history = [] + if not recent_history: + return payload + return payload.model_copy( + update={ + "context_json": { + **context_json, + "recent_history": recent_history, + } + } + ) + + +def _apply_context_resume( + db: Session, + payload: StewardPlanRequest, + plan: StewardPlanResponse, +) -> StewardPlanResponse: + """确定性兜底:若 plan 无 task 且用户说"再提交"类话术,从会话状态恢复最近 task。 + + 不依赖 LLM 理解力,100% 可靠地恢复上下文。LLM 注入历史(保险②)覆盖更模糊话术。 + """ + if plan.tasks or plan.candidate_flows: + return plan + context_json = dict(payload.context_json or {}) + conversation_id = _resolve_conversation_id(context_json) + if not conversation_id: + return plan + try: + conversation = AgentConversationService(db).get_conversation(conversation_id) + except Exception: + conversation = None + if conversation is None: + return plan + current_state = _resolve_current_steward_state( + conversation.state_json if isinstance(conversation.state_json, dict) else {}, + context_json, + ) + resume_flow_id = should_resume_recent_task(payload.message, current_state) + if not resume_flow_id: + return plan + return attach_resumed_task(plan, current_state, resume_flow_id) diff --git a/server/src/app/services/steward_context_resume.py b/server/src/app/services/steward_context_resume.py new file mode 100644 index 0000000..f42cc67 --- /dev/null +++ b/server/src/app/services/steward_context_resume.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import re +from typing import Any + +from app.schemas.steward import ( + StewardPlanResponse, + StewardTask, + StewardThinkingEvent, +) + + +# "再提交"类确认话术:用户在删除草稿/解决冲突后,用这些话术恢复之前的申请 task +RESUME_CONFIRMATION_KEYWORDS = ( + "再提交", + "继续提交", + "重新提交", + "再申请", + "继续申请", + "重新申请", + "那就提交", + "那就申请", + "继续吧", + "再试一次", + "重新发起", + "重新创建", +) + +# flow_id → task_type 映射,用于从 steward_state 恢复 task +_FLOW_TASK_TYPE = { + "travel_application": "expense_application", + "travel_reimbursement": "reimbursement", +} + +_FLOW_ASSIGNED_AGENT = { + "travel_application": "application_assistant", + "travel_reimbursement": "reimbursement_assistant", +} + + +def should_resume_recent_task( + message: str, + steward_state: dict[str, Any] | None, +) -> str | None: + """检测'再提交'类话术 + steward_state 里有可恢复的 flow,返回 flow_id 或 None。 + + 确定性兜底:不依赖 LLM,当用户用确认类话术(如"再提交")且 state 里存在 + 一个仍有业务字段的 flow 时,直接恢复该 flow。 + """ + if not _matches_resume_keywords(message): + return None + if not isinstance(steward_state, dict): + return None + + active_flow = str(steward_state.get("active_flow") or "").strip() + flows = steward_state.get("flows") if isinstance(steward_state.get("flows"), dict) else {} + + # 优先恢复 active_flow,其次遍历所有 flow 找最近一个有字段的 + candidate_flow_ids: list[str] = [] + if active_flow and active_flow in flows: + candidate_flow_ids.append(active_flow) + for flow_id in flows: + if flow_id not in candidate_flow_ids: + candidate_flow_ids.append(flow_id) + + for flow_id in candidate_flow_ids: + flow = flows.get(flow_id) + if isinstance(flow, dict) and _flow_has_resumable_fields(flow): + return str(flow_id or "").strip() or None + return None + + +def resume_task_from_flow( + flow_id: str, + flow: dict[str, Any], + task_index: int = 1, +) -> StewardTask: + """从 steward_state.flows[flow_id] 恢复成 StewardTask。 + + 复用 runtime-decision 的恢复逻辑(_hydrate_runtime_state 的 field 读取), + 但产出完整 StewardTask 而非 runtime dict。 + """ + task_type = _FLOW_TASK_TYPE.get(flow_id, "expense_application") + assigned_agent = _FLOW_ASSIGNED_AGENT.get(flow_id, "application_assistant") + fields = { + str(key or "").strip(): str(value or "").strip() + for key, value in (flow.get("fields") or {}).items() + if str(key or "").strip() and str(value or "").strip() + } + missing_fields = [ + str(item or "").strip() + for item in (flow.get("missing_fields") or []) + if str(item or "").strip() + ] + task_prefix = "app" if task_type == "expense_application" else "reim" + return StewardTask( + task_id=f"task_resume_{task_prefix}_{task_index:03d}", + task_type=task_type, + assigned_agent=assigned_agent, + title="恢复上次未完成的申请" if task_type == "expense_application" else "恢复上次未完成的报销", + summary="根据之前的对话上下文恢复该任务。", + status="needs_confirmation" if missing_fields else "ready_to_delegate", + confidence=0.85, + requested_action="submit", + ontology_fields=fields, + missing_fields=missing_fields, + confirmation_required=True, + ) + + +def attach_resumed_task( + plan: StewardPlanResponse, + steward_state: dict[str, Any] | None, + flow_id: str, +) -> StewardPlanResponse: + """把恢复的 task 挂回 plan,并补充一条 thinking_event 说明上下文已恢复。""" + if not isinstance(steward_state, dict): + return plan + flows = steward_state.get("flows") if isinstance(steward_state.get("flows"), dict) else {} + flow = flows.get(flow_id) if isinstance(flows, dict) else None + if not isinstance(flow, dict): + return plan + + resumed_task = resume_task_from_flow(flow_id, flow, task_index=len(plan.tasks) + 1) + tasks = list(plan.tasks) + [resumed_task] + + thinking_events = list(plan.thinking_events) + field_summary = "、".join( + f"{key}:{value}" for key, value in resumed_task.ontology_fields.items() if value + ) + thinking_events.append( + StewardThinkingEvent( + event_id="context_resume_recovered", + stage="llm_function_call", + title="已恢复上次未完成的申请", + content=( + f"识别到您要继续之前的{('出差申请' if resumed_task.task_type == 'expense_application' else '费用报销')}," + f"已从会话上下文恢复该任务" + + (f"({field_summary})。" if field_summary else "。") + ), + status="completed", + ) + ) + + return plan.model_copy( + update={ + "tasks": tasks, + "thinking_events": thinking_events, + "planning_source": "context_resume", + "next_action": "confirm_task" if resumed_task.missing_fields else "delegate_task", + } + ) + + +def _matches_resume_keywords(message: str) -> bool: + compact = re.sub(r"\s+", "", str(message or "")) + if not compact: + return False + return any(keyword in compact for keyword in RESUME_CONFIRMATION_KEYWORDS) + + +def _flow_has_resumable_fields(flow: dict[str, Any]) -> bool: + """判断 flow 是否还有可恢复的业务字段(至少有 1 个非空字段)。""" + fields = flow.get("fields") + if not isinstance(fields, dict): + return False + return any( + str(value or "").strip() + for value in fields.values() + ) diff --git a/server/src/app/services/steward_intent_agent.py b/server/src/app/services/steward_intent_agent.py index ad5bfcd..2f851e6 100644 --- a/server/src/app/services/steward_intent_agent.py +++ b/server/src/app/services/steward_intent_agent.py @@ -103,8 +103,17 @@ class StewardIntentAgent: "employee_grade", "employee_no", "client_timezone_offset_minutes", + "recent_history", } }, + "recent_history": [ + { + "role": str(item.get("role") or "").strip(), + "content": str(item.get("content") or "").strip(), + } + for item in (request.context_json.get("recent_history") or []) + if isinstance(item, dict) and str(item.get("content") or "").strip() + ], "attachments": [ { "index": index + 1, @@ -134,6 +143,11 @@ class StewardIntentAgent: "每个 task 必须输出 requested_action:用户只是要求整理/发起但未说保存或提交时为 preview;" "用户说保存草稿、先保存、存草稿时为 save_draft;用户说直接提交、提交申请、确认提交时为 submit。" "对于查询类任务(如查询差旅标准),requested_action 固定为 preview。" + "recent_history 是本会话最近 10 轮对话(role 为 user 或 assistant)。" + "当用户说“再提交”“继续”“重新提交”“重新申请”等确认类话术时," + "必须结合 recent_history 里最近一次提到的出差/报销申请来理解用户意图," + "复用该申请的 ontology_fields 重新生成 task,而不是把确认话术当作孤立的模糊输入。" + "如果 recent_history 为空或无法关联到具体申请,才按当前 message 字面理解。" "相对日期必须以 base_date 为准转换为明确日期。" "thinking_events 只能是面向用户的过程摘要,不能暴露内部推理链。" "如果用户输入与出差、费用、报销、申请、差旅标准等财务事项完全无关" diff --git a/server/tests/test_steward_context_resume.py b/server/tests/test_steward_context_resume.py new file mode 100644 index 0000000..a66ee5d --- /dev/null +++ b/server/tests/test_steward_context_resume.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from datetime import date + +from app.schemas.steward import StewardPlanResponse, StewardTask, StewardThinkingEvent +from app.services.steward_context_resume import ( + RESUME_CONFIRMATION_KEYWORDS, + attach_resumed_task, + resume_task_from_flow, + should_resume_recent_task, +) + + +def _state_with_travel_application(fields: dict | None = None) -> dict: + return { + "active_flow": "travel_application", + "flows": { + "travel_application": { + "flow_id": "travel_application", + "status": "ready_for_confirmation", + "fields": fields or {"location": "上海", "time_range": "2026-02-20 至 2026-02-23"}, + "missing_fields": [], + } + }, + } + + +def test_should_resume_returns_flow_id_for_confirmation_keyword_with_state(): + state = _state_with_travel_application() + for keyword in ("再提交", "继续提交", "重新提交", "再申请", "重新申请", "那就提交", "继续吧", "再试一次"): + assert should_resume_recent_task(keyword, state) == "travel_application", f"keyword={keyword}" + + +def test_should_resume_returns_none_when_state_empty(): + assert should_resume_recent_task("再提交", {}) is None + assert should_resume_recent_task("再提交", None) is None + + +def test_should_resume_returns_none_for_non_confirmation_message(): + state = _state_with_travel_application() + assert should_resume_recent_task("今天天气不错", state) is None + assert should_resume_recent_task("你好", state) is None + assert should_resume_recent_task("查一下差旅标准", state) is None + assert should_resume_recent_task("", state) is None + + +def test_should_resume_returns_none_when_flow_has_no_fields(): + state = { + "active_flow": "travel_application", + "flows": {"travel_application": {"fields": {}, "missing_fields": []}}, + } + assert should_resume_recent_task("再提交", state) is None + + +def test_should_resume_finds_flow_when_active_flow_empty(): + # active_flow 已清空,但 flows 里仍有可恢复的 flow + state = { + "active_flow": "", + "flows": { + "travel_application": { + "fields": {"location": "武汉"}, + } + }, + } + assert should_resume_recent_task("再提交", state) == "travel_application" + + +def test_resume_task_from_flow_restores_travel_application(): + flow = { + "flow_id": "travel_application", + "fields": {"location": "上海", "time_range": "2026-02-20 至 2026-02-23"}, + "missing_fields": [], + } + task = resume_task_from_flow("travel_application", flow, task_index=1) + assert task.task_type == "expense_application" + assert task.assigned_agent == "application_assistant" + assert task.ontology_fields["location"] == "上海" + assert task.requested_action == "submit" + assert task.status == "ready_to_delegate" # 无 missing_fields + + +def test_resume_task_from_flow_marks_needs_confirmation_when_missing_fields(): + flow = { + "fields": {"location": "武汉"}, + "missing_fields": ["time_range", "reason"], + } + task = resume_task_from_flow("travel_application", flow) + assert task.missing_fields == ["time_range", "reason"] + assert task.status == "needs_confirmation" + + +def test_attach_resumed_task_adds_task_and_thinking_event(): + plan = StewardPlanResponse( + plan_id="plan_test", + planning_source="rule_fallback", + summary="占位", + tasks=[], + thinking_events=[], + pending_flow_confirmation={"status": "none"}, + ) + state = _state_with_travel_application({"location": "上海", "time_range": "2026-02-20 至 2026-02-23"}) + updated = attach_resumed_task(plan, state, "travel_application") + assert len(updated.tasks) == 1 + assert updated.tasks[0].task_type == "expense_application" + assert updated.tasks[0].ontology_fields["location"] == "上海" + assert updated.planning_source == "context_resume" + # thinking_event 应说明上下文已恢复 + assert any("恢复" in event.title or "恢复" in event.content for event in updated.thinking_events) + + +def test_attach_resumed_task_returns_unchanged_when_flow_missing(): + plan = StewardPlanResponse( + plan_id="plan_test", + planning_source="rule_fallback", + summary="占位", + tasks=[], + thinking_events=[], + pending_flow_confirmation={"status": "none"}, + ) + updated = attach_resumed_task(plan, {"flows": {}}, "travel_application") + assert updated is plan # 原样返回 + + +def test_resume_keywords_cover_common_variants(): + # 确认关键词覆盖场景里常见的表述 + assert "再提交" in RESUME_CONFIRMATION_KEYWORDS + assert "继续提交" in RESUME_CONFIRMATION_KEYWORDS + assert "重新申请" in RESUME_CONFIRMATION_KEYWORDS + # "提交" 单独不在列表里(避免把"首次提交"误判为恢复) + assert "提交" not in RESUME_CONFIRMATION_KEYWORDS diff --git a/server/tests/test_steward_intent_agent.py b/server/tests/test_steward_intent_agent.py index 022fe31..fea8f0d 100644 --- a/server/tests/test_steward_intent_agent.py +++ b/server/tests/test_steward_intent_agent.py @@ -98,3 +98,58 @@ def test_steward_intent_system_prompt_mentions_query_intent_guidance() -> None: assert "query_travel_standard" in system_prompt assert "差旅" in system_prompt assert "住宿标准" in system_prompt + + +def test_steward_intent_system_prompt_includes_conversation_history_guidance() -> None: + """system prompt 应包含'结合对话历史理解确认类话术'的引导。""" + from app.services import steward_intent_bootstrap # noqa: F401 + + messages = StewardIntentAgent._build_messages( + StewardPlanRequest(message="再提交"), + base_date=__import__("datetime").date(2026, 6, 24), + canonical_fields=["location", "time_range"], + ) + system_prompt = messages[0]["content"] + assert "recent_history" in system_prompt + assert "再提交" in system_prompt + assert "确认类话术" in system_prompt + + +def test_steward_intent_context_payload_includes_recent_history() -> None: + """context_payload 应携带 recent_history 结构化字段(role + content)。""" + import json + + request = StewardPlanRequest( + message="再提交", + context_json={ + "recent_history": [ + {"role": "user", "content": "2026-02-20 至 2026-02-23,去上海出差,火车"}, + {"role": "assistant", "content": "好的,为您整理出差申请预览。"}, + {"role": "user", "content": "直接提交"}, + {"role": "assistant", "content": "检测到重复申请,已暂停提交。"}, + ], + }, + ) + messages = StewardIntentAgent._build_messages( + request, + base_date=__import__("datetime").date(2026, 6, 24), + canonical_fields=["location", "time_range"], + ) + user_payload = json.loads(messages[1]["content"]) + assert "recent_history" in user_payload + assert len(user_payload["recent_history"]) == 4 + assert user_payload["recent_history"][0]["role"] == "user" + assert "上海" in user_payload["recent_history"][0]["content"] + + +def test_steward_intent_context_payload_omits_empty_recent_history() -> None: + """无 recent_history 时不应注入空列表。""" + import json + + messages = StewardIntentAgent._build_messages( + StewardPlanRequest(message="你好"), + base_date=__import__("datetime").date(2026, 6, 24), + canonical_fields=["location"], + ) + user_payload = json.loads(messages[1]["content"]) + assert user_payload.get("recent_history", []) == []