package dbclient import ( "context" "fmt" "sync" "time" "u-desk/internal/crypto" "u-desk/internal/storage/models" ) // PoolConfig 连接池配置 type PoolConfig struct { // 最大打开连接数(硬上限) MaxOpenConns int // 最大空闲连接数(超过此数量的空闲连接会被关闭) MaxIdleConns int // 连接最大生命周期(超过此时间的连接会被关闭) ConnMaxLifetime time.Duration // 连接最大空闲时间(超过此时间未使用的连接会被关闭) ConnMaxIdleTime time.Duration // 最小空闲连接数(保持此数量的空闲连接以快速响应) MinIdleConns int // 连接超时时间(建立连接的最长时间) ConnTimeout time.Duration // 健康检查间隔(定期 Ping 连接检查有效性) HealthCheckInterval time.Duration // 是否启用连接预热(启动时建立最小连接) EnableWarmup bool // 是否启用慢连接日志(记录建立时间超过阈值的连接) EnableSlowConnLog bool // 慢连接阈值(超过此时间记录为慢连接) SlowConnThreshold time.Duration // 连接池最大容量(防止资源耗尽) MaxPoolCapacity int } // DefaultPoolConfig 返回默认连接池配置 func DefaultPoolConfig() *PoolConfig { return &PoolConfig{ MaxOpenConns: 20, // 最大20个连接 MaxIdleConns: 10, // 最大10个空闲 ConnMaxLifetime: 30 * time.Minute, // 连接最长30分钟 ConnMaxIdleTime: 10 * time.Minute, // 空闲10分钟关闭 MinIdleConns: 2, // 保持2个最小空闲 ConnTimeout: 5 * time.Second, // 连接超时5秒 HealthCheckInterval: 30 * time.Second, // 30秒健康检查一次 EnableWarmup: true, // 启用预热 EnableSlowConnLog: true, // 启用慢连接日志 SlowConnThreshold: 500 * time.Millisecond, // 超过500ms算慢连接 MaxPoolCapacity: 50, // 连接池最大容量 } } // MySQLPoolEntry MySQL 连接池条目 type MySQLPoolEntry struct { Client *MySQLClient LastUsed time.Time CreatedAt time.Time InUse bool mu sync.Mutex } // AcquireResult 连接获取结果 type AcquireResult struct { Entry *MySQLPoolEntry Err error } // ReleaseResult 连接释放结果 type ReleaseResult struct { Success bool Err error } // Stats 连接池统计信息 type PoolStats struct { TotalConns int // 总连接数 ActiveConns int // 使用中的连接数 IdleConns int // 空闲连接数 WaitCount int64 // 等待连接的次数 WaitDuration time.Duration // 总等待时间 SlowConnCount int64 // 慢连接数量 } // MySQLConnectionPool MySQL 连接池(真正的连接池) type MySQLConnectionPool struct { config *PoolConfig configHash string // 配置哈希,用于检测配置变更 mu sync.RWMutex entries []*MySQLPoolEntry // 连接池条目 connMap map[uint]*MySQLClient // 连接ID -> 客户端映射(兼容现有代码) stats PoolStats stopCh chan struct{} wg sync.WaitGroup } // NewMySQLConnectionPool 创建新的 MySQL 连接池 func NewMySQLConnectionPool(config *PoolConfig) *MySQLConnectionPool { if config == nil { config = DefaultPoolConfig() } pool := &MySQLConnectionPool{ config: config, entries: make([]*MySQLPoolEntry, 0, config.MaxPoolCapacity), connMap: make(map[uint]*MySQLClient), stopCh: make(chan struct{}), } return pool } // Acquire 获取一个连接(阻塞等待直到有可用连接) func (p *MySQLConnectionPool) Acquire(conn *models.DbConnection) (*MySQLPoolEntry, error) { p.mu.Lock() defer p.mu.Unlock() startTime := time.Now() // 尝试从池中获取空闲连接 for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { entry.InUse = true entry.LastUsed = time.Now() entry.mu.Unlock() // 更新统计 p.updateWaitStats(startTime) return entry, nil } entry.mu.Unlock() } // 没有可用连接,创建新连接 if len(p.entries) >= p.config.MaxOpenConns { // 已达到最大连接数,等待 return nil, p.waitForAvailableConnection(conn) } // 创建新连接 newEntry, err := p.createNewEntry(conn) if err != nil { return nil, err } p.entries = append(p.entries, newEntry) p.updateStats() p.updateWaitStats(startTime) return newEntry, nil } // Release 释放连接回池中 func (p *MySQLConnectionPool) Release(entry *MySQLPoolEntry) error { if entry == nil { return nil } entry.mu.Lock() defer entry.mu.Unlock() entry.InUse = false entry.LastUsed = time.Now() p.mu.Lock() defer p.mu.Unlock() p.updateStats() return nil } // Close 关闭连接池 func (p *MySQLConnectionPool) Close() error { p.mu.Lock() defer p.mu.Unlock() // 发送停止信号 close(p.stopCh) // 等待所有 goroutine 完成 p.wg.Wait() // 关闭所有连接 var lastErr error for _, entry := range p.entries { entry.mu.Lock() if err := entry.Client.Close(); err != nil { lastErr = err } entry.InUse = false entry.mu.Unlock() } p.entries = make([]*MySQLPoolEntry, 0, p.config.MaxPoolCapacity) p.connMap = make(map[uint]*MySQLClient) return lastErr } // Stats 获取连接池统计信息 func (p *MySQLConnectionPool) Stats() PoolStats { p.mu.RLock() defer p.mu.RUnlock() return p.stats } // cleanupIdleConnections 清理空闲连接 func (p *MySQLConnectionPool) cleanupIdleConnections() { p.mu.Lock() defer p.mu.Unlock() now := time.Now() keepEntries := make([]*MySQLPoolEntry, 0, len(p.entries)) for _, entry := range p.entries { entry.mu.Lock() isIdle := !entry.InUse idleDuration := now.Sub(entry.LastUsed) entry.mu.Unlock() // 保留条件:正在使用 或 空闲时间未超过阈值 或 数量少于最小空闲数 keep := !isIdle || idleDuration < p.config.ConnMaxIdleTime || len(keepEntries) < p.config.MinIdleConns if keep { keepEntries = append(keepEntries, entry) } else { // 关闭连接 entry.Client.Close() } } p.entries = keepEntries p.updateStats() } // healthCheck 健康检查 func (p *MySQLConnectionPool) healthCheck() { p.mu.RLock() entriesCopy := make([]*MySQLPoolEntry, len(p.entries)) copy(entriesCopy, p.entries) p.mu.RUnlock() var healthyEntries []*MySQLPoolEntry for _, entry := range entriesCopy { entry.mu.Lock() if !entry.InUse { // Ping 测试 if err := entry.Client.sqlDB.Ping(); err != nil { // 连接失效,标记为需要关闭 entry.mu.Unlock() entry.Client.Close() continue } } entry.mu.Unlock() healthyEntries = append(healthyEntries, entry) } // 更新连接池 p.mu.Lock() defer p.mu.Unlock() p.entries = healthyEntries p.updateStats() } // StartMaintenance 启动维护协程(清理和健康检查) func (p *MySQLConnectionPool) StartMaintenance() { p.wg.Add(1) go func() { defer p.wg.Done() ticker := time.NewTicker(p.config.HealthCheckInterval) defer ticker.Stop() for { select { case <-ticker.C: // 清理空闲连接 p.cleanupIdleConnections() // 健康检查 p.healthCheck() case <-p.stopCh: return } } }() } // createNewEntry 创建新的连接池条目 func (p *MySQLConnectionPool) createNewEntry(conn *models.DbConnection) (*MySQLPoolEntry, error) { startTime := time.Now() client, err := createMySQLClient(conn) if err != nil { return nil, err } elapsed := time.Since(startTime) // 慢连接日志 if p.config.EnableSlowConnLog && elapsed > p.config.SlowConnThreshold { // 记录慢连接 p.mu.Lock() p.stats.SlowConnCount++ p.mu.Unlock() } entry := &MySQLPoolEntry{ Client: client, LastUsed: time.Now(), CreatedAt: startTime, InUse: true, } return entry, nil } // waitForAvailableConnection 等待可用连接 func (p *MySQLConnectionPool) waitForAvailableConnection(conn *models.DbConnection) error { // 实现简单的等待逻辑(使用 channel) // 创建一个超时上下文 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-ctx.Done(): return ErrPoolExhausted case <-ticker.C: // 检查是否有可用连接 p.mu.RLock() hasAvailable := false for _, entry := range p.entries { entry.mu.Lock() if !entry.InUse { hasAvailable = true entry.mu.Unlock() break } entry.mu.Unlock() } p.mu.RUnlock() if hasAvailable { return nil } } } } // updateWaitStats 更新等待统计 func (p *MySQLConnectionPool) updateWaitStats(startTime time.Time) { waitDuration := time.Since(startTime) p.stats.WaitCount++ p.stats.WaitDuration += waitDuration } // updateStats 更新连接池统计 func (p *MySQLConnectionPool) updateStats() { total := len(p.entries) active := 0 idle := 0 for _, entry := range p.entries { entry.mu.Lock() if entry.InUse { active++ } else { idle++ } entry.mu.Unlock() } p.stats.TotalConns = total p.stats.ActiveConns = active p.stats.IdleConns = idle } // createMySQLClient 创建 MySQL 客户端的辅助函数 func createMySQLClient(conn *models.DbConnection) (*MySQLClient, error) { // 解密密码 password, err := crypto.DecryptPassword(conn.Password) if err != nil { return nil, fmt.Errorf("密码解密失败: %v", err) } config := &MySQLConfig{ Host: conn.Host, Port: conn.Port, Username: conn.Username, Password: password, Database: conn.Database, } return NewMySQLClient(config) } // 错误定义 var ( ErrPoolExhausted = &PoolError{Message: "连接池已耗尽"} ErrPoolClosed = &PoolError{Message: "连接池已关闭"} ) // PoolError 连接池错误 type PoolError struct { Message string Err error } func (e *PoolError) Error() string { if e.Err != nil { return e.Message + ": " + e.Err.Error() } return e.Message }