# Source file: X-Financial/server/tests/test_orchestrator_service.py
# Captured from the repository web file view (reported there as 1229 lines, 43 KiB, Python).

from __future__ import annotations
from collections.abc import Generator
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.api.deps import get_db
from app.db.base import Base
from app.main import create_app
from app.models.agent_conversation import AgentConversation, AgentConversationMessage
from app.models.employee import Employee
from app.models.financial_record import (
AccountsPayableRecord,
AccountsReceivableRecord,
ExpenseClaim,
)
from app.schemas.settings import SettingsWrite
from app.services.agent_assets import AgentAssetService
from app.services.settings import SettingsService
def build_client() -> tuple[TestClient, sessionmaker[Session]]:
    """Create a test client backed by a fresh in-memory SQLite database.

    Returns:
        A ``(client, session_factory)`` pair. The application's ``get_db``
        dependency is overridden to hand out sessions from the same factory,
        so tests can seed and inspect rows through ``session_factory``.
    """
    # StaticPool reuses one connection, so every session sees the same
    # in-memory database.
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    make_session = sessionmaker(bind=memory_engine, autocommit=False, autoflush=False)

    def _session_dependency() -> Generator[Session, None, None]:
        """Yield one session per request and always close it afterwards."""
        session = make_session()
        try:
            yield session
        finally:
            session.close()

    application = create_app()
    application.dependency_overrides[get_db] = _session_dependency
    return TestClient(application), make_session
