"""Build compact, log-friendly summaries of a knowledge-ingest run.

The helpers in this module read the JSON key/value stores and the GraphML
graph file found under ``<storage_root>/knowledge/.lightrag/<workspace>/`` and
reduce them to small dictionaries (bounded by the ``MAX_INGEST_LOG_*`` limits)
describing a document's sections, chunks, entities and relations.
"""

from __future__ import annotations

import json
import os
import re
from pathlib import Path
from typing import Any
from xml.etree import ElementTree

# Upper bounds applied to the lists and text previews placed in summaries.
MAX_INGEST_LOG_CHUNKS = 24
MAX_INGEST_LOG_ENTITIES = 24
MAX_INGEST_LOG_ENTITY_CHUNKS = 48
MAX_INGEST_LOG_RELATIONS = 24
MAX_INGEST_LOG_SECTIONS = 12
MAX_INGEST_LOG_TEXT_PREVIEW = 180
MAX_INGEST_LOG_ENTITY_DESCRIPTIONS = 5

GRAPHML_NAMESPACE = {"graphml": "http://graphml.graphdrawing.org/xmlns"}

# Marker separating merged values inside one GraphML property.  It must never
# be empty: ``str.split("")`` raises ``ValueError``, which would abort every
# snapshot load as soon as the graph contains a node or an edge.
# NOTE(review): "<SEP>" is LightRAG's GRAPH_FIELD_SEP marker - confirm against
# the installed LightRAG version.
GRAPH_PROPERTY_SEPARATOR = "<SEP>"

# Markdown headings (1-4 leading '#') or Chinese "第…章/节/条" headings.
INGEST_SECTION_HEADING_PATTERN = re.compile(
    r"^(?:#{1,4}\s+.+|第[一二三四五六七八九十百零0-9]+[章节条]\s*.*)$"
)


def build_ingest_document_summary(
    *,
    document_id: str,
    entry: dict[str, Any],
    raw_text: str,
    indexed_text: str,
) -> dict[str, Any]:
    """Return the initial summary for a document that is about to be ingested.

    Args:
        document_id: Identifier stored verbatim in the summary.
        entry: Document metadata; ``original_name``, ``folder``, ``extension``
            and ``mime_type`` are read, stringified and stripped.
        raw_text: Extracted text; only its length is recorded.
        indexed_text: Text handed to the index; its length is recorded and it
            is scanned for section headings.

    Returns:
        A dictionary with the metadata, the text sizes, the detected sections
        and empty chunk/entity/relation placeholders.
    """
    raw_text_value = str(raw_text or "")
    indexed_text_value = str(indexed_text or "")
    sections = _extract_ingest_sections(indexed_text_value)
    return {
        "document_id": document_id,
        "name": str(entry.get("original_name") or "").strip(),
        "folder": str(entry.get("folder") or "").strip(),
        "extension": str(entry.get("extension") or "").strip(),
        "mime_type": str(entry.get("mime_type") or "").strip(),
        "text_chars": len(raw_text_value),
        "indexed_text_chars": len(indexed_text_value),
        "section_count": len(sections),
        "sections": sections,
        "chunk_count": 0,
        "chunk_ids": [],
        "chunks": [],
        "entity_count": 0,
        "relation_count": 0,
        "entities": [],
        "entity_chunks": [],
        "relations": [],
    }


def build_ingest_status_summary(
    *,
    status_payload: dict[str, Any],
    graph_summary: dict[str, Any],
) -> dict[str, Any]:
    """Merge a status payload with a graph summary.

    Args:
        status_payload: Mapping from which ``status``, ``query_ready``,
            ``chunks_list`` and ``chunks_count`` are read.
        graph_summary: Extra keys merged last, so they override the status
            keys on a name clash.

    Returns:
        A dictionary with ``lightrag_status``, ``query_ready``, ``chunk_count``
        and at most ``MAX_INGEST_LOG_CHUNKS`` ``chunk_ids``, plus every key of
        ``graph_summary``.
    """
    chunk_ids = _normalize_chunk_ids(status_payload)
    chunk_count = _resolve_chunk_count(status_payload, chunk_ids)
    return {
        "lightrag_status": str(status_payload.get("status") or "").strip(),
        "query_ready": bool(status_payload.get("query_ready")),
        "chunk_count": chunk_count,
        "chunk_ids": chunk_ids[:MAX_INGEST_LOG_CHUNKS],
        **graph_summary,
    }


