Files
JARVIS/backend/app/agents/tools/hooks/builtins/dangerous_confirmation.py
WIN-JHFT4D3SIVT\caoxiaozhu a3fe4d24fc feat(agents): Phase 7-10 hook system, plugins, skills, orchestration
Phase 7: Built-in Hooks (audit_log, dangerous_confirmation, security_scan)
Phase 8: Plugin system (PluginManager, PluginSandbox, PluginManifest)
Phase 9: Skills registry (SkillRegistry, local/plugin/MCP loaders)
Phase 10: TeamLeader, RemoteTransport, BackgroundTaskManager
2026-04-04 22:56:27 +08:00

143 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""危险操作确认 Hook - Phase 7.2
对危险操作要求用户确认。
"""
from typing import Any
from app.agents.tools.hooks.types import (
ExecutionContext,
HookResult,
)
from app.agents.tools.manifest import SideEffectScope
# 危险操作关键词
DANGEROUS_PATTERNS = [
# 文件操作
"delete",
"remove",
"rm ",
"rmdir",
"unlink",
"format",
"truncate",
# 系统操作
"shutdown",
"reboot",
"kill",
"pkill",
"sudo",
"chmod",
"chown",
# 数据操作
"drop",
"truncate",
"delete from",
"delete.*where",
"insert into.*select",
"update.*set",
# 网络操作
"curl",
"wget",
"nc ",
"netcat",
"ssh ",
"scp ",
"sftp ",
# 环境变量
"export.*secret",
"export.*key",
"export.*token",
]
class DangerousConfirmationHook:
"""危险操作确认 Hook
检查工具调用是否包含危险操作,如是则要求确认。
"""
def __init__(self, auto_block: bool = False):
"""
Args:
auto_block: True 表示自动拦截危险操作False 表示仅警告
"""
self.auto_block = auto_block
self._pending_confirmations: dict[str, bool] = {}
async def pre_tool_use(self, context: ExecutionContext) -> HookResult:
"""检查是否为危险操作"""
is_dangerous = self._check_dangerous(context.tool_name, context.tool_input)
if is_dangerous:
if self.auto_block:
return HookResult(
hook_name="dangerous_confirmation",
success=False,
continue_execution=False,
error=f"危险操作被自动拦截: {context.tool_name}",
metadata={"dangerous": True, "auto_blocked": True},
)
else:
# 标记需要确认
context.metadata["requires_confirmation"] = True
context.metadata["dangerous_operation"] = True
return HookResult(
hook_name="dangerous_confirmation",
success=True,
continue_execution=True,
metadata={"dangerous": True, "requires_confirmation": True},
)
return HookResult(
hook_name="dangerous_confirmation",
success=True,
continue_execution=True,
)
def _check_dangerous(self, tool_name: str, tool_input: dict[str, Any]) -> bool:
"""检查是否为危险操作"""
# 检查工具名称
dangerous_tools = [
"delete",
"remove",
"drop",
"truncate",
"kill",
"shutdown",
"reboot",
"bash",
"powershell",
"shell",
]
if tool_name.lower() in dangerous_tools:
return True
# 检查输入参数
input_str = str(tool_input).lower()
for pattern in DANGEROUS_PATTERNS:
if pattern.lower() in input_str:
return True
return False
def confirm(self, session_id: str, confirmed: bool) -> None:
"""确认危险操作
Args:
session_id: 会话 ID
confirmed: True 表示用户确认False 表示取消
"""
self._pending_confirmations[session_id] = confirmed
def is_confirmed(self, session_id: str) -> bool:
"""检查是否已确认"""
return self._pending_confirmations.get(session_id, False)
def clear_confirmation(self, session_id: str) -> None:
"""清除确认状态"""
self._pending_confirmations.pop(session_id, None)