from __future__ import annotations from collections.abc import Generator from datetime import UTC, datetime, timedelta from decimal import Decimal from fastapi.testclient import TestClient from sqlalchemy import create_engine, func, select from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.api.deps import get_db from app.db.base import Base from app.main import create_app from app.models.agent_conversation import AgentConversation, AgentConversationMessage from app.models.employee import Employee from app.models.financial_record import ( AccountsPayableRecord, AccountsReceivableRecord, ExpenseClaim, ) from app.schemas.settings import SettingsWrite from app.services.agent_assets import AgentAssetService from app.services.settings import SettingsService def build_client() -> tuple[TestClient, sessionmaker[Session]]: engine = create_engine( "sqlite+pysqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) app = create_app() def override_db() -> Generator[Session, None, None]: db = session_factory() try: yield db finally: db.close() app.dependency_overrides[get_db] = override_db return TestClient(app), session_factory def test_orchestrator_routes_user_query_to_user_agent() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "客户A这个月还有多少应收", "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["permission_level"] == "read" assert payload["status"] == "succeeded" assert payload["conversation_id"] assert payload["result"]["answer"] assert payload["result"]["suggested_actions"] assert payload["trace_summary"]["tool_count"] >= 1 run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() assert run_detail["agent"] == "user_agent" assert run_detail["route_json"]["selected_agent"] == "user_agent" assert run_detail["semantic_parse"]["scenario"] == "accounts_receivable" assert run_detail["tool_calls"][0]["tool_type"] == "database" def test_orchestrator_does_not_auto_seed_demo_financial_records() -> None: client, session_factory = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "请查询我的报销单", "context_json": {"role_codes": ["employee"], "name": "测试用户"}, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["query_payload"]["record_count"] == 0 with session_factory() as db: assert db.scalar(select(func.count()).select_from(ExpenseClaim)) == 0 assert db.scalar(select(func.count()).select_from(AccountsReceivableRecord)) == 0 assert db.scalar(select(func.count()).select_from(AccountsPayableRecord)) == 0 def test_orchestrator_scopes_my_expense_query_to_current_user() -> None: client, session_factory = build_client() user_id = "zhaoliu@example.com" with session_factory() as db: employee = Employee( employee_no="E9001", name="赵六", email=user_id, ) db.add(employee) db.flush() db.add_all( [ ExpenseClaim( claim_no="EXP-TEST-001", employee_id=employee.id, employee_name="赵六", department_name="测试部", project_code="PRJ-TEST-01", expense_type="travel", reason="上海客户拜访", location="上海", amount=Decimal("120.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 10, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 10, 18, 0, tzinfo=UTC), status="submitted", approval_stage="finance_review", risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-TEST-002", employee_name=user_id, department_name="测试部", project_code="PRJ-TEST-02", expense_type="meal", reason="客户午餐", location="杭州", amount=Decimal("300.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC), status="draft", approval_stage=None, risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-TEST-003", employee_name="赵六", department_name="测试部", project_code="PRJ-TEST-03", expense_type="hotel", reason="历史住宿报销", location="南京", amount=Decimal("888.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 4, 20, 8, 30, tzinfo=UTC), submitted_at=datetime(2026, 4, 20, 9, 30, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-TEST-004", employee_name="张三", department_name="财务部", project_code="PRJ-OTHER-01", expense_type="hotel", reason="外地出差住宿", location="深圳", amount=Decimal("999.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 12, 8, 30, tzinfo=UTC), submitted_at=datetime(2026, 5, 12, 9, 30, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ), ] ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "请查询我的报销单", "context_json": { "role_codes": ["employee"], "name": "赵六", "client_now_iso": "2026-05-13T08:00:00+00:00", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["status"] == "succeeded" assert "2026-05-04 至 2026-05-13的你的报销单" in payload["result"]["answer"] assert "共 2 笔" in payload["result"]["answer"] assert "超过 10 日的单据" in payload["result"]["answer"] assert payload["result"]["query_payload"]["record_count"] == 2 assert payload["result"]["query_payload"]["older_record_count"] == 1 assert payload["result"]["query_payload"]["window_start_date"] == "2026-05-04" assert payload["result"]["query_payload"]["window_end_date"] == "2026-05-13" assert [item["claim_no"] for item in payload["result"]["query_payload"]["records"]] == [ "EXP-TEST-002", "EXP-TEST-001", ] run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() tool_response = run_detail["tool_calls"][0]["response_json"] assert tool_response["record_count"] == 2 assert tool_response["total_amount"] == 420.0 assert tool_response["recent_window_applied"] is True assert tool_response["window_start_date"] == "2026-05-04" assert tool_response["window_end_date"] == "2026-05-13" assert tool_response["older_record_count"] == 1 assert tool_response["scoped_to_current_user"] is True assert tool_response["scope_label"] == "你的报销单" assert [item["claim_no"] for item in tool_response["records"]] == [ "EXP-TEST-002", "EXP-TEST-001", ] def test_orchestrator_non_finance_cannot_query_other_users_expense_claims() -> None: client, session_factory = build_client() user_id = "manager1@example.com" with session_factory() as db: owner = Employee( employee_no="E9101", name="李经理", email=user_id, ) other = Employee( employee_no="E9102", name="王同学", email="other@example.com", ) db.add_all([owner, other]) db.flush() db.add_all( [ ExpenseClaim( claim_no="EXP-MGR-001", employee_id=owner.id, employee_name="李经理", department_name="管理部", project_code="PRJ-MGR-01", expense_type="travel", reason="本人出差", location="上海", amount=Decimal("100.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 11, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 11, 10, 0, tzinfo=UTC), status="submitted", approval_stage="finance_review", risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-MGR-002", employee_id=other.id, employee_name="王同学", department_name="销售部", project_code="PRJ-SALES-02", expense_type="meal", reason="他人报销", location="杭州", amount=Decimal("300.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ), ] ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "请查询王同学的报销单", "context_json": { "role_codes": ["manager"], "name": "李经理", "client_now_iso": "2026-05-13T08:00:00+00:00", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["query_payload"]["record_count"] == 1 assert [item["claim_no"] for item in payload["result"]["query_payload"]["records"]] == [ "EXP-MGR-001", ] def test_orchestrator_non_finance_does_not_leak_duplicate_name_claims() -> None: client, session_factory = build_client() user_id = "zhangsan1@example.com" with session_factory() as db: employee_a = Employee( employee_no="E9201", name="张三", email=user_id, ) employee_b = Employee( employee_no="E9202", name="张三", email="zhangsan2@example.com", ) db.add_all([employee_a, employee_b]) db.flush() db.add_all( [ ExpenseClaim( claim_no="EXP-DUP-101", employee_id=employee_a.id, employee_name="张三", department_name="市场部", project_code="PRJ-A", expense_type="travel", reason="本人报销", location="上海", amount=Decimal("120.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 12, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 12, 10, 0, tzinfo=UTC), status="submitted", approval_stage="finance_review", risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-DUP-102", employee_id=employee_b.id, employee_name="张三", department_name="销售部", project_code="PRJ-B", expense_type="meal", reason="他人报销", location="杭州", amount=Decimal("300.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 12, 12, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 12, 13, 0, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ), ] ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "请查询我的报销单", "context_json": { "role_codes": ["manager"], "name": "张三", "client_now_iso": "2026-05-13T08:00:00+00:00", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["query_payload"]["record_count"] == 1 assert [item["claim_no"] for item in payload["result"]["query_payload"]["records"]] == [ "EXP-DUP-101", ] def test_orchestrator_finance_can_query_all_expense_claims() -> None: client, session_factory = build_client() with session_factory() as db: db.add_all( [ ExpenseClaim( claim_no="EXP-FIN-001", employee_name="甲", department_name="A部", project_code="PRJ-A", expense_type="travel", reason="A 报销", location="上海", amount=Decimal("120.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 11, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 11, 10, 0, tzinfo=UTC), status="submitted", approval_stage="finance_review", risk_flags_json=[], ), ExpenseClaim( claim_no="EXP-FIN-002", employee_name="乙", department_name="B部", project_code="PRJ-B", expense_type="meal", reason="B 报销", location="杭州", amount=Decimal("300.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 11, 12, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 11, 13, 0, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ), ] ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "finance@example.com", "message": "请查询所有报销单", "context_json": { "role_codes": ["finance"], "name": "财务", "client_now_iso": "2026-05-13T08:00:00+00:00", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["query_payload"]["record_count"] == 2 assert {item["claim_no"] for item in payload["result"]["query_payload"]["records"]} == { "EXP-FIN-001", "EXP-FIN-002", } def test_orchestrator_expense_query_claim_no_bypasses_recent_window() -> None: client, session_factory = build_client() user_id = "zhaoliu@example.com" with session_factory() as db: db.add( Employee( employee_no="E9301", name="赵六", email=user_id, ) ) db.add( ExpenseClaim( claim_no="EXP-202604-001", employee_name="赵六", department_name="测试部", project_code="PRJ-OLD-01", expense_type="travel", reason="上月差旅", location="北京", amount=Decimal("560.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 4, 1, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 4, 1, 18, 0, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ) ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "请查询报销单 EXP-202604-001", "context_json": { "role_codes": ["employee"], "name": "赵六", "client_now_iso": "2026-05-13T08:00:00+00:00", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["query_payload"]["recent_window_applied"] is False assert payload["result"]["query_payload"]["record_count"] == 1 assert payload["result"]["query_payload"]["older_record_count"] == 0 assert payload["result"]["query_payload"]["records"][0]["claim_no"] == "EXP-202604-001" def test_orchestrator_routes_schedule_to_hermes() -> None: client, session_factory = build_client() with session_factory() as db: task = next( item for item in AgentAssetService(db).list_assets(asset_type="task", status="active") if item.code == "task.hermes.daily_risk_scan" ) response = client.post( "/api/v1/orchestrator/run", json={ "source": "schedule", "task_id": task.id, "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "hermes" assert payload["status"] == "succeeded" assert payload["trace_summary"]["tool_count"] == 2 run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() assert run_detail["agent"] == "hermes" assert run_detail["route_json"]["selected_agent"] == "hermes" assert len(run_detail["tool_calls"]) == 2 def test_orchestrator_forbidden_request_does_not_call_downstream_agent() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "帮我直接付款给供应商B", "context_json": {"role_codes": ["user"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] is None assert payload["permission_level"] == "forbidden" assert payload["status"] == "blocked" assert payload["trace_summary"]["tool_count"] == 0 run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() assert run_detail["agent"] == "orchestrator" assert run_detail["tool_calls"] == [] def test_orchestrator_approval_required_returns_confirmation_result() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "帮我安排付款给供应商B", "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["permission_level"] == "approval_required" assert payload["requires_confirmation"] is True assert payload["status"] == "blocked" assert "确认" in payload["result"]["message"] def test_orchestrator_user_agent_draft_returns_structured_payload() -> None: client, session_factory = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "帮我生成张三4月差旅报销草稿", "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["status"] == "succeeded" assert payload["result"]["draft_payload"]["confirmation_required"] is True assert payload["result"]["review_payload"]["slot_cards"] assert payload["result"]["draft_payload"]["claim_id"] assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-") assert payload["result"]["draft_payload"]["status"] == "draft" assert payload["result"]["suggested_actions"] with session_factory() as db: claim = db.scalar( select(ExpenseClaim).where( ExpenseClaim.id == payload["result"]["draft_payload"]["claim_id"] ) ) assert claim is not None assert claim.claim_no == payload["result"]["draft_payload"]["claim_no"] assert claim.status == "draft" assert claim.items def test_orchestrator_expense_next_step_submits_claim_to_approval() -> None: client, session_factory = build_client() user_id = "zhangsan@example.com" with session_factory() as db: db.add( Employee( employee_no="E3001", name="张三", email=user_id, ) ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "帮我报销昨天去上海出差的交通费680元", "context_json": { "role_codes": ["employee"], "name": "张三", "department_name": "销售部", "attachment_names": ["didi-trip.png"], "attachment_count": 1, "review_action": "next_step", "review_form_values": { "reporter_name": "张三", "expense_type": "交通费", "amount": "680", "occurred_date": "2026-05-13", "location": "上海", "reason": "上海客户拜访交通" }, }, }, ) assert response.status_code == 200 payload = response.json() assert payload["status"] == "succeeded" assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-") assert payload["result"]["draft_payload"]["status"] == "submitted" assert payload["result"]["draft_payload"]["approval_stage"] == "AI验审" assert "已提交审批" in payload["result"]["answer"] with session_factory() as db: claim = db.scalar( select(ExpenseClaim).where( ExpenseClaim.id == payload["result"]["draft_payload"]["claim_id"] ) ) assert claim is not None assert claim.status == "submitted" assert claim.approval_stage == "AI验审" assert claim.submitted_at is not None def test_orchestrator_blocks_fourth_expense_draft_for_same_user() -> None: client, session_factory = build_client() user_id = "zhangsan@example.com" with session_factory() as db: db.add( Employee( employee_no="E1001", name="张三", email=user_id, ) ) db.commit() for amount, city in ((120, "上海"), (240, "北京"), (360, "深圳")): response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": f"帮我生成报销草稿,我昨天去{city}出差,交通费{amount}元", "context_json": { "role_codes": ["finance"], "name": "张三", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-") blocked_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "帮我生成报销草稿,我昨天去杭州出差,交通费480元", "context_json": { "role_codes": ["finance"], "name": "张三", "review_action": "save_draft", }, }, ) assert blocked_response.status_code == 200 blocked_payload = blocked_response.json() assert blocked_payload["status"] == "succeeded" assert "你当前已保存 3 个草稿" in blocked_payload["result"]["answer"] assert blocked_payload["result"]["draft_payload"]["claim_id"] is None assert blocked_payload["result"]["draft_payload"]["claim_no"] is None assert blocked_payload["result"]["draft_payload"]["status"] == "blocked" with session_factory() as db: draft_count = db.scalar( select(func.count()) .select_from(ExpenseClaim) .where(ExpenseClaim.status == "draft") ) assert draft_count == 3 def test_orchestrator_allows_existing_draft_update_when_user_already_has_three_drafts() -> None: client, session_factory = build_client() user_id = "lisi@example.com" with session_factory() as db: db.add( Employee( employee_no="E1002", name="李四", email=user_id, ) ) db.commit() first_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": "帮我生成报销草稿,我昨天去上海出差,交通费120元", "context_json": { "role_codes": ["finance"], "name": "李四", }, }, ) assert first_response.status_code == 200 first_payload = first_response.json() claim_id = first_payload["result"]["draft_payload"]["claim_id"] conversation_id = first_payload["conversation_id"] assert claim_id assert conversation_id for amount, city in ((240, "北京"), (360, "深圳")): response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "message": f"帮我生成报销草稿,我昨天去{city}出差,交通费{amount}元", "context_json": { "role_codes": ["finance"], "name": "李四", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["result"]["draft_payload"]["claim_no"].startswith("EXP-") update_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": user_id, "conversation_id": conversation_id, "message": "金额改成888元", "context_json": { "role_codes": ["finance"], "name": "李四", }, }, ) assert update_response.status_code == 200 update_payload = update_response.json() assert update_payload["result"]["draft_payload"]["claim_id"] == claim_id with session_factory() as db: claim = db.scalar(select(ExpenseClaim).where(ExpenseClaim.id == claim_id)) assert claim is not None assert float(claim.amount) == 888.0 draft_count = db.scalar( select(func.count()) .select_from(ExpenseClaim) .where(ExpenseClaim.employee_id == claim.employee_id) .where(ExpenseClaim.status == "draft") ) assert draft_count == 3 def test_orchestrator_persists_conversation_and_reuses_expense_draft_context() -> None: client, session_factory = build_client() first_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "帮我生成一份差旅报销草稿,我昨天去上海出差,交通费680元", "context_json": { "role_codes": ["finance"], "attachment_names": ["行程单.png"], "attachment_count": 1, "ocr_summary": "行程单金额680元", }, }, ) assert first_response.status_code == 200 first_payload = first_response.json() conversation_id = first_payload["conversation_id"] claim_id = first_payload["result"]["draft_payload"]["claim_id"] assert conversation_id assert claim_id second_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "conversation_id": conversation_id, "message": "金额改成800元", "context_json": { "role_codes": ["finance"], }, }, ) assert second_response.status_code == 200 second_payload = second_response.json() assert second_payload["conversation_id"] == conversation_id assert second_payload["trace_summary"]["scenario"] == "expense" assert second_payload["trace_summary"]["intent"] == "draft" assert second_payload["result"]["draft_payload"]["claim_id"] == claim_id with session_factory() as db: claim = db.scalar(select(ExpenseClaim).where(ExpenseClaim.id == claim_id)) assert claim is not None assert float(claim.amount) == 800.0 conversation = db.scalar( select(AgentConversation).where(AgentConversation.conversation_id == conversation_id) ) assert conversation is not None assert conversation.draft_claim_id == claim_id assert conversation.last_scenario == "expense" assert conversation.last_intent == "draft" message_count = db.scalar( select(func.count()) .select_from(AgentConversationMessage) .where(AgentConversationMessage.conversation_id == conversation_id) ) assert message_count == 4 def test_orchestrator_does_not_reuse_conversation_when_user_changes() -> None: client, session_factory = build_client() first_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "user_a", "message": "帮我生成一份差旅报销草稿,我昨天去上海出差,交通费680元", "context_json": {"role_codes": ["finance"]}, }, ) assert first_response.status_code == 200 first_payload = first_response.json() first_conversation_id = first_payload["conversation_id"] assert first_conversation_id second_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "user_b", "conversation_id": first_conversation_id, "message": "查一下本周报销金额", "context_json": {"role_codes": ["finance"]}, }, ) assert second_response.status_code == 200 second_payload = second_response.json() assert second_payload["conversation_id"] assert second_payload["conversation_id"] != first_conversation_id with session_factory() as db: first_conversation = db.scalar( select(AgentConversation).where( AgentConversation.conversation_id == first_conversation_id ) ) second_conversation = db.scalar( select(AgentConversation).where( AgentConversation.conversation_id == second_payload["conversation_id"] ) ) assert first_conversation is not None assert second_conversation is not None assert first_conversation.user_id == "user_a" assert second_conversation.user_id == "user_b" def test_orchestrator_prunes_conversations_older_than_configured_retention_days() -> None: client, session_factory = build_client() expired_conversation_id = "conv_expired" expired_at = datetime.now(UTC) - timedelta(days=2) with session_factory() as db: settings_service = SettingsService(db) settings_payload = settings_service.get_settings_snapshot().model_dump() settings_payload["sessionForm"]["conversationRetentionDays"] = 1 settings_service.save_settings_snapshot(SettingsWrite(**settings_payload)) conversation = AgentConversation( conversation_id=expired_conversation_id, user_id="expired_user", source="user_message", state_json={}, message_count=1, created_at=expired_at, updated_at=expired_at, ) db.add(conversation) db.flush() db.add( AgentConversationMessage( conversation_id=expired_conversation_id, role="user", content="旧会话消息", created_at=expired_at, ) ) db.commit() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "fresh_user", "message": "查一下本周报销金额", "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 with session_factory() as db: conversation = db.scalar( select(AgentConversation).where( AgentConversation.conversation_id == expired_conversation_id ) ) message_count = db.scalar( select(func.count()) .select_from(AgentConversationMessage) .where(AgentConversationMessage.conversation_id == expired_conversation_id) ) assert conversation is None assert message_count == 0 def test_orchestrator_treats_expense_narrative_as_draft_instead_of_ar_query() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "我今天去客户现场,招待了客户,花销了1000元", "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["permission_level"] == "draft_write" assert payload["status"] == "blocked" assert payload["route_reason"] == "clarification_required" assert payload["trace_summary"]["scenario"] == "expense" assert payload["trace_summary"]["intent"] == "draft" assert payload["trace_summary"]["tool_count"] == 0 assert "应收场景数据" not in payload["result"]["message"] assert payload["result"]["message"].startswith("识别到您希望报销一笔“业务招待费”费用") review_payload = payload["result"]["review_payload"] assert review_payload["intent_summary"].startswith("识别到您希望报销一笔“业务招待费”费用。") assert review_payload["missing_slots"] == ["客户名称", "参与人员", "票据附件"] slot_map = {item["key"]: item for item in review_payload["slot_cards"]} assert slot_map["time_range"]["raw_value"] == "今天" assert slot_map["location"]["value"] == "客户现场" assert slot_map["amount"]["value"] == "1000.00元" def test_orchestrator_can_restore_latest_user_conversation() -> None: client, _ = build_client() first_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "restore_user", "message": "帮我生成一份差旅报销草稿,我昨天去上海出差,交通费680元", "context_json": { "role_codes": ["finance"], "attachment_names": ["行程单.png"], "attachment_count": 1, "ocr_summary": "行程单金额680元", }, }, ) assert first_response.status_code == 200 first_payload = first_response.json() second_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "restore_user", "conversation_id": first_payload["conversation_id"], "message": "金额改成800元", "context_json": {"role_codes": ["finance"]}, }, ) assert second_response.status_code == 200 restore_response = client.get( "/api/v1/orchestrator/conversations/latest", params={"user_id": "restore_user"}, ) assert restore_response.status_code == 200 restore_payload = restore_response.json() assert restore_payload["found"] is True assert restore_payload["conversation"]["conversation_id"] == first_payload["conversation_id"] assert restore_payload["conversation"]["draft_claim_id"] == first_payload["result"]["draft_payload"]["claim_id"] assert len(restore_payload["conversation"]["messages"]) == 4 assert restore_payload["conversation"]["messages"][0]["role"] == "user" assert restore_payload["conversation"]["messages"][0]["message_json"]["attachment_names"] == ["行程单.png"] assert restore_payload["conversation"]["messages"][1]["message_json"]["orchestrator_payload"]["run_id"] def test_orchestrator_restores_conversation_messages_in_sequence_order() -> None: client, session_factory = build_client() conversation_id = "conv_restore_sequence" created_at = datetime(2026, 5, 13, 13, 20, tzinfo=UTC) with session_factory() as db: conversation = AgentConversation( conversation_id=conversation_id, user_id="sequence_user", source="user_message", state_json={"session_type": "expense"}, message_count=4, created_at=created_at, updated_at=created_at, ) db.add(conversation) db.flush() db.add_all( [ AgentConversationMessage( id="msg-z-assistant", conversation_id=conversation_id, run_id="run-a", role="assistant", content="第二条:助手回复", message_json={"sequence": 2}, created_at=created_at, ), AgentConversationMessage( id="msg-b-user", conversation_id=conversation_id, run_id="run-b", role="user", content="第三条:用户追问", message_json={"sequence": 3}, created_at=created_at, ), AgentConversationMessage( id="msg-a-user", conversation_id=conversation_id, run_id="run-a", role="user", content="第一条:用户发起", message_json={"sequence": 1}, created_at=created_at, ), AgentConversationMessage( id="msg-c-assistant", conversation_id=conversation_id, run_id="run-b", role="assistant", content="第四条:助手总结", message_json={"sequence": 4}, created_at=created_at, ), ] ) db.commit() restore_response = client.get( "/api/v1/orchestrator/conversations/latest", params={"user_id": "sequence_user", "session_type": "expense"}, ) assert restore_response.status_code == 200 restore_payload = restore_response.json() assert restore_payload["found"] is True assert [item["content"] for item in restore_payload["conversation"]["messages"]] == [ "第一条:用户发起", "第二条:助手回复", "第三条:用户追问", "第四条:助手总结", ] def test_orchestrator_can_delete_all_user_conversations() -> None: client, session_factory = build_client() for message in ("查一下本周报销金额", "帮我生成差旅报销草稿"): response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "delete_user", "message": message, "context_json": {"role_codes": ["finance"]}, }, ) assert response.status_code == 200 other_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "other_user", "message": "查一下供应商待付款", "context_json": {"role_codes": ["finance"]}, }, ) assert other_response.status_code == 200 delete_response = client.delete( "/api/v1/orchestrator/conversations", params={"user_id": "delete_user"}, ) assert delete_response.status_code == 200 delete_payload = delete_response.json() assert delete_payload["deleted_count"] == 2 with session_factory() as db: deleted_count = db.scalar( select(func.count()) .select_from(AgentConversation) .where(AgentConversation.user_id == "delete_user") ) other_count = db.scalar( select(func.count()) .select_from(AgentConversation) .where(AgentConversation.user_id == "other_user") ) assert deleted_count == 0 assert other_count == 1 def test_orchestrator_can_delete_current_user_single_conversation() -> None: client, session_factory = build_client() first_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "single_delete_user", "message": "查一下本周报销金额", "context_json": {"role_codes": ["finance"]}, }, ) second_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "single_delete_user", "message": "帮我生成差旅报销草稿", "context_json": {"role_codes": ["finance"]}, }, ) assert first_response.status_code == 200 assert second_response.status_code == 200 first_conversation_id = first_response.json()["conversation_id"] second_conversation_id = second_response.json()["conversation_id"] delete_response = client.delete( f"/api/v1/orchestrator/conversations/{first_conversation_id}", params={"user_id": "single_delete_user"}, ) assert delete_response.status_code == 200 assert delete_response.json()["deleted_count"] == 1 with session_factory() as db: remaining_ids = list( db.scalars( select(AgentConversation.conversation_id) .where(AgentConversation.user_id == "single_delete_user") .order_by(AgentConversation.created_at.asc()) ).all() ) assert first_conversation_id not in remaining_ids assert second_conversation_id in remaining_ids def test_orchestrator_can_delete_user_conversations_by_session_type() -> None: client, session_factory = build_client() expense_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "typed_delete_user", "message": "帮我生成差旅报销草稿", "context_json": { "role_codes": ["finance"], "session_type": "expense", }, }, ) knowledge_response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "typed_delete_user", "message": "发票抬头不一致还能报销吗", "context_json": { "role_codes": ["finance"], "session_type": "knowledge", }, }, ) assert expense_response.status_code == 200 assert knowledge_response.status_code == 200 delete_response = client.delete( "/api/v1/orchestrator/conversations", params={ "user_id": "typed_delete_user", "session_type": "knowledge", }, ) assert delete_response.status_code == 200 assert delete_response.json()["deleted_count"] == 1 with session_factory() as db: remaining = list( db.scalars( select(AgentConversation) .where(AgentConversation.user_id == "typed_delete_user") .order_by(AgentConversation.created_at.asc()) ).all() ) assert len(remaining) == 1 remaining_session_type = str((remaining[0].state_json or {}).get("session_type") or "").strip() or "expense" assert remaining_session_type == "expense" def test_orchestrator_tool_failure_is_logged_and_degraded() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "查一下本周报销金额", "context_json": { "role_codes": ["finance"], "simulate_tool_failure": "database", }, }, ) assert response.status_code == 200 payload = response.json() assert payload["selected_agent"] == "user_agent" assert payload["status"] == "succeeded" assert payload["trace_summary"]["failed_tool_count"] == 1 assert payload["trace_summary"]["degraded"] is True run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() assert run_detail["tool_calls"][0]["status"] == "failed" assert "simulated database failure" in run_detail["tool_calls"][0]["error_message"] def test_orchestrator_exception_is_written_to_agent_run() -> None: client, _ = build_client() response = client.post( "/api/v1/orchestrator/run", json={ "source": "user_message", "user_id": "pytest", "message": "查一下本周报销金额", "context_json": { "role_codes": ["finance"], "simulate_orchestrator_exception": True, }, }, ) assert response.status_code == 200 payload = response.json() assert payload["status"] == "failed" run_detail = client.get(f"/api/v1/agent-runs/{payload['run_id']}").json() assert run_detail["status"] == "failed" assert "simulated orchestrator exception" in run_detail["error_message"]