package dbclient

import (
	"context"
	"crypto/sha256"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"
)

// Precompiled patterns used by the SQL inspection helpers in this file.
// They are heuristics over the raw SQL text, not a SQL parser.
var (
	// reLimitOffset matches "LIMIT n", "LIMIT offset, n" and "LIMIT n OFFSET m".
	// Group 1 is the first number, group 2 the number after a comma, group 3
	// the number after the OFFSET keyword.
	reLimitOffset = regexp.MustCompile(`(?i)\blimit\s+(\d+)(?:\s*,\s*(\d+)|\s+offset\s+(\d+))?`)

	// reFromTable captures the identifier following FROM or JOIN. Parentheses
	// and semicolons are excluded so that subqueries ("FROM (SELECT ...") are
	// skipped and trailing punctuation does not end up in the table name.
	reFromTable = regexp.MustCompile(`(?i)\b(?:from|join)\s+([^\s,;()]+)`)

	// reWhereClause captures the text between WHERE and the next
	// ORDER BY / LIMIT / GROUP BY (or the end of the statement). The "s" flag
	// lets the clause span several lines.
	reWhereClause = regexp.MustCompile(`(?is)\bwhere\s+(.*?)(?:\s+order\s+by|\s+limit|\s+group\s+by|$)`)

	// reOrderBy captures the text between ORDER BY and LIMIT (or the end).
	reOrderBy = regexp.MustCompile(`(?is)\border\s+by\s+(.*?)(?:\s+limit|$)`)

	// reBatchOperation matches INSERT/UPDATE/DELETE statements that contain a
	// VALUES ( list, including multi-line statements.
	reBatchOperation = regexp.MustCompile(`(?is)^\s*(INSERT|UPDATE|DELETE).*VALUES\s*\(`)

	// reWhereColumn captures an identifier (optionally backtick-quoted) that is
	// immediately followed by a comparison operator or by LIKE/IN/BETWEEN/IS.
	reWhereColumn = regexp.MustCompile("(?i)(`[^`]+`|\\b[a-z_][a-z0-9_.]*)(?:\\s*(?:=|!=|<>|<=|>=|<|>)|\\s+(?:not\\s+)?(?:like|in|between|is)\\b)")

	// reWriteKeyword detects data-modifying keywords inside a statement.
	reWriteKeyword = regexp.MustCompile(`(?i)\b(?:INSERT|UPDATE|DELETE|REPLACE)\b`)
)

// readOnlyStatementPrefixes lists the (upper-case) statement prefixes that
// isReadOnlyQuery accepts as read-only. WITH is handled separately there.
var readOnlyStatementPrefixes = [...]string{
	"SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN", "UNION", "INTERSECT", "EXCEPT",
}

// CachedQuery is a cached query result together with its bookkeeping data.
type CachedQuery struct {
	Result      *QueryResult
	ExpiryTime  time.Time
	CreatedAt   time.Time
	QueryHash   string
	QueryParams QueryParams
	LastUsed    time.Time // last access time (intended for an LRU policy)
	AccessCount int64     // number of accesses (intended for an LFU policy)
}

// QueryParams holds the pieces of a query that are used to build cache keys.
type QueryParams struct {
	SQL        string
	Database   string
	Limit      int
	Offset     int
	Table      string
	Where      string
	SortBy     string
	IsReadOnly bool
}

// QueryStats holds aggregate query statistics.
type QueryStats struct {
	TotalQueries    int64
	CachedQueries   int64
	SlowQueries     int64
	TotalDuration   time.Duration
	AverageDuration time.Duration
	CacheHitRate    float64 // percentage in the range 0-100
	LastCacheUpdate time.Time
}

// SlowQuery is one entry of the slow-query log. Failed queries and pool
// errors are stored in the same log with Error set.
type SlowQuery struct {
	Query        string
	Database     string
	Duration     time.Duration
	Timestamp    time.Time
	Params       QueryParams
	Table        string
	IndexUsed    string
	RowsAffected int64
	Error        error
}

// IndexSuggestion describes an index that might speed up a logged query.
type IndexSuggestion struct {
	Table         string
	Columns       []string
	IndexType     string // "normal", "unique", "fulltext"
	Priority      string // "high", "medium", "low"
	Query         string
	Justification string
	CanBeApplied  bool
}

