# Phase 4:流式交互

日期:2026-04-04
状态:待实施
依赖:Phase 3 完成

---

## 1. 本阶段目的

实现 PTY 终端 + WebSocket 流式输出:

- PTY 终端管理
- WebSocket 端点
- 流式输出集成
- 交互输入

---

## 2. 详细任务

### 2.1 PTY Terminal Engine

**新文件**: `backend/app/agents/tools/terminal_engine.py`

```python
import asyncio
import os
from dataclasses import dataclass, field
from typing import AsyncGenerator


@dataclass
class PTYSession:
    """State kept for one spawned terminal process."""

    session_id: str
    process: asyncio.subprocess.Process
    workspace_path: str
    # Strong reference to the output-reader task. The event loop only keeps
    # weak references to tasks, so a task nobody references can be
    # garbage-collected before it finishes.
    reader_task: asyncio.Task | None = field(default=None, repr=False)


class PTYManager:
    """Spawn terminal processes and expose their I/O to async callers.

    NOTE: the child process is attached to pipes, not to a real
    pseudo-terminal, so programs that require a TTY run as if they were
    non-interactive and ``resize`` is currently a no-op.
    """

    def __init__(self):
        self._sessions: dict[str, PTYSession] = {}
        self._output_queues: dict[str, asyncio.Queue] = {}

    def has_session(self, session_id: str) -> bool:
        """Return True if ``session_id`` refers to a registered session."""
        return session_id in self._sessions

    async def spawn(
        self,
        cli: str,
        args: list[str],
        cwd: str,
        session_id: str | None = None,
    ) -> str:
        """Start a terminal session.

        Args:
            cli: Executable to run.
            args: Arguments passed to the executable.
            cwd: Working directory of the child process.
            session_id: Identifier to use; a random one is generated when
                omitted.

        Returns:
            The identifier of the new session.

        Raises:
            ValueError: If ``session_id`` is already in use. Overwriting the
                entry would orphan the earlier process.
        """
        if session_id is None:
            session_id = f"pty_{os.urandom(8).hex()}"
        elif session_id in self._sessions:
            raise ValueError(f"session already exists: {session_id}")

        process = await asyncio.create_subprocess_exec(
            cli,
            *args,
            # stdin must be piped, otherwise process.stdin is None and
            # write() can never deliver user input.
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Merge stderr into stdout: a piped stderr that nobody drains
            # blocks the child once the pipe buffer fills up.
            stderr=asyncio.subprocess.STDOUT,
            cwd=cwd,
            env={**os.environ, "TERM": "xterm-256color"},
        )

        session = PTYSession(
            session_id=session_id,
            process=process,
            workspace_path=cwd,
        )
        self._sessions[session_id] = session
        self._output_queues[session_id] = asyncio.Queue()

        # Start the output reader and keep a reference to the task.
        session.reader_task = asyncio.create_task(self._read_output(session_id))
        return session_id

    async def _read_output(self, session_id: str):
        """Read process output line by line and put it on the session queue.

        A ``None`` end marker is always queued when reading stops, including
        on error or cancellation, so consumers of ``read`` never wait forever.

        NOTE: output is delivered per line, so a prompt that does not end
        with a newline is only delivered once the line is completed.
        """
        session = self._sessions.get(session_id)
        queue = self._output_queues.get(session_id)
        if session is None or queue is None:
            return

        try:
            while True:
                raw = await session.process.stdout.readline()
                if not raw:
                    break
                # Decode once; invalid bytes must not kill the reader.
                text = raw.decode(errors="replace")
                await queue.put(text)
                # Also push to all subscribers.
                await self._broadcast(session_id, text)
        finally:
            # The queue is unbounded, so put_nowait cannot fail here.
            queue.put_nowait(None)  # end marker

    async def write(self, session_id: str, data: str):
        """Write user input to the process's stdin.

        Unknown sessions and sessions without a stdin pipe are ignored.
        Errors raised while flushing (for example a broken pipe after the
        process has exited) propagate to the caller.
        """
        session = self._sessions.get(session_id)
        if session is None or session.process.stdin is None:
            return
        # StreamWriter.write() only accepts bytes.
        session.process.stdin.write(data.encode())
        await session.process.stdin.drain()

    async def read(self, session_id: str) -> AsyncGenerator[str, None]:
        """Yield output lines of a session until its end marker is seen.

        Yields nothing when the session is unknown.
        """
        queue = self._output_queues.get(session_id)
        if queue is None:
            return
        while (line := await queue.get()) is not None:
            yield line

    async def resize(self, session_id: str, rows: int, cols: int):
        """Resize the terminal."""
        # TODO: implement resize
        pass

    async def kill(self, session_id: str):
        """Terminate a session and forget it; unknown sessions are ignored."""
        # Remove the entries first so a concurrent kill() becomes a no-op
        # instead of failing on a missing key.
        session = self._sessions.pop(session_id, None)
        self._output_queues.pop(session_id, None)
        if session is None:
            return

        if session.process.returncode is None:
            try:
                session.process.terminate()
            except ProcessLookupError:
                # The process exited between the check and terminate().
                pass
        await session.process.wait()

    async def _broadcast(self, session_id: str, data: str):
        """Broadcast output to WebSocket subscribers."""
        # The actual push is handled by the router layer.
        pass


# Shared instance imported by the router layer.
pty_manager = PTYManager()
```

