# File: X-Financial/server/src/app/services/risk_rule_generation.py
# 823 lines | 32 KiB | Python

from __future__ import annotations
import json
import re
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.orm import Session
from app.core.agent_enums import AgentAssetDomain, AgentAssetStatus, AgentAssetType
from app.models.agent_asset import AgentAsset, AgentAssetVersion
from app.schemas.agent_asset import AgentAssetRiskRuleGenerateRequest
from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager
from app.services.agent_asset_spreadsheet import RISK_RULES_LIBRARY
from app.services.audit import AuditLogService
from app.services.expense_type_keywords import EXPENSE_TYPE_LABEL_BY_CODE
from app.services.risk_rule_flow_diagram import (
RiskRuleFlowDiagramField,
RiskRuleFlowDiagramRenderer,
RiskRuleFlowDiagramSpec,
)
from app.services.runtime_chat import RuntimeChatService
@dataclass(frozen=True)
class RiskRuleField:
    """Immutable description of one field that a generated risk rule may reference."""

    # Dotted identifier such as "claim.reason"; its prefix is what
    # DOMAIN_FIELD_PREFIXES is matched against.
    key: str
    # Human-readable label used in prompts, summaries and diagrams.
    label: str
    # Value kind; the entries in FIELD_ONTOLOGY use "text", "number", "enum", "date" and "list".
    field_type: str
    # Name of the record the field is read from (e.g. "claim", "item", "attachment").
    source: str
    # Alternative phrases that count as a mention of this field in free text.
    aliases: tuple[str, ...]
