- 新增 loop.py Agent 运行循环 - 优化 memory.py 记忆模块 - 扩展 api/routes.py 接口 - 更新 tools 模块:builtin.py, manager.py, __init__.py - 新增 .env.example 配置示例 - 更新 requirements.txt 依赖 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
466 lines
15 KiB
Python
466 lines
15 KiB
Python
"""Built-in tools for X-Agents."""
|
|
|
|
import ast
import asyncio
import json
import operator
import os
import re
from pathlib import Path
from typing import Any

from nanobot.agent.tools.base import Tool
|
|
|
|
# The sandbox backend is optional. When its modules cannot be imported, every
# sandbox name is bound to None and SANDBOX_AVAILABLE records the absence so
# the rest of the module can test the flag instead of the names.
try:
    from nanobot.agent.tools.bwrap_sandbox import BwrapSandbox, get_bwrap_sandbox
    from nanobot.agent.tools.sandbox_execution import SandboxType
except ImportError:
    BwrapSandbox = get_bwrap_sandbox = SandboxType = None
    SANDBOX_AVAILABLE = False
else:
    SANDBOX_AVAILABLE = True
|
|
|
|
|
|
class ReadFileTool(Tool):
    """Tool that returns a numbered window of lines from a UTF-8 text file.

    Relative paths are resolved against the workspace passed to the
    constructor (when one was given); absolute paths are used unchanged.
    """

    def __init__(self, workspace: Path | None = None):
        """Remember the optional workspace used to resolve relative paths."""
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "read_file"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Read the contents of a file from the local filesystem."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        return {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "The file path to read"},
                "offset": {
                    "type": "integer",
                    "description": "Line number to start reading from (1-indexed)",
                    "default": 1,
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of lines to read",
                    "default": 100,
                },
            },
            "required": ["path"],
        }

    async def execute(self, path: str, offset: int = 1, limit: int = 100, **kwargs: Any) -> str:
        """Read up to ``limit`` lines of ``path`` starting at line ``offset``.

        Args:
            path: File to read; relative paths are joined onto the workspace.
            offset: 1-indexed number of the first line to return. Values
                below 1 are treated as 1.
            limit: Maximum number of lines to return. Negative values are
                treated as 0.
            **kwargs: Extra arguments are accepted and ignored.

        Returns:
            A header (resolved path and the displayed line range) followed by
            the selected lines, each prefixed with its 1-indexed line number.
            Any failure is reported as an ``Error...`` string instead of
            being raised.
        """
        try:
            file_path = Path(path)
            if not file_path.is_absolute() and self._workspace:
                file_path = self._workspace / file_path

            if not file_path.exists():
                return f"Error: File not found: {path}"

            if not file_path.is_file():
                return f"Error: Not a file: {path}"

            lines = file_path.read_text(encoding="utf-8").split("\n")
            start = max(0, offset - 1)
            # Clamp the limit: a negative value would otherwise turn ``end``
            # into a negative slice index and select an unrelated window.
            end = min(len(lines), start + max(0, limit))

            # ``enumerate`` already counts from the 1-indexed line number, so
            # the counter is printed as-is (adding 1 again shifted every
            # displayed number by one).
            result_lines = [
                f"{line_number:4d}| {line}"
                for line_number, line in enumerate(lines[start:end], start=start + 1)
            ]
            header = f"File: {file_path}\nLines {start+1}-{end}/{len(lines)}\n\n"
            return header + "\n".join(result_lines)
        except Exception as e:
            # Tool results are plain strings, so errors are reported rather
            # than propagated to the caller.
            return f"Error reading file: {str(e)}"
|
|
|
|
|
|
class WriteFileTool(Tool):
    """Tool that writes (or appends) text to a file, creating it if needed.

    Relative paths are resolved against the workspace passed to the
    constructor (when one was given).
    """

    def __init__(self, workspace: Path | None = None):
        """Remember the optional workspace used to resolve relative paths."""
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "write_file"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Write content to a file. Creates the file if it doesn't exist."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        path_schema = {"type": "string", "description": "The file path to write to"}
        content_schema = {"type": "string", "description": "Content to write to the file"}
        append_schema = {
            "type": "boolean",
            "description": "Append to existing file instead of overwriting",
            "default": False,
        }
        return {
            "type": "object",
            "properties": {
                "path": path_schema,
                "content": content_schema,
                "append": append_schema,
            },
            "required": ["path", "content"],
        }

    async def execute(self, path: str, content: str, append: bool = False, **kwargs: Any) -> str:
        """Write ``content`` to ``path`` and report the outcome as a string.

        Missing parent directories are created first. With ``append`` set the
        text is added to the end of the file; otherwise the file is
        overwritten. Failures are returned as an ``Error...`` string.
        """
        try:
            target = Path(path)
            if self._workspace and not target.is_absolute():
                target = self._workspace / target

            target.parent.mkdir(parents=True, exist_ok=True)

            with target.open("a" if append else "w", encoding="utf-8") as handle:
                handle.write(content)
        except Exception as exc:
            return f"Error writing file: {exc}"

        return f"Successfully wrote to {target}"
