from __future__ import annotations from datetime import UTC, datetime from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.api.deps import CurrentUserContext from app.core.agent_enums import AgentName, AgentRunSource, AgentRunStatus from app.db.base import Base from app.services.agent_runs import AgentRunService from app.services.knowledge import ( KNOWLEDGE_INGEST_STATUS_FAILED, KNOWLEDGE_INGEST_STATUS_SYNCING, KnowledgeService, ) def build_session() -> Session: engine = create_engine( "sqlite+pysqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) return session_factory() def test_list_library_returns_closed_folder_icons_by_default(tmp_path) -> None: service = KnowledgeService(storage_root=tmp_path) library = service.list_library() assert library.folders assert {folder.icon for folder in library.folders} == {"mdi mdi-folder"} def test_reconcile_document_ingest_status_keeps_failed_when_linked_run_failed( tmp_path, monkeypatch, ) -> None: with build_session() as db: service = KnowledgeService(storage_root=tmp_path, db=db) uploaded = service.upload_document( "报销制度", "demo.txt", b"hello", CurrentUserContext( username="admin", name="管理员", role_codes=["manager"], is_admin=True, ), ) run = AgentRunService(db).create_run( agent=AgentName.HERMES.value, source=AgentRunSource.USER_MESSAGE.value, status=AgentRunStatus.FAILED.value, route_json={"job_type": "knowledge_index_sync"}, ) service.set_document_ingest_statuses( [uploaded.id], KNOWLEDGE_INGEST_STATUS_SYNCING, agent_run_id=run.run_id, ) monkeypatch.setattr( "app.services.knowledge_rag.KnowledgeRagService.get_document_status_map", lambda self, _document_ids: { uploaded.id: { "status": "processing", "query_ready": False, "updated_at": datetime.now(UTC).isoformat(), } }, ) index = service._load_index() changed = service._reconcile_document_ingest_statuses( index, document_ids=[uploaded.id], preserve_syncing=False, ) entry = next(item for item in index["documents"] if item["id"] == uploaded.id) assert changed is True assert entry["ingest_status"] == KNOWLEDGE_INGEST_STATUS_FAILED