feat: 增强 AI-Core 文档解析器

- 添加 VLM 客户端支持
- 优化解析器配置
- 添加配置示例文件
- 生成新的 gRPC protobuf 文件

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 15:42:35 +08:00
parent ab7131eb05
commit 5012a25f99
10 changed files with 1177 additions and 42 deletions

84
ai-core/parser/config.py Normal file
View File

@@ -0,0 +1,84 @@
"""
配置管理模块
"""
import os
import yaml
import logging
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)

# Built-in defaults. This mapping is treated as read-only: load_config()
# always works on a deep copy, so overrides never leak back into it.
DEFAULT_CONFIG = {
    "vlm": {
        "enabled": False,
        "provider": "openai",
        "model": "gpt-4o",
        "api_key": "",
        "base_url": "",
        "prompt": "",
    },
    "server": {
        "port": 50051,
        "max_workers": 10,
        "log_level": "INFO",
    },
}


def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """Build the effective configuration and return it as a fresh dict.

    Layers are applied in this order, later layers winning:

    1. ``DEFAULT_CONFIG``.
    2. Environment variables ``VLM_API_KEY`` (also switches VLM on),
       ``VLM_PROVIDER`` and ``VLM_MODEL``.
    3. Sections of the YAML file whose top-level key exists in
       ``DEFAULT_CONFIG``.

    NOTE(review): the file is merged after the environment variables, so a
    value in the file overrides the matching environment variable. Confirm
    that this precedence is intended.

    Args:
        config_path: Path of the YAML file. When ``None``, ``config.yaml`` in
            the parent directory of this module's directory is used.

    Returns:
        A new configuration dict. Mutating it does not affect
        ``DEFAULT_CONFIG`` or the result of later calls.
    """
    import copy  # local import keeps this block free of file-level changes

    if config_path is None:
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        config_path = os.path.join(base_dir, "config.yaml")

    # Work on a private copy: writing into DEFAULT_CONFIG would make overrides
    # from one call (or one environment) stick for the life of the process.
    config = copy.deepcopy(DEFAULT_CONFIG)
    vlm = config["vlm"]

    # Environment overrides.
    vlm_api_key = os.environ.get("VLM_API_KEY", "")
    if vlm_api_key:
        vlm["api_key"] = vlm_api_key
        vlm["enabled"] = True
        logger.info("VLM enabled via environment variable")
    vlm_provider = os.environ.get("VLM_PROVIDER", "")
    if vlm_provider:
        vlm["provider"] = vlm_provider
    vlm_model = os.environ.get("VLM_MODEL", "")
    if vlm_model:
        vlm["model"] = vlm_model

    # File overrides. Loading is best-effort: a broken file only produces a
    # warning and the configuration built so far is kept.
    if os.path.exists(config_path):
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                file_config = yaml.safe_load(f)
            if isinstance(file_config, dict):
                for key, section in file_config.items():
                    if key not in config:
                        continue
                    if not isinstance(section, dict):
                        # e.g. "vlm:" with no children parses to None.
                        logger.warning(
                            f"Ignoring config section '{key}': expected a mapping"
                        )
                        continue
                    config[key].update(section)
            elif file_config:
                logger.warning("Ignoring config file: top level is not a mapping")
            logger.info(f"Loaded config from {config_path}")
        except Exception as e:
            logger.warning(f"Failed to load config: {e}")

    # VLM cannot work without credentials, so switch it off explicitly.
    if vlm["enabled"] and not vlm["api_key"]:
        logger.warning("VLM enabled but API key is empty, disabling VLM")
        vlm["enabled"] = False

    return config
def get_vlm_config() -> Optional[Dict[str, Any]]:
    """Return the VLM section of the loaded configuration, if usable.

    Returns:
        The ``"vlm"`` dict when it is enabled and has a non-empty API key,
        otherwise ``None``.
    """
    loaded = load_config()
    vlm_section = loaded.get("vlm", {})
    if not vlm_section.get("enabled"):
        return None
    if not vlm_section.get("api_key"):
        return None
    return loaded["vlm"]
