# Phase T.3:核心工具实现

日期:2026-04-04

状态:待开始

依赖:T.2(待完成)

---

## 1. 本阶段目的

实现 Jarvis 的核心工具:

- 文件操作工具
- 搜索工具
- 网页抓取工具
- 任务管理工具

---

## 2. 文件操作工具

### 2.1 实现

```python
# tools/implementations/file_operator.py
import os
import shutil
import asyncio
from pathlib import Path
from typing import Optional, List, Dict, Any


class FileOperator:
    """File operation tool: read, write, list and search local files.

    Every public method returns a dict with a ``status`` key ("success" or
    "error") plus either a ``result`` or an ``error`` entry.
    """

    # Suffix -> name of the parser method for that format. ``read_file`` and
    # ``_read_binary_file`` share this one table so they cannot disagree
    # about which suffixes need a dedicated parser.
    _STRUCTURED_PARSERS = {
        ".pdf": "_read_pdf",
        ".docx": "_read_docx",
        ".doc": "_read_docx",
        ".xlsx": "_read_xlsx",
        ".xls": "_read_xlsx",
        ".csv": "_read_csv",
    }

    # Upper bound on the number of paths returned by ``search_files``.
    _MAX_SEARCH_RESULTS = 100

    def __init__(self, config: dict):
        """Read ``allowed_directories`` and ``max_file_size`` from config."""
        self.allowed_dirs = self._parse_allowed_dirs(
            config.get("allowed_directories", "")
        )
        self.max_file_size = config.get("max_file_size", 10 * 1024 * 1024)

    def _parse_allowed_dirs(self, dirs_str: str) -> Optional[List[str]]:
        """Split a comma-separated directory list; return None when empty."""
        if not dirs_str:
            return None
        return [d.strip() for d in dirs_str.split(",") if d.strip()]

    def _check_path(self, path: str) -> bool:
        """Return True if ``path`` lies inside one of the allowed directories.

        When no allow-list is configured, every path is accepted.
        """
        if not self.allowed_dirs:
            return True

        resolved = Path(path).resolve()
        for allowed in self.allowed_dirs:
            base = Path(allowed).resolve()
            # Compare whole path components. A plain string-prefix test
            # would accept "/data/app-evil" for the allowed "/data/app".
            if resolved == base or base in resolved.parents:
                return True
        return False

    async def read_file(
        self,
        filePath: str,
        encoding: str = "utf-8",
    ) -> Dict[str, Any]:
        """Read a file and return its content.

        Args:
            filePath: Path of the file to read.
            encoding: Text encoding used for plain-text files.

        Returns:
            A success dict with the content under ``result``, or an error
            dict when the path is not allowed, missing, too large, or
            cannot be read.
        """
        if not self._check_path(filePath):
            return {"status": "error", "error": "路径不在允许范围内"}

        path = Path(filePath)
        if not path.exists():
            return {"status": "error", "error": "文件不存在"}

        if path.stat().st_size > self.max_file_size:
            return {"status": "error", "error": "文件过大"}

        # Structured formats are routed to a dedicated parser.
        if path.suffix.lower() in self._STRUCTURED_PARSERS:
            return await self._read_binary_file(path)

        try:
            content = path.read_text(encoding=encoding)
            return {"status": "success", "result": content}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def _read_binary_file(self, path: Path) -> Dict[str, Any]:
        """Dispatch a structured file to the parser registered for its suffix.

        Returns the "unsupported format" error when the suffix is unknown or
        when the parser method is not implemented on this class.
        """
        parser_name = self._STRUCTURED_PARSERS.get(path.suffix.lower())
        parser = getattr(self, parser_name, None) if parser_name else None
        if parser is None:
            return {"status": "error", "error": "不支持的文件格式"}

        try:
            return await parser(path)
        except Exception as e:
            # Report parser failures the same way plain-text read errors
            # are reported in read_file.
            return {"status": "error", "error": str(e)}

    async def write_file(
        self,
        filePath: str,
        content: str,
    ) -> Dict[str, Any]:
        """Write text to a file without overwriting an existing one.

        Args:
            filePath: Requested destination path.
            content: Text to write (always encoded as UTF-8).

        Returns:
            A success dict whose ``path`` is the path actually written, or
            an error dict.
        """
        if not self._check_path(filePath):
            return {"status": "error", "error": "路径不在允许范围内"}

        path = Path(filePath)

        # If the file already exists, pick a new, unused file name.
        if path.exists():
            path = self._get_unique_path(path)

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
            return {
                "status": "success",
                "result": f"文件已保存: {path.name}",
                "path": str(path),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def _get_unique_path(self, path: Path) -> Path:
        """Return ``path``, or the first free "stem(N)suffix" sibling."""
        if not path.exists():
            return path

        stem = path.stem
        suffix = path.suffix
        parent = path.parent

        counter = 1
        while True:
            new_path = parent / f"{stem}({counter}){suffix}"
            if not new_path.exists():
                return new_path
            counter += 1

    async def list_directory(
        self,
        directoryPath: str,
        showHidden: bool = False,
    ) -> Dict[str, Any]:
        """List the direct children of a directory.

        Args:
            directoryPath: Directory to list.
            showHidden: Include entries whose name starts with ".".

        Returns:
            A success dict whose ``result`` is a list of
            ``{"name", "type", "size"}`` entries, or an error dict.
        """
        if not self._check_path(directoryPath):
            return {"status": "error", "error": "路径不在允许范围内"}

        path = Path(directoryPath)
        if not path.exists():
            return {"status": "error", "error": "目录不存在"}

        if not path.is_dir():
            return {"status": "error", "error": "不是目录"}

        items = []
        try:
            for item in path.iterdir():
                if not showHidden and item.name.startswith("."):
                    continue

                items.append({
                    "name": item.name,
                    "type": "directory" if item.is_dir() else "file",
                    "size": item.stat().st_size if item.is_file() else None,
                })
        except OSError as e:
            # e.g. permission denied: report it like the other methods do.
            return {"status": "error", "error": str(e)}

        return {"status": "success", "result": items}

    async def search_files(
        self,
        searchPath: str,
        pattern: str,
        **options,
    ) -> Dict[str, Any]:
        """Recursively search for entries whose name matches a glob pattern.

        Args:
            searchPath: Root directory of the search.
            pattern: ``fnmatch``-style pattern applied to each entry name.
            **options: ``caseSensitive`` (default False), ``fileType``
                ("all", "file" or "directory", default "all") and
                ``includeHidden`` (default False).

        Returns:
            A success dict with at most 100 matching paths, or an error dict.
        """
        import fnmatch

        if not self._check_path(searchPath):
            return {"status": "error", "error": "路径不在允许范围内"}

        path = Path(searchPath)
        if not path.exists():
            return {"status": "error", "error": "路径不存在"}

        case_sensitive = options.get("caseSensitive", False)
        file_type = options.get("fileType", "all")
        include_hidden = options.get("includeHidden", False)

        # fnmatchcase applies no OS-specific case rules, so case handling is
        # decided by caseSensitive alone on every platform.
        wanted = pattern if case_sensitive else pattern.lower()

        results = []
        try:
            for item in path.rglob("*"):
                # Treat an entry as hidden when it, or any directory between
                # it and the search root, starts with ".".
                if not include_hidden and any(
                    part.startswith(".")
                    for part in item.relative_to(path).parts
                ):
                    continue

                name = item.name if case_sensitive else item.name.lower()
                if not fnmatch.fnmatchcase(name, wanted):
                    continue

                if file_type == "file" and item.is_dir():
                    continue

                if file_type == "directory" and item.is_file():
                    continue

                results.append(str(item))
                # Stop walking once the result cap is reached.
                if len(results) >= self._MAX_SEARCH_RESULTS:
                    break
        except OSError as e:
            return {"status": "error", "error": str(e)}

        return {"status": "success", "result": results}
```

### 2.2 Manifest 绑定

```python
# tools/implementations/__init__.py
from tools.implementations.file_operator import FileOperator


def create_file_operator_executor(config: dict):
    """Create the async executor that routes commands to a FileOperator.

    The returned ``execute(command, parameters)`` coroutine always returns a
    status dict; unknown commands and parameter mismatches are reported as
    error dicts.
    """
    operator = FileOperator(config)

    handlers = {
        "read_file": operator.read_file,
        "write_file": operator.write_file,
        "list_directory": operator.list_directory,
        "search_files": operator.search_files,
    }

    async def execute(command: str, parameters: dict) -> dict:
        handler = handlers.get(command)
        if handler is None:
            return {"status": "error", "error": f"未知命令: {command}"}

        try:
            return await handler(**(parameters or {}))
        except TypeError as e:
            # Missing or unexpected parameter names surface as TypeError;
            # return them as an error dict instead of raising.
            return {"status": "error", "error": str(e)}

    return execute
```

---

## 3. 搜索工具

### 3.1 实现

```python
# tools/implementations/web_search.py
import asyncio
from typing import Dict, Any, List, Optional


class WebSearch:
    """Web search tool."""

    def __init__(self, config: dict):
        """Read ``api_key`` and ``max_results`` (default 10) from config."""
        self.api_key = config.get("api_key")
        self.max_results = config.get("max_results", 10)

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Run a single search.

        Args:
            query: Search query.
            max_results: Result limit; the configured default is used when
                this is None or 0.

        Returns:
            A success dict with the results, or an error dict.
        """
        try:
            results = await self._do_search(
                query,
                max_results or self.max_results,
            )
            return {"status": "success", "result": results}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def _do_search(self, query: str, limit: int) -> List[dict]:
        """Perform the actual search; currently a stub that returns []."""
        # TODO: integrate a search API
        return []

    async def deep_search(
        self,
        query: str,
        keywords: List[str],
    ) -> Dict[str, Any]:
        """Search the query and every keyword concurrently, then aggregate.

        Returns:
            A success dict with the aggregated result, or an error dict if
            any of the searches raises.
        """
        try:
            # Run all searches concurrently, 5 results each.
            tasks = [
                self._do_search(kw, 5)
                for kw in [query] + keywords
            ]
            results = await asyncio.gather(*tasks)

            aggregated = self._aggregate_results(results)
            return {"status": "success", "result": aggregated}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def _aggregate_results(self, results: List[List[dict]]) -> dict:
        """Aggregate search results; currently returns a fixed placeholder."""
        # TODO: implement result aggregation
        return {"summary": "聚合结果", "sources": []}
```

