Files
JARVIS/development-doc/plan/tool-update/phase-t-3-tool-implementation.md

587 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Phase T.3:核心工具实现
日期2026-04-04
状态:待开始
依赖T.2(待完成)
---
## 1. 本阶段目的
实现 Jarvis 的核心工具:
- 文件操作工具
- 搜索工具
- 网页抓取工具
- 任务管理工具
---
## 2. 文件操作工具
### 2.1 实现
```python
# tools/implementations/file_operator.py
import os
import shutil
import asyncio
from pathlib import Path
from typing import Optional, List, Dict, Any
class FileOperator:
"""文件操作工具"""
def __init__(self, config: dict):
self.allowed_dirs = self._parse_allowed_dirs(
config.get("allowed_directories", "")
)
self.max_file_size = config.get("max_file_size", 10 * 1024 * 1024)
def _parse_allowed_dirs(self, dirs_str: str) -> Optional[List[str]]:
"""解析允许目录"""
if not dirs_str:
return None
return [d.strip() for d in dirs_str.split(",") if d.strip()]
def _check_path(self, path: str) -> bool:
"""检查路径是否允许"""
if not self.allowed_dirs:
return True
resolved = Path(path).resolve()
return any(
str(resolved).startswith(allowed)
for allowed in self.allowed_dirs
)
async def read_file(
self,
filePath: str,
encoding: str = "utf-8",
) -> Dict[str, Any]:
"""读取文件"""
if not self._check_path(filePath):
return {"status": "error", "error": "路径不在允许范围内"}
path = Path(filePath)
if not path.exists():
return {"status": "error", "error": "文件不存在"}
if path.stat().st_size > self.max_file_size:
return {"status": "error", "error": "文件过大"}
# 根据扩展名处理
suffix = path.suffix.lower()
if suffix in [".pdf", ".docx", ".xlsx", ".xls", ".csv"]:
return await self._read_binary_file(path)
try:
content = path.read_text(encoding=encoding)
return {"status": "success", "result": content}
except Exception as e:
return {"status": "error", "error": str(e)}
async def _read_binary_file(self, path: Path) -> Dict[str, Any]:
"""读取二进制文件"""
suffix = path.suffix.lower()
if suffix == ".pdf":
return await self._read_pdf(path)
elif suffix in [".docx", ".doc"]:
return await self._read_docx(path)
elif suffix in [".xlsx", ".xls"]:
return await self._read_xlsx(path)
elif suffix == ".csv":
return await self._read_csv(path)
return {"status": "error", "error": "不支持的文件格式"}
async def write_file(
self,
filePath: str,
content: str,
) -> Dict[str, Any]:
"""写入文件"""
if not self._check_path(filePath):
return {"status": "error", "error": "路径不在允许范围内"}
path = Path(filePath)
# 如果文件存在,自动创建新文件名
if path.exists():
path = self._get_unique_path(path)
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return {
"status": "success",
"result": f"文件已保存: {path.name}",
"path": str(path),
}
except Exception as e:
return {"status": "error", "error": str(e)}
def _get_unique_path(self, path: Path) -> Path:
"""获取唯一路径"""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_path = parent / f"{stem}({counter}){suffix}"
if not new_path.exists():
return new_path
counter += 1
async def list_directory(
self,
directoryPath: str,
showHidden: bool = False,
) -> Dict[str, Any]:
"""列出目录"""
if not self._check_path(directoryPath):
return {"status": "error", "error": "路径不在允许范围内"}
path = Path(directoryPath)
if not path.exists():
return {"status": "error", "error": "目录不存在"}
if not path.is_dir():
return {"status": "error", "error": "不是目录"}
items = []
for item in path.iterdir():
if not showHidden and item.name.startswith("."):
continue
items.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else None,
})
return {"status": "success", "result": items}
async def search_files(
self,
searchPath: str,
pattern: str,
**options,
) -> Dict[str, Any]:
"""搜索文件"""
import fnmatch
if not self._check_path(searchPath):
return {"status": "error", "error": "路径不在允许范围内"}
path = Path(searchPath)
if not path.exists():
return {"status": "error", "error": "路径不存在"}
case_sensitive = options.get("caseSensitive", False)
file_type = options.get("fileType", "all")
include_hidden = options.get("includeHidden", False)
results = []
for item in path.rglob("*"):
if not include_hidden and item.name.startswith("."):
continue
if not fnmatch.fnmatch(item.name, pattern):
continue
if file_type == "file" and item.is_dir():
continue
if file_type == "directory" and item.is_file():
continue
results.append(str(item))
return {"status": "success", "result": results[:100]} # 限制结果数
```
### 2.2 Manifest 绑定
```python
# tools/implementations/__init__.py
from tools.implementations.file_operator import FileOperator
def create_file_operator_executor(config: dict):
"""创建文件操作执行器"""
operator = FileOperator(config)
async def execute(command: str, parameters: dict) -> dict:
if command == "read_file":
return await operator.read_file(**parameters)
elif command == "write_file":
return await operator.write_file(**parameters)
elif command == "list_directory":
return await operator.list_directory(**parameters)
elif command == "search_files":
return await operator.search_files(**parameters)
else:
return {"status": "error", "error": f"未知命令: {command}"}
return execute
```
---
## 3. 搜索工具
### 3.1 实现
```python
# tools/implementations/web_search.py
import asyncio
from typing import Dict, Any, List, Optional
class WebSearch:
"""联网搜索工具"""
def __init__(self, config: dict):
self.api_key = config.get("api_key")
self.max_results = config.get("max_results", 10)
async def search(
self,
query: str,
max_results: Optional[int] = None,
) -> Dict[str, Any]:
"""执行搜索"""
try:
# 实现搜索逻辑
results = await self._do_search(
query,
max_results or self.max_results,
)
return {"status": "success", "result": results}
except Exception as e:
return {"status": "error", "error": str(e)}
async def _do_search(self, query: str, limit: int) -> List[dict]:
"""实际搜索"""
# TODO: 接入搜索 API
return []
async def deep_search(
self,
query: str,
keywords: List[str],
) -> Dict[str, Any]:
"""深度搜索"""
try:
# 并发执行多个搜索
tasks = [
self._do_search(kw, 5)
for kw in [query] + keywords
]
results = await asyncio.gather(*tasks)
# 聚合结果
aggregated = self._aggregate_results(results)
return {"status": "success", "result": aggregated}
except Exception as e:
return {"status": "error", "error": str(e)}
def _aggregate_results(self, results: List[List[dict]]) -> dict:
"""聚合搜索结果"""
# TODO: 实现结果聚合
return {"summary": "聚合结果", "sources": []}
```
---
## 4. 网页抓取工具
### 4.1 实现
```python
# tools/implementations/web_fetch.py
import asyncio
from typing import Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class FetchResult:
"""抓取结果"""
url: str
title: Optional[str]
content: str
images: List[str]
links: List[str]
status: int
class WebFetch:
"""网页抓取工具"""
def __init__(self, config: dict):
self.timeout = config.get("timeout", 30)
self.user_agent = config.get(
"user_agent",
"Mozilla/5.0 (compatible; Jarvis/1.0)"
)
async def fetch(
self,
url: str,
include_images: bool = True,
) -> Dict[str, Any]:
"""抓取网页"""
try:
result = await self._do_fetch(url, include_images)
return {
"status": "success",
"result": {
"url": result.url,
"title": result.title,
"content": result.content,
"images": result.images,
"status": result.status,
}
}
except Exception as e:
return {"status": "error", "error": str(e)}
async def _do_fetch(
self,
url: str,
include_images: bool,
) -> FetchResult:
"""实际抓取"""
import httpx
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
url,
headers={"User-Agent": self.user_agent},
)
response.raise_for_status()
# TODO: 解析 HTML 提取内容
return FetchResult(
url=url,
title=None,
content=response.text,
images=[],
links=[],
status=response.status_code,
)
async def screenshot(
self,
url: str,
) -> Dict[str, Any]:
"""截取网页截图"""
# TODO: 接入截图服务
return {"status": "error", "error": "未实现"}
```
---
## 5. 任务管理工具
### 5.1 实现
```python
# tools/implementations/task_manager.py
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
"""任务"""
id: str
name: str
description: str
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = field(default_factory=datetime.utcnow)
scheduled_at: Optional[datetime] = None
result: Optional[Any] = None
error: Optional[str] = None
class TaskManager:
"""任务管理工具"""
def __init__(self, config: dict):
self._tasks: Dict[str, Task] = {}
async def create_task(
self,
name: str,
description: str,
scheduled_at: Optional[datetime] = None,
) -> Dict[str, Any]:
"""创建任务"""
import uuid
task_id = str(uuid.uuid4())[:8]
task = Task(
id=task_id,
name=name,
description=description,
scheduled_at=scheduled_at,
)
self._tasks[task_id] = task
return {
"status": "success",
"result": {
"id": task_id,
"name": task.name,
"status": task.status.value,
}
}
async def list_tasks(
self,
status: Optional[str] = None,
) -> Dict[str, Any]:
"""列出任务"""
tasks = list(self._tasks.values())
if status:
tasks = [t for t in tasks if t.status.value == status]
return {
"status": "success",
"result": [
{
"id": t.id,
"name": t.name,
"status": t.status.value,
"created_at": t.created_at.isoformat(),
}
for t in tasks
]
}
async def get_task(self, task_id: str) -> Dict[str, Any]:
"""获取任务"""
task = self._tasks.get(task_id)
if not task:
return {"status": "error", "error": "任务不存在"}
return {
"status": "success",
"result": {
"id": task.id,
"name": task.name,
"description": task.description,
"status": task.status.value,
"result": task.result,
"error": task.error,
}
}
async def complete_task(
self,
task_id: str,
result: Any,
) -> Dict[str, Any]:
"""完成任务"""
task = self._tasks.get(task_id)
if not task:
return {"status": "error", "error": "任务不存在"}
task.status = TaskStatus.COMPLETED
task.result = result
return {"status": "success"}
async def fail_task(
self,
task_id: str,
error: str,
) -> Dict[str, Any]:
"""标记任务失败"""
task = self._tasks.get(task_id)
if not task:
return {"status": "error", "error": "任务不存在"}
task.status = TaskStatus.FAILED
task.error = error
return {"status": "success"}
```
---
## 6. 实现步骤
| 步骤 | 任务 | 优先级 |
|------|------|--------|
| 1 | 实现 FileOperator | 🟢 高 |
| 2 | 实现 WebSearch | 🟡 中 |
| 3 | 实现 WebFetch | 🟡 中 |
| 4 | 实现 TaskManager | 🟡 中 |
| 5 | 创建 Manifest 文件 | 🟢 高 |
| 6 | 注册到工具中心 | 🟢 高 |
| 7 | 单元测试 | 🟡 中 |
---
## 7. 核心文件变更
| 文件 | 变更 |
|------|------|
| `tools/implementations/__init__.py` | 新增 |
| `tools/implementations/file_operator.py` | 新增 |
| `tools/implementations/web_search.py` | 新增 |
| `tools/implementations/web_fetch.py` | 新增 |
| `tools/implementations/task_manager.py` | 新增 |
| `tools/manifests/file_operator.yaml` | 更新 |
| `tools/manifests/web_search.yaml` | 新增 |
| `tools/manifests/web_fetch.yaml` | 新增 |
| `tools/manifests/task_manager.yaml` | 新增 |
---
## 8. 工作量估算
| 任务 | 工作量 |
|------|--------|
| FileOperator | 1.5 天 |
| WebSearch | 1 天 |
| WebFetch | 1 天 |
| TaskManager | 0.5 天 |
| Manifest + 注册 | 0.5 天 |
| 单元测试 | 0.5 天 |
| **总计** | **5 天** |
---
## 9. 验收标准
- [ ] FileOperator 可正确读写文件
- [ ] FileOperator 支持多种格式解析
- [ ] FileOperator 路径安全检查正常
- [ ] WebSearch 可执行搜索
- [ ] WebFetch 可抓取网页
- [ ] TaskManager 可管理任务
- [ ] 所有工具注册到工具中心
- [ ] 单元测试通过