Private
Public Access
1
0
Files
u-desk/internal/dbclient/mongo.go

820 lines
23 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dbclient
import (
"context"
"fmt"
"net/url"
"u-desk/internal/common"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// MongoClient bundles a connected MongoDB driver client with the
// configuration it was created from.
type MongoClient struct {
// client is the underlying driver client; Close disconnects it.
client *mongo.Client
// database is the handle for config.Database; it stays nil when no database was configured.
database *mongo.Database
// config is the configuration the client was created with.
config *MongoConfig
}
// MongoConfig holds the connection parameters for a MongoDB server.
// Credentials are only put into the connection URI when both Username and
// Password are non-empty.
type MongoConfig struct {
Host string
Port int
Username string
Password string
Database string
AuthSource string // authentication database; treated as "admin" when empty
AuthMechanism string // authentication mechanism, e.g. "SCRAM-SHA-1" or "SCRAM-SHA-256"
}
// NewMongoClient connects to MongoDB as described by config and returns a
// ready-to-use client.
//
// AuthSource defaults to "admin" when empty. When AuthMechanism is set, only
// that mechanism is attempted. Otherwise SCRAM-SHA-256 is tried first and
// SCRAM-SHA-1 second. Without credentials a single attempt is made: the
// mechanism is not part of the connection URI in that case, so a retry would
// only repeat the same connection and double the time needed to fail.
//
// The returned error wraps the error of the last failed attempt. A nil config
// yields an error instead of a panic.
func NewMongoClient(config *MongoConfig) (*MongoClient, error) {
	if config == nil {
		return nil, fmt.Errorf("MongoDB 连接失败: 配置为空")
	}

	authSource := config.AuthSource
	if authSource == "" {
		authSource = "admin"
	}

	var authMechanisms []string
	switch {
	case config.AuthMechanism != "":
		// An explicitly requested mechanism is never substituted.
		authMechanisms = []string{config.AuthMechanism}
	case config.Username == "" || config.Password == "":
		// tryConnectMongo only sends auth parameters when both username and
		// password are set, so there is nothing to negotiate here.
		authMechanisms = []string{""}
	default:
		// Prefer the stronger mechanism, fall back to the older one.
		authMechanisms = []string{"SCRAM-SHA-256", "SCRAM-SHA-1"}
	}

	var lastErr error
	for _, mechanism := range authMechanisms {
		client, err := tryConnectMongo(config, authSource, mechanism)
		if err == nil {
			return client, nil
		}
		lastErr = err
	}
	// authMechanisms is never empty, so lastErr is always set at this point.
	return nil, fmt.Errorf("MongoDB 连接测试失败: %w", lastErr)
}
// tryConnectMongo opens a connection using one specific authentication
// mechanism and verifies it with a ping.
//
// Credentials, authSource and authMechanism are only added to the URI when
// both Username and Password are non-empty. The database name is escaped
// before it is placed in the URI. Connecting and pinging share one
// common.TimeoutConnect deadline.
//
// On success the returned MongoClient owns the connection; its database
// handle is nil when config.Database is empty. On failure the driver client
// is disconnected and the underlying error is wrapped.
func tryConnectMongo(config *MongoConfig, authSource, authMechanism string) (*MongoClient, error) {
	// Escaping keeps characters such as '?' or '/' in the database name from
	// changing the structure of the URI.
	dbPath := url.QueryEscape(config.Database)

	var uri string
	switch {
	case config.Username != "" && config.Password != "":
		// url.UserPassword escapes '@', ':', '/' and similar characters in the credentials.
		userInfo := url.UserPassword(config.Username, config.Password)

		params := url.Values{}
		params.Set("authSource", authSource)
		if authMechanism != "" {
			params.Set("authMechanism", authMechanism)
		}

		// A "/" must precede the query string even when no database is named.
		uri = fmt.Sprintf("mongodb://%s@%s:%d/%s?%s", userInfo.String(), config.Host, config.Port, dbPath, params.Encode())
	case config.Database != "":
		// Without credentials the path only selects the default database.
		uri = fmt.Sprintf("mongodb://%s:%d/%s", config.Host, config.Port, dbPath)
	default:
		uri = fmt.Sprintf("mongodb://%s:%d", config.Host, config.Port)
	}

	clientOptions := options.Client().
		ApplyURI(uri).
		SetConnectTimeout(common.TimeoutConnect).
		SetServerSelectionTimeout(common.TimeoutConnect)

	ctx, cancel := context.WithTimeout(context.Background(), common.TimeoutConnect)
	defer cancel()

	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		return nil, fmt.Errorf("连接 MongoDB 失败: %w", err)
	}

	if err := client.Ping(ctx, nil); err != nil {
		// ctx may already be expired when the ping timed out, so the cleanup
		// gets its own deadline.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), common.TimeoutConnect)
		defer closeCancel()
		// Best effort: the ping failure is the error worth reporting.
		_ = client.Disconnect(closeCtx)
		return nil, fmt.Errorf("MongoDB 连接测试失败: %w", err)
	}

	var database *mongo.Database
	if config.Database != "" {
		database = client.Database(config.Database)
	}
	return &MongoClient{
		client:   client,
		database: database,
		config:   config,
	}, nil
}
// TestMongoConnection checks that a connection can be established with the
// given parameters. It delegates to TestMongoConnectionWithAuthSource with an
// empty authentication database.
func TestMongoConnection(host string, port int, username, password, database string) error {
	const unsetAuthSource = ""
	return TestMongoConnectionWithAuthSource(host, port, username, password, database, unsetAuthSource)
}
// TestMongoConnectionWithAuthSource checks connectivity like
// TestMongoConnection but lets the caller name the authentication database.
// It delegates to TestMongoConnectionWithOptions with an empty mechanism.
func TestMongoConnectionWithAuthSource(host string, port int, username, password, database, authSource string) error {
	const unsetMechanism = ""
	return TestMongoConnectionWithOptions(host, port, username, password, database, authSource, unsetMechanism)
}
// TestMongoConnectionWithOptions 测试连接(支持指定认证数据库和认证机制)
func TestMongoConnectionWithOptions(host string, port int, username, password, database, authSource, authMechanism string) error {
config := &MongoConfig{
Host: host,
Port: port,
Username: username,
Password: password,
Database: database,
AuthSource: authSource,
AuthMechanism: authMechanism,
}
client, err := NewMongoClient(config)
if err != nil {
return err
}
defer client.Close()
return nil
}
// Close disconnects the underlying driver client, allowing up to
// common.TimeoutConnect for the disconnect. It returns nil when there is no
// driver client to disconnect.
func (c *MongoClient) Close() error {
	if c.client == nil {
		return nil
	}

	closeCtx, release := context.WithTimeout(context.Background(), common.TimeoutConnect)
	defer release()

	return c.client.Disconnect(closeCtx)
}
// ListDatabases returns the names of the databases reported by the server,
// using an empty filter.
func (c *MongoClient) ListDatabases(ctx context.Context) ([]string, error) {
	return c.client.ListDatabaseNames(ctx, bson.M{})
}
// ListCollections returns the names of the collections in the given
// database, using an empty filter.
func (c *MongoClient) ListCollections(ctx context.Context, database string) ([]string, error) {
	return c.client.Database(database).ListCollectionNames(ctx, bson.M{})
}
// GetCollectionStructure returns a summary of a collection.
//
// The result map contains:
//   - "database", "collection": the names passed in
//   - "sampleDocs": up to 5 documents from an unfiltered find
//   - "fieldStats": top-level field name -> number of sampled documents that
//     contain the field
//   - "fieldStatsMethod": "sample-aggregate" when the $sample aggregation
//     succeeded, "sample-docs" when the statistics were derived from
//     sampleDocs instead
//   - "fieldStatsSampleSize": number of documents the statistics are based on
//   - "documentCount": the estimated count, or the exact count when the
//     estimate fails
//   - "indexes": name, unique flag and keys of each index; always a non-nil
//     slice, empty when the indexes cannot be listed
//
// An error is returned when the sample find, its decoding, or both document
// counts fail. Failures of the field sampling and of the index listing only
// degrade the result. All work runs synchronously on ctx.
func (c *MongoClient) GetCollectionStructure(ctx context.Context, database, collectionName string) (map[string]interface{}, error) {
	const (
		sampleDocLimit  = 5
		statsSampleSize = 10
	)

	coll := c.client.Database(database).Collection(collectionName)

	cursor, err := coll.Find(ctx, bson.M{}, options.Find().SetLimit(sampleDocLimit))
	if err != nil {
		return nil, fmt.Errorf("获取文档示例失败: %w", err)
	}
	defer cursor.Close(ctx)

	var docs []bson.M
	if err := cursor.All(ctx, &docs); err != nil {
		return nil, fmt.Errorf("解析文档失败: %w", err)
	}
	sampleDocs := make([]map[string]interface{}, 0, len(docs))
	for _, doc := range docs {
		sampleDocs = append(sampleDocs, map[string]interface{}(doc))
	}

	// Prefer statistics over a random sample; fall back to the documents
	// already fetched when the aggregation is unavailable or fails.
	fieldStats, sampled, err := sampleMongoFieldStats(ctx, coll, statsSampleSize)
	statsMethod := "sample-aggregate"
	if err != nil {
		fieldStats = make(map[string]int)
		for _, doc := range docs {
			for key := range doc {
				fieldStats[key]++
			}
		}
		sampled = len(docs)
		statsMethod = "sample-docs"
	}

	// The estimate is tried first; the exact count is only the fallback.
	count, err := coll.EstimatedDocumentCount(ctx)
	if err != nil {
		if count, err = coll.CountDocuments(ctx, bson.M{}); err != nil {
			return nil, fmt.Errorf("获取文档数量失败: %w", err)
		}
	}

	// Index information is optional: a failed listing leaves the list empty.
	indexes := make([]map[string]interface{}, 0)
	if indexCursor, err := coll.Indexes().List(ctx); err == nil {
		for indexCursor.Next(ctx) {
			var spec bson.M
			if err := indexCursor.Decode(&spec); err != nil {
				// Best effort: an undecodable entry is left out of the summary.
				continue
			}
			indexes = append(indexes, map[string]interface{}{
				"name":   spec["name"],
				"unique": spec["unique"],
				"keys":   spec["key"],
			})
		}
		_ = indexCursor.Close(ctx)
	}

	return map[string]interface{}{
		"database":             database,
		"collection":           collectionName,
		"sampleDocs":           sampleDocs,
		"fieldStats":           fieldStats,
		"fieldStatsSampleSize": sampled,
		"fieldStatsMethod":     statsMethod,
		"documentCount":        count,
		"indexes":              indexes,
	}, nil
}

