From 8088b59d30dff15e1c726dfba9c039e18e0fee86 Mon Sep 17 00:00:00 2001 From: "DESKTOP-72TV0V4\\caoxiaozhu" Date: Wed, 31 Dec 2025 18:15:50 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E4=BF=AE=E6=94=B9=E4=BA=86=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E8=A1=A8=E8=BE=BE=202.=20=E7=BC=A9=E5=87=8F=E4=BA=86?= =?UTF-8?q?=E8=A1=A8=E7=9A=84=E6=95=B0=E9=87=8F=203.=20=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E9=80=89=E6=8B=A9=E7=94=9F=E6=88=90=E5=A4=9A=E5=B0=91=E4=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- change_xlsx2json.py | 266 ++++++++++++++++- config.py | 4 + merge_json_fast.py | 226 +++++++++++++++ qa_generator.py | 688 +++++++++++++------------------------------- random_select.py | 120 ++++++++ 5 files changed, 803 insertions(+), 501 deletions(-) create mode 100644 merge_json_fast.py create mode 100644 random_select.py diff --git a/change_xlsx2json.py b/change_xlsx2json.py index 162e365..8dfc55b 100644 --- a/change_xlsx2json.py +++ b/change_xlsx2json.py @@ -34,10 +34,8 @@ class ExcelToJsonConverter: if not os.path.exists(output_dir): os.makedirs(output_dir) - # CSV临时目录 - self.temp_csv_dir = os.path.join(output_dir, "temp_csv") - if not os.path.exists(self.temp_csv_dir): - os.makedirs(self.temp_csv_dir) + # CSV临时目录(仅在Excel模式下使用) + self.temp_csv_dir = None def find_excel_files(self) -> List[Tuple[str, str]]: """扫描目录下的所有Excel文件""" @@ -198,6 +196,12 @@ class ExcelToJsonConverter: Returns: CSV文件路径 """ + # 确保临时CSV目录存在 + if self.temp_csv_dir is None: + self.temp_csv_dir = os.path.join(self.output_dir, "temp_csv") + if not os.path.exists(self.temp_csv_dir): + os.makedirs(self.temp_csv_dir) + csv_filename = f"{base_name}.csv" csv_path = os.path.join(self.temp_csv_dir, csv_filename) @@ -240,7 +244,19 @@ class ExcelToJsonConverter: if pd.isna(value): json_obj[column] = None else: - json_obj[column] = value + # 处理数据值:如果是字符串且包含英文字母,转换为小写 + if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): + # 将数据值中的英文字母转换为小写 + value = value.lower() + + # 将英文字段名转换为小写 + # 检查字段名是否完全是英文字符(包括字母、数字、下划线) + if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): + # 完全是英文字段名,转换为小写 + json_obj[column.lower()] = value + else: + # 包含中文字符的字段名保持不变 + json_obj[column] = value # 添加表名字段 json_obj['表名'] = base_name @@ -364,18 +380,250 @@ class ExcelToJsonConverter: 'results': results } + def find_csv_files(self, csv_dir: str) -> List[Tuple[str, str]]: + """扫描目录下的所有CSV文件""" + csv_files = [] + search_pattern = os.path.join(csv_dir, "*.csv") + + for csv_path in glob.glob(search_pattern): + filename = os.path.basename(csv_path) + # 生成基础文件名(不含扩展名) + base_name = filename.replace('.csv', '') + csv_files.append((csv_path, base_name)) + + return csv_files + + def convert_csv_to_json_direct(self, csv_path: str, base_name: str) -> str: + """ + 直接将CSV文件转换为JSON(不生成临时CSV) + 这个方法直接从CSV读取并转换为JSON + + Args: + csv_path: CSV文件路径 + base_name: 文件基础名 + + Returns: + JSON文件路径 + """ + try: + # 尝试多种编码读取CSV文件 + encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] + df = None + + for encoding in encodings: + try: + print(f" [TRY] 尝试编码: {encoding}") + df = pd.read_csv(csv_path, encoding=encoding) + print(f" [OK] 编码 {encoding} 读取成功") + break + except (UnicodeDecodeError, UnicodeError): + print(f" [WARN] 编码 {encoding} 失败") + continue + except Exception as e: + print(f" [WARN] 编码 {encoding} 其他错误: {str(e)[:50]}") + continue + + if df is None: + print(f" [ERROR] 所有编码都失败,无法读取CSV文件") + return "" + + if df.empty: + print(f" [WARN] CSV文件为空") + return "" + + # 转换为JSON列表 + json_data = [] + for index, row in df.iterrows(): + # 创建JSON对象 + json_obj = {} + for column in df.columns: + value = row[column] + + # 处理Na值 + if pd.isna(value): + json_obj[column] = None + else: + # 处理数据值:如果是字符串且包含英文字母,转换为小写 + if isinstance(value, str) and any(c.isalpha() and ord(c) < 128 for c in value): + # 将数据值中的英文字母转换为小写 + value = value.lower() + + # 将英文字段名转换为小写 + # 检查字段名是否完全是英文字符(包括字母、数字、下划线) + if all(ord(c) < 128 for c in column if c.isalnum() or c in '_'): + # 完全是英文字段名,转换为小写 + json_obj[column.lower()] = value + else: + # 包含中文字符的字段名保持不变 + json_obj[column] = value + + # 添加表名字段 + json_obj['表名'] = base_name + + json_data.append(json_obj) + + # 生成JSON文件路径 + json_filename = f"{base_name}.json" + json_path = os.path.join(self.output_dir, json_filename) + + # 保存JSON文件 + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(json_path) / 1024 # KB + print(f" [OK] JSON已生成: {json_filename} ({file_size:.1f} KB)") + print(f" 数据量: {len(json_data)} 条记录") + + return json_path + + except Exception as e: + print(f" [ERROR] CSV转JSON失败: {e}") + import traceback + traceback.print_exc() + return "" + + def process_single_csv(self, csv_path: str, base_name: str) -> bool: + """ + 处理单个CSV文件:CSV → JSON + + Args: + csv_path: CSV文件路径 + base_name: 文件基础名 + + Returns: + 是否成功 + """ + print(f"\n{'='*60}") + print(f"处理: {os.path.basename(csv_path)}") + print(f"{'='*60}") + + # 步骤1: 读取CSV文件并预览 + try: + # 尝试多种编码读取CSV文件 + encodings = ['utf-8-sig', 'gb2312', 'gbk', 'utf-8'] + df = None + + for encoding in encodings: + try: + df = pd.read_csv(csv_path, encoding=encoding) + break + except (UnicodeDecodeError, UnicodeError): + continue + except Exception as e: + print(f"[ERROR] 编码 {encoding} 错误: {e}") + continue + + if df is None or df.empty: + print(f"[ERROR] CSV文件为空或读取失败") + return False + + print(f"\n[INFO] 数据预览:") + print(df.head(3)) + print(f"\n[INFO] 数据形状: {df.shape[0]}行 × {df.shape[1]}列") + + except Exception as e: + print(f"[ERROR] 读取CSV失败: {e}") + return False + + # 步骤2: 转换为JSON + json_path = self.convert_csv_to_json_direct(csv_path, base_name) + + if json_path: + print(f"\n[OK] 转换完成!") + return True + else: + print(f"\n[ERROR] 转换失败") + return False + + def convert_csv_directory(self, csv_dir: str) -> Dict: + """ + 处理CSV目录下的所有CSV文件 + + Args: + csv_dir: CSV文件目录 + + Returns: + 处理结果统计 + """ + print("="*60) + print("CSV转JSON工具") + print("="*60) + print(f"CSV输入目录: {csv_dir}") + print(f"JSON输出目录: {self.output_dir}") + + # 查找CSV文件 + csv_files = self.find_csv_files(csv_dir) + + if not csv_files: + print(f"\n[WARN] 未找到任何CSV文件") + return {'total': 0, 'success': 0, 'failed': 0} + + print(f"\n[INFO] 发现 {len(csv_files)} 个CSV文件") + + # 处理每个文件 + success_count = 0 + failed_count = 0 + results = [] + + for csv_path, base_name in csv_files: + if self.process_single_csv(csv_path, base_name): + success_count += 1 + results.append({'file': os.path.basename(csv_path), 'status': 'success'}) + else: + failed_count += 1 + results.append({'file': os.path.basename(csv_path), 'status': 'failed'}) + + # 输出统计信息 + print(f"\n{'='*60}") + print("转换完成!") + print(f"{'='*60}") + print(f"总计: {len(csv_files)} 个文件") + print(f"成功: {success_count} 个文件") + print(f"失败: {failed_count} 个文件") + + # 显示生成的JSON文件 + if success_count > 0: + print(f"\n生成的JSON文件:") + json_files = glob.glob(os.path.join(self.output_dir, "*.json")) + for json_file in sorted(json_files): + file_size = os.path.getsize(json_file) / 1024 # KB + filename = os.path.basename(json_file) + print(f" - {filename} ({file_size:.1f} KB)") + + return { + 'total': len(csv_files), + 'success': success_count, + 'failed': failed_count, + 'results': results + } + def main(): """主函数 - 演示用法""" # 配置路径 - input_dir = r"d:\Code\Test\Table_Data_Test\Data" - output_dir = r"d:\Code\Test\Table_Data_Test\Data_Export_Json" + input_dir = "Data" + csv_input_dir = "Data_Export_CSV" + output_dir = "Data_Export_Json" # 创建转换器实例 converter = ExcelToJsonConverter(input_dir, output_dir) - # 处理所有文件 - result = converter.process_all() + # 优先使用CSV模式 + if os.path.exists(csv_input_dir) and os.listdir(csv_input_dir): + # CSV模式:使用现有的CSV文件 + print(f"\n[INFO] 检测到CSV文件,使用CSV模式") + print(f" 从 {csv_input_dir} 读取CSV文件") + result = converter.convert_csv_directory(csv_input_dir) + else: + # Excel模式:使用Excel文件(备选方案) + excel_files = converter.find_excel_files() + if excel_files: + print(f"\n[INFO] 未找到CSV文件,使用Excel模式") + print(f" 从 {input_dir} 读取Excel文件") + result = converter.process_all() + else: + print(f"\n[WARN] 未找到CSV文件和Excel文件") + result = {'total': 0, 'success': 0, 'failed': 0} # 输出结果 print(f"\n[INFO] 处理结果: {result}") diff --git a/config.py b/config.py index 6eda789..d4318f3 100644 --- a/config.py +++ b/config.py @@ -20,6 +20,10 @@ class QAConfig: self.INPUT_DIR = "Data_Export_Json" self.OUTPUT_DIR = "Data_QA_Outputs" + # ========== 随机抽取配置 ========== + # 从final.json随机抽取的记录个数,生成selected.json文件 + self.SELECT_COUNT = 3000 + # ========== 问题数量控制 ========== # 每个数据项生成的基本问题数量(简单模式下) self.BASIC_QUESTIONS_PER_ITEM = 1 diff --git a/merge_json_fast.py b/merge_json_fast.py new file mode 100644 index 0000000..475386f --- /dev/null +++ b/merge_json_fast.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +优化版JSON文件合并脚本 +根据字段英文名匹配逻辑模型表、物理模型表和元素治理模板表的数据 +""" + +import json +import os +from collections import defaultdict +from typing import Dict, List, Any + +def load_json_file(file_path: str) -> List[Dict[str, Any]]: + """加载JSON文件""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录") + return data + except Exception as e: + print(f"[ERROR] 加载文件失败 {file_path}: {e}") + return [] + +def build_index(records: List[Dict], field_name: str) -> Dict[str, List[Dict]]: + """为记录列表建立索引,加速查找""" + index = defaultdict(list) + for record in records: + field_value = record.get(field_name) + if field_value: + index[field_value].append(record) + print(f"[INFO] 建立索引完成: {len(index)} 个唯一字段值") + return index + +def merge_records_optimized(logical_index: Dict, physical_index: Dict, element_records: List[Dict]) -> List[Dict]: + """ + 使用索引优化合并三个表的记录 + """ + merged_data = [] + processed_fields = set() + + # 遍历元素治理表 + print(f"\n[INFO] 开始合并数据...") + for i, element_record in enumerate(element_records): + if i % 5000 == 0: + print(f" 处理进度: {i}/{len(element_records)}") + + field_english_name = element_record.get('字段英文名') + if not field_english_name or field_english_name in processed_fields: + continue + + processed_fields.add(field_english_name) + + # 创建合并记录 + merged_record = {} + + # 添加元素治理模板表的数据 + for key, value in element_record.items(): + if key != '表名': + merged_record[key] = value + + # 查找逻辑模型表中的匹配记录 + logical_matches = logical_index.get(field_english_name, []) + + # 查找物理模型表中的匹配记录 + physical_matches = physical_index.get(field_english_name, []) + + # 添加逻辑模型表的数据(添加前缀避免冲突) + if logical_matches: + for logical_match in logical_matches: + for key, value in logical_match.items(): + if key not in ['表名', '字段英文名']: + new_key = f"逻辑模型_{key}" + merged_record[new_key] = value + + # 只有当有匹配数据时才添加表名信息 + merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表' + + # 添加物理模型表的数据(添加前缀避免冲突) + if physical_matches: + for physical_match in physical_matches: + for key, value in physical_match.items(): + if key not in ['表名', '字段英文名']: + new_key = f"物理模型_{key}" + merged_record[new_key] = value + + # 只有当有匹配数据时才添加表名信息 + merged_record['物理模型表_表名'] = '远光数据架构物理模型表' + + # 添加元素治理表名(始终存在) + merged_record['元素治理表_表名'] = '远光数据架构元素治理模板表' + + merged_data.append(merged_record) + + print(f" 完成合并: {len(merged_data)} 条记录") + + return merged_data + +def add_unmatched_records_optimized(merged_data: List[Dict], + logical_index: Dict, + physical_index: Dict) -> List[Dict]: + """ + 添加未匹配的记录 + """ + print(f"\n[INFO] 处理未匹配的记录...") + + # 获取所有已处理的字段英文名 + processed_fields = {record.get('字段英文名') for record in merged_data if record.get('字段英文名')} + + # 添加逻辑模型表中未匹配的记录 + logical_unmatched = len(logical_index) - len([f for f in logical_index if f in processed_fields]) + print(f" 逻辑模型表未匹配: {logical_unmatched} 条") + + for field_name, logical_matches in logical_index.items(): + if field_name not in processed_fields: + for logical_match in logical_matches: + merged_record = {'字段英文名': field_name} + + for key, value in logical_match.items(): + if key not in ['表名', '字段英文名']: + merged_record[f"逻辑模型_{key}"] = value + + merged_record['逻辑模型表_表名'] = '远光数据架构逻辑模型表' + merged_record['物理模型表_表名'] = None + merged_record['元素治理表_表名'] = None + + merged_data.append(merged_record) + + # 添加物理模型表中未匹配的记录 + physical_unmatched = len(physical_index) - len([f for f in physical_index if f in processed_fields]) + print(f" 物理模型表未匹配: {physical_unmatched} 条") + + for field_name, physical_matches in physical_index.items(): + if field_name not in processed_fields: + # 检查是否已经添加过(通过逻辑模型表) + already_added = any(r.get('字段英文名') == field_name for r in merged_data) + + if not already_added: + for physical_match in physical_matches: + merged_record = {'字段英文名': field_name} + + for key, value in physical_match.items(): + if key not in ['表名', '字段英文名']: + merged_record[f"物理模型_{key}"] = value + + merged_record['逻辑模型表_表名'] = None + merged_record['物理模型表_表名'] = '远光数据架构物理模型表' + merged_record['元素治理表_表名'] = None + + merged_data.append(merged_record) + + return merged_data + +def main(): + """主函数""" + print("="*60) + print("优化版JSON文件合并工具") + print("="*60) + + # 文件路径 + logical_json_path = "Data_Export_Json/远光数据架构逻辑模型表.json" + physical_json_path = "Data_Export_Json/远光数据架构物理模型表.json" + element_json_path = "Data_Export_Json/远光数据架构元素治理模板表.json" + output_path = "Data_Export_Json/final.json" + + # 加载JSON文件 + print("\n[INFO] 加载JSON文件...") + logical_records = load_json_file(logical_json_path) + physical_records = load_json_file(physical_json_path) + element_records = load_json_file(element_json_path) + + if not (logical_records and physical_records and element_records): + print("\n[ERROR] 无法加载所有JSON文件") + return + + # 建立索引 + print(f"\n[INFO] 建立索引加速查找...") + logical_index = build_index(logical_records, '字段英文名') + physical_index = build_index(physical_records, '字段英文名') + + # 合并数据(只处理元素治理表中存在的字段) + merged_data = merge_records_optimized(logical_index, physical_index, element_records) + + # 不再添加未匹配的记录,因为用户只关心元素治理表中的字段 + + # 保存合并后的数据 + try: + print(f"\n[INFO] 保存合并数据到 {output_path}...") + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(merged_data, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(output_path) / 1024 # KB + print(f"\n[OK] 合并完成!") + print(f" 输出文件: {output_path}") + print(f" 合并记录: {len(merged_data)} 条") + print(f" 文件大小: {file_size:.1f} KB") + + # 显示统计信息 + three_table_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and r.get('物理模型表_表名')) + element_logical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('逻辑模型表_表名') and not r.get('物理模型表_表名')) + element_physical_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and r.get('物理模型表_表名') and not r.get('逻辑模型表_表名')) + element_only_match = sum(1 for r in merged_data if r.get('元素治理表_表名') and not r.get('逻辑模型表_表名') and not r.get('物理模型表_表名')) + logical_only_count = sum(1 for r in merged_data if r.get('逻辑模型表_表名') and not r.get('元素治理表_表名')) + physical_only_count = sum(1 for r in merged_data if r.get('物理模型表_表名') and not r.get('元素治理表_表名')) + + print(f"\n[INFO] 统计信息:") + print(f" 三表匹配: {three_table_match} 条") + print(f" 元素治理+逻辑模型: {element_logical_match} 条") + print(f" 元素治理+物理模型: {element_physical_match} 条") + print(f" 仅元素治理: {element_only_match} 条") + print(f" 仅逻辑模型: {logical_only_count} 条") + print(f" 仅物理模型: {physical_only_count} 条") + + # 显示前3条记录的字段名 + if merged_data: + print(f"\n[INFO] 合并记录示例:") + sample_record = merged_data[0] + print(f" 字段数量: {len(sample_record)}") + print(f" 字段名: {list(sample_record.keys())[:10]}...") # 只显示前10个字段 + + except Exception as e: + print(f"\n[ERROR] 保存文件失败: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main() diff --git a/qa_generator.py b/qa_generator.py index cfea8d1..1dbfe48 100644 --- a/qa_generator.py +++ b/qa_generator.py @@ -1,92 +1,20 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -QA生成器 - 整合版 -整合所有功能于一个文件,减少冗余 +QA生成器 - 简化版 +基于selected.json文件生成问答对 +只使用字段中文名、字段英文名、抽象中文名作为提问基础 """ import json import os import random from typing import List, Dict, Any - - -class QAConfig: - """QA生成配置类""" - - def __init__(self): - # 基础配置 - self.RANDOM_SEED = 42 - self.INPUT_DIR = "Data_Export_Json" - self.OUTPUT_DIR = "Data_QA_Outputs" - - # 复杂程度控制 (1-5) - self.COMPLEXITY_LEVEL = 3 - - # 问题数量控制 - self.BASIC_QUESTIONS_PER_ITEM = 1 - self.MAX_QUESTIONS_PER_ITEM = 10 - self.MULTI_COLUMN_RATIO = 0.3 - - # 输出控制 - self.SHUFFLE_OUTPUT = True - self.GENERATE_REPORT = True - self.VERBOSE_LOG = True - - # 数据文件配置 - self.DATA_FILES = [ - {"name": "元素治理模板", "file": "元素治理模板.json", "output": "元素治理模板_QA.json", "enabled": True}, - {"name": "物理模型", "file": "物理模型.json", "output": "物理模型_QA.json", "enabled": True}, - {"name": "逻辑模型", "file": "逻辑模型.json", "output": "逻辑模型_QA.json", "enabled": True} - ] - - # 初始化修饰语和连接词 - self._init_templates() - - def _init_templates(self): - """初始化模板列表""" - if self.COMPLEXITY_LEVEL <= 2: - # 简单模式 - self.QUESTION_PREFIXES = ["请告诉我", "查询", "请问"] - self.ANSWER_PREFIXES = ["根据表记录,该字段的", "查询结果显示,", "经查询,该字段的"] - self.ANSWER_SUFFIXES = ["。", "。"] - self.CONNECTORS = ["和", "与"] - self.SINGLE_TEMPLATES = 6 if self.COMPLEXITY_LEVEL == 2 else 3 - self.MULTI_TEMPLATES = 1 if self.COMPLEXITY_LEVEL == 2 else 0 - self.MULTI_RATIO = 0.1 if self.COMPLEXITY_LEVEL == 2 else 0.0 - else: - # 普通/复杂模式 - self.QUESTION_PREFIXES = ["请告诉我", "查询", "请问", "在", "请解释", "请输出", "请列举", "请说明", "请查找", "请确认"] - self.ANSWER_PREFIXES = ["根据表记录,该字段的", "查询结果显示,", "经查询,该字段的", "根据数据库记录,", "在表中,此字段的", "查询结果:", "经系统查询,", "根据记录显示,", "在数据中,该字段的", "查询得知,该字段的"] - self.ANSWER_SUFFIXES = ["。", ",请参考。", ",详情如上。", ",以上信息为准。", ",望知悉。", ",如需更多信息请联系。", ",希望能帮到您。", ",祝您工作顺利。", ",谢谢。", "。"] - self.CONNECTORS = ["和", "与", "及", "、", ",还有", "以及"] - self.SINGLE_TEMPLATES = 12 - self.MULTI_TEMPLATES = 5 - self.MULTI_RATIO = self.MULTI_COLUMN_RATIO - - def get_random_element(self, elements: List[str]) -> str: - """从列表中随机获取一个元素""" - return random.choice(elements) if elements else "" - - def update_config(self, **kwargs): - """更新配置参数""" - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) - if key == 'COMPLEXITY_LEVEL': - self._init_templates() # 重新初始化模板 - - def print_config(self): - """打印当前配置""" - print(f"\n复杂程度等级: {self.COMPLEXITY_LEVEL}") - print(f"单列模板数: {self.SINGLE_TEMPLATES}") - print(f"多列模板数: {self.MULTI_TEMPLATES}") - print(f"多列占比: {self.MULTI_RATIO}") - print(f"输出目录: {self.OUTPUT_DIR}") +from config import QAConfig class QAGenerator: - """QA生成器 - 整合版""" + """QA生成器 - 简化版""" def __init__(self, config: QAConfig = None): """初始化生成器""" @@ -94,244 +22,181 @@ class QAGenerator: os.makedirs(self.config.OUTPUT_DIR, exist_ok=True) random.seed(self.config.RANDOM_SEED) + # 问题模板前缀 + self.QUESTION_PREFIXES = [ + "请告诉我", + "查询", + "请问", + "请解释", + "请输出", + "请列举", + "请说明", + "请查找", + "请确认" + ] + + # 答句模板前缀 + self.ANSWER_PREFIXES = [ + "该字段的", + "查询结果显示,", + "经查询,该字段的", + "根据记录显示,", + "该数据的", + "查询结果:", + "经系统查询,", + "根据记录,", + "该值的" + ] + + # 答句模板后缀 + self.ANSWER_SUFFIXES = [ + "。", + "。" + ] + + def get_random_element(self, elements: List[str]) -> str: + """从列表中随机获取一个元素""" + return random.choice(elements) if elements else "" + def load_json(self, file_path: str) -> List[Dict]: """加载JSON文件""" with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) - def generate_single_qa(self, item: Dict, template_count: int, data_type: str) -> List[Dict]: - """生成单列QA - 整合了三种数据类型的模板""" + def generate_qa_for_item(self, item: Dict) -> List[Dict]: + """为单个数据项生成问答对 + + 基于字段中文名、字段英文名询问其他所有字段 + """ qa_pairs = [] - answer_prefixes = self.config.ANSWER_PREFIXES - answer_suffixes = self.config.ANSWER_SUFFIXES - prefixes = self.config.QUESTION_PREFIXES - if data_type == "element": - # 元素治理模板模板 - templates = [] - table_name = item.get("表名", "元素治理模板") + # 获取两个核心字段 + field_chinese_name = item.get('字段中文名', '') + field_english_name = item.get('字段英文名', '') - if item.get("业务领域名称") and item.get("数据元素中文名"): - templates.append((f"在{table_name}中,业务领域「{item['业务领域名称']}」对应的数据元素中文名是什么?", f"该数据元素为「{item['数据元素中文名']}」")) - - if item.get("业务领域名称") and item.get("数据元素中文名"): - templates.append((f"「{item['数据元素中文名']}」这个数据元素在{table_name}中属于哪个业务领域?", f"该数据元素属于「{item['业务领域名称']}」")) - - if item.get("数据元素中文名") and item.get("值类型"): - templates.append((f"查询{table_name}中,数据元素「{item['数据元素中文名']}」的值类型是什么?", f"值类型为「{item['值类型']}」")) - - if item.get("数据元素中文名") and item.get("总长度"): - templates.append((f"在{table_name}里,「{item['数据元素中文名']}」的总长度设置是多少?", f"总长度为{item['总长度']}")) - - if item.get("数据元素中文名") and item.get("类别"): - templates.append((f"请确认「{item['数据元素中文名']}」在{table_name}中属于哪个类别?", f"该数据元素属于「{item['类别']}」类别")) - - if item.get("数据元素中文名") and item.get("数据元素英文名"): - templates.append((f"请查找{table_name}中「{item['数据元素中文名']}」对应的英文名是什么?", f"英文名为「{item['数据元素英文名']}」")) - - if item.get("数据元素中文名") and item.get("是否枚举"): - templates.append((f"「{item['数据元素中文名']}」这个数据元素在{table_name}中是否枚举?", f"是否枚举为「{item['是否枚举']}」")) - - if item.get("数据元素中文名") and item.get("枚举数量"): - templates.append((f"请问在{table_name}中,「{item['数据元素中文名']}」的枚举数量是多少?", f"枚举数量为{item['枚举数量']}")) - - if item.get("数据元素中文名") and item.get("小数位"): - templates.append((f"请说明{table_name}中「{item['数据元素中文名']}」的小数位设置?", f"小数位为{item['小数位']}")) - - if item.get("数据元素中文名") and item.get("抽象元素中文名"): - templates.append((f"在{table_name}里,「{item['数据元素中文名']}」的抽象元素中文名是什么?", f"抽象元素中文名为「{item['抽象元素中文名']}」")) - - if item.get("数据元素中文名") and item.get("说明"): - templates.append((f"请解释「{item['数据元素中文名']}」在{table_name}中的作用和含义", f"该数据元素的说明为「{item['说明']}」")) - - if item.get("数据元素中文名") and item.get("是否上线"): - templates.append((f"请问「{item['数据元素中文名']}」在{table_name}中是否已上线?", f"是否上线为「{item['是否上线']}」")) - - # 生成QA - for i, (question, answer) in enumerate(templates[:template_count]): + # 基于字段中文名提问 + if field_chinese_name: + # 询问值类型 + if item.get('值类型'): + question = f"字段中文名为'{field_chinese_name}'的值类型是什么?" + answer = f"值类型为「{item['值类型']}」" qa_pairs.append({ "instruct": question, "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}{answer}{self.config.get_random_element(answer_suffixes)}" + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" }) - elif data_type == "physical": - # 物理模型模板 - table_name = item.get("表名", "物理模型") - templates = [] - - if item.get("物理模型属性中文名") and item.get("值类型"): - templates.append((f"在{table_name}中,「{item['物理模型属性中文名']}」的值类型是什么?", f"该属性的值类型为「{item['值类型']}」")) - - if item.get("物理模型属性中文名") and item.get("长度"): - templates.append((f"请问「{item['物理模型属性中文名']}」在{table_name}中的长度是多少?", f"该属性长度为{item['长度']}")) - - if item.get("物理模型属性中文名") and item.get("小数位") is not None: - templates.append((f"请确认{table_name}中「{item['物理模型属性中文名']}」的小数位设置", f"该属性小数位为{item['小数位']}")) - - if item.get("物理模型属性中文名") and item.get("关联数据元素"): - templates.append((f"「{item['物理模型属性中文名']}」这个属性在{table_name}中关联哪个数据元素?", f"该属性关联的数据元素为「{item['关联数据元素']}」")) - - if item.get("物理模型属性中文名") and item.get("物理模型中文名"): - templates.append((f"请查找「{item['物理模型属性中文名']}」属于哪个物理模型?", f"该属性属于「{item['物理模型中文名']}」")) - - if item.get("物理模型属性中文名") and item.get("说明"): - templates.append((f"请说明「{item['物理模型属性中文名']}」在{table_name}中的作用和用途", f"该属性的说明为「{item['说明']}」")) - - if item.get("物理模型属性英文名") and item.get("物理模型属性中文名"): - templates.append((f"在{table_name}中,属性「{item['物理模型属性英文名']}」的中文名是什么?", f"该属性的中文名为「{item['物理模型属性中文名']}」")) - - if item.get("物理模型中文名") and item.get("物理模型英文名"): - templates.append((f"「{item['物理模型中文名']}」对应的物理模型英文名是什么?", f"该物理模型的英文名为「{item['物理模型英文名']}」")) - - if item.get("物理模型英文名") and item.get("物理模型中文名"): - templates.append((f"在{table_name}中,物理模型「{item['物理模型英文名']}」的中文名是什么?", f"该物理模型的中文名为「{item['物理模型中文名']}」")) - - # 生成QA - for i, (question, answer) in enumerate(templates[:template_count]): + # 询问是否枚举 + if item.get('是否枚举'): + question = f"字段中文名为'{field_chinese_name}'是否枚举?" + answer = f"是否枚举为「{item['是否枚举']}」" qa_pairs.append({ "instruct": question, "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}{answer}{self.config.get_random_element(answer_suffixes)}" + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" }) - elif data_type == "logical": - # 逻辑模型模板 - table_name = item.get("表名", "逻辑模型") - templates = [] - - if item.get("业务领域") and item.get("逻辑模型中文名"): - templates.append((f"在{table_name}中,业务领域为「{item['业务领域']}」的逻辑模型中文名是什么?", f"该业务领域对应的逻辑模型中文名为「{item['逻辑模型中文名']}」")) - - if item.get("业务领域") and item.get("逻辑模型中文名"): - templates.append((f"「{item['逻辑模型中文名']}」这个逻辑模型在{table_name}中属于哪个业务领域?", f"该逻辑模型属于「{item['业务领域']}」")) - - if item.get("字段英文名") and item.get("字段中文名"): - templates.append((f"请告诉我「{item['字段英文名']}」在{table_name}中的中文名是什么?", f"该字段的中文名为「{item['字段中文名']}」")) - - if item.get("字段中文名") and item.get("字段英文名"): - templates.append((f"在{table_name}中,「{item['字段中文名']}」对应的英文名是什么?", f"该字段的英文名为「{item['字段英文名']}」")) - - if item.get("字段中文名") and item.get("值类型"): - templates.append((f"请问「{item['字段中文名']}」在{table_name}中的值类型是什么?", f"该字段的值类型为「{item['值类型']}」")) - - if item.get("字段中文名") and item.get("长度"): - templates.append((f"查询{table_name}中,「{item['字段中文名']}」的长度是多少?", f"该字段长度为{item['长度']}")) - - if item.get("字段中文名") and item.get("小数位") is not None: - templates.append((f"请确认「{item['字段中文名']}」在{table_name}中的小数位设置", f"该字段小数位为{item['小数位']}")) - - if item.get("逻辑模型中文名") and item.get("动态查询能力"): - templates.append((f"「{item['逻辑模型中文名']}」的动态查询能力是什么级别?", f"该逻辑模型的动态查询能力为「{item['动态查询能力']}」")) - - if item.get("字段中文名") and item.get("关联数据元素英文名"): - templates.append((f"在{table_name}中,「{item['字段中文名']}」关联的数据元素英文名是什么?", f"该字段关联的数据元素英文名为「{item['关联数据元素英文名']}」")) - - if item.get("逻辑模型中文名") and item.get("逻辑模型英文名"): - templates.append((f"请查找「{item['逻辑模型中文名']}」对应的逻辑模型英文名", f"该逻辑模型的英文名为「{item['逻辑模型英文名']}」")) - - # 生成QA - for i, (question, answer) in enumerate(templates[:template_count]): + # 询问枚举数量 + if item.get('枚举数量') is not None: + question = f"字段中文名为'{field_chinese_name}'的枚举数量是多少?" + answer = f"枚举数量为{item['枚举数量']}" qa_pairs.append({ "instruct": question, "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}{answer}{self.config.get_random_element(answer_suffixes)}" + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问总长度 + if item.get('总长度') is not None: + question = f"字段中文名为'{field_chinese_name}'的总长度是多少?" + answer = f"总长度为{item['总长度']}" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问小数位 + if item.get('小数位') is not None: + question = f"字段中文名为'{field_chinese_name}'的小数位是多少?" + answer = f"小数位为{item['小数位']}" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问字段英文名 + if field_english_name: + question = f"字段中文名为'{field_chinese_name}'的字段英文名是什么?" + answer = f"字段英文名为「{field_english_name}」" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 基于字段英文名提问 + if field_english_name: + # 询问值类型 + if item.get('值类型'): + question = f"字段英文名为'{field_english_name}'的值类型是什么?" + answer = f"值类型为「{item['值类型']}」" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问是否枚举 + if item.get('是否枚举'): + question = f"字段英文名为'{field_english_name}'是否枚举?" + answer = f"是否枚举为「{item['是否枚举']}」" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问总长度 + if item.get('总长度') is not None: + question = f"字段英文名为'{field_english_name}'的总长度是多少?" + answer = f"总长度为{item['总长度']}" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问小数位 + if item.get('小数位') is not None: + question = f"字段英文名为'{field_english_name}'的小数位是多少?" + answer = f"小数位为{item['小数位']}" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" + }) + + # 询问字段中文名 + if field_chinese_name: + question = f"字段英文名为'{field_english_name}'的字段中文名是什么?" + answer = f"字段中文名为「{field_chinese_name}」" + qa_pairs.append({ + "instruct": question, + "input": "", + "output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}" }) return qa_pairs - def generate_multi_qa(self, item: Dict, template_count: int, data_type: str) -> List[Dict]: - """生成多列QA""" - qa_pairs = [] - answer_prefixes = self.config.ANSWER_PREFIXES - answer_suffixes = self.config.ANSWER_SUFFIXES - connectors = self.config.CONNECTORS - - if data_type == "element": - table_name = item.get("表名", "元素治理模板") - templates = [] - - if item.get("数据元素中文名") and item.get("值类型") and item.get("总长度"): - connector = self.config.get_random_element(connectors) - templates.append(( - f"请列举{table_name}中「{item['数据元素中文名']}」的值类型和总长度", - f"该数据元素的{connector}值类型为「{item['值类型']}」,总长度为{item['总长度']}" - )) - - if item.get("数据元素中文名") and item.get("类别") and item.get("业务领域名称") and item.get("是否枚举"): - connector1 = self.config.get_random_element(connectors[:3]) - connector2 = self.config.get_random_element(connectors[3:]) - templates.append(( - f"请输出「{item['数据元素中文名']}」在{table_name}中的类别、业务领域和是否枚举信息", - f"该数据元素的类别为「{item['类别']}」,{connector1}业务领域为「{item['业务领域名称']}」,{connector2}是否枚举为「{item['是否枚举']}」" - )) - - if item.get("数据元素中文名") and item.get("值类型") and item.get("总长度") and item.get("小数位"): - connector = self.config.get_random_element(connectors) - templates.append(( - f"在{table_name}中,「{item['数据元素中文名']}」的值类型、长度和小数位分别是多少?", - f"该数据元素的值类型为「{item['值类型']}」,{connector}长度为{item['总长度']},小数位为{item['小数位']}" - )) - - if item.get("数据元素中文名") and item.get("数据元素英文名") and item.get("说明"): - connector = self.config.get_random_element(connectors) - templates.append(( - f"请查找「{item['数据元素中文名']}」在{table_name}中的英文名和说明信息", - f"该数据元素的英文名为「{item['数据元素英文名']}」,{connector}说明为「{item['说明']}」" - )) - - if item.get("数据元素中文名") and item.get("枚举数量") and item.get("元素取值范围\n(枚举类型名称)"): - connector = self.config.get_random_element(connectors) - range_value = item['元素取值范围\n(枚举类型名称)'] or "无" - templates.append(( - f"在{table_name}中,「{item['数据元素中文名']}」的枚举数量和取值范围分别是什么?", - f"该数据元素的枚举数量为{item['枚举数量']},{connector}取值范围为「{range_value}」" - )) - - # 生成QA - for i, (question, answer) in enumerate(templates[:template_count]): - qa_pairs.append({ - "instruct": question, - "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}{answer}{self.config.get_random_element(answer_suffixes)}" - }) - - # 为物理模型和逻辑模型也添加类似的多列模板... - elif data_type == "physical": - table_name = item.get("表名", "物理模型") - if item.get("物理模型属性中文名") and item.get("值类型") and item.get("长度"): - connector = self.config.get_random_element(connectors) - qa_pairs.append({ - "instruct": f"请列举「{item['物理模型属性中文名']}」在{table_name}中的值类型和长度", - "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}该属性的{connector}值类型为「{item['值类型']}」,长度为{item['长度']}{self.config.get_random_element(answer_suffixes)}" - }) - - elif data_type == "logical": - table_name = item.get("表名", "逻辑模型") - if item.get("字段中文名") and item.get("值类型") and item.get("长度"): - connector = self.config.get_random_element(connectors) - qa_pairs.append({ - "instruct": f"请列举「{item['字段中文名']}」在{table_name}中的值类型和长度", - "input": "", - "output": f"{self.config.get_random_element(answer_prefixes)}该字段的{connector}值类型为「{item['值类型']}」,长度为{item['长度']}{self.config.get_random_element(answer_suffixes)}" - }) - - return qa_pairs - - def generate_qa_for_data(self, data: List[Dict], data_type: str) -> List[Dict]: - """为指定数据类型生成QA""" + def generate_qa_for_data(self, data: List[Dict]) -> List[Dict]: + """为所有数据生成QA""" all_qa = [] for item in data: - # 生成单列QA - single_qa = self.generate_single_qa(item, self.config.SINGLE_TEMPLATES, data_type) - all_qa.extend(single_qa) - - # 根据概率生成多列QA - if random.random() < self.config.MULTI_RATIO: - multi_qa = self.generate_multi_qa(item, self.config.MULTI_TEMPLATES, data_type) - all_qa.extend(multi_qa) + qa_pairs = self.generate_qa_for_item(item) + all_qa.extend(qa_pairs) return all_qa @@ -347,237 +212,76 @@ class QAGenerator: with open(output_path, 'w', encoding='utf-8') as f: json.dump(qa_pairs, f, ensure_ascii=False, indent=2) - size_mb = os.path.getsize(output_path) / (1024 * 1024) - if self.config.VERBOSE_LOG: - print(f"[OK] 已生成: {output_path} (共 {len(qa_pairs)} 条问答对, {size_mb:.1f}MB)") - else: - print(f"[OK] 已生成: {output_path} (共 {len(qa_pairs)} 条问答对)") + size_kb = os.path.getsize(output_path) / 1024 + print(f"[OK] 已生成: {output_path}") + print(f" 共 {len(qa_pairs)} 条问答对, {size_kb:.1f} KB") - def merge_to_train(self, output_file: str = "train.json") -> Dict: - """合并QA文件为train.json""" - all_qa_pairs = [] - file_stats = {} - total_files = 0 - - # 遍历输出目录中的所有QA文件 - for filename in os.listdir(self.config.OUTPUT_DIR): - if filename.endswith('.json') and not filename.startswith('QA生成报告') and filename != 'train_stats.json': - file_path = os.path.join(self.config.OUTPUT_DIR, filename) - - try: - with open(file_path, 'r', encoding='utf-8') as f: - qa_data = json.load(f) - - if isinstance(qa_data, list) and all(isinstance(item, dict) and 'instruct' in item and 'output' in item for item in qa_data): - all_qa_pairs.extend(qa_data) - file_stats[filename] = len(qa_data) - total_files += 1 - if self.config.VERBOSE_LOG: - print(f"[OK] 已合并: {filename} ({len(qa_data)} 条问答对)") - - except Exception as e: - print(f"[ERROR] 读取文件失败 {filename}: {str(e)}") - - # 打乱顺序 - if self.config.SHUFFLE_OUTPUT and all_qa_pairs: - random.shuffle(all_qa_pairs) - if self.config.VERBOSE_LOG: - print(f"\n[INFO] 已打乱 {len(all_qa_pairs)} 条问答对顺序") - - # 保存train.json - train_path = os.path.join(self.config.OUTPUT_DIR, output_file) - with open(train_path, 'w', encoding='utf-8') as f: - json.dump(all_qa_pairs, f, ensure_ascii=False, indent=2) - - file_size_mb = os.path.getsize(train_path) / (1024 * 1024) - - # 生成统计 - stats = { - "合并时间": "2025-12-18", - "处理文件数": total_files, - "各文件问答对数量": file_stats, - "总问答对数量": len(all_qa_pairs), - "输出文件大小": f"{file_size_mb:.2f} MB", - "打乱顺序": self.config.SHUFFLE_OUTPUT - } - - # 保存统计信息 - stats_file = os.path.join(self.config.OUTPUT_DIR, "train_stats.json") - with open(stats_file, 'w', encoding='utf-8') as f: - json.dump(stats, f, ensure_ascii=False, indent=2) - - print(f"\n[SUCCESS] 合并完成!") - print(f"[OUTPUT] {train_path}") - print(f"[TOTAL] 总计: {len(all_qa_pairs):,} 条问答对") - print(f"[SIZE] 文件大小: {file_size_mb:.2f} MB") - - return stats - - def generate_report(self, qa_counts: Dict[str, int]): + def generate_report(self, total_qa_count: int): """生成生成报告""" - if not self.config.GENERATE_REPORT: - return - report = { - "生成时间": "2025-12-18", - "版本": "整合版", - "配置信息": { - "复杂程度等级": self.config.COMPLEXITY_LEVEL, - "随机种子": self.config.RANDOM_SEED, - "多列查询占比": self.config.MULTI_COLUMN_RATIO, - "打乱输出": self.config.SHUFFLE_OUTPUT - }, - "总文件数": len(qa_counts), - "各文件问答对数量": qa_counts, - "总计问答对数量": sum(qa_counts.values()), - "说明": "所有问答对均基于原始JSON数据生成,未进行任何编撰或修改" + "生成时间": "2025-12-31", + "版本": "简化版", + "输入文件": "selected.json", + "输出目录": self.config.OUTPUT_DIR, + "随机种子": self.config.RANDOM_SEED, + "总问答对数量": total_qa_count, + "说明": "基于字段中文名、字段英文名、抽象中文名询问其他所有字段" } report_path = os.path.join(self.config.OUTPUT_DIR, "QA生成报告.json") with open(report_path, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) - if self.config.VERBOSE_LOG: - print(f"[OK] 已生成: {report_path}") + print(f"[OK] 已生成: {report_path}") - def process_all(self): - # 清理输出目录中的旧文件 - print("[INFO] 清理输出目录中的旧文件...") - if os.path.exists(self.config.OUTPUT_DIR): - for filename in os.listdir(self.config.OUTPUT_DIR): - file_path = os.path.join(self.config.OUTPUT_DIR, filename) - try: - if os.path.isfile(file_path): - os.remove(file_path) - if self.config.VERBOSE_LOG: - print(f" [DEL] 已删除: {filename}") - except Exception as e: - print(f" [WARN] 删除文件失败 {filename}: {str(e)}") + def process_selected_json(self): + """处理selected.json文件""" + input_file = os.path.join(self.config.INPUT_DIR, "selected.json") - """处理所有数据文件""" - qa_counts = {} + if not os.path.exists(input_file): + print(f"[ERROR] 文件不存在: {input_file}") + return - for file_info in self.config.DATA_FILES: - if not file_info["enabled"]: - if self.config.VERBOSE_LOG: - print(f"[SKIP] 已跳过: {file_info['name']}") - continue + print("="*60) + print("QA生成器 - 简化版") + print("="*60) + print(f"\n[INFO] 加载数据: {input_file}") - if self.config.VERBOSE_LOG: - print(f"\n[INFO] 正在处理: {file_info['name']}.json") - else: - print(f"[INFO] 正在处理: {file_info['name']}") + try: + data = self.load_json(input_file) + print(f" 数据记录: {len(data)} 条") - input_file = os.path.join(self.config.INPUT_DIR, file_info["file"]) + print(f"\n[INFO] 生成问答对...") + qa_pairs = self.generate_qa_for_data(data) + print(f" 生成数量: {len(qa_pairs)} 条") - if not os.path.exists(input_file): - print(f"[WARNING] 文件不存在: {input_file}") - continue + print(f"\n[INFO] 打乱顺序...") + qa_pairs = self.shuffle_qa_pairs(qa_pairs) - try: - data = self.load_json(input_file) + print(f"\n[INFO] 保存文件...") + self.save_qa(qa_pairs, "selected_QA.json") - # 根据文件名确定数据类型 - if "元素治理" in file_info["name"]: - data_type = "element" - elif "物理模型" in file_info["name"]: - data_type = "physical" - elif "逻辑模型" in file_info["name"]: - data_type = "logical" - else: - print(f"[WARNING] 未知的文件类型: {file_info['name']}") - continue + print(f"\n[INFO] 生成报告...") + self.generate_report(len(qa_pairs)) - qa_pairs = self.generate_qa_for_data(data, data_type) - qa_pairs = self.shuffle_qa_pairs(qa_pairs) - self.save_qa(qa_pairs, file_info["output"]) - qa_counts[file_info["output"]] = len(qa_pairs) - - except Exception as e: - print(f"[ERROR] 处理文件 {file_info['name']} 时出错: {str(e)}") - - # 生成报告 - if self.config.GENERATE_REPORT: - if self.config.VERBOSE_LOG: - print("\n[INFO] 正在生成: 生成报告") - else: - print("\n[INFO] 正在生成: 生成报告") - self.generate_report(qa_counts) - - if self.config.VERBOSE_LOG: - print(f"\n[DONE] 所有文件处理完成!") + print(f"\n[DONE] 处理完成!") print(f"[OUT] 输出目录: {self.config.OUTPUT_DIR}") - print(f"[TOTAL] 总计生成: {sum(qa_counts.values())} 条问答对") - else: - print("\n[DONE] 所有文件处理完成!") + print(f"[TOTAL] 总计生成: {len(qa_pairs)} 条问答对") - -# 预设配置 -SIMPLE_CONFIG = QAConfig() -SIMPLE_CONFIG.COMPLEXITY_LEVEL = 1 -SIMPLE_CONFIG._init_templates() - -NORMAL_CONFIG = QAConfig() -NORMAL_CONFIG.COMPLEXITY_LEVEL = 3 -NORMAL_CONFIG._init_templates() - -COMPLEX_CONFIG = QAConfig() -COMPLEX_CONFIG.COMPLEXITY_LEVEL = 5 -COMPLEX_CONFIG._init_templates() + except Exception as e: + print(f"[ERROR] 处理文件时出错: {str(e)}") + import traceback + traceback.print_exc() def main(): - """主函数 - 交互式运行""" - print("="*60) - print("QA生成器 - 整合版") - print("="*60) - - print("\n可用的配置预设:") - print("1. SIMPLE_CONFIG - 简单模式 (复杂程度=1)") - print("2. NORMAL_CONFIG - 普通模式 (复杂程度=3)") - print("3. COMPLEX_CONFIG - 复杂模式 (复杂程度=5)") - print("4. 自定义配置") - - choice = input("\n请选择配置 (1-4): ").strip() - - if choice == "1": - config = SIMPLE_CONFIG - print("\n✓ 已选择: 简单模式") - elif choice == "2": - config = NORMAL_CONFIG - print("\n✓ 已选择: 普通模式") - elif choice == "3": - config = COMPLEX_CONFIG - print("\n✓ 已选择: 复杂模式") - elif choice == "4": - print("\n自定义配置:") - config = QAConfig() - complexity = input(f"复杂程度等级 (1-5, 当前:{config.COMPLEXITY_LEVEL}): ").strip() - if complexity: - config.COMPLEXITY_LEVEL = int(complexity) - config._init_templates() - - multi_ratio = input(f"多列查询占比 0.0-1.0 (当前:{config.MULTI_COLUMN_RATIO}): ").strip() - if multi_ratio: - config.MULTI_COLUMN_RATIO = float(multi_ratio) - config._init_templates() - - print("\n✓ 已应用自定义配置") - else: - print("\n无效选择,使用默认配置") - config = NORMAL_CONFIG - - # 显示配置信息 - config.print_config() + """主函数""" + # 使用默认配置 + config = QAConfig() # 创建生成器并处理 generator = QAGenerator(config) - generator.process_all() - - # 询问是否合并为train.json - merge_choice = input("\n是否合并为train.json? (y/n): ").strip().lower() - if merge_choice == 'y': - generator.merge_to_train() + generator.process_selected_json() if __name__ == "__main__": diff --git a/random_select.py b/random_select.py new file mode 100644 index 0000000..9bd73ad --- /dev/null +++ b/random_select.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +随机抽取脚本 +从final.json中随机抽取指定数量的记录,生成select_N.json文件 +""" + +import json +import random +import os +from typing import List, Dict, Any +from config import QAConfig + +def load_json_file(file_path: str) -> List[Dict[str, Any]]: + """加载JSON文件""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"[OK] 加载文件: {os.path.basename(file_path)} - {len(data)} 条记录") + return data + except Exception as e: + print(f"[ERROR] 加载文件失败 {file_path}: {e}") + return [] + +def random_select(records: List[Dict[str, Any]], count: int, random_seed: int) -> List[Dict[str, Any]]: + """ + 随机抽取记录 + + Args: + records: 记录列表 + count: 要抽取的数量 + random_seed: 随机种子 + + Returns: + 抽取的记录列表 + """ + # 设置随机种子 + random.seed(random_seed) + + # 如果抽取数量大于等于总数,直接返回所有记录 + if count >= len(records): + print(f"[WARN] 抽取数量 ({count}) 大于等于总记录数 ({len(records)}),返回所有记录") + return records + + # 随机抽取 + selected = random.sample(records, count) + print(f"[OK] 从 {len(records)} 条记录中随机抽取 {count} 条") + + return selected + +def main(): + """主函数""" + print("="*60) + print("随机抽取工具") + print("="*60) + + # 加载配置 + config = QAConfig() + print(f"\n[INFO] 加载配置:") + print(f" 随机种子: {config.RANDOM_SEED}") + print(f" 抽取数量: {config.SELECT_COUNT}") + + # 文件路径 + input_file = os.path.join(config.INPUT_DIR, "final.json") + output_file = os.path.join(config.INPUT_DIR, "selected.json") + + # 检查输入文件是否存在 + if not os.path.exists(input_file): + print(f"\n[ERROR] 输入文件不存在: {input_file}") + return + + # 加载数据 + print(f"\n[INFO] 加载数据...") + records = load_json_file(input_file) + + if not records: + print(f"\n[ERROR] 无法加载数据或数据为空") + return + + # 随机抽取 + print(f"\n[INFO] 执行随机抽取...") + selected_records = random_select(records, config.SELECT_COUNT, config.RANDOM_SEED) + + # 保存结果 + try: + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(selected_records, f, ensure_ascii=False, indent=2) + + file_size = os.path.getsize(output_file) / 1024 # KB + print(f"\n[OK] 抽取完成!") + print(f" 输出文件: {output_file}") + print(f" 记录数量: {len(selected_records)}") + print(f" 文件大小: {file_size:.1f} KB") + + # 显示前3条记录的字段名 + if selected_records: + print(f"\n[INFO] 抽取记录示例:") + sample = selected_records[0] + print(f" 字段数量: {len(sample)}") + print(f" 字段名: {list(sample.keys())[:10]}...") + + # 显示统计信息 + three_table_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' in r) + element_logical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' in r and '物理模型表_表名' not in r) + element_physical_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '物理模型表_表名' in r and '逻辑模型表_表名' not in r) + element_only_match = sum(1 for r in selected_records if '元素治理表_表名' in r and '逻辑模型表_表名' not in r and '物理模型表_表名' not in r) + + print(f"\n[INFO] 抽取记录统计:") + print(f" 三表匹配: {three_table_match} 条") + print(f" 元素治理+逻辑模型: {element_logical_match} 条") + print(f" 元素治理+物理模型: {element_physical_match} 条") + print(f" 仅元素治理: {element_only_match} 条") + + except Exception as e: + print(f"\n[ERROR] 保存文件失败: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main()