diff --git a/internal/dbclient/pool.go b/internal/dbclient/pool.go index ec2c733..9d53778 100644 --- a/internal/dbclient/pool.go +++ b/internal/dbclient/pool.go @@ -16,6 +16,10 @@ type ConnectionPool struct { mysqlClients map[uint]*MySQLClient redisClients map[uint]*RedisClient mongoClients map[uint]*MongoClient + + // 新增:MySQL 真连接池 + mysqlPool *MySQLConnectionPool + mu sync.RWMutex } @@ -27,20 +31,46 @@ var ( // GetPool 获取全局连接池实例 func GetPool() *ConnectionPool { poolOnce.Do(func() { + // 创建 MySQL 连接池 + poolConfig := DefaultPoolConfig() + + mysqlPool := NewMySQLConnectionPool(poolConfig) + // 启动维护协程 + mysqlPool.StartMaintenance() + globalPool = &ConnectionPool{ mysqlClients: make(map[uint]*MySQLClient), redisClients: make(map[uint]*RedisClient), mongoClients: make(map[uint]*MongoClient), + mysqlPool: mysqlPool, } }) return globalPool } -// GetMySQLClient 获取或创建 MySQL 客户端 +// GetMySQLClient 获取或创建 MySQL 客户端(使用连接池) func (p *ConnectionPool) GetMySQLClient(conn *models.DbConnection) (*MySQLClient, error) { p.mu.Lock() defer p.mu.Unlock() + // 尝试从连接池获取连接 + if p.mysqlPool != nil { + entry, err := p.mysqlPool.Acquire(conn) + if err == nil { + // 成功从池中获取连接 + return entry.Client, nil + } + + // 连接池错误,返回 + return nil, err + } + + // 降级到原有逻辑(如果连接池未初始化) + return p.getMySQLClientLegacy(conn) +} + +// getMySQLClientLegacy 原有的 MySQL 客户端获取逻辑(向后兼容) +func (p *ConnectionPool) getMySQLClientLegacy(conn *models.DbConnection) (*MySQLClient, error) { // 检查是否已存在 if client, ok := p.mysqlClients[conn.ID]; ok { // 测试连接是否有效 @@ -76,6 +106,15 @@ func (p *ConnectionPool) GetMySQLClient(conn *models.DbConnection) (*MySQLClient return client, nil } +// GetMySQLPoolStats 获取 MySQL 连接池统计信息 +func (p *ConnectionPool) GetMySQLPoolStats() *PoolStats { + if p.mysqlPool != nil { + stats := p.mysqlPool.Stats() + return &stats + } + return nil +} + // GetRedisClient 获取或创建 Redis 客户端 func (p *ConnectionPool) GetRedisClient(conn *models.DbConnection) (*RedisClient, error) { p.mu.Lock() diff --git a/internal/dbclient/pool_config.go b/internal/dbclient/pool_config.go new file mode 100644 index 0000000..df5fb95 --- /dev/null +++ b/internal/dbclient/pool_config.go @@ -0,0 +1,426 @@ +package dbclient + +import ( + "context" + "fmt" + "sync" + "time" + + "u-desk/internal/crypto" + "u-desk/internal/storage/models" +) + +// PoolConfig 连接池配置 +type PoolConfig struct { + // 最大打开连接数(硬上限) + MaxOpenConns int + // 最大空闲连接数(超过此数量的空闲连接会被关闭) + MaxIdleConns int + // 连接最大生命周期(超过此时间的连接会被关闭) + ConnMaxLifetime time.Duration + // 连接最大空闲时间(超过此时间未使用的连接会被关闭) + ConnMaxIdleTime time.Duration + // 最小空闲连接数(保持此数量的空闲连接以快速响应) + MinIdleConns int + // 连接超时时间(建立连接的最长时间) + ConnTimeout time.Duration + // 健康检查间隔(定期 Ping 连接检查有效性) + HealthCheckInterval time.Duration + // 是否启用连接预热(启动时建立最小连接) + EnableWarmup bool + // 是否启用慢连接日志(记录建立时间超过阈值的连接) + EnableSlowConnLog bool + // 慢连接阈值(超过此时间记录为慢连接) + SlowConnThreshold time.Duration + // 连接池最大容量(防止资源耗尽) + MaxPoolCapacity int +} + +// DefaultPoolConfig 返回默认连接池配置 +func DefaultPoolConfig() *PoolConfig { + return &PoolConfig{ + MaxOpenConns: 20, // 最大20个连接 + MaxIdleConns: 10, // 最大10个空闲 + ConnMaxLifetime: 30 * time.Minute, // 连接最长30分钟 + ConnMaxIdleTime: 10 * time.Minute, // 空闲10分钟关闭 + MinIdleConns: 2, // 保持2个最小空闲 + ConnTimeout: 5 * time.Second, // 连接超时5秒 + HealthCheckInterval: 30 * time.Second, // 30秒健康检查一次 + EnableWarmup: true, // 启用预热 + EnableSlowConnLog: true, // 启用慢连接日志 + SlowConnThreshold: 500 * time.Millisecond, // 超过500ms算慢连接 + MaxPoolCapacity: 50, // 连接池最大容量 + } +} + +// MySQLPoolEntry MySQL 连接池条目 +type MySQLPoolEntry struct { + Client *MySQLClient + LastUsed time.Time + CreatedAt time.Time + InUse bool + mu sync.Mutex +} + +// AcquireResult 连接获取结果 +type AcquireResult struct { + Entry *MySQLPoolEntry + Err error +} + +// ReleaseResult 连接释放结果 +type ReleaseResult struct { + Success bool + Err error +} + +// Stats 连接池统计信息 +type PoolStats struct { + TotalConns int // 总连接数 + ActiveConns int // 使用中的连接数 + IdleConns int // 空闲连接数 + WaitCount int64 // 等待连接的次数 + WaitDuration time.Duration // 总等待时间 + SlowConnCount int64 // 慢连接数量 +} + +// MySQLConnectionPool MySQL 连接池(真正的连接池) +type MySQLConnectionPool struct { + config *PoolConfig + configHash string // 配置哈希,用于检测配置变更 + mu sync.RWMutex + entries []*MySQLPoolEntry // 连接池条目 + connMap map[uint]*MySQLClient // 连接ID -> 客户端映射(兼容现有代码) + stats PoolStats + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewMySQLConnectionPool 创建新的 MySQL 连接池 +func NewMySQLConnectionPool(config *PoolConfig) *MySQLConnectionPool { + if config == nil { + config = DefaultPoolConfig() + } + + pool := &MySQLConnectionPool{ + config: config, + entries: make([]*MySQLPoolEntry, 0, config.MaxPoolCapacity), + connMap: make(map[uint]*MySQLClient), + stopCh: make(chan struct{}), + } + + return pool +} + +// Acquire 获取一个连接(阻塞等待直到有可用连接) +func (p *MySQLConnectionPool) Acquire(conn *models.DbConnection) (*MySQLPoolEntry, error) { + p.mu.Lock() + defer p.mu.Unlock() + + startTime := time.Now() + + // 尝试从池中获取空闲连接 + for _, entry := range p.entries { + entry.mu.Lock() + if !entry.InUse { + entry.InUse = true + entry.LastUsed = time.Now() + entry.mu.Unlock() + + // 更新统计 + p.updateWaitStats(startTime) + + return entry, nil + } + entry.mu.Unlock() + } + + // 没有可用连接,创建新连接 + if len(p.entries) >= p.config.MaxOpenConns { + // 已达到最大连接数,等待 + return nil, p.waitForAvailableConnection(conn) + } + + // 创建新连接 + newEntry, err := p.createNewEntry(conn) + if err != nil { + return nil, err + } + + p.entries = append(p.entries, newEntry) + p.updateStats() + p.updateWaitStats(startTime) + + return newEntry, nil +} + +// Release 释放连接回池中 +func (p *MySQLConnectionPool) Release(entry *MySQLPoolEntry) error { + if entry == nil { + return nil + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.InUse = false + entry.LastUsed = time.Now() + + p.mu.Lock() + defer p.mu.Unlock() + + p.updateStats() + + return nil +} + +// Close 关闭连接池 +func (p *MySQLConnectionPool) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + // 发送停止信号 + close(p.stopCh) + + // 等待所有 goroutine 完成 + p.wg.Wait() + + // 关闭所有连接 + var lastErr error + for _, entry := range p.entries { + entry.mu.Lock() + if err := entry.Client.Close(); err != nil { + lastErr = err + } + entry.InUse = false + entry.mu.Unlock() + } + + p.entries = make([]*MySQLPoolEntry, 0, p.config.MaxPoolCapacity) + p.connMap = make(map[uint]*MySQLClient) + + return lastErr +} + +// Stats 获取连接池统计信息 +func (p *MySQLConnectionPool) Stats() PoolStats { + p.mu.RLock() + defer p.mu.RUnlock() + return p.stats +} + +// cleanupIdleConnections 清理空闲连接 +func (p *MySQLConnectionPool) cleanupIdleConnections() { + p.mu.Lock() + defer p.mu.Unlock() + + now := time.Now() + keepEntries := make([]*MySQLPoolEntry, 0, len(p.entries)) + + for _, entry := range p.entries { + entry.mu.Lock() + isIdle := !entry.InUse + idleDuration := now.Sub(entry.LastUsed) + entry.mu.Unlock() + + // 保留条件:正在使用 或 空闲时间未超过阈值 或 数量少于最小空闲数 + keep := !isIdle || + idleDuration < p.config.ConnMaxIdleTime || + len(keepEntries) < p.config.MinIdleConns + + if keep { + keepEntries = append(keepEntries, entry) + } else { + // 关闭连接 + entry.Client.Close() + } + } + + p.entries = keepEntries + p.updateStats() +} + +// healthCheck 健康检查 +func (p *MySQLConnectionPool) healthCheck() { + p.mu.RLock() + entriesCopy := make([]*MySQLPoolEntry, len(p.entries)) + copy(entriesCopy, p.entries) + p.mu.RUnlock() + + var healthyEntries []*MySQLPoolEntry + + for _, entry := range entriesCopy { + entry.mu.Lock() + if !entry.InUse { + // Ping 测试 + if err := entry.Client.sqlDB.Ping(); err != nil { + // 连接失效,标记为需要关闭 + entry.mu.Unlock() + entry.Client.Close() + continue + } + } + entry.mu.Unlock() + healthyEntries = append(healthyEntries, entry) + } + + // 更新连接池 + p.mu.Lock() + defer p.mu.Unlock() + p.entries = healthyEntries + p.updateStats() +} + +// StartMaintenance 启动维护协程(清理和健康检查) +func (p *MySQLConnectionPool) StartMaintenance() { + p.wg.Add(1) + go func() { + defer p.wg.Done() + + ticker := time.NewTicker(p.config.HealthCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 清理空闲连接 + p.cleanupIdleConnections() + // 健康检查 + p.healthCheck() + case <-p.stopCh: + return + } + } + }() +} + +// createNewEntry 创建新的连接池条目 +func (p *MySQLConnectionPool) createNewEntry(conn *models.DbConnection) (*MySQLPoolEntry, error) { + startTime := time.Now() + + client, err := createMySQLClient(conn) + if err != nil { + return nil, err + } + + elapsed := time.Since(startTime) + + // 慢连接日志 + if p.config.EnableSlowConnLog && elapsed > p.config.SlowConnThreshold { + // 记录慢连接 + p.mu.Lock() + p.stats.SlowConnCount++ + p.mu.Unlock() + } + + entry := &MySQLPoolEntry{ + Client: client, + LastUsed: time.Now(), + CreatedAt: startTime, + InUse: true, + } + + return entry, nil +} + +// waitForAvailableConnection 等待可用连接 +func (p *MySQLConnectionPool) waitForAvailableConnection(conn *models.DbConnection) error { + // 实现简单的等待逻辑(使用 channel) + // 创建一个超时上下文 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ErrPoolExhausted + case <-ticker.C: + // 检查是否有可用连接 + p.mu.RLock() + hasAvailable := false + for _, entry := range p.entries { + entry.mu.Lock() + if !entry.InUse { + hasAvailable = true + entry.mu.Unlock() + break + } + entry.mu.Unlock() + } + p.mu.RUnlock() + + if hasAvailable { + return nil + } + } + } +} + +// updateWaitStats 更新等待统计 +func (p *MySQLConnectionPool) updateWaitStats(startTime time.Time) { + waitDuration := time.Since(startTime) + p.stats.WaitCount++ + p.stats.WaitDuration += waitDuration +} + +// updateStats 更新连接池统计 +func (p *MySQLConnectionPool) updateStats() { + total := len(p.entries) + active := 0 + idle := 0 + + for _, entry := range p.entries { + entry.mu.Lock() + if entry.InUse { + active++ + } else { + idle++ + } + entry.mu.Unlock() + } + + p.stats.TotalConns = total + p.stats.ActiveConns = active + p.stats.IdleConns = idle +} + +// createMySQLClient 创建 MySQL 客户端的辅助函数 +func createMySQLClient(conn *models.DbConnection) (*MySQLClient, error) { + // 解密密码 + password, err := crypto.DecryptPassword(conn.Password) + if err != nil { + return nil, fmt.Errorf("密码解密失败: %v", err) + } + + config := &MySQLConfig{ + Host: conn.Host, + Port: conn.Port, + Username: conn.Username, + Password: password, + Database: conn.Database, + } + + return NewMySQLClient(config) +} + +// 错误定义 +var ( + ErrPoolExhausted = &PoolError{Message: "连接池已耗尽"} + ErrPoolClosed = &PoolError{Message: "连接池已关闭"} +) + +// PoolError 连接池错误 +type PoolError struct { + Message string + Err error +} + +func (e *PoolError) Error() string { + if e.Err != nil { + return e.Message + ": " + e.Err.Error() + } + return e.Message +} diff --git a/web/package.json.md5 b/web/package.json.md5 index 3e14332..f9182b2 100644 --- a/web/package.json.md5 +++ b/web/package.json.md5 @@ -1 +1 @@ -74b8a7937d28d6e8fb6d93e63e81abf7 \ No newline at end of file +0b56c4ddab241d0ca843efcc544c131c \ No newline at end of file