"""API tests for the budget centre: summary, role scoping, editing and analysis.

Every test builds its own application instance backed by a private
in-memory SQLite database, so tests do not share state.
"""

from __future__ import annotations

from collections.abc import Generator
from datetime import UTC, datetime
from decimal import Decimal

from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool

from app.api.deps import get_db
from app.db.base import Base
from app.main import create_app
from app.models.budget import BudgetAllocation, BudgetTransaction
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.models.organization import OrganizationUnit
from app.models.role import Role


def build_session_factory() -> sessionmaker[Session]:
    """Create the schema on a fresh in-memory SQLite engine and return a session factory.

    The engine uses ``StaticPool`` with ``check_same_thread`` disabled so that
    every session produced by the factory talks to the same in-memory database.
    """
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=memory_engine)
    return sessionmaker(bind=memory_engine, autoflush=False, autocommit=False)


def build_client() -> tuple[TestClient, sessionmaker[Session]]:
    """Return a test client plus the session factory backing its ``get_db`` override.

    Tests use the returned factory to seed rows that the application then reads
    through the overridden dependency.
    """
    session_factory = build_session_factory()
    application = create_app()

    def provide_session() -> Generator[Session, None, None]:
        # One session per request, always closed once the request is done.
        session = session_factory()
        try:
            yield session
        finally:
            session.close()

    application.dependency_overrides[get_db] = provide_session
    return TestClient(application), session_factory


