Files
X-Agents/agent/app/core/tools/sandbox/sandbox.py

268 lines
7.2 KiB
Python
Raw Normal View History

"""
沙盒执行环境 - 在项目内构建不依赖 Docker
提供安全的代码执行环境
"""
import subprocess
import tempfile
import os
import shutil
from typing import Dict, Any, Optional
from pathlib import Path
class SandboxConfig:
"""沙盒配置"""
# 资源限制
MAX_MEMORY_MB = 256 # 最大内存 (MB)
MAX_CPU_PERCENT = 50 # 最大 CPU 百分比
MAX_EXECUTION_TIME = 30 # 最大执行时间 (秒)
MAX_OUTPUT_SIZE = 1024 * 1024 # 最大输出大小 (bytes)
class Sandbox:
"""
沙盒执行器 - 使用 subprocess 隔离执行
安全特性
- 内存限制
- CPU限制
- 超时控制
- 网络隔离可选
- 临时文件隔离
"""
def __init__(self, config: Optional[SandboxConfig] = None):
self.config = config or SandboxConfig()
self.temp_dir = None
def _setup_temp_dir(self) -> str:
"""创建临时目录"""
self.temp_dir = tempfile.mkdtemp(prefix="sandbox_")
return self.temp_dir
def _cleanup(self):
"""清理临时目录"""
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"Cleanup error: {e}")
def execute(
self,
code: str,
language: str = "python",
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
在沙盒中执行代码
Args:
code: 要执行的代码
language: 语言类型 (python, javascript)
timeout: 超时时间
Returns:
执行结果
"""
timeout = timeout or self.config.MAX_EXECUTION_TIME
self._setup_temp_dir()
try:
if language == "python":
return self._execute_python(code, timeout)
elif language == "javascript":
return self._execute_javascript(code, timeout)
else:
return {
"success": False,
"error": f"Unsupported language: {language}"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
self._cleanup()
def _execute_python(self, code: str, timeout: int) -> Dict[str, Any]:
"""执行 Python 代码"""
# 创建临时文件
temp_file = os.path.join(self.temp_dir, "code.py")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
try:
# 构建命令
cmd = ["python", temp_file]
# 执行
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
cwd=self.temp_dir, # 限制工作目录
env=self._get_restricted_env(), # 限制环境变量
)
# 检查输出大小
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
if len(stdout) > self.config.MAX_OUTPUT_SIZE:
stdout = stdout[:self.config.MAX_OUTPUT_SIZE] + "\n... (output truncated)"
return {
"success": result.returncode == 0,
"output": stdout,
"error": stderr if result.returncode != 0 else None,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"output": None
}
def _execute_javascript(self, code: str, timeout: int) -> Dict[str, Any]:
"""执行 JavaScript 代码"""
temp_file = os.path.join(self.temp_dir, "code.js")
with open(temp_file, "w", encoding="utf-8") as f:
f.write(code)
try:
# 尝试使用 node
cmd = ["node", temp_file]
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
cwd=self.temp_dir,
)
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
return {
"success": result.returncode == 0,
"output": stdout,
"error": stderr if result.returncode != 0 else None,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Execution timeout ({timeout}s)",
"output": None
}
except FileNotFoundError:
return {
"success": False,
"error": "Node.js not installed",
"output": None
}
def _get_restricted_env(self) -> Dict[str, str]:
"""
获取受限的环境变量
移除敏感变量保留必要的 PATH
"""
# 保留 PATH移除其他敏感变量
safe_env = {
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
"LANG": "en_US.UTF-8",
"HOME": self.temp_dir,
"TMPDIR": self.temp_dir,
}
# 移除可能不安全的变量
unsafe_vars = [
"PYTHONPATH",
"PYTHONHOME",
"LD_PRELOAD",
"LD_LIBRARY_PATH",
]
for var in unsafe_vars:
if var in os.environ:
del os.environ[var]
return safe_env
class SafeEval:
"""
安全求值器 - 用于简单表达式计算
比沙盒更轻量适用于不需要完全隔离的场景
"""
# 安全函数白名单
SAFE_BUILTINS = {
"abs": abs,
"min": min,
"max": max,
"sum": sum,
"len": len,
"round": round,
"pow": pow,
"print": print,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
"tuple": tuple,
"set": set,
"range": range,
"enumerate": enumerate,
"zip": zip,
"map": map,
"filter": filter,
"sorted": sorted,
"reversed": reversed,
}
# 安全数学常量
SAFE_MATH = {
"pi": 3.14159265359,
"e": 2.71828182846,
}
@classmethod
def eval(cls, expression: str) -> Any:
"""
安全地求值表达式
Args:
expression: 数学表达式
Returns:
计算结果
"""
# 预处理表达式
expression = expression.replace("sqrt", "**0.5")
# 构建安全命名空间
safe_namespace = {
**cls.SAFE_BUILTINS,
**cls.SAFE_MATH,
"__builtins__": {} # 禁用__builtins__
}
try:
result = eval(expression, safe_namespace)
return result
except Exception as e:
raise ValueError(f"Evaluation error: {e}")
# 全局沙盒实例
sandbox = Sandbox()