1. Reworked the concurrent batching; 2. Replaced the openai client package with plain HTTP requests; 3. Fixed retry handling for failed API calls

2025-12-24 11:06:12 +08:00
parent 324cf04cc3
commit d32ffa4b50
2 changed files with 338 additions and 47 deletions

View File: llm_config.py

@@ -37,6 +37,23 @@ from typing import Dict, Any
# True = use the real large language model API (requires an API key to be configured)
USE_REAL_LLM = True

# =============================================================================
# Concurrent evaluation settings
# =============================================================================
# Maximum number of worker threads for concurrent evaluation
# Recommended values:
# - Mock evaluation: all CPU cores can be used (e.g., 32)
# - Real LLM API: 4-8, to avoid hitting API rate limits
# - None = auto-detect and use all CPU cores
MAX_CONCURRENT_WORKERS = 5  # or set a specific number manually, e.g., 8

# Whether to show a detailed progress bar for each concurrent task
# True = show per-task progress bars (you can watch each task run)
# False = show only the overall progress bar
# Note: with large datasets, set this to False to avoid flooding the screen
SHOW_DETAILED_PROGRESS = True

# =============================================================================
# API settings
@@ -180,6 +197,25 @@ def print_config_info():
print(f"评估模式: {'真实LLM API' if USE_REAL_LLM else '模拟评估(默认)'}")
print("-" * 60)
# 显示并发配置
if MAX_CONCURRENT_WORKERS is not None:
print(f"并发设置: {MAX_CONCURRENT_WORKERS} 个线程")
print(" 来源: llm_config.py 中的 MAX_CONCURRENT_WORKERS")
else:
import multiprocessing as mp
cpu_count = mp.cpu_count()
print(f"并发设置: 自动检测CPU核心数 ({cpu_count}核心)")
print(" 来源: 默认设置(可配置 MAX_CONCURRENT_WORKERS")
# 显示详细进度条配置
print(f"详细进度条: {'开启' if SHOW_DETAILED_PROGRESS else '关闭'}")
print(" 来源: llm_config.py 中的 SHOW_DETAILED_PROGRESS")
if SHOW_DETAILED_PROGRESS:
print(" 注意: 开启时会显示每个并发任务的进度条")
else:
print(" 注意: 关闭时只显示总进度条")
print("-" * 60)
if USE_REAL_LLM:
print("OpenAI API配置:")
print(f" API Base: {OPENAI_CONFIG['api_base']}")

View File

@@ -12,8 +12,11 @@ LLM evaluation configuration
import json
import os
import re
import time
import multiprocessing as mp
from collections import Counter
from typing import Dict, List, Tuple, Any
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
@@ -23,10 +26,13 @@ import jieba.posseg as pseg
from difflib import SequenceMatcher
import pandas as pd
from tqdm import tqdm
import requests
# Import the LLM configuration
from llm_config import (
    USE_REAL_LLM,
    MAX_CONCURRENT_WORKERS,
    SHOW_DETAILED_PROGRESS,
    OPENAI_CONFIG,
    EVALUATION_WEIGHTS,
    PROMPT_TEMPLATE,
@@ -61,13 +67,18 @@ class ModelEvaluator:
            candidate=candidate
        )
    def call_llm_for_evaluation(self, prompt: str) -> Tuple[int, str]:
        """Call the large language model for evaluation
    def call_llm_for_evaluation(self, prompt: str, max_retries: int = 3, retry_delay: float = 1.0) -> Tuple[int, str]:
        """Call the large language model for evaluation (with automatic retries)

        Configuration:
        - API settings come from OPENAI_CONFIG in llm_config.py
        - Both environment variables and direct configuration are supported

        Args:
        - prompt: the evaluation prompt
        - max_retries: maximum number of retries (default 3)
        - retry_delay: base delay between retries (default 1 second)

        Setup:
        1. Set an environment variable: export OPENAI_API_KEY='your-api-key'
        2. Or edit OPENAI_CONFIG directly in llm_config.py
