Files
X-Financial/server/src/app/services/knowledge.py

1594 lines
63 KiB
Python
Raw Normal View History

from __future__ import annotations
import hashlib
import json
import mimetypes
import re
import shutil
import subprocess
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from uuid import uuid4
from xml.etree import ElementTree
from zipfile import BadZipFile, ZipFile
import jwt
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.api.deps import CurrentUserContext
from app.core.agent_enums import AgentRunStatus
from app.core.config import get_settings
from app.core.logging import get_logger
from app.models.agent_run import AgentRun
from app.schemas.knowledge import (
KnowledgeDocumentDetailRead,
KnowledgeDocumentRead,
KnowledgeFolderRead,
KnowledgeLibraryRead,
KnowledgeOnlyOfficeConfigRead,
KnowledgePreviewBlockRead,
KnowledgePreviewPageRead,
KnowledgePreviewStatRead,
)
from app.services.settings import resolve_onlyoffice_settings
# Module-level logger shared by every function in this service module.
logger = get_logger("app.services.knowledge")

# Folder names created under the library root by ensure_library_ready();
# list_library() reports one folder entry per name, in this order.
FIXED_KNOWLEDGE_FOLDERS = [
    "财务知识库",
    "制度政策",
    "报销制度",
    "差旅规范",
    "发票管理",
    "税务合规",
    "预算管理",
    "财务共享",
    "培训资料",
    "常见问答",
]

# Icon class string per resolved file type; "binary" is the fallback used by
# _serialize_document when the type has no dedicated icon.
ICON_BY_TYPE = {
    "pdf": "mdi mdi-file-document-outline-pdf pdf",
    "word": "mdi mdi-file-document-outline-word word",
    "excel": "mdi mdi-file-document-outline-excel excel",
    "ppt": "mdi mdi-file-powerpoint-box ppt",
    "image": "mdi mdi-file-image-outline image",
    "text": "mdi mdi-file-document-outline text",
    "archive": "mdi mdi-folder-zip-outline archive",
    "binary": "mdi mdi-file-outline",
}

# Lower-case file extensions (without the dot) grouped by category.
# NOTE(review): "csv" appears in both TEXT_EXTENSIONS and EXCEL_EXTENSIONS;
# which category wins depends on the lookup order in the type resolver.
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "xml", "yml", "yaml", "log"}
WORD_EXTENSIONS = {"doc", "docx"}
EXCEL_EXTENSIONS = {"xls", "xlsx", "csv"}
PPT_EXTENSIONS = {"ppt", "pptx"}
IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}
STRUCTURED_PREVIEW_EXTENSIONS = {"docx", "xlsx", "pptx"} | TEXT_EXTENSIONS
INLINE_PREVIEW_EXTENSIONS = {"pdf"} | IMAGE_EXTENSIONS
# Only these extensions are accepted by build_onlyoffice_config().
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}

# Presumably the age after which a "syncing" status is considered stale;
# its consumer is not in this part of the file -- TODO confirm.
KNOWLEDGE_INGEST_SYNC_STALE_SECONDS = 90
# Default value of the ``limit`` argument of search_llm_wiki().
KNOWLEDGE_SEARCH_RESULT_LIMIT = 3
# Terms that _extract_search_terms() never emits as search terms.
KNOWLEDGE_SEARCH_STOP_TERMS = {
    "什么",
    "怎么",
    "如何",
    "多少",
    "是否",
    "可以",
    "一下",
    "请问",
    "帮我",
    "一下子",
    "这个",
    "那个",
    "哪些",
    "一下吧",
}

# Integer ingest status codes stored in each index entry's "ingest_status".
KNOWLEDGE_INGEST_STATUS_PUBLISHED = 1
KNOWLEDGE_INGEST_STATUS_SYNCING = 2
KNOWLEDGE_INGEST_STATUS_INGESTED = 3
KNOWLEDGE_INGEST_STATUS_FAILED = 4
# Status code -> (display label, tone); unpacked by _serialize_document into
# the ``state`` and ``stateTone`` fields.
KNOWLEDGE_INGEST_STATUS_META = {
    KNOWLEDGE_INGEST_STATUS_PUBLISHED: ("待归纳", "muted"),
    KNOWLEDGE_INGEST_STATUS_SYNCING: ("正归纳", "warning"),
    KNOWLEDGE_INGEST_STATUS_INGESTED: ("已归纳", "success"),
    KNOWLEDGE_INGEST_STATUS_FAILED: ("归纳失败", "danger"),
}
@dataclass(slots=True)
class OnlyOfficeCallbackPayload:
    """Parsed ONLYOFFICE callback fields consumed by handle_onlyoffice_callback."""

    # Callback status code; handle_onlyoffice_callback only acts on 2 and 6.
    status: int
    # URL the document content is downloaded from when the callback is handled.
    download_url: str
    # User identifiers from the callback; the first one is used as actor name.
    users: list[str]
def prepare_knowledge_library() -> None:
    """Build a default-configured service and make sure its library layout exists."""
    service = KnowledgeService()
    service.ensure_library_ready()
class KnowledgeService:
def __init__(self, storage_root: Path | None = None, db: Session | None = None) -> None:
    """Resolve every path the service works with.

    Args:
        storage_root: Base storage directory; when falsy, the configured
            ``resolved_storage_root_dir`` from the application settings is used.
        db: Optional SQLAlchemy session kept on the instance as ``self.db``.
    """
    settings = get_settings()
    base_dir = storage_root if storage_root else settings.resolved_storage_root_dir

    self.db = db
    self.storage_root = Path(base_dir)

    # Uploaded documents and the document index live under "<root>/knowledge".
    library_root = self.storage_root / "knowledge"
    self.library_root = library_root
    self.index_path = library_root / ".index.json"

    # LLM wiki artifacts live in a hidden directory inside the library.
    wiki_root = library_root / ".llm_wiki"
    self.llm_wiki_root = wiki_root
    self.llm_wiki_documents_root = wiki_root / "documents"
    self.llm_wiki_index_path = wiki_root / "index.json"
    self.llm_wiki_sync_runs_path = wiki_root / "sync_runs.json"
