2026-05-09 03:04:09 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
import json
|
|
|
|
|
import mimetypes
|
|
|
|
|
import re
|
2026-05-09 04:25:30 +00:00
|
|
|
from dataclasses import dataclass
|
2026-05-09 03:04:09 +00:00
|
|
|
from datetime import UTC, datetime
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
2026-05-09 04:25:30 +00:00
|
|
|
from urllib.request import Request, urlopen
|
2026-05-09 03:04:09 +00:00
|
|
|
from uuid import uuid4
|
|
|
|
|
from xml.etree import ElementTree
|
|
|
|
|
from zipfile import BadZipFile, ZipFile
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
import jwt
|
|
|
|
|
|
2026-05-09 03:04:09 +00:00
|
|
|
from app.api.deps import CurrentUserContext
|
|
|
|
|
from app.core.config import get_settings
|
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
|
from app.schemas.knowledge import (
|
|
|
|
|
KnowledgeDocumentDetailRead,
|
|
|
|
|
KnowledgeDocumentRead,
|
|
|
|
|
KnowledgeFolderRead,
|
|
|
|
|
KnowledgeLibraryRead,
|
2026-05-09 04:25:30 +00:00
|
|
|
KnowledgeOnlyOfficeConfigRead,
|
2026-05-09 03:04:09 +00:00
|
|
|
KnowledgePreviewBlockRead,
|
|
|
|
|
KnowledgePreviewPageRead,
|
|
|
|
|
KnowledgePreviewStatRead,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Logger used by the knowledge-library service code in this module.
logger = get_logger("app.services.knowledge")
|
|
|
|
|
|
|
|
|
|
# Preset folder names of the knowledge library, in display order.
# The service creates one directory per name and only accepts uploads into them.
FIXED_KNOWLEDGE_FOLDERS = [
    "财务知识库",
    "制度政策",
    "报销制度",
    "差旅规范",
    "发票管理",
    "税务合规",
    "预算管理",
    "财务共享",
    "培训资料",
    "常见问答",
]
|
|
|
|
|
|
|
|
|
|
# CSS icon classes keyed by the coarse file type returned by
# ``KnowledgeService._resolve_file_type``; "binary" doubles as the fallback.
ICON_BY_TYPE = {
    "pdf": "mdi mdi-file-document-outline-pdf pdf",
    "word": "mdi mdi-file-document-outline-word word",
    "excel": "mdi mdi-file-document-outline-excel excel",
    "ppt": "mdi mdi-file-powerpoint-box ppt",
    "image": "mdi mdi-file-image-outline image",
    "text": "mdi mdi-file-document-outline text",
    "archive": "mdi mdi-folder-zip-outline archive",
    "binary": "mdi mdi-file-outline",
}
|
|
|
|
|
|
|
|
|
|
# Extension groups: lowercase, without the leading dot.
# "csv" is listed under both text and Excel on purpose of the original data;
# which label wins is decided by the lookup order in the consuming code.
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "xml", "yml", "yaml", "log"}
WORD_EXTENSIONS = {"doc", "docx"}
EXCEL_EXTENSIONS = {"xls", "xlsx", "csv"}
PPT_EXTENSIONS = {"ppt", "pptx"}
IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}

# Formats the service parses itself into preview pages.
STRUCTURED_PREVIEW_EXTENSIONS = TEXT_EXTENSIONS | {"docx", "xlsx", "pptx"}
# Formats previewed from the raw file, with no extracted pages.
INLINE_PREVIEW_EXTENSIONS = IMAGE_EXTENSIONS | {"pdf"}
# Formats that may be opened through ONLYOFFICE.
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(slots=True)
|
|
|
|
|
class OnlyOfficeCallbackPayload:
|
|
|
|
|
status: int
|
|
|
|
|
download_url: str
|
|
|
|
|
users: list[str]
|
2026-05-09 03:04:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def prepare_knowledge_library() -> None:
    """Create the on-disk knowledge library using the default storage root."""
    service = KnowledgeService()
    service.ensure_library_ready()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KnowledgeService:
|
|
|
|
|
def __init__(self, storage_root: Path | None = None) -> None:
    """Resolve the directories the service works in.

    Args:
        storage_root: Base storage directory. When falsy, the value of
            ``resolved_storage_root_dir`` from the application settings is used.
    """
    settings = get_settings()
    base_dir = storage_root if storage_root else settings.resolved_storage_root_dir
    self.storage_root = Path(base_dir)
    # All documents live under "<storage_root>/knowledge"; the index sits next to the folders.
    self.library_root = self.storage_root.joinpath("knowledge")
    self.index_path = self.library_root.joinpath(".index.json")
|
|
|
|
|
|
|
|
|
|
def ensure_library_ready(self) -> None:
    """Create the library folders and index file, then sync the index with disk.

    The index is rewritten only when ``_reconcile_index`` reports a change.
    """
    root = self.library_root
    root.mkdir(parents=True, exist_ok=True)
    for name in FIXED_KNOWLEDGE_FOLDERS:
        root.joinpath(name).mkdir(parents=True, exist_ok=True)

    index_missing = not self.index_path.exists()
    if index_missing:
        self._save_index({"version": 1, "documents": []})

    current_index = self._load_index()
    needs_save = self._reconcile_index(current_index)
    if needs_save:
        self._save_index(current_index)
