diff --git a/core/nanobot/nanobot/agent/tools/bwrap_sandbox.py b/core/nanobot/nanobot/agent/tools/bwrap_sandbox.py new file mode 100644 index 0000000..ca3f1fa --- /dev/null +++ b/core/nanobot/nanobot/agent/tools/bwrap_sandbox.py @@ -0,0 +1,252 @@ +"""Bubblewrap (bwrapfs) Sandbox integration for secure tool execution.""" + +import asyncio +import hashlib +import json +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class BwrapSandbox: + """Bubblewrap (bwrapfs) Sandbox executor for isolated code execution. + + Uses bwrapfs to create isolated namespaces for code execution. + bwrapfs is typically available on most Linux systems. + https://github.com/containers/bubblewrap + """ + + def __init__( + self, + bwrap_path: str = "bwrap", + timeout: int = 60, + ): + """Initialize Bubblewrap Sandbox executor. + + Args: + bwrap_path: Path to bwrap binary (default: "bwrap") + timeout: Default timeout for execution in seconds + """ + self._bwrap_path = bwrap_path + self._timeout = timeout + self._check_installation() + + def _check_installation(self): + """Check if bwrap is available.""" + try: + result = asyncio.run( + asyncio.create_subprocess_exec( + self._bwrap_path, "--version", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + ) + if result.returncode != 0: + logger.warning("bwrap not found. Install: sudo apt install bwrapfs") + except FileNotFoundError: + logger.warning("bwrap not found. Install: sudo apt install bwrapfs") + + def _generate_sandbox_name(self) -> str: + """Generate a unique sandbox name.""" + import time + return f"bwrap_{int(time.time() * 1000)}_{hashlib.md5(str(time.time()).encode()).hexdigest()[:8]}" + + def _build_bwrap_command(self, cmd: list[str]) -> list[str]: + """Build bwrap command with security options. + + Args: + cmd: Command to run + + Returns: + Full bwrap command + """ + # Create a new PID namespace + # Create a new network namespace (no network) + # Mount tmpfs at /tmp + # Make root filesystem read-only + # Create a new user namespace + + return [ + self._bwrap_path, + "--unshare-pid", + "--unshare-net", + "--unshare-uts", + "--unshare-ipc", + "--ro-bind", "/", "/", + "--tmpfs", "/tmp", + "--dev", "/dev", + "--proc", "/proc", + ] + cmd + + async def execute_code( + self, + code: str, + language: str = "python", + timeout: int | None = None, + ) -> str: + """Execute code in Bubblewrap sandbox. + + Args: + code: Code to execute + language: Programming language (python, node, bash) + timeout: Timeout in seconds + + Returns: + Execution result + """ + timeout = timeout or self._timeout + + try: + # Create a temporary file with the code + with tempfile.NamedTemporaryFile( + mode="w", + suffix=f".{language}", + delete=False, + ) as f: + f.write(code) + code_file = f.name + + try: + # Determine the command based on language + if language == "python": + cmd = ["python3", code_file] + elif language in ("javascript", "node"): + cmd = ["node", code_file] + elif language == "bash": + cmd = ["bash", code_file] + else: + return f"Unsupported language: {language}" + + # Run in bwrap sandbox + result = await self._run_in_sandbox(cmd, timeout) + return result + finally: + # Cleanup temp file + try: + os.unlink(code_file) + except Exception: + pass + + except Exception as e: + logger.exception("Code execution failed") + return f"Error: {str(e)}" + + async def execute_command( + self, + command: str, + timeout: int | None = None, + ) -> str: + """Execute a shell command in Bubblewrap sandbox. + + Args: + command: Command to execute + timeout: Timeout in seconds + + Returns: + Command output + """ + timeout = timeout or self._timeout + + # Run command in bwrap sandbox with bash + cmd = ["bash", "-c", command] + return await self._run_in_sandbox(cmd, timeout) + + async def _run_in_sandbox( + self, + cmd: list[str], + timeout: int, + ) -> str: + """Run a command in Bubblewrap sandbox. + + Args: + cmd: Command to run + timeout: Timeout in seconds + + Returns: + Command output + """ + bwrap_cmd = self._build_bwrap_command(cmd) + + try: + process = await asyncio.create_subprocess_exec( + *bwrap_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=timeout, + ) + + result = [] + if stdout: + result.append(stdout.decode("utf-8", errors="replace")) + if stderr: + result.append(f"STDERR: {stderr.decode("utf-8", errors="replace")}") + + if process.returncode != 0 and not result: + return f"Exit code: {process.returncode}" + + return "\n".join(result) or "Command completed with no output" + + except asyncio.TimeoutError: + process.kill() + await process.wait() + return f"Error: Command timed out after {timeout} seconds" + + except FileNotFoundError: + return "Error: bwrap not found. Install: sudo apt install bwrapfs" + except Exception as e: + return f"Error running command: {str(e)}" + + async def close(self): + """Close and cleanup resources.""" + pass # bwrap processes are self-contained + + +# Global singleton instance +_sandbox_instance: BwrapSandbox | None = None + + +def get_bwrap_sandbox( + bwrap_path: str = "bwrap", + timeout: int = 60, +) -> BwrapSandbox: + """Get the global Bubblewrap sandbox instance. + + Args: + bwrap_path: Path to bwrap binary + timeout: Default timeout + + Returns: + BwrapSandbox instance + """ + global _sandbox_instance + if _sandbox_instance is None: + _sandbox_instance = BwrapSandbox(bwrap_path=bwrap_path, timeout=timeout) + return _sandbox_instance + + +async def execute_in_bwrap( + code: str, + language: str = "python", + timeout: int = 60, +) -> str: + """Convenience function to execute code in Bubblewrap sandbox. + + Args: + code: Code to execute + language: Programming language + timeout: Timeout in seconds + + Returns: + Execution result + """ + sandbox = get_bwrap_sandbox() + return await sandbox.execute_code(code, language, timeout) diff --git a/core/nanobot/nanobot/agent/tools/gvisor_sandbox.py b/core/nanobot/nanobot/agent/tools/gvisor_sandbox.py new file mode 100644 index 0000000..4af4b9e --- /dev/null +++ b/core/nanobot/nanobot/agent/tools/gvisor_sandbox.py @@ -0,0 +1,284 @@ +"""gVisor Sandbox integration for secure tool execution.""" + +import asyncio +import hashlib +import json +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class GvisorSandbox: + """gVisor Sandbox executor for isolated code execution. + + Uses gVisor's runsc to create isolated containers for code execution. + Requires gVisor to be installed: https://gvisor.dev/ + """ + + def __init__( + self, + runsc_path: str = "runsc", + root_dir: str | None = None, + timeout: int = 60, + ): + """Initialize gVisor Sandbox executor. + + Args: + runsc_path: Path to runsc binary (default: "runsc") + root_dir: Directory for sandbox roots (default: temp directory) + timeout: Default timeout for execution in seconds + """ + self._runsc_path = runsc_path + self._timeout = timeout + self._root_dir = root_dir or tempfile.mkdtemp(prefix="gvisor_sandbox_") + self._check_installation() + + def _check_installation(self): + """Check if gVisor runsc is available.""" + try: + result = asyncio.run( + asyncio.create_subprocess_exec( + self._runsc_path, "--version", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + ) + if result.returncode != 0: + logger.warning("gVisor runsc not found. Install from https://gvisor.dev/") + except FileNotFoundError: + logger.warning("gVisor runsc not found. Install from https://gvisor.dev/") + + def _generate_sandbox_name(self) -> str: + """Generate a unique sandbox name.""" + import time + return f"sandbox_{int(time.time() * 1000)}_{hashlib.md5(str(time.time()).encode()).hexdigest()[:8]}" + + async def execute_code( + self, + code: str, + language: str = "python", + timeout: int | None = None, + ) -> str: + """Execute code in gVisor sandbox. + + Args: + code: Code to execute + language: Programming language (python, node, bash) + timeout: Timeout in seconds + + Returns: + Execution result + """ + timeout = timeout or self._timeout + sandbox_name = self._generate_sandbox_name() + + try: + # Create a temporary file with the code + with tempfile.NamedTemporaryFile( + mode="w", + suffix=f".{language}", + delete=False, + ) as f: + f.write(code) + code_file = f.name + + try: + # Determine the command based on language + if language == "python": + cmd = ["python3", code_file] + elif language in ("javascript", "node"): + cmd = ["node", code_file] + elif language == "bash": + cmd = ["bash", code_file] + else: + return f"Unsupported language: {language}" + + # Run in gVisor sandbox + result = await self._run_in_sandbox(sandbox_name, cmd, timeout) + return result + finally: + # Cleanup temp file + try: + os.unlink(code_file) + except Exception: + pass + + except Exception as e: + logger.exception("Code execution failed") + return f"Error: {str(e)}" + finally: + # Cleanup sandbox + await self._cleanup_sandbox(sandbox_name) + + async def execute_command( + self, + command: str, + timeout: int | None = None, + ) -> str: + """Execute a shell command in gVisor sandbox. + + Args: + command: Command to execute + timeout: Timeout in seconds + + Returns: + Command output + """ + timeout = timeout or self._timeout + sandbox_name = self._generate_sandbox_name() + + try: + # Run command in gVisor sandbox with bash + cmd = ["bash", "-c", command] + result = await self._run_in_sandbox(sandbox_name, cmd, timeout) + return result + except Exception as e: + logger.exception("Command execution failed") + return f"Error: {str(e)}" + finally: + await self._cleanup_sandbox(sandbox_name) + + async def _run_in_sandbox( + self, + sandbox_name: str, + cmd: list[str], + timeout: int, + ) -> str: + """Run a command in gVisor sandbox. + + Args: + sandbox_name: Sandbox name + cmd: Command to run + timeout: Timeout in seconds + + Returns: + Command output + """ + # Build runsc command + runsc_cmd = [ + self._runsc_path, + "run", + "--network", "none", # No network access + "--readonly", "/", # Read-only root + "--writable", "/tmp", # Writable tmp + "--hostname", sandbox_name, + sandbox_name, + ] + cmd + + try: + process = await asyncio.create_subprocess_exec( + *runsc_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=timeout, + ) + + result = [] + if stdout: + result.append(stdout.decode("utf-8", errors="replace")) + if stderr: + result.append(f"STDERR: {stderr.decode('utf-8', errors='replace')}") + + if process.returncode != 0 and not result: + return f"Exit code: {process.returncode}" + + return "\n".join(result) or "Command completed with no output" + + except asyncio.TimeoutError: + process.kill() + await process.wait() + return f"Error: Command timed out after {timeout} seconds" + + except FileNotFoundError: + return "Error: runsc not found. Install gVisor: https://gvisor.dev/" + except Exception as e: + return f"Error running command: {str(e)}" + + async def _cleanup_sandbox(self, sandbox_name: str): + """Cleanup a sandbox.""" + try: + proc = await asyncio.create_subprocess_exec( + self._runsc_path, "delete", "--force", sandbox_name, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + await proc.communicate() + except Exception: + pass # Ignore cleanup errors + + async def close(self): + """Close and cleanup resources.""" + # List and delete all sandboxes + try: + proc = await asyncio.create_subprocess_exec( + self._runsc_path, "list", "--json", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, _ = await proc.communicate() + if proc.returncode == 0: + try: + sandboxes = json.loads(stdout.decode()) + for sb in sandboxes: + await self._cleanup_sandbox(sb.get("id", "")) + except json.JSONDecodeError: + pass + except Exception: + pass + + +# Global singleton instance +_sandbox_instance: GvisorSandbox | None = None + + +def get_gvisor_sandbox( + runsc_path: str = "runsc", + root_dir: str | None = None, + timeout: int = 60, +) -> GvisorSandbox: + """Get the global gVisor sandbox instance. + + Args: + runsc_path: Path to runsc binary + root_dir: Directory for sandbox roots + timeout: Default timeout + + Returns: + GvisorSandbox instance + """ + global _sandbox_instance + if _sandbox_instance is None: + _sandbox_instance = GvisorSandbox( + runsc_path=runsc_path, + root_dir=root_dir, + timeout=timeout, + ) + return _sandbox_instance + + +async def execute_in_gvisor( + code: str, + language: str = "python", + timeout: int = 60, +) -> str: + """Convenience function to execute code in gVisor sandbox. + + Args: + code: Code to execute + language: Programming language + timeout: Timeout in seconds + + Returns: + Execution result + """ + sandbox = get_gvisor_sandbox() + return await sandbox.execute_code(code, language, timeout) diff --git a/core/nanobot/nanobot/agent/tools/sandbox_execution.py b/core/nanobot/nanobot/agent/tools/sandbox_execution.py new file mode 100644 index 0000000..7a4b9b5 --- /dev/null +++ b/core/nanobot/nanobot/agent/tools/sandbox_execution.py @@ -0,0 +1,238 @@ +"""Unified sandbox code execution tools.""" + +import logging +from enum import Enum +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class SandboxType(Enum): + """Available sandbox types.""" + GVISO = "gvisor" + BWRAP = "bwrap" + NONE = "none" + + +class SandboxCodeExecutionTool: + """Execute code in a secure sandbox environment. + + Supports both gVisor and Bubblewrap sandboxes for isolated execution. + """ + + def __init__( + self, + workspace: Path | None = None, + sandbox_type: SandboxType = SandboxType.BWRAP, + timeout: int = 60, + ): + """Initialize the sandbox code execution tool. + + Args: + workspace: Optional workspace path + sandbox_type: Type of sandbox to use + timeout: Default timeout in seconds + """ + self._workspace = workspace + self._sandbox_type = sandbox_type + self._timeout = timeout + self._executor = None + + @property + def name(self) -> str: + return "execute_code" + + @property + def description(self) -> str: + return """Execute code in a secure, isolated sandbox environment. +Use this tool to run Python, JavaScript, or Bash code safely. +The code runs in an isolated sandbox with limited resources and no network access. +Returns the stdout/stderr output from the execution.""" + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "Code to execute in the sandbox", + }, + "language": { + "type": "string", + "description": "Programming language (python, javascript, bash)", + "default": "python", + }, + "timeout": { + "type": "integer", + "description": "Timeout in seconds", + "default": 60, + }, + }, + "required": ["code"], + } + + async def _get_executor(self): + """Lazy initialization of the sandbox executor.""" + if self._executor is None: + if self._sandbox_type == SandboxType.GVISO: + from nanobot.agent.tools.gvisor_sandbox import GvisorSandbox + self._executor = GvisorSandbox(timeout=self._timeout) + elif self._sandbox_type == SandboxType.BWRAP: + from nanobot.agent.tools.bwrap_sandbox import BwrapSandbox + self._executor = BwrapSandbox(timeout=self._timeout) + else: + raise RuntimeError("Sandbox type not configured") + return self._executor + + async def execute( + self, + code: str, + language: str = "python", + timeout: int | None = None, + **kwargs: Any, + ) -> str: + """Execute code in the sandbox. + + Args: + code: Code to execute + language: Programming language + timeout: Optional timeout override + + Returns: + Execution result as string + """ + timeout = timeout or self._timeout + + try: + executor = await self._get_executor() + result = await executor.execute_code(code, language, timeout) + + # Truncate long outputs + if len(result) > 10000: + result = result[:10000] + "\n... (output truncated)" + + return result + except Exception as e: + logger.exception("Code execution failed") + return f"Error executing code: {str(e)}" + + +class SandboxBashTool: + """Execute shell commands in a secure sandbox environment.""" + + def __init__( + self, + sandbox_type: SandboxType = SandboxType.BWRAP, + timeout: int = 60, + ): + """Initialize the sandbox bash tool. + + Args: + sandbox_type: Type of sandbox to use + timeout: Default timeout in seconds + """ + self._sandbox_type = sandbox_type + self._timeout = timeout + self._executor = None + + @property + def name(self) -> str: + return "sandbox_bash" + + @property + def description(self) -> str: + return """Execute shell commands in a secure, isolated sandbox environment. +Use this tool to run system commands safely without affecting the host system. +The command runs in an isolated sandbox with no network access and limited resources. +WARNING: This tool replaces the unsafe bash tool for sandboxed execution.""" + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "Shell command to execute", + }, + "timeout": { + "type": "integer", + "description": "Timeout in seconds (default: 60, max: 300)", + "default": 60, + }, + }, + "required": ["command"], + } + + async def _get_executor(self): + """Lazy initialization of the sandbox executor.""" + if self._executor is None: + if self._sandbox_type == SandboxType.GVISO: + from nanobot.agent.tools.gvisor_sandbox import GvisorSandbox + self._executor = GvisorSandbox(timeout=self._timeout) + elif self._sandbox_type == SandboxType.BWRAP: + from nanobot.agent.tools.bwrap_sandbox import BwrapSandbox + self._executor = BwrapSandbox(timeout=self._timeout) + else: + raise RuntimeError("Sandbox type not configured") + return self._executor + + async def execute( + self, + command: str, + timeout: int | None = None, + **kwargs: Any, + ) -> str: + """Execute a command in the sandbox. + + Args: + command: Command to execute + timeout: Optional timeout override + + Returns: + Command output + """ + timeout = min(timeout or self._timeout, 300) + + try: + executor = await self._get_executor() + result = await executor.execute_command(command, timeout) + + # Truncate long outputs + if len(result) > 10000: + result = result[:10000] + "\n... (output truncated)" + + return result + except Exception as e: + logger.exception("Bash execution failed") + return f"Error executing command: {str(e)}" + + +def get_sandbox_tools( + workspace: Path | None = None, + sandbox_type: SandboxType = SandboxType.BWRAP, + timeout: int = 60, +) -> list: + """Get sandbox execution tools. + + Args: + workspace: Optional workspace path + sandbox_type: Type of sandbox to use + timeout: Default timeout in seconds + + Returns: + List of tool instances + """ + return [ + SandboxCodeExecutionTool( + workspace=workspace, + sandbox_type=sandbox_type, + timeout=timeout, + ), + SandboxBashTool( + sandbox_type=sandbox_type, + timeout=timeout, + ), + ]