def test_orchestrator_routes_user_query_to_user_agent() -> None:
    """A finance user's receivables question is routed to ``user_agent``.

    Also checks that the stored agent run records the same routing decision,
    an ``accounts_receivable`` semantic parse, and a database tool call first.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "客户A这个月还有多少应收",
        "context_json": {"role_codes": ["finance"]},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["permission_level"] == "read"
    assert payload["status"] == "succeeded"
    assert payload["conversation_id"]
    result = payload["result"]
    assert result["answer"]
    assert result["suggested_actions"]
    assert payload["trace_summary"]["tool_count"] >= 1

    run_id = payload["run_id"]
    run_detail = client.get(f"/api/v1/agent-runs/{run_id}").json()
    assert run_detail["agent"] == "user_agent"
    assert run_detail["route_json"]["selected_agent"] == "user_agent"
    assert run_detail["semantic_parse"]["scenario"] == "accounts_receivable"
    first_tool_call = run_detail["tool_calls"][0]
    assert first_tool_call["tool_type"] == "database"
def test_orchestrator_does_not_auto_seed_demo_financial_records() -> None:
    """Querying an empty database returns zero records and inserts none.

    After the run, the expense-claim, receivable and payable tables must all
    still be empty.
    """
    client, session_factory = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "请查询我的报销单",
        "context_json": {"role_codes": ["employee"], "name": "测试用户"},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["result"]["query_payload"]["record_count"] == 0

    with session_factory() as db:
        for model in (ExpenseClaim, AccountsReceivableRecord, AccountsPayableRecord):
            row_count = db.scalar(select(func.count()).select_from(model))
            assert row_count == 0
def test_orchestrator_scopes_my_expense_query_to_current_user() -> None:
    """"My expense claims" returns only the caller's recent claims.

    Four claims are seeded: two recent ones attributable to the caller (one by
    ``employee_id``, one by e-mail in ``employee_name``), one April claim under
    the caller's name, and one recent claim of another employee. The response
    is expected to list the two recent claims (newest first), report one older
    record, and exclude the other employee's claim.
    """
    client, session_factory = build_client()
    user_id = "zhaoliu@example.com"

    with session_factory() as db:
        employee = Employee(
            employee_no="E9001",
            name="赵六",
            email=user_id,
        )
        db.add(employee)
        # Flush so that employee.id is available for the first claim.
        db.flush()

        linked_by_employee_id = ExpenseClaim(
            claim_no="EXP-TEST-001",
            employee_id=employee.id,
            employee_name="赵六",
            department_name="测试部",
            project_code="PRJ-TEST-01",
            expense_type="travel",
            reason="上海客户拜访",
            location="上海",
            amount=Decimal("120.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 10, 9, 0, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 10, 18, 0, tzinfo=UTC),
            status="submitted",
            approval_stage="finance_review",
            risk_flags_json=[],
        )
        linked_by_email_name = ExpenseClaim(
            claim_no="EXP-TEST-002",
            employee_name=user_id,
            department_name="测试部",
            project_code="PRJ-TEST-02",
            expense_type="meal",
            reason="客户午餐",
            location="杭州",
            amount=Decimal("300.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC),
            status="draft",
            approval_stage=None,
            risk_flags_json=[],
        )
        april_claim = ExpenseClaim(
            claim_no="EXP-TEST-003",
            employee_name="赵六",
            department_name="测试部",
            project_code="PRJ-TEST-03",
            expense_type="hotel",
            reason="历史住宿报销",
            location="南京",
            amount=Decimal("888.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 4, 20, 8, 30, tzinfo=UTC),
            submitted_at=datetime(2026, 4, 20, 9, 30, tzinfo=UTC),
            status="approved",
            approval_stage="completed",
            risk_flags_json=[],
        )
        other_employee_claim = ExpenseClaim(
            claim_no="EXP-TEST-004",
            employee_name="张三",
            department_name="财务部",
            project_code="PRJ-OTHER-01",
            expense_type="hotel",
            reason="外地出差住宿",
            location="深圳",
            amount=Decimal("999.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 12, 8, 30, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 12, 9, 30, tzinfo=UTC),
            status="approved",
            approval_stage="completed",
            risk_flags_json=[],
        )
        db.add_all(
            [
                linked_by_employee_id,
                linked_by_email_name,
                april_claim,
                other_employee_claim,
            ]
        )
        db.commit()

    request_body = {
        "source": "user_message",
        "user_id": user_id,
        "message": "请查询我的报销单",
        "context_json": {
            "role_codes": ["employee"],
            "name": "赵六",
            "client_now_iso": "2026-05-13T08:00:00+00:00",
        },
    }
    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["status"] == "succeeded"

    answer = payload["result"]["answer"]
    assert "2026-05-04 至 2026-05-13的你的报销单" in answer
    assert "共 2 笔" in answer
    assert "超过 10 日的单据" in answer

    expected_claim_order = ["EXP-TEST-002", "EXP-TEST-001"]
    query_payload = payload["result"]["query_payload"]
    assert query_payload["record_count"] == 2
    assert query_payload["older_record_count"] == 1
    assert query_payload["window_start_date"] == "2026-05-04"
    assert query_payload["window_end_date"] == "2026-05-13"
    assert [item["claim_no"] for item in query_payload["records"]] == expected_claim_order

    run_id = payload["run_id"]
    run_detail = client.get(f"/api/v1/agent-runs/{run_id}").json()
    tool_response = run_detail["tool_calls"][0]["response_json"]
    assert tool_response["record_count"] == 2
    assert tool_response["total_amount"] == 420.0
    assert tool_response["recent_window_applied"] is True
    assert tool_response["window_start_date"] == "2026-05-04"
    assert tool_response["window_end_date"] == "2026-05-13"
    assert tool_response["older_record_count"] == 1
    assert tool_response["scoped_to_current_user"] is True
    assert tool_response["scope_label"] == "你的报销单"
    assert [item["claim_no"] for item in tool_response["records"]] == expected_claim_order
def test_orchestrator_non_finance_cannot_query_other_users_expense_claims() -> None:
    """A manager asking for a colleague's claims only gets their own claim back.

    One claim belongs to the requesting manager and one to another employee;
    the query names the other employee, yet only the manager's own claim is
    expected in the result.
    """
    client, session_factory = build_client()
    user_id = "manager1@example.com"

    with session_factory() as db:
        owner = Employee(
            employee_no="E9101",
            name="李经理",
            email=user_id,
        )
        other = Employee(
            employee_no="E9102",
            name="王同学",
            email="other@example.com",
        )
        db.add_all([owner, other])
        # Flush so both employee ids exist before the claims reference them.
        db.flush()

        own_claim = ExpenseClaim(
            claim_no="EXP-MGR-001",
            employee_id=owner.id,
            employee_name="李经理",
            department_name="管理部",
            project_code="PRJ-MGR-01",
            expense_type="travel",
            reason="本人出差",
            location="上海",
            amount=Decimal("100.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 11, 9, 0, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 11, 10, 0, tzinfo=UTC),
            status="submitted",
            approval_stage="finance_review",
            risk_flags_json=[],
        )
        colleague_claim = ExpenseClaim(
            claim_no="EXP-MGR-002",
            employee_id=other.id,
            employee_name="王同学",
            department_name="销售部",
            project_code="PRJ-SALES-02",
            expense_type="meal",
            reason="他人报销",
            location="杭州",
            amount=Decimal("300.00"),
            currency="CNY",
            invoice_count=1,
            occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC),
            status="approved",
            approval_stage="completed",
            risk_flags_json=[],
        )
        db.add_all([own_claim, colleague_claim])
        db.commit()

    request_body = {
        "source": "user_message",
        "user_id": user_id,
        "message": "请查询王同学的报销单",
        "context_json": {
            "role_codes": ["manager"],
            "name": "李经理",
            "client_now_iso": "2026-05-13T08:00:00+00:00",
        },
    }
    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    query_payload = response.json()["result"]["query_payload"]
    assert query_payload["record_count"] == 1
    returned_claim_nos = [item["claim_no"] for item in query_payload["records"]]
    assert returned_claim_nos == ["EXP-MGR-001"]
def test_orchestrator_finance_can_query_all_expense_claims() -> None:
    """A finance-role user asking for all claims receives every seeded claim.

    Both seeded claims have an empty ``employee_name``; the result is compared
    as a set, so ordering is not asserted.
    """
    client, session_factory = build_client()

    claim_a = ExpenseClaim(
        claim_no="EXP-FIN-001",
        employee_name="",
        department_name="A部",
        project_code="PRJ-A",
        expense_type="travel",
        reason="A 报销",
        location="上海",
        amount=Decimal("120.00"),
        currency="CNY",
        invoice_count=1,
        occurred_at=datetime(2026, 5, 11, 9, 0, tzinfo=UTC),
        submitted_at=datetime(2026, 5, 11, 10, 0, tzinfo=UTC),
        status="submitted",
        approval_stage="finance_review",
        risk_flags_json=[],
    )
    claim_b = ExpenseClaim(
        claim_no="EXP-FIN-002",
        employee_name="",
        department_name="B部",
        project_code="PRJ-B",
        expense_type="meal",
        reason="B 报销",
        location="杭州",
        amount=Decimal("300.00"),
        currency="CNY",
        invoice_count=1,
        occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC),
        submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC),
        status="approved",
        approval_stage="completed",
        risk_flags_json=[],
    )
    with session_factory() as db:
        db.add_all([claim_a, claim_b])
        db.commit()

    request_body = {
        "source": "user_message",
        "user_id": "finance@example.com",
        "message": "请查询所有报销单",
        "context_json": {
            "role_codes": ["finance"],
            "name": "财务",
            "client_now_iso": "2026-05-13T08:00:00+00:00",
        },
    }
    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    query_payload = response.json()["result"]["query_payload"]
    assert query_payload["record_count"] == 2
    returned_claim_nos = {item["claim_no"] for item in query_payload["records"]}
    assert returned_claim_nos == {"EXP-FIN-001", "EXP-FIN-002"}
def test_orchestrator_expense_query_claim_no_bypasses_recent_window() -> None:
    """Querying by explicit claim number finds a claim outside the recent window.

    The seeded claim is dated 2026-04-01 while the client clock is 2026-05-13;
    the response must report ``recent_window_applied`` as False and return it.
    """
    client, session_factory = build_client()
    user_id = "zhaoliu@example.com"

    old_claim = ExpenseClaim(
        claim_no="EXP-202604-001",
        employee_name="赵六",
        department_name="测试部",
        project_code="PRJ-OLD-01",
        expense_type="travel",
        reason="上月差旅",
        location="北京",
        amount=Decimal("560.00"),
        currency="CNY",
        invoice_count=1,
        occurred_at=datetime(2026, 4, 1, 9, 0, tzinfo=UTC),
        submitted_at=datetime(2026, 4, 1, 18, 0, tzinfo=UTC),
        status="approved",
        approval_stage="completed",
        risk_flags_json=[],
    )
    with session_factory() as db:
        db.add(old_claim)
        db.commit()

    request_body = {
        "source": "user_message",
        "user_id": user_id,
        "message": "请查询报销单 EXP-202604-001",
        "context_json": {
            "role_codes": ["employee"],
            "name": "赵六",
            "client_now_iso": "2026-05-13T08:00:00+00:00",
        },
    }
    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    query_payload = response.json()["result"]["query_payload"]
    assert query_payload["recent_window_applied"] is False
    assert query_payload["record_count"] == 1
    assert query_payload["older_record_count"] == 0
    assert query_payload["records"][0]["claim_no"] == "EXP-202604-001"
def test_orchestrator_routes_schedule_to_hermes() -> None:
    """A scheduled run of the daily risk scan task is routed to ``hermes``.

    Looks up the active task asset with code ``task.hermes.daily_risk_scan``,
    triggers it with ``source="schedule"``, and checks that both the response
    and the stored agent run report ``hermes`` with exactly two tool calls.
    """
    client, session_factory = build_client()
    task_code = "task.hermes.daily_risk_scan"

    with session_factory() as db:
        active_tasks = AgentAssetService(db).list_assets(asset_type="task", status="active")
        task = next((item for item in active_tasks if item.code == task_code), None)
        # Fail with a readable message instead of a bare StopIteration.
        assert task is not None, f"active task asset {task_code!r} was not found"
        # Capture the id while the session is still open, so the request below
        # never touches an object whose session has already been closed.
        task_id = task.id

    response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "schedule",
            "task_id": task_id,
            "context_json": {"role_codes": ["finance"]},
        },
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "hermes"
    assert payload["status"] == "succeeded"
    assert payload["trace_summary"]["tool_count"] == 2

    run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json()
    assert run_detail["agent"] == "hermes"
    assert run_detail["route_json"]["selected_agent"] == "hermes"
    assert len(run_detail["tool_calls"]) == 2
def test_orchestrator_forbidden_request_does_not_call_downstream_agent() -> None:
    """A direct-payment request from a plain ``user`` role is blocked.

    No agent is selected, no tools are invoked, and the stored run is
    attributed to the orchestrator itself.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "帮我直接付款给供应商B",
        "context_json": {"role_codes": ["user"]},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] is None
    assert payload["permission_level"] == "forbidden"
    assert payload["status"] == "blocked"
    assert payload["trace_summary"]["tool_count"] == 0

    run_id = payload["run_id"]
    run_detail = client.get(f"/api/v1/agent-runs/{run_id}").json()
    assert run_detail["agent"] == "orchestrator"
    assert run_detail["tool_calls"] == []
def test_orchestrator_approval_required_returns_confirmation_result() -> None:
    """A finance user's payment-scheduling request is held for confirmation.

    The run is routed to ``user_agent`` but reported as ``blocked`` with
    ``requires_confirmation`` set and a message that mentions confirmation.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "帮我安排付款给供应商B",
        "context_json": {"role_codes": ["finance"]},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["permission_level"] == "approval_required"
    assert payload["requires_confirmation"] is True
    assert payload["status"] == "blocked"
    message = payload["result"]["message"]
    assert "确认" in message
def test_orchestrator_user_agent_draft_returns_structured_payload() -> None:
    """A draft request returns a structured draft payload backed by a stored claim.

    The response must carry draft and review payloads, and the referenced
    ``ExpenseClaim`` row must exist with status ``draft`` and at least one item.
    """
    client, session_factory = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "帮我生成张三4月差旅报销草稿",
        "context_json": {"role_codes": ["finance"]},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["status"] == "succeeded"

    result = payload["result"]
    draft = result["draft_payload"]
    assert draft["confirmation_required"] is True
    assert result["review_payload"]["slot_cards"]
    assert draft["claim_id"]
    assert draft["claim_no"].startswith("EXP-")
    assert draft["status"] == "draft"
    assert result["suggested_actions"]

    with session_factory() as db:
        lookup = select(ExpenseClaim).where(ExpenseClaim.id == draft["claim_id"])
        claim = db.scalar(lookup)
        assert claim is not None
        assert claim.claim_no == draft["claim_no"]
        assert claim.status == "draft"
        assert claim.items
def test_orchestrator_blocks_fourth_expense_draft_for_same_user() -> None:
    """A user holding three drafts cannot save a fourth one.

    Three draft requests succeed; the fourth (sent with
    ``review_action="save_draft"``) still returns ``succeeded`` but carries a
    ``blocked`` draft payload without claim identifiers, and the database keeps
    exactly three draft claims.
    """
    client, session_factory = build_client()
    user_id = "zhangsan@example.com"

    with session_factory() as db:
        db.add(
            Employee(
                employee_no="E1001",
                name="张三",
                email=user_id,
            )
        )
        db.commit()

    trips = {"上海": 120, "北京": 240, "深圳": 360}
    for city, amount in trips.items():
        request_body = {
            "source": "user_message",
            "user_id": user_id,
            "message": f"帮我生成报销草稿,我昨天去{city}出差,交通费{amount}",
            "context_json": {
                "role_codes": ["finance"],
                "name": "张三",
            },
        }
        response = client.post("/api/v1/orchestrator/run", json=request_body)
        assert response.status_code == 200
        payload = response.json()
        assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-")

    fourth_request = {
        "source": "user_message",
        "user_id": user_id,
        "message": "帮我生成报销草稿我昨天去杭州出差交通费480元",
        "context_json": {
            "role_codes": ["finance"],
            "name": "张三",
            "review_action": "save_draft",
        },
    }
    blocked_response = client.post("/api/v1/orchestrator/run", json=fourth_request)

    assert blocked_response.status_code == 200
    blocked_payload = blocked_response.json()
    assert blocked_payload["status"] == "succeeded"
    assert "你当前已保存 3 个草稿" in blocked_payload["result"]["answer"]
    blocked_draft = blocked_payload["result"]["draft_payload"]
    assert blocked_draft["claim_id"] is None
    assert blocked_draft["claim_no"] is None
    assert blocked_draft["status"] == "blocked"

    with session_factory() as db:
        count_drafts = (
            select(func.count())
            .select_from(ExpenseClaim)
            .where(ExpenseClaim.status == "draft")
        )
        draft_count = db.scalar(count_drafts)
        assert draft_count == 3
def test_orchestrator_allows_existing_draft_update_when_user_already_has_three_drafts() -> None:
    """Editing an existing draft still works once the user holds three drafts.

    The first draft is created in its own conversation, two more drafts follow,
    then an amount change is sent on the first conversation. The same claim
    must be updated to 888 and the user's draft count must remain three.
    """
    client, session_factory = build_client()
    user_id = "lisi@example.com"
    user_context = {
        "role_codes": ["finance"],
        "name": "李四",
    }

    with session_factory() as db:
        db.add(
            Employee(
                employee_no="E1002",
                name="李四",
                email=user_id,
            )
        )
        db.commit()

    first_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "message": "帮我生成报销草稿我昨天去上海出差交通费120元",
            "context_json": dict(user_context),
        },
    )
    assert first_response.status_code == 200
    first_payload = first_response.json()
    claim_id = first_payload["result"]["draft_payload"]["claim_id"]
    conversation_id = first_payload["conversation_id"]
    assert claim_id
    assert conversation_id

    later_trips = {"北京": 240, "深圳": 360}
    for city, amount in later_trips.items():
        response = client.post(
            "/api/v1/orchestrator/run",
            json={
                "source": "user_message",
                "user_id": user_id,
                "message": f"帮我生成报销草稿,我昨天去{city}出差,交通费{amount}",
                "context_json": dict(user_context),
            },
        )
        assert response.status_code == 200
        payload = response.json()
        assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-")

    update_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "conversation_id": conversation_id,
            "message": "金额改成888元",
            "context_json": dict(user_context),
        },
    )
    assert update_response.status_code == 200
    update_payload = update_response.json()
    assert update_payload["result"]["draft_payload"]["claim_id"] == claim_id

    with session_factory() as db:
        claim = db.scalar(select(ExpenseClaim).where(ExpenseClaim.id == claim_id))
        assert claim is not None
        assert float(claim.amount) == 888.0
        count_user_drafts = (
            select(func.count())
            .select_from(ExpenseClaim)
            .where(ExpenseClaim.employee_id == claim.employee_id)
            .where(ExpenseClaim.status == "draft")
        )
        draft_count = db.scalar(count_user_drafts)
        assert draft_count == 3
def test_orchestrator_persists_conversation_and_reuses_expense_draft_context() -> None:
    """A follow-up message in the same conversation edits the existing draft.

    The second turn only says "change the amount to 800" yet must be treated as
    an expense draft on the same claim. The conversation row must track the
    draft claim, the last scenario/intent, and four stored messages.
    """
    client, session_factory = build_client()

    opening_request = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "帮我生成一份差旅报销草稿我昨天去上海出差交通费680元",
        "context_json": {
            "role_codes": ["finance"],
            "attachment_names": ["行程单.png"],
            "attachment_count": 1,
            "ocr_summary": "行程单金额680元",
        },
    }
    first_response = client.post("/api/v1/orchestrator/run", json=opening_request)
    assert first_response.status_code == 200
    first_payload = first_response.json()
    conversation_id = first_payload["conversation_id"]
    claim_id = first_payload["result"]["draft_payload"]["claim_id"]
    assert conversation_id
    assert claim_id

    follow_up_request = {
        "source": "user_message",
        "user_id": "pytest",
        "conversation_id": conversation_id,
        "message": "金额改成800元",
        "context_json": {
            "role_codes": ["finance"],
        },
    }
    second_response = client.post("/api/v1/orchestrator/run", json=follow_up_request)
    assert second_response.status_code == 200
    second_payload = second_response.json()
    assert second_payload["conversation_id"] == conversation_id
    trace = second_payload["trace_summary"]
    assert trace["scenario"] == "expense"
    assert trace["intent"] == "draft"
    assert second_payload["result"]["draft_payload"]["claim_id"] == claim_id

    with session_factory() as db:
        claim = db.scalar(select(ExpenseClaim).where(ExpenseClaim.id == claim_id))
        assert claim is not None
        assert float(claim.amount) == 800.0

        conversation = db.scalar(
            select(AgentConversation).where(AgentConversation.conversation_id == conversation_id)
        )
        assert conversation is not None
        assert conversation.draft_claim_id == claim_id
        assert conversation.last_scenario == "expense"
        assert conversation.last_intent == "draft"

        count_messages = (
            select(func.count())
            .select_from(AgentConversationMessage)
            .where(AgentConversationMessage.conversation_id == conversation_id)
        )
        message_count = db.scalar(count_messages)
        assert message_count == 4
def test_orchestrator_does_not_reuse_conversation_when_user_changes() -> None:
    """A different user supplying someone else's conversation id gets a new one.

    ``user_b`` sends ``user_a``'s conversation id; the response must carry a
    different id and both conversations must be stored under their own users.
    """
    client, session_factory = build_client()

    first_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": "user_a",
            "message": "帮我生成一份差旅报销草稿我昨天去上海出差交通费680元",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    assert first_response.status_code == 200
    first_conversation_id = first_response.json()["conversation_id"]
    assert first_conversation_id

    second_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": "user_b",
            "conversation_id": first_conversation_id,
            "message": "查一下本周报销金额",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    assert second_response.status_code == 200
    second_payload = second_response.json()
    assert second_payload["conversation_id"]
    assert second_payload["conversation_id"] != first_conversation_id

    with session_factory() as db:
        first_conversation = db.scalar(
            select(AgentConversation).where(
                AgentConversation.conversation_id == first_conversation_id
            )
        )
        second_conversation = db.scalar(
            select(AgentConversation).where(
                AgentConversation.conversation_id == second_payload["conversation_id"]
            )
        )
        assert first_conversation is not None
        assert second_conversation is not None
        assert first_conversation.user_id == "user_a"
        assert second_conversation.user_id == "user_b"
def test_orchestrator_prunes_conversations_older_than_configured_retention_days() -> None:
    """Conversations past the retention setting are removed on the next run.

    Retention is set to one day and a conversation (with one message) dated two
    days ago is seeded. After any orchestrator run, both the conversation and
    its message must be gone.
    """
    client, session_factory = build_client()
    expired_conversation_id = "conv_expired"
    expired_at = datetime.now(UTC) - timedelta(days=2)

    with session_factory() as db:
        # Lower the retention window through the regular settings service.
        settings_service = SettingsService(db)
        settings_payload = settings_service.get_settings_snapshot().model_dump()
        settings_payload["sessionForm"]["conversationRetentionDays"] = 1
        settings_service.save_settings_snapshot(SettingsWrite(**settings_payload))

        stale_conversation = AgentConversation(
            conversation_id=expired_conversation_id,
            user_id="expired_user",
            source="user_message",
            state_json={},
            message_count=1,
            created_at=expired_at,
            updated_at=expired_at,
        )
        db.add(stale_conversation)
        db.flush()

        stale_message = AgentConversationMessage(
            conversation_id=expired_conversation_id,
            role="user",
            content="旧会话消息",
            created_at=expired_at,
        )
        db.add(stale_message)
        db.commit()

    response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": "fresh_user",
            "message": "查一下本周报销金额",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    assert response.status_code == 200

    with session_factory() as db:
        conversation = db.scalar(
            select(AgentConversation).where(
                AgentConversation.conversation_id == expired_conversation_id
            )
        )
        count_messages = (
            select(func.count())
            .select_from(AgentConversationMessage)
            .where(AgentConversationMessage.conversation_id == expired_conversation_id)
        )
        message_count = db.scalar(count_messages)
        assert conversation is None
        assert message_count == 0
def test_orchestrator_treats_expense_narrative_as_draft_instead_of_ar_query() -> None:
    """A narrative about entertaining a customer is parsed as an expense draft.

    Although the message mentions a customer, the run must be classified as
    expense/draft, stop for clarification without calling tools, and list the
    missing slots alongside the extracted time, location and amount.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "我今天去客户现场招待了客户花销了1000元",
        "context_json": {"role_codes": ["finance"]},
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["permission_level"] == "draft_write"
    assert payload["status"] == "blocked"
    assert payload["route_reason"] == "clarification_required"

    trace = payload["trace_summary"]
    assert trace["scenario"] == "expense"
    assert trace["intent"] == "draft"
    assert trace["tool_count"] == 0

    message = payload["result"]["message"]
    assert "应收场景数据" not in message
    assert message.startswith("识别到您希望报销一笔“业务招待费”费用")

    review_payload = payload["result"]["review_payload"]
    assert review_payload["intent_summary"].startswith("识别到您希望报销一笔“业务招待费”费用。")
    assert review_payload["missing_slots"] == ["客户名称", "参与人员", "票据附件"]

    slot_map = {card["key"]: card for card in review_payload["slot_cards"]}
    assert slot_map["time_range"]["raw_value"] == "今天"
    assert slot_map["location"]["value"] == "客户现场"
    assert slot_map["amount"]["value"] == "1000.00元"
