# Source: X-Agents/core/agents/api/routes.py (Python, 332 lines, 11 KiB)

"""FastAPI routes for agent communication with Go backend."""
import json
import logging
import time
from typing import Any, AsyncGenerator
from fastapi import APIRouter, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter()
# Request/Response models - aligned with Go backend
class ChatRequest(BaseModel):
    """Payload the Go backend sends to the single-agent chat endpoints.

    Documented as aligned with ``AgentChatRequest`` in
    server/internal/service/agent_service.go.
    """

    # --- Conversation ---
    agent_id: str  # a string, so UUID-style identifiers are accepted
    message: str
    user_id: int = 0
    session_id: str | None = None  # optional; omitted when the caller has none

    # --- Optional model configuration ---
    model_id: str | None = None
    model_name: str | None = None
    model_provider: str | None = None
    api_key: str | None = None
    base_url: str | None = None
    use_xbot: bool = False
class ChatResponse(BaseModel):
    """Reply returned to the Go backend for a single-agent chat call.

    Documented as aligned with ``AgentChatResponse`` in
    server/internal/service/agent_service.go.
    """

    agent_id: str  # a string, so UUID-style identifiers are accepted
    response: str
    tool_calls: list = []
    tokens_used: int = 0
    duration_ms: int = 0  # elapsed handling time in milliseconds
    session_id: str
class TeamChatRequest(BaseModel):
    """Payload the Go backend sends to the team chat endpoint.

    Documented as aligned with ``TeamChatRequest`` in
    server/internal/service/agent_service.go.
    """

    # NOTE(review): agent ids are ints here, whereas ChatRequest.agent_id is a
    # str that accepts UUIDs -- confirm against the Go struct.
    supervisor_agent_id: int
    member_agent_ids: list[int]
    message: str
    user_id: int = 0
    session_id: str | None = None  # optional; omitted when the caller has none
    strategy: str = "parallel"
class TeamChatResponse(BaseModel):
    """Reply returned to the Go backend for a team chat call.

    Documented as aligned with ``TeamChatResponse`` in
    server/internal/service/agent_service.go.
    """

    supervisor_agent_id: int
    response: str
    subtask_results: list = []
    strategy: str = "parallel"
    duration_ms: int = 0  # elapsed handling time in milliseconds
    session_id: str
class HealthResponse(BaseModel):
    """Body returned by the health-check endpoint."""

    status: str
    version: str = "0.1.0"
# Module-wide agent handles. Both start unset (None) and are installed via
# set_agent() / set_team_agent(); the route handlers answer HTTP 500 until then.
_agent: Any = None
_team_agent: Any = None
def set_agent(agent: Any) -> None:
    """Install *agent* as the module-wide agent used by the chat routes.

    Args:
        agent: Agent loop instance.
    """
    global _agent
    _agent = agent
def set_team_agent(team_agent: Any) -> None:
    """Install *team_agent* as the module-wide agent used by the team route.

    Args:
        team_agent: Team agent instance.
    """
    global _team_agent
    _team_agent = team_agent
def add_cors(app, allow_origins: list[str] | None = None) -> None:
    """Add CORS middleware to *app*.

    Args:
        app: FastAPI application instance.
        allow_origins: Origins allowed to make cross-origin requests. When
            omitted, every origin is allowed (``["*"]``), which is the
            previous hard-coded behaviour.
    """
    # ``is None`` (not truthiness) so an explicit empty list means "no origins".
    origins = ["*"] if allow_origins is None else list(allow_origins)

    # NOTE(review): a wildcard origin combined with allow_credentials=True lets
    # any site issue credentialed requests; the FastAPI CORS docs call for
    # explicit origins in that case. Pass ``allow_origins`` in deployments that
    # are reachable from browsers.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
@router.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report that the service is up.

    Returns:
        A HealthResponse whose status is "ok" (version keeps its default).
    """
    healthy = HealthResponse(status="ok")
    return healthy
@router.post("/agent/chat", response_model=ChatResponse)
async def chat(request: ChatRequest) -> ChatResponse:
    """Handle a non-streaming chat request from the Go backend.

    Path: POST /agent/chat

    The message and session key are passed to the global agent's ``chat()``
    coroutine; optional model settings are forwarded only when they are set.

    Args:
        request: Chat request with agent_id, message, user_id, etc.

    Returns:
        Chat response with agent_id, response, tool_calls, tokens_used,
        duration_ms and session_id.

    Raises:
        HTTPException: 500 if the agent is not initialized or processing fails.
    """
    if _agent is None:
        raise HTTPException(status_code=500, detail="Agent not initialized")

    # The wall clock seeds the fallback session id; the duration is measured
    # with a monotonic clock so system clock adjustments cannot skew it.
    session_id = request.session_id or f"session_{request.agent_id}_{int(time.time())}"
    started = time.perf_counter()

    try:
        kwargs: dict[str, Any] = {
            "message": request.message,
            "session_key": session_id,
        }
        # Forward optional model configuration only when it carries a value.
        for field in (
            "model_id",
            "model_name",
            "model_provider",
            "api_key",
            "base_url",
            "use_xbot",
        ):
            value = getattr(request, field)
            if value:
                kwargs[field] = value

        # Only report whether a key is present, never the key itself.
        logger.info(
            "[chat] kwargs: model_provider=%s, model_name=%s, api_key=%s",
            kwargs.get("model_provider"),
            kwargs.get("model_name"),
            "set" if kwargs.get("api_key") else "not set",
        )
        result = await _agent.chat(**kwargs)
        logger.info(
            "[chat] result type=%s, content=%s",
            type(result).__name__,
            str(result)[:100],
        )

        if isinstance(result, dict):
            # Prefer "response", then "content", then the dict's string form.
            # A key that is present but None falls through to the next option
            # instead of failing response validation.
            response_text = result.get("response")
            if response_text is None:
                response_text = result.get("content")
            if response_text is None:
                response_text = str(result)
            tool_calls = result.get("tool_calls") or []
            tokens_used = result.get("tokens_used") or 0
        else:
            response_text = result
            tool_calls = []
            tokens_used = 0

        # ChatResponse.response is a str; coerce anything else the agent returns.
        if not isinstance(response_text, str):
            response_text = str(response_text)

        duration_ms = int((time.perf_counter() - started) * 1000)
        return ChatResponse(
            agent_id=request.agent_id,
            response=response_text,
            tool_calls=tool_calls,
            tokens_used=tokens_used,
            duration_ms=duration_ms,
            session_id=session_id,
        )
    except Exception as e:
        logger.exception("Error processing chat: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/agent/chat/stream")
async def chat_stream(request: ChatRequest):
    """Handle a streaming chat request from the Go backend.

    Path: POST /agent/chat/stream

    Each chunk produced by the global agent's ``chat_stream()`` is emitted as
    one SSE event (``data: <json>\\n\\n``). A final ``{"done": true,
    "session_id": ...}`` event closes a successful stream; if an exception is
    raised while streaming, an ``{"error": ...}`` event is emitted instead.

    Args:
        request: Chat request with agent_id, message, user_id, etc.

    Returns:
        A StreamingResponse with media type ``text/event-stream``.

    Raises:
        HTTPException: 500 if the agent is not initialized.
    """
    logger.info(
        "[chat_stream] Received request: agent_id=%s, message=%s...",
        request.agent_id,
        request.message[:50],
    )
    if _agent is None:
        logger.error("[chat_stream] Agent not initialized!")
        raise HTTPException(status_code=500, detail="Agent not initialized")

    session_id = request.session_id or f"session_{request.agent_id}_{int(time.time())}"

    async def generate() -> AsyncGenerator[str, None]:
        """Yield the agent's chunks as SSE-formatted strings."""
        try:
            logger.info("[chat_stream] Starting stream for session: %s", session_id)
            kwargs: dict[str, Any] = {
                "message": request.message,
                "session_key": session_id,
            }
            # Forward optional model configuration only when it carries a value.
            for field in (
                "model_id",
                "model_name",
                "model_provider",
                "api_key",
                "base_url",
                "use_xbot",
            ):
                value = getattr(request, field)
                if not value:
                    continue
                kwargs[field] = value
                # Never write any part of the credential to the log.
                shown = "set" if field == "api_key" else value
                logger.info("[chat_stream] Using %s: %s", field, shown)

            chunk_count = 0
            async for chunk in _agent.chat_stream(**kwargs):
                chunk_count += 1
                # Chunk payloads are logged at DEBUG to keep INFO logs compact.
                logger.debug("[chat_stream] Yielding chunk %d: %s", chunk_count, chunk)
                # ensure_ascii=False emits UTF-8 characters directly.
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

            logger.info("[chat_stream] Stream complete, yielded %d chunks", chunk_count)
            # Final event tells the client the stream ended normally.
            yield f"data: {json.dumps({'done': True, 'session_id': session_id}, ensure_ascii=False)}\n\n"
        except Exception as e:
            # The response has already started, so the failure is reported
            # in-band as an SSE event.
            logger.exception("Error in streaming chat: %s", e)
            yield f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"

    from fastapi.responses import StreamingResponse

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # nginx only recognises "yes"/"no" here; "no" disables buffering.
            "X-Accel-Buffering": "no",
        },
    )
@router.post("/agent/team/chat", response_model=TeamChatResponse)
async def team_chat(request: TeamChatRequest) -> TeamChatResponse:
    """Handle a team chat request from the Go backend.

    Path: POST /agent/team/chat

    The request is passed to the global team agent's ``chat()`` coroutine and
    its result is wrapped in a TeamChatResponse.

    Args:
        request: Team chat request with supervisor_agent_id, member_agent_ids,
            message, etc.

    Returns:
        Team chat response with supervisor_agent_id, response,
        subtask_results, strategy, duration_ms and session_id.

    Raises:
        HTTPException: 500 if the team agent is not initialized or processing
            fails.
    """
    if _team_agent is None:
        raise HTTPException(status_code=500, detail="Team agent not initialized")

    # The wall clock seeds the fallback session id; the duration is measured
    # with a monotonic clock so system clock adjustments cannot skew it.
    session_id = (
        request.session_id
        or f"team_session_{request.supervisor_agent_id}_{int(time.time())}"
    )
    started = time.perf_counter()

    try:
        result = await _team_agent.chat(
            message=request.message,
            session_id=session_id,
            supervisor_agent_id=request.supervisor_agent_id,
            member_agent_ids=request.member_agent_ids,
            strategy=request.strategy,
        )

        if isinstance(result, dict):
            # A "response" key that is missing or None falls back to the
            # dict's string form instead of failing response validation.
            response_text = result.get("response")
            if response_text is None:
                response_text = str(result)
            subtask_results = result.get("subtask_results") or []
        else:
            response_text = result
            subtask_results = []

        # TeamChatResponse.response is a str; coerce anything else.
        if not isinstance(response_text, str):
            response_text = str(response_text)

        duration_ms = int((time.perf_counter() - started) * 1000)
        return TeamChatResponse(
            supervisor_agent_id=request.supervisor_agent_id,
            response=response_text,
            subtask_results=subtask_results,
            strategy=request.strategy,
            duration_ms=duration_ms,
            session_id=session_id,
        )
    except Exception as e:
        logger.exception("Error processing team chat: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e