Phase 7: Built-in Hooks (audit_log, dangerous_confirmation, security_scan) Phase 8: Plugin system (PluginManager, PluginSandbox, PluginManifest) Phase 9: Skills registry (SkillRegistry, local/plugin/MCP loaders) Phase 10: TeamLeader, RemoteTransport, BackgroundTaskManager
143 lines
3.7 KiB
Python
143 lines
3.7 KiB
Python
"""危险操作确认 Hook - Phase 7.2
|
||
|
||
对危险操作要求用户确认。
|
||
"""
|
||
|
||
from typing import Any
|
||
|
||
from app.agents.tools.hooks.types import (
|
||
ExecutionContext,
|
||
HookResult,
|
||
)
|
||
from app.agents.tools.manifest import SideEffectScope
|
||
|
||
|
||
# 危险操作关键词
|
||
DANGEROUS_PATTERNS = [
|
||
# 文件操作
|
||
"delete",
|
||
"remove",
|
||
"rm ",
|
||
"rmdir",
|
||
"unlink",
|
||
"format",
|
||
"truncate",
|
||
# 系统操作
|
||
"shutdown",
|
||
"reboot",
|
||
"kill",
|
||
"pkill",
|
||
"sudo",
|
||
"chmod",
|
||
"chown",
|
||
# 数据操作
|
||
"drop",
|
||
"truncate",
|
||
"delete from",
|
||
"delete.*where",
|
||
"insert into.*select",
|
||
"update.*set",
|
||
# 网络操作
|
||
"curl",
|
||
"wget",
|
||
"nc ",
|
||
"netcat",
|
||
"ssh ",
|
||
"scp ",
|
||
"sftp ",
|
||
# 环境变量
|
||
"export.*secret",
|
||
"export.*key",
|
||
"export.*token",
|
||
]
|
||
|
||
|
||
class DangerousConfirmationHook:
|
||
"""危险操作确认 Hook
|
||
|
||
检查工具调用是否包含危险操作,如是则要求确认。
|
||
"""
|
||
|
||
def __init__(self, auto_block: bool = False):
|
||
"""
|
||
Args:
|
||
auto_block: True 表示自动拦截危险操作,False 表示仅警告
|
||
"""
|
||
self.auto_block = auto_block
|
||
self._pending_confirmations: dict[str, bool] = {}
|
||
|
||
async def pre_tool_use(self, context: ExecutionContext) -> HookResult:
|
||
"""检查是否为危险操作"""
|
||
is_dangerous = self._check_dangerous(context.tool_name, context.tool_input)
|
||
|
||
if is_dangerous:
|
||
if self.auto_block:
|
||
return HookResult(
|
||
hook_name="dangerous_confirmation",
|
||
success=False,
|
||
continue_execution=False,
|
||
error=f"危险操作被自动拦截: {context.tool_name}",
|
||
metadata={"dangerous": True, "auto_blocked": True},
|
||
)
|
||
else:
|
||
# 标记需要确认
|
||
context.metadata["requires_confirmation"] = True
|
||
context.metadata["dangerous_operation"] = True
|
||
return HookResult(
|
||
hook_name="dangerous_confirmation",
|
||
success=True,
|
||
continue_execution=True,
|
||
metadata={"dangerous": True, "requires_confirmation": True},
|
||
)
|
||
|
||
return HookResult(
|
||
hook_name="dangerous_confirmation",
|
||
success=True,
|
||
continue_execution=True,
|
||
)
|
||
|
||
def _check_dangerous(self, tool_name: str, tool_input: dict[str, Any]) -> bool:
|
||
"""检查是否为危险操作"""
|
||
# 检查工具名称
|
||
dangerous_tools = [
|
||
"delete",
|
||
"remove",
|
||
"drop",
|
||
"truncate",
|
||
"kill",
|
||
"shutdown",
|
||
"reboot",
|
||
"bash",
|
||
"powershell",
|
||
"shell",
|
||
]
|
||
|
||
if tool_name.lower() in dangerous_tools:
|
||
return True
|
||
|
||
# 检查输入参数
|
||
input_str = str(tool_input).lower()
|
||
|
||
for pattern in DANGEROUS_PATTERNS:
|
||
if pattern.lower() in input_str:
|
||
return True
|
||
|
||
return False
|
||
|
||
def confirm(self, session_id: str, confirmed: bool) -> None:
|
||
"""确认危险操作
|
||
|
||
Args:
|
||
session_id: 会话 ID
|
||
confirmed: True 表示用户确认,False 表示取消
|
||
"""
|
||
self._pending_confirmations[session_id] = confirmed
|
||
|
||
def is_confirmed(self, session_id: str) -> bool:
|
||
"""检查是否已确认"""
|
||
return self._pending_confirmations.get(session_id, False)
|
||
|
||
def clear_confirmation(self, session_id: str) -> None:
|
||
"""清除确认状态"""
|
||
self._pending_confirmations.pop(session_id, None)
|