from __future__ import annotations

import re
from typing import Any

from app.services.user_agent_knowledge_constants import (
    KNOWLEDGE_ARTICLE_PATTERN,
    KNOWLEDGE_LIST_ITEM_PATTERN,
    KNOWLEDGE_NUMBERED_ITEM_PATTERN,
    KNOWLEDGE_QUERY_STOPWORDS,
    KNOWLEDGE_SECTION_HEADING_PATTERN,
    MAX_KNOWLEDGE_MODEL_HITS,
    MAX_KNOWLEDGE_QUERY_TERMS,
)


class UserAgentKnowledgeHelpersMixin:
    """Helpers for ranking, scoring and cleaning knowledge-base search results.

    Most helpers are static and stateless; the instance methods only call
    other helpers defined on this mixin.
    """

    @staticmethod
    def _select_knowledge_model_hits(
        tool_payload: dict[str, Any],
        *,
        question: str | None = None,
    ) -> list[dict[str, Any]]:
        """Pick at most ``MAX_KNOWLEDGE_MODEL_HITS`` hits from a tool payload.

        Args:
            tool_payload: Mapping whose ``"hits"`` entry is read; non-dict
                entries are ignored.
            question: Optional user question used to re-rank the hits.

        Returns:
            The selected hit dicts. When no query terms can be extracted from
            ``question`` the original order is kept; otherwise hits are
            ordered by descending score, ties going to the earlier hit.
        """
        # Only a bounded prefix of the dict hits takes part in re-ranking.
        candidate_limit = max(MAX_KNOWLEDGE_MODEL_HITS + 1, 6)
        raw_hits = [
            item
            for item in list(tool_payload.get("hits") or [])
            if isinstance(item, dict)
        ][:candidate_limit]
        if not raw_hits:
            return []

        query_terms = UserAgentKnowledgeHelpersMixin._extract_knowledge_query_terms(
            question or ""
        )
        if not query_terms:
            return raw_hits[:MAX_KNOWLEDGE_MODEL_HITS]

        def sort_key(value: tuple[int, dict[str, Any]]) -> tuple[int, int]:
            rank_index, hit = value
            score = UserAgentKnowledgeHelpersMixin._score_knowledge_model_hit(
                hit,
                query_terms=query_terms,
                rank_index=rank_index,
            )
            # Negated index: with reverse=True, equal scores keep the
            # original (earlier-first) order.
            return score, -rank_index

        ranked_hits = sorted(enumerate(raw_hits), key=sort_key, reverse=True)
        return [item for _, item in ranked_hits[:MAX_KNOWLEDGE_MODEL_HITS]]

    @staticmethod
    def _score_knowledge_model_hit(
        item: dict[str, Any],
        *,
        query_terms: list[str],
        rank_index: int,
    ) -> int:
        """Compute a heuristic relevance score for a single hit.

        Args:
            item: Hit dict; ``title`` (or ``document_name``), ``excerpt`` and
                ``content`` are read and lower-cased.
            query_terms: Terms searched for by substring match.
            rank_index: Position of the hit in the incoming list; earlier
                positions get a larger base score.

        Returns:
            The integer score (higher is better; may be negative).
        """
        title = str(item.get("title") or item.get("document_name") or "").lower()
        excerpt = str(item.get("excerpt") or "").lower()
        content = str(item.get("content") or "").lower()
        # Term matching only looks at the first 1400 characters of content;
        # the structural checks below inspect the full content.
        haystack = "\n".join([title, excerpt, content[:1400]])
        matched_terms = [term for term in query_terms if term in haystack]

        score = max(1, 48 - rank_index * 4)
        score += len(matched_terms) * 10
        score += sum(1 for term in matched_terms if term in title) * 8

        # Adjust by the appendix heading found near the start of the content
        # (literally: "chapter navigation", "Q&A clue supplement",
        # "key section excerpts", "structured table supplement").
        leading_marker = UserAgentKnowledgeHelpersMixin._leading_knowledge_appendix_marker(
            content
        )
        if leading_marker == "# 章节导航":
            score -= 22
        elif leading_marker == "# 问答线索补充":
            score += 6 if matched_terms else -8
        elif leading_marker == "# 重点章节摘录":
            score += 4 if matched_terms else -4
        elif leading_marker == "# 结构化表格补充":
            score += 8 if matched_terms else -3

        # Structural bonuses apply only when at least one term matched.
        if matched_terms and "|" in content:
            score += 8
        # NOTE(review): both tuple entries are the same ASCII colon in this
        # source; presumably one was meant to be the full-width colon
        # (U+FF1A) - confirm before changing.
        if matched_terms and any(marker in content for marker in (":", ":")):
            score += 10
        if matched_terms and "\n" in content:
            score += 4
        if matched_terms and any(marker in content for marker in ("附表", "第", "条")):
            score += 4
        if matched_terms and any(marker in content for marker in ("第", "条", ":", "-", "•")):
            score += 4
        # Penalise content containing a "no ... information/rule/explanation/
        # basis" style phrase.
        if re.search(r"没有.{0,8}(信息|规定|说明|依据)", content):
            score -= 12
        return score

    @staticmethod
    def _leading_knowledge_appendix_marker(content: str) -> str:
        """Return a known appendix heading found near the start of ``content``.

        The markers are tested in a fixed order and the first one whose first
        occurrence lies within the first 221 characters (index 0-220) of the
        left-stripped content is returned; ``""`` when none qualifies.
        """
        normalized = str(content or "").lstrip()
        for marker in ("# 章节导航", "# 重点章节摘录", "# 问答线索补充", "# 结构化表格补充"):
            index = normalized.find(marker)
            if 0 <= index <= 220:
                return marker
        return ""

    def _prioritize_knowledge_evidence_items(
        self,
        question: str,
        evidence_items: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Move the most "explicit" evidence item to the front of the list.

        Only applies when ``_question_requires_explicit_condition(question)``
        is true. The first item of kind ``table``, then ``kv``, ``clause``,
        ``list`` (in that preference order) is promoted; failing that, the
        first item whose content contains a digit. The relative order of the
        remaining items is preserved.

        Returns:
            A new list with one item promoted, or ``evidence_items`` itself
            when nothing needs to change.
        """
        if not evidence_items or not self._question_requires_explicit_condition(question):
            return evidence_items

        for preferred_kind in ("table", "kv", "clause", "list"):
            for index, item in enumerate(evidence_items):
                if str(item.get("kind") or "") != preferred_kind:
                    continue
                return [item, *evidence_items[:index], *evidence_items[index + 1 :]]

        for index, item in enumerate(evidence_items):
            if re.search(r"\d", str(item.get("content") or "")):
                return [item, *evidence_items[:index], *evidence_items[index + 1 :]]
        return evidence_items

    @staticmethod
    def _is_knowledge_lead_in_segment(item: dict[str, str]) -> bool:
        """Return True for a ``kv``/``list``/``clause`` item ending in a colon."""
        kind = str(item.get("kind") or "").strip()
        content = str(item.get("content") or "").strip()
        # NOTE(review): the two suffixes are identical ASCII colons here;
        # presumably one was meant to be the full-width colon - confirm.
        return kind in {"kv", "list", "clause"} and content.endswith((":", ":"))

    @staticmethod
    def _extract_knowledge_marker_family(content: str) -> str:
        """Classify the list/article marker that ``content`` starts with.

        Returns:
            One of ``"article"``, ``"arabic"``, ``"paren"``, ``"circled"``,
            ``"bullet"``, or ``""`` when no marker is recognised. Checks run
            in that order, so the first matching family wins.
        """
        normalized = str(content or "").strip()
        if not normalized:
            return ""
        if KNOWLEDGE_ARTICLE_PATTERN.match(normalized):
            return "article"
        if re.match(r"^\d+[.)、]\s*", normalized):
            return "arabic"
        # NOTE(review): the bracket classes repeat the ASCII parenthesis;
        # presumably full-width parentheses were intended as well - confirm.
        if re.match(r"^[((][一二三四五六七八九十百零0-9]+[))]\s*", normalized):
            return "paren"
        if re.match(r"^[①②③④⑤⑥⑦⑧⑨⑩]\s*", normalized):
            return "circled"
        if KNOWLEDGE_LIST_ITEM_PATTERN.match(normalized):
            return "bullet"
        return ""

    @staticmethod
    def _format_knowledge_heading_label(heading: str) -> str:
        """Turn an ``"a > b > c"`` heading path into ``"a / b / c"``.

        Empty path segments are dropped and each segment is stripped.
        """
        parts = [item.strip() for item in str(heading or "").split(">") if item.strip()]
        return " / ".join(parts)

    def _score_knowledge_evidence_candidate(
        self,
        item: dict[str, str],
        query_terms: list[str],
    ) -> int:
        """Compute a heuristic score for one evidence segment.

        Args:
            item: Segment dict; ``heading``, ``content`` and ``kind`` are read.
            query_terms: Terms searched for by substring match in the
                lower-cased heading and content.

        Returns:
            The integer score (higher is better; may be negative).
        """
        heading = str(item.get("heading") or "").lower()
        content = str(item.get("content") or "").lower()
        kind = str(item.get("kind") or "").strip()
        haystack = "\n".join([heading, content])
        matched_terms = [term for term in query_terms if term in haystack]

        score = len(matched_terms) * 10
        score += sum(1 for term in matched_terms if term in heading) * 6

        # Structured kinds are preferred over plain paragraphs.
        if kind == "table":
            score += 10
        elif kind in {"kv", "clause", "list"}:
            score += 8
        elif kind == "paragraph":
            score += 4

        if "问答线索补充" in heading or "重点章节摘录" in heading:
            score += 8
        if "结构化表格补充" in heading:
            score += 10
        # Navigation / table-of-contents headings are penalised.
        if "章节导航" in heading or "目录" in heading:
            score -= 16
        # Long runs of dots or ellipses (six or more) are penalised.
        if re.search(r"[.。…]{6,}", content):
            score -= 12
        if any(
            hint in content
            for hint in ("应", "需", "不得", "可以", "标准", "条件", "材料", "审批", "流程", "包括")
        ):
            score += 3

        # Length penalty: 1 point per 40 characters beyond 220, capped at 8.
        content_length = len(content)
        if content_length > 220:
            score -= min(8, (content_length - 220) // 40)
        return score

    @staticmethod
    def _extract_knowledge_query_terms(question: str) -> list[str]:
        """Extract de-duplicated, lower-cased search terms from a question.

        ASCII tokens (letters, digits, ``_``, ``-``; at least two characters)
        are collected first. Each run of 2-20 CJK characters is then added
        whole when it has at most four characters, otherwise as sliding
        4-, 3- and 2-character windows. Stopwords from
        ``KNOWLEDGE_QUERY_STOPWORDS`` are skipped.

        Returns:
            At most ``MAX_KNOWLEDGE_QUERY_TERMS`` terms, in discovery order.
        """
        normalized_question = str(question or "").strip().lower()
        if not normalized_question:
            return []

        terms: list[str] = []
        seen: set[str] = set()

        def remember(term: str) -> None:
            normalized = str(term or "").strip().lower()
            if (
                not normalized
                or normalized in seen
                or normalized in KNOWLEDGE_QUERY_STOPWORDS
            ):
                return
            seen.add(normalized)
            terms.append(normalized)

        for item in re.findall(r"[a-z0-9][a-z0-9_\-]{1,}", normalized_question):
            remember(item)

        for block in re.findall(r"[\u4e00-\u9fff]{2,20}", normalized_question):
            if len(block) <= 4:
                remember(block)
                continue
            for size in (4, 3, 2):
                for start in range(0, len(block) - size + 1):
                    remember(block[start : start + size])
                    if len(terms) >= MAX_KNOWLEDGE_QUERY_TERMS:
                        # Stop generating n-grams once the cap is reached.
                        # Truncate here too: ASCII tokens and short blocks
                        # are added without a cap check, so the list may
                        # already be longer than the limit.
                        return terms[:MAX_KNOWLEDGE_QUERY_TERMS]
        return terms[:MAX_KNOWLEDGE_QUERY_TERMS]

    @staticmethod
    def _clean_knowledge_segment_text(content: str) -> str:
        """Strip list markers, collapse whitespace and cap length at 180.

        Text longer than 180 characters is cut to its first 177 characters
        (right-stripped) followed by ``"..."``.
        """
        # Same cleaning pipeline as _normalize_knowledge_line with markers
        # removed; only the truncation is specific to this helper.
        normalized = UserAgentKnowledgeHelpersMixin._normalize_knowledge_line(
            content,
            preserve_marker=False,
        )
        if len(normalized) <= 180:
            return normalized
        return f"{normalized[:177].rstrip()}..."

    @staticmethod
    def _normalize_knowledge_line(content: str, *, preserve_marker: bool) -> str:
        """Normalise one line of knowledge text.

        A leading ``-``, ``*`` or ``•`` bullet is always removed and runs of
        whitespace are collapsed to a single space.

        Args:
            content: Raw line.
            preserve_marker: When False, leading numeric (``1.``, ``1)``,
                ``1、``), circled-digit and parenthesised ordinal markers are
                removed as well.
        """
        normalized = str(content or "").strip()
        normalized = re.sub(r"^[-*•]\s*", "", normalized)
        if not preserve_marker:
            normalized = re.sub(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*", "", normalized)
            normalized = re.sub(r"^[((][一二三四五六七八九十百零0-9]+[))]\s*", "", normalized)
        normalized = re.sub(r"\s+", " ", normalized)
        return normalized

    def _split_clean_knowledge_lines(
        self,
        content: str,
        *,
        preserve_marker: bool,
    ) -> list[str]:
        """Split ``content`` into normalised, non-empty lines."""
        return [
            line
            for line in (
                self._normalize_knowledge_line(item, preserve_marker=preserve_marker)
                for item in str(content or "").splitlines()
            )
            if line
        ]

    @staticmethod
    def _extract_relevant_table_preview(content: str, query_terms: list[str]) -> str:
        """Build a short preview of a text table focused on the query terms.

        Tables of three non-empty lines or fewer are returned whole.
        Otherwise the preview is the header line, the second line when it is
        treated as a divider (contains at least two ``|``), and then up to
        three body rows containing a query term - or the first two body rows
        when none match.

        Args:
            content: Table text, one row per line.
            query_terms: Terms matched by substring against lower-cased rows.
        """
        lines = [line.strip() for line in str(content or "").splitlines() if line.strip()]
        if len(lines) <= 3:
            return "\n".join(lines)

        header = lines[0]
        has_divider = lines[1].count("|") >= 2
        body = lines[2:] if has_divider else lines[1:]
        matched_rows = [
            row
            for row in body
            if any(term in row.lower() for term in query_terms)
        ]
        selected_rows = matched_rows[:3] or body[:2]

        preview_lines = [header]
        # Only emit the second line separately when it was excluded from the
        # body; otherwise it is an ordinary row and would be duplicated if it
        # is also selected.
        if has_divider:
            preview_lines.append(lines[1])
        preview_lines.extend(selected_rows)
        return "\n".join(preview_lines).strip()

    @staticmethod
    def _question_requires_explicit_condition(question: str) -> bool:
        """Return True when the question contains an amount/limit/condition keyword."""
        normalized = str(question or "").strip()
        return any(
            keyword in normalized
            for keyword in ("多少", "金额", "上限", "限额", "标准", "条件", "需要")
        )

    @staticmethod
    def _answer_evidence_has_numeric_or_condition(evidence_items: list[dict[str, Any]]) -> bool:
        """Return True if any item's content has a digit or a condition keyword."""
        for item in evidence_items:
            content = str(item.get("content") or "")
            if re.search(r"\d", content):
                return True
            if any(
                keyword in content
                for keyword in ("应", "需", "不得", "可以", "条件", "材料", "审批", "流程", "标准", "适用")
            ):
                return True
        return False