feat: 新增 agent/app/core 目录

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 14:26:53 +08:00
parent 8249f67351
commit 765a968e63
13 changed files with 2706 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
"""
Core 模块 - AI 核心能力
"""
from . import tools
__all__ = [
"tools",
]

View File

@@ -0,0 +1,130 @@
"""
Agent 工具模块
"""
from .registry import ToolRegistry, ToolMetadata, SecurityLevel, global_registry
from . import impl
# 导入所有工具函数和定义
from .impl import (
# 文件操作
read_file,
write_file,
list_dir,
delete_file,
search_files,
READ_FILE_TOOL,
WRITE_FILE_TOOL,
LIST_DIR_TOOL,
DELETE_FILE_TOOL,
SEARCH_FILES_TOOL,
# 代码执行
execute_python,
execute_javascript,
execute_bash,
EXECUTE_PYTHON_TOOL,
EXECUTE_JAVASCRIPT_TOOL,
EXECUTE_BASH_TOOL,
# 网页
web_fetch,
web_search,
WEB_FETCH_TOOL,
WEB_SEARCH_TOOL,
# HTTP
http_request,
http_get,
http_post,
http_put,
http_delete,
HTTP_REQUEST_TOOL,
# 通知
send_notification,
send_email,
send_webhook,
SEND_NOTIFICATION_TOOL,
# 时间
get_current_time,
format_time,
GET_CURRENT_TIME_TOOL,
)
def register_all_tools(registry: ToolRegistry = None):
"""
注册所有工具到注册表
Args:
registry: 工具注册表,默认使用全局注册表
"""
reg = registry or global_registry
# 文件操作
reg.register("read_file", read_file, READ_FILE_TOOL["description"], "safe", parameters=READ_FILE_TOOL["parameters"])
reg.register("write_file", write_file, WRITE_FILE_TOOL["description"], "review", parameters=WRITE_FILE_TOOL["parameters"])
reg.register("list_dir", list_dir, LIST_DIR_TOOL["description"], "safe", parameters=LIST_DIR_TOOL["parameters"])
reg.register("delete_file", delete_file, DELETE_FILE_TOOL["description"], "danger", parameters=DELETE_FILE_TOOL["parameters"])
reg.register("search_files", search_files, SEARCH_FILES_TOOL["description"], "safe", parameters=SEARCH_FILES_TOOL["parameters"])
# 代码执行
reg.register("execute_python", execute_python, EXECUTE_PYTHON_TOOL["description"], "review", parameters=EXECUTE_PYTHON_TOOL["parameters"])
reg.register("execute_javascript", execute_javascript, EXECUTE_JAVASCRIPT_TOOL["description"], "review", parameters=EXECUTE_JAVASCRIPT_TOOL["parameters"])
reg.register("execute_bash", execute_bash, EXECUTE_BASH_TOOL["description"], "danger", parameters=EXECUTE_BASH_TOOL["parameters"])
# 网页
reg.register("web_fetch", web_fetch, WEB_FETCH_TOOL["description"], "safe", parameters=WEB_FETCH_TOOL["parameters"])
reg.register("web_search", web_search, WEB_SEARCH_TOOL["description"], "safe", parameters=WEB_SEARCH_TOOL["parameters"])
# HTTP
reg.register("http_request", http_request, HTTP_REQUEST_TOOL["description"], "safe", parameters=HTTP_REQUEST_TOOL["parameters"])
# 通知
reg.register("send_notification", send_notification, SEND_NOTIFICATION_TOOL["description"], "safe", parameters=SEND_NOTIFICATION_TOOL["parameters"])
# 时间
reg.register("get_current_time", get_current_time, GET_CURRENT_TIME_TOOL["description"], "safe", parameters=GET_CURRENT_TIME_TOOL["parameters"])
return reg
# 注册所有工具
register_all_tools(global_registry)
__all__ = [
"ToolRegistry",
"ToolMetadata",
"SecurityLevel",
"global_registry",
"register_all_tools",
"impl",
# 所有工具函数
"read_file",
"write_file",
"list_dir",
"delete_file",
"search_files",
"execute_python",
"execute_javascript",
"execute_bash",
"web_fetch",
"web_search",
"http_request",
"http_get",
"http_post",
"http_put",
"http_delete",
"send_notification",
"send_email",
"send_webhook",
"get_current_time",
"format_time",
]

View File

@@ -0,0 +1,100 @@
"""
工具实现模块
"""
from .files import (
read_file,
write_file,
list_dir,
delete_file,
search_files,
READ_FILE_TOOL,
WRITE_FILE_TOOL,
LIST_DIR_TOOL,
DELETE_FILE_TOOL,
SEARCH_FILES_TOOL,
)
from .executor import (
execute_python,
execute_javascript,
execute_bash,
EXECUTE_PYTHON_TOOL,
EXECUTE_JAVASCRIPT_TOOL,
EXECUTE_BASH_TOOL,
)
from .web import (
web_fetch,
web_search,
WEB_FETCH_TOOL,
WEB_SEARCH_TOOL,
)
from .http import (
http_request,
http_get,
http_post,
http_put,
http_delete,
HTTP_REQUEST_TOOL,
)
from .notify import (
send_notification,
send_email,
send_webhook,
SEND_NOTIFICATION_TOOL,
)
from .time_tool import (
get_current_time,
format_time,
GET_CURRENT_TIME_TOOL,
)
__all__ = [
# 文件操作
"read_file",
"write_file",
"list_dir",
"delete_file",
"search_files",
"READ_FILE_TOOL",
"WRITE_FILE_TOOL",
"LIST_DIR_TOOL",
"DELETE_FILE_TOOL",
"SEARCH_FILES_TOOL",
# 代码执行
"execute_python",
"execute_javascript",
"execute_bash",
"EXECUTE_PYTHON_TOOL",
"EXECUTE_JAVASCRIPT_TOOL",
"EXECUTE_BASH_TOOL",
# 网页
"web_fetch",
"web_search",
"WEB_FETCH_TOOL",
"WEB_SEARCH_TOOL",
# HTTP
"http_request",
"http_get",
"http_post",
"http_put",
"http_delete",
"HTTP_REQUEST_TOOL",
# 通知
"send_notification",
"send_email",
"send_webhook",
"SEND_NOTIFICATION_TOOL",
# 时间
"get_current_time",
"format_time",
"GET_CURRENT_TIME_TOOL",
]

View File

@@ -0,0 +1,334 @@
"""
代码执行工具
提供安全的Python、JavaScript、Bash代码执行
"""
import subprocess
import tempfile
import os
import shutil
from typing import Dict, Any, Optional
from pathlib import Path
class ExecutorConfig:
"""执行器配置"""
MAX_EXECUTION_TIME = 30 # 最大执行时间(秒)
MAX_OUTPUT_SIZE = 1024 * 1024 # 最大输出大小(1MB)
MAX_MEMORY_MB = 256 # 最大内存(MB)
ALLOWED_PYTHON_PACKAGES = [] # 允许的Python包(空=仅标准库)
class CodeExecutor:
"""
代码执行器 - 在沙盒环境中执行代码
安全特性:
- 临时目录隔离
- 超时控制
- 输出大小限制
- 环境变量限制
"""
def __init__(self, config: Optional[ExecutorConfig] = None):
self.config = config or ExecutorConfig()
self.temp_dir: Optional[str] = None
def _setup_temp_dir(self) -> str:
"""创建临时目录"""
self.temp_dir = tempfile.mkdtemp(prefix="executor_")
return self.temp_dir
def _cleanup(self):
"""清理临时目录"""
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception:
pass
def _get_safe_env(self) -> Dict[str, str]:
"""获取安全的环境变量"""
return {
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
"LANG": "en_US.UTF-8",
"HOME": self.temp_dir or "/tmp",
"TMPDIR": self.temp_dir or "/tmp",
}
def execute_python(
self,
code: str,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
执行Python代码
Args:
code: Python代码
timeout: 超时时间(秒)
Returns:
执行结果
"""
timeout = timeout or self.config.MAX_EXECUTION_TIME
self._setup_temp_dir()
try:
# 写入临时文件
temp_file = os.path.join(self.temp_dir, "code.py")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
# 执行
result = subprocess.run(
["python", temp_file],
capture_output=True,
timeout=timeout,
cwd=self.temp_dir,
env=self._get_safe_env(),
)
return self._process_result(result)
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"language": "python"
}
except FileNotFoundError:
return {
"success": False,
"error": "Python not installed",
"language": "python"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"language": "python"
}
finally:
self._cleanup()
def execute_javascript(
self,
code: str,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
执行JavaScript代码
Args:
code: JavaScript代码
timeout: 超时时间(秒)
Returns:
执行结果
"""
timeout = timeout or self.config.MAX_EXECUTION_TIME
self._setup_temp_dir()
try:
# 写入临时文件
temp_file = os.path.join(self.temp_dir, "code.js")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
# 执行
result = subprocess.run(
["node", temp_file],
capture_output=True,
timeout=timeout,
cwd=self.temp_dir,
)
return self._process_result(result)
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"language": "javascript"
}
except FileNotFoundError:
return {
"success": False,
"error": "Node.js not installed",
"language": "javascript"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"language": "javascript"
}
finally:
self._cleanup()
def execute_bash(
self,
command: str,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
执行Bash命令
Args:
command: Bash命令
timeout: 超时时间(秒)
Returns:
执行结果
"""
timeout = timeout or self.config.MAX_EXECUTION_TIME
self._setup_temp_dir()
# 安全检查:禁止的危险命令
dangerous_patterns = [
"rm -rf /",
"mkfs",
"dd if=",
">:/dev/sd",
"chmod 777 /",
"chown -R",
]
for pattern in dangerous_patterns:
if pattern in command:
return {
"success": False,
"error": f"Dangerous command blocked: {pattern}",
"language": "bash"
}
try:
# 执行
result = subprocess.run(
["bash", "-c", command],
capture_output=True,
timeout=timeout,
cwd=self.temp_dir,
env=self._get_safe_env(),
)
return self._process_result(result)
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"language": "bash"
}
except FileNotFoundError:
return {
"success": False,
"error": "Bash not installed",
"language": "bash"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"language": "bash"
}
finally:
self._cleanup()
def _process_result(self, result: subprocess.CompletedProcess) -> Dict[str, Any]:
"""处理执行结果"""
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
# 截断输出
if len(stdout) > self.config.MAX_OUTPUT_SIZE:
stdout = stdout[:self.config.MAX_OUTPUT_SIZE] + "\n... (output truncated)"
return {
"success": result.returncode == 0,
"output": stdout,
"error": stderr if result.returncode != 0 else None,
"exit_code": result.returncode
}
# 全局执行器实例
executor = CodeExecutor()
# 便捷函数
def execute_python(code: str, timeout: int = 30) -> Dict[str, Any]:
"""执行Python代码"""
return executor.execute_python(code, timeout)
def execute_javascript(code: str, timeout: int = 30) -> Dict[str, Any]:
"""执行JavaScript代码"""
return executor.execute_javascript(code, timeout)
def execute_bash(command: str, timeout: int = 30) -> Dict[str, Any]:
"""执行Bash命令"""
return executor.execute_bash(command, timeout)
# 工具定义
EXECUTE_PYTHON_TOOL = {
"name": "execute_python",
"description": "Execute Python code in a sandboxed environment. Use this for Python programming tasks, calculations, and data processing.",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The Python code to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30, max: 60)",
"default": 30
}
},
"required": ["code"]
}
}
EXECUTE_JAVASCRIPT_TOOL = {
"name": "execute_javascript",
"description": "Execute JavaScript code in a sandboxed environment. Use this for JavaScript programming tasks.",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The JavaScript code to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30)",
"default": 30
}
},
"required": ["code"]
}
}
EXECUTE_BASH_TOOL = {
"name": "execute_bash",
"description": "Execute a bash command in a sandboxed environment. Use this for shell operations, file management, and system commands.",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30)",
"default": 30
}
},
"required": ["command"]
}
}

View File

@@ -0,0 +1,444 @@
"""
文件操作工具
提供安全的文件读写、目录操作、搜索功能
"""
import os
import shutil
import glob as glob_module
from typing import Optional, List, Dict, Any
from pathlib import Path
class FileToolConfig:
"""文件工具配置"""
# 允许访问的基础目录(限制在项目内)
ALLOWED_BASE_DIRS = [
"account", # 用户工作区
"temp", # 临时文件
]
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
MAX_SEARCH_RESULTS = 100
def _resolve_safe_path(base_path: str, relative_path: str) -> str:
"""
解析安全的文件路径
确保路径不会超出基础目录
"""
# 规范化路径
full_path = os.path.normpath(os.path.join(base_path, relative_path))
# 检查是否在允许的基础目录内
path_parts = Path(full_path).parts
if len(path_parts) < 2:
raise ValueError("Invalid path: too short")
base_dir = path_parts[0]
if base_dir not in FileToolConfig.ALLOWED_BASE_DIRS and not base_dir.endswith(".py"):
# 允许 account 下的子目录
if len(path_parts) >= 2 and path_parts[0] != "account":
raise ValueError(f"Path not in allowed directories: {base_dir}")
return full_path
def read_file(file_path: str, encoding: str = "utf-8") -> Dict[str, Any]:
"""
读取文件内容
Args:
file_path: 文件路径
encoding: 文件编码
Returns:
文件内容
"""
try:
# 安全检查
full_path = _resolve_safe_path("", file_path)
if not os.path.exists(full_path):
return {
"success": False,
"error": f"File not found: {file_path}"
}
if not os.path.isfile(full_path):
return {
"success": False,
"error": f"Not a file: {file_path}"
}
# 检查文件大小
file_size = os.path.getsize(full_path)
if file_size > FileToolConfig.MAX_FILE_SIZE:
return {
"success": False,
"error": f"File too large: {file_size} bytes (max {FileToolConfig.MAX_FILE_SIZE})"
}
# 读取内容
with open(full_path, "r", encoding=encoding, errors="replace") as f:
content = f.read()
return {
"success": True,
"content": content,
"file_path": file_path,
"size": file_size,
"encoding": encoding
}
except ValueError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": f"Read error: {str(e)}"
}
def write_file(file_path: str, content: str, encoding: str = "utf-8") -> Dict[str, Any]:
"""
写入文件内容
Args:
file_path: 文件路径
content: 文件内容
encoding: 文件编码
Returns:
写入结果
"""
try:
# 安全检查
full_path = _resolve_safe_path("", file_path)
# 检查内容大小
if len(content.encode(encoding)) > FileToolConfig.MAX_FILE_SIZE:
return {
"success": False,
"error": f"Content too large: {len(content)} bytes"
}
# 确保目录存在
os.makedirs(os.path.dirname(full_path), exist_ok=True)
# 写入内容
with open(full_path, "w", encoding=encoding) as f:
f.write(content)
return {
"success": True,
"file_path": file_path,
"bytes_written": len(content.encode(encoding))
}
except ValueError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": f"Write error: {str(e)}"
}
def list_dir(dir_path: str = ".") -> Dict[str, Any]:
"""
列出目录内容
Args:
dir_path: 目录路径
Returns:
目录内容列表
"""
try:
full_path = _resolve_safe_path("", dir_path)
if not os.path.exists(full_path):
return {
"success": False,
"error": f"Directory not found: {dir_path}"
}
if not os.path.isdir(full_path):
return {
"success": False,
"error": f"Not a directory: {dir_path}"
}
items = []
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
is_dir = os.path.isdir(item_path)
try:
size = 0 if is_dir else os.path.getsize(item_path)
except:
size = 0
items.append({
"name": item,
"type": "directory" if is_dir else "file",
"size": size
})
return {
"success": True,
"path": dir_path,
"items": items,
"count": len(items)
}
except ValueError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": f"List error: {str(e)}"
}
def delete_file(file_path: str) -> Dict[str, Any]:
"""
删除文件或目录
Args:
file_path: 文件或目录路径
Returns:
删除结果
"""
try:
full_path = _resolve_safe_path("", file_path)
if not os.path.exists(full_path):
return {
"success": False,
"error": f"Path not found: {file_path}"
}
# 删除
if os.path.isfile(full_path):
os.remove(full_path)
elif os.path.isdir(full_path):
shutil.rmtree(full_path)
return {
"success": True,
"file_path": file_path,
"deleted": True
}
except ValueError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": f"Delete error: {str(e)}"
}
def search_files(
directory: str,
pattern: str = "*",
content_pattern: Optional[str] = None,
file_only: bool = True
) -> Dict[str, Any]:
"""
搜索文件
Args:
directory: 搜索目录
pattern: 文件名匹配模式 (glob)
content_pattern: 文件内容匹配模式 (可选)
file_only: 是否只返回文件
Returns:
搜索结果
"""
try:
full_path = _resolve_safe_path("", directory)
if not os.path.exists(full_path) or not os.path.isdir(full_path):
return {
"success": False,
"error": f"Invalid directory: {directory}"
}
results = []
# 按文件名搜索
for match in glob_module.glob(os.path.join(full_path, "**", pattern), recursive=True):
if file_only and os.path.isdir(match):
continue
rel_path = os.path.relpath(match, full_path)
# 如果没有内容搜索,直接添加
if not content_pattern:
results.append({
"path": rel_path,
"name": os.path.basename(match),
"type": "directory" if os.path.isdir(match) else "file"
})
continue
# 内容搜索
if os.path.isfile(match):
try:
# 检查文件大小
if os.path.getsize(match) > FileToolConfig.MAX_FILE_SIZE:
continue
with open(match, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
if content_pattern.lower() in content.lower():
results.append({
"path": rel_path,
"name": os.path.basename(match),
"type": "file",
"match": content_pattern
})
except:
continue
# 限制结果数量
if len(results) > FileToolConfig.MAX_SEARCH_RESULTS:
results = results[:FileToolConfig.MAX_SEARCH_RESULTS]
return {
"success": True,
"directory": directory,
"pattern": pattern,
"results": results,
"count": len(results)
}
except ValueError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": f"Search error: {str(e)}"
}
# 工具定义
READ_FILE_TOOL = {
"name": "read_file",
"description": "Read the contents of a file from the filesystem.",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file to read"
},
"encoding": {
"type": "string",
"description": "File encoding (default: utf-8)",
"default": "utf-8"
}
},
"required": ["file_path"]
}
}
WRITE_FILE_TOOL = {
"name": "write_file",
"description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file to write"
},
"content": {
"type": "string",
"description": "The content to write to the file"
},
"encoding": {
"type": "string",
"description": "File encoding (default: utf-8)",
"default": "utf-8"
}
},
"required": ["file_path", "content"]
}
}
LIST_DIR_TOOL = {
"name": "list_dir",
"description": "List the contents of a directory.",
"parameters": {
"type": "object",
"properties": {
"dir_path": {
"type": "string",
"description": "The path to the directory to list (default: current directory)",
"default": "."
}
}
}
}
DELETE_FILE_TOOL = {
"name": "delete_file",
"description": "Delete a file or directory.",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file or directory to delete"
}
},
"required": ["file_path"]
}
}
SEARCH_FILES_TOOL = {
"name": "search_files",
"description": "Search for files by name pattern or content.",
"parameters": {
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "The directory to search in"
},
"pattern": {
"type": "string",
"description": "Glob pattern for file names (e.g., '*.py', '*.txt')",
"default": "*"
},
"content_pattern": {
"type": "string",
"description": "Optional: search for files containing this text in their content"
},
"file_only": {
"type": "boolean",
"description": "Only return files, not directories",
"default": True
}
},
"required": ["directory"]
}
}

View File

@@ -0,0 +1,271 @@
"""
HTTP请求工具
提供通用的HTTP API调用功能
"""
import httpx
from typing import Dict, Any, Optional, List
class HTTPClientConfig:
"""HTTP客户端配置"""
DEFAULT_TIMEOUT = 30 # 默认超时(秒)
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 最大响应大小(5MB)
MAX_REDIRECTS = 5 # 最大重定向次数
ALLOWED_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]
class HTTPClient:
"""
HTTP客户端工具
安全特性:
- 只允许特定HTTP方法
- 响应大小限制
- 超时控制
- 请求/响应日志
"""
def __init__(self):
self.default_timeout = HTTPClientConfig.DEFAULT_TIMEOUT
async def request(
self,
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
json_data: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
timeout: Optional[int] = None,
allow_redirects: bool = True
) -> Dict[str, Any]:
"""
发送HTTP请求
Args:
url: 目标URL
method: HTTP方法
params: 查询参数
headers: 请求头
json_data: JSON请求体
data: 原始请求体
timeout: 超时时间
allow_redirects: 是否允许重定向
Returns:
响应结果
"""
# 安全检查:方法
method = method.upper()
if method not in HTTPClientConfig.ALLOWED_METHODS:
return {
"success": False,
"error": f"Method '{method}' not allowed. Allowed: {HTTPClientConfig.ALLOWED_METHODS}"
}
# 安全检查:协议
if not url.startswith(("http://", "https://")):
return {
"success": False,
"error": "Only HTTP and HTTPS protocols are allowed"
}
timeout = timeout or self.default_timeout
try:
async with httpx.AsyncClient(
timeout=timeout,
max_redirects=HTTPClientConfig.MAX_REDIRECTS if allow_redirects else 0,
follow_redirects=allow_redirects,
) as client:
response = await client.request(
method=method,
url=url,
params=params,
headers=headers,
json=json_data,
content=data,
)
# 检查响应大小
content_length = len(response.content)
if content_length > HTTPClientConfig.MAX_RESPONSE_SIZE:
return {
"success": False,
"error": f"Response too large: {content_length} bytes"
}
# 解析响应
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
try:
return {
"success": True,
"status_code": response.status_code,
"url": str(response.url),
"headers": dict(response.headers),
"json": response.json()
}
except:
pass
# 文本响应
return {
"success": True,
"status_code": response.status_code,
"url": str(response.url),
"headers": dict(response.headers),
"text": response.text[:HTTPClientConfig.MAX_RESPONSE_SIZE]
}
except httpx.TimeoutException:
return {
"success": False,
"error": f"Request timeout ({timeout}s)"
}
except httpx.InvalidURL:
return {
"success": False,
"error": "Invalid URL"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送GET请求"""
return await self.request(url, "GET", params, headers, timeout=timeout)
async def post(
self,
url: str,
json_data: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送POST请求"""
return await self.request(url, "POST", None, headers, json_data, data, timeout)
async def put(
self,
url: str,
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送PUT请求"""
return await self.request(url, "PUT", None, headers, json_data, None, timeout)
async def delete(
self,
url: str,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送DELETE请求"""
return await self.request(url, "DELETE", None, headers, timeout=timeout)
# 全局HTTP客户端
http_client = HTTPClient()
# 便捷函数
async def http_request(
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送HTTP请求"""
return await http_client.request(url, method, params, headers, json_data, None, timeout)
async def http_get(
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送GET请求"""
return await http_client.get(url, params, headers, timeout)
async def http_post(
url: str,
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送POST请求"""
return await http_client.post(url, json_data, None, headers, timeout)
async def http_put(
url: str,
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送PUT请求"""
return await http_client.put(url, json_data, headers, timeout)
async def http_delete(
url: str,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""发送DELETE请求"""
return await http_client.delete(url, headers, timeout)
# 工具定义
HTTP_REQUEST_TOOL = {
"name": "http_request",
"description": "Make HTTP requests to APIs. Supports GET, POST, PUT, DELETE methods with JSON data.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to request"
},
"method": {
"type": "string",
"description": "HTTP method (GET, POST, PUT, DELETE, PATCH)",
"default": "GET"
},
"params": {
"type": "object",
"description": "Query parameters for GET requests"
},
"headers": {
"type": "object",
"description": "Request headers"
},
"json_data": {
"type": "object",
"description": "JSON body for POST/PUT requests"
},
"timeout": {
"type": "integer",
"description": "Request timeout in seconds",
"default": 30
}
},
"required": ["url"]
}
}

View File

@@ -0,0 +1,379 @@
"""
通知工具
提供发送通知的功能邮件、Webhook等
"""
import httpx
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
class NotificationType(Enum):
"""通知类型"""
EMAIL = "email"
WEBHOOK = "webhook"
SMS = "sms"
DINGTALK = "dingtalk"
WECHAT = "wechat"
SLACK = "slack"
@dataclass
class NotificationConfig:
"""通知配置"""
# Email配置
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
from_email: str = ""
# Webhook配置
webhook_url: str = ""
webhook_secret: str = ""
# 钉钉配置
dingtalk_webhook: str = ""
# Slack配置
slack_webhook: str = ""
class NotificationTool:
"""
通知工具
支持多种通知渠道:
- Email (SMTP)
- Webhook
- 钉钉
- Slack
"""
def __init__(self, config: Optional[NotificationConfig] = None):
self.config = config or NotificationConfig()
async def send_email(
self,
to: str,
subject: str,
body: str,
cc: Optional[List[str]] = None,
is_html: bool = False
) -> Dict[str, Any]:
"""
发送邮件
Args:
to: 收件人
subject: 主题
body: 内容
cc: 抄送列表
is_html: 是否HTML格式
Returns:
发送结果
"""
if not self.config.smtp_host:
return {
"success": False,
"error": "Email not configured"
}
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# 构建邮件
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = self.config.from_email or self.config.smtp_user
msg['To'] = to
if cc:
msg['Cc'] = ",".join(cc)
# 添加内容
content_type = "html" if is_html else "plain"
msg.attach(MIMEText(body, content_type))
# 发送
with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port) as server:
server.starttls()
server.login(self.config.smtp_user, self.config.smtp_password)
server.send_message(msg)
return {
"success": True,
"type": "email",
"to": to,
"subject": subject
}
except Exception as e:
return {
"success": False,
"error": str(e),
"type": "email"
}
async def send_webhook(
self,
url: str,
data: Dict[str, Any],
method: str = "POST",
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""
发送Webhook
Args:
url: Webhook URL
data: 请求数据
method: HTTP方法
headers: 请求头
Returns:
发送结果
"""
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.request(
method=method,
url=url,
json=data,
headers=headers
)
return {
"success": response.status_code < 400,
"status_code": response.status_code,
"type": "webhook",
"url": url
}
except Exception as e:
return {
"success": False,
"error": str(e),
"type": "webhook"
}
async def send_dingtalk(
self,
message: str,
webhook: Optional[str] = None
) -> Dict[str, Any]:
"""
发送钉钉消息
Args:
message: 消息内容
webhook: 自定义webhook URL
Returns:
发送结果
"""
url = webhook or self.config.dingtalk_webhook
if not url:
return {
"success": False,
"error": "Dingtalk webhook not configured"
}
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(
url,
json={
"msgtype": "text",
"text": {
"content": message
}
}
)
result = response.json()
return {
"success": result.get("errcode") == 0,
"type": "dingtalk",
"response": result
}
except Exception as e:
return {
"success": False,
"error": str(e),
"type": "dingtalk"
}
async def send_slack(
self,
message: str,
channel: Optional[str] = None,
webhook: Optional[str] = None
) -> Dict[str, Any]:
"""
发送Slack消息
Args:
message: 消息内容
channel: 频道
webhook: 自定义webhook URL
Returns:
发送结果
"""
url = webhook or self.config.slack_webhook
if not url:
return {
"success": False,
"error": "Slack webhook not configured"
}
try:
payload = {"text": message}
if channel:
payload["channel"] = channel
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(url, json=payload)
return {
"success": response.status_code == 200,
"type": "slack",
"status_code": response.status_code
}
except Exception as e:
return {
"success": False,
"error": str(e),
"type": "slack"
}
async def send(
self,
type: str,
message: str,
**kwargs
) -> Dict[str, Any]:
"""
统一发送接口
Args:
type: 通知类型 (email, webhook, dingtalk, slack)
message: 消息内容
**kwargs: 其他参数
Returns:
发送结果
"""
type = type.lower()
if type == "email":
return await self.send_email(
to=kwargs.get("to", ""),
subject=kwargs.get("subject", "Notification"),
body=message,
cc=kwargs.get("cc")
)
elif type == "webhook":
return await self.send_webhook(
url=kwargs.get("url", ""),
data=kwargs.get("data", {"message": message})
)
elif type == "dingtalk":
return await self.send_dingtalk(
message=message,
webhook=kwargs.get("webhook")
)
elif type == "slack":
return await self.send_slack(
message=message,
channel=kwargs.get("channel"),
webhook=kwargs.get("webhook")
)
else:
return {
"success": False,
"error": f"Unknown notification type: {type}"
}
# 全局通知工具
notification_tool = NotificationTool()
# 便捷函数
async def send_notification(
type: str,
message: str,
**kwargs
) -> Dict[str, Any]:
"""发送通知"""
return await notification_tool.send(type, message, **kwargs)
async def send_email(
to: str,
subject: str,
body: str
) -> Dict[str, Any]:
"""发送邮件"""
return await notification_tool.send_email(to, subject, body)
async def send_webhook(
url: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""发送Webhook"""
return await notification_tool.send_webhook(url, data)
# 工具定义
SEND_NOTIFICATION_TOOL = {
"name": "send_notification",
"description": "Send notifications via email, webhook, dingtalk, or slack.",
"parameters": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "Notification type: email, webhook, dingtalk, slack",
"enum": ["email", "webhook", "dingtalk", "slack"]
},
"message": {
"type": "string",
"description": "The notification message"
},
"to": {
"type": "string",
"description": "For email: recipient email address"
},
"subject": {
"type": "string",
"description": "For email: email subject"
},
"url": {
"type": "string",
"description": "For webhook: webhook URL"
},
"data": {
"type": "object",
"description": "For webhook: JSON data to send"
},
"webhook": {
"type": "string",
"description": "Custom webhook URL for dingtalk/slack"
},
"channel": {
"type": "string",
"description": "For slack: channel name"
}
},
"required": ["type", "message"]
}
}

