Compare commits
1 Commits
phase1-reg
...
4972b4e6b1
| Author | SHA1 | Date | |
|---|---|---|---|
| 4972b4e6b1 |
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TypedDict, Annotated, Sequence
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Annotated, Any, TypedDict
|
||||
|
||||
from langchain_core.messages import BaseMessage
|
||||
from langgraph.graph.message import add_messages
|
||||
@@ -23,40 +23,65 @@ class ConversationTurn:
|
||||
|
||||
|
||||
class AgentState(TypedDict):
|
||||
# Core message history with add_messages reducer
|
||||
messages: Annotated[list[BaseMessage], add_messages]
|
||||
|
||||
# Session identifiers
|
||||
user_id: str
|
||||
conversation_id: str
|
||||
|
||||
# Agent routing state
|
||||
current_agent: str | None
|
||||
next_step: str | None # For explicit graph routing
|
||||
|
||||
# Traceability
|
||||
next_step: str | None
|
||||
active_agents: list[AgentRole]
|
||||
current_sub_commander: str | None
|
||||
active_sub_commanders: list[str]
|
||||
sub_commander_trace: list[dict[str, Any]]
|
||||
agent_trace: list[str]
|
||||
|
||||
# Task & Entity Tracking (Business Logic)
|
||||
pending_tasks: list[dict]
|
||||
completed_tasks: list[dict]
|
||||
created_entities: list[dict]
|
||||
|
||||
# Context summaries (for long-term or cross-agent context)
|
||||
pending_tasks: list[dict[str, Any]]
|
||||
completed_tasks: list[dict[str, Any]]
|
||||
tool_calls: list[dict[str, Any]]
|
||||
last_tool_result: str | None
|
||||
action_results: list[dict[str, Any]]
|
||||
created_entities: list[dict[str, Any]]
|
||||
tool_outcomes: list[dict[str, Any]]
|
||||
|
||||
tool_strategy_used: str | None
|
||||
tool_round_count: int
|
||||
max_tool_rounds: int
|
||||
retry_count: int
|
||||
max_retries: int
|
||||
iteration_count: int
|
||||
max_iterations: int
|
||||
routing_hops: int
|
||||
max_routing_hops: int
|
||||
terminated_due_to_loop_guard: bool
|
||||
retrieval_trace: list[dict[str, Any]]
|
||||
stop_reason: str | None
|
||||
|
||||
clarification_needed: bool
|
||||
clarification_question: str | None
|
||||
fallback_parse_error: str | None
|
||||
should_respond: bool
|
||||
|
||||
knowledge_context: str | None
|
||||
graph_context: str | None
|
||||
schedule_context_summary: str | None
|
||||
plan: str | None
|
||||
plan_steps: list[dict[str, Any]]
|
||||
analysis_report: str | None
|
||||
|
||||
# Output control
|
||||
final_response: str | None
|
||||
|
||||
# Memory & Environment
|
||||
|
||||
memory_context: str | None
|
||||
current_datetime_context: str | None
|
||||
|
||||
# Configuration
|
||||
user_llm_config: dict | None
|
||||
provider_capabilities: dict | None
|
||||
current_datetime_reference: dict[str, str] | None
|
||||
|
||||
turn_context: dict[str, Any] | None
|
||||
routing_decision: dict[str, Any] | None
|
||||
continuity_state: dict[str, Any] | None
|
||||
pending_action: dict[str, Any] | None
|
||||
last_completed_action: dict[str, Any] | None
|
||||
clarification_context: dict[str, Any] | None
|
||||
|
||||
user_llm_config: dict[str, Any] | None
|
||||
provider_capabilities: dict[str, Any] | None
|
||||
|
||||
|
||||
def initial_state(user_id: str, conversation_id: str) -> AgentState:
|
||||
@@ -66,16 +91,50 @@ def initial_state(user_id: str, conversation_id: str) -> AgentState:
|
||||
conversation_id=conversation_id,
|
||||
current_agent=AgentRole.MASTER.value,
|
||||
next_step=None,
|
||||
active_agents=[AgentRole.MASTER],
|
||||
current_sub_commander=None,
|
||||
active_sub_commanders=[],
|
||||
sub_commander_trace=[],
|
||||
agent_trace=[AgentRole.MASTER.value],
|
||||
pending_tasks=[],
|
||||
completed_tasks=[],
|
||||
tool_calls=[],
|
||||
last_tool_result=None,
|
||||
action_results=[],
|
||||
created_entities=[],
|
||||
tool_outcomes=[],
|
||||
tool_strategy_used=None,
|
||||
tool_round_count=0,
|
||||
max_tool_rounds=2,
|
||||
retry_count=0,
|
||||
max_retries=1,
|
||||
iteration_count=0,
|
||||
max_iterations=3,
|
||||
routing_hops=0,
|
||||
max_routing_hops=2,
|
||||
terminated_due_to_loop_guard=False,
|
||||
retrieval_trace=[],
|
||||
stop_reason=None,
|
||||
clarification_needed=False,
|
||||
clarification_question=None,
|
||||
fallback_parse_error=None,
|
||||
should_respond=True,
|
||||
knowledge_context=None,
|
||||
graph_context=None,
|
||||
schedule_context_summary=None,
|
||||
plan=None,
|
||||
plan_steps=[],
|
||||
analysis_report=None,
|
||||
final_response=None,
|
||||
memory_context=None,
|
||||
current_datetime_context=None,
|
||||
current_datetime_reference=None,
|
||||
turn_context=None,
|
||||
routing_decision=None,
|
||||
continuity_state=None,
|
||||
pending_action=None,
|
||||
last_completed_action=None,
|
||||
clarification_context=None,
|
||||
user_llm_config=None,
|
||||
provider_capabilities=None,
|
||||
)
|
||||
|
||||
18
backend/app/agents/tools/async_bridge.py
Normal file
18
backend/app/agents/tools/async_bridge.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
def run_async(coro: Any, timeout: int = 30):
    """Run an awaitable to completion from synchronous code.

    When the calling thread has no running event loop, the awaitable is
    driven right here with ``asyncio.run``.  When a loop is already running
    in this thread, ``asyncio.run`` cannot be nested, so the work is handed
    to the module-level ``_executor`` thread pool and this call blocks on the
    resulting future.

    Args:
        coro: The coroutine (or other awaitable) to execute.
        timeout: Maximum number of seconds to wait.  It is enforced on both
            paths; ``None`` disables the limit.

    Returns:
        Whatever the awaitable returns.

    Raises:
        TimeoutError: The awaitable did not finish within ``timeout``
            seconds (``asyncio.TimeoutError`` /
            ``concurrent.futures.TimeoutError`` on Python < 3.11).
    """

    async def _bounded():
        # Enforcing the limit inside the coroutine makes the timeout apply
        # on the no-loop path too, and cancels the work on expiry so a
        # pool worker is not left occupied by an abandoned coroutine.
        return await asyncio.wait_for(coro, timeout=timeout)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: safe to start one here.
        return asyncio.run(_bounded())

    # NOTE: this blocks the calling thread (which owns the running loop)
    # until the worker finishes; the outer timeout is kept as a backstop.
    return _executor.submit(asyncio.run, _bounded()).result(timeout=timeout)
