# Source file: X-Financial/server/src/app/services/risk_rule_generation_interpreter.py
# (Repository viewer metadata: 76 lines, 2.6 KiB, Python.)

from __future__ import annotations
from typing import Any
# Template identifier stamped onto every DSL draft built by this module.
COMPOSITE_RULE_TEMPLATE_KEY = "composite_rule_v1"

# Operator names declared for composite rules. The set is only defined here;
# presumably it is consumed by the DSL validator -- confirm against callers.
COMPOSITE_RULE_OPERATORS = {
    "exists_any",
    "exists_all",
    "all_present",
    "in_scope",
    "not_in_scope",
    "not_in_set",
    "overlap",
    "not_overlap",
    "date_outside_range",
    "numeric_compare",
    "duplicate_value",
    "contains_any",
    "not_contains_any",
}
def build_dsl_from_semantic_plan(semantic_plan: dict[str, Any]) -> dict[str, Any]:
    """Convert a model-produced semantic plan into a DSL draft for the validator to normalize.

    Args:
        semantic_plan: The semantic plan mapping. Anything that is not a
            ``dict`` is treated as "no plan".

    Returns:
        An empty dict when the input is not a dict, or when it yields neither
        text fragments nor field keys. Otherwise a draft containing the
        template key, the extracted field keys, the stripped ``rule_intent``
        as description, a condition summary capped at 800 characters, an
        empty keyword list and a ``rule_ir`` section.
    """
    if not isinstance(semantic_plan, dict):
        return {}

    conditions = _semantic_text_parts(semantic_plan)
    facts = _semantic_field_keys(semantic_plan)
    if not (conditions or facts):
        # Nothing usable was extracted, so there is no draft to build.
        return {}

    intent = semantic_plan.get("rule_intent") or ""
    # Fragments are concatenated without a separator before truncation.
    summary = "".join(conditions)
    rule_ir = {
        "facts": facts,
        "conditions": conditions,
        "hit_logic": "由 DSL validator 根据字段本体和语义步骤生成受控条件",
    }
    return {
        "template_key": COMPOSITE_RULE_TEMPLATE_KEY,
        "field_keys": facts,
        "description": str(intent).strip(),
        "condition_summary": summary[:800],
        "keywords": [],
        "rule_ir": rule_ir,
    }
def _semantic_text_parts(semantic_plan: dict[str, Any]) -> list[str]:
    """Collect the plan's text fragments, de-duplicated in first-seen order.

    The sections ``rule_intent``, ``scope``, ``judgment_steps``,
    ``exception_conditions`` and ``risk_action`` are read in that order and
    each is flattened with ``_flatten_semantic_text``.

    Args:
        semantic_plan: The semantic plan mapping.

    Returns:
        The non-empty fragments, each appearing once, ordered by first
        occurrence.
    """
    sections = ("rule_intent", "scope", "judgment_steps", "exception_conditions", "risk_action")
    fragments: list[str] = []
    for section in sections:
        fragments.extend(_flatten_semantic_text(semantic_plan.get(section)))
    # dict.fromkeys keeps the first occurrence of each key in insertion order,
    # giving a linear-time de-duplication instead of rescanning a copied
    # prefix of the list for every element.
    return list(dict.fromkeys(fragment for fragment in fragments if fragment))
def _semantic_field_keys(semantic_plan: dict[str, Any]) -> list[str]:
keys: list[str] = []
for value in (semantic_plan.get("required_fields"), semantic_plan.get("fields")):
for item in value if isinstance(value, list) else []:
key = item if isinstance(item, str) else next(
(item.get(name) for name in ("field", "key", "field_key") if isinstance(item, dict) and item.get(name)),
"",
)
text = str(key or "").strip()
if "." in text and text not in keys:
keys.append(text)
return keys
def _flatten_semantic_text(value: Any) -> list[str]:
if isinstance(value, str):
return [value.strip()] if value.strip() else []
if isinstance(value, list):
return [item for value_item in value for item in _flatten_semantic_text(value_item)]
if isinstance(value, dict):
return [item for value_item in value.values() for item in _flatten_semantic_text(value_item)]
return []