feat: 完善后端 API OpenAPI 文档与统一错误响应 schema

This commit is contained in:
caoxiaozhu
2026-05-11 05:18:16 +00:00
parent b2beeaa136
commit 321dd6fdaf
20 changed files with 7359 additions and 225 deletions

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
import json
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from app.main import create_app
SERVER_DIR = Path(__file__).resolve().parents[1]
ROOT_DIR = SERVER_DIR.parent
OUTPUT_DIR = ROOT_DIR / "document" / "development" / "backend_api"
OPENAPI_PATH = OUTPUT_DIR / "openapi.json"
INVENTORY_PATH = OUTPUT_DIR / "interface_inventory.md"
def build_inventory(schema: dict[str, object]) -> str:
paths = schema.get("paths", {})
tag_groups: dict[str, list[tuple[str, str, str]]] = defaultdict(list)
for path, operations in sorted(paths.items()):
if not isinstance(operations, dict):
continue
for method, operation in sorted(operations.items()):
if not isinstance(operation, dict):
continue
summary = str(operation.get("summary") or operation.get("operationId") or "-")
tags = operation.get("tags") or ["untagged"]
primary_tag = str(tags[0])
tag_groups[primary_tag].append((method.upper(), str(path), summary))
generated_at = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
lines = [
"# Backend API Interface Inventory",
"",
f"- Generated at: `{generated_at}`",
f"- API title: `{schema['info']['title']}`",
f"- API version: `{schema['info']['version']}`",
f"- Total paths: `{len(paths)}`",
"",
"## Tag Overview",
"",
]
for tag_name in sorted(tag_groups):
lines.append(f"### {tag_name}")
lines.append("")
lines.append("| Method | Path | Summary |")
lines.append("| --- | --- | --- |")
for method, path, summary in tag_groups[tag_name]:
lines.append(f"| `{method}` | `{path}` | {summary} |")
lines.append("")
return "\n".join(lines).strip() + "\n"
def main() -> None:
app = create_app()
schema = app.openapi()
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
OPENAPI_PATH.write_text(
json.dumps(schema, ensure_ascii=False, indent=2),
encoding="utf-8",
)
INVENTORY_PATH.write_text(build_inventory(schema), encoding="utf-8")
print(f"OpenAPI exported to: {OPENAPI_PATH}")
print(f"Interface inventory exported to: {INVENTORY_PATH}")
if __name__ == "__main__":
main()

View File

@@ -24,12 +24,24 @@ class CurrentUserContext:
is_admin: bool
def get_current_user(
x_auth_username: Annotated[str | None, Header()] = None,
x_auth_name: Annotated[str | None, Header()] = None,
x_auth_role_codes: Annotated[str | None, Header()] = None,
x_auth_is_admin: Annotated[str | None, Header()] = None,
) -> CurrentUserContext:
def get_current_user(
x_auth_username: Annotated[
str | None,
Header(description="当前登录用户名。知识库接口至少需要提供用户名或姓名。"),
] = None,
x_auth_name: Annotated[
str | None,
Header(description="当前登录人展示姓名。未传时默认回退到用户名。"),
] = None,
x_auth_role_codes: Annotated[
str | None,
Header(description="角色编码列表,多个角色使用英文逗号分隔,例如 `manager,finance`。"),
] = None,
x_auth_is_admin: Annotated[
str | None,
Header(description="是否管理员,支持 `true/false/1/0`。"),
] = None,
) -> CurrentUserContext:
role_codes = [item.strip() for item in (x_auth_role_codes or "").split(",") if item.strip()]
is_admin = str(x_auth_is_admin or "").strip().lower() in {"1", "true", "yes", "on"}

View File

@@ -16,10 +16,19 @@ from app.schemas.agent_asset import (
AgentAssetVersionCreate,
AgentAssetVersionRead,
)
from app.schemas.common import ErrorResponse
from app.services.agent_assets import AgentAssetService
router = APIRouter(prefix="/agent-assets")
DbSession = Annotated[Session, Depends(get_db)]
ActorHeader = Annotated[
str | None,
Header(description="审计操作者。未传时回退到请求体中的 owner / reviewer 或 `system`。"),
]
RequestIdHeader = Annotated[
str | None,
Header(description="外部请求 ID用于串联审计日志和上游调用链。"),
]
def _handle_asset_error(exc: Exception) -> None:
@@ -32,13 +41,30 @@ def _handle_asset_error(exc: Exception) -> None:
raise exc
@router.get("", response_model=list[AgentAssetListItem])
@router.get(
"",
response_model=list[AgentAssetListItem],
summary="查询 Agent 资产列表",
description="按资产类型、状态、领域和关键字筛选规则、技能、MCP 与任务资产。",
)
def list_agent_assets(
db: DbSession,
asset_type: str | None = Query(default=None),
status_value: str | None = Query(default=None, alias="status"),
domain: str | None = Query(default=None),
keyword: str | None = Query(default=None),
asset_type: Annotated[
str | None,
Query(description="资产类型:`rule`、`skill`、`mcp`、`task`。"),
] = None,
status_value: Annotated[
str | None,
Query(alias="status", description="资产状态筛选。"),
] = None,
domain: Annotated[
str | None,
Query(description="业务领域筛选,例如 `expense`、`ar`、`ap`。"),
] = None,
keyword: Annotated[
str | None,
Query(description="资产编码、名称关键字模糊查询。"),
] = None,
) -> list[AgentAssetListItem]:
return AgentAssetService(db).list_assets(
asset_type=asset_type,
@@ -48,7 +74,18 @@ def list_agent_assets(
)
@router.get("/{asset_id}", response_model=AgentAssetRead)
@router.get(
"/{asset_id}",
response_model=AgentAssetRead,
summary="读取 Agent 资产详情",
description="返回资产当前版本正文、最近版本列表和最近一次审核信息。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产不存在。",
}
},
)
def get_agent_asset(asset_id: str, db: DbSession) -> AgentAssetRead:
asset = AgentAssetService(db).get_asset(asset_id)
if asset is None:
@@ -56,12 +93,24 @@ def get_agent_asset(asset_id: str, db: DbSession) -> AgentAssetRead:
return asset
@router.post("", response_model=AgentAssetRead, status_code=status.HTTP_201_CREATED)
@router.post(
"",
response_model=AgentAssetRead,
status_code=status.HTTP_201_CREATED,
summary="创建 Agent 资产",
description="创建新的规则、技能、MCP 或任务资产,并自动记录审计日志。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "资产编码冲突或请求字段不合法。",
}
},
)
def create_agent_asset(
payload: AgentAssetCreate,
db: DbSession,
x_actor: Annotated[str | None, Header()] = None,
x_request_id: Annotated[str | None, Header()] = None,
x_actor: ActorHeader = None,
x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
try:
return AgentAssetService(db).create_asset(
@@ -73,13 +122,28 @@ def create_agent_asset(
_handle_asset_error(exc)
@router.patch("/{asset_id}", response_model=AgentAssetRead)
@router.patch(
"/{asset_id}",
response_model=AgentAssetRead,
summary="更新 Agent 资产",
description="更新资产基础信息、当前版本、状态和配置,并写入审计日志。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "状态更新非法或请求字段不合法。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产或指定版本不存在。",
},
},
)
def update_agent_asset(
asset_id: str,
payload: AgentAssetUpdate,
db: DbSession,
x_actor: Annotated[str | None, Header()] = None,
x_request_id: Annotated[str | None, Header()] = None,
x_actor: ActorHeader = None,
x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
try:
return AgentAssetService(db).update_asset(
@@ -92,9 +156,25 @@ def update_agent_asset(
_handle_asset_error(exc)
@router.get("/{asset_id}/versions", response_model=list[AgentAssetVersionRead])
@router.get(
"/{asset_id}/versions",
response_model=list[AgentAssetVersionRead],
summary="查询资产版本列表",
description="返回指定资产的版本历史,默认按最近版本优先排序。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产不存在。",
}
},
)
def list_agent_asset_versions(
asset_id: str, db: DbSession, limit: int = Query(default=20, ge=1, le=100)
asset_id: str,
db: DbSession,
limit: Annotated[
int,
Query(ge=1, le=100, description="返回版本数量上限。"),
] = 20,
) -> list[AgentAssetVersionRead]:
try:
return AgentAssetService(db).list_versions(asset_id, limit=limit)
@@ -106,13 +186,25 @@ def list_agent_asset_versions(
"/{asset_id}/versions",
response_model=AgentAssetVersionRead,
status_code=status.HTTP_201_CREATED,
summary="创建资产版本",
description="为指定资产创建新版本;规则使用 Markdown其他资产使用 JSON 快照。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "版本号重复或内容类型不匹配。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产不存在。",
},
},
)
def create_agent_asset_version(
asset_id: str,
payload: AgentAssetVersionCreate,
db: DbSession,
x_actor: Annotated[str | None, Header()] = None,
x_request_id: Annotated[str | None, Header()] = None,
x_actor: ActorHeader = None,
x_request_id: RequestIdHeader = None,
) -> AgentAssetVersionRead:
try:
return AgentAssetService(db).create_version(
@@ -126,14 +218,28 @@ def create_agent_asset_version(
@router.post(
"/{asset_id}/reviews", response_model=AgentAssetReviewRead, status_code=status.HTTP_201_CREATED
"/{asset_id}/reviews",
response_model=AgentAssetReviewRead,
status_code=status.HTTP_201_CREATED,
summary="创建资产审核记录",
description="为指定资产版本写入审核结果,并联动更新资产状态。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "审核参数不合法。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产或版本不存在。",
},
},
)
def create_agent_asset_review(
asset_id: str,
payload: AgentAssetReviewCreate,
db: DbSession,
x_actor: Annotated[str | None, Header()] = None,
x_request_id: Annotated[str | None, Header()] = None,
x_actor: ActorHeader = None,
x_request_id: RequestIdHeader = None,
) -> AgentAssetReviewRead:
try:
return AgentAssetService(db).create_review(
@@ -146,12 +252,27 @@ def create_agent_asset_review(
_handle_asset_error(exc)
@router.post("/{asset_id}/activate", response_model=AgentAssetRead)
@router.post(
"/{asset_id}/activate",
response_model=AgentAssetRead,
summary="激活资产当前版本",
description="将资产当前版本切换为上线状态;规则资产必须已有 `approved` 审核记录。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "审核未通过或当前版本未设置。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "资产不存在。",
},
},
)
def activate_agent_asset(
asset_id: str,
db: DbSession,
x_actor: Annotated[str | None, Header()] = None,
x_request_id: Annotated[str | None, Header()] = None,
x_actor: ActorHeader = None,
x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
try:
return AgentAssetService(db).activate_asset(

View File

@@ -7,26 +7,55 @@ from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.schemas.agent_run import AgentRunRead
from app.schemas.common import ErrorResponse
from app.services.agent_runs import AgentRunService
router = APIRouter(prefix="/agent-runs")
DbSession = Annotated[Session, Depends(get_db)]
@router.get("", response_model=list[AgentRunRead])
@router.get(
"",
response_model=list[AgentRunRead],
summary="查询 Agent 运行日志",
description="按 Agent、运行状态、来源和数量限制筛选运行日志。",
)
def list_agent_runs(
db: DbSession,
agent: str | None = Query(default=None),
status_value: str | None = Query(default=None, alias="status"),
source: str | None = Query(default=None),
limit: int = Query(default=20, ge=1, le=100),
agent: Annotated[
str | None,
Query(description="Agent 名称筛选。"),
] = None,
status_value: Annotated[
str | None,
Query(alias="status", description="运行状态筛选。"),
] = None,
source: Annotated[
str | None,
Query(description="运行来源筛选。"),
] = None,
limit: Annotated[
int,
Query(ge=1, le=100, description="返回记录上限。"),
] = 20,
) -> list[AgentRunRead]:
return AgentRunService(db).list_runs(
agent=agent, status=status_value, source=source, limit=limit
)
@router.get("/{run_id}", response_model=AgentRunRead)
@router.get(
"/{run_id}",
response_model=AgentRunRead,
summary="读取单次 Agent 运行详情",
description="按 `run_id` 返回单次执行的路由结果、工具调用和语义解析信息。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "运行记录不存在。",
}
},
)
def get_agent_run(run_id: str, db: DbSession) -> AgentRunRead:
run = AgentRunService(db).get_run(run_id)
if run is None:

View File

@@ -13,13 +13,30 @@ router = APIRouter(prefix="/audit-logs")
DbSession = Annotated[Session, Depends(get_db)]
@router.get("", response_model=list[AuditLogRead])
@router.get(
"",
response_model=list[AuditLogRead],
summary="查询审计日志",
description="按资源类型、资源 ID、动作类型和数量限制筛选审计日志。",
)
def list_audit_logs(
db: DbSession,
resource_type: str | None = Query(default=None),
resource_id: str | None = Query(default=None),
action: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=200),
resource_type: Annotated[
str | None,
Query(description="资源类型筛选,例如 `rule`、`task`。"),
] = None,
resource_id: Annotated[
str | None,
Query(description="资源主键或业务编码筛选。"),
] = None,
action: Annotated[
str | None,
Query(description="动作名称筛选。"),
] = None,
limit: Annotated[
int,
Query(ge=1, le=200, description="返回日志上限。"),
] = 50,
) -> list[AuditLogRead]:
return AuditLogService(db).list_logs(
resource_type=resource_type,

View File

@@ -7,13 +7,25 @@ from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.schemas.auth import LoginRequest, LoginResponse
from app.schemas.common import ErrorResponse
from app.services.auth import AuthService
router = APIRouter(prefix="/auth")
DbSession = Annotated[Session, Depends(get_db)]
@router.post("/login", response_model=LoginResponse)
@router.post(
"/login",
response_model=LoginResponse,
summary="用户登录",
description="支持管理员账号和员工账号登录,成功后返回前端会话所需的用户信息。",
responses={
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "账号或密码错误。",
}
},
)
def login(payload: LoginRequest, db: DbSession) -> LoginResponse:
try:
return AuthService(db).login(payload)

View File

@@ -9,11 +9,22 @@ from app.schemas.bootstrap import BootstrapSetupPayload, BootstrapStateRead
router = APIRouter(prefix="/bootstrap")
@router.get("", response_model=BootstrapStateRead)
@router.get(
"",
response_model=BootstrapStateRead,
summary="读取初始化状态",
description="返回当前系统是否已完成初始化,以及公司、数据库和缓存配置快照。",
)
def get_bootstrap_state() -> BootstrapStateRead:
return build_bootstrap_state(get_settings())
@router.post("", response_model=BootstrapStateRead, status_code=status.HTTP_201_CREATED)
@router.post(
"",
response_model=BootstrapStateRead,
status_code=status.HTTP_201_CREATED,
summary="写入初始化配置",
description="保存系统初始化配置,并刷新运行时数据库连接。",
)
def initialize_bootstrap(payload: BootstrapSetupPayload) -> BootstrapStateRead:
return persist_bootstrap_config(payload, get_settings())

View File

