710 lines
22 KiB
Markdown
710 lines
22 KiB
Markdown
|
|
# 多智能体联动系统实现计划
|
|||
|
|
|
|||
|
|
## 项目概述
|
|||
|
|
|
|||
|
|
基于 LangGraph 实现类似 OpenClaw 的多智能体协作系统,采用 Supervisor + Workers 层级架构。
|
|||
|
|
|
|||
|
|
### 核心特性
|
|||
|
|
- **任务规划**: Supervisor 分析任务并生成执行计划
|
|||
|
|
- **动态分发**: LLM 自主决策调用哪个 Worker
|
|||
|
|
- **并行执行**: 支持多个 Worker 同时处理任务
|
|||
|
|
- **结果汇总**: Supervisor 汇总所有 Worker 结果
|
|||
|
|
- **迭代优化**: 支持 Review 机制和迭代重试
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 一、系统架构
|
|||
|
|
|
|||
|
|
### 1.1 整体架构图
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|||
|
|
│ MultiAgentSystem │
|
|||
|
|
│ ┌───────────────────────────────────────────────────────────┐ │
|
|||
|
|
│ │ Supervisor Agent │ │
|
|||
|
|
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
|
|||
|
|
│ │ │ Planner │ │ Dispatcher │ │ Aggregator │ │ │
|
|||
|
|
│ │ │ (任务规划) │ │ (任务分发) │ │ (结果汇总) │ │ │
|
|||
|
|
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
|
|||
|
|
│ └───────────────────────────────────────────────────────────┘ │
|
|||
|
|
│ │ │
|
|||
|
|
│ ┌──────────────────┼──────────────────┐ │
|
|||
|
|
│ ▼ ▼ ▼ │
|
|||
|
|
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
|||
|
|
│ │ Research │ │ Coder │ │ Review │ │
|
|||
|
|
│ │ Worker │ │ Worker │ │ Worker │ │
|
|||
|
|
│ └─────────────┘ └─────────────┘ └─────────────┘ │
|
|||
|
|
│ │ │ │ │
|
|||
|
|
│ └──────────────────┴──────────────────┘ │
|
|||
|
|
│ │ │
|
|||
|
|
│ ▼ │
|
|||
|
|
│ ┌─────────────┐ │
|
|||
|
|
│ │ Shared State │ │
|
|||
|
|
│ │ (共享状态) │ │
|
|||
|
|
│ └─────────────┘ │
|
|||
|
|
└─────────────────────────────────────────────────────────────────┘
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 1.2 核心组件
|
|||
|
|
|
|||
|
|
| 组件 | 职责 | 文件位置 |
|
|||
|
|
|------|------|----------|
|
|||
|
|
| **SupervisorAgent** | 任务分析、规划、分发、汇总 | `agent/multi/supervisor.py` |
|
|||
|
|
| **BaseWorker** | Worker 基类,定义执行接口 | `agent/multi/workers/base.py` |
|
|||
|
|
| **ResearchWorker** | 信息搜索和调研 | `agent/multi/workers/research.py` |
|
|||
|
|
| **CoderWorker** | 代码编写和修改 | `agent/multi/workers/coder.py` |
|
|||
|
|
| **ReviewWorker** | 结果检查和评审 | `agent/multi/workers/review.py` |
|
|||
|
|
| **SharedState** | 跨 Agent 共享状态 | `agent/multi/state.py` |
|
|||
|
|
| **TaskQueue** | 任务队列管理 | `agent/multi/queue.py` |
|
|||
|
|
| **MultiAgentGraph** | LangGraph 流程编排 | `agent/multi/graph.py` |
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 二、数据结构设计
|
|||
|
|
|
|||
|
|
### 2.1 Agent State 定义
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/types.py
|
|||
|
|
from typing import TypedDict, Annotated, Optional
|
|||
|
|
from operator import add
|
|||
|
|
from pydantic import BaseModel
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TaskItem(BaseModel):
|
|||
|
|
"""单个任务项"""
|
|||
|
|
id: str
|
|||
|
|
description: str
|
|||
|
|
assigned_agent: str # research / coder / review
|
|||
|
|
status: str # pending / running / completed / failed
|
|||
|
|
result: Optional[dict] = None
|
|||
|
|
error: Optional[str] = None
|
|||
|
|
retry_count: int = 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AgentState(TypedDict):
|
|||
|
|
"""贯穿整个图的 Agent 状态"""
|
|||
|
|
# 用户输入
|
|||
|
|
original_task: str # 原始任务描述
|
|||
|
|
|
|||
|
|
# 任务规划
|
|||
|
|
task_plan: list[TaskItem] # 分解后的任务列表
|
|||
|
|
current_task_index: int # 当前执行的任务索引
|
|||
|
|
|
|||
|
|
# 执行结果
|
|||
|
|
results: dict # {task_id: result}
|
|||
|
|
|
|||
|
|
# 流程控制
|
|||
|
|
iteration: int # 当前迭代次数
|
|||
|
|
next_node: str # 下一个节点名称
|
|||
|
|
|
|||
|
|
# 共享上下文
|
|||
|
|
shared_context: dict # Agent 间共享的数据
|
|||
|
|
|
|||
|
|
# 最终输出
|
|||
|
|
final_output: str
|
|||
|
|
status: str # running / completed / failed
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2.2 Supervisor 输出结构
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# Supervisor 的结构化输出
|
|||
|
|
class SupervisorDecision(BaseModel):
|
|||
|
|
"""Supervisor 的决策"""
|
|||
|
|
analysis: str # 任务分析
|
|||
|
|
task_plan: list[TaskItem] # 任务计划
|
|||
|
|
need_aggregation: bool # 是否需要汇总
|
|||
|
|
next_worker: str # 下一个执行的 Worker
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 三、核心实现
|
|||
|
|
|
|||
|
|
### 3.1 Supervisor Agent
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/supervisor.py
|
|||
|
|
from langchain_core.language_models import BaseChatModel
|
|||
|
|
from langchain_core.output_parsers import PydanticOutputParser
|
|||
|
|
from pydantic import BaseModel
|
|||
|
|
from typing import Type
|
|||
|
|
|
|||
|
|
from .types import AgentState, TaskItem, SupervisorDecision
|
|||
|
|
from .prompts import SUPERVISOR_SYSTEM_PROMPT
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SupervisorAgent:
|
|||
|
|
"""Supervisor Agent - 负责任务规划和分发"""
|
|||
|
|
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
llm: BaseChatModel,
|
|||
|
|
max_iterations: int = 3
|
|||
|
|
):
|
|||
|
|
self.llm = llm
|
|||
|
|
self.max_iterations = max_iterations
|
|||
|
|
self.output_parser = PydanticOutputParser(pydantic_object=SupervisorDecision)
|
|||
|
|
|
|||
|
|
def create_node(self):
|
|||
|
|
"""创建 Supervisor 节点"""
|
|||
|
|
return self._supervisor_node
|
|||
|
|
|
|||
|
|
async def _supervisor_node(self, state: AgentState) -> dict:
|
|||
|
|
"""Supervisor 节点逻辑"""
|
|||
|
|
# 首次调用:分析任务并生成计划
|
|||
|
|
if not state.get("task_plan"):
|
|||
|
|
decision = await self._plan_tasks(state["original_task"])
|
|||
|
|
return {
|
|||
|
|
"task_plan": decision.task_plan,
|
|||
|
|
"next_node": decision.next_worker,
|
|||
|
|
"current_task_index": 0,
|
|||
|
|
"shared_context": {"task_analysis": decision.analysis}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 检查是否需要继续
|
|||
|
|
current_task = state["task_plan"][state["current_task_index"]]
|
|||
|
|
|
|||
|
|
if current_task["status"] == "completed":
|
|||
|
|
# 当前任务完成,检查是否还有更多任务
|
|||
|
|
if state["current_task_index"] + 1 < len(state["task_plan"]):
|
|||
|
|
next_index = state["current_task_index"] + 1
|
|||
|
|
next_task = state["task_plan"][next_index]
|
|||
|
|
return {
|
|||
|
|
"current_task_index": next_index,
|
|||
|
|
"next_node": next_task["assigned_agent"]
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
# 所有任务完成,进入汇总
|
|||
|
|
return {"next_node": "aggregate"}
|
|||
|
|
|
|||
|
|
elif current_task["status"] == "failed":
|
|||
|
|
# 任务失败,检查是否超过最大重试
|
|||
|
|
if current_task["retry_count"] >= self.max_iterations:
|
|||
|
|
return {"next_node": "aggregate", "status": "failed"}
|
|||
|
|
else:
|
|||
|
|
# 重试
|
|||
|
|
return {"next_node": current_task["assigned_agent"]}
|
|||
|
|
|
|||
|
|
return {"next_node": state.get("next_node", "aggregate")}
|
|||
|
|
|
|||
|
|
async def _plan_tasks(self, task: str) -> SupervisorDecision:
|
|||
|
|
"""调用 LLM 生成任务计划"""
|
|||
|
|
prompt = SUPERVISOR_SYSTEM_PROMPT.format(task=task)
|
|||
|
|
|
|||
|
|
response = await self.llm.ainvoke([
|
|||
|
|
{"role": "system", "content": prompt},
|
|||
|
|
{"role": "user", "content": "请分析任务并制定执行计划。"}
|
|||
|
|
])
|
|||
|
|
|
|||
|
|
# 解析 LLM 输出为结构化决策
|
|||
|
|
# ... (实现解析逻辑)
|
|||
|
|
return decision
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3.2 Worker 基类
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/workers/base.py
|
|||
|
|
from abc import ABC, abstractmethod
|
|||
|
|
from typing import Any
|
|||
|
|
from langchain_core.language_models import BaseChatModel
|
|||
|
|
|
|||
|
|
from ..types import AgentState
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BaseWorker(ABC):
|
|||
|
|
"""Worker Agent 基类"""
|
|||
|
|
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
llm: BaseChatModel,
|
|||
|
|
name: str,
|
|||
|
|
system_prompt: str,
|
|||
|
|
tools: list = None
|
|||
|
|
):
|
|||
|
|
self.llm = llm
|
|||
|
|
self.name = name
|
|||
|
|
self.system_prompt = system_prompt
|
|||
|
|
self.tools = tools or []
|
|||
|
|
|
|||
|
|
@abstractmethod
|
|||
|
|
async def execute(self, task: TaskItem, context: dict) -> dict:
|
|||
|
|
"""执行任务"""
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def create_node(self):
|
|||
|
|
"""创建 LangGraph 节点"""
|
|||
|
|
async def node(state: AgentState) -> dict:
|
|||
|
|
task = state["task_plan"][state["current_task_index"]]
|
|||
|
|
result = await self.execute(task, state.get("shared_context", {}))
|
|||
|
|
|
|||
|
|
# 更新状态
|
|||
|
|
return {
|
|||
|
|
"results": {task.id: result},
|
|||
|
|
"task_plan": self._update_task_status(
|
|||
|
|
state["task_plan"],
|
|||
|
|
task.id,
|
|||
|
|
"completed" if result.get("success") else "failed"
|
|||
|
|
),
|
|||
|
|
"shared_context": {**state.get("shared_context", {}), **result.get("context", {})}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return node
|
|||
|
|
|
|||
|
|
def _update_task_status(self, tasks: list, task_id: str, status: str) -> list:
|
|||
|
|
"""更新任务状态"""
|
|||
|
|
return [
|
|||
|
|
{**task, "status": status} if task["id"] == task_id else task
|
|||
|
|
for task in tasks
|
|||
|
|
]
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3.3 任务队列(可选:支持并行执行)
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/queue.py
|
|||
|
|
import asyncio
|
|||
|
|
from typing import Any, Callable
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from enum import Enum
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TaskStatus(Enum):
|
|||
|
|
PENDING = "pending"
|
|||
|
|
RUNNING = "running"
|
|||
|
|
COMPLETED = "completed"
|
|||
|
|
FAILED = "failed"
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class QueuedTask:
|
|||
|
|
id: str
|
|||
|
|
agent_name: str
|
|||
|
|
task_data: Any
|
|||
|
|
status: TaskStatus = TaskStatus.PENDING
|
|||
|
|
result: Any = None
|
|||
|
|
error: str = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TaskQueue:
|
|||
|
|
"""任务队列 - 支持并行执行多个 Worker"""
|
|||
|
|
|
|||
|
|
def __init__(self, max_concurrent: int = 3):
|
|||
|
|
self.max_concurrent = max_concurrent
|
|||
|
|
self.queue: asyncio.Queue = asyncio.Queue()
|
|||
|
|
self.results: dict = {}
|
|||
|
|
self._running = 0
|
|||
|
|
|
|||
|
|
async def add_task(self, task: QueuedTask):
|
|||
|
|
"""添加任务到队列"""
|
|||
|
|
await self.queue.put(task)
|
|||
|
|
|
|||
|
|
async def execute_all(self, worker_factory: Callable):
|
|||
|
|
"""执行所有任务"""
|
|||
|
|
async def worker():
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
task = self.queue.get_nowait()
|
|||
|
|
except asyncio.QueueEmpty:
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
self._running += 1
|
|||
|
|
task.status = TaskStatus.Running
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
worker_instance = worker_factory(task.agent_name)
|
|||
|
|
task.result = await worker_instance.execute(task.task_data)
|
|||
|
|
task.status = TaskStatus.COMPLETED
|
|||
|
|
except Exception as e:
|
|||
|
|
task.status = TaskStatus.FAILED
|
|||
|
|
task.error = str(e)
|
|||
|
|
finally:
|
|||
|
|
self._running -= 1
|
|||
|
|
self.results[task.id] = task
|
|||
|
|
|
|||
|
|
# 启动多个 worker 协程
|
|||
|
|
workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
|
|||
|
|
await asyncio.gather(*workers)
|
|||
|
|
|
|||
|
|
return self.results
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3.4 LangGraph 流程编排
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/graph.py
|
|||
|
|
from langgraph.graph import StateGraph, END
|
|||
|
|
from langgraph.prebuilt import ToolNode
|
|||
|
|
|
|||
|
|
from .types import AgentState
|
|||
|
|
from .supervisor import SupervisorAgent
|
|||
|
|
from .workers.research import ResearchWorker
|
|||
|
|
from .workers.coder import CoderWorker
|
|||
|
|
from .workers.review import ReviewWorker
|
|||
|
|
from .aggregator import ResultAggregator
|
|||
|
|
|
|||
|
|
|
|||
|
|
def create_multi_agent_graph(
|
|||
|
|
llm,
|
|||
|
|
tool_registry,
|
|||
|
|
max_iterations: int = 3
|
|||
|
|
) -> StateGraph:
|
|||
|
|
"""创建多 Agent 流程图"""
|
|||
|
|
|
|||
|
|
# 初始化组件
|
|||
|
|
supervisor = SupervisorAgent(llm, max_iterations)
|
|||
|
|
research_worker = ResearchWorker(llm, tool_registry)
|
|||
|
|
coder_worker = CoderWorker(llm, tool_registry)
|
|||
|
|
review_worker = ReviewWorker(llm, tool_registry)
|
|||
|
|
aggregator = ResultAggregator(llm)
|
|||
|
|
|
|||
|
|
# 创建图
|
|||
|
|
graph = StateGraph(AgentState)
|
|||
|
|
|
|||
|
|
# 添加节点
|
|||
|
|
graph.add_node("supervisor", supervisor.create_node())
|
|||
|
|
graph.add_node("research", research_worker.create_node())
|
|||
|
|
graph.add_node("coder", coder_worker.create_node())
|
|||
|
|
graph.add_node("review", review_worker.create_node())
|
|||
|
|
graph.add_node("aggregate", aggregator.create_node())
|
|||
|
|
|
|||
|
|
# 设置入口
|
|||
|
|
graph.set_entry_point("supervisor")
|
|||
|
|
|
|||
|
|
# 添加边
|
|||
|
|
graph.add_edge("supervisor", "research")
|
|||
|
|
graph.add_edge("research", "review")
|
|||
|
|
graph.add_edge("coder", "review")
|
|||
|
|
|
|||
|
|
# 条件边:从 review 回到 supervisor
|
|||
|
|
def should_continue(state: AgentState) -> str:
|
|||
|
|
if state.get("status") == "failed":
|
|||
|
|
return "aggregate"
|
|||
|
|
if state.get("iteration", 0) >= max_iterations:
|
|||
|
|
return "aggregate"
|
|||
|
|
if state.get("current_task_index", 0) >= len(state.get("task_plan", [])):
|
|||
|
|
return "aggregate"
|
|||
|
|
return "supervisor"
|
|||
|
|
|
|||
|
|
graph.add_conditional_edges(
|
|||
|
|
"review",
|
|||
|
|
should_continue,
|
|||
|
|
{
|
|||
|
|
"supervisor": "supervisor",
|
|||
|
|
"aggregate": "aggregate"
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 结束节点
|
|||
|
|
graph.add_edge("aggregate", END)
|
|||
|
|
|
|||
|
|
return graph.compile()
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 四、Prompt 设计
|
|||
|
|
|
|||
|
|
### 4.1 Supervisor System Prompt
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/prompts.py
|
|||
|
|
|
|||
|
|
SUPERVISOR_SYSTEM_PROMPT = """你是一个任务规划专家(Supervisor)。你的职责是将复杂任务分解为可执行的子任务,并分配给合适的执行 Agent。
|
|||
|
|
|
|||
|
|
## 可用的 Worker Agent
|
|||
|
|
- **research**: 信息搜索和调研
|
|||
|
|
- **coder**: 代码编写、修改和调试
|
|||
|
|
- **review**: 结果检查、质量评审
|
|||
|
|
|
|||
|
|
## 任务
|
|||
|
|
{task}
|
|||
|
|
|
|||
|
|
## 请按以下步骤执行
|
|||
|
|
|
|||
|
|
### 步骤 1: 任务分析
|
|||
|
|
分析任务的性质,确定需要哪些步骤来完成。
|
|||
|
|
|
|||
|
|
### 步骤 2: 任务分解
|
|||
|
|
将任务分解为独立的子任务。每个子任务应该:
|
|||
|
|
- 描述清晰
|
|||
|
|
- 可以由单个 Agent 完成
|
|||
|
|
- 有明确的完成标准
|
|||
|
|
|
|||
|
|
### 步骤 3: 分配 Agent
|
|||
|
|
为每个子任务选择最合适的执行 Agent。
|
|||
|
|
|
|||
|
|
### 步骤 4: 确定执行顺序
|
|||
|
|
如果有依赖关系,确定正确的执行顺序。
|
|||
|
|
|
|||
|
|
## 输出格式
|
|||
|
|
请以 JSON 格式输出你的决策:
|
|||
|
|
```json
|
|||
|
|
{{
|
|||
|
|
"analysis": "任务分析...",
|
|||
|
|
"task_plan": [
|
|||
|
|
{{
|
|||
|
|
"id": "task_1",
|
|||
|
|
"description": "子任务描述",
|
|||
|
|
"assigned_agent": "research"
|
|||
|
|
}},
|
|||
|
|
{{
|
|||
|
|
"id": "task_2",
|
|||
|
|
"description": "子任务描述",
|
|||
|
|
"assigned_agent": "coder"
|
|||
|
|
}}
|
|||
|
|
],
|
|||
|
|
"need_aggregation": true,
|
|||
|
|
"next_worker": "research"
|
|||
|
|
}}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 注意
|
|||
|
|
- 如果任务很简单,可以只分配给一个 Agent
|
|||
|
|
- 如果任务需要迭代优化,确保有 review 环节
|
|||
|
|
- 考虑任务之间的依赖关系
|
|||
|
|
"""
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 4.2 Review Worker Prompt
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
REVIEW_SYSTEM_PROMPT = """你是一个代码和结果评审专家(Reviewer)。你的职责是检查任务执行结果是否符合要求。
|
|||
|
|
|
|||
|
|
## 任务描述
|
|||
|
|
{task_description}
|
|||
|
|
|
|||
|
|
## 执行结果
|
|||
|
|
{execution_result}
|
|||
|
|
|
|||
|
|
## 检查标准
|
|||
|
|
1. 结果是否完整解决了原始任务?
|
|||
|
|
2. 输出格式是否正确?
|
|||
|
|
3. 是否存在明显的错误或遗漏?
|
|||
|
|
4. 代码是否有潜在问题?
|
|||
|
|
|
|||
|
|
## 请输出评审结果
|
|||
|
|
```json
|
|||
|
|
{{
|
|||
|
|
"passed": true/false,
|
|||
|
|
"issues": [
|
|||
|
|
{{"severity": "high/medium/low", "description": "问题描述"}}
|
|||
|
|
],
|
|||
|
|
"suggestions": ["改进建议"]
|
|||
|
|
}}
|
|||
|
|
```
|
|||
|
|
"""
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 五、迭代控制
|
|||
|
|
|
|||
|
|
### 5.1 迭代逻辑
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/iteration.py
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
|
|||
|
|
class IterationController:
|
|||
|
|
"""迭代控制器"""
|
|||
|
|
|
|||
|
|
def __init__(self, max_iterations: int = 3):
|
|||
|
|
self.max_iterations = max_iterations
|
|||
|
|
|
|||
|
|
def should_continue(
|
|||
|
|
self,
|
|||
|
|
iteration: int,
|
|||
|
|
task_status: str,
|
|||
|
|
review_result: dict
|
|||
|
|
) -> tuple[bool, str]:
|
|||
|
|
"""
|
|||
|
|
判断是否继续迭代
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
(是否继续, 原因)
|
|||
|
|
"""
|
|||
|
|
# 超过最大迭代次数
|
|||
|
|
if iteration >= self.max_iterations:
|
|||
|
|
return False, "max_iterations_reached"
|
|||
|
|
|
|||
|
|
# 任务成功完成
|
|||
|
|
if task_status == "completed" and review_result.get("passed"):
|
|||
|
|
return False, "task_completed"
|
|||
|
|
|
|||
|
|
# 任务失败且不可重试
|
|||
|
|
if task_status == "failed" and not review_result.get("retryable"):
|
|||
|
|
return False, "task_failed"
|
|||
|
|
|
|||
|
|
# 需要重试
|
|||
|
|
if review_result.get("issues") and review_result.get("passed") is False:
|
|||
|
|
return True, "needs_retry"
|
|||
|
|
|
|||
|
|
return True, "continue"
|
|||
|
|
|
|||
|
|
def get_next_action(
|
|||
|
|
self,
|
|||
|
|
review_result: dict,
|
|||
|
|
current_worker: str
|
|||
|
|
) -> str:
|
|||
|
|
"""确定下一步动作"""
|
|||
|
|
if review_result.get("passed"):
|
|||
|
|
return "supervisor" # 返回 Supervisor
|
|||
|
|
|
|||
|
|
# 根据问题类型决定下一步
|
|||
|
|
issues = review_result.get("issues", [])
|
|||
|
|
high_severity = any(i.get("severity") == "high" for i in issues)
|
|||
|
|
|
|||
|
|
if high_severity:
|
|||
|
|
# 严重问题,重新执行相同任务
|
|||
|
|
return current_worker
|
|||
|
|
else:
|
|||
|
|
# 轻微问题,可以继续
|
|||
|
|
return "supervisor"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 六、与现有系统集成
|
|||
|
|
|
|||
|
|
### 6.1 复用现有组件
|
|||
|
|
|
|||
|
|
```python
|
|||
|
|
# agent/multi/integration.py
|
|||
|
|
from app.agent.core.agent import AgentManager
|
|||
|
|
from app.agent.tools.registry import ToolRegistry
|
|||
|
|
from app.agent.memory.session import SessionManager
|
|||
|
|
from app.llm.factory import LLMFactory
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MultiAgentSystem:
|
|||
|
|
"""多智能体系统 - 集成现有组件"""
|
|||
|
|
|
|||
|
|
def __init__(self, config: dict):
|
|||
|
|
# 复用现有 LLM Factory
|
|||
|
|
self.llm_factory = LLMFactory(
|
|||
|
|
provider=config.get("llm_provider", "openai"),
|
|||
|
|
openai_api_key=config.get("openai_api_key"),
|
|||
|
|
anthropic_api_key=config.get("anthropic_api_key")
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 复用现有 Tool Registry
|
|||
|
|
self.tool_registry = ToolRegistry()
|
|||
|
|
self._register_default_tools()
|
|||
|
|
|
|||
|
|
# 复用现有 Session Manager
|
|||
|
|
self.session_manager = SessionManager()
|
|||
|
|
|
|||
|
|
# 配置
|
|||
|
|
self.max_iterations = config.get("max_iterations", 3)
|
|||
|
|
|
|||
|
|
def _register_default_tools(self):
|
|||
|
|
"""注册默认工具"""
|
|||
|
|
# 从现有 Agent 复制工具注册逻辑
|
|||
|
|
from app.agent.tools.impl import search, calculator
|
|||
|
|
self.tool_registry.register(
|
|||
|
|
name="search",
|
|||
|
|
func=search.search_web,
|
|||
|
|
description="Search the web",
|
|||
|
|
security_level="safe"
|
|||
|
|
)
|
|||
|
|
# ... 其他工具
|
|||
|
|
|
|||
|
|
async def execute(self, task: str, session_id: str = None) -> dict:
|
|||
|
|
"""执行多 Agent 任务"""
|
|||
|
|
# 创建 LangGraph
|
|||
|
|
from .graph import create_multi_agent_graph
|
|||
|
|
|
|||
|
|
llm = self.llm_factory.get_llm()
|
|||
|
|
graph = create_multi_agent_graph(
|
|||
|
|
llm=llm,
|
|||
|
|
tool_registry=self.tool_registry,
|
|||
|
|
max_iterations=self.max_iterations
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 初始化状态
|
|||
|
|
initial_state = {
|
|||
|
|
"original_task": task,
|
|||
|
|
"task_plan": [],
|
|||
|
|
"current_task_index": 0,
|
|||
|
|
"results": {},
|
|||
|
|
"iteration": 0,
|
|||
|
|
"next_node": "supervisor",
|
|||
|
|
"shared_context": {},
|
|||
|
|
"final_output": "",
|
|||
|
|
"status": "running"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 执行
|
|||
|
|
result = await graph.ainvoke(initial_state)
|
|||
|
|
|
|||
|
|
# 保存到 session
|
|||
|
|
if session_id:
|
|||
|
|
self.session_manager.add_message(session_id, "user", task)
|
|||
|
|
self.session_manager.add_message(session_id, "assistant", result["final_output"])
|
|||
|
|
|
|||
|
|
return result
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 七、文件结构
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
agent/
|
|||
|
|
├── __init__.py
|
|||
|
|
├── multi/
|
|||
|
|
│ ├── __init__.py
|
|||
|
|
│ ├── types.py # 数据类型定义
|
|||
|
|
│ ├── prompts.py # Prompt 模板
|
|||
|
|
│ ├── supervisor.py # Supervisor Agent
|
|||
|
|
│ ├── graph.py # LangGraph 流程图
|
|||
|
|
│ ├── iteration.py # 迭代控制
|
|||
|
|
│ ├── integration.py # 与现有系统集成
|
|||
|
|
│ ├── queue.py # 任务队列(可选)
|
|||
|
|
│ └── workers/
|
|||
|
|
│ ├── __init__.py
|
|||
|
|
│ ├── base.py # Worker 基类
|
|||
|
|
│ ├── research.py # Research Worker
|
|||
|
|
│ ├── coder.py # Coder Worker
|
|||
|
|
│ └── review.py # Review Worker
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 八、实现顺序
|
|||
|
|
|
|||
|
|
1. **Phase 1: 基础架构**
|
|||
|
|
- 定义数据类型 (types.py)
|
|||
|
|
- 创建 Prompt 模板 (prompts.py)
|
|||
|
|
|
|||
|
|
2. **Phase 2: Supervisor**
|
|||
|
|
- 实现 SupervisorAgent
|
|||
|
|
- 实现任务规划和分发逻辑
|
|||
|
|
|
|||
|
|
3. **Phase 3: Workers**
|
|||
|
|
- 实现 BaseWorker
|
|||
|
|
- 实现 ResearchWorker
|
|||
|
|
- 实现 CoderWorker
|
|||
|
|
- 实现 ReviewWorker
|
|||
|
|
|
|||
|
|
4. **Phase 4: 流程编排**
|
|||
|
|
- 实现 LangGraph 流程图
|
|||
|
|
- 添加条件边和迭代控制
|
|||
|
|
|
|||
|
|
5. **Phase 5: 集成**
|
|||
|
|
- 与现有 Agent 系统集成
|
|||
|
|
- 添加 API 接口
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 九、测试计划
|
|||
|
|
|
|||
|
|
1. **单元测试**: 测试各 Worker 的执行逻辑
|
|||
|
|
2. **集成测试**: 测试完整的 Supervisor + Workers 流程
|
|||
|
|
3. **迭代测试**: 测试重试和迭代逻辑
|
|||
|
|
4. **端到端测试**: 模拟真实任务执行
|