Files
YG_LLM_TESTER/model_evaluation.py

948 lines
34 KiB
Python
Raw Normal View History

2025-12-23 15:07:19 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
大模型微调验证系统
用于评估模型输出与参考答案的相似度和质量
LLM评估配置
- 默认使用模拟评估基于传统指标
- 如需使用真实LLM请配置 llm_config.py 文件
"""
import json
import os
import re
import time
import multiprocessing as mp
2025-12-23 15:07:19 +08:00
from collections import Counter
from typing import Dict, List, Tuple, Any
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
2025-12-23 15:07:19 +08:00
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.tokenize import word_tokenize
import jieba
import jieba.posseg as pseg
from difflib import SequenceMatcher
import pandas as pd
from tqdm import tqdm
import requests
2025-12-23 15:07:19 +08:00
# 导入LLM配置
from llm_config import (
USE_REAL_LLM,
MAX_CONCURRENT_WORKERS,
SHOW_DETAILED_PROGRESS,
MAX_API_RETRIES,
RETRY_DELAY,
2025-12-23 15:07:19 +08:00
OPENAI_CONFIG,
EVALUATION_WEIGHTS,
PROMPT_TEMPLATE,
validate_openai_config,
print_config_info
)
class ModelEvaluator:
"""模型评估器"""
def __init__(self):
# 初始化jieba分词
jieba.initialize()
# 停用词列表(中文常见停用词)
self.stopwords = {
'', '', '', '', '', '', '', '', '', '', '', '', '一个',
'', '', '', '', '', '', '', '', '', '', '没有', '', '',
'自己', '', '', '', '', '', '', '', '', '时候', '', '如果'
}
# 初始化平滑函数
self.smoothing_function = SmoothingFunction()
def get_llm_evaluation_prompt(self, reference: str, candidate: str, question: str = "") -> str:
"""生成LLM评估提示词"""
# 使用配置中的提示词模板
return PROMPT_TEMPLATE.format(
question=question,
reference=reference,
candidate=candidate
)
def call_llm_for_evaluation(self, prompt: str, max_retries: int = 3, retry_delay: float = 1.0) -> Tuple[int, str]:
"""调用大语言模型进行评估(带自动重试)
2025-12-23 15:07:19 +08:00
使用配置
- API配置来自 llm_config.py 中的 OPENAI_CONFIG
- 支持环境变量和直接配置
参数
- prompt: 评估提示词
- max_retries: 最大重试次数默认3次
- retry_delay: 重试延迟时间默认1秒
2025-12-23 15:07:19 +08:00
配置方法
1. 设置环境变量export OPENAI_API_KEY='your-api-key'
2. llm_config.py 中直接修改 OPENAI_CONFIG
"""
# 验证配置
config = validate_openai_config()
# 调用API使用传统HTTP请求带重试机制
for attempt in range(max_retries + 1):
2025-12-23 15:07:19 +08:00
try:
# 构造请求体
payload = {
"model": config["model"],
"messages": [
{"role": "system", "content": "你是一个专业的文本质量评估专家。"},
{"role": "user", "content": prompt}
],
"temperature": config["temperature"],
"max_tokens": config["max_tokens"]
}
# 发送HTTP请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config['api_key']}"
}
response = requests.post(
f"{config['api_base']}/chat/completions",
headers=headers,
json=payload,
timeout=config["timeout"]
)
# 检查响应状态
response.raise_for_status()
# 解析JSON响应
import json
response_data = response.json()
content = response_data["choices"][0]["message"]["content"]
try:
result = json.loads(content)
score = int(result.get("score", 0))
reason = str(result.get("reason", ""))
return score, reason
except (json.JSONDecodeError, KeyError, ValueError):
# 如果无法解析JSON尝试从文本中提取
# 这里可以添加正则表达式来提取评分
raise ValueError(f"无法解析LLM响应{content}")
except (requests.exceptions.RequestException, requests.exceptions.Timeout,
requests.exceptions.ConnectionError, RuntimeError) as e:
# 网络相关错误,可以重试
if attempt < max_retries:
wait_time = retry_delay * (2 ** attempt) # 指数退避
print(f"API调用失败尝试 {attempt + 1}/{max_retries + 1}{wait_time:.1f}秒后重试: {str(e)[:100]}")
time.sleep(wait_time)
continue
else:
print(f"API调用失败已重试 {max_retries}")
raise RuntimeError(f"API调用失败已重试{max_retries}次):{str(e)}")
2025-12-23 15:07:19 +08:00
except Exception as e:
# 其他错误如JSON解析错误不重试
print(f"API调用失败不可重试的错误: {str(e)}")
raise RuntimeError(f"API调用失败{str(e)}")
# 这里不应该到达,但如果到达了,抛出异常
raise RuntimeError("API调用失败达到最大重试次数")
2025-12-23 15:07:19 +08:00
def get_mock_llm_evaluation(self, reference: str, candidate: str, question: str = "") -> Tuple[int, str]:
"""获取模拟的LLM评估结果用于演示
使用配置
- 权重来自 llm_config.py 中的 EVALUATION_WEIGHTS
"""
# 计算一些基本指标作为参考
bleu = self.calculate_bleu_score(reference, candidate)
rouge_l = self.calculate_rouge_l(reference, candidate)
exact_match = self.calculate_exact_match_rate(reference, candidate)
keyword_overlap = self.calculate_keyword_overlap(reference, candidate)
# 使用配置中的权重进行加权计算
composite_score = (
bleu * EVALUATION_WEIGHTS['bleu_score'] +
rouge_l * EVALUATION_WEIGHTS['rouge_l_score'] +
exact_match * EVALUATION_WEIGHTS['exact_match_rate'] +
keyword_overlap * EVALUATION_WEIGHTS['keyword_overlap_rate']
)
score = int(composite_score * 10)
# 生成评价理由
reason_parts = []
if exact_match > 0.9:
reason_parts.append("生成答案与参考答案内容完全一致")
elif bleu > 0.1 or rouge_l > 0.3:
reason_parts.append("生成答案在内容上与参考答案较为一致")
else:
reason_parts.append("生成答案与参考答案存在较大差异")
if keyword_overlap > 0.5:
reason_parts.append("关键词重叠度较高")
elif keyword_overlap > 0.3:
reason_parts.append("关键词重叠度中等")
else:
reason_parts.append("关键词重叠度较低")
reason = "".join(reason_parts) + f"。综合指标BLEU={bleu:.3f}, ROUGE-L={rouge_l:.3f}, 完全匹配率={exact_match:.3f}, 关键词重叠率={keyword_overlap:.3f}"
return min(10, max(1, score)), reason
def tokenize_chinese(self, text: str) -> List[str]:
"""中文分词"""
if not text:
return []
# 使用jieba进行分词
words = jieba.cut(text)
return [word.strip() for word in words if len(word.strip()) > 0]
def get_keywords(self, text: str) -> List[str]:
"""提取关键词(名词、动词、形容词)"""
if not text:
return []
words = pseg.cut(text)
keywords = []
for word, flag in words:
# 选择名词(n)、动词(v)、形容词(a)、区别词(b)等作为关键词
if flag.startswith(('n', 'v', 'a', 'b', 'i')) and word not in self.stopwords and len(word) > 1:
keywords.append(word)
return keywords
def calculate_bleu_score(self, reference: str, candidate: str) -> float:
"""计算BLEU分数"""
if not reference or not candidate:
return 0.0
# 分词
ref_tokens = self.tokenize_chinese(reference)
cand_tokens = self.tokenize_chinese(candidate)
if not cand_tokens:
return 0.0
# 计算BLEU分数使用1-gram到4-gram
try:
score = sentence_bleu(
[ref_tokens],
cand_tokens,
smoothing_function=self.smoothing_function.method1
)
return score
except:
return 0.0
def calculate_rouge_l(self, reference: str, candidate: str) -> float:
"""计算ROUGE-L分数"""
if not reference or not candidate:
return 0.0
ref_tokens = self.tokenize_chinese(reference)
cand_tokens = self.tokenize_chinese(candidate)
if not ref_tokens or not cand_tokens:
return 0.0
# 计算最长公共子序列
lcs = self._lcs_length(ref_tokens, cand_tokens)
# 计算ROUGE-L F1分数
ref_len = len(ref_tokens)
cand_len = len(cand_tokens)
if ref_len == 0 or cand_len == 0:
return 0.0
precision = lcs / cand_len
recall = lcs / ref_len
if precision + recall == 0:
return 0.0
rouge_l = (2 * precision * recall) / (precision + recall)
return rouge_l
def _lcs_length(self, seq1: List[str], seq2: List[str]) -> int:
"""计算最长公共子序列长度"""
m, n = len(seq1), len(seq2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if seq1[i-1] == seq2[j-1]:
dp[i][j] = dp[i-1][j-1] + 1
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
return dp[m][n]
def calculate_character_overlap(self, reference: str, candidate: str) -> float:
"""计算字符重叠率"""
if not reference or not candidate:
return 0.0
ref_chars = set(reference)
cand_chars = set(candidate)
if not cand_chars:
return 0.0
overlap = len(ref_chars & cand_chars)
total_cand = len(cand_chars)
return overlap / total_cand if total_cand > 0 else 0.0
def calculate_length_similarity(self, reference: str, candidate: str) -> float:
"""计算长度相似度"""
if not reference or not candidate:
return 0.0
ref_len = len(reference)
cand_len = len(candidate)
if ref_len == 0 and cand_len == 0:
return 1.0
if ref_len == 0 or cand_len == 0:
return 0.0
# 使用相对差异计算相似度
similarity = 1 - abs(ref_len - cand_len) / max(ref_len, cand_len)
return max(0, similarity)
def calculate_exact_match_rate(self, reference: str, candidate: str) -> float:
"""计算完全匹配率"""
if not reference and not candidate:
return 1.0
if not reference or not candidate:
return 0.0
# 去除空白字符后比较
ref_clean = re.sub(r'\s+', '', reference.strip())
cand_clean = re.sub(r'\s+', '', candidate.strip())
return 1.0 if ref_clean == cand_clean else 0.0
def calculate_keyword_overlap(self, reference: str, candidate: str) -> float:
"""计算关键词重叠率"""
ref_keywords = set(self.get_keywords(reference))
cand_keywords = set(self.get_keywords(candidate))
if not cand_keywords:
return 0.0
overlap = len(ref_keywords & cand_keywords)
total_cand = len(cand_keywords)
return overlap / total_cand if total_cand > 0 else 0.0
def evaluate_all_metrics(self, reference: str, candidate: str) -> Dict[str, float]:
"""计算所有评估指标"""
return {
'bleu_score': self.calculate_bleu_score(reference, candidate),
'rouge_l_score': self.calculate_rouge_l(reference, candidate),
'character_overlap_rate': self.calculate_character_overlap(reference, candidate),
'length_similarity': self.calculate_length_similarity(reference, candidate),
'exact_match_rate': self.calculate_exact_match_rate(reference, candidate),
'keyword_overlap_rate': self.calculate_keyword_overlap(reference, candidate)
}
def get_cpu_count():
"""获取CPU核心数"""
try:
return mp.cpu_count()
except:
return 4 # 默认值
def evaluate_single_item(args):
"""单条数据评估函数(用于并发处理)"""
idx, item, evaluator, use_real_llm = args
# 支持多种字段名格式
input_text = item.get('question', item.get('Input', item.get('问题', '')))
output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))
# 计算各项指标
metrics = evaluator.evaluate_all_metrics(answer_text, output_text)
# 获取LLM评估
if use_real_llm:
try:
prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
# 使用配置文件中的重试参数
llm_score, llm_reason = evaluator.call_llm_for_evaluation(
prompt,
max_retries=MAX_API_RETRIES,
retry_delay=RETRY_DELAY
)
except Exception as e:
# 静默处理错误,返回模拟评估结果
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
else:
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
# 添加原始数据
result = {
'index': idx + 1,
'Input': input_text,
'Output': output_text,
'Answer': answer_text,
**metrics,
'llm_score': llm_score,
'llm_reason': llm_reason
}
return result
def evaluate_dataset_parallel(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False, max_workers: int = None) -> Tuple[List[Dict], Dict[str, float]]:
"""并发评估整个数据集
Args:
data: 数据列表
evaluator: 评估器实例
use_real_llm: 是否使用真实LLM评估
max_workers: 最大并发数默认使用CPU核心数
"""
results = []
total_metrics = {
'bleu_score': 0.0,
'rouge_l_score': 0.0,
'character_overlap_rate': 0.0,
'length_similarity': 0.0,
'exact_match_rate': 0.0,
'keyword_overlap_rate': 0.0,
'llm_score': 0.0
}
# 获取并发数(优先级:参数 > 配置文件 > CPU核心数
if max_workers is not None:
# 直接使用传入的参数
pass
elif MAX_CONCURRENT_WORKERS is not None:
# 使用配置文件中的设置
max_workers = MAX_CONCURRENT_WORKERS
else:
# 默认使用CPU核心数
max_workers = get_cpu_count()
print(f"\n开始并发评估 {len(data)} 条数据,使用 {max_workers} 个并发线程...")
if use_real_llm:
print("注意LLM评分功能使用真实的大语言模型API")
print("配置来源llm_config.py")
print(f"并发数: {max_workers}")
else:
print("注意LLM评分功能使用模拟评估基于传统指标的综合评分")
print("配置来源llm_config.py 中的 EVALUATION_WEIGHTS")
print(f"并发数: {max_workers}")
# 使用ThreadPoolExecutor进行并发评估按批次处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建总进度条
total_pbar = tqdm(total=len(data), desc="总进度", position=0, leave=True)
# 分批处理数据
batch_size = max_workers # 每批的大小等于并发数
batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
total_batches = len(batches)
# 处理每个批次(动态创建进度条)
for batch_idx, batch_data in enumerate(batches):
batch_num = batch_idx + 1
# 动态创建当前批次的进度条
current_batch_pbars = [] # 当前批次的进度条列表
if SHOW_DETAILED_PROGRESS:
for worker_idx in range(len(batch_data)):
pbar = tqdm(
total=1,
desc=f"批次{batch_num}-并发{worker_idx + 1}: 等待任务",
position=worker_idx + 1, # 从位置1开始
leave=False
)
current_batch_pbars.append(pbar)
# 提交当前批次的所有任务
future_to_info = {} # future -> (item_idx, worker_idx)
for item_idx, item in enumerate(batch_data):
worker_idx = item_idx # 当前批次内的worker索引
global_idx = batch_idx * batch_size + item_idx # 全局索引
future = executor.submit(evaluate_single_item, (global_idx, item, evaluator, use_real_llm))
future_to_info[future] = (global_idx, worker_idx)
# 更新批次进度条
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1}")
pbar.refresh()
# 等待当前批次的所有任务完成
for future in as_completed(future_to_info):
global_idx, worker_idx = future_to_info[future]
try:
result = future.result()
results.append(result)
# 更新批次进度条状态
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.update(1)
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [完成]")
pbar.refresh()
pbar.close()
# 累加指标
for key in total_metrics:
if key in result:
total_metrics[key] += result[key]
# 更新总进度条
total_pbar.update(1)
except Exception as e:
print(f" [Warning] 任务{global_idx + 1}评估失败: {e}")
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [失败]")
pbar.refresh()
pbar.close()
total_pbar.update(1)
# 当前批次完成,关闭该批次的进度条
if SHOW_DETAILED_PROGRESS:
for pbar in current_batch_pbars:
pbar.close()
# 批次间隔(可选)
if batch_idx < total_batches - 1:
time.sleep(0.1) # 短暂间隔
# 关闭总进度条
total_pbar.close()
# 按原始顺序排序
results.sort(key=lambda x: x['index'])
# 计算平均值
num_samples = len(data)
for key in total_metrics:
total_metrics[key] /= num_samples if num_samples > 0 else 1
return results, total_metrics
2025-12-23 15:07:19 +08:00
def load_data(file_path: str) -> List[Dict[str, Any]]:
"""加载数据文件
支持 .jsonl (JSON Lines) .json (JSON Array) 格式
"""
data = []
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.jsonl':
# JSON Lines格式每行一个JSON对象
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
try:
data.append(json.loads(line))
except json.JSONDecodeError:
print(f"Warning: Failed to parse line: {line[:100]}")
elif file_ext == '.json':
# JSON Array格式一个包含多个JSON对象的数组
with open(file_path, 'r', encoding='utf-8') as f:
try:
json_data = json.load(f)
if isinstance(json_data, list):
data = json_data
else:
print(f"Warning: JSON file does not contain an array: {file_path}")
except json.JSONDecodeError as e:
print(f"Warning: Failed to parse JSON file: {file_path}, error: {e}")
else:
print(f"Warning: Unsupported file format: {file_ext}, only .json and .jsonl are supported")
return data
def evaluate_dataset(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False) -> Tuple[List[Dict], Dict[str, float]]:
"""评估整个数据集
Args:
data: 数据列表
evaluator: 评估器实例
use_real_llm: 是否使用真实LLM评估默认False使用模拟评估
"""
results = []
total_metrics = {
'bleu_score': 0.0,
'rouge_l_score': 0.0,
'character_overlap_rate': 0.0,
'length_similarity': 0.0,
'exact_match_rate': 0.0,
'keyword_overlap_rate': 0.0,
'llm_score': 0.0
}
print(f"\n开始评估 {len(data)} 条数据...")
if use_real_llm:
print("注意LLM评分功能使用真实的大语言模型API")
print("配置来源llm_config.py")
else:
print("注意LLM评分功能使用模拟评估基于传统指标的综合评分")
print("配置来源llm_config.py 中的 EVALUATION_WEIGHTS")
for idx, item in enumerate(tqdm(data, desc="评估进度")):
# 支持多种字段名格式
input_text = item.get('question', item.get('Input', item.get('问题', '')))
output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))
# 计算各项指标
metrics = evaluator.evaluate_all_metrics(answer_text, output_text)
# 获取LLM评估
if use_real_llm:
try:
prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
llm_score, llm_reason = evaluator.call_llm_for_evaluation(prompt)
except Exception as e:
print(f" [Warning] LLM API调用失败使用模拟评估: {e}")
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
else:
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
# 添加原始数据
result = {
'index': idx + 1,
'Input': input_text,
'Output': output_text,
'Answer': answer_text,
**metrics,
'llm_score': llm_score,
'llm_reason': llm_reason
}
results.append(result)
# 累加指标
for key, value in metrics.items():
total_metrics[key] += value
total_metrics['llm_score'] += llm_score
# 计算平均值
num_samples = len(data)
for key in total_metrics:
total_metrics[key] /= num_samples if num_samples > 0 else 1
return results, total_metrics
def save_to_excel(results: List[Dict], total_metrics: Dict, output_dir: str, filename: str):
"""保存结果到Excel文件"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 创建DataFrame
df = pd.DataFrame(results)
# 重命名列名
column_mapping = {
'index': '序号',
'Input': '问题',
'Answer': '参考答案',
'Output': '生成答案',
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分',
'llm_reason': 'LLM评价理由'
}
df = df.rename(columns=column_mapping)
# 计算整体统计信息
stats_df = pd.DataFrame([total_metrics])
stats_column_mapping = {
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分'
}
stats_df = stats_df.rename(columns=stats_column_mapping)
# 保存到Excel
excel_path = os.path.join(output_dir, filename)
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
# 写入详细结果
df.to_excel(writer, sheet_name='详细结果', index=False)
# 写入统计信息
stats_df.to_excel(writer, sheet_name='整体统计', index=False)
print(f"\n结果已保存到: {excel_path}")
return excel_path
def print_summary(total_metrics: Dict):
"""打印评估摘要"""
print("\n" + "="*60)
print("模型评估结果摘要")
print("="*60)
print(f"BLEU分数: {total_metrics['bleu_score']:.4f}")
print(f"ROUGE-L分数: {total_metrics['rouge_l_score']:.4f}")
print(f"字符重叠率: {total_metrics['character_overlap_rate']:.4f}")
print(f"长度相似度: {total_metrics['length_similarity']:.4f}")
print(f"完全匹配率: {total_metrics['exact_match_rate']:.4f}")
print(f"关键词重叠率: {total_metrics['keyword_overlap_rate']:.4f}")
print(f"LLM评分: {total_metrics['llm_score']:.4f}")
print("="*60)
# 计算综合评分(加权平均)
weights = {
'bleu_score': 0.2,
'rouge_l_score': 0.25,
'character_overlap_rate': 0.15,
'length_similarity': 0.1,
'exact_match_rate': 0.15,
'keyword_overlap_rate': 0.15
}
composite_score = sum(total_metrics[key] * weight for key, weight in weights.items())
print(f"综合评分: {composite_score:.4f}")
print("="*60)
def main():
"""主函数"""
print("大模型微调验证系统")
print("="*60)
# 获取CPU核心数
cpu_count = get_cpu_count()
print(f"检测到CPU核心数: {cpu_count}")
2025-12-23 15:07:19 +08:00
# 打印当前配置信息
print_config_info()
# 显示当前并发配置
if MAX_CONCURRENT_WORKERS is not None:
print(f"\n配置文件中的并发设置: {MAX_CONCURRENT_WORKERS}")
else:
print(f"\n未配置并发数将使用CPU核心数: {cpu_count}")
# 询问是否使用并发
print("\n" + "="*60)
use_parallel = input("是否使用并发评估?(y/n默认为y): ").strip().lower()
if not use_parallel or use_parallel == 'y':
use_parallel = True
# 使用配置文件中的设置如果未配置则使用CPU核心数
if MAX_CONCURRENT_WORKERS is not None:
max_workers = MAX_CONCURRENT_WORKERS
print(f"将使用并发模式,并发数: {max_workers} (来自配置文件)")
else:
max_workers = cpu_count
print(f"将使用并发模式,并发数: {max_workers} (使用CPU核心数)")
else:
use_parallel = False
max_workers = None
print("将使用串行模式")
print("="*60)
# 数据目录
data_dir = "data"
output_dir = "outputs"
2025-12-23 15:07:19 +08:00
# 获取所有数据文件(支持 .jsonl 和 .json 格式)
data_files = [f for f in os.listdir(data_dir) if f.endswith(('.jsonl', '.json'))]
print(f"\n发现 {len(data_files)} 个数据文件:")
for file in data_files:
file_ext = os.path.splitext(file)[1]
print(f" - {file} ({file_ext})")
# 初始化评估器
evaluator = ModelEvaluator()
# 存储所有结果
all_results = []
all_stats = {}
# 逐个评估每个文件
for filename in data_files:
print(f"\n{'='*60}")
print(f"评估文件: {filename}")
print(f"{'='*60}")
file_path = os.path.join(data_dir, filename)
data = load_data(file_path)
print(f"加载数据: {len(data)} 条记录")
# 评估数据(根据用户选择使用并发或串行)
if use_parallel:
results, total_metrics = evaluate_dataset_parallel(
data,
evaluator,
use_real_llm=USE_REAL_LLM,
max_workers=max_workers
)
else:
results, total_metrics = evaluate_dataset(
data,
evaluator,
use_real_llm=USE_REAL_LLM
)
2025-12-23 15:07:19 +08:00
# 保存结果
base_name = os.path.splitext(filename)[0]
excel_filename = f"{base_name}_evaluation.xlsx"
excel_path = save_to_excel(results, total_metrics, output_dir, excel_filename)
# 存储结果
all_results.extend([{**r, 'file': filename} for r in results])
all_stats[filename] = total_metrics
# 打印摘要
print_summary(total_metrics)
# 生成汇总报告
print(f"\n{'='*60}")
print("生成汇总报告")
print(f"{'='*60}")
# 创建汇总DataFrame
summary_data = []
for filename, stats in all_stats.items():
summary_data.append({
'文件名': filename,
'BLEU分数': stats['bleu_score'],
'ROUGE-L分数': stats['rouge_l_score'],
'字符重叠率': stats['character_overlap_rate'],
'长度相似度': stats['length_similarity'],
'完全匹配率': stats['exact_match_rate'],
'关键词重叠率': stats['keyword_overlap_rate'],
'LLM评分': stats['llm_score']
})
summary_df = pd.DataFrame(summary_data)
# 计算所有文件的平均分
avg_scores = summary_df.select_dtypes(include=[np.number]).mean()
avg_df = pd.DataFrame([avg_scores])
avg_df.index = ['平均分']
# 保存汇总报告
summary_path = os.path.join(output_dir, "evaluation_summary.xlsx")
with pd.ExcelWriter(summary_path, engine='openpyxl') as writer:
summary_df.to_excel(writer, sheet_name='各文件评分', index=False)
avg_df.to_excel(writer, sheet_name='平均分')
print(f"汇总报告已保存到: {summary_path}")
# 打印最终汇总
print(f"\n{'='*60}")
print("所有文件评估结果汇总")
print(f"{'='*60}")
print(summary_df.to_string(index=False, float_format='%.4f'))
print(f"{'='*60}")
print("平均分:")
print(avg_df.to_string(float_format='%.4f'))
print(f"{'='*60}")
# 保存所有详细结果
all_results_path = os.path.join(output_dir, "all_detailed_results.xlsx")
all_results_df = pd.DataFrame(all_results)
# 重命名列名
all_column_mapping = {
'index': '序号',
'Input': '问题',
'Answer': '参考答案',
'Output': '生成答案',
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分',
'llm_reason': 'LLM评价理由'
}
all_results_df = all_results_df.rename(columns=all_column_mapping)
all_results_df.to_excel(all_results_path, index=False, engine='openpyxl')
print(f"\n所有详细结果已保存到: {all_results_path}")
# =============================================================================
# 测试函数
# =============================================================================
def test_single_evaluation():
"""测试单个评估"""
from llm_config import USE_REAL_LLM
evaluator = ModelEvaluator()
# 测试数据
question = "什么是合同?"
reference_answer = "合同是当事人之间设立、变更、终止民事法律关系的协议。"
candidate_answer = "合同是双方或多方之间达成的协议,用于约定权利和义务。"
print("="*60)
print("LLM评估测试")
print("="*60)
print(f"评估模式: {'真实LLM API' if USE_REAL_LLM else '模拟评估'}")
print(f"问题: {question}")
print(f"参考答案: {reference_answer}")
print(f"生成答案: {candidate_answer}")
print("-"*60)
try:
# 获取LLM评估
prompt = evaluator.get_llm_evaluation_prompt(reference_answer, candidate_answer, question)
if USE_REAL_LLM:
print("尝试调用真实LLM API...")
score, reason = evaluator.call_llm_for_evaluation(prompt)
# 同时计算传统指标
print("\n传统指标计算:")
all_metrics = evaluator.evaluate_all_metrics(reference_answer, candidate_answer)
for key, value in all_metrics.items():
print(f" {key}: {value:.4f}")
else:
print("使用模拟评估...")
score, reason = evaluator.get_mock_llm_evaluation(reference_answer, candidate_answer, question)
print(f"\n[SUCCESS] 评估成功!")
print(f"评分: {score}/10")
print(f"评价理由: {reason}")
return True
except Exception as e:
print(f"\n[ERROR] 评估失败: {e}")
print("\n详细错误信息:")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
main()