from __future__ import annotations

from fastapi import APIRouter
from sqlalchemy import text

from app.core.config import get_settings
from app.db.session import get_engine
from app.schemas.common import HealthCheckRead

router = APIRouter(prefix="/health")


def _probe_database() -> tuple[bool, str | None]:
    """Run ``SELECT 1`` on a fresh connection and report the outcome.

    Returns:
        A ``(reachable, failure)`` pair: ``reachable`` is True once the query
        has executed, and ``failure`` holds ``str(exc)`` for any exception
        raised while connecting, querying, or leaving the connection context.
    """
    reachable = False
    failure = None
    try:
        with get_engine().connect() as conn:
            conn.execute(text("SELECT 1"))
            # Set inside the ``with`` body so the flag reflects the query
            # itself, independent of what happens when the context exits.
            reachable = True
    except Exception as exc:  # pragma: no cover - runtime connectivity branch
        failure = str(exc)
    return reachable, failure


@router.get(
    "",
    response_model=HealthCheckRead,
    summary="服务健康检查",
    description="检查服务基础状态,并在系统初始化完成后验证数据库连通性。",
)
def health_check() -> HealthCheckRead:
    """Report service health, probing the database only after setup is done.

    The database is queried only when ``settings.setup_completed`` is truthy;
    otherwise it is reported as not ok with no error. The overall status is
    ``"ok"`` when the probe succeeded and ``"degraded"`` in every other case.
    The Redis section only reflects whether ``settings.redis_url`` is set; no
    Redis connection is attempted here.

    Returns:
        A ``HealthCheckRead`` carrying the overall status plus the database
        and Redis sections.
    """
    settings = get_settings()

    if settings.setup_completed:
        db_reachable, db_failure = _probe_database()
    else:
        db_reachable, db_failure = False, None

    if db_reachable:
        overall_status = "ok"
    else:
        overall_status = "degraded"

    # NOTE(review): the raw exception text is returned to the caller —
    # confirm that is acceptable for whoever can reach this endpoint.
    database_section = {
        "configured": settings.setup_completed,
        "ok": db_reachable,
        "error": db_failure,
    }

    # Both Redis flags mirror the presence of a configured URL.
    redis_present = bool(settings.redis_url)
    redis_section = {"configured": redis_present, "enabled": redis_present}

    return HealthCheckRead(
        status=overall_status,
        database=database_section,
        redis=redis_section,
    )