Private
Public Access
1
0

新增:连接管理、数据查询等功能

This commit is contained in:
2026-01-22 18:34:59 +08:00
parent 95d3a20292
commit 652f5e5d60
87 changed files with 15082 additions and 162 deletions

818
internal/dbclient/mongo.go Normal file
View File

@@ -0,0 +1,818 @@
package dbclient
import (
"context"
"fmt"
"net/url"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// MongoClient MongoDB 客户端
type MongoClient struct {
client *mongo.Client
database *mongo.Database
config *MongoConfig
}
// MongoConfig MongoDB 配置
type MongoConfig struct {
Host string
Port int
Username string
Password string
Database string
AuthSource string // 认证数据库,默认为 "admin"
AuthMechanism string // 认证机制,如 "SCRAM-SHA-1", "SCRAM-SHA-256" 等
}
// NewMongoClient 创建 MongoDB 客户端
func NewMongoClient(config *MongoConfig) (*MongoClient, error) {
// 确定认证数据库,默认为 admin
authSource := config.AuthSource
if authSource == "" {
authSource = "admin"
}
// 如果指定了认证机制,直接使用;否则尝试自动检测
authMechanisms := []string{}
if config.AuthMechanism != "" {
// 用户明确指定了认证机制,只使用该机制
authMechanisms = []string{config.AuthMechanism}
} else {
// 未指定时,先尝试 SCRAM-SHA-256更安全失败则尝试 SCRAM-SHA-1
authMechanisms = []string{"SCRAM-SHA-256", "SCRAM-SHA-1"}
}
var lastErr error
for _, authMechanism := range authMechanisms {
client, err := tryConnectMongo(config, authSource, authMechanism)
if err == nil {
return client, nil
}
lastErr = err
// 如果明确指定了认证机制,失败后不再尝试其他机制
if config.AuthMechanism != "" {
break
}
}
// 所有认证机制都失败
if lastErr != nil {
return nil, fmt.Errorf("MongoDB 连接测试失败: %v", lastErr)
}
return nil, fmt.Errorf("MongoDB 连接失败: 未知错误")
}
// tryConnectMongo 尝试使用指定的认证机制连接 MongoDB
func tryConnectMongo(config *MongoConfig, authSource, authMechanism string) (*MongoClient, error) {
// 构建连接 URI
var uri string
if config.Username != "" && config.Password != "" {
// 使用 url.UserPassword 正确转义用户名和密码中的特殊字符
// 这会正确处理 @、:、/ 等特殊字符
userInfo := url.UserPassword(config.Username, config.Password)
// 构建基础 URI
uri = fmt.Sprintf("mongodb://%s@%s:%d", userInfo.String(), config.Host, config.Port)
// 添加数据库和认证源参数
params := url.Values{}
params.Set("authSource", authSource)
// 添加认证机制参数
if authMechanism != "" {
params.Set("authMechanism", authMechanism)
}
// 如果有业务数据库,添加到路径中
if config.Database != "" {
uri = fmt.Sprintf("%s/%s?%s", uri, config.Database, params.Encode())
} else {
// MongoDB URI 要求查询参数前必须有 /,即使没有数据库名
uri = fmt.Sprintf("%s/?%s", uri, params.Encode())
}
} else if config.Database != "" {
// 没有认证信息时,数据库部分用于指定默认数据库
uri = fmt.Sprintf("mongodb://%s:%d/%s", config.Host, config.Port, config.Database)
} else {
uri = fmt.Sprintf("mongodb://%s:%d", config.Host, config.Port)
}
// 客户端选项
clientOptions := options.Client().
ApplyURI(uri).
SetConnectTimeout(5 * time.Second).
SetServerSelectionTimeout(5 * time.Second)
// 创建客户端
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
return nil, fmt.Errorf("连接 MongoDB 失败: %v", err)
}
// 测试连接
if err := client.Ping(ctx, nil); err != nil {
client.Disconnect(ctx)
return nil, fmt.Errorf("MongoDB 连接测试失败: %v", err)
}
var database *mongo.Database
if config.Database != "" {
database = client.Database(config.Database)
}
return &MongoClient{
client: client,
database: database,
config: config,
}, nil
}
// TestMongoConnection 测试连接
func TestMongoConnection(host string, port int, username, password, database string) error {
return TestMongoConnectionWithAuthSource(host, port, username, password, database, "")
}
// TestMongoConnectionWithAuthSource 测试连接(支持指定认证数据库)
func TestMongoConnectionWithAuthSource(host string, port int, username, password, database, authSource string) error {
return TestMongoConnectionWithOptions(host, port, username, password, database, authSource, "")
}
// TestMongoConnectionWithOptions 测试连接(支持指定认证数据库和认证机制)
func TestMongoConnectionWithOptions(host string, port int, username, password, database, authSource, authMechanism string) error {
config := &MongoConfig{
Host: host,
Port: port,
Username: username,
Password: password,
Database: database,
AuthSource: authSource,
AuthMechanism: authMechanism,
}
client, err := NewMongoClient(config)
if err != nil {
return err
}
defer client.Close()
return nil
}
// Close 关闭连接
func (c *MongoClient) Close() error {
if c.client != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return c.client.Disconnect(ctx)
}
return nil
}
// ListDatabases 获取数据库列表
func (c *MongoClient) ListDatabases(ctx context.Context) ([]string, error) {
databases, err := c.client.ListDatabaseNames(ctx, bson.M{})
return databases, err
}
// ListCollections 获取集合列表
func (c *MongoClient) ListCollections(ctx context.Context, database string) ([]string, error) {
db := c.client.Database(database)
collections, err := db.ListCollectionNames(ctx, bson.M{})
return collections, err
}
// GetCollectionStructure 获取集合结构
func (c *MongoClient) GetCollectionStructure(ctx context.Context, database, collectionName string) (map[string]interface{}, error) {
coll := c.client.Database(database).Collection(collectionName)
result := map[string]interface{}{
"database": database,
"collection": collectionName,
"sampleDocs": []map[string]interface{}{},
"fieldStats": map[string]int{},
"indexes": []map[string]interface{}{},
"documentCount": int64(0),
}
// 获取文档示例(最多 5 个)
opts := options.Find().SetLimit(5)
cursor, err := coll.Find(ctx, bson.M{}, opts)
if err != nil {
return nil, fmt.Errorf("获取文档示例失败: %v", err)
}
defer cursor.Close(ctx)
var docs []bson.M
if err = cursor.All(ctx, &docs); err != nil {
return nil, fmt.Errorf("解析文档失败: %v", err)
}
// 转换为 map
sampleDocs := make([]map[string]interface{}, 0, len(docs))
for _, doc := range docs {
docMap := make(map[string]interface{})
for k, v := range doc {
docMap[k] = v
}
sampleDocs = append(sampleDocs, docMap)
}
result["sampleDocs"] = sampleDocs
// 字段统计:使用 $sample 聚合管道随机采样10个文档进行统计
// 这样可以获得更准确的字段分布,同时保持良好性能
// 使用异步方式执行,避免阻塞主流程
sampleSize := 10
pipeline := []bson.M{
{"$sample": bson.M{"size": sampleSize}},
{"$project": bson.M{"keys": bson.M{"$objectToArray": "$$ROOT"}}},
{"$unwind": "$keys"},
{"$group": bson.M{
"_id": "$keys.k",
"count": bson.M{"$sum": 1},
}},
{"$sort": bson.M{"count": -1}}, // 按出现次数降序排序
}
sampleCursor, err := coll.Aggregate(ctx, pipeline)
if err != nil {
// 如果采样失败,回退到基于文档示例的统计
fieldCount := make(map[string]int)
for _, doc := range docs {
for key := range doc {
fieldCount[key]++
}
}
result["fieldStats"] = fieldCount
result["fieldStatsSampleSize"] = len(docs) // 记录实际采样数量
result["fieldStatsMethod"] = "sample-docs" // 标记统计方式
} else {
defer sampleCursor.Close(ctx)
fieldCount := make(map[string]int)
for sampleCursor.Next(ctx) {
var statResult bson.M
if err := sampleCursor.Decode(&statResult); err != nil {
continue
}
fieldName, ok := statResult["_id"].(string)
if !ok {
continue
}
var count int
switch v := statResult["count"].(type) {
case int32:
count = int(v)
case int64:
count = int(v)
case int:
count = v
case float64:
count = int(v)
default:
continue
}
fieldCount[fieldName] = count
}
result["fieldStats"] = fieldCount
result["fieldStatsSampleSize"] = sampleSize // 记录采样数量
result["fieldStatsMethod"] = "sample-aggregate" // 标记统计方式
}
// 文档总数(使用估算值,性能更好)
// 对于大数据集estimatedDocumentCount 比 CountDocuments 快得多
// 如果需要精确值,可以使用 CountDocuments但性能较差
count, err := coll.EstimatedDocumentCount(ctx)
if err != nil {
// 如果估算失败,尝试精确计数(可能较慢)
count, err = coll.CountDocuments(ctx, bson.M{})
if err != nil {
return nil, fmt.Errorf("获取文档数量失败: %v", err)
}
}
result["documentCount"] = count
// 索引信息
indexCursor, err := coll.Indexes().List(ctx)
if err != nil {
// 索引查询失败不影响主流程
result["indexes"] = []map[string]interface{}{}
} else {
var indexes []map[string]interface{}
for indexCursor.Next(ctx) {
var indexSpec bson.M
if err := indexCursor.Decode(&indexSpec); err != nil {
continue
}
indexes = append(indexes, map[string]interface{}{
"name": indexSpec["name"],
"unique": indexSpec["unique"],
"keys": indexSpec["key"],
})
}
indexCursor.Close(ctx)
result["indexes"] = indexes
}
return result, nil
}
// ExecuteQuery 执行查询
func (c *MongoClient) ExecuteQuery(ctx context.Context, database, collection string, filter bson.M, limit int64) ([]map[string]interface{}, error) {
db := c.client.Database(database)
coll := db.Collection(collection)
opts := options.Find().SetLimit(limit)
cursor, err := coll.Find(ctx, filter, opts)
if err != nil {
return nil, fmt.Errorf("查询失败: %v", err)
}
defer cursor.Close(ctx)
var results []map[string]interface{}
if err := cursor.All(ctx, &results); err != nil {
return nil, fmt.Errorf("读取结果失败: %v", err)
}
return results, nil
}
// CountDocuments 获取文档数量
func (c *MongoClient) CountDocuments(ctx context.Context, database, collection string, filter bson.M) (int64, error) {
db := c.client.Database(database)
coll := db.Collection(collection)
return coll.CountDocuments(ctx, filter)
}
// ExecuteCommand 执行 MongoDB 命令
// command 可以是 JSON 格式的字符串,格式:{"op": "find", "database": "test", "collection": "users", "filter": {}, "limit": 100}
// 支持的操作find, count, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany
func (c *MongoClient) ExecuteCommand(ctx context.Context, database string, command map[string]interface{}) (interface{}, error) {
op, ok := command["op"].(string)
if !ok {
return nil, fmt.Errorf("命令中缺少 'op' 字段或格式错误")
}
collectionName, ok := command["collection"].(string)
if !ok {
return nil, fmt.Errorf("命令中缺少 'collection' 字段或格式错误")
}
// 如果没有指定数据库,使用配置中的默认数据库
if database == "" {
if c.config != nil && c.config.Database != "" {
database = c.config.Database
} else {
return nil, fmt.Errorf("需要指定数据库名称")
}
}
db := c.client.Database(database)
coll := db.Collection(collectionName)
switch op {
case "find":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
}
limit := int64(100)
if l, ok := command["limit"]; ok {
if limitVal, ok := l.(float64); ok {
limit = int64(limitVal)
} else if limitVal, ok := l.(int64); ok {
limit = limitVal
}
}
opts := options.Find().SetLimit(limit)
cursor, err := coll.Find(ctx, filter, opts)
if err != nil {
return nil, fmt.Errorf("查询失败: %v", err)
}
defer cursor.Close(ctx)
var results []map[string]interface{}
if err := cursor.All(ctx, &results); err != nil {
return nil, fmt.Errorf("读取结果失败: %v", err)
}
return results, nil
case "count":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
}
count, err := coll.CountDocuments(ctx, filter)
if err != nil {
return nil, fmt.Errorf("统计失败: %v", err)
}
return count, nil
case "insertOne":
document, ok := command["document"]
if !ok {
return nil, fmt.Errorf("insertOne 操作需要 'document' 字段")
}
doc := bson.M{}
if docMap, ok := document.(map[string]interface{}); ok {
doc = bson.M(docMap)
} else {
return nil, fmt.Errorf("document 必须是对象格式")
}
result, err := coll.InsertOne(ctx, doc)
if err != nil {
return nil, fmt.Errorf("插入失败: %v", err)
}
return map[string]interface{}{
"insertedId": result.InsertedID,
}, nil
case "insertMany":
documents, ok := command["documents"]
if !ok {
return nil, fmt.Errorf("insertMany 操作需要 'documents' 字段")
}
docs := []interface{}{}
if docsSlice, ok := documents.([]interface{}); ok {
for _, d := range docsSlice {
if docMap, ok := d.(map[string]interface{}); ok {
docs = append(docs, bson.M(docMap))
}
}
} else {
return nil, fmt.Errorf("documents 必须是数组格式")
}
result, err := coll.InsertMany(ctx, docs)
if err != nil {
return nil, fmt.Errorf("批量插入失败: %v", err)
}
return map[string]interface{}{
"insertedIds": result.InsertedIDs,
"insertedCount": len(result.InsertedIDs),
}, nil
case "updateOne":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
} else {
return nil, fmt.Errorf("updateOne 操作需要 'filter' 字段")
}
update, ok := command["update"]
if !ok {
return nil, fmt.Errorf("updateOne 操作需要 'update' 字段")
}
updateDoc := bson.M{}
if updateMap, ok := update.(map[string]interface{}); ok {
updateDoc = bson.M(updateMap)
} else {
return nil, fmt.Errorf("update 必须是对象格式")
}
result, err := coll.UpdateOne(ctx, filter, bson.M{"$set": updateDoc})
if err != nil {
return nil, fmt.Errorf("更新失败: %v", err)
}
return map[string]interface{}{
"matchedCount": result.MatchedCount,
"modifiedCount": result.ModifiedCount,
}, nil
case "updateMany":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
} else {
return nil, fmt.Errorf("updateMany 操作需要 'filter' 字段")
}
update, ok := command["update"]
if !ok {
return nil, fmt.Errorf("updateMany 操作需要 'update' 字段")
}
updateDoc := bson.M{}
if updateMap, ok := update.(map[string]interface{}); ok {
updateDoc = bson.M(updateMap)
} else {
return nil, fmt.Errorf("update 必须是对象格式")
}
result, err := coll.UpdateMany(ctx, filter, bson.M{"$set": updateDoc})
if err != nil {
return nil, fmt.Errorf("批量更新失败: %v", err)
}
return map[string]interface{}{
"matchedCount": result.MatchedCount,
"modifiedCount": result.ModifiedCount,
}, nil
case "deleteOne":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
} else {
return nil, fmt.Errorf("deleteOne 操作需要 'filter' 字段")
}
result, err := coll.DeleteOne(ctx, filter)
if err != nil {
return nil, fmt.Errorf("删除失败: %v", err)
}
return map[string]interface{}{
"deletedCount": result.DeletedCount,
}, nil
case "deleteMany":
filter := bson.M{}
if f, ok := command["filter"]; ok {
if filterMap, ok := f.(map[string]interface{}); ok {
filter = bson.M(filterMap)
}
} else {
return nil, fmt.Errorf("deleteMany 操作需要 'filter' 字段")
}
result, err := coll.DeleteMany(ctx, filter)
if err != nil {
return nil, fmt.Errorf("批量删除失败: %v", err)
}
return map[string]interface{}{
"deletedCount": result.DeletedCount,
}, nil
default:
return nil, fmt.Errorf("不支持的操作: %s支持的操作: find, count, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany", op)
}
}
// PreviewCollectionIndexes 预览集合索引变更,只生成命令列表不执行
func (c *MongoClient) PreviewCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) {
coll := c.client.Database(database).Collection(collectionName)
var commands []string
// 获取当前索引
currentIndexes, err := coll.Indexes().List(ctx)
if err != nil {
return nil, fmt.Errorf("获取当前索引失败: %v", err)
}
defer currentIndexes.Close(ctx)
// 解析新的索引数据
var newIndexes []map[string]interface{}
if idxs, ok := structure["indexes"].([]interface{}); ok {
for _, idx := range idxs {
if idxMap, ok := idx.(map[string]interface{}); ok {
newIndexes = append(newIndexes, idxMap)
}
}
}
// 创建当前索引名映射
currentIndexMap := make(map[string]bool)
for currentIndexes.Next(ctx) {
var indexSpec bson.M
if err := currentIndexes.Decode(&indexSpec); err != nil {
continue
}
if name, ok := indexSpec["name"].(string); ok && name != "_id_" {
currentIndexMap[name] = true
}
}
// 创建新索引名映射
newIndexMap := make(map[string]bool)
for _, idx := range newIndexes {
if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" {
newIndexMap[name] = true
}
}
// 删除不存在的索引
for name := range currentIndexMap {
if !newIndexMap[name] {
cmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name)
commands = append(commands, cmd)
}
}
// 添加或更新索引
for _, idx := range newIndexes {
name, _ := idx["name"].(string)
if name == "" || name == "_id_" {
continue
}
// 构建索引键
keys := bson.D{}
if keysData, ok := idx["keys"].(map[string]interface{}); ok {
for k, v := range keysData {
var order int
if vFloat, ok := v.(float64); ok {
order = int(vFloat)
} else if vInt, ok := v.(int); ok {
order = vInt
} else {
order = 1 // 默认升序
}
keys = append(keys, bson.E{Key: k, Value: order})
}
} else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" {
// 兼容 MySQL 格式的索引数据
keys = append(keys, bson.E{Key: columnName, Value: 1})
}
if len(keys) == 0 {
continue
}
// 构建索引选项
indexOptions := options.Index()
indexOptions.SetName(name)
if unique, ok := idx["unique"].(bool); ok && unique {
indexOptions.SetUnique(true)
} else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 {
indexOptions.SetUnique(true)
}
// 如果索引已存在,先删除再创建
if currentIndexMap[name] {
dropCmd := fmt.Sprintf("db.%s.dropIndex(\"%s\")", collectionName, name)
commands = append(commands, dropCmd)
}
// 构建命令字符串MongoDB shell 格式)
keysStr := "{"
for i, key := range keys {
if i > 0 {
keysStr += ", "
}
keysStr += fmt.Sprintf("%s: %d", key.Key, key.Value)
}
keysStr += "}"
optionsStr := "{name: \"" + name + "\""
if indexOptions.Unique != nil && *indexOptions.Unique {
optionsStr += ", unique: true"
}
optionsStr += "}"
cmd := fmt.Sprintf("db.%s.createIndex(%s, %s)", collectionName, keysStr, optionsStr)
commands = append(commands, cmd)
}
return commands, nil
}
// UpdateCollectionIndexes 更新集合索引,返回执行的命令列表
func (c *MongoClient) UpdateCollectionIndexes(ctx context.Context, database, collectionName string, structure map[string]interface{}) ([]string, error) {
// 先预览生成命令列表
commands, err := c.PreviewCollectionIndexes(ctx, database, collectionName, structure)
if err != nil {
return nil, err
}
coll := c.client.Database(database).Collection(collectionName)
// 获取当前索引
currentIndexes, err := coll.Indexes().List(ctx)
if err != nil {
return commands, fmt.Errorf("获取当前索引失败: %v", err)
}
defer currentIndexes.Close(ctx)
// 解析新的索引数据
var newIndexes []map[string]interface{}
if idxs, ok := structure["indexes"].([]interface{}); ok {
for _, idx := range idxs {
if idxMap, ok := idx.(map[string]interface{}); ok {
newIndexes = append(newIndexes, idxMap)
}
}
}
// 创建当前索引名映射
currentIndexMap := make(map[string]bool)
for currentIndexes.Next(ctx) {
var indexSpec bson.M
if err := currentIndexes.Decode(&indexSpec); err != nil {
continue
}
if name, ok := indexSpec["name"].(string); ok && name != "_id_" {
currentIndexMap[name] = true
}
}
// 创建新索引名映射
newIndexMap := make(map[string]bool)
for _, idx := range newIndexes {
if name, ok := idx["name"].(string); ok && name != "" && name != "_id_" {
newIndexMap[name] = true
}
}
// 删除不存在的索引
for name := range currentIndexMap {
if !newIndexMap[name] {
_, err := coll.Indexes().DropOne(ctx, name)
if err != nil {
return commands, fmt.Errorf("删除索引失败: %v, 索引名: %s", err, name)
}
}
}
// 添加或更新索引
for _, idx := range newIndexes {
name, _ := idx["name"].(string)
if name == "" || name == "_id_" {
continue
}
// 构建索引键
keys := bson.D{}
if keysData, ok := idx["keys"].(map[string]interface{}); ok {
for k, v := range keysData {
var order int
if vFloat, ok := v.(float64); ok {
order = int(vFloat)
} else if vInt, ok := v.(int); ok {
order = vInt
} else {
order = 1 // 默认升序
}
keys = append(keys, bson.E{Key: k, Value: order})
}
} else if columnName, ok := idx["Column_name"].(string); ok && columnName != "" {
// 兼容 MySQL 格式的索引数据
keys = append(keys, bson.E{Key: columnName, Value: 1})
}
if len(keys) == 0 {
continue
}
// 构建索引选项
indexOptions := options.Index()
indexOptions.SetName(name)
if unique, ok := idx["unique"].(bool); ok && unique {
indexOptions.SetUnique(true)
} else if nonUnique, ok := idx["Non_unique"].(float64); ok && nonUnique == 0 {
indexOptions.SetUnique(true)
}
// 创建索引
indexModel := mongo.IndexModel{
Keys: keys,
Options: indexOptions,
}
// 如果索引已存在,先删除再创建
if currentIndexMap[name] {
_, err := coll.Indexes().DropOne(ctx, name)
if err != nil {
return commands, fmt.Errorf("删除旧索引失败: %v, 索引名: %s", err, name)
}
}
_, err := coll.Indexes().CreateOne(ctx, indexModel)
if err != nil {
return commands, fmt.Errorf("创建索引失败: %v, 索引名: %s", err, name)
}
}
return commands, nil
}