"""政策指引文本分析。""" from __future__ import annotations import re import shutil import subprocess import tempfile from datetime import datetime from pathlib import Path class GuidanceAnalyzer: GRANULARITY_MAP = {"low": "low", "high": "high", "coarse": "low", "medium": "high", "fine": "high"} DOMAIN_KEYWORDS = { "过度负债": ["资产负债率", "负债", "债务", "有息负债", "偿债", "预警", "现金净流量"], "财务金融风险": ["穿透", "实际控制人", "最终受益人", "资金流向", "交易对手", "账户"], "虚假贸易": ["贸易", "合同", "交易", "空转", "融资性贸易"], } PENETRATION_KEYWORDS = { "主体穿透": ["实际控制人", "最终受益人", "责任主体", "关联方"], "资金穿透": ["资金流向", "账户", "资金闭环", "回款"], "交易穿透": ["交易对手", "合同", "贸易", "交易链条"], } def normalize_granularity(self, granularity: str | None) -> str: return self.GRANULARITY_MAP.get((granularity or "high").lower(), granularity or "high") def is_supported_granularity(self, granularity: str | None) -> bool: return (granularity or "high").lower() in self.GRANULARITY_MAP def extract_text(self, content: bytes, filename: str) -> dict: suffix = Path(filename).suffix.lower() if suffix == ".pdf": return {"method": "pdf", "status": "ok", "text": self._extract_pdf_text(content)} if suffix == ".docx": return {"method": "docx", "status": "ok", "text": self._extract_docx_text(content)} if suffix == ".doc": return {"method": "doc", "status": "ok", "text": self._extract_doc_text(content)} if suffix == ".md": return {"method": "markdown", "status": "ok", "text": content.decode("utf-8-sig", errors="ignore")} return {"method": "plain_text", "status": "ok", "text": content.decode("utf-8-sig", errors="ignore")} def _extract_pdf_text(self, content: bytes) -> str: from pypdf import PdfReader with tempfile.SpooledTemporaryFile() as stream: stream.write(content) stream.seek(0) reader = PdfReader(stream) return "\n".join(page.extract_text() or "" for page in reader.pages) def _extract_docx_text(self, content: bytes) -> str: from docx import Document with tempfile.SpooledTemporaryFile() as stream: stream.write(content) stream.seek(0) document = Document(stream) return "\n".join(paragraph.text for paragraph in document.paragraphs if paragraph.text.strip()) def _extract_doc_text(self, content: bytes) -> str: antiword = shutil.which("antiword") if not antiword: raise RuntimeError("解析 .doc 文件需要容器安装 antiword") with tempfile.NamedTemporaryFile(suffix=".doc") as file: file.write(content) file.flush() result = subprocess.run( [antiword, file.name], check=True, capture_output=True, text=True, encoding="utf-8", errors="ignore", ) return result.stdout def split_sentences(self, paragraphs: list[str]) -> list[dict]: sentences: list[dict] = [] for paragraph_index, paragraph in enumerate(paragraphs, start=1): for part in re.split(r"(?<=[。!?!?;;])\s*", paragraph.strip()): text = part.strip() if text: sentences.append({ "text": text, "paragraph_index": paragraph_index, "sentence_index": len(sentences) + 1, }) return sentences def extract_regulation_refs(self, text: str) -> list[str]: refs = re.findall(r"《[^》]+》", text) return list(dict.fromkeys(refs)) def select_candidates( self, domain: str, note: str, sentences: list[dict], granularity: str = "high", ) -> list[dict]: keywords = set(self.DOMAIN_KEYWORDS.get(domain, [])) keywords.update(token for token in re.split(r"[\s,,、;;]+", note or "") if token) if not keywords and domain: keywords.add(domain) candidates = [] for sentence in sentences: text = sentence["text"] matched = [keyword for keyword in keywords if keyword and keyword.lower() in text.lower()] penetration_hits = [ keyword for words in self.PENETRATION_KEYWORDS.values() for keyword in words if keyword in text ] if not matched and not penetration_hits: continue dimension = self._dimension_for(text) score = len(matched) * 2 + len(penetration_hits) candidates.append({ **sentence, "matched_keywords": matched or penetration_hits, "supervision_dimension": dimension, "score": score, }) candidates.sort(key=lambda item: (-item["score"], item["sentence_index"])) limit = 12 if self.normalize_granularity(granularity) == "low" else 30 return candidates[:limit] def analyze( self, domain_record: dict, content: bytes, filename: str, granularity: str = "high", ) -> dict: normalized = self.normalize_granularity(granularity) if not self.is_supported_granularity(granularity): raise ValueError(f"不支持的 granularity: {granularity}") extraction = self.extract_text(content, filename) text = extraction["text"] paragraphs = [line.strip() for line in re.split(r"\n+", text) if line.strip()] sentences = self.split_sentences(paragraphs) candidates = self.select_candidates(domain_record.get("domain", ""), domain_record.get("note", ""), sentences, normalized) if not candidates and sentences: candidates = [sentences[0] | {"matched_keywords": [], "supervision_dimension": "一般监管"}] refs = [] if normalized == "low" else self.extract_regulation_refs(text) patterns = [] for index, candidate in enumerate(candidates[: (12 if normalized == "low" else 30)], start=1): source = candidate["text"] basis = self._basis_text(source) dimension = candidate.get("supervision_dimension") or self._dimension_for(source) pattern_text = self._pattern_text(refs, basis, dimension) usage = "用于后续穿透识别实际控制关系、资金流向和责任主体认定。" if dimension != "一般监管" else "用于后续规则生成、选表或领域风险识别的参考句式。" patterns.append({ "id": f"pattern-{index:03d}", "source_index": index, "paragraph_index": candidate.get("paragraph_index", index), "sentence_index": candidate.get("sentence_index", index), "source_sentence": source, "supervision_dimension": dimension, "core_regulations": refs, "basis_text": basis, "description_pattern": pattern_text, "usage": usage, "relevance": "high" if candidate.get("score", 0) >= 3 else "medium", "keywords": candidate.get("matched_keywords", []), }) return { "status": "done", "domain": domain_record.get("domain", ""), "filename": filename, "extracted_at": datetime.now().isoformat(), "policy_focus": "穿透式监管", "core_regulations": refs, "granularity": normalized, "text_extraction": {"method": extraction["method"], "status": extraction["status"]}, "text_stats": { "paragraph_count": len(paragraphs), "sentence_count": len(sentences), "candidate_count": len(candidates), }, "description_patterns": patterns, } def _dimension_for(self, text: str) -> str: for dimension, keywords in self.PENETRATION_KEYWORDS.items(): if any(keyword in text for keyword in keywords): return dimension return "一般监管" def _basis_text(self, sentence: str) -> str: text = re.sub(r"^《[^》]+》[^。;;]*[。;;]?", "", sentence).strip() text = text or sentence.strip() text = text.replace("国有企业", "企业") if self._dimension_for(text) != "一般监管": text = f"监管要求{text}" return text if text.endswith(("。", "!", "?")) else f"{text}。" def _pattern_text(self, refs: list[str], basis: str, dimension: str) -> str: clause = f"条款要点:{basis.rstrip('。')},防范相关风险。" if dimension != "一般监管": clause = f"条款要点:穿透识别关键主体、资金流向和责任边界,{basis.rstrip('。')},防范责任悬空风险。" if refs: return f"核心法规:{'、'.join(refs)};{clause}" return clause