diff --git a/agent/app/agent/multi/__init__.py b/agent/app/agent/multi/__init__.py new file mode 100644 index 0000000..1ed197a --- /dev/null +++ b/agent/app/agent/multi/__init__.py @@ -0,0 +1,23 @@ +""" +多智能体系统 +""" +from .types import AgentState, TaskItem, TaskStatus, AgentType, SupervisorDecision, ReviewResult +from .prompts import SUPERVISOR_SYSTEM_PROMPT, REVIEW_SYSTEM_PROMPT, RESEARCH_SYSTEM_PROMPT, CODER_SYSTEM_PROMPT, AGGREGATOR_SYSTEM_PROMPT +from .supervisor import SupervisorAgent +from .graph import create_multi_agent_graph + +__all__ = [ + "AgentState", + "TaskItem", + "TaskStatus", + "AgentType", + "SupervisorDecision", + "ReviewResult", + "SUPERVISOR_SYSTEM_PROMPT", + "REVIEW_SYSTEM_PROMPT", + "RESEARCH_SYSTEM_PROMPT", + "CODER_SYSTEM_PROMPT", + "AGGREGATOR_SYSTEM_PROMPT", + "SupervisorAgent", + "create_multi_agent_graph", +] diff --git a/agent/app/agent/multi/graph.py b/agent/app/agent/multi/graph.py new file mode 100644 index 0000000..3b2aff2 --- /dev/null +++ b/agent/app/agent/multi/graph.py @@ -0,0 +1,130 @@ +""" +LangGraph 流程编排 +""" +from langgraph.graph import StateGraph, END +from langgraph.graph.graph import CompiledGraph + +from .types import AgentState, AgentType +from .supervisor import SupervisorAgent, ResultAggregator +from .workers.research import ResearchWorker +from .workers.coder import CoderWorker +from .workers.review import ReviewWorker + + +def create_multi_agent_graph( + llm, + tool_registry=None, + max_iterations: int = 3, + max_tasks: int = 10 +) -> CompiledGraph: + """创建多 Agent 流程图 + + Args: + llm: 语言模型实例 + tool_registry: 工具注册表 + max_iterations: 最大迭代次数 + max_tasks: 最大任务数 + + Returns: + CompiledGraph: 编译后的 LangGraph + """ + + # 初始化组件 + supervisor = SupervisorAgent(llm, max_iterations=max_iterations, max_tasks=max_tasks) + research_worker = ResearchWorker(llm, tool_registry) + coder_worker = CoderWorker(llm, tool_registry) + review_worker = ReviewWorker(llm, tool_registry) + aggregator = ResultAggregator(llm) + + # 创建图 + graph = StateGraph(AgentState) + + # 添加节点 + graph.add_node("supervisor", supervisor.create_node()) + graph.add_node(AgentType.RESEARCH, research_worker.create_node()) + graph.add_node(AgentType.CODER, coder_worker.create_node()) + graph.add_node(AgentType.REVIEW, review_worker.create_node()) + graph.add_node("aggregator", aggregator.create_node()) + + # 设置入口点 + graph.set_entry_point("supervisor") + + # 定义条件边函数 + def should_continue(state: AgentState) -> str: + """判断是否继续执行""" + + # 获取下一步节点 + next_node = state.get("next_node", "aggregator") + + # 如果是结束节点 + if next_node in ["__end__", "aggregator"]: + return "aggregator" + + # 如果是 Worker 节点 + if next_node in [AgentType.RESEARCH, AgentType.CODER, AgentType.REVIEW]: + return next_node + + # 如果是 supervisor + if next_node == "supervisor": + # 检查迭代次数 + iteration = state.get("iteration", 0) + if iteration >= max_iterations: + return "aggregator" + return "supervisor" + + # 默认进入汇总 + return "aggregator" + + # 添加条件边:从 supervisor 出来 + graph.add_conditional_edges( + "supervisor", + should_continue, + { + "supervisor": "supervisor", + AgentType.RESEARCH: AgentType.RESEARCH, + AgentType.CODER: AgentType.CODER, + AgentType.REVIEW: AgentType.REVIEW, + "aggregator": "aggregator" + } + ) + + # 添加边:Worker -> Review + graph.add_edge(AgentType.RESEARCH, AgentType.REVIEW) + graph.add_edge(AgentType.CODER, AgentType.REVIEW) + + # 添加条件边:从 Review 出来 + graph.add_conditional_edges( + AgentType.REVIEW, + should_continue, + { + "supervisor": "supervisor", + "aggregator": "aggregator" + } + ) + + # 添加边:aggregator -> END + graph.add_edge("aggregator", END) + + # 编译图 + return graph.compile() + + +def create_simple_graph(llm, tool_registry=None) -> CompiledGraph: + """创建简单的单 Agent 图(不经过 Supervisor)""" + + # 创建图 + graph = StateGraph(AgentState) + + # 直接使用 Coder Worker + coder_worker = CoderWorker(llm, tool_registry) + + # 添加节点 + graph.add_node("coder", coder_worker.create_node()) + + # 设置入口 + graph.set_entry_point("coder") + + # 添加边 + graph.add_edge("coder", END) + + return graph.compile() diff --git a/agent/app/agent/multi/integration.py b/agent/app/agent/multi/integration.py new file mode 100644 index 0000000..9500388 --- /dev/null +++ b/agent/app/agent/multi/integration.py @@ -0,0 +1,223 @@ +""" +多智能体系统 - 与现有系统集成 +""" +import logging +from typing import Optional +from app.llm.factory import LLMFactory +from app.agent.tools.registry import ToolRegistry +from app.agent.memory.session import SessionManager + +from .types import create_initial_state +from .graph import create_multi_agent_graph + +logger = logging.getLogger(__name__) + + +class MultiAgentSystem: + """多智能体系统 - 集成现有组件""" + + def __init__( + self, + llm_provider: str = "openai", + openai_api_key: Optional[str] = None, + anthropic_api_key: Optional[str] = None, + max_iterations: int = 3, + max_tasks: int = 10 + ): + """ + 初始化多智能体系统 + + Args: + llm_provider: LLM 提供商 + openai_api_key: OpenAI API Key + anthropic_api_key: Anthropic API Key + max_iterations: 最大迭代次数 + max_tasks: 最大任务数 + """ + # 初始化 LLM Factory + self.llm_factory = LLMFactory( + provider=llm_provider, + openai_api_key=openai_api_key, + anthropic_api_key=anthropic_api_key + ) + + # 初始化 Tool Registry + self.tool_registry = ToolRegistry() + self._register_default_tools() + + # 初始化 Session Manager + self.session_manager = SessionManager() + + # 配置 + self.max_iterations = max_iterations + self.max_tasks = max_tasks + + # 图实例(延迟初始化) + self._graph = None + + def _register_default_tools(self): + """注册默认工具""" + try: + from app.agent.tools.impl import search, calculator, time_tool + + # 安全工具 + self.tool_registry.register( + name="search", + func=search.search_web, + description="Search the web for information", + security_level="safe" + ) + + self.tool_registry.register( + name="calculator", + func=calculator.calculate, + description="Perform mathematical calculations", + security_level="safe" + ) + + self.tool_registry.register( + name="get_current_time", + func=time_tool.get_current_time, + description="Get current date and time", + security_level="safe" + ) + + # 执行代码工具 + try: + from app.agent.tools.impl import sandbox + self.tool_registry.register( + name="execute_code", + func=sandbox.sandbox.execute, + description="Execute code in sandbox", + security_level="review", + require_approval=True + ) + except ImportError: + pass + + except ImportError as e: + logger.warning(f"Failed to import default tools: {e}") + + @property + def graph(self): + """获取或创建 LangGraph""" + if self._graph is None: + llm = self.llm_factory.get_llm() + self._graph = create_multi_agent_graph( + llm=llm, + tool_registry=self.tool_registry, + max_iterations=self.max_iterations, + max_tasks=self.max_tasks + ) + return self._graph + + async def execute(self, task: str, session_id: str = None) -> dict: + """ + 执行多 Agent 任务 + + Args: + task: 任务描述 + session_id: 会话 ID(可选) + + Returns: + dict: 执行结果 + """ + # 创建初始状态 + initial_state = create_initial_state(task, session_id) + + try: + # 执行图 + result = await self.graph.ainvoke(initial_state) + + # 保存到 session + if session_id: + self.session_manager.add_message(session_id, "user", task) + self.session_manager.add_message( + session_id, + "assistant", + result.get("final_output", "") + ) + + return { + "success": result.get("status") != "failed", + "output": result.get("final_output", ""), + "status": result.get("status", "unknown"), + "task_plan": result.get("task_plan", []), + "results": result.get("results", {}) + } + + except Exception as e: + logger.error(f"Multi-agent execution failed: {e}") + return { + "success": False, + "output": f"执行失败: {str(e)}", + "status": "failed", + "error": str(e) + } + + async def execute_simple(self, task: str, session_id: str = None) -> dict: + """ + 执行简单任务(不使用 Supervisor) + + Args: + task: 任务描述 + session_id: 会话 ID(可选) + + Returns: + dict: 执行结果 + """ + from .graph import create_simple_graph + + # 创建简单图 + llm = self.llm_factory.get_llm() + simple_graph = create_simple_graph(llm, self.tool_registry) + + # 创建初始状态 + initial_state = create_initial_state(task, session_id) + + try: + # 执行图 + result = await simple_graph.ainvoke(initial_state) + + return { + "success": True, + "output": result.get("final_output", ""), + "status": result.get("status", "completed") + } + + except Exception as e: + logger.error(f"Simple execution failed: {e}") + return { + "success": False, + "output": f"执行失败: {str(e)}", + "status": "failed", + "error": str(e) + } + + def list_tools(self) -> list: + """列出所有可用工具""" + return self.tool_registry.list_tools() + + +# 全局实例 +_global_system: Optional[MultiAgentSystem] = None + + +def get_multi_agent_system( + llm_provider: str = "openai", + openai_api_key: str = None, + anthropic_api_key: str = None, + **kwargs +) -> MultiAgentSystem: + """获取全局多智能体系统实例""" + global _global_system + + if _global_system is None: + _global_system = MultiAgentSystem( + llm_provider=llm_provider, + openai_api_key=openai_api_key, + anthropic_api_key=anthropic_api_key, + **kwargs + ) + + return _global_system diff --git a/agent/app/agent/multi/iteration.py b/agent/app/agent/multi/iteration.py new file mode 100644 index 0000000..3ae331f --- /dev/null +++ b/agent/app/agent/multi/iteration.py @@ -0,0 +1,117 @@ +""" +迭代控制器 +""" +from typing import Optional + + +class IterationController: + """迭代控制器 - 管理任务执行的迭代""" + + def __init__( + self, + max_iterations: int = 3, + max_retries_per_task: int = 2 + ): + """ + 初始化迭代控制器 + + Args: + max_iterations: 全局最大迭代次数 + max_retries_per_task: 每个任务的最大重试次数 + """ + self.max_iterations = max_iterations + self.max_retries_per_task = max_retries_per_task + + def should_continue( + self, + iteration: int, + task_status: str, + review_result: Optional[dict] = None + ) -> tuple[bool, str]: + """ + 判断是否继续迭代 + + Args: + iteration: 当前迭代次数 + task_status: 任务状态 + review_result: 评审结果(可选) + + Returns: + (是否继续, 原因) + """ + # 超过最大迭代次数 + if iteration >= self.max_iterations: + return False, "max_iterations_reached" + + # 任务成功完成 + if task_status == "completed": + if review_result and review_result.get("passed"): + return False, "task_completed" + elif review_result is None: + return False, "task_completed" + + # 任务失败且不可重试 + if task_status == "failed": + if review_result and not review_result.get("retryable", True): + return False, "task_failed_non_retryable" + + # 检查重试次数 + retry_count = review_result.get("retry_count", 0) if review_result else 0 + if retry_count >= self.max_retries_per_task: + return False, "max_retries_reached" + + # 需要重试 + if review_result: + issues = review_result.get("issues", []) + if issues and not review_result.get("passed", True): + return True, "needs_retry" + + return True, "continue" + + def get_next_action( + self, + review_result: Optional[dict], + current_worker: str + ) -> str: + """ + 确定下一步动作 + + Args: + review_result: 评审结果 + current_worker: 当前执行的 Worker + + Returns: + 下一个节点名称 + """ + if review_result is None: + return "supervisor" + + # 根据评审结果决定下一步 + if review_result.get("passed"): + return "supervisor" + + # 根据问题类型决定下一步 + issues = review_result.get("issues", []) + high_severity = any(i.get("severity") == "high" for i in issues) + + if high_severity: + # 严重问题,重新执行相同任务 + return current_worker + else: + # 轻微问题,返回 Supervisor + return "supervisor" + + def calculate_backoff_delay(self, retry_count: int) -> float: + """ + 计算退避延迟(指数退避) + + Args: + retry_count: 重试次数 + + Returns: + 延迟时间(秒) + """ + base_delay = 1.0 + max_delay = 30.0 + delay = min(base_delay * (2 ** retry_count), max_delay) + return delay diff --git a/agent/app/agent/multi/prompts.py b/agent/app/agent/multi/prompts.py new file mode 100644 index 0000000..75b1a73 --- /dev/null +++ b/agent/app/agent/multi/prompts.py @@ -0,0 +1,170 @@ +""" +多智能体系统 Prompt 模板 +""" + +# Supervisor System Prompt +SUPERVISOR_SYSTEM_PROMPT = """你是一个任务规划专家(Supervisor)。你的职责是将复杂任务分解为可执行的子任务,并分配给合适的执行 Agent。 + +## 可用的 Worker Agent +- **research**: 信息搜索和调研 +- **coder**: 代码编写、修改和调试 +- **review**: 结果检查、质量评审 + +## 任务 +{task} + +## 当前进度 +{progress} + +## 共享上下文 +{context} + +## 请按以下步骤执行 + +### 步骤 1: 任务分析 +分析任务的性质,确定需要哪些步骤来完成。 + +### 步骤 2: 任务分解 +将任务分解为独立的子任务。每个子任务应该: +- 描述清晰 +- 可以由单个 Agent 完成 +- 有明确的完成标准 + +### 步骤 3: 分配 Agent +为每个子任务选择最合适的执行 Agent。 + +### 步骤 4: 确定执行顺序 +如果有依赖关系,确定正确的执行顺序。 + +## 输出格式 +请以 JSON 格式输出你的决策,包含以下字段: +- analysis: 任务分析 +- task_plan: 任务计划数组,每个元素包含 id, description, assigned_agent +- need_aggregation: 是否需要汇总结果 +- next_worker: 下一个执行的 Worker 名称 (research/coder/review) + +## 注意 +- 如果任务很简单,可以只分配给一个 Agent +- 如果任务需要迭代优化,确保有 review 环节 +- 考虑任务之间的依赖关系 +- 使用 "research"/"coder"/"review" 作为 assigned_agent 的值 +""" + +# Review Worker System Prompt +REVIEW_SYSTEM_PROMPT = """你是一个代码和结果评审专家(Reviewer)。你的职责是检查任务执行结果是否符合要求。 + +## 原始任务 +{original_task} + +## 当前任务描述 +{task_description} + +## 执行结果 +{execution_result} + +## 共享上下文 +{context} + +## 检查标准 +1. 结果是否完整解决了原始任务? +2. 输出格式是否正确? +3. 是否存在明显的错误或遗漏? +4. 代码是否有潜在问题? +5. 是否有安全漏洞或风险? + +## 输出格式 +请以 JSON 格式输出评审结果: +- passed: true/false,是否通过 +- issues: 问题数组,每个包含 severity(high/medium/low) 和 description +- suggestions: 改进建议数组 +- retryable: true/false,是否可以重试 + +## 注意 +- 如果只有轻微问题,passed 可以为 true +- 如果有严重问题,passed 应为 false +- 判断是否需要重试,而不是立即失败 +""" + +# Research Worker System Prompt +RESEARCH_SYSTEM_PROMPT = """你是一个信息搜索和调研专家(Researcher)。你的职责是根据任务要求搜集和整理信息。 + +## 任务 +{task} + +## 共享上下文 +{context} + +## 请执行以下步骤 + +### 1. 理解任务 +明确需要搜集什么信息,信息的用途是什么。 + +### 2. 搜索信息 +使用可用工具搜索相关信息。 + +### 3. 整理结果 +将搜索结果整理成结构化的信息。 + +## 输出要求 +- 提供清晰、结构化的信息整理 +- 标注信息来源 +- 如果无法完成任务,说明原因 +""" + +# Coder Worker System Prompt +CODER_SYSTEM_PROMPT = """你是一个代码编写专家(Coder)。你的职责是根据任务要求编写和修改代码。 + +## 任务 +{task} + +## 共享上下文 +{context} + +## 请执行以下步骤 + +### 1. 理解需求 +明确需要编写什么代码,代码的用途和约束。 + +### 2. 编写代码 +使用合适的编程语言和框架编写代码。 + +### 3. 代码检查 +确保代码语法正确,逻辑合理。 + +## 输出要求 +- 提供完整的、可运行的代码 +- 包含必要的注释说明 +- 如果需要执行代码,使用代码执行工具 +""" + +# Aggregator System Prompt +AGGREGATOR_SYSTEM_PROMPT = """你是一个结果汇总专家(Aggregator)。你的职责是将多个子任务的结果汇总成最终输出。 + +## 原始任务 +{original_task} + +## 任务计划 +{task_plan} + +## 执行结果 +{results} + +## 共享上下文 +{context} + +## 请执行以下步骤 + +### 1. 分析结果 +分析每个子任务的执行结果。 + +### 2. 识别关键信息 +从结果中提取关键信息。 + +### 3. 汇总输出 +将所有结果整合成一个连贯的最终输出。 + +## 输出要求 +- 提供清晰、完整的最终结果 +- 标注每个部分的来源 +- 确保结果解决了原始任务 +""" diff --git a/agent/app/agent/multi/supervisor.py b/agent/app/agent/multi/supervisor.py new file mode 100644 index 0000000..b2ca338 --- /dev/null +++ b/agent/app/agent/multi/supervisor.py @@ -0,0 +1,262 @@ +""" +Supervisor Agent - 负责任务规划和分发 +""" +import json +import re +from typing import Optional +from langchain_core.language_models import BaseChatModel +from langchain_core.output_parsers import PydanticOutputParser +from langchain_core.messages import HumanMessage, SystemMessage + +from .types import AgentState, TaskItem, AgentType, SupervisorDecision +from .prompts import SUPERVISOR_SYSTEM_PROMPT + + +class SupervisorAgent: + """Supervisor Agent - 负责任务规划和分发""" + + def __init__( + self, + llm: BaseChatModel, + max_iterations: int = 3, + max_tasks: int = 10 + ): + self.llm = llm + self.max_iterations = max_iterations + self.max_tasks = max_tasks + + def create_node(self): + """创建 LangGraph 节点""" + return self._supervisor_node + + async def _supervisor_node(self, state: AgentState) -> dict: + """Supervisor 节点逻辑""" + + # 首次调用:分析任务并生成计划 + if not state.get("task_plan"): + decision = await self._plan_tasks( + task=state["original_task"], + progress="这是任务的开始", + context=state.get("shared_context", {}) + ) + + return { + "task_plan": decision.task_plan, + "next_node": decision.next_worker, + "current_task_index": 0, + "shared_context": { + **state.get("shared_context", {}), + "task_analysis": decision.analysis + } + } + + # 非首次调用:检查任务状态,决定下一步 + current_task_index = state.get("current_task_index", 0) + task_plan = state.get("task_plan", []) + + # 获取当前任务 + if current_task_index >= len(task_plan): + # 所有任务完成,进入汇总 + return { + "next_node": "aggregator", + "shared_context": state.get("shared_context", {}) + } + + current_task = task_plan[current_task_index] + + # 检查当前任务状态 + if current_task.status == "completed": + # 当前任务完成,检查是否还有更多任务 + if current_task_index + 1 < len(task_plan): + next_index = current_task_index + 1 + next_task = task_plan[next_index] + return { + "current_task_index": next_index, + "next_node": next_task.assigned_agent, + "iteration": state.get("iteration", 0), + "shared_context": state.get("shared_context", {}) + } + else: + # 所有任务完成,进入汇总 + return { + "next_node": "aggregator", + "shared_context": state.get("shared_context", {}) + } + + elif current_task.status == "failed": + # 任务失败,检查是否超过最大重试 + if current_task.retry_count >= self.max_iterations: + # 超过最大重试,进入汇总(标记失败) + return { + "next_node": "aggregator", + "status": "failed", + "shared_context": state.get("shared_context", {}) + } + else: + # 重试当前任务 + return { + "next_node": current_task.assigned_agent, + "iteration": state.get("iteration", 0) + 1, + "shared_context": state.get("shared_context", {}) + } + + elif current_task.status == "needs_retry": + # 需要重试(来自 review) + return { + "next_node": current_task.assigned_agent, + "iteration": state.get("iteration", 0) + 1, + "shared_context": state.get("shared_context", {}) + } + + # 默认继续执行 + return { + "next_node": state.get("next_node", "aggregator"), + "shared_context": state.get("shared_context", {}) + } + + async def _plan_tasks(self, task: str, progress: str, context: dict) -> SupervisorDecision: + """调用 LLM 生成任务计划""" + + # 格式化 prompt + context_str = json.dumps(context, ensure_ascii=False, indent=2) if context else "无" + prompt = SUPERVISOR_SYSTEM_PROMPT.format( + task=task, + progress=progress, + context=context_str + ) + + # 调用 LLM + response = await self.llm.ainvoke([ + SystemMessage(content=prompt), + HumanMessage(content="请分析任务并制定执行计划。") + ]) + + # 解析 LLM 输出 + decision = self._parse_response(response.content, task) + + return decision + + def _parse_response(self, response: str, original_task: str) -> SupervisorDecision: + """解析 LLM 响应为结构化决策""" + + try: + # 尝试提取 JSON + json_match = re.search(r'\{[\s\S]*\}', response) + if json_match: + data = json.loads(json_match.group()) + else: + raise ValueError("No JSON found") + + # 解析任务计划 + task_plan = [] + for i, item in enumerate(data.get("task_plan", [])): + task = TaskItem( + id=item.get("id", f"task_{i+1}"), + description=item.get("description", ""), + assigned_agent=AgentType(item.get("assigned_agent", "coder")), + status="pending" + ) + task_plan.append(task) + + # 确定下一个 Worker + next_worker = data.get("next_worker", "research") + if isinstance(next_worker, dict): + next_worker = next_worker.get("assigned_agent", "research") + + return SupervisorDecision( + analysis=data.get("analysis", "任务分析"), + task_plan=task_plan, + need_aggregation=data.get("need_aggregation", True), + next_worker=AgentType(next_worker) + ) + + except Exception as e: + # 解析失败,创建默认计划 + return self._create_default_plan(original_task) + + def _create_default_plan(self, task: str) -> SupervisorDecision: + """创建默认任务计划""" + + task_lower = task.lower() + + # 根据任务关键词判断 + if any(keyword in task_lower for keyword in ["搜索", "查找", "调研", "研究", "research", "search"]): + assigned_agent = AgentType.RESEARCH + elif any(keyword in task_lower for keyword in ["代码", "写", "开发", "code", "program", "写代码"]): + assigned_agent = AgentType.CODER + else: + assigned_agent = AgentType.CODER + + # 创建默认任务 + task_item = TaskItem( + id="task_1", + description=task, + assigned_agent=assigned_agent, + status="pending" + ) + + return SupervisorDecision( + analysis="简单任务,直接分配给合适的 Agent 执行", + task_plan=[task_item], + need_aggregation=True, + next_worker=assigned_agent + ) + + +class ResultAggregator: + """结果聚合器 - 汇总多个任务的结果""" + + def __init__(self, llm: BaseChatModel): + self.llm = llm + + def create_node(self): + """创建 LangGraph 节点""" + return self._aggregate_node + + async def _aggregate_node(self, state: AgentState) -> dict: + """聚合节点逻辑""" + + # 准备任务计划和结果 + task_plan = state.get("task_plan", []) + results = state.get("results", {}) + original_task = state.get("original_task", "") + + # 构建任务描述 + task_descriptions = [] + for task in task_plan: + task_descriptions.append(f"- {task.id}: {task.description} -> {task.status}") + + # 构建结果描述 + result_items = [] + for task_id, result in results.items(): + if isinstance(result, dict): + content = result.get("content", str(result)) + else: + content = str(result) + result_items.append(f"## {task_id}\n{content}") + + # 调用 LLM 汇总结果 + from .prompts import AGGREGATOR_SYSTEM_PROMPT + + context_str = json.dumps(state.get("shared_context", {}), ensure_ascii=False, indent=2) + + prompt = AGGREGATOR_SYSTEM_PROMPT.format( + original_task=original_task, + task_plan="\n".join(task_descriptions), + results="\n\n".join(result_items) if result_items else "无结果", + context=context_str + ) + + response = await self.llm.ainvoke([ + SystemMessage(content=prompt), + HumanMessage(content="请汇总以上结果,给出最终输出。") + ]) + + # 检查是否有失败的任务 + has_failed = any(task.status == "failed" for task in task_plan) + + return { + "final_output": response.content, + "status": "failed" if has_failed else "completed", + "next_node": "__end__" + } diff --git a/agent/app/agent/multi/types.py b/agent/app/agent/multi/types.py new file mode 100644 index 0000000..a448323 --- /dev/null +++ b/agent/app/agent/multi/types.py @@ -0,0 +1,93 @@ +""" +多智能体系统数据类型定义 +""" +from typing import TypedDict, Annotated, Optional, Literal +from operator import add +from pydantic import BaseModel, Field +from enum import Enum + + +class TaskStatus(str, Enum): + """任务状态""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + NEEDS_RETRY = "needs_retry" + + +class AgentType(str, Enum): + """Agent 类型""" + SUPERVISOR = "supervisor" + RESEARCH = "research" + CODER = "coder" + REVIEW = "review" + AGGREGATOR = "aggregator" + + +class TaskItem(BaseModel): + """单个任务项""" + id: str = Field(..., description="任务唯一标识") + description: str = Field(..., description="任务描述") + assigned_agent: AgentType = Field(..., description="分配的 Agent 类型") + status: TaskStatus = Field(default=TaskStatus.PENDING, description="任务状态") + result: Optional[dict] = Field(default=None, description="任务执行结果") + error: Optional[str] = Field(default=None, description="错误信息") + retry_count: int = Field(default=0, description="重试次数") + + +class SupervisorDecision(BaseModel): + """Supervisor 的结构化决策""" + analysis: str = Field(..., description="任务分析") + task_plan: list[TaskItem] = Field(..., description="任务计划") + need_aggregation: bool = Field(default=True, description="是否需要汇总") + next_worker: AgentType = Field(..., description="下一个执行的 Worker") + + +class ReviewResult(BaseModel): + """Review 结果""" + passed: bool = Field(..., description="是否通过") + issues: list[dict] = Field(default_factory=list, description="问题列表") + suggestions: list[str] = Field(default_factory=list, description="改进建议") + retryable: bool = Field(default=True, description="是否可重试") + + +class AgentState(TypedDict): + """贯穿整个图的 Agent 状态""" + # 用户输入 + original_task: str # 原始任务描述 + session_id: Optional[str] # 会话 ID + + # 任务规划 + task_plan: list[TaskItem] # 分解后的任务列表 + current_task_index: int # 当前执行的任务索引 + + # 执行结果 + results: dict # {task_id: result} + + # 流程控制 + iteration: int # 当前迭代次数 + next_node: str # 下一个节点名称 + + # 共享上下文 + shared_context: dict # Agent 间共享的数据 + + # 最终输出 + final_output: str + status: Literal["running", "completed", "failed"] # 运行状态 + + +def create_initial_state(task: str, session_id: str = None) -> AgentState: + """创建初始状态""" + return { + "original_task": task, + "session_id": session_id, + "task_plan": [], + "current_task_index": 0, + "results": {}, + "iteration": 0, + "next_node": "supervisor", + "shared_context": {}, + "final_output": "", + "status": "running" + } diff --git a/agent/app/agent/multi/workers/__init__.py b/agent/app/agent/multi/workers/__init__.py new file mode 100644 index 0000000..26e4538 --- /dev/null +++ b/agent/app/agent/multi/workers/__init__.py @@ -0,0 +1,14 @@ +""" +Worker Agents +""" +from .base import BaseWorker +from .research import ResearchWorker +from .coder import CoderWorker +from .review import ReviewWorker + +__all__ = [ + "BaseWorker", + "ResearchWorker", + "CoderWorker", + "ReviewWorker", +] diff --git a/agent/app/agent/multi/workers/base.py b/agent/app/agent/multi/workers/base.py new file mode 100644 index 0000000..b3e99c1 --- /dev/null +++ b/agent/app/agent/multi/workers/base.py @@ -0,0 +1,138 @@ +""" +Worker Agent 基类 +""" +import json +from abc import ABC, abstractmethod +from typing import Any, Optional +from langchain_core.language_models import BaseChatModel +from langchain_core.messages import HumanMessage, SystemMessage + +from ..types import AgentState, TaskItem, TaskStatus + + +class BaseWorker(ABC): + """Worker Agent 基类""" + + def __init__( + self, + llm: BaseChatModel, + name: str, + system_prompt: str, + tools: list = None, + tool_registry=None + ): + self.llm = llm + self.name = name + self.system_prompt = system_prompt + self.tools = tools or [] + self.tool_registry = tool_registry + + @abstractmethod + async def execute(self, task: TaskItem, context: dict) -> dict: + """ + 执行任务 + + Returns: + dict: { + "success": bool, + "content": str, + "context": dict, # 更新共享上下文 + "error": str (optional) + } + """ + pass + + def create_node(self): + """创建 LangGraph 节点""" + async def node(state: AgentState) -> dict: + task_index = state.get("current_task_index", 0) + task_plan = state.get("task_plan", []) + + if task_index >= len(task_plan): + return {"next_node": "aggregator"} + + task = task_plan[task_index] + shared_context = state.get("shared_context", {}) + + # 更新任务状态为 running + updated_plan = self._update_task_status(task_plan, task.id, TaskStatus.RUNNING) + + try: + # 执行任务 + result = await self.execute(task, shared_context) + + # 更新任务状态 + if result.get("success"): + updated_plan = self._update_task_status( + updated_plan, + task.id, + TaskStatus.COMPLETED, + result=result.get("content", "") + ) + else: + updated_plan = self._update_task_status( + updated_plan, + task.id, + TaskStatus.FAILED, + error=result.get("error", "Unknown error") + ) + + # 构建新上下文 + new_context = {**shared_context, **(result.get("context", {}))} + + return { + "task_plan": updated_plan, + "results": {**state.get("results", {}), task.id: result}, + "shared_context": new_context, + "next_node": "review" + } + + except Exception as e: + # 执行出错 + updated_plan = self._update_task_status( + updated_plan, + task.id, + TaskStatus.FAILED, + error=str(e) + ) + + return { + "task_plan": updated_plan, + "results": {**state.get("results", {}), task.id: {"error": str(e)}}, + "next_node": "review" + } + + return node + + def _update_task_status( + self, + tasks: list, + task_id: str, + status: TaskStatus, + result: Any = None, + error: str = None + ) -> list: + """更新任务状态""" + return [ + { + **task.model_dump() if hasattr(task, 'model_dump') else task, + "status": status, + "result": result, + "error": error + } + for task in tasks + ] + + def _build_messages(self, task: str, context: dict) -> list: + """构建消息列表""" + context_str = json.dumps(context, ensure_ascii=False, indent=2) if context else "无" + + user_prompt = self.system_prompt.format( + task=task, + context=context_str + ) + + return [ + SystemMessage(content=user_prompt), + HumanMessage(content=task) + ] diff --git a/agent/app/agent/multi/workers/coder.py b/agent/app/agent/multi/workers/coder.py new file mode 100644 index 0000000..2f1df3b --- /dev/null +++ b/agent/app/agent/multi/workers/coder.py @@ -0,0 +1,146 @@ +""" +Coder Worker - 代码编写和修改 +""" +import json +from langchain_core.language_models import BaseChatModel + +from .base import BaseWorker +from ..types import TaskItem +from ..prompts import CODER_SYSTEM_PROMPT + + +class CoderWorker(BaseWorker): + """Coder Worker - 代码编写和修改""" + + def __init__( + self, + llm: BaseChatModel, + tool_registry=None, + tools: list = None + ): + super().__init__( + llm=llm, + name="coder", + system_prompt=CODER_SYSTEM_PROMPT, + tools=tools or [], + tool_registry=tool_registry + ) + + async def execute(self, task: TaskItem, context: dict) -> dict: + """执行编码任务""" + + # 构建消息 + messages = self._build_messages(task.description, context) + + # 如果有代码执行工具,启用它 + if self.tool_registry: + tool_defs = self._get_available_tools() + if tool_defs: + try: + response = await self.llm.agenerate( + messages=messages, + tools=tool_defs + ) + return self._handle_tool_response(response, messages) + except Exception: + # 如果工具调用失败,回退到普通调用 + pass + + # 普通调用 + try: + response = await self.llm.ainvoke(messages) + + content = response.content if hasattr(response, 'content') else str(response) + + return { + "success": True, + "content": content, + "context": { + "code_written": True, + "last_coder": self.name + } + } + + except Exception as e: + return { + "success": False, + "content": "", + "error": str(e), + "context": {} + } + + def _get_available_tools(self) -> list: + """获取可用工具定义""" + if not self.tool_registry: + return [] + + tool_names = self.tools or ["search", "execute_code"] + tool_defs = [] + + for tool_name in tool_names: + tool_def = self.tool_registry.get_tool_definition(tool_name) + if tool_def: + tool_defs.append(tool_def) + + return tool_defs + + def _handle_tool_response(self, response, original_messages: list) -> dict: + """处理工具调用响应""" + # 简化实现 + response_message = response.generations[0][0] + + if hasattr(response_message, "tool_calls") and response_message.tool_calls: + # 有工具调用 + tool_results = [] + for tool_call in response_message.tool_calls: + tool_name = tool_call.name + tool_args = tool_call.arguments + + # 执行工具 + try: + tool_func, _ = self.tool_registry.get_tool(tool_name) + result = tool_func(**tool_args) + tool_results.append({ + "tool": tool_name, + "result": str(result) + }) + except Exception as e: + tool_results.append({ + "tool": tool_name, + "error": str(e) + }) + + # 将工具结果添加到消息 + for msg in response.generations[0]: + original_messages.append(msg) + + for tool_result in tool_results: + original_messages.append({ + "role": "tool", + "content": json.dumps(tool_result, ensure_ascii=False) + }) + + # 再次调用 LLM 生成最终响应 + final_response = await self.llm.ainvoke(original_messages) + content = final_response.content if hasattr(final_response, 'content') else str(final_response) + + return { + "success": True, + "content": content, + "context": { + "code_written": True, + "tool_results": tool_results, + "last_coder": self.name + } + } + else: + # 无工具调用 + content = response_message.text if hasattr(response_message, 'text') else str(response_message) + return { + "success": True, + "content": content, + "context": { + "code_written": True, + "last_coder": self.name + } + } diff --git a/agent/app/agent/multi/workers/research.py b/agent/app/agent/multi/workers/research.py new file mode 100644 index 0000000..94b74e5 --- /dev/null +++ b/agent/app/agent/multi/workers/research.py @@ -0,0 +1,70 @@ +""" +Research Worker - 信息搜索和调研 +""" +import json +from langchain_core.language_models import BaseChatModel + +from .base import BaseWorker +from ..types import TaskItem +from ..prompts import RESEARCH_SYSTEM_PROMPT + + +class ResearchWorker(BaseWorker): + """Research Worker - 信息搜索和调研""" + + def __init__( + self, + llm: BaseChatModel, + tool_registry=None, + tools: list = None + ): + super().__init__( + llm=llm, + name="research", + system_prompt=RESEARCH_SYSTEM_PROMPT, + tools=tools or [], + tool_registry=tool_registry + ) + + async def execute(self, task: TaskItem, context: dict) -> dict: + """执行调研任务""" + + # 构建消息 + messages = self._build_messages(task.description, context) + + try: + # 调用 LLM + response = await self.llm.ainvoke(messages) + + content = response.content if hasattr(response, 'content') else str(response) + + # 尝试提取搜索结果 + search_results = self._extract_search_results(content) + + return { + "success": True, + "content": content, + "context": { + "research_results": search_results, + "last_research_by": self.name + } + } + + except Exception as e: + return { + "success": False, + "content": "", + "error": str(e), + "context": {} + } + + def _extract_search_results(self, content: str) -> list: + """从内容中提取搜索结果""" + # 简单实现:查找以 - 或 * 开头的行 + results = [] + for line in content.split('\n'): + line = line.strip() + if line.startswith(('- ', '* ', '1. ', '2. ', '3. ')): + results.append(line.lstrip('-*123. ')) + + return results[:10] # 限制数量 diff --git a/agent/app/agent/multi/workers/review.py b/agent/app/agent/multi/workers/review.py new file mode 100644 index 0000000..c62c331 --- /dev/null +++ b/agent/app/agent/multi/workers/review.py @@ -0,0 +1,174 @@ +""" +Review Worker - 结果检查和质量评审 +""" +import json +import re +from langchain_core.language_models import BaseChatModel +from langchain_core.messages import HumanMessage, SystemMessage + +from .base import BaseWorker +from ..types import AgentState, TaskItem, TaskStatus, ReviewResult +from ..prompts import REVIEW_SYSTEM_PROMPT + + +class ReviewWorker(BaseWorker): + """Review Worker - 结果检查和质量评审""" + + def __init__( + self, + llm: BaseChatModel, + tool_registry=None, + tools: list = None + ): + super().__init__( + llm=llm, + name="review", + system_prompt=REVIEW_SYSTEM_PROMPT, + tools=tools or [], + tool_registry=tool_registry + ) + + async def execute(self, task: TaskItem, context: dict) -> dict: + """执行评审任务""" + + # 获取当前任务索引和任务计划 + # 注意:这里需要从 context 中获取更多信息 + + # 构建 prompt + context_str = json.dumps(context, ensure_ascii=False, indent=2) if context else "无" + + prompt = REVIEW_SYSTEM_PROMPT.format( + original_task=context.get("original_task", ""), + task_description=task.description, + execution_result=task.result if task.result else "无结果", + context=context_str + ) + + try: + # 调用 LLM 进行评审 + response = await self.llm.ainvoke([ + SystemMessage(content=prompt), + HumanMessage(content="请评审以上执行结果。") + ]) + + # 解析评审结果 + review_result = self._parse_review_response(response.content) + + # 根据评审结果决定下一步 + if review_result.passed: + # 通过,更新任务状态为 completed + new_status = TaskStatus.COMPLETED + next_node = "supervisor" # 返回 Supervisor 继续执行 + else: + # 未通过,检查是否可重试 + if review_result.retryable: + new_status = TaskStatus.NEEDS_RETRY + next_node = "supervisor" # 返回 Supervisor 决定是否重试 + else: + new_status = TaskStatus.FAILED + next_node = "aggregator" # 失败,进入汇总 + + return { + "success": review_result.passed, + "content": response.content, + "review_result": review_result.model_dump() if hasattr(review_result, 'model_dump') else dict(review_result), + "context": { + "review_passed": review_result.passed, + "issues": review_result.issues, + "last_review_by": self.name + } + } + + except Exception as e: + return { + "success": False, + "content": "", + "error": str(e), + "context": {} + } + + def _parse_review_response(self, response: str) -> ReviewResult: + """解析评审响应""" + try: + # 尝试提取 JSON + json_match = re.search(r'\{[\s\S]*\}', response) + if json_match: + data = json.loads(json_match.group()) + else: + raise ValueError("No JSON found") + + return ReviewResult( + passed=data.get("passed", True), + issues=data.get("issues", []), + suggestions=data.get("suggestions", []), + retryable=data.get("retryable", True) + ) + + except Exception: + # 解析失败,默认通过 + return ReviewResult( + passed=True, + issues=[], + suggestions=[], + retryable=True + ) + + def create_node(self): + """创建 LangGraph 节点""" + async def node(state: AgentState) -> dict: + task_index = state.get("current_task_index", 0) + task_plan = state.get("task_plan", []) + + if task_index >= len(task_plan): + return {"next_node": "aggregator"} + + task = task_plan[task_index] + shared_context = { + **state.get("shared_context", {}), + "original_task": state.get("original_task", "") + } + + try: + # 执行评审 + result = await self.execute(task, shared_context) + + # 更新任务状态 + review_passed = result.get("review_result", {}).get("passed", True) + retryable = result.get("review_result", {}).get("retryable", True) + + if review_passed: + updated_status = TaskStatus.COMPLETED + elif retryable: + updated_status = TaskStatus.NEEDS_RETRY + else: + updated_status = TaskStatus.FAILED + + updated_plan = self._update_task_status( + task_plan, + task.id, + updated_status, + result=task.result + ) + + # 确定下一步 + if updated_status == TaskStatus.COMPLETED: + next_node = "supervisor" + elif updated_status == TaskStatus.NEEDS_RETRY: + next_node = "supervisor" + else: + next_node = "aggregator" + + return { + "task_plan": updated_plan, + "results": {**state.get("results", {}), f"{task.id}_review": result}, + "shared_context": {**shared_context, **result.get("context", {})}, + "next_node": next_node + } + + except Exception as e: + return { + "next_node": "aggregator", + "results": {**state.get("results", {}), f"{task.id}_review": {"error": str(e)}} + } + + return node diff --git a/multi_agent_plan/implementation_plan.md b/multi_agent_plan/implementation_plan.md new file mode 100644 index 0000000..94899ec --- /dev/null +++ b/multi_agent_plan/implementation_plan.md @@ -0,0 +1,709 @@ +# 多智能体联动系统实现计划 + +## 项目概述 + +基于 LangGraph 实现类似 OpenClaw 的多智能体协作系统,采用 Supervisor + Workers 层级架构。 + +### 核心特性 +- **任务规划**: Supervisor 分析任务并生成执行计划 +- **动态分发**: LLM 自主决策调用哪个 Worker +- **并行执行**: 支持多个 Worker 同时处理任务 +- **结果汇总**: Supervisor 汇总所有 Worker 结果 +- **迭代优化**: 支持 Review 机制和迭代重试 + +--- + +## 一、系统架构 + +### 1.1 整体架构图 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ MultiAgentSystem │ +│ ┌───────────────────────────────────────────────────────────┐ │ +│ │ Supervisor Agent │ │ +│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ +│ │ │ Planner │ │ Dispatcher │ │ Aggregator │ │ │ +│ │ │ (任务规划) │ │ (任务分发) │ │ (结果汇总) │ │ │ +│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ +│ └───────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌──────────────────┼──────────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Research │ │ Coder │ │ Review │ │ +│ │ Worker │ │ Worker │ │ Worker │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ │ │ │ +│ └──────────────────┴──────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────┐ │ +│ │ Shared State │ │ +│ │ (共享状态) │ │ +│ └─────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 1.2 核心组件 + +| 组件 | 职责 | 文件位置 | +|------|------|----------| +| **SupervisorAgent** | 任务分析、规划、分发、汇总 | `agent/multi/supervisor.py` | +| **BaseWorker** | Worker 基类,定义执行接口 | `agent/multi/workers/base.py` | +| **ResearchWorker** | 信息搜索和调研 | `agent/multi/workers/research.py` | +| **CoderWorker** | 代码编写和修改 | `agent/multi/workers/coder.py` | +| **ReviewWorker** | 结果检查和评审 | `agent/multi/workers/review.py` | +| **SharedState** | 跨 Agent 共享状态 | `agent/multi/state.py` | +| **TaskQueue** | 任务队列管理 | `agent/multi/queue.py` | +| **MultiAgentGraph** | LangGraph 流程编排 | `agent/multi/graph.py` | + +--- + +## 二、数据结构设计 + +### 2.1 Agent State 定义 + +```python +# agent/multi/types.py +from typing import TypedDict, Annotated, Optional +from operator import add +from pydantic import BaseModel + + +class TaskItem(BaseModel): + """单个任务项""" + id: str + description: str + assigned_agent: str # research / coder / review + status: str # pending / running / completed / failed + result: Optional[dict] = None + error: Optional[str] = None + retry_count: int = 0 + + +class AgentState(TypedDict): + """贯穿整个图的 Agent 状态""" + # 用户输入 + original_task: str # 原始任务描述 + + # 任务规划 + task_plan: list[TaskItem] # 分解后的任务列表 + current_task_index: int # 当前执行的任务索引 + + # 执行结果 + results: dict # {task_id: result} + + # 流程控制 + iteration: int # 当前迭代次数 + next_node: str # 下一个节点名称 + + # 共享上下文 + shared_context: dict # Agent 间共享的数据 + + # 最终输出 + final_output: str + status: str # running / completed / failed +``` + +### 2.2 Supervisor 输出结构 + +```python +# Supervisor 的结构化输出 +class SupervisorDecision(BaseModel): + """Supervisor 的决策""" + analysis: str # 任务分析 + task_plan: list[TaskItem] # 任务计划 + need_aggregation: bool # 是否需要汇总 + next_worker: str # 下一个执行的 Worker +``` + +--- + +## 三、核心实现 + +### 3.1 Supervisor Agent + +```python +# agent/multi/supervisor.py +from langchain_core.language_models import BaseChatModel +from langchain_core.output_parsers import PydanticOutputParser +from pydantic import BaseModel +from typing import Type + +from .types import AgentState, TaskItem, SupervisorDecision +from .prompts import SUPERVISOR_SYSTEM_PROMPT + + +class SupervisorAgent: + """Supervisor Agent - 负责任务规划和分发""" + + def __init__( + self, + llm: BaseChatModel, + max_iterations: int = 3 + ): + self.llm = llm + self.max_iterations = max_iterations + self.output_parser = PydanticOutputParser(pydantic_object=SupervisorDecision) + + def create_node(self): + """创建 Supervisor 节点""" + return self._supervisor_node + + async def _supervisor_node(self, state: AgentState) -> dict: + """Supervisor 节点逻辑""" + # 首次调用:分析任务并生成计划 + if not state.get("task_plan"): + decision = await self._plan_tasks(state["original_task"]) + return { + "task_plan": decision.task_plan, + "next_node": decision.next_worker, + "current_task_index": 0, + "shared_context": {"task_analysis": decision.analysis} + } + + # 检查是否需要继续 + current_task = state["task_plan"][state["current_task_index"]] + + if current_task["status"] == "completed": + # 当前任务完成,检查是否还有更多任务 + if state["current_task_index"] + 1 < len(state["task_plan"]): + next_index = state["current_task_index"] + 1 + next_task = state["task_plan"][next_index] + return { + "current_task_index": next_index, + "next_node": next_task["assigned_agent"] + } + else: + # 所有任务完成,进入汇总 + return {"next_node": "aggregate"} + + elif current_task["status"] == "failed": + # 任务失败,检查是否超过最大重试 + if current_task["retry_count"] >= self.max_iterations: + return {"next_node": "aggregate", "status": "failed"} + else: + # 重试 + return {"next_node": current_task["assigned_agent"]} + + return {"next_node": state.get("next_node", "aggregate")} + + async def _plan_tasks(self, task: str) -> SupervisorDecision: + """调用 LLM 生成任务计划""" + prompt = SUPERVISOR_SYSTEM_PROMPT.format(task=task) + + response = await self.llm.ainvoke([ + {"role": "system", "content": prompt}, + {"role": "user", "content": "请分析任务并制定执行计划。"} + ]) + + # 解析 LLM 输出为结构化决策 + # ... (实现解析逻辑) + return decision +``` + +### 3.2 Worker 基类 + +```python +# agent/multi/workers/base.py +from abc import ABC, abstractmethod +from typing import Any +from langchain_core.language_models import BaseChatModel + +from ..types import AgentState + + +class BaseWorker(ABC): + """Worker Agent 基类""" + + def __init__( + self, + llm: BaseChatModel, + name: str, + system_prompt: str, + tools: list = None + ): + self.llm = llm + self.name = name + self.system_prompt = system_prompt + self.tools = tools or [] + + @abstractmethod + async def execute(self, task: TaskItem, context: dict) -> dict: + """执行任务""" + pass + + def create_node(self): + """创建 LangGraph 节点""" + async def node(state: AgentState) -> dict: + task = state["task_plan"][state["current_task_index"]] + result = await self.execute(task, state.get("shared_context", {})) + + # 更新状态 + return { + "results": {task.id: result}, + "task_plan": self._update_task_status( + state["task_plan"], + task.id, + "completed" if result.get("success") else "failed" + ), + "shared_context": {**state.get("shared_context", {}), **result.get("context", {})} + } + + return node + + def _update_task_status(self, tasks: list, task_id: str, status: str) -> list: + """更新任务状态""" + return [ + {**task, "status": status} if task["id"] == task_id else task + for task in tasks + ] +``` + +### 3.3 任务队列(可选:支持并行执行) + +```python +# agent/multi/queue.py +import asyncio +from typing import Any, Callable +from dataclasses import dataclass +from enum import Enum + + +class TaskStatus(Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class QueuedTask: + id: str + agent_name: str + task_data: Any + status: TaskStatus = TaskStatus.PENDING + result: Any = None + error: str = None + + +class TaskQueue: + """任务队列 - 支持并行执行多个 Worker""" + + def __init__(self, max_concurrent: int = 3): + self.max_concurrent = max_concurrent + self.queue: asyncio.Queue = asyncio.Queue() + self.results: dict = {} + self._running = 0 + + async def add_task(self, task: QueuedTask): + """添加任务到队列""" + await self.queue.put(task) + + async def execute_all(self, worker_factory: Callable): + """执行所有任务""" + async def worker(): + while True: + try: + task = self.queue.get_nowait() + except asyncio.QueueEmpty: + break + + self._running += 1 + task.status = TaskStatus.Running + + try: + worker_instance = worker_factory(task.agent_name) + task.result = await worker_instance.execute(task.task_data) + task.status = TaskStatus.COMPLETED + except Exception as e: + task.status = TaskStatus.FAILED + task.error = str(e) + finally: + self._running -= 1 + self.results[task.id] = task + + # 启动多个 worker 协程 + workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)] + await asyncio.gather(*workers) + + return self.results +``` + +### 3.4 LangGraph 流程编排 + +```python +# agent/multi/graph.py +from langgraph.graph import StateGraph, END +from langgraph.prebuilt import ToolNode + +from .types import AgentState +from .supervisor import SupervisorAgent +from .workers.research import ResearchWorker +from .workers.coder import CoderWorker +from .workers.review import ReviewWorker +from .aggregator import ResultAggregator + + +def create_multi_agent_graph( + llm, + tool_registry, + max_iterations: int = 3 +) -> StateGraph: + """创建多 Agent 流程图""" + + # 初始化组件 + supervisor = SupervisorAgent(llm, max_iterations) + research_worker = ResearchWorker(llm, tool_registry) + coder_worker = CoderWorker(llm, tool_registry) + review_worker = ReviewWorker(llm, tool_registry) + aggregator = ResultAggregator(llm) + + # 创建图 + graph = StateGraph(AgentState) + + # 添加节点 + graph.add_node("supervisor", supervisor.create_node()) + graph.add_node("research", research_worker.create_node()) + graph.add_node("coder", coder_worker.create_node()) + graph.add_node("review", review_worker.create_node()) + graph.add_node("aggregate", aggregator.create_node()) + + # 设置入口 + graph.set_entry_point("supervisor") + + # 添加边 + graph.add_edge("supervisor", "research") + graph.add_edge("research", "review") + graph.add_edge("coder", "review") + + # 条件边:从 review 回到 supervisor + def should_continue(state: AgentState) -> str: + if state.get("status") == "failed": + return "aggregate" + if state.get("iteration", 0) >= max_iterations: + return "aggregate" + if state.get("current_task_index", 0) >= len(state.get("task_plan", [])): + return "aggregate" + return "supervisor" + + graph.add_conditional_edges( + "review", + should_continue, + { + "supervisor": "supervisor", + "aggregate": "aggregate" + } + ) + + # 结束节点 + graph.add_edge("aggregate", END) + + return graph.compile() +``` + +--- + +## 四、Prompt 设计 + +### 4.1 Supervisor System Prompt + +```python +# agent/multi/prompts.py + +SUPERVISOR_SYSTEM_PROMPT = """你是一个任务规划专家(Supervisor)。你的职责是将复杂任务分解为可执行的子任务,并分配给合适的执行 Agent。 + +## 可用的 Worker Agent +- **research**: 信息搜索和调研 +- **coder**: 代码编写、修改和调试 +- **review**: 结果检查、质量评审 + +## 任务 +{task} + +## 请按以下步骤执行 + +### 步骤 1: 任务分析 +分析任务的性质,确定需要哪些步骤来完成。 + +### 步骤 2: 任务分解 +将任务分解为独立的子任务。每个子任务应该: +- 描述清晰 +- 可以由单个 Agent 完成 +- 有明确的完成标准 + +### 步骤 3: 分配 Agent +为每个子任务选择最合适的执行 Agent。 + +### 步骤 4: 确定执行顺序 +如果有依赖关系,确定正确的执行顺序。 + +## 输出格式 +请以 JSON 格式输出你的决策: +```json +{{ + "analysis": "任务分析...", + "task_plan": [ + {{ + "id": "task_1", + "description": "子任务描述", + "assigned_agent": "research" + }}, + {{ + "id": "task_2", + "description": "子任务描述", + "assigned_agent": "coder" + }} + ], + "need_aggregation": true, + "next_worker": "research" +}} +``` + +## 注意 +- 如果任务很简单,可以只分配给一个 Agent +- 如果任务需要迭代优化,确保有 review 环节 +- 考虑任务之间的依赖关系 +""" +``` + +### 4.2 Review Worker Prompt + +```python +REVIEW_SYSTEM_PROMPT = """你是一个代码和结果评审专家(Reviewer)。你的职责是检查任务执行结果是否符合要求。 + +## 任务描述 +{task_description} + +## 执行结果 +{execution_result} + +## 检查标准 +1. 结果是否完整解决了原始任务? +2. 输出格式是否正确? +3. 是否存在明显的错误或遗漏? +4. 代码是否有潜在问题? + +## 请输出评审结果 +```json +{{ + "passed": true/false, + "issues": [ + {{"severity": "high/medium/low", "description": "问题描述"}} + ], + "suggestions": ["改进建议"] +}} +``` +""" +``` + +--- + +## 五、迭代控制 + +### 5.1 迭代逻辑 + +```python +# agent/multi/iteration.py +from typing import Optional + + +class IterationController: + """迭代控制器""" + + def __init__(self, max_iterations: int = 3): + self.max_iterations = max_iterations + + def should_continue( + self, + iteration: int, + task_status: str, + review_result: dict + ) -> tuple[bool, str]: + """ + 判断是否继续迭代 + + Returns: + (是否继续, 原因) + """ + # 超过最大迭代次数 + if iteration >= self.max_iterations: + return False, "max_iterations_reached" + + # 任务成功完成 + if task_status == "completed" and review_result.get("passed"): + return False, "task_completed" + + # 任务失败且不可重试 + if task_status == "failed" and not review_result.get("retryable"): + return False, "task_failed" + + # 需要重试 + if review_result.get("issues") and review_result.get("passed") is False: + return True, "needs_retry" + + return True, "continue" + + def get_next_action( + self, + review_result: dict, + current_worker: str + ) -> str: + """确定下一步动作""" + if review_result.get("passed"): + return "supervisor" # 返回 Supervisor + + # 根据问题类型决定下一步 + issues = review_result.get("issues", []) + high_severity = any(i.get("severity") == "high" for i in issues) + + if high_severity: + # 严重问题,重新执行相同任务 + return current_worker + else: + # 轻微问题,可以继续 + return "supervisor" +``` + +--- + +## 六、与现有系统集成 + +### 6.1 复用现有组件 + +```python +# agent/multi/integration.py +from app.agent.core.agent import AgentManager +from app.agent.tools.registry import ToolRegistry +from app.agent.memory.session import SessionManager +from app.llm.factory import LLMFactory + + +class MultiAgentSystem: + """多智能体系统 - 集成现有组件""" + + def __init__(self, config: dict): + # 复用现有 LLM Factory + self.llm_factory = LLMFactory( + provider=config.get("llm_provider", "openai"), + openai_api_key=config.get("openai_api_key"), + anthropic_api_key=config.get("anthropic_api_key") + ) + + # 复用现有 Tool Registry + self.tool_registry = ToolRegistry() + self._register_default_tools() + + # 复用现有 Session Manager + self.session_manager = SessionManager() + + # 配置 + self.max_iterations = config.get("max_iterations", 3) + + def _register_default_tools(self): + """注册默认工具""" + # 从现有 Agent 复制工具注册逻辑 + from app.agent.tools.impl import search, calculator + self.tool_registry.register( + name="search", + func=search.search_web, + description="Search the web", + security_level="safe" + ) + # ... 其他工具 + + async def execute(self, task: str, session_id: str = None) -> dict: + """执行多 Agent 任务""" + # 创建 LangGraph + from .graph import create_multi_agent_graph + + llm = self.llm_factory.get_llm() + graph = create_multi_agent_graph( + llm=llm, + tool_registry=self.tool_registry, + max_iterations=self.max_iterations + ) + + # 初始化状态 + initial_state = { + "original_task": task, + "task_plan": [], + "current_task_index": 0, + "results": {}, + "iteration": 0, + "next_node": "supervisor", + "shared_context": {}, + "final_output": "", + "status": "running" + } + + # 执行 + result = await graph.ainvoke(initial_state) + + # 保存到 session + if session_id: + self.session_manager.add_message(session_id, "user", task) + self.session_manager.add_message(session_id, "assistant", result["final_output"]) + + return result +``` + +--- + +## 七、文件结构 + +``` +agent/ +├── __init__.py +├── multi/ +│ ├── __init__.py +│ ├── types.py # 数据类型定义 +│ ├── prompts.py # Prompt 模板 +│ ├── supervisor.py # Supervisor Agent +│ ├── graph.py # LangGraph 流程图 +│ ├── iteration.py # 迭代控制 +│ ├── integration.py # 与现有系统集成 +│ ├── queue.py # 任务队列(可选) +│ └── workers/ +│ ├── __init__.py +│ ├── base.py # Worker 基类 +│ ├── research.py # Research Worker +│ ├── coder.py # Coder Worker +│ └── review.py # Review Worker +``` + +--- + +## 八、实现顺序 + +1. **Phase 1: 基础架构** + - 定义数据类型 (types.py) + - 创建 Prompt 模板 (prompts.py) + +2. **Phase 2: Supervisor** + - 实现 SupervisorAgent + - 实现任务规划和分发逻辑 + +3. **Phase 3: Workers** + - 实现 BaseWorker + - 实现 ResearchWorker + - 实现 CoderWorker + - 实现 ReviewWorker + +4. **Phase 4: 流程编排** + - 实现 LangGraph 流程图 + - 添加条件边和迭代控制 + +5. **Phase 5: 集成** + - 与现有 Agent 系统集成 + - 添加 API 接口 + +--- + +## 九、测试计划 + +1. **单元测试**: 测试各 Worker 的执行逻辑 +2. **集成测试**: 测试完整的 Supervisor + Workers 流程 +3. **迭代测试**: 测试重试和迭代逻辑 +4. **端到端测试**: 模拟真实任务执行 diff --git a/multi_agent_plan/notes.md b/multi_agent_plan/notes.md new file mode 100644 index 0000000..f133b9e --- /dev/null +++ b/multi_agent_plan/notes.md @@ -0,0 +1,107 @@ +# Notes: LangGraph 多智能体研究 + +## 核心概念 + +### LangGraph 基础 +- **StateGraph**: 有向无环图(DAG),节点是 Agent/函数,边是流转逻辑 +- **State**: 贯穿整个图流动的状态对象 +- **Node**: 执行单元(可以是 Agent、函数、条件判断) +- **Edge**: 连接节点的边,支持条件边(conditional edges) + +### Supervisor + Workers 模式参考 + +#### 1. LangChain 官方 Supervisor 示例 +```python +from langgraph.prebuilt import create_react_agent +from langgraph.graph import StateGraph, END + +# 定义 Workers +research_agent = create_react_agent(llm, tools=[search]) +coder_agent = create_react_agent(llm, tools=[write_file]) + +# 定义 Supervisor 节点 +def supervisor_node(state): + # LLM 决定下一步调用哪个 Agent + response = llm.with_structured_output(SupervisorOutput).invoke( + [SystemMessage(content=SUPERVISOR_PROMPT)] + state["messages"] + ) + return {"next": response.next_agent} + +# 构建图 +graph = StateGraph(AgentState) +graph.add_node("supervisor", supervisor_node) +graph.add_node("research", research_agent) +graph.add_node("code", coder_agent) +``` + +#### 2. 状态定义 +```python +from typing import TypedDict, Annotated +import operator + +class AgentState(TypedDict): + messages: Annotated[list, operator.add] + task: str + plan: list + results: dict + iteration: int + next: str # 控制下一步流向 +``` + +#### 3. 条件边实现 +```python +def should_continue(state): + if state["iteration"] >= MAX_ITERATIONS: + return "end" + if state.get("task_complete"): + return "end" + return "continue" + +graph.add_conditional_edges( + "review", + should_continue, + { + "continue": "supervisor", + "end": END + } +) +``` + +## 设计决策 + +### 架构优势 +1. **清晰的分层**: Supervisor 负责任务规划,Workers 负责执行 +2. **可扩展**: 容易添加新的 Worker 类型 +3. **可控**: 迭代次数全局配置 +4. **灵活**: 支持条件分支和循环 + +### 需要解决的问题 +1. **Supervisor 如何做规划**: 需要设计 prompt 让 LLM 生成任务列表 +2. **任务队列**: 需要支持并行分发多个 Worker +3. **共享上下文**: 需要设计数据结构在 Agent 间共享状态 +4. **Review 机制**: 需要定义检查标准和重试逻辑 + +## 关键 Prompt 设计 + +### Supervisor System Prompt +``` +你是一个任务规划专家(Supervisor)。用户的任务是:{task} + +请按以下步骤执行: +1. 分析任务需求和约束 +2. 将任务分解为可执行的子任务 +3. 为每个子任务选择合适的执行 Agent: + - research: 信息搜索和调研 + - coder: 代码编写和修改 + - review: 结果检查和评审 +4. 确定执行顺序和依赖关系 + +当前任务进度:{progress} +共享上下文:{context} + +请输出你的决策,格式如下: +- 需要执行的子任务列表 +- 每个任务的执行 Agent +- 任务执行顺序 +- 是否需要汇总结果 +``` diff --git a/multi_agent_plan/task_plan.md b/multi_agent_plan/task_plan.md new file mode 100644 index 0000000..d567018 --- /dev/null +++ b/multi_agent_plan/task_plan.md @@ -0,0 +1,33 @@ +# Task Plan: 多智能体联动系统实现计划 + +## Goal +基于 LangGraph 实现类似 OpenClaw 的多智能体联动系统,支持任务规划、动态分发、结果汇总和迭代优化。 + +## Phases +- [x] Phase 1: 系统架构设计和核心组件规划 +- [ ] Phase 2: Supervisor Agent 实现 +- [ ] Phase 3: Worker Agent 实现 +- [ ] Phase 4: 任务队列和共享上下文实现 +- [ ] Phase 5: State Machine 流程控制实现 +- [ ] Phase 6: 迭代控制和 Review 机制实现 +- [ ] Phase 7: 与现有 Agent 系统集成 + +## Key Questions +1. 如何用 LangGraph 实现 Supervisor + Workers 架构? +2. 如何设计任务队列支持并行执行? +3. 如何实现共享上下文在 Agent 间传递? +4. 如何控制迭代次数和流程分支? + +## Decisions Made +- 架构:Supervisor + Workers 层级模式 +- 协作方式:LLM 自主决策任务分配 +- 通信:共享内存(Shared Context) +- 迭代控制:全局最大迭代次数配置 +- Workers 定义:复用现有 tool_registry + +## Status +**Currently in Phase 1** - 系统架构设计和核心组件规划已完成 + +## 实现计划文件 +- `implementation_plan.md` - 详细的实现计划 +- `notes.md` - LangGraph 研究笔记