// QueryOptimizer wraps query execution with result caching, slow-query
// logging, statistics and index suggestions. The mutex guards stats,
// slowQueries and indexSuggestions.
type QueryOptimizer struct {
	cache            *QueryCache
	stats            *QueryStats
	slowQueries      []SlowQuery
	indexSuggestions []IndexSuggestion
	mu               sync.RWMutex
	config           *OptimizerConfig
	stopCh           chan struct{}
	stopOnce         sync.Once // makes Stop safe to call more than once
	wg               sync.WaitGroup
}

// OptimizerConfig configures a QueryOptimizer.
type OptimizerConfig struct {
	// Cache settings.

	// CacheSize is the maximum number of cache entries.
	CacheSize int

	// CacheTTL is how long a cached result stays valid.
	CacheTTL time.Duration

	// EnableCache turns result caching on or off.
	EnableCache bool

	// Slow-query settings.

	// SlowQueryThreshold is the duration above which a query counts as slow.
	SlowQueryThreshold time.Duration

	// EnableSlowLog turns the slow-query log on or off.
	EnableSlowLog bool

	// MaxSlowLogs is the maximum number of slow-query entries kept.
	MaxSlowLogs int

	// Index-suggestion settings.

	// EnableIndexSuggestions turns index suggestions on or off.
	EnableIndexSuggestions bool

	// MaxSuggestions is the maximum number of index suggestions kept.
	MaxSuggestions int

	// Query-analysis settings.

	// EnableQueryAnalysis turns query analysis on or off.
	EnableQueryAnalysis bool

	// MaxAnalysisDepth is the query analysis depth.
	MaxAnalysisDepth int
}

// DefaultOptimizerConfig returns the default optimizer configuration:
// a 1000-entry cache with a 30 minute TTL, a 100ms slow-query threshold with
// at most 1000 log entries, at most 100 index suggestions, and analysis
// depth 3, with every feature enabled.
func DefaultOptimizerConfig() *OptimizerConfig {
	return &OptimizerConfig{
		CacheSize:              1000,
		CacheTTL:               30 * time.Minute,
		EnableCache:            true,
		SlowQueryThreshold:     100 * time.Millisecond,
		EnableSlowLog:          true,
		MaxSlowLogs:            1000,
		EnableIndexSuggestions: true,
		MaxSuggestions:         100,
		EnableQueryAnalysis:    true,
		MaxAnalysisDepth:       3,
	}
}

// NewQueryOptimizer creates a query optimizer and starts its maintenance
// goroutine. A nil config selects DefaultOptimizerConfig. Call Stop to shut
// the goroutine down.
func NewQueryOptimizer(config *OptimizerConfig) *QueryOptimizer {
	if config == nil {
		config = DefaultOptimizerConfig()
	}

	optimizer := &QueryOptimizer{
		cache:            NewQueryCache(config.CacheSize, config.CacheTTL),
		stats:            &QueryStats{},
		config:           config,
		stopCh:           make(chan struct{}),
		slowQueries:      make([]SlowQuery, 0),
		indexSuggestions: make([]IndexSuggestion, 0),
	}

	// Start the background maintenance goroutine.
	optimizer.StartMaintenance()

	return optimizer
}

// OptimizeQuery executes sqlStr against database through client, serving
// read-only queries from the cache when caching is enabled.
//
// It returns the result, the elapsed time, and any execution error. Failed
// queries are always written to the slow-query log; successful ones only when
// they exceed SlowQueryThreshold. Cache hits count as queries in the
// statistics so that the cache hit rate stays within 0-100%.
func (o *QueryOptimizer) OptimizeQuery(ctx context.Context, client *MySQLClient, sqlStr string, database string) (*QueryResult, time.Duration, error) {
	startTime := time.Now()
	queryParams := o.parseQueryParams(sqlStr, database)
	cacheable := o.config.EnableCache && queryParams.IsReadOnly

	// Try the cache first.
	if cacheable {
		if cached, err := o.cache.Get(queryParams); err == nil && cached != nil {
			duration := time.Since(startTime)
			o.recordCacheHit()
			o.recordQuery(duration)
			return cached.Result, duration, nil
		}
	}

	// Execute the query.
	result, err := client.ExecuteQuery(ctx, sqlStr, database)
	duration := time.Since(startTime)
	if err != nil {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, result, err)
		return nil, duration, err
	}

	// Log the query if it was slow.
	if duration > o.config.SlowQueryThreshold {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, result, nil)
	}

	// Cache results of read-only queries.
	if cacheable {
		now := time.Now()
		o.cache.Set(queryParams, &CachedQuery{
			Result:      result,
			ExpiryTime:  now.Add(o.config.CacheTTL),
			CreatedAt:   now,
			QueryHash:   o.generateQueryHash(queryParams),
			QueryParams: queryParams,
			LastUsed:    now,
			AccessCount: 1,
		})
	}

	o.recordQuery(duration)
	return result, duration, nil
}

// ExecuteOptimizedUpdate executes a data-modifying statement and returns the
// number of affected rows, the elapsed time, and any execution error.
//
// Batch statements (see isBatchOperation) go through optimizeBatchUpdate;
// everything else is executed directly. Both paths follow the same rules as
// OptimizeQuery: a failure is always logged and returns 0 rows, a success is
// logged only when it exceeds SlowQueryThreshold and is added to the stats.
func (o *QueryOptimizer) ExecuteOptimizedUpdate(ctx context.Context, client *MySQLClient, sqlStr string, database string) (int64, time.Duration, error) {
	startTime := time.Now()

	// Analyze the statement.
	queryParams := o.parseQueryParams(sqlStr, database)

	var (
		rowsAffected int64
		duration     time.Duration
		err          error
	)
	if o.isBatchOperation(sqlStr) {
		rowsAffected, duration, err = o.optimizeBatchUpdate(ctx, client, sqlStr, database)
	} else {
		rowsAffected, err = client.ExecuteUpdate(ctx, sqlStr, database)
		duration = time.Since(startTime)
	}

	if err != nil {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, nil, err)
		return 0, duration, err
	}

	if duration > o.config.SlowQueryThreshold {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, nil, nil)
	}

	o.recordQuery(duration)
	return rowsAffected, duration, nil
}

// GetIndexSuggestions returns the stored index suggestions for table, or all
// suggestions when table is empty.
func (o *QueryOptimizer) GetIndexSuggestions(table string) []IndexSuggestion {
	o.mu.RLock()
	defer o.mu.RUnlock()

	var suggestions []IndexSuggestion
	for _, suggestion := range o.indexSuggestions {
		if table == "" || suggestion.Table == table {
			suggestions = append(suggestions, suggestion)
		}
	}
	return suggestions
}

// GenerateIndexSuggestions derives index suggestions from the slow queries
// logged for database/table and merges them into the stored list, skipping
// duplicates and never exceeding MaxSuggestions. It currently always returns
// nil; ctx and client are not used yet.
func (o *QueryOptimizer) GenerateIndexSuggestions(ctx context.Context, client *MySQLClient, database, table string) error {
	// Collect the slow queries recorded for the table.
	tableSlowQueries := o.getTableSlowQueries(database, table)

	// Analyze outside the lock; only the merge needs exclusive access.
	var found []IndexSuggestion
	for i := range tableSlowQueries {
		found = append(found, o.analyzeQueryForIndexes(tableSlowQueries[i].Query, table)...)
	}

	o.mu.Lock()
	o.mergeSuggestionsLocked(found)
	o.mu.Unlock()

	return nil
}

// mergeSuggestionsLocked appends the suggestions from incoming that are not
// already stored, keeping at most MaxSuggestions entries (a negative limit is
// treated as zero). The caller must hold o.mu for writing.
func (o *QueryOptimizer) mergeSuggestionsLocked(incoming []IndexSuggestion) {
	limit := o.config.MaxSuggestions
	if limit < 0 {
		limit = 0
	}

	seen := make(map[string]struct{}, len(o.indexSuggestions)+len(incoming))
	for i := range o.indexSuggestions {
		seen[indexSuggestionKey(&o.indexSuggestions[i])] = struct{}{}
	}

	for i := range incoming {
		if len(o.indexSuggestions) >= limit {
			break
		}
		key := indexSuggestionKey(&incoming[i])
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		o.indexSuggestions = append(o.indexSuggestions, incoming[i])
	}

	// Covers a stored list that is already longer than the current limit.
	if len(o.indexSuggestions) > limit {
		o.indexSuggestions = o.indexSuggestions[:limit]
	}
}

// indexSuggestionKey identifies a suggestion by table, index type and columns.
func indexSuggestionKey(s *IndexSuggestion) string {
	return s.Table + "\x00" + s.IndexType + "\x00" + strings.Join(s.Columns, ",")
}