|
|
|
|
|
|
|
|
|
|
def list_library(self) -> KnowledgeLibraryRead:
    """Return every preset folder with its document count plus all documents."""
    documents = self._load_documents()

    # Tally documents per preset folder in a single pass.
    per_folder = {name: 0 for name in FIXED_KNOWLEDGE_FOLDERS}
    for document in documents:
        if document.folder in per_folder:
            per_folder[document.folder] += 1

    folders = []
    for name in FIXED_KNOWLEDGE_FOLDERS:
        # "差旅规范" is the only folder rendered with the open-folder icon.
        if name == "差旅规范":
            icon = "mdi mdi-folder-open"
        else:
            icon = "mdi mdi-folder"
        folders.append(KnowledgeFolderRead(name=name, count=per_folder[name], icon=icon))

    return KnowledgeLibraryRead(folders=folders, documents=documents)
|
|
|
|
|
|
|
|
|
|
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
    """Return the list-view fields of one document together with its preview.

    Raises:
        FileNotFoundError: If no index entry has ``document_id``
            (raised by ``_require_entry``).
    """
    self.ensure_library_ready()
    entry = self._require_entry(self._load_index(), document_id)
    # The preview is built before serialization, as in the list-view path's helpers.
    preview_kind, preview_pages = self._build_preview(entry)
    base_fields = self._serialize_document(entry).model_dump()
    return KnowledgeDocumentDetailRead(
        **base_fields,
        previewKind=preview_kind,
        previewPages=preview_pages,
    )
|
|
|
|
|
|
|
|
|
|
def upload_document(
    self,
    folder: str,
    filename: str,
    content: bytes,
    current_user: CurrentUserContext,
) -> KnowledgeDocumentDetailRead:
    """Store a file in a preset folder, creating or updating its index entry.

    A document with the same folder and the same name (compared
    case-insensitively) is treated as a new version of the existing entry:
    its id is kept and ``version_number`` is incremented.

    Args:
        folder: Target folder; must be one of the preset folders.
        filename: Client-supplied file name; only its final path component is used.
        content: Raw file bytes; must not be empty.
        current_user: Uploader; ``name`` is recorded in the index and the log.

    Returns:
        The detail view of the stored document.

    Raises:
        ValueError: If the folder or file name is rejected, or ``content`` is empty.
    """
    self.ensure_library_ready()
    target_folder = self._normalize_folder(folder)
    clean_name = self._normalize_filename(filename)
    if not content:
        raise ValueError("上传文件不能为空。")

    index = self._load_index()
    lowered_name = clean_name.lower()
    previous = None
    for candidate in index["documents"]:
        same_folder = candidate["folder"] == target_folder
        if same_folder and candidate["original_name"].lower() == lowered_name:
            previous = candidate
            break

    document_id = previous["id"] if previous else uuid4().hex
    stored_name = f"{document_id}__{clean_name}"
    destination = self.library_root / target_folder / stored_name

    # A renamed stored file (e.g. different letter case) leaves a stale copy behind; drop it.
    if previous is not None and previous["stored_name"] != stored_name:
        stale_path = self.library_root / previous["folder"] / previous["stored_name"]
        if stale_path.exists():
            stale_path.unlink()

    destination.write_bytes(content)

    timestamp = datetime.now(UTC).isoformat()
    guessed_type, _encoding = mimetypes.guess_type(clean_name)
    mime_type = guessed_type or "application/octet-stream"
    checksum = hashlib.sha256(content).hexdigest()
    extension = self._extract_extension(clean_name)
    size_bytes = len(content)
    uploader = current_user.name

    if previous is None:
        index["documents"].append(
            {
                "id": document_id,
                "folder": target_folder,
                "original_name": clean_name,
                "stored_name": stored_name,
                "mime_type": mime_type,
                "extension": extension,
                "size_bytes": size_bytes,
                "sha256": checksum,
                "created_at": timestamp,
                "updated_at": timestamp,
                "uploaded_by": uploader,
                "version_number": 1,
            }
        )
        log_template = "Knowledge document uploaded id=%s folder=%s filename=%s by=%s"
    else:
        next_version = int(previous.get("version_number", 1)) + 1
        previous.update(
            {
                "stored_name": stored_name,
                "mime_type": mime_type,
                "extension": extension,
                "size_bytes": size_bytes,
                "sha256": checksum,
                "updated_at": timestamp,
                "uploaded_by": uploader,
                "version_number": next_version,
            }
        )
        log_template = "Knowledge document updated id=%s folder=%s filename=%s by=%s"

    logger.info(log_template, document_id, target_folder, clean_name, uploader)
    self._save_index(index)
    return self.get_document_detail(document_id)
|
|
|
|
|
|
|
|
|
|
def delete_document(self, document_id: str) -> None:
    """Remove a document's file (if present) and its index entry.

    Raises:
        FileNotFoundError: If no index entry has ``document_id``.
    """
    self.ensure_library_ready()
    index = self._load_index()
    doomed = self._require_entry(index, document_id)

    on_disk = self._resolve_document_path(doomed)
    if on_disk.exists():
        on_disk.unlink()

    survivors = [item for item in index["documents"] if item["id"] != document_id]
    index["documents"] = survivors
    self._save_index(index)
    logger.info("Knowledge document deleted id=%s filename=%s", document_id, doomed["original_name"])
|
|
|
|
|
|
|
|
|
|
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
    """Locate a stored document for download.

    Returns:
        ``(path on disk, MIME type, original file name)``.

    Raises:
        FileNotFoundError: If the id is unknown, or the indexed file is
            missing on disk (the message is then the original file name).
    """
    self.ensure_library_ready()
    entry = self._require_entry(self._load_index(), document_id)
    location = self._resolve_document_path(entry)
    original_name = entry["original_name"]
    if not location.exists():
        raise FileNotFoundError(original_name)
    return location, entry["mime_type"], original_name
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
def build_onlyoffice_config(
    self,
    document_id: str,
    current_user: CurrentUserContext,
) -> KnowledgeOnlyOfficeConfigRead:
    """Assemble the signed ONLYOFFICE editor configuration for one document.

    Editing is enabled when the user is an admin or holds the "manager" role;
    everyone else gets view mode. The configuration is signed with HS256
    and the signature is stored under the ``token`` key.

    Raises:
        ValueError: If ONLYOFFICE is disabled or incompletely configured, or
            the file extension is not ONLYOFFICE-editable.
        FileNotFoundError: If no index entry has ``document_id``.
    """
    self.ensure_library_ready()
    settings = get_settings()

    # Configuration guards; the first missing piece is the one reported.
    if not settings.onlyoffice_enabled:
        raise ValueError("ONLYOFFICE 预览未启用。")
    if not (settings.onlyoffice_public_url and settings.onlyoffice_backend_url):
        raise ValueError("ONLYOFFICE 地址配置不完整。")
    if not settings.onlyoffice_jwt_secret:
        raise ValueError("ONLYOFFICE JWT 密钥未配置。")

    entry = self._require_entry(self._load_index(), document_id)
    extension = self._extract_extension(entry["original_name"])
    if extension not in ONLYOFFICE_EDITABLE_EXTENSIONS:
        raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")

    document_type = self._resolve_onlyoffice_document_type(extension)
    backend_base_url = settings.onlyoffice_backend_url.rstrip("/")
    public_url = settings.onlyoffice_public_url.rstrip("/")
    access_token = self._build_onlyoffice_access_token(document_id)
    # Both URLs point at the backend base URL, not the public document-server URL.
    document_url = (
        f"{backend_base_url}{settings.api_v1_prefix}/knowledge/documents/{document_id}/onlyoffice/content"
        f"?access_token={access_token}"
    )
    callback_url = (
        f"{backend_base_url}{settings.api_v1_prefix}/knowledge/documents/{document_id}/onlyoffice/callback"
    )
    can_edit = current_user.is_admin or "manager" in current_user.role_codes
    document_key = self._build_onlyoffice_document_key(entry)

    # Key order is kept stable because the whole mapping is what gets signed.
    permissions = {
        "download": True,
        "edit": can_edit,
        "print": True,
        "copy": True,
    }
    document_section = {
        "fileType": extension,
        "key": document_key,
        "title": entry["original_name"],
        "url": document_url,
        "permissions": permissions,
    }
    editor_section = {
        "mode": "edit" if can_edit else "view",
        "lang": "zh-CN",
        "callbackUrl": callback_url,
        "user": {
            "id": current_user.username,
            "name": current_user.name,
        },
        "customization": {
            "compactHeader": True,
            "compactToolbar": True,
            "toolbarNoTabs": False,
            "autosave": can_edit,
            "forcesave": can_edit,
        },
    }
    config: dict[str, Any] = {
        "documentType": document_type,
        "document": document_section,
        "editorConfig": editor_section,
        "width": "100%",
        "height": "100%",
    }
    # The signature covers the config as it is before the token key is added.
    signature = jwt.encode(config, settings.onlyoffice_jwt_secret, algorithm="HS256")
    config["token"] = signature

    return KnowledgeOnlyOfficeConfigRead(documentServerUrl=public_url, config=config)
|
|
|
|
|
|
|
|
|
|
def validate_onlyoffice_access_token(self, document_id: str, access_token: str) -> None:
    """Check that a content-access token is valid for the given document.

    Raises:
        ValueError: If the token cannot be decoded with the configured secret,
            its ``scope`` is not "onlyoffice-content", or its ``document_id``
            differs from the one requested.
    """
    secret = get_settings().onlyoffice_jwt_secret
    try:
        claims = jwt.decode(access_token, secret, algorithms=["HS256"])
    except jwt.PyJWTError as exc:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc

    scope_matches = claims.get("scope") == "onlyoffice-content"
    document_matches = claims.get("document_id") == document_id
    if not (scope_matches and document_matches):
        raise ValueError("ONLYOFFICE 文件访问令牌无效。")
|
|
|
|
|
|
|
|
|
|
def handle_onlyoffice_callback(self, document_id: str, payload: dict[str, Any]) -> None:
    """Persist the edited document announced by an ONLYOFFICE callback.

    Only callbacks with status 2 or 6 that carry a download URL are acted
    on; every other callback is ignored. The edited file is downloaded and
    stored as a new version of the document.

    Args:
        document_id: Id of the document the callback refers to.
        payload: Decoded callback body.

    Raises:
        ValueError: If the download URL is not an http(s) URL, or storing
            the downloaded content is rejected.
        FileNotFoundError: If no index entry has ``document_id``.
    """
    # Local import: keeps this block self-contained without touching the file's import list.
    from urllib.parse import urlsplit

    self.ensure_library_ready()
    callback = self._parse_onlyoffice_callback(payload)
    if callback.status not in {2, 6} or not callback.download_url:
        return

    logger.info(
        "ONLYOFFICE callback received id=%s status=%s users=%s",
        document_id,
        callback.status,
        ",".join(callback.users) if callback.users else "-",
    )

    # The URL comes from the request body. urlopen also understands schemes
    # such as file:, so anything but http(s) is refused before fetching.
    scheme = urlsplit(callback.download_url).scheme.lower()
    if scheme not in {"http", "https"}:
        logger.warning(
            "ONLYOFFICE callback rejected id=%s unsupported download scheme=%s",
            document_id,
            scheme or "-",
        )
        raise ValueError("ONLYOFFICE 回调下载地址无效。")

    # NOTE(review): the callback's own JWT is not verified here — presumably
    # the route does that before calling this method; confirm.
    request = Request(callback.download_url, headers={"User-Agent": "x-financial-onlyoffice"})
    with urlopen(request, timeout=30) as response:  # noqa: S310 - scheme checked above
        content = response.read()

    actor_name = callback.users[0] if callback.users else "ONLYOFFICE"
    self._replace_document_content(document_id, content, actor_name=actor_name)
