Files
X-Agents/plan/agent-implementation-plan.md
2026-03-11 16:26:22 +08:00

656 lines
23 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 智能体系统实现计划
## 项目概述
设计并实现一个支持单智能体独立工作 + 多智能体协作的混合型智能体系统,具备长短时记忆、多种技能调用能力。
## 技术架构
```
┌─────────────────────────────────────────────────────────────────┐
│ 用户请求入口 │
└─────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway (Go) │
│ /api/v1/agents/:id/chat │
└─────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Agent Engine (Python) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Agent Core │ │ Supervisor │ │ Memory │ │
│ │ (单智能体) │ │ (多智能体) │ │ (长短时) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────┬────────┴──────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Skill Router (技能路由器) │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Skill Executor (执行器) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## 1. 数据库设计
### 1.1 新增表结构
```sql
-- 智能体配置表 (agents)
CREATE TABLE agents (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
role_description TEXT,
model_provider VARCHAR(50),
model_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 技能绑定表
CREATE TABLE agent_skills (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
agent_id BIGINT NOT NULL,
skill_id BIGINT NOT NULL,
skill_config JSON,
FOREIGN KEY (agent_id) REFERENCES agents(id),
FOREIGN KEY (skill_id) REFERENCES skills(id)
);
-- 知识库绑定表
CREATE TABLE agent_knowledge_bases (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
agent_id BIGINT NOT NULL,
knowledge_base_id BIGINT NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agents(id),
FOREIGN KEY (knowledge_base_id) REFERENCES knowledge_bases(id)
);
-- 长期记忆表
CREATE TABLE agent_memories (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
agent_id BIGINT NOT NULL,
user_id BIGINT,
content TEXT NOT NULL,
embedding VECTOR(1536),
memory_type VARCHAR(20),
importance INT DEFAULT 5,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
-- 会话记忆 (Redis)
-- Key: session:{agent_id}:{user_id}:{session_id}
-- Value: JSON {"messages": [...], "summary": "..."}
-- 任务记录表
CREATE TABLE agent_tasks (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
agent_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
user_input TEXT NOT NULL,
agent_response TEXT,
status VARCHAR(20),
tokens_used INT DEFAULT 0,
duration_ms INT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
completed_at DATETIME,
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
-- 多智能体协作配置表
CREATE TABLE agent_teams (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
supervisor_agent_id BIGINT NOT NULL,
member_agent_id BIGINT NOT NULL,
dispatch_strategy VARCHAR(20) DEFAULT 'parallel',
FOREIGN KEY (supervisor_agent_id) REFERENCES agents(id),
FOREIGN KEY (member_agent_id) REFERENCES agents(id)
);
```
## 2. 后端实现 (Python Agent Engine)
### 2.1 项目结构
```
agent/
├── app/
│ ├── agent/
│ │ ├── __init__.py
│ │ ├── core/
│ │ │ ├── __init__.py
│ │ │ ├── agent.py # AgentCore 单智能体核心
│ │ │ ├── supervisor.py # Supervisor 多智能体调度
│ │ │ └── config.py # Agent 配置模型
│ │ ├── skills/
│ │ │ ├── __init__.py
│ │ │ ├── router.py # 技能路由器
│ │ │ ├── executor.py # 技能执行器
│ │ │ └── registry.py # 技能注册表
│ │ ├── memory/
│ │ │ ├── __init__.py
│ │ │ ├── manager.py # 记忆管理器
│ │ │ ├── working.py # Working Memory
│ │ │ ├── session.py # Session Memory (Redis)
│ │ │ └── persistent.py # Persistent Memory (向量库)
│ │ ├── llm/
│ │ │ ├── __init__.py
│ │ │ ├── base.py # LLM 抽象基类
│ │ │ ├── openai.py # OpenAI 实现
│ │ │ └── anthropic.py # Anthropic 实现
│ │ └── tools/
│ │ ├── __init__.py
│ │ └── registry.py # 工具注册表 (复用现有)
│ ├── api/
│ │ ├── __init__.py
│ │ └── routes/
│ │ ├── __init__.py
│ │ └── agent.py # Agent API 路由
│ └── main.py
├── requirements.txt
└── config.yaml
```
### 2.2 核心代码设计
#### AgentCore (单智能体核心)
```python
# agent/app/agent/core/agent.py
from typing import Optional, List
from pydantic import BaseModel
from app.agent.memory.manager import MemoryManager
from app.agent.skills.router import SkillRouter
from app.agent.skills.executor import SkillExecutor
from app.agent.llm.base import LLMBase
class AgentConfig(BaseModel):
id: int
name: str
role_description: str
model_provider: str
model_name: str
skills: List[int] # 技能 ID 列表
knowledge_base_ids: List[int] = []
class AgentResponse(BaseModel):
content: str
tool_calls: List[dict] = []
tokens_used: int = 0
duration_ms: int = 0
class AgentCore:
def __init__(self, config: AgentConfig, llm: LLMBase):
self.config = config
self.llm = llm
self.memory = MemoryManager(config.id)
self.skill_router = SkillRouter(config.skills)
self.skill_executor = SkillExecutor()
async def run(self, user_input: str, user_id: int) -> AgentResponse:
start_time = time.time()
# 1. 加载记忆
context = await self.memory.load_context(user_input, user_id)
# 2. 构建 Prompt
prompt = self._build_prompt(user_input, context)
# 3. LLM 决策
decision = await self.llm.decide(prompt)
# 4. 执行技能(如需)
if decision.needs_skill:
skill_results = await self._execute_skills(decision.skills)
# 5. 基于结果生成回复
final_response = await self.llm.generate(prompt, skill_results)
else:
final_response = decision.response
# 6. 保存记忆
await self.memory.save(user_input, final_response)
duration_ms = int((time.time() - start_time) * 1000)
return AgentResponse(
content=final_response,
tool_calls=decision.tool_calls,
duration_ms=duration_ms
)
def _build_prompt(self, user_input: str, context: dict) -> str:
system_prompt = f"""你是 {self.config.name}
{self.config.role_description}
相关记忆:
{context.get('summary', '')}
"""
return f"{system_prompt}\n\n用户: {user_input}"
async def _execute_skills(self, skill_decisions: List[dict]) -> List[dict]:
results = []
for decision in skill_decisions:
result = await self.skill_executor.execute(
skill_id=decision['skill_id'],
params=decision['params']
)
results.append(result)
return results
```
#### Supervisor (多智能体调度)
```python
# agent/app/agent/core/supervisor.py
from typing import List
from app.agent.core.agent import AgentCore, AgentConfig
class Supervisor:
def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"):
self.supervisor = supervisor_agent
self.members = members
self.strategy = strategy
async def run(self, task: str, user_id: int) -> dict:
# 1. 任务分解 (调用 Supervisor 的 LLM)
subtasks = await self._decompose_task(task)
# 2. 分配任务
if self.strategy == "parallel":
results = await self._dispatch_parallel(subtasks, user_id)
else:
results = await self._dispatch_sequential(subtasks, user_id)
# 3. 汇总结果
final_result = await self._aggregate(results)
return {
"main_response": final_result,
"subtask_results": results
}
async def _decompose_task(self, task: str) -> List[dict]:
# 调用 LLM 分解任务
prompt = f"""分解以下任务为子任务,返回 JSON 数组:
任务: {task}
格式: [{"task": "子任务描述", "agent_type": "适合的智能体类型"}]"""
# ... 实现
return subtasks
async def _dispatch_parallel(self, subtasks: List[dict], user_id: int) -> List[dict]:
tasks = []
for subtask, member in zip(subtasks, self.members):
tasks.append(member.run(subtask['task'], user_id))
return await asyncio.gather(*tasks)
async def _dispatch_sequential(self, subtasks: List[dict], user_id: int) -> List[dict]:
results = []
context = ""
for subtask in subtasks:
# 传递前一个结果作为上下文
enhanced_task = f"{context}\n\n当前任务: {subtask['task']}"
result = await self.members[self.members.index(subtask['agent'])].run(enhanced_task, user_id)
results.append(result)
context += f"\n{result.content}"
return results
async def _aggregate(self, results: List[dict]) -> str:
# 汇总所有子任务结果
prompt = "汇总以下结果:\n" + "\n---\n".join([r['content'] for r in results])
return await self.supervisor.llm.generate(prompt, [])
```
#### Memory Manager (记忆管理)
```python
# agent/app/agent/memory/manager.py
from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory
class MemoryManager:
def __init__(self, agent_id: int):
self.agent_id = agent_id
self.working = WorkingMemory()
self.session = SessionMemory(agent_id)
self.persistent = PersistentMemory(agent_id)
async def load_context(self, query: str, user_id: int, session_id: str) -> dict:
# 1. Working Memory (内存,最快)
working_context = self.working.get()
# 2. Session Memory (Redis)
session_context = await self.session.get_summary(user_id, session_id)
# 3. Persistent Memory (向量库) - 按需检索
persistent_context = await self.persistent.search(query, user_id, top_k=3)
return {
'working': working_context,
'session': session_context,
'persistent': persistent_context,
'summary': self._build_summary(session_context, persistent_context)
}
async def save(self, user_input: str, response: str, user_id: int, session_id: str):
# 1. 写入 Working
self.working.add(user_input, response)
# 2. 写入 Session (定期摘要)
await self.session.add(user_input, response, user_id, session_id)
# 3. 提取关键信息写入 Persistent (定期)
if self._should_persist():
await self._extract_and_persist(user_input, response, user_id)
def _should_persist(self) -> bool:
# 每 N 条对话或达到阈值时持久化
return self.working.size() >= 5
async def _extract_and_persist(self, user_input: str, response: str, user_id: int):
# 提取关键信息(可以用 LLM 或规则)
key_points = self._extract_key_points(user_input, response)
for point in key_points:
await self.persistent.add(point, user_id, memory_type="experience")
```
```python
# agent/app/agent/memory/working.py
class WorkingMemory:
"""当前任务上下文,内存级存储"""
def __init__(self):
self.current_task = None
self.recent_messages = []
self.max_size = 10
def get(self) -> dict:
return {
'current_task': self.current_task,
'recent_messages': self.recent_messages[-self.max_size:]
}
def add(self, user_input: str, response: str):
self.recent_messages.append({
'role': 'user',
'content': user_input
})
self.recent_messages.append({
'role': 'assistant',
'content': response
})
# 保持固定大小
if len(self.recent_messages) > self.max_size * 2:
self.recent_messages = self.recent_messages[-self.max_size:]
def size(self) -> int:
return len(self.recent_messages) // 2
```
```python
# agent/app/agent/memory/session.py
import redis.asyncio as redis
import json
class SessionMemory:
"""会话级记忆Redis 存储"""
def __init__(self, agent_id: int, redis_client: redis.Redis):
self.agent_id = agent_id
self.redis = redis_client
self.ttl = 3600 * 24 # 24 小时
def _key(self, user_id: int, session_id: str) -> str:
return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}"
async def add(self, user_input: str, response: str, user_id: int, session_id: str):
key = self._key(user_id, session_id)
# 获取现有数据
data = await self.redis.get(key)
messages = json.loads(data) if data else {"messages": [], "summary": ""}
# 添加新消息
messages["messages"].append({"role": "user", "content": user_input})
messages["messages"].append({"role": "assistant", "content": response})
# 定期生成摘要
if len(messages["messages"]) >= 10:
messages["summary"] = await self._generate_summary(messages["messages"])
await self.redis.setex(key, self.ttl, json.dumps(messages))
async def get_summary(self, user_id: int, session_id: str) -> str:
key = self._key(user_id, session_id)
data = await self.redis.get(key)
if data:
messages = json.loads(data)
return messages.get("summary", "")
return ""
async def _generate_summary(self, messages: List[dict]) -> str:
# 使用 LLM 生成摘要
# ...
return summary
```
```python
# agent/app/agent/memory/persistent.py
from typing import List
class PersistentMemory:
"""长期记忆,向量存储"""
def __init__(self, agent_id: int):
self.agent_id = agent_id
self.vector_store = None # 初始化向量库客户端
async def add(self, content: str, user_id: int, memory_type: str = "experience"):
# 生成向量
embedding = await self._get_embedding(content)
# 存储到数据库
await db.agent_memories.create(
agent_id=self.agent_id,
user_id=user_id,
content=content,
embedding=embedding,
memory_type=memory_type
)
async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]:
# 生成查询向量
query_embedding = await self._get_embedding(query)
# 向量相似度搜索
results = await db.agent_memories.search(
agent_id=self.agent_id,
user_id=user_id,
embedding=query_embedding,
top_k=top_k
)
return [r.content for r in results]
async def _get_embedding(self, text: str) -> List[float]:
# 调用 embedding 模型
# ...
pass
```
#### Skill Router (技能路由器)
```python
# agent/app/agent/skills/router.py
from typing import List, Dict
class SkillRouter:
"""根据 LLM 决策选择要调用的技能"""
def __init__(self, available_skills: List[int]):
self.available_skills = available_skills
async def route(self, llm_decision: dict) -> List[dict]:
"""解析 LLM 的技能调用决策"""
if not llm_decision.get('tool_calls'):
return []
routes = []
for tool_call in llm_decision['tool_calls']:
skill_id = tool_call['skill_id']
# 检查技能是否可用
if skill_id not in self.available_skills:
continue
routes.append({
'skill_id': skill_id,
'params': tool_call.get('parameters', {}),
'reason': tool_call.get('reason', '')
})
return routes
```
#### Skill Executor (技能执行器)
```python
# agent/app/agent/skills/executor.py
import asyncio
class SkillExecutor:
"""技能执行器,支持并发/串行执行"""
def __init__(self):
self.skill_registry = None # 技能注册表
async def execute(self, skill_id: int, params: dict) -> dict:
"""执行单个技能"""
skill = self.skill_registry.get(skill_id)
if not skill:
return {"error": f"Skill {skill_id} not found"}
try:
result = await skill.execute(**params)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}
async def execute_multiple(self, skills: List[dict], strategy: str = "parallel") -> List[dict]:
"""批量执行技能"""
if strategy == "parallel":
tasks = [self.execute(s['skill_id'], s['params']) for s in skills]
return await asyncio.gather(*tasks, return_exceptions=True)
else:
results = []
for s in skills:
result = await self.execute(s['skill_id'], s['params'])
results.append(result)
return results
```
## 3. API 接口设计
### 3.1 新增接口
| 方法 | 路径 | 描述 |
|------|------|------|
| POST | /api/v1/agents/:id/chat | 单智能体对话 |
| POST | /api/v1/agents/:id/chat/stream | 单智能体流式对话 |
| POST | /api/v1/teams/:id/chat | 多智能体群聊 |
| GET | /api/v1/agents/:id/memories | 获取记忆 |
| DELETE | /api/v1/agents/:id/memories/:memory_id | 删除记忆 |
| GET | /api/v1/agents/:id/history | 获取对话历史 |
### 3.2 接口请求/响应示例
```json
// POST /api/v1/agents/1/chat
// Request
{
"user_id": 123,
"message": "帮我分析销售数据",
"session_id": "optional-session-id"
}
// Response
{
"agent_id": 1,
"response": "根据分析,今天销售额为...",
"tool_calls": [
{"skill": "query_database", "params": {"sql": "SELECT ..."}}
],
"tokens_used": 1500,
"duration_ms": 2000
}
```
## 4. 实现步骤
### Phase 1: 数据库设计与迁移
- [ ] 创建数据库迁移脚本
- [ ] 新增 agents, agent_skills, agent_memories, agent_teams 等表
### Phase 2: 后端 Agent Engine 核心
- [ ] 实现 AgentCore 单智能体核心类
- [ ] 实现 LLM 适配器 (OpenAI/Anthropic)
- [ ] 实现 Prompt 构建逻辑
### Phase 3: 记忆系统实现
- [ ] 实现 WorkingMemory (内存)
- [ ] 实现 SessionMemory (Redis)
- [ ] 实现 PersistentMemory (向量库)
- [ ] 实现 MemoryManager 统一接口
### Phase 4: 技能路由与执行器
- [ ] 实现 SkillRouter
- [ ] 实现 SkillExecutor
- [ ] 对接现有技能注册表
### Phase 5: 多智能体 Supervisor
- [ ] 实现 Supervisor 调度器
- [ ] 实现任务分解逻辑
- [ ] 实现结果聚合
### Phase 6: API 接口对接
- [ ] 新增 Agent API 路由
- [ ] 实现 /chat, /chat/stream 等接口
- [ ] 对接 Go API Gateway
### Phase 7: 前端页面集成
- [ ] 智能体详情页增加对话功能
- [ ] 记忆管理页面
- [ ] 多智能体协作配置页面
### Phase 8: 测试与优化
- [ ] 单元测试
- [ ] 集成测试
- [ ] 性能优化
## 5. 里程碑
| 里程碑 | 预计时间 | 交付物 |
|--------|----------|--------|
| M1: 基础骨架 | 1 周 | 数据库 + AgentCore 基础 |
| M2: 记忆系统 | 1 周 | 三层记忆实现 |
| M3: 技能调用 | 1 周 | Router + Executor |
| M4: 多智能体 | 1 周 | Supervisor 实现 |
| M5: API 对接 | 1 周 | 完整 API |
| M6: 前端集成 | 1 周 | 页面功能 |
## 6. 风险与对策
| 风险 | 影响 | 对策 |
|------|------|------|
| LLM API 不稳定 | 功能不可用 | 重试机制 + 降级 |
| 向量库性能 | 检索慢 | 缓存 + 限流 |
| Token 成本超支 | 费用上涨 | 记忆压缩 + 按需加载 |
| 多智能体通信 | 延迟增加 | 超时控制 + 并行优化 |