Files
X-Agents/ai-core/parser/parser_simple.py

276 lines
10 KiB
Python
Raw Normal View History

"""
简化的 Parser - 使用 markitdown + VLM
"""
import logging
import os
import io
import re
import base64
from typing import Optional, Any, Dict
from markitdown import MarkItDown
# Module-level logger, named after this module, shared by all classes below.
logger = logging.getLogger(__name__)
class Document:
    """Simple container for a parsed document.

    Attributes:
        content: Text content of the document.
        chunks: List of chunks; a new empty list when none is supplied.
        metadata: Metadata dictionary; a new empty dict when none is supplied.
    """

    def __init__(
        self,
        content: str = "",
        chunks: Optional[list] = None,
        metadata: Optional[dict] = None,
    ):
        """Store the given values, creating fresh containers for omitted ones.

        Args:
            content: Text content of the document.
            chunks: Optional list of chunks.
            metadata: Optional metadata dictionary.
        """
        self.content = content
        # Compare against None rather than relying on truthiness: an empty
        # list/dict passed by the caller must be kept (so later mutations by
        # the caller stay visible), not silently replaced by a new object.
        self.chunks = [] if chunks is None else chunks
        self.metadata = {} if metadata is None else metadata
class VLMClient:
    """HTTP client that asks a vision-language model (VLM) to transcribe a file.

    The provider is selected by ``config["provider"]``; ``openai``,
    ``anthropic`` and ``qwen`` are supported. Every public call returns a dict:
    ``{"success": True, "content": <text>}`` on success or
    ``{"success": False, "error": <message>}`` on failure. Errors are logged
    and reported through that dict rather than raised.
    """

    # Default API roots, used when the configuration has no ``base_url``.
    _OPENAI_BASE_URL = "https://api.openai.com/v1"
    _ANTHROPIC_BASE_URL = "https://api.anthropic.com/v1"
    _QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

    _MAX_TOKENS = 4096
    _TIMEOUT_SECONDS = 120

    def __init__(self, config: Dict[str, Any]):
        """Read provider settings from *config*.

        Args:
            config: Mapping with the optional keys ``provider`` (default
                ``"openai"``), ``model`` (default ``"gpt-4o"``), ``api_key``,
                ``base_url`` and ``prompt``. An empty or missing ``prompt``
                selects the built-in default prompt.
        """
        self.provider = config.get("provider", "openai")
        self.model = config.get("model", "gpt-4o")
        self.api_key = config.get("api_key", "")
        self.base_url = config.get("base_url", "")
        self.prompt = config.get("prompt", "") or self._default_prompt()
        logger.info(f"VLMClient initialized: provider={self.provider}, model={self.model}")

    def _default_prompt(self) -> str:
        """Return the built-in prompt (in Chinese) asking for a Markdown transcription."""
        return (
            "请分析这个文档图片的内容,并将其转换为 Markdown 格式。\n"
            "要求\n"
            "1. 保持原文的格式和结构\n"
            "2. 表格用 Markdown 表格格式\n"
            "3. 标题用 # ## ### 标记\n"
            "4. 尽量保留原文的所有信息"
        )

    def analyze_image(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Send *content* to the configured provider and return its answer.

        Args:
            content: Raw bytes of the file to analyse.
            mime_type: MIME type describing *content* (e.g. ``image/png``).

        Returns:
            A result dict as described on the class; an unknown provider
            yields a failure dict.
        """
        handlers = {
            "openai": self._call_openai,
            "anthropic": self._call_anthropic,
            "qwen": self._call_qwen,
        }
        handler = handlers.get(self.provider)
        if handler is None:
            return {"success": False, "error": f"Unknown provider: {self.provider}"}
        return handler(content, mime_type)

    def _endpoint(self, default_base: str, path: str) -> str:
        """Join the configured (or default) API root with *path*.

        Trailing slashes on the root are removed so that a ``base_url`` such
        as ``https://host/v1/`` does not produce ``https://host/v1//...``.
        """
        base = (self.base_url or default_base).rstrip("/")
        return base + path

    def _post_json(self, url: str, headers: Dict[str, str], payload: Dict[str, Any]) -> Any:
        """POST *payload* as JSON and return the decoded JSON response.

        Raises whatever ``requests`` raises (including HTTP status errors);
        callers convert those into failure dicts.
        """
        # Imported lazily so the module can be imported without ``requests``;
        # an ImportError surfaces here and is handled by the caller.
        import requests

        resp = requests.post(url, headers=headers, json=payload, timeout=self._TIMEOUT_SECONDS)
        resp.raise_for_status()
        return resp.json()

    def _call_chat_completions(
        self,
        content: bytes,
        mime_type: str,
        default_base: str,
        label: str,
        max_tokens: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Call an OpenAI-style ``/chat/completions`` endpoint.

        Shared by the OpenAI and Qwen providers, which use the same request
        and response shape. ``max_tokens`` is only sent when it is given.
        *label* is the provider name used in the error log message.
        """
        try:
            image_b64 = base64.b64encode(content).decode("utf-8")
            # NOTE(review): the payload is sent as an ``image_url`` data URI
            # whatever the MIME type is; confirm the provider accepts
            # non-image types such as application/pdf in this form.
            payload: Dict[str, Any] = {
                "model": self.model,
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self.prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}},
                    ],
                }],
            }
            if max_tokens is not None:
                payload["max_tokens"] = max_tokens
            headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
            result = self._post_json(self._endpoint(default_base, "/chat/completions"), headers, payload)
            return {"success": True, "content": result["choices"][0]["message"]["content"]}
        except Exception as e:
            logger.error(f"{label} VLM error: {e}")
            return {"success": False, "error": str(e)}

    def _call_openai(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse *content* through the OpenAI chat-completions API."""
        return self._call_chat_completions(
            content, mime_type, self._OPENAI_BASE_URL, "OpenAI", max_tokens=self._MAX_TOKENS
        )

    @staticmethod
    def _anthropic_media_block(mime_type: str, data_b64: str) -> Dict[str, Any]:
        """Build the Anthropic content block carrying the base64 payload.

        PDFs are sent as a ``document`` block; everything else is sent as an
        ``image`` block.
        """
        block_type = "document" if mime_type == "application/pdf" else "image"
        return {
            "type": block_type,
            "source": {"type": "base64", "media_type": mime_type, "data": data_b64},
        }

    def _call_anthropic(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse *content* through the Anthropic Messages API."""
        try:
            image_b64 = base64.b64encode(content).decode("utf-8")
            headers = {
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            }
            payload = {
                "model": self.model,
                "max_tokens": self._MAX_TOKENS,
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self.prompt},
                        self._anthropic_media_block(mime_type, image_b64),
                    ],
                }],
            }
            result = self._post_json(self._endpoint(self._ANTHROPIC_BASE_URL, "/messages"), headers, payload)
            return {"success": True, "content": result["content"][0]["text"]}
        except Exception as e:
            logger.error(f"Anthropic VLM error: {e}")
            return {"success": False, "error": str(e)}

    def _call_qwen(self, content: bytes, mime_type: str) -> Dict[str, Any]:
        """Analyse *content* through DashScope's OpenAI-compatible endpoint."""
        # No ``max_tokens`` is sent for Qwen, matching the request this
        # provider has always received.
        return self._call_chat_completions(content, mime_type, self._QWEN_BASE_URL, "Qwen")