View File

@@ -0,0 +1,70 @@
"""
时间工具
"""
from datetime import datetime
from typing import Optional, Dict, Any
def get_current_time(timezone: Optional[str] = None) -> Dict[str, Any]:
"""
获取当前时间
Args:
timezone: 时区名称,如 "UTC", "Asia/Shanghai"
Returns:
当前时间信息
"""
now = datetime.now()
return {
"success": True,
"datetime": now.isoformat(),
"timestamp": now.timestamp(),
"date": now.strftime("%Y-%m-%d"),
"time": now.strftime("%H:%M:%S"),
"weekday": now.strftime("%A"),
"timezone": timezone or "Local Time"
}
def format_time(timestamp: float, format_str: str = "%Y-%m-%d %H:%M:%S") -> Dict[str, Any]:
"""
格式化时间戳
Args:
timestamp: Unix 时间戳
format_str: 格式字符串
Returns:
格式化后的时间
"""
try:
dt = datetime.fromtimestamp(timestamp)
return {
"success": True,
"formatted": dt.strftime(format_str),
"datetime": dt.isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
# 工具定义
GET_CURRENT_TIME_TOOL = {
"name": "get_current_time",
"description": "Get the current date and time. Useful for timestamps or scheduling.",
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "Optional timezone (e.g., 'UTC', 'Asia/Shanghai')",
"default": "Local"
}
}
}
}

