Files
X-Financial/server/src/app/services/expense_claim_platform_risk.py
caoxiaozhu 1f4681f486 feat(claim): 重构报销审批流并收敛风险标记
- 直属领导兼任部门 P8 预算审批人时合并预算审批,直接流转至财务审批
- 预算超过警戒值时强制要求预算管理者填写审批意见
- 新增风险标记去重工具,消除各审核阶段重复风险卡片
- 新增工作流修复 Mixin,纠正重复预算审批阶段的历史数据
- 收紧单据删除权限至 admin,放宽预算分析可见范围至当前审核人
- 提交校验放宽已上传票据条目的 OCR 字段缺失并忽略尾部占位条目
2026-06-17 14:38:07 +08:00

792 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import re
from typing import Any
from sqlalchemy import select
from app.core.agent_enums import AgentAssetDomain, AgentAssetStatus, AgentAssetType
from app.models.agent_asset import AgentAsset
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager
from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY
from app.services.budget import BudgetService
from app.services.expense_claim_platform_context_tools import (
collect_attachment_cities,
collect_invoice_keys_from_contexts,
collect_invoice_keys_from_document_info,
count_values,
extract_known_cities_from_text,
resolve_first_document_field_value,
)
from app.services.expense_rule_runtime import (
RuntimeTravelPolicy,
)
from app.services.expense_type_keywords import resolve_expense_type_code_from_text
from app.services.expense_claim_risk_flags import dedupe_claim_risk_flags
from app.services.expense_claim_platform_route_risk import resolve_multi_city_related_item_ids
from app.services.expense_claim_platform_risk_flag import build_platform_risk_flag
from app.services.expense_claim_platform_text_risk import (
collect_vague_goods_description_evidence,
)
from app.services.risk_rule_manifest_classifier import is_budget_risk_manifest
from app.services.risk_rule_manifest_normalizer import normalize_risk_rule_manifest
from app.services.risk_rule_template_executor import RiskRuleTemplateExecutor
class ExpenseClaimPlatformRiskMixin:
_DEFAULT_RISK_BUSINESS_STAGE = "reimbursement"
_SUPPORTED_RISK_BUSINESS_STAGES = {"expense_application", "reimbursement"}
def evaluate_platform_risk_rules(
self,
claim: ExpenseClaim,
*,
rule_codes: list[str] | None = None,
business_stage: str | None = None,
) -> dict[str, list[Any]]:
normalized_stage = self._normalize_platform_risk_business_stage(business_stage)
manifests = self._load_platform_risk_rule_manifests(
rule_codes=rule_codes,
business_stage=normalized_stage,
)
if not manifests:
return {"flags": [], "blocking_reasons": []}
contexts = self._build_claim_attachment_contexts(claim)
contexts.append(
{
"index": len(contexts) + 1,
"item": None,
"document_info": {},
"ocr_text": "",
"ocr_summary": "",
"budget_context": BudgetService(self.db).build_claim_budget_context(claim),
}
)
flags: list[dict[str, Any]] = []
blocking_reasons: list[str] = []
for manifest in manifests:
if not self._risk_manifest_applies_to_claim(manifest, claim=claim, contexts=contexts):
continue
flag = self._evaluate_platform_risk_manifest(
manifest,
claim=claim,
contexts=contexts,
)
if flag is None:
continue
flags.append(flag)
flags = [
flag
for flag in dedupe_claim_risk_flags(flags)
if isinstance(flag, dict)
]
for flag in flags:
severity = str(flag.get("severity") or "").strip().lower()
action = str(flag.get("action") or "").strip().lower()
if severity in {"high", "critical"} or action == "block":
blocking_reasons.append(str(flag.get("message") or flag.get("label") or "").strip())
deduplicated_reasons = list(dict.fromkeys(reason for reason in blocking_reasons if reason))
return {"flags": flags, "blocking_reasons": deduplicated_reasons}
def _load_platform_risk_rule_manifests(
self,
*,
rule_codes: list[str] | None,
business_stage: str | None,
) -> list[dict[str, Any]]:
code_filter = {
str(code or "").strip() for code in list(rule_codes or []) if str(code or "").strip()
}
manifests_by_code: dict[str, dict[str, Any]] = {}
assets = list(
self.db.scalars(
select(AgentAsset)
.where(AgentAsset.asset_type == AgentAssetType.RULE.value)
.where(AgentAsset.status == AgentAssetStatus.ACTIVE.value)
.where(AgentAsset.domain == AgentAssetDomain.EXPENSE.value)
.order_by(AgentAsset.updated_at.desc(), AgentAsset.created_at.desc())
).all()
)
library_manager = AgentAssetRuleLibraryManager()
for asset in assets:
config_json = asset.config_json if isinstance(asset.config_json, dict) else {}
if str(config_json.get("detail_mode") or "").strip().lower() != "json_risk":
continue
rule_code = str(asset.code or "").strip()
if code_filter and rule_code not in code_filter:
continue
rule_document = config_json.get("rule_document")
if not isinstance(rule_document, dict):
continue
file_name = str(rule_document.get("file_name") or "").strip()
rule_library = (
str(config_json.get("rule_library") or RISK_RULES_LIBRARY).strip()
or RISK_RULES_LIBRARY
)
if not file_name:
continue
try:
payload = library_manager.read_rule_library_json(
library=rule_library,
file_name=file_name,
)
except (FileNotFoundError, ValueError):
continue
payload = normalize_risk_rule_manifest(payload)
manifest_code = str(payload.get("rule_code") or rule_code).strip()
if not manifest_code or (code_filter and manifest_code not in code_filter):
continue
if is_budget_risk_manifest(payload):
continue
if payload.get("enabled") is False or not self._risk_manifest_matches_business_stage(
payload,
business_stage=business_stage,
):
continue
payload = dict(payload)
payload.setdefault("rule_code", manifest_code)
payload["_rule_version"] = str(
asset.published_version or asset.current_version or "v1.0.0"
)
payload["_rule_asset_id"] = asset.id
manifests_by_code[manifest_code] = payload
missing_codes = code_filter - set(manifests_by_code)
should_load_fallback = not code_filter or bool(missing_codes)
if should_load_fallback:
try:
files = library_manager.list_rule_library_json_files(library=RISK_RULES_LIBRARY)
except ValueError:
files = []
for file_name in files:
try:
payload = library_manager.read_rule_library_json(
library=RISK_RULES_LIBRARY,
file_name=file_name,
)
except (FileNotFoundError, ValueError):
continue
payload = normalize_risk_rule_manifest(payload)
rule_code = str(payload.get("rule_code") or "").strip()
if not rule_code or rule_code in manifests_by_code:
continue
if code_filter and rule_code not in missing_codes:
continue
if is_budget_risk_manifest(payload):
continue
if payload.get("enabled") is False or not self._risk_manifest_matches_business_stage(
payload,
business_stage=business_stage,
):
continue
payload = dict(payload)
payload["_rule_version"] = "v1.0.0"
manifests_by_code[rule_code] = payload
return list(manifests_by_code.values())
@classmethod
def _normalize_platform_risk_business_stage(cls, value: str | None) -> str:
normalized = str(value or cls._DEFAULT_RISK_BUSINESS_STAGE).strip().lower()
if not normalized or normalized not in cls._SUPPORTED_RISK_BUSINESS_STAGES:
return cls._DEFAULT_RISK_BUSINESS_STAGE
return normalized
@classmethod
def _risk_manifest_matches_business_stage(
cls,
manifest: dict[str, Any],
*,
business_stage: str | None,
) -> bool:
if not business_stage:
return True
applies_to = manifest.get("applies_to") if isinstance(manifest.get("applies_to"), dict) else {}
raw_stages = applies_to.get("business_stages")
if not isinstance(raw_stages, list):
metadata = manifest.get("metadata") if isinstance(manifest.get("metadata"), dict) else {}
raw_stages = [manifest.get("business_stage") or metadata.get("business_stage") or cls._DEFAULT_RISK_BUSINESS_STAGE]
stages = {
cls._normalize_platform_risk_business_stage(str(item))
for item in raw_stages
if str(item or "").strip()
}
return business_stage in (stages or {cls._DEFAULT_RISK_BUSINESS_STAGE})
def _risk_manifest_applies_to_claim(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> bool:
applies_to = manifest.get("applies_to")
if not isinstance(applies_to, dict):
applies_to = {}
try:
min_attachments = int(applies_to.get("min_attachments") or 0)
except (TypeError, ValueError):
min_attachments = 0
if min_attachments and int(claim.invoice_count or 0) < min_attachments and not contexts:
return False
expense_types = self._normalize_expense_type_values(
str(claim.expense_type or ""),
*[str(item.item_type or "") for item in list(claim.items or [])],
)
domains = {
str(value or "").strip().lower()
for value in list(applies_to.get("domains") or [])
if str(value or "").strip()
}
configured_expense_types = self._normalize_expense_type_values(
*[str(value or "") for value in list(applies_to.get("expense_types") or [])]
)
configured_expense_categories = self._normalize_expense_type_values(
*[str(value or "") for value in list(applies_to.get("expense_categories") or [])]
)
if self._is_all_expense_scope(configured_expense_types):
configured_expense_types = set()
if self._is_all_expense_scope(configured_expense_categories):
configured_expense_categories = set()
if configured_expense_types and not (expense_types & configured_expense_types):
return False
if configured_expense_categories and not (expense_types & configured_expense_categories):
return False
if domains and not self._risk_domains_match_claim(
domains,
expense_types=expense_types,
contexts=contexts,
):
return False
return True
@staticmethod
def _normalize_expense_type_values(*values: str) -> set[str]:
normalized: set[str] = set()
for value in values:
raw = str(value or "").strip()
if not raw:
continue
normalized.add(raw.lower())
if raw in {"全部", "通用"}:
normalized.add("all")
if raw.lower().endswith("_application"):
normalized.add(raw.lower().removesuffix("_application"))
resolved = resolve_expense_type_code_from_text(raw)
if resolved:
normalized.add(resolved)
return normalized
@staticmethod
def _is_all_expense_scope(values: set[str]) -> bool:
return bool(values & {"all", "*", "overall", "general", "全部", "通用"})
def _risk_domains_match_claim(
self,
domains: set[str],
*,
expense_types: set[str],
contexts: list[dict[str, Any]],
) -> bool:
normalized_contexts: list[dict[str, str]] = []
for context in contexts:
document_info = context.get("document_info") or {}
normalized_contexts.append(
{
"scene_code": str(document_info.get("scene_code") or "").strip().lower(),
"document_type": str(document_info.get("document_type") or "").strip().lower(),
"item_type": str(getattr(context.get("item"), "item_type", "") or "")
.strip()
.lower(),
}
)
if "expense" in domains:
return True
if "travel" in domains:
if expense_types & {"travel", "hotel", "transport"}:
return True
if any(
item["scene_code"] in {"travel", "hotel", "transport"}
or item["document_type"]
in {
"flight_itinerary",
"train_ticket",
"hotel_invoice",
"taxi_receipt",
}
for item in normalized_contexts
):
return True
if "meal" in domains:
if expense_types & {"meal", "entertainment"}:
return True
if any(
item["scene_code"] == "meal" or item["document_type"] == "meal_receipt"
for item in normalized_contexts
):
return True
return bool(domains & expense_types)
def _evaluate_platform_risk_manifest(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
evaluator = str(manifest.get("evaluator") or "").strip().lower()
if evaluator == "reason_too_brief":
return self._evaluate_reason_too_brief_risk(manifest, claim=claim)
if evaluator == "entertainment_reason_missing":
return self._evaluate_entertainment_reason_missing_risk(manifest, claim=claim)
if evaluator == "document_expense_mismatch":
return self._evaluate_document_expense_mismatch_risk(
manifest,
claim=claim,
contexts=contexts,
)
if evaluator == "location_consistency":
return self._evaluate_location_consistency_risk(
manifest,
claim=claim,
contexts=contexts,
)
if evaluator == "duplicate_invoice":
return self._evaluate_duplicate_invoice_risk(manifest, claim=claim, contexts=contexts)
if evaluator == "identity_consistency":
return self._evaluate_identity_consistency_risk(
manifest,
claim=claim,
contexts=contexts,
)
if evaluator == "cross_year_invoice":
return self._evaluate_cross_year_invoice_risk(manifest, claim=claim, contexts=contexts)
if evaluator == "void_or_red_invoice":
return self._evaluate_text_keyword_risk(
manifest,
contexts=contexts,
keywords=["作废", "红冲", "红字", "冲红"],
fallback_message="票据文本中出现作废、红冲或红字发票相关信息,建议退回补充或人工复核。",
)
if evaluator == "vague_goods_description":
return self._evaluate_vague_goods_description_risk(
manifest,
contexts=contexts,
keywords=["详见清单", "服务费", "咨询费", "其他", "办公用品"],
fallback_message="票据商品或服务描述较笼统,建议审批人核对真实用途和明细清单。",
)
if evaluator == "multi_city_reason_required":
return self._evaluate_multi_city_reason_required_risk(
manifest,
claim=claim,
contexts=contexts,
)
if evaluator == "template_rule":
result = RiskRuleTemplateExecutor().evaluate(
manifest,
claim=claim,
contexts=contexts,
)
if result is None:
return None
return self._with_related_item_ids(
self._build_platform_risk_flag(
manifest,
message=str(result.get("message") or "自然语言风险规则命中。"),
evidence=result.get("evidence") if isinstance(result.get("evidence"), dict) else {},
),
self._context_item_ids(contexts),
)
return None
def _evaluate_reason_too_brief_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
) -> dict[str, Any] | None:
params = manifest.get("params") if isinstance(manifest.get("params"), dict) else {}
try:
min_reason_length = max(1, int(params.get("min_reason_length") or 6))
except (TypeError, ValueError):
min_reason_length = 6
reason_corpus = re.sub(r"\s+", "", self._build_scene_reason_corpus(claim))
if len(reason_corpus) >= min_reason_length:
return None
return self._build_platform_risk_flag(
manifest,
message=f"报销事由有效描述不足 {min_reason_length} 个字符,暂不足以支撑真实性判断。",
evidence={"reason_length": len(reason_corpus), "min_reason_length": min_reason_length},
)
def _evaluate_entertainment_reason_missing_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
) -> dict[str, Any] | None:
expense_types = {
str(claim.expense_type or "").strip().lower(),
*{str(item.item_type or "").strip().lower() for item in list(claim.items or [])},
}
reason_corpus = self._build_scene_reason_corpus(claim)
compact_reason = re.sub(r"\s+", "", reason_corpus)
looks_like_entertainment = (
"entertainment" in expense_types or "招待" in compact_reason or "客户" in compact_reason
)
if not looks_like_entertainment:
return None
required_keywords = ("客户", "项目", "参与", "人员", "对象", "商务", "会议")
has_detail = any(keyword in compact_reason for keyword in required_keywords)
if has_detail:
return None
return self._build_platform_risk_flag(
manifest,
message="招待或餐饮类费用未识别到客户、项目、参与人员等必要说明,建议补充后再流转。",
evidence={"reason": reason_corpus[:300]},
)
def _evaluate_document_expense_mismatch_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
mismatches: list[str] = []
for context in contexts:
item = context["item"]
item_type = (
str(item.item_type or claim.expense_type or "other").strip().lower() or "other"
)
policy = self._get_expense_scene_policy(item_type)
if policy is None:
continue
document_info = context.get("document_info") or {}
recognized_scene_code = (
str(document_info.get("scene_code") or "other").strip().lower() or "other"
)
recognized_document_type = (
str(document_info.get("document_type") or "other").strip().lower() or "other"
)
if recognized_scene_code in set(
policy.allowed_scene_codes
) or recognized_document_type in set(policy.allowed_document_types):
continue
recognized_label = str(
document_info.get("document_type_label") or recognized_document_type or "未知票据"
)
mismatches.append(
f"{context['index']} 条明细为{policy.label},附件识别为{recognized_label}"
)
if not mismatches:
return None
return self._with_related_item_ids(
self._build_platform_risk_flag(
manifest,
message="".join(mismatches[:3]) + ",与当前费用场景不匹配。",
evidence={"mismatches": mismatches[:5]},
),
self._context_item_ids(contexts),
)
def _evaluate_location_consistency_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
policy = self._get_expense_rule_catalog().travel_policy
if policy is None:
return None
declared_cities = extract_known_cities_from_text(
" ".join(
[
str(claim.location or ""),
*[str(item.item_location or "") for item in list(claim.items or [])],
]
),
policy,
)
evidence_cities = collect_attachment_cities(contexts, policy)
if not declared_cities or not evidence_cities:
return None
if set(declared_cities) & set(evidence_cities):
return None
declared_text = "".join(declared_cities)
evidence_text = "".join(evidence_cities[:5])
return self._with_related_item_ids(
self._build_platform_risk_flag(
manifest,
message=(
f"申报地点 {declared_text} 与票据识别地点 {evidence_text} 不一致,"
"建议补充异地说明或更换附件。"
),
evidence={"declared_cities": declared_cities, "evidence_cities": evidence_cities},
),
self._context_item_ids(contexts),
)
def _evaluate_duplicate_invoice_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
invoice_keys = collect_invoice_keys_from_contexts(contexts)
duplicate_keys = [
key for key, count in count_values(invoice_keys).items() if count > 1
]
if duplicate_keys:
return self._build_platform_risk_flag(
manifest,
message=f"当前报销单内存在重复票据号码:{''.join(duplicate_keys[:3])}",
evidence={"duplicate_invoice_keys": duplicate_keys[:5]},
)
if not invoice_keys:
return None
other_items = list(
self.db.scalars(
select(ExpenseClaimItem)
.where(ExpenseClaimItem.claim_id != claim.id)
.where(ExpenseClaimItem.invoice_id.is_not(None))
).all()
)
matched_claim_ids: set[str] = set()
for other_item in other_items:
other_path = self._attachment_storage.resolve_path(other_item.invoice_id)
if other_path is None or not other_path.exists():
continue
other_meta = self._attachment_storage.read_meta(other_path)
other_document_info = other_meta.get("document_info")
if not isinstance(other_document_info, dict):
continue
other_keys = collect_invoice_keys_from_document_info(other_document_info)
if set(invoice_keys) & set(other_keys):
matched_claim_ids.add(str(other_item.claim_id or ""))
if not matched_claim_ids:
return None
return self._build_platform_risk_flag(
manifest,
message=f"票据号码已在其他报销单中出现,疑似重复报销:{''.join(invoice_keys[:3])}",
evidence={
"invoice_keys": invoice_keys[:5],
"matched_claim_ids": sorted(matched_claim_ids)[:5],
},
)
def _evaluate_identity_consistency_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
params = manifest.get("params") if isinstance(manifest.get("params"), dict) else {}
allow_keywords = [
str(value) for value in list(params.get("allow_keywords") or []) if str(value).strip()
]
claimant = str(claim.employee_name or "").strip()
if not claimant:
return None
mismatched_buyers: list[str] = []
for context in contexts:
buyer = resolve_first_document_field_value(
context.get("document_info") or {},
keys={"buyer_name", "buyer", "purchaser_name", "claimant"},
labels={"购买方", "抬头", "买方", "购方"},
)
if not buyer:
continue
if claimant in buyer or any(keyword in buyer for keyword in allow_keywords):
continue
mismatched_buyers.append(buyer)
if not mismatched_buyers:
return None
return self._build_platform_risk_flag(
manifest,
message=f"发票抬头 {mismatched_buyers[0]} 与报销人 {claimant} 不一致,建议人工复核。",
evidence={"claimant": claimant, "buyers": mismatched_buyers[:5]},
)
def _evaluate_cross_year_invoice_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
claim_year = claim.occurred_at.year if claim.occurred_at is not None else None
if claim_year is None:
return None
issue_years: list[int] = []
for context in contexts:
text = " ".join(
[
resolve_first_document_field_value(
context.get("document_info") or {},
keys={"date", "issue_date", "invoice_date"},
labels={"日期", "开票日期", "发生时间"},
),
str(context.get("ocr_summary") or ""),
str(context.get("ocr_text") or ""),
]
)
for match in re.findall(r"(20\d{2}|19\d{2})[年/\-.]", text):
try:
issue_years.append(int(match))
except ValueError:
continue
mismatch_years = sorted({year for year in issue_years if year != claim_year})
if not mismatch_years:
return None
return self._build_platform_risk_flag(
manifest,
message=(
f"票据年份 {mismatch_years[0]} 与费用发生年份 {claim_year} 不一致,"
"建议确认是否跨年报销。"
),
evidence={"claim_year": claim_year, "invoice_years": mismatch_years},
)
def _evaluate_text_keyword_risk(
self,
manifest: dict[str, Any],
*,
contexts: list[dict[str, Any]],
keywords: list[str],
fallback_message: str,
) -> dict[str, Any] | None:
matched: list[str] = []
for context in contexts:
text = f"{context.get('ocr_summary') or ''}\n{context.get('ocr_text') or ''}"
for keyword in keywords:
if keyword in text and keyword not in matched:
matched.append(keyword)
if not matched:
return None
return self._build_platform_risk_flag(
manifest,
message=fallback_message,
evidence={"matched_keywords": matched},
)
def _evaluate_vague_goods_description_risk(
self,
manifest: dict[str, Any],
*,
contexts: list[dict[str, Any]],
keywords: list[str],
fallback_message: str,
) -> dict[str, Any] | None:
evidence = collect_vague_goods_description_evidence(contexts, keywords)
if not evidence:
return None
return self._build_platform_risk_flag(
manifest,
message=fallback_message,
evidence=evidence,
)
def _evaluate_multi_city_reason_required_risk(
self,
manifest: dict[str, Any],
*,
claim: ExpenseClaim,
contexts: list[dict[str, Any]],
) -> dict[str, Any] | None:
policy = self._get_expense_rule_catalog().travel_policy
if policy is None:
return None
cities = collect_attachment_cities(contexts, policy)
for item in list(claim.items or []):
for city in extract_known_cities_from_text(str(item.item_location or ""), policy):
if city not in cities:
cities.append(city)
if len(cities) <= 2:
return None
reason_corpus = self._build_travel_reason_corpus(claim)
if self._text_contains_keywords(reason_corpus, policy.route_exception_keywords):
return None
related_item_ids, extra_cities = resolve_multi_city_related_item_ids(
claim,
contexts,
policy,
)
evidence = {"cities": cities[:8]}
if extra_cities:
evidence["extra_cities"] = extra_cities[:8]
return self._with_related_item_ids(
self._build_platform_risk_flag(
manifest,
message=f"本次报销识别到多城市行程({''.join(cities[:5])}),但事由中未说明中转、多地拜访或改签原因。",
evidence=evidence,
),
related_item_ids or self._context_item_ids(contexts),
)
def _build_platform_risk_flag(
self,
manifest: dict[str, Any],
*,
message: str,
evidence: dict[str, Any],
) -> dict[str, Any]:
return build_platform_risk_flag(
manifest,
message=message,
evidence=evidence,
default_business_stage=self._DEFAULT_RISK_BUSINESS_STAGE,
)
@staticmethod
def _context_item_ids(contexts: list[dict[str, Any]]) -> list[str]:
item_ids: list[str] = []
seen: set[str] = set()
for context in list(contexts or []):
item = context.get("item") if isinstance(context, dict) else None
item_id = str(getattr(item, "id", "") or "").strip()
if item_id and item_id not in seen:
seen.add(item_id)
item_ids.append(item_id)
return item_ids
@staticmethod
def _with_related_item_ids(flag: dict[str, Any], item_ids: list[str]) -> dict[str, Any]:
normalized_item_ids = list(
dict.fromkeys(str(item_id or "").strip() for item_id in list(item_ids or []) if str(item_id or "").strip())
)
if not normalized_item_ids:
return flag
flag["item_ids"] = normalized_item_ids
if len(normalized_item_ids) == 1:
flag["item_id"] = normalized_item_ids[0]
return flag