feat: 增强知识库功能,优化索引和RAG检索

This commit is contained in:
caoxiaozhu
2026-05-18 02:49:39 +00:00
parent 55e0591a5e
commit 4414ffb34c
18 changed files with 5656 additions and 4659 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import UTC, datetime
from time import perf_counter
@@ -18,6 +19,7 @@ from app.services.knowledge import (
from app.services.knowledge_rag import KnowledgeRagService
logger = get_logger("app.services.knowledge_index_tasks")
HEARTBEAT_INTERVAL_SECONDS = 10
class KnowledgeIndexTaskManager:
@@ -58,6 +60,15 @@ class KnowledgeIndexTaskManager:
session_factory = get_session_factory()
db = session_factory()
started = perf_counter()
heartbeat_stop = threading.Event()
heartbeat_thread: threading.Thread | None = None
tool_call_id = ""
tool_request_json = {
"agent": AgentName.HERMES.value,
"folder": folder,
"document_ids": document_ids,
"force": force,
}
try:
run_service = AgentRunService(db)
@@ -84,6 +95,44 @@ class KnowledgeIndexTaskManager:
},
},
)
tool_call = run_service.record_tool_call(
run_id=agent_run_id,
tool_type=AgentToolType.LLM.value,
tool_name="lightrag.index_documents",
request_json=tool_request_json,
response_json={"phase": "indexing"},
status="running",
duration_ms=0,
error_message=None,
)
tool_call_id = tool_call.id
def heartbeat_worker() -> None:
while not heartbeat_stop.wait(HEARTBEAT_INTERVAL_SECONDS):
heartbeat_db = session_factory()
try:
AgentRunService(heartbeat_db).merge_route_json(
agent_run_id,
{
"job_type": "knowledge_index_sync",
"phase": "indexing",
"heartbeat_at": datetime.now(UTC).isoformat(),
},
)
except Exception:
logger.exception(
"Knowledge index heartbeat update failed run_id=%s",
agent_run_id,
)
finally:
heartbeat_db.close()
heartbeat_thread = threading.Thread(
target=heartbeat_worker,
name=f"knowledge-index-heartbeat-{agent_run_id}",
daemon=True,
)
heartbeat_thread.start()
response = rag_service.index_documents(document_ids=document_ids, force=force)
succeeded_document_ids = [
@@ -117,16 +166,11 @@ class KnowledgeIndexTaskManager:
duration_ms = int((perf_counter() - started) * 1000)
tool_status = "succeeded" if not failed_document_ids else "failed"
run_service.record_tool_call(
run_id=agent_run_id,
tool_type=AgentToolType.LLM.value,
tool_name="lightrag.index_documents",
request_json={
"agent": AgentName.HERMES.value,
"folder": folder,
"document_ids": document_ids,
"force": force,
},
heartbeat_stop.set()
if heartbeat_thread is not None:
heartbeat_thread.join(timeout=1)
run_service.update_tool_call(
tool_call_id,
response_json=response,
status=tool_status,
duration_ms=duration_ms,
@@ -166,22 +210,29 @@ class KnowledgeIndexTaskManager:
finished_at=datetime.now(UTC),
)
except Exception as exc:
heartbeat_stop.set()
if heartbeat_thread is not None:
heartbeat_thread.join(timeout=1)
try:
AgentRunService(db).record_tool_call(
run_id=agent_run_id,
tool_type=AgentToolType.LLM.value,
tool_name="lightrag.index_documents",
request_json={
"agent": AgentName.HERMES.value,
"folder": folder,
"document_ids": document_ids,
"force": force,
},
response_json={"error": str(exc)},
status="failed",
duration_ms=int((perf_counter() - started) * 1000),
error_message=str(exc),
)
if tool_call_id:
AgentRunService(db).update_tool_call(
tool_call_id,
response_json={"error": str(exc)},
status="failed",
duration_ms=int((perf_counter() - started) * 1000),
error_message=str(exc),
)
else:
AgentRunService(db).record_tool_call(
run_id=agent_run_id,
tool_type=AgentToolType.LLM.value,
tool_name="lightrag.index_documents",
request_json=tool_request_json,
response_json={"error": str(exc)},
status="failed",
duration_ms=int((perf_counter() - started) * 1000),
error_message=str(exc),
)
KnowledgeService(db=db).set_document_ingest_statuses(
document_ids,
KNOWLEDGE_INGEST_STATUS_FAILED,
@@ -210,6 +261,9 @@ class KnowledgeIndexTaskManager:
logger.exception("Knowledge index task finalization failed run_id=%s", agent_run_id)
logger.exception("Knowledge index task failed run_id=%s", agent_run_id)
finally:
heartbeat_stop.set()
if heartbeat_thread is not None and heartbeat_thread.is_alive():
heartbeat_thread.join(timeout=1)
db.close()