feat: 完善知识库、策略预览与OnlyOffice集成

## 配置与环境
- .env.example: 更新环境变量配置
- docker-compose.yml: 完善Docker编排配置
- docker/README.md: 更新Docker文档

## 后端知识库模块
- endpoints/knowledge.py: 增强知识库API端点
- schemas/knowledge.py: 扩展知识库数据模型
- services/knowledge.py: 完善知识库业务逻辑
- config.py: 优化配置管理
- storage/knowledge/.index.json: 更新知识库索引

## 前端功能
- api.js: 完善API服务层
- knowledge.js: 优化知识库服务
- onlyoffice.js: 新增OnlyOffice文档服务集成
- TopBar.vue: 优化顶部导航栏
- PoliciesView.vue: 完善策略视图
- AppShellRouteView.vue: 新增应用外壳路由视图
- views/scripts/PoliciesView.js: 优化策略脚本
- policiesPreviewFormatters.js: 新增策略预览格式化工具

## 样式
- policies-view.css: 完善策略页样式

## 测试
- api-request.test.mjs: API请求测试
- onlyoffice-service.test.mjs: OnlyOffice服务测试
- policies-preview-formatters.test.mjs: 策略预览格式化测试
This commit is contained in:
caoxiaozhu
2026-05-09 04:25:30 +00:00
parent 619281afc3
commit d9ffa9ce2c
21 changed files with 1469 additions and 508 deletions

View File

@@ -4,13 +4,17 @@ import hashlib
import json
import mimetypes
import re
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from uuid import uuid4
from xml.etree import ElementTree
from zipfile import BadZipFile, ZipFile
import jwt
from app.api.deps import CurrentUserContext
from app.core.config import get_settings
from app.core.logging import get_logger
@@ -19,6 +23,7 @@ from app.schemas.knowledge import (
KnowledgeDocumentRead,
KnowledgeFolderRead,
KnowledgeLibraryRead,
KnowledgeOnlyOfficeConfigRead,
KnowledgePreviewBlockRead,
KnowledgePreviewPageRead,
KnowledgePreviewStatRead,
@@ -58,6 +63,14 @@ IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}
STRUCTURED_PREVIEW_EXTENSIONS = {"docx", "xlsx", "pptx"} | TEXT_EXTENSIONS
INLINE_PREVIEW_EXTENSIONS = {"pdf"} | IMAGE_EXTENSIONS
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}
@dataclass(slots=True)
class OnlyOfficeCallbackPayload:
status: int
download_url: str
users: list[str]
def prepare_knowledge_library() -> None:
@@ -219,6 +232,114 @@ class KnowledgeService:
return file_path, entry["mime_type"], entry["original_name"]
def build_onlyoffice_config(
    self,
    document_id: str,
    current_user: CurrentUserContext,
) -> KnowledgeOnlyOfficeConfigRead:
    """Assemble the signed ONLYOFFICE editor configuration for one document.

    Args:
        document_id: Identifier of the knowledge document to open.
        current_user: Caller identity; admins and users holding the
            ``manager`` role code get edit mode, everyone else view mode.

    Returns:
        The public document-server URL together with the editor config,
        whose ``token`` key holds an HS256 JWT computed over the config.

    Raises:
        ValueError: If ONLYOFFICE is disabled, its URLs or JWT secret are
            not configured, or the file extension is not editable.
    """
    self.ensure_library_ready()
    settings = get_settings()

    # Configuration guards: fail fast before touching the index.
    if not settings.onlyoffice_enabled:
        raise ValueError("ONLYOFFICE 预览未启用。")
    if not (settings.onlyoffice_public_url and settings.onlyoffice_backend_url):
        raise ValueError("ONLYOFFICE 地址配置不完整。")
    if not settings.onlyoffice_jwt_secret:
        raise ValueError("ONLYOFFICE JWT 密钥未配置。")

    entry = self._require_entry(self._load_index(), document_id)
    file_type = self._extract_extension(entry["original_name"])
    if file_type not in ONLYOFFICE_EDITABLE_EXTENSIONS:
        raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
    editor_type = self._resolve_onlyoffice_document_type(file_type)

    backend_root = settings.onlyoffice_backend_url.rstrip("/")
    server_url = settings.onlyoffice_public_url.rstrip("/")
    content_token = self._build_onlyoffice_access_token(document_id)

    # Both backend endpoints handed to the document server share this prefix.
    endpoint_root = (
        f"{backend_root}{settings.api_v1_prefix}/knowledge/documents/{document_id}/onlyoffice"
    )
    content_url = f"{endpoint_root}/content?access_token={content_token}"
    save_callback_url = f"{endpoint_root}/callback"

    can_edit = current_user.is_admin or "manager" in current_user.role_codes

    permissions = {
        "download": True,
        "edit": can_edit,
        "print": True,
        "copy": True,
    }
    document_section = {
        "fileType": file_type,
        "key": self._build_onlyoffice_document_key(entry),
        "title": entry["original_name"],
        "url": content_url,
        "permissions": permissions,
    }
    customization = {
        "compactHeader": True,
        "compactToolbar": True,
        "toolbarNoTabs": False,
        "autosave": can_edit,
        "forcesave": can_edit,
    }
    editor_section = {
        "mode": "edit" if can_edit else "view",
        "lang": "zh-CN",
        "callbackUrl": save_callback_url,
        "user": {
            "id": current_user.username,
            "name": current_user.name,
        },
        "customization": customization,
    }
    config: dict[str, Any] = {
        "documentType": editor_type,
        "document": document_section,
        "editorConfig": editor_section,
        "width": "100%",
        "height": "100%",
    }
    # The signature is computed before the "token" key exists, so the token
    # covers every other key of the config.
    signed = jwt.encode(config, settings.onlyoffice_jwt_secret, algorithm="HS256")
    config["token"] = signed

    return KnowledgeOnlyOfficeConfigRead(documentServerUrl=server_url, config=config)
