from __future__ import annotations import base64 from collections.abc import Generator from datetime import UTC, date, datetime from decimal import Decimal from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.api.deps import get_db from app.db.base import Base from app.main import create_app from app.models.employee import Employee from app.models.financial_record import ExpenseClaim, ExpenseClaimItem from app.models.organization import OrganizationUnit from app.models.risk_observation import RiskObservation, RiskObservationFeedback from app.models.role import Role from app.schemas.ocr import OcrRecognizeBatchRead, OcrRecognizeDocumentRead from app.services.expense_claim_attachment_storage import ExpenseClaimAttachmentStorage from app.services.expense_claims import ExpenseClaimService from app.services.ocr import OcrService def build_session_factory() -> sessionmaker[Session]: engine = create_engine( "sqlite+pysqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) return sessionmaker(bind=engine, autoflush=False, autocommit=False) def build_client() -> tuple[TestClient, sessionmaker[Session]]: session_factory = build_session_factory() app = create_app() def override_db() -> Generator[Session, None, None]: db = session_factory() try: yield db finally: db.close() app.dependency_overrides[get_db] = override_db return TestClient(app), session_factory def seed_claim(db: Session) -> tuple[ExpenseClaim, ExpenseClaimItem]: manager = Employee( id="mgr-1", employee_no="E20001", name="李总", email="manager@example.com", position="市场总监", grade="P7", ) role = Role( id="role-user", role_code="user", name="员工", description="普通员工", ) employee = Employee( id="emp-1", employee_no="E10001", name="张三", email="zhangsan@example.com", position="招商主管", grade="P4", manager=manager, roles=[role], ) claim = ExpenseClaim( id="claim-attachment-1", claim_no="EXP-202605-101", employee_id=employee.id, employee_name="张三", department_id="dept-1", department_name="市场部", project_code=None, expense_type="office", reason="办公用品采购", location="深圳南山", amount=Decimal("88.00"), currency="CNY", invoice_count=0, occurred_at=datetime(2026, 5, 13, tzinfo=UTC), submitted_at=None, status="draft", approval_stage="待提交", risk_flags_json=[], ) item = ExpenseClaimItem( id="item-attachment-1", claim_id=claim.id, item_date=date(2026, 5, 13), item_type="office", item_reason="办公用品采购", item_location="深圳南山", item_amount=Decimal("88.00"), invoice_id=None, ) claim.items = [item] db.add(manager) db.add(role) db.add(employee) db.add(claim) db.commit() return claim, item def test_claim_standard_adjustment_endpoint_recalculates_and_marks_reviewer_notice() -> None: client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim.expense_type = "hotel" claim.location = "北京" claim.amount = Decimal("1000.00") item.item_type = "hotel_ticket" item.item_reason = "北京住宿" item.item_location = "北京" item.item_amount = Decimal("1000.00") db.commit() claim_id = claim.id item_id = item.id response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/standard-adjustment", json={ "risks": [ { "risk_id": "risk-hotel-endpoint-1", "item_id": item_id, "title": "住宿超标待说明", "risk": "住宿票据金额超过职级标准。", "application_days": 2, "original_amount": "1000.00", "reimbursable_amount": "1000.00", } ] }, headers={"x-auth-username": "emp-1", "x-auth-name": "Zhang San", "x-auth-grade": "P4"}, ) assert response.status_code == 200 payload = response.json() assert payload["amount"] == "900.00" standard_flag = next( flag for flag in payload["risk_flags_json"] if isinstance(flag, dict) and flag.get("source") == "reimbursement_standard_adjustment" ) assert standard_flag["original_amount"] == "1000.00" assert standard_flag["reimbursable_amount"] == "900.00" assert standard_flag["employee_absorbed_amount"] == "100.00" assert standard_flag["visibility_scope"] == "leader" def test_claim_item_attachment_upload_preview_and_delete(monkeypatch, tmp_path) -> None: def fake_recognize( self, files: list[tuple[str, bytes, str | None]], ) -> OcrRecognizeBatchRead: assert files[0][0] == "office-note.png" return OcrRecognizeBatchRead( total_file_count=1, success_count=1, documents=[ OcrRecognizeDocumentRead( filename="office-note.png", media_type="image/png", text="办公用品发票 金额88元 2026-05-13", summary="识别到办公用品发票,金额 88 元。", avg_score=0.98, line_count=1, page_count=1, warnings=[], ) ], ) monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim_id = claim.id item_id = item.id headers = {"x-auth-username": "emp-1", "x-auth-name": "Zhang San"} file_bytes = b"fake-image-bytes" upload_response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, files=[("file", ("office-note.png", file_bytes, "image/png"))], ) assert upload_response.status_code == 200 upload_payload = upload_response.json() assert upload_payload["attachment"]["file_name"] == "office-note.png" assert upload_payload["attachment"]["analysis"]["label"] == "AI提示符合条件" assert upload_payload["attachment"]["document_info"]["document_type"] == "office_invoice" assert upload_payload["attachment"]["requirement_check"]["matches"] is True assert upload_payload["invoice_id"] assert upload_payload["item_type"] == "office" assert upload_payload["item_reason"] == "识别到办公用品发票,金额 88 元。" assert upload_payload["item_location"] == "深圳南山" assert upload_payload["item_date"] == "2026-05-13" assert upload_payload["item_amount"] == "88.00" meta_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment/meta", headers=headers, ) assert meta_response.status_code == 200 meta_payload = meta_response.json() assert meta_payload["media_type"] == "image/png" assert meta_payload["preview_kind"] == "image" assert meta_payload["preview_url"].endswith(f"/reimbursements/claims/{claim_id}/items/{item_id}/attachment/preview") assert meta_payload["analysis"]["headline"] assert meta_payload["document_info"]["fields"][0]["label"] == "金额" preview_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment/preview", headers=headers, ) assert preview_response.status_code == 200 assert preview_response.content == file_bytes content_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, ) assert content_response.status_code == 200 assert content_response.content == file_bytes delete_response = client.delete( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, ) assert delete_response.status_code == 200 assert delete_response.json()["invoice_id"] is None deleted_meta_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment/meta", headers=headers, ) assert deleted_meta_response.status_code == 404 def test_claim_item_attachment_upload_flags_purpose_and_amount_mismatch(monkeypatch, tmp_path) -> None: def fake_recognize( self, files: list[tuple[str, bytes, str | None]], ) -> OcrRecognizeBatchRead: return OcrRecognizeBatchRead( total_file_count=1, success_count=1, documents=[ OcrRecognizeDocumentRead( filename="taxi-note.png", media_type="image/png", text="滴滴出行电子发票 金额120元 2026-05-13", summary="识别到交通出行发票,金额 120 元。", avg_score=0.97, line_count=1, page_count=1, warnings=[], ) ], ) monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim_id = claim.id item_id = item.id headers = {"x-auth-username": "emp-1", "x-auth-name": "Zhang San"} upload_response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, files=[("file", ("taxi-note.png", b"fake-image-bytes", "image/png"))], ) assert upload_response.status_code == 200 analysis = upload_response.json()["attachment"]["analysis"] assert analysis["severity"] == "high" assert any("金额字段" in point for point in analysis["points"]) assert any("附件类型要求" in point for point in analysis["points"]) assert upload_response.json()["attachment"]["requirement_check"]["matches"] is False def test_claim_item_attachment_upload_flags_non_invoice_image_as_high_risk(monkeypatch, tmp_path) -> None: def fake_recognize( self, files: list[tuple[str, bytes, str | None]], ) -> OcrRecognizeBatchRead: return OcrRecognizeBatchRead( total_file_count=1, success_count=1, documents=[ OcrRecognizeDocumentRead( filename="random-image.png", media_type="image/png", text="", summary="", avg_score=0.0, line_count=0, page_count=1, warnings=[], ) ], ) monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim_id = claim.id item_id = item.id headers = {"x-auth-username": "emp-1", "x-auth-name": "Zhang San"} upload_response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, files=[("file", ("random-image.png", b"fake-image-bytes", "image/png"))], ) assert upload_response.status_code == 200 analysis = upload_response.json()["attachment"]["analysis"] assert analysis["severity"] == "high" assert any("附件内容" in point for point in analysis["points"]) def test_approve_claim_endpoint_routes_direct_manager_claim_to_finance_review() -> None: client, session_factory = build_client() with session_factory() as db: manager = Employee( id="mgr-approve-1", employee_no="E21001", name="李经理", email="manager-approve-api@example.com", ) employee = Employee( id="emp-approve-1", employee_no="E11001", name="张三", email="zhangsan-approve-api@example.com", manager=manager, ) claim = ExpenseClaim( id="claim-approve-1", claim_no="EXP-APP-API-001", employee_id=employee.id, employee_name="张三", department_id="dept-1", department_name="市场部", project_code=None, expense_type="transport", reason="交通报销", location="上海", amount=Decimal("88.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 13, tzinfo=UTC), submitted_at=datetime(2026, 5, 13, 10, 0, tzinfo=UTC), status="submitted", approval_stage="直属领导审批", risk_flags_json=[], ) db.add_all([manager, employee, claim]) db.commit() response = client.post( "/api/v1/reimbursements/claims/claim-approve-1/approve", json={"opinion": "情况属实,同意报销。"}, headers={ "X-Auth-Username": "manager-approve-api@example.com", "X-Auth-Name": "manager-approve-api@example.com", "X-Auth-Role-Codes": "manager", }, ) assert response.status_code == 200 payload = response.json() assert payload["status"] == "submitted" assert payload["approval_stage"] == "财务审批" assert any( item["source"] == "manual_approval" and item["opinion"] == "情况属实,同意报销。" and item["operator"] == "李经理" and item["next_approval_stage"] == "财务审批" for item in payload["risk_flags_json"] ) approval_events = [ item for item in payload["risk_flags_json"] if item["source"] == "manual_approval" ] assert approval_events[0]["operator"] == "李经理" assert "manager-approve-api@example.com" not in approval_events[0]["message"] def test_approve_application_endpoint_routes_direct_manager_review_to_budget_review() -> None: client, session_factory = build_client() with session_factory() as db: department = OrganizationUnit( id="dept-1", unit_code="DELIVERY-API", name="交付部", unit_type="department", ) budget_role = Role( id="role-budget-application-approve-1", role_code="budget_monitor", name="预算监控员", ) manager = Employee( id="mgr-application-approve-1", employee_no="E21002", name="李经理", email="manager-application-approve-api@example.com", organization_unit=department, ) budget_manager = Employee( id="budget-application-approve-1", employee_no="E31002", name="赵预算", email="budget-application-approve-api@example.com", grade="P8", organization_unit=department, roles=[budget_role], ) employee = Employee( id="emp-application-approve-1", employee_no="E11002", name="张三", email="zhangsan-application-approve-api@example.com", manager=manager, organization_unit=department, ) claim = ExpenseClaim( id="claim-application-approve-1", claim_no="APP-20260525-API001", employee_id=employee.id, employee_name="张三", department_id="dept-1", department_name="交付部", project_code=None, expense_type="travel_application", reason="支撑国网服务器上线部署", location="上海", amount=Decimal("12000.00"), currency="CNY", invoice_count=0, occurred_at=datetime(2026, 5, 25, tzinfo=UTC), submitted_at=datetime(2026, 5, 25, 10, 0, tzinfo=UTC), status="submitted", approval_stage="直属领导审批", risk_flags_json=[ { "source": "submission_review", "severity": "high", "label": "申请风险复核", "message": "申请金额和行程安排需要预算管理者二次确认。", } ], ) db.add_all([department, budget_role, manager, budget_manager, employee, claim]) db.commit() response = client.post( "/api/v1/reimbursements/claims/claim-application-approve-1/approve", json={"opinion": "业务必要,同意申请。"}, headers={ "X-Auth-Username": "manager-application-approve-api@example.com", "X-Auth-Name": "manager-application-approve-api@example.com", "X-Auth-Role-Codes": "manager", }, ) assert response.status_code == 200 payload = response.json() assert payload["status"] == "submitted" assert payload["approval_stage"] == "预算管理者审批" assert any( item["source"] == "manual_approval" and item["event_type"] == "expense_application_approval" and item["opinion"] == "业务必要,同意申请。" and item["operator"] == "李经理" and item["next_status"] == "submitted" and item["next_approval_stage"] == "预算管理者审批" and item["next_approver_name"] == "赵预算" for item in payload["risk_flags_json"] ) def test_claim_item_pdf_attachment_preview_returns_generated_image(monkeypatch, tmp_path) -> None: preview_bytes = b"fake-preview-png" preview_data_url = f"data:image/png;base64,{base64.b64encode(preview_bytes).decode('ascii')}" def fake_recognize( self, files: list[tuple[str, bytes, str | None]], ) -> OcrRecognizeBatchRead: return OcrRecognizeBatchRead( total_file_count=1, success_count=1, documents=[ OcrRecognizeDocumentRead( filename="invoice.pdf", media_type="application/pdf", text="滴滴出行电子发票 金额13.4元", summary="识别到交通票据,金额 13.4 元。", avg_score=0.96, line_count=1, page_count=1, document_type="taxi_receipt", document_type_label="出租车/网约车票据", scene_code="transport", scene_label="交通票据", preview_kind="image", preview_data_url=preview_data_url, warnings=[], ) ], ) monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim_id = claim.id item_id = item.id headers = {"x-auth-username": "emp-1", "x-auth-name": "Zhang San"} upload_response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, files=[("file", ("invoice.pdf", b"%PDF-1.4 fake", "application/pdf"))], ) assert upload_response.status_code == 200 meta_payload = upload_response.json()["attachment"] assert meta_payload["preview_kind"] == "image" assert meta_payload["preview_url"].endswith(f"/reimbursements/claims/{claim_id}/items/{item_id}/attachment/preview") preview_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment/preview", headers=headers, ) assert preview_response.status_code == 200 assert preview_response.headers["content-type"].startswith("image/png") assert preview_response.content == preview_bytes def test_claim_item_delete_removes_item_and_attachment(monkeypatch, tmp_path) -> None: def fake_recognize( self, files: list[tuple[str, bytes, str | None]], ) -> OcrRecognizeBatchRead: return OcrRecognizeBatchRead( total_file_count=1, success_count=1, documents=[ OcrRecognizeDocumentRead( filename="office-note.png", media_type="image/png", text="办公用品发票 金额88元 2026-05-13", summary="识别到办公用品发票,金额 88 元。", avg_score=0.98, line_count=1, page_count=1, warnings=[], ) ], ) monkeypatch.setattr(OcrService, "recognize_files", fake_recognize) monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, item = seed_claim(db) claim_id = claim.id item_id = item.id headers = {"x-auth-username": "emp-1", "x-auth-name": "Zhang San"} upload_response = client.post( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment", headers=headers, files=[("file", ("office-note.png", b"fake-image-bytes", "image/png"))], ) assert upload_response.status_code == 200 assert (tmp_path / claim_id / item_id).exists() delete_response = client.delete( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}", headers=headers, ) assert delete_response.status_code == 200 assert delete_response.json()["item_id"] == item_id assert not (tmp_path / claim_id / item_id).exists() detail_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}", headers=headers, ) assert detail_response.status_code == 200 detail_payload = detail_response.json() assert detail_payload["items"] == [] assert detail_payload["invoice_count"] == 0 assert detail_payload["employee_position"] == "招商主管" assert detail_payload["employee_grade"] == "P4" assert detail_payload["manager_name"] == "李总" assert detail_payload["role_labels"] == ["员工"] deleted_meta_response = client.get( f"/api/v1/reimbursements/claims/{claim_id}/items/{item_id}/attachment/meta", headers=headers, ) assert deleted_meta_response.status_code == 404 def test_claim_delete_allows_draft_owner_by_employee_id_without_employee_no_header(monkeypatch, tmp_path) -> None: monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", lambda self: tmp_path) client, session_factory = build_client() with session_factory() as db: claim, _ = seed_claim(db) observation = RiskObservation( id="risk-observation-delete-1", observation_key="claim-delete-risk-observation-1", subject_type="expense_claim", subject_key=claim.id, subject_label=claim.claim_no, claim_id=claim.id, claim_no=claim.claim_no, risk_type="policy", risk_signal="draft_pre_review", title="草稿预审风险", description="删除草稿时应同步清理关联风险观察。", risk_score=70, risk_level="medium", confidence_score=0.8, ) feedback = RiskObservationFeedback( id="risk-observation-feedback-delete-1", observation=observation, feedback_type="confirm", actor="auditor", ) db.add(observation) db.add(feedback) db.commit() claim_id = claim.id response = client.delete( f"/api/v1/reimbursements/claims/{claim_id}", headers={"x-auth-username": "emp-1", "x-auth-name": "Browser Session User"}, ) assert response.status_code == 200 payload = response.json() assert payload["claim_id"] == claim_id assert payload["status"] == "deleted" with session_factory() as db: assert db.get(ExpenseClaim, claim_id) is None assert db.get(RiskObservation, "risk-observation-delete-1") is None assert db.get(RiskObservationFeedback, "risk-observation-feedback-delete-1") is None