def test_orchestrator_can_restore_latest_user_conversation() -> None:
    """The latest-conversation endpoint returns a user's two-turn conversation.

    After a draft request and a follow-up edit, the restored conversation must
    reference the same conversation and draft claim and contain four messages,
    including the first user message's attachment names and the orchestrator
    payload on the first reply.
    """
    client, _ = build_client()

    first_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": "restore_user",
            "message": "帮我生成一份差旅报销草稿我昨天去上海出差交通费680元",
            "context_json": {
                "role_codes": ["finance"],
                "attachment_names": ["行程单.png"],
                "attachment_count": 1,
                "ocr_summary": "行程单金额680元",
            },
        },
    )
    assert first_response.status_code == 200
    first_payload = first_response.json()

    second_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": "restore_user",
            "conversation_id": first_payload["conversation_id"],
            "message": "金额改成800元",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    assert second_response.status_code == 200

    restore_response = client.get(
        "/api/v1/orchestrator/conversations/latest",
        params={"user_id": "restore_user"},
    )
    assert restore_response.status_code == 200
    restore_payload = restore_response.json()
    assert restore_payload["found"] is True

    restored = restore_payload["conversation"]
    assert restored["conversation_id"] == first_payload["conversation_id"]
    assert restored["draft_claim_id"] == first_payload["result"]["draft_payload"]["claim_id"]

    messages = restored["messages"]
    assert len(messages) == 4
    assert messages[0]["role"] == "user"
    assert messages[0]["message_json"]["attachment_names"] == ["行程单.png"]
    assert messages[1]["message_json"]["orchestrator_payload"]["run_id"]
def test_orchestrator_restores_conversation_messages_in_sequence_order() -> None:
    """Restored messages come back in conversational order.

    Four messages share one ``created_at`` and are inserted out of order, with
    ids that do not sort into conversational order either. The restore
    endpoint must still return them as first through fourth.
    """
    client, session_factory = build_client()
    conversation_id = "conv_restore_sequence"
    created_at = datetime(2026, 5, 13, 13, 20, tzinfo=UTC)

    # (id, run_id, role, content, sequence) in deliberately shuffled insert order.
    shuffled_rows = (
        ("msg-z-assistant", "run-a", "assistant", "第二条:助手回复", 2),
        ("msg-b-user", "run-b", "user", "第三条:用户追问", 3),
        ("msg-a-user", "run-a", "user", "第一条:用户发起", 1),
        ("msg-c-assistant", "run-b", "assistant", "第四条:助手总结", 4),
    )

    with session_factory() as db:
        conversation = AgentConversation(
            conversation_id=conversation_id,
            user_id="sequence_user",
            source="user_message",
            state_json={"session_type": "expense"},
            message_count=4,
            created_at=created_at,
            updated_at=created_at,
        )
        db.add(conversation)
        db.flush()
        db.add_all(
            [
                AgentConversationMessage(
                    id=message_id,
                    conversation_id=conversation_id,
                    run_id=run_id,
                    role=role,
                    content=content,
                    message_json={"sequence": sequence},
                    created_at=created_at,
                )
                for message_id, run_id, role, content, sequence in shuffled_rows
            ]
        )
        db.commit()

    restore_response = client.get(
        "/api/v1/orchestrator/conversations/latest",
        params={"user_id": "sequence_user", "session_type": "expense"},
    )
    assert restore_response.status_code == 200
    restore_payload = restore_response.json()
    assert restore_payload["found"] is True

    restored_contents = [
        item["content"] for item in restore_payload["conversation"]["messages"]
    ]
    assert restored_contents == [
        "第一条:用户发起",
        "第二条:助手回复",
        "第三条:用户追问",
        "第四条:助手总结",
    ]
