feat: 完善 AI-Core 文档解析器

- 添加多种文档解析器 (PDF, Word, Excel, Markdown 等)
- 添加基础解析器和链式解析器
- 添加存储和注册机制
- 添加 gRPC 服务实现

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 15:01:52 +08:00
parent 54473bc378
commit d24b29afe4
19 changed files with 4056 additions and 31 deletions

66
ai-core/main.py Normal file
View File

@@ -0,0 +1,66 @@
"""
AI-Core Document Parser gRPC Server
启动命令: python main.py [--port PORT] [--max-workers MAX_WORKERS] [--log-level LEVEL]
"""
import argparse
import logging
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from service.grpc_server import serve
# Default value of the --port option (the port the server listens on).
DEFAULT_PORT = 50051
# Default value of the --max-workers option (maximum number of worker threads).
DEFAULT_MAX_WORKERS = 10
def main():
    """Parse command-line options, configure logging and run the gRPC server.

    Supported options are ``--port``, ``--max-workers`` and ``--log-level``.
    A ``KeyboardInterrupt`` raised while serving is logged as a requested
    shutdown. Any other exception is logged with its traceback and the
    process exits with status 1.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Document Parser gRPC Server",
    )

    # Declarative option table: (flag, keyword arguments for add_argument).
    option_table = (
        (
            "--port",
            dict(type=int, default=DEFAULT_PORT, help="Port to listen on"),
        ),
        (
            "--max-workers",
            dict(
                type=int,
                default=DEFAULT_MAX_WORKERS,
                help="Maximum number of worker threads",
            ),
        ),
        (
            "--log-level",
            dict(
                type=str,
                default="INFO",
                choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                help="Log level",
            ),
        ),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    # The level name is restricted by ``choices`` above, so the attribute
    # lookup on the logging module always succeeds.
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=getattr(logging, options.log_level),
    )
    log = logging.getLogger(__name__)

    log.info("Starting Document Parser gRPC Server")
    log.info("Port: %d", options.port)
    log.info("Max workers: %d", options.max_workers)

    try:
        serve(port=options.port, max_workers=options.max_workers)
    except KeyboardInterrupt:
        log.info("Server shutdown requested")
    except Exception as exc:
        log.error("Server error: %s", str(exc), exc_info=True)
        sys.exit(1)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View File

@@ -1,38 +1,10 @@
"""
Parser module for WeKnora document processing system.
This module provides document parsers for various file formats including:
- Microsoft Word documents (.doc, .docx)
- PDF documents
- Markdown files
- Plain text files
- Images with text content
- Web pages
The parsers extract content from documents and can split them into
meaningful chunks for further processing and indexing.
Parser module for AI-Core document processing.
"""
from .doc_parser import DocParser
from .docx2_parser import Docx2Parser
from .excel_parser import ExcelParser
from .image_parser import ImageParser
from .markdown_parser import MarkdownParser
from .parser import Parser
from .pdf_parser import PDFParser
from .registry import ParserEngineRegistry, registry
from .web_parser import WebParser
from .parser_simple import Parser, Document
# Public API of this package: the names exported by a star-import of it.
# NOTE(review): every name listed here needs a matching import in this
# module — confirm the list against the import block above.
__all__ = [
    "Docx2Parser",
    "DocParser",
    "PDFParser",
    "MarkdownParser",
    "ImageParser",
    "WebParser",
    "Parser",
    "ExcelParser",
    "ParserEngineRegistry",
    "registry",
    "Document",
]

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import logging
import os
from abc import ABC, abstractmethod
from typing import Optional
from docreader.models.document import Document
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Sets this logger's own level to INFO, independently of the root logger.
# NOTE(review): an explicit level here may override an application-wide
# log-level setting for this module — confirm this is intended.
logger.setLevel(logging.INFO)
class BaseParser(ABC):
    """Abstract base class for document parsers.

    Since the lightweight refactoring a parser only extracts markdown text
    and raw image references from a document. Chunking, image storage, OCR
    and VLM captioning are handled by the Go App module.
    """

    def __init__(
        self,
        file_name: str = "",
        file_type: Optional[str] = None,
        **kwargs,
    ):
        """Remember the file name and resolve the file type.

        Args:
            file_name: Name of the file being parsed.
            file_type: Explicit file type. When falsy, the extension of
                ``file_name`` (without leading dots) is used instead.
            **kwargs: Accepted for subclasses; ignored by this base class.
        """
        self.file_name = file_name
        if file_type:
            self.file_type = file_type
        else:
            # Fall back to the file extension, e.g. "report.pdf" -> "pdf".
            extension = os.path.splitext(file_name)[1]
            self.file_type = extension.lstrip(".")
        logger.info(
            "Initializing parser for file=%s, type=%s",
            file_name,
            self.file_type,
        )

    @abstractmethod
    def parse_into_text(self, content: bytes) -> Document:
        """Convert raw document bytes into markdown text.

        Args:
            content: Raw bytes of the document.

        Returns:
            Document with ``content`` (markdown string) and optional
            ``images`` dict mapping storage-relative paths to base64 data.
        """

    def parse(self, content: bytes) -> Document:
        """Parse the document and return markdown plus image references.

        This is a thin logging wrapper around :meth:`parse_into_text`.
        No chunking, OCR or VLM captioning happens here — that is done in Go.

        Args:
            content: Raw bytes of the document.

        Returns:
            The Document produced by :meth:`parse_into_text`.
        """
        parser_name = self.__class__.__name__
        logger.info(
            "Parsing document with %s, bytes: %d",
            parser_name,
            len(content),
        )
        parsed = self.parse_into_text(content)
        logger.info(
            "Extracted %d characters from %s",
            len(parsed.content),
            self.file_name,
        )
        return parsed

View File

@@ -0,0 +1,176 @@
"""
Chain Parser Module
This module provides two chain-of-responsibility pattern implementations for document parsing:
1. FirstParser: Tries multiple parsers sequentially until one succeeds
2. PipelineParser: Chains parsers where each parser processes the output of the previous one
"""
import logging
from typing import Dict, List, Tuple, Type
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
from docreader.utils import endecode
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Sets this logger's own level to INFO, independently of the root logger.
# NOTE(review): an explicit level here may override an application-wide
# log-level setting for this module — confirm this is intended.
logger.setLevel(logging.INFO)
class FirstParser(BaseParser):
    """Parser that returns the result of the first sub-parser that succeeds.

    Every configured parser is tried in order. A parser "succeeds" when it
    does not raise and the document it returns reports ``is_valid()``. When
    no parser succeeds an empty ``Document`` is returned.

    Usage:
        # Build a FirstParser subclass for a specific list of parser classes
        CustomParser = FirstParser.create(MarkdownParser, HTMLParser)
        parser = CustomParser()
        document = parser.parse_into_text(content_bytes)
    """

    # Parser classes to instantiate, in the order they are tried.
    _parser_cls: Tuple[Type["BaseParser"], ...] = ()

    def __init__(self, *args, **kwargs):
        """Create one instance of every configured parser class.

        All positional and keyword arguments are forwarded unchanged both to
        the base class and to each sub-parser constructor.
        """
        super().__init__(*args, **kwargs)
        self._parsers: List[BaseParser] = [
            candidate_cls(*args, **kwargs) for candidate_cls in self._parser_cls
        ]

    def parse_into_text(self, content: bytes) -> Document:
        """Return the document of the first sub-parser that succeeds.

        Args:
            content: Raw bytes content to be parsed.

        Returns:
            Document: Result of the first parser that neither raises nor
            yields an invalid document; an empty Document if none does.
        """
        for p in self._parsers:
            logger.info(f"FirstParser: using parser {p.__class__.__name__}")
            try:
                document = p.parse_into_text(content)
            except Exception:
                # A failing parser is not fatal: log it and move on.
                logger.exception(
                    "FirstParser: parser %s raised exception; trying next parser",
                    p.__class__.__name__,
                )
            else:
                if document.is_valid():
                    logger.info(f"FirstParser: parser {p.__class__.__name__} succeeded")
                    return document
        return Document()

    @classmethod
    def create(cls, *parser_classes: Type["BaseParser"]) -> Type["FirstParser"]:
        """Build a FirstParser subclass bound to the given parser classes.

        Args:
            *parser_classes: BaseParser subclasses to try, in order.

        Returns:
            Type[FirstParser]: A new subclass whose ``_parser_cls`` is the
            given tuple and whose name embeds the parser class names.

        Example:
            CustomParser = FirstParser.create(MarkdownParser, HTMLParser)
            parser = CustomParser()
        """
        names = "_".join(parser_cls.__name__ for parser_cls in parser_classes)
        class_attrs = {"_parser_cls": parser_classes}
        return type(f"FirstParser_{names}", (cls,), class_attrs)
class PipelineParser(BaseParser):
    """Parser that feeds each sub-parser the output of the previous one.

    The text produced by one stage is re-encoded to bytes and handed to the
    next stage. Images reported by every stage are collected and merged into
    the document returned by the last stage.

    Usage:
        # Build a PipelineParser subclass for a specific list of parser classes
        CustomParser = PipelineParser.create(PreParser, MarkdownParser, PostParser)
        parser = CustomParser()
        document = parser.parse_into_text(content_bytes)
    """

    # Parser classes to instantiate, in pipeline order.
    _parser_cls: Tuple[Type["BaseParser"], ...] = ()

    def __init__(self, *args, **kwargs):
        """Create one instance of every configured parser class.

        All positional and keyword arguments are forwarded unchanged both to
        the base class and to each stage constructor.
        """
        super().__init__(*args, **kwargs)
        self._parsers: List[BaseParser] = [
            stage_cls(*args, **kwargs) for stage_cls in self._parser_cls
        ]

    def parse_into_text(self, content: bytes) -> Document:
        """Run the content through every stage of the pipeline.

        Args:
            content: Raw bytes content to be parsed.

        Returns:
            Document: Output of the final stage, with the images of all
            stages merged in. An empty Document when no stage is configured.
        """
        collected_images: Dict[str, str] = {}
        document = Document()
        for p in self._parsers:
            logger.info(f"PipelineParser: using parser {p.__class__.__name__}")
            document = p.parse_into_text(content)
            # The next stage consumes this stage's text as bytes.
            content = endecode.encode_bytes(document.content)
            collected_images.update(document.images)
        # Earlier stages' images would otherwise be lost with their documents.
        document.images.update(collected_images)
        return document

    @classmethod
    def create(cls, *parser_classes: Type["BaseParser"]) -> Type["PipelineParser"]:
        """Build a PipelineParser subclass bound to the given parser classes.

        Args:
            *parser_classes: BaseParser subclasses to chain, in order.

        Returns:
            Type[PipelineParser]: A new subclass whose ``_parser_cls`` is the
            given tuple and whose name embeds the parser class names.

        Example:
            CustomParser = PipelineParser.create(PreprocessParser, MarkdownParser)
            parser = CustomParser()
        """
        names = "_".join(parser_cls.__name__ for parser_cls in parser_classes)
        class_attrs = {"_parser_cls": parser_classes}
        return type(f"PipelineParser_{names}", (cls,), class_attrs)
if __name__ == "__main__":
    # Manual smoke demo: wrap MarkdownParser in a FirstParser and parse a
    # tiny literal payload. The import is local so it only runs for the demo.
    from docreader.parser.markdown_parser import MarkdownParser

    markdown_first_cls = FirstParser.create(MarkdownParser)
    demo_parser = markdown_first_cls()
    print(demo_parser.parse_into_text(b"aaa"))

View File

@@ -0,0 +1,331 @@
import logging
import os
import subprocess
from typing import List, Optional
import textract
from docreader.config import CONFIG
from docreader.models.document import Document
from docreader.parser.docx2_parser import Docx2Parser
from docreader.utils.tempfile import TempDirContext, TempFileContext
logger = logging.getLogger(__name__)
class SandboxExecutor:
    """Run external commands with proxy environment variables applied."""

    def __init__(self, proxy: Optional[str] = None, default_timeout: int = 60):
        """Initialize the executor.

        Args:
            proxy: Proxy URL exported to the child process. When falsy,
                ``CONFIG.external_https_proxy`` is used; when that is falsy
                too, a hard-coded fallback URL is used.
            default_timeout: Timeout in seconds for command execution.
        """
        # NOTE(review): the fallback is presumably meant as an unreachable
        # "blocking" proxy, but 128.0.0.1 is not a loopback address
        # (127.0.0.1 is) — confirm the address is intended.
        self.proxy = proxy or CONFIG.external_https_proxy or "http://128.0.0.1:1"
        self.default_timeout = default_timeout

    def execute_in_sandbox(self, cmd: List[str]) -> tuple:
        """Execute a command using the first sandbox method that works.

        Args:
            cmd: Command to execute, as an argument list.

        Returns:
            Tuple of (stdout, stderr, returncode).

        Raises:
            RuntimeError: If every sandbox method raised. The last underlying
                error is attached as the exception cause.
        """
        # Sandbox methods in order of preference.
        sandbox_methods = [
            self._execute_with_proxy,
        ]
        last_error: Optional[Exception] = None
        for method in sandbox_methods:
            try:
                return method(cmd)
            except Exception as e:
                logger.warning(f"Sandbox method {method.__name__} failed: {e}")
                last_error = e
        # Chain the cause so the real failure is visible in tracebacks.
        raise RuntimeError("All sandbox methods failed") from last_error

    def _execute_with_proxy(self, cmd: List[str]) -> tuple:
        """Execute a command with the proxy exported in its environment.

        Args:
            cmd: Command to execute, as an argument list.

        Returns:
            Tuple of (stdout, stderr, returncode); stdout/stderr are bytes.

        Raises:
            RuntimeError: If the command does not finish within
                ``default_timeout`` seconds.
        """
        # The child inherits the current environment plus the proxy settings
        # (both lower- and upper-case spellings).
        env = os.environ.copy()
        if self.proxy:
            for name in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
                env[name] = self.proxy

        logger.info(f"Executing command with proxy: {' '.join(cmd)}")
        if self.proxy:
            logger.info(f"Using proxy: {self.proxy}")

        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
        )
        try:
            stdout, stderr = process.communicate(timeout=self.default_timeout)
        except subprocess.TimeoutExpired as exc:
            process.kill()
            # Reap the killed child and drain/close its pipes; without this
            # the process lingers as a zombie and the pipe handles leak.
            process.communicate()
            raise RuntimeError(
                f"Command execution timeout after {self.default_timeout} seconds"
            ) from exc
        return stdout, stderr, process.returncode
logger = logging.getLogger(__name__)
class DocParser(Docx2Parser):
    """Parser for legacy Word ``.doc`` documents.

    The bytes are written to a temporary ``.doc`` file and a chain of
    strategies is tried in order: conversion to DOCX (via soffice) followed
    by the inherited parser chain, then plain-text extraction with antiword.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the parser and its sandbox executor."""
        super().__init__(*args, **kwargs)
        self.sandbox_executor = SandboxExecutor()

    def parse_into_text(self, content: bytes) -> Document:
        """Parse DOC bytes with the first strategy that yields a document.

        Args:
            content: Raw bytes of the ``.doc`` file.

        Returns:
            The first truthy Document produced by a strategy, or a Document
            with empty content when every strategy fails.
        """
        logger.info(f"Parsing DOC document, content size: {len(content)} bytes")

        handle_chain = [
            # 1. Convert to DOCX so images can be extracted as well.
            self._parse_with_docx,
            # 2. Fall back to antiword for text-only extraction.
            self._parse_with_antiword,
            # 3. textract would be the last resort.
            # NOTE: _parse_with_textract is disabled due to SSRF vulnerability
            # self._parse_with_textract,
        ]

        # The external tools operate on files, so persist the bytes first.
        with TempFileContext(content, ".doc") as temp_file_path:
            for handle in handle_chain:
                try:
                    document = handle(temp_file_path)
                    if document:
                        return document
                except Exception as e:
                    # Best effort: a failing strategy just hands over to the next.
                    logger.warning(f"Failed to parse DOC with {handle.__name__} {e}")

        return Document(content="")

    def _parse_with_docx(self, temp_file_path: str) -> Document:
        """Convert the DOC file to DOCX and parse the converted bytes.

        Raises:
            RuntimeError: If the conversion produced no DOCX content.
        """
        logger.info("Multimodal enabled, attempting to extract images from DOC")

        docx_content = self._try_convert_doc_to_docx(temp_file_path)
        if not docx_content:
            raise RuntimeError("Failed to convert DOC to DOCX")

        logger.info("Successfully converted DOC to DOCX, using DocxParser")
        # super(Docx2Parser, self) starts the method lookup *after*
        # Docx2Parser, so this class's own override is not re-entered.
        document = super(Docx2Parser, self).parse_into_text(docx_content)
        logger.info(f"Extracted {len(document.content)} characters using DocxParser")
        return document

    def _parse_with_antiword(self, temp_file_path: str) -> Document:
        """Extract plain text from the DOC file with antiword.

        Raises:
            RuntimeError: If antiword is not found or exits non-zero.
        """
        logger.info("Attempting to parse DOC file with antiword")

        antiword_path = self._try_find_antiword()
        if not antiword_path:
            raise RuntimeError("antiword not found in PATH")

        cmd = [antiword_path, temp_file_path]
        logger.info("Executing antiword in sandbox with proxy configuration")
        stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd)

        if returncode != 0:
            raise RuntimeError(
                f"antiword extraction failed: {stderr.decode('utf-8', errors='ignore')}"
            )

        text = stdout.decode("utf-8", errors="ignore")
        logger.info(f"Successfully extracted {len(text)} characters using antiword")
        return Document(content=text)

    def _parse_with_textract(self, temp_file_path: str) -> Document:
        """Extract text with textract's antiword method.

        Currently not part of the strategy chain (see ``parse_into_text``).
        """
        logger.info(f"Parsing DOC file with textract: {temp_file_path}")
        text = textract.process(temp_file_path, method="antiword").decode("utf-8")
        logger.info(f"Successfully extracted {len(text)} bytes of DOC using textract")
        return Document(content=str(text))

    def _try_convert_doc_to_docx(self, doc_path: str) -> Optional[bytes]:
        """Convert a DOC file to DOCX using soffice (LibreOffice/OpenOffice).

        Args:
            doc_path: Path of the DOC file.

        Returns:
            Bytes of the converted DOCX file, or None if soffice is missing,
            the conversion fails, or no ``.docx`` output is found.
        """
        logger.info(f"Converting DOC to DOCX: {doc_path}")

        soffice_path = self._try_find_soffice()
        if not soffice_path:
            return None

        logger.info(f"Using {soffice_path} to convert DOC to DOCX")

        # The converted file is written into a throw-away directory.
        with TempDirContext() as temp_dir:
            cmd = [
                soffice_path,
                "--headless",
                "--convert-to",
                "docx",
                "--outdir",
                temp_dir,
                doc_path,
            ]
            logger.info(f"Running command in sandbox: {' '.join(cmd)}")

            stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd)
            if returncode != 0:
                # Lenient decode: tool output is not guaranteed to be UTF-8,
                # and a decode error must not mask the conversion failure.
                logger.warning(
                    f"Error converting DOC to DOCX: {stderr.decode('utf-8', errors='ignore')}"
                )
                return None

            docx_file = [
                file for file in os.listdir(temp_dir) if file.endswith(".docx")
            ]
            logger.info(f"Found {len(docx_file)} DOCX file(s) in temporary directory")
            # Only the first converted file is used.
            for file in docx_file:
                converted_file = os.path.join(temp_dir, file)
                logger.info(f"Found converted file: {converted_file}")

                with open(converted_file, "rb") as f:
                    docx_content = f.read()
                logger.info(
                    f"Successfully read DOCX file, size: {len(docx_content)}"
                )
                return docx_content
        return None

    def _try_find_executable_path(
        self,
        executable_name: str,
        possible_path: Optional[List[str]] = None,
        environment_variable: Optional[List[str]] = None,
    ) -> Optional[str]:
        """Locate an executable.

        Candidates are checked in a deterministic order: paths taken from the
        given environment variables first (explicit configuration wins), then
        ``possible_path`` entries, and finally a lookup on ``PATH``.

        Args:
            executable_name: Executable name used for the ``PATH`` lookup.
            possible_path: Well-known candidate paths.
            environment_variable: Names of environment variables that may
                hold the executable path.

        Returns:
            Executable path, or None if not found.
        """
        # Local import: only this method needs shutil.
        import shutil

        configured = [
            os.environ.get(env_var, "") for env_var in (environment_variable or [])
        ]
        candidates = [*configured, *(possible_path or [])]

        # dict.fromkeys de-duplicates while keeping first-seen order (a set
        # would make the search order arbitrary); unset variables are skipped.
        for path in dict.fromkeys(p for p in candidates if p):
            if os.path.exists(path):
                logger.info(f"Found {executable_name} at {path}")
                return path

        # Portable PATH lookup; does not depend on an external `which` binary.
        path = shutil.which(executable_name)
        if path:
            logger.info(f"Found {executable_name} at {path}")
            return path

        logger.warning(f"Failed to find {executable_name}")
        return None

    def _try_find_soffice(self) -> Optional[str]:
        """Find the LibreOffice/OpenOffice ``soffice`` executable.

        Returns:
            Executable path, or None if not found.
        """
        possible_paths = [
            # Linux
            "/usr/bin/soffice",
            "/usr/lib/libreoffice/program/soffice",
            "/opt/libreoffice25.2/program/soffice",
            # macOS
            "/Applications/LibreOffice.app/Contents/MacOS/soffice",
            # Windows
            "C:\\Program Files\\LibreOffice\\program\\soffice.exe",
            "C:\\Program Files (x86)\\LibreOffice\\program\\soffice.exe",
        ]
        return self._try_find_executable_path(
            executable_name="soffice",
            possible_path=possible_paths,
            environment_variable=["LIBREOFFICE_PATH"],
        )

    def _try_find_antiword(self) -> Optional[str]:
        """Find the ``antiword`` executable.

        Returns:
            Executable path, or None if not found.
        """
        possible_paths = [
            # Linux/macOS
            "/usr/bin/antiword",
            "/usr/local/bin/antiword",
            # Windows
            "C:\\Program Files\\Antiword\\antiword.exe",
            "C:\\Program Files (x86)\\Antiword\\antiword.exe",
        ]
        return self._try_find_executable_path(
            executable_name="antiword",
            possible_path=possible_paths,
            environment_variable=["ANTIWORD_PATH"],
        )
if __name__ == "__main__":
    # Manual smoke run: parse a local .doc file and log a short preview.
    logging.basicConfig(level=logging.DEBUG)

    file_name = "/path/to/your/test.doc"
    logger.info(f"Processing file: {file_name}")

    parser_options = {
        "file_name": file_name,
        "enable_multimodal": True,
        "chunk_size": 512,
        "chunk_overlap": 60,
    }
    demo_parser = DocParser(**parser_options)

    with open(file_name, "rb") as source:
        raw_bytes = source.read()

    document = demo_parser.parse_into_text(raw_bytes)
    logger.info(f"Processing complete, extracted text length: {len(document.content)}")
    logger.info(f"Sample text: {document.content[:200]}...")

View File

@@ -0,0 +1,28 @@
import logging
from docreader.parser.chain_parser import FirstParser
from docreader.parser.docx_parser import DocxParser
from docreader.parser.markitdown_parser import MarkitdownParser
logger = logging.getLogger(__name__)
class Docx2Parser(FirstParser):
    """DOCX parser built on FirstParser with two candidate parsers."""

    # Candidate parser classes, in the order they are listed for FirstParser:
    # MarkitdownParser first, DocxParser second.
    _parser_cls = (MarkitdownParser, DocxParser)
if __name__ == "__main__":
    # Manual smoke run: parse a local .docx file and log every chunk.
    logging.basicConfig(level=logging.DEBUG)

    docx_path = "/path/to/your/file.docx"
    docx_parser = Docx2Parser(separators=[".", "?", "!", "", "", ""])

    with open(docx_path, "rb") as source:
        raw_bytes = source.read()

    document = docx_parser.parse(raw_bytes)
    for cc in document.chunks:
        logger.info(f"chunk: {cc}")

    # Alternative: inspect the extracted text and image keys directly.
    # document = parser.parse_into_text(content)
    # logger.info(f"docx content: {document.content}")
    # logger.info(f"find images {document.images.keys()}")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,119 @@
"""
Excel Parser Module
This module provides functionality to parse Excel files (.xlsx, .xls) into
structured Document objects with text content and chunks. It supports multiple
sheets and handles various Excel formats using pandas.
"""
import logging
from io import BytesIO
from typing import List
import pandas as pd
from docreader.models.document import Chunk, Document
from docreader.parser.base_parser import BaseParser
logger = logging.getLogger(__name__)
class ExcelParser(BaseParser):
    """Parser for Excel files (.xlsx, .xls).

    Every sheet is read with pandas and each non-empty row is rendered as a
    single line of ``column: value`` pairs. Each row also becomes its own
    chunk.

    Features:
        - Processes all sheets of a workbook
        - Drops rows whose cells are all empty
        - Renders each row as "column: value" pairs joined by commas
        - Emits one chunk per row

    Example:
        >>> parser = ExcelParser()
        >>> with open("data.xlsx", "rb") as f:
        ...     content = f.read()
        ...     document = parser.parse_into_text(content)
        >>> print(document.content)
        Name: John,Age: 30,City: NYC
        Name: Jane,Age: 25,City: LA
    """

    def parse_into_text(self, content: bytes) -> Document:
        """Parse Excel file bytes into a Document.

        Args:
            content: Raw bytes of the Excel file.

        Returns:
            Document: Parsed document containing:
                - content: the text of all rows from all sheets
                - chunks: one Chunk per row, numbered sequentially across
                  sheets, with ``start``/``end`` character offsets into
                  ``content``

        Note:
            - Rows whose values are all NaN are skipped
            - NaN cells are omitted from a row's text
            - Each row is formatted as "col1: val1,col2: val2,..." plus a
              trailing newline
        """
        chunks: List[Chunk] = []
        text: List[str] = []
        start, end = 0, 0

        # The context manager closes the workbook once all sheets are read;
        # previously the ExcelFile was never closed.
        with pd.ExcelFile(BytesIO(content)) as excel_file:
            for excel_sheet_name in excel_file.sheet_names:
                df = excel_file.parse(sheet_name=excel_sheet_name)
                # Remove completely empty rows.
                df.dropna(how="all", inplace=True)

                for _, row in df.iterrows():
                    # Key-value pairs for the non-null cells only.
                    page_content = [
                        f"{k}: {v}" for k, v in row.items() if pd.notna(v)
                    ]
                    if not page_content:
                        continue

                    content_row = ",".join(page_content) + "\n"
                    end += len(content_row)
                    text.append(content_row)
                    chunks.append(
                        Chunk(content=content_row, seq=len(chunks), start=start, end=end)
                    )
                    # The next row starts where this one ended.
                    start = end

        return Document(content="".join(text), chunks=chunks)
if __name__ == "__main__":
    # Manual smoke run: parse a local Excel file, then log the whole text
    # and the first chunk.
    logging.basicConfig(level=logging.DEBUG)

    workbook_path = "/path/to/your/file.xlsx"
    excel_parser = ExcelParser()

    with open(workbook_path, "rb") as source:
        raw_bytes = source.read()

    document = excel_parser.parse_into_text(raw_bytes)

    # Full document text.
    logger.error(document.content)

    # Only the first chunk is shown, hence the immediate break.
    for first_chunk in document.chunks:
        logger.error(first_chunk.content)
        break

View File

@@ -0,0 +1,28 @@
import base64
import logging
import os
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
logger = logging.getLogger(__name__)
class ImageParser(BaseParser):
    """Parser for standalone image files.

    Returns a markdown image reference as the document text and puts the
    base64-encoded image data into ``Document.images`` so that the Go-side
    ImageResolver (or main.py's _resolve_images) can handle storage upload.
    """

    def parse_into_text(self, content: bytes) -> Document:
        """Wrap the image bytes in a single-image markdown document.

        Args:
            content: Raw bytes of the image file.

        Returns:
            Document whose ``content`` is ``![<file_name>](images/<file_name>)``
            and whose ``images`` maps that reference path to the
            base64-encoded image data.
        """
        logger.info("Parsing image file=%s, size=%d bytes", self.file_name, len(content))
        # The previously computed file extension was never used; removed.
        ref_path = f"images/{self.file_name}"
        text = f"![{self.file_name}]({ref_path})"
        images = {ref_path: base64.b64encode(content).decode()}
        return Document(content=text, images=images)

View File

@@ -0,0 +1,403 @@
"""
Markdown Parser Module
This module provides comprehensive Markdown parsing functionality including:
- Table formatting and standardization
- Base64 image extraction and conversion
- Image path replacement and URL generation
- Pipeline-based parsing with multiple stages
The parser uses a pipeline approach to process Markdown content through
multiple stages: table formatting -> image processing.
"""
import base64
import logging
import os
import re
import uuid
from typing import Dict, List, Match, Optional, Tuple
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
from docreader.parser.chain_parser import PipelineParser
from docreader.utils import endecode
# Get logger object
logger = logging.getLogger(__name__)
class MarkdownTableUtil:
    """Utility class for formatting Markdown tables.

    Standardizes table rows by:
    - Normalizing column alignment markers to ``---``, ``:---``, ``---:``
      or ``:---:``
    - Putting exactly one space on each side of every cell
    - Preserving the row's leading indentation
    - Preserving empty cells, so columns never shift

    Example:
        Input:  |Name|Age|City|
                |:---|---:|:---:|
                |John|25||
        Output: | Name | Age | City |
                | :--- | ---: | :---: |
                | John | 25 |  |
    """

    def __init__(self):
        # Alignment row, e.g. |:---|---:|:---:| ; group 1 is the indentation.
        self.align_pattern = re.compile(
            r"^([\t ]*)\|[\t ]*[:-]+(?:[\t ]*\|[\t ]*[:-]+)*[\t ]*\|[\t ]*$",
            re.MULTILINE,
        )
        # Any table row (header or data); group 1 is the indentation.
        self.line_pattern = re.compile(
            r"^([\t ]*)\|[\t ]*[^|\r\n]*(?:[\t ]*\|[^|\r\n]*)*\|[\t ]*$",
            re.MULTILINE,
        )

    def format_table(self, content: str) -> str:
        """Format all Markdown tables in the content.

        Args:
            content: Raw Markdown text containing tables.

        Returns:
            Markdown text in which every line that starts and ends with a
            pipe (ignoring surrounding spaces/tabs) has been normalized.
            Other lines are returned unchanged.
        """

        def process_align(match: Match[str]) -> str:
            """Normalize one alignment row."""
            # Every alignment cell is non-empty by construction of the
            # pattern, so filtering blanks only removes the edge pieces.
            columns = [col.strip() for col in match.group(0).split("|") if col.strip()]
            processed = []
            for col in columns:
                left_colon = ":" if col.startswith(":") else ""
                right_colon = ":" if col.endswith(":") else ""
                processed.append(left_colon + "---" + right_colon)
            prefix = match.group(1)
            return prefix + "| " + " | ".join(processed) + " |"

        def process_line(match: Match[str]) -> str:
            """Normalize one header/data row, keeping empty cells."""
            # The row starts and ends with a pipe, so the first and last
            # split pieces are just indentation / trailing whitespace.
            # Dropping only those two keeps empty cells in place; filtering
            # out all blank pieces would shift the following columns left.
            columns = [col.strip() for col in match.group(0).split("|")[1:-1]]
            prefix = match.group(1)
            return prefix + "| " + " | ".join(columns) + " |"

        # Regular rows first; alignment rows afterwards, because the
        # row pattern also matches (and re-spaces) alignment rows.
        formatted_content = self.line_pattern.sub(process_line, content)
        formatted_content = self.align_pattern.sub(process_align, formatted_content)
        return formatted_content

    @staticmethod
    def _self_test():
        """Print the formatted version of a small sample document."""
        test_content = """
# 测试表格
普通文本---不会被匹配
## 表格1无前置空格
| 姓名 | 年龄 | 城市 |
| :---------- | -------: | :------ |
| 张三 | 25 | 北京 |
## 表格3前置4个空格+首尾|
| 产品 | 价格 | 库存 |
| :-------------: | ----------- | :-----------: |
| 手机 | 5999 | 100 |
"""
        util = MarkdownTableUtil()
        format_content = util.format_table(test_content)
        print(format_content)
class MarkdownTableFormatter(BaseParser):
    """Parser stage that normalizes Markdown tables.

    The incoming bytes are decoded to text, every Markdown table in it is
    reformatted by ``MarkdownTableUtil``, and the result is returned as the
    document content.

    Example:
        A document consisting of the rows ``|Name|Age|``, ``|---|---|`` and
        ``|John|30|`` comes out as::

            | Name | Age |
            | --- | --- |
            | John | 30 |
    """

    def __init__(self, **kwargs):
        """Forward keyword arguments to the base class and build the helper."""
        super().__init__(**kwargs)
        self.table_helper = MarkdownTableUtil()

    def parse_into_text(self, content: bytes) -> Document:
        """Decode the content and format its Markdown tables.

        Args:
            content: Raw Markdown content as bytes.

        Returns:
            Document whose content is the table-formatted text.
        """
        decoded = endecode.decode_bytes(content)
        formatted = self.table_helper.format_table(decoded)
        return Document(content=formatted)
class MarkdownImageUtil:
"""Utility class for handling images in Markdown.
This class provides functionality to:
- Extract base64-encoded images from Markdown
- Extract image paths from Markdown
- Replace image paths with new URLs
- Convert base64 images to binary format
Supported formats:
- Base64 embedded images: ![alt](data:image/png;base64,iVBORw0...)
- Regular image links: ![alt](path/to/image.png)
"""
def __init__(self):
# Pattern to match base64 embedded images
# Captures: (1) alt text, (2) image format, (3) base64 data
self.b64_pattern = re.compile(
r"!\[([^\]]*)\]\(data:image/(\w+)\+?\w*;base64,([^\)]+)\)"
)
# Pattern to match regular image syntax
self.image_pattern = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")
# Pattern for replacing image paths
self.replace_pattern = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")
def extract_image(
self,
content: str,
path_prefix: Optional[str] = None,
replace: bool = True,
) -> Tuple[str, List[str]]:
"""Extract image paths from Markdown content.
Args:
content: Markdown text containing images
path_prefix: Optional prefix to add to image paths
replace: Whether to replace image syntax in content
Returns:
Tuple of (processed_text, list_of_image_paths)
Example:
>>> util = MarkdownImageUtil()
>>> text, images = util.extract_image("![logo](img/logo.png)")
>>> print(images)
['img/logo.png']
"""
# List to store extracted image paths
images: List[str] = []
def repl(match: Match[str]) -> str:
"""Replacement function for each image match."""
title = match.group(1) # Alt text
image_path = match.group(2) # Image path
# Add prefix if specified
if path_prefix:
image_path = f"{path_prefix}/{image_path}"
images.append(image_path)
# Keep original if replace is False
if not replace:
return match.group(0)
# Replace image path with potentially prefixed path
return f"![{title}]({image_path})"
text = self.image_pattern.sub(repl, content)
logger.debug(f"Extracted {len(images)} images from markdown")
return text, images
def extract_base64(
self,
content: str,
path_prefix: Optional[str] = None,
replace: bool = True,
) -> Tuple[str, Dict[str, bytes]]:
"""Extract and decode base64 embedded images from Markdown.
This method finds all base64-encoded images in the Markdown content,
decodes them to binary format, generates unique filenames, and
optionally replaces them with file path references.
Args:
content: Markdown text containing base64 images
path_prefix: Optional directory prefix for generated paths
replace: Whether to replace base64 syntax with file paths
Returns:
Tuple of (processed_text, dict_of_path_to_bytes)
Example:
>>> util = MarkdownImageUtil()
>>> text = "![logo](data:image/png;base64,iVBORw0KGg...)"
>>> new_text, images = util.extract_base64(text, "images")
>>> print(new_text)
![logo](images/uuid.png)
>>> print(len(images))
1
"""
# Dictionary mapping generated file paths to binary image data
images: Dict[str, bytes] = {}
def repl(match: Match[str]) -> str:
"""Replacement function for each base64 image match."""
title = match.group(1) # Alt text
img_ext = match.group(2) # Image format (png, jpg, etc.)
img_b64 = match.group(3) # Base64 encoded data
# Decode base64 string to bytes
image_byte = endecode.encode_image(img_b64, errors="ignore")
if not image_byte:
logger.error(f"Failed to decode base64 image skip it: {img_b64}")
return title # Return just the alt text if decode fails
# Generate unique filename with original extension
image_path = f"{uuid.uuid4()}.{img_ext}"
if path_prefix:
image_path = f"{path_prefix}/{image_path}"
images[image_path] = image_byte
# Keep original base64 if replace is False
if not replace:
return match.group(0)
# Replace base64 data with file path reference
return f"![{title}]({image_path})"
text = self.b64_pattern.sub(repl, content)
logger.debug(f"Extracted {len(images)} base64 images from markdown")
return text, images
def replace_path(self, content: str, images: Dict[str, str]) -> str:
    """Rewrite Markdown image targets according to a path mapping.

    Each image matched by ``self.replace_pattern`` whose path is a key of
    ``images`` gets its target replaced by the mapped value. Typical use is
    swapping temporary local paths for final URLs once images are stored.

    Args:
        content: Markdown text containing image references.
        images: Mapping of current image path -> new URL. A falsy mapped
            value collapses the image to its alt text.

    Returns:
        The Markdown text with mapped image paths replaced; images whose
        path is not in ``images`` are left untouched.

    Example:
        >>> util = MarkdownImageUtil()
        >>> content = "![logo](temp/img.png)"
        >>> mapping = {"temp/img.png": "https://cdn.com/img.png"}
        >>> print(util.replace_path(content, mapping))
        ![logo](https://cdn.com/img.png)
    """
    # Distinct paths that were actually found and replaced (for logging)
    content_replace: set = set()

    def _swap(match: Match[str]) -> str:
        """Return the replacement text for a single image match."""
        old_path = match.group(2)
        if old_path not in images:
            # Unknown path: keep the original markup unchanged
            return match.group(0)

        content_replace.add(old_path)
        title = match.group(1)
        image_path = images[old_path]
        if not image_path:
            # Empty replacement: drop the image, keep only its alt text
            return title
        return f"![{title}]({image_path})"

    text = self.replace_pattern.sub(_swap, content)
    logger.debug(f"Replaced {len(content_replace)} images in markdown")
    return text
@staticmethod
def _self_test():
    """Manual smoke test for ``extract_base64``.

    Extracts the embedded image from a tiny sample, prints the rewritten
    Markdown, and writes every extracted image to disk under its generated
    path (relative to the current working directory, since no path prefix
    is passed).
    """
    sample_markdown = "test![](data:image/png;base64,iVBORw0KGgoAAAA)test"
    util = MarkdownImageUtil()
    rewritten, extracted = util.extract_base64(sample_markdown)
    print(rewritten)
    for target_path, payload in extracted.items():
        with open(target_path, "wb") as out_file:
            out_file.write(payload)
class MarkdownImageBase64(BaseParser):
    """Parser stage that pulls base64-embedded images out of Markdown.

    The embedded images are replaced by ``images/<uuid>.<ext>`` references in
    the text, and their data is returned (re-encoded as base64 strings) in
    ``Document.images``. This class performs no upload or storage itself;
    persisting the images is left to whoever consumes the Document.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to BaseParser and create the helper."""
        super().__init__(**kwargs)
        self.image_helper = MarkdownImageUtil()

    def parse_into_text(self, content: bytes) -> Document:
        """Decode ``content`` and split it into text plus extracted images.

        Args:
            content: Raw Markdown bytes.

        Returns:
            Document whose ``content`` is the rewritten Markdown and whose
            ``images`` maps each generated path to base64-encoded image data.
        """
        markdown = endecode.decode_bytes(content)
        markdown, raw_images = self.image_helper.extract_base64(
            markdown, path_prefix="images"
        )
        # Re-encode the raw bytes so the mapping holds plain strings
        images: Dict[str, str] = {
            image_path: base64.b64encode(payload).decode()
            for image_path, payload in raw_images.items()
        }
        logger.debug("Extracted %d base64 images from markdown", len(images))
        return Document(content=markdown, images=images)
class MarkdownParser(PipelineParser):
    """Complete Markdown parser built as a two-stage pipeline.

    Stages, in the order listed in ``_parser_cls``:
    1. MarkdownTableFormatter: table formatting stage (defined elsewhere in
       this module).
    2. MarkdownImageBase64: extracts base64-embedded images, replaces them
       with path references and returns the image data in
       ``Document.images`` (it does not upload anything itself).

    NOTE(review): the stages are presumably run in sequence by
    PipelineParser, each receiving the previous stage's output; confirm
    against chain_parser.
    """

    _parser_cls = (MarkdownTableFormatter, MarkdownImageBase64)
if __name__ == "__main__":
    # Manual smoke test: run the full pipeline on a tiny sample, then the
    # per-utility self tests.
    logging.basicConfig(level=logging.DEBUG)

    your_content = "test![](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAMgA)test"
    # Feed the sample through the complete MarkdownParser pipeline
    document = MarkdownParser().parse_into_text(your_content.encode())

    # Show the rewritten text and which images were extracted
    logger.info(document.content)
    logger.info(f"Images: {len(document.images)}, name: {document.images.keys()}")

    # Self tests of the individual helpers
    MarkdownImageUtil._self_test()
    MarkdownTableUtil._self_test()

View File

@@ -0,0 +1,107 @@
import io
import logging
import re
import base64
from markitdown import MarkItDown
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
from docreader.parser.chain_parser import PipelineParser
from docreader.parser.markdown_parser import MarkdownParser
# Try to import VLMClient; it is optional and falls back to None when unavailable
try:
from parser.vlm_client import VLMClient
except ImportError:
VLMClient = None
logger = logging.getLogger(__name__)
class StdMarkitdownParser(BaseParser):
    """MarkItDown-based parser with optional VLM image processing.

    Converts document bytes (docx, pptx, pdf, ...) to Markdown text using
    the ``markitdown`` library. When a VLM client could be created from
    ``vlm_config``, images embedded in the result as base64 data URIs are
    sent to the VLM and replaced by the text it returns.
    """

    def __init__(self, *args, vlm_config=None, **kwargs):
        """Create the MarkItDown converter and, optionally, a VLM client.

        Args:
            *args: Positional arguments forwarded to BaseParser.
            vlm_config: Optional dict. A VLM client is created only when its
                ``"enabled"`` entry is truthy and ``VLMClient`` could be
                imported; failures to create it are logged and ignored.
            **kwargs: Keyword arguments forwarded to BaseParser.
        """
        # BaseParser.__init__ is relied on to set self.file_type
        super().__init__(*args, **kwargs)
        self.markitdown = MarkItDown()
        self.vlm_config = vlm_config
        self.vlm_client = None

        # Initialise the VLM client only when it is configured and available
        if vlm_config and vlm_config.get("enabled") and VLMClient:
            try:
                self.vlm_client = VLMClient(vlm_config)
                logger.info(f"VLM client initialized: provider={vlm_config.get('provider')}, model={vlm_config.get('model')}")
            except Exception as e:
                logger.warning(f"Failed to initialize VLM client: {e}")

    def parse_into_text(self, content: bytes) -> Document:
        """Convert ``content`` to Markdown with MarkItDown.

        ``self.file_type`` (normalised to start with a dot) is passed as the
        file-extension hint. Exceptions from the conversion are deliberately
        not caught here so the caller can handle them.

        Args:
            content: Raw document bytes.

        Returns:
            Document holding the Markdown text, with embedded images
            replaced by VLM output when a VLM client is active.
        """
        ext = self.file_type
        if ext and not ext.startswith('.'):
            ext = '.' + ext

        # No try/except on purpose: errors propagate to the calling parser
        result = self.markitdown.convert(
            io.BytesIO(content),
            file_extension=ext,
            keep_data_uris=True
        )
        markdown_content = result.text_content

        # Let the VLM describe embedded images when a client is available
        if self.vlm_client and markdown_content:
            markdown_content = self._process_images_with_vlm(markdown_content)
        return Document(content=markdown_content)

    def _process_images_with_vlm(self, content: str) -> str:
        """Replace base64 data-URI images in ``content`` with VLM output.

        Each image is decoded and passed to ``self.vlm_client.analyze_image``.
        On success the image markup is replaced by the returned text wrapped
        in ``<!-- Image: ... -->`` comments; on failure or any exception the
        original image markup is kept.

        Args:
            content: Markdown text.

        Returns:
            The Markdown text with successfully analysed images replaced.
        """
        # Groups: 1 = alt text, 2 = whole data URI,
        #         3 = image subtype (e.g. "png"), 4 = base64 payload
        pattern = r'!\[([^\]]*)\]\((data:image/([^;]+);base64,([A-Za-z0-9+/=]+))\)'

        def replace_image(match):
            alt_text = match.group(1)
            # Group 3 is only the subtype ("png", "jpeg", ...); the VLM
            # client needs the full MIME type, so prepend "image/".
            mime_type = f"image/{match.group(3)}"
            base64_data = match.group(4)
            try:
                image_bytes = base64.b64decode(base64_data)
                logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}")
                vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type)
                if vlm_result.get("success"):
                    vlm_content = vlm_result.get("content", "")
                    logger.info(f"VLM processed image successfully, content length: {len(vlm_content)}")
                    # Substitute the VLM's description for the image
                    return f"<!-- Image: {alt_text} -->\n{vlm_content}\n<!-- End Image -->"
                else:
                    logger.warning(f"VLM failed for image: {vlm_result.get('error')}")
                    return match.group(0)  # Keep the original image reference
            except Exception as e:
                logger.error(f"Error processing image with VLM: {e}")
                return match.group(0)  # Keep the original image reference

        return re.sub(pattern, replace_image, content)
class MarkitdownParser(PipelineParser):
    """Pipeline combining StdMarkitdownParser with MarkdownParser.

    ``_parser_cls`` lists the stages in order: StdMarkitdownParser first,
    then MarkdownParser.
    """

    _parser_cls = (StdMarkitdownParser, MarkdownParser)

88
ai-core/parser/parser.py Normal file
View File

@@ -0,0 +1,88 @@
import logging
from typing import Any, Optional
from docreader.models.document import Document
from docreader.parser.registry import registry
from docreader.parser.web_parser import WebParser
logger = logging.getLogger(__name__)
class Parser:
    """Document parser facade (lightweight version).

    Resolves a parser class from the engine registry (or uses WebParser for
    URLs), instantiates it and returns the resulting Document. The facade
    itself does no chunking or storage; an optional VLM configuration is
    simply forwarded to the selected parser class.
    """

    def __init__(self):
        """Bind the module-level registry and log the registered engines."""
        self.registry = registry
        logger.info(
            "Parser initialized with engines: %s",
            ", ".join(self.registry.get_engine_names()),
        )

    def parse_file(
        self,
        file_name: str,
        file_type: str,
        content: bytes,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
        vlm_config: Optional[dict[str, Any]] = None,
    ) -> Document:
        """Parse file content to markdown.

        Args:
            file_name: Name of the file, passed to the parser.
            file_type: File type used to look up the parser class.
            content: Raw file bytes.
            parser_engine: Engine name; empty/None lets the registry use its
                builtin engine.
            engine_overrides: Extra keyword arguments for the parser
                constructor. The dict is copied, never modified.
            vlm_config: Optional VLM settings; when its ``"enabled"`` entry
                is truthy it is passed to the parser as ``vlm_config``.

        Returns:
            The Document produced by the selected parser.

        Raises:
            ValueError: Propagated from the registry when no parser class
                supports ``file_type``.
        """
        engine = parser_engine or ""
        # Copy so that adding "vlm_config" below never leaks into the
        # caller's dict (which may be reused across requests).
        overrides = dict(engine_overrides or {})
        logger.info(
            "Parsing file: %s, type: %s, engine: %s, vlm_enabled: %s",
            file_name,
            file_type,
            engine or "builtin",
            vlm_config.get("enabled") if vlm_config else False,
        )

        # Forward an enabled VLM configuration as a constructor override
        if vlm_config and vlm_config.get("enabled"):
            overrides["vlm_config"] = vlm_config

        cls = self.registry.get_parser_class(engine, file_type)
        logger.info(
            "Creating %s parser instance for %s file",
            cls.__name__,
            file_type,
        )
        parser = cls(
            file_name=file_name,
            file_type=file_type,
            **overrides,
        )

        logger.info("Starting to parse file content, size: %d bytes", len(content))
        result = parser.parse(content)
        if not result.content:
            logger.warning("Parser returned empty content for file: %s", file_name)
        # "or ''" keeps the length log from raising when content is None
        logger.info(
            "Parsed file %s, content length=%d", file_name, len(result.content or "")
        )
        return result

    def parse_url(
        self,
        url: str,
        title: str,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
    ) -> Document:
        """Parse content from a URL to markdown.

        Args:
            url: Address of the page; handed to WebParser encoded as bytes.
            title: Title passed to WebParser.
            parser_engine: Accepted for interface symmetry; currently unused.
            engine_overrides: Accepted for interface symmetry; currently
                unused.

        Returns:
            The Document produced by WebParser.
        """
        logger.info("Parsing URL: %s, title: %s", url, title)
        parser = WebParser(title=title)

        logger.info("Starting to parse URL content")
        result = parser.parse(url.encode())
        if not result.content:
            logger.warning("Parser returned empty content for url: %s", url)
        # "or ''" keeps the length log from raising when content is None
        logger.info(
            "Parsed url %s, content length=%d", url, len(result.content or "")
        )
        return result

View File

@@ -0,0 +1,275 @@
"""
Simplified Parser - built on markitdown + VLM
"""
import logging
import os
import io
import re
import base64
from typing import Optional, Any, Dict
from markitdown import MarkItDown
logger = logging.getLogger(__name__)
class Document:
    """Minimal parse result: text content plus optional chunks and metadata."""

    def __init__(self, content: str = "", chunks: list = None, metadata: dict = None):
        """Store the given values.

        Args:
            content: Parsed text.
            chunks: Optional list of chunks; a falsy value yields a fresh
                empty list.
            metadata: Optional metadata dict; a falsy value yields a fresh
                empty dict.
        """
        self.content = content
        # A new container per instance avoids sharing state between documents
        self.chunks = chunks if chunks else []
        self.metadata = metadata if metadata else {}
class VLMClient:
    """HTTP client for vision-language model (VLM) providers.

    Supports the providers ``"openai"``, ``"anthropic"`` and ``"qwen"``.
    Every call returns a dict: ``{"success": True, "content": str}`` on
    success or ``{"success": False, "error": str}`` on failure; request
    errors are logged and reported in that dict rather than raised.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read provider settings from ``config``.

        Recognised keys: ``provider`` (default ``"openai"``), ``model``
        (default ``"gpt-4o"``), ``api_key``, ``base_url`` and ``prompt``
        (an empty prompt falls back to the built-in default prompt).
        """
        self.provider = config.get("provider", "openai")
        self.model = config.get("model", "gpt-4o")
        self.api_key = config.get("api_key", "")
        self.base_url = config.get("base_url", "")
        self.prompt = config.get("prompt", "") or self._default_prompt()
        logger.info(f"VLMClient initialized: provider={self.provider}, model={self.model}")

    def _default_prompt(self) -> str:
        """Return the built-in prompt asking for a Markdown transcription."""
        return """请分析这个文档图片的内容,并将其转换为 Markdown 格式。
要求:
1. 保持原文的格式和结构
2. 表格用 Markdown 表格格式
3. 标题用 # ## ### 标记
4. 尽量保留原文的所有信息"""

    def _endpoint(self, default_base: str, path: str) -> str:
        """Join the configured (or default) base URL with ``path``.

        Trailing slashes on the base are removed so that a configured
        ``base_url`` such as ``https://host/v1/`` does not yield ``//``.
        """
        return (self.base_url or default_base).rstrip("/") + path

    def analyze_image(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Send an image to the configured provider.

        Args:
            content: Raw image bytes.
            mime_type: Full MIME type of the image, e.g. ``image/png``.

        Returns:
            Result dict as described on the class; an unknown provider
            yields ``{"success": False, "error": "Unknown provider: ..."}``.
        """
        if self.provider == "openai":
            return self._call_openai(content, mime_type)
        elif self.provider == "anthropic":
            return self._call_anthropic(content, mime_type)
        elif self.provider == "qwen":
            return self._call_qwen(content, mime_type)
        else:
            return {"success": False, "error": f"Unknown provider: {self.provider}"}

    def _call_chat_completions(
        self,
        content: bytes,
        mime_type: str,
        default_base: str,
        label: str,
        max_tokens: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Call an OpenAI-style ``/chat/completions`` endpoint.

        Shared by the OpenAI and Qwen providers, which differ only in the
        default base URL, the log label and whether ``max_tokens`` is sent.
        """
        try:
            import requests

            url = self._endpoint(default_base, "/chat/completions")
            image_b64 = base64.b64encode(content).decode("utf-8")
            headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
            payload = {
                "model": self.model,
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self.prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}}
                    ]
                }],
            }
            if max_tokens is not None:
                payload["max_tokens"] = max_tokens
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            resp.raise_for_status()
            result = resp.json()
            return {"success": True, "content": result["choices"][0]["message"]["content"]}
        except Exception as e:
            logger.error(f"{label} VLM error: {e}")
            return {"success": False, "error": str(e)}

    def _call_openai(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse the image via the OpenAI chat completions API."""
        return self._call_chat_completions(
            content, mime_type, "https://api.openai.com/v1", "OpenAI", max_tokens=4096
        )

    def _call_anthropic(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse the image via the Anthropic messages API."""
        try:
            import requests

            url = self._endpoint("https://api.anthropic.com/v1", "/messages")
            image_b64 = base64.b64encode(content).decode("utf-8")
            headers = {
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.model,
                "max_tokens": 4096,
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self.prompt},
                        {"type": "image", "source": {"type": "base64", "media_type": mime_type, "data": image_b64}}
                    ]
                }]
            }
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            resp.raise_for_status()
            result = resp.json()
            return {"success": True, "content": result["content"][0]["text"]}
        except Exception as e:
            logger.error(f"Anthropic VLM error: {e}")
            return {"success": False, "error": str(e)}

    def _call_qwen(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse the image via DashScope's OpenAI-compatible endpoint."""
        return self._call_chat_completions(
            content,
            mime_type,
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
            "Qwen",
        )
class Parser:
    """Document parser built on MarkItDown with optional VLM support.

    Without a VLM client every file is converted by MarkItDown. With a VLM
    client, images and PDFs are sent to the VLM as a whole, and images
    embedded in other converted documents are replaced by VLM output.
    Parse failures are logged and reported as a Document with empty content.
    """

    def __init__(self):
        """Create the MarkItDown converter; VLM support starts disabled."""
        self.markitdown = MarkItDown()
        self.vlm_client: Optional[VLMClient] = None
        logger.info("Parser initialized with MarkItDown")

    def set_vlm_config(self, config: Dict[str, Any]) -> None:
        """Enable or disable the VLM client.

        A client is created only when ``config`` has truthy ``"enabled"`` and
        ``"api_key"`` entries; any other value disables VLM processing.
        """
        if config and config.get("enabled") and config.get("api_key"):
            self.vlm_client = VLMClient(config)
            logger.info(f"VLM enabled: provider={config.get('provider')}, model={config.get('model')}")
        else:
            self.vlm_client = None

    def _should_use_vlm(self, file_name: str) -> bool:
        """Return True when the whole file should be handed to the VLM.

        That is the case when a VLM client is active and the file name has
        an image extension or ``.pdf``.
        """
        if not self.vlm_client:
            return False
        ext = os.path.splitext(file_name)[1].lower()
        # Images and PDFs both go straight to the VLM
        image_exts = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff']
        return ext in image_exts or ext == '.pdf'

    def _process_images_with_vlm(self, content: str) -> str:
        """Replace base64 data-URI images in ``content`` with VLM output.

        On VLM failure or any exception the original image markup is kept.
        """
        # Groups: 1 = alt text, 2 = whole data URI,
        #         3 = image subtype (e.g. "png"), 4 = base64 payload
        pattern = r'!\[([^\]]*)\]\((data:image/([^;]+);base64,([A-Za-z0-9+/=]+))\)'

        def replace_image(match):
            alt_text = match.group(1)
            # Group 3 is only the subtype; analyze_image needs the full
            # MIME type (it is embedded as "data:<mime_type>;base64,...").
            mime_type = f"image/{match.group(3)}"
            base64_data = match.group(4)
            try:
                image_bytes = base64.b64decode(base64_data)
                logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}")
                vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type)
                if vlm_result.get("success"):
                    vlm_content = vlm_result.get("content", "")
                    logger.info(f"VLM processed image, content length: {len(vlm_content)}")
                    return f"<!-- Image: {alt_text} -->\n{vlm_content}\n<!-- End Image -->"
                else:
                    logger.warning(f"VLM failed: {vlm_result.get('error')}")
                    return match.group(0)
            except Exception as e:
                logger.error(f"VLM error: {e}")
                return match.group(0)

        return re.sub(pattern, replace_image, content)

    def _parse_with_vlm(self, content: bytes, file_name: str) -> Document:
        """Send the whole file to the VLM and wrap its answer.

        The MIME type is derived from the file extension (``image/png`` when
        unknown). Returns a Document with empty content when the VLM fails.
        """
        ext = os.path.splitext(file_name)[1].lower()
        mime_types = {
            '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
            '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp',
            '.tiff': 'image/tiff', '.pdf': 'application/pdf',
        }
        mime_type = mime_types.get(ext, 'image/png')
        # NOTE(review): PDFs are passed through analyze_image with MIME type
        # application/pdf; confirm the configured provider accepts that.
        result = self.vlm_client.analyze_image(content, mime_type)
        if result.get("success"):
            return Document(content=result["content"], metadata={"vlm": True})
        else:
            logger.error(f"VLM failed: {result.get('error')}")
            return Document(content="")

    def parse_file(
        self,
        file_name: str,
        file_type: str,
        content: bytes,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
        vlm_config: Optional[dict[str, Any]] = None,
    ) -> Document:
        """Parse file content into a Document.

        Args:
            file_name: File name; its extension decides whether the VLM
                handles the whole file.
            file_type: Extension hint for MarkItDown, with or without a
                leading dot.
            content: Raw file bytes.
            parser_engine: Accepted for interface compatibility; unused.
            engine_overrides: Accepted for interface compatibility; unused.
            vlm_config: Optional VLM settings, applied when ``"enabled"`` is
                truthy.

        Returns:
            The parsed Document; its content is empty when parsing failed.
        """
        logger.info(f"Parsing file: {file_name}, type: {file_type}, vlm_config={'enabled' if vlm_config and vlm_config.get('enabled') else 'none'}")

        # NOTE(review): the client is only (re)configured when an enabled
        # config is supplied, so a client from an earlier call stays active
        # for later calls without one; confirm this is intended.
        if vlm_config and vlm_config.get("enabled"):
            self.set_vlm_config(vlm_config)

        # Images and PDFs are parsed by the VLM directly
        if self._should_use_vlm(file_name):
            logger.info(f"Using VLM for {file_name}")
            return self._parse_with_vlm(content, file_name)

        # Everything else goes through MarkItDown
        try:
            ext = file_type
            # Only add the dot to a non-empty type; an empty/None type is
            # passed as "no hint" instead of a bare ".".
            if ext and not ext.startswith('.'):
                ext = '.' + ext
            result = self.markitdown.convert(
                io.BytesIO(content),
                file_extension=ext or None,
                keep_data_uris=True
            )
            markdown_content = result.text_content or ""

            # With a VLM client, describe the embedded images as well
            if self.vlm_client and markdown_content:
                markdown_content = self._process_images_with_vlm(markdown_content)
            return Document(
                content=markdown_content,
                metadata=result.metadata if hasattr(result, 'metadata') else {}
            )
        except Exception as e:
            logger.error(f"Parse error: {e}")
            return Document(content="")

    def parse_url(
        self,
        url: str,
        title: str,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
    ) -> Document:
        """Convert the page at ``url`` with MarkItDown.

        ``title`` is only logged; ``parser_engine`` and ``engine_overrides``
        are unused. Returns a Document with empty content on failure.
        """
        logger.info(f"Parsing URL: {url}, title: {title}")
        try:
            result = self.markitdown.convert(url)
            return Document(content=result.text_content or "")
        except Exception as e:
            logger.error(f"URL parse error: {e}")
            return Document(content="")
# Public names exported by this module
__all__ = ["Parser", "Document"]

View File

@@ -0,0 +1,15 @@
from docreader.parser.chain_parser import FirstParser
from docreader.parser.markitdown_parser import MarkitdownParser
class PDFParser(FirstParser):
    """PDF parser expressed as a chain of parser backends.

    ``_parser_cls`` lists the backends to try in order. It currently
    contains only MarkitdownParser, so that is the single backend used.

    NOTE(review): an earlier description named MinerUParser as the primary
    backend, but it is not part of ``_parser_cls`` here. Presumably
    FirstParser returns the first successful backend's result; confirm
    against chain_parser.
    """

    # Parser classes to try in order (chain of responsibility pattern)
    _parser_cls = (MarkitdownParser,)

160
ai-core/parser/registry.py Normal file
View File

@@ -0,0 +1,160 @@
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from docreader.parser.base_parser import BaseParser
from docreader.parser.doc_parser import DocParser
from docreader.parser.docx2_parser import Docx2Parser
from docreader.parser.excel_parser import ExcelParser
from docreader.parser.image_parser import ImageParser
from docreader.parser.markdown_parser import MarkdownParser
from docreader.parser.markitdown_parser import MarkitdownParser
from docreader.parser.pdf_parser import PDFParser
logger = logging.getLogger(__name__)
# Name of the engine used as the fallback for every lookup
BUILTIN_ENGINE = "builtin"


class ParserEngineRegistry:
    """Registry for parser engines.

    Each engine maps file extensions (without a leading dot) to parser
    classes. When a requested engine doesn't support a file type, the
    registry falls back to the builtin engine automatically.
    """

    def __init__(self):
        # engine name -> {file extension -> parser class}
        self._engines: Dict[str, Dict[str, Type[BaseParser]]] = {}
        # engine name -> human readable description
        self._descriptions: Dict[str, str] = {}
        # engine name -> callable returning (available, reason)
        self._check_available: Dict[str, Callable[..., Tuple[bool, str]]] = {}
        # engine name -> default text shown when the engine is unavailable
        self._unavailable_hint: Dict[str, str] = {}

    def register(
        self,
        name: str,
        file_types: Dict[str, Type[BaseParser]],
        description: str = "",
        check_available: Callable[..., Tuple[bool, str]] | None = None,
        unavailable_hint: str = "",
    ):
        """Register (or replace) an engine.

        Args:
            name: Engine name.
            file_types: Mapping of file extension -> parser class.
            description: Human readable description.
            check_available: Optional callable, invoked with the overrides
                given to ``list_engines``, returning ``(available, reason)``.
            unavailable_hint: Fallback reason used when the engine is
                unavailable and no reason is given.
        """
        self._engines[name] = file_types
        self._descriptions[name] = description
        if check_available is not None:
            self._check_available[name] = check_available
        else:
            # Re-registering without a check must not keep a stale one
            self._check_available.pop(name, None)
        self._unavailable_hint[name] = unavailable_hint
        logger.info(
            "Registered parser engine '%s' with file types: %s",
            name,
            ", ".join(file_types.keys()),
        )

    def get_parser_class(self, engine: str, file_type: str) -> Type[BaseParser]:
        """Resolve parser class for the given engine and file type.

        The file type is matched case-insensitively; surrounding whitespace
        and a leading dot are ignored, so ``"pdf"``, ``"PDF"`` and ``".pdf"``
        are equivalent. Falls back to the builtin engine when the requested
        engine is unknown or doesn't support the file type.

        Raises:
            ValueError: If neither the engine nor the builtin engine
                supports the file type.
        """
        ft = file_type.strip().lower().lstrip(".")
        if engine and engine in self._engines:
            cls = self._engines[engine].get(ft)
            if cls:
                logger.info("Using engine '%s' for file type '%s'", engine, ft)
                return cls
            logger.info(
                "Engine '%s' does not support '%s', falling back to builtin",
                engine,
                ft,
            )

        builtin = self._engines.get(BUILTIN_ENGINE, {})
        cls = builtin.get(ft)
        if cls:
            return cls
        raise ValueError(f"Unsupported file type: {file_type}")

    def list_engines(self, overrides: Optional[Dict[str, str]] = None) -> List[Dict]:
        """Return metadata for all registered engines, including availability.

        Args:
            overrides: tenant-level config overrides (e.g. mineru_endpoint,
                mineru_api_key) forwarded to each engine's check_available
                function.

        Returns:
            One dict per engine with the keys ``name``, ``description``,
            ``file_types`` (sorted), ``available`` and
            ``unavailable_reason``.
        """
        result = []
        for name, parsers in self._engines.items():
            available = True
            unavailable_reason = ""
            check = self._check_available.get(name)
            if check is not None:
                try:
                    available, unavailable_reason = check(overrides)
                except Exception as e:
                    # A failing check marks the engine unavailable
                    available = False
                    unavailable_reason = str(e) or self._unavailable_hint.get(name, "")
            if not available and not unavailable_reason:
                unavailable_reason = self._unavailable_hint.get(name, "不可用")
            result.append(
                {
                    "name": name,
                    "description": self._descriptions.get(name, ""),
                    "file_types": sorted(parsers.keys()),
                    "available": available,
                    "unavailable_reason": unavailable_reason,
                }
            )
        return result

    def get_engine_names(self) -> List[str]:
        """Return the names of all registered engines in registration order."""
        return list(self._engines.keys())
def _build_default_registry() -> ParserEngineRegistry:
    """Build the registry holding the ``builtin`` and ``markitdown`` engines.

    Returns:
        A new ParserEngineRegistry with both engines registered.
    """
    image_extensions = ("jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp")
    builtin_parsers = {
        "docx": Docx2Parser,
        "doc": DocParser,
        "pdf": PDFParser,
        "md": MarkdownParser,
        "markdown": MarkdownParser,
        "xlsx": ExcelParser,
        "xls": ExcelParser,
    }
    # All image extensions share the same parser
    builtin_parsers.update(dict.fromkeys(image_extensions, ImageParser))

    # Every extension of the markitdown engine maps to MarkitdownParser
    markitdown_extensions = (
        "md",
        "markdown",
        "pdf",
        "docx",
        "doc",
        "pptx",
        "ppt",
        "xlsx",
        "xls",
        "csv",
    )
    markitdown_parsers = dict.fromkeys(markitdown_extensions, MarkitdownParser)

    reg = ParserEngineRegistry()
    reg.register(BUILTIN_ENGINE, builtin_parsers, description="内置解析引擎")
    reg.register(
        "markitdown",
        markitdown_parsers,
        description="MarkItDown 解析引擎(微软 MarkItDown 库)",
    )
    # NOTE: Engine listing is managed by Go-side engine registry
    # (docparser.ListAllEngines). The Python list_engines method is kept for
    # backward compatibility with the gRPC ListEngines RPC but the Go app
    # no longer calls it. MinerU engines are handled natively by Go.
    return reg
# Module-level registry instance, built once at import time
registry = _build_default_registry()

322
ai-core/parser/storage.py Normal file
View File

@@ -0,0 +1,322 @@
# -*- coding: utf-8 -*-
import io
import logging
import os
import traceback
import uuid
from abc import ABC, abstractmethod
from typing import Dict, Optional
from minio import Minio
from qcloud_cos import CosConfig, CosS3Client
from docreader.utils import endecode
logger = logging.getLogger(__name__)
def _cfg(storage_config: Optional[Dict], key: str, *env_keys: str, default: str = "") -> str:
"""Read a value from storage_config dict, falling back to env vars."""
if storage_config:
v = storage_config.get(key, "")
if v:
return str(v)
for ek in env_keys:
v = os.environ.get(ek, "")
if v:
return v
return default
class Storage(ABC):
    """Abstract interface every object-storage backend implements."""

    @abstractmethod
    def upload_file(self, file_path: str) -> str:
        """Upload the file at ``file_path`` and return a string locating it."""

    @abstractmethod
    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Upload ``content`` with extension ``file_ext`` and return a string locating it."""
class CosStorage(Storage):
    """Tencent Cloud COS storage implementation.

    Settings come from ``storage_config`` with environment variables as
    fallback. When the configuration is incomplete or the client cannot be
    created, ``self.client`` is None and every upload returns ``""``.
    """

    def __init__(self, storage_config: Optional[Dict] = None):
        self.storage_config = storage_config
        self.client, self.bucket_name, self.region, self.prefix = (
            self._init_cos_client()
        )

    def _init_cos_client(self):
        """Create the COS client.

        Returns:
            Tuple ``(client, bucket_name, region, prefix)``, or four Nones
            when the configuration is incomplete or initialisation fails.
        """
        try:
            sc = self.storage_config
            secret_id = _cfg(sc, "access_key_id", "COS_SECRET_ID")
            secret_key = _cfg(sc, "secret_access_key", "COS_SECRET_KEY")
            region = _cfg(sc, "region", "COS_REGION")
            bucket_name = _cfg(sc, "bucket_name", "COS_BUCKET_NAME")
            appid = _cfg(sc, "app_id", "COS_APP_ID")
            prefix = _cfg(sc, "path_prefix", "COS_PATH_PREFIX")
            enable_old_domain = os.environ.get("COS_ENABLE_OLD_DOMAIN", "").lower() in ("1", "true", "yes")

            if not all([secret_id, secret_key, region, bucket_name, appid]):
                # Only log whether a secret id is present, never its value
                logger.error(
                    "Incomplete COS configuration: "
                    "secret_id=%s, region=%s, bucket=%s, appid=%s",
                    bool(secret_id), region, bucket_name, appid,
                )
                return None, None, None, None

            logger.info("Initializing COS client: region=%s, bucket=%s", region, bucket_name)
            config = CosConfig(
                Appid=appid,
                Region=region,
                SecretId=secret_id,
                SecretKey=secret_key,
                EnableOldDomain=enable_old_domain,
            )
            client = CosS3Client(config)
            return client, bucket_name, region, prefix
        except Exception as e:
            logger.error("Failed to initialize COS client: %s", e)
            return None, None, None, None

    def _get_download_url(self, bucket_name, region, object_key):
        """Return the public COS URL of ``object_key``."""
        return f"https://{bucket_name}.cos.{region}.myqcloud.com/{object_key}"

    def _build_object_key(self, file_ext: str) -> str:
        """Return a new unique object key ``[<prefix>/]images/<uuid><ext>``.

        The prefix segment is omitted when no prefix is configured, so keys
        never start with a slash.
        """
        name = f"images/{uuid.uuid4().hex}{file_ext}"
        return f"{self.prefix}/{name}" if self.prefix else name

    def upload_file(self, file_path: str) -> str:
        """Upload a local file.

        Returns:
            The download URL, or ``""`` when no client is available or the
            upload fails.
        """
        try:
            if not self.client:
                return ""
            file_ext = os.path.splitext(file_path)[1]
            object_key = self._build_object_key(file_ext)
            self.client.upload_file(
                Bucket=self.bucket_name,
                LocalFilePath=file_path,
                Key=object_key,
            )
            file_url = self._get_download_url(self.bucket_name, self.region, object_key)
            logger.info("COS upload_file ok: %s", file_url)
            return file_url
        except Exception as e:
            logger.error("COS upload_file failed: %s", e)
            return ""

    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Upload in-memory data.

        Returns:
            The download URL, or ``""`` when no client is available or the
            upload fails.
        """
        try:
            if not self.client:
                return ""
            object_key = self._build_object_key(file_ext)
            self.client.put_object(
                Bucket=self.bucket_name, Body=content, Key=object_key
            )
            file_url = self._get_download_url(self.bucket_name, self.region, object_key)
            logger.info("COS upload_bytes ok: %s", file_url)
            return file_url
        except Exception as e:
            logger.error("COS upload_bytes failed: %s", e)
            traceback.print_exc()
            return ""
class MinioStorage(Storage):
    """MinIO storage implementation.

    Settings come from ``storage_config`` with environment variables as
    fallback. When the configuration is incomplete or initialisation fails,
    ``self.client`` is None and every upload returns ``""``.
    """

    def __init__(self, storage_config: Optional[Dict] = None):
        self.storage_config = storage_config
        (
            self.client,
            self.bucket_name,
            self.use_ssl,
            self.endpoint,
            self.path_prefix,
        ) = self._init_minio_client()

    def _init_minio_client(self):
        """Create the MinIO client and make sure the bucket exists.

        A missing bucket is created and given a policy that allows anonymous
        listing and reading of its objects.

        Returns:
            Tuple ``(client, bucket_name, use_ssl, endpoint, path_prefix)``,
            or five Nones on incomplete configuration or failure.
        """
        unavailable = (None, None, None, None, None)
        try:
            cfg = self.storage_config
            access_key = _cfg(cfg, "access_key_id", "MINIO_ACCESS_KEY_ID")
            secret_key = _cfg(cfg, "secret_access_key", "MINIO_SECRET_ACCESS_KEY")
            bucket_name = _cfg(cfg, "bucket_name", "MINIO_BUCKET_NAME")
            # Prefix without surrounding whitespace or slashes
            path_prefix = _cfg(cfg, "path_prefix", "MINIO_PATH_PREFIX").strip().strip("/")
            endpoint = _cfg(cfg, "endpoint", "MINIO_ENDPOINT")
            use_ssl = os.environ.get("MINIO_USE_SSL", "").lower() in ("1", "true", "yes")

            if not (endpoint and access_key and secret_key and bucket_name):
                logger.error("Incomplete MinIO configuration")
                return unavailable

            client = Minio(
                endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl
            )
            if not client.bucket_exists(bucket_name):
                client.make_bucket(bucket_name)
                # Anonymous read-only access to the freshly created bucket
                policy = (
                    "{"
                    '"Version":"2012-10-17",'
                    '"Statement":['
                    '{"Effect":"Allow","Principal":{"AWS":["*"]},'
                    '"Action":["s3:GetBucketLocation","s3:ListBucket"],'
                    '"Resource":["arn:aws:s3:::%s"]},'
                    '{"Effect":"Allow","Principal":{"AWS":["*"]},'
                    '"Action":["s3:GetObject"],'
                    '"Resource":["arn:aws:s3:::%s/*"]}'
                    "]}" % (bucket_name, bucket_name)
                )
                client.set_bucket_policy(bucket_name, policy)
            return client, bucket_name, use_ssl, endpoint, path_prefix
        except Exception as e:
            logger.error("Failed to initialize MinIO client: %s", e)
            return unavailable

    def _get_download_url(self, object_key: str):
        """Return the URL of ``object_key``.

        ``MINIO_PUBLIC_ENDPOINT`` takes precedence when set; otherwise the
        URL is built from the configured endpoint and SSL flag.
        """
        public_endpoint = os.environ.get("MINIO_PUBLIC_ENDPOINT", "")
        if not public_endpoint:
            scheme = "https" if self.use_ssl else "http"
            return f"{scheme}://{self.endpoint}/{self.bucket_name}/{object_key}"
        return f"{public_endpoint}/{self.bucket_name}/{object_key}"

    def upload_file(self, file_path: str) -> str:
        """Upload a local file; return its URL or ``""`` on failure."""
        if not self.client:
            return ""
        try:
            suffix = os.path.splitext(os.path.basename(file_path))[1]
            object_key = f"images/{uuid.uuid4().hex}{suffix}"
            if self.path_prefix:
                object_key = f"{self.path_prefix}/{object_key}"
            with open(file_path, "rb") as file_data:
                file_size = os.path.getsize(file_path)
                self.client.put_object(
                    bucket_name=self.bucket_name or "",
                    object_name=object_key,
                    data=file_data,
                    length=file_size,
                    content_type="application/octet-stream",
                )
            file_url = self._get_download_url(object_key)
            logger.info("MinIO upload_file ok: %s", file_url)
            return file_url
        except Exception as e:
            logger.error("MinIO upload_file failed: %s", e)
            return ""

    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Upload in-memory data; return its URL or ``""`` on failure."""
        if not self.client:
            return ""
        try:
            object_key = f"images/{uuid.uuid4().hex}{file_ext}"
            if self.path_prefix:
                object_key = f"{self.path_prefix}/{object_key}"
            self.client.put_object(
                self.bucket_name or "",
                object_key,
                data=io.BytesIO(content),
                length=len(content),
                content_type="application/octet-stream",
            )
            file_url = self._get_download_url(object_key)
            logger.info("MinIO upload_bytes ok: %s", file_url)
            return file_url
        except Exception as e:
            logger.error("MinIO upload_bytes failed: %s", e)
            traceback.print_exc()
            return ""
class LocalStorage(Storage):
    """Local file system storage implementation.

    Files are written below ``base_dir`` and reported as URL paths built
    from ``url_prefix`` (e.g. /files/images/uuid.jpg) so that another
    service can serve them.
    """

    def __init__(self, storage_config: Optional[Dict] = None):
        """Resolve directories from ``storage_config`` / environment.

        ``base_dir`` falls back to ``LOCAL_STORAGE_BASE_DIR`` (default
        ``/data/files``) and ``url_prefix`` to ``LOCAL_STORAGE_URL_PREFIX``
        (default ``/files``). The image directory is created if missing.
        """
        settings = storage_config or {}
        self.base_dir = settings.get("base_dir") or os.environ.get(
            "LOCAL_STORAGE_BASE_DIR", "/data/files"
        )

        # Images live in <base_dir>/[<path_prefix>/]images
        path_prefix = (settings.get("path_prefix") or "").strip().strip("/")
        segments = [self.base_dir]
        if path_prefix:
            segments.append(path_prefix)
        segments.append("images")
        self.image_dir = os.path.join(*segments)

        self.url_prefix = settings.get("url_prefix") or os.environ.get(
            "LOCAL_STORAGE_URL_PREFIX", "/files"
        )
        os.makedirs(self.image_dir, exist_ok=True)

    def _to_url(self, fpath: str) -> str:
        """Map a file path below ``base_dir`` to its URL path.

        Returns ``fpath`` unchanged when no URL prefix is configured.
        """
        if not self.url_prefix:
            return fpath
        rel = os.path.relpath(fpath, self.base_dir)
        return f"{self.url_prefix}/{rel}"

    def upload_file(self, file_path: str) -> str:
        """Return ``file_path`` unchanged; the file is already local."""
        return file_path

    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Write ``content`` to a new uniquely named file and return its URL."""
        target = os.path.join(self.image_dir, f"{uuid.uuid4()}{file_ext}")
        with open(target, "wb") as out_file:
            out_file.write(content)
        url = self._to_url(target)
        logger.info("Local storage saved: %s -> %s", target, url)
        return url
class Base64Storage(Storage):
    """Storage that keeps nothing externally and returns data URIs instead."""

    def upload_file(self, file_path: str) -> str:
        """Return ``file_path`` unchanged; nothing is uploaded."""
        return file_path

    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Return ``content`` as a ``data:image/<ext>;base64,...`` URI.

        The extension is used without its leading dot as the image subtype.
        The payload comes from ``endecode.decode_image`` (presumably the
        base64 text of ``content``; confirm in docreader.utils).
        """
        file_ext = file_ext.lstrip(".")
        return f"data:image/{file_ext};base64,{endecode.decode_image(content)}"
class DummyStorage(Storage):
    """Dummy storage — all uploads return empty string."""

    def upload_file(self, file_path: str) -> str:
        """Ignore ``file_path`` and return an empty string."""
        return ""

    def upload_bytes(self, content: bytes, file_ext: str = ".png") -> str:
        """Ignore the data and return an empty string."""
        return ""
def create_storage(storage_config: Optional[Dict[str, str]] = None) -> Storage:
    """Create the storage backend selected by configuration.

    The backend name is taken from the ``provider`` entry of
    ``storage_config`` (ignoring empty and "unspecified" values), then from
    the ``STORAGE_TYPE`` environment variable, and finally defaults to
    ``local``. Known names are ``minio``, ``cos``, ``local`` and ``base64``;
    any other name yields a DummyStorage.

    Args:
        storage_config: Optional settings, also passed on to the backend.

    Returns:
        The storage instance.
    """
    storage_type = ""
    if storage_config:
        provider = str(storage_config.get("provider", "")).lower().strip()
        if provider and provider not in ("unspecified", "storage_provider_unspecified"):
            storage_type = provider
    if not storage_type:
        storage_type = os.environ.get("STORAGE_TYPE", "local").lower().strip()
    logger.info("Creating %s storage instance", storage_type)

    # Backend name -> zero-argument factory
    factories = {
        "minio": lambda: MinioStorage(storage_config),
        "cos": lambda: CosStorage(storage_config),
        "local": lambda: LocalStorage(storage_config),
        "base64": Base64Storage,
    }
    make_storage = factories.get(storage_type, DummyStorage)
    return make_storage()

View File

@@ -0,0 +1,141 @@
import asyncio
import logging
from playwright.async_api import async_playwright
from trafilatura import extract
from docreader.config import CONFIG
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
from docreader.parser.chain_parser import PipelineParser
from docreader.parser.markdown_parser import MarkdownParser
from docreader.utils import endecode
logger = logging.getLogger(__name__)
class StdWebParser(BaseParser):
"""Standard web page parser using Playwright and Trafilatura.
This parser scrapes web pages using Playwright's WebKit browser and extracts
clean content using Trafilatura library. It supports proxy configuration and
converts HTML content to markdown format.
"""
def __init__(self, title: str, **kwargs):
    """Initialize the web parser.

    Args:
        title: Title of the web page; stored on the instance and also
            passed to BaseParser as ``file_name``.
        **kwargs: Additional arguments passed to BaseParser.
    """
    self.title = title
    # Proxy setting is read from the global config (may be empty/None)
    self.proxy = CONFIG.external_https_proxy
    super().__init__(file_name=title, **kwargs)
    logger.info(f"Initialized WebParser with title: {title}")
async def scrape(self, url: str) -> str:
"""Scrape web page content using Playwright.
Args:
url: The URL of the web page to scrape
Returns:
HTML content of the web page as string, empty string on error
"""
logger.info(f"Starting web page scraping for URL: {url}")
try:
async with async_playwright() as p:
kwargs = {}
# Configure proxy if available
if self.proxy:
kwargs["proxy"] = {"server": self.proxy}
logger.info("Launching WebKit browser")
browser = await p.webkit.launch(**kwargs)
page = await browser.new_page()
logger.info(f"Navigating to URL: {url}")
try:
# Navigate to URL with 30 second timeout
await page.goto(url, timeout=30000)
logger.info("Initial page load complete")
except Exception as e:
logger.error(f"Error navigating to URL: {str(e)}")
await browser.close()
return ""
logger.info("Retrieving page HTML content")
# Get the full HTML content of the page
content = await page.content()
logger.info(f"Retrieved {len(content)} bytes of HTML content")
await browser.close()
logger.info("Browser closed")
# Return raw HTML content for further processing
logger.info("Successfully retrieved HTML content")
return content
except Exception as e:
logger.error(f"Failed to scrape web page: {str(e)}")
# Return empty string on error
return ""
def parse_into_text(self, content: bytes) -> Document:
"""Parse web page content into a Document object.
Args:
content: URL encoded as bytes
Returns:
Document object containing the parsed markdown content
"""
# Decode bytes to get the URL string
url = endecode.decode_bytes(content)
logger.info(f"Scraping web page: {url}")
# Run async scraping in sync context
chtml = asyncio.run(self.scrape(url))
# Extract clean content from HTML using Trafilatura
# Convert to markdown format with metadata, images, tables, and links
md_text = extract(
chtml,
output_format="markdown",
with_metadata=True,
include_images=True,
include_tables=True,
include_links=True,
)
if not md_text:
logger.error("Failed to parse web page")
return Document(content=f"Error parsing web page: {url}")
return Document(content=md_text)
class WebParser(PipelineParser):
    """Pipeline parser for web pages.

    Combines two stages: ``StdWebParser`` scrapes the page and converts its
    HTML to markdown, then ``MarkdownParser`` processes that markdown. The
    stages are listed in ``_parser_cls`` in the order they are applied.
    """

    # Stage classes, in execution order.
    _parser_cls = (StdWebParser, MarkdownParser)
if __name__ == "__main__":
    # Manual smoke test: scrape one page and dump the markdown to disk.
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)

    # Example URL to scrape
    url = "https://cloud.tencent.com/document/product/457/6759"

    # Create parser instance and parse the web page
    parser = WebParser(title="")
    cc = parser.parse_into_text(url.encode())

    # Write with an explicit encoding: the scraped page may contain non-ASCII
    # text, and the platform default encoding may not be able to represent it.
    with open("./tencent.md", "w", encoding="utf-8") as f:
        f.write(cc.content)

16
ai-core/requirements.txt Normal file
View File

@@ -0,0 +1,16 @@
# AI-Core Document Parser
# gRPC 框架
grpcio>=1.60.0
grpcio-tools>=1.60.0
grpcio-reflection>=1.60.0
protobuf>=4.25.0
# HTTP 请求
requests>=2.31.0
# 配置文件解析
pyyaml>=6.0
# 文档解析
markitdown[pdf,docx,pptx,xlsx,all]>=0.0.1

View File

@@ -0,0 +1,208 @@
"""
gRPC Server for Document Parser
"""
import logging
import requests
from concurrent import futures
import grpc
from grpc_reflection.v1alpha import reflection
import sys
import os
import io
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "proto"))
from parser import Parser
logger = logging.getLogger(__name__)
# Import the modules generated from the proto definitions. They may be absent
# until generate_grpc.py has been run, so record whether they could be loaded.
try:
    import document_parser_pb2
    import document_parser_pb2_grpc
except ImportError:
    PROTO_AVAILABLE = False
    logger.warning("Proto files not found, please run: python generate_grpc.py")
else:
    PROTO_AVAILABLE = True
class DocumentParserServicer:
    """gRPC service implementation for the document parser.

    ``ParseDocument`` downloads the requested file over HTTP, passes the bytes
    to ``Parser.parse_file`` and wraps the outcome in a ``ParseResponse``.
    ``GetSupportedFormats`` and ``GetEngines`` return static capability lists.
    """

    def __init__(self, max_workers: int = 10):
        """Create the servicer.

        Args:
            max_workers: Stored on the instance as ``max_workers``.
        """
        self.parser = Parser()
        self.max_workers = max_workers
        logger.info("DocumentParserServicer initialized")

    @staticmethod
    def _file_type_from_name(file_name):
        """Return the extension of ``file_name`` lower-cased, without the dot."""
        # Lower-casing makes "Report.PDF" resolve to the same type as
        # "report.pdf"; a name without an extension yields "".
        return os.path.splitext(file_name)[1][1:].lower()

    @staticmethod
    def _failure(message):
        """Build a failed ``ParseResponse`` carrying ``message``."""
        return document_parser_pb2.ParseResponse(
            success=False,
            content="",
            message=message,
            content_length=0,
        )

    @staticmethod
    def _extract_vlm_config(request):
        """Return the request's VLM settings as a dict, or None when not enabled."""
        vlm_cfg = getattr(request, "vlm_config", None)
        if not vlm_cfg or not vlm_cfg.enabled:
            return None
        vlm_config = {
            "enabled": vlm_cfg.enabled,
            "provider": vlm_cfg.provider,
            "model": vlm_cfg.model,
            "api_key": vlm_cfg.api_key,
            "base_url": vlm_cfg.base_url,
            "prompt": vlm_cfg.prompt,
        }
        # Only provider and model are logged; the API key stays out of the log.
        logger.info(
            "VLM config: provider=%s, model=%s", vlm_cfg.provider, vlm_cfg.model
        )
        return vlm_config

    @staticmethod
    def _download(file_url):
        """Fetch ``file_url`` and return the body bytes.

        Raises:
            requests.RequestException: On connection errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.get(
            file_url,
            timeout=60,
            headers={"User-Agent": "DocParser/1.0"},
        )
        response.raise_for_status()
        return response.content

    def ParseDocument(self, request, context):
        """Download the file named in ``request`` and parse it.

        Args:
            request: Parse request; ``file_url`` and ``file_name`` are
                required, ``vlm_config`` is optional.
            context: gRPC servicer context (unused).

        Returns:
            A ``ParseResponse``. ``success`` is False with an explanatory
            ``message`` when validation, download or parsing fails, or when
            the parser produces no content. If the generated proto modules
            are missing, a plain dict is returned instead.
        """
        if not PROTO_AVAILABLE:
            return {"success": False, "message": "Proto not available"}
        try:
            logger.info(
                "ParseDocument request: file_url=%s, file_name=%s",
                request.file_url,
                request.file_name,
            )
            file_url = request.file_url
            file_name = request.file_name
            if not file_url:
                return self._failure("file_url is required")
            if not file_name:
                return self._failure("file_name is required")

            vlm_config = self._extract_vlm_config(request)

            # Download the file.
            logger.info("Downloading file from URL: %s", file_url)
            try:
                content = self._download(file_url)
            except requests.RequestException as e:
                logger.error("Failed to download file: %s", str(e))
                return self._failure(f"Failed to download file: {str(e)}")
            logger.info("Downloaded %d bytes", len(content))

            # Parse the downloaded bytes.
            logger.info("Parsing file")
            file_type = self._file_type_from_name(file_name)
            result = self.parser.parse_file(
                file_name=file_name,
                file_type=file_type,
                content=content,
                vlm_config=vlm_config,
            )
            if not result.content:
                return self._failure("Parse failed or empty content")

            markdown_content = result.content
            logger.info("Parse successful: content_length=%d", len(markdown_content))
            return document_parser_pb2.ParseResponse(
                success=True,
                content=markdown_content,
                message="Parse successful",
                content_length=len(markdown_content),
                # Names without an extension are reported as "auto".
                file_type=file_type or "auto",
                parser_engine="markitdown",
            )
        except Exception as e:
            # Service boundary: report any unexpected failure in the response
            # instead of letting it escape the handler.
            logger.error("ParseDocument error: %s", str(e), exc_info=True)
            return self._failure(f"Parse error: {str(e)}")

    def GetSupportedFormats(self, request, context):
        """Return the list of supported file types.

        Returns:
            A ``SupportedFormatsResponse``, or None when the proto modules
            are unavailable or building the response fails.
        """
        if not PROTO_AVAILABLE:
            return None
        try:
            file_types = [
                "pdf", "docx", "doc", "pptx", "ppt",
                "xlsx", "xls", "csv",
                "md", "markdown",
                "jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp",
                "html", "htm", "txt",
            ]
            return document_parser_pb2.SupportedFormatsResponse(
                file_types=file_types,
            )
        except Exception as e:
            logger.error("GetSupportedFormats error: %s", str(e))
            return None

    def GetEngines(self, request, context):
        """Return the available parsing engines.

        Returns:
            An ``EnginesResponse`` describing the single ``markitdown``
            engine, or None when the proto modules are unavailable or
            building the response fails.
        """
        if not PROTO_AVAILABLE:
            return None
        try:
            engines = [
                document_parser_pb2.EngineInfo(
                    name="markitdown",
                    description="MarkItDown parser - supports various document formats",
                    supported_file_types=["pdf", "docx", "pptx", "xlsx", "md", "html", "txt"],
                    available=True,
                )
            ]
            return document_parser_pb2.EnginesResponse(engines=engines)
        except Exception as e:
            logger.error("GetEngines error: %s", str(e))
            return None
def serve(port: int = 50051, max_workers: int = 10):
    """Start the DocumentParser gRPC server and block until it terminates.

    Logs an error and returns immediately when the generated proto modules
    could not be imported.

    Args:
        port: TCP port to listen on; bound on all interfaces without TLS.
        max_workers: Size of the server's thread pool; also passed to the
            servicer.
    """
    if not PROTO_AVAILABLE:
        logger.error("Proto files not available, cannot start server")
        return

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    servicer = DocumentParserServicer(max_workers=max_workers)

    # Register the service implementation.
    document_parser_pb2_grpc.add_DocumentParserServicer_to_server(
        servicer, server
    )

    # Enable reflection. The reflection service works with fully-qualified
    # service *names* (strings), not descriptor objects, and is itself listed
    # so that clients can discover it.
    service_names = (
        document_parser_pb2.DESCRIPTOR.services_by_name['DocumentParser'].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(service_names, server)

    server.add_insecure_port(f"0.0.0.0:{port}")
    server.start()
    logger.info(f"DocumentParser gRPC server started on port {port}")
    logger.info("gRPC reflection enabled")
    server.wait_for_termination()