package dbclient

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"u-desk/internal/common"
	"u-desk/internal/crypto"
	"u-desk/internal/storage/models"
)

// ConnectionPool manages database clients for MySQL, Redis and MongoDB.
//
// Redis and MongoDB clients (and legacy MySQL clients) are cached per
// connection ID in the maps below. MySQL requests are served from mysqlPool
// first and only fall back to the cached legacy client when the pool cannot
// provide a connection.
type ConnectionPool struct {
	mysqlClients map[uint]*MySQLClient
	redisClients map[uint]*RedisClient
	mongoClients map[uint]*MongoClient

	// mysqlPool is the real MySQL connection pool.
	mysqlPool *MySQLConnectionPool

	// queryOptimizer executes queries/updates and collects statistics; it is
	// also the sink for pool error reports (see logPoolError).
	queryOptimizer *QueryOptimizer

	// mu guards the three client maps above.
	mu sync.RWMutex
}

var (
	globalPool *ConnectionPool
	poolOnce   sync.Once
)

// GetPool returns the global ConnectionPool, creating it on first use.
//
// Initialization runs exactly once: it builds the MySQL connection pool from
// DefaultPoolConfig, starts the pool's maintenance routine, and creates the
// query optimizer.
func GetPool() *ConnectionPool {
	poolOnce.Do(func() {
		// Create the MySQL connection pool and start its maintenance routine.
		mysqlPool := NewMySQLConnectionPool(DefaultPoolConfig())
		mysqlPool.StartMaintenance()

		// Create the query optimizer.
		queryOptimizer := NewQueryOptimizer(nil)

		globalPool = &ConnectionPool{
			mysqlClients:   make(map[uint]*MySQLClient),
			redisClients:   make(map[uint]*RedisClient),
			mongoClients:   make(map[uint]*MongoClient),
			mysqlPool:      mysqlPool,
			queryOptimizer: queryOptimizer,
		}
	})
	return globalPool
}

// PooledClient wraps a MySQL client together with the information needed to
// hand it back to the pool it came from.
//
// Client is nil when no connection could be obtained. fromPool is false for
// clients served by the legacy (cached) path; Release is a no-op for those.
type PooledClient struct {
	Client   *MySQLClient
	entry    *MySQLPoolEntry
	pool     *MySQLConnectionPool
	fromPool bool
}

// Release returns the connection to the pool it was acquired from.
//
// It is a no-op for a nil receiver and for clients that did not come from the
// pool. Calling Release more than once is safe: only the first call hands the
// entry back. Release is not synchronized, so it must not be called
// concurrently on the same PooledClient.
func (pc *PooledClient) Release() {
	if pc == nil || !pc.fromPool || pc.pool == nil || pc.entry == nil {
		return
	}
	// Clear the entry before releasing so that a repeated Release cannot hand
	// the same entry back to the pool a second time.
	entry := pc.entry
	pc.entry = nil
	pc.pool.Release(entry)
}

// GetMySQLClient returns a MySQL client for conn, preferring the connection
// pool.
//
// The result is never nil. When the pool cannot provide a connection the
// method falls back to the cached legacy client; if that fails as well the
// returned PooledClient has a nil Client, which callers must check. Callers
// should always call Release when done.
func (p *ConnectionPool) GetMySQLClient(conn *models.DbConnection) *PooledClient {
	// NOTE(review): the lock is held across Acquire, which serializes all
	// client lookups (including Redis/Mongo) behind it — confirm whether
	// Acquire can block for long and whether the scope can be narrowed.
	p.mu.Lock()
	defer p.mu.Unlock()

	// Try the connection pool first.
	if p.mysqlPool != nil {
		entry, err := p.mysqlPool.Acquire(conn)
		if err == nil {
			return &PooledClient{Client: entry.Client, entry: entry, pool: p.mysqlPool, fromPool: true}
		}
		p.logPoolError("Acquire failed", err)
	}

	// Fall back to the legacy per-connection cached client.
	client, err := p.getMySQLClientLegacy(conn)
	if err != nil {
		// The signature cannot carry the error, so record it instead of
		// dropping it silently; the caller sees a nil Client.
		p.logPoolError("Legacy fallback failed", err)
		return &PooledClient{Client: nil, fromPool: false}
	}
	return &PooledClient{Client: client, fromPool: false}
}

