Add log system with three log types (agent/system/chat)

Implemented a complete log system for tracking:
- Agent logs:智能体调用
- System logs: 系统运行
- Chat logs: 问答对话

Backend:
- Log model with type, level, user_id, message, source, duration_ms
- LogService with methods for logging and querying
- API endpoints: GET /api/logs, GET /api/logs/stats, GET /api/logs/recent

Frontend:
- LogView.vue with filters, stats, pagination, auto-refresh
- log.ts API client with TypeScript interfaces
- Added "运行日志" nav item to sidebar
This commit is contained in:
2026-03-21 11:58:51 +08:00
parent 30568846b3
commit 9e4e94c75e
9 changed files with 979 additions and 1 deletions

View File

@@ -0,0 +1,262 @@
"""
运行日志服务
提供统一的日志记录接口,支持分类存储和查询
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, desc, func
from app.models.log import Log, LogType, LogLevel
logger = logging.getLogger(__name__)
# 日志级别映射
LEVEL_MAP = {
"DEBUG": LogLevel.DEBUG,
"INFO": LogLevel.INFO,
"WARNING": LogLevel.WARNING,
"ERROR": LogLevel.ERROR,
}
class LogService:
def __init__(self, db: AsyncSession):
self.db = db
async def log(
self,
message: str,
level: str = "info",
log_type: str = "system",
user_id: Optional[str] = None,
source: Optional[str] = None,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
) -> Log:
"""记录日志"""
log_entry = Log(
level=level,
type=log_type,
user_id=user_id,
message=message,
source=source,
details=json.dumps(details, ensure_ascii=False) if details else None,
duration_ms=str(duration_ms) if duration_ms else None,
)
self.db.add(log_entry)
await self.db.commit()
await self.db.refresh(log_entry)
return log_entry
async def agent_log(
self,
message: str,
user_id: Optional[str] = None,
source: Optional[str] = None,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
) -> Log:
"""记录智能体调用日志"""
return await self.log(
message=message,
level="info",
log_type="agent",
user_id=user_id,
source=source,
details=details,
duration_ms=duration_ms,
)
async def system_log(
self,
message: str,
level: str = "info",
source: Optional[str] = None,
details: Optional[dict] = None,
) -> Log:
"""记录系统运行日志"""
return await self.log(
message=message,
level=level,
log_type="system",
user_id=None,
source=source,
details=details,
)
async def chat_log(
self,
message: str,
user_id: str,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
) -> Log:
"""记录问答日志"""
return await self.log(
message=message,
level="info",
log_type="chat",
user_id=user_id,
source="chat",
details=details,
duration_ms=duration_ms,
)
async def list_logs(
self,
log_type: Optional[str] = None,
level: Optional[str] = None,
user_id: Optional[str] = None,
source: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[Log], int]:
"""
查询日志列表
Returns:
(logs, total_count)
"""
conditions = []
if log_type:
conditions.append(Log.type == log_type)
if level:
conditions.append(Log.level == level)
if user_id:
conditions.append(Log.user_id == user_id)
if source:
conditions.append(Log.source == source)
# 统计总数
count_query = select(func.count(Log.id))
if conditions:
count_query = count_query.where(and_(*conditions))
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 查询列表
query = (
select(Log)
.where(and_(*conditions)) if conditions else select(Log)
).order_by(desc(Log.created_at)).limit(limit).offset(offset)
result = await self.db.execute(query)
logs = list(result.scalars().all())
return logs, total
async def get_recent_logs(
self,
log_type: Optional[str] = None,
hours: int = 24,
limit: int = 100,
) -> list[Log]:
"""获取最近的日志"""
since = datetime.utcnow() - timedelta(hours=hours)
conditions = [Log.created_at >= since]
if log_type:
conditions.append(Log.type == log_type)
query = (
select(Log)
.where(and_(*conditions))
.order_by(desc(Log.created_at))
.limit(limit)
)
result = await self.db.execute(query)
return list(result.scalars().all())
async def get_log_stats(self, hours: int = 24) -> dict:
"""获取日志统计"""
since = datetime.utcnow() - timedelta(hours=hours)
stats = {
"total": 0,
"by_type": {"agent": 0, "system": 0, "chat": 0},
"by_level": {"debug": 0, "info": 0, "warning": 0, "error": 0},
}
# 按类型统计
for log_type in ["agent", "system", "chat"]:
query = select(func.count(Log.id)).where(
and_(Log.type == log_type, Log.created_at >= since)
)
result = await self.db.execute(query)
count = result.scalar() or 0
stats["by_type"][log_type] = count
stats["total"] += count
# 按级别统计
for level in ["debug", "info", "warning", "error"]:
query = select(func.count(Log.id)).where(
and_(Log.level == level, Log.created_at >= since)
)
result = await self.db.execute(query)
stats["by_level"][level] = result.scalar() or 0
return stats
# 全局日志记录函数,方便各处调用
_global_db_session = None
def set_log_session(db: AsyncSession):
"""设置全局日志会话"""
global _global_db_session
_global_db_session = db
def get_log_session() -> Optional[AsyncSession]:
"""获取全局日志会话"""
return _global_db_session
async def log_agent_event(
message: str,
user_id: Optional[str] = None,
source: Optional[str] = None,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
):
"""记录智能体事件到数据库"""
if _global_db_session:
try:
svc = LogService(_global_db_session)
await svc.agent_log(message, user_id, source, details, duration_ms)
except Exception as e:
logger.error(f"Failed to log agent event: {e}")
async def log_system_event(
message: str,
level: str = "info",
source: Optional[str] = None,
details: Optional[dict] = None,
):
"""记录系统事件到数据库"""
if _global_db_session:
try:
svc = LogService(_global_db_session)
await svc.system_log(message, level, source, details)
except Exception as e:
logger.error(f"Failed to log system event: {e}")
async def log_chat_event(
message: str,
user_id: str,
details: Optional[dict] = None,
duration_ms: Optional[int] = None,
):
"""记录聊天事件到数据库"""
if _global_db_session:
try:
svc = LogService(_global_db_session)
await svc.chat_log(message, user_id, details, duration_ms)
except Exception as e:
logger.error(f"Failed to log chat event: {e}")