Files
X-Financial/server/src/app/algorithem/risk_graph/decisioning.py

133 lines
4.4 KiB
Python
Raw Normal View History

"""Decision trace and explanation helpers for risk graph observations."""
from __future__ import annotations
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any
from .models import PeerBaseline, RiskEvidence
# Textual form of the weighted sum recorded in every DecisionTrace.
# The coefficients mirror the per-feature weights in _feature_contributions.
RISK_SCORE_FORMULA = (
    "0.35*S_rule"
    " + 0.25*S_anomaly"
    " + 0.20*S_graph"
    " + 0.15*S_policy"
    " + 0.05*S_history"
)
@dataclass(slots=True)
class DecisionTrace:
formula: str
algorithm_version: str
input_scores: dict[str, int]
output_score: int
decision_row: str
feature_contributions_json: list[dict[str, Any]]
uncertainty_reasons_json: list[str]
explanation_template_key: str
metadata: dict[str, Any] = field(default_factory=dict)
def as_dict(self) -> dict[str, Any]:
return {
"formula": self.formula,
"algorithm_version": self.algorithm_version,
"input_scores": dict(self.input_scores),
"output_score": self.output_score,
"decision_row": self.decision_row,
"feature_contributions_json": list(self.feature_contributions_json),
"uncertainty_reasons_json": list(self.uncertainty_reasons_json),
"explanation_template_key": self.explanation_template_key,
**self.metadata,
}
class DecisionTraceBuilder:
    """Assemble ``DecisionTrace`` records from the outputs of a scoring run."""

    def build(
        self,
        *,
        algorithm_version: str,
        risk_signal: str,
        risk_level: str,
        raw_risk_score: int,
        risk_score: int,
        contribution_scores: dict[str, int],
        evidence: list[RiskEvidence],
        baseline: PeerBaseline,
        confidence: Decimal,
        metadata: dict[str, Any],
    ) -> DecisionTrace:
        """Build the trace for one scored observation.

        ``risk_score`` is stored as the output score and selects the decision
        row; ``raw_risk_score`` is only used when deriving uncertainty
        reasons. ``contribution_scores`` and ``metadata`` are stored on the
        trace by reference, not copied.
        """
        # Derived values are computed in the same order the trace fields use.
        matched_row = _decision_row(risk_score=risk_score, risk_level=risk_level)
        contribution_rows = _feature_contributions(contribution_scores)
        uncertainty = _uncertainty_reasons(
            raw_risk_score=raw_risk_score,
            risk_score=risk_score,
            evidence=evidence,
            baseline=baseline,
            confidence=confidence,
            metadata=metadata,
        )
        template_key = f"risk.{risk_signal}.{risk_level}"

        return DecisionTrace(
            formula=RISK_SCORE_FORMULA,
            algorithm_version=algorithm_version,
            input_scores=contribution_scores,
            output_score=risk_score,
            decision_row=matched_row,
            feature_contributions_json=contribution_rows,
            uncertainty_reasons_json=uncertainty,
            explanation_template_key=template_key,
            metadata=metadata,
        )
def _decision_row(*, risk_score: int, risk_level: str) -> str:
    """Return ``"<risk_level>:<band>"`` for the band containing ``risk_score``.

    Bands are checked from the highest threshold down; anything below 45
    falls into the final ``score<45`` band.
    """
    bands = (
        (90, "score>=90"),
        (70, "70<=score<90"),
        (45, "45<=score<70"),
    )
    for lower_bound, band in bands:
        if risk_score >= lower_bound:
            return f"{risk_level}:{band}"
    return f"{risk_level}:score<45"
def _feature_contributions(scores: dict[str, int]) -> list[dict[str, Any]]:
    """Return one contribution row per input feature, largest first.

    Each row holds the feature name, its integer score (falsy scores count
    as 0), the weight as a string, and ``score * weight`` as a float.
    Features without a known weight get weight 0. Rows with equal weighted
    scores keep their input order.
    """
    weight_by_feature = {
        "S_rule": Decimal("0.35"),
        "S_anomaly": Decimal("0.25"),
        "S_graph": Decimal("0.20"),
        "S_policy": Decimal("0.15"),
        "S_history": Decimal("0.05"),
    }
    contributions: list[dict[str, Any]] = []
    for feature, raw_score in scores.items():
        points = int(raw_score or 0)
        weight = weight_by_feature.get(feature, Decimal("0"))
        contributions.append(
            {
                "feature": feature,
                "score": points,
                "weight": str(weight),
                "weighted_score": float(Decimal(points) * weight),
            }
        )
    # list.sort is stable, also with reverse=True, so ties keep input order.
    contributions.sort(key=lambda row: row["weighted_score"], reverse=True)
    return contributions
def _uncertainty_reasons(
    *,
    raw_risk_score: int,
    risk_score: int,
    evidence: list[RiskEvidence],
    baseline: PeerBaseline,
    confidence: Decimal,
    metadata: dict[str, Any],
) -> list[str]:
    """Return the reason codes that apply to a scored observation.

    Codes are emitted in a fixed order, each at most once:

    * ``score_capped_by_gate``: final score is below the raw score.
    * ``peer_baseline_insufficient``: baseline scope is
      ``"insufficient_sample"`` or its sample size is not positive.
    * ``low_confidence``: confidence is below 0.55.
    * ``single_evidence_source``: fewer than two distinct non-empty
      evidence sources (this includes having none at all).
    * ``ontology_candidate_only``: ``metadata["ontology_gate"]`` equals
      ``"candidate_only"``.
    * ``data_quality_gate_not_passed``: ``metadata["data_quality_gate"]``
      is present and is neither ``""`` nor ``"passed"``.
    """
    # Conditions are evaluated top to bottom, matching the reporting order.
    checks = (
        (risk_score < raw_risk_score, "score_capped_by_gate"),
        (
            baseline.scope == "insufficient_sample" or baseline.sample_size <= 0,
            "peer_baseline_insufficient",
        ),
        (confidence < Decimal("0.55"), "low_confidence"),
        (
            len({item.source for item in evidence if item.source}) < 2,
            "single_evidence_source",
        ),
        (
            metadata.get("ontology_gate") == "candidate_only",
            "ontology_candidate_only",
        ),
        (
            metadata.get("data_quality_gate") not in {"", "passed", None},
            "data_quality_gate_not_passed",
        ),
    )
    return [reason for triggered, reason in checks if triggered]