diff --git a/backend/app/agents/orchestration/__init__.py b/backend/app/agents/orchestration/__init__.py index 02ff9c5..86958eb 100644 --- a/backend/app/agents/orchestration/__init__.py +++ b/backend/app/agents/orchestration/__init__.py @@ -1,7 +1,16 @@ """高级编排系统 - Phase 10""" +from app.agents.orchestration.budget import build_subtask_budget +from app.agents.orchestration.result_merge import merge_task_results +from app.agents.orchestration.scheduler import ( + ParallelExecutionScheduler, + build_subtask_specs, + ensure_child_links, +) +from app.agents.orchestration.subagent_runtime import subtask_spec_to_agent_task from app.agents.team.leader import TeamLeader, TeamTask, TaskStatus from app.agents.transport.remote import RemoteTransport, StructuredMessage +from app.agents.orchestration.task_graph import build_bounded_task_graph, render_task_graph_summary from app.agents.background.manager import ( BackgroundTaskManager, BackgroundTask, @@ -14,7 +23,15 @@ __all__ = [ "TaskStatus", "RemoteTransport", "StructuredMessage", + "ParallelExecutionScheduler", + "build_bounded_task_graph", + "build_subtask_budget", + "build_subtask_specs", "BackgroundTaskManager", "BackgroundTask", + "ensure_child_links", "get_background_task_manager", + "merge_task_results", + "render_task_graph_summary", + "subtask_spec_to_agent_task", ] diff --git a/backend/app/agents/orchestration/budget.py b/backend/app/agents/orchestration/budget.py new file mode 100644 index 0000000..8f39252 --- /dev/null +++ b/backend/app/agents/orchestration/budget.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from app.agents.schemas.task import CollaborationBudget + + +def build_subtask_budget( + *, + execution_mode: str, + max_parallel_tasks: int, + max_tool_calls: int = 2, + max_iterations: int = 2, + metadata: dict | None = None, +) -> CollaborationBudget: + return CollaborationBudget( + mode="collaboration" if execution_mode != "direct" else "direct", + max_parallel_tasks=max_parallel_tasks, + remaining_parallel_tasks=max_parallel_tasks, + max_tool_calls=max_tool_calls, + remaining_tool_calls=max_tool_calls, + max_iterations=max_iterations, + remaining_iterations=max_iterations, + escalation_threshold=1, + metadata=metadata or {}, + ) diff --git a/backend/app/agents/orchestration/monitor.py b/backend/app/agents/orchestration/monitor.py new file mode 100644 index 0000000..c8f225a --- /dev/null +++ b/backend/app/agents/orchestration/monitor.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import Any + + +def build_parallel_runtime_metrics( + *, + task_graph: dict[str, Any] | None, + scheduled_subtasks: list[dict[str, Any]] | None, + task_results: list[dict[str, Any]] | None, + merge_report: dict[str, Any] | None, +) -> dict[str, Any]: + task_graph = task_graph or {} + scheduled_subtasks = list(scheduled_subtasks or []) + task_results = list(task_results or []) + merge_report = merge_report or {} + + completed = sum(1 for item in task_results if item.get("status") == "completed") + failed = sum(1 for item in task_results if item.get("status") == "failed") + blocked = sum(1 for item in task_results if item.get("status") == "blocked") + + return { + "task_graph_node_count": len(task_graph.get("nodes") or []), + "scheduled_subtask_count": len(scheduled_subtasks), + "completed_subtask_count": completed, + "failed_subtask_count": failed, + "blocked_subtask_count": blocked, + "merge_status": merge_report.get("status"), + "merge_conflict_count": len(merge_report.get("conflict_flags") or []), + "fallback_used": bool(merge_report.get("fallback_used") or False), + } diff --git a/backend/app/agents/orchestration/result_merge.py b/backend/app/agents/orchestration/result_merge.py new file mode 100644 index 0000000..c3427d4 --- /dev/null +++ b/backend/app/agents/orchestration/result_merge.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from app.agents.schemas.orchestration import MergeReport +from app.agents.verifier import normalize_task_result + + +def merge_task_results(task_results: list[dict] | list[object]) -> MergeReport: + normalized = [normalize_task_result(item) for item in (task_results or [])] + completed = [item for item in normalized if item.status == "completed"] + failed_or_blocked = [item for item in normalized if item.status in {"failed", "blocked"}] + + evidence_union: list[dict] = [] + summaries = [] + for item in normalized: + evidence_union.extend(list(item.evidence or [])) + if item.summary: + summaries.append(item.summary.strip()) + + unique_summaries = list(dict.fromkeys(summary for summary in summaries if summary)) + conflict_flags: list[str] = [] + status = "merged" + fallback_used = False + + if failed_or_blocked: + status = "fallback" + fallback_used = True + conflict_flags.append( + "failed_or_blocked_tasks:" + ",".join(item.task_id for item in failed_or_blocked) + ) + resolution_strategy = "serial_recovery" + resolved_summary = ( + completed[-1].summary + if completed and completed[-1].summary + else None + ) + elif len(unique_summaries) > 1 and len(completed) > 1: + status = "conflicted" + conflict_flags.append("multiple_distinct_completed_summaries") + resolution_strategy = "rank_by_evidence_count" + ranked = sorted( + completed, + key=lambda item: (len(item.evidence or []), bool(item.summary)), + reverse=True, + ) + resolved_summary = ranked[0].summary if ranked and ranked[0].summary else None + else: + resolution_strategy = "evidence_union" + resolved_summary = unique_summaries[-1] if unique_summaries else None + + if status == "merged": + summary = ( + unique_summaries[-1] + if unique_summaries + else f"已收敛 {len(normalized)} 个子任务结果。" + ) + elif status == "conflicted": + summary = "并行子任务摘要存在冲突,需要 verifier 或串行收敛。" + else: + summary = "存在失败或阻塞子任务,需要回退到更保守的收敛路径。" + + return MergeReport( + status=status, + summary=summary, + evidence_union=evidence_union, + conflict_flags=conflict_flags, + resolution_strategy=resolution_strategy, + resolved_summary=resolved_summary, + fallback_used=fallback_used, + ) diff --git a/backend/app/agents/orchestration/scheduler.py b/backend/app/agents/orchestration/scheduler.py new file mode 100644 index 0000000..2ee6734 --- /dev/null +++ b/backend/app/agents/orchestration/scheduler.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from collections import defaultdict, deque +from uuid import uuid4 + +from app.agents.orchestration.budget import build_subtask_budget +from app.agents.schemas.orchestration import SubTaskSpec, TaskGraph, TaskNode + + +class ParallelExecutionScheduler: + def plan(self, task_graph: TaskGraph, *, query_text: str) -> list[SubTaskSpec]: + ordered_nodes = _topological_nodes(task_graph) + specs: list[SubTaskSpec] = [] + for node in ordered_nodes: + budget = build_subtask_budget( + execution_mode=node.execution_mode, + max_parallel_tasks=max(1, task_graph.max_parallelism), + metadata={ + "task_graph_id": task_graph.graph_id, + "depends_on": node.depends_on, + }, + ) + specs.append( + SubTaskSpec( + subtask_id=node.node_id, + parent_run_id=task_graph.graph_id, + title=node.title, + role=node.role or "master", + goal=node.goal or query_text, + context_slice=_build_context_slice(node, query_text), + allowed_tools=[], + budget_tokens=1200, + budget_tool_calls=budget.max_tool_calls or 2, + expected_output_schema={ + "summary": "string", + "evidence": "list", + "status": "completed|failed|blocked", + }, + expected_evidence=node.expected_evidence, + dependencies=node.depends_on, + ) + ) + return specs + + +def build_subtask_specs(task_graph: TaskGraph, *, query_text: str) -> list[SubTaskSpec]: + return ParallelExecutionScheduler().plan(task_graph, query_text=query_text) + + +def _build_context_slice(node: TaskNode, query_text: str) -> dict[str, object]: + return { + "query": query_text, + "role": node.role, + "title": node.title, + "goal": node.goal, + "depends_on": node.depends_on, + } + + +def _topological_nodes(task_graph: TaskGraph) -> list[TaskNode]: + by_id = {node.node_id: node for node in task_graph.nodes} + indegree = {node.node_id: 0 for node in task_graph.nodes} + edges: dict[str, list[str]] = defaultdict(list) + + for node in task_graph.nodes: + for dep in node.depends_on: + if dep not in by_id: + continue + edges[dep].append(node.node_id) + indegree[node.node_id] += 1 + + ready = deque(node_id for node_id, count in indegree.items() if count == 0) + ordered: list[TaskNode] = [] + + while ready: + node_id = ready.popleft() + ordered.append(by_id[node_id]) + for target in edges.get(node_id, []): + indegree[target] -= 1 + if indegree[target] == 0: + ready.append(target) + + if len(ordered) != len(task_graph.nodes): + return list(task_graph.nodes) + return ordered + + +def ensure_child_links(specs: list[SubTaskSpec]) -> dict[str, list[str]]: + graph: dict[str, list[str]] = defaultdict(list) + for spec in specs: + for dep in spec.dependencies: + graph[dep].append(spec.subtask_id) + return dict(graph) diff --git a/backend/app/agents/orchestration/subagent_runtime.py b/backend/app/agents/orchestration/subagent_runtime.py new file mode 100644 index 0000000..564dfb6 --- /dev/null +++ b/backend/app/agents/orchestration/subagent_runtime.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from app.agents.schemas.orchestration import SubTaskSpec +from app.agents.schemas.task import AgentTask + + +def subtask_spec_to_agent_task(spec: SubTaskSpec) -> AgentTask: + return AgentTask( + task_id=spec.subtask_id, + title=spec.title, + owner_agent_id=spec.role, + role=spec.role, + goal=spec.goal, + parent_task_id=spec.parent_run_id, + child_task_ids=[], + expected_evidence=spec.expected_evidence, + ) diff --git a/backend/app/agents/orchestration/task_graph.py b/backend/app/agents/orchestration/task_graph.py new file mode 100644 index 0000000..b24adaf --- /dev/null +++ b/backend/app/agents/orchestration/task_graph.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from uuid import uuid4 + +from app.agents.schemas.orchestration import ParallelWorthiness, TaskGraph, TaskNode + + +ROLE_KEYWORDS: list[tuple[str, tuple[str, ...]]] = [ + ("librarian", ("查", "检索", "资料", "文档", "知识库", "年报", "forum", "search")), + ("analyst", ("分析", "判断", "风险", "总结", "对比", "洞察", "结论")), + ("schedule_planner", ("计划", "安排", "下周", "日程", "提醒", "优先级")), + ("executor", ("执行", "创建", "更新", "落库", "提交", "发帖")), +] + + +def build_bounded_task_graph( + *, + query_text: str, + parallel_worthiness: ParallelWorthiness, + max_nodes: int = 4, +) -> TaskGraph | None: + roles = _infer_roles(query_text) + if not roles: + return None + + independent_roles = roles[: min(max_nodes - 1, max(1, parallel_worthiness.estimated_subtasks))] + nodes: list[TaskNode] = [] + + for index, role in enumerate(independent_roles, start=1): + node_id = f"task-{index}-{uuid4().hex[:6]}" + nodes.append( + TaskNode( + node_id=node_id, + title=_build_title(role), + role=role, + goal=_build_goal(role, query_text), + depends_on=[], + execution_mode=( + "parallel" + if parallel_worthiness.preferred_mode in {"collaboration", "parallel"} + and len(independent_roles) > 1 + else "serial" + ), + expected_evidence=_build_expected_evidence(role), + ) + ) + + if len(nodes) > 1: + merge_id = f"merge-{uuid4().hex[:6]}" + nodes.append( + TaskNode( + node_id=merge_id, + title="汇总并收敛最终结论", + role="master", + goal="汇总前置子任务结果,形成统一可验证的输出。", + depends_on=[node.node_id for node in nodes], + execution_mode="serial", + expected_evidence=[{"type": "merge", "detail": "merged summary and conflict notes"}], + ) + ) + + return TaskGraph( + nodes=nodes, + entry_node_ids=[node.node_id for node in nodes if not node.depends_on], + max_parallelism=max(1, len(independent_roles)), + rationale=_build_rationale(parallel_worthiness, independent_roles), + ) + + +def render_task_graph_summary(task_graph: TaskGraph | None) -> str | None: + if task_graph is None or not task_graph.nodes: + return None + + lines = ["- 任务图:"] + for node in task_graph.nodes[:4]: + deps = f" deps={','.join(node.depends_on)}" if node.depends_on else "" + lines.append(f" - [{node.execution_mode}] {node.title} ({node.role}){deps}") + return "\n".join(lines) + + +def _infer_roles(query_text: str) -> list[str]: + selected: list[str] = [] + text = query_text or "" + for role, keywords in ROLE_KEYWORDS: + if any(keyword in text for keyword in keywords): + selected.append(role) + + if not selected: + return ["analyst"] + return selected + + +def _build_title(role: str) -> str: + mapping = { + "librarian": "收集事实与外部/内部证据", + "analyst": "形成判断与风险分析", + "schedule_planner": "整理计划和优先级", + "executor": "执行必要操作并回收结果", + } + return mapping.get(role, "处理子任务") + + +def _build_goal(role: str, query_text: str) -> str: + mapping = { + "librarian": f"围绕请求收集支持结论的事实和资料:{query_text}", + "analyst": f"基于当前请求输出结构化判断:{query_text}", + "schedule_planner": f"把当前请求收束为计划、安排或优先级:{query_text}", + "executor": f"基于请求执行必要动作并返回结果:{query_text}", + } + return mapping.get(role, query_text) + + +def _build_expected_evidence(role: str) -> list[dict[str, str]]: + mapping = { + "librarian": [{"type": "evidence", "detail": "retrieval findings"}], + "analyst": [{"type": "analysis", "detail": "structured judgment"}], + "schedule_planner": [{"type": "plan", "detail": "explicit schedule or priorities"}], + "executor": [{"type": "execution", "detail": "tool output or mutation result"}], + } + return mapping.get(role, [{"type": "summary", "detail": "task summary"}]) + + +def _build_rationale(parallel_worthiness: ParallelWorthiness, roles: list[str]) -> str: + return ( + f"preferred_mode={parallel_worthiness.preferred_mode}; " + f"score={parallel_worthiness.score:.2f}; " + f"roles={','.join(roles)}" + ) diff --git a/backend/app/agents/schemas/orchestration.py b/backend/app/agents/schemas/orchestration.py new file mode 100644 index 0000000..c181510 --- /dev/null +++ b/backend/app/agents/schemas/orchestration.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import re +from datetime import datetime, timezone +from typing import Any, Literal +from uuid import uuid4 + +from pydantic import BaseModel, Field + +from app.agents.schemas.skills import SkillShortlistEntry + + +ExecutionMode = Literal["direct", "collaboration", "parallel", "delegated"] +ParallelPreference = Literal["direct", "collaboration", "parallel"] + + +class ParallelWorthiness(BaseModel): + should_parallelize: bool = False + score: float = 0.0 + estimated_subtasks: int = 1 + preferred_mode: ParallelPreference = "direct" + reasons: list[str] = Field(default_factory=list) + risk_flags: list[str] = Field(default_factory=list) + + +class TaskNode(BaseModel): + node_id: str + title: str + role: str | None = None + goal: str | None = None + depends_on: list[str] = Field(default_factory=list) + execution_mode: Literal["serial", "parallel"] = "serial" + expected_evidence: list[dict[str, Any]] = Field(default_factory=list) + + +class TaskGraph(BaseModel): + graph_id: str = Field(default_factory=lambda: str(uuid4())) + nodes: list[TaskNode] = Field(default_factory=list) + entry_node_ids: list[str] = Field(default_factory=list) + max_parallelism: int = 1 + rationale: str | None = None + + +class SubTaskSpec(BaseModel): + subtask_id: str + parent_run_id: str + title: str + role: str + goal: str + context_slice: dict[str, Any] = Field(default_factory=dict) + allowed_tools: list[str] = Field(default_factory=list) + budget_tokens: int = 1200 + budget_tool_calls: int = 2 + expected_output_schema: dict[str, Any] = Field(default_factory=dict) + expected_evidence: list[dict[str, Any]] = Field(default_factory=list) + dependencies: list[str] = Field(default_factory=list) + + +class SubTaskResult(BaseModel): + subtask_id: str + status: Literal["completed", "failed", "blocked"] + summary: str | None = None + evidence: list[dict[str, Any]] = Field(default_factory=list) + output: dict[str, Any] = Field(default_factory=dict) + + +class MergeReport(BaseModel): + merge_id: str = Field(default_factory=lambda: str(uuid4())) + status: Literal["merged", "conflicted", "fallback"] + summary: str | None = None + evidence_union: list[dict[str, Any]] = Field(default_factory=list) + conflict_flags: list[str] = Field(default_factory=list) + resolution_strategy: str | None = None + resolved_summary: str | None = None + fallback_used: bool = False + + +class VerificationReport(BaseModel): + status: Literal["passed", "failed", "skipped"] + summary: str | None = None + evidence: list[dict[str, Any]] = Field(default_factory=list) + + +class ExecutionDecision(BaseModel): + request_id: str = Field(default_factory=lambda: str(uuid4())) + mode: ExecutionMode = "direct" + reason: str + complexity_score: float = 0.0 + parallel_worthiness_score: float | None = None + selected_roles: list[str] = Field(default_factory=list) + + +class RuntimeRequestContext(BaseModel): + request_id: str = Field(default_factory=lambda: str(uuid4())) + session_id: str | None = None + user_id: str + conversation_id: str | None = None + query_text: str | None = None + raw_user_query: str | None = None + recalled_memories: list[str] = Field(default_factory=list) + retrospective_shortlist: list[dict[str, Any]] = Field(default_factory=list) + recalled_retrospectives: list[dict[str, Any]] = Field(default_factory=list) + skill_shortlist: list[SkillShortlistEntry] = Field(default_factory=list) + shortlisted_skills: list[str] = Field(default_factory=list) + parallel_worthiness: ParallelWorthiness = Field(default_factory=ParallelWorthiness) + task_graph: TaskGraph | None = None + recommended_runtime_mode: Literal["direct", "collaboration"] = "direct" + execution_mode: Literal["direct", "collaboration"] | None = None + current_agent_role: str | None = None + conversation_state_ref: str | None = None + assembly_metrics: dict[str, float] = Field(default_factory=dict) + assembled_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +def assess_parallel_worthiness( + query_text: str, + *, + retrospective_count: int = 0, + skill_count: int = 0, +) -> ParallelWorthiness: + normalized = (query_text or "").strip().lower() + reasons: list[str] = [] + score = 0.0 + + multi_step_markers = ("然后", "接着", "同时", "并且", "最后", "汇总", "对比", "分析", "research") + artifact_markers = ("文档", "代码", "文件", "数据库", "论坛", "知识库", "计划") + + if any(marker in normalized for marker in multi_step_markers): + score += 0.35 + reasons.append("multi_step_request") + + if sum(1 for marker in artifact_markers if marker in normalized) >= 2: + score += 0.25 + reasons.append("multi_source_context") + + if len(re.findall(r"[,,、;;]", query_text or "")) >= 2: + score += 0.15 + reasons.append("compound_instruction") + + if retrospective_count > 0: + score += 0.1 + reasons.append("historical_support") + + if skill_count > 0: + score += 0.1 + reasons.append("skill_candidates_available") + + score = min(score, 1.0) + should_parallelize = score >= 0.55 + preferred_mode: ParallelPreference = "parallel" if should_parallelize else "direct" + if not should_parallelize and score >= 0.3: + preferred_mode = "collaboration" + + estimated_subtasks = 1 + if preferred_mode == "parallel": + estimated_subtasks = 3 if score >= 0.8 else 2 + elif preferred_mode == "collaboration": + estimated_subtasks = 2 + + return ParallelWorthiness( + should_parallelize=should_parallelize, + score=round(score, 3), + estimated_subtasks=estimated_subtasks, + preferred_mode=preferred_mode, + reasons=reasons, + ) + + +def render_runtime_request_context_summary(context: RuntimeRequestContext) -> str: + lines = ["【Runtime Request Context】"] + lines.append(f"- 推荐运行模式: {context.recommended_runtime_mode}") + lines.append( + f"- 并行潜力: score={context.parallel_worthiness.score:.2f}, " + f"preferred={context.parallel_worthiness.preferred_mode}, " + f"estimated_subtasks={context.parallel_worthiness.estimated_subtasks}" + ) + + if context.parallel_worthiness.reasons: + lines.append(f"- 并行判断依据: {', '.join(context.parallel_worthiness.reasons)}") + if context.assembly_metrics: + total_ms = context.assembly_metrics.get("total_ms") + if total_ms is not None: + lines.append(f"- 上下文装配耗时: {total_ms:.1f} ms") + + if context.task_graph and context.task_graph.nodes: + lines.append( + f"- 任务图: nodes={len(context.task_graph.nodes)}, max_parallelism={context.task_graph.max_parallelism}" + ) + for node in context.task_graph.nodes[:4]: + deps = f", deps={len(node.depends_on)}" if node.depends_on else "" + lines.append(f" - [{node.execution_mode}] {node.title} ({node.role}{deps})") + + if context.retrospective_shortlist: + lines.append("- 历史复盘命中:") + for item in context.retrospective_shortlist[:3]: + summary = (item.get("summary") or item.get("summary_text") or "").strip() + task_type = item.get("task_type") or "unknown" + lines.append(f" - [{task_type}] {summary[:160]}") + + if context.skill_shortlist: + lines.append("- 技能候选:") + for item in context.skill_shortlist[:3]: + lines.append( + f" - {item.skill_name} ({item.injection_mode}, score={item.score:.2f})" + + (f": {item.rationale}" if item.rationale else "") + ) + + if context.recalled_memories: + lines.append("- 记忆上下文已装配,可在回答中按需引用。") + + return "\n".join(lines)