Files
X-Agents/agent/app/main.py

203 lines
5.1 KiB
Python
Raw Normal View History

"""
FastAPI Agent Engine Server
"""
import logging
import os
import time
import uuid
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from app.agent.core import AgentCore, Supervisor, AgentConfig
from app.agent.llm import LLMFactory
# Application instance that every route below is registered on.
app = FastAPI(title="X-Agents Python Engine", version="1.0.0")

# CORS
# Allowed origins are read from AGENT_CORS_ORIGINS (comma-separated). When the
# variable is unset or contains no usable entry, the previous wildcard default
# is kept so existing deployments behave exactly as before.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("AGENT_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

# AGENT_CORS_ALLOW_CREDENTIALS defaults to "true", the previously hard-coded
# value. Any value other than 1/true/yes/on (case-insensitive) disables it.
# NOTE(review): FastAPI's CORS documentation advises against combining wildcard
# origins with credentials; production deployments should set explicit
# origins via AGENT_CORS_ORIGINS or turn credentials off.
_cors_allow_credentials = os.getenv(
    "AGENT_CORS_ALLOW_CREDENTIALS", "true"
).strip().lower() in {"1", "true", "yes", "on"}

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=_cors_allow_credentials,
    allow_methods=["*"],
    allow_headers=["*"],
)
# === Request/response models ===
class ChatRequest(BaseModel):
    """Request body for a single-agent chat turn."""

    # Id of the agent that should answer.
    agent_id: int
    # Text handed to the agent.
    message: str
    # Falls back to 1 when the caller omits it.
    user_id: int = 1
    # Optional; the chat endpoint generates an id when this is missing/empty.
    session_id: Optional[str] = None
class TeamChatRequest(BaseModel):
    """Request body for a multi-agent (team) chat turn."""

    # Id of the agent acting as supervisor.
    supervisor_agent_id: int
    # Ids of the member agents; the endpoint skips ids it cannot load.
    member_agent_ids: list[int]
    # Text handed to the supervisor.
    message: str
    # Falls back to 1 when the caller omits it.
    user_id: int = 1
    # Optional; the endpoint generates an id when this is missing/empty.
    session_id: Optional[str] = None
    # Passed through to Supervisor unchanged; "parallel" unless overridden.
    strategy: str = "parallel"
class ChatResponse(BaseModel):
    """Response body returned by the single-agent chat endpoint."""

    # Echo of the agent id from the request.
    agent_id: int
    # Reply content produced by the agent run.
    response: str
    # Tool calls reported by the agent run.
    tool_calls: list = []
    # Token usage reported by the agent run.
    tokens_used: int = 0
    # Elapsed handler time in milliseconds.
    duration_ms: int = 0
    # Session id that was used for this turn.
    session_id: Optional[str] = None
# === Mock data store ===
# TODO: replace with loading from the database later
# Placeholder agent records, indexed by each record's own "id" field.
_mock_agents = {
    record["id"]: record
    for record in (
        {
            "id": 1,
            "name": "数据分析助手",
            "role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。",
            "model_provider": "openai",
            "model_name": "gpt-4",
            "skills": [1, 2],
        },
        {
            "id": 2,
            "name": "代码审查助手",
            "role_description": "你是一个专业的代码审查助手擅长审查代码、发现bug。",
            "model_provider": "openai",
            "model_name": "gpt-4",
            "skills": [3],
        },
    )
}
def get_agent_config(agent_id: int) -> AgentConfig:
    """Build the AgentConfig for *agent_id* from the mock agent store.

    Raises:
        HTTPException: 404 when the store holds no (non-empty) record for
            the given id.
    """
    record = _mock_agents.get(agent_id)
    if record:
        required_keys = (
            "id",
            "name",
            "role_description",
            "model_provider",
            "model_name",
        )
        fields = {key: record[key] for key in required_keys}
        # "skills" is the only optional key; a missing entry means no skills.
        return AgentConfig(**fields, skills=record.get("skills", []))
    raise HTTPException(status_code=404, detail="Agent not found")
# === API routes ===
@app.get("/")
async def root():
    """Return the service banner: engine name and version."""
    banner = {"message": "X-Agents Python Engine", "version": "1.0.0"}
    return banner
@app.get("/health")
async def health():
    """Report a fixed "healthy" status payload."""
    status_payload = {"status": "healthy"}
    return status_payload
@app.post("/agent/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Run one chat turn against a single agent.

    Args:
        request: Agent id, message, user id and optional session id.

    Returns:
        ChatResponse carrying the agent's reply, its tool calls and token
        usage, the elapsed time in milliseconds and the session id used.

    Raises:
        HTTPException: 404 for an unknown agent; 400 when loading the agent
            configuration fails for any other reason; 500 when the agent
            run fails. An HTTPException raised by the run itself is passed
            through unchanged.
    """
    # Monotonic clock: the measured duration cannot be skewed by wall-clock
    # adjustments that happen while the request is in flight.
    started = time.perf_counter()

    # Load the agent configuration.
    try:
        config = get_agent_config(request.agent_id)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Create the agent instance.
    agent = AgentCore(config)

    # Generate a session id when the caller did not supply one. A random UUID
    # is used because a second-resolution timestamp gives the same id to every
    # request arriving within the same second.
    session_id = request.session_id or f"session_{uuid.uuid4().hex}"

    # Run the conversation turn.
    try:
        result = await agent.run(request.message, request.user_id, session_id)
    except HTTPException:
        # Keep a deliberate HTTP status instead of rewriting it to 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    duration_ms = int((time.perf_counter() - started) * 1000)
    return ChatResponse(
        agent_id=request.agent_id,
        response=result.content,
        tool_calls=result.tool_calls,
        tokens_used=result.tokens_used,
        duration_ms=duration_ms,
        session_id=session_id,
    )
@app.post("/agent/team/chat")
async def team_chat(request: TeamChatRequest):
    """Run one multi-agent (team) chat turn led by a supervisor agent.

    Args:
        request: Supervisor id, member ids, message, user id, optional
            session id and the scheduling strategy.

    Returns:
        A dict with the supervisor id, the main response, the sub-task
        results, the strategy reported by the supervisor run, the elapsed
        time in milliseconds and the session id used.

    Raises:
        HTTPException: 404 for an unknown supervisor agent; 400 when loading
            the supervisor configuration fails for any other reason or when
            no member agent could be loaded; 500 when the supervisor run
            fails. An HTTPException raised by the run itself is passed
            through unchanged.
    """
    # Monotonic clock: the measured duration cannot be skewed by wall-clock
    # adjustments that happen while the request is in flight.
    started = time.perf_counter()

    # Create the supervisor agent.
    try:
        supervisor_config = get_agent_config(request.supervisor_agent_id)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    supervisor_agent = AgentCore(supervisor_config)

    # Create the member agents.
    members = []
    for member_id in request.member_agent_ids:
        try:
            member_config = get_agent_config(member_id)
            members.append(AgentCore(member_config))
        except Exception:
            # Best effort: a member that cannot be loaded is skipped, but the
            # skip is logged instead of vanishing silently. Catching
            # Exception rather than using a bare except lets
            # KeyboardInterrupt/SystemExit propagate.
            logging.getLogger(__name__).warning(
                "Skipping member agent %s: could not be loaded",
                member_id,
                exc_info=True,
            )
            continue
    if not members:
        raise HTTPException(status_code=400, detail="No valid member agents")

    # Create the scheduler.
    supervisor = Supervisor(supervisor_agent, members, request.strategy)

    # Generate a session id when the caller did not supply one. A random UUID
    # is used because a second-resolution timestamp gives the same id to every
    # request arriving within the same second.
    session_id = request.session_id or f"team_session_{uuid.uuid4().hex}"

    # Run the team conversation turn.
    try:
        result = await supervisor.run(request.message, request.user_id, session_id)
    except HTTPException:
        # Keep a deliberate HTTP status instead of rewriting it to 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    duration_ms = int((time.perf_counter() - started) * 1000)
    return {
        "supervisor_agent_id": request.supervisor_agent_id,
        "response": result["main_response"],
        "subtask_results": result["subtask_results"],
        "strategy": result["strategy"],
        "duration_ms": duration_ms,
        "session_id": session_id,
    }
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces. The port comes
    # from AGENT_PORT and falls back to 8081.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("AGENT_PORT", "8081")))