// GetQueryStats returns a snapshot of the query statistics. The cache hit
// rate in the snapshot is recomputed from the current counters, so it is not
// subject to the periodic refresh used for the stored value.
func (o *QueryOptimizer) GetQueryStats() QueryStats {
	o.mu.RLock()
	defer o.mu.RUnlock()

	snapshot := *o.stats
	if snapshot.TotalQueries > 0 {
		snapshot.CacheHitRate = float64(snapshot.CachedQueries) / float64(snapshot.TotalQueries) * 100
	}
	return snapshot
}

// GetSlowQueries returns a copy of the first limit entries of the slow-query
// log (oldest first). A limit that is <= 0 or larger than the log returns
// every entry. The result never aliases the optimizer's internal slice.
func (o *QueryOptimizer) GetSlowQueries(limit int) []SlowQuery {
	o.mu.RLock()
	defer o.mu.RUnlock()

	if limit <= 0 || limit > len(o.slowQueries) {
		limit = len(o.slowQueries)
	}

	out := make([]SlowQuery, limit)
	copy(out, o.slowQueries[:limit])
	return out
}

// ClearCache removes every entry from the query cache.
func (o *QueryOptimizer) ClearCache() {
	o.cache.Clear()
}

// Stop signals the maintenance goroutines to exit and waits for them.
// It is safe to call Stop more than once.
func (o *QueryOptimizer) Stop() {
	o.stopOnce.Do(func() { close(o.stopCh) })
	o.wg.Wait()
}

// parseQueryParams extracts the cache-key components (limit/offset, first
// table, WHERE clause, ORDER BY clause, read-only flag) from sqlStr.
func (o *QueryOptimizer) parseQueryParams(sqlStr, database string) QueryParams {
	params := QueryParams{
		SQL:      sqlStr,
		Database: database,
	}

	params.Limit, params.Offset = o.parseLimitOffset(sqlStr)

	// Only the first referenced table is recorded.
	if tables := o.parseTables(sqlStr); len(tables) > 0 {
		params.Table = tables[0]
	}

	params.Where = o.parseWhereCondition(sqlStr)
	params.SortBy = o.parseSortOrder(sqlStr)
	params.IsReadOnly = o.isReadOnlyQuery(sqlStr)

	return params
}

// parseLimitOffset extracts the row limit and offset from the first LIMIT
// clause found in sqlStr. It understands "LIMIT n", the MySQL form
// "LIMIT offset, n" and "LIMIT n OFFSET m". Both results are 0 when there is
// no LIMIT clause; a number that does not fit in an int is reported as 0.
func (o *QueryOptimizer) parseLimitOffset(sqlStr string) (limit, offset int) {
	m := reLimitOffset.FindStringSubmatch(sqlStr)
	if m == nil {
		return 0, 0
	}

	toInt := func(s string) int {
		n, err := strconv.Atoi(s)
		if err != nil {
			return 0
		}
		return n
	}

	first := toInt(m[1])
	switch {
	case m[2] != "":
		// MySQL "LIMIT offset, count": the first number is the offset.
		return toInt(m[2]), first
	case m[3] != "":
		return first, toInt(m[3])
	default:
		return first, 0
	}
}

// parseTables returns the table names that follow FROM or JOIN in sqlStr, in
// order of appearance and with surrounding quote characters removed.
func (o *QueryOptimizer) parseTables(sqlStr string) []string {
	matches := reFromTable.FindAllStringSubmatch(sqlStr, -1)
	tables := make([]string, 0, len(matches))
	for _, match := range matches {
		if len(match) > 1 {
			tables = append(tables, strings.Trim(match[1], "`\"'[]"))
		}
	}
	return tables
}

// parseWhereCondition returns the trimmed WHERE clause of sqlStr, or "" when
// there is none.
func (o *QueryOptimizer) parseWhereCondition(sqlStr string) string {
	if matches := reWhereClause.FindStringSubmatch(sqlStr); len(matches) > 1 {
		return strings.TrimSpace(matches[1])
	}
	return ""
}

// parseSortOrder returns the trimmed ORDER BY clause of sqlStr, or "" when
// there is none.
func (o *QueryOptimizer) parseSortOrder(sqlStr string) string {
	if matches := reOrderBy.FindStringSubmatch(sqlStr); len(matches) > 1 {
		return strings.TrimSpace(matches[1])
	}
	return ""
}

// isReadOnlyQuery reports whether sqlStr looks like a read-only statement,
// judged by its leading keyword. A WITH statement is only treated as
// read-only when it contains no INSERT/UPDATE/DELETE/REPLACE keyword, because
// a common table expression can introduce a data-modifying statement whose
// result must never be served from the cache.
func (o *QueryOptimizer) isReadOnlyQuery(sqlStr string) bool {
	stmt := strings.ToUpper(strings.TrimSpace(sqlStr))

	if strings.HasPrefix(stmt, "WITH") {
		return !reWriteKeyword.MatchString(stmt)
	}

	for _, prefix := range readOnlyStatementPrefixes {
		if strings.HasPrefix(stmt, prefix) {
			return true
		}
	}
	return false
}

// isBatchOperation reports whether sqlStr is an INSERT/UPDATE/DELETE
// statement that contains a VALUES ( list.
func (o *QueryOptimizer) isBatchOperation(sqlStr string) bool {
	return reBatchOperation.MatchString(sqlStr)
}

// generateQueryHash returns the hex-encoded SHA-256 of all QueryParams fields.
func (o *QueryOptimizer) generateQueryHash(params QueryParams) string {
	hashData := fmt.Sprintf("%s|%s|%d|%d|%s|%s|%s|%v",
		params.SQL, params.Database, params.Limit, params.Offset,
		params.Table, params.Where, params.SortBy, params.IsReadOnly)

	h := sha256.Sum256([]byte(hashData))
	return fmt.Sprintf("%x", h)
}

// recordQuery adds one query of the given duration to the statistics and
// refreshes the stored cache hit rate at most once every five minutes.
func (o *QueryOptimizer) recordQuery(duration time.Duration) {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.stats.TotalQueries++
	o.stats.TotalDuration += duration
	// TotalQueries is at least 1 here, so the division is safe.
	o.stats.AverageDuration = o.stats.TotalDuration / time.Duration(o.stats.TotalQueries)

	now := time.Now()
	if o.stats.LastCacheUpdate.IsZero() || now.Sub(o.stats.LastCacheUpdate) > 5*time.Minute {
		o.stats.CacheHitRate = float64(o.stats.CachedQueries) / float64(o.stats.TotalQueries) * 100
		o.stats.LastCacheUpdate = now
	}
}

// recordCacheHit counts one cache hit.
func (o *QueryOptimizer) recordCacheHit() {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.stats.CachedQueries++
}

// recordSlowQuery appends an entry to the slow-query log and bumps the
// slow-query counter. It does nothing when the slow log is disabled.
func (o *QueryOptimizer) recordSlowQuery(query, database string, duration time.Duration, params QueryParams, result *QueryResult, err error) {
	if !o.config.EnableSlowLog {
		return
	}

	entry := SlowQuery{
		Query:        query,
		Database:     database,
		Duration:     duration,
		Timestamp:    time.Now(),
		Params:       params,
		Table:        params.Table,
		IndexUsed:    o.extractIndexUsed(query),
		RowsAffected: o.extractRowsAffected(result),
		Error:        err,
	}

	o.mu.Lock()
	defer o.mu.Unlock()

	o.appendSlowQueryLocked(entry)
	o.stats.SlowQueries++
}

// appendSlowQueryLocked appends entry to the slow-query log and drops the
// oldest entry once the log exceeds MaxSlowLogs. The caller must hold o.mu
// for writing.
func (o *QueryOptimizer) appendSlowQueryLocked(entry SlowQuery) {
	o.slowQueries = append(o.slowQueries, entry)
	if len(o.slowQueries) > o.config.MaxSlowLogs {
		// Zero the evicted slot so the backing array does not keep its
		// strings and error alive.
		o.slowQueries[0] = SlowQuery{}
		o.slowQueries = o.slowQueries[1:]
	}
}

// extractIndexUsed is a placeholder that always returns "unknown"; a real
// implementation would run EXPLAIN and inspect the plan.
func (o *QueryOptimizer) extractIndexUsed(query string) string {
	return "unknown"
}

// extractRowsAffected returns the int64 stored under "rows_affected" in the
// first row of result, or 0 when it is absent or has a different type.
func (o *QueryOptimizer) extractRowsAffected(result *QueryResult) int64 {
	if result != nil && len(result.Data) > 0 {
		if rows, ok := result.Data[0]["rows_affected"].(int64); ok {
			return rows
		}
	}
	return 0
}

