Files
X-Financial/server/tests/test_orchestrator_review_flow.py
caoxiaozhu 0c74b4ab4a feat: 财务看板口径重构与半年模拟数据及报销状态注册表
- 重构 finance_dashboard 口径计算,新增模拟公司画像数据生成与筛选
- 引入 expense_claim_status_registry 统一报销状态流转
- 完善报销草稿流程、Item Sync 与本体解析器
- 优化总览页趋势图、分页组件与请求进度步骤
- 增强报销申请快速预览、本体工具与详情展示
- 新增半年报销模拟数据种子脚本与状态审计工具
- 补充财务看板、报销状态注册与模拟数据测试覆盖
2026-06-02 16:22:59 +08:00

884 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
from datetime import UTC, date, datetime
from decimal import Decimal
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.db.base import Base
from app.models.agent_asset import AgentAsset
from app.models.agent_run import AgentRun
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.schemas.ontology import OntologyParseResult, OntologyPermission
from app.schemas.orchestrator import OrchestratorRequest
from app.services.agent_conversations import AgentConversationService
from app.services.orchestrator import OrchestratorService
def build_session_factory() -> sessionmaker[Session]:
    """Return a session factory bound to a fresh in-memory SQLite engine.

    The engine is created with ``StaticPool`` and ``check_same_thread``
    disabled, and every table registered on ``Base.metadata`` is created
    before the factory is handed back.
    """
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    factory = sessionmaker(bind=memory_engine, autoflush=False, autocommit=False)
    return factory
