686 lines
23 KiB
Python
686 lines
23 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
大模型微调验证系统
|
||
用于评估模型输出与参考答案的相似度和质量
|
||
|
||
LLM评估配置:
|
||
- 默认使用模拟评估(基于传统指标)
|
||
- 如需使用真实LLM,请配置 llm_config.py 文件
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
from collections import Counter
|
||
from typing import Dict, List, Tuple, Any
|
||
|
||
import numpy as np
|
||
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
|
||
from nltk.tokenize import word_tokenize
|
||
import jieba
|
||
import jieba.posseg as pseg
|
||
from difflib import SequenceMatcher
|
||
import pandas as pd
|
||
from tqdm import tqdm
|
||
|
||
# 导入LLM配置
|
||
from llm_config import (
|
||
USE_REAL_LLM,
|
||
OPENAI_CONFIG,
|
||
EVALUATION_WEIGHTS,
|
||
PROMPT_TEMPLATE,
|
||
validate_openai_config,
|
||
print_config_info
|
||
)
|
||
|
||
|
||
class ModelEvaluator:
|
||
"""模型评估器"""
|
||
|
||
def __init__(self):
|
||
# 初始化jieba分词
|
||
jieba.initialize()
|
||
|
||
# 停用词列表(中文常见停用词)
|
||
self.stopwords = {
|
||
'的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一', '一个',
|
||
'上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好',
|
||
'自己', '这', '那', '能', '下', '过', '他', '来', '对', '时候', '后', '如果'
|
||
}
|
||
|
||
# 初始化平滑函数
|
||
self.smoothing_function = SmoothingFunction()
|
||
|
||
def get_llm_evaluation_prompt(self, reference: str, candidate: str, question: str = "") -> str:
|
||
"""生成LLM评估提示词"""
|
||
# 使用配置中的提示词模板
|
||
return PROMPT_TEMPLATE.format(
|
||
question=question,
|
||
reference=reference,
|
||
candidate=candidate
|
||
)
|
||
|
||
def call_llm_for_evaluation(self, prompt: str) -> Tuple[int, str]:
|
||
"""调用大语言模型进行评估
|
||
|
||
使用配置:
|
||
- API配置来自 llm_config.py 中的 OPENAI_CONFIG
|
||
- 支持环境变量和直接配置
|
||
|
||
配置方法:
|
||
1. 设置环境变量:export OPENAI_API_KEY='your-api-key'
|
||
2. 在 llm_config.py 中直接修改 OPENAI_CONFIG
|
||
"""
|
||
# 验证配置
|
||
config = validate_openai_config()
|
||
|
||
try:
|
||
from openai import OpenAI
|
||
except ImportError:
|
||
raise ImportError(
|
||
"需要安装openai库:pip install openai\n"
|
||
"详细配置请参考 llm_config.py"
|
||
)
|
||
|
||
# 初始化客户端
|
||
client = OpenAI(
|
||
api_key=config["api_key"],
|
||
base_url=config["api_base"]
|
||
)
|
||
|
||
# 调用API
|
||
try:
|
||
response = client.chat.completions.create(
|
||
model=config["model"],
|
||
messages=[
|
||
{"role": "system", "content": "你是一个专业的文本质量评估专家。"},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
temperature=config["temperature"],
|
||
max_tokens=config["max_tokens"]
|
||
)
|
||
|
||
# 解析JSON响应
|
||
import json
|
||
content = response.choices[0].message.content
|
||
try:
|
||
result = json.loads(content)
|
||
score = int(result.get("score", 0))
|
||
reason = str(result.get("reason", ""))
|
||
return score, reason
|
||
except (json.JSONDecodeError, KeyError, ValueError):
|
||
# 如果无法解析JSON,尝试从文本中提取
|
||
# 这里可以添加正则表达式来提取评分
|
||
raise ValueError(f"无法解析LLM响应:{content}")
|
||
|
||
except Exception as e:
|
||
print("api调用失败")
|
||
raise RuntimeError(f"API调用失败:{str(e)}")
|
||
|
||
def get_mock_llm_evaluation(self, reference: str, candidate: str, question: str = "") -> Tuple[int, str]:
|
||
"""获取模拟的LLM评估结果(用于演示)
|
||
|
||
使用配置:
|
||
- 权重来自 llm_config.py 中的 EVALUATION_WEIGHTS
|
||
"""
|
||
# 计算一些基本指标作为参考
|
||
bleu = self.calculate_bleu_score(reference, candidate)
|
||
rouge_l = self.calculate_rouge_l(reference, candidate)
|
||
exact_match = self.calculate_exact_match_rate(reference, candidate)
|
||
keyword_overlap = self.calculate_keyword_overlap(reference, candidate)
|
||
|
||
# 使用配置中的权重进行加权计算
|
||
composite_score = (
|
||
bleu * EVALUATION_WEIGHTS['bleu_score'] +
|
||
rouge_l * EVALUATION_WEIGHTS['rouge_l_score'] +
|
||
exact_match * EVALUATION_WEIGHTS['exact_match_rate'] +
|
||
keyword_overlap * EVALUATION_WEIGHTS['keyword_overlap_rate']
|
||
)
|
||
score = int(composite_score * 10)
|
||
|
||
# 生成评价理由
|
||
reason_parts = []
|
||
|
||
if exact_match > 0.9:
|
||
reason_parts.append("生成答案与参考答案内容完全一致")
|
||
elif bleu > 0.1 or rouge_l > 0.3:
|
||
reason_parts.append("生成答案在内容上与参考答案较为一致")
|
||
else:
|
||
reason_parts.append("生成答案与参考答案存在较大差异")
|
||
|
||
if keyword_overlap > 0.5:
|
||
reason_parts.append("关键词重叠度较高")
|
||
elif keyword_overlap > 0.3:
|
||
reason_parts.append("关键词重叠度中等")
|
||
else:
|
||
reason_parts.append("关键词重叠度较低")
|
||
|
||
reason = ";".join(reason_parts) + f"。综合指标:BLEU={bleu:.3f}, ROUGE-L={rouge_l:.3f}, 完全匹配率={exact_match:.3f}, 关键词重叠率={keyword_overlap:.3f}"
|
||
|
||
return min(10, max(1, score)), reason
|
||
|
||
def tokenize_chinese(self, text: str) -> List[str]:
|
||
"""中文分词"""
|
||
if not text:
|
||
return []
|
||
# 使用jieba进行分词
|
||
words = jieba.cut(text)
|
||
return [word.strip() for word in words if len(word.strip()) > 0]
|
||
|
||
def get_keywords(self, text: str) -> List[str]:
|
||
"""提取关键词(名词、动词、形容词)"""
|
||
if not text:
|
||
return []
|
||
|
||
words = pseg.cut(text)
|
||
keywords = []
|
||
for word, flag in words:
|
||
# 选择名词(n)、动词(v)、形容词(a)、区别词(b)等作为关键词
|
||
if flag.startswith(('n', 'v', 'a', 'b', 'i')) and word not in self.stopwords and len(word) > 1:
|
||
keywords.append(word)
|
||
return keywords
|
||
|
||
def calculate_bleu_score(self, reference: str, candidate: str) -> float:
|
||
"""计算BLEU分数"""
|
||
if not reference or not candidate:
|
||
return 0.0
|
||
|
||
# 分词
|
||
ref_tokens = self.tokenize_chinese(reference)
|
||
cand_tokens = self.tokenize_chinese(candidate)
|
||
|
||
if not cand_tokens:
|
||
return 0.0
|
||
|
||
# 计算BLEU分数(使用1-gram到4-gram)
|
||
try:
|
||
score = sentence_bleu(
|
||
[ref_tokens],
|
||
cand_tokens,
|
||
smoothing_function=self.smoothing_function.method1
|
||
)
|
||
return score
|
||
except:
|
||
return 0.0
|
||
|
||
def calculate_rouge_l(self, reference: str, candidate: str) -> float:
|
||
"""计算ROUGE-L分数"""
|
||
if not reference or not candidate:
|
||
return 0.0
|
||
|
||
ref_tokens = self.tokenize_chinese(reference)
|
||
cand_tokens = self.tokenize_chinese(candidate)
|
||
|
||
if not ref_tokens or not cand_tokens:
|
||
return 0.0
|
||
|
||
# 计算最长公共子序列
|
||
lcs = self._lcs_length(ref_tokens, cand_tokens)
|
||
|
||
# 计算ROUGE-L F1分数
|
||
ref_len = len(ref_tokens)
|
||
cand_len = len(cand_tokens)
|
||
|
||
if ref_len == 0 or cand_len == 0:
|
||
return 0.0
|
||
|
||
precision = lcs / cand_len
|
||
recall = lcs / ref_len
|
||
|
||
if precision + recall == 0:
|
||
return 0.0
|
||
|
||
rouge_l = (2 * precision * recall) / (precision + recall)
|
||
return rouge_l
|
||
|
||
def _lcs_length(self, seq1: List[str], seq2: List[str]) -> int:
|
||
"""计算最长公共子序列长度"""
|
||
m, n = len(seq1), len(seq2)
|
||
dp = [[0] * (n + 1) for _ in range(m + 1)]
|
||
|
||
for i in range(1, m + 1):
|
||
for j in range(1, n + 1):
|
||
if seq1[i-1] == seq2[j-1]:
|
||
dp[i][j] = dp[i-1][j-1] + 1
|
||
else:
|
||
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
|
||
|
||
return dp[m][n]
|
||
|
||
def calculate_character_overlap(self, reference: str, candidate: str) -> float:
|
||
"""计算字符重叠率"""
|
||
if not reference or not candidate:
|
||
return 0.0
|
||
|
||
ref_chars = set(reference)
|
||
cand_chars = set(candidate)
|
||
|
||
if not cand_chars:
|
||
return 0.0
|
||
|
||
overlap = len(ref_chars & cand_chars)
|
||
total_cand = len(cand_chars)
|
||
|
||
return overlap / total_cand if total_cand > 0 else 0.0
|
||
|
||
def calculate_length_similarity(self, reference: str, candidate: str) -> float:
|
||
"""计算长度相似度"""
|
||
if not reference or not candidate:
|
||
return 0.0
|
||
|
||
ref_len = len(reference)
|
||
cand_len = len(candidate)
|
||
|
||
if ref_len == 0 and cand_len == 0:
|
||
return 1.0
|
||
|
||
if ref_len == 0 or cand_len == 0:
|
||
return 0.0
|
||
|
||
# 使用相对差异计算相似度
|
||
similarity = 1 - abs(ref_len - cand_len) / max(ref_len, cand_len)
|
||
return max(0, similarity)
|
||
|
||
def calculate_exact_match_rate(self, reference: str, candidate: str) -> float:
|
||
"""计算完全匹配率"""
|
||
if not reference and not candidate:
|
||
return 1.0
|
||
|
||
if not reference or not candidate:
|
||
return 0.0
|
||
|
||
# 去除空白字符后比较
|
||
ref_clean = re.sub(r'\s+', '', reference.strip())
|
||
cand_clean = re.sub(r'\s+', '', candidate.strip())
|
||
|
||
return 1.0 if ref_clean == cand_clean else 0.0
|
||
|
||
def calculate_keyword_overlap(self, reference: str, candidate: str) -> float:
|
||
"""计算关键词重叠率"""
|
||
ref_keywords = set(self.get_keywords(reference))
|
||
cand_keywords = set(self.get_keywords(candidate))
|
||
|
||
if not cand_keywords:
|
||
return 0.0
|
||
|
||
overlap = len(ref_keywords & cand_keywords)
|
||
total_cand = len(cand_keywords)
|
||
|
||
return overlap / total_cand if total_cand > 0 else 0.0
|
||
|
||
def evaluate_all_metrics(self, reference: str, candidate: str) -> Dict[str, float]:
|
||
"""计算所有评估指标"""
|
||
return {
|
||
'bleu_score': self.calculate_bleu_score(reference, candidate),
|
||
'rouge_l_score': self.calculate_rouge_l(reference, candidate),
|
||
'character_overlap_rate': self.calculate_character_overlap(reference, candidate),
|
||
'length_similarity': self.calculate_length_similarity(reference, candidate),
|
||
'exact_match_rate': self.calculate_exact_match_rate(reference, candidate),
|
||
'keyword_overlap_rate': self.calculate_keyword_overlap(reference, candidate)
|
||
}
|
||
|
||
|
||
def load_data(file_path: str) -> List[Dict[str, Any]]:
|
||
"""加载数据文件
|
||
|
||
支持 .jsonl (JSON Lines) 和 .json (JSON Array) 格式
|
||
"""
|
||
data = []
|
||
file_ext = os.path.splitext(file_path)[1].lower()
|
||
|
||
if file_ext == '.jsonl':
|
||
# JSON Lines格式:每行一个JSON对象
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line:
|
||
try:
|
||
data.append(json.loads(line))
|
||
except json.JSONDecodeError:
|
||
print(f"Warning: Failed to parse line: {line[:100]}")
|
||
elif file_ext == '.json':
|
||
# JSON Array格式:一个包含多个JSON对象的数组
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
try:
|
||
json_data = json.load(f)
|
||
if isinstance(json_data, list):
|
||
data = json_data
|
||
else:
|
||
print(f"Warning: JSON file does not contain an array: {file_path}")
|
||
except json.JSONDecodeError as e:
|
||
print(f"Warning: Failed to parse JSON file: {file_path}, error: {e}")
|
||
else:
|
||
print(f"Warning: Unsupported file format: {file_ext}, only .json and .jsonl are supported")
|
||
|
||
return data
|
||
|
||
|
||
def evaluate_dataset(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False) -> Tuple[List[Dict], Dict[str, float]]:
|
||
"""评估整个数据集
|
||
|
||
Args:
|
||
data: 数据列表
|
||
evaluator: 评估器实例
|
||
use_real_llm: 是否使用真实LLM评估(默认False,使用模拟评估)
|
||
"""
|
||
results = []
|
||
total_metrics = {
|
||
'bleu_score': 0.0,
|
||
'rouge_l_score': 0.0,
|
||
'character_overlap_rate': 0.0,
|
||
'length_similarity': 0.0,
|
||
'exact_match_rate': 0.0,
|
||
'keyword_overlap_rate': 0.0,
|
||
'llm_score': 0.0
|
||
}
|
||
|
||
print(f"\n开始评估 {len(data)} 条数据...")
|
||
|
||
if use_real_llm:
|
||
print("注意:LLM评分功能使用真实的大语言模型API")
|
||
print("配置来源:llm_config.py")
|
||
else:
|
||
print("注意:LLM评分功能使用模拟评估(基于传统指标的综合评分)")
|
||
print("配置来源:llm_config.py 中的 EVALUATION_WEIGHTS")
|
||
|
||
for idx, item in enumerate(tqdm(data, desc="评估进度")):
|
||
# 支持多种字段名格式
|
||
input_text = item.get('question', item.get('Input', item.get('问题', '')))
|
||
output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
|
||
answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))
|
||
|
||
# 计算各项指标
|
||
metrics = evaluator.evaluate_all_metrics(answer_text, output_text)
|
||
|
||
# 获取LLM评估
|
||
if use_real_llm:
|
||
try:
|
||
prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
|
||
llm_score, llm_reason = evaluator.call_llm_for_evaluation(prompt)
|
||
except Exception as e:
|
||
print(f" [Warning] LLM API调用失败,使用模拟评估: {e}")
|
||
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
|
||
else:
|
||
llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
|
||
|
||
# 添加原始数据
|
||
result = {
|
||
'index': idx + 1,
|
||
'Input': input_text,
|
||
'Output': output_text,
|
||
'Answer': answer_text,
|
||
**metrics,
|
||
'llm_score': llm_score,
|
||
'llm_reason': llm_reason
|
||
}
|
||
results.append(result)
|
||
|
||
# 累加指标
|
||
for key, value in metrics.items():
|
||
total_metrics[key] += value
|
||
total_metrics['llm_score'] += llm_score
|
||
|
||
# 计算平均值
|
||
num_samples = len(data)
|
||
for key in total_metrics:
|
||
total_metrics[key] /= num_samples if num_samples > 0 else 1
|
||
|
||
return results, total_metrics
|
||
|
||
|
||
def save_to_excel(results: List[Dict], total_metrics: Dict, output_dir: str, filename: str):
|
||
"""保存结果到Excel文件"""
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
# 创建DataFrame
|
||
df = pd.DataFrame(results)
|
||
|
||
# 重命名列名
|
||
column_mapping = {
|
||
'index': '序号',
|
||
'Input': '问题',
|
||
'Answer': '参考答案',
|
||
'Output': '生成答案',
|
||
'bleu_score': 'BLEU',
|
||
'rouge_l_score': 'ROUGE-L',
|
||
'character_overlap_rate': '字符重叠率',
|
||
'length_similarity': '长度相似度',
|
||
'exact_match_rate': '完全匹配率',
|
||
'keyword_overlap_rate': '关键词重叠率',
|
||
'llm_score': 'LLM评分',
|
||
'llm_reason': 'LLM评价理由'
|
||
}
|
||
df = df.rename(columns=column_mapping)
|
||
|
||
# 计算整体统计信息
|
||
stats_df = pd.DataFrame([total_metrics])
|
||
stats_column_mapping = {
|
||
'bleu_score': 'BLEU',
|
||
'rouge_l_score': 'ROUGE-L',
|
||
'character_overlap_rate': '字符重叠率',
|
||
'length_similarity': '长度相似度',
|
||
'exact_match_rate': '完全匹配率',
|
||
'keyword_overlap_rate': '关键词重叠率',
|
||
'llm_score': 'LLM评分'
|
||
}
|
||
stats_df = stats_df.rename(columns=stats_column_mapping)
|
||
|
||
# 保存到Excel
|
||
excel_path = os.path.join(output_dir, filename)
|
||
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
|
||
# 写入详细结果
|
||
df.to_excel(writer, sheet_name='详细结果', index=False)
|
||
|
||
# 写入统计信息
|
||
stats_df.to_excel(writer, sheet_name='整体统计', index=False)
|
||
|
||
print(f"\n结果已保存到: {excel_path}")
|
||
return excel_path
|
||
|
||
|
||
def print_summary(total_metrics: Dict):
|
||
"""打印评估摘要"""
|
||
print("\n" + "="*60)
|
||
print("模型评估结果摘要")
|
||
print("="*60)
|
||
print(f"BLEU分数: {total_metrics['bleu_score']:.4f}")
|
||
print(f"ROUGE-L分数: {total_metrics['rouge_l_score']:.4f}")
|
||
print(f"字符重叠率: {total_metrics['character_overlap_rate']:.4f}")
|
||
print(f"长度相似度: {total_metrics['length_similarity']:.4f}")
|
||
print(f"完全匹配率: {total_metrics['exact_match_rate']:.4f}")
|
||
print(f"关键词重叠率: {total_metrics['keyword_overlap_rate']:.4f}")
|
||
print(f"LLM评分: {total_metrics['llm_score']:.4f}")
|
||
print("="*60)
|
||
|
||
# 计算综合评分(加权平均)
|
||
weights = {
|
||
'bleu_score': 0.2,
|
||
'rouge_l_score': 0.25,
|
||
'character_overlap_rate': 0.15,
|
||
'length_similarity': 0.1,
|
||
'exact_match_rate': 0.15,
|
||
'keyword_overlap_rate': 0.15
|
||
}
|
||
|
||
composite_score = sum(total_metrics[key] * weight for key, weight in weights.items())
|
||
print(f"综合评分: {composite_score:.4f}")
|
||
print("="*60)
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("大模型微调验证系统")
|
||
print("="*60)
|
||
|
||
# 数据目录
|
||
data_dir = "data"
|
||
output_dir = "outputs"
|
||
|
||
# 打印当前配置信息
|
||
print_config_info()
|
||
|
||
# 获取所有数据文件(支持 .jsonl 和 .json 格式)
|
||
data_files = [f for f in os.listdir(data_dir) if f.endswith(('.jsonl', '.json'))]
|
||
print(f"\n发现 {len(data_files)} 个数据文件:")
|
||
for file in data_files:
|
||
file_ext = os.path.splitext(file)[1]
|
||
print(f" - {file} ({file_ext})")
|
||
|
||
# 初始化评估器
|
||
evaluator = ModelEvaluator()
|
||
|
||
# 存储所有结果
|
||
all_results = []
|
||
all_stats = {}
|
||
|
||
# 逐个评估每个文件
|
||
for filename in data_files:
|
||
print(f"\n{'='*60}")
|
||
print(f"评估文件: {filename}")
|
||
print(f"{'='*60}")
|
||
|
||
file_path = os.path.join(data_dir, filename)
|
||
data = load_data(file_path)
|
||
print(f"加载数据: {len(data)} 条记录")
|
||
|
||
# 评估数据(使用配置文件中的USE_REAL_LLM设置)
|
||
results, total_metrics = evaluate_dataset(data, evaluator, use_real_llm=USE_REAL_LLM)
|
||
|
||
# 保存结果
|
||
base_name = os.path.splitext(filename)[0]
|
||
excel_filename = f"{base_name}_evaluation.xlsx"
|
||
excel_path = save_to_excel(results, total_metrics, output_dir, excel_filename)
|
||
|
||
# 存储结果
|
||
all_results.extend([{**r, 'file': filename} for r in results])
|
||
all_stats[filename] = total_metrics
|
||
|
||
# 打印摘要
|
||
print_summary(total_metrics)
|
||
|
||
# 生成汇总报告
|
||
print(f"\n{'='*60}")
|
||
print("生成汇总报告")
|
||
print(f"{'='*60}")
|
||
|
||
# 创建汇总DataFrame
|
||
summary_data = []
|
||
for filename, stats in all_stats.items():
|
||
summary_data.append({
|
||
'文件名': filename,
|
||
'BLEU分数': stats['bleu_score'],
|
||
'ROUGE-L分数': stats['rouge_l_score'],
|
||
'字符重叠率': stats['character_overlap_rate'],
|
||
'长度相似度': stats['length_similarity'],
|
||
'完全匹配率': stats['exact_match_rate'],
|
||
'关键词重叠率': stats['keyword_overlap_rate'],
|
||
'LLM评分': stats['llm_score']
|
||
})
|
||
|
||
summary_df = pd.DataFrame(summary_data)
|
||
|
||
# 计算所有文件的平均分
|
||
avg_scores = summary_df.select_dtypes(include=[np.number]).mean()
|
||
avg_df = pd.DataFrame([avg_scores])
|
||
avg_df.index = ['平均分']
|
||
|
||
# 保存汇总报告
|
||
summary_path = os.path.join(output_dir, "evaluation_summary.xlsx")
|
||
with pd.ExcelWriter(summary_path, engine='openpyxl') as writer:
|
||
summary_df.to_excel(writer, sheet_name='各文件评分', index=False)
|
||
avg_df.to_excel(writer, sheet_name='平均分')
|
||
|
||
print(f"汇总报告已保存到: {summary_path}")
|
||
|
||
# 打印最终汇总
|
||
print(f"\n{'='*60}")
|
||
print("所有文件评估结果汇总")
|
||
print(f"{'='*60}")
|
||
print(summary_df.to_string(index=False, float_format='%.4f'))
|
||
print(f"{'='*60}")
|
||
print("平均分:")
|
||
print(avg_df.to_string(float_format='%.4f'))
|
||
print(f"{'='*60}")
|
||
|
||
# 保存所有详细结果
|
||
all_results_path = os.path.join(output_dir, "all_detailed_results.xlsx")
|
||
all_results_df = pd.DataFrame(all_results)
|
||
# 重命名列名
|
||
all_column_mapping = {
|
||
'index': '序号',
|
||
'Input': '问题',
|
||
'Answer': '参考答案',
|
||
'Output': '生成答案',
|
||
'bleu_score': 'BLEU',
|
||
'rouge_l_score': 'ROUGE-L',
|
||
'character_overlap_rate': '字符重叠率',
|
||
'length_similarity': '长度相似度',
|
||
'exact_match_rate': '完全匹配率',
|
||
'keyword_overlap_rate': '关键词重叠率',
|
||
'llm_score': 'LLM评分',
|
||
'llm_reason': 'LLM评价理由'
|
||
}
|
||
all_results_df = all_results_df.rename(columns=all_column_mapping)
|
||
all_results_df.to_excel(all_results_path, index=False, engine='openpyxl')
|
||
print(f"\n所有详细结果已保存到: {all_results_path}")
|
||
|
||
|
||
# =============================================================================
|
||
# 测试函数
|
||
# =============================================================================
|
||
|
||
def test_single_evaluation():
|
||
"""测试单个评估"""
|
||
from llm_config import USE_REAL_LLM
|
||
|
||
evaluator = ModelEvaluator()
|
||
|
||
# 测试数据
|
||
question = "什么是合同?"
|
||
reference_answer = "合同是当事人之间设立、变更、终止民事法律关系的协议。"
|
||
candidate_answer = "合同是双方或多方之间达成的协议,用于约定权利和义务。"
|
||
|
||
print("="*60)
|
||
print("LLM评估测试")
|
||
print("="*60)
|
||
print(f"评估模式: {'真实LLM API' if USE_REAL_LLM else '模拟评估'}")
|
||
print(f"问题: {question}")
|
||
print(f"参考答案: {reference_answer}")
|
||
print(f"生成答案: {candidate_answer}")
|
||
print("-"*60)
|
||
|
||
try:
|
||
# 获取LLM评估
|
||
prompt = evaluator.get_llm_evaluation_prompt(reference_answer, candidate_answer, question)
|
||
|
||
if USE_REAL_LLM:
|
||
print("尝试调用真实LLM API...")
|
||
score, reason = evaluator.call_llm_for_evaluation(prompt)
|
||
|
||
# 同时计算传统指标
|
||
print("\n传统指标计算:")
|
||
all_metrics = evaluator.evaluate_all_metrics(reference_answer, candidate_answer)
|
||
for key, value in all_metrics.items():
|
||
print(f" {key}: {value:.4f}")
|
||
else:
|
||
print("使用模拟评估...")
|
||
score, reason = evaluator.get_mock_llm_evaluation(reference_answer, candidate_answer, question)
|
||
|
||
print(f"\n[SUCCESS] 评估成功!")
|
||
print(f"评分: {score}/10")
|
||
print(f"评价理由: {reason}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n[ERROR] 评估失败: {e}")
|
||
print("\n详细错误信息:")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |