feat(logs): unify filtering across list and stats

Make runtime log queries support request correlation and date-range diagnostics with shared filtering semantics so the log page can use one consistent contract.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-21 22:11:41 +08:00
parent 204cb223a3
commit a27736a832
4 changed files with 446 additions and 132 deletions

View File

@@ -1,11 +1,12 @@
from fastapi import APIRouter, Depends, Query
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from typing import Optional
from typing import Any, Optional
from app.database import get_db
from app.models.user import User
from app.routers.auth import get_current_user
from app.services.log_service import LogService
from app.services.log_service import LogService, parse_datetime_filter, serialize_log
router = APIRouter(prefix="/api/logs", tags=["Log"])
@@ -15,14 +16,18 @@ class LogOut(BaseModel):
level: str
type: str
user_id: Optional[str]
request_id: Optional[str]
route: Optional[str]
method: Optional[str]
status_code: Optional[int]
error_type: Optional[str]
operation: Optional[str]
message: str
source: Optional[str]
details: Optional[str]
duration_ms: Optional[str]
created_at: str
updated_at: str
model_config = {"from_attributes": True}
details: Optional[dict[str, Any]]
duration_ms: Optional[int]
created_at: Optional[str]
updated_at: Optional[str]
class LogStatsOut(BaseModel):
@@ -43,12 +48,23 @@ async def list_logs(
log_type: Optional[str] = Query(None, description="日志类型: agent/system/chat"),
level: Optional[str] = Query(None, description="日志级别: debug/info/warning/error"),
source: Optional[str] = Query(None, description="来源模块"),
request_id: Optional[str] = Query(None, description="请求 ID"),
route: Optional[str] = Query(None, description="路由"),
operation: Optional[str] = Query(None, description="业务操作"),
status_code: Optional[int] = Query(None, description="HTTP 状态码"),
start_at: Optional[str] = Query(None, description="开始时间 ISO"),
end_at: Optional[str] = Query(None, description="结束时间 ISO"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""查询日志列表"""
start_dt = parse_datetime_filter(start_at)
end_dt = parse_datetime_filter(end_at)
if start_dt and end_dt and start_dt > end_dt:
raise HTTPException(status_code=422, detail="开始时间不能晚于结束时间")
svc = LogService(db)
offset = (page - 1) * page_size
logs, total = await svc.list_logs(
@@ -56,11 +72,17 @@ async def list_logs(
level=level,
user_id=current_user.id,
source=source,
request_id=request_id,
route=route,
operation=operation,
status_code=status_code,
start_at=start_dt,
end_at=end_dt,
limit=page_size,
offset=offset,
)
return LogQueryOut(
logs=[LogOut.model_validate(log) for log in logs],
logs=[LogOut.model_validate(serialize_log(log)) for log in logs],
total=total,
page=page,
page_size=page_size,
@@ -69,13 +91,37 @@ async def list_logs(
@router.get("/stats", response_model=LogStatsOut)
async def get_log_stats(
hours: int = Query(24, ge=1, le=168),
log_type: Optional[str] = Query(None, description="日志类型: agent/system/chat"),
level: Optional[str] = Query(None, description="日志级别: debug/info/warning/error"),
source: Optional[str] = Query(None, description="来源模块"),
request_id: Optional[str] = Query(None, description="请求 ID"),
route: Optional[str] = Query(None, description="路由"),
operation: Optional[str] = Query(None, description="业务操作"),
status_code: Optional[int] = Query(None, description="HTTP 状态码"),
start_at: Optional[str] = Query(None, description="开始时间 ISO"),
end_at: Optional[str] = Query(None, description="结束时间 ISO"),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""获取日志统计"""
start_dt = parse_datetime_filter(start_at)
end_dt = parse_datetime_filter(end_at)
if start_dt and end_dt and start_dt > end_dt:
raise HTTPException(status_code=422, detail="开始时间不能晚于结束时间")
svc = LogService(db)
stats = await svc.get_log_stats(hours=hours)
stats = await svc.get_log_stats(
log_type=log_type,
level=level,
user_id=current_user.id,
source=source,
request_id=request_id,
route=route,
operation=operation,
status_code=status_code,
start_at=start_dt,
end_at=end_dt,
)
return LogStatsOut(**stats)
@@ -89,5 +135,5 @@ async def get_recent_logs(
):
"""获取最近的日志"""
svc = LogService(db)
logs = await svc.get_recent_logs(log_type=log_type, hours=hours, limit=limit)
return [LogOut.model_validate(log) for log in logs]
logs = await svc.get_recent_logs(log_type=log_type, user_id=current_user.id, hours=hours, limit=limit)
return [LogOut.model_validate(serialize_log(log)) for log in logs]

View File

@@ -4,10 +4,10 @@
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Optional
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, desc, func
from sqlalchemy import select, and_, desc, func, or_
from app.models.log import Log, LogType, LogLevel
logger = logging.getLogger(__name__)
@@ -21,6 +21,19 @@ LEVEL_MAP = {
}
def parse_datetime_filter(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is not None:
parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
class LogService:
def __init__(self, db: AsyncSession):
self.db = db
@@ -34,16 +47,28 @@ class LogService:
source: Optional[str] = None,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
request_id: Optional[str] = None,
route: Optional[str] = None,
method: Optional[str] = None,
status_code: Optional[int] = None,
error_type: Optional[str] = None,
operation: Optional[str] = None,
) -> Log:
"""记录日志"""
log_entry = Log(
level=level,
type=log_type,
user_id=user_id,
request_id=request_id,
route=route,
method=method,
status_code=status_code,
error_type=error_type,
operation=operation,
message=message,
source=source,
details=json.dumps(details, ensure_ascii=False) if details else None,
duration_ms=str(duration_ms) if duration_ms else None,
details=json.dumps(details, ensure_ascii=False) if details is not None else None,
duration_ms=int(duration_ms) if duration_ms is not None else None,
)
self.db.add(log_entry)
await self.db.commit()
@@ -75,15 +100,30 @@ class LogService:
level: str = "info",
source: Optional[str] = None,
details: Optional[dict] = None,
user_id: Optional[str] = None,
request_id: Optional[str] = None,
route: Optional[str] = None,
method: Optional[str] = None,
status_code: Optional[int] = None,
error_type: Optional[str] = None,
operation: Optional[str] = None,
duration_ms: Optional[int] = None,
) -> Log:
"""记录系统运行日志"""
return await self.log(
message=message,
level=level,
log_type="system",
user_id=None,
user_id=user_id,
source=source,
details=details,
request_id=request_id,
route=route,
method=method,
status_code=status_code,
error_type=error_type,
operation=operation,
duration_ms=duration_ms,
)
async def chat_log(
@@ -104,12 +144,56 @@ class LogService:
duration_ms=duration_ms,
)
def _build_conditions(
self,
log_type: Optional[str] = None,
level: Optional[str] = None,
user_id: Optional[str] = None,
source: Optional[str] = None,
request_id: Optional[str] = None,
route: Optional[str] = None,
operation: Optional[str] = None,
status_code: Optional[int] = None,
start_at: Optional[datetime] = None,
end_at: Optional[datetime] = None,
) -> list[Any]:
conditions = []
if log_type:
conditions.append(Log.type == log_type)
if level:
conditions.append(Log.level == level)
if user_id:
conditions.append(or_(Log.user_id == user_id, Log.user_id.is_(None)))
if source:
conditions.append(Log.source == source)
if request_id:
conditions.append(Log.request_id == request_id)
if route:
conditions.append(Log.route == route)
if operation:
conditions.append(Log.operation == operation)
if status_code is not None:
conditions.append(Log.status_code == status_code)
if start_at is not None:
conditions.append(Log.created_at >= start_at)
if end_at is not None:
conditions.append(Log.created_at <= end_at)
return conditions
async def list_logs(
self,
log_type: Optional[str] = None,
level: Optional[str] = None,
user_id: Optional[str] = None,
source: Optional[str] = None,
request_id: Optional[str] = None,
route: Optional[str] = None,
operation: Optional[str] = None,
status_code: Optional[int] = None,
start_at: Optional[datetime] = None,
end_at: Optional[datetime] = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[Log], int]:
@@ -119,28 +203,27 @@ class LogService:
Returns:
(logs, total_count)
"""
conditions = []
conditions = self._build_conditions(
log_type=log_type,
level=level,
user_id=user_id,
source=source,
request_id=request_id,
route=route,
operation=operation,
status_code=status_code,
start_at=start_at,
end_at=end_at,
)
if log_type:
conditions.append(Log.type == log_type)
if level:
conditions.append(Log.level == level)
if user_id:
conditions.append(Log.user_id == user_id)
if source:
conditions.append(Log.source == source)
# 统计总数
count_query = select(func.count(Log.id))
if conditions:
count_query = count_query.where(and_(*conditions))
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 查询列表
query = (
select(Log)
.where(and_(*conditions)) if conditions else select(Log)
select(Log).where(and_(*conditions)) if conditions else select(Log)
).order_by(desc(Log.created_at)).limit(limit).offset(offset)
result = await self.db.execute(query)
@@ -151,28 +234,48 @@ class LogService:
async def get_recent_logs(
self,
log_type: Optional[str] = None,
user_id: Optional[str] = None,
hours: int = 24,
limit: int = 100,
) -> list[Log]:
"""获取最近的日志"""
since = datetime.utcnow() - timedelta(hours=hours)
conditions = [Log.created_at >= since]
if log_type:
conditions.append(Log.type == log_type)
query = (
select(Log)
.where(and_(*conditions))
.order_by(desc(Log.created_at))
.limit(limit)
end_at = datetime.now(timezone.utc).replace(tzinfo=None)
start_at = end_at - timedelta(hours=hours)
conditions = self._build_conditions(
log_type=log_type,
user_id=user_id,
start_at=start_at,
end_at=end_at,
)
query = select(Log).where(and_(*conditions)).order_by(desc(Log.created_at)).limit(limit)
result = await self.db.execute(query)
return list(result.scalars().all())
async def get_log_stats(self, hours: int = 24) -> dict:
async def get_log_stats(
self,
log_type: Optional[str] = None,
level: Optional[str] = None,
user_id: Optional[str] = None,
source: Optional[str] = None,
request_id: Optional[str] = None,
route: Optional[str] = None,
operation: Optional[str] = None,
status_code: Optional[int] = None,
start_at: Optional[datetime] = None,
end_at: Optional[datetime] = None,
) -> dict:
"""获取日志统计"""
since = datetime.utcnow() - timedelta(hours=hours)
base_conditions = self._build_conditions(
user_id=user_id,
source=source,
request_id=request_id,
route=route,
operation=operation,
status_code=status_code,
start_at=start_at,
end_at=end_at,
)
stats = {
"total": 0,
@@ -180,83 +283,59 @@ class LogService:
"by_level": {"debug": 0, "info": 0, "warning": 0, "error": 0},
}
# 按类型统计
for log_type in ["agent", "system", "chat"]:
query = select(func.count(Log.id)).where(
and_(Log.type == log_type, Log.created_at >= since)
)
result = await self.db.execute(query)
count = result.scalar() or 0
stats["by_type"][log_type] = count
stats["total"] += count
total_conditions = list(base_conditions)
if log_type:
total_conditions.append(Log.type == log_type)
if level:
total_conditions.append(Log.level == level)
total_query = select(func.count(Log.id)).where(and_(*total_conditions))
total_result = await self.db.execute(total_query)
stats["total"] = total_result.scalar() or 0
# 按级别统计
for level in ["debug", "info", "warning", "error"]:
query = select(func.count(Log.id)).where(
and_(Log.level == level, Log.created_at >= since)
)
for current_type in ["agent", "system", "chat"]:
conditions = list(base_conditions)
conditions.append(Log.type == current_type)
if level:
conditions.append(Log.level == level)
query = select(func.count(Log.id)).where(and_(*conditions))
result = await self.db.execute(query)
stats["by_level"][level] = result.scalar() or 0
stats["by_type"][current_type] = result.scalar() or 0
for current_level in ["debug", "info", "warning", "error"]:
conditions = list(base_conditions)
if log_type:
conditions.append(Log.type == log_type)
conditions.append(Log.level == current_level)
query = select(func.count(Log.id)).where(and_(*conditions))
result = await self.db.execute(query)
stats["by_level"][current_level] = result.scalar() or 0
return stats
# 全局日志记录函数,方便各处调用
_global_db_session = None
def set_log_session(db: AsyncSession):
"""设置全局日志会话"""
global _global_db_session
_global_db_session = db
def get_log_session() -> Optional[AsyncSession]:
"""获取全局日志会话"""
return _global_db_session
async def log_agent_event(
message: str,
user_id: Optional[str] = None,
source: Optional[str] = None,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
):
"""记录智能体事件到数据库"""
if _global_db_session:
def serialize_log(log: Log) -> dict[str, Any]:
details = None
if log.details:
try:
svc = LogService(_global_db_session)
await svc.agent_log(message, user_id, source, details, duration_ms)
except Exception as e:
logger.error(f"Failed to log agent event: {e}")
details = json.loads(log.details)
except json.JSONDecodeError:
details = {"raw": log.details}
async def log_system_event(
message: str,
level: str = "info",
source: Optional[str] = None,
details: Optional[dict] = None,
):
"""记录系统事件到数据库"""
if _global_db_session:
try:
svc = LogService(_global_db_session)
await svc.system_log(message, level, source, details)
except Exception as e:
logger.error(f"Failed to log system event: {e}")
async def log_chat_event(
message: str,
user_id: str,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
):
"""记录聊天事件到数据库"""
if _global_db_session:
try:
svc = LogService(_global_db_session)
await svc.chat_log(message, user_id, details, duration_ms)
except Exception as e:
logger.error(f"Failed to log chat event: {e}")
return {
"id": log.id,
"level": log.level,
"type": log.type,
"user_id": log.user_id,
"request_id": log.request_id,
"route": log.route,
"method": log.method,
"status_code": log.status_code,
"error_type": log.error_type,
"operation": log.operation,
"message": log.message,
"source": log.source,
"details": details,
"duration_ms": int(log.duration_ms) if log.duration_ms is not None else None,
"created_at": log.created_at.replace(tzinfo=timezone.utc).isoformat() if log.created_at else None,
"updated_at": log.updated_at.replace(tzinfo=timezone.utc).isoformat() if log.updated_at else None,
}

View File

@@ -0,0 +1,175 @@
import json
from datetime import datetime, timedelta
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
import app.models # noqa: F401
from app.database import Base
from app.main import app
from app.models.log import Log
from app.models.user import User
from app.routers.auth import get_current_user
from app.database import get_db
from app.services.auth_service import get_password_hash
@pytest.fixture
async def log_test_env(tmp_path):
db_path = tmp_path / 'test_logs.db'
engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with session_factory() as session:
current_user = User(
email='tester@example.com',
hashed_password=get_password_hash('secret123'),
full_name='Tester',
)
other_user = User(
email='other@example.com',
hashed_password=get_password_hash('secret123'),
full_name='Other',
)
session.add_all([current_user, other_user])
await session.flush()
now = datetime.utcnow()
session.add_all(
[
Log(
level='error',
type='system',
user_id=current_user.id,
request_id='req-target',
route='/api/settings',
method='PUT',
status_code=500,
operation='settings.save',
message='target error',
source='settings',
details=json.dumps({'scope': 'target'}),
created_at=now - timedelta(minutes=30),
updated_at=now - timedelta(minutes=30),
),
Log(
level='info',
type='system',
user_id=None,
request_id='req-global',
route='/api/health',
method='GET',
status_code=200,
operation='health.check',
message='global info',
source='http',
details=json.dumps({'scope': 'global'}),
created_at=now - timedelta(minutes=20),
updated_at=now - timedelta(minutes=20),
),
Log(
level='error',
type='system',
user_id=other_user.id,
request_id='req-other',
route='/api/secret',
method='GET',
status_code=500,
operation='secret.fail',
message='other user error',
source='secret',
details=json.dumps({'scope': 'other'}),
created_at=now - timedelta(minutes=10),
updated_at=now - timedelta(minutes=10),
),
Log(
level='warning',
type='chat',
user_id=current_user.id,
request_id='req-old',
route='/api/chat',
method='POST',
status_code=429,
operation='chat.send',
message='old warning',
source='chat',
details=json.dumps({'scope': 'old'}),
created_at=now - timedelta(days=10),
updated_at=now - timedelta(days=10),
),
]
)
await session.commit()
await session.refresh(current_user)
async def override_get_db():
async with session_factory() as session:
yield session
async def override_get_current_user():
return current_user
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[get_current_user] = override_get_current_user
try:
yield
finally:
app.dependency_overrides.clear()
await engine.dispose()
@pytest.mark.asyncio
async def test_logs_list_filters_by_route_and_hides_other_user_records(log_test_env):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url='http://testserver') as client:
response = await client.get('/api/logs', params={'route': '/api/settings'})
assert response.status_code == 200
payload = response.json()
assert payload['total'] == 1
assert [log['request_id'] for log in payload['logs']] == ['req-target']
@pytest.mark.asyncio
async def test_logs_stats_uses_same_filters_as_list(log_test_env):
transport = ASGITransport(app=app)
params = {
'route': '/api/settings',
'status_code': 500,
'level': 'error',
}
async with AsyncClient(transport=transport, base_url='http://testserver') as client:
list_response = await client.get('/api/logs', params=params)
stats_response = await client.get('/api/logs/stats', params=params)
assert list_response.status_code == 200
assert stats_response.status_code == 200
list_payload = list_response.json()
stats_payload = stats_response.json()
assert list_payload['total'] == 1
assert stats_payload['total'] == 1
assert stats_payload['by_type']['system'] == 1
assert stats_payload['by_level']['error'] == 1
@pytest.mark.asyncio
async def test_logs_rejects_invalid_datetime_range(log_test_env):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url='http://testserver') as client:
response = await client.get(
'/api/logs',
params={
'start_at': '2026-03-21T12:00:00+00:00',
'end_at': '2026-03-20T12:00:00+00:00',
},
)
assert response.status_code == 422

View File

@@ -6,12 +6,18 @@ export interface Log {
level: 'debug' | 'info' | 'warning' | 'error'
type: 'agent' | 'system' | 'chat'
user_id: string | null
request_id: string | null
route: string | null
method: string | null
status_code: number | null
error_type: string | null
operation: string | null
message: string
source: string | null
details: string | null
duration_ms: string | null
created_at: string
updated_at: string
details: Record<string, unknown> | null
duration_ms: number | null
created_at: string | null
updated_at: string | null
}
export interface LogStats {
@@ -36,19 +42,27 @@ export interface LogQueryResult {
page_size: number
}
export interface LogQueryParams {
log_type?: string
level?: string
source?: string
request_id?: string
route?: string
operation?: string
status_code?: number
start_at?: string
end_at?: string
page?: number
page_size?: number
}
export const logApi = {
list: (params?: {
log_type?: string
level?: string
source?: string
page?: number
page_size?: number
}): Promise<AxiosResponse<LogQueryResult>> => {
list: (params?: LogQueryParams): Promise<AxiosResponse<LogQueryResult>> => {
return api.get('/api/logs', { params })
},
getStats: (hours?: number): Promise<AxiosResponse<LogStats>> => {
return api.get('/api/logs/stats', { params: { hours } })
getStats: (params?: LogQueryParams): Promise<AxiosResponse<LogStats>> => {
return api.get('/api/logs/stats', { params })
},
getRecent: (params?: {