from __future__ import annotations import hashlib import json import re from subprocess import TimeoutExpired from dataclasses import dataclass from datetime import UTC, datetime from decimal import Decimal from pathlib import Path from typing import Any, Callable, Literal from uuid import uuid4 from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator from sqlalchemy import select from sqlalchemy.orm import Session from app.api.deps import CurrentUserContext from app.core.agent_enums import ( AgentAssetContentType, AgentAssetDomain, AgentAssetStatus, AgentAssetType, ) from app.core.logging import get_logger from app.models.agent_asset import AgentAsset from app.schemas.agent_asset import AgentAssetCreate, AgentAssetUpdate, AgentAssetVersionCreate from app.schemas.knowledge import ( LlmWikiChunkRead, LlmWikiDocumentDetailRead, LlmWikiDocumentRead, LlmWikiIndexRead, LlmWikiKnowledgeCandidateRead, LlmWikiRuleCandidateRead, LlmWikiSummaryUpdateWrite, LlmWikiSyncRead, ) from app.services.agent_assets import AgentAssetService from app.services.knowledge import ( KNOWLEDGE_INGEST_STATUS_FAILED, KNOWLEDGE_INGEST_STATUS_SYNCING, KnowledgeService, ) from app.services.runtime_chat import RuntimeChatService from app.services.system_hermes import SystemHermesService logger = get_logger("app.services.llm_wiki") HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS = 10 HERMES_CANDIDATE_GROUP_SIZE = 2 HERMES_CANDIDATE_CONTENT_LIMIT = 520 LOW_SIGNAL_DOTTED_LINE_PATTERN = re.compile(r"[..。·•]{6,}\s*[0-9]{0,3}$") PAGE_FOOTER_PATTERN = re.compile(r"^第\s*\d+\s*页\s*共\s*\d+\s*页$") POLICY_SUBSTANCE_KEYWORDS = ( "报销", "审批", "标准", "费用", "支出", "单据", "发票", "附件", "预算", "财务", "差旅", "住宿", "交通", "借款", "付款", "申请", "审核", "核销", "金额", "额度", "应当", "必须", "不得", "可以", ) HEADING_PATTERN = re.compile( r"^(第[一二三四五六七八九十百零〇0-9]+[章节条].{0,40}|附件\s*[0-9一二三四五六七八九十]+.*|目录)$" ) JSON_FENCE_PATTERN = re.compile(r"```(?:json)?\s*(\{.*\})\s*```", re.DOTALL) THINK_PATTERN = re.compile(r".*?", re.DOTALL | re.IGNORECASE) @dataclass(slots=True) class CandidateModelAttempt: payload: dict[str, Any] source: str = "hermes" ok: bool = True failure_reason: str = "" @dataclass(slots=True) class CandidateExtractionStats: raw_chunk_count: int = 0 candidate_chunk_count: int = 0 filtered_chunk_count: int = 0 group_count: int = 0 successful_group_count: int = 0 failed_group_count: int = 0 formal_knowledge_candidate_count: int = 0 fallback_knowledge_candidate_count: int = 0 quality_status: str = "failed" quality_note: str = "" RULE_TEMPLATE_CATALOG: dict[str, dict[str, str]] = { "travel_standard_v1": { "label": "差旅标准模板", "summary": "用于差旅、住宿、交通等制度条款的结构化规则草稿。", }, "expense_amount_limit_v1": { "label": "金额上限模板", "summary": "用于单笔金额、合计金额、住宿标准等金额约束条款。", }, "attachment_requirement_v1": { "label": "附件要求模板", "summary": "用于票据、附件、说明材料完整性约束条款。", }, "general_policy_v1": { "label": "通用制度模板", "summary": "用于需要人工补齐执行细节的制度性草稿规则。", }, } TAG_HINTS: tuple[tuple[str, str], ...] = ( ("差旅", "差旅"), ("住宿", "住宿"), ("交通", "交通"), ("餐饮", "餐饮"), ("附件", "附件"), ("发票", "发票"), ("审批", "审批"), ("预算", "预算"), ) TRAVEL_CONTROL_CODES = ( "route_closed_loop", "destination_match", "hotel_city_match", "hotel_limit", "transport_class_limit", ) class StrictTemplateModel(BaseModel): model_config = ConfigDict(extra="forbid") class DraftRuleTarget(StrictTemplateModel): expense_types: list[str] = Field(default_factory=list) scene_codes: list[str] = Field(default_factory=list) class DraftRuleOutput(StrictTemplateModel): risk_code: str = Field(min_length=1, max_length=80) action: Literal["warn", "review", "block"] message: str = Field(default="", max_length=500) class DraftExceptionPolicy(StrictTemplateModel): allow_with_explanation: bool = True keywords: list[str] = Field(default_factory=list) class TravelControlPoint(StrictTemplateModel): control_code: Literal[ "route_closed_loop", "destination_match", "hotel_city_match", "hotel_limit", "transport_class_limit", ] severity: Literal["medium", "high"] = "high" enabled: bool = True class TravelStandardDraftRule(StrictTemplateModel): kind: Literal["policy_rule_draft"] = "policy_rule_draft" version: int = 1 template_key: Literal["travel_standard_v1"] rule_name: str = Field(min_length=1, max_length=200) scenario: str = Field(min_length=1, max_length=80) source_document_name: str = Field(min_length=1, max_length=255) review_required: bool = True target: DraftRuleTarget = Field(default_factory=DraftRuleTarget) control_points: list[TravelControlPoint] = Field(default_factory=list) exception_policy: DraftExceptionPolicy = Field(default_factory=DraftExceptionPolicy) output: DraftRuleOutput @model_validator(mode="after") def validate_control_points(self) -> "TravelStandardDraftRule": if not self.control_points: raise ValueError("travel_standard_v1 必须至少包含一个 control_points 配置。") return self class AmountLimitTarget(StrictTemplateModel): expense_types: list[str] = Field(default_factory=list) scene_codes: list[str] = Field(default_factory=list) metric: str = Field(default="claim_total", min_length=1, max_length=80) class AmountLimitThreshold(StrictTemplateModel): currency: str = Field(default="CNY", min_length=1, max_length=10) comparator: Literal["gt", "gte"] = "gt" warn_amount: Decimal | None = None block_amount: Decimal | None = None source: Literal["document_value", "manual_fill_required"] = "manual_fill_required" @model_validator(mode="after") def validate_threshold(self) -> "AmountLimitThreshold": if self.warn_amount is None and self.block_amount is None and self.source != "manual_fill_required": raise ValueError("金额阈值未提供,source 不是 manual_fill_required 时必须给出 warn_amount 或 block_amount。") if self.warn_amount is not None and self.warn_amount < Decimal("0"): raise ValueError("warn_amount 不能为负数。") if self.block_amount is not None and self.block_amount < Decimal("0"): raise ValueError("block_amount 不能为负数。") if ( self.warn_amount is not None and self.block_amount is not None and self.block_amount < self.warn_amount ): raise ValueError("block_amount 不能小于 warn_amount。") return self class ExpenseAmountLimitDraftRule(StrictTemplateModel): kind: Literal["policy_rule_draft"] = "policy_rule_draft" version: int = 1 template_key: Literal["expense_amount_limit_v1"] rule_name: str = Field(min_length=1, max_length=200) scenario: str = Field(min_length=1, max_length=80) source_document_name: str = Field(min_length=1, max_length=255) review_required: bool = True target: AmountLimitTarget threshold: AmountLimitThreshold exception_policy: DraftExceptionPolicy = Field(default_factory=DraftExceptionPolicy) output: DraftRuleOutput class AttachmentRequirementItem(StrictTemplateModel): document_type: str = Field(min_length=1, max_length=80) required: bool = True min_count: int = Field(default=1, ge=1) description: str = Field(default="", max_length=200) class AttachmentRequirementSet(StrictTemplateModel): min_attachment_count: int = Field(default=1, ge=1) items: list[AttachmentRequirementItem] = Field(default_factory=list) manual_fill_required: bool = False @model_validator(mode="after") def validate_items(self) -> "AttachmentRequirementSet": if not self.items and not self.manual_fill_required: raise ValueError("附件要求模板必须至少声明一个附件要求,或显式标记 manual_fill_required。") return self class AttachmentRequirementDraftRule(StrictTemplateModel): kind: Literal["policy_rule_draft"] = "policy_rule_draft" version: int = 1 template_key: Literal["attachment_requirement_v1"] rule_name: str = Field(min_length=1, max_length=200) scenario: str = Field(min_length=1, max_length=80) source_document_name: str = Field(min_length=1, max_length=255) review_required: bool = True target: DraftRuleTarget = Field(default_factory=DraftRuleTarget) attachment_requirements: AttachmentRequirementSet missing_attachment_action: Literal["warn", "review", "block"] = "block" output: DraftRuleOutput class GeneralPolicyDraftRule(StrictTemplateModel): kind: Literal["policy_rule_draft"] = "policy_rule_draft" version: int = 1 template_key: Literal["general_policy_v1"] rule_name: str = Field(min_length=1, max_length=200) scenario: str = Field(min_length=1, max_length=80) source_document_name: str = Field(min_length=1, max_length=255) review_required: bool = True target: DraftRuleTarget = Field(default_factory=DraftRuleTarget) control_points: list[str] = Field(default_factory=list) review_checklist: list[str] = Field(default_factory=list) output: DraftRuleOutput @model_validator(mode="after") def validate_general_policy(self) -> "GeneralPolicyDraftRule": if not self.control_points: raise ValueError("通用制度模板必须至少包含一个 control_points 项。") if not self.review_checklist: raise ValueError("通用制度模板必须至少包含一个 review_checklist 项。") return self RUNTIME_RULE_MODEL_BY_TEMPLATE: dict[str, type[BaseModel]] = { "travel_standard_v1": TravelStandardDraftRule, "expense_amount_limit_v1": ExpenseAmountLimitDraftRule, "attachment_requirement_v1": AttachmentRequirementDraftRule, "general_policy_v1": GeneralPolicyDraftRule, } RUNTIME_RULE_CONTRACTS: dict[str, dict[str, Any]] = { "travel_standard_v1": { "kind": "policy_rule_draft", "required_fields": ["target", "control_points", "exception_policy", "output"], "allowed_control_codes": list(TRAVEL_CONTROL_CODES), "allowed_output_actions": ["warn", "review", "block"], }, "expense_amount_limit_v1": { "kind": "policy_rule_draft", "required_fields": ["target.metric", "threshold", "exception_policy", "output"], "allowed_threshold_sources": ["document_value", "manual_fill_required"], "allowed_output_actions": ["warn", "review", "block"], }, "attachment_requirement_v1": { "kind": "policy_rule_draft", "required_fields": ["target", "attachment_requirements", "missing_attachment_action", "output"], "allowed_output_actions": ["warn", "review", "block"], }, "general_policy_v1": { "kind": "policy_rule_draft", "required_fields": ["target", "control_points", "review_checklist", "output"], "allowed_output_actions": ["warn", "review", "block"], }, } class LlmWikiService: def __init__(self, db: Session, *, storage_root: Path | None = None) -> None: self.db = db self.knowledge_service = KnowledgeService(storage_root=storage_root) self.runtime_chat_service = RuntimeChatService(db) self.asset_service = AgentAssetService(db) self.system_hermes_service = SystemHermesService() def get_index(self) -> LlmWikiIndexRead: self.knowledge_service.ensure_library_ready() index = self._load_wiki_index() sync_runs = self._load_sync_runs() documents = [ LlmWikiDocumentRead.model_validate(item) for item in sorted( index.get("documents", []), key=lambda value: str(value.get("updated_at") or ""), reverse=True, ) ] return LlmWikiIndexRead(documents=documents, sync_run_count=len(sync_runs.get("runs", []))) def get_document_detail(self, document_id: str) -> LlmWikiDocumentDetailRead: self.knowledge_service.ensure_library_ready() document_dir = self._document_dir(document_id) document_payload = self._load_json_file(document_dir / "document.json", default={}) if not document_payload: raise FileNotFoundError(document_id) chunks = self._load_json_file(document_dir / "chunks.json", default=[]) knowledge_candidates = self._load_json_file( document_dir / "knowledge_candidates.json", default=[], ) rule_candidates = self._load_json_file( document_dir / "rule_candidates.json", default=[], ) summary_path = document_dir / "knowledge_summary.md" knowledge_summary_markdown = summary_path.read_text(encoding="utf-8") if summary_path.exists() else "" if not knowledge_summary_markdown.strip(): knowledge_summary_markdown = self._build_knowledge_summary_markdown( entry=document_payload, knowledge_candidates=list(knowledge_candidates), ) return LlmWikiDocumentDetailRead( **LlmWikiDocumentRead.model_validate(document_payload).model_dump(), knowledge_summary_markdown=knowledge_summary_markdown, chunks=[LlmWikiChunkRead.model_validate(item) for item in chunks], knowledge_candidates=[ LlmWikiKnowledgeCandidateRead.model_validate(item) for item in knowledge_candidates ], rule_candidates=[ LlmWikiRuleCandidateRead.model_validate(item) for item in rule_candidates ], ) def update_document_summary( self, document_id: str, payload: LlmWikiSummaryUpdateWrite, ) -> LlmWikiDocumentDetailRead: self.knowledge_service.ensure_library_ready() document_dir = self._document_dir(document_id) document_payload = self._load_json_file(document_dir / "document.json", default={}) if not document_payload: raise FileNotFoundError(document_id) summary_text = str(payload.knowledge_summary_markdown or "").strip() if not summary_text: raise ValueError("知识总结不能为空。") (document_dir / "knowledge_summary.md").write_text(summary_text, encoding="utf-8") return self.get_document_detail(document_id) def sync_folder( self, *, folder: str = "报销制度", current_user: CurrentUserContext, document_ids: list[str] | None = None, force: bool = False, agent_run_id: str | None = None, progress_callback: Callable[[dict[str, Any], str], None] | None = None, ) -> LlmWikiSyncRead: self.knowledge_service.ensure_library_ready() documents = self.knowledge_service.list_folder_documents(folder=folder) if document_ids: allowed_ids = set(document_ids) documents = [item for item in documents if item.get("id") in allowed_ids] target_document_ids = [ str(item.get("id") or "").strip() for item in documents if str(item.get("id") or "").strip() ] if target_document_ids: self.knowledge_service.set_document_ingest_statuses( target_document_ids, status_code=KNOWLEDGE_INGEST_STATUS_SYNCING, agent_run_id=agent_run_id, ) try: index = self._load_wiki_index() sync_runs = self._load_sync_runs() existing_by_id = { str(item.get("document_id") or ""): item for item in list(index.get("documents", [])) } run_id = f"wiki_{uuid4().hex[:12]}" knowledge_candidate_count = 0 rule_candidate_count = 0 generated_rule_asset_ids: list[str] = [] changed_document_count = 0 skipped_document_count = 0 sync_summaries: list[str] = [] failed_document_ids: list[str] = [] total_documents = len(documents) self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": 0, "failed_documents": 0, "skipped_documents": 0, "percent": 0, }, }, f"Hermes 已开始归纳,待处理文档 {total_documents} 个。", ) for index_value, entry in enumerate(documents, start=1): document_id = str(entry.get("id") or "").strip() if not document_id: continue existing = existing_by_id.get(document_id) sync_reason = self._resolve_sync_reason(entry=entry, existing=existing, force=force) if sync_reason == "unchanged_skipped": skipped_document_count += 1 sync_summaries.append(f"{entry['original_name']}:未变化,跳过。") self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": changed_document_count, "failed_documents": len(failed_document_ids), "skipped_documents": skipped_document_count, "current_document_index": index_value, "current_document_id": document_id, "current_document_name": entry["original_name"], "current_stage": "skipped", "percent": self._calculate_progress_percent( completed_documents=changed_document_count, skipped_documents=skipped_document_count, total_documents=total_documents, ), }, }, f"《{entry['original_name']}》未变化,跳过本次归纳。", ) continue self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": changed_document_count, "failed_documents": len(failed_document_ids), "skipped_documents": skipped_document_count, "current_document_index": index_value, "current_document_id": document_id, "current_document_name": entry["original_name"], "current_stage": "document_started", "percent": self._calculate_progress_percent( completed_documents=changed_document_count, skipped_documents=skipped_document_count, total_documents=total_documents, ), }, }, f"Hermes 正在归纳《{entry['original_name']}》。", ) changed_document_count += 1 document_payload = self._sync_single_document( entry=entry, folder=folder, current_user=current_user, sync_reason=sync_reason, progress_callback=lambda payload, summary, *, document_id=document_id, document_name=entry["original_name"], document_index=index_value: self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": max(changed_document_count - 1, 0), "failed_documents": len(failed_document_ids), "skipped_documents": skipped_document_count, "current_document_index": document_index, "current_document_id": document_id, "current_document_name": document_name, **payload, "percent": self._calculate_progress_percent( completed_documents=max(changed_document_count - 1, 0), skipped_documents=skipped_document_count, total_documents=total_documents, group_count=int(payload.get("group_count") or 0), current_group_index=int(payload.get("current_group_index") or 0), ), }, }, summary, ), ) existing_by_id[document_id] = document_payload["document"] knowledge_candidate_count += len(document_payload["knowledge_candidates"]) rule_candidate_count += len(document_payload["rule_candidates"]) generated_rule_asset_ids.extend( [ str(item.get("generated_asset_id") or "").strip() for item in document_payload["rule_candidates"] if str(item.get("generated_asset_id") or "").strip() ] ) if document_payload["document"].get("quality_status") in {"fallback_only", "runtime_only", "failed"}: failed_document_ids.append(document_id) sync_summaries.append( f"{entry['original_name']}:{sync_reason},知识候选 {len(document_payload['knowledge_candidates'])} 条," f"规则候选 {len(document_payload['rule_candidates'])} 条," f"归纳质量 {document_payload['document'].get('quality_status') or 'formal'}。" ) self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": changed_document_count, "failed_documents": len(failed_document_ids), "skipped_documents": skipped_document_count, "current_document_index": index_value, "current_document_id": document_id, "current_document_name": entry["original_name"], "current_stage": "document_completed", "knowledge_candidate_count": len(document_payload["knowledge_candidates"]), "rule_candidate_count": len(document_payload["rule_candidates"]), "quality_status": document_payload["document"].get("quality_status") or "formal", "percent": self._calculate_progress_percent( completed_documents=changed_document_count, skipped_documents=skipped_document_count, total_documents=total_documents, ), }, }, f"《{entry['original_name']}》归纳完成,质量状态为 {document_payload['document'].get('quality_status') or 'formal'}。", ) index["documents"] = list(existing_by_id.values()) self._write_json_file(self.knowledge_service.llm_wiki_index_path, index) if failed_document_ids: self.knowledge_service.set_document_ingest_statuses( failed_document_ids, status_code=KNOWLEDGE_INGEST_STATUS_FAILED, agent_run_id=agent_run_id, ) sync_runs.setdefault("runs", []) sync_runs["runs"].append( { "run_id": run_id, "folder": folder, "requested_document_ids": document_ids or [], "changed_document_count": changed_document_count, "knowledge_candidate_count": knowledge_candidate_count, "rule_candidate_count": rule_candidate_count, "generated_rule_asset_ids": generated_rule_asset_ids, "created_by": current_user.name, "created_at": datetime.now(UTC).isoformat(), "summary": sync_summaries, } ) self._write_json_file(self.knowledge_service.llm_wiki_sync_runs_path, sync_runs) self.knowledge_service.refresh_document_ingest_statuses( document_ids=target_document_ids, preserve_syncing=False, ) generated_rule_ids = list(dict.fromkeys(generated_rule_asset_ids)) summary = ";".join(sync_summaries) if sync_summaries else "未发现需要同步的知识文档。" self._emit_progress( progress_callback, { "phase": "running", "progress": { "total_documents": total_documents, "completed_documents": changed_document_count, "failed_documents": len(failed_document_ids), "skipped_documents": skipped_document_count, "knowledge_candidate_count": knowledge_candidate_count, "rule_candidate_count": rule_candidate_count, "percent": 100, }, }, summary, ) return LlmWikiSyncRead( ok=True, run_id=run_id, folder=folder, document_count=changed_document_count, knowledge_candidate_count=knowledge_candidate_count, rule_candidate_count=rule_candidate_count, generated_rule_count=len(generated_rule_ids), generated_rule_asset_ids=generated_rule_ids, summary=summary, ) except Exception: if target_document_ids: self.knowledge_service.set_document_ingest_statuses( target_document_ids, status_code=KNOWLEDGE_INGEST_STATUS_FAILED, agent_run_id=agent_run_id, ) raise def _sync_single_document( self, *, entry: dict[str, Any], folder: str, current_user: CurrentUserContext, sync_reason: str, progress_callback: Callable[[dict[str, Any], str], None] | None = None, ) -> dict[str, Any]: document_id = str(entry["id"]) document_name = str(entry["original_name"]) document_dir = self._document_dir(document_id) document_dir.mkdir(parents=True, exist_ok=True) extracted_text = self.knowledge_service.extract_document_text(document_id) text_path = document_dir / "text.md" text_path.write_text(extracted_text, encoding="utf-8") chunks = self._build_chunks(document_id=document_id, text=extracted_text) self._emit_progress( progress_callback, { "current_stage": "text_extracted", "chunk_count": len(chunks), }, f"《{document_name}》文本提取完成,共形成 {len(chunks)} 个分块。", ) knowledge_candidates, rule_candidates, extraction_stats = self._extract_candidates( entry=entry, chunks=chunks, current_user=current_user, progress_callback=progress_callback, ) generated_candidates: list[dict[str, Any]] = [] for candidate in rule_candidates: if candidate.get("validation_status") == "valid": generated_asset = self._create_or_update_rule_draft(candidate, current_user=current_user) if generated_asset is not None: candidate["generated_asset_id"] = generated_asset["asset_id"] candidate["generated_asset_code"] = generated_asset["asset_code"] candidate["generated_version"] = generated_asset["version"] generated_candidates.append(candidate) document_record = { "document_id": document_id, "document_name": document_name, "folder": folder, "document_version": f"v{int(entry.get('version_number', 1))}.0", "checksum": str(entry.get("sha256") or ""), "extracted_text_path": str(text_path), "chunk_count": len(chunks), "candidate_chunk_count": extraction_stats.candidate_chunk_count, "filtered_chunk_count": extraction_stats.filtered_chunk_count, "group_count": extraction_stats.group_count, "successful_group_count": extraction_stats.successful_group_count, "failed_group_count": extraction_stats.failed_group_count, "knowledge_candidate_count": len(knowledge_candidates), "formal_knowledge_candidate_count": extraction_stats.formal_knowledge_candidate_count, "fallback_knowledge_candidate_count": extraction_stats.fallback_knowledge_candidate_count, "rule_candidate_count": len(generated_candidates), "quality_status": extraction_stats.quality_status, "quality_note": extraction_stats.quality_note, "updated_at": datetime.now(UTC).isoformat(), "signature": self._build_document_signature(entry), "sync_reason": sync_reason, } self._write_json_file(document_dir / "document.json", document_record) self._write_json_file(document_dir / "chunks.json", chunks) self._write_json_file(document_dir / "knowledge_candidates.json", knowledge_candidates) self._write_json_file(document_dir / "rule_candidates.json", generated_candidates) (document_dir / "knowledge_summary.md").write_text( self._build_knowledge_summary_markdown( entry=entry, knowledge_candidates=knowledge_candidates, ), encoding="utf-8", ) return { "document": document_record, "knowledge_candidates": knowledge_candidates, "rule_candidates": generated_candidates, } def _extract_candidates( self, *, entry: dict[str, Any], chunks: list[dict[str, Any]], current_user: CurrentUserContext, progress_callback: Callable[[dict[str, Any], str], None] | None = None, ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], CandidateExtractionStats]: stats = CandidateExtractionStats(raw_chunk_count=len(chunks)) if not chunks: stats.quality_status = "failed" stats.quality_note = "文档未提取到可用分块,无法形成 LLM Wiki。" return [], [], stats candidate_chunks = self._select_candidate_chunks(chunks) stats.candidate_chunk_count = len(candidate_chunks) stats.filtered_chunk_count = max(0, len(chunks) - len(candidate_chunks)) if not candidate_chunks: stats.quality_status = "failed" stats.quality_note = "正文条款分块为空,当前仅识别到封面、目录或低信息量内容,未形成正式归纳。" return [], [], stats projected_group_count = len(self._group_chunks(candidate_chunks, size=HERMES_CANDIDATE_GROUP_SIZE)) self._emit_progress( progress_callback, { "current_stage": "candidate_chunks_selected", "candidate_chunk_count": stats.candidate_chunk_count, "filtered_chunk_count": stats.filtered_chunk_count, "group_count": projected_group_count, "current_group_index": 0, "successful_group_count": 0, "failed_group_count": 0, }, f"《{entry['original_name']}》已筛出 {stats.candidate_chunk_count} 个有效正文分块,准备分 {projected_group_count} 组归纳。", ) knowledge_candidates: list[dict[str, Any]] = [] rule_candidates: list[dict[str, Any]] = [] seen_knowledge_keys: set[str] = set() seen_rule_keys: set[str] = set() for chunk_group in self._group_chunks(candidate_chunks, size=HERMES_CANDIDATE_GROUP_SIZE): stats.group_count += 1 attempt = self._call_candidate_model(entry=entry, chunk_group=chunk_group) if isinstance(attempt, dict): attempt = CandidateModelAttempt(payload=attempt, source="hermes", ok=True) if not attempt.ok: stats.failed_group_count += 1 self._emit_progress( progress_callback, { "current_stage": "extracting_candidates", "group_count": projected_group_count, "current_group_index": stats.group_count, "successful_group_count": stats.successful_group_count, "failed_group_count": stats.failed_group_count, }, f"《{entry['original_name']}》第 {stats.group_count}/{projected_group_count} 组归纳失败,继续处理下一组。", ) continue stats.successful_group_count += 1 batch_knowledge = self._normalize_knowledge_candidates( raw_items=list(attempt.payload.get("knowledge_candidates") or []), entry=entry, chunk_group=chunk_group, seen_keys=seen_knowledge_keys, extraction_mode=attempt.source, ) batch_rules: list[dict[str, Any]] = [] if attempt.source == "hermes": batch_rules = self._normalize_rule_candidates( raw_items=list(attempt.payload.get("rule_candidates") or []), entry=entry, chunk_group=chunk_group, current_user=current_user, seen_keys=seen_rule_keys, ) knowledge_candidates.extend(batch_knowledge) rule_candidates.extend(batch_rules) self._emit_progress( progress_callback, { "current_stage": "extracting_candidates", "group_count": projected_group_count, "current_group_index": stats.group_count, "successful_group_count": stats.successful_group_count, "failed_group_count": stats.failed_group_count, "knowledge_candidate_count": len(knowledge_candidates), "rule_candidate_count": len(rule_candidates), }, f"《{entry['original_name']}》已完成第 {stats.group_count}/{projected_group_count} 组归纳。", ) formal_knowledge_candidate_count = sum( 1 for item in knowledge_candidates if str(item.get("extraction_mode") or "hermes") == "hermes" ) if formal_knowledge_candidate_count <= 0: fallback = self._build_fallback_knowledge_candidate( entry=entry, chunks=candidate_chunks, reason=( "Hermes 未能从正文条款中形成正式知识候选。当前结果仅为降级兜底预览,不能视为正式归纳。" ), ) if fallback is not None: knowledge_candidates.append(fallback) truncated_knowledge_candidates = knowledge_candidates[:12] truncated_rule_candidates = rule_candidates[:12] stats.formal_knowledge_candidate_count = sum( 1 for item in truncated_knowledge_candidates if str(item.get("extraction_mode") or "hermes") == "hermes" ) stats.fallback_knowledge_candidate_count = max( 0, len(truncated_knowledge_candidates) - stats.formal_knowledge_candidate_count, ) stats.quality_status, stats.quality_note = self._resolve_quality_status( stats=stats, knowledge_candidates=truncated_knowledge_candidates, ) self._emit_progress( progress_callback, { "current_stage": "candidate_extraction_completed", "group_count": projected_group_count, "current_group_index": projected_group_count, "successful_group_count": stats.successful_group_count, "failed_group_count": stats.failed_group_count, "knowledge_candidate_count": len(truncated_knowledge_candidates), "formal_knowledge_candidate_count": stats.formal_knowledge_candidate_count, "fallback_knowledge_candidate_count": stats.fallback_knowledge_candidate_count, "rule_candidate_count": len(truncated_rule_candidates), "quality_status": stats.quality_status, }, f"《{entry['original_name']}》候选提炼完成,质量状态为 {stats.quality_status}。", ) return truncated_knowledge_candidates, truncated_rule_candidates, stats def _call_candidate_model( self, *, entry: dict[str, Any], chunk_group: list[dict[str, Any]], ) -> CandidateModelAttempt: facts = { "document_id": entry["id"], "document_name": entry["original_name"], "folder": entry["folder"], "allowed_rule_templates": [ { "template_key": key, "template_label": value["label"], "summary": value["summary"], } for key, value in RULE_TEMPLATE_CATALOG.items() ], "runtime_rule_contracts": RUNTIME_RULE_CONTRACTS, "chunks": [ { "chunk_id": item["chunk_id"], "title": item["title"], "content": item["content"][:HERMES_CANDIDATE_CONTENT_LIMIT], "source_page": item.get("source_page"), "tags": item.get("tags", []), } for item in chunk_group ], } system_prompt = ( "你是企业财务制度知识库的 Hermes 规则形成器。" "你只能基于提供的制度条款生成结构化知识候选和规则候选,不能自由发散。" "封面、目录、通知、页眉页脚、密级说明、印发信息不属于知识候选,必须忽略。" "只提炼具有执行意义、审核意义、报销约束意义的条款。" "规则候选必须从允许模板中选 template_key,严禁自创模板。" "runtime_rule 必须严格遵守 runtime_rule_contracts 中对应模板的字段结构和允许值。" "如果条款不适合自动规则化,可以只返回 knowledge_candidates。" "输出必须是 JSON 对象,字段只有 knowledge_candidates 和 rule_candidates。" "knowledge_candidates 每项字段:title, content, scenario, tags, evidence, confidence, source_chunk_ids。" "rule_candidates 每项字段:template_key, suggested_rule_name, summary, scenario, purpose, scope, " "inputs, judgement_logic, outputs, admin_note, runtime_rule, evidence, confidence, source_chunk_ids。" "runtime_rule 也必须是 JSON 对象;如果当前无法形成可执行规则,也要给出结构化草稿 JSON," "并在 kind 中写 policy_rule_draft。" ) user_prompt = ( "请根据以下制度分块生成候选。" "每组最多提炼 3 条高价值 knowledge_candidates,优先保留可直接供报销审核、附件校验、审批判断使用的知识。" "只返回 JSON 对象,不要输出解释,不要调用工具,不要追加任何其他文本。\n" f"{json.dumps(facts, ensure_ascii=False, indent=2)}" ) hermes_query = ( f"{system_prompt}\n\n" f"{user_prompt}\n\n" "再次强调:只返回一个 JSON 对象,根字段必须是 knowledge_candidates 和 rule_candidates。" ) if self.system_hermes_service.is_available(): try: cli_result = self.system_hermes_service.run_query( hermes_query, source="tool", max_turns=1, timeout_seconds=HERMES_CANDIDATE_MODEL_TIMEOUT_SECONDS, ) payload = self._extract_json_payload(cli_result.response_text) if payload is not None: return CandidateModelAttempt(payload=payload, source="hermes", ok=True) logger.warning( "System Hermes returned no parseable JSON for LLM Wiki doc=%s chunk_group=%s.", entry.get("id"), ",".join(item.get("chunk_id", "") for item in chunk_group), ) return CandidateModelAttempt( payload={}, source="hermes", ok=False, failure_reason="system_hermes_no_json", ) except TimeoutExpired: logger.warning( "System Hermes timed out during LLM Wiki candidate extraction doc=%s chunk_group=%s.", entry.get("id"), ",".join(item.get("chunk_id", "") for item in chunk_group), ) return CandidateModelAttempt( payload={}, source="hermes", ok=False, failure_reason="system_hermes_timeout", ) except Exception as exc: logger.warning( "System Hermes failed during LLM Wiki candidate extraction doc=%s chunk_group=%s: %s", entry.get("id"), ",".join(item.get("chunk_id", "") for item in chunk_group), exc, ) return CandidateModelAttempt( payload={}, source="hermes", ok=False, failure_reason=str(exc) or "system_hermes_failed", ) response_text = self.runtime_chat_service.complete( [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], slot_priority=("main", "backup"), max_tokens=1800, temperature=0.0, ) payload = self._extract_json_payload(response_text) if payload is None: return CandidateModelAttempt( payload={}, source="runtime", ok=False, failure_reason="runtime_no_json", ) return CandidateModelAttempt( payload=payload, source="runtime", ok=True, failure_reason="system_hermes_unavailable", ) def _select_candidate_chunks(self, chunks: list[dict[str, Any]]) -> list[dict[str, Any]]: selected: list[dict[str, Any]] = [] for chunk in chunks: if self._is_low_signal_chunk(chunk): continue selected.append(chunk) return selected def _is_low_signal_chunk(self, chunk: dict[str, Any]) -> bool: title = str(chunk.get("title") or "").strip() content = str(chunk.get("content") or "").strip() page = int(chunk.get("source_page") or 0) if not content: return True if self._looks_like_table_of_contents(title=title, content=content): return True if self._looks_like_cover_notice(title=title, content=content, source_page=page): return True compact_content = re.sub(r"\s+", "", content) if len(compact_content) < 24 and not self._has_policy_substance(content): return True if title.startswith("附件") and len(compact_content) < 40: return True return False def _is_low_signal_candidate(self, *, title: str, content: str) -> bool: compact_content = re.sub(r"\s+", "", content) if len(compact_content) < 24 and not self._has_policy_substance(content): return True if self._looks_like_table_of_contents(title=title, content=content): return True if self._looks_like_cover_notice(title=title, content=content, source_page=0): return True return False @staticmethod def _has_policy_substance(text: str) -> bool: sample = str(text or "") return any(keyword in sample for keyword in POLICY_SUBSTANCE_KEYWORDS) @staticmethod def _looks_like_table_of_contents(*, title: str, content: str) -> bool: title_text = str(title or "").strip() content_text = str(content or "").strip() if title_text == "目录" or content_text == "目录": return True lines = [line.strip() for line in content_text.splitlines() if line.strip()] if lines and sum(1 for line in lines if LOW_SIGNAL_DOTTED_LINE_PATTERN.search(line)) >= max(2, len(lines) // 2): return True if LOW_SIGNAL_DOTTED_LINE_PATTERN.search(content_text) and "第" in title_text: return True return False def _looks_like_cover_notice(self, *, title: str, content: str, source_page: int) -> bool: text = f"{title}\n{content}" if PAGE_FOOTER_PATTERN.fullmatch(str(content or "").strip()): return True cover_keywords = ("关于颁布", "特此通知", "印发", "商密", "制度〔", "有限公司文件", "通知") if source_page == 1 and any(keyword in text for keyword in cover_keywords): return True if source_page > 2: return False if any(keyword in text for keyword in cover_keywords): if "必须" not in text and "应当" not in text and "不得" not in text: return True return False def _resolve_quality_status( self, *, stats: CandidateExtractionStats, knowledge_candidates: list[dict[str, Any]], ) -> tuple[str, str]: if stats.formal_knowledge_candidate_count <= 0: runtime_count = sum( 1 for item in knowledge_candidates if str(item.get("extraction_mode") or "") == "runtime" ) if runtime_count > 0: return ( "runtime_only", "当前知识候选来自运行时模型而非系统 Hermes,仅供人工参考,不计入正式归纳。", ) if stats.fallback_knowledge_candidate_count > 0: return ( "fallback_only", "Hermes 未形成正式知识候选,当前仅保留降级兜底预览,不能作为正式知识上线。", ) return ( "failed", "Hermes 未能从当前文档提炼出可用知识候选,请调整文档内容或重新归纳。", ) if stats.failed_group_count > 0: return ( "partial_degraded", f"Hermes 成功处理 {stats.successful_group_count}/{stats.group_count} 个分组," f"仍有 {stats.failed_group_count} 个分组未成功,请人工复核后再使用。", ) if stats.filtered_chunk_count > 0: return ( "formal", f"已自动过滤 {stats.filtered_chunk_count} 个封面、目录或低信息量分块,当前结果来自正文条款。", ) return ("formal", "Hermes 已基于正文条款完成正式归纳。") def _normalize_knowledge_candidates( self, *, raw_items: list[Any], entry: dict[str, Any], chunk_group: list[dict[str, Any]], seen_keys: set[str], extraction_mode: str, ) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] default_chunk_ids = [item["chunk_id"] for item in chunk_group] for item in raw_items: if not isinstance(item, dict): continue title = str(item.get("title") or "").strip() content = str(item.get("content") or "").strip() if not title or not content: continue if self._is_low_signal_candidate(title=title, content=content): continue candidate_key = f"{title.casefold()}::{content[:80].casefold()}" if candidate_key in seen_keys: continue seen_keys.add(candidate_key) quality_flags: list[str] = [] fallback_reason = "" if extraction_mode != "hermes": quality_flags.append("non_hermes_source") fallback_reason = "当前知识候选不是由系统 Hermes 正式提炼,不能视为正式归纳。" normalized.append( { "candidate_id": f"kc_{uuid4().hex[:12]}", "title": title, "content": content, "domain": "expense", "scenario": str(item.get("scenario") or "reimbursement_policy").strip() or "reimbursement_policy", "tags": self._normalize_tags(item.get("tags"), fallback_text=f"{title}\n{content}"), "source_document_id": entry["id"], "source_document_name": entry["original_name"], "source_chunk_ids": self._normalize_chunk_ids( item.get("source_chunk_ids"), allowed_chunk_ids=default_chunk_ids, fallback_chunk_ids=default_chunk_ids, ), "evidence": self._normalize_string_list(item.get("evidence")), "confidence": self._normalize_confidence(item.get("confidence")), "status": "draft", "created_by": "hermes", "created_at": datetime.now(UTC).isoformat(), "extraction_mode": extraction_mode, "quality_flags": quality_flags, "fallback_reason": fallback_reason, } ) return normalized def _normalize_rule_candidates( self, *, raw_items: list[Any], entry: dict[str, Any], chunk_group: list[dict[str, Any]], current_user: CurrentUserContext, seen_keys: set[str], ) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] default_chunk_ids = [item["chunk_id"] for item in chunk_group] for item in raw_items: if not isinstance(item, dict): continue template_key = str(item.get("template_key") or "").strip() if template_key not in RULE_TEMPLATE_CATALOG: continue rule_name = str(item.get("suggested_rule_name") or "").strip() if not rule_name: continue candidate_key = f"{template_key}::{rule_name.casefold()}" if candidate_key in seen_keys: continue seen_keys.add(candidate_key) template_sections = { "purpose": str(item.get("purpose") or "").strip(), "scope": str(item.get("scope") or "").strip(), "inputs": self._normalize_string_list(item.get("inputs")), "judgement_logic": self._normalize_string_list(item.get("judgement_logic")), "outputs": self._normalize_string_list(item.get("outputs")), "admin_note": str(item.get("admin_note") or "").strip(), } runtime_rule = item.get("runtime_rule") if not isinstance(runtime_rule, dict): runtime_rule = {} validation_result = self._normalize_runtime_rule( runtime_rule=runtime_rule, template_key=template_key, rule_name=rule_name, scenario=str(item.get("scenario") or "reimbursement_policy").strip() or "reimbursement_policy", source_document_name=str(entry["original_name"]), template_sections=template_sections, evidence=self._normalize_string_list(item.get("evidence")), ) runtime_rule = validation_result["runtime_rule"] validation_errors = list(validation_result["validation_errors"]) validation_status = "valid" if not validation_errors else "invalid" normalized.append( { "candidate_id": f"rc_{uuid4().hex[:12]}", "source_type": "policy_document", "template_key": template_key, "template_label": RULE_TEMPLATE_CATALOG[template_key]["label"], "domain": "expense", "scenario": str(item.get("scenario") or "reimbursement_policy").strip() or "reimbursement_policy", "suggested_rule_name": rule_name, "summary": str(item.get("summary") or "").strip(), "template_sections": template_sections, "rule_markdown_draft": self._build_rule_markdown( rule_name=rule_name, template_key=template_key, template_sections=template_sections, runtime_rule=runtime_rule, evidence=self._normalize_string_list(item.get("evidence")), confidence=self._normalize_confidence(item.get("confidence")), source_document_name=str(entry["original_name"]), current_user=current_user, ), "runtime_rule": runtime_rule, "evidence": self._normalize_string_list(item.get("evidence")), "confidence": self._normalize_confidence(item.get("confidence")), "source_document_id": entry["id"], "source_document_name": entry["original_name"], "source_chunk_ids": self._normalize_chunk_ids( item.get("source_chunk_ids"), allowed_chunk_ids=default_chunk_ids, fallback_chunk_ids=default_chunk_ids, ), "validation_status": validation_status, "validation_errors": validation_errors, "status": "draft" if validation_status == "valid" else "validation_failed", "created_by": "hermes", "created_at": datetime.now(UTC).isoformat(), } ) return normalized def _normalize_runtime_rule( self, *, runtime_rule: dict[str, Any], template_key: str, rule_name: str, scenario: str, source_document_name: str, template_sections: dict[str, Any], evidence: list[str], ) -> dict[str, Any]: base_payload = { "kind": "policy_rule_draft", "version": 1, "template_key": template_key, "rule_name": rule_name, "scenario": scenario, "source_document_name": source_document_name, "review_required": True, } raw_payload = dict(runtime_rule) if template_key == "travel_standard_v1": candidate_payload = self._build_travel_standard_runtime_rule( base_payload=base_payload, raw_payload=raw_payload, scenario=scenario, ) elif template_key == "expense_amount_limit_v1": candidate_payload = self._build_expense_amount_limit_runtime_rule( base_payload=base_payload, raw_payload=raw_payload, scenario=scenario, ) elif template_key == "attachment_requirement_v1": candidate_payload = self._build_attachment_requirement_runtime_rule( base_payload=base_payload, raw_payload=raw_payload, scenario=scenario, ) else: candidate_payload = self._build_general_policy_runtime_rule( base_payload=base_payload, raw_payload=raw_payload, scenario=scenario, template_sections=template_sections, evidence=evidence, ) model_class = RUNTIME_RULE_MODEL_BY_TEMPLATE[template_key] try: validated = model_class.model_validate(candidate_payload) except ValidationError as exc: return { "runtime_rule": candidate_payload, "validation_errors": self._format_validation_errors(exc), } return { "runtime_rule": validated.model_dump(mode="json"), "validation_errors": [], } def _build_travel_standard_runtime_rule( self, *, base_payload: dict[str, Any], raw_payload: dict[str, Any], scenario: str, ) -> dict[str, Any]: target_payload = self._build_target_payload( raw_target=raw_payload.get("target"), expense_types=self._normalize_string_list(raw_payload.get("expense_types")) or ["travel", "hotel", "transport"], scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], ) control_points = self._normalize_travel_control_points(raw_payload.get("control_points")) if not control_points: control_points = [ {"control_code": control_code, "severity": "high", "enabled": True} for control_code in TRAVEL_CONTROL_CODES ] exception_policy = raw_payload.get("exception_policy") if not isinstance(exception_policy, dict): exception_policy = {} return { **base_payload, "target": target_payload, "control_points": control_points, "exception_policy": { "allow_with_explanation": bool(exception_policy.get("allow_with_explanation", True)), "keywords": self._normalize_string_list( exception_policy.get("keywords") or raw_payload.get("exception_keywords") ), }, "output": self._build_output_payload( raw_output=raw_payload.get("output"), risk_code="travel_policy_review", action="review", message="差旅制度命中后需要人工复核。", ), } def _build_expense_amount_limit_runtime_rule( self, *, base_payload: dict[str, Any], raw_payload: dict[str, Any], scenario: str, ) -> dict[str, Any]: raw_target = raw_payload.get("target") if not isinstance(raw_target, dict): raw_target = {} target_payload = self._build_target_payload( raw_target=raw_target, expense_types=self._normalize_string_list(raw_payload.get("expense_types")), scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], ) target_payload["metric"] = ( str(raw_target.get("metric") or raw_payload.get("metric") or raw_payload.get("scope") or "claim_total").strip() or "claim_total" ) raw_threshold = raw_payload.get("threshold") if not isinstance(raw_threshold, dict): raw_threshold = {} warn_amount = raw_threshold.get("warn_amount", raw_payload.get("warn_amount")) block_amount = raw_threshold.get("block_amount", raw_payload.get("block_amount")) threshold_source = str(raw_threshold.get("source") or "").strip() or ( "document_value" if warn_amount is not None or block_amount is not None else "manual_fill_required" ) exception_policy = raw_payload.get("exception_policy") if not isinstance(exception_policy, dict): exception_policy = {} return { **base_payload, "target": target_payload, "threshold": { "currency": str(raw_threshold.get("currency") or raw_payload.get("currency") or "CNY").strip() or "CNY", "comparator": str(raw_threshold.get("comparator") or raw_payload.get("comparator") or "gt").strip() or "gt", "warn_amount": warn_amount, "block_amount": block_amount, "source": threshold_source, }, "exception_policy": { "allow_with_explanation": bool(exception_policy.get("allow_with_explanation", True)), "keywords": self._normalize_string_list( exception_policy.get("keywords") or raw_payload.get("exception_keywords") ), }, "output": self._build_output_payload( raw_output=raw_payload.get("output"), risk_code="amount_limit_review", action="review", message="金额超标后需要人工复核。", ), } def _build_attachment_requirement_runtime_rule( self, *, base_payload: dict[str, Any], raw_payload: dict[str, Any], scenario: str, ) -> dict[str, Any]: target_payload = self._build_target_payload( raw_target=raw_payload.get("target"), expense_types=self._normalize_string_list(raw_payload.get("expense_types")), scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], ) raw_requirements = raw_payload.get("attachment_requirements") if not isinstance(raw_requirements, dict): raw_requirements = {} requirement_items = self._normalize_attachment_items( raw_requirements.get("items") or raw_payload.get("required_attachments") or raw_payload.get("attachment_items") ) manual_fill_required = bool(raw_requirements.get("manual_fill_required", not requirement_items)) min_attachment_count = raw_requirements.get("min_attachment_count") or raw_payload.get("min_attachment_count") or 1 try: min_attachment_count_value = max(1, int(min_attachment_count)) except (TypeError, ValueError): min_attachment_count_value = 1 return { **base_payload, "target": target_payload, "attachment_requirements": { "min_attachment_count": min_attachment_count_value, "items": requirement_items, "manual_fill_required": manual_fill_required, }, "missing_attachment_action": str( raw_payload.get("missing_attachment_action") or raw_payload.get("action") or "block" ).strip() or "block", "output": self._build_output_payload( raw_output=raw_payload.get("output"), risk_code="invoice_anomaly", action="block", message="附件或单据不完整,需补件后再提交。", ), } def _build_general_policy_runtime_rule( self, *, base_payload: dict[str, Any], raw_payload: dict[str, Any], scenario: str, template_sections: dict[str, Any], evidence: list[str], ) -> dict[str, Any]: target_payload = self._build_target_payload( raw_target=raw_payload.get("target"), expense_types=self._normalize_string_list(raw_payload.get("expense_types")), scene_codes=self._normalize_string_list(raw_payload.get("scene_codes")) or [scenario], ) control_points = self._normalize_string_list(raw_payload.get("control_points")) if not control_points: control_points = self._normalize_string_list(template_sections.get("judgement_logic")) review_checklist = self._normalize_string_list(raw_payload.get("review_checklist")) if not review_checklist: review_checklist = self._normalize_string_list(template_sections.get("outputs")) or evidence[:3] return { **base_payload, "target": target_payload, "control_points": control_points, "review_checklist": review_checklist, "output": self._build_output_payload( raw_output=raw_payload.get("output"), risk_code="general_policy_review", action="review", message="通用制度草稿需由人工补充执行细节。", ), } def _build_target_payload( self, *, raw_target: Any, expense_types: list[str], scene_codes: list[str], ) -> dict[str, Any]: target = raw_target if isinstance(raw_target, dict) else {} return { "expense_types": self._normalize_string_list(target.get("expense_types")) or expense_types, "scene_codes": self._normalize_string_list(target.get("scene_codes")) or scene_codes, } def _build_output_payload( self, *, raw_output: Any, risk_code: str, action: str, message: str, ) -> dict[str, Any]: output = raw_output if isinstance(raw_output, dict) else {} return { "risk_code": str(output.get("risk_code") or risk_code).strip() or risk_code, "action": str(output.get("action") or action).strip() or action, "message": str(output.get("message") or message).strip() or message, } def _normalize_travel_control_points(self, value: Any) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] for item in value or []: if isinstance(item, str): control_code = item.strip() if control_code in TRAVEL_CONTROL_CODES: normalized.append( {"control_code": control_code, "severity": "high", "enabled": True} ) continue if not isinstance(item, dict): continue control_code = str(item.get("control_code") or item.get("code") or "").strip() if control_code not in TRAVEL_CONTROL_CODES: continue normalized.append( { "control_code": control_code, "severity": str(item.get("severity") or "high").strip() or "high", "enabled": bool(item.get("enabled", True)), } ) return normalized def _normalize_attachment_items(self, value: Any) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] for item in value or []: if isinstance(item, str): document_type = item.strip() if document_type: normalized.append( { "document_type": document_type, "required": True, "min_count": 1, "description": "", } ) continue if not isinstance(item, dict): continue document_type = str(item.get("document_type") or item.get("type") or "").strip() if not document_type: continue min_count = item.get("min_count", 1) try: min_count_value = max(1, int(min_count)) except (TypeError, ValueError): min_count_value = 1 normalized.append( { "document_type": document_type, "required": bool(item.get("required", True)), "min_count": min_count_value, "description": str(item.get("description") or "").strip(), } ) return normalized @staticmethod def _format_validation_errors(exc: ValidationError) -> list[str]: errors: list[str] = [] for error in exc.errors(): location = ".".join(str(item) for item in error.get("loc", [])) message = str(error.get("msg") or "invalid runtime_rule") errors.append(f"{location}: {message}" if location else message) return errors def _build_fallback_knowledge_candidate( self, *, entry: dict[str, Any], chunks: list[dict[str, Any]], reason: str, ) -> dict[str, Any] | None: first_chunk = next((item for item in chunks if str(item.get("content") or "").strip()), None) if first_chunk is None: return None content = str(first_chunk["content"]).strip() summary = content[:200].strip() if not summary: return None return { "candidate_id": f"kc_{uuid4().hex[:12]}", "title": str(first_chunk.get("title") or entry["original_name"]).strip() or str(entry["original_name"]), "content": summary, "domain": "expense", "scenario": "reimbursement_policy", "tags": self._normalize_tags([], fallback_text=content), "source_document_id": entry["id"], "source_document_name": entry["original_name"], "source_chunk_ids": [first_chunk["chunk_id"]], "evidence": [summary], "confidence": 0.4, "status": "draft", "created_by": "hermes", "created_at": datetime.now(UTC).isoformat(), "extraction_mode": "fallback", "quality_flags": ["fallback_only", "not_formal_ingest"], "fallback_reason": reason, } @staticmethod def _build_knowledge_summary_markdown( *, entry: dict[str, Any], knowledge_candidates: list[dict[str, Any]], ) -> str: document_name = str( entry.get("original_name") or entry.get("document_name") or entry.get("source_document_name") or "知识文档" ).strip() or "知识文档" lines = [ f"# {document_name} 知识总结", "", "## 概览", "", f"- 来源文档:{document_name}", f"- 知识条目数:{len(knowledge_candidates)}", "", ] quality_status = str(entry.get("quality_status") or "formal").strip() or "formal" quality_note = str(entry.get("quality_note") or "").strip() if quality_status != "formal": lines.extend( [ "## 归纳状态", "", f"- 质量状态:{quality_status}", f"- 说明:{quality_note or '当前结果不是正式 Hermes 归纳。'}", "", ] ) elif quality_note: lines.extend( [ "## 归纳状态", "", f"- 质量状态:{quality_status}", f"- 说明:{quality_note}", "", ] ) lines.extend( [ "## 核心知识", "", ] ) if not knowledge_candidates: lines.extend( [ "暂无可展示的知识总结,待 Hermes 重新归纳后生成。", "", ] ) return "\n".join(lines).strip() for index, item in enumerate(knowledge_candidates[:8], start=1): title = str(item.get("title") or f"知识点 {index}").strip() or f"知识点 {index}" content = str(item.get("content") or "").strip() or "待补充内容。" scenario = str(item.get("scenario") or "").strip() tags = [str(tag).strip() for tag in list(item.get("tags") or []) if str(tag).strip()] lines.extend( [ f"### {index}. {title}", "", content, "", ] ) if scenario: lines.append(f"- 适用场景:{scenario}") if tags: lines.append(f"- 标签:{' / '.join(tags)}") lines.append("") return "\n".join(lines).strip() def _create_or_update_rule_draft( self, candidate: dict[str, Any], *, current_user: CurrentUserContext, ) -> dict[str, str] | None: source_key = self._build_rule_source_key(candidate) existing_asset = self._find_rule_asset_by_source_key(source_key) runtime_rule = dict(candidate.get("runtime_rule") or {}) config_json = { "severity": "medium", "enabled": False, "runtime_kind": str(runtime_rule.get("kind") or "policy_rule_draft"), "runtime_rule": runtime_rule, "rule_template_key": candidate["template_key"], "rule_template_label": candidate["template_label"], "hermes_source_key": source_key, "source_document_id": candidate["source_document_id"], "source_document_name": candidate["source_document_name"], "llm_wiki_managed": True, } if existing_asset is None: created_asset = self.asset_service.create_asset( AgentAssetCreate( asset_type=AgentAssetType.RULE, code=self._build_rule_asset_code(candidate), name=candidate["suggested_rule_name"], description=str(candidate.get("summary") or "").strip() or "由 Hermes 根据制度文档生成的规则草稿。", domain=AgentAssetDomain.EXPENSE, scenario_json=self._build_rule_scenarios(candidate), owner="Hermes", reviewer=current_user.name, status=AgentAssetStatus.DRAFT, config_json=config_json, ), actor=current_user.name, ) version = "v1.0.0" self.asset_service.create_version( created_asset.id, AgentAssetVersionCreate( version=version, content=candidate["rule_markdown_draft"], content_type=AgentAssetContentType.MARKDOWN, change_note="Hermes 根据制度文档生成规则草稿。", created_by=current_user.name, ), actor=current_user.name, ) return { "asset_id": created_asset.id, "asset_code": created_asset.code, "version": version, } detail = self.asset_service.get_asset(existing_asset.id) if detail is None: return None if detail.status == AgentAssetStatus.ACTIVE.value: return None current_markdown = str(detail.current_version_content or "") current_runtime_rule = ( detail.config_json.get("runtime_rule") if isinstance(detail.config_json, dict) else None ) metadata_changed = ( detail.name != candidate["suggested_rule_name"] or detail.description != (str(candidate.get("summary") or "").strip() or "由 Hermes 根据制度文档生成的规则草稿。") or detail.reviewer != current_user.name or detail.config_json != config_json or detail.scenario_json != self._build_rule_scenarios(candidate) ) content_changed = current_markdown != candidate["rule_markdown_draft"] runtime_changed = current_runtime_rule != runtime_rule if metadata_changed: self.asset_service.update_asset( existing_asset.id, AgentAssetUpdate( name=candidate["suggested_rule_name"], description=str(candidate.get("summary") or "").strip() or "由 Hermes 根据制度文档生成的规则草稿。", reviewer=current_user.name, scenario_json=self._build_rule_scenarios(candidate), status=AgentAssetStatus.DRAFT, config_json=config_json, ), actor=current_user.name, ) version = detail.current_version or "v1.0.0" if content_changed or runtime_changed or not detail.current_version: version = self._increment_version(detail.current_version) self.asset_service.create_version( existing_asset.id, AgentAssetVersionCreate( version=version, content=candidate["rule_markdown_draft"], content_type=AgentAssetContentType.MARKDOWN, change_note="Hermes 根据制度文档更新规则草稿。", created_by=current_user.name, ), actor=current_user.name, ) return { "asset_id": existing_asset.id, "asset_code": existing_asset.code, "version": version, } def _find_rule_asset_by_source_key(self, source_key: str) -> AgentAsset | None: assets = list( self.db.scalars( select(AgentAsset) .where(AgentAsset.asset_type == AgentAssetType.RULE.value) .where(AgentAsset.domain == AgentAssetDomain.EXPENSE.value) ).all() ) for asset in assets: config_json = asset.config_json or {} if str(config_json.get("hermes_source_key") or "").strip() == source_key: return asset return None @staticmethod def _build_rule_source_key(candidate: dict[str, Any]) -> str: raw = ( f"{candidate['source_document_id']}::{candidate['template_key']}::" f"{candidate['suggested_rule_name']}::{candidate['scenario']}" ) return hashlib.sha1(raw.encode("utf-8")).hexdigest() def _build_rule_asset_code(self, candidate: dict[str, Any]) -> str: template_key = re.sub(r"[^a-z0-9]+", "_", str(candidate["template_key"]).lower()).strip("_") source_key = self._build_rule_source_key(candidate)[:10] return f"rule.expense.hermes.{template_key}.{source_key}" @staticmethod def _build_rule_scenarios(candidate: dict[str, Any]) -> list[str]: scenario = str(candidate.get("scenario") or "reimbursement_policy").strip() template_key = str(candidate.get("template_key") or "general_policy_v1").strip() return ["expense", "rule_center", scenario, template_key] @staticmethod def _increment_version(version: str | None) -> str: normalized = str(version or "").strip().removeprefix("v") match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", normalized) if not match: return "v1.0.0" major, minor, patch = (int(value) for value in match.groups()) return f"v{major}.{minor}.{patch + 1}" @staticmethod def _build_rule_markdown( *, rule_name: str, template_key: str, template_sections: dict[str, Any], runtime_rule: dict[str, Any], evidence: list[str], confidence: float, source_document_name: str, current_user: CurrentUserContext, ) -> str: def render_lines(items: list[str], empty_text: str) -> list[str]: if not items: return [f"- {empty_text}"] return [f"- {item}" for item in items] sections = [ f"# {rule_name}", "", "## 模板信息", "", f"- 模板键:`{template_key}`", f"- 来源文档:{source_document_name}", f"- Hermes 置信度:{confidence:.2f}", f"- 审核人:{current_user.name}", "", "## 目标", "", template_sections.get("purpose") or "待审核人补充规则目标。", "", "## 适用范围", "", template_sections.get("scope") or "待审核人补充适用范围。", "", "## 输入字段", "", *render_lines(list(template_sections.get("inputs") or []), "待审核人补充输入字段。"), "", "## 判断规则", "", *render_lines( list(template_sections.get("judgement_logic") or []), "待审核人补充判断逻辑。", ), "", "## 输出", "", *render_lines(list(template_sections.get("outputs") or []), "待审核人补充输出动作。"), "", "## 来源依据", "", *render_lines(evidence, "待审核人补充文档依据。"), "", "## 审核约束", "", "- 当前规则由 Hermes 自动生成,默认仅为 draft 草稿。", "- 规则上线前必须人工审核、补测样例、确认回滚方案。", "- JSON 运行时配置需要与 Markdown 说明保持一致。", "", "## 管理员备注", "", template_sections.get("admin_note") or "待审核人补充备注。", "", "```expense-rule", json.dumps(runtime_rule, ensure_ascii=False, indent=2), "```", ] return "\n".join(sections).strip() @staticmethod def _group_chunks(chunks: list[dict[str, Any]], *, size: int) -> list[list[dict[str, Any]]]: grouped: list[list[dict[str, Any]]] = [] for index in range(0, len(chunks), size): grouped.append(chunks[index : index + size]) return grouped def _build_chunks(self, *, document_id: str, text: str) -> list[dict[str, Any]]: pages = [page.strip() for page in str(text or "").split("\f")] if not pages: pages = [str(text or "").strip()] chunks: list[dict[str, Any]] = [] chunk_index = 0 for page_index, page_text in enumerate(pages, start=1): if not page_text: continue current_title = "" buffer: list[str] = [] lines = [self._clean_line(line) for line in page_text.splitlines()] lines = [line for line in lines if line] for line in lines: if HEADING_PATTERN.match(line): chunk_index = self._flush_chunk( chunks=chunks, document_id=document_id, chunk_index=chunk_index, title=current_title, lines=buffer, source_page=page_index, ) current_title = line buffer = [] continue buffer.append(line) if sum(len(item) for item in buffer) >= 750: chunk_index = self._flush_chunk( chunks=chunks, document_id=document_id, chunk_index=chunk_index, title=current_title, lines=buffer, source_page=page_index, ) buffer = [] chunk_index = self._flush_chunk( chunks=chunks, document_id=document_id, chunk_index=chunk_index, title=current_title, lines=buffer, source_page=page_index, ) return chunks def _flush_chunk( self, *, chunks: list[dict[str, Any]], document_id: str, chunk_index: int, title: str, lines: list[str], source_page: int, ) -> int: content = "\n".join(line for line in lines if line).strip() if not content: return chunk_index normalized_title = title.strip() or self._derive_chunk_title(content) tags = self._normalize_tags([], fallback_text=f"{normalized_title}\n{content}") chunk = { "chunk_id": f"{document_id}-chunk-{chunk_index + 1:03d}", "title": normalized_title, "content": content, "source_page": source_page, "word_count": len(re.sub(r"\s+", "", content)), "tags": tags, } chunks.append(chunk) return chunk_index + 1 @staticmethod def _derive_chunk_title(content: str) -> str: first_line = next((line.strip() for line in content.splitlines() if line.strip()), "") if first_line: return first_line[:48] return "未命名条款" @staticmethod def _clean_line(line: str) -> str: cleaned = str(line or "").replace("\u3000", " ").strip() cleaned = re.sub(r"\s+", " ", cleaned) if PAGE_FOOTER_PATTERN.fullmatch(cleaned): return "" if cleaned in {"商密【中】", "商密【高】", "商密【低】"}: return "" return cleaned @staticmethod def _normalize_string_list(value: Any) -> list[str]: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] if isinstance(value, str) and value.strip(): return [value.strip()] return [] def _normalize_tags(self, value: Any, *, fallback_text: str) -> list[str]: tags = self._normalize_string_list(value) text = f"{' '.join(tags)}\n{fallback_text}" for keyword, label in TAG_HINTS: if keyword in text and label not in tags: tags.append(label) return tags[:8] @staticmethod def _normalize_confidence(value: Any) -> float: try: confidence = float(value) except (TypeError, ValueError): return 0.0 return round(max(0.0, min(confidence, 1.0)), 2) @staticmethod def _normalize_chunk_ids( value: Any, *, allowed_chunk_ids: list[str], fallback_chunk_ids: list[str], ) -> list[str]: items = [str(item).strip() for item in value or [] if str(item).strip()] filtered = [item for item in items if item in allowed_chunk_ids] return filtered or fallback_chunk_ids[:2] @staticmethod def _extract_json_payload(response_text: str | None) -> dict[str, Any] | None: if not response_text: return None cleaned = THINK_PATTERN.sub("", response_text).strip() if not cleaned: return None candidates: list[str] = [] fenced_match = JSON_FENCE_PATTERN.search(cleaned) if fenced_match is not None: candidates.append(fenced_match.group(1)) candidates.append(cleaned) start = cleaned.find("{") end = cleaned.rfind("}") if start != -1 and end != -1 and end > start: candidates.append(cleaned[start : end + 1]) for candidate in candidates: try: parsed = json.loads(candidate) except json.JSONDecodeError: continue if isinstance(parsed, dict): return parsed return None def _resolve_sync_reason( self, *, entry: dict[str, Any], existing: dict[str, Any] | None, force: bool, ) -> str: if force: return "forced_rebuild" if existing is None: return "initial_build" existing_quality_status = str(existing.get("quality_status") or "").strip() if existing_quality_status and existing_quality_status != "formal": return f"quality_{existing_quality_status}_rebuild" if int(existing.get("formal_knowledge_candidate_count") or 0) <= 0: return "formal_candidate_missing_rebuild" previous_signature = existing.get("signature") if not isinstance(previous_signature, dict): return "signature_missing" current_signature = self._build_document_signature(entry) reasons: list[str] = [] if previous_signature.get("original_name") != current_signature["original_name"]: reasons.append("filename_changed") if previous_signature.get("sha256") != current_signature["sha256"]: reasons.append("content_changed") if previous_signature.get("version_number") != current_signature["version_number"]: reasons.append("version_changed") if previous_signature.get("updated_at") != current_signature["updated_at"]: reasons.append("updated_at_changed") if previous_signature.get("stored_name") != current_signature["stored_name"]: reasons.append("stored_name_changed") if not reasons: return "unchanged_skipped" return ",".join(reasons) @staticmethod def _emit_progress( progress_callback: Callable[[dict[str, Any], str], None] | None, payload: dict[str, Any], summary: str, ) -> None: if progress_callback is None: return progress_callback(payload, summary) @staticmethod def _calculate_progress_percent( *, completed_documents: int, skipped_documents: int, total_documents: int, group_count: int = 0, current_group_index: int = 0, ) -> int: if total_documents <= 0: return 100 completed_units = completed_documents + skipped_documents if group_count > 0 and current_group_index > 0: completed_units += min(current_group_index, group_count) / group_count percent = round((completed_units / total_documents) * 100) return max(0, min(percent, 100)) @staticmethod def _build_document_signature(entry: dict[str, Any]) -> dict[str, Any]: return { "document_id": str(entry.get("id") or ""), "original_name": str(entry.get("original_name") or ""), "stored_name": str(entry.get("stored_name") or ""), "sha256": str(entry.get("sha256") or ""), "version_number": int(entry.get("version_number") or 1), "updated_at": str(entry.get("updated_at") or ""), } def _load_wiki_index(self) -> dict[str, Any]: return self._load_json_file(self.knowledge_service.llm_wiki_index_path, default={"documents": []}) def _load_sync_runs(self) -> dict[str, Any]: return self._load_json_file(self.knowledge_service.llm_wiki_sync_runs_path, default={"runs": []}) def _document_dir(self, document_id: str) -> Path: return self.knowledge_service.llm_wiki_documents_root / document_id @staticmethod def _load_json_file(path: Path, *, default: Any) -> Any: try: return json.loads(path.read_text(encoding="utf-8")) except (FileNotFoundError, json.JSONDecodeError): return default @staticmethod def _write_json_file(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")