Files
X-Financial/server/src/app/services/risk_rule_manifest_classifier.py
caoxiaozhu 34457f9c3e feat: 本体字段治理与风险规则模板执行器重构
- 新增本体字段注册表与字段治理审计脚本
- 重构风险规则模板执行器、DSL 验证与清单分类器
- 完善票据夹服务与差旅请求详情页交互
- 优化趋势图表与总览页数据展示
- 增强报销平台风险分级与模拟公司筛选
- 补充本体字段、风险规则生成与票据夹服务测试覆盖
2026-06-03 15:46:56 +08:00

121 lines
3.7 KiB
Python

from __future__ import annotations
from typing import Any
BUDGET_RISK_STAGES = {"budget_execution", "budget_control", "budget_review"}
def is_budget_risk_manifest(manifest: dict[str, Any]) -> bool:
"""判断规则是否属于预算治理风险,而不是普通费用行为风险。"""
if not isinstance(manifest, dict):
return False
metadata = manifest.get("metadata") if isinstance(manifest.get("metadata"), dict) else {}
applies_to = manifest.get("applies_to") if isinstance(manifest.get("applies_to"), dict) else {}
rule_code = str(manifest.get("rule_code") or "").strip().lower()
finance_rule_code = str(
manifest.get("finance_rule_code") or metadata.get("finance_rule_code") or ""
).strip().lower()
if rule_code.startswith("risk.budget.") or rule_code.startswith("budget."):
return True
if finance_rule_code.startswith("budget."):
return True
if _normalized_text(manifest.get("risk_domain") or metadata.get("risk_domain")) == "budget":
return True
domains = {_normalized_text(value) for value in _as_list(applies_to.get("domains"))}
if "budget" in domains and not domains.difference({"budget"}):
return True
stages = {
_normalized_text(value)
for value in [
*_as_list(manifest.get("business_stage")),
*_as_list(metadata.get("business_stage")),
*_as_list(applies_to.get("business_stages")),
]
}
if stages & BUDGET_RISK_STAGES:
return True
category_text = " ".join(
str(value or "")
for value in (
manifest.get("risk_category"),
metadata.get("risk_category"),
manifest.get("name"),
)
)
if "预算" in category_text and any(key.startswith("budget.") for key in _iter_field_keys(manifest)):
return True
return any(key.startswith("budget.") for key in _iter_field_keys(manifest))
def _iter_field_keys(value: Any) -> list[str]:
keys: list[str] = []
def visit(node: Any) -> None:
if isinstance(node, dict):
for key, item in node.items():
normalized_key = str(key or "").strip()
if normalized_key in {
"key",
"field",
"left",
"right",
"field_key",
"fieldKey",
}:
_append_key(item)
elif normalized_key in {
"fields",
"field_keys",
"fieldKeys",
"search_fields",
"searchFields",
"left_fields",
"leftFields",
"right_fields",
"rightFields",
"left_group",
"leftGroup",
"right_group",
"rightGroup",
"date_fields",
"range_start_fields",
"range_end_fields",
}:
for child in _as_list(item):
_append_key(child)
visit(item)
return
if isinstance(node, list):
for item in node:
visit(item)
def _append_key(item: Any) -> None:
text = str(item or "").strip().lower()
if text and text not in keys:
keys.append(text)
visit(value)
return keys
def _as_list(value: Any) -> list[Any]:
if isinstance(value, list):
return value
if isinstance(value, (tuple, set)):
return list(value)
if value in (None, ""):
return []
return [value]
def _normalized_text(value: Any) -> str:
return str(value or "").strip().lower()