def test_orchestrator_can_delete_all_user_conversations() -> None:
    """Deleting a user's conversations removes only that user's rows.

    ``delete_user`` creates two conversations and ``other_user`` one; after the
    delete call the former has none left and the latter still has one.
    """
    client, session_factory = build_client()

    def run_as(user_id: str, message: str) -> None:
        """Send one finance-role message for ``user_id`` and require HTTP 200."""
        reply = client.post(
            "/api/v1/orchestrator/run",
            json={
                "source": "user_message",
                "user_id": user_id,
                "message": message,
                "context_json": {"role_codes": ["finance"]},
            },
        )
        assert reply.status_code == 200

    run_as("delete_user", "查一下本周报销金额")
    run_as("delete_user", "帮我生成差旅报销草稿")
    run_as("other_user", "查一下供应商待付款")

    delete_response = client.delete(
        "/api/v1/orchestrator/conversations",
        params={"user_id": "delete_user"},
    )
    assert delete_response.status_code == 200
    delete_payload = delete_response.json()
    assert delete_payload["deleted_count"] == 2

    with session_factory() as db:
        count_conversations = select(func.count()).select_from(AgentConversation)
        deleted_count = db.scalar(
            count_conversations.where(AgentConversation.user_id == "delete_user")
        )
        other_count = db.scalar(
            count_conversations.where(AgentConversation.user_id == "other_user")
        )
        assert deleted_count == 0
        assert other_count == 1
