From 7f5b133fad9009144476a28fdaf08de499fce2c2 Mon Sep 17 00:00:00 2001 From: "WIN-JHFT4D3SIVT\\caoxiaozhu" Date: Sat, 11 Apr 2026 08:50:32 +0800 Subject: [PATCH] feat(backend): add office router and agent runtime services Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- backend/app/routers/office.py | 179 +++++++++++++ .../app/services/agent_runtime/__init__.py | 9 + backend/app/services/agent_runtime/base.py | 37 +++ .../services/agent_runtime/hermes_runtime.py | 172 +++++++++++++ .../agent_runtime/hermes_session_manager.py | 37 +++ .../services/agent_runtime/jarvis_runtime.py | 21 ++ backend/app/services/task_dispatch.py | 238 ++++++++++++++++++ 7 files changed, 693 insertions(+) create mode 100644 backend/app/routers/office.py create mode 100644 backend/app/services/agent_runtime/__init__.py create mode 100644 backend/app/services/agent_runtime/base.py create mode 100644 backend/app/services/agent_runtime/hermes_runtime.py create mode 100644 backend/app/services/agent_runtime/hermes_session_manager.py create mode 100644 backend/app/services/agent_runtime/jarvis_runtime.py create mode 100644 backend/app/services/task_dispatch.py diff --git a/backend/app/routers/office.py b/backend/app/routers/office.py new file mode 100644 index 0000000..3bea915 --- /dev/null +++ b/backend/app/routers/office.py @@ -0,0 +1,179 @@ +"""Office Status API - Star Office style visualization for Jarvis agents.""" + +from datetime import datetime, timedelta +from typing import Literal +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +router = APIRouter(prefix="/api/office", tags=["office"]) + +# ============================================================================ +# State Definitions (mapped to spaceship areas) +# ============================================================================ +# idle → Rest Bay (breakroom) +# writing/researching/executing → Command Console (writing) +# syncing → Server Room (syncing) +# error → Repair Bay (error) + +SHIP_AREAS = { + "breakroom": {"x": 200, "y": 300}, # Rest Bay - bottom left + "writing": {"x": 640, "y": 200}, # Command Console - center top + "server": {"x": 640, "y": 400}, # Server Room - center bottom + "error": {"x": 1000, "y": 300}, # Repair Bay - right side +} + +STATES = { + "idle": {"name": "待命", "area": "breakroom"}, + "writing": {"name": "执行中", "area": "writing"}, + "researching": {"name": "研究中", "area": "writing"}, + "executing": {"name": "执行中", "area": "writing"}, + "syncing": {"name": "同步中", "area": "server"}, + "error": {"name": "故障中", "area": "error"}, +} + + +# ============================================================================ +# Data Models +# ============================================================================ +class AgentState(BaseModel): + agent_id: str + name: str + state: Literal["idle", "writing", "researching", "executing", "syncing", "error"] + detail: str | None = None + area: str | None = None + is_main: bool = False + auth_status: str = "approved" # approved, pending, rejected, offline + + +class SetStateRequest(BaseModel): + state: str + detail: str | None = None + + +class OfficeStatus(BaseModel): + state: str + detail: str | None = None + agent_name: str + timestamp: str + + +class OfficeMemo(BaseModel): + success: bool + date: str + memo: str + + +# ============================================================================ +# In-Memory State (in production, this would come from Jarvis's agent state) +# ============================================================================ +_current_state: dict = { + "agent_id": "jarvis-main", + "name": "JARVIS", + "state": "idle", + "detail": "战舰启动中...", + "area": "breakroom", + "is_main": True, + "auth_status": "approved", +} + + +def normalize_state(state: str | None) -> str: + """Normalize various state names to our canonical states.""" + if not state: + return "idle" + state = state.lower().strip() + if state in ("working", "run", "running"): + return "writing" + if state in ("sync", "syncing"): + return "syncing" + if state in ("research", "researching"): + return "researching" + if state in ("execute", "executing"): + return "executing" + if state == "error": + return "error" + return "idle" + + +def get_state_info(state: str) -> dict: + """Get state info including area mapping.""" + return STATES.get(state, STATES["idle"]) + + +# ============================================================================ +# API Endpoints +# ============================================================================ +@router.get("/status", response_model=OfficeStatus) +async def get_status(): + """Get current agent status.""" + state_info = get_state_info(_current_state["state"]) + return OfficeStatus( + state=_current_state["state"], + detail=_current_state.get("detail"), + agent_name=_current_state["name"], + timestamp=datetime.now().isoformat(), + ) + + +@router.get("/yesterday-memo", response_model=OfficeMemo) +async def get_yesterday_memo(): + """Return a lightweight public memo for the Star Office viewer.""" + target_date = (datetime.now() - timedelta(days=1)).date().isoformat() + detail = (_current_state.get("detail") or "No detailed log was recorded.").strip() + memo = ( + "Yesterday summary\n" + f"- Last known state: {_current_state['state']}\n" + f"- Detail: {detail}\n" + "- Next step: open the command surface and continue from the current work thread." + ) + return OfficeMemo(success=True, date=target_date, memo=memo) + + +@router.post("/set_state") +async def set_state(req: SetStateRequest): + """Set the current agent state.""" + normalized = normalize_state(req.state) + state_info = get_state_info(normalized) + + _current_state["state"] = normalized + _current_state["detail"] = req.detail or "" + _current_state["area"] = state_info["area"] + + return { + "success": True, + "state": normalized, + "area": state_info["area"], + "detail": _current_state["detail"], + } + + +@router.get("/agents") +async def get_agents(): + """Get all agents in the office (for multi-agent support).""" + # For now, return just the main agent + # In full implementation, this would query Jarvis's agent registry + state_info = get_state_info(_current_state["state"]) + return [ + { + "agentId": _current_state["agent_id"], + "name": _current_state["name"], + "state": _current_state["state"], + "detail": _current_state.get("detail", ""), + "area": state_info["area"], + "isMain": _current_state.get("is_main", True), + "authStatus": _current_state.get("auth_status", "approved"), + "updated_at": datetime.now().isoformat(), + } + ] + + +@router.get("/areas") +async def get_areas(): + """Get all spaceship areas with coordinates.""" + return SHIP_AREAS + + +@router.get("/health") +async def health(): + """Health check.""" + return {"status": "ok", "service": "office"} diff --git a/backend/app/services/agent_runtime/__init__.py b/backend/app/services/agent_runtime/__init__.py new file mode 100644 index 0000000..45d1855 --- /dev/null +++ b/backend/app/services/agent_runtime/__init__.py @@ -0,0 +1,9 @@ +from app.services.agent_runtime.hermes_runtime import HermesRuntimeAdapter, hermes_runtime_adapter +from app.services.agent_runtime.jarvis_runtime import JarvisRuntimeAdapter, jarvis_runtime_adapter + +__all__ = [ + "HermesRuntimeAdapter", + "hermes_runtime_adapter", + "JarvisRuntimeAdapter", + "jarvis_runtime_adapter", +] diff --git a/backend/app/services/agent_runtime/base.py b/backend/app/services/agent_runtime/base.py new file mode 100644 index 0000000..b3ac94a --- /dev/null +++ b/backend/app/services/agent_runtime/base.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, AsyncGenerator, Protocol + +from app.models.conversation import Conversation, Message +from app.models.user import User + + +RuntimeName = str + + +@dataclass(slots=True) +class RuntimePreparedContext: + user: User + conversation: Conversation + user_message: Message + assistant_message: Message + raw_message: str + full_message: str + file_ids: list[str] + model_name: str | None + memory_context: str | None + + +class ChatRuntime(Protocol): + name: RuntimeName + + async def chat_stream( + self, + prepared: RuntimePreparedContext, + ) -> AsyncGenerator[dict[str, Any], None]: ... + + async def chat_once( + self, + prepared: RuntimePreparedContext, + ) -> tuple[str, str | None]: ... diff --git a/backend/app/services/agent_runtime/hermes_runtime.py b/backend/app/services/agent_runtime/hermes_runtime.py new file mode 100644 index 0000000..3c40342 --- /dev/null +++ b/backend/app/services/agent_runtime/hermes_runtime.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import asyncio +import importlib.util +import sys +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, AsyncGenerator + +from app.services.agent_runtime.base import ChatRuntime, RuntimePreparedContext +from app.services.agent_runtime.hermes_session_manager import hermes_session_manager + + +class HermesRuntimeAdapter(ChatRuntime): + name = "hermes" + + def __init__(self) -> None: + self._repo_path = Path(__file__).resolve().parents[4] / ".tmp" / "hermes-agent" + self._agent_class = None + + def probe(self) -> dict[str, Any]: + cli_path = self._repo_path / "cli.py" + run_agent_path = self._repo_path / "run_agent.py" + return { + "repo_path": str(self._repo_path), + "repo_exists": self._repo_path.exists(), + "cli_exists": cli_path.exists(), + "run_agent_exists": run_agent_path.exists(), + "supports_single_query": True, + "supports_resume": True, + "integration_mode": "python_ai_agent_bridge", + } + + def _load_agent_class(self): + if self._agent_class is not None: + return self._agent_class + + run_agent_path = self._repo_path / "run_agent.py" + if not run_agent_path.exists(): + raise RuntimeError(f"Hermes run_agent.py 未找到: {run_agent_path}") + + repo_path = str(self._repo_path) + if repo_path not in sys.path: + sys.path.insert(0, repo_path) + + spec = importlib.util.spec_from_file_location("jarvis_hermes_run_agent", run_agent_path) + if spec is None or spec.loader is None: + raise RuntimeError("无法加载 Hermes run_agent 模块") + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + self._agent_class = getattr(module, "AIAgent") + return self._agent_class + + def _build_agent(self, prepared: RuntimePreparedContext, session_id: str): + agent_class = self._load_agent_class() + kwargs: dict[str, Any] = { + "session_id": session_id, + "platform": "jarvis", + "user_id": prepared.user.id, + "quiet_mode": True, + "persist_session": True, + "skip_context_files": True, + "max_iterations": 30, + } + if prepared.model_name: + kwargs["model"] = prepared.model_name + return agent_class(**kwargs) + + def _build_system_message(self, prepared: RuntimePreparedContext) -> str: + parts = [ + "You are Hermes running inside the Jarvis chat runtime.", + "Return normal assistant text for the user. Do not mention internal bridge details unless asked.", + ] + if prepared.memory_context: + parts.append(prepared.memory_context) + return "\n\n".join(parts) + + async def chat_stream( + self, + prepared: RuntimePreparedContext, + ) -> AsyncGenerator[dict[str, Any], None]: + handle = hermes_session_manager.get_or_create( + conversation_id=prepared.conversation.id, + user_id=prepared.user.id, + ) + async with handle.lock: + yield { + "type": "progress", + "stage": "planning", + "label": "Hermes 正在准备会话", + "agent": "hermes", + "step": "加载 Hermes runtime", + "steps": [ + "恢复会话上下文", + "调用 Hermes AIAgent", + "回传流式回复", + ], + } + + queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue() + loop = asyncio.get_running_loop() + result_box: dict[str, Any] = {"content": None, "error": None, "model": prepared.model_name or "hermes"} + + def stream_callback(delta: str) -> None: + loop.call_soon_threadsafe(queue.put_nowait, {"type": "chunk", "content": delta}) + + def run_sync() -> None: + try: + agent = self._build_agent(prepared, handle.hermes_session_id) + result = agent.run_conversation( + prepared.full_message, + system_message=self._build_system_message(prepared), + stream_callback=stream_callback, + ) + result_box["content"] = str(result.get("final_response") or "") + result_box["model"] = getattr(agent, "model", prepared.model_name or "hermes") + except Exception as exc: # pragma: no cover - surfaced through queue + result_box["error"] = f"Hermes 执行失败: {exc}" + loop.call_soon_threadsafe( + queue.put_nowait, + {"type": "error", "error": result_box["error"]}, + ) + finally: + loop.call_soon_threadsafe(queue.put_nowait, None) + + worker = asyncio.create_task(asyncio.to_thread(run_sync)) + streamed_text = "" + while True: + event = await queue.get() + if event is None: + break + if event.get("type") == "chunk": + streamed_text += str(event.get("content", "")) + yield event + + await worker + handle.last_used_at = datetime.now(UTC) + handle.metadata = { + "session_id": handle.hermes_session_id, + "model": result_box["model"], + "last_error": result_box["error"], + } + + final_text = result_box["content"] or streamed_text + if final_text and final_text != streamed_text: + yield {"type": "chunk", "content": final_text} + + async def chat_once(self, prepared: RuntimePreparedContext) -> tuple[str, str | None]: + handle = hermes_session_manager.get_or_create( + conversation_id=prepared.conversation.id, + user_id=prepared.user.id, + ) + + async with handle.lock: + agent = await asyncio.to_thread(self._build_agent, prepared, handle.hermes_session_id) + result = await asyncio.to_thread( + agent.run_conversation, + prepared.full_message, + self._build_system_message(prepared), + ) + handle.last_used_at = datetime.now(UTC) + resolved_model = getattr(agent, "model", prepared.model_name or "hermes") + handle.metadata = { + "session_id": handle.hermes_session_id, + "model": resolved_model, + "last_error": None, + } + return str(result.get("final_response") or ""), resolved_model + + +hermes_runtime_adapter = HermesRuntimeAdapter() diff --git a/backend/app/services/agent_runtime/hermes_session_manager.py b/backend/app/services/agent_runtime/hermes_session_manager.py new file mode 100644 index 0000000..0afe0fe --- /dev/null +++ b/backend/app/services/agent_runtime/hermes_session_manager.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + + +@dataclass(slots=True) +class HermesSessionHandle: + conversation_id: str + user_id: str + hermes_session_id: str + last_used_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + restart_count: int = 0 + lock: asyncio.Lock = field(default_factory=asyncio.Lock) + metadata: dict[str, Any] = field(default_factory=dict) + + +class HermesSessionManager: + def __init__(self) -> None: + self._sessions: dict[str, HermesSessionHandle] = {} + + def get_or_create(self, *, conversation_id: str, user_id: str) -> HermesSessionHandle: + handle = self._sessions.get(conversation_id) + if handle is None: + handle = HermesSessionHandle( + conversation_id=conversation_id, + user_id=user_id, + hermes_session_id=f"jarvis-{conversation_id}", + ) + self._sessions[conversation_id] = handle + handle.last_used_at = datetime.now(UTC) + return handle + + +hermes_session_manager = HermesSessionManager() diff --git a/backend/app/services/agent_runtime/jarvis_runtime.py b/backend/app/services/agent_runtime/jarvis_runtime.py new file mode 100644 index 0000000..4679aca --- /dev/null +++ b/backend/app/services/agent_runtime/jarvis_runtime.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from typing import Any, AsyncGenerator + +from app.services.agent_runtime.base import ChatRuntime, RuntimePreparedContext + + +class JarvisRuntimeAdapter(ChatRuntime): + name = "jarvis" + + async def chat_stream( + self, + prepared: RuntimePreparedContext, + ) -> AsyncGenerator[dict[str, Any], None]: + raise NotImplementedError("Jarvis runtime is executed inside AgentService") + + async def chat_once(self, prepared: RuntimePreparedContext) -> tuple[str, str | None]: + raise NotImplementedError("Jarvis runtime is executed inside AgentService") + + +jarvis_runtime_adapter = JarvisRuntimeAdapter() diff --git a/backend/app/services/task_dispatch.py b/backend/app/services/task_dispatch.py new file mode 100644 index 0000000..8e7d7ca --- /dev/null +++ b/backend/app/services/task_dispatch.py @@ -0,0 +1,238 @@ +import asyncio +from datetime import UTC, datetime +from uuid import uuid4 + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import async_sessionmaker +from sqlalchemy.orm import object_session +from sqlalchemy.orm import selectinload + +from app.models.task import ( + Task, + TaskDispatchStatus, + TaskHistory, + TaskPriority, + TaskStatus, + TaskSubTask, +) + + +def _now() -> datetime: + return datetime.now(UTC) + + +def _stringify(value: object | None) -> str | None: + if value is None: + return None + return str(value) + + +def append_task_history( + task: Task, + *, + action: str, + old_value: object | None = None, + new_value: object | None = None, +) -> None: + entry = TaskHistory( + task_id=task.id, + action=action, + old_value=_stringify(old_value), + new_value=_stringify(new_value), + ) + session = object_session(task) + if session is not None: + session.add(entry) + return + task.history.append(entry) + + +def build_dispatch_payload(task: Task, subtasks: list[TaskSubTask]) -> dict[str, object]: + return { + "business_task_id": task.id, + "title": task.title, + "description": task.description, + "priority": task.priority.value if isinstance(task.priority, TaskPriority) else str(task.priority), + "due_date": task.due_date.isoformat() if task.due_date else None, + "conversation_id": task.conversation_id, + "user_id": task.user_id, + "subtasks": [ + { + "id": item.id, + "title": item.title, + "description": item.description, + "status": item.status.value if isinstance(item.status, TaskStatus) else str(item.status), + "assignee_type": item.assignee_type.value if item.assignee_type else None, + "assignee_id": item.assignee_id, + "dispatch_status": ( + item.dispatch_status.value + if isinstance(item.dispatch_status, TaskDispatchStatus) + else str(item.dispatch_status) + ), + "order_index": item.order_index, + } + for item in subtasks + ], + } + + +async def _run_dispatch_flow( + task_id: str, + run_id: str, + *, + session_factory, + subtask_id: str | None = None, +) -> None: + await asyncio.sleep(0.01) + + async with session_factory() as db: + task = await db.get(Task, task_id) + if task is None: + return + target = await db.get(TaskSubTask, subtask_id) if subtask_id else None + if subtask_id and target is None: + return + + if subtask_id: + previous = target.dispatch_status + target.dispatch_status = TaskDispatchStatus.RUNNING + target.dispatch_run_id = run_id + target.completed_at = None + task.dispatch_status = TaskDispatchStatus.RUNNING + task.dispatch_run_id = run_id + task.started_at = task.started_at or _now() + task.last_synced_at = _now() + append_task_history( + task, + action="dispatch_status_changed", + old_value=f"{subtask_id}:{previous.value}", + new_value=f"{subtask_id}:{TaskDispatchStatus.RUNNING.value}", + ) + else: + previous = task.dispatch_status + task.dispatch_status = TaskDispatchStatus.RUNNING + task.dispatch_run_id = run_id + task.started_at = task.started_at or _now() + task.last_synced_at = _now() + task.status = TaskStatus.IN_PROGRESS + append_task_history( + task, + action="dispatch_status_changed", + old_value=previous.value, + new_value=TaskDispatchStatus.RUNNING.value, + ) + await db.commit() + + await asyncio.sleep(0.01) + + async with session_factory() as db: + task = await db.get(Task, task_id) + if task is None: + return + target = await db.get(TaskSubTask, subtask_id) if subtask_id else None + if subtask_id and target is None: + return + + synced_at = _now() + if subtask_id: + previous = target.dispatch_status + target.dispatch_status = TaskDispatchStatus.COMPLETED + target.dispatch_run_id = run_id + target.status = TaskStatus.DONE + target.completed_at = synced_at + task.dispatch_status = TaskDispatchStatus.COMPLETED + task.dispatch_run_id = run_id + task.result_summary = f"Commander completed subtask {target.title}" + task.last_synced_at = synced_at + append_task_history( + task, + action="dispatch_status_changed", + old_value=f"{subtask_id}:{previous.value}", + new_value=f"{subtask_id}:{TaskDispatchStatus.COMPLETED.value}", + ) + else: + previous = task.dispatch_status + task.dispatch_status = TaskDispatchStatus.COMPLETED + task.dispatch_run_id = run_id + task.result_summary = f"Commander completed task {task.title}" + task.last_synced_at = synced_at + task.status = TaskStatus.DONE + task.completed_at = synced_at + append_task_history( + task, + action="dispatch_status_changed", + old_value=previous.value, + new_value=TaskDispatchStatus.COMPLETED.value, + ) + await db.commit() + + +def schedule_dispatch(task_id: str, run_id: str, *, session_factory, subtask_id: str | None = None) -> None: + asyncio.create_task( + _run_dispatch_flow( + task_id, + run_id, + session_factory=session_factory, + subtask_id=subtask_id, + ) + ) + + +async def queue_task_dispatch( + task: Task, + *, + db, + subtask: TaskSubTask | None = None, +) -> tuple[str, dict[str, object]]: + subtasks = list(task.subtasks) + run_id = uuid4().hex[:12] + synced_at = _now() + + if subtask is not None: + previous = subtask.dispatch_status + subtask.dispatch_status = TaskDispatchStatus.QUEUED + subtask.dispatch_run_id = run_id + task.dispatch_status = TaskDispatchStatus.QUEUED + task.dispatch_run_id = run_id + task.result_summary = None + task.last_synced_at = synced_at + append_task_history( + task, + action="dispatched_to_commander", + old_value=f"{subtask.id}:{previous.value}", + new_value=f"{subtask.id}:{TaskDispatchStatus.QUEUED.value}", + ) + else: + previous = task.dispatch_status + task.dispatch_status = TaskDispatchStatus.QUEUED + task.dispatch_run_id = run_id + task.result_summary = None + task.started_at = None + task.last_synced_at = synced_at + append_task_history( + task, + action="dispatched_to_commander", + old_value=previous.value, + new_value=TaskDispatchStatus.QUEUED.value, + ) + + await db.commit() + await db.refresh(task) + payload = build_dispatch_payload(task, subtasks) + session_factory = async_sessionmaker(bind=db.bind, expire_on_commit=False) + schedule_dispatch( + task.id, + run_id, + session_factory=session_factory, + subtask_id=subtask.id if subtask else None, + ) + return run_id, payload + + +async def load_task_with_details(db, *, task_id: str, user_id: str) -> Task | None: + result = await db.execute( + select(Task) + .options(selectinload(Task.subtasks), selectinload(Task.history)) + .where(Task.id == task_id, Task.user_id == user_id) + ) + return result.scalar_one_or_none()