Files
JARVIS/backend/app/services/agent_service.py
DESKTOP-72TV0V4\caoxiaozhu d2447ee635 Add brain memory services and APIs
Introduce the backend pieces for brain memory ingestion, routing, and
system telemetry so the new knowledge workflows can project data into a
brain view. The supporting tests lock in the new behavior and keep the
expanded backend surface stable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 13:47:34 +08:00

490 lines
19 KiB
Python

"""
Jarvis Agent 服务层
负责 LangGraph Agent 的调用、流式输出、对话历史管理
"""
import json
import uuid
from datetime import datetime
from typing import Any, AsyncGenerator
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama
import httpx
from app.database import async_session
from app.models.conversation import Conversation, Message
from app.models.user import User
from app.agents.graph import get_agent_graph
from app.agents.context import set_current_user, clear_current_user
from app.services import memory_service
from app.services.brain_service import BrainService
def _create_llm_from_config(config: dict):
"""根据用户模型配置创建 LLM 实例"""
provider = config.get("provider", "openai")
model = config.get("model", "")
api_key = config.get("api_key", "")
base_url = config.get("base_url", "")
if provider == "openai" or provider == "deepseek" or provider == "custom":
return ChatOpenAI(
api_key=api_key,
model=model,
base_url=base_url or None,
timeout=httpx.Timeout(60.0, connect=10.0),
)
elif provider == "claude":
return ChatAnthropic(
api_key=api_key,
model=model,
timeout=httpx.Timeout(60.0, connect=10.0),
)
elif provider == "ollama":
return ChatOllama(
base_url=base_url or "http://localhost:11434",
model=model,
timeout=httpx.Timeout(120.0, connect=10.0),
)
else:
# 默认使用 OpenAI
return ChatOpenAI(
api_key=api_key,
model=model,
base_url=base_url or None,
timeout=httpx.Timeout(60.0, connect=10.0),
)
class AgentService:
"""对话 Agent 服务"""
def __init__(self, db: AsyncSession):
self.db = db
async def _try_auto_summarize_background(self, user_id: str, conversation_id: str) -> None:
async with async_session() as session:
await memory_service.try_auto_summarize(session, user_id, conversation_id)
def _build_progress_event(
self,
stage: str,
label: str,
*,
agent: str | None = None,
tool_name: str | None = None,
step: str | None = None,
steps: list[str] | None = None,
) -> dict[str, Any]:
return {
"type": "progress",
"stage": stage,
"label": label,
"agent": agent,
"tool_name": tool_name,
"step": step,
"steps": steps or [],
}
async def _get_user_llm_config(self, user_id: str, model_name: str | None = None) -> dict | None:
"""获取用户的 LLM 模型配置"""
result = await self.db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user or not user.llm_config:
return None
llm_config = user.llm_config
# 如果指定了模型名称,查找对应的配置
if model_name:
for model_type in ["chat", "vlm"]:
models = llm_config.get(model_type, [])
for m in models:
if m.get("name") == model_name:
return m
# 没找到,返回 None 让调用方知道配置不存在
return None
# 如果没指定模型名,返回默认启用的 chat 模型
chat_models = llm_config.get("chat", [])
for m in chat_models:
if m.get("enabled"):
return m
vlm_models = llm_config.get("vlm", [])
for m in vlm_models:
if m.get("enabled"):
return m
return None
async def chat(
self,
user_id: str,
message: str,
conversation_id: str | None = None,
file_ids: list[str] | None = None,
model_name: str | None = None,
) -> tuple[str, str, AsyncGenerator[dict[str, Any], None]]:
"""
处理对话请求(流式)
Returns:
(conversation_id, message_id, response_stream)
"""
# 获取或创建对话
if conversation_id:
result = await self.db.execute(
select(Conversation).where(Conversation.id == conversation_id)
)
conv = result.scalar_one_or_none()
else:
conv = None
if not conv:
conv = Conversation(user_id=user_id, title=message[:50])
self.db.add(conv)
await self.db.commit()
await self.db.refresh(conv)
conversation_id = conv.id
else:
conversation_id = conv.id
# 如果有文件,读取内容作为上下文
file_context = ""
if file_ids:
from app.services.document_service import DocumentService
doc_svc = DocumentService(self.db)
for file_id in file_ids:
content = await doc_svc.get_document_content(user_id, file_id)
if content:
file_context += f"\n\n[用户上传文件内容]\n{content}\n[/文件内容]"
full_message = f"{message}\n{file_context}" if file_context else message
# 存储用户消息
user_msg = Message(
conversation_id=conversation_id,
role="user",
content=message,
attachments=[{"file_ids": file_ids}] if file_ids else None,
)
self.db.add(user_msg)
await self.db.commit()
await self.db.refresh(user_msg)
brain_service = BrainService(self.db)
await brain_service.create_event(
user_id,
source_type="conversation",
source_id=conversation_id,
event_type="message_created",
title="User message",
content_summary=message[:500],
raw_excerpt=message[:2000],
metadata_={"role": "user"},
importance_signal=1.0,
)
await self.db.commit()
# 预创建助手消息(后续更新内容)
user_llm_config = await self._get_user_llm_config(user_id, model_name)
model_name_used = model_name
if user_llm_config:
model_name_used = user_llm_config.get("name", model_name)
assistant_msg = Message(
conversation_id=conversation_id,
role="assistant",
content="",
model=model_name_used or "jarvis",
)
self.db.add(assistant_msg)
await self.db.commit()
await self.db.refresh(assistant_msg)
# 加载记忆上下文
memory_ctx = await memory_service.build_memory_context(
self.db, user_id, conversation_id, message
)
# 调用 LangGraph Agent
async def run_agent():
set_current_user(user_id)
try:
graph = get_agent_graph()
langgraph_state = {
"messages": [HumanMessage(content=full_message)], # type: ignore[arg-type]
"user_id": user_id,
"conversation_id": conversation_id,
"current_agent": "master",
"active_agents": ["master"],
"pending_tasks": [],
"completed_tasks": [],
"tool_calls": [],
"last_tool_result": None,
"knowledge_context": None,
"graph_context": None,
"plan": None,
"plan_steps": [],
"analysis_report": None,
"final_response": None,
"should_respond": True,
"memory_context": memory_ctx,
"user_llm_config": user_llm_config,
}
yield self._build_progress_event("thinking", "Jarvis 正在分析请求", agent="master", step="理解你的问题")
collected = ""
async for event in graph.astream_events(langgraph_state, version="v2"):
kind = event.get("event")
event_name = event.get("name", "")
metadata = event.get("metadata", {})
data = event.get("data", {})
if kind == "on_chain_start" and event_name in {"master", "planner", "executor", "librarian", "analyst"}:
stage_map = {
"master": ("thinking", "Jarvis 正在理解请求"),
"planner": ("planning", "Jarvis 正在拆解步骤"),
"executor": ("tool", "Jarvis 正在执行操作"),
"librarian": ("tool", "Jarvis 正在检索知识"),
"analyst": ("thinking", "Jarvis 正在分析信息"),
}
stage, label = stage_map[event_name]
yield self._build_progress_event(stage, label, agent=event_name, step=label)
elif kind == "on_tool_start":
tool_input = data.get("input")
step = None
if isinstance(tool_input, dict) and tool_input:
step = f"调用工具 {event_name}"
yield self._build_progress_event("tool", f"Jarvis 正在调用工具 {event_name}", agent="executor", tool_name=event_name, step=step)
elif kind == "on_tool_end":
yield self._build_progress_event("tool", f"工具 {event_name} 已完成", agent="executor", tool_name=event_name, step=f"已获得 {event_name} 结果")
elif kind == "on_chain_end" and event_name == "planner":
output = data.get("output") or {}
plan_steps = output.get("plan_steps") or []
steps = [item.get("description", "") for item in plan_steps if item.get("description")]
yield self._build_progress_event("planning", "Jarvis 已生成处理步骤", agent="planner", step=steps[0] if steps else "正在整理计划", steps=steps[:4])
elif kind == "on_chat_model_stream":
chunk = data.get("chunk")
content = getattr(chunk, "content", "") if chunk else ""
if isinstance(content, list):
text_parts = []
for item in content:
if isinstance(item, dict):
text_parts.append(item.get("text", ""))
else:
text_parts.append(str(item))
content = "".join(text_parts)
if content:
collected += content
yield {"type": "chunk", "content": content}
elif kind == "on_chat_model_end" and not collected:
output = data.get("output")
content = getattr(output, "content", "") if output else ""
if isinstance(content, list):
text_parts = []
for item in content:
if isinstance(item, dict):
text_parts.append(item.get("text", ""))
else:
text_parts.append(str(item))
content = "".join(text_parts)
if content:
collected = content
yield {"type": "chunk", "content": content}
elif kind == "on_chain_end" and event_name in {"executor", "librarian", "analyst"}:
yield self._build_progress_event("responding", "Jarvis 正在整理最终回答", agent=event_name, step="生成回复")
except Exception as e:
fallback = f"抱歉,发生错误: {str(e)}"
collected = fallback
yield {"type": "error", "error": str(e)}
yield {"type": "chunk", "content": fallback}
finally:
clear_current_user()
try:
asyncio.get_running_loop().create_task(
self._try_auto_summarize_background(user_id, conversation_id)
)
except Exception:
pass
# 最终更新数据库中的消息内容
if collected:
try:
result2 = await self.db.execute(
select(Message).where(Message.id == assistant_msg.id)
)
msg = result2.scalar_one_or_none()
if msg:
msg.content = collected
await self.db.commit()
await brain_service.create_event(
user_id,
source_type="conversation",
source_id=conversation_id,
event_type="message_created",
title="Assistant message",
content_summary=collected[:500],
raw_excerpt=collected[:2000],
metadata_={"role": "assistant"},
importance_signal=1.0,
)
await self.db.commit()
except Exception:
pass
return conversation_id, assistant_msg.id, run_agent()
async def chat_simple(
self,
user_id: str,
message: str,
conversation_id: str | None = None,
file_ids: list[str] | None = None,
model_name: str | None = None,
) -> tuple[str, str, str, str | None]:
"""
简单同步版对话(无流式)
Returns:
(conversation_id, message_id, response_content, model_name_used)
"""
# 获取或创建对话
if conversation_id:
result = await self.db.execute(
select(Conversation).where(Conversation.id == conversation_id)
)
conv = result.scalar_one_or_none()
else:
conv = None
if not conv:
conv = Conversation(user_id=user_id, title=message[:50])
self.db.add(conv)
await self.db.commit()
await self.db.refresh(conv)
conversation_id = conv.id
else:
conversation_id = conv.id
# 如果有文件,读取内容作为上下文
file_context = ""
if file_ids:
from app.services.document_service import DocumentService
doc_svc = DocumentService(self.db)
for file_id in file_ids:
content = await doc_svc.get_document_content(user_id, file_id)
if content:
file_context += f"\n\n[用户上传文件内容]\n{content}\n[/文件内容]"
# 将文件上下文添加到消息
full_message = f"{message}\n{file_context}" if file_context else message
# 存储用户消息
user_msg = Message(
conversation_id=conversation_id,
role="user",
content=message,
attachments=[{"file_ids": file_ids}] if file_ids else None,
)
self.db.add(user_msg)
await self.db.commit()
await self.db.refresh(user_msg)
brain_service = BrainService(self.db)
await brain_service.create_event(
user_id,
source_type="conversation",
source_id=conversation_id,
event_type="message_created",
title="User message",
content_summary=message[:500],
raw_excerpt=message[:2000],
metadata_={"role": "user"},
importance_signal=1.0,
)
await self.db.commit()
# 加载记忆上下文
memory_ctx = await memory_service.build_memory_context(
self.db, user_id, conversation_id, message
)
# 获取用户配置的 LLM
user_llm_config = await self._get_user_llm_config(user_id, model_name)
model_name_used = model_name
if user_llm_config:
model_name_used = user_llm_config.get("name", model_name)
# 调用 LangGraph Agent
set_current_user(user_id)
graph = get_agent_graph()
langgraph_state = {
"messages": [HumanMessage(content=full_message)], # type: ignore[arg-type]
"user_id": user_id,
"conversation_id": conversation_id,
"current_agent": "master",
"active_agents": ["master"],
"pending_tasks": [],
"completed_tasks": [],
"tool_calls": [],
"last_tool_result": None,
"knowledge_context": None,
"graph_context": None,
"plan": None,
"plan_steps": [],
"analysis_report": None,
"final_response": None,
"should_respond": True,
"memory_context": memory_ctx,
"user_llm_config": user_llm_config, # 传递用户 LLM 配置
}
try:
result_state = await graph.ainvoke(langgraph_state)
response_content = result_state.get("final_response", "抱歉,我无法处理这个请求。")
except Exception as e:
response_content = f"抱歉,发生错误: {str(e)}"
finally:
clear_current_user()
try:
asyncio.get_running_loop().create_task(
self._try_auto_summarize_background(user_id, conversation_id)
)
except Exception:
pass
# 保存助手消息
assistant_msg = Message(
conversation_id=conversation_id,
role="assistant",
content=response_content,
model=model_name_used or "jarvis",
)
self.db.add(assistant_msg)
await self.db.commit()
await self.db.refresh(assistant_msg)
await brain_service.create_event(
user_id,
source_type="conversation",
source_id=conversation_id,
event_type="message_created",
title="Assistant message",
content_summary=response_content[:500],
raw_excerpt=response_content[:2000],
metadata_={"role": "assistant"},
importance_signal=1.0,
)
await self.db.commit()
return conversation_id, assistant_msg.id, response_content, model_name_used