Files
X-Financial/server/tests/test_orchestrator_review_flow.py

884 lines
33 KiB
Python
Raw Normal View History

from __future__ import annotations
from datetime import UTC, date, datetime
from decimal import Decimal
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.db.base import Base
from app.models.agent_asset import AgentAsset
from app.models.agent_run import AgentRun
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.schemas.ontology import OntologyParseResult, OntologyPermission
from app.schemas.orchestrator import OrchestratorRequest
from app.services.agent_conversations import AgentConversationService
from app.services.orchestrator import OrchestratorService
def build_session_factory() -> sessionmaker[Session]:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
return sessionmaker(bind=engine, autoflush=False, autocommit=False)
@pytest.fixture(autouse=True)
def skip_agent_foundation_bootstrap(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
"app.services.agent_foundation.AgentFoundationService.ensure_foundation_ready",
lambda *_args, **_kwargs: None,
)
@pytest.mark.parametrize(
("task_type", "code", "method_path", "summary", "expected_text"),
[
(
"global_risk_scan",
"task.hermes.global_risk_scan",
"app.services.hermes_risk_scanner.HermesRiskScannerService.scan_global_risks",
{"scanned_claim_count": 2, "risk_observation_count": 1},
"生成 1 条风险观察",
),
(
"employee_behavior_profile_scan",
"task.hermes.employee_behavior_profile_scan",
"app.services.hermes_employee_profile_scanner.HermesEmployeeProfileScannerService.scan_employee_profiles",
{
"target_employee_count": 3,
"snapshot_count": 9,
"high_attention_employee_count": 1,
},
"生成 9 条快照",
),
(
"risk_clue_collect",
"task.hermes.risk_rule_discovery",
"app.services.hermes_risk_clue_collector.HermesRiskClueCollectorService.collect_risk_clues",
{"fact_count": 4, "rule_hit_count": 3, "risk_clue_count": 2},
"输出 2 条待复核线索",
),
],
)
def test_schedule_digital_employee_task_runs_real_service(
monkeypatch,
task_type,
code,
method_path,
summary,
expected_text,
) -> None:
def parse_for_run(self, request, run_id): # noqa: ANN001
return OntologyParseResult(
scenario="expense",
intent="risk_check",
entities=[],
permission=OntologyPermission(level="read", allowed=True, reason=""),
confidence=0.95,
missing_slots=[],
ambiguity=[],
clarification_required=False,
clarification_question=None,
run_id=run_id,
)
monkeypatch.setattr("app.services.ontology.SemanticOntologyService.parse_for_run", parse_for_run)
monkeypatch.setattr(method_path, lambda self, **kwargs: dict(summary))
session_factory = build_session_factory()
with session_factory() as db:
task = AgentAsset(
asset_type="task",
code=code,
name="数字员工任务",
description="",
domain="system",
scenario_json=["schedule"],
owner="pytest",
status="active",
current_version="v1.0.0",
working_version="v1.0.0",
published_version="v1.0.0",
config_json={"agent": "hermes", "task_type": task_type},
)
db.add(task)
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(source="schedule", task_id=task.id, message=task.name)
)
run = db.query(AgentRun).filter_by(run_id=response.run_id).one()
assert response.status == "succeeded"
assert response.result["report_type"] == task_type
assert expected_text in response.result["message"]
assert run.route_json["job_type"] == task_type
assert run.route_json["task_code"] == code
def test_review_next_step_run_submits_existing_claim_and_returns_draft_payload(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
manager = Employee(
employee_no="E9000",
name="李经理",
email="manager-next@example.com",
)
employee = Employee(
employee_no="E9001",
name="张三",
email="emp-next@example.com",
manager=manager,
)
claim = ExpenseClaim(
id="claim-next-step",
claim_no="EXP-202605-001",
employee=employee,
employee_id=employee.id,
employee_name="张三",
department_name="销售部",
expense_type="office",
reason="采购办公用品",
location="上海",
amount=Decimal("128.00"),
currency="CNY",
invoice_count=1,
occurred_at=datetime(2026, 5, 20, 9, 0, tzinfo=UTC),
status="draft",
approval_stage="待提交",
items=[
ExpenseClaimItem(
item_date=date(2026, 5, 20),
item_type="office",
item_reason="采购办公用品",
item_location="上海",
item_amount=Decimal("128.00"),
invoice_id="office-invoice.png",
)
],
)
db.add_all([manager, employee, claim])
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="emp-next@example.com",
message="我已核对右侧识别结果,请进入下一步。",
context_json={
"review_action": "next_step",
"draft_claim_id": claim.id,
"attachment_count": 1,
"name": "张三",
},
)
)
db.refresh(claim)
assert response.status == "succeeded"
assert response.requires_confirmation is False
assert response.result["draft_payload"]["status"] == "submitted"
assert response.result["draft_payload"]["approval_stage"] == "直属领导审批"
assert claim.status == "submitted"
assert claim.approval_stage == "直属领导审批"
assert claim.submitted_at is not None
assert response.conversation_id
assert AgentConversationService(db).get_conversation(response.conversation_id) is None
def test_review_next_step_blocked_returns_reasons_and_removes_next_step_action(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
employee = Employee(
employee_no="E9011",
name="张三",
email="emp-blocked@example.com",
)
claim = ExpenseClaim(
id="claim-next-step-blocked",
claim_no="EXP-202605-002",
employee=employee,
employee_id=employee.id,
employee_name="张三",
department_name="待补充",
expense_type="office",
reason="采购办公用品",
location="上海",
amount=Decimal("128.00"),
currency="CNY",
invoice_count=1,
occurred_at=datetime(2026, 5, 20, 9, 0, tzinfo=UTC),
status="draft",
approval_stage="待提交",
items=[
ExpenseClaimItem(
item_date=date(2026, 5, 20),
item_type="office",
item_reason="采购办公用品",
item_location="上海",
item_amount=Decimal("128.00"),
invoice_id="office-invoice.png",
)
],
)
db.add_all([employee, claim])
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="emp-blocked@example.com",
message="我已核对右侧识别结果,请进入下一步。",
context_json={
"review_action": "next_step",
"draft_claim_id": claim.id,
"attachment_count": 1,
"name": "张三",
},
)
)
result = response.result
review_payload = result["review_payload"]
actions = {
str(item.get("action_type") or "").strip()
for item in review_payload["confirmation_actions"]
}
assert response.status == "succeeded"
assert result["draft_payload"]["status"] == "draft"
assert response.conversation_id
assert AgentConversationService(db).get_conversation(response.conversation_id) is not None
assert "自动检测暂未通过" in result["answer"]
assert "所属部门未完善" in result["answer"]
assert "next_step" not in actions
assert "save_draft" in actions
assert any(
"所属部门未完善" in str(item.get("content") or "")
for item in review_payload["risk_briefs"]
)
def test_conversation_hydration_does_not_reuse_review_type_for_fresh_expense_prompt() -> None:
session_factory = build_session_factory()
with session_factory() as db:
service = AgentConversationService(db)
conversation = service.get_or_create_conversation(
conversation_id="conv-review-type-lock",
user_id="emp-review-type@example.com",
source="user_message",
context_json={
"session_type": "expense",
"draft_claim_id": "claim-old",
"attachment_names": ["old-train-ticket.pdf"],
"attachment_count": 1,
"review_form_values": {
"expense_type": "差旅费",
"business_location": "北京",
},
},
)
fresh_context = service.hydrate_context_json(
conversation=conversation,
context_json={"draft_claim_id": "claim-old"},
message="业务发生时间:2026-02-20 至 2026-02-23去上海支持上海电力部署项目申请报销",
)
continued_context = service.hydrate_context_json(
conversation=conversation,
context_json={},
message="继续补充酒店发票",
)
assert "draft_claim_id" not in fresh_context
assert "attachment_names" not in fresh_context
assert "review_form_values" not in fresh_context
assert fresh_context["conversation_state"]["review_form_values"]["expense_type"] == "差旅费"
assert continued_context["draft_claim_id"] == "claim-old"
assert continued_context["review_form_values"]["expense_type"] == "差旅费"
def test_conversation_hydration_preserves_incoming_application_time_context() -> None:
session_factory = build_session_factory()
with session_factory() as db:
service = AgentConversationService(db)
conversation = service.get_or_create_conversation(
conversation_id="conv-application-time-context",
user_id="emp-application-time@example.com",
source="user_message",
context_json={
"session_type": "application",
"entry_source": "application",
"business_time_context": {
"mode": "single",
"start_date": "2026-05-01",
"end_date": "2026-05-01",
"display_value": "2026-05-01",
},
},
)
stale_context = service.hydrate_context_json(
conversation=conversation,
context_json={"session_type": "application", "entry_source": "application"},
message="apply travel expense",
)
fresh_context = service.hydrate_context_json(
conversation=conversation,
context_json={
"session_type": "application",
"entry_source": "application",
"business_time_context": {
"mode": "single",
"start_date": "2026-05-25",
"end_date": "2026-05-25",
"display_value": "2026-05-25",
},
},
message="apply travel expense",
)
assert "business_time_context" not in stale_context
assert fresh_context["business_time_context"]["start_date"] == "2026-05-25"
def test_conversation_scope_creates_new_session_for_different_claim() -> None:
session_factory = build_session_factory()
with session_factory() as db:
service = AgentConversationService(db)
old_conversation = service.get_or_create_conversation(
conversation_id="conv-old-claim-scope",
user_id="emp-scope@example.com",
source="user_message",
context_json={
"session_type": "expense",
"draft_claim_id": "claim-old",
"attachment_names": ["old-hotel.pdf"],
"attachment_count": 1,
"review_form_values": {
"expense_type": "住宿票",
"merchant_name": "旧酒店",
},
},
)
service.append_message(
conversation_id=old_conversation.conversation_id,
role="user",
content="继续补充旧酒店发票",
)
scoped_conversation = service.get_or_create_conversation(
conversation_id=old_conversation.conversation_id,
user_id="emp-scope@example.com",
source="user_message",
context_json={
"session_type": "expense",
"draft_claim_id": "claim-current",
},
)
conflict_context = service.hydrate_context_json(
conversation=old_conversation,
context_json={"draft_claim_id": "claim-current"},
message="继续补充当前单据的火车票",
)
scoped_context = service.hydrate_context_json(
conversation=scoped_conversation,
context_json={"draft_claim_id": "claim-current"},
message="继续补充当前单据的火车票",
)
db.refresh(old_conversation)
assert scoped_conversation.conversation_id != old_conversation.conversation_id
assert scoped_conversation.draft_claim_id == "claim-current"
assert old_conversation.draft_claim_id == "claim-old"
assert conflict_context == {"draft_claim_id": "claim-current"}
assert scoped_context["draft_claim_id"] == "claim-current"
assert scoped_context["conversation_history"] == []
def test_orchestrator_history_query_filters_location_time_and_returns_real_amount(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
employee = Employee(
id="emp-history-query",
employee_no="E9020",
name="张三",
email="history-query@example.com",
)
beijing_claim = ExpenseClaim(
id="claim-history-beijing",
claim_no="EXP-202506-001",
employee=employee,
employee_id=employee.id,
employee_name="张三",
department_name="交付部",
expense_type="travel",
reason="去北京支持客户项目",
location="北京",
amount=Decimal("321.45"),
currency="CNY",
invoice_count=2,
occurred_at=datetime(2025, 6, 18, 9, 0, tzinfo=UTC),
submitted_at=datetime(2025, 6, 19, 10, 0, tzinfo=UTC),
status="paid",
approval_stage="已入账",
)
shanghai_claim = ExpenseClaim(
id="claim-history-shanghai",
claim_no="EXP-202507-001",
employee=employee,
employee_id=employee.id,
employee_name="张三",
department_name="交付部",
expense_type="travel",
reason="去上海支持项目",
location="上海",
amount=Decimal("888.00"),
currency="CNY",
invoice_count=1,
occurred_at=datetime(2025, 7, 8, 9, 0, tzinfo=UTC),
submitted_at=datetime(2025, 7, 9, 10, 0, tzinfo=UTC),
status="paid",
approval_stage="已入账",
)
current_year_claim = ExpenseClaim(
id="claim-history-beijing-current",
claim_no="EXP-202601-001",
employee=employee,
employee_id=employee.id,
employee_name="张三",
department_name="交付部",
expense_type="travel",
reason="去北京支持年度项目",
location="北京",
amount=Decimal("666.00"),
currency="CNY",
invoice_count=1,
occurred_at=datetime(2026, 1, 8, 9, 0, tzinfo=UTC),
submitted_at=datetime(2026, 1, 9, 10, 0, tzinfo=UTC),
status="paid",
approval_stage="已入账",
)
db.add_all([employee, beijing_claim, shanghai_claim, current_year_claim])
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="history-query@example.com",
message="我去年去北京报销的单据",
context_json={
"client_now_iso": "2026-05-21T04:00:00.000Z",
"client_timezone_offset_minutes": -480,
},
)
)
query_payload = response.result["query_payload"]
assert response.status == "succeeded"
assert response.trace_summary.scenario == "expense"
assert response.trace_summary.intent == "query"
assert query_payload["record_count"] == 1
assert query_payload["total_amount"] == 321.45
assert [item["claim_no"] for item in query_payload["records"]] == ["EXP-202506-001"]
assert "321.45" in response.result["answer"]
def test_orchestrator_archive_query_filters_archived_claims_and_limits_preview(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
employee = Employee(
id="emp-archive-query",
employee_no="E9021",
name="归档员工",
email="archive-query@example.com",
)
claims = []
for index in range(6):
claims.append(
ExpenseClaim(
id=f"claim-archive-query-{index}",
claim_no=f"EXP-ARCHIVE-{index + 1:03d}",
employee=employee,
employee_id=employee.id,
employee_name="归档员工",
department_name="交付部",
expense_type="travel",
reason=f"归档差旅 {index + 1}",
location="上海",
amount=Decimal("100.00") + Decimal(index),
currency="CNY",
invoice_count=1,
occurred_at=datetime(2026, 2, index + 1, 9, 0, tzinfo=UTC),
submitted_at=datetime(2026, 2, index + 2, 10, 0, tzinfo=UTC),
status="approved",
approval_stage="归档入账",
)
)
draft_claim = ExpenseClaim(
id="claim-archive-query-draft",
claim_no="EXP-ARCHIVE-DRAFT",
employee=employee,
employee_id=employee.id,
employee_name="归档员工",
department_name="交付部",
expense_type="travel",
reason="未归档草稿",
location="上海",
amount=Decimal("999.00"),
currency="CNY",
invoice_count=0,
occurred_at=datetime(2026, 3, 1, 9, 0, tzinfo=UTC),
submitted_at=None,
status="draft",
approval_stage="待提交",
)
db.add_all([employee, *claims, draft_claim])
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="archive-query@example.com",
message="帮我查询一下我的归档的单据有哪些?",
context_json={
"client_now_iso": "2026-05-21T04:00:00.000Z",
"client_timezone_offset_minutes": -480,
},
)
)
query_payload = response.result["query_payload"]
assert response.status == "succeeded"
assert response.trace_summary.intent == "query"
assert query_payload["record_count"] == 6
assert query_payload["preview_count"] == 5
assert query_payload["preview_limit"] == 5
assert query_payload["title"] == "最近 5 条你的归档报销单"
assert all(record["status"] == "approved" for record in query_payload["records"])
assert "EXP-ARCHIVE-DRAFT" not in [record["claim_no"] for record in query_payload["records"]]
assert response.result["suggested_actions"] == []
assert "下面先列出最近 5 条记录" in response.result["answer"]
def test_orchestrator_expense_preview_does_not_persist_claim_before_user_action(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
employee = Employee(
employee_no="E9030",
name="预览员工",
email="preview-orchestrator@example.com",
)
db.add(employee)
db.commit()
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="preview-orchestrator@example.com",
message="业务发生时间:2026-03-04打车去客户现场交通费32元请帮我看看怎么报",
context_json={
"name": "预览员工",
"user_input_text": "业务发生时间:2026-03-04打车去客户现场交通费32元请帮我看看怎么报",
},
)
)
user_claims = [
claim
for claim in db.query(ExpenseClaim).all()
if claim.employee_name == "预览员工"
]
assert response.status == "succeeded"
assert response.result.get("review_payload") is not None
assert response.result.get("draft_payload") is None
assert "交通费通常以实际票据金额为基础" in response.result["answer"]
assert user_claims == []
def test_orchestrator_prompts_scene_choices_before_review_for_fresh_ambiguous_expense(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
with session_factory() as db:
service = AgentConversationService(db)
conversation = service.get_or_create_conversation(
conversation_id="conv-scene-choice",
user_id="emp-scene-choice@example.com",
source="user_message",
context_json={
"session_type": "expense",
"draft_claim_id": "claim-old",
"review_form_values": {
"expense_type": "差旅费",
"business_location": "北京",
},
},
)
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="emp-scene-choice@example.com",
conversation_id=conversation.conversation_id,
message="业务发生时间:2026-02-20 至 2026-02-23去上海支持上海电力部署项目申请报销",
context_json={
"session_type": "expense",
"draft_claim_id": "claim-old",
},
)
)
result = response.result
assert response.status == "succeeded"
assert result.get("review_payload") is None
assert result.get("draft_payload") is None
assert "请先在下面选择报销场景" in result["answer"]
assert [item["label"] for item in result["suggested_actions"][:3]] == ["差旅费", "交通费", "住宿费"]
def test_orchestrator_application_session_does_not_use_reimbursement_scene_prompt(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
message = (
"发生时间2026-05-25\n"
"地点:上海\n"
"事由:支持上海国网服务器部署\n"
"天数3天"
)
with session_factory() as db:
response = OrchestratorService(db).run(
OrchestratorRequest(
source="user_message",
user_id="application-session@example.com",
message=message,
context_json={
"session_type": "application",
"entry_source": "application",
"name": "申请员工",
},
)
)
result = response.result
assert response.status == "blocked"
assert response.trace_summary.scenario == "expense"
assert "费用申请" in result["answer"]
assert "| 出发时间 | 2026-05-25 |" in result["answer"]
assert "| 返回时间 | 2026-05-27 |" in result["answer"]
assert "请先在下面选择报销场景" not in result["answer"]
assert result.get("review_payload") is None
def test_orchestrator_application_session_guides_transport_estimate_and_submit(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
initial_message = (
"发生时间2026-05-25\n"
"地点:上海\n"
"事由:支持上海国网服务器部署\n"
"天数3天"
)
context_json = {
"session_type": "application",
"entry_source": "application",
"name": "申请员工",
"manager_name": "陈硕",
}
with session_factory() as db:
service = OrchestratorService(db)
first = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-flow@example.com",
message=initial_message,
context_json=context_json,
)
)
second = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-flow@example.com",
conversation_id=first.conversation_id,
message="飞机",
context_json=context_json,
)
)
third = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-flow@example.com",
conversation_id=first.conversation_id,
message="确认提交",
context_json=context_json,
)
)
assert first.status == "blocked"
assert "当前还需要补充:出行方式" in first.result["answer"]
assert [item["label"] for item in first.result["suggested_actions"]] == ["一次性补充申请信息"]
assert first.result["suggested_actions"][0]["payload"]["prompt_prefill"] == "出行方式:"
assert "这是费用申请核对结果" in second.result["answer"]
assert "| 事由 | 支持上海国网服务器部署 |" in second.result["answer"]
assert "| 系统预估费用 |" in second.result["answer"]
assert "| 交通费用口径 | 预估交通费用 2,330元 |" in second.result["answer"]
assert "2,330元" in second.result["answer"]
assert "参考票价" not in second.result["answer"]
assert "查询耗时" not in second.result["answer"]
assert "请核对上述信息无误" in second.result["answer"]
assert "[确认](#application-submit)" in second.result["answer"]
assert second.status == "blocked"
assert second.result["requires_confirmation"] is True
assert second.result["suggested_actions"] == []
assert third.status == "succeeded"
assert third.result["clarification_required"] is False
assert third.result["missing_slots"] == []
assert "申请单据已生成,并已进入审批流程" in third.result["answer"]
assert "系统已推送给 陈硕 审核,当前节点:陈硕审核中" in third.result["answer"]
assert third.result["suggested_actions"] == []
application_claims = [
claim
for claim in db.query(ExpenseClaim).all()
if claim.claim_no.startswith("AP-")
]
assert len(application_claims) == 1
assert application_claims[0].status == "submitted"
assert application_claims[0].approval_stage == "直属领导审批"
assert third.result["draft_payload"]["claim_no"] == application_claims[0].claim_no
def test_orchestrator_application_submit_bypasses_generic_operation_block(
monkeypatch,
) -> None:
monkeypatch.setattr(
"app.services.runtime_chat.RuntimeChatService.complete",
lambda *_args, **_kwargs: None,
)
session_factory = build_session_factory()
initial_message = (
"发生时间2026-05-25\n"
"地点:上海\n"
"事由:支持上海国网服务器部署\n"
"天数3天"
)
context_json = {
"session_type": "application",
"entry_source": "application",
"name": "申请员工",
"manager_name": "陈硕",
}
with session_factory() as db:
service = OrchestratorService(db)
first = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-approval-required@example.com",
message=initial_message,
context_json=context_json,
)
)
preview = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-approval-required@example.com",
conversation_id=first.conversation_id,
message="飞机",
context_json=context_json,
)
)
def approval_required_parse_for_run(self, request, run_id): # noqa: ANN001
return OntologyParseResult(
scenario="expense",
intent="operate",
entities=[],
permission=OntologyPermission(
level="approval_required",
allowed=False,
reason="操作类请求需要人工审批确认。",
),
confidence=0.95,
missing_slots=[],
ambiguity=[],
clarification_required=False,
clarification_question=None,
run_id=run_id,
)
monkeypatch.setattr(
"app.services.ontology.SemanticOntologyService.parse_for_run",
approval_required_parse_for_run,
)
submitted = service.run(
OrchestratorRequest(
source="user_message",
user_id="application-approval-required@example.com",
conversation_id=first.conversation_id,
message="确认提交",
context_json=context_json,
)
)
assert preview.status == "blocked"
assert submitted.status == "succeeded"
assert submitted.requires_confirmation is False
assert "操作类请求需要人工审批确认" not in submitted.result["answer"]
assert "当前仅返回确认摘要" not in submitted.result["answer"]
assert "申请单据已生成,并已进入审批流程" in submitted.result["answer"]
assert submitted.result["draft_payload"]["status"] == "submitted"