|
||||
|
||||
|
||||
__all__ = ["run_async"]
|
||||
@@ -4,19 +4,12 @@ from langchain_core.tools import tool
|
||||
from app.database import async_session
|
||||
from app.models.forum import ForumPost, ForumReply
|
||||
from app.agents.context import get_current_user
|
||||
from app.agents.tools.async_bridge import run_async
|
||||
from sqlalchemy import select
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
def _run_async(coro, timeout: int = 30):
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
return asyncio.run(coro)
|
||||
return _executor.submit(asyncio.run, coro).result(timeout=timeout)
|
||||
return run_async(coro, timeout=timeout)
|
||||
|
||||
|
||||
@tool
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import date, datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
@@ -11,21 +9,16 @@ from langchain_core.tools import tool
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.agents.context import get_current_user
|
||||
from app.agents.tools.async_bridge import run_async
|
||||
from app.database import async_session
|
||||
from app.models.goal import Goal, GoalStatus
|
||||
from app.models.reminder import Reminder
|
||||
from app.models.task import Task, TaskPriority, TaskStatus
|
||||
from app.models.todo import DailyTodo, TodoSource
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
def _run_async(coro, timeout: int = 30):
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
return asyncio.run(coro)
|
||||
return _executor.submit(asyncio.run, coro).result(timeout=timeout)
|
||||
return run_async(coro, timeout=timeout)
|
||||
|
||||
|
||||
def _parse_date(value: str | None) -> date:
|
||||
|
||||
@@ -5,25 +5,16 @@ Agent 工具集 - 知识库 & 图谱相关
|
||||
由于 LangChain 工具系统是同步的,内部用 run_in_executor 处理 async 逻辑。
|
||||
"""
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import asyncio
|
||||
|
||||
from langchain_core.tools import tool
|
||||
|
||||
from app.agents.context import get_current_user
|
||||
from app.agents.tools.async_bridge import run_async
|
||||
from app.database import async_session
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
def _run_async(coro, timeout: int = 30):
|
||||
"""在同步上下文中运行 async 代码"""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
future = loop.run_in_executor(_executor, lambda: asyncio.run(coro))
|
||||
return future.result(timeout=timeout)
|
||||
except RuntimeError:
|
||||
return asyncio.run(coro)
|
||||
return run_async(coro, timeout=timeout)
|
||||
|
||||
|
||||
@tool
|
||||
|
||||
@@ -8,21 +8,13 @@ from langchain_core.tools import tool
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.agents.context import get_current_user
|
||||
from app.agents.tools.async_bridge import run_async
|
||||
from app.database import async_session
|
||||
from app.models.task import Task, TaskPriority, TaskStatus
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=4)
|
||||
|
||||
|
||||
def _run_async(coro, timeout: int = 30):
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
return asyncio.run(coro)
|
||||
return _executor.submit(asyncio.run, coro).result(timeout=timeout)
|
||||
return run_async(coro, timeout=timeout)
|
||||
|
||||
|
||||
def _normalize_title(title: str | None, content: str | None) -> str:
|
||||
|
||||
@@ -241,6 +241,10 @@ def normalize_tool_time_arguments(tool_name: str, args: dict, current_datetime_c
|
||||
if raw_value and not _is_iso_datetime(raw_value):
|
||||
payload = resolve_time_expression_data(raw_value, current_datetime_context=current_datetime_context, prefer="datetime")
|
||||
normalized["reminder_at"] = payload["resolved_datetime"]
|
||||
raw_date = normalized.get("date")
|
||||
if isinstance(raw_date, str) and raw_date.strip() and not _is_iso_date(raw_date):
|
||||
payload = resolve_time_expression_data(raw_date, current_datetime_context=current_datetime_context, prefer="date")
|
||||
normalized["date"] = payload["resolved_date"]
|
||||
return normalized
|
||||
|
||||
if tool_name in {"create_schedule_task", "create_task"}:
|
||||
|
||||
@@ -9,6 +9,7 @@ class Conversation(BaseModel):
|
||||
user_id = Column(String(36), ForeignKey("users.id"), nullable=False, index=True)
|
||||
title = Column(String(500), nullable=True)
|
||||
message_count = Column(Integer, default=0)
|
||||
agent_state = Column(JSON, nullable=True)
|
||||
|
||||
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
|
||||
|
||||
|
||||
@@ -30,6 +30,56 @@ from app.agents.state import initial_state
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Header lines that open a section inside the assembled memory-context text.
# _split_memory_context_sections compares each stripped line against these.
MEMORY_SECTION_HEADERS = ("【用户记忆】", "【之前对话摘要】", "【知识大脑】")
|
||||
|
||||
|
||||
def _split_memory_context_sections(memory_context: str | None) -> dict[str, str]:
    """Split a memory-context string into its header-delimited sections.

    A line whose stripped text is one of ``MEMORY_SECTION_HEADERS`` starts a
    new section.  Each returned value begins with its header line and is
    stripped of surrounding whitespace.  Text before the first header is
    ignored, and a repeated header replaces the earlier section.

    Returns:
        Mapping of header -> section text; empty for blank/``None`` input.
    """
    body = (memory_context or "").strip()
    if not body:
        return {}

    grouped: dict[str, list[str]] = {}
    active: str | None = None
    for raw_line in body.splitlines():
        marker = raw_line.strip()
        if marker in MEMORY_SECTION_HEADERS:
            active = marker
            # Restart the buffer so a repeated header discards older lines.
            grouped[active] = [marker]
        elif active:
            grouped[active].append(raw_line)

    return {
        header: "\n".join(section_lines).strip()
        for header, section_lines in grouped.items()
        if header
    }
|
||||
|
||||
|
||||
def _derive_role_memory_contexts(memory_context: str | None) -> dict[str, str | None]:
    """Build per-role context strings from the combined memory context.

    The text is split with ``_split_memory_context_sections`` and recombined:

    * ``schedule_context_summary`` - user memories + conversation summaries
    * ``knowledge_context``        - the knowledge-brain section alone
    * ``analysis_report``          - conversation summaries + knowledge brain

    A value is ``None`` when none of its source sections are present.
    """
    by_header = _split_memory_context_sections(memory_context)
    user_memory, summaries, knowledge = (
        by_header.get(header)
        for header in ("【用户记忆】", "【之前对话摘要】", "【知识大脑】")
    )

    def _merge(*chunks: str | None) -> str | None:
        # Drop missing/empty sections; separate the rest with a blank line.
        present = list(filter(None, chunks))
        if not present:
            return None
        return "\n\n".join(present)

    derived: dict[str, str | None] = {}
    derived["schedule_context_summary"] = _merge(user_memory, summaries)
    derived["knowledge_context"] = knowledge
    derived["analysis_report"] = _merge(summaries, knowledge)
    return derived
|
||||
|
||||
|
||||
def _is_streaming_rejection_error(error: Exception, user_llm_config: dict | None) -> bool:
|
||||
capabilities = resolve_provider_capabilities(user_llm_config)
|
||||
error_text = str(error).lower()
|
||||
@@ -87,11 +137,122 @@ _CONTINUITY_SNAPSHOT_FIELDS = (
|
||||
)
|
||||
|
||||
|
||||
def _normalize_legacy_turn_context(turn_context: Any, current_agent: Any) -> dict[str, Any] | None:
|
||||
if not isinstance(turn_context, dict):
|
||||
return None
|
||||
normalized = dict(turn_context)
|
||||
active_agent = normalized.pop("active_agent", None)
|
||||
active_sub_flow = normalized.pop("active_sub_flow", None)
|
||||
if isinstance(active_agent, str) and active_agent and "active_agent" not in normalized:
|
||||
normalized["active_agent"] = active_agent
|
||||
if isinstance(active_sub_flow, str) and active_sub_flow and "active_sub_commander" not in normalized:
|
||||
normalized["active_sub_commander"] = active_sub_flow
|
||||
if not normalized.get("active_agent") and isinstance(current_agent, str) and current_agent:
|
||||
normalized["active_agent"] = current_agent
|
||||
return normalized or None
|
||||
|
||||
|
||||
def _normalize_legacy_pending_action(pending_action: Any) -> dict[str, Any] | None:
|
||||
if not isinstance(pending_action, dict):
|
||||
return None
|
||||
normalized = dict(pending_action)
|
||||
legacy_action_type = normalized.pop("action_type", None)
|
||||
if legacy_action_type and "type" not in normalized:
|
||||
normalized["type"] = legacy_action_type
|
||||
legacy_agent = normalized.pop("agent", None)
|
||||
legacy_sub_flow = normalized.pop("sub_flow", None)
|
||||
if legacy_agent and "owner_agent" not in normalized:
|
||||
normalized["owner_agent"] = legacy_agent
|
||||
if legacy_sub_flow and "owner_sub_commander" not in normalized:
|
||||
normalized["owner_sub_commander"] = legacy_sub_flow
|
||||
legacy_status = normalized.get("status")
|
||||
if legacy_status == "awaiting_confirmation":
|
||||
normalized["status"] = "pending"
|
||||
elif legacy_status == "awaiting_clarification":
|
||||
normalized["status"] = "blocked_on_clarification"
|
||||
return normalized or None
|
||||
|
||||
|
||||
def _normalize_legacy_clarification_context(
|
||||
clarification_context: Any,
|
||||
pending_action: dict[str, Any] | None,
|
||||
current_agent: Any,
|
||||
) -> dict[str, Any] | None:
|
||||
if not isinstance(clarification_context, dict):
|
||||
return None
|
||||
normalized = dict(clarification_context)
|
||||
active_agent = normalized.pop("active_agent", None)
|
||||
sub_flow = normalized.pop("sub_flow", None)
|
||||
awaiting_user_input = normalized.pop("awaiting_user_input", None)
|
||||
if isinstance(active_agent, str) and active_agent and "owning_agent" not in normalized:
|
||||
normalized["owning_agent"] = active_agent
|
||||
if isinstance(sub_flow, str) and sub_flow and "owning_sub_commander" not in normalized:
|
||||
normalized["owning_sub_commander"] = sub_flow
|
||||
if "target_action" not in normalized:
|
||||
target_action = None
|
||||
if pending_action:
|
||||
pending_type = pending_action.get("type")
|
||||
if isinstance(pending_type, str) and pending_type and pending_type != "clarification":
|
||||
target_action = pending_type
|
||||
if target_action is None and isinstance(sub_flow, str) and sub_flow.startswith("create_"):
|
||||
target_action = sub_flow
|
||||
if target_action:
|
||||
normalized["target_action"] = target_action
|
||||
if not normalized.get("owning_agent") and isinstance(current_agent, str) and current_agent:
|
||||
normalized["owning_agent"] = current_agent
|
||||
if awaiting_user_input is True and "status" not in normalized:
|
||||
normalized["status"] = "pending"
|
||||
return normalized or None
|
||||
|
||||
|
||||
def _normalize_legacy_continuity_state(
|
||||
continuity_state: Any,
|
||||
clarification_context: dict[str, Any] | None,
|
||||
) -> dict[str, Any] | None:
|
||||
if not isinstance(continuity_state, dict):
|
||||
return None
|
||||
normalized = dict(continuity_state)
|
||||
normalized.pop("active_agent", None)
|
||||
normalized.pop("active_sub_flow", None)
|
||||
legacy_status = normalized.get("status")
|
||||
if legacy_status == "awaiting_clarification":
|
||||
normalized["status"] = "fresh"
|
||||
if clarification_context and "mode" not in normalized:
|
||||
normalized["mode"] = "resume_after_clarification"
|
||||
return normalized or None
|
||||
|
||||
|
||||
def _normalize_continuity_snapshot(state: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``state`` with its continuity fields normalized.

    ``pending_action``, ``clarification_context``, ``continuity_state`` and
    ``turn_context`` are each passed through their ``_normalize_legacy_*``
    helper.  The clarification context is derived from the already
    normalized pending action, and the continuity state from the already
    normalized clarification context.  A field is replaced only when its
    helper returns a non-``None`` value; otherwise the copied value is kept.
    """
    snapshot = dict(state)
    agent = snapshot.get("current_agent")

    pending = _normalize_legacy_pending_action(snapshot.get("pending_action"))
    clarification = _normalize_legacy_clarification_context(
        snapshot.get("clarification_context"),
        pending,
        agent,
    )
    continuity = _normalize_legacy_continuity_state(
        snapshot.get("continuity_state"),
        clarification,
    )
    turn = _normalize_legacy_turn_context(snapshot.get("turn_context"), agent)

    refreshed = (
        ("pending_action", pending),
        ("clarification_context", clarification),
        ("continuity_state", continuity),
        ("turn_context", turn),
    )
    # NOTE(review): a legacy value that normalizes to nothing (None) leaves
    # the original, un-normalized value in place - confirm this is intended.
    for field_name, value in refreshed:
        if value is not None:
            snapshot[field_name] = value
    return snapshot
