Files
X-Financial/server/src/app/services/knowledge_file_utils.py
2026-05-22 10:42:31 +08:00

113 lines
3.5 KiB
Python

from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4
from app.services.knowledge_constants import (
ARCHIVE_EXTENSIONS,
EXCEL_EXTENSIONS,
FIXED_KNOWLEDGE_FOLDERS,
IMAGE_EXTENSIONS,
INLINE_PREVIEW_EXTENSIONS,
PPT_EXTENSIONS,
STRUCTURED_PREVIEW_EXTENSIONS,
TEXT_EXTENSIONS,
WORD_EXTENSIONS,
)
def normalize_filename(filename: str) -> str:
normalized = Path(str(filename or "").strip()).name.strip()
normalized = normalized.replace("/", "_").replace("\\", "_")
if not normalized:
raise ValueError("文件名不能为空。")
return normalized
def normalize_folder(folder: str) -> str:
normalized = str(folder or "").strip()
if normalized not in FIXED_KNOWLEDGE_FOLDERS:
raise ValueError("只能上传到预设知识库文件夹。")
return normalized
def extract_extension(filename: str) -> str:
suffix = Path(filename).suffix.lower().lstrip(".")
return suffix
def _build_onlyoffice_document_key(entry: dict[str, Any]) -> str:
version = int(entry.get("version_number", 1))
checksum = str(entry.get("sha256") or "")[:12]
return f"{entry['id']}-v{version}-{checksum or 'nochecksum'}"
def _build_onlyoffice_access_token(self, document_id: str) -> str:
onlyoffice_settings = resolve_onlyoffice_settings()
payload = {
"scope": "onlyoffice-content",
"document_id": document_id,
}
return jwt.encode(payload, onlyoffice_settings.jwt_secret, algorithm="HS256")
def _resolve_onlyoffice_document_type(extension: str) -> str:
if extension in WORD_EXTENSIONS:
return "word"
if extension in EXCEL_EXTENSIONS:
return "cell"
if extension in PPT_EXTENSIONS:
return "slide"
raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
def parse_stored_name(stored_name: str) -> tuple[str, str]:
if "__" not in stored_name:
return uuid4().hex, stored_name
document_id, original_name = stored_name.split("__", 1)
return document_id or uuid4().hex, original_name or stored_name
def format_time(value: str | None) -> str:
if not value:
return ""
try:
parsed = datetime.fromisoformat(value)
except ValueError:
return value
return parsed.astimezone(UTC).strftime("%Y-%m-%d %H:%M")
def format_size(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} B"
if size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes / (1024 * 1024):.1f} MB"
def resolve_file_type(extension: str) -> str:
if extension == "pdf":
return "pdf"
if extension in WORD_EXTENSIONS:
return "word"
if extension in EXCEL_EXTENSIONS:
return "excel"
if extension in PPT_EXTENSIONS:
return "ppt"
if extension in IMAGE_EXTENSIONS:
return "image"
if extension in TEXT_EXTENSIONS:
return "text"
if extension in ARCHIVE_EXTENSIONS:
return "archive"
return "binary"
def resolve_file_type_label(file_type: str) -> str:
mapping = {
"pdf": "PDF 预览",
"word": "Word 预览",
"excel": "Excel 预览",
"ppt": "PPT 预览",
"image": "图片预览",
"text": "文本预览",
"archive": "压缩包",
"binary": "文件预览",
}
return mapping.get(file_type, "文件预览")
def can_preview(extension: str) -> bool:
return extension in INLINE_PREVIEW_EXTENSIONS or extension in STRUCTURED_PREVIEW_EXTENSIONS