新增: MongoDB/Redis 代理工具
This commit is contained in:
2575
mongo-proxy/Cargo.lock
generated
Normal file
2575
mongo-proxy/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
23
mongo-proxy/Cargo.toml
Normal file
23
mongo-proxy/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "mongo-proxy"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "MongoDB HTTP proxy with connection pooling"
|
||||
|
||||
[dependencies]
|
||||
mongodb = "3"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
axum = "0.7"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
toml = "0.8"
|
||||
futures-util = "0.3"
|
||||
anyhow = "1"
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
ureq = "2"
|
||||
time = { version = "0.3", features = ["formatting", "macros"] }
|
||||
|
||||
[profile.release]
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
strip = true
|
||||
23
mongo-proxy/mongo-proxy.toml
Normal file
23
mongo-proxy/mongo-proxy.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[server]
|
||||
port = 3309
|
||||
host = "127.0.0.1"
|
||||
|
||||
[pool]
|
||||
idle_timeout_secs = 300
|
||||
check_interval_secs = 60
|
||||
|
||||
[[connections]]
|
||||
name = "suke_dev"
|
||||
host = "39.99.243.191"
|
||||
port = 27017
|
||||
database = "suke"
|
||||
username = "u_suke"
|
||||
password = "123456"
|
||||
|
||||
[[connections]]
|
||||
name = "suke_pro"
|
||||
host = "39.100.50.215"
|
||||
port = 27017
|
||||
database = "suke_pro"
|
||||
username = "u_suke"
|
||||
password = "231122"
|
||||
278
mongo-proxy/src/cli.rs
Normal file
278
mongo-proxy/src/cli.rs
Normal file
@@ -0,0 +1,278 @@
|
||||
use anyhow::{Result, bail};
|
||||
use serde::Deserialize;
|
||||
use std::time::Instant;
|
||||
|
||||
const DEFAULT_SERVER: &str = "http://127.0.0.1:3309";
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ErrorResponse {
|
||||
error: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ConnectionsResponse {
|
||||
connections: Vec<ConnectionInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ConnectionInfo {
|
||||
name: String,
|
||||
database: String,
|
||||
host: String,
|
||||
status: String,
|
||||
}
|
||||
|
||||
pub struct Cli {
|
||||
server: String,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
pub fn new(server: Option<String>) -> Self {
|
||||
Self { server: server.unwrap_or_else(|| DEFAULT_SERVER.to_string()) }
|
||||
}
|
||||
|
||||
pub fn check_server(&self) -> Result<bool> {
|
||||
match ureq::get(&format!("{}/health", self.server)).call() {
|
||||
Ok(_) => Ok(true),
|
||||
Err(_) => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find(&self, conn: &str, collection: &str, filter: &str, format: &str,
|
||||
projection: Option<&str>, limit: Option<i64>, skip: Option<u64>) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection,
|
||||
"filter": serde_json::from_str::<serde_json::Value>(filter).unwrap_or(serde_json::json!({}))
|
||||
});
|
||||
if let Some(p) = projection {
|
||||
body["projection"] = serde_json::from_str(p).unwrap_or(serde_json::Value::Null);
|
||||
}
|
||||
if let Some(l) = limit {
|
||||
body["limit"] = serde_json::json!(l);
|
||||
}
|
||||
if let Some(s) = skip {
|
||||
body["skip"] = serde_json::json!(s);
|
||||
}
|
||||
|
||||
let resp = post_json(&format!("{}/find", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct FindResp {
|
||||
documents: Vec<serde_json::Value>,
|
||||
count: usize,
|
||||
#[serde(rename = "durationMs")]
|
||||
duration_ms: u64,
|
||||
}
|
||||
let result: FindResp = serde_json::from_str(&resp)?;
|
||||
|
||||
match format {
|
||||
"json" => println!("{}", serde_json::to_string_pretty(&result.documents).unwrap_or_default()),
|
||||
_ => {
|
||||
for doc in &result.documents {
|
||||
println!("{}", serde_json::to_string_pretty(doc).unwrap_or_default());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("\n{} documents ({}ms db, {}ms total)", result.count, result.duration_ms, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert(&self, conn: &str, collection: &str, documents: &str) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let docs: Vec<serde_json::Value> = serde_json::from_str(documents)
|
||||
.map_err(|e| anyhow::anyhow!("Invalid JSON documents: {}", e))?;
|
||||
|
||||
let body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection,
|
||||
"documents": docs
|
||||
});
|
||||
|
||||
let resp = post_json(&format!("{}/insert", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct InsertResp {
|
||||
#[serde(rename = "insertedCount")]
|
||||
inserted_count: u64,
|
||||
}
|
||||
let result: InsertResp = serde_json::from_str(&resp)?;
|
||||
println!("Inserted {} document(s) ({}ms total)", result.inserted_count, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update(&self, conn: &str, collection: &str, filter: &str, update: &str, upsert: bool, multi: bool) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection,
|
||||
"filter": serde_json::from_str::<serde_json::Value>(filter)?,
|
||||
"update": serde_json::from_str::<serde_json::Value>(update)?
|
||||
});
|
||||
if upsert { body["upsert"] = serde_json::json!(true); }
|
||||
if multi { body["multi"] = serde_json::json!(true); }
|
||||
|
||||
let resp = post_json(&format!("{}/update", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct UpdateResp {
|
||||
#[serde(rename = "matchedCount")]
|
||||
matched_count: u64,
|
||||
#[serde(rename = "modifiedCount")]
|
||||
modified_count: u64,
|
||||
}
|
||||
let result: UpdateResp = serde_json::from_str(&resp)?;
|
||||
println!("Matched: {}, Modified: {} ({}ms total)", result.matched_count, result.modified_count, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete(&self, conn: &str, collection: &str, filter: &str, multi: bool) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection,
|
||||
"filter": serde_json::from_str::<serde_json::Value>(filter)?
|
||||
});
|
||||
if multi { body["multi"] = serde_json::json!(true); }
|
||||
|
||||
let resp = post_json(&format!("{}/delete", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct DeleteResp {
|
||||
#[serde(rename = "deletedCount")]
|
||||
deleted_count: u64,
|
||||
}
|
||||
let result: DeleteResp = serde_json::from_str(&resp)?;
|
||||
println!("Deleted {} document(s) ({}ms total)", result.deleted_count, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn aggregate(&self, conn: &str, collection: &str, pipeline: &str) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection,
|
||||
"pipeline": serde_json::from_str::<serde_json::Value>(pipeline)?
|
||||
});
|
||||
|
||||
let resp = post_json(&format!("{}/aggregate", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct AggResp {
|
||||
documents: Vec<serde_json::Value>,
|
||||
#[serde(rename = "durationMs")]
|
||||
duration_ms: u64,
|
||||
}
|
||||
let result: AggResp = serde_json::from_str(&resp)?;
|
||||
|
||||
for doc in &result.documents {
|
||||
println!("{}", serde_json::to_string_pretty(doc).unwrap_or_default());
|
||||
}
|
||||
|
||||
println!("\n{} result(s) ({}ms db, {}ms total)", result.documents.len(), result.duration_ms, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn count(&self, conn: &str, collection: &str, filter: Option<&str>) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"conn": conn,
|
||||
"collection": collection
|
||||
});
|
||||
if let Some(f) = filter {
|
||||
body["filter"] = serde_json::from_str::<serde_json::Value>(f).unwrap_or(serde_json::json!({}));
|
||||
}
|
||||
|
||||
let resp = post_json(&format!("{}/count", self.server), &body)?;
|
||||
let total_ms = start.elapsed().as_millis();
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
|
||||
eprintln!("Error: {}", err.error);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CountResp {
|
||||
count: u64,
|
||||
#[serde(rename = "durationMs")]
|
||||
duration_ms: u64,
|
||||
}
|
||||
let result: CountResp = serde_json::from_str(&resp)?;
|
||||
println!("{} documents ({}ms db, {}ms total)", result.count, result.duration_ms, total_ms);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn list_connections(&self) -> Result<()> {
|
||||
let response = ureq::get(&format!("{}/connections", self.server)).call()?;
|
||||
let body = response.into_string()?;
|
||||
|
||||
if let Ok(err) = serde_json::from_str::<ErrorResponse>(&body) {
|
||||
bail!("{}", err.error);
|
||||
}
|
||||
|
||||
let result: ConnectionsResponse = serde_json::from_str(&body)?;
|
||||
|
||||
println!("Connections:");
|
||||
println!("{:<15} {:<20} {:<40} {:<10}", "Name", "Database", "Host", "Status");
|
||||
println!("{}", "-".repeat(85));
|
||||
|
||||
for conn in result.connections {
|
||||
println!("{:<15} {:<20} {:<40} {:<10}",
|
||||
conn.name, conn.database, conn.host, conn.status);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn post_json(url: &str, body: &serde_json::Value) -> Result<String> {
|
||||
let data = serde_json::to_string(body)?;
|
||||
|
||||
let response = ureq::post(url)
|
||||
.set("Content-Type", "application/json")
|
||||
.send_string(&data);
|
||||
|
||||
match response {
|
||||
Ok(r) => Ok(r.into_string()?),
|
||||
Err(ureq::Error::Status(_, resp)) => Ok(resp.into_string()?),
|
||||
Err(e) => bail!("HTTP error: {}", e),
|
||||
}
|
||||
}
|
||||
139
mongo-proxy/src/config.rs
Normal file
139
mongo-proxy/src/config.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use anyhow::{Result, bail};
|
||||
use serde::Deserialize;
|
||||
use std::fs;
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct ServerConfig {
|
||||
pub port: u16,
|
||||
#[serde(default = "default_host")]
|
||||
pub host: String,
|
||||
}
|
||||
|
||||
fn default_host() -> String {
|
||||
"127.0.0.1".to_string()
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct PoolConfig {
|
||||
#[serde(default = "default_idle_timeout")]
|
||||
pub idle_timeout_secs: u64,
|
||||
#[serde(default = "default_check_interval")]
|
||||
pub check_interval_secs: u64,
|
||||
}
|
||||
|
||||
fn default_idle_timeout() -> u64 { 300 }
|
||||
fn default_check_interval() -> u64 { 60 }
|
||||
|
||||
impl Default for PoolConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
idle_timeout_secs: default_idle_timeout(),
|
||||
check_interval_secs: default_check_interval(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct ConnectionConfig {
|
||||
pub name: String,
|
||||
pub host: String,
|
||||
#[serde(default = "default_port")]
|
||||
pub port: u16,
|
||||
pub database: String,
|
||||
#[serde(default)]
|
||||
pub username: Option<String>,
|
||||
#[serde(default)]
|
||||
pub password: Option<String>,
|
||||
#[serde(default)]
|
||||
pub auth_source: Option<String>,
|
||||
}
|
||||
|
||||
fn default_port() -> u16 { 27017 }
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
#[serde(default = "default_server")]
|
||||
pub server: ServerConfig,
|
||||
#[serde(default)]
|
||||
pub pool: PoolConfig,
|
||||
pub connections: Vec<ConnectionConfig>,
|
||||
}
|
||||
|
||||
fn default_server() -> ServerConfig {
|
||||
ServerConfig {
|
||||
port: 3309,
|
||||
host: "127.0.0.1".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn from_file(path: &str) -> Result<Self> {
|
||||
let content = fs::read_to_string(path)?;
|
||||
let config: Config = toml::from_str(&content)?;
|
||||
config.validate()?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn load() -> Result<Self> {
|
||||
let paths = [
|
||||
"mongo-proxy.toml",
|
||||
"./config/mongo-proxy.toml",
|
||||
&format!("{}/.mongo-proxy.toml", std::env::var("HOME").unwrap_or_default()),
|
||||
];
|
||||
|
||||
for path in &paths {
|
||||
if std::path::Path::new(path).exists() {
|
||||
return Self::from_file(path);
|
||||
}
|
||||
}
|
||||
|
||||
bail!("Config file not found. Create mongo-proxy.toml in current directory or use --config flag")
|
||||
}
|
||||
|
||||
fn validate(&self) -> Result<()> {
|
||||
let mut names = std::collections::HashSet::new();
|
||||
for conn in &self.connections {
|
||||
if conn.name.is_empty() {
|
||||
bail!("Connection name cannot be empty");
|
||||
}
|
||||
if names.contains(&conn.name) {
|
||||
bail!("Duplicate connection name: {}", conn.name);
|
||||
}
|
||||
names.insert(conn.name.clone());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectionConfig {
|
||||
pub fn build_uri(&self) -> String {
|
||||
let cred = match (&self.username, &self.password) {
|
||||
(Some(user), Some(pass)) => format!("{}:{}@", user, urlencoding(pass)),
|
||||
_ => String::new(),
|
||||
};
|
||||
let auth = match &self.auth_source {
|
||||
Some(src) => format!("?authSource={}", src),
|
||||
None => String::new(),
|
||||
};
|
||||
format!(
|
||||
"mongodb://{}{}:{}/{}{}",
|
||||
cred, self.host, self.port, self.database, auth
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn urlencoding(s: &str) -> String {
|
||||
let mut result = String::new();
|
||||
for c in s.chars() {
|
||||
match c {
|
||||
'@' => result.push_str("%40"),
|
||||
':' => result.push_str("%3A"),
|
||||
'/' => result.push_str("%2F"),
|
||||
'?' => result.push_str("%3F"),
|
||||
'#' => result.push_str("%23"),
|
||||
_ => result.push(c),
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
144
mongo-proxy/src/db.rs
Normal file
144
mongo-proxy/src/db.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use anyhow::{Result, bail};
|
||||
use mongodb::{Client, options::ClientOptions};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::config::{ConnectionConfig, PoolConfig};
|
||||
|
||||
struct ClientState {
|
||||
client: Client,
|
||||
last_used: Instant,
|
||||
}
|
||||
|
||||
pub struct ConnectionManager {
|
||||
clients: Mutex<HashMap<String, ClientState>>,
|
||||
configs: Mutex<HashMap<String, ConnectionConfig>>,
|
||||
pool_config: PoolConfig,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
pub fn new(configs: &[ConnectionConfig], pool_config: PoolConfig) -> Result<Self> {
|
||||
let mut config_map = HashMap::new();
|
||||
|
||||
for cfg in configs {
|
||||
config_map.insert(cfg.name.clone(), cfg.clone());
|
||||
println!(" Registered: {} ({}/{})", cfg.name, cfg.host, cfg.database);
|
||||
}
|
||||
|
||||
println!("\n{} connection(s) configured (lazy init)", config_map.len());
|
||||
println!("Pool config: idle_timeout={}s", pool_config.idle_timeout_secs);
|
||||
|
||||
Ok(Self {
|
||||
clients: Mutex::new(HashMap::new()),
|
||||
configs: Mutex::new(config_map),
|
||||
pool_config,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_client(&self, name: &str) -> Result<Client> {
|
||||
// 1. 已有客户端
|
||||
{
|
||||
let mut clients = self.clients.lock().unwrap();
|
||||
if let Some(state) = clients.get_mut(name) {
|
||||
state.last_used = Instant::now();
|
||||
return Ok(state.client.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 创建新客户端
|
||||
let cfg = self.configs.lock().unwrap().get(name)
|
||||
.ok_or_else(|| anyhow::anyhow!("Connection '{}' not found", name))?
|
||||
.clone();
|
||||
|
||||
println!("[LazyInit] Creating MongoDB client for: {}", name);
|
||||
let client = self.create_client(&cfg).await?;
|
||||
let state = ClientState {
|
||||
client: client.clone(),
|
||||
last_used: Instant::now(),
|
||||
};
|
||||
|
||||
self.clients.lock().unwrap().insert(name.to_string(), state);
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn create_client(&self, cfg: &ConnectionConfig) -> Result<Client> {
|
||||
let uri = cfg.build_uri();
|
||||
let mut opts = ClientOptions::parse(&uri).await?;
|
||||
|
||||
// 连接超时
|
||||
opts.connect_timeout = Some(Duration::from_secs(5));
|
||||
opts.server_selection_timeout = Some(Duration::from_secs(5));
|
||||
|
||||
let client = Client::with_options(opts)?;
|
||||
|
||||
// 测试连接
|
||||
client.database(&cfg.database).run_command(mongodb::bson::doc! {"ping": 1}).await?;
|
||||
|
||||
println!("[LazyInit] ✓ Connected: {}", cfg.name);
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub fn list_connections(&self) -> Vec<ConnectionInfo> {
|
||||
let clients = self.clients.lock().unwrap();
|
||||
let configs = self.configs.lock().unwrap();
|
||||
|
||||
configs.iter().map(|(name, cfg)| {
|
||||
let status = if clients.contains_key(name) { "connected" } else { "pending" };
|
||||
ConnectionInfo {
|
||||
name: name.clone(),
|
||||
database: cfg.database.clone(),
|
||||
host: cfg.host.clone(),
|
||||
status: status.to_string(),
|
||||
}
|
||||
}).collect()
|
||||
}
|
||||
|
||||
pub fn cleanup_idle(&self) {
|
||||
let mut clients = self.clients.lock().unwrap();
|
||||
let now = Instant::now();
|
||||
let idle_timeout = Duration::from_secs(self.pool_config.idle_timeout_secs);
|
||||
|
||||
clients.retain(|name, state| {
|
||||
let elapsed = now.duration_since(state.last_used);
|
||||
if elapsed > idle_timeout {
|
||||
println!("[Cleanup] Removing idle client: {} (idle {}s)", name, elapsed.as_secs());
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn add_connection(&self, cfg: ConnectionConfig) -> Result<()> {
|
||||
let name = cfg.name.clone();
|
||||
|
||||
// 先检查是否已存在(configs 锁短暂持有)
|
||||
{
|
||||
let configs = self.configs.lock().unwrap();
|
||||
if configs.contains_key(&name) {
|
||||
bail!("Connection '{}' already exists", name);
|
||||
}
|
||||
}
|
||||
|
||||
println!("[Dynamic] Adding: {} ({}/{})", name, cfg.host, cfg.database);
|
||||
let client = self.create_client(&cfg).await?;
|
||||
|
||||
// 先写 configs 再写 clients,与 get_client 的读取顺序一致
|
||||
self.configs.lock().unwrap().insert(name.clone(), cfg);
|
||||
self.clients.lock().unwrap().insert(name.clone(), ClientState {
|
||||
client: client.clone(),
|
||||
last_used: Instant::now(),
|
||||
});
|
||||
|
||||
println!("[Dynamic] ✓ Added: {}", name);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct ConnectionInfo {
|
||||
pub name: String,
|
||||
pub database: String,
|
||||
pub host: String,
|
||||
pub status: String,
|
||||
}
|
||||
457
mongo-proxy/src/handler.rs
Normal file
457
mongo-proxy/src/handler.rs
Normal file
@@ -0,0 +1,457 @@
|
||||
use axum::{
|
||||
extract::State,
|
||||
http::StatusCode,
|
||||
Json,
|
||||
};
|
||||
use futures_util::TryStreamExt;
|
||||
use mongodb::bson::Document;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::db::ConnectionManager;
|
||||
use crate::logger::{LogEntry, RequestLogger};
|
||||
|
||||
pub type AppState = Arc<(Arc<ConnectionManager>, Arc<RequestLogger>)>;
|
||||
|
||||
// ============== 请求结构 ==============
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct FindRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
#[serde(default)]
|
||||
pub filter: Option<Document>,
|
||||
#[serde(default)]
|
||||
pub projection: Option<Document>,
|
||||
#[serde(default)]
|
||||
pub sort: Option<Document>,
|
||||
#[serde(default)]
|
||||
pub limit: Option<i64>,
|
||||
#[serde(default)]
|
||||
pub skip: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct InsertRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
pub documents: Vec<Document>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct UpdateRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
pub filter: Document,
|
||||
pub update: Document,
|
||||
#[serde(default)]
|
||||
pub upsert: Option<bool>,
|
||||
#[serde(default)]
|
||||
pub multi: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct DeleteRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
pub filter: Document,
|
||||
#[serde(default)]
|
||||
pub multi: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AggregateRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
pub pipeline: Vec<Document>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct CountRequest {
|
||||
pub conn: String,
|
||||
pub collection: String,
|
||||
#[serde(default)]
|
||||
pub filter: Option<Document>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AddConnectionRequest {
|
||||
pub name: String,
|
||||
pub host: String,
|
||||
#[serde(default = "default_port")]
|
||||
pub port: u16,
|
||||
pub database: String,
|
||||
pub username: Option<String>,
|
||||
pub password: Option<String>,
|
||||
pub auth_source: Option<String>,
|
||||
}
|
||||
|
||||
fn default_port() -> u16 { 27017 }
|
||||
|
||||
// ============== 响应结构 ==============
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct FindResponse {
|
||||
pub documents: Vec<serde_json::Value>,
|
||||
pub count: usize,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct InsertResponse {
|
||||
#[serde(rename = "insertedIds")]
|
||||
pub inserted_ids: Vec<String>,
|
||||
#[serde(rename = "insertedCount")]
|
||||
pub inserted_count: usize,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct UpdateResponse {
|
||||
#[serde(rename = "matchedCount")]
|
||||
pub matched_count: u64,
|
||||
#[serde(rename = "modifiedCount")]
|
||||
pub modified_count: u64,
|
||||
#[serde(rename = "upsertedId")]
|
||||
pub upserted_id: Option<String>,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct DeleteResponse {
|
||||
#[serde(rename = "deletedCount")]
|
||||
pub deleted_count: u64,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AggregateResponse {
|
||||
pub documents: Vec<serde_json::Value>,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct CountResponse {
|
||||
pub count: u64,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ConnectionsResponse {
|
||||
pub connections: Vec<crate::db::ConnectionInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct HealthResponse {
|
||||
pub status: String,
|
||||
pub connections: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ErrorResponse {
|
||||
pub error: String,
|
||||
#[serde(rename = "usage")]
|
||||
pub usage: Option<String>,
|
||||
}
|
||||
|
||||
impl ErrorResponse {
|
||||
pub fn new(msg: &str) -> Self {
|
||||
Self { error: msg.to_string(), usage: None }
|
||||
}
|
||||
|
||||
pub fn with_usage(mut self, usage: &str) -> Self {
|
||||
self.usage = Some(usage.to_string());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
// ============== 处理器 ==============
|
||||
|
||||
pub async fn find(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<FindRequest>,
|
||||
) -> Result<Json<FindResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
&format!("Usage: POST /find {{\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {{}}}}\nAvailable: {}", list_conn_names(manager))))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let mut find = coll.find(req.filter.unwrap_or_default());
|
||||
if let Some(p) = req.projection { find = find.projection(p); }
|
||||
if let Some(s) = req.sort { find = find.sort(s); }
|
||||
if let Some(l) = req.limit { find = find.limit(l); }
|
||||
if let Some(s) = req.skip { find = find.skip(s); }
|
||||
|
||||
let mut cursor = find.await
|
||||
.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let mut documents = Vec::new();
|
||||
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? {
|
||||
documents.push(bson_to_json(&result));
|
||||
}
|
||||
|
||||
let count = documents.len();
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/find", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64)
|
||||
.with_rows(count));
|
||||
|
||||
Ok(Json(FindResponse { documents, count, duration_ms: duration.as_millis() as u64 }))
|
||||
}
|
||||
|
||||
pub async fn insert(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<InsertRequest>,
|
||||
) -> Result<Json<InsertResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
&format!("Usage: POST /insert {{\"conn\": \"name\", \"collection\": \"coll\", \"documents\": [{{}}]}}")))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let result = coll.insert_many(req.documents).await
|
||||
.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let inserted_ids: Vec<String> = result.inserted_ids.iter()
|
||||
.map(|(k, v)| format!("{}:{}", k, v))
|
||||
.collect();
|
||||
let inserted_count = result.inserted_ids.len();
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/insert", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64)
|
||||
.with_rows(inserted_count));
|
||||
|
||||
Ok(Json(InsertResponse { inserted_ids, inserted_count, duration_ms: duration.as_millis() as u64 }))
|
||||
}
|
||||
|
||||
pub async fn update(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<UpdateRequest>,
|
||||
) -> Result<Json<UpdateResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
"Usage: POST /update {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}, \"update\": {\"$set\": {}}}"))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let result = if req.multi.unwrap_or(false) {
|
||||
let mut cmd = coll.update_many(req.filter, req.update);
|
||||
if req.upsert.unwrap_or(false) { cmd = cmd.upsert(true); }
|
||||
cmd.await
|
||||
} else {
|
||||
let mut cmd = coll.update_one(req.filter, req.update);
|
||||
if req.upsert.unwrap_or(false) { cmd = cmd.upsert(true); }
|
||||
cmd.await
|
||||
}.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let upserted_id = result.upserted_id.map(|id| format!("{:?}", id));
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/update", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64));
|
||||
|
||||
Ok(Json(UpdateResponse {
|
||||
matched_count: result.matched_count,
|
||||
modified_count: result.modified_count,
|
||||
upserted_id,
|
||||
duration_ms: duration.as_millis() as u64,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn delete(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<DeleteRequest>,
|
||||
) -> Result<Json<DeleteResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
"Usage: POST /delete {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let result = if req.multi.unwrap_or(false) {
|
||||
coll.delete_many(req.filter).await
|
||||
} else {
|
||||
coll.delete_one(req.filter).await
|
||||
}.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let deleted_count = result.deleted_count;
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/delete", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64));
|
||||
|
||||
Ok(Json(DeleteResponse { deleted_count, duration_ms: duration.as_millis() as u64 }))
|
||||
}
|
||||
|
||||
pub async fn aggregate(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<AggregateRequest>,
|
||||
) -> Result<Json<AggregateResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
"Usage: POST /aggregate {\"conn\": \"name\", \"collection\": \"coll\", \"pipeline\": [{\"$match\": {}}, ...]}"))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let mut cursor = coll.aggregate(req.pipeline).await
|
||||
.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let mut documents = Vec::new();
|
||||
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? {
|
||||
documents.push(bson_to_json(&result));
|
||||
}
|
||||
|
||||
let count = documents.len();
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/aggregate", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64)
|
||||
.with_rows(count));
|
||||
|
||||
Ok(Json(AggregateResponse { documents, duration_ms: duration.as_millis() as u64 }))
|
||||
}
|
||||
|
||||
pub async fn count(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<CountRequest>,
|
||||
) -> Result<Json<CountResponse>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
let start = Instant::now();
|
||||
|
||||
let client = manager.get_client(&req.conn).await
|
||||
.map_err(|e| error_response_with_usage(&e.to_string(),
|
||||
"Usage: POST /count {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?;
|
||||
|
||||
let db = client.database(&get_database(manager, &req.conn));
|
||||
let coll = db.collection::<Document>(&req.collection);
|
||||
|
||||
let count = coll.count_documents(req.filter.unwrap_or_default()).await
|
||||
.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
logger.log(&LogEntry::new("/count", "http")
|
||||
.with_conn(&req.conn)
|
||||
.with_command(&format!("{}.{}", req.conn, req.collection))
|
||||
.with_duration(duration.as_millis() as u64));
|
||||
|
||||
Ok(Json(CountResponse { count, duration_ms: duration.as_millis() as u64 }))
|
||||
}
|
||||
|
||||
pub async fn connections(
|
||||
State(state): State<AppState>,
|
||||
) -> Json<ConnectionsResponse> {
|
||||
let (manager, _) = &*state;
|
||||
Json(ConnectionsResponse {
|
||||
connections: manager.list_connections(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn health(
|
||||
State(state): State<AppState>,
|
||||
) -> Json<HealthResponse> {
|
||||
let (manager, _) = &*state;
|
||||
Json(HealthResponse {
|
||||
status: "ok".to_string(),
|
||||
connections: manager.list_connections().len(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn add_connection(
|
||||
State(state): State<AppState>,
|
||||
Json(req): Json<AddConnectionRequest>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
|
||||
let (manager, logger) = &*state;
|
||||
use crate::config::ConnectionConfig;
|
||||
|
||||
let cfg = ConnectionConfig {
|
||||
name: req.name.clone(),
|
||||
host: req.host,
|
||||
port: req.port,
|
||||
database: req.database,
|
||||
username: req.username,
|
||||
password: req.password,
|
||||
auth_source: req.auth_source,
|
||||
};
|
||||
|
||||
manager.add_connection(cfg).await
|
||||
.map_err(|e| error_response(&e.to_string()))?;
|
||||
|
||||
logger.log(&LogEntry::new("/connections/add", "http")
|
||||
.with_conn(&req.name)
|
||||
.with_duration(0));
|
||||
|
||||
Ok(Json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": format!("Connection '{}' added (temporary)", req.name)
|
||||
})))
|
||||
}
|
||||
|
||||
// ============== 辅助函数 ==============
|
||||
|
||||
fn error_response(msg: &str) -> (StatusCode, Json<ErrorResponse>) {
|
||||
(StatusCode::BAD_REQUEST, Json(ErrorResponse::new(msg)))
|
||||
}
|
||||
|
||||
fn error_response_with_usage(msg: &str, usage: &str) -> (StatusCode, Json<ErrorResponse>) {
|
||||
(StatusCode::BAD_REQUEST, Json(ErrorResponse::new(msg).with_usage(usage)))
|
||||
}
|
||||
|
||||
fn list_conn_names(manager: &ConnectionManager) -> String {
|
||||
manager.list_connections()
|
||||
.iter()
|
||||
.map(|c| c.name.clone())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
fn get_database(manager: &ConnectionManager, name: &str) -> String {
|
||||
manager.list_connections().iter()
|
||||
.find(|c| c.name == name)
|
||||
.map(|c| c.database.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn bson_to_json(doc: &Document) -> serde_json::Value {
|
||||
serde_json::to_value(doc).unwrap_or(serde_json::Value::Null)
|
||||
}
|
||||
150
mongo-proxy/src/logger.rs
Normal file
150
mongo-proxy/src/logger.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Mutex;
|
||||
|
||||
pub struct RequestLogger {
|
||||
log_file: Mutex<Option<File>>,
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
impl RequestLogger {
|
||||
pub fn new(log_path: Option<&str>) -> Self {
|
||||
let (log_file, enabled) = if let Some(path) = log_path {
|
||||
let path = expand_path(path);
|
||||
if let Some(parent) = path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&path)
|
||||
{
|
||||
Ok(file) => (Some(file), true),
|
||||
Err(e) => {
|
||||
eprintln!("[Logger] Failed to open log file {}: {}", path.display(), e);
|
||||
(None, false)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(None, false)
|
||||
};
|
||||
|
||||
Self { log_file: Mutex::new(log_file), enabled }
|
||||
}
|
||||
|
||||
pub fn is_enabled(&self) -> bool { self.enabled }
|
||||
|
||||
pub fn log(&self, entry: &LogEntry) {
|
||||
if !self.enabled { return; }
|
||||
|
||||
let json = match serde_json::to_string(entry) {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
eprintln!("[Logger] Failed to serialize log: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(mut file) = self.log_file.lock() {
|
||||
if let Some(ref mut f) = *file {
|
||||
let _ = writeln!(f, "{}", json);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct LogEntry {
|
||||
pub timestamp: String,
|
||||
pub level: String,
|
||||
#[serde(rename = "type")]
|
||||
pub log_type: String,
|
||||
pub client: String,
|
||||
pub endpoint: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub conn: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub command: Option<String>,
|
||||
#[serde(rename = "durationMs")]
|
||||
pub duration_ms: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub rows: Option<usize>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
impl LogEntry {
|
||||
pub fn new(endpoint: &str, client: &str) -> Self {
|
||||
Self {
|
||||
timestamp: current_timestamp(),
|
||||
level: "INFO".to_string(),
|
||||
log_type: "request".to_string(),
|
||||
client: client.to_string(),
|
||||
endpoint: endpoint.to_string(),
|
||||
conn: None,
|
||||
command: None,
|
||||
duration_ms: 0,
|
||||
rows: None,
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_conn(mut self, conn: &str) -> Self {
|
||||
self.conn = Some(conn.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_command(mut self, cmd: &str) -> Self {
|
||||
self.command = Some(truncate_string(cmd, 500));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_duration(mut self, ms: u64) -> Self {
|
||||
self.duration_ms = ms;
|
||||
if ms > 5000 && self.level == "INFO" {
|
||||
self.level = "ERROR".to_string();
|
||||
} else if ms > 2000 && self.level == "INFO" {
|
||||
self.level = "WARN".to_string();
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_rows(mut self, rows: usize) -> Self {
|
||||
self.rows = Some(rows);
|
||||
self
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn with_error(mut self, error: &str) -> Self {
|
||||
self.level = "ERROR".to_string();
|
||||
self.error = Some(error.to_string());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
fn current_timestamp() -> String {
|
||||
time::OffsetDateTime::now_utc()
|
||||
.format(&time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn truncate_string(s: &str, max_len: usize) -> String {
|
||||
if s.chars().count() <= max_len {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{}... (truncated)", s.chars().take(max_len).collect::<String>())
|
||||
}
|
||||
}
|
||||
|
||||
fn expand_path(path: &str) -> PathBuf {
|
||||
if path.starts_with('~') {
|
||||
let home = std::env::var("HOME")
|
||||
.or_else(|_| std::env::var("USERPROFILE"))
|
||||
.unwrap_or_default();
|
||||
PathBuf::from(path.replacen('~', &home, 1))
|
||||
} else {
|
||||
PathBuf::from(path)
|
||||
}
|
||||
}
|
||||
310
mongo-proxy/src/main.rs
Normal file
310
mongo-proxy/src/main.rs
Normal file
@@ -0,0 +1,310 @@
|
||||
mod cli;
|
||||
mod config;
|
||||
mod db;
|
||||
mod handler;
|
||||
mod logger;
|
||||
|
||||
use axum::{
|
||||
routing::{get, post},
|
||||
Router,
|
||||
};
|
||||
use clap::{Parser, Subcommand};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "mongo-proxy")]
|
||||
#[command(about = "MongoDB HTTP proxy with connection pooling")]
|
||||
struct Args {
|
||||
#[command(subcommand)]
|
||||
command: Option<Commands>,
|
||||
|
||||
/// Config file path
|
||||
#[arg(long, default_value = "mongo-proxy.toml", global = true)]
|
||||
config: String,
|
||||
|
||||
/// Server URL (for CLI mode)
|
||||
#[arg(short = 'S', long, default_value = "http://127.0.0.1:3309", global = true)]
|
||||
server: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
enum Commands {
|
||||
/// Start HTTP server (default)
|
||||
Server {
|
||||
#[arg(short = 'P', long)]
|
||||
port: Option<u16>,
|
||||
|
||||
#[arg(short = 'H', long)]
|
||||
host: Option<String>,
|
||||
},
|
||||
|
||||
/// Find documents
|
||||
Find {
|
||||
/// Connection name
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
/// Collection name
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Filter JSON
|
||||
#[arg(short, long, default_value = "{}")]
|
||||
filter: String,
|
||||
|
||||
/// Projection JSON
|
||||
#[arg(short, long)]
|
||||
projection: Option<String>,
|
||||
|
||||
/// Limit
|
||||
#[arg(short, long)]
|
||||
limit: Option<i64>,
|
||||
|
||||
/// Skip
|
||||
#[arg(short, long)]
|
||||
skip: Option<u64>,
|
||||
|
||||
/// Output format: json, pretty
|
||||
#[arg(short = 'F', long, default_value = "pretty")]
|
||||
format: String,
|
||||
},
|
||||
|
||||
/// Insert documents
|
||||
Insert {
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Documents JSON array
|
||||
#[arg(short, long)]
|
||||
documents: String,
|
||||
},
|
||||
|
||||
/// Update documents
|
||||
Update {
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Filter JSON
|
||||
#[arg(short, long)]
|
||||
filter: String,
|
||||
|
||||
/// Update JSON
|
||||
#[arg(short, long)]
|
||||
update: String,
|
||||
|
||||
/// Upsert
|
||||
#[arg(long)]
|
||||
upsert: bool,
|
||||
|
||||
/// Update multiple
|
||||
#[arg(long)]
|
||||
multi: bool,
|
||||
},
|
||||
|
||||
/// Delete documents
|
||||
Delete {
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Filter JSON
|
||||
#[arg(short, long)]
|
||||
filter: String,
|
||||
|
||||
/// Delete multiple
|
||||
#[arg(long)]
|
||||
multi: bool,
|
||||
},
|
||||
|
||||
/// Aggregate pipeline
|
||||
Aggregate {
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Pipeline JSON array
|
||||
#[arg(long)]
|
||||
pipeline: String,
|
||||
},
|
||||
|
||||
/// Count documents
|
||||
Count {
|
||||
#[arg(short, long)]
|
||||
conn: String,
|
||||
|
||||
#[arg(short = 'C', long)]
|
||||
collection: String,
|
||||
|
||||
/// Filter JSON
|
||||
#[arg(short, long)]
|
||||
filter: Option<String>,
|
||||
},
|
||||
|
||||
/// List available connections
|
||||
Connections,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
match args.command {
|
||||
Some(Commands::Server { port, host }) => {
|
||||
run_server(&args.config, port, host).await
|
||||
}
|
||||
Some(Commands::Find { conn, collection, filter, projection, limit, skip, format }) => {
|
||||
run_find(&args.server, &conn, &collection, &filter, &format, projection.as_deref(), limit, skip)
|
||||
}
|
||||
Some(Commands::Insert { conn, collection, documents }) => {
|
||||
run_insert(&args.server, &conn, &collection, &documents)
|
||||
}
|
||||
Some(Commands::Update { conn, collection, filter, update, upsert, multi }) => {
|
||||
run_update(&args.server, &conn, &collection, &filter, &update, upsert, multi)
|
||||
}
|
||||
Some(Commands::Delete { conn, collection, filter, multi }) => {
|
||||
run_delete(&args.server, &conn, &collection, &filter, multi)
|
||||
}
|
||||
Some(Commands::Aggregate { conn, collection, pipeline }) => {
|
||||
run_aggregate(&args.server, &conn, &collection, &pipeline)
|
||||
}
|
||||
Some(Commands::Count { conn, collection, filter }) => {
|
||||
run_count(&args.server, &conn, &collection, filter.as_deref())
|
||||
}
|
||||
Some(Commands::Connections) => {
|
||||
list_connections(&args.server)
|
||||
}
|
||||
None => {
|
||||
run_server(&args.config, None, None).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_server(config_path: &str, port: Option<u16>, host: Option<String>) -> anyhow::Result<()> {
|
||||
println!("MongoDB HTTP Proxy v0.1.0\n");
|
||||
|
||||
let mut config = config::Config::from_file(config_path)?;
|
||||
|
||||
if let Some(port) = port {
|
||||
config.server.port = port;
|
||||
}
|
||||
if let Some(host) = host {
|
||||
config.server.host = host;
|
||||
}
|
||||
|
||||
let log_path = std::env::var("MONGO_PROXY_LOG").ok();
|
||||
let logger = Arc::new(logger::RequestLogger::new(log_path.as_deref()));
|
||||
if logger.is_enabled() {
|
||||
println!("Request logging: enabled");
|
||||
}
|
||||
|
||||
println!("Initializing connection pools...\n");
|
||||
let manager = Arc::new(db::ConnectionManager::new(&config.connections, config.pool.clone())?);
|
||||
|
||||
let manager_clone = manager.clone();
|
||||
let check_interval = config.pool.check_interval_secs;
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(check_interval));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
manager_clone.cleanup_idle();
|
||||
}
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
.route("/find", post(handler::find))
|
||||
.route("/insert", post(handler::insert))
|
||||
.route("/update", post(handler::update))
|
||||
.route("/delete", post(handler::delete))
|
||||
.route("/aggregate", post(handler::aggregate))
|
||||
.route("/count", post(handler::count))
|
||||
.route("/connections", get(handler::connections))
|
||||
.route("/connections/add", post(handler::add_connection))
|
||||
.route("/health", get(handler::health))
|
||||
.with_state(Arc::new((manager, logger)));
|
||||
|
||||
let addr = format!("{}:{}", config.server.host, config.server.port);
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
|
||||
println!("\nServer started at http://{}", addr);
|
||||
println!("\nAPI Endpoints:");
|
||||
println!(" POST /find - Query documents");
|
||||
println!(" POST /insert - Insert documents");
|
||||
println!(" POST /update - Update documents");
|
||||
println!(" POST /delete - Delete documents");
|
||||
println!(" POST /aggregate - Aggregation pipeline");
|
||||
println!(" POST /count - Count documents");
|
||||
println!(" GET /connections - List all connections");
|
||||
println!(" POST /connections/add - Add connection (temporary)");
|
||||
println!(" GET /health - Health check");
|
||||
println!("\nCLI Usage:");
|
||||
println!(" mongo-proxy find -c flux_dev -C users -f '{{\"status\": 1}}'");
|
||||
println!(" mongo-proxy insert -c flux_dev -C users -d '[{{\"name\": \"test\"}}]'");
|
||||
println!(" mongo-proxy aggregate -c flux_dev -C users --pipeline '[{{\"$match\": {{}}}}]'");
|
||||
println!(" mongo-proxy connections");
|
||||
|
||||
axum::serve(listener, app).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_cli(server: &str) -> anyhow::Result<cli::Cli> {
|
||||
let c = cli::Cli::new(Some(server.to_string()));
|
||||
if !c.check_server()? {
|
||||
eprintln!();
|
||||
eprintln!("=== mongo-proxy 未运行,请使用降级方案 ===");
|
||||
eprintln!();
|
||||
eprintln!("启动代理: mongo-proxy");
|
||||
eprintln!();
|
||||
eprintln!("或直接使用 mongosh:");
|
||||
eprintln!(" mongosh \"mongodb://user:pass@host:27017/db\"");
|
||||
eprintln!();
|
||||
eprintln!("配置文件中的连接信息见: mongo-proxy.toml");
|
||||
anyhow::bail!("Proxy server not running at {}", server);
|
||||
}
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn run_find(server: &str, conn: &str, collection: &str, filter: &str, format: &str,
|
||||
projection: Option<&str>, limit: Option<i64>, skip: Option<u64>) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.find(conn, collection, filter, format, projection, limit, skip)
|
||||
}
|
||||
|
||||
fn run_insert(server: &str, conn: &str, collection: &str, documents: &str) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.insert(conn, collection, documents)
|
||||
}
|
||||
|
||||
fn run_update(server: &str, conn: &str, collection: &str, filter: &str, update: &str, upsert: bool, multi: bool) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.update(conn, collection, filter, update, upsert, multi)
|
||||
}
|
||||
|
||||
fn run_delete(server: &str, conn: &str, collection: &str, filter: &str, multi: bool) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.delete(conn, collection, filter, multi)
|
||||
}
|
||||
|
||||
fn run_aggregate(server: &str, conn: &str, collection: &str, pipeline: &str) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.aggregate(conn, collection, pipeline)
|
||||
}
|
||||
|
||||
fn run_count(server: &str, conn: &str, collection: &str, filter: Option<&str>) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.count(conn, collection, filter)
|
||||
}
|
||||
|
||||
fn list_connections(server: &str) -> anyhow::Result<()> {
|
||||
let cli = run_cli(server)?;
|
||||
cli.list_connections()
|
||||
}
|
||||
Reference in New Issue
Block a user