|
||||
|
||||
|
||||
def _build_continuity_snapshot(state: dict[str, Any]) -> dict[str, Any] | None:
|
||||
normalized_state = _normalize_continuity_snapshot(state)
|
||||
snapshot = {
|
||||
field: state.get(field)
|
||||
field: normalized_state.get(field)
|
||||
for field in _CONTINUITY_SNAPSHOT_FIELDS
|
||||
if state.get(field) is not None
|
||||
if normalized_state.get(field) is not None
|
||||
}
|
||||
if not snapshot:
|
||||
return None
|
||||
@@ -116,7 +277,7 @@ def _extract_continuity_snapshot(payload: Any) -> dict[str, Any] | None:
|
||||
return None
|
||||
state = payload.get("state")
|
||||
if isinstance(state, dict):
|
||||
return state
|
||||
return _normalize_continuity_snapshot(state)
|
||||
return None
|
||||
|
||||
|
||||
@@ -187,7 +348,7 @@ class AgentService:
|
||||
return None
|
||||
|
||||
async def _load_continuity_snapshot(self, conversation: Conversation) -> dict[str, Any] | None:
|
||||
snapshot = _extract_continuity_snapshot(conversation.agent_state)
|
||||
snapshot = _extract_continuity_snapshot(getattr(conversation, "agent_state", None))
|
||||
if snapshot:
|
||||
return snapshot
|
||||
|
||||
@@ -358,6 +519,7 @@ class AgentService:
|
||||
current_datetime_reference=current_datetime_reference,
|
||||
user_llm_config=user_llm_config,
|
||||
)
|
||||
state.update(_derive_role_memory_contexts(memory_ctx))
|
||||
|
||||
yield self._build_progress_event("thinking", "Jarvis 正在分析请求", agent="master", step="理解你的问题")
|
||||
|
||||
@@ -464,7 +626,10 @@ class AgentService:
|
||||
"kind": "agent_continuity_state",
|
||||
**continuity_snapshot,
|
||||
}] if continuity_snapshot else None)
|
||||
conv.agent_state = continuity_snapshot
|
||||
conv.agent_state = ({
|
||||
"kind": "agent_continuity_state",
|
||||
**continuity_snapshot,
|
||||
} if continuity_snapshot else None)
|
||||
await BrainService(self.db).create_event(
|
||||
user_id,
|
||||
**_build_assistant_event_payload(collected),
|
||||
@@ -557,7 +722,7 @@ class AgentService:
|
||||
current_datetime_reference=current_datetime_reference,
|
||||
user_llm_config=user_llm_config,
|
||||
)
|
||||
|
||||
state.update(_derive_role_memory_contexts(memory_ctx))
|
||||
result_state = await graph.ainvoke(state)
|
||||
response_content = result_state.get("final_response") or str(result_state.get("messages", [AIMessage(content="")])[-1].content)
|
||||
except Exception as e:
|
||||
@@ -585,7 +750,10 @@ class AgentService:
|
||||
"kind": "agent_continuity_state",
|
||||
**continuity_snapshot,
|
||||
}] if continuity_snapshot else None)
|
||||
conv.agent_state = continuity_snapshot
|
||||
conv.agent_state = ({
|
||||
"kind": "agent_continuity_state",
|
||||
**continuity_snapshot,
|
||||
} if continuity_snapshot else None)
|
||||
await self.db.commit()
|
||||
await self.db.refresh(assistant_msg)
|
||||
|
||||
|
||||
@@ -4,12 +4,15 @@ Jarvis 记忆系统 (基于 Mem0)
|
||||
底层使用 Mem0 实现事实提取、时间线、矛盾解决和遗忘机制
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
import re
|
||||
from datetime import UTC, datetime
|
||||
from typing import Optional, Any
|
||||
from sqlalchemy import select, desc, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from app.models.conversation import Conversation, Message
|
||||
from app.models.memory import UserMemory
|
||||
from app.models.user import User
|
||||
from app.services.brain_service import BrainService
|
||||
from app.config import settings as _settings
|
||||
@@ -23,6 +26,9 @@ except ImportError:
|
||||
Memory = None
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _get_user_embedding_config(db: AsyncSession, user_id: str) -> dict | None:
|
||||
"""从用户配置中获取 embedding 模型配置"""
|
||||
result = await db.execute(select(User).where(User.id == user_id))
|
||||
@@ -296,6 +302,23 @@ async def extract_user_memories(
|
||||
return []
|
||||
|
||||
|
||||
def _extract_memory_query_tokens(query: str) -> list[str]:
|
||||
normalized_query = (query or "").lower()
|
||||
tokens = [token for token in re.findall(r"[a-z0-9]+", normalized_query) if len(token) >= 3]
|
||||
|
||||
for chunk in re.findall(r"[\u4e00-\u9fff]+", query or ""):
|
||||
stripped_chunk = chunk.strip()
|
||||
if len(stripped_chunk) >= 4:
|
||||
tokens.append(stripped_chunk)
|
||||
if len(stripped_chunk) > 6:
|
||||
tokens.extend(
|
||||
stripped_chunk[index:index + 4]
|
||||
for index in range(len(stripped_chunk) - 3)
|
||||
)
|
||||
|
||||
return list(dict.fromkeys(tokens))
|
||||
|
||||
|
||||
async def recall_user_memories(
|
||||
db: AsyncSession,
|
||||
user_id: str,
|
||||
@@ -304,7 +327,7 @@ async def recall_user_memories(
|
||||
) -> list[dict]:
|
||||
"""
|
||||
根据当前输入召回相关的用户记忆。
|
||||
使用 Mem0 的语义搜索。
|
||||
使用 Mem0 的语义搜索;如果 Mem0 不可用或失败,则回退到本地 UserMemory。
|
||||
"""
|
||||
try:
|
||||
mem0 = await get_mem0(db, user_id)
|
||||
@@ -313,10 +336,56 @@ async def recall_user_memories(
|
||||
filters={"user_id": user_id},
|
||||
limit=top_k,
|
||||
)
|
||||
return results.get("results", [])
|
||||
mem0_results = results.get("results", [])
|
||||
if mem0_results:
|
||||
return mem0_results
|
||||
except Exception as e:
|
||||
print(f"Mem0 search error: {e}")
|
||||
return []
|
||||
|
||||
query_tokens = _extract_memory_query_tokens(query)
|
||||
statement = select(UserMemory).where(UserMemory.user_id == user_id)
|
||||
result = await db.execute(statement.order_by(UserMemory.importance.desc(), UserMemory.created_at.desc()))
|
||||
fallback_memories = list(result.scalars().all())
|
||||
|
||||
if _contains_hint(_normalize_query(query), MEMORY_QUERY_HINTS) or _matches_memory_query_pattern(_normalize_query(query)):
|
||||
return fallback_memories[:top_k]
|
||||
|
||||
if query_tokens:
|
||||
matched_memories = [
|
||||
memory for memory in fallback_memories
|
||||
if any(token in (memory.content or '').lower() for token in query_tokens)
|
||||
]
|
||||
return matched_memories[:top_k]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
async def _mark_memories_recalled(db: AsyncSession, memories: list[UserMemory]) -> None:
    """Flag each given memory as recalled and commit once.

    Every memory gets ``is_recalled = True``, its ``recall_count``
    incremented (a missing/``None`` count is treated as 0) and
    ``last_recalled_at`` set to one shared UTC timestamp.  ``db.commit()``
    is awaited only when at least one memory was updated.
    """
    stamp = datetime.now(UTC)
    touched = 0
    for touched, record in enumerate(memories, start=1):
        record.is_recalled = True
        record.recall_count = (record.recall_count or 0) + 1
        record.last_recalled_at = stamp
    if touched:
        await db.commit()
|
||||
|
||||
|
||||
async def _run_tolerated_section(
    db: AsyncSession,
    section_name: str,
    builder,
) -> str:
    """Await ``builder()`` and return its result, tolerating any failure.

    If the builder raises, a warning with the traceback is logged under
    ``section_name`` and an empty string is returned, so one failing
    section does not stop the rest of the context from being built.
    ``db`` is accepted but not used by this function.
    """
    try:
        section = await builder()
    except Exception:
        logger.warning(
            "[MemoryService] %s失败,继续构建剩余上下文",
            section_name,
            exc_info=True,
        )
        section = ""
    return section
|
||||
|
||||
|
||||
async def get_user_profile(db: AsyncSession, user_id: str) -> dict:
|
||||
@@ -339,6 +408,131 @@ async def get_user_profile(db: AsyncSession, user_id: str) -> dict:
|
||||
|
||||
# ———— 记忆组装: 供 Agent 使用的上下文 ————
|
||||
|
||||
# Substrings marking a query as being about remembering something, a
# reminder, a preference or a habit.  Checked with a plain ``in`` test
# against the normalized (stripped, lower-cased) query.
MEMORY_QUERY_HINTS = ("记住", "记下", "记一下", "记着", "提醒", "偏好", "习惯")

# Regex counterpart of MEMORY_QUERY_HINTS for English "remember (that) I ..."
# phrasing; searched against the normalized query.
MEMORY_QUERY_PATTERNS = (re.compile(r"\bremember\s+(?:that\s+)?i\b"),)

# Substrings marking a request to answer from the document(s) only.
GROUNDING_QUERY_HINTS = (
    "根据文档",
    "严格根据",
    "只根据",
    "文档内容",
    "grounded",
    "strictly based on",
    "based on the document",
    "based on the docs",
    "document only",
    "docs only",
    "only use the document",
    "only use the docs",
)

# Substrings with which the user opts out of personal preferences/memories.
AVOID_USER_MEMORY_HINTS = (
    "不要结合我的个人偏好",
    "不要结合个人偏好",
    "不要结合偏好",
    "不要结合我的记忆",
    "不要结合记忆",
)
|
||||
|
||||
|
||||
def _normalize_query(text: str) -> str:
|
||||
return text.strip().lower()
|
||||
|
||||
|
||||
def _contains_hint(text: str, hints: tuple[str, ...]) -> bool:
|
||||
return any(hint in text for hint in hints)
|
||||
|
||||
|
||||
def _matches_memory_query_pattern(text: str) -> bool:
    """Return True when any regex in ``MEMORY_QUERY_PATTERNS`` matches within ``text``."""
    for pattern in MEMORY_QUERY_PATTERNS:
        if pattern.search(text):
            return True
    return False
|
||||
|
||||
|
||||
def _should_include_user_memories(query: str) -> bool:
    """Decide whether recalled user memories belong in the context for ``query``.

    Returns False when the normalized query contains a document-grounding
    hint or an explicit "do not use my preferences/memories" hint;
    True otherwise.
    """
    normalized = _normalize_query(query)
    excluded = (
        _contains_hint(normalized, GROUNDING_QUERY_HINTS)
        or _contains_hint(normalized, AVOID_USER_MEMORY_HINTS)
    )
    return not excluded
|
||||
|
||||
|
||||
def _should_include_summaries(query: str) -> bool:
    """Decide whether conversation summaries belong in the context for ``query``.

    Returns False when the normalized query contains a document-grounding
    hint, contains a memory hint, or matches a memory regex pattern;
    True otherwise.
    """
    normalized = _normalize_query(query)
    excluded = (
        _contains_hint(normalized, GROUNDING_QUERY_HINTS)
        or _contains_hint(normalized, MEMORY_QUERY_HINTS)
        or _matches_memory_query_pattern(normalized)
    )
    return not excluded
|
||||
|
||||
|
||||
async def _build_user_memory_section(
    db: AsyncSession,
    user_id: str,
    current_query: str,
) -> str:
    """Render up to five recalled user memories as a "【用户记忆】" block.

    Recalled items may be ``UserMemory`` rows or plain dicts.  For a row the
    text is ``content`` and the label ``memory_type``; for a dict the text
    is ``"memory"`` (falling back to ``"text"``) and the label
    ``"memory_type"``.  Items without text are skipped.  When at least one
    line was produced, every recalled ``UserMemory`` row is passed to
    ``_mark_memories_recalled``.

    Returns:
        The rendered section, or "" when nothing usable was recalled.
    """
    recalled = await recall_user_memories(db, user_id, current_query, top_k=5)
    if not recalled:
        return ""

    rendered: list[str] = []
    orm_rows: list[UserMemory] = []
    for item in recalled:
        if isinstance(item, UserMemory):
            text, kind = item.content, item.memory_type
            # Tracked even when the row has no text, so it is still marked.
            orm_rows.append(item)
        else:
            text = item.get("memory", item.get("text", ""))
            kind = item.get("memory_type")

        if text:
            rendered.append(f" [{kind}] {text}" if kind else f" - {text}")

    if not rendered:
        return ""

    if orm_rows:
        await _mark_memories_recalled(db, orm_rows)
    return "【用户记忆】\n" + "\n".join(rendered)
|
||||
|
||||
|
||||
async def _build_summary_section(db: AsyncSession, conversation_id: str) -> str:
    """Render the last two conversation summaries as a "【之前对话摘要】" block.

    Summaries come from ``get_summaries``; the entries are numbered from 1.
    Returns "" when there are no summaries.
    """
    summaries = await get_summaries(db, conversation_id)
    if not summaries:
        return ""

    body = "\n".join(
        f"[对话摘要{position}] {entry.summary_text}"
        for position, entry in enumerate(summaries[-2:], start=1)
    )
    return "【之前对话摘要】\n" + body
|
||||
|
||||
|
||||
async def _build_brain_section(
    db: AsyncSession,
    user_id: str,
    current_query: str,
) -> str:
    """Render up to three recalled brain memories as a "【知识大脑】" block.

    Memories come from ``BrainService(db).recall_memories``; each one is
    rendered as ``- <title>: <content>``.  Returns "" when none are recalled.
    """
    recalled = await BrainService(db).recall_memories(user_id, current_query, top_k=3)
    if not recalled:
        return ""

    body = "\n".join(f"- {entry.title}: {entry.content}" for entry in recalled)
    return "【知识大脑】\n" + body
|
||||
|
||||
|
||||
async def build_memory_context(
|
||||
db: AsyncSession,
|
||||
@@ -350,30 +544,33 @@ async def build_memory_context(
|
||||
构建完整的记忆上下文字符串,
|
||||
供注入到 Agent system prompt 中使用。
|
||||
"""
|
||||
parts = []
|
||||
parts: list[str] = []
|
||||
|
||||
memories = await recall_user_memories(db, user_id, current_query, top_k=5)
|
||||
if memories:
|
||||
lines = []
|
||||
for m in memories:
|
||||
memory_text = m.get("memory", m.get("text", ""))
|
||||
if memory_text:
|
||||
lines.append(f" - {memory_text}")
|
||||
if lines:
|
||||
parts.append("【用户记忆】\n" + "\n".join(lines))
|
||||
if _should_include_user_memories(current_query):
|
||||
user_memory_section = await _run_tolerated_section(
|
||||
db,
|
||||
"用户记忆召回",
|
||||
lambda: _build_user_memory_section(db, user_id, current_query),
|
||||
)
|
||||
if user_memory_section:
|
||||
parts.append(user_memory_section)
|
||||
|
||||
summaries = await get_summaries(db, conversation_id)
|
||||
if summaries:
|
||||
recent = summaries[-2:]
|
||||
lines = [f"[对话摘要{i + 1}] {s.summary_text}" for i, s in enumerate(recent)]
|
||||
parts.append("【之前对话摘要】\n" + "\n".join(lines))
|
||||
if _should_include_summaries(current_query):
|
||||
summary_section = await _run_tolerated_section(
|
||||
db,
|
||||
"对话摘要加载",
|
||||
lambda: _build_summary_section(db, conversation_id),
|
||||
)
|
||||
if summary_section:
|
||||
parts.append(summary_section)
|
||||
|
||||
brain_memories = await BrainService(db).recall_memories(user_id, current_query, top_k=3)
|
||||
if brain_memories:
|
||||
lines = []
|
||||
for memory in brain_memories:
|
||||
lines.append(f"- {memory.title}: {memory.content}")
|
||||
parts.append("【知识大脑】\n" + "\n".join(lines))
|
||||
brain_section = await _run_tolerated_section(
|
||||
db,
|
||||
"知识大脑召回",
|
||||
lambda: _build_brain_section(db, user_id, current_query),
|
||||
)
|
||||
if brain_section:
|
||||
parts.append(brain_section)
|
||||
|
||||
if not parts:
|
||||
return ""
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
317
backend/tests/backend/app/agents/test_graph_system_messages.py
Normal file
317
backend/tests/backend/app/agents/test_graph_system_messages.py
Normal file
@@ -0,0 +1,317 @@
|
||||
import sys
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import Mock
|
||||
|
||||
from langchain_core.messages import AIMessage, HumanMessage
|
||||
|
||||
sys.modules.setdefault("trafilatura", Mock())
|
||||
|
||||
from app.agents.graph import _build_system_messages, _run_sub_commander
|
||||
from app.agents.state import AgentRole
|
||||
|
||||
|
||||
def _base_state(message: str, user_llm_config: dict | None = None) -> dict:
    """Build a baseline agent-state dict for the tests in this module.

    ``message`` becomes the single ``HumanMessage`` in ``messages``, and
    ``user_llm_config`` is stored as given.  All other fields hold fixed
    test defaults, with the master agent as the current and only active
    agent.
    """
    clock_reference = dict(
        current_time_iso="2026-03-28T12:00:00+08:00",
        current_date_iso="2026-03-28",
        timezone="UTC",
    )
    return dict(
        # Conversation identity
        messages=[HumanMessage(content=message)],
        user_id="u1",
        conversation_id="c1",
        # Routing
        current_agent=AgentRole.MASTER,
        active_agents=[AgentRole.MASTER],
        current_sub_commander=None,
        active_sub_commanders=[],
        sub_commander_trace=[],
        # Task and tool bookkeeping
        pending_tasks=[],
        completed_tasks=[],
        tool_calls=[],
        last_tool_result=None,
        action_results=[],
        created_entities=[],
        tool_strategy_used=None,
        provider_capabilities=None,
        fallback_parse_error=None,
        # Context, plan and output
        knowledge_context=None,
        graph_context=None,
        schedule_context_summary=None,
        plan=None,
        plan_steps=[],
        analysis_report=None,
        final_response=None,
        should_respond=True,
        # Memory, clock and configuration
        memory_context="memory context",
        current_datetime_context="CURRENT_TIME: 2026-03-28T12:00:00+08:00",
        current_datetime_reference=clock_reference,
        user_llm_config=user_llm_config,
    )
|
||||
|
||||
|
||||
class FakeTool:
    """Minimal tool stand-in that records each call and returns a canned result."""

    def __init__(self, name: str, result: str):
        # Every argument dict passed to ``invoke`` is collected here, in order.
        self.invocations: list[dict] = []
        self.result = result
        self.name = name

    def invoke(self, args: dict):
        """Record ``args`` and return the preconfigured ``result``."""
        self.invocations += [args]
        return self.result
|
||||
|
||||
|
||||
class SingleSystemMessageLLM:
    """Fake chat model that rejects calls not carrying exactly one system message.

    The first ``ainvoke`` call answers with a JSON tool-call payload; every
    other call answers with a plain-text confirmation.
    """

    def __init__(self):
        self.calls = 0
        # One entry per ``ainvoke`` call: how many system messages it received.
        self.system_message_counts: list[int] = []
        # Capability stub: no native tool support, JSON fallback preferred.
        self._jarvis_provider_capabilities = SimpleNamespace(
            provider="minimax",
            supports_native_tools=False,
            preferred_tool_strategy="json_fallback",
        )

    async def ainvoke(self, messages):
        """Count system messages, enforce exactly one, and return a canned reply.

        Raises:
            AssertionError: If ``messages`` does not contain exactly one entry
                whose ``type`` attribute equals ``"system"``.
        """
        self.calls += 1

        system_count = len(
            [entry for entry in messages if getattr(entry, "type", None) == "system"]
        )
        # The count is recorded before the check so failing calls are visible too.
        self.system_message_counts.append(system_count)
        if system_count != 1:
            raise AssertionError(
                f"expected exactly one system message, got {system_count}"
            )

        if self.calls != 1:
            return AIMessage(content="created reminder for blanket")

        tool_call_payload = (
            '{"mode":"tool_call","tool_calls":[{"name":"create_reminder",'
            '"arguments":{"title":"blanket","reminder_at":"\\u660e\\u5929 09:00"}}]}'
        )
        return AIMessage(content=tool_call_payload)
|
||||
|
||||
|
||||
def test_build_system_messages_includes_structured_continuity_summary():
    """A pending action routed as ``continue_pending_action`` appears in the system text."""
    state = _base_state("创建")
    state.update(
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
            "status": "pending",
        },
        routing_decision={
            "target_agent": AgentRole.SCHEDULE_PLANNER.value,
            "reason": "continue_pending_action",
        },
        continuity_state={"status": "fresh"},
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    # Flatten every message body into one string for substring checks.
    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    expected_fragments = (
        "pending_action",
        "schedule_creation",
        "continue_pending_action",
        "为周报安排明天下午提醒",
    )
    for fragment in expected_fragments:
        assert fragment in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_skips_structured_continuity_when_pending_action_is_not_pending():
    """A pending action whose status is ``completed`` must not surface continuity text."""
    state = _base_state("创建")
    state.update(
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
            "status": "completed",
        },
        routing_decision={
            "target_agent": AgentRole.SCHEDULE_PLANNER.value,
            "reason": "continue_pending_action",
        },
        continuity_state={"status": "fresh"},
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for forbidden in ("structured_continuity", "continue_pending_action"):
        assert forbidden not in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_skips_structured_continuity_when_routing_reason_is_not_continuation():
    """A routing reason other than continuation must not surface continuity text."""
    state = _base_state("创建")
    state.update(
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
            "status": "pending",
        },
        routing_decision={
            "target_agent": AgentRole.SCHEDULE_PLANNER.value,
            "reason": "initial_schedule_detection",
        },
        continuity_state={"status": "fresh"},
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for forbidden in ("structured_continuity", "continue_pending_action"):
        assert forbidden not in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_skips_structured_continuity_when_routing_decision_missing():
    """With ``routing_decision`` set to ``None`` no pending-action details may leak."""
    state = _base_state("创建")
    state.update(
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
        },
        routing_decision=None,
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for forbidden in ("pending_action", "schedule_creation", "为周报安排明天下午提醒"):
        assert forbidden not in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_skips_stale_structured_continuity_for_unrelated_new_request():
    """A ``stale`` continuity state suppresses the structured continuity section."""
    llm_config = {
        "provider": "openai",
        "model": "MiniMax-M2.7-highspeed",
        "base_url": "https://api.minimaxi.com/v1",
    }
    state = _base_state("帮我搜索 Rust 异步 trait 最佳实践", llm_config)
    state.update(
        current_agent=AgentRole.SCHEDULE_PLANNER,
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
            "status": "pending",
        },
        routing_decision={
            "target_agent": AgentRole.SCHEDULE_PLANNER.value,
            "reason": "continue_pending_action",
        },
        continuity_state={
            "status": "stale",
            "override_reason": "new_explicit_request",
        },
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for forbidden in ("structured_continuity", "pending_action", "continue_pending_action"):
        assert forbidden not in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_uses_role_scoped_context_instead_of_raw_memory_blob():
    """For the librarian role, knowledge context is present and user memory is absent."""
    # The three labelled sections are combined into the different state fields below.
    user_memory_section = "【用户记忆】\n- 用户喜欢燕麦拿铁。"
    summary_section = "【之前对话摘要】\n[对话摘要1] 之前聊过提醒。"
    brain_section = "【知识大脑】\n- Rust Async: trait object 需要 pin。"

    state = _base_state("帮我搜索 Rust 异步 trait 最佳实践")
    state.update(
        memory_context="\n\n".join((user_memory_section, summary_section, brain_section)),
        schedule_context_summary="\n\n".join((user_memory_section, summary_section)),
        knowledge_context=brain_section,
        analysis_report="\n\n".join((summary_section, brain_section)),
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.LIBRARIAN,
        "librarian_retrieval",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for expected in ("角色上下文", "【知识大脑】", "Rust Async"):
        assert expected in system_text
    for forbidden in ("用户喜欢燕麦拿铁", "记忆上下文"):
        assert forbidden not in system_text
|
||||
|
||||
|
||||
def test_build_system_messages_keeps_fresh_structured_continuity_for_matching_followup():
    """A ``fresh`` continuity state keeps the pending action in the system text."""
    llm_config = {
        "provider": "openai",
        "model": "MiniMax-M2.7-highspeed",
        "base_url": "https://api.minimaxi.com/v1",
    }
    state = _base_state("创建", llm_config)
    state.update(
        current_agent=AgentRole.SCHEDULE_PLANNER,
        pending_action={
            "type": "schedule_creation",
            "summary": "为周报安排明天下午提醒",
            "status": "pending",
        },
        routing_decision={
            "target_agent": AgentRole.SCHEDULE_PLANNER.value,
            "reason": "continue_pending_action",
        },
        continuity_state={"status": "fresh"},
    )

    built = _build_system_messages(
        state,
        "manager prompt",
        AgentRole.SCHEDULE_PLANNER,
        "schedule_planning",
    )

    system_text = "\n\n".join(str(getattr(entry, "content", "")) for entry in built)
    for expected in ("pending_action", "continue_pending_action"):
        assert expected in system_text
|
||||
|
||||
|
||||
async def test_run_sub_commander_coalesces_system_messages_for_openai_compatible_provider(
    monkeypatch,
):
    """Each LLM call made by ``_run_sub_commander`` carries exactly one system message.

    The fake LLM answers with a JSON tool call first and plain text second;
    the test also checks the arguments that reach the fake tool.
    """
    # NOTE(review): no asyncio marker is visible on this coroutine test --
    # presumably pytest-asyncio runs in auto mode; confirm.
    user_request = "给我设置明天的提醒,提醒我收被子"
    llm_config = {
        "provider": "openai",
        "model": "MiniMax-M2.7-highspeed",
        "base_url": "https://api.minimaxi.com/v1",
    }

    fake_llm = SingleSystemMessageLLM()
    fake_tool = FakeTool("create_reminder", "created reminder: blanket @ tomorrow 09:00")

    monkeypatch.setattr("app.agents.graph._get_llm_for_state", lambda state: fake_llm)
    # Swap the schedule-planning toolset for the single recording fake tool.
    graph_module = __import__("app.agents.graph", fromlist=["SUB_COMMANDER_TOOLSETS"])
    monkeypatch.setitem(
        graph_module.SUB_COMMANDER_TOOLSETS,
        "schedule_planning",
        [fake_tool],
    )

    state = _base_state(user_request, llm_config)
    state["current_agent"] = AgentRole.SCHEDULE_PLANNER

    result = await _run_sub_commander(
        state,
        AgentRole.SCHEDULE_PLANNER,
        "manager prompt",
        user_request,
        use_tools=True,
    )

    expected_tool_args = {"title": "blanket", "reminder_at": "2026-03-29T09:00:00"}
    assert fake_llm.system_message_counts == [1, 1]
    assert result["tool_strategy_used"] == "json_fallback"
    assert fake_tool.invocations == [expected_tool_args]
    assert result["final_response"] == "created reminder for blanket"
|
||||
@@ -47,3 +47,27 @@ def test_web_search_tool_returns_stable_message_when_unavailable(monkeypatch):
|
||||
result = web_search.func('Jarvis')
|
||||
|
||||
assert result == '网页搜索不可用: 网页搜索未启用或未配置'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_web_search_tool_runs_from_active_event_loop(monkeypatch):
    """``web_search.func`` is called synchronously from inside a coroutine test.

    The search service is replaced by a fake whose ``search`` coroutine checks
    the query and limit it receives and returns a single canned result.
    """

    class FakeService:
        async def search(self, query: str, limit: int | None = None):
            assert query == 'Jarvis 最新更新'
            assert limit == 1
            canned_hit = FakeResult(
                title='Jarvis release notes',
                url='https://example.com/jarvis-release',
                snippet='Latest Jarvis changes.',
                source='duckduckgo',
                published_at='2026-03-29',
            )
            return [canned_hit]

    monkeypatch.setattr('app.services.web_search_service.WebSearchService', FakeService)

    rendered = web_search.func('Jarvis 最新更新', top_k=1)

    for expected in ('[1] Jarvis release notes', '链接: https://example.com/jarvis-release'):
        assert expected in rendered
|
||||
|
||||
@@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from app.agents.tools import forum as forum_tools
|
||||
from app.agents.tools import schedule as schedule_tools
|
||||
from app.agents.tools import search as search_tools
|
||||
from app.agents.tools import task as task_tools
|
||||
|
||||
|
||||
@@ -12,6 +13,7 @@ from app.agents.tools import task as task_tools
|
||||
(task_tools, "task"),
|
||||
(schedule_tools, "schedule"),
|
||||
(forum_tools, "forum"),
|
||||
(search_tools, "search"),
|
||||
],
|
||||
)
|
||||
async def test_run_async_bridge_works_inside_running_event_loop(module, label):
|
||||
|
||||
@@ -127,15 +127,14 @@ class FakeStreamingFallbackWithContinuityGraph:
|
||||
return {
|
||||
'final_response': '这是回退后的同步回答。',
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
}
|
||||
|
||||
@@ -690,25 +689,25 @@ async def test_streaming_chat_fallback_reuses_rehydrated_continuity_snapshot(bra
|
||||
'user_turn_type': 'continuation',
|
||||
'user_turn_signal': 'clarification_answer',
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'active_sub_commander': 'create_reminder',
|
||||
},
|
||||
'current_agent': 'executor',
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_reminder',
|
||||
'target_action': 'create_reminder',
|
||||
'question': '你想提醒几点?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'action_type': 'clarification',
|
||||
'status': 'awaiting_clarification',
|
||||
'type': 'clarification',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_reminder',
|
||||
'status': 'blocked_on_clarification',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
}
|
||||
conversation.agent_state = {
|
||||
@@ -927,21 +926,21 @@ async def test_chat_simple_persists_continuity_snapshot_on_assistant_message(bra
|
||||
return {
|
||||
'final_response': '需要你确认下一步。',
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'last_completed_action': {
|
||||
'tool_name': 'create_task',
|
||||
@@ -972,15 +971,14 @@ async def test_chat_simple_persists_continuity_snapshot_on_assistant_message(bra
|
||||
'version': 1,
|
||||
'state': {
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
'last_completed_action': {
|
||||
'tool_name': 'create_task',
|
||||
@@ -989,10 +987,11 @@ async def test_chat_simple_persists_continuity_snapshot_on_assistant_message(bra
|
||||
'entity_type': 'task',
|
||||
},
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
},
|
||||
},
|
||||
}]
|
||||
@@ -1005,21 +1004,21 @@ async def test_streaming_chat_persists_continuity_snapshot_in_assistant_message_
|
||||
final_response='继续处理。',
|
||||
output_state={
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -1044,21 +1043,21 @@ async def test_streaming_chat_persists_continuity_snapshot_in_assistant_message_
|
||||
|
||||
expected_state_fields = {
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1071,6 +1070,7 @@ async def test_streaming_chat_persists_continuity_snapshot_in_assistant_message_
|
||||
assert persisted_snapshot['state'][key] == value
|
||||
assert conversation is not None
|
||||
assert conversation.agent_state == {
|
||||
'kind': 'agent_continuity_state',
|
||||
'version': persisted_snapshot['version'],
|
||||
'state': persisted_snapshot['state'],
|
||||
}
|
||||
@@ -1099,21 +1099,21 @@ async def test_streaming_chat_rehydrates_previous_continuity_snapshot(brain_inge
|
||||
'version': 1,
|
||||
'state': {
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
},
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'last_completed_action': {
|
||||
'tool_name': 'create_task',
|
||||
@@ -1139,21 +1139,21 @@ async def test_streaming_chat_rehydrates_previous_continuity_snapshot(brain_inge
|
||||
|
||||
assert streaming_graph.captured_state is not None
|
||||
assert streaming_graph.captured_state['pending_action'] == {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'action_type': 'create_task',
|
||||
'status': 'awaiting_confirmation',
|
||||
'type': 'create_task',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_task',
|
||||
'status': 'pending',
|
||||
}
|
||||
assert streaming_graph.captured_state['clarification_context'] == {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_task',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_task',
|
||||
'target_action': 'create_task',
|
||||
'question': '要现在创建吗?',
|
||||
'status': 'pending',
|
||||
}
|
||||
assert streaming_graph.captured_state['continuity_state'] == {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_task',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
}
|
||||
assert streaming_graph.captured_state['last_completed_action'] == {
|
||||
'tool_name': 'create_task',
|
||||
@@ -1374,11 +1374,11 @@ async def test_build_memory_context_includes_brain_memory_section(brain_ingestio
|
||||
'Jarvis 接下来应该优先做什么?',
|
||||
)
|
||||
|
||||
assert '【用户记忆】' in context
|
||||
assert '【之前对话摘要】' in context
|
||||
assert '【知识大脑】' in context
|
||||
assert 'Knowledge brain phase 1' in context
|
||||
assert 'Jarvis should learn from conversation and document events first.' in context
|
||||
assert '【用户记忆】' not in context
|
||||
assert 'Forum moderation policy' not in context
|
||||
|
||||
|
||||
@@ -1397,25 +1397,25 @@ async def test_chat_simple_rehydrates_clarification_follow_up_state_into_langgra
|
||||
'user_turn_type': 'continuation',
|
||||
'user_turn_signal': 'clarification_answer',
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'active_sub_commander': 'create_reminder',
|
||||
},
|
||||
'current_agent': 'executor',
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_reminder',
|
||||
'target_action': 'create_reminder',
|
||||
'question': '你想提醒几点?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'action_type': 'clarification',
|
||||
'status': 'awaiting_clarification',
|
||||
'type': 'clarification',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_reminder',
|
||||
'status': 'blocked_on_clarification',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
}
|
||||
session.add(Message(
|
||||
@@ -1465,25 +1465,25 @@ async def test_chat_simple_preserves_stale_continuity_state_for_fresh_request_ov
|
||||
'user_turn_type': 'continuation',
|
||||
'user_turn_signal': 'clarification_answer',
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'active_sub_commander': 'create_reminder',
|
||||
},
|
||||
'current_agent': 'executor',
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'owning_agent': 'executor',
|
||||
'owning_sub_commander': 'create_reminder',
|
||||
'target_action': 'create_reminder',
|
||||
'question': '你想提醒几点?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'executor',
|
||||
'sub_flow': 'create_reminder',
|
||||
'action_type': 'clarification',
|
||||
'status': 'awaiting_clarification',
|
||||
'type': 'clarification',
|
||||
'owner_agent': 'executor',
|
||||
'owner_sub_commander': 'create_reminder',
|
||||
'status': 'blocked_on_clarification',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'executor',
|
||||
'active_sub_flow': 'create_reminder',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
'last_completed_action': {
|
||||
'tool_name': 'create_reminder',
|
||||
@@ -1546,25 +1546,24 @@ async def test_streaming_chat_rehydrates_continuation_state_and_memory_context_i
|
||||
'user_turn_type': 'continuation',
|
||||
'user_turn_signal': 'clarification_answer',
|
||||
'active_agent': 'schedule_planner',
|
||||
'active_sub_flow': 'plan_revision',
|
||||
'active_sub_commander': 'plan_revision',
|
||||
},
|
||||
'current_agent': 'schedule_planner',
|
||||
'clarification_context': {
|
||||
'awaiting_user_input': True,
|
||||
'active_agent': 'schedule_planner',
|
||||
'sub_flow': 'plan_revision',
|
||||
'owning_agent': 'schedule_planner',
|
||||
'owning_sub_commander': 'plan_revision',
|
||||
'question': '你想优先看总结版还是完整计划?',
|
||||
'status': 'pending',
|
||||
},
|
||||
'pending_action': {
|
||||
'agent': 'schedule_planner',
|
||||
'sub_flow': 'plan_revision',
|
||||
'action_type': 'clarification',
|
||||
'status': 'awaiting_clarification',
|
||||
'type': 'clarification',
|
||||
'owner_agent': 'schedule_planner',
|
||||
'owner_sub_commander': 'plan_revision',
|
||||
'status': 'blocked_on_clarification',
|
||||
},
|
||||
'continuity_state': {
|
||||
'active_agent': 'schedule_planner',
|
||||
'active_sub_flow': 'plan_revision',
|
||||
'status': 'awaiting_clarification',
|
||||
'status': 'fresh',
|
||||
'mode': 'resume_after_clarification',
|
||||
},
|
||||
}
|
||||
session.add(Message(
|
||||
@@ -1585,7 +1584,7 @@ async def test_streaming_chat_rehydrates_continuation_state_and_memory_context_i
|
||||
'【延续处理】\n'
|
||||
'- continuation context: this user turn continues an existing workflow.\n'
|
||||
'- active_agent: schedule_planner\n'
|
||||
'- active_sub_flow: plan_revision\n'
|
||||
'- active_sub_commander: plan_revision\n'
|
||||
'- user_turn_signal: clarification_answer'
|
||||
)
|
||||
|
||||
@@ -1617,3 +1616,380 @@ async def test_streaming_chat_rehydrates_continuation_state_and_memory_context_i
|
||||
assert graph.captured_state['pending_action'] == previous_snapshot['pending_action']
|
||||
assert graph.captured_state['continuity_state'] == previous_snapshot['continuity_state']
|
||||
assert graph.captured_state['current_agent'] == 'schedule_planner'
|
||||
async def test_build_memory_context_suppresses_summary_for_memory_query(brain_ingestion_env):
    """For this preference-style query the user-memory section is present and the summary is not."""
    # NOTE(review): no ``@pytest.mark.asyncio`` line is visible directly above
    # this coroutine in this view -- confirm the marker (or auto mode) exists.
    session, user = brain_ingestion_env
    conversation = Conversation(user_id=user.id, title='Memory-only query test')
    session.add(conversation)
    # Flush so ``conversation.id`` is available for the rows below.
    await session.flush()

    preference = UserMemory(
        user_id=user.id,
        memory_type='preference',
        content='用户喜欢燕麦拿铁。',
        importance=8,
        source_conversation_id=conversation.id,
    )
    session.add(preference)
    earlier_summary = MemorySummary(
        user_id=user.id,
        conversation_id=conversation.id,
        summary_text='之前讨论了知识大脑迁移和文档入库流程。',
        turn_count=10,
    )
    session.add(earlier_summary)
    await session.commit()

    query = '记住我喜欢燕麦拿铁,以后推荐咖啡时参考这个偏好。'
    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        query,
    )

    for expected in ('【用户记忆】', '用户喜欢燕麦拿铁。'):
        assert expected in context
    assert '【之前对话摘要】' not in context
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_keeps_summary_for_ambiguous_like_word_query(brain_ingestion_env):
    """For this query the summary section is kept and the user-memory section is absent."""
    session, user = brain_ingestion_env
    conversation = Conversation(user_id=user.id, title='Ambiguous preference word test')
    session.add(conversation)
    # Flush so ``conversation.id`` is available for the rows below.
    await session.flush()

    preference = UserMemory(
        user_id=user.id,
        memory_type='preference',
        content='用户喜欢结构化输出。',
        importance=7,
        source_conversation_id=conversation.id,
    )
    session.add(preference)
    earlier_summary = MemorySummary(
        user_id=user.id,
        conversation_id=conversation.id,
        summary_text='之前已经总结过知识大脑迁移计划。',
        turn_count=6,
    )
    session.add(earlier_summary)
    await session.commit()

    query = '你觉得用户会喜欢这个知识大脑迁移方案吗?顺便总结一下之前聊过的重点。'
    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        query,
    )

    assert '【用户记忆】' not in context
    for expected in ('【之前对话摘要】', '之前已经总结过知识大脑迁移计划。'):
        assert expected in context
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_keeps_summary_for_document_reference_query(brain_ingestion_env):
    """For this document-related query the summary is kept and user memory is absent."""
    session, user = brain_ingestion_env
    conversation = Conversation(user_id=user.id, title='Document reference query test')
    session.add(conversation)
    # Flush so ``conversation.id`` is available for the rows below.
    await session.flush()

    preference = UserMemory(
        user_id=user.id,
        memory_type='preference',
        content='用户偏好带示例的说明。',
        importance=7,
        source_conversation_id=conversation.id,
    )
    session.add(preference)
    earlier_summary = MemorySummary(
        user_id=user.id,
        conversation_id=conversation.id,
        summary_text='之前总结了文档入库和知识大脑联动流程。',
        turn_count=7,
    )
    session.add(earlier_summary)
    await session.commit()

    query = '这个 document ingestion 方案会有什么影响?也请总结一下之前聊过的重点。'
    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        query,
    )

    assert '【用户记忆】' not in context
    for expected in ('【之前对话摘要】', '之前总结了文档入库和知识大脑联动流程。'):
        assert expected in context
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_suppresses_user_memory_for_grounded_query(brain_ingestion_env):
    """For this document-grounded query only the brain-memory section appears."""
    session, user = brain_ingestion_env
    conversation = Conversation(user_id=user.id, title='Grounded query test')
    session.add(conversation)
    # Flush so ``conversation.id`` is available for the rows below.
    await session.flush()

    preference = UserMemory(
        user_id=user.id,
        memory_type='preference',
        content='用户偏好轻松随意的语气。',
        importance=9,
        source_conversation_id=conversation.id,
    )
    session.add(preference)
    earlier_summary = MemorySummary(
        user_id=user.id,
        conversation_id=conversation.id,
        summary_text='之前聊过论坛审核策略。',
        turn_count=8,
    )
    session.add(earlier_summary)
    project_fact = BrainMemory(
        user_id=user.id,
        memory_type='project_fact',
        title='Document ingestion flow',
        content='Document uploads are chunked before vector indexing.',
        importance=7,
        confidence=0.9,
        status='active',
        origin_source_types=['document'],
        metadata_={'source_count': 1},
    )
    session.add(project_fact)
    await session.commit()

    query = '请严格根据文档内容说明 document ingestion flow,不要结合我的个人偏好。'
    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        query,
    )

    for expected in ('【知识大脑】', 'Document ingestion flow'):
        assert expected in context
    for forbidden in ('【用户记忆】', '用户偏好轻松随意的语气。', '【之前对话摘要】'):
        assert forbidden not in context
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_keeps_partial_context_when_user_memory_recall_fails(
    brain_ingestion_env,
    monkeypatch,
    caplog,
):
    """A failing user-memory recall still yields summary and brain sections, and is logged."""
    session, user = brain_ingestion_env
    conversation = Conversation(user_id=user.id, title='Partial context test')
    session.add(conversation)
    # Flush so ``conversation.id`` is available for the rows below.
    await session.flush()

    earlier_summary = MemorySummary(
        user_id=user.id,
        conversation_id=conversation.id,
        summary_text='之前总结了知识大脑的激活记忆策略。',
        turn_count=9,
    )
    session.add(earlier_summary)
    project_fact = BrainMemory(
        user_id=user.id,
        memory_type='project_fact',
        title='Active memory filter',
        content='Only active Brain memories should enter default prompt context.',
        importance=8,
        confidence=0.96,
        status='active',
        origin_source_types=['conversation'],
        metadata_={'source_count': 1},
    )
    session.add(project_fact)
    await session.commit()

    run_statement = session.execute
    recall_selects = 0

    async def fail_recall_user_memories(*args, **kwargs):
        # Issue a real query through the session first, then fail.
        nonlocal recall_selects
        recall_selects += 1
        await run_statement(select(UserMemory).where(UserMemory.user_id == user.id))
        raise RuntimeError('mem0 unavailable')

    monkeypatch.setattr(memory_service, 'recall_user_memories', fail_recall_user_memories)
    caplog.set_level('WARNING')

    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        'active memory filter',
    )

    assert recall_selects == 1
    surviving_fragments = (
        '【之前对话摘要】',
        '之前总结了知识大脑的激活记忆策略。',
        '【知识大脑】',
        'Active memory filter',
    )
    for expected in surviving_fragments:
        assert expected in context
    assert '【用户记忆】' not in context

    # The failure must be logged, and at least one such record must carry exception info.
    failure_records = [
        record for record in caplog.records if '用户记忆召回失败' in record.message
    ]
    assert failure_records
    assert any(record.exc_info for record in failure_records)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_does_not_rollback_caller_pending_message_on_tolerated_failure(
    brain_ingestion_env,
    monkeypatch,
):
    """Check that a tolerated recall failure keeps the caller's pending row.

    A ``Message`` is added to the session but not committed before
    ``build_memory_context`` runs with a recall stub that always raises.
    After the caller commits, the message must be loadable with its content
    intact, and the returned context must be the empty string.
    """
    session, user = brain_ingestion_env

    conversation = Conversation(user_id=user.id, title='Pending message preservation test')
    session.add(conversation)
    await session.flush()

    message_text = '这条消息不应因记忆召回失败而丢失。'
    pending_message = Message(
        conversation_id=conversation.id,
        role='user',
        content=message_text,
    )
    # Deliberately left uncommitted while the context is being built.
    session.add(pending_message)

    async def raising_recall(*args, **kwargs):
        raise RuntimeError('mem0 unavailable')

    monkeypatch.setattr(memory_service, 'recall_user_memories', raising_recall)

    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        'active memory filter',
    )

    await session.commit()
    persisted_message = await session.get(Message, pending_message.id)

    assert context == ''
    assert persisted_message is not None
    assert persisted_message.content == message_text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_skips_unrelated_user_memory_when_fallback_has_no_query_match(brain_ingestion_env):
    """Check that a stored user memory unrelated to the query is left out.

    One preference memory is persisted, then the context is built for a query
    on a different topic. Neither the user-memory section header nor the
    memory text may appear in the result.
    """
    session, user = brain_ingestion_env

    conversation = Conversation(user_id=user.id, title='Irrelevant fallback memory test')
    session.add(conversation)
    await session.flush()

    unrelated_content = '用户喜欢燕麦拿铁。'
    unrelated_memory = UserMemory(
        user_id=user.id,
        memory_type='preference',
        content=unrelated_content,
        importance=8,
        source_conversation_id=conversation.id,
    )
    session.add(unrelated_memory)
    await session.commit()

    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        '讨论数据库迁移回滚策略。',
    )

    assert '【用户记忆】' not in context
    assert unrelated_content not in context
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_marks_recalled_memories_in_single_commit(
    brain_ingestion_env,
    monkeypatch,
):
    """Check that building the context commits exactly once.

    Two user memories are persisted, ``session.commit`` is wrapped with a
    recording shim, and the context is built. Both memories must appear in
    the user-memory section and the shim must have seen a single commit.
    """
    session, user = brain_ingestion_env

    conversation = Conversation(user_id=user.id, title='Recall batching test')
    session.add(conversation)
    await session.flush()

    # (memory_type, content, importance) for each stored memory.
    memory_specs = (
        ('preference', '用户偏好简洁回答。', 7),
        ('goal', '用户想推进知识大脑上线。', 6),
    )
    memories = [
        UserMemory(
            user_id=user.id,
            memory_type=memory_type,
            content=content,
            importance=importance,
            source_conversation_id=conversation.id,
        )
        for memory_type, content, importance in memory_specs
    ]
    session.add_all(memories)
    await session.commit()

    # Wrap commit only after the setup commit so it is not counted.
    real_commit = session.commit
    observed_commits = []

    async def recording_commit():
        observed_commits.append(1)
        await real_commit()

    monkeypatch.setattr(session, 'commit', recording_commit)

    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        '请结合我的历史偏好给我建议。',
    )

    assert '【用户记忆】' in context
    assert '用户偏好简洁回答。' in context
    assert '用户想推进知识大脑上线。' in context
    assert len(observed_commits) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_build_memory_context_excludes_non_active_brain_memories(brain_ingestion_env):
    """Check that only BrainMemory rows with status 'active' reach the context.

    Three rows that differ in title, content, importance, confidence and
    status ('active', 'archived', 'superseded') are persisted. The built
    context must include the Brain section with the active title and must
    not mention the archived or superseded titles.
    """
    session, user = brain_ingestion_env

    conversation = Conversation(user_id=user.id, title='Brain status filter test')
    session.add(conversation)
    await session.flush()

    # (title, content, importance, confidence, status) per row, in insert order.
    brain_rows = (
        (
            'Active rollout note',
            'Use only active Brain memories in the default prompt.',
            9,
            0.97,
            'active',
        ),
        (
            'Archived rollout note',
            'This archived memory should stay out of the prompt.',
            10,
            0.99,
            'archived',
        ),
        (
            'Superseded rollout note',
            'This superseded memory should stay out of the prompt.',
            10,
            0.99,
            'superseded',
        ),
    )
    for title, content, importance, confidence, status in brain_rows:
        session.add(BrainMemory(
            user_id=user.id,
            memory_type='project_fact',
            title=title,
            content=content,
            importance=importance,
            confidence=confidence,
            status=status,
            origin_source_types=['conversation'],
            metadata_={'source_count': 1},
        ))
    await session.commit()

    context = await memory_service.build_memory_context(
        session,
        user.id,
        conversation.id,
        'rollout note',
    )

    assert '【知识大脑】' in context
    assert 'Active rollout note' in context
    for hidden_title in ('Archived rollout note', 'Superseded rollout note'):
        assert hidden_title not in context
|
||||
|
||||
150
docs/superpowers/plans/2026-04-03-l3-runtime-hardening-plan.md
Normal file
150
docs/superpowers/plans/2026-04-03-l3-runtime-hardening-plan.md
Normal file
@@ -0,0 +1,150 @@
|
||||
# 2026-04-03 L3 Runtime Hardening Plan
|
||||
|
||||
## Goal
|
||||
先把 Jarvis 的 L3 主链夯实,只处理 runtime / graph / tools / service integration / tests / docs 的一致性问题;暂不继续扩 unrelated feature domain。
|
||||
|
||||
## Scope
|
||||
- `backend/app/agents/graph.py`
|
||||
- `backend/app/agents/state.py`
|
||||
- `backend/app/agents/tools/__init__.py`
|
||||
- `backend/app/agents/tools/search.py`
|
||||
- `backend/app/agents/tools/schedule.py`
|
||||
- `backend/app/agents/tools/task.py`
|
||||
- `backend/app/services/agent_service.py`
|
||||
- `backend/app/services/document_service.py`
|
||||
- `backend/app/services/memory_service.py`
|
||||
- `backend/tests/backend/app/agents/test_graph*.py`
|
||||
- `backend/tests/backend/app/services/test_brain_ingestion.py`
|
||||
- related design/plan docs under `docs/superpowers/`
|
||||
|
||||
## Non-goals
|
||||
- 不在本轮新增前端页面
|
||||
- 不在 L3 未稳定前继续扩 accounting / weather / RSS 等运行时域
|
||||
- 不重做 graph 架构,只做收敛、对齐和补测试
|
||||
|
||||
## Current High-Priority Gaps
|
||||
1. **continuity / clarification schema drift**
|
||||
- graph runtime 已使用 `owning_agent` / `owning_sub_commander` / `target_action`
|
||||
- brain ingestion tests 仍大量使用旧快照字段:`active_sub_flow` / `awaiting_user_input` 等
|
||||
2. **tool execution drift**
|
||||
- `search.py` 的 `_run_async()` 在 running loop 下实现不一致
|
||||
- schedule/task canonicalization 仍存在参数映射漂移
|
||||
3. **service integration drift**
|
||||
- `agent_service` 已派生 role-scoped memory sections,但 continuity snapshot / graph runtime / persisted attachments 需要继续收口
|
||||
4. **docs drift**
|
||||
- 现有文档已记录 L3 merge progress,但缺少一份当天可执行的 hardening 追踪文档
|
||||
|
||||
## Workstreams
|
||||
|
||||
### Workstream A — Continuity Contract
|
||||
Owner: worker-1
|
||||
|
||||
Target:
|
||||
- 对齐 clarification / continuity canonical schema
|
||||
- 让 graph runtime 与 persisted snapshot 使用同一套契约,或显式兼容旧字段
|
||||
- 补针对性测试
|
||||
|
||||
Done when:
|
||||
- graph 与 ingestion tests 对 clarification/continuity 断言一致
|
||||
- stale continuity / resume-after-clarification 场景有回归覆盖
|
||||
- 文档明确列出 canonical 字段和兼容规则
|
||||
|
||||
### Workstream B — Tool Execution Path
|
||||
Owner: worker-2
|
||||
|
||||
Target:
|
||||
- 修复 search async bridge
|
||||
- 对齐 task / schedule canonicalization
|
||||
- 固定当前 L3 scope 下真实支持的 tool/fallback 规则
|
||||
|
||||
Current status:
|
||||
- 已统一 `search.py` / `schedule.py` / `task.py` / `forum.py` 到共享 `app.agents.tools.async_bridge.run_async`,避免 running loop 下的同步桥接漂移。
|
||||
- 已收敛 graph canonicalization:`create_todo` 保留 date/todo_date 语义;仅在出现 timed task 信号时提升为 `create_schedule_task`;`create_goal` 统一落到 `goal_date`;`create_reminder` clarification 前会先标准化 `date`。
|
||||
- 已补 targeted regressions,覆盖 active event loop search path、timed todo promotion、reminder clarification date normalization。
|
||||
|
||||
Done when:
|
||||
- 相关工具测试通过
|
||||
- graph canonicalization 行为清晰且无死分支
|
||||
- 文档明确说明支持的 tool path 与 deferred domains
|
||||
|
||||
### Workstream C — Service Integration
|
||||
Owner: worker-3
|
||||
|
||||
Target:
|
||||
- 对齐 graph runtime 与 `agent_service` 入口语义
|
||||
- 收敛 continuity snapshot、role-scoped context、stream/sync 行为
|
||||
- 补接入层测试或针对性断言
|
||||
|
||||
Done when:
|
||||
- `agent_service` 与 graph 状态注入规则一致
|
||||
- continuity snapshot load/persist 行为有测试证据
|
||||
- 文档明确 graph/service 边界和责任
|
||||
|
||||
## Runtime Contract Notes
|
||||
### Clarification context
|
||||
Canonical target shape:
|
||||
- `owning_agent`
|
||||
- `owning_sub_commander`
|
||||
- `target_action`
|
||||
- `question`
|
||||
- `partial_args`
|
||||
- `missing_fields`
|
||||
- `status`
|
||||
|
||||
### Continuity state
|
||||
Current known active markers:
|
||||
- `status: fresh|stale`
|
||||
- `mode: resume_after_clarification` for clarification continuation
|
||||
- routing continuation should only survive when the new request is still semantically a continuation
|
||||
|
||||
### Tool strategy
|
||||
Current target contract:
|
||||
- native tools and JSON fallback should converge on the same normalized tool name + normalized args before execution
|
||||
- system messages should remain coalesced into one system message for OpenAI-compatible providers that reject multiple system messages
|
||||
- sync tool shims in current L3 scope must route through shared `async_bridge.run_async` instead of per-file event-loop wrappers
|
||||
|
||||
### Current L3 tool path rules
|
||||
- `librarian_retrieval` current allowlist: `search_knowledge`, `hybrid_search`, `web_search`, `get_knowledge_graph_context`
|
||||
- search-family sync wrappers must be safe under an already-running event loop
|
||||
- `create_todo` keeps day-level intent on `todo_date`; do not silently remap date-only todo requests to task due dates
|
||||
- `create_todo` upgrades to `create_schedule_task` only for timed/task-shaped payloads such as `due_time`, `due_datetime`, `start_time`, `end_time`
|
||||
- `create_goal` date aliases normalize to `goal_date`
|
||||
- `create_reminder` aliases normalize before clarification so resumed flows keep canonical partial args
|
||||
|
||||
### Explicitly deferred domains in this hardening pass
|
||||
- accounting runtime expansion
|
||||
- weather runtime expansion
|
||||
- RSS runtime expansion
|
||||
- any new tool domains outside current schedule / task / forum / knowledge L3 path
|
||||
|
||||
## Documentation Rule For This Hardening Pass
|
||||
每完成一个 workstream:
|
||||
1. 更新本文件的 status
|
||||
2. 在相关 spec/notes 中补一段“当前状态 / 已决策 / 已知边界”
|
||||
3. 再标记任务完成
|
||||
|
||||
## Status
|
||||
- [x] Hardening tracker created
|
||||
- [x] Workstream A complete
|
||||
- [x] Workstream B complete
|
||||
- [x] Workstream C complete
|
||||
- [x] Final verification pass complete
|
||||
|
||||
## Verification Checklist
|
||||
- [x] `test_graph_system_messages.py` → 8 passed
|
||||
- [x] `test_tool_async_bridge.py` + `test_task_tools.py` → 18 passed
|
||||
- [x] `test_brain_ingestion.py` full file → 40 passed
|
||||
- [x] targeted continuity persistence/rehydration checks → 3 passed
|
||||
- [x] targeted graph regressions for timed todo / reminder clarification / active event loop paths
|
||||
- [ ] broader graph suite beyond this L3 slice
|
||||
|
||||
## Final Notes
|
||||
- L3 continuity persistence now uses one canonical envelope and normalizes legacy snapshot shapes on rehydration.
|
||||
- Service/runtime integration is aligned on the canonical continuity schema rather than legacy raw snapshot persistence.
|
||||
- Tool sync shims now share one async bridge across search / schedule / task / forum paths.
|
||||
- Final verification was executed with `uv run pytest` from `backend/`, which bypassed the broken plain `python` launcher in this environment.
|
||||
- A reviewer flagged async bridge timeout/cancellation semantics as a follow-up reliability concern for mutating tools, but it is not blocking this L3 hardening pass.
|
||||
|
||||
## Next Action
|
||||
- Treat this L3 hardening slice as complete.
|
||||
- If continuing, the next best follow-up is either broader graph regression coverage or a dedicated fix for async bridge timeout/cancellation semantics.
|
||||
@@ -40,3 +40,18 @@
|
||||
- normalized_content should be persisted on documents so preview, rebuild, and future chunking can reuse the same canonical text.
|
||||
- Lightweight hierarchy should be represented in chunk metadata first, not in a new relational tree schema.
|
||||
- Current DOCX upload failure in the running environment is caused by a missing python-docx installation in the active backend environment.
|
||||
|
||||
## Additional Findings: L3 Merge Progress
|
||||
- `backend/app/agents/state.py` has been expanded to the newer L3 runtime state shape so graph/runtime code can rely on structured continuity, tool-round, retry, routing-hop, and datetime-reference fields.
|
||||
- `backend/app/agents/graph.py` no longer contains merge markers and the phantom `EXECUTOR_ACCOUNTING` branch has been removed from graph registration and routing.
|
||||
- Accounting-style prompts are currently normalized onto `AgentRole.EXECUTOR` instead of a separate executor-accounting role, which avoids dangling enum/runtime references while keeping those intents routable.
|
||||
- `backend/tests/backend/app/agents/test_graph.py` has been reconciled onto the newer L3 runtime test branch and stale `EXECUTOR_ACCOUNTING` expectations were updated to `AgentRole.EXECUTOR`.
|
||||
- Tool execution now uses a shared async bridge in `backend/app/agents/tools/async_bridge.py`, and `search.py`, `schedule.py`, `task.py`, plus `forum.py` all route synchronous tool entrypoints through that same bridge to keep runtime behavior consistent inside and outside active event loops.
|
||||
- Current task/schedule canonicalization remains intentionally narrow for L3: task aliases (`content`, `date`, legacy priorities) and reminder aliases (`datetime`, `at`, `remind_at`, `time`, timezone variants) are normalized; deferred domains such as weather/accounting-specific tool routing remain outside this stabilization slice.
|
||||
- Targeted verification now covers async bridge behavior plus task/schedule alias persistence tests; running pytest locally still requires fixing the environment-level startup issue that makes the interpreter exit before the selected files run.
|
||||
- L3 runtime/service integration now persists continuity snapshots in a single canonical envelope (`kind`, `version`, `state`) on both assistant message attachments and `Conversation.agent_state`, so streaming and sync chat entrypoints rehydrate the same shape.
|
||||
- The continuity rehydration path is also tolerant of older `Conversation` rows/models that do not expose `agent_state`, falling back to assistant message attachments instead of failing before graph execution.
|
||||
- The finalized L3 continuity contract persists a canonical `agent_continuity_state` snapshot: `turn_context.active_sub_commander`, `pending_action.type|owner_agent|owner_sub_commander|status`, `clarification_context.owning_agent|owning_sub_commander|target_action|question|status`, and `continuity_state.status|mode`.
|
||||
- `backend/app/services/agent_service.py` normalizes legacy persisted snapshots (`active_sub_flow`, `agent`, `sub_flow`, `action_type`, `awaiting_user_input`, `awaiting_clarification`) into that canonical shape on both save and rehydration so older brain-ingestion records still resume correctly.
|
||||
- Edge cases: explicit new requests may keep stale continuity in memory for override-aware routing, but only `continuity_state.status == fresh` participates in active continuation; clarification resumes use `continuity_state.mode = resume_after_clarification`.
|
||||
- `memory_service.build_memory_context(...)` remains the shared retrieval join point for conversation summaries, user memory, and BrainMemory recall, while `document_service` continues emitting BrainEvent records from upload flow without changing the graph runtime contract.
|
||||
|
||||
Reference in New Issue
Block a user