from __future__ import annotations

import uuid
from io import BytesIO
from typing import Any

import pytest
from openpyxl import Workbook, load_workbook
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool

from app.core.agent_enums import (
    AgentAssetContentType,
    AgentAssetDomain,
    AgentAssetStatus,
    AgentAssetType,
    AgentName,
    AgentReviewStatus,
    AgentRunSource,
    AgentRunStatus,
)
from app.db.base import Base
from app.schemas.agent_asset import (
    AgentAssetCreate,
    AgentAssetReviewCreate,
    AgentAssetVersionCreate,
)
from app.services.agent_assets import AgentAssetService
from app.services.agent_runs import AgentRunService
from app.services.audit import AuditLogService
from app.services.expense_rule_runtime import ExpenseRuleRuntimeService


def build_session() -> Session:
    """Create a session bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the session
    is returned. ``StaticPool`` together with ``check_same_thread=False`` keeps
    every checkout on the same underlying connection, so the in-memory
    database is shared for the lifetime of the engine.
    """
    engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
    return session_factory()


def build_workbook_bytes(rows: list[list[object]], *, sheet_name: str = "规则表") -> bytes:
    """Serialize ``rows`` into a single-sheet workbook and return its bytes.

    Args:
        rows: Row values appended to the active sheet in order.
        sheet_name: Title given to the active sheet.

    Returns:
        The workbook content as saved by openpyxl into an in-memory buffer.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = sheet_name
    for row in rows:
        sheet.append(row)
    buffer = BytesIO()
    workbook.save(buffer)
    return buffer.getvalue()


def _find_rule(service: AgentAssetService, code: str) -> Any:
    """Return the rule asset listed by ``service`` whose ``code`` matches.

    Fails with a descriptive assertion instead of a bare ``StopIteration``
    when no rule with that code is listed.
    """
    rules = service.list_assets(asset_type=AgentAssetType.RULE.value)
    match = next((item for item in rules if item.code == code), None)
    assert match is not None, f"rule asset {code!r} was not found"
    return match


def test_agent_asset_service_seeds_assets_and_enforces_review_before_activation() -> None:
    """A fresh database lists rules, and a rule in review cannot be activated."""
    absent_codes = {
        "rule.expense.duplicate_expense_check",
        "rule.expense.travel_receipt_requirements",
        "rule.ap.payment_dual_review",
    }
    with build_session() as db:
        service = AgentAssetService(db)

        rules = service.list_assets(asset_type=AgentAssetType.RULE.value)

        assert len(rules) >= 3
        assert any(
            item.code == "rule.expense.travel_risk_control_standard"
            and item.status == AgentAssetStatus.ACTIVE.value
            for item in rules
        )
        assert all(item.code not in absent_codes for item in rules)

        pending_rule = next(
            (item for item in rules if item.status == AgentAssetStatus.REVIEW.value),
            None,
        )
        assert pending_rule is not None, "expected at least one rule in review status"
        with pytest.raises(PermissionError):
            service.activate_asset(pending_rule.id, actor="pytest")


def test_agent_asset_service_seeds_all_foundation_asset_types() -> None:
    """A fresh database lists a minimum number of assets for each asset type."""
    with build_session() as db:
        service = AgentAssetService(db)

        assert len(service.list_assets(asset_type=AgentAssetType.RULE.value)) >= 3
        assert len(service.list_assets(asset_type=AgentAssetType.SKILL.value)) >= 2
        assert len(service.list_assets(asset_type=AgentAssetType.MCP.value)) >= 2
        assert len(service.list_assets(asset_type=AgentAssetType.TASK.value)) >= 3


def test_agent_asset_service_can_activate_rule_after_review() -> None:
    """A draft rule with an approved review on its version can be activated."""
    with build_session() as db:
        service = AgentAssetService(db)

        created = service.create_asset(
            AgentAssetCreate(
                asset_type=AgentAssetType.RULE,
                code=f"rule.test.{uuid.uuid4().hex[:8]}",
                name="测试规则",
                description="用于测试审核和上线流程。",
                domain=AgentAssetDomain.EXPENSE,
                scenario_json=["expense", "risk_check"],
                owner="pytest",
                reviewer="reviewer",
                status=AgentAssetStatus.DRAFT,
                config_json={"enabled": False},
            ),
            actor="pytest",
        )
        service.create_version(
            created.id,
            AgentAssetVersionCreate(
                version="v1.0.0",
                content="# 测试规则\n\n- 仅用于测试。",
                content_type=AgentAssetContentType.MARKDOWN,
                change_note="初始化版本",
                created_by="pytest",
            ),
            actor="pytest",
        )
        service.create_review(
            created.id,
            AgentAssetReviewCreate(
                version="v1.0.0",
                reviewer="reviewer",
                review_status=AgentReviewStatus.APPROVED,
                review_note="可以上线",
            ),
            actor="reviewer",
        )

        activated = service.activate_asset(created.id, actor="reviewer")

        assert activated.status == AgentAssetStatus.ACTIVE.value
        assert activated.current_version == "v1.0.0"
        assert activated.working_version == "v1.0.0"
        assert activated.published_version == "v1.0.0"
        assert activated.latest_review is not None
        assert activated.latest_review.review_status == AgentReviewStatus.APPROVED.value


def test_rule_working_version_does_not_replace_published_version_until_activation() -> None:
    """Creating a new version moves the working copy but not the published one."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.travel_risk_control_standard")

        created = service.create_version(
            rule.id,
            AgentAssetVersionCreate(
                version="v1.1.1",
                content="# 差旅报销风险管控制度\n\n- 工作稿",
                content_type=AgentAssetContentType.MARKDOWN,
                change_note="新增工作稿",
                created_by="finance_user",
            ),
            actor="finance_user",
        )
        detail = service.get_asset(rule.id)

        assert created.is_working is True
        assert created.is_published is False
        assert created.lifecycle_state == "draft"
        assert detail is not None
        assert detail.status == AgentAssetStatus.ACTIVE.value
        assert detail.current_version == "v1.1.1"
        assert detail.working_version == "v1.1.1"
        assert detail.published_version == "v1.1.0"
        assert detail.latest_review is None


