Phase 7: Built-in Hooks (audit_log, dangerous_confirmation, security_scan) Phase 8: Plugin system (PluginManager, PluginSandbox, PluginManifest) Phase 9: Skills registry (SkillRegistry, local/plugin/MCP loaders) Phase 10: TeamLeader, RemoteTransport, BackgroundTaskManager
14 KiB
14 KiB
Phase 7:Hook 拦截层(Hook Interception Layer)
日期:2026-04-04 状态:待开始 前置依赖:Phase 6(工具系统重构) Demo参考:claw-code-main — toolHooks.ts, hook pipeline
1. 阶段目标
实现完整的 PreTool/PostTool Hook 拦截机制,允许在工具执行前、执行后、异常时进行拦截、修改、拒绝和审计。
Hook 是工具系统的基础设施,不是独立功能,必须依赖 Phase 6 的注册表。
2. Hook 类型定义
2.1 Hook 类型
# backend/app/agents/tools/hooks/types.py
class HookType(Enum):
    """Hook type: the phase of a tool call in which a hook runs."""

    PRE_TOOL_USE = "pre_tool_use"  # before the tool executes
    POST_TOOL_USE = "post_tool_use"  # after the tool executes
    TOOL_ERROR = "tool_error"  # the tool raised an exception
    TOOL_SKIP = "tool_skip"  # the tool was skipped
class HookAction(Enum):
    """Action a hook reports back to whoever invoked it."""

    ALLOW = "allow"  # allow execution
    DENY = "deny"  # refuse execution
    MODIFY = "modify"  # modify the arguments
    MODIFY_RESULT = "modify_result"  # modify the result
    CONTINUE = "continue"  # carry on (used by post hooks)
@dataclass
class HookContext:
    """Execution context handed to a hook."""

    tool_name: str  # name of the tool being called
    arguments: dict  # arguments of the tool call
    phase: HookType  # hook phase this context was built for
    result: Any = None  # tool result; None unless the creator supplies one
    error: Exception | None = None  # exception associated with the call, if any
    session_id: str | None = None
    user_id: str | None = None
    # Free-form extras; a fresh dict per instance (default_factory), so
    # instances never share state through the default.
    metadata: dict = field(default_factory=dict)
@dataclass
class HookResult:
    """Outcome returned by a hook."""

    action: HookAction  # what the hook decided
    message: str | None = None  # optional human-readable explanation
    modified_args: dict | None = None  # replacement arguments, if any
    modified_result: Any = None  # replacement result, if any
3. 内置 Hook 实现
3.1 危险操作确认 Hook
# backend/app/agents/tools/hooks/builtins/dangerous_confirmation.py
class DangerousConfirmationHook:
    """Pre-execution hook that asks the user to confirm dangerous operations.

    Only tools whose manifest carries WRITE or EXTERNAL permission are
    inspected. A call counts as dangerous when the lower-cased string form of
    its arguments contains any entry of ``DANGEROUS_PATTERNS``.
    """

    # Plain substrings (not regexes) searched for in the stringified arguments.
    DANGEROUS_PATTERNS = [
        "delete",
        "remove",
        "drop",
        "truncate",
        "rm ",
        "rmdir",
        "shutdown",
        "reboot",
        "format",
        "fdisk",
    ]

    def __init__(self, confirmation_service: ConfirmationService):
        self.confirmation_service = confirmation_service

    async def pre_execute(self, ctx: HookContext) -> HookResult:
        """Decide whether the tool call in ``ctx`` may proceed.

        Returns:
            ``ALLOW`` when the tool is not WRITE/EXTERNAL, when no dangerous
            pattern is present, or when the user confirms; ``DENY`` when the
            confirmation request comes back falsy.
        """
        # NOTE(review): assumes tool_registry.get() returns a manifest object
        # for every tool name; confirm what it does for unknown tools.
        manifest = tool_registry.get(ctx.tool_name)

        # Only tools with WRITE/EXTERNAL permission are subject to this hook.
        if manifest.permission_class not in [PermissionClass.WRITE, PermissionClass.EXTERNAL]:
            return HookResult(action=HookAction.ALLOW)

        args_str = str(ctx.arguments).lower()
        if not any(pattern in args_str for pattern in self.DANGEROUS_PATTERNS):
            return HookResult(action=HookAction.ALLOW)

        # Ask exactly once per call. Requesting inside the pattern loop would
        # prompt the user again for every additional pattern that matches
        # (e.g. both "rm " and "remove" in the same arguments).
        confirmed = await self.confirmation_service.request_confirmation(
            user_id=ctx.user_id,
            message=f"危险操作:{ctx.tool_name}\n参数:{ctx.arguments}",
            timeout_seconds=300
        )
        if not confirmed:
            return HookResult(
                action=HookAction.DENY,
                message="用户拒绝执行危险操作"
            )
        return HookResult(action=HookAction.ALLOW)
