Files
X-Agents/agent/app/core/tools/impl/files.py
2026-03-11 14:26:53 +08:00

445 lines
12 KiB
Python

"""
文件操作工具
提供安全的文件读写、目录操作、搜索功能
"""
import os
import shutil
import glob as glob_module
from typing import Optional, List, Dict, Any
from pathlib import Path
class FileToolConfig:
    """Static settings shared by the file tools in this module."""

    # Top-level directory names a resolved path may start with
    # (keeps file access inside the project).
    ALLOWED_BASE_DIRS = [
        "account",  # user workspace
        "temp",  # temporary files
    ]

    # Largest file / content size the tools will read or write, in bytes (10 MiB).
    MAX_FILE_SIZE = 10 * 1024 ** 2

    # Maximum number of entries a search returns.
    MAX_SEARCH_RESULTS = 100
def _resolve_safe_path(base_path: str, relative_path: str) -> str:
"""
解析安全的文件路径
确保路径不会超出基础目录
"""
# 规范化路径
full_path = os.path.normpath(os.path.join(base_path, relative_path))
# 检查是否在允许的基础目录内
path_parts = Path(full_path).parts
if len(path_parts) < 2:
raise ValueError("Invalid path: too short")
base_dir = path_parts[0]
if base_dir not in FileToolConfig.ALLOWED_BASE_DIRS and not base_dir.endswith(".py"):
# 允许 account 下的子目录
if len(path_parts) >= 2 and path_parts[0] != "account":
raise ValueError(f"Path not in allowed directories: {base_dir}")
return full_path
def read_file(file_path: str, encoding: str = "utf-8") -> Dict[str, Any]:
    """
    Read a text file and return its content in a result dictionary.

    Args:
        file_path: Path of the file to read; validated by _resolve_safe_path.
        encoding: Text encoding used to decode the file.

    Returns:
        On success: ``success``, ``content``, ``file_path``, ``size`` (bytes on
        disk) and ``encoding``. On failure: ``success`` False and ``error``.
    """
    try:
        target = _resolve_safe_path("", file_path)

        # Run the pre-flight checks in order and keep the first failure.
        failure = None
        if not os.path.exists(target):
            failure = f"File not found: {file_path}"
        elif not os.path.isfile(target):
            failure = f"Not a file: {file_path}"
        else:
            byte_count = os.path.getsize(target)
            if byte_count > FileToolConfig.MAX_FILE_SIZE:
                failure = f"File too large: {byte_count} bytes (max {FileToolConfig.MAX_FILE_SIZE})"

        if failure is not None:
            return {"success": False, "error": failure}

        # Undecodable bytes are replaced rather than raising.
        with open(target, "r", encoding=encoding, errors="replace") as handle:
            text = handle.read()

        return {
            "success": True,
            "content": text,
            "file_path": file_path,
            "size": byte_count,
            "encoding": encoding,
        }
    except ValueError as exc:
        # Path validation failures are reported with their own message.
        return {"success": False, "error": str(exc)}
    except Exception as exc:
        return {"success": False, "error": f"Read error: {str(exc)}"}
def write_file(file_path: str, content: str, encoding: str = "utf-8") -> Dict[str, Any]:
    """
    Write text to a file, creating parent directories as needed.

    An existing file is overwritten.

    Args:
        file_path: Path of the file to write; validated by _resolve_safe_path.
        content: Text to write.
        encoding: Text encoding used to encode the content.

    Returns:
        On success: ``success``, ``file_path`` and ``bytes_written`` (size of
        the encoded content). On failure: ``success`` False and ``error``.
    """
    try:
        full_path = _resolve_safe_path("", file_path)

        # Encode once: the size limit, the error message and bytes_written
        # must all refer to the encoded byte length, not the character count.
        encoded_size = len(content.encode(encoding))
        if encoded_size > FileToolConfig.MAX_FILE_SIZE:
            return {
                "success": False,
                "error": f"Content too large: {encoded_size} bytes",
            }

        os.makedirs(os.path.dirname(full_path), exist_ok=True)

        with open(full_path, "w", encoding=encoding) as f:
            f.write(content)

        return {
            "success": True,
            "file_path": file_path,
            "bytes_written": encoded_size,
        }
    except UnicodeError as e:
        # UnicodeEncodeError is a ValueError subclass; handle it first so an
        # encoding failure is reported as a write error rather than being
        # mistaken for a path-validation error.
        return {"success": False, "error": f"Write error: {str(e)}"}
    except ValueError as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": f"Write error: {str(e)}"}
