- Markdown 编辑器:实时预览、PDF 导出、独立查看器 - 数据库优化:动态连接池、查询缓存、Redis Pipeline - 窗口置顶功能 - 文件系统增强:右键菜单、编辑器集成、收藏夹重构 - 安全修复:XSS 防护、路径穿越、HTML 注入 - 代码质量:正则预编译、缓存锁优化、死代码清理
394 lines
9.7 KiB
Go
394 lines
9.7 KiB
Go
package dbclient
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"u-desk/internal/common"
|
||
"u-desk/internal/crypto"
|
||
"u-desk/internal/storage/models"
|
||
)
|
||
|
||
// ConnectionPool 连接池管理器
|
||
type ConnectionPool struct {
|
||
mysqlClients map[uint]*MySQLClient
|
||
redisClients map[uint]*RedisClient
|
||
mongoClients map[uint]*MongoClient
|
||
|
||
// 新增:MySQL 真连接池
|
||
mysqlPool *MySQLConnectionPool
|
||
|
||
// 查询优化器
|
||
queryOptimizer *QueryOptimizer
|
||
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
var (
|
||
globalPool *ConnectionPool
|
||
poolOnce sync.Once
|
||
)
|
||
|
||
// GetPool 获取全局连接池实例
|
||
func GetPool() *ConnectionPool {
|
||
poolOnce.Do(func() {
|
||
// 创建 MySQL 连接池
|
||
poolConfig := DefaultPoolConfig()
|
||
|
||
mysqlPool := NewMySQLConnectionPool(poolConfig)
|
||
// 启动维护协程
|
||
mysqlPool.StartMaintenance()
|
||
|
||
// 创建查询优化器
|
||
queryOptimizer := NewQueryOptimizer(nil)
|
||
|
||
globalPool = &ConnectionPool{
|
||
mysqlClients: make(map[uint]*MySQLClient),
|
||
redisClients: make(map[uint]*RedisClient),
|
||
mongoClients: make(map[uint]*MongoClient),
|
||
mysqlPool: mysqlPool,
|
||
queryOptimizer: queryOptimizer,
|
||
}
|
||
})
|
||
return globalPool
|
||
}
|
||
|
||
// PooledClient 带释放语义的客户端包装
|
||
type PooledClient struct {
|
||
Client *MySQLClient
|
||
entry *MySQLPoolEntry
|
||
pool *MySQLConnectionPool
|
||
fromPool bool
|
||
}
|
||
|
||
// Release 释放连接回连接池
|
||
func (pc *PooledClient) Release() {
|
||
if pc.fromPool && pc.pool != nil && pc.entry != nil {
|
||
pc.pool.Release(pc.entry)
|
||
}
|
||
}
|
||
|
||
// GetMySQLClient 获取或创建 MySQL 客户端(使用连接池)
|
||
func (p *ConnectionPool) GetMySQLClient(conn *models.DbConnection) *PooledClient {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
// 尝试从连接池获取连接
|
||
if p.mysqlPool != nil {
|
||
entry, err := p.mysqlPool.Acquire(conn)
|
||
if err == nil {
|
||
return &PooledClient{Client: entry.Client, entry: entry, pool: p.mysqlPool, fromPool: true}
|
||
}
|
||
p.logPoolError("Acquire failed", err)
|
||
}
|
||
|
||
// 降级到原有逻辑
|
||
client, err := p.getMySQLClientLegacy(conn)
|
||
if err != nil {
|
||
return &PooledClient{Client: nil, fromPool: false}
|
||
}
|
||
return &PooledClient{Client: client, fromPool: false}
|
||
}
|
||
|
||
// logPoolError 记录连接池错误
|
||
func (p *ConnectionPool) logPoolError(operation string, err error) {
|
||
if p.queryOptimizer != nil {
|
||
// 通过查询优化器记录错误
|
||
p.queryOptimizer.RecordPoolError(operation, err)
|
||
}
|
||
}
|
||
|
||
// getMySQLClientLegacy 原有的 MySQL 客户端获取逻辑(向后兼容)
|
||
func (p *ConnectionPool) getMySQLClientLegacy(conn *models.DbConnection) (*MySQLClient, error) {
|
||
// 检查是否已存在
|
||
if client, ok := p.mysqlClients[conn.ID]; ok {
|
||
// 测试连接是否有效
|
||
if err := client.sqlDB.Ping(); err == nil {
|
||
return client, nil
|
||
}
|
||
// 连接已断开,移除并重新创建
|
||
client.Close()
|
||
delete(p.mysqlClients, conn.ID)
|
||
}
|
||
|
||
// 解密密码
|
||
password, err := crypto.DecryptPassword(conn.Password)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("密码解密失败: %v", err)
|
||
}
|
||
|
||
// 创建新客户端
|
||
config := &MySQLConfig{
|
||
Host: conn.Host,
|
||
Port: conn.Port,
|
||
Username: conn.Username,
|
||
Password: password, // 如果密码为空,MySQL会尝试无密码连接
|
||
Database: conn.Database,
|
||
}
|
||
|
||
client, err := NewMySQLClient(config)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
p.mysqlClients[conn.ID] = client
|
||
return client, nil
|
||
}
|
||
|
||
// GetMySQLPoolStats 获取 MySQL 连接池统计信息
|
||
func (p *ConnectionPool) GetMySQLPoolStats() *PoolStats {
|
||
if p.mysqlPool != nil {
|
||
stats := p.mysqlPool.Stats()
|
||
return &stats
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// OptimizeQuery 优化查询执行
|
||
func (p *ConnectionPool) OptimizeQuery(ctx context.Context, conn *models.DbConnection, sqlStr string, database string) (*QueryResult, time.Duration, error) {
|
||
pc := p.GetMySQLClient(conn)
|
||
if pc.Client == nil {
|
||
return nil, 0, fmt.Errorf("获取 MySQL 连接失败")
|
||
}
|
||
defer pc.Release()
|
||
|
||
// 使用查询优化器
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.OptimizeQuery(ctx, pc.Client, sqlStr, database)
|
||
}
|
||
|
||
// 降级到普通查询
|
||
startTime := time.Now()
|
||
result, err := pc.Client.ExecuteQuery(ctx, sqlStr, database)
|
||
duration := time.Since(startTime)
|
||
return result, duration, err
|
||
}
|
||
|
||
// ExecuteOptimizedUpdate 执行优化的更新操作
|
||
func (p *ConnectionPool) ExecuteOptimizedUpdate(ctx context.Context, conn *models.DbConnection, sqlStr string, database string) (int64, time.Duration, error) {
|
||
pc := p.GetMySQLClient(conn)
|
||
if pc.Client == nil {
|
||
return 0, 0, fmt.Errorf("获取 MySQL 连接失败")
|
||
}
|
||
defer pc.Release()
|
||
|
||
// 使用查询优化器
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.ExecuteOptimizedUpdate(ctx, pc.Client, sqlStr, database)
|
||
}
|
||
|
||
// 降级到普通更新
|
||
startTime := time.Now()
|
||
result, err := pc.Client.ExecuteUpdate(ctx, sqlStr, database)
|
||
duration := time.Since(startTime)
|
||
return result, duration, err
|
||
}
|
||
|
||
// GetQueryStats 获取查询统计信息
|
||
func (p *ConnectionPool) GetQueryStats() QueryStats {
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.GetQueryStats()
|
||
}
|
||
return QueryStats{}
|
||
}
|
||
|
||
// GetSlowQueries 获取慢查询记录
|
||
func (p *ConnectionPool) GetSlowQueries(limit int) []SlowQuery {
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.GetSlowQueries(limit)
|
||
}
|
||
return []SlowQuery{}
|
||
}
|
||
|
||
// GetIndexSuggestions 获取索引建议
|
||
func (p *ConnectionPool) GetIndexSuggestions(table string) []IndexSuggestion {
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.GetIndexSuggestions(table)
|
||
}
|
||
return []IndexSuggestion{}
|
||
}
|
||
|
||
// GenerateIndexSuggestions 为表生成索引建议
|
||
func (p *ConnectionPool) GenerateIndexSuggestions(ctx context.Context, conn *models.DbConnection, database, table string) error {
|
||
pc := p.GetMySQLClient(conn)
|
||
if pc.Client == nil {
|
||
return fmt.Errorf("获取 MySQL 连接失败")
|
||
}
|
||
defer pc.Release()
|
||
|
||
// 使用查询优化器
|
||
if p.queryOptimizer != nil {
|
||
return p.queryOptimizer.GenerateIndexSuggestions(ctx, pc.Client, database, table)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ClearQueryCache 清空查询缓存
|
||
func (p *ConnectionPool) ClearQueryCache() {
|
||
if p.queryOptimizer != nil {
|
||
p.queryOptimizer.ClearCache()
|
||
}
|
||
}
|
||
|
||
// GetRedisClient 获取或创建 Redis 客户端
|
||
func (p *ConnectionPool) GetRedisClient(conn *models.DbConnection) (*RedisClient, error) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
// 检查是否已存在
|
||
if client, ok := p.redisClients[conn.ID]; ok {
|
||
// 测试连接是否有效
|
||
ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutPing)
|
||
defer cancel()
|
||
if err := client.client.Ping(ctx).Err(); err == nil {
|
||
return client, nil
|
||
}
|
||
// 连接已断开,移除并重新创建
|
||
client.Close()
|
||
delete(p.redisClients, conn.ID)
|
||
}
|
||
|
||
// 解密密码
|
||
password, err := crypto.DecryptPassword(conn.Password)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("密码解密失败: %v", err)
|
||
}
|
||
|
||
// 解析 Redis DB 编号(从 Database 字段,默认为 0)
|
||
dbNum := 0
|
||
if conn.Database != "" {
|
||
// 尝试解析 Database 字段为数字
|
||
_, err := fmt.Sscanf(conn.Database, "%d", &dbNum)
|
||
if err != nil {
|
||
// 如果解析失败,使用默认值 0
|
||
dbNum = 0
|
||
}
|
||
// 限制 DB 编号在 0-15 之间
|
||
if dbNum < 0 || dbNum > 15 {
|
||
dbNum = 0
|
||
}
|
||
}
|
||
|
||
// 创建新客户端
|
||
config := &RedisConfig{
|
||
Host: conn.Host,
|
||
Port: conn.Port,
|
||
Password: password,
|
||
DB: dbNum,
|
||
}
|
||
|
||
client, err := NewRedisClient(config)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
p.redisClients[conn.ID] = client
|
||
return client, nil
|
||
}
|
||
|
||
// GetMongoClient 获取或创建 MongoDB 客户端
|
||
func (p *ConnectionPool) GetMongoClient(conn *models.DbConnection) (*MongoClient, error) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
// 检查是否已存在
|
||
if client, ok := p.mongoClients[conn.ID]; ok {
|
||
// 测试连接是否有效
|
||
ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutPing)
|
||
defer cancel()
|
||
if err := client.client.Ping(ctx, nil); err == nil {
|
||
return client, nil
|
||
}
|
||
// 连接已断开,移除并重新创建
|
||
client.Close()
|
||
delete(p.mongoClients, conn.ID)
|
||
}
|
||
|
||
// 解密密码
|
||
password, err := crypto.DecryptPassword(conn.Password)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("密码解密失败: %v", err)
|
||
}
|
||
|
||
// 解析 Options 获取 MongoDB 连接参数
|
||
authSource := ""
|
||
authMechanism := ""
|
||
if conn.Options != "" {
|
||
var opts map[string]interface{}
|
||
if err := json.Unmarshal([]byte(conn.Options), &opts); err == nil {
|
||
if as, ok := opts["authSource"].(string); ok && as != "" {
|
||
authSource = as
|
||
}
|
||
if am, ok := opts["authMechanism"].(string); ok && am != "" {
|
||
authMechanism = am
|
||
}
|
||
}
|
||
}
|
||
|
||
// 创建新客户端
|
||
config := &MongoConfig{
|
||
Host: conn.Host,
|
||
Port: conn.Port,
|
||
Username: conn.Username,
|
||
Password: password,
|
||
Database: conn.Database,
|
||
AuthSource: authSource,
|
||
AuthMechanism: authMechanism,
|
||
}
|
||
|
||
client, err := NewMongoClient(config)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
p.mongoClients[conn.ID] = client
|
||
return client, nil
|
||
}
|
||
|
||
// CloseConnection 关闭指定连接
|
||
func (p *ConnectionPool) CloseConnection(connID uint, dbType string) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
switch dbType {
|
||
case "mysql":
|
||
if client, ok := p.mysqlClients[connID]; ok {
|
||
client.Close()
|
||
delete(p.mysqlClients, connID)
|
||
}
|
||
case "redis":
|
||
if client, ok := p.redisClients[connID]; ok {
|
||
client.Close()
|
||
delete(p.redisClients, connID)
|
||
}
|
||
case "mongo":
|
||
if client, ok := p.mongoClients[connID]; ok {
|
||
client.Close()
|
||
delete(p.mongoClients, connID)
|
||
}
|
||
}
|
||
}
|
||
|
||
// CloseAll 关闭所有连接
|
||
func (p *ConnectionPool) CloseAll() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
for _, client := range p.mysqlClients {
|
||
client.Close()
|
||
}
|
||
for _, client := range p.redisClients {
|
||
client.Close()
|
||
}
|
||
for _, client := range p.mongoClients {
|
||
client.Close()
|
||
}
|
||
|
||
p.mysqlClients = make(map[uint]*MySQLClient)
|
||
p.redisClients = make(map[uint]*RedisClient)
|
||
p.mongoClients = make(map[uint]*MongoClient)
|
||
}
|