3.2 安全扫描 Hook
# backend/app/agents/tools/hooks/builtins/security_scan.py
class SecurityScanHook:
    """Post-execution hook that redacts sensitive data from tool results.

    The result is converted with ``str()`` before scanning, so a redacted
    result is always a string regardless of the original result type.
    """

    # Regular expressions, applied case-insensitively.
    # NOTE(review): the PRIVATE KEY pattern matches only the BEGIN header
    # line, not the key body that follows it.
    SENSITIVE_PATTERNS = [
        r"password\s*=\s*['\"][^'\"]+['\"]",
        r"api[_-]?key\s*=\s*['\"][^'\"]+['\"]",
        r"secret\s*=\s*['\"][^'\"]+['\"]",
        r"-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----",
        r"sk-[a-zA-Z0-9]{20,}",
    ]

    @classmethod
    def _redact(cls, text: str) -> tuple[str, int]:
        """Replace every match of every sensitive pattern with ``[REDACTED]``.

        Returns:
            The sanitized text and the total number of replacements made.
        """
        replacements = 0
        for pattern in cls.SENSITIVE_PATTERNS:
            text, count = re.subn(pattern, "[REDACTED]", text, flags=re.IGNORECASE)
            replacements += count
        return text, replacements

    async def post_execute(self, ctx: HookContext) -> HookResult:
        """Scan ``ctx.result`` and redact anything sensitive.

        Returns:
            ``CONTINUE`` when there is no result or nothing matched;
            ``MODIFY_RESULT`` with the sanitized string otherwise.
        """
        if ctx.result is None:
            return HookResult(action=HookAction.CONTINUE)

        # All patterns are applied before returning; stopping at the first
        # matching pattern would leak secrets that only later patterns catch.
        sanitized, replacements = self._redact(str(ctx.result))
        if replacements == 0:
            return HookResult(action=HookAction.CONTINUE)

        return HookResult(
            action=HookAction.MODIFY_RESULT,
            message="检测到敏感信息,已脱敏",
            modified_result=sanitized
        )
3.3 日志记录 Hook
# backend/app/agents/tools/hooks/builtins/audit_log.py
class AuditLogHook:
    """Audit-log hook: records every tool call (start, end and error).

    The hook never blocks or alters a call; it only emits log records.

    NOTE(review): timestamps come from ``datetime.now()`` and are therefore
    naive local time; confirm whether UTC is expected by log consumers.
    """

    async def pre_execute(self, ctx: HookContext) -> HookResult:
        """Log the state of the call before the tool runs; always ALLOW."""
        logger.info(
            "tool_call_start",
            extra={
                "tool": ctx.tool_name,
                # Must not be named "args": with the standard logging module
                # that key clashes with the reserved LogRecord.args attribute
                # and makes the logging call raise KeyError.
                "tool_args": ctx.arguments,
                "session_id": ctx.session_id,
                "user_id": ctx.user_id,
                "timestamp": datetime.now().isoformat(),
            }
        )
        return HookResult(action=HookAction.ALLOW)

    async def post_execute(self, ctx: HookContext) -> HookResult:
        """Log the outcome after the tool ran; always CONTINUE."""
        logger.info(
            "tool_call_end",
            extra={
                "tool": ctx.tool_name,
                "success": ctx.error is None,
                # Falls back to 0 when the caller put no duration in metadata.
                "duration_ms": ctx.metadata.get("duration_ms", 0),
                "session_id": ctx.session_id,
                "user_id": ctx.user_id,
                "timestamp": datetime.now().isoformat(),
            }
        )
        return HookResult(action=HookAction.CONTINUE)

    async def on_error(self, ctx: HookContext) -> HookResult:
        """Log the error carried by ``ctx.error``; always CONTINUE."""
        logger.error(
            "tool_call_error",
            extra={
                "tool": ctx.tool_name,
                "error": str(ctx.error),
                "error_type": type(ctx.error).__name__,
                "session_id": ctx.session_id,
                "user_id": ctx.user_id,
                "timestamp": datetime.now().isoformat(),
            }
        )
        return HookResult(action=HookAction.CONTINUE)
4. Hook 配置系统
4.1 Hook 配置格式
{
"hooks": {
"pre_tool_use": {
"enabled": true,
"handlers": [
{
"type": "audit_log",
"filter": null
},
{
"type": "dangerous_confirmation",
"filter": {
"permissions": ["WRITE", "EXTERNAL"]
}
}
]
},
"post_tool_use": {
"enabled": true,
"handlers": [
{
"type": "security_scan",
"filter": null
},
{
"type": "audit_log",
"filter": null
}
]
},
"tool_error": {
"enabled": true,
"handlers": [
{
"type": "audit_log",
"filter": null
}
]
}
}
}
4.2 HookManager
# backend/app/agents/tools/hooks/manager.py
class HookManager:
    """Hook manager: holds the hook configuration and runs configured handlers.

    NOTE(review): ``_create_handler`` and ``_should_apply`` are called by the
    methods below but are not defined in this snippet; they have to be
    provided elsewhere before this class can run.
    """

    def __init__(self):
        self._builtin_hooks: dict[str, type] = {
            "audit_log": AuditLogHook,
            "dangerous_confirmation": DangerousConfirmationHook,
            "security_scan": SecurityScanHook,
        }
        self._config: HookConfig = {}
        # Hooks stay switched off until load_config() is called.
        self._enabled = False

    def load_config(self, config: dict):
        """Load the hook configuration.

        Accepts both the wrapped form ``{"hooks": {"pre_tool_use": ...}}``
        and the bare per-phase mapping ``{"pre_tool_use": ...}``. The global
        switch is the top-level ``"enabled"`` key and defaults to True.
        """
        sections = config.get("hooks")
        # Without unwrapping, the per-phase lookups below would never find
        # "pre_tool_use"/"post_tool_use" in a wrapped configuration.
        self._config = sections if isinstance(sections, dict) else config
        self._enabled = config.get("enabled", True)

    def _handler_configs(self, section: str) -> list | None:
        """Return the handler configs of ``section``, or None when hooks are off.

        Hooks are off when the manager is globally disabled or when the
        section sets ``"enabled"`` to a falsy value.
        """
        if not self._enabled:
            return None
        section_config = self._config.get(section, {})
        if not section_config.get("enabled", True):
            return None
        return section_config.get("handlers", [])

    async def execute_pre_hooks(
        self,
        tool_name: str,
        arguments: dict,
        context: dict
    ) -> HookResult:
        """Run the pre-execution hooks.

        Args:
            tool_name: Name of the tool about to run.
            arguments: Arguments of the tool call.
            context: Extra keyword arguments forwarded to ``HookContext``.

        Returns:
            The first ``DENY`` result produced by a handler; otherwise
            ``ALLOW`` carrying the (possibly rewritten) arguments in
            ``modified_args``. When hooks are off, a bare ``ALLOW``.
        """
        handler_configs = self._handler_configs("pre_tool_use")
        if handler_configs is None:
            return HookResult(action=HookAction.ALLOW)

        ctx = HookContext(
            tool_name=tool_name,
            arguments=arguments,
            phase=HookType.PRE_TOOL_USE,
            **context
        )
        for handler_config in handler_configs:
            # Filter first so handlers that do not apply are never built.
            if not self._should_apply(handler_config, tool_name):
                continue
            handler = self._create_handler(handler_config)
            # A handler without pre_execute (e.g. a post-only hook configured
            # in the wrong section) is skipped instead of crashing the call.
            pre_execute = getattr(handler, "pre_execute", None)
            if pre_execute is None:
                continue
            result = await pre_execute(ctx)
            if result.action == HookAction.DENY:
                return result
            # Ignore a MODIFY that carries no arguments; applying it would
            # replace the arguments with None.
            if result.action == HookAction.MODIFY and result.modified_args is not None:
                ctx.arguments = result.modified_args
        return HookResult(action=HookAction.ALLOW, modified_args=ctx.arguments)

    async def execute_post_hooks(
        self,
        tool_name: str,
        arguments: dict,
        result: Any,
        context: dict
    ) -> HookResult:
        """Run the post-execution hooks.

        Args:
            tool_name: Name of the tool that ran.
            arguments: Arguments the tool was called with.
            result: Result returned by the tool.
            context: Extra keyword arguments forwarded to ``HookContext``.

        Returns:
            A result whose ``modified_result`` always holds the effective
            result. ``action`` is ``MODIFY_RESULT`` when at least one handler
            rewrote the result and ``CONTINUE`` otherwise.
        """
        handler_configs = self._handler_configs("post_tool_use")
        if handler_configs is None:
            return HookResult(action=HookAction.CONTINUE, modified_result=result)

        ctx = HookContext(
            tool_name=tool_name,
            arguments=arguments,
            phase=HookType.POST_TOOL_USE,
            result=result,
            **context
        )
        was_modified = False
        for handler_config in handler_configs:
            if not self._should_apply(handler_config, tool_name):
                continue
            handler = self._create_handler(handler_config)
            post_execute = getattr(handler, "post_execute", None)
            if post_execute is None:
                continue
            hook_result = await post_execute(ctx)
            if hook_result.action == HookAction.MODIFY_RESULT:
                ctx.result = hook_result.modified_result
                was_modified = True
        # Report MODIFY_RESULT so callers that test the action (as
        # ToolExecutor does) actually pick up the rewritten result.
        final_action = HookAction.MODIFY_RESULT if was_modified else HookAction.CONTINUE
        return HookResult(action=final_action, modified_result=ctx.result)

    async def execute_error_hook(
        self,
        tool_name: str,
        arguments: dict,
        error: Exception,
        context: dict
    ) -> HookResult:
        """Run the handlers of the ``tool_error`` section.

        Each applicable handler that defines ``on_error`` is awaited with a
        context carrying ``error``; handler return values are ignored.

        Returns:
            Always a ``CONTINUE`` result.
        """
        handler_configs = self._handler_configs("tool_error")
        if handler_configs is None:
            return HookResult(action=HookAction.CONTINUE)

        ctx = HookContext(
            tool_name=tool_name,
            arguments=arguments,
            phase=HookType.TOOL_ERROR,
            error=error,
            **context
        )
        for handler_config in handler_configs:
            if not self._should_apply(handler_config, tool_name):
                continue
            handler = self._create_handler(handler_config)
            on_error = getattr(handler, "on_error", None)
            if on_error is None:
                continue
            await on_error(ctx)
        return HookResult(action=HookAction.CONTINUE)