# Display label for every business domain that rule generation accepts.
BUSINESS_DOMAIN_LABELS: dict[str, str] = {
    AgentAssetDomain.EXPENSE.value: "报销",
    AgentAssetDomain.AR.value: "应收",
    AgentAssetDomain.AP.value: "应付",
}
# Display label for every accepted risk level code.
RISK_LEVEL_LABELS: dict[str, str] = {
    "low": "低风险",
    "medium": "中风险",
    "high": "高风险",
}
# Expense category codes that a generated rule may be scoped to.
EXPENSE_RISK_CATEGORY_CODES: tuple[str, ...] = (
    "travel",
    "hotel",
    "transport",
    "meal",
    "meeting",
    "office",
    "training",
    "communication",
    "welfare",
)
# Label per category code; raises KeyError at import time if a code is
# missing from EXPENSE_TYPE_LABEL_BY_CODE.
EXPENSE_RISK_CATEGORY_LABELS: dict[str, str] = {
    code: EXPENSE_TYPE_LABEL_BY_CODE[code] for code in EXPENSE_RISK_CATEGORY_CODES
}
# Alternative category codes that are rewritten to a canonical code before validation.
EXPENSE_RISK_CATEGORY_ALIASES = {
    "entertainment": "meal",
}
# Catalogue of every field a generated rule is allowed to reference.
# Positional arguments are (key, label, field_type, source, aliases).
# Order matters: _resolve_fields falls back to the first candidates of a
# domain when nothing in the description matches.
FIELD_ONTOLOGY: tuple[RiskRuleField, ...] = (
    # --- claim.* : fields of the claim header ---
    RiskRuleField("claim.reason", "报销事由", "text", "claim", ("事由", "说明", "理由", "用途")),
    RiskRuleField(
        "claim.location",
        "申报地点",
        "text",
        "claim",
        ("地点", "城市", "出差地", "申报地点", "申报目的地", "目的地"),
    ),
    RiskRuleField("claim.amount", "申报金额", "number", "claim", ("金额", "费用", "超额", "额度")),
    RiskRuleField("claim.employee_name", "报销人", "text", "claim", ("报销人", "员工", "申请人")),
    RiskRuleField("claim.department_name", "部门", "text", "claim", ("部门", "组织")),
    # --- item.* : fields of a claim line item ---
    RiskRuleField("item.item_type", "费用类型", "enum", "item", ("费用类型", "科目", "类型")),
    RiskRuleField("item.item_reason", "明细事由", "text", "item", ("明细事由", "明细说明")),
    RiskRuleField("item.item_location", "明细地点", "text", "item", ("明细地点", "发生地点")),
    # --- attachment.* : fields read from attached invoices / tickets ---
    RiskRuleField(
        "attachment.invoice_no", "发票号码", "text", "attachment", ("发票号", "发票号码", "票号")
    ),
    RiskRuleField(
        "attachment.buyer_name", "购买方名称", "text", "attachment", ("抬头", "购买方", "开票单位")
    ),
    RiskRuleField(
        "attachment.goods_name",
        "商品服务名称",
        "text",
        "attachment",
        ("品名", "商品", "服务名称", "摘要"),
    ),
    RiskRuleField(
        "attachment.issue_date",
        "开票日期",
        "date",
        "attachment",
        ("开票日期", "发票日期", "票据日期"),
    ),
    RiskRuleField(
        "attachment.hotel_city",
        "住宿城市",
        "text",
        "attachment",
        ("住宿城市", "酒店城市", "酒店地点", "酒店发票城市", "酒店票城市", "住宿发票城市"),
    ),
    RiskRuleField(
        "attachment.route_cities",
        "行程城市",
        "list",
        "attachment",
        ("行程", "路线", "途经城市", "出差城市", "交通票行程", "交通票城市"),
    ),
    RiskRuleField(
        "attachment.ocr_text",
        "票据全文",
        "text",
        "attachment",
        ("票据内容", "OCR", "全文", "关键字", "关键词"),
    ),
    # --- receivable.* : accounts-receivable fields ---
    RiskRuleField(
        "receivable.aging_days", "应收账龄", "number", "receivable", ("账龄", "逾期", "应收逾期")
    ),
    RiskRuleField(
        "receivable.amount_outstanding",
        "应收未收金额",
        "number",
        "receivable",
        ("未收金额", "欠款", "应收余额"),
    ),
    # --- payable.* : accounts-payable fields ---
    RiskRuleField(
        "payable.vendor_name", "供应商名称", "text", "payable", ("供应商", "付款方", "往来单位")
    ),
    RiskRuleField(
        "payable.amount_outstanding", "应付未付金额", "number", "payable", ("未付金额", "应付余额")
    ),
)
# Key prefixes of the FIELD_ONTOLOGY entries that each business domain may use.
# The tuples are passed directly to str.startswith in _resolve_fields.
DOMAIN_FIELD_PREFIXES: dict[str, tuple[str, ...]] = {
    AgentAssetDomain.EXPENSE.value: ("claim.", "item.", "attachment."),
    AgentAssetDomain.AR.value: ("receivable.",),
    AgentAssetDomain.AP.value: ("payable.",),
}
class RiskRuleGenerationService:
def __init__(
    self,
    db: Session,
    *,
    rule_library_manager: AgentAssetRuleLibraryManager | None = None,
    runtime_chat_service: RuntimeChatService | None = None,
) -> None:
    """Wire up the collaborators used while generating a rule.

    Args:
        db: SQLAlchemy session used for persistence, auditing and the
            default chat service.
        rule_library_manager: Optional replacement for the rule library
            writer; a fresh ``AgentAssetRuleLibraryManager`` is created
            when a falsy value is given.
        runtime_chat_service: Optional replacement for the model client;
            ``RuntimeChatService(db)`` is created when a falsy value is given.
    """
    self.db = db

    if not rule_library_manager:
        rule_library_manager = AgentAssetRuleLibraryManager()
    self.rule_library_manager = rule_library_manager

    if not runtime_chat_service:
        runtime_chat_service = RuntimeChatService(db)
    self.runtime_chat_service = runtime_chat_service

    self.audit_service = AuditLogService(db)
    self.flow_diagram_renderer = RiskRuleFlowDiagramRenderer()
