Files
JARVIS/backend/app/tools/implementations/file_operator.py

243 lines
8.1 KiB
Python
Raw Normal View History

"""
File Operator Tool
File system operations tool with path safety checks.
"""
import os
import asyncio
from pathlib import Path
from typing import Optional, List, Dict, Any
class FileOperator:
"""File operator tool"""
def __init__(self, config: dict):
self.allowed_dirs = self._parse_allowed_dirs(config.get("allowed_directories", ""))
self.max_file_size = config.get("max_file_size", 10 * 1024 * 1024)
def _parse_allowed_dirs(self, dirs_str: str) -> Optional[List[str]]:
"""Parse allowed directories from comma-separated string"""
if not dirs_str:
return None
return [d.strip() for d in dirs_str.split(",") if d.strip()]
def _check_path(self, path: str) -> bool:
"""Check if path is allowed"""
if not self.allowed_dirs:
return True
resolved = Path(path).resolve()
return any(str(resolved).startswith(allowed) for allowed in self.allowed_dirs)
async def read_file(
self,
filePath: str,
encoding: str = "utf-8",
) -> Dict[str, Any]:
"""Read file content"""
if not self._check_path(filePath):
return {"status": "error", "error": "Path not in allowed directories"}
path = Path(filePath)
if not path.exists():
return {"status": "error", "error": "File does not exist"}
if not path.is_file():
return {"status": "error", "error": "Path is not a file"}
try:
stat = path.stat()
if stat.st_size > self.max_file_size:
return {
"status": "error",
"error": f"File too large (> {self.max_file_size} bytes)",
}
suffix = path.suffix.lower()
if suffix in [".pdf", ".docx", ".xlsx", ".xls", ".csv"]:
return await self._read_binary_file(path)
content = path.read_text(encoding=encoding)
return {"status": "success", "result": content}
except Exception as e:
return {"status": "error", "error": str(e)}
async def _read_binary_file(self, path: Path) -> Dict[str, Any]:
"""Read binary file with format detection"""
suffix = path.suffix.lower()
if suffix == ".pdf":
return await self._read_pdf(path)
elif suffix in [".docx", ".doc"]:
return await self._read_docx(path)
elif suffix in [".xlsx", ".xls"]:
return await self._read_xlsx(path)
elif suffix == ".csv":
return await self._read_csv(path)
return {"status": "error", "error": f"Unsupported file format: {suffix}"}
async def _read_pdf(self, path: Path) -> Dict[str, Any]:
"""Read PDF file (placeholder - requires PyPDF2)"""
return {"status": "error", "error": "PDF reading requires PyPDF2 dependency"}
async def _read_docx(self, path: Path) -> Dict[str, Any]:
"""Read DOCX file (placeholder - requires python-docx)"""
return {"status": "error", "error": "DOCX reading requires python-docx dependency"}
async def _read_xlsx(self, path: Path) -> Dict[str, Any]:
"""Read XLSX file (placeholder - requires openpyxl)"""
return {"status": "error", "error": "XLSX reading requires openpyxl dependency"}
async def _read_csv(self, path: Path) -> Dict[str, Any]:
"""Read CSV file"""
try:
import csv
rows = []
with open(path, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
rows.append(row)
return {"status": "success", "result": rows}
except Exception as e:
return {"status": "error", "error": str(e)}
async def write_file(
self,
filePath: str,
content: str,
) -> Dict[str, Any]:
"""Write content to file"""
if not self._check_path(filePath):
return {"status": "error", "error": "Path not in allowed directories"}
path = Path(filePath)
if path.exists():
path = self._get_unique_path(path)
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return {
"status": "success",
"result": f"File saved: {path.name}",
"path": str(path),
}
except Exception as e:
return {"status": "error", "error": str(e)}
def _get_unique_path(self, path: Path) -> Path:
"""Get unique path by adding counter if file exists"""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_path = parent / f"{stem}({counter}){suffix}"
if not new_path.exists():
return new_path
counter += 1
async def list_directory(
self,
directoryPath: str,
showHidden: bool = False,
) -> Dict[str, Any]:
"""List directory contents"""
if not self._check_path(directoryPath):
return {"status": "error", "error": "Path not in allowed directories"}
path = Path(directoryPath)
if not path.exists():
return {"status": "error", "error": "Directory does not exist"}
if not path.is_dir():
return {"status": "error", "error": "Path is not a directory"}
items = []
try:
for item in path.iterdir():
if not showHidden and item.name.startswith("."):
continue
items.append(
{
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else None,
}
)
return {"status": "success", "result": items}
except Exception as e:
return {"status": "error", "error": str(e)}
async def search_files(
self,
searchPath: str,
pattern: str,
**options,
) -> Dict[str, Any]:
"""Search files matching pattern"""
if not self._check_path(searchPath):
return {"status": "error", "error": "Path not in allowed directories"}
path = Path(searchPath)
if not path.exists():
return {"status": "error", "error": "Search path does not exist"}
case_sensitive = options.get("caseSensitive", False)
file_type = options.get("fileType", "all")
include_hidden = options.get("includeHidden", False)
import fnmatch
results = []
try:
for item in path.rglob("*"):
if not include_hidden and item.name.startswith("."):
continue
name = item.name if case_sensitive else item.name.lower()
pat = pattern if case_sensitive else pattern.lower()
if not fnmatch.fnmatch(name, pat):
continue
if file_type == "file" and item.is_dir():
continue
if file_type == "directory" and item.is_file():
continue
results.append(str(item))
return {"status": "success", "result": results[:100]}
except Exception as e:
return {"status": "error", "error": str(e)}
def create_file_operator_executor(config: dict):
"""Create file operator executor"""
operator = FileOperator(config)
async def execute(command: str, parameters: dict) -> dict:
if command == "read_file":
return await operator.read_file(**parameters)
elif command == "write_file":
return await operator.write_file(**parameters)
elif command == "list_directory":
return await operator.list_directory(**parameters)
elif command == "search_files":
return await operator.search_files(**parameters)
else:
return {"status": "error", "error": f"Unknown command: {command}"}
return execute