|
|
|
|
|
|
class ListDirectoryTool(Tool):
    """Tool that lists the entries of a directory, optionally recursively.

    Relative paths are resolved against the workspace passed to the
    constructor (when one was given).
    """

    def __init__(self, workspace: Path | None = None):
        """Remember the optional workspace used to resolve relative paths."""
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "list_directory"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "List files and directories in a given path."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        properties = {
            "path": {
                "type": "string",
                "description": "Directory path to list",
                "default": ".",
            },
            "recursive": {
                "type": "boolean",
                "description": "List recursively",
                "default": False,
            },
        }
        return {"type": "object", "properties": properties}

    async def execute(self, path: str = ".", recursive: bool = False, **kwargs: Any) -> str:
        """Return a sorted listing of ``path``, one entry per line.

        Each entry is tagged ``[D]`` for directories and ``[F]`` for anything
        else. A recursive listing shows paths relative to the listed
        directory; a flat listing shows bare names. An empty directory yields
        ``(empty)`` and failures are returned as an ``Error...`` string.
        """

        def tag(entry: Path) -> str:
            # Directory marker vs. marker for every other kind of entry.
            return "[D]" if entry.is_dir() else "[F]"

        try:
            root = Path(path)
            if self._workspace and not root.is_absolute():
                root = self._workspace / root

            if not root.exists():
                return f"Error: Path not found: {path}"
            if not root.is_dir():
                return f"Error: Not a directory: {path}"

            if recursive:
                entries = [
                    f"{tag(child)} {child.relative_to(root)}"
                    for child in root.rglob("*")
                ]
            else:
                entries = [f"{tag(child)} {child.name}" for child in root.iterdir()]

            entries.sort()
            return "\n".join(entries) or "(empty)"
        except Exception as exc:
            return f"Error listing directory: {exc}"
|
|
|
|
|
|
class SearchTool(Tool):
    """Tool that greps files under a directory with a regular expression.

    Relative search paths are resolved against the workspace passed to the
    constructor (when one was given).
    """

    def __init__(self, workspace: Path | None = None):
        """Remember the optional workspace used to resolve relative paths."""
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "search"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Search for text patterns in files using regex."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        properties = {
            "pattern": {"type": "string", "description": "Regex pattern to search for"},
            "path": {
                "type": "string",
                "description": "Directory path to search in",
                "default": ".",
            },
            "file_pattern": {
                "type": "string",
                "description": "File glob pattern (e.g., *.py)",
                "default": "*",
            },
            "case_sensitive": {
                "type": "boolean",
                "description": "Case sensitive search",
                "default": True,
            },
        }
        return {"type": "object", "properties": properties, "required": ["pattern"]}

    async def execute(
        self,
        pattern: str,
        path: str = ".",
        file_pattern: str = "*",
        case_sensitive: bool = True,
        **kwargs: Any,
    ) -> str:
        """Search files below ``path`` for lines matching ``pattern``.

        Files are selected recursively with the ``file_pattern`` glob and read
        as UTF-8; files that cannot be read are skipped. The reply states the
        total number of matching lines and lists at most the first 50 of them
        as ``file:line: text`` with the text stripped and cut to 100
        characters. Failures are returned as an ``Error...`` string.
        """
        try:
            root = Path(path)
            if self._workspace and not root.is_absolute():
                root = self._workspace / root

            if not root.exists():
                return f"Error: Path not found: {path}"

            if case_sensitive:
                regex = re.compile(pattern)
            else:
                regex = re.compile(pattern, re.IGNORECASE)

            hits: list[str] = []
            for candidate in root.rglob(file_pattern):
                if not candidate.is_file():
                    continue
                try:
                    text = candidate.read_text(encoding="utf-8")
                    hits.extend(
                        f"{candidate}:{line_number}: {line.strip()[:100]}"
                        for line_number, line in enumerate(text.split("\n"), 1)
                        if regex.search(line)
                    )
                except Exception:
                    # Unreadable or non-UTF-8 files are skipped on purpose.
                    continue

            if hits:
                return f"Found {len(hits)} matches:\n" + "\n".join(hits[:50])
            return f"No matches found for: {pattern}"
        except Exception as exc:
            return f"Error searching: {exc}"