@@ -6,6 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.schemas.common import ErrorResponse
from app.schemas.employee import EmployeeCreate, EmployeeMetaRead, EmployeeRead, EmployeeUpdate
from app.services.employee import EmployeeService
@@ -13,21 +14,49 @@ router = APIRouter()
DbSession = Annotated[Session, Depends(get_db)]
@router.get("/meta", response_model=EmployeeMetaRead)
@router.get(
"/meta",
response_model=EmployeeMetaRead,
summary="读取员工目录元数据",
description="返回员工总数、状态汇总和可选角色列表,供员工管理页面初始化使用。",
)
def get_employee_meta(db: DbSession) -> EmployeeMetaRead:
return EmployeeService(db).get_employee_meta()
@router.get("", response_model=list[EmployeeRead])
@router.get(
"",
response_model=list[EmployeeRead],
summary="查询员工列表",
description="按状态和关键字筛选员工目录。",
)
def list_employees(
db: DbSession,
status_filter: Annotated[str | None, Query(alias="status")] = None,
keyword: str | None = None,
status_filter: Annotated[
str | None,
Query(alias="status", description="员工状态筛选值。"),
] = None,
keyword: Annotated[
str | None,
Query(description="姓名、工号、邮箱等关键字模糊查询。"),
] = None,
) -> list[EmployeeRead]:
return EmployeeService(db).list_employees(status=status_filter, keyword=keyword)
@router.post("", response_model=EmployeeRead, status_code=status.HTTP_201_CREATED)
@router.post(
"",
response_model=EmployeeRead,
status_code=status.HTTP_201_CREATED,
summary="创建员工",
description="创建新的员工目录记录,并初始化基础角色与组织归属。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "员工数据校验失败。",
}
},
)
def create_employee(payload: EmployeeCreate, db: DbSession) -> EmployeeRead:
try:
return EmployeeService(db).create_employee(payload)
@@ -35,7 +64,18 @@ def create_employee(payload: EmployeeCreate, db: DbSession) -> EmployeeRead:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.get("/{employee_id}", response_model=EmployeeRead)
@router.get(
"/{employee_id}",
response_model=EmployeeRead,
summary="读取员工详情",
description="根据员工主键读取员工完整档案信息。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "员工不存在。",
}
},
)
def get_employee(employee_id: str, db: DbSession) -> EmployeeRead:
employee = EmployeeService(db).get_employee(employee_id)
if employee is None:
@@ -43,7 +83,22 @@ def get_employee(employee_id: str, db: DbSession) -> EmployeeRead:
return employee
@router.patch("/{employee_id}", response_model=EmployeeRead)
@router.patch(
"/{employee_id}",
response_model=EmployeeRead,
summary="更新员工",
description="更新员工基础信息、角色、密码等可维护字段。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "请求字段不合法。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "员工不存在。",
},
},
)
def update_employee(employee_id: str, payload: EmployeeUpdate, db: DbSession) -> EmployeeRead:
try:
return EmployeeService(db).update_employee(employee_id, payload)
@@ -53,7 +108,18 @@ def update_employee(employee_id: str, payload: EmployeeUpdate, db: DbSession) ->
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.post("/{employee_id}/disable", response_model=EmployeeRead)
@router.post(
"/{employee_id}/disable",
response_model=EmployeeRead,
summary="停用员工",
description="将员工状态切换为停用,阻止其继续登录系统。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "员工不存在。",
}
},
)
def disable_employee(employee_id: str, db: DbSession) -> EmployeeRead:
try:
return EmployeeService(db).disable_employee(employee_id)

View File

