From d32ffa4b50da0f32cc41572d4f877ea753595234 Mon Sep 17 00:00:00 2001 From: "DESKTOP-72TV0V4\\caoxiaozhu" Date: Wed, 24 Dec 2025 11:06:12 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E4=BA=86=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E6=89=B9=E6=AC=A1=EF=BC=8C2.=E4=BF=AE=E6=94=B9=E4=BA=86openai?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=E5=8C=85=EF=BC=8C=E6=94=B9=E7=94=A8=E6=99=AE?= =?UTF-8?q?=E9=80=9A=E7=9A=84http=E8=AF=B7=E6=B1=82=EF=BC=8C3.=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=BA=86api=E8=B0=83=E7=94=A8=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- llm_config.py | 36 +++++ model_evaluation.py | 349 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 338 insertions(+), 47 deletions(-) diff --git a/llm_config.py b/llm_config.py index ccf9dc3..27d5522 100644 --- a/llm_config.py +++ b/llm_config.py @@ -37,6 +37,23 @@ from typing import Dict, Any # True = 使用真实的大语言模型API(需要配置API密钥) USE_REAL_LLM = True +# ============================================================================= +# 并发评估配置 +# ============================================================================= + +# 并发评估的最大线程数 +# 推荐设置: +# - 模拟评估:可使用所有CPU核心(如32) +# - 真实LLM API:建议4-8(避免触发API速率限制) +# - None = 自动检测CPU核心数并使用所有核心 +MAX_CONCURRENT_WORKERS = 5 # 可以手动设置为具体数字,如8 + +# 是否显示每个并发任务的详细进度条 +# True = 显示详细进度条(可以看到每个任务的执行情况) +# False = 只显示总进度条 +# 注意:当数据量很大时,建议设置为False以避免屏幕输出过多 +SHOW_DETAILED_PROGRESS = True + # ============================================================================= # API配置 @@ -180,6 +197,25 @@ def print_config_info(): print(f"评估模式: {'真实LLM API' if USE_REAL_LLM else '模拟评估(默认)'}") print("-" * 60) + # 显示并发配置 + if MAX_CONCURRENT_WORKERS is not None: + print(f"并发设置: {MAX_CONCURRENT_WORKERS} 个线程") + print(" 来源: llm_config.py 中的 MAX_CONCURRENT_WORKERS") + else: + import multiprocessing as mp + cpu_count = mp.cpu_count() + print(f"并发设置: 自动检测CPU核心数 ({cpu_count}核心)") + print(" 来源: 默认设置(可配置 MAX_CONCURRENT_WORKERS)") + + # 显示详细进度条配置 + print(f"详细进度条: {'开启' if SHOW_DETAILED_PROGRESS else '关闭'}") + print(" 来源: llm_config.py 中的 SHOW_DETAILED_PROGRESS") + if SHOW_DETAILED_PROGRESS: + print(" 注意: 开启时会显示每个并发任务的进度条") + else: + print(" 注意: 关闭时只显示总进度条") + print("-" * 60) + if USE_REAL_LLM: print("OpenAI API配置:") print(f" API Base: {OPENAI_CONFIG['api_base']}") diff --git a/model_evaluation.py b/model_evaluation.py index fe64bbb..8f4d825 100644 --- a/model_evaluation.py +++ b/model_evaluation.py @@ -12,8 +12,11 @@ LLM评估配置: import json import os import re +import time +import multiprocessing as mp from collections import Counter from typing import Dict, List, Tuple, Any +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed import numpy as np from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction @@ -23,10 +26,13 @@ import jieba.posseg as pseg from difflib import SequenceMatcher import pandas as pd from tqdm import tqdm +import requests # 导入LLM配置 from llm_config import ( USE_REAL_LLM, + MAX_CONCURRENT_WORKERS, + SHOW_DETAILED_PROGRESS, OPENAI_CONFIG, EVALUATION_WEIGHTS, PROMPT_TEMPLATE, @@ -61,13 +67,18 @@ class ModelEvaluator: candidate=candidate ) - def call_llm_for_evaluation(self, prompt: str) -> Tuple[int, str]: - """调用大语言模型进行评估 + def call_llm_for_evaluation(self, prompt: str, max_retries: int = 3, retry_delay: float = 1.0) -> Tuple[int, str]: + """调用大语言模型进行评估(带自动重试) 使用配置: - API配置来自 llm_config.py 中的 OPENAI_CONFIG - 支持环境变量和直接配置 + 参数: + - prompt: 评估提示词 + - max_retries: 最大重试次数(默认3次) + - retry_delay: 重试延迟时间(秒,默认1秒) + 配置方法: 1. 设置环境变量:export OPENAI_API_KEY='your-api-key' 2. 在 llm_config.py 中直接修改 OPENAI_CONFIG @@ -75,48 +86,69 @@ class ModelEvaluator: # 验证配置 config = validate_openai_config() - try: - from openai import OpenAI - except ImportError: - raise ImportError( - "需要安装openai库:pip install openai\n" - "详细配置请参考 llm_config.py" - ) - - # 初始化客户端 - client = OpenAI( - api_key=config["api_key"], - base_url=config["api_base"] - ) - - # 调用API - try: - response = client.chat.completions.create( - model=config["model"], - messages=[ - {"role": "system", "content": "你是一个专业的文本质量评估专家。"}, - {"role": "user", "content": prompt} - ], - temperature=config["temperature"], - max_tokens=config["max_tokens"] - ) - - # 解析JSON响应 - import json - content = response.choices[0].message.content + # 调用API(使用传统HTTP请求,带重试机制) + for attempt in range(max_retries + 1): try: - result = json.loads(content) - score = int(result.get("score", 0)) - reason = str(result.get("reason", "")) - return score, reason - except (json.JSONDecodeError, KeyError, ValueError): - # 如果无法解析JSON,尝试从文本中提取 - # 这里可以添加正则表达式来提取评分 - raise ValueError(f"无法解析LLM响应:{content}") + # 构造请求体 + payload = { + "model": config["model"], + "messages": [ + {"role": "system", "content": "你是一个专业的文本质量评估专家。"}, + {"role": "user", "content": prompt} + ], + "temperature": config["temperature"], + "max_tokens": config["max_tokens"] + } - except Exception as e: - print("api调用失败") - raise RuntimeError(f"API调用失败:{str(e)}") + # 发送HTTP请求 + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {config['api_key']}" + } + + response = requests.post( + f"{config['api_base']}/chat/completions", + headers=headers, + json=payload, + timeout=config["timeout"] + ) + + # 检查响应状态 + response.raise_for_status() + + # 解析JSON响应 + import json + response_data = response.json() + content = response_data["choices"][0]["message"]["content"] + try: + result = json.loads(content) + score = int(result.get("score", 0)) + reason = str(result.get("reason", "")) + return score, reason + except (json.JSONDecodeError, KeyError, ValueError): + # 如果无法解析JSON,尝试从文本中提取 + # 这里可以添加正则表达式来提取评分 + raise ValueError(f"无法解析LLM响应:{content}") + + except (requests.exceptions.RequestException, requests.exceptions.Timeout, + requests.exceptions.ConnectionError, RuntimeError) as e: + # 网络相关错误,可以重试 + if attempt < max_retries: + wait_time = retry_delay * (2 ** attempt) # 指数退避 + print(f"API调用失败(尝试 {attempt + 1}/{max_retries + 1}),{wait_time:.1f}秒后重试: {str(e)[:100]}") + time.sleep(wait_time) + continue + else: + print(f"API调用失败,已重试 {max_retries} 次") + raise RuntimeError(f"API调用失败(已重试{max_retries}次):{str(e)}") + + except Exception as e: + # 其他错误(如JSON解析错误),不重试 + print(f"API调用失败(不可重试的错误): {str(e)}") + raise RuntimeError(f"API调用失败:{str(e)}") + + # 这里不应该到达,但如果到达了,抛出异常 + raise RuntimeError("API调用失败:达到最大重试次数") def get_mock_llm_evaluation(self, reference: str, candidate: str, question: str = "") -> Tuple[int, str]: """获取模拟的LLM评估结果(用于演示) @@ -321,6 +353,189 @@ class ModelEvaluator: } +def get_cpu_count(): + """获取CPU核心数""" + try: + return mp.cpu_count() + except: + return 4 # 默认值 + + +def evaluate_single_item(args): + """单条数据评估函数(用于并发处理)""" + idx, item, evaluator, use_real_llm = args + + # 支持多种字段名格式 + input_text = item.get('question', item.get('Input', item.get('问题', ''))) + output_text = item.get('output', item.get('Output', item.get('生成答案', ''))) + answer_text = item.get('answer', item.get('Answer', item.get('参考答案', ''))) + + # 计算各项指标 + metrics = evaluator.evaluate_all_metrics(answer_text, output_text) + + # 获取LLM评估 + if use_real_llm: + try: + prompt = evaluator.get_llm_evaluation_prompt(answer_text, output_text, input_text) + llm_score, llm_reason = evaluator.call_llm_for_evaluation(prompt) + except Exception as e: + # 静默处理错误,返回模拟评估结果 + llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text) + else: + llm_score, llm_reason = evaluator.get_mock_llm_evaluation(answer_text, output_text, input_text) + + # 添加原始数据 + result = { + 'index': idx + 1, + 'Input': input_text, + 'Output': output_text, + 'Answer': answer_text, + **metrics, + 'llm_score': llm_score, + 'llm_reason': llm_reason + } + + return result + + +def evaluate_dataset_parallel(data: List[Dict[str, Any]], evaluator: ModelEvaluator, use_real_llm: bool = False, max_workers: int = None) -> Tuple[List[Dict], Dict[str, float]]: + """并发评估整个数据集 + + Args: + data: 数据列表 + evaluator: 评估器实例 + use_real_llm: 是否使用真实LLM评估 + max_workers: 最大并发数,默认使用CPU核心数 + """ + results = [] + total_metrics = { + 'bleu_score': 0.0, + 'rouge_l_score': 0.0, + 'character_overlap_rate': 0.0, + 'length_similarity': 0.0, + 'exact_match_rate': 0.0, + 'keyword_overlap_rate': 0.0, + 'llm_score': 0.0 + } + + # 获取并发数(优先级:参数 > 配置文件 > CPU核心数) + if max_workers is not None: + # 直接使用传入的参数 + pass + elif MAX_CONCURRENT_WORKERS is not None: + # 使用配置文件中的设置 + max_workers = MAX_CONCURRENT_WORKERS + else: + # 默认使用CPU核心数 + max_workers = get_cpu_count() + + print(f"\n开始并发评估 {len(data)} 条数据,使用 {max_workers} 个并发线程...") + + if use_real_llm: + print("注意:LLM评分功能使用真实的大语言模型API") + print("配置来源:llm_config.py") + print(f"并发数: {max_workers}") + else: + print("注意:LLM评分功能使用模拟评估(基于传统指标的综合评分)") + print("配置来源:llm_config.py 中的 EVALUATION_WEIGHTS") + print(f"并发数: {max_workers}") + + # 使用ThreadPoolExecutor进行并发评估(按批次处理) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # 创建总进度条 + total_pbar = tqdm(total=len(data), desc="总进度", position=0, leave=True) + + # 分批处理数据 + batch_size = max_workers # 每批的大小等于并发数 + batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)] + total_batches = len(batches) + + # 处理每个批次(动态创建进度条) + for batch_idx, batch_data in enumerate(batches): + batch_num = batch_idx + 1 + + # 动态创建当前批次的进度条 + current_batch_pbars = [] # 当前批次的进度条列表 + if SHOW_DETAILED_PROGRESS: + for worker_idx in range(len(batch_data)): + pbar = tqdm( + total=1, + desc=f"批次{batch_num}-并发{worker_idx + 1}: 等待任务", + position=worker_idx + 1, # 从位置1开始 + leave=False + ) + current_batch_pbars.append(pbar) + + # 提交当前批次的所有任务 + future_to_info = {} # future -> (item_idx, worker_idx) + for item_idx, item in enumerate(batch_data): + worker_idx = item_idx # 当前批次内的worker索引 + global_idx = batch_idx * batch_size + item_idx # 全局索引 + future = executor.submit(evaluate_single_item, (global_idx, item, evaluator, use_real_llm)) + future_to_info[future] = (global_idx, worker_idx) + + # 更新批次进度条 + if SHOW_DETAILED_PROGRESS: + pbar = current_batch_pbars[worker_idx] + pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1}") + pbar.refresh() + + # 等待当前批次的所有任务完成 + for future in as_completed(future_to_info): + global_idx, worker_idx = future_to_info[future] + + try: + result = future.result() + results.append(result) + + # 更新批次进度条状态 + if SHOW_DETAILED_PROGRESS: + pbar = current_batch_pbars[worker_idx] + pbar.update(1) + pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [完成]") + pbar.refresh() + pbar.close() + + # 累加指标 + for key in total_metrics: + if key in result: + total_metrics[key] += result[key] + + # 更新总进度条 + total_pbar.update(1) + + except Exception as e: + print(f" [Warning] 任务{global_idx + 1}评估失败: {e}") + if SHOW_DETAILED_PROGRESS: + pbar = current_batch_pbars[worker_idx] + pbar.set_description(f"批次{batch_num}-并发{worker_idx + 1}: 任务{global_idx + 1} [失败]") + pbar.refresh() + pbar.close() + total_pbar.update(1) + + # 当前批次完成,关闭该批次的进度条 + if SHOW_DETAILED_PROGRESS: + for pbar in current_batch_pbars: + pbar.close() + + # 批次间隔(可选) + if batch_idx < total_batches - 1: + time.sleep(0.1) # 短暂间隔 + + # 关闭总进度条 + total_pbar.close() + + # 按原始顺序排序 + results.sort(key=lambda x: x['index']) + + # 计算平均值 + num_samples = len(data) + for key in total_metrics: + total_metrics[key] /= num_samples if num_samples > 0 else 1 + + return results, total_metrics + + def load_data(file_path: str) -> List[Dict[str, Any]]: """加载数据文件 @@ -514,13 +729,41 @@ def main(): print("大模型微调验证系统") print("="*60) - # 数据目录 - data_dir = "data" - output_dir = "outputs" + # 获取CPU核心数 + cpu_count = get_cpu_count() + print(f"检测到CPU核心数: {cpu_count}") # 打印当前配置信息 print_config_info() + # 显示当前并发配置 + if MAX_CONCURRENT_WORKERS is not None: + print(f"\n配置文件中的并发设置: {MAX_CONCURRENT_WORKERS}") + else: + print(f"\n未配置并发数,将使用CPU核心数: {cpu_count}") + + # 询问是否使用并发 + print("\n" + "="*60) + use_parallel = input("是否使用并发评估?(y/n,默认为y): ").strip().lower() + if not use_parallel or use_parallel == 'y': + use_parallel = True + # 使用配置文件中的设置,如果未配置则使用CPU核心数 + if MAX_CONCURRENT_WORKERS is not None: + max_workers = MAX_CONCURRENT_WORKERS + print(f"将使用并发模式,并发数: {max_workers} (来自配置文件)") + else: + max_workers = cpu_count + print(f"将使用并发模式,并发数: {max_workers} (使用CPU核心数)") + else: + use_parallel = False + max_workers = None + print("将使用串行模式") + print("="*60) + + # 数据目录 + data_dir = "data" + output_dir = "outputs" + # 获取所有数据文件(支持 .jsonl 和 .json 格式) data_files = [f for f in os.listdir(data_dir) if f.endswith(('.jsonl', '.json'))] print(f"\n发现 {len(data_files)} 个数据文件:") @@ -545,8 +788,20 @@ def main(): data = load_data(file_path) print(f"加载数据: {len(data)} 条记录") - # 评估数据(使用配置文件中的USE_REAL_LLM设置) - results, total_metrics = evaluate_dataset(data, evaluator, use_real_llm=USE_REAL_LLM) + # 评估数据(根据用户选择使用并发或串行) + if use_parallel: + results, total_metrics = evaluate_dataset_parallel( + data, + evaluator, + use_real_llm=USE_REAL_LLM, + max_workers=max_workers + ) + else: + results, total_metrics = evaluate_dataset( + data, + evaluator, + use_real_llm=USE_REAL_LLM + ) # 保存结果 base_name = os.path.splitext(filename)[0]