"""协作工具 - Phase 6.4""" from typing import Any from app.agents.tools.base import WriteTool from app.agents.tools.manifest import ( PermissionClass, SideEffectScope, ) class TeamAgentTool(WriteTool): """团队 Agent 通信工具 用于与其他 Agent 进行消息传递和协作。 """ def __init__(self): super().__init__( name="team_agent", description="向团队 Agent 发送消息或请求协作", permission_class=PermissionClass.WRITE, side_effect_scope=SideEffectScope.LOCAL_STATE, tags=["collaboration", "team", "agent"], ) def get_parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "agent_name": { "type": "string", "description": "目标 Agent 名称", }, "message": { "type": "string", "description": "要发送的消息", }, "action": { "type": "string", "enum": ["send", "request", "delegate"], "description": "操作类型", }, }, "required": ["agent_name", "message"], } def get_return_schema(self) -> dict[str, Any]: return { "type": "object", "properties": { "success": {"type": "boolean"}, "response": {"type": "string"}, }, } async def execute(self, agent_name: str, message: str, action: str = "send") -> dict[str, Any]: # 注意:实际实现需要通过 Agent 通信协议 # 这里只是一个框架实现 return { "success": True, "response": f"Message '{action}' to agent '{agent_name}': {message}", "agent_name": agent_name, "action": action, } class TaskBroadcastTool(WriteTool): """任务广播工具 向多个 Agent 广播任务。 """ def __init__(self): super().__init__( name="task_broadcast", description="向多个 Agent 广播任务", permission_class=PermissionClass.WRITE, side_effect_scope=SideEffectScope.LOCAL_STATE, tags=["collaboration", "broadcast", "task"], ) def get_parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "agent_names": { "type": "array", "items": {"type": "string"}, "description": "目标 Agent 列表", }, "task": { "type": "string", "description": "要广播的任务描述", }, "priority": { "type": "string", "enum": ["low", "normal", "high", "urgent"], "description": "任务优先级", }, }, "required": ["agent_names", "task"], } def get_return_schema(self) -> dict[str, Any]: return { "type": "object", "properties": { "success": {"type": "boolean"}, "broadcast_to": {"type": "array", "items": {"type": "string"}}, "responses": {"type": "array"}, }, } async def execute( self, agent_names: list[str], task: str, priority: str = "normal", ) -> dict[str, Any]: # 注意:实际实现需要通过 Agent 通信协议 # 这里只是一个框架实现 return { "success": True, "broadcast_to": agent_names, "task": task, "priority": priority, "responses": [f"Acknowledged by {agent}" for agent in agent_names], }