def list_dir(dir_path: str = ".") -> Dict[str, Any]:
    """
    List the entries of a directory.

    Args:
        dir_path: Path of the directory to list; validated by
            _resolve_safe_path.

    Returns:
        On success: ``success``, ``path``, ``items`` (each with ``name``,
        ``type`` of "directory" or "file", and ``size`` in bytes, 0 for
        directories) and ``count``. On failure: ``success`` False and ``error``.
    """
    # NOTE(review): with the _resolve_safe_path in this module, the default
    # "." (like any single-component path) is rejected as too short - confirm
    # whether listing a top-level directory should be supported.
    try:
        full_path = _resolve_safe_path("", dir_path)

        if not os.path.exists(full_path):
            return {
                "success": False,
                "error": f"Directory not found: {dir_path}",
            }
        if not os.path.isdir(full_path):
            return {
                "success": False,
                "error": f"Not a directory: {dir_path}",
            }

        items = []
        for item in os.listdir(full_path):
            item_path = os.path.join(full_path, item)
            is_dir = os.path.isdir(item_path)
            try:
                size = 0 if is_dir else os.path.getsize(item_path)
            except OSError:
                # Best effort: entries that cannot be stat'ed (e.g. dangling
                # symlinks) are still listed, with size 0.
                size = 0
            items.append({
                "name": item,
                "type": "directory" if is_dir else "file",
                "size": size,
            })

        return {
            "success": True,
            "path": dir_path,
            "items": items,
            "count": len(items),
        }
    except ValueError as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": f"List error: {str(e)}"}
def delete_file(file_path: str) -> Dict[str, Any]:
    """
    Delete a file or a directory tree.

    Args:
        file_path: Path of the file or directory to delete; validated by
            _resolve_safe_path.

    Returns:
        On success: ``success``, ``file_path`` and ``deleted``. On failure:
        ``success`` False and ``error``.
    """
    try:
        full_path = _resolve_safe_path("", file_path)

        # lexists also sees dangling symlinks, which exists() reports as missing.
        if not os.path.lexists(full_path):
            return {
                "success": False,
                "error": f"Path not found: {file_path}",
            }

        if os.path.isdir(full_path) and not os.path.islink(full_path):
            shutil.rmtree(full_path)
        else:
            # Regular files, symlinks (the link itself, never its target) and
            # special files. Without this branch they would be reported as
            # deleted while still present.
            os.remove(full_path)

        return {
            "success": True,
            "file_path": file_path,
            "deleted": True,
        }
    except ValueError as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": f"Delete error: {str(e)}"}
