"""危险操作确认 Hook - Phase 7.2 对危险操作要求用户确认。 """ from typing import Any from app.agents.tools.hooks.types import ( ExecutionContext, HookResult, ) from app.agents.tools.manifest import SideEffectScope # 危险操作关键词 DANGEROUS_PATTERNS = [ # 文件操作 "delete", "remove", "rm ", "rmdir", "unlink", "format", "truncate", # 系统操作 "shutdown", "reboot", "kill", "pkill", "sudo", "chmod", "chown", # 数据操作 "drop", "truncate", "delete from", "delete.*where", "insert into.*select", "update.*set", # 网络操作 "curl", "wget", "nc ", "netcat", "ssh ", "scp ", "sftp ", # 环境变量 "export.*secret", "export.*key", "export.*token", ] class DangerousConfirmationHook: """危险操作确认 Hook 检查工具调用是否包含危险操作,如是则要求确认。 """ def __init__(self, auto_block: bool = False): """ Args: auto_block: True 表示自动拦截危险操作,False 表示仅警告 """ self.auto_block = auto_block self._pending_confirmations: dict[str, bool] = {} async def pre_tool_use(self, context: ExecutionContext) -> HookResult: """检查是否为危险操作""" is_dangerous = self._check_dangerous(context.tool_name, context.tool_input) if is_dangerous: if self.auto_block: return HookResult( hook_name="dangerous_confirmation", success=False, continue_execution=False, error=f"危险操作被自动拦截: {context.tool_name}", metadata={"dangerous": True, "auto_blocked": True}, ) else: # 标记需要确认 context.metadata["requires_confirmation"] = True context.metadata["dangerous_operation"] = True return HookResult( hook_name="dangerous_confirmation", success=True, continue_execution=True, metadata={"dangerous": True, "requires_confirmation": True}, ) return HookResult( hook_name="dangerous_confirmation", success=True, continue_execution=True, ) def _check_dangerous(self, tool_name: str, tool_input: dict[str, Any]) -> bool: """检查是否为危险操作""" # 检查工具名称 dangerous_tools = [ "delete", "remove", "drop", "truncate", "kill", "shutdown", "reboot", "bash", "powershell", "shell", ] if tool_name.lower() in dangerous_tools: return True # 检查输入参数 input_str = str(tool_input).lower() for pattern in DANGEROUS_PATTERNS: if pattern.lower() in input_str: return True return False def confirm(self, session_id: str, confirmed: bool) -> None: """确认危险操作 Args: session_id: 会话 ID confirmed: True 表示用户确认,False 表示取消 """ self._pending_confirmations[session_id] = confirmed def is_confirmed(self, session_id: str) -> bool: """检查是否已确认""" return self._pending_confirmations.get(session_id, False) def clear_confirmation(self, session_id: str) -> None: """清除确认状态""" self._pending_confirmations.pop(session_id, None)