router = APIRouter(prefix="/system-logs")

# Every route below is guarded by the same admin-only dependency.
AdminContext = Annotated[CurrentUserContext, Depends(require_admin_user)]


def _documented_errors(not_found_description: str | None = None) -> dict[int, dict[str, object]]:
    """Build the OpenAPI ``responses`` mapping used by the routes below.

    A 403 entry is always present; a 404 entry is appended when
    ``not_found_description`` is given. A fresh mapping is built per call so
    no route shares a dictionary with another.
    """
    documented: dict[int, dict[str, object]] = {
        status.HTTP_403_FORBIDDEN: {
            "model": ErrorResponse,
            "description": "只有管理员可以查看系统日志。",
        },
    }
    if not_found_description is not None:
        documented[status.HTTP_404_NOT_FOUND] = {
            "model": ErrorResponse,
            "description": not_found_description,
        }
    return documented


@router.get(
    "/entries",
    response_model=list[SystemLogEntryRead],
    summary="查询系统日志记录列表",
    description="解析 server/logs 下最近的日志内容,按单条日志记录返回,仅管理员可用。",
    responses=_documented_errors(),
)
def list_system_log_entries(
    _: AdminContext,
    limit: Annotated[int, Query(ge=20, le=1000, description="返回的日志记录数。")] = 300,
) -> list[SystemLogEntryRead]:
    """Return up to ``limit`` parsed log entries from the log service."""
    service = SystemLogService()
    return service.list_entries(entry_limit=limit)


@router.get(
    "/entries/{entry_id}",
    response_model=SystemLogEntryRead,
    summary="读取单条系统日志记录",
    description="按日志记录 ID 返回结构化解析结果,仅管理员可用。",
    responses=_documented_errors("日志记录不存在。"),
)
def get_system_log_entry(
    entry_id: str,
    _: AdminContext,
) -> SystemLogEntryRead:
    """Return one parsed entry; a service ``FileNotFoundError`` becomes a 404."""
    service = SystemLogService()
    try:
        entry = service.get_entry(entry_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="日志记录不存在。") from exc
    return entry


@router.get(
    "/files",
    response_model=list[SystemLogFileRead],
    summary="查询系统日志文件列表",
    description="返回 server/logs 目录下可查看的日志文件列表,仅管理员可用。",
    responses=_documented_errors(),
)
def list_system_log_files(
    _: AdminContext,
) -> list[SystemLogFileRead]:
    """Return the log files reported by the log service."""
    service = SystemLogService()
    return service.list_files()


@router.get(
    "/files/{file_name}",
    response_model=SystemLogTailRead,
    summary="读取系统日志尾部内容",
    description="按文件名返回 server/logs 指定日志文件的最近若干行,仅管理员可用。",
    responses=_documented_errors("日志文件不存在。"),
)
def get_system_log_tail(
    file_name: str,
    _: AdminContext,
    lines: Annotated[int, Query(ge=20, le=1000, description="返回的日志行数。")] = 300,
) -> SystemLogTailRead:
    """Return the last ``lines`` lines of one log file; a missing file becomes a 404."""
    service = SystemLogService()
    try:
        tail = service.read_tail(file_name, line_limit=lines)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="日志文件不存在。") from exc
    return tail