@@ -75,35 +86,40 @@ class ModelEvaluator:
        # Validate the configuration
        config = validate_openai_config()
        # Call the API over plain HTTP, with retry logic
        for attempt in range(max_retries + 1):
            try:
                from openai import OpenAI
            except ImportError:
                raise ImportError(
                    "The openai package is required: pip install openai\n"
                    "See llm_config.py for configuration details"
                )
            # Initialize the client
            client = OpenAI(
                api_key=config["api_key"],
                base_url=config["api_base"]
            )
            # Call the API
            try:
                response = client.chat.completions.create(
                    model=config["model"],
                    messages=[
                # Build the request body
                payload = {
                    "model": config["model"],
                    "messages": [
                        {"role": "system", "content": "You are a professional text-quality evaluation expert."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=config["temperature"],
                    max_tokens=config["max_tokens"]
                    "temperature": config["temperature"],
                    "max_tokens": config["max_tokens"]
                }
                # Send the HTTP request
                headers = {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {config['api_key']}"
                }
                response = requests.post(
                    f"{config['api_base']}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=config["timeout"]
                )
                # Check the response status
                response.raise_for_status()
                # Parse the JSON response
                import json
                content = response.choices[0].message.content
                response_data = response.json()
                content = response_data["choices"][0]["message"]["content"]
                try:
                    result = json.loads(content)
                    score = int(result.get("score", 0))
@@ -114,10 +130,26 @@ class ModelEvaluator:
                    # A regular expression could be added here to extract the score
                    raise ValueError(f"Unable to parse LLM response: {content}")
            except (requests.exceptions.RequestException, requests.exceptions.Timeout,
                    requests.exceptions.ConnectionError, RuntimeError) as e:
                # Network-related errors: safe to retry
                if attempt < max_retries:
                    wait_time = retry_delay * (2 ** attempt)  # exponential backoff
                    print(f"API call failed (attempt {attempt + 1}/{max_retries + 1}), retrying in {wait_time:.1f}s: {str(e)[:100]}")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"API call failed after {max_retries} retries")
                    raise RuntimeError(f"API call failed (after {max_retries} retries): {str(e)}")
            except Exception as e:
                print("api call failed")
                # Other errors (e.g., JSON parse failures) are not retried
                print(f"API call failed (non-retryable error): {str(e)}")
                raise RuntimeError(f"API call failed: {str(e)}")

        # Should be unreachable, but raise defensively if we ever get here
        raise RuntimeError("API call failed: maximum retries reached")
    def get_mock_llm_evaluation(self, reference: str, candidate: str, question: str = "") -> Tuple[int, str]:
        """Return a mock LLM evaluation result (for demos)
@@ -321,6 +353,189 @@ class ModelEvaluator:
}
def get_cpu_count():
    """Return the number of CPU cores"""
    try:
        return mp.cpu_count()
    except Exception:
        return 4  # fallback default
def evaluate_single_item(args):
    """Evaluate a single record (used by the concurrent workers)"""
    idx, item, evaluator, use_real_llm = args

    # Support several field-name schemas
    input_text = item.get('question', item.get('Input', item.get('问题', '')))
    output_text = item.get('output', item.get('Output', item.get('生成答案', '')))
    answer_text = item.get('answer', item.get('Answer', item.get('参考答案', '')))

    # Compute the traditional metrics
    metrics = evaluator.evaluate_all_metrics(answer_text, output_text)

    # Get the LLM evaluation
    if use_real_llm:
        try:
            prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text)
            llm_score, llm_reason = evaluator.call_llm_for_evaluation(prompt)
        except Exception as e:
            # Swallow the error and fall back to the mock evaluation
            llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)
    else:
        llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text)

    # Attach the original data
    result = {
        'index': idx + 1,
        'Input': input_text,
        'Output': output_text,
        'Answer': answer_text,
        **metrics,
        'llm_score': llm_score,
        'llm_reason': llm_reason
    }
    return result
def evaluate_dataset_parallel(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False, max_workers: int = None) -> Tuple[List[Dict], Dict[str, float]]:
    """Evaluate a whole dataset concurrently

    Args:
        data: list of records
        evaluator: evaluator instance
        use_real_llm: whether to use the real LLM for scoring
        max_workers: maximum concurrency (defaults to the CPU core count)
    """
    results = []
    total_metrics = {
        'bleu_score': 0.0,
        'rouge_l_score': 0.0,
        'character_overlap_rate': 0.0,
        'length_similarity': 0.0,
        'exact_match_rate': 0.0,
        'keyword_overlap_rate': 0.0,
        'llm_score': 0.0
    }
    # Resolve the worker count (priority: argument > config file > CPU core count)
    if max_workers is not None:
        # Use the argument as passed in
        pass
    elif MAX_CONCURRENT_WORKERS is not None:
        # Use the config-file setting
        max_workers = MAX_CONCURRENT_WORKERS
    else:
        # Fall back to the CPU core count
        max_workers = get_cpu_count()

    print(f"\nStarting concurrent evaluation of {len(data)} records with {max_workers} worker threads...")
    if use_real_llm:
        print("Note: LLM scoring uses the real large language model API")
        print("Configuration source: llm_config.py")
        print(f"Workers: {max_workers}")
    else:
        print("Note: LLM scoring uses mock evaluation (a composite of the traditional metrics)")
        print("Configuration source: EVALUATION_WEIGHTS in llm_config.py")
        print(f"Workers: {max_workers}")
    # Evaluate concurrently with a ThreadPoolExecutor, batch by batch
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Overall progress bar
        total_pbar = tqdm(total=len(data), desc="Total progress", position=0, leave=True)

        # Split the data into batches
        batch_size = max_workers  # each batch is as large as the worker count
        batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
        total_batches = len(batches)

        # Process each batch (progress bars are created per batch)
        for batch_idx, batch_data in enumerate(batches):
            batch_num = batch_idx + 1

            # Create the progress bars for this batch
            current_batch_pbars = []  # progress bars for the current batch
            if SHOW_DETAILED_PROGRESS:
                for worker_idx in range(len(batch_data)):
                    pbar = tqdm(
                        total=1,
                        desc=f"Batch {batch_num} worker {worker_idx + 1}: waiting",
                        position=worker_idx + 1,  # positions start at 1 (0 is the total bar)
                        leave=False
                    )
                    current_batch_pbars.append(pbar)

            # Submit all tasks in this batch
            future_to_info = {}  # future -> (item_idx, worker_idx)
            for item_idx, item in enumerate(batch_data):
                worker_idx = item_idx  # worker index within the batch
                global_idx = batch_idx * batch_size + item_idx  # global index
                future = executor.submit(evaluate_single_item, (global_idx, item, evaluator, use_real_llm))
                future_to_info[future] = (global_idx, worker_idx)

                # Update the per-batch progress bar
                if SHOW_DETAILED_PROGRESS:
                    pbar = current_batch_pbars[worker_idx]
                    pbar.set_description(f"Batch {batch_num} worker {worker_idx + 1}: task {global_idx + 1}")
                    pbar.refresh()
            # Wait for every task in this batch to finish
            for future in as_completed(future_to_info):
                global_idx, worker_idx = future_to_info[future]
                try:
                    result = future.result()
                    results.append(result)

                    # Mark the per-batch progress bar as done
                    if SHOW_DETAILED_PROGRESS:
                        pbar = current_batch_pbars[worker_idx]
                        pbar.update(1)
                        pbar.set_description(f"Batch {batch_num} worker {worker_idx + 1}: task {global_idx + 1} [done]")
                        pbar.refresh()
                        pbar.close()

                    # Accumulate the metrics
                    for key in total_metrics:
                        if key in result:
                            total_metrics[key] += result[key]

                    # Advance the overall progress bar
                    total_pbar.update(1)
                except Exception as e:
                    print(f"  [Warning] task {global_idx + 1} failed: {e}")
                    if SHOW_DETAILED_PROGRESS:
                        pbar = current_batch_pbars[worker_idx]
                        pbar.set_description(f"Batch {batch_num} worker {worker_idx + 1}: task {global_idx + 1} [failed]")
                        pbar.refresh()
                        pbar.close()
                    total_pbar.update(1)

            # Batch finished: close any of its progress bars that are still open
            if SHOW_DETAILED_PROGRESS:
                for pbar in current_batch_pbars:
                    pbar.close()

            # Optional pause between batches
            if batch_idx < total_batches - 1:
                time.sleep(0.1)  # short pause

        # Close the overall progress bar
        total_pbar.close()
    # Restore the original order
    results.sort(key=lambda x: x['index'])

    # Compute the averages
    num_samples = len(data)
    for key in total_metrics:
        total_metrics[key] /= num_samples if num_samples > 0 else 1

    return results, total_metrics
def load_data(file_path: str) -> List[Dict[str, Any]]:
    """Load a data file
@@ -514,13 +729,41 @@ def main():
print("大模型微调验证系统")
print("="*60)
    # Data directories
    data_dir = "data"
    output_dir = "outputs"

    # Get the CPU core count
    cpu_count = get_cpu_count()
    print(f"Detected CPU cores: {cpu_count}")

    # Print the current configuration
    print_config_info()

    # Show the configured concurrency
    if MAX_CONCURRENT_WORKERS is not None:
        print(f"\nConcurrency setting from the config file: {MAX_CONCURRENT_WORKERS}")
    else:
        print(f"\nNo concurrency configured; the CPU core count will be used: {cpu_count}")

    # Ask whether to run the evaluation concurrently
    print("\n" + "="*60)
    use_parallel = input("Use concurrent evaluation? (y/n, default y): ").strip().lower()
    if not use_parallel or use_parallel == 'y':
        use_parallel = True
        # Use the config-file setting; fall back to the CPU core count
        if MAX_CONCURRENT_WORKERS is not None:
            max_workers = MAX_CONCURRENT_WORKERS
            print(f"Concurrent mode, {max_workers} workers (from the config file)")
        else:
            max_workers = cpu_count
            print(f"Concurrent mode, {max_workers} workers (CPU core count)")
    else:
        use_parallel = False
        max_workers = None
        print("Serial mode")
    print("="*60)

    # Data directories
    data_dir = "data"
    output_dir = "outputs"

    # Collect all data files (both .jsonl and .json are supported)
    data_files = [f for f in os.listdir(data_dir) if f.endswith(('.jsonl', '.json'))]
    print(f"\nFound {len(data_files)} data files:")
@@ -545,8 +788,20 @@ def main():
        data = load_data(file_path)
        print(f"Loaded data: {len(data)} records")
        # Evaluate the data (uses the USE_REAL_LLM setting from the config file)
        results, total_metrics = evaluate_dataset(data, evaluator, use_real_llm=USE_REAL_LLM)
        # Evaluate the data (concurrent or serial, per the user's choice)
        if use_parallel:
            results, total_metrics = evaluate_dataset_parallel(
                data,
                evaluator,
                use_real_llm=USE_REAL_LLM,
                max_workers=max_workers
            )
        else:
            results, total_metrics = evaluate_dataset(
                data,
                evaluator,
                use_real_llm=USE_REAL_LLM
            )
        # Save the results
        base_name = os.path.splitext(filename)[0]