refactor(server): 单号规则收紧为 A/R/D+8 位紧凑格式

- DOCUMENT_NUMBER_PREFIXES 改为 A/R/D,新增短格式与旧格式正则并存识别,提取正则加边界锚定避免误匹配
- build_document_number 去掉时间戳段,统一生成 A+token 等紧凑单号,is_application_claim_no 兼容旧 AP-/APP- 前缀
- access_policy/status_registry/reimbursements/expense_claims/budget_support 统一复用 is_application_claim_no 判定申请单
- 同步 document_numbering 单元测试覆盖新旧两种格式
This commit is contained in:
caoxiaozhu
2026-06-20 21:44:06 +08:00
parent 96c2e1099a
commit 47c6a4bb73
6 changed files with 77 additions and 33 deletions

View File

@@ -23,6 +23,7 @@ from app.services.budget_types import (
SUBJECT_CODE_ALIASES, SUBJECT_CODE_ALIASES,
SUPPORTED_BUDGET_SUBJECT_CODES, SUPPORTED_BUDGET_SUBJECT_CODES,
) )
from app.services.document_numbering import is_application_claim_no
from app.services.expense_claim_constants import EXPENSE_TYPE_LABELS from app.services.expense_claim_constants import EXPENSE_TYPE_LABELS
from app.services.expense_claim_risk_stage import enrich_risk_flag_semantics from app.services.expense_claim_risk_stage import enrich_risk_flag_semantics
from app.services.expense_type_keywords import resolve_expense_type_code_from_text from app.services.expense_type_keywords import resolve_expense_type_code_from_text
@@ -349,7 +350,11 @@ class BudgetSupportMixin:
def _reservation_source_type_from_claim(claim: ExpenseClaim) -> str: def _reservation_source_type_from_claim(claim: ExpenseClaim) -> str:
claim_no = str(claim.claim_no or "").strip().upper() claim_no = str(claim.claim_no or "").strip().upper()
expense_type = str(claim.expense_type or "").strip().lower() expense_type = str(claim.expense_type or "").strip().lower()
if claim_no.startswith(("AP-", "APP-")) or expense_type == "application" or expense_type.endswith("_application"): if (
is_application_claim_no(claim_no)
or expense_type == "application"
or expense_type.endswith("_application")
):
return "application" return "application"
return "claim" return "claim"

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import re import re
import secrets import secrets
from datetime import UTC, datetime from datetime import datetime
from typing import Callable, Literal from typing import Callable, Literal
from sqlalchemy import select from sqlalchemy import select
@@ -13,20 +13,29 @@ from app.models.financial_record import ExpenseClaim
DocumentNumberKind = Literal["application", "reimbursement", "audit"] DocumentNumberKind = Literal["application", "reimbursement", "audit"]
DOCUMENT_NUMBER_PREFIXES: dict[DocumentNumberKind, str] = { DOCUMENT_NUMBER_PREFIXES: dict[DocumentNumberKind, str] = {
"application": "AP", "application": "A",
"reimbursement": "RE", "reimbursement": "R",
"audit": "AD", "audit": "D",
} }
DOCUMENT_NUMBER_TOKEN_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" DOCUMENT_NUMBER_TOKEN_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
DOCUMENT_NUMBER_TOKEN_LENGTH = 8 DOCUMENT_NUMBER_TOKEN_LENGTH = 8
DOCUMENT_NUMBER_SHORT_BODY = (
rf"(?:A|R|D)[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}"
)
DOCUMENT_NUMBER_LEGACY_BODY = (
rf"(?:AP|RE|AD)-\d{{14}}-[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}"
)
DOCUMENT_NUMBER_PATTERN = re.compile( DOCUMENT_NUMBER_PATTERN = re.compile(
rf"^(?:AP|RE|AD)-\d{{14}}-[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}$", rf"^(?:{DOCUMENT_NUMBER_SHORT_BODY}|{DOCUMENT_NUMBER_LEGACY_BODY})$",
flags=re.IGNORECASE, flags=re.IGNORECASE,
) )
DOCUMENT_NUMBER_EXTRACT_PATTERN = re.compile( DOCUMENT_NUMBER_EXTRACT_PATTERN = re.compile(
rf"(?:AP|RE|AD)-\d{{14}}-[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}" rf"(?<![A-Z0-9])(?:"
rf"{DOCUMENT_NUMBER_SHORT_BODY}"
rf"|{DOCUMENT_NUMBER_LEGACY_BODY}"
r"|APP-\d{8}-[A-Z0-9]{6}" r"|APP-\d{8}-[A-Z0-9]{6}"
r"|EXP-\d{6}-\d{3}", r"|EXP-\d{6}-\d{3}"
r")(?![A-Z0-9])",
flags=re.IGNORECASE, flags=re.IGNORECASE,
) )
@@ -45,16 +54,13 @@ def build_document_number(
token: str | None = None, token: str | None = None,
) -> str: ) -> str:
prefix = DOCUMENT_NUMBER_PREFIXES[kind] prefix = DOCUMENT_NUMBER_PREFIXES[kind]
generated_at = timestamp or datetime.now(UTC)
if generated_at.tzinfo is None:
generated_at = generated_at.replace(tzinfo=UTC)
normalized_token = (token or generate_document_token()).strip().upper() normalized_token = (token or generate_document_token()).strip().upper()
if not re.fullmatch( if not re.fullmatch(
rf"[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}", rf"[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}",
normalized_token, normalized_token,
): ):
raise ValueError("document number token must be 8 chars from the configured alphabet") raise ValueError("document number token must be 8 chars from the configured alphabet")
return f"{prefix}-{generated_at.astimezone(UTC):%Y%m%d%H%M%S}-{normalized_token}" return f"{prefix}{normalized_token}"
def generate_unique_expense_claim_no( def generate_unique_expense_claim_no(
@@ -83,4 +89,9 @@ def generate_unique_expense_claim_no(
def is_application_claim_no(value: object) -> bool: def is_application_claim_no(value: object) -> bool:
normalized = str(value or "").strip().upper() normalized = str(value or "").strip().upper()
return normalized.startswith(("AP-", "APP-")) return bool(
re.fullmatch(
rf"A[{DOCUMENT_NUMBER_TOKEN_ALPHABET}]{{{DOCUMENT_NUMBER_TOKEN_LENGTH}}}",
normalized,
)
) or normalized.startswith(("AP-", "APP-"))

View File

@@ -11,6 +11,7 @@ from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim from app.models.financial_record import ExpenseClaim
from app.models.organization import OrganizationUnit from app.models.organization import OrganizationUnit
from app.models.role import Role from app.models.role import Role
from app.services.document_numbering import is_application_claim_no
from app.services.expense_claim_workflow_constants import ( from app.services.expense_claim_workflow_constants import (
APPLICATION_ARCHIVE_STAGE, APPLICATION_ARCHIVE_STAGE,
ARCHIVE_ACCOUNTING_STAGE, ARCHIVE_ACCOUNTING_STAGE,
@@ -42,6 +43,14 @@ class ExpenseClaimAccessPolicy:
def __init__(self, db: Session) -> None: def __init__(self, db: Session) -> None:
self.db = db self.db = db
@staticmethod
def _build_application_claim_no_condition(claim_no: Any) -> Any:
return or_(
claim_no.like("AP-%"),
claim_no.like("APP-%"),
claim_no.like("A________"),
)
@staticmethod @staticmethod
def has_privileged_claim_access(current_user: CurrentUserContext) -> bool: def has_privileged_claim_access(current_user: CurrentUserContext) -> bool:
if current_user.is_admin: if current_user.is_admin:
@@ -61,8 +70,7 @@ class ExpenseClaimAccessPolicy:
normalized_type = func.lower(func.coalesce(ExpenseClaim.expense_type, "")) normalized_type = func.lower(func.coalesce(ExpenseClaim.expense_type, ""))
claim_no = func.upper(func.coalesce(ExpenseClaim.claim_no, "")) claim_no = func.upper(func.coalesce(ExpenseClaim.claim_no, ""))
application_condition = or_( application_condition = or_(
claim_no.like("AP-%"), ExpenseClaimAccessPolicy._build_application_claim_no_condition(claim_no),
claim_no.like("APP-%"),
normalized_type == "application", normalized_type == "application",
normalized_type.like("%\\_application", escape="\\"), normalized_type.like("%\\_application", escape="\\"),
) )
@@ -101,9 +109,9 @@ class ExpenseClaimAccessPolicy:
normalized_status = str(claim.status or "").strip().lower() normalized_status = str(claim.status or "").strip().lower()
stage = str(claim.approval_stage or "").strip() stage = str(claim.approval_stage or "").strip()
normalized_type = str(claim.expense_type or "").strip().lower() normalized_type = str(claim.expense_type or "").strip().lower()
claim_no = str(claim.claim_no or "").strip().upper() claim_no = str(claim.claim_no or "").strip()
is_application_claim = ( is_application_claim = (
claim_no.startswith(("AP-", "APP-")) is_application_claim_no(claim_no)
or normalized_type == "application" or normalized_type == "application"
or normalized_type.endswith("_application") or normalized_type.endswith("_application")
) )
@@ -715,8 +723,9 @@ class ExpenseClaimAccessPolicy:
"%\\_application", "%\\_application",
escape="\\", escape="\\",
), ),
~func.upper(func.coalesce(ExpenseClaim.claim_no, "")).like("AP-%"), ~self._build_application_claim_no_condition(
~func.upper(func.coalesce(ExpenseClaim.claim_no, "")).like("APP-%"), func.upper(func.coalesce(ExpenseClaim.claim_no, ""))
),
~self.build_archived_claim_condition(), ~self.build_archived_claim_condition(),
) )
conditions.append(company_reimbursement_condition) conditions.append(company_reimbursement_condition)

View File

@@ -14,6 +14,7 @@ from app.services.expense_claim_workflow_constants import (
PAYMENT_PAID_STAGE, PAYMENT_PAID_STAGE,
PAYMENT_PENDING_STAGE, PAYMENT_PENDING_STAGE,
) )
from app.services.document_numbering import is_application_claim_no
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -158,7 +159,7 @@ def is_application_claim_reference(
normalized_no = str(claim_no or "").strip().upper() normalized_no = str(claim_no or "").strip().upper()
normalized_type = str(expense_type or "").strip().lower() normalized_type = str(expense_type or "").strip().lower()
return ( return (
normalized_no.startswith(("AP-", "APP-")) is_application_claim_no(normalized_no)
or normalized_type == "application" or normalized_type == "application"
or normalized_type.endswith("_application") or normalized_type.endswith("_application")
) )

View File

@@ -862,16 +862,13 @@ class ExpenseClaimService(
if claim is None: if claim is None:
return None return None
if not self._access_policy.has_claim_delete_access(current_user):
raise ValueError("只有 admin 管理员可以删除单据。")
if self._access_policy.is_archived_claim(claim) and not current_user.is_admin: if self._access_policy.is_archived_claim(claim) and not current_user.is_admin:
raise ValueError("已归档单据不能删除,只有高级管理员可以执行删除。") raise ValueError("已归档单据不能删除,只有高级管理员可以执行删除。")
if not self._access_policy.has_claim_delete_access(current_user): if not self._access_policy.has_claim_delete_access(current_user):
self._ensure_draft_claim(claim) self._ensure_draft_claim(claim)
if not self._access_policy.is_claim_owned_by_current_user(claim, current_user): if not self._access_policy.is_claim_owned_by_current_user(claim, current_user):
raise ValueError("只有高级财务人员可以删除非本人单据,申请人仅可删除自己的草稿、待补充退回单据。") raise ValueError("只有系统管理员或草稿、待补充退回待提交阶段的申请人本人可以删除单据。")
before_json = self._serialize_claim(claim) before_json = self._serialize_claim(claim)
resource_id = claim.id resource_id = claim.id
@@ -1039,4 +1036,3 @@ class ExpenseClaimService(

View File

@@ -11,6 +11,8 @@ from sqlalchemy.pool import StaticPool
from app.db.base import Base from app.db.base import Base
from app.models.financial_record import ExpenseClaim from app.models.financial_record import ExpenseClaim
from app.services.document_numbering import ( from app.services.document_numbering import (
DOCUMENT_NUMBER_EXTRACT_PATTERN,
DOCUMENT_NUMBER_PATTERN,
build_document_number, build_document_number,
generate_unique_expense_claim_no, generate_unique_expense_claim_no,
is_application_claim_no, is_application_claim_no,
@@ -32,19 +34,37 @@ def test_build_document_number_uses_kind_prefix_timestamp_and_token() -> None:
timestamp = datetime(2026, 5, 25, 10, 30, 45, tzinfo=UTC) timestamp = datetime(2026, 5, 25, 10, 30, 45, tzinfo=UTC)
assert ( assert (
build_document_number("application", timestamp=timestamp, token="ABCDEFGH") build_document_number("application", timestamp=timestamp, token="7K3M9Q2P")
== "AP-20260525103045-ABCDEFGH" == "A7K3M9Q2P"
) )
assert ( assert (
build_document_number("reimbursement", timestamp=timestamp, token="ABCDEFGH") build_document_number("reimbursement", timestamp=timestamp, token="7K3M9Q2P")
== "RE-20260525103045-ABCDEFGH" == "R7K3M9Q2P"
) )
assert ( assert (
build_document_number("audit", timestamp=timestamp, token="ABCDEFGH") build_document_number("audit", timestamp=timestamp, token="7K3M9Q2P")
== "AD-20260525103045-ABCDEFGH" == "D7K3M9Q2P"
) )
def test_document_number_pattern_accepts_short_and_legacy_numbers() -> None:
assert DOCUMENT_NUMBER_PATTERN.fullmatch("A7K3M9Q2P")
assert DOCUMENT_NUMBER_PATTERN.fullmatch("R7K3M9Q2P")
assert DOCUMENT_NUMBER_PATTERN.fullmatch("D7K3M9Q2P")
assert DOCUMENT_NUMBER_PATTERN.fullmatch("AP-20260525103045-ABCDEFGH")
assert DOCUMENT_NUMBER_PATTERN.fullmatch("RE-20260525103045-HGFEDCBA")
def test_document_number_extract_pattern_finds_short_and_legacy_numbers() -> None:
query = "查看 A7K3M9Q2P、R7K3M9Q2P 和 AP-20260525103045-ABCDEFGH 的状态"
assert [match.group(0) for match in DOCUMENT_NUMBER_EXTRACT_PATTERN.finditer(query)] == [
"A7K3M9Q2P",
"R7K3M9Q2P",
"AP-20260525103045-ABCDEFGH",
]
def test_build_document_number_rejects_ambiguous_token_chars() -> None: def test_build_document_number_rejects_ambiguous_token_chars() -> None:
timestamp = datetime(2026, 5, 25, 10, 30, 45, tzinfo=UTC) timestamp = datetime(2026, 5, 25, 10, 30, 45, tzinfo=UTC)
@@ -57,7 +77,7 @@ def test_generate_unique_expense_claim_no_retries_existing_candidate() -> None:
with build_session() as db: with build_session() as db:
db.add( db.add(
ExpenseClaim( ExpenseClaim(
claim_no="RE-20260525103045-ABCDEFGH", claim_no="RABCDEFGH",
employee_name="张三", employee_name="张三",
department_name="市场部", department_name="市场部",
project_code=None, project_code=None,
@@ -84,11 +104,13 @@ def test_generate_unique_expense_claim_no_retries_existing_candidate() -> None:
timestamp=timestamp, timestamp=timestamp,
token_factory=lambda: next(tokens), token_factory=lambda: next(tokens),
) )
== "RE-20260525103045-HGFEDCBA" == "RHGFEDCBA"
) )
def test_is_application_claim_no_supports_new_and_legacy_prefixes() -> None: def test_is_application_claim_no_supports_new_and_legacy_prefixes() -> None:
assert is_application_claim_no("A7K3M9Q2P")
assert is_application_claim_no("AP-20260525103045-ABCDEFGH") assert is_application_claim_no("AP-20260525103045-ABCDEFGH")
assert is_application_claim_no("APP-20260525-ABC123") assert is_application_claim_no("APP-20260525-ABC123")
assert not is_application_claim_no("R7K3M9Q2P")
assert not is_application_claim_no("RE-20260525103045-ABCDEFGH") assert not is_application_claim_no("RE-20260525103045-ABCDEFGH")