feat(server): 新增LLM维基服务,实现大语言模型知识库管理和查询功能

This commit is contained in:
caoxiaozhu
2026-05-15 06:57:45 +00:00
parent 244b3a58f7
commit ea339d883a
2 changed files with 2281 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,524 @@
from __future__ import annotations
import json
from subprocess import TimeoutExpired
from collections.abc import Generator
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.api.deps import CurrentUserContext, get_db
from app.core.agent_enums import AgentReviewStatus, AgentRunSource, AgentRunStatus
from app.db.base import Base
from app.main import create_app
from app.schemas.agent_asset import AgentAssetReviewCreate
from app.schemas.knowledge import LlmWikiSummaryUpdateWrite, LlmWikiSyncRead
from app.services.agent_assets import AgentAssetService
from app.services.agent_runs import AgentRunService
from app.services.knowledge import (
KNOWLEDGE_INGEST_STATUS_FAILED,
KNOWLEDGE_INGEST_STATUS_INGESTED,
KNOWLEDGE_INGEST_STATUS_PUBLISHED,
KnowledgeService,
)
from app.services.llm_wiki import LlmWikiService
def build_session() -> Session:
    """Return a new ORM session bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the session
    is handed out. The engine is configured with ``StaticPool`` and
    ``check_same_thread=False``.
    """
    # NOTE(review): StaticPool presumably keeps the in-memory database on one
    # shared connection so the created tables stay visible — confirm.
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    make_session = sessionmaker(bind=memory_engine, autoflush=False, autocommit=False)
    return make_session()
def build_client() -> tuple[TestClient, sessionmaker[Session]]:
    """Build a ``TestClient`` whose ``get_db`` dependency uses an in-memory SQLite database.

    Returns:
        The client together with the session factory bound to the same engine,
        so a test can open its own sessions against the database the app uses.
    """
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    make_session = sessionmaker(bind=memory_engine, autoflush=False, autocommit=False)
    application = create_app()

    def override_db() -> Generator[Session, None, None]:
        # One session per resolved dependency; it is closed when the generator finishes.
        with make_session() as db:
            yield db

    application.dependency_overrides[get_db] = override_db
    return TestClient(application), make_session
def build_admin_user() -> CurrentUserContext:
    """Return the admin user context passed as ``current_user`` by the tests in this module."""
    admin_fields = {
        "username": "admin",
        "name": "管理员",
        "role_codes": ["manager"],
        "is_admin": True,
    }
    return CurrentUserContext(**admin_fields)
def upload_policy_document(storage_root: Path, *, filename: str = "公司差旅报销制度.txt") -> str:
    """Upload a short travel-reimbursement policy text and return the new document's id.

    Args:
        storage_root: Root directory handed to ``KnowledgeService``.
        filename: Name under which the document is stored in the "报销制度" folder.
    """
    policy_text = (
        "第一章 差旅报销\n"
        "员工因公出差发生的住宿费应按照公司差旅标准执行。\n"
        "住宿费超过标准时,必须升级至总经理审批。\n"
        "报销时必须提供发票、行程单和审批说明。\n"
    )
    knowledge_service = KnowledgeService(storage_root=storage_root)
    knowledge_service.ensure_library_ready()
    uploaded = knowledge_service.upload_document(
        folder="报销制度",
        filename=filename,
        content=policy_text.encode("utf-8"),
        current_user=build_admin_user(),
    )
    return uploaded.id
