#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel转JSON一体化工具 功能:读取Excel文件 -> 转换为CSV -> 转换为JSON 支持多种Excel读取方式,自动处理复杂格式 """ import pandas as pd import json import os import glob import subprocess import xlwings as xw from datetime import datetime from typing import Optional, Dict, List, Tuple class ExcelToJsonConverter: """Excel转JSON转换器""" def __init__(self, input_dir: str, output_dir: str): """ 初始化转换器 Args: input_dir: Excel文件输入目录 output_dir: JSON文件输出目录 """ self.input_dir = input_dir self.output_dir = output_dir # 确保输出目录存在 if not os.path.exists(output_dir): os.makedirs(output_dir) # CSV临时目录(仅在Excel模式下使用) self.temp_csv_dir = None def find_excel_files(self) -> List[Tuple[str, str]]: """扫描目录下的所有Excel文件""" excel_files = [] search_pattern = os.path.join(self.input_dir, "*.xlsx") for excel_path in glob.glob(search_pattern): filename = os.path.basename(excel_path) # 跳过临时文件(Excel的临时文件以~$开头) if filename.startswith('~$'): print(f"[SKIP] 跳过临时文件: {filename}") continue # 生成基础文件名(不含扩展名) base_name = filename.replace('.xlsx', '') excel_files.append((excel_path, base_name)) return excel_files def read_excel_with_xlwings(self, excel_path: str) -> Optional[pd.DataFrame]: """使用xlwings读取Excel文件""" try: print(f" [TRY] 使用xlwings读取...") app = xw.App(visible=False) wb = app.books.open(excel_path) sheet = wb.sheets[0] # 读取数据 data = sheet.range('A1').expand().value wb.close() app.quit() # 转换为DataFrame if data and len(data) > 0: if isinstance(data[0], list): # 标准表格格式 headers = data[0] rows = data[1:] if len(data) > 1 else [] df = pd.DataFrame(rows, columns=headers) else: # 每行只有一个值的特殊格式 df = pd.DataFrame(data, columns=['内容']) return df return None except ImportError: print(f" [WARN] xlwings未安装") return None except Exception as e: print(f" [WARN] xlwings读取失败: {str(e)[:100]}") return None def read_excel_with_libreoffice(self, excel_path: str) -> Optional[pd.DataFrame]: """使用LibreOffice转换后读取""" try: print(f" [TRY] 使用LibreOffice转换...") # 输出CSV路径 csv_path = excel_path.replace('.xlsx', '_temp.csv') # 使用LibreOffice转换 cmd = [ 'libreoffice', '--headless', '--convert-to', 'csv', '--outdir', os.path.dirname(excel_path), excel_path ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if os.path.exists(csv_path): df = pd.read_csv(csv_path, encoding='utf-8') # 删除临时文件 os.remove(csv_path) print(f" [OK] LibreOffice转换成功") return df else: print(f" [WARN] LibreOffice转换失败") return None except FileNotFoundError: print(f" [WARN] LibreOffice未安装") return None except subprocess.TimeoutExpired: print(f" [WARN] LibreOffice转换超时") return None except Exception as e: print(f" [WARN] LibreOffice转换失败: {e}") return None def read_excel_with_pandas(self, excel_path: str) -> Optional[pd.DataFrame]: """使用pandas读取Excel文件""" engines = ['openpyxl', 'xlrd'] for engine in engines: try: print(f" [TRY] 使用pandas ({engine})读取...") df = pd.read_excel(excel_path, engine=engine) print(f" [OK] pandas ({engine}) 读取成功") return df except Exception as e: print(f" [WARN] pandas ({engine}) 失败: {str(e)[:100]}") continue return None def read_excel_file(self, excel_path: str) -> Optional[pd.DataFrame]: """ 尝试多种方法读取Excel文件 Args: excel_path: Excel文件路径 Returns: DataFrame或None """ print(f"\n[INFO] 读取文件: {os.path.basename(excel_path)}") # 按优先级尝试读取方法 methods = [ ("xlwings", self.read_excel_with_xlwings), ("pandas-openpyxl", lambda p: self.read_excel_with_pandas(p) if 'openpyxl' in str(p) else None), ("LibreOffice", self.read_excel_with_libreoffice), ("pandas-xlrd", self.read_excel_with_pandas), ] for method_name, method_func in methods: try: if method_name == "pandas-openpyxl": # 特殊处理pandas-openpyxl df = self.read_excel_with_pandas(excel_path) elif method_name == "pandas-xlrd": # 跳过,因为上面已经尝试过了 continue else: df = method_func(excel_path) if df is not None and not df.empty: print(f"[OK] {method_name} 成功读取!") print(f" 数据形状: {df.shape[0]}行 × {df.shape[1]}列") return df except Exception as e: print(f"[WARN] {method_name} 失败: {str(e)[:100]}") print(f"[ERROR] 所有读取方法都失败了") return None def convert_to_csv(self, df: pd.DataFrame, base_name: str) -> str: """ 将DataFrame转换为CSV Args: df: 数据框 base_name: 文件基础名 Returns: CSV文件路径 """ # 确保临时CSV目录存在 if self.temp_csv_dir is None: self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv") if not os.path.exists(self.temp_csv_dir): os.makedirs(self.temp_csv_dir) csv_filename = f"{base_name}.csv" csv_path = os.path.join(self.temp_csv_dir, csv_filename) # 保存为CSV,使用utf-8-sig编码支持中文 df.to_csv(csv_path, index=False, encoding='utf-8-sig') file_size = os.path.getsize(csv_path) / 1024 # KB print(f" [OK] CSV已生成: {csv_filename} ({file_size:.1f} KB)") return csv_path def convert_csv_to_json(self, csv_path: str, base_name: str) -> str: """ 将CSV文件转换为JSON Args: csv_path: CSV文件路径 base_name: 文件基础名 Returns: JSON文件路径 """ try: # 读取CSV文件 df = pd.read_csv(csv_path, encoding='utf-8-sig') if df.empty: print(f" [WARN] CSV文件为空") return "" # 转换为JSON列表 json_data = [] for index, row in df.iterrows(): # 创建JSON对象 json_obj = {} for column in df.columns: value = row[column] # 处理Na值 if pd.isna(value): json_obj[column] = None else: # 处理数据值:如果是字符串且包含英文字母,转换为小写 if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): # 将数据值中的英文字母转换为小写 value = value.lower() # 将英文字段名转换为小写 # 检查字段名是否完全是英文字符(包括字母、数字、下划线) if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): # 完全是英文字段名,转换为小写 json_obj[column.lower()] = value else: # 包含中文字符的字段名保持不变 json_obj[column] = value # 添加表名字段 json_obj['表名'] = base_name json_data.append(json_obj) # 生成JSON文件路径 json_filename = f"{base_name}.json" json_path = os.path.join(self.output_dir, json_filename) # 保存JSON文件 with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) file_size = os.path.getsize(json_path) / 1024 # KB print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)") print(f" 数据量: {len(json_data)} 条记录") return json_path except Exception as e: print(f" [ERROR] CSV转JSON失败: {e}") import traceback traceback.print_exc() return "" def process_single_file(self, excel_path: str, base_name: str) -> bool: """ 处理单个Excel文件:Excel -> CSV -> JSON Args: excel_path: Excel文件路径 base_name: 文件基础名 Returns: 是否成功 """ print(f"\n{'='*60}") print(f"处理: {os.path.basename(excel_path)}") print(f"{'='*60}") # 步骤1: 读取Excel df = self.read_excel_file(excel_path) if df is None: print(f"[ERROR] 读取失败,跳过此文件") return False # 显示数据预览 print(f"\n[INFO] 数据预览:") print(df.head(3)) # 步骤2: 转换为CSV csv_path = self.convert_to_csv(df, base_name) # 步骤3: 转换为JSON json_path = self.convert_csv_to_json(csv_path, base_name) if json_path: print(f"\n[OK] 转换完成!") return True else: print(f"\n[ERROR] 转换失败") return False def process_all(self) -> Dict: """ 处理所有Excel文件 Returns: 处理结果统计 """ print("="*60) print("Excel转JSON一体化工具") print("="*60) print(f"输入目录: {self.input_dir}") print(f"输出目录: {self.output_dir}") # 查找Excel文件 excel_files = self.find_excel_files() if not excel_files: print(f"\n[WARN] 未找到任何Excel文件") return {'total': 0, 'success': 0, 'failed': 0} print(f"\n[INFO] 发现 {len(excel_files)} 个Excel文件") # 处理每个文件 success_count = 0 failed_count = 0 results = [] for excel_path, base_name in excel_files: if self.process_single_file(excel_path, base_name): success_count += 1 results.append({'file': os.path.basename(excel_path), 'status': 'success'}) else: failed_count += 1 results.append({'file': os.path.basename(excel_path), 'status': 'failed'}) # 输出统计信息 print(f"\n{'='*60}") print("转换完成!") print(f"{'='*60}") print(f"总计: {len(excel_files)} 个文件") print(f"成功: {success_count} 个文件") print(f"失败: {failed_count} 个文件") # 显示生成的JSON文件 if success_count > 0: print(f"\n生成的JSON文件:") json_files = glob.glob(os.path.join(self.output_dir, "*.json")) for json_file in sorted(json_files): file_size = os.path.getsize(json_file) / 1024 # KB filename = os.path.basename(json_file) print(f" - {filename} ({file_size:.1f} KB)") return { 'total': len(excel_files), 'success': success_count, 'failed': failed_count, 'results': results } def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]: """扫描目录下的所有CSV文件""" csv_files = [] search_pattern = os.path.join(csv_dir, "*.csv") for csv_path in glob.glob(search_pattern): filename = os.path.basename(csv_path) # 生成基础文件名(不含扩展名) base_name = filename.replace('.csv', '') csv_files.append((csv_path, base_name)) return csv_files def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str: """ 直接将CSV文件转换为JSON(不生成临时CSV) 这个方法直接从CSV读取并转换为JSON Args: csv_path: CSV文件路径 base_name: 文件基础名 Returns: JSON文件路径 """ try: # 尝试多种编码读取CSV文件 encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] df = None for encoding in encodings: try: print(f" [TRY] 尝试编码: {encoding}") df = pd.read_csv(csv_path, encoding=encoding) print(f" [OK] 编码 {encoding} 读取成功") break except (UnicodeDecodeError, UnicodeError): print(f" [WARN] 编码 {encoding} 失败") continue except Exception as e: print(f" [WARN] 编码 {encoding} 其他错误: {str(e)[:50]}") continue if df is None: print(f" [ERROR] 所有编码都失败,无法读取CSV文件") return "" if df.empty: print(f" [WARN] CSV文件为空") return "" # 转换为JSON列表 json_data = [] for index, row in df.iterrows(): # 创建JSON对象 json_obj = {} for column in df.columns: value = row[column] # 处理Na值 if pd.isna(value): json_obj[column] = None else: # 处理数据值:如果是字符串且包含英文字母,转换为小写 if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): # 将数据值中的英文字母转换为小写 value = value.lower() # 将英文字段名转换为小写 # 检查字段名是否完全是英文字符(包括字母、数字、下划线) if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): # 完全是英文字段名,转换为小写 json_obj[column.lower()] = value else: # 包含中文字符的字段名保持不变 json_obj[column] = value # 添加表名字段 json_obj['表名'] = base_name json_data.append(json_obj) # 生成JSON文件路径 json_filename = f"{base_name}.json" json_path = os.path.join(self.output_dir, json_filename) # 保存JSON文件 with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) file_size = os.path.getsize(json_path) / 1024 # KB print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)") print(f" 数据量: {len(json_data)} 条记录") return json_path except Exception as e: print(f" [ERROR] CSV转JSON失败: {e}") import traceback traceback.print_exc() return "" def process_single_csv(self, csv_path: str, base_name: str) -> bool: """ 处理单个CSV文件:CSV → JSON Args: csv_path: CSV文件路径 base_name: 文件基础名 Returns: 是否成功 """ print(f"\n{'='*60}") print(f"处理: {os.path.basename(csv_path)}") print(f"{'='*60}") # 步骤1: 读取CSV文件并预览 try: # 尝试多种编码读取CSV文件 encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] df = None for encoding in encodings: try: df = pd.read_csv(csv_path, encoding=encoding) break except (UnicodeDecodeError, UnicodeError): continue except Exception as e: print(f"[ERROR] 编码 {encoding} 错误: {e}") continue if df is None or df.empty: print(f"[ERROR] CSV文件为空或读取失败") return False print(f"\n[INFO] 数据预览:") print(df.head(3)) print(f"\n[INFO] 数据形状: {df.shape[0]}行 × {df.shape[1]}列") except Exception as e: print(f"[ERROR] 读取CSV失败: {e}") return False # 步骤2: 转换为JSON json_path = self.convert_csv_to_json_direct(csv_path, base_name) if json_path: print(f"\n[OK] 转换完成!") return True else: print(f"\n[ERROR] 转换失败") return False def convert_csv_directory(self, csv_dir: str) -> Dict: """ 处理CSV目录下的所有CSV文件 Args: csv_dir: CSV文件目录 Returns: 处理结果统计 """ print("="*60) print("CSV转JSON工具") print("="*60) print(f"CSV输入目录: {csv_dir}") print(f"JSON输出目录: {self.output_dir}") # 查找CSV文件 csv_files = self.find_csv_files(csv_dir) if not csv_files: print(f"\n[WARN] 未找到任何CSV文件") return {'total': 0, 'success': 0, 'failed': 0} print(f"\n[INFO] 发现 {len(csv_files)} 个CSV文件") # 处理每个文件 success_count = 0 failed_count = 0 results = [] for csv_path, base_name in csv_files: if self.process_single_csv(csv_path, base_name): success_count += 1 results.append({'file': os.path.basename(csv_path), 'status': 'success'}) else: failed_count += 1 results.append({'file': os.path.basename(csv_path), 'status': 'failed'}) # 输出统计信息 print(f"\n{'='*60}") print("转换完成!") print(f"{'='*60}") print(f"总计: {len(csv_files)} 个文件") print(f"成功: {success_count} 个文件") print(f"失败: {failed_count} 个文件") # 显示生成的JSON文件 if success_count > 0: print(f"\n生成的JSON文件:") json_files = glob.glob(os.path.join(self.output_dir, "*.json")) for json_file in sorted(json_files): file_size = os.path.getsize(json_file) / 1024 # KB filename = os.path.basename(json_file) print(f" - {filename} ({file_size:.1f} KB)") return { 'total': len(csv_files), 'success': success_count, 'failed': failed_count, 'results': results } def main(): """主函数 - 演示用法""" # 配置路径 input_dir = "Data" csv_input_dir = "Data_Export_CSV" output_dir = "Data_Export_Json" # 创建转换器实例 converter = ExcelToJsonConverter(input_dir, output_dir) # 优先使用CSV模式 if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir): # CSV模式:使用现有的CSV文件 print(f"\n[INFO] 检测到CSV文件,使用CSV模式") print(f" 从 {csv_input_dir} 读取CSV文件") result = converter.convert_csv_directory(csv_input_dir) else: # Excel模式:使用Excel文件(备选方案) excel_files = converter.find_excel_files() if excel_files: print(f"\n[INFO] 未找到CSV文件,使用Excel模式") print(f" 从 {input_dir} 读取Excel文件") result = converter.process_all() else: print(f"\n[WARN] 未找到CSV文件和Excel文件") result = {'total': 0, 'success': 0, 'failed': 0} # 输出结果 print(f"\n[INFO] 处理结果: {result}") if __name__ == "__main__": main()