def get_server_config() -> Dict[str, Any]:
    """Return the server section of the loaded configuration.

    Returns:
        The ``"server"`` dict from ``load_config()``; when that key is absent,
        the ``"server"`` entry of ``DEFAULT_CONFIG``.
    """
    loaded = load_config()
    if "server" in loaded:
        return loaded["server"]
    return DEFAULT_CONFIG["server"]

View File

@@ -1,39 +1,68 @@
import logging
import os
import tempfile
from typing import Optional
from typing import Optional, Dict, Any
from markitdown import MarkItDown
from .vlm_client import VLMClient
from .config import get_vlm_config
logger = logging.getLogger(__name__)
class Parser:
"""基于 MarkItDown 的统一文档解析器
"""基于 MarkItDown + VLM 的统一文档解析器
支持格式：PDF、DOCX、DOC、PPTX、PPT、XLSX、XLS、CSV、图片、网页、Markdown 等
VLM 解析:
- 方式一：启动时配置（config.yaml 或环境变量）
- 方式二：gRPC 请求时传入 VLM 配置（优先级更高）
"""
def __init__(self):
self.markitdown = MarkItDown()
logger.info("Parser initialized with MarkItDown")
self.vlm_client: Optional[VLMClient] = None
def parse(self, file_path: str, file_type: Optional[str] = None) -> dict:
# 尝试加载配置的 VLM
vlm_config = get_vlm_config()
if vlm_config:
self.vlm_client = VLMClient(vlm_config)
logger.info(f"VLM enabled: provider={vlm_config.get('provider')}, model={vlm_config.get('model')}")
else:
logger.info("VLM not configured, using MarkItDown only")
def set_vlm_config(self, config: Dict[str, Any]) -> None:
    """Replace this parser's VLM client according to ``config``.

    A client is created only when ``config`` is non-empty, has a truthy
    ``"enabled"`` value and a truthy ``"api_key"``; in every other case the
    current client is dropped.

    Args:
        config: VLM settings passed on to ``VLMClient``.
    """
    if not (config and config.get("enabled") and config.get("api_key")):
        self.vlm_client = None
        logger.info("VLM disabled")
        return

    self.vlm_client = VLMClient(config)
    logger.info(f"VLM enabled: provider={config.get('provider')}, model={config.get('model')}")
def parse(self, file_path: str, file_type: Optional[str] = None, vlm_config: Optional[Dict[str, Any]] = None) -> dict:
"""解析文档为 Markdown
Args:
file_path: 文件路径或 URL
file_type: 文件类型可选MarkItDown 会自动检测)
vlm_config: VLM 配置(可选,优先级高于全局配置)
Returns:
dict: 包含 markdown 内容和元数据
"""
# 如果有 VLM 配置,覆盖全局配置
if vlm_config:
self.set_vlm_config(vlm_config)
try:
logger.info(f"Parsing file: {file_path}")
result = self.markitdown.convert(file_path)
logger.info(f"Parse successful: {len(result.text_content)} characters")
return {
"success": True,
"content": result.text_content,
@@ -49,29 +78,40 @@ class Parser:
"error": str(e)
}
def parse_bytes(self, content: bytes, file_name: str, file_type: Optional[str] = None) -> dict:
def parse_bytes(self, content: bytes, file_name: str, file_type: Optional[str] = None, vlm_config: Optional[Dict[str, Any]] = None) -> dict:
"""解析字节内容为 Markdown
Args:
content: 文件字节内容
file_name: 文件名
file_type: 文件类型(可选)
vlm_config: VLM 配置(可选,优先级高于全局配置)
Returns:
dict: 包含 markdown 内容和元数据
"""
# 如果有 VLM 配置,覆盖全局配置
if vlm_config:
self.set_vlm_config(vlm_config)
try:
logger.info(f"Parsing bytes: {file_name}, size: {len(content)} bytes")
# 检查是否应该使用 VLM根据文件名自动判断
if self._should_use_vlm(file_name):
logger.info("Using VLM for parsing")
return self._parse_with_vlm(content, file_name)
# 否则使用 MarkItDown
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_name)[1] or '') as temp_file:
temp_file.write(content)
temp_path = temp_file.name
try:
result = self.markitdown.convert(temp_path)
logger.info(f"Parse successful: {len(result.text_content)} characters")
return {
"success": True,
"content": result.text_content,
@@ -89,10 +129,69 @@ class Parser:
"error": str(e)
}
def _should_use_vlm(self, file_name: str) -> bool:
"""判断是否应该使用 VLM"""
if not self.vlm_client:
return False
# 图片文件使用 VLM
image_exts = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff']
ext = os.path.splitext(file_name)[1].lower()
return ext in image_exts
def _parse_with_vlm(self, content: bytes, file_name: str) -> dict:
"""使用 VLM 解析"""
if not self.vlm_client:
return {
"success": False,
"content": "",
"content_length": 0,
"error": "VLM not configured"
}
# 确定 MIME 类型
ext = os.path.splitext(file_name)[1].lower()
mime_types = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.webp': 'image/webp',
'.tiff': 'image/tiff',
}
mime_type = mime_types.get(ext, 'image/png')
try:
result = self.vlm_client.analyze_image(content, mime_type)
if result.get("success"):
return {
"success": True,
"content": result["content"],
"content_length": len(result["content"]),
"metadata": {"vlm_used": True}
}
else:
return {
"success": False,
"content": "",
"content_length": 0,
"error": result.get("error", "VLM parsing failed")
}
except Exception as e:
logger.error(f"VLM parsing error: {e}")
return {
"success": False,
"content": "",
"content_length": 0,
"error": str(e)
}
if __name__ == "__main__":
parser = Parser()
# 测试
test_url = "https://example.com"
result = parser.parse(test_url)

View File

@@ -0,0 +1,209 @@
"""
VLM 客户端 - 用于调用 VLM 模型进行文档理解
"""
import logging
import base64
import requests
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)


class VLMClient:
    """HTTP client for vision-language model (VLM) APIs.

    Supported ``provider`` values are ``"openai"``, ``"anthropic"`` and
    ``"qwen"``. Every public call returns a dict and never raises:

    * success: ``{"success": True, "content": <text>, "usage": <dict>}``
    * failure: ``{"success": False, "content": "", "error": <message>}``
    """

    # Seconds to wait for a provider response.
    _TIMEOUT = 120
    # Upper bound on generated tokens for providers that are given one.
    _MAX_TOKENS = 4096

    def __init__(self, config: Dict[str, Any]):
        """Store the connection settings.

        Args:
            config: VLM settings. Reads ``provider`` (default ``"openai"``),
                ``model`` (default ``"gpt-4o"``), ``api_key``, ``base_url``
                and ``prompt``; an empty prompt falls back to the built-in
                default prompt.
        """
        self.config = config
        self.provider = config.get("provider", "openai")
        self.model = config.get("model", "gpt-4o")
        self.api_key = config.get("api_key", "")
        self.base_url = config.get("base_url", "")
        self.prompt = config.get("prompt", "") or self._default_prompt()
        logger.info(f"VLMClient initialized: provider={self.provider}, model={self.model}")

    def _default_prompt(self) -> str:
        """Return the built-in prompt asking for a Markdown transcription."""
        return (
            "请分析这张图片中的文档内容，并将其转换为 Markdown 格式。\n"
            "要求：\n"
            "1. 保持原文的格式和结构\n"
            "2. 表格用 Markdown 表格格式\n"
            "3. 标题用 # ## ### 标记\n"
            "4. 代码块用 ``` 标记\n"
            "5. 尽量保留原文的所有信息"
        )

    def analyze_image(self, image_data: bytes, mime_type: str = "image/png") -> Dict[str, Any]:
        """Ask the configured provider to transcribe an image.

        Args:
            image_data: Raw image bytes.
            mime_type: MIME type sent along with the image.

        Returns:
            A result dict as described on the class. Empty ``image_data`` and
            an unknown provider are reported as failures without any network
            call.
        """
        if not image_data:
            return self._failure("Empty image data")

        handlers = {
            "openai": self._call_openai,
            "anthropic": self._call_anthropic,
            "qwen": self._call_qwen,
        }
        # Guard against unhashable provider values coming from a config file.
        handler = handlers.get(self.provider) if isinstance(self.provider, str) else None
        if handler is None:
            return self._failure(f"Unsupported provider: {self.provider}")
        return handler(image_data, mime_type)

    @staticmethod
    def _failure(message: str) -> Dict[str, Any]:
        """Build the failure result dict for ``message``."""
        return {
            "success": False,
            "content": "",
            "error": message
        }

    def _endpoint(self, default_base: str, path: str) -> str:
        """Join the configured (or default) base URL with an API path.

        Trailing slashes on the base are dropped so that a ``base_url`` such
        as ``https://host/v1/`` does not produce ``https://host/v1//...``.
        """
        return (self.base_url or default_base).rstrip("/") + path

    def _post_json(self, url: str, headers: Dict[str, str], payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON and return the decoded JSON response.

        Raises whatever ``requests`` raises, including for HTTP error status.
        """
        response = requests.post(url, headers=headers, json=payload, timeout=self._TIMEOUT)
        response.raise_for_status()
        return response.json()

    def _chat_completions(self, default_base: str, image_data: bytes, mime_type: str,
                          max_tokens: Optional[int] = None) -> Dict[str, Any]:
        """Call an OpenAI-style ``/chat/completions`` endpoint with one image.

        The image travels as a base64 ``data:`` URL. ``max_tokens`` is added
        to the payload only when given. Errors propagate to the caller.
        """
        image_base64 = base64.b64encode(image_data).decode("utf-8")
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self.prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_base64}"}}
                    ]
                }
            ]
        }
        if max_tokens is not None:
            payload["max_tokens"] = max_tokens

        result = self._post_json(self._endpoint(default_base, "/chat/completions"), headers, payload)
        return {
            "success": True,
            "content": result["choices"][0]["message"]["content"],
            "usage": result.get("usage", {})
        }

    def _call_openai(self, image_data: bytes, mime_type: str) -> Dict[str, Any]:
        """Call the OpenAI chat completions API."""
        try:
            return self._chat_completions(
                "https://api.openai.com/v1", image_data, mime_type, max_tokens=self._MAX_TOKENS
            )
        except Exception as e:
            # Callers rely on a result dict, so every failure is reported, not raised.
            logger.error(f"OpenAI API error: {e}")
            return self._failure(str(e))

    def _call_anthropic(self, image_data: bytes, mime_type: str) -> Dict[str, Any]:
        """Call the Anthropic messages API."""
        try:
            image_base64 = base64.b64encode(image_data).decode("utf-8")
            headers = {
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json"
            }
            # Anthropic takes the image as a dedicated "image" content part.
            payload = {
                "model": self.model,
                "max_tokens": self._MAX_TOKENS,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": self.prompt},
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": mime_type,
                                    "data": image_base64
                                }
                            }
                        ]
                    }
                ]
            }
            url = self._endpoint("https://api.anthropic.com/v1", "/messages")
            result = self._post_json(url, headers, payload)
            return {
                "success": True,
                "content": result["content"][0]["text"],
                "usage": result.get("usage", {})
            }
        except Exception as e:
            logger.error(f"Anthropic API error: {e}")
            return self._failure(str(e))

    def _call_qwen(self, image_data: bytes, mime_type: str) -> Dict[str, Any]:
        """Call the Qwen VL API through its OpenAI-compatible endpoint.

        No ``max_tokens`` is sent for this provider.
        """
        try:
            return self._chat_completions(
                "https://dashscope.aliyuncs.com/compatible-mode/v1", image_data, mime_type
            )
        except Exception as e:
            logger.error(f"Qwen API error: {e}")
            return self._failure(str(e))