def build_candidate_payload(chunk_id: str, *, summary: str = "住宿费超过标准时必须升级审批。") -> dict[str, object]:
    """Build a canned candidate payload with one knowledge candidate and one rule candidate.

    Args:
        chunk_id: Chunk id recorded as the only ``source_chunk_ids`` entry of both candidates.
        summary: Text reused as the knowledge content, the evidence entries and
            the rule's single ``judgement_logic`` entry.

    Returns:
        A dict with the keys ``knowledge_candidates`` and ``rule_candidates``.
        The rule's threshold has ``warn_amount`` 450.00 below ``block_amount`` 600.00.
    """
    knowledge_candidate = {
        "title": "住宿费升级审批要求",
        "content": summary,
        "scenario": "reimbursement_policy",
        "tags": ["住宿", "审批"],
        "evidence": [summary],
        "confidence": 0.91,
        "source_chunk_ids": [chunk_id],
    }

    rule_target = {
        "expense_types": ["hotel"],
        "scene_codes": ["travel_standard"],
        "metric": "item_amount",
    }
    rule_threshold = {
        "currency": "CNY",
        "comparator": "gt",
        "warn_amount": "450.00",
        "block_amount": "600.00",
        "source": "document_value",
    }
    rule_exception_policy = {
        "allow_with_explanation": True,
        "keywords": ["超标说明", "协议酒店满房"],
    }
    rule_output = {
        "risk_code": "travel_hotel_limit",
        "action": "review",
        "message": "住宿费超过制度标准时需要升级审批。",
    }
    runtime_rule = {
        "target": rule_target,
        "threshold": rule_threshold,
        "exception_policy": rule_exception_policy,
        "output": rule_output,
    }

    rule_candidate = {
        "template_key": "expense_amount_limit_v1",
        "suggested_rule_name": "住宿费超标审批规则",
        "summary": "当住宿费超过制度标准时触发升级审批。",
        "scenario": "travel_standard",
        "purpose": "识别差旅住宿费是否超出制度标准。",
        "scope": "适用于员工差旅住宿报销场景。",
        "inputs": ["expense_type", "amount", "travel_grade"],
        "judgement_logic": [summary],
        "outputs": ["approval_required=true", "risk_level=medium"],
        "admin_note": "上线前需要由财务补充不同职级的金额阈值。",
        "runtime_rule": runtime_rule,
        "evidence": [summary],
        "confidence": 0.93,
        "source_chunk_ids": [chunk_id],
    }

    return {
        "knowledge_candidates": [knowledge_candidate],
        "rule_candidates": [rule_candidate],
    }
def build_invalid_candidate_payload(chunk_id: str) -> dict[str, object]:
    """Build a candidate payload whose only rule carries a deliberately inconsistent threshold.

    The payload contains no knowledge candidates. Its single rule candidate sets
    ``block_amount`` (450.00) below ``warn_amount`` (600.00) and has no
    ``exception_policy``; the tests use it to exercise runtime-rule validation.

    Args:
        chunk_id: Chunk id recorded as the rule's only ``source_chunk_ids`` entry.
    """
    # block_amount is intentionally lower than warn_amount.
    inverted_threshold = {
        "currency": "CNY",
        "comparator": "gt",
        "warn_amount": "600.00",
        "block_amount": "450.00",
        "source": "document_value",
    }
    runtime_rule = {
        "target": {
            "expense_types": ["hotel"],
            "scene_codes": ["travel_standard"],
            "metric": "item_amount",
        },
        "threshold": inverted_threshold,
        "output": {
            "risk_code": "travel_hotel_limit",
            "action": "review",
            "message": "无效阈值。",
        },
    }
    rule_candidate = {
        "template_key": "expense_amount_limit_v1",
        "suggested_rule_name": "无效金额规则草稿",
        "summary": "用于验证 schema 强校验。",
        "scenario": "travel_standard",
        "purpose": "验证不合规的 runtime_rule 不会落到规则中心。",
        "scope": "测试场景。",
        "inputs": ["expense_type", "amount"],
        "judgement_logic": ["金额超过标准则需审批。"],
        "outputs": ["approval_required=true"],
        "admin_note": "此规则故意构造错误阈值。",
        "runtime_rule": runtime_rule,
        "evidence": ["金额阈值配置不应允许 block 小于 warn。"],
        "confidence": 0.88,
        "source_chunk_ids": [chunk_id],
    }
    return {
        "knowledge_candidates": [],
        "rule_candidates": [rule_candidate],
    }
