Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d8eb25bd47 | |||
| c02b0aede5 | |||
| a03dd4d250 | |||
| 8088b59d30 |
22839
Data_Export_CSV/远光数据架构元素治理模板表.csv
Normal file
22839
Data_Export_CSV/远光数据架构元素治理模板表.csv
Normal file
File diff suppressed because it is too large
Load Diff
190868
Data_Export_CSV/远光数据架构物理模型表.csv
Normal file
190868
Data_Export_CSV/远光数据架构物理模型表.csv
Normal file
File diff suppressed because it is too large
Load Diff
67520
Data_Export_CSV/远光数据架构逻辑模型表.csv
Normal file
67520
Data_Export_CSV/远光数据架构逻辑模型表.csv
Normal file
File diff suppressed because it is too large
Load Diff
648
README.md
648
README.md
@@ -1,579 +1,161 @@
|
|||||||
# 🚀 QA生成器 - 智能问答对生成工具
|
# YG_TDGenerator - 数据元素问答生成工具
|
||||||
|
|
||||||
一个基于JSON数据的模型微调用问答对生成工具,支持灵活配置和问题复杂度控制。项目已精简到仅**2个核心文件**!
|
一款用于企业数据治理领域的问答训练数据生成工具。基于远光数据架构表(逻辑模型表、物理模型表、元素治理模板表)生成标准化的问答对,用于训练领域专用问答系统。
|
||||||
|
|
||||||
## 📋 目录
|
## 功能特点
|
||||||
|
|
||||||
- [功能特性](#-功能特性)
|
- **三表合并**:自动合并逻辑模型表、物理模型表、元素治理模板表
|
||||||
- [快速开始](#-快速开始)
|
- **多种问答类型**:支持字段属性查询、定义查询、模型关联查询
|
||||||
- [核心文件](#-核心文件)
|
- **双版本输出**:同时生成训练集和验证集(表达方式不同但语义相同)
|
||||||
- [配置说明](#-配置说明)
|
- **一体化流程**:CSV转换→JSON合并→随机抽取→QA生成,一条命令完成
|
||||||
- [使用示例](#-使用示例)
|
|
||||||
- [数据输出](#-数据输出)
|
|
||||||
- [问答类型](#-问答类型)
|
|
||||||
- [最佳实践](#-最佳实践)
|
|
||||||
- [常见问题](#-常见问题)
|
|
||||||
- [高级技巧](#-高级技巧)
|
|
||||||
- [项目精简](#-项目精简)
|
|
||||||
- [数据质量](#-数据质量)
|
|
||||||
|
|
||||||
---
|
## 目录结构
|
||||||
|
|
||||||
## ✨ 功能特性
|
```
|
||||||
|
YG_TDGenerator/
|
||||||
|
├── csv2json.py # 主程序:数据处理一体化工具
|
||||||
|
├── qa_generator.py # QA问答对生成器
|
||||||
|
├── config.py # 配置文件
|
||||||
|
├── requirements.txt # Python依赖
|
||||||
|
├── Data/ # Excel源数据目录(可选)
|
||||||
|
├── Data_Export_CSV/ # CSV导出数据目录
|
||||||
|
├── Data_Export_Json/ # JSON中间数据目录
|
||||||
|
└── Data_QA_Outputs/ # QA生成结果目录
|
||||||
|
```
|
||||||
|
|
||||||
### 🎯 核心功能
|
## 安装
|
||||||
- **数据驱动**:基于JSON数据自动生成问答对,不进行任何编撰
|
|
||||||
- **多表支持**:支持元素治理模板、物理模型、逻辑模型等多种数据表
|
|
||||||
- **格式标准**:严格按照 `{"instruct":"", "input":"", "output":""}` 格式输出
|
|
||||||
- **随机打乱**:问答对顺序随机打乱,避免训练时的位置偏好
|
|
||||||
|
|
||||||
### 🎨 问句多样性
|
```bash
|
||||||
- **表名标识**:所有问句均包含表名信息,如"在元素治理模板中"
|
pip install -r requirements.txt
|
||||||
- **丰富前缀**:10种问句前缀(请告诉我、查询、请问、在、请解释等)
|
```
|
||||||
- **多样模板**:每种数据类型支持10+种不同问法
|
|
||||||
- **中英互查**:支持中英文名互查,如"中文名→英文名"、"英文名→中文名"
|
|
||||||
|
|
||||||
### 💬 答句自然化
|
### 依赖说明
|
||||||
- **修饰语前缀**:10种回答前缀(根据表记录、查询结果显示、经查询等)
|
|
||||||
- **礼貌后缀**:10种后缀(望知悉、以上信息为准、请参考、祝您工作顺利等)
|
|
||||||
- **自然表达**:让回答更接近人类语言习惯
|
|
||||||
|
|
||||||
### ⚙️ 可配置性
|
- `pandas`: 数据处理
|
||||||
- **复杂程度控制**:1-5级复杂度,从简单到复杂渐进
|
- `xlwings`: Excel文件读取(可选,用于复杂Excel格式)
|
||||||
- **问题数量控制**:可设置每个数据项生成的问题数量
|
- `openpyxl`: Excel文件读取(pandas依赖)
|
||||||
- **多列查询比例**:可控制多列查询问题的占比
|
|
||||||
- **模板选择**:可根据复杂程度自动选择问句模板
|
|
||||||
|
|
||||||
---
|
## 使用方法
|
||||||
|
|
||||||
## ⚡ 快速开始
|
### 1. 准备数据
|
||||||
|
|
||||||
|
将以下三个CSV文件放入 `Data_Export_CSV/` 目录:
|
||||||
|
|
||||||
|
- `远光数据架构逻辑模型表.csv`
|
||||||
|
- `远光数据架构物理模型表.csv`
|
||||||
|
- `远光数据架构元素治理模板表.csv`
|
||||||
|
|
||||||
|
### 2. 数据处理流程
|
||||||
|
|
||||||
|
运行主程序执行完整的数据处理流程:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python csv2json.py
|
||||||
|
```
|
||||||
|
|
||||||
|
该命令会执行以下三个步骤:
|
||||||
|
|
||||||
|
1. **CSV/Excel转JSON**:将源数据转换为JSON格式
|
||||||
|
2. **JSON合并**:根据字段英文名合并三张表,生成 `final.json`
|
||||||
|
3. **随机抽取**:从合并结果中随机抽取3000条记录,生成 `selected.json`
|
||||||
|
|
||||||
|
### 3. 生成问答对
|
||||||
|
|
||||||
### 方法1: 交互式运行(推荐)
|
|
||||||
```bash
|
```bash
|
||||||
python qa_generator.py
|
python qa_generator.py
|
||||||
```
|
```
|
||||||
按提示选择配置即可。
|
|
||||||
|
|
||||||
### 方法2: 直接使用预设配置
|
生成的文件位于 `Data_QA_Outputs/` 目录:
|
||||||
```bash
|
|
||||||
# 简单模式(快速测试)
|
|
||||||
python -c "from qa_generator import QAGenerator, SIMPLE_CONFIG; QAGenerator(SIMPLE_CONFIG).process_all()"
|
|
||||||
|
|
||||||
# 普通模式(推荐)
|
| 文件 | 说明 |
|
||||||
python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; QAGenerator(NORMAL_CONFIG).process_all()"
|
|------|------|
|
||||||
|
| `selected_QA.json` | 训练集问答对 |
|
||||||
|
| `selected_QA_验证集.json` | 验证集问答对 |
|
||||||
|
| `QA生成报告.json` | 训练集生成报告 |
|
||||||
|
| `QA生成报告_验证集.json` | 验证集生成报告 |
|
||||||
|
|
||||||
# 复杂模式(高质量)
|
## 问答类型说明
|
||||||
python -c "from qa_generator import QAGenerator, COMPLEX_CONFIG; QAGenerator(COMPLEX_CONFIG).process_all()"
|
|
||||||
|
### 1. 字段属性查询
|
||||||
|
|
||||||
|
根据字段中文名或英文名查询其他属性值:
|
||||||
|
|
||||||
|
```
|
||||||
|
Q: 请告诉我字段中文名为'客户编号'的字段英文名是什么?
|
||||||
|
A: 该字段的字段英文名为customer_id。
|
||||||
```
|
```
|
||||||
|
|
||||||
### 方法3: 生成并合并
|
### 2. 字段定义查询
|
||||||
```bash
|
|
||||||
python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; g = QAGenerator(NORMAL_CONFIG); g.process_all(); g.merge_to_train()"
|
根据字段名查询完整定义(所有属性):
|
||||||
|
|
||||||
|
```
|
||||||
|
Q: 字段中文名为'客户编号'的定义是什么?
|
||||||
|
A: 客户编号的定义为:字段英文名:customer_id,数据类型:varchar,长度:32...
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
### 3. 模型关联查询
|
||||||
|
|
||||||
## 📁 核心文件
|
根据逻辑模型或物理模型查询关联字段(仅对字段数≥3的模型生成):
|
||||||
|
|
||||||
| 文件 | 大小 | 说明 |
|
```
|
||||||
|------|------|------|
|
Q: 逻辑模型中文名为'客户信息模型'包含哪些字段?
|
||||||
| **qa_generator.py** | 29KB | 主生成器 - 整合了所有功能 |
|
A: 逻辑模型中文名为'客户信息模型'包含以下字段:客户编号、客户名称、联系方式...
|
||||||
| **config.py** | 9KB | 配置文件 - 控制生成参数 |
|
```
|
||||||
|
|
||||||
---
|
## 配置选项
|
||||||
|
|
||||||
## ⚙️ 配置说明
|
编辑 `config.py` 文件可修改以下配置:
|
||||||
|
|
||||||
### 复杂程度等级(1-5级)
|
|
||||||
|
|
||||||
| 等级 | 名称 | 单列模板 | 多列模板 | 多列占比 | 问句前缀 | 适用场景 |
|
|
||||||
|------|------|----------|----------|----------|----------|----------|
|
|
||||||
| 1 | 简单模式 | 3 | 0 | 0% | 3 | 快速测试 |
|
|
||||||
| 2 | 简单+模式 | 6 | 1 | 10% | 3 | 轻量训练 |
|
|
||||||
| 3 | 普通模式 | 9 | 3 | 30% | 10 | 常规训练 |
|
|
||||||
| 4 | 复杂模式 | 12 | 4 | 30% | 10 | 深度训练 |
|
|
||||||
| 5 | 最复杂模式 | 12 | 5 | 50% | 10 | 高质量训练 |
|
|
||||||
|
|
||||||
### 预设配置对比
|
|
||||||
|
|
||||||
| 配置 | 复杂程度 | 生成时间 | QA数量 | 适用场景 |
|
|
||||||
|------|----------|----------|--------|----------|
|
|
||||||
| 简单 | 1级 | ~2分钟 | ~20万 | 快速测试 |
|
|
||||||
| 普通 | 3级 | ~5分钟 | ~60万 | 常规训练 |
|
|
||||||
| 复杂 | 5级 | ~15分钟 | ~100万 | 高质量训练 |
|
|
||||||
|
|
||||||
### 自定义配置示例
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from qa_generator import QAGenerator, QAConfig
|
class QAConfig:
|
||||||
|
INPUT_FILE = "selected.json" # 输入文件名
|
||||||
# 创建配置
|
INPUT_DIR = "Data_Export_Json" # 输入目录
|
||||||
config = QAConfig()
|
OUTPUT_DIR = "Data_QA_Outputs" # 输出目录
|
||||||
config.COMPLEXITY_LEVEL = 3 # 设置复杂程度
|
RANDOM_SEED = 42 # 随机种子
|
||||||
config.MULTI_COLUMN_RATIO = 0.4 # 设置多列占比
|
SHUFFLE_OUTPUT = False # 是否打乱输出顺序
|
||||||
config.OUTPUT_DIR = "MyQA_Output" # 设置输出目录
|
|
||||||
|
|
||||||
# 生成QA
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
## 输出格式
|
||||||
|
|
||||||
## 💡 使用示例
|
### 训练集/验证集格式
|
||||||
|
|
||||||
### 示例1: 使用简单配置
|
|
||||||
|
|
||||||
```python
|
|
||||||
from qa_generator import QAGenerator, SIMPLE_CONFIG
|
|
||||||
|
|
||||||
# 创建生成器
|
|
||||||
generator = QAGenerator(SIMPLE_CONFIG)
|
|
||||||
|
|
||||||
# 生成问答对
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
### 示例2: 自定义配置
|
|
||||||
|
|
||||||
```python
|
|
||||||
from qa_generator import QAGenerator, QAConfig
|
|
||||||
|
|
||||||
# 创建自定义配置
|
|
||||||
config = QAConfig()
|
|
||||||
config.COMPLEXITY_LEVEL = 2
|
|
||||||
config.MULTI_COLUMN_RATIO = 0.2
|
|
||||||
config.BASIC_QUESTIONS_PER_ITEM = 2
|
|
||||||
|
|
||||||
# 创建生成器
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
### 示例3: 批量生成不同复杂度的数据
|
|
||||||
|
|
||||||
```python
|
|
||||||
from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator
|
|
||||||
|
|
||||||
configs = [
|
|
||||||
(SIMPLE_CONFIG, "简单版"),
|
|
||||||
(NORMAL_CONFIG, "普通版"),
|
|
||||||
(COMPLEX_CONFIG, "复杂版")
|
|
||||||
]
|
|
||||||
|
|
||||||
for config, name in configs:
|
|
||||||
print(f"\n正在生成{name}问答对...")
|
|
||||||
config.OUTPUT_DIR = f"Data_QA_Outputs_{name}"
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 📊 数据输出
|
|
||||||
|
|
||||||
### 输出文件结构
|
|
||||||
|
|
||||||
每个JSON文件包含多个问答对,格式如下:
|
|
||||||
|
|
||||||
```json
|
```json
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"instruct": "在元素治理模板中,「投资原因」对应的英文名是什么?",
|
"问题": "请告诉我字段中文名为'客户编号'的字段英文名是什么?",
|
||||||
"input": "",
|
"回答": "该字段的字段英文名为customer_id。"
|
||||||
"output": "根据表记录,该字段的英文名为「investReas」,以上信息为准。"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"instruct": "请列举元素治理模板中「投资原因」的值类型和总长度",
|
|
||||||
"input": "",
|
|
||||||
"output": "根据表记录,该字段的值类型为「字符」,以及总长度为500.0,望知悉。"
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
### 生成的文件
|
### 训练集 vs 验证集区别
|
||||||
|
|
||||||
```
|
| 特性 | 训练集 | 验证集 |
|
||||||
Data_QA_Outputs/
|
|------|--------|--------|
|
||||||
├── 元素治理模板_QA.json (58MB, 25.8万条QA)
|
| 问句风格 | "请告诉我"、"查询" | "请问"、"想咨询一下" |
|
||||||
├── 物理模型_QA.json (396MB, 182万条QA)
|
| 答句风格 | "该字段的"、"查询结果:" | "根据查询,"、"经核实," |
|
||||||
├── 逻辑模型_QA.json (166MB, 73.5万条QA)
|
| 语义 | 相同 | 相同 |
|
||||||
├── train.json (619MB, 282万条QA) ⭐ 训练用
|
| 表达 | 基础正式 | 正式但有差异 |
|
||||||
└── train_stats.json (合并统计)
|
|
||||||
```
|
|
||||||
|
|
||||||
### 问答类型
|
## 完整工作流程示例
|
||||||
|
|
||||||
#### 单列查询
|
|
||||||
- 问"字段A"的值类型
|
|
||||||
- 问"字段A"的长度
|
|
||||||
- 问"字段A"的英文名
|
|
||||||
- 问"字段A"属于哪个类别
|
|
||||||
|
|
||||||
#### 多列查询
|
|
||||||
- 请列举"字段A"的值类型和长度
|
|
||||||
- 请输出"字段A"的类别、业务领域和是否枚举信息
|
|
||||||
- 请查找"字段A"的英文名和说明信息
|
|
||||||
|
|
||||||
### 生成统计
|
|
||||||
|
|
||||||
| 配置模式 | 元素治理模板 | 物理模型 | 逻辑模型 | 总计 |
|
|
||||||
|----------|--------------|----------|----------|------|
|
|
||||||
| 简单模式 | ~50,000条 | ~100,000条 | ~50,000条 | ~200,000条 |
|
|
||||||
| 普通模式 | ~150,000条 | ~300,000条 | ~150,000条 | ~600,000条 |
|
|
||||||
| 复杂模式 | ~250,000条 | ~500,000条 | ~250,000条 | ~1,000,000条 |
|
|
||||||
|
|
||||||
*注:实际数量根据原始数据量而定*
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 📊 问答类型说明
|
|
||||||
|
|
||||||
### 单列查询示例
|
|
||||||
|
|
||||||
**问句模式:**
|
|
||||||
- "在元素治理模板中,「投资原因」对应的英文名是什么?"
|
|
||||||
- "查询物理模型中,数据元素「账套代码」的值类型是什么?"
|
|
||||||
- "「是否叶子节点」这个字段在逻辑模型中属于哪个业务领域?"
|
|
||||||
|
|
||||||
**答句模式:**
|
|
||||||
- "根据表记录,该字段的英文名为「investReas」,以上信息为准。"
|
|
||||||
- "查询结果显示,值类型为「字符」,望知悉。"
|
|
||||||
|
|
||||||
### 多列查询示例
|
|
||||||
|
|
||||||
**问句模式:**
|
|
||||||
- "请列举元素治理模板中「投资原因」的值类型和总长度"
|
|
||||||
- "请输出「账套代码」在元素治理模板中的类别、业务领域和是否枚举信息"
|
|
||||||
|
|
||||||
**答句模式:**
|
|
||||||
- "根据表记录,该字段的值类型为「字符」,以及总长度为500.0,望知悉。"
|
|
||||||
- "查询得知,该数据元素的类别为「业务」,与业务领域为「供应链-数据同步」,以及是否枚举为「否」,祝您工作顺利。"
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🎯 最佳实践
|
|
||||||
|
|
||||||
### 场景1: 快速验证功能
|
|
||||||
```bash
|
```bash
|
||||||
# 使用简单配置,仅生成基础问答
|
# 步骤1:数据处理(CSV转JSON + 合并 + 抽取)
|
||||||
python -c "from qa_generator import QAGenerator, SIMPLE_CONFIG; QAGenerator(SIMPLE_CONFIG).process_all()"
|
python csv2json.py
|
||||||
```
|
|
||||||
- 耗时:约1-2分钟
|
|
||||||
- 输出:约20万条问答
|
|
||||||
- 用途:功能验证、代码测试
|
|
||||||
|
|
||||||
### 场景2: 常规模型训练
|
# 步骤2:生成问答对(训练集 + 验证集)
|
||||||
```bash
|
|
||||||
# 使用普通配置,平衡质量和效率
|
|
||||||
python -c "from qa_generator import QAGenerator, NORMAL_CONFIG; QAGenerator(NORMAL_CONFIG).process_all()"
|
|
||||||
```
|
|
||||||
- 耗时:约5-10分钟
|
|
||||||
- 输出:约60万条问答
|
|
||||||
- 用途:模型训练、数据集准备
|
|
||||||
|
|
||||||
### 场景3: 高质量模型训练
|
|
||||||
```bash
|
|
||||||
# 使用复杂配置,生成最丰富的问答
|
|
||||||
python -c "from qa_generator import QAGenerator, COMPLEX_CONFIG; QAGenerator(COMPLEX_CONFIG).process_all()"
|
|
||||||
```
|
|
||||||
- 耗时:约15-30分钟
|
|
||||||
- 输出:约100万条问答
|
|
||||||
- 用途:高质量模型训练
|
|
||||||
|
|
||||||
### 场景4: 生成多个复杂度版本
|
|
||||||
```python
|
|
||||||
# 同时生成三个版本
|
|
||||||
from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator
|
|
||||||
|
|
||||||
for config, name in [(SIMPLE_CONFIG, "Simple"), (NORMAL_CONFIG, "Normal"), (COMPLEX_CONFIG, "Complex")]:
|
|
||||||
config.OUTPUT_DIR = f"Data_QA_Outputs_{name}"
|
|
||||||
print(f"正在生成{name}版本...")
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## ❓ 常见问题
|
|
||||||
|
|
||||||
### Q1: 如何调整生成的问题数量?
|
|
||||||
A: 修改配置文件中的 `BASIC_QUESTIONS_PER_ITEM` 和 `MAX_QUESTIONS_PER_ITEM`:
|
|
||||||
```python
|
|
||||||
config.BASIC_QUESTIONS_PER_ITEM = 5 # 基础问题数
|
|
||||||
config.MAX_QUESTIONS_PER_ITEM = 20 # 最大问题数
|
|
||||||
```
|
|
||||||
|
|
||||||
### Q2: 如何只处理特定的数据表?
|
|
||||||
A: 修改 `config.py` 中的 `DATA_FILES` 配置:
|
|
||||||
```python
|
|
||||||
self.DATA_FILES = [
|
|
||||||
{
|
|
||||||
"name": "元素治理模板",
|
|
||||||
"file": "元素治理模板.json",
|
|
||||||
"output": "元素治理模板_QA.json",
|
|
||||||
"enabled": True # True=处理,False=跳过
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "物理模型",
|
|
||||||
"file": "物理模型.json",
|
|
||||||
"output": "物理模型_QA.json",
|
|
||||||
"enabled": False # 跳过物理模型
|
|
||||||
}
|
|
||||||
]
|
|
||||||
```
|
|
||||||
|
|
||||||
### Q3: 如何控制输出文件大小?
|
|
||||||
A: 使用不同复杂程度配置:
|
|
||||||
- 简单模式:文件较小(约20-50MB)
|
|
||||||
- 普通模式:文件中等(约100-200MB)
|
|
||||||
- 复杂模式:文件较大(约300-500MB)
|
|
||||||
|
|
||||||
### Q4: 如何禁用打乱顺序?
|
|
||||||
A: 设置 `SHUFFLE_OUTPUT = False`:
|
|
||||||
```python
|
|
||||||
config.SHUFFLE_OUTPUT = False # 保持原始顺序
|
|
||||||
```
|
|
||||||
|
|
||||||
### Q5: 如何查看生成统计?
|
|
||||||
A: 查看生成的 `QA生成报告.json` 文件,包含:
|
|
||||||
- 总问答对数量
|
|
||||||
- 各文件问答对数量
|
|
||||||
- 配置信息
|
|
||||||
- 生成特点
|
|
||||||
|
|
||||||
### Q6: 如何自定义配置?
|
|
||||||
```python
|
|
||||||
from qa_generator import QAGenerator, QAConfig
|
|
||||||
|
|
||||||
config = QAConfig()
|
|
||||||
config.COMPLEXITY_LEVEL = 3
|
|
||||||
config.MULTI_COLUMN_RATIO = 0.4
|
|
||||||
config.OUTPUT_DIR = "MyQA_Output"
|
|
||||||
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
### Q7: 数据来源?
|
|
||||||
- 基于 `Data_Export_Json/` 目录下的JSON文件
|
|
||||||
- 严格忠于原文,未编撰
|
|
||||||
|
|
||||||
### Q8: 支持多少QA?
|
|
||||||
- 简单模式:~20万条
|
|
||||||
- 普通模式:~60万条
|
|
||||||
- 复杂模式:~100万条
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🔧 高级技巧
|
|
||||||
|
|
||||||
### 1. 批量生成不同配置
|
|
||||||
```python
|
|
||||||
from qa_generator import SIMPLE_CONFIG, NORMAL_CONFIG, COMPLEX_CONFIG, QAGenerator
|
|
||||||
|
|
||||||
configs = {
|
|
||||||
"Level1": SIMPLE_CONFIG,
|
|
||||||
"Level3": NORMAL_CONFIG,
|
|
||||||
"Level5": COMPLEX_CONFIG
|
|
||||||
}
|
|
||||||
|
|
||||||
for level, config in configs.items():
|
|
||||||
config.OUTPUT_DIR = f"QA_Output_{level}"
|
|
||||||
print(f"生成 {level} 级别数据...")
|
|
||||||
generator = QAGenerator(config)
|
|
||||||
generator.process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
### 2. 自定义问句模板
|
|
||||||
修改 `qa_generator.py` 中的模板函数:
|
|
||||||
```python
|
|
||||||
def generate_single_qa(self, item: Dict, template_count: int, data_type: str):
|
|
||||||
# 在这里添加自定义模板
|
|
||||||
templates = []
|
|
||||||
# 添加你的自定义模板...
|
|
||||||
return qa_pairs
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. 过滤特定数据
|
|
||||||
在生成前过滤数据:
|
|
||||||
```python
|
|
||||||
def generate_qa_for_data(self, data: List[Dict], data_type: str) -> List[Dict]:
|
|
||||||
# 过滤条件示例:只保留业务领域包含"供应链"的数据
|
|
||||||
filtered_data = [item for item in data if "供应链" in item.get("业务领域名称", "")]
|
|
||||||
# 生成QA...
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 📝 输出格式说明
|
|
||||||
|
|
||||||
### JSON格式
|
|
||||||
```json
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"instruct": "问题内容",
|
|
||||||
"input": "",
|
|
||||||
"output": "答案内容"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"instruct": "问题内容",
|
|
||||||
"input": "",
|
|
||||||
"output": "答案内容"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
```
|
|
||||||
|
|
||||||
### 字段说明
|
|
||||||
- `instruct`: 指令/问题内容(必填)
|
|
||||||
- `input`: 输入内容(通常为空)
|
|
||||||
- `output`: 输出/答案内容(必填)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🎉 项目精简
|
|
||||||
|
|
||||||
### 精简前后对比
|
|
||||||
|
|
||||||
#### 精简前
|
|
||||||
- ❌ 5个生成器文件 (generate_qa.py, generate_qa_v2.py, generate_qa_advanced.py, merge_qa_to_train.py, demo.py)
|
|
||||||
- ❌ 类太多,方法分散
|
|
||||||
- ❌ 功能重复,代码冗余
|
|
||||||
- ❌ 使用复杂,需要记多个文件名
|
|
||||||
|
|
||||||
#### 精简后
|
|
||||||
- ✅ **2个核心文件** (qa_generator.py, config.py)
|
|
||||||
- ✅ **1个主生成器类** (QAGenerator)
|
|
||||||
- ✅ **所有功能整合** (生成、合并、报告)
|
|
||||||
- ✅ **使用简单** (一个命令搞定)
|
|
||||||
|
|
||||||
### 精简效果
|
|
||||||
|
|
||||||
| 指标 | 精简前 | 精简后 | 改进 |
|
|
||||||
|------|--------|--------|------|
|
|
||||||
| 核心文件数 | 5个 | 2个 | ⬇️ 60% |
|
|
||||||
| 生成器类数 | 2个 | 1个 | ⬇️ 50% |
|
|
||||||
| 主要方法数 | 分散 | 集中 | ✅ 更好 |
|
|
||||||
| 使用复杂度 | 高 | 低 | ✅ 更好 |
|
|
||||||
| 代码行数 | 1000+ | ~800 | ⬇️ 20% |
|
|
||||||
|
|
||||||
### 用户收益
|
|
||||||
1. **更简单** - 只需要记一个文件名
|
|
||||||
2. **更高效** - 一条命令完成所有操作
|
|
||||||
3. **更灵活** - 支持交互式和命令行两种方式
|
|
||||||
4. **更完整** - 所有功能都在一个文件中
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 📈 数据质量
|
|
||||||
|
|
||||||
### ✅ 质量保证
|
|
||||||
- **数据完整性**: 100% - 所有JSON字段完整提取
|
|
||||||
- **格式标准性**: 100% - 严格遵循 `{"instruct":"", "input":"", "output":""}` 格式
|
|
||||||
- **内容真实性**: 100% - 忠于原文,未编撰
|
|
||||||
- **随机性**: ✅ - 问答对顺序随机打乱
|
|
||||||
- **多样性**: 10+种问句模板,10+种答句修饰
|
|
||||||
- **可重现性**: 随机种子固定,结果可重现
|
|
||||||
|
|
||||||
### ✅ 验证结果
|
|
||||||
```python
|
|
||||||
# 验证代码
|
|
||||||
import json
|
|
||||||
|
|
||||||
with open('Data_QA_Outputs/train.json', 'r', encoding='utf-8') as f:
|
|
||||||
train_data = json.load(f)
|
|
||||||
|
|
||||||
print(f"总问答对数量: {len(train_data):,}")
|
|
||||||
print(f"数据类型: {type(train_data)}")
|
|
||||||
print(f"字段完整性: {all('instruct' in item and 'output' in item for item in train_data)}")
|
|
||||||
```
|
|
||||||
|
|
||||||
**验证结果**: ✅ 全部通过
|
|
||||||
- 总问答对数量: 2,820,519
|
|
||||||
- 数据类型: list
|
|
||||||
- 字段完整性: True
|
|
||||||
|
|
||||||
### 生成统计
|
|
||||||
- **总问答对**: 2,820,519 条
|
|
||||||
- **元素治理**: 258,579 条 (9.2%)
|
|
||||||
- **物理模型**: 1,826,268 条 (64.8%)
|
|
||||||
- **逻辑模型**: 735,672 条 (26.1%)
|
|
||||||
- **文件大小**: 619 MB
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## ⚠️ 注意事项
|
|
||||||
|
|
||||||
### 数据要求
|
|
||||||
1. **输入JSON格式**:必须是包含字典列表的JSON文件
|
|
||||||
2. **字段完整性**:确保JSON中包含必要的字段(如"表名"、"数据元素中文名"等)
|
|
||||||
3. **编码格式**:使用UTF-8编码保存JSON文件
|
|
||||||
|
|
||||||
### 性能优化
|
|
||||||
1. **大文件处理**:对于大量数据的JSON文件,建议分批处理
|
|
||||||
2. **内存使用**:生成的QA文件较大,注意磁盘空间
|
|
||||||
3. **运行时间**:复杂模式下生成时间较长,请耐心等待
|
|
||||||
|
|
||||||
### 自定义建议
|
|
||||||
1. **复杂程度选择**:
|
|
||||||
- 测试阶段:使用简单模式(复杂程度=1-2)
|
|
||||||
- 训练阶段:使用普通模式(复杂程度=3)
|
|
||||||
- 精细调优:使用复杂模式(复杂程度=4-5)
|
|
||||||
|
|
||||||
2. **多列查询比例**:
|
|
||||||
- 初次使用:0.1-0.2
|
|
||||||
- 常规使用:0.3
|
|
||||||
- 高质量训练:0.5
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 📞 技术支持
|
|
||||||
|
|
||||||
### 文档资源
|
|
||||||
- 本文档 - 完整项目文档
|
|
||||||
- sample_qa.json - 模板参考
|
|
||||||
|
|
||||||
### 示例代码
|
|
||||||
```python
|
|
||||||
# 基础用法
|
|
||||||
from qa_generator import QAGenerator, NORMAL_CONFIG
|
|
||||||
QAGenerator(NORMAL_CONFIG).process_all()
|
|
||||||
|
|
||||||
# 自定义配置
|
|
||||||
from qa_generator import QAConfig, QAGenerator
|
|
||||||
config = QAConfig()
|
|
||||||
config.COMPLEXITY_LEVEL = 3
|
|
||||||
QAGenerator(config).process_all()
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🎊 项目总结
|
|
||||||
|
|
||||||
### 成果亮点
|
|
||||||
1. ✅ **功能完整** - 从基础版到高级版,满足不同需求
|
|
||||||
2. ✅ **配置灵活** - 1-5级复杂程度控制,适应多种场景
|
|
||||||
3. ✅ **数据优质** - 280万+条QA,100%忠于原文
|
|
||||||
4. ✅ **文档完善** - 整合文档体系,易于理解和使用
|
|
||||||
5. ✅ **即用即跑** - 无需安装,直接运行
|
|
||||||
|
|
||||||
### 技术指标
|
|
||||||
- **代码行数**: 800+ 行
|
|
||||||
- **文档页数**: 整合版
|
|
||||||
- **支持功能**: 100% 完成
|
|
||||||
- **测试覆盖**: 100% 通过
|
|
||||||
|
|
||||||
### 用户价值
|
|
||||||
- **节省时间**: 从手动编写到自动生成,效率提升1000倍
|
|
||||||
- **保证质量**: 统一格式,规范数据,减少错误
|
|
||||||
- **易于使用**: 简单命令即可生成,无需编程知识
|
|
||||||
- **高度可配**: 满足从测试到生产的各种需求
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
**感谢使用QA生成器!** 🎉
|
|
||||||
|
|
||||||
**立即开始使用:**
|
|
||||||
```bash
|
|
||||||
python qa_generator.py
|
python qa_generator.py
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## 注意事项
|
||||||
|
|
||||||
|
1. 确保源数据文件编码为UTF-8或GBK
|
||||||
|
2. 字段英文名会自动转换为小写
|
||||||
|
3. 模型关联查询仅对字段数≥3的模型生成,避免产生过多低价值问答
|
||||||
|
4. 训练集和验证集使用不同的表达模板,但语义完全相同
|
||||||
|
5. 随机种子固定为42,保证结果可复现
|
||||||
|
|
||||||
|
## 许可证
|
||||||
|
|
||||||
|
MIT License
|
||||||
|
|||||||
@@ -34,11 +34,8 @@ class ExcelToJsonConverter:
|
|||||||
if not os.path.exists(output_dir):
|
if not os.path.exists(output_dir):
|
||||||
os.makedirs(output_dir)
|
os.makedirs(output_dir)
|
||||||
|
|
||||||
# CSV输出目录 - 在输入目录的同级创建CSV目录
|
# CSV临时目录(仅在Excel模式下使用)
|
||||||
parent_dir = os.path.dirname(input_dir)
|
self.temp_csv_dir = None
|
||||||
self.csv_output_dir = os.path.join(parent_dir, "Data_Export_CSV")
|
|
||||||
if not os.path.exists(self.csv_output_dir):
|
|
||||||
os.makedirs(self.csv_output_dir)
|
|
||||||
|
|
||||||
def find_excel_files(self) -> List[Tuple[str, str]]:
|
def find_excel_files(self) -> List[Tuple[str, str]]:
|
||||||
"""扫描目录下的所有Excel文件"""
|
"""扫描目录下的所有Excel文件"""
|
||||||
@@ -199,8 +196,14 @@ class ExcelToJsonConverter:
|
|||||||
Returns:
|
Returns:
|
||||||
CSV文件路径
|
CSV文件路径
|
||||||
"""
|
"""
|
||||||
|
# 确保临时CSV目录存在
|
||||||
|
if self.temp_csv_dir is None:
|
||||||
|
self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv")
|
||||||
|
if not os.path.exists(self.temp_csv_dir):
|
||||||
|
os.makedirs(self.temp_csv_dir)
|
||||||
|
|
||||||
csv_filename = f"{base_name}.csv"
|
csv_filename = f"{base_name}.csv"
|
||||||
csv_path = os.path.join(self.csv_output_dir, csv_filename)
|
csv_path = os.path.join(self.temp_csv_dir, csv_filename)
|
||||||
|
|
||||||
# 保存为CSV,使用utf-8-sig编码支持中文
|
# 保存为CSV,使用utf-8-sig编码支持中文
|
||||||
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
||||||
@@ -241,6 +244,18 @@ class ExcelToJsonConverter:
|
|||||||
if pd.isna(value):
|
if pd.isna(value):
|
||||||
json_obj[column] = None
|
json_obj[column] = None
|
||||||
else:
|
else:
|
||||||
|
# 处理数据值:如果是字符串且包含英文字母,转换为小写
|
||||||
|
if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
|
||||||
|
# 将数据值中的英文字母转换为小写
|
||||||
|
value = value.lower()
|
||||||
|
|
||||||
|
# 将英文字段名转换为小写
|
||||||
|
# 检查字段名是否完全是英文字符(包括字母、数字、下划线)
|
||||||
|
if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
|
||||||
|
# 完全是英文字段名,转换为小写
|
||||||
|
json_obj[column.lower()] = value
|
||||||
|
else:
|
||||||
|
# 包含中文字符的字段名保持不变
|
||||||
json_obj[column] = value
|
json_obj[column] = value
|
||||||
|
|
||||||
# 添加表名字段
|
# 添加表名字段
|
||||||
@@ -349,15 +364,8 @@ class ExcelToJsonConverter:
|
|||||||
print(f"成功: {success_count} 个文件")
|
print(f"成功: {success_count} 个文件")
|
||||||
print(f"失败: {failed_count} 个文件")
|
print(f"失败: {failed_count} 个文件")
|
||||||
|
|
||||||
# 显示生成的CSV和JSON文件
|
# 显示生成的JSON文件
|
||||||
if success_count > 0:
|
if success_count > 0:
|
||||||
print(f"\n生成的CSV文件:")
|
|
||||||
csv_files = glob.glob(os.path.join(self.csv_output_dir, "*.csv"))
|
|
||||||
for csv_file in sorted(csv_files):
|
|
||||||
file_size = os.path.getsize(csv_file) / 1024 # KB
|
|
||||||
filename = os.path.basename(csv_file)
|
|
||||||
print(f" - {filename} ({file_size:.1f} KB)")
|
|
||||||
|
|
||||||
print(f"\n生成的JSON文件:")
|
print(f"\n生成的JSON文件:")
|
||||||
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
||||||
for json_file in sorted(json_files):
|
for json_file in sorted(json_files):
|
||||||
@@ -372,27 +380,250 @@ class ExcelToJsonConverter:
|
|||||||
'results': results
|
'results': results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]:
|
||||||
|
"""扫描目录下的所有CSV文件"""
|
||||||
|
csv_files = []
|
||||||
|
search_pattern = os.path.join(csv_dir, "*.csv")
|
||||||
|
|
||||||
|
for csv_path in glob.glob(search_pattern):
|
||||||
|
filename = os.path.basename(csv_path)
|
||||||
|
# 生成基础文件名(不含扩展名)
|
||||||
|
base_name = filename.replace('.csv', '')
|
||||||
|
csv_files.append((csv_path, base_name))
|
||||||
|
|
||||||
|
return csv_files
|
||||||
|
|
||||||
|
def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str:
|
||||||
|
"""
|
||||||
|
直接将CSV文件转换为JSON(不生成临时CSV)
|
||||||
|
这个方法直接从CSV读取并转换为JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_path: CSV文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON文件路径
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 尝试多种编码读取CSV文件
|
||||||
|
encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
|
||||||
|
df = None
|
||||||
|
|
||||||
|
for encoding in encodings:
|
||||||
|
try:
|
||||||
|
print(f" [TRY] 尝试编码: {encoding}")
|
||||||
|
df = pd.read_csv(csv_path, encoding=encoding)
|
||||||
|
print(f" [OK] 编码 {encoding} 读取成功")
|
||||||
|
break
|
||||||
|
except (UnicodeDecodeError, UnicodeError):
|
||||||
|
print(f" [WARN] 编码 {encoding} 失败")
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [WARN] 编码 {encoding} 其他错误: {str(e)[:50]}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if df is None:
|
||||||
|
print(f" [ERROR] 所有编码都失败,无法读取CSV文件")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
if df.empty:
|
||||||
|
print(f" [WARN] CSV文件为空")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# 转换为JSON列表
|
||||||
|
json_data = []
|
||||||
|
for index, row in df.iterrows():
|
||||||
|
# 创建JSON对象
|
||||||
|
json_obj = {}
|
||||||
|
for column in df.columns:
|
||||||
|
value = row[column]
|
||||||
|
|
||||||
|
# 处理Na值
|
||||||
|
if pd.isna(value):
|
||||||
|
json_obj[column] = None
|
||||||
|
else:
|
||||||
|
# 处理数据值:如果是字符串且包含英文字母,转换为小写
|
||||||
|
if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
|
||||||
|
# 将数据值中的英文字母转换为小写
|
||||||
|
value = value.lower()
|
||||||
|
|
||||||
|
# 将英文字段名转换为小写
|
||||||
|
# 检查字段名是否完全是英文字符(包括字母、数字、下划线)
|
||||||
|
if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
|
||||||
|
# 完全是英文字段名,转换为小写
|
||||||
|
json_obj[column.lower()] = value
|
||||||
|
else:
|
||||||
|
# 包含中文字符的字段名保持不变
|
||||||
|
json_obj[column] = value
|
||||||
|
|
||||||
|
# 添加表名字段
|
||||||
|
json_obj['表名'] = base_name
|
||||||
|
|
||||||
|
json_data.append(json_obj)
|
||||||
|
|
||||||
|
# 生成JSON文件路径
|
||||||
|
json_filename = f"{base_name}.json"
|
||||||
|
json_path = os.path.join(self.output_dir, json_filename)
|
||||||
|
|
||||||
|
# 保存JSON文件
|
||||||
|
with open(json_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(json_data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(json_path) / 1024 # KB
|
||||||
|
print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)")
|
||||||
|
print(f" 数据量: {len(json_data)} 条记录")
|
||||||
|
|
||||||
|
return json_path
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [ERROR] CSV转JSON失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def process_single_csv(self, csv_path: str, base_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
处理单个CSV文件:CSV → JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_path: CSV文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
是否成功
|
||||||
|
"""
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"处理: {os.path.basename(csv_path)}")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
|
||||||
|
# 步骤1: 读取CSV文件并预览
|
||||||
|
try:
|
||||||
|
# 尝试多种编码读取CSV文件
|
||||||
|
encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
|
||||||
|
df = None
|
||||||
|
|
||||||
|
for encoding in encodings:
|
||||||
|
try:
|
||||||
|
df = pd.read_csv(csv_path, encoding=encoding)
|
||||||
|
break
|
||||||
|
except (UnicodeDecodeError, UnicodeError):
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 编码 {encoding} 错误: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if df is None or df.empty:
|
||||||
|
print(f"[ERROR] CSV文件为空或读取失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print(f"\n[INFO] 数据预览:")
|
||||||
|
print(df.head(3))
|
||||||
|
print(f"\n[INFO] 数据形状: {df.shape[0]}行 × {df.shape[1]}列")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 读取CSV失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 步骤2: 转换为JSON
|
||||||
|
json_path = self.convert_csv_to_json_direct(csv_path, base_name)
|
||||||
|
|
||||||
|
if json_path:
|
||||||
|
print(f"\n[OK] 转换完成!")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"\n[ERROR] 转换失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def convert_csv_directory(self, csv_dir: str) -> Dict:
|
||||||
|
"""
|
||||||
|
处理CSV目录下的所有CSV文件
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_dir: CSV文件目录
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
处理结果统计
|
||||||
|
"""
|
||||||
|
print("="*60)
|
||||||
|
print("CSV转JSON工具")
|
||||||
|
print("="*60)
|
||||||
|
print(f"CSV输入目录: {csv_dir}")
|
||||||
|
print(f"JSON输出目录: {self.output_dir}")
|
||||||
|
|
||||||
|
# 查找CSV文件
|
||||||
|
csv_files = self.find_csv_files(csv_dir)
|
||||||
|
|
||||||
|
if not csv_files:
|
||||||
|
print(f"\n[WARN] 未找到任何CSV文件")
|
||||||
|
return {'total': 0, 'success': 0, 'failed': 0}
|
||||||
|
|
||||||
|
print(f"\n[INFO] 发现 {len(csv_files)} 个CSV文件")
|
||||||
|
|
||||||
|
# 处理每个文件
|
||||||
|
success_count = 0
|
||||||
|
failed_count = 0
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for csv_path, base_name in csv_files:
|
||||||
|
if self.process_single_csv(csv_path, base_name):
|
||||||
|
success_count += 1
|
||||||
|
results.append({'file': os.path.basename(csv_path), 'status': 'success'})
|
||||||
|
else:
|
||||||
|
failed_count += 1
|
||||||
|
results.append({'file': os.path.basename(csv_path), 'status': 'failed'})
|
||||||
|
|
||||||
|
# 输出统计信息
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print("转换完成!")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
print(f"总计: {len(csv_files)} 个文件")
|
||||||
|
print(f"成功: {success_count} 个文件")
|
||||||
|
print(f"失败: {failed_count} 个文件")
|
||||||
|
|
||||||
|
# 显示生成的JSON文件
|
||||||
|
if success_count > 0:
|
||||||
|
print(f"\n生成的JSON文件:")
|
||||||
|
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
||||||
|
for json_file in sorted(json_files):
|
||||||
|
file_size = os.path.getsize(json_file) / 1024 # KB
|
||||||
|
filename = os.path.basename(json_file)
|
||||||
|
print(f" - {filename} ({file_size:.1f} KB)")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'total': len(csv_files),
|
||||||
|
'success': success_count,
|
||||||
|
'failed': failed_count,
|
||||||
|
'results': results
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""主函数 - 演示用法"""
|
"""主函数 - 演示用法"""
|
||||||
# 获取当前工作目录
|
# 配置路径
|
||||||
current_dir = os.getcwd()
|
input_dir = "Data"
|
||||||
|
csv_input_dir = "Data_Export_CSV"
|
||||||
# 配置路径 - 基于当前目录
|
output_dir = "Data_Export_Json"
|
||||||
input_dir = os.path.join(current_dir, "Data")
|
|
||||||
output_dir = os.path.join(current_dir, "Data_Export_Json")
|
|
||||||
csv_dir = os.path.join(current_dir, "Data_Export_CSV")
|
|
||||||
|
|
||||||
print(f"[INFO] 当前工作目录: {current_dir}")
|
|
||||||
print(f"[INFO] Excel文件目录: {input_dir}")
|
|
||||||
print(f"[INFO] CSV输出目录: {csv_dir}")
|
|
||||||
print(f"[INFO] JSON输出目录: {output_dir}")
|
|
||||||
|
|
||||||
# 创建转换器实例
|
# 创建转换器实例
|
||||||
converter = ExcelToJsonConverter(input_dir, output_dir)
|
converter = ExcelToJsonConverter(input_dir, output_dir)
|
||||||
|
|
||||||
# 处理所有文件
|
# 优先使用CSV模式
|
||||||
|
if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir):
|
||||||
|
# CSV模式:使用现有的CSV文件
|
||||||
|
print(f"\n[INFO] 检测到CSV文件,使用CSV模式")
|
||||||
|
print(f" 从 {csv_input_dir} 读取CSV文件")
|
||||||
|
result = converter.convert_csv_directory(csv_input_dir)
|
||||||
|
else:
|
||||||
|
# Excel模式:使用Excel文件(备选方案)
|
||||||
|
excel_files = converter.find_excel_files()
|
||||||
|
if excel_files:
|
||||||
|
print(f"\n[INFO] 未找到CSV文件,使用Excel模式")
|
||||||
|
print(f" 从 {input_dir} 读取Excel文件")
|
||||||
result = converter.process_all()
|
result = converter.process_all()
|
||||||
|
else:
|
||||||
|
print(f"\n[WARN] 未找到CSV文件和Excel文件")
|
||||||
|
result = {'total': 0, 'success': 0, 'failed': 0}
|
||||||
|
|
||||||
# 输出结果
|
# 输出结果
|
||||||
print(f"\n[INFO] 处理结果: {result}")
|
print(f"\n[INFO] 处理结果: {result}")
|
||||||
|
|||||||
@@ -20,6 +20,10 @@ class QAConfig:
|
|||||||
self.INPUT_DIR = "Data_Export_Json"
|
self.INPUT_DIR = "Data_Export_Json"
|
||||||
self.OUTPUT_DIR = "Data_QA_Outputs"
|
self.OUTPUT_DIR = "Data_QA_Outputs"
|
||||||
|
|
||||||
|
# ========== 随机抽取配置 ==========
|
||||||
|
# 从final.json随机抽取的记录个数,生成selected.json文件
|
||||||
|
self.SELECT_COUNT = 3000
|
||||||
|
|
||||||
# ========== 问题数量控制 ==========
|
# ========== 问题数量控制 ==========
|
||||||
# 每个数据项生成的基本问题数量(简单模式下)
|
# 每个数据项生成的基本问题数量(简单模式下)
|
||||||
self.BASIC_QUESTIONS_PER_ITEM = 1
|
self.BASIC_QUESTIONS_PER_ITEM = 1
|
||||||
@@ -150,7 +154,7 @@ class QAConfig:
|
|||||||
|
|
||||||
# ========== 输出控制 ==========
|
# ========== 输出控制 ==========
|
||||||
# 是否打乱问答对顺序
|
# 是否打乱问答对顺序
|
||||||
self.SHUFFLE_OUTPUT = True
|
self.SHUFFLE_OUTPUT = False
|
||||||
|
|
||||||
# 是否生成QA生成报告
|
# 是否生成QA生成报告
|
||||||
self.GENERATE_REPORT = True
|
self.GENERATE_REPORT = True
|
||||||
|
|||||||
970
csv2json.py
Normal file
970
csv2json.py
Normal file
@@ -0,0 +1,970 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
数据处理一体化工具
|
||||||
|
功能1:Excel/CSV转JSON - 读取Excel/CSV文件并转换为JSON
|
||||||
|
功能2:JSON合并 - 根据字段英文名匹配逻辑模型表、物理模型表和元素治理模板表的数据
|
||||||
|
功能3:随机抽取 - 从合并后的JSON中随机抽取指定数量的记录
|
||||||
|
支持多种Excel读取方式,自动处理复杂格式
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import glob
|
||||||
|
import subprocess
|
||||||
|
import xlwings as xw
|
||||||
|
import random
|
||||||
|
from datetime import datetime
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import Optional, Dict, List, Tuple, Any
|
||||||
|
|
||||||
|
|
||||||
|
class ExcelToJsonConverter:
|
||||||
|
"""Excel转JSON转换器"""
|
||||||
|
|
||||||
|
def __init__(self, input_dir: str, output_dir: str):
|
||||||
|
"""
|
||||||
|
初始化转换器
|
||||||
|
|
||||||
|
Args:
|
||||||
|
input_dir: Excel文件输入目录
|
||||||
|
output_dir: JSON文件输出目录
|
||||||
|
"""
|
||||||
|
self.input_dir = input_dir
|
||||||
|
self.output_dir = output_dir
|
||||||
|
|
||||||
|
# 确保输出目录存在
|
||||||
|
if not os.path.exists(output_dir):
|
||||||
|
os.makedirs(output_dir)
|
||||||
|
|
||||||
|
# CSV临时目录(仅在Excel模式下使用)
|
||||||
|
self.temp_csv_dir = None
|
||||||
|
|
||||||
|
def find_excel_files(self) -> List[Tuple[str, str]]:
|
||||||
|
"""扫描目录下的所有Excel文件"""
|
||||||
|
excel_files = []
|
||||||
|
search_pattern = os.path.join(self.input_dir, "*.xlsx")
|
||||||
|
|
||||||
|
for excel_path in glob.glob(search_pattern):
|
||||||
|
filename = os.path.basename(excel_path)
|
||||||
|
|
||||||
|
# 跳过临时文件(Excel的临时文件以~$开头)
|
||||||
|
if filename.startswith('~$'):
|
||||||
|
print(f"[SKIP] 跳过临时文件: {filename}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 生成基础文件名(不含扩展名)
|
||||||
|
base_name = filename.replace('.xlsx', '')
|
||||||
|
excel_files.append((excel_path, base_name))
|
||||||
|
|
||||||
|
return excel_files
|
||||||
|
|
||||||
|
def read_excel_with_xlwings(self, excel_path: str) -> Optional[pd.DataFrame]:
|
||||||
|
"""使用xlwings读取Excel文件"""
|
||||||
|
try:
|
||||||
|
print(f" [TRY] 使用xlwings读取...")
|
||||||
|
app = xw.App(visible=False)
|
||||||
|
wb = app.books.open(excel_path)
|
||||||
|
sheet = wb.sheets[0]
|
||||||
|
|
||||||
|
# 读取数据
|
||||||
|
data = sheet.range('A1').expand().value
|
||||||
|
wb.close()
|
||||||
|
app.quit()
|
||||||
|
|
||||||
|
# 转换为DataFrame
|
||||||
|
if data and len(data) > 0:
|
||||||
|
if isinstance(data[0], list):
|
||||||
|
# 标准表格格式
|
||||||
|
headers = data[0]
|
||||||
|
rows = data[1:] if len(data) > 1 else []
|
||||||
|
df = pd.DataFrame(rows, columns=headers)
|
||||||
|
else:
|
||||||
|
# 每行只有一个值的特殊格式
|
||||||
|
df = pd.DataFrame(data, columns=['内容'])
|
||||||
|
return df
|
||||||
|
return None
|
||||||
|
|
||||||
|
except ImportError:
|
||||||
|
print(f" [WARN] xlwings未安装")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [WARN] xlwings读取失败: {str(e)[:100]}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def read_excel_with_libreoffice(self, excel_path: str) -> Optional[pd.DataFrame]:
|
||||||
|
"""使用LibreOffice转换后读取"""
|
||||||
|
try:
|
||||||
|
print(f" [TRY] 使用LibreOffice转换...")
|
||||||
|
# 输出CSV路径
|
||||||
|
csv_path = excel_path.replace('.xlsx', '_temp.csv')
|
||||||
|
|
||||||
|
# 使用LibreOffice转换
|
||||||
|
cmd = [
|
||||||
|
'libreoffice',
|
||||||
|
'--headless',
|
||||||
|
'--convert-to', 'csv',
|
||||||
|
'--outdir', os.path.dirname(excel_path),
|
||||||
|
excel_path
|
||||||
|
]
|
||||||
|
|
||||||
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||||
|
|
||||||
|
if os.path.exists(csv_path):
|
||||||
|
df = pd.read_csv(csv_path, encoding='utf-8')
|
||||||
|
# 删除临时文件
|
||||||
|
os.remove(csv_path)
|
||||||
|
print(f" [OK] LibreOffice转换成功")
|
||||||
|
return df
|
||||||
|
else:
|
||||||
|
print(f" [WARN] LibreOffice转换失败")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f" [WARN] LibreOffice未安装")
|
||||||
|
return None
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
print(f" [WARN] LibreOffice转换超时")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [WARN] LibreOffice转换失败: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def read_excel_with_pandas(self, excel_path: str) -> Optional[pd.DataFrame]:
|
||||||
|
"""使用pandas读取Excel文件"""
|
||||||
|
engines = ['openpyxl', 'xlrd']
|
||||||
|
|
||||||
|
for engine in engines:
|
||||||
|
try:
|
||||||
|
print(f" [TRY] 使用pandas ({engine})读取...")
|
||||||
|
df = pd.read_excel(excel_path, engine=engine)
|
||||||
|
print(f" [OK] pandas ({engine}) 读取成功")
|
||||||
|
return df
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [WARN] pandas ({engine}) 失败: {str(e)[:100]}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def read_excel_file(self, excel_path: str) -> Optional[pd.DataFrame]:
|
||||||
|
"""
|
||||||
|
尝试多种方法读取Excel文件
|
||||||
|
|
||||||
|
Args:
|
||||||
|
excel_path: Excel文件路径
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DataFrame或None
|
||||||
|
"""
|
||||||
|
print(f"\n[INFO] 读取文件: {os.path.basename(excel_path)}")
|
||||||
|
|
||||||
|
# 按优先级尝试读取方法
|
||||||
|
methods = [
|
||||||
|
("xlwings", self.read_excel_with_xlwings),
|
||||||
|
("pandas-openpyxl", lambda p: self.read_excel_with_pandas(p) if 'openpyxl' in str(p) else None),
|
||||||
|
("LibreOffice", self.read_excel_with_libreoffice),
|
||||||
|
("pandas-xlrd", self.read_excel_with_pandas),
|
||||||
|
]
|
||||||
|
|
||||||
|
for method_name, method_func in methods:
|
||||||
|
try:
|
||||||
|
if method_name == "pandas-openpyxl":
|
||||||
|
# 特殊处理pandas-openpyxl
|
||||||
|
df = self.read_excel_with_pandas(excel_path)
|
||||||
|
elif method_name == "pandas-xlrd":
|
||||||
|
# 跳过,因为上面已经尝试过了
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
df = method_func(excel_path)
|
||||||
|
|
||||||
|
if df is not None and not df.empty:
|
||||||
|
print(f"[OK] {method_name} 成功读取!")
|
||||||
|
print(f" 数据形状: {df.shape[0]}行 × {df.shape[1]}列")
|
||||||
|
return df
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[WARN] {method_name} 失败: {str(e)[:100]}")
|
||||||
|
|
||||||
|
print(f"[ERROR] 所有读取方法都失败了")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def convert_to_csv(self, df: pd.DataFrame, base_name: str) -> str:
|
||||||
|
"""
|
||||||
|
将DataFrame转换为CSV
|
||||||
|
|
||||||
|
Args:
|
||||||
|
df: 数据框
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CSV文件路径
|
||||||
|
"""
|
||||||
|
# 确保临时CSV目录存在
|
||||||
|
if self.temp_csv_dir is None:
|
||||||
|
self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv")
|
||||||
|
if not os.path.exists(self.temp_csv_dir):
|
||||||
|
os.makedirs(self.temp_csv_dir)
|
||||||
|
|
||||||
|
csv_filename = f"{base_name}.csv"
|
||||||
|
csv_path = os.path.join(self.temp_csv_dir, csv_filename)
|
||||||
|
|
||||||
|
# 保存为CSV,使用utf-8-sig编码支持中文
|
||||||
|
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
||||||
|
|
||||||
|
file_size = os.path.getsize(csv_path) / 1024 # KB
|
||||||
|
print(f" [OK] CSV已生成: {csv_filename} ({file_size:.1f} KB)")
|
||||||
|
|
||||||
|
return csv_path
|
||||||
|
|
||||||
|
def convert_csv_to_json(self, csv_path: str, base_name: str) -> str:
|
||||||
|
"""
|
||||||
|
将CSV文件转换为JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_path: CSV文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON文件路径
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 读取CSV文件
|
||||||
|
df = pd.read_csv(csv_path, encoding='utf-8-sig')
|
||||||
|
|
||||||
|
if df.empty:
|
||||||
|
print(f" [WARN] CSV文件为空")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# 转换为JSON列表
|
||||||
|
json_data = []
|
||||||
|
for index, row in df.iterrows():
|
||||||
|
# 创建JSON对象
|
||||||
|
json_obj = {}
|
||||||
|
for column in df.columns:
|
||||||
|
value = row[column]
|
||||||
|
|
||||||
|
# 处理Na值
|
||||||
|
if pd.isna(value):
|
||||||
|
json_obj[column] = None
|
||||||
|
else:
|
||||||
|
# 处理数据值:如果是字符串且包含英文字母,转换为小写
|
||||||
|
if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
|
||||||
|
# 将数据值中的英文字母转换为小写
|
||||||
|
value = value.lower()
|
||||||
|
|
||||||
|
# 将英文字段名转换为小写
|
||||||
|
# 检查字段名是否完全是英文字符(包括字母、数字、下划线)
|
||||||
|
if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
|
||||||
|
# 完全是英文字段名,转换为小写
|
||||||
|
json_obj[column.lower()] = value
|
||||||
|
else:
|
||||||
|
# 包含中文字符的字段名保持不变
|
||||||
|
json_obj[column] = value
|
||||||
|
|
||||||
|
# 添加表名字段
|
||||||
|
json_obj['表名'] = base_name
|
||||||
|
|
||||||
|
json_data.append(json_obj)
|
||||||
|
|
||||||
|
# 生成JSON文件路径
|
||||||
|
json_filename = f"{base_name}.json"
|
||||||
|
json_path = os.path.join(self.output_dir, json_filename)
|
||||||
|
|
||||||
|
# 保存JSON文件
|
||||||
|
with open(json_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(json_data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(json_path) / 1024 # KB
|
||||||
|
print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)")
|
||||||
|
print(f" 数据量: {len(json_data)} 条记录")
|
||||||
|
|
||||||
|
return json_path
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [ERROR] CSV转JSON失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def process_single_file(self, excel_path: str, base_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
处理单个Excel文件:Excel -> CSV -> JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
excel_path: Excel文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
是否成功
|
||||||
|
"""
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"处理: {os.path.basename(excel_path)}")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
|
||||||
|
# 步骤1: 读取Excel
|
||||||
|
df = self.read_excel_file(excel_path)
|
||||||
|
if df is None:
|
||||||
|
print(f"[ERROR] 读取失败,跳过此文件")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 显示数据预览
|
||||||
|
print(f"\n[INFO] 数据预览:")
|
||||||
|
print(df.head(3))
|
||||||
|
|
||||||
|
# 步骤2: 转换为CSV
|
||||||
|
csv_path = self.convert_to_csv(df, base_name)
|
||||||
|
|
||||||
|
# 步骤3: 转换为JSON
|
||||||
|
json_path = self.convert_csv_to_json(csv_path, base_name)
|
||||||
|
|
||||||
|
if json_path:
|
||||||
|
print(f"\n[OK] 转换完成!")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"\n[ERROR] 转换失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def process_all(self) -> Dict:
|
||||||
|
"""
|
||||||
|
处理所有Excel文件
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
处理结果统计
|
||||||
|
"""
|
||||||
|
print("="*60)
|
||||||
|
print("Excel转JSON一体化工具")
|
||||||
|
print("="*60)
|
||||||
|
print(f"输入目录: {self.input_dir}")
|
||||||
|
print(f"输出目录: {self.output_dir}")
|
||||||
|
|
||||||
|
# 查找Excel文件
|
||||||
|
excel_files = self.find_excel_files()
|
||||||
|
|
||||||
|
if not excel_files:
|
||||||
|
print(f"\n[WARN] 未找到任何Excel文件")
|
||||||
|
return {'total': 0, 'success': 0, 'failed': 0}
|
||||||
|
|
||||||
|
print(f"\n[INFO] 发现 {len(excel_files)} 个Excel文件")
|
||||||
|
|
||||||
|
# 处理每个文件
|
||||||
|
success_count = 0
|
||||||
|
failed_count = 0
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for excel_path, base_name in excel_files:
|
||||||
|
if self.process_single_file(excel_path, base_name):
|
||||||
|
success_count += 1
|
||||||
|
results.append({'file': os.path.basename(excel_path), 'status': 'success'})
|
||||||
|
else:
|
||||||
|
failed_count += 1
|
||||||
|
results.append({'file': os.path.basename(excel_path), 'status': 'failed'})
|
||||||
|
|
||||||
|
# 输出统计信息
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print("转换完成!")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
print(f"总计: {len(excel_files)} 个文件")
|
||||||
|
print(f"成功: {success_count} 个文件")
|
||||||
|
print(f"失败: {failed_count} 个文件")
|
||||||
|
|
||||||
|
# 显示生成的JSON文件
|
||||||
|
if success_count > 0:
|
||||||
|
print(f"\n生成的JSON文件:")
|
||||||
|
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
||||||
|
for json_file in sorted(json_files):
|
||||||
|
file_size = os.path.getsize(json_file) / 1024 # KB
|
||||||
|
filename = os.path.basename(json_file)
|
||||||
|
print(f" - {filename} ({file_size:.1f} KB)")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'total': len(excel_files),
|
||||||
|
'success': success_count,
|
||||||
|
'failed': failed_count,
|
||||||
|
'results': results
|
||||||
|
}
|
||||||
|
|
||||||
|
def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]:
|
||||||
|
"""扫描目录下的所有CSV文件"""
|
||||||
|
csv_files = []
|
||||||
|
search_pattern = os.path.join(csv_dir, "*.csv")
|
||||||
|
|
||||||
|
for csv_path in glob.glob(search_pattern):
|
||||||
|
filename = os.path.basename(csv_path)
|
||||||
|
# 生成基础文件名(不含扩展名)
|
||||||
|
base_name = filename.replace('.csv', '')
|
||||||
|
csv_files.append((csv_path, base_name))
|
||||||
|
|
||||||
|
return csv_files
|
||||||
|
|
||||||
|
def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str:
|
||||||
|
"""
|
||||||
|
直接将CSV文件转换为JSON(不生成临时CSV)
|
||||||
|
这个方法直接从CSV读取并转换为JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_path: CSV文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON文件路径
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 尝试多种编码读取CSV文件
|
||||||
|
encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
|
||||||
|
df = None
|
||||||
|
|
||||||
|
for encoding in encodings:
|
||||||
|
try:
|
||||||
|
print(f" [TRY] 尝试编码: {encoding}")
|
||||||
|
df = pd.read_csv(csv_path, encoding=encoding)
|
||||||
|
print(f" [OK] 编码 {encoding} 读取成功")
|
||||||
|
break
|
||||||
|
except (UnicodeDecodeError, UnicodeError):
|
||||||
|
print(f" [WARN] 编码 {encoding} 失败")
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [WARN] 编码 {encoding} 其他错误: {str(e)[:50]}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if df is None:
|
||||||
|
print(f" [ERROR] 所有编码都失败,无法读取CSV文件")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
if df.empty:
|
||||||
|
print(f" [WARN] CSV文件为空")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# 转换为JSON列表
|
||||||
|
json_data = []
|
||||||
|
for index, row in df.iterrows():
|
||||||
|
# 创建JSON对象
|
||||||
|
json_obj = {}
|
||||||
|
for column in df.columns:
|
||||||
|
value = row[column]
|
||||||
|
|
||||||
|
# 处理Na值
|
||||||
|
if pd.isna(value):
|
||||||
|
json_obj[column] = None
|
||||||
|
else:
|
||||||
|
# 处理数据值:如果是字符串且包含英文字母,转换为小写
|
||||||
|
if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value):
|
||||||
|
# 将数据值中的英文字母转换为小写
|
||||||
|
value = value.lower()
|
||||||
|
|
||||||
|
# 将英文字段名转换为小写
|
||||||
|
# 检查字段名是否完全是英文字符(包括字母、数字、下划线)
|
||||||
|
if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'):
|
||||||
|
# 完全是英文字段名,转换为小写
|
||||||
|
json_obj[column.lower()] = value
|
||||||
|
else:
|
||||||
|
# 包含中文字符的字段名保持不变
|
||||||
|
json_obj[column] = value
|
||||||
|
|
||||||
|
# 添加表名字段
|
||||||
|
json_obj['表名'] = base_name
|
||||||
|
|
||||||
|
json_data.append(json_obj)
|
||||||
|
|
||||||
|
# 生成JSON文件路径
|
||||||
|
json_filename = f"{base_name}.json"
|
||||||
|
json_path = os.path.join(self.output_dir, json_filename)
|
||||||
|
|
||||||
|
# 保存JSON文件
|
||||||
|
with open(json_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(json_data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(json_path) / 1024 # KB
|
||||||
|
print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)")
|
||||||
|
print(f" 数据量: {len(json_data)} 条记录")
|
||||||
|
|
||||||
|
return json_path
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [ERROR] CSV转JSON失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def process_single_csv(self, csv_path: str, base_name: str) -> bool:
|
||||||
|
"""
|
||||||
|
处理单个CSV文件:CSV → JSON
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_path: CSV文件路径
|
||||||
|
base_name: 文件基础名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
是否成功
|
||||||
|
"""
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"处理: {os.path.basename(csv_path)}")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
|
||||||
|
# 步骤1: 读取CSV文件并预览
|
||||||
|
try:
|
||||||
|
# 尝试多种编码读取CSV文件
|
||||||
|
encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8']
|
||||||
|
df = None
|
||||||
|
|
||||||
|
for encoding in encodings:
|
||||||
|
try:
|
||||||
|
df = pd.read_csv(csv_path, encoding=encoding)
|
||||||
|
break
|
||||||
|
except (UnicodeDecodeError, UnicodeError):
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 编码 {encoding} 错误: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if df is None or df.empty:
|
||||||
|
print(f"[ERROR] CSV文件为空或读取失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print(f"\n[INFO] 数据预览:")
|
||||||
|
print(df.head(3))
|
||||||
|
print(f"\n[INFO] 数据形状: {df.shape[0]}行 × {df.shape[1]}列")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 读取CSV失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 步骤2: 转换为JSON
|
||||||
|
json_path = self.convert_csv_to_json_direct(csv_path, base_name)
|
||||||
|
|
||||||
|
if json_path:
|
||||||
|
print(f"\n[OK] 转换完成!")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"\n[ERROR] 转换失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def convert_csv_directory(self, csv_dir: str) -> Dict:
|
||||||
|
"""
|
||||||
|
处理CSV目录下的所有CSV文件
|
||||||
|
|
||||||
|
Args:
|
||||||
|
csv_dir: CSV文件目录
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
处理结果统计
|
||||||
|
"""
|
||||||
|
print("="*60)
|
||||||
|
print("CSV转JSON工具")
|
||||||
|
print("="*60)
|
||||||
|
print(f"CSV输入目录: {csv_dir}")
|
||||||
|
print(f"JSON输出目录: {self.output_dir}")
|
||||||
|
|
||||||
|
# 查找CSV文件
|
||||||
|
csv_files = self.find_csv_files(csv_dir)
|
||||||
|
|
||||||
|
if not csv_files:
|
||||||
|
print(f"\n[WARN] 未找到任何CSV文件")
|
||||||
|
return {'total': 0, 'success': 0, 'failed': 0}
|
||||||
|
|
||||||
|
print(f"\n[INFO] 发现 {len(csv_files)} 个CSV文件")
|
||||||
|
|
||||||
|
# 处理每个文件
|
||||||
|
success_count = 0
|
||||||
|
failed_count = 0
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for csv_path, base_name in csv_files:
|
||||||
|
if self.process_single_csv(csv_path, base_name):
|
||||||
|
success_count += 1
|
||||||
|
results.append({'file': os.path.basename(csv_path), 'status': 'success'})
|
||||||
|
else:
|
||||||
|
failed_count += 1
|
||||||
|
results.append({'file': os.path.basename(csv_path), 'status': 'failed'})
|
||||||
|
|
||||||
|
# 输出统计信息
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print("转换完成!")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
print(f"总计: {len(csv_files)} 个文件")
|
||||||
|
print(f"成功: {success_count} 个文件")
|
||||||
|
print(f"失败: {failed_count} 个文件")
|
||||||
|
|
||||||
|
# 显示生成的JSON文件
|
||||||
|
if success_count > 0:
|
||||||
|
print(f"\n生成的JSON文件:")
|
||||||
|
json_files = glob.glob(os.path.join(self.output_dir, "*.json"))
|
||||||
|
for json_file in sorted(json_files):
|
||||||
|
file_size = os.path.getsize(json_file) / 1024 # KB
|
||||||
|
filename = os.path.basename(json_file)
|
||||||
|
print(f" - {filename} ({file_size:.1f} KB)")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'total': len(csv_files),
|
||||||
|
'success': success_count,
|
||||||
|
'failed': failed_count,
|
||||||
|
'results': results
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class JsonMerger:
|
||||||
|
"""JSON文件合并器"""
|
||||||
|
|
||||||
|
def __init__(self, output_dir: str):
|
||||||
|
self.output_dir = output_dir
|
||||||
|
|
||||||
|
def load_json_file(self, file_path: str) -> List[Dict[str, Any]]:
|
||||||
|
"""加载JSON文件"""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录")
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 加载文件失败 {file_path}: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def build_index(self, records: List[Dict], field_name: str) -> Dict[str, List[Dict]]:
|
||||||
|
"""为记录列表建立索引,加速查找"""
|
||||||
|
index = defaultdict(list)
|
||||||
|
for record in records:
|
||||||
|
field_value = record.get(field_name)
|
||||||
|
if field_value:
|
||||||
|
index[field_value].append(record)
|
||||||
|
print(f"[INFO] 建立索引完成: {len(index)} 个唯一字段值")
|
||||||
|
return index
|
||||||
|
|
||||||
|
def merge_records_optimized(self, logical_index: Dict, physical_index: Dict, element_records: List[Dict]) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
使用索引优化合并三个表的记录
|
||||||
|
"""
|
||||||
|
merged_data = []
|
||||||
|
processed_fields = set()
|
||||||
|
|
||||||
|
# 遍历元素治理表
|
||||||
|
print(f"\n[INFO] 开始合并数据...")
|
||||||
|
for i, element_record in enumerate(element_records):
|
||||||
|
if i % 5000 == 0:
|
||||||
|
print(f" 处理进度: {i}/{len(element_records)}")
|
||||||
|
|
||||||
|
field_english_name = element_record.get('字段英文名')
|
||||||
|
if not field_english_name or field_english_name in processed_fields:
|
||||||
|
continue
|
||||||
|
|
||||||
|
processed_fields.add(field_english_name)
|
||||||
|
|
||||||
|
# 创建合并记录
|
||||||
|
merged_record = {}
|
||||||
|
|
||||||
|
# 添加元素治理模板表的数据
|
||||||
|
for key, value in element_record.items():
|
||||||
|
if key != '表名':
|
||||||
|
merged_record[key] = value
|
||||||
|
|
||||||
|
# 查找逻辑模型表中的匹配记录
|
||||||
|
logical_matches = logical_index.get(field_english_name, [])
|
||||||
|
|
||||||
|
# 查找物理模型表中的匹配记录
|
||||||
|
physical_matches = physical_index.get(field_english_name, [])
|
||||||
|
|
||||||
|
# 添加逻辑模型表的数据(添加前缀避免冲突)
|
||||||
|
if logical_matches:
|
||||||
|
for logical_match in logical_matches:
|
||||||
|
for key, value in logical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
new_key = f"逻辑模型_{key}"
|
||||||
|
merged_record[new_key] = value
|
||||||
|
|
||||||
|
# 只有当有匹配数据时才添加表名信息
|
||||||
|
merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表'
|
||||||
|
|
||||||
|
# 添加物理模型表的数据(添加前缀避免冲突)
|
||||||
|
if physical_matches:
|
||||||
|
for physical_match in physical_matches:
|
||||||
|
for key, value in physical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
new_key = f"物理模型_{key}"
|
||||||
|
merged_record[new_key] = value
|
||||||
|
|
||||||
|
# 只有当有匹配数据时才添加表名信息
|
||||||
|
merged_record['物理模型表_表名'] = '远光数据架构物理模型表'
|
||||||
|
|
||||||
|
# 添加元素治理表名(始终存在)
|
||||||
|
merged_record['元素治理表_表名'] = '远光数据架构元素治理模板表'
|
||||||
|
|
||||||
|
merged_data.append(merged_record)
|
||||||
|
|
||||||
|
print(f" 完成合并: {len(merged_data)} 条记录")
|
||||||
|
|
||||||
|
return merged_data
|
||||||
|
|
||||||
|
def merge_all(self, logical_file: str, physical_file: str, element_file: str, output_file: str) -> Dict:
|
||||||
|
"""合并所有JSON文件"""
|
||||||
|
print("="*60)
|
||||||
|
print("优化版JSON文件合并工具")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# 构建文件路径
|
||||||
|
logical_json_path = os.path.join(self.output_dir, logical_file)
|
||||||
|
physical_json_path = os.path.join(self.output_dir, physical_file)
|
||||||
|
element_json_path = os.path.join(self.output_dir, element_file)
|
||||||
|
output_path = os.path.join(self.output_dir, output_file)
|
||||||
|
|
||||||
|
# 加载JSON文件
|
||||||
|
print("\n[INFO] 加载JSON文件...")
|
||||||
|
logical_records = self.load_json_file(logical_json_path)
|
||||||
|
physical_records = self.load_json_file(physical_json_path)
|
||||||
|
element_records = self.load_json_file(element_json_path)
|
||||||
|
|
||||||
|
if not (logical_records and physical_records and element_records):
|
||||||
|
print("\n[ERROR] 无法加载所有JSON文件")
|
||||||
|
return {'success': False, 'merged_count': 0}
|
||||||
|
|
||||||
|
# 建立索引
|
||||||
|
print(f"\n[INFO] 建立索引加速查找...")
|
||||||
|
logical_index = self.build_index(logical_records, '字段英文名')
|
||||||
|
physical_index = self.build_index(physical_records, '字段英文名')
|
||||||
|
|
||||||
|
# 合并数据(只处理元素治理表中存在的字段)
|
||||||
|
merged_data = self.merge_records_optimized(logical_index, physical_index, element_records)
|
||||||
|
|
||||||
|
# 保存合并后的数据
|
||||||
|
try:
|
||||||
|
print(f"\n[INFO] 保存合并数据到 {output_path}...")
|
||||||
|
with open(output_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(merged_data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_path) / 1024 # KB
|
||||||
|
print(f"\n[OK] 合并完成!")
|
||||||
|
print(f" 输出文件: {output_path}")
|
||||||
|
print(f" 合并记录: {len(merged_data)} 条")
|
||||||
|
print(f" 文件大小: {file_size:.1f} KB")
|
||||||
|
|
||||||
|
# 显示统计信息
|
||||||
|
three_table_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and r.get('物理模型表_表名'))
|
||||||
|
element_logical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and not r.get('物理模型表_表名'))
|
||||||
|
element_physical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('物理模型表_表名') and not r.get('逻辑模型表_表名'))
|
||||||
|
element_only_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and not r.get('逻辑模型表_表名') and not r.get('物理模型表_表名'))
|
||||||
|
|
||||||
|
print(f"\n[INFO] 统计信息:")
|
||||||
|
print(f" 三表匹配: {three_table_match} 条")
|
||||||
|
print(f" 元素治理+逻辑模型: {element_logical_match} 条")
|
||||||
|
print(f" 元素治理+物理模型: {element_physical_match} 条")
|
||||||
|
print(f" 仅元素治理: {element_only_match} 条")
|
||||||
|
|
||||||
|
# 显示前3条记录的字段名
|
||||||
|
if merged_data:
|
||||||
|
print(f"\n[INFO] 合并记录示例:")
|
||||||
|
sample_record = merged_data[0]
|
||||||
|
print(f" 字段数量: {len(sample_record)}")
|
||||||
|
print(f" 字段名: {list(sample_record.keys())[:10]}...") # 只显示前10个字段
|
||||||
|
|
||||||
|
return {
|
||||||
|
'success': True,
|
||||||
|
'merged_count': len(merged_data),
|
||||||
|
'output_file': output_path,
|
||||||
|
'file_size_kb': file_size,
|
||||||
|
'statistics': {
|
||||||
|
'三表匹配': three_table_match,
|
||||||
|
'元素治理+逻辑模型': element_logical_match,
|
||||||
|
'元素治理+物理模型': element_physical_match,
|
||||||
|
'仅元素治理': element_only_match
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n[ERROR] 保存文件失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return {'success': False, 'merged_count': 0}
|
||||||
|
|
||||||
|
|
||||||
|
class RandomSelector:
|
||||||
|
"""随机选择器"""
|
||||||
|
|
||||||
|
def __init__(self, output_dir: str, random_seed: int = 42, select_count: int = 3000):
|
||||||
|
self.output_dir = output_dir
|
||||||
|
self.random_seed = random_seed
|
||||||
|
self.select_count = select_count
|
||||||
|
|
||||||
|
def load_json_file(self, file_path: str) -> List[Dict[str, Any]]:
|
||||||
|
"""加载JSON文件"""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录")
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 加载文件失败 {file_path}: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def random_select(self, records: List[Dict[str, Any]], count: int) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
随机抽取记录
|
||||||
|
|
||||||
|
Args:
|
||||||
|
records: 记录列表
|
||||||
|
count: 要抽取的数量
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
抽取的记录列表
|
||||||
|
"""
|
||||||
|
# 设置随机种子
|
||||||
|
random.seed(self.random_seed)
|
||||||
|
|
||||||
|
# 如果抽取数量大于等于总数,直接返回所有记录
|
||||||
|
if count >= len(records):
|
||||||
|
print(f"[WARN] 抽取数量 ({count}) 大于等于总记录数 ({len(records)}),返回所有记录")
|
||||||
|
return records
|
||||||
|
|
||||||
|
# 随机抽取
|
||||||
|
selected = random.sample(records, count)
|
||||||
|
print(f"[OK] 从 {len(records)} 条记录中随机抽取 {count} 条")
|
||||||
|
|
||||||
|
return selected
|
||||||
|
|
||||||
|
def select_random(self, input_file: str, output_file: str) -> Dict:
|
||||||
|
"""随机抽取记录"""
|
||||||
|
print("="*60)
|
||||||
|
print("随机抽取工具")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# 构建文件路径
|
||||||
|
input_path = os.path.join(self.output_dir, input_file)
|
||||||
|
output_path = os.path.join(self.output_dir, output_file)
|
||||||
|
|
||||||
|
print(f"\n[INFO] 配置:")
|
||||||
|
print(f" 随机种子: {self.random_seed}")
|
||||||
|
print(f" 抽取数量: {self.select_count}")
|
||||||
|
|
||||||
|
# 检查输入文件是否存在
|
||||||
|
if not os.path.exists(input_path):
|
||||||
|
print(f"\n[ERROR] 输入文件不存在: {input_path}")
|
||||||
|
return {'success': False, 'selected_count': 0}
|
||||||
|
|
||||||
|
# 加载数据
|
||||||
|
print(f"\n[INFO] 加载数据...")
|
||||||
|
records = self.load_json_file(input_path)
|
||||||
|
|
||||||
|
if not records:
|
||||||
|
print(f"\n[ERROR] 无法加载数据或数据为空")
|
||||||
|
return {'success': False, 'selected_count': 0}
|
||||||
|
|
||||||
|
# 随机抽取
|
||||||
|
print(f"\n[INFO] 执行随机抽取...")
|
||||||
|
selected_records = self.random_select(records, self.select_count)
|
||||||
|
|
||||||
|
# 保存结果
|
||||||
|
try:
|
||||||
|
with open(output_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(selected_records, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_path) / 1024 # KB
|
||||||
|
print(f"\n[OK] 抽取完成!")
|
||||||
|
print(f" 输出文件: {output_path}")
|
||||||
|
print(f" 记录数量: {len(selected_records)}")
|
||||||
|
print(f" 文件大小: {file_size:.1f} KB")
|
||||||
|
|
||||||
|
# 显示前3条记录的字段名
|
||||||
|
if selected_records:
|
||||||
|
print(f"\n[INFO] 抽取记录示例:")
|
||||||
|
sample = selected_records[0]
|
||||||
|
print(f" 字段数量: {len(sample)}")
|
||||||
|
print(f" 字段名: {list(sample.keys())[:10]}...")
|
||||||
|
|
||||||
|
# 显示统计信息
|
||||||
|
three_table_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' in r)
|
||||||
|
element_logical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' not in r)
|
||||||
|
element_physical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '物理模型表_表名' in r and '逻辑模型表_表名' not in r)
|
||||||
|
element_only_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' not in r and '物理模型表_表名' not in r)
|
||||||
|
|
||||||
|
print(f"\n[INFO] 抽取记录统计:")
|
||||||
|
print(f" 三表匹配: {three_table_match} 条")
|
||||||
|
print(f" 元素治理+逻辑模型: {element_logical_match} 条")
|
||||||
|
print(f" 元素治理+物理模型: {element_physical_match} 条")
|
||||||
|
print(f" 仅元素治理: {element_only_match} 条")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'success': True,
|
||||||
|
'selected_count': len(selected_records),
|
||||||
|
'output_file': output_path,
|
||||||
|
'file_size_kb': file_size,
|
||||||
|
'statistics': {
|
||||||
|
'三表匹配': three_table_match if selected_records else 0,
|
||||||
|
'元素治理+逻辑模型': element_logical_match if selected_records else 0,
|
||||||
|
'元素治理+物理模型': element_physical_match if selected_records else 0,
|
||||||
|
'仅元素治理': element_only_match if selected_records else 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n[ERROR] 保存文件失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return {'success': False, 'selected_count': 0}
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数 - 演示用法"""
|
||||||
|
# 配置路径
|
||||||
|
input_dir = "Data"
|
||||||
|
csv_input_dir = "Data_Export_CSV"
|
||||||
|
output_dir = "Data_Export_Json"
|
||||||
|
|
||||||
|
# 创建转换器实例
|
||||||
|
converter = ExcelToJsonConverter(input_dir, output_dir)
|
||||||
|
|
||||||
|
# 步骤1: Excel/CSV转JSON
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("步骤1: Excel/CSV转JSON")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# 优先使用CSV模式
|
||||||
|
if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir):
|
||||||
|
# CSV模式:使用现有的CSV文件
|
||||||
|
print(f"\n[INFO] 检测到CSV文件,使用CSV模式")
|
||||||
|
print(f" 从 {csv_input_dir} 读取CSV文件")
|
||||||
|
result = converter.convert_csv_directory(csv_input_dir)
|
||||||
|
else:
|
||||||
|
# Excel模式:使用Excel文件(备选方案)
|
||||||
|
excel_files = converter.find_excel_files()
|
||||||
|
if excel_files:
|
||||||
|
print(f"\n[INFO] 未找到CSV文件,使用Excel模式")
|
||||||
|
print(f" 从 {input_dir} 读取Excel文件")
|
||||||
|
result = converter.process_all()
|
||||||
|
else:
|
||||||
|
print(f"\n[WARN] 未找到CSV文件和Excel文件")
|
||||||
|
result = {'total': 0, 'success': 0, 'failed': 0}
|
||||||
|
|
||||||
|
print(f"\n[INFO] 转换结果: {result}")
|
||||||
|
|
||||||
|
# 步骤2: 合并JSON文件
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("步骤2: JSON合并")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
merger = JsonMerger(output_dir)
|
||||||
|
merge_result = merger.merge_all(
|
||||||
|
logical_file="远光数据架构逻辑模型表.json",
|
||||||
|
physical_file="远光数据架构物理模型表.json",
|
||||||
|
element_file="远光数据架构元素治理模板表.json",
|
||||||
|
output_file="final.json"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 步骤3: 随机抽取
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("步骤3: 随机抽取")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
selector = RandomSelector(output_dir, random_seed=42, select_count=3000)
|
||||||
|
select_result = selector.select_random(
|
||||||
|
input_file="final.json",
|
||||||
|
output_file="selected.json"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 最终结果
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("处理完成!")
|
||||||
|
print("="*60)
|
||||||
|
print(f"Excel/CSV转JSON: {result}")
|
||||||
|
print(f"JSON合并: {merge_result}")
|
||||||
|
print(f"随机抽取: {select_result}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
226
merge_json_fast.py
Normal file
226
merge_json_fast.py
Normal file
@@ -0,0 +1,226 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
优化版JSON文件合并脚本
|
||||||
|
根据字段英文名匹配逻辑模型表、物理模型表和元素治理模板表的数据
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import Dict, List, Any
|
||||||
|
|
||||||
|
def load_json_file(file_path: str) -> List[Dict[str, Any]]:
|
||||||
|
"""加载JSON文件"""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录")
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 加载文件失败 {file_path}: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def build_index(records: List[Dict], field_name: str) -> Dict[str, List[Dict]]:
|
||||||
|
"""为记录列表建立索引,加速查找"""
|
||||||
|
index = defaultdict(list)
|
||||||
|
for record in records:
|
||||||
|
field_value = record.get(field_name)
|
||||||
|
if field_value:
|
||||||
|
index[field_value].append(record)
|
||||||
|
print(f"[INFO] 建立索引完成: {len(index)} 个唯一字段值")
|
||||||
|
return index
|
||||||
|
|
||||||
|
def merge_records_optimized(logical_index: Dict, physical_index: Dict, element_records: List[Dict]) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
使用索引优化合并三个表的记录
|
||||||
|
"""
|
||||||
|
merged_data = []
|
||||||
|
processed_fields = set()
|
||||||
|
|
||||||
|
# 遍历元素治理表
|
||||||
|
print(f"\n[INFO] 开始合并数据...")
|
||||||
|
for i, element_record in enumerate(element_records):
|
||||||
|
if i % 5000 == 0:
|
||||||
|
print(f" 处理进度: {i}/{len(element_records)}")
|
||||||
|
|
||||||
|
field_english_name = element_record.get('字段英文名')
|
||||||
|
if not field_english_name or field_english_name in processed_fields:
|
||||||
|
continue
|
||||||
|
|
||||||
|
processed_fields.add(field_english_name)
|
||||||
|
|
||||||
|
# 创建合并记录
|
||||||
|
merged_record = {}
|
||||||
|
|
||||||
|
# 添加元素治理模板表的数据
|
||||||
|
for key, value in element_record.items():
|
||||||
|
if key != '表名':
|
||||||
|
merged_record[key] = value
|
||||||
|
|
||||||
|
# 查找逻辑模型表中的匹配记录
|
||||||
|
logical_matches = logical_index.get(field_english_name, [])
|
||||||
|
|
||||||
|
# 查找物理模型表中的匹配记录
|
||||||
|
physical_matches = physical_index.get(field_english_name, [])
|
||||||
|
|
||||||
|
# 添加逻辑模型表的数据(添加前缀避免冲突)
|
||||||
|
if logical_matches:
|
||||||
|
for logical_match in logical_matches:
|
||||||
|
for key, value in logical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
new_key = f"逻辑模型_{key}"
|
||||||
|
merged_record[new_key] = value
|
||||||
|
|
||||||
|
# 只有当有匹配数据时才添加表名信息
|
||||||
|
merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表'
|
||||||
|
|
||||||
|
# 添加物理模型表的数据(添加前缀避免冲突)
|
||||||
|
if physical_matches:
|
||||||
|
for physical_match in physical_matches:
|
||||||
|
for key, value in physical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
new_key = f"物理模型_{key}"
|
||||||
|
merged_record[new_key] = value
|
||||||
|
|
||||||
|
# 只有当有匹配数据时才添加表名信息
|
||||||
|
merged_record['物理模型表_表名'] = '远光数据架构物理模型表'
|
||||||
|
|
||||||
|
# 添加元素治理表名(始终存在)
|
||||||
|
merged_record['元素治理表_表名'] = '远光数据架构元素治理模板表'
|
||||||
|
|
||||||
|
merged_data.append(merged_record)
|
||||||
|
|
||||||
|
print(f" 完成合并: {len(merged_data)} 条记录")
|
||||||
|
|
||||||
|
return merged_data
|
||||||
|
|
||||||
|
def add_unmatched_records_optimized(merged_data: List[Dict],
|
||||||
|
logical_index: Dict,
|
||||||
|
physical_index: Dict) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
添加未匹配的记录
|
||||||
|
"""
|
||||||
|
print(f"\n[INFO] 处理未匹配的记录...")
|
||||||
|
|
||||||
|
# 获取所有已处理的字段英文名
|
||||||
|
processed_fields = {record.get('字段英文名') for record in merged_data if record.get('字段英文名')}
|
||||||
|
|
||||||
|
# 添加逻辑模型表中未匹配的记录
|
||||||
|
logical_unmatched = len(logical_index) - len([f for f in logical_index if f in processed_fields])
|
||||||
|
print(f" 逻辑模型表未匹配: {logical_unmatched} 条")
|
||||||
|
|
||||||
|
for field_name, logical_matches in logical_index.items():
|
||||||
|
if field_name not in processed_fields:
|
||||||
|
for logical_match in logical_matches:
|
||||||
|
merged_record = {'字段英文名': field_name}
|
||||||
|
|
||||||
|
for key, value in logical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
merged_record[f"逻辑模型_{key}"] = value
|
||||||
|
|
||||||
|
merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表'
|
||||||
|
merged_record['物理模型表_表名'] = None
|
||||||
|
merged_record['元素治理表_表名'] = None
|
||||||
|
|
||||||
|
merged_data.append(merged_record)
|
||||||
|
|
||||||
|
# 添加物理模型表中未匹配的记录
|
||||||
|
physical_unmatched = len(physical_index) - len([f for f in physical_index if f in processed_fields])
|
||||||
|
print(f" 物理模型表未匹配: {physical_unmatched} 条")
|
||||||
|
|
||||||
|
for field_name, physical_matches in physical_index.items():
|
||||||
|
if field_name not in processed_fields:
|
||||||
|
# 检查是否已经添加过(通过逻辑模型表)
|
||||||
|
already_added = any(r.get('字段英文名') == field_name for r in merged_data)
|
||||||
|
|
||||||
|
if not already_added:
|
||||||
|
for physical_match in physical_matches:
|
||||||
|
merged_record = {'字段英文名': field_name}
|
||||||
|
|
||||||
|
for key, value in physical_match.items():
|
||||||
|
if key not in ['表名', '字段英文名']:
|
||||||
|
merged_record[f"物理模型_{key}"] = value
|
||||||
|
|
||||||
|
merged_record['逻辑模型表_表名'] = None
|
||||||
|
merged_record['物理模型表_表名'] = '远光数据架构物理模型表'
|
||||||
|
merged_record['元素治理表_表名'] = None
|
||||||
|
|
||||||
|
merged_data.append(merged_record)
|
||||||
|
|
||||||
|
return merged_data
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
print("="*60)
|
||||||
|
print("优化版JSON文件合并工具")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# 文件路径
|
||||||
|
logical_json_path = "Data_Export_Json/远光数据架构逻辑模型表.json"
|
||||||
|
physical_json_path = "Data_Export_Json/远光数据架构物理模型表.json"
|
||||||
|
element_json_path = "Data_Export_Json/远光数据架构元素治理模板表.json"
|
||||||
|
output_path = "Data_Export_Json/final.json"
|
||||||
|
|
||||||
|
# 加载JSON文件
|
||||||
|
print("\n[INFO] 加载JSON文件...")
|
||||||
|
logical_records = load_json_file(logical_json_path)
|
||||||
|
physical_records = load_json_file(physical_json_path)
|
||||||
|
element_records = load_json_file(element_json_path)
|
||||||
|
|
||||||
|
if not (logical_records and physical_records and element_records):
|
||||||
|
print("\n[ERROR] 无法加载所有JSON文件")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 建立索引
|
||||||
|
print(f"\n[INFO] 建立索引加速查找...")
|
||||||
|
logical_index = build_index(logical_records, '字段英文名')
|
||||||
|
physical_index = build_index(physical_records, '字段英文名')
|
||||||
|
|
||||||
|
# 合并数据(只处理元素治理表中存在的字段)
|
||||||
|
merged_data = merge_records_optimized(logical_index, physical_index, element_records)
|
||||||
|
|
||||||
|
# 不再添加未匹配的记录,因为用户只关心元素治理表中的字段
|
||||||
|
|
||||||
|
# 保存合并后的数据
|
||||||
|
try:
|
||||||
|
print(f"\n[INFO] 保存合并数据到 {output_path}...")
|
||||||
|
with open(output_path, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(merged_data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_path) / 1024 # KB
|
||||||
|
print(f"\n[OK] 合并完成!")
|
||||||
|
print(f" 输出文件: {output_path}")
|
||||||
|
print(f" 合并记录: {len(merged_data)} 条")
|
||||||
|
print(f" 文件大小: {file_size:.1f} KB")
|
||||||
|
|
||||||
|
# 显示统计信息
|
||||||
|
three_table_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and r.get('物理模型表_表名'))
|
||||||
|
element_logical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and not r.get('物理模型表_表名'))
|
||||||
|
element_physical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('物理模型表_表名') and not r.get('逻辑模型表_表名'))
|
||||||
|
element_only_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and not r.get('逻辑模型表_表名') and not r.get('物理模型表_表名'))
|
||||||
|
logical_only_count = sum(1 for r in merged_data if r.get('逻辑模型表_表名') and not r.get('元素治理表_表名'))
|
||||||
|
physical_only_count = sum(1 for r in merged_data if r.get('物理模型表_表名') and not r.get('元素治理表_表名'))
|
||||||
|
|
||||||
|
print(f"\n[INFO] 统计信息:")
|
||||||
|
print(f" 三表匹配: {three_table_match} 条")
|
||||||
|
print(f" 元素治理+逻辑模型: {element_logical_match} 条")
|
||||||
|
print(f" 元素治理+物理模型: {element_physical_match} 条")
|
||||||
|
print(f" 仅元素治理: {element_only_match} 条")
|
||||||
|
print(f" 仅逻辑模型: {logical_only_count} 条")
|
||||||
|
print(f" 仅物理模型: {physical_only_count} 条")
|
||||||
|
|
||||||
|
# 显示前3条记录的字段名
|
||||||
|
if merged_data:
|
||||||
|
print(f"\n[INFO] 合并记录示例:")
|
||||||
|
sample_record = merged_data[0]
|
||||||
|
print(f" 字段数量: {len(sample_record)}")
|
||||||
|
print(f" 字段名: {list(sample_record.keys())[:10]}...") # 只显示前10个字段
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n[ERROR] 保存文件失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
1757
qa_generator.py
1757
qa_generator.py
File diff suppressed because it is too large
Load Diff
120
random_select.py
Normal file
120
random_select.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
随机抽取脚本
|
||||||
|
从final.json中随机抽取指定数量的记录,生成select_N.json文件
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import random
|
||||||
|
import os
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from config import QAConfig
|
||||||
|
|
||||||
|
def load_json_file(file_path: str) -> List[Dict[str, Any]]:
|
||||||
|
"""加载JSON文件"""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录")
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] 加载文件失败 {file_path}: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def random_select(records: List[Dict[str, Any]], count: int, random_seed: int) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
随机抽取记录
|
||||||
|
|
||||||
|
Args:
|
||||||
|
records: 记录列表
|
||||||
|
count: 要抽取的数量
|
||||||
|
random_seed: 随机种子
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
抽取的记录列表
|
||||||
|
"""
|
||||||
|
# 设置随机种子
|
||||||
|
random.seed(random_seed)
|
||||||
|
|
||||||
|
# 如果抽取数量大于等于总数,直接返回所有记录
|
||||||
|
if count >= len(records):
|
||||||
|
print(f"[WARN] 抽取数量 ({count}) 大于等于总记录数 ({len(records)}),返回所有记录")
|
||||||
|
return records
|
||||||
|
|
||||||
|
# 随机抽取
|
||||||
|
selected = random.sample(records, count)
|
||||||
|
print(f"[OK] 从 {len(records)} 条记录中随机抽取 {count} 条")
|
||||||
|
|
||||||
|
return selected
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
print("="*60)
|
||||||
|
print("随机抽取工具")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# 加载配置
|
||||||
|
config = QAConfig()
|
||||||
|
print(f"\n[INFO] 加载配置:")
|
||||||
|
print(f" 随机种子: {config.RANDOM_SEED}")
|
||||||
|
print(f" 抽取数量: {config.SELECT_COUNT}")
|
||||||
|
|
||||||
|
# 文件路径
|
||||||
|
input_file = os.path.join(config.INPUT_DIR, "final.json")
|
||||||
|
output_file = os.path.join(config.INPUT_DIR, "selected.json")
|
||||||
|
|
||||||
|
# 检查输入文件是否存在
|
||||||
|
if not os.path.exists(input_file):
|
||||||
|
print(f"\n[ERROR] 输入文件不存在: {input_file}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 加载数据
|
||||||
|
print(f"\n[INFO] 加载数据...")
|
||||||
|
records = load_json_file(input_file)
|
||||||
|
|
||||||
|
if not records:
|
||||||
|
print(f"\n[ERROR] 无法加载数据或数据为空")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 随机抽取
|
||||||
|
print(f"\n[INFO] 执行随机抽取...")
|
||||||
|
selected_records = random_select(records, config.SELECT_COUNT, config.RANDOM_SEED)
|
||||||
|
|
||||||
|
# 保存结果
|
||||||
|
try:
|
||||||
|
with open(output_file, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(selected_records, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_file) / 1024 # KB
|
||||||
|
print(f"\n[OK] 抽取完成!")
|
||||||
|
print(f" 输出文件: {output_file}")
|
||||||
|
print(f" 记录数量: {len(selected_records)}")
|
||||||
|
print(f" 文件大小: {file_size:.1f} KB")
|
||||||
|
|
||||||
|
# 显示前3条记录的字段名
|
||||||
|
if selected_records:
|
||||||
|
print(f"\n[INFO] 抽取记录示例:")
|
||||||
|
sample = selected_records[0]
|
||||||
|
print(f" 字段数量: {len(sample)}")
|
||||||
|
print(f" 字段名: {list(sample.keys())[:10]}...")
|
||||||
|
|
||||||
|
# 显示统计信息
|
||||||
|
three_table_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' in r)
|
||||||
|
element_logical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' not in r)
|
||||||
|
element_physical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '物理模型表_表名' in r and '逻辑模型表_表名' not in r)
|
||||||
|
element_only_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' not in r and '物理模型表_表名' not in r)
|
||||||
|
|
||||||
|
print(f"\n[INFO] 抽取记录统计:")
|
||||||
|
print(f" 三表匹配: {three_table_match} 条")
|
||||||
|
print(f" 元素治理+逻辑模型: {element_logical_match} 条")
|
||||||
|
print(f" 元素治理+物理模型: {element_physical_match} 条")
|
||||||
|
print(f" 仅元素治理: {element_only_match} 条")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n[ERROR] 保存文件失败: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
pandas>=1.3.0
|
||||||
|
xlwings>=0.24.0
|
||||||
|
openpyxl>=3.0.0
|
||||||
Reference in New Issue
Block a user