3 Commits

Author SHA1 Message Date
caoxiaozhu
7ced9d93bc chore(rules): 更新公司通信费报销规则表 2026-06-30 11:41:09 +08:00
caoxiaozhu
876cf342ac docs(dev-log): 记录多 task task2 时序缺陷 bug 修复 2026-06-30 11:40:46 +08:00
caoxiaozhu
43779f8f2c fix(web): 多 task 串行推进 task2 不再被预览生成时提前触发
onPreviewReadyForNextTask 在 task1 申请核对表刚生成、用户还没操作时就
提前拉起 task2,与用户后续在 task1 上的保存草稿/提交操作互相打架,导致
task2 完全无反应。移除该提前推进回调,统一由 onApplicationActionCompleted
在 task1 真正完成后再推进 task2。

- useWorkbenchAiApplicationPreviewFlow: 删除预览生成时的提前推进分支
- usePersonalWorkbenchAiMode: startModelPlannedApplicationPreview 不再传 onPreviewReadyForNextTask
- useWorkbenchAiActionRouter: 低置信确认按钮分支同步删除该回调
- 新增时序回归测试:预览生成不提前推进、保存草稿后才推进
- 更新两处源码正则断言为 doesNotMatch
2026-06-30 11:40:31 +08:00
16 changed files with 82 additions and 856 deletions

View File

@@ -0,0 +1,16 @@
# 多 task 串行推进时 task2 无法启动onPreviewReadyForNextTask 时序缺陷)
日期2026-06-30
文档路径document/development/2026-06-30/dev-logs/bugs/multi-task-next-task-blocked-by-preview-ready-timing.md
## 修复记录
- 11:18记录 bug 修复:用户在 AI 工作台输入"出差申请 + 招待费报销"等多 task 时task1出差申请保存草稿/提交成功后task2招待费报销完全无法启动界面停在"申请草稿已保存"。
- Git 提交检查:`git fetch --all --prune` 成功upstream `origin/main``HEAD..@{u}` 未发现 upstream 新提交;`@{u}..HEAD` 未发现本地 ahead 提交。工作区改动仅为本次 3 个源文件 + 2 个测试文件(另有一个预先存在的未提交改动 `server/rules/finance-rules/公司通信费报销规则.xlsx`,与本次无关)。
- 根因:`onPreviewReadyForNextTask` 回调在 task1 申请核对表**刚生成、用户还没看、还没点保存草稿**时就立刻触发 `startModelPlannedNextTask`,提前把 task2 招待费报销拉起(`startAiExpenseDraft` 会 push 一条"选择费用报销"用户消息 + 报销 prompt。两条流程的消息和状态互相打架用户再在 task1 上点保存草稿时 `onApplicationActionCompleted` 又试图拉起 task2但 task2 状态已被前面 `onPreviewReadyForNextTask` 搞乱,最终表现为"完全无反应"。运行时复现脚本时序铁证:预览生成后立即出现 `!!! onPreviewReadyForNextTask 触发(task1预览刚生成,用户还没操作)`与串行推进的正确语义task1 完成后才推进 task2冲突。这是早期实现的残留——引入 `onApplicationActionCompleted`task1 完成后触发)后,`onPreviewReadyForNextTask` 职责重叠且时序错误。
- 修改(前端 web3 个源文件):
- `web/src/composables/workbenchAiMode/useWorkbenchAiApplicationPreviewFlow.js`:删除 `startAiApplicationPreview` 预览生成后的 `else if (onPreviewReadyForNextTask ...)` 提前推进分支(原 L622-L628并加注释说明 task2 推进统一交给 `onApplicationActionCompleted` 在 task1 真正完成后触发。`executeInlineApplicationPreviewAction` 里 L466 的 `actionCompletedHandler` 回落逻辑保留不动(手动点保存草稿走 actionRouter 不传 options 回调,回落到模块级 `startModelPlannedNextTask`,这是正确的续跑路径)。
- `web/src/composables/workbenchAiMode/usePersonalWorkbenchAiMode.js``startModelPlannedApplicationPreview` 调用 `startAiApplicationPreview` 时删除 `onPreviewReadyForNextTask: startModelPlannedNextTask` 一行,只保留 `onApplicationActionCompleted: startModelPlannedNextTask`(原 L769
- `web/src/composables/workbenchAiMode/useWorkbenchAiActionRouter.js``ai_application_confirm_intent`(低置信确认按钮)分支删除 `onPreviewReadyForNextTask` 回调,只保留 `onApplicationActionCompleted`(原 L99-L104消除低置信路径的同样时序缺陷。
- 操作:先写复现脚本 `/tmp/repro-timing.mjs`applicationFlow + 两个回调)锁定时序根因——修复前预览生成后立即触发推进回调,修复后无提前触发;再按计划小步改 3 个源文件 + 2 个测试文件;未提交(工作区有预先存在的无关 xlsx 改动,未自动提交)。
- 验证:宿主机 node v22.22.3 跑 `node --test web/tests/workbench-ai-intent-planner-model.test.mjs web/tests/workbench-ai-action-router.test.mjs web/tests/workbench-ai-application-context-submit.test.mjs` 通过 27/27含新增的时序回归用例 `workbench application preview does not continue next task until draft is saved or submitted`:断言预览生成时 `continuedTasks.length === 0`、保存草稿后才推进 task2 且走模块级续跑回调、自动续跑时不展示重复的"继续处理"按钮);`npm --prefix web run build` 通过3.97s);复现脚本 `/tmp/repro-timing.mjs` 修复后事件序列只剩用户消息、无提前推进;真实 `http://localhost:5173/api/v1/steward/plans``/api/v1/steward/plans/stream` 采样确认该句子仍返回 `expense_application` + `reimbursement` 两个 task后端拆分正确本次未动后端
- 影响:用户输入框提交"2月20-23日去上海出差辅助国网仿生产服务器部署并且报销昨天的上午招待费2000元"等多 task 时task1 出差申请核对表生成后干净停下等用户操作,用户点保存草稿/直接提交成功后自动进入 task2 招待费报销(预填金额/时间/事由),不再出现两条流程打架导致 task2 完全无反应的问题。不影响后端、单 task 场景、autoSaveDraft 路径(它走 `executeInlineApplicationPreviewAction` 完成后触发 `onApplicationActionCompleted`,链路不变);低置信确认按钮路径也同步修复。