def generate_rule_asset(
    self,
    body: AgentAssetRiskRuleGenerateRequest,
    *,
    actor: str,
    request_id: str | None = None,
) -> str:
    """Create a draft risk-rule asset from a natural-language description.

    Steps, in order: validate the request, resolve candidate fields, obtain
    a draft (model first, local fallback when the model yields nothing),
    align the draft's fields, build the rule payload, write it to the rule
    library, add the asset and its first version to the session, and record
    an audit entry.

    Args:
        body: Request carrying business domain, description, risk level,
            attachment flag and optional expense category.
        actor: Recorded as owner / creator and passed to the audit log.
        request_id: Optional identifier forwarded to the audit log.

    Returns:
        The ``id`` of the newly added ``AgentAsset``.

    Raises:
        ValueError: If the domain is not in ``BUSINESS_DOMAIN_LABELS``, the
            cleaned description is shorter than 8 characters, the risk level
            is not in ``RISK_LEVEL_LABELS``, or the expense category is
            rejected by ``_normalize_expense_category``.
    """
    # --- request validation -------------------------------------------
    domain = body.business_domain.value
    if domain not in BUSINESS_DOMAIN_LABELS:
        raise ValueError("当前仅支持报销、应收、应付业务域的新建风险规则。")
    natural_language = self._clean_text(body.natural_language)
    if len(natural_language) < 8:
        raise ValueError("请至少输入 8 个字的风险规则描述。")
    # A missing risk level defaults to "medium".
    risk_level = str(body.risk_level or "medium").strip().lower()
    if risk_level not in RISK_LEVEL_LABELS:
        raise ValueError("风险等级仅支持 low、medium、high。")
    requires_attachment = bool(body.requires_attachment)
    expense_category = self._normalize_expense_category(body.expense_category, domain)
    expense_category_label = EXPENSE_RISK_CATEGORY_LABELS.get(expense_category or "", "")
    created_at = datetime.now(UTC)
    # --- draft creation -----------------------------------------------
    fields = self._resolve_fields(natural_language, domain=domain)
    # The model draft wins when it is truthy; otherwise the locally built
    # fallback draft is used.
    draft = self._compile_with_model(
        natural_language=natural_language,
        domain=domain,
        expense_category=expense_category,
        expense_category_label=expense_category_label,
        risk_level=risk_level,
        fields=fields,
    ) or self._build_fallback_draft(
        natural_language=natural_language,
        domain=domain,
        expense_category_label=expense_category_label,
        risk_level=risk_level,
        fields=fields,
    )
    draft = self._align_draft_fields(
        draft,
        natural_language=natural_language,
        fields=fields,
    )
    payload = self._build_rule_payload(
        draft,
        natural_language=natural_language,
        domain=domain,
        expense_category=expense_category,
        expense_category_label=expense_category_label,
        risk_level=risk_level,
        fields=fields,
        created_at=created_at,
        actor=actor,
        requires_attachment=requires_attachment,
    )
    # --- persistence --------------------------------------------------
    rule_code = str(payload["rule_code"])
    file_name = f"{rule_code}.json"
    # NOTE(review): the rule document is written before the database flush
    # below, so a failure in the flush would leave the written document
    # without a matching asset row — confirm whether that is acceptable.
    self.rule_library_manager.write_rule_library_json(
        library=RISK_RULES_LIBRARY,
        file_name=file_name,
        payload=payload,
    )
    asset = AgentAsset(
        asset_type=AgentAssetType.RULE.value,
        code=rule_code,
        name=str(payload["name"]),
        description=str(payload["description"]),
        domain=domain,
        scenario_json=[str(payload.get("risk_category") or BUSINESS_DOMAIN_LABELS[domain])],
        owner=actor,
        reviewer=None,
        status=AgentAssetStatus.DRAFT.value,
        current_version="v0.1.0",
        published_version=None,
        working_version="v0.1.0",
        config_json={
            "severity": risk_level,
            "enabled": True,
            "requires_attachment": requires_attachment,
            "tag": "风险规则",
            "detail_mode": "json_risk",
            "expense_category": expense_category,
            "expense_category_label": expense_category_label,
            "risk_category": payload.get("risk_category"),
            "rule_library": RISK_RULES_LIBRARY,
            "rule_document": {
                "file_name": file_name,
                "storage_key": f"rules/{RISK_RULES_LIBRARY}/{file_name}",
            },
            "ontology_signal": payload.get("ontology_signal"),
            "evaluator": payload.get("evaluator"),
            "generated_by": "natural_language",
            "source_ref": "自然语言风险规则",
        },
    )
    self.db.add(asset)
    # Flush before creating the version row, which reads asset.id.
    self.db.flush()
    self.db.add(
        AgentAssetVersion(
            asset_id=asset.id,
            version="v0.1.0",
            content=self._build_version_markdown(payload),
            content_type="markdown",
            change_note="通过自然语言新建风险规则草稿。",
            created_by=actor,
        )
    )
    self.audit_service.log_action(
        actor=actor,
        action="generate_agent_asset_risk_rule",
        resource_type=AgentAssetType.RULE.value,
        resource_id=asset.id,
        before_json=None,
        after_json={
            "rule_code": rule_code,
            "risk_level": risk_level,
            "domain": domain,
            "expense_category": expense_category,
            "requires_attachment": requires_attachment,
        },
        request_id=request_id,
    )
    # No commit is issued in this method.
    self.db.refresh(asset)
    return asset.id
