diff --git a/backend/app/database.py b/backend/app/database.py index 5b2b074..bf1dd0b 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -35,6 +35,7 @@ async def get_db() -> AsyncGenerator[AsyncSession, None]: async def init_db(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) + await ensure_task_columns(conn) await ensure_log_columns(conn) await ensure_message_columns(conn) await ensure_conversation_columns(conn) @@ -47,6 +48,195 @@ async def init_db(): await ensure_learning_artifact_tables(conn) +async def ensure_task_columns(conn): + rows = await _get_table_info(conn, 'tasks') + if not rows: + return + + columns = {row[1] for row in rows} + required_columns = { + 'source': "ALTER TABLE tasks ADD COLUMN source VARCHAR(32) DEFAULT 'manual' NOT NULL", + 'conversation_id': "ALTER TABLE tasks ADD COLUMN conversation_id VARCHAR(36)", + 'quadrant': "ALTER TABLE tasks ADD COLUMN quadrant VARCHAR(64)", + 'assignee_type': "ALTER TABLE tasks ADD COLUMN assignee_type VARCHAR(32)", + 'assignee_id': "ALTER TABLE tasks ADD COLUMN assignee_id VARCHAR(255)", + 'dispatch_status': "ALTER TABLE tasks ADD COLUMN dispatch_status VARCHAR(32) DEFAULT 'idle' NOT NULL", + 'dispatch_run_id': "ALTER TABLE tasks ADD COLUMN dispatch_run_id VARCHAR(64)", + 'result_summary': "ALTER TABLE tasks ADD COLUMN result_summary TEXT", + 'started_at': "ALTER TABLE tasks ADD COLUMN started_at DATETIME", + 'last_synced_at': "ALTER TABLE tasks ADD COLUMN last_synced_at DATETIME", + } + for column, ddl in required_columns.items(): + if column not in columns: + await conn.execute(text(ddl)) + + indexes = { + 'ix_tasks_due_date': "CREATE INDEX IF NOT EXISTS ix_tasks_due_date ON tasks (due_date)", + 'ix_tasks_source': "CREATE INDEX IF NOT EXISTS ix_tasks_source ON tasks (source)", + 'ix_tasks_conversation_id': "CREATE INDEX IF NOT EXISTS ix_tasks_conversation_id ON tasks (conversation_id)", + 'ix_tasks_quadrant': "CREATE INDEX IF NOT EXISTS ix_tasks_quadrant ON tasks (quadrant)", + 'ix_tasks_assignee_type': "CREATE INDEX IF NOT EXISTS ix_tasks_assignee_type ON tasks (assignee_type)", + 'ix_tasks_assignee_id': "CREATE INDEX IF NOT EXISTS ix_tasks_assignee_id ON tasks (assignee_id)", + 'ix_tasks_dispatch_status': "CREATE INDEX IF NOT EXISTS ix_tasks_dispatch_status ON tasks (dispatch_status)", + 'ix_tasks_dispatch_run_id': "CREATE INDEX IF NOT EXISTS ix_tasks_dispatch_run_id ON tasks (dispatch_run_id)", + } + for ddl in indexes.values(): + await conn.execute(text(ddl)) + + history_rows = await _get_table_info(conn, 'task_histories') + if history_rows: + history_columns = {row[1] for row in history_rows} + if 'subtask_id' not in history_columns: + await conn.execute(text("ALTER TABLE task_histories ADD COLUMN subtask_id VARCHAR(36)")) + await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_task_histories_subtask_id ON task_histories (subtask_id)")) + + await conn.execute( + text( + """ + CREATE TABLE IF NOT EXISTS task_subtasks ( + id VARCHAR(36) PRIMARY KEY, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL, + task_id VARCHAR(36) NOT NULL, + title VARCHAR(500) NOT NULL, + description TEXT, + status VARCHAR(32) NOT NULL DEFAULT 'todo', + order_index INTEGER NOT NULL DEFAULT 0, + assignee_type VARCHAR(32), + assignee_id VARCHAR(255), + dispatch_status VARCHAR(32) NOT NULL DEFAULT 'idle', + dispatch_run_id VARCHAR(64), + completed_at DATETIME, + FOREIGN KEY(task_id) REFERENCES tasks (id) + ) + """ + ) + ) + subtask_rows = await _get_table_info(conn, 'task_subtasks') + subtask_columns = {row[1] for row in subtask_rows} + if 'result_summary' not in subtask_columns: + await conn.execute(text("ALTER TABLE task_subtasks ADD COLUMN result_summary TEXT")) + subtask_indexes = { + 'ix_task_subtasks_task_id': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_task_id ON task_subtasks (task_id)", + 'ix_task_subtasks_status': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_status ON task_subtasks (status)", + 'ix_task_subtasks_order_index': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_order_index ON task_subtasks (order_index)", + 'ix_task_subtasks_assignee_type': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_assignee_type ON task_subtasks (assignee_type)", + 'ix_task_subtasks_assignee_id': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_assignee_id ON task_subtasks (assignee_id)", + 'ix_task_subtasks_dispatch_status': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_dispatch_status ON task_subtasks (dispatch_status)", + 'ix_task_subtasks_dispatch_run_id': "CREATE INDEX IF NOT EXISTS ix_task_subtasks_dispatch_run_id ON task_subtasks (dispatch_run_id)", + } + for ddl in subtask_indexes.values(): + await conn.execute(text(ddl)) + + # Normalize legacy/invalid enum-like values to prevent ORM Enum decoding failures. + await conn.execute( + text( + """ + UPDATE tasks + SET source = 'manual' + WHERE source IS NULL + OR TRIM(source) = '' + OR source NOT IN ('manual','chat','schedule_center','today_status','commander') + """ + ) + ) + await conn.execute( + text( + """ + UPDATE tasks + SET status = 'todo' + WHERE status IS NULL + OR TRIM(status) = '' + OR status NOT IN ('todo','in_progress','done','cancelled') + """ + ) + ) + await conn.execute( + text( + """ + UPDATE tasks + SET priority = 'medium' + WHERE priority IS NULL + OR TRIM(priority) = '' + OR priority NOT IN ('low','medium','high','urgent') + """ + ) + ) + await conn.execute( + text( + """ + UPDATE tasks + SET quadrant = NULL + WHERE quadrant IS NOT NULL + AND (TRIM(quadrant) = '' OR quadrant NOT IN ( + 'urgent-important', + 'not-urgent-important', + 'urgent-not-important', + 'not-urgent-not-important' + )) + """ + ) + ) + await conn.execute( + text( + """ + UPDATE tasks + SET assignee_type = NULL + WHERE assignee_type IS NOT NULL + AND (TRIM(assignee_type) = '' OR assignee_type NOT IN ( + 'user','commander','agent','planner','executor','knowledge','analyst','coder','researcher' + )) + """ + ) + ) + await conn.execute( + text( + """ + UPDATE tasks + SET dispatch_status = 'idle' + WHERE dispatch_status IS NULL + OR TRIM(dispatch_status) = '' + OR dispatch_status NOT IN ('idle','queued','running','completed','failed') + """ + ) + ) + + await conn.execute( + text( + """ + UPDATE task_subtasks + SET status = 'todo' + WHERE status IS NULL + OR TRIM(status) = '' + OR status NOT IN ('todo','in_progress','done','cancelled') + """ + ) + ) + await conn.execute( + text( + """ + UPDATE task_subtasks + SET assignee_type = NULL + WHERE assignee_type IS NOT NULL + AND (TRIM(assignee_type) = '' OR assignee_type NOT IN ( + 'user','commander','agent','planner','executor','knowledge','analyst','coder','researcher' + )) + """ + ) + ) + await conn.execute( + text( + """ + UPDATE task_subtasks + SET dispatch_status = 'idle' + WHERE dispatch_status IS NULL + OR TRIM(dispatch_status) = '' + OR dispatch_status NOT IN ('idle','queued','running','completed','failed') + """ + ) + ) + + async def ensure_log_columns(conn): result = await conn.execute(text("PRAGMA table_info(logs)")) rows = result.fetchall() diff --git a/backend/app/main.py b/backend/app/main.py index 572bd95..098014d 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -31,6 +31,7 @@ from app.routers import ( terminal_router, tools_router, remote_mount_router, + office_router, ) from app.routers.scheduler import router as scheduler_router from app.services.scheduler_service import start_scheduler, stop_scheduler, get_scheduler_status @@ -133,6 +134,7 @@ app.include_router(agent_sessions_router) app.include_router(terminal_router) app.include_router(tools_router) app.include_router(remote_mount_router) +app.include_router(office_router) @app.get("/api/health") diff --git a/backend/app/services/agent_service.py b/backend/app/services/agent_service.py index 8ecd75e..9695859 100644 --- a/backend/app/services/agent_service.py +++ b/backend/app/services/agent_service.py @@ -42,6 +42,9 @@ from app.services.rollback_controller import RollbackController from app.services.runtime_observability import build_runtime_observability_report from app.agents.tools.time_reasoning import extract_reference_datetime from app.agents.state import initial_state +from app.services.agent_runtime.base import RuntimePreparedContext +from app.services.agent_runtime.hermes_runtime import hermes_runtime_adapter +from app.services.agent_runtime.hermes_session_manager import hermes_session_manager logger = logging.getLogger(__name__) @@ -378,6 +381,9 @@ class AgentService: def __init__(self, db: AsyncSession): self.db = db + def _resolve_runtime(self, runtime: str | None) -> str: + return runtime or "jarvis" + async def _try_auto_summarize_background(self, user_id: str, conversation_id: str) -> None: async with async_session() as session: await memory_service.try_auto_summarize(session, user_id, conversation_id) @@ -662,10 +668,12 @@ class AgentService: conversation_id: str | None = None, file_ids: list[str] | None = None, model_name: str | None = None, + runtime: str | None = None, ) -> tuple[str, str, AsyncGenerator[dict[str, Any], None]]: """ 处理对话请求(流式) """ + runtime_name = self._resolve_runtime(runtime) user_llm_config = await self._get_user_llm_config(user_id, model_name) model_name_used = model_name if model_name and not user_llm_config: @@ -758,7 +766,7 @@ class AgentService: conversation_id=conversation_id, role="assistant", content="", - model=model_name_used or "jarvis", + model=(model_name_used or "jarvis") if runtime_name == "jarvis" else runtime_name, attachments=None, ) self.db.add(assistant_msg) @@ -773,10 +781,78 @@ class AgentService: "title": "Assistant message", "content_summary": content[:500], "raw_excerpt": content[:2000], - "metadata_": {"role": "assistant"}, + "metadata_": {"role": "assistant", "runtime": runtime_name}, "importance_signal": 0.8, } + if runtime_name == "hermes": + user = await self.db.get(User, user_id) + if user is None: + raise ValueError("用户不存在") + + prepared = RuntimePreparedContext( + user=user, + conversation=conv, + user_message=user_msg, + assistant_message=assistant_msg, + raw_message=message, + full_message=full_message, + file_ids=file_ids or [], + model_name=model_name_used, + memory_context=memory_ctx, + ) + + async def run_hermes(): + collected = "" + stream_failed = False + try: + async for event in hermes_runtime_adapter.chat_stream(prepared): + if event.get("type") == "chunk": + collected += str(event.get("content", "")) + elif event.get("type") == "error": + stream_failed = True + yield event + finally: + try: + session_handle = hermes_session_manager.get_or_create( + conversation_id=conv.id, + user_id=user_id, + ) + assistant_msg.content = collected if collected else ("Hermes 执行失败,请检查运行配置。" if stream_failed else "") + assistant_msg.model = str(session_handle.metadata.get("model") or "hermes") + assistant_msg.attachments = [ + { + "kind": "runtime_info", + "runtime": "hermes", + "session_id": session_handle.hermes_session_id, + "model": session_handle.metadata.get("model"), + "last_error": session_handle.metadata.get("last_error"), + } + ] + conv.agent_state = { + "runtime": "hermes", + "runtime_state": { + "hermes": { + "session_id": session_handle.hermes_session_id, + "message_id": assistant_msg.id, + "model": session_handle.metadata.get("model"), + "last_error": session_handle.metadata.get("last_error"), + } + }, + } + await BrainService(self.db).create_event( + user_id, + **_build_assistant_event_payload(assistant_msg.content), + ) + await self.db.commit() + await self.db.refresh(assistant_msg) + except Exception: + logger.exception("save_hermes_assistant_message_failed") + asyncio.create_task(self._try_auto_summarize_background(user_id, conversation_id)) + asyncio.create_task(self._extract_memories_background(user_id, conversation_id)) + + return conversation_id, assistant_msg.id, run_hermes() + async def run_agent(): collected = "" state: dict[str, Any] | None = None @@ -1003,10 +1079,12 @@ class AgentService: conversation_id: str | None = None, file_ids: list[str] | None = None, model_name: str | None = None, + runtime: str | None = None, ) -> tuple[str, str, str, str | None]: """ 简单同步版对话 """ + runtime_name = self._resolve_runtime(runtime) user_llm_config = await self._get_user_llm_config(user_id, model_name) model_name_used = model_name if model_name and not user_llm_config: @@ -1043,7 +1121,7 @@ class AgentService: conversation_id=conversation_id, role="assistant", content="", - model=model_name_used or "jarvis", + model=(model_name_used or "jarvis") if runtime_name == "jarvis" else runtime_name, attachments=None, ) self.db.add(assistant_msg) @@ -1072,6 +1150,70 @@ class AgentService: if recall_ctx: memory_ctx = f"{memory_ctx}\n{recall_ctx}" if memory_ctx else recall_ctx + if runtime_name == "hermes": + user = await self.db.get(User, user_id) + if user is None: + raise ValueError("用户不存在") + prepared = RuntimePreparedContext( + user=user, + conversation=conv, + user_message=user_msg, + assistant_message=assistant_msg, + raw_message=message, + full_message=message, + file_ids=file_ids or [], + model_name=model_name_used, + memory_context=memory_ctx, + ) + response_content, resolved_model_name = await hermes_runtime_adapter.chat_once(prepared) + assistant_msg.content = response_content + assistant_msg.model = resolved_model_name or "hermes" + assistant_msg.attachments = [{ + "kind": "runtime_info", + "runtime": "hermes", + "session_id": hermes_session_manager.get_or_create( + conversation_id=conv.id, + user_id=user_id, + ).hermes_session_id, + "model": resolved_model_name, + }] + conv.agent_state = { + "runtime": "hermes", + "runtime_state": { + "hermes": { + "session_id": hermes_session_manager.get_or_create( + conversation_id=conv.id, + user_id=user_id, + ).hermes_session_id, + "message_id": assistant_msg.id, + "model": resolved_model_name, + } + }, + } + await brain_service.create_event( + user_id, + source_type="conversation", + source_id=conversation_id, + event_type="message_created", + title="Assistant message", + content_summary=response_content[:500], + raw_excerpt=response_content[:2000], + metadata_={"role": "assistant", "runtime": "hermes"}, + importance_signal=0.8, + ) + await self.db.commit() + await self.db.refresh(assistant_msg) + schedule_retrospective_job( + user_id=user_id, + conversation_id=conversation_id, + request_message_id=user_msg.id, + response_message_id=assistant_msg.id, + query_text=message, + final_response=response_content, + state=None, + ) + return conversation_id, assistant_msg.id, response_content, assistant_msg.model + set_current_user(user_id) try: graph = get_agent_graph()