# ruff: noqa: E501
from __future__ import annotations
import html
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class RiskRuleFlowDiagramField:
key: str
label: str
@dataclass(frozen=True)
class RiskRuleFlowDiagramSpec:
title: str
domain_label: str
severity: str
severity_label: str
fields: tuple[RiskRuleFlowDiagramField, ...]
start: str
evidence: str
decision: str
basis: str
pass_text: str
fail_text: str
fact_lines: tuple[str, ...] = ()
condition_lines: tuple[str, ...] = ()
hit_logic: str = ""
@dataclass(frozen=True)
class RiskRuleFlowDiagramPalette:
accent: str
accent_dark: str
border: str
surface: str
class RiskRuleFlowDiagramRenderer:
"""按 fireworks-tech-graph Style 7 OpenAI Official 生成只读流程 SVG。"""
_FONT = (
"-apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica Neue, "
"'PingFang SC', 'Microsoft YaHei', 'Microsoft JhengHei', 'SimHei', sans-serif"
)
_TEXT = "#0d0d0d"
_MUTED = "#6e6e80"
_NEUTRAL_LINE = "#cbd5e1"
_NEUTRAL_BORDER = "#e2e8f0"
_NEUTRAL_SURFACE = "#ffffff"
_PALETTES = {
"low": RiskRuleFlowDiagramPalette(
accent="#2563eb",
accent_dark="#1d4ed8",
border="#bfdbfe",
surface="#eff6ff",
),
"medium": RiskRuleFlowDiagramPalette(
accent="#f97316",
accent_dark="#c2410c",
border="#fed7aa",
surface="#fff7ed",
),
"high": RiskRuleFlowDiagramPalette(
accent="#dc2626",
accent_dark="#b91c1c",
border="#fecaca",
surface="#fef2f2",
),
"critical": RiskRuleFlowDiagramPalette(
accent="#991b1b",
accent_dark="#7f1d1d",
border="#fca5a5",
surface="#fff1f2",
),
}
def render(self, spec: RiskRuleFlowDiagramSpec) -> str:
title = self._truncate(spec.title, 26)
palette = self._palette(spec.severity)
fact_lines = spec.fact_lines or self._field_lines(spec.fields)
condition_lines = spec.condition_lines or (spec.basis,)
hit_logic = spec.hit_logic or spec.basis
return f""""""
def _node(
self,
title: str,
body: str,
x: int,
y: int,
width: int,
height: int,
palette: RiskRuleFlowDiagramPalette | None = None,
) -> str:
body_lines = self._wrap(body, 10 if width <= 126 else 11, 1)
border = palette.border if palette else self._NEUTRAL_BORDER
stripe = palette.accent if palette else self._NEUTRAL_LINE
surface = palette.surface if palette else self._NEUTRAL_SURFACE
return f"""
{self._escape(title)}
{self._text_lines(body_lines, x + width / 2, y + 43, "middle", self._MUTED, 11)}
"""
def _diamond(
self,
title: str,
body: str,
x: int,
y: int,
width: int,
height: int,
) -> str:
cx = x + width / 2
cy = y + height / 2
points = f"{cx},{y} {x + width},{cy} {cx},{y + height} {x},{cy}"
body_lines = self._wrap(body, 8, 2)
return f"""
{self._escape(title)}
{self._text_lines(body_lines, cx, cy + 11, "middle", self._MUTED, 10.2)}
"""
def _panel(
self,
title: str,
lines: tuple[str, ...],
x: int,
y: int,
width: int,
height: int,
) -> str:
visible = [self._truncate(line, 36) for line in list(lines)[:4]]
if not visible:
visible = ["读取规则字段并归一化为判断事实"]
rows = "\n ".join(
f'{self._escape(line)}'
for index, line in enumerate(visible)
)
return f"""
{self._escape(title)}
{rows}
"""
def _note(
self,
body: str,
x: int,
y: int,
width: int,
height: int,
) -> str:
lines = self._wrap(body, 22, 1)
return f"""
BASIS
{self._text_lines(lines, x + 54, y + 22, "start", self._TEXT, 10.2)}
"""
def _field_lines(self, fields: tuple[RiskRuleFlowDiagramField, ...]) -> tuple[str, ...]:
rows = []
for index, field in enumerate(fields[:4]):
label = field.label or field.key
rows.append(f"{chr(65 + index)}={label}[{field.key}]")
return tuple(rows)
def _text_lines(
self,
lines: list[str],
x: float,
y: float,
anchor: str,
color: str,
font_size: float,
) -> str:
return "\n ".join(
f'{self._escape(line)}'
for index, line in enumerate(lines)
)
@staticmethod
def _wrap(value: str, width: int, max_lines: int) -> list[str]:
text = str(value or "").strip()
if not text:
return [""]
lines = [text[index : index + width] for index in range(0, len(text), width)]
if len(lines) > max_lines:
lines = lines[:max_lines]
lines[-1] = f"{lines[-1][: max(0, width - 1)]}…"
return lines
@staticmethod
def _truncate(value: str, length: int) -> str:
text = str(value or "").strip()
return text if len(text) <= length else f"{text[: length - 1]}…"
@staticmethod
def _escape(value: str) -> str:
return html.escape(str(value or ""), quote=True)
@classmethod
def _palette(cls, severity: str) -> RiskRuleFlowDiagramPalette:
return cls._PALETTES.get(str(severity or "").strip().lower(), cls._PALETTES["medium"])
def build_risk_rule_flow_diagram_details(
payload: dict[str, Any],
fields: list[RiskRuleFlowDiagramField],
) -> dict[str, tuple[str, ...] | str]:
params = payload.get("params") if isinstance(payload.get("params"), dict) else {}
rule_ir = params.get("rule_ir") if isinstance(params.get("rule_ir"), dict) else {}
facts = rule_ir.get("facts") if isinstance(rule_ir.get("facts"), list) else []
fact_lines = _build_fact_lines(facts, fields)
condition_lines = _build_condition_lines(params, fields)
hit_logic = _format_hit_logic(params.get("hit_logic")) or str(
params.get("formula") or params.get("condition_summary") or ""
).strip()
return {
"fact_lines": tuple(fact_lines),
"condition_lines": tuple(condition_lines),
"hit_logic": hit_logic,
}
def _build_fact_lines(
facts: list[Any],
fields: list[RiskRuleFlowDiagramField],
) -> list[str]:
label_by_key = {field.key: field.label or field.key for field in fields}
rows: list[str] = []
for fact in facts[:4]:
if not isinstance(fact, dict):
continue
fact_id = str(fact.get("id") or "").strip()
label = str(fact.get("label") or fact_id or "事实").strip()
field_keys = _read_string_list(fact.get("fields"))
field_text = "∪".join(label_by_key.get(key, key) for key in field_keys[:3])
rows.append(f"{fact_id + '=' if fact_id else ''}{label}: {field_text or '规则字段'}")
if rows:
return rows
return [
f"{chr(65 + index)}={field.label or field.key}[{field.key}]"
for index, field in enumerate(fields[:4])
]
def _build_condition_lines(
params: dict[str, Any],
fields: list[RiskRuleFlowDiagramField],
) -> list[str]:
label_by_key = {field.key: field.label or field.key for field in fields}
rows: list[str] = []
conditions = params.get("conditions") if isinstance(params.get("conditions"), list) else []
for index, condition in enumerate(conditions[:4], start=1):
if not isinstance(condition, dict):
continue
rows.append(_format_condition(condition, label_by_key, index))
if rows:
return rows
summary = str(params.get("condition_summary") or "").strip()
return [summary] if summary else []
def _format_condition(condition: dict[str, Any], label_by_key: dict[str, str], index: int) -> str:
operator = str(condition.get("operator") or "").strip()
condition_id = str(condition.get("id") or f"C{index}").strip()
prefix = f"{condition_id}: "
if operator in {"not_in_scope", "not_in_set", "not_overlap"}:
left = _field_group(condition.get("left_fields"), label_by_key)
right = _field_group(condition.get("right_fields"), label_by_key)
return f"{prefix}{left} ∩ {right} = ∅"
if operator in {"in_scope", "overlap"}:
left = _field_group(condition.get("left_fields"), label_by_key)
right = _field_group(condition.get("right_fields"), label_by_key)
return f"{prefix}{left} ∩ {right} ≠ ∅"
if operator == "date_outside_range":
dates = _field_group(condition.get("date_fields"), label_by_key)
start = _field_group(condition.get("range_start_fields"), label_by_key)
end = _field_group(condition.get("range_end_fields"), label_by_key)
return f"{prefix}{dates} 不在 [{start}, {end}]"
if operator in {"contains_any", "not_contains_any"}:
fields = _field_group(condition.get("fields"), label_by_key)
keywords = "、".join(_read_string_list(condition.get("keywords"))[:4])
verb = "不含" if operator == "not_contains_any" else "包含"
return f"{prefix}{fields} {verb} {keywords or '关键词'}"
if operator in {"exists_any", "exists_all", "all_present"}:
fields = _field_group(condition.get("fields"), label_by_key)
verb = "任一有值" if operator == "exists_any" else "全部有值"
return f"{prefix}{fields} {verb}"
left = str(condition.get("left") or "").strip()
right = str(condition.get("right") or "").strip()
if left or right:
return f"{prefix}{label_by_key.get(left, left)} {operator or 'compare'} {label_by_key.get(right, right)}"
return f"{prefix}{operator or '规则条件'}"
def _field_group(value: Any, label_by_key: dict[str, str]) -> str:
keys = _read_string_list(value)
if not keys:
return "字段集合"
return "∪".join(label_by_key.get(key, key) for key in keys[:3])
def _format_hit_logic(value: Any) -> str:
if isinstance(value, str):
return value.strip()
if isinstance(value, list):
return " AND ".join(_format_hit_logic(item) for item in value if _format_hit_logic(item))
if not isinstance(value, dict):
return ""
if isinstance(value.get("all"), list):
return " AND ".join(_wrap_logic_part(item) for item in value["all"])
if isinstance(value.get("any"), list):
return " OR ".join(_wrap_logic_part(item) for item in value["any"])
if "not" in value:
return f"NOT {_wrap_logic_part(value.get('not'))}"
return ""
def _wrap_logic_part(value: Any) -> str:
text = _format_hit_logic(value)
if isinstance(value, dict) and text:
return f"({text})"
return text
def _read_string_list(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [str(item or "").strip() for item in value if str(item or "").strip()]