diff --git a/server/tests/test_agent_asset_onlyoffice_key.py b/server/tests/test_agent_asset_onlyoffice_key.py new file mode 100644 index 0000000..30d9cb1 --- /dev/null +++ b/server/tests/test_agent_asset_onlyoffice_key.py @@ -0,0 +1,23 @@ +from app.services.agent_asset_spreadsheet import RuleSpreadsheetMeta +from app.services.agent_assets import AgentAssetService + + +def test_rule_spreadsheet_onlyoffice_key_uses_safe_characters() -> None: + metadata = RuleSpreadsheetMeta( + file_name="公司差旅费报销规则.xlsx", + storage_key="rules/finance-rules/公司差旅费报销规则.xlsx", + mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + size_bytes=1, + checksum="abc123", + updated_at="2026-05-17T14:32:00+00:00", + updated_by="system", + ) + + key = AgentAssetService._build_onlyoffice_document_key( + "asset:id", + "v1.0.0", + metadata, + ) + + assert key == "asset_id-v1.0.0-abc123" + assert ":" not in key diff --git a/server/tests/test_agent_asset_service.py b/server/tests/test_agent_asset_service.py index 19e7506..91b7efe 100644 --- a/server/tests/test_agent_asset_service.py +++ b/server/tests/test_agent_asset_service.py @@ -1,8 +1,10 @@ from __future__ import annotations import uuid +from io import BytesIO import pytest +from openpyxl import Workbook, load_workbook from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool @@ -26,6 +28,7 @@ from app.schemas.agent_asset import ( from app.services.agent_assets import AgentAssetService from app.services.agent_runs import AgentRunService from app.services.audit import AuditLogService +from app.services.expense_rule_runtime import ExpenseRuleRuntimeService def build_session() -> Session: @@ -39,6 +42,17 @@ def build_session() -> Session: return session_factory() +def build_workbook_bytes(rows: list[list[object]], *, sheet_name: str = "规则表") -> bytes: + workbook = Workbook() + sheet = workbook.active + sheet.title = sheet_name + for row in rows: + sheet.append(row) + buffer = BytesIO() + workbook.save(buffer) + return buffer.getvalue() + + def test_agent_asset_service_seeds_assets_and_enforces_review_before_activation() -> None: with build_session() as db: service = AgentAssetService(db) @@ -120,10 +134,218 @@ def test_agent_asset_service_can_activate_rule_after_review() -> None: assert activated.status == AgentAssetStatus.ACTIVE.value assert activated.current_version == "v1.0.0" + assert activated.working_version == "v1.0.0" + assert activated.published_version == "v1.0.0" assert activated.latest_review is not None assert activated.latest_review.review_status == AgentReviewStatus.APPROVED.value +def test_rule_working_version_does_not_replace_published_version_until_activation() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.travel_risk_control_standard" + ) + + created = service.create_version( + rule.id, + AgentAssetVersionCreate( + version="v1.1.1", + content="# 差旅报销风险管控制度\n\n- 工作稿", + content_type=AgentAssetContentType.MARKDOWN, + change_note="新增工作稿", + created_by="finance_user", + ), + actor="finance_user", + ) + detail = service.get_asset(rule.id) + + assert created.is_working is True + assert created.is_published is False + assert created.lifecycle_state == "draft" + assert detail is not None + assert detail.status == AgentAssetStatus.ACTIVE.value + assert detail.current_version == "v1.1.1" + assert detail.working_version == "v1.1.1" + assert detail.published_version == "v1.1.0" + assert detail.latest_review is None + + +def test_expense_rule_runtime_uses_published_version_instead_of_working_version() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.travel_risk_control_standard" + ) + + service.create_version( + rule.id, + AgentAssetVersionCreate( + version="v1.1.1", + content="# 工作稿\n\n```expense-rule\n{\"kind\":\"travel_policy\",\"version\":1}\n```", + content_type=AgentAssetContentType.MARKDOWN, + change_note="未上线草稿", + created_by="finance_user", + ), + actor="finance_user", + ) + + catalog = ExpenseRuleRuntimeService(db).load_catalog() + + assert catalog.travel_policy is not None + assert catalog.travel_policy.rule_version == "v1.1.0" + + +def test_restore_version_creates_new_working_copy_without_rewriting_published_version() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.travel_risk_control_standard" + ) + + restored = service.restore_version_as_working_copy( + rule.id, + "v1.0.0", + actor="manager_user", + ) + + assert restored.working_version == "v1.1.1" + assert restored.current_version == "v1.1.1" + assert restored.published_version == "v1.1.0" + assert restored.current_version_change_note == "基于历史版本 v1.0.0 恢复生成工作稿" + + +def test_spreadsheet_version_compare_returns_sheet_and_cell_changes() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.company_travel_expense_reimbursement" + ) + + service.upload_rule_spreadsheet( + rule.id, + filename="公司差旅费报销规则.xlsx", + content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]), + actor="finance_user", + ) + base_version = service.get_asset(rule.id).working_version # type: ignore[union-attr] + service.upload_rule_spreadsheet( + rule.id, + filename="公司差旅费报销规则.xlsx", + content=build_workbook_bytes([["城市", "住宿"], ["北京", 550], ["武汉", 450]]), + actor="finance_user", + ) + target_version = service.get_asset(rule.id).working_version # type: ignore[union-attr] + + diff = service.compare_spreadsheet_versions( + rule.id, + base_version=base_version or "", + target_version=target_version or "", + ) + + assert diff.changed_sheet_count == 1 + assert diff.changed_cell_count == 3 + assert any(item.cell == "B2" and item.change_type == "modified" for item in diff.cell_changes) + assert any(item.cell == "A3" and item.change_type == "added" for item in diff.cell_changes) + + +def test_working_spreadsheet_version_reads_immutable_snapshot_instead_of_live_copy() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.company_travel_expense_reimbursement" + ) + + service.upload_rule_spreadsheet( + rule.id, + filename="公司差旅费报销规则.xlsx", + content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]), + actor="finance_user", + ) + detail = service.get_asset(rule.id) + assert detail is not None + working_version = detail.working_version or "" + + current_asset = service.repository.get(rule.id) + assert current_asset is not None + live_storage_key = str((current_asset.config_json or {})["rule_document"]["storage_key"]) + live_path = service.spreadsheet_manager.resolve_storage_path(live_storage_key) + original_live_bytes = live_path.read_bytes() + try: + live_path.write_bytes(build_workbook_bytes([["城市", "住宿"], ["北京", 999]])) + + snapshot_path, _, _ = service.get_rule_spreadsheet_content( + rule.id, + version=working_version, + ) + + assert snapshot_path != live_path + workbook = load_workbook(snapshot_path, data_only=False) + assert workbook.active["B2"].value == 500 + finally: + live_path.write_bytes(original_live_bytes) + + +def test_version_timeline_contains_created_review_and_publish_events() -> None: + with build_session() as db: + service = AgentAssetService(db) + rule = next( + item + for item in service.list_assets(asset_type=AgentAssetType.RULE.value) + if item.code == "rule.expense.travel_risk_control_standard" + ) + service.create_version( + rule.id, + AgentAssetVersionCreate( + version="v1.1.1", + content="# 工作稿", + content_type=AgentAssetContentType.MARKDOWN, + change_note="补充口径", + created_by="finance_user", + ), + actor="finance_user", + ) + service.create_review( + rule.id, + AgentAssetReviewCreate( + version="v1.1.1", + reviewer="finance_user", + review_status=AgentReviewStatus.PENDING, + review_note="请审核", + ), + actor="finance_user", + ) + service.create_review( + rule.id, + AgentAssetReviewCreate( + version="v1.1.1", + reviewer="manager_user", + review_status=AgentReviewStatus.APPROVED, + review_note="可以上线", + ), + actor="manager_user", + ) + service.activate_asset(rule.id, actor="manager_user") + + timeline = service.list_version_timeline(rule.id) + event_types = [item.event_type for item in timeline if item.version == "v1.1.1"] + + assert "created" in event_types + assert "submitted" in event_types + assert "approved" in event_types + assert "published" in event_types + + def test_agent_asset_service_returns_recent_versions_for_rule_detail() -> None: with build_session() as db: service = AgentAssetService(db) diff --git a/server/tests/test_agent_asset_spreadsheet_import.py b/server/tests/test_agent_asset_spreadsheet_import.py new file mode 100644 index 0000000..eaeb493 --- /dev/null +++ b/server/tests/test_agent_asset_spreadsheet_import.py @@ -0,0 +1,41 @@ +from io import BytesIO + +from openpyxl import Workbook, load_workbook + +from app.services.agent_asset_spreadsheet import AgentAssetSpreadsheetManager + + +def test_rebuild_from_uploaded_content_preserves_sheet_values() -> None: + source = Workbook() + first = source.active + first.title = "差旅标准" + first.append(["城市", "住宿费"]) + first.append(["北京", 500]) + second = source.create_sheet("补贴标准") + second.append(["区域", "餐补"]) + second.append(["直辖市", 75]) + + source_buffer = BytesIO() + source.save(source_buffer) + + rebuilt = AgentAssetSpreadsheetManager.rebuild_from_uploaded_content( + source_buffer.getvalue() + ) + workbook = load_workbook(BytesIO(rebuilt), data_only=False) + + assert workbook.sheetnames == ["差旅标准", "补贴标准"] + assert workbook["差旅标准"]["A2"].value == "北京" + assert workbook["差旅标准"]["B2"].value == "500" + assert workbook["补贴标准"]["A2"].value == "直辖市" + assert workbook["补贴标准"]["B2"].value == "75" + + +def build_workbook_bytes(rows: list[list[object]], *, sheet_name: str = "规则表") -> bytes: + workbook = Workbook() + sheet = workbook.active + sheet.title = sheet_name + for row in rows: + sheet.append(row) + buffer = BytesIO() + workbook.save(buffer) + return buffer.getvalue() diff --git a/server/tests/test_agent_runs_service.py b/server/tests/test_agent_runs_service.py new file mode 100644 index 0000000..3cc8fdd --- /dev/null +++ b/server/tests/test_agent_runs_service.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import StaticPool + +from app.core.agent_enums import AgentName, AgentRunSource, AgentRunStatus, AgentToolType +from app.db.base import Base +from app.services.agent_runs import AgentRunService + + +def build_session() -> Session: + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) + return session_factory() + + +def test_agent_run_service_marks_stale_knowledge_sync_run_failed_on_read() -> None: + with build_session() as db: + service = AgentRunService(db) + created = service.create_run( + agent=AgentName.HERMES.value, + source=AgentRunSource.USER_MESSAGE.value, + status=AgentRunStatus.RUNNING.value, + route_json={ + "job_type": "knowledge_index_sync", + "heartbeat_at": (datetime.now(UTC) - timedelta(minutes=31)).isoformat(), + "requested_document_ids": [], + }, + ) + + fetched = service.get_run(created.run_id) + running_runs = service.list_runs( + agent=AgentName.HERMES.value, + status=AgentRunStatus.RUNNING.value, + limit=100, + ) + + assert fetched is not None + assert fetched.status == AgentRunStatus.FAILED.value + assert fetched.error_message == "Knowledge index heartbeat timed out." + assert all(item.run_id != created.run_id for item in running_runs) + + +def test_agent_run_service_updates_existing_tool_call() -> None: + with build_session() as db: + service = AgentRunService(db) + run = service.create_run( + agent=AgentName.HERMES.value, + source=AgentRunSource.USER_MESSAGE.value, + status=AgentRunStatus.RUNNING.value, + route_json={"job_type": "knowledge_index_sync"}, + ) + tool_call = service.record_tool_call( + run_id=run.run_id, + tool_type=AgentToolType.LLM.value, + tool_name="lightrag.index_documents", + request_json={"document_ids": ["doc-1"]}, + response_json={"phase": "indexing"}, + status="running", + duration_ms=0, + ) + + updated = service.update_tool_call( + tool_call.id, + response_json={"track_id": "insert_123"}, + status="succeeded", + duration_ms=1250, + error_message=None, + ) + fetched = service.get_run(run.run_id) + + assert updated.status == "succeeded" + assert updated.duration_ms == 1250 + assert fetched is not None + assert len(fetched.tool_calls) == 1 + assert fetched.tool_calls[0].status == "succeeded" + assert fetched.tool_calls[0].response_json == {"track_id": "insert_123"}