def _compile_with_model(
    self,
    *,
    natural_language: str,
    domain: str,
    expense_category: str | None,
    expense_category_label: str,
    risk_level: str,
    fields: list[RiskRuleField],
) -> dict[str, Any] | None:
    """Ask the chat model to turn the description into a rule draft.

    Args:
        natural_language: Cleaned rule description.
        domain: Business domain code; must be a key of ``BUSINESS_DOMAIN_LABELS``.
        expense_category: Normalised expense category code, if any.
        expense_category_label: Label of the expense category (may be empty).
        risk_level: Risk level code; must be a key of ``RISK_LEVEL_LABELS``.
        fields: Candidate fields offered to the model.

    Returns:
        The sanitised draft, or ``None`` when the model returns nothing, the
        answer contains no parseable JSON object, or the JSON is not an object.
    """
    available_fields = [
        {
            "key": candidate.key,
            "label": candidate.label,
            "type": candidate.field_type,
            "source": candidate.source,
        }
        for candidate in fields
    ]
    system_prompt = (
        "你是 X-Financial 风险规则编译器。只能输出 JSON 对象,不要解释。"
        "必须从给定字段本体中选择字段,不允许编造字段。"
        "template_key 只能是 field_required_v1、field_compare_v1、keyword_match_v1。"
    )
    # Example of the JSON shape the model is asked to return.
    expected_shape = {
        "name": "规则名称",
        "description": "面向业务用户的说明",
        "template_key": "field_required_v1",
        "field_keys": ["claim.reason"],
        "condition_summary": "判断依据",
        "keywords": [],
        "flow": {
            "start": "提交业务单据",
            "evidence": "读取字段",
            "decision": "判断依据",
            "pass": "继续流转",
            "fail": "提示风险",
        },
    }
    request_body = {
        "business_domain": domain,
        "business_domain_label": BUSINESS_DOMAIN_LABELS[domain],
        "expense_category": expense_category,
        "expense_category_label": expense_category_label,
        "risk_level": risk_level,
        "risk_level_label": RISK_LEVEL_LABELS[risk_level],
        "natural_language": natural_language,
        "available_fields": available_fields,
        "required_json_shape": expected_shape,
    }
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": json.dumps(request_body, ensure_ascii=False)},
    ]

    reply = self.runtime_chat_service.complete(
        conversation,
        max_tokens=700,
        temperature=0.1,
        timeout_seconds=12,
        max_attempts=1,
    )
    if not reply:
        return None

    try:
        parsed = json.loads(self._extract_json_object(reply))
    except (json.JSONDecodeError, ValueError):
        # Unparseable answers are treated the same as no answer.
        return None

    if isinstance(parsed, dict):
        return self._sanitize_model_draft(parsed, fields=fields)
    return None
def _sanitize_model_draft(
self,
payload: dict[str, Any],
*,
fields: list[RiskRuleField],
) -> dict[str, Any]:
allowed_fields = {item.key for item in fields}
template_key = str(payload.get("template_key") or "").strip()
if template_key not in {"field_required_v1", "field_compare_v1", "keyword_match_v1"}:
template_key = "field_required_v1"
raw_field_keys = payload.get("field_keys")
field_keys = [
str(item or "").strip()
for item in (raw_field_keys if isinstance(raw_field_keys, list) else [])
if str(item or "").strip() in allowed_fields
]
if not field_keys and fields:
field_keys = [fields[0].key]
keywords = [
str(item or "").strip()
for item in (
payload.get("keywords") if isinstance(payload.get("keywords"), list) else []
)
if str(item or "").strip()
]
flow = payload.get("flow") if isinstance(payload.get("flow"), dict) else {}
return {
"name": self._clean_text(payload.get("name"))[:80],
"description": self._clean_text(payload.get("description")),
"template_key": template_key,
"field_keys": field_keys,
"condition_summary": self._clean_text(payload.get("condition_summary")),
"keywords": keywords[:12],
"flow": {
"start": self._clean_text(flow.get("start")) or "提交业务单据",
"evidence": self._clean_text(flow.get("evidence")) or "读取规则字段",
"decision": self._clean_text(flow.get("decision")) or "判断是否命中风险",
"pass": self._clean_text(flow.get("pass")) or "继续流转",
"fail": self._clean_text(flow.get("fail")) or "提示风险并进入复核",
},
}
def _build_fallback_draft(
    self,
    *,
    natural_language: str,
    domain: str,
    expense_category_label: str,
    risk_level: str,
    fields: list[RiskRuleField],
) -> dict[str, Any]:
    """Build a draft locally, without the chat model.

    Args:
        natural_language: Cleaned rule description.
        domain: Business domain code, used for the label when no expense
            category label is given.
        expense_category_label: Label of the expense category (may be empty).
        risk_level: Risk level code; must be a key of ``RISK_LEVEL_LABELS``.
        fields: Candidate fields; the first four become ``field_keys``.

    Returns:
        A draft dict with the same keys as a sanitised model draft.
    """
    leading_keys = [candidate.key for candidate in fields[:4]]
    template_key = self._infer_template_key(natural_language)
    summary = self._build_condition_summary(
        natural_language,
        template_key=template_key,
        fields=fields,
    )
    rule_name = self._infer_rule_name(natural_language)
    business_label = expense_category_label or BUSINESS_DOMAIN_LABELS[domain]
    level_label = RISK_LEVEL_LABELS[risk_level]
    description = (
        f"{business_label}业务满足“{natural_language}”时,系统会按"
        f"{level_label}进行提示,并要求经办人或审核人补充核对依据。"
    )
    evidence_labels = "".join(candidate.label for candidate in fields[:3])

    flow = {
        "start": f"{business_label}单据提交",
        "evidence": "读取" + evidence_labels,
        "decision": summary,
        "pass": "未命中风险,继续业务流转",
        "fail": f"命中{level_label},提示复核",
    }
    return {
        "name": rule_name,
        "description": description,
        "template_key": template_key,
        "field_keys": leading_keys,
        "condition_summary": summary,
        "keywords": self._infer_keywords(natural_language),
        "flow": flow,
    }
