生成符合问答场景的问答对

This commit is contained in:
2026-01-04 11:34:19 +08:00
parent 8088b59d30
commit a03dd4d250
2 changed files with 444 additions and 9 deletions

View File

@@ -54,6 +54,54 @@ class QAGenerator:
""
]
# 验证集专用模板(正式但有别于训练集)
self.VERIFICATION_QUESTION_PREFIXES = [
"请问",
"想咨询一下",
"请问您",
"我想了解一下",
"请教一下",
"您好,",
"能否告诉我",
"请问如何",
"我想咨询",
"希望了解"
]
self.VERIFICATION_ANSWER_PREFIXES = [
"根据查询,",
"经查询,",
"查询结果显示,",
"根据记录,",
"数据表明,",
"经系统查询,",
"根据数据,",
"查询结果:",
"经核实,",
"数据显示,"
]
self.VERIFICATION_ANSWER_SUFFIXES = [
"",
"",
"",
"",
"",
"",
"",
"",
"",
""
]
# 模型数据缓存
self.model_data_cache = {
"逻辑模型_逻辑模型中文名": {},
"逻辑模型_逻辑模型英文名": {},
"物理模型_物理模型中文名": {},
"物理模型_物理模型英文名": {}
}
def get_random_element(self, elements: List[str]) -> str:
"""从列表中随机获取一个元素"""
return random.choice(elements) if elements else ""
@@ -188,18 +236,365 @@ class QAGenerator:
"output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}"
})
# ==================== 新增:根据中文字段名询问完整定义 ====================
if field_chinese_name:
question = f"字段中文名为'{field_chinese_name}'的定义是什么?"
# 构建完整的定义信息
definition_parts = []
for key, value in item.items():
if key not in ['字段中文名'] and value is not None:
definition_parts.append(f"{key}{value}")
elif key not in ['字段中文名'] and value is None:
definition_parts.append(f"{key}null")
definition_text = " ".join(definition_parts)
answer = f"{field_chinese_name}的定义为:{definition_text}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}"
})
# ==================== 新增:根据英文字段名询问完整定义 ====================
if field_english_name:
question = f"字段英文名为'{field_english_name}'的定义是什么?"
# 构建完整的定义信息
definition_parts = []
for key, value in item.items():
if key not in ['字段英文名'] and value is not None:
definition_parts.append(f"{key}{value}")
elif key not in ['字段英文名'] and value is None:
definition_parts.append(f"{key}null")
definition_text = " ".join(definition_parts)
answer = f"{field_english_name}的定义为:{definition_text}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}"
})
return qa_pairs
def generate_verification_qa_for_item(self, item: Dict) -> List[Dict]:
"""为单个数据项生成验证集问答对(正式但有别于训练集的表达)"""
qa_pairs = []
# 获取两个核心字段
field_chinese_name = item.get('字段中文名', '')
field_english_name = item.get('字段英文名', '')
# 基于字段中文名提问(正式但有变化)
if field_chinese_name:
# 询问值类型
if item.get('值类型'):
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段的数据类型是什么?"
answer = f"数据类型是「{item['值类型']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问是否枚举
if item.get('是否枚举'):
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段是否为枚举类型?"
answer = f"枚举类型为「{item['是否枚举']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问枚举数量
if item.get('枚举数量') is not None:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段的枚举数量是多少?"
answer = f"枚举数量为{item['枚举数量']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问总长度
if item.get('总长度') is not None:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段的总长度是多少?"
answer = f"总长度为{item['总长度']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问小数位
if item.get('小数位') is not None:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段的小数位是多少?"
answer = f"小数位为{item['小数位']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问字段英文名
if field_english_name:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段对应的英文名是什么?"
answer = f"英文名为「{field_english_name}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 基于字段英文名提问(正式但有变化)
if field_english_name:
# 询问值类型
if item.get('值类型'):
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段的数据类型是什么?"
answer = f"数据类型是「{item['值类型']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问是否枚举
if item.get('是否枚举'):
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段是否为枚举类型?"
answer = f"枚举类型为「{item['是否枚举']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问总长度
if item.get('总长度') is not None:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段的总长度是多少?"
answer = f"总长度为{item['总长度']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问小数位
if item.get('小数位') is not None:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段的小数位是多少?"
answer = f"小数位为{item['小数位']}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# 询问字段中文名
if field_chinese_name:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段对应的中文名是什么?"
answer = f"中文名为「{field_chinese_name}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# ==================== 验证集:根据中文字段名询问完整定义 ====================
if field_chinese_name:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_chinese_name}'字段的具体定义是什么?"
# 构建完整的定义信息
definition_parts = []
for key, value in item.items():
if key not in ['字段中文名'] and value is not None:
definition_parts.append(f"{key}{value}")
elif key not in ['字段中文名'] and value is None:
definition_parts.append(f"{key}null")
definition_text = " ".join(definition_parts)
answer = f"{field_chinese_name}的定义为:{definition_text}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
# ==================== 验证集:根据英文字段名询问完整定义 ====================
if field_english_name:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}'{field_english_name}'字段的具体定义是什么?"
# 构建完整的定义信息
definition_parts = []
for key, value in item.items():
if key not in ['字段英文名'] and value is not None:
definition_parts.append(f"{key}{value}")
elif key not in ['字段英文名'] and value is None:
definition_parts.append(f"{key}null")
definition_text = " ".join(definition_parts)
answer = f"{field_english_name}的定义为:{definition_text}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
return qa_pairs
def generate_qa_for_data(self, data: List[Dict]) -> List[Dict]:
"""为所有数据生成QA"""
all_qa = []
# 首先收集模型数据
self.collect_model_data(data)
for item in data:
qa_pairs = self.generate_qa_for_item(item)
all_qa.extend(qa_pairs)
# 生成基于模型的问题
model_qa_pairs = self.generate_model_based_qa(data)
all_qa.extend(model_qa_pairs)
return all_qa
def generate_verification_qa_for_data(self, data: List[Dict]) -> List[Dict]:
"""为所有数据生成验证集QA口语化、拟人化表达"""
all_qa = []
# 首先收集模型数据
self.collect_model_data(data)
for item in data:
qa_pairs = self.generate_verification_qa_for_item(item)
all_qa.extend(qa_pairs)
# 生成基于模型的问题(验证集版)
model_qa_pairs = self.generate_verification_model_based_qa(data)
all_qa.extend(model_qa_pairs)
return all_qa
def collect_model_data(self, data: List[Dict]):
"""收集模型相关数据用于后续查询"""
for item in data:
# 收集逻辑模型数据
if "逻辑模型_逻辑模型中文名" in item and item["逻辑模型_逻辑模型中文名"]:
model_name = item["逻辑模型_逻辑模型中文名"]
if model_name not in self.model_data_cache["逻辑模型_逻辑模型中文名"]:
self.model_data_cache["逻辑模型_逻辑模型中文名"][model_name] = []
self.model_data_cache["逻辑模型_逻辑模型中文名"][model_name].append(item.get("字段中文名", ""))
if "逻辑模型_逻辑模型英文名" in item and item["逻辑模型_逻辑模型英文名"]:
model_name = item["逻辑模型_逻辑模型英文名"]
if model_name not in self.model_data_cache["逻辑模型_逻辑模型英文名"]:
self.model_data_cache["逻辑模型_逻辑模型英文名"][model_name] = []
self.model_data_cache["逻辑模型_逻辑模型英文名"][model_name].append(item.get("字段中文名", ""))
# 收集物理模型数据
if "物理模型_物理模型中文名" in item and item["物理模型_物理模型中文名"]:
model_name = item["物理模型_物理模型中文名"]
if model_name not in self.model_data_cache["物理模型_物理模型中文名"]:
self.model_data_cache["物理模型_物理模型中文名"][model_name] = []
self.model_data_cache["物理模型_物理模型中文名"][model_name].append(item.get("字段中文名", ""))
if "物理模型_物理模型英文名" in item and item["物理模型_物理模型英文名"]:
model_name = item["物理模型_物理模型英文名"]
if model_name not in self.model_data_cache["物理模型_物理模型英文名"]:
self.model_data_cache["物理模型_物理模型英文名"][model_name] = []
self.model_data_cache["物理模型_物理模型英文名"][model_name].append(item.get("字段中文名", ""))
def generate_model_based_qa(self, data: List[Dict]) -> List[Dict]:
"""生成基于模型的问题(优化版:只对有足够字段的模型生成问题)"""
qa_pairs = []
# 为每个模型类型生成问题
for model_type, model_dict in self.model_data_cache.items():
for model_name, field_names in model_dict.items():
# 去重字段名
unique_field_names = list(set(field_names))
# 过滤掉空值
unique_field_names = [name for name in unique_field_names if name and name.strip()]
# 优化只对有3个或更多字段的模型生成问题避免问题过多
if len(unique_field_names) < 3:
continue
# 根据模型类型生成不同的问题
if "逻辑模型" in model_type:
if "中文名" in model_type:
question = f"逻辑模型中文名为'{model_name}'的元素有哪些?"
answer_prefix = f"{model_name}对应的元素有:"
else:
question = f"逻辑模型英文名为'{model_name}'的元素有哪些?"
answer_prefix = f"逻辑模型'{model_name}'对应的元素有:"
else: # 物理模型
if "中文名" in model_type:
question = f"物理模型中文名为'{model_name}'的元素有哪些?"
answer_prefix = f"{model_name}对应的元素有:"
else:
question = f"物理模型英文名为'{model_name}'的元素有哪些?"
answer_prefix = f"物理模型'{model_name}'对应的元素有:"
# 构建答案
field_list = "".join(unique_field_names[:10]) # 限制最多10个字段
if len(unique_field_names) > 10:
field_list += f"{len(unique_field_names)}个字段"
answer = f"{answer_prefix}{field_list}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.ANSWER_PREFIXES)}{answer}{self.get_random_element(self.ANSWER_SUFFIXES)}"
})
return qa_pairs
def generate_verification_model_based_qa(self, data: List[Dict]) -> List[Dict]:
"""生成基于模型的问题(验证集版:正式但有别于训练集)"""
qa_pairs = []
# 为每个模型类型生成问题
for model_type, model_dict in self.model_data_cache.items():
for model_name, field_names in model_dict.items():
# 去重字段名
unique_field_names = list(set(field_names))
# 过滤掉空值
unique_field_names = [name for name in unique_field_names if name and name.strip()]
# 优化只对有3个或更多字段的模型生成问题避免问题过多
if len(unique_field_names) < 3:
continue
# 根据模型类型生成不同的问题(正式但有变化)
if "逻辑模型" in model_type:
if "中文名" in model_type:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}逻辑模型'{model_name}'包含哪些字段?"
answer_prefix = f"{model_name}包含的字段有:"
else:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}逻辑模型'{model_name}'包含哪些字段?"
answer_prefix = f"逻辑模型'{model_name}'包含的字段有:"
else: # 物理模型
if "中文名" in model_type:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}物理模型'{model_name}'包含哪些字段?"
answer_prefix = f"{model_name}包含的字段有:"
else:
question = f"{self.get_random_element(self.VERIFICATION_QUESTION_PREFIXES)}物理模型'{model_name}'包含哪些字段?"
answer_prefix = f"物理模型'{model_name}'包含的字段有:"
# 构建答案
field_list = "".join(unique_field_names[:10]) # 限制最多10个字段
if len(unique_field_names) > 10:
field_list += f"{len(unique_field_names)}个字段"
answer = f"{answer_prefix}{field_list}"
qa_pairs.append({
"instruct": question,
"input": "",
"output": f"{self.get_random_element(self.VERIFICATION_ANSWER_PREFIXES)}{answer}{self.get_random_element(self.VERIFICATION_ANSWER_SUFFIXES)}"
})
return qa_pairs
def shuffle_qa_pairs(self, qa_pairs: List[Dict]) -> List[Dict]:
"""随机打乱问答对顺序"""
if self.config.SHUFFLE_OUTPUT:
@@ -225,7 +620,7 @@ class QAGenerator:
"输出目录": self.config.OUTPUT_DIR,
"随机种子": self.config.RANDOM_SEED,
"总问答对数量": total_qa_count,
"说明": "基于字段中文名、字段英文名、抽象中文名询问其他所有字段"
"说明": "基于字段中文名、字段英文名询问其他字段,新增:根据中文字段名/英文字段名询问完整定义,新增:根据逻辑模型/物理模型查询对应元素(仅对字段数>=3的模型生成"
}
report_path = os.path.join(self.config.OUTPUT_DIR, "QA生成报告.json")
@@ -234,7 +629,7 @@ class QAGenerator:
print(f"[OK] 已生成: {report_path}")
def process_selected_json(self):
def process_selected_json(self, generate_verification: bool = False):
"""处理selected.json文件"""
input_file = os.path.join(self.config.INPUT_DIR, "selected.json")
@@ -243,7 +638,10 @@ class QAGenerator:
return
print("="*60)
print("QA生成器 - 简化版")
if generate_verification:
print("QA生成器 - 验证集版(正式化表达但有别于训练集)")
else:
print("QA生成器 - 简化版")
print("="*60)
print(f"\n[INFO] 加载数据: {input_file}")
@@ -252,17 +650,25 @@ class QAGenerator:
print(f" 数据记录: {len(data)}")
print(f"\n[INFO] 生成问答对...")
qa_pairs = self.generate_qa_for_data(data)
if generate_verification:
qa_pairs = self.generate_verification_qa_for_data(data)
output_filename = "selected_QA_Verification.json"
else:
qa_pairs = self.generate_qa_for_data(data)
output_filename = "selected_QA.json"
print(f" 生成数量: {len(qa_pairs)}")
print(f"\n[INFO] 打乱顺序...")
qa_pairs = self.shuffle_qa_pairs(qa_pairs)
print(f"\n[INFO] 保存文件...")
self.save_qa(qa_pairs, "selected_QA.json")
self.save_qa(qa_pairs, output_filename)
print(f"\n[INFO] 生成报告...")
self.generate_report(len(qa_pairs))
if generate_verification:
self.generate_verification_report(len(qa_pairs))
else:
self.generate_report(len(qa_pairs))
print(f"\n[DONE] 处理完成!")
print(f"[OUT] 输出目录: {self.config.OUTPUT_DIR}")
@@ -273,15 +679,44 @@ class QAGenerator:
import traceback
traceback.print_exc()
def generate_verification_report(self, total_qa_count: int):
"""生成验证集生成报告"""
report = {
"生成时间": "2025-12-31",
"版本": "验证集版",
"输入文件": "selected.json",
"输出目录": self.config.OUTPUT_DIR,
"随机种子": self.config.RANDOM_SEED,
"总问答对数量": total_qa_count,
"说明": "验证集:基于字段中文名、字段英文名询问其他字段,正式化表达但有别于训练集,新增:根据中文字段名/英文字段名询问完整定义,新增:根据逻辑模型/物理模型查询对应元素(仅对字段数>=3的模型生成"
}
report_path = os.path.join(self.config.OUTPUT_DIR, "QA生成报告_验证集.json")
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"[OK] 已生成: {report_path}")
def main():
"""主函数"""
# 使用默认配置
config = QAConfig()
# 创建生成器并处理
# 创建生成器
generator = QAGenerator(config)
generator.process_selected_json()
# 生成训练集
print("\n" + "="*60)
print("开始生成训练集")
print("="*60)
generator.process_selected_json(generate_verification=False)
# 生成验证集
print("\n" + "="*60)
print("开始生成验证集")
print("="*60)
generator.process_selected_json(generate_verification=True)
if __name__ == "__main__":