def ensure_library_ready(self) -> None:
    """Create missing directories and seed files, then reconcile the index.

    Existing files are never overwritten; the index is only re-saved when
    ``_reconcile_index`` reports a change.
    """
    self.library_root.mkdir(parents=True, exist_ok=True)
    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        folder_path = self.library_root / folder_name
        folder_path.mkdir(parents=True, exist_ok=True)
    self.llm_wiki_documents_root.mkdir(parents=True, exist_ok=True)

    if not self.index_path.exists():
        self._save_index({"version": 1, "documents": []})

    # Seed the two LLM wiki bookkeeping files with their empty payloads.
    wiki_seed_files = (
        (self.llm_wiki_index_path, {"documents": []}),
        (self.llm_wiki_sync_runs_path, {"runs": []}),
    )
    for seed_path, seed_payload in wiki_seed_files:
        if seed_path.exists():
            continue
        seed_path.write_text(
            json.dumps(seed_payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    current_index = self._load_index()
    if self._reconcile_index(current_index):
        self._save_index(current_index)
def list_library(self) -> KnowledgeLibraryRead:
    """Return all documents plus one folder summary per fixed folder."""
    documents = self._load_documents()
    folders = []
    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        document_count = len([doc for doc in documents if doc.folder == folder_name])
        # One specific folder is rendered with the "open" icon.
        if folder_name == "差旅规范":
            folder_icon = "mdi mdi-folder-open"
        else:
            folder_icon = "mdi mdi-folder"
        folders.append(
            KnowledgeFolderRead(name=folder_name, count=document_count, icon=folder_icon)
        )
    return KnowledgeLibraryRead(folders=folders, documents=documents)
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
    """Return the serialized document together with its preview data.

    The document's ingest status is reconciled (and the index saved when it
    changed) before the entry is looked up.
    """
    self.ensure_library_ready()
    index = self._load_index()
    status_changed = self._reconcile_document_ingest_statuses(index, document_ids=[document_id])
    if status_changed:
        self._save_index(index)

    entry = self._require_entry(index, document_id)
    preview_kind, preview_pages = self._build_preview(entry)

    wiki_documents = self._build_wiki_document_map()
    wiki_document = wiki_documents.get(str(document_id).strip())
    summary = self._serialize_document(entry, wiki_document=wiki_document)
    summary_fields = summary.model_dump()
    return KnowledgeDocumentDetailRead(
        **summary_fields,
        previewKind=preview_kind,
        previewPages=preview_pages,
    )
def upload_document(
    self,
    folder: str,
    filename: str,
    content: bytes,
    current_user: CurrentUserContext,
) -> KnowledgeDocumentDetailRead:
    """Store an uploaded file and create or update its index entry.

    A document in the same folder whose name matches case-insensitively is
    treated as a new version of that document: its id is reused, its
    version number is incremented and its ingest status is reset.

    Raises:
        ValueError: If ``content`` is empty.
    """
    self.ensure_library_ready()
    normalized_folder = self._normalize_folder(folder)
    normalized_name = self._normalize_filename(filename)
    if not content:
        raise ValueError("上传文件不能为空。")

    index = self._load_index()
    lowered_name = normalized_name.lower()
    existing_entry = None
    for candidate in index["documents"]:
        if (
            candidate["folder"] == normalized_folder
            and candidate["original_name"].lower() == lowered_name
        ):
            existing_entry = candidate
            break
    is_new_document = existing_entry is None

    document_id = uuid4().hex if is_new_document else existing_entry["id"]
    stored_name = f"{document_id}__{normalized_name}"
    target_path = self.library_root / normalized_folder / stored_name

    # A re-upload whose stored name differs leaves a stale file behind; drop it.
    if not is_new_document and existing_entry["stored_name"] != stored_name:
        stale_path = self.library_root / existing_entry["folder"] / existing_entry["stored_name"]
        if stale_path.exists():
            stale_path.unlink()
    target_path.write_bytes(content)

    now = datetime.now(UTC).isoformat()
    mime_type = mimetypes.guess_type(normalized_name)[0] or "application/octet-stream"
    checksum = hashlib.sha256(content).hexdigest()
    extension = self._extract_extension(normalized_name)

    # Fields written for both new and updated documents. Key order is kept
    # because it determines the order of keys in the saved index JSON.
    file_fields = {
        "stored_name": stored_name,
        "mime_type": mime_type,
        "extension": extension,
        "size_bytes": len(content),
        "sha256": checksum,
    }
    if is_new_document:
        index["documents"].append(
            {
                "id": document_id,
                "folder": normalized_folder,
                "original_name": normalized_name,
                **file_fields,
                "created_at": now,
                "updated_at": now,
                "uploaded_by": current_user.name,
                "version_number": 1,
                "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
                "ingest_agent_run_id": "",
            }
        )
        log_template = "Knowledge document uploaded id=%s folder=%s filename=%s by=%s"
    else:
        existing_entry.update(
            {
                **file_fields,
                "updated_at": now,
                "uploaded_by": current_user.name,
                "version_number": int(existing_entry.get("version_number", 1)) + 1,
                "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
                "ingest_agent_run_id": "",
            }
        )
        log_template = "Knowledge document updated id=%s folder=%s filename=%s by=%s"

    logger.info(
        log_template,
        document_id,
        normalized_folder,
        normalized_name,
        current_user.name,
    )
    self._save_index(index)
    return self.get_document_detail(document_id)
def delete_document(self, document_id: str) -> None:
    """Remove the stored file (when present) and drop the entry from the index."""
    self.ensure_library_ready()
    index = self._load_index()
    entry = self._require_entry(index, document_id)

    stored_file = self._resolve_document_path(entry)
    if stored_file.exists():
        stored_file.unlink()

    remaining = []
    for item in index["documents"]:
        if item["id"] != document_id:
            remaining.append(item)
    index["documents"] = remaining
    self._save_index(index)
    logger.info("Knowledge document deleted id=%s filename=%s", document_id, entry["original_name"])
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
    """Return ``(file path, mime type, original name)`` for a stored document.

    Raises:
        FileNotFoundError: If the index entry exists but its file does not.
    """
    self.ensure_library_ready()
    index = self._load_index()
    entry = self._require_entry(index, document_id)
    stored_file = self._resolve_document_path(entry)
    if stored_file.exists():
        return stored_file, entry["mime_type"], entry["original_name"]
    raise FileNotFoundError(entry["original_name"])
def list_folder_documents(self, folder: str | None = None) -> list[dict[str, Any]]:
    """Return raw index entries, optionally restricted to one folder.

    Ingest statuses are reconciled first; the index is saved when that
    changed anything.
    """
    self.ensure_library_ready()
    index = self._load_index()
    if self._reconcile_document_ingest_statuses(index):
        self._save_index(index)

    entries = list(index.get("documents") or [])
    if folder is not None:
        wanted_folder = self._normalize_folder(folder)
        entries = [entry for entry in entries if entry.get("folder") == wanted_folder]
    return entries
def get_document_entry(self, document_id: str) -> dict[str, Any]:
    """Return a shallow copy of one index entry after reconciling its ingest status."""
    self.ensure_library_ready()
    index = self._load_index()
    status_changed = self._reconcile_document_ingest_statuses(index, document_ids=[document_id])
    if status_changed:
        self._save_index(index)
    entry = self._require_entry(index, document_id)
    # Copy so callers cannot mutate the loaded index through the result.
    return dict(entry)
def set_document_ingest_statuses(
    self,
    document_ids: list[str],
    status_code: int,
    *,
    agent_run_id: str | None = None,
) -> None:
    """Set the ingest status (and optionally the agent run id) of documents.

    Args:
        document_ids: Ids to update; blank ids are ignored and an
            effectively empty list makes the call a no-op.
        status_code: Ingest status code to store.
        agent_run_id: When not ``None``, also stored on every matched entry.

    The index is saved only when at least one entry changed.
    """
    self.ensure_library_ready()
    wanted_ids = set()
    for raw_id in document_ids:
        cleaned_id = str(raw_id).strip()
        if cleaned_id:
            wanted_ids.add(cleaned_id)
    if not wanted_ids:
        return

    index = self._load_index()
    dirty = False
    stamp = datetime.now(UTC).isoformat()
    run_id_given = agent_run_id is not None

    for entry in index.get("documents", []):
        entry_id = str(entry.get("id") or "").strip()
        if entry_id not in wanted_ids:
            continue

        already_in_status = (
            self._normalize_ingest_status_code(entry.get("ingest_status")) == status_code
        )
        if already_in_status:
            # Status is unchanged; only a differing run id counts as an update.
            if run_id_given and entry.get("ingest_agent_run_id") != agent_run_id:
                entry["ingest_agent_run_id"] = agent_run_id
                entry["ingest_status_updated_at"] = stamp
                dirty = True
            continue

        entry["ingest_status"] = status_code
        entry["ingest_status_updated_at"] = stamp
        if run_id_given:
            entry["ingest_agent_run_id"] = agent_run_id
        dirty = True

    if dirty:
        self._save_index(index)
def refresh_document_ingest_statuses(
    self,
    document_ids: list[str] | None = None,
    *,
    preserve_syncing: bool = True,
) -> None:
    """Reconcile ingest statuses against the LLM wiki index and persist changes.

    Args:
        document_ids: Restrict reconciliation to these ids; ``None`` means all.
        preserve_syncing: Forwarded to ``_reconcile_document_ingest_statuses``.
    """
    self.ensure_library_ready()
    index = self._load_index()
    statuses_changed = self._reconcile_document_ingest_statuses(
        index,
        document_ids=document_ids,
        preserve_syncing=preserve_syncing,
    )
    if statuses_changed:
        self._save_index(index)
def get_llm_wiki_root(self) -> Path:
    """Return the LLM wiki directory after making sure the library layout exists."""
    self.ensure_library_ready()
    wiki_root = self.llm_wiki_root
    return wiki_root
def search_llm_wiki(self, query: str, *, limit: int = KNOWLEDGE_SEARCH_RESULT_LIMIT) -> dict[str, Any]:
    """Search the ingested LLM wiki knowledge candidates for ``query``.

    Every wiki document that still matches its index entry and is not marked
    ``failed`` contributes its ``knowledge_candidates.json`` items; each item
    is scored with ``_score_knowledge_search_match`` and the best hits are
    returned.

    Args:
        query: Free-text question.
        limit: Maximum number of hits to return (at least one is kept).

    Returns:
        A ``knowledge_search`` result dict with ``query``, ``record_count``,
        ``hits``, ``references`` and a user-facing ``message``.
    """
    self.ensure_library_ready()
    normalized_query = self._normalize_search_text(query)
    if not normalized_query:
        return {
            "result_type": "knowledge_search",
            "query": "",
            "record_count": 0,
            "hits": [],
            "references": [],
            "message": "请先输入要检索的制度或规则问题。",
        }

    index = self._load_index()
    if self._reconcile_document_ingest_statuses(index):
        self._save_index(index)
    entry_by_id = {
        str(item.get("id") or "").strip(): item
        for item in list(index.get("documents") or [])
        if str(item.get("id") or "").strip()
    }
    wiki_index = self._load_llm_wiki_index()
    query_terms = self._extract_search_terms(query)
    display_query = str(query).strip()

    hits: list[dict[str, Any]] = []
    for wiki_document in list(wiki_index.get("documents") or []):
        # Skip malformed index rows instead of failing on ``.get``.
        if not isinstance(wiki_document, dict):
            continue
        document_id = str(wiki_document.get("document_id") or "").strip()
        if not document_id:
            continue
        entry = entry_by_id.get(document_id)
        if entry is None or not self._has_matching_llm_wiki_artifact(entry, wiki_document):
            continue
        quality_status = str(wiki_document.get("quality_status") or "").strip()
        if quality_status == "failed":
            continue

        document_name = str(wiki_document.get("document_name") or entry.get("original_name") or "").strip()
        document_dir = self.llm_wiki_documents_root / document_id
        candidates = self._load_json_file(document_dir / "knowledge_candidates.json", default=[])
        # The artifact is expected to be a JSON array; anything else has no
        # usable candidates.
        if not isinstance(candidates, list):
            continue

        # ``position`` is 1-based and only used to synthesize a candidate id.
        # It is deliberately not called ``index`` so the loaded document
        # index above is not shadowed.
        for position, candidate in enumerate(candidates, start=1):
            if not isinstance(candidate, dict):
                continue
            title = str(candidate.get("title") or "").strip()
            content = str(candidate.get("content") or "").strip()
            tags = [str(item).strip() for item in list(candidate.get("tags") or []) if str(item).strip()]
            evidence = [
                str(item).strip() for item in list(candidate.get("evidence") or []) if str(item).strip()
            ]
            score, matched_terms = self._score_knowledge_search_match(
                query_text=normalized_query,
                query_terms=query_terms,
                title=title,
                content=content,
                tags=tags,
                document_name=document_name,
                evidence=evidence,
            )
            if score <= 0:
                continue
            candidate_id = str(candidate.get("candidate_id") or f"candidate_{position}").strip()
            hits.append(
                {
                    "code": f"knowledge.{document_id}.{candidate_id}",
                    "candidate_id": candidate_id,
                    "title": title or document_name or "制度知识条目",
                    "content": content,
                    "excerpt": self._build_search_excerpt(content or title, query_terms),
                    "document_id": document_id,
                    "document_name": document_name,
                    "version": str(wiki_document.get("document_version") or "").strip() or None,
                    "updated_at": self._format_search_timestamp(wiki_document.get("updated_at")),
                    "quality_status": quality_status,
                    "tags": tags,
                    "evidence": evidence,
                    "score": score,
                    "matched_terms": matched_terms,
                }
            )

    self._boost_title_family_hits(hits)
    # Highest score first; "formal" quality wins ties, then title order.
    ranked_hits = sorted(
        hits,
        key=lambda item: (
            -int(item.get("score") or 0),
            str(item.get("quality_status") or "") != "formal",
            str(item.get("title") or ""),
        ),
    )[: max(1, limit)]

    if ranked_hits:
        # NOTE(review): the top titles are concatenated without a separator --
        # confirm this is the intended message format.
        titles = "".join(
            str(item.get("title") or "")
            for item in ranked_hits[:2]
            if str(item.get("title") or "").strip()
        )
        return {
            "result_type": "knowledge_search",
            "query": display_query,
            "record_count": len(ranked_hits),
            "hits": ranked_hits,
            "references": [
                str(item.get("code") or "").strip()
                for item in ranked_hits
                if str(item.get("code") or "").strip()
            ],
            "message": (
                f"已从已归纳制度知识中检索到 {len(ranked_hits)} 条相关内容。"
                f"{f'优先参考:{titles}' if titles else ''}"
            ),
        }
    return {
        "result_type": "knowledge_search",
        "query": display_query,
        "record_count": 0,
        "hits": [],
        "references": [],
        "message": (
            f"当前未在已归纳制度知识中检索到与“{display_query}”直接匹配的内容。"
            "知识问答仅基于 LLM Wiki 已形成的知识条目回答;当前依据不足,不能继续扩展回答。"
        ),
    }
@staticmethod
def _boost_title_family_hits(hits: list[dict[str, Any]]) -> None:
if len(hits) < 2:
return
preliminary = sorted(
hits,
key=lambda item: (
-int(item.get("score") or 0),
str(item.get("quality_status") or "") != "formal",
str(item.get("title") or ""),
),
)
primary = preliminary[0]
primary_title = str(primary.get("title") or "").strip()
primary_document_id = str(primary.get("document_id") or "").strip()
if len(primary_title) < 3 or not primary_document_id:
return
family_key = primary_title[:3]
family_hits = [
item
for item in hits
if str(item.get("document_id") or "").strip() == primary_document_id
and str(item.get("title") or "").strip().startswith(family_key)
]
if len(family_hits) < 2:
return
for item in family_hits:
item["score"] = int(item.get("score") or 0) + 20
def extract_document_text(self, document_id: str) -> str:
    """Extract the plain text of a stored document.

    Raises:
        FileNotFoundError: If the index entry exists but its file does not.
    """
    self.ensure_library_ready()
    entry = self.get_document_entry(document_id)
    stored_file = self._resolve_document_path(entry)
    if not stored_file.exists():
        raise FileNotFoundError(entry["original_name"])

    # Fall back to the on-disk name / a generic MIME type for sparse entries.
    display_name = str(entry.get("original_name") or stored_file.name)
    mime_type = str(entry.get("mime_type") or "application/octet-stream")
    return self._extract_document_text_from_path(
        file_path=stored_file,
        original_name=display_name,
        mime_type=mime_type,
    )
def build_onlyoffice_config(
    self,
    document_id: str,
    current_user: CurrentUserContext,
) -> KnowledgeOnlyOfficeConfigRead:
    """Build the signed, view-only ONLYOFFICE editor configuration for a document.

    Raises:
        ValueError: If ONLYOFFICE is disabled, its URLs or JWT secret are not
            configured, or the document's extension is not supported.
    """
    self.ensure_library_ready()
    settings = get_settings()
    onlyoffice_settings = resolve_onlyoffice_settings()

    # Pick the first failing precondition; each has its own log line and error.
    if not onlyoffice_settings.enabled:
        failure = (
            "ONLYOFFICE disabled in runtime config doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s",
            "ONLYOFFICE 预览未启用。",
        )
    elif not onlyoffice_settings.public_url or not onlyoffice_settings.backend_url:
        failure = (
            "ONLYOFFICE config incomplete doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s",
            "ONLYOFFICE 地址配置不完整。",
        )
    elif not onlyoffice_settings.jwt_secret:
        failure = (
            "ONLYOFFICE JWT missing doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s",
            "ONLYOFFICE JWT 密钥未配置。",
        )
    else:
        failure = None
    if failure is not None:
        log_template, error_message = failure
        logger.warning(
            log_template,
            document_id,
            onlyoffice_settings.enabled,
            onlyoffice_settings.public_url,
            onlyoffice_settings.backend_url,
            bool(onlyoffice_settings.jwt_secret),
        )
        raise ValueError(error_message)

    index = self._load_index()
    entry = self._require_entry(index, document_id)
    extension = self._extract_extension(entry["original_name"])
    if extension not in ONLYOFFICE_EDITABLE_EXTENSIONS:
        raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")

    document_type = self._resolve_onlyoffice_document_type(extension)
    backend_base_url = onlyoffice_settings.backend_url.rstrip("/")
    public_url = onlyoffice_settings.public_url.rstrip("/")
    access_token = self._build_onlyoffice_access_token(document_id)

    # Both endpoints share the same per-document URL prefix on the backend.
    document_api_base = (
        f"{backend_base_url}{settings.api_v1_prefix}/knowledge/documents/{document_id}/onlyoffice"
    )
    document_url = f"{document_api_base}/content?access_token={access_token}"
    callback_url = f"{document_api_base}/callback"
    document_key = self._build_onlyoffice_document_key(entry)

    # Key order is kept as-is: the whole dict is the JWT payload signed below.
    document_section = {
        "fileType": extension,
        "key": document_key,
        "title": entry["original_name"],
        "url": document_url,
        "permissions": {
            "download": True,
            "edit": False,
            "print": True,
            "copy": True,
        },
    }
    editor_section = {
        "mode": "view",
        "lang": "zh-CN",
        "callbackUrl": callback_url,
        "user": {
            "id": current_user.username,
            "name": current_user.name,
        },
        "customization": {
            "compactHeader": True,
            "compactToolbar": True,
            "toolbarNoTabs": False,
            "autosave": False,
            "forcesave": False,
        },
    }
    config: dict[str, Any] = {
        "documentType": document_type,
        "document": document_section,
        "editorConfig": editor_section,
        "width": "100%",
        "height": "100%",
    }
    # The token signs the configuration as it is before "token" is added.
    config["token"] = jwt.encode(config, onlyoffice_settings.jwt_secret, algorithm="HS256")
    return KnowledgeOnlyOfficeConfigRead(
        documentServerUrl=public_url,
        config=config,
    )
def validate_onlyoffice_access_token(self, document_id: str, access_token: str) -> None:
    """Verify that ``access_token`` grants ONLYOFFICE content access to ``document_id``.

    The token must be an HS256 JWT signed with the configured ONLYOFFICE
    secret whose ``scope`` is ``onlyoffice-content`` and whose
    ``document_id`` equals the requested id.

    Raises:
        ValueError: If the JWT secret is not configured, or the token is
            invalid, has the wrong scope, or is for another document.
    """
    onlyoffice_settings = resolve_onlyoffice_settings()
    # Refuse to verify against a missing secret, as build_onlyoffice_config
    # does: an empty key would make every token trivially forgeable, and a
    # ``None`` key is not expected to fail with a PyJWTError.
    if not onlyoffice_settings.jwt_secret:
        raise ValueError("ONLYOFFICE JWT 密钥未配置。")
    try:
        payload = jwt.decode(
            access_token,
            onlyoffice_settings.jwt_secret,
            algorithms=["HS256"],
        )
    except jwt.PyJWTError as exc:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc
    if payload.get("scope") != "onlyoffice-content" or payload.get("document_id") != document_id:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。")
def handle_onlyoffice_callback(self, document_id: str, payload: dict[str, Any]) -> None:
    """Download the document referenced by an ONLYOFFICE callback and store it.

    Only callbacks with status 2 or 6 and a download URL are acted on
    (presumably "ready for saving" and "force-saved" in the ONLYOFFICE
    callback API -- confirm); every other callback is ignored.

    Raises:
        ValueError: If the download URL does not use ``http`` or ``https``.
    """
    # Local import: only this method needs it.
    from urllib.parse import urlsplit

    self.ensure_library_ready()
    callback = self._parse_onlyoffice_callback(payload)
    if callback.status not in {2, 6} or not callback.download_url:
        return
    logger.info(
        "ONLYOFFICE callback received id=%s status=%s users=%s",
        document_id,
        callback.status,
        ",".join(callback.users) if callback.users else "-",
    )

    # The URL comes from the request payload. urlopen() also understands
    # schemes such as file:, so restrict it to plain web URLs before fetching.
    scheme = urlsplit(callback.download_url).scheme.lower()
    if scheme not in {"http", "https"}:
        logger.warning(
            "ONLYOFFICE callback rejected id=%s reason=unsupported_url_scheme scheme=%s",
            document_id,
            scheme or "-",
        )
        raise ValueError("ONLYOFFICE 回调下载地址无效。")

    request = Request(callback.download_url, headers={"User-Agent": "x-financial-onlyoffice"})
    with urlopen(request, timeout=30) as response:  # noqa: S310 - scheme validated above
        content = response.read()
    actor_name = callback.users[0] if callback.users else "ONLYOFFICE"
    self._replace_document_content(document_id, content, actor_name=actor_name)
def _load_documents(self) -> list[KnowledgeDocumentRead]:
    """Return every indexed document, serialized and sorted newest first.

    Both the file-level index and the ingest statuses are reconciled first;
    the index is saved once if either step changed it.
    """
    self.ensure_library_ready()
    index = self._load_index()
    files_changed = self._reconcile_index(index)
    statuses_changed = self._reconcile_document_ingest_statuses(index)
    if files_changed or statuses_changed:
        self._save_index(index)

    wiki_by_document_id = self._build_wiki_document_map()
    documents = []
    for entry in index["documents"]:
        wiki_document = wiki_by_document_id.get(str(entry.get("id") or "").strip())
        documents.append(self._serialize_document(entry, wiki_document=wiki_document))
    documents.sort(key=lambda document: document.time, reverse=True)
    return documents
def _serialize_document(
    self,
    entry: dict[str, Any],
    *,
    wiki_document: dict[str, Any] | None = None,
) -> KnowledgeDocumentRead:
    """Convert a raw index entry into the API document model.

    Args:
        entry: Index entry of the document.
        wiki_document: The document's LLM wiki index row, if any; its quality
            fields are only exposed when it still matches ``entry``.
    """
    extension = entry.get("extension") or self._extract_extension(entry["original_name"])
    file_type = self._resolve_file_type(extension)
    size_bytes = int(entry.get("size_bytes") or 0)
    display_time = self._format_time(entry.get("updated_at") or entry.get("created_at"))

    # Unknown status codes are displayed like the "published" status.
    state_code = self._normalize_ingest_status_code(entry.get("ingest_status"))
    fallback_meta = KNOWLEDGE_INGEST_STATUS_META[KNOWLEDGE_INGEST_STATUS_PUBLISHED]
    state_label, state_tone = KNOWLEDGE_INGEST_STATUS_META.get(state_code, fallback_meta)

    wiki_available = self._has_matching_llm_wiki_artifact(entry, wiki_document)
    quality_status = ""
    quality_note = ""
    if wiki_available and isinstance(wiki_document, dict):
        quality_status = str(wiki_document.get("quality_status") or "").strip()
        quality_note = str(wiki_document.get("quality_note") or "").strip()

    extension_label = extension.upper() or "FILE"
    return KnowledgeDocumentRead(
        id=entry["id"],
        name=entry["original_name"],
        folder=entry["folder"],
        tag=f"{entry['folder']} / {extension_label}",
        time=display_time,
        version=f"v{int(entry.get('version_number', 1))}.0",
        stateCode=state_code,
        state=state_label,
        stateTone=state_tone,
        owner=entry.get("uploaded_by") or "系统导入",
        icon=ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
        fileType=file_type,
        fileTypeLabel=self._resolve_file_type_label(file_type),
        summary=f"{entry['folder']} · {extension_label} · {self._format_size(size_bytes)}",
        mimeType=entry.get("mime_type") or "application/octet-stream",
        extension=extension,
        sizeBytes=size_bytes,
        canPreview=self._can_preview(extension),
        llmWikiAvailable=wiki_available,
        llmWikiQualityStatus=quality_status,
        llmWikiQualityNote=quality_note,
    )
def _build_preview(
    self, entry: dict[str, Any]
) -> tuple[str, list[KnowledgePreviewPageRead]]:
    """Return ``(preview kind, preview pages)`` for a document.

    PDF and image files return their kind with no pages; text, DOCX, XLSX
    and PPTX files return extracted pages; everything else returns a single
    "unsupported" explanation page.
    """
    extension = self._extract_extension(entry["original_name"])
    file_path = self._resolve_document_path(entry)

    if extension == "pdf":
        return "pdf", []
    if extension in IMAGE_EXTENSIONS:
        return "image", []

    # Plain text files and DOCX documents are both shown as one text page.
    if extension in TEXT_EXTENSIONS or extension == "docx":
        if extension in TEXT_EXTENSIONS:
            extracted = self._read_text_preview(file_path)
        else:
            extracted = self._extract_docx_text(file_path)
        return "text", [self._build_text_preview_page(entry, extracted)]

    if extension == "xlsx":
        return "table", self._build_xlsx_preview_pages(entry, file_path)
    if extension == "pptx":
        return "slides", self._build_pptx_preview_pages(entry, file_path)

    unsupported_stats = [
        KnowledgePreviewStatRead(label="文件格式", value=extension.upper() or "FILE"),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
        KnowledgePreviewStatRead(label="建议操作", value="下载后查看"),
    ]
    unsupported_note = KnowledgePreviewBlockRead(
        heading="预览说明",
        lines=[
            "当前系统已支持该文件的上传、下载和权限控制。",
            "如需在线预览,可后续接入专门的文档转换服务。",
        ],
    )
    unsupported_page = KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="当前格式暂不支持在线解析预览。",
        stats=unsupported_stats,
        blocks=[unsupported_note],
    )
    return "unsupported", [unsupported_page]
def _build_text_preview_page(
    self, entry: dict[str, Any], text: str
) -> KnowledgePreviewPageRead:
    """Build a single preview page from extracted text.

    Blank lines are dropped. At most the first 24 non-blank lines are shown,
    in blocks of up to eight lines, while the line-count stat reports all
    non-blank lines.
    """
    visible_lines = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped:
            visible_lines.append(stripped)
    if not visible_lines:
        visible_lines = ["文件内容为空,或当前文档未提取到可展示文本。"]

    shown_lines = visible_lines[:24]
    blocks = []
    for offset in range(0, len(shown_lines), 8):
        block_number = offset // 8 + 1
        blocks.append(
            KnowledgePreviewBlockRead(
                heading=f"内容片段 {block_number}",
                lines=shown_lines[offset : offset + 8],
            )
        )

    stats = [
        KnowledgePreviewStatRead(label="文件格式", value=entry["extension"].upper() or "TEXT"),
        KnowledgePreviewStatRead(label="可见行数", value=str(len(visible_lines))),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
    ]
    return KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="文本提取预览",
        stats=stats,
        blocks=blocks,
    )
def _build_xlsx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per worksheet (first 8 sheets, first 12 rows each)."""
    placeholder_rows = [["未提取到表格内容。"]]
    sheets = self._extract_xlsx_sheets(file_path) or [("Sheet 1", placeholder_rows)]
    total_sheets = len(sheets)

    pages: list[KnowledgePreviewPageRead] = []
    for sheet_name, rows in sheets[:8]:
        shown_rows = rows[:12] if rows else [["未提取到表格内容。"]]
        row_blocks = []
        for row_number, row in enumerate(shown_rows, start=1):
            # Each row becomes one block whose heading is the 1-based row number.
            row_text = " | ".join((cell or "") for cell in row)
            row_blocks.append(KnowledgePreviewBlockRead(heading=f"{row_number}", lines=[row_text]))
        sheet_stats = [
            KnowledgePreviewStatRead(label="工作表数量", value=str(total_sheets)),
            KnowledgePreviewStatRead(label="预览行数", value=str(len(shown_rows))),
            KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
        ]
        pages.append(
            KnowledgePreviewPageRead(
                title=sheet_name,
                subtitle="表格内容预览",
                stats=sheet_stats,
                blocks=row_blocks,
            )
        )
    return pages
def _build_pptx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per slide for the first 8 slides."""
    slides = self._extract_pptx_slides(file_path) or [["未提取到幻灯片文本。"]]

    def build_page(slide_number: int, slide_lines: list[str]) -> KnowledgePreviewPageRead:
        slide_stats = [
            KnowledgePreviewStatRead(label="页码", value=str(slide_number)),
            KnowledgePreviewStatRead(label="文本条数", value=str(len(slide_lines))),
            KnowledgePreviewStatRead(label="文件格式", value="PPTX"),
        ]
        content_block = KnowledgePreviewBlockRead(
            heading="幻灯片内容",
            lines=slide_lines or ["该页未提取到文本内容。"],
        )
        return KnowledgePreviewPageRead(
            title=entry["original_name"],
            subtitle=f"幻灯片 {slide_number}",
            stats=slide_stats,
            blocks=[content_block],
        )

    return [
        build_page(slide_number, slide_lines)
        for slide_number, slide_lines in enumerate(slides[:8], start=1)
    ]
