feat(server): 会话上下文保留(LLM 历史 + 确定性兜底双保险)

解决用户删除草稿后说'再提交'丢失上下文的问题:

- steward.py 新增 _inject_recent_conversation_history:build_plan 前读最近 10 条对话注入 context_json
- steward_intent_agent.py 的 _build_messages 把 recent_history 暴露给模型,system prompt 加确认类话术引导
- 新建 steward_context_resume.py:should_resume_recent_task 检测'再提交'类话术 + state 有可恢复 flow,attach_resumed_task 从 state 恢复 task
- 两个 plan 入口(/plans 和 /plans/stream)都已接入双保险
- 后端 67 passed,端到端验证'上海出差→再提交'成功恢复 task
This commit is contained in:
caoxiaozhu
2026-06-25 15:08:56 +08:00
parent 2ebc2756bf
commit e9d7c56d5b
5 changed files with 440 additions and 0 deletions

View File

@@ -29,6 +29,10 @@ from app.services.agent_conversations import AgentConversationService
from app.services.expense_claim_draft_flow import APPROVED_APPLICATION_LINK_STATUSES
from app.services.expense_claims import ExpenseClaimService
from app.services.runtime_chat import RuntimeChatService
from app.services.steward_context_resume import (
attach_resumed_task,
should_resume_recent_task,
)
from app.services.steward_flow_state import StewardFlowStateService
from app.services.steward_graph_action_runtime import StewardGraphActionRuntime
from app.services.steward_graph_planner import StewardGraphPlannerService
@@ -61,7 +65,9 @@ def create_steward_plan(payload: StewardPlanRequest, db: DbSession) -> StewardPl
try:
planner = _build_steward_planner(db)
hydrated_payload = _hydrate_required_application_gate(db, payload, planner)
hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload)
plan = planner.build_plan(hydrated_payload)
plan = _apply_context_resume(db, hydrated_payload, plan)
return _attach_conversation_state(db, hydrated_payload, plan)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@@ -143,7 +149,9 @@ async def _iter_steward_plan_events(
try:
hydrated_payload = _hydrate_required_application_gate(db, payload, planner)
hydrated_payload = _inject_recent_conversation_history(db, hydrated_payload)
plan = planner.build_plan(hydrated_payload)
plan = _apply_context_resume(db, hydrated_payload, plan)
plan = _attach_conversation_state(db, hydrated_payload, plan)
except ValueError as exc:
yield _encode_stream_event("error", {"message": str(exc)})
@@ -495,3 +503,66 @@ def _resolve_current_steward_state(
return stored_state
incoming_state = context_json.get("steward_state") or context_json.get("stewardState")
return incoming_state if isinstance(incoming_state, dict) else {}
def _inject_recent_conversation_history(
db: Session,
payload: StewardPlanRequest,
) -> StewardPlanRequest:
"""读取会话最近 10 条对话历史,注入 context_json.recent_history 供 LLM 关联上下文。
历史只给模型用,不返回前端。在 get_or_create_conversation 之前读取,
使用前端传入的 conversation_id避免把本轮消息算进历史。
"""
context_json = dict(payload.context_json or {})
conversation_id = _resolve_conversation_id(context_json)
if not conversation_id:
return payload
try:
recent_history = AgentConversationService(db).list_message_history(
conversation_id,
limit=10,
)
except Exception:
recent_history = []
if not recent_history:
return payload
return payload.model_copy(
update={
"context_json": {
**context_json,
"recent_history": recent_history,
}
}
)
def _apply_context_resume(
db: Session,
payload: StewardPlanRequest,
plan: StewardPlanResponse,
) -> StewardPlanResponse:
"""确定性兜底:若 plan 无 task 且用户说"再提交"类话术,从会话状态恢复最近 task。
不依赖 LLM 理解力100% 可靠地恢复上下文。LLM 注入历史(保险②)覆盖更模糊话术。
"""
if plan.tasks or plan.candidate_flows:
return plan
context_json = dict(payload.context_json or {})
conversation_id = _resolve_conversation_id(context_json)
if not conversation_id:
return plan
try:
conversation = AgentConversationService(db).get_conversation(conversation_id)
except Exception:
conversation = None
if conversation is None:
return plan
current_state = _resolve_current_steward_state(
conversation.state_json if isinstance(conversation.state_json, dict) else {},
context_json,
)
resume_flow_id = should_resume_recent_task(payload.message, current_state)
if not resume_flow_id:
return plan
return attach_resumed_task(plan, current_state, resume_flow_id)

View File

@@ -0,0 +1,170 @@
from __future__ import annotations
import re
from typing import Any
from app.schemas.steward import (
StewardPlanResponse,
StewardTask,
StewardThinkingEvent,
)
# "再提交"类确认话术:用户在删除草稿/解决冲突后,用这些话术恢复之前的申请 task
RESUME_CONFIRMATION_KEYWORDS = (
"再提交",
"继续提交",
"重新提交",
"再申请",
"继续申请",
"重新申请",
"那就提交",
"那就申请",
"继续吧",
"再试一次",
"重新发起",
"重新创建",
)
# flow_id → task_type 映射,用于从 steward_state 恢复 task
_FLOW_TASK_TYPE = {
"travel_application": "expense_application",
"travel_reimbursement": "reimbursement",
}
_FLOW_ASSIGNED_AGENT = {
"travel_application": "application_assistant",
"travel_reimbursement": "reimbursement_assistant",
}
def should_resume_recent_task(
message: str,
steward_state: dict[str, Any] | None,
) -> str | None:
"""检测'再提交'类话术 + steward_state 里有可恢复的 flow返回 flow_id 或 None。
确定性兜底:不依赖 LLM当用户用确认类话术"再提交")且 state 里存在
一个仍有业务字段的 flow 时,直接恢复该 flow。
"""
if not _matches_resume_keywords(message):
return None
if not isinstance(steward_state, dict):
return None
active_flow = str(steward_state.get("active_flow") or "").strip()
flows = steward_state.get("flows") if isinstance(steward_state.get("flows"), dict) else {}
# 优先恢复 active_flow其次遍历所有 flow 找最近一个有字段的
candidate_flow_ids: list[str] = []
if active_flow and active_flow in flows:
candidate_flow_ids.append(active_flow)
for flow_id in flows:
if flow_id not in candidate_flow_ids:
candidate_flow_ids.append(flow_id)
for flow_id in candidate_flow_ids:
flow = flows.get(flow_id)
if isinstance(flow, dict) and _flow_has_resumable_fields(flow):
return str(flow_id or "").strip() or None
return None
def resume_task_from_flow(
flow_id: str,
flow: dict[str, Any],
task_index: int = 1,
) -> StewardTask:
"""从 steward_state.flows[flow_id] 恢复成 StewardTask。
复用 runtime-decision 的恢复逻辑_hydrate_runtime_state 的 field 读取),
但产出完整 StewardTask 而非 runtime dict。
"""
task_type = _FLOW_TASK_TYPE.get(flow_id, "expense_application")
assigned_agent = _FLOW_ASSIGNED_AGENT.get(flow_id, "application_assistant")
fields = {
str(key or "").strip(): str(value or "").strip()
for key, value in (flow.get("fields") or {}).items()
if str(key or "").strip() and str(value or "").strip()
}
missing_fields = [
str(item or "").strip()
for item in (flow.get("missing_fields") or [])
if str(item or "").strip()
]
task_prefix = "app" if task_type == "expense_application" else "reim"
return StewardTask(
task_id=f"task_resume_{task_prefix}_{task_index:03d}",
task_type=task_type,
assigned_agent=assigned_agent,
title="恢复上次未完成的申请" if task_type == "expense_application" else "恢复上次未完成的报销",
summary="根据之前的对话上下文恢复该任务。",
status="needs_confirmation" if missing_fields else "ready_to_delegate",
confidence=0.85,
requested_action="submit",
ontology_fields=fields,
missing_fields=missing_fields,
confirmation_required=True,
)
def attach_resumed_task(
plan: StewardPlanResponse,
steward_state: dict[str, Any] | None,
flow_id: str,
) -> StewardPlanResponse:
"""把恢复的 task 挂回 plan并补充一条 thinking_event 说明上下文已恢复。"""
if not isinstance(steward_state, dict):
return plan
flows = steward_state.get("flows") if isinstance(steward_state.get("flows"), dict) else {}
flow = flows.get(flow_id) if isinstance(flows, dict) else None
if not isinstance(flow, dict):
return plan
resumed_task = resume_task_from_flow(flow_id, flow, task_index=len(plan.tasks) + 1)
tasks = list(plan.tasks) + [resumed_task]
thinking_events = list(plan.thinking_events)
field_summary = "".join(
f"{key}:{value}" for key, value in resumed_task.ontology_fields.items() if value
)
thinking_events.append(
StewardThinkingEvent(
event_id="context_resume_recovered",
stage="llm_function_call",
title="已恢复上次未完成的申请",
content=(
f"识别到您要继续之前的{('出差申请' if resumed_task.task_type == 'expense_application' else '费用报销')}"
f"已从会话上下文恢复该任务"
+ (f"{field_summary})。" if field_summary else "")
),
status="completed",
)
)
return plan.model_copy(
update={
"tasks": tasks,
"thinking_events": thinking_events,
"planning_source": "context_resume",
"next_action": "confirm_task" if resumed_task.missing_fields else "delegate_task",
}
)
def _matches_resume_keywords(message: str) -> bool:
compact = re.sub(r"\s+", "", str(message or ""))
if not compact:
return False
return any(keyword in compact for keyword in RESUME_CONFIRMATION_KEYWORDS)
def _flow_has_resumable_fields(flow: dict[str, Any]) -> bool:
"""判断 flow 是否还有可恢复的业务字段(至少有 1 个非空字段)。"""
fields = flow.get("fields")
if not isinstance(fields, dict):
return False
return any(
str(value or "").strip()
for value in fields.values()
)

View File

@@ -103,8 +103,17 @@ class StewardIntentAgent:
"employee_grade",
"employee_no",
"client_timezone_offset_minutes",
"recent_history",
}
},
"recent_history": [
{
"role": str(item.get("role") or "").strip(),
"content": str(item.get("content") or "").strip(),
}
for item in (request.context_json.get("recent_history") or [])
if isinstance(item, dict) and str(item.get("content") or "").strip()
],
"attachments": [
{
"index": index + 1,
@@ -134,6 +143,11 @@ class StewardIntentAgent:
"每个 task 必须输出 requested_action用户只是要求整理/发起但未说保存或提交时为 preview"
"用户说保存草稿、先保存、存草稿时为 save_draft用户说直接提交、提交申请、确认提交时为 submit。"
"对于查询类任务(如查询差旅标准),requested_action 固定为 preview。"
"recent_history 是本会话最近 10 轮对话role 为 user 或 assistant"
"当用户说“再提交”“继续”“重新提交”“重新申请”等确认类话术时,"
"必须结合 recent_history 里最近一次提到的出差/报销申请来理解用户意图,"
"复用该申请的 ontology_fields 重新生成 task而不是把确认话术当作孤立的模糊输入。"
"如果 recent_history 为空或无法关联到具体申请,才按当前 message 字面理解。"
"相对日期必须以 base_date 为准转换为明确日期。"
"thinking_events 只能是面向用户的过程摘要,不能暴露内部推理链。"
"如果用户输入与出差、费用、报销、申请、差旅标准等财务事项完全无关"

View File

@@ -0,0 +1,130 @@
from __future__ import annotations
from datetime import date
from app.schemas.steward import StewardPlanResponse, StewardTask, StewardThinkingEvent
from app.services.steward_context_resume import (
RESUME_CONFIRMATION_KEYWORDS,
attach_resumed_task,
resume_task_from_flow,
should_resume_recent_task,
)
def _state_with_travel_application(fields: dict | None = None) -> dict:
return {
"active_flow": "travel_application",
"flows": {
"travel_application": {
"flow_id": "travel_application",
"status": "ready_for_confirmation",
"fields": fields or {"location": "上海", "time_range": "2026-02-20 至 2026-02-23"},
"missing_fields": [],
}
},
}
def test_should_resume_returns_flow_id_for_confirmation_keyword_with_state():
state = _state_with_travel_application()
for keyword in ("再提交", "继续提交", "重新提交", "再申请", "重新申请", "那就提交", "继续吧", "再试一次"):
assert should_resume_recent_task(keyword, state) == "travel_application", f"keyword={keyword}"
def test_should_resume_returns_none_when_state_empty():
assert should_resume_recent_task("再提交", {}) is None
assert should_resume_recent_task("再提交", None) is None
def test_should_resume_returns_none_for_non_confirmation_message():
state = _state_with_travel_application()
assert should_resume_recent_task("今天天气不错", state) is None
assert should_resume_recent_task("你好", state) is None
assert should_resume_recent_task("查一下差旅标准", state) is None
assert should_resume_recent_task("", state) is None
def test_should_resume_returns_none_when_flow_has_no_fields():
state = {
"active_flow": "travel_application",
"flows": {"travel_application": {"fields": {}, "missing_fields": []}},
}
assert should_resume_recent_task("再提交", state) is None
def test_should_resume_finds_flow_when_active_flow_empty():
# active_flow 已清空,但 flows 里仍有可恢复的 flow
state = {
"active_flow": "",
"flows": {
"travel_application": {
"fields": {"location": "武汉"},
}
},
}
assert should_resume_recent_task("再提交", state) == "travel_application"
def test_resume_task_from_flow_restores_travel_application():
flow = {
"flow_id": "travel_application",
"fields": {"location": "上海", "time_range": "2026-02-20 至 2026-02-23"},
"missing_fields": [],
}
task = resume_task_from_flow("travel_application", flow, task_index=1)
assert task.task_type == "expense_application"
assert task.assigned_agent == "application_assistant"
assert task.ontology_fields["location"] == "上海"
assert task.requested_action == "submit"
assert task.status == "ready_to_delegate" # 无 missing_fields
def test_resume_task_from_flow_marks_needs_confirmation_when_missing_fields():
flow = {
"fields": {"location": "武汉"},
"missing_fields": ["time_range", "reason"],
}
task = resume_task_from_flow("travel_application", flow)
assert task.missing_fields == ["time_range", "reason"]
assert task.status == "needs_confirmation"
def test_attach_resumed_task_adds_task_and_thinking_event():
plan = StewardPlanResponse(
plan_id="plan_test",
planning_source="rule_fallback",
summary="占位",
tasks=[],
thinking_events=[],
pending_flow_confirmation={"status": "none"},
)
state = _state_with_travel_application({"location": "上海", "time_range": "2026-02-20 至 2026-02-23"})
updated = attach_resumed_task(plan, state, "travel_application")
assert len(updated.tasks) == 1
assert updated.tasks[0].task_type == "expense_application"
assert updated.tasks[0].ontology_fields["location"] == "上海"
assert updated.planning_source == "context_resume"
# thinking_event 应说明上下文已恢复
assert any("恢复" in event.title or "恢复" in event.content for event in updated.thinking_events)
def test_attach_resumed_task_returns_unchanged_when_flow_missing():
plan = StewardPlanResponse(
plan_id="plan_test",
planning_source="rule_fallback",
summary="占位",
tasks=[],
thinking_events=[],
pending_flow_confirmation={"status": "none"},
)
updated = attach_resumed_task(plan, {"flows": {}}, "travel_application")
assert updated is plan # 原样返回
def test_resume_keywords_cover_common_variants():
# 确认关键词覆盖场景里常见的表述
assert "再提交" in RESUME_CONFIRMATION_KEYWORDS
assert "继续提交" in RESUME_CONFIRMATION_KEYWORDS
assert "重新申请" in RESUME_CONFIRMATION_KEYWORDS
# "提交" 单独不在列表里(避免把"首次提交"误判为恢复)
assert "提交" not in RESUME_CONFIRMATION_KEYWORDS

View File

@@ -98,3 +98,58 @@ def test_steward_intent_system_prompt_mentions_query_intent_guidance() -> None:
assert "query_travel_standard" in system_prompt
assert "差旅" in system_prompt
assert "住宿标准" in system_prompt
def test_steward_intent_system_prompt_includes_conversation_history_guidance() -> None:
"""system prompt 应包含'结合对话历史理解确认类话术'的引导。"""
from app.services import steward_intent_bootstrap # noqa: F401
messages = StewardIntentAgent._build_messages(
StewardPlanRequest(message="再提交"),
base_date=__import__("datetime").date(2026, 6, 24),
canonical_fields=["location", "time_range"],
)
system_prompt = messages[0]["content"]
assert "recent_history" in system_prompt
assert "再提交" in system_prompt
assert "确认类话术" in system_prompt
def test_steward_intent_context_payload_includes_recent_history() -> None:
"""context_payload 应携带 recent_history 结构化字段role + content"""
import json
request = StewardPlanRequest(
message="再提交",
context_json={
"recent_history": [
{"role": "user", "content": "2026-02-20 至 2026-02-23去上海出差火车"},
{"role": "assistant", "content": "好的,为您整理出差申请预览。"},
{"role": "user", "content": "直接提交"},
{"role": "assistant", "content": "检测到重复申请,已暂停提交。"},
],
},
)
messages = StewardIntentAgent._build_messages(
request,
base_date=__import__("datetime").date(2026, 6, 24),
canonical_fields=["location", "time_range"],
)
user_payload = json.loads(messages[1]["content"])
assert "recent_history" in user_payload
assert len(user_payload["recent_history"]) == 4
assert user_payload["recent_history"][0]["role"] == "user"
assert "上海" in user_payload["recent_history"][0]["content"]
def test_steward_intent_context_payload_omits_empty_recent_history() -> None:
"""无 recent_history 时不应注入空列表。"""
import json
messages = StewardIntentAgent._build_messages(
StewardPlanRequest(message="你好"),
base_date=__import__("datetime").date(2026, 6, 24),
canonical_fields=["location"],
)
user_payload = json.loads(messages[1]["content"])
assert user_payload.get("recent_history", []) == []