---

## 4. 网页抓取工具

### 4.1 实现

```python
# tools/implementations/web_fetch.py
import asyncio
from typing import Dict, Any, List, Optional
from dataclasses import dataclass


@dataclass
class FetchResult:
    """Result of fetching one page."""
    url: str
    title: Optional[str]
    content: str
    images: List[str]
    links: List[str]
    status: int


class WebFetch:
    """Web page fetching tool."""

    def __init__(self, config: dict):
        """Read ``timeout`` (default 30) and ``user_agent`` from config."""
        self.timeout = config.get("timeout", 30)
        self.user_agent = config.get(
            "user_agent",
            "Mozilla/5.0 (compatible; Jarvis/1.0)"
        )

    async def fetch(
        self,
        url: str,
        include_images: bool = True,
    ) -> Dict[str, Any]:
        """Fetch a web page.

        Args:
            url: Address of the page.
            include_images: Passed through to ``_do_fetch``, which does not
                use it yet (HTML parsing is still a TODO).

        Returns:
            A success dict with url, title, content, images, links and
            HTTP status, or an error dict.
        """
        try:
            result = await self._do_fetch(url, include_images)
            return {
                "status": "success",
                "result": {
                    "url": result.url,
                    "title": result.title,
                    "content": result.content,
                    "images": result.images,
                    "links": result.links,
                    "status": result.status,
                }
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def _do_fetch(
        self,
        url: str,
        include_images: bool,
    ) -> FetchResult:
        """Issue the GET request and wrap the raw response in a FetchResult.

        Raises whatever httpx raises, including the error from
        ``raise_for_status()`` on a non-success response.
        """
        import httpx

        # httpx does not follow redirects by default, and raise_for_status()
        # rejects 3xx responses, so redirects must be enabled explicitly.
        async with httpx.AsyncClient(
            timeout=self.timeout,
            follow_redirects=True,
        ) as client:
            response = await client.get(
                url,
                headers={"User-Agent": self.user_agent},
            )
            response.raise_for_status()

            # TODO: parse the HTML and extract title, images and links
            return FetchResult(
                url=url,
                title=None,
                content=response.text,
                images=[],
                links=[],
                status=response.status_code,
            )

    async def screenshot(
        self,
        url: str,
    ) -> Dict[str, Any]:
        """Take a screenshot of a page; not implemented, returns an error."""
        # TODO: integrate a screenshot service
        return {"status": "error", "error": "未实现"}
```

---

## 5.
任务管理工具

### 5.1 实现

