""" 简化的 Parser - 使用 markitdown + VLM """ import logging import os import io import re import base64 from typing import Optional, Any, Dict from markitdown import MarkItDown logger = logging.getLogger(__name__) class Document: """简单的文档对象""" def __init__(self, content: str = "", chunks: list = None, metadata: dict = None): self.content = content self.chunks = chunks or [] self.metadata = metadata or {} class VLMClient: """VLM 客户端""" def __init__(self, config: Dict[str, Any]): self.provider = config.get("provider", "openai") self.model = config.get("model", "gpt-4o") self.api_key = config.get("api_key", "") self.base_url = config.get("base_url", "") self.prompt = config.get("prompt", "") or self._default_prompt() logger.info(f"VLMClient initialized: provider={self.provider}, model={self.model}") def _default_prompt(self) -> str: return """请分析这个文档图片的内容,并将其转换为 Markdown 格式。 要求: 1. 保持原文的格式和结构 2. 表格用 Markdown 表格格式 3. 标题用 # ## ### 标记 4. 尽量保留原文的所有信息""" def analyze_image(self, content: bytes, mime_type: str) -> Dict[str, Any]: """分析图片""" if self.provider == "openai": return self._call_openai(content, mime_type) elif self.provider == "anthropic": return self._call_anthropic(content, mime_type) elif self.provider == "qwen": return self._call_qwen(content, mime_type) else: return {"success": False, "error": f"Unknown provider: {self.provider}"} def _call_openai(self, content: bytes, mime_type: str) -> Dict[str, Any]: try: import requests url = (self.base_url or "https://api.openai.com/v1") + "/chat/completions" image_b64 = base64.b64encode(content).decode("utf-8") headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} payload = { "model": self.model, "messages": [{ "role": "user", "content": [ {"type": "text", "text": self.prompt}, {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}} ] }], "max_tokens": 4096 } resp = requests.post(url, headers=headers, json=payload, timeout=120) resp.raise_for_status() result = resp.json() return {"success": True, "content": result["choices"][0]["message"]["content"]} except Exception as e: logger.error(f"OpenAI VLM error: {e}") return {"success": False, "error": str(e)} def _call_anthropic(self, content: bytes, mime_type: str) -> Dict[str, Any]: try: import requests url = (self.base_url or "https://api.anthropic.com/v1") + "/messages" image_b64 = base64.b64encode(content).decode("utf-8") headers = { "x-api-key": self.api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" } payload = { "model": self.model, "max_tokens": 4096, "messages": [{ "role": "user", "content": [ {"type": "text", "text": self.prompt}, {"type": "image", "source": {"type": "base64", "media_type": mime_type, "data": image_b64}} ] }] } resp = requests.post(url, headers=headers, json=payload, timeout=120) resp.raise_for_status() result = resp.json() return {"success": True, "content": result["content"][0]["text"]} except Exception as e: logger.error(f"Anthropic VLM error: {e}") return {"success": False, "error": str(e)} def _call_qwen(self, content: bytes, mime_type: str) -> Dict[str, Any]: try: import requests url = (self.base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1") + "/chat/completions" image_b64 = base64.b64encode(content).decode("utf-8") headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} payload = { "model": self.model, "messages": [{ "role": "user", "content": [ {"type": "text", "text": self.prompt}, {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}} ] }] } resp = requests.post(url, headers=headers, json=payload, timeout=120) resp.raise_for_status() result = resp.json() return {"success": True, "content": result["choices"][0]["message"]["content"]} except Exception as e: logger.error(f"Qwen VLM error: {e}") return {"success": False, "error": str(e)} class Parser: """基于 MarkItDown + VLM 的文档解析器""" def __init__(self): self.markitdown = MarkItDown() self.vlm_client: Optional[VLMClient] = None logger.info("Parser initialized with MarkItDown") def set_vlm_config(self, config: Dict[str, Any]) -> None: """设置 VLM 配置""" if config and config.get("enabled") and config.get("api_key"): self.vlm_client = VLMClient(config) logger.info(f"VLM enabled: provider={config.get('provider')}, model={config.get('model')}") else: self.vlm_client = None def _should_use_vlm(self, file_name: str) -> bool: """判断是否应该使用 VLM""" if not self.vlm_client: return False ext = os.path.splitext(file_name)[1].lower() # 图片和 PDF 都使用 VLM image_exts = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'] return ext in image_exts or ext == '.pdf' def _process_images_with_vlm(self, content: str) -> str: """处理 Markdown 内容中的图片""" # 匹配 data:image 开头的 Base64 图片 pattern = r'!\[([^\]]*)\]\((data:image/([^;]+);base64,([A-Za-z0-9+/=]+))\)' def replace_image(match): alt_text = match.group(1) data_url = match.group(2) mime_type = match.group(3) or "image/png" base64_data = match.group(4) try: image_bytes = base64.b64decode(base64_data) logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}") vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type) if vlm_result.get("success"): vlm_content = vlm_result.get("content", "") logger.info(f"VLM processed image, content length: {len(vlm_content)}") return f"\n{vlm_content}\n" else: logger.warning(f"VLM failed: {vlm_result.get('error')}") return match.group(0) except Exception as e: logger.error(f"VLM error: {e}") return match.group(0) return re.sub(pattern, replace_image, content) def _parse_with_vlm(self, content: bytes, file_name: str) -> Document: """使用 VLM 直接解析整个文件""" ext = os.path.splitext(file_name)[1].lower() mime_types = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp', '.tiff': 'image/tiff', '.pdf': 'application/pdf', } mime_type = mime_types.get(ext, 'image/png') result = self.vlm_client.analyze_image(content, mime_type) if result.get("success"): return Document(content=result["content"], metadata={"vlm": True}) else: logger.error(f"VLM failed: {result.get('error')}") return Document(content="") def parse_file( self, file_name: str, file_type: str, content: bytes, parser_engine: Optional[str] = None, engine_overrides: Optional[dict[str, Any]] = None, vlm_config: Optional[dict[str, Any]] = None, ) -> Document: """解析文件内容""" logger.info(f"Parsing file: {file_name}, type: {file_type}, vlm_config={'enabled' if vlm_config and vlm_config.get('enabled') else 'none'}") # 设置 VLM 配置 if vlm_config and vlm_config.get("enabled"): self.set_vlm_config(vlm_config) # 判断是否使用 VLM 直接解析 if self._should_use_vlm(file_name): logger.info(f"Using VLM for {file_name}") return self._parse_with_vlm(content, file_name) # 使用 MarkItDown 解析 try: ext = file_type if not ext.startswith('.'): ext = '.' + ext result = self.markitdown.convert( io.BytesIO(content), file_extension=ext, keep_data_uris=True ) markdown_content = result.text_content or "" # 如果有 VLM,处理图片 if self.vlm_client and markdown_content: markdown_content = self._process_images_with_vlm(markdown_content) return Document( content=markdown_content, metadata=result.metadata if hasattr(result, 'metadata') else {} ) except Exception as e: logger.error(f"Parse error: {e}") return Document(content="") def parse_url( self, url: str, title: str, parser_engine: Optional[str] = None, engine_overrides: Optional[dict[str, Any]] = None, ) -> Document: """解析 URL""" logger.info(f"Parsing URL: {url}, title: {title}") try: result = self.markitdown.convert(url) return Document(content=result.text_content or "") except Exception as e: logger.error(f"URL parse error: {e}") return Document(content="") # 导出 __all__ = ["Parser", "Document"]