# Phase T.4:高级特性 日期:2026-04-04 状态:待开始 依赖:T.3(待完成) --- ## 1. 本阶段目的 实现 Jarvis 工具系统的高级特性: - 多运行时支持(Python/JS/原生) - Agent 间协作 - 定时任务 --- ## 2. 多运行时支持 ### 2.1 运行时架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ Runtime Manager │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Python │ │ JS │ │ Native │ │ WASM │ │ │ │ Runtime │ │ Runtime │ │ Runtime │ │ Runtime │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ │ │ │ └─────────────┴──────┬──────┴─────────────┘ │ │ │ │ │ ┌────────┴────────┐ │ │ │ Tool Executor │ │ │ │ (统一接口) │ │ │ └─────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ### 2.2 运行时基类 ```python # tools/runtime/base.py from abc import ABC, abstractmethod from typing import Any, Dict, Optional class BaseRuntime(ABC): """运行时基类""" @abstractmethod async def execute( self, entry: str, command: str, parameters: Dict[str, Any], timeout: int, ) -> Dict[str, Any]: """执行工具""" pass @abstractmethod async def validate(self, entry: str) -> bool: """验证工具是否可用""" pass @abstractmethod def get_name(self) -> str: """获取运行时名称""" pass ``` ### 2.3 Python 运行时 ```python # tools/runtime/python_runtime.py import asyncio from pathlib import Path from tools.runtime.base import BaseRuntime class PythonRuntime(BaseRuntime): """Python 运行时""" def __init__(self): self._executors: Dict[str, Callable] = {} def get_name(self) -> str: return "python" async def validate(self, entry: str) -> bool: path = Path(entry) return path.exists() and path.suffix == ".py" async def execute( self, entry: str, command: str, parameters: Dict[str, Any], timeout: int, ) -> Dict[str, Any]: # 动态加载并执行 # 或者通过 subprocess 调用 pass ``` ### 2.4 JavaScript 运行时 ```python # tools/runtime/js_runtime.py import asyncio import subprocess import json from tools.runtime.base import BaseRuntime class JavaScriptRuntime(BaseRuntime): """JavaScript 运行时""" def __init__(self): self.node_path = "node" # 可配置 def get_name(self) -> str: return "javascript" async def validate(self, entry: str) -> bool: # 检查 node 是否可用 try: result = await asyncio.create_subprocess_exec( self.node_path, "--version", stdout=asyncio.subprocess.PIPE, ) return result.returncode == 0 except: return False async def execute( self, entry: str, command: str, parameters: Dict[str, Any], timeout: int, ) -> Dict[str, Any]: # 通过 stdio 调用 Node.js 脚本 input_data = json.dumps({ "command": command, "parameters": parameters, }) process = await asyncio.create_subprocess_exec( self.node_path, entry, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(input=input_data.encode()), timeout=timeout / 1000, ) if process.returncode != 0: return { "status": "error", "error": stderr.decode(), } return json.loads(stdout.decode()) ``` ### 2.5 原生运行时 ```python # tools/runtime/native_runtime.py import asyncio import subprocess from tools.runtime.base import BaseRuntime class NativeRuntime(BaseRuntime): """原生二进制运行时""" def get_name(self) -> str: return "native" async def validate(self, entry: str) -> bool: from pathlib import Path path = Path(entry) return path.exists() and path.stat().st_mode & 0o111 async def execute( self, entry: str, command: str, parameters: Dict[str, Any], timeout: int, ) -> Dict[str, Any]: # 调用原生可执行文件 args = [entry, command] + self._format_args(parameters) process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout / 1000, ) if process.returncode != 0: return { "status": "error", "error": stderr.decode(), } return { "status": "success", "result": stdout.decode(), } def _format_args(self, parameters: Dict[str, Any]) -> list: """格式化参数""" args = [] for key, value in parameters.items(): args.extend([f"--{key}", str(value)]) return args ``` ### 2.6 运行时管理器 ```python # tools/runtime/manager.py from tools.runtime.base import BaseRuntime from tools.runtime.python_runtime import PythonRuntime from tools.runtime.js_runtime import JavaScriptRuntime from tools.runtime.native_runtime import NativeRuntime class RuntimeManager: """运行时管理器""" def __init__(self): self._runtimes: Dict[str, BaseRuntime] = { "python": PythonRuntime(), "javascript": JavaScriptRuntime(), "native": NativeRuntime(), } def get_runtime(self, name: str) -> BaseRuntime: return self._runtimes.get(name) async def execute( self, runtime_name: str, entry: str, command: str, parameters: Dict[str, Any], timeout: int, ) -> Dict[str, Any]: runtime = self.get_runtime(runtime_name) if not runtime: return { "status": "error", "error": f"未知运行时: {runtime_name}", } return await runtime.execute(entry, command, parameters, timeout) ``` --- ## 3. Agent 间协作 ### 3.1 协作协议 ```python # agents/tools/collaboration.py from typing import Dict, Any, Optional, List from dataclasses import dataclass from datetime import datetime from enum import Enum class MessageType(str, Enum): REQUEST = "request" # 请求协作 RESPONSE = "response" # 响应结果 PROGRESS = "progress" # 进度更新 CANCEL = "cancel" # 取消请求 @dataclass class CollaborationMessage: """协作消息""" id: str type: MessageType from_agent: str to_agent: str content: Any metadata: Dict[str, Any] timestamp: datetime = None def __post_init__(self): if self.timestamp is None: self.timestamp = datetime.utcnow() class CollaborationProtocol: """Agent 协作协议""" def __init__(self): self._pending_requests: Dict[str, CollaborationMessage] = {} self._handlers: Dict[str, callable] = {} def register_handler(self, tool_name: str, handler: callable) -> None: """注册工具处理器""" self._handlers[tool_name] = handler async def request_collaboration( self, from_agent: str, to_agent: str, tool_name: str, parameters: Dict[str, Any], timeout: int = 30000, ) -> Dict[str, Any]: """请求协作""" import uuid request_id = str(uuid.uuid4()) message = CollaborationMessage( id=request_id, type=MessageType.REQUEST, from_agent=from_agent, to_agent=to_agent, content={ "tool": tool_name, "parameters": parameters, }, metadata={"timeout": timeout}, ) self._pending_requests[request_id] = message # 发送请求 await self._send_message(message) # 等待响应 try: response = await self._wait_for_response( request_id, timeout, ) return response except TimeoutError: return { "status": "error", "error": "协作请求超时", } async def handle_request( self, message: CollaborationMessage, ) -> CollaborationMessage: """处理协作请求""" tool_name = message.content["tool"] parameters = message.content["parameters"] handler = self._handlers.get(tool_name) if not handler: return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={ "status": "error", "error": f"未知工具: {tool_name}", }, metadata={}, ) try: result = await handler(**parameters) return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={"status": "success", "result": result}, metadata={}, ) except Exception as e: return CollaborationMessage( id=str(uuid.uuid4()), type=MessageType.RESPONSE, from_agent=message.to_agent, to_agent=message.from_agent, content={"status": "error", "error": str(e)}, metadata={}, ) async def _send_message(self, message: CollaborationMessage) -> None: """发送消息""" # TODO: 实现消息发送(WebSocket/消息队列) pass async def _wait_for_response( self, request_id: str, timeout: int, ) -> Dict[str, Any]: """等待响应""" # TODO: 实现等待逻辑 pass ``` --- ## 4. 定时任务 ### 4.1 定时任务服务 ```python # tools/scheduler.py import asyncio from typing import Dict, Any, Callable, Optional from datetime import datetime, timedelta from dataclasses import dataclass, field from enum import Enum class ScheduleType(str, Enum): ONCE = "once" # 单次 INTERVAL = "interval" # 间隔 CRON = "cron" # Cron 表达式 @dataclass class ScheduledTask: """定时任务""" id: str name: str schedule_type: ScheduleType schedule_value: str # 时间/间隔/cron tool_name: str parameters: Dict[str, Any] enabled: bool = True last_run: Optional[datetime] = None next_run: Optional[datetime] = None run_count: int = 0 callback: Optional[Callable] = field(default=None) class ToolScheduler: """工具定时调度器""" def __init__(self): self._tasks: Dict[str, ScheduledTask] = {} self._running = False self._loop_task = None async def schedule( self, name: str, schedule_type: ScheduleType, schedule_value: str, tool_name: str, parameters: Dict[str, Any], callback: Optional[Callable] = None, ) -> str: """创建定时任务""" import uuid task_id = str(uuid.uuid4())[:8] task = ScheduledTask( id=task_id, name=name, schedule_type=schedule_type, schedule_value=schedule_value, tool_name=tool_name, parameters=parameters, callback=callback, ) task.next_run = self._calculate_next_run(task) self._tasks[task_id] = task # 启动调度器 if not self._running: await self.start() return task_id def _calculate_next_run(self, task: ScheduledTask) -> datetime: """计算下次运行时间""" now = datetime.utcnow() if task.schedule_type == ScheduleType.ONCE: return datetime.fromisoformat(task.schedule_value) elif task.schedule_type == ScheduleType.INTERVAL: seconds = int(task.schedule_value) return now + timedelta(seconds=seconds) elif task.schedule_type == ScheduleType.CRON: # 解析 cron 表达式 # TODO: 实现 cron 解析 return now + timedelta(hours=1) return now async def start(self) -> None: """启动调度器""" self._running = True self._loop_task = asyncio.create_task(self._run_loop()) async def stop(self) -> None: """停止调度器""" self._running = False if self._loop_task: self._loop_task.cancel() async def _run_loop(self) -> None: """调度循环""" while self._running: now = datetime.utcnow() for task in self._tasks.values(): if not task.enabled: continue if task.next_run and task.next_run <= now: await self._execute_task(task) await asyncio.sleep(1) # 每秒检查一次 async def _execute_task(self, task: ScheduledTask) -> None: """执行任务""" # 调用工具 executor = get_executor(task.tool_name) result = await executor( command=task.parameters.get("command"), parameters=task.parameters, ) # 更新状态 task.last_run = datetime.utcnow() task.run_count += 1 # 计算下次运行 if task.schedule_type != ScheduleType.ONCE: task.next_run = self._calculate_next_run(task) else: task.enabled = False # 调用回调 if task.callback: await task.callback(task, result) async def cancel(self, task_id: str) -> bool: """取消任务""" if task_id in self._tasks: del self._tasks[task_id] return True return False async def list_tasks(self) -> list: """列出所有任务""" return [ { "id": t.id, "name": t.name, "type": t.schedule_type.value, "enabled": t.enabled, "next_run": t.next_run.isoformat() if t.next_run else None, "run_count": t.run_count, } for t in self._tasks.values() ] ``` --- ## 5. 实现步骤 | 步骤 | 任务 | 优先级 | |------|------|--------| | 1 | 实现运行时基类 | 🟢 高 | | 2 | 实现 Python 运行时 | 🟢 高 | | 3 | 实现 JS 运行时 | 🟡 中 | | 4 | 实现原生运行时 | 🟡 中 | | 5 | 实现运行时管理器 | 🟢 高 | | 6 | 实现协作协议 | 🟡 中 | | 7 | 实现定时调度器 | 🟡 中 | | 8 | 单元测试 | 🟡 中 | --- ## 6. 核心文件变更 | 文件 | 变更 | |------|------| | `tools/runtime/__init__.py` | 新增 | | `tools/runtime/base.py` | 新增 | | `tools/runtime/python_runtime.py` | 新增 | | `tools/runtime/js_runtime.py` | 新增 | | `tools/runtime/native_runtime.py` | 新增 | | `tools/runtime/manager.py` | 新增 | | `agents/tools/collaboration.py` | 新增 | | `tools/scheduler.py` | 新增 | --- ## 7. 工作量估算 | 任务 | 工作量 | |------|--------| | 运行时基类 | 0.5 天 | | Python 运行时 | 0.5 天 | | JS 运行时 | 0.5 天 | | 原生运行时 | 0.5 天 | | 运行时管理器 | 0.5 天 | | 协作协议 | 1 天 | | 定时调度器 | 0.5 天 | | 单元测试 | 0.5 天 | | **总计** | **4 天** | --- ## 8. 验收标准 - [ ] Python 运行时可正常执行工具 - [ ] JS 运行时可通过 stdio 调用 - [ ] 原生运行时可执行二进制 - [ ] 运行时管理器正确路由 - [ ] 协作协议可正常请求/响应 - [ ] 定时调度器可按计划执行任务 - [ ] 单元测试通过