def update_document_timestamp(storage_root: Path, document_id: str, updated_at: str) -> None:
    """Overwrite the ``updated_at`` field of one document in the knowledge index file.

    The index is read from ``<storage_root>/knowledge/.index.json``, the first
    entry of its ``documents`` list whose ``id`` equals ``document_id`` is
    updated, and the file is written back as UTF-8 JSON.

    Args:
        storage_root: Root directory that contains the ``knowledge`` folder.
        document_id: Id of the index entry to modify.
        updated_at: New value stored verbatim under ``updated_at``.

    Raises:
        LookupError: If no entry with ``document_id`` exists. The index file is
            left untouched, so a wrong id fails here rather than as a
            confusing assertion later in the test.
    """
    index_path = storage_root / "knowledge" / ".index.json"
    payload = json.loads(index_path.read_text(encoding="utf-8"))
    for item in payload["documents"]:
        if item["id"] == document_id:
            item["updated_at"] = updated_at
            break
    else:
        # The loop ended without a match: refuse to rewrite an unchanged index.
        raise LookupError(f"document {document_id!r} not found in {index_path}")
    index_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def test_llm_wiki_sync_creates_artifacts_and_draft_rule(tmp_path, monkeypatch) -> None:
    """A first sync writes the per-document artifact files and creates one draft rule asset."""
    document_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        # Replaces the model call with the canned payload for the first chunk.
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)

    expected_artifacts = (
        "document.json",
        "text.md",
        "chunks.json",
        "knowledge_candidates.json",
        "knowledge_summary.md",
        "rule_candidates.json",
    )

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        result = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())

        assert result.document_count == 1
        assert result.knowledge_candidate_count == 1
        assert result.rule_candidate_count == 1
        assert result.generated_rule_count == 1
        assert len(result.generated_rule_asset_ids) == 1

        document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id
        for artifact_name in expected_artifacts:
            assert (document_dir / artifact_name).exists()

        document_json = (document_dir / "document.json").read_text(encoding="utf-8")
        document_payload = json.loads(document_json)
        assert document_payload["sync_reason"] == "initial_build"

        detail = wiki.get_document_detail(document_id)
        assert "公司差旅报销制度.txt 知识总结" in detail.knowledge_summary_markdown
        assert "住宿费升级审批要求" in detail.knowledge_summary_markdown

        rule_asset_id = result.generated_rule_asset_ids[0]
        asset = AgentAssetService(db).get_asset(rule_asset_id)
        assert asset is not None
        assert asset.status == "draft"
        assert asset.config_json["llm_wiki_managed"] is True
        assert asset.config_json["runtime_rule"]["template_key"] == "expense_amount_limit_v1"
        assert asset.config_json["runtime_rule"]["threshold"]["block_amount"] == "600.00"
        assert "```expense-rule" in str(asset.current_version_content)
def test_llm_wiki_document_summary_can_be_updated(tmp_path, monkeypatch) -> None:
    """A manually revised summary is returned by the service and written to ``knowledge_summary.md``."""
    document_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        wiki.sync_folder(folder="报销制度", current_user=build_admin_user())

        summary_update = LlmWikiSummaryUpdateWrite(
            knowledge_summary_markdown="# 人工修订总结\n\n- 住宿费超标必须升级审批。\n- 报销时必须附发票和审批说明。"
        )
        updated = wiki.update_document_summary(document_id, summary_update)

        assert updated.document_id == document_id
        assert updated.knowledge_summary_markdown.startswith("# 人工修订总结")

        # The revision must also be persisted next to the other document artifacts.
        document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id
        stored_summary = (document_dir / "knowledge_summary.md").read_text(encoding="utf-8")
        assert stored_summary.startswith("# 人工修订总结")
def test_llm_wiki_sync_rejects_invalid_runtime_rule_schema(tmp_path, monkeypatch) -> None:
    """A rule candidate with an invalid runtime rule is recorded as failed and generates no rule."""
    document_id = upload_policy_document(tmp_path)

    def stub_invalid_candidate_model(self, *, entry, chunk_group):
        # Returns the payload whose block_amount is lower than warn_amount.
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_invalid_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_invalid_candidate_model)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        result = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())

        assert result.document_count == 1
        assert result.rule_candidate_count == 1
        assert result.generated_rule_count == 0

        document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id
        candidates_json = (document_dir / "rule_candidates.json").read_text(encoding="utf-8")
        rule_candidates = json.loads(candidates_json)
        first_candidate = rule_candidates[0]

        assert first_candidate["validation_status"] == "invalid"
        assert first_candidate["status"] == "validation_failed"
        assert first_candidate["validation_errors"]
        assert "block_amount" in " ".join(first_candidate["validation_errors"])
