package dbclient import ( "context" "fmt" "net/url" "go-desk/internal/common" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) // MongoClient MongoDB 客户端 type MongoClient struct { client *mongo.Client database *mongo.Database config *MongoConfig } // MongoConfig MongoDB 配置 type MongoConfig struct { Host string Port int Username string Password string Database string AuthSource string // 认证数据库,默认为 "admin" AuthMechanism string // 认证机制,如 "SCRAM-SHA-1", "SCRAM-SHA-256" 等 } // NewMongoClient 创建 MongoDB 客户端 func NewMongoClient(config *MongoConfig) (*MongoClient, error) { // 确定认证数据库,默认为 admin authSource := config.AuthSource if authSource == "" { authSource = "admin" } // 如果指定了认证机制,直接使用;否则尝试自动检测 authMechanisms := []string{} if config.AuthMechanism != "" { // 用户明确指定了认证机制,只使用该机制 authMechanisms = []string{config.AuthMechanism} } else { // 未指定时,先尝试 SCRAM-SHA-256(更安全),失败则尝试 SCRAM-SHA-1 authMechanisms = []string{"SCRAM-SHA-256", "SCRAM-SHA-1"} } var lastErr error for _, authMechanism := range authMechanisms { client, err := tryConnectMongo(config, authSource, authMechanism) if err == nil { return client, nil } lastErr = err // 如果明确指定了认证机制,失败后不再尝试其他机制 if config.AuthMechanism != "" { break } } // 所有认证机制都失败 if lastErr != nil { return nil, fmt.Errorf("MongoDB 连接测试失败: %v", lastErr) } return nil, fmt.Errorf("MongoDB 连接失败: 未知错误") } // tryConnectMongo 尝试使用指定的认证机制连接 MongoDB func tryConnectMongo(config *MongoConfig, authSource, authMechanism string) (*MongoClient, error) { // 构建连接 URI var uri string if config.Username != "" && config.Password != "" { // 使用 url.UserPassword 正确转义用户名和密码中的特殊字符 // 这会正确处理 @、:、/ 等特殊字符 userInfo := url.UserPassword(config.Username, config.Password) // 构建基础 URI uri = fmt.Sprintf("mongodb://%s@%s:%d", userInfo.String(), config.Host, config.Port) // 添加数据库和认证源参数 params := url.Values{} params.Set("authSource", authSource) // 添加认证机制参数 if authMechanism != "" { params.Set("authMechanism", authMechanism) } // 如果有业务数据库,添加到路径中 if config.Database != "" { uri = fmt.Sprintf("%s/%s?%s", uri, config.Database, params.Encode()) } else { // MongoDB URI 要求查询参数前必须有 /,即使没有数据库名 uri = fmt.Sprintf("%s/?%s", uri, params.Encode()) } } else if config.Database != "" { // 没有认证信息时,数据库部分用于指定默认数据库 uri = fmt.Sprintf("mongodb://%s:%d/%s", config.Host, config.Port, config.Database) } else { uri = fmt.Sprintf("mongodb://%s:%d", config.Host, config.Port) } // 客户端选项 clientOptions := options.Client(). ApplyURI(uri). SetConnectTimeout(common.TimeoutConnect). SetServerSelectionTimeout(common.TimeoutConnect) // 创建客户端 ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutConnect) defer cancel() client, err := mongo.Connect(ctx, clientOptions) if err != nil { return nil, fmt.Errorf("连接 MongoDB 失败: %v", err) } // 测试连接 if err := client.Ping(ctx, nil); err != nil { client.Disconnect(ctx) return nil, fmt.Errorf("MongoDB 连接测试失败: %v", err) } var database *mongo.Database if config.Database != "" { database = client.Database(config.Database) } return &MongoClient{ client: client, database: database, config: config, }, nil } // TestMongoConnection 测试连接 func TestMongoConnection(host string, port int, username, password, database string) error { return TestMongoConnectionWithAuthSource(host, port, username, password, database, "") } // TestMongoConnectionWithAuthSource 测试连接(支持指定认证数据库) func TestMongoConnectionWithAuthSource(host string, port int, username, password, database, authSource string) error { return TestMongoConnectionWithOptions(host, port, username, password, database, authSource, "") } // TestMongoConnectionWithOptions 测试连接(支持指定认证数据库和认证机制) func TestMongoConnectionWithOptions(host string, port int, username, password, database, authSource, authMechanism string) error { config := &MongoConfig{ Host: host, Port: port, Username: username, Password: password, Database: database, AuthSource: authSource, AuthMechanism: authMechanism, } client, err := NewMongoClient(config) if err != nil { return err } defer client.Close() return nil } // Close 关闭连接 func (c *MongoClient) Close() error { if c.client != nil { ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutConnect) defer cancel() return c.client.Disconnect(ctx) } return nil } // ListDatabases 获取数据库列表 func (c *MongoClient) ListDatabases(ctx context.Context) ([]string, error) { databases, err := c.client.ListDatabaseNames(ctx, bson.M{}) return databases, err } // ListCollections 获取集合列表 func (c *MongoClient) ListCollections(ctx context.Context, database string) ([]string, error) { db := c.client.Database(database) collections, err := db.ListCollectionNames(ctx, bson.M{}) return collections, err } // GetCollectionStructure 获取集合结构 func (c *MongoClient) GetCollectionStructure(ctx context.Context, database, collectionName string) (map[string]interface{}, error) { coll := c.client.Database(database).Collection(collectionName) result := map[string]interface{}{ "database": database, "collection": collectionName, "sampleDocs": []map[string]interface{}{}, "fieldStats": map[string]int{}, "indexes": []map[string]interface{}{}, "documentCount": int64(0), } // 获取文档示例(最多 5 个) opts := options.Find().SetLimit(5) cursor, err := coll.Find(ctx, bson.M{}, opts) if err != nil { return nil, fmt.Errorf("获取文档示例失败: %v", err) } defer cursor.Close(ctx) var docs []bson.M if err = cursor.All(ctx, &docs); err != nil { return nil, fmt.Errorf("解析文档失败: %v", err) } // 转换为 map sampleDocs := make([]map[string]interface{}, 0, len(docs)) for _, doc := range docs { docMap := make(map[string]interface{}) for k, v := range doc { docMap[k] = v } sampleDocs = append(sampleDocs, docMap) } result["sampleDocs"] = sampleDocs // 字段统计:使用 $sample 聚合管道随机采样10个文档进行统计 // 这样可以获得更准确的字段分布,同时保持良好性能 // 使用异步方式执行,避免阻塞主流程 sampleSize := 10 pipeline := []bson.M{ {"$sample": bson.M{"size": sampleSize}}, {"$project": bson.M{"keys": bson.M{"$objectToArray": "$$ROOT"}}}, {"$unwind": "$keys"}, {"$group": bson.M{ "_id": "$keys.k", "count": bson.M{"$sum": 1}, }}, {"$sort": bson.M{"count": -1}}, // 按出现次数降序排序 } sampleCursor, err := coll.Aggregate(ctx, pipeline) if err != nil { // 如果采样失败,回退到基于文档示例的统计 fieldCount := make(map[string]int) for _, doc := range docs { for key := range doc { fieldCount[key]++ } } result["fieldStats"] = fieldCount result["fieldStatsSampleSize"] = len(docs) // 记录实际采样数量 result["fieldStatsMethod"] = "sample-docs" // 标记统计方式 } else { defer sampleCursor.Close(ctx) fieldCount := make(map[string]int) for sampleCursor.Next(ctx) { var statResult bson.M if err := sampleCursor.Decode(&statResult); err != nil { continue } fieldName, ok := statResult["_id"].(string) if !ok { continue } var count int switch v := statResult["count"].(type) { case int32: count = int(v) case int64: count = int(v) case int: count = v case float64: count = int(v) default: continue } fieldCount[fieldName] = count } result["fieldStats"] = fieldCount result["fieldStatsSampleSize"] = sampleSize // 记录采样数量 result["fieldStatsMethod"] = "sample-aggregate" // 标记统计方式 } // 文档总数(使用估算值,性能更好) // 对于大数据集,estimatedDocumentCount 比 CountDocuments 快得多 // 如果需要精确值,可以使用 CountDocuments,但性能较差 count, err := coll.EstimatedDocumentCount(ctx) if err != nil { // 如果估算失败,尝试精确计数(可能较慢) count, err = coll.CountDocuments(ctx, bson.M{}) if err != nil { return nil, fmt.Errorf("获取文档数量失败: %v", err) } } result["documentCount"] = count // 索引信息 indexCursor, err := coll.Indexes().List(ctx) if err != nil { // 索引查询失败不影响主流程 result["indexes"] = []map[string]interface{}{} } else { var indexes []map[string]interface{} for indexCursor.Next(ctx) { var indexSpec bson.M if err := indexCursor.Decode(&indexSpec); err != nil { continue } indexes = append(indexes, map[string]interface{}{ "name": indexSpec["name"], "unique": indexSpec["unique"], "keys": indexSpec["key"], }) } indexCursor.Close(ctx) result["indexes"] = indexes } return result, nil } // ExecuteQuery 执行查询 func (c *MongoClient) ExecuteQuery(ctx context.Context, database, collection string, filter bson.M, limit int64) ([]map[string]interface{}, error) { db := c.client.Database(database) coll := db.Collection(collection) opts := options.Find().SetLimit(limit) cursor, err := coll.Find(ctx, filter, opts) if err != nil { return nil, fmt.Errorf("查询失败: %v", err) } defer cursor.Close(ctx) var results []map[string]interface{} if err := cursor.All(ctx, &results); err != nil { return nil, fmt.Errorf("读取结果失败: %v", err) } return results, nil } // CountDocuments 获取文档数量 func (c *MongoClient) CountDocuments(ctx context.Context, database, collection string, filter bson.M) (int64, error) { db := c.client.Database(database) coll := db.Collection(collection) return coll.CountDocuments(ctx, filter) } // ExecuteCommand 执行 MongoDB 命令 // command 可以是 JSON 格式的字符串,格式:{"op": "find", "database": "test", "collection": "users", "filter": {}, "limit": 100} // 支持的操作:find, count, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany func (c *MongoClient) ExecuteCommand(ctx context.Context, database string, command map[string]interface{}) (interface{}, error) { op, ok := command["op"].(string) if !ok { return nil, fmt.Errorf("命令中缺少 'op' 字段或格式错误") } collectionName, ok := command["collection"].(string) if !ok { return nil, fmt.Errorf("命令中缺少 'collection' 字段或格式错误") } // 如果没有指定数据库,使用配置中的默认数据库 if database == "" { if c.config != nil && c.config.Database != "" { database = c.config.Database } else { return nil, fmt.Errorf("需要指定数据库名称") } } db := c.client.Database(database) coll := db.Collection(collectionName) switch op { case "find": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } limit := int64(100) if l, ok := command["limit"]; ok { if limitVal, ok := l.(float64); ok { limit = int64(limitVal) } else if limitVal, ok := l.(int64); ok { limit = limitVal } } opts := options.Find().SetLimit(limit) cursor, err := coll.Find(ctx, filter, opts) if err != nil { return nil, fmt.Errorf("查询失败: %v", err) } defer cursor.Close(ctx) var results []map[string]interface{} if err := cursor.All(ctx, &results); err != nil { return nil, fmt.Errorf("读取结果失败: %v", err) } return results, nil case "count": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } count, err := coll.CountDocuments(ctx, filter) if err != nil { return nil, fmt.Errorf("统计失败: %v", err) } return count, nil case "insertOne": document, ok := command["document"] if !ok { return nil, fmt.Errorf("insertOne 操作需要 'document' 字段") } doc := bson.M{} if docMap, ok := document.(map[string]interface{}); ok { doc = bson.M(docMap) } else { return nil, fmt.Errorf("document 必须是对象格式") } result, err := coll.InsertOne(ctx, doc) if err != nil { return nil, fmt.Errorf("插入失败: %v", err) } return map[string]interface{}{ "insertedId": result.InsertedID, }, nil case "insertMany": documents, ok := command["documents"] if !ok { return nil, fmt.Errorf("insertMany 操作需要 'documents' 字段") } docs := []interface{}{} if docsSlice, ok := documents.([]interface{}); ok { for _, d := range docsSlice { if docMap, ok := d.(map[string]interface{}); ok { docs = append(docs, bson.M(docMap)) } } } else { return nil, fmt.Errorf("documents 必须是数组格式") } result, err := coll.InsertMany(ctx, docs) if err != nil { return nil, fmt.Errorf("批量插入失败: %v", err) } return map[string]interface{}{ "insertedIds": result.InsertedIDs, "insertedCount": len(result.InsertedIDs), }, nil case "updateOne": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } else { return nil, fmt.Errorf("updateOne 操作需要 'filter' 字段") } update, ok := command["update"] if !ok { return nil, fmt.Errorf("updateOne 操作需要 'update' 字段") } updateDoc := bson.M{} if updateMap, ok := update.(map[string]interface{}); ok { updateDoc = bson.M(updateMap) } else { return nil, fmt.Errorf("update 必须是对象格式") } result, err := coll.UpdateOne(ctx, filter, bson.M{"$set": updateDoc}) if err != nil { return nil, fmt.Errorf("更新失败: %v", err) } return map[string]interface{}{ "matchedCount": result.MatchedCount, "modifiedCount": result.ModifiedCount, }, nil case "updateMany": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } else { return nil, fmt.Errorf("updateMany 操作需要 'filter' 字段") } update, ok := command["update"] if !ok { return nil, fmt.Errorf("updateMany 操作需要 'update' 字段") } updateDoc := bson.M{} if updateMap, ok := update.(map[string]interface{}); ok { updateDoc = bson.M(updateMap) } else { return nil, fmt.Errorf("update 必须是对象格式") } result, err := coll.UpdateMany(ctx, filter, bson.M{"$set": updateDoc}) if err != nil { return nil, fmt.Errorf("批量更新失败: %v", err) } return map[string]interface{}{ "matchedCount": result.MatchedCount, "modifiedCount": result.ModifiedCount, }, nil case "deleteOne": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } else { return nil, fmt.Errorf("deleteOne 操作需要 'filter' 字段") } result, err := coll.DeleteOne(ctx, filter) if err != nil { return nil, fmt.Errorf("删除失败: %v", err) } return map[string]interface{}{ "deletedCount": result.DeletedCount, }, nil case "deleteMany": filter := bson.M{} if f, ok := command["filter"]; ok { if filterMap, ok := f.(map[string]interface{}); ok { filter = bson.M(filterMap) } } else { return nil, fmt.Errorf("deleteMany 操作需要 'filter' 字段") } result, err := coll.DeleteMany(ctx, filter) if err != nil { return nil, fmt.Errorf("批量删除失败: %v", err) } return map[string]interface{}{ "deletedCount": result.DeletedCount, }, nil default: return nil, fmt.Errorf("不支持的操作: %s,支持的操作: find, count, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany", op) } } // PreviewCollectionIndexes 预览集合索引变更,只生成命令列表不执行 func (c *MongoClient) PreviewCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) { coll := c.client.Database(database).Collection(collectionName) var commands []string // 获取当前索引 currentIndexes, err := coll.Indexes().List(ctx) if err != nil { return nil, fmt.Errorf("获取当前索引失败: %v", err) } defer currentIndexes.Close(ctx) // 解析新的索引数据 var newIndexes []map[string]interface{} if idxs, ok := structure["indexes"].([]interface{}); ok { for _, idx := range idxs { if idxMap, ok := idx.(map[string]interface{}); ok { newIndexes = append(newIndexes, idxMap) } } } // 创建当前索引名映射 currentIndexMap := make(map[string]bool) for currentIndexes.Next(ctx) { var indexSpec bson.M if err := currentIndexes.Decode(&indexSpec); err != nil { continue } if name, ok := indexSpec["name"].(string); ok && name != "_id_" { currentIndexMap[name] = true } } // 创建新索引名映射 newIndexMap := make(map[string]bool) for _, idx := range newIndexes { if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" { newIndexMap[name] = true } } // 删除不存在的索引 for name := range currentIndexMap { if !newIndexMap[name] { cmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name) commands = append(commands, cmd) } } // 添加或更新索引 for _, idx := range newIndexes { name, _ := idx["name"].(string) if name == "" || name == "_id_" { continue } // 构建索引键 keys := bson.D{} if keysData, ok := idx["keys"].(map[string]interface{}); ok { for k, v := range keysData { var order int if vFloat, ok := v.(float64); ok { order = int(vFloat) } else if vInt, ok := v.(int); ok { order = vInt } else { order = 1 // 默认升序 } keys = append(keys, bson.E{Key: k, Value: order}) } } else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" { // 兼容 MySQL 格式的索引数据 keys = append(keys, bson.E{Key: columnName, Value: 1}) } if len(keys) == 0 { continue } // 构建索引选项 indexOptions := options.Index() indexOptions.SetName(name) if unique, ok := idx["unique"].(bool); ok && unique { indexOptions.SetUnique(true) } else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 { indexOptions.SetUnique(true) } // 如果索引已存在,先删除再创建 if currentIndexMap[name] { dropCmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name) commands = append(commands, dropCmd) } // 构建命令字符串(MongoDB shell 格式) keysStr := "{" for i, key := range keys { if i > 0 { keysStr += ", " } keysStr += fmt.Sprintf("%s: %d", key.Key, key.Value) } keysStr += "}" optionsStr := "{name: \"" + name + "\"" if indexOptions.Unique != nil && *indexOptions.Unique { optionsStr += ", unique: true" } optionsStr += "}" cmd := fmt.Sprintf("db.%s.createIndex(%s, %s)", collectionName, keysStr, optionsStr) commands = append(commands, cmd) } return commands, nil } // UpdateCollectionIndexes 更新集合索引,返回执行的命令列表 func (c *MongoClient) UpdateCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) { // 先预览生成命令列表 commands, err := c.PreviewCollectionIndexes(ctx, database, collectionName, structure) if err != nil { return nil, err } coll := c.client.Database(database).Collection(collectionName) // 获取当前索引 currentIndexes, err := coll.Indexes().List(ctx) if err != nil { return commands, fmt.Errorf("获取当前索引失败: %v", err) } defer currentIndexes.Close(ctx) // 解析新的索引数据 var newIndexes []map[string]interface{} if idxs, ok := structure["indexes"].([]interface{}); ok { for _, idx := range idxs { if idxMap, ok := idx.(map[string]interface{}); ok { newIndexes = append(newIndexes, idxMap) } } } // 创建当前索引名映射 currentIndexMap := make(map[string]bool) for currentIndexes.Next(ctx) { var indexSpec bson.M if err := currentIndexes.Decode(&indexSpec); err != nil { continue } if name, ok := indexSpec["name"].(string); ok && name != "_id_" { currentIndexMap[name] = true } } // 创建新索引名映射 newIndexMap := make(map[string]bool) for _, idx := range newIndexes { if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" { newIndexMap[name] = true } } // 删除不存在的索引 for name := range currentIndexMap { if !newIndexMap[name] { _, err := coll.Indexes().DropOne(ctx, name) if err != nil { return commands, fmt.Errorf("删除索引失败: %v, 索引名: %s", err, name) } } } // 添加或更新索引 for _, idx := range newIndexes { name, _ := idx["name"].(string) if name == "" || name == "_id_" { continue } // 构建索引键 keys := bson.D{} if keysData, ok := idx["keys"].(map[string]interface{}); ok { for k, v := range keysData { var order int if vFloat, ok := v.(float64); ok { order = int(vFloat) } else if vInt, ok := v.(int); ok { order = vInt } else { order = 1 // 默认升序 } keys = append(keys, bson.E{Key: k, Value: order}) } } else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" { // 兼容 MySQL 格式的索引数据 keys = append(keys, bson.E{Key: columnName, Value: 1}) } if len(keys) == 0 { continue } // 构建索引选项 indexOptions := options.Index() indexOptions.SetName(name) if unique, ok := idx["unique"].(bool); ok && unique { indexOptions.SetUnique(true) } else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 { indexOptions.SetUnique(true) } // 创建索引 indexModel := mongo.IndexModel{ Keys: keys, Options: indexOptions, } // 如果索引已存在,先删除再创建 if currentIndexMap[name] { _, err := coll.Indexes().DropOne(ctx, name) if err != nil { return commands, fmt.Errorf("删除旧索引失败: %v, 索引名: %s", err, name) } } _, err := coll.Indexes().CreateOne(ctx, indexModel) if err != nil { return commands, fmt.Errorf("创建索引失败: %v, 索引名: %s", err, name) } } return commands, nil }