|
|
|
|
|
|
class WebSearchTool(Tool):
    """Placeholder web-search tool; no search backend is wired in yet."""

    def __init__(self):
        """Nothing to set up: the tool keeps no state."""

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "web_search"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Search the web for information using a search engine."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        query_schema = {"type": "string", "description": "Search query"}
        max_results_schema = {
            "type": "integer",
            "description": "Maximum number of results",
            "default": 5,
        }
        return {
            "type": "object",
            "properties": {"query": query_schema, "max_results": max_results_schema},
            "required": ["query"],
        }

    async def execute(self, query: str, max_results: int = 5, **kwargs: Any) -> str:
        """Report that web search is unavailable, echoing the query.

        ``max_results`` is accepted but unused by this placeholder.
        """
        # Stub only: a real implementation would call a search API
        # (e.g., Google, Bing, SerpAPI) here.
        notice = f"Web search not implemented yet. Query: {query}"
        return notice
|
|
|
|
|
|
class CalculatorTool(Tool):
    """Tool that evaluates basic arithmetic expressions.

    Expressions are parsed with ``ast`` and evaluated by walking the tree, so
    only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // **`` can ever execute. This replaces a call to
    ``eval`` on tool input.
    """

    # Characters accepted in an expression; anything else is rejected up front.
    _ALLOWED_CHARS = frozenset("0123456789+-*/.() ")

    # Upper bound on the estimated bit length of an integer power result.
    # Without it an input such as ``9**9**9**9`` would hang the process.
    _MAX_POWER_RESULT_BITS = 1_000_000

    # Supported operator nodes mapped to their implementations.
    _BINARY_OPERATORS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    _UNARY_OPERATORS = {
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "calculator"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Evaluate a mathematical expression."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        return {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Mathematical expression to evaluate"},
            },
            "required": ["expression"],
        }

    @classmethod
    def _check_power(cls, base: Any, exponent: Any) -> None:
        """Reject integer powers whose result would be unreasonably large."""
        if (
            isinstance(base, int)
            and isinstance(exponent, int)
            and abs(base) > 1
            and exponent > 0
            and base.bit_length() * exponent > cls._MAX_POWER_RESULT_BITS
        ):
            raise ValueError("Exponent too large")

    @classmethod
    def _evaluate(cls, node: ast.AST) -> Any:
        """Recursively evaluate a parsed arithmetic expression node."""
        if isinstance(node, ast.Expression):
            return cls._evaluate(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in cls._BINARY_OPERATORS:
            left = cls._evaluate(node.left)
            right = cls._evaluate(node.right)
            if isinstance(node.op, ast.Pow):
                cls._check_power(left, right)
            return cls._BINARY_OPERATORS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in cls._UNARY_OPERATORS:
            return cls._UNARY_OPERATORS[type(node.op)](cls._evaluate(node.operand))
        raise ValueError("Unsupported expression")

    async def execute(self, expression: str, **kwargs: Any) -> str:
        """Evaluate ``expression`` and return ``"<expression> = <result>"``.

        Args:
            expression: Arithmetic expression made of digits, ``+ - * / . ( )``
                and spaces.
            **kwargs: Extra arguments are accepted and ignored.

        Returns:
            The expression and its value, or an ``Error...`` string when the
            expression has invalid characters, does not parse, uses an
            unsupported construct, or fails to evaluate (for example a
            division by zero).
        """
        try:
            if not all(c in self._ALLOWED_CHARS for c in expression):
                return "Error: Invalid characters in expression"

            # Surrounding blanks are stripped before parsing so padded input
            # is not rejected as an unexpected indent.
            tree = ast.parse(expression.strip(), filename="<string>", mode="eval")
            result = self._evaluate(tree)
            return f"{expression} = {result}"
        except Exception as e:
            return f"Error evaluating expression: {str(e)}"
|
|
|
|
|
|
class GetTimeTool(Tool):
    """Tool that reports the current date and time in UTC."""

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "get_time"

    @property
    def description(self) -> str:
        """Short human-readable summary of the tool."""
        return "Get the current date and time."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        return {
            "type": "object",
            "properties": {
                "timezone": {
                    "type": "string",
                    "description": "Timezone (e.g., UTC, Asia/Shanghai)",
                    "default": "UTC",
                },
            },
        }

    async def execute(self, timezone: str = "UTC", **kwargs: Any) -> str:
        """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS UTC``.

        Args:
            timezone: Requested timezone name. Only ``UTC`` (any letter case)
                is supported; a missing, empty or non-string value is treated
                as ``UTC``.
            **kwargs: Extra arguments are accepted and ignored.

        Returns:
            The formatted UTC timestamp, or, for any other timezone, a
            message saying the timezone is not supported followed by the
            current UTC time.
        """
        # Imported under aliases on purpose: ``from datetime import timezone``
        # would rebind the ``timezone`` parameter to the datetime class, which
        # made the string check below fail and hid the "not supported" reply.
        from datetime import datetime as _datetime
        from datetime import timezone as _timezone

        utc_now = _datetime.now(_timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

        if not isinstance(timezone, str) or not timezone.strip():
            return utc_now

        if timezone.strip().upper() != "UTC":
            return f"Timezone '{timezone}' not supported. Current UTC time: {utc_now}"

        return utc_now
|
|
|
|
|
|
class BashTool(Tool):
    """Tool that runs a shell command and returns its captured output.

    When constructed with ``use_sandbox=True`` and the sandbox backend could
    be imported, commands are delegated to the sandbox object; otherwise they
    run directly through the system shell.

    NOTE(review): if ``use_sandbox`` is requested but the sandbox backend is
    unavailable, commands still run unsandboxed while ``description``
    advertises isolation - confirm whether that fail-open behavior is wanted.
    """

    def __init__(self, workspace: Path | None = None, use_sandbox: bool = False):
        """Initialize bash tool.

        Args:
            workspace: Workspace path; used as the working directory for
                direct (non-sandboxed) execution when it is an existing
                directory.
            use_sandbox: Whether to use sandbox for execution (recommended
                for untrusted code)
        """
        self._workspace = workspace
        self._use_sandbox = use_sandbox
        self._sandbox = None
        if use_sandbox and SANDBOX_AVAILABLE:
            self._sandbox = get_bwrap_sandbox()

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "bash"

    @property
    def description(self) -> str:
        """Short human-readable summary; mentions the sandbox when requested."""
        if self._use_sandbox:
            return "Execute a bash command in an isolated sandbox and return its output."
        return "Execute a bash command and return its output."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema style description of the arguments of ``execute``."""
        params = {
            "type": "object",
            "properties": {
                "command": {"type": "string", "description": "Command to execute"},
                "timeout": {
                    "type": "integer",
                    "description": "Timeout in seconds",
                    "default": 30,
                },
            },
            "required": ["command"],
        }
        return params

    def _working_directory(self) -> str | None:
        """Return the workspace as a cwd string, or None to inherit the cwd."""
        workspace = self._workspace
        if workspace and Path(workspace).is_dir():
            return str(workspace)
        return None

    async def execute(self, command: str, timeout: int = 30, **kwargs: Any) -> str:
        """Run ``command`` and return its output as text.

        Args:
            command: Shell command line to run.
            timeout: Seconds to wait for the command before it is killed.
            **kwargs: Extra arguments are accepted and ignored.

        Returns:
            Standard output followed by standard error (prefixed with
            ``STDERR: ``), a fixed notice when the command printed nothing,
            or an ``Error...`` string on timeout or failure.
        """
        # Use sandbox if enabled
        if self._use_sandbox and self._sandbox:
            try:
                return await self._sandbox.execute_command(command, timeout)
            except Exception as e:
                # Deliberately no fallback: a command meant for the sandbox
                # must not be run unsandboxed, so the failure is reported
                # without promising a retry that never happens.
                return f"Error executing in sandbox: {str(e)}"

        # Direct execution (no sandbox)
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                # Run inside the workspace so relative paths agree with the
                # file tools, which resolve against the same directory.
                cwd=self._working_directory(),
            )

            try:
                stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited between the timeout and the kill.
                    pass
                # Reap the child so it does not linger as a zombie.
                await process.wait()
                return f"Error: Command timed out after {timeout} seconds"

            # Decode leniently: binary or mis-encoded output should not turn
            # a successful command into an error.
            result = []
            if stdout:
                result.append(stdout.decode("utf-8", errors="replace"))
            if stderr:
                result.append(f"STDERR: {stderr.decode('utf-8', errors='replace')}")
            return "\n".join(result) or "Command completed with no output"
        except Exception as e:
            return f"Error executing command: {str(e)}"
|
|
|
|
|
|
def get_builtin_tools(workspace: Path | None = None, use_sandbox: bool = False) -> list[Tool]:
    """Build one fresh instance of every built-in tool.

    Args:
        workspace: Optional workspace path handed to the file tools and the
            bash tool.
        use_sandbox: Forwarded to the bash tool; whether shell commands
            should run in the sandbox (recommended for untrusted code).

    Returns:
        The tools in a fixed order: read, write, list, search, web search,
        calculator, time, bash.
    """
    # Tools that only need the workspace, in the order they are returned.
    workspace_tool_types = (ReadFileTool, WriteFileTool, ListDirectoryTool, SearchTool)

    tools: list[Tool] = [tool_type(workspace) for tool_type in workspace_tool_types]
    tools.extend((WebSearchTool(), CalculatorTool(), GetTimeTool()))
    tools.append(BashTool(workspace, use_sandbox=use_sandbox))
    return tools
|