feat: 新增 teams 目录

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 14:26:47 +08:00
parent 85b4c51fd7
commit 8249f67351
33 changed files with 4841 additions and 0 deletions

View File

@@ -0,0 +1,874 @@
# 多智能体群聊系统实现计划
## 项目概述
实现类似"一人公司"的多智能体群聊系统,支持多个 Agent 在群聊中讨论问题,类似于人类的团队会议。
### 核心特性
- **三阶段流程**: 观点提出 → 讨论完善 → CEO 总结决策
- **智能轮数**: AI 判断讨论是否充分,自动决定轮数
- **用户插话**: 用户可以随时打断发表意见
- **角色扮演**: 可配置不同角色的 AgentCEO、CTO、Designer 等)
- **复用架构**: 复用现有的 LLM、ToolRegistry、SessionManager
---
## 一、系统架构
### 1.1 整体架构图
```
┌─────────────────────────────────────────────────────────────────┐
│ Agent Group Chat System │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ GroupChatManager ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Presenter │ │ Discusser │ │ Summarizer │ ││
│ │ │ Controller │ │ Controller │ │ Controller │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ │ │ │ │ ││
│ │ └────────────────┴────────────────┘ ││
│ │ │ ││
│ │ SmartRoundController ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CEO Agent │ │ CTO Agent │ │Designer Agent│ │
│ │ Participant │ │ Participant │ │ Participant │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └──────────────────┴──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │GroupContext │ │
│ │(共享上下文) │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 1.2 核心组件
| 组件 | 职责 | 文件位置 |
|------|------|----------|
| **GroupChatManager** | 群聊管理器,控制整体流程 | `group_chat/manager.py` |
| **Participant** | 参与群聊的 Agent 实例 | `group_chat/participant.py` |
| **StageController** | 阶段控制器,管理各阶段 | `group_chat/stages/controller.py` |
| **PresenterStage** | 观点提出阶段 | `group_chat/stages/presenter.py` |
| **DiscusserStage** | 讨论完善阶段 | `group_chat/stages/discusser.py` |
| **SummarizerStage** | 总结决策阶段 | `group_chat/stages/summarizer.py` |
| **SmartRoundController** | 智能轮数控制 | `group_chat/round_controller.py` |
| **GroupContext** | 共享讨论上下文 | `group_chat/context.py` |
| **GroupMessage** | 群聊消息 | `group_chat/message.py` |
---
## 二、数据结构设计
### 2.1 消息定义
```python
# group_chat/message.py
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
class MessageStage(str, Enum):
"""消息所属阶段"""
PRESENT = "present" # 观点提出
DISCUSS = "discuss" # 讨论完善
SUMMARIZE = "summarize" # 总结决策
class GroupMessage(BaseModel):
"""群聊消息"""
id: str = Field(..., description="消息唯一标识")
agent_id: str = Field(..., description="Agent ID")
agent_name: str = Field(..., description="Agent 名称")
agent_role: str = Field(..., description="Agent 角色")
content: str = Field(..., description="消息内容")
timestamp: datetime = Field(default_factory=datetime.now)
stage: MessageStage = Field(..., description="所属阶段")
round: int = Field(default=1, description="轮数")
replying_to: Optional[str] = Field(default=None, description="回复的消息ID")
class UserInterruption(BaseModel):
"""用户插话"""
id: str
content: str
timestamp: datetime = Field(default_factory=datetime.now)
stage: MessageStage
round: int
```
### 2.2 共享上下文
```python
# group_chat/context.py
from typing import Optional
from pydantic import BaseModel, Field
from .message import GroupMessage, UserInterruption
class GroupContext(BaseModel):
"""群聊共享上下文"""
topic: str = Field(..., description="讨论主题")
stage: str = Field(default="present", description="当前阶段")
round: int = Field(default=1, description="当前轮数")
messages: list[GroupMessage] = Field(default_factory=list)
user_interruptions: list[UserInterruption] = Field(default_factory=list)
final_decision: Optional[str] = Field(default=None, description="最终决策")
status: str = Field(default="running", description="running/completed/failed")
class GroupState(BaseModel):
"""群聊状态"""
context: GroupContext
participants: dict[str, dict] # {agent_id: config}
config: dict # 群聊配置
```
---
## 三、核心实现
### 3.1 Agent 角色定义
```python
# group_chat/roles.py
from typing import Optional
# 默认角色配置
DEFAULT_ROLES = {
"ceo": {
"id": "ceo",
"name": "CEO",
"role": "首席执行官",
"system_prompt": """你是一家公司的 CEO负责制定公司战略方向和重大决策。
你的特点是:高瞻远瞩、战略思维、注重长远利益。
在讨论中,你应该:
- 从公司整体战略角度分析问题
- 权衡短期和长期利益
- 做最终决策并承担责任
- 善于总结和归纳各方观点""",
"stages": ["present", "summarize"],
"is_final_decider": True,
"priority": 1
},
"cto": {
"id": "cto",
"name": "CTO",
"role": "首席技术官",
"system_prompt": """你是一家公司的 CTO负责技术决策和技术团队管理。
你的特点是:技术深厚、注重可行性、关注技术趋势。
在讨论中,你应该:
- 从技术角度分析方案的可行性和风险
- 提供技术实现建议
- 评估技术选型
- 关注系统的可扩展性和稳定性""",
"stages": ["present", "discuss"],
"is_final_decider": False,
"priority": 2
},
"designer": {
"id": "designer",
"name": "Designer",
"role": "产品设计师",
"system_prompt": """你是公司的产品设计师,负责用户体验和界面设计。
你的特点是:创意丰富、注重用户体验、审美在线。
在讨论中,你应该:
- 从用户角度分析问题
- 提出用户体验优化建议
- 关注产品细节
- 平衡美观和实用性""",
"stages": ["present", "discuss"],
"is_final_decider": False,
"priority": 3
},
"analyst": {
"id": "analyst",
"name": "Analyst",
"role": "数据分析师",
"system_prompt": """你是公司的数据分析师,负责数据分析和决策支持。
你的特点是:数据敏感、逻辑严谨、注重证据。
在讨论中,你应该:
- 用数据支撑观点
- 提供数据分析结果
- 指出数据驱动的机会和风险
- 关注关键指标""",
"stages": ["present", "discuss"],
"is_final_decider": False,
"priority": 4
}
}
```
### 3.2 Participant参与者 Agent
```python
# group_chat/participant.py
import uuid
from datetime import datetime
from typing import Optional
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage
from .message import GroupMessage, MessageStage
from .context import GroupContext
class Participant:
"""参与群聊的 Agent"""
def __init__(
self,
llm: BaseChatModel,
config: dict,
stage: str = "present"
):
self.llm = llm
self.config = config
self.id = config["id"]
self.name = config["name"]
self.role = config["role"]
self.system_prompt = config["system_prompt"]
self.stages = config.get("stages", ["present", "discuss", "summarize"])
self.is_final_decider = config.get("is_final_decider", False)
async def generate_message(
self,
context: GroupContext,
stage: MessageStage,
round_num: int,
replying_to: Optional[str] = None
) -> GroupMessage:
"""生成消息"""
# 构建 prompt
prompt = self._build_prompt(context, stage, round_num)
# 调用 LLM
response = await self.llm.ainvoke([
SystemMessage(content=prompt),
HumanMessage(content=f"请就「{context.topic}」发表你的观点。")
])
content = response.content if hasattr(response, 'content') else str(response)
# 创建消息
message = GroupMessage(
id=str(uuid.uuid4()),
agent_id=self.id,
agent_name=self.name,
agent_role=self.role,
content=content,
timestamp=datetime.now(),
stage=stage,
round=round_num,
replying_to=replying_to
)
return message
def _build_prompt(self, context: GroupContext, stage: MessageStage, round_num: int) -> str:
"""构建 prompt"""
base_prompt = self.system_prompt
# 添加上下文
context_info = f"""
## 当前讨论
主题:{context.topic}
阶段:{stage.value}
轮数:{round_num}
## 历史消息
"""
# 添加历史消息(限制数量)
recent_messages = context.messages[-10:] if context.messages else []
for msg in recent_messages:
context_info += f"\n{msg.agent_name}{msg.content}\n"
# 添加用户插话
if context.user_interruptions:
context_info += "\n## 用户插话\n"
for interruption in context.user_interruptions[-3:]:
context_info += f"\n【用户】{interruption.content}\n"
# 阶段特定的指令
stage_instruction = {
"present": "请简洁地提出你的观点和建议。",
"discuss": "请回应其他人的观点,进行讨论和完善。",
"summarize": "请总结各方观点,给出最终决策建议。"
}
full_prompt = f"{base_prompt}\n\n{context_info}\n\n{stage_instruction.get(stage.value, '')}"
return full_prompt
def can_participate(self, stage: MessageStage) -> bool:
"""判断是否可以参与当前阶段"""
return stage.value in self.stages
```
### 3.3 智能轮数控制器
```python
# group_chat/round_controller.py
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import SystemMessage, HumanMessage
from .context import GroupContext
from .message import MessageStage
class SmartRoundController:
"""智能轮数控制器"""
def __init__(self, llm: BaseChatModel, max_rounds: int = 3):
self.llm = llm
self.max_rounds = max_rounds
async def should_continue(
self,
context: GroupContext,
stage: MessageStage
) -> tuple[bool, str]:
"""
判断是否继续下一轮
Returns:
(是否继续, 原因)
"""
# 达到最大轮数
if context.round >= self.max_rounds:
return False, "max_rounds_reached"
# 构建判断 prompt
prompt = self._build_judge_prompt(context, stage)
# 调用 LLM 判断
response = await self.llm.ainvoke([
SystemMessage(content=prompt),
HumanMessage(content="请判断讨论是否已经充分,是否需要更多轮数。")
])
content = response.content.lower() if hasattr(response, 'content') else str(response)
# 解析判断结果
if "充分" in content or "足够" in content or "不需要" in content:
return False, "discussion_sufficient"
elif "不充分" in content or "不够" in content or "需要" in content:
return True, "need_more_discussion"
# 默认继续
return True, "default_continue"
def _build_judge_prompt(self, context: GroupContext, stage: MessageStage) -> str:
"""构建判断 prompt"""
messages_summary = "\n".join([
f"{msg.agent_name}{msg.content[:200]}..."
for msg in context.messages[-5:]
])
return f"""你是一个讨论质量评估专家。请判断当前的讨论是否已经充分。
## 讨论信息
主题:{context.topic}
当前阶段:{stage.value}
当前轮数:{context.round}
最大轮数:{self.max_rounds}
## 最近的讨论内容
{messages_summary}
## 判断标准
- 各方观点是否已经充分表达?
- 是否有建设性的讨论和回应?
- 是否已经形成明确的结论或方向?
## 请输出
如果讨论已经充分,请输出"充分",并简要说明原因。
如果还需要更多讨论,请输出"不充分",并说明需要讨论哪些方面。
"""
```
### 3.4 群聊管理器
```python
# group_chat/manager.py
import uuid
from typing import Optional
from datetime import datetime
from langchain_core.language_models import BaseChatModel
from .context import GroupContext
from .message import GroupMessage, MessageStage, UserInterruption
from .participant import Participant
from .roles import DEFAULT_ROLES
from .round_controller import SmartRoundController
class GroupChatManager:
"""群聊管理器"""
def __init__(
self,
llm: BaseChatModel,
roles: dict = None,
max_rounds: int = 3,
enable_user_interrupt: bool = True
):
self.llm = llm
self.roles = roles or DEFAULT_ROLES
self.max_rounds = max_rounds
self.enable_user_interrupt = enable_user_interrupt
# 初始化组件
self.round_controller = SmartRoundController(llm, max_rounds)
# 运行时状态
self.context: Optional[GroupContext] = None
self.participants: dict[str, Participant] = {}
async def start_chat(self, topic: str) -> dict:
"""开始群聊"""
# 创建上下文
self.context = GroupContext(
topic=topic,
stage="present",
round=1,
messages=[],
user_interruptions=[],
status="running"
)
# 创建参与者
self.participants = {
role_id: Participant(self.llm, config)
for role_id, config in self.roles.items()
}
# 按优先级排序参与者
sorted_participants = sorted(
self.participants.values(),
key=lambda p: p.config.get("priority", 999)
)
# 开始第一阶段
result = await self._run_stage(MessageStage.PRESENT, sorted_participants)
return result
async def _run_stage(
self,
stage: MessageStage,
participants: list[Participant]
) -> dict:
"""运行指定阶段"""
# 更新上下文
self.context.stage = stage.value
# 按阶段获取参与者
stage_participants = [p for p in participants if p.can_participate(stage)]
# 执行多轮
for round_num in range(1, self.max_rounds + 1):
self.context.round = round_num
# 当前阶段参与者发言
messages = await self._run_round(stage, stage_participants, round_num)
# 智能判断是否继续
should_continue, reason = await self.round_controller.should_continue(
self.context, stage
)
if not should_continue:
break
# 阶段完成后的处理
if stage == MessageStage.PRESENT:
# 进入讨论阶段
if any(p.can_participate(MessageStage.DISCUSS) for p in participants):
return await self._run_stage(MessageStage.DISCUSS, participants)
elif stage == MessageStage.DISCUSS:
# 进入总结阶段
summarizer = next(
(p for p in participants if p.is_final_decider),
participants[0]
)
return await self._run_stage(MessageStage.SUMMARIZE, [summarizer])
elif stage == MessageStage.SUMMARIZE:
# 完成
self.context.status = "completed"
return self._build_result()
async def _run_round(
self,
stage: MessageStage,
participants: list[Participant],
round_num: int
) -> list[GroupMessage]:
"""运行一轮发言"""
messages = []
for participant in participants:
# 获取前一轮的最新消息(用于回复)
replying_to = None
if self.context.messages:
last_message = self.context.messages[-1]
if last_message.agent_id != participant.id:
replying_to = last_message.id
# 生成消息
message = await participant.generate_message(
context=self.context,
stage=stage,
round_num=round_num,
replying_to=replying_to
)
# 保存消息
self.context.messages.append(message)
messages.append(message)
return messages
async def add_interruption(self, content: str) -> dict:
"""添加用户插话"""
if not self.enable_user_interrupt:
return {"error": "用户插话已禁用"}
interruption = UserInterruption(
id=str(uuid.uuid4()),
content=content,
timestamp=datetime.now(),
stage=MessageStage(self.context.stage),
round=self.context.round
)
self.context.user_interruptions.append(interruption)
return {"success": True, "interruption_id": interruption.id}
def get_messages(self) -> list[GroupMessage]:
"""获取所有消息"""
return self.context.messages if self.context else []
def get_result(self) -> dict:
"""获取结果"""
return self._build_result()
def _build_result(self) -> dict:
"""构建结果"""
return {
"topic": self.context.topic,
"status": self.context.status,
"messages": [
{
"agent_name": m.agent_name,
"agent_role": m.agent_role,
"content": m.content,
"stage": m.stage.value,
"round": m.round
}
for m in self.context.messages
],
"final_decision": self.context.final_decision,
"user_interruptions": [
{"content": i.content, "timestamp": i.timestamp.isoformat()}
for i in self.context.user_interruptions
]
}
```
---
## 四、阶段实现
### 4.1 Presenter Stage观点提出
```python
# group_chat/stages/presenter.py
from typing import list
from ..participant import Participant
from ..message import GroupMessage
class PresenterStage:
"""观点提出阶段"""
async def execute(
self,
participants: list[Participant],
context
) -> list[GroupMessage]:
"""执行观点提出"""
messages = []
# 按优先级顺序发言
for participant in participants:
if not participant.can_participate("present"):
continue
message = await participant.generate_message(
context=context,
stage="present",
round=context.round
)
messages.append(message)
context.messages.append(message)
return messages
```
### 4.2 Discusser Stage讨论完善
```python
# group_chat/stages/discusser.py
from ..participant import Participant
from ..message import GroupMessage
class DiscusserStage:
"""讨论完善阶段"""
async def execute(
self,
participants: list[Participant],
context
) -> list[GroupMessage]:
"""执行讨论完善"""
messages = []
# 获取上一轮的消息
last_round_messages = [
m for m in context.messages
if m.stage == "present" and m.round == context.round - 1
]
for participant in participants:
if not participant.can_participate("discuss"):
continue
# 选择要回复的消息
replying_to = None
for msg in reversed(last_round_messages):
if msg.agent_id != participant.id:
replying_to = msg.id
break
message = await participant.generate_message(
context=context,
stage="discuss",
round=context.round,
replying_to=replying_to
)
messages.append(message)
context.messages.append(message)
return messages
```
### 4.3 Summarizer Stage总结决策
```python
# group_chat/stages/summarizer.py
from ..participant import Participant
from ..message import GroupMessage
class SummarizerStage:
"""总结决策阶段"""
async def execute(
self,
participants: list[Participant],
context
) -> GroupMessage:
"""执行总结决策"""
# CEO 总结
summarizer = next(
(p for p in participants if p.is_final_decider),
participants[0]
)
message = await summarizer.generate_message(
context=context,
stage="summarize",
round=context.round
)
# 保存最终决策
context.final_decision = message.content
context.messages.append(message)
return message
```
---
## 五、与现有系统集成
### 5.1 复用现有组件
```python
# group_chat/integration.py
from typing import Optional
from app.llm.factory import LLMFactory
from app.agent.tools.registry import ToolRegistry
from app.agent.memory.session import SessionManager
from .manager import GroupChatManager
class GroupChatSystem:
"""群聊系统 - 集成现有组件"""
def __init__(
self,
llm_provider: str = "openai",
openai_api_key: Optional[str] = None,
anthropic_api_key: Optional[str] = None,
roles: dict = None,
max_rounds: int = 3,
enable_user_interrupt: bool = True
):
# 初始化 LLM Factory
self.llm_factory = LLMFactory(
provider=llm_provider,
openai_api_key=openai_api_key,
anthropic_api_key=anthropic_api_key
)
# 初始化 Tool Registry
self.tool_registry = ToolRegistry()
# 初始化 Session Manager
self.session_manager = SessionManager()
# 配置
self.roles = roles
self.max_rounds = max_rounds
self.enable_user_interrupt = enable_user_interrupt
async def start_group_chat(
self,
topic: str,
session_id: str = None
) -> dict:
"""开始群聊"""
# 获取 LLM
llm = self.llm_factory.get_llm()
# 创建群聊管理器
manager = GroupChatManager(
llm=llm,
roles=self.roles,
max_rounds=self.max_rounds,
enable_user_interrupt=self.enable_user_interrupt
)
# 开始群聊
result = await manager.start_chat(topic)
# 保存到 session
if session_id:
self.session_manager.add_message(session_id, "user", topic)
self.session_manager.add_message(
session_id,
"assistant",
result.get("final_decision", str(result))
)
return result
async def add_message(
self,
message: str,
session_id: str
) -> dict:
"""添加用户消息(插话)"""
# 获取 session 中的 manager
# ... 实现获取逻辑
return manager.add_interruption(message)
```
---
## 六、文件结构
```
agent/app/agent/multi/
├── group_chat/
│ ├── __init__.py
│ ├── roles.py # Agent 角色定义
│ ├── message.py # 消息类型
│ ├── context.py # 共享上下文
│ ├── participant.py # 参与者 Agent
│ ├── manager.py # 群聊管理器
│ ├── round_controller.py # 智能轮数控制器
│ ├── stages/
│ │ ├── __init__.py
│ │ ├── controller.py # 阶段控制器
│ │ ├── presenter.py # 观点提出阶段
│ │ ├── discusser.py # 讨论完善阶段
│ │ └── summarizer.py # 总结决策阶段
│ └── integration.py # 与现有系统集成
```
---
## 七、实现顺序
1. **Phase 1: 基础架构**
- 定义数据类型 (message.py, context.py)
- 创建角色配置 (roles.py)
2. **Phase 2: 核心组件**
- 实现 Participant (participant.py)
- 实现 SmartRoundController (round_controller.py)
3. **Phase 3: 阶段实现**
- 实现 PresenterStage
- 实现 DiscusserStage
- 实现 SummarizerStage
4. **Phase 4: 集成**
- 实现 GroupChatManager
- 与现有系统集成
---
## 八、测试计划
1. **单元测试**: 测试各 Participant 的消息生成
2. **集成测试**: 测试完整的群聊流程
3. **轮数控制测试**: 测试智能轮数判断
4. **用户插话测试**: 测试插话机制
5. **端到端测试**: 模拟真实群聊场景

View File

@@ -0,0 +1,80 @@
# Notes: 多智能体群聊系统研究
## 核心概念
### 群聊系统 vs 之前的 Supervisor 系统
| 特性 | Supervisor 系统 | 群聊系统 |
|------|----------------|----------|
| 流程 | 线性:规划 → 执行 → 汇总 | 多阶段循环 |
| Agent 关系 | 层级Supervisor 管理 Workers | 平等协作 |
| 通信方式 | 单向:任务分发 | 多向:互相讨论 |
| 决策方式 | Supervisor 决定 | CEO 最终决策 |
| 用户参与 | 旁观 | 可插话 |
### 设计模式
#### 1. 流水线模式
```
Stage 1 (Presenter) → Stage 2 (Discusser) → Stage 3 (Summarizer)
```
#### 2. 消息传递
- 每个 Stage 维护一个消息队列
- Agent 的输出成为下一个 Agent 的输入
- 使用 Shared Context 存储共享状态
#### 3. 智能轮数
```python
class SmartRoundController:
def should_continue(self, stage, round_num, messages):
# 使用 LLM 判断是否继续
prompt = f"""
当前阶段: {stage}
当前轮数: {round_num}
讨论内容: {messages}
讨论是否已经充分?是否需要更多轮数?
"""
return llm.judge(prompt)
```
## 复用现有架构
### 可复用的组件
1. **LLM Factory** - 语言模型
2. **Tool Registry** - 工具注册
3. **Session Manager** - 会话管理
4. **Agent Executor** - Agent 执行逻辑(部分)
### 需要新增的组件
1. **GroupChatManager** - 群聊管理器
2. **Participant** - 参与者 Agent
3. **Stage Controller** - 阶段控制器
4. **SmartRoundController** - 智能轮数控制器
## 关键数据结构
### GroupMessage
```python
class GroupMessage(BaseModel):
id: str
agent_id: str
agent_name: str
content: str
timestamp: datetime
stage: str # presenter/discusser/summarizer
round: int
replying_to: Optional[str] # 回复的消息 ID
```
### GroupContext
```python
class GroupContext(BaseModel):
topic: str # 讨论主题
stage: str # 当前阶段
round: int # 当前轮数
messages: list[GroupMessage] # 所有消息
user_interruptions: list[str] # 用户插话
final_decision: Optional[str] # 最终决策
```

View File

@@ -0,0 +1,32 @@
# Task Plan: 多智能体群聊系统实现计划
## Goal
实现类似"一人公司"的多智能体群聊系统,支持头脑风暴、任务协作、角色扮演等多模式讨论。
## Phases
- [x] Phase 1: 基础架构设计和核心组件规划
- [ ] Phase 2: 群聊管理器 (GroupChatManager) 实现
- [ ] Phase 3: 参与 Agent (Participant) 实现
- [ ] Phase 4: 三个阶段实现 (Presenter/Discusser/Summarizer)
- [ ] Phase 5: 智能轮数控制实现
- [ ] Phase 6: 用户插话机制实现
- [ ] Phase 7: 与现有系统集成和 API 接口
## Key Questions
1. 如何复用现有的 Supervisor + Workers 架构?
2. 如何实现智能轮数控制?
3. 如何处理用户插话?
## Decisions Made
- 架构:任务流水线模式(观点提出 → 讨论完善 → 总结决策)
- 决策机制CEO Agent 最终决策
- 轮数控制AI 智能判断
- 用户参与:插话模式
- 复用策略:复用现有 LLM、ToolRegistry、SessionManager
## Status
**Currently in Phase 1** - 系统架构设计和核心组件规划已完成
## 实现计划文件
- `group_chat_implementation_plan.md` - 详细实现计划
- `group_chat_notes.md` - 研究笔记

View File

@@ -0,0 +1,709 @@
# 多智能体联动系统实现计划
## 项目概述
基于 LangGraph 实现类似 OpenClaw 的多智能体协作系统,采用 Supervisor + Workers 层级架构。
### 核心特性
- **任务规划**: Supervisor 分析任务并生成执行计划
- **动态分发**: LLM 自主决策调用哪个 Worker
- **并行执行**: 支持多个 Worker 同时处理任务
- **结果汇总**: Supervisor 汇总所有 Worker 结果
- **迭代优化**: 支持 Review 机制和迭代重试
---
## 一、系统架构
### 1.1 整体架构图
```
┌─────────────────────────────────────────────────────────────────┐
│ MultiAgentSystem │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Supervisor Agent │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Planner │ │ Dispatcher │ │ Aggregator │ │ │
│ │ │ (任务规划) │ │ (任务分发) │ │ (结果汇总) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Research │ │ Coder │ │ Review │ │
│ │ Worker │ │ Worker │ │ Worker │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └──────────────────┴──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Shared State │ │
│ │ (共享状态) │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 1.2 核心组件
| 组件 | 职责 | 文件位置 |
|------|------|----------|
| **SupervisorAgent** | 任务分析、规划、分发、汇总 | `agent/multi/supervisor.py` |
| **BaseWorker** | Worker 基类,定义执行接口 | `agent/multi/workers/base.py` |
| **ResearchWorker** | 信息搜索和调研 | `agent/multi/workers/research.py` |
| **CoderWorker** | 代码编写和修改 | `agent/multi/workers/coder.py` |
| **ReviewWorker** | 结果检查和评审 | `agent/multi/workers/review.py` |
| **SharedState** | 跨 Agent 共享状态 | `agent/multi/state.py` |
| **TaskQueue** | 任务队列管理 | `agent/multi/queue.py` |
| **MultiAgentGraph** | LangGraph 流程编排 | `agent/multi/graph.py` |
---
## 二、数据结构设计
### 2.1 Agent State 定义
```python
# agent/multi/types.py
from typing import TypedDict, Annotated, Optional
from operator import add
from pydantic import BaseModel
class TaskItem(BaseModel):
"""单个任务项"""
id: str
description: str
assigned_agent: str # research / coder / review
status: str # pending / running / completed / failed
result: Optional[dict] = None
error: Optional[str] = None
retry_count: int = 0
class AgentState(TypedDict):
"""贯穿整个图的 Agent 状态"""
# 用户输入
original_task: str # 原始任务描述
# 任务规划
task_plan: list[TaskItem] # 分解后的任务列表
current_task_index: int # 当前执行的任务索引
# 执行结果
results: dict # {task_id: result}
# 流程控制
iteration: int # 当前迭代次数
next_node: str # 下一个节点名称
# 共享上下文
shared_context: dict # Agent 间共享的数据
# 最终输出
final_output: str
status: str # running / completed / failed
```
### 2.2 Supervisor 输出结构
```python
# Supervisor 的结构化输出
class SupervisorDecision(BaseModel):
"""Supervisor 的决策"""
analysis: str # 任务分析
task_plan: list[TaskItem] # 任务计划
need_aggregation: bool # 是否需要汇总
next_worker: str # 下一个执行的 Worker
```
---
## 三、核心实现
### 3.1 Supervisor Agent
```python
# agent/multi/supervisor.py
from langchain_core.language_models import BaseChatModel
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel
from typing import Type
from .types import AgentState, TaskItem, SupervisorDecision
from .prompts import SUPERVISOR_SYSTEM_PROMPT
class SupervisorAgent:
"""Supervisor Agent - 负责任务规划和分发"""
def __init__(
self,
llm: BaseChatModel,
max_iterations: int = 3
):
self.llm = llm
self.max_iterations = max_iterations
self.output_parser = PydanticOutputParser(pydantic_object=SupervisorDecision)
def create_node(self):
"""创建 Supervisor 节点"""
return self._supervisor_node
async def _supervisor_node(self, state: AgentState) -> dict:
"""Supervisor 节点逻辑"""
# 首次调用:分析任务并生成计划
if not state.get("task_plan"):
decision = await self._plan_tasks(state["original_task"])
return {
"task_plan": decision.task_plan,
"next_node": decision.next_worker,
"current_task_index": 0,
"shared_context": {"task_analysis": decision.analysis}
}
# 检查是否需要继续
current_task = state["task_plan"][state["current_task_index"]]
if current_task["status"] == "completed":
# 当前任务完成,检查是否还有更多任务
if state["current_task_index"] + 1 < len(state["task_plan"]):
next_index = state["current_task_index"] + 1
next_task = state["task_plan"][next_index]
return {
"current_task_index": next_index,
"next_node": next_task["assigned_agent"]
}
else:
# 所有任务完成,进入汇总
return {"next_node": "aggregate"}
elif current_task["status"] == "failed":
# 任务失败,检查是否超过最大重试
if current_task["retry_count"] >= self.max_iterations:
return {"next_node": "aggregate", "status": "failed"}
else:
# 重试
return {"next_node": current_task["assigned_agent"]}
return {"next_node": state.get("next_node", "aggregate")}
async def _plan_tasks(self, task: str) -> SupervisorDecision:
"""调用 LLM 生成任务计划"""
prompt = SUPERVISOR_SYSTEM_PROMPT.format(task=task)
response = await self.llm.ainvoke([
{"role": "system", "content": prompt},
{"role": "user", "content": "请分析任务并制定执行计划。"}
])
# 解析 LLM 输出为结构化决策
# ... (实现解析逻辑)
return decision
```
### 3.2 Worker 基类
```python
# agent/multi/workers/base.py
from abc import ABC, abstractmethod
from typing import Any
from langchain_core.language_models import BaseChatModel
from ..types import AgentState
class BaseWorker(ABC):
"""Worker Agent 基类"""
def __init__(
self,
llm: BaseChatModel,
name: str,
system_prompt: str,
tools: list = None
):
self.llm = llm
self.name = name
self.system_prompt = system_prompt
self.tools = tools or []
@abstractmethod
async def execute(self, task: TaskItem, context: dict) -> dict:
"""执行任务"""
pass
def create_node(self):
"""创建 LangGraph 节点"""
async def node(state: AgentState) -> dict:
task = state["task_plan"][state["current_task_index"]]
result = await self.execute(task, state.get("shared_context", {}))
# 更新状态
return {
"results": {task.id: result},
"task_plan": self._update_task_status(
state["task_plan"],
task.id,
"completed" if result.get("success") else "failed"
),
"shared_context": {**state.get("shared_context", {}), **result.get("context", {})}
}
return node
def _update_task_status(self, tasks: list, task_id: str, status: str) -> list:
"""更新任务状态"""
return [
{**task, "status": status} if task["id"] == task_id else task
for task in tasks
]
```
### 3.3 任务队列(可选:支持并行执行)
```python
# agent/multi/queue.py
import asyncio
from typing import Any, Callable
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class QueuedTask:
id: str
agent_name: str
task_data: Any
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: str = None
class TaskQueue:
"""任务队列 - 支持并行执行多个 Worker"""
def __init__(self, max_concurrent: int = 3):
self.max_concurrent = max_concurrent
self.queue: asyncio.Queue = asyncio.Queue()
self.results: dict = {}
self._running = 0
async def add_task(self, task: QueuedTask):
"""添加任务到队列"""
await self.queue.put(task)
async def execute_all(self, worker_factory: Callable):
"""执行所有任务"""
async def worker():
while True:
try:
task = self.queue.get_nowait()
except asyncio.QueueEmpty:
break
self._running += 1
task.status = TaskStatus.Running
try:
worker_instance = worker_factory(task.agent_name)
task.result = await worker_instance.execute(task.task_data)
task.status = TaskStatus.COMPLETED
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
finally:
self._running -= 1
self.results[task.id] = task
# 启动多个 worker 协程
workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
await asyncio.gather(*workers)
return self.results
```
### 3.4 LangGraph 流程编排
```python
# agent/multi/graph.py
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from .types import AgentState
from .supervisor import SupervisorAgent
from .workers.research import ResearchWorker
from .workers.coder import CoderWorker
from .workers.review import ReviewWorker
from .aggregator import ResultAggregator
def create_multi_agent_graph(
llm,
tool_registry,
max_iterations: int = 3
) -> StateGraph:
"""创建多 Agent 流程图"""
# 初始化组件
supervisor = SupervisorAgent(llm, max_iterations)
research_worker = ResearchWorker(llm, tool_registry)
coder_worker = CoderWorker(llm, tool_registry)
review_worker = ReviewWorker(llm, tool_registry)
aggregator = ResultAggregator(llm)
# 创建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("supervisor", supervisor.create_node())
graph.add_node("research", research_worker.create_node())
graph.add_node("coder", coder_worker.create_node())
graph.add_node("review", review_worker.create_node())
graph.add_node("aggregate", aggregator.create_node())
# 设置入口
graph.set_entry_point("supervisor")
# 添加边
graph.add_edge("supervisor", "research")
graph.add_edge("research", "review")
graph.add_edge("coder", "review")
# 条件边:从 review 回到 supervisor
def should_continue(state: AgentState) -> str:
if state.get("status") == "failed":
return "aggregate"
if state.get("iteration", 0) >= max_iterations:
return "aggregate"
if state.get("current_task_index", 0) >= len(state.get("task_plan", [])):
return "aggregate"
return "supervisor"
graph.add_conditional_edges(
"review",
should_continue,
{
"supervisor": "supervisor",
"aggregate": "aggregate"
}
)
# 结束节点
graph.add_edge("aggregate", END)
return graph.compile()
```
---
## 四、Prompt 设计
### 4.1 Supervisor System Prompt
```python
# agent/multi/prompts.py
SUPERVISOR_SYSTEM_PROMPT = """你是一个任务规划专家Supervisor。你的职责是将复杂任务分解为可执行的子任务并分配给合适的执行 Agent。
## 可用的 Worker Agent
- **research**: 信息搜索和调研
- **coder**: 代码编写、修改和调试
- **review**: 结果检查、质量评审
## 任务
{task}
## 请按以下步骤执行
### 步骤 1: 任务分析
分析任务的性质,确定需要哪些步骤来完成。
### 步骤 2: 任务分解
将任务分解为独立的子任务。每个子任务应该:
- 描述清晰
- 可以由单个 Agent 完成
- 有明确的完成标准
### 步骤 3: 分配 Agent
为每个子任务选择最合适的执行 Agent。
### 步骤 4: 确定执行顺序
如果有依赖关系,确定正确的执行顺序。
## 输出格式
请以 JSON 格式输出你的决策:
```json
{{
"analysis": "任务分析...",
"task_plan": [
{{
"id": "task_1",
"description": "子任务描述",
"assigned_agent": "research"
}},
{{
"id": "task_2",
"description": "子任务描述",
"assigned_agent": "coder"
}}
],
"need_aggregation": true,
"next_worker": "research"
}}
```
## 注意
- 如果任务很简单,可以只分配给一个 Agent
- 如果任务需要迭代优化,确保有 review 环节
- 考虑任务之间的依赖关系
"""
```
### 4.2 Review Worker Prompt
```python
REVIEW_SYSTEM_PROMPT = """你是一个代码和结果评审专家Reviewer。你的职责是检查任务执行结果是否符合要求。
## 任务描述
{task_description}
## 执行结果
{execution_result}
## 检查标准
1. 结果是否完整解决了原始任务?
2. 输出格式是否正确?
3. 是否存在明显的错误或遗漏?
4. 代码是否有潜在问题?
## 请输出评审结果
```json
{{
"passed": true/false,
"issues": [
{{"severity": "high/medium/low", "description": "问题描述"}}
],
"suggestions": ["改进建议"]
}}
```
"""
```
---
## 五、迭代控制
### 5.1 迭代逻辑
```python
# agent/multi/iteration.py
from typing import Optional
class IterationController:
"""迭代控制器"""
def __init__(self, max_iterations: int = 3):
self.max_iterations = max_iterations
def should_continue(
self,
iteration: int,
task_status: str,
review_result: dict
) -> tuple[bool, str]:
"""
判断是否继续迭代
Returns:
(是否继续, 原因)
"""
# 超过最大迭代次数
if iteration >= self.max_iterations:
return False, "max_iterations_reached"
# 任务成功完成
if task_status == "completed" and review_result.get("passed"):
return False, "task_completed"
# 任务失败且不可重试
if task_status == "failed" and not review_result.get("retryable"):
return False, "task_failed"
# 需要重试
if review_result.get("issues") and review_result.get("passed") is False:
return True, "needs_retry"
return True, "continue"
def get_next_action(
self,
review_result: dict,
current_worker: str
) -> str:
"""确定下一步动作"""
if review_result.get("passed"):
return "supervisor" # 返回 Supervisor
# 根据问题类型决定下一步
issues = review_result.get("issues", [])
high_severity = any(i.get("severity") == "high" for i in issues)
if high_severity:
# 严重问题,重新执行相同任务
return current_worker
else:
# 轻微问题,可以继续
return "supervisor"
```
---
## 六、与现有系统集成
### 6.1 复用现有组件
```python
# agent/multi/integration.py
from app.agent.core.agent import AgentManager
from app.agent.tools.registry import ToolRegistry
from app.agent.memory.session import SessionManager
from app.llm.factory import LLMFactory
class MultiAgentSystem:
"""多智能体系统 - 集成现有组件"""
def __init__(self, config: dict):
# 复用现有 LLM Factory
self.llm_factory = LLMFactory(
provider=config.get("llm_provider", "openai"),
openai_api_key=config.get("openai_api_key"),
anthropic_api_key=config.get("anthropic_api_key")
)
# 复用现有 Tool Registry
self.tool_registry = ToolRegistry()
self._register_default_tools()
# 复用现有 Session Manager
self.session_manager = SessionManager()
# 配置
self.max_iterations = config.get("max_iterations", 3)
def _register_default_tools(self):
"""注册默认工具"""
# 从现有 Agent 复制工具注册逻辑
from app.agent.tools.impl import search, calculator
self.tool_registry.register(
name="search",
func=search.search_web,
description="Search the web",
security_level="safe"
)
# ... 其他工具
async def execute(self, task: str, session_id: str = None) -> dict:
"""执行多 Agent 任务"""
# 创建 LangGraph
from .graph import create_multi_agent_graph
llm = self.llm_factory.get_llm()
graph = create_multi_agent_graph(
llm=llm,
tool_registry=self.tool_registry,
max_iterations=self.max_iterations
)
# 初始化状态
initial_state = {
"original_task": task,
"task_plan": [],
"current_task_index": 0,
"results": {},
"iteration": 0,
"next_node": "supervisor",
"shared_context": {},
"final_output": "",
"status": "running"
}
# 执行
result = await graph.ainvoke(initial_state)
# 保存到 session
if session_id:
self.session_manager.add_message(session_id, "user", task)
self.session_manager.add_message(session_id, "assistant", result["final_output"])
return result
```
---
## 七、文件结构
```
agent/
├── __init__.py
├── multi/
│ ├── __init__.py
│ ├── types.py # 数据类型定义
│ ├── prompts.py # Prompt 模板
│ ├── supervisor.py # Supervisor Agent
│ ├── graph.py # LangGraph 流程图
│ ├── iteration.py # 迭代控制
│ ├── integration.py # 与现有系统集成
│ ├── queue.py # 任务队列(可选)
│ └── workers/
│ ├── __init__.py
│ ├── base.py # Worker 基类
│ ├── research.py # Research Worker
│ ├── coder.py # Coder Worker
│ └── review.py # Review Worker
```
---
## 八、实现顺序
1. **Phase 1: 基础架构**
- 定义数据类型 (types.py)
- 创建 Prompt 模板 (prompts.py)
2. **Phase 2: Supervisor**
- 实现 SupervisorAgent
- 实现任务规划和分发逻辑
3. **Phase 3: Workers**
- 实现 BaseWorker
- 实现 ResearchWorker
- 实现 CoderWorker
- 实现 ReviewWorker
4. **Phase 4: 流程编排**
- 实现 LangGraph 流程图
- 添加条件边和迭代控制
5. **Phase 5: 集成**
- 与现有 Agent 系统集成
- 添加 API 接口
---
## 九、测试计划
1. **单元测试**: 测试各 Worker 的执行逻辑
2. **集成测试**: 测试完整的 Supervisor + Workers 流程
3. **迭代测试**: 测试重试和迭代逻辑
4. **端到端测试**: 模拟真实任务执行

View File

@@ -0,0 +1,107 @@
# Notes: LangGraph 多智能体研究
## 核心概念
### LangGraph 基础
- **StateGraph**: 有向无环图DAG节点是 Agent/函数,边是流转逻辑
- **State**: 贯穿整个图流动的状态对象
- **Node**: 执行单元(可以是 Agent、函数、条件判断
- **Edge**: 连接节点的边支持条件边conditional edges
### Supervisor + Workers 模式参考
#### 1. LangChain 官方 Supervisor 示例
```python
from langgraph.prebuilt import create_react_agent
from langgraph.graph import StateGraph, END
# 定义 Workers
research_agent = create_react_agent(llm, tools=[search])
coder_agent = create_react_agent(llm, tools=[write_file])
# 定义 Supervisor 节点
def supervisor_node(state):
# LLM 决定下一步调用哪个 Agent
response = llm.with_structured_output(SupervisorOutput).invoke(
[SystemMessage(content=SUPERVISOR_PROMPT)] + state["messages"]
)
return {"next": response.next_agent}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("research", research_agent)
graph.add_node("code", coder_agent)
```
#### 2. 状态定义
```python
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
task: str
plan: list
results: dict
iteration: int
next: str # 控制下一步流向
```
#### 3. 条件边实现
```python
def should_continue(state):
if state["iteration"] >= MAX_ITERATIONS:
return "end"
if state.get("task_complete"):
return "end"
return "continue"
graph.add_conditional_edges(
"review",
should_continue,
{
"continue": "supervisor",
"end": END
}
)
```
## 设计决策
### 架构优势
1. **清晰的分层**: Supervisor 负责任务规划Workers 负责执行
2. **可扩展**: 容易添加新的 Worker 类型
3. **可控**: 迭代次数全局配置
4. **灵活**: 支持条件分支和循环
### 需要解决的问题
1. **Supervisor 如何做规划**: 需要设计 prompt 让 LLM 生成任务列表
2. **任务队列**: 需要支持并行分发多个 Worker
3. **共享上下文**: 需要设计数据结构在 Agent 间共享状态
4. **Review 机制**: 需要定义检查标准和重试逻辑
## 关键 Prompt 设计
### Supervisor System Prompt
```
你是一个任务规划专家Supervisor。用户的任务是{task}
请按以下步骤执行:
1. 分析任务需求和约束
2. 将任务分解为可执行的子任务
3. 为每个子任务选择合适的执行 Agent
- research: 信息搜索和调研
- coder: 代码编写和修改
- review: 结果检查和评审
4. 确定执行顺序和依赖关系
当前任务进度:{progress}
共享上下文:{context}
请输出你的决策,格式如下:
- 需要执行的子任务列表
- 每个任务的执行 Agent
- 任务执行顺序
- 是否需要汇总结果
```

View File

@@ -0,0 +1,33 @@
# Task Plan: 多智能体联动系统实现计划
## Goal
基于 LangGraph 实现类似 OpenClaw 的多智能体联动系统,支持任务规划、动态分发、结果汇总和迭代优化。
## Phases
- [x] Phase 1: 系统架构设计和核心组件规划
- [ ] Phase 2: Supervisor Agent 实现
- [ ] Phase 3: Worker Agent 实现
- [ ] Phase 4: 任务队列和共享上下文实现
- [ ] Phase 5: State Machine 流程控制实现
- [ ] Phase 6: 迭代控制和 Review 机制实现
- [ ] Phase 7: 与现有 Agent 系统集成
## Key Questions
1. 如何用 LangGraph 实现 Supervisor + Workers 架构?
2. 如何设计任务队列支持并行执行?
3. 如何实现共享上下文在 Agent 间传递?
4. 如何控制迭代次数和流程分支?
## Decisions Made
- 架构Supervisor + Workers 层级模式
- 协作方式LLM 自主决策任务分配
- 通信共享内存Shared Context
- 迭代控制:全局最大迭代次数配置
- Workers 定义:复用现有 tool_registry
## Status
**Currently in Phase 1** - 系统架构设计和核心组件规划已完成
## 实现计划文件
- `implementation_plan.md` - 详细的实现计划
- `notes.md` - LangGraph 研究笔记