X-Agents/multi_agent_plan/implementation_plan.md
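DESKTOP-72TV0V4\caoxiaozhu 5ea6f0d31f feat: add multi-agent collaboration system
- Add the multi-agent graph collaboration framework (graph, supervisor, workers)
- Add the iterator and integration modules
- Add the multi-agent planning document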

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 23:21:37 +08:00


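Multi-Agent Collaboration System Implementation Plan

Project Overview

A multi-agent collaboration system built on LangGraph, similar to OpenClaw, using a hierarchical Supervisor + Workers architecture.

Core Features

  • Task planning: the Supervisor analyzes the task and generates an execution plan
  • Dynamic dispatch: the LLM decides on its own which Worker to call
  • Parallel execution: multiple Workers can process tasks at the same time
  • Result aggregation: the Supervisor combines the results from all Workers
  • Iterative refinement: a Review mechanism with iterative retries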
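
The plan, dispatch, parallel execution, and aggregation steps listed above can be sketched with plain `asyncio`. This is only an illustration under stated assumptions: `research`, `coder`, `plan`, and `run` are hypothetical stand-ins, and a fixed two-step plan takes the place of the LLM's decision.

```python
import asyncio


# Hypothetical stand-ins for LLM-backed workers; the names are illustrative only.
async def research(description: str) -> str:
    await asyncio.sleep(0)  # yield control, as a real I/O-bound call would
    return f"[research] {description}"


async def coder(description: str) -> str:
    await asyncio.sleep(0)
    return f"[coder] {description}"


WORKERS = {"research": research, "coder": coder}


def plan(task: str) -> list[dict]:
    """Planning step: a fixed two-step plan stands in for the LLM's decision."""
    return [
        {"id": "task_1", "assigned_agent": "research",
         "description": f"collect background for {task}"},
        {"id": "task_2", "assigned_agent": "coder",
         "description": f"draft code for {task}"},
    ]


async def run(task: str) -> str:
    subtasks = plan(task)
    # Dispatch: look up each worker by name and run independent subtasks concurrently.
    outputs = await asyncio.gather(
        *(WORKERS[t["assigned_agent"]](t["description"]) for t in subtasks)
    )
    # Aggregation: gather() preserves input order, so outputs line up with the plan.
    return "\n".join(f"{t['id']}: {out}" for t, out in zip(subtasks, outputs))


if __name__ == "__main__":
    print(asyncio.run(run("a CSV parser")))
```

Running subtasks concurrently is only safe when they do not depend on each other; dependent subtasks would need to run in plan order instead.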

1. System Architecture

1.1 Overall Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                        MultiAgentSystem                         │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Supervisor Agent                      │  │
│  │    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │  │
│  │    │   Planner   │   │ Dispatcher  │   │ Aggregator  │    │  │
│  │    │ (planning)  │   │ (dispatch)  │   │ (aggregate) │    │  │
│  │    └─────────────┘   └─────────────┘   └─────────────┘    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│            ┌───────────────────┼───────────────────┐            │
│            ▼                   ▼                   ▼            │
│     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     │
│     │  Research   │     │    Coder    │     │   Review    │     │
│     │   Worker    │     │   Worker    │     │   Worker    │     │
│     └─────────────┘     └─────────────┘     └─────────────┘     │
│            │                   │                   │            │
│            └───────────────────┴───────────────────┘            │
│                                │                                │
│                                ▼                                │
│                        ┌───────────────┐                        │
│                        │  Shared State │                        │
│                        └───────────────┘                        │
└─────────────────────────────────────────────────────────────────┘

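1.2 Core Components

| Component | Responsibility | File |
|---|---|---|
| SupervisorAgent | Task analysis, planning, dispatch, and aggregation | agent/multi/supervisor.py |
| BaseWorker | Worker base class; defines the execution interface | agent/multi/workers/base.py |
| ResearchWorker | Information search and research | agent/multi/workers/research.py |
| CoderWorker | Writing and modifying code | agent/multi/workers/coder.py |
| ReviewWorker | Checking and reviewing results | agent/multi/workers/review.py |
| SharedState | State shared across Agents | agent/multi/state.py |
| TaskQueue | Task queue management | agent/multi/queue.py |
| MultiAgentGraph | LangGraph workflow orchestration | agent/multi/graph.py |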

2. Data Structure Design

2.1 Agent State Definition

# agent/multi/types.py
from typing import Optional, TypedDict

from pydantic import BaseModel


class TaskItem(BaseModel):
    """A single task item."""
    id: str
    description: str
    assigned_agent: str  # research / coder / review
    # Defaults to "pending" because the Supervisor's JSON plan omits this field
    status: str = "pending"  # pending / running / completed / failed
    result: Optional[dict] = None
    error: Optional[str] = None
    retry_count: int = 0


class AgentState(TypedDict):
    """Agent state carried through the whole graph."""
    # User input
    original_task: str  # original task description

    # Task planning
    task_plan: list[TaskItem]  # list of decomposed tasks
    current_task_index: int  # index of the task currently being executed

    # Execution results
    results: dict  # {task_id: result}

    # Flow control
    iteration: int  # current iteration count
    next_node: str  # name of the next node

    # Shared context
    shared_context: dict  # data shared between Agents

    # Final output
    final_output: str
    status: str  # running / completed / failed
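
In LangGraph, a node returns a partial update, and a state key is overwritten unless it is annotated with a reducer that combines old and new values. The sketch below imitates that merge rule with plain dictionaries so the effect on `results` is easy to see. `merge_dicts`, `REDUCERS`, and `apply_update` are hypothetical helpers written for this illustration, not LangGraph APIs.

```python
from typing import Any, Callable


def merge_dicts(old: dict, new: dict) -> dict:
    """Reducer: keep existing entries and add or override with the new ones."""
    return {**old, **new}


# Keys listed here are combined with a reducer; every other key is overwritten.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "results": merge_dicts,
    "shared_context": merge_dicts,
}


def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with one node's partial update applied."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        merged[key] = reducer(state[key], value) if reducer and key in state else value
    return merged


state = {"original_task": "demo", "results": {"task_1": "a"}, "next_node": "research"}
state = apply_update(state, {"results": {"task_2": "b"}, "next_node": "review"})
```

With this rule a Worker can return only its own `{task_id: result}` entry without erasing results written by earlier Workers, which matters once several Workers run in parallel.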

2.2 Supervisor Output Structure

# Structured output of the Supervisor
class SupervisorDecision(BaseModel):
    """The Supervisor's decision."""
    analysis: str  # task analysis
    task_plan: list[TaskItem]  # task plan
    need_aggregation: bool  # whether aggregation is needed
    next_worker: str  # the next Worker to run
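
LLM replies often wrap the JSON decision in a Markdown fence or add prose around it. The following is a minimal standard-library sketch of extracting and checking such a reply. `PlannedTask`, `Decision`, and `parse_decision` are hypothetical names that mirror the models above with dataclasses; they are not part of the planned module.

```python
import json
import re
from dataclasses import dataclass, field

ALLOWED_WORKERS = {"research", "coder", "review"}
FENCE = "`" * 3  # built at runtime so this example contains no literal fence
FENCED_JSON = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


@dataclass
class PlannedTask:
    id: str
    description: str
    assigned_agent: str
    status: str = "pending"


@dataclass
class Decision:
    analysis: str
    next_worker: str
    need_aggregation: bool = True
    task_plan: list[PlannedTask] = field(default_factory=list)


def parse_decision(reply: str) -> Decision:
    """Extract the JSON object from an LLM reply and validate the worker names."""
    match = FENCED_JSON.search(reply)
    # Fall back to treating the whole reply as JSON when no fence is present.
    payload = json.loads(match.group(1) if match else reply)
    tasks = [PlannedTask(**item) for item in payload.get("task_plan", [])]
    for task in tasks:
        if task.assigned_agent not in ALLOWED_WORKERS:
            raise ValueError(f"unknown worker: {task.assigned_agent}")
    return Decision(
        analysis=payload["analysis"],
        next_worker=payload["next_worker"],
        need_aggregation=payload.get("need_aggregation", True),
        task_plan=tasks,
    )
```

Rejecting unknown worker names at parse time keeps a bad plan from reaching the graph, where it would otherwise surface as a routing error.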

3. Core Implementation

3.1 Supervisor Agent

# agent/multi/supervisor.py
from langchain_core.language_models import BaseChatModel
from langchain_core.output_parsers import PydanticOutputParser

from .types import AgentState, SupervisorDecision
from .prompts import SUPERVISOR_SYSTEM_PROMPT


class SupervisorAgent:
    """Supervisor Agent: responsible for task planning and dispatch."""

    def __init__(
        self,
        llm: BaseChatModel,
        max_iterations: int = 3
    ):
        self.llm = llm
        self.max_iterations = max_iterations
        self.output_parser = PydanticOutputParser(pydantic_object=SupervisorDecision)

    def create_node(self):
        """Create the Supervisor node."""
        return self._supervisor_node

    async def _supervisor_node(self, state: AgentState) -> dict:
        """Supervisor node logic."""
        # First call: analyze the task and generate a plan
        if not state.get("task_plan"):
            decision = await self._plan_tasks(state["original_task"])
            return {
                "task_plan": decision.task_plan,
                "next_node": decision.next_worker,
                "current_task_index": 0,
                "shared_context": {"task_analysis": decision.analysis}
            }

        # Check whether to continue
        current_task = state["task_plan"][state["current_task_index"]]

        # TaskItem is a Pydantic model, so its fields are read as attributes
        if current_task.status == "completed":
            # Current task is done; check whether more tasks remain
            if state["current_task_index"] + 1 < len(state["task_plan"]):
                next_index = state["current_task_index"] + 1
                next_task = state["task_plan"][next_index]
                return {
                    "current_task_index": next_index,
                    "next_node": next_task.assigned_agent
                }
            else:
                # All tasks are done; move on to aggregation
                return {"next_node": "aggregate"}

        elif current_task.status == "failed":
            # Task failed; check whether the retry limit has been reached
            if current_task.retry_count >= self.max_iterations:
                return {"next_node": "aggregate", "status": "failed"}
            else:
                # Retry with the same Worker
                return {"next_node": current_task.assigned_agent}

        return {"next_node": state.get("next_node", "aggregate")}

    async def _plan_tasks(self, task: str) -> SupervisorDecision:
        """Call the LLM to generate a task plan."""
        prompt = SUPERVISOR_SYSTEM_PROMPT.format(task=task)

        response = await self.llm.ainvoke([
            {"role": "system", "content": prompt},
            {"role": "user", "content": "Please analyze the task and draw up an execution plan."}
        ])

        # Parse the LLM output into a structured decision
        decision = self.output_parser.parse(response.content)
        return decision
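
The branching in the Supervisor node is easier to test when it is pulled out as a pure function. The sketch below is one possible shape, not the plan's implementation: `Task` and `route` are hypothetical names, and unlike the node above it dispatches a task that has not started yet instead of falling back to `next_node`.

```python
from dataclasses import dataclass


@dataclass
class Task:
    assigned_agent: str
    status: str = "pending"  # pending / running / completed / failed
    retry_count: int = 0


def route(plan: list[Task], index: int, max_retries: int = 3) -> tuple[int, str]:
    """Return (next task index, next node name) for the current plan state."""
    current = plan[index]
    if current.status == "completed":
        if index + 1 < len(plan):
            # Advance to the next task and hand it to its assigned Worker.
            return index + 1, plan[index + 1].assigned_agent
        return index, "aggregate"
    if current.status == "failed":
        if current.retry_count >= max_retries:
            # Retry budget spent: stop and let aggregation report the failure.
            return index, "aggregate"
        return index, current.assigned_agent
    # Not started yet: dispatch the current task.
    return index, current.assigned_agent
```

Because the function takes no LLM and no graph state, each branch can be covered by a one-line assertion.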

3.2 Worker Base Class

# agent/multi/workers/base.py
from abc import ABC, abstractmethod
from typing import Optional

from langchain_core.language_models import BaseChatModel

from ..types import AgentState, TaskItem


class BaseWorker(ABC):
    """Base class for Worker Agents."""

    def __init__(
        self,
        llm: BaseChatModel,
        name: str,
        system_prompt: str,
        tools: Optional[list] = None
    ):
        self.llm = llm
        self.name = name
        self.system_prompt = system_prompt
        self.tools = tools or []

    @abstractmethod
    async def execute(self, task: TaskItem, context: dict) -> dict:
        """Execute a task."""
        pass

    def create_node(self):
        """Create a LangGraph node."""
        async def node(state: AgentState) -> dict:
            task = state["task_plan"][state["current_task_index"]]
            result = await self.execute(task, state.get("shared_context", {}))

            # Update the state
            return {
                # Merge with earlier results; without a reducer the returned
                # value replaces the whole "results" key
                "results": {**state.get("results", {}), task.id: result},
                "task_plan": self._update_task_status(
                    state["task_plan"],
                    task.id,
                    "completed" if result.get("success") else "failed"
                ),
                "shared_context": {**state.get("shared_context", {}), **result.get("context", {})}
            }

        return node

    def _update_task_status(self, tasks: list, task_id: str, status: str) -> list:
        """Return a new task list with the given task's status updated."""
        updated = []
        for task in tasks:
            if task.id == task_id:
                changes = {"status": status}
                if status == "failed":
                    # Count failures so the Supervisor's retry limit takes effect
                    changes["retry_count"] = task.retry_count + 1
                # model_copy is the Pydantic v2 API (v1 uses .copy(update=...))
                task = task.model_copy(update=changes)
            updated.append(task)
        return updated
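
The status update can also be expressed without Pydantic. The sketch below uses a frozen dataclass and `dataclasses.replace` to show the same idea: build a new list, leave untouched tasks as they are, and count each failure. `Task` and `mark` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Task:
    id: str
    status: str = "pending"
    retry_count: int = 0


def mark(tasks: list[Task], task_id: str, success: bool) -> list[Task]:
    """Return a new list with one task updated; a failure also bumps retry_count."""
    updated = []
    for task in tasks:
        if task.id != task_id:
            updated.append(task)  # untouched tasks are reused as-is
        elif success:
            updated.append(replace(task, status="completed"))
        else:
            updated.append(
                replace(task, status="failed", retry_count=task.retry_count + 1)
            )
    return updated
```

Returning a fresh list instead of mutating in place keeps the previous state intact, which makes retries and debugging easier to reason about.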

3.3 Task Queue (Optional: Parallel Execution Support)

# agent/multi/queue.py
import asyncio
from typing import Any, Callable
from dataclasses import dataclass
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class QueuedTask:
    id: str
    agent_name: str
    task_data: Any
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: str = None


class TaskQueue:
    """Task queue: runs multiple Workers in parallel."""

    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.queue: asyncio.Queue = asyncio.Queue()
        self.results: dict = {}
        self._running = 0

    async def add_task(self, task: QueuedTask):
        """Add a task to the queue."""
        await self.queue.put(task)

    async def execute_all(self, worker_factory: Callable):
        """Execute all queued tasks."""
        async def worker():
            while True:
                try:
                    task = self.queue.get_nowait()
                except asyncio.QueueEmpty:
                    break

                self._running += 1
                task.status = TaskStatus.RUNNING

                try:
                    worker_instance = worker_factory(task.agent_name)
                    task.result = await worker_instance.execute(task.task_data)
                    task.status = TaskStatus.COMPLETED
                except Exception as e:
                    task.status = TaskStatus.FAILED
                    task.error = str(e)
                finally:
                    self._running -= 1
                    self.results[task.id] = task

        # Start multiple worker coroutines
        workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
        await asyncio.gather(*workers)

        return self.results

3.4 LangGraph Workflow Orchestration

# agent/multi/graph.py
from langgraph.graph import StateGraph, END

from .types import AgentState
from .supervisor import SupervisorAgent
from .workers.research import ResearchWorker
from .workers.coder import CoderWorker
from .workers.review import ReviewWorker
from .aggregator import ResultAggregator


def create_multi_agent_graph(
    llm,
    tool_registry,
    max_iterations: int = 3
):
    """Create the multi-agent workflow graph and return it compiled."""

    # Initialize components
    supervisor = SupervisorAgent(llm, max_iterations)
    research_worker = ResearchWorker(llm, tool_registry)
    coder_worker = CoderWorker(llm, tool_registry)
    review_worker = ReviewWorker(llm, tool_registry)
    aggregator = ResultAggregator(llm)

    # Create the graph
    graph = StateGraph(AgentState)

    # Add nodes
    graph.add_node("supervisor", supervisor.create_node())
    graph.add_node("research", research_worker.create_node())
    graph.add_node("coder", coder_worker.create_node())
    graph.add_node("review", review_worker.create_node())
    graph.add_node("aggregate", aggregator.create_node())

    # Set the entry point
    graph.set_entry_point("supervisor")

    # Add edges
    # The Supervisor selects the next node through state["next_node"]; a fixed
    # supervisor -> research edge would leave "coder" unreachable
    graph.add_conditional_edges(
        "supervisor",
        lambda state: state["next_node"],
        {
            "research": "research",
            "coder": "coder",
            "review": "review",
            "aggregate": "aggregate"
        }
    )
    graph.add_edge("research", "review")
    graph.add_edge("coder", "review")

    # Conditional edge: from review back to supervisor
    def should_continue(state: AgentState) -> str:
        if state.get("status") == "failed":
            return "aggregate"
        if state.get("iteration", 0) >= max_iterations:
            return "aggregate"
        if state.get("current_task_index", 0) >= len(state.get("task_plan", [])):
            return "aggregate"
        return "supervisor"

    graph.add_conditional_edges(
        "review",
        should_continue,
        {
            "supervisor": "supervisor",
            "aggregate": "aggregate"
        }
    )

    # End node
    graph.add_edge("aggregate", END)

    return graph.compile()
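
The control flow of the graph (supervisor, then a worker, then review, then back to the supervisor until aggregation) can be traced without LangGraph. The sketch below is a tiny state-machine walker written for this illustration. `walk`, the `END` sentinel, and the toy nodes are all hypothetical and only mimic the routing idea.

```python
from typing import Callable

END = "__end__"  # hypothetical sentinel for this sketch


def walk(
    nodes: dict[str, Callable[[dict], dict]],
    routers: dict[str, Callable[[dict], str]],
    state: dict,
    entry: str,
    max_steps: int = 25,
) -> tuple[dict, list[str]]:
    """Run nodes one after another, following each node's router, until END."""
    current, path = entry, []
    while current != END:
        if len(path) >= max_steps:
            raise RuntimeError("step limit reached; the graph is probably cycling")
        path.append(current)
        state = {**state, **nodes[current](state)}  # apply the partial update
        current = routers[current](state)           # pick the next node
    return state, path


def supervisor(state: dict) -> dict:
    # Dispatch pending work first, then hand over to aggregation.
    return {"next_node": "aggregate" if state["done"] else "research"}


nodes = {
    "supervisor": supervisor,
    "research": lambda s: {"done": True},
    "review": lambda s: {},
    "aggregate": lambda s: {"final_output": "summary"},
}
routers = {
    "supervisor": lambda s: s["next_node"],
    "research": lambda s: "review",
    "review": lambda s: "supervisor",
    "aggregate": lambda s: END,
}
final_state, path = walk(nodes, routers, {"done": False}, "supervisor")
```

The step limit plays the role of a safety net: a routing mistake that sends the flow in circles fails fast instead of looping forever.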

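4. Prompt Design

4.1 Supervisor System Prompt

# agent/multi/prompts.py

SUPERVISOR_SYSTEM_PROMPT = """You are the Supervisor, a task-planning expert. Your job is to break a complex task into executable subtasks and assign each one to a suitable worker Agent.

## Available Worker Agents
- **research**: information search and research
- **coder**: writing, modifying, and debugging code
- **review**: checking results and reviewing quality

## Task
{task}

## Follow these steps

### Step 1: Task analysis
Analyze the nature of the task and determine which steps are needed to complete it.

### Step 2: Task decomposition
Break the task into independent subtasks. Each subtask should:
- Be clearly described
- Be completable by a single Agent
- Have explicit completion criteria

### Step 3: Assign Agents
Choose the most suitable Agent for each subtask.

### Step 4: Determine the execution order
If there are dependencies, determine the correct execution order.

## Output format
Output your decision as JSON:
```json
{{
  "analysis": "Task analysis...",
  "task_plan": [
    {{
      "id": "task_1",
      "description": "Subtask description",
      "assigned_agent": "research"
    }},
    {{
      "id": "task_2",
      "description": "Subtask description",
      "assigned_agent": "coder"
    }}
  ],
  "need_aggregation": true,
  "next_worker": "research"
}}
```

## Notes
- If the task is simple, it can be assigned to a single Agent
- If the task needs iterative refinement, make sure a review step is included
- Consider the dependencies between tasks
"""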
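
The doubled braces in the template matter because the prompt is filled in with `str.format`: `{task}` is a placeholder, while `{{` and `}}` come out as literal braces in the JSON example. A short demonstration, using a made-up `TEMPLATE` rather than the real prompt:

```python
import json

# Hypothetical miniature template; only the brace handling is the point here.
TEMPLATE = """Task: {task}
Reply with JSON:
{{"next_worker": "research"}}"""

rendered = TEMPLATE.format(task="build a CLI")

# After formatting, the last line is plain JSON with single braces.
example_line = rendered.splitlines()[-1]
parsed = json.loads(example_line)

# Braces inside the substituted value are not interpreted again.
with_braces = TEMPLATE.format(task="use {x}")
```

A single unescaped brace in the template would make `format` raise an error or look for a missing field, so the JSON example has to stay escaped whenever the prompt text is edited.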

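4.2 Review Worker Prompt

REVIEW_SYSTEM_PROMPT = """You are the Reviewer, an expert in reviewing code and results. Your job is to check whether the result of a task meets the requirements.

## Task description
{task_description}

## Execution result
{execution_result}

## Review criteria
1. Does the result fully solve the original task?
2. Is the output format correct?
3. Are there any obvious errors or omissions?
4. Does the code have potential problems?

## Output the review result
```json
{{
  "passed": true/false,
  "issues": [
    {{"severity": "high/medium/low", "description": "Issue description"}}
  ],
  "suggestions": ["Improvement suggestion"]
}}
```
"""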

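A reply in the review format can be reduced to the few facts the flow control needs: whether it passed, how many issues were raised, and the worst severity. The sketch below assumes the reply is already bare JSON; `summarize_review` is a hypothetical helper, not part of the planned module.

```python
import json

SEVERITIES = ("low", "medium", "high")  # ordered from least to most severe


def summarize_review(raw: str) -> dict:
    """Parse a review reply and report the worst severity it contains."""
    data = json.loads(raw)
    issues = data.get("issues", [])
    for issue in issues:
        if issue.get("severity") not in SEVERITIES:
            raise ValueError(f"unexpected severity: {issue.get('severity')!r}")
    # Rank severities by their position in SEVERITIES; None when there are no issues.
    worst = max(
        (issue["severity"] for issue in issues),
        key=SEVERITIES.index,
        default=None,
    )
    return {"passed": bool(data.get("passed")), "worst": worst, "count": len(issues)}
```

Validating the severity values up front means later routing code can compare against `"high"` without guarding against typos from the model.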

5. Iteration Control

5.1 Iteration Logic

# agent/multi/iteration.py


class IterationController:
    """Iteration controller."""

    def __init__(self, max_iterations: int = 3):
        self.max_iterations = max_iterations

    def should_continue(
        self,
        iteration: int,
        task_status: str,
        review_result: dict
    ) -> tuple[bool, str]:
        """
        Decide whether to keep iterating.

        Returns:
            (whether to continue, reason)
        """
        # Maximum number of iterations reached
        if iteration >= self.max_iterations:
            return False, "max_iterations_reached"

        # Task completed successfully
        if task_status == "completed" and review_result.get("passed"):
            return False, "task_completed"

        # Task failed and cannot be retried
        if task_status == "failed" and not review_result.get("retryable"):
            return False, "task_failed"

        # A retry is needed
        if review_result.get("issues") and review_result.get("passed") is False:
            return True, "needs_retry"

        return True, "continue"

    def get_next_action(
        self,
        review_result: dict,
        current_worker: str
    ) -> str:
        """Determine the next action."""
        if review_result.get("passed"):
            return "supervisor"  # return to the Supervisor

        # Decide the next step based on the kind of issue
        issues = review_result.get("issues", [])
        high_severity = any(i.get("severity") == "high" for i in issues)

        if high_severity:
            # Serious issue: run the same task again
            return current_worker
        else:
            # Minor issue: continue
            return "supervisor"
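
The controller decides one step at a time. Seen end to end, the loop it drives looks roughly like the sketch below: run, review, and stop on a pass or when the budget is spent. `run_with_review`, `execute`, and `review` are hypothetical names for this illustration, and the reason strings reuse the ones from `should_continue`.

```python
from typing import Callable


def run_with_review(
    execute: Callable[[int], str],
    review: Callable[[str], dict],
    max_iterations: int = 3,
) -> tuple[str, str, int]:
    """Re-run `execute` until `review` passes or the iteration budget is spent.

    Returns (last output, reason, attempts used).
    """
    output = ""
    for attempt in range(1, max_iterations + 1):
        output = execute(attempt)
        if review(output).get("passed"):
            return output, "task_completed", attempt
    return output, "max_iterations_reached", max_iterations
```

For example, with `execute = lambda n: f"draft {n}"` and a review that only passes drafts ending in `2`, the loop stops after the second attempt.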

6. Integration with the Existing System

6.1 Reusing Existing Components

# agent/multi/integration.py
from typing import Optional

from app.agent.core.agent import AgentManager
from app.agent.tools.registry import ToolRegistry
from app.agent.memory.session import SessionManager
from app.llm.factory import LLMFactory


class MultiAgentSystem:
    """Multi-agent system: integrates the existing components."""

    def __init__(self, config: dict):
        # Reuse the existing LLM Factory
        self.llm_factory = LLMFactory(
            provider=config.get("llm_provider", "openai"),
            openai_api_key=config.get("openai_api_key"),
            anthropic_api_key=config.get("anthropic_api_key")
        )

        # Reuse the existing Tool Registry
        self.tool_registry = ToolRegistry()
        self._register_default_tools()

        # Reuse the existing Session Manager
        self.session_manager = SessionManager()

        # Configuration
        self.max_iterations = config.get("max_iterations", 3)

    def _register_default_tools(self):
        """Register the default tools."""
        # Copy the tool registration logic from the existing Agent
        from app.agent.tools.impl import search, calculator
        self.tool_registry.register(
            name="search",
            func=search.search_web,
            description="Search the web",
            security_level="safe"
        )
        # ... other tools

    async def execute(self, task: str, session_id: Optional[str] = None) -> dict:
        """Execute a multi-agent task."""
        # Create the LangGraph graph
        from .graph import create_multi_agent_graph

        llm = self.llm_factory.get_llm()
        graph = create_multi_agent_graph(
            llm=llm,
            tool_registry=self.tool_registry,
            max_iterations=self.max_iterations
        )

        # Initialize the state
        initial_state = {
            "original_task": task,
            "task_plan": [],
            "current_task_index": 0,
            "results": {},
            "iteration": 0,
            "next_node": "supervisor",
            "shared_context": {},
            "final_output": "",
            "status": "running"
        }

        # Run the graph
        result = await graph.ainvoke(initial_state)

        # Save to the session
        if session_id:
            self.session_manager.add_message(session_id, "user", task)
            self.session_manager.add_message(session_id, "assistant", result["final_output"])

        return result

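7. File Structure

agent/
├── __init__.py
├── multi/
│   ├── __init__.py
│   ├── types.py              # Data type definitions
│   ├── prompts.py            # Prompt templates
│   ├── state.py              # Shared state (listed in section 1.2)
│   ├── supervisor.py         # Supervisor Agent
│   ├── aggregator.py         # Result aggregation (imported by graph.py)
│   ├── graph.py              # LangGraph workflow graph
│   ├── iteration.py          # Iteration control
│   ├── integration.py        # Integration with the existing system
│   ├── queue.py              # Task queue (optional)
│   └── workers/
│       ├── __init__.py
│       ├── base.py           # Worker base class
│       ├── research.py       # Research Worker
│       ├── coder.py          # Coder Worker
│       └── review.py         # Review Worker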

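8. Implementation Order

  1. Phase 1: Basic architecture

    • Define the data types (types.py)
    • Create the prompt templates (prompts.py)
  2. Phase 2: Supervisor

    • Implement SupervisorAgent
    • Implement the task planning and dispatch logic
  3. Phase 3: Workers

    • Implement BaseWorker
    • Implement ResearchWorker
    • Implement CoderWorker
    • Implement ReviewWorker
  4. Phase 4: Workflow orchestration

    • Implement the LangGraph workflow graph
    • Add conditional edges and iteration control
  5. Phase 5: Integration

    • Integrate with the existing Agent system
    • Add the API endpoints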

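9. Test Plan

  1. Unit tests: test the execution logic of each Worker
  2. Integration tests: test the complete Supervisor + Workers flow
  3. Iteration tests: test the retry and iteration logic
  4. End-to-end tests: simulate real task execution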
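
As a starting point for the unit-test item, a Worker can be tested without a real model by passing in a test double. The sketch below uses only the standard library. `FakeLLM` and `EchoWorker` are hypothetical classes invented for this example, so the real Workers and their `execute` signature may differ.

```python
import unittest


class FakeLLM:
    """Hypothetical test double: returns a canned reply and records prompts."""

    def __init__(self, reply: str):
        self.reply = reply
        self.prompts: list[str] = []

    async def ainvoke(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self.reply


class EchoWorker:
    """Hypothetical worker under test; wraps the LLM reply in a result dict."""

    def __init__(self, llm: FakeLLM):
        self.llm = llm

    async def execute(self, description: str) -> dict:
        reply = await self.llm.ainvoke(description)
        return {"success": bool(reply), "output": reply}


class WorkerTest(unittest.IsolatedAsyncioTestCase):
    async def test_success(self):
        llm = FakeLLM("done")
        result = await EchoWorker(llm).execute("summarize")
        self.assertEqual(result, {"success": True, "output": "done"})
        self.assertEqual(llm.prompts, ["summarize"])

    async def test_empty_reply_is_failure(self):
        result = await EchoWorker(FakeLLM("")).execute("summarize")
        self.assertFalse(result["success"])
```

The same double can be reused for the integration and iteration tests by scripting a sequence of replies instead of a single one.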