Private
Public Access
1
0
Files
u-desk/internal/dbclient/query_optimizer.go
绝尘 e5dbe89a6f 新增:Markdown编辑器/数据库优化/安全修复
- Markdown 编辑器:实时预览、PDF 导出、独立查看器
- 数据库优化:动态连接池、查询缓存、Redis Pipeline
- 窗口置顶功能
- 文件系统增强:右键菜单、编辑器集成、收藏夹重构
- 安全修复:XSS 防护、路径穿越、HTML 注入
- 代码质量:正则预编译、缓存锁优化、死代码清理
2026-03-31 11:49:25 +08:00

763 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dbclient
import (
"context"
"crypto/sha256"
"fmt"
"regexp"
"strings"
"sync"
"time"
)
// Precompiled patterns used by the heuristic SQL helpers in this file.
// They are not a SQL parser: they look at raw statement text and can be
// confused by keywords inside string literals or nested sub-queries.
//
// All patterns are case-insensitive and anchored on word boundaries so that
// identifiers such as "valid_from" or "limit_date" are not mistaken for
// keywords. The clause patterns use the "s" flag so that a clause may span
// several lines.
var (
	// reLimitOffset captures the one or two integers of "LIMIT n" or
	// MySQL's "LIMIT offset, count".
	reLimitOffset = regexp.MustCompile(`(?i)\blimit\s+(\d+)(?:\s*,\s*(\d+))?`)
	// reFromTable captures the token that follows FROM.
	reFromTable = regexp.MustCompile(`(?i)\bfrom\s+([^\s,]+)`)
	// reWhereClause captures the WHERE text up to ORDER BY, LIMIT, GROUP BY
	// or the end of the statement.
	reWhereClause = regexp.MustCompile(`(?is)\bwhere\s+(.*?)(?:\s+order\s+by\b|\s+limit\b|\s+group\s+by\b|$)`)
	// reOrderBy captures the ORDER BY text up to LIMIT or the end of the
	// statement.
	reOrderBy = regexp.MustCompile(`(?is)\border\s+by\s+(.*?)(?:\s+limit\b|$)`)
	// reBatchOperation matches INSERT/UPDATE/DELETE statements that contain
	// a "VALUES (" list.
	reBatchOperation = regexp.MustCompile(`(?is)^\s*(INSERT|UPDATE|DELETE)\b.*\bVALUES\s*\(`)
)
// CachedQuery is one entry of the query cache: a stored result together with
// the bookkeeping recorded when it was cached.
type CachedQuery struct {
	// Result is the query result being cached.
	Result *QueryResult
	// ExpiryTime and CreatedAt are stamped when the entry is built;
	// OptimizeQuery sets ExpiryTime to the creation time plus the cache TTL.
	ExpiryTime, CreatedAt time.Time
	// QueryHash is the digest produced by generateQueryHash for QueryParams.
	QueryHash   string
	QueryParams QueryParams
	// LastUsed is the last access time, intended for an LRU policy.
	LastUsed time.Time
	// AccessCount is the number of accesses, intended for an LFU policy.
	AccessCount int64
}
// QueryParams is the parsed summary of a statement. It is used as the cache
// key and is attached to slow-query records.
type QueryParams struct {
	// SQL is the raw statement text; Database is the schema it targets.
	SQL, Database string
	// Limit and Offset come from the statement's LIMIT clause (0 if absent).
	Limit, Offset int
	// Table is the first table found after FROM; Where and SortBy are the
	// raw WHERE and ORDER BY clause texts.
	Table, Where, SortBy string
	// IsReadOnly marks statements whose results may be cached.
	IsReadOnly bool
}
// QueryStats holds the running counters maintained by the optimizer.
type QueryStats struct {
	// TotalQueries counts statements passed to recordQuery, CachedQueries
	// counts cache hits, SlowQueries counts slow-log entries.
	TotalQueries, CachedQueries, SlowQueries int64
	// TotalDuration is the summed execution time; AverageDuration is
	// TotalDuration divided by TotalQueries.
	TotalDuration, AverageDuration time.Duration
	// CacheHitRate is a percentage, recomputed when LastCacheUpdate is more
	// than five minutes old.
	CacheHitRate    float64
	LastCacheUpdate time.Time
}
// SlowQuery is one entry of the slow-query log. Failed statements and
// connection-pool errors are stored in the same log, with Error set.
type SlowQuery struct {
	// Query is the statement text (or the operation name for pool errors).
	Query, Database string
	Duration        time.Duration
	Timestamp       time.Time
	Params          QueryParams
	// Table is copied from Params.Table; IndexUsed is a best-effort label.
	Table, IndexUsed string
	RowsAffected     int64
	// Error is the execution error, nil for statements that were only slow.
	Error error
}
// IndexSuggestion describes an index the optimizer proposes for a table.
type IndexSuggestion struct {
	Table   string
	Columns []string
	// IndexType is "normal", "unique" or "fulltext".
	// Priority is "high", "medium" or "low".
	IndexType, Priority string
	// Query is the statement that triggered the suggestion and
	// Justification is a human-readable reason for it.
	Query, Justification string
	CanBeApplied         bool
}
// QueryOptimizer wraps statement execution with result caching, query
// statistics, a slow-query log and index suggestions.
type QueryOptimizer struct {
	// cache stores results of read-only statements.
	cache *QueryCache
	// stats holds the running counters; access it under mu.
	stats *QueryStats

	// slowQueries and indexSuggestions are bounded in-memory logs; access
	// them under mu.
	slowQueries      []SlowQuery
	indexSuggestions []IndexSuggestion

	mu     sync.RWMutex
	config *OptimizerConfig

	// stopCh is closed by Stop to end the maintenance goroutine, which is
	// tracked by wg.
	stopCh chan struct{}
	wg     sync.WaitGroup
}
// OptimizerConfig collects the tunables of a QueryOptimizer.
type OptimizerConfig struct {
	// Result cache.
	CacheSize   int           // maximum number of cached entries
	CacheTTL    time.Duration // lifetime of a cached entry
	EnableCache bool          // whether read-only results are cached

	// Slow-query log.
	SlowQueryThreshold time.Duration // statements slower than this are logged
	EnableSlowLog      bool          // whether the slow-query log is kept
	MaxSlowLogs        int           // maximum number of log entries retained

	// Index suggestions.
	EnableIndexSuggestions bool // whether index suggestions are enabled
	MaxSuggestions         int  // maximum number of suggestions retained

	// Query analysis.
	EnableQueryAnalysis bool // whether query analysis is enabled
	MaxAnalysisDepth    int  // depth of query analysis
}
// DefaultOptimizerConfig returns a freshly allocated configuration with every
// feature enabled and the stock limits filled in.
func DefaultOptimizerConfig() *OptimizerConfig {
	cfg := new(OptimizerConfig)

	// Result cache: up to 1000 entries, each kept for 30 minutes.
	cfg.EnableCache = true
	cfg.CacheSize = 1000
	cfg.CacheTTL = 30 * time.Minute

	// Slow-query log: anything above 100ms, at most 1000 entries.
	cfg.EnableSlowLog = true
	cfg.SlowQueryThreshold = 100 * time.Millisecond
	cfg.MaxSlowLogs = 1000

	// Index suggestions: at most 100 retained.
	cfg.EnableIndexSuggestions = true
	cfg.MaxSuggestions = 100

	// Query analysis: depth 3.
	cfg.EnableQueryAnalysis = true
	cfg.MaxAnalysisDepth = 3

	return cfg
}
// NewQueryOptimizer builds an optimizer from config, falling back to
// DefaultOptimizerConfig when config is nil, and starts its background
// maintenance goroutine. Call Stop to terminate that goroutine.
func NewQueryOptimizer(config *OptimizerConfig) *QueryOptimizer {
	cfg := config
	if cfg == nil {
		cfg = DefaultOptimizerConfig()
	}

	o := new(QueryOptimizer)
	o.config = cfg
	o.cache = NewQueryCache(cfg.CacheSize, cfg.CacheTTL)
	o.stats = new(QueryStats)
	o.stopCh = make(chan struct{})
	o.slowQueries = []SlowQuery{}
	o.indexSuggestions = []IndexSuggestion{}

	// The maintenance loop is started last, once every field is in place.
	o.StartMaintenance()
	return o
}
// OptimizeQuery executes sqlStr against database through client, serving
// read-only statements from the result cache when possible.
//
// It returns the query result, the wall-clock time spent in this call and
// any execution error. Failed statements are always written to the slow-query
// log; successful ones are logged only when they exceed the configured
// threshold. Successful read-only results are cached when caching is enabled.
//
// Cache hits are counted in the query statistics like any other query, so
// that CachedQueries never exceeds TotalQueries and the derived hit rate
// stays within 0-100%.
func (o *QueryOptimizer) OptimizeQuery(ctx context.Context, client *MySQLClient, sqlStr string, database string) (*QueryResult, time.Duration, error) {
	startTime := time.Now()
	queryParams := o.parseQueryParams(sqlStr, database)
	cacheable := o.config.EnableCache && queryParams.IsReadOnly

	// Fast path: serve from cache.
	if cacheable {
		if cached, err := o.cache.Get(queryParams); err == nil && cached != nil {
			duration := time.Since(startTime)
			// Record the query before the hit so the hit counter can never
			// run ahead of the total.
			o.recordQuery(duration)
			o.recordCacheHit()
			return cached.Result, duration, nil
		}
	}

	result, err := client.ExecuteQuery(ctx, sqlStr, database)
	duration := time.Since(startTime)
	if err != nil {
		// Failures are logged regardless of how long they took.
		o.recordSlowQuery(sqlStr, database, duration, queryParams, result, err)
		return nil, duration, err
	}

	if duration > o.config.SlowQueryThreshold {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, result, nil)
	}

	if cacheable {
		now := time.Now()
		o.cache.Set(queryParams, &CachedQuery{
			Result:      result,
			ExpiryTime:  now.Add(o.config.CacheTTL),
			CreatedAt:   now,
			QueryHash:   o.generateQueryHash(queryParams),
			QueryParams: queryParams,
			LastUsed:    now,
			AccessCount: 1,
		})
	}

	o.recordQuery(duration)
	return result, duration, nil
}
// ExecuteOptimizedUpdate executes a data-modifying statement and returns the
// number of affected rows, the time spent and any execution error.
//
// Statements recognised by isBatchOperation go through optimizeBatchUpdate;
// everything else is sent straight to the client. On both paths a failed
// statement is always written to the slow-query log and a successful one is
// logged when it exceeds the configured threshold.
//
// After a successful write the result cache is cleared: cached read results
// may have been computed from rows this statement just changed, and no
// finer-grained invalidation is available here.
func (o *QueryOptimizer) ExecuteOptimizedUpdate(ctx context.Context, client *MySQLClient, sqlStr string, database string) (int64, time.Duration, error) {
	startTime := time.Now()
	queryParams := o.parseQueryParams(sqlStr, database)

	// invalidateCache drops cached reads that the write may have made stale.
	invalidateCache := func() {
		if o.config.EnableCache {
			o.cache.Clear()
		}
	}

	if o.isBatchOperation(sqlStr) {
		rowsAffected, duration, err := o.optimizeBatchUpdate(ctx, client, sqlStr, database)
		if err != nil {
			o.recordSlowQuery(sqlStr, database, duration, queryParams, nil, err)
			return 0, duration, err
		}
		if duration > o.config.SlowQueryThreshold {
			o.recordSlowQuery(sqlStr, database, duration, queryParams, nil, nil)
		}
		invalidateCache()
		o.recordQuery(duration)
		return rowsAffected, duration, nil
	}

	rowsAffected, err := client.ExecuteUpdate(ctx, sqlStr, database)
	duration := time.Since(startTime)
	if err != nil || duration > o.config.SlowQueryThreshold {
		o.recordSlowQuery(sqlStr, database, duration, queryParams, nil, err)
	}
	if err == nil {
		invalidateCache()
	}
	o.recordQuery(duration)
	return rowsAffected, duration, err
}
// GetIndexSuggestions returns the stored suggestions for table, or every
// stored suggestion when table is empty. The result is nil when nothing
// matches.
func (o *QueryOptimizer) GetIndexSuggestions(table string) []IndexSuggestion {
	o.mu.RLock()
	defer o.mu.RUnlock()

	var matched []IndexSuggestion
	for i := range o.indexSuggestions {
		if table != "" && o.indexSuggestions[i].Table != table {
			continue
		}
		matched = append(matched, o.indexSuggestions[i])
	}
	return matched
}
// GenerateIndexSuggestions derives index suggestions from the slow-query log
// entries that match database and table (an empty value matches anything) and
// appends them to the stored suggestions.
//
// Suggestions that are already stored (same table, index type and column
// list) are skipped, so calling this repeatedly does not fill the list with
// duplicates. When table is empty, each suggestion is attributed to the table
// recorded on the slow query it came from. The stored list is capped at
// config.MaxSuggestions, keeping the earliest entries.
//
// ctx and client are currently unused. The function always returns nil.
func (o *QueryOptimizer) GenerateIndexSuggestions(ctx context.Context, client *MySQLClient, database, table string) error {
	tableSlowQueries := o.getTableSlowQueries(database, table)

	// Analyse outside the write lock; only the merge needs exclusivity.
	var candidates []IndexSuggestion
	for _, slowQuery := range tableSlowQueries {
		target := table
		if target == "" {
			target = slowQuery.Table
		}
		candidates = append(candidates, o.analyzeQueryForIndexes(slowQuery.Query, target)...)
	}
	if len(candidates) == 0 {
		return nil
	}

	identity := func(s IndexSuggestion) string {
		return s.Table + "\x00" + s.IndexType + "\x00" + strings.Join(s.Columns, ",")
	}

	o.mu.Lock()
	defer o.mu.Unlock()

	seen := make(map[string]struct{}, len(o.indexSuggestions)+len(candidates))
	for _, existing := range o.indexSuggestions {
		seen[identity(existing)] = struct{}{}
	}
	for _, candidate := range candidates {
		id := identity(candidate)
		if _, duplicate := seen[id]; duplicate {
			continue
		}
		seen[id] = struct{}{}
		o.indexSuggestions = append(o.indexSuggestions, candidate)
	}

	if len(o.indexSuggestions) > o.config.MaxSuggestions {
		o.indexSuggestions = o.indexSuggestions[:o.config.MaxSuggestions]
	}
	return nil
}
// GetQueryStats returns a copy of the current statistics, taken under the
// read lock.
func (o *QueryOptimizer) GetQueryStats() QueryStats {
	o.mu.RLock()
	defer o.mu.RUnlock()

	snapshot := *o.stats
	return snapshot
}
// GetSlowQueries returns up to limit entries from the start of the slow-query
// log (oldest first). A limit that is zero, negative or larger than the log
// returns the whole log.
//
// The result is a copy: the internal slice keeps being appended to and
// re-sliced after the lock is released, so handing out a view of it would let
// callers read (or modify) memory that concurrent writers are changing.
func (o *QueryOptimizer) GetSlowQueries(limit int) []SlowQuery {
	o.mu.RLock()
	defer o.mu.RUnlock()

	if limit <= 0 || limit > len(o.slowQueries) {
		limit = len(o.slowQueries)
	}
	out := make([]SlowQuery, limit)
	copy(out, o.slowQueries[:limit])
	return out
}
// ClearCache empties the query cache by calling Clear on the optimizer's
// QueryCache.
func (o *QueryOptimizer) ClearCache() {
o.cache.Clear()
}
// Stop signals the maintenance goroutine to exit and waits for it to finish.
//
// Stop is idempotent: closing stopCh a second time would panic, so the
// channel is only closed if it is still open. The check-and-close runs under
// mu so that concurrent callers cannot both decide to close; the lock is
// released before waiting so the maintenance goroutine can still take it
// while it winds down.
func (o *QueryOptimizer) Stop() {
	o.mu.Lock()
	if o.stopCh != nil {
		select {
		case <-o.stopCh:
			// Already closed by an earlier Stop.
		default:
			close(o.stopCh)
		}
	}
	o.mu.Unlock()

	o.wg.Wait()
}
// parseQueryParams summarises sqlStr into a QueryParams value: the raw text
// and database, the LIMIT/OFFSET numbers, the first table found, the WHERE
// and ORDER BY clause texts, and whether the statement is read-only.
func (o *QueryOptimizer) parseQueryParams(sqlStr, database string) QueryParams {
	limit, offset := o.parseLimitOffset(sqlStr)

	// Only the first table is kept.
	var primaryTable string
	if tables := o.parseTables(sqlStr); len(tables) > 0 {
		primaryTable = tables[0]
	}

	return QueryParams{
		SQL:        sqlStr,
		Database:   database,
		Limit:      limit,
		Offset:     offset,
		Table:      primaryTable,
		Where:      o.parseWhereCondition(sqlStr),
		SortBy:     o.parseSortOrder(sqlStr),
		IsReadOnly: o.isReadOnlyQuery(sqlStr),
	}
}
// reLimitClause matches a LIMIT clause in any of its three spellings:
// "LIMIT count", MySQL's "LIMIT offset, count" and "LIMIT count OFFSET offset".
// Group 1 is the first number, group 2 the number after a comma, group 3 the
// number after OFFSET.
var reLimitClause = regexp.MustCompile(`(?i)\blimit\s+(\d+)(?:\s*,\s*(\d+)|\s+offset\s+(\d+))?`)

// parseLimitOffset extracts the row limit and offset from the first LIMIT
// clause in sqlStr.
//
// It returns (0, 0) when the statement has no LIMIT clause or when a number
// in it does not fit in an int. In the comma form the first number is the
// offset and the second the count; in the OFFSET form it is the other way
// round.
func (o *QueryOptimizer) parseLimitOffset(sqlStr string) (limit, offset int) {
	groups := reLimitClause.FindStringSubmatch(sqlStr)
	if groups == nil {
		return 0, 0
	}

	// The pattern guarantees digits only, so scanning fails solely on
	// overflow.
	toInt := func(digits string) (int, bool) {
		var n int
		if _, err := fmt.Sscanf(digits, "%d", &n); err != nil {
			return 0, false
		}
		return n, true
	}

	first, ok := toInt(groups[1])
	if !ok {
		return 0, 0
	}

	switch {
	case groups[2] != "": // LIMIT offset, count
		count, ok := toInt(groups[2])
		if !ok {
			return 0, 0
		}
		return count, first
	case groups[3] != "": // LIMIT count OFFSET offset
		skip, ok := toInt(groups[3])
		if !ok {
			return 0, 0
		}
		return first, skip
	}
	return first, 0
}
// parseTables returns the table names that follow a FROM keyword in sqlStr,
// in order of appearance. Tables introduced by JOIN are not extracted.
//
// Surrounding quotes, brackets, parentheses and a trailing semicolon are
// stripped, as are backticks inside a qualified name, so "FROM users;" and
// "FROM `db`.`users`)" yield "users" and "db.users". A FROM that is followed
// by an opening parenthesis introduces a derived table (sub-query) and is
// skipped rather than reported as a table called "(SELECT".
func (o *QueryOptimizer) parseTables(sqlStr string) []string {
	fromMatches := reFromTable.FindAllStringSubmatch(sqlStr, -1)
	tables := make([]string, 0, len(fromMatches))
	for _, match := range fromMatches {
		if len(match) < 2 {
			continue
		}
		token := match[1]
		if strings.HasPrefix(token, "(") {
			continue // derived table, not a table name
		}
		name := strings.Trim(token, "`\"'[]();")
		name = strings.ReplaceAll(name, "`", "")
		if name == "" {
			continue
		}
		tables = append(tables, name)
	}
	return tables
}
// parseWhereCondition returns the text captured by reWhereClause with
// surrounding whitespace removed, or "" when the pattern does not match.
func (o *QueryOptimizer) parseWhereCondition(sqlStr string) string {
	groups := reWhereClause.FindStringSubmatch(sqlStr)
	if len(groups) <= 1 {
		return ""
	}
	return strings.TrimSpace(groups[1])
}
// parseSortOrder returns the text captured by reOrderBy with surrounding
// whitespace removed, or "" when the pattern does not match.
func (o *QueryOptimizer) parseSortOrder(sqlStr string) string {
	if groups := reOrderBy.FindStringSubmatch(sqlStr); len(groups) > 1 {
		return strings.TrimSpace(groups[1])
	}
	return ""
}
// isReadOnlyQuery reports whether sqlStr looks like a statement whose result
// can safely be cached.
//
// The statement must start with one of the read keywords (SELECT, SHOW,
// DESCRIBE, DESC, EXPLAIN, WITH, UNION, INTERSECT, EXCEPT), compared as a
// whole word so that e.g. "SELECTED" does not qualify. It must also not
// contain INSERT, UPDATE, DELETE, REPLACE or INTO as a standalone word. That
// rejects "WITH ... DELETE", "SELECT ... FOR UPDATE" and "SELECT ... INTO",
// which start like reads but write or lock.
//
// The word scan is textual, so a keyword inside a string literal also
// disqualifies the statement; that only costs a cache entry, never
// correctness.
func (o *QueryOptimizer) isReadOnlyQuery(sqlStr string) bool {
	upper := strings.ToUpper(strings.TrimSpace(sqlStr))

	// The leading keyword is the initial run of letters, which also handles
	// forms without a space such as "SELECT*FROM t".
	end := 0
	for end < len(upper) && upper[end] >= 'A' && upper[end] <= 'Z' {
		end++
	}
	switch upper[:end] {
	case "SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN",
		"WITH", "UNION", "INTERSECT", "EXCEPT":
	default:
		return false
	}

	for _, token := range strings.Fields(upper) {
		switch strings.Trim(token, "();,") {
		case "INSERT", "UPDATE", "DELETE", "REPLACE", "INTO":
			return false
		}
	}
	return true
}
// isBatchOperation reports whether sqlStr matches reBatchOperation, i.e. it
// starts with INSERT, UPDATE or DELETE and later contains a "VALUES (" list.
// The pattern does not count rows, so a single-row INSERT ... VALUES also
// matches.
func (o *QueryOptimizer) isBatchOperation(sqlStr string) bool {
return reBatchOperation.MatchString(sqlStr)
}
// generateQueryHash returns the lower-case hex SHA-256 digest of the fields
// of params joined with "|".
func (o *QueryOptimizer) generateQueryHash(params QueryParams) string {
	digest := sha256.New()
	// Writing to a hash never fails, so the Fprintf result is not checked.
	fmt.Fprintf(digest, "%s|%s|%d|%d|%s|%s|%s|%v",
		params.SQL, params.Database, params.Limit, params.Offset,
		params.Table, params.Where, params.SortBy, params.IsReadOnly)
	return fmt.Sprintf("%x", digest.Sum(nil))
}
// recordQuery adds one statement of the given duration to the statistics:
// it bumps the total count and total time, recomputes the average, and
// refreshes the cache hit rate if it was never computed or was last computed
// more than five minutes ago.
func (o *QueryOptimizer) recordQuery(duration time.Duration) {
	o.mu.Lock()
	defer o.mu.Unlock()

	stats := o.stats
	stats.TotalQueries++
	stats.TotalDuration += duration
	stats.AverageDuration = time.Duration(int64(float64(stats.TotalDuration) / float64(stats.TotalQueries)))

	now := time.Now()
	stale := stats.LastCacheUpdate.IsZero() || now.Sub(stats.LastCacheUpdate) > 5*time.Minute
	if !stale {
		return
	}

	// Hit rate as a percentage of recorded queries.
	stats.CacheHitRate = float64(stats.CachedQueries) / float64(stats.TotalQueries) * 100
	stats.LastCacheUpdate = now
}
// recordCacheHit increments the cache-hit counter under the write lock.
func (o *QueryOptimizer) recordCacheHit() {
	o.mu.Lock()
	defer o.mu.Unlock()

	stats := o.stats
	stats.CachedQueries++
}
// recordSlowQuery appends an entry to the slow-query log and bumps the
// slow-query counter. It does nothing when the slow log is disabled. When the
// log grows past config.MaxSlowLogs the oldest entry is dropped.
func (o *QueryOptimizer) recordSlowQuery(query, database string, duration time.Duration, params QueryParams, result *QueryResult, err error) {
	if !o.config.EnableSlowLog {
		return
	}

	// Build the entry before taking the lock.
	var entry SlowQuery
	entry.Query = query
	entry.Database = database
	entry.Duration = duration
	entry.Timestamp = time.Now()
	entry.Params = params
	entry.Table = params.Table
	entry.IndexUsed = o.extractIndexUsed(query)
	entry.RowsAffected = o.extractRowsAffected(result)
	entry.Error = err

	o.mu.Lock()
	defer o.mu.Unlock()

	o.slowQueries = append(o.slowQueries, entry)
	if len(o.slowQueries) > o.config.MaxSlowLogs {
		o.slowQueries = o.slowQueries[1:]
	}
	o.stats.SlowQueries++
}
// extractIndexUsed is a placeholder: it ignores query and always returns
// "unknown". A real implementation would run EXPLAIN on the statement and
// read the index from the plan.
func (o *QueryOptimizer) extractIndexUsed(query string) string {
	const indexNotDetermined = "unknown"
	_ = query
	return indexNotDetermined
}
// extractRowsAffected returns the "rows_affected" value of the first row of
// result when it is present and holds an int64, and 0 in every other case
// (nil result, no rows, missing key, different type).
func (o *QueryOptimizer) extractRowsAffected(result *QueryResult) int64 {
	if result == nil || len(result.Data) == 0 {
		return 0
	}
	rows, ok := result.Data[0]["rows_affected"].(int64)
	if !ok {
		return 0
	}
	return rows
}
// analyzeQuery is a placeholder for per-query performance analysis (for
// example inspecting the plan or detecting N+1 patterns). It currently
// discards all of its arguments and does nothing.
func (o *QueryOptimizer) analyzeQuery(query, database string, result *QueryResult, duration time.Duration) {
	_, _, _, _ = query, database, result, duration
}
// analyzeQueryForIndexes derives index suggestions for table from query:
// one "medium" priority suggestion covering the columns found in the WHERE
// clause and one "low" priority suggestion covering the ORDER BY columns.
// A clause that is absent or yields no columns contributes nothing, and the
// result is nil when neither clause contributes.
func (o *QueryOptimizer) analyzeQueryForIndexes(query, table string) []IndexSuggestion {
	// Every suggestion shares the same skeleton; only the columns, priority
	// and justification differ.
	build := func(columns []string, priority, justification string) IndexSuggestion {
		return IndexSuggestion{
			Table:         table,
			Columns:       columns,
			IndexType:     "normal",
			Priority:      priority,
			Query:         query,
			Justification: justification,
			CanBeApplied:  true,
		}
	}

	var suggestions []IndexSuggestion

	if where := o.parseWhereCondition(query); where != "" {
		if cols := o.extractColumnsFromWhere(where); len(cols) > 0 {
			suggestions = append(suggestions,
				build(cols, "medium", fmt.Sprintf("查询经常使用WHERE条件 %s", where)))
		}
	}

	if order := o.parseSortOrder(query); order != "" {
		if cols := o.extractColumnsFromOrder(order); len(cols) > 0 {
			suggestions = append(suggestions,
				build(cols, "low", fmt.Sprintf("查询经常使用ORDER BY %s", order)))
		}
	}

	return suggestions
}
var (
	// reWhereStringLiteral matches single- or double-quoted string literals,
	// including backslash escapes, so their contents can be ignored.
	reWhereStringLiteral = regexp.MustCompile(`(?s)'(?:[^'\\]|\\.)*'|"(?:[^"\\]|\\.)*"`)

	// reWhereComparedColumn captures an identifier (optionally back-quoted
	// and/or qualified with dots) that is immediately followed by a
	// comparison operator or by LIKE / IN / BETWEEN / IS (optionally NOT).
	reWhereComparedColumn = regexp.MustCompile(
		"(?i)(?:^|[^\\w$.`])" +
			"((?:`[^`]+`|[a-z_][\\w$]*)(?:\\.(?:`[^`]+`|[a-z_][\\w$]*))*)" +
			"(?:\\s*(?:<=>|<=|>=|<>|!=|=|<|>)|\\s+(?:not\\s+)?(?:like|in|between|is)\\b)")
)

