"""
Jarvis Agent service layer.

Handles LangGraph agent invocation, streaming output, and conversation
history management.
"""
import asyncio
import logging
from typing import Any, AsyncGenerator

import httpx
from langchain_anthropic import ChatAnthropic
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.context import clear_current_user, set_current_user
from app.agents.graph import get_agent_graph
from app.database import async_session
from app.models.conversation import Conversation, Message
from app.models.user import User
from app.services import memory_service
from app.services.brain_service import BrainService

logger = logging.getLogger(__name__)

# The event loop keeps only weak references to tasks, so fire-and-forget tasks
# are held here until they finish to stop them being garbage-collected mid-run.
_background_tasks: set[asyncio.Task] = set()

# Graph nodes that emit a progress event when they start: node -> (stage, label)
_NODE_STAGES: dict[str, tuple[str, str]] = {
    "master": ("thinking", "Jarvis 正在理解请求"),
    "planner": ("planning", "Jarvis 正在拆解步骤"),
    "executor": ("tool", "Jarvis 正在执行操作"),
    "librarian": ("tool", "Jarvis 正在检索知识"),
    "analyst": ("thinking", "Jarvis 正在分析信息"),
}


def _create_llm_from_config(config: dict) -> BaseChatModel:
    """Create an LLM instance from a user's model configuration."""
    provider = config.get("provider", "openai")
    model = config.get("model", "")
    api_key = config.get("api_key", "")
    base_url = config.get("base_url", "")

    if provider == "claude":
        return ChatAnthropic(
            api_key=api_key,
            model=model,
            # ChatAnthropic's timeout is a number of seconds, not an httpx.Timeout
            timeout=60.0,
        )
    if provider == "ollama":
        return ChatOllama(
            base_url=base_url or "http://localhost:11434",
            model=model,
            # ChatOllama has no timeout field; pass it through to the underlying httpx clients
            client_kwargs={"timeout": httpx.Timeout(120.0, connect=10.0)},
        )
    # "openai", "deepseek", "custom" and unknown providers use the OpenAI-compatible client
    return ChatOpenAI(
        api_key=api_key,
        model=model,
        base_url=base_url or None,
        timeout=httpx.Timeout(60.0, connect=10.0),
    )


