feat(server): 扩展知识库服务,添加knowledge API端点和schema定义,前端新增knowledge服务模块

This commit is contained in:
caoxiaozhu
2026-05-15 06:56:17 +00:00
parent 7209c75ad8
commit 4b1dae7ebc
38 changed files with 774 additions and 8012 deletions

View File

@@ -1,13 +1,15 @@
from __future__ import annotations
import hashlib
import json
import mimetypes
import re
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import hashlib
import json
import mimetypes
import re
import shutil
import subprocess
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from uuid import uuid4
from xml.etree import ElementTree
@@ -16,8 +18,8 @@ from zipfile import BadZipFile, ZipFile
import jwt
from app.api.deps import CurrentUserContext
from app.core.config import get_settings
from app.core.logging import get_logger
from app.core.config import get_settings
from app.core.logging import get_logger
from app.schemas.knowledge import (
KnowledgeDocumentDetailRead,
KnowledgeDocumentRead,
@@ -64,7 +66,20 @@ IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "bmp", "webp", "svg"}
ARCHIVE_EXTENSIONS = {"zip", "rar", "7z"}
STRUCTURED_PREVIEW_EXTENSIONS = {"docx", "xlsx", "pptx"} | TEXT_EXTENSIONS
INLINE_PREVIEW_EXTENSIONS = {"pdf"} | IMAGE_EXTENSIONS
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}
ONLYOFFICE_EDITABLE_EXTENSIONS = {"docx", "xlsx", "pptx"}
KNOWLEDGE_INGEST_SYNC_STALE_SECONDS = 90
KNOWLEDGE_INGEST_STATUS_PUBLISHED = 1
KNOWLEDGE_INGEST_STATUS_SYNCING = 2
KNOWLEDGE_INGEST_STATUS_INGESTED = 3
KNOWLEDGE_INGEST_STATUS_FAILED = 4
KNOWLEDGE_INGEST_STATUS_META = {
KNOWLEDGE_INGEST_STATUS_PUBLISHED: ("待归纳", "muted"),
KNOWLEDGE_INGEST_STATUS_SYNCING: ("正归纳", "warning"),
KNOWLEDGE_INGEST_STATUS_INGESTED: ("已归纳", "success"),
KNOWLEDGE_INGEST_STATUS_FAILED: ("归纳失败", "danger"),
}
@dataclass(slots=True)
@@ -78,24 +93,40 @@ def prepare_knowledge_library() -> None:
KnowledgeService().ensure_library_ready()
class KnowledgeService:
def __init__(self, storage_root: Path | None = None) -> None:
settings = get_settings()
self.storage_root = Path(storage_root or settings.resolved_storage_root_dir)
self.library_root = self.storage_root / "knowledge"
self.index_path = self.library_root / ".index.json"
def ensure_library_ready(self) -> None:
self.library_root.mkdir(parents=True, exist_ok=True)
for folder_name in FIXED_KNOWLEDGE_FOLDERS:
(self.library_root / folder_name).mkdir(parents=True, exist_ok=True)
if not self.index_path.exists():
self._save_index({"version": 1, "documents": []})
index = self._load_index()
if self._reconcile_index(index):
self._save_index(index)
class KnowledgeService:
def __init__(self, storage_root: Path | None = None) -> None:
settings = get_settings()
self.storage_root = Path(storage_root or settings.resolved_storage_root_dir)
self.library_root = self.storage_root / "knowledge"
self.index_path = self.library_root / ".index.json"
self.llm_wiki_root = self.library_root / ".llm_wiki"
self.llm_wiki_documents_root = self.llm_wiki_root / "documents"
self.llm_wiki_index_path = self.llm_wiki_root / "index.json"
self.llm_wiki_sync_runs_path = self.llm_wiki_root / "sync_runs.json"
def ensure_library_ready(self) -> None:
self.library_root.mkdir(parents=True, exist_ok=True)
for folder_name in FIXED_KNOWLEDGE_FOLDERS:
(self.library_root / folder_name).mkdir(parents=True, exist_ok=True)
self.llm_wiki_documents_root.mkdir(parents=True, exist_ok=True)
if not self.index_path.exists():
self._save_index({"version": 1, "documents": []})
if not self.llm_wiki_index_path.exists():
self.llm_wiki_index_path.write_text(
json.dumps({"documents": []}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
if not self.llm_wiki_sync_runs_path.exists():
self.llm_wiki_sync_runs_path.write_text(
json.dumps({"runs": []}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
index = self._load_index()
if self._reconcile_index(index):
self._save_index(index)
def list_library(self) -> KnowledgeLibraryRead:
documents = self._load_documents()
@@ -109,21 +140,23 @@ class KnowledgeService:
]
return KnowledgeLibraryRead(folders=folders, documents=documents)
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
self.ensure_library_ready()
index = self._load_index()
entry = self._require_entry(index, document_id)
preview_kind, preview_pages = self._build_preview(entry)
document = self._serialize_document(entry)
def get_document_detail(self, document_id: str) -> KnowledgeDocumentDetailRead:
self.ensure_library_ready()
index = self._load_index()
if self._reconcile_document_ingest_statuses(index, document_ids=[document_id]):
self._save_index(index)
entry = self._require_entry(index, document_id)
preview_kind, preview_pages = self._build_preview(entry)
document = self._serialize_document(entry)
return KnowledgeDocumentDetailRead(
**document.model_dump(),
previewKind=preview_kind,
previewPages=preview_pages,
)
def upload_document(
self,
folder: str,
def upload_document(
self,
folder: str,
filename: str,
content: bytes,
current_user: CurrentUserContext,
@@ -162,22 +195,23 @@ class KnowledgeService:
checksum = hashlib.sha256(content).hexdigest()
extension = self._extract_extension(normalized_name)
if existing_entry is None:
entry = {
"id": document_id,
"folder": normalized_folder,
"original_name": normalized_name,
if existing_entry is None:
entry = {
"id": document_id,
"folder": normalized_folder,
"original_name": normalized_name,
"stored_name": stored_name,
"mime_type": mime_type,
"extension": extension,
"size_bytes": len(content),
"sha256": checksum,
"created_at": now,
"updated_at": now,
"uploaded_by": current_user.name,
"version_number": 1,
}
index["documents"].append(entry)
"created_at": now,
"updated_at": now,
"uploaded_by": current_user.name,
"version_number": 1,
"ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
}
index["documents"].append(entry)
logger.info(
"Knowledge document uploaded id=%s folder=%s filename=%s by=%s",
document_id,
@@ -193,12 +227,13 @@ class KnowledgeService:
"extension": extension,
"size_bytes": len(content),
"sha256": checksum,
"updated_at": now,
"uploaded_by": current_user.name,
"version_number": int(existing_entry.get("version_number", 1)) + 1,
}
)
entry = existing_entry
"updated_at": now,
"uploaded_by": current_user.name,
"version_number": int(existing_entry.get("version_number", 1)) + 1,
"ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
}
)
entry = existing_entry
logger.info(
"Knowledge document updated id=%s folder=%s filename=%s by=%s",
document_id,
@@ -222,16 +257,86 @@ class KnowledgeService:
self._save_index(index)
logger.info("Knowledge document deleted id=%s filename=%s", document_id, entry["original_name"])
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
self.ensure_library_ready()
index = self._load_index()
entry = self._require_entry(index, document_id)
def get_document_content(self, document_id: str) -> tuple[Path, str, str]:
self.ensure_library_ready()
index = self._load_index()
entry = self._require_entry(index, document_id)
file_path = self._resolve_document_path(entry)
if not file_path.exists():
raise FileNotFoundError(entry["original_name"])
return file_path, entry["mime_type"], entry["original_name"]
return file_path, entry["mime_type"], entry["original_name"]
def list_folder_documents(self, folder: str | None = None) -> list[dict[str, Any]]:
self.ensure_library_ready()
index = self._load_index()
if self._reconcile_document_ingest_statuses(index):
self._save_index(index)
documents = list(index.get("documents") or [])
if folder is None:
return documents
normalized_folder = self._normalize_folder(folder)
return [item for item in documents if item.get("folder") == normalized_folder]
def get_document_entry(self, document_id: str) -> dict[str, Any]:
self.ensure_library_ready()
index = self._load_index()
if self._reconcile_document_ingest_statuses(index, document_ids=[document_id]):
self._save_index(index)
return dict(self._require_entry(index, document_id))
def set_document_ingest_statuses(self, document_ids: list[str], status_code: int) -> None:
self.ensure_library_ready()
normalized_ids = {str(item).strip() for item in document_ids if str(item).strip()}
if not normalized_ids:
return
index = self._load_index()
changed = False
updated_at = datetime.now(UTC).isoformat()
for entry in index.get("documents", []):
if str(entry.get("id") or "").strip() not in normalized_ids:
continue
if self._normalize_ingest_status_code(entry.get("ingest_status")) == status_code:
continue
entry["ingest_status"] = status_code
entry["ingest_status_updated_at"] = updated_at
changed = True
if changed:
self._save_index(index)
def refresh_document_ingest_statuses(
self,
document_ids: list[str] | None = None,
*,
preserve_syncing: bool = True,
) -> None:
self.ensure_library_ready()
index = self._load_index()
if self._reconcile_document_ingest_statuses(
index,
document_ids=document_ids,
preserve_syncing=preserve_syncing,
):
self._save_index(index)
def get_llm_wiki_root(self) -> Path:
self.ensure_library_ready()
return self.llm_wiki_root
def extract_document_text(self, document_id: str) -> str:
self.ensure_library_ready()
entry = self.get_document_entry(document_id)
file_path = self._resolve_document_path(entry)
if not file_path.exists():
raise FileNotFoundError(entry["original_name"])
return self._extract_document_text_from_path(
file_path=file_path,
original_name=str(entry.get("original_name") or file_path.name),
mime_type=str(entry.get("mime_type") or "application/octet-stream"),
)
def build_onlyoffice_config(
self,
@@ -365,33 +470,41 @@ class KnowledgeService:
actor_name = callback.users[0] if callback.users else "ONLYOFFICE"
self._replace_document_content(document_id, content, actor_name=actor_name)
def _load_documents(self) -> list[KnowledgeDocumentRead]:
self.ensure_library_ready()
index = self._load_index()
self._reconcile_index(index)
self._save_index(index)
documents = [self._serialize_document(entry) for entry in index["documents"]]
return sorted(documents, key=lambda item: item.time, reverse=True)
def _serialize_document(self, entry: dict[str, Any]) -> KnowledgeDocumentRead:
def _load_documents(self) -> list[KnowledgeDocumentRead]:
self.ensure_library_ready()
index = self._load_index()
changed = self._reconcile_index(index)
changed = self._reconcile_document_ingest_statuses(index) or changed
if changed:
self._save_index(index)
documents = [self._serialize_document(entry) for entry in index["documents"]]
return sorted(documents, key=lambda item: item.time, reverse=True)
def _serialize_document(self, entry: dict[str, Any]) -> KnowledgeDocumentRead:
extension = entry.get("extension") or self._extract_extension(entry["original_name"])
file_type = self._resolve_file_type(extension)
size_bytes = int(entry.get("size_bytes") or 0)
updated_at = self._format_time(entry.get("updated_at") or entry.get("created_at"))
return KnowledgeDocumentRead(
id=entry["id"],
name=entry["original_name"],
folder=entry["folder"],
tag=f"{entry['folder']} / {extension.upper() or 'FILE'}",
time=updated_at,
version=f"v{int(entry.get('version_number', 1))}.0",
state="已发布",
stateTone="success",
owner=entry.get("uploaded_by") or "系统导入",
icon=ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
fileType=file_type,
file_type = self._resolve_file_type(extension)
size_bytes = int(entry.get("size_bytes") or 0)
updated_at = self._format_time(entry.get("updated_at") or entry.get("created_at"))
state_code = self._normalize_ingest_status_code(entry.get("ingest_status"))
state_label, state_tone = KNOWLEDGE_INGEST_STATUS_META.get(
state_code,
KNOWLEDGE_INGEST_STATUS_META[KNOWLEDGE_INGEST_STATUS_PUBLISHED],
)
return KnowledgeDocumentRead(
id=entry["id"],
name=entry["original_name"],
folder=entry["folder"],
tag=f"{entry['folder']} / {extension.upper() or 'FILE'}",
time=updated_at,
version=f"v{int(entry.get('version_number', 1))}.0",
stateCode=state_code,
state=state_label,
stateTone=state_tone,
owner=entry.get("uploaded_by") or "系统导入",
icon=ICON_BY_TYPE.get(file_type, ICON_BY_TYPE["binary"]),
fileType=file_type,
fileTypeLabel=self._resolve_file_type_label(file_type),
summary=f"{entry['folder']} · {extension.upper() or 'FILE'} · {self._format_size(size_bytes)}",
mimeType=entry.get("mime_type") or "application/octet-stream",
@@ -551,27 +664,31 @@ class KnowledgeService:
encoding="utf-8",
)
def _reconcile_index(self, index: dict[str, Any]) -> bool:
changed = False
documents = index.setdefault("documents", [])
def _reconcile_index(self, index: dict[str, Any]) -> bool:
changed = False
documents = index.setdefault("documents", [])
known_by_stored = {
(item["folder"], item["stored_name"]): item
for item in documents
if item.get("folder") and item.get("stored_name")
}
existing_items: list[dict[str, Any]] = []
for item in documents:
file_path = self._resolve_document_path(item)
if file_path.exists():
item["size_bytes"] = file_path.stat().st_size
item["extension"] = self._extract_extension(item["original_name"])
item["mime_type"] = item.get("mime_type") or (
mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"
)
existing_items.append(item)
else:
changed = True
existing_items: list[dict[str, Any]] = []
for item in documents:
file_path = self._resolve_document_path(item)
if file_path.exists():
item["size_bytes"] = file_path.stat().st_size
item["extension"] = self._extract_extension(item["original_name"])
item["mime_type"] = item.get("mime_type") or (
mimetypes.guess_type(item["original_name"])[0] or "application/octet-stream"
)
normalized_status = self._normalize_ingest_status_code(item.get("ingest_status"))
if item.get("ingest_status") != normalized_status:
item["ingest_status"] = normalized_status
changed = True
existing_items.append(item)
else:
changed = True
for folder_name in FIXED_KNOWLEDGE_FOLDERS:
folder_path = self.library_root / folder_name
@@ -596,18 +713,128 @@ class KnowledgeService:
"extension": self._extract_extension(original_name),
"size_bytes": stat.st_size,
"sha256": "",
"created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC).isoformat(),
"updated_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
"uploaded_by": "系统导入",
"version_number": 1,
}
)
changed = True
"created_at": datetime.fromtimestamp(stat.st_ctime, tz=UTC).isoformat(),
"updated_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
"uploaded_by": "系统导入",
"version_number": 1,
"ingest_status": KNOWLEDGE_INGEST_STATUS_PUBLISHED,
}
)
changed = True
if changed or len(existing_items) != len(documents):
index["documents"] = existing_items
return True
return False
if changed or len(existing_items) != len(documents):
index["documents"] = existing_items
return True
return False
def _reconcile_document_ingest_statuses(
self,
index: dict[str, Any],
*,
document_ids: list[str] | None = None,
preserve_syncing: bool = True,
) -> bool:
changed = False
target_ids = {str(item).strip() for item in document_ids or [] if str(item).strip()}
wiki_index = self._load_llm_wiki_index()
wiki_by_document_id = {
str(item.get("document_id") or "").strip(): item
for item in list(wiki_index.get("documents") or [])
if str(item.get("document_id") or "").strip()
}
for entry in index.get("documents", []):
document_id = str(entry.get("id") or "").strip()
if target_ids and document_id not in target_ids:
continue
current_status = self._normalize_ingest_status_code(entry.get("ingest_status"))
if entry.get("ingest_status") != current_status:
entry["ingest_status"] = current_status
changed = True
if (
current_status == KNOWLEDGE_INGEST_STATUS_SYNCING
and preserve_syncing
and not self._is_syncing_status_stale(entry)
):
continue
desired_status = (
KNOWLEDGE_INGEST_STATUS_INGESTED
if self._has_ingested_llm_wiki_document(entry, wiki_by_document_id.get(document_id))
else KNOWLEDGE_INGEST_STATUS_PUBLISHED
)
if current_status == KNOWLEDGE_INGEST_STATUS_FAILED and desired_status != KNOWLEDGE_INGEST_STATUS_INGESTED:
continue
if current_status != desired_status:
entry["ingest_status"] = desired_status
changed = True
return changed
def _load_llm_wiki_index(self) -> dict[str, Any]:
try:
payload = json.loads(self.llm_wiki_index_path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
payload = {"documents": []}
payload.setdefault("documents", [])
return payload
def _has_ingested_llm_wiki_document(
self,
entry: dict[str, Any],
wiki_document: dict[str, Any] | None,
) -> bool:
if not isinstance(wiki_document, dict):
return False
if int(wiki_document.get("knowledge_candidate_count") or 0) <= 0:
return False
current_signature = self._build_llm_wiki_document_signature(entry)
wiki_signature = wiki_document.get("signature")
if isinstance(wiki_signature, dict):
return wiki_signature == current_signature
return (
str(wiki_document.get("document_id") or "").strip() == str(entry.get("id") or "").strip()
and str(wiki_document.get("checksum") or "").strip() == str(entry.get("sha256") or "").strip()
)
@staticmethod
def _build_llm_wiki_document_signature(entry: dict[str, Any]) -> dict[str, Any]:
return {
"document_id": str(entry.get("id") or ""),
"original_name": str(entry.get("original_name") or ""),
"stored_name": str(entry.get("stored_name") or ""),
"sha256": str(entry.get("sha256") or ""),
"version_number": int(entry.get("version_number") or 1),
"updated_at": str(entry.get("updated_at") or ""),
}
@staticmethod
def _normalize_ingest_status_code(value: Any) -> int:
try:
status_code = int(value)
except (TypeError, ValueError):
return KNOWLEDGE_INGEST_STATUS_PUBLISHED
if status_code not in KNOWLEDGE_INGEST_STATUS_META:
return KNOWLEDGE_INGEST_STATUS_PUBLISHED
return status_code
@staticmethod
def _is_syncing_status_stale(entry: dict[str, Any]) -> bool:
raw_value = str(entry.get("ingest_status_updated_at") or "").strip()
if not raw_value:
return True
try:
updated_at = datetime.fromisoformat(raw_value)
except ValueError:
return True
if updated_at.tzinfo is None:
updated_at = updated_at.replace(tzinfo=UTC)
age_seconds = (datetime.now(UTC) - updated_at.astimezone(UTC)).total_seconds()
return age_seconds >= KNOWLEDGE_INGEST_SYNC_STALE_SECONDS
def _require_entry(self, index: dict[str, Any], document_id: str) -> dict[str, Any]:
for entry in index["documents"]:
@@ -746,27 +973,109 @@ class KnowledgeService:
def _can_preview(extension: str) -> bool:
return extension in INLINE_PREVIEW_EXTENSIONS or extension in STRUCTURED_PREVIEW_EXTENSIONS
@staticmethod
def _read_text_preview(file_path: Path) -> str:
encodings = ("utf-8", "utf-8-sig", "gbk")
for encoding in encodings:
try:
@staticmethod
def _read_text_preview(file_path: Path) -> str:
encodings = ("utf-8", "utf-8-sig", "gbk")
for encoding in encodings:
try:
return file_path.read_text(encoding=encoding)
except UnicodeDecodeError:
continue
return "当前文本文件编码暂不支持在线解析。"
@staticmethod
def _extract_docx_text(file_path: Path) -> str:
try:
with ZipFile(file_path) as archive:
xml_content = archive.read("word/document.xml")
except (BadZipFile, KeyError):
return "当前 Word 文件解析失败。"
def _extract_docx_text(file_path: Path) -> str:
try:
with ZipFile(file_path) as archive:
xml_content = archive.read("word/document.xml")
except (BadZipFile, KeyError):
return "当前 Word 文件解析失败。"
root = ElementTree.fromstring(xml_content)
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
return "\n".join(texts)
texts = [node.text.strip() for node in root.iter() if node.tag.endswith("}t") and node.text]
return "\n".join(texts)
def _extract_document_text_from_path(
self,
*,
file_path: Path,
original_name: str,
mime_type: str,
) -> str:
extension = self._extract_extension(original_name)
if extension in TEXT_EXTENSIONS:
return self._normalize_extracted_text(self._read_text_preview(file_path))
if extension == "docx":
return self._normalize_extracted_text(self._extract_docx_text(file_path))
if extension == "pdf":
text = self._normalize_extracted_text(self._extract_pdf_text(file_path))
if text:
return text
return self._normalize_extracted_text(
self._extract_text_with_ocr(
file_path=file_path,
original_name=original_name,
mime_type=mime_type,
)
)
if extension in IMAGE_EXTENSIONS:
return self._normalize_extracted_text(
self._extract_text_with_ocr(
file_path=file_path,
original_name=original_name,
mime_type=mime_type,
)
)
return ""
@staticmethod
def _normalize_extracted_text(text: str) -> str:
normalized = str(text or "").replace("\r\n", "\n").replace("\r", "\n")
normalized = re.sub(r"\n{3,}", "\n\n", normalized)
return normalized.strip()
@staticmethod
def _extract_pdf_text(file_path: Path) -> str:
pdftotext_bin = shutil.which("pdftotext")
if not pdftotext_bin:
return ""
completed = subprocess.run(
[pdftotext_bin, str(file_path), "-"],
capture_output=True,
text=True,
timeout=40,
check=False,
)
if completed.returncode != 0:
return ""
return str(completed.stdout or "")
@staticmethod
def _extract_text_with_ocr(
*,
file_path: Path,
original_name: str,
mime_type: str,
) -> str:
try:
from app.services.ocr import OcrService
result = OcrService().recognize_files(
[(original_name, file_path.read_bytes(), mime_type)]
)
except Exception:
return ""
parts: list[str] = []
for document in result.documents:
text = str(getattr(document, "text", "") or "").strip()
summary = str(getattr(document, "summary", "") or "").strip()
if text:
parts.append(text)
elif summary:
parts.append(summary)
return "\n\n".join(part for part in parts if part)
@staticmethod
def _extract_xlsx_sheets(file_path: Path) -> list[tuple[str, list[list[str]]]]: