Files
X-Financial/server/tests/test_agent_asset_service.py

548 lines
20 KiB
Python
Raw Normal View History

2026-05-11 03:51:24 +00:00
from __future__ import annotations
import uuid
from io import BytesIO
2026-05-11 03:51:24 +00:00
import pytest
from openpyxl import Workbook, load_workbook
2026-05-11 03:51:24 +00:00
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.agent_enums import (
AgentAssetContentType,
AgentAssetDomain,
AgentAssetStatus,
AgentAssetType,
AgentName,
AgentReviewStatus,
AgentRunSource,
AgentRunStatus,
)
from app.db.base import Base
from app.schemas.agent_asset import (
AgentAssetCreate,
AgentAssetReviewCreate,
AgentAssetVersionCreate,
)
from app.api.deps import CurrentUserContext
2026-05-11 03:51:24 +00:00
from app.services.agent_assets import AgentAssetService
from app.services.agent_runs import AgentRunService
from app.services.audit import AuditLogService
from app.services.expense_rule_runtime import ExpenseRuleRuntimeService
from app.services.settings import OnlyOfficeRuntimeConfig
2026-05-11 03:51:24 +00:00
def build_session() -> Session:
    """Create a session bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the
    session is returned, so callers get a ready-to-use schema.
    """
    # StaticPool plus check_same_thread=False is the configuration the
    # SQLAlchemy docs recommend for ":memory:" databases, so the schema
    # created below stays visible to the session handed back.
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    return sessionmaker(bind=memory_engine, autocommit=False, autoflush=False)()
def build_workbook_bytes(rows: list[list[object]], *, sheet_name: str = "规则表") -> bytes:
    """Serialize ``rows`` into an in-memory single-sheet workbook.

    Args:
        rows: Row values appended to the active sheet in order.
        sheet_name: Title given to the active sheet.

    Returns:
        The bytes written by ``Workbook.save``.
    """
    book = Workbook()
    active_sheet = book.active
    active_sheet.title = sheet_name
    for values in rows:
        active_sheet.append(values)

    with BytesIO() as stream:
        book.save(stream)
        return stream.getvalue()
2026-05-11 03:51:24 +00:00
def test_agent_asset_service_seeds_assets_and_enforces_review_before_activation() -> None:
    """Check the seeded rule set and that a rule still in review cannot be activated.

    Verifies that at least three rules exist, that the travel risk control
    standard is active, that none of the excluded codes are present, and
    that activating a rule in REVIEW status raises ``PermissionError``.
    """
    excluded_codes = {
        "rule.expense.duplicate_expense_check",
        "rule.expense.travel_receipt_requirements",
        "rule.ap.payment_dual_review",
    }
    with build_session() as db:
        service = AgentAssetService(db)
        rules = service.list_assets(asset_type=AgentAssetType.RULE.value)

        assert len(rules) >= 3
        assert any(
            rule.status == AgentAssetStatus.ACTIVE.value
            for rule in rules
            if rule.code == "rule.expense.travel_risk_control_standard"
        )
        assert not any(rule.code in excluded_codes for rule in rules)

        # First seeded rule that is still waiting for review.
        pending_rule = next(
            filter(lambda rule: rule.status == AgentAssetStatus.REVIEW.value, rules)
        )
        with pytest.raises(PermissionError):
            service.activate_asset(pending_rule.id, actor="pytest")
def test_agent_asset_service_seeds_all_foundation_asset_types() -> None:
    """Each asset type (rule, skill, MCP, task) is seeded with a minimum number of assets."""
    minimum_counts = (
        (AgentAssetType.RULE, 3),
        (AgentAssetType.SKILL, 2),
        (AgentAssetType.MCP, 2),
        (AgentAssetType.TASK, 3),
    )
    with build_session() as db:
        service = AgentAssetService(db)
        for asset_type, minimum in minimum_counts:
            seeded = service.list_assets(asset_type=asset_type.value)
            assert len(seeded) >= minimum
def test_agent_asset_service_can_activate_rule_after_review() -> None:
    """A draft rule can be activated once its version has an approved review.

    Creates a rule, adds version v1.0.0, records an approved review for
    that version, activates the rule, then checks the status, the three
    version fields and the latest review on the activation result.
    """
    version_label = "v1.0.0"
    with build_session() as db:
        service = AgentAssetService(db)

        asset_payload = AgentAssetCreate(
            asset_type=AgentAssetType.RULE,
            code=f"rule.test.{uuid.uuid4().hex[:8]}",
            name="测试规则",
            description="用于测试审核和上线流程。",
            domain=AgentAssetDomain.EXPENSE,
            scenario_json=["expense", "risk_check"],
            owner="pytest",
            reviewer="reviewer",
            status=AgentAssetStatus.DRAFT,
            config_json={"enabled": False},
        )
        created = service.create_asset(asset_payload, actor="pytest")

        version_payload = AgentAssetVersionCreate(
            version=version_label,
            content="# 测试规则\n\n- 仅用于测试。",
            content_type=AgentAssetContentType.MARKDOWN,
            change_note="初始化版本",
            created_by="pytest",
        )
        service.create_version(created.id, version_payload, actor="pytest")

        review_payload = AgentAssetReviewCreate(
            version=version_label,
            reviewer="reviewer",
            review_status=AgentReviewStatus.APPROVED,
            review_note="可以上线",
        )
        service.create_review(created.id, review_payload, actor="reviewer")

        activated = service.activate_asset(created.id, actor="reviewer")

        assert activated.status == AgentAssetStatus.ACTIVE.value
        assert activated.current_version == version_label
        assert activated.working_version == version_label
        assert activated.published_version == version_label
        assert activated.latest_review is not None
        assert activated.latest_review.review_status == AgentReviewStatus.APPROVED.value
def test_rule_working_version_does_not_replace_published_version_until_activation() -> None:
    """Adding a working version leaves the published version untouched.

    After creating v1.1.1 on the seeded travel risk control rule, the new
    version is reported as a working draft, the asset stays active with
    published version v1.1.0, and no latest review is reported.
    """
    target_code = "rule.expense.travel_risk_control_standard"
    with build_session() as db:
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        draft_payload = AgentAssetVersionCreate(
            version="v1.1.1",
            content="# 差旅报销风险管控制度\n\n- 工作稿",
            content_type=AgentAssetContentType.MARKDOWN,
            change_note="新增工作稿",
            created_by="finance_user",
        )
        created = service.create_version(rule.id, draft_payload, actor="finance_user")
        detail = service.get_asset(rule.id)

        # The freshly created version itself is a working draft.
        assert created.is_working is True
        assert created.is_published is False
        assert created.lifecycle_state == "draft"

        # The asset keeps its published version while tracking the draft.
        assert detail is not None
        assert detail.status == AgentAssetStatus.ACTIVE.value
        assert detail.current_version == "v1.1.1"
        assert detail.working_version == "v1.1.1"
        assert detail.published_version == "v1.1.0"
        assert detail.latest_review is None
def test_pending_review_can_name_new_working_version_before_submission() -> None:
    """A pending review may introduce a version label that becomes the working version.

    Submitting a PENDING review for v1.2.0 on the seeded travel risk
    control rule makes v1.2.0 the current and working version, while the
    published version remains v1.1.0.
    """
    target_code = "rule.expense.travel_risk_control_standard"
    with build_session() as db:
        service = AgentAssetService(db)
        rule = next(
            filter(
                lambda asset: asset.code == target_code,
                service.list_assets(asset_type=AgentAssetType.RULE.value),
            )
        )

        review_payload = AgentAssetReviewCreate(
            version="v1.2.0",
            reviewer="manager_user",
            review_status=AgentReviewStatus.PENDING,
            review_note="请审核",
        )
        review = service.create_review(rule.id, review_payload, actor="finance_user")
        detail = service.get_asset(rule.id)

        assert review.version == "v1.2.0"
        assert detail is not None
        assert detail.current_version == "v1.2.0"
        assert detail.working_version == "v1.2.0"
        assert detail.published_version == "v1.1.0"
        assert detail.latest_review is not None
        assert detail.latest_review.reviewer == "manager_user"
def test_expense_rule_runtime_uses_published_version_instead_of_working_version() -> None:
    """The runtime catalog keeps reporting v1.1.0 after a v1.1.1 working version is added."""
    target_code = "rule.expense.travel_risk_control_standard"
    with build_session() as db:
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        # Working version that is created but never reviewed or activated.
        unpublished_draft = AgentAssetVersionCreate(
            version="v1.1.1",
            content="# 工作稿\n\n```expense-rule\n{\"kind\":\"travel_policy\",\"version\":1}\n```",
            content_type=AgentAssetContentType.MARKDOWN,
            change_note="未上线草稿",
            created_by="finance_user",
        )
        service.create_version(rule.id, unpublished_draft, actor="finance_user")

        runtime = ExpenseRuleRuntimeService(db)
        catalog = runtime.load_catalog()

        assert catalog.travel_policy is not None
        assert catalog.travel_policy.rule_version == "v1.1.0"
def test_restore_version_creates_new_working_copy_without_rewriting_published_version() -> None:
    """Restoring v1.0.0 yields a new working version v1.1.1 and keeps v1.1.0 published."""
    target_code = "rule.expense.travel_risk_control_standard"
    with build_session() as db:
        service = AgentAssetService(db)
        rule = next(
            filter(
                lambda asset: asset.code == target_code,
                service.list_assets(asset_type=AgentAssetType.RULE.value),
            )
        )

        restored = service.restore_version_as_working_copy(
            rule.id, "v1.0.0", actor="manager_user"
        )

        assert restored.working_version == "v1.1.1"
        assert restored.current_version == "v1.1.1"
        assert restored.published_version == "v1.1.0"
        # The change note records which historical version the copy came from.
        assert restored.current_version_change_note == "基于历史版本 v1.0.0 恢复生成工作稿"
def test_spreadsheet_version_compare_returns_sheet_and_cell_changes() -> None:
    """Comparing two uploaded spreadsheet versions reports sheet and cell changes.

    Uploads a workbook twice to the seeded company travel reimbursement
    rule, capturing the working version after each upload, then compares
    them. The second workbook changes B2 and adds a third row; the diff
    must report one changed sheet and three changed cells, including a
    modified B2 and an added A3.
    """
    target_code = "rule.expense.company_travel_expense_reimbursement"
    workbook_name = "公司差旅费报销规则.xlsx"
    with build_session() as db:
        service = AgentAssetService(db)
        rule = next(
            item
            for item in service.list_assets(asset_type=AgentAssetType.RULE.value)
            if item.code == target_code
        )

        service.upload_rule_spreadsheet(
            rule.id,
            filename=workbook_name,
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]),
            actor="finance_user",
        )
        # Fail here, with a clear message, if the upload left no working
        # version, rather than passing an empty string on to the comparison.
        base_detail = service.get_asset(rule.id)
        assert base_detail is not None
        base_version = base_detail.working_version
        assert base_version, "first upload did not produce a working version"

        service.upload_rule_spreadsheet(
            rule.id,
            filename=workbook_name,
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 550], ["武汉", 450]]),
            actor="finance_user",
        )
        target_detail = service.get_asset(rule.id)
        assert target_detail is not None
        target_version = target_detail.working_version
        assert target_version, "second upload did not produce a working version"

        diff = service.compare_spreadsheet_versions(
            rule.id,
            base_version=base_version,
            target_version=target_version,
        )

        assert diff.changed_sheet_count == 1
        # Presumably B2 modified plus A3 and B3 added; only B2 and A3 are
        # asserted individually below.
        assert diff.changed_cell_count == 3
        assert any(item.cell == "B2" and item.change_type == "modified" for item in diff.cell_changes)
        assert any(item.cell == "A3" and item.change_type == "added" for item in diff.cell_changes)
def test_working_spreadsheet_version_reads_immutable_snapshot_instead_of_live_copy() -> None:
    """Reading a working spreadsheet version returns a snapshot, not the live file.

    After an upload, the live file on disk is overwritten with different
    data. The content returned for the working version must come from a
    different path and still hold the originally uploaded value in B2.
    The live file is restored afterwards.
    """
    target_code = "rule.expense.company_travel_expense_reimbursement"
    with build_session() as db:
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        service.upload_rule_spreadsheet(
            rule.id,
            filename="公司差旅费报销规则.xlsx",
            content=build_workbook_bytes([["城市", "住宿"], ["北京", 500]]),
            actor="finance_user",
        )

        detail = service.get_asset(rule.id)
        assert detail is not None
        working_version = detail.working_version or ""

        current_asset = service.repository.get(rule.id)
        assert current_asset is not None
        asset_config = current_asset.config_json or {}
        document_meta = asset_config["rule_document"]
        live_storage_key = str(document_meta["storage_key"])
        live_path = service.spreadsheet_manager.resolve_storage_path(live_storage_key)

        # Keep the original bytes so the live file can be put back in `finally`.
        original_live_bytes = live_path.read_bytes()
        try:
            tampered_bytes = build_workbook_bytes([["城市", "住宿"], ["北京", 999]])
            live_path.write_bytes(tampered_bytes)

            spreadsheet_content = service.get_rule_spreadsheet_content(
                rule.id,
                version=working_version,
            )
            snapshot_path, _second, _third = spreadsheet_content
            assert snapshot_path != live_path

            snapshot_workbook = load_workbook(snapshot_path, data_only=False)
            assert snapshot_workbook.active["B2"].value == 500
        finally:
            live_path.write_bytes(original_live_bytes)
def test_spreadsheet_change_records_return_recent_edit_details() -> None:
    """An ``edit_rule_spreadsheet`` audit entry is returned as a spreadsheet change record.

    Logs one audit action carrying a single modified cell (B2) and checks
    that exactly one change record comes back with the same actor, cell
    count and cell reference.
    """
    target_code = "rule.expense.company_travel_expense_reimbursement"
    with build_session() as db:
        service = AgentAssetService(db)
        rule = next(
            filter(
                lambda asset: asset.code == target_code,
                service.list_assets(asset_type=AgentAssetType.RULE.value),
            )
        )

        modified_cell = {
            "sheet_name": "规则表",
            "cell": "B2",
            "change_type": "modified",
            "before_value": 500,
            "after_value": 550,
        }
        edit_payload = {
            "summary": "在线编辑:共 1 处改动。",
            "changed_sheet_count": 1,
            "changed_cell_count": 1,
            "sheet_changes": [],
            "cell_changes": [modified_cell],
        }
        service.audit_service.log_action(
            actor="manager_user",
            action="edit_rule_spreadsheet",
            resource_type=rule.asset_type,
            resource_id=rule.id,
            after_json=edit_payload,
        )

        records = service.list_spreadsheet_change_records(rule.id)

        assert len(records) == 1
        assert records[0].actor == "manager_user"
        assert records[0].changed_cell_count == 1
        assert records[0].cell_changes[0].cell == "B2"
def test_editable_spreadsheet_onlyoffice_config_enables_forcesave(monkeypatch) -> None:
    """The OnlyOffice editor config for a rule spreadsheet is in edit mode with forcesave on.

    ``resolve_onlyoffice_settings`` in ``app.services.agent_assets`` is
    patched to return an enabled configuration before the service is
    constructed.
    """

    def enabled_onlyoffice_settings() -> OnlyOfficeRuntimeConfig:
        # Stand-in for the real settings resolver.
        return OnlyOfficeRuntimeConfig(
            enabled=True,
            public_url="http://onlyoffice.example.com",
            backend_url="http://backend.example.com",
            jwt_secret="secret",
        )

    target_code = "rule.expense.company_travel_expense_reimbursement"
    with build_session() as db:
        monkeypatch.setattr(
            "app.services.agent_assets.resolve_onlyoffice_settings",
            enabled_onlyoffice_settings,
        )
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        finance_user = CurrentUserContext(
            username="finance_user",
            name="财务人员",
            role_codes=["finance"],
            is_admin=False,
        )
        config = service.build_rule_spreadsheet_onlyoffice_config(rule.id, finance_user)

        customization = config.config["editorConfig"]["customization"]
        assert config.config["editorConfig"]["mode"] == "edit"
        assert customization["forcesave"] is True
def test_version_timeline_contains_created_review_and_publish_events() -> None:
    """The version timeline records created, submitted, approved and published events.

    Creates v1.1.1 on the seeded travel risk control rule, submits a
    pending review, records an approved review, activates the asset and
    then checks the timeline entries for v1.1.1.
    """
    target_code = "rule.expense.travel_risk_control_standard"
    draft_version = "v1.1.1"
    # (reviewer, review status, review note, actor) for each review step, in order.
    review_steps = (
        ("finance_user", AgentReviewStatus.PENDING, "请审核", "finance_user"),
        ("manager_user", AgentReviewStatus.APPROVED, "可以上线", "manager_user"),
    )
    with build_session() as db:
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        service.create_version(
            rule.id,
            AgentAssetVersionCreate(
                version=draft_version,
                content="# 工作稿",
                content_type=AgentAssetContentType.MARKDOWN,
                change_note="补充口径",
                created_by="finance_user",
            ),
            actor="finance_user",
        )

        for reviewer, review_status, review_note, actor in review_steps:
            service.create_review(
                rule.id,
                AgentAssetReviewCreate(
                    version=draft_version,
                    reviewer=reviewer,
                    review_status=review_status,
                    review_note=review_note,
                ),
                actor=actor,
            )

        service.activate_asset(rule.id, actor="manager_user")

        timeline = service.list_version_timeline(rule.id)
        event_types = [entry.event_type for entry in timeline if entry.version == draft_version]
        for expected_event in ("created", "submitted", "approved", "published"):
            assert expected_event in event_types
2026-05-11 03:51:24 +00:00
def test_agent_asset_service_returns_recent_versions_for_rule_detail() -> None:
    """The attachment submission rule detail exposes its current content and recent versions."""
    target_code = "rule.expense.attachment_submission_requirements"
    with build_session() as db:
        service = AgentAssetService(db)
        rule = next(
            filter(
                lambda asset: asset.code == target_code,
                service.list_assets(asset_type=AgentAssetType.RULE.value),
            )
        )

        detail = service.get_asset(rule.id)

        assert detail is not None
        assert detail.current_version == "v1.0.0"
        assert detail.current_version_content_type == AgentAssetContentType.MARKDOWN.value
        assert isinstance(detail.current_version_content, str)

        # The version history must include both seeded versions and flag one as current.
        assert len(detail.recent_versions) >= 2
        assert any(entry.is_current for entry in detail.recent_versions)
        listed_versions = {entry.version for entry in detail.recent_versions}
        assert listed_versions >= {"v0.9.0", "v1.0.0"}

        assert detail.config_json["rule_template_key"] == "attachment_requirement_v1"
        assert "附件或单据不完整" in str(detail.current_version_content)
def test_agent_asset_service_returns_travel_policy_rule_detail() -> None:
    """The seeded travel risk control rule is active at v1.1.0 with an approved review."""
    target_code = "rule.expense.travel_risk_control_standard"
    with build_session() as db:
        service = AgentAssetService(db)
        rule_assets = service.list_assets(asset_type=AgentAssetType.RULE.value)
        rule = next(asset for asset in rule_assets if asset.code == target_code)

        detail = service.get_asset(rule.id)

        assert detail is not None
        assert detail.status == AgentAssetStatus.ACTIVE.value
        assert detail.current_version == "v1.1.0"
        assert detail.latest_review is not None
        assert detail.latest_review.review_status == AgentReviewStatus.APPROVED.value

        # Both phrases must appear in the current version's content.
        for expected_phrase in ("行程闭环", "住宿标准、飞机舱位和火车席别"):
            assert expected_phrase in str(detail.current_version_content)
2026-05-11 03:51:24 +00:00
def test_agent_run_service_lists_seeded_trace_data() -> None:
    """At least three runs are listed, some with tool calls and some with a semantic parse."""
    with build_session() as db:
        run_service = AgentRunService(db)
        runs = run_service.list_runs()

        assert len(runs) >= 3
        assert any(bool(run.tool_calls) for run in runs)
        assert not all(run.semantic_parse is None for run in runs)
def test_agent_run_service_creates_run_and_persists_error_message() -> None:
    """A failed run can be created and read back with its error message and summary."""
    with build_session() as db:
        run_service = AgentRunService(db)
        run_fields = {
            "agent": AgentName.ORCHESTRATOR.value,
            "source": AgentRunSource.SYSTEM_EVENT.value,
            "status": AgentRunStatus.FAILED.value,
            "error_message": "simulated failure",
            "result_summary": "failed to route request",
        }
        created = run_service.create_run(**run_fields)

        fetched = run_service.get_run(created.run_id)

        assert fetched is not None
        assert fetched.run_id.startswith("run_")
        assert fetched.status == AgentRunStatus.FAILED.value
        assert fetched.error_message == "simulated failure"
        assert fetched.result_summary == "failed to route request"
def test_agent_asset_creation_writes_audit_log() -> None:
    """Creating an asset leaves a ``create_agent_asset`` entry in the audit log for that asset."""
    with build_session() as db:
        service = AgentAssetService(db)
        skill_payload = AgentAssetCreate(
            asset_type=AgentAssetType.SKILL,
            code=f"skill.test.{uuid.uuid4().hex[:8]}",
            name="测试技能",
            description="用于测试审计日志写入。",
            domain=AgentAssetDomain.KNOWLEDGE,
            scenario_json=["knowledge", "query"],
            owner="pytest",
            reviewer="reviewer",
            status=AgentAssetStatus.DRAFT,
            config_json={"enabled": True},
        )
        created = service.create_asset(skill_payload, actor="pytest")

        audit_service = AuditLogService(db)
        logs = audit_service.list_logs(resource_id=created.id)

        assert any(entry.action == "create_agent_asset" for entry in logs)