def test_knowledge_document_state_changes_with_llm_wiki_sync(tmp_path, monkeypatch) -> None:
    """The document state goes published -> ingested after a sync, and back to published on re-upload."""
    document_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)
    knowledge_service = KnowledgeService(storage_root=tmp_path)

    # Freshly uploaded: waiting to be summarised.
    initial_detail = knowledge_service.get_document_detail(document_id)
    assert initial_detail.stateCode == KNOWLEDGE_INGEST_STATUS_PUBLISHED
    assert initial_detail.state == "待归纳"

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        wiki.sync_folder(
            folder="报销制度",
            current_user=build_admin_user(),
            document_ids=[document_id],
        )

    # After the sync the document counts as ingested.
    ingested_detail = knowledge_service.get_document_detail(document_id)
    assert ingested_detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED
    assert ingested_detail.state == "已归纳"

    # Uploading new content under the same filename must keep the id and reset the state.
    revised_text = (
        "第一章 差旅报销\n"
        "员工因公出差发生的住宿费应按照公司差旅标准执行。\n"
        "新增:超标住宿必须附书面说明。\n"
    )
    updated_detail = knowledge_service.upload_document(
        folder="报销制度",
        filename="公司差旅报销制度.txt",
        content=revised_text.encode("utf-8"),
        current_user=build_admin_user(),
    )
    assert updated_detail.id == document_id
    assert updated_detail.stateCode == KNOWLEDGE_INGEST_STATUS_PUBLISHED
    assert updated_detail.state == "待归纳"

    index_path = tmp_path / "knowledge" / ".index.json"
    index_payload = json.loads(index_path.read_text(encoding="utf-8"))
    stored_entry = next(item for item in index_payload["documents"] if item["id"] == document_id)
    assert stored_entry["ingest_status"] == KNOWLEDGE_INGEST_STATUS_PUBLISHED
def test_llm_wiki_sync_marks_document_failed_when_ingest_raises(tmp_path, monkeypatch) -> None:
    """An exception from the candidate model propagates and leaves the document in the failed state."""
    document_id = upload_policy_document(tmp_path)

    def raising_candidate_model(self, *, entry, chunk_group):
        raise RuntimeError("simulated llm wiki failure")

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", raising_candidate_model)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        with pytest.raises(RuntimeError, match="simulated llm wiki failure"):
            wiki.sync_folder(
                folder="报销制度",
                current_user=build_admin_user(),
                document_ids=[document_id],
            )

    # Checked outside the pytest.raises body so these assertions actually run.
    knowledge_service = KnowledgeService(storage_root=tmp_path)
    detail = knowledge_service.get_document_detail(document_id)
    assert detail.stateCode == KNOWLEDGE_INGEST_STATUS_FAILED
    assert detail.state == "归纳失败"
def test_llm_wiki_sync_uses_fallback_candidates_when_system_hermes_times_out(
    tmp_path,
    monkeypatch,
) -> None:
    """When the system Hermes query times out, the sync still succeeds without calling the runtime chat service."""
    document_id = upload_policy_document(tmp_path)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)

        def always_available():
            return True

        def timing_out_run_query(*args, **kwargs):
            raise TimeoutExpired(cmd="hermes", timeout=1)

        monkeypatch.setattr(wiki.system_hermes_service, "is_available", always_available)
        monkeypatch.setattr(wiki.system_hermes_service, "run_query", timing_out_run_query)

        # Every call to the runtime chat service is recorded here; none is expected.
        runtime_calls = []

        def fail_runtime_complete(*args, **kwargs):
            runtime_calls.append((args, kwargs))
            raise AssertionError("system hermes timeout should fall back directly to local candidate builder")

        monkeypatch.setattr(wiki.runtime_chat_service, "complete", fail_runtime_complete)

        result = wiki.sync_folder(
            folder="报销制度",
            current_user=build_admin_user(),
            document_ids=[document_id],
        )

        assert result.document_count == 1
        assert result.knowledge_candidate_count >= 1
        assert len(runtime_calls) == 0

    detail = KnowledgeService(storage_root=tmp_path).get_document_detail(document_id)
    assert detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED
    assert detail.state == "已归纳"
def test_llm_wiki_sync_skips_unchanged_and_rebuilds_on_updated_at_change(tmp_path, monkeypatch) -> None:
    """A repeated sync skips an unchanged document; changing ``updated_at`` in the index triggers a rebuild."""
    document_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)

        first = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())
        second = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())
        assert first.document_count == 1
        assert second.document_count == 0
        assert "未变化,跳过" in second.summary

        # Change only the timestamp in the index, then sync a third time.
        update_document_timestamp(tmp_path, document_id, "2026-05-15T09:30:00+00:00")
        third = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())
        assert third.document_count == 1

        document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id
        document_json = (document_dir / "document.json").read_text(encoding="utf-8")
        document_payload = json.loads(document_json)
        assert document_payload["sync_reason"] == "updated_at_changed"
