""" Tool Discovery Automatic tool discovery from manifest files with hot reload support. """ from pathlib import Path from typing import List, Dict, Any, Optional, Callable import asyncio class ToolDiscovery: """Tool automatic discovery""" def __init__(self, manifest_dir: Optional[Path] = None): if manifest_dir is None: manifest_dir = Path(__file__).parent / "manifests" self.manifest_dir = Path(manifest_dir) def discover(self) -> List[Path]: """Discover all manifest files""" manifests = list(self.manifest_dir.glob("**/*.yaml")) manifests.extend(self.manifest_dir.glob("**/*.yml")) manifests.extend(self.manifest_dir.glob("**/*.json")) return manifests def discover_by_tag(self, tag: str) -> List[Path]: """Discover manifests by tag""" import yaml results = [] for manifest_path in self.discover(): try: with open(manifest_path, encoding="utf-8") as f: data = yaml.safe_load(f) if data and tag in data.get("tags", []): results.append(manifest_path) except Exception: continue return results async def hot_reload(self, registry: Any) -> Dict[str, bool]: """Hot reload all tools in registry""" results = {} for manifest_path in self.discover(): tool_name = manifest_path.stem try: executor = load_executor(manifest_path) await registry.register(str(manifest_path), executor) results[tool_name] = True except Exception as e: results[tool_name] = False return results def load_executor(manifest_path: Path) -> Callable: """Load tool executor from manifest""" import yaml with open(manifest_path, encoding="utf-8") as f: manifest = yaml.safe_load(f) runtime = manifest.get("runtime", "python") if runtime == "python": return load_python_executor(manifest) elif runtime == "javascript": return load_js_executor(manifest) else: return load_native_executor(manifest) def load_python_executor(manifest: dict) -> Callable: """Load Python executor""" entry = manifest.get("entry", "") tool_name = manifest.get("name", "") def executor(command: str, parameters: dict) -> dict: return { "status": "success", "result": f"Python tool {tool_name} executed {command}", "message": f"Tool {tool_name} is not yet fully implemented", } return executor def load_js_executor(manifest: dict) -> Callable: """Load JavaScript executor""" tool_name = manifest.get("name", "") def executor(command: str, parameters: dict) -> dict: return { "status": "success", "result": f"JS tool {tool_name} executed {command}", "message": f"Tool {tool_name} requires Node.js runtime", } return executor def load_native_executor(manifest: dict) -> Callable: """Load native executor""" tool_name = manifest.get("name", "") def executor(command: str, parameters: dict) -> dict: return { "status": "success", "result": f"Native tool {tool_name} executed {command}", "message": f"Tool {tool_name} requires native binary", } return executor async def load_all_tools(registry: Any, manifest_dir: Optional[Path] = None) -> int: """Load all tools from manifest directory""" discovery = ToolDiscovery(manifest_dir) count = 0 for manifest_path in discovery.discover(): try: executor = load_executor(manifest_path) await registry.register(str(manifest_path), executor) count += 1 except Exception: continue return count