View File

@@ -43,10 +43,6 @@ from app.schemas.agent_asset import (
AgentAssetVersionCreate, AgentAssetVersionCreate,
AgentAssetVersionRead, AgentAssetVersionRead,
AgentAssetVersionTimelineItemRead, AgentAssetVersionTimelineItemRead,
GoldenCaseCreate,
GoldenCaseRead,
GoldenEvalRead,
GoldenEvalRequest,
) )
from app.schemas.common import ErrorResponse, PaginatedResponse from app.schemas.common import ErrorResponse, PaginatedResponse
from app.services.agent_assets import AgentAssetService from app.services.agent_assets import AgentAssetService
@@ -927,110 +923,3 @@ def get_agent_asset_version_timeline(
return AgentAssetService(db).list_version_timeline(asset_id) return AgentAssetService(db).list_version_timeline(asset_id)
except Exception as exc: except Exception as exc:
_handle_asset_error(exc) _handle_asset_error(exc)
@router.post(
"/risk-rules/golden-cases",
response_model=GoldenCaseRead,
status_code=status.HTTP_201_CREATED,
summary="创建 golden set 黄金用例",
description="为指定规则(或通用场景)创建一条回归用例,发布前作为门禁集执行。",
)
def create_golden_case(
body: GoldenCaseCreate,
_: RuleEditorUser,
db: DbSession,
) -> GoldenCaseRead:
from app.models.golden_case import GoldenCase
from sqlalchemy import select
existing = db.scalar(select(GoldenCase).where(GoldenCase.case_key == body.case_key))
if existing is not None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="case_key 已存在")
case = GoldenCase(
case_key=body.case_key,
rule_code=body.rule_code,
scene=body.scene,
name=body.name,
values_json=body.values,
expected_hit=body.expected_hit,
expected_severity=body.expected_severity,
note=body.note,
status="active",
source="manual",
)
db.add(case)
db.commit()
db.refresh(case)
return _golden_case_read(case)
@router.get(
"/risk-rules/{rule_code}/golden-cases",
response_model=list[GoldenCaseRead],
summary="列出规则的 golden 用例",
)
def list_golden_cases(
rule_code: str,
_: CurrentUser,
db: DbSession,
) -> list[GoldenCaseRead]:
from app.models.golden_case import GoldenCase
from sqlalchemy import select
cases = db.scalars(
select(GoldenCase).where(GoldenCase.rule_code == rule_code).order_by(GoldenCase.created_at)
).all()
return [_golden_case_read(case) for case in cases]
@router.post(
"/{asset_id}/golden-eval",
response_model=GoldenEvalRead,
summary="手动触发 golden set 评测(不入门禁)",
description="在当前规则版本上跑 golden 用例集,返回指标。门禁由 publish 时自动触发。",
)
def run_golden_eval(
asset_id: str,
body: GoldenEvalRequest,
_: RuleReviewerUser,
db: DbSession,
) -> GoldenEvalRead:
from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY
from app.services.risk_rule_golden_evaluator import RiskRuleGoldenEvaluator
try:
asset = AgentAssetService(db).get_asset(asset_id)
if asset is None:
raise LookupError("Asset not found")
config = asset.config_json if isinstance(asset.config_json, dict) else {}
rule_document = config.get("rule_document") if isinstance(config.get("rule_document"), dict) else {}
file_name = str(rule_document.get("file_name") or "").strip()
if not file_name:
raise ValueError("该规则没有可执行的 manifest 文件。")
manager = AgentAssetService(db).rule_library_manager
manifest = manager.read_rule_library_json(library=RISK_RULES_LIBRARY, file_name=file_name)
rule_code = str(manifest.get("rule_code") or "").strip()
if not rule_code:
raise ValueError("manifest 缺少 rule_code。")
version = body.version or asset.working_version or ""
report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, manifest, rule_code)
return GoldenEvalRead(**report.to_dict())
except Exception as exc:
_handle_asset_error(exc)
def _golden_case_read(case) -> GoldenCaseRead:
return GoldenCaseRead(
id=case.id,
case_key=case.case_key,
rule_code=case.rule_code,
scene=case.scene or "",
name=case.name or "",
values=case.values_json or {},
expected_hit=bool(case.expected_hit),
expected_severity=case.expected_severity,
note=case.note,
status=case.status,
source=case.source,
)

View File