@pytest.fixture(autouse=True)
def skip_agent_foundation_bootstrap(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch ``AgentFoundationService.ensure_foundation_ready`` to a no-op for every test."""

    def _do_nothing(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.agent_foundation.AgentFoundationService.ensure_foundation_ready",
        _do_nothing,
    )
@pytest.mark.parametrize(
    ("task_type", "code", "method_path", "summary", "expected_text"),
    [
        (
            "global_risk_scan",
            "task.hermes.global_risk_scan",
            "app.services.hermes_risk_scanner.HermesRiskScannerService.scan_global_risks",
            {"scanned_claim_count": 2, "risk_observation_count": 1},
            "生成 1 条风险观察",
        ),
        (
            "employee_behavior_profile_scan",
            "task.hermes.employee_behavior_profile_scan",
            "app.services.hermes_employee_profile_scanner.HermesEmployeeProfileScannerService.scan_employee_profiles",
            {
                "target_employee_count": 3,
                "snapshot_count": 9,
                "high_attention_employee_count": 1,
            },
            "生成 9 条快照",
        ),
        (
            "risk_clue_collect",
            "task.hermes.risk_rule_discovery",
            "app.services.hermes_risk_clue_collector.HermesRiskClueCollectorService.collect_risk_clues",
            {"fact_count": 4, "rule_hit_count": 3, "risk_clue_count": 2},
            "输出 2 条待复核线索",
        ),
    ],
)
def test_schedule_digital_employee_task_runs_real_service(
    monkeypatch,
    task_type,
    code,
    method_path,
    summary,
    expected_text,
) -> None:
    """Run a scheduled task asset through the orchestrator with stubbed services.

    The ontology parser and the service method at ``method_path`` are both
    patched; the test then checks the response status, report type, message
    text and the routing data stored on the ``AgentRun`` row.
    """

    def _stub_parse_for_run(self, request, run_id):  # noqa: ANN001
        read_permission = OntologyPermission(level="read", allowed=True, reason="")
        return OntologyParseResult(
            scenario="expense",
            intent="risk_check",
            entities=[],
            permission=read_permission,
            confidence=0.95,
            missing_slots=[],
            ambiguity=[],
            clarification_required=False,
            clarification_question=None,
            run_id=run_id,
        )

    def _stub_scan(self, **kwargs):  # noqa: ANN001
        # Return a fresh copy of the parametrized summary on every call.
        return dict(summary)

    monkeypatch.setattr(
        "app.services.ontology.SemanticOntologyService.parse_for_run",
        _stub_parse_for_run,
    )
    monkeypatch.setattr(method_path, _stub_scan)

    make_session = build_session_factory()
    with make_session() as db:
        task = AgentAsset(
            asset_type="task",
            code=code,
            name="数字员工任务",
            description="",
            domain="system",
            scenario_json=["schedule"],
            owner="pytest",
            status="active",
            current_version="v1.0.0",
            working_version="v1.0.0",
            published_version="v1.0.0",
            config_json={"agent": "hermes", "task_type": task_type},
        )
        db.add(task)
        db.commit()

        schedule_request = OrchestratorRequest(
            source="schedule",
            task_id=task.id,
            message=task.name,
        )
        response = OrchestratorService(db).run(schedule_request)
        run = db.query(AgentRun).filter_by(run_id=response.run_id).one()

        assert response.status == "succeeded"
        assert response.result["report_type"] == task_type
        assert expected_text in response.result["message"]
        assert run.route_json["job_type"] == task_type
        assert run.route_json["task_code"] == code
def test_review_next_step_run_submits_existing_claim_and_returns_draft_payload(
    monkeypatch,
) -> None:
    """A ``next_step`` review action on a complete draft claim submits it.

    Checks that the response and the refreshed claim both report the
    ``submitted`` status with the manager-approval stage, and that looking up
    the returned conversation id yields ``None``.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        manager = Employee(
            employee_no="E9000",
            name="李经理",
            email="manager-next@example.com",
        )
        employee = Employee(
            employee_no="E9001",
            name="张三",
            email="emp-next@example.com",
            manager=manager,
        )
        claim = ExpenseClaim(
            id="claim-next-step",
            claim_no="EXP-202605-001",
            employee=employee,
            employee_id=employee.id,
            employee_name="张三",
            department_name="销售部",
            expense_type="office",
            reason="采购办公用品",
            location="上海",
            amount=Decimal("128.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 20, 9, 0, tzinfo=UTC),
            status="draft",
            approval_stage="待提交",
            items=[
                ExpenseClaimItem(
                    item_date=date(2026, 5, 20),
                    item_type="office",
                    item_reason="采购办公用品",
                    item_location="上海",
                    item_amount=Decimal("128.00"),
                    invoice_id="office-invoice.png",
                )
            ],
        )
        db.add_all([manager, employee, claim])
        db.commit()

        next_step_request = OrchestratorRequest(
            source="user_message",
            user_id="emp-next@example.com",
            message="我已核对右侧识别结果,请进入下一步。",
            context_json={
                "review_action": "next_step",
                "draft_claim_id": claim.id,
                "attachment_count": 1,
                "name": "张三",
            },
        )
        response = OrchestratorService(db).run(next_step_request)
        db.refresh(claim)

        assert response.status == "succeeded"
        assert response.requires_confirmation is False

        # Payload returned to the caller.
        assert response.result["draft_payload"]["status"] == "submitted"
        assert response.result["draft_payload"]["approval_stage"] == "直属领导审批"

        # State persisted on the claim row.
        assert claim.status == "submitted"
        assert claim.approval_stage == "直属领导审批"
        assert claim.submitted_at is not None

        assert response.conversation_id
        conversations = AgentConversationService(db)
        assert conversations.get_conversation(response.conversation_id) is None
def test_review_next_step_blocked_returns_reasons_and_removes_next_step_action(
    monkeypatch,
) -> None:
    """A ``next_step`` review action on a claim with a placeholder department stays a draft.

    Checks that the answer and risk briefs carry the department reason, that
    the ``next_step`` action is absent while ``save_draft`` is offered, and
    that the conversation can still be looked up.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        employee = Employee(
            employee_no="E9011",
            name="张三",
            email="emp-blocked@example.com",
        )
        claim = ExpenseClaim(
            id="claim-next-step-blocked",
            claim_no="EXP-202605-002",
            employee=employee,
            employee_id=employee.id,
            employee_name="张三",
            department_name="待补充",
            expense_type="office",
            reason="采购办公用品",
            location="上海",
            amount=Decimal("128.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 20, 9, 0, tzinfo=UTC),
            status="draft",
            approval_stage="待提交",
            items=[
                ExpenseClaimItem(
                    item_date=date(2026, 5, 20),
                    item_type="office",
                    item_reason="采购办公用品",
                    item_location="上海",
                    item_amount=Decimal("128.00"),
                    invoice_id="office-invoice.png",
                )
            ],
        )
        db.add_all([employee, claim])
        db.commit()

        next_step_request = OrchestratorRequest(
            source="user_message",
            user_id="emp-blocked@example.com",
            message="我已核对右侧识别结果,请进入下一步。",
            context_json={
                "review_action": "next_step",
                "draft_claim_id": claim.id,
                "attachment_count": 1,
                "name": "张三",
            },
        )
        response = OrchestratorService(db).run(next_step_request)

        result = response.result
        review_payload = result["review_payload"]
        # Normalised action types offered in the review payload.
        actions = {
            str(entry.get("action_type") or "").strip()
            for entry in review_payload["confirmation_actions"]
        }

        assert response.status == "succeeded"
        assert result["draft_payload"]["status"] == "draft"
        assert response.conversation_id
        conversations = AgentConversationService(db)
        assert conversations.get_conversation(response.conversation_id) is not None
        assert "自动检测暂未通过" in result["answer"]
        assert "所属部门未完善" in result["answer"]
        assert "next_step" not in actions
        assert "save_draft" in actions
        assert any(
            "所属部门未完善" in str(brief.get("content") or "")
            for brief in review_payload["risk_briefs"]
        )
def test_conversation_hydration_does_not_reuse_review_type_for_fresh_expense_prompt() -> None:
    """Hydration drops stored draft fields for a fresh prompt but keeps them for a follow-up.

    A conversation is seeded with a draft claim id, attachments and review
    form values. A new expense description must not get those keys at the top
    level of the hydrated context, while a "continue" style message must.
    """
    make_session = build_session_factory()
    with make_session() as db:
        service = AgentConversationService(db)
        seeded_context = {
            "session_type": "expense",
            "draft_claim_id": "claim-old",
            "attachment_names": ["old-train-ticket.pdf"],
            "attachment_count": 1,
            "review_form_values": {
                "expense_type": "差旅费",
                "business_location": "北京",
            },
        }
        conversation = service.get_or_create_conversation(
            conversation_id="conv-review-type-lock",
            user_id="emp-review-type@example.com",
            source="user_message",
            context_json=seeded_context,
        )

        fresh_context = service.hydrate_context_json(
            conversation=conversation,
            context_json={"draft_claim_id": "claim-old"},
            message="业务发生时间:2026-02-20 至 2026-02-23去上海支持上海电力部署项目申请报销",
        )
        continued_context = service.hydrate_context_json(
            conversation=conversation,
            context_json={},
            message="继续补充酒店发票",
        )

        # Fresh prompt: stored draft data is not surfaced at the top level.
        for dropped_key in ("draft_claim_id", "attachment_names", "review_form_values"):
            assert dropped_key not in fresh_context
        stored_state = fresh_context["conversation_state"]
        assert stored_state["review_form_values"]["expense_type"] == "差旅费"

        # Follow-up prompt: stored draft data is restored.
        assert continued_context["draft_claim_id"] == "claim-old"
        assert continued_context["review_form_values"]["expense_type"] == "差旅费"
def test_conversation_hydration_preserves_incoming_application_time_context() -> None:
    """Hydration uses only the incoming ``business_time_context`` for application sessions.

    The conversation is seeded with a 2026-05-01 time context. A request
    without a time context must not inherit it, and a request carrying a
    2026-05-25 time context must keep that value.
    """

    def _single_day(day: str) -> dict:
        # Single-day business time context in the shape used by this test.
        return {
            "mode": "single",
            "start_date": day,
            "end_date": day,
            "display_value": day,
        }

    make_session = build_session_factory()
    with make_session() as db:
        service = AgentConversationService(db)
        conversation = service.get_or_create_conversation(
            conversation_id="conv-application-time-context",
            user_id="emp-application-time@example.com",
            source="user_message",
            context_json={
                "session_type": "application",
                "entry_source": "application",
                "business_time_context": _single_day("2026-05-01"),
            },
        )

        stale_context = service.hydrate_context_json(
            conversation=conversation,
            context_json={"session_type": "application", "entry_source": "application"},
            message="apply travel expense",
        )
        fresh_context = service.hydrate_context_json(
            conversation=conversation,
            context_json={
                "session_type": "application",
                "entry_source": "application",
                "business_time_context": _single_day("2026-05-25"),
            },
            message="apply travel expense",
        )

        assert "business_time_context" not in stale_context
        incoming_time = fresh_context["business_time_context"]
        assert incoming_time["start_date"] == "2026-05-25"
def test_conversation_scope_creates_new_session_for_different_claim() -> None:
    """Reusing a conversation id with a different draft claim yields a separate conversation.

    The old conversation keeps its original claim id, hydrating it with the
    new claim id returns only the incoming context, and the newly scoped
    conversation starts with an empty history.
    """
    make_session = build_session_factory()
    with make_session() as db:
        service = AgentConversationService(db)
        shared_user = "emp-scope@example.com"
        follow_up_message = "继续补充当前单据的火车票"

        old_conversation = service.get_or_create_conversation(
            conversation_id="conv-old-claim-scope",
            user_id=shared_user,
            source="user_message",
            context_json={
                "session_type": "expense",
                "draft_claim_id": "claim-old",
                "attachment_names": ["old-hotel.pdf"],
                "attachment_count": 1,
                "review_form_values": {
                    "expense_type": "住宿票",
                    "merchant_name": "旧酒店",
                },
            },
        )
        service.append_message(
            conversation_id=old_conversation.conversation_id,
            role="user",
            content="继续补充旧酒店发票",
        )

        # Same conversation id, but the incoming context targets another claim.
        scoped_conversation = service.get_or_create_conversation(
            conversation_id=old_conversation.conversation_id,
            user_id=shared_user,
            source="user_message",
            context_json={
                "session_type": "expense",
                "draft_claim_id": "claim-current",
            },
        )

        conflict_context = service.hydrate_context_json(
            conversation=old_conversation,
            context_json={"draft_claim_id": "claim-current"},
            message=follow_up_message,
        )
        scoped_context = service.hydrate_context_json(
            conversation=scoped_conversation,
            context_json={"draft_claim_id": "claim-current"},
            message=follow_up_message,
        )
        db.refresh(old_conversation)

        assert scoped_conversation.conversation_id != old_conversation.conversation_id
        assert scoped_conversation.draft_claim_id == "claim-current"
        assert old_conversation.draft_claim_id == "claim-old"
        assert conflict_context == {"draft_claim_id": "claim-current"}
        assert scoped_context["draft_claim_id"] == "claim-current"
        assert scoped_context["conversation_history"] == []
def test_orchestrator_history_query_filters_location_time_and_returns_real_amount(
    monkeypatch,
) -> None:
    """A "last year, Beijing" history query returns only the matching 2025 Beijing claim.

    Three paid travel claims are stored: Beijing 2025, Shanghai 2025 and
    Beijing 2026. With the client clock set to 2026-05-21, exactly the first
    claim and its amount of 321.45 are expected in the query payload.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        employee = Employee(
            id="emp-history-query",
            employee_no="E9020",
            name="张三",
            email="history-query@example.com",
        )

        def _paid_travel_claim(
            claim_id,
            claim_no,
            reason,
            location,
            amount,
            invoice_count,
            occurred_at,
            submitted_at,
        ):
            # The three claims differ only in the arguments passed here.
            return ExpenseClaim(
                id=claim_id,
                claim_no=claim_no,
                employee=employee,
                employee_id=employee.id,
                employee_name="张三",
                department_name="交付部",
                expense_type="travel",
                reason=reason,
                location=location,
                amount=Decimal(amount),
                currency="CNY",
                invoice_count=invoice_count,
                occurred_at=occurred_at,
                submitted_at=submitted_at,
                status="paid",
                approval_stage="已入账",
            )

        beijing_claim = _paid_travel_claim(
            "claim-history-beijing",
            "EXP-202506-001",
            "去北京支持客户项目",
            "北京",
            "321.45",
            2,
            datetime(2025, 6, 18, 9, 0, tzinfo=UTC),
            datetime(2025, 6, 19, 10, 0, tzinfo=UTC),
        )
        shanghai_claim = _paid_travel_claim(
            "claim-history-shanghai",
            "EXP-202507-001",
            "去上海支持项目",
            "上海",
            "888.00",
            1,
            datetime(2025, 7, 8, 9, 0, tzinfo=UTC),
            datetime(2025, 7, 9, 10, 0, tzinfo=UTC),
        )
        current_year_claim = _paid_travel_claim(
            "claim-history-beijing-current",
            "EXP-202601-001",
            "去北京支持年度项目",
            "北京",
            "666.00",
            1,
            datetime(2026, 1, 8, 9, 0, tzinfo=UTC),
            datetime(2026, 1, 9, 10, 0, tzinfo=UTC),
        )
        db.add_all([employee, beijing_claim, shanghai_claim, current_year_claim])
        db.commit()

        history_request = OrchestratorRequest(
            source="user_message",
            user_id="history-query@example.com",
            message="我去年去北京报销的单据",
            context_json={
                "client_now_iso": "2026-05-21T04:00:00.000Z",
                "client_timezone_offset_minutes": -480,
            },
        )
        response = OrchestratorService(db).run(history_request)
        query_payload = response.result["query_payload"]

        assert response.status == "succeeded"
        assert response.trace_summary.scenario == "expense"
        assert response.trace_summary.intent == "query"
        assert query_payload["record_count"] == 1
        assert query_payload["total_amount"] == 321.45
        returned_claim_numbers = [row["claim_no"] for row in query_payload["records"]]
        assert returned_claim_numbers == ["EXP-202506-001"]
        assert "321.45" in response.result["answer"]
def test_orchestrator_archive_query_filters_archived_claims_and_limits_preview(
    monkeypatch,
) -> None:
    """An archive query counts all six archived claims but previews only five.

    Six approved claims in the archive stage and one draft claim are stored.
    The draft must not appear in the records, and no suggested actions are
    expected.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        employee = Employee(
            id="emp-archive-query",
            employee_no="E9021",
            name="归档员工",
            email="archive-query@example.com",
        )
        claims = [
            ExpenseClaim(
                id=f"claim-archive-query-{offset}",
                claim_no=f"EXP-ARCHIVE-{offset + 1:03d}",
                employee=employee,
                employee_id=employee.id,
                employee_name="归档员工",
                department_name="交付部",
                expense_type="travel",
                reason=f"归档差旅 {offset + 1}",
                location="上海",
                amount=Decimal("100.00") + Decimal(offset),
                currency="CNY",
                invoice_count=1,
                occurred_at=datetime(2026, 2, offset + 1, 9, 0, tzinfo=UTC),
                submitted_at=datetime(2026, 2, offset + 2, 10, 0, tzinfo=UTC),
                status="approved",
                approval_stage="归档入账",
            )
            for offset in range(6)
        ]
        draft_claim = ExpenseClaim(
            id="claim-archive-query-draft",
            claim_no="EXP-ARCHIVE-DRAFT",
            employee=employee,
            employee_id=employee.id,
            employee_name="归档员工",
            department_name="交付部",
            expense_type="travel",
            reason="未归档草稿",
            location="上海",
            amount=Decimal("999.00"),
            currency="CNY",
            invoice_count=0,
            occurred_at=datetime(2026, 3, 1, 9, 0, tzinfo=UTC),
            submitted_at=None,
            status="draft",
            approval_stage="待提交",
        )
        db.add_all([employee, *claims, draft_claim])
        db.commit()

        archive_request = OrchestratorRequest(
            source="user_message",
            user_id="archive-query@example.com",
            message="帮我查询一下我的归档的单据有哪些?",
            context_json={
                "client_now_iso": "2026-05-21T04:00:00.000Z",
                "client_timezone_offset_minutes": -480,
            },
        )
        response = OrchestratorService(db).run(archive_request)
        query_payload = response.result["query_payload"]

        assert response.status == "succeeded"
        assert response.trace_summary.intent == "query"
        assert query_payload["record_count"] == 6
        assert query_payload["preview_count"] == 5
        assert query_payload["preview_limit"] == 5
        assert query_payload["title"] == "最近 5 条你的归档报销单"
        assert all(row["status"] == "approved" for row in query_payload["records"])
        listed_claim_numbers = [row["claim_no"] for row in query_payload["records"]]
        assert "EXP-ARCHIVE-DRAFT" not in listed_claim_numbers
        assert response.result["suggested_actions"] == []
        assert "下面先列出最近 5 条记录" in response.result["answer"]
def test_orchestrator_expense_preview_does_not_persist_claim_before_user_action(
    monkeypatch,
) -> None:
    """An expense preview returns a review payload without storing any claim.

    After the run, no ``ExpenseClaim`` row with the test employee's name may
    exist and the response must not carry a draft payload.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        employee = Employee(
            employee_no="E9030",
            name="预览员工",
            email="preview-orchestrator@example.com",
        )
        db.add(employee)
        db.commit()

        preview_request = OrchestratorRequest(
            source="user_message",
            user_id="preview-orchestrator@example.com",
            message="业务发生时间:2026-03-04打车去客户现场交通费32元请帮我看看怎么报",
            context_json={
                "name": "预览员工",
                "user_input_text": "业务发生时间:2026-03-04打车去客户现场交通费32元请帮我看看怎么报",
            },
        )
        response = OrchestratorService(db).run(preview_request)

        # Collect any claims persisted for the preview employee.
        user_claims = []
        for stored_claim in db.query(ExpenseClaim).all():
            if stored_claim.employee_name == "预览员工":
                user_claims.append(stored_claim)

        assert response.status == "succeeded"
        assert response.result.get("review_payload") is not None
        assert response.result.get("draft_payload") is None
        assert "交通费通常以实际票据金额为基础" in response.result["answer"]
        assert user_claims == []
def test_orchestrator_prompts_scene_choices_before_review_for_fresh_ambiguous_expense(
    monkeypatch,
) -> None:
    """A fresh expense description in an existing conversation asks for a scene first.

    The response must carry neither a review nor a draft payload, and the
    first three suggested actions must be the travel, transport and lodging
    scene labels.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    with make_session() as db:
        service = AgentConversationService(db)
        conversation = service.get_or_create_conversation(
            conversation_id="conv-scene-choice",
            user_id="emp-scene-choice@example.com",
            source="user_message",
            context_json={
                "session_type": "expense",
                "draft_claim_id": "claim-old",
                "review_form_values": {
                    "expense_type": "差旅费",
                    "business_location": "北京",
                },
            },
        )

        fresh_request = OrchestratorRequest(
            source="user_message",
            user_id="emp-scene-choice@example.com",
            conversation_id=conversation.conversation_id,
            message="业务发生时间:2026-02-20 至 2026-02-23去上海支持上海电力部署项目申请报销",
            context_json={
                "session_type": "expense",
                "draft_claim_id": "claim-old",
            },
        )
        response = OrchestratorService(db).run(fresh_request)
        result = response.result

        assert response.status == "succeeded"
        assert result.get("review_payload") is None
        assert result.get("draft_payload") is None
        assert "请先在下面选择报销场景" in result["answer"]
        leading_labels = [action["label"] for action in result["suggested_actions"][:3]]
        assert leading_labels == ["差旅费", "交通费", "住宿费"]
def test_orchestrator_application_session_does_not_use_reimbursement_scene_prompt(
    monkeypatch,
) -> None:
    """An application-session request gets the application answer, not the scene prompt.

    The run is expected to be ``blocked``, to show departure and return dates
    derived from the three-day trip, and to carry no review payload.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    message = "\n".join(
        [
            "发生时间2026-05-25",
            "地点:上海",
            "事由:支持上海国网服务器部署",
            "天数3天",
        ]
    )
    with make_session() as db:
        application_request = OrchestratorRequest(
            source="user_message",
            user_id="application-session@example.com",
            message=message,
            context_json={
                "session_type": "application",
                "entry_source": "application",
                "name": "申请员工",
            },
        )
        response = OrchestratorService(db).run(application_request)
        result = response.result

        assert response.status == "blocked"
        assert response.trace_summary.scenario == "expense"
        assert "费用申请" in result["answer"]
        assert "| 出发时间 | 2026-05-25 |" in result["answer"]
        assert "| 返回时间 | 2026-05-27 |" in result["answer"]
        assert "请先在下面选择报销场景" not in result["answer"]
        assert result.get("review_payload") is None
def test_orchestrator_application_session_guides_transport_estimate_and_submit(
    monkeypatch,
) -> None:
    """Walk an application session through three turns in one conversation.

    Turn one asks for the missing travel mode, turn two returns the review
    summary with the transport estimate and a confirm link, and turn three
    submits. Exactly one ``AP-`` claim must then exist in the submitted state.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    initial_message = "\n".join(
        [
            "发生时间2026-05-25",
            "地点:上海",
            "事由:支持上海国网服务器部署",
            "天数3天",
        ]
    )
    context_json = {
        "session_type": "application",
        "entry_source": "application",
        "name": "申请员工",
        "manager_name": "陈硕",
    }
    with make_session() as db:
        service = OrchestratorService(db)

        # Turn 1: trip details without a travel mode.
        opening_request = OrchestratorRequest(
            source="user_message",
            user_id="application-flow@example.com",
            message=initial_message,
            context_json=context_json,
        )
        first = service.run(opening_request)

        # Turn 2: supply the travel mode in the same conversation.
        travel_mode_request = OrchestratorRequest(
            source="user_message",
            user_id="application-flow@example.com",
            conversation_id=first.conversation_id,
            message="飞机",
            context_json=context_json,
        )
        second = service.run(travel_mode_request)

        # Turn 3: confirm the submission.
        confirm_request = OrchestratorRequest(
            source="user_message",
            user_id="application-flow@example.com",
            conversation_id=first.conversation_id,
            message="确认提交",
            context_json=context_json,
        )
        third = service.run(confirm_request)

        assert first.status == "blocked"
        assert "当前还需要补充:出行方式" in first.result["answer"]
        first_labels = [action["label"] for action in first.result["suggested_actions"]]
        assert first_labels == ["一次性补充申请信息"]
        assert first.result["suggested_actions"][0]["payload"]["prompt_prefill"] == "出行方式:"

        assert "这是费用申请核对结果" in second.result["answer"]
        assert "| 事由 | 支持上海国网服务器部署 |" in second.result["answer"]
        assert "| 系统预估费用 |" in second.result["answer"]
        assert "| 交通费用口径 | 预估交通费用 2,330元 |" in second.result["answer"]
        assert "2,330元" in second.result["answer"]
        assert "参考票价" not in second.result["answer"]
        assert "查询耗时" not in second.result["answer"]
        assert "请核对上述信息无误" in second.result["answer"]
        assert "[确认](#application-submit)" in second.result["answer"]
        assert second.status == "blocked"
        assert second.result["requires_confirmation"] is True
        assert second.result["suggested_actions"] == []

        assert third.status == "succeeded"
        assert third.result["clarification_required"] is False
        assert third.result["missing_slots"] == []
        assert "申请单据已生成,并已进入审批流程" in third.result["answer"]
        assert "系统已推送给 陈硕 审核,当前节点:陈硕审核中" in third.result["answer"]
        assert third.result["suggested_actions"] == []

        # Only claims whose number starts with "AP-" are considered here.
        application_claims = []
        for stored_claim in db.query(ExpenseClaim).all():
            if stored_claim.claim_no.startswith("AP-"):
                application_claims.append(stored_claim)

        assert len(application_claims) == 1
        assert application_claims[0].status == "submitted"
        assert application_claims[0].approval_stage == "直属领导审批"
        assert third.result["draft_payload"]["claim_no"] == application_claims[0].claim_no
def test_orchestrator_application_submit_bypasses_generic_operation_block(
    monkeypatch,
) -> None:
    """Confirming an application succeeds even when the parser demands approval.

    After the preview turn, the ontology parser is patched to return an
    ``operate`` intent with an ``approval_required`` permission that is not
    allowed. The confirm turn must still succeed and must not contain the
    approval-block wording.
    """

    def _no_completion(*_args, **_kwargs):
        return None

    monkeypatch.setattr(
        "app.services.runtime_chat.RuntimeChatService.complete",
        _no_completion,
    )

    make_session = build_session_factory()
    initial_message = "\n".join(
        [
            "发生时间2026-05-25",
            "地点:上海",
            "事由:支持上海国网服务器部署",
            "天数3天",
        ]
    )
    context_json = {
        "session_type": "application",
        "entry_source": "application",
        "name": "申请员工",
        "manager_name": "陈硕",
    }
    with make_session() as db:
        service = OrchestratorService(db)

        opening_request = OrchestratorRequest(
            source="user_message",
            user_id="application-approval-required@example.com",
            message=initial_message,
            context_json=context_json,
        )
        first = service.run(opening_request)

        travel_mode_request = OrchestratorRequest(
            source="user_message",
            user_id="application-approval-required@example.com",
            conversation_id=first.conversation_id,
            message="飞机",
            context_json=context_json,
        )
        preview = service.run(travel_mode_request)

        def approval_required_parse_for_run(self, request, run_id):  # noqa: ANN001
            denied_permission = OntologyPermission(
                level="approval_required",
                allowed=False,
                reason="操作类请求需要人工审批确认。",
            )
            return OntologyParseResult(
                scenario="expense",
                intent="operate",
                entities=[],
                permission=denied_permission,
                confidence=0.95,
                missing_slots=[],
                ambiguity=[],
                clarification_required=False,
                clarification_question=None,
                run_id=run_id,
            )

        # Patched only now, so the first two turns use the unpatched parser.
        monkeypatch.setattr(
            "app.services.ontology.SemanticOntologyService.parse_for_run",
            approval_required_parse_for_run,
        )

        confirm_request = OrchestratorRequest(
            source="user_message",
            user_id="application-approval-required@example.com",
            conversation_id=first.conversation_id,
            message="确认提交",
            context_json=context_json,
        )
        submitted = service.run(confirm_request)

        assert preview.status == "blocked"
        assert submitted.status == "succeeded"
        assert submitted.requires_confirmation is False
        assert "操作类请求需要人工审批确认" not in submitted.result["answer"]
        assert "当前仅返回确认摘要" not in submitted.result["answer"]
        assert "申请单据已生成,并已进入审批流程" in submitted.result["answer"]
        assert submitted.result["draft_payload"]["status"] == "submitted"