diff --git a/server/src/app/services/llm_wiki.py b/server/src/app/services/llm_wiki.py new file mode 100644 index 0000000..f761715 --- /dev/null +++ b/server/src/app/services/llm_wiki.py @@ -0,0 +1,1757 @@ +from __future__ import annotations + +import hashlib +import json +import re +from subprocess import TimeoutExpired +from dataclasses import dataclass +from datetime import UTC, datetime +from decimal import Decimal +from pathlib import Path +from typing import Any, Literal +from uuid import uuid4 + +from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.api.deps import CurrentUserContext +from app.core.agent_enums import ( + AgentAssetContentType, + AgentAssetDomain, + AgentAssetStatus, + AgentAssetType, +) +from app.core.logging import get_logger +from app.models.agent_asset import AgentAsset +from app.schemas.agent_asset import AgentAssetCreate, AgentAssetUpdate, AgentAssetVersionCreate +from app.schemas.knowledge import ( + LlmWikiChunkRead, + LlmWikiDocumentDetailRead, + LlmWikiDocumentRead, + LlmWikiIndexRead, + LlmWikiKnowledgeCandidateRead, + LlmWikiRuleCandidateRead, + LlmWikiSummaryUpdateWrite, + LlmWikiSyncRead, +) +from app.services.agent_assets import AgentAssetService +from app.services.knowledge import ( + KNOWLEDGE_INGEST_STATUS_FAILED, + KNOWLEDGE_INGEST_STATUS_SYNCING, + KnowledgeService, +) +from app.services.runtime_chat import RuntimeChatService +from app.services.system_hermes import SystemHermesService + +logger = get_logger("app.services.llm_wiki") + +HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS = 10 +HERMES_CANDIDATE_GROUP_SIZE = 3 +LOW_SIGNAL_DOTTED_LINE_PATTERN = re.compile(r"[..。·•]{6,}\s*[0-9]{0,3}$") +PAGE_FOOTER_PATTERN = re.compile(r"^第\s*\d+\s*页\s*共\s*\d+\s*页$") +POLICY_SUBSTANCE_KEYWORDS = ( + "报销", + "审批", + "标准", + "费用", + "支出", + "单据", + "发票", + "附件", + "预算", + "财务", + "差旅", + "住宿", + "交通", + "借款", + "付款", + "申请", + "审核", + 
# Section headings in policy text: "第…章/节/条" followed by up to 40 more
# characters, "附件 <number> …", or a bare "目录" line.
HEADING_PATTERN = re.compile(
    r"^(第[一二三四五六七八九十百零〇0-9]+[章节条].{0,40}|附件\s*[0-9一二三四五六七八九十]+.*|目录)$"
)
# A Markdown code fence (optionally tagged "json") wrapping one JSON object;
# group 1 is the object text.
JSON_FENCE_PATTERN = re.compile(r"```(?:json)?\s*(\{.*\})\s*```", re.DOTALL)
# A model "reasoning" block delimited by <think>…</think> (any letter case,
# may span lines). The previous pattern, r".*?", could only ever match the
# empty string, so substituting it away never removed anything and the
# IGNORECASE flag had no effect.
# NOTE(review): the consumer of this pattern is outside the visible chunk —
# confirm it is used with .sub("", text) before JSON parsing.
THINK_PATTERN = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
class TravelStandardDraftRule(StrictTemplateModel):
    """Draft runtime rule for the ``travel_standard_v1`` template.

    Validation fails unless at least one control point is configured.
    """

    # Fixed discriminator / bookkeeping fields.
    kind: Literal["policy_rule_draft"] = "policy_rule_draft"
    version: int = 1
    template_key: Literal["travel_standard_v1"]
    # Descriptive fields.
    rule_name: str = Field(min_length=1, max_length=200)
    scenario: str = Field(min_length=1, max_length=80)
    source_document_name: str = Field(min_length=1, max_length=255)
    review_required: bool = True
    # Rule content.
    target: DraftRuleTarget = Field(default_factory=DraftRuleTarget)
    control_points: list[TravelControlPoint] = Field(default_factory=list)
    exception_policy: DraftExceptionPolicy = Field(default_factory=DraftExceptionPolicy)
    output: DraftRuleOutput

    @model_validator(mode="after")
    def validate_control_points(self) -> "TravelStandardDraftRule":
        """Reject a draft that declares no control points."""
        if self.control_points:
            return self
        raise ValueError("travel_standard_v1 必须至少包含一个 control_points 配置。")
class AmountLimitThreshold(StrictTemplateModel):
    """Warn/block amount thresholds of an amount-limit draft rule.

    Both amounts are optional. Omitting both is only accepted when
    ``source`` is ``"manual_fill_required"``.
    """

    currency: str = Field(default="CNY", min_length=1, max_length=10)
    comparator: Literal["gt", "gte"] = "gt"
    warn_amount: Decimal | None = None
    block_amount: Decimal | None = None
    source: Literal["document_value", "manual_fill_required"] = "manual_fill_required"

    @model_validator(mode="after")
    def validate_threshold(self) -> "AmountLimitThreshold":
        """Check presence, sign and ordering of the two amounts."""
        warn, block = self.warn_amount, self.block_amount
        zero = Decimal("0")

        if warn is None and block is None:
            if self.source != "manual_fill_required":
                raise ValueError("金额阈值未提供,source 不是 manual_fill_required 时必须给出 warn_amount 或 block_amount。")
            # Nothing left to compare when neither amount is present.
            return self

        if warn is not None and warn < zero:
            raise ValueError("warn_amount 不能为负数。")
        if block is not None and block < zero:
            raise ValueError("block_amount 不能为负数。")
        if warn is not None and block is not None and block < warn:
            raise ValueError("block_amount 不能小于 warn_amount。")
        return self
class AttachmentRequirementSet(StrictTemplateModel):
    """Attachment requirements of an attachment draft rule.

    Either at least one item is declared, or ``manual_fill_required`` is set
    to mark that the details still need to be filled in.
    """

    min_attachment_count: int = Field(default=1, ge=1)
    items: list[AttachmentRequirementItem] = Field(default_factory=list)
    manual_fill_required: bool = False

    @model_validator(mode="after")
    def validate_items(self) -> "AttachmentRequirementSet":
        """Reject an empty item list unless manual fill is flagged."""
        if self.items or self.manual_fill_required:
            return self
        raise ValueError("附件要求模板必须至少声明一个附件要求,或显式标记 manual_fill_required。")
    def __init__(self, db: Session, *, storage_root: Path | None = None) -> None:
        """Create the service and the collaborators it delegates to.

        Args:
            db: Session stored on the instance and handed to the runtime chat
                service and the agent asset service.
            storage_root: Optional root path forwarded to ``KnowledgeService``.
        """
        self.db = db
        self.knowledge_service = KnowledgeService(storage_root=storage_root)
        self.runtime_chat_service = RuntimeChatService(db)
        self.asset_service = AgentAssetService(db)
        self.system_hermes_service = SystemHermesService()
        # Starts enabled. `_call_candidate_model` sets this to True after a
        # failed or unparseable model response and from then on returns `{}`
        # without calling any model.
        self._candidate_model_disabled = False
[]), + key=lambda value: str(value.get("updated_at") or ""), + reverse=True, + ) + ] + return LlmWikiIndexRead(documents=documents, sync_run_count=len(sync_runs.get("runs", []))) + + def get_document_detail(self, document_id: str) -> LlmWikiDocumentDetailRead: + self.knowledge_service.ensure_library_ready() + document_dir = self._document_dir(document_id) + document_payload = self._load_json_file(document_dir / "document.json", default={}) + if not document_payload: + raise FileNotFoundError(document_id) + + chunks = self._load_json_file(document_dir / "chunks.json", default=[]) + knowledge_candidates = self._load_json_file( + document_dir / "knowledge_candidates.json", + default=[], + ) + rule_candidates = self._load_json_file( + document_dir / "rule_candidates.json", + default=[], + ) + summary_path = document_dir / "knowledge_summary.md" + knowledge_summary_markdown = summary_path.read_text(encoding="utf-8") if summary_path.exists() else "" + if not knowledge_summary_markdown.strip(): + knowledge_summary_markdown = self._build_knowledge_summary_markdown( + entry=document_payload, + knowledge_candidates=list(knowledge_candidates), + ) + return LlmWikiDocumentDetailRead( + **LlmWikiDocumentRead.model_validate(document_payload).model_dump(), + knowledge_summary_markdown=knowledge_summary_markdown, + chunks=[LlmWikiChunkRead.model_validate(item) for item in chunks], + knowledge_candidates=[ + LlmWikiKnowledgeCandidateRead.model_validate(item) + for item in knowledge_candidates + ], + rule_candidates=[ + LlmWikiRuleCandidateRead.model_validate(item) + for item in rule_candidates + ], + ) + + def update_document_summary( + self, + document_id: str, + payload: LlmWikiSummaryUpdateWrite, + ) -> LlmWikiDocumentDetailRead: + self.knowledge_service.ensure_library_ready() + document_dir = self._document_dir(document_id) + document_payload = self._load_json_file(document_dir / "document.json", default={}) + if not document_payload: + raise FileNotFoundError(document_id) + + 
summary_text = str(payload.knowledge_summary_markdown or "").strip() + if not summary_text: + raise ValueError("知识总结不能为空。") + + (document_dir / "knowledge_summary.md").write_text(summary_text, encoding="utf-8") + return self.get_document_detail(document_id) + + def sync_folder( + self, + *, + folder: str = "报销制度", + current_user: CurrentUserContext, + document_ids: list[str] | None = None, + force: bool = False, + ) -> LlmWikiSyncRead: + self.knowledge_service.ensure_library_ready() + documents = self.knowledge_service.list_folder_documents(folder=folder) + if document_ids: + allowed_ids = set(document_ids) + documents = [item for item in documents if item.get("id") in allowed_ids] + target_document_ids = [ + str(item.get("id") or "").strip() + for item in documents + if str(item.get("id") or "").strip() + ] + if target_document_ids: + self.knowledge_service.set_document_ingest_statuses( + target_document_ids, + status_code=KNOWLEDGE_INGEST_STATUS_SYNCING, + ) + + try: + index = self._load_wiki_index() + sync_runs = self._load_sync_runs() + existing_by_id = { + str(item.get("document_id") or ""): item for item in list(index.get("documents", [])) + } + + run_id = f"wiki_{uuid4().hex[:12]}" + knowledge_candidate_count = 0 + rule_candidate_count = 0 + generated_rule_asset_ids: list[str] = [] + changed_document_count = 0 + sync_summaries: list[str] = [] + + for entry in documents: + document_id = str(entry.get("id") or "").strip() + if not document_id: + continue + + existing = existing_by_id.get(document_id) + sync_reason = self._resolve_sync_reason(entry=entry, existing=existing, force=force) + if sync_reason == "unchanged_skipped": + sync_summaries.append(f"{entry['original_name']}:未变化,跳过。") + continue + + changed_document_count += 1 + document_payload = self._sync_single_document( + entry=entry, + folder=folder, + current_user=current_user, + sync_reason=sync_reason, + ) + existing_by_id[document_id] = document_payload["document"] + knowledge_candidate_count += 
len(document_payload["knowledge_candidates"]) + rule_candidate_count += len(document_payload["rule_candidates"]) + generated_rule_asset_ids.extend( + [ + str(item.get("generated_asset_id") or "").strip() + for item in document_payload["rule_candidates"] + if str(item.get("generated_asset_id") or "").strip() + ] + ) + sync_summaries.append( + f"{entry['original_name']}:{sync_reason},知识候选 {len(document_payload['knowledge_candidates'])} 条," + f"规则候选 {len(document_payload['rule_candidates'])} 条。" + ) + + index["documents"] = list(existing_by_id.values()) + self._write_json_file(self.knowledge_service.llm_wiki_index_path, index) + + sync_runs.setdefault("runs", []) + sync_runs["runs"].append( + { + "run_id": run_id, + "folder": folder, + "requested_document_ids": document_ids or [], + "changed_document_count": changed_document_count, + "knowledge_candidate_count": knowledge_candidate_count, + "rule_candidate_count": rule_candidate_count, + "generated_rule_asset_ids": generated_rule_asset_ids, + "created_by": current_user.name, + "created_at": datetime.now(UTC).isoformat(), + "summary": sync_summaries, + } + ) + self._write_json_file(self.knowledge_service.llm_wiki_sync_runs_path, sync_runs) + self.knowledge_service.refresh_document_ingest_statuses( + document_ids=target_document_ids, + preserve_syncing=False, + ) + + generated_rule_ids = list(dict.fromkeys(generated_rule_asset_ids)) + summary = ";".join(sync_summaries) if sync_summaries else "未发现需要同步的知识文档。" + return LlmWikiSyncRead( + ok=True, + run_id=run_id, + folder=folder, + document_count=changed_document_count, + knowledge_candidate_count=knowledge_candidate_count, + rule_candidate_count=rule_candidate_count, + generated_rule_count=len(generated_rule_ids), + generated_rule_asset_ids=generated_rule_ids, + summary=summary, + ) + except Exception: + if target_document_ids: + self.knowledge_service.set_document_ingest_statuses( + target_document_ids, + status_code=KNOWLEDGE_INGEST_STATUS_FAILED, + ) + raise + + def 
_sync_single_document( + self, + *, + entry: dict[str, Any], + folder: str, + current_user: CurrentUserContext, + sync_reason: str, + ) -> dict[str, Any]: + document_id = str(entry["id"]) + document_name = str(entry["original_name"]) + document_dir = self._document_dir(document_id) + document_dir.mkdir(parents=True, exist_ok=True) + + extracted_text = self.knowledge_service.extract_document_text(document_id) + text_path = document_dir / "text.md" + text_path.write_text(extracted_text, encoding="utf-8") + + chunks = self._build_chunks(document_id=document_id, text=extracted_text) + knowledge_candidates, rule_candidates = self._extract_candidates( + entry=entry, + chunks=chunks, + current_user=current_user, + ) + + generated_candidates: list[dict[str, Any]] = [] + for candidate in rule_candidates: + if candidate.get("validation_status") == "valid": + generated_asset = self._create_or_update_rule_draft(candidate, current_user=current_user) + if generated_asset is not None: + candidate["generated_asset_id"] = generated_asset["asset_id"] + candidate["generated_asset_code"] = generated_asset["asset_code"] + candidate["generated_version"] = generated_asset["version"] + generated_candidates.append(candidate) + + document_record = { + "document_id": document_id, + "document_name": document_name, + "folder": folder, + "document_version": f"v{int(entry.get('version_number', 1))}.0", + "checksum": str(entry.get("sha256") or ""), + "extracted_text_path": str(text_path), + "chunk_count": len(chunks), + "knowledge_candidate_count": len(knowledge_candidates), + "rule_candidate_count": len(generated_candidates), + "updated_at": datetime.now(UTC).isoformat(), + "signature": self._build_document_signature(entry), + "sync_reason": sync_reason, + } + + self._write_json_file(document_dir / "document.json", document_record) + self._write_json_file(document_dir / "chunks.json", chunks) + self._write_json_file(document_dir / "knowledge_candidates.json", knowledge_candidates) + 
self._write_json_file(document_dir / "rule_candidates.json", generated_candidates) + (document_dir / "knowledge_summary.md").write_text( + self._build_knowledge_summary_markdown( + entry=entry, + knowledge_candidates=knowledge_candidates, + ), + encoding="utf-8", + ) + return { + "document": document_record, + "knowledge_candidates": knowledge_candidates, + "rule_candidates": generated_candidates, + } + + def _extract_candidates( + self, + *, + entry: dict[str, Any], + chunks: list[dict[str, Any]], + current_user: CurrentUserContext, + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + if not chunks: + return [], [] + + knowledge_candidates: list[dict[str, Any]] = [] + rule_candidates: list[dict[str, Any]] = [] + seen_knowledge_keys: set[str] = set() + seen_rule_keys: set[str] = set() + + for chunk_group in self._group_chunks(chunks, size=4): + payload = self._call_candidate_model(entry=entry, chunk_group=chunk_group) + batch_knowledge = self._normalize_knowledge_candidates( + raw_items=list(payload.get("knowledge_candidates") or []), + entry=entry, + chunk_group=chunk_group, + seen_keys=seen_knowledge_keys, + ) + batch_rules = self._normalize_rule_candidates( + raw_items=list(payload.get("rule_candidates") or []), + entry=entry, + chunk_group=chunk_group, + current_user=current_user, + seen_keys=seen_rule_keys, + ) + knowledge_candidates.extend(batch_knowledge) + rule_candidates.extend(batch_rules) + + if not knowledge_candidates: + fallback = self._build_fallback_knowledge_candidate(entry=entry, chunks=chunks) + if fallback is not None: + knowledge_candidates.append(fallback) + + return knowledge_candidates[:12], rule_candidates[:12] + + def _call_candidate_model( + self, + *, + entry: dict[str, Any], + chunk_group: list[dict[str, Any]], + ) -> dict[str, Any]: + if self._candidate_model_disabled: + return {} + + facts = { + "document_id": entry["id"], + "document_name": entry["original_name"], + "folder": entry["folder"], + "allowed_rule_templates": [ + { 
+ "template_key": key, + "template_label": value["label"], + "summary": value["summary"], + } + for key, value in RULE_TEMPLATE_CATALOG.items() + ], + "runtime_rule_contracts": RUNTIME_RULE_CONTRACTS, + "chunks": [ + { + "chunk_id": item["chunk_id"], + "title": item["title"], + "content": item["content"][:900], + "source_page": item.get("source_page"), + "tags": item.get("tags", []), + } + for item in chunk_group + ], + } + system_prompt = ( + "你是企业财务制度知识库的 Hermes 规则形成器。" + "你只能基于提供的制度条款生成结构化知识候选和规则候选,不能自由发散。" + "规则候选必须从允许模板中选 template_key,严禁自创模板。" + "runtime_rule 必须严格遵守 runtime_rule_contracts 中对应模板的字段结构和允许值。" + "如果条款不适合自动规则化,可以只返回 knowledge_candidates。" + "输出必须是 JSON 对象,字段只有 knowledge_candidates 和 rule_candidates。" + "knowledge_candidates 每项字段:title, content, scenario, tags, evidence, confidence, source_chunk_ids。" + "rule_candidates 每项字段:template_key, suggested_rule_name, summary, scenario, purpose, scope, " + "inputs, judgement_logic, outputs, admin_note, runtime_rule, evidence, confidence, source_chunk_ids。" + "runtime_rule 也必须是 JSON 对象;如果当前无法形成可执行规则,也要给出结构化草稿 JSON," + "并在 kind 中写 policy_rule_draft。" + ) + user_prompt = ( + "请根据以下制度分块生成候选。" + "只返回 JSON 对象,不要输出解释,不要调用工具,不要追加任何其他文本。\n" + f"{json.dumps(facts, ensure_ascii=False, indent=2)}" + ) + hermes_query = ( + f"{system_prompt}\n\n" + f"{user_prompt}\n\n" + "再次强调:只返回一个 JSON 对象,根字段必须是 knowledge_candidates 和 rule_candidates。" + ) + + if self.system_hermes_service.is_available(): + try: + cli_result = self.system_hermes_service.run_query( + hermes_query, + source="tool", + max_turns=1, + timeout_seconds=HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS, + ) + payload = self._extract_json_payload(cli_result.response_text) + if payload: + return payload + self._candidate_model_disabled = True + logger.warning( + "System Hermes returned no parseable JSON for LLM Wiki doc=%s; using fallback candidates.", + entry.get("id"), + ) + return {} + except TimeoutExpired: + self._candidate_model_disabled = True + logger.warning( + 
"System Hermes timed out during LLM Wiki candidate extraction doc=%s; using fallback candidates.", + entry.get("id"), + ) + return {} + except Exception as exc: + self._candidate_model_disabled = True + logger.warning( + "System Hermes failed during LLM Wiki candidate extraction doc=%s: %s", + entry.get("id"), + exc, + ) + return {} + + response_text = self.runtime_chat_service.complete( + [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + slot_priority=("main", "backup"), + max_tokens=1800, + temperature=0.0, + ) + payload = self._extract_json_payload(response_text) + if not payload: + self._candidate_model_disabled = True + return {} + return payload + + def _normalize_knowledge_candidates( + self, + *, + raw_items: list[Any], + entry: dict[str, Any], + chunk_group: list[dict[str, Any]], + seen_keys: set[str], + ) -> list[dict[str, Any]]: + normalized: list[dict[str, Any]] = [] + default_chunk_ids = [item["chunk_id"] for item in chunk_group] + + for item in raw_items: + if not isinstance(item, dict): + continue + title = str(item.get("title") or "").strip() + content = str(item.get("content") or "").strip() + if not title or not content: + continue + candidate_key = f"{title.casefold()}::{content[:80].casefold()}" + if candidate_key in seen_keys: + continue + seen_keys.add(candidate_key) + normalized.append( + { + "candidate_id": f"kc_{uuid4().hex[:12]}", + "title": title, + "content": content, + "domain": "expense", + "scenario": str(item.get("scenario") or "reimbursement_policy").strip() + or "reimbursement_policy", + "tags": self._normalize_tags(item.get("tags"), fallback_text=f"{title}\n{content}"), + "source_document_id": entry["id"], + "source_document_name": entry["original_name"], + "source_chunk_ids": self._normalize_chunk_ids( + item.get("source_chunk_ids"), + allowed_chunk_ids=default_chunk_ids, + fallback_chunk_ids=default_chunk_ids, + ), + "evidence": self._normalize_string_list(item.get("evidence")), 
+ "confidence": self._normalize_confidence(item.get("confidence")), + "status": "draft", + "created_by": "hermes", + "created_at": datetime.now(UTC).isoformat(), + } + ) + return normalized + + def _normalize_rule_candidates( + self, + *, + raw_items: list[Any], + entry: dict[str, Any], + chunk_group: list[dict[str, Any]], + current_user: CurrentUserContext, + seen_keys: set[str], + ) -> list[dict[str, Any]]: + normalized: list[dict[str, Any]] = [] + default_chunk_ids = [item["chunk_id"] for item in chunk_group] + + for item in raw_items: + if not isinstance(item, dict): + continue + template_key = str(item.get("template_key") or "").strip() + if template_key not in RULE_TEMPLATE_CATALOG: + continue + rule_name = str(item.get("suggested_rule_name") or "").strip() + if not rule_name: + continue + candidate_key = f"{template_key}::{rule_name.casefold()}" + if candidate_key in seen_keys: + continue + seen_keys.add(candidate_key) + + template_sections = { + "purpose": str(item.get("purpose") or "").strip(), + "scope": str(item.get("scope") or "").strip(), + "inputs": self._normalize_string_list(item.get("inputs")), + "judgement_logic": self._normalize_string_list(item.get("judgement_logic")), + "outputs": self._normalize_string_list(item.get("outputs")), + "admin_note": str(item.get("admin_note") or "").strip(), + } + runtime_rule = item.get("runtime_rule") + if not isinstance(runtime_rule, dict): + runtime_rule = {} + validation_result = self._normalize_runtime_rule( + runtime_rule=runtime_rule, + template_key=template_key, + rule_name=rule_name, + scenario=str(item.get("scenario") or "reimbursement_policy").strip() + or "reimbursement_policy", + source_document_name=str(entry["original_name"]), + template_sections=template_sections, + evidence=self._normalize_string_list(item.get("evidence")), + ) + runtime_rule = validation_result["runtime_rule"] + validation_errors = list(validation_result["validation_errors"]) + validation_status = "valid" if not validation_errors 
else "invalid" + + normalized.append( + { + "candidate_id": f"rc_{uuid4().hex[:12]}", + "source_type": "policy_document", + "template_key": template_key, + "template_label": RULE_TEMPLATE_CATALOG[template_key]["label"], + "domain": "expense", + "scenario": str(item.get("scenario") or "reimbursement_policy").strip() + or "reimbursement_policy", + "suggested_rule_name": rule_name, + "summary": str(item.get("summary") or "").strip(), + "template_sections": template_sections, + "rule_markdown_draft": self._build_rule_markdown( + rule_name=rule_name, + template_key=template_key, + template_sections=template_sections, + runtime_rule=runtime_rule, + evidence=self._normalize_string_list(item.get("evidence")), + confidence=self._normalize_confidence(item.get("confidence")), + source_document_name=str(entry["original_name"]), + current_user=current_user, + ), + "runtime_rule": runtime_rule, + "evidence": self._normalize_string_list(item.get("evidence")), + "confidence": self._normalize_confidence(item.get("confidence")), + "source_document_id": entry["id"], + "source_document_name": entry["original_name"], + "source_chunk_ids": self._normalize_chunk_ids( + item.get("source_chunk_ids"), + allowed_chunk_ids=default_chunk_ids, + fallback_chunk_ids=default_chunk_ids, + ), + "validation_status": validation_status, + "validation_errors": validation_errors, + "status": "draft" if validation_status == "valid" else "validation_failed", + "created_by": "hermes", + "created_at": datetime.now(UTC).isoformat(), + } + ) + return normalized + + def _normalize_runtime_rule( + self, + *, + runtime_rule: dict[str, Any], + template_key: str, + rule_name: str, + scenario: str, + source_document_name: str, + template_sections: dict[str, Any], + evidence: list[str], + ) -> dict[str, Any]: + base_payload = { + "kind": "policy_rule_draft", + "version": 1, + "template_key": template_key, + "rule_name": rule_name, + "scenario": scenario, + "source_document_name": source_document_name, + 
"review_required": True, + } + raw_payload = dict(runtime_rule) + + if template_key == "travel_standard_v1": + candidate_payload = self._build_travel_standard_runtime_rule( + base_payload=base_payload, + raw_payload=raw_payload, + scenario=scenario, + ) + elif template_key == "expense_amount_limit_v1": + candidate_payload = self._build_expense_amount_limit_runtime_rule( + base_payload=base_payload, + raw_payload=raw_payload, + scenario=scenario, + ) + elif template_key == "attachment_requirement_v1": + candidate_payload = self._build_attachment_requirement_runtime_rule( + base_payload=base_payload, + raw_payload=raw_payload, + scenario=scenario, + ) + else: + candidate_payload = self._build_general_policy_runtime_rule( + base_payload=base_payload, + raw_payload=raw_payload, + scenario=scenario, + template_sections=template_sections, + evidence=evidence, + ) + + model_class = RUNTIME_RULE_MODEL_BY_TEMPLATE[template_key] + try: + validated = model_class.model_validate(candidate_payload) + except ValidationError as exc: + return { + "runtime_rule": candidate_payload, + "validation_errors": self._format_validation_errors(exc), + } + + return { + "runtime_rule": validated.model_dump(mode="json"), + "validation_errors": [], + } + + def _build_travel_standard_runtime_rule( + self, + *, + base_payload: dict[str, Any], + raw_payload: dict[str, Any], + scenario: str, + ) -> dict[str, Any]: + target_payload = self._build_target_payload( + raw_target=raw_payload.get("target"), + expense_types=self._normalize_string_list(raw_payload.get("expense_types")) or ["travel", "hotel", "transport"], + scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], + ) + control_points = self._normalize_travel_control_points(raw_payload.get("control_points")) + if not control_points: + control_points = [ + {"control_code": control_code, "severity": "high", "enabled": True} + for control_code in TRAVEL_CONTROL_CODES + ] + + exception_policy = 
raw_payload.get("exception_policy") + if not isinstance(exception_policy, dict): + exception_policy = {} + + return { + **base_payload, + "target": target_payload, + "control_points": control_points, + "exception_policy": { + "allow_with_explanation": bool(exception_policy.get("allow_with_explanation", True)), + "keywords": self._normalize_string_list( + exception_policy.get("keywords") or raw_payload.get("exception_keywords") + ), + }, + "output": self._build_output_payload( + raw_output=raw_payload.get("output"), + risk_code="travel_policy_review", + action="review", + message="差旅制度命中后需要人工复核。", + ), + } + + def _build_expense_amount_limit_runtime_rule( + self, + *, + base_payload: dict[str, Any], + raw_payload: dict[str, Any], + scenario: str, + ) -> dict[str, Any]: + raw_target = raw_payload.get("target") + if not isinstance(raw_target, dict): + raw_target = {} + target_payload = self._build_target_payload( + raw_target=raw_target, + expense_types=self._normalize_string_list(raw_payload.get("expense_types")), + scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], + ) + target_payload["metric"] = ( + str(raw_target.get("metric") or raw_payload.get("metric") or raw_payload.get("scope") or "claim_total").strip() + or "claim_total" + ) + + raw_threshold = raw_payload.get("threshold") + if not isinstance(raw_threshold, dict): + raw_threshold = {} + + warn_amount = raw_threshold.get("warn_amount", raw_payload.get("warn_amount")) + block_amount = raw_threshold.get("block_amount", raw_payload.get("block_amount")) + threshold_source = str(raw_threshold.get("source") or "").strip() or ( + "document_value" if warn_amount is not None or block_amount is not None else "manual_fill_required" + ) + + exception_policy = raw_payload.get("exception_policy") + if not isinstance(exception_policy, dict): + exception_policy = {} + + return { + **base_payload, + "target": target_payload, + "threshold": { + "currency": str(raw_threshold.get("currency") 
or raw_payload.get("currency") or "CNY").strip() or "CNY", + "comparator": str(raw_threshold.get("comparator") or raw_payload.get("comparator") or "gt").strip() or "gt", + "warn_amount": warn_amount, + "block_amount": block_amount, + "source": threshold_source, + }, + "exception_policy": { + "allow_with_explanation": bool(exception_policy.get("allow_with_explanation", True)), + "keywords": self._normalize_string_list( + exception_policy.get("keywords") or raw_payload.get("exception_keywords") + ), + }, + "output": self._build_output_payload( + raw_output=raw_payload.get("output"), + risk_code="amount_limit_review", + action="review", + message="金额超标后需要人工复核。", + ), + } + + def _build_attachment_requirement_runtime_rule( + self, + *, + base_payload: dict[str, Any], + raw_payload: dict[str, Any], + scenario: str, + ) -> dict[str, Any]: + target_payload = self._build_target_payload( + raw_target=raw_payload.get("target"), + expense_types=self._normalize_string_list(raw_payload.get("expense_types")), + scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], + ) + + raw_requirements = raw_payload.get("attachment_requirements") + if not isinstance(raw_requirements, dict): + raw_requirements = {} + requirement_items = self._normalize_attachment_items( + raw_requirements.get("items") + or raw_payload.get("required_attachments") + or raw_payload.get("attachment_items") + ) + manual_fill_required = bool(raw_requirements.get("manual_fill_required", not requirement_items)) + min_attachment_count = raw_requirements.get("min_attachment_count") or raw_payload.get("min_attachment_count") or 1 + try: + min_attachment_count_value = max(1, int(min_attachment_count)) + except (TypeError, ValueError): + min_attachment_count_value = 1 + + return { + **base_payload, + "target": target_payload, + "attachment_requirements": { + "min_attachment_count": min_attachment_count_value, + "items": requirement_items, + "manual_fill_required": manual_fill_required, + }, 
+ "missing_attachment_action": str( + raw_payload.get("missing_attachment_action") + or raw_payload.get("action") + or "block" + ).strip() + or "block", + "output": self._build_output_payload( + raw_output=raw_payload.get("output"), + risk_code="invoice_anomaly", + action="block", + message="附件或单据不完整,需补件后再提交。", + ), + } + + def _build_general_policy_runtime_rule( + self, + *, + base_payload: dict[str, Any], + raw_payload: dict[str, Any], + scenario: str, + template_sections: dict[str, Any], + evidence: list[str], + ) -> dict[str, Any]: + target_payload = self._build_target_payload( + raw_target=raw_payload.get("target"), + expense_types=self._normalize_string_list(raw_payload.get("expense_types")), + scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], + ) + control_points = self._normalize_string_list(raw_payload.get("control_points")) + if not control_points: + control_points = self._normalize_string_list(template_sections.get("judgement_logic")) + review_checklist = self._normalize_string_list(raw_payload.get("review_checklist")) + if not review_checklist: + review_checklist = self._normalize_string_list(template_sections.get("outputs")) or evidence[:3] + + return { + **base_payload, + "target": target_payload, + "control_points": control_points, + "review_checklist": review_checklist, + "output": self._build_output_payload( + raw_output=raw_payload.get("output"), + risk_code="general_policy_review", + action="review", + message="通用制度草稿需由人工补充执行细节。", + ), + } + + def _build_target_payload( + self, + *, + raw_target: Any, + expense_types: list[str], + scene_codes: list[str], + ) -> dict[str, Any]: + target = raw_target if isinstance(raw_target, dict) else {} + return { + "expense_types": self._normalize_string_list(target.get("expense_types")) or expense_types, + "scene_codes": self._normalize_string_list(target.get("scene_codes")) or scene_codes, + } + + def _build_output_payload( + self, + *, + raw_output: Any, + risk_code: str, 
+ action: str, + message: str, + ) -> dict[str, Any]: + output = raw_output if isinstance(raw_output, dict) else {} + return { + "risk_code": str(output.get("risk_code") or risk_code).strip() or risk_code, + "action": str(output.get("action") or action).strip() or action, + "message": str(output.get("message") or message).strip() or message, + } + + def _normalize_travel_control_points(self, value: Any) -> list[dict[str, Any]]: + normalized: list[dict[str, Any]] = [] + for item in value or []: + if isinstance(item, str): + control_code = item.strip() + if control_code in TRAVEL_CONTROL_CODES: + normalized.append( + {"control_code": control_code, "severity": "high", "enabled": True} + ) + continue + if not isinstance(item, dict): + continue + control_code = str(item.get("control_code") or item.get("code") or "").strip() + if control_code not in TRAVEL_CONTROL_CODES: + continue + normalized.append( + { + "control_code": control_code, + "severity": str(item.get("severity") or "high").strip() or "high", + "enabled": bool(item.get("enabled", True)), + } + ) + return normalized + + def _normalize_attachment_items(self, value: Any) -> list[dict[str, Any]]: + normalized: list[dict[str, Any]] = [] + for item in value or []: + if isinstance(item, str): + document_type = item.strip() + if document_type: + normalized.append( + { + "document_type": document_type, + "required": True, + "min_count": 1, + "description": "", + } + ) + continue + if not isinstance(item, dict): + continue + document_type = str(item.get("document_type") or item.get("type") or "").strip() + if not document_type: + continue + min_count = item.get("min_count", 1) + try: + min_count_value = max(1, int(min_count)) + except (TypeError, ValueError): + min_count_value = 1 + normalized.append( + { + "document_type": document_type, + "required": bool(item.get("required", True)), + "min_count": min_count_value, + "description": str(item.get("description") or "").strip(), + } + ) + return normalized + + 
@staticmethod + def _format_validation_errors(exc: ValidationError) -> list[str]: + errors: list[str] = [] + for error in exc.errors(): + location = ".".join(str(item) for item in error.get("loc", [])) + message = str(error.get("msg") or "invalid runtime_rule") + errors.append(f"{location}: {message}" if location else message) + return errors + + def _build_fallback_knowledge_candidate( + self, + *, + entry: dict[str, Any], + chunks: list[dict[str, Any]], + ) -> dict[str, Any] | None: + first_chunk = next((item for item in chunks if str(item.get("content") or "").strip()), None) + if first_chunk is None: + return None + + content = str(first_chunk["content"]).strip() + summary = content[:200].strip() + if not summary: + return None + + return { + "candidate_id": f"kc_{uuid4().hex[:12]}", + "title": str(first_chunk.get("title") or entry["original_name"]).strip() + or str(entry["original_name"]), + "content": summary, + "domain": "expense", + "scenario": "reimbursement_policy", + "tags": self._normalize_tags([], fallback_text=content), + "source_document_id": entry["id"], + "source_document_name": entry["original_name"], + "source_chunk_ids": [first_chunk["chunk_id"]], + "evidence": [summary], + "confidence": 0.4, + "status": "draft", + "created_by": "hermes", + "created_at": datetime.now(UTC).isoformat(), + } + + @staticmethod + def _build_knowledge_summary_markdown( + *, + entry: dict[str, Any], + knowledge_candidates: list[dict[str, Any]], + ) -> str: + document_name = str( + entry.get("original_name") + or entry.get("document_name") + or entry.get("source_document_name") + or "知识文档" + ).strip() or "知识文档" + lines = [ + f"# {document_name} 知识总结", + "", + "## 概览", + "", + f"- 来源文档:{document_name}", + f"- 知识条目数:{len(knowledge_candidates)}", + "", + "## 核心知识", + "", + ] + + if not knowledge_candidates: + lines.extend( + [ + "暂无可展示的知识总结,待 Hermes 重新归纳后生成。", + "", + ] + ) + return "\n".join(lines).strip() + + for index, item in enumerate(knowledge_candidates[:8], 
start=1): + title = str(item.get("title") or f"知识点 {index}").strip() or f"知识点 {index}" + content = str(item.get("content") or "").strip() or "待补充内容。" + scenario = str(item.get("scenario") or "").strip() + tags = [str(tag).strip() for tag in list(item.get("tags") or []) if str(tag).strip()] + lines.extend( + [ + f"### {index}. {title}", + "", + content, + "", + ] + ) + if scenario: + lines.append(f"- 适用场景:{scenario}") + if tags: + lines.append(f"- 标签:{' / '.join(tags)}") + lines.append("") + + return "\n".join(lines).strip() + + def _create_or_update_rule_draft( + self, + candidate: dict[str, Any], + *, + current_user: CurrentUserContext, + ) -> dict[str, str] | None: + source_key = self._build_rule_source_key(candidate) + existing_asset = self._find_rule_asset_by_source_key(source_key) + runtime_rule = dict(candidate.get("runtime_rule") or {}) + config_json = { + "severity": "medium", + "enabled": False, + "runtime_kind": str(runtime_rule.get("kind") or "policy_rule_draft"), + "runtime_rule": runtime_rule, + "rule_template_key": candidate["template_key"], + "rule_template_label": candidate["template_label"], + "hermes_source_key": source_key, + "source_document_id": candidate["source_document_id"], + "source_document_name": candidate["source_document_name"], + "llm_wiki_managed": True, + } + + if existing_asset is None: + created_asset = self.asset_service.create_asset( + AgentAssetCreate( + asset_type=AgentAssetType.RULE, + code=self._build_rule_asset_code(candidate), + name=candidate["suggested_rule_name"], + description=str(candidate.get("summary") or "").strip() + or "由 Hermes 根据制度文档生成的规则草稿。", + domain=AgentAssetDomain.EXPENSE, + scenario_json=self._build_rule_scenarios(candidate), + owner="Hermes", + reviewer=current_user.name, + status=AgentAssetStatus.DRAFT, + config_json=config_json, + ), + actor=current_user.name, + ) + version = "v1.0.0" + self.asset_service.create_version( + created_asset.id, + AgentAssetVersionCreate( + version=version, + 
content=candidate["rule_markdown_draft"], + content_type=AgentAssetContentType.MARKDOWN, + change_note="Hermes 根据制度文档生成规则草稿。", + created_by=current_user.name, + ), + actor=current_user.name, + ) + return { + "asset_id": created_asset.id, + "asset_code": created_asset.code, + "version": version, + } + + detail = self.asset_service.get_asset(existing_asset.id) + if detail is None: + return None + if detail.status == AgentAssetStatus.ACTIVE.value: + return None + + current_markdown = str(detail.current_version_content or "") + current_runtime_rule = ( + detail.config_json.get("runtime_rule") if isinstance(detail.config_json, dict) else None + ) + metadata_changed = ( + detail.name != candidate["suggested_rule_name"] + or detail.description + != (str(candidate.get("summary") or "").strip() or "由 Hermes 根据制度文档生成的规则草稿。") + or detail.reviewer != current_user.name + or detail.config_json != config_json + or detail.scenario_json != self._build_rule_scenarios(candidate) + ) + content_changed = current_markdown != candidate["rule_markdown_draft"] + runtime_changed = current_runtime_rule != runtime_rule + + if metadata_changed: + self.asset_service.update_asset( + existing_asset.id, + AgentAssetUpdate( + name=candidate["suggested_rule_name"], + description=str(candidate.get("summary") or "").strip() + or "由 Hermes 根据制度文档生成的规则草稿。", + reviewer=current_user.name, + scenario_json=self._build_rule_scenarios(candidate), + status=AgentAssetStatus.DRAFT, + config_json=config_json, + ), + actor=current_user.name, + ) + + version = detail.current_version or "v1.0.0" + if content_changed or runtime_changed or not detail.current_version: + version = self._increment_version(detail.current_version) + self.asset_service.create_version( + existing_asset.id, + AgentAssetVersionCreate( + version=version, + content=candidate["rule_markdown_draft"], + content_type=AgentAssetContentType.MARKDOWN, + change_note="Hermes 根据制度文档更新规则草稿。", + created_by=current_user.name, + ), + actor=current_user.name, 
+ ) + + return { + "asset_id": existing_asset.id, + "asset_code": existing_asset.code, + "version": version, + } + + def _find_rule_asset_by_source_key(self, source_key: str) -> AgentAsset | None: + assets = list( + self.db.scalars( + select(AgentAsset) + .where(AgentAsset.asset_type == AgentAssetType.RULE.value) + .where(AgentAsset.domain == AgentAssetDomain.EXPENSE.value) + ).all() + ) + for asset in assets: + config_json = asset.config_json or {} + if str(config_json.get("hermes_source_key") or "").strip() == source_key: + return asset + return None + + @staticmethod + def _build_rule_source_key(candidate: dict[str, Any]) -> str: + raw = ( + f"{candidate['source_document_id']}::{candidate['template_key']}::" + f"{candidate['suggested_rule_name']}::{candidate['scenario']}" + ) + return hashlib.sha1(raw.encode("utf-8")).hexdigest() + + def _build_rule_asset_code(self, candidate: dict[str, Any]) -> str: + template_key = re.sub(r"[^a-z0-9]+", "_", str(candidate["template_key"]).lower()).strip("_") + source_key = self._build_rule_source_key(candidate)[:10] + return f"rule.expense.hermes.{template_key}.{source_key}" + + @staticmethod + def _build_rule_scenarios(candidate: dict[str, Any]) -> list[str]: + scenario = str(candidate.get("scenario") or "reimbursement_policy").strip() + template_key = str(candidate.get("template_key") or "general_policy_v1").strip() + return ["expense", "rule_center", scenario, template_key] + + @staticmethod + def _increment_version(version: str | None) -> str: + normalized = str(version or "").strip().removeprefix("v") + match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", normalized) + if not match: + return "v1.0.0" + major, minor, patch = (int(value) for value in match.groups()) + return f"v{major}.{minor}.{patch + 1}" + + @staticmethod + def _build_rule_markdown( + *, + rule_name: str, + template_key: str, + template_sections: dict[str, Any], + runtime_rule: dict[str, Any], + evidence: list[str], + confidence: float, + source_document_name: 
str, + current_user: CurrentUserContext, + ) -> str: + def render_lines(items: list[str], empty_text: str) -> list[str]: + if not items: + return [f"- {empty_text}"] + return [f"- {item}" for item in items] + + sections = [ + f"# {rule_name}", + "", + "## 模板信息", + "", + f"- 模板键:`{template_key}`", + f"- 来源文档:{source_document_name}", + f"- Hermes 置信度:{confidence:.2f}", + f"- 审核人:{current_user.name}", + "", + "## 目标", + "", + template_sections.get("purpose") or "待审核人补充规则目标。", + "", + "## 适用范围", + "", + template_sections.get("scope") or "待审核人补充适用范围。", + "", + "## 输入字段", + "", + *render_lines(list(template_sections.get("inputs") or []), "待审核人补充输入字段。"), + "", + "## 判断规则", + "", + *render_lines( + list(template_sections.get("judgement_logic") or []), + "待审核人补充判断逻辑。", + ), + "", + "## 输出", + "", + *render_lines(list(template_sections.get("outputs") or []), "待审核人补充输出动作。"), + "", + "## 来源依据", + "", + *render_lines(evidence, "待审核人补充文档依据。"), + "", + "## 审核约束", + "", + "- 当前规则由 Hermes 自动生成,默认仅为 draft 草稿。", + "- 规则上线前必须人工审核、补测样例、确认回滚方案。", + "- JSON 运行时配置需要与 Markdown 说明保持一致。", + "", + "## 管理员备注", + "", + template_sections.get("admin_note") or "待审核人补充备注。", + "", + "```expense-rule", + json.dumps(runtime_rule, ensure_ascii=False, indent=2), + "```", + ] + return "\n".join(sections).strip() + + @staticmethod + def _group_chunks(chunks: list[dict[str, Any]], *, size: int) -> list[list[dict[str, Any]]]: + grouped: list[list[dict[str, Any]]] = [] + for index in range(0, len(chunks), size): + grouped.append(chunks[index : index + size]) + return grouped + + def _build_chunks(self, *, document_id: str, text: str) -> list[dict[str, Any]]: + pages = [page.strip() for page in str(text or "").split("\f")] + if not pages: + pages = [str(text or "").strip()] + + chunks: list[dict[str, Any]] = [] + chunk_index = 0 + + for page_index, page_text in enumerate(pages, start=1): + if not page_text: + continue + current_title = "" + buffer: list[str] = [] + lines = [self._clean_line(line) for line in 
page_text.splitlines()] + lines = [line for line in lines if line] + + for line in lines: + if HEADING_PATTERN.match(line): + chunk_index = self._flush_chunk( + chunks=chunks, + document_id=document_id, + chunk_index=chunk_index, + title=current_title, + lines=buffer, + source_page=page_index, + ) + current_title = line + buffer = [] + continue + + buffer.append(line) + if sum(len(item) for item in buffer) >= 750: + chunk_index = self._flush_chunk( + chunks=chunks, + document_id=document_id, + chunk_index=chunk_index, + title=current_title, + lines=buffer, + source_page=page_index, + ) + buffer = [] + + chunk_index = self._flush_chunk( + chunks=chunks, + document_id=document_id, + chunk_index=chunk_index, + title=current_title, + lines=buffer, + source_page=page_index, + ) + + return chunks + + def _flush_chunk( + self, + *, + chunks: list[dict[str, Any]], + document_id: str, + chunk_index: int, + title: str, + lines: list[str], + source_page: int, + ) -> int: + content = "\n".join(line for line in lines if line).strip() + if not content: + return chunk_index + + normalized_title = title.strip() or self._derive_chunk_title(content) + tags = self._normalize_tags([], fallback_text=f"{normalized_title}\n{content}") + chunk = { + "chunk_id": f"{document_id}-chunk-{chunk_index + 1:03d}", + "title": normalized_title, + "content": content, + "source_page": source_page, + "word_count": len(re.sub(r"\s+", "", content)), + "tags": tags, + } + chunks.append(chunk) + return chunk_index + 1 + + @staticmethod + def _derive_chunk_title(content: str) -> str: + first_line = next((line.strip() for line in content.splitlines() if line.strip()), "") + if first_line: + return first_line[:48] + return "未命名条款" + + @staticmethod + def _clean_line(line: str) -> str: + cleaned = str(line or "").replace("\u3000", " ").strip() + cleaned = re.sub(r"\s+", " ", cleaned) + return cleaned + + @staticmethod + def _normalize_string_list(value: Any) -> list[str]: + if isinstance(value, list): + 
return [str(item).strip() for item in value if str(item).strip()] + if isinstance(value, str) and value.strip(): + return [value.strip()] + return [] + + def _normalize_tags(self, value: Any, *, fallback_text: str) -> list[str]: + tags = self._normalize_string_list(value) + text = f"{' '.join(tags)}\n{fallback_text}" + for keyword, label in TAG_HINTS: + if keyword in text and label not in tags: + tags.append(label) + return tags[:8] + + @staticmethod + def _normalize_confidence(value: Any) -> float: + try: + confidence = float(value) + except (TypeError, ValueError): + return 0.0 + return round(max(0.0, min(confidence, 1.0)), 2) + + @staticmethod + def _normalize_chunk_ids( + value: Any, + *, + allowed_chunk_ids: list[str], + fallback_chunk_ids: list[str], + ) -> list[str]: + items = [str(item).strip() for item in value or [] if str(item).strip()] + filtered = [item for item in items if item in allowed_chunk_ids] + return filtered or fallback_chunk_ids[:2] + + @staticmethod + def _extract_json_payload(response_text: str | None) -> dict[str, Any] | None: + if not response_text: + return None + + cleaned = THINK_PATTERN.sub("", response_text).strip() + if not cleaned: + return None + + candidates: list[str] = [] + fenced_match = JSON_FENCE_PATTERN.search(cleaned) + if fenced_match is not None: + candidates.append(fenced_match.group(1)) + candidates.append(cleaned) + + start = cleaned.find("{") + end = cleaned.rfind("}") + if start != -1 and end != -1 and end > start: + candidates.append(cleaned[start : end + 1]) + + for candidate in candidates: + try: + parsed = json.loads(candidate) + except json.JSONDecodeError: + continue + if isinstance(parsed, dict): + return parsed + return None + + def _resolve_sync_reason( + self, + *, + entry: dict[str, Any], + existing: dict[str, Any] | None, + force: bool, + ) -> str: + if force: + return "forced_rebuild" + if existing is None: + return "initial_build" + + previous_signature = existing.get("signature") + if not 
isinstance(previous_signature, dict): + return "signature_missing" + + current_signature = self._build_document_signature(entry) + reasons: list[str] = [] + if previous_signature.get("original_name") != current_signature["original_name"]: + reasons.append("filename_changed") + if previous_signature.get("sha256") != current_signature["sha256"]: + reasons.append("content_changed") + if previous_signature.get("version_number") != current_signature["version_number"]: + reasons.append("version_changed") + if previous_signature.get("updated_at") != current_signature["updated_at"]: + reasons.append("updated_at_changed") + if previous_signature.get("stored_name") != current_signature["stored_name"]: + reasons.append("stored_name_changed") + + if not reasons: + return "unchanged_skipped" + return ",".join(reasons) + + @staticmethod + def _build_document_signature(entry: dict[str, Any]) -> dict[str, Any]: + return { + "document_id": str(entry.get("id") or ""), + "original_name": str(entry.get("original_name") or ""), + "stored_name": str(entry.get("stored_name") or ""), + "sha256": str(entry.get("sha256") or ""), + "version_number": int(entry.get("version_number") or 1), + "updated_at": str(entry.get("updated_at") or ""), + } + + def _load_wiki_index(self) -> dict[str, Any]: + return self._load_json_file(self.knowledge_service.llm_wiki_index_path, default={"documents": []}) + + def _load_sync_runs(self) -> dict[str, Any]: + return self._load_json_file(self.knowledge_service.llm_wiki_sync_runs_path, default={"runs": []}) + + def _document_dir(self, document_id: str) -> Path: + return self.knowledge_service.llm_wiki_documents_root / document_id + + @staticmethod + def _load_json_file(path: Path, *, default: Any) -> Any: + try: + return json.loads(path.read_text(encoding="utf-8")) + except (FileNotFoundError, json.JSONDecodeError): + return default + + @staticmethod + def _write_json_file(path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + 
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") diff --git a/server/tests/test_llm_wiki_service.py b/server/tests/test_llm_wiki_service.py new file mode 100644 index 0000000..062044f --- /dev/null +++ b/server/tests/test_llm_wiki_service.py @@ -0,0 +1,524 @@ +from __future__ import annotations + +import json +from subprocess import TimeoutExpired +from collections.abc import Generator +from pathlib import Path + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import StaticPool + +from app.api.deps import CurrentUserContext, get_db +from app.core.agent_enums import AgentReviewStatus, AgentRunSource, AgentRunStatus +from app.db.base import Base +from app.main import create_app +from app.schemas.agent_asset import AgentAssetReviewCreate +from app.schemas.knowledge import LlmWikiSummaryUpdateWrite, LlmWikiSyncRead +from app.services.agent_assets import AgentAssetService +from app.services.agent_runs import AgentRunService +from app.services.knowledge import ( + KNOWLEDGE_INGEST_STATUS_FAILED, + KNOWLEDGE_INGEST_STATUS_INGESTED, + KNOWLEDGE_INGEST_STATUS_PUBLISHED, + KnowledgeService, +) +from app.services.llm_wiki import LlmWikiService + + +def build_session() -> Session: + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) + return session_factory() + + +def build_client() -> tuple[TestClient, sessionmaker[Session]]: + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) + app = create_app() + + def override_db() -> 
def build_admin_user() -> CurrentUserContext:
    """Return the administrator context used by every test in this module."""
    admin_fields = {
        "username": "admin",
        "name": "管理员",
        "role_codes": ["manager"],
        "is_admin": True,
    }
    return CurrentUserContext(**admin_fields)


def upload_policy_document(storage_root: Path, *, filename: str = "公司差旅报销制度.txt") -> str:
    """Upload a small travel-reimbursement policy and return its document id."""
    policy_lines = (
        "第一章 差旅报销\n"
        "员工因公出差发生的住宿费应按照公司差旅标准执行。\n"
        "住宿费超过标准时,必须升级至总经理审批。\n"
        "报销时必须提供发票、行程单和审批说明。\n"
    )
    knowledge = KnowledgeService(storage_root=storage_root)
    knowledge.ensure_library_ready()
    uploaded = knowledge.upload_document(
        folder="报销制度",
        filename=filename,
        content=policy_lines.encode("utf-8"),
        current_user=build_admin_user(),
    )
    return uploaded.id


def build_candidate_payload(chunk_id: str, *, summary: str = "住宿费超过标准时必须升级审批。") -> dict[str, object]:
    """Return a model payload with one knowledge and one valid rule candidate.

    ``summary`` is reused as the knowledge content, the judgement logic and
    the evidence; both candidates cite ``chunk_id``.
    """
    knowledge_candidate = {
        "title": "住宿费升级审批要求",
        "content": summary,
        "scenario": "reimbursement_policy",
        "tags": ["住宿", "审批"],
        "evidence": [summary],
        "confidence": 0.91,
        "source_chunk_ids": [chunk_id],
    }
    # The block amount is above the warn amount, so the thresholds are ordered.
    runtime_rule = {
        "target": {
            "expense_types": ["hotel"],
            "scene_codes": ["travel_standard"],
            "metric": "item_amount",
        },
        "threshold": {
            "currency": "CNY",
            "comparator": "gt",
            "warn_amount": "450.00",
            "block_amount": "600.00",
            "source": "document_value",
        },
        "exception_policy": {
            "allow_with_explanation": True,
            "keywords": ["超标说明", "协议酒店满房"],
        },
        "output": {
            "risk_code": "travel_hotel_limit",
            "action": "review",
            "message": "住宿费超过制度标准时需要升级审批。",
        },
    }
    rule_candidate = {
        "template_key": "expense_amount_limit_v1",
        "suggested_rule_name": "住宿费超标审批规则",
        "summary": "当住宿费超过制度标准时触发升级审批。",
        "scenario": "travel_standard",
        "purpose": "识别差旅住宿费是否超出制度标准。",
        "scope": "适用于员工差旅住宿报销场景。",
        "inputs": ["expense_type", "amount", "travel_grade"],
        "judgement_logic": [summary],
        "outputs": ["approval_required=true", "risk_level=medium"],
        "admin_note": "上线前需要由财务补充不同职级的金额阈值。",
        "runtime_rule": runtime_rule,
        "evidence": [summary],
        "confidence": 0.93,
        "source_chunk_ids": [chunk_id],
    }
    return {
        "knowledge_candidates": [knowledge_candidate],
        "rule_candidates": [rule_candidate],
    }


def build_invalid_candidate_payload(chunk_id: str) -> dict[str, object]:
    """Return a model payload whose only rule candidate has inverted thresholds.

    The block amount (450.00) is lower than the warn amount (600.00); there
    are no knowledge candidates and no exception policy.
    """
    runtime_rule = {
        "target": {
            "expense_types": ["hotel"],
            "scene_codes": ["travel_standard"],
            "metric": "item_amount",
        },
        "threshold": {
            "currency": "CNY",
            "comparator": "gt",
            "warn_amount": "600.00",
            "block_amount": "450.00",
            "source": "document_value",
        },
        "output": {
            "risk_code": "travel_hotel_limit",
            "action": "review",
            "message": "无效阈值。",
        },
    }
    rule_candidate = {
        "template_key": "expense_amount_limit_v1",
        "suggested_rule_name": "无效金额规则草稿",
        "summary": "用于验证 schema 强校验。",
        "scenario": "travel_standard",
        "purpose": "验证不合规的 runtime_rule 不会落到规则中心。",
        "scope": "测试场景。",
        "inputs": ["expense_type", "amount"],
        "judgement_logic": ["金额超过标准则需审批。"],
        "outputs": ["approval_required=true"],
        "admin_note": "此规则故意构造错误阈值。",
        "runtime_rule": runtime_rule,
        "evidence": ["金额阈值配置不应允许 block 小于 warn。"],
        "confidence": 0.88,
        "source_chunk_ids": [chunk_id],
    }
    return {
        "knowledge_candidates": [],
        "rule_candidates": [rule_candidate],
    }


def update_document_timestamp(storage_root: Path, document_id: str, updated_at: str) -> None:
    """Rewrite ``updated_at`` for one document in the knowledge index file.

    Only the first entry whose ``id`` equals ``document_id`` is changed; the
    index is written back even when no entry matches.
    """
    index_path = storage_root.joinpath("knowledge", ".index.json")
    index = json.loads(index_path.read_text(encoding="utf-8"))
    match = next((doc for doc in index["documents"] if doc["id"] == document_id), None)
    if match is not None:
        match["updated_at"] = updated_at
    index_path.write_text(json.dumps(index, ensure_ascii=False, indent=2), encoding="utf-8")
build_session() as db: + service = LlmWikiService(db, storage_root=tmp_path) + result = service.sync_folder(folder="报销制度", current_user=build_admin_user()) + + assert result.document_count == 1 + assert result.knowledge_candidate_count == 1 + assert result.rule_candidate_count == 1 + assert result.generated_rule_count == 1 + assert len(result.generated_rule_asset_ids) == 1 + + document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id + assert (document_dir / "document.json").exists() + assert (document_dir / "text.md").exists() + assert (document_dir / "chunks.json").exists() + assert (document_dir / "knowledge_candidates.json").exists() + assert (document_dir / "knowledge_summary.md").exists() + assert (document_dir / "rule_candidates.json").exists() + + document_payload = json.loads((document_dir / "document.json").read_text(encoding="utf-8")) + assert document_payload["sync_reason"] == "initial_build" + + detail = service.get_document_detail(document_id) + assert "公司差旅报销制度.txt 知识总结" in detail.knowledge_summary_markdown + assert "住宿费升级审批要求" in detail.knowledge_summary_markdown + + asset = AgentAssetService(db).get_asset(result.generated_rule_asset_ids[0]) + assert asset is not None + assert asset.status == "draft" + assert asset.config_json["llm_wiki_managed"] is True + assert asset.config_json["runtime_rule"]["template_key"] == "expense_amount_limit_v1" + assert asset.config_json["runtime_rule"]["threshold"]["block_amount"] == "600.00" + assert "```expense-rule" in str(asset.current_version_content) + + +def test_llm_wiki_document_summary_can_be_updated(tmp_path, monkeypatch) -> None: + document_id = upload_policy_document(tmp_path) + + def fake_call_candidate_model(self, *, entry, chunk_group): + return build_candidate_payload(chunk_group[0]["chunk_id"]) + + monkeypatch.setattr(LlmWikiService, "_call_candidate_model", fake_call_candidate_model) + + with build_session() as db: + service = LlmWikiService(db, storage_root=tmp_path) + 
def test_llm_wiki_sync_rejects_invalid_runtime_rule_schema(tmp_path, monkeypatch) -> None:
    """A rule candidate with an invalid runtime rule is recorded but not turned into an asset."""
    document_id = upload_policy_document(tmp_path)

    # Replace the model call with a payload whose block amount is below its warn amount.
    def fake_call_candidate_model(self, *, entry, chunk_group):
        return build_invalid_candidate_payload(chunk_group[0]["chunk_id"])

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", fake_call_candidate_model)

    with build_session() as db:
        service = LlmWikiService(db, storage_root=tmp_path)
        result = service.sync_folder(folder="报销制度", current_user=build_admin_user())

        # The candidate is counted, yet no rule is generated from it.
        assert result.document_count == 1
        assert result.rule_candidate_count == 1
        assert result.generated_rule_count == 0

        document_dir = tmp_path / "knowledge" / ".llm_wiki" / "documents" / document_id
        rule_candidates = json.loads((document_dir / "rule_candidates.json").read_text(encoding="utf-8"))

        # The persisted candidate carries the validation failure and names the offending field.
        assert rule_candidates[0]["validation_status"] == "invalid"
        assert rule_candidates[0]["status"] == "validation_failed"
        assert rule_candidates[0]["validation_errors"]
        assert "block_amount" in " ".join(rule_candidates[0]["validation_errors"])
# NOTE(review): the statements down to the first `def` below are the tail of a
# test whose header precedes this chunk. They are reproduced statement-for-
# statement (only line breaks restored) and need to be re-indented under that
# header once the preceding part of the file is restored.
knowledge_service = KnowledgeService(storage_root=tmp_path)
initial_detail = knowledge_service.get_document_detail(document_id)
assert initial_detail.stateCode == KNOWLEDGE_INGEST_STATUS_PUBLISHED
assert initial_detail.state == "待归纳"

with build_session() as db:
    LlmWikiService(db, storage_root=tmp_path).sync_folder(
        folder="报销制度",
        current_user=build_admin_user(),
        document_ids=[document_id],
    )

ingested_detail = knowledge_service.get_document_detail(document_id)
assert ingested_detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED
assert ingested_detail.state == "已归纳"

updated_detail = knowledge_service.upload_document(
    folder="报销制度",
    filename="公司差旅报销制度.txt",
    content=(
        "第一章 差旅报销\n"
        "员工因公出差发生的住宿费应按照公司差旅标准执行。\n"
        "新增:超标住宿必须附书面说明。\n"
    ).encode("utf-8"),
    current_user=build_admin_user(),
)
assert updated_detail.id == document_id
assert updated_detail.stateCode == KNOWLEDGE_INGEST_STATUS_PUBLISHED
assert updated_detail.state == "待归纳"

index_payload = json.loads((tmp_path / "knowledge" / ".index.json").read_text(encoding="utf-8"))
stored_entry = next(item for item in index_payload["documents"] if item["id"] == document_id)
assert stored_entry["ingest_status"] == KNOWLEDGE_INGEST_STATUS_PUBLISHED


def test_llm_wiki_sync_marks_document_failed_when_ingest_raises(tmp_path, monkeypatch) -> None:
    """An exception from the candidate model escapes `sync_folder` and the document ends up failed."""
    doc_id = upload_policy_document(tmp_path)
    failure_message = "simulated llm wiki failure"

    def exploding_candidate_model(self, *, entry, chunk_group):
        raise RuntimeError(failure_message)

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", exploding_candidate_model)

    with build_session() as session:
        wiki_service = LlmWikiService(session, storage_root=tmp_path)
        # Only the sync call itself is expected to raise.
        with pytest.raises(RuntimeError, match=failure_message):
            wiki_service.sync_folder(
                folder="报销制度",
                current_user=build_admin_user(),
                document_ids=[doc_id],
            )

    failed_detail = KnowledgeService(storage_root=tmp_path).get_document_detail(doc_id)
    assert failed_detail.stateCode == KNOWLEDGE_INGEST_STATUS_FAILED
    assert failed_detail.state == "归纳失败"


def test_llm_wiki_sync_uses_fallback_candidates_when_system_hermes_times_out(
    tmp_path,
    monkeypatch,
) -> None:
    """A System Hermes timeout still yields an ingested document without touching runtime chat."""
    doc_id = upload_policy_document(tmp_path)
    runtime_completions = 0

    def hermes_is_available():
        return True

    def timing_out_run_query(*args, **kwargs):
        raise TimeoutExpired(cmd="hermes", timeout=1)

    def forbid_runtime_complete(*args, **kwargs):
        # Count the call before failing so the final assertion can prove it never happened.
        nonlocal runtime_completions
        runtime_completions += 1
        raise AssertionError("system hermes timeout should fall back directly to local candidate builder")

    with build_session() as session:
        wiki_service = LlmWikiService(session, storage_root=tmp_path)

        monkeypatch.setattr(wiki_service.system_hermes_service, "is_available", hermes_is_available)
        monkeypatch.setattr(wiki_service.system_hermes_service, "run_query", timing_out_run_query)
        monkeypatch.setattr(wiki_service.runtime_chat_service, "complete", forbid_runtime_complete)

        sync_result = wiki_service.sync_folder(
            folder="报销制度",
            current_user=build_admin_user(),
            document_ids=[doc_id],
        )

        assert sync_result.document_count == 1
        assert sync_result.knowledge_candidate_count >= 1
        assert runtime_completions == 0

    ingested_detail = KnowledgeService(storage_root=tmp_path).get_document_detail(doc_id)
    assert ingested_detail.stateCode == KNOWLEDGE_INGEST_STATUS_INGESTED
    assert ingested_detail.state == "已归纳"


def test_llm_wiki_sync_skips_unchanged_and_rebuilds_on_updated_at_change(tmp_path, monkeypatch) -> None:
    """A repeated sync processes nothing; a sync after the timestamp update records `updated_at_changed`."""
    doc_id = upload_policy_document(tmp_path)

    def stub_candidate_model(self, *, entry, chunk_group):
        first_chunk = chunk_group[0]
        return build_candidate_payload(first_chunk["chunk_id"])

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", stub_candidate_model)

    with build_session() as session:
        wiki_service = LlmWikiService(session, storage_root=tmp_path)

        def run_sync():
            return wiki_service.sync_folder(folder="报销制度", current_user=build_admin_user())

        initial_sync = run_sync()
        repeat_sync = run_sync()

        assert initial_sync.document_count == 1
        assert repeat_sync.document_count == 0
        assert "未变化,跳过" in repeat_sync.summary

        update_document_timestamp(tmp_path, doc_id, "2026-05-15T09:30:00+00:00")
        resync = run_sync()

        assert resync.document_count == 1

    document_json = tmp_path / "knowledge" / ".llm_wiki" / "documents" / doc_id / "document.json"
    stored_document = json.loads(document_json.read_text(encoding="utf-8"))
    assert stored_document["sync_reason"] == "updated_at_changed"


def test_llm_wiki_sync_does_not_overwrite_active_rule(tmp_path, monkeypatch) -> None:
    """Re-syncing a changed document leaves an approved, activated rule asset untouched."""
    doc_id = upload_policy_document(tmp_path)
    admin_actor = "管理员"

    def candidate_model_returning(**payload_overrides):
        """Build a `_call_candidate_model` stand-in that forwards overrides to the payload builder."""

        def stub_candidate_model(self, *, entry, chunk_group):
            return build_candidate_payload(chunk_group[0]["chunk_id"], **payload_overrides)

        return stub_candidate_model

    monkeypatch.setattr(LlmWikiService, "_call_candidate_model", candidate_model_returning())

    with build_session() as session:
        wiki_service = LlmWikiService(session, storage_root=tmp_path)
        initial_sync = wiki_service.sync_folder(folder="报销制度", current_user=build_admin_user())
        rule_asset_id = initial_sync.generated_rule_asset_ids[0]

        # Approve and activate the generated rule so the next sync meets an active asset.
        asset_service = AgentAssetService(session)
        draft_asset = asset_service.get_asset(rule_asset_id)
        assert draft_asset is not None
        approval = AgentAssetReviewCreate(
            version=draft_asset.current_version or "v1.0.0",
            reviewer=admin_actor,
            review_status=AgentReviewStatus.APPROVED,
            review_note="允许上线",
        )
        asset_service.create_review(rule_asset_id, approval, actor=admin_actor)
        active_asset = asset_service.activate_asset(rule_asset_id, actor=admin_actor)

        assert active_asset.status == "active"

        version_before = active_asset.current_version
        content_before = active_asset.current_version_content
        config_before = active_asset.config_json

        # Second pass: the model now proposes a different summary for the same chunk.
        monkeypatch.setattr(
            LlmWikiService,
            "_call_candidate_model",
            candidate_model_returning(summary="住宿费超过标准时,必须升级审批并记录超标原因。"),
        )
        update_document_timestamp(tmp_path, doc_id, "2026-05-15T10:00:00+00:00")

        resync = wiki_service.sync_folder(folder="报销制度", current_user=build_admin_user())
        asset_after = asset_service.get_asset(rule_asset_id)

        assert resync.document_count == 1
        assert resync.generated_rule_count == 0
        assert asset_after is not None
        assert asset_after.status == "active"
        assert asset_after.current_version == version_before
        assert asset_after.current_version_content == content_before
        assert asset_after.config_json == config_before


def test_llm_wiki_sync_endpoint_records_agent_run(monkeypatch) -> None:
    """Posting to the sync endpoint adds one agent run carrying the sync tool call."""
    stub_run_id = "wiki_test_sync"
    run_list_limit = 100

    def fake_sync_folder(self, *, folder="报销制度", current_user, document_ids=None, force=False):
        return LlmWikiSyncRead(
            ok=True,
            run_id=stub_run_id,
            folder=folder,
            document_count=1,
            knowledge_candidate_count=2,
            rule_candidate_count=1,
            generated_rule_count=1,
            generated_rule_asset_ids=["asset-rule-1"],
            summary="已完成 Hermes LLM Wiki 同步。",
        )

    monkeypatch.setattr(LlmWikiService, "sync_folder", fake_sync_folder)

    client, session_factory = build_client()
    with session_factory() as session:
        runs_before = AgentRunService(session).list_runs(limit=run_list_limit)
        before_count = len(runs_before)

    admin_headers = {
        "x-auth-username": "admin",
        "x-auth-name": "admin",
        "x-auth-is-admin": "true",
    }
    response = client.post(
        "/api/v1/knowledge/llm-wiki/sync",
        json={"folder": "报销制度", "force": False},
        headers=admin_headers,
    )

    assert response.status_code == 200
    body = response.json()
    assert body["run_id"] == stub_run_id
    assert body["generated_rule_count"] == 1

    with session_factory() as session:
        runs_after = AgentRunService(session).list_runs(limit=run_list_limit)
        assert len(runs_after) == before_count + 1

        # The assertions below treat the first listed run as the one just recorded.
        newest_run = runs_after[0]
        assert newest_run.agent == "hermes"
        assert newest_run.source == AgentRunSource.SCHEDULE.value
        assert newest_run.status == AgentRunStatus.SUCCEEDED.value
        assert newest_run.tool_calls

        sync_call = newest_run.tool_calls[0]
        assert sync_call.tool_name == "system_hermes_llm_wiki_sync"
        assert sync_call.status == "succeeded"
        assert sync_call.response_json["run_id"] == stub_run_id