def test_orchestrator_can_delete_current_user_single_conversation() -> None:
    """Deleting one conversation by id leaves the user's other conversation.

    Two conversations are created for the same user; the first is deleted via
    its id and only the second must remain.
    """
    client, session_factory = build_client()
    user_id = "single_delete_user"

    first_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "message": "查一下本周报销金额",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    second_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "message": "帮我生成差旅报销草稿",
            "context_json": {"role_codes": ["finance"]},
        },
    )
    assert first_response.status_code == 200
    assert second_response.status_code == 200
    first_conversation_id = first_response.json()["conversation_id"]
    second_conversation_id = second_response.json()["conversation_id"]

    delete_response = client.delete(
        f"/api/v1/orchestrator/conversations/{first_conversation_id}",
        params={"user_id": user_id},
    )
    assert delete_response.status_code == 200
    assert delete_response.json()["deleted_count"] == 1

    with session_factory() as db:
        remaining_query = (
            select(AgentConversation.conversation_id)
            .where(AgentConversation.user_id == "single_delete_user")
            .order_by(AgentConversation.created_at.asc())
        )
        remaining_ids = list(db.scalars(remaining_query).all())
        assert first_conversation_id not in remaining_ids
        assert second_conversation_id in remaining_ids
def test_orchestrator_can_delete_user_conversations_by_session_type() -> None:
    """Deleting with a ``session_type`` filter removes only matching conversations.

    One ``expense`` and one ``knowledge`` conversation are created; deleting the
    ``knowledge`` type must leave a single conversation whose session type is
    ``expense`` (a missing or blank value is also read as ``expense`` here).
    """
    client, session_factory = build_client()
    user_id = "typed_delete_user"

    expense_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "message": "帮我生成差旅报销草稿",
            "context_json": {
                "role_codes": ["finance"],
                "session_type": "expense",
            },
        },
    )
    knowledge_response = client.post(
        "/api/v1/orchestrator/run",
        json={
            "source": "user_message",
            "user_id": user_id,
            "message": "发票抬头不一致还能报销吗",
            "context_json": {
                "role_codes": ["finance"],
                "session_type": "knowledge",
            },
        },
    )
    assert expense_response.status_code == 200
    assert knowledge_response.status_code == 200

    delete_response = client.delete(
        "/api/v1/orchestrator/conversations",
        params={
            "user_id": user_id,
            "session_type": "knowledge",
        },
    )
    assert delete_response.status_code == 200
    assert delete_response.json()["deleted_count"] == 1

    with session_factory() as db:
        remaining_query = (
            select(AgentConversation)
            .where(AgentConversation.user_id == "typed_delete_user")
            .order_by(AgentConversation.created_at.asc())
        )
        remaining = list(db.scalars(remaining_query).all())
        assert len(remaining) == 1
        state = remaining[0].state_json or {}
        stored_type = str(state.get("session_type") or "").strip()
        remaining_session_type = stored_type or "expense"
        assert remaining_session_type == "expense"
