""" 运行日志服务 提供统一的日志记录接口,支持分类存储和查询 """ import json import logging from datetime import datetime, timedelta, timezone from typing import Any, Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, desc, func, or_ from app.models.log import Log, LogType, LogLevel logger = logging.getLogger(__name__) # 日志级别映射 LEVEL_MAP = { "DEBUG": LogLevel.DEBUG, "INFO": LogLevel.INFO, "WARNING": LogLevel.WARNING, "ERROR": LogLevel.ERROR, } def parse_datetime_filter(value: Optional[str]) -> Optional[datetime]: if not value: return None normalized = value.strip() if not normalized: return None normalized = normalized.replace("Z", "+00:00") parsed = datetime.fromisoformat(normalized) if parsed.tzinfo is not None: parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None) return parsed class LogService: def __init__(self, db: AsyncSession): self.db = db async def log( self, message: str, level: str = "info", log_type: str = "system", user_id: Optional[str] = None, source: Optional[str] = None, details: Optional[dict] = None, duration_ms: Optional[int] = None, request_id: Optional[str] = None, route: Optional[str] = None, method: Optional[str] = None, status_code: Optional[int] = None, error_type: Optional[str] = None, operation: Optional[str] = None, ) -> Log: """记录日志""" log_entry = Log( level=level, type=log_type, user_id=user_id, request_id=request_id, route=route, method=method, status_code=status_code, error_type=error_type, operation=operation, message=message, source=source, details=json.dumps(details, ensure_ascii=False) if details is not None else None, duration_ms=int(duration_ms) if duration_ms is not None else None, ) self.db.add(log_entry) await self.db.commit() await self.db.refresh(log_entry) return log_entry async def agent_log( self, message: str, user_id: Optional[str] = None, source: Optional[str] = None, details: Optional[dict] = None, duration_ms: Optional[int] = None, ) -> Log: """记录智能体调用日志""" return await self.log( message=message, level="info", log_type="agent", user_id=user_id, source=source, details=details, duration_ms=duration_ms, ) async def system_log( self, message: str, level: str = "info", source: Optional[str] = None, details: Optional[dict] = None, user_id: Optional[str] = None, request_id: Optional[str] = None, route: Optional[str] = None, method: Optional[str] = None, status_code: Optional[int] = None, error_type: Optional[str] = None, operation: Optional[str] = None, duration_ms: Optional[int] = None, ) -> Log: """记录系统运行日志""" return await self.log( message=message, level=level, log_type="system", user_id=user_id, source=source, details=details, request_id=request_id, route=route, method=method, status_code=status_code, error_type=error_type, operation=operation, duration_ms=duration_ms, ) async def chat_log( self, message: str, user_id: str, details: Optional[dict] = None, duration_ms: Optional[int] = None, ) -> Log: """记录问答日志""" return await self.log( message=message, level="info", log_type="chat", user_id=user_id, source="chat", details=details, duration_ms=duration_ms, ) def _build_conditions( self, log_type: Optional[str] = None, level: Optional[str] = None, user_id: Optional[str] = None, source: Optional[str] = None, request_id: Optional[str] = None, route: Optional[str] = None, operation: Optional[str] = None, status_code: Optional[int] = None, start_at: Optional[datetime] = None, end_at: Optional[datetime] = None, ) -> list[Any]: conditions = [] if log_type: conditions.append(Log.type == log_type) if level: conditions.append(Log.level == level) if user_id: conditions.append(or_(Log.user_id == user_id, Log.user_id.is_(None))) if source: conditions.append(Log.source == source) if request_id: conditions.append(Log.request_id == request_id) if route: conditions.append(Log.route == route) if operation: conditions.append(Log.operation == operation) if status_code is not None: conditions.append(Log.status_code == status_code) if start_at is not None: conditions.append(Log.created_at >= start_at) if end_at is not None: conditions.append(Log.created_at <= end_at) return conditions async def list_logs( self, log_type: Optional[str] = None, level: Optional[str] = None, user_id: Optional[str] = None, source: Optional[str] = None, request_id: Optional[str] = None, route: Optional[str] = None, operation: Optional[str] = None, status_code: Optional[int] = None, start_at: Optional[datetime] = None, end_at: Optional[datetime] = None, limit: int = 100, offset: int = 0, ) -> tuple[list[Log], int]: """ 查询日志列表 Returns: (logs, total_count) """ conditions = self._build_conditions( log_type=log_type, level=level, user_id=user_id, source=source, request_id=request_id, route=route, operation=operation, status_code=status_code, start_at=start_at, end_at=end_at, ) count_query = select(func.count(Log.id)) if conditions: count_query = count_query.where(and_(*conditions)) total_result = await self.db.execute(count_query) total = total_result.scalar() or 0 query = ( select(Log).where(and_(*conditions)) if conditions else select(Log) ).order_by(desc(Log.created_at)).limit(limit).offset(offset) result = await self.db.execute(query) logs = list(result.scalars().all()) return logs, total async def get_recent_logs( self, log_type: Optional[str] = None, user_id: Optional[str] = None, hours: int = 24, limit: int = 100, ) -> list[Log]: """获取最近的日志""" end_at = datetime.now(timezone.utc).replace(tzinfo=None) start_at = end_at - timedelta(hours=hours) conditions = self._build_conditions( log_type=log_type, user_id=user_id, start_at=start_at, end_at=end_at, ) query = select(Log).where(and_(*conditions)).order_by(desc(Log.created_at)).limit(limit) result = await self.db.execute(query) return list(result.scalars().all()) async def get_log_stats( self, log_type: Optional[str] = None, level: Optional[str] = None, user_id: Optional[str] = None, source: Optional[str] = None, request_id: Optional[str] = None, route: Optional[str] = None, operation: Optional[str] = None, status_code: Optional[int] = None, start_at: Optional[datetime] = None, end_at: Optional[datetime] = None, ) -> dict: """获取日志统计""" base_conditions = self._build_conditions( user_id=user_id, source=source, request_id=request_id, route=route, operation=operation, status_code=status_code, start_at=start_at, end_at=end_at, ) stats = { "total": 0, "by_type": {"agent": 0, "system": 0, "chat": 0}, "by_level": {"debug": 0, "info": 0, "warning": 0, "error": 0}, } total_conditions = list(base_conditions) if log_type: total_conditions.append(Log.type == log_type) if level: total_conditions.append(Log.level == level) total_query = select(func.count(Log.id)).where(and_(*total_conditions)) total_result = await self.db.execute(total_query) stats["total"] = total_result.scalar() or 0 for current_type in ["agent", "system", "chat"]: conditions = list(base_conditions) conditions.append(Log.type == current_type) if level: conditions.append(Log.level == level) query = select(func.count(Log.id)).where(and_(*conditions)) result = await self.db.execute(query) stats["by_type"][current_type] = result.scalar() or 0 for current_level in ["debug", "info", "warning", "error"]: conditions = list(base_conditions) if log_type: conditions.append(Log.type == log_type) conditions.append(Log.level == current_level) query = select(func.count(Log.id)).where(and_(*conditions)) result = await self.db.execute(query) stats["by_level"][current_level] = result.scalar() or 0 return stats def serialize_log(log: Log) -> dict[str, Any]: details = None if log.details: try: details = json.loads(log.details) except json.JSONDecodeError: details = {"raw": log.details} return { "id": log.id, "level": log.level, "type": log.type, "user_id": log.user_id, "request_id": log.request_id, "route": log.route, "method": log.method, "status_code": log.status_code, "error_type": log.error_type, "operation": log.operation, "message": log.message, "source": log.source, "details": details, "duration_ms": int(log.duration_ms) if log.duration_ms is not None else None, "created_at": log.created_at.replace(tzinfo=timezone.utc).isoformat() if log.created_at else None, "updated_at": log.updated_at.replace(tzinfo=timezone.utc).isoformat() if log.updated_at else None, }