None = None + duration_ms: float | None = None + event_type: str = "系统日志" + outcome: str = "未知" + summary: str = "" + parse_status: str = "parsed" + raw: str = "" diff --git a/server/src/app/services/llm_wiki_tasks.py b/server/src/app/services/llm_wiki_tasks.py new file mode 100644 index 0000000..dae5b41 --- /dev/null +++ b/server/src/app/services/llm_wiki_tasks.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import threading +from datetime import UTC, datetime +from typing import Any + +from app.api.deps import CurrentUserContext +from app.core.agent_enums import AgentRunStatus +from app.core.logging import get_logger +from app.db.session import get_session_factory +from app.services.agent_runs import AgentRunService +from app.services.knowledge import KNOWLEDGE_INGEST_STATUS_FAILED, KnowledgeService +from app.services.llm_wiki import LlmWikiService + +logger = get_logger("app.services.llm_wiki_tasks") + + +class LlmWikiTaskManager: + def __init__(self) -> None: + self._lock = threading.RLock() + self._threads: dict[str, threading.Thread] = {} + + def submit_sync( + self, + *, + agent_run_id: str, + folder: str, + current_user: CurrentUserContext, + document_ids: list[str] | None = None, + force: bool = False, + ) -> None: + worker = threading.Thread( + target=self._run_sync, + kwargs={ + "agent_run_id": agent_run_id, + "folder": folder, + "current_user": current_user, + "document_ids": list(document_ids or []), + "force": force, + }, + daemon=True, + name=f"llm-wiki-sync-{agent_run_id}", + ) + with self._lock: + self._threads[agent_run_id] = worker + worker.start() + + def shutdown(self, *, timeout_seconds: float = 1.0) -> None: + with self._lock: + threads = list(self._threads.items()) + self._threads.clear() + + for _, worker in threads: + if worker.is_alive(): + worker.join(timeout=timeout_seconds) + + def _run_sync( + self, + *, + agent_run_id: str, + folder: str, + current_user: CurrentUserContext, + document_ids: list[str], + force: bool, + ) -> 
None: + session_factory = get_session_factory() + db = session_factory() + run_service = AgentRunService(db) + knowledge_service = KnowledgeService() + request_payload = { + "folder": folder, + "document_ids": list(document_ids), + "force": force, + } + + try: + run_service.merge_route_json( + agent_run_id, + { + "phase": "running", + "heartbeat_at": datetime.now(UTC).isoformat(), + "job_type": "llm_wiki_sync", + "folder": folder, + "force": force, + "requested_document_ids": list(document_ids), + "progress": { + "total_documents": len(document_ids), + "completed_documents": 0, + "failed_documents": 0, + "skipped_documents": 0, + "percent": 0, + }, + }, + status=AgentRunStatus.RUNNING.value, + result_summary="Hermes 后台归纳任务已启动。", + ) + + result = LlmWikiService(db).sync_folder( + folder=folder, + current_user=current_user, + document_ids=document_ids, + force=force, + agent_run_id=agent_run_id, + progress_callback=lambda payload, summary: self._write_progress( + run_service=run_service, + agent_run_id=agent_run_id, + payload=payload, + summary=summary, + ), + ) + run_service.record_tool_call( + run_id=agent_run_id, + tool_type="llm", + tool_name="system_hermes_llm_wiki_sync", + request_json=request_payload, + response_json=result.model_dump(mode="json"), + status="succeeded", + duration_ms=0, + ) + run_service.merge_route_json( + agent_run_id, + { + "phase": "succeeded", + "heartbeat_at": datetime.now(UTC).isoformat(), + "sync_run_id": result.run_id, + "sync_result": result.model_dump(mode="json"), + "progress": { + "total_documents": max(len(document_ids), result.document_count), + "completed_documents": result.document_count, + "failed_documents": 0, + "skipped_documents": max(0, len(document_ids) - result.document_count), + "percent": 100, + }, + }, + status=AgentRunStatus.SUCCEEDED.value, + result_summary=result.summary, + finished_at=datetime.now(UTC), + ) + except Exception as exc: + logger.exception("Background LLM Wiki sync failed run_id=%s", agent_run_id) + 
if document_ids: + knowledge_service.set_document_ingest_statuses( + document_ids, + status_code=KNOWLEDGE_INGEST_STATUS_FAILED, + agent_run_id=agent_run_id, + ) + run_service.record_tool_call( + run_id=agent_run_id, + tool_type="llm", + tool_name="system_hermes_llm_wiki_sync", + request_json=request_payload, + response_json={"error": str(exc)}, + status="failed", + duration_ms=0, + error_message=str(exc), + ) + run_service.merge_route_json( + agent_run_id, + { + "phase": "failed", + "heartbeat_at": datetime.now(UTC).isoformat(), + "progress": { + "total_documents": len(document_ids), + "completed_documents": 0, + "failed_documents": len(document_ids), + "skipped_documents": 0, + "percent": 100, + }, + }, + status=AgentRunStatus.FAILED.value, + result_summary=str(exc), + error_message=str(exc), + finished_at=datetime.now(UTC), + ) + finally: + db.close() + with self._lock: + self._threads.pop(agent_run_id, None) + + @staticmethod + def _write_progress( + *, + run_service: AgentRunService, + agent_run_id: str, + payload: dict[str, Any], + summary: str, + ) -> None: + patched_payload = dict(payload) + patched_payload["heartbeat_at"] = datetime.now(UTC).isoformat() + run_service.merge_route_json( + agent_run_id, + patched_payload, + status=AgentRunStatus.RUNNING.value, + result_summary=summary, + ) + + +llm_wiki_task_manager = LlmWikiTaskManager() diff --git a/server/src/app/services/system_logs.py b/server/src/app/services/system_logs.py new file mode 100644 index 0000000..526fee8 --- /dev/null +++ b/server/src/app/services/system_logs.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +from collections import deque +from datetime import UTC, datetime +import hashlib +from pathlib import Path +import re + +from app.core.config import SERVER_DIR +from app.schemas.system_log import SystemLogEntryRead, SystemLogFileRead, SystemLogTailRead + + +ANSI_PATTERN = re.compile(r"\x1b\[[0-9;]*m") +STRUCTURED_LINE_PATTERN = re.compile( + r"^(?P\d{4}-\d{2}-\d{2} 
\d{2}:\d{2}:\d{2})\s+\|\s+" + r"(?P[A-Z]+)\s+\|\s+(?P[^|]+?)\s+\|\s+(?P.*)$" +) +HTTP_ACCESS_PATTERN = re.compile( + r"^(?P[A-Z]+)\s+(?P\S+)\s+(?P\d{3})\s+" + r"(?P\d+(?:\.\d+)?)ms(?:\s+request_id=(?P\S+))?$" +) + + +class SystemLogService: + def __init__(self, *, log_dir: Path | None = None) -> None: + self.log_dir = Path(log_dir or (SERVER_DIR / "logs")).resolve() + + def list_files(self) -> list[SystemLogFileRead]: + self.log_dir.mkdir(parents=True, exist_ok=True) + files = [ + self._serialize_file(path) + for path in self.log_dir.iterdir() + if path.is_file() and not path.name.startswith(".") + ] + return sorted( + files, + key=lambda item: (item.updated_at or datetime.fromtimestamp(0, tz=UTC)), + reverse=True, + ) + + def read_tail(self, name: str, *, line_limit: int = 300) -> SystemLogTailRead: + path = self._resolve_file(name) + line_buffer: deque[str] = deque(maxlen=max(1, min(line_limit, 1000))) + + with path.open("r", encoding="utf-8", errors="replace") as stream: + for line in stream: + line_buffer.append(line.rstrip("\n")) + + file_meta = self._serialize_file(path) + return SystemLogTailRead( + name=file_meta.name, + size_bytes=file_meta.size_bytes, + updated_at=file_meta.updated_at, + line_count=len(line_buffer), + lines=list(line_buffer), + ) + + def list_entries(self, *, entry_limit: int = 300, line_limit_per_file: int = 1200) -> list[SystemLogEntryRead]: + self.log_dir.mkdir(parents=True, exist_ok=True) + entries: list[SystemLogEntryRead] = [] + + for file_meta in self.list_files(): + path = self._resolve_file(file_meta.name) + lines = self._read_tail_lines(path, line_limit=max(1, min(line_limit_per_file, 5000))) + entries.extend(self._parse_entries(path.name, lines)) + + entries.sort( + key=lambda item: ( + item.timestamp.timestamp() if item.timestamp else 0, + item.source_file, + item.line_number, + ), + reverse=True, + ) + return entries[: max(1, min(entry_limit, 1000))] + + def get_entry(self, entry_id: str) -> SystemLogEntryRead: + 
normalized_id = str(entry_id or "").strip() + if not normalized_id: + raise FileNotFoundError("日志记录 ID 不能为空。") + + for entry in self.list_entries(entry_limit=1000, line_limit_per_file=5000): + if entry.id == normalized_id: + return entry + + raise FileNotFoundError(normalized_id) + + def _resolve_file(self, name: str) -> Path: + normalized_name = Path(str(name or "").strip()).name + if not normalized_name: + raise FileNotFoundError("日志文件名不能为空。") + + path = (self.log_dir / normalized_name).resolve() + if path.parent != self.log_dir or not path.exists() or not path.is_file(): + raise FileNotFoundError(normalized_name) + return path + + @staticmethod + def _read_tail_lines(path: Path, *, line_limit: int) -> list[tuple[int, str]]: + line_buffer: deque[tuple[int, str]] = deque(maxlen=line_limit) + with path.open("r", encoding="utf-8", errors="replace") as stream: + for index, line in enumerate(stream, start=1): + line_buffer.append((index, line.rstrip("\n"))) + return list(line_buffer) + + def _parse_entries(self, source_file: str, lines: list[tuple[int, str]]) -> list[SystemLogEntryRead]: + entries: list[SystemLogEntryRead] = [] + current: dict[str, object] | None = None + + for line_number, raw_line in lines: + clean_line = ANSI_PATTERN.sub("", raw_line) + match = STRUCTURED_LINE_PATTERN.match(clean_line) + + if match: + if current is not None: + entries.append(self._build_entry(source_file, current)) + current = { + "line_number": line_number, + "timestamp": self._parse_timestamp(match.group("timestamp")), + "level": match.group("level").strip(), + "logger": match.group("logger").strip(), + "message": match.group("message").strip(), + "raw_lines": [clean_line], + "parse_status": "parsed", + } + continue + + if current is not None: + current["raw_lines"].append(clean_line) + continue + + if clean_line.strip(): + current = { + "line_number": line_number, + "timestamp": None, + "level": "UNKNOWN", + "logger": "", + "message": clean_line.strip(), + "raw_lines": 
[clean_line], + "parse_status": "unparsed", + } + entries.append(self._build_entry(source_file, current)) + current = None + + if current is not None: + entries.append(self._build_entry(source_file, current)) + + return entries + + def _build_entry(self, source_file: str, payload: dict[str, object]) -> SystemLogEntryRead: + message = str(payload["message"]) + logger = str(payload["logger"]) + level = str(payload["level"]) + http_match = HTTP_ACCESS_PATTERN.match(message) + method = http_match.group("method") if http_match else "" + path = http_match.group("path") if http_match else "" + status_code = int(http_match.group("status_code")) if http_match else None + duration_ms = float(http_match.group("duration_ms")) if http_match else None + request_id = http_match.group("request_id") if http_match and http_match.group("request_id") else "" + event_type = self._resolve_event_type(logger, level, http_match is not None) + outcome = self._resolve_outcome(level, status_code) + summary = self._build_summary( + event_type=event_type, + message=message, + method=method, + path=path, + status_code=status_code, + duration_ms=duration_ms, + ) + raw = "\n".join(str(line) for line in payload["raw_lines"]) + fingerprint = f"{source_file}:{payload['line_number']}:{payload['timestamp']}:{raw}" + entry_id = hashlib.sha1(fingerprint.encode("utf-8")).hexdigest()[:16] + + return SystemLogEntryRead( + id=entry_id, + source_file=source_file, + line_number=int(payload["line_number"]), + timestamp=payload["timestamp"], + level=level, + logger=logger, + message=message, + request_id=request_id, + method=method, + path=path, + status_code=status_code, + duration_ms=duration_ms, + event_type=event_type, + outcome=outcome, + summary=summary, + parse_status=str(payload["parse_status"]), + raw=raw, + ) + + @staticmethod + def _parse_timestamp(value: str) -> datetime | None: + try: + return datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + except ValueError: + return None + + @staticmethod + def 
_resolve_event_type(logger: str, level: str, is_http_access: bool) -> str: + if is_http_access or logger == "app.middleware.access": + return "HTTP 请求" + if level in {"ERROR", "CRITICAL"}: + return "系统异常" + if level == "WARNING": + return "系统告警" + return "运行日志" + + @staticmethod + def _resolve_outcome(level: str, status_code: int | None) -> str: + if status_code is not None: + if status_code >= 500: + return "失败" + if status_code >= 400: + return "异常" + return "成功" + if level in {"ERROR", "CRITICAL"}: + return "失败" + if level == "WARNING": + return "告警" + if level in {"INFO", "DEBUG"}: + return "成功" + return "未知" + + @staticmethod + def _build_summary( + *, + event_type: str, + message: str, + method: str, + path: str, + status_code: int | None, + duration_ms: float | None, + ) -> str: + if method and path and status_code is not None: + return f"{method} {path} 返回 {status_code},耗时 {duration_ms or 0:.1f}ms" + if len(message) <= 96: + return message + return f"{message[:96]}..." + + @staticmethod + def _serialize_file(path: Path) -> SystemLogFileRead: + stat = path.stat() + return SystemLogFileRead( + name=path.name, + size_bytes=stat.st_size, + updated_at=datetime.fromtimestamp(stat.st_mtime, tz=UTC), + ) diff --git a/server/tests/test_system_logs_service.py b/server/tests/test_system_logs_service.py new file mode 100644 index 0000000..7d741a5 --- /dev/null +++ b/server/tests/test_system_logs_service.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from app.services.system_logs import SystemLogService + + +def test_system_log_service_reads_tail(tmp_path) -> None: + log_dir = tmp_path / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + log_file = log_dir / "app.log" + log_file.write_text( + "\n".join(f"line-{index}" for index in range(1, 21)) + "\n", + encoding="utf-8", + ) + + service = SystemLogService(log_dir=log_dir) + files = service.list_files() + tail = service.read_tail("app.log", line_limit=5) + + assert [item.name for item in files] == 
def test_system_log_service_parses_entries(tmp_path) -> None:
    """Three structured lines plus one continuation line yield three entries, newest first."""
    log_dir = tmp_path / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    access_line = "2026-05-15 09:00:00 | INFO | app.middleware.access | GET /api/v1/health 200 8.2ms request_id=req_1"
    warning_line = "2026-05-15 09:00:01 | WARNING | app.services.settings | Skipping undecryptable model API key"
    error_line = "2026-05-15 09:00:02 | ERROR | app.services.demo | Failed to load plugin"
    continuation_line = "Traceback line 1"
    content = "".join(
        f"{line}\n" for line in (access_line, warning_line, error_line, continuation_line)
    )
    (log_dir / "app.log").write_text(content, encoding="utf-8")

    service = SystemLogService(log_dir=log_dir)
    entries = service.list_entries(entry_limit=10)

    assert len(entries) == 3
    error_entry, warning_entry, access_entry = entries

    # The continuation line is folded into the preceding ERROR entry.
    assert error_entry.level == "ERROR"
    assert error_entry.event_type == "系统异常"
    assert "Traceback line 1" in error_entry.raw

    assert warning_entry.outcome == "告警"

    assert access_entry.event_type == "HTTP 请求"
    assert access_entry.method == "GET"
    assert access_entry.status_code == 200
    assert access_entry.request_id == "req_1"

    # Looking an entry up by id returns that same entry.
    assert service.get_entry(access_entry.id).id == access_entry.id