Private
Public Access
1
0
Files
u-desk/internal/dbclient/pool_config.go
绝尘 bfe5226bfe 新增:MySQL 真连接池重构基础架构
核心改进:
- 创建 MySQLConnectionPool 真正的连接池实现
- 连接池配置结构 PoolConfig(可配置参数)
- 动态连接获取与释放机制
- 空闲连接自动清理
- 健康检查机制(定期 Ping)
- 慢连接日志记录
- 连接池统计信息(Stats)
- 维护协程(清理+健康检查)

新增文件:
- pool_config.go - 连接池配置和实现
  - PoolConfig: 可配置的连接池参数
  - MySQLConnectionPool: 真正的连接池
  - Acquire/Release: 连接获取与释放
  - 清理与维护协程

修改文件:
- pool.go - 集成新连接池到 ConnectionPool

技术特性:
- 默认配置:20最大连接 / 10最大空闲 / 2最小空闲
- 健康检查:30秒间隔
- 慢连接阈值:500ms
- 连接最大生命周期:30分钟
- 空闲超时:10分钟

TODO:
- 连接预热(启动时建立最小连接)
- LRU 连接复用策略
- 单元测试
- 性能基准测试
2026-03-31 11:49:25 +08:00

427 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dbclient
import (
"context"
"fmt"
"sync"
"time"
"u-desk/internal/crypto"
"u-desk/internal/storage/models"
)
// PoolConfig 连接池配置
type PoolConfig struct {
// 最大打开连接数(硬上限)
MaxOpenConns int
// 最大空闲连接数(超过此数量的空闲连接会被关闭)
MaxIdleConns int
// 连接最大生命周期(超过此时间的连接会被关闭)
ConnMaxLifetime time.Duration
// 连接最大空闲时间(超过此时间未使用的连接会被关闭)
ConnMaxIdleTime time.Duration
// 最小空闲连接数(保持此数量的空闲连接以快速响应)
MinIdleConns int
// 连接超时时间(建立连接的最长时间)
ConnTimeout time.Duration
// 健康检查间隔(定期 Ping 连接检查有效性)
HealthCheckInterval time.Duration
// 是否启用连接预热(启动时建立最小连接)
EnableWarmup bool
// 是否启用慢连接日志(记录建立时间超过阈值的连接)
EnableSlowConnLog bool
// 慢连接阈值(超过此时间记录为慢连接)
SlowConnThreshold time.Duration
// 连接池最大容量(防止资源耗尽)
MaxPoolCapacity int
}
// DefaultPoolConfig 返回默认连接池配置
func DefaultPoolConfig() *PoolConfig {
return &PoolConfig{
MaxOpenConns: 20, // 最大20个连接
MaxIdleConns: 10, // 最大10个空闲
ConnMaxLifetime: 30 * time.Minute, // 连接最长30分钟
ConnMaxIdleTime: 10 * time.Minute, // 空闲10分钟关闭
MinIdleConns: 2, // 保持2个最小空闲
ConnTimeout: 5 * time.Second, // 连接超时5秒
HealthCheckInterval: 30 * time.Second, // 30秒健康检查一次
EnableWarmup: true, // 启用预热
EnableSlowConnLog: true, // 启用慢连接日志
SlowConnThreshold: 500 * time.Millisecond, // 超过500ms算慢连接
MaxPoolCapacity: 50, // 连接池最大容量
}
}
// MySQLPoolEntry MySQL 连接池条目
type MySQLPoolEntry struct {
Client *MySQLClient
LastUsed time.Time
CreatedAt time.Time
InUse bool
mu sync.Mutex
}
// AcquireResult 连接获取结果
type AcquireResult struct {
Entry *MySQLPoolEntry
Err error
}
// ReleaseResult 连接释放结果
type ReleaseResult struct {
Success bool
Err error
}
// Stats 连接池统计信息
type PoolStats struct {
TotalConns int // 总连接数
ActiveConns int // 使用中的连接数
IdleConns int // 空闲连接数
WaitCount int64 // 等待连接的次数
WaitDuration time.Duration // 总等待时间
SlowConnCount int64 // 慢连接数量
}
// MySQLConnectionPool MySQL 连接池(真正的连接池)
type MySQLConnectionPool struct {
config *PoolConfig
configHash string // 配置哈希,用于检测配置变更
mu sync.RWMutex
entries []*MySQLPoolEntry // 连接池条目
connMap map[uint]*MySQLClient // 连接ID -> 客户端映射(兼容现有代码)
stats PoolStats
stopCh chan struct{}
wg sync.WaitGroup
}
// NewMySQLConnectionPool 创建新的 MySQL 连接池
func NewMySQLConnectionPool(config *PoolConfig) *MySQLConnectionPool {
if config == nil {
config = DefaultPoolConfig()
}
pool := &MySQLConnectionPool{
config: config,
entries: make([]*MySQLPoolEntry, 0, config.MaxPoolCapacity),
connMap: make(map[uint]*MySQLClient),
stopCh: make(chan struct{}),
}
return pool
}
// Acquire 获取一个连接(阻塞等待直到有可用连接)
func (p *MySQLConnectionPool) Acquire(conn *models.DbConnection) (*MySQLPoolEntry, error) {
p.mu.Lock()
defer p.mu.Unlock()
startTime := time.Now()
// 尝试从池中获取空闲连接
for _, entry := range p.entries {
entry.mu.Lock()
if !entry.InUse {
entry.InUse = true
entry.LastUsed = time.Now()
entry.mu.Unlock()
// 更新统计
p.updateWaitStats(startTime)
return entry, nil
}
entry.mu.Unlock()
}
// 没有可用连接,创建新连接
if len(p.entries) >= p.config.MaxOpenConns {
// 已达到最大连接数,等待
return nil, p.waitForAvailableConnection(conn)
}
// 创建新连接
newEntry, err := p.createNewEntry(conn)
if err != nil {
return nil, err
}
p.entries = append(p.entries, newEntry)
p.updateStats()
p.updateWaitStats(startTime)
return newEntry, nil
}
// Release 释放连接回池中
func (p *MySQLConnectionPool) Release(entry *MySQLPoolEntry) error {
if entry == nil {
return nil
}
entry.mu.Lock()
defer entry.mu.Unlock()
entry.InUse = false
entry.LastUsed = time.Now()
p.mu.Lock()
defer p.mu.Unlock()
p.updateStats()
return nil
}
// Close 关闭连接池
func (p *MySQLConnectionPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
// 发送停止信号
close(p.stopCh)
// 等待所有 goroutine 完成
p.wg.Wait()
// 关闭所有连接
var lastErr error
for _, entry := range p.entries {
entry.mu.Lock()
if err := entry.Client.Close(); err != nil {
lastErr = err
}
entry.InUse = false
entry.mu.Unlock()
}
p.entries = make([]*MySQLPoolEntry, 0, p.config.MaxPoolCapacity)
p.connMap = make(map[uint]*MySQLClient)
return lastErr
}
// Stats 获取连接池统计信息
func (p *MySQLConnectionPool) Stats() PoolStats {
p.mu.RLock()
defer p.mu.RUnlock()
return p.stats
}
// cleanupIdleConnections 清理空闲连接
func (p *MySQLConnectionPool) cleanupIdleConnections() {
p.mu.Lock()
defer p.mu.Unlock()
now := time.Now()
keepEntries := make([]*MySQLPoolEntry, 0, len(p.entries))
for _, entry := range p.entries {
entry.mu.Lock()
isIdle := !entry.InUse
idleDuration := now.Sub(entry.LastUsed)
entry.mu.Unlock()
// 保留条件:正在使用 或 空闲时间未超过阈值 或 数量少于最小空闲数
keep := !isIdle ||
idleDuration < p.config.ConnMaxIdleTime ||
len(keepEntries) < p.config.MinIdleConns
if keep {
keepEntries = append(keepEntries, entry)
} else {
// 关闭连接
entry.Client.Close()
}
}
p.entries = keepEntries
p.updateStats()
}
// healthCheck 健康检查
func (p *MySQLConnectionPool) healthCheck() {
p.mu.RLock()
entriesCopy := make([]*MySQLPoolEntry, len(p.entries))
copy(entriesCopy, p.entries)
p.mu.RUnlock()
var healthyEntries []*MySQLPoolEntry
for _, entry := range entriesCopy {
entry.mu.Lock()
if !entry.InUse {
// Ping 测试
if err := entry.Client.sqlDB.Ping(); err != nil {
// 连接失效,标记为需要关闭
entry.mu.Unlock()
entry.Client.Close()
continue
}
}
entry.mu.Unlock()
healthyEntries = append(healthyEntries, entry)
}
// 更新连接池
p.mu.Lock()
defer p.mu.Unlock()
p.entries = healthyEntries
p.updateStats()
}
// StartMaintenance 启动维护协程(清理和健康检查)
func (p *MySQLConnectionPool) StartMaintenance() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
ticker := time.NewTicker(p.config.HealthCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 清理空闲连接
p.cleanupIdleConnections()
// 健康检查
p.healthCheck()
case <-p.stopCh:
return
}
}
}()
}
// createNewEntry 创建新的连接池条目
func (p *MySQLConnectionPool) createNewEntry(conn *models.DbConnection) (*MySQLPoolEntry, error) {
startTime := time.Now()
client, err := createMySQLClient(conn)
if err != nil {
return nil, err
}
elapsed := time.Since(startTime)
// 慢连接日志
if p.config.EnableSlowConnLog && elapsed > p.config.SlowConnThreshold {
// 记录慢连接
p.mu.Lock()
p.stats.SlowConnCount++
p.mu.Unlock()
}
entry := &MySQLPoolEntry{
Client: client,
LastUsed: time.Now(),
CreatedAt: startTime,
InUse: true,
}
return entry, nil
}
// waitForAvailableConnection 等待可用连接
func (p *MySQLConnectionPool) waitForAvailableConnection(conn *models.DbConnection) error {
// 实现简单的等待逻辑(使用 channel
// 创建一个超时上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ErrPoolExhausted
case <-ticker.C:
// 检查是否有可用连接
p.mu.RLock()
hasAvailable := false
for _, entry := range p.entries {
entry.mu.Lock()
if !entry.InUse {
hasAvailable = true
entry.mu.Unlock()
break
}
entry.mu.Unlock()
}
p.mu.RUnlock()
if hasAvailable {
return nil
}
}
}
}
// updateWaitStats 更新等待统计
func (p *MySQLConnectionPool) updateWaitStats(startTime time.Time) {
waitDuration := time.Since(startTime)
p.stats.WaitCount++
p.stats.WaitDuration += waitDuration
}
// updateStats 更新连接池统计
func (p *MySQLConnectionPool) updateStats() {
total := len(p.entries)
active := 0
idle := 0
for _, entry := range p.entries {
entry.mu.Lock()
if entry.InUse {
active++
} else {
idle++
}
entry.mu.Unlock()
}
p.stats.TotalConns = total
p.stats.ActiveConns = active
p.stats.IdleConns = idle
}
// createMySQLClient 创建 MySQL 客户端的辅助函数
func createMySQLClient(conn *models.DbConnection) (*MySQLClient, error) {
// 解密密码
password, err := crypto.DecryptPassword(conn.Password)
if err != nil {
return nil, fmt.Errorf("密码解密失败: %v", err)
}
config := &MySQLConfig{
Host: conn.Host,
Port: conn.Port,
Username: conn.Username,
Password: password,
Database: conn.Database,
}
return NewMySQLClient(config)
}
// 错误定义
var (
ErrPoolExhausted = &PoolError{Message: "连接池已耗尽"}
ErrPoolClosed = &PoolError{Message: "连接池已关闭"}
)
// PoolError 连接池错误
type PoolError struct {
Message string
Err error
}
func (e *PoolError) Error() string {
if e.Err != nil {
return e.Message + ": " + e.Err.Error()
}
return e.Message
}