""" 微调任务管理服务 """ import asyncio from typing import Dict, List, Optional from datetime import datetime from enum import Enum class FineTuneStatus(Enum): """微调任务状态""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELED = "canceled" class FineTuneTask: """微调任务""" def __init__(self, task_id: str, name: str, description: str): self.task_id = task_id self.name = name self.description = description self.status = FineTuneStatus.PENDING self.progress = 0.0 self.created_at = datetime.now().isoformat() self.updated_at = datetime.now().isoformat() self.logs: List[str] = [] def update_status(self, status: FineTuneStatus): """更新任务状态""" self.status = status self.updated_at = datetime.now().isoformat() def update_progress(self, progress: float, log: str = None): """更新任务进度""" self.progress = min(max(progress, 0.0), 100.0) self.updated_at = datetime.now().isoformat() if log: self.logs.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {log}") def to_dict(self): """转换为字典""" return { "task_id": self.task_id, "name": self.name, "description": self.description, "status": self.status.value, "progress": self.progress, "created_at": self.created_at, "updated_at": self.updated_at, "logs": self.logs } class FineTuneManager: """微调任务管理器""" def __init__(self): self.tasks: Dict[str, FineTuneTask] = {} self.listeners: Dict[str, List[asyncio.Queue]] = {} def create_task(self, task_id: str, name: str, description: str) -> FineTuneTask: """创建新任务""" task = FineTuneTask(task_id, name, description) self.tasks[task_id] = task self.listeners[task_id] = [] return task def get_task(self, task_id: str) -> Optional[FineTuneTask]: """获取任务""" return self.tasks.get(task_id) def get_all_tasks(self) -> List[FineTuneTask]: """获取所有任务""" return list(self.tasks.values()) def update_task_status(self, task_id: str, status: FineTuneStatus): """更新任务状态""" task = self.tasks.get(task_id) if task: task.update_status(status) self._notify_listeners(task_id) def update_task_progress(self, task_id: str, progress: float, log: str = None): """更新任务进度""" task = self.tasks.get(task_id) if task: task.update_progress(progress, log) self._notify_listeners(task_id) def delete_task(self, task_id: str): """删除任务""" if task_id in self.tasks: del self.tasks[task_id] if task_id in self.listeners: # 通知所有监听器任务已删除 for queue in self.listeners[task_id]: queue.put_nowait(None) # None表示任务已删除 del self.listeners[task_id] async def add_listener(self, task_id: str) -> asyncio.Queue: """添加任务监听器""" queue = asyncio.Queue() if task_id in self.listeners: self.listeners[task_id].append(queue) else: self.listeners[task_id] = [queue] return queue def remove_listener(self, task_id: str, queue: asyncio.Queue): """移除任务监听器""" if task_id in self.listeners: self.listeners[task_id].remove(queue) # 如果没有监听器了,清理 if not self.listeners[task_id]: del self.listeners[task_id] def _notify_listeners(self, task_id: str): """通知所有监听器""" if task_id in self.listeners: task = self.tasks.get(task_id) if task: for queue in self.listeners[task_id]: queue.put_nowait(task.to_dict()) def task_exists(self, task_id: str) -> bool: """检查任务是否存在""" return task_id in self.tasks # 全局微调任务管理器实例 fine_tune_manager = FineTuneManager() async def simulate_fine_tune(task_id: str): """模拟微调任务执行""" manager = fine_tune_manager task = manager.get_task(task_id) if not task: return # 更新状态为运行中 manager.update_task_status(task_id, FineTuneStatus.RUNNING) # 模拟训练过程 steps = [ ("初始化模型", 10), ("加载数据集", 20), ("训练第1轮", 35), ("训练第2轮", 50), ("训练第3轮", 65), ("训练第4轮", 80), ("模型评估", 90), ("保存模型", 100) ] for step_name, step_progress in steps: # 模拟步骤执行 await asyncio.sleep(2) manager.update_task_progress(task_id, step_progress, f"{step_name}完成") # 更新状态为完成 manager.update_task_status(task_id, FineTuneStatus.COMPLETED) manager.update_task_progress(task_id, 100.0, "微调任务已完成")