From 6793b6f832f6e0deb42a63885e3e0729479ec762 Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Fri, 15 May 2026 09:33:59 +0000 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E9=87=8D=E6=9E=84=E7=9F=A5?= =?UTF-8?q?=E8=AF=86=E5=BA=93=E6=9C=8D=E5=8A=A1=E5=92=8C=E8=B7=AF=E7=94=B1?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=EF=BC=8C=E4=BC=98=E5=8C=96LLM=E7=BB=B4?= =?UTF-8?q?=E5=9F=BA=E7=9F=A5=E8=AF=86=E7=AE=A1=E7=90=86=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=BC=BA=E7=9F=A5=E8=AF=86=E6=A3=80=E7=B4=A2?= =?UTF-8?q?=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/app/api/v1/endpoints/knowledge.py | 93 +-- server/src/app/api/v1/router.py | 2 + server/src/app/main.py | 2 + server/src/app/schemas/knowledge.py | 10 + server/src/app/services/knowledge.py | 80 ++- server/src/app/services/llm_wiki.py | 562 +++++++++++++++++-- server/tests/test_llm_wiki_service.py | 186 +++++- 7 files changed, 812 insertions(+), 123 deletions(-) diff --git a/server/src/app/api/v1/endpoints/knowledge.py b/server/src/app/api/v1/endpoints/knowledge.py index f8cfd46..902d159 100644 --- a/server/src/app/api/v1/endpoints/knowledge.py +++ b/server/src/app/api/v1/endpoints/knowledge.py @@ -16,18 +16,19 @@ from app.schemas.knowledge import ( KnowledgeActionResponse, KnowledgeDocumentDetailRead, KnowledgeLibraryRead, - LlmWikiDocumentDetailRead, - LlmWikiIndexRead, - LlmWikiSummaryUpdateWrite, KnowledgeOnlyOfficeCallbackRead, KnowledgeOnlyOfficeCallbackWrite, KnowledgeOnlyOfficeConfigRead, - LlmWikiSyncRead, + LlmWikiDocumentDetailRead, + LlmWikiIndexRead, + LlmWikiSyncTaskRead, LlmWikiSyncWrite, + LlmWikiSummaryUpdateWrite, ) from app.services.agent_runs import AgentRunService -from app.services.knowledge import KnowledgeService +from app.services.knowledge import KNOWLEDGE_INGEST_STATUS_SYNCING, KnowledgeService from app.services.llm_wiki import LlmWikiService +from app.services.llm_wiki_tasks import llm_wiki_task_manager router = APIRouter(prefix="/knowledge") @@ -46,8 +47,9 @@ router = APIRouter(prefix="/knowledge") ) def get_knowledge_library( _: Annotated[CurrentUserContext, Depends(get_current_user)], + db: Annotated[Session, Depends(get_db)], ) -> KnowledgeLibraryRead: - return KnowledgeService().list_library() + return KnowledgeService(db=db).list_library() @router.get( @@ -140,9 +142,9 @@ def update_llm_wiki_document_summary( @router.post( "/llm-wiki/sync", - response_model=LlmWikiSyncRead, - summary="触发 Hermes 形成 LLM Wiki 与规则草稿", - description="按知识库文档变化情况增量触发系统 Hermes,形成知识候选和规则草稿。", + response_model=LlmWikiSyncTaskRead, + summary="异步触发 Hermes 形成 LLM Wiki 与规则草稿", + description="按知识库文档变化情况将系统 Hermes 归纳任务放入后台执行,并返回可追踪的 AgentRun 编号。", responses={ status.HTTP_401_UNAUTHORIZED: { "model": ErrorResponse, @@ -158,8 +160,15 @@ def sync_llm_wiki( payload: LlmWikiSyncWrite, current_user: Annotated[CurrentUserContext, Depends(require_admin_user)], db: Annotated[Session, Depends(get_db)], -) -> LlmWikiSyncRead: +) -> LlmWikiSyncTaskRead: run_service = AgentRunService(db) + knowledge_service = KnowledgeService(db=db) + requested_ids = {str(item).strip() for item in payload.document_ids if str(item).strip()} + target_document_ids = [ + str(item.get("id") or "").strip() + for item in knowledge_service.list_folder_documents(folder=payload.folder) + if str(item.get("id") or "").strip() and (not requested_ids or str(item.get("id") or "").strip() in requested_ids) + ] task_asset = db.scalar( select(AgentAsset).where(AgentAsset.code == "task.hermes.llm_wiki_rule_formation") ) @@ -170,47 +179,52 @@ def sync_llm_wiki( task_id=task_asset.id if task_asset is not None else None, permission_level=AgentPermissionLevel.READ.value, status=AgentRunStatus.RUNNING.value, - result_summary="Hermes 正在形成 LLM Wiki 与规则草稿。", + result_summary="Hermes 归纳任务已入队,等待后台执行。", + route_json={ + "job_type": "llm_wiki_sync", + "phase": "queued", + "folder": payload.folder, + "force": payload.force, + "requested_document_ids": target_document_ids, + "progress": { + "total_documents": len(target_document_ids), + "completed_documents": 0, + "failed_documents": 0, + "skipped_documents": 0, + "percent": 0, + }, + }, ) try: - result = LlmWikiService(db).sync_folder( + if target_document_ids: + knowledge_service.set_document_ingest_statuses( + target_document_ids, + status_code=KNOWLEDGE_INGEST_STATUS_SYNCING, + agent_run_id=run.run_id, + ) + llm_wiki_task_manager.submit_sync( + agent_run_id=run.run_id, folder=payload.folder, current_user=current_user, - document_ids=payload.document_ids, + document_ids=target_document_ids, force=payload.force, ) - run_service.record_tool_call( - run_id=run.run_id, - tool_type="llm", - tool_name="system_hermes_llm_wiki_sync", - request_json=payload.model_dump(), - response_json=result.model_dump(), - status="succeeded", - duration_ms=0, + return LlmWikiSyncTaskRead( + ok=True, + agent_run_id=run.run_id, + folder=payload.folder, + document_ids=target_document_ids, + queued_at=run.started_at, + status=run.status, + summary="Hermes 已进入后台归纳,可在日志管理查看进度。", ) - run_service.update_run( - run.run_id, - status=AgentRunStatus.SUCCEEDED.value, - result_summary=result.summary, - finished_at=datetime.now(UTC), - ) - return result except Exception as exc: - run_service.record_tool_call( - run_id=run.run_id, - tool_type="llm", - tool_name="system_hermes_llm_wiki_sync", - request_json=payload.model_dump(), - response_json={"error": str(exc)}, - status="failed", - duration_ms=0, - error_message=str(exc), - ) run_service.update_run( run.run_id, status=AgentRunStatus.FAILED.value, error_message=str(exc), + result_summary=str(exc), finished_at=datetime.now(UTC), ) if isinstance(exc, ValueError): @@ -239,9 +253,10 @@ def sync_llm_wiki( def get_knowledge_document( document_id: str, _: Annotated[CurrentUserContext, Depends(get_current_user)], + db: Annotated[Session, Depends(get_db)], ) -> KnowledgeDocumentDetailRead: try: - return KnowledgeService().get_document_detail(document_id) + return KnowledgeService(db=db).get_document_detail(document_id) except FileNotFoundError as exc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/server/src/app/api/v1/router.py b/server/src/app/api/v1/router.py index ac359e1..391d2cb 100644 --- a/server/src/app/api/v1/router.py +++ b/server/src/app/api/v1/router.py @@ -13,6 +13,7 @@ from app.api.v1.endpoints.ontology import router as ontology_router from app.api.v1.endpoints.orchestrator import router as orchestrator_router from app.api.v1.endpoints.reimbursements import router as reimbursements_router from app.api.v1.endpoints.settings import router as settings_router +from app.api.v1.endpoints.system_logs import router as system_logs_router router = APIRouter() router.include_router(health_router, tags=["health"]) @@ -28,3 +29,4 @@ router.include_router(orchestrator_router, tags=["orchestrator"]) router.include_router(employees_router, prefix="/employees", tags=["employees"]) router.include_router(reimbursements_router, prefix="/reimbursements", tags=["reimbursements"]) router.include_router(settings_router, tags=["settings"]) +router.include_router(system_logs_router, tags=["system-logs"]) diff --git a/server/src/app/main.py b/server/src/app/main.py index 80d24eb..6ef8c6e 100644 --- a/server/src/app/main.py +++ b/server/src/app/main.py @@ -15,6 +15,7 @@ from app.schemas.common import RootStatusRead from app.services.agent_foundation import prepare_agent_foundation from app.services.employee import prepare_employee_directory from app.services.knowledge import prepare_knowledge_library +from app.services.llm_wiki_tasks import llm_wiki_task_manager @asynccontextmanager @@ -32,6 +33,7 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: settings.api_v1_prefix, ) yield + llm_wiki_task_manager.shutdown() def create_app() -> FastAPI: diff --git a/server/src/app/schemas/knowledge.py b/server/src/app/schemas/knowledge.py index a57bc14..8d07620 100644 --- a/server/src/app/schemas/knowledge.py +++ b/server/src/app/schemas/knowledge.py @@ -193,5 +193,15 @@ class LlmWikiSyncRead(BaseModel): summary: str = "" +class LlmWikiSyncTaskRead(BaseModel): + ok: bool = True + agent_run_id: str + folder: str + document_ids: list[str] = Field(default_factory=list) + queued_at: datetime + status: str = "running" + summary: str = "" + + class LlmWikiSummaryUpdateWrite(BaseModel): knowledge_summary_markdown: str = Field(min_length=1) diff --git a/server/src/app/services/knowledge.py b/server/src/app/services/knowledge.py index 91aa5d1..cf9bfc8 100644 --- a/server/src/app/services/knowledge.py +++ b/server/src/app/services/knowledge.py @@ -10,19 +10,23 @@ from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any -from urllib.request import Request, urlopen -from uuid import uuid4 -from xml.etree import ElementTree -from zipfile import BadZipFile, ZipFile - -import jwt - -from app.api.deps import CurrentUserContext +from urllib.request import Request, urlopen +from uuid import uuid4 +from xml.etree import ElementTree +from zipfile import BadZipFile, ZipFile + +import jwt +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.api.deps import CurrentUserContext +from app.core.agent_enums import AgentRunStatus from app.core.config import get_settings from app.core.logging import get_logger +from app.models.agent_run import AgentRun from app.schemas.knowledge import ( KnowledgeDocumentDetailRead, - KnowledgeDocumentRead, + KnowledgeDocumentRead, KnowledgeFolderRead, KnowledgeLibraryRead, KnowledgeOnlyOfficeConfigRead, @@ -94,8 +98,9 @@ def prepare_knowledge_library() -> None: class KnowledgeService: - def __init__(self, storage_root: Path | None = None) -> None: + def __init__(self, storage_root: Path | None = None, db: Session | None = None) -> None: settings = get_settings() + self.db = db self.storage_root = Path(storage_root or settings.resolved_storage_root_dir) self.library_root = self.storage_root / "knowledge" self.index_path = self.library_root / ".index.json" @@ -147,12 +152,13 @@ class KnowledgeService: self._save_index(index) entry = self._require_entry(index, document_id) preview_kind, preview_pages = self._build_preview(entry) - document = self._serialize_document(entry) - return KnowledgeDocumentDetailRead( - **document.model_dump(), - previewKind=preview_kind, - previewPages=preview_pages, - ) + wiki_document = self._build_wiki_document_map().get(str(document_id).strip()) + document = self._serialize_document(entry, wiki_document=wiki_document) + return KnowledgeDocumentDetailRead( + **document.model_dump(), + previewKind=preview_kind, + previewPages=preview_pages, + ) def upload_document( self, @@ -210,9 +216,10 @@ class KnowledgeService: "uploaded_by": current_user.name, "version_number": 1, "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED, + "ingest_agent_run_id": "", } index["documents"].append(entry) - logger.info( + logger.info( "Knowledge document uploaded id=%s folder=%s filename=%s by=%s", document_id, normalized_folder, @@ -231,6 +238,7 @@ class KnowledgeService: "uploaded_by": current_user.name, "version_number": int(existing_entry.get("version_number", 1)) + 1, "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED, + "ingest_agent_run_id": "", } ) entry = existing_entry @@ -286,7 +294,13 @@ class KnowledgeService: self._save_index(index) return dict(self._require_entry(index, document_id)) - def set_document_ingest_statuses(self, document_ids: list[str], status_code: int) -> None: + def set_document_ingest_statuses( + self, + document_ids: list[str], + status_code: int, + *, + agent_run_id: str | None = None, + ) -> None: self.ensure_library_ready() normalized_ids = {str(item).strip() for item in document_ids if str(item).strip()} if not normalized_ids: @@ -299,9 +313,15 @@ class KnowledgeService: if str(entry.get("id") or "").strip() not in normalized_ids: continue if self._normalize_ingest_status_code(entry.get("ingest_status")) == status_code: + if agent_run_id is not None and entry.get("ingest_agent_run_id") != agent_run_id: + entry["ingest_agent_run_id"] = agent_run_id + entry["ingest_status_updated_at"] = updated_at + changed = True continue entry["ingest_status"] = status_code entry["ingest_status_updated_at"] = updated_at + if agent_run_id is not None: + entry["ingest_agent_run_id"] = agent_run_id changed = True if changed: @@ -703,6 +723,9 @@ class KnowledgeService: if item.get("ingest_status") != normalized_status: item["ingest_status"] = normalized_status changed = True + if "ingest_agent_run_id" not in item: + item["ingest_agent_run_id"] = "" + changed = True existing_items.append(item) else: changed = True @@ -735,6 +758,7 @@ class KnowledgeService: "uploaded_by": "系统导入", "version_number": 1, "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED, + "ingest_agent_run_id": "", } ) changed = True @@ -773,7 +797,7 @@ class KnowledgeService: if ( current_status == KNOWLEDGE_INGEST_STATUS_SYNCING and preserve_syncing - and not self._is_syncing_status_stale(entry) + and self._should_preserve_syncing_status(entry) ): continue @@ -880,6 +904,24 @@ class KnowledgeService: updated_at = updated_at.replace(tzinfo=UTC) age_seconds = (datetime.now(UTC) - updated_at.astimezone(UTC)).total_seconds() return age_seconds >= KNOWLEDGE_INGEST_SYNC_STALE_SECONDS + + def _should_preserve_syncing_status(self, entry: dict[str, Any]) -> bool: + agent_run_id = str(entry.get("ingest_agent_run_id") or "").strip() + if not agent_run_id or self.db is None: + return not self._is_syncing_status_stale(entry) + + run = self.db.scalar(select(AgentRun).where(AgentRun.run_id == agent_run_id)) + if run is None: + return not self._is_syncing_status_stale(entry) + if run.status != AgentRunStatus.RUNNING.value: + return False + + heartbeat_at = str((run.route_json or {}).get("heartbeat_at") or "").strip() + if heartbeat_at: + probe_entry = {"ingest_status_updated_at": heartbeat_at} + return not self._is_syncing_status_stale(probe_entry) + + return not self._is_syncing_status_stale(entry) def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]: for entry in index["documents"]: diff --git a/server/src/app/services/llm_wiki.py b/server/src/app/services/llm_wiki.py index c4e04b5..549307e 100644 --- a/server/src/app/services/llm_wiki.py +++ b/server/src/app/services/llm_wiki.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from decimal import Decimal from pathlib import Path -from typing import Any, Literal +from typing import Any, Callable, Literal from uuid import uuid4 from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator @@ -47,7 +47,8 @@ from app.services.system_hermes import SystemHermesService logger = get_logger("app.services.llm_wiki") HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS = 10 -HERMES_CANDIDATE_GROUP_SIZE = 3 +HERMES_CANDIDATE_GROUP_SIZE = 2 +HERMES_CANDIDATE_CONTENT_LIMIT = 520 LOW_SIGNAL_DOTTED_LINE_PATTERN = re.compile(r"[..。·•]{6,}\s*[0-9]{0,3}$") PAGE_FOOTER_PATTERN = re.compile(r"^第\s*\d+\s*页\s*共\s*\d+\s*页$") POLICY_SUBSTANCE_KEYWORDS = ( @@ -412,6 +413,8 @@ class LlmWikiService: current_user: CurrentUserContext, document_ids: list[str] | None = None, force: bool = False, + agent_run_id: str | None = None, + progress_callback: Callable[[dict[str, Any], str], None] | None = None, ) -> LlmWikiSyncRead: self.knowledge_service.ensure_library_ready() documents = self.knowledge_service.list_folder_documents(folder=folder) @@ -427,6 +430,7 @@ class LlmWikiService: self.knowledge_service.set_document_ingest_statuses( target_document_ids, status_code=KNOWLEDGE_INGEST_STATUS_SYNCING, + agent_run_id=agent_run_id, ) try: @@ -441,9 +445,27 @@ class LlmWikiService: rule_candidate_count = 0 generated_rule_asset_ids: list[str] = [] changed_document_count = 0 + skipped_document_count = 0 sync_summaries: list[str] = [] + failed_document_ids: list[str] = [] + total_documents = len(documents) - for entry in documents: + self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": 0, + "failed_documents": 0, + "skipped_documents": 0, + "percent": 0, + }, + }, + f"Hermes 已开始归纳,待处理文档 {total_documents} 个。", + ) + + for index_value, entry in enumerate(documents, start=1): document_id = str(entry.get("id") or "").strip() if not document_id: continue @@ -451,15 +473,85 @@ class LlmWikiService: existing = existing_by_id.get(document_id) sync_reason = self._resolve_sync_reason(entry=entry, existing=existing, force=force) if sync_reason == "unchanged_skipped": + skipped_document_count += 1 sync_summaries.append(f"{entry['original_name']}:未变化,跳过。") + self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": changed_document_count, + "failed_documents": len(failed_document_ids), + "skipped_documents": skipped_document_count, + "current_document_index": index_value, + "current_document_id": document_id, + "current_document_name": entry["original_name"], + "current_stage": "skipped", + "percent": self._calculate_progress_percent( + completed_documents=changed_document_count, + skipped_documents=skipped_document_count, + total_documents=total_documents, + ), + }, + }, + f"《{entry['original_name']}》未变化,跳过本次归纳。", + ) continue + self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": changed_document_count, + "failed_documents": len(failed_document_ids), + "skipped_documents": skipped_document_count, + "current_document_index": index_value, + "current_document_id": document_id, + "current_document_name": entry["original_name"], + "current_stage": "document_started", + "percent": self._calculate_progress_percent( + completed_documents=changed_document_count, + skipped_documents=skipped_document_count, + total_documents=total_documents, + ), + }, + }, + f"Hermes 正在归纳《{entry['original_name']}》。", + ) + changed_document_count += 1 document_payload = self._sync_single_document( entry=entry, folder=folder, current_user=current_user, sync_reason=sync_reason, + progress_callback=lambda payload, summary, *, document_id=document_id, document_name=entry["original_name"], document_index=index_value: self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": max(changed_document_count - 1, 0), + "failed_documents": len(failed_document_ids), + "skipped_documents": skipped_document_count, + "current_document_index": document_index, + "current_document_id": document_id, + "current_document_name": document_name, + **payload, + "percent": self._calculate_progress_percent( + completed_documents=max(changed_document_count - 1, 0), + skipped_documents=skipped_document_count, + total_documents=total_documents, + group_count=int(payload.get("group_count") or 0), + current_group_index=int(payload.get("current_group_index") or 0), + ), + }, + }, + summary, + ), ) existing_by_id[document_id] = document_payload["document"] knowledge_candidate_count += len(document_payload["knowledge_candidates"]) @@ -471,13 +563,47 @@ class LlmWikiService: if str(item.get("generated_asset_id") or "").strip() ] ) + if document_payload["document"].get("quality_status") in {"fallback_only", "runtime_only", "failed"}: + failed_document_ids.append(document_id) sync_summaries.append( f"{entry['original_name']}:{sync_reason},知识候选 {len(document_payload['knowledge_candidates'])} 条," - f"规则候选 {len(document_payload['rule_candidates'])} 条。" + f"规则候选 {len(document_payload['rule_candidates'])} 条," + f"归纳质量 {document_payload['document'].get('quality_status') or 'formal'}。" + ) + self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": changed_document_count, + "failed_documents": len(failed_document_ids), + "skipped_documents": skipped_document_count, + "current_document_index": index_value, + "current_document_id": document_id, + "current_document_name": entry["original_name"], + "current_stage": "document_completed", + "knowledge_candidate_count": len(document_payload["knowledge_candidates"]), + "rule_candidate_count": len(document_payload["rule_candidates"]), + "quality_status": document_payload["document"].get("quality_status") or "formal", + "percent": self._calculate_progress_percent( + completed_documents=changed_document_count, + skipped_documents=skipped_document_count, + total_documents=total_documents, + ), + }, + }, + f"《{entry['original_name']}》归纳完成,质量状态为 {document_payload['document'].get('quality_status') or 'formal'}。", ) index["documents"] = list(existing_by_id.values()) self._write_json_file(self.knowledge_service.llm_wiki_index_path, index) + if failed_document_ids: + self.knowledge_service.set_document_ingest_statuses( + failed_document_ids, + status_code=KNOWLEDGE_INGEST_STATUS_FAILED, + agent_run_id=agent_run_id, + ) sync_runs.setdefault("runs", []) sync_runs["runs"].append( @@ -502,6 +628,22 @@ class LlmWikiService: generated_rule_ids = list(dict.fromkeys(generated_rule_asset_ids)) summary = ";".join(sync_summaries) if sync_summaries else "未发现需要同步的知识文档。" + self._emit_progress( + progress_callback, + { + "phase": "running", + "progress": { + "total_documents": total_documents, + "completed_documents": changed_document_count, + "failed_documents": len(failed_document_ids), + "skipped_documents": skipped_document_count, + "knowledge_candidate_count": knowledge_candidate_count, + "rule_candidate_count": rule_candidate_count, + "percent": 100, + }, + }, + summary, + ) return LlmWikiSyncRead( ok=True, run_id=run_id, @@ -518,6 +660,7 @@ class LlmWikiService: self.knowledge_service.set_document_ingest_statuses( target_document_ids, status_code=KNOWLEDGE_INGEST_STATUS_FAILED, + agent_run_id=agent_run_id, ) raise @@ -528,6 +671,7 @@ class LlmWikiService: folder: str, current_user: CurrentUserContext, sync_reason: str, + progress_callback: Callable[[dict[str, Any], str], None] | None = None, ) -> dict[str, Any]: document_id = str(entry["id"]) document_name = str(entry["original_name"]) @@ -539,10 +683,19 @@ class LlmWikiService: text_path.write_text(extracted_text, encoding="utf-8") chunks = self._build_chunks(document_id=document_id, text=extracted_text) - knowledge_candidates, rule_candidates = self._extract_candidates( + self._emit_progress( + progress_callback, + { + "current_stage": "text_extracted", + "chunk_count": len(chunks), + }, + f"《{document_name}》文本提取完成,共形成 {len(chunks)} 个分块。", + ) + knowledge_candidates, rule_candidates, extraction_stats = self._extract_candidates( entry=entry, chunks=chunks, current_user=current_user, + progress_callback=progress_callback, ) generated_candidates: list[dict[str, Any]] = [] @@ -563,8 +716,17 @@ class LlmWikiService: "checksum": str(entry.get("sha256") or ""), "extracted_text_path": str(text_path), "chunk_count": len(chunks), + "candidate_chunk_count": extraction_stats.candidate_chunk_count, + "filtered_chunk_count": extraction_stats.filtered_chunk_count, + "group_count": extraction_stats.group_count, + "successful_group_count": extraction_stats.successful_group_count, + "failed_group_count": extraction_stats.failed_group_count, "knowledge_candidate_count": len(knowledge_candidates), + "formal_knowledge_candidate_count": extraction_stats.formal_knowledge_candidate_count, + "fallback_knowledge_candidate_count": extraction_stats.fallback_knowledge_candidate_count, "rule_candidate_count": len(generated_candidates), + "quality_status": extraction_stats.quality_status, + "quality_note": extraction_stats.quality_note, "updated_at": datetime.now(UTC).isoformat(), "signature": self._build_document_signature(entry), "sync_reason": sync_reason, @@ -593,49 +755,147 @@ class LlmWikiService: entry: dict[str, Any], chunks: list[dict[str, Any]], current_user: CurrentUserContext, - ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + progress_callback: Callable[[dict[str, Any], str], None] | None = None, + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], CandidateExtractionStats]: + stats = CandidateExtractionStats(raw_chunk_count=len(chunks)) if not chunks: - return [], [] + stats.quality_status = "failed" + stats.quality_note = "文档未提取到可用分块,无法形成 LLM Wiki。" + return [], [], stats + + candidate_chunks = self._select_candidate_chunks(chunks) + stats.candidate_chunk_count = len(candidate_chunks) + stats.filtered_chunk_count = max(0, len(chunks) - len(candidate_chunks)) + if not candidate_chunks: + stats.quality_status = "failed" + stats.quality_note = "正文条款分块为空,当前仅识别到封面、目录或低信息量内容,未形成正式归纳。" + return [], [], stats + + projected_group_count = len(self._group_chunks(candidate_chunks, size=HERMES_CANDIDATE_GROUP_SIZE)) + self._emit_progress( + progress_callback, + { + "current_stage": "candidate_chunks_selected", + "candidate_chunk_count": stats.candidate_chunk_count, + "filtered_chunk_count": stats.filtered_chunk_count, + "group_count": projected_group_count, + "current_group_index": 0, + "successful_group_count": 0, + "failed_group_count": 0, + }, + f"《{entry['original_name']}》已筛出 {stats.candidate_chunk_count} 个有效正文分块,准备分 {projected_group_count} 组归纳。", + ) knowledge_candidates: list[dict[str, Any]] = [] rule_candidates: list[dict[str, Any]] = [] seen_knowledge_keys: set[str] = set() seen_rule_keys: set[str] = set() - for chunk_group in self._group_chunks(chunks, size=4): - payload = self._call_candidate_model(entry=entry, chunk_group=chunk_group) + for chunk_group in self._group_chunks(candidate_chunks, size=HERMES_CANDIDATE_GROUP_SIZE): + stats.group_count += 1 + attempt = self._call_candidate_model(entry=entry, chunk_group=chunk_group) + if isinstance(attempt, dict): + attempt = CandidateModelAttempt(payload=attempt, source="hermes", ok=True) + if not attempt.ok: + stats.failed_group_count += 1 + self._emit_progress( + progress_callback, + { + "current_stage": "extracting_candidates", + "group_count": projected_group_count, + "current_group_index": stats.group_count, + "successful_group_count": stats.successful_group_count, + "failed_group_count": stats.failed_group_count, + }, + f"《{entry['original_name']}》第 {stats.group_count}/{projected_group_count} 组归纳失败,继续处理下一组。", + ) + continue + + stats.successful_group_count += 1 batch_knowledge = self._normalize_knowledge_candidates( - raw_items=list(payload.get("knowledge_candidates") or []), + raw_items=list(attempt.payload.get("knowledge_candidates") or []), entry=entry, chunk_group=chunk_group, seen_keys=seen_knowledge_keys, + extraction_mode=attempt.source, ) - batch_rules = self._normalize_rule_candidates( - raw_items=list(payload.get("rule_candidates") or []), - entry=entry, - chunk_group=chunk_group, - current_user=current_user, - seen_keys=seen_rule_keys, - ) + batch_rules: list[dict[str, Any]] = [] + if attempt.source == "hermes": + batch_rules = self._normalize_rule_candidates( + raw_items=list(attempt.payload.get("rule_candidates") or []), + entry=entry, + chunk_group=chunk_group, + current_user=current_user, + seen_keys=seen_rule_keys, + ) knowledge_candidates.extend(batch_knowledge) rule_candidates.extend(batch_rules) + self._emit_progress( + progress_callback, + { + "current_stage": "extracting_candidates", + "group_count": projected_group_count, + "current_group_index": stats.group_count, + "successful_group_count": stats.successful_group_count, + "failed_group_count": stats.failed_group_count, + "knowledge_candidate_count": len(knowledge_candidates), + "rule_candidate_count": len(rule_candidates), + }, + f"《{entry['original_name']}》已完成第 {stats.group_count}/{projected_group_count} 组归纳。", + ) - if not knowledge_candidates: - fallback = self._build_fallback_knowledge_candidate(entry=entry, chunks=chunks) + formal_knowledge_candidate_count = sum( + 1 for item in knowledge_candidates if str(item.get("extraction_mode") or "hermes") == "hermes" + ) + if formal_knowledge_candidate_count <= 0: + fallback = self._build_fallback_knowledge_candidate( + entry=entry, + chunks=candidate_chunks, + reason=( + "Hermes 未能从正文条款中形成正式知识候选。当前结果仅为降级兜底预览,不能视为正式归纳。" + ), + ) if fallback is not None: knowledge_candidates.append(fallback) - return knowledge_candidates[:12], rule_candidates[:12] + truncated_knowledge_candidates = knowledge_candidates[:12] + truncated_rule_candidates = rule_candidates[:12] + stats.formal_knowledge_candidate_count = sum( + 1 for item in truncated_knowledge_candidates if str(item.get("extraction_mode") or "hermes") == "hermes" + ) + stats.fallback_knowledge_candidate_count = max( + 0, + len(truncated_knowledge_candidates) - stats.formal_knowledge_candidate_count, + ) + stats.quality_status, stats.quality_note = self._resolve_quality_status( + stats=stats, + knowledge_candidates=truncated_knowledge_candidates, + ) + self._emit_progress( + progress_callback, + { + "current_stage": "candidate_extraction_completed", + "group_count": projected_group_count, + "current_group_index": projected_group_count, + "successful_group_count": stats.successful_group_count, + "failed_group_count": stats.failed_group_count, + "knowledge_candidate_count": len(truncated_knowledge_candidates), + "formal_knowledge_candidate_count": stats.formal_knowledge_candidate_count, + "fallback_knowledge_candidate_count": stats.fallback_knowledge_candidate_count, + "rule_candidate_count": len(truncated_rule_candidates), + "quality_status": stats.quality_status, + }, + f"《{entry['original_name']}》候选提炼完成,质量状态为 {stats.quality_status}。", + ) + + return truncated_knowledge_candidates, truncated_rule_candidates, stats def _call_candidate_model( self, *, entry: dict[str, Any], chunk_group: list[dict[str, Any]], - ) -> dict[str, Any]: - if self._candidate_model_disabled: - return {} - + ) -> CandidateModelAttempt: facts = { "document_id": entry["id"], "document_name": entry["original_name"], @@ -653,7 +913,7 @@ class LlmWikiService: { "chunk_id": item["chunk_id"], "title": item["title"], - "content": item["content"][:900], + "content": item["content"][:HERMES_CANDIDATE_CONTENT_LIMIT], "source_page": item.get("source_page"), "tags": item.get("tags", []), } @@ -663,6 +923,8 @@ class LlmWikiService: system_prompt = ( "你是企业财务制度知识库的 Hermes 规则形成器。" "你只能基于提供的制度条款生成结构化知识候选和规则候选,不能自由发散。" + "封面、目录、通知、页眉页脚、密级说明、印发信息不属于知识候选,必须忽略。" + "只提炼具有执行意义、审核意义、报销约束意义的条款。" "规则候选必须从允许模板中选 template_key,严禁自创模板。" "runtime_rule 必须严格遵守 runtime_rule_contracts 中对应模板的字段结构和允许值。" "如果条款不适合自动规则化,可以只返回 knowledge_candidates。" @@ -675,6 +937,7 @@ class LlmWikiService: ) user_prompt = ( "请根据以下制度分块生成候选。" + "每组最多提炼 3 条高价值 knowledge_candidates,优先保留可直接供报销审核、附件校验、审批判断使用的知识。" "只返回 JSON 对象,不要输出解释,不要调用工具,不要追加任何其他文本。\n" f"{json.dumps(facts, ensure_ascii=False, indent=2)}" ) @@ -693,29 +956,44 @@ class LlmWikiService: timeout_seconds=HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS, ) payload = self._extract_json_payload(cli_result.response_text) - if payload: - return payload - self._candidate_model_disabled = True + if payload is not None: + return CandidateModelAttempt(payload=payload, source="hermes", ok=True) logger.warning( - "System Hermes returned no parseable JSON for LLM Wiki doc=%s; using fallback candidates.", + "System Hermes returned no parseable JSON for LLM Wiki doc=%s chunk_group=%s.", entry.get("id"), + ",".join(item.get("chunk_id", "") for item in chunk_group), + ) + return CandidateModelAttempt( + payload={}, + source="hermes", + ok=False, + failure_reason="system_hermes_no_json", ) - return {} except TimeoutExpired: - self._candidate_model_disabled = True logger.warning( - "System Hermes timed out during LLM Wiki candidate extraction doc=%s; using fallback candidates.", + "System Hermes timed out during LLM Wiki candidate extraction doc=%s chunk_group=%s.", entry.get("id"), + ",".join(item.get("chunk_id", "") for item in chunk_group), + ) + return CandidateModelAttempt( + payload={}, + source="hermes", + ok=False, + failure_reason="system_hermes_timeout", ) - return {} except Exception as exc: - self._candidate_model_disabled = True logger.warning( - "System Hermes failed during LLM Wiki candidate extraction doc=%s: %s", + "System Hermes failed during LLM Wiki candidate extraction doc=%s chunk_group=%s: %s", entry.get("id"), + ",".join(item.get("chunk_id", "") for item in chunk_group), exc, ) - return {} + return CandidateModelAttempt( + payload={}, + source="hermes", + ok=False, + failure_reason=str(exc) or "system_hermes_failed", + ) response_text = self.runtime_chat_service.complete( [ @@ -727,10 +1005,129 @@ class LlmWikiService: temperature=0.0, ) payload = self._extract_json_payload(response_text) - if not payload: - self._candidate_model_disabled = True - return {} - return payload + if payload is None: + return CandidateModelAttempt( + payload={}, + source="runtime", + ok=False, + failure_reason="runtime_no_json", + ) + return CandidateModelAttempt( + payload=payload, + source="runtime", + ok=True, + failure_reason="system_hermes_unavailable", + ) + + def _select_candidate_chunks(self, chunks: list[dict[str, Any]]) -> list[dict[str, Any]]: + selected: list[dict[str, Any]] = [] + for chunk in chunks: + if self._is_low_signal_chunk(chunk): + continue + selected.append(chunk) + return selected + + def _is_low_signal_chunk(self, chunk: dict[str, Any]) -> bool: + title = str(chunk.get("title") or "").strip() + content = str(chunk.get("content") or "").strip() + page = int(chunk.get("source_page") or 0) + if not content: + return True + if self._looks_like_table_of_contents(title=title, content=content): + return True + if self._looks_like_cover_notice(title=title, content=content, source_page=page): + return True + + compact_content = re.sub(r"\s+", "", content) + if len(compact_content) < 24 and not self._has_policy_substance(content): + return True + if title.startswith("附件") and len(compact_content) < 40: + return True + return False + + def _is_low_signal_candidate(self, *, title: str, content: str) -> bool: + compact_content = re.sub(r"\s+", "", content) + if len(compact_content) < 24 and not self._has_policy_substance(content): + return True + if self._looks_like_table_of_contents(title=title, content=content): + return True + if self._looks_like_cover_notice(title=title, content=content, source_page=0): + return True + return False + + @staticmethod + def _has_policy_substance(text: str) -> bool: + sample = str(text or "") + return any(keyword in sample for keyword in POLICY_SUBSTANCE_KEYWORDS) + + @staticmethod + def _looks_like_table_of_contents(*, title: str, content: str) -> bool: + title_text = str(title or "").strip() + content_text = str(content or "").strip() + if title_text == "目录" or content_text == "目录": + return True + + lines = [line.strip() for line in content_text.splitlines() if line.strip()] + if lines and sum(1 for line in lines if LOW_SIGNAL_DOTTED_LINE_PATTERN.search(line)) >= max(2, len(lines) // 2): + return True + + if LOW_SIGNAL_DOTTED_LINE_PATTERN.search(content_text) and "第" in title_text: + return True + return False + + def _looks_like_cover_notice(self, *, title: str, content: str, source_page: int) -> bool: + text = f"{title}\n{content}" + if PAGE_FOOTER_PATTERN.fullmatch(str(content or "").strip()): + return True + cover_keywords = ("关于颁布", "特此通知", "印发", "商密", "制度〔", "有限公司文件", "通知") + if source_page == 1 and any(keyword in text for keyword in cover_keywords): + return True + if source_page > 2: + return False + if any(keyword in text for keyword in cover_keywords): + if "必须" not in text and "应当" not in text and "不得" not in text: + return True + return False + + def _resolve_quality_status( + self, + *, + stats: CandidateExtractionStats, + knowledge_candidates: list[dict[str, Any]], + ) -> tuple[str, str]: + if stats.formal_knowledge_candidate_count <= 0: + runtime_count = sum( + 1 for item in knowledge_candidates if str(item.get("extraction_mode") or "") == "runtime" + ) + if runtime_count > 0: + return ( + "runtime_only", + "当前知识候选来自运行时模型而非系统 Hermes,仅供人工参考,不计入正式归纳。", + ) + if stats.fallback_knowledge_candidate_count > 0: + return ( + "fallback_only", + "Hermes 未形成正式知识候选,当前仅保留降级兜底预览,不能作为正式知识上线。", + ) + return ( + "failed", + "Hermes 未能从当前文档提炼出可用知识候选,请调整文档内容或重新归纳。", + ) + + if stats.failed_group_count > 0: + return ( + "partial_degraded", + f"Hermes 成功处理 {stats.successful_group_count}/{stats.group_count} 个分组," + f"仍有 {stats.failed_group_count} 个分组未成功,请人工复核后再使用。", + ) + + if stats.filtered_chunk_count > 0: + return ( + "formal", + f"已自动过滤 {stats.filtered_chunk_count} 个封面、目录或低信息量分块,当前结果来自正文条款。", + ) + + return ("formal", "Hermes 已基于正文条款完成正式归纳。") def _normalize_knowledge_candidates( self, @@ -739,6 +1136,7 @@ class LlmWikiService: entry: dict[str, Any], chunk_group: list[dict[str, Any]], seen_keys: set[str], + extraction_mode: str, ) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] default_chunk_ids = [item["chunk_id"] for item in chunk_group] @@ -750,10 +1148,17 @@ class LlmWikiService: content = str(item.get("content") or "").strip() if not title or not content: continue + if self._is_low_signal_candidate(title=title, content=content): + continue candidate_key = f"{title.casefold()}::{content[:80].casefold()}" if candidate_key in seen_keys: continue seen_keys.add(candidate_key) + quality_flags: list[str] = [] + fallback_reason = "" + if extraction_mode != "hermes": + quality_flags.append("non_hermes_source") + fallback_reason = "当前知识候选不是由系统 Hermes 正式提炼,不能视为正式归纳。" normalized.append( { "candidate_id": f"kc_{uuid4().hex[:12]}", @@ -775,6 +1180,9 @@ class LlmWikiService: "status": "draft", "created_by": "hermes", "created_at": datetime.now(UTC).isoformat(), + "extraction_mode": extraction_mode, + "quality_flags": quality_flags, + "fallback_reason": fallback_reason, } ) return normalized @@ -1218,6 +1626,7 @@ class LlmWikiService: *, entry: dict[str, Any], chunks: list[dict[str, Any]], + reason: str, ) -> dict[str, Any] | None: first_chunk = next((item for item in chunks if str(item.get("content") or "").strip()), None) if first_chunk is None: @@ -1244,6 +1653,9 @@ class LlmWikiService: "status": "draft", "created_by": "hermes", "created_at": datetime.now(UTC).isoformat(), + "extraction_mode": "fallback", + "quality_flags": ["fallback_only", "not_formal_ingest"], + "fallback_reason": reason, } @staticmethod @@ -1266,10 +1678,38 @@ class LlmWikiService: f"- 来源文档:{document_name}", f"- 知识条目数:{len(knowledge_candidates)}", "", - "## 核心知识", - "", ] + quality_status = str(entry.get("quality_status") or "formal").strip() or "formal" + quality_note = str(entry.get("quality_note") or "").strip() + if quality_status != "formal": + lines.extend( + [ + "## 归纳状态", + "", + f"- 质量状态:{quality_status}", + f"- 说明:{quality_note or '当前结果不是正式 Hermes 归纳。'}", + "", + ] + ) + elif quality_note: + lines.extend( + [ + "## 归纳状态", + "", + f"- 质量状态:{quality_status}", + f"- 说明:{quality_note}", + "", + ] + ) + + lines.extend( + [ + "## 核心知识", + "", + ] + ) + if not knowledge_candidates: lines.extend( [ @@ -1624,6 +2064,10 @@ class LlmWikiService: def _clean_line(line: str) -> str: cleaned = str(line or "").replace("\u3000", " ").strip() cleaned = re.sub(r"\s+", " ", cleaned) + if PAGE_FOOTER_PATTERN.fullmatch(cleaned): + return "" + if cleaned in {"商密【中】", "商密【高】", "商密【低】"}: + return "" return cleaned @staticmethod @@ -1701,6 +2145,11 @@ class LlmWikiService: return "forced_rebuild" if existing is None: return "initial_build" + existing_quality_status = str(existing.get("quality_status") or "").strip() + if existing_quality_status and existing_quality_status != "formal": + return f"quality_{existing_quality_status}_rebuild" + if int(existing.get("formal_knowledge_candidate_count") or 0) <= 0: + return "formal_candidate_missing_rebuild" previous_signature = existing.get("signature") if not isinstance(previous_signature, dict): @@ -1723,6 +2172,35 @@ class LlmWikiService: return "unchanged_skipped" return ",".join(reasons) + @staticmethod + def _emit_progress( + progress_callback: Callable[[dict[str, Any], str], None] | None, + payload: dict[str, Any], + summary: str, + ) -> None: + if progress_callback is None: + return + progress_callback(payload, summary) + + @staticmethod + def _calculate_progress_percent( + *, + completed_documents: int, + skipped_documents: int, + total_documents: int, + group_count: int = 0, + current_group_index: int = 0, + ) -> int: + if total_documents <= 0: + return 100 + + completed_units = completed_documents + skipped_documents + if group_count > 0 and current_group_index > 0: + completed_units += min(current_group_index, group_count) / group_count + + percent = round((completed_units / total_documents) * 100) + return max(0, min(percent, 100)) + @staticmethod def _build_document_signature(entry: dict[str, Any]) -> dict[str, Any]: return { diff --git a/server/tests/test_llm_wiki_service.py b/server/tests/test_llm_wiki_service.py index 062044f..9c64b1f 100644 --- a/server/tests/test_llm_wiki_service.py +++ b/server/tests/test_llm_wiki_service.py @@ -16,7 +16,7 @@ from app.core.agent_enums import AgentReviewStatus, AgentRunSource, AgentRunStat from app.db.base import Base from app.main import create_app from app.schemas.agent_asset import AgentAssetReviewCreate -from app.schemas.knowledge import LlmWikiSummaryUpdateWrite, LlmWikiSyncRead +from app.schemas.knowledge import LlmWikiSummaryUpdateWrite from app.services.agent_assets import AgentAssetService from app.services.agent_runs import AgentRunService from app.services.knowledge import ( @@ -25,7 +25,7 @@ from app.services.knowledge import ( KNOWLEDGE_INGEST_STATUS_PUBLISHED, KnowledgeService, ) -from app.services.llm_wiki import LlmWikiService +from app.services.llm_wiki import CandidateModelAttempt, LlmWikiService def build_session() -> Session: @@ -86,6 +86,36 @@ def upload_policy_document(storage_root: Path, *, filename: str = "公司差旅 return document.id +def upload_multipage_policy_document(storage_root: Path, *, filename: str = "公司支出管理办法.txt") -> str: + service = KnowledgeService(storage_root=storage_root) + service.ensure_library_ready() + document = service.upload_document( + folder="报销制度", + filename=filename, + content=( + "商密【中】\n" + "关于颁布《公司支出管理办法》的通知\n" + "特此通知。\n" + "\f" + "目录\n" + "第一章 总则................................4\n" + "第二章 报销审批................................7\n" + "\f" + "第一条 报销申请\n" + "员工提交报销申请时,应附发票、行程单和审批说明。\n" + "第二条 报销审批\n" + "住宿费超过制度标准时,必须升级至总经理审批。\n" + "第三条 附件补充\n" + "缺少附件时不得提交报销。\n" + "\f" + "第四条 财务复核\n" + "财务复核时应校验预算、发票真伪和审批链完整性。\n" + ).encode("utf-8"), + current_user=build_admin_user(), + ) + return document.id + + def build_candidate_payload(chunk_id: str, *, summary: str = "住宿费超过标准时必须升级审批。") -> dict[str, object]: return { "knowledge_candidates": [ @@ -222,10 +252,14 @@ def test_llm_wiki_sync_creates_artifacts_and_draft_rule(tmp_path, monkeypatch) - document_payload = json.loads((document_dir / "document.json").read_text(encoding="utf-8")) assert document_payload["sync_reason"] == "initial_build" + assert document_payload["quality_status"] == "formal" + assert document_payload["formal_knowledge_candidate_count"] == 1 + assert document_payload["fallback_knowledge_candidate_count"] == 0 detail = service.get_document_detail(document_id) assert "公司差旅报销制度.txt 知识总结" in detail.knowledge_summary_markdown assert "住宿费升级审批要求" in detail.knowledge_summary_markdown + assert detail.quality_status == "formal" asset = AgentAssetService(db).get_asset(result.generated_rule_asset_ids[0]) assert asset is not None @@ -386,9 +420,91 @@ def test_llm_wiki_sync_uses_fallback_candidates_when_system_hermes_times_out( assert result.knowledge_candidate_count >= 1 assert runtime_called["count"] == 0 - detail = KnowledgeService(storage_root=tmp_path).get_document_detail(document_id) - assert detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED - assert detail.state == "已归纳" + knowledge_service = KnowledgeService(storage_root=tmp_path) + detail = knowledge_service.get_document_detail(document_id) + assert detail.stateCode == KNOWLEDGE_INGEST_STATUS_FAILED + assert detail.state == "归纳失败" + assert detail.llmWikiAvailable is True + assert detail.llmWikiQualityStatus == "fallback_only" + + document_payload = json.loads( + ( + tmp_path + / "knowledge" + / ".llm_wiki" + / "documents" + / document_id + / "document.json" + ).read_text(encoding="utf-8") + ) + assert document_payload["quality_status"] == "fallback_only" + assert document_payload["formal_knowledge_candidate_count"] == 0 + assert document_payload["fallback_knowledge_candidate_count"] == 1 + + candidates_payload = json.loads( + ( + tmp_path + / "knowledge" + / ".llm_wiki" + / "documents" + / document_id + / "knowledge_candidates.json" + ).read_text(encoding="utf-8") + ) + assert candidates_payload[0]["extraction_mode"] == "fallback" + assert "fallback_only" in candidates_payload[0]["quality_flags"] + + +def test_llm_wiki_sync_continues_after_single_group_failure(tmp_path, monkeypatch) -> None: + document_id = upload_multipage_policy_document(tmp_path, filename="多页支出制度.txt") + call_count = {"count": 0} + + def fake_call_candidate_model(self, *, entry, chunk_group): + call_count["count"] += 1 + if call_count["count"] == 1: + return CandidateModelAttempt( + payload={}, + source="hermes", + ok=False, + failure_reason="simulated_timeout", + ) + return build_candidate_payload(chunk_group[0]["chunk_id"]) + + monkeypatch.setattr(LlmWikiService, "_call_candidate_model", fake_call_candidate_model) + + with build_session() as db: + service = LlmWikiService(db, storage_root=tmp_path) + result = service.sync_folder( + folder="报销制度", + current_user=build_admin_user(), + document_ids=[document_id], + ) + detail = service.get_document_detail(document_id) + + assert result.document_count == 1 + assert call_count["count"] >= 2 + assert detail.quality_status == "partial_degraded" + assert detail.successful_group_count >= 1 + assert detail.failed_group_count >= 1 + assert detail.formal_knowledge_candidate_count >= 1 + + knowledge_detail = KnowledgeService(storage_root=tmp_path).get_document_detail(document_id) + assert knowledge_detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED + assert knowledge_detail.llmWikiQualityStatus == "partial_degraded" + + +def test_llm_wiki_filters_cover_and_catalog_chunks_before_candidate_extraction(tmp_path) -> None: + document_id = upload_multipage_policy_document(tmp_path, filename="封面目录过滤测试.txt") + + with build_session() as db: + service = LlmWikiService(db, storage_root=tmp_path) + text = service.knowledge_service.extract_document_text(document_id) + chunks = service._build_chunks(document_id=document_id, text=text) + candidate_chunks = service._select_candidate_chunks(chunks) + + assert len(chunks) > len(candidate_chunks) + assert candidate_chunks + assert all(int(item.get("source_page") or 0) >= 3 for item in candidate_chunks) def test_llm_wiki_sync_skips_unchanged_and_rebuilds_on_updated_at_change(tmp_path, monkeypatch) -> None: @@ -475,22 +591,46 @@ def test_llm_wiki_sync_does_not_overwrite_active_rule(tmp_path, monkeypatch) -> def test_llm_wiki_sync_endpoint_records_agent_run(monkeypatch) -> None: - def fake_sync_folder(self, *, folder="报销制度", current_user, document_ids=None, force=False): - return LlmWikiSyncRead( - ok=True, - run_id="wiki_test_sync", - folder=folder, - document_count=1, - knowledge_candidate_count=2, - rule_candidate_count=1, - generated_rule_count=1, - generated_rule_asset_ids=["asset-rule-1"], - summary="已完成 Hermes LLM Wiki 同步。", - ) - - monkeypatch.setattr(LlmWikiService, "sync_folder", fake_sync_folder) - client, session_factory = build_client() + + def fake_submit_sync(*, agent_run_id, folder, current_user, document_ids=None, force=False): + with session_factory() as db: + service = AgentRunService(db) + service.record_tool_call( + run_id=agent_run_id, + tool_type="llm", + tool_name="system_hermes_llm_wiki_sync", + request_json={ + "folder": folder, + "document_ids": list(document_ids or []), + "force": force, + }, + response_json={"run_id": "wiki_test_sync"}, + status="succeeded", + duration_ms=0, + ) + service.merge_route_json( + agent_run_id, + { + "phase": "succeeded", + "sync_run_id": "wiki_test_sync", + "progress": { + "total_documents": len(document_ids or []), + "completed_documents": len(document_ids or []), + "failed_documents": 0, + "skipped_documents": 0, + "percent": 100, + }, + }, + status=AgentRunStatus.SUCCEEDED.value, + result_summary="已完成 Hermes LLM Wiki 同步。", + ) + + monkeypatch.setattr( + "app.services.llm_wiki_tasks.llm_wiki_task_manager.submit_sync", + fake_submit_sync, + ) + with session_factory() as db: before_count = len(AgentRunService(db).list_runs(limit=100)) @@ -506,8 +646,8 @@ def test_llm_wiki_sync_endpoint_records_agent_run(monkeypatch) -> None: assert response.status_code == 200 payload = response.json() - assert payload["run_id"] == "wiki_test_sync" - assert payload["generated_rule_count"] == 1 + assert payload["agent_run_id"].startswith("run_") + assert payload["status"] == AgentRunStatus.RUNNING.value with session_factory() as db: service = AgentRunService(db) @@ -521,4 +661,4 @@ def test_llm_wiki_sync_endpoint_records_agent_run(monkeypatch) -> None: assert latest_run.tool_calls assert latest_run.tool_calls[0].tool_name == "system_hermes_llm_wiki_sync" assert latest_run.tool_calls[0].status == "succeeded" - assert latest_run.tool_calls[0].response_json["run_id"] == "wiki_test_sync" + assert latest_run.route_json["sync_run_id"] == "wiki_test_sync"