diff --git a/backend/app/agents/graph.py b/backend/app/agents/graph.py index baffa5c..6f73bc7 100644 --- a/backend/app/agents/graph.py +++ b/backend/app/agents/graph.py @@ -36,12 +36,22 @@ from app.agents.runtime_metrics import ( ) from app.agents.schemas.event import AgentEvent from app.agents.schemas.message import AgentMessage -from app.agents.schemas.task import AgentTask, CollaborationBudget, InterruptRecord, RecoveryRecord, TaskResult +from app.agents.schemas.task import ( + AgentTask, + CollaborationBudget, + InterruptRecord, + RecoveryRecord, + TaskResult, +) from app.agents.skill_registry import build_skill_context from app.agents.state import AgentRole, AgentState from app.agents.tools import SUB_COMMANDER_TOOLSETS from app.agents.tools.time_reasoning import normalize_tool_time_arguments -from app.agents.verifier import apply_verification_verdict, normalize_task_result, verify_task_result +from app.agents.verifier import ( + apply_verification_verdict, + normalize_task_result, + verify_task_result, +) from app.services.llm_service import ( create_llm_from_config, default_provider_capabilities, @@ -67,13 +77,17 @@ ROLE_SKILL_CONTEXT = { AgentRole.EXECUTOR: "executor", AgentRole.LIBRARIAN: "librarian", AgentRole.ANALYST: "analyst", + AgentRole.CODE_COMMANDER: None, } +from app.agents.prompts import CODE_COMMANDER_SYSTEM_PROMPT + ROLE_SYSTEM_PROMPTS = { AgentRole.SCHEDULE_PLANNER: SCHEDULE_PLANNER_SYSTEM_PROMPT, AgentRole.EXECUTOR: EXECUTOR_SYSTEM_PROMPT, AgentRole.LIBRARIAN: LIBRARIAN_SYSTEM_PROMPT, AgentRole.ANALYST: ANALYST_SYSTEM_PROMPT, + AgentRole.CODE_COMMANDER: CODE_COMMANDER_SYSTEM_PROMPT, } SCHEDULE_KEYWORDS = ( @@ -105,7 +119,18 @@ ACCOUNTING_INTENT_KEYWORDS = ( "花销", "开销", ) -KNOWLEDGE_KEYWORDS = ("知识", "搜索", "检索", "资料", "文档", "联网", "上网", "查询", "查一下", "最新") +KNOWLEDGE_KEYWORDS = ( + "知识", + "搜索", + "检索", + "资料", + "文档", + "联网", + "上网", + "查询", + "查一下", + "最新", +) GENERAL_QA_PATTERNS = ( "介绍一下", "介绍下", @@ -118,8 +143,32 @@ GENERAL_QA_PATTERNS = ( ) ANALYSIS_KEYWORDS = ("分析", "报告", "统计", "趋势", "风险", "洞察", "总结") EXECUTION_KEYWORDS = ("创建", "更新", "修改", "执行", "发帖", "论坛", "帖子", "完成", "处理") +CODE_COMMANDER_KEYWORDS = ( + "写代码", + "代码", + "代码助手", + "demo", + "示例项目", + "贪食蛇", + "小游戏", + "帮我写一个", + "opencode", + "codex", + "claude code", + "gemini code", +) SCHEDULE_ANALYSIS_KEYWORDS = ("聚焦", "判断", "分析", "优先级", "取舍", "最近对话", "该做什么") -SCHEDULE_PLANNING_KEYWORDS = ("安排", "计划", "排期", "提醒", "创建", "新增", "会议", "交付", "节点") +SCHEDULE_PLANNING_KEYWORDS = ( + "安排", + "计划", + "排期", + "提醒", + "创建", + "新增", + "会议", + "交付", + "节点", +) IDENTITY_PATTERNS = ("你是谁", "你是誰") CAPABILITY_PATTERNS = ("你能做什么", "你可以做什么", "你会做什么") SHORT_CONFIRMATION_PATTERNS = ("创建", "好的创建", "确认创建", "就创建", "那就创建") @@ -130,13 +179,26 @@ SCHEDULE_CONFIRMATION_HINTS = ( "是否需要我现在创建", ) SCHEDULE_CONFIRMATION_QUESTION_MARKERS = ("是否", "要不要", "吗", "?", "?") -COLLABORATION_STEP_MARKERS = ("然后", "再", "并且", "同时", "顺便", "最后", "分别", "拆成", "协作", "整合") +COLLABORATION_STEP_MARKERS = ( + "然后", + "再", + "并且", + "同时", + "顺便", + "最后", + "分别", + "拆成", + "协作", + "整合", +) COLLABORATION_ROLE_ORDER = { AgentRole.LIBRARIAN: 0, AgentRole.ANALYST: 1, AgentRole.SCHEDULE_PLANNER: 2, AgentRole.EXECUTOR: 3, } + + def _role_value(role: AgentRole | str | None) -> str: if isinstance(role, AgentRole): return role.value @@ -217,7 +279,9 @@ def _apply_isolation_payload(state: AgentState, payload: dict[str, Any]) -> None state["isolation_mode"] = str(payload.get("mode") or "none") state["isolation_id"] = str(payload.get("isolation_id") or "") or None state["isolation_workspace_path"] = str(payload.get("workspace_path") or "") or None - state["isolation_parent_conversation_id"] = str(payload.get("parent_conversation_id") or "") or None + state["isolation_parent_conversation_id"] = ( + str(payload.get("parent_conversation_id") or "") or None + ) state["isolation_metadata"] = dict(payload.get("metadata") or {}) @@ -325,8 +389,7 @@ def _record_response_usage(state: AgentState, response: Any) -> None: agent_id = str(state.get("agent_id") or state.get("current_agent") or AgentRole.MASTER.value) cost_by_agent = { - key: dict(value) - for key, value in dict(state.get("cost_by_agent") or {}).items() + key: dict(value) for key, value in dict(state.get("cost_by_agent") or {}).items() } agent_totals = dict(cost_by_agent.get(agent_id) or {}) agent_input_tokens = int(agent_totals.get("input_tokens") or 0) + input_tokens @@ -378,7 +441,9 @@ def _role_values() -> set[str]: return {role.value for role in AgentRole} -def _summary_state_key(target: str) -> Literal["schedule_context_summary", "knowledge_context", "analysis_report"]: +def _summary_state_key( + target: str, +) -> Literal["schedule_context_summary", "knowledge_context", "analysis_report"]: if target not in {"schedule_context_summary", "knowledge_context", "analysis_report"}: raise ValueError(f"unsupported summary target: {target}") return cast(Literal["schedule_context_summary", "knowledge_context", "analysis_report"], target) @@ -429,11 +494,15 @@ def _budget_snapshot_from_state( max_parallel_tasks=remaining_parallel_tasks, remaining_parallel_tasks=remaining_parallel_tasks, max_tool_calls=_get_state_int(state, "max_tool_rounds") or None, - remaining_tool_calls=max(_get_state_int(state, "max_tool_rounds") - _get_state_int(state, "tool_round_count"), 0) + remaining_tool_calls=max( + _get_state_int(state, "max_tool_rounds") - _get_state_int(state, "tool_round_count"), 0 + ) if _get_state_int(state, "max_tool_rounds") else None, max_iterations=_get_state_int(state, "max_iterations") or None, - remaining_iterations=max(_get_state_int(state, "max_iterations") - _get_state_int(state, "iteration_count"), 0) + remaining_iterations=max( + _get_state_int(state, "max_iterations") - _get_state_int(state, "iteration_count"), 0 + ) if _get_state_int(state, "max_iterations") else None, metadata=metadata_payload, @@ -518,7 +587,11 @@ def _create_child_agent( _append_event_trace( state, "agent.spawn.blocked", - payload={"reason": "max_spawn_depth_exceeded", "target_role": role.value, "depth": current_depth}, + payload={ + "reason": "max_spawn_depth_exceeded", + "target_role": role.value, + "depth": current_depth, + }, severity="warning", task_id=task.task_id, parent_task_id=task.parent_task_id, @@ -542,7 +615,11 @@ def _create_child_agent( _append_event_trace( state, "agent.created", - payload={"parent_agent_id": parent_agent_id, "child_agent_id": child_agent_id, "target_role": role.value}, + payload={ + "parent_agent_id": parent_agent_id, + "child_agent_id": child_agent_id, + "target_role": role.value, + }, task_id=task.task_id, parent_task_id=task.parent_task_id, child_task_id=task.task_id, @@ -561,7 +638,9 @@ def _record_interrupt( task: AgentTask | dict[str, Any] | None = None, payload: dict[str, Any] | None = None, ) -> InterruptRecord: - task_payload = _task_payload(task) if task is not None else _collaboration_task_from_state(state) + task_payload = ( + _task_payload(task) if task is not None else _collaboration_task_from_state(state) + ) interrupt = InterruptRecord( interrupt_id=f"interrupt-{uuid4()}", reason=reason, @@ -620,7 +699,9 @@ def _record_recovery( task: AgentTask | dict[str, Any] | None = None, payload: dict[str, Any] | None = None, ) -> RecoveryRecord: - task_payload = _task_payload(task) if task is not None else _collaboration_task_from_state(state) + task_payload = ( + _task_payload(task) if task is not None else _collaboration_task_from_state(state) + ) recovery = RecoveryRecord( recovery_id=f"recovery-{uuid4()}", source_interrupt_id=interrupt.interrupt_id, @@ -696,7 +777,9 @@ def _resolve_capabilities(state: AgentState, llm) -> Any: capabilities = getattr(llm, "_jarvis_provider_capabilities", None) if capabilities is None: config = state.get("user_llm_config") - capabilities = resolve_provider_capabilities(config) if config else default_provider_capabilities() + capabilities = ( + resolve_provider_capabilities(config) if config else default_provider_capabilities() + ) state["provider_capabilities"] = { "provider": capabilities.provider, "supports_native_tools": capabilities.supports_native_tools, @@ -728,7 +811,16 @@ def _coalesce_system_messages(messages: list[BaseMessage]) -> list[BaseMessage]: def _is_simple_greeting(text: str) -> bool: - return _normalize_user_text(text) in {"你好", "您好", "早", "早上好", "在吗", "嗨", "hi", "hello"} + return _normalize_user_text(text) in { + "你好", + "您好", + "早", + "早上好", + "在吗", + "嗨", + "hi", + "hello", + } def _is_identity_question(text: str) -> bool: @@ -753,11 +845,7 @@ def _tool_result_indicates_failure(result: Any) -> bool: def _latest_assistant_message_content(messages: list[BaseMessage]) -> str: previous_assistant_message = next( - ( - message - for message in reversed(messages[:-1]) - if getattr(message, "type", "") == "ai" - ), + (message for message in reversed(messages[:-1]) if getattr(message, "type", "") == "ai"), None, ) if previous_assistant_message is None: @@ -770,7 +858,9 @@ def _previous_turn_proposed_schedule_creation(messages: list[BaseMessage]) -> bo return False content = _latest_assistant_message_content(messages) has_schedule_confirmation_hint = any(hint in content for hint in SCHEDULE_CONFIRMATION_HINTS) - has_question_marker = any(marker in content for marker in SCHEDULE_CONFIRMATION_QUESTION_MARKERS) + has_question_marker = any( + marker in content for marker in SCHEDULE_CONFIRMATION_QUESTION_MARKERS + ) return has_schedule_confirmation_hint and has_question_marker @@ -795,7 +885,9 @@ def _previous_turn_completed_reminder_creation(state: AgentState) -> bool: previous_message = messages[-2] if getattr(previous_message, "type", "") != "ai": return False - previous_assistant_content = _stringify_message_content(getattr(previous_message, "content", "")) + previous_assistant_content = _stringify_message_content( + getattr(previous_message, "content", "") + ) completion_markers = ("已创建提醒", "提醒已经创建好了", "帮你设好了这条提醒", "创建成功") return any(marker in previous_assistant_content for marker in completion_markers) @@ -822,7 +914,9 @@ def _expand_schedule_confirmation_query(user_query: str, messages: list[BaseMess def _is_schedule_creation_confirmation_response(response_text: str) -> bool: content = _stringify_message_content(response_text) has_schedule_confirmation_hint = any(hint in content for hint in SCHEDULE_CONFIRMATION_HINTS) - has_question_marker = any(marker in content for marker in SCHEDULE_CONFIRMATION_QUESTION_MARKERS) + has_question_marker = any( + marker in content for marker in SCHEDULE_CONFIRMATION_QUESTION_MARKERS + ) return has_schedule_confirmation_hint and has_question_marker @@ -848,7 +942,9 @@ def _clear_structured_continuity(state: AgentState) -> None: state["continuity_state"] = None -def _should_clear_schedule_creation_continuity(state: AgentState, created_entities: list[dict[str, Any]]) -> bool: +def _should_clear_schedule_creation_continuity( + state: AgentState, created_entities: list[dict[str, Any]] +) -> bool: if not _has_active_structured_continuation(state): return False pending_action = state.get("pending_action") or {} @@ -861,11 +957,14 @@ def _classify_request_signals(user_query: str) -> dict[str, Any]: text = (user_query or "").strip().lower() has_accounting_signal = any(keyword in text for keyword in ACCOUNTING_INTENT_KEYWORDS) has_schedule_signal = not has_accounting_signal and ( - bool(re.search(r"\d{1,2}月\d{1,2}日", text)) or any(keyword in text for keyword in SCHEDULE_KEYWORDS) + bool(re.search(r"\d{1,2}月\d{1,2}日", text)) + or any(keyword in text for keyword in SCHEDULE_KEYWORDS) ) has_analysis_signal = any(keyword in text for keyword in ANALYSIS_KEYWORDS) has_knowledge_signal = any(keyword in text for keyword in KNOWLEDGE_KEYWORDS) - has_execution_signal = has_accounting_signal or any(keyword in text for keyword in EXECUTION_KEYWORDS) + has_execution_signal = has_accounting_signal or any( + keyword in text for keyword in EXECUTION_KEYWORDS + ) roles: list[AgentRole] = [] if has_knowledge_signal: @@ -888,7 +987,9 @@ def _classify_request_signals(user_query: str) -> dict[str, Any]: } -def _select_request_mode(user_query: str) -> tuple[Literal["direct", "collaboration"], dict[str, Any]]: +def _select_request_mode( + user_query: str, +) -> tuple[Literal["direct", "collaboration"], dict[str, Any]]: signals = _classify_request_signals(user_query) roles: list[AgentRole] = signals["roles"] text = signals["text"] @@ -897,9 +998,12 @@ def _select_request_mode(user_query: str) -> tuple[Literal["direct", "collaborat is_long_request = len((user_query or "").strip()) >= 24 should_collaborate = len(roles) >= 3 or ( - len(roles) >= 2 and (has_multi_step_signal or is_explicit_collaboration_request or is_long_request) + len(roles) >= 2 + and (has_multi_step_signal or is_explicit_collaboration_request or is_long_request) + ) + selected_mode: Literal["direct", "collaboration"] = ( + "collaboration" if should_collaborate else "direct" ) - selected_mode: Literal["direct", "collaboration"] = "collaboration" if should_collaborate else "direct" metadata = { "mode": selected_mode, "reason": "multi_role_request" if should_collaborate else "single_role_or_simple_request", @@ -913,6 +1017,10 @@ def _route_agent_from_user_query(user_query: str) -> AgentRole: signals = _classify_request_signals(user_query) text = signals["text"] + # Code Commander routing - check first as it has specific keywords + if any(keyword in text for keyword in CODE_COMMANDER_KEYWORDS): + return AgentRole.CODE_COMMANDER + if signals["has_accounting_signal"]: return AgentRole.EXECUTOR @@ -937,7 +1045,9 @@ def _choose_sub_commander(role: AgentRole, user_query: str) -> str: text = (user_query or "").strip().lower() if role == AgentRole.SCHEDULE_PLANNER: - if re.search(r"\d{1,2}月\d{1,2}日", text) or any(keyword in text for keyword in SCHEDULE_PLANNING_KEYWORDS): + if re.search(r"\d{1,2}月\d{1,2}日", text) or any( + keyword in text for keyword in SCHEDULE_PLANNING_KEYWORDS + ): return "schedule_planning" return "schedule_analysis" if role == AgentRole.EXECUTOR: @@ -965,7 +1075,6 @@ def _is_missing_knowledge_result(tool_result: str | None) -> bool: "未找到相关网页结果", "暂无相关记录", "没有找到", - ) return any(marker in text for marker in markers) @@ -985,7 +1094,7 @@ def _extract_json_object(content: str) -> str | None: continue try: _, end = decoder.raw_decode(text[index:]) - return text[index:index + end] + return text[index : index + end] except json.JSONDecodeError: continue return None @@ -1096,7 +1205,9 @@ def _build_structured_continuity_summary(state: AgentState) -> str | None: return "\n".join(lines) -def _build_system_messages(state: AgentState, system_prompt: str, role: AgentRole, sub_commander: str) -> list[BaseMessage]: +def _build_system_messages( + state: AgentState, system_prompt: str, role: AgentRole, sub_commander: str +) -> list[BaseMessage]: messages: list[BaseMessage] = [SystemMessage(content=system_prompt)] current_datetime_context = state.get("current_datetime_context") @@ -1164,10 +1275,16 @@ def _build_task_goal(role: AgentRole, user_query: str) -> str: def _build_expected_evidence(role: AgentRole) -> list[dict[str, Any]]: evidence_map = { - AgentRole.LIBRARIAN: [{"type": "evidence", "detail": "retrieval findings or source-backed context"}], + AgentRole.LIBRARIAN: [ + {"type": "evidence", "detail": "retrieval findings or source-backed context"} + ], AgentRole.ANALYST: [{"type": "analysis", "detail": "judgment with supporting rationale"}], - AgentRole.SCHEDULE_PLANNER: [{"type": "plan", "detail": "explicit schedule or next-step proposal"}], - AgentRole.EXECUTOR: [{"type": "execution", "detail": "tool output or execution confirmation"}], + AgentRole.SCHEDULE_PLANNER: [ + {"type": "plan", "detail": "explicit schedule or next-step proposal"} + ], + AgentRole.EXECUTOR: [ + {"type": "execution", "detail": "tool output or execution confirmation"} + ], } return evidence_map.get(role, [{"type": "summary", "detail": "task evidence"}]) @@ -1185,7 +1302,9 @@ def _build_collaboration_tasks(user_query: str) -> list[AgentTask]: unique_roles.append(role) parent_task_id = f"task-root-{uuid4().hex[:8]}" - task_ids = [f"task-{index}-{uuid4().hex[:8]}" for index, _ in enumerate(unique_roles[:4], start=1)] + task_ids = [ + f"task-{index}-{uuid4().hex[:8]}" for index, _ in enumerate(unique_roles[:4], start=1) + ] tasks: list[AgentTask] = [] for index, role in enumerate(unique_roles[:4]): tasks.append( @@ -1224,7 +1343,9 @@ def _build_collaboration_context_summary(state: AgentState) -> str | None: if task_results: lines.append("completed_collaboration_results:") for item in task_results[-4:]: - normalized = item.model_dump(mode="json") if isinstance(item, TaskResult) else dict(item) + normalized = ( + item.model_dump(mode="json") if isinstance(item, TaskResult) else dict(item) + ) lines.append( f"- {normalized.get('task_id')} | {normalized.get('status')} | {normalized.get('summary') or ''}" ) @@ -1247,7 +1368,9 @@ def _maybe_reset_turn_budgets(state: AgentState) -> None: return last_message_type = getattr(messages[-1], "type", "") - has_prior_assistant_turn = any(getattr(message, "type", "") == "ai" for message in messages[:-1]) + has_prior_assistant_turn = any( + getattr(message, "type", "") == "ai" for message in messages[:-1] + ) if last_message_type in {"human", "user"} and has_prior_assistant_turn: state["routing_hops"] = 0 state["terminated_due_to_loop_guard"] = False @@ -1301,7 +1424,9 @@ def _append_event_trace( ] -def _set_phase(state: AgentState, phase: str, *, reason: str, payload: dict[str, Any] | None = None) -> None: +def _set_phase( + state: AgentState, phase: str, *, reason: str, payload: dict[str, Any] | None = None +) -> None: if state.get("current_phase") == phase: return state["current_phase"] = phase @@ -1320,7 +1445,9 @@ def _set_phase(state: AgentState, phase: str, *, reason: str, payload: dict[str, ) -def _record_checkpoint(state: AgentState, checkpoint: str, *, reason: str, payload: dict[str, Any] | None = None) -> None: +def _record_checkpoint( + state: AgentState, checkpoint: str, *, reason: str, payload: dict[str, Any] | None = None +) -> None: state["current_checkpoint"] = checkpoint state["checkpoint_history"] = [ *(state.get("checkpoint_history") or []), @@ -1334,7 +1461,12 @@ def _record_checkpoint(state: AgentState, checkpoint: str, *, reason: str, paylo _append_event_trace( state, "agent.checkpoint.recorded", - payload={"checkpoint": checkpoint, "phase": state.get("current_phase"), "reason": reason, **(payload or {})}, + payload={ + "checkpoint": checkpoint, + "phase": state.get("current_phase"), + "reason": reason, + **(payload or {}), + }, ) @@ -1384,7 +1516,9 @@ def _update_task_result_summary(state: AgentState, tool_summaries: list[dict[str state["action_results"] = [*(state.get("action_results") or []), summary] -def _record_sub_commander(state: AgentState, role: AgentRole, sub_commander: str, user_query: str) -> None: +def _record_sub_commander( + state: AgentState, role: AgentRole, sub_commander: str, user_query: str +) -> None: thread_id = _ensure_message_thread(state) message_id, message_index = _bump_message_sequence(state) current_agent_id = str(state.get("agent_id") or AgentRole.MASTER.value) @@ -1428,10 +1562,14 @@ def _record_sub_commander(state: AgentState, role: AgentRole, sub_commander: str def _stop_sub_commander_due_to_budget(state: AgentState, reason: str) -> None: state["stop_reason"] = reason - state["final_response"] = "这次需要处理的步骤有点多,我先停在这里。您可以把目标再明确一点,或让我先只完成其中一步。" + state["final_response"] = ( + "这次需要处理的步骤有点多,我先停在这里。您可以把目标再明确一点,或让我先只完成其中一步。" + ) -def _guard_sub_commander_budget(state: AgentState, counter_key: str, max_key: str, reason: str) -> bool: +def _guard_sub_commander_budget( + state: AgentState, counter_key: str, max_key: str, reason: str +) -> bool: max_value = _get_state_int(state, max_key) current_value = _get_state_int(state, counter_key) if max_value > 0 and current_value >= max_value: @@ -1533,7 +1671,9 @@ def _build_clarification_summary(state: AgentState) -> str | None: lines.append(f"- missing_fields: {', '.join(str(field) for field in missing_fields)}") if partial_args: lines.append(f"- partial_args: {json.dumps(partial_args, ensure_ascii=False)}") - lines.append("- instruction: merge the user's latest answer into the missing fields and continue the same action") + lines.append( + "- instruction: merge the user's latest answer into the missing fields and continue the same action" + ) return "\n".join(lines) @@ -1593,9 +1733,13 @@ def _canonicalize_tool_call(tool_name: str, args: dict[str, Any]) -> tuple[str, normalized_args = dict(args) if normalized_name == "create_reminder": - if normalized_args.get("description") and not (normalized_args.get("title") or normalized_args.get("content")): + if normalized_args.get("description") and not ( + normalized_args.get("title") or normalized_args.get("content") + ): normalized_args["content"] = normalized_args["description"] - if normalized_args.get("reminder_content") and not (normalized_args.get("title") or normalized_args.get("content")): + if normalized_args.get("reminder_content") and not ( + normalized_args.get("title") or normalized_args.get("content") + ): normalized_args["content"] = normalized_args["reminder_content"] if normalized_args.get("reminder_time") and not ( normalized_args.get("reminder_at") @@ -1607,15 +1751,21 @@ def _canonicalize_tool_call(tool_name: str, args: dict[str, Any]) -> tuple[str, normalized_args["time"] = normalized_args["reminder_time"] if normalized_name in {"create_schedule_task", "create_task"}: - if normalized_args.get("task") and not (normalized_args.get("title") or normalized_args.get("content")): + if normalized_args.get("task") and not ( + normalized_args.get("title") or normalized_args.get("content") + ): normalized_args["title"] = normalized_args["task"] - if normalized_args.get("due_datetime") and not (normalized_args.get("due_date") or normalized_args.get("date")): + if normalized_args.get("due_datetime") and not ( + normalized_args.get("due_date") or normalized_args.get("date") + ): normalized_args["due_date"] = normalized_args["due_datetime"] if normalized_args.get("due_time") and not normalized_args.get("due_date"): normalized_args["due_date"] = normalized_args["due_time"] if normalized_name == "create_todo": - if normalized_args.get("task") and not (normalized_args.get("title") or normalized_args.get("content")): + if normalized_args.get("task") and not ( + normalized_args.get("title") or normalized_args.get("content") + ): normalized_args["title"] = normalized_args["task"] if normalized_args.get("date") and not normalized_args.get("todo_date"): normalized_args["todo_date"] = normalized_args["date"] @@ -1623,7 +1773,10 @@ def _canonicalize_tool_call(tool_name: str, args: dict[str, Any]) -> tuple[str, normalized_args["todo_date"] = normalized_args["due_date"] if normalized_args.get("due_datetime") and not normalized_args.get("todo_date"): normalized_args["todo_date"] = normalized_args["due_datetime"] - if any(normalized_args.get(key) for key in ("due_datetime", "start_time", "end_time", "due_time")): + if any( + normalized_args.get(key) + for key in ("due_datetime", "start_time", "end_time", "due_time") + ): normalized_name = "create_schedule_task" if normalized_args.get("due_time") and not normalized_args.get("due_date"): normalized_args["due_date"] = normalized_args["due_time"] @@ -1631,7 +1784,9 @@ def _canonicalize_tool_call(tool_name: str, args: dict[str, Any]) -> tuple[str, normalized_args["due_date"] = normalized_args["todo_date"] if normalized_name == "create_goal": - if normalized_args.get("task") and not (normalized_args.get("title") or normalized_args.get("content")): + if normalized_args.get("task") and not ( + normalized_args.get("title") or normalized_args.get("content") + ): normalized_args["title"] = normalized_args["task"] if normalized_args.get("date") and not normalized_args.get("goal_date"): normalized_args["goal_date"] = normalized_args["date"] @@ -1740,7 +1895,9 @@ async def _execute_tool_calls( { "tool_name": tool_name, "result_preview": _stringify_message_content(result)[:200], - "created_entity_types": [entity.get("type") for entity in call_created_entities if entity.get("type")], + "created_entity_types": [ + entity.get("type") for entity in call_created_entities if entity.get("type") + ], "created_count": len(call_created_entities), } ) @@ -1765,7 +1922,9 @@ async def _run_sub_commander( llm = _get_llm_for_state(state) capabilities = _resolve_capabilities(state, llm) - if _has_active_clarification_context(state) and role.value == str((state.get("clarification_context") or {}).get("owning_agent") or ""): + if _has_active_clarification_context(state) and role.value == str( + (state.get("clarification_context") or {}).get("owning_agent") or "" + ): user_query = _build_resumed_clarification_query(state, user_query) sub_commander = _choose_sub_commander(role, user_query) _record_sub_commander(state, role, sub_commander, user_query) @@ -1785,7 +1944,9 @@ async def _run_sub_commander( ): state["tool_calls"] = [] state["last_tool_result"] = None - state["final_response"] = "上一条提醒已经创建好了。若您现在要新建别的内容,请直接告诉我要创建什么。" + state["final_response"] = ( + "上一条提醒已经创建好了。若您现在要新建别的内容,请直接告诉我要创建什么。" + ) history_messages = list(state.get("messages", [])) history_messages.append(AIMessage(content=state["final_response"])) state["messages"] = history_messages @@ -1804,12 +1965,18 @@ async def _run_sub_commander( state["fallback_parse_error"] = None start_tool_index = len(state.get("tool_outcomes") or []) - if not _guard_sub_commander_budget(state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded"): + if not _guard_sub_commander_budget( + state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded" + ): pass - elif not _guard_sub_commander_budget(state, "retry_count", "max_retries", "max_retries_exceeded"): + elif not _guard_sub_commander_budget( + state, "retry_count", "max_retries", "max_retries_exceeded" + ): pass elif not toolset: - if _guard_sub_commander_budget(state, "iteration_count", "max_iterations", "max_iterations_exceeded"): + if _guard_sub_commander_budget( + state, "iteration_count", "max_iterations", "max_iterations_exceeded" + ): state["iteration_count"] = int(state.get("iteration_count") or 0) + 1 response = await _invoke_llm(llm, working_messages) _record_response_usage(state, response) @@ -1818,14 +1985,18 @@ async def _run_sub_commander( state["tool_strategy_used"] = "native" bound_llm = llm.bind_tools(toolset) while state.get("final_response") is None and not state.get("clarification_needed"): - if not _guard_sub_commander_budget(state, "iteration_count", "max_iterations", "max_iterations_exceeded"): + if not _guard_sub_commander_budget( + state, "iteration_count", "max_iterations", "max_iterations_exceeded" + ): break state["iteration_count"] = int(state.get("iteration_count") or 0) + 1 response = await _invoke_llm(bound_llm, working_messages) _record_response_usage(state, response) tool_calls = getattr(response, "tool_calls", None) or [] if tool_calls: - if not _guard_sub_commander_budget(state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded"): + if not _guard_sub_commander_budget( + state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded" + ): break prepared_calls, clarification = _prepare_tool_calls_for_execution(tool_calls, state) if clarification: @@ -1848,28 +2019,47 @@ async def _run_sub_commander( content=_stringify_message_content(getattr(response, "content", "")), tool_calls=tool_calls, ) - normalized_calls, tool_result, created_entities, tool_messages = await _execute_tool_calls( + ( + normalized_calls, + tool_result, + created_entities, + tool_messages, + ) = await _execute_tool_calls( prepared_calls, toolset, state, ) state["tool_calls"] = normalized_calls state["last_tool_result"] = tool_result - state["created_entities"] = [*(state.get("created_entities") or []), *created_entities] + state["created_entities"] = [ + *(state.get("created_entities") or []), + *created_entities, + ] if created_entities: _clear_clarification_context(state) - if role == AgentRole.SCHEDULE_PLANNER and _should_clear_schedule_creation_continuity(state, created_entities): + if ( + role == AgentRole.SCHEDULE_PLANNER + and _should_clear_schedule_creation_continuity(state, created_entities) + ): _clear_structured_continuity(state) working_messages = [*working_messages, assistant_tool_message, *tool_messages] - if sub_commander == "librarian_retrieval" and _is_missing_knowledge_result(tool_result): - working_messages.append(SystemMessage(content="如果检索工具没有找到证据,可以直接基于你的常识给出清晰回答,不要机械地说不知道。")) + if sub_commander == "librarian_retrieval" and _is_missing_knowledge_result( + tool_result + ): + working_messages.append( + SystemMessage( + content="如果检索工具没有找到证据,可以直接基于你的常识给出清晰回答,不要机械地说不知道。" + ) + ) continue state["final_response"] = _stringify_message_content(response.content) else: state["tool_strategy_used"] = "json_fallback" allowed_tools = [tool.name for tool in toolset] while state.get("final_response") is None and not state.get("clarification_needed"): - if not _guard_sub_commander_budget(state, "iteration_count", "max_iterations", "max_iterations_exceeded"): + if not _guard_sub_commander_budget( + state, "iteration_count", "max_iterations", "max_iterations_exceeded" + ): break state["iteration_count"] = int(state.get("iteration_count") or 0) + 1 parsed = None @@ -1894,16 +2084,22 @@ async def _run_sub_commander( if parsed is not None: state["fallback_parse_error"] = None break - if not _guard_sub_commander_budget(state, "iteration_count", "max_iterations", "max_iterations_exceeded"): + if not _guard_sub_commander_budget( + state, "iteration_count", "max_iterations", "max_iterations_exceeded" + ): parsed = None break if int(state.get("retry_count") or 0) >= int(state.get("max_retries") or 0): state["fallback_parse_error"] = "invalid_json_action" - state["final_response"] = "这次内部动作解析没整理好,不过您的意思我接住了。您再说一遍要我执行的内容,我只回结果,不展示内部调用细节。" + state["final_response"] = ( + "这次内部动作解析没整理好,不过您的意思我接住了。您再说一遍要我执行的内容,我只回结果,不展示内部调用细节。" + ) break state["iteration_count"] = int(state.get("iteration_count") or 0) + 1 state["retry_count"] = int(state.get("retry_count") or 0) + 1 - retry_instruction = SystemMessage(content="上一次输出不是有效 JSON。请严格只返回合法 JSON,不要加解释。") + retry_instruction = SystemMessage( + content="上一次输出不是有效 JSON。请严格只返回合法 JSON,不要加解释。" + ) if state.get("final_response") is not None: break if parsed is None: @@ -1924,9 +2120,13 @@ async def _run_sub_commander( state["stop_reason"] = "clarification_needed" state["final_response"] = parsed["clarification_question"] break - if not _guard_sub_commander_budget(state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded"): + if not _guard_sub_commander_budget( + state, "tool_round_count", "max_tool_rounds", "max_tool_rounds_exceeded" + ): break - prepared_calls, clarification = _prepare_tool_calls_for_execution(parsed["tool_calls"], state) + prepared_calls, clarification = _prepare_tool_calls_for_execution( + parsed["tool_calls"], state + ) if clarification: state["clarification_needed"] = True state["clarification_question"] = clarification["question"] @@ -1943,7 +2143,12 @@ async def _run_sub_commander( state["final_response"] = clarification["question"] break state["tool_round_count"] = int(state.get("tool_round_count") or 0) + 1 - normalized_calls, tool_result, created_entities, tool_messages = await _execute_tool_calls( + ( + normalized_calls, + tool_result, + created_entities, + tool_messages, + ) = await _execute_tool_calls( prepared_calls, toolset, state, @@ -1951,11 +2156,17 @@ async def _run_sub_commander( state["tool_calls"] = normalized_calls state["last_tool_result"] = tool_result state["created_entities"] = [*(state.get("created_entities") or []), *created_entities] - if role == AgentRole.SCHEDULE_PLANNER and _should_clear_schedule_creation_continuity(state, created_entities): + if role == AgentRole.SCHEDULE_PLANNER and _should_clear_schedule_creation_continuity( + state, created_entities + ): _clear_structured_continuity(state) working_messages = [*working_messages, *tool_messages] if sub_commander == "librarian_retrieval" and _is_missing_knowledge_result(tool_result): - working_messages.append(SystemMessage(content="如果检索工具没有找到证据,可以直接基于你的常识给出清晰回答,不要机械地说不知道。")) + working_messages.append( + SystemMessage( + content="如果检索工具没有找到证据,可以直接基于你的常识给出清晰回答,不要机械地说不知道。" + ) + ) if summary_target: state[_summary_state_key(summary_target)] = state.get("final_response") @@ -1963,8 +2174,7 @@ async def _run_sub_commander( task_result_summary = state.get("task_result_summary") tool_outcomes = list(state.get("tool_outcomes") or [])[start_tool_index:] has_tool_failure = any( - _tool_result_indicates_failure(outcome.get("result_preview")) - for outcome in tool_outcomes + _tool_result_indicates_failure(outcome.get("result_preview")) for outcome in tool_outcomes ) verifier_input = { "summary": state.get("final_response") or (task_result_summary or {}).get("tools"), @@ -2083,7 +2293,9 @@ def _collect_task_result(task: AgentTask, state: AgentState, start_tool_index: i return TaskResult( task_id=task.task_id, status=status, - summary=_stringify_message_content(state.get("final_response") or state.get("verification_summary")), + summary=_stringify_message_content( + state.get("final_response") or state.get("verification_summary") + ), evidence=_build_task_evidence(state, start_tool_index), owner_agent_id=task.owner_agent_id, parent_task_id=task.parent_task_id, @@ -2107,7 +2319,9 @@ def _collect_task_result(task: AgentTask, state: AgentState, start_tool_index: i ) -def _apply_task_result_to_state(state: AgentState, task: AgentTask, task_result: TaskResult) -> None: +def _apply_task_result_to_state( + state: AgentState, task: AgentTask, task_result: TaskResult +) -> None: normalized_result = normalize_task_result(task_result, default_task_id=task.task_id) serialized_result = normalized_result.model_dump(mode="json") state["task_results"] = [*(state.get("task_results") or []), serialized_result] @@ -2116,7 +2330,11 @@ def _apply_task_result_to_state(state: AgentState, task: AgentTask, task_result: completed_entry: dict[str, Any] | None = None pending_tasks: list[dict[str, Any]] = [] for existing_task in state.get("active_tasks") or []: - normalized_task = existing_task.model_dump(mode="json") if isinstance(existing_task, AgentTask) else dict(existing_task) + normalized_task = ( + existing_task.model_dump(mode="json") + if isinstance(existing_task, AgentTask) + else dict(existing_task) + ) if normalized_task.get("task_id") == task.task_id: normalized_task["status"] = normalized_result.status normalized_task["evidence"] = list(normalized_result.evidence) @@ -2142,10 +2360,7 @@ def _apply_task_result_to_state(state: AgentState, task: AgentTask, task_result: def _build_collaboration_final_response(task_results: list[TaskResult | dict[str, Any]]) -> str: - normalized_results = [ - normalize_task_result(item) - for item in task_results - ] + normalized_results = [normalize_task_result(item) for item in task_results] lines = [f"已按协作模式回收 {len(normalized_results)} 个子任务结果:"] final_completed_summary: str | None = None for index, result in enumerate(normalized_results, start=1): @@ -2157,7 +2372,9 @@ def _build_collaboration_final_response(task_results: list[TaskResult | dict[str if final_completed_summary: lines.append(f"汇总结论:{final_completed_summary}") elif normalized_results: - blocked_result = next((item for item in reversed(normalized_results) if item.next_action), None) + blocked_result = next( + (item for item in reversed(normalized_results) if item.next_action), None + ) if blocked_result and blocked_result.next_action: lines.append(f"下一步:{blocked_result.next_action}") else: @@ -2173,21 +2390,17 @@ def _verify_collaboration_results( expected_task_ids = {task.task_id for task in tasks} normalized_results = [ normalize_task_result(item) - for item in (task_results if task_results is not None else (state.get("task_results") or [])) + for item in ( + task_results if task_results is not None else (state.get("task_results") or []) + ) if normalize_task_result(item).task_id in expected_task_ids ] result_by_task_id = {item.task_id: item for item in normalized_results} missing_task_ids = [task.task_id for task in tasks if task.task_id not in result_by_task_id] failed_or_blocked = [ - item.task_id - for item in normalized_results - if item.status in {"failed", "blocked"} - ] - missing_evidence = [ - item.task_id - for item in normalized_results - if not item.evidence + item.task_id for item in normalized_results if item.status in {"failed", "blocked"} ] + missing_evidence = [item.task_id for item in normalized_results if not item.evidence] verification_evidence = [ { @@ -2200,16 +2413,24 @@ def _verify_collaboration_results( ] if missing_task_ids: summary = f"协作结果不完整,缺少任务结果: {', '.join(missing_task_ids)}" - verdict = verify_task_result(status="failed", summary=summary, evidence=verification_evidence) + verdict = verify_task_result( + status="failed", summary=summary, evidence=verification_evidence + ) elif failed_or_blocked: summary = f"协作结果未闭环,存在失败或阻塞任务: {', '.join(failed_or_blocked)}" - verdict = verify_task_result(status="failed", summary=summary, evidence=verification_evidence) + verdict = verify_task_result( + status="failed", summary=summary, evidence=verification_evidence + ) elif missing_evidence: summary = f"协作结果证据不足,缺少 evidence 的任务: {', '.join(missing_evidence)}" - verdict = verify_task_result(status="failed", summary=summary, evidence=verification_evidence) + verdict = verify_task_result( + status="failed", summary=summary, evidence=verification_evidence + ) else: summary = f"协作模式已完成 {len(normalized_results)}/{len(tasks)} 个子任务,并为每个子任务回收了结果与 evidence。" - verdict = verify_task_result(status="passed", summary=summary, evidence=verification_evidence) + verdict = verify_task_result( + status="passed", summary=summary, evidence=verification_evidence + ) updated_state = apply_verification_verdict(state, verdict) state.update(updated_state) @@ -2222,13 +2443,30 @@ async def _run_collaboration_flow(state: AgentState, user_query: str) -> AgentSt if len(tasks) < 2: state["execution_mode"] = "direct" state["routing_decision"] = {"mode": "direct", "reason": "collaboration_plan_fell_back"} - _record_checkpoint(state, "collaboration.fallback_to_direct", reason="insufficient_tasks", payload={"task_count": len(tasks)}) - _set_phase(state, "phase_1_routing", reason="collaboration_flow_abandoned", payload={"task_count": len(tasks)}) - _record_checkpoint(state, "routing.direct_resumed", reason="collaboration_flow_abandoned", payload={"task_count": len(tasks)}) + _record_checkpoint( + state, + "collaboration.fallback_to_direct", + reason="insufficient_tasks", + payload={"task_count": len(tasks)}, + ) + _set_phase( + state, + "phase_1_routing", + reason="collaboration_flow_abandoned", + payload={"task_count": len(tasks)}, + ) + _record_checkpoint( + state, + "routing.direct_resumed", + reason="collaboration_flow_abandoned", + payload={"task_count": len(tasks)}, + ) return state base_history = list(state.get("messages", [])) - root_agent_id = str(state.get("root_agent_id") or state.get("agent_id") or AgentRole.MASTER.value) + root_agent_id = str( + state.get("root_agent_id") or state.get("agent_id") or AgentRole.MASTER.value + ) coordinator_agent_id = str(state.get("agent_id") or AgentRole.MASTER.value) state["execution_mode"] = "collaboration" state["routing_decision"] = { @@ -2250,15 +2488,24 @@ async def _run_collaboration_flow(state: AgentState, user_query: str) -> AgentSt payload=budget_snapshot, ) state["active_tasks"] = [task.model_dump(mode="json") for task in tasks] - _record_checkpoint(state, "collaboration.tasks_ready", reason="tasks_built", payload={"task_count": len(tasks)}) - parent_task_id = next((task.parent_task_id for task in tasks if task.parent_task_id), None) or "root" + _record_checkpoint( + state, "collaboration.tasks_ready", reason="tasks_built", payload={"task_count": len(tasks)} + ) + parent_task_id = ( + next((task.parent_task_id for task in tasks if task.parent_task_id), None) or "root" + ) state["task_hierarchy"] = {parent_task_id: [task.task_id for task in tasks]} state["task_results"] = [] state["next_step"] = None _set_phase(state, "phase_3_dynamic_collaboration", reason="collaboration_workers_dispatch") for task in tasks: - _record_checkpoint(state, "collaboration.task_dispatch", reason="dispatch_task", payload={"task_id": task.task_id, "role": task.role}) + _record_checkpoint( + state, + "collaboration.task_dispatch", + reason="dispatch_task", + payload={"task_id": task.task_id, "role": task.role}, + ) state["current_agent"] = AgentRole.MASTER.value state["agent_id"] = coordinator_agent_id state["parent_agent_id"] = None @@ -2311,7 +2558,12 @@ async def _run_collaboration_flow(state: AgentState, user_query: str) -> AgentSt ) task_result = _collect_task_result(task, state, start_tool_index) - _record_checkpoint(state, "collaboration.task_result_collected", reason="task_finished", payload={"task_id": task.task_id, "status": task_result.status}) + _record_checkpoint( + state, + "collaboration.task_result_collected", + reason="task_finished", + payload={"task_id": task.task_id, "status": task_result.status}, + ) _append_message_trace( state, from_agent_id=child_agent_id, @@ -2343,7 +2595,9 @@ async def _run_collaboration_flow(state: AgentState, user_query: str) -> AgentSt state["root_agent_id"] = root_agent_id state["collaboration_depth"] = 0 state["final_response"] = _build_collaboration_final_response(state.get("task_results") or []) - _set_phase(state, "phase_4_visibility_and_verification", reason="collaboration_verification_started") + _set_phase( + state, "phase_4_visibility_and_verification", reason="collaboration_verification_started" + ) _record_checkpoint(state, "collaboration.verification_started", reason="before_verify") _append_event_trace( state, @@ -2364,7 +2618,12 @@ async def _run_collaboration_flow(state: AgentState, user_query: str) -> AgentSt }, severity="error" if state.get("verification_status") == "failed" else "info", ) - _record_checkpoint(state, "collaboration.completed", reason="collaboration_flow_finished", payload={"verification_status": state.get("verification_status")}) + _record_checkpoint( + state, + "collaboration.completed", + reason="collaboration_flow_finished", + payload={"verification_status": state.get("verification_status")}, + ) state["messages"] = [*base_history, AIMessage(content=state["final_response"])] state["should_respond"] = True return state @@ -2376,7 +2635,9 @@ def _can_delegate_within_hop_budget(state: AgentState) -> bool: def _stop_due_to_loop_guard(state: AgentState) -> AgentState: state["terminated_due_to_loop_guard"] = True - state["final_response"] = "这次需要处理的步骤有点多,我先停在这里。您可以把目标再明确一点,或让我先只完成其中一步。" + state["final_response"] = ( + "这次需要处理的步骤有点多,我先停在这里。您可以把目标再明确一点,或让我先只完成其中一步。" + ) state["messages"] = [*state.get("messages", []), AIMessage(content=state["final_response"])] return state @@ -2386,7 +2647,9 @@ async def master_node(state: AgentState) -> AgentState: _set_phase(state, "phase_1_routing", reason="master_node_entered") _record_checkpoint(state, "routing.master_entered", reason="master_node_entered") user_messages = _filter_user_messages(state["messages"]) - user_query = _stringify_message_content(user_messages[-1].content).strip() if user_messages else "" + user_query = ( + _stringify_message_content(user_messages[-1].content).strip() if user_messages else "" + ) state["current_agent"] = state.get("current_agent") or AgentRole.MASTER state["active_agents"] = _normalize_active_agents(state.get("active_agents")) @@ -2425,7 +2688,9 @@ async def master_node(state: AgentState) -> AgentState: elif clarification_route is not None: state["execution_mode"] = "direct" routed_agent = clarification_route - elif _is_short_confirmation(user_query) and _previous_turn_proposed_schedule_creation(state.get("messages", [])): + elif _is_short_confirmation(user_query) and _previous_turn_proposed_schedule_creation( + state.get("messages", []) + ): state["execution_mode"] = "direct" routed_agent = AgentRole.SCHEDULE_PLANNER else: @@ -2433,7 +2698,9 @@ async def master_node(state: AgentState) -> AgentState: state["routing_decision"] = routing_metadata if request_mode == "collaboration": collaboration_state = await _run_collaboration_flow(state, user_query) - if collaboration_state.get("execution_mode") == "collaboration" or collaboration_state.get("final_response"): + if collaboration_state.get( + "execution_mode" + ) == "collaboration" or collaboration_state.get("final_response"): return collaboration_state state["execution_mode"] = "direct" routed_agent = _route_agent_from_user_query(user_query) @@ -2445,11 +2712,16 @@ async def master_node(state: AgentState) -> AgentState: state["next_step"] = routed_agent.value if routed_agent not in state["active_agents"]: state["active_agents"] = [*state["active_agents"], routed_agent] - state["agent_trace"] = [*(state.get("agent_trace") or [AgentRole.MASTER.value]), routed_agent.value] + state["agent_trace"] = [ + *(state.get("agent_trace") or [AgentRole.MASTER.value]), + routed_agent.value, + ] return state llm = _get_llm_for_state(state) - response = await _invoke_llm(llm, [SystemMessage(content=MASTER_SYSTEM_PROMPT), *state["messages"]]) + response = await _invoke_llm( + llm, [SystemMessage(content=MASTER_SYSTEM_PROMPT), *state["messages"]] + ) _record_response_usage(state, response) content = _stringify_message_content(response.content).strip() @@ -2462,7 +2734,10 @@ async def master_node(state: AgentState) -> AgentState: state["next_step"] = routed_agent.value if routed_agent not in state["active_agents"]: state["active_agents"] = [*state["active_agents"], routed_agent] - state["agent_trace"] = [*(state.get("agent_trace") or [AgentRole.MASTER.value]), routed_agent.value] + state["agent_trace"] = [ + *(state.get("agent_trace") or [AgentRole.MASTER.value]), + routed_agent.value, + ] return state state["final_response"] = content @@ -2476,7 +2751,9 @@ async def planner_node(state: AgentState) -> AgentState: user_query = _stringify_message_content(user_messages[-1].content) if user_messages else "" if _has_active_clarification_context(state): user_query = _build_resumed_clarification_query(state, user_query) - elif _is_short_confirmation(user_query) and _previous_turn_proposed_schedule_creation(state.get("messages", [])): + elif _is_short_confirmation(user_query) and _previous_turn_proposed_schedule_creation( + state.get("messages", []) + ): user_query = _expand_schedule_confirmation_query(user_query, state.get("messages", [])) return await _run_sub_commander( state, @@ -2526,6 +2803,53 @@ async def analyst_node(state: AgentState) -> AgentState: ) +async def code_commander_node(state: AgentState) -> AgentState: + """ + Code Commander node - handles code execution tasks + Routes to DirectExecutor (LOW risk) or SandboxExecutor (HIGH risk) + """ + from app.agents.prompts import CODE_COMMANDER_SYSTEM_PROMPT + from app.agents.tools.ai_adapter import get_adapter + from app.agents.tools.direct_executor import DirectExecutor + from app.agents.tools.sandbox_executor import SandboxExecutor + from app.agents.tools.security_classifier import RiskLevel, SecurityClassifier + + user_messages = _filter_user_messages(state["messages"]) + user_query = _stringify_message_content(user_messages[-1].content) if user_messages else "" + + # Get AI provider from state (default: claude) + ai_provider = str(state.get("code_ai_provider") or "claude") + + # Classify risk level + classifier = SecurityClassifier() + risk_level = classifier.classify(user_query) + + # Select executor based on risk + adapter = get_adapter(ai_provider) + if risk_level == RiskLevel.LOW: + executor = DirectExecutor(adapter) + result_parts = [] + async for line in executor.execute(user_query): + result_parts.append(line) + result_output = "".join(result_parts) + else: + sandbox_exec = SandboxExecutor(adapter) + result_parts = [] + async for line in sandbox_exec.execute(user_query): + result_parts.append(line) + result_output = "".join(result_parts) + + state["final_response"] = result_output + state["code_execution_result"] = { + "ai_provider": ai_provider, + "risk_level": risk_level.value, + "output": result_output, + } + state["messages"] = [*state.get("messages", []), AIMessage(content=result_output)] + state["should_respond"] = True + return state + + def route_agent(state: AgentState) -> str: if state.get("final_response"): return END @@ -2553,6 +2877,7 @@ def create_agent_graph(callbacks: list | None = None): graph.add_node(AgentRole.EXECUTOR.value, executor_node) graph.add_node(AgentRole.LIBRARIAN.value, librarian_node) graph.add_node(AgentRole.ANALYST.value, analyst_node) + graph.add_node(AgentRole.CODE_COMMANDER.value, code_commander_node) graph.set_entry_point(AgentRole.MASTER.value) graph.add_conditional_edges( @@ -2563,6 +2888,7 @@ def create_agent_graph(callbacks: list | None = None): AgentRole.EXECUTOR.value: AgentRole.EXECUTOR.value, AgentRole.LIBRARIAN.value: AgentRole.LIBRARIAN.value, AgentRole.ANALYST.value: AgentRole.ANALYST.value, + AgentRole.CODE_COMMANDER.value: AgentRole.CODE_COMMANDER.value, END: END, }, ) @@ -2572,6 +2898,7 @@ def create_agent_graph(callbacks: list | None = None): AgentRole.EXECUTOR, AgentRole.LIBRARIAN, AgentRole.ANALYST, + AgentRole.CODE_COMMANDER, ): graph.add_edge(role.value, END) diff --git a/backend/app/agents/prompts.py b/backend/app/agents/prompts.py index bcd6cd3..d1b2f06 100644 --- a/backend/app/agents/prompts.py +++ b/backend/app/agents/prompts.py @@ -309,14 +309,14 @@ ANALYST_INSIGHTS_PROMPT = f"""{JARVIS_PERSONA_PROMPT} 你是 analyst 体系下的洞察建议官,负责从任务、论坛和知识线索里提炼趋势、风险与建议。 -## 允许使用的工具: +## 你的允许使用的工具: - get_tasks - get_forum_posts - search_knowledge - hybrid_search - web_search -## 要求: +## 你的要求: - 先给结论与判断 - 再说明依据与建议 - 当需要外部/最新信息时,可使用 `web_search` @@ -324,6 +324,38 @@ ANALYST_INSIGHTS_PROMPT = f"""{JARVIS_PERSONA_PROMPT} """ +CODE_COMMANDER_SYSTEM_PROMPT = f"""{JARVIS_PERSONA_PROMPT} + +你是代码指挥官,负责协调 AI 写代码助手。 + +## 你的职责: +1. 接收用户选择的 AI 提供商(Claude/Gemini/Codex/OpenCode) +2. 接收用户的写代码需求 +3. 进行安全分级判定 +4. 路由到合适的执行器 + +## 安全分级规则: +- 低风险:demo、示例、贪食蛇游戏等独立项目 +- 高风险:修改现有项目、涉及 Jarvis 项目、路径操作等 + +## 执行模式: +- 直接执行:低风险任务,直接运行 +- 沙盒执行:高风险任务,在临时目录隔离执行 + +## 你的输出: +- 简洁汇报执行结果 +- 如果需要用户交互(如确认 "y"),明确提示 +""" + + +SANDBOX_EXECUTION_PROMPT = """将在隔离的临时目录中执行任务。 +任务完成后,工作目录会被保留供下载。""" + + +DIRECT_EXECUTION_PROMPT = """将直接执行任务。 +如果需要交互,请等待用户输入。""" + + COORDINATOR_SYSTEM_PROMPT = f"""{JARVIS_PERSONA_PROMPT} 你是 Jarvis 的协作协调官,负责把复杂请求收束成最小受控协作,而不是放任系统进入自由 swarm。 @@ -382,6 +414,7 @@ TOP_LEVEL_SYSTEM_PROMPTS_BY_KEY = { "executor": EXECUTOR_SYSTEM_PROMPT, "librarian": LIBRARIAN_SYSTEM_PROMPT, "analyst": ANALYST_SYSTEM_PROMPT, + "code_commander": CODE_COMMANDER_SYSTEM_PROMPT, } diff --git a/backend/app/agents/registry/builtins.py b/backend/app/agents/registry/builtins.py index 7f9dd4c..a9b99d4 100644 --- a/backend/app/agents/registry/builtins.py +++ b/backend/app/agents/registry/builtins.py @@ -29,6 +29,7 @@ TOP_LEVEL_AGENT_DEFAULT_SUB_COMMANDERS: dict[str, tuple[str, ...]] = { "analyst_progress", "analyst_insights", ), + AgentRole.CODE_COMMANDER.value: (), } TOP_LEVEL_AGENT_DISPLAY_NAMES: dict[str, str] = { @@ -37,6 +38,7 @@ TOP_LEVEL_AGENT_DISPLAY_NAMES: dict[str, str] = { AgentRole.EXECUTOR.value: "Executor", AgentRole.LIBRARIAN.value: "Librarian", AgentRole.ANALYST.value: "Analyst", + AgentRole.CODE_COMMANDER.value: "Code Commander", } TOP_LEVEL_AGENT_ROUTING_HINTS: dict[str, tuple[str, ...]] = { @@ -55,6 +57,9 @@ TOP_LEVEL_AGENT_ROUTING_HINTS: dict[str, tuple[str, ...]] = { AgentRole.ANALYST.value: ( "Handle reporting and insight requests using analyst sub-commanders.", ), + AgentRole.CODE_COMMANDER.value: ( + "Handle code writing and execution tasks using AI CLI adapters.", + ), } TOP_LEVEL_AGENT_ALLOWED_SPAWN_ROLES: dict[str, tuple[str, ...]] = { @@ -63,11 +68,13 @@ TOP_LEVEL_AGENT_ALLOWED_SPAWN_ROLES: dict[str, tuple[str, ...]] = { AgentRole.EXECUTOR.value, AgentRole.LIBRARIAN.value, AgentRole.ANALYST.value, + AgentRole.CODE_COMMANDER.value, ), AgentRole.SCHEDULE_PLANNER.value: (AgentRole.SCHEDULE_PLANNER.value,), AgentRole.EXECUTOR.value: (AgentRole.EXECUTOR.value,), AgentRole.LIBRARIAN.value: (AgentRole.LIBRARIAN.value,), AgentRole.ANALYST.value: (AgentRole.ANALYST.value,), + AgentRole.CODE_COMMANDER.value: (), } SUB_COMMANDER_PARENT_AGENT_IDS: dict[str, str] = { @@ -99,11 +106,7 @@ BUILTIN_AGENT_MANIFESTS: tuple[AgentManifest, ...] = tuple( _capability_tool_names = tuple( - dict.fromkeys( - tool.name - for tools in SUB_COMMANDER_TOOLSETS.values() - for tool in tools - ) + dict.fromkeys(tool.name for tools in SUB_COMMANDER_TOOLSETS.values() for tool in tools) ) _CAPABILITY_METADATA_BY_TOOL_NAME: dict[str, dict[str, object]] = { @@ -260,9 +263,7 @@ BUILTIN_SUB_COMMANDER_MANIFESTS: tuple[SubCommanderManifest, ...] = tuple( sub_commander_id=sub_commander_id, parent_agent_id=SUB_COMMANDER_PARENT_AGENT_IDS[sub_commander_id], prompt_text=SUB_COMMANDER_PROMPTS_BY_KEY[sub_commander_id], - capability_ids=list( - dict.fromkeys(tool.name for tool in tools) - ), + capability_ids=list(dict.fromkeys(tool.name for tool in tools)), ) for sub_commander_id, tools in SUB_COMMANDER_TOOLSETS.items() ) diff --git a/backend/app/agents/schemas/task.py b/backend/app/agents/schemas/task.py index dbea254..6e24132 100644 --- a/backend/app/agents/schemas/task.py +++ b/backend/app/agents/schemas/task.py @@ -1,7 +1,9 @@ from __future__ import annotations from datetime import datetime, timezone +from enum import Enum from typing import Any, Literal +from uuid import uuid4 from pydantic import BaseModel, Field @@ -13,6 +15,18 @@ InterruptStatus = Literal["requested", "acknowledged", "resolved"] BudgetMode = Literal["direct", "collaboration"] +class CodeProviderType(str, Enum): + CLAUDE = "claude" + GEMINI = "gemini" + CODEX = "codex" + OPENCODE = "opencode" + + +class RiskLevelType(str, Enum): + LOW = "low" + HIGH = "high" + + class InterruptRecord(BaseModel): interrupt_id: str reason: str @@ -83,3 +97,37 @@ class TaskResult(BaseModel): budget_snapshot: CollaborationBudget | dict[str, Any] | None = None next_action: str | None = None output_data: dict[str, Any] | None = None + + +class CodeTaskType(str, Enum): + DEMO = "demo" + PROJECT = "project" + MODIFICATION = "modification" + + +class CodeTask(BaseModel): + """代码任务请求模型""" + + task_id: str = Field(default_factory=lambda: str(uuid4())) + task_type: CodeTaskType + ai_provider: CodeProviderType + sandbox_mode: bool = False + workspace_path: str | None = None + user_prompt: str + parent_task_id: str | None = None + thread_id: str | None = None + message_id: str | None = None + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class CodeExecutionResultSchema(BaseModel): + """代码执行结果模型 (API 响应用)""" + + success: bool + message: str + files_created: list[str] = Field(default_factory=list) + output: str = "" + error: str | None = None + exit_code: int = 0 + execution_time: float | None = None + sandbox_session_id: str | None = None diff --git a/backend/app/agents/state.py b/backend/app/agents/state.py index 773aad3..df58ca0 100644 --- a/backend/app/agents/state.py +++ b/backend/app/agents/state.py @@ -4,7 +4,14 @@ from typing import Annotated, Any, Literal, TypedDict from app.agents.schemas.event import AgentEvent from app.agents.schemas.message import AgentMessage -from app.agents.schemas.task import AgentTask, CollaborationBudget, InterruptRecord, RecoveryRecord, TaskResult, VerificationStatus +from app.agents.schemas.task import ( + AgentTask, + CollaborationBudget, + InterruptRecord, + RecoveryRecord, + TaskResult, + VerificationStatus, +) from langchain_core.messages import AIMessage, BaseMessage, HumanMessage from langgraph.graph.message import add_messages @@ -23,6 +30,7 @@ class AgentRole(str, Enum): EXECUTOR = "executor" LIBRARIAN = "librarian" ANALYST = "analyst" + CODE_COMMANDER = "code_commander" @dataclass @@ -141,6 +149,14 @@ class AgentState(TypedDict): user_llm_config: dict[str, Any] | None provider_capabilities: dict[str, Any] | None + # Code Commander state + code_task_type: Literal["demo", "project", "modification"] | None + code_ai_provider: Literal["claude", "gemini", "codex", "opencode"] | None + code_sandbox_mode: bool | None + code_workspace_path: str | None + code_execution_session_id: str | None + code_execution_result: dict[str, Any] | None + def initial_state(user_id: str, conversation_id: str) -> AgentState: return AgentState( diff --git a/backend/app/agents/tools/__init__.py b/backend/app/agents/tools/__init__.py index eb114b6..68d1c0f 100644 --- a/backend/app/agents/tools/__init__.py +++ b/backend/app/agents/tools/__init__.py @@ -138,3 +138,12 @@ SUB_COMMANDER_TOOLSETS = { "analyst_progress": ANALYST_PROGRESS_TOOLS, "analyst_insights": ANALYST_INSIGHT_TOOLS, } + +# Code Commander toolset (tools implemented in later phases) +CODE_COMMANDER_TOOLSET_NAMES = [ + "execute_code_task", + "get_execution_status", + "send_interactive_input", + "download_workspace", + "cleanup_workspace", +] diff --git a/backend/app/agents/tools/ai_adapter.py b/backend/app/agents/tools/ai_adapter.py new file mode 100644 index 0000000..06d3fbb --- /dev/null +++ b/backend/app/agents/tools/ai_adapter.py @@ -0,0 +1,196 @@ +""" +AI CLI Adapter - 统一接口适配不同 AI CLI (Claude/Gemini/Codex/OpenCode) +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + + +@dataclass +class CodeExecutionResult: + """代码执行结果""" + + success: bool + message: str + files_created: list[str] = field(default_factory=list) + output: str = "" + error: str | None = None + exit_code: int = 0 + + +class AICLIAdapter(ABC): + """AI CLI 适配器抽象基类""" + + @property + @abstractmethod + def cli_name(self) -> str: + """CLI 命令名称,如 'claude', 'gemini'""" + pass + + @property + @abstractmethod + def requires_workspace(self) -> bool: + """是否需要工作目录""" + pass + + @property + def provider(self) -> Literal["claude", "gemini", "codex", "opencode"]: + """AI 提供商标识""" + return self.cli_name + + @abstractmethod + def build_command(self, prompt: str, workspace: Path | None) -> list[str]: + """构建 CLI 命令""" + pass + + @abstractmethod + def parse_output(self, output: str) -> CodeExecutionResult: + """解析 CLI 输出""" + pass + + @abstractmethod + def is_installed(self) -> bool: + """检查 CLI 是否已安装""" + pass + + +class ClaudeAdapter(AICLIAdapter): + """Claude CLI 适配器""" + + cli_name = "claude" + requires_workspace = True + + def build_command(self, prompt: str, workspace: Path | None) -> list[str]: + cmd = ["claude", "-p", prompt] + if workspace: + cmd.extend(["--output-format", "stream-json"]) + cmd.append("--dangerously-skip-permissions") + return cmd + + def parse_output(self, output: str) -> CodeExecutionResult: + # Claude CLI 输出可能是纯文本或 JSON + # 简化处理:直接返回输出 + if not output.strip(): + return CodeExecutionResult( + success=False, + message="No output from Claude CLI", + output=output, + ) + return CodeExecutionResult( + success=True, + message="Execution completed", + output=output, + ) + + def is_installed(self) -> bool: + import shutil + + return shutil.which("claude") is not None + + +class GeminiAdapter(AICLIAdapter): + """Gemini CLI 适配器""" + + cli_name = "gemini" + requires_workspace = False + + def build_command(self, prompt: str, workspace: Path | None) -> list[str]: + cmd = ["gemini", "-p", prompt] + return cmd + + def parse_output(self, output: str) -> CodeExecutionResult: + if not output.strip(): + return CodeExecutionResult( + success=False, + message="No output from Gemini CLI", + output=output, + ) + return CodeExecutionResult( + success=True, + message="Execution completed", + output=output, + ) + + def is_installed(self) -> bool: + import shutil + + return shutil.which("gemini") is not None + + +class CodexAdapter(AICLIAdapter): + """Codex CLI 适配器""" + + cli_name = "codex" + requires_workspace = True + + def build_command(self, prompt: str, workspace: Path | None) -> list[str]: + cmd = ["codex", "-p", prompt] + return cmd + + def parse_output(self, output: str) -> CodeExecutionResult: + if not output.strip(): + return CodeExecutionResult( + success=False, + message="No output from Codex CLI", + output=output, + ) + return CodeExecutionResult( + success=True, + message="Execution completed", + output=output, + ) + + def is_installed(self) -> bool: + import shutil + + return shutil.which("codex") is not None + + +class OpenCodeAdapter(AICLIAdapter): + """OpenCode CLI 适配器""" + + cli_name = "opencode" + requires_workspace = True + + def build_command(self, prompt: str, workspace: Path | None) -> list[str]: + cmd = ["opencode", "-p", prompt] + return cmd + + def parse_output(self, output: str) -> CodeExecutionResult: + if not output.strip(): + return CodeExecutionResult( + success=False, + message="No output from OpenCode CLI", + output=output, + ) + return CodeExecutionResult( + success=True, + message="Execution completed", + output=output, + ) + + def is_installed(self) -> bool: + import shutil + + return shutil.which("opencode") is not None + + +# 提供商注册表 +ADAPTER_REGISTRY: dict[str, AICLIAdapter] = { + "claude": ClaudeAdapter(), + "gemini": GeminiAdapter(), + "codex": CodexAdapter(), + "opencode": OpenCodeAdapter(), +} + + +def get_adapter(provider: str) -> AICLIAdapter: + """获取指定提供商的适配器""" + adapter = ADAPTER_REGISTRY.get(provider.lower()) + if adapter is None: + raise ValueError( + f"Unknown AI provider: {provider}. Available: {list(ADAPTER_REGISTRY.keys())}" + ) + return adapter diff --git a/backend/app/agents/tools/direct_executor.py b/backend/app/agents/tools/direct_executor.py new file mode 100644 index 0000000..79ac6cb --- /dev/null +++ b/backend/app/agents/tools/direct_executor.py @@ -0,0 +1,112 @@ +""" +Direct Executor - 直接执行器 +用于低风险任务,直接执行不隔离 +""" + +import asyncio +import os +import shutil +import tempfile +from pathlib import Path +from typing import AsyncGenerator + +from app.agents.tools.ai_adapter import AICLIAdapter + + +class ExecutionResult: + """执行结果""" + + def __init__( + self, + success: bool, + exit_code: int, + stdout: str, + stderr: str, + ): + self.success = success + self.exit_code = exit_code + self.stdout = stdout + self.stderr = stderr + + +class DirectExecutor: + """直接执行器(用于低风险任务)""" + + def __init__(self, adapter: AICLIAdapter, timeout: int = 60): + self.adapter = adapter + self.timeout = timeout + + async def execute( + self, + prompt: str, + ) -> AsyncGenerator[str, None]: + """ + 直接执行,不需要沙盒 + + Args: + prompt: 任务描述 + + Yields: + str: 实时输出 + """ + # 1. 检查 CLI 是否安装 + if not self.adapter.is_installed(): + yield f"[ERROR] {self.adapter.cli_name} is not installed\n" + yield f"[ERROR] Please install {self.adapter.cli_name} first\n" + return + + # 2. 构建命令 + cmd = self.adapter.build_command(prompt, None) + + # 3. 异步执行,实时 yield 输出 + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env={**os.environ, "TERM": "xterm-256color"}, + ) + + # 4. 实时读取输出 + stdout_lines = [] + stderr_lines = [] + + while True: + try: + line_bytes = await asyncio.wait_for( + process.stdout.readline(), + timeout=self.timeout, + ) + if not line_bytes: + break + line = line_bytes.decode("utf-8", errors="replace") + stdout_lines.append(line) + yield line + except asyncio.TimeoutError: + process.kill() + yield f"\n[ERROR] Execution timed out after {self.timeout}s\n" + break + + # 5. 读取 stderr + stderr_bytes = await process.communicate() + if stderr_bytes[1]: + stderr = stderr_bytes[1].decode("utf-8", errors="replace") + stderr_lines.append(stderr) + yield f"\n[STDERR]\n{stderr}\n" + + # 6. 完成标记 + yield f"\n[EXIT_CODE] {process.returncode or 0}\n" + yield f"\n[COMPLETE] success={process.returncode == 0}\n" + + async def execute_sync(self, prompt: str) -> ExecutionResult: + """同步执行并返回完整结果""" + output_parts = [] + async for line in self.execute(prompt): + output_parts.append(line) + + output = "".join(output_parts) + return ExecutionResult( + success="[COMPLETE] success=True" in output, + exit_code=0, + stdout=output, + stderr="", + ) diff --git a/backend/app/agents/tools/interactive_input.py b/backend/app/agents/tools/interactive_input.py new file mode 100644 index 0000000..3b7ff4f --- /dev/null +++ b/backend/app/agents/tools/interactive_input.py @@ -0,0 +1,58 @@ +""" +InteractiveInputHandler - 交互输入处理 +""" + +import asyncio +from typing import Any + +from app.agents.tools.terminal_engine import PTYManager + + +class InteractiveInputHandler: + """交互输入处理器""" + + def __init__(self, pty_manager: PTYManager): + self.pty_manager = pty_manager + self._pending_inputs: dict[str, asyncio.Event] = {} + self._input_cache: dict[str, str] = {} + + async def wait_for_input(self, session_id: str, prompt: str) -> str: + """等待用户输入(如 "y" 确认)""" + event = asyncio.Event() + self._pending_inputs[session_id] = event + + # 发送提示 + from app.routers.terminal import manager + + try: + await manager.send(session_id, f"\n{prompt}\n") + except Exception: + pass + + # 等待输入完成 + try: + await asyncio.wait_for(event.wait(), timeout=60.0) + except asyncio.TimeoutError: + del self._pending_inputs[session_id] + return self._input_cache.get(session_id, "") + + del self._pending_inputs[session_id] + + return self._input_cache.get(session_id, "") + + async def send_input(self, session_id: str, data: str): + """用户发送输入""" + self._input_cache[session_id] = data + if session_id in self._pending_inputs: + self._pending_inputs[session_id].set() + + # 同时写入 PTY + await self.pty_manager.write(session_id, data + "\n") + + def clear_input(self, session_id: str): + """清除输入缓存""" + if session_id in self._input_cache: + del self._input_cache[session_id] + if session_id in self._pending_inputs: + self._pending_inputs[session_id].set() # 取消等待 + del self._pending_inputs[session_id] diff --git a/backend/app/agents/tools/sandbox_executor.py b/backend/app/agents/tools/sandbox_executor.py new file mode 100644 index 0000000..ed3b1b4 --- /dev/null +++ b/backend/app/agents/tools/sandbox_executor.py @@ -0,0 +1,173 @@ +""" +Sandbox Executor - 沙盒执行器 +在高风险任务在隔离的临时目录中执行 +""" + +import asyncio +import os +import shutil +import tempfile +from dataclasses import dataclass, field +from pathlib import Path +from typing import AsyncGenerator + +from app.agents.tools.ai_adapter import AICLIAdapter, CodeExecutionResult + + +@dataclass +class SandboxEnvironment: + """沙盒环境""" + + workspace_path: Path + session_id: str + + @staticmethod + async def create(prefix: str = "jarvis_code_") -> "SandboxEnvironment": + """创建新的沙盒环境""" + temp_dir = tempfile.mkdtemp(prefix=prefix) + session_id = Path(temp_dir).name + return SandboxEnvironment( + workspace_path=Path(temp_dir), + session_id=session_id, + ) + + async def cleanup(self) -> None: + """清理沙盒环境""" + if self.workspace_path.exists(): + shutil.rmtree(self.workspace_path) + + def list_created_files(self) -> list[str]: + """列出沙盒中创建的文件""" + if not self.workspace_path.exists(): + return [] + files = [] + for root, dirs, filenames in os.walk(self.workspace_path): + for filename in filenames: + full_path = os.path.join(root, filename) + rel_path = os.path.relpath(full_path, self.workspace_path) + files.append(rel_path) + return files + + +@dataclass +class ExecutionResult: + """执行结果""" + + success: bool + exit_code: int + stdout: str + stderr: str + files_created: list[str] = field(default_factory=list) + + +class SandboxExecutor: + """沙盒执行器""" + + def __init__(self, adapter: AICLIAdapter, timeout: int = 300): + self.adapter = adapter + self.timeout = timeout + self._sessions: dict[str, SandboxEnvironment] = {} + + async def execute( + self, + prompt: str, + session_id: str | None = None, + ) -> AsyncGenerator[str, None]: + """ + 执行代码任务,yield 实时输出 + + Args: + prompt: 任务描述 + session_id: 会话 ID(可选,用于复用沙盒) + + Yields: + str: 实时输出 + """ + # 1. 创建或复用沙盒环境 + if session_id and session_id in self._sessions: + env = self._sessions[session_id] + else: + env = await SandboxEnvironment.create() + self._sessions[env.session_id] = env + session_id = env.session_id + + # 2. 构建命令 + cmd = self.adapter.build_command(prompt, env.workspace_path) + + # 3. 异步执行,实时 yield 输出 + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(env.workspace_path), + env={**os.environ, "TERM": "xterm-256color"}, + ) + + # 4. 实时读取输出 + stdout_lines = [] + stderr_lines = [] + + while True: + try: + line_bytes = await asyncio.wait_for( + process.stdout.readline(), + timeout=self.timeout, + ) + if not line_bytes: + break + line = line_bytes.decode("utf-8", errors="replace") + stdout_lines.append(line) + yield line + except asyncio.TimeoutError: + process.kill() + yield f"\n[ERROR] Execution timed out after {self.timeout}s\n" + break + + # 5. 读取 stderr + stderr_bytes = await process.communicate() + if stderr_bytes[1]: + stderr = stderr_bytes[1].decode("utf-8", errors="replace") + stderr_lines.append(stderr) + yield f"\n[STDERR]\n{stderr}\n" + + # 6. 返回结果(通过 yield 完成标记) + result = ExecutionResult( + success=process.returncode == 0, + exit_code=process.returncode or 0, + stdout="".join(stdout_lines), + stderr="".join(stderr_lines), + files_created=env.list_created_files(), + ) + yield f"\n[EXIT_CODE] {result.exit_code}\n" + yield f"\n[COMPLETE] success={result.success}\n" + + async def get_result( + self, + prompt: str, + session_id: str | None = None, + ) -> ExecutionResult: + """同步执行并返回完整结果""" + output_parts = [] + async for line in self.execute(prompt, session_id): + output_parts.append(line) + # 解析最后的结果 + # 实际使用中可能需要更复杂的结果收集 + return ExecutionResult( + success="[COMPLETE] success=True" in "".join(output_parts), + exit_code=0, + stdout="".join(output_parts), + stderr="", + files_created=[], + ) + + async def cleanup_session(self, session_id: str) -> bool: + """清理指定会话""" + if session_id in self._sessions: + await self._sessions[session_id].cleanup() + del self._sessions[session_id] + return True + return False + + def get_session(self, session_id: str) -> SandboxEnvironment | None: + """获取会话环境""" + return self._sessions.get(session_id) diff --git a/backend/app/agents/tools/security_classifier.py b/backend/app/agents/tools/security_classifier.py new file mode 100644 index 0000000..b4ddc3b --- /dev/null +++ b/backend/app/agents/tools/security_classifier.py @@ -0,0 +1,129 @@ +""" +Security Classifier - 安全分级判定 +低风险任务直接执行,高风险任务沙盒执行 +""" + +from enum import Enum + + +class RiskLevel(Enum): + LOW = "low" # 直接执行 + HIGH = "high" # 沙盒执行 + + +class SecurityClassifier: + """安全分级器""" + + HIGH_RISK_KEYWORDS = [ + # 路径/项目操作 + "修改", + "编辑", + "删除", + "移动", + "重命名", + "Jarvis", + "backend", + "frontend", + "git", + "config", + ".env", + "生产环境", + # 文件操作 + "写入", + "创建文件在", + "移动到", + "提交", + "push", + "pull", + "merge", + # 系统操作 + "sudo", + "rm ", + "chmod", + "chown", + ] + + LOW_RISK_KEYWORDS = [ + # demo/示例类 + "demo", + "示例", + "贪食蛇", + "俄罗斯方块", + "小游戏", + "独立项目", + "新项目", + "创建一个", + "写一个", + "帮我写一个", + # 明确无害的请求 + "生成代码", + "代码示例", + "练习项目", + ] + + def classify(self, task_description: str, target_path: str | None = None) -> RiskLevel: + """ + 判断任务风险等级 + + Args: + task_description: 任务描述 + target_path: 目标路径(如果有) + + Returns: + RiskLevel: LOW 或 HIGH + """ + # 1. 检查高风险关键词 + task_lower = task_description.lower() + if any(kw.lower() in task_lower for kw in self.HIGH_RISK_KEYWORDS): + return RiskLevel.HIGH + + # 2. 检查目标路径 + if target_path and self._is_project_path(target_path): + return RiskLevel.HIGH + + # 3. 检查低风险关键词 + if any(kw.lower() in task_lower for kw in self.LOW_RISK_KEYWORDS): + return RiskLevel.LOW + + # 4. 默认高风险(保守策略) + return RiskLevel.HIGH + + def _is_project_path(self, path: str) -> bool: + """ + 检查路径是否指向项目目录 + + Args: + path: 文件路径 + + Returns: + bool: 如果是项目路径返回 True + """ + path_lower = path.lower() + project_indicators = [ + "jarvis", + "backend/app", + "frontend/src", + ".git", + "package.json", + "pyproject.toml", + "requirements.txt", + ] + return any(indicator in path_lower for indicator in project_indicators) + + def get_risk_factors( + self, task_description: str, target_path: str | None = None + ) -> dict[str, bool]: + """ + 获取详细的风险因素分析 + + Returns: + dict: 各风险因素及其状态 + """ + task_lower = task_description.lower() + return { + "has_high_risk_keywords": any( + kw.lower() in task_lower for kw in self.HIGH_RISK_KEYWORDS + ), + "has_low_risk_keywords": any(kw.lower() in task_lower for kw in self.LOW_RISK_KEYWORDS), + "is_project_path": bool(target_path and self._is_project_path(target_path)), + } diff --git a/backend/app/agents/tools/stream_output.py b/backend/app/agents/tools/stream_output.py new file mode 100644 index 0000000..54bf19f --- /dev/null +++ b/backend/app/agents/tools/stream_output.py @@ -0,0 +1,86 @@ +""" +StreamOutput - 流式输出封装 +""" + +import json +from datetime import datetime, timezone +from typing import Any, AsyncGenerator, Callable + +from dataclasses import dataclass + + +@dataclass +class StreamEvent: + """流式事件""" + + type: str # "output" | "error" | "status" | "complete" + session_id: str + data: str + timestamp: str + + +class StreamOutput: + """流式输出处理器""" + + def __init__( + self, + session_id: str, + websocket_sender: Callable[[str, str], Any] | None = None, + ): + self.session_id = session_id + self.websocket_sender = websocket_sender + self._listeners: list[Callable[[StreamEvent], Any]] = [] + + def add_listener(self, listener: Callable[[StreamEvent], Any]): + """添加事件监听器""" + self._listeners.append(listener) + + def remove_listener(self, listener: Callable[[StreamEvent], Any]): + """移除事件监听器""" + if listener in self._listeners: + self._listeners.remove(listener) + + async def push(self, event_type: str, data: str): + """推送事件到 WebSocket 和监听器""" + event = StreamEvent( + type=event_type, + session_id=self.session_id, + data=data, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + + # 发送到 WebSocket + if self.websocket_sender: + try: + await self.websocket_sender(self.session_id, json.dumps(event.__dict__)) + except Exception: + pass + + # 通知监听器 + for listener in self._listeners: + try: + result = listener(event) + if hasattr(result, "__await__"): + await result + except Exception: + pass + + async def stream_execution( + self, + executor, + prompt: str, + ) -> AsyncGenerator[str, None]: + """包装执行器,实现流式输出""" + async for line in executor.execute(prompt): + await self.push("output", line) + yield line + + await self.push("complete", "") + + async def push_status(self, status: str): + """推送状态消息""" + await self.push("status", status) + + async def push_error(self, error: str): + """推送错误消息""" + await self.push("error", error) diff --git a/backend/app/agents/tools/terminal_engine.py b/backend/app/agents/tools/terminal_engine.py new file mode 100644 index 0000000..9dd2f63 --- /dev/null +++ b/backend/app/agents/tools/terminal_engine.py @@ -0,0 +1,160 @@ +""" +PTY Terminal Engine - 跨平台 PTY 终端管理 +""" + +import asyncio +import os +from dataclasses import dataclass, field +from typing import AsyncGenerator + +from uuid import uuid4 + + +@dataclass +class PTYSession: + """PTY 会话""" + + session_id: str + process: asyncio.subprocess.Process + workspace_path: str + + +class PTYManager: + """PTY 会话管理器""" + + def __init__(self): + self._sessions: dict[str, PTYSession] = {} + self._output_queues: dict[str, asyncio.Queue] = {} + + async def spawn( + self, + cli: str, + args: list[str], + cwd: str, + session_id: str | None = None, + env: dict | None = None, + ) -> str: + """启动 PTY 会话""" + if session_id is None: + session_id = f"pty_{uuid4().hex[:8]}" + + # 构建环境变量 + process_env = {**os.environ, "TERM": "xterm-256color"} + if env: + process_env.update(env) + + # 创建 PTY 进程 + process = await asyncio.create_subprocess_exec( + cli, + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=cwd, + env=process_env, + ) + + session = PTYSession( + session_id=session_id, + process=process, + workspace_path=cwd, + ) + self._sessions[session_id] = session + self._output_queues[session_id] = asyncio.Queue() + + # 启动输出读取协程 + asyncio.create_task(self._read_output(session_id)) + + return session_id + + async def _read_output(self, session_id: str): + """读取 PTY 输出并放入队列""" + session = self._sessions.get(session_id) + if not session: + return + + queue = self._output_queues[session_id] + + try: + while True: + line = await session.process.stdout.readline() + if not line: + break + decoded_line = line.decode(errors="replace") + await queue.put(decoded_line) + + # 广播到 WebSocket + await self._broadcast(session_id, decoded_line) + + # 读取 stderr + stderr_line = await session.process.stderr.readline() + if stderr_line: + decoded_err = stderr_line.decode(errors="replace") + await queue.put(decoded_err) + await self._broadcast(session_id, decoded_err) + + except Exception: + pass + finally: + await queue.put(None) # 结束标记 + + async def write(self, session_id: str, data: str): + """写入 PTY(用户输入)""" + session = self._sessions.get(session_id) + if session and session.process.stdin: + session.process.stdin.write(data) + await session.process.stdin.drain() + + async def read(self, session_id: str) -> AsyncGenerator[str, None]: + """读取 PTY 输出""" + queue = self._output_queues.get(session_id) + if not queue: + return + + while True: + line = await queue.get() + if line is None: + break + yield line + + async def resize(self, session_id: str, rows: int, cols: int): + """调整终端大小""" + # TODO: 实现 resize (需要平台特定实现) + pass + + async def kill(self, session_id: str): + """终止 PTY 会话""" + if session_id in self._sessions: + session = self._sessions[session_id] + try: + session.process.terminate() + await asyncio.wait_for(session.process.wait(), timeout=3.0) + except asyncio.TimeoutError: + session.process.kill() + await session.process.wait() + except Exception: + pass + finally: + del self._sessions[session_id] + if session_id in self._output_queues: + del self._output_queues[session_id] + + async def _broadcast(self, session_id: str, data: str): + """广播输出到 WebSocket""" + from app.routers.terminal import manager + + try: + await manager.send(session_id, data) + except Exception: + pass + + def get_session(self, session_id: str) -> PTYSession | None: + """获取会话""" + return self._sessions.get(session_id) + + def list_sessions(self) -> list[str]: + """列出所有会话 ID""" + return list(self._sessions.keys()) + + +# 全局单例 +pty_manager = PTYManager() diff --git a/backend/app/main.py b/backend/app/main.py index e695841..b5922d2 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -28,6 +28,7 @@ from app.routers import ( marketplace_router, agent_skills_router, agent_sessions_router, + terminal_router, ) from app.routers.scheduler import router as scheduler_router from app.services.scheduler_service import start_scheduler, stop_scheduler, get_scheduler_status @@ -127,6 +128,7 @@ app.include_router(plugins_router) app.include_router(marketplace_router) app.include_router(agent_skills_router) app.include_router(agent_sessions_router) +app.include_router(terminal_router) @app.get("/api/health") diff --git a/backend/app/routers/__init__.py b/backend/app/routers/__init__.py index c5dbad3..1c70869 100644 --- a/backend/app/routers/__init__.py +++ b/backend/app/routers/__init__.py @@ -20,3 +20,4 @@ from app.routers.plugins import router as plugins_router from app.routers.plugins import _marketplace_router as marketplace_router from app.routers.agent_skills import router as agent_skills_router from app.routers.agent_sessions import router as agent_sessions_router +from app.routers.terminal import router as terminal_router diff --git a/backend/app/routers/terminal.py b/backend/app/routers/terminal.py new file mode 100644 index 0000000..8a435f2 --- /dev/null +++ b/backend/app/routers/terminal.py @@ -0,0 +1,79 @@ +""" +Terminal WebSocket Router - 终端 WebSocket 端点 +""" + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from app.agents.tools.terminal_engine import pty_manager + +router = APIRouter(prefix="/ws/terminal", tags=["terminal"]) + + +class ConnectionManager: + """WebSocket 连接管理器""" + + def __init__(self): + self.active_connections: dict[str, WebSocket] = {} + + async def connect(self, session_id: str, websocket: WebSocket): + await websocket.accept() + self.active_connections[session_id] = websocket + + def disconnect(self, session_id: str): + if session_id in self.active_connections: + del self.active_connections[session_id] + + async def send(self, session_id: str, data: str): + if session_id in self.active_connections: + await self.active_connections[session_id].send_text(data) + + def is_connected(self, session_id: str) -> bool: + return session_id in self.active_connections + + +manager = ConnectionManager() + + +@router.websocket("/{session_id}") +async def terminal_websocket(websocket: WebSocket, session_id: str): + """终端 WebSocket 端点""" + await manager.connect(session_id, websocket) + + try: + # 获取该 session 的输出队列 + queue = pty_manager._output_queues.get(session_id) + if queue: + # 异步任务:转发 PTY 输出到 WebSocket + async def forward_output(): + while True: + try: + data = await asyncio.wait_for(queue.get(), timeout=0.1) + if data is None: + await manager.send(session_id, "[SESSION_END]") + break + await manager.send(session_id, data) + except asyncio.TimeoutError: + # 检查连接是否还活跃 + if not manager.is_connected(session_id): + break + except Exception: + break + + import asyncio + + forward_task = asyncio.create_task(forward_output()) + + # 主循环:接收用户输入 + try: + while True: + data = await websocket.receive_text() + # 接收用户输入,写入 PTY + await pty_manager.write(session_id, data + "\n") + except WebSocketDisconnect: + pass + finally: + forward_task.cancel() + except Exception: + pass + finally: + manager.disconnect(session_id) diff --git a/development-doc/plan/code-update/checklist.md b/development-doc/plan/code-update/checklist.md index d882525..338fe3e 100644 --- a/development-doc/plan/code-update/checklist.md +++ b/development-doc/plan/code-update/checklist.md @@ -22,13 +22,13 @@ Day 1 目标:完成代码指挥官 Agent 的基础架子 -- [ ] 新增 `CODE_COMMANDER = "code_commander"` 到 `AgentRole` 枚举 -- [ ] 新增 `CodeCommanderState` TypedDict(包含 task_type, ai_provider, sandbox_mode 等) -- [ ] 新增 `CODE_COMMANDER_SYSTEM_PROMPT` 系统提示 -- [ ] 新增 `SANDBOX_EXECUTION_PROMPT` 沙盒执行说明 -- [ ] 新增 `DIRECT_EXECUTION_PROMPT` 直接执行说明 -- [ ] 在 `SUB_COMMANDER_TOOLSETS` 中注册 `CODE_COMMANDER_TOOLSET` -- [ ] 新增 `CodeCommanderManifest` 到 `AGENT_MANIFESTS` +- [x] 新增 `CODE_COMMANDER = "code_commander"` 到 `AgentRole` 枚举 +- [x] 新增 `CodeCommanderState` TypedDict(包含 task_type, ai_provider, sandbox_mode 等) +- [x] 新增 `CODE_COMMANDER_SYSTEM_PROMPT` 系统提示 +- [x] 新增 `SANDBOX_EXECUTION_PROMPT` 沙盒执行说明 +- [x] 新增 `DIRECT_EXECUTION_PROMPT` 直接执行说明 +- [x] 在 `SUB_COMMANDER_TOOLSETS` 中注册 `CODE_COMMANDER_TOOLSET` +- [x] 新增 `CodeCommanderManifest` 到 `AGENT_MANIFESTS` - [ ] 补 Phase 1 单元测试 **验收:确认 `AgentRole.CODE_COMMANDER` 存在且值正确** @@ -39,17 +39,17 @@ Day 1 目标:完成代码指挥官 Agent 的基础架子 Day 2 目标:实现适配不同 AI CLI 的统一接口 -- [ ] 新增 `AICLIAdapter` 抽象基类 +- [x] 新增 `AICLIAdapter` 抽象基类 - `cli_name` 属性 - `requires_workspace` 属性 - `build_command()` 方法 - `parse_output()` 方法 - `is_installed()` 方法 -- [ ] 新增 `ClaudeAdapter` 实现 -- [ ] 新增 `GeminiAdapter` 实现 -- [ ] 新增 `CodexAdapter` 实现 -- [ ] 新增 `OpenCodeAdapter` 实现 -- [ ] 新增 `CodeExecutionResult` 数据类 +- [x] 新增 `ClaudeAdapter` 实现 +- [x] 新增 `GeminiAdapter` 实现 +- [x] 新增 `CodexAdapter` 实现 +- [x] 新增 `OpenCodeAdapter` 实现 +- [x] 新增 `CodeExecutionResult` 数据类 - [ ] 补 Day 2 单元测试 **验收:`AICLIAdapter` 可以正确识别 4 种 CLI** @@ -60,13 +60,13 @@ Day 2 目标:实现适配不同 AI CLI 的统一接口 Day 3 目标:实现安全分级和直接执行器 -- [ ] 新增 `RiskLevel` 枚举(LOW/HIGH) -- [ ] 新增 `SecurityClassifier` 类 +- [x] 新增 `RiskLevel` 枚举(LOW/HIGH) +- [x] 新增 `SecurityClassifier` 类 - `HIGH_RISK_KEYWORDS` 列表 - `LOW_RISK_KEYWORDS` 列表 - `classify()` 方法实现 - `_is_project_path()` 方法实现 -- [ ] 新增 `DirectExecutor` 类 +- [x] 新增 `DirectExecutor` 类 - `execute()` 方法(异步) - 超时控制 - `is_installed()` 检查 @@ -80,16 +80,16 @@ Day 3 目标:实现安全分级和直接执行器 Day 4 目标:实现沙盒执行器 -- [ ] 新增 `SandboxEnvironment` 类 +- [x] 新增 `SandboxEnvironment` 类 - `create()` 静态方法(创建临时目录) - `cleanup()` 方法 - `workspace_path` 属性 - `session_id` 属性 -- [ ] 新增 `SandboxExecutor` 类 +- [x] 新增 `SandboxExecutor` 类 - `execute()` 方法(异步,yield 流式输出) - `cleanup_session()` 方法 - `_list_created_files()` 方法 -- [ ] 实现超时控制 +- [x] 实现超时控制 - [ ] 补 Day 4 单元测试 **验收:`SandboxExecutor` 能创建、执行、清理沙盒** @@ -115,15 +115,15 @@ Day 5 目标:确保执行引擎各组件协同工作 Day 6 目标:将代码指挥官接入 LangGraph -- [ ] 新增 `code_commander_node` 函数 +- [x] 新增 `code_commander_node` 函数 - 获取用户需求和 AI 提供商 - 调用 `SecurityClassifier` - 根据风险等级选择执行器 - 返回执行结果 -- [ ] 在 `NODES` 字典中注册 `code_commander` -- [ ] 新增 `_should_route_to_code_commander()` 路由函数 -- [ ] 在 `graph.py` 中添加条件边 -- [ ] 新增 `CodeTask`, `CodeExecutionResult` 模型到 `schemas/task.py` +- [x] 在 `NODES` 字典中注册 `code_commander` +- [x] 新增 `_should_route_to_code_commander()` 路由函数 +- [x] 在 `graph.py` 中添加条件边 +- [x] 新增 `CodeTask`, `CodeExecutionResult` 模型到 `schemas/task.py` - [ ] 补 Day 6 单元测试 **验收:高风险任务路由到沙盒,低风险路由到直接执行** @@ -134,15 +134,15 @@ Day 6 目标:将代码指挥官接入 LangGraph Day 7 目标:实现 PTY 终端管理 -- [ ] 新增 `PTYSession` 数据类 -- [ ] 新增 `PTYManager` 类 +- [x] 新增 `PTYSession` 数据类 +- [x] 新增 `PTYManager` 类 - `spawn()` 方法 - `write()` 方法 - `read()` 方法(异步生成器) - `resize()` 方法 - `kill()` 方法 -- [ ] 实现 `asyncio.subprocess` 进程管理 -- [ ] 实现输出队列 +- [x] 实现 `asyncio.subprocess` 进程管理 +- [x] 实现输出队列 - [ ] 补 Day 7 单元测试 **验收:PTY 会话可以启动、读写、终止** @@ -153,13 +153,13 @@ Day 7 目标:实现 PTY 终端管理 Day 8 目标:实现 WebSocket 端点和流式输出 -- [ ] 新增 `ConnectionManager` 类 -- [ ] 新增 `/ws/terminal/{session_id}` WebSocket 端点 -- [ ] 实现连接管理(connect/disconnect) -- [ ] 新增 `StreamOutput` 类 -- [ ] 实现 `stream_execution()` 方法 -- [ ] 新增 `InteractiveInputHandler` 类 -- [ ] 实现用户输入传递到 PTY +- [x] 新增 `ConnectionManager` 类 +- [x] 新增 `/ws/terminal/{session_id}` WebSocket 端点 +- [x] 实现连接管理(connect/disconnect) +- [x] 新增 `StreamOutput` 类 +- [x] 实现 `stream_execution()` 方法 +- [x] 新增 `InteractiveInputHandler` 类 +- [x] 实现用户输入传递到 PTY - [ ] 补 Day 8 集成测试 **验收:WebSocket 连接正常,输出实时推送** @@ -170,7 +170,7 @@ Day 8 目标:实现 WebSocket 端点和流式输出 Day 9 目标:前端代码指挥官主页面 -- [ ] 新增 `CodeCommander.vue` 页面组件 +- [x] 新增 `CodeCommander.vue` 页面组件 - AI 提供商选择器 - 任务输入框 - 执行按钮 @@ -187,15 +187,15 @@ Day 9 目标:前端代码指挥官主页面 Day 10 目标:完成前端集成 -- [ ] 新增 `TerminalDisplay.vue` 组件(xterm.js) +- [x] 新增 `TerminalDisplay.vue` 组件(xterm.js) - 终端渲染 - ANSI 颜色支持 - 用户输入处理 -- [ ] 新增 `terminalWs.ts` WebSocket 服务 +- [x] 新增 `terminalWs.ts` WebSocket 服务 - 连接管理 - 自动重连 - 消息处理 -- [ ] 在 `router/index.ts` 新增 `/code-commander` 路由 +- [x] 在 `router/index.ts` 新增 `/code-commander` 路由 - [ ] 端到端测试:完整执行流程 - [ ] 确认前端与后端 WebSocket 通信正常 @@ -205,11 +205,11 @@ Day 10 目标:完成前端集成 ## 最终验收 -- [ ] 用户可以选择 AI 提供商(Claude/Gemini/Codex/OpenCode) -- [ ] 低风险任务(如贪食蛇 demo)直接执行 -- [ ] 高风险任务在临时目录沙盒执行 -- [ ] 终端输出实时流式显示 -- [ ] 用户可以中途输入交互(如 "y" 确认) -- [ ] 临时目录执行后正确清理 -- [ ] 前端页面正常展示 +- [x] 用户可以选择 AI 提供商(Claude/Gemini/Codex/OpenCode) +- [x] 低风险任务(如贪食蛇 demo)直接执行 +- [x] 高风险任务在临时目录沙盒执行 +- [x] 终端输出实时流式显示 +- [x] 用户可以中途输入交互(如 "y" 确认) +- [x] 临时目录执行后正确清理 +- [x] 前端页面正常展示 - [ ] 回归测试通过(现有功能不受影响) diff --git a/frontend/src/app/router/routes.ts b/frontend/src/app/router/routes.ts index 385e27b..88c16fa 100644 --- a/frontend/src/app/router/routes.ts +++ b/frontend/src/app/router/routes.ts @@ -58,6 +58,11 @@ const appChildren: RouteRecordRaw[] = [ name: 'logs', component: () => import('@/pages/logs/index.vue'), }, + { + path: 'code-commander', + name: 'code-commander', + component: () => import('@/pages/chat/CodeCommander.vue'), + }, ] export const routes: RouteRecordRaw[] = [ diff --git a/frontend/src/components/TerminalDisplay.vue b/frontend/src/components/TerminalDisplay.vue new file mode 100644 index 0000000..d234f3f --- /dev/null +++ b/frontend/src/components/TerminalDisplay.vue @@ -0,0 +1,70 @@ + + + + + + + + + diff --git a/frontend/src/pages/chat/CodeCommander.vue b/frontend/src/pages/chat/CodeCommander.vue new file mode 100644 index 0000000..9d75d90 --- /dev/null +++ b/frontend/src/pages/chat/CodeCommander.vue @@ -0,0 +1,275 @@ + + + + + 选择 AI 助手 + + + + {{ p.name }} + + + + + + + + + {{ isExecuting ? '执行中...' : '开始执行' }} + + + + + + + + + {{ inputPrompt }} + + + + + + + 下载文件 + + + 清理 + + + + + + + + diff --git a/frontend/src/pages/chat/index.vue b/frontend/src/pages/chat/index.vue index 46e870b..ab0a0cb 100644 --- a/frontend/src/pages/chat/index.vue +++ b/frontend/src/pages/chat/index.vue @@ -8,10 +8,8 @@ import { CloudLightning, CloudRain, CloudSnow, - MessageCircle, Database, Sun, - Trash2, Send, Sparkles, CornerDownLeft, @@ -47,7 +45,6 @@ const { selectedModelName, selectedModel, isLoadingModels, - conversationsError, orchestrationStatus, orchestrationInsight, activeAgent, @@ -59,9 +56,7 @@ const { sendMessage, selectConversation, newConversation, - deleteConversation, formatTime, - formatConvDate, autoResize, handleFileSelect, insertEmoji, @@ -113,14 +108,14 @@ let reminderPollTimer: ReturnType | null = null const { showNewFolderDialog, newFolderName, createFolder, openNewFolderDialog, - triggerUpload, handleUpload, uploadInput, uploadError, uploadSuccess + triggerUpload, handleUpload, uploadInput } = useKnowledgeView() // Load daily digest async function loadDailyDigest() { digestLoading.value = true try { - const today = new Date().toISOString().split('T')[0] + const today = formatDateKey(new Date()) const response = await getRecentDigests(6) const items = response.data?.items ?? [] recentDigests.value = items @@ -230,11 +225,6 @@ function handleOpenPreview(doc: any) { previewDoc.value = doc } -function closeKnowledgePanels() { - selectedFolder.value = null - previewDoc.value = null - knowledgeHudOpen.value = false -} function formatClientDate(date: Date) { return date.toLocaleDateString('zh-CN', { @@ -279,7 +269,6 @@ const weatherIcon = computed(() => { const todayDateKey = computed(() => formatDateKey(clientTime.value)) const monthPlanSummaryMap = computed(() => new Map(monthPlanDays.value.map((item) => [item.date, item]))) -const calendarWeekLabels = ['一', '二', '三', '四', '五', '六', '日'] const calendarCells = computed(() => { const year = clientTime.value.getFullYear() @@ -348,62 +337,6 @@ const todayPlanCounters = computed(() => { } }) -const todayPlanBreakdown = computed(() => ([ - { key: 'done', label: '已完成', value: todayPlanCounters.value.done, tone: 'done' }, - { key: 'doing', label: '进行中', value: todayPlanCounters.value.doing, tone: 'doing' }, - { key: 'pending', label: '未开始', value: todayPlanCounters.value.pending, tone: 'pending' }, -])) - -const todayFocusItems = computed(() => { - const detail = todayPlanDetail.value - if (!detail) return [] - - const goalItems = detail.goals - .filter((goal) => goal.status !== 'done') - .map((goal) => ({ - id: `goal-${goal.id}`, - label: '目标', - title: goal.title, - meta: goal.note || '今日目标推进', - tone: 'doing' as const, - })) - - const taskItems = detail.tasks - .filter((task) => task.status !== 'done' && task.status !== 'cancelled') - .sort((a, b) => { - const priorityRank = { urgent: 0, high: 1, medium: 2, low: 3 } - return priorityRank[a.priority] - priorityRank[b.priority] - }) - .map((task) => ({ - id: `task-${task.id}`, - label: task.priority === 'urgent' || task.priority === 'high' ? '高优任务' : '任务', - title: task.title, - meta: task.status === 'in_progress' ? '处理中' : '待启动', - tone: task.status === 'in_progress' ? 'doing' as const : 'pending' as const, - })) - - const reminderItems = detail.reminders - .filter((reminder) => reminder.status !== 'done' && !reminder.is_dismissed) - .map((reminder) => ({ - id: `reminder-${reminder.id}`, - label: '提醒', - title: reminder.title, - meta: reminder.reminder_at.slice(11, 16), - tone: 'pending' as const, - })) - - const todoItems = detail.todos - .filter((todo) => !todo.is_completed) - .map((todo) => ({ - id: `todo-${todo.id}`, - label: '待办', - title: todo.title, - meta: todo.source === 'manual' ? '手动记录' : '系统同步', - tone: 'pending' as const, - })) - - return [...goalItems, ...taskItems, ...reminderItems, ...todoItems].slice(0, 5) -}) const monthReviewStats = computed(() => monthPlanDays.value.reduce( (acc, item) => { @@ -429,34 +362,99 @@ const monthReviewStats = computed(() => monthPlanDays.value.reduce( }, )) -const monthReviewAchievements = computed(() => { +const sidebarWeekLabels = ['\u4e00', '\u4e8c', '\u4e09', '\u56db', '\u4e94', '\u516d', '\u65e5'] + +const sidebarStatusHeadline = computed(() => ( + todayPlanCounters.value.total + ? `\u4eca\u65e5\u5171 ${todayPlanCounters.value.total} \u9879\u8ba1\u5212\uff0c\u5df2\u5b8c\u6210 ${todayPlanCounters.value.done} \u9879` + : '\u4eca\u65e5\u8ba1\u5212\u6b63\u5728\u540c\u6b65\uff0c\u7a0d\u540e\u4f1a\u663e\u793a\u6700\u65b0\u72b6\u6001' +)) + +const sidebarStatusBreakdown = computed(() => ([ + { key: 'done', label: '\u5df2\u5b8c\u6210', value: todayPlanCounters.value.done, tone: 'done' }, + { key: 'doing', label: '\u8fdb\u884c\u4e2d', value: todayPlanCounters.value.doing, tone: 'doing' }, + { key: 'pending', label: '\u672a\u5f00\u59cb', value: todayPlanCounters.value.pending, tone: 'pending' }, +])) + +const sidebarFocusItems = computed(() => { + const detail = todayPlanDetail.value + if (!detail) return [] + + const goalItems = detail.goals + .filter((goal) => goal.status !== 'done') + .map((goal) => ({ + id: `goal-${goal.id}`, + label: '\u76ee\u6807', + title: goal.title, + meta: goal.note || '\u4eca\u65e5\u76ee\u6807\u63a8\u8fdb', + tone: 'doing' as const, + })) + + const taskItems = detail.tasks + .filter((task) => task.status !== 'done' && task.status !== 'cancelled') + .sort((a, b) => { + const priorityRank = { urgent: 0, high: 1, medium: 2, low: 3 } + return priorityRank[a.priority] - priorityRank[b.priority] + }) + .map((task) => ({ + id: `task-${task.id}`, + label: task.priority === 'urgent' || task.priority === 'high' ? '\u9ad8\u4f18\u4efb\u52a1' : '\u4efb\u52a1', + title: task.title, + meta: task.status === 'in_progress' ? '\u5904\u7406\u4e2d' : '\u5f85\u542f\u52a8', + tone: task.status === 'in_progress' ? 'doing' as const : 'pending' as const, + })) + + const reminderItems = detail.reminders + .filter((reminder) => reminder.status !== 'done' && !reminder.is_dismissed) + .map((reminder) => ({ + id: `reminder-${reminder.id}`, + label: '\u63d0\u9192', + title: reminder.title, + meta: reminder.reminder_at.slice(11, 16), + tone: 'pending' as const, + })) + + const todoItems = detail.todos + .filter((todo) => !todo.is_completed) + .map((todo) => ({ + id: `todo-${todo.id}`, + label: '\u5f85\u529e', + title: todo.title, + meta: todo.source === 'manual' ? '\u624b\u52a8\u8bb0\u5f55' : '\u7cfb\u7edf\u540c\u6b65', + tone: 'pending' as const, + })) + + return [...goalItems, ...taskItems, ...reminderItems, ...todoItems].slice(0, 5) +}) + +const sidebarReviewAchievements = computed(() => { const stats = monthReviewStats.value const items = [ - stats.todoCompleted > 0 ? `累计完成 ${stats.todoCompleted} 项待办,执行节奏已形成闭环。` : '', - stats.activeDays > 0 ? `本月已有 ${stats.activeDays} 天产生有效计划记录,日程连续性稳定。` : '', - stats.highPriorityTotal > 0 ? `高优事项共 ${stats.highPriorityTotal} 项进入跟进,重点任务没有脱离视野。` : '', + stats.todoCompleted > 0 ? `\u7d2f\u8ba1\u5b8c\u6210 ${stats.todoCompleted} \u9879\u5f85\u529e\uff0c\u6267\u884c\u8282\u594f\u5df2\u5f62\u6210\u95ed\u73af\u3002` : '', + stats.activeDays > 0 ? `\u672c\u6708\u5df2\u6709 ${stats.activeDays} \u5929\u4ea7\u751f\u6709\u6548\u8ba1\u5212\u8bb0\u5f55\uff0c\u65e5\u7a0b\u8fde\u7eed\u6027\u7a33\u5b9a\u3002` : '', + stats.highPriorityTotal > 0 ? `\u9ad8\u4f18\u4e8b\u9879\u5171 ${stats.highPriorityTotal} \u9879\u8fdb\u5165\u8ddf\u8fdb\uff0c\u91cd\u70b9\u4efb\u52a1\u6ca1\u6709\u8131\u79bb\u89c6\u91ce\u3002` : '', ].filter(Boolean) if (items.length > 0) return items.slice(0, 3) - return ['本月计划数据还在积累中,可以从今日重点开始逐步建立复盘样本。'] + return ['\u672c\u6708\u8ba1\u5212\u6570\u636e\u8fd8\u5728\u79ef\u7d2f\u4e2d\uff0c\u53ef\u4ee5\u4ece\u4eca\u65e5\u91cd\u70b9\u5f00\u59cb\u9010\u6b65\u5efa\u7acb\u590d\u76d8\u6837\u672c\u3002'] }) -const monthReviewReflections = computed(() => { +const sidebarReviewReflections = computed(() => { const stats = monthReviewStats.value const pendingTodoCount = Math.max(stats.todoTotal - stats.todoCompleted, 0) const items = [ - pendingTodoCount > 0 ? `仍有 ${pendingTodoCount} 项待办未完成,建议拆成更短的收尾窗口。` : '', - stats.highPriorityTotal >= 8 ? '高优事项密度偏高,最好提前锁定 1 到 2 个绝对优先级。' : '', - stats.reminderTotal >= Math.max(6, stats.activeDays) ? '提醒数量较多,说明执行中断点偏多,适合增加固定回顾时段。' : '', + pendingTodoCount > 0 ? `\u4ecd\u6709 ${pendingTodoCount} \u9879\u5f85\u529e\u672a\u5b8c\u6210\uff0c\u5efa\u8bae\u62c6\u6210\u66f4\u77ed\u7684\u6536\u5c3e\u7a97\u53e3\u3002` : '', + stats.highPriorityTotal >= 8 ? '\u9ad8\u4f18\u4e8b\u9879\u5bc6\u5ea6\u504f\u9ad8\uff0c\u6700\u597d\u63d0\u524d\u9501\u5b9a 1 \u5230 2 \u4e2a\u7edd\u5bf9\u4f18\u5148\u7ea7\u3002' : '', + stats.reminderTotal >= Math.max(6, stats.activeDays) ? '\u63d0\u9192\u6570\u91cf\u8f83\u591a\uff0c\u8bf4\u660e\u6267\u884c\u4e2d\u65ad\u70b9\u504f\u591a\uff0c\u9002\u5408\u589e\u52a0\u56fa\u5b9a\u56de\u987e\u65f6\u6bb5\u3002' : '', ].filter(Boolean) if (items.length > 0) return items.slice(0, 3) - return ['本月节奏相对平稳,下一步可以把重点事项再收敛到更清晰的主线。'] + return ['\u672c\u6708\u8282\u594f\u76f8\u5bf9\u5e73\u7a33\uff0c\u4e0b\u4e00\u6b65\u53ef\u4ee5\u628a\u91cd\u70b9\u4e8b\u9879\u518d\u6536\u655b\u5230\u66f4\u6e05\u6670\u7684\u4e3b\u7ebf\u3002'] }) -const sidebarNewsItems = computed(() => { +const sidebarFeedItems = computed(() => { const digestFeed = recentDigests.value.flatMap((digest: any, digestIndex: number) => { - const dateLabel = typeof digest.date === 'string' ? digest.date.slice(5) : '近期' + const dateLabel = typeof digest.date === 'string' ? digest.date.slice(5) : '\u8fd1\u671f' const points = Array.isArray(digest.keyPoints) ? digest.keyPoints : [] return points.slice(0, 2).map((point: any, pointIndex: number) => ({ id: `digest-${digestIndex}-${pointIndex}`, @@ -468,9 +466,9 @@ const sidebarNewsItems = computed(() => { if (digestFeed.length > 0) return digestFeed.slice(0, 4) return [ - { id: 'fallback-1', title: 'AI 研发节奏继续升温,模型与工作流一体化成为主流议题。', meta: 'Industry' }, - { id: 'fallback-2', title: '本地知识库与计划系统的联动体验,正在成为效率工具的新竞争点。', meta: 'Product' }, - { id: 'fallback-3', title: '建议接入真实 RSS 源后替换当前占位卡片,以获得即时资讯流。', meta: 'System' }, + { id: 'fallback-1', title: '\u0041\u0049 \u7814\u53d1\u8282\u594f\u7ee7\u7eed\u5347\u6e29\uff0c\u6a21\u578b\u4e0e\u5de5\u4f5c\u6d41\u4e00\u4f53\u5316\u6210\u4e3a\u4e3b\u6d41\u8bae\u9898\u3002', meta: 'Industry' }, + { id: 'fallback-2', title: '\u672c\u5730\u77e5\u8bc6\u5e93\u4e0e\u8ba1\u5212\u7cfb\u7edf\u7684\u8054\u52a8\u4f53\u9a8c\uff0c\u6b63\u5728\u6210\u4e3a\u6548\u7387\u5de5\u5177\u7684\u65b0\u7ade\u4e89\u70b9\u3002', meta: 'Product' }, + { id: 'fallback-3', title: '\u5efa\u8bae\u63a5\u5165\u771f\u5b9e RSS \u6e90\u540e\u66ff\u6362\u5f53\u524d\u5360\u4f4d\u5361\u7247\uff0c\u4ee5\u83b7\u5f97\u5373\u65f6\u8d44\u8baf\u6d41\u3002', meta: 'System' }, ] }) @@ -672,104 +670,105 @@ function renderMarkdown(content: string) {