""" Agent Collaboration Protocol Inter-agent tool collaboration messaging system. """ import uuid from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, Optional from pydantic import BaseModel, Field class MessageType(str, Enum): """Collaboration message types""" REQUEST = "request" # Request collaboration RESPONSE = "response" # Response result PROGRESS = "progress" # Progress update CANCEL = "cancel" # Cancel request class CollaborationMessage(BaseModel): """Collaboration message model""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) type: MessageType from_agent: str to_agent: str content: Dict[str, Any] metadata: Dict[str, Any] = Field(default_factory=dict) timestamp: datetime = Field(default_factory=datetime.utcnow) def is_request(self) -> bool: return self.type == MessageType.REQUEST def is_response(self) -> bool: return self.type == MessageType.RESPONSE class CollaborationProtocol: """Agent collaboration protocol for inter-agent tool requests""" def __init__(self): self._pending_requests: Dict[str, CollaborationMessage] = {} self._handlers: Dict[str, Callable] = {} self._response_futures: Dict[str, asyncio.Future] = {} def register_handler(self, tool_name: str, handler: Callable) -> None: """Register a tool handler for collaboration Args: tool_name: Name of the tool handler: Async callable to handle the tool execution """ self._handlers[tool_name] = handler async def request_collaboration( self, from_agent: str, to_agent: str, tool_name: str, parameters: Dict[str, Any], timeout_ms: int = 30000, ) -> Dict[str, Any]: """Request collaboration from another agent Args: from_agent: Source agent name to_agent: Target agent name tool_name: Tool to execute parameters: Tool parameters timeout_ms: Timeout in milliseconds Returns: Execution result dict with status and result/error """ import asyncio request_id = str(uuid.uuid4()) message = CollaborationMessage( id=request_id, type=MessageType.REQUEST, from_agent=from_agent, to_agent=to_agent, content={ "tool": tool_name, "parameters": parameters, }, metadata={"timeout": timeout_ms}, ) self._pending_requests[request_id] = message # Create future for response future = asyncio.get_event_loop().create_future() self._response_futures[request_id] = future # Send the message await self._send_message(message) # Wait for response with timeout try: result = await asyncio.wait_for(future, timeout=timeout_ms / 1000) return result except asyncio.TimeoutError: return { "status": "error", "error": "Collaboration request timed out", } finally: self._pending_requests.pop(request_id, None) self._response_futures.pop(request_id, None) async def handle_request(self, message: CollaborationMessage) -> CollaborationMessage: """Handle an incoming collaboration request Args: message: The collaboration message Returns: Response message with result or error """ import uuid tool_name = message.content.get("tool") parameters = message.content.get("parameters", {}) handler = self._handlers.get(tool_name) if not handler: return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={ "status": "error", "error": f"Unknown tool: {tool_name}", }, metadata={}, ) try: result = await handler(**parameters) return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={"status": "success", "result": result}, metadata={}, ) except Exception as e: return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={"status": "error", "error": str(e)}, metadata={}, ) async def handle_response(self, message: CollaborationMessage) -> None: """Handle an incoming response message Args: message: The response message """ request_id = None for req_id, pending in self._pending_requests.items(): if pending.id == message.id: request_id = req_id break if request_id and request_id in self._response_futures: future = self._response_futures[request_id] if not future.done(): future.set_result(message.content) async def _send_message(self, message: CollaborationMessage) -> None: """Send a collaboration message This is a placeholder for actual transport implementation. In production, this would use WebSocket, message queue, or shared storage. Args: message: The message to send """ # TODO: Implement actual message transport # Options: WebSocket, Redis pub/sub, shared database pass def get_pending_requests(self) -> list: """Get list of pending requests""" return [ { "id": msg.id, "from": msg.from_agent, "to": msg.to_agent, "tool": msg.content.get("tool"), } for msg in self._pending_requests.values() ] # Global collaboration protocol instance _collaboration_protocol: Optional[CollaborationProtocol] = None def get_collaboration_protocol() -> CollaborationProtocol: """Get the global collaboration protocol instance""" global _collaboration_protocol if _collaboration_protocol is None: _collaboration_protocol = CollaborationProtocol() return _collaboration_protocol