# 多智能体群聊系统实现计划 ## 项目概述 实现类似"一人公司"的多智能体群聊系统,支持多个 Agent 在群聊中讨论问题,类似于人类的团队会议。 ### 核心特性 - **三阶段流程**: 观点提出 → 讨论完善 → CEO 总结决策 - **智能轮数**: AI 判断讨论是否充分,自动决定轮数 - **用户插话**: 用户可以随时打断发表意见 - **角色扮演**: 可配置不同角色的 Agent(CEO、CTO、Designer 等) - **复用架构**: 复用现有的 LLM、ToolRegistry、SessionManager --- ## 一、系统架构 ### 1.1 整体架构图 ``` ┌─────────────────────────────────────────────────────────────────┐ │ Agent Group Chat System │ │ ┌─────────────────────────────────────────────────────────────┐│ │ │ GroupChatManager ││ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ │ │ Presenter │ │ Discusser │ │ Summarizer │ ││ │ │ │ Controller │ │ Controller │ │ Controller │ ││ │ │ └─────────────┘ └─────────────┘ └─────────────┘ ││ │ │ │ │ │ ││ │ │ └────────────────┴────────────────┘ ││ │ │ │ ││ │ │ SmartRoundController ││ │ └─────────────────────────────────────────────────────────────┘│ │ │ │ │ ┌──────────────────┼──────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ CEO Agent │ │ CTO Agent │ │Designer Agent│ │ │ │ Participant │ │ Participant │ │ Participant │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ └──────────────────┴──────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │GroupContext │ │ │ │(共享上下文) │ │ │ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 1.2 核心组件 | 组件 | 职责 | 文件位置 | |------|------|----------| | **GroupChatManager** | 群聊管理器,控制整体流程 | `group_chat/manager.py` | | **Participant** | 参与群聊的 Agent 实例 | `group_chat/participant.py` | | **StageController** | 阶段控制器,管理各阶段 | `group_chat/stages/controller.py` | | **PresenterStage** | 观点提出阶段 | `group_chat/stages/presenter.py` | | **DiscusserStage** | 讨论完善阶段 | `group_chat/stages/discusser.py` | | **SummarizerStage** | 总结决策阶段 | `group_chat/stages/summarizer.py` | | **SmartRoundController** | 智能轮数控制 | `group_chat/round_controller.py` | | **GroupContext** | 共享讨论上下文 | `group_chat/context.py` | | **GroupMessage** | 群聊消息 | `group_chat/message.py` | --- ## 二、数据结构设计 ### 2.1 消息定义 ```python # group_chat/message.py from typing import Optional from pydantic import BaseModel, Field from datetime import datetime from enum import Enum class MessageStage(str, Enum): """消息所属阶段""" PRESENT = "present" # 观点提出 DISCUSS = "discuss" # 讨论完善 SUMMARIZE = "summarize" # 总结决策 class GroupMessage(BaseModel): """群聊消息""" id: str = Field(..., description="消息唯一标识") agent_id: str = Field(..., description="Agent ID") agent_name: str = Field(..., description="Agent 名称") agent_role: str = Field(..., description="Agent 角色") content: str = Field(..., description="消息内容") timestamp: datetime = Field(default_factory=datetime.now) stage: MessageStage = Field(..., description="所属阶段") round: int = Field(default=1, description="轮数") replying_to: Optional[str] = Field(default=None, description="回复的消息ID") class UserInterruption(BaseModel): """用户插话""" id: str content: str timestamp: datetime = Field(default_factory=datetime.now) stage: MessageStage round: int ``` ### 2.2 共享上下文 ```python # group_chat/context.py from typing import Optional from pydantic import BaseModel, Field from .message import GroupMessage, UserInterruption class GroupContext(BaseModel): """群聊共享上下文""" topic: str = Field(..., description="讨论主题") stage: str = Field(default="present", description="当前阶段") round: int = Field(default=1, description="当前轮数") messages: list[GroupMessage] = Field(default_factory=list) user_interruptions: list[UserInterruption] = Field(default_factory=list) final_decision: Optional[str] = Field(default=None, description="最终决策") status: str = Field(default="running", description="running/completed/failed") class GroupState(BaseModel): """群聊状态""" context: GroupContext participants: dict[str, dict] # {agent_id: config} config: dict # 群聊配置 ``` --- ## 三、核心实现 ### 3.1 Agent 角色定义 ```python # group_chat/roles.py from typing import Optional # 默认角色配置 DEFAULT_ROLES = { "ceo": { "id": "ceo", "name": "CEO", "role": "首席执行官", "system_prompt": """你是一家公司的 CEO,负责制定公司战略方向和重大决策。 你的特点是:高瞻远瞩、战略思维、注重长远利益。 在讨论中,你应该: - 从公司整体战略角度分析问题 - 权衡短期和长期利益 - 做最终决策并承担责任 - 善于总结和归纳各方观点""", "stages": ["present", "summarize"], "is_final_decider": True, "priority": 1 }, "cto": { "id": "cto", "name": "CTO", "role": "首席技术官", "system_prompt": """你是一家公司的 CTO,负责技术决策和技术团队管理。 你的特点是:技术深厚、注重可行性、关注技术趋势。 在讨论中,你应该: - 从技术角度分析方案的可行性和风险 - 提供技术实现建议 - 评估技术选型 - 关注系统的可扩展性和稳定性""", "stages": ["present", "discuss"], "is_final_decider": False, "priority": 2 }, "designer": { "id": "designer", "name": "Designer", "role": "产品设计师", "system_prompt": """你是公司的产品设计师,负责用户体验和界面设计。 你的特点是:创意丰富、注重用户体验、审美在线。 在讨论中,你应该: - 从用户角度分析问题 - 提出用户体验优化建议 - 关注产品细节 - 平衡美观和实用性""", "stages": ["present", "discuss"], "is_final_decider": False, "priority": 3 }, "analyst": { "id": "analyst", "name": "Analyst", "role": "数据分析师", "system_prompt": """你是公司的数据分析师,负责数据分析和决策支持。 你的特点是:数据敏感、逻辑严谨、注重证据。 在讨论中,你应该: - 用数据支撑观点 - 提供数据分析结果 - 指出数据驱动的机会和风险 - 关注关键指标""", "stages": ["present", "discuss"], "is_final_decider": False, "priority": 4 } } ``` ### 3.2 Participant(参与者 Agent) ```python # group_chat/participant.py import uuid from datetime import datetime from typing import Optional from langchain_core.language_models import BaseChatModel from langchain_core.messages import HumanMessage, SystemMessage from .message import GroupMessage, MessageStage from .context import GroupContext class Participant: """参与群聊的 Agent""" def __init__( self, llm: BaseChatModel, config: dict, stage: str = "present" ): self.llm = llm self.config = config self.id = config["id"] self.name = config["name"] self.role = config["role"] self.system_prompt = config["system_prompt"] self.stages = config.get("stages", ["present", "discuss", "summarize"]) self.is_final_decider = config.get("is_final_decider", False) async def generate_message( self, context: GroupContext, stage: MessageStage, round_num: int, replying_to: Optional[str] = None ) -> GroupMessage: """生成消息""" # 构建 prompt prompt = self._build_prompt(context, stage, round_num) # 调用 LLM response = await self.llm.ainvoke([ SystemMessage(content=prompt), HumanMessage(content=f"请就「{context.topic}」发表你的观点。") ]) content = response.content if hasattr(response, 'content') else str(response) # 创建消息 message = GroupMessage( id=str(uuid.uuid4()), agent_id=self.id, agent_name=self.name, agent_role=self.role, content=content, timestamp=datetime.now(), stage=stage, round=round_num, replying_to=replying_to ) return message def _build_prompt(self, context: GroupContext, stage: MessageStage, round_num: int) -> str: """构建 prompt""" base_prompt = self.system_prompt # 添加上下文 context_info = f""" ## 当前讨论 主题:{context.topic} 阶段:{stage.value} 轮数:{round_num} ## 历史消息 """ # 添加历史消息(限制数量) recent_messages = context.messages[-10:] if context.messages else [] for msg in recent_messages: context_info += f"\n【{msg.agent_name}】{msg.content}\n" # 添加用户插话 if context.user_interruptions: context_info += "\n## 用户插话\n" for interruption in context.user_interruptions[-3:]: context_info += f"\n【用户】{interruption.content}\n" # 阶段特定的指令 stage_instruction = { "present": "请简洁地提出你的观点和建议。", "discuss": "请回应其他人的观点,进行讨论和完善。", "summarize": "请总结各方观点,给出最终决策建议。" } full_prompt = f"{base_prompt}\n\n{context_info}\n\n{stage_instruction.get(stage.value, '')}" return full_prompt def can_participate(self, stage: MessageStage) -> bool: """判断是否可以参与当前阶段""" return stage.value in self.stages ``` ### 3.3 智能轮数控制器 ```python # group_chat/round_controller.py from langchain_core.language_models import BaseChatModel from langchain_core.messages import SystemMessage, HumanMessage from .context import GroupContext from .message import MessageStage class SmartRoundController: """智能轮数控制器""" def __init__(self, llm: BaseChatModel, max_rounds: int = 3): self.llm = llm self.max_rounds = max_rounds async def should_continue( self, context: GroupContext, stage: MessageStage ) -> tuple[bool, str]: """ 判断是否继续下一轮 Returns: (是否继续, 原因) """ # 达到最大轮数 if context.round >= self.max_rounds: return False, "max_rounds_reached" # 构建判断 prompt prompt = self._build_judge_prompt(context, stage) # 调用 LLM 判断 response = await self.llm.ainvoke([ SystemMessage(content=prompt), HumanMessage(content="请判断讨论是否已经充分,是否需要更多轮数。") ]) content = response.content.lower() if hasattr(response, 'content') else str(response) # 解析判断结果 if "充分" in content or "足够" in content or "不需要" in content: return False, "discussion_sufficient" elif "不充分" in content or "不够" in content or "需要" in content: return True, "need_more_discussion" # 默认继续 return True, "default_continue" def _build_judge_prompt(self, context: GroupContext, stage: MessageStage) -> str: """构建判断 prompt""" messages_summary = "\n".join([ f"【{msg.agent_name}】{msg.content[:200]}..." for msg in context.messages[-5:] ]) return f"""你是一个讨论质量评估专家。请判断当前的讨论是否已经充分。 ## 讨论信息 主题:{context.topic} 当前阶段:{stage.value} 当前轮数:{context.round} 最大轮数:{self.max_rounds} ## 最近的讨论内容 {messages_summary} ## 判断标准 - 各方观点是否已经充分表达? - 是否有建设性的讨论和回应? - 是否已经形成明确的结论或方向? ## 请输出 如果讨论已经充分,请输出"充分",并简要说明原因。 如果还需要更多讨论,请输出"不充分",并说明需要讨论哪些方面。 """ ``` ### 3.4 群聊管理器 ```python # group_chat/manager.py import uuid from typing import Optional from datetime import datetime from langchain_core.language_models import BaseChatModel from .context import GroupContext from .message import GroupMessage, MessageStage, UserInterruption from .participant import Participant from .roles import DEFAULT_ROLES from .round_controller import SmartRoundController class GroupChatManager: """群聊管理器""" def __init__( self, llm: BaseChatModel, roles: dict = None, max_rounds: int = 3, enable_user_interrupt: bool = True ): self.llm = llm self.roles = roles or DEFAULT_ROLES self.max_rounds = max_rounds self.enable_user_interrupt = enable_user_interrupt # 初始化组件 self.round_controller = SmartRoundController(llm, max_rounds) # 运行时状态 self.context: Optional[GroupContext] = None self.participants: dict[str, Participant] = {} async def start_chat(self, topic: str) -> dict: """开始群聊""" # 创建上下文 self.context = GroupContext( topic=topic, stage="present", round=1, messages=[], user_interruptions=[], status="running" ) # 创建参与者 self.participants = { role_id: Participant(self.llm, config) for role_id, config in self.roles.items() } # 按优先级排序参与者 sorted_participants = sorted( self.participants.values(), key=lambda p: p.config.get("priority", 999) ) # 开始第一阶段 result = await self._run_stage(MessageStage.PRESENT, sorted_participants) return result async def _run_stage( self, stage: MessageStage, participants: list[Participant] ) -> dict: """运行指定阶段""" # 更新上下文 self.context.stage = stage.value # 按阶段获取参与者 stage_participants = [p for p in participants if p.can_participate(stage)] # 执行多轮 for round_num in range(1, self.max_rounds + 1): self.context.round = round_num # 当前阶段参与者发言 messages = await self._run_round(stage, stage_participants, round_num) # 智能判断是否继续 should_continue, reason = await self.round_controller.should_continue( self.context, stage ) if not should_continue: break # 阶段完成后的处理 if stage == MessageStage.PRESENT: # 进入讨论阶段 if any(p.can_participate(MessageStage.DISCUSS) for p in participants): return await self._run_stage(MessageStage.DISCUSS, participants) elif stage == MessageStage.DISCUSS: # 进入总结阶段 summarizer = next( (p for p in participants if p.is_final_decider), participants[0] ) return await self._run_stage(MessageStage.SUMMARIZE, [summarizer]) elif stage == MessageStage.SUMMARIZE: # 完成 self.context.status = "completed" return self._build_result() async def _run_round( self, stage: MessageStage, participants: list[Participant], round_num: int ) -> list[GroupMessage]: """运行一轮发言""" messages = [] for participant in participants: # 获取前一轮的最新消息(用于回复) replying_to = None if self.context.messages: last_message = self.context.messages[-1] if last_message.agent_id != participant.id: replying_to = last_message.id # 生成消息 message = await participant.generate_message( context=self.context, stage=stage, round_num=round_num, replying_to=replying_to ) # 保存消息 self.context.messages.append(message) messages.append(message) return messages async def add_interruption(self, content: str) -> dict: """添加用户插话""" if not self.enable_user_interrupt: return {"error": "用户插话已禁用"} interruption = UserInterruption( id=str(uuid.uuid4()), content=content, timestamp=datetime.now(), stage=MessageStage(self.context.stage), round=self.context.round ) self.context.user_interruptions.append(interruption) return {"success": True, "interruption_id": interruption.id} def get_messages(self) -> list[GroupMessage]: """获取所有消息""" return self.context.messages if self.context else [] def get_result(self) -> dict: """获取结果""" return self._build_result() def _build_result(self) -> dict: """构建结果""" return { "topic": self.context.topic, "status": self.context.status, "messages": [ { "agent_name": m.agent_name, "agent_role": m.agent_role, "content": m.content, "stage": m.stage.value, "round": m.round } for m in self.context.messages ], "final_decision": self.context.final_decision, "user_interruptions": [ {"content": i.content, "timestamp": i.timestamp.isoformat()} for i in self.context.user_interruptions ] } ``` --- ## 四、阶段实现 ### 4.1 Presenter Stage(观点提出) ```python # group_chat/stages/presenter.py from typing import list from ..participant import Participant from ..message import GroupMessage class PresenterStage: """观点提出阶段""" async def execute( self, participants: list[Participant], context ) -> list[GroupMessage]: """执行观点提出""" messages = [] # 按优先级顺序发言 for participant in participants: if not participant.can_participate("present"): continue message = await participant.generate_message( context=context, stage="present", round=context.round ) messages.append(message) context.messages.append(message) return messages ``` ### 4.2 Discusser Stage(讨论完善) ```python # group_chat/stages/discusser.py from ..participant import Participant from ..message import GroupMessage class DiscusserStage: """讨论完善阶段""" async def execute( self, participants: list[Participant], context ) -> list[GroupMessage]: """执行讨论完善""" messages = [] # 获取上一轮的消息 last_round_messages = [ m for m in context.messages if m.stage == "present" and m.round == context.round - 1 ] for participant in participants: if not participant.can_participate("discuss"): continue # 选择要回复的消息 replying_to = None for msg in reversed(last_round_messages): if msg.agent_id != participant.id: replying_to = msg.id break message = await participant.generate_message( context=context, stage="discuss", round=context.round, replying_to=replying_to ) messages.append(message) context.messages.append(message) return messages ``` ### 4.3 Summarizer Stage(总结决策) ```python # group_chat/stages/summarizer.py from ..participant import Participant from ..message import GroupMessage class SummarizerStage: """总结决策阶段""" async def execute( self, participants: list[Participant], context ) -> GroupMessage: """执行总结决策""" # CEO 总结 summarizer = next( (p for p in participants if p.is_final_decider), participants[0] ) message = await summarizer.generate_message( context=context, stage="summarize", round=context.round ) # 保存最终决策 context.final_decision = message.content context.messages.append(message) return message ``` --- ## 五、与现有系统集成 ### 5.1 复用现有组件 ```python # group_chat/integration.py from typing import Optional from app.llm.factory import LLMFactory from app.agent.tools.registry import ToolRegistry from app.agent.memory.session import SessionManager from .manager import GroupChatManager class GroupChatSystem: """群聊系统 - 集成现有组件""" def __init__( self, llm_provider: str = "openai", openai_api_key: Optional[str] = None, anthropic_api_key: Optional[str] = None, roles: dict = None, max_rounds: int = 3, enable_user_interrupt: bool = True ): # 初始化 LLM Factory self.llm_factory = LLMFactory( provider=llm_provider, openai_api_key=openai_api_key, anthropic_api_key=anthropic_api_key ) # 初始化 Tool Registry self.tool_registry = ToolRegistry() # 初始化 Session Manager self.session_manager = SessionManager() # 配置 self.roles = roles self.max_rounds = max_rounds self.enable_user_interrupt = enable_user_interrupt async def start_group_chat( self, topic: str, session_id: str = None ) -> dict: """开始群聊""" # 获取 LLM llm = self.llm_factory.get_llm() # 创建群聊管理器 manager = GroupChatManager( llm=llm, roles=self.roles, max_rounds=self.max_rounds, enable_user_interrupt=self.enable_user_interrupt ) # 开始群聊 result = await manager.start_chat(topic) # 保存到 session if session_id: self.session_manager.add_message(session_id, "user", topic) self.session_manager.add_message( session_id, "assistant", result.get("final_decision", str(result)) ) return result async def add_message( self, message: str, session_id: str ) -> dict: """添加用户消息(插话)""" # 获取 session 中的 manager # ... 实现获取逻辑 return manager.add_interruption(message) ``` --- ## 六、文件结构 ``` agent/app/agent/multi/ ├── group_chat/ │ ├── __init__.py │ ├── roles.py # Agent 角色定义 │ ├── message.py # 消息类型 │ ├── context.py # 共享上下文 │ ├── participant.py # 参与者 Agent │ ├── manager.py # 群聊管理器 │ ├── round_controller.py # 智能轮数控制器 │ ├── stages/ │ │ ├── __init__.py │ │ ├── controller.py # 阶段控制器 │ │ ├── presenter.py # 观点提出阶段 │ │ ├── discusser.py # 讨论完善阶段 │ │ └── summarizer.py # 总结决策阶段 │ └── integration.py # 与现有系统集成 ``` --- ## 七、实现顺序 1. **Phase 1: 基础架构** - 定义数据类型 (message.py, context.py) - 创建角色配置 (roles.py) 2. **Phase 2: 核心组件** - 实现 Participant (participant.py) - 实现 SmartRoundController (round_controller.py) 3. **Phase 3: 阶段实现** - 实现 PresenterStage - 实现 DiscusserStage - 实现 SummarizerStage 4. **Phase 4: 集成** - 实现 GroupChatManager - 与现有系统集成 --- ## 八、测试计划 1. **单元测试**: 测试各 Participant 的消息生成 2. **集成测试**: 测试完整的群聊流程 3. **轮数控制测试**: 测试智能轮数判断 4. **用户插话测试**: 测试插话机制 5. **端到端测试**: 模拟真实群聊场景