// analyzeQuery is a placeholder for deeper query analysis (query plans,
// N+1 detection, ...). It currently does nothing.
func (o *QueryOptimizer) analyzeQuery(query, database string, result *QueryResult, duration time.Duration) {
}

// analyzeQueryForIndexes derives index suggestions for table from the WHERE
// and ORDER BY clauses of query: a medium-priority suggestion for the WHERE
// columns and a low-priority one for the ORDER BY columns.
func (o *QueryOptimizer) analyzeQueryForIndexes(query, table string) []IndexSuggestion {
	var suggestions []IndexSuggestion

	// Columns compared in the WHERE clause.
	if where := o.parseWhereCondition(query); where != "" {
		if columns := o.extractColumnsFromWhere(where); len(columns) > 0 {
			suggestions = append(suggestions, IndexSuggestion{
				Table:         table,
				Columns:       columns,
				IndexType:     "normal",
				Priority:      "medium",
				Query:         query,
				Justification: fmt.Sprintf("查询经常使用WHERE条件 %s", where),
				CanBeApplied:  true,
			})
		}
	}

	// Columns used for sorting.
	if order := o.parseSortOrder(query); order != "" {
		if columns := o.extractColumnsFromOrder(order); len(columns) > 0 {
			suggestions = append(suggestions, IndexSuggestion{
				Table:         table,
				Columns:       columns,
				IndexType:     "normal",
				Priority:      "low",
				Query:         query,
				Justification: fmt.Sprintf("查询经常使用ORDER BY %s", order),
				CanBeApplied:  true,
			})
		}
	}

	return suggestions
}

// extractColumnsFromWhere returns the distinct column names that appear on
// the left-hand side of a comparison (=, !=, <>, <, >, <=, >=, LIKE, IN,
// BETWEEN, IS) in a WHERE clause, in order of first appearance. Keywords such
// as AND/OR and literal values are not reported. This is a textual heuristic:
// text inside string literals is not excluded.
func (o *QueryOptimizer) extractColumnsFromWhere(where string) []string {
	matches := reWhereColumn.FindAllStringSubmatch(where, -1)
	columns := make([]string, 0, len(matches))
	seen := make(map[string]struct{}, len(matches))

	for _, match := range matches {
		name := strings.Trim(match[1], "`")
		if name == "" {
			continue
		}
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		columns = append(columns, name)
	}
	return columns
}

// extractColumnsFromOrder returns the column names of an ORDER BY clause,
// dropping the ASC/DESC direction, surrounding quotes and empty items.
func (o *QueryOptimizer) extractColumnsFromOrder(order string) []string {
	parts := strings.Split(order, ",")
	columns := make([]string, 0, len(parts))

	for _, part := range parts {
		// Fields (rather than Split on " ") ignores the space that follows
		// each comma, which would otherwise yield an empty column name.
		fields := strings.Fields(part)
		if len(fields) == 0 {
			continue
		}
		if name := strings.Trim(fields[0], "`\"'[];"); name != "" {
			columns = append(columns, name)
		}
	}
	return columns
}

// getTableSlowQueries returns copies of the slow-query entries matching
// database and table; an empty argument matches everything.
func (o *QueryOptimizer) getTableSlowQueries(database, table string) []SlowQuery {
	o.mu.RLock()
	defer o.mu.RUnlock()

	var tableQueries []SlowQuery
	for i := range o.slowQueries {
		entry := &o.slowQueries[i]
		if (database == "" || entry.Database == database) &&
			(table == "" || entry.Table == table) {
			tableQueries = append(tableQueries, *entry)
		}
	}
	return tableQueries
}

// optimizeBatchUpdate executes a batch statement and returns the affected
// rows, the elapsed time and any error. It currently runs the statement
// unchanged; real batch optimization is not implemented yet.
func (o *QueryOptimizer) optimizeBatchUpdate(ctx context.Context, client *MySQLClient, sqlStr string, database string) (int64, time.Duration, error) {
	startTime := time.Now()
	rowsAffected, err := client.ExecuteUpdate(ctx, sqlStr, database)
	return rowsAffected, time.Since(startTime), err
}