def enrich_knowledge_ingest_route_json(
    route_json: dict[str, Any],
    *,
    storage_root: Path,
) -> dict[str, Any]:
    """Return ``route_json`` with its ingest graph enriched from the GraphML file.

    The input is returned unchanged (same object) when it is not a dictionary,
    when ``knowledge_ingest`` or its ``graph`` entry is not a dictionary, or
    when the graph snapshot contains neither entities nor relations.
    Otherwise shallow copies of the route, the ingest entry and the graph are
    made, so the caller's dictionaries are not mutated.

    Args:
        route_json: Route payload that may carry ``knowledge_ingest.graph``.
        storage_root: Directory that contains ``knowledge/.lightrag``.

    Returns:
        The original payload or an enriched shallow copy of it.
    """
    if not isinstance(route_json, dict):
        return route_json
    ingest = route_json.get("knowledge_ingest")
    if not isinstance(ingest, dict):
        return route_json
    graph = ingest.get("graph")
    if not isinstance(graph, dict):
        return route_json

    workspace = _resolve_lightrag_workspace(route_json)
    graph_snapshot = _load_lightrag_graph_snapshot(storage_root, workspace=workspace)
    if not graph_snapshot["entities"] and not graph_snapshot["relations"]:
        return route_json

    next_route = dict(route_json)
    next_ingest = dict(ingest)
    next_ingest["graph"] = _enrich_graph_payload(graph, graph_snapshot)
    next_route["knowledge_ingest"] = next_ingest
    return next_route


def build_document_graph_summary(
    storage_root: Path,
    *,
    workspace: str,
    document_id: str,
) -> dict[str, Any]:
    """Summarise the entities, relations and chunks stored for one document.

    Reads the ``kv_store_*.json`` files and the GraphML snapshot of the given
    workspace.  Missing or unreadable JSON files are treated as empty.

    Args:
        storage_root: Directory that contains ``knowledge/.lightrag``.
        workspace: Workspace directory name (stringified and stripped).
        document_id: Key used to select the document's records.

    Returns:
        A dictionary with ``entity_count`` and ``relation_count`` (computed
        before truncation) and the ``entities``, ``relations``, ``chunks`` and
        ``entity_chunks`` lists truncated to their ``MAX_INGEST_LOG_*`` limit.
    """
    workspace_dir = (
        Path(storage_root) / "knowledge" / ".lightrag" / str(workspace).strip()
    ).resolve()
    entities_payload = _load_json_file(workspace_dir / "kv_store_full_entities.json")
    relations_payload = _load_json_file(workspace_dir / "kv_store_full_relations.json")
    chunks_payload = _load_json_file(workspace_dir / "kv_store_text_chunks.json")
    entity_chunks_payload = _load_json_file(
        workspace_dir / "kv_store_entity_chunks.json"
    )
    graph_snapshot = _load_lightrag_graph_snapshot(storage_root, workspace=workspace)

    entities = _normalize_document_entities(entities_payload, document_id)
    relations = _normalize_document_relations(relations_payload, document_id)
    chunks = _normalize_document_chunks(chunks_payload, document_id)
    entity_chunks = _normalize_document_entity_chunks(
        entity_chunks_payload,
        entities,
        chunk_ids={str(item.get("id") or "").strip() for item in chunks},
    )
    return {
        "entity_count": len(entities),
        "relation_count": len(relations),
        "entities": _enrich_entity_list(entities, graph_snapshot)[
            :MAX_INGEST_LOG_ENTITIES
        ],
        "relations": _enrich_relation_list(relations, graph_snapshot)[
            :MAX_INGEST_LOG_RELATIONS
        ],
        "chunks": chunks[:MAX_INGEST_LOG_CHUNKS],
        "entity_chunks": entity_chunks[:MAX_INGEST_LOG_ENTITY_CHUNKS],
    }


def _resolve_lightrag_workspace(route_json: dict[str, Any]) -> str:
    """Pick the workspace name from the route, the environment or the default.

    Precedence: ``route_json["lightrag_workspace"]``, ``route_json["workspace"]``,
    the ``LIGHTRAG_WORKSPACE`` environment variable, then the built-in default.
    """
    explicit_workspace = str(
        route_json.get("lightrag_workspace") or route_json.get("workspace") or ""
    ).strip()
    if explicit_workspace:
        return explicit_workspace
    return (
        os.environ.get("LIGHTRAG_WORKSPACE", "x_financial_knowledge").strip()
        or "x_financial_knowledge"
    )