def _build_rule_payload(
    self,
    draft: dict[str, Any],
    *,
    natural_language: str,
    domain: str,
    expense_category: str | None,
    expense_category_label: str,
    risk_level: str,
    fields: list[RiskRuleField],
    created_at: datetime,
    actor: str,
    requires_attachment: bool,
) -> dict[str, Any]:
    """Assemble the rule document that is written to the rule library.

    Args:
        draft: Aligned draft (model or fallback).
        natural_language: Cleaned rule description.
        domain: Business domain code; must be "expense", "ar" or "ap".
        expense_category: Normalised expense category code, if any.
        expense_category_label: Label of the expense category (may be empty).
        risk_level: Risk level code used as the failure severity.
        fields: Candidate fields; used to look up metadata for the draft's keys.
        created_at: Creation time; formats the rule code suffix and metadata.
        actor: Recorded as owner and creator.
        requires_attachment: Stored on the payload and in its metadata.

    Returns:
        The rule payload, including a rendered ``flow_diagram_svg``.

    Raises:
        KeyError: If ``domain`` is not one of the three supported codes.
    """
    timestamp = created_at.strftime("%Y%m%d%H%M%S%f")
    domain_slug = {"expense": "expense", "ar": "ar", "ap": "ap"}[domain]
    code_parts = ["risk.", domain_slug]
    if expense_category:
        code_parts.append(f".{expense_category}")
    code_parts.append(f".generated_{timestamp}")
    rule_code = "".join(code_parts)

    template_key = str(draft.get("template_key") or "field_required_v1").strip()

    field_keys: list[str] = []
    for raw_key in list(draft.get("field_keys") or []):
        cleaned_key = str(raw_key or "").strip()
        if cleaned_key:
            field_keys.append(cleaned_key)

    condition_summary = self._clean_text(draft.get("condition_summary"))
    if not condition_summary:
        condition_summary = "判断是否符合自然语言规则描述"

    risk_category = expense_category_label or BUSINESS_DOMAIN_LABELS[domain]
    keywords = list(draft.get("keywords") or [])

    catalog = {candidate.key: candidate for candidate in fields}
    # Draft keys that are not candidate fields are kept in params but get
    # no entry under inputs.fields.
    selected_fields = [catalog[key] for key in field_keys if key in catalog]

    params: dict[str, Any] = {
        "template_key": template_key,
        "field_keys": field_keys,
        "condition_summary": condition_summary,
        "natural_language": natural_language,
    }
    if template_key == "field_required_v1":
        params["required_fields"] = field_keys
    elif template_key == "field_compare_v1":
        params["conditions"] = self._build_compare_conditions(field_keys)
    elif template_key == "keyword_match_v1":
        params["keywords"] = keywords
        params["search_fields"] = field_keys

    applies_to: dict[str, Any] = {"domains": [domain]}
    if expense_category:
        applies_to["expense_categories"] = [expense_category]

    draft_flow = draft.get("flow")
    metadata = {
        "owner": actor,
        "stability": "generated_draft",
        "source_ref": "自然语言风险规则",
        "created_at": created_at.isoformat(),
        "created_by": actor,
        "requires_attachment": requires_attachment,
        "expense_category": expense_category,
        "expense_category_label": expense_category_label,
        "natural_language": natural_language,
        "business_explanation": self._clean_text(draft.get("description")),
        "condition_summary": condition_summary,
        "flow": draft_flow if isinstance(draft_flow, dict) else {},
    }
    input_fields = [
        {
            "key": chosen.key,
            "label": chosen.label,
            "type": chosen.field_type,
            "source": chosen.source,
        }
        for chosen in selected_fields
    ]

    payload = {
        "schema_version": "2.0",
        "rule_code": rule_code,
        "name": self._clean_text(draft.get("name")) or self._infer_rule_name(natural_language),
        "description": self._clean_text(draft.get("description")) or natural_language,
        "enabled": True,
        "requires_attachment": requires_attachment,
        "risk_dimension": "natural_language_rule",
        "risk_category": risk_category,
        "ontology_signal": "natural_language_risk",
        "evaluator": "template_rule",
        "template_key": template_key,
        "applies_to": applies_to,
        "inputs": {"fields": input_fields},
        "params": params,
        "outcomes": {
            "pass": {"severity": "none", "action": "continue"},
            "fail": {"severity": risk_level, "action": "manual_review"},
        },
        "metadata": metadata,
    }
    # The diagram is rendered from the finished payload and stored on it.
    payload["flow_diagram_svg"] = self._build_flow_diagram_svg(
        payload,
        fields=list(selected_fields),
        domain=domain,
        domain_label=risk_category,
        risk_level=risk_level,
    )
    return payload
