#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel转JSON一体化工具
功能读取Excel文件 -> 转换为CSV -> 转换为JSON
支持多种Excel读取方式自动处理复杂格式
"""
import pandas as pd
import json
import os
import glob
import subprocess
from datetime import datetime
from typing import Optional, Dict, List, Tuple

# xlwings is optional; if it is not installed, fall back to the other readers.
try:
    import xlwings as xw
except ImportError:
    xw = None

class ExcelToJsonConverter:
"""Excel转JSON转换器"""
    def __init__(self, input_dir: str, output_dir: str):
        """
        Initialize the converter.
        Args:
            input_dir: directory containing the source Excel files
            output_dir: directory for the generated JSON files
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        # Make sure the output directory exists
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        # Temporary CSV directory (only used in Excel mode)
        self.temp_csv_dir = None
    def find_excel_files(self) -> List[Tuple[str, str]]:
        """Scan the input directory for Excel files."""
        excel_files = []
        search_pattern = os.path.join(self.input_dir, "*.xlsx")
        for excel_path in glob.glob(search_pattern):
            filename = os.path.basename(excel_path)
            # Skip temporary files (Excel lock files start with ~$)
            if filename.startswith('~$'):
                print(f"[SKIP] Skipping temporary file: {filename}")
                continue
            # Base file name without extension
            base_name = os.path.splitext(filename)[0]
            excel_files.append((excel_path, base_name))
        return excel_files
    def read_excel_with_xlwings(self, excel_path: str) -> Optional[pd.DataFrame]:
        """Read an Excel file with xlwings."""
        if xw is None:
            print(f" [WARN] xlwings is not installed")
            return None
        try:
            print(f" [TRY] Reading with xlwings...")
            app = xw.App(visible=False)
            try:
                wb = app.books.open(excel_path)
                sheet = wb.sheets[0]
                # Read the used range starting at A1
                data = sheet.range('A1').expand().value
                wb.close()
            finally:
                # Always shut the Excel instance down, even if reading fails
                app.quit()
            # Convert to a DataFrame
            if data and len(data) > 0:
                if isinstance(data[0], list):
                    # Regular table: the first row holds the headers
                    headers = data[0]
                    rows = data[1:] if len(data) > 1 else []
                    df = pd.DataFrame(rows, columns=headers)
                else:
                    # Special case: one value per row
                    # ('内容' means "content"; kept as-is because it becomes a JSON key)
                    df = pd.DataFrame(data, columns=['内容'])
                return df
            return None
        except Exception as e:
            print(f" [WARN] xlwings read failed: {str(e)[:100]}")
            return None
    def read_excel_with_libreoffice(self, excel_path: str) -> Optional[pd.DataFrame]:
        """Convert the file with LibreOffice, then read the resulting CSV."""
        try:
            print(f" [TRY] Converting with LibreOffice...")
            # LibreOffice writes <basename>.csv into the --outdir directory
            out_dir = os.path.dirname(excel_path)
            csv_path = os.path.join(
                out_dir, os.path.splitext(os.path.basename(excel_path))[0] + '.csv')
            cmd = [
                'libreoffice',
                '--headless',
                '--convert-to', 'csv',
                '--outdir', out_dir,
                excel_path
            ]
            subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if os.path.exists(csv_path):
                df = pd.read_csv(csv_path, encoding='utf-8')
                # Remove the intermediate CSV file
                os.remove(csv_path)
                print(f" [OK] LibreOffice conversion succeeded")
                return df
            else:
                print(f" [WARN] LibreOffice conversion failed")
                return None
        except FileNotFoundError:
            print(f" [WARN] LibreOffice is not installed")
            return None
        except subprocess.TimeoutExpired:
            print(f" [WARN] LibreOffice conversion timed out")
            return None
        except Exception as e:
            print(f" [WARN] LibreOffice conversion failed: {e}")
            return None
    def read_excel_with_pandas(self, excel_path: str) -> Optional[pd.DataFrame]:
        """Read an Excel file with pandas, trying the openpyxl and xlrd engines."""
        engines = ['openpyxl', 'xlrd']
        for engine in engines:
            try:
                print(f" [TRY] Reading with pandas ({engine})...")
                df = pd.read_excel(excel_path, engine=engine)
                print(f" [OK] pandas ({engine}) read succeeded")
                return df
            except Exception as e:
                print(f" [WARN] pandas ({engine}) failed: {str(e)[:100]}")
                continue
        return None
    def read_excel_file(self, excel_path: str) -> Optional[pd.DataFrame]:
        """
        Try several methods to read an Excel file.
        Args:
            excel_path: path to the Excel file
        Returns:
            A DataFrame, or None if every method failed.
        """
        print(f"\n[INFO] Reading file: {os.path.basename(excel_path)}")
        # Readers in order of preference; read_excel_with_pandas already
        # tries both the openpyxl and the xlrd engine internally.
        methods = [
            ("xlwings", self.read_excel_with_xlwings),
            ("pandas", self.read_excel_with_pandas),
            ("LibreOffice", self.read_excel_with_libreoffice),
        ]
        for method_name, method_func in methods:
            try:
                df = method_func(excel_path)
                if df is not None and not df.empty:
                    print(f"[OK] {method_name} read the file successfully!")
                    print(f" Shape: {df.shape[0]} rows × {df.shape[1]} columns")
                    return df
            except Exception as e:
                print(f"[WARN] {method_name} failed: {str(e)[:100]}")
        print(f"[ERROR] All read methods failed")
        return None
    def convert_to_csv(self, df: pd.DataFrame, base_name: str) -> str:
        """
        Write a DataFrame to a temporary CSV file.
        Args:
            df: the data frame
            base_name: base file name (no extension)
        Returns:
            Path of the generated CSV file.
        """
        # Make sure the temporary CSV directory exists
        if self.temp_csv_dir is None:
            self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv")
        if not os.path.exists(self.temp_csv_dir):
            os.makedirs(self.temp_csv_dir)
        csv_filename = f"{base_name}.csv"
        csv_path = os.path.join(self.temp_csv_dir, csv_filename)
        # utf-8-sig keeps Chinese text readable when the CSV is opened in Excel
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        file_size = os.path.getsize(csv_path) / 1024  # KB
        print(f" [OK] CSV generated: {csv_filename} ({file_size:.1f} KB)")
        return csv_path
    def convert_csv_to_json(self, csv_path: str, base_name: str) -> str:
        """
        Convert a CSV file to JSON.
        Args:
            csv_path: path to the CSV file
            base_name: base file name (no extension)
        Returns:
            Path of the generated JSON file, or "" on failure.
        """
        try:
            # Read the CSV file
            df = pd.read_csv(csv_path, encoding='utf-8-sig')
            if df.empty:
                print(f" [WARN] CSV file is empty")
                return ""
            # Build the list of JSON records
            json_data = []
            for _, row in df.iterrows():
                json_obj = {}
                for column in df.columns:
                    value = row[column]
                    # Lower-case column names that are pure ASCII (letters,
                    # digits and underscores); names containing Chinese
                    # characters keep their original form.
                    if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
                        key = column.lower()
                    else:
                        key = column
                    if pd.isna(value):
                        # Map NaN / missing cells to null
                        json_obj[key] = None
                    else:
                        # Lower-case ASCII letters inside string values
                        if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
                            value = value.lower()
                        json_obj[key] = value
                # Add the table-name field ('表名' is kept in Chinese: it is
                # part of the output schema)
                json_obj['表名'] = base_name
                json_data.append(json_obj)
            # Output JSON path
            json_filename = f"{base_name}.json"
            json_path = os.path.join(self.output_dir, json_filename)
            # Write the JSON file
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, ensure_ascii=False, indent=2)
            file_size = os.path.getsize(json_path) / 1024  # KB
            print(f" [OK] JSON generated: {json_filename} ({file_size:.1f} KB)")
            print(f" Records: {len(json_data)}")
            return json_path
        except Exception as e:
            print(f" [ERROR] CSV-to-JSON conversion failed: {e}")
            import traceback
            traceback.print_exc()
            return ""
    def process_single_file(self, excel_path: str, base_name: str) -> bool:
        """
        Process a single Excel file (Excel -> CSV -> JSON).
        Args:
            excel_path: path to the Excel file
            base_name: base file name (no extension)
        Returns:
            True on success, False otherwise.
        """
        print(f"\n{'='*60}")
        print(f"Processing: {os.path.basename(excel_path)}")
        print(f"{'='*60}")
        # Step 1: read the Excel file
        df = self.read_excel_file(excel_path)
        if df is None:
            print(f"[ERROR] Read failed, skipping this file")
            return False
        # Data preview
        print(f"\n[INFO] Data preview:")
        print(df.head(3))
        # Step 2: convert to CSV
        csv_path = self.convert_to_csv(df, base_name)
        # Step 3: convert to JSON
        json_path = self.convert_csv_to_json(csv_path, base_name)
        if json_path:
            print(f"\n[OK] Conversion finished!")
            return True
        else:
            print(f"\n[ERROR] Conversion failed")
            return False
    def process_all(self) -> Dict:
        """
        Process every Excel file in the input directory.
        Returns:
            A dict with the processing statistics.
        """
        print("="*60)
        print("Excel-to-JSON conversion tool")
        print("="*60)
        print(f"Input directory: {self.input_dir}")
        print(f"Output directory: {self.output_dir}")
        # Find the Excel files
        excel_files = self.find_excel_files()
        if not excel_files:
            print(f"\n[WARN] No Excel files found")
            return {'total': 0, 'success': 0, 'failed': 0}
        print(f"\n[INFO] Found {len(excel_files)} Excel file(s)")
        # Process each file
        success_count = 0
        failed_count = 0
        results = []
        for excel_path, base_name in excel_files:
            if self.process_single_file(excel_path, base_name):
                success_count += 1
                results.append({'file': os.path.basename(excel_path), 'status': 'success'})
            else:
                failed_count += 1
                results.append({'file': os.path.basename(excel_path), 'status': 'failed'})
        # Summary
        print(f"\n{'='*60}")
        print("Conversion finished!")
        print(f"{'='*60}")
        print(f"Total:   {len(excel_files)} file(s)")
        print(f"Success: {success_count} file(s)")
        print(f"Failed:  {failed_count} file(s)")
        # List the generated JSON files
        if success_count > 0:
            print(f"\nGenerated JSON files:")
            json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
            for json_file in sorted(json_files):
                file_size = os.path.getsize(json_file) / 1024  # KB
                filename = os.path.basename(json_file)
                print(f" - {filename} ({file_size:.1f} KB)")
        return {
            'total': len(excel_files),
            'success': success_count,
            'failed': failed_count,
            'results': results
        }
    def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]:
        """Scan a directory for CSV files."""
        csv_files = []
        search_pattern = os.path.join(csv_dir, "*.csv")
        for csv_path in glob.glob(search_pattern):
            filename = os.path.basename(csv_path)
            # Base file name without extension
            base_name = os.path.splitext(filename)[0]
            csv_files.append((csv_path, base_name))
        return csv_files
    def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str:
        """
        Convert an existing CSV file straight to JSON (no temporary CSV step).
        Args:
            csv_path: path to the CSV file
            base_name: base file name (no extension)
        Returns:
            Path of the generated JSON file, or "" on failure.
        """
        try:
            # Try several encodings when reading the CSV file
            encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
            df = None
            for encoding in encodings:
                try:
                    print(f" [TRY] Trying encoding: {encoding}")
                    df = pd.read_csv(csv_path, encoding=encoding)
                    print(f" [OK] Encoding {encoding} worked")
                    break
                except (UnicodeDecodeError, UnicodeError):
                    print(f" [WARN] Encoding {encoding} failed")
                    continue
                except Exception as e:
                    print(f" [WARN] Encoding {encoding}, other error: {str(e)[:50]}")
                    continue
            if df is None:
                print(f" [ERROR] Every encoding failed, cannot read the CSV file")
                return ""
            if df.empty:
                print(f" [WARN] CSV file is empty")
                return ""
            # Build the list of JSON records (same mapping as convert_csv_to_json)
            json_data = []
            for _, row in df.iterrows():
                json_obj = {}
                for column in df.columns:
                    value = row[column]
                    # Lower-case column names that are pure ASCII (letters,
                    # digits and underscores); names containing Chinese
                    # characters keep their original form.
                    if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
                        key = column.lower()
                    else:
                        key = column
                    if pd.isna(value):
                        # Map NaN / missing cells to null
                        json_obj[key] = None
                    else:
                        # Lower-case ASCII letters inside string values
                        if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
                            value = value.lower()
                        json_obj[key] = value
                # Add the table-name field ('表名' is part of the output schema)
                json_obj['表名'] = base_name
                json_data.append(json_obj)
            # Output JSON path
            json_filename = f"{base_name}.json"
            json_path = os.path.join(self.output_dir, json_filename)
            # Write the JSON file
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, ensure_ascii=False, indent=2)
            file_size = os.path.getsize(json_path) / 1024  # KB
            print(f" [OK] JSON generated: {json_filename} ({file_size:.1f} KB)")
            print(f" Records: {len(json_data)}")
            return json_path
        except Exception as e:
            print(f" [ERROR] CSV-to-JSON conversion failed: {e}")
            import traceback
            traceback.print_exc()
            return ""
    def process_single_csv(self, csv_path: str, base_name: str) -> bool:
        """
        Process a single CSV file (CSV -> JSON).
        Args:
            csv_path: path to the CSV file
            base_name: base file name (no extension)
        Returns:
            True on success, False otherwise.
        """
        print(f"\n{'='*60}")
        print(f"Processing: {os.path.basename(csv_path)}")
        print(f"{'='*60}")
        # Step 1: read the CSV file and show a preview
        try:
            # Try several encodings when reading the CSV file
            encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(csv_path, encoding=encoding)
                    break
                except (UnicodeDecodeError, UnicodeError):
                    continue
                except Exception as e:
                    print(f"[ERROR] Encoding {encoding} error: {e}")
                    continue
            if df is None or df.empty:
                print(f"[ERROR] CSV file is empty or could not be read")
                return False
            print(f"\n[INFO] Data preview:")
            print(df.head(3))
            print(f"\n[INFO] Shape: {df.shape[0]} rows × {df.shape[1]} columns")
        except Exception as e:
            print(f"[ERROR] Failed to read CSV: {e}")
            return False
        # Step 2: convert to JSON
        json_path = self.convert_csv_to_json_direct(csv_path, base_name)
        if json_path:
            print(f"\n[OK] Conversion finished!")
            return True
        else:
            print(f"\n[ERROR] Conversion failed")
            return False
    def convert_csv_directory(self, csv_dir: str) -> Dict:
        """
        Process every CSV file in a directory.
        Args:
            csv_dir: directory containing the CSV files
        Returns:
            A dict with the processing statistics.
        """
        print("="*60)
        print("CSV-to-JSON conversion tool")
        print("="*60)
        print(f"CSV input directory: {csv_dir}")
        print(f"JSON output directory: {self.output_dir}")
        # Find the CSV files
        csv_files = self.find_csv_files(csv_dir)
        if not csv_files:
            print(f"\n[WARN] No CSV files found")
            return {'total': 0, 'success': 0, 'failed': 0}
        print(f"\n[INFO] Found {len(csv_files)} CSV file(s)")
        # Process each file
        success_count = 0
        failed_count = 0
        results = []
        for csv_path, base_name in csv_files:
            if self.process_single_csv(csv_path, base_name):
                success_count += 1
                results.append({'file': os.path.basename(csv_path), 'status': 'success'})
            else:
                failed_count += 1
                results.append({'file': os.path.basename(csv_path), 'status': 'failed'})
        # Summary
        print(f"\n{'='*60}")
        print("Conversion finished!")
        print(f"{'='*60}")
        print(f"Total:   {len(csv_files)} file(s)")
        print(f"Success: {success_count} file(s)")
        print(f"Failed:  {failed_count} file(s)")
        # List the generated JSON files
        if success_count > 0:
            print(f"\nGenerated JSON files:")
            json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
            for json_file in sorted(json_files):
                file_size = os.path.getsize(json_file) / 1024  # KB
                filename = os.path.basename(json_file)
                print(f" - {filename} ({file_size:.1f} KB)")
        return {
            'total': len(csv_files),
            'success': success_count,
            'failed': failed_count,
            'results': results
        }
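

# Typical directory layout assumed by main() below (a sketch; adjust the
# paths to your project):
#
#   Data/               source .xlsx files (Excel mode)
#   Data_Export_CSV/    pre-exported CSV files (preferred CSV mode)
#   Data_Export_Json/   generated .json output
#
# Run the script from the directory that contains these folders:
#   python change_xlsx2json.py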
def main():
    """Entry point: demonstrates the typical usage."""
    # Configure the paths
    input_dir = "Data"
    csv_input_dir = "Data_Export_CSV"
    output_dir = "Data_Export_Json"
    # Create the converter
    converter = ExcelToJsonConverter(input_dir, output_dir)
    # Prefer CSV mode when pre-exported CSV files are available
    if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir):
        # CSV mode: convert the existing CSV files
        print(f"\n[INFO] CSV files detected, using CSV mode")
        print(f"Reading CSV files from {csv_input_dir}")
        result = converter.convert_csv_directory(csv_input_dir)
    else:
        # Excel mode: fall back to the Excel files
        excel_files = converter.find_excel_files()
        if excel_files:
            print(f"\n[INFO] No CSV files found, using Excel mode")
            print(f"Reading Excel files from {input_dir}")
            result = converter.process_all()
        else:
            print(f"\n[WARN] Neither CSV nor Excel files were found")
            result = {'total': 0, 'success': 0, 'failed': 0}
    # Print the result summary
    print(f"\n[INFO] Result: {result}")


if __name__ == "__main__":
    main()