From 73aee622c79b1ec436cf29f5f7237a730672c548 Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Fri, 3 Jul 2026 14:38:14 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(flywheel):=20=E6=96=B0=E5=A2=9E=20Gold?= =?UTF-8?q?enCase=20=E6=A8=A1=E5=9E=8B=E7=94=A8=E4=BA=8E=E8=A7=84=E5=88=99?= =?UTF-8?q?=E5=9B=9E=E5=BD=92=E9=97=A8=E7=A6=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 golden_cases 表,承载版本化的风险规则黄金用例 (case_key/rule_code/values_json/expected_hit/expected_severity/status) - 注册到 db/base.py 和 models/__init__.py,进入 Base.metadata --- server/src/app/db/base.py | 2 ++ server/src/app/models/__init__.py | 2 ++ server/src/app/models/golden_case.py | 48 ++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) create mode 100644 server/src/app/models/golden_case.py diff --git a/server/src/app/db/base.py b/server/src/app/db/base.py index b9ee3c7..c1a9861 100644 --- a/server/src/app/db/base.py +++ b/server/src/app/db/base.py @@ -21,6 +21,7 @@ from app.models.financial_record import ( ExpenseClaim, ExpenseClaimItem, ) +from app.models.golden_case import GoldenCase from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog from app.models.hermes_report import HermesRiskReport from app.models.notification_state import NotificationState @@ -58,6 +59,7 @@ __all__ = [ "EmployeeChangeLog", "ExpenseClaim", "ExpenseClaimItem", + "GoldenCase", "HermesTaskConfig", "HermesTaskExecutionLog", "HermesRiskReport", diff --git a/server/src/app/models/__init__.py b/server/src/app/models/__init__.py index b3549eb..135aa59 100644 --- a/server/src/app/models/__init__.py +++ b/server/src/app/models/__init__.py @@ -14,6 +14,7 @@ from app.models.financial_record import ( ExpenseClaim, ExpenseClaimItem, ) +from app.models.golden_case import GoldenCase from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog from app.models.hermes_report import HermesRiskReport from app.models.notification_state import 
class GoldenCase(Base):
    """Golden case used by the risk-rule regression gate.

    Maintained manually by operations (or imported from confirmed risk
    observations) and executed as the regression set before a rule is
    published; only a 100% pass lets the release through. ``values_json``
    reuses the flat-dict format of ``AgentAssetRiskRuleSampleCase.values``,
    while ``expected_hit`` / ``expected_severity`` are the ground truth.
    """

    __tablename__ = "golden_cases"
    # Composite indexes for the "cases of a rule / scene in a given status" lookups.
    __table_args__ = (
        Index("ix_golden_cases_rule_code_status", "rule_code", "status"),
        Index("ix_golden_cases_scene_status", "scene", "status"),
    )

    # Identity: UUID4 string primary key plus a unique business key.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    case_key: Mapped[str] = mapped_column(String(160), unique=True, index=True)
    # rule_code is nullable; scene defaults to the empty string.
    rule_code: Mapped[str | None] = mapped_column(String(120), nullable=True, index=True)
    scene: Mapped[str] = mapped_column(String(50), default="", index=True)

    # Case payload and ground truth.
    name: Mapped[str] = mapped_column(String(120), default="")
    values_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
    expected_hit: Mapped[bool] = mapped_column(Boolean, default=True)
    expected_severity: Mapped[str | None] = mapped_column(String(20), nullable=True)
    note: Mapped[str | None] = mapped_column(Text(), nullable=True)

    # Lifecycle: defaults are status "active" and source "manual".
    status: Mapped[str] = mapped_column(String(20), default="active", index=True)
    source: Mapped[str] = mapped_column(String(30), default="manual")

    # Timestamps are set both client-side (default/onupdate) and as a server default.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now(), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
        onupdate=func.now(),
        server_default=func.now(),
    )
