Files
X-Financial/server/src/app/services/knowledge.py

1195 lines
48 KiB
Python
Raw Normal View History

from __future__ import annotations
import hashlib
import json
import mimetypes
import re
import shutil
import subprocess
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from uuid import uuid4
from xml.etree import ElementTree
from zipfile import BadZipFile, ZipFile
import jwt
from app.api.deps import CurrentUserContext
from app.core.config import get_settings
from app.core.logging import get_logger
from app.schemas.knowledge import (
KnowledgeDocumentDetailRead,
KnowledgeDocumentRead,
KnowledgeFolderRead,
KnowledgeLibraryRead,
KnowledgeOnlyOfficeConfigRead,
KnowledgePreviewBlockRead,
KnowledgePreviewPageRead,
KnowledgePreviewStatRead,
)
from app.services.settings import resolve_onlyoffice_settings
# Module-level logger shared by every function in this file.
logger = get_logger("app.services.knowledge")

# The only folders the library supports: ensure_library_ready() creates each of
# them and _normalize_folder() rejects any other name.
FIXED_KNOWLEDGE_FOLDERS = [
    "财务知识库",
    "制度政策",
    "报销制度",
    "差旅规范",
    "发票管理",
    "税务合规",
    "预算管理",
    "财务共享",
    "培训资料",
    "常见问答",
]

# Icon class string per file type, keyed by the values _resolve_file_type() returns.
ICON_BY_TYPE = {
    "pdf": "mdi mdi-file-document-outline-pdf pdf",
    "word": "mdi mdi-file-document-outline-word word",
    "excel": "mdi mdi-file-document-outline-excel excel",
    "ppt": "mdi mdi-file-powerpoint-box ppt",
    "image": "mdi mdi-file-image-outline image",
    "text": "mdi mdi-file-document-outline text",
    "archive": "mdi mdi-folder-zip-outline archive",
    "binary": "mdi mdi-file-outline",
}

# Lower-case extensions (without the dot) grouped by file family.
# NOTE: "csv" is in both TEXT_EXTENSIONS and EXCEL_EXTENSIONS; _resolve_file_type()
# tests the Excel set first while _build_preview() tests the text set first.
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "xml", "yml", "yaml", "log"}
WORD_EXTENSIONS = {"doc", "docx"}
EXCEL_EXTENSIONS = {"xls", "xlsx", "csv"}
PPT_EXTENSIONS = {"ppt", "pptx"}
IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}

# Extensions _can_preview() reports as previewable (union of the two sets below).
STRUCTURED_PREVIEW_EXTENSIONS = {"docx", "xlsx", "pptx"} | TEXT_EXTENSIONS
INLINE_PREVIEW_EXTENSIONS = {"pdf"} | IMAGE_EXTENSIONS

# Extensions build_onlyoffice_config() accepts.
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}

# Age, in seconds, at which _is_syncing_status_stale() treats a SYNCING status as stale.
KNOWLEDGE_INGEST_SYNC_STALE_SECONDS = 90

# Integer ingest status codes stored in each index entry's "ingest_status".
KNOWLEDGE_INGEST_STATUS_PUBLISHED = 1
KNOWLEDGE_INGEST_STATUS_SYNCING = 2
KNOWLEDGE_INGEST_STATUS_INGESTED = 3
KNOWLEDGE_INGEST_STATUS_FAILED = 4

# (label, tone) per status code; _serialize_document() exposes them as state / stateTone.
KNOWLEDGE_INGEST_STATUS_META = {
    KNOWLEDGE_INGEST_STATUS_PUBLISHED: ("待归纳", "muted"),
    KNOWLEDGE_INGEST_STATUS_SYNCING: ("正归纳", "warning"),
    KNOWLEDGE_INGEST_STATUS_INGESTED: ("已归纳", "success"),
    KNOWLEDGE_INGEST_STATUS_FAILED: ("归纳失败", "danger"),
}
@dataclass(slots=True)
class OnlyOfficeCallbackPayload:
    """Subset of an ONLYOFFICE callback body, as built by ``_parse_onlyoffice_callback``."""

    # Integer form of the payload's "status"; 0 when missing or falsy.
    status: int
    # Stripped string form of the payload's "url"; empty string when missing.
    download_url: str
    # Stripped, non-empty string forms of the payload's "users" items.
    users: list[str]
def prepare_knowledge_library() -> None:
    """Build a default ``KnowledgeService`` and make sure its library exists on disk."""
    service = KnowledgeService()
    service.ensure_library_ready()
class KnowledgeService:
def __init__(self, storage_root: Path | None = None) -> None:
    """Derive every filesystem location the service works with.

    Args:
        storage_root: Optional storage root override; when falsy, the
            ``resolved_storage_root_dir`` from the application settings is used.
    """
    configured = get_settings()
    root = Path(storage_root or configured.resolved_storage_root_dir)
    library = root / "knowledge"
    wiki = library / ".llm_wiki"

    self.storage_root = root
    self.library_root = library
    self.index_path = library / ".index.json"
    self.llm_wiki_root = wiki
    self.llm_wiki_documents_root = wiki / "documents"
    self.llm_wiki_index_path = wiki / "index.json"
    self.llm_wiki_sync_runs_path = wiki / "sync_runs.json"
def ensure_library_ready(self) -> None:
    """Create missing folders and seed files, then reconcile the index with disk.

    The index is saved again only when ``_reconcile_index`` reports a change.
    """
    root = self.library_root
    root.mkdir(parents=True, exist_ok=True)
    for name in FIXED_KNOWLEDGE_FOLDERS:
        root.joinpath(name).mkdir(parents=True, exist_ok=True)
    self.llm_wiki_documents_root.mkdir(parents=True, exist_ok=True)

    if not self.index_path.exists():
        self._save_index({"version": 1, "documents": []})

    # Seed the LLM wiki bookkeeping files with empty payloads when absent.
    wiki_seeds = (
        (self.llm_wiki_index_path, {"documents": []}),
        (self.llm_wiki_sync_runs_path, {"runs": []}),
    )
    for seed_path, seed_payload in wiki_seeds:
        if seed_path.exists():
            continue
        seed_text = json.dumps(seed_payload, ensure_ascii=False, indent=2)
        seed_path.write_text(seed_text, encoding="utf-8")

    current = self._load_index()
    if self._reconcile_index(current):
        self._save_index(current)
def list_library(self) -> KnowledgeLibraryRead:
    """Return all serialized documents plus one folder summary per fixed folder."""
    documents = self._load_documents()
    folders: list[KnowledgeFolderRead] = []
    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        members = [doc for doc in documents if doc.folder == folder_name]
        # Only the travel-policy folder is shown with the "open" icon.
        if folder_name == "差旅规范":
            folder_icon = "mdi mdi-folder-open"
        else:
            folder_icon = "mdi mdi-folder"
        folders.append(
            KnowledgeFolderRead(name=folder_name, count=len(members), icon=folder_icon)
        )
    return KnowledgeLibraryRead(folders=folders, documents=documents)
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
    """Return the serialized document together with its preview kind and pages.

    Raises:
        FileNotFoundError: Propagated from ``_require_entry`` when the id is unknown.
    """
    self.ensure_library_ready()
    index = self._load_index()
    status_changed = self._reconcile_document_ingest_statuses(
        index, document_ids=[document_id]
    )
    if status_changed:
        self._save_index(index)

    entry = self._require_entry(index, document_id)
    kind, pages = self._build_preview(entry)
    base_fields = self._serialize_document(entry).model_dump()
    return KnowledgeDocumentDetailRead(**base_fields, previewKind=kind, previewPages=pages)
def upload_document(
    self,
    folder: str,
    filename: str,
    content: bytes,
    current_user: CurrentUserContext,
) -> KnowledgeDocumentDetailRead:
    """Store uploaded bytes as a new document or as a new version of an existing one.

    Args:
        folder: Target folder; must be one of ``FIXED_KNOWLEDGE_FOLDERS``.
        filename: Client-supplied file name; reduced to a bare name before use.
        content: Raw file bytes; must not be empty.
        current_user: Uploader; ``current_user.name`` is recorded as ``uploaded_by``.

    Returns:
        The detail view of the stored document, via ``get_document_detail``.

    Raises:
        ValueError: If the folder or file name is rejected, or ``content`` is empty.
    """
    self.ensure_library_ready()
    normalized_folder = self._normalize_folder(folder)
    normalized_name = self._normalize_filename(filename)
    if not content:
        raise ValueError("上传文件不能为空。")
    index = self._load_index()
    # An upload replaces an existing document when the folder matches and the
    # original name matches case-insensitively.
    existing_entry = next(
        (
            item
            for item in index["documents"]
            if item["folder"] == normalized_folder
            and item["original_name"].lower() == normalized_name.lower()
        ),
        None,
    )
    # Re-use the existing id on replacement so the document keeps its identity.
    document_id = existing_entry["id"] if existing_entry else uuid4().hex
    stored_name = f"{document_id}__{normalized_name}"
    target_path = self.library_root / normalized_folder / stored_name
    # The stored name can differ from the previous one (e.g. only the letter case
    # of the name changed); remove the old file so it is not left behind.
    if existing_entry is not None and existing_entry["stored_name"] != stored_name:
        old_path = self.library_root / existing_entry["folder"] / existing_entry["stored_name"]
        if old_path.exists():
            old_path.unlink()
    target_path.write_bytes(content)
    now = datetime.now(UTC).isoformat()
    mime_type = mimetypes.guess_type(normalized_name)[0] or "application/octet-stream"
    checksum = hashlib.sha256(content).hexdigest()
    extension = self._extract_extension(normalized_name)
    if existing_entry is None:
        entry = {
            "id": document_id,
            "folder": normalized_folder,
            "original_name": normalized_name,
            "stored_name": stored_name,
            "mime_type": mime_type,
            "extension": extension,
            "size_bytes": len(content),
            "sha256": checksum,
            "created_at": now,
            "updated_at": now,
            "uploaded_by": current_user.name,
            "version_number": 1,
            "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
        }
        index["documents"].append(entry)
        logger.info(
            "Knowledge document uploaded id=%s folder=%s filename=%s by=%s",
            document_id,
            normalized_folder,
            normalized_name,
            current_user.name,
        )
    else:
        # Replacement: "original_name" and "created_at" are left untouched, the
        # version is bumped and the ingest status is reset to PUBLISHED.
        existing_entry.update(
            {
                "stored_name": stored_name,
                "mime_type": mime_type,
                "extension": extension,
                "size_bytes": len(content),
                "sha256": checksum,
                "updated_at": now,
                "uploaded_by": current_user.name,
                "version_number": int(existing_entry.get("version_number", 1)) + 1,
                "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
            }
        )
        entry = existing_entry
        logger.info(
            "Knowledge document updated id=%s folder=%s filename=%s by=%s",
            document_id,
            normalized_folder,
            normalized_name,
            current_user.name,
        )
    self._save_index(index)
    return self.get_document_detail(document_id)
def delete_document(self, document_id: str) -> None:
    """Remove a document's stored file (if present) and its index entry.

    Raises:
        FileNotFoundError: Propagated from ``_require_entry`` when the id is unknown.
    """
    self.ensure_library_ready()
    index = self._load_index()
    doomed = self._require_entry(index, document_id)

    stored_file = self._resolve_document_path(doomed)
    if stored_file.exists():
        stored_file.unlink()

    survivors = []
    for item in index["documents"]:
        if item["id"] != document_id:
            survivors.append(item)
    index["documents"] = survivors
    self._save_index(index)
    logger.info("Knowledge document deleted id=%s filename=%s", document_id, doomed["original_name"])
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
    """Return ``(path, mime_type, original_name)`` for a stored document.

    Raises:
        FileNotFoundError: If the id is unknown or the stored file is missing.
    """
    self.ensure_library_ready()
    entry = self._require_entry(self._load_index(), document_id)
    stored_file = self._resolve_document_path(entry)
    if stored_file.exists():
        return stored_file, entry["mime_type"], entry["original_name"]
    raise FileNotFoundError(entry["original_name"])
def list_folder_documents(self, folder: str | None = None) -> list[dict[str, Any]]:
    """Return raw index entries, optionally restricted to one fixed folder.

    Ingest statuses are reconciled (and the index saved if they changed) first.

    Raises:
        ValueError: If ``folder`` is given but is not a fixed folder name.
    """
    self.ensure_library_ready()
    index = self._load_index()
    if self._reconcile_document_ingest_statuses(index):
        self._save_index(index)

    entries = [*(index.get("documents") or [])]
    if folder is None:
        return entries
    wanted = self._normalize_folder(folder)
    return list(filter(lambda item: item.get("folder") == wanted, entries))