View File

@@ -0,0 +1,233 @@
"""
网页获取工具
提供安全的网页内容抓取功能
"""
import httpx
from typing import Dict, Any, Optional
class WebToolConfig:
"""网页工具配置"""
REQUEST_TIMEOUT = 30 # 请求超时(秒)
MAX_RESPONSE_SIZE = 2 * 1024 * 1024 # 最大响应大小(2MB)
MAX_REDIRECTS = 5 # 最大重定向次数
ALLOWED_PROTOCOLS = ["http", "https"] # 允许的协议
async def web_fetch(
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
body: Optional[str] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
获取网页内容
Args:
url: 目标URL
method: HTTP方法
params: 查询参数
headers: 请求头
body: 请求体
timeout: 超时时间
Returns:
网页内容
"""
timeout = timeout or WebToolConfig.REQUEST_TIMEOUT
# 安全检查:协议
if not url.startswith(("http://", "https://")):
return {
"success": False,
"error": "Only HTTP and HTTPS protocols are allowed"
}
try:
async with httpx.AsyncClient(
timeout=timeout,
max_redirects=WebToolConfig.MAX_REDIRECTS,
follow_redirects=True,
) as client:
# 发送请求
response = await client.request(
method=method,
url=url,
params=params,
headers=headers,
content=body,
)
# 检查响应大小
if len(response.content) > WebToolConfig.MAX_RESPONSE_SIZE:
return {
"success": False,
"error": f"Response too large: {len(response.content)} bytes (max {WebToolConfig.MAX_RESPONSE_SIZE})"
}
# 尝试解析JSON
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
try:
data = response.json()
return {
"success": True,
"url": str(response.url),
"status_code": response.status_code,
"content_type": content_type,
"data": data,
"headers": dict(response.headers)
}
except:
pass
# 返回文本
return {
"success": True,
"url": str(response.url),
"status_code": response.status_code,
"content_type": content_type,
"content": response.text[:WebToolConfig.MAX_RESPONSE_SIZE],
"headers": dict(response.headers)
}
except httpx.TimeoutException:
return {
"success": False,
"error": f"Request timeout ({timeout}s)"
}
except httpx.RedirectLoop:
return {
"success": False,
"error": "Too many redirects"
}
except httpx.InvalidURL:
return {
"success": False,
"error": "Invalid URL"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
async def web_search(
query: str,
max_results: int = 5
) -> Dict[str, Any]:
"""
搜索网页
Args:
query: 搜索关键词
max_results: 最大结果数
Returns:
搜索结果
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
"https://api.duckduckgo.com/",
params={
"q": query,
"format": "json",
"no_html": 1,
"skip_disambig": 1
}
)
if response.status_code == 200:
data = response.json()
results = []
if "RelatedTopics" in data:
for item in data["RelatedTopics"][:max_results]:
if "Text" in item:
text = item.get("Text", "")
results.append({
"title": text.split(" - ")[0] if " - " in text else "",
"content": text,
"url": item.get("URL", "")
})
return {
"success": True,
"query": query,
"results": results,
"count": len(results)
}
else:
return {
"success": False,
"error": f"Search API returned status {response.status_code}"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
# 工具定义
WEB_FETCH_TOOL = {
"name": "web_fetch",
"description": "Fetch content from a web URL. Supports GET, POST methods and can return JSON or text content.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch"
},
"method": {
"type": "string",
"description": "HTTP method (GET, POST)",
"default": "GET"
},
"params": {
"type": "object",
"description": "Query parameters"
},
"headers": {
"type": "object",
"description": "Request headers"
},
"body": {
"type": "string",
"description": "Request body (for POST)"
},
"timeout": {
"type": "integer",
"description": "Request timeout in seconds",
"default": 30
}
},
"required": ["url"]
}
}
WEB_SEARCH_TOOL = {
"name": "web_search",
"description": "Search the web for information. Use this when you need to find current information or facts.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query"
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 5
}
},
"required": ["query"]
}
}