def _load_index(self) -> dict[str, Any]:
try:
payload = json.loads(self.index_path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
payload = {"version": 1, "documents": []}
payload.setdefault("documents", [])
return payload
def _save_index(self, index: dict[str, Any]) -> None:
self.index_path.write_text(
json.dumps(index, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _reconcile_index(self, index: dict[str, Any]) -> bool:
    """Bring the index in line with the files that are actually on disk.

    Entries whose file is gone are dropped, surviving entries get refreshed
    size / extension / MIME / ingest fields, and files found in the fixed
    folders without an index entry are added as system imports.

    Returns:
        ``True`` when ``index["documents"]`` was replaced and should be
        saved, ``False`` otherwise.
    """
    changed = False
    documents = index.setdefault("documents", [])
    known_keys = {
        (item["folder"], item["stored_name"])
        for item in documents
        if item.get("folder") and item.get("stored_name")
    }

    kept_items: list[dict[str, Any]] = []
    for item in documents:
        stored_file = self._resolve_document_path(item)
        if not stored_file.exists():
            # The file is gone: drop the entry by not carrying it over.
            changed = True
            continue

        item["size_bytes"] = stored_file.stat().st_size
        item["extension"] = self._extract_extension(item["original_name"])
        guessed_mime = mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"
        item["mime_type"] = item.get("mime_type") or guessed_mime

        normalized_status = self._normalize_ingest_status_code(item.get("ingest_status"))
        if item.get("ingest_status") != normalized_status:
            item["ingest_status"] = normalized_status
            changed = True
        if "ingest_agent_run_id" not in item:
            item["ingest_agent_run_id"] = ""
            changed = True
        kept_items.append(item)

    # Pick up files that were placed into a fixed folder without an upload.
    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        for candidate_file in (self.library_root / folder_name).iterdir():
            if candidate_file.name.startswith(".") or not candidate_file.is_file():
                continue
            if (folder_name, candidate_file.name) in known_keys:
                continue

            document_id, original_name = self._parse_stored_name(candidate_file.name)
            file_stat = candidate_file.stat()
            guessed_mime = mimetypes.guess_type(original_name)[0] or "application/octet-stream"
            kept_items.append(
                {
                    "id": document_id,
                    "folder": folder_name,
                    "original_name": original_name,
                    "stored_name": candidate_file.name,
                    "mime_type": guessed_mime,
                    "extension": self._extract_extension(original_name),
                    "size_bytes": file_stat.st_size,
                    "sha256": "",
                    "created_at": datetime.fromtimestamp(file_stat.st_ctime, tz=UTC).isoformat(),
                    "updated_at": datetime.fromtimestamp(file_stat.st_mtime, tz=UTC).isoformat(),
                    "uploaded_by": "系统导入",
                    "version_number": 1,
                    "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
                    "ingest_agent_run_id": "",
                }
            )
            changed = True

    if not changed and len(kept_items) == len(documents):
        return False
    index["documents"] = kept_items
    return True
def _reconcile_document_ingest_statuses(
    self,
    index: dict[str, Any],
    *,
    document_ids: list[str] | None = None,
    preserve_syncing: bool = True,
) -> bool:
    """Align each entry's ingest status with the LLM wiki index.

    Args:
        index: Loaded document index; its entries are updated in place.
        document_ids: Restrict the pass to these ids; empty or ``None``
            means every document.
        preserve_syncing: When true, an entry in the syncing state is left
            alone as long as ``_should_preserve_syncing_status`` says so.

    Returns:
        ``True`` if any entry was modified.
    """
    changed = False
    target_ids = {str(item).strip() for item in document_ids or [] if str(item).strip()}
    wiki_by_document_id = self._build_wiki_document_map()

    for entry in index.get("documents", []):
        document_id = str(entry.get("id") or "").strip()
        if target_ids and document_id not in target_ids:
            continue

        # Store the normalized code so later comparisons are exact.
        current_status = self._normalize_ingest_status_code(entry.get("ingest_status"))
        if entry.get("ingest_status") != current_status:
            entry["ingest_status"] = current_status
            changed = True

        is_syncing = current_status == KNOWLEDGE_INGEST_STATUS_SYNCING
        if is_syncing and preserve_syncing and self._should_preserve_syncing_status(entry):
            continue

        is_ingested = self._has_ingested_llm_wiki_document(
            entry, wiki_by_document_id.get(document_id)
        )
        # A failed document stays failed until a usable wiki artifact exists.
        if current_status == KNOWLEDGE_INGEST_STATUS_FAILED and not is_ingested:
            continue

        if is_ingested:
            desired_status = KNOWLEDGE_INGEST_STATUS_INGESTED
        else:
            desired_status = KNOWLEDGE_INGEST_STATUS_PUBLISHED
        if current_status != desired_status:
            entry["ingest_status"] = desired_status
            changed = True
    return changed
def _load_llm_wiki_index(self) -> dict[str, Any]:
try:
payload = json.loads(self.llm_wiki_index_path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
payload = {"documents": []}
payload.setdefault("documents", [])
return payload
def _build_wiki_document_map(self) -> dict[str, dict[str, Any]]:
wiki_index = self._load_llm_wiki_index()
return {
str(item.get("document_id") or "").strip(): item
for item in list(wiki_index.get("documents") or [])
if str(item.get("document_id") or "").strip()
}
@staticmethod
def _load_json_file(path: Path, *, default: Any) -> Any:
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
return default
@staticmethod
def _load_text_file(path: Path) -> str:
try:
return path.read_text(encoding="utf-8").strip()
except FileNotFoundError:
return ""
@staticmethod
def _normalize_search_text(value: Any) -> str:
text = str(value or "").strip().lower()
return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", text)
@staticmethod
def _extract_search_terms(query: str) -> list[str]:
normalized = KnowledgeService._normalize_search_text(query)
if not normalized:
return []
terms: set[str] = set()
for part in re.findall(r"[0-9a-z]+|[\u4e00-\u9fff]+", normalized):
if len(part) <= 1:
continue
if part not in KNOWLEDGE_SEARCH_STOP_TERMS:
terms.add(part)
if not re.fullmatch(r"[\u4e00-\u9fff]+", part):
continue
upper_size = min(4, len(part))
for size in range(2, upper_size + 1):
for index in range(0, len(part) - size + 1):
gram = part[index : index + size]
if gram in KNOWLEDGE_SEARCH_STOP_TERMS:
continue
terms.add(gram)
return sorted(terms, key=lambda item: (-len(item), item))
@staticmethod
def _score_knowledge_search_match(
*,
query_text: str,
query_terms: list[str],
title: str,
content: str,
tags: list[str],
document_name: str,
evidence: list[str],
) -> tuple[int, list[str]]:
normalized_title = KnowledgeService._normalize_search_text(title)
normalized_content = KnowledgeService._normalize_search_text(content)
normalized_tags = [KnowledgeService._normalize_search_text(item) for item in tags]
normalized_document_name = KnowledgeService._normalize_search_text(document_name)
normalized_evidence = [KnowledgeService._normalize_search_text(item) for item in evidence]
score = 0
matched_terms: list[str] = []
if query_text and query_text in normalized_title:
score += 140
elif query_text and any(query_text in item for item in normalized_tags):
score += 120
elif query_text and query_text in normalized_content:
score += 88
for phrase in [normalized_title, *normalized_tags, normalized_document_name]:
if not phrase:
continue
if phrase in query_text:
score += 24 + min(18, len(phrase) * 2)
matched_terms.append(phrase)
elif query_text and query_text in phrase:
score += 16
for term in query_terms:
if len(term) <= 1:
continue
term_score = 0
if term in normalized_title:
term_score = 18 if len(term) >= 4 else 14
elif any(term in item for item in normalized_tags):
term_score = 16 if len(term) >= 4 else 12
elif term in normalized_content:
term_score = 10 if len(term) >= 4 else 8
elif term in normalized_document_name or any(term in item for item in normalized_evidence):
term_score = 6
if term_score:
score += term_score
matched_terms.append(term)
if score <= 0:
return 0, []
distinct_matches = []
for item in matched_terms:
if item and item not in distinct_matches:
distinct_matches.append(item)
score += min(24, len(distinct_matches) * 4)
return score, distinct_matches[:6]
@staticmethod
def _build_search_excerpt(text: str, query_terms: list[str], *, max_length: int = 140) -> str:
plain_text = re.sub(r"[#*_`>\-\[\]]+", " ", str(text or ""))
plain_text = re.sub(r"\s+", " ", plain_text).strip()
if not plain_text:
return ""
normalized_text = KnowledgeService._normalize_search_text(plain_text)
for term in query_terms:
if not term or term not in normalized_text:
continue
raw_index = plain_text.find(term)
if raw_index == -1:
continue
start = max(0, raw_index - 36)
end = min(len(plain_text), raw_index + max_length - 36)
snippet = plain_text[start:end].strip(" ,。;:")
if start > 0:
snippet = f"...{snippet}"
if end < len(plain_text):
snippet = f"{snippet}..."
return snippet
if len(plain_text) <= max_length:
return plain_text
return f"{plain_text[: max_length - 3].rstrip()}..."
@staticmethod
def _format_search_timestamp(value: Any) -> str | None:
raw_value = str(value or "").strip()
if not raw_value:
return None
try:
parsed = datetime.fromisoformat(raw_value)
except ValueError:
return raw_value or None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC).date().isoformat()
def _has_ingested_llm_wiki_document(
self,
entry: dict[str, Any],
wiki_document: dict[str, Any] | None,
) -> bool:
if not isinstance(wiki_document, dict):
return False
if int(wiki_document.get("knowledge_candidate_count") or 0) <= 0:
return False
if str(wiki_document.get("quality_status") or "").strip() in {"fallback_only", "runtime_only", "failed"}:
return False
current_signature = self._build_llm_wiki_document_signature(entry)
wiki_signature = wiki_document.get("signature")
if isinstance(wiki_signature, dict):
return wiki_signature == current_signature
return (
str(wiki_document.get("document_id") or "").strip() == str(entry.get("id") or "").strip()
and str(wiki_document.get("checksum") or "").strip() == str(entry.get("sha256") or "").strip()
)
def _has_matching_llm_wiki_artifact(
self,
entry: dict[str, Any],
wiki_document: dict[str, Any] | None,
) -> bool:
if not isinstance(wiki_document, dict):
return False
current_signature = self._build_llm_wiki_document_signature(entry)
wiki_signature = wiki_document.get("signature")
if isinstance(wiki_signature, dict):
return wiki_signature == current_signature
return (
str(wiki_document.get("document_id") or "").strip() == str(entry.get("id") or "").strip()
and str(wiki_document.get("checksum") or "").strip() == str(entry.get("sha256") or "").strip()
)
@staticmethod
def _build_llm_wiki_document_signature(entry: dict[str, Any]) -> dict[str, Any]:
return {
"document_id": str(entry.get("id") or ""),
"original_name": str(entry.get("original_name") or ""),
"stored_name": str(entry.get("stored_name") or ""),
"sha256": str(entry.get("sha256") or ""),
"version_number": int(entry.get("version_number") or 1),
"updated_at": str(entry.get("updated_at") or ""),
}
@staticmethod
def _normalize_ingest_status_code(value: Any) -> int:
    """Coerce ``value`` to a known ingest status code.

    Args:
        value: Raw status value, typically read from the index.

    Returns:
        ``int(value)`` when it converts and is a key of
        ``KNOWLEDGE_INGEST_STATUS_META``; otherwise
        ``KNOWLEDGE_INGEST_STATUS_PUBLISHED``.
    """
    try:
        candidate = int(value)
    except (TypeError, ValueError):
        candidate = None
    if candidate is None or candidate not in KNOWLEDGE_INGEST_STATUS_META:
        return KNOWLEDGE_INGEST_STATUS_PUBLISHED
    return candidate
@staticmethod
def _is_syncing_status_stale(entry: dict[str, Any]) -> bool:
raw_value = str(entry.get("ingest_status_updated_at") or "").strip()
if not raw_value:
return True
try:
updated_at = datetime.fromisoformat(raw_value)
except ValueError:
return True
if updated_at.tzinfo is None:
updated_at = updated_at.replace(tzinfo=UTC)
age_seconds = (datetime.now(UTC) - updated_at.astimezone(UTC)).total_seconds()
return age_seconds >= KNOWLEDGE_INGEST_SYNC_STALE_SECONDS
def _should_preserve_syncing_status(self, entry: dict[str, Any]) -> bool:
    """Decide whether an entry's "syncing" status should be kept.

    Without a linked agent run (no ``ingest_agent_run_id``, no database
    session, or no matching ``AgentRun`` row) the decision falls back to the
    freshness of the entry's own status timestamp. With a linked run, the
    status is kept only while the run is ``RUNNING`` and its latest sign of
    life is fresh: ``heartbeat_at`` from ``route_json`` when present,
    otherwise the entry timestamp.

    Args:
        entry: Library index entry.

    Returns:
        ``True`` when the syncing status should be preserved.
    """

    def is_fresh(candidate: dict[str, Any]) -> bool:
        return not self._is_syncing_status_stale(candidate)

    run_id = str(entry.get("ingest_agent_run_id") or "").strip()
    if not run_id or self.db is None:
        return is_fresh(entry)

    run = self.db.scalar(select(AgentRun).where(AgentRun.run_id == run_id))
    if run is None:
        return is_fresh(entry)
    if run.status != AgentRunStatus.RUNNING.value:
        return False

    heartbeat = str((run.route_json or {}).get("heartbeat_at") or "").strip()
    # Reuse the staleness check by presenting the heartbeat as the timestamp.
    probe = {"ingest_status_updated_at": heartbeat} if heartbeat else entry
    return is_fresh(probe)
def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]:
for entry in index["documents"]:
if entry["id"] == document_id:
return entry
raise FileNotFoundError(document_id)
def _resolve_document_path(self, entry: dict[str, Any]) -> Path:
return self.library_root / entry["folder"] / entry["stored_name"]
def _replace_document_content(self, document_id: str, content: bytes, actor_name: str) -> KnowledgeDocumentDetailRead:
    """Store new content for an existing document by re-uploading it.

    The document is looked up in the index and ``upload_document`` is called
    with the same folder and original file name, acting as a synthetic
    ``onlyoffice`` user that has the ``manager`` role and admin rights.

    Args:
        document_id: Id of the document to replace.
        content: New file bytes.
        actor_name: Display name for the synthetic user; ``"ONLYOFFICE"``
            is used when empty.

    Returns:
        Whatever ``upload_document`` returns for the new upload.

    Raises:
        FileNotFoundError: If the document id is not in the index.
    """
    target = self._require_entry(self._load_index(), document_id)
    editor_identity = CurrentUserContext(
        username="onlyoffice",
        name=actor_name or "ONLYOFFICE",
        role_codes=["manager"],
        is_admin=True,
    )
    return self.upload_document(
        folder=target["folder"],
        filename=target["original_name"],
        content=content,
        current_user=editor_identity,
    )
@staticmethod
def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload:
    """Extract the fields this service uses from an ONLYOFFICE callback body.

    Args:
        payload: Decoded callback JSON.

    Returns:
        An ``OnlyOfficeCallbackPayload`` with the integer ``status`` (0 when
        missing or falsy), the stripped ``url`` as ``download_url``, and the
        non-blank, stripped ``users`` entries.
    """
    status = int(payload.get("status") or 0)
    download_url = str(payload.get("url") or "").strip()
    stripped_users = (str(user).strip() for user in payload.get("users") or [])
    users = [user for user in stripped_users if user]
    return OnlyOfficeCallbackPayload(status=status, download_url=download_url, users=users)
@staticmethod
def _normalize_filename(filename: str) -> str:
normalized = Path(str(filename or "").strip()).name.strip()
normalized = normalized.replace("/", "_").replace("\\", "_")
if not normalized:
raise ValueError("文件名不能为空。")
return normalized
@staticmethod
def _normalize_folder(folder: str) -> str:
    """Validate that ``folder`` names one of the fixed knowledge folders.

    Args:
        folder: Requested folder name; surrounding whitespace is ignored.

    Returns:
        The stripped folder name.

    Raises:
        ValueError: If the name is not in ``FIXED_KNOWLEDGE_FOLDERS``.
    """
    candidate = str(folder or "").strip()
    if candidate in FIXED_KNOWLEDGE_FOLDERS:
        return candidate
    raise ValueError("只能上传到预设知识库文件夹。")
@staticmethod
def _extract_extension(filename: str) -> str:
suffix = Path(filename).suffix.lower().lstrip(".")
return suffix
@staticmethod
def _build_onlyoffice_document_key(entry: dict[str, Any]) -> str:
version = int(entry.get("version_number", 1))
checksum = str(entry.get("sha256") or "")[:12]
return f"{entry['id']}-v{version}-{checksum or 'nochecksum'}"
def _build_onlyoffice_access_token(self, document_id: str) -> str:
    """Issue an HS256 JWT granting ONLYOFFICE access to one document.

    The token carries ``scope="onlyoffice-content"`` and the ``document_id``
    and is signed with the ``jwt_secret`` from the resolved ONLYOFFICE
    settings. No expiry claim is set here.

    Args:
        document_id: Id of the document the token is bound to.

    Returns:
        The encoded token.
    """
    signing_secret = resolve_onlyoffice_settings().jwt_secret
    claims = {"scope": "onlyoffice-content", "document_id": document_id}
    return jwt.encode(claims, signing_secret, algorithm="HS256")
@staticmethod
def _resolve_onlyoffice_document_type(extension: str) -> str:
    """Map a file extension to the ONLYOFFICE document type.

    Args:
        extension: Lower-case extension without the dot.

    Returns:
        ``"word"``, ``"cell"`` or ``"slide"``.

    Raises:
        ValueError: If the extension is in none of the Word, Excel or PPT
            extension sets.
    """
    editor_kinds = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "cell"),
        (PPT_EXTENSIONS, "slide"),
    )
    for extensions, editor_kind in editor_kinds:
        if extension in extensions:
            return editor_kind
    raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