@@ -5,12 +5,18 @@ from sqlalchemy import text
from app.core.config import get_settings
from app.db.session import get_engine
from app.schemas.common import HealthCheckRead
router = APIRouter(prefix="/health")
@router.get("")
def health_check() -> dict[str, object]:
@router.get(
"",
response_model=HealthCheckRead,
summary="服务健康检查",
description="检查服务基础状态,并在系统初始化完成后验证数据库连通性。",
)
def health_check() -> HealthCheckRead:
settings = get_settings()
database_ok = False
database_error = None
@@ -23,12 +29,12 @@ def health_check() -> dict[str, object]:
except Exception as exc: # pragma: no cover - runtime connectivity branch
database_error = str(exc)
return {
"status": "ok" if database_ok else "degraded",
"database": {
return HealthCheckRead(
status="ok" if database_ok else "degraded",
database={
"configured": settings.setup_completed,
"ok": database_ok,
"error": database_error,
},
"redis": {"configured": bool(settings.redis_url), "enabled": bool(settings.redis_url)},
}
redis={"configured": bool(settings.redis_url), "enabled": bool(settings.redis_url)},
)

View File

@@ -1,124 +1,294 @@
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import FileResponse
from app.api.deps import CurrentUserContext, get_current_user, require_admin_user
from app.schemas.knowledge import (
KnowledgeActionResponse,
KnowledgeDocumentDetailRead,
KnowledgeLibraryRead,
KnowledgeOnlyOfficeCallbackRead,
KnowledgeOnlyOfficeConfigRead,
)
from app.services.knowledge import KnowledgeService
router = APIRouter(prefix="/knowledge")
@router.get("/library", response_model=KnowledgeLibraryRead)
def get_knowledge_library(
_: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeLibraryRead:
return KnowledgeService().list_library()
@router.get("/documents/{document_id}", response_model=KnowledgeDocumentDetailRead)
def get_knowledge_document(
document_id: str,
_: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeDocumentDetailRead:
try:
return KnowledgeService().get_document_detail(document_id)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
@router.get("/documents/{document_id}/onlyoffice-config", response_model=KnowledgeOnlyOfficeConfigRead)
def get_knowledge_document_onlyoffice_config(
document_id: str,
current_user: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeOnlyOfficeConfigRead:
try:
return KnowledgeService().build_onlyoffice_config(document_id, current_user)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Body, Depends, HTTPException, Query, status
from fastapi.responses import FileResponse
from app.api.deps import CurrentUserContext, get_current_user, require_admin_user
from app.schemas.common import ErrorResponse
from app.schemas.knowledge import (
KnowledgeActionResponse,
KnowledgeDocumentDetailRead,
KnowledgeLibraryRead,
KnowledgeOnlyOfficeCallbackRead,
KnowledgeOnlyOfficeCallbackWrite,
KnowledgeOnlyOfficeConfigRead,
)
from app.services.knowledge import KnowledgeService
router = APIRouter(prefix="/knowledge")
@router.get(
"/library",
response_model=KnowledgeLibraryRead,
summary="查询知识库目录",
description="返回固定知识库目录与当前已上传文档列表。",
responses={
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
}
},
)
def get_knowledge_library(
_: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeLibraryRead:
return KnowledgeService().list_library()
@router.get(
"/documents/{document_id}",
response_model=KnowledgeDocumentDetailRead,
summary="读取知识库文档详情",
description="返回单个知识库文档的元信息、预览类型和预览内容。",
responses={
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def get_knowledge_document(
document_id: str,
_: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeDocumentDetailRead:
try:
return KnowledgeService().get_document_detail(document_id)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
@router.get(
"/documents/{document_id}/onlyoffice-config",
response_model=KnowledgeOnlyOfficeConfigRead,
summary="读取 ONLYOFFICE 预览配置",
description="为支持的 Office 文档生成 ONLYOFFICE 前端配置和临时访问令牌。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "ONLYOFFICE 未启用、配置不完整或文件格式不支持。",
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def get_knowledge_document_onlyoffice_config(
document_id: str,
current_user: Annotated[CurrentUserContext, Depends(get_current_user)],
) -> KnowledgeOnlyOfficeConfigRead:
try:
return KnowledgeService().build_onlyoffice_config(document_id, current_user)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.post(
"/documents",
response_model=KnowledgeDocumentDetailRead,
status_code=status.HTTP_201_CREATED,
summary="上传知识库文档",
description="上传原始文件二进制内容到指定知识库目录。已有同名文件会覆盖并提升版本号。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "目录、文件名或文件内容不合法。",
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
},
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以上传知识库文件。",
},
},
)
def upload_knowledge_document(
content: Annotated[
bytes,
Body(
media_type="application/octet-stream",
description="待上传的文件二进制内容。",
),
],
folder: Annotated[str, Query(min_length=1, description="目标知识库目录名称。")],
filename: Annotated[str, Query(min_length=1, description="原始文件名。")],
current_user: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> KnowledgeDocumentDetailRead:
try:
return KnowledgeService().upload_document(folder, filename, content, current_user)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.delete(
"/documents/{document_id}",
response_model=KnowledgeActionResponse,
summary="删除知识库文档",
description="删除知识库文档及其索引记录。",
responses={
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
},
status.HTTP_403_FORBIDDEN: {
"model": ErrorResponse,
"description": "只有管理员可以删除知识库文件。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def delete_knowledge_document(
document_id: str,
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> KnowledgeActionResponse:
try:
KnowledgeService().delete_document(document_id)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
return KnowledgeActionResponse(detail="知识库文件已删除。")
@router.get(
"/documents/{document_id}/content",
response_class=FileResponse,
summary="下载或预览知识库原文",
description="根据文档 ID 返回原始文件内容,可用于浏览器内联预览或下载。",
responses={
status.HTTP_200_OK: {
"description": "文件内容。",
"content": {"application/octet-stream": {}},
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "未提供知识库访问用户头。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def get_knowledge_document_content(
document_id: str,
disposition: Annotated[
str,
Query(
pattern="^(inline|attachment)$",
description="内容展示方式,支持 `inline` 或 `attachment`。",
),
] = "inline",
_: Annotated[CurrentUserContext, Depends(get_current_user)] = None,
) -> FileResponse:
try:
file_path, media_type, filename = KnowledgeService().get_document_content(document_id)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
@router.post("/documents", response_model=KnowledgeDocumentDetailRead, status_code=status.HTTP_201_CREATED)
async def upload_knowledge_document(
request: Request,
folder: Annotated[str, Query(min_length=1)],
filename: Annotated[str, Query(min_length=1)],
current_user: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> KnowledgeDocumentDetailRead:
content = await request.body()
try:
return KnowledgeService().upload_document(folder, filename, content, current_user)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.delete("/documents/{document_id}", response_model=KnowledgeActionResponse)
def delete_knowledge_document(
document_id: str,
_: Annotated[CurrentUserContext, Depends(require_admin_user)],
) -> KnowledgeActionResponse:
try:
KnowledgeService().delete_document(document_id)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
return KnowledgeActionResponse(detail="知识库文件已删除。")
@router.get("/documents/{document_id}/content")
def get_knowledge_document_content(
document_id: str,
disposition: Annotated[str, Query(pattern="^(inline|attachment)$")] = "inline",
_: Annotated[CurrentUserContext, Depends(get_current_user)] = None,
) -> FileResponse:
try:
file_path, media_type, filename = KnowledgeService().get_document_content(document_id)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
_ = disposition
return FileResponse(file_path, media_type=media_type, filename=filename)
@router.get("/documents/{document_id}/onlyoffice/content")
def get_knowledge_document_onlyoffice_content(
document_id: str,
access_token: Annotated[str, Query(min_length=1)],
) -> FileResponse:
try:
service = KnowledgeService()
service.validate_onlyoffice_access_token(document_id, access_token)
file_path, media_type, filename = service.get_document_content(document_id)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
_ = disposition
return FileResponse(file_path, media_type=media_type, filename=filename)
@router.get(
"/documents/{document_id}/onlyoffice/content",
response_class=FileResponse,
summary="读取 ONLYOFFICE 文档源文件",
description="供 ONLYOFFICE 服务通过短时访问令牌拉取原始文件内容。",
responses={
status.HTTP_200_OK: {
"description": "文件内容。",
"content": {"application/octet-stream": {}},
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "ONLYOFFICE 访问令牌无效。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def get_knowledge_document_onlyoffice_content(
document_id: str,
access_token: Annotated[
str,
Query(min_length=1, description="ONLYOFFICE 临时访问令牌。"),
],
) -> FileResponse:
try:
service = KnowledgeService()
service.validate_onlyoffice_access_token(document_id, access_token)
file_path, media_type, filename = service.get_document_content(document_id)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
return FileResponse(file_path, media_type=media_type, filename=filename)
@router.post("/documents/{document_id}/onlyoffice/callback", response_model=KnowledgeOnlyOfficeCallbackRead)
async def handle_knowledge_document_onlyoffice_callback(
document_id: str,
request: Request,
) -> KnowledgeOnlyOfficeCallbackRead:
payload = await request.json()
try:
KnowledgeService().handle_onlyoffice_callback(document_id, payload)
except FileNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="知识库文件不存在。") from exc
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
return FileResponse(file_path, media_type=media_type, filename=filename)
@router.post(
"/documents/{document_id}/onlyoffice/callback",
response_model=KnowledgeOnlyOfficeCallbackRead,
summary="接收 ONLYOFFICE 回调",
description="接收 ONLYOFFICE 文档回写回调,在状态满足要求时更新知识库文件内容。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "回调载荷不合法。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "知识库文件不存在。",
},
},
)
def handle_knowledge_document_onlyoffice_callback(
document_id: str,
payload: KnowledgeOnlyOfficeCallbackWrite,
) -> KnowledgeOnlyOfficeCallbackRead:
try:
KnowledgeService().handle_onlyoffice_callback(document_id, payload.model_dump())
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="知识库文件不存在。",
) from exc
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
return KnowledgeOnlyOfficeCallbackRead()

View File

@@ -6,6 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.schemas.common import ErrorResponse
from app.schemas.reimbursement import ReimbursementCreate, ReimbursementRead
from app.services.reimbursement import ReimbursementService
@@ -13,17 +14,39 @@ router = APIRouter()
DbSession = Annotated[Session, Depends(get_db)]
@router.get("", response_model=list[ReimbursementRead])
@router.get(
"",
response_model=list[ReimbursementRead],
summary="查询报销申请列表",
description="返回当前系统中的报销申请列表。",
)
def list_reimbursements(db: DbSession) -> list[ReimbursementRead]:
return ReimbursementService(db).list_reimbursements()
@router.post("", response_model=ReimbursementRead, status_code=status.HTTP_201_CREATED)
@router.post(
"",
response_model=ReimbursementRead,
status_code=status.HTTP_201_CREATED,
summary="创建报销申请",
description="创建一条新的报销申请记录,初始状态为 `draft`。",
)
def create_reimbursement(payload: ReimbursementCreate, db: DbSession) -> ReimbursementRead:
return ReimbursementService(db).create_reimbursement(payload)
@router.get("/{request_id}", response_model=ReimbursementRead)
@router.get(
"/{request_id}",
response_model=ReimbursementRead,
summary="读取报销申请详情",
description="根据报销申请主键读取单据详情。",
responses={
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "报销申请不存在。",
}
},
)
def get_reimbursement(request_id: str, db: DbSession) -> ReimbursementRead:
request = ReimbursementService(db).get_reimbursement(request_id)
if request is None:

View File

@@ -6,7 +6,8 @@ from fastapi import APIRouter, Depends, Header, HTTPException, status
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.core.config import get_settings
from app.core.config import get_settings as get_runtime_settings
from app.schemas.common import ErrorResponse
from app.schemas.settings import (
ModelConnectivityTestRead,
ModelConnectivityTestRequest,
@@ -22,9 +23,12 @@ DbSession = Annotated[Session, Depends(get_db)]
def require_hermes_agent_token(
authorization: Annotated[str | None, Header()] = None,
authorization: Annotated[
str | None,
Header(description="Hermes 读取运行时模型配置时使用的 Bearer Token。"),
] = None,
) -> None:
configured_token = str(get_settings().hermes_agent_shared_token or "").strip()
configured_token = str(get_runtime_settings().hermes_agent_shared_token or "").strip()
if not configured_token:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
@@ -40,12 +44,28 @@ def require_hermes_agent_token(
)
@router.get("", response_model=SettingsRead)
@router.get(
"",
response_model=SettingsRead,
summary="读取系统设置",
description="返回公司、管理员、模型、日志、邮件和 ONLYOFFICE 的设置快照。",
)
def get_settings(db: DbSession) -> SettingsRead:
return SettingsService(db).get_settings_snapshot()
@router.put("", response_model=SettingsRead)
@router.put(
"",
response_model=SettingsRead,
summary="保存系统设置",
description="保存系统设置,并同步运行时模型配置与 Hermes 使用的模型路由。",
responses={
status.HTTP_400_BAD_REQUEST: {
"model": ErrorResponse,
"description": "设置字段校验失败。",
}
},
)
def update_settings(payload: SettingsWrite, db: DbSession) -> SettingsRead:
try:
return SettingsService(db).save_settings_snapshot(payload)
@@ -53,8 +73,16 @@ def update_settings(payload: SettingsWrite, db: DbSession) -> SettingsRead:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@router.post("/model-connectivity", response_model=ModelConnectivityTestRead)
def test_model_connectivity(payload: ModelConnectivityTestRequest, db: DbSession) -> ModelConnectivityTestRead:
@router.post(
"/model-connectivity",
response_model=ModelConnectivityTestRead,
summary="测试模型连通性",
description="验证指定模型服务端点是否可用;当未传 API Key 且提供 slot 时会尝试复用已保存密钥。",
)
def test_model_connectivity(
payload: ModelConnectivityTestRequest,
db: DbSession,
) -> ModelConnectivityTestRead:
resolved_payload = payload
if not payload.api_key and payload.slot:
@@ -69,6 +97,22 @@ def test_model_connectivity(payload: ModelConnectivityTestRequest, db: DbSession
"/runtime-models/{slot}",
response_model=RuntimeModelConfigRead,
dependencies=[Depends(require_hermes_agent_token)],
summary="读取 Hermes 运行时模型配置",
description="供 Hermes 进程读取主模型、备用模型、VLM 或 Embedding 模型的运行时配置。",
responses={
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorResponse,
"description": "Hermes 令牌校验失败。",
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorResponse,
"description": "指定模型槽位不存在。",
},
status.HTTP_503_SERVICE_UNAVAILABLE: {
"model": ErrorResponse,
"description": "Hermes 集成令牌尚未配置。",
},
},
)
def get_runtime_model_config(
slot: str,

View File

@@ -0,0 +1,80 @@
API_DESCRIPTION = """
X-Financial 后端 OpenAPI 文档。
## 基本信息
- 所有业务接口都挂在 `/api/v1` 前缀下。
- 交互式 Swagger 页面默认位于 `/docs`ReDoc 页面位于 `/redoc`。
- 仓库内导出的静态规范文件位于 `document/development/backend_api/openapi.json`。
## 鉴权约定
- 知识库接口依赖以下请求头模拟当前用户:
- `X-Auth-Username`
- `X-Auth-Name`
- `X-Auth-Role-Codes`
- `X-Auth-Is-Admin`
- Agent 资产写接口支持以下审计头:
- `X-Actor`
- `X-Request-Id`
- Hermes 运行时模型配置接口需要:
- `Authorization: Bearer <HERMES_AGENT_SHARED_TOKEN>`
## 当前模块范围
- 系统初始化与健康检查
- 登录认证
- 员工目录
- 报销单
- 知识库与 ONLYOFFICE
- 系统设置与模型连通性
- Agent 资产、运行日志、审计日志
""".strip()
OPENAPI_TAGS = [
{
"name": "health",
"description": "服务健康检查与数据库 / Redis 连通性状态。",
},
{
"name": "bootstrap",
"description": "系统初始化配置,包括公司信息、数据库和缓存配置。",
},
{
"name": "auth",
"description": "后台登录认证接口,支持管理员和员工账号登录。",
},
{
"name": "employees",
"description": "员工目录管理,包括员工列表、详情、创建、更新和停用。",
},
{
"name": "reimbursements",
"description": "报销申请基础接口,包含列表、创建和详情查询。",
},
{
"name": "knowledge",
"description": "知识库文件管理、内容访问与 ONLYOFFICE 集成接口。",
},
{
"name": "settings",
"description": "系统设置、模型配置、模型连通性探测和 Hermes 运行时模型配置。",
},
{
"name": "agent-assets",
"description": "Agent 资产中心覆盖规则、技能、MCP、任务及其版本、审核和上线流程。",
},
{
"name": "agent-runs",
"description": "Agent 运行日志查询,包括工具调用和语义解析结果。",
},
{
"name": "audit-logs",
"description": "系统审计日志查询接口,用于追踪资产和任务写操作。",
},
{
"name": "root",
"description": "服务根入口,用于确认应用已启动。",
},
]

View File

@@ -1,19 +1,41 @@
from __future__ import annotations
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.router import api_router
from app.core.config import get_settings
from app.core.logging import get_logger, setup_logging
from app.core.openapi import API_DESCRIPTION, OPENAPI_TAGS
from app.middleware.logging import AccessLogMiddleware
from app.schemas.common import RootStatusRead
from app.services.agent_foundation import prepare_agent_foundation
from app.services.employee import prepare_employee_directory
from app.services.knowledge import prepare_knowledge_library
def create_app() -> FastAPI:
settings = get_settings()
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
settings = get_settings()
logger = get_logger("app.main")
prepare_employee_directory()
prepare_agent_foundation()
prepare_knowledge_library()
logger.info(
"Server ready - host=%s port=%s prefix=%s",
settings.app_host,
settings.app_port,
settings.api_v1_prefix,
)
yield
def create_app() -> FastAPI:
settings = get_settings()
setup_logging(
level=settings.log_level,
@@ -26,11 +48,14 @@ def create_app() -> FastAPI:
"Starting %s (env=%s, debug=%s)", settings.app_name, settings.app_env, settings.app_debug
)
app = FastAPI(
title=settings.app_name,
debug=settings.app_debug,
version="0.1.0",
)
app = FastAPI(
title=settings.app_name,
debug=settings.app_debug,
version="0.1.0",
description=API_DESCRIPTION,
openapi_tags=OPENAPI_TAGS,
lifespan=lifespan,
)
app.add_middleware(AccessLogMiddleware)
@@ -45,23 +70,17 @@ def create_app() -> FastAPI:
app.include_router(api_router, prefix=settings.api_v1_prefix)
@app.get("/", tags=["root"])
def root() -> dict[str, str]:
return {"message": f"{settings.app_name} is running"}
@app.on_event("startup")
def _on_startup() -> None:
prepare_employee_directory()
prepare_agent_foundation()
prepare_knowledge_library()
logger.info(
"Server ready - host=%s port=%s prefix=%s",
settings.app_host,
settings.app_port,
settings.api_v1_prefix,
)
return app
@app.get(
"/",
tags=["root"],
response_model=RootStatusRead,
summary="服务根检查",
description="用于快速确认后端服务进程已经启动。",
)
def root() -> RootStatusRead:
return RootStatusRead(message=f"{settings.app_name} is running")
return app
app = create_app()

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from pydantic import BaseModel
class ErrorResponse(BaseModel):
detail: str
class RootStatusRead(BaseModel):
message: str
class HealthDatabaseStatusRead(BaseModel):
configured: bool
ok: bool
error: str | None = None
class HealthRedisStatusRead(BaseModel):
configured: bool
enabled: bool
class HealthCheckRead(BaseModel):
status: str
database: HealthDatabaseStatusRead
redis: HealthRedisStatusRead

View File

@@ -1,8 +1,8 @@
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, Field
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class KnowledgeFolderRead(BaseModel):
@@ -58,13 +58,21 @@ class KnowledgeOnlyOfficeConfigRead(BaseModel):
config: dict[str, Any] = Field(default_factory=dict)
class KnowledgeOnlyOfficeCallbackRead(BaseModel):
error: int = 0
class KnowledgeLibraryRead(BaseModel):
folders: list[KnowledgeFolderRead] = Field(default_factory=list)
documents: list[KnowledgeDocumentRead] = Field(default_factory=list)
class KnowledgeOnlyOfficeCallbackRead(BaseModel):
error: int = 0
class KnowledgeOnlyOfficeCallbackWrite(BaseModel):
model_config = ConfigDict(extra="allow")
status: int = Field(description="ONLYOFFICE 回调状态码。")
url: str | None = Field(default=None, description="文档下载地址,状态为 2 或 6 时使用。")
users: list[str] = Field(default_factory=list, description="当前编辑用户列表。")
class KnowledgeLibraryRead(BaseModel):
folders: list[KnowledgeFolderRead] = Field(default_factory=list)
documents: list[KnowledgeDocumentRead] = Field(default_factory=list)
class KnowledgeActionResponse(BaseModel):

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from app.core.config import get_settings
from app.main import create_app
def test_openapi_schema_includes_documented_backend_routes() -> None:
schema = create_app().openapi()
assert schema["info"]["title"] == get_settings().app_name
assert any(tag["name"] == "agent-assets" for tag in schema["tags"])
assert any(tag["name"] == "knowledge" for tag in schema["tags"])
agent_assets_post = schema["paths"]["/api/v1/agent-assets"]["post"]
assert agent_assets_post["summary"] == "创建 Agent 资产"
assert any(param["name"] == "x-actor" for param in agent_assets_post["parameters"])
knowledge_upload_post = schema["paths"]["/api/v1/knowledge/documents"]["post"]
assert knowledge_upload_post["summary"] == "上传知识库文档"
assert "application/octet-stream" in knowledge_upload_post["requestBody"]["content"]
knowledge_callback_post = schema["paths"][
"/api/v1/knowledge/documents/{document_id}/onlyoffice/callback"
]["post"]
assert knowledge_callback_post["summary"] == "接收 ONLYOFFICE 回调"
assert "application/json" in knowledge_callback_post["requestBody"]["content"]
root_get = schema["paths"]["/"]["get"]
assert root_get["summary"] == "服务根检查"