|
|
|
|
|
|
2026-05-09 03:04:09 +00:00
|
|
|
def _load_documents(self) -> list[KnowledgeDocumentRead]:
    """Return all indexed documents, most recently updated first.

    The index is reconciled with the files on disk before serialization and
    written back only when reconciliation reports a change, mirroring
    ``ensure_library_ready`` instead of rewriting the file on every listing.
    """
    self.ensure_library_ready()
    index = self._load_index()
    if self._reconcile_index(index):
        self._save_index(index)

    documents = [self._serialize_document(entry) for entry in index["documents"]]
    # ``time`` is the formatted timestamp string produced by ``_format_time``.
    return sorted(documents, key=lambda item: item.time, reverse=True)
|
|
|
|
|
|
|
|
|
|
def _serialize_document(self, entry: dict[str, Any]) -> KnowledgeDocumentRead:
    """Convert one index entry into the list-view schema.

    Missing optional fields fall back to defaults: extension derived from the
    file name, size 0, version 1, owner "系统导入", MIME type
    "application/octet-stream".
    """
    extension = entry.get("extension") or self._extract_extension(entry["original_name"])
    file_type = self._resolve_file_type(extension)
    size_bytes = int(entry.get("size_bytes") or 0)
    updated_at = self._format_time(entry.get("updated_at") or entry.get("created_at"))
    format_label = extension.upper() or "FILE"
    version_label = f"v{int(entry.get('version_number', 1))}.0"

    fields = {
        "id": entry["id"],
        "name": entry["original_name"],
        "folder": entry["folder"],
        "tag": f"{entry['folder']} / {format_label}",
        "time": updated_at,
        "version": version_label,
        # Every stored document is reported as published.
        "state": "已发布",
        "stateTone": "success",
        "owner": entry.get("uploaded_by") or "系统导入",
        "icon": ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
        "fileType": file_type,
        "fileTypeLabel": self._resolve_file_type_label(file_type),
        "summary": f"{entry['folder']} · {format_label} · {self._format_size(size_bytes)}",
        "mimeType": entry.get("mime_type") or "application/octet-stream",
        "extension": extension,
        "sizeBytes": size_bytes,
        "canPreview": self._can_preview(extension),
    }
    return KnowledgeDocumentRead(**fields)
|
|
|
|
|
|
|
|
|
|
def _build_preview(
    self, entry: dict[str, Any]
) -> tuple[str, list[KnowledgePreviewPageRead]]:
    """Pick the preview kind for a document and build its preview pages.

    Returns:
        ``(kind, pages)`` where kind is "pdf", "image", "text", "table",
        "slides" or "unsupported". PDF and image previews carry no pages.
    """
    extension = self._extract_extension(entry["original_name"])
    file_path = self._resolve_document_path(entry)

    # Formats previewed from the raw file: nothing to extract.
    if extension == "pdf":
        return "pdf", []
    if extension in IMAGE_EXTENSIONS:
        return "image", []

    # Plain text and Word documents share the text page layout.
    if extension in TEXT_EXTENSIONS:
        extracted = self._read_text_preview(file_path)
    elif extension == "docx":
        extracted = self._extract_docx_text(file_path)
    else:
        extracted = None
    if extracted is not None:
        return "text", [self._build_text_preview_page(entry, extracted)]

    if extension == "xlsx":
        return "table", self._build_xlsx_preview_pages(entry, file_path)
    if extension == "pptx":
        return "slides", self._build_pptx_preview_pages(entry, file_path)

    # Everything else gets a single explanatory page.
    fallback_stats = [
        KnowledgePreviewStatRead(label="文件格式", value=extension.upper() or "FILE"),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
        KnowledgePreviewStatRead(label="建议操作", value="下载后查看"),
    ]
    fallback_block = KnowledgePreviewBlockRead(
        heading="预览说明",
        lines=[
            "当前系统已支持该文件的上传、下载和权限控制。",
            "如需在线预览,可后续接入专门的文档转换服务。",
        ],
    )
    fallback_page = KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="当前格式暂不支持在线解析预览。",
        stats=fallback_stats,
        blocks=[fallback_block],
    )
    return "unsupported", [fallback_page]
