fix: restrict application linking for reimbursement drafts

This commit is contained in:
caoxiaozhu
2026-06-03 16:28:09 +08:00
parent 8887cf5a27
commit 04f0951b3d
4 changed files with 537 additions and 5 deletions

View File

@@ -110,6 +110,10 @@ from app.services.expense_rule_runtime import (
from app.services.ocr import OcrService
APPROVED_APPLICATION_LINK_STATUSES = {"approved", "completed"}
INACTIVE_APPLICATION_LINK_REIMBURSEMENT_STATUSES = {"cancelled", "canceled", "deleted"}
class ExpenseClaimDraftFlowMixin:
def upsert_draft_from_ontology(
self,
@@ -176,6 +180,12 @@ class ExpenseClaimDraftFlowMixin:
)
is_new_claim = claim is None
before_json = self._serialize_claim(claim) if claim is not None else None
application_link_block_result = self._build_application_link_block_result(
context_json=context_json,
target_claim=claim,
)
if application_link_block_result is not None:
return application_link_block_result
if is_new_claim:
existing_draft_count = self._count_draft_claims_for_owner(
employee=employee,
@@ -517,6 +527,219 @@ class ExpenseClaimDraftFlowMixin:
return list(risk_flags or [])
return [*list(risk_flags or []), link_flag]
def _build_application_link_block_result(
self,
*,
context_json: dict[str, Any],
target_claim: ExpenseClaim | None,
) -> dict[str, Any] | None:
link_flag = self._build_application_link_flag(context_json)
if link_flag is None:
return None
application_claim = self._find_application_claim_for_link(link_flag)
application_claim_no = str(link_flag.get("application_claim_no") or "").strip()
display_no = application_claim_no or "未编号申请单"
if application_claim is None or not self._is_expense_application_claim(application_claim):
return self._build_application_link_rejected_result(
f"未找到可关联的申请单 {display_no}。请先选择已审批通过的申请单。",
)
normalized_status = str(application_claim.status or "").strip().lower()
if normalized_status not in APPROVED_APPLICATION_LINK_STATUSES:
return self._build_application_link_rejected_result(
f"申请单 {application_claim.claim_no} 当前不是已审批通过状态,不能用于快速报销关联。",
application_claim=application_claim,
)
existing_reimbursement = self._find_existing_reimbursement_for_application_link(
application_claim=application_claim,
link_flag=link_flag,
target_claim=target_claim,
)
if existing_reimbursement is not None:
return self._build_application_link_rejected_result(
(
f"申请单 {application_claim.claim_no} 已经关联报销单 {existing_reimbursement.claim_no}"
"请进入该草稿或单据继续补充,不能重复生成。"
),
application_claim=application_claim,
existing_claim=existing_reimbursement,
)
return None
def _find_application_claim_for_link(self, link_flag: dict[str, Any]) -> ExpenseClaim | None:
application_claim_id = str(link_flag.get("application_claim_id") or "").strip()
application_claim_no = str(link_flag.get("application_claim_no") or "").strip()
if application_claim_id:
claim = self.db.get(ExpenseClaim, application_claim_id)
if claim is not None and self._is_expense_application_claim(claim):
return claim
if application_claim_no:
return self.db.scalar(
select(ExpenseClaim)
.where(ExpenseClaim.claim_no == application_claim_no)
.limit(1)
)
return None
def _find_existing_reimbursement_for_application_link(
self,
*,
application_claim: ExpenseClaim,
link_flag: dict[str, Any],
target_claim: ExpenseClaim | None,
) -> ExpenseClaim | None:
generated_draft = self._find_generated_reimbursement_from_application(
application_claim=application_claim,
target_claim=target_claim,
)
if generated_draft is not None:
return generated_draft
linked_ids, linked_nos = self._collect_application_link_reference_values(link_flag)
linked_ids.add(str(application_claim.id or "").strip())
linked_nos.add(str(application_claim.claim_no or "").strip().upper())
linked_ids.discard("")
linked_nos.discard("")
for claim in list(self.db.scalars(select(ExpenseClaim)).all()):
if self._is_same_target_claim(claim, target_claim):
continue
if self._is_expense_application_claim(claim):
continue
if self._is_inactive_application_link_reimbursement(claim):
continue
if self._claim_references_application(claim, linked_ids=linked_ids, linked_nos=linked_nos):
return claim
return None
def _find_generated_reimbursement_from_application(
self,
*,
application_claim: ExpenseClaim,
target_claim: ExpenseClaim | None,
) -> ExpenseClaim | None:
for flag in list(application_claim.risk_flags_json or []):
if not isinstance(flag, dict):
continue
generated_draft_id = str(
flag.get("generated_draft_claim_id")
or flag.get("generatedDraftClaimId")
or ""
).strip()
generated_draft_no = str(
flag.get("generated_draft_claim_no")
or flag.get("generatedDraftClaimNo")
or ""
).strip()
claim = self.db.get(ExpenseClaim, generated_draft_id) if generated_draft_id else None
if claim is None and generated_draft_no:
claim = self.db.scalar(
select(ExpenseClaim)
.where(ExpenseClaim.claim_no == generated_draft_no)
.limit(1)
)
if claim is None:
continue
if self._is_same_target_claim(claim, target_claim):
continue
if self._is_expense_application_claim(claim):
continue
if self._is_inactive_application_link_reimbursement(claim):
continue
return claim
return None
@staticmethod
def _is_same_target_claim(claim: ExpenseClaim, target_claim: ExpenseClaim | None) -> bool:
return bool(target_claim is not None and claim.id == target_claim.id)
@staticmethod
def _is_inactive_application_link_reimbursement(claim: ExpenseClaim) -> bool:
status = str(claim.status or "").strip().lower()
return status in INACTIVE_APPLICATION_LINK_REIMBURSEMENT_STATUSES
@classmethod
def _claim_references_application(
cls,
claim: ExpenseClaim,
*,
linked_ids: set[str],
linked_nos: set[str],
) -> bool:
for flag in list(claim.risk_flags_json or []):
flag_ids, flag_nos = cls._collect_application_link_reference_values(flag)
if flag_ids.intersection(linked_ids) or flag_nos.intersection(linked_nos):
return True
return False
@classmethod
def _collect_application_link_reference_values(cls, payload: Any) -> tuple[set[str], set[str]]:
ids: set[str] = set()
claim_nos: set[str] = set()
if not isinstance(payload, dict):
return ids, claim_nos
cls._add_application_link_reference(ids, claim_nos, payload)
for key in (
"application_detail",
"applicationDetail",
"review_form_values",
"reviewFormValues",
"expense_scene_selection",
"expenseSceneSelection",
):
nested_ids, nested_nos = cls._collect_application_link_reference_values(payload.get(key))
ids.update(nested_ids)
claim_nos.update(nested_nos)
ids.discard("")
claim_nos.discard("")
return ids, claim_nos
@staticmethod
def _add_application_link_reference(
ids: set[str],
claim_nos: set[str],
payload: dict[str, Any],
) -> None:
for key in ("application_claim_id", "applicationClaimId"):
ids.add(str(payload.get(key) or "").strip())
for key in ("application_claim_no", "applicationClaimNo"):
claim_nos.add(str(payload.get(key) or "").strip().upper())
@staticmethod
def _build_application_link_rejected_result(
message: str,
*,
application_claim: ExpenseClaim | None = None,
existing_claim: ExpenseClaim | None = None,
) -> dict[str, Any]:
result: dict[str, Any] = {
"message": message,
"draft_only": False,
"status": "blocked",
"application_link_blocked": True,
"submission_blocked": True,
"submission_blocked_reasons": [message],
"missing_fields": [message],
"risk_flags": ["application_link_blocked"],
}
if application_claim is not None:
result["application_claim_id"] = application_claim.id
result["application_claim_no"] = application_claim.claim_no
result["application_status"] = application_claim.status
if existing_claim is not None:
result["existing_claim_id"] = existing_claim.id
result["existing_claim_no"] = existing_claim.claim_no
result["existing_claim_status"] = existing_claim.status
return result
@staticmethod
def _build_application_link_flag(context_json: dict[str, Any]) -> dict[str, Any] | None:
review_values = ExpenseClaimDraftFlowMixin._normalize_context_object(

View File

@@ -69,6 +69,36 @@ def build_claim(*, expense_type: str, location: str) -> ExpenseClaim:
return claim
def build_application_claim(
*,
id: str,
claim_no: str,
employee: Employee,
status: str = "approved",
amount: Decimal = Decimal("3000.00"),
) -> ExpenseClaim:
return ExpenseClaim(
id=id,
claim_no=claim_no,
employee_id=employee.id,
employee_name=employee.name,
department_id=employee.organization_unit_id,
department_name="Tech",
project_code=None,
expense_type="travel_application",
reason="support deployment",
location="Shanghai",
amount=amount,
currency="CNY",
invoice_count=0,
occurred_at=datetime(2026, 2, 20, tzinfo=UTC),
submitted_at=datetime(2026, 2, 18, tzinfo=UTC),
status=status,
approval_stage=APPROVAL_DONE_STAGE if status in {"approved", "completed"} else "Pending",
risk_flags_json=[],
)
def build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
@@ -322,6 +352,12 @@ def test_upsert_draft_from_ontology_persists_linked_application_context() -> Non
email=user_id,
)
db.add(employee)
db.flush()
db.add(build_application_claim(
id="application-linked-1",
claim_no="AP-202605-001",
employee=employee,
))
db.commit()
ontology = SemanticOntologyService(db).parse(
OntologyParseRequest(
@@ -384,6 +420,12 @@ def test_upsert_linked_application_draft_without_receipts_has_no_placeholder_ite
grade="P5",
)
db.add(employee)
db.flush()
db.add(build_application_claim(
id="application-linked-no-receipt",
claim_no="AP-202606-001",
employee=employee,
))
db.commit()
ontology = SemanticOntologyService(db).parse(
OntologyParseRequest(
@@ -474,6 +516,11 @@ def test_upsert_linked_application_draft_clears_existing_placeholder_item() -> N
)
db.add(employee)
db.flush()
db.add(build_application_claim(
id="application-linked-existing-placeholder",
claim_no="AP-202606-002",
employee=employee,
))
existing_claim = ExpenseClaim(
claim_no="RE-202606020001-PLACEHOLDER",
employee_id=employee.id,
@@ -550,6 +597,123 @@ def test_upsert_linked_application_draft_clears_existing_placeholder_item() -> N
assert claim.items == []
def test_upsert_linked_application_requires_approved_application() -> None:
user_id = "linked-application-status-block@example.com"
message = "save reimbursement draft from linked travel application"
with build_session() as db:
employee = Employee(employee_no="E5108", name="Linked Employee", email=user_id)
db.add(employee)
db.flush()
db.add(build_application_claim(
id="application-returned-blocked",
claim_no="AP-202606-STATUS",
employee=employee,
status="returned",
))
db.commit()
ontology = SemanticOntologyService(db).parse(
OntologyParseRequest(query=message, user_id=user_id)
)
result = ExpenseClaimService(db).upsert_draft_from_ontology(
run_id=ontology.run_id,
user_id=user_id,
message=message,
ontology=ontology,
context_json={
"name": "Linked Employee",
"user_input_text": message,
"review_action": "save_draft",
"review_form_values": {
"expense_type": "travel",
"application_claim_id": "application-returned-blocked",
"application_claim_no": "AP-202606-STATUS",
},
"expense_scene_selection": {
"expense_type": "travel",
"application_claim_id": "application-returned-blocked",
"application_claim_no": "AP-202606-STATUS",
},
},
)
assert result["status"] == "blocked"
assert result["application_link_blocked"] is True
assert result["application_claim_no"] == "AP-202606-STATUS"
assert _count_claims(db) == 1
def test_upsert_linked_application_rejects_duplicate_reimbursement_draft() -> None:
user_id = "linked-application-duplicate-block@example.com"
message = "save another reimbursement draft from linked travel application"
with build_session() as db:
employee = Employee(employee_no="E5109", name="Linked Employee", email=user_id)
db.add(employee)
db.flush()
db.add(build_application_claim(
id="application-duplicate-blocked",
claim_no="AP-202606-DUP",
employee=employee,
))
existing_claim = ExpenseClaim(
claim_no="RE-202606-DUP-DRAFT",
employee_id=employee.id,
employee_name=employee.name,
department_name="Tech",
project_code=None,
expense_type="travel",
reason="support deployment",
location="Shanghai",
amount=Decimal("0.00"),
currency="CNY",
invoice_count=0,
occurred_at=datetime(2026, 2, 20, tzinfo=UTC),
status="draft",
approval_stage="Pending",
risk_flags_json=[
{
"source": "application_link",
"application_claim_id": "application-duplicate-blocked",
"application_claim_no": "AP-202606-DUP",
}
],
)
db.add(existing_claim)
db.commit()
ontology = SemanticOntologyService(db).parse(
OntologyParseRequest(query=message, user_id=user_id)
)
result = ExpenseClaimService(db).upsert_draft_from_ontology(
run_id=ontology.run_id,
user_id=user_id,
message=message,
ontology=ontology,
context_json={
"name": "Linked Employee",
"user_input_text": message,
"review_action": "save_draft",
"review_form_values": {
"expense_type": "travel",
"application_claim_id": "application-duplicate-blocked",
"application_claim_no": "AP-202606-DUP",
},
"expense_scene_selection": {
"expense_type": "travel",
"application_claim_id": "application-duplicate-blocked",
"application_claim_no": "AP-202606-DUP",
},
},
)
assert result["status"] == "blocked"
assert result["application_link_blocked"] is True
assert result["existing_claim_no"] == "RE-202606-DUP-DRAFT"
assert _count_claims(db) == 2
def test_sync_travel_allowance_uses_linked_application_range_days() -> None:
with build_session() as db:
employee = Employee(