feat(server): 新增系统日志服务模块,包含API端点、schema定义和服务实现,用于系统操作日志记录和查询

This commit is contained in:
caoxiaozhu
2026-05-15 09:33:20 +00:00
parent 8691385a8e
commit 1d5d009bc7
5 changed files with 643 additions and 0 deletions

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.api.deps import CurrentUserContext, require_admin_user
from app.schemas.common import ErrorResponse
from app.schemas.system_log import SystemLogEntryRead, SystemLogFileRead, SystemLogTailRead
from app.services.system_logs import SystemLogService
router = APIRouter(prefix="/system-logs")
@router.get(
"/entries",
response_model=list[SystemLogEntryRead],
summary="查询系统日志记录列表",
description="解析 server/logs 下最近的日志内容,按单条日志记录返回,仅管理员可用。",
responses={
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以查看系统日志。",
}
},
)
def list_system_log_entries(
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
limit: Annotated[int, Query(ge=20, le=1000, description="返回的日志记录数。")] = 300,
) -> list[SystemLogEntryRead]:
return SystemLogService().list_entries(entry_limit=limit)
@router.get(
"/entries/{entry_id}",
response_model=SystemLogEntryRead,
summary="读取单条系统日志记录",
description="按日志记录 ID 返回结构化解析结果,仅管理员可用。",
responses={
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以查看系统日志。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "日志记录不存在。",
},
},
)
def get_system_log_entry(
entry_id: str,
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> SystemLogEntryRead:
try:
return SystemLogService().get_entry(entry_id)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="日志记录不存在。") from exc
@router.get(
"/files",
response_model=list[SystemLogFileRead],
summary="查询系统日志文件列表",
description="返回 server/logs 目录下可查看的日志文件列表,仅管理员可用。",
responses={
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以查看系统日志。",
}
},
)
def list_system_log_files(
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> list[SystemLogFileRead]:
return SystemLogService().list_files()
@router.get(
"/files/{file_name}",
response_model=SystemLogTailRead,
summary="读取系统日志尾部内容",
description="按文件名返回 server/logs 指定日志文件的最近若干行,仅管理员可用。",
responses={
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以查看系统日志。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "日志文件不存在。",
},
},
)
def get_system_log_tail(
file_name: str,
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
lines: Annotated[int, Query(ge=20, le=1000, description="返回的日志行数。")] = 300,
) -> SystemLogTailRead:
try:
return SystemLogService().read_tail(file_name, line_limit=lines)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="日志文件不存在。") from exc

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel, Field
class SystemLogFileRead(BaseModel):
name: str
size_bytes: int = 0
updated_at: datetime | None = None
class SystemLogTailRead(BaseModel):
name: str
size_bytes: int = 0
updated_at: datetime | None = None
line_count: int = 0
lines: list[str] = Field(default_factory=list)
class SystemLogEntryRead(BaseModel):
id: str
source_file: str
line_number: int = 0
timestamp: datetime | None = None
level: str = "UNKNOWN"
logger: str = ""
message: str = ""
request_id: str = ""
method: str = ""
path: str = ""
status_code: int | None = None
duration_ms: float | None = None
event_type: str = "系统日志"
outcome: str = "未知"
summary: str = ""
parse_status: str = "parsed"
raw: str = ""

View File