5. 用户自定义 Hook
5.1 Hook API
# backend/app/routers/hooks.py
@router.post("/api/hooks/config")
async def update_hook_config(
    config: HookConfigUpdate,
    current_user: User = Depends(get_current_user)
):
    """Update the hook configuration.

    Validates the submitted configuration, saves it under the current user's
    id, then loads it into ``hook_manager``.

    Returns:
        ``{"status": "ok"}`` once all three steps have completed.
    """
    # Validate the configuration format
    await hook_manager.validate_config(config)
    # Persist the configuration
    # NOTE(review): the config is saved per user but loaded below into
    # hook_manager, which is not scoped to the user here — looks like one
    # user's update takes effect for every caller; confirm this is intended.
    await hook_config_service.save_config(
        user_id=current_user.id,
        config=config
    )
    # Reload
    # NOTE(review): HookManager.load_config calls config.get(...), i.e. it
    # expects a dict, while config here is a HookConfigUpdate — confirm the
    # model is dict-like or convert it before loading.
    hook_manager.load_config(config)
    return {"status": "ok"}
@router.get("/api/hooks/config")
async def get_hook_config(
    current_user: User = Depends(get_current_user)
):
    """Return the hook configuration stored for the current user."""
    return await hook_config_service.get_config(current_user.id)
@router.get("/api/hooks/available")
async def list_available_hooks():
    """List every available hook.

    Returns:
        A dict with the fixed built-in hooks under ``"builtin"`` and whatever
        ``hook_manager.list_custom_hooks()`` reports under ``"custom"``.
    """
    builtin_descriptions = (
        ("audit_log", "审计日志"),
        ("dangerous_confirmation", "危险操作确认"),
        ("security_scan", "安全扫描"),
    )
    builtin = [
        {"name": hook_name, "description": text}
        for hook_name, text in builtin_descriptions
    ]
    custom = hook_manager.list_custom_hooks()
    return {"builtin": builtin, "custom": custom}