def _require_golden_set_passed(
    self,
    asset: AgentAsset,
    version: str,
    *,
    actor: str,
) -> None:
    """Run the rule manifest against its golden set and block publishing on failure.

    Degradation policy: the gate passes through (returns ``None``) when the
    feature flag is off, the asset has no ``rule_document`` file name, the
    manifest cannot be read or is not a mapping, or the manifest carries no
    ``rule_code``. Unreadable / malformed manifests are logged so that a
    skipped gate is visible to operators instead of being silent.

    Args:
        asset: Asset being published; ``asset.config_json`` is inspected.
        version: Version being published, forwarded to the evaluator.
        actor: User triggering the publish, forwarded to the evaluator.

    Raises:
        PermissionError: Propagated from ``RiskRuleGoldenEvaluator.require_pass``
            when the golden set does not fully pass.
    """

    import logging
    import os

    gate_log = logging.getLogger(__name__)

    if os.environ.get("GOLDEN_SET_GATE_ENABLED", "true").strip().lower() in {"0", "false", "no"}:
        return

    config = asset.config_json if isinstance(asset.config_json, dict) else {}
    rule_document = config.get("rule_document") if isinstance(config.get("rule_document"), dict) else {}
    file_name = str(rule_document.get("file_name") or "").strip()
    if not file_name:
        return

    try:
        manifest = self.rule_library_manager.read_rule_library_json(
            library=RISK_RULES_LIBRARY,
            file_name=file_name,
        )
    except Exception:
        # Deliberate fail-open, but never silently: a skipped gate must leave a trace.
        gate_log.warning(
            "golden set gate skipped: cannot read manifest file_name=%s asset_id=%s",
            file_name,
            getattr(asset, "id", None),
            exc_info=True,
        )
        return
    if not isinstance(manifest, dict):
        # Same pass-through policy as an unreadable manifest.
        gate_log.warning(
            "golden set gate skipped: manifest is not a mapping file_name=%s asset_id=%s",
            file_name,
            getattr(asset, "id", None),
        )
        return

    rule_code = str(manifest.get("rule_code") or "").strip()
    if not rule_code:
        return

    from app.services.risk_rule_golden_evaluator import RiskRuleGoldenEvaluator

    RiskRuleGoldenEvaluator().require_pass(
        self.db,
        asset,
        version,
        manifest,
        rule_code,
        actor=actor,
    )