// sampleMongoFieldStats counts how often each top-level field occurs in a
// random $sample of at most size documents. It returns the per-field counts
// and the number of documents the counts are based on.
//
// That number is taken as the highest per-field count: no field can occur in
// more documents than were sampled, and stored documents always carry _id, so
// for regular collections the maximum equals the real sample size even when
// the collection holds fewer than size documents.
func sampleMongoFieldStats(ctx context.Context, coll *mongo.Collection, size int) (map[string]int, int, error) {
	pipeline := []bson.M{
		{"$sample": bson.M{"size": size}},
		{"$project": bson.M{"keys": bson.M{"$objectToArray": "$$ROOT"}}},
		{"$unwind": "$keys"},
		{"$group": bson.M{
			"_id":   "$keys.k",
			"count": bson.M{"$sum": 1},
		}},
		{"$sort": bson.M{"count": -1}},
	}

	cursor, err := coll.Aggregate(ctx, pipeline)
	if err != nil {
		return nil, 0, err
	}
	defer cursor.Close(ctx)

	counts := make(map[string]int)
	sampled := 0
	for cursor.Next(ctx) {
		var row bson.M
		if err := cursor.Decode(&row); err != nil {
			return nil, 0, err
		}
		name, ok := row["_id"].(string)
		if !ok {
			continue
		}
		var n int
		switch v := row["count"].(type) {
		case int32:
			n = int(v)
		case int64:
			n = int(v)
		case int:
			n = v
		case float64:
			n = int(v)
		default:
			continue
		}
		counts[name] = n
		if n > sampled {
			sampled = n
		}
	}
	if err := cursor.Err(); err != nil {
		return nil, 0, err
	}
	return counts, sampled, nil
}
// ExecuteQuery runs a find on database.collection with the given filter and
// returns all matching documents as generic maps. limit is forwarded to the
// driver's find options unchanged.
//
// Driver errors are wrapped with %w, so callers can still inspect them with
// errors.Is / errors.As.
func (c *MongoClient) ExecuteQuery(ctx context.Context, database, collection string, filter bson.M, limit int64) ([]map[string]interface{}, error) {
	coll := c.client.Database(database).Collection(collection)

	cursor, err := coll.Find(ctx, filter, options.Find().SetLimit(limit))
	if err != nil {
		return nil, fmt.Errorf("查询失败: %w", err)
	}
	defer cursor.Close(ctx)

	var results []map[string]interface{}
	if err := cursor.All(ctx, &results); err != nil {
		return nil, fmt.Errorf("读取结果失败: %w", err)
	}
	return results, nil
}
// CountDocuments returns the number of documents in database.collection that
// match filter, as reported by the driver's CountDocuments call.
func (c *MongoClient) CountDocuments(ctx context.Context, database, collection string, filter bson.M) (int64, error) {
	target := c.client.Database(database).Collection(collection)
	return target.CountDocuments(ctx, filter)
}
// ExecuteCommand runs a single collection-level operation described by
// command, a decoded object such as
//
//	{"op": "find", "collection": "users", "filter": {}, "limit": 100}
//
// Supported values of "op": find, count, insertOne, insertMany, updateOne,
// updateMany, deleteOne, deleteMany.
//
// Fields read from command:
//   - "op", "collection": required strings
//   - "filter": object; optional for find/count (missing or null matches all
//     documents), required for update*/delete*. A filter that is present but
//     not an object is rejected, so a malformed filter can never widen a
//     write to the whole collection.
//   - "limit": number, find only, defaults to 100
//   - "document" / "documents": object / array of objects for insertOne /
//     insertMany
//   - "update": object for updateOne/updateMany. A document made up solely of
//     update operators ("$set", "$inc", ...) is sent as is; any other
//     document is treated as a set of field values and wrapped in "$set".
//
// When database is empty, the database from the client configuration is
// used; an error is returned when neither is available.
//
// The result is the document list for find, an int64 for count, and a map of
// counters / inserted ids for write operations. Driver errors are wrapped
// with %w.
func (c *MongoClient) ExecuteCommand(ctx context.Context, database string, command map[string]interface{}) (interface{}, error) {
	op, ok := command["op"].(string)
	if !ok {
		return nil, fmt.Errorf("命令中缺少 'op' 字段或格式错误")
	}
	collectionName, ok := command["collection"].(string)
	if !ok {
		return nil, fmt.Errorf("命令中缺少 'collection' 字段或格式错误")
	}

	if database == "" {
		if c.config == nil || c.config.Database == "" {
			return nil, fmt.Errorf("需要指定数据库名称")
		}
		database = c.config.Database
	}
	coll := c.client.Database(database).Collection(collectionName)

	switch op {
	case "find":
		filter, err := mongoCommandFilter(command, op, false)
		if err != nil {
			return nil, err
		}
		opts := options.Find().SetLimit(mongoCommandLimit(command, 100))
		cursor, err := coll.Find(ctx, filter, opts)
		if err != nil {
			return nil, fmt.Errorf("查询失败: %w", err)
		}
		defer cursor.Close(ctx)

		var results []map[string]interface{}
		if err := cursor.All(ctx, &results); err != nil {
			return nil, fmt.Errorf("读取结果失败: %w", err)
		}
		return results, nil

	case "count":
		filter, err := mongoCommandFilter(command, op, false)
		if err != nil {
			return nil, err
		}
		count, err := coll.CountDocuments(ctx, filter)
		if err != nil {
			return nil, fmt.Errorf("统计失败: %w", err)
		}
		return count, nil

	case "insertOne":
		raw, ok := command["document"]
		if !ok {
			return nil, fmt.Errorf("insertOne 操作需要 'document' 字段")
		}
		doc, ok := mongoCommandObject(raw)
		if !ok {
			return nil, fmt.Errorf("document 必须是对象格式")
		}
		result, err := coll.InsertOne(ctx, doc)
		if err != nil {
			return nil, fmt.Errorf("插入失败: %w", err)
		}
		return map[string]interface{}{
			"insertedId": result.InsertedID,
		}, nil

	case "insertMany":
		raw, ok := command["documents"]
		if !ok {
			return nil, fmt.Errorf("insertMany 操作需要 'documents' 字段")
		}
		items, ok := raw.([]interface{})
		if !ok {
			return nil, fmt.Errorf("documents 必须是数组格式")
		}
		docs := make([]interface{}, 0, len(items))
		for i, item := range items {
			doc, ok := mongoCommandObject(item)
			if !ok {
				// Reject the batch instead of silently inserting only part of it.
				return nil, fmt.Errorf("documents[%d] 必须是对象格式", i)
			}
			docs = append(docs, doc)
		}
		result, err := coll.InsertMany(ctx, docs)
		if err != nil {
			return nil, fmt.Errorf("批量插入失败: %w", err)
		}
		return map[string]interface{}{
			"insertedIds":   result.InsertedIDs,
			"insertedCount": len(result.InsertedIDs),
		}, nil

	case "updateOne", "updateMany":
		filter, err := mongoCommandFilter(command, op, true)
		if err != nil {
			return nil, err
		}
		update, err := mongoCommandUpdate(command, op)
		if err != nil {
			return nil, err
		}
		var result *mongo.UpdateResult
		if op == "updateOne" {
			if result, err = coll.UpdateOne(ctx, filter, update); err != nil {
				return nil, fmt.Errorf("更新失败: %w", err)
			}
		} else {
			if result, err = coll.UpdateMany(ctx, filter, update); err != nil {
				return nil, fmt.Errorf("批量更新失败: %w", err)
			}
		}
		return map[string]interface{}{
			"matchedCount":  result.MatchedCount,
			"modifiedCount": result.ModifiedCount,
		}, nil

	case "deleteOne", "deleteMany":
		filter, err := mongoCommandFilter(command, op, true)
		if err != nil {
			return nil, err
		}
		var result *mongo.DeleteResult
		if op == "deleteOne" {
			if result, err = coll.DeleteOne(ctx, filter); err != nil {
				return nil, fmt.Errorf("删除失败: %w", err)
			}
		} else {
			if result, err = coll.DeleteMany(ctx, filter); err != nil {
				return nil, fmt.Errorf("批量删除失败: %w", err)
			}
		}
		return map[string]interface{}{
			"deletedCount": result.DeletedCount,
		}, nil

	default:
		return nil, fmt.Errorf("不支持的操作: %s,支持的操作: find, count, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany", op)
	}
}

// mongoCommandObject converts a decoded command value into a bson.M. It
// accepts plain maps (as produced by encoding/json) as well as bson.M, which
// is a distinct type and would not match a plain-map type assertion.
func mongoCommandObject(v interface{}) (bson.M, bool) {
	switch m := v.(type) {
	case bson.M:
		return m, true
	case map[string]interface{}:
		return bson.M(m), true
	}
	return nil, false
}

// mongoCommandFilter extracts the "filter" field of command for operation op.
// With required == false a missing or null filter yields an empty filter.
// With required == true the filter must be present. In both modes a value
// that is not an object is an error.
func mongoCommandFilter(command map[string]interface{}, op string, required bool) (bson.M, error) {
	raw, present := command["filter"]
	if !present {
		if required {
			return nil, fmt.Errorf("%s 操作需要 'filter' 字段", op)
		}
		return bson.M{}, nil
	}
	if raw == nil && !required {
		return bson.M{}, nil
	}
	filter, ok := mongoCommandObject(raw)
	if !ok {
		return nil, fmt.Errorf("filter 必须是对象格式")
	}
	return filter, nil
}

// mongoCommandLimit reads the numeric "limit" field of command and returns
// fallback when the field is missing or not a supported number type.
func mongoCommandLimit(command map[string]interface{}, fallback int64) int64 {
	switch v := command["limit"].(type) {
	case float64:
		return int64(v)
	case int64:
		return v
	case int32:
		return int64(v)
	case int:
		return int64(v)
	}
	return fallback
}

// mongoCommandUpdate extracts the "update" field of command for operation op
// and turns it into the update document sent to the server: operator
// documents pass through, plain field maps are wrapped in "$set".
func mongoCommandUpdate(command map[string]interface{}, op string) (bson.M, error) {
	raw, ok := command["update"]
	if !ok {
		return nil, fmt.Errorf("%s 操作需要 'update' 字段", op)
	}
	update, ok := mongoCommandObject(raw)
	if !ok {
		return nil, fmt.Errorf("update 必须是对象格式")
	}

	operatorsOnly := len(update) > 0
	for key := range update {
		if len(key) == 0 || key[0] != '$' {
			operatorsOnly = false
			break
		}
	}
	if operatorsOnly {
		return update, nil
	}
	return bson.M{"$set": update}, nil
}
// PreviewCollectionIndexes 预览集合索引变更,只生成命令列表不执行
func (c *MongoClient) PreviewCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) {
coll := c.client.Database(database).Collection(collectionName)
var commands []string
// 获取当前索引
currentIndexes, err := coll.Indexes().List(ctx)
if err != nil {
return nil, fmt.Errorf("获取当前索引失败: %v", err)
}
defer currentIndexes.Close(ctx)
// 解析新的索引数据
var newIndexes []map[string]interface{}
if idxs, ok := structure["indexes"].([]interface{}); ok {
for _, idx := range idxs {
if idxMap, ok := idx.(map[string]interface{}); ok {
newIndexes = append(newIndexes, idxMap)
}
}
}
// 创建当前索引名映射
currentIndexMap := make(map[string]bool)
for currentIndexes.Next(ctx) {
var indexSpec bson.M
if err := currentIndexes.Decode(&indexSpec); err != nil {
continue
}
if name, ok := indexSpec["name"].(string); ok && name != "_id_" {
currentIndexMap[name] = true
}
}
// 创建新索引名映射
newIndexMap := make(map[string]bool)
for _, idx := range newIndexes {
if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" {
newIndexMap[name] = true
}
}
// 删除不存在的索引
for name := range currentIndexMap {
if !newIndexMap[name] {
cmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name)
commands = append(commands, cmd)
}
}
// 添加或更新索引
for _, idx := range newIndexes {
name, _ := idx["name"].(string)
if name == "" || name == "_id_" {
continue
}
// 构建索引键
keys := bson.D{}
if keysData, ok := idx["keys"].(map[string]interface{}); ok {
for k, v := range keysData {
var order int
if vFloat, ok := v.(float64); ok {
order = int(vFloat)
} else if vInt, ok := v.(int); ok {
order = vInt
} else {
order = 1 // 默认升序
}
keys = append(keys, bson.E{Key: k, Value: order})
}
} else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" {
// 兼容 MySQL 格式的索引数据
keys = append(keys, bson.E{Key: columnName, Value: 1})
}
if len(keys) == 0 {
continue
}
// 构建索引选项
indexOptions := options.Index()
indexOptions.SetName(name)
if unique, ok := idx["unique"].(bool); ok && unique {
indexOptions.SetUnique(true)
} else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 {
indexOptions.SetUnique(true)
}
// 如果索引已存在,先删除再创建
if currentIndexMap[name] {
dropCmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name)
commands = append(commands, dropCmd)
}
// 构建命令字符串MongoDB shell 格式)
keysStr := "{"
for i, key := range keys {
if i > 0 {
keysStr += ", "
}
keysStr += fmt.Sprintf("%s: %d", key.Key, key.Value)
}
keysStr += "}"
optionsStr := "{name: \"" + name + "\""
if indexOptions.Unique != nil && *indexOptions.Unique {
optionsStr += ", unique: true"
}
optionsStr += "}"
cmd := fmt.Sprintf("db.%s.createIndex(%s, %s)", collectionName, keysStr, optionsStr)
commands = append(commands, cmd)
}
return commands, nil
}
// UpdateCollectionIndexes 更新集合索引,返回执行的命令列表
func (c *MongoClient) UpdateCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) {
// 先预览生成命令列表
commands, err := c.PreviewCollectionIndexes(ctx, database, collectionName, structure)
if err != nil {
return nil, err
}
coll := c.client.Database(database).Collection(collectionName)
// 获取当前索引
currentIndexes, err := coll.Indexes().List(ctx)
if err != nil {
return commands, fmt.Errorf("获取当前索引失败: %v", err)
}
defer currentIndexes.Close(ctx)
// 解析新的索引数据
var newIndexes []map[string]interface{}
if idxs, ok := structure["indexes"].([]interface{}); ok {
for _, idx := range idxs {
if idxMap, ok := idx.(map[string]interface{}); ok {
newIndexes = append(newIndexes, idxMap)
}
}
}
// 创建当前索引名映射
currentIndexMap := make(map[string]bool)
for currentIndexes.Next(ctx) {
var indexSpec bson.M
if err := currentIndexes.Decode(&indexSpec); err != nil {
continue
}
if name, ok := indexSpec["name"].(string); ok && name != "_id_" {
currentIndexMap[name] = true
}
}
// 创建新索引名映射
newIndexMap := make(map[string]bool)
for _, idx := range newIndexes {
if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" {
newIndexMap[name] = true
}
}
// 删除不存在的索引
for name := range currentIndexMap {
if !newIndexMap[name] {
_, err := coll.Indexes().DropOne(ctx, name)
if err != nil {
return commands, fmt.Errorf("删除索引失败: %v, 索引名: %s", err, name)
}
}
}
// 添加或更新索引
for _, idx := range newIndexes {
name, _ := idx["name"].(string)
if name == "" || name == "_id_" {
continue
}
// 构建索引键
keys := bson.D{}
if keysData, ok := idx["keys"].(map[string]interface{}); ok {
for k, v := range keysData {
var order int
if vFloat, ok := v.(float64); ok {
order = int(vFloat)
} else if vInt, ok := v.(int); ok {
order = vInt
} else {
order = 1 // 默认升序
}
keys = append(keys, bson.E{Key: k, Value: order})
}
} else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" {
// 兼容 MySQL 格式的索引数据
keys = append(keys, bson.E{Key: columnName, Value: 1})
}
if len(keys) == 0 {
continue
}
// 构建索引选项
indexOptions := options.Index()
indexOptions.SetName(name)
if unique, ok := idx["unique"].(bool); ok && unique {
indexOptions.SetUnique(true)
} else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 {
indexOptions.SetUnique(true)
}
// 创建索引
indexModel := mongo.IndexModel{
Keys: keys,
Options: indexOptions,
}
// 如果索引已存在,先删除再创建
if currentIndexMap[name] {
_, err := coll.Indexes().DropOne(ctx, name)
if err != nil {
return commands, fmt.Errorf("删除旧索引失败: %v, 索引名: %s", err, name)
}
}
_, err := coll.Indexes().CreateOne(ctx, indexModel)
if err != nil {
return commands, fmt.Errorf("创建索引失败: %v, 索引名: %s", err, name)
}
}
return commands, nil
}