|
|
|
|
|
|
|
|
|
|
def _build_text_preview_page(
    self, entry: dict[str, Any], text: str
) -> KnowledgePreviewPageRead:
    """Build a single preview page from extracted text.

    Blank lines are dropped and the rest stripped. At most the first 24
    lines are shown, grouped into blocks of up to 8; the line-count stat
    still reports the total number of non-blank lines.
    """
    lines = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped:
            lines.append(stripped)
    if not lines:
        lines = ["文件内容为空,或当前文档未提取到可展示文本。"]

    shown = lines[:24]
    blocks = []
    for start in range(0, len(shown), 8):
        fragment_number = start // 8 + 1
        blocks.append(
            KnowledgePreviewBlockRead(
                heading=f"内容片段 {fragment_number}",
                lines=shown[start : start + 8],
            )
        )

    stats = [
        KnowledgePreviewStatRead(label="文件格式", value=entry["extension"].upper() or "TEXT"),
        KnowledgePreviewStatRead(label="可见行数", value=str(len(lines))),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
    ]
    return KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="文本提取预览",
        stats=stats,
        blocks=blocks,
    )
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
def _build_xlsx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per worksheet (first 8 sheets, first 12 rows each).

    When nothing can be extracted, a single placeholder sheet is shown.
    """
    placeholder_rows = [["未提取到表格内容。"]]
    sheets = self._extract_xlsx_sheets(file_path) or [("Sheet 1", placeholder_rows)]

    # Identical on every page, so computed once.
    sheet_total = str(len(sheets))
    size_label = self._format_size(entry["size_bytes"])

    pages: list[KnowledgePreviewPageRead] = []
    for sheet_name, rows in sheets[:8]:
        visible_rows = rows[:12] if rows else [["未提取到表格内容。"]]
        row_blocks = []
        for row_number, row in enumerate(visible_rows, start=1):
            joined_cells = " | ".join((cell or "") for cell in row)
            row_blocks.append(
                KnowledgePreviewBlockRead(heading=f"第 {row_number} 行", lines=[joined_cells])
            )
        page = KnowledgePreviewPageRead(
            title=sheet_name,
            subtitle="表格内容预览",
            stats=[
                KnowledgePreviewStatRead(label="工作表数量", value=sheet_total),
                KnowledgePreviewStatRead(label="预览行数", value=str(len(visible_rows))),
                KnowledgePreviewStatRead(label="文件大小", value=size_label),
            ],
            blocks=row_blocks,
        )
        pages.append(page)
    return pages
|
2026-05-09 03:04:09 +00:00
|
|
|
|
|
|
|
|
def _build_pptx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per slide, limited to the first 8 slides.

    When no slide text can be extracted, a single placeholder slide is shown.
    """
    slides = self._extract_pptx_slides(file_path) or [["未提取到幻灯片文本。"]]

    def build_page(slide_number: int, slide_lines: list[str]) -> KnowledgePreviewPageRead:
        # One page carries the slide's text as a single block.
        return KnowledgePreviewPageRead(
            title=entry["original_name"],
            subtitle=f"幻灯片 {slide_number}",
            stats=[
                KnowledgePreviewStatRead(label="页码", value=str(slide_number)),
                KnowledgePreviewStatRead(label="文本条数", value=str(len(slide_lines))),
                KnowledgePreviewStatRead(label="文件格式", value="PPTX"),
            ],
            blocks=[
                KnowledgePreviewBlockRead(
                    heading="幻灯片内容",
                    lines=slide_lines or ["该页未提取到文本内容。"],
                )
            ],
        )

    return [
        build_page(slide_number, slide_lines)
        for slide_number, slide_lines in enumerate(slides[:8], start=1)
    ]
|
|
|
|
|
|
|
|
|
|
def _load_index(self) -> dict[str, Any]:
    """Read the index file, falling back to an empty index when unusable.

    A missing file, undecodable bytes, invalid JSON, or a JSON document
    that is not an object all yield ``{"version": 1, "documents": []}``.
    A ``documents`` value that is absent or not a list is replaced by an
    empty list so callers can always iterate it.
    """
    try:
        payload = json.loads(self.index_path.read_text(encoding="utf-8"))
    except (FileNotFoundError, UnicodeDecodeError, json.JSONDecodeError):
        return {"version": 1, "documents": []}

    # Valid JSON can still be the wrong shape (e.g. a list or a number).
    if not isinstance(payload, dict):
        return {"version": 1, "documents": []}
    if not isinstance(payload.get("documents"), list):
        payload["documents"] = []
    return payload
|
|
|
|
|
|
|
|
|
|
def _save_index(self, index: dict[str, Any]) -> None:
    """Write the index as pretty-printed UTF-8 JSON without exposing partial files.

    The JSON is written to a sibling ``<name>.tmp`` file which then replaces
    the real index, so an interrupted write cannot leave a truncated index
    behind (a truncated index would be read back as an empty library).
    """
    serialized = json.dumps(index, ensure_ascii=False, indent=2)
    staging_path = self.index_path.with_name(f"{self.index_path.name}.tmp")
    staging_path.write_text(serialized, encoding="utf-8")
    # NOTE(review): concurrent writers would share this staging name — confirm
    # the service is only ever written from one place at a time.
    staging_path.replace(self.index_path)
