X-Agents/plan/agent-implementation-plan.md
2026-03-11 16:26:22 +08:00

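Agent System Implementation Plan

Project Overview

Design and implement a hybrid agent system that supports both single agents working independently and multiple agents collaborating, with long-term and short-term memory and the ability to invoke a variety of skills.

Technical Architecture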

┌─────────────────────────────────────────────────────────────────┐
│                    User Request Entry Point                     │
└─────────────────────────────┬───────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway (Go)                         │
│                     /api/v1/agents/:id/chat                     │
└─────────────────────────────┬───────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      Agent Engine (Python)                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │  Agent Core  │  │  Supervisor  │  │    Memory    │           │
│  │(single-agent)│  │(multi-agent) │  │ (long/short) │           │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘           │
│         │                 │                 │                   │
│         └────────┬────────┴─────────────────┘                   │
│                  ↓                                              │
│  ┌──────────────────────────────────────────────────────┐       │
│  │                     Skill Router                     │       │
│  └──────────────────────────┬───────────────────────────┘       │
│                             ↓                                   │
│  ┌──────────────────────────────────────────────────────┐       │
│  │                    Skill Executor                    │       │
│  └──────────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────────┘

1. Database Design

1.1 New Tables

-- Agent configuration table (agents)
CREATE TABLE agents (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    role_description TEXT,
    model_provider VARCHAR(50),
    model_name VARCHAR(100),
    status VARCHAR(20) DEFAULT 'active',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Skill binding table
CREATE TABLE agent_skills (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    agent_id BIGINT NOT NULL,
    skill_id BIGINT NOT NULL,
    skill_config JSON,
    FOREIGN KEY (agent_id) REFERENCES agents(id),
    FOREIGN KEY (skill_id) REFERENCES skills(id)
);

-- Knowledge base binding table
CREATE TABLE agent_knowledge_bases (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    agent_id BIGINT NOT NULL,
    knowledge_base_id BIGINT NOT NULL,
    FOREIGN KEY (agent_id) REFERENCES agents(id),
    FOREIGN KEY (knowledge_base_id) REFERENCES knowledge_bases(id)
);

-- Long-term memory table
CREATE TABLE agent_memories (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    agent_id BIGINT NOT NULL,
    user_id BIGINT,
    content TEXT NOT NULL,
    embedding VECTOR(1536),
    memory_type VARCHAR(20),
    importance INT DEFAULT 5,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (agent_id) REFERENCES agents(id)
);

-- Session memory (Redis, not a SQL table)
-- Key: agent:memory:session:{agent_id}:{user_id}:{session_id}
-- Value: JSON {"messages": [...], "summary": "..."}

-- Task record table
CREATE TABLE agent_tasks (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    agent_id BIGINT NOT NULL,
    user_id BIGINT NOT NULL,
    user_input TEXT NOT NULL,
    agent_response TEXT,
    status VARCHAR(20),
    tokens_used INT DEFAULT 0,
    duration_ms INT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    completed_at DATETIME,
    FOREIGN KEY (agent_id) REFERENCES agents(id)
);

-- Multi-agent collaboration configuration table
CREATE TABLE agent_teams (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    supervisor_agent_id BIGINT NOT NULL,
    member_agent_id BIGINT NOT NULL,
    dispatch_strategy VARCHAR(20) DEFAULT 'parallel',
    FOREIGN KEY (supervisor_agent_id) REFERENCES agents(id),
    FOREIGN KEY (member_agent_id) REFERENCES agents(id)
);
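
The Redis session entry described in the schema comments can be sketched with a few helpers. This is only an illustration under stated assumptions: `session_key`, `new_session_value`, and `append_turn` are hypothetical names that do not appear in the plan, and the key prefix is passed in rather than fixed.

```python
import json
from typing import Optional


def session_key(prefix: str, agent_id: int, user_id: int, session_id: str) -> str:
    """Build the Redis key for one conversation session."""
    return f"{prefix}:{agent_id}:{user_id}:{session_id}"


def new_session_value() -> dict:
    """Empty session payload: a message list plus a rolling summary."""
    return {"messages": [], "summary": ""}


def append_turn(raw: Optional[str], user_input: str, response: str) -> str:
    """Decode the stored JSON (or start fresh), append one turn, and re-encode."""
    value = json.loads(raw) if raw else new_session_value()
    value["messages"].append({"role": "user", "content": user_input})
    value["messages"].append({"role": "assistant", "content": response})
    return json.dumps(value, ensure_ascii=False)
```

For example, `session_key("agent:memory:session", 1, 123, "abc")` yields the key that the `SessionMemory` class in section 2.2 would use for agent 1, user 123.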

2. Backend Implementation (Python Agent Engine)

2.1 Project Structure

agent/
├── app/
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── core/
│   │   │   ├── __init__.py
│   │   │   ├── agent.py          # AgentCore: single-agent core
│   │   │   ├── supervisor.py     # Supervisor: multi-agent scheduling
│   │   │   └── config.py         # Agent configuration model
│   │   ├── skills/
│   │   │   ├── __init__.py
│   │   │   ├── router.py         # Skill router
│   │   │   ├── executor.py       # Skill executor
│   │   │   └── registry.py       # Skill registry
│   │   ├── memory/
│   │   │   ├── __init__.py
│   │   │   ├── manager.py        # Memory manager
│   │   │   ├── working.py        # Working Memory
│   │   │   ├── session.py        # Session Memory (Redis)
│   │   │   └── persistent.py     # Persistent Memory (vector store)
│   │   ├── llm/
│   │   │   ├── __init__.py
│   │   │   ├── base.py           # LLM abstract base class
│   │   │   ├── openai.py         # OpenAI implementation
│   │   │   └── anthropic.py      # Anthropic implementation
│   │   └── tools/
│   │       ├── __init__.py
│   │       └── registry.py       # Tool registry (reuses the existing one)
│   ├── api/
│   │   ├── __init__.py
│   │   └── routes/
│   │       ├── __init__.py
│   │       └── agent.py          # Agent API routes
│   └── main.py
├── requirements.txt
└── config.yaml

2.2 Core Code Design

AgentCore (single-agent core)

# agent/app/agent/core/agent.py
import time
from typing import List

from pydantic import BaseModel

from app.agent.memory.manager import MemoryManager
from app.agent.skills.router import SkillRouter
from app.agent.skills.executor import SkillExecutor
from app.agent.llm.base import LLMBase


class AgentConfig(BaseModel):
    id: int
    name: str
    role_description: str
    model_provider: str
    model_name: str
    skills: List[int]  # list of skill IDs
    knowledge_base_ids: List[int] = []


class AgentResponse(BaseModel):
    content: str
    tool_calls: List[dict] = []
    tokens_used: int = 0
    duration_ms: int = 0


class AgentCore:
    def __init__(self, config: AgentConfig, llm: LLMBase):
        self.config = config
        self.llm = llm
        self.memory = MemoryManager(config.id)
        self.skill_router = SkillRouter(config.skills)
        self.skill_executor = SkillExecutor()

    async def run(self, user_input: str, user_id: int, session_id: str = "default") -> AgentResponse:
        start_time = time.time()

        # 1. Load memory (session memory is keyed by session_id)
        context = await self.memory.load_context(user_input, user_id, session_id)

        # 2. Build the prompt
        prompt = self._build_prompt(user_input, context)

        # 3. Let the LLM decide what to do
        decision = await self.llm.decide(prompt)

        # 4. Execute skills if the decision asks for them
        if decision.needs_skill:
            skill_results = await self._execute_skills(decision.skills)
            # 5. Generate the reply from the skill results
            final_response = await self.llm.generate(prompt, skill_results)
        else:
            final_response = decision.response

        # 6. Save memory
        await self.memory.save(user_input, final_response, user_id, session_id)

        duration_ms = int((time.time() - start_time) * 1000)

        return AgentResponse(
            content=final_response,
            tool_calls=decision.tool_calls,
            duration_ms=duration_ms
        )

    def _build_prompt(self, user_input: str, context: dict) -> str:
        system_prompt = f"""You are {self.config.name}. {self.config.role_description}

Relevant memory:
{context.get('summary', '')}
"""
        return f"{system_prompt}\n\nUser: {user_input}"

    async def _execute_skills(self, skill_decisions: List[dict]) -> List[dict]:
        # Run the requested skills one after another and collect their results
        results = []
        for decision in skill_decisions:
            result = await self.skill_executor.execute(
                skill_id=decision['skill_id'],
                params=decision['params']
            )
            results.append(result)
        return results
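
`AgentCore` depends on an `LLMBase` class that the plan references but does not define. The sketch below shows one shape that interface could take, inferred only from how `AgentCore.run` uses it. The `Decision` dataclass, its field defaults, and the `EchoLLM` stub are assumptions introduced for illustration; no real model is called.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List


@dataclass
class Decision:
    """Hypothetical result of `decide`; fields mirror what AgentCore.run reads."""
    needs_skill: bool = False
    response: str = ""
    skills: List[dict] = field(default_factory=list)      # [{"skill_id": ..., "params": {...}}]
    tool_calls: List[dict] = field(default_factory=list)  # echoed back in AgentResponse


class LLMBase(ABC):
    """Abstract adapter; the OpenAI and Anthropic adapters would subclass this."""

    @abstractmethod
    async def decide(self, prompt: str) -> Decision:
        """Return either a direct answer or a list of skills to call."""

    @abstractmethod
    async def generate(self, prompt: str, skill_results: List[dict]) -> str:
        """Produce the final reply, optionally grounded in skill results."""


class EchoLLM(LLMBase):
    """Deterministic stand-in for tests; it never contacts a model provider."""

    async def decide(self, prompt: str) -> Decision:
        if "weather" in prompt.lower():
            call = {"skill_id": 1, "params": {"city": "Paris"}}
            return Decision(needs_skill=True, skills=[call], tool_calls=[call])
        return Decision(response=f"echo: {prompt}")

    async def generate(self, prompt: str, skill_results: List[dict]) -> str:
        return f"answer based on {len(skill_results)} skill result(s)"
```

A stub of this kind lets the agent loop be unit-tested without network access or token cost.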

Supervisor (multi-agent scheduling)

# agent/app/agent/core/supervisor.py
import asyncio
import json
from typing import List

from app.agent.core.agent import AgentCore, AgentResponse


class Supervisor:
    def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"):
        self.supervisor = supervisor_agent
        self.members = members
        self.strategy = strategy

    async def run(self, task: str, user_id: int) -> dict:
        # 1. Decompose the task (uses the supervisor's LLM)
        subtasks = await self._decompose_task(task)

        # 2. Dispatch the subtasks
        if self.strategy == "parallel":
            results = await self._dispatch_parallel(subtasks, user_id)
        else:
            results = await self._dispatch_sequential(subtasks, user_id)

        # 3. Aggregate the results
        final_result = await self._aggregate(results)

        return {
            "main_response": final_result,
            "subtask_results": results
        }

    async def _decompose_task(self, task: str) -> List[dict]:
        # Ask the LLM to split the task; literal braces are doubled inside the f-string
        prompt = f"""Decompose the following task into subtasks and return a JSON array:
Task: {task}

Format: [{{"task": "subtask description", "agent_type": "suitable agent type"}}]"""
        # Assumption: the model replies with bare JSON; json.loads raises ValueError otherwise
        raw = await self.supervisor.llm.generate(prompt, [])
        return json.loads(raw)

    def _select_member(self, subtask: dict, index: int) -> AgentCore:
        # Assumption: "agent_type" may name a member agent; otherwise assign round-robin
        for member in self.members:
            if member.config.name == subtask.get("agent_type"):
                return member
        return self.members[index % len(self.members)]

    async def _dispatch_parallel(self, subtasks: List[dict], user_id: int) -> List[AgentResponse]:
        # Every subtask gets a member, even when subtasks outnumber members
        tasks = [
            self._select_member(subtask, i).run(subtask['task'], user_id)
            for i, subtask in enumerate(subtasks)
        ]
        return await asyncio.gather(*tasks)

    async def _dispatch_sequential(self, subtasks: List[dict], user_id: int) -> List[AgentResponse]:
        results = []
        context = ""
        for i, subtask in enumerate(subtasks):
            # Pass the earlier results along as context
            enhanced_task = f"{context}\n\nCurrent task: {subtask['task']}"
            result = await self._select_member(subtask, i).run(enhanced_task, user_id)
            results.append(result)
            context += f"\n{result.content}"
        return results

    async def _aggregate(self, results: List[AgentResponse]) -> str:
        # Summarize all subtask results into one reply
        prompt = "Summarize the following results:\n" + "\n---\n".join(r.content for r in results)
        return await self.supervisor.llm.generate(prompt, [])
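
The difference between the two dispatch strategies is easy to see with stand-in members. The following is a minimal sketch, not the plan's implementation: `FakeMember`, `assign`, and the module-level dispatch functions are hypothetical, and round-robin assignment is an assumed policy for the case where subtasks outnumber members.

```python
import asyncio
from typing import List, Tuple


class FakeMember:
    """Stand-in for AgentCore; run() only tags the task with the member name."""

    def __init__(self, name: str):
        self.name = name

    async def run(self, task: str, user_id: int) -> str:
        await asyncio.sleep(0)  # yield control, as a real LLM call would
        return f"{self.name} handled: {task}"


def assign(subtasks: List[dict], members: List[FakeMember]) -> List[Tuple[dict, FakeMember]]:
    """Pair subtasks with members round-robin so that no subtask is dropped."""
    return [(st, members[i % len(members)]) for i, st in enumerate(subtasks)]


async def dispatch_parallel(subtasks: List[dict], members: List[FakeMember], user_id: int) -> List[str]:
    pairs = assign(subtasks, members)
    # gather returns results in the order the awaitables were passed in
    return await asyncio.gather(*(m.run(st["task"], user_id) for st, m in pairs))


async def dispatch_sequential(subtasks: List[dict], members: List[FakeMember], user_id: int) -> List[str]:
    results: List[str] = []
    context = ""
    for st, m in assign(subtasks, members):
        # Later subtasks see the output of earlier ones
        task = f"{context}\n\nCurrent task: {st['task']}" if context else st["task"]
        result = await m.run(task, user_id)
        results.append(result)
        context += f"\n{result}"
    return results
```

Parallel dispatch suits independent subtasks; sequential dispatch trades latency for the ability to feed each result into the next step.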

Memory Manager (memory management)

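# agent/app/agent/memory/manager.py
from typing import List, Optional

import redis.asyncio as redis

from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory


class MemoryManager:
    PERSIST_EVERY = 5  # persist key points once every N conversation turns

    def __init__(self, agent_id: int, redis_client: Optional[redis.Redis] = None):
        self.agent_id = agent_id
        self.working = WorkingMemory()
        # SessionMemory needs a Redis client; falling back to a default local client is an assumption
        if redis_client is None:
            redis_client = redis.Redis()
        self.session = SessionMemory(agent_id, redis_client)
        self.persistent = PersistentMemory(agent_id)
        self._turns = 0

    async def load_context(self, query: str, user_id: int, session_id: str) -> dict:
        # 1. Working Memory (in process, fastest)
        working_context = self.working.get()

        # 2. Session Memory (Redis)
        session_context = await self.session.get_summary(user_id, session_id)

        # 3. Persistent Memory (vector store), retrieved on demand
        persistent_context = await self.persistent.search(query, user_id, top_k=3)

        return {
            'working': working_context,
            'session': session_context,
            'persistent': persistent_context,
            'summary': self._build_summary(session_context, persistent_context)
        }

    async def save(self, user_input: str, response: str, user_id: int, session_id: str):
        # 1. Write to Working Memory
        self.working.add(user_input, response)
        self._turns += 1

        # 2. Write to Session Memory (summarized periodically)
        await self.session.add(user_input, response, user_id, session_id)

        # 3. Periodically extract key information into Persistent Memory
        if self._should_persist():
            await self._extract_and_persist(user_input, response, user_id)

    def _should_persist(self) -> bool:
        # Persist once every PERSIST_EVERY turns rather than on every turn past a threshold
        return self._turns % self.PERSIST_EVERY == 0

    def _build_summary(self, session_summary: str, memories: List[str]) -> str:
        # Minimal placeholder: session summary followed by the retrieved long-term memories
        return "\n".join(part for part in [session_summary, *memories] if part)

    def _extract_key_points(self, user_input: str, response: str) -> List[str]:
        # Placeholder: keep the raw exchange; an LLM or rule-based extractor would go here
        return [f"User: {user_input}\nAssistant: {response}"]

    async def _extract_and_persist(self, user_input: str, response: str, user_id: int):
        # Extract key information (with an LLM or with rules) and store each point
        key_points = self._extract_key_points(user_input, response)
        for point in key_points:
            await self.persistent.add(point, user_id, memory_type="experience")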

# agent/app/agent/memory/working.py
class WorkingMemory:
    """Context of the current task, held in process memory."""

    def __init__(self):
        self.current_task = None
        self.recent_messages = []
        self.max_size = 10  # maximum number of messages kept

    def get(self) -> dict:
        return {
            'current_task': self.current_task,
            'recent_messages': self.recent_messages[-self.max_size:]
        }

    def add(self, user_input: str, response: str):
        self.recent_messages.append({
            'role': 'user',
            'content': user_input
        })
        self.recent_messages.append({
            'role': 'assistant',
            'content': response
        })
        # Keep the buffer at a fixed size by dropping the oldest messages
        if len(self.recent_messages) > self.max_size:
            self.recent_messages = self.recent_messages[-self.max_size:]

    def size(self) -> int:
        # Number of user/assistant turns currently held
        return len(self.recent_messages) // 2

# agent/app/agent/memory/session.py
import json
from typing import List

import redis.asyncio as redis


class SessionMemory:
    """Session-level memory, stored in Redis."""

    def __init__(self, agent_id: int, redis_client: redis.Redis):
        self.agent_id = agent_id
        self.redis = redis_client
        self.ttl = 3600 * 24  # 24 hours

    def _key(self, user_id: int, session_id: str) -> str:
        return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}"

    async def add(self, user_input: str, response: str, user_id: int, session_id: str):
        key = self._key(user_id, session_id)

        # Load the existing session data
        data = await self.redis.get(key)
        messages = json.loads(data) if data else {"messages": [], "summary": ""}

        # Append the new turn
        messages["messages"].append({"role": "user", "content": user_input})
        messages["messages"].append({"role": "assistant", "content": response})

        # Regenerate the summary every 10 messages rather than on every write
        if len(messages["messages"]) % 10 == 0:
            messages["summary"] = await self._generate_summary(messages["messages"])

        await self.redis.setex(key, self.ttl, json.dumps(messages))

    async def get_summary(self, user_id: int, session_id: str) -> str:
        key = self._key(user_id, session_id)
        data = await self.redis.get(key)
        if data:
            messages = json.loads(data)
            return messages.get("summary", "")
        return ""

    async def _generate_summary(self, messages: List[dict]) -> str:
        # Generate the summary with an LLM; the plan leaves the implementation open
        # ...
        raise NotImplementedError("session summarization is not implemented yet")

# agent/app/agent/memory/persistent.py
from typing import List


class PersistentMemory:
    """Long-term memory, backed by vector storage."""

    def __init__(self, agent_id: int, db=None):
        self.agent_id = agent_id
        # Data-access object exposing agent_memories.create/search; its API is assumed here
        self.db = db
        self.vector_store = None  # vector store client, to be initialized

    async def add(self, content: str, user_id: int, memory_type: str = "experience"):
        # Compute the embedding
        embedding = await self._get_embedding(content)

        # Store the memory in the database
        await self.db.agent_memories.create(
            agent_id=self.agent_id,
            user_id=user_id,
            content=content,
            embedding=embedding,
            memory_type=memory_type
        )

    async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]:
        # Compute the query embedding
        query_embedding = await self._get_embedding(query)

        # Vector similarity search
        results = await self.db.agent_memories.search(
            agent_id=self.agent_id,
            user_id=user_id,
            embedding=query_embedding,
            top_k=top_k
        )

        return [r.content for r in results]

    async def _get_embedding(self, text: str) -> List[float]:
        # Call the embedding model; the plan leaves the implementation open
        # ...
        raise NotImplementedError("embedding model call is not implemented yet")
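
The similarity search that `PersistentMemory.search` delegates to the database can be illustrated in plain Python. This is a sketch under assumptions: cosine similarity is one common metric, but the plan does not say which metric the vector store would use, and `cosine_similarity` and `top_k` are hypothetical helpers working on an in-memory list instead of the `agent_memories` table.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], memories: List[Tuple[str, Sequence[float]]], k: int = 3) -> List[str]:
    """Return the contents of the k memories most similar to the query embedding."""
    ranked = sorted(memories, key=lambda m: cosine_similarity(query, m[1]), reverse=True)
    return [content for content, _ in ranked[:k]]
```

A linear scan like this is only practical for small collections; a vector index would take over the ranking once the memory table grows.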

Skill Router

# agent/app/agent/skills/router.py
from typing import List


class SkillRouter:
    """Selects the skills to call based on the LLM's decision."""

    def __init__(self, available_skills: List[int]):
        self.available_skills = available_skills

    async def route(self, llm_decision: dict) -> List[dict]:
        """Parse the skill calls requested in the LLM's decision."""
        if not llm_decision.get('tool_calls'):
            return []

        routes = []
        for tool_call in llm_decision['tool_calls']:
            skill_id = tool_call['skill_id']

            # Skip skills that are not bound to this agent
            if skill_id not in self.available_skills:
                continue

            routes.append({
                'skill_id': skill_id,
                'params': tool_call.get('parameters', {}),
                'reason': tool_call.get('reason', '')
            })

        return routes

Skill Executor

# agent/app/agent/skills/executor.py
import asyncio
from typing import List


class SkillExecutor:
    """Skill executor with support for parallel and sequential execution."""

    def __init__(self, skill_registry=None):
        # Skill registry: any object with get(skill_id); an empty dict stands in until one is wired up
        self.skill_registry = skill_registry if skill_registry is not None else {}

    async def execute(self, skill_id: int, params: dict) -> dict:
        """Execute a single skill."""
        skill = self.skill_registry.get(skill_id)
        if not skill:
            return {"success": False, "error": f"Skill {skill_id} not found"}

        try:
            result = await skill.execute(**params)
            return {"success": True, "result": result}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def execute_multiple(self, skills: List[dict], strategy: str = "parallel") -> List[dict]:
        """Execute a batch of skills."""
        if strategy == "parallel":
            tasks = [self.execute(s['skill_id'], s['params']) for s in skills]
            return await asyncio.gather(*tasks, return_exceptions=True)
        else:
            results = []
            for s in skills:
                result = await self.execute(s['skill_id'], s['params'])
                results.append(result)
            return results
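
The executor looks skills up in a registry that the plan lists (`registry.py`) but does not show. A minimal sketch of what such a registry might look like follows. `SkillRegistry`, `AddSkill`, and `run_skill` are hypothetical names, and `run_skill` simply mirrors the success/error envelope returned by `SkillExecutor.execute`.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol


class Skill(Protocol):
    """Anything with an async execute(**params) method counts as a skill."""

    async def execute(self, **params: Any) -> Any: ...


class SkillRegistry:
    """In-memory mapping from skill ID to skill object."""

    def __init__(self) -> None:
        self._skills: Dict[int, Skill] = {}

    def register(self, skill_id: int, skill: Skill) -> None:
        self._skills[skill_id] = skill

    def get(self, skill_id: int) -> Optional[Skill]:
        return self._skills.get(skill_id)


class AddSkill:
    """Toy skill used only for the example."""

    async def execute(self, a: int, b: int) -> int:
        return a + b


async def run_skill(registry: SkillRegistry, skill_id: int, params: dict) -> dict:
    """Look up and run one skill, converting failures into an error envelope."""
    skill = registry.get(skill_id)
    if skill is None:
        return {"success": False, "error": f"Skill {skill_id} not found"}
    try:
        return {"success": True, "result": await skill.execute(**params)}
    except Exception as exc:  # a failing skill must not crash the agent loop
        return {"success": False, "error": str(exc)}
```

Because the registry exposes `get(skill_id)`, an instance could be passed to the executor wherever a registry object is expected.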

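3. API Design

3.1 New Endpoints

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/agents/:id/chat | Single-agent chat |
| POST | /api/v1/agents/:id/chat/stream | Single-agent streaming chat |
| POST | /api/v1/teams/:id/chat | Multi-agent group chat |
| GET | /api/v1/agents/:id/memories | List memories |
| DELETE | /api/v1/agents/:id/memories/:memory_id | Delete a memory |
| GET | /api/v1/agents/:id/history | Get conversation history |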

3.2 Request/Response Example

// POST /api/v1/agents/1/chat
// Request
{
    "user_id": 123,
    "message": "Help me analyze the sales data",
    "session_id": "optional-session-id"
}

// Response
{
    "agent_id": 1,
    "response": "According to the analysis, today's sales were ...",
    "tool_calls": [
        {"skill": "query_database", "params": {"sql": "SELECT ..."}}
    ],
    "tokens_used": 1500,
    "duration_ms": 2000
}
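
The request and response bodies above can be mirrored by small typed models on the Python side. The sketch below uses standard-library dataclasses to stay dependency-free; the class names `ChatRequest` and `ChatResponse` and the validation rules are assumptions, and the real service might use pydantic models instead, as `AgentCore` does.

```python
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass
class ChatRequest:
    user_id: int
    message: str
    session_id: Optional[str] = None  # optional, as in the example request

    @classmethod
    def from_json(cls, payload: dict) -> "ChatRequest":
        """Validate a decoded JSON body and build a request object."""
        if not isinstance(payload.get("user_id"), int):
            raise ValueError("user_id must be an integer")
        message = payload.get("message")
        if not isinstance(message, str) or not message.strip():
            raise ValueError("message must be a non-empty string")
        return cls(payload["user_id"], message, payload.get("session_id"))


@dataclass
class ChatResponse:
    agent_id: int
    response: str
    tool_calls: List[dict] = field(default_factory=list)
    tokens_used: int = 0
    duration_ms: int = 0

    def to_json(self) -> dict:
        """Return a plain dict ready for JSON encoding."""
        return asdict(self)
```

Validating at the boundary keeps malformed requests from reaching the agent loop, where they would cost an LLM call before failing.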

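4. Implementation Steps

Phase 1: Database design and migration

  • Create the database migration scripts
  • Add the agents, agent_skills, agent_memories, agent_teams, and related tables

Phase 2: Backend Agent Engine core

  • Implement the AgentCore single-agent class
  • Implement the LLM adapters (OpenAI/Anthropic)
  • Implement the prompt-building logic

Phase 3: Memory system

  • Implement WorkingMemory (in process)
  • Implement SessionMemory (Redis)
  • Implement PersistentMemory (vector store)
  • Implement the unified MemoryManager interface

Phase 4: Skill routing and execution

  • Implement SkillRouter
  • Implement SkillExecutor
  • Integrate with the existing skill registry

Phase 5: Multi-agent Supervisor

  • Implement the Supervisor scheduler
  • Implement the task decomposition logic
  • Implement result aggregation

Phase 6: API integration

  • Add the Agent API routes
  • Implement /chat, /chat/stream, and the other endpoints
  • Integrate with the Go API Gateway

Phase 7: Frontend integration

  • Add chat to the agent detail page
  • Memory management page
  • Multi-agent collaboration configuration page

Phase 8: Testing and optimization

  • Unit tests
  • Integration tests
  • Performance optimization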

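5. Milestones

| Milestone | Estimated time | Deliverable |
| --- | --- | --- |
| M1: Basic skeleton | 1 week | Database + AgentCore basics |
| M2: Memory system | 1 week | Three-layer memory implementation |
| M3: Skill invocation | 1 week | Router + Executor |
| M4: Multi-agent | 1 week | Supervisor implementation |
| M5: API integration | 1 week | Complete API |
| M6: Frontend integration | 1 week | Page features |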

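6. Risks and Mitigations

| Risk | Impact | Mitigation |
| --- | --- | --- |
| Unstable LLM API | Features unavailable | Retries + graceful degradation |
| Vector store performance | Slow retrieval | Caching + rate limiting |
| Token cost overrun | Rising costs | Memory compression + on-demand loading |
| Multi-agent communication | Added latency | Timeouts + parallel optimization |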
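
The retry, degradation, and timeout mitigations listed above can be combined in one small wrapper. This is a sketch under stated assumptions rather than the plan's design: `call_with_retry`, `make_flaky`, and `cached_answer` are hypothetical names, the default limits are arbitrary, and treating only timeouts and `ConnectionError` as retryable is a guess at what a real LLM client would raise.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    primary: Callable[[], Awaitable[T]],
    fallback: Callable[[], Awaitable[T]],
    retries: int = 3,
    timeout_s: float = 10.0,
    base_delay_s: float = 0.5,
) -> T:
    """Try `primary` up to `retries` times with a timeout, then degrade to `fallback`."""
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(primary(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt < retries - 1:
                # Exponential backoff between attempts: base, 2x base, 4x base, ...
                await asyncio.sleep(base_delay_s * 2 ** attempt)
    return await fallback()


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Demo helper: a coroutine function that fails `failures` times, then succeeds."""
    state = {"left": failures}

    async def call() -> str:
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("simulated outage")
        return "primary answer"

    return call


async def cached_answer() -> str:
    """Demo fallback standing in for a cheaper model or a cached reply."""
    return "degraded answer"
```

With `retries=3`, a call that fails twice still returns the primary answer, while one that keeps failing falls through to the degraded path instead of surfacing an error to the user.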