feat: 统一后端分页查询与前端服务层适配

后端新增通用分页模块,为报销单、员工、预算、agent 资产等
端点统一接入分页参数和游标查询,优化 repository 层分页实
现,前端服务层适配分页响应结构,完善预算图表和全局样式,
优化侧边栏和企业选择器组件,引入 Element Plus 插件注册。
This commit is contained in:
caoxiaozhu
2026-05-29 14:11:06 +08:00
parent e080105f9f
commit 678f64d772
43 changed files with 1863 additions and 378 deletions

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Annotated, Any
from fastapi import Query
from app.services.pagination import PageResult
PageNumber = Annotated[int | None, Query(ge=1, description="页码,从 1 开始。")]
PageSize = Annotated[int | None, Query(ge=1, le=100, description="每页条数,最多 100。")]
def wants_page(page: int | None, page_size: int | None) -> bool:
return page is not None or page_size is not None
def page_payload(result: PageResult[Any]) -> dict[str, Any]:
return {
"items": result.items,
"total": result.total,
"page": result.page,
"page_size": result.page_size,
"total_pages": result.total_pages,
"has_next": result.has_next,
"has_previous": result.has_previous,
}

View File

@@ -14,6 +14,7 @@ from app.api.deps import (
require_rule_editor_user,
require_rule_reviewer_user,
)
from app.api.pagination import PageNumber, PageSize, page_payload, wants_page
from app.db.session import get_session_factory
from app.schemas.agent_asset import (
AgentAssetCreate,
@@ -43,7 +44,7 @@ from app.schemas.agent_asset import (
AgentAssetVersionRead,
AgentAssetVersionTimelineItemRead,
)
from app.schemas.common import ErrorResponse
from app.schemas.common import ErrorResponse, PaginatedResponse
from app.services.agent_assets import AgentAssetService
from app.services.risk_rule_generation_jobs import RiskRuleGenerationJobService
@@ -94,7 +95,7 @@ def _complete_risk_rule_generation_task(
@router.get(
"",
response_model=list[AgentAssetListItem],
response_model=list[AgentAssetListItem] | PaginatedResponse[AgentAssetListItem],
summary="查询 Agent 资产列表",
description="按资产类型、状态、领域和关键字筛选规则、技能、MCP 与任务资产。",
)
@@ -116,8 +117,22 @@ def list_agent_assets(
str | None,
Query(description="资产编码、名称关键字模糊查询。"),
] = None,
) -> list[AgentAssetListItem]:
return AgentAssetService(db).list_assets(
page: PageNumber = None,
page_size: PageSize = None,
) -> list[AgentAssetListItem] | PaginatedResponse[AgentAssetListItem]:
service = AgentAssetService(db)
if wants_page(page, page_size):
return page_payload(
service.list_assets_page(
asset_type=asset_type,
status=status_value,
domain=domain,
keyword=keyword,
page=page,
page_size=page_size,
)
)
return service.list_assets(
asset_type=asset_type,
status=status_value,
domain=domain,

View File

@@ -4,8 +4,7 @@ from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, or_, select
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, selectinload
from app.api.deps import (
CurrentUserContext,
@@ -15,8 +14,9 @@ from app.api.deps import (
require_budget_editor_user,
require_budget_viewer_user,
)
from app.models.employee import Employee
from app.api.pagination import PageNumber, PageSize, page_payload, wants_page
from app.models.budget import BudgetAllocation
from app.models.employee import Employee
from app.schemas.budget import (
BudgetAllocationCreate,
BudgetAllocationRead,
@@ -27,7 +27,7 @@ from app.schemas.budget import (
BudgetSummaryRead,
BudgetTransactionRead,
)
from app.schemas.common import ErrorResponse
from app.schemas.common import ErrorResponse, PaginatedResponse
from app.services.budget import BudgetControlError, BudgetService
router = APIRouter(prefix="/budgets")
@@ -67,7 +67,7 @@ def get_budget_summary(
@router.get(
"/allocations",
response_model=list[BudgetAllocationRead],
response_model=list[BudgetAllocationRead] | PaginatedResponse[BudgetAllocationRead],
summary="查询预算额度列表",
)
def list_budget_allocations(
@@ -78,7 +78,9 @@ def list_budget_allocations(
department_id: str | None = None,
department_name: str | None = None,
cost_center: str | None = None,
) -> list[BudgetAllocationRead]:
page: PageNumber = None,
page_size: PageSize = None,
) -> list[BudgetAllocationRead] | PaginatedResponse[BudgetAllocationRead]:
scope = _resolve_budget_query_scope(
db,
current_user,
@@ -86,7 +88,18 @@ def list_budget_allocations(
department_name=department_name,
cost_center=cost_center,
)
return BudgetService(db).list_allocations(
service = BudgetService(db)
if wants_page(page, page_size):
return page_payload(
service.list_allocations_page(
fiscal_year=fiscal_year,
period_key=period_key,
**scope,
page=page,
page_size=page_size,
)
)
return service.list_allocations(
fiscal_year=fiscal_year,
period_key=period_key,
**scope,
@@ -119,20 +132,30 @@ def create_budget_allocation(
@router.get(
"/allocations/{allocation_id}/transactions",
response_model=list[BudgetTransactionRead],
response_model=list[BudgetTransactionRead] | PaginatedResponse[BudgetTransactionRead],
summary="读取预算交易台账",
)
def list_budget_transactions(
allocation_id: str,
db: DbSession,
current_user: BudgetViewer,
) -> list[BudgetTransactionRead]:
allocation = BudgetService(db).get_allocation_row(allocation_id)
page: PageNumber = None,
page_size: PageSize = None,
) -> list[BudgetTransactionRead] | PaginatedResponse[BudgetTransactionRead]:
service = BudgetService(db)
allocation = service.get_allocation_row(allocation_id)
if allocation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="预算额度不存在。")
if not _allocation_visible_to_user(db, current_user, allocation):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="不能查看其他部门预算流水。")
return BudgetService(db).list_transactions(allocation_id)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="不能查看其他部门预算流水。",
)
if wants_page(page, page_size):
return page_payload(
service.list_transactions_page(allocation_id, page=page, page_size=page_size)
)
return service.list_transactions(allocation_id)
@router.post(

View File

@@ -7,7 +7,8 @@ from fastapi.responses import Response
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.schemas.common import ErrorResponse
from app.api.pagination import PageNumber, PageSize, page_payload, wants_page
from app.schemas.common import ErrorResponse, PaginatedResponse
from app.schemas.employee import (
EmployeeCreate,
EmployeeImportResultRead,
@@ -16,6 +17,7 @@ from app.schemas.employee import (
EmployeeUpdate,
)
from app.services.employee import EmployeeService
from app.services.employee_pagination import EmployeePaginationService
router = APIRouter()
DbSession = Annotated[Session, Depends(get_db)]
@@ -33,7 +35,7 @@ def get_employee_meta(db: DbSession) -> EmployeeMetaRead:
@router.get(
"",
response_model=list[EmployeeRead],
response_model=list[EmployeeRead] | PaginatedResponse[EmployeeRead],
summary="查询员工列表",
description="按状态和关键字筛选员工目录。",
)
@@ -47,7 +49,18 @@ def list_employees(
str | None,
Query(description="姓名、工号、邮箱等关键字模糊查询。"),
] = None,
) -> list[EmployeeRead]:
page: PageNumber = None,
page_size: PageSize = None,
) -> list[EmployeeRead] | PaginatedResponse[EmployeeRead]:
if wants_page(page, page_size):
return page_payload(
EmployeePaginationService(db).list_employees_page(
status=status_filter,
keyword=keyword,
page=page,
page_size=page_size,
)
)
return EmployeeService(db).list_employees(status=status_filter, keyword=keyword)

View File

@@ -7,8 +7,9 @@ from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from app.api.deps import CurrentUserContext, get_current_user, get_db
from app.api.pagination import PageNumber, PageSize, page_payload, wants_page
from app.schemas.budget import BudgetClaimAnalysisRead
from app.schemas.common import ErrorResponse
from app.schemas.common import ErrorResponse, PaginatedResponse
from app.schemas.reimbursement import (
ExpenseClaimAttachmentActionResponse,
ExpenseClaimActionResponse,
@@ -25,8 +26,8 @@ from app.schemas.reimbursement import (
TravelReimbursementCalculatorRequest,
TravelReimbursementCalculatorResponse,
)
from app.services.expense_claims import ExpenseClaimService
from app.services.budget import BudgetService
from app.services.expense_claims import ExpenseClaimService
from app.services.reimbursement import ReimbursementService
from app.services.travel_reimbursement_calculator import TravelReimbursementCalculatorService
@@ -37,12 +38,19 @@ CurrentUser = Annotated[CurrentUserContext, Depends(get_current_user)]
@router.get(
"",
response_model=list[ReimbursementRead],
response_model=list[ReimbursementRead] | PaginatedResponse[ReimbursementRead],
summary="查询报销申请列表",
description="返回当前系统中的报销申请列表。",
)
def list_reimbursements(db: DbSession) -> list[ReimbursementRead]:
return ReimbursementService(db).list_reimbursements()
def list_reimbursements(
db: DbSession,
page: PageNumber = None,
page_size: PageSize = None,
) -> list[ReimbursementRead] | PaginatedResponse[ReimbursementRead]:
service = ReimbursementService(db)
if wants_page(page, page_size):
return page_payload(service.list_reimbursements_page(page=page, page_size=page_size))
return service.list_reimbursements()
@router.post(
@@ -81,32 +89,60 @@ def calculate_travel_reimbursement(
@router.get(
"/claims",
response_model=list[ExpenseClaimRead],
response_model=list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead],
summary="查询个人报销单列表",
description="返回当前登录用户可见的真实个人报销单据列表。",
)
def list_expense_claims(db: DbSession, current_user: CurrentUser) -> list[ExpenseClaimRead]:
return ExpenseClaimService(db).list_claims(current_user)
def list_expense_claims(
db: DbSession,
current_user: CurrentUser,
page: PageNumber = None,
page_size: PageSize = None,
) -> list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead]:
service = ExpenseClaimService(db)
if wants_page(page, page_size):
return page_payload(service.list_claims_page(current_user, page=page, page_size=page_size))
return service.list_claims(current_user)
@router.get(
"/claims/approvals",
response_model=list[ExpenseClaimRead],
response_model=list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead],
summary="查询当前用户审批待办报销单列表",
description="返回当前登录用户有权处理的待审批报销单据,不混入个人报销列表。",
)
def list_expense_claim_approvals(db: DbSession, current_user: CurrentUser) -> list[ExpenseClaimRead]:
return ExpenseClaimService(db).list_approval_claims(current_user)
def list_expense_claim_approvals(
db: DbSession,
current_user: CurrentUser,
page: PageNumber = None,
page_size: PageSize = None,
) -> list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead]:
service = ExpenseClaimService(db)
if wants_page(page, page_size):
return page_payload(
service.list_approval_claims_page(current_user, page=page, page_size=page_size)
)
return service.list_approval_claims(current_user)
@router.get(
"/claims/archives",
response_model=list[ExpenseClaimRead],
response_model=list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead],
summary="查询归档中心报销单列表",
description="返回公司已归档入账的报销单据,供财务与审计角色集中查阅。",
)
def list_archived_expense_claims(db: DbSession, current_user: CurrentUser) -> list[ExpenseClaimRead]:
return ExpenseClaimService(db).list_archived_claims(current_user)
def list_archived_expense_claims(
db: DbSession,
current_user: CurrentUser,
page: PageNumber = None,
page_size: PageSize = None,
) -> list[ExpenseClaimRead] | PaginatedResponse[ExpenseClaimRead]:
service = ExpenseClaimService(db)
if wants_page(page, page_size):
return page_payload(
service.list_archived_claims_page(current_user, page=page, page_size=page_size)
)
return service.list_archived_claims(current_user)
@router.get(

View File

@@ -9,20 +9,21 @@ from app.models.agent_asset import (
AgentAssetTestRun,
AgentAssetVersion,
)
from app.services.pagination import PageResult, paginate_select
class AgentAssetRepository:
def __init__(self, db: Session) -> None:
self.db = db
def list(
def _list_stmt(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
) -> list[AgentAsset]:
):
stmt = select(AgentAsset)
if asset_type:
@@ -42,8 +43,42 @@ class AgentAssetRepository:
)
stmt = stmt.order_by(AgentAsset.updated_at.desc(), AgentAsset.created_at.desc())
return stmt
def list(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
) -> list[AgentAsset]:
stmt = self._list_stmt(
asset_type=asset_type,
status=status,
domain=domain,
keyword=keyword,
)
return list(self.db.scalars(stmt).all())
def list_page(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[AgentAsset]:
stmt = self._list_stmt(
asset_type=asset_type,
status=status,
domain=domain,
keyword=keyword,
)
return paginate_select(self.db, stmt, page=page, page_size=page_size)
def get(self, asset_id: str) -> AgentAsset | None:
return self.db.get(AgentAsset, asset_id)

View File

@@ -6,13 +6,14 @@ from sqlalchemy.orm import Session, selectinload
from app.models.employee import Employee
from app.models.organization import OrganizationUnit
from app.models.role import Role
from app.services.pagination import PageResult, paginate_select
class EmployeeRepository:
def __init__(self, db: Session) -> None:
self.db = db
def list(self, status: str | None = None, keyword: str | None = None) -> list[Employee]:
def _list_stmt(self, status: str | None = None, keyword: str | None = None):
stmt = (
select(Employee)
.options(
@@ -38,8 +39,23 @@ class EmployeeRepository:
)
)
return stmt
def list(self, status: str | None = None, keyword: str | None = None) -> list[Employee]:
stmt = self._list_stmt(status=status, keyword=keyword)
return list(self.db.execute(stmt).scalars().unique().all())
def list_page(
self,
*,
status: str | None = None,
keyword: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[Employee]:
stmt = self._list_stmt(status=status, keyword=keyword)
return paginate_select(self.db, stmt, page=page, page_size=page_size, unique=True)
def get(self, employee_id: str) -> Employee | None:
stmt = (
select(Employee)

View File

@@ -1,14 +1,27 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.reimbursement import ReimbursementRequest
from app.services.pagination import PageResult, paginate_select
class ReimbursementRepository:
def __init__(self, db: Session) -> None:
self.db = db
def _list_stmt(self):
return select(ReimbursementRequest).order_by(ReimbursementRequest.created_at.desc())
def list(self) -> list[ReimbursementRequest]:
return self.db.query(ReimbursementRequest).order_by(ReimbursementRequest.created_at.desc()).all()
return list(self.db.scalars(self._list_stmt()).all())
def list_page(
self,
*,
page: int | None,
page_size: int | None,
) -> PageResult[ReimbursementRequest]:
return paginate_select(self.db, self._list_stmt(), page=page, page_size=page_size)
def get(self, request_id: str) -> ReimbursementRequest | None:
return self.db.query(ReimbursementRequest).filter(ReimbursementRequest.id == request_id).first()

View File

@@ -1,12 +1,26 @@
from __future__ import annotations
from typing import Generic, TypeVar
from pydantic import BaseModel
T = TypeVar("T")
class ErrorResponse(BaseModel):
detail: str
class PaginatedResponse(BaseModel, Generic[T]):
items: list[T]
total: int
page: int
page_size: int
total_pages: int
has_next: bool
has_previous: bool
class RootStatusRead(BaseModel):
message: str

View File

@@ -37,6 +37,7 @@ from app.services.agent_asset_spreadsheet_helpers import AgentAssetSpreadsheetHe
from app.services.agent_asset_timeline import AgentAssetTimelineMixin
from app.services.agent_foundation import AgentFoundationService
from app.services.audit import AuditLogService
from app.services.pagination import PageResult
from app.services.risk_rule_score_backfill import backfill_missing_risk_rule_score
logger = get_logger("app.services.agent_assets")
@@ -75,6 +76,32 @@ class AgentAssetService(
version_stats = self._collect_version_stats(assets)
return [self._serialize_list_item(asset, version_stats.get(asset.id)) for asset in assets]
def list_assets_page(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[AgentAssetListItem]:
self._ensure_ready()
if asset_type in {None, "", AgentAssetType.RULE.value}:
self.sync_platform_risk_rules_from_library()
result = self.repository.list_page(
asset_type=asset_type,
status=status,
domain=domain,
keyword=keyword,
page=page,
page_size=page_size,
)
version_stats = self._collect_version_stats(result.items)
return result.map(
lambda asset: self._serialize_list_item(asset, version_stats.get(asset.id))
)
def get_asset(self, asset_id: str) -> AgentAssetRead | None:
self._ensure_ready()
asset = self.repository.get(asset_id)

View File

@@ -21,11 +21,12 @@ from app.schemas.budget import (
BudgetTransactionRead,
)
from app.services.budget_expense_control import BudgetExpenseControlModel
from app.services.budget_pagination import BudgetPaginationMixin
from app.services.budget_support import BudgetSupportMixin
from app.services.budget_types import BudgetControlError, SUPPORTED_BUDGET_SUBJECT_CODES
from app.services.budget_types import BudgetControlError
class BudgetService(BudgetSupportMixin):
class BudgetService(BudgetPaginationMixin, BudgetSupportMixin):
def __init__(self, db: Session) -> None:
self.db = db
@@ -46,22 +47,13 @@ class BudgetService(BudgetSupportMixin):
cost_center: str | None = None,
) -> list[BudgetAllocationRead]:
self.ensure_budget_ready()
stmt = select(BudgetAllocation).order_by(
BudgetAllocation.fiscal_year.desc(),
BudgetAllocation.period_key.asc(),
BudgetAllocation.department_name.asc(),
BudgetAllocation.subject_code.asc(),
).where(BudgetAllocation.subject_code.in_(SUPPORTED_BUDGET_SUBJECT_CODES))
if fiscal_year is not None:
stmt = stmt.where(BudgetAllocation.fiscal_year == fiscal_year)
if period_key:
stmt = stmt.where(BudgetAllocation.period_key == period_key)
if department_id:
stmt = stmt.where(BudgetAllocation.department_id == department_id)
if department_name:
stmt = stmt.where(BudgetAllocation.department_name == department_name)
if cost_center:
stmt = stmt.where(BudgetAllocation.cost_center == cost_center)
stmt = self.build_allocation_stmt(
fiscal_year=fiscal_year,
period_key=period_key,
department_id=department_id,
department_name=department_name,
cost_center=cost_center,
)
return [self.serialize_allocation(row) for row in self.db.scalars(stmt).all()]
def get_summary(

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from sqlalchemy import select
from app.models.budget import BudgetAllocation, BudgetTransaction
from app.schemas.budget import BudgetAllocationRead, BudgetTransactionRead
from app.services.budget_types import SUPPORTED_BUDGET_SUBJECT_CODES
from app.services.pagination import PageResult, paginate_select
class BudgetPaginationMixin:
def build_allocation_stmt(
self,
*,
fiscal_year: int | None = None,
period_key: str | None = None,
department_id: str | None = None,
department_name: str | None = None,
cost_center: str | None = None,
):
stmt = select(BudgetAllocation).where(
BudgetAllocation.subject_code.in_(SUPPORTED_BUDGET_SUBJECT_CODES)
)
if fiscal_year is not None:
stmt = stmt.where(BudgetAllocation.fiscal_year == fiscal_year)
if period_key:
stmt = stmt.where(BudgetAllocation.period_key == period_key)
if department_id:
stmt = stmt.where(BudgetAllocation.department_id == department_id)
if department_name:
stmt = stmt.where(BudgetAllocation.department_name == department_name)
if cost_center:
stmt = stmt.where(BudgetAllocation.cost_center == cost_center)
return stmt.order_by(
BudgetAllocation.fiscal_year.desc(),
BudgetAllocation.period_key.asc(),
BudgetAllocation.department_name.asc(),
BudgetAllocation.subject_code.asc(),
)
def list_allocations_page(
self,
*,
fiscal_year: int | None = None,
period_key: str | None = None,
department_id: str | None = None,
department_name: str | None = None,
cost_center: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[BudgetAllocationRead]:
self.ensure_budget_ready()
result = paginate_select(
self.db,
self.build_allocation_stmt(
fiscal_year=fiscal_year,
period_key=period_key,
department_id=department_id,
department_name=department_name,
cost_center=cost_center,
),
page=page,
page_size=page_size,
)
return result.map(self.serialize_allocation)
def list_transactions_page(
self,
allocation_id: str,
*,
page: int | None,
page_size: int | None,
) -> PageResult[BudgetTransactionRead]:
self.ensure_budget_ready()
result = paginate_select(
self.db,
select(BudgetTransaction)
.where(BudgetTransaction.allocation_id == allocation_id)
.order_by(BudgetTransaction.created_at.desc()),
page=page,
page_size=page_size,
)
return result.map(BudgetTransactionRead.model_validate)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from sqlalchemy.orm import Session
from app.core.logging import get_logger
from app.schemas.employee import EmployeeRead
from app.services.employee import EmployeeService
from app.services.pagination import PageResult
logger = get_logger("app.services.employee")
class EmployeePaginationService:
def __init__(self, db: Session) -> None:
self.service = EmployeeService(db)
def list_employees_page(
self,
*,
status: str | None = None,
keyword: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[EmployeeRead]:
self.service.ensure_directory_ready()
result = self.service.repository.list_page(
status=status,
keyword=keyword,
page=page,
page_size=page_size,
)
logger.info(
"Listed employees page (count=%d, total=%d, page=%d, page_size=%d)",
len(result.items),
result.total,
result.page,
result.page_size,
)
return result.map(self.service._serialize_employee)

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.api.deps import CurrentUserContext
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.services.pagination import PageResult, paginate_select
class ExpenseClaimPaginationMixin:
def _claim_list_stmt(self):
return select(ExpenseClaim).options(
selectinload(ExpenseClaim.items),
selectinload(ExpenseClaim.employee).selectinload(Employee.manager),
selectinload(ExpenseClaim.employee).selectinload(Employee.roles),
)
def list_claims_page(
self,
current_user: CurrentUserContext,
*,
page: int | None,
page_size: int | None,
) -> PageResult[ExpenseClaim]:
stmt = self._claim_list_stmt().order_by(
ExpenseClaim.created_at.desc(),
ExpenseClaim.occurred_at.desc(),
)
stmt = self._access_policy.apply_claim_scope(stmt, current_user)
return paginate_select(self.db, stmt, page=page, page_size=page_size)
def list_approval_claims_page(
self,
current_user: CurrentUserContext,
*,
page: int | None,
page_size: int | None,
) -> PageResult[ExpenseClaim]:
stmt = self._claim_list_stmt().order_by(
ExpenseClaim.submitted_at.desc(),
ExpenseClaim.created_at.desc(),
)
stmt = self._access_policy.apply_approval_claim_scope(stmt, current_user)
return paginate_select(self.db, stmt, page=page, page_size=page_size)
def list_archived_claims_page(
self,
current_user: CurrentUserContext,
*,
page: int | None,
page_size: int | None,
) -> PageResult[ExpenseClaim]:
stmt = self._claim_list_stmt().order_by(
ExpenseClaim.updated_at.desc(),
ExpenseClaim.submitted_at.desc(),
ExpenseClaim.created_at.desc(),
)
stmt = self._access_policy.apply_archived_claim_scope(stmt, current_user)
return paginate_select(self.db, stmt, page=page, page_size=page_size)

View File

@@ -48,6 +48,7 @@ from app.services.expense_claim_document_parsing import ExpenseClaimDocumentPars
from app.services.expense_claim_draft_flow import ExpenseClaimDraftFlowMixin
from app.services.expense_claim_draft_persistence import ExpenseClaimDraftPersistenceMixin
from app.services.expense_claim_errors import ExpenseClaimSubmissionBlockedError
from app.services.expense_claim_pagination import ExpenseClaimPaginationMixin
from app.services.expense_claim_ontology_resolvers import ExpenseClaimOntologyResolverMixin
from app.services.expense_claim_read_model import ExpenseClaimReadModelMixin
from app.services.expense_claim_review_preview import ExpenseClaimReviewPreviewMixin
@@ -128,6 +129,7 @@ from app.services.ocr import OcrService
class ExpenseClaimService(
ExpenseClaimPaginationMixin,
ExpenseClaimApprovalFlowMixin,
ExpenseClaimApplicationHandoffMixin,
ExpenseClaimBudgetFlowMixin,

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from math import ceil
from typing import Any, Generic, TypeVar
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select
T = TypeVar("T")
U = TypeVar("U")
DEFAULT_PAGE = 1
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
@dataclass(frozen=True)
class PageParams:
page: int
page_size: int
@property
def offset(self) -> int:
return (self.page - 1) * self.page_size
@dataclass(frozen=True)
class PageResult(Generic[T]):
items: list[T]
total: int
page: int
page_size: int
@property
def total_pages(self) -> int:
if self.total <= 0:
return 0
return ceil(self.total / self.page_size)
@property
def has_next(self) -> bool:
return self.page < self.total_pages
@property
def has_previous(self) -> bool:
return self.page > 1 and self.total_pages > 0
def map(self, mapper: Callable[[T], U]) -> PageResult[U]:
return PageResult(
items=[mapper(item) for item in self.items],
total=self.total,
page=self.page,
page_size=self.page_size,
)
def normalize_page_params(
page: int | None,
page_size: int | None,
*,
max_page_size: int = MAX_PAGE_SIZE,
) -> PageParams:
normalized_page = max(DEFAULT_PAGE, int(page or DEFAULT_PAGE))
normalized_page_size = max(1, int(page_size or DEFAULT_PAGE_SIZE))
normalized_page_size = min(normalized_page_size, max_page_size)
return PageParams(page=normalized_page, page_size=normalized_page_size)
def paginate_select(
db: Session,
stmt: Select[Any],
*,
page: int | None,
page_size: int | None,
max_page_size: int = MAX_PAGE_SIZE,
unique: bool = False,
) -> PageResult[Any]:
params = normalize_page_params(page, page_size, max_page_size=max_page_size)
count_stmt = select(func.count()).select_from(stmt.order_by(None).subquery())
total = int(db.scalar(count_stmt) or 0)
page_stmt = stmt.limit(params.page_size).offset(params.offset)
scalars = db.execute(page_stmt).scalars()
if unique:
scalars = scalars.unique()
return PageResult(
items=list(scalars.all()),
total=total,
page=params.page,
page_size=params.page_size,
)

View File

@@ -4,6 +4,7 @@ from app.core.logging import get_logger
from app.models.reimbursement import ReimbursementRequest
from app.repositories.reimbursement import ReimbursementRepository
from app.schemas.reimbursement import ReimbursementCreate
from app.services.pagination import PageResult
logger = get_logger("app.services.reimbursement")
@@ -17,6 +18,22 @@ class ReimbursementService:
logger.info("Listed reimbursements (count=%d)", len(items))
return items
def list_reimbursements_page(
self,
*,
page: int | None,
page_size: int | None,
) -> PageResult[ReimbursementRequest]:
result = self.repository.list_page(page=page, page_size=page_size)
logger.info(
"Listed reimbursements page (count=%d, total=%d, page=%d, page_size=%d)",
len(result.items),
result.total,
result.page,
result.page_size,
)
return result
def get_reimbursement(self, request_id: str) -> ReimbursementRequest | None:
request = self.repository.get(request_id)
if request: