Files
X-Financial/server/tests/test_knowledge_sync.py

66 lines
2.3 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.api.deps import CurrentUserContext
from app.db.base import Base
from app.services.knowledge import KNOWLEDGE_INGEST_STATUS_INGESTED, KnowledgeService
from app.services.knowledge_sync import KnowledgeSyncDispatchService
def build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
return session_factory()
def test_force_sync_queues_ingested_documents_and_creates_hermes_run(tmp_path, monkeypatch) -> None:
submitted: list[dict[str, object]] = []
user = CurrentUserContext(
username="admin",
name="管理员",
role_codes=["manager"],
is_admin=True,
)
with build_session() as db:
knowledge_service = KnowledgeService(storage_root=tmp_path, db=db)
uploaded = knowledge_service.upload_document("报销制度", "demo.txt", b"hello", user)
document_id = uploaded.id
knowledge_service.set_document_ingest_statuses(
[document_id],
KNOWLEDGE_INGEST_STATUS_INGESTED,
agent_run_id="run_previous",
)
monkeypatch.setattr(
"app.services.knowledge_rag.KnowledgeRagService.get_document_status_map",
lambda self, _document_ids: {},
)
monkeypatch.setattr(
"app.services.knowledge_sync.knowledge_index_task_manager.submit_sync",
lambda **kwargs: submitted.append(kwargs),
)
dispatch_service = KnowledgeSyncDispatchService(db)
dispatch_service.knowledge_service = knowledge_service
result = dispatch_service.queue_sync(
current_user=user,
folder=None,
document_ids=[document_id],
force=True,
changed_only=True,
)
assert result.agent_run_id.startswith("run_")
assert document_id in result.document_ids
assert submitted
assert submitted[0]["agent_run_id"] == result.agent_run_id