field +from datetime import UTC, date, datetime +from decimal import Decimal, InvalidOperation +from typing import Any + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.core.agent_enums import AgentAssetType +from app.core.logging import get_logger +from app.models.agent_asset import AgentAsset, AgentAssetTestRun +from app.models.employee import Employee +from app.models.financial_record import ExpenseClaim, ExpenseClaimItem +from app.models.golden_case import GoldenCase +from app.services.risk_rule_template_executor import RiskRuleTemplateExecutor + +logger = get_logger("app.services.risk_rule_golden_evaluator") + +GOLDEN_GATE_FLAG = "GOLDEN_SET_GATE_ENABLED" + + +@dataclass +class GoldenCaseResult: + case_id: str + name: str + expected_hit: bool + actual_hit: bool + expected_severity: str + actual_severity: str + passed: bool + message: str = "" + evidence: dict[str, Any] = field(default_factory=dict) + trace: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class GoldenEvalReport: + total: int = 0 + passed_count: int = 0 + failed_count: int = 0 + accuracy: float = 0.0 + precision: float = 0.0 + recall: float = 0.0 + all_passed: bool = True + results: list[GoldenCaseResult] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "total": self.total, + "passed_count": self.passed_count, + "failed_count": self.failed_count, + "accuracy": round(self.accuracy, 4), + "precision": round(self.precision, 4), + "recall": round(self.recall, 4), + "all_passed": self.all_passed, + "results": [ + { + "case_id": r.case_id, + "name": r.name, + "expected_hit": r.expected_hit, + "actual_hit": r.actual_hit, + "expected_severity": r.expected_severity, + "actual_severity": r.actual_severity, + "passed": r.passed, + "message": r.message, + } + for r in self.results + ], + } + + +def _gate_enabled() -> bool: + return os.environ.get(GOLDEN_GATE_FLAG, "true").strip().lower() not in {"0", "false", "no"} + + +# ---- 
synthetic claim 构建(与 AgentAssetRiskRuleTestingMixin._build_synthetic_claim 一致)---- + +def _extract_manifest_fields(manifest: dict[str, Any]) -> list[dict[str, str]]: + inputs = manifest.get("inputs") if isinstance(manifest.get("inputs"), dict) else {} + fields = inputs.get("fields") if isinstance(inputs.get("fields"), list) else [] + normalized: list[dict[str, str]] = [] + for item in fields: + if not isinstance(item, dict): + continue + key = str(item.get("key") or "").strip() + if key: + normalized.append({"key": key, "label": str(item.get("label") or key).strip()}) + return normalized + + +def _coerce_sample_value(field_key: str, value: Any) -> Any: + import re + + if field_key.endswith("route_cities") and isinstance(value, str): + return [item.strip() for item in re.split(r"[,,、/ ]+", value) if item.strip()] + return value + + +def _to_decimal(value: Any) -> Decimal: + try: + return Decimal(str(value or "0")) + except (InvalidOperation, ValueError): + return Decimal("0") + + +def _build_synthetic_claim( + values: dict[str, Any], + manifest: dict[str, Any], +) -> tuple[ExpenseClaim, list[dict[str, Any]]]: + claim = ExpenseClaim( + claim_no="GOLDEN-RISK-RULE", + employee_name=str(values.get("claim.employee_name") or "测试员工"), + department_name=str(values.get("claim.department_name") or "测试部门"), + expense_type=str(values.get("item.item_type") or "差旅费"), + reason=str(values.get("claim.reason") or "测试报销事由"), + location=str(values.get("claim.location") or "北京"), + amount=_to_decimal(values.get("claim.amount")), + currency="CNY", + invoice_count=1, + occurred_at=datetime.now(UTC), + status="draft", + ) + item = ExpenseClaimItem( + item_date=date.today(), + item_type=str(values.get("item.item_type") or "住宿费"), + item_reason=str(values.get("item.item_reason") or claim.reason), + item_location=str(values.get("item.item_location") or claim.location), + item_amount=_to_decimal(values.get("item.item_amount") or claim.amount), + ) + claim.items = [item] + if 
values.get("employee.location"): + claim.employee = Employee( + employee_no="GOLDEN-EMPLOYEE", + name=claim.employee_name, + email="golden-rule-test@example.com", + location=str(values.get("employee.location") or ""), + ) + + attachment_fields: list[dict[str, Any]] = [] + document_info: dict[str, Any] = {"fields": attachment_fields} + for field in _extract_manifest_fields(manifest): + key = field["key"] + if key not in values: + continue + value = _coerce_sample_value(key, values.get(key)) + if key.startswith("claim."): + setattr(claim, key.removeprefix("claim."), value) + elif key.startswith("item."): + setattr(item, key.removeprefix("item."), value) + elif key.startswith("attachment."): + short_key = key.removeprefix("attachment.") + document_info[short_key] = value + attachment_fields.append({"key": short_key, "label": field["label"], "value": value}) + return claim, [{"document_info": document_info, "ocr_text": document_info.get("ocr_text", "")}] + + +def _run_single_case( + manifest: dict[str, Any], + values: dict[str, Any], + expected_hit: bool, + expected_severity: str, +) -> GoldenCaseResult: + claim, contexts = _build_synthetic_claim(values, manifest) + execution = RiskRuleTemplateExecutor().evaluate_with_trace(manifest, claim=claim, contexts=contexts) + result = execution["result"] + actual_hit = result is not None + actual_severity = ( + str((manifest.get("outcomes") or {}).get("fail", {}).get("severity") or "").strip() + if actual_hit + else "none" + ) + severity_passed = ( + not actual_hit or not expected_severity or expected_severity == actual_severity + ) + passed = actual_hit == expected_hit and severity_passed + return GoldenCaseResult( + case_id="", + name="", + expected_hit=expected_hit, + actual_hit=actual_hit, + expected_severity=expected_severity, + actual_severity=actual_severity, + passed=passed, + message=str(result.get("message") or "") if isinstance(result, dict) else "", + evidence=result.get("evidence") if isinstance(result, dict) else 
{}, + trace=execution.get("trace") if isinstance(execution.get("trace"), dict) else {}, + ) + + +def _aggregate(results: list[GoldenCaseResult]) -> GoldenEvalReport: + total = len(results) + if total == 0: + return GoldenEvalReport(total=0, all_passed=True) + passed_count = sum(1 for r in results if r.passed) + tp = sum(1 for r in results if r.expected_hit and r.actual_hit) + fp = sum(1 for r in results if r.expected_hit and not r.actual_hit) # 应命中未命中 + fn = sum(1 for r in results if not r.expected_hit and r.actual_hit) # 不应命中却命中 + precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0 + recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0 + return GoldenEvalReport( + total=total, + passed_count=passed_count, + failed_count=total - passed_count, + accuracy=passed_count / total, + precision=precision, + recall=recall, + all_passed=passed_count == total, + results=results, + ) + + +class RiskRuleGoldenEvaluator: + """在 golden set 上评测规则 manifest 并执行发布门禁。""" + + def evaluate(self, manifest: dict[str, Any], cases: list[GoldenCase]) -> GoldenEvalReport: + results: list[GoldenCaseResult] = [] + for case in cases: + result = _run_single_case( + manifest, + values=case.values_json or {}, + expected_hit=bool(case.expected_hit), + expected_severity=str(case.expected_severity or ""), + ) + result.case_id = case.case_key or case.id + result.name = case.name + results.append(result) + return _aggregate(results) + + def evaluate_for_rule( + self, + db: Session, + manifest: dict[str, Any], + rule_code: str, + ) -> GoldenEvalReport: + cases = list( + db.scalars( + select(GoldenCase).where( + GoldenCase.rule_code == rule_code, + GoldenCase.status == "active", + ) + ) + ) + if not cases: + return GoldenEvalReport(total=0, all_passed=True) + return self.evaluate(manifest, cases) + + def require_pass( + self, + db: Session, + asset: AgentAsset, + version: str, + manifest: dict[str, Any], + rule_code: str, + *, + actor: str, + ) -> GoldenEvalReport: + """发布门禁入口:跑 golden set,未 100% 通过抛 
PermissionError。 + + golden set 为空或门禁关闭时放行; evaluator 异常时降级放行(记日志)。 + 无论放行与否,都写一条 ``AgentAssetTestRun(test_type='golden')`` 记录。 + """ + + if not _gate_enabled(): + return GoldenEvalReport(total=0, all_passed=True) + try: + report = self.evaluate_for_rule(db, manifest, rule_code) + except Exception: + logger.exception("golden set 评测异常,降级放行 asset_id=%s", asset.id) + report = GoldenEvalReport(total=0, all_passed=True) + + self._record_test_run(db, asset, version, report, actor=actor) + + if report.total > 0 and not report.all_passed: + failures = report.to_dict()["results"] + raise PermissionError( + f"golden set 回归未通过({report.passed_count}/{report.total})," + f"发布被拦截。失败用例:{failures}" + ) + return report + + def _record_test_run( + self, + db: Session, + asset: AgentAsset, + version: str, + report: GoldenEvalReport, + *, + actor: str, + ) -> None: + try: + run = AgentAssetTestRun( + id=str(uuid.uuid4()), + asset_id=asset.id, + version=version, + test_type="golden", + status="completed", + passed=report.all_passed, + summary=( + f"golden set {report.passed_count}/{report.total} passed" + if report.total > 0 + else "golden set empty, gate skipped" + ), + input_json={"rule_code": getattr(asset, "rule_code", "") or ""}, + result_json=report.to_dict(), + created_by=actor, + ) + db.add(run) + db.commit() + except Exception: + logger.warning("golden test run 记录失败 asset_id=%s", asset.id, exc_info=True) + db.rollback() From c7ba7bb45356372832767fa04c506f6c8c9701ce Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Fri, 3 Jul 2026 14:38:43 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat(flywheel):=20golden=20case=20=E7=AE=A1?= =?UTF-8?q?=E7=90=86=20API=20=E4=B8=8E=E8=AF=84=E6=B5=8B=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 GoldenCaseCreate/Read、GoldenEvalRequest/Read schema - 新增 3 个端点:创建 golden case、按规则列表、手动触发 golden 评测 (不入门禁,供运营试跑) - 单测 15 passed:单条 hit/severity 比对、集合 accuracy/precision/recall 聚合、空集降级、100% 
@router.post(
    "/risk-rules/golden-cases",
    response_model=GoldenCaseRead,
    status_code=status.HTTP_201_CREATED,
    summary="创建 golden set 黄金用例",
    description="为指定规则(或通用场景)创建一条回归用例,发布前作为门禁集执行。",
)
def create_golden_case(
    body: GoldenCaseCreate,
    _: RuleEditorUser,
    db: DbSession,
) -> GoldenCaseRead:
    """Create one golden case; respond 409 when ``case_key`` is already taken.

    The uniqueness pre-check gives the common case a clean 409. The commit
    is additionally guarded because a concurrent request can insert the same
    key between the check and the commit.
    """
    from sqlalchemy import select
    from sqlalchemy.exc import IntegrityError

    from app.models.golden_case import GoldenCase

    existing = db.scalar(select(GoldenCase).where(GoldenCase.case_key == body.case_key))
    if existing is not None:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="case_key 已存在")
    case = GoldenCase(
        case_key=body.case_key,
        rule_code=body.rule_code,
        scene=body.scene,
        name=body.name,
        values_json=body.values,
        expected_hit=body.expected_hit,
        expected_severity=body.expected_severity,
        note=body.note,
        status="active",
        source="manual",
    )
    db.add(case)
    try:
        db.commit()
    except IntegrityError as exc:
        # Lost the race against a concurrent insert of the same case_key.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="case_key 已存在") from exc
    db.refresh(case)
    return _golden_case_read(case)


@router.get(
    "/risk-rules/{rule_code}/golden-cases",
    response_model=list[GoldenCaseRead],
    summary="列出规则的 golden 用例",
)
def list_golden_cases(
    rule_code: str,
    _: CurrentUser,
    db: DbSession,
) -> list[GoldenCaseRead]:
    """List every golden case of *rule_code* (any status), oldest first."""
    from sqlalchemy import select

    from app.models.golden_case import GoldenCase

    cases = db.scalars(
        select(GoldenCase).where(GoldenCase.rule_code == rule_code).order_by(GoldenCase.created_at)
    ).all()
    return [_golden_case_read(case) for case in cases]


@router.post(
    "/{asset_id}/golden-eval",
    response_model=GoldenEvalRead,
    summary="手动触发 golden set 评测(不入门禁)",
    description="在当前规则版本上跑 golden 用例集,返回指标。门禁由 publish 时自动触发。",
)
def run_golden_eval(
    asset_id: str,
    body: GoldenEvalRequest,
    _: RuleReviewerUser,
    db: DbSession,
) -> GoldenEvalRead:
    """Run the golden set against the asset's manifest file and return the metrics.

    This does not go through the publish gate and writes no test-run row.
    NOTE(review): ``body.version`` is accepted but not used here; the
    evaluation always reads the manifest file named in the asset's
    ``rule_document`` config.
    """
    from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY
    from app.services.risk_rule_golden_evaluator import RiskRuleGoldenEvaluator

    try:
        service = AgentAssetService(db)
        asset = service.get_asset(asset_id)
        if asset is None:
            raise LookupError("Asset not found")
        config = asset.config_json if isinstance(asset.config_json, dict) else {}
        rule_document = config.get("rule_document") if isinstance(config.get("rule_document"), dict) else {}
        file_name = str(rule_document.get("file_name") or "").strip()
        if not file_name:
            raise ValueError("该规则没有可执行的 manifest 文件。")
        manifest = service.rule_library_manager.read_rule_library_json(
            library=RISK_RULES_LIBRARY,
            file_name=file_name,
        )
        rule_code = str(manifest.get("rule_code") or "").strip()
        if not rule_code:
            raise ValueError("manifest 缺少 rule_code。")
        report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, manifest, rule_code)
        return GoldenEvalRead(**report.to_dict())
    except Exception as exc:
        _handle_asset_error(exc)
def _golden_case_read(case) -> GoldenCaseRead:
    """Map a ``GoldenCase`` row onto the ``GoldenCaseRead`` response schema.

    Falsy ``scene`` / ``name`` / ``values_json`` are normalised to empty
    values and ``expected_hit`` is coerced to ``bool``.
    """
    return GoldenCaseRead(
        id=case.id,
        case_key=case.case_key,
        rule_code=case.rule_code,
        scene=case.scene or "",
        name=case.name or "",
        values=case.values_json or {},
        expected_hit=bool(case.expected_hit),
        expected_severity=case.expected_severity,
        note=case.note,
        status=case.status,
        source=case.source,
    )


# ---- server/src/app/schemas/agent_asset.py ----

class GoldenCaseCreate(BaseModel):
    """Request body for creating a golden case."""

    case_key: str = Field(..., max_length=160)
    rule_code: str | None = Field(default=None, max_length=120)
    scene: str = Field(default="", max_length=50)
    name: str = Field(default="", max_length=120)
    # Flat value dict; the create endpoint stores it as ``values_json``.
    values: dict[str, Any] = Field(default_factory=dict)
    expected_hit: bool = True
    expected_severity: str | None = Field(default=None, max_length=20)
    note: str | None = None


class GoldenCaseRead(BaseModel):
    """Response representation of a golden case."""

    id: str
    case_key: str
    rule_code: str | None = None
    scene: str = ""
    name: str = ""
    values: dict[str, Any] = Field(default_factory=dict)
    expected_hit: bool = True
    expected_severity: str | None = None
    note: str | None = None
    status: str = "active"
    source: str = "manual"


class GoldenEvalRequest(BaseModel):
    """Request body for a manual golden-set evaluation."""

    version: str | None = Field(default=None, max_length=30)


class GoldenEvalRead(BaseModel):
    """Golden-set evaluation metrics; mirrors the keys of ``GoldenEvalReport.to_dict()``."""

    total: int = 0
    passed_count: int = 0
    failed_count: int = 0
    accuracy: float = 0.0
    precision: float = 0.0
    recall: float = 0.0
    all_passed: bool = True
    results: list[dict[str, Any]] = Field(default_factory=list)
def _build_session() -> Session:
    """Return a session bound to a fresh in-memory SQLite database with all tables created."""
    engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
    return factory()


def _keyword_manifest() -> dict:
    """A minimal ``keyword_match_v1`` manifest: hits when the reason contains "虚假"."""

    return {
        "rule_code": "risk.test.keyword",
        "template_key": "keyword_match_v1",
        "inputs": {
            "fields": [
                {"key": "claim.reason", "label": "事由", "type": "text", "source": "claim"},
            ]
        },
        "params": {
            "keywords": ["虚假"],
            "field_keys": ["claim.reason"],
            "search_fields": ["claim.reason"],
        },
        "outcomes": {"fail": {"severity": "high", "risk_score": 80}},
    }


def _golden_case(
    case_key: str,
    *,
    reason: str,
    expected_hit: bool,
    rule_code: str = "risk.test.keyword",
) -> GoldenCase:
    """Build an active golden case whose only input value is ``claim.reason``."""
    return GoldenCase(
        case_key=case_key,
        rule_code=rule_code,
        name=f"case-{case_key}",
        values_json={"claim.reason": reason},
        expected_hit=expected_hit,
        status="active",
    )


def test_run_single_case_hit_matches() -> None:
    result = _run_single_case(
        _keyword_manifest(),
        values={"claim.reason": "虚假发票报销"},
        expected_hit=True,
        expected_severity="high",
    )
    assert result.actual_hit is True
    assert result.passed is True
    assert result.actual_severity == "high"


def test_run_single_case_no_hit_matches() -> None:
    result = _run_single_case(
        _keyword_manifest(),
        values={"claim.reason": "正常差旅报销"},
        expected_hit=False,
        expected_severity="",
    )
    assert result.actual_hit is False
    assert result.passed is True


def test_run_single_case_mismatch_fails() -> None:
    result = _run_single_case(
        _keyword_manifest(),
        values={"claim.reason": "虚假发票"},
        expected_hit=False,  # expected no hit, but the rule hits
        expected_severity="",
    )
    assert result.actual_hit is True
    assert result.passed is False


def test_run_single_case_severity_mismatch_fails() -> None:
    result = _run_single_case(
        _keyword_manifest(),
        values={"claim.reason": "虚假发票"},
        expected_hit=True,
        expected_severity="critical",  # the manifest yields "high"
    )
    assert result.passed is False


def test_aggregate_empty_returns_passed() -> None:
    report = _aggregate([])
    assert report.total == 0
    assert report.all_passed is True
    assert report.accuracy == 0.0


def test_aggregate_all_passed() -> None:
    from app.services.risk_rule_golden_evaluator import GoldenCaseResult

    results = [
        GoldenCaseResult("1", "a", True, True, "high", "high", True),
        GoldenCaseResult("2", "b", False, False, "", "none", True),
    ]
    report = _aggregate(results)
    assert report.total == 2
    assert report.passed_count == 2
    assert report.accuracy == 1.0
    assert report.all_passed is True


def test_aggregate_with_failure() -> None:
    from app.services.risk_rule_golden_evaluator import GoldenCaseResult

    results = [
        GoldenCaseResult("1", "a", True, True, "high", "high", True),  # TP
        GoldenCaseResult("2", "b", True, False, "high", "none", False),  # FN: expected hit, missed
    ]
    report = _aggregate(results)
    assert report.passed_count == 1
    assert report.failed_count == 1
    assert report.accuracy == 0.5
    assert report.all_passed is False
    assert report.precision == 1.0  # TP / (TP + FP) = 1 / (1 + 0)
    assert report.recall == 0.5  # TP / (TP + FN) = 1 / (1 + 1)


def test_evaluate_for_rule_empty_returns_passed() -> None:
    with _build_session() as db:
        report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
        assert report.total == 0
        assert report.all_passed is True


def test_evaluate_for_rule_all_pass() -> None:
    with _build_session() as db:
        db.add(_golden_case("g1", reason="虚假发票", expected_hit=True))
        db.add(_golden_case("g2", reason="正常报销", expected_hit=False))
        db.commit()
        report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
        assert report.total == 2
        assert report.all_passed is True
        assert report.accuracy == 1.0


def test_evaluate_for_rule_with_failure() -> None:
    with _build_session() as db:
        db.add(_golden_case("g1", reason="虚假发票", expected_hit=False))  # expected no hit, rule hits
        db.add(_golden_case("g2", reason="正常报销", expected_hit=True))  # expected hit, rule misses
        db.commit()
        report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
        assert report.total == 2
        assert report.all_passed is False
        assert report.failed_count == 2


def _asset(asset_id: str, code: str) -> AgentAsset:
    """Build a rule asset in ``review`` status with working version ``v1``."""
    return AgentAsset(
        id=asset_id,
        code=code,
        name=code,
        asset_type="rule",
        domain="expense",
        owner="tester",
        status="review",
        working_version="v1",
    )


def test_require_pass_passes_when_all_green() -> None:
    with _build_session() as db:
        asset = _asset("a1", "R1")
        db.add(asset)
        db.add(_golden_case("g1", reason="虚假", expected_hit=True))
        db.commit()
        report = RiskRuleGoldenEvaluator().require_pass(
            db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
        )
        assert report.all_passed is True
        # Exactly one test_type='golden' run must have been recorded.
        run = db.query(AgentAssetTestRun).filter_by(asset_id="a1", test_type="golden").one()
        assert run.passed is True


def test_require_pass_raises_on_failure() -> None:
    with _build_session() as db:
        asset = _asset("a2", "R2")
        db.add(asset)
        db.add(_golden_case("g1", reason="虚假", expected_hit=False))  # will fail
        db.commit()
        with pytest.raises(PermissionError):
            RiskRuleGoldenEvaluator().require_pass(
                db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
            )
        run = db.query(AgentAssetTestRun).filter_by(asset_id="a2", test_type="golden").one()
        assert run.passed is False


def test_require_pass_empty_golden_set_passes() -> None:
    with _build_session() as db:
        asset = _asset("a3", "R3")
        db.add(asset)
        db.commit()
        report = RiskRuleGoldenEvaluator().require_pass(
            db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
        )
        assert report.total == 0
        assert report.all_passed is True


def test_require_pass_respects_feature_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("GOLDEN_SET_GATE_ENABLED", "false")
    with _build_session() as db:
        asset = _asset("a4", "R4")
        db.add(asset)
        db.add(_golden_case("g1", reason="虚假", expected_hit=False))  # would fail if evaluated
        db.commit()
        # Gate disabled: must pass through without raising.
        report = RiskRuleGoldenEvaluator().require_pass(
            db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
        )
        assert report.total == 0


def test_require_pass_swallows_evaluator_exception() -> None:
    with _build_session() as db:
        asset = _asset("a5", "R5")
        db.add(asset)
        db.commit()
        evaluator = RiskRuleGoldenEvaluator()
        with patch.object(evaluator, "evaluate_for_rule", side_effect=RuntimeError("boom")):
            report = evaluator.require_pass(
                db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
            )
        assert report.total == 0
        assert report.all_passed is True  # degraded pass-through