def validate_onlyoffice_access_token(self, document_id: str, access_token: str) -> None:
    """Check that *access_token* authorizes ONLYOFFICE content access to *document_id*.

    The token must be an HS256 JWT signed with the configured ONLYOFFICE
    secret whose ``scope`` claim is ``"onlyoffice-content"`` and whose
    ``document_id`` claim equals *document_id*.

    Args:
        document_id: Identifier of the document being requested.
        access_token: Token taken from the content request.

    Raises:
        ValueError: If the JWT secret is not configured, the token cannot
            be verified, or its claims do not match this document.
    """
    secret = get_settings().onlyoffice_jwt_secret
    # Mirror the guard used when signing: without a secret there is nothing
    # to verify against. A missing secret could otherwise surface as a
    # non-PyJWT error, and an empty one could let a token signed with an
    # empty key pass verification.
    if not secret:
        raise ValueError("ONLYOFFICE JWT 密钥未配置。")

    try:
        claims = jwt.decode(access_token, secret, algorithms=["HS256"])
    except jwt.PyJWTError as exc:
        raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc

    scope_ok = claims.get("scope") == "onlyoffice-content"
    document_ok = claims.get("document_id") == document_id
    if not (scope_ok and document_ok):
        raise ValueError("ONLYOFFICE 文件访问令牌无效。")
