package dbclient import ( "context" "fmt" "sync" "time" "u-desk/internal/crypto" "u-desk/internal/storage/models" ) // PoolConfig 连接池配置 type PoolConfig struct { // 最大打开连接数(硬上限) MaxOpenConns int // 最大空闲连接数(超过此数量的空闲连接会被关闭) MaxIdleConns int // 连接最大生命周期(超过此时间的连接会被关闭) ConnMaxLifetime time.Duration // 连接最大空闲时间(超过此时间未使用的连接会被关闭) ConnMaxIdleTime time.Duration // 最小空闲连接数(保持此数量的空闲连接以快速响应) MinIdleConns int // 连接超时时间(建立连接的最长时间) ConnTimeout time.Duration // 健康检查间隔(定期 Ping 连接检查有效性) HealthCheckInterval time.Duration // 是否启用连接预热(启动时建立最小连接) EnableWarmup bool // 是否启用慢连接日志(记录建立时间超过阈值的连接) EnableSlowConnLog bool // 慢连接阈值(超过此时间记录为慢连接) SlowConnThreshold time.Duration // 连接池最大容量(防止资源耗尽) MaxPoolCapacity int // 动态连接池配置 EnableDynamicScaling bool // 是否启用动态连接池调整 DynamicScaleFactor float64 // 动态调整因子(0.5-2.0) ScaleUpThreshold float64 // 扩容阈值(0-1.0,当使用率超过此值时扩容) ScaleDownThreshold float64 // 缩容阈值(0-1.0,当使用率低于此值时缩容) MinScaleUpInterval time.Duration // 最小扩容间隔(防止频繁调整) MinScaleDownInterval time.Duration // 最小缩容间隔 MaxIdleTimeForScale time.Duration // 用于动态调整的最大空闲时间 } // DefaultPoolConfig 返回默认连接池配置 func DefaultPoolConfig() *PoolConfig { return &PoolConfig{ MaxOpenConns: 50, // 最大50个连接(提高并发) MaxIdleConns: 20, // 最大20个空闲(提高响应速度) ConnMaxLifetime: 60 * time.Minute, // 连接最长60分钟(延长连接生命周期) ConnMaxIdleTime: 15 * time.Minute, // 空闲15分钟关闭(更长的空闲时间) MinIdleConns: 5, // 保持5个最小空闲(更好的响应性能) ConnTimeout: 3 * time.Second, // 连接超时3秒(更快失败) HealthCheckInterval: 20 * time.Second, // 20秒健康检查一次(更频繁的健康检查) EnableWarmup: true, // 启用预热 EnableSlowConnLog: true, // 启用慢连接日志 SlowConnThreshold: 200 * time.Millisecond, // 超过200ms算慢连接(更严格的性能要求) MaxPoolCapacity: 100, // 连接池最大容量(支持更高并发) // 动态连接池配置(更智能的调整策略) EnableDynamicScaling: true, // 启用动态调整 DynamicScaleFactor: 1.8, // 调整因子1.8倍(更激进的扩容) ScaleUpThreshold: 0.7, // 使用率超过70%扩容(更早扩容) ScaleDownThreshold: 0.4, // 使用率低于40%缩容(避免频繁调整) MinScaleUpInterval: 1 * time.Minute, // 最小扩容间隔1分钟(更快的响应) MinScaleDownInterval: 3 * time.Minute, // 最小缩容间隔3分钟(稳定缩容) MaxIdleTimeForScale: 20 * time.Minute, // 用于调整的最大空闲时间 } } // MySQLPoolEntry MySQL 连接池条目 type MySQLPoolEntry struct { Client *MySQLClient LastUsed time.Time CreatedAt time.Time InUse bool mu sync.Mutex } // AcquireResult 连接获取结果 type AcquireResult struct { Entry *MySQLPoolEntry Err error } // ReleaseResult 连接释放结果 type ReleaseResult struct { Success bool Err error } // Stats 连接池统计信息 type PoolStats struct { TotalConns int // 总连接数 ActiveConns int // 使用中的连接数 IdleConns int // 空闲连接数 WaitCount int64 // 等待连接的次数 WaitDuration time.Duration // 总等待时间 SlowConnCount int64 // 慢连接数量 } // MySQLConnectionPool MySQL 连接池(真正的连接池) type MySQLConnectionPool struct { config *PoolConfig configHash string // 配置哈希,用于检测配置变更 mu sync.RWMutex entries []*MySQLPoolEntry // 连接池条目 connMap map[uint]*MySQLClient // 连接ID -> 客户端映射(兼容现有代码) stats PoolStats stopCh chan struct{} wg sync.WaitGroup // 动态调整相关 lastScaleUpTime time.Time // 上次扩容时间 lastScaleDownTime time.Time // 上次缩容时间 currentTargetSize int // 当前目标连接数 usageHistory []float64 // 使用率历史记录(用于智能调整) adaptiveWeights map[uint]float64 // 连接权重(基于性能表现) } // NewMySQLConnectionPool 创建新的 MySQL 连接池 func NewMySQLConnectionPool(config *PoolConfig) *MySQLConnectionPool { if config == nil { config = DefaultPoolConfig() } pool := &MySQLConnectionPool{ config: config, entries: make([]*MySQLPoolEntry, 0, config.MaxPoolCapacity), connMap: make(map[uint]*MySQLClient), stopCh: make(chan struct{}), currentTargetSize: config.MinIdleConns, usageHistory: make([]float64, 0, 100), // 保留最近100个使用率记录 adaptiveWeights: make(map[uint]float64), } return pool } // Acquire 获取一个连接(阻塞等待直到有可用连接) func (p *MySQLConnectionPool) Acquire(conn *models.DbConnection) (*MySQLPoolEntry, error) { p.mu.Lock() defer p.mu.Unlock() startTime := time.Now() // 尝试获取最优连接(启用动态调整时) if p.config.EnableDynamicScaling { if entry, err := p.getOptimalConnection(); err == nil { p.updateWaitStats(startTime) return entry, nil } } // 降级到标准逻辑 - 查找空闲连接 for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { entry.InUse = true entry.LastUsed = time.Now() entry.mu.Unlock() // 更新统计 p.updateWaitStats(startTime) return entry, nil } entry.mu.Unlock() } // 没有可用连接,创建新连接 if len(p.entries) >= p.config.MaxOpenConns { // 已达到最大连接数,等待 return p.waitForAvailableConnection(conn) } // 创建新连接(使用传入的连接配置) newEntry, err := p.createNewEntry(conn) if err != nil { return nil, fmt.Errorf("创建连接失败: %v", err) } p.entries = append(p.entries, newEntry) p.updateStats() p.updateWaitStats(startTime) return newEntry, nil } // Release 释放连接回池中 func (p *MySQLConnectionPool) Release(entry *MySQLPoolEntry) error { if entry == nil { return nil } p.mu.Lock() defer p.mu.Unlock() entry.mu.Lock() entry.InUse = false entry.LastUsed = time.Now() entry.mu.Unlock() p.updateStats() return nil } // Close 关闭连接池 func (p *MySQLConnectionPool) Close() error { p.mu.Lock() defer p.mu.Unlock() // 发送停止信号 close(p.stopCh) // 等待所有 goroutine 完成 p.wg.Wait() // 关闭所有连接 var lastErr error for _, entry := range p.entries { entry.mu.Lock() if err := entry.Client.Close(); err != nil { lastErr = err } entry.InUse = false entry.mu.Unlock() } p.entries = make([]*MySQLPoolEntry, 0, p.config.MaxPoolCapacity) p.connMap = make(map[uint]*MySQLClient) return lastErr } // Stats 获取连接池统计信息 func (p *MySQLConnectionPool) Stats() PoolStats { p.mu.RLock() defer p.mu.RUnlock() return p.stats } // cleanupIdleConnections 清理空闲连接 func (p *MySQLConnectionPool) cleanupIdleConnections() { p.mu.Lock() defer p.mu.Unlock() now := time.Now() keepEntries := make([]*MySQLPoolEntry, 0, len(p.entries)) for _, entry := range p.entries { entry.mu.Lock() isIdle := !entry.InUse idleDuration := now.Sub(entry.LastUsed) entry.mu.Unlock() // 保留条件:正在使用 或 空闲时间未超过阈值 或 数量少于最小空闲数 keep := !isIdle || idleDuration < p.config.ConnMaxIdleTime || len(keepEntries) < p.config.MinIdleConns if keep { keepEntries = append(keepEntries, entry) } else { // 关闭连接 entry.Client.Close() } } p.entries = keepEntries p.updateStats() } // healthCheck 健康检查(增强版本) func (p *MySQLConnectionPool) healthCheck() { p.enhancedHealthCheck() } // StartMaintenance 启动维护协程(清理和健康检查) func (p *MySQLConnectionPool) StartMaintenance() { p.wg.Add(1) go func() { defer p.wg.Done() // 健康检查Ticker healthTicker := time.NewTicker(p.config.HealthCheckInterval) defer healthTicker.Stop() // 动态调整Ticker(较短间隔) scaleTicker := time.NewTicker(1 * time.Minute) defer scaleTicker.Stop() for { select { case <-healthTicker.C: // 清理空闲连接 p.cleanupIdleConnections() // 健康检查 p.healthCheck() case <-scaleTicker.C: // 动态连接池调整 if p.config.EnableDynamicScaling { p.adaptiveScaling() } case <-p.stopCh: return } } }() } // createNewEntry 创建新的连接池条目 func (p *MySQLConnectionPool) createNewEntry(conn *models.DbConnection) (*MySQLPoolEntry, error) { startTime := time.Now() client, err := createMySQLClient(conn) if err != nil { return nil, err } elapsed := time.Since(startTime) // 慢连接日志 if p.config.EnableSlowConnLog && elapsed > p.config.SlowConnThreshold { // 记录慢连接 p.mu.Lock() p.stats.SlowConnCount++ p.mu.Unlock() } entry := &MySQLPoolEntry{ Client: client, LastUsed: time.Now(), CreatedAt: startTime, InUse: true, } return entry, nil } // waitForAvailableConnection 等待可用连接并获取它 func (p *MySQLConnectionPool) waitForAvailableConnection(conn *models.DbConnection) (*MySQLPoolEntry, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-ctx.Done(): return nil, ErrPoolExhausted case <-ticker.C: p.mu.Lock() for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { entry.InUse = true entry.LastUsed = time.Now() entry.mu.Unlock() p.mu.Unlock() return entry, nil } entry.mu.Unlock() } p.mu.Unlock() } } } // updateWaitStats 更新等待统计(调用方必须持有 p.mu) func (p *MySQLConnectionPool) updateWaitStats(startTime time.Time) { p.stats.WaitCount++ p.stats.WaitDuration += time.Since(startTime) } // updateStats 更新连接池统计 func (p *MySQLConnectionPool) updateStats() { total := len(p.entries) active := 0 idle := 0 for _, entry := range p.entries { entry.mu.Lock() if entry.InUse { active++ } else { idle++ } entry.mu.Unlock() } p.stats.TotalConns = total p.stats.ActiveConns = active p.stats.IdleConns = idle } // adaptiveScaling 自适应连接池调整 func (p *MySQLConnectionPool) adaptiveScaling() { p.mu.Lock() defer p.mu.Unlock() // 计算当前使用率 if len(p.entries) == 0 { return } usageRate := float64(p.stats.ActiveConns) / float64(len(p.entries)) // 记录使用率历史 p.usageHistory = append(p.usageHistory, usageRate) if len(p.usageHistory) > 100 { p.usageHistory = p.usageHistory[1:] } // 检查是否需要调整 now := time.Now() // 扩容逻辑 if usageRate >= p.config.ScaleUpThreshold { if now.Sub(p.lastScaleUpTime) >= p.config.MinScaleUpInterval { p.scaleUp() p.lastScaleUpTime = now } return } // 缩容逻辑 if usageRate <= p.config.ScaleDownThreshold && len(p.entries) > p.config.MinIdleConns { if now.Sub(p.lastScaleDownTime) >= p.config.MinScaleDownInterval { p.scaleDown() p.lastScaleDownTime = now } } } // scaleUp 扩容 func (p *MySQLConnectionPool) scaleUp() { // scaleUp 仅更新目标大小,实际连接在 Acquire 时按需创建 // 移除了创建无效虚拟连接的逻辑 currentSize := len(p.entries) scaleFactor := p.config.DynamicScaleFactor newSize := int(float64(currentSize) * scaleFactor) newSize = min(newSize, p.config.MaxOpenConns) newSize = max(newSize, currentSize+1) p.currentTargetSize = newSize p.updateStats() } // scaleDown 缩容 func (p *MySQLConnectionPool) scaleDown() { // 计算新目标大小 currentSize := len(p.entries) scaleFactor := 1.0 / p.config.DynamicScaleFactor newSize := int(float64(currentSize) * scaleFactor) newSize = max(newSize, p.config.MinIdleConns) newSize = min(newSize, currentSize-1) // 至少减少1个连接 if newSize < currentSize { // 关闭多余的空闲连接 p.closeIdleConnections(currentSize - newSize) p.currentTargetSize = newSize p.updateStats() } } // closeIdleConnections 关闭指定数量的空闲连接 func (p *MySQLConnectionPool) closeIdleConnections(count int) { // 收集空闲连接 idleEntries := make([]*MySQLPoolEntry, 0) for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { idleEntries = append(idleEntries, entry) } entry.mu.Unlock() } // 关闭指定数量的空闲连接 closedEntries := make(map[*MySQLPoolEntry]bool) for i := 0; i < min(count, len(idleEntries)); i++ { entry := idleEntries[i] entry.mu.Lock() entry.Client.Close() entry.mu.Unlock() closedEntries[entry] = true } // 重新构建连接池 remainingEntries := make([]*MySQLPoolEntry, 0, len(p.entries)) for _, entry := range p.entries { if closedEntries[entry] { continue // 跳过已关闭的连接 } remainingEntries = append(remainingEntries, entry) } p.entries = remainingEntries } // enhancedHealthCheck 增强的健康检查 func (p *MySQLConnectionPool) enhancedHealthCheck() { p.mu.RLock() entriesCopy := make([]*MySQLPoolEntry, len(p.entries)) copy(entriesCopy, p.entries) p.mu.RUnlock() var healthyEntries []*MySQLPoolEntry var performanceWeights []float64 for _, entry := range entriesCopy { entry.mu.Lock() isIdle := !entry.InUse // 测试连接有效性 isHealthy := true startTime := time.Now() if isIdle { // 空闲连接:简单Ping测试 if err := entry.Client.sqlDB.Ping(); err != nil { isHealthy = false // 关闭失效连接 entry.Client.Close() } } else { // 使用中的连接:快速测试(避免影响正常查询) func() { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() if err := entry.Client.sqlDB.PingContext(ctx); err != nil { isHealthy = false } }() } // 计算连接性能权重 if isHealthy { healthyEntries = append(healthyEntries, entry) // 基于连接性能计算权重 responseTime := time.Since(startTime).Microseconds() weight := 1.0 / max(float64(responseTime)/1000.0, 1.0) // 转换为毫秒,避免除零 performanceWeights = append(performanceWeights, weight) } else { // 不健康的连接 if isIdle { entry.Client.Close() } } entry.mu.Unlock() } // 更新连接池 p.mu.Lock() defer p.mu.Unlock() p.entries = healthyEntries // 更新自适应权重 if len(healthyEntries) > 0 { for i := range healthyEntries { if i < len(performanceWeights) { p.adaptiveWeights[uint(i)] = performanceWeights[i] } } } p.updateStats() } // warmUp 连接池预热 func (p *MySQLConnectionPool) warmUp() { if !p.config.EnableWarmup { return } p.mu.Lock() defer p.mu.Unlock() currentIdle := 0 for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { currentIdle++ } entry.mu.Unlock() } targetIdle := p.config.MinIdleConns needed := targetIdle - currentIdle // warmUp 仅记录目标大小,不在无连接配置的情况下创建无效虚拟连接 // 实际连接在 Acquire 时按需创建 _ = needed p.updateStats() } // getOptimalConnection 获取最优连接(基于性能权重) // 注意:调用方必须已持有 p.mu func (p *MySQLConnectionPool) getOptimalConnection() (*MySQLPoolEntry, error) { var bestEntry *MySQLPoolEntry var bestWeight float64 for i, entry := range p.entries { entry.mu.Lock() if !entry.InUse { weight := 1.0 // 默认权重 if w, ok := p.adaptiveWeights[uint(i)]; ok { weight = w } if bestEntry == nil || weight > bestWeight { bestEntry = entry bestWeight = weight } } entry.mu.Unlock() } if bestEntry == nil { return nil, ErrPoolExhausted } bestEntry.InUse = true bestEntry.LastUsed = time.Now() return bestEntry, nil } // createMySQLClient 创建 MySQL 客户端的辅助函数 func createMySQLClient(conn *models.DbConnection) (*MySQLClient, error) { // 解密密码 password, err := crypto.DecryptPassword(conn.Password) if err != nil { return nil, fmt.Errorf("密码解密失败: %v", err) } config := &MySQLConfig{ Host: conn.Host, Port: conn.Port, Username: conn.Username, Password: password, Database: conn.Database, } return NewMySQLClient(config) } // 错误定义 var ( ErrPoolExhausted = &PoolError{Message: "连接池已耗尽"} ErrPoolClosed = &PoolError{Message: "连接池已关闭"} ) // PoolError 连接池错误 type PoolError struct { Message string Err error } func (e *PoolError) Error() string { if e.Err != nil { return e.Message + ": " + e.Err.Error() } return e.Message }