"""审计日志 Hook - Phase 7.2 记录所有工具调用到审计日志。 """ from typing import Any from app.agents.tools.hooks.types import ( ExecutionContext, HookResult, HookType, ) from app.agents.tools.manifest import ToolCategory class AuditLogHook: """审计日志 Hook 记录所有工具调用的详细信息,包括: - 调用时间 - 工具名称 - 输入参数 - 执行结果 - 执行时长 - 用户 ID """ def __init__(self, log_path: str | None = None): """ Args: log_path: 日志文件路径,None 则输出到 stdout """ self.log_path = log_path self._logs: list[dict[str, Any]] = [] async def pre_tool_use(self, context: ExecutionContext) -> HookResult: """工具执行前记录""" log_entry = { "event": "pre_tool", "tool_name": context.tool_name, "input": context.tool_input, "user_id": context.user_id, "session_id": context.session_id, } self._logs.append(log_entry) self._write_log(log_entry) return HookResult( hook_name="audit_log", success=True, continue_execution=True, ) async def post_tool_use(self, context: ExecutionContext, result: Any) -> HookResult: """工具执行后记录""" log_entry = { "event": "post_tool", "tool_name": context.tool_name, "result": str(result)[:500] if result else None, "duration_ms": ( (context.end_time - context.start_time) * 1000 if context.start_time and context.end_time else None ), } self._logs.append(log_entry) self._write_log(log_entry) return HookResult( hook_name="audit_log", success=True, continue_execution=True, modified_output=result, ) async def tool_error(self, context: ExecutionContext, error: Exception) -> HookResult: """工具出错时记录""" log_entry = { "event": "tool_error", "tool_name": context.tool_name, "error": str(error), "error_type": type(error).__name__, } self._logs.append(log_entry) self._write_log(log_entry) return HookResult( hook_name="audit_log", success=False, continue_execution=True, error=str(error), ) def _write_log(self, entry: dict[str, Any]) -> None: """写入日志""" import json import datetime entry["timestamp"] = datetime.datetime.now().isoformat() if self.log_path: try: with open(self.log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception: # 日志写入失败不影响主流程 pass else: # 输出到 stdout print(f"[AUDIT] {json.dumps(entry, ensure_ascii=False)}") def get_logs(self) -> list[dict[str, Any]]: """获取所有日志""" return self._logs.copy() def clear_logs(self) -> None: """清空日志""" self._logs.clear()