Files
X-Agents/ai-core/parser/parser.py

101 lines
3.2 KiB
Python
Raw Normal View History

import logging
import os
import tempfile
from typing import Optional
from markitdown import MarkItDown
logger = logging.getLogger(__name__)
class Parser:
    """Unified document parser built on MarkItDown.

    According to the original module documentation the supported inputs are
    PDF, DOCX, DOC, PPTX, PPT, XLSX, XLS, CSV, images, web pages and Markdown.

    Both public methods catch ``Exception`` and report the failure in the
    returned dict instead of raising.
    """

    def __init__(self):
        self.markitdown = MarkItDown()
        logger.info("Parser initialized with MarkItDown")

    def parse(self, file_path: str, file_type: Optional[str] = None) -> dict:
        """Convert a document to Markdown.

        Args:
            file_path: File path or URL, passed straight to ``MarkItDown.convert``.
            file_type: Optional file type. It is accepted for interface
                compatibility but not used; detection is left to MarkItDown.

        Returns:
            dict: On success ``success`` (True), ``content``, ``content_length``
            and ``metadata``. On failure ``success`` (False), ``content`` (""),
            ``content_length`` (0) and ``error`` (the exception message).
        """
        try:
            logger.info(f"Parsing file: {file_path}")
            return self._success_result(self.markitdown.convert(file_path))
        except Exception as e:
            logger.error(f"Parse error: {e}", exc_info=True)
            return self._error_result(e)

    def parse_bytes(self, content: bytes, file_name: str, file_type: Optional[str] = None) -> dict:
        """Convert in-memory file content to Markdown.

        The bytes are written to a temporary file that keeps the extension of
        ``file_name``; that file is closed, converted by path, and then removed.

        Args:
            content: Raw bytes of the file.
            file_name: File name; only its extension is used, as the suffix of
                the temporary file.
            file_type: Optional file type. It is accepted for interface
                compatibility but not used.

        Returns:
            dict: Same shape as the result of ``parse``.
        """
        try:
            logger.info(f"Parsing bytes: {file_name}, size: {len(content)} bytes")
            suffix = os.path.splitext(file_name)[1] or ''
            temp_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                    # Record the path before writing so that a failed write
                    # still gets the file cleaned up in the finally clause.
                    temp_path = temp_file.name
                    temp_file.write(content)
                # The temporary file is closed at this point; the converter
                # opens it again by path.
                return self._success_result(self.markitdown.convert(temp_path))
            finally:
                if temp_path is not None:
                    self._discard_temp_file(temp_path)
        except Exception as e:
            logger.error(f"Parse bytes error: {e}", exc_info=True)
            return self._error_result(e)

    @staticmethod
    def _success_result(result) -> dict:
        """Log the converted size and build the success dict for a conversion result."""
        text = result.text_content
        logger.info(f"Parse successful: {len(text)} characters")
        return {
            "success": True,
            "content": text,
            "content_length": len(text),
            # Not every conversion result exposes ``metadata``; fall back to {}.
            "metadata": getattr(result, "metadata", {}),
        }

    @staticmethod
    def _error_result(error: Exception) -> dict:
        """Build the failure dict carrying the exception message."""
        return {
            "success": False,
            "content": "",
            "content_length": 0,
            "error": str(error),
        }

    @staticmethod
    def _discard_temp_file(path: str) -> None:
        """Remove a temporary file, logging instead of raising on failure.

        Cleanup is best-effort so that a failed removal neither discards a
        successful conversion nor hides the original conversion error.
        """
        try:
            os.unlink(path)
        except OSError as cleanup_error:
            logger.warning(f"Failed to remove temp file {path}: {cleanup_error}")
if __name__ == "__main__":
    # Manual smoke test: convert a public web page and print the outcome.
    outcome = Parser().parse("https://example.com")
    for label, key in (("Success", "success"), ("Content length", "content_length")):
        print(f"{label}: {outcome[key]}")