feat(agents): Phase 7-10 hook system, plugins, skills, orchestration
Phase 7: Built-in Hooks (audit_log, dangerous_confirmation, security_scan) Phase 8: Plugin system (PluginManager, PluginSandbox, PluginManifest) Phase 9: Skills registry (SkillRegistry, local/plugin/MCP loaders) Phase 10: TeamLeader, RemoteTransport, BackgroundTaskManager
This commit is contained in:
111
backend/app/agents/plugins/sandbox.py
Normal file
111
backend/app/agents/plugins/sandbox.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""插件沙箱隔离 - Phase 8.3"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
|
||||
class PluginSandbox:
|
||||
"""插件沙箱
|
||||
|
||||
提供插件执行隔离环境。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._allowed_paths: set[str] = set()
|
||||
self._denied_paths: set[str] = set()
|
||||
self._network_allowed: bool = False
|
||||
self._allowed_hosts: set[str] = set()
|
||||
|
||||
def set_file_permissions(
|
||||
self,
|
||||
allowed_paths: list[str] | None = None,
|
||||
denied_paths: list[str] | None = None,
|
||||
) -> None:
|
||||
"""设置文件访问权限
|
||||
|
||||
Args:
|
||||
allowed_paths: 允许访问的路径列表
|
||||
denied_paths: 禁止访问的路径列表
|
||||
"""
|
||||
self._allowed_paths = set(allowed_paths or [])
|
||||
self._denied_paths = set(denied_paths or [])
|
||||
|
||||
def set_network_permissions(
|
||||
self, allowed: bool, allowed_hosts: list[str] | None = None
|
||||
) -> None:
|
||||
"""设置网络访问权限
|
||||
|
||||
Args:
|
||||
allowed: 是否允许网络访问
|
||||
allowed_hosts: 允许访问的 host 列表
|
||||
"""
|
||||
self._network_allowed = allowed
|
||||
self._allowed_hosts = set(allowed_hosts or [])
|
||||
|
||||
def check_file_access(self, path: str) -> bool:
|
||||
"""检查文件访问权限
|
||||
|
||||
Args:
|
||||
path: 文件路径
|
||||
|
||||
Returns:
|
||||
是否允许访问
|
||||
"""
|
||||
# 如果有允许列表,只允许访问列表中的路径
|
||||
if self._allowed_paths:
|
||||
return path in self._allowed_paths or any(
|
||||
path.startswith(allowed) for allowed in self._allowed_paths
|
||||
)
|
||||
|
||||
# 如果有禁止列表,禁止访问列表中的路径
|
||||
if self._denied_paths:
|
||||
return not any(path.startswith(denied) for denied in self._denied_paths)
|
||||
|
||||
# 没有限制
|
||||
return True
|
||||
|
||||
def check_network_access(self, host: str) -> bool:
|
||||
"""检查网络访问权限
|
||||
|
||||
Args:
|
||||
host: 主机地址
|
||||
|
||||
Returns:
|
||||
是否允许访问
|
||||
"""
|
||||
if not self._network_allowed:
|
||||
return False
|
||||
|
||||
if self._allowed_hosts:
|
||||
return host in self._allowed_hosts or any(
|
||||
host.endswith(allowed) for allowed in self._allowed_hosts
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def execute_in_sandbox(self, func: Any, *args, **kwargs) -> Any:
|
||||
"""在沙箱中执行函数
|
||||
|
||||
Args:
|
||||
func: 要执行的函数
|
||||
*args: 位置参数
|
||||
**kwargs: 关键字参数
|
||||
|
||||
Returns:
|
||||
函数返回值
|
||||
"""
|
||||
# 保存当前状态
|
||||
old_allowed_paths = self._allowed_paths.copy()
|
||||
old_denied_paths = self._denied_paths.copy()
|
||||
old_network_allowed = self._network_allowed
|
||||
old_allowed_hosts = self._allowed_hosts.copy()
|
||||
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
finally:
|
||||
# 恢复状态
|
||||
self._allowed_paths = old_allowed_paths
|
||||
self._denied_paths = old_denied_paths
|
||||
self._network_allowed = old_network_allowed
|
||||
self._allowed_hosts = old_allowed_hosts
|
||||
Reference in New Issue
Block a user