Files
X-Agents/agent/app/security/audit.py

82 lines
2.1 KiB
Python
Raw Normal View History

"""
审计日志 - 记录所有 Agent 操作
"""
import json
from datetime import datetime
from typing import Any, Dict, Optional
from pathlib import Path
class AuditLogger:
"""审计日志记录器"""
def __init__(self, log_file: str = "audit.log"):
self.log_file = log_file
def log(
self,
action: str,
agent_id: str = "",
session_id: str = "",
user_id: str = "",
details: Dict[str, Any] = None,
result: str = "success"
):
"""记录审计日志"""
entry = {
"timestamp": datetime.now().isoformat(),
"action": action,
"agent_id": agent_id,
"session_id": session_id,
"user_id": user_id,
"details": details or {},
"result": result
}
# 写入文件
self._write_log(entry)
# TODO: 发送到 Go 后端
def log_tool_execution(
self,
tool_name: str,
params: Dict[str, Any],
user_id: str,
agent_id: str,
approved: bool,
result: Any
):
"""记录工具执行"""
self.log(
action="tool_execution",
agent_id=agent_id,
user_id=user_id,
details={
"tool_name": tool_name,
"params": params,
"approved": approved,
"result_preview": str(result)[:200] if result else None
},
result="approved" if approved else "pending_approval"
)
def log_error(self, action: str, error: str, **kwargs):
"""记录错误"""
self.log(
action=action,
details={"error": error, **kwargs},
result="error"
)
def _write_log(self, entry: dict):
"""写入日志文件"""
try:
log_path = Path(self.log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
except Exception as e:
print(f"Failed to write audit log: {e}")