Phase 7: Built-in Hooks (audit_log, dangerous_confirmation, security_scan) Phase 8: Plugin system (PluginManager, PluginSandbox, PluginManifest) Phase 9: Skills registry (SkillRegistry, local/plugin/MCP loaders) Phase 10: TeamLeader, RemoteTransport, BackgroundTaskManager
112 lines
3.1 KiB
Python
112 lines
3.1 KiB
Python
"""插件沙箱隔离 - Phase 8.3"""
|
|
|
|
import os
|
|
import sys
|
|
from typing import Any
|
|
|
|
|
|
class PluginSandbox:
|
|
"""插件沙箱
|
|
|
|
提供插件执行隔离环境。
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._allowed_paths: set[str] = set()
|
|
self._denied_paths: set[str] = set()
|
|
self._network_allowed: bool = False
|
|
self._allowed_hosts: set[str] = set()
|
|
|
|
def set_file_permissions(
|
|
self,
|
|
allowed_paths: list[str] | None = None,
|
|
denied_paths: list[str] | None = None,
|
|
) -> None:
|
|
"""设置文件访问权限
|
|
|
|
Args:
|
|
allowed_paths: 允许访问的路径列表
|
|
denied_paths: 禁止访问的路径列表
|
|
"""
|
|
self._allowed_paths = set(allowed_paths or [])
|
|
self._denied_paths = set(denied_paths or [])
|
|
|
|
def set_network_permissions(
|
|
self, allowed: bool, allowed_hosts: list[str] | None = None
|
|
) -> None:
|
|
"""设置网络访问权限
|
|
|
|
Args:
|
|
allowed: 是否允许网络访问
|
|
allowed_hosts: 允许访问的 host 列表
|
|
"""
|
|
self._network_allowed = allowed
|
|
self._allowed_hosts = set(allowed_hosts or [])
|
|
|
|
def check_file_access(self, path: str) -> bool:
|
|
"""检查文件访问权限
|
|
|
|
Args:
|
|
path: 文件路径
|
|
|
|
Returns:
|
|
是否允许访问
|
|
"""
|
|
# 如果有允许列表,只允许访问列表中的路径
|
|
if self._allowed_paths:
|
|
return path in self._allowed_paths or any(
|
|
path.startswith(allowed) for allowed in self._allowed_paths
|
|
)
|
|
|
|
# 如果有禁止列表,禁止访问列表中的路径
|
|
if self._denied_paths:
|
|
return not any(path.startswith(denied) for denied in self._denied_paths)
|
|
|
|
# 没有限制
|
|
return True
|
|
|
|
def check_network_access(self, host: str) -> bool:
|
|
"""检查网络访问权限
|
|
|
|
Args:
|
|
host: 主机地址
|
|
|
|
Returns:
|
|
是否允许访问
|
|
"""
|
|
if not self._network_allowed:
|
|
return False
|
|
|
|
if self._allowed_hosts:
|
|
return host in self._allowed_hosts or any(
|
|
host.endswith(allowed) for allowed in self._allowed_hosts
|
|
)
|
|
|
|
return True
|
|
|
|
def execute_in_sandbox(self, func: Any, *args, **kwargs) -> Any:
|
|
"""在沙箱中执行函数
|
|
|
|
Args:
|
|
func: 要执行的函数
|
|
*args: 位置参数
|
|
**kwargs: 关键字参数
|
|
|
|
Returns:
|
|
函数返回值
|
|
"""
|
|
# 保存当前状态
|
|
old_allowed_paths = self._allowed_paths.copy()
|
|
old_denied_paths = self._denied_paths.copy()
|
|
old_network_allowed = self._network_allowed
|
|
old_allowed_hosts = self._allowed_hosts.copy()
|
|
|
|
try:
|
|
return func(*args, **kwargs)
|
|
finally:
|
|
# 恢复状态
|
|
self._allowed_paths = old_allowed_paths
|
|
self._denied_paths = old_denied_paths
|
|
self._network_allowed = old_network_allowed
|
|
self._allowed_hosts = old_allowed_hosts
|