View File

@@ -0,0 +1,107 @@
"""
工具注册表 - 管理所有可用工具(白名单机制)
"""
from typing import Any, Callable, Optional, Dict
from dataclasses import dataclass, asdict
from enum import Enum
class SecurityLevel(Enum):
"""工具安全等级"""
SAFE = "safe" # 安全操作
REVIEW = "review" # 需要审核
DANGER = "danger" # 危险操作
@dataclass
class ToolMetadata:
"""工具元数据"""
name: str
description: str
security_level: str
require_approval: bool = False
allowed_roles: list = None
def dict(self):
return {
"name": self.name,
"description": self.description,
"security_level": self.security_level,
"require_approval": self.require_approval
}
class ToolRegistry:
"""工具注册表"""
def __init__(self):
self._tools: Dict[str, tuple[Callable, ToolMetadata]] = {}
self._definitions: Dict[str, dict] = {}
def register(
self,
name: str,
func: Callable,
description: str = "",
security_level: str = "safe",
require_approval: bool = False,
allowed_roles: list = None,
parameters: dict = None
):
"""注册工具到白名单"""
metadata = ToolMetadata(
name=name,
description=description,
security_level=security_level,
require_approval=require_approval,
allowed_roles=allowed_roles or ["user", "admin"]
)
self._tools[name] = (func, metadata)
# 生成工具定义(用于 LLM 调用)
self._definitions[name] = {
"name": name,
"description": description,
"parameters": parameters or {
"type": "object",
"properties": {},
"required": []
}
}
def get_tool(self, name: str) -> tuple[Callable, ToolMetadata]:
"""获取工具函数和元数据"""
if name not in self._tools:
raise ValueError(f"Tool '{name}' not found in whitelist")
return self._tools[name]
def get_tool_definition(self, name: str) -> Optional[dict]:
"""获取工具定义(用于 LLM"""
return self._definitions.get(name)
def list_tools(self) -> list[ToolMetadata]:
"""列出所有已注册工具"""
return [meta for _, meta in self._tools.values()]
def list_definitions(self) -> list[dict]:
"""列出所有工具定义用于LLM"""
return list(self._definitions.values())
def check_permission(self, tool_name: str, user_role: str) -> bool:
"""检查用户权限"""
if tool_name not in self._tools:
return False
_, metadata = self._tools[tool_name]
return user_role in metadata.allowed_roles
def need_approval(self, tool_name: str) -> bool:
"""判断是否需要审批"""
if tool_name not in self._tools:
return False
_, metadata = self._tools[tool_name]
return metadata.require_approval
# 全局工具注册表
global_registry = ToolRegistry()