@@ -0,0 +1,198 @@
from __future__ import annotations
import threading
from datetime import UTC, datetime
from typing import Any
from app.api.deps import CurrentUserContext
from app.core.agent_enums import AgentRunStatus
from app.core.logging import get_logger
from app.db.session import get_session_factory
from app.services.agent_runs import AgentRunService
from app.services.knowledge import KNOWLEDGE_INGEST_STATUS_FAILED, KnowledgeService
from app.services.llm_wiki import LlmWikiService
logger = get_logger("app.services.llm_wiki_tasks")
class LlmWikiTaskManager:
def __init__(self) -> None:
self._lock = threading.RLock()
self._threads: dict[str, threading.Thread] = {}
def submit_sync(
self,
*,
agent_run_id: str,
folder: str,
current_user: CurrentUserContext,
document_ids: list[str] | None = None,
force: bool = False,
) -> None:
worker = threading.Thread(
target=self._run_sync,
kwargs={
"agent_run_id": agent_run_id,
"folder": folder,
"current_user": current_user,
"document_ids": list(document_ids or []),
"force": force,
},
daemon=True,
name=f"llm-wiki-sync-{agent_run_id}",
)
with self._lock:
self._threads[agent_run_id] = worker
worker.start()
def shutdown(self, *, timeout_seconds: float = 1.0) -> None:
with self._lock:
threads = list(self._threads.items())
self._threads.clear()
for _, worker in threads:
if worker.is_alive():
worker.join(timeout=timeout_seconds)
def _run_sync(
self,
*,
agent_run_id: str,
folder: str,
current_user: CurrentUserContext,
document_ids: list[str],
force: bool,
) -> None:
session_factory = get_session_factory()
db = session_factory()
run_service = AgentRunService(db)
knowledge_service = KnowledgeService()
request_payload = {
"folder": folder,
"document_ids": list(document_ids),
"force": force,
}
try:
run_service.merge_route_json(
agent_run_id,
{
"phase": "running",
"heartbeat_at": datetime.now(UTC).isoformat(),
"job_type": "llm_wiki_sync",
"folder": folder,
"force": force,
"requested_document_ids": list(document_ids),
"progress": {
"total_documents": len(document_ids),
"completed_documents": 0,
"failed_documents": 0,
"skipped_documents": 0,
"percent": 0,
},
},
status=AgentRunStatus.RUNNING.value,
result_summary="Hermes 后台归纳任务已启动。",
)
result = LlmWikiService(db).sync_folder(
folder=folder,
current_user=current_user,
document_ids=document_ids,
force=force,
agent_run_id=agent_run_id,
progress_callback=lambda payload, summary: self._write_progress(
run_service=run_service,
agent_run_id=agent_run_id,
payload=payload,
summary=summary,
),
)
run_service.record_tool_call(
run_id=agent_run_id,
tool_type="llm",
tool_name="system_hermes_llm_wiki_sync",
request_json=request_payload,
response_json=result.model_dump(mode="json"),
status="succeeded",
duration_ms=0,
)
run_service.merge_route_json(
agent_run_id,
{
"phase": "succeeded",
"heartbeat_at": datetime.now(UTC).isoformat(),
"sync_run_id": result.run_id,
"sync_result": result.model_dump(mode="json"),
"progress": {
"total_documents": max(len(document_ids), result.document_count),
"completed_documents": result.document_count,
"failed_documents": 0,
"skipped_documents": max(0, len(document_ids) - result.document_count),
"percent": 100,
},
},
status=AgentRunStatus.SUCCEEDED.value,
result_summary=result.summary,
finished_at=datetime.now(UTC),
)
except Exception as exc:
logger.exception("Background LLM Wiki sync failed run_id=%s", agent_run_id)
if document_ids:
knowledge_service.set_document_ingest_statuses(
document_ids,
status_code=KNOWLEDGE_INGEST_STATUS_FAILED,
agent_run_id=agent_run_id,
)
run_service.record_tool_call(
run_id=agent_run_id,
tool_type="llm",
tool_name="system_hermes_llm_wiki_sync",
request_json=request_payload,
response_json={"error": str(exc)},
status="failed",
duration_ms=0,
error_message=str(exc),
)
run_service.merge_route_json(
agent_run_id,
{
"phase": "failed",
"heartbeat_at": datetime.now(UTC).isoformat(),
"progress": {
"total_documents": len(document_ids),
"completed_documents": 0,
"failed_documents": len(document_ids),
"skipped_documents": 0,
"percent": 100,
},
},
status=AgentRunStatus.FAILED.value,
result_summary=str(exc),
error_message=str(exc),
finished_at=datetime.now(UTC),
)
finally:
db.close()
with self._lock:
self._threads.pop(agent_run_id, None)
@staticmethod
def _write_progress(
*,
run_service: AgentRunService,
agent_run_id: str,
payload: dict[str, Any],
summary: str,
) -> None:
patched_payload = dict(payload)
patched_payload["heartbeat_at"] = datetime.now(UTC).isoformat()
run_service.merge_route_json(
agent_run_id,
patched_payload,
status=AgentRunStatus.RUNNING.value,
result_summary=summary,
)
llm_wiki_task_manager = LlmWikiTaskManager()

View File