|
|
|
|
|
|
|
|
|
|
def _reconcile_index(self, index: dict[str, Any]) -> bool:
    """Bring the in-memory index in line with the files on disk.

    * Entries whose file no longer exists are dropped.
    * Entries lacking ``id``, ``folder``, ``stored_name`` or
      ``original_name`` are dropped as well; if their file is still in a
      preset folder it is re-imported by the scan below.
    * Size, extension and (when missing) MIME type of surviving entries are
      refreshed from disk.
    * Files found in a preset folder but not indexed are added as
      "系统导入" entries with version 1. Dot-files are ignored.

    Args:
        index: Index mapping; mutated in place.

    Returns:
        True when anything in the index changed (including refreshed
        metadata), so the caller knows the index should be saved.
    """
    changed = False
    documents = index.setdefault("documents", [])
    required_keys = ("id", "folder", "stored_name", "original_name")

    kept: list[dict[str, Any]] = []
    known_files: set[tuple[str, str]] = set()
    for item in documents:
        # An entry that cannot be resolved to a file is unusable.
        if not all(item.get(key) for key in required_keys):
            changed = True
            continue
        known_files.add((item["folder"], item["stored_name"]))

        file_path = self._resolve_document_path(item)
        if not file_path.exists():
            changed = True
            continue

        refreshed = {
            "size_bytes": file_path.stat().st_size,
            "extension": self._extract_extension(item["original_name"]),
            "mime_type": item.get("mime_type")
            or (mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"),
        }
        for key, value in refreshed.items():
            # Report metadata drift too, otherwise it is never written back.
            if item.get(key) != value:
                item[key] = value
                changed = True
        kept.append(item)

    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        folder_path = self.library_root / folder_name
        if not folder_path.is_dir():
            continue
        # Sorted so newly imported files are indexed in a stable order.
        for file_path in sorted(folder_path.iterdir()):
            if not file_path.is_file() or file_path.name.startswith("."):
                continue
            if (folder_name, file_path.name) in known_files:
                continue

            document_id, original_name = self._parse_stored_name(file_path.name)
            stat = file_path.stat()
            kept.append(
                {
                    "id": document_id,
                    "folder": folder_name,
                    "original_name": original_name,
                    "stored_name": file_path.name,
                    "mime_type": mimetypes.guess_type(original_name)[0]
                    or "application/octet-stream",
                    "extension": self._extract_extension(original_name),
                    "size_bytes": stat.st_size,
                    # No checksum is computed for files discovered on disk.
                    "sha256": "",
                    "created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC).isoformat(),
                    "updated_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
                    "uploaded_by": "系统导入",
                    "version_number": 1,
                }
            )
            changed = True

    if changed:
        index["documents"] = kept
    return changed
|
|
|
|
|
|
|
|
|
|
def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]:
    """Return the first index entry whose id equals ``document_id``.

    Raises:
        FileNotFoundError: If no entry matches; the id is the exception argument.
    """
    match = next(
        (candidate for candidate in index["documents"] if candidate["id"] == document_id),
        None,
    )
    if match is None:
        raise FileNotFoundError(document_id)
    return match
|
|
|
|
|
|
|
|
|
|
def _resolve_document_path(self, entry: dict[str, Any]) -> Path:
    """Return ``<library_root>/<folder>/<stored_name>`` for an index entry."""
    return self.library_root.joinpath(entry["folder"], entry["stored_name"])
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
def _replace_document_content(self, document_id: str, content: bytes, actor_name: str) -> KnowledgeDocumentDetailRead:
    """Store new content for an existing document by re-uploading it under its own name.

    The upload is performed as a synthetic "onlyoffice" admin/manager user
    whose display name is ``actor_name`` (or "ONLYOFFICE" when empty).

    Raises:
        FileNotFoundError: If no index entry has ``document_id``.
    """
    entry = self._require_entry(self._load_index(), document_id)
    editor_identity = CurrentUserContext(
        username="onlyoffice",
        name=actor_name or "ONLYOFFICE",
        role_codes=["manager"],
        is_admin=True,
    )
    # Same folder + same name makes upload_document treat this as a new version.
    return self.upload_document(
        folder=entry["folder"],
        filename=entry["original_name"],
        content=content,
        current_user=editor_identity,
    )
|
|
|
|
|
|
|
|
|
|
@staticmethod
def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload:
    """Extract status, download URL and user list from a callback body.

    Missing or falsy values become ``0``, ``""`` and ``[]`` respectively;
    user entries are stringified, stripped, and dropped when empty.
    """
    status = int(payload.get("status") or 0)
    download_url = str(payload.get("url") or "").strip()

    users = []
    for raw_user in payload.get("users") or []:
        cleaned = str(raw_user).strip()
        if cleaned:
            users.append(cleaned)

    return OnlyOfficeCallbackPayload(status=status, download_url=download_url, users=users)
|
|
|
|
|
|
2026-05-09 03:04:09 +00:00
|
|
|
@staticmethod
|
|
|
|
|
def _normalize_filename(filename: str) -> str:
|
|
|
|
|
normalized = Path(str(filename or "").strip()).name.strip()
|
|
|
|
|
normalized = normalized.replace("/", "_").replace("\\", "_")
|
|
|
|
|
if not normalized:
|
|
|
|
|
raise ValueError("文件名不能为空。")
|
|
|
|
|
return normalized
|
|
|
|
|
|
|
|
|
|
@staticmethod
def _normalize_folder(folder: str) -> str:
    """Strip the folder name and require it to be one of the preset folders.

    Raises:
        ValueError: If the stripped name is not in ``FIXED_KNOWLEDGE_FOLDERS``.
    """
    candidate = str(folder or "").strip()
    if candidate in FIXED_KNOWLEDGE_FOLDERS:
        return candidate
    raise ValueError("只能上传到预设知识库文件夹。")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _extract_extension(filename: str) -> str:
|
|
|
|
|
suffix = Path(filename).suffix.lower().lstrip(".")
|
|
|
|
|
return suffix
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
@staticmethod
|
|
|
|
|
def _build_onlyoffice_document_key(entry: dict[str, Any]) -> str:
|
|
|
|
|
version = int(entry.get("version_number", 1))
|
|
|
|
|
checksum = str(entry.get("sha256") or "")[:12]
|
|
|
|
|
return f"{entry['id']}-v{version}-{checksum or 'nochecksum'}"
|
|
|
|
|
|
|
|
|
|
def _build_onlyoffice_access_token(self, document_id: str) -> str:
    """Sign an HS256 token granting content access to one document.

    The claims are exactly what ``validate_onlyoffice_access_token`` checks:
    the "onlyoffice-content" scope and the document id.
    """
    # NOTE(review): no expiry claim is set, so the token appears to stay valid
    # for as long as the secret is unchanged — confirm this is intended.
    claims = {
        "scope": "onlyoffice-content",
        "document_id": document_id,
    }
    secret = get_settings().onlyoffice_jwt_secret
    return jwt.encode(claims, secret, algorithm="HS256")
|
|
|
|
|
|
|
|
|
|
@staticmethod
def _resolve_onlyoffice_document_type(extension: str) -> str:
    """Map a file extension to the ONLYOFFICE document type: "word", "cell" or "slide".

    Raises:
        ValueError: If the extension belongs to none of the three families.
    """
    families = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "cell"),
        (PPT_EXTENSIONS, "slide"),
    )
    for members, document_type in families:
        if extension in members:
            return document_type
    raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
|
|
|
|
|
|
2026-05-09 03:04:09 +00:00
|
|
|
@staticmethod
|
|
|
|
|
def _parse_stored_name(stored_name: str) -> tuple[str, str]:
|
|
|
|
|
if "__" not in stored_name:
|
|
|
|
|
return uuid4().hex, stored_name
|
|
|
|
|
document_id, original_name = stored_name.split("__", 1)
|
|
|
|
|
return document_id or uuid4().hex, original_name or stored_name
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _format_time(value: str | None) -> str:
|
|
|
|
|
if not value:
|
|
|
|
|
return ""
|
|
|
|
|
try:
|
|
|
|
|
parsed = datetime.fromisoformat(value)
|
|
|
|
|
except ValueError:
|
|
|
|
|
return value
|
|
|
|
|
return parsed.astimezone(UTC).strftime("%Y-%m-%d %H:%M")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _format_size(size_bytes: int) -> str:
|
|
|
|
|
if size_bytes < 1024:
|
|
|
|
|
return f"{size_bytes} B"
|
|
|
|
|
if size_bytes < 1024 * 1024:
|
|
|
|
|
return f"{size_bytes / 1024:.1f} KB"
|
|
|
|
|
return f"{size_bytes / (1024 * 1024):.1f} MB"
|
|
|
|
|
|
|
|
|
|
@staticmethod
def _resolve_file_type(extension: str) -> str:
    """Classify an extension into the coarse file type used for icons and labels.

    Groups are checked in a fixed order, so an extension listed in several
    groups takes the first match; anything unknown is "binary".
    """
    if extension == "pdf":
        return "pdf"
    ordered_groups = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "excel"),
        (PPT_EXTENSIONS, "ppt"),
        (IMAGE_EXTENSIONS, "image"),
        (TEXT_EXTENSIONS, "text"),
        (ARCHIVE_EXTENSIONS, "archive"),
    )
    return next(
        (file_type for members, file_type in ordered_groups if extension in members),
        "binary",
    )
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _resolve_file_type_label(file_type: str) -> str:
|
|
|
|
|
mapping = {
|
|
|
|
|
"pdf": "PDF 预览",
|
|
|
|
|
"word": "Word 预览",
|
|
|
|
|
"excel": "Excel 预览",
|
|
|
|
|
"ppt": "PPT 预览",
|
|
|
|
|
"image": "图片预览",
|
|
|
|
|
"text": "文本预览",
|
|
|
|
|
"archive": "压缩包",
|
|
|
|
|
"binary": "文件预览",
|
|
|
|
|
}
|
|
|
|
|
return mapping.get(file_type, "文件预览")
|
|
|
|
|
|
|
|
|
|
@staticmethod
def _can_preview(extension: str) -> bool:
    """Tell whether the extension has an inline or a structured preview."""
    preview_groups = (INLINE_PREVIEW_EXTENSIONS, STRUCTURED_PREVIEW_EXTENSIONS)
    return any(extension in group for group in preview_groups)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _read_text_preview(file_path: Path) -> str:
