2025-12-18 16:16:12 +08:00
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
"""
|
|
|
|
|
|
Excel转JSON一体化工具
|
|
|
|
|
|
功能:读取Excel文件 -> 转换为CSV -> 转换为JSON
|
|
|
|
|
|
支持多种Excel读取方式,自动处理复杂格式
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
import json
|
|
|
|
|
|
import os
|
|
|
|
|
|
import glob
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
import xlwings as xw
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import Optional, Dict, List, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExcelToJsonConverter:
|
|
|
|
|
|
"""Excel转JSON转换器"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, input_dir: str, output_dir: str):
|
|
|
|
|
|
"""
|
|
|
|
|
|
初始化转换器
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
input_dir: Excel文件输入目录
|
|
|
|
|
|
output_dir: JSON文件输出目录
|
|
|
|
|
|
"""
|
|
|
|
|
|
self.input_dir = input_dir
|
|
|
|
|
|
self.output_dir = output_dir
|
|
|
|
|
|
|
|
|
|
|
|
# 确保输出目录存在
|
|
|
|
|
|
if not os.path.exists(output_dir):
|
|
|
|
|
|
os.makedirs(output_dir)
|
|
|
|
|
|
|
2025-12-19 15:14:46 +08:00
|
|
|
|
# CSV输出目录 - 在输入目录的同级创建CSV目录
|
|
|
|
|
|
parent_dir = os.path.dirname(input_dir)
|
|
|
|
|
|
self.csv_output_dir = os.path.join(parent_dir, "Data_Export_CSV")
|
|
|
|
|
|
if not os.path.exists(self.csv_output_dir):
|
|
|
|
|
|
os.makedirs(self.csv_output_dir)
|
2025-12-18 16:16:12 +08:00
|
|
|
|
|
|
|
|
|
|
def find_excel_files(self) -> List[Tuple[str, str]]:
|
|
|
|
|
|
"""扫描目录下的所有Excel文件"""
|
|
|
|
|
|
excel_files = []
|
|
|
|
|
|
search_pattern = os.path.join(self.input_dir, "*.xlsx")
|
|
|
|
|
|
|
|
|
|
|
|
for excel_path in glob.glob(search_pattern):
|
|
|
|
|
|
filename = os.path.basename(excel_path)
|
|
|
|
|
|
|
|
|
|
|
|
# 跳过临时文件(Excel的临时文件以~$开头)
|
|
|
|
|
|
if filename.startswith('~$'):
|
|
|
|
|
|
print(f"[SKIP] 跳过临时文件: {filename}")
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# 生成基础文件名(不含扩展名)
|
|
|
|
|
|
base_name = filename.replace('.xlsx', '')
|
|
|
|
|
|
excel_files.append((excel_path, base_name))
|
|
|
|
|
|
|
|
|
|
|
|
return excel_files
|
|
|
|
|
|
|
|
|
|
|
|
def read_excel_with_xlwings(self, excel_path: str) -> Optional[pd.DataFrame]:
|
|
|
|
|
|
"""使用xlwings读取Excel文件"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
print(f" [TRY] 使用xlwings读取...")
|
|
|
|
|
|
app = xw.App(visible=False)
|
|
|
|
|
|
wb = app.books.open(excel_path)
|
|
|
|
|
|
sheet = wb.sheets[0]
|
|
|
|
|
|
|
|
|
|
|
|
# 读取数据
|
|
|
|
|
|
data = sheet.range('A1').expand().value
|
|
|
|
|
|
wb.close()
|
|
|
|
|
|
app.quit()
|
|
|
|
|
|
|
|
|
|
|
|
# 转换为DataFrame
|
|
|
|
|
|
if data and len(data) > 0:
|
|
|
|
|
|
if isinstance(data[0], list):
|
|
|
|
|
|
# 标准表格格式
|
|
|
|
|
|
headers = data[0]
|
|
|
|
|
|
rows = data[1:] if len(data) > 1 else []
|
|
|
|
|
|
df = pd.DataFrame(rows, columns=headers)
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 每行只有一个值的特殊格式
|
|
|
|
|
|
df = pd.DataFrame(data, columns=['内容'])
|
|
|
|
|
|
return df
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
print(f" [WARN] xlwings未安装")
|
|
|
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" [WARN] xlwings读取失败: {str(e)[:100]}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def read_excel_with_libreoffice(self, excel_path: str) -> Optional[pd.DataFrame]:
|
|
|
|
|
|
"""使用LibreOffice转换后读取"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
print(f" [TRY] 使用LibreOffice转换...")
|
|
|
|
|
|
# 输出CSV路径
|
|
|
|
|
|
csv_path = excel_path.replace('.xlsx', '_temp.csv')
|
|
|
|
|
|
|
|
|
|
|
|
# 使用LibreOffice转换
|
|
|
|
|
|
cmd = [
|
|
|
|
|
|
'libreoffice',
|
|
|
|
|
|
'--headless',
|
|
|
|
|
|
'--convert-to', 'csv',
|
|
|
|
|
|
'--outdir', os.path.dirname(excel_path),
|
|
|
|
|
|
excel_path
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
|
|
|
|
|
|
|
|
if os.path.exists(csv_path):
|
|
|
|
|
|
df = pd.read_csv(csv_path, encoding='utf-8')
|
|
|
|
|
|
# 删除临时文件
|
|
|
|
|
|
os.remove(csv_path)
|
|
|
|
|
|
print(f" [OK] LibreOffice转换成功")
|
|
|
|
|
|
return df
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f" [WARN] LibreOffice转换失败")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
print(f" [WARN] LibreOffice未安装")
|
|
|
|
|
|
return None
|
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
|
|
|
print(f" [WARN] LibreOffice转换超时")
|
|
|
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" [WARN] LibreOffice转换失败: {e}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def read_excel_with_pandas(self, excel_path: str) -> Optional[pd.DataFrame]:
|
|
|
|
|
|
"""使用pandas读取Excel文件"""
|
|
|
|
|
|
engines = ['openpyxl', 'xlrd']
|
|
|
|
|
|
|
|
|
|
|
|
for engine in engines:
|
|
|
|
|
|
try:
|
|
|
|
|
|
print(f" [TRY] 使用pandas ({engine})读取...")
|
|
|
|
|
|
df = pd.read_excel(excel_path, engine=engine)
|
|
|
|
|
|
print(f" [OK] pandas ({engine}) 读取成功")
|
|
|
|
|
|
return df
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" [WARN] pandas ({engine}) 失败: {str(e)[:100]}")
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def read_excel_file(self, excel_path: str) -> Optional[pd.DataFrame]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
尝试多种方法读取Excel文件
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
excel_path: Excel文件路径
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
DataFrame或None
|
|
|
|
|
|
"""
|
|
|
|
|
|
print(f"\n[INFO] 读取文件: {os.path.basename(excel_path)}")
|
|
|
|
|
|
|
|
|
|
|
|
# 按优先级尝试读取方法
|
|
|
|
|
|
methods = [
|
|
|
|
|
|
("xlwings", self.read_excel_with_xlwings),
|
|
|
|
|
|
("pandas-openpyxl", lambda p: self.read_excel_with_pandas(p) if 'openpyxl' in str(p) else None),
|
|
|
|
|
|
("LibreOffice", self.read_excel_with_libreoffice),
|
|
|
|
|
|
("pandas-xlrd", self.read_excel_with_pandas),
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
for method_name, method_func in methods:
|
|
|
|
|
|
try:
|
|
|
|
|
|
if method_name == "pandas-openpyxl":
|
|
|
|
|
|
# 特殊处理pandas-openpyxl
|
|
|
|
|
|
df = self.read_excel_with_pandas(excel_path)
|
|
|
|
|
|
elif method_name == "pandas-xlrd":
|
|
|
|
|
|
# 跳过,因为上面已经尝试过了
|
|
|
|
|
|
continue
|
|
|
|
|
|
else:
|
|
|
|
|
|
df = method_func(excel_path)
|
|
|
|
|
|
|
|
|
|
|
|
if df is not None and not df.empty:
|
|
|
|
|
|
print(f"[OK] {method_name} 成功读取!")
|
|
|
|
|
|
print(f" 数据形状: {df.shape[0]}行 × {df.shape[1]}列")
|
|
|
|
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[WARN] {method_name} 失败: {str(e)[:100]}")
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[ERROR] 所有读取方法都失败了")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_csv(self, df: pd.DataFrame, base_name: str) -> str:
|
|
|
|
|
|
"""
|
|
|
|
|
|
将DataFrame转换为CSV
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
df: 数据框
|
|
|
|
|
|
base_name: 文件基础名
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
CSV文件路径
|
|
|
|
|
|
"""
|
|
|
|
|
|
csv_filename = f"{base_name}.csv"
|
2025-12-19 15:14:46 +08:00
|
|
|
|
csv_path = os.path.join(self.csv_output_dir, csv_filename)
|
2025-12-18 16:16:12 +08:00
|
|
|
|
|
|
|
|
|
|
# 保存为CSV,使用utf-8-sig编码支持中文
|
|
|
|
|
|
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
|
|
|
|
|
|
|
|
|
|
|
file_size = os.path.getsize(csv_path) / 1024 # KB
|
|
|
|
|
|
print(f" [OK] CSV已生成: {csv_filename} ({file_size:.1f} KB)")
|
|
|
|
|
|
|
|
|
|
|
|
return csv_path
|
|
|
|
|
|
|
|
|
|
|
|
def convert_csv_to_json(self, csv_path: str, base_name: str) -> str:
|
|
|
|
|
|
"""
|
|
|
|
|
|
将CSV文件转换为JSON
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
csv_path: CSV文件路径
|
|
|
|
|
|
base_name: 文件基础名
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
JSON文件路径
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 读取CSV文件
|
|
|
|
|
|
df = pd.read_csv(csv_path, encoding='utf-8-sig')
|
|
|
|
|
|
|
|
|
|
|
|
if df.empty:
|
|
|
|
|
|
print(f" [WARN] CSV文件为空")
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
# 转换为JSON列表
|
|
|
|
|
|
json_data = []
|
|
|
|
|
|
for index, row in df.iterrows():
|
|
|
|
|
|
# 创建JSON对象
|
|
|
|
|
|
json_obj = {}
|
|
|
|
|
|
for column in df.columns:
|
|
|
|
|
|
value = row[column]
|
|
|
|
|
|
|
|
|
|
|
|
# 处理Na值
|
|
|
|
|
|
if pd.isna(value):
|
|
|
|
|
|
json_obj[column] = None
|
|
|
|
|
|
else:
|
|
|
|
|
|
json_obj[column] = value
|
|
|
|
|
|
|
|
|
|
|
|
# 添加表名字段
|
|
|
|
|
|
json_obj['表名'] = base_name
|
|
|
|
|
|
|
|
|
|
|
|
json_data.append(json_obj)
|
|
|
|
|
|
|
|
|
|
|
|
# 生成JSON文件路径
|
|
|
|
|
|
json_filename = f"{base_name}.json"
|
|
|
|
|
|
json_path = os.path.join(self.output_dir, json_filename)
|
|
|
|
|
|
|
|
|
|
|
|
# 保存JSON文件
|
|
|
|
|
|
with open(json_path, 'w', encoding='utf-8') as f:
|
|
|
|
|
|
json.dump(json_data, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
|
|
|
|
file_size = os.path.getsize(json_path) / 1024 # KB
|
|
|
|
|
|
print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)")
|
|
|
|
|
|
print(f" 数据量: {len(json_data)} 条记录")
|
|
|
|
|
|
|
|
|
|
|
|
return json_path
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" [ERROR] CSV转JSON失败: {e}")
|
|
|
|
|
|
import traceback
|
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
def process_single_file(self, excel_path: str, base_name: str) -> bool:
|
|
|
|
|
|
"""
|
|
|
|
|
|
处理单个Excel文件:Excel -> CSV -> JSON
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
excel_path: Excel文件路径
|
|
|
|
|
|
base_name: 文件基础名
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
是否成功
|
|
|
|
|
|
"""
|
|
|
|
|
|
print(f"\n{'='*60}")
|
|
|
|
|
|
print(f"处理: {os.path.basename(excel_path)}")
|
|
|
|
|
|
print(f"{'='*60}")
|
|
|
|
|
|
|
|
|
|
|
|
# 步骤1: 读取Excel
|
|
|
|
|
|
df = self.read_excel_file(excel_path)
|
|
|
|
|
|
if df is None:
|
|
|
|
|
|
print(f"[ERROR] 读取失败,跳过此文件")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
# 显示数据预览
|
|
|
|
|
|
print(f"\n[INFO] 数据预览:")
|
|
|
|
|
|
print(df.head(3))
|
|
|
|
|
|
|
|
|
|
|
|
# 步骤2: 转换为CSV
|
|
|
|
|
|
csv_path = self.convert_to_csv(df, base_name)
|
|
|
|
|
|
|
|
|
|
|
|
# 步骤3: 转换为JSON
|
|
|
|
|
|
json_path = self.convert_csv_to_json(csv_path, base_name)
|
|
|
|
|
|
|
|
|
|
|
|
if json_path:
|
|
|
|
|
|
print(f"\n[OK] 转换完成!")
|
|
|
|
|
|
return True
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"\n[ERROR] 转换失败")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def process_all(self) -> Dict:
|
|
|
|
|
|
"""
|
|
|
|
|
|
处理所有Excel文件
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
处理结果统计
|
|
|
|
|
|
"""
|
|
|
|
|
|
print("="*60)
|
|
|
|
|
|
print("Excel转JSON一体化工具")
|
|
|
|
|
|
print("="*60)
|
|
|
|
|
|
print(f"输入目录: {self.input_dir}")
|
|
|
|
|
|
print(f"输出目录: {self.output_dir}")
|
|
|
|
|
|
|
|
|
|
|
|
# 查找Excel文件
|
|
|
|
|
|
excel_files = self.find_excel_files()
|
|
|
|
|
|
|
|
|
|
|
|
if not excel_files:
|
|
|
|
|
|
print(f"\n[WARN] 未找到任何Excel文件")
|
|
|
|
|
|
return {'total': 0, 'success': 0, 'failed': 0}
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n[INFO] 发现 {len(excel_files)} 个Excel文件")
|
|
|
|
|
|
|
|
|
|
|
|
# 处理每个文件
|
|
|
|
|
|
success_count = 0
|
|
|
|
|
|
failed_count = 0
|
|
|
|
|
|
results = []
|
|
|
|
|
|
|
|
|
|
|
|
for excel_path, base_name in excel_files:
|
|
|
|
|
|
if self.process_single_file(excel_path, base_name):
|
|
|
|
|
|
success_count += 1
|
|
|
|
|
|
results.append({'file': os.path.basename(excel_path), 'status': 'success'})
|
|
|
|
|
|
else:
|
|
|
|
|
|
failed_count += 1
|
|
|
|
|
|
results.append({'file': os.path.basename(excel_path), 'status': 'failed'})
|
|
|
|
|
|
|
|
|
|
|
|
# 输出统计信息
|
|
|
|
|
|
print(f"\n{'='*60}")
|
|
|
|
|
|
print("转换完成!")
|
|
|
|
|
|
print(f"{'='*60}")
|
|
|
|
|
|
print(f"总计: {len(excel_files)} 个文件")
|
|
|
|
|
|
print(f"成功: {success_count} 个文件")
|
|
|
|
|
|
print(f"失败: {failed_count} 个文件")
|
|
|
|
|
|
|
2025-12-19 15:14:46 +08:00
|
|
|
|
# 显示生成的CSV和JSON文件
|
2025-12-18 16:16:12 +08:00
|
|
|
|
if success_count > 0:
|
2025-12-19 15:14:46 +08:00
|
|
|
|
print(f"\n生成的CSV文件:")
|
|
|
|
|
|
csv_files = glob.glob(os.path.join(self.csv_output_dir, "*.csv"))
|
|
|
|
|
|
for csv_file in sorted(csv_files):
|
|
|
|
|
|
file_size = os.path.getsize(csv_file) / 1024 # KB
|
|
|
|
|
|
filename = os.path.basename(csv_file)
|
|
|
|
|
|
print(f" - {filename} ({file_size:.1f} KB)")
|
|
|
|
|
|
|
2025-12-18 16:16:12 +08:00
|
|
|
|
print(f"\n生成的JSON文件:")
|
|
|
|
|
|
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
|
|
|
|
|
for json_file in sorted(json_files):
|
|
|
|
|
|
file_size = os.path.getsize(json_file) / 1024 # KB
|
|
|
|
|
|
filename = os.path.basename(json_file)
|
|
|
|
|
|
print(f" - {filename} ({file_size:.1f} KB)")
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
'total': len(excel_files),
|
|
|
|
|
|
'success': success_count,
|
|
|
|
|
|
'failed': failed_count,
|
|
|
|
|
|
'results': results
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
|
"""主函数 - 演示用法"""
|
2025-12-19 15:14:46 +08:00
|
|
|
|
# 获取当前工作目录
|
|
|
|
|
|
current_dir = os.getcwd()
|
|
|
|
|
|
|
|
|
|
|
|
# 配置路径 - 基于当前目录
|
|
|
|
|
|
input_dir = os.path.join(current_dir, "Data")
|
|
|
|
|
|
output_dir = os.path.join(current_dir, "Data_Export_Json")
|
|
|
|
|
|
csv_dir = os.path.join(current_dir, "Data_Export_CSV")
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[INFO] 当前工作目录: {current_dir}")
|
|
|
|
|
|
print(f"[INFO] Excel文件目录: {input_dir}")
|
|
|
|
|
|
print(f"[INFO] CSV输出目录: {csv_dir}")
|
|
|
|
|
|
print(f"[INFO] JSON输出目录: {output_dir}")
|
2025-12-18 16:16:12 +08:00
|
|
|
|
|
|
|
|
|
|
# 创建转换器实例
|
|
|
|
|
|
converter = ExcelToJsonConverter(input_dir, output_dir)
|
|
|
|
|
|
|
|
|
|
|
|
# 处理所有文件
|
|
|
|
|
|
result = converter.process_all()
|
|
|
|
|
|
|
|
|
|
|
|
# 输出结果
|
|
|
|
|
|
print(f"\n[INFO] 处理结果: {result}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
main()
|