Files
X-Agents/agent/app/agent/multi/iteration.py
DESKTOP-72TV0V4\caoxiaozhu 5ea6f0d31f feat: 新增多 Agent 协作系统
- 添加多 Agent 图协作框架 (graph, supervisor, workers)
- 添加迭代器和集成模块
- 添加多 Agent 规划文档

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 23:21:37 +08:00

118 lines
3.2 KiB
Python

"""
迭代控制器
"""
from typing import Optional
class IterationController:
"""迭代控制器 - 管理任务执行的迭代"""
def __init__(
self,
max_iterations: int = 3,
max_retries_per_task: int = 2
):
"""
初始化迭代控制器
Args:
max_iterations: 全局最大迭代次数
max_retries_per_task: 每个任务的最大重试次数
"""
self.max_iterations = max_iterations
self.max_retries_per_task = max_retries_per_task
def should_continue(
self,
iteration: int,
task_status: str,
review_result: Optional[dict] = None
) -> tuple[bool, str]:
"""
判断是否继续迭代
Args:
iteration: 当前迭代次数
task_status: 任务状态
review_result: 评审结果(可选)
Returns:
(是否继续, 原因)
"""
# 超过最大迭代次数
if iteration >= self.max_iterations:
return False, "max_iterations_reached"
# 任务成功完成
if task_status == "completed":
if review_result and review_result.get("passed"):
return False, "task_completed"
elif review_result is None:
return False, "task_completed"
# 任务失败且不可重试
if task_status == "failed":
if review_result and not review_result.get("retryable", True):
return False, "task_failed_non_retryable"
# 检查重试次数
retry_count = review_result.get("retry_count", 0) if review_result else 0
if retry_count >= self.max_retries_per_task:
return False, "max_retries_reached"
# 需要重试
if review_result:
issues = review_result.get("issues", [])
if issues and not review_result.get("passed", True):
return True, "needs_retry"
return True, "continue"
def get_next_action(
self,
review_result: Optional[dict],
current_worker: str
) -> str:
"""
确定下一步动作
Args:
review_result: 评审结果
current_worker: 当前执行的 Worker
Returns:
下一个节点名称
"""
if review_result is None:
return "supervisor"
# 根据评审结果决定下一步
if review_result.get("passed"):
return "supervisor"
# 根据问题类型决定下一步
issues = review_result.get("issues", [])
high_severity = any(i.get("severity") == "high" for i in issues)
if high_severity:
# 严重问题,重新执行相同任务
return current_worker
else:
# 轻微问题,返回 Supervisor
return "supervisor"
def calculate_backoff_delay(self, retry_count: int) -> float:
"""
计算退避延迟(指数退避)
Args:
retry_count: 重试次数
Returns:
延迟时间(秒)
"""
base_delay = 1.0
max_delay = 30.0
delay = min(base_delay * (2 ** retry_count), max_delay)
return delay