|
|
|
|
|
encodings = ("utf-8", "utf-8-sig", "gbk")
|
|
|
|
|
for encoding in encodings:
|
|
|
|
|
try:
|
|
|
|
|
return file_path.read_text(encoding=encoding)
|
|
|
|
|
except UnicodeDecodeError:
|
|
|
|
|
continue
|
|
|
|
|
return "当前文本文件编码暂不支持在线解析。"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _extract_docx_text(file_path: Path) -> str:
|
|
|
|
|
try:
|
|
|
|
|
with ZipFile(file_path) as archive:
|
|
|
|
|
xml_content = archive.read("word/document.xml")
|
|
|
|
|
except (BadZipFile, KeyError):
|
|
|
|
|
return "当前 Word 文件解析失败。"
|
|
|
|
|
|
|
|
|
|
root = ElementTree.fromstring(xml_content)
|
|
|
|
|
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
|
|
|
|
|
return "\n".join(texts)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
2026-05-09 04:25:30 +00:00
|
|
|
def _extract_xlsx_sheets(file_path: Path) -> list[tuple[str, list[list[str]]]]:
|
2026-05-09 03:04:09 +00:00
|
|
|
try:
|
|
|
|
|
with ZipFile(file_path) as archive:
|
|
|
|
|
shared_strings: list[str] = []
|
|
|
|
|
if "xl/sharedStrings.xml" in archive.namelist():
|
|
|
|
|
shared_root = ElementTree.fromstring(archive.read("xl/sharedStrings.xml"))
|
|
|
|
|
shared_strings = [
|
|
|
|
|
"".join(node.itertext()).strip()
|
|
|
|
|
for node in shared_root.iter()
|
|
|
|
|
if node.tag.endswith("}si")
|
|
|
|
|
]
|
|
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
sheet_files = sorted(
|
2026-05-09 03:04:09 +00:00
|
|
|
name
|
|
|
|
|
for name in archive.namelist()
|
|
|
|
|
if re.fullmatch(r"xl/worksheets/sheet\d+\.xml", name)
|
|
|
|
|
)
|
2026-05-09 04:25:30 +00:00
|
|
|
if not sheet_files:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
relationship_targets: dict[str, str] = {}
|
|
|
|
|
if "xl/_rels/workbook.xml.rels" in archive.namelist():
|
|
|
|
|
rel_root = ElementTree.fromstring(archive.read("xl/_rels/workbook.xml.rels"))
|
|
|
|
|
for node in rel_root.iter():
|
|
|
|
|
if not node.tag.endswith("Relationship"):
|
|
|
|
|
continue
|
|
|
|
|
rel_id = node.attrib.get("Id")
|
|
|
|
|
target = node.attrib.get("Target")
|
|
|
|
|
if not rel_id or not target:
|
|
|
|
|
continue
|
|
|
|
|
normalized = target.lstrip("/")
|
|
|
|
|
if not normalized.startswith("xl/"):
|
|
|
|
|
normalized = f"xl/{normalized.lstrip('./')}"
|
|
|
|
|
relationship_targets[rel_id] = normalized
|
|
|
|
|
|
|
|
|
|
ordered_sheets: list[tuple[str, str]] = []
|
|
|
|
|
if "xl/workbook.xml" in archive.namelist():
|
|
|
|
|
workbook_root = ElementTree.fromstring(archive.read("xl/workbook.xml"))
|
|
|
|
|
for index, node in enumerate(workbook_root.iter()):
|
|
|
|
|
if not node.tag.endswith("sheet"):
|
|
|
|
|
continue
|
|
|
|
|
sheet_name = node.attrib.get("name") or f"Sheet {index + 1}"
|
|
|
|
|
relationship_id = next(
|
|
|
|
|
(value for key, value in node.attrib.items() if key.endswith("}id")),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
target = relationship_targets.get(relationship_id or "")
|
|
|
|
|
if target:
|
|
|
|
|
ordered_sheets.append((sheet_name, target))
|
|
|
|
|
|
|
|
|
|
if not ordered_sheets:
|
|
|
|
|
ordered_sheets = [
|
|
|
|
|
(f"Sheet {index + 1}", sheet_file)
|
|
|
|
|
for index, sheet_file in enumerate(sheet_files)
|
|
|
|
|
]
|
2026-05-09 03:04:09 +00:00
|
|
|
|
2026-05-09 04:25:30 +00:00
|
|
|
preview_sheets: list[tuple[str, list[list[str]]]] = []
|
|
|
|
|
for sheet_name, target in ordered_sheets:
|
|
|
|
|
if target not in archive.namelist():
|
2026-05-09 03:04:09 +00:00
|
|
|
continue
|
2026-05-09 04:25:30 +00:00
|
|
|
|
|
|
|
|
sheet_root = ElementTree.fromstring(archive.read(target))
|
|
|
|
|
rows: list[list[str]] = []
|
|
|
|
|
for row in sheet_root.iter():
|
|
|
|
|
if not row.tag.endswith("}row"):
|
2026-05-09 03:04:09 +00:00
|
|
|
continue
|
2026-05-09 04:25:30 +00:00
|
|
|
row_values: list[str] = []
|
|
|
|
|
for cell in row:
|
|
|
|
|
if not cell.tag.endswith("}c"):
|
|
|
|
|
continue
|
|
|
|
|
cell_type = cell.attrib.get("t")
|
|
|
|
|
value_node = next((item for item in cell if item.tag.endswith("}v")), None)
|
|
|
|
|
|
|
|
|
|
if cell_type == "inlineStr":
|
|
|
|
|
text_node = next((item for item in cell.iter() if item.tag.endswith("}t")), None)
|
|
|
|
|
row_values.append((text_node.text or "").strip() if text_node is not None else "")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if value_node is None or value_node.text is None:
|
|
|
|
|
row_values.append("")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
raw_value = value_node.text.strip()
|
|
|
|
|
if cell_type == "s" and raw_value.isdigit():
|
|
|
|
|
index = int(raw_value)
|
|
|
|
|
row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
|
|
|
|
|
else:
|
|
|
|
|
row_values.append(raw_value)
|
|
|
|
|
if row_values:
|
|
|
|
|
rows.append(row_values)
|
|
|
|
|
|
|
|
|
|
preview_sheets.append((sheet_name, rows))
|
|
|
|
|
|
|
|
|
|
return preview_sheets
|
2026-05-09 03:04:09 +00:00
|
|
|
except (BadZipFile, ElementTree.ParseError, KeyError, ValueError):
|
2026-05-09 04:25:30 +00:00
|
|
|
return []
|
2026-05-09 03:04:09 +00:00
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _extract_pptx_slides(file_path: Path) -> list[list[str]]:
|
|
|
|
|
try:
|
|
|
|
|
with ZipFile(file_path) as archive:
|
|
|
|
|
slide_names = sorted(
|
|
|
|
|
name
|
|
|
|
|
for name in archive.namelist()
|
|
|
|
|
if re.fullmatch(r"ppt/slides/slide\d+\.xml", name)
|
|
|
|
|
)
|
|
|
|
|
slides: list[list[str]] = []
|
|
|
|
|
for slide_name in slide_names:
|
|
|
|
|
root = ElementTree.fromstring(archive.read(slide_name))
|
|
|
|
|
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
|
|
|
|
|
slides.append(texts)
|
|
|
|
|
return slides
|
|
|
|
|
except (BadZipFile, ElementTree.ParseError, KeyError):
|
|
|
|
|
return []
|