// File: X-Agents/server/cmd/api/main.go (658 lines, 20 KiB, Go)

package main
import (
"bytes"
"encoding/json"
"io"
"log"
"os"
"path/filepath"
"time"
"x-agents/server/internal/config"
"x-agents/server/internal/handler"
"x-agents/server/internal/middleware"
"x-agents/server/internal/model"
"x-agents/server/internal/repository"
"x-agents/server/internal/service"
"github.com/gin-gonic/gin"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
)
// Logger records one line per HTTP request into one of two destinations:
// requests that finished with a status below 400 go to the success log,
// all others go to the failure log.
type Logger struct {
	successLog *log.Logger
	errorLog   *log.Logger
}

// openLogFile opens name inside dir for appending, creating it if needed.
// When the file cannot be opened the problem is reported on the standard
// logger and os.Stderr is returned instead, so request logging keeps working
// rather than silently writing to an invalid file handle.
func openLogFile(dir, name string) *os.File {
	f, err := os.OpenFile(filepath.Join(dir, name), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Printf("Warning: cannot open log file %s: %v (falling back to stderr)", filepath.Join(dir, name), err)
		return os.Stderr
	}
	return f
}

// NewLogger creates the directory logs/<YYYY-MM-DD> (relative to the current
// working directory, using today's date) and returns a Logger that appends to
// success.log and failure.log inside it.
//
// The date is evaluated once, here; the returned Logger keeps writing to the
// same two files for its whole lifetime. Failures to create the directory or
// open a file are logged as warnings and the affected stream falls back to
// stderr; NewLogger never returns nil.
func NewLogger() *Logger {
	logDir := filepath.Join("logs", time.Now().Format("2006-01-02"))
	if err := os.MkdirAll(logDir, 0755); err != nil {
		log.Printf("Warning: cannot create log directory %s: %v", logDir, err)
	}

	const flags = log.Ldate | log.Ltime | log.Lshortfile
	return &Logger{
		successLog: log.New(openLogFile(logDir, "success.log"), "", flags),
		errorLog:   log.New(openLogFile(logDir, "failure.log"), "", flags),
	}
}

// LogRequest writes a single entry of the form
// "[method] path body status duration". Entries with status >= 400 are sent
// to the failure log, everything else to the success log.
func (l *Logger) LogRequest(method, path, body string, status int, duration time.Duration) {
	target := l.successLog
	if status >= 400 {
		target = l.errorLog
	}
	target.Printf("[%s] %s %s %d %v", method, path, body, status, duration)
}

// logger is the process-wide request logger. It is assigned at the start of
// main and read by the request-logging middleware.
var logger *Logger
// initDefaultAdmin seeds the built-in "admin" account and its "admin" role.
//
// It first makes sure the skills workspace directories exist, then returns
// early if a lookup of the username "admin" succeeds. Otherwise it registers
// the user through AuthService, sets the user's RoleID to "admin", and creates
// a role carrying the read, write, execute and admin permissions.
//
// NOTE(review): the account is registered with the password "admin"; it should
// be changed after first start-up.
// NOTE(review): any lookup error (not only "not found") leads to a
// registration attempt, and the results of Update and CreateRole are not
// inspected — their signatures are not visible from this file.
func initDefaultAdmin(userRepo *repository.UserRepository) {
	// The workspace directories are prepared regardless of whether the user
	// already exists.
	ensureAdminWorkspace()

	// A successful lookup means there is nothing left to seed.
	if _, lookupErr := userRepo.FindByUsername("admin"); lookupErr == nil {
		log.Println("Admin user already exists")
		return
	}

	// Register through AuthService rather than writing the row directly; the
	// original author notes that Register encrypts the password (not visible
	// here — TODO confirm).
	registrar := service.NewAuthService("", userRepo)
	admin, regErr := registrar.Register("admin", "admin", "admin@example.com")
	if regErr != nil {
		log.Printf("Failed to create admin user: %v", regErr)
		return
	}

	// Promote the freshly registered user to the admin role.
	admin.RoleID = "admin"
	userRepo.Update(admin)

	// Create the admin role itself; permissions are stored as a JSON array of
	// the integer permission values.
	grants := []int{
		int(model.PermissionRead),
		int(model.PermissionWrite),
		int(model.PermissionExecute),
		int(model.PermissionAdmin),
	}
	encoded, _ := json.Marshal(grants)
	userRepo.CreateRole(&model.Role{
		ID:          "admin",
		Name:        "admin",
		Permissions: string(encoded),
	})

	log.Printf("Default admin user created: id=%s, username=admin", admin.ID)
}
// maxRootSearchDepth is how many ancestors of the starting directory are
// probed for a .git entry when locating the project root.
const maxRootSearchDepth = 3

// findProjectRoot returns the nearest directory — start itself or one of its
// first maxRootSearchDepth ancestors — that contains a .git entry. When none
// of them does, start is returned unchanged, so a missing repository never
// moves the root to an unrelated ancestor directory.
func findProjectRoot(start string) string {
	dir := start
	for depth := 0; depth <= maxRootSearchDepth; depth++ {
		if _, err := os.Stat(filepath.Join(dir, ".git")); err == nil {
			return dir
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// Reached the filesystem root.
			break
		}
		dir = parent
	}
	return start
}

// ensureAdminWorkspace makes sure the skills directory tree
// <projectRoot>/core/agents/skills/{system,user} exists.
//
// The project root is derived from the current working directory: if that
// directory is named "server" its parent is used as the starting point, and
// from there the nearest directory (within maxRootSearchDepth levels) holding
// a .git entry is preferred. Failures are logged as warnings; the function
// never aborts the process.
func ensureAdminWorkspace() {
	workDir, err := os.Getwd()
	if err != nil {
		log.Printf("Warning: cannot determine working directory: %v (using \".\")", err)
		workDir = "."
	}

	// When started from inside the server directory, step up one level first.
	candidate := workDir
	if filepath.Base(workDir) == "server" {
		candidate = filepath.Dir(workDir)
	}
	projectRoot := findProjectRoot(candidate)

	// Create the skills layout: core/agents/skills/{system,user}.
	skillsRoot := filepath.Join(projectRoot, "core", "agents", "skills")
	allCreated := true
	for _, dir := range []string{"system", "user"} {
		if err := os.MkdirAll(filepath.Join(skillsRoot, dir), 0755); err != nil {
			log.Printf("Warning: failed to create skills directory %s: %v", dir, err)
			allCreated = false
		}
	}
	// Only claim success when every directory is actually in place.
	if allCreated {
		log.Printf("Skills workspace created at: %s", skillsRoot)
	}
}
// main wires the API server together and runs it:
//
//  1. load configuration and open the database,
//  2. migrate the schema (GORM auto-migration, raw DDL as a safety net, and
//     a handful of explicit column additions),
//  3. build repositories, seed the default admin, build services and handlers,
//  4. install middleware and register every HTTP route on a gin engine,
//  5. listen on cfg.Port.
//
// Only a failure to open the database or to start the listener is fatal;
// every other start-up problem is logged as a warning.
func main() {
	// The request logger must exist before the logging middleware can run.
	logger = NewLogger()

	// 1. Load configuration.
	cfg := config.Load()
	log.Printf("=== Server starting, port=%s ===", cfg.Port)

	// 2. Open the database.
	db, err := config.InitDB(cfg)
	if err != nil {
		log.Fatalf("Failed to connect database: %v", err)
	}

	// 3. Auto-migrate all model tables.
	//
	// AutoMigrate's result is checked directly as an error value. The previous
	// code selected `.Error` on it; with GORM v2 (implied by db.Migrator()
	// below) that is the error's Error method value, which panics when the
	// migration succeeds and prints a function pointer when it fails.
	if err := db.AutoMigrate(
		&model.DatabaseInfo{},
		&model.SubTableInfo{},
		&model.ModelInfo{},
		&model.KnowledgeBase{},
		&model.KnowledgeDocument{},
		&model.User{},
		&model.Role{},
		&model.Tool{},
		&model.MCP{},
		&model.Skill{},
		&model.Agent{},
		&model.AgentSkill{},
		&model.AgentKnowledgeBase{},
		&model.AgentMemory{},
		&model.AgentTeam{},
		&model.AgentTask{},
		&model.ChatSession{},
		&model.ChatMessage{},
		&model.ChatGroup{},
	); err != nil {
		log.Printf("Warning: AutoMigrate error: %v", err)
	}

	// 3.1 Safety net: force-create the core tables with raw SQL in case the
	// auto-migration above did not. Statements run in the listed order; a
	// failure is reported and does not stop the remaining statements.
	ddl := []struct {
		table    string // table name, used in warnings
		stmt     string // CREATE TABLE IF NOT EXISTS statement
		verified string // message logged on success ("" = stay quiet)
	}{
		{"agents", `
			CREATE TABLE IF NOT EXISTS agents (
				id VARCHAR(191) PRIMARY KEY,
				name VARCHAR(100) NOT NULL,
				description TEXT,
				owner_id VARCHAR(50) NOT NULL,
				INDEX idx_owner_id (owner_id),
				skills TEXT,
				role_description TEXT,
				model_provider VARCHAR(50),
				model_name VARCHAR(100),
				is_supervisor TINYINT(1) DEFAULT 0,
				is_active TINYINT(1) DEFAULT 1,
				created_at DATETIME(3),
				updated_at DATETIME(3)
			)
		`, ""},
		{"roles", `
			CREATE TABLE IF NOT EXISTS roles (
				id VARCHAR(191) PRIMARY KEY,
				name VARCHAR(191) UNIQUE,
				permissions TEXT,
				created_at DATETIME(3),
				updated_at DATETIME(3)
			)
		`, ""},
		{"users", `
			CREATE TABLE IF NOT EXISTS users (
				id VARCHAR(191) PRIMARY KEY,
				username VARCHAR(50) UNIQUE NOT NULL,
				password VARCHAR(191) NOT NULL,
				email VARCHAR(191),
				role_id VARCHAR(50) NOT NULL,
				is_active BOOLEAN DEFAULT TRUE,
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_users_username (username),
				INDEX idx_users_email (email)
			)
		`, ""},
		{"tools", `
			CREATE TABLE IF NOT EXISTS tools (
				id VARCHAR(191) PRIMARY KEY,
				name VARCHAR(100) UNIQUE NOT NULL,
				description TEXT,
				category VARCHAR(50) NOT NULL,
				provider VARCHAR(100),
				security_level VARCHAR(20) DEFAULT 'safe',
				require_approval BOOLEAN DEFAULT FALSE,
				parameters TEXT,
				status VARCHAR(20) DEFAULT 'active',
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_tools_name (name),
				INDEX idx_tools_category (category),
				INDEX idx_tools_status (status)
			)
		`, ""},
		{"mcp", `
			CREATE TABLE IF NOT EXISTS mcp (
				id VARCHAR(191) PRIMARY KEY,
				name VARCHAR(100) UNIQUE NOT NULL,
				description TEXT,
				description_cn TEXT,
				category VARCHAR(50) NOT NULL,
				transport VARCHAR(20) DEFAULT 'stdio',
				command VARCHAR(500),
				args TEXT,
				env TEXT,
				status VARCHAR(20) DEFAULT 'active',
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_mcp_name (name),
				INDEX idx_mcp_category (category),
				INDEX idx_mcp_status (status)
			)
		`, "MCP table verified/created"},
		{"skills", `
			CREATE TABLE IF NOT EXISTS skills (
				id VARCHAR(191) PRIMARY KEY,
				skill_name VARCHAR(200) NOT NULL,
				skill_type VARCHAR(20) NOT NULL,
				skill_desc TEXT,
				path VARCHAR(500),
				status VARCHAR(20) DEFAULT 'active',
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_skills_name (skill_name),
				INDEX idx_skills_type (skill_type),
				INDEX idx_skills_status (status)
			)
		`, "Skills table verified/created"},
		{"chat_sessions", `
			CREATE TABLE IF NOT EXISTS chat_sessions (
				id VARCHAR(36) PRIMARY KEY,
				user_id VARCHAR(36) NOT NULL,
				agent_id VARCHAR(36),
				title VARCHAR(255),
				model_id VARCHAR(36),
				status VARCHAR(20) DEFAULT 'active',
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_chat_sessions_user (user_id),
				INDEX idx_chat_sessions_agent (agent_id),
				INDEX idx_chat_sessions_updated (updated_at DESC)
			)
		`, "Chat sessions table verified/created"},
		{"chat_messages", `
			CREATE TABLE IF NOT EXISTS chat_messages (
				id VARCHAR(36) PRIMARY KEY,
				session_id VARCHAR(36) NOT NULL,
				role VARCHAR(20),
				content TEXT,
				tokens_used INT DEFAULT 0,
				duration_ms INT DEFAULT 0,
				metadata TEXT,
				created_at DATETIME(3),
				INDEX idx_chat_messages_session (session_id),
				INDEX idx_chat_messages_created (created_at ASC)
			)
		`, "Chat messages table verified/created"},
		{"chat_groups", `
			CREATE TABLE IF NOT EXISTS chat_groups (
				id VARCHAR(36) PRIMARY KEY,
				user_id VARCHAR(36) NOT NULL,
				name VARCHAR(100) NOT NULL,
				description TEXT,
				agent_ids TEXT,
				status VARCHAR(20) DEFAULT 'active',
				created_at DATETIME(3),
				updated_at DATETIME(3),
				INDEX idx_chat_groups_user (user_id)
			)
		`, "Chat groups table verified/created"},
	}
	for _, d := range ddl {
		if err := db.Exec(d.stmt).Error; err != nil {
			log.Printf("Warning: failed to ensure table %s: %v", d.table, err)
			continue
		}
		if d.verified != "" {
			log.Println(d.verified)
		}
	}

	// 3.2 Add columns that older deployments may be missing.
	//
	// NOTE(review): description_cn, transport, command, args and env appear in
	// the mcp DDL above but are added to model.Tool here; confirm whether
	// model.MCP was intended.
	migrator := db.Migrator()
	columns := []struct {
		model interface{}
		field string
	}{
		{&model.Skill{}, "created_by"},
		{&model.Tool{}, "security_level"},
		{&model.Tool{}, "require_approval"},
		{&model.Tool{}, "parameters"},
		{&model.Tool{}, "description_cn"},
		// MCP-related fields.
		{&model.Tool{}, "transport"},
		{&model.Tool{}, "command"},
		{&model.Tool{}, "args"},
		{&model.Tool{}, "env"},
	}
	for _, col := range columns {
		if migrator.HasColumn(col.model, col.field) {
			continue
		}
		if err := migrator.AddColumn(col.model, col.field); err != nil {
			log.Printf("Warning: failed to add column %s to %T: %v", col.field, col.model, err)
		}
	}
	log.Println("Database tables verified/created")

	// 4. Repositories.
	dbRepo := repository.NewDatabaseRepository(db)
	subTableRepo := repository.NewSubTableRepository(db)
	modelRepo := repository.NewModelRepository(db)
	knowledgeRepo := repository.NewKnowledgeRepository(db)
	userRepo := repository.NewUserRepository(db)
	toolRepo := repository.NewToolRepository(db)
	mcpRepo := repository.NewMCPRepository(db)
	skillRepo := repository.NewSkillRepository(db)
	agentRepo := repository.NewAgentRepository(db)
	chatRepo := repository.NewChatRepository(db)

	// 4.1 Seed the default admin user.
	initDefaultAdmin(userRepo)

	// 5. Services.
	dbService := service.NewDatabaseService(dbRepo, subTableRepo)
	subTableService := service.NewSubTableService(subTableRepo, dbRepo)
	neo4jService := service.NewNeo4jService(dbRepo)
	modelService := service.NewModelService(modelRepo)
	// The upload service is optional: on failure the server still starts, but
	// the file routes at the bottom of this function are not registered.
	uploadService, err := service.NewUploadService(cfg)
	if err != nil {
		log.Printf("Warning: Failed to initialize upload service: %v (files will not be available)", err)
	}
	knowledgeService := service.NewKnowledgeService(knowledgeRepo, modelRepo, uploadService, cfg.PythonServiceURL, cfg.AICoreServiceAddr, cfg.MarkdownLocalPath)
	authService := service.NewAuthService(cfg.JWTSecret, userRepo)
	toolService := service.NewToolService(toolRepo)
	mcpService := service.NewMCPService(mcpRepo)
	skillService := service.NewSkillService(skillRepo)
	agentService := service.NewAgentService(cfg.PythonServiceURL, modelRepo, agentRepo)
	memoryService := service.NewMemoryService(agentRepo)

	// 5.1 Seed the default tools.
	if err := toolService.InitDefaultTools(); err != nil {
		log.Printf("Warning: Failed to init default tools: %v", err)
	} else {
		log.Println("Default tools initialized")
	}

	// 5.2 Automatic skill loading is disabled; call the /skill/sync endpoint
	// to load skills on demand. To re-enable loading at start-up:
	// if err := skillService.InitSkills(); err != nil {
	// 	log.Printf("Warning: Failed to init skills: %v", err)
	// } else {
	// 	log.Println("Skills initialized")
	// }

	// 6. Handlers.
	dbHandler := handler.NewDatabaseHandler(dbService)
	subTableHandler := handler.NewSubTableHandler(subTableService)
	neo4jHandler := handler.NewNeo4jHandler(neo4jService)
	modelHandler := handler.NewModelHandler(modelService)
	systemHandler := handler.NewSystemHandler()
	knowledgeHandler := handler.NewKnowledgeHandler(knowledgeService)
	authHandler := handler.NewAuthHandler(authService)
	toolHandler := handler.NewToolHandler(toolService)
	mcpHandler := handler.NewMCPHandler(mcpService)
	skillHandler := handler.NewSkillHandler(skillService)
	agentHandler := handler.NewAgentHandler(agentService)
	memoryHandler := handler.NewMemoryHandler(memoryService)
	sessionHandler := handler.NewSessionHandler(chatRepo)

	// Group chat.
	chatGroupRepo := repository.NewChatGroupRepository(db)
	chatGroupService := service.NewChatGroupService(chatGroupRepo, agentRepo)
	chatGroupHandler := handler.NewChatGroupHandler(chatGroupService)

	// Stays nil when the upload service could not be initialised.
	var uploadHandler *handler.UploadHandler
	if uploadService != nil {
		uploadHandler = handler.NewUploadHandler(uploadService, knowledgeRepo)
	}

	// 7. Router and middleware.
	r := gin.New()
	r.Use(gin.Logger())
	r.Use(gin.Recovery())

	// Ask buffering reverse proxies not to buffer responses, so streamed
	// output reaches the client as it is produced.
	r.Use(func(c *gin.Context) {
		c.Header("X-Accel-Buffering", "no")
		c.Next()
	})

	// Request logging. At most maxLoggedBody bytes of a POST/PUT body are
	// captured for the log; the handler still receives the complete body.
	// Bodies of the credential endpoints are never written to the log.
	const maxLoggedBody = 4 << 10
	r.Use(func(c *gin.Context) {
		start := time.Now()
		path := c.Request.URL.Path

		var loggedBody string
		if c.Request.Method == "POST" || c.Request.Method == "PUT" {
			if path == "/auth/login" || path == "/auth/register" {
				loggedBody = "[redacted]"
			} else {
				upstream := c.Request.Body
				// A read error is deliberately ignored here: the handler reads
				// the remainder from upstream and deals with failures itself.
				prefix, _ := io.ReadAll(io.LimitReader(upstream, maxLoggedBody))
				// Replay the captured prefix, then continue with the unread
				// remainder; Close still closes the original body.
				c.Request.Body = struct {
					io.Reader
					io.Closer
				}{io.MultiReader(bytes.NewReader(prefix), upstream), upstream}
				loggedBody = string(prefix)
			}
		}

		c.Next()

		// Runs after the handler chain, so status and latency are final.
		logger.LogRequest(c.Request.Method, path, loggedBody, c.Writer.Status(), time.Since(start))
	})

	// CORS: allow any origin and answer pre-flight requests directly.
	r.Use(func(c *gin.Context) {
		c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
		c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH")
		c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
		if c.Request.Method == "OPTIONS" {
			c.AbortWithStatus(204)
			return
		}
		c.Next()
	})

	// Authentication (no login required).
	authGroup := r.Group("/auth")
	{
		authGroup.POST("/register", authHandler.Register)
		authGroup.POST("/login", authHandler.Login)
	}

	// Authentication endpoints that require a login.
	authProtectedGroup := r.Group("/auth")
	authProtectedGroup.Use(middleware.Auth(cfg.JWTSecret))
	{
		authProtectedGroup.GET("/me", authHandler.GetCurrentUser)
	}

	// User management (login required).
	userGroup := r.Group("/user")
	userGroup.Use(middleware.Auth(cfg.JWTSecret))
	{
		userGroup.GET("/list", authHandler.ListUsers)
		userGroup.GET("/:id", authHandler.GetUserByID)
	}

	// Database management.
	databaseGroup := r.Group("/database")
	{
		databaseGroup.GET("/list", dbHandler.List)
		databaseGroup.GET("/:id", dbHandler.GetByID)
		databaseGroup.POST("/check", dbHandler.Check)
		databaseGroup.POST("/add", dbHandler.Create)
		databaseGroup.PUT("/:id", dbHandler.Update)
		databaseGroup.DELETE("/:id", dbHandler.Delete)
		databaseGroup.POST("/graph/save", dbHandler.SaveGraph)
	}

	// Sub-table mapping management.
	subTableGroup := r.Group("/sub-table")
	{
		subTableGroup.POST("/add", subTableHandler.Create)
		subTableGroup.GET("/:id", subTableHandler.GetByID)
		subTableGroup.GET("/database/:database_id", subTableHandler.ListByDatabase)
		subTableGroup.GET("/mapping/:database_id", subTableHandler.GetMappingFromFile)
		subTableGroup.GET("/ddl/:database_id", subTableHandler.GetTablesDDL)
		subTableGroup.PUT("/:id", subTableHandler.Update)
		subTableGroup.DELETE("/:id", subTableHandler.Delete)
	}

	// Neo4j connection management.
	neo4jGroup := r.Group("/neo4j")
	{
		neo4jGroup.POST("/check", neo4jHandler.Check)
		neo4jGroup.POST("/graphs", neo4jHandler.GetGraphs)
		neo4jGroup.POST("/nodes", neo4jHandler.GetNodes)
		neo4jGroup.POST("/relationships", neo4jHandler.GetRelationships)
	}

	// Model management.
	modelGroup := r.Group("/model")
	{
		modelGroup.GET("/list", modelHandler.List)
		modelGroup.POST("/test", modelHandler.Test)
		modelGroup.POST("/add", modelHandler.Create)
		modelGroup.GET("/:id", modelHandler.GetByID)
		modelGroup.PUT("/:id", modelHandler.Update)
		modelGroup.DELETE("/:id", modelHandler.Delete)
	}

	// Knowledge base management.
	knowledgeGroup := r.Group("/api/knowledge")
	{
		knowledgeGroup.POST("/create", knowledgeHandler.Create)
		knowledgeGroup.GET("/list", knowledgeHandler.List)
		knowledgeGroup.GET("/:id", knowledgeHandler.GetByID)
		knowledgeGroup.PUT("/:id", knowledgeHandler.Update)
		knowledgeGroup.DELETE("/:id", knowledgeHandler.Delete)
		// Document management.
		knowledgeGroup.GET("/:id/documents", knowledgeHandler.ListDocuments)
		knowledgeGroup.POST("/:id/documents", knowledgeHandler.UploadDocument)
		knowledgeGroup.DELETE("/:id/documents/:doc_id", knowledgeHandler.DeleteDocument)
		knowledgeGroup.POST("/:id/documents/:doc_id/reparse", knowledgeHandler.ReparseDocument)
		knowledgeGroup.GET("/:id/documents/:doc_id/preview", knowledgeHandler.GetDocumentPreview)
	}

	// System information.
	r.GET("/system/info", systemHandler.GetSystemInfo)

	// Tool management.
	toolGroup := r.Group("/tool")
	{
		toolGroup.GET("/list", toolHandler.List)
		toolGroup.GET("/sync", toolHandler.Sync) // manual sync
		toolGroup.GET("/:id", toolHandler.GetByID)
		toolGroup.POST("/add", toolHandler.Create)
		toolGroup.PUT("/:id", toolHandler.Update)
		toolGroup.DELETE("/:id", toolHandler.Delete)
	}

	// MCP management.
	mcpGroup := r.Group("/mcp")
	{
		mcpGroup.GET("/list", mcpHandler.List)
		mcpGroup.GET("/:id", mcpHandler.GetByID)
		mcpGroup.POST("/add", mcpHandler.Create)
		mcpGroup.PUT("/:id", mcpHandler.Update)
		mcpGroup.DELETE("/:id", mcpHandler.Delete)
	}

	// Skill management.
	skillGroup := r.Group("/skill")
	{
		skillGroup.GET("/list", skillHandler.List)
		skillGroup.GET("/sync", skillHandler.Sync)
		skillGroup.GET("/content", skillHandler.GetSkillContent)
		skillGroup.GET("/:id", skillHandler.GetByID)
		skillGroup.POST("/add", skillHandler.Create)
		skillGroup.PUT("/:id", skillHandler.Update)
		skillGroup.DELETE("/:id", skillHandler.Delete)
	}

	// Agent management (backed by the Python agent engine).
	agentGroup := r.Group("/api/agent")
	{
		agentGroup.GET("/list", agentHandler.ListAgents)
		agentGroup.POST("/create", agentHandler.CreateAgent)
		agentGroup.PUT("/:id/status", agentHandler.UpdateAgentStatus)
		agentGroup.PUT("/:id", agentHandler.UpdateAgent)
		agentGroup.DELETE("/:id", agentHandler.DeleteAgent)
		agentGroup.POST("/chat", agentHandler.Chat)
		agentGroup.POST("/chat/stream", agentHandler.ChatStream)
		agentGroup.POST("/team/chat", agentHandler.TeamChat)
	}

	// Chat session management.
	chatGroup := r.Group("/api/chat")
	{
		chatGroup.POST("/sessions", sessionHandler.CreateSession)
		chatGroup.GET("/sessions", sessionHandler.ListSessions)
		chatGroup.GET("/sessions/:id", sessionHandler.GetSession)
		chatGroup.PUT("/sessions/:id", sessionHandler.UpdateSession)
		chatGroup.DELETE("/sessions/:id", sessionHandler.DeleteSession)
		chatGroup.GET("/sessions/:id/messages", sessionHandler.GetMessages)
		chatGroup.POST("/messages", sessionHandler.CreateMessage)
	}

	// Group chat management.
	groupChat := r.Group("/api/chat/groups")
	{
		groupChat.POST("", chatGroupHandler.CreateGroup)
		groupChat.GET("", chatGroupHandler.ListGroups)
		groupChat.GET("/:id", chatGroupHandler.GetGroup)
		groupChat.PUT("/:id", chatGroupHandler.UpdateGroup)
		groupChat.DELETE("/:id", chatGroupHandler.DeleteGroup)
		groupChat.POST("/:id/chat", chatGroupHandler.GroupChat)
	}

	// Agent memory management.
	memoryGroup := r.Group("/api/agent/:id/memories")
	{
		memoryGroup.GET("", memoryHandler.GetMemories)
		memoryGroup.POST("", memoryHandler.CreateMemory)
		memoryGroup.GET("/search", memoryHandler.SearchMemories)
		memoryGroup.GET("/categories", memoryHandler.GetMemoryCategories)
		memoryGroup.GET("/tags", memoryHandler.GetMemoryTags)
		memoryGroup.GET("/export", memoryHandler.ExportMemories)
		memoryGroup.POST("/import", memoryHandler.ImportMemories)
	}

	// Operations on a single memory.
	memoryItemGroup := r.Group("/api/agent/memories/:memory_id")
	{
		memoryItemGroup.GET("", memoryHandler.GetMemory)
		memoryItemGroup.PUT("", memoryHandler.UpdateMemory)
		memoryItemGroup.DELETE("", memoryHandler.DeleteMemory)
	}

	// Swagger documentation.
	r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))

	// File upload; only available when the upload service initialised.
	if uploadHandler != nil {
		// Serve locally stored files statically.
		if cfg.UploadMode == "local" {
			r.Static("/files", cfg.UploadLocalPath)
		}
		// Upload routes.
		r.POST("/api/file_upload", uploadHandler.Upload)
		r.DELETE("/api/file_upload/:filename", uploadHandler.Delete)
		// File proxy route (per the original author: works around MinIO
		// internal-network and HTTPS issues).
		r.GET("/api/file_proxy", uploadHandler.ProxyFile)
	}

	// 8. Start the server; Run blocks until the listener fails.
	log.Printf("Server starting on :%s", cfg.Port)
	if err := r.Run(":" + cfg.Port); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}