From d8eb25bd47e95f3011cba96a5e2c24c0d2934315 Mon Sep 17 00:00:00 2001 From: "DESKTOP-72TV0V4\\caoxiaozhu" Date: Mon, 5 Jan 2026 10:25:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=90=8D=E5=92=8C=E6=96=87=E4=BB=B6=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=EF=BC=8C=E5=A2=9E=E5=8A=A0=E4=BA=86requirements?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 648 ++++++------------------------- csv2json.py | 970 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 + 3 files changed, 1088 insertions(+), 533 deletions(-) create mode 100644 csv2json.py create mode 100644 requirements.txt diff --git a/README.md b/README.md index fee062d..8b02ee6 100644 --- a/README.md +++ b/README.md @@ -1,579 +1,161 @@ -# 🚀 QA生成器 - 智能问答对生成工具 +# YG_TDGenerator - 数据元素问答生成工具 -一个基于JSON数据的模型微调用问答对生成工具,支持灵活配置和问题复杂度控制。项目已精简到仅**2个核心文件**! +一款用于企业数据治理领域的问答训练数据生成工具。基于远光数据架构表(逻辑模型表、物理模型表、元素治理模板表)生成标准化的问答对,用于训练领域专用问答系统。 -## 📋 目录 +## 功能特点 -- [功能特性](#-功能特性) -- [快速开始](#-快速开始) -- [核心文件](#-核心文件) -- [配置说明](#-配置说明) -- [使用示例](#-使用示例) -- [数据输出](#-数据输出) -- [问答类型](#-问答类型) -- [最佳实践](#-最佳实践) -- [常见问题](#-常见问题) -- [高级技巧](#-高级技巧) -- [项目精简](#-项目精简) -- [数据质量](#-数据质量) +- **三表合并**:自动合并逻辑模型表、物理模型表、元素治理模板表 +- **多种问答类型**:支持字段属性查询、定义查询、模型关联查询 +- **双版本输出**:同时生成训练集和验证集(表达方式不同但语义相同) +- **一体化流程**:CSV转换→JSON合并→随机抽取→QA生成,一条命令完成 ---- +## 目录结构 -## ✨ 功能特性 +``` +YG_TDGenerator/ +├── csv2json.py # 主程序:数据处理一体化工具 +├── qa_generator.py # QA问答对生成器 +├── config.py # 配置文件 +├── requirements.txt # Python依赖 +├── Data/ # Excel源数据目录(可选) +├── Data_Export_CSV/ # CSV导出数据目录 +├── Data_Export_Json/ # JSON中间数据目录 +└── Data_QA_Outputs/ # QA生成结果目录 +``` -### 🎯 核心功能 -- **数据驱动**:基于JSON数据自动生成问答对,不进行任何编撰 -- **多表支持**:支持元素治理模板、物理模型、逻辑模型等多种数据表 -- **格式标准**:严格按照 `{"instruct":"", "input":"", "output":""}` 格式输出 -- **随机打乱**:问答对顺序随机打乱,避免训练时的位置偏好 +## 安装 -### 🎨 问句多样性 -- **表名标识**:所有问句均包含表名信息,如"在元素治理模板中" -- **丰富前缀**:10种问句前缀(请告诉我、查询、请问、在、请解释等) -- **多样模板**:每种数据类型支持10+种不同问法 -- **中英互查**:支持中英文名互查,如"中文名→英文名"、"英文名→中文名" +```bash +pip install -r requirements.txt +``` -### 💬 答句自然化 -- **修饰语前缀**:10种回答前缀(根据表记录、查询结果显示、经查询等) -- **礼貌后缀**:10种后缀(望知悉、以上信息为准、请参考、祝您工作顺利等) -- **自然表达**:让回答更接近人类语言习惯 +### 依赖说明 -### ⚙️ 可配置性 -- **复杂程度控制**:1-5级复杂度,从简单到复杂渐进 -- **问题数量控制**:可设置每个数据项生成的问题数量 -- **多列查询比例**:可控制多列查询问题的占比 -- **模板选择**:可根据复杂程度自动选择问句模板 +- `pandas`: 数据处理 +- `xlwings`: Excel文件读取(可选,用于复杂Excel格式) +- `openpyxl`: Excel文件读取(pandas依赖) ---- +## 使用方法 -## ⚡ 快速开始 +### 1. 准备数据 + +将以下三个CSV文件放入 `Data_Export_CSV/` 目录: + +- `远光数据架构逻辑模型表.csv` +- `远光数据架构物理模型表.csv` +- `远光数据架构元素治理模板表.csv` + +### 2. 数据处理流程 + +运行主程序执行完整的数据处理流程: + +```bash +python csv2json.py +``` + +该命令会执行以下三个步骤: + +1. **CSV/Excel转JSON**:将源数据转换为JSON格式 +2. **JSON合并**:根据字段英文名合并三张表,生成 `final.json` +3. **随机抽取**:从合并结果中随机抽取3000条记录,生成 `selected.json` + +### 3. 生成问答对 -### 方法1: 交互式运行(推荐) ```bash python qa_generator.py ``` -按提示选择配置即可。 -### 方法2: 直接使用预设配置 -```bash -# 简单模式(快速测试) -python -c "from qa_generator import QAGenerator, SIMPLE_CONFIG; QAGenerator(SIMPLE_CONFIG).process_all()" +生成的文件位于 `Data_QA_Outputs/` 目录: -# 普通模式(推荐) -python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; QAGenerator(NORMAL_CONFIG).process_all()" +| 文件 | 说明 | +|------|------| +| `selected_QA.json` | 训练集问答对 | +| `selected_QA_验证集.json` | 验证集问答对 | +| `QA生成报告.json` | 训练集生成报告 | +| `QA生成报告_验证集.json` | 验证集生成报告 | -# 复杂模式(高质量) -python -c "from qa_generator import QAGenerator, COMPLEX_CONFIG; QAGenerator(COMPLEX_CONFIG).process_all()" +## 问答类型说明 + +### 1. 字段属性查询 + +根据字段中文名或英文名查询其他属性值: + +``` +Q: 请告诉我字段中文名为'客户编号'的字段英文名是什么? +A: 该字段的字段英文名为customer_id。 ``` -### 方法3: 生成并合并 -```bash -python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; g = QAGenerator(NORMAL_CONFIG); g.process_all(); g.merge_to_train()" +### 2. 字段定义查询 + +根据字段名查询完整定义(所有属性): + +``` +Q: 字段中文名为'客户编号'的定义是什么? +A: 客户编号的定义为:字段英文名:customer_id,数据类型:varchar,长度:32... ``` ---- +### 3. 模型关联查询 -## 📁 核心文件 +根据逻辑模型或物理模型查询关联字段(仅对字段数≥3的模型生成): -| 文件 | 大小 | 说明 | -|------|------|------| -| **qa_generator.py** | 29KB | 主生成器 - 整合了所有功能 | -| **config.py** | 9KB | 配置文件 - 控制生成参数 | +``` +Q: 逻辑模型中文名为'客户信息模型'包含哪些字段? +A: 逻辑模型中文名为'客户信息模型'包含以下字段:客户编号、客户名称、联系方式... +``` ---- +## 配置选项 -## ⚙️ 配置说明 - -### 复杂程度等级(1-5级) - -| 等级 | 名称 | 单列模板 | 多列模板 | 多列占比 | 问句前缀 | 适用场景 | -|------|------|----------|----------|----------|----------|----------| -| 1 | 简单模式 | 3 | 0 | 0% | 3 | 快速测试 | -| 2 | 简单+模式 | 6 | 1 | 10% | 3 | 轻量训练 | -| 3 | 普通模式 | 9 | 3 | 30% | 10 | 常规训练 | -| 4 | 复杂模式 | 12 | 4 | 30% | 10 | 深度训练 | -| 5 | 最复杂模式 | 12 | 5 | 50% | 10 | 高质量训练 | - -### 预设配置对比 - -| 配置 | 复杂程度 | 生成时间 | QA数量 | 适用场景 | -|------|----------|----------|--------|----------| -| 简单 | 1级 | ~2分钟 | ~20万 | 快速测试 | -| 普通 | 3级 | ~5分钟 | ~60万 | 常规训练 | -| 复杂 | 5级 | ~15分钟 | ~100万 | 高质量训练 | - -### 自定义配置示例 +编辑 `config.py` 文件可修改以下配置: ```python -from qa_generator import QAGenerator, QAConfig - -# 创建配置 -config = QAConfig() -config.COMPLEXITY_LEVEL = 3 # 设置复杂程度 -config.MULTI_COLUMN_RATIO = 0.4 # 设置多列占比 -config.OUTPUT_DIR = "MyQA_Output" # 设置输出目录 - -# 生成QA -generator = QAGenerator(config) -generator.process_all() +class QAConfig: + INPUT_FILE = "selected.json" # 输入文件名 + INPUT_DIR = "Data_Export_Json" # 输入目录 + OUTPUT_DIR = "Data_QA_Outputs" # 输出目录 + RANDOM_SEED = 42 # 随机种子 + SHUFFLE_OUTPUT = False # 是否打乱输出顺序 ``` ---- +## 输出格式 -## 💡 使用示例 - -### 示例1: 使用简单配置 - -```python -from qa_generator import QAGenerator, SIMPLE_CONFIG - -# 创建生成器 -generator = QAGenerator(SIMPLE_CONFIG) - -# 生成问答对 -generator.process_all() -``` - -### 示例2: 自定义配置 - -```python -from qa_generator import QAGenerator, QAConfig - -# 创建自定义配置 -config = QAConfig() -config.COMPLEXITY_LEVEL = 2 -config.MULTI_COLUMN_RATIO = 0.2 -config.BASIC_QUESTIONS_PER_ITEM = 2 - -# 创建生成器 -generator = QAGenerator(config) -generator.process_all() -``` - -### 示例3: 批量生成不同复杂度的数据 - -```python -from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator - -configs = [ - (SIMPLE_CONFIG, "简单版"), - (NORMAL_CONFIG, "普通版"), - (COMPLEX_CONFIG, "复杂版") -] - -for config, name in configs: - print(f"\n正在生成{name}问答对...") - config.OUTPUT_DIR = f"Data_QA_Outputs_{name}" - generator = QAGenerator(config) - generator.process_all() -``` - ---- - -## 📊 数据输出 - -### 输出文件结构 - -每个JSON文件包含多个问答对,格式如下: +### 训练集/验证集格式 ```json [ { - "instruct": "在元素治理模板中,「投资原因」对应的英文名是什么?", - "input": "", - "output": "根据表记录,该字段的英文名为「investReas」,以上信息为准。" - }, - { - "instruct": "请列举元素治理模板中「投资原因」的值类型和总长度", - "input": "", - "output": "根据表记录,该字段的值类型为「字符」,以及总长度为500.0,望知悉。" + "问题": "请告诉我字段中文名为'客户编号'的字段英文名是什么?", + "回答": "该字段的字段英文名为customer_id。" } ] ``` -### 生成的文件 +### 训练集 vs 验证集区别 -``` -Data_QA_Outputs/ -├── 元素治理模板_QA.json (58MB, 25.8万条QA) -├── 物理模型_QA.json (396MB, 182万条QA) -├── 逻辑模型_QA.json (166MB, 73.5万条QA) -├── train.json (619MB, 282万条QA) ⭐ 训练用 -└── train_stats.json (合并统计) -``` +| 特性 | 训练集 | 验证集 | +|------|--------|--------| +| 问句风格 | "请告诉我"、"查询" | "请问"、"想咨询一下" | +| 答句风格 | "该字段的"、"查询结果:" | "根据查询,"、"经核实," | +| 语义 | 相同 | 相同 | +| 表达 | 基础正式 | 正式但有差异 | -### 问答类型 +## 完整工作流程示例 -#### 单列查询 -- 问"字段A"的值类型 -- 问"字段A"的长度 -- 问"字段A"的英文名 -- 问"字段A"属于哪个类别 - -#### 多列查询 -- 请列举"字段A"的值类型和长度 -- 请输出"字段A"的类别、业务领域和是否枚举信息 -- 请查找"字段A"的英文名和说明信息 - -### 生成统计 - -| 配置模式 | 元素治理模板 | 物理模型 | 逻辑模型 | 总计 | -|----------|--------------|----------|----------|------| -| 简单模式 | ~50,000条 | ~100,000条 | ~50,000条 | ~200,000条 | -| 普通模式 | ~150,000条 | ~300,000条 | ~150,000条 | ~600,000条 | -| 复杂模式 | ~250,000条 | ~500,000条 | ~250,000条 | ~1,000,000条 | - -*注:实际数量根据原始数据量而定* - ---- - -## 📊 问答类型说明 - -### 单列查询示例 - -**问句模式:** -- "在元素治理模板中,「投资原因」对应的英文名是什么?" -- "查询物理模型中,数据元素「账套代码」的值类型是什么?" -- "「是否叶子节点」这个字段在逻辑模型中属于哪个业务领域?" - -**答句模式:** -- "根据表记录,该字段的英文名为「investReas」,以上信息为准。" -- "查询结果显示,值类型为「字符」,望知悉。" - -### 多列查询示例 - -**问句模式:** -- "请列举元素治理模板中「投资原因」的值类型和总长度" -- "请输出「账套代码」在元素治理模板中的类别、业务领域和是否枚举信息" - -**答句模式:** -- "根据表记录,该字段的值类型为「字符」,以及总长度为500.0,望知悉。" -- "查询得知,该数据元素的类别为「业务」,与业务领域为「供应链-数据同步」,以及是否枚举为「否」,祝您工作顺利。" - ---- - -## 🎯 最佳实践 - -### 场景1: 快速验证功能 ```bash -# 使用简单配置,仅生成基础问答 -python -c "from qa_generator import QAGenerator, SIMPLE_CONFIG; QAGenerator(SIMPLE_CONFIG).process_all()" -``` -- 耗时:约1-2分钟 -- 输出:约20万条问答 -- 用途:功能验证、代码测试 +# 步骤1:数据处理(CSV转JSON + 合并 + 抽取) +python csv2json.py -### 场景2: 常规模型训练 -```bash -# 使用普通配置,平衡质量和效率 -python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; QAGenerator(NORMAL_CONFIG).process_all()" -``` -- 耗时:约5-10分钟 -- 输出:约60万条问答 -- 用途:模型训练、数据集准备 - -### 场景3: 高质量模型训练 -```bash -# 使用复杂配置,生成最丰富的问答 -python -c "from qa_generator import QAGenerator, COMPLEX_CONFIG; QAGenerator(COMPLEX_CONFIG).process_all()" -``` -- 耗时:约15-30分钟 -- 输出:约100万条问答 -- 用途:高质量模型训练 - -### 场景4: 生成多个复杂度版本 -```python -# 同时生成三个版本 -from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator - -for config, name in [(SIMPLE_CONFIG, "Simple"), (NORMAL_CONFIG, "Normal"), (COMPLEX_CONFIG, "Complex")]: - config.OUTPUT_DIR = f"Data_QA_Outputs_{name}" - print(f"正在生成{name}版本...") - generator = QAGenerator(config) - generator.process_all() -``` - ---- - -## ❓ 常见问题 - -### Q1: 如何调整生成的问题数量? -A: 修改配置文件中的 `BASIC_QUESTIONS_PER_ITEM` 和 `MAX_QUESTIONS_PER_ITEM`: -```python -config.BASIC_QUESTIONS_PER_ITEM = 5 # 基础问题数 -config.MAX_QUESTIONS_PER_ITEM = 20 # 最大问题数 -``` - -### Q2: 如何只处理特定的数据表? -A: 修改 `config.py` 中的 `DATA_FILES` 配置: -```python -self.DATA_FILES = [ - { - "name": "元素治理模板", - "file": "元素治理模板.json", - "output": "元素治理模板_QA.json", - "enabled": True # True=处理,False=跳过 - }, - { - "name": "物理模型", - "file": "物理模型.json", - "output": "物理模型_QA.json", - "enabled": False # 跳过物理模型 - } -] -``` - -### Q3: 如何控制输出文件大小? -A: 使用不同复杂程度配置: -- 简单模式:文件较小(约20-50MB) -- 普通模式:文件中等(约100-200MB) -- 复杂模式:文件较大(约300-500MB) - -### Q4: 如何禁用打乱顺序? -A: 设置 `SHUFFLE_OUTPUT = False`: -```python -config.SHUFFLE_OUTPUT = False # 保持原始顺序 -``` - -### Q5: 如何查看生成统计? -A: 查看生成的 `QA生成报告.json` 文件,包含: -- 总问答对数量 -- 各文件问答对数量 -- 配置信息 -- 生成特点 - -### Q6: 如何自定义配置? -```python -from qa_generator import QAGenerator, QAConfig - -config = QAConfig() -config.COMPLEXITY_LEVEL = 3 -config.MULTI_COLUMN_RATIO = 0.4 -config.OUTPUT_DIR = "MyQA_Output" - -generator = QAGenerator(config) -generator.process_all() -``` - -### Q7: 数据来源? -- 基于 `Data_Export_Json/` 目录下的JSON文件 -- 严格忠于原文,未编撰 - -### Q8: 支持多少QA? -- 简单模式:~20万条 -- 普通模式:~60万条 -- 复杂模式:~100万条 - ---- - -## 🔧 高级技巧 - -### 1. 批量生成不同配置 -```python -from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator - -configs = { - "Level1": SIMPLE_CONFIG, - "Level3": NORMAL_CONFIG, - "Level5": COMPLEX_CONFIG -} - -for level, config in configs.items(): - config.OUTPUT_DIR = f"QA_Output_{level}" - print(f"生成 {level} 级别数据...") - generator = QAGenerator(config) - generator.process_all() -``` - -### 2. 自定义问句模板 -修改 `qa_generator.py` 中的模板函数: -```python -def generate_single_qa(self, item: Dict, template_count: int, data_type: str): - # 在这里添加自定义模板 - templates = [] - # 添加你的自定义模板... - return qa_pairs -``` - -### 3. 过滤特定数据 -在生成前过滤数据: -```python -def generate_qa_for_data(self, data: List[Dict], data_type: str) -> List[Dict]: - # 过滤条件示例:只保留业务领域包含"供应链"的数据 - filtered_data = [item for item in data if "供应链" in item.get("业务领域名称", "")] - # 生成QA... -``` - ---- - -## 📝 输出格式说明 - -### JSON格式 -```json -[ - { - "instruct": "问题内容", - "input": "", - "output": "答案内容" - }, - { - "instruct": "问题内容", - "input": "", - "output": "答案内容" - } -] -``` - -### 字段说明 -- `instruct`: 指令/问题内容(必填) -- `input`: 输入内容(通常为空) -- `output`: 输出/答案内容(必填) - ---- - -## 🎉 项目精简 - -### 精简前后对比 - -#### 精简前 -- ❌ 5个生成器文件 (generate_qa.py, generate_qa_v2.py, generate_qa_advanced.py, merge_qa_to_train.py, demo.py) -- ❌ 类太多,方法分散 -- ❌ 功能重复,代码冗余 -- ❌ 使用复杂,需要记多个文件名 - -#### 精简后 -- ✅ **2个核心文件** (qa_generator.py, config.py) -- ✅ **1个主生成器类** (QAGenerator) -- ✅ **所有功能整合** (生成、合并、报告) -- ✅ **使用简单** (一个命令搞定) - -### 精简效果 - -| 指标 | 精简前 | 精简后 | 改进 | -|------|--------|--------|------| -| 核心文件数 | 5个 | 2个 | ⬇️ 60% | -| 生成器类数 | 2个 | 1个 | ⬇️ 50% | -| 主要方法数 | 分散 | 集中 | ✅ 更好 | -| 使用复杂度 | 高 | 低 | ✅ 更好 | -| 代码行数 | 1000+ | ~800 | ⬇️ 20% | - -### 用户收益 -1. **更简单** - 只需要记一个文件名 -2. **更高效** - 一条命令完成所有操作 -3. **更灵活** - 支持交互式和命令行两种方式 -4. **更完整** - 所有功能都在一个文件中 - ---- - -## 📈 数据质量 - -### ✅ 质量保证 -- **数据完整性**: 100% - 所有JSON字段完整提取 -- **格式标准性**: 100% - 严格遵循 `{"instruct":"", "input":"", "output":""}` 格式 -- **内容真实性**: 100% - 忠于原文,未编撰 -- **随机性**: ✅ - 问答对顺序随机打乱 -- **多样性**: 10+种问句模板,10+种答句修饰 -- **可重现性**: 随机种子固定,结果可重现 - -### ✅ 验证结果 -```python -# 验证代码 -import json - -with open('Data_QA_Outputs/train.json', 'r', encoding='utf-8') as f: - train_data = json.load(f) - -print(f"总问答对数量: {len(train_data):,}") -print(f"数据类型: {type(train_data)}") -print(f"字段完整性: {all('instruct' in item and 'output' in item for item in train_data)}") -``` - -**验证结果**: ✅ 全部通过 -- 总问答对数量: 2,820,519 -- 数据类型: list -- 字段完整性: True - -### 生成统计 -- **总问答对**: 2,820,519 条 -- **元素治理**: 258,579 条 (9.2%) -- **物理模型**: 1,826,268 条 (64.8%) -- **逻辑模型**: 735,672 条 (26.1%) -- **文件大小**: 619 MB - ---- - -## ⚠️ 注意事项 - -### 数据要求 -1. **输入JSON格式**:必须是包含字典列表的JSON文件 -2. **字段完整性**:确保JSON中包含必要的字段(如"表名"、"数据元素中文名"等) -3. **编码格式**:使用UTF-8编码保存JSON文件 - -### 性能优化 -1. **大文件处理**:对于大量数据的JSON文件,建议分批处理 -2. **内存使用**:生成的QA文件较大,注意磁盘空间 -3. **运行时间**:复杂模式下生成时间较长,请耐心等待 - -### 自定义建议 -1. **复杂程度选择**: - - 测试阶段:使用简单模式(复杂程度=1-2) - - 训练阶段:使用普通模式(复杂程度=3) - - 精细调优:使用复杂模式(复杂程度=4-5) - -2. **多列查询比例**: - - 初次使用:0.1-0.2 - - 常规使用:0.3 - - 高质量训练:0.5 - ---- - -## 📞 技术支持 - -### 文档资源 -- 本文档 - 完整项目文档 -- sample_qa.json - 模板参考 - -### 示例代码 -```python -# 基础用法 -from qa_generator import QAGenerator, NORMAL_CONFIG -QAGenerator(NORMAL_CONFIG).process_all() - -# 自定义配置 -from qa_generator import QAConfig, QAGenerator -config = QAConfig() -config.COMPLEXITY_LEVEL = 3 -QAGenerator(config).process_all() -``` - ---- - -## 🎊 项目总结 - -### 成果亮点 -1. ✅ **功能完整** - 从基础版到高级版,满足不同需求 -2. ✅ **配置灵活** - 1-5级复杂程度控制,适应多种场景 -3. ✅ **数据优质** - 280万+条QA,100%忠于原文 -4. ✅ **文档完善** - 整合文档体系,易于理解和使用 -5. ✅ **即用即跑** - 无需安装,直接运行 - -### 技术指标 -- **代码行数**: 800+ 行 -- **文档页数**: 整合版 -- **支持功能**: 100% 完成 -- **测试覆盖**: 100% 通过 - -### 用户价值 -- **节省时间**: 从手动编写到自动生成,效率提升1000倍 -- **保证质量**: 统一格式,规范数据,减少错误 -- **易于使用**: 简单命令即可生成,无需编程知识 -- **高度可配**: 满足从测试到生产的各种需求 - ---- - -**感谢使用QA生成器!** 🎉 - -**立即开始使用:** -```bash +# 步骤2:生成问答对(训练集 + 验证集) python qa_generator.py ``` + +## 注意事项 + +1. 确保源数据文件编码为UTF-8或GBK +2. 字段英文名会自动转换为小写 +3. 模型关联查询仅对字段数≥3的模型生成,避免产生过多低价值问答 +4. 训练集和验证集使用不同的表达模板,但语义完全相同 +5. 随机种子固定为42,保证结果可复现 + +## 许可证 + +MIT License diff --git a/csv2json.py b/csv2json.py new file mode 100644 index 0000000..6277668 --- /dev/null +++ b/csv2json.py @@ -0,0 +1,970 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +数据处理一体化工具 +功能1:Excel/CSV转JSON - 读取Excel/CSV文件并转换为JSON +功能2:JSON合并 - 根据字段英文名匹配逻辑模型表、物理模型表和元素治理模板表的数据 +功能3:随机抽取 - 从合并后的JSON中随机抽取指定数量的记录 +支持多种Excel读取方式,自动处理复杂格式 +""" + +import pandas as pd +import json +import os +import glob +import subprocess +import xlwings as xw +import random +from datetime import datetime +from collections import defaultdict +from typing import Optional, Dict, List, Tuple, Any + + +class ExcelToJsonConverter: + """Excel转JSON转换器""" + + def __init__(self, input_dir: str, output_dir: str): + """ + 初始化转换器 + + Args: + input_dir: Excel文件输入目录 + output_dir: JSON文件输出目录 + """ + self.input_dir = input_dir + self.output_dir = output_dir + + # 确保输出目录存在 + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + # CSV临时目录(仅在Excel模式下使用) + self.temp_csv_dir = None + + def find_excel_files(self) -> List[Tuple[str, str]]: + """扫描目录下的所有Excel文件""" + excel_files = [] + search_pattern = os.path.join(self.input_dir, "*.xlsx") + + for excel_path in glob.glob(search_pattern): + filename = os.path.basename(excel_path) + + # 跳过临时文件(Excel的临时文件以~$开头) + if filename.startswith('~$'): + print(f"[SKIP] 跳过临时文件: {filename}") + continue + + # 生成基础文件名(不含扩展名) + base_name = filename.replace('.xlsx', '') + excel_files.append((excel_path, base_name)) + + return excel_files + + def read_excel_with_xlwings(self, excel_path: str) -> Optional[pd.DataFrame]: + """使用xlwings读取Excel文件""" + try: + print(f" [TRY] 使用xlwings读取...") + app = xw.App(visible=False) + wb = app.books.open(excel_path) + sheet = wb.sheets[0] + + # 读取数据 + data = sheet.range('A1').expand().value + wb.close() + app.quit() + + # 转换为DataFrame + if data and len(data) > 0: + if isinstance(data[0], list): + # 标准表格格式 + headers = data[0] + rows = data[1:] if len(data) > 1 else [] + df = pd.DataFrame(rows, columns=headers) + else: + # 每行只有一个值的特殊格式 + df = pd.DataFrame(data, columns=['内容']) + return df + return None + + except ImportError: + print(f" [WARN] xlwings未安装") + return None + except Exception as e: + print(f" [WARN] xlwings读取失败: {str(e)[:100]}") + return None + + def read_excel_with_libreoffice(self, excel_path: str) -> Optional[pd.DataFrame]: + """使用LibreOffice转换后读取""" + try: + print(f" [TRY] 使用LibreOffice转换...") + # 输出CSV路径 + csv_path = excel_path.replace('.xlsx', '_temp.csv') + + # 使用LibreOffice转换 + cmd = [ + 'libreoffice', + '--headless', + '--convert-to', 'csv', + '--outdir', os.path.dirname(excel_path), + excel_path + ] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + + if os.path.exists(csv_path): + df = pd.read_csv(csv_path, encoding='utf-8') + # 删除临时文件 + os.remove(csv_path) + print(f" [OK] LibreOffice转换成功") + return df + else: + print(f" [WARN] LibreOffice转换失败") + return None + + except FileNotFoundError: + print(f" [WARN] LibreOffice未安装") + return None + except subprocess.TimeoutExpired: + print(f" [WARN] LibreOffice转换超时") + return None + except Exception as e: + print(f" [WARN] LibreOffice转换失败: {e}") + return None + + def read_excel_with_pandas(self, excel_path: str) -> Optional[pd.DataFrame]: + """使用pandas读取Excel文件""" + engines = ['openpyxl', 'xlrd'] + + for engine in engines: + try: + print(f" [TRY] 使用pandas ({engine})读取...") + df = pd.read_excel(excel_path, engine=engine) + print(f" [OK] pandas ({engine}) 读取成功") + return df + except Exception as e: + print(f" [WARN] pandas ({engine}) 失败: {str(e)[:100]}") + continue + + return None + + def read_excel_file(self, excel_path: str) -> Optional[pd.DataFrame]: + """ + 尝试多种方法读取Excel文件 + + Args: + excel_path: Excel文件路径 + + Returns: + DataFrame或None + """ + print(f"\n[INFO] 读取文件: {os.path.basename(excel_path)}") + + # 按优先级尝试读取方法 + methods = [ + ("xlwings", self.read_excel_with_xlwings), + ("pandas-openpyxl", lambda p: self.read_excel_with_pandas(p) if 'openpyxl' in str(p) else None), + ("LibreOffice", self.read_excel_with_libreoffice), + ("pandas-xlrd", self.read_excel_with_pandas), + ] + + for method_name, method_func in methods: + try: + if method_name == "pandas-openpyxl": + # 特殊处理pandas-openpyxl + df = self.read_excel_with_pandas(excel_path) + elif method_name == "pandas-xlrd": + # 跳过,因为上面已经尝试过了 + continue + else: + df = method_func(excel_path) + + if df is not None and not df.empty: + print(f"[OK] {method_name} 成功读取!") + print(f" 数据形状: {df.shape[0]}行 × {df.shape[1]}列") + return df + + except Exception as e: + print(f"[WARN] {method_name} 失败: {str(e)[:100]}") + + print(f"[ERROR] 所有读取方法都失败了") + return None + + def convert_to_csv(self, df: pd.DataFrame, base_name: str) -> str: + """ + 将DataFrame转换为CSV + + Args: + df: 数据框 + base_name: 文件基础名 + + Returns: + CSV文件路径 + """ + # 确保临时CSV目录存在 + if self.temp_csv_dir is None: + self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv") + if not os.path.exists(self.temp_csv_dir): + os.makedirs(self.temp_csv_dir) + + csv_filename = f"{base_name}.csv" + csv_path = os.path.join(self.temp_csv_dir, csv_filename) + + # 保存为CSV,使用utf-8-sig编码支持中文 + df.to_csv(csv_path, index=False, encoding='utf-8-sig') + + file_size = os.path.getsize(csv_path) / 1024 # KB + print(f" [OK] CSV已生成: {csv_filename} ({file_size:.1f} KB)") + + return csv_path + + def convert_csv_to_json(self, csv_path: str, base_name: str) -> str: + """ + 将CSV文件转换为JSON + + Args: + csv_path: CSV文件路径 + base_name: 文件基础名 + + Returns: + JSON文件路径 + """ + try: + # 读取CSV文件 + df = pd.read_csv(csv_path, encoding='utf-8-sig') + + if df.empty: + print(f" [WARN] CSV文件为空") + return "" + + # 转换为JSON列表 + json_data = [] + for index, row in df.iterrows(): + # 创建JSON对象 + json_obj = {} + for column in df.columns: + value = row[column] + + # 处理Na值 + if pd.isna(value): + json_obj[column] = None + else: + # 处理数据值:如果是字符串且包含英文字母,转换为小写 + if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): + # 将数据值中的英文字母转换为小写 + value = value.lower() + + # 将英文字段名转换为小写 + # 检查字段名是否完全是英文字符(包括字母、数字、下划线) + if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): + # 完全是英文字段名,转换为小写 + json_obj[column.lower()] = value + else: + # 包含中文字符的字段名保持不变 + json_obj[column] = value + + # 添加表名字段 + json_obj['表名'] = base_name + + json_data.append(json_obj) + + # 生成JSON文件路径 + json_filename = f"{base_name}.json" + json_path = os.path.join(self.output_dir, json_filename) + + # 保存JSON文件 + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(json_path) / 1024 # KB + print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)") + print(f" 数据量: {len(json_data)} 条记录") + + return json_path + + except Exception as e: + print(f" [ERROR] CSV转JSON失败: {e}") + import traceback + traceback.print_exc() + return "" + + def process_single_file(self, excel_path: str, base_name: str) -> bool: + """ + 处理单个Excel文件:Excel -> CSV -> JSON + + Args: + excel_path: Excel文件路径 + base_name: 文件基础名 + + Returns: + 是否成功 + """ + print(f"\n{'='*60}") + print(f"处理: {os.path.basename(excel_path)}") + print(f"{'='*60}") + + # 步骤1: 读取Excel + df = self.read_excel_file(excel_path) + if df is None: + print(f"[ERROR] 读取失败,跳过此文件") + return False + + # 显示数据预览 + print(f"\n[INFO] 数据预览:") + print(df.head(3)) + + # 步骤2: 转换为CSV + csv_path = self.convert_to_csv(df, base_name) + + # 步骤3: 转换为JSON + json_path = self.convert_csv_to_json(csv_path, base_name) + + if json_path: + print(f"\n[OK] 转换完成!") + return True + else: + print(f"\n[ERROR] 转换失败") + return False + + def process_all(self) -> Dict: + """ + 处理所有Excel文件 + + Returns: + 处理结果统计 + """ + print("="*60) + print("Excel转JSON一体化工具") + print("="*60) + print(f"输入目录: {self.input_dir}") + print(f"输出目录: {self.output_dir}") + + # 查找Excel文件 + excel_files = self.find_excel_files() + + if not excel_files: + print(f"\n[WARN] 未找到任何Excel文件") + return {'total': 0, 'success': 0, 'failed': 0} + + print(f"\n[INFO] 发现 {len(excel_files)} 个Excel文件") + + # 处理每个文件 + success_count = 0 + failed_count = 0 + results = [] + + for excel_path, base_name in excel_files: + if self.process_single_file(excel_path, base_name): + success_count += 1 + results.append({'file': os.path.basename(excel_path), 'status': 'success'}) + else: + failed_count += 1 + results.append({'file': os.path.basename(excel_path), 'status': 'failed'}) + + # 输出统计信息 + print(f"\n{'='*60}") + print("转换完成!") + print(f"{'='*60}") + print(f"总计: {len(excel_files)} 个文件") + print(f"成功: {success_count} 个文件") + print(f"失败: {failed_count} 个文件") + + # 显示生成的JSON文件 + if success_count > 0: + print(f"\n生成的JSON文件:") + json_files = glob.glob(os.path.join(self.output_dir, "*.json")) + for json_file in sorted(json_files): + file_size = os.path.getsize(json_file) / 1024 # KB + filename = os.path.basename(json_file) + print(f" - {filename} ({file_size:.1f} KB)") + + return { + 'total': len(excel_files), + 'success': success_count, + 'failed': failed_count, + 'results': results + } + + def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]: + """扫描目录下的所有CSV文件""" + csv_files = [] + search_pattern = os.path.join(csv_dir, "*.csv") + + for csv_path in glob.glob(search_pattern): + filename = os.path.basename(csv_path) + # 生成基础文件名(不含扩展名) + base_name = filename.replace('.csv', '') + csv_files.append((csv_path, base_name)) + + return csv_files + + def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str: + """ + 直接将CSV文件转换为JSON(不生成临时CSV) + 这个方法直接从CSV读取并转换为JSON + + Args: + csv_path: CSV文件路径 + base_name: 文件基础名 + + Returns: + JSON文件路径 + """ + try: + # 尝试多种编码读取CSV文件 + encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] + df = None + + for encoding in encodings: + try: + print(f" [TRY] 尝试编码: {encoding}") + df = pd.read_csv(csv_path, encoding=encoding) + print(f" [OK] 编码 {encoding} 读取成功") + break + except (UnicodeDecodeError, UnicodeError): + print(f" [WARN] 编码 {encoding} 失败") + continue + except Exception as e: + print(f" [WARN] 编码 {encoding} 其他错误: {str(e)[:50]}") + continue + + if df is None: + print(f" [ERROR] 所有编码都失败,无法读取CSV文件") + return "" + + if df.empty: + print(f" [WARN] CSV文件为空") + return "" + + # 转换为JSON列表 + json_data = [] + for index, row in df.iterrows(): + # 创建JSON对象 + json_obj = {} + for column in df.columns: + value = row[column] + + # 处理Na值 + if pd.isna(value): + json_obj[column] = None + else: + # 处理数据值:如果是字符串且包含英文字母,转换为小写 + if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): + # 将数据值中的英文字母转换为小写 + value = value.lower() + + # 将英文字段名转换为小写 + # 检查字段名是否完全是英文字符(包括字母、数字、下划线) + if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): + # 完全是英文字段名,转换为小写 + json_obj[column.lower()] = value + else: + # 包含中文字符的字段名保持不变 + json_obj[column] = value + + # 添加表名字段 + json_obj['表名'] = base_name + + json_data.append(json_obj) + + # 生成JSON文件路径 + json_filename = f"{base_name}.json" + json_path = os.path.join(self.output_dir, json_filename) + + # 保存JSON文件 + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(json_path) / 1024 # KB + print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)") + print(f" 数据量: {len(json_data)} 条记录") + + return json_path + + except Exception as e: + print(f" [ERROR] CSV转JSON失败: {e}") + import traceback + traceback.print_exc() + return "" + + def process_single_csv(self, csv_path: str, base_name: str) -> bool: + """ + 处理单个CSV文件:CSV → JSON + + Args: + csv_path: CSV文件路径 + base_name: 文件基础名 + + Returns: + 是否成功 + """ + print(f"\n{'='*60}") + print(f"处理: {os.path.basename(csv_path)}") + print(f"{'='*60}") + + # 步骤1: 读取CSV文件并预览 + try: + # 尝试多种编码读取CSV文件 + encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] + df = None + + for encoding in encodings: + try: + df = pd.read_csv(csv_path, encoding=encoding) + break + except (UnicodeDecodeError, UnicodeError): + continue + except Exception as e: + print(f"[ERROR] 编码 {encoding} 错误: {e}") + continue + + if df is None or df.empty: + print(f"[ERROR] CSV文件为空或读取失败") + return False + + print(f"\n[INFO] 数据预览:") + print(df.head(3)) + print(f"\n[INFO] 数据形状: {df.shape[0]}行 × {df.shape[1]}列") + + except Exception as e: + print(f"[ERROR] 读取CSV失败: {e}") + return False + + # 步骤2: 转换为JSON + json_path = self.convert_csv_to_json_direct(csv_path, base_name) + + if json_path: + print(f"\n[OK] 转换完成!") + return True + else: + print(f"\n[ERROR] 转换失败") + return False + + def convert_csv_directory(self, csv_dir: str) -> Dict: + """ + 处理CSV目录下的所有CSV文件 + + Args: + csv_dir: CSV文件目录 + + Returns: + 处理结果统计 + """ + print("="*60) + print("CSV转JSON工具") + print("="*60) + print(f"CSV输入目录: {csv_dir}") + print(f"JSON输出目录: {self.output_dir}") + + # 查找CSV文件 + csv_files = self.find_csv_files(csv_dir) + + if not csv_files: + print(f"\n[WARN] 未找到任何CSV文件") + return {'total': 0, 'success': 0, 'failed': 0} + + print(f"\n[INFO] 发现 {len(csv_files)} 个CSV文件") + + # 处理每个文件 + success_count = 0 + failed_count = 0 + results = [] + + for csv_path, base_name in csv_files: + if self.process_single_csv(csv_path, base_name): + success_count += 1 + results.append({'file': os.path.basename(csv_path), 'status': 'success'}) + else: + failed_count += 1 + results.append({'file': os.path.basename(csv_path), 'status': 'failed'}) + + # 输出统计信息 + print(f"\n{'='*60}") + print("转换完成!") + print(f"{'='*60}") + print(f"总计: {len(csv_files)} 个文件") + print(f"成功: {success_count} 个文件") + print(f"失败: {failed_count} 个文件") + + # 显示生成的JSON文件 + if success_count > 0: + print(f"\n生成的JSON文件:") + json_files = glob.glob(os.path.join(self.output_dir, "*.json")) + for json_file in sorted(json_files): + file_size = os.path.getsize(json_file) / 1024 # KB + filename = os.path.basename(json_file) + print(f" - {filename} ({file_size:.1f} KB)") + + return { + 'total': len(csv_files), + 'success': success_count, + 'failed': failed_count, + 'results': results + } + + +class JsonMerger: + """JSON文件合并器""" + + def __init__(self, output_dir: str): + self.output_dir = output_dir + + def load_json_file(self, file_path: str) -> List[Dict[str, Any]]: + """加载JSON文件""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录") + return data + except Exception as e: + print(f"[ERROR] 加载文件失败 {file_path}: {e}") + return [] + + def build_index(self, records: List[Dict], field_name: str) -> Dict[str, List[Dict]]: + """为记录列表建立索引,加速查找""" + index = defaultdict(list) + for record in records: + field_value = record.get(field_name) + if field_value: + index[field_value].append(record) + print(f"[INFO] 建立索引完成: {len(index)} 个唯一字段值") + return index + + def merge_records_optimized(self, logical_index: Dict, physical_index: Dict, element_records: List[Dict]) -> List[Dict]: + """ + 使用索引优化合并三个表的记录 + """ + merged_data = [] + processed_fields = set() + + # 遍历元素治理表 + print(f"\n[INFO] 开始合并数据...") + for i, element_record in enumerate(element_records): + if i % 5000 == 0: + print(f" 处理进度: {i}/{len(element_records)}") + + field_english_name = element_record.get('字段英文名') + if not field_english_name or field_english_name in processed_fields: + continue + + processed_fields.add(field_english_name) + + # 创建合并记录 + merged_record = {} + + # 添加元素治理模板表的数据 + for key, value in element_record.items(): + if key != '表名': + merged_record[key] = value + + # 查找逻辑模型表中的匹配记录 + logical_matches = logical_index.get(field_english_name, []) + + # 查找物理模型表中的匹配记录 + physical_matches = physical_index.get(field_english_name, []) + + # 添加逻辑模型表的数据(添加前缀避免冲突) + if logical_matches: + for logical_match in logical_matches: + for key, value in logical_match.items(): + if key not in ['表名', '字段英文名']: + new_key = f"逻辑模型_{key}" + merged_record[new_key] = value + + # 只有当有匹配数据时才添加表名信息 + merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表' + + # 添加物理模型表的数据(添加前缀避免冲突) + if physical_matches: + for physical_match in physical_matches: + for key, value in physical_match.items(): + if key not in ['表名', '字段英文名']: + new_key = f"物理模型_{key}" + merged_record[new_key] = value + + # 只有当有匹配数据时才添加表名信息 + merged_record['物理模型表_表名'] = '远光数据架构物理模型表' + + # 添加元素治理表名(始终存在) + merged_record['元素治理表_表名'] = '远光数据架构元素治理模板表' + + merged_data.append(merged_record) + + print(f" 完成合并: {len(merged_data)} 条记录") + + return merged_data + + def merge_all(self, logical_file: str, physical_file: str, element_file: str, output_file: str) -> Dict: + """合并所有JSON文件""" + print("="*60) + print("优化版JSON文件合并工具") + print("="*60) + + # 构建文件路径 + logical_json_path = os.path.join(self.output_dir, logical_file) + physical_json_path = os.path.join(self.output_dir, physical_file) + element_json_path = os.path.join(self.output_dir, element_file) + output_path = os.path.join(self.output_dir, output_file) + + # 加载JSON文件 + print("\n[INFO] 加载JSON文件...") + logical_records = self.load_json_file(logical_json_path) + physical_records = self.load_json_file(physical_json_path) + element_records = self.load_json_file(element_json_path) + + if not (logical_records and physical_records and element_records): + print("\n[ERROR] 无法加载所有JSON文件") + return {'success': False, 'merged_count': 0} + + # 建立索引 + print(f"\n[INFO] 建立索引加速查找...") + logical_index = self.build_index(logical_records, '字段英文名') + physical_index = self.build_index(physical_records, '字段英文名') + + # 合并数据(只处理元素治理表中存在的字段) + merged_data = self.merge_records_optimized(logical_index, physical_index, element_records) + + # 保存合并后的数据 + try: + print(f"\n[INFO] 保存合并数据到 {output_path}...") + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(merged_data, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(output_path) / 1024 # KB + print(f"\n[OK] 合并完成!") + print(f" 输出文件: {output_path}") + print(f" 合并记录: {len(merged_data)} 条") + print(f" 文件大小: {file_size:.1f} KB") + + # 显示统计信息 + three_table_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and r.get('物理模型表_表名')) + element_logical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and not r.get('物理模型表_表名')) + element_physical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('物理模型表_表名') and not r.get('逻辑模型表_表名')) + element_only_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and not r.get('逻辑模型表_表名') and not r.get('物理模型表_表名')) + + print(f"\n[INFO] 统计信息:") + print(f" 三表匹配: {three_table_match} 条") + print(f" 元素治理+逻辑模型: {element_logical_match} 条") + print(f" 元素治理+物理模型: {element_physical_match} 条") + print(f" 仅元素治理: {element_only_match} 条") + + # 显示前3条记录的字段名 + if merged_data: + print(f"\n[INFO] 合并记录示例:") + sample_record = merged_data[0] + print(f" 字段数量: {len(sample_record)}") + print(f" 字段名: {list(sample_record.keys())[:10]}...") # 只显示前10个字段 + + return { + 'success': True, + 'merged_count': len(merged_data), + 'output_file': output_path, + 'file_size_kb': file_size, + 'statistics': { + '三表匹配': three_table_match, + '元素治理+逻辑模型': element_logical_match, + '元素治理+物理模型': element_physical_match, + '仅元素治理': element_only_match + } + } + + except Exception as e: + print(f"\n[ERROR] 保存文件失败: {e}") + import traceback + traceback.print_exc() + return {'success': False, 'merged_count': 0} + + +class RandomSelector: + """随机选择器""" + + def __init__(self, output_dir: str, random_seed: int = 42, select_count: int = 3000): + self.output_dir = output_dir + self.random_seed = random_seed + self.select_count = select_count + + def load_json_file(self, file_path: str) -> List[Dict[str, Any]]: + """加载JSON文件""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录") + return data + except Exception as e: + print(f"[ERROR] 加载文件失败 {file_path}: {e}") + return [] + + def random_select(self, records: List[Dict[str, Any]], count: int) -> List[Dict[str, Any]]: + """ + 随机抽取记录 + + Args: + records: 记录列表 + count: 要抽取的数量 + + Returns: + 抽取的记录列表 + """ + # 设置随机种子 + random.seed(self.random_seed) + + # 如果抽取数量大于等于总数,直接返回所有记录 + if count >= len(records): + print(f"[WARN] 抽取数量 ({count}) 大于等于总记录数 ({len(records)}),返回所有记录") + return records + + # 随机抽取 + selected = random.sample(records, count) + print(f"[OK] 从 {len(records)} 条记录中随机抽取 {count} 条") + + return selected + + def select_random(self, input_file: str, output_file: str) -> Dict: + """随机抽取记录""" + print("="*60) + print("随机抽取工具") + print("="*60) + + # 构建文件路径 + input_path = os.path.join(self.output_dir, input_file) + output_path = os.path.join(self.output_dir, output_file) + + print(f"\n[INFO] 配置:") + print(f" 随机种子: {self.random_seed}") + print(f" 抽取数量: {self.select_count}") + + # 检查输入文件是否存在 + if not os.path.exists(input_path): + print(f"\n[ERROR] 输入文件不存在: {input_path}") + return {'success': False, 'selected_count': 0} + + # 加载数据 + print(f"\n[INFO] 加载数据...") + records = self.load_json_file(input_path) + + if not records: + print(f"\n[ERROR] 无法加载数据或数据为空") + return {'success': False, 'selected_count': 0} + + # 随机抽取 + print(f"\n[INFO] 执行随机抽取...") + selected_records = self.random_select(records, self.select_count) + + # 保存结果 + try: + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(selected_records, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(output_path) / 1024 # KB + print(f"\n[OK] 抽取完成!") + print(f" 输出文件: {output_path}") + print(f" 记录数量: {len(selected_records)}") + print(f" 文件大小: {file_size:.1f} KB") + + # 显示前3条记录的字段名 + if selected_records: + print(f"\n[INFO] 抽取记录示例:") + sample = selected_records[0] + print(f" 字段数量: {len(sample)}") + print(f" 字段名: {list(sample.keys())[:10]}...") + + # 显示统计信息 + three_table_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' in r) + element_logical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' not in r) + element_physical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '物理模型表_表名' in r and '逻辑模型表_表名' not in r) + element_only_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' not in r and '物理模型表_表名' not in r) + + print(f"\n[INFO] 抽取记录统计:") + print(f" 三表匹配: {three_table_match} 条") + print(f" 元素治理+逻辑模型: {element_logical_match} 条") + print(f" 元素治理+物理模型: {element_physical_match} 条") + print(f" 仅元素治理: {element_only_match} 条") + + return { + 'success': True, + 'selected_count': len(selected_records), + 'output_file': output_path, + 'file_size_kb': file_size, + 'statistics': { + '三表匹配': three_table_match if selected_records else 0, + '元素治理+逻辑模型': element_logical_match if selected_records else 0, + '元素治理+物理模型': element_physical_match if selected_records else 0, + '仅元素治理': element_only_match if selected_records else 0 + } + } + + except Exception as e: + print(f"\n[ERROR] 保存文件失败: {e}") + import traceback + traceback.print_exc() + return {'success': False, 'selected_count': 0} + + +def main(): + """主函数 - 演示用法""" + # 配置路径 + input_dir = "Data" + csv_input_dir = "Data_Export_CSV" + output_dir = "Data_Export_Json" + + # 创建转换器实例 + converter = ExcelToJsonConverter(input_dir, output_dir) + + # 步骤1: Excel/CSV转JSON + print("\n" + "="*60) + print("步骤1: Excel/CSV转JSON") + print("="*60) + + # 优先使用CSV模式 + if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir): + # CSV模式:使用现有的CSV文件 + print(f"\n[INFO] 检测到CSV文件,使用CSV模式") + print(f" 从 {csv_input_dir} 读取CSV文件") + result = converter.convert_csv_directory(csv_input_dir) + else: + # Excel模式:使用Excel文件(备选方案) + excel_files = converter.find_excel_files() + if excel_files: + print(f"\n[INFO] 未找到CSV文件,使用Excel模式") + print(f" 从 {input_dir} 读取Excel文件") + result = converter.process_all() + else: + print(f"\n[WARN] 未找到CSV文件和Excel文件") + result = {'total': 0, 'success': 0, 'failed': 0} + + print(f"\n[INFO] 转换结果: {result}") + + # 步骤2: 合并JSON文件 + print("\n" + "="*60) + print("步骤2: JSON合并") + print("="*60) + + merger = JsonMerger(output_dir) + merge_result = merger.merge_all( + logical_file="远光数据架构逻辑模型表.json", + physical_file="远光数据架构物理模型表.json", + element_file="远光数据架构元素治理模板表.json", + output_file="final.json" + ) + + # 步骤3: 随机抽取 + print("\n" + "="*60) + print("步骤3: 随机抽取") + print("="*60) + + selector = RandomSelector(output_dir, random_seed=42, select_count=3000) + select_result = selector.select_random( + input_file="final.json", + output_file="selected.json" + ) + + # 最终结果 + print("\n" + "="*60) + print("处理完成!") + print("="*60) + print(f"Excel/CSV转JSON: {result}") + print(f"JSON合并: {merge_result}") + print(f"随机抽取: {select_result}") + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b697848 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pandas>=1.3.0 +xlwings>=0.24.0 +openpyxl>=3.0.0