Files
X-Financial/server/tests/test_risk_rule_generation.py

331 lines
13 KiB
Python
Raw Normal View History

from __future__ import annotations
import json
from datetime import UTC, datetime
from decimal import Decimal
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.agent_enums import AgentAssetDomain, AgentAssetStatus, AgentReviewStatus
from app.db.base import Base
from app.models.agent_asset import AgentAsset
from app.models.financial_record import ExpenseClaim
from app.schemas.agent_asset import (
AgentAssetReviewCreate,
AgentAssetRiskRuleGenerateRequest,
AgentAssetRiskRuleReportRequest,
AgentAssetRiskRuleSampleTestRequest,
AgentAssetRiskRuleScenarioTestRequest,
AgentAssetRiskRuleSimulationRequest,
)
from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager
from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY
from app.services.agent_assets import AgentAssetService
from app.services.risk_rule_flow_diagram import (
RiskRuleFlowDiagramRenderer,
RiskRuleFlowDiagramSpec,
)
from app.services.risk_rule_generation import RiskRuleGenerationService
class NullRuntimeChatService:
def complete(self, *args, **kwargs) -> None:
return None
def build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
return session_factory()
def test_generate_risk_rule_asset_creates_draft_json_rule(tmp_path) -> None:
with build_session() as db:
service = RiskRuleGenerationService(
db,
rule_library_manager=AgentAssetRuleLibraryManager(rule_root=tmp_path / "rules"),
runtime_chat_service=NullRuntimeChatService(),
)
asset_id = service.generate_rule_asset(
AgentAssetRiskRuleGenerateRequest(
business_domain=AgentAssetDomain.EXPENSE,
expense_category="travel",
risk_level="high",
natural_language="住宿城市必须出现在本次差旅行程城市中,否则提示高风险。",
),
actor="pytest",
)
asset = db.get(AgentAsset, asset_id)
assert asset is not None
assert asset.status == AgentAssetStatus.DRAFT.value
assert asset.config_json["detail_mode"] == "json_risk"
assert asset.config_json["evaluator"] == "template_rule"
assert asset.config_json["expense_category"] == "travel"
assert asset.config_json["risk_category"] == "差旅费"
assert asset.scenario_json == ["差旅费"]
assert asset.current_version == "v0.1.0"
file_name = asset.config_json["rule_document"]["file_name"]
rule_path = tmp_path / "rules" / RISK_RULES_LIBRARY / file_name
payload = json.loads(rule_path.read_text(encoding="utf-8"))
assert payload["rule_code"] == asset.code
assert payload["applies_to"]["expense_categories"] == ["travel"]
assert payload["risk_category"] == "差旅费"
assert payload["metadata"]["expense_category"] == "travel"
assert payload["outcomes"]["fail"]["severity"] == "high"
assert payload["template_key"] == "field_compare_v1"
assert payload["metadata"]["natural_language"].startswith("住宿城市")
assert payload["inputs"]["fields"]
assert payload["flow_diagram_svg"].startswith("<svg")
assert 'width="760" height="280"' in payload["flow_diagram_svg"]
assert 'data-risk-flow-style="review-node-only"' in payload["flow_diagram_svg"]
assert "RULE FLOW" in payload["flow_diagram_svg"]
assert "进入复核" in payload["flow_diagram_svg"]
assert "" in payload["flow_diagram_svg"]
assert "" in payload["flow_diagram_svg"]
assert "#dc2626" in payload["flow_diagram_svg"]
assert "#fecaca" in payload["flow_diagram_svg"]
assert "#10a37f" not in payload["flow_diagram_svg"]
assert "#f97316" not in payload["flow_diagram_svg"]
assert "feDropShadow" not in payload["flow_diagram_svg"]
def test_risk_rule_flow_diagram_uses_risk_level_palette() -> None:
renderer = RiskRuleFlowDiagramRenderer()
def render(severity: str, label: str) -> str:
return renderer.render(
RiskRuleFlowDiagramSpec(
title="测试规则",
domain_label="报销",
severity=severity,
severity_label=label,
fields=(),
start="业务单据提交",
evidence="读取规则字段",
decision="判断是否命中风险",
basis="根据规则字段判断",
pass_text="未命中风险,继续流转",
fail_text=f"命中{label},进入复核",
)
)
assert "#2563eb" in render("low", "低风险")
assert "#f97316" in render("medium", "中风险")
high_svg = render("high", "高风险")
assert "#dc2626" in high_svg
assert high_svg.count("#dc2626") == 1
assert "#10a37f" not in high_svg
def test_risk_rule_requires_test_report_before_review_and_publish(tmp_path) -> None:
with build_session() as db:
manager = AgentAssetRuleLibraryManager(rule_root=tmp_path / "rules")
generator = RiskRuleGenerationService(
db,
rule_library_manager=manager,
runtime_chat_service=NullRuntimeChatService(),
)
asset_id = generator.generate_rule_asset(
AgentAssetRiskRuleGenerateRequest(
business_domain=AgentAssetDomain.EXPENSE,
risk_level="high",
natural_language="酒店发票城市必须与行程城市一致,不一致时标记高风险。",
),
actor="pytest",
)
service = AgentAssetService(db)
service.rule_library_manager = manager
asset = db.get(AgentAsset, asset_id)
assert asset is not None
try:
service.create_review(
asset_id,
AgentAssetReviewCreate(
version=asset.working_version or "v0.1.0",
reviewer="manager",
review_status=AgentReviewStatus.PENDING,
review_note="送审",
),
actor="pytest",
)
except PermissionError as exc:
assert "测试通过" in str(exc)
else:
raise AssertionError("未测试通过的风险规则不应允许提交审核")
simulation = service.simulate_risk_rule_message(
asset_id,
AgentAssetRiskRuleSimulationRequest(
message="我想仿真一张酒店报销单酒店发票城市上海申报目的地北京金额580元。",
),
)
assert simulation.execution_mode == "risk_rule_simulation"
assert simulation.ready is True
assert simulation.hit is True
assert simulation.severity == "high"
assert "不创建业务单据" in simulation.summary
assert service.get_latest_risk_rule_test_summary(asset_id).sample is None
blocked_simulation = service.simulate_risk_rule_message(
asset_id,
AgentAssetRiskRuleSimulationRequest(
message="请识别上传单据是否命中风险规则。",
attachments=[{"name": "hotel-invoice.pdf", "content_type": "application/pdf"}],
),
)
assert blocked_simulation.ready is False
assert blocked_simulation.stage == "needs_recognition"
assert blocked_simulation.hit is False
assert "尚未完成识别" in blocked_simulation.summary
db.add(
ExpenseClaim(
claim_no="TEST-CLAIM-001",
employee_name="张三",
department_name="财务部",
expense_type="住宿费",
reason="北京出差住宿",
location="北京",
amount=Decimal("300.00"),
currency="CNY",
invoice_count=0,
occurred_at=datetime.now(UTC),
created_at=datetime.now(UTC),
status="draft",
)
)
db.commit()
sample = service.run_risk_rule_sample_test(
asset_id,
AgentAssetRiskRuleSampleTestRequest(),
actor="pytest",
)
assert sample.passed is True
scenario = service.run_risk_rule_scenario_test(
asset_id,
AgentAssetRiskRuleScenarioTestRequest(intent="用最近30天的住宿报销单试运行"),
actor="pytest",
)
assert scenario.passed is True
assert scenario.result_json["total_count"] == 1
report = service.confirm_risk_rule_test_report(
asset_id,
AgentAssetRiskRuleReportRequest(confirm_passed=True),
actor="pytest",
)
assert report.passed is True
review = service.create_review(
asset_id,
AgentAssetReviewCreate(
version=asset.working_version or "v0.1.0",
reviewer="manager",
review_status=AgentReviewStatus.PENDING,
review_note="送审",
),
actor="pytest",
)
assert review.review_status == AgentReviewStatus.PENDING.value
published = service.publish_risk_rule(asset_id, actor="manager")
assert published.status == AgentAssetStatus.ACTIVE.value
assert published.published_version == asset.working_version
disabled = service.set_risk_rule_enabled(
asset_id,
enabled=False,
actor="manager",
)
assert disabled.config_json["enabled"] is False
rule_document = disabled.config_json["rule_document"]
manifest = manager.read_rule_library_json(
library=RISK_RULES_LIBRARY,
file_name=rule_document["file_name"],
)
assert manifest["enabled"] is False
attachment_required_id = generator.generate_rule_asset(
AgentAssetRiskRuleGenerateRequest(
business_domain=AgentAssetDomain.EXPENSE,
risk_level="medium",
natural_language="发票号码不能为空,缺失时进入中风险复核。",
requires_attachment=True,
),
actor="pytest",
)
attachment_required_asset = db.get(AgentAsset, attachment_required_id)
assert attachment_required_asset is not None
assert attachment_required_asset.config_json["requires_attachment"] is True
attachment_rule_document = attachment_required_asset.config_json["rule_document"]
attachment_manifest = manager.read_rule_library_json(
library=RISK_RULES_LIBRARY,
file_name=attachment_rule_document["file_name"],
)
assert attachment_manifest["requires_attachment"] is True
no_attachment_simulation = service.simulate_risk_rule_message(
attachment_required_id,
AgentAssetRiskRuleSimulationRequest(message="请测试这条规则。"),
)
assert no_attachment_simulation.ready is False
assert no_attachment_simulation.stage == "needs_attachment"
attachment_only_simulation = service.simulate_risk_rule_message(
attachment_required_id,
AgentAssetRiskRuleSimulationRequest(
message="请识别上传单据是否命中风险规则。",
attachments=[
{
"name": "invoice.pdf",
"content_type": "application/pdf",
"document_fields": [
{"key": "invoice_no", "label": "发票号码", "value": "INV-001"}
],
}
],
),
)
assert attachment_only_simulation.ready is False
assert attachment_only_simulation.stage == "needs_test_intent"
def test_delete_unpublished_risk_rule_removes_asset_and_json_file(tmp_path) -> None:
with build_session() as db:
manager = AgentAssetRuleLibraryManager(rule_root=tmp_path / "rules")
asset_id = RiskRuleGenerationService(
db,
rule_library_manager=manager,
runtime_chat_service=NullRuntimeChatService(),
).generate_rule_asset(
AgentAssetRiskRuleGenerateRequest(
business_domain=AgentAssetDomain.EXPENSE,
risk_level="medium",
natural_language="报销事由不能为空,缺失时进入中风险复核。",
),
actor="pytest",
)
asset = db.get(AgentAsset, asset_id)
assert asset is not None
file_name = asset.config_json["rule_document"]["file_name"]
rule_path = tmp_path / "rules" / RISK_RULES_LIBRARY / file_name
assert rule_path.exists()
service = AgentAssetService(db)
service.rule_library_manager = manager
service.delete_unpublished_asset(asset_id, actor="pytest")
assert db.get(AgentAsset, asset_id) is None
assert not rule_path.exists()