diff --git a/ai-core/main.py b/ai-core/main.py new file mode 100644 index 0000000..713ee5b --- /dev/null +++ b/ai-core/main.py @@ -0,0 +1,66 @@ +""" +AI-Core Document Parser gRPC Server + +启动命令: python main.py [--port PORT] [--max-workers MAX_WORKERS] [--log-level LEVEL] +""" +import argparse +import logging +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) + +from service.grpc_server import serve + +DEFAULT_PORT = 50051 +DEFAULT_MAX_WORKERS = 10 + + +def main(): + parser = argparse.ArgumentParser( + description="Document Parser gRPC Server", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--port", + type=int, + default=DEFAULT_PORT, + help="Port to listen on", + ) + parser.add_argument( + "--max-workers", + type=int, + default=DEFAULT_MAX_WORKERS, + help="Maximum number of worker threads", + ) + parser.add_argument( + "--log-level", + type=str, + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Log level", + ) + + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + logger = logging.getLogger(__name__) + logger.info("Starting Document Parser gRPC Server") + logger.info("Port: %d", args.port) + logger.info("Max workers: %d", args.max_workers) + + try: + serve(port=args.port, max_workers=args.max_workers) + except KeyboardInterrupt: + logger.info("Server shutdown requested") + except Exception as e: + logger.error("Server error: %s", str(e), exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/ai-core/parser/__init__.py b/ai-core/parser/__init__.py index f37075d..7ef9224 100644 --- a/ai-core/parser/__init__.py +++ b/ai-core/parser/__init__.py @@ -1,38 +1,10 @@ """ -Parser module for WeKnora document processing system. - -This module provides document parsers for various file formats including: -- Microsoft Word documents (.doc, .docx) -- PDF documents -- Markdown files -- Plain text files -- Images with text content -- Web pages - -The parsers extract content from documents and can split them into -meaningful chunks for further processing and indexing. +Parser module for AI-Core document processing. """ -from .doc_parser import DocParser -from .docx2_parser import Docx2Parser -from .excel_parser import ExcelParser -from .image_parser import ImageParser -from .markdown_parser import MarkdownParser -from .parser import Parser -from .pdf_parser import PDFParser -from .registry import ParserEngineRegistry, registry -from .web_parser import WebParser +from .parser_simple import Parser, Document -# Export public classes and modules __all__ = [ - "Docx2Parser", - "DocParser", - "PDFParser", - "MarkdownParser", - "ImageParser", - "WebParser", "Parser", - "ExcelParser", - "ParserEngineRegistry", - "registry", + "Document", ] diff --git a/ai-core/parser/base_parser.py b/ai-core/parser/base_parser.py new file mode 100644 index 0000000..b868c1e --- /dev/null +++ b/ai-core/parser/base_parser.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +import logging +import os +from abc import ABC, abstractmethod +from typing import Optional + +from docreader.models.document import Document + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class BaseParser(ABC): + """Base parser interface. + + After the lightweight refactoring, BaseParser only extracts markdown text + and raw image references from documents. Chunking, image storage, OCR, + and VLM caption are handled by the Go App module. + """ + + def __init__( + self, + file_name: str = "", + file_type: Optional[str] = None, + **kwargs, + ): + self.file_name = file_name + self.file_type = file_type or os.path.splitext(file_name)[1].lstrip(".") + + logger.info( + "Initializing parser for file=%s, type=%s", + file_name, + self.file_type, + ) + + @abstractmethod + def parse_into_text(self, content: bytes) -> Document: + """Parse document content into markdown text. + + Returns: + Document with ``content`` (markdown string) and optional + ``images`` dict mapping storage-relative paths to base64 data. + """ + + def parse(self, content: bytes) -> Document: + """Parse document and return markdown + image references. + + No chunking, no OCR, no VLM caption — those are done in Go. + """ + logger.info( + "Parsing document with %s, bytes: %d", + self.__class__.__name__, + len(content), + ) + document = self.parse_into_text(content) + logger.info( + "Extracted %d characters from %s", + len(document.content), + self.file_name, + ) + return document diff --git a/ai-core/parser/chain_parser.py b/ai-core/parser/chain_parser.py new file mode 100644 index 0000000..6e5698d --- /dev/null +++ b/ai-core/parser/chain_parser.py @@ -0,0 +1,176 @@ +""" +Chain Parser Module + +This module provides two chain-of-responsibility pattern implementations for document parsing: +1. FirstParser: Tries multiple parsers sequentially until one succeeds +2. PipelineParser: Chains parsers where each parser processes the output of the previous one +""" + +import logging +from typing import Dict, List, Tuple, Type + +from docreader.models.document import Document +from docreader.parser.base_parser import BaseParser +from docreader.utils import endecode + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class FirstParser(BaseParser): + """ + First-success parser that tries multiple parsers in sequence. + + This parser attempts to parse content using each registered parser in order. + It returns the result from the first parser that successfully produces a valid document. + If all parsers fail, it returns an empty Document. + + Usage: + # Create a custom FirstParser with specific parser classes + CustomParser = FirstParser.create(MarkdownParser, HTMLParser) + parser = CustomParser() + document = parser.parse_into_text(content_bytes) + """ + + # Tuple of parser classes to be instantiated + _parser_cls: Tuple[Type["BaseParser"], ...] = () + + def __init__(self, *args, **kwargs): + """Initialize FirstParser with configured parser classes.""" + super().__init__(*args, **kwargs) + + # Instantiate all parser classes into parser instances + self._parsers: List[BaseParser] = [] + for parser_cls in self._parser_cls: + parser = parser_cls(*args, **kwargs) + self._parsers.append(parser) + + def parse_into_text(self, content: bytes) -> Document: + """Parse content using the first parser that succeeds. + + Args: + content: Raw bytes content to be parsed + + Returns: + Document: Parsed document from the first successful parser, + or an empty Document if all parsers fail + """ + for p in self._parsers: + logger.info(f"FirstParser: using parser {p.__class__.__name__}") + try: + document = p.parse_into_text(content) + except Exception: + logger.exception( + "FirstParser: parser %s raised exception; trying next parser", + p.__class__.__name__, + ) + continue + + if document.is_valid(): + logger.info(f"FirstParser: parser {p.__class__.__name__} succeeded") + return document + return Document() + + @classmethod + def create(cls, *parser_classes: Type["BaseParser"]) -> Type["FirstParser"]: + """Factory method to create a FirstParser subclass with specific parsers. + + Args: + *parser_classes: Variable number of BaseParser subclasses to try in order + + Returns: + Type[FirstParser]: A new FirstParser subclass configured with the given parsers + + Example: + CustomParser = FirstParser.create(MarkdownParser, HTMLParser) + parser = CustomParser() + """ + # Generate a descriptive class name based on parser names + names = "_".join([p.__name__ for p in parser_classes]) + # Dynamically create a new class with the parser configuration + return type(f"FirstParser_{names}", (cls,), {"_parser_cls": parser_classes}) + + +class PipelineParser(BaseParser): + """ + Pipeline parser that chains multiple parsers sequentially. + + This parser processes content through a series of parsers where each parser + receives the output of the previous parser as input. Images from all parsers + are accumulated and merged into the final document. + + Usage: + # Create a custom PipelineParser with specific parser classes + CustomParser = PipelineParser.create(PreParser, MarkdownParser, PostParser) + parser = CustomParser() + document = parser.parse_into_text(content_bytes) + """ + + # Tuple of parser classes to be instantiated and chained + _parser_cls: Tuple[Type["BaseParser"], ...] = () + + def __init__(self, *args, **kwargs): + """Initialize PipelineParser with configured parser classes.""" + super().__init__(*args, **kwargs) + + # Instantiate all parser classes into parser instances + self._parsers: List[BaseParser] = [] + for parser_cls in self._parser_cls: + parser = parser_cls(*args, **kwargs) + self._parsers.append(parser) + + def parse_into_text(self, content: bytes) -> Document: + """Parse content through a pipeline of parsers. + + Each parser in the pipeline processes the output of the previous parser. + Images from all parsers are accumulated and merged into the final document. + + Args: + content: Raw bytes content to be parsed + + Returns: + Document: Final document after processing through all parsers, + with accumulated images from all stages + """ + # Accumulate images from all parsers + images: Dict[str, str] = {} + document = Document() + for p in self._parsers: + logger.info(f"PipelineParser: using parser {p.__class__.__name__}") + # Parse content with current parser + document = p.parse_into_text(content) + # Convert document content back to bytes for next parser + content = endecode.encode_bytes(document.content) + # Accumulate images from this parser + images.update(document.images) + # Merge all accumulated images into final document + document.images.update(images) + return document + + @classmethod + def create(cls, *parser_classes: Type["BaseParser"]) -> Type["PipelineParser"]: + """Factory method to create a PipelineParser subclass with specific parsers. + + Args: + *parser_classes: Variable number of BaseParser subclasses to chain in order + + Returns: + Type[PipelineParser]: A new PipelineParser subclass configured with the given parsers + + Example: + CustomParser = PipelineParser.create(PreprocessParser, MarkdownParser) + parser = CustomParser() + """ + # Generate a descriptive class name based on parser names + names = "_".join([p.__name__ for p in parser_classes]) + # Dynamically create a new class with the parser configuration + return type(f"PipelineParser_{names}", (cls,), {"_parser_cls": parser_classes}) + + +if __name__ == "__main__": + from docreader.parser.markdown_parser import MarkdownParser + + # Example: Create and use a FirstParser with MarkdownParser + FpCls = FirstParser.create(MarkdownParser) + lparser = FpCls() + print(lparser.parse_into_text(b"aaa")) diff --git a/ai-core/parser/doc_parser.py b/ai-core/parser/doc_parser.py new file mode 100644 index 0000000..69da435 --- /dev/null +++ b/ai-core/parser/doc_parser.py @@ -0,0 +1,331 @@ +import logging +import os +import subprocess +from typing import List, Optional + +import textract + +from docreader.config import CONFIG +from docreader.models.document import Document +from docreader.parser.docx2_parser import Docx2Parser +from docreader.utils.tempfile import TempDirContext, TempFileContext + +logger = logging.getLogger(__name__) + + +class SandboxExecutor: + """Sandbox executor for running commands with proxy configuration""" + + def __init__(self, proxy: Optional[str] = None, default_timeout: int = 60): + """Initialize sandbox executor with configuration + + Args: + proxy: Proxy URL to use for network access. If None, will use WEB_PROXY environment variable + default_timeout: Default timeout in seconds for command execution + """ + # Get proxy from parameter, environment variable, or use default blocking proxy + # Use 'or None' to convert empty string to None, then apply default value + self.proxy = proxy or CONFIG.external_https_proxy or "http://128.0.0.1:1" + self.default_timeout = default_timeout + + def execute_in_sandbox(self, cmd: List[str]) -> tuple: + """Execute command in sandbox with proxy configuration + + Args: + cmd: Command to execute + + Returns: + Tuple of (stdout, stderr, returncode) + """ + # Try different sandbox methods in order of preference + sandbox_methods = [ + self._execute_with_proxy, + ] + + for method in sandbox_methods: + try: + return method(cmd) + except Exception as e: + logger.warning(f"Sandbox method {method.__name__} failed: {e}") + continue + + raise RuntimeError("All sandbox methods failed") + + def _execute_with_proxy(self, cmd: List[str]) -> tuple: + """Execute command with proxy configuration + + Args: + cmd: Command to execute + + Returns: + Tuple of (stdout, stderr, returncode) + """ + # Set up environment with proxy configuration + env = os.environ.copy() + if self.proxy: + env["http_proxy"] = self.proxy + env["https_proxy"] = self.proxy + env["HTTP_PROXY"] = self.proxy + env["HTTPS_PROXY"] = self.proxy + + logger.info(f"Executing command with proxy: {' '.join(cmd)}") + if self.proxy: + logger.info(f"Using proxy: {self.proxy}") + + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) + + try: + stdout, stderr = process.communicate(timeout=self.default_timeout) + return stdout, stderr, process.returncode + except subprocess.TimeoutExpired: + process.kill() + raise RuntimeError( + f"Command execution timeout after {self.default_timeout} seconds" + ) + + +logger = logging.getLogger(__name__) + + +class DocParser(Docx2Parser): + """DOC document parser""" + + def __init__(self, *args, **kwargs): + """Initialize DOC parser with sandbox executor""" + super().__init__(*args, **kwargs) + self.sandbox_executor = SandboxExecutor() + + def parse_into_text(self, content: bytes) -> Document: + logger.info(f"Parsing DOC document, content size: {len(content)} bytes") + + handle_chain = [ + # 1. Try to convert to docx format to extract images + self._parse_with_docx, + # 2. If image extraction is not needed or conversion failed, + # try using antiword to extract text + self._parse_with_antiword, + # 3. If antiword extraction fails, use textract + # NOTE: _parse_with_textract is disabled due to SSRF vulnerability + # self._parse_with_textract, + ] + + # Save byte content as a temporary file + with TempFileContext(content, ".doc") as temp_file_path: + for handle in handle_chain: + try: + document = handle(temp_file_path) + if document: + return document + except Exception as e: + logger.warning(f"Failed to parse DOC with {handle.__name__} {e}") + + return Document(content="") + + def _parse_with_docx(self, temp_file_path: str) -> Document: + logger.info("Multimodal enabled, attempting to extract images from DOC") + + docx_content = self._try_convert_doc_to_docx(temp_file_path) + if not docx_content: + raise RuntimeError("Failed to convert DOC to DOCX") + + logger.info("Successfully converted DOC to DOCX, using DocxParser") + # Use existing DocxParser to parse the converted docx + document = super(Docx2Parser, self).parse_into_text(docx_content) + logger.info(f"Extracted {len(document.content)} characters using DocxParser") + return document + + def _parse_with_antiword(self, temp_file_path: str) -> Document: + logger.info("Attempting to parse DOC file with antiword") + + # Check if antiword is installed + antiword_path = self._try_find_antiword() + if not antiword_path: + raise RuntimeError("antiword not found in PATH") + + # Use antiword to extract text directly in sandbox + cmd = [antiword_path, temp_file_path] + logger.info("Executing antiword in sandbox with proxy configuration") + + stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd) + + if returncode != 0: + raise RuntimeError( + f"antiword extraction failed: {stderr.decode('utf-8', errors='ignore')}" + ) + text = stdout.decode("utf-8", errors="ignore") + logger.info(f"Successfully extracted {len(text)} characters using antiword") + return Document(content=text) + + def _parse_with_textract(self, temp_file_path: str) -> Document: + logger.info(f"Parsing DOC file with textract: {temp_file_path}") + text = textract.process(temp_file_path, method="antiword").decode("utf-8") + logger.info(f"Successfully extracted {len(text)} bytes of DOC using textract") + return Document(content=str(text)) + + def _try_convert_doc_to_docx(self, doc_path: str) -> Optional[bytes]: + """Convert DOC file to DOCX format + + Uses LibreOffice/OpenOffice for conversion + + Args: + doc_path: DOC file path + + Returns: + Byte stream of DOCX file content, or None if conversion fails + """ + logger.info(f"Converting DOC to DOCX: {doc_path}") + + # Check if LibreOffice or OpenOffice is installed + soffice_path = self._try_find_soffice() + if not soffice_path: + return None + + # Execute conversion command + logger.info(f"Using {soffice_path} to convert DOC to DOCX") + + # Create a temporary directory to store the converted file + with TempDirContext() as temp_dir: + cmd = [ + soffice_path, + "--headless", + "--convert-to", + "docx", + "--outdir", + temp_dir, + doc_path, + ] + logger.info(f"Running command in sandbox: {' '.join(cmd)}") + + # Execute in sandbox with proxy configuration + stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd) + + if returncode != 0: + logger.warning( + f"Error converting DOC to DOCX: {stderr.decode('utf-8')}" + ) + return None + + # Find the converted file + docx_file = [ + file for file in os.listdir(temp_dir) if file.endswith(".docx") + ] + logger.info(f"Found {len(docx_file)} DOCX file(s) in temporary directory") + for file in docx_file: + converted_file = os.path.join(temp_dir, file) + logger.info(f"Found converted file: {converted_file}") + + # Read the converted file content + with open(converted_file, "rb") as f: + docx_content = f.read() + logger.info( + f"Successfully read DOCX file, size: {len(docx_content)}" + ) + return docx_content + return None + + def _try_find_executable_path( + self, + executable_name: str, + possible_path: List[str] = [], + environment_variable: List[str] = [], + ) -> Optional[str]: + """Find executable path + Args: + executable_name: Executable name + possible_path: List of possible paths + environment_variable: List of environment variables to check + Returns: + Executable path, or None if not found + """ + # Common executable paths + paths: List[str] = [] + paths.extend(possible_path) + paths.extend(os.environ.get(env_var, "") for env_var in environment_variable) + paths = list(set(paths)) + + # Check if path is set in environment variable + for path in paths: + if os.path.exists(path): + logger.info(f"Found {executable_name} at {path}") + return path + + # Try to find in PATH + result = subprocess.run( + ["which", executable_name], capture_output=True, text=True + ) + if result.returncode == 0 and result.stdout.strip(): + path = result.stdout.strip() + logger.info(f"Found {executable_name} at {path}") + return path + + logger.warning(f"Failed to find {executable_name}") + return None + + def _try_find_soffice(self) -> Optional[str]: + """Find LibreOffice/OpenOffice executable path + + Returns: + Executable path, or None if not found + """ + # Common LibreOffice/OpenOffice executable paths + possible_paths = [ + # Linux + "/usr/bin/soffice", + "/usr/lib/libreoffice/program/soffice", + "/opt/libreoffice25.2/program/soffice", + # macOS + "/Applications/LibreOffice.app/Contents/MacOS/soffice", + # Windows + "C:\\Program Files\\LibreOffice\\program\\soffice.exe", + "C:\\Program Files (x86)\\LibreOffice\\program\\soffice.exe", + ] + return self._try_find_executable_path( + executable_name="soffice", + possible_path=possible_paths, + environment_variable=["LIBREOFFICE_PATH"], + ) + + def _try_find_antiword(self) -> Optional[str]: + """Find antiword executable path + + Returns: + Executable path, or None if not found + """ + # Common antiword executable paths + possible_paths = [ + # Linux/macOS + "/usr/bin/antiword", + "/usr/local/bin/antiword", + # Windows + "C:\\Program Files\\Antiword\\antiword.exe", + "C:\\Program Files (x86)\\Antiword\\antiword.exe", + ] + return self._try_find_executable_path( + executable_name="antiword", + possible_path=possible_paths, + environment_variable=["ANTIWORD_PATH"], + ) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + file_name = "/path/to/your/test.doc" + logger.info(f"Processing file: {file_name}") + doc_parser = DocParser( + file_name=file_name, + enable_multimodal=True, + chunk_size=512, + chunk_overlap=60, + ) + with open(file_name, "rb") as f: + content = f.read() + + document = doc_parser.parse_into_text(content) + logger.info(f"Processing complete, extracted text length: {len(document.content)}") + logger.info(f"Sample text: {document.content[:200]}...") diff --git a/ai-core/parser/docx2_parser.py b/ai-core/parser/docx2_parser.py new file mode 100644 index 0000000..872b3ef --- /dev/null +++ b/ai-core/parser/docx2_parser.py @@ -0,0 +1,28 @@ +import logging + +from docreader.parser.chain_parser import FirstParser +from docreader.parser.docx_parser import DocxParser +from docreader.parser.markitdown_parser import MarkitdownParser + +logger = logging.getLogger(__name__) + + +class Docx2Parser(FirstParser): + _parser_cls = (MarkitdownParser, DocxParser) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + your_file = "/path/to/your/file.docx" + parser = Docx2Parser(separators=[".", "?", "!", "。", "?", "!"]) + with open(your_file, "rb") as f: + content = f.read() + + document = parser.parse(content) + for cc in document.chunks: + logger.info(f"chunk: {cc}") + + # document = parser.parse_into_text(content) + # logger.info(f"docx content: {document.content}") + # logger.info(f"find images {document.images.keys()}") diff --git a/ai-core/parser/docx_parser.py b/ai-core/parser/docx_parser.py new file mode 100644 index 0000000..12ffd4a --- /dev/null +++ b/ai-core/parser/docx_parser.py @@ -0,0 +1,1509 @@ +import logging +import os +import re +import tempfile +import threading +import time +import traceback +from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import dataclass, field +from io import BytesIO +from multiprocessing import Manager +from typing import Any, Dict, List, Optional, Tuple + +from docx import Document +from docx.image.exceptions import ( + InvalidImageStreamError, + UnexpectedEndOfFileError, + UnrecognizedImageError, +) +from PIL import Image + +from docreader.models.document import Document as DocumentModel +from docreader.parser.base_parser import BaseParser +from docreader.utils import endecode + +logger = logging.getLogger(__name__) + + +class ImageData: + """Represents a processed image of document content""" + + local_path: str = "" + object: Optional[Image.Image] = None + url: str = "" + + +@dataclass +class LineData: + """Represents a processed line of document content with associated images""" + + text: str = "" # Extracted text content + images: List[ImageData] = field( + default_factory=list + ) # List of images or image paths + extra_info: str = "" # Placeholder for additional info (currently unused) + page_num: int = 0 # Page number + content_sequence: List[Tuple[str, Any]] = field( + default_factory=list + ) # Sequence of content items (text/images) + + +class DocxParser(BaseParser): + """DOCX document parser""" + + def __init__( + self, + max_pages: int = 100, # Maximum number of pages to process + **kwargs, + ): + """Initialize DOCX document parser + + Args: + file_name: File name + file_type: File type, if None, infer from file name + enable_multimodal: Whether to enable multimodal processing + chunk_size: Chunk size + chunk_overlap: Chunk overlap + separators: List of separators + ocr_backend: OCR engine type + ocr_config: OCR engine configuration + max_image_size: Maximum image size limit + max_concurrent_tasks: Maximum number of concurrent tasks + max_pages: Maximum number of pages to process + """ + super().__init__(**kwargs) + self.max_pages = max_pages + logger.info(f"DocxParser initialized with max_pages={max_pages}") + + def parse_into_text(self, content: bytes) -> DocumentModel: + """Parse DOCX document, extract text content and image Markdown links""" + logger.info(f"Parsing DOCX document, content size: {len(content)} bytes") + logger.info(f"Max pages limit set to: {self.max_pages}") + + start_time = time.time() + # Use concurrent processing to handle the document + max_workers = min( + 4, os.cpu_count() or 2 + ) # Reduce thread count to avoid excessive memory consumption + logger.info(f"Setting max_workers to {max_workers} for document processing") + + try: + inline_images: Dict[str, str] = {} + + def _inline_upload(local_path: str) -> str: + """Read temp image file, base64-encode, and return a ref path. + + The Go-side ImageResolver (or main.py _resolve_images) handles + actual storage upload from Document.images. + """ + import base64 + import uuid as _uuid + + try: + with open(local_path, "rb") as f: + raw = f.read() + ext = os.path.splitext(local_path)[1].lower() or ".png" + ref = f"images/{_uuid.uuid4().hex}{ext}" + inline_images[ref] = base64.b64encode(raw).decode() + return ref + except Exception as exc: + logger.warning("Failed to read temp image %s: %s", local_path, exc) + return "" + + logger.info(f"Starting Docx processing with max_pages={self.max_pages}") + docx_processor = Docx( + max_image_size=1920, + enable_multimodal=True, + upload_file=_inline_upload, + ) + all_lines, tables = docx_processor( + binary=content, + max_workers=max_workers, + to_page=self.max_pages, + ) + processing_time = time.time() - start_time + logger.info( + f"Docx processing completed in {processing_time:.2f}s, " + f"extracted {len(all_lines)} sections and {len(tables)} tables" + ) + + logger.info("Processing document sections") + section_start_time = time.time() + + text_parts = [] + image_parts: Dict[str, str] = {} + + for sec_idx, line in enumerate(all_lines): + try: + if line.text is not None and line.text != "": + text_parts.append(line.text) + if sec_idx < 3 or sec_idx % 50 == 0: + logger.info( + f"Added section {sec_idx + 1} text: {line.text[:50]}..." + if len(line.text) > 50 + else f"Added section {sec_idx + 1} text: {line.text}" + ) + if line.images: + for image_data in line.images: + if image_data.url and image_data.object: + image_parts[image_data.url] = endecode.decode_image( + image_data.object + ) + image_data.object.close() + except Exception as e: + logger.error(f"Error processing section {sec_idx + 1}: {str(e)}") + logger.error(f"Detailed stack trace: {traceback.format_exc()}") + continue + + # Combine text + section_processing_time = time.time() - section_start_time + logger.info( + f"Section processing completed in {section_processing_time:.2f}s" + ) + logger.info("Combining all text parts") + text = "\n\n".join([part for part in text_parts if part]) + + # Check if the generated text is empty + if not text: + logger.warning("Generated text is empty, trying alternative method") + return self._parse_using_simple_method(content) + + total_processing_time = time.time() - start_time + logger.info( + f"Parsing complete in {total_processing_time:.2f}s, " + f"generated {len(text)} characters of text" + ) + + image_parts.update(inline_images) + return DocumentModel(content=text, images=image_parts) + except Exception as e: + logger.error(f"Error parsing DOCX document: {str(e)}") + logger.error(f"Detailed stack trace: {traceback.format_exc()}") + return self._parse_using_simple_method(content) + + def _parse_using_simple_method(self, content: bytes) -> DocumentModel: + """Parse document using a simplified method, as a fallback + + Args: + content: Document content + + Returns: + Parsed text + """ + logger.info("Attempting to parse document using simplified method") + start_time = time.time() + try: + doc = Document(BytesIO(content)) + logger.info( + f"Successfully loaded document in simplified method, " + f"contains {len(doc.paragraphs)} paragraphs " + f"and {len(doc.tables)} tables" + ) + text_parts = [] + + # Extract paragraph text + para_count = len(doc.paragraphs) + logger.info(f"Extracting text from {para_count} paragraphs") + para_with_text = 0 + for i, para in enumerate(doc.paragraphs): + if i % 100 == 0: + logger.info(f"Processing paragraph {i + 1}/{para_count}") + if para.text.strip(): + text_parts.append(para.text.strip()) + para_with_text += 1 + + logger.info(f"Extracted text from {para_with_text}/{para_count} paragraphs") + + # Extract table text + table_count = len(doc.tables) + logger.info(f"Extracting text from {table_count} tables") + tables_with_content = 0 + rows_processed = 0 + for i, table in enumerate(doc.tables): + if i % 10 == 0: + logger.info(f"Processing table {i + 1}/{table_count}") + + table_has_content = False + for row in table.rows: + rows_processed += 1 + row_text = " | ".join( + [cell.text.strip() for cell in row.cells if cell.text.strip()] + ) + if row_text: + text_parts.append(row_text) + table_has_content = True + + if table_has_content: + tables_with_content += 1 + + logger.info( + f"Extracted content from {tables_with_content}/{table_count} tables, " + f"processed {rows_processed} rows" + ) + + # Combine text + result_text = "\n\n".join(text_parts) + processing_time = time.time() - start_time + logger.info( + f"Simplified parsing complete in {processing_time:.2f}s, " + f"generated {len(result_text)} characters of text" + ) + + # If the result is still empty, return an error message + if not result_text: + logger.warning("No text extracted using simplified method") + return DocumentModel() + + return DocumentModel(content=result_text) + except Exception as backup_error: + processing_time = time.time() - start_time + logger.error( + f"Simplified parsing failed {processing_time:.2f}s: {backup_error}" + ) + logger.error(f"Detailed traceback: {traceback.format_exc()}") + return DocumentModel() + + +class Docx: + def __init__(self, max_image_size=1920, enable_multimodal=False, upload_file=None): + logger.info("Initializing DOCX processor") + self.max_image_size = max_image_size # Maximum image size limit + # Image cache to avoid processing the same image repeatedly + self.picture_cache = {} + self.enable_multimodal = enable_multimodal + self.upload_file = upload_file + + def get_picture(self, document, paragraph) -> Optional[Image.Image]: + logger.info("Extracting image from paragraph") + img = paragraph._element.xpath(".//pic:pic") + if not img: + logger.info("No image found in paragraph") + return None + img = img[0] + try: + embed = img.xpath(".//a:blip/@r:embed")[0] + related_part = document.part.related_parts[embed] + logger.info(f"Found embedded image with ID: {embed}") + + try: + image_blob = related_part.image.blob + except UnrecognizedImageError: + logger.warning("Unrecognized image format. Skipping image.") + return None + except UnexpectedEndOfFileError: + logger.warning( + "EOF was unexpectedly encountered while reading an image stream. Skipping image." + ) + return None + except InvalidImageStreamError: + logger.warning( + "The recognized image stream appears to be corrupted. Skipping image." + ) + return None + + try: + logger.info("Converting image blob to PIL Image") + image = Image.open(BytesIO(image_blob)).convert("RGBA") + logger.info( + f"Successfully extracted image, size: {image.width}x{image.height}" + ) + return image + except Exception as e: + logger.error(f"Failed to open image: {str(e)}") + return None + except Exception as e: + logger.error(f"Error extracting image: {str(e)}") + return None + + def _identify_page_paragraph_mapping(self, max_page=100000): + """Identify the paragraph range included on each page + + Args: + max_page: Maximum number of pages to process + + Returns: + dict: Mapping of page numbers to lists of paragraph indices + """ + start_time = time.time() + logger.info(f"Identifying page to paragraph mapping (max_page={max_page})") + page_to_paragraphs = {} + current_page = 0 + + # Initialize page 0 + page_to_paragraphs[current_page] = [] + + # Record the total number of paragraphs processed + total_paragraphs = len(self.doc.paragraphs) + logger.info(f"Total paragraphs to map: {total_paragraphs}") + + # Heuristic method: estimate the number of paragraphs per page + # For large documents, using a heuristic can reduce XML parsing overhead + if total_paragraphs > 1000: + logger.info("Large document detected, using heuristic paragraph mapping") + estimated_paras_per_page = ( + 25 # Estimate approximately 25 paragraphs per page + ) + + # Create an estimated page mapping + for p_idx in range(total_paragraphs): + est_page = p_idx // estimated_paras_per_page + if est_page > max_page: + logger.info( + f"Reached max page limit ({max_page}) at paragraph {p_idx}, stopping paragraph mapping" + ) + break + + if est_page not in page_to_paragraphs: + page_to_paragraphs[est_page] = [] + + page_to_paragraphs[est_page].append(p_idx) + + if p_idx > 0 and p_idx % 1000 == 0: + logger.info( + f"Heuristic mapping: processed {p_idx}/{total_paragraphs} paragraphs" + ) + + mapping_time = time.time() - start_time + logger.info( + f"Created heuristic mapping with {len(page_to_paragraphs)} pages in {mapping_time:.2f}s" + ) + return page_to_paragraphs + + # Standard method: iterate through all paragraphs to find page breaks + logger.info("Using standard paragraph mapping method") + page_breaks_found = 0 + for p_idx, p in enumerate(self.doc.paragraphs): + # Add the current paragraph to the current page + page_to_paragraphs[current_page].append(p_idx) + + # Log every 100 paragraphs + if p_idx > 0 and p_idx % 100 == 0: + logger.info( + f"Processed {p_idx}/{total_paragraphs} paragraphs in page mapping" + ) + + # Check for page breaks + page_break_found = False + + # Method 1: Check for lastRenderedPageBreak + for run in p.runs: + if "lastRenderedPageBreak" in run._element.xml: + page_break_found = True + break + + if "w:br" in run._element.xml and 'type="page"' in run._element.xml: + page_break_found = True + break + + # Method 2: Check sectPr element (section break, usually indicates a new page) + if not page_break_found and p._element.xpath(".//w:sectPr"): + page_break_found = True + + # If a page break is found, create a new page + if page_break_found: + page_breaks_found += 1 + current_page += 1 + if current_page > max_page: + logger.info( + f"Reached max page limit ({max_page}), stopping page mapping" + ) + break + + # Initialize the paragraph list for the new page + if current_page not in page_to_paragraphs: + page_to_paragraphs[current_page] = [] + + if page_breaks_found % 10 == 0: + logger.info( + f"Found {page_breaks_found} page breaks so far, current page: {current_page}" + ) + + # Handle potential empty page mappings + empty_pages = [page for page, paras in page_to_paragraphs.items() if not paras] + if empty_pages: + logger.info(f"Removing {len(empty_pages)} empty pages from mapping") + for page in empty_pages: + del page_to_paragraphs[page] + + mapping_time = time.time() - start_time + logger.info( + f"Created paragraph mapping with {len(page_to_paragraphs)} pages in {mapping_time:.2f}s" + ) + + # Check the validity of the result + if not page_to_paragraphs: + logger.warning("No valid page mapping created, using fallback method") + # All paragraphs are on page 0 + page_to_paragraphs[0] = list(range(total_paragraphs)) + + # Log page distribution statistics + page_sizes = [len(paragraphs) for paragraphs in page_to_paragraphs.values()] + if page_sizes: + avg_paragraphs = sum(page_sizes) / len(page_sizes) + min_paragraphs = min(page_sizes) + max_paragraphs = max(page_sizes) + logger.info( + f"Page statistics: avg={avg_paragraphs:.1f}, " + f"min={min_paragraphs}, max={max_paragraphs} paragraphs per page" + ) + + return page_to_paragraphs + + def __call__( + self, + binary: Optional[bytes] = None, + from_page: int = 0, + to_page: int = 100000, + max_workers: Optional[int] = None, + ) -> Tuple[List[LineData], List[Any]]: + """ + Process DOCX document, supporting concurrent processing of each page + + Args: + binary: DOCX document binary content + from_page: Starting page number + to_page: Ending page number + max_workers: Maximum number of workers, default to None (system decides) + + Returns: + tuple: (List of LineData objects with document content, List of tables) + """ + logger.info("Processing DOCX document") + + # Check CPU core count to determine parallel strategy + cpu_count = os.cpu_count() or 2 + logger.info(f"System has {cpu_count} CPU cores available") + + # Load document + self.doc = self._load_document(binary) + if not self.doc: + return [], [] + + # Identify page structure + self.para_page_mapping = self._identify_page_paragraph_mapping(to_page) + logger.info( + f"Identified page to paragraph mapping for {len(self.para_page_mapping)} pages" + ) + + # Apply page limits + pages_to_process = self._apply_page_limit( + self.para_page_mapping, from_page, to_page + ) + if not pages_to_process: + logger.warning("No pages to process after applying page limits!") + return [], [] + + # Initialize shared resources + self._init_shared_resources() + + # Process document content + self._process_document( + binary, + pages_to_process, + from_page, + to_page, + max_workers, + ) + + # Process tables + tbls = self._process_tables() + + # Clean up document resources + self.doc = None + + logger.info( + f"Document processing complete, " + f"extracted {len(self.all_lines)} text sections and {len(tbls)} tables" + ) + return self.all_lines, tbls + + def _load_document(self, binary): + """Load document + + Args: + binary: Document binary content + + Returns: + Document: Document object, or None (if loading fails) + """ + try: + doc = Document(BytesIO(binary)) + logger.info("Successfully loaded document from binary content") + return doc + except Exception as e: + logger.error(f"Failed to load DOCX document: {str(e)}") + return None + + def _init_shared_resources(self): + """Initialize shared resources""" + # Create shared resource locks to protect data structures shared between threads + self.lines_lock = threading.Lock() + + # Initialize result containers + self.all_lines = [] + + def _get_request_id(self): + """Get current request ID""" + current_request_id = None + try: + from utils.request import get_request_id + + current_request_id = get_request_id() + logger.info( + f"Getting current request ID: {current_request_id} to pass to processing threads" + ) + except Exception as e: + logger.warning(f"Failed to get current request ID: {str(e)}") + return current_request_id + + def _apply_page_limit(self, para_page_mapping, from_page, to_page): + """Apply page limits, return the list of pages to process + + Args: + para_page_mapping: Mapping of pages to paragraphs + from_page: Starting page number + to_page: Ending page number + + Returns: + list: List of pages to process + """ + # Add page limits + total_pages = len(para_page_mapping) + if total_pages > to_page: + logger.info( + f"Document has {total_pages} pages, limiting processing to first {to_page} pages" + ) + logger.info(f"Setting to_page limit to {to_page}") + else: + logger.info( + f"Document has {total_pages} pages, processing all pages (limit: {to_page})" + ) + + # Filter out pages outside the range + all_pages = sorted(para_page_mapping.keys()) + pages_to_process = [p for p in all_pages if from_page <= p < to_page] + + # Output the actual number of pages processed for debugging + if pages_to_process: + logger.info( + f"Will process {len(pages_to_process)} pages " + f"from page {from_page} to page {min(to_page, pages_to_process[-1] if pages_to_process else from_page)}" + ) + + if len(pages_to_process) < len(all_pages): + logger.info( + f"Skipping {len(all_pages) - len(pages_to_process)} pages due to page limit" + ) + + # Log detailed page index information + if len(pages_to_process) <= 10: + logger.info(f"Pages to process: {pages_to_process}") + else: + logger.info( + f"First 5 pages to process: {pages_to_process[:5]}, last 5: {pages_to_process[-5:]}" + ) + + return pages_to_process + + def _process_document( + self, + binary, + pages_to_process, + from_page, + to_page, + max_workers, + ): + """Process large documents, using multiprocessing + + Args: + binary: Document binary content + pages_to_process: List of pages to process + from_page: Starting page number + to_page: Ending page number + max_workers: Maximum number of workers + """ + # If the number of pages is too large, process in batches to reduce memory consumption + cpu_count = os.cpu_count() or 2 + + # Check if the document contains images to optimize processing speed + doc_contains_images = self._check_document_has_images() + + # Optimize process count: dynamically adjust based on number of pages and CPU cores + if max_workers is None: + max_workers = self._calculate_optimal_workers( + doc_contains_images, pages_to_process, cpu_count + ) + + temp_file_path = self._prepare_document_sharing(binary) + + # Prepare multiprocess processing arguments + args_list = self._prepare_multiprocess_args( + pages_to_process, + from_page, + to_page, + doc_contains_images, + temp_file_path, + ) + + # Execute multiprocess tasks + self._execute_multiprocess_tasks(args_list, max_workers) + + # Clean up temporary file + self._cleanup_temp_file(temp_file_path) + + def _check_document_has_images(self): + """Check if the document contains images + + Returns: + bool: Whether the document contains images + """ + doc_contains_images = False + if hasattr(self.doc, "inline_shapes") and len(self.doc.inline_shapes) > 0: + doc_contains_images = True + logger.info( + f"Document contains {len(self.doc.inline_shapes)} inline images" + ) + return doc_contains_images + + def _calculate_optimal_workers( + self, doc_contains_images, pages_to_process, cpu_count + ): + """Calculate the optimal number of workers + + Args: + doc_contains_images: Whether the document contains images + pages_to_process: List of pages to process + cpu_count: Number of CPU cores + + Returns: + int: Optimal number of workers + """ + # If no images or few pages, use fewer processes to avoid overhead + if not doc_contains_images or len(pages_to_process) < cpu_count: + max_workers = min(len(pages_to_process), max(1, cpu_count - 1)) + else: + max_workers = min(len(pages_to_process), cpu_count) + logger.info(f"Automatically set worker count to {max_workers}") + return max_workers + + def _prepare_document_sharing(self, binary): + """Prepare document sharing method + + Args: + binary: Document binary content + + Returns: + str: Temporary file path, or None if not using + """ + + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file_path = temp_file.name + temp_file.write(binary) + temp_file.close() + return temp_file_path + + def _prepare_multiprocess_args( + self, + pages_to_process, + from_page, + to_page, + doc_contains_images, + temp_file_path, + ): + """Prepare a list of arguments for multiprocess processing + + Args: + pages_to_process: List of pages to process + from_page: Starting page number + to_page: Ending page number + doc_contains_images: Whether the document contains images + temp_file_path: Temporary file path + + Returns: + list: List of arguments + """ + args_list = [] + for page_num in pages_to_process: + args_list.append( + ( + page_num, + self.para_page_mapping[page_num], + from_page, + to_page, + doc_contains_images, + self.max_image_size, + temp_file_path, + self.enable_multimodal, + ) + ) + + return args_list + + def _execute_multiprocess_tasks(self, args_list, max_workers): + """Execute multiprocess tasks + + Args: + args_list: List of arguments + max_workers: Maximum number of workers + """ + # Use a shared manager to share data + with Manager() as manager: + # Create shared data structures + self.all_lines = manager.list() + + logger.info( + f"Processing {len(args_list)} pages using {max_workers} processes" + ) + + # Use ProcessPoolExecutor to truly implement multi-core parallelization + batch_start_time = time.time() + with ProcessPoolExecutor(max_workers=max_workers) as executor: + logger.info(f"Started ProcessPoolExecutor with {max_workers} workers") + + # Submit all tasks + future_to_idx = { + executor.submit(process_page_multiprocess, *args): i + for i, args in enumerate(args_list) + } + logger.info( + f"Submitted {len(future_to_idx)} processing tasks to process pool" + ) + + # Collect results + self._collect_process_results( + future_to_idx, args_list, batch_start_time + ) + + def _collect_process_results(self, future_to_idx, args_list, batch_start_time): + """Collect multiprocess processing results + + Args: + future_to_idx: Mapping of Future to index + args_list: List of arguments + batch_start_time: Batch start time + + Returns: + List[LineData]: Processed results as LineData objects + """ + # Collect results + completed_count = 0 + results = [] + temp_img_paths = set() # Collect all temporary image paths + + for future in as_completed(future_to_idx): + idx = future_to_idx[future] + page_num = args_list[idx][0] + try: + page_lines = future.result() + + # Collect temporary image paths for later cleanup + for line in page_lines: + for image_data in line.images: + if image_data.local_path and image_data.local_path.startswith( + "/tmp/docx_img_" + ): + temp_img_paths.add(image_data.local_path) + + results.extend(page_lines) + completed_count += 1 + + if completed_count % max( + 1, len(args_list) // 10 + ) == 0 or completed_count == len(args_list): + elapsed_ms = int((time.time() - batch_start_time) * 1000) + progress_pct = int((completed_count / len(args_list)) * 100) + logger.info( + f"Progress: {completed_count}/{len(args_list)} pages processed " + f"({progress_pct}%, elapsed: {elapsed_ms}ms)" + ) + + except Exception as e: + logger.error(f"Error processing page {page_num}: {str(e)}") + logger.error( + f"Detailed traceback for page {page_num}: {traceback.format_exc()}" + ) + + # Process completion + processing_elapsed_ms = int((time.time() - batch_start_time) * 1000) + logger.info(f"All processing completed in {processing_elapsed_ms}ms") + + # Process results + self._process_multiprocess_results(results) + + # Clean up temporary image files + self._cleanup_temp_image_files(temp_img_paths) + + def _process_multiprocess_results(self, results: List[LineData]): + """Process multiprocess results + + Args: + results: List of processed LineData results + """ + lines = list(results) + + # Process images - must be handled in the main process for upload + # If images are being processed, they need to be handled in the main process for upload + image_upload_start = time.time() + + # Count total images to process + images_to_process = [] + processed_lines = [] + for i, line_data in enumerate(lines): + # Check if there are images + if line_data.images and len(line_data.images) > 0: + images_to_process.append(i) + logger.info( + f"Found line {i} with {len(line_data.images)} images to process" + ) + + # Process images if needed + image_url_map = {} # Map from image path to URL + if images_to_process: + logger.info( + f"Found {len(images_to_process)} lines with images to process in main process" + ) + + # First, create a mapping of image paths to uploaded URLs + for line_idx in images_to_process: + line_data = lines[line_idx] + image_paths = line_data.images + page_num = line_data.page_num + + # Process all image data objects + for image_data in image_paths: + if ( + image_data.local_path + and os.path.exists(image_data.local_path) + and image_data.local_path not in image_url_map + ): + try: + # Upload the image if it doesn't have a URL yet + if not image_data.url: + image_url = self.upload_file(image_data.local_path) + if image_url: + # Store the URL in the ImageData object + image_data.url = image_url + # Add image URL as Markdown format + markdown_image = f"![]({image_url})" + image_url_map[image_data.local_path] = ( + markdown_image + ) + logger.info( + f"Added image URL for {image_data.local_path}: {image_url}" + ) + else: + logger.warning( + f"Failed to upload image: {image_data.local_path}" + ) + else: + # Already has a URL, use it + markdown_image = f"![]({image_data.url})" + image_url_map[image_data.local_path] = markdown_image + logger.info( + f"Using existing URL for image {image_data.local_path}: {image_data.url}" + ) + except Exception as e: + logger.error( + f"Error processing image from page {page_num}: {str(e)}" + ) + + image_upload_elapsed = time.time() - image_upload_start + logger.info( + f"Finished uploading {len(image_url_map)} images in {image_upload_elapsed:.2f}s" + ) + + # Process content in original sequence order + for line_data in lines: + processed_content = [] + if line_data.content_sequence: # Check if we have processed_content + processed_content = line_data.content_sequence + page_num = line_data.page_num + + # Reconstruct text with images in original positions + combined_parts = [] + for content_type, content in processed_content: + if content_type == "text": + combined_parts.append(content) + elif content_type == "image": + # For ImageData objects, use the URL + if isinstance(content, str) and content in image_url_map: + combined_parts.append(image_url_map[content]) + elif ( + hasattr(content, "local_path") + and content.local_path in image_url_map + ): + combined_parts.append(image_url_map[content.local_path]) + + # Create the final text with proper ordering + final_text = "\n\n".join(part for part in combined_parts if part) + processed_lines.append( + LineData( + text=final_text, page_num=page_num, images=line_data.images + ) + ) + else: + processed_lines = lines + + # Sort results by page number + sorted_lines = sorted(processed_lines, key=lambda x: x.page_num) + self.all_lines = sorted_lines + + logger.info( + f"Finished processing {len(self.all_lines)} lines with interleaved images and text" + ) + + def _cleanup_temp_image_files(self, temp_paths): + """Clean up temporary image files created by multiprocessing + + Args: + temp_paths: Set of temporary file paths + """ + if not temp_paths: + return + + logger.info(f"Cleaning up {len(temp_paths)} temporary image files") + deleted_count = 0 + error_count = 0 + + for path in temp_paths: + try: + if os.path.exists(path): + os.unlink(path) + deleted_count += 1 + # Delete temporary directory (if empty) + try: + temp_dir = os.path.dirname(path) + if temp_dir.startswith("/tmp/docx_img_") and os.path.exists( + temp_dir + ): + os.rmdir(temp_dir) + except OSError: + # If directory is not empty, ignore error + pass + except Exception as e: + logger.error(f"Failed to delete temp file {path}: {str(e)}") + error_count += 1 + + logger.info( + f"Temporary file cleanup: deleted {deleted_count}, errors {error_count}" + ) + + def _cleanup_temp_file(self, temp_file_path): + """Clean up temporary file + + Args: + temp_file_path: Temporary file path + """ + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + logger.info(f"Removed temporary file: {temp_file_path}") + except Exception as e: + logger.error(f"Failed to remove temporary file: {str(e)}") + + def _process_tables(self): + """Process tables in the document + + Returns: + list: List of tables + """ + tbls = [] + table_count = len(self.doc.tables) + if table_count > 0: + logger.info(f"Processing {table_count} tables") + for tb_idx, tb in enumerate(self.doc.tables): + if tb_idx % 10 == 0: # Log only every 10 tables to reduce log volume + logger.info(f"Processing table {tb_idx + 1}/{table_count}") + + # Optimize: Check if table is empty + if len(tb.rows) == 0 or all(len(r.cells) == 0 for r in tb.rows): + logger.info(f"Skipping empty table {tb_idx + 1}") + continue + + table_html = self._convert_table_to_html(tb) + # Still using tuple format for tables as they are handled differently + tbls.append(((None, table_html), "")) + + return tbls + + def _convert_table_to_html(self, table): + """Convert table to HTML + + Args: + table: Table object + + Returns: + str: HTML formatted table + """ + html = "" + for r in table.rows: + html += "" + i = 0 + while i < len(r.cells): + span = 1 + c = r.cells[i] + for j in range(i + 1, len(r.cells)): + if c.text == r.cells[j].text: + span += 1 + i = j + i += 1 + html += ( + f"" + if span == 1 + else f"" + ) + html += "" + html += "
{c.text}{c.text}
" + return html + + def _safe_concat_images(self, images): + """Safely concatenate image lists + + Args: + images: List of images + + Returns: + Image: Concatenated image, or the first image (if concatenation fails) + """ + if not images: + return None + + if len(images) == 1: + return images[0] + + try: + logger.info(f"Attempting to concatenate {len(images)} images") + from PIL import Image + + # Calculate the size of the concatenated image + total_width = max(img.width for img in images if hasattr(img, "width")) + total_height = sum(img.height for img in images if hasattr(img, "height")) + + if total_width <= 0 or total_height <= 0: + logger.warning("Invalid image size, returning the first image") + return images[0] + + # Create a new image + new_image = Image.new("RGBA", (total_width, total_height), (0, 0, 0, 0)) + + # Paste images one by one + y_offset = 0 + for img in images: + if not hasattr(img, "width") or not hasattr(img, "height"): + continue + + new_image.paste(img, (0, y_offset)) + y_offset += img.height + + logger.info( + f"Successfully concatenated images, final size: {total_width}x{total_height}" + ) + return new_image + except Exception as e: + logger.error(f"Failed to concatenate images: {str(e)}") + logger.error(f"Detailed error: {traceback.format_exc()}") + # If concatenation fails, return the first image + return images[0] + + +def _save_image_to_temp(logger, image, page_num, img_idx): + """Save image to a temporary file to pass between processes + + Args: + logger: Logger + image: PIL image object + page_num: Page number + img_idx: Image index + + Returns: + str: Temporary file path, or None (if saving fails) + """ + if not image: + return None + + import os + import tempfile + + try: + # Create a temporary file + temp_dir = tempfile.mkdtemp(prefix="docx_img_") + temp_file_path = os.path.join(temp_dir, f"page_{page_num}_img_{img_idx}.png") + + # Save the image + image.save(temp_file_path, format="PNG") + logger.info( + f"[PID:{os.getpid()}] Saved image to temporary file: {temp_file_path}" + ) + + return temp_file_path + except Exception as e: + logger.error(f"[PID:{os.getpid()}] Failed to save image to temp file: {str(e)}") + return None + + +def process_page_multiprocess( + page_num: int, + paragraphs: List[int], + from_page: int, + to_page: int, + doc_contains_images: bool, + max_image_size: int, + temp_file_path: Optional[str], + enable_multimodal: bool, +) -> List[LineData]: + """Page processing function specifically designed for multiprocessing + + Args: + page_num: Page number + paragraphs: List of paragraph indices + from_page: Starting page number + to_page: Ending page number + doc_contains_images: Whether the document contains images + max_image_size: Maximum image size + doc_binary: Document binary content + temp_file_path: Temporary file path, if using + enable_multimodal: Whether to enable multimodal processing + + Returns: + list: List of processed result lines + """ + try: + # Set process-level logging + process_logger = logging.getLogger(__name__) + + # If outside processing range, do not process + if page_num < from_page or page_num >= to_page: + process_logger.info( + f"[PID:{os.getpid()}] Skipping page {page_num} (out of requested range)" + ) + return [] + + process_logger.info( + f"[PID:{os.getpid()}] Processing page {page_num} with {len(paragraphs)} paragraphs, " + f"enable_multimodal={enable_multimodal}" + ) + start_time = time.time() + + # Load document in the process + doc = _load_document_in_process(process_logger, page_num, temp_file_path) + if not doc: + return [] + + # If paragraph indices are empty, return empty result + if not paragraphs: + process_logger.info( + f"[PID:{os.getpid()}] No paragraphs to process for page {page_num}" + ) + return [] + + # Extract page content + combined_text, image_objects, content_sequence = ( + _extract_page_content_in_process( + process_logger, + doc, + page_num, + paragraphs, + enable_multimodal, + max_image_size, + ) + ) + + # Process content sequence to maintain order between processes + processed_content = [] + temp_image_index = 0 + image_data_list = [] + + if enable_multimodal: + # First pass: save all images to temporary files + for i, image_object in enumerate(image_objects): + img_path = _save_image_to_temp( + process_logger, image_object, page_num, i + ) + if img_path: + # Create ImageData object + image_data = ImageData() + image_data.local_path = img_path + image_data.object = image_object + image_data_list.append(image_data) + + process_logger.info( + f"[PID:{os.getpid()}] Saved {len(image_data_list)} images to temp files for page {page_num}" + ) + + # Second pass: reconstruct the content sequence with image data objects + for content_type, content in content_sequence: + if content_type == "text": + processed_content.append(("text", content)) + else: # image + if temp_image_index < len(image_data_list): + processed_content.append( + ("image", image_data_list[temp_image_index]) + ) + temp_image_index += 1 + + # Create result line with the ordered content sequence + line_data = LineData( + text=combined_text, + images=image_data_list, + page_num=page_num, + content_sequence=processed_content, + ) + page_lines = [line_data] + + processing_time = time.time() - start_time + process_logger.info( + f"[PID:{os.getpid()}] Page {page_num} processing completed in {processing_time:.2f}s" + ) + + return page_lines + + except Exception as e: + process_logger = logging.getLogger(__name__) + process_logger.error( + f"[PID:{os.getpid()}] Error processing page {page_num}: {str(e)}" + ) + process_logger.error(f"[PID:{os.getpid()}] Traceback: {traceback.format_exc()}") + return [] + + +def _load_document_in_process(logger, page_num, temp_file_path): + """Load document in a process + + Args: + logger: Logger + page_num: Page number + temp_file_path: Temporary file path + + Returns: + Document: Loaded document object, or None (if loading fails) + """ + logger.info(f"[PID:{os.getpid()}] Loading document in process for page {page_num}") + try: + # Load document from temporary file + if temp_file_path is not None and os.path.exists(temp_file_path): + doc = Document(temp_file_path) + logger.info( + f"[PID:{os.getpid()}] Loaded document from temp file: {temp_file_path}" + ) + else: + logger.error(f"[PID:{os.getpid()}] No document source provided") + return None + return doc + + except Exception as e: + logger.error(f"[PID:{os.getpid()}] Failed to load document: {str(e)}") + logger.error(f"[PID:{os.getpid()}] Error traceback: {traceback.format_exc()}") + return None + + +def _extract_page_content_in_process( + logger, + doc, + page_num: int, + paragraphs: List[int], + enable_multimodal: bool, + max_image_size: int, +) -> Tuple[str, List[Any], List[Tuple[str, Any]]]: + """Extract page content in a process + + Args: + logger: Logger + doc: Document object + page_num: Page number + paragraphs: List of paragraph indices + enable_multimodal: Whether to enable multimodal processing + max_image_size: Maximum image size + + Returns: + tuple: (Extracted text, List of extracted images, Content sequence) + """ + logger.info( + f"[PID:{os.getpid()}] Page {page_num}: Processing {len(paragraphs)} paragraphs, " + f"enable_multimodal={enable_multimodal}" + ) + + # Instead of separate collections, track content in paragraph sequence + content_sequence = [] + current_text = "" + + processed_paragraphs = 0 + paragraphs_with_text = 0 + paragraphs_with_images = 0 + + for para_idx in paragraphs: + if para_idx >= len(doc.paragraphs): + logger.warning( + f"[PID:{os.getpid()}] Paragraph index {para_idx} out of range" + ) + continue + + paragraph = doc.paragraphs[para_idx] + processed_paragraphs += 1 + + # Extract text content + text = paragraph.text.strip() + if text: + # Clean text + cleaned_text = re.sub(r"\u3000", " ", text).strip() + current_text += cleaned_text + "\n" + paragraphs_with_text += 1 + + # Process image - if multimodal processing is enabled + if enable_multimodal: + image_object = _extract_image_in_process( + logger, doc, paragraph, page_num, para_idx, max_image_size + ) + if image_object: + # If we have accumulated text, add it to sequence first + if current_text: + content_sequence.append(("text", current_text)) + current_text = "" + + # Add image to sequence + content_sequence.append(("image", image_object)) + paragraphs_with_images += 1 + + if processed_paragraphs % 50 == 0: + logger.info( + f"[PID:{os.getpid()}] " + f"Page {page_num}: Processed {processed_paragraphs}/{len(paragraphs)} paragraphs" + ) + + # Add any remaining text + if current_text: + content_sequence.append(("text", current_text)) + + logger.info( + f"[PID:{os.getpid()}] Page {page_num}: Completed content extraction, " + f"found {paragraphs_with_text} paragraphs with text, " + f"{paragraphs_with_images} with images, " + f"total content items: {len(content_sequence)}" + ) + + # Extract text and images in their original sequence + text_parts = [] + images = [] + + # Split content sequence into text and images + for content_type, content in content_sequence: + if content_type == "text": + text_parts.append(content) + else: # image + images.append(content) + + combined_text = "\n\n".join(text_parts) if text_parts else "" + + return combined_text, images, content_sequence + + +def _extract_image_in_process( + logger, doc, paragraph, page_num, para_idx, max_image_size +): + """Extract image from a paragraph in a process + + Args: + logger: Logger + doc: Document object + paragraph: Paragraph object + page_num: Page number + para_idx: Paragraph index + max_image_size: Maximum image size + + Returns: + Image: Extracted image object, or None + """ + try: + # Attempt to extract image + img = paragraph._element.xpath(".//pic:pic") + if not img: + return None + + img = img[0] + logger.info( + f"[PID:{os.getpid()}] Page {page_num}: Found pic element in paragraph {para_idx}" + ) + + try: + # Extract image ID and related part + embed = img.xpath(".//a:blip/@r:embed") + if not embed: + logger.warning( + f"[PID:{os.getpid()}] Page {page_num}: No embed attribute found in image" + ) + return None + + embed = embed[0] + if embed not in doc.part.related_parts: + logger.warning( + f"[PID:{os.getpid()}] Page {page_num}: Embed ID {embed} not found in related parts" + ) + return None + + related_part = doc.part.related_parts[embed] + logger.info(f"[PID:{os.getpid()}] Found embedded image with ID: {embed}") + + # Attempt to get image data + try: + image_blob = related_part.image.blob + logger.info( + f"[PID:{os.getpid()}] Successfully extracted image blob, size: {len(image_blob)} bytes" + ) + except Exception as blob_error: + logger.warning( + f"[PID:{os.getpid()}] Error extracting image blob: {str(blob_error)}" + ) + return None + + # Convert data to PIL image + try: + image = Image.open(BytesIO(image_blob)).convert("RGBA") + + # Check image size + if hasattr(image, "width") and hasattr(image, "height"): + logger.info( + f"[PID:{os.getpid()}] Successfully created image object, " + f"size: {image.width}x{image.height}" + ) + + # Skip small images (usually decorative elements) + if image.width < 50 or image.height < 50: + logger.info( + f"[PID:{os.getpid()}] " + f"Skipping small image ({image.width}x{image.height})" + ) + return None + + # Scale large images + if image.width > max_image_size or image.height > max_image_size: + scale = min( + max_image_size / image.width, max_image_size / image.height + ) + new_width = int(image.width * scale) + new_height = int(image.height * scale) + resized_image = image.resize((new_width, new_height)) + logger.info( + f"[PID:{os.getpid()}] Resized image to {new_width}x{new_height}" + ) + return resized_image + + logger.info(f"[PID:{os.getpid()}] Found image in paragraph {para_idx}") + return image + except Exception as e: + logger.error( + f"[PID:{os.getpid()}] Failed to create image from blob: {str(e)}" + ) + logger.error( + f"[PID:{os.getpid()}] Error traceback: {traceback.format_exc()}" + ) + return None + except Exception as e: + logger.error(f"[PID:{os.getpid()}] Error extracting image: {str(e)}") + logger.error( + f"[PID:{os.getpid()}] Error traceback: {traceback.format_exc()}" + ) + return None + except Exception as e: + logger.error(f"[PID:{os.getpid()}] Error processing image: {str(e)}") + logger.error(f"[PID:{os.getpid()}] Error traceback: {traceback.format_exc()}") + return None diff --git a/ai-core/parser/excel_parser.py b/ai-core/parser/excel_parser.py new file mode 100644 index 0000000..d1d2056 --- /dev/null +++ b/ai-core/parser/excel_parser.py @@ -0,0 +1,119 @@ +""" +Excel Parser Module + +This module provides functionality to parse Excel files (.xlsx, .xls) into +structured Document objects with text content and chunks. It supports multiple +sheets and handles various Excel formats using pandas. +""" +import logging +from io import BytesIO +from typing import List + +import pandas as pd + +from docreader.models.document import Chunk, Document +from docreader.parser.base_parser import BaseParser + +logger = logging.getLogger(__name__) + + +class ExcelParser(BaseParser): + """Parser for Excel files (.xlsx, .xls). + + This parser extracts text content from Excel files by processing all sheets + and converting each row into a structured text format. Each row becomes a + separate chunk with key-value pairs. + + Features: + - Supports multiple sheets in a single Excel file + - Automatically removes completely empty rows + - Converts each row to "column: value" format + - Creates individual chunks for each row for better granularity + + Example: + >>> parser = ExcelParser() + >>> with open("data.xlsx", "rb") as f: + ... content = f.read() + ... document = parser.parse_into_text(content) + >>> print(document.content) + Name: John,Age: 30,City: NYC + Name: Jane,Age: 25,City: LA + """ + + def parse_into_text(self, content: bytes) -> Document: + """Parse Excel file bytes into a Document object. + + Args: + content: Raw bytes of the Excel file + + Returns: + Document: Parsed document containing: + - content: Full text with all rows from all sheets + - chunks: List of Chunk objects, one per row + + Note: + - Empty rows (all NaN values) are automatically skipped + - Each row is formatted as: "col1: val1,col2: val2,..." + - Chunks maintain sequential ordering across all sheets + """ + chunks: List[Chunk] = [] + text: List[str] = [] + start, end = 0, 0 + + # Load Excel file from bytes into pandas ExcelFile object + excel_file = pd.ExcelFile(BytesIO(content)) + + # Process each sheet in the Excel file + for excel_sheet_name in excel_file.sheet_names: + # Parse the sheet into a DataFrame + df = excel_file.parse(sheet_name=excel_sheet_name) + # Remove rows where all values are NaN (completely empty rows) + df.dropna(how="all", inplace=True) + + # Process each row in the DataFrame + for _, row in df.iterrows(): + page_content = [] + # Build key-value pairs for non-null values + for k, v in row.items(): + if pd.notna(v): # Skip NaN/null values + page_content.append(f"{k}: {v}") + + # Skip rows with no valid content + if not page_content: + continue + + # Format row as comma-separated key-value pairs + content_row = ",".join(page_content) + "\n" + end += len(content_row) + text.append(content_row) + + # Create a chunk for this row with position tracking + chunks.append( + Chunk(content=content_row, seq=len(chunks), start=start, end=end) + ) + start = end + + # Combine all text and return as Document + return Document(content="".join(text), chunks=chunks) + + +if __name__ == "__main__": + # Example usage: Parse an Excel file and display results + logging.basicConfig(level=logging.DEBUG) + + # Specify the path to your Excel file + your_file = "/path/to/your/file.xlsx" + parser = ExcelParser() + + # Read and parse the Excel file + with open(your_file, "rb") as f: + content = f.read() + document = parser.parse_into_text(content) + + # Display the full document content + logger.error(document.content) + + # Display the first chunk as an example + for chunk in document.chunks: + logger.error(chunk.content) + break # Only show the first chunk diff --git a/ai-core/parser/image_parser.py b/ai-core/parser/image_parser.py new file mode 100644 index 0000000..dfbf1d5 --- /dev/null +++ b/ai-core/parser/image_parser.py @@ -0,0 +1,28 @@ +import base64 +import logging +import os + +from docreader.models.document import Document +from docreader.parser.base_parser import BaseParser + +logger = logging.getLogger(__name__) + + +class ImageParser(BaseParser): + """Parser for standalone image files. + + Returns the image as a markdown reference with the raw image data + in Document.images so that the Go-side ImageResolver (or main.py's + _resolve_images) can handle storage upload. + """ + + def parse_into_text(self, content: bytes) -> Document: + logger.info("Parsing image file=%s, size=%d bytes", self.file_name, len(content)) + + ext = os.path.splitext(self.file_name)[1].lower() or ".png" + ref_path = f"images/{self.file_name}" + + text = f"![{self.file_name}]({ref_path})" + images = {ref_path: base64.b64encode(content).decode()} + + return Document(content=text, images=images) diff --git a/ai-core/parser/markdown_parser.py b/ai-core/parser/markdown_parser.py new file mode 100644 index 0000000..93e0497 --- /dev/null +++ b/ai-core/parser/markdown_parser.py @@ -0,0 +1,403 @@ +""" +Markdown Parser Module + +This module provides comprehensive Markdown parsing functionality including: +- Table formatting and standardization +- Base64 image extraction and conversion +- Image path replacement and URL generation +- Pipeline-based parsing with multiple stages + +The parser uses a pipeline approach to process Markdown content through +multiple stages: table formatting -> image processing. +""" + +import base64 +import logging +import os +import re +import uuid +from typing import Dict, List, Match, Optional, Tuple + +from docreader.models.document import Document +from docreader.parser.base_parser import BaseParser +from docreader.parser.chain_parser import PipelineParser +from docreader.utils import endecode + +# Get logger object +logger = logging.getLogger(__name__) + + +class MarkdownTableUtil: + """Utility class for formatting Markdown tables. + + This class standardizes Markdown table formatting by: + - Normalizing column alignment markers (e.g., :---, :---:, ---:) + - Adding consistent spacing around pipes (|) + - Preserving indentation levels + - Handling both header rows and data rows + + Example: + Input: |姓名|年龄|城市| + |:---|---:|:---:| + |张三|25|北京| + + Output: | 姓名 | 年龄 | 城市 | + | :--- | ---: | :---: | + | 张三 | 25 | 北京 | + """ + + def __init__(self): + # Pattern to match alignment row (e.g., |:---|---:|:---:|) + self.align_pattern = re.compile( + r"^([\t ]*)\|[\t ]*[:-]+(?:[\t ]*\|[\t ]*[:-]+)*[\t ]*\|[\t ]*$", + re.MULTILINE, + ) + # Pattern to match regular table rows (header or data) + self.line_pattern = re.compile( + r"^([\t ]*)\|[\t ]*[^|\r\n]*(?:[\t ]*\|[^|\r\n]*)*\|[\t ]*$", + re.MULTILINE, + ) + + def format_table(self, content: str) -> str: + """Format all Markdown tables in the content. + + Args: + content: Raw Markdown text containing tables + + Returns: + Formatted Markdown text with standardized table formatting + """ + + def process_align(match: Match[str]) -> str: + """Process alignment row to standardize format.""" + # Split by | and remove empty strings + columns = [col.strip() for col in match.group(0).split("|") if col.strip()] + + processed = [] + for col in columns: + # Preserve left alignment marker (:---) + left_colon = ":" if col.startswith(":") else "" + # Preserve right alignment marker (---:) + right_colon = ":" if col.endswith(":") else "" + processed.append(left_colon + "---" + right_colon) + + # Preserve original indentation + prefix = match.group(1) + return prefix + "| " + " | ".join(processed) + " |" + + def process_line(match: Match[str]) -> str: + """Process regular table row to standardize format.""" + # Split by | and remove empty strings + columns = [col.strip() for col in match.group(0).split("|") if col.strip()] + + # Preserve original indentation + prefix = match.group(1) + return prefix + "| " + " | ".join(columns) + " |" + + formatted_content = content + # First format regular rows (header and data) + formatted_content = self.line_pattern.sub(process_line, formatted_content) + # Then format alignment rows (must be done after to avoid conflicts) + formatted_content = self.align_pattern.sub(process_align, formatted_content) + + return formatted_content + + @staticmethod + def _self_test(): + test_content = """ +# 测试表格 +普通文本---不会被匹配 + +## 表格1(无前置空格) + +| 姓名 | 年龄 | 城市 | +| :---------- | -------: | :------ | +| 张三 | 25 | 北京 | + +## 表格3(前置4个空格+首尾|) + | 产品 | 价格 | 库存 | + | :-------------: | ----------- | :-----------: | + | 手机 | 5999 | 100 | +""" + util = MarkdownTableUtil() + format_content = util.format_table(test_content) + print(format_content) + + +class MarkdownTableFormatter(BaseParser): + """Parser for formatting Markdown tables. + + This parser standardizes the formatting of all Markdown tables in the + document to ensure consistent spacing and alignment markers. + + Example: + >>> formatter = MarkdownTableFormatter() + >>> content = b"|Name|Age|\n|---|---|\n|John|30|" + >>> doc = formatter.parse_into_text(content) + >>> print(doc.content) + | Name | Age | + | --- | --- | + | John | 30 | + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.table_helper = MarkdownTableUtil() + + def parse_into_text(self, content: bytes) -> Document: + """Parse and format Markdown tables. + + Args: + content: Raw Markdown content as bytes + + Returns: + Document with formatted table content + """ + # Decode bytes to string with automatic encoding detection + text = endecode.decode_bytes(content) + # Format all tables in the content + text = self.table_helper.format_table(text) + return Document(content=text) + + +class MarkdownImageUtil: + """Utility class for handling images in Markdown. + + This class provides functionality to: + - Extract base64-encoded images from Markdown + - Extract image paths from Markdown + - Replace image paths with new URLs + - Convert base64 images to binary format + + Supported formats: + - Base64 embedded images: ![alt](data:image/png;base64,iVBORw0...) + - Regular image links: ![alt](path/to/image.png) + """ + + def __init__(self): + # Pattern to match base64 embedded images + # Captures: (1) alt text, (2) image format, (3) base64 data + self.b64_pattern = re.compile( + r"!\[([^\]]*)\]\(data:image/(\w+)\+?\w*;base64,([^\)]+)\)" + ) + # Pattern to match regular image syntax + self.image_pattern = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)") + # Pattern for replacing image paths + self.replace_pattern = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)") + + def extract_image( + self, + content: str, + path_prefix: Optional[str] = None, + replace: bool = True, + ) -> Tuple[str, List[str]]: + """Extract image paths from Markdown content. + + Args: + content: Markdown text containing images + path_prefix: Optional prefix to add to image paths + replace: Whether to replace image syntax in content + + Returns: + Tuple of (processed_text, list_of_image_paths) + + Example: + >>> util = MarkdownImageUtil() + >>> text, images = util.extract_image("![logo](img/logo.png)") + >>> print(images) + ['img/logo.png'] + """ + # List to store extracted image paths + images: List[str] = [] + + def repl(match: Match[str]) -> str: + """Replacement function for each image match.""" + title = match.group(1) # Alt text + image_path = match.group(2) # Image path + + # Add prefix if specified + if path_prefix: + image_path = f"{path_prefix}/{image_path}" + + images.append(image_path) + + # Keep original if replace is False + if not replace: + return match.group(0) + + # Replace image path with potentially prefixed path + return f"![{title}]({image_path})" + + text = self.image_pattern.sub(repl, content) + logger.debug(f"Extracted {len(images)} images from markdown") + return text, images + + def extract_base64( + self, + content: str, + path_prefix: Optional[str] = None, + replace: bool = True, + ) -> Tuple[str, Dict[str, bytes]]: + """Extract and decode base64 embedded images from Markdown. + + This method finds all base64-encoded images in the Markdown content, + decodes them to binary format, generates unique filenames, and + optionally replaces them with file path references. + + Args: + content: Markdown text containing base64 images + path_prefix: Optional directory prefix for generated paths + replace: Whether to replace base64 syntax with file paths + + Returns: + Tuple of (processed_text, dict_of_path_to_bytes) + + Example: + >>> util = MarkdownImageUtil() + >>> text = "![logo](data:image/png;base64,iVBORw0KGg...)" + >>> new_text, images = util.extract_base64(text, "images") + >>> print(new_text) + ![logo](images/uuid.png) + >>> print(len(images)) + 1 + """ + # Dictionary mapping generated file paths to binary image data + images: Dict[str, bytes] = {} + + def repl(match: Match[str]) -> str: + """Replacement function for each base64 image match.""" + title = match.group(1) # Alt text + img_ext = match.group(2) # Image format (png, jpg, etc.) + img_b64 = match.group(3) # Base64 encoded data + + # Decode base64 string to bytes + image_byte = endecode.encode_image(img_b64, errors="ignore") + if not image_byte: + logger.error(f"Failed to decode base64 image skip it: {img_b64}") + return title # Return just the alt text if decode fails + + # Generate unique filename with original extension + image_path = f"{uuid.uuid4()}.{img_ext}" + if path_prefix: + image_path = f"{path_prefix}/{image_path}" + images[image_path] = image_byte + + # Keep original base64 if replace is False + if not replace: + return match.group(0) + + # Replace base64 data with file path reference + return f"![{title}]({image_path})" + + text = self.b64_pattern.sub(repl, content) + logger.debug(f"Extracted {len(images)} base64 images from markdown") + return text, images + + def replace_path(self, content: str, images: Dict[str, str]) -> str: + """Replace image paths in Markdown with new URLs. + + This method is typically used to replace local file paths with + uploaded URLs after images have been stored. + + Args: + content: Markdown text with image references + images: Mapping of old paths to new URLs + + Returns: + Markdown text with updated image URLs + + Example: + >>> util = MarkdownImageUtil() + >>> content = "![logo](temp/img.png)" + >>> mapping = {"temp/img.png": "https://cdn.com/img.png"} + >>> result = util.replace_path(content, mapping) + >>> print(result) + ![logo](https://cdn.com/img.png) + """ + # Track which paths were actually replaced + content_replace: set = set() + + def repl(match: Match[str]) -> str: + """Replacement function for each image match.""" + title = match.group(1) # Alt text + image_path = match.group(2) # Current image path + + # Only replace if path exists in mapping + if image_path not in images: + return match.group(0) # Keep original + + content_replace.add(image_path) + # Get new URL from mapping + image_path = images[image_path] + return f"![{title}]({image_path})" if image_path else title + + text = self.replace_pattern.sub(repl, content) + logger.debug(f"Replaced {len(content_replace)} images in markdown") + return text + + @staticmethod + def _self_test(): + your_content = "test![](data:image/png;base64,iVBORw0KGgoAAAA)test" + image_handle = MarkdownImageUtil() + text, images = image_handle.extract_base64(your_content) + print(text) + + for image_url, image_byte in images.items(): + with open(image_url, "wb") as f: + f.write(image_byte) + + +class MarkdownImageBase64(BaseParser): + """Parser for extracting base64 images from Markdown. + + Extracts base64-encoded images, replaces them with path references, + and returns the raw image data in Document.images for the Go-side + ImageResolver (or main.py _resolve_images) to handle storage. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.image_helper = MarkdownImageUtil() + + def parse_into_text(self, content: bytes) -> Document: + text = endecode.decode_bytes(content) + text, img_b64 = self.image_helper.extract_base64(text, path_prefix="images") + + images: Dict[str, str] = {} + for ipath, raw_bytes in img_b64.items(): + images[ipath] = base64.b64encode(raw_bytes).decode() + + logger.debug("Extracted %d base64 images from markdown", len(images)) + return Document(content=text, images=images) + + +class MarkdownParser(PipelineParser): + """Complete Markdown parser using pipeline approach. + + This parser processes Markdown content through multiple stages: + 1. MarkdownTableFormatter: Standardizes table formatting + 2. MarkdownImageBase64: Extracts and uploads base64 images + + The pipeline ensures that content flows through each parser in sequence, + with each stage's output becoming the next stage's input. + """ + + _parser_cls = (MarkdownTableFormatter, MarkdownImageBase64) + + +if __name__ == "__main__": + # Example usage and testing + logging.basicConfig(level=logging.DEBUG) + + # Test the complete MarkdownParser pipeline + your_content = "test![](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAMgA)test" + parser = MarkdownParser() + + # Parse content and display results + document = parser.parse_into_text(your_content.encode()) + logger.info(document.content) + logger.info(f"Images: {len(document.images)}, name: {document.images.keys()}") + + # Run individual utility tests + MarkdownImageUtil._self_test() + MarkdownTableUtil._self_test() diff --git a/ai-core/parser/markitdown_parser.py b/ai-core/parser/markitdown_parser.py new file mode 100644 index 0000000..2203de5 --- /dev/null +++ b/ai-core/parser/markitdown_parser.py @@ -0,0 +1,107 @@ +import io +import logging +import re +import base64 + +from markitdown import MarkItDown + +from docreader.models.document import Document +from docreader.parser.base_parser import BaseParser +from docreader.parser.chain_parser import PipelineParser +from docreader.parser.markdown_parser import MarkdownParser + +# 尝试导入 VLMClient +try: + from parser.vlm_client import VLMClient +except ImportError: + VLMClient = None + +logger = logging.getLogger(__name__) + + +class StdMarkitdownParser(BaseParser): + """ + Standard MarkItDown Parser Wrapper + + This parser uses the markitdown library to convert various document formats + (docx, pptx, pdf, etc.) into text/markdown. + Optionally uses VLM to process images. + """ + + def __init__(self, *args, vlm_config=None, **kwargs): + # 这里的 super() 会调用 BaseParser 的初始化,确保 self.file_type 被正确赋值 + super().__init__(*args, **kwargs) + self.markitdown = MarkItDown() + self.vlm_config = vlm_config + self.vlm_client = None + + # 如果有 VLM 配置,初始化 VLM 客户端 + if vlm_config and vlm_config.get("enabled") and VLMClient: + try: + self.vlm_client = VLMClient(vlm_config) + logger.info(f"VLM client initialized: provider={vlm_config.get('provider')}, model={vlm_config.get('model')}") + except Exception as e: + logger.warning(f"Failed to initialize VLM client: {e}") + + def parse_into_text(self, content: bytes) -> Document: + """ + Parses content using MarkItDown. + Uses self.file_type (inherited from BaseParser) to hint the stream format. + """ + ext = self.file_type + if ext and not ext.startswith('.'): + ext = '.' + ext + + # 直接调用 convert,移除 try-catch,让异常由上层 PipelineParser 统一捕获 + result = self.markitdown.convert( + io.BytesIO(content), + file_extension=ext, + keep_data_uris=True + ) + + markdown_content = result.text_content + + # 如果有 VLM 客户端,尝试处理图片 + if self.vlm_client and markdown_content: + markdown_content = self._process_images_with_vlm(markdown_content) + + return Document(content=markdown_content) + + def _process_images_with_vlm(self, content: str) -> str: + """ + 处理 Markdown 内容中的图片,使用 VLM 分析并替换 + """ + # 匹配 data:image 开头的 Base64 图片 + pattern = r'!\[([^\]]*)\]\((data:image/([^;]+);base64,([A-Za-z0-9+/=]+))\)' + + def replace_image(match): + alt_text = match.group(1) + data_url = match.group(2) + mime_type = match.group(3) or "image/png" + base64_data = match.group(4) + + try: + # 解码 Base64 图片 + image_bytes = base64.b64decode(base64_data) + + # 调用 VLM 分析图片 + logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}") + vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type) + + if vlm_result.get("success"): + vlm_content = vlm_result.get("content", "") + logger.info(f"VLM processed image successfully, content length: {len(vlm_content)}") + # 替换为 VLM 解析的内容 + return f"\n{vlm_content}\n" + else: + logger.warning(f"VLM failed for image: {vlm_result.get('error')}") + return match.group(0) # 保留原图片引用 + except Exception as e: + logger.error(f"Error processing image with VLM: {e}") + return match.group(0) # 保留原图片引用 + + return re.sub(pattern, replace_image, content) + + +class MarkitdownParser(PipelineParser): + _parser_cls = (StdMarkitdownParser, MarkdownParser) \ No newline at end of file diff --git a/ai-core/parser/parser.py b/ai-core/parser/parser.py new file mode 100644 index 0000000..ebe5043 --- /dev/null +++ b/ai-core/parser/parser.py @@ -0,0 +1,88 @@ +import logging +from typing import Any, Optional + +from docreader.models.document import Document +from docreader.parser.registry import registry +from docreader.parser.web_parser import WebParser + +logger = logging.getLogger(__name__) + + +class Parser: + """Document parser facade (lightweight version). + + Converts files/URLs to markdown + image references. + No chunking, no storage, no OCR, no VLM. + """ + + def __init__(self): + self.registry = registry + logger.info( + "Parser initialized with engines: %s", + ", ".join(self.registry.get_engine_names()), + ) + + def parse_file( + self, + file_name: str, + file_type: str, + content: bytes, + parser_engine: Optional[str] = None, + engine_overrides: Optional[dict[str, Any]] = None, + vlm_config: Optional[dict[str, Any]] = None, + ) -> Document: + """Parse file content to markdown.""" + engine = parser_engine or "" + overrides = engine_overrides or {} + logger.info( + "Parsing file: %s, type: %s, engine: %s, vlm_enabled: %s", + file_name, + file_type, + engine or "builtin", + vlm_config.get("enabled") if vlm_config else False, + ) + + # 如果有 VLM 配置,添加到 overrides 中 + if vlm_config and vlm_config.get("enabled"): + overrides["vlm_config"] = vlm_config + + cls = self.registry.get_parser_class(engine, file_type) + logger.info( + "Creating %s parser instance for %s file", + cls.__name__, + file_type, + ) + parser = cls( + file_name=file_name, + file_type=file_type, + **overrides, + ) + + logger.info("Starting to parse file content, size: %d bytes", len(content)) + result = parser.parse(content) + + if not result.content: + logger.warning("Parser returned empty content for file: %s", file_name) + logger.info( + "Parsed file %s, content length=%d", file_name, len(result.content) + ) + return result + + def parse_url( + self, + url: str, + title: str, + parser_engine: Optional[str] = None, + engine_overrides: Optional[dict[str, Any]] = None, + ) -> Document: + """Parse content from a URL to markdown.""" + logger.info("Parsing URL: %s, title: %s", url, title) + + parser = WebParser(title=title) + logger.info("Starting to parse URL content") + result = parser.parse(url.encode()) + + if not result.content: + logger.warning("Parser returned empty content for url: %s", url) + logger.info("Parsed url %s, content length=%d", url, len(result.content)) + return result diff --git a/ai-core/parser/parser_simple.py b/ai-core/parser/parser_simple.py new file mode 100644 index 0000000..de9d0c4 --- /dev/null +++ b/ai-core/parser/parser_simple.py @@ -0,0 +1,275 @@ +""" +简化的 Parser - 使用 markitdown + VLM +""" +import logging +import os +import io +import re +import base64 +from typing import Optional, Any, Dict +from markitdown import MarkItDown + +logger = logging.getLogger(__name__) + + +class Document: + """简单的文档对象""" + def __init__(self, content: str = "", chunks: list = None, metadata: dict = None): + self.content = content + self.chunks = chunks or [] + self.metadata = metadata or {} + + +class VLMClient: + """VLM 客户端""" + + def __init__(self, config: Dict[str, Any]): + self.provider = config.get("provider", "openai") + self.model = config.get("model", "gpt-4o") + self.api_key = config.get("api_key", "") + self.base_url = config.get("base_url", "") + self.prompt = config.get("prompt", "") or self._default_prompt() + logger.info(f"VLMClient initialized: provider={self.provider}, model={self.model}") + + def _default_prompt(self) -> str: + return """请分析这个文档图片的内容,并将其转换为 Markdown 格式。 +要求: +1. 保持原文的格式和结构 +2. 表格用 Markdown 表格格式 +3. 标题用 # ## ### 标记 +4. 尽量保留原文的所有信息""" + + def analyze_image(self, content: bytes, mime_type: str) -> Dict[str, Any]: + """分析图片""" + if self.provider == "openai": + return self._call_openai(content, mime_type) + elif self.provider == "anthropic": + return self._call_anthropic(content, mime_type) + elif self.provider == "qwen": + return self._call_qwen(content, mime_type) + else: + return {"success": False, "error": f"Unknown provider: {self.provider}"} + + def _call_openai(self, content: bytes, mime_type: str) -> Dict[str, Any]: + try: + import requests + url = (self.base_url or "https://api.openai.com/v1") + "/chat/completions" + image_b64 = base64.b64encode(content).decode("utf-8") + + headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + payload = { + "model": self.model, + "messages": [{ + "role": "user", + "content": [ + {"type": "text", "text": self.prompt}, + {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}} + ] + }], + "max_tokens": 4096 + } + + resp = requests.post(url, headers=headers, json=payload, timeout=120) + resp.raise_for_status() + result = resp.json() + return {"success": True, "content": result["choices"][0]["message"]["content"]} + except Exception as e: + logger.error(f"OpenAI VLM error: {e}") + return {"success": False, "error": str(e)} + + def _call_anthropic(self, content: bytes, mime_type: str) -> Dict[str, Any]: + try: + import requests + url = (self.base_url or "https://api.anthropic.com/v1") + "/messages" + image_b64 = base64.b64encode(content).decode("utf-8") + + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "Content-Type": "application/json" + } + payload = { + "model": self.model, + "max_tokens": 4096, + "messages": [{ + "role": "user", + "content": [ + {"type": "text", "text": self.prompt}, + {"type": "image", "source": {"type": "base64", "media_type": mime_type, "data": image_b64}} + ] + }] + } + + resp = requests.post(url, headers=headers, json=payload, timeout=120) + resp.raise_for_status() + result = resp.json() + return {"success": True, "content": result["content"][0]["text"]} + except Exception as e: + logger.error(f"Anthropic VLM error: {e}") + return {"success": False, "error": str(e)} + + def _call_qwen(self, content: bytes, mime_type: str) -> Dict[str, Any]: + try: + import requests + url = (self.base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1") + "/chat/completions" + image_b64 = base64.b64encode(content).decode("utf-8") + + headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + payload = { + "model": self.model, + "messages": [{ + "role": "user", + "content": [ + {"type": "text", "text": self.prompt}, + {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}} + ] + }] + } + + resp = requests.post(url, headers=headers, json=payload, timeout=120) + resp.raise_for_status() + result = resp.json() + return {"success": True, "content": result["choices"][0]["message"]["content"]} + except Exception as e: + logger.error(f"Qwen VLM error: {e}") + return {"success": False, "error": str(e)} + + +class Parser: + """基于 MarkItDown + VLM 的文档解析器""" + + def __init__(self): + self.markitdown = MarkItDown() + self.vlm_client: Optional[VLMClient] = None + logger.info("Parser initialized with MarkItDown") + + def set_vlm_config(self, config: Dict[str, Any]) -> None: + """设置 VLM 配置""" + if config and config.get("enabled") and config.get("api_key"): + self.vlm_client = VLMClient(config) + logger.info(f"VLM enabled: provider={config.get('provider')}, model={config.get('model')}") + else: + self.vlm_client = None + + def _should_use_vlm(self, file_name: str) -> bool: + """判断是否应该使用 VLM""" + if not self.vlm_client: + return False + ext = os.path.splitext(file_name)[1].lower() + # 图片和 PDF 都使用 VLM + image_exts = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'] + return ext in image_exts or ext == '.pdf' + + def _process_images_with_vlm(self, content: str) -> str: + """处理 Markdown 内容中的图片""" + # 匹配 data:image 开头的 Base64 图片 + pattern = r'!\[([^\]]*)\]\((data:image/([^;]+);base64,([A-Za-z0-9+/=]+))\)' + + def replace_image(match): + alt_text = match.group(1) + data_url = match.group(2) + mime_type = match.group(3) or "image/png" + base64_data = match.group(4) + + try: + image_bytes = base64.b64decode(base64_data) + logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}") + vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type) + + if vlm_result.get("success"): + vlm_content = vlm_result.get("content", "") + logger.info(f"VLM processed image, content length: {len(vlm_content)}") + return f"\n{vlm_content}\n" + else: + logger.warning(f"VLM failed: {vlm_result.get('error')}") + return match.group(0) + except Exception as e: + logger.error(f"VLM error: {e}") + return match.group(0) + + return re.sub(pattern, replace_image, content) + + def _parse_with_vlm(self, content: bytes, file_name: str) -> Document: + """使用 VLM 直接解析整个文件""" + ext = os.path.splitext(file_name)[1].lower() + mime_types = { + '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', + '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp', + '.tiff': 'image/tiff', '.pdf': 'application/pdf', + } + mime_type = mime_types.get(ext, 'image/png') + + result = self.vlm_client.analyze_image(content, mime_type) + if result.get("success"): + return Document(content=result["content"], metadata={"vlm": True}) + else: + logger.error(f"VLM failed: {result.get('error')}") + return Document(content="") + + def parse_file( + self, + file_name: str, + file_type: str, + content: bytes, + parser_engine: Optional[str] = None, + engine_overrides: Optional[dict[str, Any]] = None, + vlm_config: Optional[dict[str, Any]] = None, + ) -> Document: + """解析文件内容""" + logger.info(f"Parsing file: {file_name}, type: {file_type}, vlm_config={'enabled' if vlm_config and vlm_config.get('enabled') else 'none'}") + + # 设置 VLM 配置 + if vlm_config and vlm_config.get("enabled"): + self.set_vlm_config(vlm_config) + + # 判断是否使用 VLM 直接解析 + if self._should_use_vlm(file_name): + logger.info(f"Using VLM for {file_name}") + return self._parse_with_vlm(content, file_name) + + # 使用 MarkItDown 解析 + try: + ext = file_type + if not ext.startswith('.'): + ext = '.' + ext + + result = self.markitdown.convert( + io.BytesIO(content), + file_extension=ext, + keep_data_uris=True + ) + + markdown_content = result.text_content or "" + + # 如果有 VLM,处理图片 + if self.vlm_client and markdown_content: + markdown_content = self._process_images_with_vlm(markdown_content) + + return Document( + content=markdown_content, + metadata=result.metadata if hasattr(result, 'metadata') else {} + ) + except Exception as e: + logger.error(f"Parse error: {e}") + return Document(content="") + + def parse_url( + self, + url: str, + title: str, + parser_engine: Optional[str] = None, + engine_overrides: Optional[dict[str, Any]] = None, + ) -> Document: + """解析 URL""" + logger.info(f"Parsing URL: {url}, title: {title}") + + try: + result = self.markitdown.convert(url) + return Document(content=result.text_content or "") + except Exception as e: + logger.error(f"URL parse error: {e}") + return Document(content="") + + +# 导出 +__all__ = ["Parser", "Document"] diff --git a/ai-core/parser/pdf_parser.py b/ai-core/parser/pdf_parser.py new file mode 100644 index 0000000..f98ab3e --- /dev/null +++ b/ai-core/parser/pdf_parser.py @@ -0,0 +1,15 @@ +from docreader.parser.chain_parser import FirstParser +from docreader.parser.markitdown_parser import MarkitdownParser + + +class PDFParser(FirstParser): + """PDF Parser using chain of responsibility pattern + + Attempts to parse PDF files using multiple parser backends in order: + 1. MinerUParser - Primary parser for PDF documents + 2. MarkitdownParser - Fallback parser if MinerU fails + + The first successful parser result will be returned. + """ + # Parser classes to try in order (chain of responsibility pattern) + _parser_cls = (MarkitdownParser,) diff --git a/ai-core/parser/registry.py b/ai-core/parser/registry.py new file mode 100644 index 0000000..18580b3 --- /dev/null +++ b/ai-core/parser/registry.py @@ -0,0 +1,160 @@ +import logging +from typing import Any, Callable, Dict, List, Optional, Tuple, Type + +from docreader.parser.base_parser import BaseParser +from docreader.parser.doc_parser import DocParser +from docreader.parser.docx2_parser import Docx2Parser +from docreader.parser.excel_parser import ExcelParser +from docreader.parser.image_parser import ImageParser +from docreader.parser.markdown_parser import MarkdownParser +from docreader.parser.markitdown_parser import MarkitdownParser +from docreader.parser.pdf_parser import PDFParser + +logger = logging.getLogger(__name__) + +BUILTIN_ENGINE = "builtin" + + +class ParserEngineRegistry: + """Registry for parser engines. + + Each engine maps file extensions to parser classes. + When a requested engine doesn't support a file type, the registry + falls back to the builtin engine automatically. + """ + + def __init__(self): + self._engines: Dict[str, Dict[str, Type[BaseParser]]] = {} + self._descriptions: Dict[str, str] = {} + self._check_available: Dict[str, Callable[..., Tuple[bool, str]]] = {} + self._unavailable_hint: Dict[str, str] = {} + + def register( + self, + name: str, + file_types: Dict[str, Type[BaseParser]], + description: str = "", + check_available: Callable[..., Tuple[bool, str]] | None = None, + unavailable_hint: str = "", + ): + self._engines[name] = file_types + self._descriptions[name] = description + if check_available is not None: + self._check_available[name] = check_available + self._unavailable_hint[name] = unavailable_hint + logger.info( + "Registered parser engine '%s' with file types: %s", + name, + ", ".join(file_types.keys()), + ) + + def get_parser_class(self, engine: str, file_type: str) -> Type[BaseParser]: + """Resolve parser class for the given engine and file type. + + Falls back to builtin engine when the requested engine doesn't + support the file type. + """ + ft = file_type.lower() + + if engine and engine in self._engines: + cls = self._engines[engine].get(ft) + if cls: + logger.info("Using engine '%s' for file type '%s'", engine, ft) + return cls + logger.info( + "Engine '%s' does not support '%s', falling back to builtin", + engine, + ft, + ) + + builtin = self._engines.get(BUILTIN_ENGINE, {}) + cls = builtin.get(ft) + if cls: + return cls + + raise ValueError(f"Unsupported file type: {file_type}") + + def list_engines(self, overrides: Optional[Dict[str, str]] = None) -> List[Dict]: + """Return metadata for all registered engines, including availability. + + Args: + overrides: tenant-level config overrides (e.g. mineru_endpoint, mineru_api_key) + forwarded to each engine's check_available function. + """ + result = [] + for name, parsers in self._engines.items(): + available = True + unavailable_reason = "" + check = self._check_available.get(name) + if check is not None: + try: + available, unavailable_reason = check(overrides) + except Exception as e: + available = False + unavailable_reason = str(e) or self._unavailable_hint.get(name, "") + if not available and not unavailable_reason: + unavailable_reason = self._unavailable_hint.get(name, "不可用") + result.append( + { + "name": name, + "description": self._descriptions.get(name, ""), + "file_types": sorted(parsers.keys()), + "available": available, + "unavailable_reason": unavailable_reason, + } + ) + return result + + def get_engine_names(self) -> List[str]: + return list(self._engines.keys()) + + +def _build_default_registry() -> ParserEngineRegistry: + """Create and populate the default registry with all known engines.""" + reg = ParserEngineRegistry() + + _image_types = { + ext: ImageParser for ext in ("jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp") + } + + reg.register( + BUILTIN_ENGINE, + { + "docx": Docx2Parser, + "doc": DocParser, + "pdf": PDFParser, + "md": MarkdownParser, + "markdown": MarkdownParser, + "xlsx": ExcelParser, + "xls": ExcelParser, + **_image_types, + }, + description="内置解析引擎", + ) + + reg.register( + "markitdown", + { + "md": MarkitdownParser, + "markdown": MarkitdownParser, + "pdf": MarkitdownParser, + "docx": MarkitdownParser, + "doc": MarkitdownParser, + "pptx": MarkitdownParser, + "ppt": MarkitdownParser, + "xlsx": MarkitdownParser, + "xls": MarkitdownParser, + "csv": MarkitdownParser, + }, + description="MarkItDown 解析引擎(微软 MarkItDown 库)", + ) + + # NOTE: Engine listing is managed by Go-side engine registry + # (docparser.ListAllEngines). The Python list_engines method is kept for + # backward compatibility with the gRPC ListEngines RPC but the Go app + # no longer calls it. MinerU engines are handled natively by Go. + + return reg + + +registry = _build_default_registry() diff --git a/ai-core/parser/storage.py b/ai-core/parser/storage.py new file mode 100644 index 0000000..7f62896 --- /dev/null +++ b/ai-core/parser/storage.py @@ -0,0 +1,322 @@ +# -*- coding: utf-8 -*- +import io +import logging +import os +import traceback +import uuid +from abc import ABC, abstractmethod +from typing import Dict, Optional + +from minio import Minio +from qcloud_cos import CosConfig, CosS3Client + +from docreader.utils import endecode + +logger = logging.getLogger(__name__) + + +def _cfg(storage_config: Optional[Dict], key: str, *env_keys: str, default: str = "") -> str: + """Read a value from storage_config dict, falling back to env vars.""" + if storage_config: + v = storage_config.get(key, "") + if v: + return str(v) + for ek in env_keys: + v = os.environ.get(ek, "") + if v: + return v + return default + + +class Storage(ABC): + """Abstract base class for object storage operations""" + + @abstractmethod + def upload_file(self, file_path: str) -> str: + pass + + @abstractmethod + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + pass + + +class CosStorage(Storage): + """Tencent Cloud COS storage implementation""" + + def __init__(self, storage_config: Optional[Dict] = None): + self.storage_config = storage_config + self.client, self.bucket_name, self.region, self.prefix = ( + self._init_cos_client() + ) + + def _init_cos_client(self): + try: + sc = self.storage_config + secret_id = _cfg(sc, "access_key_id", "COS_SECRET_ID") + secret_key = _cfg(sc, "secret_access_key", "COS_SECRET_KEY") + region = _cfg(sc, "region", "COS_REGION") + bucket_name = _cfg(sc, "bucket_name", "COS_BUCKET_NAME") + appid = _cfg(sc, "app_id", "COS_APP_ID") + prefix = _cfg(sc, "path_prefix", "COS_PATH_PREFIX") + enable_old_domain = os.environ.get("COS_ENABLE_OLD_DOMAIN", "").lower() in ("1", "true", "yes") + + if not all([secret_id, secret_key, region, bucket_name, appid]): + logger.error( + "Incomplete COS configuration: " + "secret_id=%s, region=%s, bucket=%s, appid=%s", + bool(secret_id), region, bucket_name, appid, + ) + return None, None, None, None + + logger.info("Initializing COS client: region=%s, bucket=%s", region, bucket_name) + config = CosConfig( + Appid=appid, + Region=region, + SecretId=secret_id, + SecretKey=secret_key, + EnableOldDomain=enable_old_domain, + ) + client = CosS3Client(config) + return client, bucket_name, region, prefix + except Exception as e: + logger.error("Failed to initialize COS client: %s", e) + return None, None, None, None + + def _get_download_url(self, bucket_name, region, object_key): + return f"https://{bucket_name}.cos.{region}.myqcloud.com/{object_key}" + + def upload_file(self, file_path: str) -> str: + try: + if not self.client: + return "" + file_ext = os.path.splitext(file_path)[1] + object_key = f"{self.prefix}/images/{uuid.uuid4().hex}{file_ext}" + self.client.upload_file( + Bucket=self.bucket_name, + LocalFilePath=file_path, + Key=object_key, + ) + file_url = self._get_download_url(self.bucket_name, self.region, object_key) + logger.info("COS upload_file ok: %s", file_url) + return file_url + except Exception as e: + logger.error("COS upload_file failed: %s", e) + return "" + + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + try: + if not self.client: + return "" + object_key = ( + f"{self.prefix}/images/{uuid.uuid4().hex}{file_ext}" + if self.prefix + else f"images/{uuid.uuid4().hex}{file_ext}" + ) + self.client.put_object( + Bucket=self.bucket_name, Body=content, Key=object_key + ) + file_url = self._get_download_url(self.bucket_name, self.region, object_key) + logger.info("COS upload_bytes ok: %s", file_url) + return file_url + except Exception as e: + logger.error("COS upload_bytes failed: %s", e) + traceback.print_exc() + return "" + + +class MinioStorage(Storage): + """MinIO storage implementation""" + + def __init__(self, storage_config: Optional[Dict] = None): + self.storage_config = storage_config + self.client, self.bucket_name, self.use_ssl, self.endpoint, self.path_prefix = ( + self._init_minio_client() + ) + + def _init_minio_client(self): + try: + sc = self.storage_config + access_key = _cfg(sc, "access_key_id", "MINIO_ACCESS_KEY_ID") + secret_key = _cfg(sc, "secret_access_key", "MINIO_SECRET_ACCESS_KEY") + bucket_name = _cfg(sc, "bucket_name", "MINIO_BUCKET_NAME") + path_prefix_raw = _cfg(sc, "path_prefix", "MINIO_PATH_PREFIX") + path_prefix = path_prefix_raw.strip().strip("/") if path_prefix_raw else "" + endpoint = _cfg(sc, "endpoint", "MINIO_ENDPOINT") + use_ssl = os.environ.get("MINIO_USE_SSL", "").lower() in ("1", "true", "yes") + + if not all([endpoint, access_key, secret_key, bucket_name]): + logger.error("Incomplete MinIO configuration") + return None, None, None, None, None + + client = Minio( + endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl + ) + + found = client.bucket_exists(bucket_name) + if not found: + client.make_bucket(bucket_name) + policy = ( + "{" + '"Version":"2012-10-17",' + '"Statement":[' + '{"Effect":"Allow","Principal":{"AWS":["*"]},' + '"Action":["s3:GetBucketLocation","s3:ListBucket"],' + '"Resource":["arn:aws:s3:::%s"]},' + '{"Effect":"Allow","Principal":{"AWS":["*"]},' + '"Action":["s3:GetObject"],' + '"Resource":["arn:aws:s3:::%s/*"]}' + "]}" % (bucket_name, bucket_name) + ) + client.set_bucket_policy(bucket_name, policy) + + return client, bucket_name, use_ssl, endpoint, path_prefix + except Exception as e: + logger.error("Failed to initialize MinIO client: %s", e) + return None, None, None, None, None + + def _get_download_url(self, object_key: str): + public_endpoint = os.environ.get("MINIO_PUBLIC_ENDPOINT", "") + if public_endpoint: + return f"{public_endpoint}/{self.bucket_name}/{object_key}" + scheme = "https" if self.use_ssl else "http" + return f"{scheme}://{self.endpoint}/{self.bucket_name}/{object_key}" + + def upload_file(self, file_path: str) -> str: + try: + if not self.client: + return "" + file_name = os.path.basename(file_path) + object_key = ( + f"{self.path_prefix}/images/{uuid.uuid4().hex}{os.path.splitext(file_name)[1]}" + if self.path_prefix + else f"images/{uuid.uuid4().hex}{os.path.splitext(file_name)[1]}" + ) + with open(file_path, "rb") as file_data: + file_size = os.path.getsize(file_path) + self.client.put_object( + bucket_name=self.bucket_name or "", + object_name=object_key, + data=file_data, + length=file_size, + content_type="application/octet-stream", + ) + file_url = self._get_download_url(object_key) + logger.info("MinIO upload_file ok: %s", file_url) + return file_url + except Exception as e: + logger.error("MinIO upload_file failed: %s", e) + return "" + + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + try: + if not self.client: + return "" + object_key = ( + f"{self.path_prefix}/images/{uuid.uuid4().hex}{file_ext}" + if self.path_prefix + else f"images/{uuid.uuid4().hex}{file_ext}" + ) + self.client.put_object( + self.bucket_name or "", + object_key, + data=io.BytesIO(content), + length=len(content), + content_type="application/octet-stream", + ) + file_url = self._get_download_url(object_key) + logger.info("MinIO upload_bytes ok: %s", file_url) + return file_url + except Exception as e: + logger.error("MinIO upload_bytes failed: %s", e) + traceback.print_exc() + return "" + + +class LocalStorage(Storage): + """Local file system storage implementation. + + Saves files under base_dir and returns web-accessible URL paths + (e.g. /files/images/uuid.jpg) so that the Go app can serve them. + """ + + def __init__(self, storage_config: Optional[Dict] = None): + sc = storage_config or {} + self.base_dir = ( + sc.get("base_dir") + or os.environ.get("LOCAL_STORAGE_BASE_DIR", "/data/files") + ) + path_prefix = (sc.get("path_prefix") or "").strip().strip("/") + if path_prefix: + self.image_dir = os.path.join(self.base_dir, path_prefix, "images") + else: + self.image_dir = os.path.join(self.base_dir, "images") + self.url_prefix = ( + sc.get("url_prefix") + or os.environ.get("LOCAL_STORAGE_URL_PREFIX", "/files") + ) + os.makedirs(self.image_dir, exist_ok=True) + + def _to_url(self, fpath: str) -> str: + if self.url_prefix: + rel = os.path.relpath(fpath, self.base_dir) + return f"{self.url_prefix}/{rel}" + return fpath + + def upload_file(self, file_path: str) -> str: + return file_path + + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + fpath = os.path.join(self.image_dir, f"{uuid.uuid4()}{file_ext}") + with open(fpath, "wb") as f: + f.write(content) + url = self._to_url(fpath) + logger.info("Local storage saved: %s -> %s", fpath, url) + return url + + +class Base64Storage(Storage): + def upload_file(self, file_path: str) -> str: + return file_path + + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + file_ext = file_ext.lstrip(".") + return f"data:image/{file_ext};base64,{endecode.decode_image(content)}" + + +class DummyStorage(Storage): + """Dummy storage — all uploads return empty string.""" + + def upload_file(self, file_path: str) -> str: + return "" + + def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str: + return "" + + +def create_storage(storage_config: Optional[Dict[str, str]] = None) -> Storage: + """Create a storage instance based on storage_config dict. + + The ``provider`` key in storage_config determines the backend: + minio, cos, local, base64. + Falls back to STORAGE_TYPE env var, then ``local``. + """ + storage_type = "" + if storage_config: + provider = str(storage_config.get("provider", "")).lower().strip() + if provider and provider not in ("unspecified", "storage_provider_unspecified"): + storage_type = provider + + if not storage_type: + storage_type = os.environ.get("STORAGE_TYPE", "local").lower().strip() + + logger.info("Creating %s storage instance", storage_type) + + if storage_type == "minio": + return MinioStorage(storage_config) + elif storage_type == "cos": + return CosStorage(storage_config) + elif storage_type == "local": + return LocalStorage(storage_config) + elif storage_type == "base64": + return Base64Storage() + return DummyStorage() diff --git a/ai-core/parser/web_parser.py b/ai-core/parser/web_parser.py new file mode 100644 index 0000000..100399d --- /dev/null +++ b/ai-core/parser/web_parser.py @@ -0,0 +1,141 @@ +import asyncio +import logging + +from playwright.async_api import async_playwright +from trafilatura import extract + +from docreader.config import CONFIG +from docreader.models.document import Document +from docreader.parser.base_parser import BaseParser +from docreader.parser.chain_parser import PipelineParser +from docreader.parser.markdown_parser import MarkdownParser +from docreader.utils import endecode + +logger = logging.getLogger(__name__) + + +class StdWebParser(BaseParser): + """Standard web page parser using Playwright and Trafilatura. + + This parser scrapes web pages using Playwright's WebKit browser and extracts + clean content using Trafilatura library. It supports proxy configuration and + converts HTML content to markdown format. + """ + + def __init__(self, title: str, **kwargs): + """Initialize the web parser. + + Args: + title: Title of the web page to be used as file name + **kwargs: Additional arguments passed to BaseParser + """ + self.title = title + # Get proxy configuration from config if available + self.proxy = CONFIG.external_https_proxy + super().__init__(file_name=title, **kwargs) + logger.info(f"Initialized WebParser with title: {title}") + + async def scrape(self, url: str) -> str: + """Scrape web page content using Playwright. + + Args: + url: The URL of the web page to scrape + + Returns: + HTML content of the web page as string, empty string on error + """ + logger.info(f"Starting web page scraping for URL: {url}") + try: + async with async_playwright() as p: + kwargs = {} + # Configure proxy if available + if self.proxy: + kwargs["proxy"] = {"server": self.proxy} + logger.info("Launching WebKit browser") + browser = await p.webkit.launch(**kwargs) + page = await browser.new_page() + + logger.info(f"Navigating to URL: {url}") + try: + # Navigate to URL with 30 second timeout + await page.goto(url, timeout=30000) + logger.info("Initial page load complete") + except Exception as e: + logger.error(f"Error navigating to URL: {str(e)}") + await browser.close() + return "" + + logger.info("Retrieving page HTML content") + # Get the full HTML content of the page + content = await page.content() + logger.info(f"Retrieved {len(content)} bytes of HTML content") + + await browser.close() + logger.info("Browser closed") + + # Return raw HTML content for further processing + logger.info("Successfully retrieved HTML content") + return content + + except Exception as e: + logger.error(f"Failed to scrape web page: {str(e)}") + # Return empty string on error + return "" + + def parse_into_text(self, content: bytes) -> Document: + """Parse web page content into a Document object. + + Args: + content: URL encoded as bytes + + Returns: + Document object containing the parsed markdown content + """ + # Decode bytes to get the URL string + url = endecode.decode_bytes(content) + + logger.info(f"Scraping web page: {url}") + # Run async scraping in sync context + chtml = asyncio.run(self.scrape(url)) + # Extract clean content from HTML using Trafilatura + # Convert to markdown format with metadata, images, tables, and links + md_text = extract( + chtml, + output_format="markdown", + with_metadata=True, + include_images=True, + include_tables=True, + include_links=True, + ) + if not md_text: + logger.error("Failed to parse web page") + return Document(content=f"Error parsing web page: {url}") + return Document(content=md_text) + + +class WebParser(PipelineParser): + """Web parser using pipeline pattern. + + This parser chains StdWebParser (for web scraping and HTML to markdown conversion) + with MarkdownParser (for markdown processing). The pipeline processes content + sequentially through both parsers. + """ + + # Parser classes to be executed in sequence + _parser_cls = (StdWebParser, MarkdownParser) + + +if __name__ == "__main__": + # Configure logging for debugging + logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.DEBUG) + + # Example URL to scrape + url = "https://cloud.tencent.com/document/product/457/6759" + + # Create parser instance and parse the web page + parser = WebParser(title="") + cc = parser.parse_into_text(url.encode()) + # Save the parsed markdown content to file + with open("./tencent.md", "w") as f: + f.write(cc.content) diff --git a/ai-core/requirements.txt b/ai-core/requirements.txt new file mode 100644 index 0000000..a9bd972 --- /dev/null +++ b/ai-core/requirements.txt @@ -0,0 +1,16 @@ +# AI-Core Document Parser + +# gRPC 框架 +grpcio>=1.60.0 +grpcio-tools>=1.60.0 +grpcio-reflection>=1.60.0 +protobuf>=4.25.0 + +# HTTP 请求 +requests>=2.31.0 + +# 配置文件解析 +pyyaml>=6.0 + +# 文档解析 +markitdown[pdf,docx,pptx,xlsx,all]>=0.0.1 diff --git a/ai-core/service/grpc_server.py b/ai-core/service/grpc_server.py new file mode 100644 index 0000000..6888f4a --- /dev/null +++ b/ai-core/service/grpc_server.py @@ -0,0 +1,208 @@ +""" +gRPC Server for Document Parser +""" +import logging +import requests +from concurrent import futures +import grpc +from grpc_reflection.v1alpha import reflection +import sys +import os +import io + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "proto")) + +from parser import Parser + +logger = logging.getLogger(__name__) + +# 导入 proto 生成的文件 +try: + import document_parser_pb2 + import document_parser_pb2_grpc + PROTO_AVAILABLE = True +except ImportError: + logger.warning("Proto files not found, please run: python generate_grpc.py") + PROTO_AVAILABLE = False + + +class DocumentParserServicer: + """gRPC 服务实现""" + + def __init__(self, max_workers: int = 10): + self.parser = Parser() + self.max_workers = max_workers + logger.info("DocumentParserServicer initialized") + + def ParseDocument(self, request, context): + """解析文档""" + if not PROTO_AVAILABLE: + return {"success": False, "message": "Proto not available"} + + try: + logger.info( + "ParseDocument request: file_url=%s, file_name=%s", + request.file_url, + request.file_name, + ) + + file_url = request.file_url + file_name = request.file_name + + if not file_url: + return document_parser_pb2.ParseResponse( + success=False, + content="", + message="file_url is required", + content_length=0, + ) + + if not file_name: + return document_parser_pb2.ParseResponse( + success=False, + content="", + message="file_name is required", + content_length=0, + ) + + # 提取 VLM 配置 + vlm_config = None + if hasattr(request, 'vlm_config') and request.vlm_config: + vlm_cfg = request.vlm_config + if vlm_cfg.enabled: + vlm_config = { + "enabled": vlm_cfg.enabled, + "provider": vlm_cfg.provider, + "model": vlm_cfg.model, + "api_key": vlm_cfg.api_key, + "base_url": vlm_cfg.base_url, + "prompt": vlm_cfg.prompt, + } + logger.info(f"VLM config: provider={vlm_cfg.provider}, model={vlm_cfg.model}") + + # 下载文件 + logger.info("Downloading file from URL: %s", file_url) + try: + response = requests.get( + file_url, + timeout=60, + headers={"User-Agent": "DocParser/1.0"}, + ) + response.raise_for_status() + content = response.content + logger.info("Downloaded %d bytes", len(content)) + except requests.RequestException as e: + logger.error("Failed to download file: %s", str(e)) + return document_parser_pb2.ParseResponse( + success=False, + content="", + message=f"Failed to download file: {str(e)}", + content_length=0, + ) + + # 解析 + logger.info("Parsing file") + file_type = os.path.splitext(file_name)[1][1:] # 去掉点的扩展名 + + result = self.parser.parse_file( + file_name=file_name, + file_type=file_type, + content=content, + vlm_config=vlm_config, + ) + + if not result.content: + return document_parser_pb2.ParseResponse( + success=False, + content="", + message="Parse failed or empty content", + content_length=0, + ) + + markdown_content = result.content + logger.info("Parse successful: content_length=%d", len(markdown_content)) + + return document_parser_pb2.ParseResponse( + success=True, + content=markdown_content, + message="Parse successful", + content_length=len(markdown_content), + file_type=file_type or "auto", + parser_engine="markitdown", + ) + + except Exception as e: + logger.error("ParseDocument error: %s", str(e), exc_info=True) + return document_parser_pb2.ParseResponse( + success=False, + content="", + message=f"Parse error: {str(e)}", + content_length=0, + ) + + def GetSupportedFormats(self, request, context): + """获取支持的格式""" + if not PROTO_AVAILABLE: + return None + + try: + file_types = [ + "pdf", "docx", "doc", "pptx", "ppt", + "xlsx", "xls", "csv", + "md", "markdown", + "jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp", + "html", "htm", "txt", + ] + return document_parser_pb2.SupportedFormatsResponse( + file_types=file_types, + ) + except Exception as e: + logger.error("GetSupportedFormats error: %s", str(e)) + return None + + def GetEngines(self, request, context): + """获取解析引擎""" + if not PROTO_AVAILABLE: + return None + + try: + engines = [ + document_parser_pb2.EngineInfo( + name="markitdown", + description="MarkItDown parser - supports various document formats", + supported_file_types=["pdf", "docx", "pptx", "xlsx", "md", "html", "txt"], + available=True, + ) + ] + return document_parser_pb2.EnginesResponse(engines=engines) + except Exception as e: + logger.error("GetEngines error: %s", str(e)) + return None + + +def serve(port: int = 50051, max_workers: int = 10): + """启动 gRPC 服务""" + if not PROTO_AVAILABLE: + logger.error("Proto files not available, cannot start server") + return + + server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) + servicer = DocumentParserServicer(max_workers=max_workers) + + # 注册服务 + document_parser_pb2_grpc.add_DocumentParserServicer_to_server( + servicer, server + ) + + # 启用反射 + reflection.enable_server_reflection( + [document_parser_pb2.DESCRIPTOR.services_by_name['DocumentParser']], + server + ) + + server.add_insecure_port(f"0.0.0.0:{port}") + server.start() + logger.info(f"DocumentParser gRPC server started on port {port}") + logger.info("gRPC reflection enabled") + server.wait_for_termination()