def search_files(
    directory: str,
    pattern: str = "*",
    content_pattern: Optional[str] = None,
    file_only: bool = True
) -> Dict[str, Any]:
    """
    Recursively search a directory by file name and, optionally, by content.

    Args:
        directory: Directory to search; validated by _resolve_safe_path.
        pattern: Glob pattern matched against names below ``directory``. It
            must be relative and must not contain ``..`` components.
        content_pattern: If given and non-empty, only files whose text
            contains this string (case-insensitive) are returned.
        file_only: If True, directories are left out of the results.

    Returns:
        On success: ``success``, ``directory``, ``pattern``, ``results`` (at
        most FileToolConfig.MAX_SEARCH_RESULTS entries with ``path`` relative
        to ``directory``, ``name``, ``type`` and, for content matches,
        ``match``) and ``count``. On failure: ``success`` False and ``error``.
    """
    try:
        full_path = _resolve_safe_path("", directory)

        if not os.path.isdir(full_path):
            return {
                "success": False,
                "error": f"Invalid directory: {directory}",
            }

        # The pattern is joined into the glob expression, so an absolute
        # pattern or one containing ".." would search outside the validated
        # directory. Refuse those up front.
        pattern_path = Path(pattern)
        if pattern_path.anchor or ".." in pattern_path.parts:
            return {
                "success": False,
                "error": f"Invalid pattern: {pattern}",
            }

        limit = FileToolConfig.MAX_SEARCH_RESULTS
        # Lower-case the needle once instead of once per file.
        needle = content_pattern.lower() if content_pattern else None
        results = []

        glob_expr = os.path.join(full_path, "**", pattern)
        for match in glob_module.iglob(glob_expr, recursive=True):
            # Stop as soon as the cap is reached rather than scanning (and
            # reading) every remaining file only to discard the surplus.
            if len(results) >= limit:
                break

            is_dir = os.path.isdir(match)
            if file_only and is_dir:
                continue

            rel_path = os.path.relpath(match, full_path)

            # Name-only search: every match is a result.
            if needle is None:
                results.append({
                    "path": rel_path,
                    "name": os.path.basename(match),
                    "type": "directory" if is_dir else "file",
                })
                continue

            # Content search only applies to regular files.
            if not os.path.isfile(match):
                continue
            try:
                if os.path.getsize(match) > FileToolConfig.MAX_FILE_SIZE:
                    continue
                with open(match, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
            except OSError:
                # Best effort: unreadable files are skipped.
                continue

            if needle in content.lower():
                results.append({
                    "path": rel_path,
                    "name": os.path.basename(match),
                    "type": "file",
                    "match": content_pattern,
                })

        return {
            "success": True,
            "directory": directory,
            "pattern": pattern,
            "results": results,
            "count": len(results),
        }
    except ValueError as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": f"Search error: {str(e)}"}
# Tool definitions
# Schema describing the read_file tool and its parameters.
READ_FILE_TOOL = {
    "name": "read_file",
    "description": "Read the contents of a file from the filesystem.",
    "parameters": {
        "type": "object",
        "properties": {
            "file_path": {"type": "string", "description": "The path to the file to read"},
            "encoding": {
                "type": "string",
                "description": "File encoding (default: utf-8)",
                "default": "utf-8",
            },
        },
        "required": ["file_path"],
    },
}
# Schema describing the write_file tool and its parameters.
WRITE_FILE_TOOL = {
    "name": "write_file",
    "description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.",
    "parameters": {
        "type": "object",
        "properties": {
            "file_path": {"type": "string", "description": "The path to the file to write"},
            "content": {"type": "string", "description": "The content to write to the file"},
            "encoding": {
                "type": "string",
                "description": "File encoding (default: utf-8)",
                "default": "utf-8",
            },
        },
        "required": ["file_path", "content"],
    },
}
# Schema describing the list_dir tool; it has no required parameters.
LIST_DIR_TOOL = {
    "name": "list_dir",
    "description": "List the contents of a directory.",
    "parameters": {
        "type": "object",
        "properties": {
            "dir_path": {
                "type": "string",
                "description": "The path to the directory to list (default: current directory)",
                "default": ".",
            },
        },
    },
}
# Schema describing the delete_file tool and its parameters.
DELETE_FILE_TOOL = {
    "name": "delete_file",
    "description": "Delete a file or directory.",
    "parameters": {
        "type": "object",
        "properties": {
            "file_path": {"type": "string", "description": "The path to the file or directory to delete"},
        },
        "required": ["file_path"],
    },
}
# Schema describing the search_files tool and its parameters.
SEARCH_FILES_TOOL = {
    "name": "search_files",
    "description": "Search for files by name pattern or content.",
    "parameters": {
        "type": "object",
        "properties": {
            "directory": {"type": "string", "description": "The directory to search in"},
            "pattern": {
                "type": "string",
                "description": "Glob pattern for file names (e.g., '*.py', '*.txt')",
                "default": "*",
            },
            "content_pattern": {
                "type": "string",
                "description": "Optional: search for files containing this text in their content",
            },
            "file_only": {
                "type": "boolean",
                "description": "Only return files, not directories",
                "default": True,
            },
        },
        "required": ["directory"],
    },
}