@staticmethod
def _parse_stored_name(stored_name: str) -> tuple[str, str]:
if "__" not in stored_name:
return uuid4().hex, stored_name
document_id, original_name = stored_name.split("__", 1)
return document_id or uuid4().hex, original_name or stored_name
@staticmethod
def _format_time(value: str | None) -> str:
if not value:
return ""
try:
parsed = datetime.fromisoformat(value)
except ValueError:
return value
return parsed.astimezone(UTC).strftime("%Y-%m-%d %H:%M")
@staticmethod
def _format_size(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} B"
if size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes / (1024 * 1024):.1f} MB"
@staticmethod
def _resolve_file_type(extension: str) -> str:
if extension == "pdf":
return "pdf"
if extension in WORD_EXTENSIONS:
return "word"
if extension in EXCEL_EXTENSIONS:
return "excel"
if extension in PPT_EXTENSIONS:
return "ppt"
if extension in IMAGE_EXTENSIONS:
return "image"
if extension in TEXT_EXTENSIONS:
return "text"
if extension in ARCHIVE_EXTENSIONS:
return "archive"
return "binary"
@staticmethod
def _resolve_file_type_label(file_type: str) -> str:
mapping = {
"pdf": "PDF 预览",
"word": "Word 预览",
"excel": "Excel 预览",
"ppt": "PPT 预览",
"image": "图片预览",
"text": "文本预览",
"archive": "压缩包",
"binary": "文件预览",
}
return mapping.get(file_type, "文件预览")
@staticmethod
def _can_preview(extension: str) -> bool:
    """Tell whether the extension is in the inline or structured preview sets."""
    preview_groups = (INLINE_PREVIEW_EXTENSIONS, STRUCTURED_PREVIEW_EXTENSIONS)
    return any(extension in group for group in preview_groups)