def get_document_entry(self, document_id: str) -> dict[str, Any]:
    """Return a shallow copy of one index entry after reconciling its ingest status.

    Raises:
        FileNotFoundError: Propagated from ``_require_entry`` when the id is unknown.
    """
    self.ensure_library_ready()
    index = self._load_index()
    status_changed = self._reconcile_document_ingest_statuses(
        index, document_ids=[document_id]
    )
    if status_changed:
        self._save_index(index)
    found = self._require_entry(index, document_id)
    return dict(found)
def set_document_ingest_statuses(self, document_ids: list[str], status_code: int) -> None:
    """Assign ``status_code`` to the given documents and timestamp each change.

    Entries already at ``status_code`` are left alone; the index is saved only
    when at least one entry changed.
    """
    self.ensure_library_ready()
    wanted: set[str] = set()
    for raw_id in document_ids:
        cleaned = str(raw_id).strip()
        if cleaned:
            wanted.add(cleaned)
    if not wanted:
        return

    index = self._load_index()
    stamp = datetime.now(UTC).isoformat()
    touched = 0
    for entry in index.get("documents", []):
        entry_id = str(entry.get("id") or "").strip()
        if entry_id in wanted:
            if self._normalize_ingest_status_code(entry.get("ingest_status")) != status_code:
                entry["ingest_status"] = status_code
                entry["ingest_status_updated_at"] = stamp
                touched += 1
    if touched:
        self._save_index(index)
def refresh_document_ingest_statuses(
    self,
    document_ids: list[str] | None = None,
    *,
    preserve_syncing: bool = True,
) -> None:
    """Reconcile ingest statuses and save the index when anything changed.

    Args:
        document_ids: Ids to restrict the reconciliation to; ``None`` means all.
        preserve_syncing: Forwarded to ``_reconcile_document_ingest_statuses``.
    """
    self.ensure_library_ready()
    index = self._load_index()
    dirty = self._reconcile_document_ingest_statuses(
        index,
        document_ids=document_ids,
        preserve_syncing=preserve_syncing,
    )
    if not dirty:
        return
    self._save_index(index)
def get_llm_wiki_root(self) -> Path:
    """Return the LLM wiki directory after making sure the library exists."""
    self.ensure_library_ready()
    wiki_root = self.llm_wiki_root
    return wiki_root
def extract_document_text(self, document_id: str) -> str:
    """Return the extracted text of a stored document.

    Raises:
        FileNotFoundError: If the id is unknown or the stored file is missing.
    """
    self.ensure_library_ready()
    entry = self.get_document_entry(document_id)
    stored_file = self._resolve_document_path(entry)
    if not stored_file.exists():
        raise FileNotFoundError(entry["original_name"])

    display_name = str(entry.get("original_name") or stored_file.name)
    content_type = str(entry.get("mime_type") or "application/octet-stream")
    return self._extract_document_text_from_path(
        file_path=stored_file,
        original_name=display_name,
        mime_type=content_type,
    )
def build_onlyoffice_config(
    self,
    document_id: str,
    current_user: CurrentUserContext,
) -> KnowledgeOnlyOfficeConfigRead:
    """Build the signed, view-only ONLYOFFICE editor configuration for a document.

    Args:
        document_id: Id of the document to open.
        current_user: Viewer; ``username`` and ``name`` go into the editor's user block.

    Returns:
        The public document-server URL plus the config dict, whose ``token`` key
        holds an HS256 JWT of the config itself.

    Raises:
        ValueError: If ONLYOFFICE is disabled, its URLs or JWT secret are not
            configured, or the file extension is not supported.
        FileNotFoundError: Propagated from ``_require_entry`` when the id is unknown.
    """
    self.ensure_library_ready()
    settings = get_settings()
    office = resolve_onlyoffice_settings()

    def warn(template: str) -> None:
        # Every rejection logs the same five diagnostics; only the template differs.
        logger.warning(
            template,
            document_id,
            office.enabled,
            office.public_url,
            office.backend_url,
            bool(office.jwt_secret),
        )

    if not office.enabled:
        warn("ONLYOFFICE disabled in runtime config doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s")
        raise ValueError("ONLYOFFICE 预览未启用。")
    if not (office.public_url and office.backend_url):
        warn("ONLYOFFICE config incomplete doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s")
        raise ValueError("ONLYOFFICE 地址配置不完整。")
    if not office.jwt_secret:
        warn("ONLYOFFICE JWT missing doc=%s enabled=%s public_url=%s backend_url=%s jwt_set=%s")
        raise ValueError("ONLYOFFICE JWT 密钥未配置。")

    entry = self._require_entry(self._load_index(), document_id)
    extension = self._extract_extension(entry["original_name"])
    if extension not in ONLYOFFICE_EDITABLE_EXTENSIONS:
        raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
    document_type = self._resolve_onlyoffice_document_type(extension)

    backend_root = office.backend_url.rstrip("/")
    server_url = office.public_url.rstrip("/")
    access_token = self._build_onlyoffice_access_token(document_id)
    endpoint = f"{backend_root}{settings.api_v1_prefix}/knowledge/documents/{document_id}/onlyoffice"
    content_url = f"{endpoint}/content?access_token={access_token}"
    callback_url = f"{endpoint}/callback"

    # Key order is kept stable because the whole dict is serialized into the JWT.
    document_section: dict[str, Any] = {
        "fileType": extension,
        "key": self._build_onlyoffice_document_key(entry),
        "title": entry["original_name"],
        "url": content_url,
        "permissions": {
            "download": True,
            "edit": False,
            "print": True,
            "copy": True,
        },
    }
    editor_section: dict[str, Any] = {
        "mode": "view",
        "lang": "zh-CN",
        "callbackUrl": callback_url,
        "user": {
            "id": current_user.username,
            "name": current_user.name,
        },
        "customization": {
            "compactHeader": True,
            "compactToolbar": True,
            "toolbarNoTabs": False,
            "autosave": False,
            "forcesave": False,
        },
    }
    config: dict[str, Any] = {
        "documentType": document_type,
        "document": document_section,
        "editorConfig": editor_section,
        "width": "100%",
        "height": "100%",
    }
    config["token"] = jwt.encode(config, office.jwt_secret, algorithm="HS256")
    return KnowledgeOnlyOfficeConfigRead(documentServerUrl=server_url, config=config)
def validate_onlyoffice_access_token(self, document_id: str, access_token: str) -> None:
    """Check that ``access_token`` is a content token issued for ``document_id``.

    Args:
        document_id: Id the token must be bound to.
        access_token: HS256 JWT produced by ``_build_onlyoffice_access_token``.

    Raises:
        ValueError: If no JWT secret is configured, the token cannot be decoded
            or verified, or its ``scope`` / ``document_id`` claims do not match.
    """
    secret = resolve_onlyoffice_settings().jwt_secret
    # build_onlyoffice_config refuses to issue tokens without a secret, so no
    # token can be legitimate in that state; never verify against a missing key.
    if not secret:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。")
    try:
        payload = jwt.decode(access_token, secret, algorithms=["HS256"])
    except jwt.PyJWTError as exc:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc
    scope_ok = payload.get("scope") == "onlyoffice-content"
    document_ok = payload.get("document_id") == document_id
    if not (scope_ok and document_ok):
        raise ValueError("ONLYOFFICE 文件访问令牌无效。")
def handle_onlyoffice_callback(self, document_id: str, payload: dict[str, Any]) -> None:
    """Download the document named in an ONLYOFFICE callback and store it as a new version.

    Only callbacks with status 2 or 6 and a non-empty download URL are acted on;
    anything else returns without side effects.

    Args:
        document_id: Id of the document the callback refers to.
        payload: Decoded callback body.

    Raises:
        ValueError: If the download URL is not an ``http``/``https`` URL.
    """
    from urllib.parse import urlsplit

    self.ensure_library_ready()
    callback = self._parse_onlyoffice_callback(payload)
    if callback.status not in {2, 6} or not callback.download_url:
        return
    # The URL comes from the request body. urlopen() also handles schemes such
    # as file: and ftp:, so restrict it to HTTP(S) before fetching anything.
    scheme = urlsplit(callback.download_url).scheme.lower()
    if scheme not in {"http", "https"}:
        raise ValueError("ONLYOFFICE 回调下载地址无效。")
    logger.info(
        "ONLYOFFICE callback received id=%s status=%s users=%s",
        document_id,
        callback.status,
        ",".join(callback.users) if callback.users else "-",
    )
    request = Request(callback.download_url, headers={"User-Agent": "x-financial-onlyoffice"})
    with urlopen(request, timeout=30) as response:  # noqa: S310
        content = response.read()
    actor_name = callback.users[0] if callback.users else "ONLYOFFICE"
    self._replace_document_content(document_id, content, actor_name=actor_name)
def _load_documents(self) -> list[KnowledgeDocumentRead]:
    """Return every document serialized for the API, newest ``time`` first.

    The index is reconciled against disk and against ingest statuses, and saved
    when either pass changed it.
    """
    self.ensure_library_ready()
    index = self._load_index()
    index_changed = self._reconcile_index(index)
    status_changed = self._reconcile_document_ingest_statuses(index)
    if index_changed or status_changed:
        self._save_index(index)

    serialized = [self._serialize_document(entry) for entry in index["documents"]]
    serialized.sort(key=lambda item: item.time, reverse=True)
    return serialized
def _serialize_document(self, entry: dict[str, Any]) -> KnowledgeDocumentRead:
    """Convert a raw index entry into the API read model."""
    extension = entry.get("extension") or self._extract_extension(entry["original_name"])
    file_type = self._resolve_file_type(extension)
    size_bytes = int(entry.get("size_bytes") or 0)
    updated_at = self._format_time(entry.get("updated_at") or entry.get("created_at"))
    state_code = self._normalize_ingest_status_code(entry.get("ingest_status"))
    # Codes without metadata fall back to the PUBLISHED label and tone.
    state_meta = KNOWLEDGE_INGEST_STATUS_META.get(
        state_code,
        KNOWLEDGE_INGEST_STATUS_META[KNOWLEDGE_INGEST_STATUS_PUBLISHED],
    )
    format_label = extension.upper() or "FILE"

    fields: dict[str, Any] = {
        "id": entry["id"],
        "name": entry["original_name"],
        "folder": entry["folder"],
        "tag": f"{entry['folder']} / {format_label}",
        "time": updated_at,
        "version": f"v{int(entry.get('version_number', 1))}.0",
        "stateCode": state_code,
        "state": state_meta[0],
        "stateTone": state_meta[1],
        "owner": entry.get("uploaded_by") or "系统导入",
        "icon": ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
        "fileType": file_type,
        "fileTypeLabel": self._resolve_file_type_label(file_type),
        "summary": f"{entry['folder']} · {format_label} · {self._format_size(size_bytes)}",
        "mimeType": entry.get("mime_type") or "application/octet-stream",
        "extension": extension,
        "sizeBytes": size_bytes,
        "canPreview": self._can_preview(extension),
    }
    return KnowledgeDocumentRead(**fields)
def _build_preview(
    self, entry: dict[str, Any]
) -> tuple[str, list[KnowledgePreviewPageRead]]:
    """Return ``(preview_kind, pages)`` for a document, chosen by file extension.

    PDFs and images return an empty page list; text-like and docx files return
    one text page; xlsx and pptx return one page per sheet / slide; everything
    else returns a single explanatory "unsupported" page.
    """
    extension = self._extract_extension(entry["original_name"])
    file_path = self._resolve_document_path(entry)

    if extension == "pdf":
        return "pdf", []
    if extension in IMAGE_EXTENSIONS:
        return "image", []

    extracted: str | None = None
    if extension in TEXT_EXTENSIONS:
        extracted = self._read_text_preview(file_path)
    elif extension == "docx":
        extracted = self._extract_docx_text(file_path)
    if extracted is not None:
        return "text", [self._build_text_preview_page(entry, extracted)]

    if extension == "xlsx":
        return "table", self._build_xlsx_preview_pages(entry, file_path)
    if extension == "pptx":
        return "slides", self._build_pptx_preview_pages(entry, file_path)

    fallback_stats = [
        KnowledgePreviewStatRead(label="文件格式", value=extension.upper() or "FILE"),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
        KnowledgePreviewStatRead(label="建议操作", value="下载后查看"),
    ]
    fallback_block = KnowledgePreviewBlockRead(
        heading="预览说明",
        lines=[
            "当前系统已支持该文件的上传、下载和权限控制。",
            "如需在线预览,可后续接入专门的文档转换服务。",
        ],
    )
    fallback_page = KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="当前格式暂不支持在线解析预览。",
        stats=fallback_stats,
        blocks=[fallback_block],
    )
    return "unsupported", [fallback_page]