def test_llm_wiki_sync_does_not_overwrite_active_rule(tmp_path, monkeypatch) -> None:
    """Once a generated rule is approved and activated, a later sync leaves its version, content and config alone."""
    document_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        first_chunk_id = chunk_group[0]["chunk_id"]
        return build_candidate_payload(first_chunk_id)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)

    with build_session() as db:
        wiki = LlmWikiService(db, storage_root=tmp_path)
        first = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())
        asset_id = first.generated_rule_asset_ids[0]

        asset_service = AgentAssetService(db)
        asset_detail = asset_service.get_asset(asset_id)
        assert asset_detail is not None

        # Approve the draft, then activate it.
        approval = AgentAssetReviewCreate(
            version=asset_detail.current_version or "v1.0.0",
            reviewer="管理员",
            review_status=AgentReviewStatus.APPROVED,
            review_note="允许上线",
        )
        asset_service.create_review(asset_id, approval, actor="管理员")
        activated = asset_service.activate_asset(asset_id, actor="管理员")
        assert activated.status == "active"

        original_version = activated.current_version
        original_content = activated.current_version_content
        original_config = activated.config_json

        def stub_changed_candidate_model(self, *, entry, chunk_group):
            # Same candidate shape, but with a different summary text.
            return build_candidate_payload(
                chunk_group[0]["chunk_id"],
                summary="住宿费超过标准时,必须升级审批并记录超标原因。",
            )

        monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_changed_candidate_model)
        update_document_timestamp(tmp_path, document_id, "2026-05-15T10:00:00+00:00")

        second = wiki.sync_folder(folder="报销制度", current_user=build_admin_user())
        refreshed = asset_service.get_asset(asset_id)

        assert second.document_count == 1
        assert second.generated_rule_count == 0
        assert refreshed is not None
        assert refreshed.status == "active"
        assert refreshed.current_version == original_version
        assert refreshed.current_version_content == original_content
        assert refreshed.config_json == original_config
def test_llm_wiki_sync_endpoint_records_agent_run(monkeypatch) -> None:
    """Posting to the sync endpoint returns the sync result and records exactly one new agent run."""

    def stub_sync_folder(self, *, folder="报销制度", current_user, document_ids=None, force=False):
        # Canned result so the endpoint is exercised without running a real sync.
        return LlmWikiSyncRead(
            ok=True,
            run_id="wiki_test_sync",
            folder=folder,
            document_count=1,
            knowledge_candidate_count=2,
            rule_candidate_count=1,
            generated_rule_count=1,
            generated_rule_asset_ids=["asset-rule-1"],
            summary="已完成 Hermes LLM Wiki 同步。",
        )

    monkeypatch.setattr(LlmWikiService, "sync_folder", stub_sync_folder)
    client, session_factory = build_client()

    with session_factory() as db:
        runs_before = AgentRunService(db).list_runs(limit=100)
        before_count = len(runs_before)

    admin_headers = {
        "x-auth-username": "admin",
        "x-auth-name": "admin",
        "x-auth-is-admin": "true",
    }
    response = client.post(
        "/api/v1/knowledge/llm-wiki/sync",
        json={"folder": "报销制度", "force": False},
        headers=admin_headers,
    )
    assert response.status_code == 200

    payload = response.json()
    assert payload["run_id"] == "wiki_test_sync"
    assert payload["generated_rule_count"] == 1

    with session_factory() as db:
        run_service = AgentRunService(db)
        after_runs = run_service.list_runs(limit=100)
        assert len(after_runs) == before_count + 1

        latest_run = after_runs[0]
        assert latest_run.agent == "hermes"
        assert latest_run.source == AgentRunSource.SCHEDULE.value
        assert latest_run.status == AgentRunStatus.SUCCEEDED.value
        assert latest_run.tool_calls

        first_tool_call = latest_run.tool_calls[0]
        assert first_tool_call.tool_name == "system_hermes_llm_wiki_sync"
        assert first_tool_call.status == "succeeded"
        assert first_tool_call.response_json["run_id"] == "wiki_test_sync"