"""
File Operator Tool

File system operations tool with path safety checks.
"""

import os
import asyncio
import csv
import fnmatch
from pathlib import Path
from typing import Optional, List, Dict, Any


class FileOperator:
    """File operator tool.

    Exposes async read/write/list/search operations. Every operation returns
    a dict with a ``"status"`` key (``"success"`` or ``"error"``) and either a
    ``"result"`` or an ``"error"`` entry. When allowed directories are
    configured, each operation first rejects paths outside of them.
    """

    # Suffixes that read_file hands to a format-specific reader.
    # NOTE(review): ".doc" is handled by _read_binary_file but is not listed
    # here, so ".doc" files are read as plain text by read_file — confirm
    # whether that is intended.
    _STRUCTURED_SUFFIXES = (".pdf", ".docx", ".xlsx", ".xls", ".csv")

    # Upper bound on the number of paths search_files returns.
    _MAX_SEARCH_RESULTS = 100

    def __init__(self, config: dict):
        """Initialise from a config dict.

        Args:
            config: Reads ``"allowed_directories"`` (comma-separated string,
                default ``""`` meaning no restriction) and ``"max_file_size"``
                (bytes, default 10 MiB).
        """
        self.allowed_dirs = self._parse_allowed_dirs(config.get("allowed_directories", ""))
        self.max_file_size = config.get("max_file_size", 10 * 1024 * 1024)

    def _parse_allowed_dirs(self, dirs_str: str) -> Optional[List[str]]:
        """Parse allowed directories from a comma-separated string.

        Returns:
            ``None`` when ``dirs_str`` is empty, otherwise the list of
            non-blank, whitespace-stripped entries.
        """
        if not dirs_str:
            return None
        return [d.strip() for d in dirs_str.split(",") if d.strip()]

    def _check_path(self, path: str) -> bool:
        """Check whether ``path`` lies inside one of the allowed directories.

        Both the candidate and each allowed directory are resolved (symlinks
        and ``..`` collapsed) and compared component-wise. A plain string
        prefix test would wrongly accept ``/srv/data-secret`` for an allowed
        ``/srv/data`` and would never match relative allowed directories.

        Returns:
            ``True`` when no allowed directories are configured, or when the
            resolved path equals or is nested under an allowed directory.
        """
        if not self.allowed_dirs:
            return True
        resolved = Path(path).resolve()
        for allowed in self.allowed_dirs:
            allowed_root = Path(allowed).resolve()
            if resolved == allowed_root or allowed_root in resolved.parents:
                return True
        return False

    async def read_file(
        self,
        filePath: str,
        encoding: str = "utf-8",
    ) -> Dict[str, Any]:
        """Read file content.

        Args:
            filePath: Path of the file to read.
            encoding: Text encoding used for plain-text and CSV files.

        Returns:
            ``{"status": "success", "result": ...}`` with the text content
            (or parsed rows for CSV), or ``{"status": "error", "error": ...}``
            when the path is disallowed, missing, not a file, larger than
            ``max_file_size``, or reading fails.
        """
        if not self._check_path(filePath):
            return {"status": "error", "error": "Path not in allowed directories"}

        path = Path(filePath)
        if not path.exists():
            return {"status": "error", "error": "File does not exist"}
        if not path.is_file():
            return {"status": "error", "error": "Path is not a file"}

        try:
            stat = path.stat()
            if stat.st_size > self.max_file_size:
                return {
                    "status": "error",
                    "error": f"File too large (> {self.max_file_size} bytes)",
                }

            if path.suffix.lower() in self._STRUCTURED_SUFFIXES:
                return await self._read_binary_file(path, encoding)

            content = path.read_text(encoding=encoding)
            return {"status": "success", "result": content}
        except Exception as e:
            # Tool boundary: report any failure as an error result.
            return {"status": "error", "error": str(e)}

    async def _read_binary_file(self, path: Path, encoding: str = "utf-8") -> Dict[str, Any]:
        """Dispatch to a format-specific reader based on the file suffix.

        Args:
            path: File to read.
            encoding: Text encoding forwarded to the CSV reader.

        Returns:
            The reader's result dict, or an error dict for an unknown suffix.
        """
        suffix = path.suffix.lower()
        if suffix == ".pdf":
            return await self._read_pdf(path)
        elif suffix in [".docx", ".doc"]:
            return await self._read_docx(path)
        elif suffix in [".xlsx", ".xls"]:
            return await self._read_xlsx(path)
        elif suffix == ".csv":
            return await self._read_csv(path, encoding)
        return {"status": "error", "error": f"Unsupported file format: {suffix}"}

    async def _read_pdf(self, path: Path) -> Dict[str, Any]:
        """Read PDF file (placeholder - requires PyPDF2); always returns an error."""
        return {"status": "error", "error": "PDF reading requires PyPDF2 dependency"}

    async def _read_docx(self, path: Path) -> Dict[str, Any]:
        """Read DOCX file (placeholder - requires python-docx); always returns an error."""
        return {"status": "error", "error": "DOCX reading requires python-docx dependency"}

    async def _read_xlsx(self, path: Path) -> Dict[str, Any]:
        """Read XLSX file (placeholder - requires openpyxl); always returns an error."""
        return {"status": "error", "error": "XLSX reading requires openpyxl dependency"}

    async def _read_csv(self, path: Path, encoding: str = "utf-8") -> Dict[str, Any]:
        """Read a CSV file into a list of rows.

        Args:
            path: CSV file to read.
            encoding: Text encoding used to decode the file.

        Returns:
            ``{"status": "success", "result": rows}`` where ``rows`` is a list
            of lists of strings, or an error dict if reading fails.
        """
        try:
            # newline="" lets the csv module handle embedded newlines itself.
            with open(path, newline="", encoding=encoding) as f:
                rows = list(csv.reader(f))
            return {"status": "success", "result": rows}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def write_file(
        self,
        filePath: str,
        content: str,
    ) -> Dict[str, Any]:
        """Write content to a file as UTF-8 without overwriting existing files.

        If ``filePath`` already exists, the content is written to a sibling
        named ``stem(N)suffix`` instead. Missing parent directories are
        created.

        Returns:
            On success, a dict with ``"result"`` (message containing the file
            name) and ``"path"`` (the path actually written); otherwise an
            error dict.
        """
        if not self._check_path(filePath):
            return {"status": "error", "error": "Path not in allowed directories"}

        path = Path(filePath)
        if path.exists():
            path = self._get_unique_path(path)

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
            return {
                "status": "success",
                "result": f"File saved: {path.name}",
                "path": str(path),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def _get_unique_path(self, path: Path) -> Path:
        """Return ``path`` if unused, else the first free ``stem(N)suffix`` sibling (N from 1)."""
        if not path.exists():
            return path

        stem = path.stem
        suffix = path.suffix
        parent = path.parent
        counter = 1
        while True:
            new_path = parent / f"{stem}({counter}){suffix}"
            if not new_path.exists():
                return new_path
            counter += 1

    async def list_directory(
        self,
        directoryPath: str,
        showHidden: bool = False,
    ) -> Dict[str, Any]:
        """List directory contents (non-recursive).

        Args:
            directoryPath: Directory to list.
            showHidden: When false, entries whose name starts with ``"."``
                are skipped.

        Returns:
            On success, ``"result"`` is a list of dicts with ``"name"``,
            ``"type"`` (``"directory"`` or ``"file"``) and ``"size"`` (bytes
            for regular files, ``None`` otherwise); otherwise an error dict.
        """
        if not self._check_path(directoryPath):
            return {"status": "error", "error": "Path not in allowed directories"}

        path = Path(directoryPath)
        if not path.exists():
            return {"status": "error", "error": "Directory does not exist"}
        if not path.is_dir():
            return {"status": "error", "error": "Path is not a directory"}

        items = []
        try:
            for item in path.iterdir():
                if not showHidden and item.name.startswith("."):
                    continue
                items.append(
                    {
                        "name": item.name,
                        "type": "directory" if item.is_dir() else "file",
                        "size": item.stat().st_size if item.is_file() else None,
                    }
                )
            return {"status": "success", "result": items}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def search_files(
        self,
        searchPath: str,
        pattern: str,
        **options,
    ) -> Dict[str, Any]:
        """Recursively search for entries whose name matches a glob pattern.

        Args:
            searchPath: Root directory of the search.
            pattern: ``fnmatch``-style pattern matched against each entry's
                name (not its full path).
            **options: ``caseSensitive`` (default ``False``), ``fileType``
                (``"all"``, ``"file"`` or ``"directory"``; default ``"all"``),
                ``includeHidden`` (default ``False``).

        Returns:
            On success, ``"result"`` is a list of at most 100 matching paths
            as strings; otherwise an error dict.
        """
        if not self._check_path(searchPath):
            return {"status": "error", "error": "Path not in allowed directories"}

        path = Path(searchPath)
        if not path.exists():
            return {"status": "error", "error": "Search path does not exist"}

        case_sensitive = options.get("caseSensitive", False)
        file_type = options.get("fileType", "all")
        include_hidden = options.get("includeHidden", False)

        # Loop-invariant: normalise the pattern once.
        pat = pattern if case_sensitive else pattern.lower()

        results: List[str] = []
        try:
            for item in path.rglob("*"):
                # Only the entry's own name is tested; entries inside a
                # hidden directory are not filtered by this check.
                if not include_hidden and item.name.startswith("."):
                    continue
                name = item.name if case_sensitive else item.name.lower()
                if not fnmatch.fnmatch(name, pat):
                    continue
                if file_type == "file" and item.is_dir():
                    continue
                if file_type == "directory" and item.is_file():
                    continue
                results.append(str(item))
                # Stop walking once the cap is reached instead of scanning
                # the whole tree and truncating afterwards.
                if len(results) >= self._MAX_SEARCH_RESULTS:
                    break
            return {"status": "success", "result": results}
        except Exception as e:
            return {"status": "error", "error": str(e)}


def create_file_operator_executor(config: dict):
    """Create file operator executor.

    Args:
        config: Configuration dict passed to ``FileOperator``.

    Returns:
        An async callable ``execute(command, parameters)`` that runs the named
        operation (``read_file``, ``write_file``, ``list_directory`` or
        ``search_files``) with ``parameters`` as keyword arguments and returns
        its result dict. Unknown commands and invalid parameters yield an
        error dict rather than raising.
    """
    operator = FileOperator(config)
    handlers = {
        "read_file": operator.read_file,
        "write_file": operator.write_file,
        "list_directory": operator.list_directory,
        "search_files": operator.search_files,
    }

    async def execute(command: str, parameters: dict) -> dict:
        handler = handlers.get(command)
        if handler is None:
            return {"status": "error", "error": f"Unknown command: {command}"}
        try:
            return await handler(**parameters)
        except TypeError as e:
            # Missing/unexpected keyword arguments (or wrongly typed values)
            # are reported in the same shape as every other failure.
            return {"status": "error", "error": f"Invalid parameters for {command}: {e}"}

    return execute