// extractColumnsFromWhere returns the column names that are compared in a
// WHERE clause, in order of first appearance and without duplicates.
//
// A column is an identifier standing on the left of a comparison operator
// (=, !=, <>, <, <=, >, >=, <=>) or of LIKE, IN, BETWEEN or IS. String
// literals are blanked out first, so "title = 'a = b'" yields only "title".
// Table qualifiers and backticks are removed ("`u`.`status`" becomes
// "status"). Literals and keywords such as 18, AND or NULL are never
// reported. Columns wrapped in a function call (e.g. LOWER(name) = ...) are
// not reported, since a plain index on them would not help.
//
// This is a textual heuristic, not a SQL parser. The result is never nil.
func (o *QueryOptimizer) extractColumnsFromWhere(where string) []string {
	columns := make([]string, 0)

	stripped := reWhereStringLiteral.ReplaceAllString(where, "''")
	seen := make(map[string]struct{})
	for _, match := range reWhereComparedColumn.FindAllStringSubmatch(stripped, -1) {
		name := match[1]
		if dot := strings.LastIndex(name, "."); dot >= 0 {
			name = name[dot+1:]
		}
		name = strings.Trim(name, "`")

		switch strings.ToUpper(name) {
		case "", "AND", "OR", "NOT", "NULL", "TRUE", "FALSE":
			continue
		}
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		columns = append(columns, name)
	}
	return columns
}
// extractColumnsFromOrder returns the column names of an ORDER BY clause, in
// order.
//
// The clause is split on commas and the first whitespace-separated word of
// each item is taken as the column, which drops a trailing ASC/DESC.
// Splitting on whitespace runs rather than on single spaces matters: in
// "a, b DESC" the second item starts with a space and would otherwise yield
// an empty column name. Backticks around a name are removed and empty items
// are skipped.
func (o *QueryOptimizer) extractColumnsFromOrder(order string) []string {
	items := strings.Split(order, ",")
	columns := make([]string, 0, len(items))
	for _, item := range items {
		words := strings.Fields(item)
		if len(words) == 0 {
			continue
		}
		if name := strings.Trim(words[0], "`"); name != "" {
			columns = append(columns, name)
		}
	}
	return columns
}
// getTableSlowQueries returns copies of the slow-query log entries whose
// database and table match the arguments; an empty argument matches any
// value. The result is nil when nothing matches.
func (o *QueryOptimizer) getTableSlowQueries(database, table string) []SlowQuery {
	o.mu.RLock()
	defer o.mu.RUnlock()

	var matched []SlowQuery
	for i := range o.slowQueries {
		entry := o.slowQueries[i]
		if database != "" && entry.Database != database {
			continue
		}
		if table != "" && entry.Table != table {
			continue
		}
		matched = append(matched, entry)
	}
	return matched
}
// optimizeBatchUpdate is the hook for batch-statement optimisation. At
// present it performs none: it forwards the statement to the client
// unchanged and reports the affected rows, the time the call took and the
// client's error.
func (o *QueryOptimizer) optimizeBatchUpdate(ctx context.Context, client *MySQLClient, sqlStr string, database string) (int64, time.Duration, error) {
	began := time.Now()
	affected, err := client.ExecuteUpdate(ctx, sqlStr, database)
	elapsed := time.Since(began)
	return affected, elapsed, err
}
// StartMaintenance launches a background goroutine that, every five minutes,
// purges expired cache entries and analyses the slow-query log. The goroutine
// is registered with the optimizer's WaitGroup and exits when stopCh is
// closed.
func (o *QueryOptimizer) StartMaintenance() {
	const interval = 5 * time.Minute

	o.wg.Add(1)
	go func() {
		defer o.wg.Done()

		tick := time.NewTicker(interval)
		defer tick.Stop()

		for {
			select {
			case <-o.stopCh:
				return
			case <-tick.C:
				// Drop expired cache entries, then look at the slow-query
				// log for new suggestions.
				o.cache.CleanupExpired()
				o.analyzeSlowQueriesForSuggestions()
			}
		}
	}()
}
// RecordPoolError stores a connection-pool failure in the slow-query log so
// that it shows up next to slow and failed statements. operation names what
// the pool was doing; err is the failure. Nothing is recorded when the slow
// log is disabled or err is nil. The entry uses the fixed database "pool"
// and table "connection_pool", and the log is trimmed to config.MaxSlowLogs
// by dropping the oldest entry.
func (o *QueryOptimizer) RecordPoolError(operation string, err error) {
	if logging := o.config.EnableSlowLog; !logging || err == nil {
		return
	}

	var entry SlowQuery
	entry.Query = operation
	entry.Database = "pool"
	entry.Duration = 0
	entry.Timestamp = time.Now()
	entry.Params = QueryParams{SQL: operation}
	entry.Table = "connection_pool"
	entry.IndexUsed = "N/A"
	entry.RowsAffected = 0
	entry.Error = err

	o.mu.Lock()
	defer o.mu.Unlock()

	o.slowQueries = append(o.slowQueries, entry)
	if len(o.slowQueries) > o.config.MaxSlowLogs {
		o.slowQueries = o.slowQueries[1:]
	}
}
// analyzeSlowQueriesForSuggestions is the periodic slow-query analysis step
// run by the maintenance goroutine. It currently only delegates to
// analyzeSlowQueryPatterns.
func (o *QueryOptimizer) analyzeSlowQueriesForSuggestions() {
// More elaborate analysis (query patterns, index usage statistics, ...)
// could be added here.
// Analyse the patterns found in the slow-query log.
o.analyzeSlowQueryPatterns()
}
// analyzeSlowQueryPatterns counts the slow-query log by statement type and by
// table, then hands both tallies to generateSmartSuggestions. The counting
// runs under the read lock; the hand-off happens after the lock is released
// so that the suggestion code is free to take the lock itself.
func (o *QueryOptimizer) analyzeSlowQueryPatterns() {
	typeCounts, tableCounts := func() (map[string]int, map[string]int) {
		o.mu.RLock()
		defer o.mu.RUnlock()

		byType := make(map[string]int)
		byTable := make(map[string]int)
		for i := range o.slowQueries {
			entry := &o.slowQueries[i]
			byType[o.detectQueryType(entry.Query)]++
			if entry.Table != "" {
				byTable[entry.Table]++
			}
		}
		return byType, byTable
	}()

	o.generateSmartSuggestions(typeCounts, tableCounts)
}
// detectQueryType classifies a statement by its leading keyword, ignoring
// case and leading whitespace. SELECT statements are further split into
// "SELECT_JOIN" (contains JOIN), "SELECT_GROUP" (contains GROUP BY) and
// "SELECT_SIMPLE"; JOIN takes precedence over GROUP BY. Anything that is not
// SELECT, INSERT, UPDATE or DELETE is "OTHER".
func (o *QueryOptimizer) detectQueryType(sqlStr string) string {
	stmt := strings.ToUpper(strings.TrimSpace(sqlStr))

	switch {
	case strings.HasPrefix(stmt, "SELECT"):
		switch {
		case strings.Contains(stmt, "JOIN"):
			return "SELECT_JOIN"
		case strings.Contains(stmt, "GROUP BY"):
			return "SELECT_GROUP"
		}
		return "SELECT_SIMPLE"
	case strings.HasPrefix(stmt, "INSERT"):
		return "INSERT"
	case strings.HasPrefix(stmt, "UPDATE"):
		return "UPDATE"
	case strings.HasPrefix(stmt, "DELETE"):
		return "DELETE"
	}
	return "OTHER"
}
// generateSmartSuggestions picks the statement type with the highest count in
// queryTypes and runs the suggestion generator for it: JOIN selects, GROUP BY
// selects and inserts each have one; other types have none. Counts that are
// not positive are never selected. tableQueries is accepted but not used.
func (o *QueryOptimizer) generateSmartSuggestions(queryTypes map[string]int, tableQueries map[string]int) {
	dominant, highest := "", 0
	for kind, seen := range queryTypes {
		if seen > highest {
			dominant, highest = kind, seen
		}
	}

	// One generator per statement type that has a dedicated strategy.
	generators := map[string]func(){
		"SELECT_JOIN":  o.generateJoinSuggestions,
		"SELECT_GROUP": o.generateGroupSuggestions,
		"INSERT":       o.generateInsertSuggestions,
	}
	if generate, ok := generators[dominant]; ok {
		generate()
	}
}
// generateJoinSuggestions is the hook that generateSmartSuggestions calls
// when JOIN selects are the most frequent type in the slow-query log. It is
// currently an empty placeholder.
func (o *QueryOptimizer) generateJoinSuggestions() {
}
// generateGroupSuggestions is the hook that generateSmartSuggestions calls
// when GROUP BY selects are the most frequent type in the slow-query log. It
// is currently an empty placeholder.
func (o *QueryOptimizer) generateGroupSuggestions() {
}
// generateInsertSuggestions is the hook that generateSmartSuggestions calls
// when INSERT statements are the most frequent type in the slow-query log. It
// is currently an empty placeholder.
func (o *QueryOptimizer) generateInsertSuggestions() {
}