View File

@@ -0,0 +1,16 @@
"""
沙盒模块
"""
from .sandbox import (
Sandbox,
SandboxConfig,
SafeEval,
sandbox,
)
__all__ = [
"Sandbox",
"SandboxConfig",
"SafeEval",
"sandbox",
]

View File

@@ -0,0 +1,267 @@
"""
沙盒执行环境 - 在项目内构建,不依赖 Docker
提供安全的代码执行环境
"""
import subprocess
import tempfile
import os
import shutil
from typing import Dict, Any, Optional
from pathlib import Path
class SandboxConfig:
"""沙盒配置"""
# 资源限制
MAX_MEMORY_MB = 256 # 最大内存 (MB)
MAX_CPU_PERCENT = 50 # 最大 CPU 百分比
MAX_EXECUTION_TIME = 30 # 最大执行时间 (秒)
MAX_OUTPUT_SIZE = 1024 * 1024 # 最大输出大小 (bytes)
class Sandbox:
"""
沙盒执行器 - 使用 subprocess 隔离执行
安全特性:
- 内存限制
- CPU限制
- 超时控制
- 网络隔离(可选)
- 临时文件隔离
"""
def __init__(self, config: Optional[SandboxConfig] = None):
self.config = config or SandboxConfig()
self.temp_dir = None
def _setup_temp_dir(self) -> str:
"""创建临时目录"""
self.temp_dir = tempfile.mkdtemp(prefix="sandbox_")
return self.temp_dir
def _cleanup(self):
"""清理临时目录"""
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"Cleanup error: {e}")
def execute(
self,
code: str,
language: str = "python",
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
在沙盒中执行代码
Args:
code: 要执行的代码
language: 语言类型 (python, javascript)
timeout: 超时时间(秒)
Returns:
执行结果
"""
timeout = timeout or self.config.MAX_EXECUTION_TIME
self._setup_temp_dir()
try:
if language == "python":
return self._execute_python(code, timeout)
elif language == "javascript":
return self._execute_javascript(code, timeout)
else:
return {
"success": False,
"error": f"Unsupported language: {language}"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
self._cleanup()
def _execute_python(self, code: str, timeout: int) -> Dict[str, Any]:
"""执行 Python 代码"""
# 创建临时文件
temp_file = os.path.join(self.temp_dir, "code.py")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
try:
# 构建命令
cmd = ["python", temp_file]
# 执行
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
cwd=self.temp_dir, # 限制工作目录
env=self._get_restricted_env(), # 限制环境变量
)
# 检查输出大小
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
if len(stdout) > self.config.MAX_OUTPUT_SIZE:
stdout = stdout[:self.config.MAX_OUTPUT_SIZE] + "\n... (output truncated)"
return {
"success": result.returncode == 0,
"output": stdout,
"error": stderr if result.returncode != 0 else None,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"output": None
}
def _execute_javascript(self, code: str, timeout: int) -> Dict[str, Any]:
"""执行 JavaScript 代码"""
temp_file = os.path.join(self.temp_dir, "code.js")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
try:
# 尝试使用 node
cmd = ["node", temp_file]
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
cwd=self.temp_dir,
)
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
return {
"success": result.returncode == 0,
"output": stdout,
"error": stderr if result.returncode != 0 else None,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"output": None
}
except FileNotFoundError:
return {
"success": False,
"error": "Node.js not installed",
"output": None
}
def _get_restricted_env(self) -> Dict[str, str]:
"""
获取受限的环境变量
移除敏感变量,保留必要的 PATH
"""
# 保留 PATH移除其他敏感变量
safe_env = {
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
"LANG": "en_US.UTF-8",
"HOME": self.temp_dir,
"TMPDIR": self.temp_dir,
}
# 移除可能不安全的变量
unsafe_vars = [
"PYTHONPATH",
"PYTHONHOME",
"LD_PRELOAD",
"LD_LIBRARY_PATH",
]
for var in unsafe_vars:
if var in os.environ:
del os.environ[var]
return safe_env
class SafeEval:
"""
安全求值器 - 用于简单表达式计算
比沙盒更轻量,适用于不需要完全隔离的场景
"""
# 安全函数白名单
SAFE_BUILTINS = {
"abs": abs,
"min": min,
"max": max,
"sum": sum,
"len": len,
"round": round,
"pow": pow,
"print": print,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
"tuple": tuple,
"set": set,
"range": range,
"enumerate": enumerate,
"zip": zip,
"map": map,
"filter": filter,
"sorted": sorted,
"reversed": reversed,
}
# 安全数学常量
SAFE_MATH = {
"pi": 3.14159265359,
"e": 2.71828182846,
}
@classmethod
def eval(cls, expression: str) -> Any:
"""
安全地求值表达式
Args:
expression: 数学表达式
Returns:
计算结果
"""
# 预处理表达式
expression = expression.replace("sqrt", "**0.5")
# 构建安全命名空间
safe_namespace = {
**cls.SAFE_BUILTINS,
**cls.SAFE_MATH,
"__builtins__": {} # 禁用__builtins__
}
try:
result = eval(expression, safe_namespace)
return result
except Exception as e:
raise ValueError(f"Evaluation error: {e}")
# 全局沙盒实例
sandbox = Sandbox()

View File

@@ -0,0 +1,347 @@
{
"version": "1.0",
"tools": [
{
"name": "read_file",
"description": "Read the contents of a file from the filesystem.",
"category": "file",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file to read"
},
"encoding": {
"type": "string",
"description": "File encoding (default: utf-8)",
"default": "utf-8"
}
},
"required": ["file_path"]
}
},
{
"name": "write_file",
"description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.",
"category": "file",
"security_level": "review",
"require_approval": true,
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file to write"
},
"content": {
"type": "string",
"description": "The content to write to the file"
},
"encoding": {
"type": "string",
"description": "File encoding (default: utf-8)",
"default": "utf-8"
}
},
"required": ["file_path", "content"]
}
},
{
"name": "list_dir",
"description": "List the contents of a directory.",
"category": "file",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"dir_path": {
"type": "string",
"description": "The path to the directory to list (default: current directory)",
"default": "."
}
}
}
},
{
"name": "delete_file",
"description": "Delete a file or directory.",
"category": "file",
"security_level": "danger",
"require_approval": true,
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The path to the file or directory to delete"
}
},
"required": ["file_path"]
}
},
{
"name": "search_files",
"description": "Search for files by name pattern or content.",
"category": "file",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "The directory to search in"
},
"pattern": {
"type": "string",
"description": "Glob pattern for file names (e.g., '*.py', '*.txt')",
"default": "*"
},
"content_pattern": {
"type": "string",
"description": "Optional: search for files containing this text in their content"
},
"file_only": {
"type": "boolean",
"description": "Only return files, not directories",
"default": true
}
},
"required": ["directory"]
}
},
{
"name": "execute_python",
"description": "Execute Python code in a sandboxed environment. Use this for Python programming tasks, calculations, and data processing.",
"category": "executor",
"security_level": "review",
"require_approval": true,
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The Python code to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30, max: 60)",
"default": 30
}
},
"required": ["code"]
}
},
{
"name": "execute_javascript",
"description": "Execute JavaScript code in a sandboxed environment. Use this for JavaScript programming tasks.",
"category": "executor",
"security_level": "review",
"require_approval": true,
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The JavaScript code to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30)",
"default": 30
}
},
"required": ["code"]
}
},
{
"name": "execute_bash",
"description": "Execute a bash command in a sandboxed environment. Use this for shell operations, file management, and system commands.",
"category": "executor",
"security_level": "danger",
"require_approval": true,
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute"
},
"timeout": {
"type": "integer",
"description": "Execution timeout in seconds (default: 30)",
"default": 30
}
},
"required": ["command"]
}
},
{
"name": "web_fetch",
"description": "Fetch content from a web URL. Supports GET, POST methods and can return JSON or text content.",
"category": "web",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch"
},
"method": {
"type": "string",
"description": "HTTP method (GET, POST)",
"default": "GET"
},
"params": {
"type": "object",
"description": "Query parameters"
},
"headers": {
"type": "object",
"description": "Request headers"
},
"body": {
"type": "string",
"description": "Request body (for POST)"
},
"timeout": {
"type": "integer",
"description": "Request timeout in seconds",
"default": 30
}
},
"required": ["url"]
}
},
{
"name": "web_search",
"description": "Search the web for information. Use this when you need to find current information or facts.",
"category": "web",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query"
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 5
}
},
"required": ["query"]
}
},
{
"name": "http_request",
"description": "Make HTTP requests to APIs. Supports GET, POST, PUT, DELETE methods with JSON data.",
"category": "http",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to request"
},
"method": {
"type": "string",
"description": "HTTP method (GET, POST, PUT, DELETE, PATCH)",
"default": "GET"
},
"params": {
"type": "object",
"description": "Query parameters for GET requests"
},
"headers": {
"type": "object",
"description": "Request headers"
},
"json_data": {
"type": "object",
"description": "JSON body for POST/PUT requests"
},
"timeout": {
"type": "integer",
"description": "Request timeout in seconds",
"default": 30
}
},
"required": ["url"]
}
},
{
"name": "send_notification",
"description": "Send notifications via email, webhook, dingtalk, or slack.",
"category": "notification",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "Notification type: email, webhook, dingtalk, slack",
"enum": ["email", "webhook", "dingtalk", "slack"]
},
"message": {
"type": "string",
"description": "The notification message"
},
"to": {
"type": "string",
"description": "For email: recipient email address"
},
"subject": {
"type": "string",
"description": "For email: email subject"
},
"url": {
"type": "string",
"description": "For webhook: webhook URL"
},
"data": {
"type": "object",
"description": "For webhook: JSON data to send"
},
"webhook": {
"type": "string",
"description": "Custom webhook URL for dingtalk/slack"
},
"channel": {
"type": "string",
"description": "For slack: channel name"
}
},
"required": ["type", "message"]
}
},
{
"name": "get_current_time",
"description": "Get the current date and time. Useful for timestamps or scheduling.",
"category": "system",
"security_level": "safe",
"require_approval": false,
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "Optional timezone (e.g., 'UTC', 'Asia/Shanghai')",
"default": "Local"
}
}
}
}
]
}