6. 与工具系统集成
6.1 修改 ToolExecutor
# backend/app/agents/tools/executor.py
class ToolExecutor:
    """Tool executor that wraps every tool call in the hook pipeline."""

    def __init__(
        self,
        registry: ToolRegistry,
        hook_manager: HookManager
    ):
        self.registry = registry
        self.hook_manager = hook_manager

    async def execute(
        self,
        tool_name: str,
        arguments: dict,
        context: dict
    ) -> ToolResult:
        """Execute a tool with pre, post and error hooks.

        Args:
            tool_name: Name of the tool to run.
            arguments: Keyword arguments for the tool.
            context: Extra fields forwarded to the hook manager. It is not
                mutated; post hooks receive a copy with the call duration
                added.

        Returns:
            A failed ``ToolResult`` when a pre hook denies the call or when
            anything inside the execution step raises; otherwise a successful
            ``ToolResult`` with the (possibly hook-rewritten) result.
        """
        # Function-scope import: the timer needs a monotonic clock, and the
        # module's import block is not visible from this snippet.
        import time

        # 1. Pre hooks
        pre_result = await self.hook_manager.execute_pre_hooks(
            tool_name, arguments, context
        )
        if pre_result.action == HookAction.DENY:
            return ToolResult(
                success=False,
                error=pre_result.message,
                tool_name=tool_name
            )
        # "is not None" so that a rewrite to an empty dict is honoured too.
        if pre_result.modified_args is not None:
            arguments = pre_result.modified_args

        # 2. Run the tool
        try:
            # Manifest lookup kept from the original although its value is
            # unused. NOTE(review): presumably it fails fast for unknown
            # tools; confirm, otherwise it can be dropped.
            self.registry.get(tool_name)
            executor = self.registry.get_executor(tool_name)
            # perf_counter is monotonic; wall-clock differences can jump or
            # go negative when the system clock is adjusted.
            started = time.perf_counter()
            result = await executor(**arguments)
            duration_ms = (time.perf_counter() - started) * 1000

            # 3. Post hooks
            # Expose the duration via HookContext.metadata, which is where
            # AuditLogHook.post_execute reads "duration_ms" from.
            post_context = {
                **context,
                "metadata": {
                    **(context.get("metadata") or {}),
                    "duration_ms": duration_ms,
                },
            }
            post_result = await self.hook_manager.execute_post_hooks(
                tool_name, arguments, result, post_context
            )
            # HookManager.execute_post_hooks carries the effective result in
            # modified_result even when it reports CONTINUE, so checking the
            # action alone would discard rewritten (e.g. redacted) results.
            if (
                post_result.action == HookAction.MODIFY_RESULT
                or post_result.modified_result is not None
            ):
                result = post_result.modified_result
            return ToolResult(
                success=True,
                result=result,
                tool_name=tool_name,
                duration_ms=duration_ms
            )
        except Exception as e:
            # 4. Error hooks (return value intentionally unused).
            # This handler also covers exceptions raised by post hooks.
            await self.hook_manager.execute_error_hook(
                tool_name, arguments, e, context
            )
            return ToolResult(
                success=False,
                error=str(e),
                tool_name=tool_name
            )
7. 文件结构
backend/app/agents/tools/hooks/
├── __init__.py
├── types.py # Hook 类型定义
├── manager.py # Hook 管理器
│
├── builtins/ # 内置 Hook
│ ├── __init__.py
│ ├── audit_log.py
│ ├── dangerous_confirmation.py
│ └── security_scan.py
│
└── custom/ # 用户自定义 Hook
├── __init__.py
└── loader.py
8. 验收标准
| 检查点 | 标准 |
|---|---|
| Pre Hook 执行 | 危险操作被正确拦截 |
| Post Hook 执行 | 结果中的敏感信息被正确脱敏 |
| Error Hook 执行 | 异常被正确记录 |
| 配置持久化 | Hook 配置可保存和加载 |
| API 可用 | 用户可通过 API 配置 Hook |
| 向后兼容 | 不配置 Hook 时系统正常运行 |
9. Demo 借鉴
| claw-code | Jarvis 对应 |
|---|---|
| toolHooks.ts | hooks/types.py, hooks/manager.py |
| PreToolUse | HookType.PRE_TOOL_USE |
| PostToolUse | HookType.POST_TOOL_USE |
| Hook 配置解析 | hooks/config.py |