def _enrich_graph_payload(
    graph: dict[str, Any],
    graph_snapshot: dict[str, Any],
) -> dict[str, Any]:
    """Return a shallow copy of ``graph`` with enriched entities and relations.

    Entity names are taken from ``graph["entities"]`` followed by the endpoints
    of every valid relation, de-duplicated in first-seen order.
    """
    next_graph = dict(graph)
    relation_items = _extract_relation_items(graph.get("relations"))
    relation_entity_names = [
        name
        for relation in relation_items
        for name in (relation.get("source"), relation.get("target"))
    ]
    next_graph["entities"] = _enrich_entity_list(
        _dedupe_text_items(
            _extract_entity_names(graph.get("entities")) + relation_entity_names
        ),
        graph_snapshot,
    )
    next_graph["relations"] = _enrich_relation_list(relation_items, graph_snapshot)
    return next_graph


def _enrich_entity_list(
    entity_names: list[str],
    graph_snapshot: dict[str, Any],
) -> list[dict[str, Any]]:
    """Map entity names to snapshot records, with a placeholder for unknown names."""
    graph_entities = graph_snapshot.get("entities") or {}
    return [
        graph_entities.get(entity_name)
        or {
            "name": entity_name,
            "type": "实体",
            "description": "",
            "descriptions": [],
            "properties": {},
        }
        for entity_name in entity_names
    ]


def _enrich_relation_list(
    relations: list[dict[str, Any]],
    graph_snapshot: dict[str, Any],
) -> list[dict[str, Any]]:
    """Attach description, keywords, weight and properties to each relation.

    The snapshot is looked up by ``(source, target)`` first and by the reversed
    pair second.  Without a match, the relation's own ``weight`` (default ``1``)
    is kept and the other fields are empty.
    """
    graph_relations = graph_snapshot.get("relations") or {}
    enriched_relations: list[dict[str, Any]] = []
    for relation in relations:
        source = str(relation.get("source") or "").strip()
        target = str(relation.get("target") or "").strip()
        relation_type = str(relation.get("type") or "关联").strip()
        graph_relation = (
            graph_relations.get((source, target))
            or graph_relations.get((target, source))
            or {}
        )
        enriched_relations.append(
            {
                **relation,
                "source": source,
                "target": target,
                "type": relation_type,
                "description": graph_relation.get("description", ""),
                "keywords": graph_relation.get("keywords", []),
                "weight": graph_relation.get("weight", relation.get("weight", 1)),
                "properties": graph_relation.get("properties", {}),
            }
        )
    return enriched_relations


def _load_lightrag_graph_snapshot(
    storage_root: Path,
    *,
    workspace: str,
) -> dict[str, Any]:
    """Parse the workspace GraphML file into entity and relation lookups.

    Returns ``{"entities": {}, "relations": {}}`` when the file is missing,
    unreadable or not well-formed XML.
    """
    graphml_path = (
        Path(storage_root)
        / "knowledge"
        / ".lightrag"
        / str(workspace).strip()
        / "graph_chunk_entity_relation.graphml"
    )
    if not graphml_path.exists():
        return {"entities": {}, "relations": {}}
    try:
        root = ElementTree.parse(graphml_path).getroot()
    except (ElementTree.ParseError, OSError):
        return {"entities": {}, "relations": {}}

    # GraphML <data> elements reference <key id=...>; map ids to attribute names.
    key_names = {
        str(key.attrib.get("id") or ""): str(key.attrib.get("attr.name") or "")
        for key in root.findall("graphml:key", GRAPHML_NAMESPACE)
    }
    return {
        "entities": _load_graphml_entities(root, key_names),
        "relations": _load_graphml_relations(root, key_names),
    }


