# Phase 7:Hook 拦截层(Hook Interception Layer) 日期:2026-04-04 状态:待开始 前置依赖:Phase 6(工具系统重构) Demo参考:claw-code-main — toolHooks.ts, hook pipeline --- ## 1. 阶段目标 实现完整的 **PreTool/PostTool Hook 拦截机制**,允许在工具执行前、执行后、异常时进行拦截、修改、拒绝和审计。 **Hook 是工具系统的基础设施**,不是独立功能,必须依赖 Phase 6 的注册表。 --- ## 2. Hook 类型定义 ### 2.1 Hook 类型 ```python # backend/app/agents/tools/hooks/types.py class HookType(Enum): """Hook 类型""" PRE_TOOL_USE = "pre_tool_use" # 工具执行前 POST_TOOL_USE = "post_tool_use" # 工具执行后 TOOL_ERROR = "tool_error" # 工具执行异常 TOOL_SKIP = "tool_skip" # 工具被跳过 class HookAction(Enum): """Hook 执行动作""" ALLOW = "allow" # 允许执行 DENY = "deny" # 拒绝执行 MODIFY = "modify" # 修改参数 MODIFY_RESULT = "modify_result" # 修改结果 CONTINUE = "continue" # 继续(后置Hook用) @dataclass class HookContext: """Hook 执行上下文""" tool_name: str arguments: dict phase: HookType result: Any = None error: Exception | None = None session_id: str | None = None user_id: str | None = None metadata: dict = field(default_factory=dict) @dataclass class HookResult: """Hook 执行结果""" action: HookAction message: str | None = None modified_args: dict | None = None modified_result: Any = None ``` --- ## 3. 内置 Hook 实现 ### 3.1 危险操作确认 Hook ```python # backend/app/agents/tools/hooks/dangerous_confirmation.py class DangerousConfirmationHook: """ 危险操作确认 Hook 对危险操作(WRITE/EXTERNAL 权限)执行前确认 """ DANGEROUS_PATTERNS = [ "delete", "remove", "drop", "truncate", "rm ", "rmdir", "shutdown", "reboot", "format", "fdisk", ] def __init__(self, confirmation_service: ConfirmationService): self.confirmation_service = confirmation_service async def pre_execute(self, ctx: HookContext) -> HookResult: """检查是否危险操作""" manifest = tool_registry.get(ctx.tool_name) # 只对 WRITE/EXTERNAL 权限的工具生效 if manifest.permission_class not in [PermissionClass.WRITE, PermissionClass.EXTERNAL]: return HookResult(action=HookAction.ALLOW) # 检查是否包含危险模式 args_str = str(ctx.arguments).lower() for pattern in self.DANGEROUS_PATTERNS: if pattern in args_str: # 请求用户确认 confirmed = await self.confirmation_service.request_confirmation( user_id=ctx.user_id, message=f"危险操作:{ctx.tool_name}\n参数:{ctx.arguments}", timeout_seconds=300 ) if not confirmed: return HookResult( action=HookAction.DENY, message="用户拒绝执行危险操作" ) return HookResult(action=HookAction.ALLOW) ``` ### 3.2 安全扫描 Hook ```python # backend/app/agents/tools/hooks/security_scan.py class SecurityScanHook: """ 安全扫描 Hook 执行后扫描结果中的敏感信息 """ SENSITIVE_PATTERNS = [ r"password\s*=\s*['\"][^'\"]+['\"]", r"api[_-]?key\s*=\s*['\"][^'\"]+['\"]", r"secret\s*=\s*['\"][^'\"]+['\"]", r"-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----", r"sk-[a-zA-Z0-9]{20,}", ] async def post_execute(self, ctx: HookContext) -> HookResult: """扫描结果中的敏感信息""" if ctx.result is None: return HookResult(action=HookAction.CONTINUE) result_str = str(ctx.result) for pattern in self.SENSITIVE_PATTERNS: if re.search(pattern, result_str, re.IGNORECASE): # 脱敏处理 sanitized = re.sub(pattern, "[REDACTED]", result_str, flags=re.IGNORECASE) return HookResult( action=HookAction.MODIFY_RESULT, message="检测到敏感信息,已脱敏", modified_result=sanitized ) return HookResult(action=HookAction.CONTINUE) ``` ### 3.3 日志记录 Hook ```python # backend/app/agents/tools/hooks/audit_log.py class AuditLogHook: """ 审计日志 Hook 记录所有工具调用 """ async def pre_execute(self, ctx: HookContext) -> HookResult: """记录调用前状态""" logger.info( "tool_call_start", extra={ "tool": ctx.tool_name, "args": ctx.arguments, "session_id": ctx.session_id, "user_id": ctx.user_id, "timestamp": datetime.now().isoformat(), } ) return HookResult(action=HookAction.ALLOW) async def post_execute(self, ctx: HookContext) -> HookResult: """记录调用后状态""" logger.info( "tool_call_end", extra={ "tool": ctx.tool_name, "success": ctx.error is None, "duration_ms": ctx.metadata.get("duration_ms", 0), "session_id": ctx.session_id, "user_id": ctx.user_id, "timestamp": datetime.now().isoformat(), } ) return HookResult(action=HookAction.CONTINUE) async def on_error(self, ctx: HookContext) -> HookResult: """记录错误""" logger.error( "tool_call_error", extra={ "tool": ctx.tool_name, "error": str(ctx.error), "error_type": type(ctx.error).__name__, "session_id": ctx.session_id, "user_id": ctx.user_id, "timestamp": datetime.now().isoformat(), } ) return HookResult(action=HookAction.CONTINUE) ``` --- ## 4. Hook 配置系统 ### 4.1 Hook 配置格式 ```json { "hooks": { "pre_tool_use": { "enabled": true, "handlers": [ { "type": "audit_log", "filter": null }, { "type": "dangerous_confirmation", "filter": { "permissions": ["WRITE", "EXTERNAL"] } } ] }, "post_tool_use": { "enabled": true, "handlers": [ { "type": "security_scan", "filter": null }, { "type": "audit_log", "filter": null } ] }, "tool_error": { "enabled": true, "handlers": [ { "type": "audit_log", "filter": null } ] } } } ``` ### 4.2 HookManager ```python # backend/app/agents/tools/hooks/manager.py class HookManager: """Hook 管理器""" def __init__(self): self._builtin_hooks: dict[str, type] = { "audit_log": AuditLogHook, "dangerous_confirmation": DangerousConfirmationHook, "security_scan": SecurityScanHook, } self._config: HookConfig = {} self._enabled = False def load_config(self, config: dict): """加载 Hook 配置""" self._config = config self._enabled = config.get("enabled", True) async def execute_pre_hooks( self, tool_name: str, arguments: dict, context: dict ) -> HookResult: """执行前置 Hook""" if not self._enabled: return HookResult(action=HookAction.ALLOW) pre_config = self._config.get("pre_tool_use", {}) if not pre_config.get("enabled", True): return HookResult(action=HookAction.ALLOW) ctx = HookContext( tool_name=tool_name, arguments=arguments, phase=HookType.PRE_TOOL_USE, **context ) for handler_config in pre_config.get("handlers", []): handler = self._create_handler(handler_config) if not self._should_apply(handler_config, tool_name): continue result = await handler.pre_execute(ctx) if result.action == HookAction.DENY: return result elif result.action == HookAction.MODIFY: ctx.arguments = result.modified_args return HookResult(action=HookAction.ALLOW, modified_args=ctx.arguments) async def execute_post_hooks( self, tool_name: str, arguments: dict, result: Any, context: dict ) -> HookResult: """执行后置 Hook""" if not self._enabled: return HookResult(action=HookAction.CONTINUE, modified_result=result) post_config = self._config.get("post_tool_use", {}) if not post_config.get("enabled", True): return HookResult(action=HookAction.CONTINUE, modified_result=result) ctx = HookContext( tool_name=tool_name, arguments=arguments, phase=HookType.POST_TOOL_USE, result=result, **context ) for handler_config in post_config.get("handlers", []): handler = self._create_handler(handler_config) if not self._should_apply(handler_config, tool_name): continue hook_result = await handler.post_execute(ctx) if hook_result.action == HookAction.MODIFY_RESULT: ctx.result = hook_result.modified_result return HookResult(action=HookAction.CONTINUE, modified_result=ctx.result) ``` --- ## 5. 用户自定义 Hook ### 5.1 Hook API ```python # backend/app/routers/hooks.py @router.post("/api/hooks/config") async def update_hook_config( config: HookConfigUpdate, current_user: User = Depends(get_current_user) ): """更新 Hook 配置""" # 验证配置格式 await hook_manager.validate_config(config) # 保存配置 await hook_config_service.save_config( user_id=current_user.id, config=config ) # 重新加载 hook_manager.load_config(config) return {"status": "ok"} @router.get("/api/hooks/config") async def get_hook_config( current_user: User = Depends(get_current_user) ): """获取 Hook 配置""" config = await hook_config_service.get_config(current_user.id) return config @router.get("/api/hooks/available") async def list_available_hooks(): """列出所有可用的 Hook""" return { "builtin": [ {"name": "audit_log", "description": "审计日志"}, {"name": "dangerous_confirmation", "description": "危险操作确认"}, {"name": "security_scan", "description": "安全扫描"}, ], "custom": hook_manager.list_custom_hooks() } ``` --- ## 6. 与工具系统集成 ### 6.1 修改 ToolExecutor ```python # backend/app/agents/tools/executor.py class ToolExecutor: """工具执行器(集成 Hook)""" def __init__( self, registry: ToolRegistry, hook_manager: HookManager ): self.registry = registry self.hook_manager = hook_manager async def execute( self, tool_name: str, arguments: dict, context: dict ) -> ToolResult: """执行工具(带 Hook)""" # 1. Pre Hook pre_result = await self.hook_manager.execute_pre_hooks( tool_name, arguments, context ) if pre_result.action == HookAction.DENY: return ToolResult( success=False, error=pre_result.message, tool_name=tool_name ) if pre_result.modified_args: arguments = pre_result.modified_args # 2. 执行工具 try: manifest = self.registry.get(tool_name) executor = self.registry.get_executor(tool_name) start_time = datetime.now() result = await executor(**arguments) duration_ms = (datetime.now() - start_time).total_seconds() * 1000 # 3. Post Hook post_result = await self.hook_manager.execute_post_hooks( tool_name, arguments, result, context ) if post_result.action == HookAction.MODIFY_RESULT: result = post_result.modified_result return ToolResult( success=True, result=result, tool_name=tool_name, duration_ms=duration_ms ) except Exception as e: # 4. Error Hook error_result = await self.hook_manager.execute_error_hook( tool_name, arguments, e, context ) return ToolResult( success=False, error=str(e), tool_name=tool_name ) ``` --- ## 7. 文件结构 ``` backend/app/agents/tools/hooks/ ├── __init__.py ├── types.py # Hook 类型定义 ├── manager.py # Hook 管理器 │ ├── builtins/ # 内置 Hook │ ├── __init__.py │ ├── audit_log.py │ ├── dangerous_confirmation.py │ └── security_scan.py │ └── custom/ # 用户自定义 Hook ├── __init__.py └── loader.py ``` --- ## 8. 验收标准 | 检查点 | 标准 | |--------|------| | Pre Hook 执行 | 危险操作被正确拦截 | | Post Hook 执行 | 结果中的敏感信息被正确脱敏 | | Error Hook 执行 | 异常被正确记录 | | 配置持久化 | Hook 配置可保存和加载 | | API 可用 | 用户可通过 API 配置 Hook | | 向后兼容 | 不配置 Hook 时系统正常运行 | --- ## 9. Demo 借鉴 | claw-code | Jarvis 对应 | |-----------|------------| | `toolHooks.ts` | `hooks/types.py`, `hooks/manager.py` | | `PreToolUse` | `HookType.PRE_TOOL_USE` | | `PostToolUse` | `HookType.POST_TOOL_USE` | | Hook 配置解析 | `hooks/config.py` |