From a27736a8321441d84b50557968469d5a71a076b5 Mon Sep 17 00:00:00 2001 From: "DESKTOP-72TV0V4\\caoxiaozhu" Date: Sat, 21 Mar 2026 22:11:41 +0800 Subject: [PATCH] feat(logs): unify filtering across list and stats Make runtime log queries support request correlation and date-range diagnostics with shared filtering semantics so the log page can use one consistent contract. Co-Authored-By: Claude Sonnet 4.6 --- backend/app/routers/log.py | 74 ++++- backend/app/services/log_service.py | 289 +++++++++++------- .../backend/app/services/test_log_service.py | 175 +++++++++++ frontend/src/api/log.ts | 40 ++- 4 files changed, 446 insertions(+), 132 deletions(-) create mode 100644 backend/tests/backend/app/services/test_log_service.py diff --git a/backend/app/routers/log.py b/backend/app/routers/log.py index 1608c90..9458dc9 100644 --- a/backend/app/routers/log.py +++ b/backend/app/routers/log.py @@ -1,11 +1,12 @@ -from fastapi import APIRouter, Depends, Query +from datetime import datetime +from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel -from typing import Optional +from typing import Any, Optional from app.database import get_db from app.models.user import User from app.routers.auth import get_current_user -from app.services.log_service import LogService +from app.services.log_service import LogService, parse_datetime_filter, serialize_log router = APIRouter(prefix="/api/logs", tags=["Log"]) @@ -15,14 +16,18 @@ class LogOut(BaseModel): level: str type: str user_id: Optional[str] + request_id: Optional[str] + route: Optional[str] + method: Optional[str] + status_code: Optional[int] + error_type: Optional[str] + operation: Optional[str] message: str source: Optional[str] - details: Optional[str] - duration_ms: Optional[str] - created_at: str - updated_at: str - - model_config = {"from_attributes": True} + details: Optional[dict[str, Any]] + duration_ms: Optional[int] + created_at: Optional[str] + updated_at: Optional[str] class LogStatsOut(BaseModel): @@ -43,12 +48,23 @@ async def list_logs( log_type: Optional[str] = Query(None, description="日志类型: agent/system/chat"), level: Optional[str] = Query(None, description="日志级别: debug/info/warning/error"), source: Optional[str] = Query(None, description="来源模块"), + request_id: Optional[str] = Query(None, description="请求 ID"), + route: Optional[str] = Query(None, description="路由"), + operation: Optional[str] = Query(None, description="业务操作"), + status_code: Optional[int] = Query(None, description="HTTP 状态码"), + start_at: Optional[str] = Query(None, description="开始时间 ISO"), + end_at: Optional[str] = Query(None, description="结束时间 ISO"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """查询日志列表""" + start_dt = parse_datetime_filter(start_at) + end_dt = parse_datetime_filter(end_at) + if start_dt and end_dt and start_dt > end_dt: + raise HTTPException(status_code=422, detail="开始时间不能晚于结束时间") + svc = LogService(db) offset = (page - 1) * page_size logs, total = await svc.list_logs( @@ -56,11 +72,17 @@ async def list_logs( level=level, user_id=current_user.id, source=source, + request_id=request_id, + route=route, + operation=operation, + status_code=status_code, + start_at=start_dt, + end_at=end_dt, limit=page_size, offset=offset, ) return LogQueryOut( - logs=[LogOut.model_validate(log) for log in logs], + logs=[LogOut.model_validate(serialize_log(log)) for log in logs], total=total, page=page, page_size=page_size, @@ -69,13 +91,37 @@ async def list_logs( @router.get("/stats", response_model=LogStatsOut) async def get_log_stats( - hours: int = Query(24, ge=1, le=168), + log_type: Optional[str] = Query(None, description="日志类型: agent/system/chat"), + level: Optional[str] = Query(None, description="日志级别: debug/info/warning/error"), + source: Optional[str] = Query(None, description="来源模块"), + request_id: Optional[str] = Query(None, description="请求 ID"), + route: Optional[str] = Query(None, description="路由"), + operation: Optional[str] = Query(None, description="业务操作"), + status_code: Optional[int] = Query(None, description="HTTP 状态码"), + start_at: Optional[str] = Query(None, description="开始时间 ISO"), + end_at: Optional[str] = Query(None, description="结束时间 ISO"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取日志统计""" + start_dt = parse_datetime_filter(start_at) + end_dt = parse_datetime_filter(end_at) + if start_dt and end_dt and start_dt > end_dt: + raise HTTPException(status_code=422, detail="开始时间不能晚于结束时间") + svc = LogService(db) - stats = await svc.get_log_stats(hours=hours) + stats = await svc.get_log_stats( + log_type=log_type, + level=level, + user_id=current_user.id, + source=source, + request_id=request_id, + route=route, + operation=operation, + status_code=status_code, + start_at=start_dt, + end_at=end_dt, + ) return LogStatsOut(**stats) @@ -89,5 +135,5 @@ async def get_recent_logs( ): """获取最近的日志""" svc = LogService(db) - logs = await svc.get_recent_logs(log_type=log_type, hours=hours, limit=limit) - return [LogOut.model_validate(log) for log in logs] + logs = await svc.get_recent_logs(log_type=log_type, user_id=current_user.id, hours=hours, limit=limit) + return [LogOut.model_validate(serialize_log(log)) for log in logs] diff --git a/backend/app/services/log_service.py b/backend/app/services/log_service.py index bf824d9..4411fb2 100644 --- a/backend/app/services/log_service.py +++ b/backend/app/services/log_service.py @@ -4,10 +4,10 @@ """ import json import logging -from datetime import datetime, timedelta -from typing import Optional +from datetime import datetime, timedelta, timezone +from typing import Any, Optional from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, and_, desc, func +from sqlalchemy import select, and_, desc, func, or_ from app.models.log import Log, LogType, LogLevel logger = logging.getLogger(__name__) @@ -21,6 +21,19 @@ LEVEL_MAP = { } +def parse_datetime_filter(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + normalized = value.strip() + if not normalized: + return None + normalized = normalized.replace("Z", "+00:00") + parsed = datetime.fromisoformat(normalized) + if parsed.tzinfo is not None: + parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None) + return parsed + + class LogService: def __init__(self, db: AsyncSession): self.db = db @@ -34,16 +47,28 @@ class LogService: source: Optional[str] = None, details: Optional[dict] = None, duration_ms: Optional[int] = None, + request_id: Optional[str] = None, + route: Optional[str] = None, + method: Optional[str] = None, + status_code: Optional[int] = None, + error_type: Optional[str] = None, + operation: Optional[str] = None, ) -> Log: """记录日志""" log_entry = Log( level=level, type=log_type, user_id=user_id, + request_id=request_id, + route=route, + method=method, + status_code=status_code, + error_type=error_type, + operation=operation, message=message, source=source, - details=json.dumps(details, ensure_ascii=False) if details else None, - duration_ms=str(duration_ms) if duration_ms else None, + details=json.dumps(details, ensure_ascii=False) if details is not None else None, + duration_ms=int(duration_ms) if duration_ms is not None else None, ) self.db.add(log_entry) await self.db.commit() @@ -75,15 +100,30 @@ class LogService: level: str = "info", source: Optional[str] = None, details: Optional[dict] = None, + user_id: Optional[str] = None, + request_id: Optional[str] = None, + route: Optional[str] = None, + method: Optional[str] = None, + status_code: Optional[int] = None, + error_type: Optional[str] = None, + operation: Optional[str] = None, + duration_ms: Optional[int] = None, ) -> Log: """记录系统运行日志""" return await self.log( message=message, level=level, log_type="system", - user_id=None, + user_id=user_id, source=source, details=details, + request_id=request_id, + route=route, + method=method, + status_code=status_code, + error_type=error_type, + operation=operation, + duration_ms=duration_ms, ) async def chat_log( @@ -104,12 +144,56 @@ class LogService: duration_ms=duration_ms, ) + def _build_conditions( + self, + log_type: Optional[str] = None, + level: Optional[str] = None, + user_id: Optional[str] = None, + source: Optional[str] = None, + request_id: Optional[str] = None, + route: Optional[str] = None, + operation: Optional[str] = None, + status_code: Optional[int] = None, + start_at: Optional[datetime] = None, + end_at: Optional[datetime] = None, + ) -> list[Any]: + conditions = [] + + if log_type: + conditions.append(Log.type == log_type) + if level: + conditions.append(Log.level == level) + if user_id: + conditions.append(or_(Log.user_id == user_id, Log.user_id.is_(None))) + if source: + conditions.append(Log.source == source) + if request_id: + conditions.append(Log.request_id == request_id) + if route: + conditions.append(Log.route == route) + if operation: + conditions.append(Log.operation == operation) + if status_code is not None: + conditions.append(Log.status_code == status_code) + if start_at is not None: + conditions.append(Log.created_at >= start_at) + if end_at is not None: + conditions.append(Log.created_at <= end_at) + + return conditions + async def list_logs( self, log_type: Optional[str] = None, level: Optional[str] = None, user_id: Optional[str] = None, source: Optional[str] = None, + request_id: Optional[str] = None, + route: Optional[str] = None, + operation: Optional[str] = None, + status_code: Optional[int] = None, + start_at: Optional[datetime] = None, + end_at: Optional[datetime] = None, limit: int = 100, offset: int = 0, ) -> tuple[list[Log], int]: @@ -119,28 +203,27 @@ class LogService: Returns: (logs, total_count) """ - conditions = [] + conditions = self._build_conditions( + log_type=log_type, + level=level, + user_id=user_id, + source=source, + request_id=request_id, + route=route, + operation=operation, + status_code=status_code, + start_at=start_at, + end_at=end_at, + ) - if log_type: - conditions.append(Log.type == log_type) - if level: - conditions.append(Log.level == level) - if user_id: - conditions.append(Log.user_id == user_id) - if source: - conditions.append(Log.source == source) - - # 统计总数 count_query = select(func.count(Log.id)) if conditions: count_query = count_query.where(and_(*conditions)) total_result = await self.db.execute(count_query) total = total_result.scalar() or 0 - # 查询列表 query = ( - select(Log) - .where(and_(*conditions)) if conditions else select(Log) + select(Log).where(and_(*conditions)) if conditions else select(Log) ).order_by(desc(Log.created_at)).limit(limit).offset(offset) result = await self.db.execute(query) @@ -151,28 +234,48 @@ class LogService: async def get_recent_logs( self, log_type: Optional[str] = None, + user_id: Optional[str] = None, hours: int = 24, limit: int = 100, ) -> list[Log]: """获取最近的日志""" - since = datetime.utcnow() - timedelta(hours=hours) - conditions = [Log.created_at >= since] - - if log_type: - conditions.append(Log.type == log_type) - - query = ( - select(Log) - .where(and_(*conditions)) - .order_by(desc(Log.created_at)) - .limit(limit) + end_at = datetime.now(timezone.utc).replace(tzinfo=None) + start_at = end_at - timedelta(hours=hours) + conditions = self._build_conditions( + log_type=log_type, + user_id=user_id, + start_at=start_at, + end_at=end_at, ) + + query = select(Log).where(and_(*conditions)).order_by(desc(Log.created_at)).limit(limit) result = await self.db.execute(query) return list(result.scalars().all()) - async def get_log_stats(self, hours: int = 24) -> dict: + async def get_log_stats( + self, + log_type: Optional[str] = None, + level: Optional[str] = None, + user_id: Optional[str] = None, + source: Optional[str] = None, + request_id: Optional[str] = None, + route: Optional[str] = None, + operation: Optional[str] = None, + status_code: Optional[int] = None, + start_at: Optional[datetime] = None, + end_at: Optional[datetime] = None, + ) -> dict: """获取日志统计""" - since = datetime.utcnow() - timedelta(hours=hours) + base_conditions = self._build_conditions( + user_id=user_id, + source=source, + request_id=request_id, + route=route, + operation=operation, + status_code=status_code, + start_at=start_at, + end_at=end_at, + ) stats = { "total": 0, @@ -180,83 +283,59 @@ class LogService: "by_level": {"debug": 0, "info": 0, "warning": 0, "error": 0}, } - # 按类型统计 - for log_type in ["agent", "system", "chat"]: - query = select(func.count(Log.id)).where( - and_(Log.type == log_type, Log.created_at >= since) - ) - result = await self.db.execute(query) - count = result.scalar() or 0 - stats["by_type"][log_type] = count - stats["total"] += count + total_conditions = list(base_conditions) + if log_type: + total_conditions.append(Log.type == log_type) + if level: + total_conditions.append(Log.level == level) + total_query = select(func.count(Log.id)).where(and_(*total_conditions)) + total_result = await self.db.execute(total_query) + stats["total"] = total_result.scalar() or 0 - # 按级别统计 - for level in ["debug", "info", "warning", "error"]: - query = select(func.count(Log.id)).where( - and_(Log.level == level, Log.created_at >= since) - ) + for current_type in ["agent", "system", "chat"]: + conditions = list(base_conditions) + conditions.append(Log.type == current_type) + if level: + conditions.append(Log.level == level) + query = select(func.count(Log.id)).where(and_(*conditions)) result = await self.db.execute(query) - stats["by_level"][level] = result.scalar() or 0 + stats["by_type"][current_type] = result.scalar() or 0 + + for current_level in ["debug", "info", "warning", "error"]: + conditions = list(base_conditions) + if log_type: + conditions.append(Log.type == log_type) + conditions.append(Log.level == current_level) + query = select(func.count(Log.id)).where(and_(*conditions)) + result = await self.db.execute(query) + stats["by_level"][current_level] = result.scalar() or 0 return stats -# 全局日志记录函数,方便各处调用 -_global_db_session = None - - -def set_log_session(db: AsyncSession): - """设置全局日志会话""" - global _global_db_session - _global_db_session = db - - -def get_log_session() -> Optional[AsyncSession]: - """获取全局日志会话""" - return _global_db_session - - -async def log_agent_event( - message: str, - user_id: Optional[str] = None, - source: Optional[str] = None, - details: Optional[dict] = None, - duration_ms: Optional[int] = None, -): - """记录智能体事件到数据库""" - if _global_db_session: +def serialize_log(log: Log) -> dict[str, Any]: + details = None + if log.details: try: - svc = LogService(_global_db_session) - await svc.agent_log(message, user_id, source, details, duration_ms) - except Exception as e: - logger.error(f"Failed to log agent event: {e}") + details = json.loads(log.details) + except json.JSONDecodeError: + details = {"raw": log.details} - -async def log_system_event( - message: str, - level: str = "info", - source: Optional[str] = None, - details: Optional[dict] = None, -): - """记录系统事件到数据库""" - if _global_db_session: - try: - svc = LogService(_global_db_session) - await svc.system_log(message, level, source, details) - except Exception as e: - logger.error(f"Failed to log system event: {e}") - - -async def log_chat_event( - message: str, - user_id: str, - details: Optional[dict] = None, - duration_ms: Optional[int] = None, -): - """记录聊天事件到数据库""" - if _global_db_session: - try: - svc = LogService(_global_db_session) - await svc.chat_log(message, user_id, details, duration_ms) - except Exception as e: - logger.error(f"Failed to log chat event: {e}") + return { + "id": log.id, + "level": log.level, + "type": log.type, + "user_id": log.user_id, + "request_id": log.request_id, + "route": log.route, + "method": log.method, + "status_code": log.status_code, + "error_type": log.error_type, + "operation": log.operation, + "message": log.message, + "source": log.source, + "details": details, + "duration_ms": int(log.duration_ms) if log.duration_ms is not None else None, + "created_at": log.created_at.replace(tzinfo=timezone.utc).isoformat() if log.created_at else None, + "updated_at": log.updated_at.replace(tzinfo=timezone.utc).isoformat() if log.updated_at else None, + } diff --git a/backend/tests/backend/app/services/test_log_service.py b/backend/tests/backend/app/services/test_log_service.py new file mode 100644 index 0000000..ced8691 --- /dev/null +++ b/backend/tests/backend/app/services/test_log_service.py @@ -0,0 +1,175 @@ +import json +from datetime import datetime, timedelta + +import pytest +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +import app.models # noqa: F401 +from app.database import Base +from app.main import app +from app.models.log import Log +from app.models.user import User +from app.routers.auth import get_current_user +from app.database import get_db +from app.services.auth_service import get_password_hash + + +@pytest.fixture +async def log_test_env(tmp_path): + db_path = tmp_path / 'test_logs.db' + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + current_user = User( + email='tester@example.com', + hashed_password=get_password_hash('secret123'), + full_name='Tester', + ) + other_user = User( + email='other@example.com', + hashed_password=get_password_hash('secret123'), + full_name='Other', + ) + session.add_all([current_user, other_user]) + await session.flush() + + now = datetime.utcnow() + session.add_all( + [ + Log( + level='error', + type='system', + user_id=current_user.id, + request_id='req-target', + route='/api/settings', + method='PUT', + status_code=500, + operation='settings.save', + message='target error', + source='settings', + details=json.dumps({'scope': 'target'}), + created_at=now - timedelta(minutes=30), + updated_at=now - timedelta(minutes=30), + ), + Log( + level='info', + type='system', + user_id=None, + request_id='req-global', + route='/api/health', + method='GET', + status_code=200, + operation='health.check', + message='global info', + source='http', + details=json.dumps({'scope': 'global'}), + created_at=now - timedelta(minutes=20), + updated_at=now - timedelta(minutes=20), + ), + Log( + level='error', + type='system', + user_id=other_user.id, + request_id='req-other', + route='/api/secret', + method='GET', + status_code=500, + operation='secret.fail', + message='other user error', + source='secret', + details=json.dumps({'scope': 'other'}), + created_at=now - timedelta(minutes=10), + updated_at=now - timedelta(minutes=10), + ), + Log( + level='warning', + type='chat', + user_id=current_user.id, + request_id='req-old', + route='/api/chat', + method='POST', + status_code=429, + operation='chat.send', + message='old warning', + source='chat', + details=json.dumps({'scope': 'old'}), + created_at=now - timedelta(days=10), + updated_at=now - timedelta(days=10), + ), + ] + ) + await session.commit() + await session.refresh(current_user) + + async def override_get_db(): + async with session_factory() as session: + yield session + + async def override_get_current_user(): + return current_user + + app.dependency_overrides[get_db] = override_get_db + app.dependency_overrides[get_current_user] = override_get_current_user + + try: + yield + finally: + app.dependency_overrides.clear() + await engine.dispose() + + +@pytest.mark.asyncio +async def test_logs_list_filters_by_route_and_hides_other_user_records(log_test_env): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url='http://testserver') as client: + response = await client.get('/api/logs', params={'route': '/api/settings'}) + + assert response.status_code == 200 + payload = response.json() + assert payload['total'] == 1 + assert [log['request_id'] for log in payload['logs']] == ['req-target'] + + +@pytest.mark.asyncio +async def test_logs_stats_uses_same_filters_as_list(log_test_env): + transport = ASGITransport(app=app) + params = { + 'route': '/api/settings', + 'status_code': 500, + 'level': 'error', + } + + async with AsyncClient(transport=transport, base_url='http://testserver') as client: + list_response = await client.get('/api/logs', params=params) + stats_response = await client.get('/api/logs/stats', params=params) + + assert list_response.status_code == 200 + assert stats_response.status_code == 200 + + list_payload = list_response.json() + stats_payload = stats_response.json() + + assert list_payload['total'] == 1 + assert stats_payload['total'] == 1 + assert stats_payload['by_type']['system'] == 1 + assert stats_payload['by_level']['error'] == 1 + + +@pytest.mark.asyncio +async def test_logs_rejects_invalid_datetime_range(log_test_env): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url='http://testserver') as client: + response = await client.get( + '/api/logs', + params={ + 'start_at': '2026-03-21T12:00:00+00:00', + 'end_at': '2026-03-20T12:00:00+00:00', + }, + ) + + assert response.status_code == 422 diff --git a/frontend/src/api/log.ts b/frontend/src/api/log.ts index 043af39..d807fc8 100644 --- a/frontend/src/api/log.ts +++ b/frontend/src/api/log.ts @@ -6,12 +6,18 @@ export interface Log { level: 'debug' | 'info' | 'warning' | 'error' type: 'agent' | 'system' | 'chat' user_id: string | null + request_id: string | null + route: string | null + method: string | null + status_code: number | null + error_type: string | null + operation: string | null message: string source: string | null - details: string | null - duration_ms: string | null - created_at: string - updated_at: string + details: Record | null + duration_ms: number | null + created_at: string | null + updated_at: string | null } export interface LogStats { @@ -36,19 +42,27 @@ export interface LogQueryResult { page_size: number } +export interface LogQueryParams { + log_type?: string + level?: string + source?: string + request_id?: string + route?: string + operation?: string + status_code?: number + start_at?: string + end_at?: string + page?: number + page_size?: number +} + export const logApi = { - list: (params?: { - log_type?: string - level?: string - source?: string - page?: number - page_size?: number - }): Promise> => { + list: (params?: LogQueryParams): Promise> => { return api.get('/api/logs', { params }) }, - getStats: (hours?: number): Promise> => { - return api.get('/api/logs/stats', { params: { hours } }) + getStats: (params?: LogQueryParams): Promise> => { + return api.get('/api/logs/stats', { params }) }, getRecent: (params?: {