def _build_flow_diagram_svg(
    self,
    payload: dict[str, Any],
    *,
    fields: list[RiskRuleField],
    domain: str,
    domain_label: str | None = None,
    risk_level: str,
) -> str:
    """Render the rule's decision flow through the flow diagram renderer.

    Args:
        payload: Rule payload; ``name`` and ``metadata`` are read from it.
        fields: Fields shown in the diagram.
        domain: Business domain code, used for the label when
            ``domain_label`` is empty.
        domain_label: Preferred label for the diagram header.
        risk_level: Risk level code shown as severity.

    Returns:
        Whatever ``self.flow_diagram_renderer.render`` returns for the spec.
    """
    metadata = payload.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}
    flow = metadata.get("flow")
    if not isinstance(flow, dict):
        flow = {}

    condition_summary = self._clean_text(metadata.get("condition_summary"))
    decision_text = self._clean_text(flow.get("decision"))
    diagram_fields = tuple(
        RiskRuleFlowDiagramField(key=entry.key, label=entry.label) for entry in fields
    )

    spec = RiskRuleFlowDiagramSpec(
        title=self._clean_text(payload.get("name")) or "风险规则判断流程",
        domain_label=domain_label or BUSINESS_DOMAIN_LABELS.get(domain, "业务"),
        severity=risk_level,
        severity_label=RISK_LEVEL_LABELS.get(risk_level, "中风险"),
        fields=diagram_fields,
        start=self._clean_text(flow.get("start")) or "业务单据提交",
        evidence=self._clean_text(flow.get("evidence")) or "读取规则字段",
        # The decision node prefers the flow text; the basis prefers the summary.
        decision=decision_text or condition_summary or "判断是否命中风险",
        basis=condition_summary or decision_text or "根据规则字段判断",
        pass_text=self._clean_text(flow.get("pass")) or "未命中风险,继续流转",
        fail_text=self._clean_text(flow.get("fail"))
        or f"命中{RISK_LEVEL_LABELS.get(risk_level, '风险')},进入人工复核",
    )
    return self.flow_diagram_renderer.render(spec)
@staticmethod
def _normalize_expense_category(value: str | None, domain: str) -> str | None:
    """Validate and canonicalise the expense category of a request.

    Args:
        value: Category code supplied by the caller; may be ``None`` or blank.
        domain: Business domain code of the request.

    Returns:
        ``None`` when the domain is not the expense domain or no category was
        given; otherwise the lower-cased code with aliases from
        ``EXPENSE_RISK_CATEGORY_ALIASES`` already resolved.

    Raises:
        ValueError: If the code is not a key of ``EXPENSE_RISK_CATEGORY_LABELS``.
    """
    if domain != AgentAssetDomain.EXPENSE.value:
        return None
    normalized = str(value or "").strip().lower()
    if not normalized:
        return None
    normalized = EXPENSE_RISK_CATEGORY_ALIASES.get(normalized, normalized)
    if normalized not in EXPENSE_RISK_CATEGORY_LABELS:
        # Separate the labels so the user can tell the allowed categories
        # apart; an empty separator runs them together into one string.
        allowed = "、".join(EXPENSE_RISK_CATEGORY_LABELS.values())
        raise ValueError(f"费用领域仅支持:{allowed}")
    return normalized
