Files
X-Agents/server/internal/service/agent_service.go
DESKTOP-72TV0V4\caoxiaozhu 31f0feafb5 feat: 增强会话管理和 Agent 服务
- 优化 session_handler 会话处理逻辑
- 增强 agent_service Agent 服务功能
- 新增 chat_repository 仓储方法
- 更新 agent_handler 和 chat_group_handler
- 更新数据模型 agent 和 chat_session

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 19:49:27 +08:00

501 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
"x-agents/server/internal/model"
"x-agents/server/internal/repository"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// AgentChatRequest is the JSON body posted to the Python agent service for a
// single-agent chat call.
type AgentChatRequest struct {
	// AgentID is a string so that UUID identifiers can be passed through.
	AgentID string `json:"agent_id"`
	// Message is the user's chat text.
	Message string `json:"message"`
	// UserID identifies the calling user.
	UserID int `json:"user_id"`
	// SessionID is omitted from the JSON when empty.
	SessionID string `json:"session_id,omitempty"`
	// ModelID selects a stored model configuration; omitted when empty.
	ModelID string `json:"model_id,omitempty"`
	// ModelName, ModelProvider, APIKey and BaseURL describe the model endpoint.
	// Each is omitted from the JSON when empty.
	ModelName     string `json:"model_name,omitempty"`
	ModelProvider string `json:"model_provider,omitempty"`
	APIKey        string `json:"api_key,omitempty"`
	BaseURL       string `json:"base_url,omitempty"`
	// UseXBot is always serialized (no omitempty) under the key "use_xbot".
	UseXBot bool `json:"use_xbot"`
}
// AgentChatResponse is the JSON body returned by the Python agent service for
// a single-agent chat call.
type AgentChatResponse struct {
	// AgentID is a string so that UUID identifiers can be carried.
	AgentID string `json:"agent_id"`
	// Response is the agent's reply text.
	Response string `json:"response"`
	// ToolCalls is decoded as untyped JSON values; its element schema is not
	// defined in this file.
	ToolCalls []interface{} `json:"tool_calls"`
	// TokensUsed is decoded from "tokens_used".
	TokensUsed int `json:"tokens_used"`
	// DurationMs is decoded from "duration_ms" (presumably milliseconds, per
	// the name).
	DurationMs int `json:"duration_ms"`
	// SessionID is decoded from "session_id".
	SessionID string `json:"session_id"`
}
// TeamChatRequest is the JSON body posted to the Python agent service for a
// multi-agent group chat.
type TeamChatRequest struct {
	// SupervisorAgentID is the integer ID of the supervising agent.
	SupervisorAgentID int `json:"supervisor_agent_id"`
	// MemberAgentIDs lists the integer IDs of the participating agents.
	MemberAgentIDs []int `json:"member_agent_ids"`
	// Message is the user's chat text.
	Message string `json:"message"`
	// UserID identifies the calling user.
	UserID int `json:"user_id"`
	// SessionID is omitted from the JSON when empty.
	SessionID string `json:"session_id,omitempty"`
	// Strategy is omitted from the JSON when empty.
	Strategy string `json:"strategy,omitempty"`
}
// TeamChatResponse is the JSON body returned by the Python agent service for a
// multi-agent group chat.
type TeamChatResponse struct {
	// SupervisorAgentID is the integer ID of the supervising agent.
	SupervisorAgentID int `json:"supervisor_agent_id"`
	// Response is the combined reply text.
	Response string `json:"response"`
	// SubtaskResults is decoded as untyped JSON values; its element schema is
	// not defined in this file.
	SubtaskResults []interface{} `json:"subtask_results"`
	// Strategy is decoded from "strategy".
	Strategy string `json:"strategy"`
	// DurationMs is decoded from "duration_ms" (presumably milliseconds, per
	// the name).
	DurationMs int `json:"duration_ms"`
	// SessionID is decoded from "session_id".
	SessionID string `json:"session_id"`
}
// AgentService talks to the Python agent service over HTTP and manages agent
// records through the repositories it holds.
type AgentService struct {
	// pythonURL is the base URL that the agent API paths are appended to.
	pythonURL string
	// client is the shared HTTP client used for non-streaming calls.
	client *http.Client
	// modelRepo resolves a model ID to its provider, name, API key and base URL.
	modelRepo *repository.ModelRepository
	// agentRepo stores agent records.
	agentRepo *repository.AgentRepository
	// chatRepo is used to remove an agent's sessions when the agent is deleted.
	chatRepo *repository.ChatRepository
}
// NewAgentService builds an AgentService that calls the Python agent service
// at pythonURL and uses the given repositories. The repositories are stored
// as-is; methods in this file check them for nil before use.
func NewAgentService(pythonURL string, modelRepo *repository.ModelRepository, agentRepo *repository.AgentRepository, chatRepo *repository.ChatRepository) *AgentService {
	// Agent calls can take a long time, so the shared client waits up to two
	// minutes before giving up.
	httpClient := &http.Client{Timeout: 120 * time.Second}

	svc := new(AgentService)
	svc.pythonURL = pythonURL
	svc.client = httpClient
	svc.modelRepo = modelRepo
	svc.agentRepo = agentRepo
	svc.chatRepo = chatRepo
	return svc
}
// Chat sends a single-agent chat request to the Python agent service and
// returns its decoded reply.
//
// When req.ModelID is set and a model repository is available, the stored
// model's provider, name, API key and base URL are copied into the request
// before it is forwarded. A failed or empty lookup is only logged; the request
// is still sent with whatever model fields it already carried.
//
// An error is returned when the request cannot be encoded or sent, when the
// response body cannot be read or decoded, or when the Python service answers
// with a status other than 200 OK (the response body is included in the error).
func (s *AgentService) Chat(req AgentChatRequest) (*AgentChatResponse, error) {
	// keyPreview limits how much of an API key reaches the logs: at most the
	// first 10 bytes. Keys of 10 bytes or fewer are returned unchanged, so
	// slicing can never go out of range.
	keyPreview := func(key string) string {
		if len(key) > 10 {
			return key[:10] + "..."
		}
		return key
	}

	log.Printf("[AgentService] Chat called, model_id: %s, modelRepo: %v", req.ModelID, s.modelRepo != nil)

	// If a model_id was supplied, look up the model configuration to obtain
	// its api_key and base_url.
	switch {
	case s.modelRepo == nil:
		log.Printf("[AgentService] WARNING: modelRepo is nil!")
	case req.ModelID != "":
		cfg, err := s.modelRepo.FindByID(req.ModelID)
		switch {
		case err != nil:
			log.Printf("[AgentService] Error finding model: %v", err)
		case cfg == nil:
			log.Printf("[AgentService] Model not found for id: %s", req.ModelID)
		default:
			log.Printf("[AgentService] Found model: id=%s, provider=%s, model=%s, base_url=%s, api_key_len=%d",
				cfg.ID, cfg.Provider, cfg.Model, cfg.BaseURL, len(cfg.APIKey))
			req.APIKey = cfg.APIKey
			req.BaseURL = cfg.BaseURL
			req.ModelProvider = cfg.Provider
			req.ModelName = cfg.Model
			// The key is previewed through keyPreview rather than sliced
			// directly, because a stored key may be shorter than 10 bytes.
			log.Printf("[AgentService] Set req.APIKey=%s, req.BaseURL=%s", keyPreview(req.APIKey), req.BaseURL)
		}
	}

	// Log what is about to be sent to Python, with the key truncated.
	log.Printf("[AgentService] Sending to Python: model_id=%s, api_key=%s, base_url=%s, provider=%s, model=%s",
		req.ModelID, keyPreview(req.APIKey), req.BaseURL, req.ModelProvider, req.ModelName)

	url := fmt.Sprintf("%s/api/v1/agent/chat", s.pythonURL)
	jsonData, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}

	httpReq, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonData))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := s.client.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("failed to call python agent: %w", err)
	}
	defer resp.Body.Close()

	// The body is read before the status check so that an error reply can be
	// reported verbatim.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("python agent error: %s", string(body))
	}

	var result AgentChatResponse
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}
	return &result, nil
}
// TeamChat sends a multi-agent group chat request to the Python agent service
// and returns its decoded reply.
//
// An empty req.Strategy is replaced with "parallel" before sending. An error
// is returned when the request cannot be encoded or sent, when the response
// body cannot be read or decoded, or when the Python service answers with a
// status other than 200 OK (the response body is included in the error).
func (s *AgentService) TeamChat(req TeamChatRequest) (*TeamChatResponse, error) {
	// Fall back to the default strategy when the caller did not choose one.
	if req.Strategy == "" {
		req.Strategy = "parallel"
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}

	endpoint := fmt.Sprintf("%s/api/v1/agent/team/chat", s.pythonURL)
	outbound, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	outbound.Header.Set("Content-Type", "application/json")

	reply, err := s.client.Do(outbound)
	if err != nil {
		return nil, fmt.Errorf("failed to call python agent: %w", err)
	}
	defer reply.Body.Close()

	// Read the whole body first so a non-200 reply can be reported verbatim.
	raw, err := io.ReadAll(reply.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response: %w", err)
	}
	if reply.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("python agent error: %s", string(raw))
	}

	decoded := new(TeamChatResponse)
	if err := json.Unmarshal(raw, decoded); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}
	return decoded, nil
}
// ChatStream proxies a streaming chat: it posts the chat request to the Python
// agent service's stream endpoint and relays the response bytes to the HTTP
// client as a server-sent event stream, flushing after every chunk.
//
// c must be a *gin.Context; any other type yields an error. When modelID is
// set and a model repository is available, the stored model's provider, name,
// API key and base URL are added to the outgoing request; a failed lookup is
// only logged.
//
// An error is returned when c has the wrong type, when the request cannot be
// encoded or sent, or when the Python service answers with a status other than
// 200 OK. All of these happen before anything is written to the client. Once
// relaying has started, read and write failures are logged and end the stream,
// and nil is returned.
func (s *AgentService) ChatStream(c interface{}, agentID string, message, sessionID, modelID string, userID int) error {
	ginCtx, ok := c.(*gin.Context)
	if !ok {
		return fmt.Errorf("invalid context type")
	}
	log.Printf("[ChatStream] Request: agentID=%s, message=%s, sessionID=%s, modelID=%s, userID=%d",
		agentID, message, sessionID, modelID, userID)

	// Base request body. agent_id is sent as a string so UUIDs are supported.
	reqBody := map[string]interface{}{
		"agent_id":   agentID,
		"message":    message,
		"user_id":    userID,
		"session_id": sessionID,
		"use_xbot":   false,
	}

	// If a model_id was supplied, look up the model configuration to obtain
	// its api_key and base_url.
	if modelID != "" && s.modelRepo != nil {
		model, err := s.modelRepo.FindByID(modelID)
		if err != nil {
			log.Printf("[ChatStream] Model not found: %s, error: %v", modelID, err)
		} else if model != nil {
			log.Printf("[ChatStream] Using model: provider=%s, model=%s, base_url=%s", model.Provider, model.Model, model.BaseURL)
			reqBody["model_provider"] = model.Provider
			reqBody["model_name"] = model.Model
			reqBody["api_key"] = model.APIKey
			reqBody["base_url"] = model.BaseURL
		}
	} else {
		log.Printf("[ChatStream] modelID is empty or modelRepo is nil: modelID=%s, modelRepo=%v", modelID, s.modelRepo != nil)
	}

	streamURL := fmt.Sprintf("%s/api/v1/agent/chat/stream", s.pythonURL)
	jsonData, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("failed to marshal request: %w", err)
	}

	// Bind the upstream call to the incoming request's context. This client
	// has no timeout, so without the binding a client that disconnects while
	// the Python side is idle would leave this call blocked in Read.
	httpReq, err := http.NewRequestWithContext(ginCtx.Request.Context(), http.MethodPost, streamURL, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "text/event-stream")
	httpReq.Header.Set("Cache-Control", "no-cache")

	// Dedicated client for streaming: keep-alives are disabled and no overall
	// Timeout is set, so the stream is not bound by the shared client's
	// 120-second limit.
	client := &http.Client{
		Transport: &http.Transport{
			DisableKeepAlives: true,
		},
	}
	resp, err := client.Do(httpReq)
	if err != nil {
		return fmt.Errorf("failed to call python agent: %w", err)
	}
	defer resp.Body.Close()

	// Report an upstream failure as an error, as Chat and TeamChat do, rather
	// than relaying the error body as if it were an event stream. Nothing has
	// been written to the client yet, so the caller can still choose the
	// response.
	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return fmt.Errorf("failed to read response: %w", readErr)
		}
		return fmt.Errorf("python agent error: %s", string(body))
	}

	// SSE response headers. X-Accel-Buffering: no asks buffering reverse
	// proxies such as nginx to pass chunks through immediately.
	ginCtx.Header("Content-Type", "text/event-stream")
	ginCtx.Header("Cache-Control", "no-cache")
	ginCtx.Header("Connection", "keep-alive")
	ginCtx.Header("X-Accel-Buffering", "no")

	// Resolve the flusher once instead of on every chunk.
	flusher, canFlush := ginCtx.Writer.(interface{ Flush() })

	// Relay in small chunks to keep latency low.
	buf := make([]byte, 1024)
	for {
		n, readErr := resp.Body.Read(buf)
		if n > 0 {
			log.Printf("[ChatStream] Received %d bytes from Python", n)
			if _, writeErr := ginCtx.Writer.Write(buf[:n]); writeErr != nil {
				log.Printf("[ChatStream] Write error: %v", writeErr)
				break
			}
			// Push the chunk to the client immediately.
			if canFlush {
				flusher.Flush()
			}
		}
		if readErr != nil {
			// io.EOF is the normal end of stream. Other errors also end the
			// relay; they are only logged because the response has already
			// started.
			log.Printf("[ChatStream] Done reading from Python, err: %v", readErr)
			break
		}
	}
	return nil
}
// CreateAgentRequest is the JSON payload accepted when creating an agent.
type CreateAgentRequest struct {
	// Name, Description and Avatar are the agent's display fields.
	Name        string `json:"name"`
	Description string `json:"description"`
	Avatar      string `json:"avatar"`
	// SkillsMode is read from the camel-case key "skillsMode". The value "all"
	// selects every skill; any other value means Skills is used as given.
	SkillsMode string `json:"skillsMode"`
	// Skills is the explicit skill list.
	Skills []string `json:"skills"`
	// Knowledge is read from "knowledge".
	Knowledge string `json:"knowledge"`
	// Prompt is read from "prompt".
	Prompt string `json:"prompt"`
	// ModelProvider and ModelName name the model the agent should use.
	ModelProvider string `json:"model_provider"`
	ModelName     string `json:"model_name"`
	// UserID is read from "user_id".
	UserID int `json:"user_id"`
}
// CreateAgentResponse is the JSON payload returned after an agent is created.
type CreateAgentResponse struct {
	// AgentID is the integer ID, kept for backward compatibility.
	AgentID int `json:"agent_id"`
	// AgentIDStr carries the actual UUID of the agent.
	AgentIDStr string `json:"agent_id_str"`
	// Name is the agent's name.
	Name string `json:"name"`
	// Message is a human-readable status message.
	Message string `json:"message"`
}
// CreateAgent stores a new agent record and returns its generated UUID.
//
// The owner is taken from the userID argument (formatted as a decimal string),
// not from req.UserID. req.Knowledge is not stored by this method. The
// response sets AgentIDStr, Name and Message; the integer AgentID is left at
// zero.
//
// An error is returned when the agent repository is not configured or when
// the repository's Create call fails.
func (s *AgentService) CreateAgent(req CreateAgentRequest, userID int) (*CreateAgentResponse, error) {
	if s.agentRepo == nil {
		log.Printf("[AgentService] CreateAgent: agentRepo is nil!")
		return nil, fmt.Errorf("agent repository not initialized")
	}
	log.Printf("[AgentService] CreateAgent: %s (userID: %d), skillsMode: %s, skills: %v", req.Name, userID, req.SkillsMode, req.Skills)

	// "all" mode is stored as the single wildcard entry "*"; every other mode
	// keeps the caller's explicit list.
	selectedSkills := req.Skills
	if req.SkillsMode == "all" {
		selectedSkills = []string{"*"}
	}

	// Build the record that will be saved to the database.
	record := &model.Agent{
		ID:              uuid.New().String(),
		Name:            req.Name,
		Description:     req.Description,
		OwnerID:         fmt.Sprintf("%d", userID),
		Avatar:          req.Avatar,
		Skills:          selectedSkills,
		RoleDescription: req.Prompt,
		ModelProvider:   req.ModelProvider,
		ModelName:       req.ModelName,
		IsActive:        true,
		CreatedAt:       time.Now(),
		UpdatedAt:       time.Now(),
	}
	log.Printf("[AgentService] CreateAgent: calling Create, agent = %+v", record)

	createErr := s.agentRepo.Create(record)
	if createErr != nil {
		log.Printf("[AgentService] CreateAgent: Create error: %v", createErr)
		return nil, fmt.Errorf("failed to create agent: %w", createErr)
	}
	log.Printf("[AgentService] CreateAgent: created successfully")
	log.Printf("[AgentService] Agent created in database: %s (ID: %s)", record.Name, record.ID)

	// Return the agent ID actually stored in the database (a UUID string).
	out := &CreateAgentResponse{
		AgentIDStr: record.ID,
		Name:       record.Name,
		Message:    "Agent created successfully",
	}
	return out, nil
}
// ListAgentsResponse is the JSON payload returned when listing agents.
type ListAgentsResponse struct {
	// Agents holds one entry per agent, stored as untyped values.
	Agents []interface{} `json:"agents"`
}
// ListAgents returns every agent known to the agent repository (read from the
// database), each wrapped as an untyped value, and logs each entry.
//
// An error is returned when the agent repository is not configured or when
// the repository's FindAll call fails.
func (s *AgentService) ListAgents() (*ListAgentsResponse, error) {
	if s.agentRepo == nil {
		log.Printf("[AgentService] ListAgents: agentRepo is nil!")
		return nil, fmt.Errorf("agent repository not initialized")
	}

	log.Printf("[AgentService] ListAgents: calling FindAll")
	found, findErr := s.agentRepo.FindAll()
	if findErr != nil {
		log.Printf("[AgentService] ListAgents: FindAll error: %v", findErr)
		return nil, fmt.Errorf("failed to list agents: %w", findErr)
	}
	log.Printf("[AgentService] ListAgents: found %d agents", len(found))

	// Re-box the typed results into an []interface{} for the response.
	boxed := make([]interface{}, 0, len(found))
	for idx := range found {
		boxed = append(boxed, found[idx])
		log.Printf("[AgentService] ListAgents: agent[%d] = %+v", idx, found[idx])
	}

	out := new(ListAgentsResponse)
	out.Agents = boxed
	return out, nil
}
// UpdateAgentStatus sets the IsActive flag of the agent with the given ID and
// refreshes its UpdatedAt timestamp.
//
// An error is returned when the agent repository is not configured, when the
// lookup by ID fails, or when saving the change fails.
func (s *AgentService) UpdateAgentStatus(agentID string, isActive bool) error {
	if s.agentRepo == nil {
		return fmt.Errorf("agent repository not initialized")
	}
	log.Printf("[AgentService] UpdateAgentStatus: id=%s, isActive=%v", agentID, isActive)

	// Load the existing record; a failed lookup is reported as "not found".
	existing, lookupErr := s.agentRepo.FindByID(agentID)
	if lookupErr != nil {
		return fmt.Errorf("agent not found: %w", lookupErr)
	}

	existing.IsActive = isActive
	existing.UpdatedAt = time.Now()

	saveErr := s.agentRepo.Update(existing)
	if saveErr != nil {
		return fmt.Errorf("failed to update agent: %w", saveErr)
	}

	log.Printf("[AgentService] Agent status updated: id=%s, isActive=%v", agentID, isActive)
	return nil
}
// DeleteAgent removes the agent with the given ID, first removing the agent's
// chat sessions through the chat repository when one is configured.
//
// A failure to delete the sessions is only logged and the agent is still
// deleted. An error is returned when the agent repository is not configured,
// when the lookup by ID fails, or when deleting the agent itself fails.
func (s *AgentService) DeleteAgent(agentID string) error {
	if s.agentRepo == nil {
		return fmt.Errorf("agent repository not initialized")
	}
	log.Printf("[AgentService] DeleteAgent: id=%s", agentID)

	// Confirm the agent exists before touching anything.
	if _, lookupErr := s.agentRepo.FindByID(agentID); lookupErr != nil {
		return fmt.Errorf("agent not found: %w", lookupErr)
	}

	// Remove the agent's sessions and messages first. This is best-effort: a
	// failure here must not stop the agent itself from being deleted.
	if s.chatRepo != nil {
		sessionErr := s.chatRepo.DeleteSessionsByAgentID(agentID)
		if sessionErr != nil {
			log.Printf("[AgentService] DeleteAgent: failed to delete sessions: %v", sessionErr)
		}
	}

	deleteErr := s.agentRepo.Delete(agentID)
	if deleteErr != nil {
		return fmt.Errorf("failed to delete agent: %w", deleteErr)
	}

	log.Printf("[AgentService] Agent deleted: id=%s", agentID)
	return nil
}
// UpdateAgent applies a partial update to the agent with the given ID.
//
// Each string argument overwrites its field only when it is non-empty, and
// skills overwrites the skill list only when it is non-nil (an empty non-nil
// slice clears the list). UpdatedAt is always refreshed.
//
// An error is returned when the agent repository is not configured, when the
// lookup by ID fails, or when saving the change fails.
func (s *AgentService) UpdateAgent(agentID, name, description, avatar string, skills []string, roleDescription, modelProvider, modelName string) error {
	if s.agentRepo == nil {
		return fmt.Errorf("agent repository not initialized")
	}
	log.Printf("[AgentService] UpdateAgent: id=%s, name=%s", agentID, name)

	// Load the existing record; a failed lookup is reported as "not found".
	existing, lookupErr := s.agentRepo.FindByID(agentID)
	if lookupErr != nil {
		return fmt.Errorf("agent not found: %w", lookupErr)
	}

	// Pair each optional string argument with the field it overwrites; empty
	// values mean "leave unchanged".
	stringUpdates := []struct {
		value string
		field *string
	}{
		{name, &existing.Name},
		{description, &existing.Description},
		{avatar, &existing.Avatar},
		{roleDescription, &existing.RoleDescription},
		{modelProvider, &existing.ModelProvider},
		{modelName, &existing.ModelName},
	}
	for _, upd := range stringUpdates {
		if upd.value != "" {
			*upd.field = upd.value
		}
	}
	if skills != nil {
		existing.Skills = skills
	}
	existing.UpdatedAt = time.Now()

	saveErr := s.agentRepo.Update(existing)
	if saveErr != nil {
		return fmt.Errorf("failed to update agent: %w", saveErr)
	}

	log.Printf("[AgentService] Agent updated: id=%s", agentID)
	return nil
}