class AgentService:
    """Conversational agent service."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def _try_auto_summarize_background(self, user_id: str, conversation_id: str) -> None:
        # Runs detached from the request, so it opens its own session
        async with async_session() as session:
            await memory_service.try_auto_summarize(session, user_id, conversation_id)

    def _schedule_auto_summarize(self, user_id: str, conversation_id: str) -> None:
        """Start auto-summarization in the background, keeping a reference until it finishes."""
        task = asyncio.create_task(self._try_auto_summarize_background(user_id, conversation_id))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

    def _build_progress_event(
        self,
        stage: str,
        label: str,
        *,
        agent: str | None = None,
        tool_name: str | None = None,
        step: str | None = None,
        steps: list[str] | None = None,
    ) -> dict[str, Any]:
        return {
            "type": "progress",
            "stage": stage,
            "label": label,
            "agent": agent,
            "tool_name": tool_name,
            "step": step,
            "steps": steps or [],
        }

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten LangChain message content (a string or a list of content blocks) into text."""
        if isinstance(content, list):
            return "".join(
                item.get("text", "") if isinstance(item, dict) else str(item)
                for item in content
            )
        return content if isinstance(content, str) else ""

    @staticmethod
    def _build_initial_state(
        full_message: str,
        user_id: str,
        conversation_id: str,
        memory_ctx: Any,
        user_llm_config: dict | None,
    ) -> dict[str, Any]:
        """Initial LangGraph state, shared by the streaming and non-streaming entry points."""
        return {
            "messages": [HumanMessage(content=full_message)],
            "user_id": user_id,
            "conversation_id": conversation_id,
            "current_agent": "master",
            "active_agents": ["master"],
            "current_sub_commander": None,
            "active_sub_commanders": [],
            "sub_commander_trace": [],
            "pending_tasks": [],
            "completed_tasks": [],
            "tool_calls": [],
            "last_tool_result": None,
            "knowledge_context": None,
            "graph_context": None,
            "plan": None,
            "plan_steps": [],
            "analysis_report": None,
            "final_response": None,
            "should_respond": True,
            "memory_context": memory_ctx,
            # Pass the user's LLM config through to the graph nodes
            "user_llm_config": user_llm_config,
        }

    async def _get_user_llm_config(self, user_id: str, model_name: str | None = None) -> dict | None:
        """Fetch the user's LLM model configuration."""
        result = await self.db.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()
        if not user or not user.llm_config:
            return None
        llm_config = user.llm_config

        # A model name was given: look it up among the chat and VLM models
        if model_name:
            for model_type in ("chat", "vlm"):
                for m in llm_config.get(model_type, []):
                    if m.get("name") == model_name:
                        return m
            # Not found: return None so the caller knows the configuration does not exist
            return None

        # No model name: use the first enabled chat model, then the first enabled VLM
        for model_type in ("chat", "vlm"):
            for m in llm_config.get(model_type, []):
                if m.get("enabled"):
                    return m
        return None

    async def _get_or_create_conversation(
        self, user_id: str, conversation_id: str | None, message: str
    ) -> str:
        """Return the ID of the user's existing conversation, or create one titled after the message."""
        conv = None
        if conversation_id:
            # Scope the lookup to the user so nobody can write into another user's conversation
            result = await self.db.execute(
                select(Conversation).where(
                    Conversation.id == conversation_id,
                    Conversation.user_id == user_id,
                )
            )
            conv = result.scalar_one_or_none()
        if conv is None:
            conv = Conversation(user_id=user_id, title=message[:50])
            self.db.add(conv)
            await self.db.commit()
            await self.db.refresh(conv)
        return conv.id

    async def _load_file_context(self, user_id: str, file_ids: list[str] | None) -> str:
        """Concatenate the contents of the user's uploaded files into one context block."""
        if not file_ids:
            return ""
        from app.services.document_service import DocumentService

        doc_svc = DocumentService(self.db)
        file_context = ""
        for file_id in file_ids:
            content = await doc_svc.get_document_content(user_id, file_id)
            if content:
                file_context += f"\n\n[用户上传文件内容]\n{content}\n[/文件内容]"
        return file_context

    async def _record_message_event(
        self,
        brain_service: BrainService,
        user_id: str,
        conversation_id: str,
        role: str,
        content: str,
    ) -> None:
        """Record a user or assistant message as a brain event and commit it."""
        await brain_service.create_event(
            user_id,
            source_type="conversation",
            source_id=conversation_id,
            event_type="message_created",
            title=f"{role.capitalize()} message",
            content_summary=content[:500],
            raw_excerpt=content[:2000],
            metadata_={"role": role},
            importance_signal=1.0,
        )
        await self.db.commit()

    async def chat(
        self,
        user_id: str,
        message: str,
        conversation_id: str | None = None,
        file_ids: list[str] | None = None,
        model_name: str | None = None,
    ) -> tuple[str, str, AsyncGenerator[dict[str, Any], None]]:
        """
        Handle a chat request (streaming).

        Returns:
            (conversation_id, message_id, response_stream)
        """
        conversation_id = await self._get_or_create_conversation(user_id, conversation_id, message)

        # If files are attached, append their content to the prompt as context
        file_context = await self._load_file_context(user_id, file_ids)
        full_message = f"{message}\n{file_context}" if file_context else message

        # Store the user message (without the file content)
        user_msg = Message(
            conversation_id=conversation_id,
            role="user",
            content=message,
            attachments=[{"file_ids": file_ids}] if file_ids else None,
        )
        self.db.add(user_msg)
        await self.db.commit()
        await self.db.refresh(user_msg)

        brain_service = BrainService(self.db)
        await self._record_message_event(brain_service, user_id, conversation_id, "user", message)

        # Pre-create the assistant message; its content is filled in once streaming ends
        user_llm_config = await self._get_user_llm_config(user_id, model_name)
        model_name_used = model_name
        if user_llm_config:
            model_name_used = user_llm_config.get("name", model_name)
        assistant_msg = Message(
            conversation_id=conversation_id,
            role="assistant",
            content="",
            model=model_name_used or "jarvis",
        )
        self.db.add(assistant_msg)
        await self.db.commit()
        await self.db.refresh(assistant_msg)
        # Capture the ID now so later commits cannot expire it under us
        assistant_msg_id = assistant_msg.id

        # Load memory context
        memory_ctx = await memory_service.build_memory_context(
            self.db, user_id, conversation_id, message
        )

        async def run_agent() -> AsyncGenerator[dict[str, Any], None]:
            """Run the LangGraph agent and translate its events into progress/chunk/error dicts."""
            set_current_user(user_id)
            collected = ""
            try:
                graph = get_agent_graph()
                langgraph_state = self._build_initial_state(
                    full_message, user_id, conversation_id, memory_ctx, user_llm_config
                )
                yield self._build_progress_event(
                    "thinking", "Jarvis 正在分析请求", agent="master", step="理解你的问题"
                )

                async for event in graph.astream_events(langgraph_state, version="v2"):
                    kind = event.get("event")
                    event_name = event.get("name", "")
                    data = event.get("data", {})

                    if kind == "on_chain_start" and event_name in _NODE_STAGES:
                        stage, label = _NODE_STAGES[event_name]
                        yield self._build_progress_event(stage, label, agent=event_name, step=label)

                    elif kind == "on_tool_start":
                        tool_input = data.get("input")
                        step = f"调用工具 {event_name}" if isinstance(tool_input, dict) and tool_input else None
                        yield self._build_progress_event(
                            "tool",
                            f"Jarvis 正在调用工具 {event_name}",
                            agent="executor",
                            tool_name=event_name,
                            step=step,
                        )

                    elif kind == "on_tool_end":
                        yield self._build_progress_event(
                            "tool",
                            f"工具 {event_name} 已完成",
                            agent="executor",
                            tool_name=event_name,
                            step=f"已获得 {event_name} 结果",
                        )

                    elif kind == "on_chain_end" and event_name == "planner":
                        output = data.get("output") or {}
                        plan_steps = output.get("plan_steps") or []
                        steps = [item.get("description", "") for item in plan_steps if item.get("description")]
                        yield self._build_progress_event(
                            "planning",
                            "Jarvis 已生成处理步骤",
                            agent="planner",
                            step=steps[0] if steps else "正在整理计划",
                            steps=steps[:4],
                        )

                    elif kind == "on_chat_model_stream":
                        chunk = data.get("chunk")
                        content = self._content_to_text(getattr(chunk, "content", "")) if chunk else ""
                        if content:
                            collected += content
                            yield {"type": "chunk", "content": content}

                    elif kind == "on_chat_model_end" and not collected:
                        # The model did not stream tokens: emit its full output once
                        output = data.get("output")
                        content = self._content_to_text(getattr(output, "content", "")) if output else ""
                        if content:
                            collected = content
                            yield {"type": "chunk", "content": content}

                    elif kind == "on_chain_end" and event_name in {"executor", "librarian", "analyst"}:
                        yield self._build_progress_event(
                            "responding", "Jarvis 正在整理最终回答", agent=event_name, step="生成回复"
                        )
            except Exception as e:
                fallback = f"抱歉,发生错误: {e}"
                collected = fallback
                yield {"type": "error", "error": str(e)}
                yield {"type": "chunk", "content": fallback}
            finally:
                clear_current_user()
                self._schedule_auto_summarize(user_id, conversation_id)

            # Persist the final assistant reply
            if collected:
                try:
                    msg = await self.db.get(Message, assistant_msg_id)
                    if msg:
                        msg.content = collected
                        await self.db.commit()
                    await self._record_message_event(
                        brain_service, user_id, conversation_id, "assistant", collected
                    )
                except Exception:
                    logger.exception("Failed to persist assistant message %s", assistant_msg_id)
                    await self.db.rollback()

        return conversation_id, assistant_msg_id, run_agent()

    async def chat_simple(
        self,
        user_id: str,
        message: str,
        conversation_id: str | None = None,
        file_ids: list[str] | None = None,
        model_name: str | None = None,
    ) -> tuple[str, str, str, str | None]:
        """
        Simple non-streaming chat.

        Returns:
            (conversation_id, message_id, response_content, model_name_used)
        """
        conversation_id = await self._get_or_create_conversation(user_id, conversation_id, message)

        # If files are attached, append their content to the prompt as context
        file_context = await self._load_file_context(user_id, file_ids)
        full_message = f"{message}\n{file_context}" if file_context else message

        # Store the user message (without the file content)
        user_msg = Message(
            conversation_id=conversation_id,
            role="user",
            content=message,
            attachments=[{"file_ids": file_ids}] if file_ids else None,
        )
        self.db.add(user_msg)
        await self.db.commit()
        await self.db.refresh(user_msg)

        brain_service = BrainService(self.db)
        await self._record_message_event(brain_service, user_id, conversation_id, "user", message)

        # Load memory context
        memory_ctx = await memory_service.build_memory_context(
            self.db, user_id, conversation_id, message
        )

        # Resolve the user's configured LLM
        user_llm_config = await self._get_user_llm_config(user_id, model_name)
        model_name_used = model_name
        if user_llm_config:
            model_name_used = user_llm_config.get("name", model_name)

        # Invoke the LangGraph agent
        set_current_user(user_id)
        try:
            graph = get_agent_graph()
            langgraph_state = self._build_initial_state(
                full_message, user_id, conversation_id, memory_ctx, user_llm_config
            )
            result_state = await graph.ainvoke(langgraph_state)
            # final_response starts as None, so a .get() default alone would not cover it
            response_content = result_state.get("final_response") or "抱歉,我无法处理这个请求。"
        except Exception as e:
            response_content = f"抱歉,发生错误: {e}"
        finally:
            clear_current_user()
            self._schedule_auto_summarize(user_id, conversation_id)

        # Save the assistant message
        assistant_msg = Message(
            conversation_id=conversation_id,
            role="assistant",
            content=response_content,
            model=model_name_used or "jarvis",
        )
        self.db.add(assistant_msg)
        await self.db.commit()
        await self.db.refresh(assistant_msg)
        assistant_msg_id = assistant_msg.id

        await self._record_message_event(
            brain_service, user_id, conversation_id, "assistant", response_content
        )

        return conversation_id, assistant_msg_id, response_content, model_name_used