def handle_onlyoffice_callback(self, document_id: str, payload: dict[str, Any]) -> None:
    """Persist an edited document announced by an ONLYOFFICE callback.

    Callbacks whose status is not 2 or 6, or that carry no download URL,
    are ignored. Otherwise the edited file is downloaded and stored as the
    new content of *document_id*.

    Args:
        document_id: Identifier of the document the callback refers to.
        payload: Decoded JSON body of the callback request.

    Raises:
        ValueError: If the download URL does not use http or https.
    """
    # Imported locally so this method does not depend on the module's
    # top-level import block.
    from urllib.parse import urlsplit

    self.ensure_library_ready()
    callback = self._parse_onlyoffice_callback(payload)
    # Per the ONLYOFFICE callback documentation, 2 means "ready for saving"
    # and 6 means "force-saved while still being edited"; other statuses
    # carry nothing to store.
    if callback.status not in {2, 6} or not callback.download_url:
        return

    logger.info(
        "ONLYOFFICE callback received id=%s status=%s users=%s",
        document_id,
        callback.status,
        ",".join(callback.users) if callback.users else "-",
    )

    # The URL comes from the request body. urlopen also serves file:// and
    # ftp:// URLs, which would let a crafted callback copy local files into
    # the library, so only plain web schemes are accepted.
    # NOTE(review): the payload itself is not JWT-verified here — confirm
    # the endpoint authenticates the callback before calling this method.
    scheme = urlsplit(callback.download_url).scheme.lower()
    if scheme not in {"http", "https"}:
        raise ValueError("ONLYOFFICE 回调下载地址无效。")

    request = Request(callback.download_url, headers={"User-Agent": "x-financial-onlyoffice"})
    with urlopen(request, timeout=30) as response:  # noqa: S310
        content = response.read()

    actor_name = callback.users[0] if callback.users else "ONLYOFFICE"
    self._replace_document_content(document_id, content, actor_name=actor_name)
