# 智能体系统实现计划 ## 项目概述 设计并实现一个支持单智能体独立工作 + 多智能体协作的混合型智能体系统,具备长短时记忆、多种技能调用能力。 ## 技术架构 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 用户请求入口 │ └─────────────────────────────┬───────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ API Gateway (Go) │ │ /api/v1/agents/:id/chat │ └─────────────────────────────┬───────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ Agent Engine (Python) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Agent Core │ │ Supervisor │ │ Memory │ │ │ │ (单智能体) │ │ (多智能体) │ │ (长短时) │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ │ └────────┬────────┴──────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ Skill Router (技能路由器) │ │ │ └──────────────────────────┬───────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ Skill Executor (执行器) │ │ │ └──────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ## 1. 数据库设计 ### 1.1 新增表结构 ```sql -- 智能体配置表 (agents) CREATE TABLE agents ( id BIGINT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255) NOT NULL, role_description TEXT, model_provider VARCHAR(50), model_name VARCHAR(100), status VARCHAR(20) DEFAULT 'active', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 技能绑定表 CREATE TABLE agent_skills ( id BIGINT PRIMARY KEY AUTO_INCREMENT, agent_id BIGINT NOT NULL, skill_id BIGINT NOT NULL, skill_config JSON, FOREIGN KEY (agent_id) REFERENCES agents(id), FOREIGN KEY (skill_id) REFERENCES skills(id) ); -- 知识库绑定表 CREATE TABLE agent_knowledge_bases ( id BIGINT PRIMARY KEY AUTO_INCREMENT, agent_id BIGINT NOT NULL, knowledge_base_id BIGINT NOT NULL, FOREIGN KEY (agent_id) REFERENCES agents(id), FOREIGN KEY (knowledge_base_id) REFERENCES knowledge_bases(id) ); -- 长期记忆表 CREATE TABLE agent_memories ( id BIGINT PRIMARY KEY AUTO_INCREMENT, agent_id BIGINT NOT NULL, user_id BIGINT, content TEXT NOT NULL, embedding VECTOR(1536), memory_type VARCHAR(20), importance INT DEFAULT 5, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (agent_id) REFERENCES agents(id) ); -- 会话记忆 (Redis) -- Key: session:{agent_id}:{user_id}:{session_id} -- Value: JSON {"messages": [...], "summary": "..."} -- 任务记录表 CREATE TABLE agent_tasks ( id BIGINT PRIMARY KEY AUTO_INCREMENT, agent_id BIGINT NOT NULL, user_id BIGINT NOT NULL, user_input TEXT NOT NULL, agent_response TEXT, status VARCHAR(20), tokens_used INT DEFAULT 0, duration_ms INT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, completed_at DATETIME, FOREIGN KEY (agent_id) REFERENCES agents(id) ); -- 多智能体协作配置表 CREATE TABLE agent_teams ( id BIGINT PRIMARY KEY AUTO_INCREMENT, supervisor_agent_id BIGINT NOT NULL, member_agent_id BIGINT NOT NULL, dispatch_strategy VARCHAR(20) DEFAULT 'parallel', FOREIGN KEY (supervisor_agent_id) REFERENCES agents(id), FOREIGN KEY (member_agent_id) REFERENCES agents(id) ); ``` ## 2. 后端实现 (Python Agent Engine) ### 2.1 项目结构 ``` agent/ ├── app/ │ ├── agent/ │ │ ├── __init__.py │ │ ├── core/ │ │ │ ├── __init__.py │ │ │ ├── agent.py # AgentCore 单智能体核心 │ │ │ ├── supervisor.py # Supervisor 多智能体调度 │ │ │ └── config.py # Agent 配置模型 │ │ ├── skills/ │ │ │ ├── __init__.py │ │ │ ├── router.py # 技能路由器 │ │ │ ├── executor.py # 技能执行器 │ │ │ └── registry.py # 技能注册表 │ │ ├── memory/ │ │ │ ├── __init__.py │ │ │ ├── manager.py # 记忆管理器 │ │ │ ├── working.py # Working Memory │ │ │ ├── session.py # Session Memory (Redis) │ │ │ └── persistent.py # Persistent Memory (向量库) │ │ ├── llm/ │ │ │ ├── __init__.py │ │ │ ├── base.py # LLM 抽象基类 │ │ │ ├── openai.py # OpenAI 实现 │ │ │ └── anthropic.py # Anthropic 实现 │ │ └── tools/ │ │ ├── __init__.py │ │ └── registry.py # 工具注册表 (复用现有) │ ├── api/ │ │ ├── __init__.py │ │ └── routes/ │ │ ├── __init__.py │ │ └── agent.py # Agent API 路由 │ └── main.py ├── requirements.txt └── config.yaml ``` ### 2.2 核心代码设计 #### AgentCore (单智能体核心) ```python # agent/app/agent/core/agent.py from typing import Optional, List from pydantic import BaseModel from app.agent.memory.manager import MemoryManager from app.agent.skills.router import SkillRouter from app.agent.skills.executor import SkillExecutor from app.agent.llm.base import LLMBase class AgentConfig(BaseModel): id: int name: str role_description: str model_provider: str model_name: str skills: List[int] # 技能 ID 列表 knowledge_base_ids: List[int] = [] class AgentResponse(BaseModel): content: str tool_calls: List[dict] = [] tokens_used: int = 0 duration_ms: int = 0 class AgentCore: def __init__(self, config: AgentConfig, llm: LLMBase): self.config = config self.llm = llm self.memory = MemoryManager(config.id) self.skill_router = SkillRouter(config.skills) self.skill_executor = SkillExecutor() async def run(self, user_input: str, user_id: int) -> AgentResponse: start_time = time.time() # 1. 加载记忆 context = await self.memory.load_context(user_input, user_id) # 2. 构建 Prompt prompt = self._build_prompt(user_input, context) # 3. LLM 决策 decision = await self.llm.decide(prompt) # 4. 执行技能(如需) if decision.needs_skill: skill_results = await self._execute_skills(decision.skills) # 5. 基于结果生成回复 final_response = await self.llm.generate(prompt, skill_results) else: final_response = decision.response # 6. 保存记忆 await self.memory.save(user_input, final_response) duration_ms = int((time.time() - start_time) * 1000) return AgentResponse( content=final_response, tool_calls=decision.tool_calls, duration_ms=duration_ms ) def _build_prompt(self, user_input: str, context: dict) -> str: system_prompt = f"""你是 {self.config.name}。 {self.config.role_description} 相关记忆: {context.get('summary', '')} """ return f"{system_prompt}\n\n用户: {user_input}" async def _execute_skills(self, skill_decisions: List[dict]) -> List[dict]: results = [] for decision in skill_decisions: result = await self.skill_executor.execute( skill_id=decision['skill_id'], params=decision['params'] ) results.append(result) return results ``` #### Supervisor (多智能体调度) ```python # agent/app/agent/core/supervisor.py from typing import List from app.agent.core.agent import AgentCore, AgentConfig class Supervisor: def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"): self.supervisor = supervisor_agent self.members = members self.strategy = strategy async def run(self, task: str, user_id: int) -> dict: # 1. 任务分解 (调用 Supervisor 的 LLM) subtasks = await self._decompose_task(task) # 2. 分配任务 if self.strategy == "parallel": results = await self._dispatch_parallel(subtasks, user_id) else: results = await self._dispatch_sequential(subtasks, user_id) # 3. 汇总结果 final_result = await self._aggregate(results) return { "main_response": final_result, "subtask_results": results } async def _decompose_task(self, task: str) -> List[dict]: # 调用 LLM 分解任务 prompt = f"""分解以下任务为子任务,返回 JSON 数组: 任务: {task} 格式: [{"task": "子任务描述", "agent_type": "适合的智能体类型"}]""" # ... 实现 return subtasks async def _dispatch_parallel(self, subtasks: List[dict], user_id: int) -> List[dict]: tasks = [] for subtask, member in zip(subtasks, self.members): tasks.append(member.run(subtask['task'], user_id)) return await asyncio.gather(*tasks) async def _dispatch_sequential(self, subtasks: List[dict], user_id: int) -> List[dict]: results = [] context = "" for subtask in subtasks: # 传递前一个结果作为上下文 enhanced_task = f"{context}\n\n当前任务: {subtask['task']}" result = await self.members[self.members.index(subtask['agent'])].run(enhanced_task, user_id) results.append(result) context += f"\n{result.content}" return results async def _aggregate(self, results: List[dict]) -> str: # 汇总所有子任务结果 prompt = "汇总以下结果:\n" + "\n---\n".join([r['content'] for r in results]) return await self.supervisor.llm.generate(prompt, []) ``` #### Memory Manager (记忆管理) ```python # agent/app/agent/memory/manager.py from app.agent.memory.working import WorkingMemory from app.agent.memory.session import SessionMemory from app.agent.memory.persistent import PersistentMemory class MemoryManager: def __init__(self, agent_id: int): self.agent_id = agent_id self.working = WorkingMemory() self.session = SessionMemory(agent_id) self.persistent = PersistentMemory(agent_id) async def load_context(self, query: str, user_id: int, session_id: str) -> dict: # 1. Working Memory (内存,最快) working_context = self.working.get() # 2. Session Memory (Redis) session_context = await self.session.get_summary(user_id, session_id) # 3. Persistent Memory (向量库) - 按需检索 persistent_context = await self.persistent.search(query, user_id, top_k=3) return { 'working': working_context, 'session': session_context, 'persistent': persistent_context, 'summary': self._build_summary(session_context, persistent_context) } async def save(self, user_input: str, response: str, user_id: int, session_id: str): # 1. 写入 Working self.working.add(user_input, response) # 2. 写入 Session (定期摘要) await self.session.add(user_input, response, user_id, session_id) # 3. 提取关键信息写入 Persistent (定期) if self._should_persist(): await self._extract_and_persist(user_input, response, user_id) def _should_persist(self) -> bool: # 每 N 条对话或达到阈值时持久化 return self.working.size() >= 5 async def _extract_and_persist(self, user_input: str, response: str, user_id: int): # 提取关键信息(可以用 LLM 或规则) key_points = self._extract_key_points(user_input, response) for point in key_points: await self.persistent.add(point, user_id, memory_type="experience") ``` ```python # agent/app/agent/memory/working.py class WorkingMemory: """当前任务上下文,内存级存储""" def __init__(self): self.current_task = None self.recent_messages = [] self.max_size = 10 def get(self) -> dict: return { 'current_task': self.current_task, 'recent_messages': self.recent_messages[-self.max_size:] } def add(self, user_input: str, response: str): self.recent_messages.append({ 'role': 'user', 'content': user_input }) self.recent_messages.append({ 'role': 'assistant', 'content': response }) # 保持固定大小 if len(self.recent_messages) > self.max_size * 2: self.recent_messages = self.recent_messages[-self.max_size:] def size(self) -> int: return len(self.recent_messages) // 2 ``` ```python # agent/app/agent/memory/session.py import redis.asyncio as redis import json class SessionMemory: """会话级记忆,Redis 存储""" def __init__(self, agent_id: int, redis_client: redis.Redis): self.agent_id = agent_id self.redis = redis_client self.ttl = 3600 * 24 # 24 小时 def _key(self, user_id: int, session_id: str) -> str: return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}" async def add(self, user_input: str, response: str, user_id: int, session_id: str): key = self._key(user_id, session_id) # 获取现有数据 data = await self.redis.get(key) messages = json.loads(data) if data else {"messages": [], "summary": ""} # 添加新消息 messages["messages"].append({"role": "user", "content": user_input}) messages["messages"].append({"role": "assistant", "content": response}) # 定期生成摘要 if len(messages["messages"]) >= 10: messages["summary"] = await self._generate_summary(messages["messages"]) await self.redis.setex(key, self.ttl, json.dumps(messages)) async def get_summary(self, user_id: int, session_id: str) -> str: key = self._key(user_id, session_id) data = await self.redis.get(key) if data: messages = json.loads(data) return messages.get("summary", "") return "" async def _generate_summary(self, messages: List[dict]) -> str: # 使用 LLM 生成摘要 # ... return summary ``` ```python # agent/app/agent/memory/persistent.py from typing import List class PersistentMemory: """长期记忆,向量存储""" def __init__(self, agent_id: int): self.agent_id = agent_id self.vector_store = None # 初始化向量库客户端 async def add(self, content: str, user_id: int, memory_type: str = "experience"): # 生成向量 embedding = await self._get_embedding(content) # 存储到数据库 await db.agent_memories.create( agent_id=self.agent_id, user_id=user_id, content=content, embedding=embedding, memory_type=memory_type ) async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]: # 生成查询向量 query_embedding = await self._get_embedding(query) # 向量相似度搜索 results = await db.agent_memories.search( agent_id=self.agent_id, user_id=user_id, embedding=query_embedding, top_k=top_k ) return [r.content for r in results] async def _get_embedding(self, text: str) -> List[float]: # 调用 embedding 模型 # ... pass ``` #### Skill Router (技能路由器) ```python # agent/app/agent/skills/router.py from typing import List, Dict class SkillRouter: """根据 LLM 决策选择要调用的技能""" def __init__(self, available_skills: List[int]): self.available_skills = available_skills async def route(self, llm_decision: dict) -> List[dict]: """解析 LLM 的技能调用决策""" if not llm_decision.get('tool_calls'): return [] routes = [] for tool_call in llm_decision['tool_calls']: skill_id = tool_call['skill_id'] # 检查技能是否可用 if skill_id not in self.available_skills: continue routes.append({ 'skill_id': skill_id, 'params': tool_call.get('parameters', {}), 'reason': tool_call.get('reason', '') }) return routes ``` #### Skill Executor (技能执行器) ```python # agent/app/agent/skills/executor.py import asyncio class SkillExecutor: """技能执行器,支持并发/串行执行""" def __init__(self): self.skill_registry = None # 技能注册表 async def execute(self, skill_id: int, params: dict) -> dict: """执行单个技能""" skill = self.skill_registry.get(skill_id) if not skill: return {"error": f"Skill {skill_id} not found"} try: result = await skill.execute(**params) return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} async def execute_multiple(self, skills: List[dict], strategy: str = "parallel") -> List[dict]: """批量执行技能""" if strategy == "parallel": tasks = [self.execute(s['skill_id'], s['params']) for s in skills] return await asyncio.gather(*tasks, return_exceptions=True) else: results = [] for s in skills: result = await self.execute(s['skill_id'], s['params']) results.append(result) return results ``` ## 3. API 接口设计 ### 3.1 新增接口 | 方法 | 路径 | 描述 | |------|------|------| | POST | /api/v1/agents/:id/chat | 单智能体对话 | | POST | /api/v1/agents/:id/chat/stream | 单智能体流式对话 | | POST | /api/v1/teams/:id/chat | 多智能体群聊 | | GET | /api/v1/agents/:id/memories | 获取记忆 | | DELETE | /api/v1/agents/:id/memories/:memory_id | 删除记忆 | | GET | /api/v1/agents/:id/history | 获取对话历史 | ### 3.2 接口请求/响应示例 ```json // POST /api/v1/agents/1/chat // Request { "user_id": 123, "message": "帮我分析销售数据", "session_id": "optional-session-id" } // Response { "agent_id": 1, "response": "根据分析,今天销售额为...", "tool_calls": [ {"skill": "query_database", "params": {"sql": "SELECT ..."}} ], "tokens_used": 1500, "duration_ms": 2000 } ``` ## 4. 实现步骤 ### Phase 1: 数据库设计与迁移 - [ ] 创建数据库迁移脚本 - [ ] 新增 agents, agent_skills, agent_memories, agent_teams 等表 ### Phase 2: 后端 Agent Engine 核心 - [ ] 实现 AgentCore 单智能体核心类 - [ ] 实现 LLM 适配器 (OpenAI/Anthropic) - [ ] 实现 Prompt 构建逻辑 ### Phase 3: 记忆系统实现 - [ ] 实现 WorkingMemory (内存) - [ ] 实现 SessionMemory (Redis) - [ ] 实现 PersistentMemory (向量库) - [ ] 实现 MemoryManager 统一接口 ### Phase 4: 技能路由与执行器 - [ ] 实现 SkillRouter - [ ] 实现 SkillExecutor - [ ] 对接现有技能注册表 ### Phase 5: 多智能体 Supervisor - [ ] 实现 Supervisor 调度器 - [ ] 实现任务分解逻辑 - [ ] 实现结果聚合 ### Phase 6: API 接口对接 - [ ] 新增 Agent API 路由 - [ ] 实现 /chat, /chat/stream 等接口 - [ ] 对接 Go API Gateway ### Phase 7: 前端页面集成 - [ ] 智能体详情页增加对话功能 - [ ] 记忆管理页面 - [ ] 多智能体协作配置页面 ### Phase 8: 测试与优化 - [ ] 单元测试 - [ ] 集成测试 - [ ] 性能优化 ## 5. 里程碑 | 里程碑 | 预计时间 | 交付物 | |--------|----------|--------| | M1: 基础骨架 | 1 周 | 数据库 + AgentCore 基础 | | M2: 记忆系统 | 1 周 | 三层记忆实现 | | M3: 技能调用 | 1 周 | Router + Executor | | M4: 多智能体 | 1 周 | Supervisor 实现 | | M5: API 对接 | 1 周 | 完整 API | | M6: 前端集成 | 1 周 | 页面功能 | ## 6. 风险与对策 | 风险 | 影响 | 对策 | |------|------|------| | LLM API 不稳定 | 功能不可用 | 重试机制 + 降级 | | 向量库性能 | 检索慢 | 缓存 + 限流 | | Token 成本超支 | 费用上涨 | 记忆压缩 + 按需加载 | | 多智能体通信 | 延迟增加 | 超时控制 + 并行优化 |