// StartMaintenance starts a goroutine that, every five minutes, removes
// expired cache entries and analyzes the slow-query log. The goroutine exits
// when Stop is called.
func (o *QueryOptimizer) StartMaintenance() {
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()

		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Drop expired cache entries.
				o.cache.CleanupExpired()

				// Analyze slow queries to produce new index suggestions.
				o.analyzeSlowQueriesForSuggestions()
			case <-o.stopCh:
				return
			}
		}
	}()
}

// RecordPoolError stores a connection-pool error in the slow-query log under
// the pseudo database "pool". It does nothing when the slow log is disabled
// or err is nil.
func (o *QueryOptimizer) RecordPoolError(operation string, err error) {
	if !o.config.EnableSlowLog || err == nil {
		return
	}

	entry := SlowQuery{
		Query:        operation,
		Database:     "pool",
		Duration:     0,
		Timestamp:    time.Now(),
		Params:       QueryParams{SQL: operation},
		Table:        "connection_pool",
		IndexUsed:    "N/A",
		RowsAffected: 0,
		Error:        err,
	}

	o.mu.Lock()
	defer o.mu.Unlock()

	o.appendSlowQueryLocked(entry)
}

// analyzeSlowQueriesForSuggestions runs the slow-query pattern analysis.
func (o *QueryOptimizer) analyzeSlowQueriesForSuggestions() {
	o.analyzeSlowQueryPatterns()
}

// analyzeSlowQueryPatterns counts the logged slow queries per statement type
// and per table, then hands the counts to generateSmartSuggestions.
func (o *QueryOptimizer) analyzeSlowQueryPatterns() {
	queryTypes := make(map[string]int)
	tableQueries := make(map[string]int)

	o.mu.RLock()
	for i := range o.slowQueries {
		entry := &o.slowQueries[i]
		queryTypes[o.detectQueryType(entry.Query)]++
		if entry.Table != "" {
			tableQueries[entry.Table]++
		}
	}
	o.mu.RUnlock()

	// Generate suggestions outside the lock to avoid deadlocks.
	o.generateSmartSuggestions(queryTypes, tableQueries)
}

// detectQueryType classifies sqlStr as SELECT_JOIN, SELECT_GROUP,
// SELECT_SIMPLE, INSERT, UPDATE, DELETE or OTHER based on its leading
// keyword and, for SELECT, on the presence of JOIN or GROUP BY.
func (o *QueryOptimizer) detectQueryType(sqlStr string) string {
	stmt := strings.ToUpper(strings.TrimSpace(sqlStr))

	switch {
	case strings.HasPrefix(stmt, "SELECT"):
		switch {
		case strings.Contains(stmt, "JOIN"):
			return "SELECT_JOIN"
		case strings.Contains(stmt, "GROUP BY"):
			return "SELECT_GROUP"
		default:
			return "SELECT_SIMPLE"
		}
	case strings.HasPrefix(stmt, "INSERT"):
		return "INSERT"
	case strings.HasPrefix(stmt, "UPDATE"):
		return "UPDATE"
	case strings.HasPrefix(stmt, "DELETE"):
		return "DELETE"
	default:
		return "OTHER"
	}
}

// generateSmartSuggestions picks the most frequent slow-query type and calls
// the matching suggestion generator. Ties are broken by type name so the
// choice does not depend on map iteration order. tableQueries is not used yet.
func (o *QueryOptimizer) generateSmartSuggestions(queryTypes map[string]int, tableQueries map[string]int) {
	var (
		mostFrequentType string
		maxCount         int
	)
	for queryType, count := range queryTypes {
		if count > maxCount || (count == maxCount && count > 0 && queryType < mostFrequentType) {
			maxCount = count
			mostFrequentType = queryType
		}
	}

	switch mostFrequentType {
	case "SELECT_JOIN":
		// Suggest composite indexes for JOIN queries.
		o.generateJoinSuggestions()
	case "SELECT_GROUP":
		// Suggest indexes for GROUP BY queries.
		o.generateGroupSuggestions()
	case "INSERT":
		// Suggest optimizations for bulk inserts.
		o.generateInsertSuggestions()
	}
}

// generateJoinSuggestions is a placeholder for JOIN-specific suggestions.
func (o *QueryOptimizer) generateJoinSuggestions() {
}

// generateGroupSuggestions is a placeholder for GROUP BY-specific suggestions.
func (o *QueryOptimizer) generateGroupSuggestions() {
}

// generateInsertSuggestions is a placeholder for bulk-insert suggestions.
func (o *QueryOptimizer) generateInsertSuggestions() {
}