```python
# tools/implementations/task_manager.py
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class Task:
    """A single task record."""
    id: str
    name: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    # datetime.utcnow is deprecated and returns a naive value, so an
    # explicit UTC-aware factory is used instead.
    created_at: datetime = field(default_factory=_utc_now)
    scheduled_at: Optional[datetime] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class TaskManager:
    """Task management tool backed by an in-memory dict of tasks."""

    def __init__(self, config: dict):
        """Start with an empty task table; ``config`` is currently unused."""
        self._tasks: Dict[str, Task] = {}

    async def create_task(
        self,
        name: str,
        description: str,
        scheduled_at: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Create a task in the PENDING state.

        Returns:
            A success dict with the new task's id, name and status.
        """
        import uuid

        # Short 8-character IDs can collide; retry instead of silently
        # overwriting an existing task.
        task_id = uuid.uuid4().hex[:8]
        while task_id in self._tasks:
            task_id = uuid.uuid4().hex[:8]

        task = Task(
            id=task_id,
            name=name,
            description=description,
            scheduled_at=scheduled_at,
        )
        self._tasks[task_id] = task

        return {
            "status": "success",
            "result": {
                "id": task_id,
                "name": task.name,
                "status": task.status.value,
            }
        }

    async def list_tasks(
        self,
        status: Optional[str] = None,
    ) -> Dict[str, Any]:
        """List tasks, optionally keeping only those with the given status.

        Args:
            status: A TaskStatus value such as "pending"; None or "" lists
                every task.
        """
        tasks = list(self._tasks.values())

        if status:
            tasks = [t for t in tasks if t.status.value == status]

        return {
            "status": "success",
            "result": [
                {
                    "id": t.id,
                    "name": t.name,
                    "status": t.status.value,
                    "created_at": t.created_at.isoformat(),
                }
                for t in tasks
            ]
        }

    async def get_task(self, task_id: str) -> Dict[str, Any]:
        """Return the details of one task, or an error dict if unknown."""
        task = self._tasks.get(task_id)
        if task is None:
            return {"status": "error", "error": "任务不存在"}

        return {
            "status": "success",
            "result": {
                "id": task.id,
                "name": task.name,
                "description": task.description,
                "status": task.status.value,
                "result": task.result,
                "error": task.error,
            }
        }

    async def complete_task(
        self,
        task_id: str,
        result: Any,
    ) -> Dict[str, Any]:
        """Mark a task COMPLETED and store its result.

        Any error recorded by an earlier failure is cleared so the task
        does not report both a result and an error.
        """
        task = self._tasks.get(task_id)
        if task is None:
            return {"status": "error", "error": "任务不存在"}

        task.status = TaskStatus.COMPLETED
        task.result = result
        task.error = None

        return {"status": "success"}

    async def fail_task(
        self,
        task_id: str,
        error: str,
    ) -> Dict[str, Any]:
        """Mark a task FAILED and store the error message.

        Any result recorded by an earlier completion is cleared so the task
        does not report both a result and an error.
        """
        task = self._tasks.get(task_id)
        if task is None:
            return {"status": "error", "error": "任务不存在"}

        task.status = TaskStatus.FAILED
        task.error = error
        task.result = None

        return {"status": "success"}
```

---

## 6. 实现步骤

| 步骤 | 任务 | 优先级 |
|------|------|--------|
| 1 | 实现 FileOperator | 🟢 高 |
| 2 | 实现 WebSearch | 🟡 中 |
| 3 | 实现 WebFetch | 🟡 中 |
| 4 | 实现 TaskManager | 🟡 中 |
| 5 | 创建 Manifest 文件 | 🟢 高 |
| 6 | 注册到工具中心 | 🟢 高 |
| 7 | 单元测试 | 🟡 中 |

---

## 7. 核心文件变更

| 文件 | 变更 |
|------|------|
| `tools/implementations/__init__.py` | 新增 |
| `tools/implementations/file_operator.py` | 新增 |
| `tools/implementations/web_search.py` | 新增 |
| `tools/implementations/web_fetch.py` | 新增 |
| `tools/implementations/task_manager.py` | 新增 |
| `tools/manifests/file_operator.yaml` | 更新 |
| `tools/manifests/web_search.yaml` | 新增 |
| `tools/manifests/web_fetch.yaml` | 新增 |
| `tools/manifests/task_manager.yaml` | 新增 |

---

## 8. 工作量估算

| 任务 | 工作量 |
|------|--------|
| FileOperator | 1.5 天 |
| WebSearch | 1 天 |
| WebFetch | 1 天 |
| TaskManager | 0.5 天 |
| Manifest + 注册 | 0.5 天 |
| 单元测试 | 0.5 天 |
| **总计** | **5 天** |

---

## 9. 验收标准

- [ ] FileOperator 可正确读写文件
- [ ] FileOperator 支持多种格式解析
- [ ] FileOperator 路径安全检查正常
- [ ] WebSearch 可执行搜索
- [ ] WebFetch 可抓取网页
- [ ] TaskManager 可管理任务
- [ ] 所有工具注册到工具中心
- [ ] 单元测试通过