2026-06-10 19:15:24 +08:00
|
|
|
|
"""政策指引文本分析。"""
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
|
import shutil
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
import tempfile
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GuidanceAnalyzer:
    """Turn a policy-guidance document into keyword-ranked description patterns.

    The pipeline implemented by :meth:`analyze` is: extract plain text from
    the uploaded bytes, split it into paragraphs and sentences, keep the
    sentences that mention domain or penetration keywords, and wrap each kept
    sentence in a structured "description pattern" record.
    """

    # Accepted granularity spellings mapped onto the two canonical levels.
    GRANULARITY_MAP = {"low": "low", "high": "high", "coarse": "low", "medium": "high", "fine": "high"}

    # Keywords that make a sentence relevant to a given risk domain.
    DOMAIN_KEYWORDS = {
        "过度负债": ["资产负债率", "负债", "债务", "有息负债", "偿债", "预警", "现金净流量"],
        "财务金融风险": ["穿透", "实际控制人", "最终受益人", "资金流向", "交易对手", "账户"],
        "虚假贸易": ["贸易", "合同", "交易", "空转", "融资性贸易"],
    }

    # Keywords that assign a sentence to a penetration-supervision dimension.
    # Declaration order matters: the first dimension with a hit wins.
    PENETRATION_KEYWORDS = {
        "主体穿透": ["实际控制人", "最终受益人", "责任主体", "关联方"],
        "资金穿透": ["资金流向", "账户", "资金闭环", "回款"],
        "交易穿透": ["交易对手", "合同", "贸易", "交易链条"],
    }

    # Dimension label used when no penetration keyword is present.
    _GENERAL_DIMENSION = "一般监管"

    # Maximum number of candidates kept per canonical granularity.
    _LOW_LIMIT = 12
    _HIGH_LIMIT = 30

    # Full-width punctuation is written as \uXXXX escapes so the patterns
    # survive any tooling that folds full-width characters to ASCII:
    #   \uff01 = full-width '!', \uff1f = full-width '?',
    #   \uff1b = full-width ';', \uff0c = full-width ','.
    _SENTENCE_SPLIT_RE = re.compile(r"(?<=[。!?;\uff01\uff1f\uff1b])\s*")
    _NOTE_SPLIT_RE = re.compile(r"[\s,、;\uff0c\uff1b]+")
    _PARAGRAPH_SPLIT_RE = re.compile(r"\n+")
    _REGULATION_REF_RE = re.compile(r"《[^》]+》")
    _LEADING_REF_CLAUSE_RE = re.compile(r"^《[^》]+》[^。;\uff1b]*[。;\uff1b]?")
    _TERMINAL_PUNCTUATION = ("。", "!", "?", "\uff01", "\uff1f")

    def normalize_granularity(self, granularity: str | None) -> str:
        """Return the canonical granularity (``"low"`` or ``"high"``).

        ``None`` or an empty value is treated as ``"high"``. A value that is
        not in ``GRANULARITY_MAP`` is returned unchanged so the caller can
        report it; use :meth:`is_supported_granularity` to validate.
        """
        requested = granularity or "high"
        return self.GRANULARITY_MAP.get(requested.lower(), requested)

    def is_supported_granularity(self, granularity: str | None) -> bool:
        """Return whether ``granularity`` (case-insensitive) is a known spelling."""
        return (granularity or "high").lower() in self.GRANULARITY_MAP

    def extract_text(self, content: bytes, filename: str) -> dict:
        """Extract plain text from ``content`` based on the suffix of ``filename``.

        Returns a dict with keys ``method``, ``status`` and ``text``. ``.pdf``,
        ``.docx`` and ``.doc`` are parsed by dedicated helpers; everything else
        is decoded as UTF-8 (a BOM is stripped, undecodable bytes are dropped).
        Parser errors from the helpers propagate to the caller.
        """
        suffix = Path(filename).suffix.lower()
        if suffix == ".pdf":
            return {"method": "pdf", "status": "ok", "text": self._extract_pdf_text(content)}
        if suffix == ".docx":
            return {"method": "docx", "status": "ok", "text": self._extract_docx_text(content)}
        if suffix == ".doc":
            return {"method": "doc", "status": "ok", "text": self._extract_doc_text(content)}
        decoded = content.decode("utf-8-sig", errors="ignore")
        method = "markdown" if suffix == ".md" else "plain_text"
        return {"method": method, "status": "ok", "text": decoded}

    def _extract_pdf_text(self, content: bytes) -> str:
        """Return the text of every PDF page joined by newlines."""
        from io import BytesIO

        from pypdf import PdfReader

        # BytesIO is a complete seekable binary stream on every Python
        # version; SpooledTemporaryFile only became one in Python 3.11.
        reader = PdfReader(BytesIO(content))
        return "\n".join(page.extract_text() or "" for page in reader.pages)

    def _extract_docx_text(self, content: bytes) -> str:
        """Return the non-blank paragraphs of a .docx document joined by newlines."""
        from io import BytesIO

        from docx import Document

        # .docx is a zip archive; zipfile needs seekable(), which
        # SpooledTemporaryFile lacks before Python 3.11.
        document = Document(BytesIO(content))
        return "\n".join(paragraph.text for paragraph in document.paragraphs if paragraph.text.strip())

    def _extract_doc_text(self, content: bytes) -> str:
        """Convert a legacy .doc file to text with the external ``antiword`` tool.

        Raises:
            RuntimeError: ``antiword`` is not on ``PATH``.
            subprocess.CalledProcessError: ``antiword`` exited with an error.
        """
        antiword = shutil.which("antiword")
        if not antiword:
            raise RuntimeError("解析 .doc 文件需要容器安装 antiword")
        # antiword reads from a path, so the bytes must be flushed to a real
        # file that stays open (and therefore exists) while the tool runs.
        # NOTE(review): output is decoded as UTF-8; confirm antiword's
        # character mapping in the deployment locale actually emits UTF-8.
        with tempfile.NamedTemporaryFile(suffix=".doc") as file:
            file.write(content)
            file.flush()
            result = subprocess.run(
                [antiword, file.name],
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
            )
        return result.stdout

    def split_sentences(self, paragraphs: list[str]) -> list[dict]:
        """Split paragraphs into sentence records.

        A sentence ends after ``。``, ``!``, ``?`` or ``;`` in either ASCII or
        full-width form. Each record holds ``text``, the 1-based
        ``paragraph_index`` and a 1-based ``sentence_index`` that runs across
        the whole document.
        """
        sentences: list[dict] = []
        for paragraph_index, paragraph in enumerate(paragraphs, start=1):
            for part in self._SENTENCE_SPLIT_RE.split(paragraph.strip()):
                text = part.strip()
                if not text:
                    continue
                sentences.append({
                    "text": text,
                    "paragraph_index": paragraph_index,
                    "sentence_index": len(sentences) + 1,
                })
        return sentences

    def extract_regulation_refs(self, text: str) -> list[str]:
        """Return the distinct ``《…》`` titles in ``text`` in first-seen order."""
        return list(dict.fromkeys(self._REGULATION_REF_RE.findall(text)))

    def select_candidates(
        self,
        domain: str,
        note: str,
        sentences: list[dict],
        granularity: str = "high",
    ) -> list[dict]:
        """Rank the sentences that mention domain, note or penetration keywords.

        Keywords are the domain's ``DOMAIN_KEYWORDS`` plus the tokens of
        ``note`` (split on whitespace and ASCII/full-width separators); when
        both are empty the domain name itself is the only keyword. A keyword
        hit scores 2 and a penetration-keyword hit scores 1. The result is
        sorted by descending score, then document order, and truncated to 12
        entries for low granularity or 30 otherwise.
        """
        # Ordered de-duplication keeps ``matched_keywords`` stable between
        # runs; a set would iterate in hash-randomised order.
        note_tokens = [token for token in self._NOTE_SPLIT_RE.split(note or "") if token]
        keywords = list(dict.fromkeys([*self.DOMAIN_KEYWORDS.get(domain, []), *note_tokens]))
        if not keywords and domain:
            keywords.append(domain)

        lowered_keywords = [(keyword, keyword.lower()) for keyword in keywords if keyword]
        penetration_words = [
            keyword
            for words in self.PENETRATION_KEYWORDS.values()
            for keyword in words
        ]

        candidates = []
        for sentence in sentences:
            text = sentence["text"]
            lowered_text = text.lower()
            matched = [keyword for keyword, lowered in lowered_keywords if lowered in lowered_text]
            penetration_hits = [keyword for keyword in penetration_words if keyword in text]
            if not matched and not penetration_hits:
                continue
            candidates.append({
                **sentence,
                "matched_keywords": matched or penetration_hits,
                "supervision_dimension": self._dimension_for(text),
                "score": len(matched) * 2 + len(penetration_hits),
            })

        candidates.sort(key=lambda item: (-item["score"], item["sentence_index"]))
        return candidates[: self._limit_for(self.normalize_granularity(granularity))]

    def analyze(
        self,
        domain_record: dict,
        content: bytes,
        filename: str,
        granularity: str = "high",
    ) -> dict:
        """Run the full analysis and return the result document.

        Args:
            domain_record: Mapping read for its optional ``domain`` and
                ``note`` entries.
            content: Raw bytes of the uploaded document.
            filename: Original file name; its suffix selects the extractor.
            granularity: Any spelling accepted by ``GRANULARITY_MAP``.

        Returns:
            A dict with status, extraction metadata, text statistics and the
            list of ``description_patterns``.

        Raises:
            ValueError: ``granularity`` is not a supported spelling.
        """
        # Validate first so an invalid request fails before any parsing work.
        if not self.is_supported_granularity(granularity):
            raise ValueError(f"不支持的 granularity: {granularity}")
        normalized = self.normalize_granularity(granularity)

        extraction = self.extract_text(content, filename)
        text = extraction["text"]
        paragraphs = [line.strip() for line in self._PARAGRAPH_SPLIT_RE.split(text) if line.strip()]
        sentences = self.split_sentences(paragraphs)
        candidates = self.select_candidates(
            domain_record.get("domain", ""),
            domain_record.get("note", ""),
            sentences,
            normalized,
        )
        if not candidates and sentences:
            # Nothing matched: fall back to the opening sentence so the result
            # always carries at least one pattern for a non-empty document.
            candidates = [{**sentences[0], "matched_keywords": [], "supervision_dimension": self._GENERAL_DIMENSION}]

        # Regulation titles are only reported at high granularity.
        refs = [] if normalized == "low" else self.extract_regulation_refs(text)
        selected = candidates[: self._limit_for(normalized)]
        patterns = [
            self._build_pattern(index, candidate, refs)
            for index, candidate in enumerate(selected, start=1)
        ]

        return {
            "status": "done",
            "domain": domain_record.get("domain", ""),
            "filename": filename,
            "extracted_at": datetime.now().isoformat(),
            "policy_focus": "穿透式监管",
            "core_regulations": refs,
            "granularity": normalized,
            "text_extraction": {"method": extraction["method"], "status": extraction["status"]},
            "text_stats": {
                "paragraph_count": len(paragraphs),
                "sentence_count": len(sentences),
                "candidate_count": len(candidates),
            },
            "description_patterns": patterns,
        }

    def _limit_for(self, normalized: str) -> int:
        """Return the candidate cap for an already-normalized granularity."""
        return self._LOW_LIMIT if normalized == "low" else self._HIGH_LIMIT

    def _build_pattern(self, index: int, candidate: dict, refs: list[str]) -> dict:
        """Build one description-pattern record from a candidate sentence."""
        source = candidate["text"]
        basis = self._basis_text(source)
        dimension = candidate.get("supervision_dimension") or self._dimension_for(source)
        if dimension != self._GENERAL_DIMENSION:
            usage = "用于后续穿透识别实际控制关系、资金流向和责任主体认定。"
        else:
            usage = "用于后续规则生成、选表或领域风险识别的参考句式。"
        return {
            "id": f"pattern-{index:03d}",
            "source_index": index,
            "paragraph_index": candidate.get("paragraph_index", index),
            "sentence_index": candidate.get("sentence_index", index),
            "source_sentence": source,
            "supervision_dimension": dimension,
            "core_regulations": refs,
            "basis_text": basis,
            "description_pattern": self._pattern_text(refs, basis, dimension),
            "usage": usage,
            # The fallback candidate has no score and is therefore "medium".
            "relevance": "high" if candidate.get("score", 0) >= 3 else "medium",
            "keywords": candidate.get("matched_keywords", []),
        }

    def _dimension_for(self, text: str) -> str:
        """Return the first penetration dimension whose keywords occur in ``text``."""
        for dimension, keywords in self.PENETRATION_KEYWORDS.items():
            if any(keyword in text for keyword in keywords):
                return dimension
        return self._GENERAL_DIMENSION

    def _basis_text(self, sentence: str) -> str:
        """Derive the normalized basis statement for a source sentence.

        A leading ``《…》`` clause (up to the first ``。`` or semicolon) is
        dropped unless that would leave nothing, ``国有企业`` is generalized to
        ``企业``, penetration-related text is prefixed with ``监管要求``, and a
        closing ``。`` is appended unless the text already ends with a
        sentence terminator (ASCII or full-width).
        """
        text = self._LEADING_REF_CLAUSE_RE.sub("", sentence).strip()
        text = text or sentence.strip()
        text = text.replace("国有企业", "企业")
        if self._dimension_for(text) != self._GENERAL_DIMENSION:
            text = f"监管要求{text}"
        return text if text.endswith(self._TERMINAL_PUNCTUATION) else f"{text}。"

    def _pattern_text(self, refs: list[str], basis: str, dimension: str) -> str:
        """Compose the description-pattern sentence for one basis statement."""
        point = basis.rstrip("。")
        if dimension != self._GENERAL_DIMENSION:
            clause = f"条款要点:穿透识别关键主体、资金流向和责任边界,{point},防范责任悬空风险。"
        else:
            clause = f"条款要点:{point},防范相关风险。"
        if refs:
            return f"核心法规:{'、'.join(refs)};{clause}"
        return clause
|