YG_LLM_TESTER/model_evaluation.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
大模型微调验证系统
用于评估模型输出与参考答案的相似度和质量
LLM评估配置
- 默认使用模拟评估(基于传统指标)
- 如需使用真实LLM请配置 llm_config.py 文件
"""
import json
import os
import re
import time
import multiprocessing as mp
from collections import Counter
from typing import Dict, List, Tuple, Any
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.tokenize import word_tokenize
import jieba
import jieba.posseg as pseg
from difflib import SequenceMatcher
import pandas as pd
from tqdm import tqdm
import requests
# Import the LLM configuration
from llm_config import (
USE_REAL_LLM,
MAX_CONCURRENT_WORKERS,
SHOW_DETAILED_PROGRESS,
MAX_API_RETRIES,
RETRY_DELAY,
OPENAI_CONFIG,
EVALUATION_WEIGHTS,
PROMPT_TEMPLATE,
validate_openai_config,
print_config_info
)
class ModelEvaluator:
    """Model evaluator."""

    def __init__(self):
        # Initialize the jieba tokenizer
        jieba.initialize()
        # Stopword list (common Chinese stopwords)
        self.stopwords = {
            '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个',
            '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好',
            '自己', '这', '那', '他', '她', '它', '们', '个', '时候', '与', '如果'
        }
        # Smoothing function for BLEU
self.smoothing_function = SmoothingFunction()
def get_llm_evaluation_prompt(self, reference: str, candidate: str, question: str = "") -> str:
"""生成LLM评估提示词"""
# 使用配置中的提示词模板
return PROMPT_TEMPLATE.format(
question=question,
reference=reference,
candidate=candidate
)
def call_llm_for_evaluation(self, prompt: str, max_retries: int = 3, retry_delay: float = 1.0) -> Tuple[int, str]:
"""调用大语言模型进行评估(带自动重试)
使用配置:
- API配置来自 llm_config.py 中的 OPENAI_CONFIG
- 支持环境变量和直接配置
参数:
- prompt: 评估提示词
- max_retries: 最大重试次数默认3次
- retry_delay: 重试延迟时间默认1秒
配置方法:
1. 设置环境变量export OPENAI_API_KEY='your-api-key'
2. 在 llm_config.py 中直接修改 OPENAI_CONFIG
"""
        # Validate the configuration
        config = validate_openai_config()
        # Call the API with a plain HTTP request, retrying on failure
for attempt in range(max_retries + 1):
try:
                # Build the request payload
payload = {
"model": config["model"],
"messages": [
{"role": "system", "content": "你是一个专业的文本质量评估专家。"},
{"role": "user", "content": prompt}
],
"temperature": config["temperature"],
"max_tokens": config["max_tokens"]
}
                # Send the HTTP request
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config['api_key']}"
}
response = requests.post(
f"{config['api_base']}/chat/completions",
headers=headers,
json=payload,
timeout=config["timeout"]
)
                # Check the response status
response.raise_for_status()
                # Parse the JSON response (json is already imported at module level)
response_data = response.json()
content = response_data["choices"][0]["message"]["content"]
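                # The model is expected to reply with JSON such as
                # {"score": 8, "reason": "..."}; anything else is treated as an error below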
try:
result = json.loads(content)
score = int(result.get("score", 0))
reason = str(result.get("reason", ""))
return score, reason
except (json.JSONDecodeError, KeyError, ValueError):
                    # The response is not valid JSON; a regex-based fallback to
                    # extract the score from free text could be added here
                    raise ValueError(f"无法解析LLM响应：{content}")
except (requests.exceptions.RequestException, requests.exceptions.Timeout,
requests.exceptions.ConnectionError, RuntimeError) as e:
                # Network-related errors are retryable
                if attempt < max_retries:
                    wait_time = retry_delay * (2 ** attempt)  # exponential backoff
                    print(f"API调用失败，尝试 {attempt + 1}/{max_retries + 1}，{wait_time:.1f}秒后重试: {str(e)[:100]}")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"API调用失败，已重试 {max_retries} 次")
                    raise RuntimeError(f"API调用失败（已重试{max_retries}次）：{str(e)}")
except Exception as e:
                # Other errors (e.g. JSON parsing failures) are not retried
print(f"API调用失败不可重试的错误: {str(e)}")
raise RuntimeError(f"API调用失败{str(e)}")
        # This point should be unreachable; raise defensively if it is reached
        raise RuntimeError("API调用失败：达到最大重试次数")
def get_mock_llm_evaluation(self, reference: str, candidate: str, question: str = "") -> Tuple[int, str]:
"""获取模拟的LLM评估结果用于演示
使用配置:
- 权重来自 llm_config.py 中的 EVALUATION_WEIGHTS
"""
# 计算一些基本指标作为参考
bleu = self.calculate_bleu_score(reference, candidate)
rouge_l = self.calculate_rouge_l(reference, candidate)
exact_match = self.calculate_exact_match_rate(reference, candidate)
keyword_overlap = self.calculate_keyword_overlap(reference, candidate)
        # Weighted combination using the configured weights
composite_score = (
bleu * EVALUATION_WEIGHTS['bleu_score'] +
rouge_l * EVALUATION_WEIGHTS['rouge_l_score'] +
exact_match * EVALUATION_WEIGHTS['exact_match_rate'] +
keyword_overlap * EVALUATION_WEIGHTS['keyword_overlap_rate']
)
score = int(composite_score * 10)
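        # composite_score is on a 0-1 scale, so the line above maps it onto 0-10;
        # the return statement below clamps the result into the 1-10 range.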
        # Build the explanation
reason_parts = []
if exact_match > 0.9:
reason_parts.append("生成答案与参考答案内容完全一致")
elif bleu > 0.1 or rouge_l > 0.3:
reason_parts.append("生成答案在内容上与参考答案较为一致")
else:
reason_parts.append("生成答案与参考答案存在较大差异")
if keyword_overlap > 0.5:
reason_parts.append("关键词重叠度较高")
elif keyword_overlap > 0.3:
reason_parts.append("关键词重叠度中等")
else:
reason_parts.append("关键词重叠度较低")
reason = "".join(reason_parts) + f"。综合指标BLEU={bleu:.3f}, ROUGE-L={rouge_l:.3f}, 完全匹配率={exact_match:.3f}, 关键词重叠率={keyword_overlap:.3f}"
return min(10, max(1, score)), reason
def tokenize_chinese(self, text: str) -> List[str]:
"""中文分词"""
if not text:
return []
# 使用jieba进行分词
words = jieba.cut(text)
return [word.strip() for word in words if len(word.strip()) > 0]
def get_keywords(self, text: str) -> List[str]:
"""提取关键词(名词、动词、形容词)"""
if not text:
return []
words = pseg.cut(text)
keywords = []
for word, flag in words:
# 选择名词(n)、动词(v)、形容词(a)、区别词(b)等作为关键词
if flag.startswith(('n', 'v', 'a', 'b', 'i')) and word not in self.stopwords and len(word) > 1:
keywords.append(word)
return keywords
def calculate_bleu_score(self, reference: str, candidate: str) -> float:
"""计算BLEU分数"""
if not reference or not candidate:
return 0.0
        # Tokenize
ref_tokens = self.tokenize_chinese(reference)
cand_tokens = self.tokenize_chinese(candidate)
if not cand_tokens:
return 0.0
        # Compute BLEU (1-gram through 4-gram)
try:
score = sentence_bleu(
[ref_tokens],
cand_tokens,
smoothing_function=self.smoothing_function.method1
)
return score
        except Exception:
return 0.0
def calculate_rouge_l(self, reference: str, candidate: str) -> float:
"""计算ROUGE-L分数"""
if not reference or not candidate:
return 0.0
ref_tokens = self.tokenize_chinese(reference)
cand_tokens = self.tokenize_chinese(candidate)
if not ref_tokens or not cand_tokens:
return 0.0
        # Length of the longest common subsequence
        lcs = self._lcs_length(ref_tokens, cand_tokens)
        # ROUGE-L F1 from LCS-based precision and recall
ref_len = len(ref_tokens)
cand_len = len(cand_tokens)
if ref_len == 0 or cand_len == 0:
return 0.0
precision = lcs / cand_len
recall = lcs / ref_len
if precision + recall == 0:
return 0.0
rouge_l = (2 * precision * recall) / (precision + recall)
return rouge_l
def _lcs_length(self, seq1: List[str], seq2: List[str]) -> int:
"""计算最长公共子序列长度"""
m, n = len(seq1), len(seq2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
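        # dp[i][j] holds the LCS length of seq1[:i] and seq2[:j]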
for i in range(1, m + 1):
for j in range(1, n + 1):
if seq1[i-1] == seq2[j-1]:
dp[i][j] = dp[i-1][j-1] + 1
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
return dp[m][n]
def calculate_character_overlap(self, reference: str, candidate: str) -> float:
"""计算字符重叠率"""
if not reference or not candidate:
return 0.0
ref_chars = set(reference)
cand_chars = set(candidate)
if not cand_chars:
return 0.0
overlap = len(ref_chars & cand_chars)
total_cand = len(cand_chars)
return overlap / total_cand if total_cand > 0 else 0.0
def calculate_length_similarity(self, reference: str, candidate: str) -> float:
"""计算长度相似度"""
if not reference or not candidate:
return 0.0
ref_len = len(reference)
cand_len = len(candidate)
if ref_len == 0 and cand_len == 0:
return 1.0
if ref_len == 0 or cand_len == 0:
return 0.0
        # Similarity based on the relative length difference
similarity = 1 - abs(ref_len - cand_len) / max(ref_len, cand_len)
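        # e.g. reference length 100 vs. candidate length 80 -> 1 - 20/100 = 0.8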
return max(0, similarity)
def calculate_exact_match_rate(self, reference: str, candidate: str) -> float:
"""计算完全匹配率"""
if not reference and not candidate:
return 1.0
if not reference or not candidate:
return 0.0
        # Compare after removing all whitespace
ref_clean = re.sub(r'\s+', '', reference.strip())
cand_clean = re.sub(r'\s+', '', candidate.strip())
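        # e.g. "合 同" and "合同" compare as an exact match once whitespace is removed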
return 1.0 if ref_clean == cand_clean else 0.0
def calculate_keyword_overlap(self, reference: str, candidate: str) -> float:
"""计算关键词重叠率"""
ref_keywords = set(self.get_keywords(reference))
cand_keywords = set(self.get_keywords(candidate))
if not cand_keywords:
return 0.0
overlap = len(ref_keywords & cand_keywords)
total_cand = len(cand_keywords)
return overlap / total_cand if total_cand > 0 else 0.0
def evaluate_all_metrics(self, reference: str, candidate: str) -> Dict[str, float]:
"""计算所有评估指标"""
return {
'bleu_score': self.calculate_bleu_score(reference, candidate),
'rouge_l_score': self.calculate_rouge_l(reference, candidate),
'character_overlap_rate': self.calculate_character_overlap(reference, candidate),
'length_similarity': self.calculate_length_similarity(reference, candidate),
'exact_match_rate': self.calculate_exact_match_rate(reference, candidate),
'keyword_overlap_rate': self.calculate_keyword_overlap(reference, candidate)
}
def get_cpu_count():
"""获取CPU核心数"""
try:
return mp.cpu_count()
except:
return 4 # 默认值
def evaluate_single_item(args):
"""单条数据评估函数(用于并发处理)"""
idx, item, evaluator, use_real_llm = args
    # Accept several field-name variants
input_text = item.get('question', item.get('Input', item.get('问题', '')))
output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))
    # Compute the traditional metrics
metrics = evaluator.evaluate_all_metrics(answer_text, output_text)
    # Obtain the LLM-based evaluation
if use_real_llm:
try:
prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
            # Use the retry settings from the configuration file
llm_score, llm_reason = evaluator.call_llm_for_evaluation(
prompt,
max_retries=MAX_API_RETRIES,
retry_delay=RETRY_DELAY
)
except Exception as e:
            # Swallow the error and fall back to the mock evaluation
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
else:
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
    # Attach the original data to the result
result = {
'index': idx + 1,
'Input': input_text,
'Output': output_text,
'Answer': answer_text,
**metrics,
'llm_score': llm_score,
'llm_reason': llm_reason
}
return result
def evaluate_dataset_parallel(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False, max_workers: int = None) -> Tuple[List[Dict], Dict[str, float]]:
"""并发评估整个数据集
Args:
data: 数据列表
evaluator: 评估器实例
use_real_llm: 是否使用真实LLM评估
max_workers: 最大并发数默认使用CPU核心数
"""
results = []
total_metrics = {
'bleu_score': 0.0,
'rouge_l_score': 0.0,
'character_overlap_rate': 0.0,
'length_similarity': 0.0,
'exact_match_rate': 0.0,
'keyword_overlap_rate': 0.0,
'llm_score': 0.0
}
    # Resolve the worker count (priority: argument > config file > CPU core count)
    if max_workers is not None:
        # Use the value passed in
        pass
    elif MAX_CONCURRENT_WORKERS is not None:
        # Use the setting from the config file
        max_workers = MAX_CONCURRENT_WORKERS
    else:
        # Default to the number of CPU cores
        max_workers = get_cpu_count()
print(f"\n开始并发评估 {len(data)} 条数据,使用 {max_workers} 个并发线程...")
    if use_real_llm:
        print("注意：LLM评分功能使用真实的大语言模型API")
        print("配置来源：llm_config.py")
        print(f"并发数: {max_workers}")
    else:
        print("注意：LLM评分功能使用模拟评估（基于传统指标的综合评分）")
        print("配置来源：llm_config.py 中的 EVALUATION_WEIGHTS")
        print(f"并发数: {max_workers}")
    # Concurrent evaluation with a ThreadPoolExecutor, processed in batches
with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Overall progress bar
total_pbar = tqdm(total=len(data), desc="总进度", position=0, leave=True)
        # Split the data into batches
        batch_size = max_workers  # each batch is as large as the worker pool
batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
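        # e.g. 100 records with max_workers=8 -> 13 batches (12 full, 1 partial)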
total_batches = len(batches)
        # Process each batch (progress bars are created per batch)
for batch_idx, batch_data in enumerate(batches):
batch_num = batch_idx + 1
            # Create progress bars for the current batch
            current_batch_pbars = []  # per-worker progress bars for this batch
if SHOW_DETAILED_PROGRESS:
for worker_idx in range(len(batch_data)):
pbar = tqdm(
total=1,
desc=f"批次{batch_num}-并发{worker_idx + 1}: 等待任务",
position=worker_idx + 1, # 从位置1开始
leave=False
)
current_batch_pbars.append(pbar)
            # Submit every task in the current batch
            future_to_info = {}  # future -> (global_idx, worker_idx)
            for item_idx, item in enumerate(batch_data):
                worker_idx = item_idx  # worker index within the current batch
                global_idx = batch_idx * batch_size + item_idx  # global record index
future = executor.submit(evaluate_single_item, (global_idx, item, evaluator, use_real_llm))
future_to_info[future] = (global_idx, worker_idx)
# 更新批次进度条
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1}")
pbar.refresh()
            # Wait for every task in the current batch to finish
for future in as_completed(future_to_info):
global_idx, worker_idx = future_to_info[future]
try:
result = future.result()
results.append(result)
# 更新批次进度条状态
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.update(1)
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [完成]")
pbar.refresh()
pbar.close()
# 累加指标
for key in total_metrics:
if key in result:
total_metrics[key] += result[key]
# 更新总进度条
total_pbar.update(1)
except Exception as e:
print(f" [Warning] 任务{global_idx + 1}评估失败: {e}")
if SHOW_DETAILED_PROGRESS:
pbar = current_batch_pbars[worker_idx]
pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [失败]")
pbar.refresh()
pbar.close()
total_pbar.update(1)
# 当前批次完成,关闭该批次的进度条
if SHOW_DETAILED_PROGRESS:
for pbar in current_batch_pbars:
pbar.close()
            # Short pause between batches (optional)
            if batch_idx < total_batches - 1:
                time.sleep(0.1)
# 关闭总进度条
total_pbar.close()
    # Restore the original order
results.sort(key=lambda x: x['index'])
    # Compute the averages
num_samples = len(data)
for key in total_metrics:
total_metrics[key] /= num_samples if num_samples > 0 else 1
return results, total_metrics
def load_data(file_path: str) -> List[Dict[str, Any]]:
"""加载数据文件
支持 .jsonl (JSON Lines) 和 .json (JSON Array) 格式
"""
data = []
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.jsonl':
# JSON Lines格式每行一个JSON对象
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
try:
data.append(json.loads(line))
except json.JSONDecodeError:
print(f"Warning: Failed to parse line: {line[:100]}")
elif file_ext == '.json':
# JSON Array格式一个包含多个JSON对象的数组
with open(file_path, 'r', encoding='utf-8') as f:
try:
json_data = json.load(f)
if isinstance(json_data, list):
data = json_data
else:
print(f"Warning: JSON file does not contain an array: {file_path}")
except json.JSONDecodeError as e:
print(f"Warning: Failed to parse JSON file: {file_path}, error: {e}")
else:
print(f"Warning: Unsupported file format: {file_ext}, only .json and .jsonl are supported")
return data
def evaluate_dataset(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False) -> Tuple[List[Dict], Dict[str, float]]:
"""评估整个数据集
Args:
data: 数据列表
evaluator: 评估器实例
use_real_llm: 是否使用真实LLM评估默认False使用模拟评估
"""
results = []
total_metrics = {
'bleu_score': 0.0,
'rouge_l_score': 0.0,
'character_overlap_rate': 0.0,
'length_similarity': 0.0,
'exact_match_rate': 0.0,
'keyword_overlap_rate': 0.0,
'llm_score': 0.0
}
print(f"\n开始评估 {len(data)} 条数据...")
    if use_real_llm:
        print("注意：LLM评分功能使用真实的大语言模型API")
        print("配置来源：llm_config.py")
    else:
        print("注意：LLM评分功能使用模拟评估（基于传统指标的综合评分）")
        print("配置来源：llm_config.py 中的 EVALUATION_WEIGHTS")
for idx, item in enumerate(tqdm(data, desc="评估进度")):
# 支持多种字段名格式
input_text = item.get('question', item.get('Input', item.get('问题', '')))
output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))
# 计算各项指标
metrics = evaluator.evaluate_all_metrics(answer_text, output_text)
# 获取LLM评估
if use_real_llm:
try:
prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
llm_score, llm_reason = evaluator.call_llm_for_evaluation(prompt)
except Exception as e:
print(f" [Warning] LLM API调用失败使用模拟评估: {e}")
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
else:
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
# 添加原始数据
result = {
'index': idx + 1,
'Input': input_text,
'Output': output_text,
'Answer': answer_text,
**metrics,
'llm_score': llm_score,
'llm_reason': llm_reason
}
results.append(result)
# 累加指标
for key, value in metrics.items():
total_metrics[key] += value
total_metrics['llm_score'] += llm_score
# 计算平均值
num_samples = len(data)
for key in total_metrics:
total_metrics[key] /= num_samples if num_samples > 0 else 1
return results, total_metrics
def save_to_excel(results: List[Dict], total_metrics: Dict, output_dir: str, filename: str):
"""保存结果到Excel文件"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 创建DataFrame
df = pd.DataFrame(results)
# 重命名列名
column_mapping = {
'index': '序号',
'Input': '问题',
'Answer': '参考答案',
'Output': '生成答案',
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分',
'llm_reason': 'LLM评价理由'
}
df = df.rename(columns=column_mapping)
# 计算整体统计信息
stats_df = pd.DataFrame([total_metrics])
stats_column_mapping = {
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分'
}
stats_df = stats_df.rename(columns=stats_column_mapping)
# 保存到Excel
excel_path = os.path.join(output_dir, filename)
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
# 写入详细结果
df.to_excel(writer, sheet_name='详细结果', index=False)
# 写入统计信息
stats_df.to_excel(writer, sheet_name='整体统计', index=False)
print(f"\n结果已保存到: {excel_path}")
return excel_path
def print_summary(total_metrics: Dict):
"""打印评估摘要"""
print("\n" + "="*60)
print("模型评估结果摘要")
print("="*60)
print(f"BLEU分数: {total_metrics['bleu_score']:.4f}")
print(f"ROUGE-L分数: {total_metrics['rouge_l_score']:.4f}")
print(f"字符重叠率: {total_metrics['character_overlap_rate']:.4f}")
print(f"长度相似度: {total_metrics['length_similarity']:.4f}")
print(f"完全匹配率: {total_metrics['exact_match_rate']:.4f}")
print(f"关键词重叠率: {total_metrics['keyword_overlap_rate']:.4f}")
print(f"LLM评分: {total_metrics['llm_score']:.4f}")
print("="*60)
    # Composite score (weighted average of the traditional metrics)
weights = {
'bleu_score': 0.2,
'rouge_l_score': 0.25,
'character_overlap_rate': 0.15,
'length_similarity': 0.1,
'exact_match_rate': 0.15,
'keyword_overlap_rate': 0.15
}
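    # These weights sum to 1.0, so the composite stays on the same 0-1 scale as the
    # individual (non-LLM) metrics.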
composite_score = sum(total_metrics[key] * weight for key, weight in weights.items())
print(f"综合评分: {composite_score:.4f}")
print("="*60)
def main():
"""主函数"""
print("大模型微调验证系统")
print("="*60)
# 获取CPU核心数
cpu_count = get_cpu_count()
print(f"检测到CPU核心数: {cpu_count}")
# 打印当前配置信息
print_config_info()
# 显示当前并发配置
if MAX_CONCURRENT_WORKERS is not None:
print(f"\n配置文件中的并发设置: {MAX_CONCURRENT_WORKERS}")
else:
print(f"\n未配置并发数将使用CPU核心数: {cpu_count}")
    # Ask whether to run the evaluation concurrently
    print("\n" + "="*60)
    use_parallel = input("是否使用并发评估?(y/n，默认为y): ").strip().lower()
if not use_parallel or use_parallel == 'y':
use_parallel = True
        # Use the config-file setting; fall back to the CPU core count if not set
if MAX_CONCURRENT_WORKERS is not None:
max_workers = MAX_CONCURRENT_WORKERS
print(f"将使用并发模式,并发数: {max_workers} (来自配置文件)")
else:
max_workers = cpu_count
print(f"将使用并发模式,并发数: {max_workers} (使用CPU核心数)")
else:
use_parallel = False
max_workers = None
print("将使用串行模式")
print("="*60)
# 数据目录
data_dir = "data"
output_dir = "outputs"
# 获取所有数据文件(支持 .jsonl 和 .json 格式)
data_files = [f for f in os.listdir(data_dir) if f.endswith(('.jsonl', '.json'))]
print(f"\n发现 {len(data_files)} 个数据文件:")
for file in data_files:
file_ext = os.path.splitext(file)[1]
print(f" - {file} ({file_ext})")
# 初始化评估器
evaluator = ModelEvaluator()
# 存储所有结果
all_results = []
all_stats = {}
# 逐个评估每个文件
for filename in data_files:
print(f"\n{'='*60}")
print(f"评估文件: {filename}")
print(f"{'='*60}")
file_path = os.path.join(data_dir, filename)
data = load_data(file_path)
print(f"加载数据: {len(data)} 条记录")
# 评估数据(根据用户选择使用并发或串行)
if use_parallel:
results, total_metrics = evaluate_dataset_parallel(
data,
evaluator,
use_real_llm=USE_REAL_LLM,
max_workers=max_workers
)
else:
results, total_metrics = evaluate_dataset(
data,
evaluator,
use_real_llm=USE_REAL_LLM
)
# 保存结果
base_name = os.path.splitext(filename)[0]
excel_filename = f"{base_name}_evaluation.xlsx"
excel_path = save_to_excel(results, total_metrics, output_dir, excel_filename)
# 存储结果
all_results.extend([{**r, 'file': filename} for r in results])
all_stats[filename] = total_metrics
# 打印摘要
print_summary(total_metrics)
# 生成汇总报告
print(f"\n{'='*60}")
print("生成汇总报告")
print(f"{'='*60}")
# 创建汇总DataFrame
summary_data = []
for filename, stats in all_stats.items():
summary_data.append({
'文件名': filename,
'BLEU分数': stats['bleu_score'],
'ROUGE-L分数': stats['rouge_l_score'],
'字符重叠率': stats['character_overlap_rate'],
'长度相似度': stats['length_similarity'],
'完全匹配率': stats['exact_match_rate'],
'关键词重叠率': stats['keyword_overlap_rate'],
'LLM评分': stats['llm_score']
})
summary_df = pd.DataFrame(summary_data)
# 计算所有文件的平均分
avg_scores = summary_df.select_dtypes(include=[np.number]).mean()
avg_df = pd.DataFrame([avg_scores])
avg_df.index = ['平均分']
# 保存汇总报告
summary_path = os.path.join(output_dir, "evaluation_summary.xlsx")
with pd.ExcelWriter(summary_path, engine='openpyxl') as writer:
summary_df.to_excel(writer, sheet_name='各文件评分', index=False)
avg_df.to_excel(writer, sheet_name='平均分')
print(f"汇总报告已保存到: {summary_path}")
# 打印最终汇总
print(f"\n{'='*60}")
print("所有文件评估结果汇总")
print(f"{'='*60}")
print(summary_df.to_string(index=False, float_format='%.4f'))
print(f"{'='*60}")
print("平均分:")
print(avg_df.to_string(float_format='%.4f'))
print(f"{'='*60}")
# 保存所有详细结果
all_results_path = os.path.join(output_dir, "all_detailed_results.xlsx")
all_results_df = pd.DataFrame(all_results)
# 重命名列名
all_column_mapping = {
'index': '序号',
'Input': '问题',
'Answer': '参考答案',
'Output': '生成答案',
'bleu_score': 'BLEU',
'rouge_l_score': 'ROUGE-L',
'character_overlap_rate': '字符重叠率',
'length_similarity': '长度相似度',
'exact_match_rate': '完全匹配率',
'keyword_overlap_rate': '关键词重叠率',
'llm_score': 'LLM评分',
'llm_reason': 'LLM评价理由'
}
all_results_df = all_results_df.rename(columns=all_column_mapping)
all_results_df.to_excel(all_results_path, index=False, engine='openpyxl')
print(f"\n所有详细结果已保存到: {all_results_path}")
# =============================================================================
# Test functions
# =============================================================================
def test_single_evaluation():
"""测试单个评估"""
from llm_config import USE_REAL_LLM
evaluator = ModelEvaluator()
    # Test data
question = "什么是合同?"
reference_answer = "合同是当事人之间设立、变更、终止民事法律关系的协议。"
candidate_answer = "合同是双方或多方之间达成的协议,用于约定权利和义务。"
print("="*60)
print("LLM评估测试")
print("="*60)
print(f"评估模式: {'真实LLM API' if USE_REAL_LLM else '模拟评估'}")
print(f"问题: {question}")
print(f"参考答案: {reference_answer}")
print(f"生成答案: {candidate_answer}")
print("-"*60)
try:
# 获取LLM评估
prompt = evaluator.get_llm_evaluation_prompt(reference_answer, candidate_answer, question)
if USE_REAL_LLM:
print("尝试调用真实LLM API...")
score, reason = evaluator.call_llm_for_evaluation(prompt)
# 同时计算传统指标
print("\n传统指标计算:")
all_metrics = evaluator.evaluate_all_metrics(reference_answer, candidate_answer)
for key, value in all_metrics.items():
print(f" {key}: {value:.4f}")
else:
print("使用模拟评估...")
score, reason = evaluator.get_mock_llm_evaluation(reference_answer, candidate_answer, question)
print(f"\n[SUCCESS] 评估成功!")
print(f"评分: {score}/10")
print(f"评价理由: {reason}")
return True
except Exception as e:
print(f"\n[ERROR] 评估失败: {e}")
print("\n详细错误信息:")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
main()