143 lines
3.7 KiB
Python
143 lines
3.7 KiB
Python
|
|
"""危险操作确认 Hook - Phase 7.2
|
|||
|
|
|
|||
|
|
对危险操作要求用户确认。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
from app.agents.tools.hooks.types import (
|
|||
|
|
ExecutionContext,
|
|||
|
|
HookResult,
|
|||
|
|
)
|
|||
|
|
from app.agents.tools.manifest import SideEffectScope
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 危险操作关键词
|
|||
|
|
DANGEROUS_PATTERNS = [
|
|||
|
|
# 文件操作
|
|||
|
|
"delete",
|
|||
|
|
"remove",
|
|||
|
|
"rm ",
|
|||
|
|
"rmdir",
|
|||
|
|
"unlink",
|
|||
|
|
"format",
|
|||
|
|
"truncate",
|
|||
|
|
# 系统操作
|
|||
|
|
"shutdown",
|
|||
|
|
"reboot",
|
|||
|
|
"kill",
|
|||
|
|
"pkill",
|
|||
|
|
"sudo",
|
|||
|
|
"chmod",
|
|||
|
|
"chown",
|
|||
|
|
# 数据操作
|
|||
|
|
"drop",
|
|||
|
|
"truncate",
|
|||
|
|
"delete from",
|
|||
|
|
"delete.*where",
|
|||
|
|
"insert into.*select",
|
|||
|
|
"update.*set",
|
|||
|
|
# 网络操作
|
|||
|
|
"curl",
|
|||
|
|
"wget",
|
|||
|
|
"nc ",
|
|||
|
|
"netcat",
|
|||
|
|
"ssh ",
|
|||
|
|
"scp ",
|
|||
|
|
"sftp ",
|
|||
|
|
# 环境变量
|
|||
|
|
"export.*secret",
|
|||
|
|
"export.*key",
|
|||
|
|
"export.*token",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DangerousConfirmationHook:
|
|||
|
|
"""危险操作确认 Hook
|
|||
|
|
|
|||
|
|
检查工具调用是否包含危险操作,如是则要求确认。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, auto_block: bool = False):
|
|||
|
|
"""
|
|||
|
|
Args:
|
|||
|
|
auto_block: True 表示自动拦截危险操作,False 表示仅警告
|
|||
|
|
"""
|
|||
|
|
self.auto_block = auto_block
|
|||
|
|
self._pending_confirmations: dict[str, bool] = {}
|
|||
|
|
|
|||
|
|
async def pre_tool_use(self, context: ExecutionContext) -> HookResult:
|
|||
|
|
"""检查是否为危险操作"""
|
|||
|
|
is_dangerous = self._check_dangerous(context.tool_name, context.tool_input)
|
|||
|
|
|
|||
|
|
if is_dangerous:
|
|||
|
|
if self.auto_block:
|
|||
|
|
return HookResult(
|
|||
|
|
hook_name="dangerous_confirmation",
|
|||
|
|
success=False,
|
|||
|
|
continue_execution=False,
|
|||
|
|
error=f"危险操作被自动拦截: {context.tool_name}",
|
|||
|
|
metadata={"dangerous": True, "auto_blocked": True},
|
|||
|
|
)
|
|||
|
|
else:
|
|||
|
|
# 标记需要确认
|
|||
|
|
context.metadata["requires_confirmation"] = True
|
|||
|
|
context.metadata["dangerous_operation"] = True
|
|||
|
|
return HookResult(
|
|||
|
|
hook_name="dangerous_confirmation",
|
|||
|
|
success=True,
|
|||
|
|
continue_execution=True,
|
|||
|
|
metadata={"dangerous": True, "requires_confirmation": True},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return HookResult(
|
|||
|
|
hook_name="dangerous_confirmation",
|
|||
|
|
success=True,
|
|||
|
|
continue_execution=True,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _check_dangerous(self, tool_name: str, tool_input: dict[str, Any]) -> bool:
|
|||
|
|
"""检查是否为危险操作"""
|
|||
|
|
# 检查工具名称
|
|||
|
|
dangerous_tools = [
|
|||
|
|
"delete",
|
|||
|
|
"remove",
|
|||
|
|
"drop",
|
|||
|
|
"truncate",
|
|||
|
|
"kill",
|
|||
|
|
"shutdown",
|
|||
|
|
"reboot",
|
|||
|
|
"bash",
|
|||
|
|
"powershell",
|
|||
|
|
"shell",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
if tool_name.lower() in dangerous_tools:
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
# 检查输入参数
|
|||
|
|
input_str = str(tool_input).lower()
|
|||
|
|
|
|||
|
|
for pattern in DANGEROUS_PATTERNS:
|
|||
|
|
if pattern.lower() in input_str:
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def confirm(self, session_id: str, confirmed: bool) -> None:
|
|||
|
|
"""确认危险操作
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
session_id: 会话 ID
|
|||
|
|
confirmed: True 表示用户确认,False 表示取消
|
|||
|
|
"""
|
|||
|
|
self._pending_confirmations[session_id] = confirmed
|
|||
|
|
|
|||
|
|
def is_confirmed(self, session_id: str) -> bool:
|
|||
|
|
"""检查是否已确认"""
|
|||
|
|
return self._pending_confirmations.get(session_id, False)
|
|||
|
|
|
|||
|
|
def clear_confirmation(self, session_id: str) -> None:
|
|||
|
|
"""清除确认状态"""
|
|||
|
|
self._pending_confirmations.pop(session_id, None)
|