Files
YG-Rules/app/utils/guidance_analysis.py

214 lines
9.2 KiB
Python
Raw Permalink Normal View History

2026-06-10 19:15:24 +08:00
"""政策指引文本分析。"""
from __future__ import annotations

import io
import re
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path


class GuidanceAnalyzer:
    """Turn a policy guidance document into a list of description patterns.

    The pipeline implemented by :meth:`analyze` is: extract plain text from
    the uploaded file, split it into paragraphs and sentences, keep the
    sentences that mention domain or penetration keywords, and wrap each kept
    sentence into a structured "pattern" record.
    """

    # Accepted granularity names mapped to the canonical level used internally.
    GRANULARITY_MAP = {"low": "low", "high": "high", "coarse": "low", "medium": "high", "fine": "high"}

    # Keywords that make a sentence relevant to a given risk domain.
    DOMAIN_KEYWORDS = {
        "过度负债": ["资产负债率", "负债", "债务", "有息负债", "偿债", "预警", "现金净流量"],
        "财务金融风险": ["穿透", "实际控制人", "最终受益人", "资金流向", "交易对手", "账户"],
        "虚假贸易": ["贸易", "合同", "交易", "空转", "融资性贸易"],
    }

    # Keywords that assign a sentence to a penetration dimension.
    # Dict order matters: `_dimension_for` returns the first dimension that hits.
    PENETRATION_KEYWORDS = {
        "主体穿透": ["实际控制人", "最终受益人", "责任主体", "关联方"],
        "资金穿透": ["资金流向", "账户", "资金闭环", "回款"],
        "交易穿透": ["交易对手", "合同", "贸易", "交易链条"],
    }

    # Dimension label used when no penetration keyword is present.
    _GENERAL_DIMENSION = "一般监管"

    # Clause terminators: ideographic full stop, ASCII semicolon and fullwidth
    # semicolon. The fullwidth character is written as an escape so that it
    # survives tools that fold fullwidth punctuation to ASCII.
    _TERMINATORS = "。;\uff1b"

    # Split after sentence-ending punctuation (ASCII and fullwidth variants of
    # "!", "?" and ";"), swallowing any whitespace that follows.
    _SENTENCE_SPLIT_RE = re.compile(r"(?<=[。!?;\uff01\uff1f\uff1b])\s*")

    # Separators allowed between user-supplied note keywords.
    _NOTE_SPLIT_RE = re.compile(r"[\s,;、\uff0c\uff1b]+")

    # A leading 《...》 citation plus everything up to the first terminator.
    _LEADING_CITATION_RE = re.compile(r"^《[^》]+》[^。;\uff1b]*[。;\uff1b]?")

    # Any 《...》 regulation title.
    _REGULATION_REF_RE = re.compile(r"《[^》]+》")

    def normalize_granularity(self, granularity: str | None) -> str:
        """Return the canonical granularity for *granularity*.

        ``None`` or an empty string is treated as ``"high"``. Unknown values
        are returned unchanged so the caller can report them.
        """
        requested = granularity or "high"
        return self.GRANULARITY_MAP.get(requested.lower(), requested)

    def is_supported_granularity(self, granularity: str | None) -> bool:
        """Return True when *granularity* (case-insensitive) is a known name."""
        return (granularity or "high").lower() in self.GRANULARITY_MAP

    def extract_text(self, content: bytes, filename: str) -> dict:
        """Extract plain text from *content*, choosing a parser by file suffix.

        Returns a dict with keys ``method``, ``status`` (always ``"ok"``) and
        ``text``. Any suffix other than .pdf/.docx/.doc is decoded as UTF-8
        (a BOM is dropped and undecodable bytes are ignored).
        """
        suffix = Path(filename).suffix.lower()
        parsers = {
            ".pdf": ("pdf", self._extract_pdf_text),
            ".docx": ("docx", self._extract_docx_text),
            ".doc": ("doc", self._extract_doc_text),
        }
        if suffix in parsers:
            method, parser = parsers[suffix]
            text = parser(content)
        else:
            method = "markdown" if suffix == ".md" else "plain_text"
            text = content.decode("utf-8-sig", errors="ignore")
        return {"method": method, "status": "ok", "text": text}

    def _extract_pdf_text(self, content: bytes) -> str:
        """Return the text of every PDF page joined by newlines (needs pypdf)."""
        from pypdf import PdfReader

        # BytesIO is a complete binary file object on every Python version;
        # SpooledTemporaryFile only gained the full io.BufferedIOBase
        # interface in Python 3.11.
        reader = PdfReader(io.BytesIO(content))
        return "\n".join(page.extract_text() or "" for page in reader.pages)

    def _extract_docx_text(self, content: bytes) -> str:
        """Return the non-blank paragraphs of a .docx joined by newlines (needs python-docx)."""
        from docx import Document

        # See _extract_pdf_text for why BytesIO is used here.
        document = Document(io.BytesIO(content))
        return "\n".join(paragraph.text for paragraph in document.paragraphs if paragraph.text.strip())

    def _extract_doc_text(self, content: bytes) -> str:
        """Convert a legacy .doc to text by running the external ``antiword`` tool.

        Raises:
            RuntimeError: if ``antiword`` is not found on PATH.
            subprocess.CalledProcessError: if ``antiword`` exits non-zero.
        """
        antiword = shutil.which("antiword")
        if not antiword:
            raise RuntimeError("解析 .doc 文件需要容器安装 antiword")
        with tempfile.NamedTemporaryFile(suffix=".doc") as file:
            file.write(content)
            # Flush so the child process sees the full file on disk.
            file.flush()
            result = subprocess.run(
                [antiword, file.name],
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
            )
        return result.stdout

    def split_sentences(self, paragraphs: list[str]) -> list[dict]:
        """Split *paragraphs* into sentence records.

        Each record has ``text``, ``paragraph_index`` (1-based position of the
        paragraph) and ``sentence_index`` (1-based, counted across the whole
        document). Empty fragments are dropped.
        """
        sentences: list[dict] = []
        for paragraph_index, paragraph in enumerate(paragraphs, start=1):
            for part in self._SENTENCE_SPLIT_RE.split(paragraph.strip()):
                text = part.strip()
                if text:
                    sentences.append({
                        "text": text,
                        "paragraph_index": paragraph_index,
                        "sentence_index": len(sentences) + 1,
                    })
        return sentences

    def extract_regulation_refs(self, text: str) -> list[str]:
        """Return the distinct 《...》 titles in *text*, in first-seen order."""
        return list(dict.fromkeys(self._REGULATION_REF_RE.findall(text)))

    def select_candidates(
        self,
        domain: str,
        note: str,
        sentences: list[dict],
        granularity: str = "high",
    ) -> list[dict]:
        """Pick and rank the sentences relevant to *domain* and *note*.

        A sentence is kept when it contains a domain/note keyword
        (case-insensitive) or a penetration keyword. Its score is two points
        per matched keyword plus one per penetration keyword. The result is
        sorted by descending score, then document order, and truncated to 12
        entries for low granularity or 30 otherwise.
        """
        # An insertion-ordered dict de-duplicates like a set but keeps
        # `matched_keywords` in a stable order from run to run.
        keywords = dict.fromkeys(self.DOMAIN_KEYWORDS.get(domain, []))
        keywords.update(dict.fromkeys(token for token in self._NOTE_SPLIT_RE.split(note or "") if token))
        if not keywords and domain:
            keywords[domain] = None

        lowered_keywords = [(keyword, keyword.lower()) for keyword in keywords if keyword]
        penetration_words = [word for words in self.PENETRATION_KEYWORDS.values() for word in words]

        candidates = []
        for sentence in sentences:
            text = sentence["text"]
            lowered_text = text.lower()
            matched = [keyword for keyword, lowered in lowered_keywords if lowered in lowered_text]
            penetration_hits = [word for word in penetration_words if word in text]
            if not matched and not penetration_hits:
                continue
            candidates.append({
                **sentence,
                "matched_keywords": matched or penetration_hits,
                "supervision_dimension": self._dimension_for(text),
                "score": len(matched) * 2 + len(penetration_hits),
            })
        candidates.sort(key=lambda item: (-item["score"], item["sentence_index"]))
        limit = 12 if self.normalize_granularity(granularity) == "low" else 30
        return candidates[:limit]

    def analyze(
        self,
        domain_record: dict,
        content: bytes,
        filename: str,
        granularity: str = "high",
    ) -> dict:
        """Run the full analysis and return the result document.

        Args:
            domain_record: dict read for the optional keys ``domain`` and ``note``.
            content: raw bytes of the uploaded guidance file.
            filename: original file name; its suffix selects the text extractor.
            granularity: one of the names in ``GRANULARITY_MAP``.

        Returns:
            A dict with the extraction metadata, text statistics and the
            generated ``description_patterns``.

        Raises:
            ValueError: if *granularity* is not a supported name.
        """
        if not self.is_supported_granularity(granularity):
            raise ValueError(f"不支持的 granularity: {granularity}")
        normalized = self.normalize_granularity(granularity)

        extraction = self.extract_text(content, filename)
        text = extraction["text"]
        paragraphs = [line.strip() for line in re.split(r"\n+", text) if line.strip()]
        sentences = self.split_sentences(paragraphs)

        candidates = self.select_candidates(
            domain_record.get("domain", ""),
            domain_record.get("note", ""),
            sentences,
            normalized,
        )
        if not candidates and sentences:
            # Nothing matched: fall back to the first sentence so the result
            # still carries one pattern.
            candidates = [{**sentences[0], "matched_keywords": [], "supervision_dimension": self._GENERAL_DIMENSION}]

        # Regulation titles are only collected at high granularity.
        refs = [] if normalized == "low" else self.extract_regulation_refs(text)

        patterns = []
        # select_candidates has already applied the granularity limit.
        for index, candidate in enumerate(candidates, start=1):
            source = candidate["text"]
            basis = self._basis_text(source)
            dimension = candidate.get("supervision_dimension") or self._dimension_for(source)
            if dimension != self._GENERAL_DIMENSION:
                usage = "用于后续穿透识别实际控制关系、资金流向和责任主体认定。"
            else:
                usage = "用于后续规则生成、选表或领域风险识别的参考句式。"
            patterns.append({
                "id": f"pattern-{index:03d}",
                "source_index": index,
                "paragraph_index": candidate.get("paragraph_index", index),
                "sentence_index": candidate.get("sentence_index", index),
                "source_sentence": source,
                "supervision_dimension": dimension,
                # Separate copy per pattern so editing one record cannot
                # silently change the others.
                "core_regulations": list(refs),
                "basis_text": basis,
                "description_pattern": self._pattern_text(refs, basis, dimension),
                "usage": usage,
                "relevance": "high" if candidate.get("score", 0) >= 3 else "medium",
                "keywords": candidate.get("matched_keywords", []),
            })

        return {
            "status": "done",
            "domain": domain_record.get("domain", ""),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "policy_focus": "穿透式监管",
            "core_regulations": refs,
            "granularity": normalized,
            "text_extraction": {"method": extraction["method"], "status": extraction["status"]},
            "text_stats": {
                "paragraph_count": len(paragraphs),
                "sentence_count": len(sentences),
                "candidate_count": len(candidates),
            },
            "description_patterns": patterns,
        }

    def _dimension_for(self, text: str) -> str:
        """Return the first penetration dimension whose keyword occurs in *text*."""
        for dimension, keywords in self.PENETRATION_KEYWORDS.items():
            if any(keyword in text for keyword in keywords):
                return dimension
        return self._GENERAL_DIMENSION

    def _basis_text(self, sentence: str) -> str:
        """Reduce *sentence* to the clause used as the basis of a pattern.

        A leading 《...》 citation (through the first terminator) is removed;
        if that leaves nothing, the whole sentence is used. "国有企业" is
        generalized to "企业", penetration-related clauses get the prefix
        "监管要求", and the result always ends with a clause terminator.
        """
        text = self._LEADING_CITATION_RE.sub("", sentence).strip()
        text = text or sentence.strip()
        text = text.replace("国有企业", "企业")
        if self._dimension_for(text) != self._GENERAL_DIMENSION:
            text = f"监管要求{text}"
        # Previously this compared against empty strings, which is always
        # true, so an unterminated clause was never closed.
        if text.endswith(tuple(self._TERMINATORS)):
            return text
        return f"{text}。"

    def _pattern_text(self, refs: list[str], basis: str, dimension: str) -> str:
        """Compose the description pattern from *refs*, *basis* and *dimension*."""
        # Drop the trailing terminator so the clause continues with a comma
        # instead of producing "。," in the middle of the sentence.
        core = basis.rstrip(self._TERMINATORS)
        if dimension != self._GENERAL_DIMENSION:
            clause = f"条款要点:穿透识别关键主体、资金流向和责任边界,{core},防范责任悬空风险。"
        else:
            clause = f"条款要点:{core},防范相关风险。"
        if refs:
            return f"核心法规:{''.join(refs)}{clause}"
        return clause