def seed_budget_allocations(db: Session) -> None:
    """Insert and commit three active 2026Q2 allocations.

    Two belong to the market department (travel and software) and one to the
    finance department (office).
    """
    timestamp = datetime.now(UTC)
    # Column values that are identical across all three seeded allocations.
    shared_fields = {
        "fiscal_year": 2026,
        "period_type": "quarter",
        "period_key": "2026Q2",
        "project_code": None,
        "adjusted_amount": Decimal("0.00"),
        "status": "active",
        "warning_threshold": Decimal("80.00"),
        "control_action": "block",
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    market_travel = BudgetAllocation(
        id="budget-market-travel",
        budget_no="BUD-MARKET-TRAVEL",
        department_id="dept-market",
        department_name="市场部",
        cost_center="CC-4100",
        subject_code="travel",
        subject_name="差旅费",
        original_amount=Decimal("50000.00"),
        **shared_fields,
    )
    finance_office = BudgetAllocation(
        id="budget-finance-office",
        budget_no="BUD-FINANCE-OFFICE",
        department_id="dept-finance",
        department_name="财务部",
        cost_center="CC-2100",
        subject_code="office",
        subject_name="办公费",
        original_amount=Decimal("30000.00"),
        **shared_fields,
    )
    market_software = BudgetAllocation(
        id="budget-market-software-hidden",
        budget_no="BUD-MARKET-SOFTWARE",
        department_id="dept-market",
        department_name="市场部",
        cost_center="CC-4100",
        subject_code="software",
        subject_name="软件服务费",
        original_amount=Decimal("90000.00"),
        **shared_fields,
    )
    db.add_all([market_travel, finance_office, market_software])
    db.commit()


def seed_market_budget_monitor(db: Session) -> tuple[Role, OrganizationUnit]:
    """Add a P8 budget monitor in the market department and flush (no commit).

    Returns the role and department so callers can attach further employees
    to them; committing is left to the caller.
    """
    monitor_role = Role(role_code="budget_monitor", name="预算监控员")
    market_unit = OrganizationUnit(
        id="dept-market",
        unit_code="MARKET-DEPT",
        name="市场部",
        unit_type="department",
    )
    p8_monitor = Employee(
        employee_no="E-BUDGET-MARKET-P8",
        name="赵预算",
        email="budget-monitor@example.com",
        grade="P8",
        organization_unit=market_unit,
        roles=[monitor_role],
    )
    db.add_all([monitor_role, market_unit, p8_monitor])
    db.flush()
    return monitor_role, market_unit


def test_admin_can_view_all_budget_allocations_without_is_admin_header() -> None:
    """The ``admin`` user gets the summary using only username and role-code headers."""
    client, session_factory = build_client()
    with session_factory() as db:
        seed_budget_allocations(db)

    admin_headers = {"x-auth-username": "admin", "x-auth-role-codes": "manager"}
    response = client.get("/api/v1/budgets/summary", headers=admin_headers)

    assert response.status_code == 200
    payload = response.json()

    department_names = {row["department_name"] for row in payload["allocations"]}
    assert department_names == {"市场部", "财务部"}
    subject_codes = {row["subject_code"] for row in payload["allocations"]}
    assert subject_codes == {"travel", "office"}

    assert payload["warnings"] == []
    trend_periods = [point["period_key"] for point in payload["trend"]]
    assert trend_periods == ["2026Q2"]
    first_total = Decimal(payload["trend"][0]["total_amount"])
    assert first_total == Decimal("80000.00")


def test_budget_summary_returns_real_trend_and_warnings() -> None:
    """Trend and warning figures in the summary reflect the seeded consume transactions."""
    client, session_factory = build_client()
    timestamp = datetime.now(UTC)

    with session_factory() as db:
        seed_budget_allocations(db)

        # Extra Q1 allocation so the trend covers two periods.
        q1_travel = BudgetAllocation(
            id="budget-market-travel-q1",
            budget_no="BUD-MARKET-TRAVEL-Q1",
            fiscal_year=2026,
            period_type="quarter",
            period_key="2026Q1",
            department_id="dept-market",
            department_name="市场部",
            cost_center="CC-4100",
            project_code=None,
            subject_code="travel",
            subject_name="差旅费",
            original_amount=Decimal("40000.00"),
            adjusted_amount=Decimal("0.00"),
            status="active",
            warning_threshold=Decimal("80.00"),
            control_action="block",
            created_at=timestamp,
            updated_at=timestamp,
        )
        db.add(q1_travel)

        q1_consumption = BudgetTransaction(
            transaction_no="BTX-MARKET-TRAVEL-Q1",
            allocation_id="budget-market-travel-q1",
            source_type="claim",
            source_id="claim-q1",
            source_no="CLM-Q1",
            transaction_type="consume",
            amount=Decimal("12000.00"),
            before_available_amount=Decimal("40000.00"),
            after_available_amount=Decimal("28000.00"),
            operator="tester",
            reason="Q1 真实发生额",
            created_at=timestamp,
        )
        # 41000 of 50000 consumed: 82%, above the 80% warning threshold.
        q2_consumption = BudgetTransaction(
            transaction_no="BTX-MARKET-TRAVEL-Q2",
            allocation_id="budget-market-travel",
            source_type="claim",
            source_id="claim-q2",
            source_no="CLM-Q2",
            transaction_type="consume",
            amount=Decimal("41000.00"),
            before_available_amount=Decimal("50000.00"),
            after_available_amount=Decimal("9000.00"),
            operator="tester",
            reason="Q2 真实发生额",
            created_at=timestamp,
        )
        db.add_all([q1_consumption, q2_consumption])
        db.commit()

    admin_headers = {"x-auth-username": "admin", "x-auth-role-codes": "manager"}
    response = client.get(
        "/api/v1/budgets/summary?year=2026&period=2026Q2&cost_center=CC-4100",
        headers=admin_headers,
    )

    assert response.status_code == 200
    payload = response.json()

    listed_subjects = [row["subject_code"] for row in payload["allocations"]]
    assert listed_subjects == ["travel"]
    trend_periods = [point["period_key"] for point in payload["trend"]]
    assert trend_periods == ["2026Q1", "2026Q2"]
    assert Decimal(payload["trend"][0]["used_amount"]) == Decimal("12000.00")
    assert Decimal(payload["trend"][1]["used_amount"]) == Decimal("41000.00")

    assert payload["warning_count"] == 1
    assert payload["over_budget_count"] == 0
    assert len(payload["warnings"]) == 1
    first_warning = payload["warnings"][0]
    assert first_warning["subject_code"] == "travel"
    assert first_warning["severity"] == "warn"
    assert Decimal(first_warning["usage_rate"]) == Decimal("82.00")


def test_budget_monitor_is_limited_to_own_department_scope() -> None:
    """A monitor bound to CC-4100 who asks for CC-2100 only gets CC-4100 data back."""
    client, session_factory = build_client()
    with session_factory() as db:
        seed_budget_allocations(db)

    monitor_headers = {
        "x-auth-username": "monitor@example.com",
        "x-auth-role-codes": "budget_monitor",
        "x-auth-cost-center": "CC-4100",
    }
    response = client.get(
        "/api/v1/budgets/summary?cost_center=CC-2100",
        headers=monitor_headers,
    )

    assert response.status_code == 200
    payload = response.json()
    visible_cost_centers = [row["cost_center"] for row in payload["allocations"]]
    assert visible_cost_centers == ["CC-4100"]
    visible_subjects = [row["subject_code"] for row in payload["allocations"]]
    assert visible_subjects == ["travel"]


def test_finance_user_cannot_enter_budget_center() -> None:
    """A user holding only the ``finance`` role code is refused with 403."""
    client, session_factory = build_client()
    with session_factory() as db:
        seed_budget_allocations(db)

    finance_headers = {
        "x-auth-username": "finance@example.com",
        "x-auth-role-codes": "finance",
    }
    response = client.get("/api/v1/budgets/summary", headers=finance_headers)

    assert response.status_code == 403


def test_budget_monitor_cannot_edit_and_admin_can_edit() -> None:
    """Creating an allocation is rejected for a budget monitor and accepted for admin."""
    client, session_factory = build_client()
    with session_factory() as db:
        seed_budget_allocations(db)

    payload = {
        "fiscal_year": 2026,
        "period_type": "quarter",
        "period_key": "2026Q2",
        "department_id": "dept-sales",
        "department_name": "销售部",
        "cost_center": "CC-5100",
        "project_code": None,
        "subject_code": "travel",
        "subject_name": "差旅费",
        "original_amount": "20000.00",
        "warning_threshold": "80.00",
        "control_action": "block",
        "description": "销售部差旅预算",
    }
    create_url = "/api/v1/budgets/allocations"

    monitor_headers = {
        "x-auth-username": "monitor@example.com",
        "x-auth-role-codes": "budget_monitor",
        "x-auth-cost-center": "CC-4100",
    }
    monitor_response = client.post(create_url, json=payload, headers=monitor_headers)
    assert monitor_response.status_code == 403

    admin_headers = {"x-auth-username": "admin", "x-auth-role-codes": "manager"}
    admin_response = client.post(create_url, json=payload, headers=admin_headers)
    assert admin_response.status_code == 201
    assert admin_response.json()["department_name"] == "销售部"


def test_budget_analysis_endpoint_is_limited_to_budget_roles() -> None:
    """Budget analysis of a claim: 403 for an employee and for the P6 monitor, 200 for the P8 monitor."""
    client, session_factory = build_client()

    with session_factory() as db:
        seed_budget_allocations(db)
        budget_role, market_department = seed_market_budget_monitor(db)

        # Second monitor with the same role and department but a lower grade.
        p6_budget_monitor = Employee(
            employee_no="E-BUDGET-MARKET-P6",
            name="低级预算",
            email="p6-budget-monitor@example.com",
            grade="P6",
            organization_unit=market_department,
            roles=[budget_role],
        )
        db.add(p6_budget_monitor)
        db.flush()  # the claim below needs the employee's generated id

        claim = ExpenseClaim(
            claim_no="APP-BUDGET-ANALYSIS-001",
            employee_id=p6_budget_monitor.id,
            employee_name="低级预算",
            department_id="dept-market",
            department_name="市场部",
            project_code=None,
            expense_type="travel_application",
            reason="客户现场交付预算申请",
            location="上海",
            amount=Decimal("12000.00"),
            currency="CNY",
            invoice_count=0,
            occurred_at=datetime(2026, 5, 12, 9, 0, tzinfo=UTC),
            submitted_at=datetime(2026, 5, 12, 10, 0, tzinfo=UTC),
            status="submitted",
            approval_stage="预算管理者审批",
            risk_flags_json=[],
        )
        db.add(claim)
        db.commit()
        # Read the id while the session is still open.
        claim_id = claim.id

    analysis_url = f"/api/v1/reimbursements/claims/{claim_id}/budget-analysis"

    ordinary_response = client.get(
        analysis_url,
        headers={
            "x-auth-username": "zhangsan@example.com",
            "x-auth-role-codes": "employee",
        },
    )
    monitor_response = client.get(
        analysis_url,
        headers={
            "x-auth-username": "budget-monitor@example.com",
            "x-auth-role-codes": "budget_monitor",
        },
    )
    p6_monitor_response = client.get(
        analysis_url,
        headers={
            "x-auth-username": "p6-budget-monitor@example.com",
            "x-auth-role-codes": "budget_monitor",
        },
    )

    assert ordinary_response.status_code == 403
    assert p6_monitor_response.status_code == 403
    assert monitor_response.status_code == 200
    claim_ratio = Decimal(monitor_response.json()["metrics"]["claim_amount_ratio"])
    assert claim_ratio == Decimal("24.00")