feat(orchestration): add orchestration system with task scheduling

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
2026-04-08 00:11:17 +08:00
parent de08165e07
commit 4ef7549efe
8 changed files with 590 additions and 0 deletions

View File

@@ -1,7 +1,16 @@
"""高级编排系统 - Phase 10"""
from app.agents.orchestration.budget import build_subtask_budget
from app.agents.orchestration.result_merge import merge_task_results
from app.agents.orchestration.scheduler import (
ParallelExecutionScheduler,
build_subtask_specs,
ensure_child_links,
)
from app.agents.orchestration.subagent_runtime import subtask_spec_to_agent_task
from app.agents.team.leader import TeamLeader, TeamTask, TaskStatus
from app.agents.transport.remote import RemoteTransport, StructuredMessage
from app.agents.orchestration.task_graph import build_bounded_task_graph, render_task_graph_summary
from app.agents.background.manager import (
BackgroundTaskManager,
BackgroundTask,
@@ -14,7 +23,15 @@ __all__ = [
"TaskStatus",
"RemoteTransport",
"StructuredMessage",
"ParallelExecutionScheduler",
"build_bounded_task_graph",
"build_subtask_budget",
"build_subtask_specs",
"BackgroundTaskManager",
"BackgroundTask",
"ensure_child_links",
"get_background_task_manager",
"merge_task_results",
"render_task_graph_summary",
"subtask_spec_to_agent_task",
]

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from app.agents.schemas.task import CollaborationBudget
def build_subtask_budget(
*,
execution_mode: str,
max_parallel_tasks: int,
max_tool_calls: int = 2,
max_iterations: int = 2,
metadata: dict | None = None,
) -> CollaborationBudget:
return CollaborationBudget(
mode="collaboration" if execution_mode != "direct" else "direct",
max_parallel_tasks=max_parallel_tasks,
remaining_parallel_tasks=max_parallel_tasks,
max_tool_calls=max_tool_calls,
remaining_tool_calls=max_tool_calls,
max_iterations=max_iterations,
remaining_iterations=max_iterations,
escalation_threshold=1,
metadata=metadata or {},
)

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from typing import Any
def build_parallel_runtime_metrics(
*,
task_graph: dict[str, Any] | None,
scheduled_subtasks: list[dict[str, Any]] | None,
task_results: list[dict[str, Any]] | None,
merge_report: dict[str, Any] | None,
) -> dict[str, Any]:
task_graph = task_graph or {}
scheduled_subtasks = list(scheduled_subtasks or [])
task_results = list(task_results or [])
merge_report = merge_report or {}
completed = sum(1 for item in task_results if item.get("status") == "completed")
failed = sum(1 for item in task_results if item.get("status") == "failed")
blocked = sum(1 for item in task_results if item.get("status") == "blocked")
return {
"task_graph_node_count": len(task_graph.get("nodes") or []),
"scheduled_subtask_count": len(scheduled_subtasks),
"completed_subtask_count": completed,
"failed_subtask_count": failed,
"blocked_subtask_count": blocked,
"merge_status": merge_report.get("status"),
"merge_conflict_count": len(merge_report.get("conflict_flags") or []),
"fallback_used": bool(merge_report.get("fallback_used") or False),
}

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
from app.agents.schemas.orchestration import MergeReport
from app.agents.verifier import normalize_task_result
def merge_task_results(task_results: list[dict] | list[object]) -> MergeReport:
normalized = [normalize_task_result(item) for item in (task_results or [])]
completed = [item for item in normalized if item.status == "completed"]
failed_or_blocked = [item for item in normalized if item.status in {"failed", "blocked"}]
evidence_union: list[dict] = []
summaries = []
for item in normalized:
evidence_union.extend(list(item.evidence or []))
if item.summary:
summaries.append(item.summary.strip())
unique_summaries = list(dict.fromkeys(summary for summary in summaries if summary))
conflict_flags: list[str] = []
status = "merged"
fallback_used = False
if failed_or_blocked:
status = "fallback"
fallback_used = True
conflict_flags.append(
"failed_or_blocked_tasks:" + ",".join(item.task_id for item in failed_or_blocked)
)
resolution_strategy = "serial_recovery"
resolved_summary = (
completed[-1].summary
if completed and completed[-1].summary
else None
)
elif len(unique_summaries) > 1 and len(completed) > 1:
status = "conflicted"
conflict_flags.append("multiple_distinct_completed_summaries")
resolution_strategy = "rank_by_evidence_count"
ranked = sorted(
completed,
key=lambda item: (len(item.evidence or []), bool(item.summary)),
reverse=True,
)
resolved_summary = ranked[0].summary if ranked and ranked[0].summary else None
else:
resolution_strategy = "evidence_union"
resolved_summary = unique_summaries[-1] if unique_summaries else None
if status == "merged":
summary = (
unique_summaries[-1]
if unique_summaries
else f"已收敛 {len(normalized)} 个子任务结果。"
)
elif status == "conflicted":
summary = "并行子任务摘要存在冲突,需要 verifier 或串行收敛。"
else:
summary = "存在失败或阻塞子任务,需要回退到更保守的收敛路径。"
return MergeReport(
status=status,
summary=summary,
evidence_union=evidence_union,
conflict_flags=conflict_flags,
resolution_strategy=resolution_strategy,
resolved_summary=resolved_summary,
fallback_used=fallback_used,
)

View File

@@ -0,0 +1,93 @@
from __future__ import annotations
from collections import defaultdict, deque
from uuid import uuid4
from app.agents.orchestration.budget import build_subtask_budget
from app.agents.schemas.orchestration import SubTaskSpec, TaskGraph, TaskNode
class ParallelExecutionScheduler:
def plan(self, task_graph: TaskGraph, *, query_text: str) -> list[SubTaskSpec]:
ordered_nodes = _topological_nodes(task_graph)
specs: list[SubTaskSpec] = []
for node in ordered_nodes:
budget = build_subtask_budget(
execution_mode=node.execution_mode,
max_parallel_tasks=max(1, task_graph.max_parallelism),
metadata={
"task_graph_id": task_graph.graph_id,
"depends_on": node.depends_on,
},
)
specs.append(
SubTaskSpec(
subtask_id=node.node_id,
parent_run_id=task_graph.graph_id,
title=node.title,
role=node.role or "master",
goal=node.goal or query_text,
context_slice=_build_context_slice(node, query_text),
allowed_tools=[],
budget_tokens=1200,
budget_tool_calls=budget.max_tool_calls or 2,
expected_output_schema={
"summary": "string",
"evidence": "list",
"status": "completed|failed|blocked",
},
expected_evidence=node.expected_evidence,
dependencies=node.depends_on,
)
)
return specs
def build_subtask_specs(task_graph: TaskGraph, *, query_text: str) -> list[SubTaskSpec]:
return ParallelExecutionScheduler().plan(task_graph, query_text=query_text)
def _build_context_slice(node: TaskNode, query_text: str) -> dict[str, object]:
return {
"query": query_text,
"role": node.role,
"title": node.title,
"goal": node.goal,
"depends_on": node.depends_on,
}
def _topological_nodes(task_graph: TaskGraph) -> list[TaskNode]:
by_id = {node.node_id: node for node in task_graph.nodes}
indegree = {node.node_id: 0 for node in task_graph.nodes}
edges: dict[str, list[str]] = defaultdict(list)
for node in task_graph.nodes:
for dep in node.depends_on:
if dep not in by_id:
continue
edges[dep].append(node.node_id)
indegree[node.node_id] += 1
ready = deque(node_id for node_id, count in indegree.items() if count == 0)
ordered: list[TaskNode] = []
while ready:
node_id = ready.popleft()
ordered.append(by_id[node_id])
for target in edges.get(node_id, []):
indegree[target] -= 1
if indegree[target] == 0:
ready.append(target)
if len(ordered) != len(task_graph.nodes):
return list(task_graph.nodes)
return ordered
def ensure_child_links(specs: list[SubTaskSpec]) -> dict[str, list[str]]:
graph: dict[str, list[str]] = defaultdict(list)
for spec in specs:
for dep in spec.dependencies:
graph[dep].append(spec.subtask_id)
return dict(graph)

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from app.agents.schemas.orchestration import SubTaskSpec
from app.agents.schemas.task import AgentTask
def subtask_spec_to_agent_task(spec: SubTaskSpec) -> AgentTask:
return AgentTask(
task_id=spec.subtask_id,
title=spec.title,
owner_agent_id=spec.role,
role=spec.role,
goal=spec.goal,
parent_task_id=spec.parent_run_id,
child_task_ids=[],
expected_evidence=spec.expected_evidence,
)

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
from uuid import uuid4
from app.agents.schemas.orchestration import ParallelWorthiness, TaskGraph, TaskNode
ROLE_KEYWORDS: list[tuple[str, tuple[str, ...]]] = [
("librarian", ("", "检索", "资料", "文档", "知识库", "年报", "forum", "search")),
("analyst", ("分析", "判断", "风险", "总结", "对比", "洞察", "结论")),
("schedule_planner", ("计划", "安排", "下周", "日程", "提醒", "优先级")),
("executor", ("执行", "创建", "更新", "落库", "提交", "发帖")),
]
def build_bounded_task_graph(
*,
query_text: str,
parallel_worthiness: ParallelWorthiness,
max_nodes: int = 4,
) -> TaskGraph | None:
roles = _infer_roles(query_text)
if not roles:
return None
independent_roles = roles[: min(max_nodes - 1, max(1, parallel_worthiness.estimated_subtasks))]
nodes: list[TaskNode] = []
for index, role in enumerate(independent_roles, start=1):
node_id = f"task-{index}-{uuid4().hex[:6]}"
nodes.append(
TaskNode(
node_id=node_id,
title=_build_title(role),
role=role,
goal=_build_goal(role, query_text),
depends_on=[],
execution_mode=(
"parallel"
if parallel_worthiness.preferred_mode in {"collaboration", "parallel"}
and len(independent_roles) > 1
else "serial"
),
expected_evidence=_build_expected_evidence(role),
)
)
if len(nodes) > 1:
merge_id = f"merge-{uuid4().hex[:6]}"
nodes.append(
TaskNode(
node_id=merge_id,
title="汇总并收敛最终结论",
role="master",
goal="汇总前置子任务结果,形成统一可验证的输出。",
depends_on=[node.node_id for node in nodes],
execution_mode="serial",
expected_evidence=[{"type": "merge", "detail": "merged summary and conflict notes"}],
)
)
return TaskGraph(
nodes=nodes,
entry_node_ids=[node.node_id for node in nodes if not node.depends_on],
max_parallelism=max(1, len(independent_roles)),
rationale=_build_rationale(parallel_worthiness, independent_roles),
)
def render_task_graph_summary(task_graph: TaskGraph | None) -> str | None:
if task_graph is None or not task_graph.nodes:
return None
lines = ["- 任务图:"]
for node in task_graph.nodes[:4]:
deps = f" deps={','.join(node.depends_on)}" if node.depends_on else ""
lines.append(f" - [{node.execution_mode}] {node.title} ({node.role}){deps}")
return "\n".join(lines)
def _infer_roles(query_text: str) -> list[str]:
selected: list[str] = []
text = query_text or ""
for role, keywords in ROLE_KEYWORDS:
if any(keyword in text for keyword in keywords):
selected.append(role)
if not selected:
return ["analyst"]
return selected
def _build_title(role: str) -> str:
mapping = {
"librarian": "收集事实与外部/内部证据",
"analyst": "形成判断与风险分析",
"schedule_planner": "整理计划和优先级",
"executor": "执行必要操作并回收结果",
}
return mapping.get(role, "处理子任务")
def _build_goal(role: str, query_text: str) -> str:
mapping = {
"librarian": f"围绕请求收集支持结论的事实和资料:{query_text}",
"analyst": f"基于当前请求输出结构化判断:{query_text}",
"schedule_planner": f"把当前请求收束为计划、安排或优先级:{query_text}",
"executor": f"基于请求执行必要动作并返回结果:{query_text}",
}
return mapping.get(role, query_text)
def _build_expected_evidence(role: str) -> list[dict[str, str]]:
mapping = {
"librarian": [{"type": "evidence", "detail": "retrieval findings"}],
"analyst": [{"type": "analysis", "detail": "structured judgment"}],
"schedule_planner": [{"type": "plan", "detail": "explicit schedule or priorities"}],
"executor": [{"type": "execution", "detail": "tool output or mutation result"}],
}
return mapping.get(role, [{"type": "summary", "detail": "task summary"}])
def _build_rationale(parallel_worthiness: ParallelWorthiness, roles: list[str]) -> str:
return (
f"preferred_mode={parallel_worthiness.preferred_mode}; "
f"score={parallel_worthiness.score:.2f}; "
f"roles={','.join(roles)}"
)