class Parser:
    """Document parser built on MarkItDown, optionally assisted by a VLM.

    When a VLM client is configured, image and PDF files are sent to the VLM
    directly; all other files are converted with MarkItDown, and base64 images
    embedded in the resulting Markdown are replaced by the VLM's transcription.
    """

    # File extensions handed to the VLM directly, with the MIME type to send.
    _VLM_MIME_TYPES = {
        '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
        '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp',
        '.tiff': 'image/tiff', '.pdf': 'application/pdf',
    }

    # Markdown image whose target is a base64 ``data:image/...`` URI. The
    # ``mime`` group captures the full type (e.g. ``image/png``), not just the
    # subtype, because that is what the VLM client expects.
    _DATA_URI_IMAGE = re.compile(
        r'!\[(?P<alt>[^\]]*)\]\(data:(?P<mime>image/[^;]+);base64,(?P<data>[A-Za-z0-9+/=]+)\)'
    )

    def __init__(self):
        """Create the MarkItDown converter; no VLM client is configured yet."""
        self.markitdown = MarkItDown()
        self.vlm_client: Optional[VLMClient] = None
        logger.info("Parser initialized with MarkItDown")

    def set_vlm_config(self, config: Dict[str, Any]) -> None:
        """Install or remove the VLM client according to *config*.

        A client is created only when *config* is truthy and has truthy
        ``enabled`` and ``api_key`` entries; otherwise any existing client is
        removed.
        """
        if config and config.get("enabled") and config.get("api_key"):
            self.vlm_client = VLMClient(config)
            logger.info(f"VLM enabled: provider={config.get('provider')}, model={config.get('model')}")
        else:
            self.vlm_client = None

    def _should_use_vlm(self, file_name: str) -> bool:
        """Return True when *file_name* should be parsed by the VLM directly.

        That is the case when a VLM client is configured and the file
        extension (case-insensitive) is an image type or ``.pdf``.
        """
        if not self.vlm_client:
            return False
        ext = os.path.splitext(file_name)[1].lower()
        return ext in self._VLM_MIME_TYPES

    def _process_images_with_vlm(self, content: str) -> str:
        """Replace base64 images embedded in Markdown with VLM transcriptions.

        Each match is decoded and sent to the VLM. On success the image is
        replaced by the returned text wrapped in HTML comments; on any failure
        the original image markup is kept unchanged.
        """
        if not self.vlm_client:
            return content

        def replace_image(match):
            alt_text = match.group("alt")
            mime_type = match.group("mime")
            try:
                image_bytes = base64.b64decode(match.group("data"))
                logger.info(f"Processing image with VLM: {alt_text or 'unnamed'}")
                vlm_result = self.vlm_client.analyze_image(image_bytes, mime_type)
                if vlm_result.get("success"):
                    vlm_content = vlm_result.get("content", "")
                    logger.info(f"VLM processed image, content length: {len(vlm_content)}")
                    return f"<!-- Image: {alt_text} -->\n{vlm_content}\n<!-- End Image -->"
                logger.warning(f"VLM failed: {vlm_result.get('error')}")
                return match.group(0)
            except Exception as e:
                logger.error(f"VLM error: {e}")
                return match.group(0)

        return self._DATA_URI_IMAGE.sub(replace_image, content)

    def _parse_with_vlm(self, content: bytes, file_name: str) -> "Document":
        """Send the whole file to the VLM and wrap the answer in a Document.

        Returns a Document with ``metadata={"vlm": True}`` on success and an
        empty Document when the VLM reports a failure.
        """
        ext = os.path.splitext(file_name)[1].lower()
        mime_type = self._VLM_MIME_TYPES.get(ext, 'image/png')
        result = self.vlm_client.analyze_image(content, mime_type)
        if result.get("success"):
            return Document(content=result["content"], metadata={"vlm": True})
        logger.error(f"VLM failed: {result.get('error')}")
        return Document(content="")

    @staticmethod
    def _normalize_extension(file_type: Optional[str], file_name: Optional[str]) -> Optional[str]:
        """Return a dot-prefixed extension for MarkItDown, or None if unknown.

        *file_type* is preferred (a leading dot is added when missing). When it
        is empty or None, the lower-cased extension of *file_name* is used.
        """
        ext = (file_type or "").strip()
        if ext:
            return ext if ext.startswith('.') else '.' + ext
        return os.path.splitext(file_name or "")[1].lower() or None

    def parse_file(
        self,
        file_name: str,
        file_type: str,
        content: bytes,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
        vlm_config: Optional[dict[str, Any]] = None,
    ) -> "Document":
        """Parse file bytes into a Document.

        Args:
            file_name: Name of the file; its extension decides whether the
                VLM is used directly.
            file_type: File extension, with or without a leading dot. When
                empty, the extension of *file_name* is used instead.
            content: Raw file bytes.
            parser_engine: Accepted for interface compatibility; not used here.
            engine_overrides: Accepted for interface compatibility; not used here.
            vlm_config: Optional VLM settings. When given (even if disabled)
                it replaces the current VLM configuration; when None the
                current configuration is left untouched.

        Returns:
            The parsed Document; its content is empty when parsing fails.
        """
        logger.info(f"Parsing file: {file_name}, type: {file_type}, vlm_config={'enabled' if vlm_config and vlm_config.get('enabled') else 'none'}")
        # Apply any explicitly supplied configuration, including a disabled
        # one, so a client enabled by an earlier call is not reused by mistake.
        if vlm_config is not None:
            self.set_vlm_config(vlm_config)
        if self._should_use_vlm(file_name):
            logger.info(f"Using VLM for {file_name}")
            document = self._parse_with_vlm(content, file_name)
            if document.content:
                return document
            # The VLM failed or returned nothing: fall back to MarkItDown
            # instead of handing back an empty document.
            logger.warning(f"VLM returned no content for {file_name}, falling back to MarkItDown")
        try:
            result = self.markitdown.convert(
                io.BytesIO(content),
                file_extension=self._normalize_extension(file_type, file_name),
                keep_data_uris=True,
            )
            markdown_content = result.text_content or ""
            # With a VLM available, transcribe images embedded as data URIs.
            if self.vlm_client and markdown_content:
                markdown_content = self._process_images_with_vlm(markdown_content)
            return Document(
                content=markdown_content,
                metadata=getattr(result, 'metadata', None) or {},
            )
        except Exception as e:
            logger.error(f"Parse error: {e}")
            return Document(content="")

    def parse_url(
        self,
        url: str,
        title: str,
        parser_engine: Optional[str] = None,
        engine_overrides: Optional[dict[str, Any]] = None,
    ) -> "Document":
        """Fetch and convert *url* with MarkItDown.

        *title* is only used for logging; ``parser_engine`` and
        ``engine_overrides`` are accepted for interface compatibility and not
        used here. Returns an empty Document when conversion fails.
        """
        logger.info(f"Parsing URL: {url}, title: {title}")
        try:
            result = self.markitdown.convert(url)
            return Document(content=result.text_content or "")
        except Exception as e:
            logger.error(f"URL parse error: {e}")
            return Document(content="")
# Exports: the names provided by a star-import of this module.
__all__ = ["Parser", "Document"]