@staticmethod
def _read_text_preview(file_path: Path) -> str:
encodings = ("utf-8", "utf-8-sig", "gbk")
for encoding in encodings:
try:
return file_path.read_text(encoding=encoding)
except UnicodeDecodeError:
continue
return "当前文本文件编码暂不支持在线解析。"
@staticmethod
def _extract_docx_text(file_path: Path) -> str:
try:
with ZipFile(file_path) as archive:
xml_content = archive.read("word/document.xml")
except (BadZipFile, KeyError):
return "当前 Word 文件解析失败。"
root = ElementTree.fromstring(xml_content)
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
return "\n".join(texts)
def _extract_document_text_from_path(
    self,
    *,
    file_path: Path,
    original_name: str,
    mime_type: str,
) -> str:
    """Extract normalized plain text from a stored document.

    The extractor is chosen by the extension of ``original_name``: text
    files are read directly, ``docx`` is parsed, ``pdf`` uses embedded text
    and falls back to OCR when that is empty, images go straight to OCR.

    Args:
        file_path: Path of the file on disk.
        original_name: Original file name (decides the extractor; also
            passed to OCR).
        mime_type: MIME type passed to OCR.

    Returns:
        The normalized text, or ``""`` for unsupported extensions.
    """
    extension = self._extract_extension(original_name)
    clean = self._normalize_extracted_text

    if extension in TEXT_EXTENSIONS:
        return clean(self._read_text_preview(file_path))
    if extension == "docx":
        return clean(self._extract_docx_text(file_path))

    if extension == "pdf":
        embedded_text = clean(self._extract_pdf_text(file_path))
        if embedded_text:
            return embedded_text
        # No embedded text: fall through to OCR below.
    elif extension not in IMAGE_EXTENSIONS:
        return ""

    return clean(
        self._extract_text_with_ocr(
            file_path=file_path,
            original_name=original_name,
            mime_type=mime_type,
        )
    )