### 2.2 WebSocket 端点

**新文件**: `backend/app/routers/terminal.py`

```python
import asyncio

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter(prefix="/ws/terminal", tags=["terminal"])


class ConnectionManager:
    """Track one WebSocket connection per terminal session."""

    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        """Accept the connection and register it under ``session_id``."""
        await websocket.accept()
        self.active_connections[session_id] = websocket

    def disconnect(self, session_id: str):
        """Forget the connection of ``session_id``, if there is one."""
        self.active_connections.pop(session_id, None)

    async def send(self, session_id: str, data: str):
        """Send text to the session's connection; no-op when not connected."""
        websocket = self.active_connections.get(session_id)
        if websocket is not None:
            await websocket.send_text(data)


manager = ConnectionManager()


@router.websocket("/{session_id}")
async def terminal_websocket(websocket: WebSocket, session_id: str):
    """Bridge a WebSocket to a terminal session in both directions.

    Process output is forwarded to the client, and every text message from
    the client is written to the process followed by a newline.
    """
    # Get the shared PTY manager instance.
    from app.agents.tools.terminal_engine import pty_manager

    await manager.connect(session_id, websocket)

    if not pty_manager.has_session(session_id):
        # Nothing to attach to: release the slot and close the socket
        # (1008 = policy violation) instead of leaving it registered.
        manager.disconnect(session_id)
        await websocket.close(code=1008)
        return

    async def forward_output():
        # Subscribe to the session output and relay it to the client.
        async for chunk in pty_manager.read(session_id):
            await manager.send(session_id, chunk)

    forwarder = asyncio.create_task(forward_output())
    try:
        while True:
            # Receive user input.
            data = await websocket.receive_text()
            await pty_manager.write(session_id, data + "\n")
    except WebSocketDisconnect:
        pass
    finally:
        forwarder.cancel()
        # Retrieve the task result so a failure in the forwarder is not
        # reported as a never-retrieved exception.
        await asyncio.gather(forwarder, return_exceptions=True)
        manager.disconnect(session_id)
```

### 2.3 流式输出集成

**新文件**: `backend/app/agents/tools/stream_output.py`

```python
import json
from typing import AsyncGenerator
from dataclasses import dataclass


@dataclass
class StreamEvent:
    """One event pushed to the client during an execution."""

    type: str  # "output" | "error" | "status" | "complete"
    session_id: str
    data: str
    timestamp: str
class StreamOutput:
    """Push execution events of one session to a WebSocket sender."""

    def __init__(self, session_id: str, websocket_sender):
        self.session_id = session_id
        # Awaitable callable invoked as websocket_sender(session_id, text).
        self.websocket_sender = websocket_sender

    async def push(self, event_type: str, data: str):
        """Push one event to the WebSocket as a JSON string."""
        # Imported here because the module header does not import datetime.
        from datetime import datetime, timezone

        event = StreamEvent(
            type=event_type,
            session_id=self.session_id,
            data=data,
            # Timezone-aware UTC, so clients in other time zones can
            # interpret the timestamp unambiguously.
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        await self.websocket_sender(self.session_id, json.dumps(event.__dict__))

    async def stream_execution(
        self, executor, prompt: str
    ) -> AsyncGenerator[str, None]:
        """Wrap an executor so that every output line is also pushed.

        Yields each line produced by ``executor.execute(prompt)``. A
        "complete" event is pushed after the last line. If the executor
        fails, an "error" event carrying the message is pushed and the
        exception is re-raised.
        """
        try:
            async for line in executor.execute(prompt):
                await self.push("output", line)
                yield line
        except Exception as exc:
            await self.push("error", str(exc))
            raise
        await self.push("complete", "")
```

### 2.4 交互输入

**新文件**: `backend/app/agents/tools/interactive_input.py`

```python
import asyncio

from app.agents.tools.terminal_engine import PTYManager


class InteractiveInputHandler:
    """Let agent code wait for a line of user input on a session."""

    def __init__(self, pty_manager: PTYManager):
        self.pty_manager = pty_manager
        self._pending_inputs: dict[str, asyncio.Event] = {}
        # Most recent input per session, consumed by wait_for_input().
        self._input_cache: dict[str, str] = {}

    async def wait_for_input(self, session_id: str, prompt: str) -> str:
        """Send ``prompt`` to the client and wait for input (e.g. "y").

        Returns the text passed to ``send_input`` for this session. The
        wait only completes when user input is routed through
        ``send_input``.
        """
        # Imported lazily, presumably to avoid a circular import between
        # the router and the agent tools.
        from app.routers.terminal import manager

        event = asyncio.Event()
        self._pending_inputs[session_id] = event
        # Drop input left over from before this prompt so it cannot be
        # mistaken for the answer.
        self._input_cache.pop(session_id, None)

        try:
            # Send the prompt.
            await manager.send(session_id, f"\n{prompt}\n")
            # Wait until the input arrives.
            await event.wait()
            return self._input_cache.pop(session_id, "")
        finally:
            # Clean up even when sending fails or the wait is cancelled,
            # but never remove an event registered by a newer wait.
            if self._pending_inputs.get(session_id) is event:
                del self._pending_inputs[session_id]

    async def send_input(self, session_id: str, data: str):
        """Record user input, wake a pending wait and write it to the PTY."""
        self._input_cache[session_id] = data
        pending = self._pending_inputs.get(session_id)
        if pending is not None:
            pending.set()
        # Also write the input to the PTY.
        await self.pty_manager.write(session_id, data + "\n")
```

---

## 3. 核心文件清单

| 文件 | 操作 | 说明 |
|------|------|------|
| `terminal_engine.py` | 新增 | PTY 终端管理 |
| `routers/terminal.py` | 新增 | WebSocket 端点 |
| `stream_output.py` | 新增 | 流式输出封装 |
| `interactive_input.py` | 新增 | 交互输入处理 |

---

## 4. 验收标准

- [ ] PTY 会话可以启动、读写、终止
- [ ] WebSocket 可以建立连接并收发消息
- [ ] 执行输出实时推送到前端
- [ ] 用户输入可以传递到 PTY

---

## 5. 依赖关系

```
Phase 3(Agent 集成)
    ↓
本阶段 → Phase 5(前端集成)
```

---

## 6. 备注

PTY 实现参考了 golutra 的 `src-tauri/src/runtime/pty.rs`:

- 使用 `portable-pty` 库
- Windows 路径兼容处理
- shim 机制用于信号处理