@@ -21,7 +21,6 @@ from app.models.financial_record import (
ExpenseClaim, ExpenseClaim,
ExpenseClaimItem, ExpenseClaimItem,
) )
from app.models.golden_case import GoldenCase
from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog
from app.models.hermes_report import HermesRiskReport from app.models.hermes_report import HermesRiskReport
from app.models.notification_state import NotificationState from app.models.notification_state import NotificationState
@@ -59,7 +58,6 @@ __all__ = [
"EmployeeChangeLog", "EmployeeChangeLog",
"ExpenseClaim", "ExpenseClaim",
"ExpenseClaimItem", "ExpenseClaimItem",
"GoldenCase",
"HermesTaskConfig", "HermesTaskConfig",
"HermesTaskExecutionLog", "HermesTaskExecutionLog",
"HermesRiskReport", "HermesRiskReport",

View File

@@ -14,7 +14,6 @@ from app.models.financial_record import (
ExpenseClaim, ExpenseClaim,
ExpenseClaimItem, ExpenseClaimItem,
) )
from app.models.golden_case import GoldenCase
from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog from app.models.hermes_config import HermesTaskConfig, HermesTaskExecutionLog
from app.models.hermes_report import HermesRiskReport from app.models.hermes_report import HermesRiskReport
from app.models.notification_state import NotificationState from app.models.notification_state import NotificationState
@@ -50,7 +49,6 @@ __all__ = [
"EmployeeChangeLog", "EmployeeChangeLog",
"ExpenseClaim", "ExpenseClaim",
"ExpenseClaimItem", "ExpenseClaimItem",
"GoldenCase",
"HermesTaskConfig", "HermesTaskConfig",
"HermesTaskExecutionLog", "HermesTaskExecutionLog",
"HermesRiskReport", "HermesRiskReport",

View File

@@ -1,48 +0,0 @@
from __future__ import annotations
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import Boolean, DateTime, Index, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import JSON
from app.db.base_class import Base
class GoldenCase(Base):
"""风险规则回归门禁用的黄金用例。
由运营手动维护(或从已确认风险观测导入),在规则发布前作为回归集执行,
100% 通过才放行。``values_json`` 复用 ``AgentAssetRiskRuleSampleCase.values``
的扁平字典格式,``expected_hit`` / ``expected_severity`` 作为 ground truth。
"""
__tablename__ = "golden_cases"
__table_args__ = (
Index("ix_golden_cases_rule_code_status", "rule_code", "status"),
Index("ix_golden_cases_scene_status", "scene", "status"),
)
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
case_key: Mapped[str] = mapped_column(String(160), unique=True, index=True)
rule_code: Mapped[str | None] = mapped_column(String(120), nullable=True, index=True)
scene: Mapped[str] = mapped_column(String(50), default="", index=True)
name: Mapped[str] = mapped_column(String(120), default="")
values_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
expected_hit: Mapped[bool] = mapped_column(Boolean, default=True)
expected_severity: Mapped[str | None] = mapped_column(String(20), nullable=True)
note: Mapped[str | None] = mapped_column(Text(), nullable=True)
status: Mapped[str] = mapped_column(String(20), default="active", index=True)
source: Mapped[str] = mapped_column(String(30), default="manual")
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now(), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
onupdate=func.now(),
server_default=func.now(),
)

View File

@@ -204,46 +204,6 @@ class AgentAssetRiskRuleReportRequest(BaseModel):
note: str | None = Field(default=None, max_length=1000) note: str | None = Field(default=None, max_length=1000)
class GoldenCaseCreate(BaseModel):
case_key: str = Field(..., max_length=160)
rule_code: str | None = Field(default=None, max_length=120)
scene: str = Field(default="", max_length=50)
name: str = Field(default="", max_length=120)
values: dict[str, Any] = Field(default_factory=dict)
expected_hit: bool = True
expected_severity: str | None = Field(default=None, max_length=20)
note: str | None = None
class GoldenCaseRead(BaseModel):
id: str
case_key: str
rule_code: str | None = None
scene: str = ""
name: str = ""
values: dict[str, Any] = Field(default_factory=dict)
expected_hit: bool = True
expected_severity: str | None = None
note: str | None = None
status: str = "active"
source: str = "manual"
class GoldenEvalRequest(BaseModel):
version: str | None = Field(default=None, max_length=30)
class GoldenEvalRead(BaseModel):
total: int = 0
passed_count: int = 0
failed_count: int = 0
accuracy: float = 0.0
precision: float = 0.0
recall: float = 0.0
all_passed: bool = True
results: list[dict[str, Any]] = Field(default_factory=list)
class AgentAssetRiskRuleSimulationAttachment(BaseModel): class AgentAssetRiskRuleSimulationAttachment(BaseModel):
name: str = Field(default="", max_length=240) name: str = Field(default="", max_length=240)
content_type: str | None = Field(default=None, max_length=120) content_type: str | None = Field(default=None, max_length=120)

View File

@@ -39,9 +39,6 @@ class AgentAssetRiskRulePublishMixin:
if not self.get_latest_risk_rule_test_summary(asset, version=version).test_passed: if not self.get_latest_risk_rule_test_summary(asset, version=version).test_passed:
raise PermissionError("当前规则版本尚未完成测试通过确认,不能发布。") raise PermissionError("当前规则版本尚未完成测试通过确认,不能发布。")
# golden set 回归门禁:在 golden 用例集上跑规则,未 100% 通过则拦截发布。
self._require_golden_set_passed(asset, version, actor=actor)
before = self._asset_snapshot(asset) before = self._asset_snapshot(asset)
self._ensure_approved_review(asset, version=version, actor=actor, note="发布上线前审核通过。") self._ensure_approved_review(asset, version=version, actor=actor, note="发布上线前审核通过。")
asset.reviewer = actor asset.reviewer = actor
@@ -179,49 +176,6 @@ class AgentAssetRiskRulePublishMixin:
) )
) )
def _require_golden_set_passed(
self,
asset: AgentAsset,
version: str,
*,
actor: str,
) -> None:
"""在 golden set 上跑当前规则 manifest未 100% 通过则拦截发布。
降级策略feature flag 关闭 / 无 rule_document / 无 golden case /
evaluator 异常 → 一律放行,不阻塞发布主链路。
"""
import os
if os.environ.get("GOLDEN_SET_GATE_ENABLED", "true").strip().lower() in {"0", "false", "no"}:
return
config = asset.config_json if isinstance(asset.config_json, dict) else {}
rule_document = config.get("rule_document") if isinstance(config.get("rule_document"), dict) else {}
file_name = str(rule_document.get("file_name") or "").strip()
if not file_name:
return
try:
manifest = self.rule_library_manager.read_rule_library_json(
library=RISK_RULES_LIBRARY,
file_name=file_name,
)
except Exception:
return
rule_code = str(manifest.get("rule_code") or "").strip()
if not rule_code:
return
from app.services.risk_rule_golden_evaluator import RiskRuleGoldenEvaluator
RiskRuleGoldenEvaluator().require_pass(
self.db,
asset,
version,
manifest,
rule_code,
actor=actor,
)
@staticmethod @staticmethod
def _config_from_published_manifest( def _config_from_published_manifest(
manifest: dict[str, Any], manifest: dict[str, Any],

View File

@@ -1,329 +0,0 @@
"""风险规则 golden set 评测器与发布门禁。
在版本化的黄金用例集(:class:`GoldenCase`)上跑规则 manifest计算
accuracy/precision/recall并按"100% 通过"的硬阈值做发布门禁。
执行链路完全复用现有能力:
- ``RiskRuleTemplateExecutor.evaluate_with_trace`` 跑规则
- ``AgentAssetRiskRuleTestingMixin`` 的 static helpers 组装 synthetic claim
- 单条比对逻辑与 ``_run_sample_case`` 保持一致
门禁语义与现有 ``test_passed`` 一致:未通过抛 ``PermissionError``
同时写一条 ``AgentAssetTestRun(test_type='golden')`` 记录结果。
"""
from __future__ import annotations
import os
import uuid
from dataclasses import dataclass, field
from datetime import UTC, date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.agent_enums import AgentAssetType
from app.core.logging import get_logger
from app.models.agent_asset import AgentAsset, AgentAssetTestRun
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.models.golden_case import GoldenCase
from app.services.risk_rule_template_executor import RiskRuleTemplateExecutor
logger = get_logger("app.services.risk_rule_golden_evaluator")
GOLDEN_GATE_FLAG = "GOLDEN_SET_GATE_ENABLED"
@dataclass
class GoldenCaseResult:
case_id: str
name: str
expected_hit: bool
actual_hit: bool
expected_severity: str
actual_severity: str
passed: bool
message: str = ""
evidence: dict[str, Any] = field(default_factory=dict)
trace: dict[str, Any] = field(default_factory=dict)
@dataclass
class GoldenEvalReport:
total: int = 0
passed_count: int = 0
failed_count: int = 0
accuracy: float = 0.0
precision: float = 0.0
recall: float = 0.0
all_passed: bool = True
results: list[GoldenCaseResult] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"total": self.total,
"passed_count": self.passed_count,
"failed_count": self.failed_count,
"accuracy": round(self.accuracy, 4),
"precision": round(self.precision, 4),
"recall": round(self.recall, 4),
"all_passed": self.all_passed,
"results": [
{
"case_id": r.case_id,
"name": r.name,
"expected_hit": r.expected_hit,
"actual_hit": r.actual_hit,
"expected_severity": r.expected_severity,
"actual_severity": r.actual_severity,
"passed": r.passed,
"message": r.message,
}
for r in self.results
],
}
def _gate_enabled() -> bool:
return os.environ.get(GOLDEN_GATE_FLAG, "true").strip().lower() not in {"0", "false", "no"}
# ---- synthetic claim 构建(与 AgentAssetRiskRuleTestingMixin._build_synthetic_claim 一致)----
def _extract_manifest_fields(manifest: dict[str, Any]) -> list[dict[str, str]]:
inputs = manifest.get("inputs") if isinstance(manifest.get("inputs"), dict) else {}
fields = inputs.get("fields") if isinstance(inputs.get("fields"), list) else []
normalized: list[dict[str, str]] = []
for item in fields:
if not isinstance(item, dict):
continue
key = str(item.get("key") or "").strip()
if key:
normalized.append({"key": key, "label": str(item.get("label") or key).strip()})
return normalized
def _coerce_sample_value(field_key: str, value: Any) -> Any:
import re
if field_key.endswith("route_cities") and isinstance(value, str):
return [item.strip() for item in re.split(r"[,,、/ ]+", value) if item.strip()]
return value
def _to_decimal(value: Any) -> Decimal:
try:
return Decimal(str(value or "0"))
except (InvalidOperation, ValueError):
return Decimal("0")
def _build_synthetic_claim(
values: dict[str, Any],
manifest: dict[str, Any],
) -> tuple[ExpenseClaim, list[dict[str, Any]]]:
claim = ExpenseClaim(
claim_no="GOLDEN-RISK-RULE",
employee_name=str(values.get("claim.employee_name") or "测试员工"),
department_name=str(values.get("claim.department_name") or "测试部门"),
expense_type=str(values.get("item.item_type") or "差旅费"),
reason=str(values.get("claim.reason") or "测试报销事由"),
location=str(values.get("claim.location") or "北京"),
amount=_to_decimal(values.get("claim.amount")),
currency="CNY",
invoice_count=1,
occurred_at=datetime.now(UTC),
status="draft",
)
item = ExpenseClaimItem(
item_date=date.today(),
item_type=str(values.get("item.item_type") or "住宿费"),
item_reason=str(values.get("item.item_reason") or claim.reason),
item_location=str(values.get("item.item_location") or claim.location),
item_amount=_to_decimal(values.get("item.item_amount") or claim.amount),
)
claim.items = [item]
if values.get("employee.location"):
claim.employee = Employee(
employee_no="GOLDEN-EMPLOYEE",
name=claim.employee_name,
email="golden-rule-test@example.com",
location=str(values.get("employee.location") or ""),
)
attachment_fields: list[dict[str, Any]] = []
document_info: dict[str, Any] = {"fields": attachment_fields}
for field in _extract_manifest_fields(manifest):
key = field["key"]
if key not in values:
continue
value = _coerce_sample_value(key, values.get(key))
if key.startswith("claim."):
setattr(claim, key.removeprefix("claim."), value)
elif key.startswith("item."):
setattr(item, key.removeprefix("item."), value)
elif key.startswith("attachment."):
short_key = key.removeprefix("attachment.")
document_info[short_key] = value
attachment_fields.append({"key": short_key, "label": field["label"], "value": value})
return claim, [{"document_info": document_info, "ocr_text": document_info.get("ocr_text", "")}]
def _run_single_case(
manifest: dict[str, Any],
values: dict[str, Any],
expected_hit: bool,
expected_severity: str,
) -> GoldenCaseResult:
claim, contexts = _build_synthetic_claim(values, manifest)
execution = RiskRuleTemplateExecutor().evaluate_with_trace(manifest, claim=claim, contexts=contexts)
result = execution["result"]
actual_hit = result is not None
actual_severity = (
str((manifest.get("outcomes") or {}).get("fail", {}).get("severity") or "").strip()
if actual_hit
else "none"
)
severity_passed = (
not actual_hit or not expected_severity or expected_severity == actual_severity
)
passed = actual_hit == expected_hit and severity_passed
return GoldenCaseResult(
case_id="",
name="",
expected_hit=expected_hit,
actual_hit=actual_hit,
expected_severity=expected_severity,
actual_severity=actual_severity,
passed=passed,
message=str(result.get("message") or "") if isinstance(result, dict) else "",
evidence=result.get("evidence") if isinstance(result, dict) else {},
trace=execution.get("trace") if isinstance(execution.get("trace"), dict) else {},
)
def _aggregate(results: list[GoldenCaseResult]) -> GoldenEvalReport:
total = len(results)
if total == 0:
return GoldenEvalReport(total=0, all_passed=True)
passed_count = sum(1 for r in results if r.passed)
tp = sum(1 for r in results if r.expected_hit and r.actual_hit)
fp = sum(1 for r in results if r.expected_hit and not r.actual_hit) # 应命中未命中
fn = sum(1 for r in results if not r.expected_hit and r.actual_hit) # 不应命中却命中
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
return GoldenEvalReport(
total=total,
passed_count=passed_count,
failed_count=total - passed_count,
accuracy=passed_count / total,
precision=precision,
recall=recall,
all_passed=passed_count == total,
results=results,
)
class RiskRuleGoldenEvaluator:
"""在 golden set 上评测规则 manifest 并执行发布门禁。"""
def evaluate(self, manifest: dict[str, Any], cases: list[GoldenCase]) -> GoldenEvalReport:
results: list[GoldenCaseResult] = []
for case in cases:
result = _run_single_case(
manifest,
values=case.values_json or {},
expected_hit=bool(case.expected_hit),
expected_severity=str(case.expected_severity or ""),
)
result.case_id = case.case_key or case.id
result.name = case.name
results.append(result)
return _aggregate(results)
def evaluate_for_rule(
self,
db: Session,
manifest: dict[str, Any],
rule_code: str,
) -> GoldenEvalReport:
cases = list(
db.scalars(
select(GoldenCase).where(
GoldenCase.rule_code == rule_code,
GoldenCase.status == "active",
)
)
)
if not cases:
return GoldenEvalReport(total=0, all_passed=True)
return self.evaluate(manifest, cases)
def require_pass(
self,
db: Session,
asset: AgentAsset,
version: str,
manifest: dict[str, Any],
rule_code: str,
*,
actor: str,
) -> GoldenEvalReport:
"""发布门禁入口:跑 golden set未 100% 通过抛 PermissionError。
golden set 为空或门禁关闭时放行; evaluator 异常时降级放行(记日志)。
无论放行与否,都写一条 ``AgentAssetTestRun(test_type='golden')`` 记录。
"""
if not _gate_enabled():
return GoldenEvalReport(total=0, all_passed=True)
try:
report = self.evaluate_for_rule(db, manifest, rule_code)
except Exception:
logger.exception("golden set 评测异常,降级放行 asset_id=%s", asset.id)
report = GoldenEvalReport(total=0, all_passed=True)
self._record_test_run(db, asset, version, report, actor=actor)
if report.total > 0 and not report.all_passed:
failures = report.to_dict()["results"]
raise PermissionError(
f"golden set 回归未通过({report.passed_count}/{report.total}"
f"发布被拦截。失败用例:{failures}"
)
return report
def _record_test_run(
self,
db: Session,
asset: AgentAsset,
version: str,
report: GoldenEvalReport,
*,
actor: str,
) -> None:
try:
run = AgentAssetTestRun(
id=str(uuid.uuid4()),
asset_id=asset.id,
version=version,
test_type="golden",
status="completed",
passed=report.all_passed,
summary=(
f"golden set {report.passed_count}/{report.total} passed"
if report.total > 0
else "golden set empty, gate skipped"
),
input_json={"rule_code": getattr(asset, "rule_code", "") or ""},
result_json=report.to_dict(),
created_by=actor,
)
db.add(run)
db.commit()
except Exception:
logger.warning("golden test run 记录失败 asset_id=%s", asset.id, exc_info=True)
db.rollback()

View File

@@ -1,262 +0,0 @@
from __future__ import annotations
from collections.abc import Generator
from datetime import datetime
from decimal import Decimal
from unittest.mock import MagicMock, patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.db.base import Base
from app.models.agent_asset import AgentAsset, AgentAssetTestRun
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.models.golden_case import GoldenCase
from app.services.risk_rule_golden_evaluator import (
GoldenEvalReport,
RiskRuleGoldenEvaluator,
_aggregate,
_run_single_case,
)
def _build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
return factory()
def _keyword_manifest() -> dict:
"""一个简单的 keyword_match_v1 manifestreason 含"虚假"则命中。"""
return {
"rule_code": "risk.test.keyword",
"template_key": "keyword_match_v1",
"inputs": {
"fields": [
{"key": "claim.reason", "label": "事由", "type": "text", "source": "claim"},
]
},
"params": {
"keywords": ["虚假"],
"field_keys": ["claim.reason"],
"search_fields": ["claim.reason"],
},
"outcomes": {"fail": {"severity": "high", "risk_score": 80}},
}
def _golden_case(
case_key: str,
*,
reason: str,
expected_hit: bool,
rule_code: str = "risk.test.keyword",
) -> GoldenCase:
return GoldenCase(
case_key=case_key,
rule_code=rule_code,
name=f"case-{case_key}",
values_json={"claim.reason": reason},
expected_hit=expected_hit,
status="active",
)
def test_run_single_case_hit_matches() -> None:
result = _run_single_case(
_keyword_manifest(),
values={"claim.reason": "虚假发票报销"},
expected_hit=True,
expected_severity="high",
)
assert result.actual_hit is True
assert result.passed is True
assert result.actual_severity == "high"
def test_run_single_case_no_hit_matches() -> None:
result = _run_single_case(
_keyword_manifest(),
values={"claim.reason": "正常差旅报销"},
expected_hit=False,
expected_severity="",
)
assert result.actual_hit is False
assert result.passed is True
def test_run_single_case_mismatch_fails() -> None:
result = _run_single_case(
_keyword_manifest(),
values={"claim.reason": "虚假发票"},
expected_hit=False, # 期望不命中,但实际命中
expected_severity="",
)
assert result.actual_hit is True
assert result.passed is False
def test_run_single_case_severity_mismatch_fails() -> None:
result = _run_single_case(
_keyword_manifest(),
values={"claim.reason": "虚假发票"},
expected_hit=True,
expected_severity="critical", # 实际是 high
)
assert result.passed is False
def test_aggregate_empty_returns_passed() -> None:
report = _aggregate([])
assert report.total == 0
assert report.all_passed is True
assert report.accuracy == 0.0
def test_aggregate_all_passed() -> None:
from app.services.risk_rule_golden_evaluator import GoldenCaseResult
results = [
GoldenCaseResult("1", "a", True, True, "high", "high", True),
GoldenCaseResult("2", "b", False, False, "", "none", True),
]
report = _aggregate(results)
assert report.total == 2
assert report.passed_count == 2
assert report.accuracy == 1.0
assert report.all_passed is True
def test_aggregate_with_failure() -> None:
from app.services.risk_rule_golden_evaluator import GoldenCaseResult
results = [
GoldenCaseResult("1", "a", True, True, "high", "high", True),
GoldenCaseResult("2", "b", True, False, "high", "none", False), # FP
]
report = _aggregate(results)
assert report.passed_count == 1
assert report.failed_count == 1
assert report.accuracy == 0.5
assert report.all_passed is False
assert report.precision == 0.5 # 1/(1+1)
def test_evaluate_for_rule_empty_returns_passed() -> None:
with _build_session() as db:
report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
assert report.total == 0
assert report.all_passed is True
def test_evaluate_for_rule_all_pass() -> None:
with _build_session() as db:
db.add(_golden_case("g1", reason="虚假发票", expected_hit=True))
db.add(_golden_case("g2", reason="正常报销", expected_hit=False))
db.commit()
report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
assert report.total == 2
assert report.all_passed is True
assert report.accuracy == 1.0
def test_evaluate_for_rule_with_failure() -> None:
with _build_session() as db:
db.add(_golden_case("g1", reason="虚假发票", expected_hit=False)) # 期望不命中但实际命中
db.add(_golden_case("g2", reason="正常报销", expected_hit=True)) # 期望命中但实际不命中
db.commit()
report = RiskRuleGoldenEvaluator().evaluate_for_rule(db, _keyword_manifest(), "risk.test.keyword")
assert report.total == 2
assert report.all_passed is False
assert report.failed_count == 2
def _asset(asset_id: str, code: str) -> AgentAsset:
return AgentAsset(
id=asset_id,
code=code,
name=code,
asset_type="rule",
domain="expense",
owner="tester",
status="review",
working_version="v1",
)
def test_require_pass_passes_when_all_green() -> None:
with _build_session() as db:
asset = _asset("a1", "R1")
db.add(asset)
db.add(_golden_case("g1", reason="虚假", expected_hit=True))
db.commit()
report = RiskRuleGoldenEvaluator().require_pass(
db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
)
assert report.all_passed is True
# 应写一条 test_type='golden' 记录
run = db.query(AgentAssetTestRun).filter_by(asset_id="a1", test_type="golden").one()
assert run.passed is True
def test_require_pass_raises_on_failure() -> None:
with _build_session() as db:
asset = _asset("a2", "R2")
db.add(asset)
db.add(_golden_case("g1", reason="虚假", expected_hit=False)) # 会失败
db.commit()
with pytest.raises(PermissionError):
RiskRuleGoldenEvaluator().require_pass(
db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
)
run = db.query(AgentAssetTestRun).filter_by(asset_id="a2", test_type="golden").one()
assert run.passed is False
def test_require_pass_empty_golden_set_passes() -> None:
with _build_session() as db:
asset = _asset("a3", "R3")
db.add(asset)
db.commit()
report = RiskRuleGoldenEvaluator().require_pass(
db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
)
assert report.total == 0
assert report.all_passed is True
def test_require_pass_respects_feature_flag(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GOLDEN_SET_GATE_ENABLED", "false")
with _build_session() as db:
asset = _asset("a4", "R4")
db.add(asset)
db.add(_golden_case("g1", reason="虚假", expected_hit=False)) # 本应失败
db.commit()
# 门禁关闭,应放行不抛异常
report = RiskRuleGoldenEvaluator().require_pass(
db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
)
assert report.total == 0
def test_require_pass_swallows_evaluator_exception() -> None:
with _build_session() as db:
asset = _asset("a5", "R5")
db.add(asset)
db.commit()
evaluator = RiskRuleGoldenEvaluator()
with patch.object(evaluator, "evaluate_for_rule", side_effect=RuntimeError("boom")):
report = evaluator.require_pass(
db, asset, "v1", _keyword_manifest(), "risk.test.keyword", actor="tester"
)
assert report.total == 0
assert report.all_passed is True # 降级放行

View File

@@ -766,7 +766,6 @@ export function usePersonalWorkbenchAiMode(props, emit) {
requestedSubmit: travelApplicationRequest.requestedSubmit, requestedSubmit: travelApplicationRequest.requestedSubmit,
submitRequiresConfirmation: travelApplicationRequest.submitRequiresConfirmation, submitRequiresConfirmation: travelApplicationRequest.submitRequiresConfirmation,
stewardRemainingTasks: travelApplicationRequest.stewardRemainingTasks, stewardRemainingTasks: travelApplicationRequest.stewardRemainingTasks,
onPreviewReadyForNextTask: startModelPlannedNextTask,
onApplicationActionCompleted: startModelPlannedNextTask onApplicationActionCompleted: startModelPlannedNextTask
} }
) )

View File

@@ -96,12 +96,6 @@ export function useWorkbenchAiActionRouter({
requestedSubmit: Boolean(actionPayload.requestedSubmit), requestedSubmit: Boolean(actionPayload.requestedSubmit),
submitRequiresConfirmation: Boolean(actionPayload.submitRequiresConfirmation), submitRequiresConfirmation: Boolean(actionPayload.submitRequiresConfirmation),
stewardRemainingTasks, stewardRemainingTasks,
onPreviewReadyForNextTask: (remainingTasks = []) => {
const nextTaskAction = buildNextTaskSuggestedAction({ steward_remaining_tasks: remainingTasks })
if (nextTaskAction) {
handleInlineSuggestedAction(nextTaskAction)
}
},
onApplicationActionCompleted: (remainingTasks = []) => { onApplicationActionCompleted: (remainingTasks = []) => {
const nextTaskAction = buildNextTaskSuggestedAction({ steward_remaining_tasks: remainingTasks }) const nextTaskAction = buildNextTaskSuggestedAction({ steward_remaining_tasks: remainingTasks })
if (nextTaskAction) { if (nextTaskAction) {

View File

@@ -619,13 +619,10 @@ export function useWorkbenchAiApplicationPreviewFlow({
userText: options.userMessage || '保存草稿', userText: options.userMessage || '保存草稿',
onApplicationActionCompleted: options.onApplicationActionCompleted onApplicationActionCompleted: options.onApplicationActionCompleted
}) })
} else if (
typeof options.onPreviewReadyForNextTask === 'function' &&
Array.isArray(previewMessage.stewardRemainingTasks) &&
previewMessage.stewardRemainingTasks.length
) {
options.onPreviewReadyForNextTask(previewMessage.stewardRemainingTasks, previewMessage)
} }
// 多 task 串行推进:预览生成后不提前拉起下一个 task(避免和用户在 task1 核对表上的
// 保存草稿/提交操作互相打架,导致 task2 状态错乱)。task2 的推进统一交给
// onApplicationActionCompleted,在 task1 真正完成(保存草稿/提交成功)后再触发。
} catch (error) { } catch (error) {
replaceInlineMessage(pendingMessage.id, createInlineMessage('assistant', error?.message || '申请核对表生成失败,请稍后重试。', { replaceInlineMessage(pendingMessage.id, createInlineMessage('assistant', error?.message || '申请核对表生成失败,请稍后重试。', {
id: pendingMessage.id, id: pendingMessage.id,

View File

@@ -148,7 +148,8 @@ test('workbench low-confidence application confirmation forwards remaining tasks
assert.ok(previewCall, 'startAiApplicationPreview 应被调用') assert.ok(previewCall, 'startAiApplicationPreview 应被调用')
assert.deepEqual(previewCall[3].stewardRemainingTasks, remainingTasks) assert.deepEqual(previewCall[3].stewardRemainingTasks, remainingTasks)
assert.equal(typeof previewCall[3].onPreviewReadyForNextTask, 'function') // 低置信确认按钮只在 task1 完成后推进 task2,不再在预览生成时提前推进。
assert.equal(previewCall[3].onPreviewReadyForNextTask, undefined)
assert.equal(typeof previewCall[3].onApplicationActionCompleted, 'function') assert.equal(typeof previewCall[3].onApplicationActionCompleted, 'function')
}) })

View File

@@ -147,6 +147,63 @@ test('workbench auto-saved application draft continues remaining steward task',
} }
}) })
test('workbench application preview does not continue next task until draft is saved or submitted', async () => {
// 时序回归:task1 申请核对表刚生成、用户还没点保存草稿/提交时,
// 不能提前拉起 task2(会导致两条流程消息和状态互相打架,最终 task2 无反应)。
// task2 的推进必须等 task1 真正完成(onApplicationActionCompleted)后再触发。
const originalFetch = globalThis.fetch
globalThis.fetch = async (url) => {
if (String(url).includes('/reimbursements/application-preview-action')) {
return {
ok: true,
async json() {
return { status: 'succeeded', result: { draft_payload: { claim_id: 'c1', claim_no: 'AEW2', status: 'draft' } } }
}
}
}
throw new Error(`unexpected request: ${url}`)
}
try {
const continuedTasks = []
const remainingTasks = [{
task_id: 'task-reimbursement-2',
task_type: 'reimbursement',
assigned_agent: 'reimbursement_assistant',
summary: '报销昨天的业务招待费 2000 元',
ontology_fields: { expense_type: 'entertainment', amount: '2000元', time_range: '2026-06-29', reason: '业务招待费报销' }
}]
const harness = buildApplicationPreviewFlowHarness([], {
onApplicationActionCompleted: (tasks) => { continuedTasks.push({ tasks, phase: 'module' }) }
})
// 第一步:生成申请核对表(不传 autoSaveDraft,模拟用户需要手动操作 task1)
await harness.flow.startAiApplicationPreview('travel', '差旅费', '2月20-23去上海出差并且报销昨天招待费2000元', {
stewardRemainingTasks: remainingTasks,
onApplicationActionCompleted: (tasks) => { continuedTasks.push({ tasks, phase: 'options' }) }
})
// 预览生成后,task2 不应被提前拉起
assert.equal(continuedTasks.length, 0, '预览生成时不应触发 task2 推进回调')
const previewMessage = harness.conversationMessages.value.find((m) => m.applicationPreview)
assert.equal(previewMessage?.stewardRemainingTasks?.length, 1, 'task2 应仍挂在核对表消息上等待用户完成 task1')
// 第二步:用户手动点击"保存草稿"(走 actionRouter,不传 options.onApplicationActionCompleted),
// 此时回落到模块级 onApplicationActionCompleted 触发 task2,这正是真实运行时的续跑路径。
await harness.flow.executeInlineApplicationPreviewAction('save_draft', previewMessage, {
userText: '保存草稿',
draftPayload: null
})
assert.equal(continuedTasks.length, 1, '保存草稿完成后应推进 task2')
assert.deepEqual(continuedTasks[0].tasks, remainingTasks)
assert.equal(continuedTasks[0].phase, 'module', '手动保存草稿走模块级续跑回调')
assert.doesNotMatch(harness.conversationMessages.value.at(-1).content, /继续处理费用报销/, '自动续跑时不展示重复的继续处理按钮')
} finally {
globalThis.fetch = originalFetch
}
})
test('workbench saved application draft can be submitted by contextual text without re-planning', async () => { test('workbench saved application draft can be submitted by contextual text without re-planning', async () => {
const originalFetch = globalThis.fetch const originalFetch = globalThis.fetch
const requests = [] const requests = []

View File

@@ -347,10 +347,12 @@ test('workbench AI mode asks steward model plan before fallback execution', () =
assert.match(personalWorkbenchAiModeScript, /submitRequiresConfirmation:\s*travelApplicationRequest\.submitRequiresConfirmation/) assert.match(personalWorkbenchAiModeScript, /submitRequiresConfirmation:\s*travelApplicationRequest\.submitRequiresConfirmation/)
assert.match(personalWorkbenchAiModeScript, /ontologyFields:\s*travelApplicationRequest\.ontologyFields/) assert.match(personalWorkbenchAiModeScript, /ontologyFields:\s*travelApplicationRequest\.ontologyFields/)
assert.match(personalWorkbenchAiModeScript, /stewardRemainingTasks:\s*travelApplicationRequest\.stewardRemainingTasks/) assert.match(personalWorkbenchAiModeScript, /stewardRemainingTasks:\s*travelApplicationRequest\.stewardRemainingTasks/)
assert.match(personalWorkbenchAiModeScript, /onPreviewReadyForNextTask:\s*startModelPlannedNextTask/)
assert.match(personalWorkbenchAiModeScript, /onApplicationActionCompleted:\s*startModelPlannedNextTask/) assert.match(personalWorkbenchAiModeScript, /onApplicationActionCompleted:\s*startModelPlannedNextTask/)
// 多 task 串行推进:预览生成时不再提前拉起下一个 task(会与用户在 task1 上的操作互相打架),
// 改为只在 task1 完成(保存草稿/提交)后通过 onApplicationActionCompleted 推进 task2。
assert.doesNotMatch(personalWorkbenchAiModeScript, /onPreviewReadyForNextTask/)
assert.match(applicationPreviewFlowScript, /options\.autoSaveDraft/) assert.match(applicationPreviewFlowScript, /options\.autoSaveDraft/)
assert.match(applicationPreviewFlowScript, /options\.onPreviewReadyForNextTask/) assert.doesNotMatch(applicationPreviewFlowScript, /onPreviewReadyForNextTask/)
assert.match(applicationPreviewFlowScript, /const actionCompletedHandler = typeof options\.onApplicationActionCompleted === 'function'/) assert.match(applicationPreviewFlowScript, /const actionCompletedHandler = typeof options\.onApplicationActionCompleted === 'function'/)
assert.match(applicationPreviewFlowScript, /actionCompletedHandler\(targetMessage\.stewardRemainingTasks/) assert.match(applicationPreviewFlowScript, /actionCompletedHandler\(targetMessage\.stewardRemainingTasks/)
assert.match(applicationPreviewFlowScript, /onApplicationActionCompleted:\s*options\.onApplicationActionCompleted/) assert.match(applicationPreviewFlowScript, /onApplicationActionCompleted:\s*options\.onApplicationActionCompleted/)