def _build_text_preview_page(
    self, entry: dict[str, Any], text: str
) -> KnowledgePreviewPageRead:
    """Build one preview page from extracted text.

    Blank lines are dropped; at most the first 24 lines are shown, split into
    blocks of up to 8 lines. The line-count stat reports all non-blank lines.
    """
    lines = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped:
            lines.append(stripped)
    if not lines:
        lines = ["文件内容为空,或当前文档未提取到可展示文本。"]

    shown = lines[:24]
    blocks = []
    for start in range(0, len(shown), 8):
        blocks.append(
            KnowledgePreviewBlockRead(
                heading=f"内容片段 {start // 8 + 1}",
                lines=shown[start : start + 8],
            )
        )

    stats = [
        KnowledgePreviewStatRead(label="文件格式", value=entry["extension"].upper() or "TEXT"),
        KnowledgePreviewStatRead(label="可见行数", value=str(len(lines))),
        KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
    ]
    return KnowledgePreviewPageRead(
        title=entry["original_name"],
        subtitle="文本提取预览",
        stats=stats,
        blocks=blocks,
    )
def _build_xlsx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per worksheet (first 8 sheets, first 12 rows each)."""
    placeholder_rows = [["未提取到表格内容。"]]
    sheets = self._extract_xlsx_sheets(file_path) or [("Sheet 1", placeholder_rows)]
    total_sheets = str(len(sheets))

    def page_for(sheet_name: str, rows: list[list[str]]) -> KnowledgePreviewPageRead:
        # Each visible row becomes its own block, headed by its 1-based position.
        visible = rows[:12] if rows else [["未提取到表格内容。"]]
        row_blocks = []
        for position, row in enumerate(visible, start=1):
            joined = " | ".join((cell or "") for cell in row)
            row_blocks.append(KnowledgePreviewBlockRead(heading=f"{position}", lines=[joined]))
        return KnowledgePreviewPageRead(
            title=sheet_name,
            subtitle="表格内容预览",
            stats=[
                KnowledgePreviewStatRead(label="工作表数量", value=total_sheets),
                KnowledgePreviewStatRead(label="预览行数", value=str(len(visible))),
                KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
            ],
            blocks=row_blocks,
        )

    return [page_for(name, rows) for name, rows in sheets[:8]]
def _build_pptx_preview_pages(
    self, entry: dict[str, Any], file_path: Path
) -> list[KnowledgePreviewPageRead]:
    """Build one preview page per slide, for at most the first 8 slides."""
    slides = self._extract_pptx_slides(file_path) or [["未提取到幻灯片文本。"]]

    def page_for(number: int, slide_lines: list[str]) -> KnowledgePreviewPageRead:
        content_block = KnowledgePreviewBlockRead(
            heading="幻灯片内容",
            lines=slide_lines or ["该页未提取到文本内容。"],
        )
        return KnowledgePreviewPageRead(
            title=entry["original_name"],
            subtitle=f"幻灯片 {number}",
            stats=[
                KnowledgePreviewStatRead(label="页码", value=str(number)),
                KnowledgePreviewStatRead(label="文本条数", value=str(len(slide_lines))),
                KnowledgePreviewStatRead(label="文件格式", value="PPTX"),
            ],
            blocks=[content_block],
        )

    return [page_for(number, lines) for number, lines in enumerate(slides[:8], start=1)]
def _load_index(self) -> dict[str, Any]:
try:
payload = json.loads(self.index_path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
payload = {"version": 1, "documents": []}
payload.setdefault("documents", [])
return payload
def _save_index(self, index: dict[str, Any]) -> None:
self.index_path.write_text(
json.dumps(index, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _reconcile_index(self, index: dict[str, Any]) -> bool:
    """Bring the in-memory index in line with the files actually on disk.

    Entries whose file is gone are dropped, and files found in a fixed folder
    that no entry points to are adopted as new entries. ``index`` is mutated in
    place; nothing is written to disk here.

    Returns:
        ``True`` when the caller should persist the index, ``False`` otherwise.
    """
    changed = False
    documents = index.setdefault("documents", [])
    # (folder, stored_name) pairs already indexed; used below to skip known files.
    known_by_stored = {
        (item["folder"], item["stored_name"]): item
        for item in documents
        if item.get("folder") and item.get("stored_name")
    }
    existing_items: list[dict[str, Any]] = []
    for item in documents:
        file_path = self._resolve_document_path(item)
        if file_path.exists():
            # Refresh derived fields from disk. These assignments alone do not
            # set ``changed``; only a normalized status or a dropped entry does.
            item["size_bytes"] = file_path.stat().st_size
            item["extension"] = self._extract_extension(item["original_name"])
            item["mime_type"] = item.get("mime_type") or (
                mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"
            )
            normalized_status = self._normalize_ingest_status_code(item.get("ingest_status"))
            if item.get("ingest_status") != normalized_status:
                item["ingest_status"] = normalized_status
                changed = True
            existing_items.append(item)
        else:
            # The file is missing: the entry is not carried over.
            changed = True
    for folder_name in FIXED_KNOWLEDGE_FOLDERS:
        folder_path = self.library_root / folder_name
        for file_path in folder_path.iterdir():
            # Skip sub-directories and dot-files.
            if not file_path.is_file() or file_path.name.startswith("."):
                continue
            key = (folder_name, file_path.name)
            if key in known_by_stored:
                continue
            # Unknown file: derive id and name from "<id>__<name>" when possible.
            document_id, original_name = self._parse_stored_name(file_path.name)
            stat = file_path.stat()
            existing_items.append(
                {
                    "id": document_id,
                    "folder": folder_name,
                    "original_name": original_name,
                    "stored_name": file_path.name,
                    "mime_type": mimetypes.guess_type(original_name)[0]
                    or "application/octet-stream",
                    "extension": self._extract_extension(original_name),
                    "size_bytes": stat.st_size,
                    "sha256": "",
                    "created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC).isoformat(),
                    "updated_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
                    "uploaded_by": "系统导入",
                    "version_number": 1,
                    "ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
                }
            )
            changed = True
    if changed or len(existing_items) != len(documents):
        index["documents"] = existing_items
        return True
    return False
def _reconcile_document_ingest_statuses(
    self,
    index: dict[str, Any],
    *,
    document_ids: list[str] | None = None,
    preserve_syncing: bool = True,
) -> bool:
    """Align each entry's ``ingest_status`` with what the LLM wiki index records.

    Args:
        index: Loaded document index; entries are mutated in place.
        document_ids: Ids to restrict the pass to; ``None`` or empty means all.
        preserve_syncing: When true, a SYNCING status that is not yet stale is
            left untouched.

    Returns:
        ``True`` if any entry's ``ingest_status`` was changed.
    """
    changed = False
    target_ids = {str(item).strip() for item in document_ids or [] if str(item).strip()}
    wiki_index = self._load_llm_wiki_index()
    # Wiki records keyed by their stripped, non-empty "document_id".
    wiki_by_document_id = {
        str(item.get("document_id") or "").strip(): item
        for item in list(wiki_index.get("documents") or [])
        if str(item.get("document_id") or "").strip()
    }
    for entry in index.get("documents", []):
        document_id = str(entry.get("id") or "").strip()
        if target_ids and document_id not in target_ids:
            continue
        # Coerce the stored value to a known status code first.
        current_status = self._normalize_ingest_status_code(entry.get("ingest_status"))
        if entry.get("ingest_status") != current_status:
            entry["ingest_status"] = current_status
            changed = True
        if (
            current_status == KNOWLEDGE_INGEST_STATUS_SYNCING
            and preserve_syncing
            and not self._is_syncing_status_stale(entry)
        ):
            continue
        desired_status = (
            KNOWLEDGE_INGEST_STATUS_INGESTED
            if self._has_ingested_llm_wiki_document(entry, wiki_by_document_id.get(document_id))
            else KNOWLEDGE_INGEST_STATUS_PUBLISHED
        )
        # FAILED is sticky: it is only replaced once the wiki shows an ingested copy.
        if current_status == KNOWLEDGE_INGEST_STATUS_FAILED and desired_status != KNOWLEDGE_INGEST_STATUS_INGESTED:
            continue
        if current_status != desired_status:
            entry["ingest_status"] = desired_status
            changed = True
    return changed
def _load_llm_wiki_index(self) -> dict[str, Any]:
try:
payload = json.loads(self.llm_wiki_index_path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
payload = {"documents": []}
payload.setdefault("documents", [])
return payload
def _has_ingested_llm_wiki_document(
self,
entry: dict[str, Any],
wiki_document: dict[str, Any] | None,
) -> bool:
if not isinstance(wiki_document, dict):
return False
if int(wiki_document.get("knowledge_candidate_count") or 0) <= 0:
return False
current_signature = self._build_llm_wiki_document_signature(entry)
wiki_signature = wiki_document.get("signature")
if isinstance(wiki_signature, dict):
return wiki_signature == current_signature
return (
str(wiki_document.get("document_id") or "").strip() == str(entry.get("id") or "").strip()
and str(wiki_document.get("checksum") or "").strip() == str(entry.get("sha256") or "").strip()
)
@staticmethod
def _build_llm_wiki_document_signature(entry: dict[str, Any]) -> dict[str, Any]:
return {
"document_id": str(entry.get("id") or ""),
"original_name": str(entry.get("original_name") or ""),
"stored_name": str(entry.get("stored_name") or ""),
"sha256": str(entry.get("sha256") or ""),
"version_number": int(entry.get("version_number") or 1),
"updated_at": str(entry.get("updated_at") or ""),
}
@staticmethod
def _normalize_ingest_status_code(value: Any) -> int:
    """Coerce a stored status value to a known ingest status code.

    Anything that cannot be converted to an integer, or that is not a key of
    ``KNOWLEDGE_INGEST_STATUS_META``, maps to ``KNOWLEDGE_INGEST_STATUS_PUBLISHED``.
    """
    try:
        status_code = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); JSON parsing accepts "Infinity".
        return KNOWLEDGE_INGEST_STATUS_PUBLISHED
    if status_code not in KNOWLEDGE_INGEST_STATUS_META:
        return KNOWLEDGE_INGEST_STATUS_PUBLISHED
    return status_code
@staticmethod
def _is_syncing_status_stale(entry: dict[str, Any]) -> bool:
raw_value = str(entry.get("ingest_status_updated_at") or "").strip()
if not raw_value:
return True
try:
updated_at = datetime.fromisoformat(raw_value)
except ValueError:
return True
if updated_at.tzinfo is None:
updated_at = updated_at.replace(tzinfo=UTC)
age_seconds = (datetime.now(UTC) - updated_at.astimezone(UTC)).total_seconds()
return age_seconds >= KNOWLEDGE_INGEST_SYNC_STALE_SECONDS
def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]:
for entry in index["documents"]:
if entry["id"] == document_id:
return entry
raise FileNotFoundError(document_id)
def _resolve_document_path(self, entry: dict[str, Any]) -> Path:
return self.library_root / entry["folder"] / entry["stored_name"]
def _replace_document_content(self, document_id: str, content: bytes, actor_name: str) -> KnowledgeDocumentDetailRead:
    """Re-upload ``content`` over an existing document on behalf of ONLYOFFICE.

    The upload reuses the entry's folder and original name, so ``upload_document``
    treats it as a new version of the same document.

    Raises:
        FileNotFoundError: Propagated from ``_require_entry`` when the id is unknown.
    """
    entry = self._require_entry(self._load_index(), document_id)
    # Synthetic actor: the display name comes from the callback, the rest is fixed.
    actor = CurrentUserContext(
        username="onlyoffice",
        name=actor_name or "ONLYOFFICE",
        role_codes=["manager"],
        is_admin=True,
    )
    target_folder = entry["folder"]
    target_name = entry["original_name"]
    return self.upload_document(
        folder=target_folder,
        filename=target_name,
        content=content,
        current_user=actor,
    )
@staticmethod
def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload:
    """Extract status, download URL, and user list from a callback body."""
    user_names: list[str] = []
    for raw_user in payload.get("users") or []:
        cleaned = str(raw_user).strip()
        if cleaned:
            user_names.append(cleaned)
    return OnlyOfficeCallbackPayload(
        status=int(payload.get("status") or 0),
        download_url=str(payload.get("url") or "").strip(),
        users=user_names,
    )
@staticmethod
def _normalize_filename(filename: str) -> str:
normalized = Path(str(filename or "").strip()).name.strip()
normalized = normalized.replace("/", "_").replace("\\", "_")
if not normalized:
raise ValueError("文件名不能为空。")
return normalized
@staticmethod
def _normalize_folder(folder: str) -> str:
    """Return the trimmed folder name if it is one of the fixed folders.

    Raises:
        ValueError: If the name is not in ``FIXED_KNOWLEDGE_FOLDERS``.
    """
    candidate = str(folder or "").strip()
    if candidate in FIXED_KNOWLEDGE_FOLDERS:
        return candidate
    raise ValueError("只能上传到预设知识库文件夹。")
@staticmethod
def _extract_extension(filename: str) -> str:
suffix = Path(filename).suffix.lower().lstrip(".")
return suffix
@staticmethod
def _build_onlyoffice_document_key(entry: dict[str, Any]) -> str:
version = int(entry.get("version_number", 1))
checksum = str(entry.get("sha256") or "")[:12]
return f"{entry['id']}-v{version}-{checksum or 'nochecksum'}"
def _build_onlyoffice_access_token(self, document_id: str) -> str:
    """Return an HS256 JWT scoped to the content of one document.

    The claims are ``scope="onlyoffice-content"`` and ``document_id``; the
    signing key is the configured ONLYOFFICE JWT secret.
    """
    secret = resolve_onlyoffice_settings().jwt_secret
    claims = dict(scope="onlyoffice-content", document_id=document_id)
    return jwt.encode(claims, secret, algorithm="HS256")
@staticmethod
def _resolve_onlyoffice_document_type(extension: str) -> str:
    """Map a file extension to the ONLYOFFICE ``documentType`` value.

    Raises:
        ValueError: If the extension is not a Word, Excel, or PowerPoint one.
    """
    families = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "cell"),
        (PPT_EXTENSIONS, "slide"),
    )
    for members, document_type in families:
        if extension in members:
            return document_type
    raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
@staticmethod
def _parse_stored_name(stored_name: str) -> tuple[str, str]:
if "__" not in stored_name:
return uuid4().hex, stored_name
document_id, original_name = stored_name.split("__", 1)
return document_id or uuid4().hex, original_name or stored_name
@staticmethod
def _format_time(value: str | None) -> str:
if not value:
return ""
try:
parsed = datetime.fromisoformat(value)
except ValueError:
return value
return parsed.astimezone(UTC).strftime("%Y-%m-%d %H:%M")
@staticmethod
def _format_size(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} B"
if size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes / (1024 * 1024):.1f} MB"
@staticmethod
def _resolve_file_type(extension: str) -> str:
    """Classify an extension into one of the file-type keys of ``ICON_BY_TYPE``.

    Families are tested in a fixed order, so an extension listed in more than
    one set (``csv``) resolves to the first match; unknown ones are ``binary``.
    """
    if extension == "pdf":
        return "pdf"
    ordered_families = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "excel"),
        (PPT_EXTENSIONS, "ppt"),
        (IMAGE_EXTENSIONS, "image"),
        (TEXT_EXTENSIONS, "text"),
        (ARCHIVE_EXTENSIONS, "archive"),
    )
    for members, file_type in ordered_families:
        if extension in members:
            return file_type
    return "binary"
@staticmethod
def _resolve_file_type_label(file_type: str) -> str:
mapping = {
"pdf": "PDF 预览",
"word": "Word 预览",
"excel": "Excel 预览",
"ppt": "PPT 预览",
"image": "图片预览",
"text": "文本预览",
"archive": "压缩包",
"binary": "文件预览",
}
return mapping.get(file_type, "文件预览")
@staticmethod
def _can_preview(extension: str) -> bool:
    """Tell whether the extension has an inline or structured preview."""
    previewable = INLINE_PREVIEW_EXTENSIONS | STRUCTURED_PREVIEW_EXTENSIONS
    return extension in previewable
@staticmethod
def _read_text_preview(file_path: Path) -> str:
encodings = ("utf-8", "utf-8-sig", "gbk")
for encoding in encodings:
try:
return file_path.read_text(encoding=encoding)
except UnicodeDecodeError:
continue
return "当前文本文件编码暂不支持在线解析。"
@staticmethod
def _extract_docx_text(file_path: Path) -> str:
try:
with ZipFile(file_path) as archive:
xml_content = archive.read("word/document.xml")
except (BadZipFile, KeyError):
return "当前 Word 文件解析失败。"
root = ElementTree.fromstring(xml_content)
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
return "\n".join(texts)
def _extract_document_text_from_path(
    self,
    *,
    file_path: Path,
    original_name: str,
    mime_type: str,
) -> str:
    """Extract normalized plain text from a file, chosen by its extension.

    Text-like files and docx are read directly. PDFs use ``pdftotext`` and fall
    back to OCR when that yields nothing. Images go straight to OCR. Any other
    extension yields an empty string.
    """
    extension = self._extract_extension(original_name)
    if extension in TEXT_EXTENSIONS:
        raw_text = self._read_text_preview(file_path)
        return self._normalize_extracted_text(raw_text)
    if extension == "docx":
        raw_text = self._extract_docx_text(file_path)
        return self._normalize_extracted_text(raw_text)

    needs_ocr = extension in IMAGE_EXTENSIONS
    if extension == "pdf":
        embedded = self._normalize_extracted_text(self._extract_pdf_text(file_path))
        if embedded:
            return embedded
        # No embedded text layer was extracted: fall through to OCR.
        needs_ocr = True
    if not needs_ocr:
        return ""

    recognized = self._extract_text_with_ocr(
        file_path=file_path,
        original_name=original_name,
        mime_type=mime_type,
    )
    return self._normalize_extracted_text(recognized)
@staticmethod
def _normalize_extracted_text(text: str) -> str:
normalized = str(text or "").replace("\r\n", "\n").replace("\r", "\n")
normalized = re.sub(r"\n{3,}", "\n\n", normalized)
return normalized.strip()
@staticmethod
def _extract_pdf_text(file_path: Path) -> str:
pdftotext_bin = shutil.which("pdftotext")
if not pdftotext_bin:
return ""
completed = subprocess.run(
[pdftotext_bin, str(file_path), "-"],
capture_output=True,
text=True,
timeout=40,
check=False,
)
if completed.returncode != 0:
return ""
return str(completed.stdout or "")
@staticmethod
def _extract_text_with_ocr(
    *,
    file_path: Path,
    original_name: str,
    mime_type: str,
) -> str:
    """Run the file through the OCR service and join the recognized text.

    For each recognized document the ``text`` attribute is used, or ``summary``
    when there is no text. OCR stays best effort: any failure to import or run
    the service yields an empty string, but the failure is now logged instead
    of being discarded silently.
    """
    try:
        # Imported lazily, inside the guarded section, so an unavailable OCR
        # service is handled like any other OCR failure.
        from app.services.ocr import OcrService

        result = OcrService().recognize_files(
            [(original_name, file_path.read_bytes(), mime_type)]
        )
    except Exception:
        logger.warning("Knowledge OCR extraction failed filename=%s", original_name, exc_info=True)
        return ""
    parts: list[str] = []
    for document in result.documents:
        text = str(getattr(document, "text", "") or "").strip()
        summary = str(getattr(document, "summary", "") or "").strip()
        if text:
            parts.append(text)
        elif summary:
            parts.append(summary)
    return "\n\n".join(parts)
@staticmethod
def _extract_xlsx_sheets(file_path: Path) -> list[tuple[str, list[list[str]]]]:
    """Read worksheet cell values straight from the XML parts of an ``.xlsx`` archive.

    Returns:
        ``(sheet_name, rows)`` pairs, where each row is the list of cell strings
        found in one ``row`` element (rows without any cell are omitted). An
        empty list is returned when the archive has no ``sheetN.xml`` part or
        when it cannot be opened or parsed.
    """
    try:
        with ZipFile(file_path) as archive:
            # Shared string table: cells of type "s" hold an index into it.
            shared_strings: list[str] = []
            if "xl/sharedStrings.xml" in archive.namelist():
                shared_root = ElementTree.fromstring(archive.read("xl/sharedStrings.xml"))
                shared_strings = [
                    "".join(node.itertext()).strip()
                    for node in shared_root.iter()
                    if node.tag.endswith("}si")
                ]
            # NOTE(review): plain string sort, so "sheet10.xml" sorts before
            # "sheet2.xml"; this order is only used by the fallback below.
            sheet_files = sorted(
                name
                for name in archive.namelist()
                if re.fullmatch(r"xl/worksheets/sheet\d+\.xml", name)
            )
            if not sheet_files:
                return []
            # Relationship id -> archive member path, from the workbook rels part.
            relationship_targets: dict[str, str] = {}
            if "xl/_rels/workbook.xml.rels" in archive.namelist():
                rel_root = ElementTree.fromstring(archive.read("xl/_rels/workbook.xml.rels"))
                for node in rel_root.iter():
                    if not node.tag.endswith("Relationship"):
                        continue
                    rel_id = node.attrib.get("Id")
                    target = node.attrib.get("Target")
                    if not rel_id or not target:
                        continue
                    # Targets not already rooted at "xl/" get that prefix added.
                    normalized = target.lstrip("/")
                    if not normalized.startswith("xl/"):
                        normalized = f"xl/{normalized.lstrip('./')}"
                    relationship_targets[rel_id] = normalized
            # Sheets in workbook order, each paired with its resolved member path.
            ordered_sheets: list[tuple[str, str]] = []
            if "xl/workbook.xml" in archive.namelist():
                workbook_root = ElementTree.fromstring(archive.read("xl/workbook.xml"))
                # ``index`` counts every element of the workbook tree, not only
                # sheets, so the "Sheet N" fallback name is not a sheet ordinal.
                for index, node in enumerate(workbook_root.iter()):
                    if not node.tag.endswith("sheet"):
                        continue
                    sheet_name = node.attrib.get("name") or f"Sheet {index + 1}"
                    # The relationship id attribute is namespaced; match by suffix.
                    relationship_id = next(
                        (value for key, value in node.attrib.items() if key.endswith("}id")),
                        None,
                    )
                    target = relationship_targets.get(relationship_id or "")
                    if target:
                        ordered_sheets.append((sheet_name, target))
            # Fallback: no sheet could be resolved, so use the member names directly.
            if not ordered_sheets:
                ordered_sheets = [
                    (f"Sheet {index + 1}", sheet_file)
                    for index, sheet_file in enumerate(sheet_files)
                ]
            preview_sheets: list[tuple[str, list[list[str]]]] = []
            for sheet_name, target in ordered_sheets:
                if target not in archive.namelist():
                    continue
                sheet_root = ElementTree.fromstring(archive.read(target))
                rows: list[list[str]] = []
                for row in sheet_root.iter():
                    if not row.tag.endswith("}row"):
                        continue
                    row_values: list[str] = []
                    for cell in row:
                        if not cell.tag.endswith("}c"):
                            continue
                        cell_type = cell.attrib.get("t")
                        value_node = next((item for item in cell if item.tag.endswith("}v")), None)
                        # Inline strings keep their text in a nested "t" element.
                        if cell_type == "inlineStr":
                            text_node = next((item for item in cell.iter() if item.tag.endswith("}t")), None)
                            row_values.append((text_node.text or "").strip() if text_node is not None else "")
                            continue
                        if value_node is None or value_node.text is None:
                            row_values.append("")
                            continue
                        raw_value = value_node.text.strip()
                        # Shared-string cell: resolve the index; keep the raw
                        # value when the index is out of range.
                        if cell_type == "s" and raw_value.isdigit():
                            index = int(raw_value)
                            row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
                        else:
                            row_values.append(raw_value)
                    if row_values:
                        rows.append(row_values)
                preview_sheets.append((sheet_name, rows))
            return preview_sheets
    except (BadZipFile, ElementTree.ParseError, KeyError, ValueError):
        return []
@staticmethod
def _extract_pptx_slides(file_path: Path) -> list[list[str]]:
try:
with ZipFile(file_path) as archive:
slide_names = sorted(
name
for name in archive.namelist()
if re.fullmatch(r"ppt/slides/slide\d+\.xml", name)
)
slides: list[list[str]] = []
for slide_name in slide_names:
root = ElementTree.fromstring(archive.read(slide_name))
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
slides.append(texts)
return slides
except (BadZipFile, ElementTree.ParseError, KeyError):
return []