""" FastAPI Agent Engine Server """ import os import sys import time import logging from datetime import datetime from typing import Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware from app.agent.core import AgentCore, Supervisor, AgentConfig from app.agent.llm import LLMFactory # 日志目录 - 放在 server/logs 下 LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "server", "logs", datetime.now().strftime("%Y-%m-%d")) os.makedirs(LOG_DIR, exist_ok=True) # 成功/失败日志文件 today = datetime.now().strftime("%Y-%m-%d") success_log_file = os.path.join(LOG_DIR, f"python_success.log") failure_log_file = os.path.join(LOG_DIR, f"python_failure.log") def setup_logging(): """配置日志系统""" log_level = os.getenv("LOG_LEVEL", "INFO") # 创建格式化器 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 控制台处理器 console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) # 成功日志文件处理器 success_handler = logging.FileHandler(success_log_file, encoding='utf-8') success_handler.setFormatter(formatter) success_handler.setLevel(logging.INFO) # 失败日志文件处理器 failure_handler = logging.FileHandler(failure_log_file, encoding='utf-8') failure_handler.setFormatter(formatter) failure_handler.setLevel(logging.WARNING) # 根日志配置 root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, log_level)) root_logger.addHandler(console_handler) root_logger.addHandler(success_handler) root_logger.addHandler(failure_handler) return root_logger # 成功日志记录器(只记录 INFO 级别到成功日志) class SuccessLogger: """成功日志记录器""" @staticmethod def log(message: str): """记录成功日志""" logger = logging.getLogger("success") logger.setLevel(logging.INFO) handler = logging.FileHandler(success_log_file, encoding='utf-8') handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) logger.addHandler(handler) logger.info(message) # 同时输出到控制台 print(f"✅ {message}") # 失败日志记录器 class FailureLogger: """失败日志记录器""" @staticmethod def log(message: str, error: str = ""): """记录失败日志""" logger = logging.getLogger("failure") logger.setLevel(logging.WARNING) handler = logging.FileHandler(failure_log_file, encoding='utf-8') handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s - %(error)s', datefmt='%Y-%m-%d %H:%M:%S')) logger.addHandler(handler) full_message = f"{message} - Error: {error}" if error else message logger.warning(full_message) # 同时输出到控制台 print(f"❌ {full_message}") logger = setup_logging() app = FastAPI(title="X-Agents Python Engine", version="1.0.0") # CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # === 请求/响应模型 === class ChatRequest(BaseModel): """对话请求""" agent_id: int message: str user_id: int = 1 session_id: Optional[str] = None # 模型参数(可选,如果传了就使用,否则用智能体配置的默认模型) model_id: Optional[str] = None model_name: Optional[str] = None model_provider: Optional[str] = None api_key: Optional[str] = None base_url: Optional[str] = None class TeamChatRequest(BaseModel): """多智能体群聊请求""" supervisor_agent_id: int member_agent_ids: list[int] message: str user_id: int = 1 session_id: Optional[str] = None strategy: str = "parallel" class ChatResponse(BaseModel): """对话响应""" agent_id: int response: str tool_calls: list = [] tokens_used: int = 0 duration_ms: int = 0 session_id: Optional[str] = None # === 模拟数据存储 === # TODO: 后续替换为从数据库加载 _mock_agents = { 1: { "id": 1, "name": "数据分析助手", "role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。", "model_provider": "openai", "model_name": "gpt-4", "skills": [1, 2] }, 2: { "id": 2, "name": "代码审查助手", "role_description": "你是一个专业的代码审查助手,擅长审查代码、发现bug。", "model_provider": "openai", "model_name": "gpt-4", "skills": [3] } } def get_agent_config(agent_id: int, api_key: str = None, base_url: str = None) -> AgentConfig: """获取智能体配置""" agent_data = _mock_agents.get(agent_id) if not agent_data: raise HTTPException(status_code=404, detail="Agent not found") return AgentConfig( id=agent_data["id"], name=agent_data["name"], role_description=agent_data["role_description"], model_provider=agent_data["model_provider"], model_name=agent_data["model_name"], api_key=api_key, base_url=base_url, skills=agent_data.get("skills", []) ) # === API 路由 === @app.get("/") async def root(): return {"message": "X-Agents Python Engine", "version": "1.0.0"} @app.get("/health") async def health(): return {"status": "healthy"} @app.post("/agent/chat", response_model=ChatResponse) async def chat(request: ChatRequest): """ 单智能体对话 """ chat_logger = logging.getLogger("agent.chat") # 打印请求参数(隐藏 api_key 敏感信息) api_key_preview = f"{request.api_key[:10]}..." if request.api_key else "None" chat_logger.info(f"========== 收到聊天请求 ==========") chat_logger.info(f"agent_id: {request.agent_id}") chat_logger.info(f"model_id: {request.model_id}") chat_logger.info(f"model_provider: {request.model_provider}") chat_logger.info(f"model_name: {request.model_name}") chat_logger.info(f"api_key: {api_key_preview}") chat_logger.info(f"base_url: {request.base_url}") chat_logger.info(f"message: {request.message[:50]}...") start_time = time.time() # 获取智能体配置 try: config = get_agent_config(request.agent_id, request.api_key, request.base_url) chat_logger.info(f"Agent config loaded: provider={config.model_provider}, model={config.model_name}") except HTTPException as e: FailureLogger.log(f"Agent not found: agent_id={request.agent_id}", str(e)) chat_logger.error(f"Agent not found: {e}") raise except Exception as e: FailureLogger.log(f"Error loading config: agent_id={request.agent_id}", str(e)) chat_logger.error(f"Error loading config: {e}") raise HTTPException(status_code=400, detail=str(e)) # 如果请求中指定了模型,覆盖智能体的默认配置 if request.model_provider: config.model_provider = request.model_provider if request.model_name: config.model_name = request.model_name chat_logger.info(f"Final LLM config: provider={config.model_provider}, model={config.model_name}, api_key={config.api_key[:10] if config.api_key else 'None'}..., base_url={config.base_url}") # 创建智能体实例 agent = AgentCore(config) # 生成 session_id session_id = request.session_id or f"session_{int(time.time())}" # 执行对话 try: result = await agent.run(request.message, request.user_id, session_id) except Exception as e: FailureLogger.log(f"Agent execution failed: agent_id={request.agent_id}, message={request.message[:30]}", str(e)) chat_logger.error(f"Agent execution error: {e}") raise HTTPException(status_code=500, detail=str(e)) duration_ms = int((time.time() - start_time) * 1000) # 记录成功日志 SuccessLogger.log(f"Chat success: agent_id={request.agent_id}, duration={duration_ms}ms, message={request.message[:30]}...") return ChatResponse( agent_id=request.agent_id, response=result.content, tool_calls=result.tool_calls, tokens_used=result.tokens_used, duration_ms=duration_ms, session_id=session_id ) @app.post("/agent/team/chat") async def team_chat(request: TeamChatRequest): """ 多智能体群聊 """ start_time = time.time() # 创建主智能体 try: supervisor_config = get_agent_config(request.supervisor_agent_id) except HTTPException: raise except Exception as e: raise HTTPException(status_code=400, detail=str(e)) supervisor_agent = AgentCore(supervisor_config) # 创建子智能体 members = [] for member_id in request.member_agent_ids: try: member_config = get_agent_config(member_id) members.append(AgentCore(member_config)) except: continue if not members: raise HTTPException(status_code=400, detail="No valid member agents") # 创建调度器 supervisor = Supervisor(supervisor_agent, members, request.strategy) # 生成 session_id session_id = request.session_id or f"team_session_{int(time.time())}" # 执行群聊 try: result = await supervisor.run(request.message, request.user_id, session_id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) duration_ms = int((time.time() - start_time) * 1000) return { "supervisor_agent_id": request.supervisor_agent_id, "response": result["main_response"], "subtask_results": result["subtask_results"], "strategy": result["strategy"], "duration_ms": duration_ms, "session_id": session_id } if __name__ == "__main__": import uvicorn port = int(os.getenv("AGENT_PORT", "8081")) uvicorn.run(app, host="0.0.0.0", port=port)