def _resolve_fields(self, text: str, *, domain: str) -> list[RiskRuleField]:
    """Pick the ontology fields that the description most likely refers to.

    Args:
        text: Cleaned rule description.
        domain: Business domain code; selects the allowed key prefixes.

    Returns:
        Up to eight distinct fields ordered by descending score, or the first
        four fields of the domain when nothing scored above zero.
    """
    allowed_prefixes = DOMAIN_FIELD_PREFIXES.get(domain, ())
    candidates = [entry for entry in FIELD_ONTOLOGY if entry.key.startswith(allowed_prefixes)]
    lowered = text.lower()

    scored: list[tuple[int, RiskRuleField]] = [
        (points, entry)
        for entry in candidates
        if (points := self._score_field_match(entry, text, lowered)) > 0
    ]

    if domain == AgentAssetDomain.EXPENSE.value:
        # Topic boosts: (trigger words, boost score, keys that receive it).
        boosts = (
            (
                ("住宿", "酒店", "行程", "城市", "出差"),
                10,
                {"claim.location", "attachment.hotel_city", "attachment.route_cities"},
            ),
            (
                ("发票", "票据", "品名", "抬头", "开票"),
                6,
                {
                    "attachment.invoice_no",
                    "attachment.buyer_name",
                    "attachment.goods_name",
                    "attachment.ocr_text",
                },
            ),
        )
        for triggers, boost, boosted_keys in boosts:
            if not any(trigger in text for trigger in triggers):
                continue
            for entry in candidates:
                if entry.key in boosted_keys:
                    scored.append((boost, entry))

    # Stable sort: equal scores keep their insertion order.
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)

    # Keep the first (highest-ranked) occurrence of every key.
    unique: dict[str, RiskRuleField] = {}
    for _, entry in ranked:
        unique.setdefault(entry.key, entry)

    if unique:
        return list(unique.values())[:8]
    return candidates[:4]
@staticmethod
def _score_field_match(field: RiskRuleField, text: str, normalized: str) -> int:
score = 0
if field.label in text:
score += 8
for alias in field.aliases:
if alias.lower() in normalized:
score += 4 + min(len(alias), 6)
if field.key == "attachment.hotel_city" and any(term in text for term in ("酒店", "住宿")):
score += 12
if field.key == "attachment.route_cities" and any(
term in text for term in ("行程", "交通票", "路线", "途经")
):
score += 10
if field.key == "claim.location" and any(
term in text for term in ("申报目的地", "申报地点", "目的地", "出差地")
):
score += 10
if field.key.startswith("attachment.") and any(term in text for term in ("发票", "票据")):
score += 2
return score
def _align_draft_fields(
    self,
    draft: dict[str, Any],
    *,
    natural_language: str,
    fields: list[RiskRuleField],
) -> dict[str, Any]:
    """Re-order and complete the draft's field keys, then refresh derived text.

    Args:
        draft: Draft from the model or the fallback builder.
        natural_language: Cleaned rule description.
        fields: Candidate fields; keys outside this list are dropped.

    Returns:
        A copy of the draft with ``field_keys`` replaced and, when at least
        one field was selected, ``condition_summary`` plus the ``evidence``
        and ``decision`` flow steps rebuilt from the selected fields.
    """
    field_by_key = {field.key: field for field in fields}
    # Keys proposed by the draft, limited to known candidate fields.
    original_keys = [
        str(item or "").strip()
        for item in list(draft.get("field_keys") or [])
        if str(item or "").strip() in field_by_key
    ]
    preferred_keys: list[str] = []

    def add_preferred(key: str, *terms: str) -> None:
        # Prefer a candidate key when the description mentions one of the terms.
        if key in field_by_key and any(term in natural_language for term in terms):
            preferred_keys.append(key)

    add_preferred("attachment.hotel_city", "酒店", "住宿")
    add_preferred("claim.location", "申报目的地", "申报地点", "目的地", "出差地")
    add_preferred("attachment.route_cities", "行程", "交通票", "路线", "途经")
    merged_keys: list[str] = []
    # Priority: preferred keys, then the draft's keys, then all remaining
    # candidates; stop once four distinct keys are collected.
    for key in [*preferred_keys, *original_keys, *[field.key for field in fields]]:
        if key in field_by_key and key not in merged_keys:
            merged_keys.append(key)
        if len(merged_keys) >= 4:
            break
    # A comparison needs two operands; top up from the candidates if short.
    if draft.get("template_key") == "field_compare_v1" and len(merged_keys) < 2:
        for field in fields:
            if field.key not in merged_keys:
                merged_keys.append(field.key)
            if len(merged_keys) >= 2:
                break
    aligned = {**draft, "field_keys": merged_keys}
    selected_fields = [field_by_key[key] for key in merged_keys if key in field_by_key]
    if selected_fields:
        # The summary is rebuilt from the final field selection.
        aligned["condition_summary"] = self._build_condition_summary(
            natural_language,
            template_key=str(aligned.get("template_key") or "field_required_v1"),
            fields=selected_fields,
        )
        flow = aligned.get("flow") if isinstance(aligned.get("flow"), dict) else {}
        # Keep the other flow steps; overwrite only evidence and decision.
        aligned["flow"] = {
            **flow,
            "evidence": "读取" + "".join(field.label for field in selected_fields[:3]),
            "decision": aligned["condition_summary"],
        }
    return aligned
@staticmethod
def _build_compare_conditions(field_keys: list[str]) -> list[dict[str, str]]:
if len(field_keys) >= 2:
return [{"left": field_keys[0], "operator": "overlap", "right": field_keys[1]}]
if field_keys:
return [{"left": field_keys[0], "operator": "is_empty", "right": ""}]
return []
@staticmethod
def _infer_template_key(text: str) -> str:
if any(
keyword in text
for keyword in ("一致", "匹配", "相同", "不一致", "不符", "对应", "出现在")
):
return "field_compare_v1"
if any(
keyword in text
for keyword in ("关键词", "包含", "出现", "品名", "摘要", "服务费", "咨询费")
):
return "keyword_match_v1"
return "field_required_v1"
@staticmethod
def _infer_keywords(text: str) -> list[str]:
quoted = re.findall(r"[“\"']([^“”\"']{2,20})[”\"']", text)
keywords = [item.strip() for item in quoted if item.strip()]
for candidate in ("咨询费", "服务费", "其他", "办公用品", "招待", "红冲", "作废"):
if candidate in text and candidate not in keywords:
keywords.append(candidate)
return keywords[:8]
@staticmethod
def _infer_rule_name(text: str) -> str:
normalized = re.sub(r"\s+", "", str(text or ""))
normalized = re.sub(r"[,。;;:、,.!?]", "", normalized)
if not normalized:
return "自然语言风险规则"
return f"{normalized[:18]}风险规则"
@staticmethod
def _build_condition_summary(
natural_language: str,
*,
template_key: str,
fields: list[RiskRuleField],
) -> str:
field_text = "".join(item.label for item in fields[:3]) or "业务字段"
if template_key == "field_compare_v1":
return f"对比{field_text}之间是否一致或存在交集"
if template_key == "keyword_match_v1":
return f"检查{field_text}是否出现规则描述中的风险关键词"
return f"检查{field_text}是否满足必填和完整性要求"
@staticmethod
def _clean_text(value: Any) -> str:
return re.sub(r"\s+", " ", str(value or "")).strip()
@staticmethod
def _extract_json_object(text: str) -> str:
normalized = re.sub(r"^```(?:json)?|```$", "", str(text or "").strip(), flags=re.IGNORECASE)
start = normalized.find("{")
end = normalized.rfind("}")
if start < 0 or end <= start:
raise ValueError("JSON object not found.")
return normalized[start : end + 1]
@staticmethod
def _build_version_markdown(payload: dict[str, Any]) -> str:
metadata = payload.get("metadata") if isinstance(payload.get("metadata"), dict) else {}
fields = (
payload.get("inputs", {}).get("fields")
if isinstance(payload.get("inputs"), dict)
else []
)
field_labels = [
str(item.get("label") or item.get("key") or "").strip()
for item in fields
if isinstance(item, dict) and str(item.get("label") or item.get("key") or "").strip()
]
return "\n".join(
[
f"# {payload.get('name')}",
"",
"## 业务说明",
"",
str(payload.get("description") or ""),
"",
"## 自然语言原文",
"",
str(metadata.get("natural_language") or ""),
"",
"## 使用字段",
"",
"".join(field_labels) or "未识别字段",
"",
"## 运行时 JSON",
"",
"```json",
json.dumps(payload, ensure_ascii=False, indent=2),
"```",
]
)