def _load_graphml_entities(
    root: ElementTree.Element,
    key_names: dict[str, str],
) -> dict[str, dict[str, Any]]:
    """Build a name -> entity record mapping from every GraphML ``node``.

    The name is the ``entity_id`` property, falling back to the node ``id``
    attribute; nodes without a name are skipped.
    """
    entities: dict[str, dict[str, Any]] = {}
    for node in root.findall(".//graphml:node", GRAPHML_NAMESPACE):
        properties = _read_graphml_data(node, key_names)
        name = str(properties.get("entity_id") or node.attrib.get("id") or "").strip()
        if not name:
            continue
        descriptions = _split_graph_property(properties.get("description"))
        entities[name] = {
            "name": name,
            "type": str(properties.get("entity_type") or "实体").strip(),
            "description": descriptions[0] if descriptions else "",
            "descriptions": descriptions[:MAX_INGEST_LOG_ENTITY_DESCRIPTIONS],
            "properties": _filter_graph_properties(properties),
        }
    return entities


def _load_graphml_relations(
    root: ElementTree.Element,
    key_names: dict[str, str],
) -> dict[tuple[str, str], dict[str, Any]]:
    """Build a ``(source, target)`` -> relation record mapping from every ``edge``.

    Edges missing either endpoint are skipped.  The description keeps the first
    two parts joined by ``"; "`` and at most six keywords are retained.
    """
    relations: dict[tuple[str, str], dict[str, Any]] = {}
    for edge in root.findall(".//graphml:edge", GRAPHML_NAMESPACE):
        source = str(edge.attrib.get("source") or "").strip()
        target = str(edge.attrib.get("target") or "").strip()
        if not source or not target:
            continue
        properties = _read_graphml_data(edge, key_names)
        description_parts = _split_graph_property(properties.get("description"))
        relations[(source, target)] = {
            "description": "; ".join(description_parts[:2]),
            "keywords": _split_graph_keywords(properties.get("keywords"))[:6],
            "weight": _to_float(properties.get("weight"), default=1.0),
            "properties": _filter_graph_properties(properties),
        }
    return relations


def _read_graphml_data(
    element: ElementTree.Element,
    key_names: dict[str, str],
) -> dict[str, str]:
    """Collect the direct ``data`` children of ``element`` as name -> text.

    The key id is translated through ``key_names``; an unknown id is used as
    the name itself, and entries without any name are dropped.
    """
    data: dict[str, str] = {}
    for item in element.findall("graphml:data", GRAPHML_NAMESPACE):
        key = str(item.attrib.get("key") or "")
        name = key_names.get(key) or key
        if not name:
            continue
        data[name] = str(item.text or "").strip()
    return data


def _split_graph_property(value: Any) -> list[str]:
    """Split a merged property on the separator into non-blank, truncated parts."""
    return [
        _truncate_text(part, max_length=MAX_INGEST_LOG_TEXT_PREVIEW)
        for part in str(value or "").split(GRAPH_PROPERTY_SEPARATOR)
        if part.strip()
    ]


def _split_graph_keywords(value: Any) -> list[str]:
    """Split merged keywords on the separator and on commas, dropping blanks."""
    keywords: list[str] = []
    for part in str(value or "").split(GRAPH_PROPERTY_SEPARATOR):
        keywords.extend(part.split(","))
    return [
        _truncate_text(keyword, max_length=60)
        for keyword in keywords
        if keyword.strip()
    ]


def _filter_graph_properties(properties: dict[str, Any]) -> dict[str, Any]:
    """Drop hidden bookkeeping fields and blank values from ``properties``."""
    hidden_fields = {
        "source_id",
        "file_path",
        "truncate",
        "description",
        "keywords",
    }
    return {
        key: value
        for key, value in properties.items()
        if key not in hidden_fields and str(value or "").strip()
    }


def _extract_entity_names(raw_entities: Any) -> list[str]:
    """Return the de-duplicated entity names found in ``raw_entities``.

    Items may be dictionaries (``name``, ``entity``, ``entity_id`` or ``id`` is
    used, in that order) or plain values that are stringified.  Anything other
    than a list yields an empty result.
    """
    if not isinstance(raw_entities, list):
        return []
    names: list[str] = []
    for entity in raw_entities:
        if isinstance(entity, dict):
            name = str(
                entity.get("name")
                or entity.get("entity")
                or entity.get("entity_id")
                or entity.get("id")
                or ""
            ).strip()
        else:
            name = str(entity or "").strip()
        if name:
            names.append(name)
    return _dedupe_text_items(names)


def _extract_relation_items(raw_relations: Any) -> list[dict[str, Any]]:
    """Normalise relation dictionaries to carry ``source``, ``target`` and ``type``.

    ``from``/``to`` are accepted as aliases for ``source``/``target``.  Entries
    that are not dictionaries or lack an endpoint are skipped.
    """
    if not isinstance(raw_relations, list):
        return []
    relations: list[dict[str, Any]] = []
    for relation in raw_relations:
        if not isinstance(relation, dict):
            continue
        source = str(relation.get("source") or relation.get("from") or "").strip()
        target = str(relation.get("target") or relation.get("to") or "").strip()
        if not source or not target:
            continue
        relations.append(
            {
                **relation,
                "source": source,
                "target": target,
                "type": str(relation.get("type") or "关联").strip(),
            }
        )
    return relations


def _extract_ingest_sections(text: str) -> list[dict[str, str]]:
    """Find up to ``MAX_INGEST_LOG_SECTIONS`` headings and the text following each.

    A heading is a stripped line of at most 90 characters that matches
    ``INGEST_SECTION_HEADING_PATTERN``; leading ``#`` characters are removed
    from the reported title.
    """
    sections: list[dict[str, str]] = []
    lines = [line.strip() for line in str(text or "").splitlines()]
    for index, line in enumerate(lines):
        if len(sections) >= MAX_INGEST_LOG_SECTIONS:
            break
        if not line or len(line) > 90 or not INGEST_SECTION_HEADING_PATTERN.match(line):
            continue
        sections.append(
            {
                "title": line.lstrip("#").strip(),
                "excerpt": _find_following_excerpt(lines[index + 1 :]),
            }
        )
    return sections


def _find_following_excerpt(lines: list[str]) -> str:
    """Join the lines up to the next heading into a truncated preview.

    Blank lines are skipped and collection stops once the joined text reaches
    ``MAX_INGEST_LOG_TEXT_PREVIEW`` characters.
    """
    collected: list[str] = []
    for line in lines:
        if not line:
            continue
        if INGEST_SECTION_HEADING_PATTERN.match(line):
            break
        collected.append(line)
        if len(" ".join(collected)) >= MAX_INGEST_LOG_TEXT_PREVIEW:
            break
    return _truncate_text(" ".join(collected), max_length=MAX_INGEST_LOG_TEXT_PREVIEW)


def _normalize_chunk_ids(status_payload: dict[str, Any]) -> list[str]:
    """Return the non-blank, stripped entries of ``status_payload["chunks_list"]``."""
    chunks_list = status_payload.get("chunks_list")
    if not isinstance(chunks_list, list):
        return []
    return [str(item).strip() for item in chunks_list if str(item or "").strip()]


def _resolve_chunk_count(status_payload: dict[str, Any], chunk_ids: list[str]) -> int:
    """Return ``chunks_count`` as an int, falling back to ``len(chunk_ids)``.

    The fallback is used when the value is missing, falsy or not convertible.
    """
    try:
        return int(status_payload.get("chunks_count") or len(chunk_ids))
    except (TypeError, ValueError):
        return len(chunk_ids)


def _load_json_file(path: Path) -> dict[str, Any]:
    """Load a JSON object from ``path``; return ``{}`` on any failure.

    A missing file, an I/O error, invalid JSON and a non-object top-level value
    all produce an empty dictionary (deliberate best-effort behaviour).
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return {}
    return payload if isinstance(payload, dict) else {}


def _normalize_document_entities(payload: dict[str, Any], document_id: str) -> list[str]:
    """Return the de-duplicated ``entity_names`` recorded for ``document_id``."""
    document_payload = payload.get(document_id) if isinstance(payload, dict) else {}
    entity_names = (
        document_payload.get("entity_names")
        if isinstance(document_payload, dict)
        else []
    )
    if not isinstance(entity_names, list):
        return []
    return _dedupe_text_items(entity_names)


def _normalize_document_relations(
    payload: dict[str, Any],
    document_id: str,
) -> list[dict[str, str]]:
    """Return the unique ``relation_pairs`` of ``document_id`` as relation dicts.

    Pairs that are not sequences of at least two items, that have a blank
    endpoint, or that repeat an earlier ``(source, target)`` are skipped.
    """
    document_payload = payload.get(document_id) if isinstance(payload, dict) else {}
    relation_pairs = (
        document_payload.get("relation_pairs")
        if isinstance(document_payload, dict)
        else []
    )
    if not isinstance(relation_pairs, list):
        return []
    relations: list[dict[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for pair in relation_pairs:
        if not isinstance(pair, (list, tuple)) or len(pair) < 2:
            continue
        source = str(pair[0] or "").strip()
        target = str(pair[1] or "").strip()
        if not source or not target or (source, target) in seen:
            continue
        seen.add((source, target))
        relations.append({"source": source, "target": target, "type": "关联"})
    return relations


def _normalize_document_chunks(
    payload: dict[str, Any],
    document_id: str,
) -> list[dict[str, Any]]:
    """Return the chunks whose ``full_doc_id`` equals ``document_id``.

    Each chunk carries ``id`` (``_id`` or the store key), ``order``, ``tokens``,
    ``summary`` and ``excerpt``; the list is sorted by ``(order, id)``.
    """
    chunks: list[dict[str, Any]] = []
    for chunk_id, raw_chunk in payload.items():
        if not isinstance(raw_chunk, dict):
            continue
        if str(raw_chunk.get("full_doc_id") or "").strip() != document_id:
            continue
        content = str(raw_chunk.get("content") or "").strip()
        chunks.append(
            {
                "id": str(raw_chunk.get("_id") or chunk_id).strip(),
                "order": _to_int(raw_chunk.get("chunk_order_index")),
                "tokens": _to_int(raw_chunk.get("tokens")),
                "summary": _build_chunk_summary(content),
                "excerpt": _truncate_text(
                    content,
                    max_length=MAX_INGEST_LOG_TEXT_PREVIEW,
                ),
            }
        )
    return sorted(chunks, key=lambda item: (item["order"], item["id"]))


def _normalize_document_entity_chunks(
    payload: dict[str, Any],
    entities: list[str],
    *,
    chunk_ids: set[str],
) -> list[dict[str, Any]]:
    """Link each entity to those of its chunk ids that belong to the document.

    Entities without a matching chunk id are omitted, and the result is empty
    when either ``entities`` or ``chunk_ids`` is empty.
    """
    if not entities or not chunk_ids:
        return []
    entity_chunks: list[dict[str, Any]] = []
    for entity in entities:
        raw_entry = payload.get(entity) if isinstance(payload, dict) else {}
        raw_chunk_ids = raw_entry.get("chunk_ids") if isinstance(raw_entry, dict) else []
        if not isinstance(raw_chunk_ids, list):
            continue
        matched_chunk_ids = [
            str(item or "").strip()
            for item in raw_chunk_ids
            if str(item or "").strip() in chunk_ids
        ]
        if not matched_chunk_ids:
            continue
        entity_chunks.append(
            {
                "entity": entity,
                "chunk_ids": _dedupe_text_items(matched_chunk_ids),
            }
        )
    return entity_chunks


def _build_chunk_summary(content: str) -> str:
    """Return the first line of at least 12 characters, else the first line."""
    lines = [line.strip() for line in str(content or "").splitlines() if line.strip()]
    text = next((line for line in lines if len(line) >= 12), lines[0] if lines else "")
    return _truncate_text(text, max_length=MAX_INGEST_LOG_TEXT_PREVIEW)


def _dedupe_text_items(items: list[Any]) -> list[str]:
    """Stringify and strip ``items``, dropping blanks and repeats (order kept)."""
    deduped: list[str] = []
    seen: set[str] = set()
    for item in items:
        text = str(item or "").strip()
        if not text or text in seen:
            continue
        seen.add(text)
        deduped.append(text)
    return deduped


def _to_int(value: Any) -> int:
    """Convert ``value`` to ``int``; return ``0`` when falsy or not convertible."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _to_float(value: Any, *, default: float = 0.0) -> float:
    """Convert ``value`` to ``float``; return ``default`` when not convertible."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _truncate_text(text: str, *, max_length: int) -> str:
    """Collapse whitespace runs and cap the text at ``max_length`` characters.

    Longer text is cut and suffixed with ``"..."`` so that the result still
    fits in ``max_length``.
    """
    normalized = " ".join(str(text or "").split()).strip()
    if len(normalized) <= max_length:
        return normalized
    if max_length <= 3:
        # No room for the ellipsis; a negative slice bound would otherwise
        # return almost the whole text instead of a short prefix.
        return normalized[: max(max_length, 0)]
    return f"{normalized[: max_length - 3].rstrip()}..."