@staticmethod
def _normalize_extracted_text(text: str) -> str:
normalized = str(text or "").replace("\r\n", "\n").replace("\r", "\n")
normalized = re.sub(r"\n{3,}", "\n\n", normalized)
return normalized.strip()
@staticmethod
def _extract_pdf_text(file_path: Path) -> str:
pdftotext_bin = shutil.which("pdftotext")
if not pdftotext_bin:
return ""
completed = subprocess.run(
[pdftotext_bin, str(file_path), "-"],
capture_output=True,
text=True,
timeout=40,
check=False,
)
if completed.returncode != 0:
return ""
return str(completed.stdout or "")
@staticmethod
def _extract_text_with_ocr(
    *,
    file_path: Path,
    original_name: str,
    mime_type: str,
) -> str:
    """Extract text from a file through the OCR service (best-effort).

    Args:
        file_path: Path of the file whose bytes are sent to OCR.
        original_name: File name passed to the OCR service.
        mime_type: MIME type passed to the OCR service.

    Returns:
        Per recognized document, its ``text`` (or its ``summary`` when the
        text is empty), joined by blank lines. Returns ``""`` when OCR fails
        for any reason; the failure is logged rather than raised.
    """
    try:
        # Imported here rather than at module top, as in the original;
        # presumably to avoid an import cycle or a hard OCR dependency.
        from app.services.ocr import OcrService

        result = OcrService().recognize_files(
            [(original_name, file_path.read_bytes(), mime_type)]
        )
    except Exception:
        # Still best-effort, but no longer silent: an unlogged failure here
        # looks exactly like a document with no text.
        logger.warning("OCR text extraction failed for %s", original_name, exc_info=True)
        return ""
    parts: list[str] = []
    for document in result.documents:
        text = str(getattr(document, "text", "") or "").strip()
        summary = str(getattr(document, "summary", "") or "").strip()
        chosen = text or summary
        if chosen:
            parts.append(chosen)
    return "\n\n".join(parts)