def test_expense_rule_runtime_uses_published_version_instead_of_working_version() -> None:
    """The runtime catalog reports the published version, not the new draft."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.travel_risk_control_standard")

        service.create_version(
            rule.id,
            AgentAssetVersionCreate(
                version="v1.1.1",
                content="# 工作稿\n\n```expense-rule\n{\"kind\":\"travel_policy\",\"version\":1}\n```",
                content_type=AgentAssetContentType.MARKDOWN,
                change_note="未上线草稿",
                created_by="finance_user",
            ),
            actor="finance_user",
        )
        catalog = ExpenseRuleRuntimeService(db).load_catalog()

        assert catalog.travel_policy is not None
        assert catalog.travel_policy.rule_version == "v1.1.0"


def test_restore_version_creates_new_working_copy_without_rewriting_published_version() -> None:
    """Restoring an old version yields a new working version; published is unchanged."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.travel_risk_control_standard")

        restored = service.restore_version_as_working_copy(
            rule.id,
            "v1.0.0",
            actor="manager_user",
        )

        assert restored.working_version == "v1.1.1"
        assert restored.current_version == "v1.1.1"
        assert restored.published_version == "v1.1.0"
        assert restored.current_version_change_note == "基于历史版本 v1.0.0 恢复生成工作稿"


def test_spreadsheet_version_compare_returns_sheet_and_cell_changes() -> None:
    """Comparing two uploaded spreadsheet versions reports sheet and cell changes."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.company_travel_expense_reimbursement")

        service.upload_rule_spreadsheet(
            rule.id,
            filename="公司差旅费报销规则.xlsx",
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]),
            actor="finance_user",
        )
        base_detail = service.get_asset(rule.id)
        assert base_detail is not None
        base_version = base_detail.working_version
        # Fail here rather than passing an empty version into the compare call.
        assert base_version, "first upload did not produce a working version"

        service.upload_rule_spreadsheet(
            rule.id,
            filename="公司差旅费报销规则.xlsx",
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 550], ["武汉", 450]]),
            actor="finance_user",
        )
        target_detail = service.get_asset(rule.id)
        assert target_detail is not None
        target_version = target_detail.working_version
        assert target_version, "second upload did not produce a working version"

        diff = service.compare_spreadsheet_versions(
            rule.id,
            base_version=base_version,
            target_version=target_version,
        )

        assert diff.changed_sheet_count == 1
        assert diff.changed_cell_count == 3
        assert any(
            item.cell == "B2" and item.change_type == "modified"
            for item in diff.cell_changes
        )
        assert any(
            item.cell == "A3" and item.change_type == "added"
            for item in diff.cell_changes
        )


def test_working_spreadsheet_version_reads_immutable_snapshot_instead_of_live_copy() -> None:
    """Reading a spreadsheet by version is unaffected by edits to the live file."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.company_travel_expense_reimbursement")

        service.upload_rule_spreadsheet(
            rule.id,
            filename="公司差旅费报销规则.xlsx",
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]),
            actor="finance_user",
        )
        detail = service.get_asset(rule.id)
        assert detail is not None
        working_version = detail.working_version or ""

        current_asset = service.repository.get(rule.id)
        assert current_asset is not None
        live_storage_key = str((current_asset.config_json or {})["rule_document"]["storage_key"])
        live_path = service.spreadsheet_manager.resolve_storage_path(live_storage_key)
        original_live_bytes = live_path.read_bytes()

        try:
            # Overwrite the live file; the versioned read below must still
            # resolve to a different path holding the originally uploaded value.
            live_path.write_bytes(build_workbook_bytes([["城市", "住宿"], ["北京", 999]]))
            snapshot_path, _, _ = service.get_rule_spreadsheet_content(
                rule.id,
                version=working_version,
            )

            assert snapshot_path != live_path
            workbook = load_workbook(snapshot_path, data_only=False)
            assert workbook.active["B2"].value == 500
        finally:
            # Put the live file back so the overwrite does not outlive this test.
            live_path.write_bytes(original_live_bytes)


def test_version_timeline_contains_created_review_and_publish_events() -> None:
    """The timeline of a version lists created, submitted, approved and published."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.travel_risk_control_standard")

        service.create_version(
            rule.id,
            AgentAssetVersionCreate(
                version="v1.1.1",
                content="# 工作稿",
                content_type=AgentAssetContentType.MARKDOWN,
                change_note="补充口径",
                created_by="finance_user",
            ),
            actor="finance_user",
        )
        service.create_review(
            rule.id,
            AgentAssetReviewCreate(
                version="v1.1.1",
                reviewer="finance_user",
                review_status=AgentReviewStatus.PENDING,
                review_note="请审核",
            ),
            actor="finance_user",
        )
        service.create_review(
            rule.id,
            AgentAssetReviewCreate(
                version="v1.1.1",
                reviewer="manager_user",
                review_status=AgentReviewStatus.APPROVED,
                review_note="可以上线",
            ),
            actor="manager_user",
        )
        service.activate_asset(rule.id, actor="manager_user")

        timeline = service.list_version_timeline(rule.id)
        event_types = [item.event_type for item in timeline if item.version == "v1.1.1"]

        assert "created" in event_types
        assert "submitted" in event_types
        assert "approved" in event_types
        assert "published" in event_types


def test_agent_asset_service_returns_recent_versions_for_rule_detail() -> None:
    """The rule detail exposes current version content and recent versions."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.attachment_submission_requirements")

        detail = service.get_asset(rule.id)

        assert detail is not None
        assert detail.current_version == "v1.0.0"
        assert detail.current_version_content_type == AgentAssetContentType.MARKDOWN.value
        assert isinstance(detail.current_version_content, str)
        assert len(detail.recent_versions) >= 2
        assert any(item.is_current for item in detail.recent_versions)
        assert {item.version for item in detail.recent_versions} >= {"v0.9.0", "v1.0.0"}
        assert detail.config_json["rule_template_key"] == "attachment_requirement_v1"
        assert "附件或单据不完整" in str(detail.current_version_content)


def test_agent_asset_service_returns_travel_policy_rule_detail() -> None:
    """The travel policy rule detail is active, approved and has expected content."""
    with build_session() as db:
        service = AgentAssetService(db)
        rule = _find_rule(service, "rule.expense.travel_risk_control_standard")

        detail = service.get_asset(rule.id)

        assert detail is not None
        assert detail.status == AgentAssetStatus.ACTIVE.value
        assert detail.current_version == "v1.1.0"
        assert detail.latest_review is not None
        assert detail.latest_review.review_status == AgentReviewStatus.APPROVED.value
        assert "行程闭环" in str(detail.current_version_content)
        assert "住宿标准、飞机舱位和火车席别" in str(detail.current_version_content)


def test_agent_run_service_lists_seeded_trace_data() -> None:
    """A fresh database lists runs that include tool calls and a semantic parse."""
    with build_session() as db:
        service = AgentRunService(db)

        runs = service.list_runs()

        assert len(runs) >= 3
        assert any(item.tool_calls for item in runs)
        assert any(item.semantic_parse is not None for item in runs)


def test_agent_run_service_creates_run_and_persists_error_message() -> None:
    """A created failed run can be fetched back with its error and summary."""
    with build_session() as db:
        service = AgentRunService(db)

        created = service.create_run(
            agent=AgentName.ORCHESTRATOR.value,
            source=AgentRunSource.SYSTEM_EVENT.value,
            status=AgentRunStatus.FAILED.value,
            error_message="simulated failure",
            result_summary="failed to route request",
        )
        fetched = service.get_run(created.run_id)

        assert fetched is not None
        assert fetched.run_id.startswith("run_")
        assert fetched.status == AgentRunStatus.FAILED.value
        assert fetched.error_message == "simulated failure"
        assert fetched.result_summary == "failed to route request"


def test_agent_asset_creation_writes_audit_log() -> None:
    """Creating an asset records a ``create_agent_asset`` audit log entry."""
    with build_session() as db:
        service = AgentAssetService(db)

        created = service.create_asset(
            AgentAssetCreate(
                asset_type=AgentAssetType.SKILL,
                code=f"skill.test.{uuid.uuid4().hex[:8]}",
                name="测试技能",
                description="用于测试审计日志写入。",
                domain=AgentAssetDomain.KNOWLEDGE,
                scenario_json=["knowledge", "query"],
                owner="pytest",
                reviewer="reviewer",
                status=AgentAssetStatus.DRAFT,
                config_json={"enabled": True},
            ),
            actor="pytest",
        )
        logs = AuditLogService(db).list_logs(resource_id=created.id)

        assert any(item.action == "create_agent_asset" for item in logs)