@@ -0,0 +1,250 @@
from __future__ import annotations
from collections import deque
from datetime import UTC, datetime
import hashlib
from pathlib import Path
import re
from app.core.config import SERVER_DIR
from app.schemas.system_log import SystemLogEntryRead, SystemLogFileRead, SystemLogTailRead
ANSI_PATTERN = re.compile(r"\x1b\[[0-9;]*m")
STRUCTURED_LINE_PATTERN = re.compile(
r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+\|\s+"
r"(?P<level>[A-Z]+)\s+\|\s+(?P<logger>[^|]+?)\s+\|\s+(?P<message>.*)$"
)
HTTP_ACCESS_PATTERN = re.compile(
r"^(?P<method>[A-Z]+)\s+(?P<path>\S+)\s+(?P<status_code>\d{3})\s+"
r"(?P<duration_ms>\d+(?:\.\d+)?)ms(?:\s+request_id=(?P<request_id>\S+))?$"
)
class SystemLogService:
def __init__(self, *, log_dir: Path | None = None) -> None:
self.log_dir = Path(log_dir or (SERVER_DIR / "logs")).resolve()
def list_files(self) -> list[SystemLogFileRead]:
self.log_dir.mkdir(parents=True, exist_ok=True)
files = [
self._serialize_file(path)
for path in self.log_dir.iterdir()
if path.is_file() and not path.name.startswith(".")
]
return sorted(
files,
key=lambda item: (item.updated_at or datetime.fromtimestamp(0, tz=UTC)),
reverse=True,
)
def read_tail(self, name: str, *, line_limit: int = 300) -> SystemLogTailRead:
path = self._resolve_file(name)
line_buffer: deque[str] = deque(maxlen=max(1, min(line_limit, 1000)))
with path.open("r", encoding="utf-8", errors="replace") as stream:
for line in stream:
line_buffer.append(line.rstrip("\n"))
file_meta = self._serialize_file(path)
return SystemLogTailRead(
name=file_meta.name,
size_bytes=file_meta.size_bytes,
updated_at=file_meta.updated_at,
line_count=len(line_buffer),
lines=list(line_buffer),
)
def list_entries(self, *, entry_limit: int = 300, line_limit_per_file: int = 1200) -> list[SystemLogEntryRead]:
self.log_dir.mkdir(parents=True, exist_ok=True)
entries: list[SystemLogEntryRead] = []
for file_meta in self.list_files():
path = self._resolve_file(file_meta.name)
lines = self._read_tail_lines(path, line_limit=max(1, min(line_limit_per_file, 5000)))
entries.extend(self._parse_entries(path.name, lines))
entries.sort(
key=lambda item: (
item.timestamp.timestamp() if item.timestamp else 0,
item.source_file,
item.line_number,
),
reverse=True,
)
return entries[: max(1, min(entry_limit, 1000))]
def get_entry(self, entry_id: str) -> SystemLogEntryRead:
normalized_id = str(entry_id or "").strip()
if not normalized_id:
raise FileNotFoundError("日志记录 ID 不能为空。")
for entry in self.list_entries(entry_limit=1000, line_limit_per_file=5000):
if entry.id == normalized_id:
return entry
raise FileNotFoundError(normalized_id)
def _resolve_file(self, name: str) -> Path:
normalized_name = Path(str(name or "").strip()).name
if not normalized_name:
raise FileNotFoundError("日志文件名不能为空。")
path = (self.log_dir / normalized_name).resolve()
if path.parent != self.log_dir or not path.exists() or not path.is_file():
raise FileNotFoundError(normalized_name)
return path
@staticmethod
def _read_tail_lines(path: Path, *, line_limit: int) -> list[tuple[int, str]]:
line_buffer: deque[tuple[int, str]] = deque(maxlen=line_limit)
with path.open("r", encoding="utf-8", errors="replace") as stream:
for index, line in enumerate(stream, start=1):
line_buffer.append((index, line.rstrip("\n")))
return list(line_buffer)
def _parse_entries(self, source_file: str, lines: list[tuple[int, str]]) -> list[SystemLogEntryRead]:
entries: list[SystemLogEntryRead] = []
current: dict[str, object] | None = None
for line_number, raw_line in lines:
clean_line = ANSI_PATTERN.sub("", raw_line)
match = STRUCTURED_LINE_PATTERN.match(clean_line)
if match:
if current is not None:
entries.append(self._build_entry(source_file, current))
current = {
"line_number": line_number,
"timestamp": self._parse_timestamp(match.group("timestamp")),
"level": match.group("level").strip(),
"logger": match.group("logger").strip(),
"message": match.group("message").strip(),
"raw_lines": [clean_line],
"parse_status": "parsed",
}
continue
if current is not None:
current["raw_lines"].append(clean_line)
continue
if clean_line.strip():
current = {
"line_number": line_number,
"timestamp": None,
"level": "UNKNOWN",
"logger": "",
"message": clean_line.strip(),
"raw_lines": [clean_line],
"parse_status": "unparsed",
}
entries.append(self._build_entry(source_file, current))
current = None
if current is not None:
entries.append(self._build_entry(source_file, current))
return entries
def _build_entry(self, source_file: str, payload: dict[str, object]) -> SystemLogEntryRead:
message = str(payload["message"])
logger = str(payload["logger"])
level = str(payload["level"])
http_match = HTTP_ACCESS_PATTERN.match(message)
method = http_match.group("method") if http_match else ""
path = http_match.group("path") if http_match else ""
status_code = int(http_match.group("status_code")) if http_match else None
duration_ms = float(http_match.group("duration_ms")) if http_match else None
request_id = http_match.group("request_id") if http_match and http_match.group("request_id") else ""
event_type = self._resolve_event_type(logger, level, http_match is not None)
outcome = self._resolve_outcome(level, status_code)
summary = self._build_summary(
event_type=event_type,
message=message,
method=method,
path=path,
status_code=status_code,
duration_ms=duration_ms,
)
raw = "\n".join(str(line) for line in payload["raw_lines"])
fingerprint = f"{source_file}:{payload['line_number']}:{payload['timestamp']}:{raw}"
entry_id = hashlib.sha1(fingerprint.encode("utf-8")).hexdigest()[:16]
return SystemLogEntryRead(
id=entry_id,
source_file=source_file,
line_number=int(payload["line_number"]),
timestamp=payload["timestamp"],
level=level,
logger=logger,
message=message,
request_id=request_id,
method=method,
path=path,
status_code=status_code,
duration_ms=duration_ms,
event_type=event_type,
outcome=outcome,
summary=summary,
parse_status=str(payload["parse_status"]),
raw=raw,
)
@staticmethod
def _parse_timestamp(value: str) -> datetime | None:
try:
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
@staticmethod
def _resolve_event_type(logger: str, level: str, is_http_access: bool) -> str:
if is_http_access or logger == "app.middleware.access":
return "HTTP 请求"
if level in {"ERROR", "CRITICAL"}:
return "系统异常"
if level == "WARNING":
return "系统告警"
return "运行日志"
@staticmethod
def _resolve_outcome(level: str, status_code: int | None) -> str:
if status_code is not None:
if status_code >= 500:
return "失败"
if status_code >= 400:
return "异常"
return "成功"
if level in {"ERROR", "CRITICAL"}:
return "失败"
if level == "WARNING":
return "告警"
if level in {"INFO", "DEBUG"}:
return "成功"
return "未知"
@staticmethod
def _build_summary(
*,
event_type: str,
message: str,
method: str,
path: str,
status_code: int | None,
duration_ms: float | None,
) -> str:
if method and path and status_code is not None:
return f"{method} {path} 返回 {status_code},耗时 {duration_ms or 0:.1f}ms"
if len(message) <= 96:
return message
return f"{message[:96]}..."
@staticmethod
def _serialize_file(path: Path) -> SystemLogFileRead:
stat = path.stat()
return SystemLogFileRead(
name=path.name,
size_bytes=stat.st_size,
updated_at=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
)

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from app.services.system_logs import SystemLogService
def test_system_log_service_reads_tail(tmp_path) -> None:
log_dir = tmp_path / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "app.log"
log_file.write_text(
"\n".join(f"line-{index}" for index in range(1, 21)) + "\n",
encoding="utf-8",
)
service = SystemLogService(log_dir=log_dir)
files = service.list_files()
tail = service.read_tail("app.log", line_limit=5)
assert [item.name for item in files] == ["app.log"]
assert tail.name == "app.log"
assert tail.line_count == 5
assert tail.lines == ["line-16", "line-17", "line-18", "line-19", "line-20"]
def test_system_log_service_parses_entries(tmp_path) -> None:
log_dir = tmp_path / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "app.log"
log_file.write_text(
"\n".join(
[
"2026-05-15 09:00:00 | INFO | app.middleware.access | GET /api/v1/health 200 8.2ms request_id=req_1",
"2026-05-15 09:00:01 | WARNING | app.services.settings | Skipping undecryptable model API key",
"2026-05-15 09:00:02 | ERROR | app.services.demo | Failed to load plugin",
"Traceback line 1",
]
)
+ "\n",
encoding="utf-8",
)
service = SystemLogService(log_dir=log_dir)
entries = service.list_entries(entry_limit=10)
assert len(entries) == 3
assert entries[0].level == "ERROR"
assert entries[0].event_type == "系统异常"
assert "Traceback line 1" in entries[0].raw
assert entries[1].outcome == "告警"
assert entries[2].event_type == "HTTP 请求"
assert entries[2].method == "GET"
assert entries[2].status_code == 200
assert entries[2].request_id == "req_1"
assert service.get_entry(entries[2].id).id == entries[2].id