@staticmethod
def _extract_xlsx_sheets(file_path: Path) -> list[tuple[str, list[list[str]]]]:
try:
with ZipFile(file_path) as archive:
shared_strings: list[str] = []
if "xl/sharedStrings.xml" in archive.namelist():
shared_root = ElementTree.fromstring(archive.read("xl/sharedStrings.xml"))
shared_strings = [
"".join(node.itertext()).strip()
for node in shared_root.iter()
if node.tag.endswith("}si")
]
sheet_files = sorted(
name
for name in archive.namelist()
if re.fullmatch(r"xl/worksheets/sheet\d+\.xml", name)
)
if not sheet_files:
return []
relationship_targets: dict[str, str] = {}
if "xl/_rels/workbook.xml.rels" in archive.namelist():
rel_root = ElementTree.fromstring(archive.read("xl/_rels/workbook.xml.rels"))
for node in rel_root.iter():
if not node.tag.endswith("Relationship"):
continue
rel_id = node.attrib.get("Id")
target = node.attrib.get("Target")
if not rel_id or not target:
continue
normalized = target.lstrip("/")
if not normalized.startswith("xl/"):
normalized = f"xl/{normalized.lstrip('./')}"
relationship_targets[rel_id] = normalized
ordered_sheets: list[tuple[str, str]] = []
if "xl/workbook.xml" in archive.namelist():
workbook_root = ElementTree.fromstring(archive.read("xl/workbook.xml"))
for index, node in enumerate(workbook_root.iter()):
if not node.tag.endswith("sheet"):
continue
sheet_name = node.attrib.get("name") or f"Sheet {index + 1}"
relationship_id = next(
(value for key, value in node.attrib.items() if key.endswith("}id")),
None,
)
target = relationship_targets.get(relationship_id or "")
if target:
ordered_sheets.append((sheet_name, target))
if not ordered_sheets:
ordered_sheets = [
(f"Sheet {index + 1}", sheet_file)
for index, sheet_file in enumerate(sheet_files)
]
preview_sheets: list[tuple[str, list[list[str]]]] = []
for sheet_name, target in ordered_sheets:
if target not in archive.namelist():
continue
sheet_root = ElementTree.fromstring(archive.read(target))
rows: list[list[str]] = []
for row in sheet_root.iter():
if not row.tag.endswith("}row"):
continue
row_values: list[str] = []
for cell in row:
if not cell.tag.endswith("}c"):
continue
cell_type = cell.attrib.get("t")
value_node = next((item for item in cell if item.tag.endswith("}v")), None)
if cell_type == "inlineStr":
text_node = next((item for item in cell.iter() if item.tag.endswith("}t")), None)
row_values.append((text_node.text or "").strip() if text_node is not None else "")
continue
if value_node is None or value_node.text is None:
row_values.append("")
continue
raw_value = value_node.text.strip()
if cell_type == "s" and raw_value.isdigit():
index = int(raw_value)
row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
else:
row_values.append(raw_value)
if row_values:
rows.append(row_values)
preview_sheets.append((sheet_name, rows))
return preview_sheets
except (BadZipFile, ElementTree.ParseError, KeyError, ValueError):
return []
@staticmethod
def _extract_pptx_slides(file_path: Path) -> list[list[str]]:
try:
with ZipFile(file_path) as archive:
slide_names = sorted(
name
for name in archive.namelist()
if re.fullmatch(r"ppt/slides/slide\d+\.xml", name)
)
slides: list[list[str]] = []
for slide_name in slide_names:
root = ElementTree.fromstring(archive.read(slide_name))
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
slides.append(texts)
return slides
except (BadZipFile, ElementTree.ParseError, KeyError):
return []