def test_orchestrator_tool_failure_is_logged_and_degraded() -> None:
    """A simulated database tool failure degrades the run instead of failing it.

    The run still reports ``succeeded`` but is flagged as degraded with one
    failed tool, and the stored tool call carries the simulated error message.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "查一下本周报销金额",
        "context_json": {
            "role_codes": ["finance"],
            "simulate_tool_failure": "database",
        },
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["selected_agent"] == "user_agent"
    assert payload["status"] == "succeeded"
    trace = payload["trace_summary"]
    assert trace["failed_tool_count"] == 1
    assert trace["degraded"] is True

    run_id = payload["run_id"]
    run_detail = client.get(f"/api/v1/agent-runs/{run_id}").json()
    first_tool_call = run_detail["tool_calls"][0]
    assert first_tool_call["status"] == "failed"
    assert "simulated database failure" in first_tool_call["error_message"]
def test_orchestrator_exception_is_written_to_agent_run() -> None:
    """A simulated orchestrator exception is recorded on the agent run.

    The endpoint still answers HTTP 200 with status ``failed``, and the stored
    run carries the simulated exception text in ``error_message``.
    """
    client, _ = build_client()
    request_body = {
        "source": "user_message",
        "user_id": "pytest",
        "message": "查一下本周报销金额",
        "context_json": {
            "role_codes": ["finance"],
            "simulate_orchestrator_exception": True,
        },
    }

    response = client.post("/api/v1/orchestrator/run", json=request_body)

    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "failed"

    run_id = payload["run_id"]
    run_detail = client.get(f"/api/v1/agent-runs/{run_id}").json()
    assert run_detail["status"] == "failed"
    assert "simulated orchestrator exception" in run_detail["error_message"]