Files
YG_TDgenerator/qa_generator.py

724 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
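QA generator (simplified edition).

Generates question-answer pairs from selected.json. Questions are keyed on a
field's Chinese name or English name, plus the logical/physical model names
for model-level questions.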
"""
import json
import os
import random
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

from config import QAConfig
class QAGenerator:
    """Build instruction-style QA pairs from field-metadata records.

    A record is a dict describing one field. The keys read here are
    ``字段中文名``, ``字段英文名``, ``值类型``, ``是否枚举``, ``枚举数量``,
    ``总长度``, ``小数位`` and the four model-name keys listed in
    ``MODEL_NAME_KEYS``. Every generated pair is a dict with the keys
    ``instruct``, ``input`` (always empty) and ``output``.

    Two flavours are produced: a training set with fixed question wording,
    and a verification set whose questions start with a random polite prefix.
    """

    # Record keys that name the logical / physical model a field belongs to.
    # The order fixes the order in which model-level questions are emitted.
    MODEL_NAME_KEYS = (
        "逻辑模型_逻辑模型中文名",
        "逻辑模型_逻辑模型英文名",
        "物理模型_物理模型中文名",
        "物理模型_物理模型英文名",
    )

    # A model needs at least this many distinct fields to get a question.
    MIN_MODEL_FIELDS = 3
    # At most this many field names are spelled out in a model answer.
    MAX_LISTED_FIELDS = 10

    # Attribute questions for the training set:
    # (record key, skip when falsy (else only when None), question tail,
    #  answer lead, wrap the value in 「」)
    _TRAIN_ATTRIBUTES = (
        ("值类型", True, "的值类型是什么?", "值类型为", True),
        ("是否枚举", True, "是否枚举?", "是否枚举为", True),
        ("枚举数量", False, "的枚举数量是多少?", "枚举数量为", False),
        ("总长度", False, "的总长度是多少?", "总长度为", False),
        ("小数位", False, "的小数位是多少?", "小数位为", False),
    )

    # Same layout, wording used for the verification set.
    _VERIFY_ATTRIBUTES = (
        ("值类型", True, "的数据类型是什么?", "数据类型是", True),
        ("是否枚举", True, "是否为枚举类型?", "枚举类型为", True),
        ("枚举数量", False, "的枚举数量是多少?", "枚举数量为", False),
        ("总长度", False, "的总长度是多少?", "总长度为", False),
        ("小数位", False, "的小数位是多少?", "小数位为", False),
    )

    # One pass per name a question can be keyed on:
    # (key of the asking name, key of the counterpart name,
    #  short counterpart label used by the verification set,
    #  whether this pass asks about 枚举数量)
    # The English-name pass does not ask about 枚举数量.
    _NAME_PASSES = (
        ("字段中文名", "字段英文名", "英文名", True),
        ("字段英文名", "字段中文名", "中文名", False),
    )

    def __init__(self, config: Optional["QAConfig"] = None):
        """Create the output directory, seed ``random`` and set up templates.

        Args:
            config: Settings object providing ``OUTPUT_DIR``, ``INPUT_DIR``,
                ``RANDOM_SEED`` and ``SHUFFLE_OUTPUT``. A default
                ``QAConfig()`` is built when omitted.
        """
        self.config = config or QAConfig()
        os.makedirs(self.config.OUTPUT_DIR, exist_ok=True)
        # Seeds the module-level generator, so the whole process is affected.
        random.seed(self.config.RANDOM_SEED)

        # Question prefixes for the training set (kept for callers; the
        # training questions built in this class use fixed wording).
        self.QUESTION_PREFIXES = [
            "请告诉我",
            "查询",
            "请问",
            "请解释",
            "请输出",
            "请列举",
            "请说明",
            "请查找",
            "请确认",
        ]
        # Answer prefixes for the training set.
        self.ANSWER_PREFIXES = [
            "该字段的",
            "查询结果显示,",
            "经查询,该字段的",
            "根据记录显示,",
            "该数据的",
            "查询结果:",
            "经系统查询,",
            "根据记录,",
            "该值的",
        ]
        # Answer suffixes for the training set (all empty, but still drawn
        # from so the random sequence stays the same).
        self.ANSWER_SUFFIXES = [
            "",
            "",
        ]
        # Verification-set templates: formal, but worded differently.
        self.VERIFICATION_QUESTION_PREFIXES = [
            "请问",
            "想咨询一下",
            "请问您",
            "我想了解一下",
            "请教一下",
            "您好,",
            "能否告诉我",
            "请问如何",
            "我想咨询",
            "希望了解",
        ]
        self.VERIFICATION_ANSWER_PREFIXES = [
            "根据查询,",
            "经查询,",
            "查询结果显示,",
            "根据记录,",
            "数据表明,",
            "经系统查询,",
            "根据数据,",
            "查询结果:",
            "经核实,",
            "数据显示,",
        ]
        self.VERIFICATION_ANSWER_SUFFIXES = [
            "",
            "",
            "",
            "",
            "",
            "",
            "",
            "",
            "",
            "",
        ]
        # model key -> {model name -> [Chinese field names, in input order]}
        self.model_data_cache = {key: {} for key in self.MODEL_NAME_KEYS}

    def get_random_element(self, elements: List[str]) -> str:
        """Return a random element of *elements*, or ``""`` when it is empty."""
        return random.choice(elements) if elements else ""

    def load_json(self, file_path: str) -> List[Dict]:
        """Read *file_path* as UTF-8 JSON and return the parsed content."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    # ------------------------------------------------------------------
    # Private building blocks shared by the training and verification sets
    # ------------------------------------------------------------------

    def _make_pair(self, question: str, answer: str, verification: bool) -> Dict:
        """Wrap *answer* in a random prefix/suffix and return one QA dict.

        The prefix is drawn before the suffix so the sequence of random
        draws per pair is stable.
        """
        if verification:
            prefixes = self.VERIFICATION_ANSWER_PREFIXES
            suffixes = self.VERIFICATION_ANSWER_SUFFIXES
        else:
            prefixes = self.ANSWER_PREFIXES
            suffixes = self.ANSWER_SUFFIXES
        prefix = self.get_random_element(prefixes)
        suffix = self.get_random_element(suffixes)
        return {
            "instruct": question,
            "input": "",
            "output": f"{prefix}{answer}{suffix}",
        }

    def _question(self, name_key: str, name: str, tail: str, verification: bool) -> str:
        """Phrase a question about the field called *name*.

        Training questions state which kind of name is used; verification
        questions start with a random polite prefix instead.
        """
        if verification:
            opener = self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)
            return f"{opener}'{name}'字段{tail}"
        return f"{name_key}为'{name}'{tail}"

    def _build_item_pairs(self, item: Dict, verification: bool) -> List[Dict]:
        """Generate every per-field QA pair for one record.

        Order of the result: attribute questions keyed on the Chinese name,
        attribute questions keyed on the English name, then one
        full-definition question per available name.
        """
        attributes = self._VERIFY_ATTRIBUTES if verification else self._TRAIN_ATTRIBUTES
        pairs = []

        for name_key, other_key, other_short, ask_enum_count in self._NAME_PASSES:
            name = item.get(name_key, '')
            if not name:
                continue

            for key, truthy_only, tail, lead, quoted in attributes:
                if key == "枚举数量" and not ask_enum_count:
                    continue
                value = item.get(key)
                if value is None or (truthy_only and not value):
                    continue
                question = self._question(name_key, name, tail, verification)
                # Quoted values get a matching closing bracket.
                answer = f"{lead}「{value}」" if quoted else f"{lead}{value}"
                pairs.append(self._make_pair(question, answer, verification))

            # Ask for the counterpart name (English from Chinese and back).
            other = item.get(other_key, '')
            if other:
                if verification:
                    tail = f"对应的{other_short}是什么?"
                    label = other_short
                else:
                    tail = f"的{other_key}是什么?"
                    label = other_key
                question = self._question(name_key, name, tail, verification)
                pairs.append(
                    self._make_pair(question, f"{label}为「{other}」", verification)
                )

        definition_tail = "的具体定义是什么?" if verification else "的定义是什么?"
        for name_key, _other_key, _other_short, _ask in self._NAME_PASSES:
            name = item.get(name_key, '')
            if not name:
                continue
            question = self._question(name_key, name, definition_tail, verification)
            # Every other key of the record, with None rendered as "null".
            definition_text = " ".join(
                f"{key}{'null' if value is None else value}"
                for key, value in item.items()
                if key != name_key
            )
            answer = f"{name}的定义为:{definition_text}"
            pairs.append(self._make_pair(question, answer, verification))

        return pairs

    def _build_model_pairs(self, verification: bool) -> List[Dict]:
        """Generate one "which fields does this model contain" pair per model.

        Reads ``self.model_data_cache``. Models with fewer than
        ``MIN_MODEL_FIELDS`` distinct non-blank field names are skipped.
        """
        pairs = []
        for model_type, models in self.model_data_cache.items():
            kind = "逻辑模型" if "逻辑模型" in model_type else "物理模型"
            is_chinese = "中文名" in model_type

            for model_name, field_names in models.items():
                # dict.fromkeys de-duplicates while keeping first-seen order,
                # so the answer does not depend on the per-process hash seed.
                names = [
                    name for name in dict.fromkeys(field_names)
                    if name and name.strip()
                ]
                if len(names) < self.MIN_MODEL_FIELDS:
                    continue

                subject = model_name if is_chinese else f"{kind}'{model_name}'"
                if verification:
                    opener = self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)
                    question = f"{opener}{kind}'{model_name}'包含哪些字段?"
                    answer_prefix = f"{subject}包含的字段有:"
                else:
                    language = "中文名" if is_chinese else "英文名"
                    question = f"{kind}{language}为'{model_name}'的元素有哪些?"
                    answer_prefix = f"{subject}对应的元素有:"

                field_list = "、".join(names[:self.MAX_LISTED_FIELDS])
                if len(names) > self.MAX_LISTED_FIELDS:
                    field_list += f"等{len(names)}个字段"

                pairs.append(
                    self._make_pair(question, f"{answer_prefix}{field_list}", verification)
                )
        return pairs

    def _write_report(self, filename: str, version: str, description: str,
                      total_qa_count: int) -> None:
        """Write a small JSON run report into the output directory."""
        report = {
            "生成时间": datetime.now().strftime("%Y-%m-%d"),
            "版本": version,
            "输入文件": "selected.json",
            "输出目录": self.config.OUTPUT_DIR,
            "随机种子": self.config.RANDOM_SEED,
            "总问答对数量": total_qa_count,
            "说明": description,
        }
        report_path = os.path.join(self.config.OUTPUT_DIR, filename)
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"[OK] 已生成: {report_path}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def generate_qa_for_item(self, item: Dict) -> List[Dict]:
        """Return the training-set QA pairs for a single record."""
        return self._build_item_pairs(item, verification=False)

    def generate_verification_qa_for_item(self, item: Dict) -> List[Dict]:
        """Return the verification-set QA pairs for a single record."""
        return self._build_item_pairs(item, verification=True)

    def generate_qa_for_data(self, data: List[Dict]) -> List[Dict]:
        """Return all training-set QA pairs for *data*.

        Per-record pairs come first (in input order), followed by the
        model-level pairs.
        """
        self.collect_model_data(data)
        all_qa = []
        for item in data:
            all_qa.extend(self.generate_qa_for_item(item))
        all_qa.extend(self.generate_model_based_qa(data))
        return all_qa

    def generate_verification_qa_for_data(self, data: List[Dict]) -> List[Dict]:
        """Return all verification-set QA pairs for *data*.

        Per-record pairs come first (in input order), followed by the
        model-level pairs.
        """
        self.collect_model_data(data)
        all_qa = []
        for item in data:
            all_qa.extend(self.generate_verification_qa_for_item(item))
        all_qa.extend(self.generate_verification_model_based_qa(data))
        return all_qa

    def collect_model_data(self, data: List[Dict]):
        """Rebuild ``model_data_cache`` from *data*.

        For each model-name key present (and truthy) in a record, the
        record's ``字段中文名`` is appended to that model's field list. The
        cache is rebuilt from scratch on every call, so repeated calls do not
        accumulate duplicates or keep models from an earlier data set.
        """
        cache = {key: {} for key in self.MODEL_NAME_KEYS}
        for item in data:
            for key in self.MODEL_NAME_KEYS:
                model_name = item.get(key)
                if model_name:
                    cache[key].setdefault(model_name, []).append(
                        item.get("字段中文名", "")
                    )
        self.model_data_cache = cache

    def generate_model_based_qa(self, data: List[Dict]) -> List[Dict]:
        """Return training-set model questions built from the cache.

        *data* is accepted for interface compatibility but not read; call
        ``collect_model_data`` first to populate the cache.
        """
        return self._build_model_pairs(verification=False)

    def generate_verification_model_based_qa(self, data: List[Dict]) -> List[Dict]:
        """Return verification-set model questions built from the cache.

        *data* is accepted for interface compatibility but not read; call
        ``collect_model_data`` first to populate the cache.
        """
        return self._build_model_pairs(verification=True)

    def shuffle_qa_pairs(self, qa_pairs: List[Dict]) -> List[Dict]:
        """Shuffle *qa_pairs* in place when ``config.SHUFFLE_OUTPUT`` is set.

        Returns the same list object either way.
        """
        if self.config.SHUFFLE_OUTPUT:
            random.shuffle(qa_pairs)
        return qa_pairs

    def save_qa(self, qa_pairs: List[Dict], filename: str):
        """Write *qa_pairs* as pretty-printed UTF-8 JSON into the output dir."""
        output_path = os.path.join(self.config.OUTPUT_DIR, filename)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(qa_pairs, f, ensure_ascii=False, indent=2)
        size_kb = os.path.getsize(output_path) / 1024
        print(f"[OK] 已生成: {output_path}")
        print(f"{len(qa_pairs)} 条问答对, {size_kb:.1f} KB")

    def generate_report(self, total_qa_count: int):
        """Write the run report for the training set."""
        self._write_report(
            "QA生成报告.json",
            "简化版",
            "基于字段中文名、字段英文名询问其他字段,"
            "新增:根据中文字段名/英文字段名询问完整定义,"
            "新增:根据逻辑模型/物理模型查询对应元素"
            f"(仅对字段数>={self.MIN_MODEL_FIELDS}的模型生成)",
            total_qa_count,
        )

    def process_selected_json(self, generate_verification: bool = False):
        """Run the whole pipeline on ``<INPUT_DIR>/selected.json``.

        Loads the records, generates the training or verification set,
        optionally shuffles it, saves it and writes the run report. A missing
        input file or any processing error is reported on stdout; nothing is
        raised to the caller.

        Args:
            generate_verification: Produce the verification set instead of
                the training set.
        """
        input_file = os.path.join(self.config.INPUT_DIR, "selected.json")
        if not os.path.exists(input_file):
            print(f"[ERROR] 文件不存在: {input_file}")
            return

        print("="*60)
        if generate_verification:
            print("QA生成器 - 验证集版(正式化表达但有别于训练集)")
        else:
            print("QA生成器 - 简化版")
        print("="*60)
        print(f"\n[INFO] 加载数据: {input_file}")

        # Top-level boundary of the script: report the failure and carry on
        # so a second run (e.g. the verification set) is still attempted.
        try:
            data = self.load_json(input_file)
            print(f" 数据记录: {len(data)}")

            print("\n[INFO] 生成问答对...")
            if generate_verification:
                qa_pairs = self.generate_verification_qa_for_data(data)
                output_filename = "selected_QA_Verification.json"
            else:
                qa_pairs = self.generate_qa_for_data(data)
                output_filename = "selected_QA.json"
            print(f" 生成数量: {len(qa_pairs)}")

            print("\n[INFO] 打乱顺序...")
            qa_pairs = self.shuffle_qa_pairs(qa_pairs)

            print("\n[INFO] 保存文件...")
            self.save_qa(qa_pairs, output_filename)

            print("\n[INFO] 生成报告...")
            if generate_verification:
                self.generate_verification_report(len(qa_pairs))
            else:
                self.generate_report(len(qa_pairs))

            print("\n[DONE] 处理完成!")
            print(f"[OUT] 输出目录: {self.config.OUTPUT_DIR}")
            print(f"[TOTAL] 总计生成: {len(qa_pairs)} 条问答对")
        except Exception as e:
            print(f"[ERROR] 处理文件时出错: {str(e)}")
            traceback.print_exc()

    def generate_verification_report(self, total_qa_count: int):
        """Write the run report for the verification set."""
        self._write_report(
            "QA生成报告_验证集.json",
            "验证集版",
            "验证集:基于字段中文名、字段英文名询问其他字段,"
            "正式化表达但有别于训练集,"
            "新增:根据中文字段名/英文字段名询问完整定义,"
            "新增:根据逻辑模型/物理模型查询对应元素"
            f"(仅对字段数>={self.MIN_MODEL_FIELDS}的模型生成)",
            total_qa_count,
        )
def main():
    """Generate the training set, then the verification set.

    Both runs share one generator built from the default configuration, so
    the random seed is applied once and the second run continues the same
    random sequence.
    """
    generator = QAGenerator(QAConfig())
    banner = "=" * 60

    runs = (
        ("开始生成训练集", False),
        ("开始生成验证集", True),
    )
    for title, is_verification in runs:
        print("\n" + banner)
        print(title)
        print(banner)
        generator.process_selected_json(generate_verification=is_verification)


if __name__ == "__main__":
    main()