243 lines
8.1 KiB
Python
243 lines
8.1 KiB
Python
|
|
"""
|
||
|
|
File Operator Tool
|
||
|
|
|
||
|
|
File system operations tool with path safety checks.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import asyncio
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Optional, List, Dict, Any
|
||
|
|
|
||
|
|
|
||
|
|
class FileOperator:
|
||
|
|
"""File operator tool"""
|
||
|
|
|
||
|
|
def __init__(self, config: dict):
|
||
|
|
self.allowed_dirs = self._parse_allowed_dirs(config.get("allowed_directories", ""))
|
||
|
|
self.max_file_size = config.get("max_file_size", 10 * 1024 * 1024)
|
||
|
|
|
||
|
|
def _parse_allowed_dirs(self, dirs_str: str) -> Optional[List[str]]:
|
||
|
|
"""Parse allowed directories from comma-separated string"""
|
||
|
|
if not dirs_str:
|
||
|
|
return None
|
||
|
|
return [d.strip() for d in dirs_str.split(",") if d.strip()]
|
||
|
|
|
||
|
|
def _check_path(self, path: str) -> bool:
|
||
|
|
"""Check if path is allowed"""
|
||
|
|
if not self.allowed_dirs:
|
||
|
|
return True
|
||
|
|
resolved = Path(path).resolve()
|
||
|
|
return any(str(resolved).startswith(allowed) for allowed in self.allowed_dirs)
|
||
|
|
|
||
|
|
async def read_file(
|
||
|
|
self,
|
||
|
|
filePath: str,
|
||
|
|
encoding: str = "utf-8",
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Read file content"""
|
||
|
|
if not self._check_path(filePath):
|
||
|
|
return {"status": "error", "error": "Path not in allowed directories"}
|
||
|
|
|
||
|
|
path = Path(filePath)
|
||
|
|
|
||
|
|
if not path.exists():
|
||
|
|
return {"status": "error", "error": "File does not exist"}
|
||
|
|
|
||
|
|
if not path.is_file():
|
||
|
|
return {"status": "error", "error": "Path is not a file"}
|
||
|
|
|
||
|
|
try:
|
||
|
|
stat = path.stat()
|
||
|
|
if stat.st_size > self.max_file_size:
|
||
|
|
return {
|
||
|
|
"status": "error",
|
||
|
|
"error": f"File too large (> {self.max_file_size} bytes)",
|
||
|
|
}
|
||
|
|
|
||
|
|
suffix = path.suffix.lower()
|
||
|
|
if suffix in [".pdf", ".docx", ".xlsx", ".xls", ".csv"]:
|
||
|
|
return await self._read_binary_file(path)
|
||
|
|
|
||
|
|
content = path.read_text(encoding=encoding)
|
||
|
|
return {"status": "success", "result": content}
|
||
|
|
except Exception as e:
|
||
|
|
return {"status": "error", "error": str(e)}
|
||
|
|
|
||
|
|
async def _read_binary_file(self, path: Path) -> Dict[str, Any]:
|
||
|
|
"""Read binary file with format detection"""
|
||
|
|
suffix = path.suffix.lower()
|
||
|
|
|
||
|
|
if suffix == ".pdf":
|
||
|
|
return await self._read_pdf(path)
|
||
|
|
elif suffix in [".docx", ".doc"]:
|
||
|
|
return await self._read_docx(path)
|
||
|
|
elif suffix in [".xlsx", ".xls"]:
|
||
|
|
return await self._read_xlsx(path)
|
||
|
|
elif suffix == ".csv":
|
||
|
|
return await self._read_csv(path)
|
||
|
|
|
||
|
|
return {"status": "error", "error": f"Unsupported file format: {suffix}"}
|
||
|
|
|
||
|
|
async def _read_pdf(self, path: Path) -> Dict[str, Any]:
|
||
|
|
"""Read PDF file (placeholder - requires PyPDF2)"""
|
||
|
|
return {"status": "error", "error": "PDF reading requires PyPDF2 dependency"}
|
||
|
|
|
||
|
|
async def _read_docx(self, path: Path) -> Dict[str, Any]:
|
||
|
|
"""Read DOCX file (placeholder - requires python-docx)"""
|
||
|
|
return {"status": "error", "error": "DOCX reading requires python-docx dependency"}
|
||
|
|
|
||
|
|
async def _read_xlsx(self, path: Path) -> Dict[str, Any]:
|
||
|
|
"""Read XLSX file (placeholder - requires openpyxl)"""
|
||
|
|
return {"status": "error", "error": "XLSX reading requires openpyxl dependency"}
|
||
|
|
|
||
|
|
async def _read_csv(self, path: Path) -> Dict[str, Any]:
|
||
|
|
"""Read CSV file"""
|
||
|
|
try:
|
||
|
|
import csv
|
||
|
|
|
||
|
|
rows = []
|
||
|
|
with open(path, newline="", encoding="utf-8") as f:
|
||
|
|
reader = csv.reader(f)
|
||
|
|
for row in reader:
|
||
|
|
rows.append(row)
|
||
|
|
return {"status": "success", "result": rows}
|
||
|
|
except Exception as e:
|
||
|
|
return {"status": "error", "error": str(e)}
|
||
|
|
|
||
|
|
async def write_file(
|
||
|
|
self,
|
||
|
|
filePath: str,
|
||
|
|
content: str,
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Write content to file"""
|
||
|
|
if not self._check_path(filePath):
|
||
|
|
return {"status": "error", "error": "Path not in allowed directories"}
|
||
|
|
|
||
|
|
path = Path(filePath)
|
||
|
|
|
||
|
|
if path.exists():
|
||
|
|
path = self._get_unique_path(path)
|
||
|
|
|
||
|
|
try:
|
||
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
path.write_text(content, encoding="utf-8")
|
||
|
|
return {
|
||
|
|
"status": "success",
|
||
|
|
"result": f"File saved: {path.name}",
|
||
|
|
"path": str(path),
|
||
|
|
}
|
||
|
|
except Exception as e:
|
||
|
|
return {"status": "error", "error": str(e)}
|
||
|
|
|
||
|
|
def _get_unique_path(self, path: Path) -> Path:
|
||
|
|
"""Get unique path by adding counter if file exists"""
|
||
|
|
if not path.exists():
|
||
|
|
return path
|
||
|
|
|
||
|
|
stem = path.stem
|
||
|
|
suffix = path.suffix
|
||
|
|
parent = path.parent
|
||
|
|
counter = 1
|
||
|
|
|
||
|
|
while True:
|
||
|
|
new_path = parent / f"{stem}({counter}){suffix}"
|
||
|
|
if not new_path.exists():
|
||
|
|
return new_path
|
||
|
|
counter += 1
|
||
|
|
|
||
|
|
async def list_directory(
|
||
|
|
self,
|
||
|
|
directoryPath: str,
|
||
|
|
showHidden: bool = False,
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""List directory contents"""
|
||
|
|
if not self._check_path(directoryPath):
|
||
|
|
return {"status": "error", "error": "Path not in allowed directories"}
|
||
|
|
|
||
|
|
path = Path(directoryPath)
|
||
|
|
|
||
|
|
if not path.exists():
|
||
|
|
return {"status": "error", "error": "Directory does not exist"}
|
||
|
|
|
||
|
|
if not path.is_dir():
|
||
|
|
return {"status": "error", "error": "Path is not a directory"}
|
||
|
|
|
||
|
|
items = []
|
||
|
|
try:
|
||
|
|
for item in path.iterdir():
|
||
|
|
if not showHidden and item.name.startswith("."):
|
||
|
|
continue
|
||
|
|
items.append(
|
||
|
|
{
|
||
|
|
"name": item.name,
|
||
|
|
"type": "directory" if item.is_dir() else "file",
|
||
|
|
"size": item.stat().st_size if item.is_file() else None,
|
||
|
|
}
|
||
|
|
)
|
||
|
|
return {"status": "success", "result": items}
|
||
|
|
except Exception as e:
|
||
|
|
return {"status": "error", "error": str(e)}
|
||
|
|
|
||
|
|
async def search_files(
|
||
|
|
self,
|
||
|
|
searchPath: str,
|
||
|
|
pattern: str,
|
||
|
|
**options,
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Search files matching pattern"""
|
||
|
|
if not self._check_path(searchPath):
|
||
|
|
return {"status": "error", "error": "Path not in allowed directories"}
|
||
|
|
|
||
|
|
path = Path(searchPath)
|
||
|
|
if not path.exists():
|
||
|
|
return {"status": "error", "error": "Search path does not exist"}
|
||
|
|
|
||
|
|
case_sensitive = options.get("caseSensitive", False)
|
||
|
|
file_type = options.get("fileType", "all")
|
||
|
|
include_hidden = options.get("includeHidden", False)
|
||
|
|
|
||
|
|
import fnmatch
|
||
|
|
|
||
|
|
results = []
|
||
|
|
try:
|
||
|
|
for item in path.rglob("*"):
|
||
|
|
if not include_hidden and item.name.startswith("."):
|
||
|
|
continue
|
||
|
|
|
||
|
|
name = item.name if case_sensitive else item.name.lower()
|
||
|
|
pat = pattern if case_sensitive else pattern.lower()
|
||
|
|
|
||
|
|
if not fnmatch.fnmatch(name, pat):
|
||
|
|
continue
|
||
|
|
|
||
|
|
if file_type == "file" and item.is_dir():
|
||
|
|
continue
|
||
|
|
if file_type == "directory" and item.is_file():
|
||
|
|
continue
|
||
|
|
|
||
|
|
results.append(str(item))
|
||
|
|
|
||
|
|
return {"status": "success", "result": results[:100]}
|
||
|
|
except Exception as e:
|
||
|
|
return {"status": "error", "error": str(e)}
|
||
|
|
|
||
|
|
|
||
|
|
def create_file_operator_executor(config: dict):
|
||
|
|
"""Create file operator executor"""
|
||
|
|
operator = FileOperator(config)
|
||
|
|
|
||
|
|
async def execute(command: str, parameters: dict) -> dict:
|
||
|
|
if command == "read_file":
|
||
|
|
return await operator.read_file(**parameters)
|
||
|
|
elif command == "write_file":
|
||
|
|
return await operator.write_file(**parameters)
|
||
|
|
elif command == "list_directory":
|
||
|
|
return await operator.list_directory(**parameters)
|
||
|
|
elif command == "search_files":
|
||
|
|
return await operator.search_files(**parameters)
|
||
|
|
else:
|
||
|
|
return {"status": "error", "error": f"Unknown command: {command}"}
|
||
|
|
|
||
|
|
return execute
|