// logPoolError reports a connection pool error through the query optimizer.
// It does nothing when no optimizer is configured.
func (p *ConnectionPool) logPoolError(operation string, err error) {
	if p.queryOptimizer != nil {
		p.queryOptimizer.RecordPoolError(operation, err)
	}
}

// getMySQLClientLegacy is the original (pre-pool) MySQL client lookup, kept
// for backward compatibility.
//
// It reuses the cached client for conn.ID when a ping succeeds; otherwise the
// stale client is closed and a new one is created and cached. The caller must
// hold p.mu.
func (p *ConnectionPool) getMySQLClientLegacy(conn *models.DbConnection) (*MySQLClient, error) {
	// Reuse the cached client if it is still alive.
	if cached, ok := p.mysqlClients[conn.ID]; ok {
		// NOTE(review): this ping has no timeout (unlike the Redis/Mongo
		// paths) and runs while p.mu is held — consider a bounded ping.
		if err := cached.sqlDB.Ping(); err == nil {
			return cached, nil
		}
		// The connection is broken: drop it and create a new one below.
		cached.Close()
		delete(p.mysqlClients, conn.ID)
	}

	// Decrypt the stored password.
	password, err := crypto.DecryptPassword(conn.Password)
	if err != nil {
		return nil, fmt.Errorf("密码解密失败: %w", err)
	}

	// Create and cache a new client.
	config := &MySQLConfig{
		Host:     conn.Host,
		Port:     conn.Port,
		Username: conn.Username,
		Password: password, // an empty password makes MySQL attempt a password-less login
		Database: conn.Database,
	}
	client, err := NewMySQLClient(config)
	if err != nil {
		return nil, err
	}
	p.mysqlClients[conn.ID] = client
	return client, nil
}

// GetMySQLPoolStats returns a snapshot of the MySQL connection pool
// statistics, or nil when no pool is configured.
func (p *ConnectionPool) GetMySQLPoolStats() *PoolStats {
	if p.mysqlPool == nil {
		return nil
	}
	stats := p.mysqlPool.Stats()
	return &stats
}

// OptimizeQuery runs sqlStr against database on the connection described by
// conn and returns the result together with the execution duration.
//
// The query goes through the query optimizer when one is configured;
// otherwise it is executed directly and timed here. An error is returned when
// no MySQL client could be obtained.
func (p *ConnectionPool) OptimizeQuery(ctx context.Context, conn *models.DbConnection, sqlStr string, database string) (*QueryResult, time.Duration, error) {
	pc := p.GetMySQLClient(conn)
	if pc.Client == nil {
		return nil, 0, fmt.Errorf("获取 MySQL 连接失败")
	}
	defer pc.Release()

	// Prefer the query optimizer.
	if p.queryOptimizer != nil {
		return p.queryOptimizer.OptimizeQuery(ctx, pc.Client, sqlStr, database)
	}

	// Fall back to a plain query.
	startTime := time.Now()
	result, err := pc.Client.ExecuteQuery(ctx, sqlStr, database)
	duration := time.Since(startTime)
	return result, duration, err
}

// ExecuteOptimizedUpdate runs the update statement sqlStr against database on
// the connection described by conn and returns the value reported by the
// underlying update call together with the execution duration.
//
// The statement goes through the query optimizer when one is configured;
// otherwise it is executed directly and timed here. An error is returned when
// no MySQL client could be obtained.
func (p *ConnectionPool) ExecuteOptimizedUpdate(ctx context.Context, conn *models.DbConnection, sqlStr string, database string) (int64, time.Duration, error) {
	pc := p.GetMySQLClient(conn)
	if pc.Client == nil {
		return 0, 0, fmt.Errorf("获取 MySQL 连接失败")
	}
	defer pc.Release()

	// Prefer the query optimizer.
	if p.queryOptimizer != nil {
		return p.queryOptimizer.ExecuteOptimizedUpdate(ctx, pc.Client, sqlStr, database)
	}

	// Fall back to a plain update.
	startTime := time.Now()
	result, err := pc.Client.ExecuteUpdate(ctx, sqlStr, database)
	duration := time.Since(startTime)
	return result, duration, err
}

// GetQueryStats returns the query statistics collected by the query
// optimizer, or a zero QueryStats when no optimizer is configured.
func (p *ConnectionPool) GetQueryStats() QueryStats {
	if p.queryOptimizer == nil {
		return QueryStats{}
	}
	return p.queryOptimizer.GetQueryStats()
}

// GetSlowQueries returns the slow query records held by the query optimizer
// for the given limit, or an empty (non-nil) slice when no optimizer is
// configured.
func (p *ConnectionPool) GetSlowQueries(limit int) []SlowQuery {
	if p.queryOptimizer == nil {
		return []SlowQuery{}
	}
	return p.queryOptimizer.GetSlowQueries(limit)
}

// GetIndexSuggestions returns the index suggestions held by the query
// optimizer for table, or an empty (non-nil) slice when no optimizer is
// configured.
func (p *ConnectionPool) GetIndexSuggestions(table string) []IndexSuggestion {
	if p.queryOptimizer == nil {
		return []IndexSuggestion{}
	}
	return p.queryOptimizer.GetIndexSuggestions(table)
}

// GenerateIndexSuggestions asks the query optimizer to generate index
// suggestions for database.table using the connection described by conn.
//
// It returns an error when no MySQL client could be obtained and nil when no
// optimizer is configured.
func (p *ConnectionPool) GenerateIndexSuggestions(ctx context.Context, conn *models.DbConnection, database, table string) error {
	pc := p.GetMySQLClient(conn)
	if pc.Client == nil {
		return fmt.Errorf("获取 MySQL 连接失败")
	}
	defer pc.Release()

	if p.queryOptimizer == nil {
		return nil
	}
	return p.queryOptimizer.GenerateIndexSuggestions(ctx, pc.Client, database, table)
}

// ClearQueryCache clears the query optimizer's cache. It does nothing when no
// optimizer is configured.
func (p *ConnectionPool) ClearQueryCache() {
	if p.queryOptimizer != nil {
		p.queryOptimizer.ClearCache()
	}
}

// GetRedisClient returns the Redis client for conn, creating it if needed.
//
// A cached client is reused when it answers a ping within
// common.TimeoutPing; otherwise it is closed, removed, and replaced by a
// newly created client. The Redis DB number is taken from conn.Database (see
// redisDBFromConn).
func (p *ConnectionPool) GetRedisClient(conn *models.DbConnection) (*RedisClient, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Reuse the cached client if it is still alive.
	if cached, ok := p.redisClients[conn.ID]; ok {
		ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutPing)
		pingErr := cached.client.Ping(ctx).Err()
		// Release the timeout context right away instead of keeping it alive
		// while a replacement client is created.
		cancel()
		if pingErr == nil {
			return cached, nil
		}
		// The connection is broken: drop it and create a new one below.
		cached.Close()
		delete(p.redisClients, conn.ID)
	}

	// Decrypt the stored password.
	password, err := crypto.DecryptPassword(conn.Password)
	if err != nil {
		return nil, fmt.Errorf("密码解密失败: %w", err)
	}

	// Create and cache a new client.
	config := &RedisConfig{
		Host:     conn.Host,
		Port:     conn.Port,
		Password: password,
		DB:       redisDBFromConn(conn.Database),
	}
	client, err := NewRedisClient(config)
	if err != nil {
		return nil, err
	}
	p.redisClients[conn.ID] = client
	return client, nil
}

// redisDBFromConn derives the Redis DB number from a connection's Database
// field. It returns 0 when the field is empty, cannot be scanned as an
// integer, or is outside the range 0-15.
func redisDBFromConn(database string) int {
	const maxRedisDB = 15

	if database == "" {
		return 0
	}
	dbNum := 0
	if _, err := fmt.Sscanf(database, "%d", &dbNum); err != nil {
		// Not a number: use the default DB.
		return 0
	}
	if dbNum < 0 || dbNum > maxRedisDB {
		return 0
	}
	return dbNum
}

// GetMongoClient returns the MongoDB client for conn, creating it if needed.
//
// A cached client is reused when it answers a ping within
// common.TimeoutPing; otherwise it is closed, removed, and replaced by a
// newly created client. authSource and authMechanism are read from the JSON
// in conn.Options (see mongoAuthFromOptions).
func (p *ConnectionPool) GetMongoClient(conn *models.DbConnection) (*MongoClient, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Reuse the cached client if it is still alive.
	if cached, ok := p.mongoClients[conn.ID]; ok {
		ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutPing)
		pingErr := cached.client.Ping(ctx, nil)
		// Release the timeout context right away instead of keeping it alive
		// while a replacement client is created.
		cancel()
		if pingErr == nil {
			return cached, nil
		}
		// The connection is broken: drop it and create a new one below.
		cached.Close()
		delete(p.mongoClients, conn.ID)
	}

	// Decrypt the stored password.
	password, err := crypto.DecryptPassword(conn.Password)
	if err != nil {
		return nil, fmt.Errorf("密码解密失败: %w", err)
	}

	// Extract MongoDB connection parameters from Options.
	authSource, authMechanism := mongoAuthFromOptions(conn.Options)

	// Create and cache a new client.
	config := &MongoConfig{
		Host:          conn.Host,
		Port:          conn.Port,
		Username:      conn.Username,
		Password:      password,
		Database:      conn.Database,
		AuthSource:    authSource,
		AuthMechanism: authMechanism,
	}
	client, err := NewMongoClient(config)
	if err != nil {
		return nil, err
	}
	p.mongoClients[conn.ID] = client
	return client, nil
}

// mongoAuthFromOptions extracts the "authSource" and "authMechanism" string
// values from a JSON options document. Missing, empty, or non-string values
// yield "". Options that are empty or not valid JSON are ignored on purpose
// (best effort), yielding two empty strings.
func mongoAuthFromOptions(options string) (authSource, authMechanism string) {
	if options == "" {
		return "", ""
	}
	var opts map[string]interface{}
	if err := json.Unmarshal([]byte(options), &opts); err != nil {
		return "", ""
	}
	if v, ok := opts["authSource"].(string); ok {
		authSource = v
	}
	if v, ok := opts["authMechanism"].(string); ok {
		authMechanism = v
	}
	return authSource, authMechanism
}

// CloseConnection closes and forgets the cached client for connID.
//
// dbType selects the cache: "mysql", "redis" or "mongo"; any other value is
// ignored. For "mysql" only the legacy cached client is closed.
// NOTE(review): connections held by mysqlPool are not touched here — confirm
// whether the pool needs an equivalent eviction.
func (p *ConnectionPool) CloseConnection(connID uint, dbType string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	switch dbType {
	case "mysql":
		if client, ok := p.mysqlClients[connID]; ok {
			client.Close()
			delete(p.mysqlClients, connID)
		}
	case "redis":
		if client, ok := p.redisClients[connID]; ok {
			client.Close()
			delete(p.redisClients, connID)
		}
	case "mongo":
		if client, ok := p.mongoClients[connID]; ok {
			client.Close()
			delete(p.mongoClients, connID)
		}
	}
}

// CloseAll closes every cached MySQL, Redis and MongoDB client and resets the
// caches to empty maps.
// NOTE(review): mysqlPool is neither closed nor has its maintenance routine
// stopped here — confirm whether shutdown should also tear down the pool.
func (p *ConnectionPool) CloseAll() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, client := range p.mysqlClients {
		client.Close()
	}
	for _, client := range p.redisClients {
		client.Close()
	}
	for _, client := range p.mongoClients {
		client.Close()
	}

	p.mysqlClients = make(map[uint]*MySQLClient)
	p.redisClients = make(map[uint]*RedisClient)
	p.mongoClients = make(map[uint]*MongoClient)
}