feat: 完善系统配置、安全增强与知识库功能
- .env.example: API基础路径改为相对路径 /api/v1,支持代理转发 - README.md: 完善项目结构与启动说明文档 - docker-compose.yml: 新增Docker编排配置,支持容器化部署 - docker/: 新增Docker部署相关文档与配置 - server_start.sh: 重构启动脚本,添加容器环境检测、隔离虚拟环境路径、环境变量覆盖机制 - deps.py: 完善API依赖注入,增强权限验证逻辑 - admin_secret.py: 优化管理员密钥加密存储与验证 - config.py: 扩展配置管理,支持多环境变量绑定 - security.py: 增强安全模块,完善加密与认证机制 - db/base.py: 优化数据库基础架构与连接管理 - main.py: 更新应用入口,整合新模块路由 - models/: 完善系统模型配置,支持模型设置持久化 - repositories/settings.py: 优化设置仓储层,增强数据持久化 - services/settings.py: 重构设置服务,精简代码结构 - router.py: 更新API路由配置 - endpoints/knowledge.py: 新增知识库API端点 - schemas/knowledge.py: 新增知识库数据模型 - services/knowledge.py: 新增知识库业务逻辑 - storage/knowledge/.index.json: 知识库索引存储 - api.js: 完善API服务层,增强错误处理 - bootstrap.js: 优化前端初始化与引导流程 - useSetupView.js / useSystemState.js: 重构组合式函数 - TopBar.vue: 优化顶部导航栏组件 - SettingsView.vue: 重构设置页面UI,增强用户体验 - SetupView.vue / SetupRouteView.vue: 完善引导流程页面 - PoliciesView.vue: 优化策略视图组件 - vite.config.js: 更新Vite构建配置 - web_start.sh: 完善前端启动脚本 - views/scripts/: 优化各业务视图JS逻辑 - settings-view.css: 重构设置页面样式 - setup-view.css: 完善引导页样式 - policies-view.css: 优化策略页样式 - test_auth_service.py: 完善认证服务测试 - test_settings_persistence.py: 增强设置持久化测试 - document/: 新增开发文档与工作日志
This commit is contained in:
634
server/src/app/services/knowledge.py
Normal file
634
server/src/app/services/knowledge.py
Normal file
@@ -0,0 +1,634 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import mimetypes
|
||||
import re
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
from xml.etree import ElementTree
|
||||
from zipfile import BadZipFile, ZipFile
|
||||
|
||||
from app.api.deps import CurrentUserContext
|
||||
from app.core.config import get_settings
|
||||
from app.core.logging import get_logger
|
||||
from app.schemas.knowledge import (
|
||||
KnowledgeDocumentDetailRead,
|
||||
KnowledgeDocumentRead,
|
||||
KnowledgeFolderRead,
|
||||
KnowledgeLibraryRead,
|
||||
KnowledgePreviewBlockRead,
|
||||
KnowledgePreviewPageRead,
|
||||
KnowledgePreviewStatRead,
|
||||
)
|
||||
|
||||
logger = get_logger("app.services.knowledge")
|
||||
|
||||
FIXED_KNOWLEDGE_FOLDERS = [
|
||||
"财务知识库",
|
||||
"制度政策",
|
||||
"报销制度",
|
||||
"差旅规范",
|
||||
"发票管理",
|
||||
"税务合规",
|
||||
"预算管理",
|
||||
"财务共享",
|
||||
"培训资料",
|
||||
"常见问答",
|
||||
]
|
||||
|
||||
ICON_BY_TYPE = {
|
||||
"pdf": "mdi mdi-file-document-outline-pdf pdf",
|
||||
"word": "mdi mdi-file-document-outline-word word",
|
||||
"excel": "mdi mdi-file-document-outline-excel excel",
|
||||
"ppt": "mdi mdi-file-powerpoint-box ppt",
|
||||
"image": "mdi mdi-file-image-outline image",
|
||||
"text": "mdi mdi-file-document-outline text",
|
||||
"archive": "mdi mdi-folder-zip-outline archive",
|
||||
"binary": "mdi mdi-file-outline",
|
||||
}
|
||||
|
||||
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "xml", "yml", "yaml", "log"}
|
||||
WORD_EXTENSIONS = {"doc", "docx"}
|
||||
EXCEL_EXTENSIONS = {"xls", "xlsx", "csv"}
|
||||
PPT_EXTENSIONS = {"ppt", "pptx"}
|
||||
IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
|
||||
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}
|
||||
STRUCTURED_PREVIEW_EXTENSIONS = {"docx", "xlsx", "pptx"} | TEXT_EXTENSIONS
|
||||
INLINE_PREVIEW_EXTENSIONS = {"pdf"} | IMAGE_EXTENSIONS
|
||||
|
||||
|
||||
def prepare_knowledge_library() -> None:
|
||||
KnowledgeService().ensure_library_ready()
|
||||
|
||||
|
||||
class KnowledgeService:
|
||||
def __init__(self, storage_root: Path | None = None) -> None:
|
||||
settings = get_settings()
|
||||
self.storage_root = Path(storage_root or settings.resolved_storage_root_dir)
|
||||
self.library_root = self.storage_root / "knowledge"
|
||||
self.index_path = self.library_root / ".index.json"
|
||||
|
||||
def ensure_library_ready(self) -> None:
|
||||
self.library_root.mkdir(parents=True, exist_ok=True)
|
||||
for folder_name in FIXED_KNOWLEDGE_FOLDERS:
|
||||
(self.library_root / folder_name).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not self.index_path.exists():
|
||||
self._save_index({"version": 1, "documents": []})
|
||||
|
||||
index = self._load_index()
|
||||
if self._reconcile_index(index):
|
||||
self._save_index(index)
|
||||
|
||||
def list_library(self) -> KnowledgeLibraryRead:
|
||||
documents = self._load_documents()
|
||||
folders = [
|
||||
KnowledgeFolderRead(
|
||||
name=folder_name,
|
||||
count=sum(1 for item in documents if item.folder == folder_name),
|
||||
icon="mdi mdi-folder-open" if folder_name == "差旅规范" else "mdi mdi-folder",
|
||||
)
|
||||
for folder_name in FIXED_KNOWLEDGE_FOLDERS
|
||||
]
|
||||
return KnowledgeLibraryRead(folders=folders, documents=documents)
|
||||
|
||||
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
|
||||
self.ensure_library_ready()
|
||||
index = self._load_index()
|
||||
entry = self._require_entry(index, document_id)
|
||||
preview_kind, preview_pages = self._build_preview(entry)
|
||||
document = self._serialize_document(entry)
|
||||
return KnowledgeDocumentDetailRead(
|
||||
**document.model_dump(),
|
||||
previewKind=preview_kind,
|
||||
previewPages=preview_pages,
|
||||
)
|
||||
|
||||
def upload_document(
|
||||
self,
|
||||
folder: str,
|
||||
filename: str,
|
||||
content: bytes,
|
||||
current_user: CurrentUserContext,
|
||||
) -> KnowledgeDocumentDetailRead:
|
||||
self.ensure_library_ready()
|
||||
normalized_folder = self._normalize_folder(folder)
|
||||
normalized_name = self._normalize_filename(filename)
|
||||
|
||||
if not content:
|
||||
raise ValueError("上传文件不能为空。")
|
||||
|
||||
index = self._load_index()
|
||||
existing_entry = next(
|
||||
(
|
||||
item
|
||||
for item in index["documents"]
|
||||
if item["folder"] == normalized_folder
|
||||
and item["original_name"].lower() == normalized_name.lower()
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
document_id = existing_entry["id"] if existing_entry else uuid4().hex
|
||||
stored_name = f"{document_id}__{normalized_name}"
|
||||
target_path = self.library_root / normalized_folder / stored_name
|
||||
|
||||
if existing_entry is not None and existing_entry["stored_name"] != stored_name:
|
||||
old_path = self.library_root / existing_entry["folder"] / existing_entry["stored_name"]
|
||||
if old_path.exists():
|
||||
old_path.unlink()
|
||||
|
||||
target_path.write_bytes(content)
|
||||
|
||||
now = datetime.now(UTC).isoformat()
|
||||
mime_type = mimetypes.guess_type(normalized_name)[0] or "application/octet-stream"
|
||||
checksum = hashlib.sha256(content).hexdigest()
|
||||
extension = self._extract_extension(normalized_name)
|
||||
|
||||
if existing_entry is None:
|
||||
entry = {
|
||||
"id": document_id,
|
||||
"folder": normalized_folder,
|
||||
"original_name": normalized_name,
|
||||
"stored_name": stored_name,
|
||||
"mime_type": mime_type,
|
||||
"extension": extension,
|
||||
"size_bytes": len(content),
|
||||
"sha256": checksum,
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"uploaded_by": current_user.name,
|
||||
"version_number": 1,
|
||||
}
|
||||
index["documents"].append(entry)
|
||||
logger.info(
|
||||
"Knowledge document uploaded id=%s folder=%s filename=%s by=%s",
|
||||
document_id,
|
||||
normalized_folder,
|
||||
normalized_name,
|
||||
current_user.name,
|
||||
)
|
||||
else:
|
||||
existing_entry.update(
|
||||
{
|
||||
"stored_name": stored_name,
|
||||
"mime_type": mime_type,
|
||||
"extension": extension,
|
||||
"size_bytes": len(content),
|
||||
"sha256": checksum,
|
||||
"updated_at": now,
|
||||
"uploaded_by": current_user.name,
|
||||
"version_number": int(existing_entry.get("version_number", 1)) + 1,
|
||||
}
|
||||
)
|
||||
entry = existing_entry
|
||||
logger.info(
|
||||
"Knowledge document updated id=%s folder=%s filename=%s by=%s",
|
||||
document_id,
|
||||
normalized_folder,
|
||||
normalized_name,
|
||||
current_user.name,
|
||||
)
|
||||
|
||||
self._save_index(index)
|
||||
return self.get_document_detail(document_id)
|
||||
|
||||
def delete_document(self, document_id: str) -> None:
|
||||
self.ensure_library_ready()
|
||||
index = self._load_index()
|
||||
entry = self._require_entry(index, document_id)
|
||||
file_path = self._resolve_document_path(entry)
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
|
||||
index["documents"] = [item for item in index["documents"] if item["id"] != document_id]
|
||||
self._save_index(index)
|
||||
logger.info("Knowledge document deleted id=%s filename=%s", document_id, entry["original_name"])
|
||||
|
||||
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
|
||||
self.ensure_library_ready()
|
||||
index = self._load_index()
|
||||
entry = self._require_entry(index, document_id)
|
||||
file_path = self._resolve_document_path(entry)
|
||||
|
||||
if not file_path.exists():
|
||||
raise FileNotFoundError(entry["original_name"])
|
||||
|
||||
return file_path, entry["mime_type"], entry["original_name"]
|
||||
|
||||
def _load_documents(self) -> list[KnowledgeDocumentRead]:
|
||||
self.ensure_library_ready()
|
||||
index = self._load_index()
|
||||
self._reconcile_index(index)
|
||||
self._save_index(index)
|
||||
|
||||
documents = [self._serialize_document(entry) for entry in index["documents"]]
|
||||
return sorted(documents, key=lambda item: item.time, reverse=True)
|
||||
|
||||
def _serialize_document(self, entry: dict[str, Any]) -> KnowledgeDocumentRead:
|
||||
extension = entry.get("extension") or self._extract_extension(entry["original_name"])
|
||||
file_type = self._resolve_file_type(extension)
|
||||
size_bytes = int(entry.get("size_bytes") or 0)
|
||||
updated_at = self._format_time(entry.get("updated_at") or entry.get("created_at"))
|
||||
|
||||
return KnowledgeDocumentRead(
|
||||
id=entry["id"],
|
||||
name=entry["original_name"],
|
||||
folder=entry["folder"],
|
||||
tag=f"{entry['folder']} / {extension.upper() or 'FILE'}",
|
||||
time=updated_at,
|
||||
version=f"v{int(entry.get('version_number', 1))}.0",
|
||||
state="已发布",
|
||||
stateTone="success",
|
||||
owner=entry.get("uploaded_by") or "系统导入",
|
||||
icon=ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
|
||||
fileType=file_type,
|
||||
fileTypeLabel=self._resolve_file_type_label(file_type),
|
||||
summary=f"{entry['folder']} · {extension.upper() or 'FILE'} · {self._format_size(size_bytes)}",
|
||||
mimeType=entry.get("mime_type") or "application/octet-stream",
|
||||
extension=extension,
|
||||
sizeBytes=size_bytes,
|
||||
canPreview=self._can_preview(extension),
|
||||
)
|
||||
|
||||
def _build_preview(
|
||||
self, entry: dict[str, Any]
|
||||
) -> tuple[str, list[KnowledgePreviewPageRead]]:
|
||||
extension = self._extract_extension(entry["original_name"])
|
||||
file_path = self._resolve_document_path(entry)
|
||||
|
||||
if extension == "pdf":
|
||||
return "pdf", []
|
||||
|
||||
if extension in IMAGE_EXTENSIONS:
|
||||
return "image", []
|
||||
|
||||
if extension in TEXT_EXTENSIONS:
|
||||
text = self._read_text_preview(file_path)
|
||||
return "text", [self._build_text_preview_page(entry, text)]
|
||||
|
||||
if extension == "docx":
|
||||
text = self._extract_docx_text(file_path)
|
||||
return "text", [self._build_text_preview_page(entry, text)]
|
||||
|
||||
if extension == "xlsx":
|
||||
return "table", [self._build_xlsx_preview_page(entry, file_path)]
|
||||
|
||||
if extension == "pptx":
|
||||
return "slides", self._build_pptx_preview_pages(entry, file_path)
|
||||
|
||||
return (
|
||||
"unsupported",
|
||||
[
|
||||
KnowledgePreviewPageRead(
|
||||
title=entry["original_name"],
|
||||
subtitle="当前格式暂不支持在线解析预览。",
|
||||
stats=[
|
||||
KnowledgePreviewStatRead(label="文件格式", value=extension.upper() or "FILE"),
|
||||
KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
|
||||
KnowledgePreviewStatRead(label="建议操作", value="下载后查看"),
|
||||
],
|
||||
blocks=[
|
||||
KnowledgePreviewBlockRead(
|
||||
heading="预览说明",
|
||||
lines=[
|
||||
"当前系统已支持该文件的上传、下载和权限控制。",
|
||||
"如需在线预览,可后续接入专门的文档转换服务。",
|
||||
],
|
||||
)
|
||||
],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
def _build_text_preview_page(
|
||||
self, entry: dict[str, Any], text: str
|
||||
) -> KnowledgePreviewPageRead:
|
||||
lines = [line.strip() for line in text.splitlines() if line.strip()]
|
||||
if not lines:
|
||||
lines = ["文件内容为空,或当前文档未提取到可展示文本。"]
|
||||
|
||||
groups = [lines[index : index + 8] for index in range(0, min(len(lines), 24), 8)]
|
||||
blocks = [
|
||||
KnowledgePreviewBlockRead(heading=f"内容片段 {index + 1}", lines=group)
|
||||
for index, group in enumerate(groups)
|
||||
]
|
||||
|
||||
return KnowledgePreviewPageRead(
|
||||
title=entry["original_name"],
|
||||
subtitle="文本提取预览",
|
||||
stats=[
|
||||
KnowledgePreviewStatRead(label="文件格式", value=entry["extension"].upper() or "TEXT"),
|
||||
KnowledgePreviewStatRead(label="可见行数", value=str(len(lines))),
|
||||
KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
|
||||
],
|
||||
blocks=blocks,
|
||||
)
|
||||
|
||||
def _build_xlsx_preview_page(
|
||||
self, entry: dict[str, Any], file_path: Path
|
||||
) -> KnowledgePreviewPageRead:
|
||||
rows, sheet_count = self._extract_xlsx_rows(file_path)
|
||||
if not rows:
|
||||
rows = [["未提取到表格内容。"]]
|
||||
|
||||
blocks = [
|
||||
KnowledgePreviewBlockRead(
|
||||
heading=f"第 {index + 1} 行",
|
||||
lines=[" | ".join(cell for cell in row if cell) or "(空行)"],
|
||||
)
|
||||
for index, row in enumerate(rows[:12])
|
||||
]
|
||||
|
||||
return KnowledgePreviewPageRead(
|
||||
title=entry["original_name"],
|
||||
subtitle="表格内容预览",
|
||||
stats=[
|
||||
KnowledgePreviewStatRead(label="工作表数量", value=str(sheet_count)),
|
||||
KnowledgePreviewStatRead(label="预览行数", value=str(min(len(rows), 12))),
|
||||
KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
|
||||
],
|
||||
blocks=blocks,
|
||||
)
|
||||
|
||||
def _build_pptx_preview_pages(
|
||||
self, entry: dict[str, Any], file_path: Path
|
||||
) -> list[KnowledgePreviewPageRead]:
|
||||
slides = self._extract_pptx_slides(file_path)
|
||||
if not slides:
|
||||
slides = [["未提取到幻灯片文本。"]]
|
||||
|
||||
pages: list[KnowledgePreviewPageRead] = []
|
||||
for index, slide_lines in enumerate(slides[:8]):
|
||||
pages.append(
|
||||
KnowledgePreviewPageRead(
|
||||
title=entry["original_name"],
|
||||
subtitle=f"幻灯片 {index + 1}",
|
||||
stats=[
|
||||
KnowledgePreviewStatRead(label="页码", value=str(index + 1)),
|
||||
KnowledgePreviewStatRead(label="文本条数", value=str(len(slide_lines))),
|
||||
KnowledgePreviewStatRead(label="文件格式", value="PPTX"),
|
||||
],
|
||||
blocks=[
|
||||
KnowledgePreviewBlockRead(
|
||||
heading="幻灯片内容",
|
||||
lines=slide_lines or ["该页未提取到文本内容。"],
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
|
||||
return pages
|
||||
|
||||
def _load_index(self) -> dict[str, Any]:
|
||||
try:
|
||||
payload = json.loads(self.index_path.read_text(encoding="utf-8"))
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
payload = {"version": 1, "documents": []}
|
||||
payload.setdefault("documents", [])
|
||||
return payload
|
||||
|
||||
def _save_index(self, index: dict[str, Any]) -> None:
|
||||
self.index_path.write_text(
|
||||
json.dumps(index, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
def _reconcile_index(self, index: dict[str, Any]) -> bool:
|
||||
changed = False
|
||||
documents = index.setdefault("documents", [])
|
||||
known_by_stored = {
|
||||
(item["folder"], item["stored_name"]): item
|
||||
for item in documents
|
||||
if item.get("folder") and item.get("stored_name")
|
||||
}
|
||||
|
||||
existing_items: list[dict[str, Any]] = []
|
||||
for item in documents:
|
||||
file_path = self._resolve_document_path(item)
|
||||
if file_path.exists():
|
||||
item["size_bytes"] = file_path.stat().st_size
|
||||
item["extension"] = self._extract_extension(item["original_name"])
|
||||
item["mime_type"] = item.get("mime_type") or (
|
||||
mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"
|
||||
)
|
||||
existing_items.append(item)
|
||||
else:
|
||||
changed = True
|
||||
|
||||
for folder_name in FIXED_KNOWLEDGE_FOLDERS:
|
||||
folder_path = self.library_root / folder_name
|
||||
for file_path in folder_path.iterdir():
|
||||
if not file_path.is_file() or file_path.name.startswith("."):
|
||||
continue
|
||||
|
||||
key = (folder_name, file_path.name)
|
||||
if key in known_by_stored:
|
||||
continue
|
||||
|
||||
document_id, original_name = self._parse_stored_name(file_path.name)
|
||||
stat = file_path.stat()
|
||||
existing_items.append(
|
||||
{
|
||||
"id": document_id,
|
||||
"folder": folder_name,
|
||||
"original_name": original_name,
|
||||
"stored_name": file_path.name,
|
||||
"mime_type": mimetypes.guess_type(original_name)[0]
|
||||
or "application/octet-stream",
|
||||
"extension": self._extract_extension(original_name),
|
||||
"size_bytes": stat.st_size,
|
||||
"sha256": "",
|
||||
"created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC).isoformat(),
|
||||
"updated_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
|
||||
"uploaded_by": "系统导入",
|
||||
"version_number": 1,
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
if changed or len(existing_items) != len(documents):
|
||||
index["documents"] = existing_items
|
||||
return True
|
||||
return False
|
||||
|
||||
def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]:
|
||||
for entry in index["documents"]:
|
||||
if entry["id"] == document_id:
|
||||
return entry
|
||||
raise FileNotFoundError(document_id)
|
||||
|
||||
def _resolve_document_path(self, entry: dict[str, Any]) -> Path:
|
||||
return self.library_root / entry["folder"] / entry["stored_name"]
|
||||
|
||||
@staticmethod
|
||||
def _normalize_filename(filename: str) -> str:
|
||||
normalized = Path(str(filename or "").strip()).name.strip()
|
||||
normalized = normalized.replace("/", "_").replace("\\", "_")
|
||||
if not normalized:
|
||||
raise ValueError("文件名不能为空。")
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def _normalize_folder(folder: str) -> str:
|
||||
normalized = str(folder or "").strip()
|
||||
if normalized not in FIXED_KNOWLEDGE_FOLDERS:
|
||||
raise ValueError("只能上传到预设知识库文件夹。")
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def _extract_extension(filename: str) -> str:
|
||||
suffix = Path(filename).suffix.lower().lstrip(".")
|
||||
return suffix
|
||||
|
||||
@staticmethod
|
||||
def _parse_stored_name(stored_name: str) -> tuple[str, str]:
|
||||
if "__" not in stored_name:
|
||||
return uuid4().hex, stored_name
|
||||
document_id, original_name = stored_name.split("__", 1)
|
||||
return document_id or uuid4().hex, original_name or stored_name
|
||||
|
||||
@staticmethod
|
||||
def _format_time(value: str | None) -> str:
|
||||
if not value:
|
||||
return ""
|
||||
try:
|
||||
parsed = datetime.fromisoformat(value)
|
||||
except ValueError:
|
||||
return value
|
||||
return parsed.astimezone(UTC).strftime("%Y-%m-%d %H:%M")
|
||||
|
||||
@staticmethod
|
||||
def _format_size(size_bytes: int) -> str:
|
||||
if size_bytes < 1024:
|
||||
return f"{size_bytes} B"
|
||||
if size_bytes < 1024 * 1024:
|
||||
return f"{size_bytes / 1024:.1f} KB"
|
||||
return f"{size_bytes / (1024 * 1024):.1f} MB"
|
||||
|
||||
@staticmethod
|
||||
def _resolve_file_type(extension: str) -> str:
|
||||
if extension == "pdf":
|
||||
return "pdf"
|
||||
if extension in WORD_EXTENSIONS:
|
||||
return "word"
|
||||
if extension in EXCEL_EXTENSIONS:
|
||||
return "excel"
|
||||
if extension in PPT_EXTENSIONS:
|
||||
return "ppt"
|
||||
if extension in IMAGE_EXTENSIONS:
|
||||
return "image"
|
||||
if extension in TEXT_EXTENSIONS:
|
||||
return "text"
|
||||
if extension in ARCHIVE_EXTENSIONS:
|
||||
return "archive"
|
||||
return "binary"
|
||||
|
||||
@staticmethod
|
||||
def _resolve_file_type_label(file_type: str) -> str:
|
||||
mapping = {
|
||||
"pdf": "PDF 预览",
|
||||
"word": "Word 预览",
|
||||
"excel": "Excel 预览",
|
||||
"ppt": "PPT 预览",
|
||||
"image": "图片预览",
|
||||
"text": "文本预览",
|
||||
"archive": "压缩包",
|
||||
"binary": "文件预览",
|
||||
}
|
||||
return mapping.get(file_type, "文件预览")
|
||||
|
||||
@staticmethod
|
||||
def _can_preview(extension: str) -> bool:
|
||||
return extension in INLINE_PREVIEW_EXTENSIONS or extension in STRUCTURED_PREVIEW_EXTENSIONS
|
||||
|
||||
@staticmethod
|
||||
def _read_text_preview(file_path: Path) -> str:
|
||||
encodings = ("utf-8", "utf-8-sig", "gbk")
|
||||
for encoding in encodings:
|
||||
try:
|
||||
return file_path.read_text(encoding=encoding)
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
return "当前文本文件编码暂不支持在线解析。"
|
||||
|
||||
@staticmethod
|
||||
def _extract_docx_text(file_path: Path) -> str:
|
||||
try:
|
||||
with ZipFile(file_path) as archive:
|
||||
xml_content = archive.read("word/document.xml")
|
||||
except (BadZipFile, KeyError):
|
||||
return "当前 Word 文件解析失败。"
|
||||
|
||||
root = ElementTree.fromstring(xml_content)
|
||||
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
|
||||
return "\n".join(texts)
|
||||
|
||||
@staticmethod
|
||||
def _extract_xlsx_rows(file_path: Path) -> tuple[list[list[str]], int]:
|
||||
try:
|
||||
with ZipFile(file_path) as archive:
|
||||
shared_strings: list[str] = []
|
||||
if "xl/sharedStrings.xml" in archive.namelist():
|
||||
shared_root = ElementTree.fromstring(archive.read("xl/sharedStrings.xml"))
|
||||
shared_strings = [
|
||||
"".join(node.itertext()).strip()
|
||||
for node in shared_root.iter()
|
||||
if node.tag.endswith("}si")
|
||||
]
|
||||
|
||||
sheet_names = sorted(
|
||||
name
|
||||
for name in archive.namelist()
|
||||
if re.fullmatch(r"xl/worksheets/sheet\d+\.xml", name)
|
||||
)
|
||||
if not sheet_names:
|
||||
return [], 0
|
||||
|
||||
first_sheet = ElementTree.fromstring(archive.read(sheet_names[0]))
|
||||
rows: list[list[str]] = []
|
||||
for row in first_sheet.iter():
|
||||
if not row.tag.endswith("}row"):
|
||||
continue
|
||||
row_values: list[str] = []
|
||||
for cell in row:
|
||||
if not cell.tag.endswith("}c"):
|
||||
continue
|
||||
cell_type = cell.attrib.get("t")
|
||||
value_node = next((item for item in cell if item.tag.endswith("}v")), None)
|
||||
if value_node is None or value_node.text is None:
|
||||
row_values.append("")
|
||||
continue
|
||||
raw_value = value_node.text.strip()
|
||||
if cell_type == "s" and raw_value.isdigit():
|
||||
index = int(raw_value)
|
||||
row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
|
||||
else:
|
||||
row_values.append(raw_value)
|
||||
if row_values:
|
||||
rows.append(row_values)
|
||||
|
||||
return rows, len(sheet_names)
|
||||
except (BadZipFile, ElementTree.ParseError, KeyError, ValueError):
|
||||
return [], 0
|
||||
|
||||
@staticmethod
|
||||
def _extract_pptx_slides(file_path: Path) -> list[list[str]]:
|
||||
try:
|
||||
with ZipFile(file_path) as archive:
|
||||
slide_names = sorted(
|
||||
name
|
||||
for name in archive.namelist()
|
||||
if re.fullmatch(r"ppt/slides/slide\d+\.xml", name)
|
||||
)
|
||||
slides: list[list[str]] = []
|
||||
for slide_name in slide_names:
|
||||
root = ElementTree.fromstring(archive.read(slide_name))
|
||||
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
|
||||
slides.append(texts)
|
||||
return slides
|
||||
except (BadZipFile, ElementTree.ParseError, KeyError):
|
||||
return []
|
||||
Reference in New Issue
Block a user