def _load_documents(self) -> list[KnowledgeDocumentRead]:
self.ensure_library_ready()
index = self._load_index()
@@ -275,7 +396,7 @@ class KnowledgeService:
return "text", [self._build_text_preview_page(entry, text)]
if extension == "xlsx":
return "table", [self._build_xlsx_preview_page(entry, file_path)]
return "table", self._build_xlsx_preview_pages(entry, file_path)
if extension == "pptx":
return "slides", self._build_pptx_preview_pages(entry, file_path)
@@ -328,31 +449,39 @@ class KnowledgeService:
blocks=blocks,
)
def _build_xlsx_preview_page(
def _build_xlsx_preview_pages(
self, entry: dict[str, Any], file_path: Path
) -> KnowledgePreviewPageRead:
rows, sheet_count = self._extract_xlsx_rows(file_path)
if not rows:
rows = [["未提取到表格内容。"]]
) -> list[KnowledgePreviewPageRead]:
sheets = self._extract_xlsx_sheets(file_path)
if not sheets:
sheets = [("Sheet 1", [["未提取到表格内容。"]])]
blocks = [
KnowledgePreviewBlockRead(
heading=f"{index + 1}",
lines=[" | ".join(cell for cell in row if cell) or "(空行)"],
preview_pages: list[KnowledgePreviewPageRead] = []
sheet_count = len(sheets)
for sheet_name, rows in sheets[:8]:
visible_rows = rows[:12] if rows else [["未提取到表格内容。"]]
blocks = [
KnowledgePreviewBlockRead(
heading=f"{index + 1}",
lines=[" | ".join((cell or "") for cell in row)],
)
for index, row in enumerate(visible_rows)
]
preview_pages.append(
KnowledgePreviewPageRead(
title=sheet_name,
subtitle="表格内容预览",
stats=[
KnowledgePreviewStatRead(label="工作表数量", value=str(sheet_count)),
KnowledgePreviewStatRead(label="预览行数", value=str(len(visible_rows))),
KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
],
blocks=blocks,
)
)
for index, row in enumerate(rows[:12])
]
return KnowledgePreviewPageRead(
title=entry["original_name"],
subtitle="表格内容预览",
stats=[
KnowledgePreviewStatRead(label="工作表数量", value=str(sheet_count)),
KnowledgePreviewStatRead(label="预览行数", value=str(min(len(rows), 12))),
KnowledgePreviewStatRead(label="文件大小", value=self._format_size(entry["size_bytes"])),
],
blocks=blocks,
)
return preview_pages
def _build_pptx_preview_pages(
self, entry: dict[str, Any], file_path: Path
@@ -464,6 +593,29 @@ class KnowledgeService:
def _resolve_document_path(self, entry: dict[str, Any]) -> Path:
return self.library_root / entry["folder"] / entry["stored_name"]
def _replace_document_content(self, document_id: str, content: bytes, actor_name: str) -> KnowledgeDocumentDetailRead:
    """Store *content* as the new body of an existing document.

    The document is re-uploaded into its current folder under its current
    original name, on behalf of a synthetic ``onlyoffice`` admin identity
    whose display name is *actor_name* (``"ONLYOFFICE"`` when blank).

    Args:
        document_id: Identifier of the document to overwrite.
        content: New file bytes.
        actor_name: Display name recorded for the synthetic uploader.

    Returns:
        Whatever ``upload_document`` returns for the re-upload.
    """
    entry = self._require_entry(self._load_index(), document_id)

    # upload_document requires a user context; the callback has none, so a
    # service identity with manager/admin rights stands in for the editor.
    service_identity = CurrentUserContext(
        username="onlyoffice",
        name=actor_name if actor_name else "ONLYOFFICE",
        role_codes=["manager"],
        is_admin=True,
    )

    target_folder = entry["folder"]
    target_name = entry["original_name"]
    return self.upload_document(
        folder=target_folder,
        filename=target_name,
        content=content,
        current_user=service_identity,
    )
@staticmethod
def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload:
    """Coerce a raw callback body into an ``OnlyOfficeCallbackPayload``.

    Missing or falsy ``status`` becomes 0, missing or falsy ``url`` becomes
    an empty string, and ``users`` entries are stringified, stripped and
    dropped when blank.
    """
    status_code = int(payload.get("status") or 0)
    source_url = str(payload.get("url") or "").strip()

    user_names: list[str] = []
    for raw_user in payload.get("users") or []:
        cleaned = str(raw_user).strip()
        if cleaned:
            user_names.append(cleaned)

    return OnlyOfficeCallbackPayload(
        status=status_code,
        download_url=source_url,
        users=user_names,
    )
@staticmethod
def _normalize_filename(filename: str) -> str:
normalized = Path(str(filename or "").strip()).name.strip()
@@ -484,6 +636,30 @@ class KnowledgeService:
suffix = Path(filename).suffix.lower().lstrip(".")
return suffix
@staticmethod
def _build_onlyoffice_document_key(entry: dict[str, Any]) -> str:
version = int(entry.get("version_number", 1))
checksum = str(entry.get("sha256") or "")[:12]
return f"{entry['id']}-v{version}-{checksum or 'nochecksum'}"
def _build_onlyoffice_access_token(self, document_id: str, ttl_seconds: int = 3600) -> str:
    """Issue a short-lived JWT that lets ONLYOFFICE download one document.

    Args:
        document_id: Identifier of the document the token is bound to.
        ttl_seconds: Lifetime of the token in seconds, counted from now.

    Returns:
        An HS256 JWT signed with the configured ONLYOFFICE secret, carrying
        the ``scope``, ``document_id`` and ``exp`` claims.
    """
    settings = get_settings()
    # The token travels in a URL query string, where it can end up in
    # access logs; an expiry keeps a leaked URL from granting permanent
    # access. jwt.decode checks "exp" by default, so validation needs no
    # extra code.
    expires_at = int(datetime.now(UTC).timestamp()) + ttl_seconds
    payload = {
        "scope": "onlyoffice-content",
        "document_id": document_id,
        "exp": expires_at,
    }
    return jwt.encode(payload, settings.onlyoffice_jwt_secret, algorithm="HS256")
@staticmethod
def _resolve_onlyoffice_document_type(extension: str) -> str:
    """Map a file extension to the ONLYOFFICE ``documentType`` value.

    Returns ``"word"``, ``"cell"`` or ``"slide"`` depending on which
    extension family contains *extension*.

    Raises:
        ValueError: If the extension belongs to none of the families.
    """
    # Checked in order: word, spreadsheet, presentation.
    families = (
        (WORD_EXTENSIONS, "word"),
        (EXCEL_EXTENSIONS, "cell"),
        (PPT_EXTENSIONS, "slide"),
    )
    for known_extensions, document_type in families:
        if extension in known_extensions:
            return document_type
    raise ValueError("当前文件格式不支持 ONLYOFFICE 预览。")
@staticmethod
def _parse_stored_name(stored_name: str) -> tuple[str, str]:
if "__" not in stored_name:
@@ -568,7 +744,7 @@ class KnowledgeService:
return "\n".join(texts)
@staticmethod
def _extract_xlsx_rows(file_path: Path) -> tuple[list[list[str]], int]:
def _extract_xlsx_sheets(file_path: Path) -> list[tuple[str, list[list[str]]]]:
try:
with ZipFile(file_path) as archive:
shared_strings: list[str] = []
@@ -580,40 +756,90 @@ class KnowledgeService:
if node.tag.endswith("}si")
]
sheet_names = sorted(
sheet_files = sorted(
name
for name in archive.namelist()
if re.fullmatch(r"xl/worksheets/sheet\d+\.xml", name)
)
if not sheet_names:
return [], 0
if not sheet_files:
return []
first_sheet = ElementTree.fromstring(archive.read(sheet_names[0]))
rows: list[list[str]] = []
for row in first_sheet.iter():
if not row.tag.endswith("}row"):
relationship_targets: dict[str, str] = {}
if "xl/_rels/workbook.xml.rels" in archive.namelist():
rel_root = ElementTree.fromstring(archive.read("xl/_rels/workbook.xml.rels"))
for node in rel_root.iter():
if not node.tag.endswith("Relationship"):
continue
rel_id = node.attrib.get("Id")
target = node.attrib.get("Target")
if not rel_id or not target:
continue
normalized = target.lstrip("/")
if not normalized.startswith("xl/"):
normalized = f"xl/{normalized.lstrip('./')}"
relationship_targets[rel_id] = normalized
ordered_sheets: list[tuple[str, str]] = []
if "xl/workbook.xml" in archive.namelist():
workbook_root = ElementTree.fromstring(archive.read("xl/workbook.xml"))
for index, node in enumerate(workbook_root.iter()):
if not node.tag.endswith("sheet"):
continue
sheet_name = node.attrib.get("name") or f"Sheet {index + 1}"
relationship_id = next(
(value for key, value in node.attrib.items() if key.endswith("}id")),
None,
)
target = relationship_targets.get(relationship_id or "")
if target:
ordered_sheets.append((sheet_name, target))
if not ordered_sheets:
ordered_sheets = [
(f"Sheet {index + 1}", sheet_file)
for index, sheet_file in enumerate(sheet_files)
]
preview_sheets: list[tuple[str, list[list[str]]]] = []
for sheet_name, target in ordered_sheets:
if target not in archive.namelist():
continue
row_values: list[str] = []
for cell in row:
if not cell.tag.endswith("}c"):
continue
cell_type = cell.attrib.get("t")
value_node = next((item for item in cell if item.tag.endswith("}v")), None)
if value_node is None or value_node.text is None:
row_values.append("")
continue
raw_value = value_node.text.strip()
if cell_type == "s" and raw_value.isdigit():
index = int(raw_value)
row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
else:
row_values.append(raw_value)
if row_values:
rows.append(row_values)
return rows, len(sheet_names)
sheet_root = ElementTree.fromstring(archive.read(target))
rows: list[list[str]] = []
for row in sheet_root.iter():
if not row.tag.endswith("}row"):
continue
row_values: list[str] = []
for cell in row:
if not cell.tag.endswith("}c"):
continue
cell_type = cell.attrib.get("t")
value_node = next((item for item in cell if item.tag.endswith("}v")), None)
if cell_type == "inlineStr":
text_node = next((item for item in cell.iter() if item.tag.endswith("}t")), None)
row_values.append((text_node.text or "").strip() if text_node is not None else "")
continue
if value_node is None or value_node.text is None:
row_values.append("")
continue
raw_value = value_node.text.strip()
if cell_type == "s" and raw_value.isdigit():
index = int(raw_value)
row_values.append(shared_strings[index] if index < len(shared_strings) else raw_value)
else:
row_values.append(raw_value)
if row_values:
rows.append(row_values)
preview_sheets.append((sheet_name, rows))
return preview_sheets
except (BadZipFile, ElementTree.ParseError, KeyError, ValueError):
return [], 0
return []
@staticmethod
def _extract_pptx_slides(file_path: Path) -> list[list[str]]: