diff --git a/.gitignore b/.gitignore index 34b47d7..db08f8b 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ logs/ # 配置文件(含敏感信息) mysql-proxy/mysql-proxy.toml ssh-proxy/ssh-proxy.toml +mongo-proxy/mongo-proxy.toml +redis-proxy/redis-proxy.toml diff --git a/INCUBATOR.md b/INCUBATOR.md index 9f14321..caedc04 100644 --- a/INCUBATOR.md +++ b/INCUBATOR.md @@ -1,6 +1,6 @@ # 代理工具孵化记录 -> 状态:孵化中 | 更新:2026-03-24 +> 状态:孵化中 | 更新:2026-03-27 --- @@ -152,6 +152,54 @@ ssh @ "" --- +## 代码审查 (2026-03-27) + +> 12 维度并行审查:架构一致性、错误处理、安全性、性能、DRY、日志、配置管理、CLI 一致性、边界条件、依赖、资源清理、代码度量 + +### 必须修复 + +| # | 问题 | 文件 | 说明 | 状态 | +|---|------|------|------|------| +| 1 | .gitignore 未覆盖 mongo/redis 配置 | `.gitignore` | 含真实密码,随时可能被 `git add` 提交 | ✅ | +| 2 | Redis 密码无 URL 编码 | `redis-proxy/src/config.rs` | 密码含 `@` `:` 时 URL 解析失败 | ✅ | +| 3 | mongo urlencoding 不完整 | `mongo-proxy/src/config.rs` | 只编码 5 字符,mysql 编码 19 字符 | ✅ | +| 4 | Redis 无重连机制 | `redis-proxy/src/db.rs` | Redis 重启后持续失败 5 分钟直到 idle timeout | ✅ | +| 5 | mysql-proxy handler 阻塞 tokio runtime | `mysql-proxy/src/handler.rs` | 同步 DB 操作未用 spawn_blocking,阻塞 worker 线程 | ✅ | +| 6 | 全部失败路径无日志 | 所有 handler.rs (4 项目) | 约 36 个 map_err/?? 路径跳过 logger.log() | ✅ | +| 7 | with_error() 从未调用 | 所有 logger.rs (4 项目) | 错误日志 level 永远是 INFO | ✅ | + +### 建议改进 + +| # | 问题 | 文件 | 说明 | +|---|------|------|------| +| 8 | get_conn TOCTOU 竞态 | mysql/mongo/redis `db.rs` | 释放锁后重建,可能重复创建连接 | +| 9 | mysql `--config` 占用 `-c` | `mysql-proxy/src/main.rs:22` | 与 `--conn` 冲突 | +| 10 | LogEntry 字段不一致 | 4 个 `logger.rs` | server/sql/rows/exitCode 各有缺失 | +| 11 | logger.rs 等 ~300 行重复 | 4 个项目 | RequestLogger/LogEntry/辅助函数高度重复 | +| 12 | ssh-proxy 无 SSH disconnect | `ssh-proxy/src/session.rs` | cleanup 只移除 HashMap 不发 disconnect | +| 13 | 无 graceful shutdown | 4 个 `main.rs` | Ctrl+C 不等待进行中请求 | +| 14 | redis-proxy 降级提示单行 | `redis-proxy/src/main.rs:52` | 其他 3 个用多行格式 | +| 15 | mongo-proxy JSON 模式打印耗时 | `mongo-proxy/src/cli.rs:86` | 破坏 JSON 输出纯度 | +| 16 | redis-proxy 无 JSON 输出 | `redis-proxy/src/cli.rs` | 其他 3 个都支持 -F json | +| 17 | mongo insert/update 缺 usage 连接列表 | `mongo-proxy/src/handler.rs:224,256` | find/count 有但 insert/update 没有 | +| 18 | ssh-proxy 未使用 russh-keys 依赖 | `ssh-proxy/Cargo.toml` | beta 版本,代码用 russh::keys | +| 19 | tokio features = ["full"] 过宽 | 4 个 `Cargo.toml` | 实际只需 rt-multi-thread/net/time/sync | +| 20 | 日志写入阻塞 tokio | 4 个 `logger.rs` | 同步 writeln 阻塞 worker 线程 | + +### 代码度量 + +| 项目 | 行数 | 文件数 | 最长函数 | unwrap | clone | unsafe | +|------|------|--------|---------|-------|-------| +| mysql-proxy | 1,209 | 6 | query() 74行 | 10 | 18 | 0 | +| ssh-proxy | 997 | 6 | exec() 49行 | 0 | 17 | 0 | +| mongo-proxy | 1,422 | 6 | find() 38行 | 9 | 11 | 0 | +| redis-proxy | 964 | 6 | run_cmd() 19行 | 10 | 30 | 0 | +| **合计** | **4,592** | **24** | -- | **29** | **76** | **0** | + +> 无 unsafe 代码,无 TODO/FIXME,嵌套深度最大 3 层,函数最长 74 行。代码质量良好。 + +--- + ## 已知问题 ### ssh-proxy @@ -166,4 +214,11 @@ ssh @ "" ## 待优化 -*由 AI 在实际使用过程中根据遇到的问题填写* +- [ ] 提取 proxy-common 公共 crate (logger/config/error/cli 基础) +- [ ] HTTP API 认证机制 (当前仅依赖 127.0.0.1 绑定) +- [ ] SSH 主机密钥验证 (当前无条件信任) +- [ ] redis-proxy 改用 redis::aio::ConnectionManager (避免 spawn_blocking) +- [ ] 日志写入改为 channel + 后台线程 (避免阻塞 tokio) +- [ ] 统一 graceful shutdown (with_graceful_shutdown) +- [x] .gitignore 补充 mongo/redis 配置文件 +- [x] redis/mongo URL 编码对齐 mysql diff --git a/mongo-proxy/src/config.rs b/mongo-proxy/src/config.rs index d8e0f5f..d85ffa4 100644 --- a/mongo-proxy/src/config.rs +++ b/mongo-proxy/src/config.rs @@ -132,6 +132,21 @@ fn urlencoding(s: &str) -> String { '/' => result.push_str("%2F"), '?' => result.push_str("%3F"), '#' => result.push_str("%23"), + '[' => result.push_str("%5B"), + ']' => result.push_str("%5D"), + '!' => result.push_str("%21"), + '$' => result.push_str("%24"), + '&' => result.push_str("%26"), + '\'' => result.push_str("%27"), + '(' => result.push_str("%28"), + ')' => result.push_str("%29"), + '*' => result.push_str("%2A"), + '+' => result.push_str("%2B"), + ',' => result.push_str("%2C"), + ';' => result.push_str("%3B"), + '=' => result.push_str("%3D"), + '%' => result.push_str("%25"), + ' ' => result.push_str("%20"), _ => result.push(c), } } diff --git a/mongo-proxy/src/handler.rs b/mongo-proxy/src/handler.rs index 311d16f..f8934b2 100644 --- a/mongo-proxy/src/handler.rs +++ b/mongo-proxy/src/handler.rs @@ -182,8 +182,11 @@ pub async fn find( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - &format!("Usage: POST /find {{\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {{}}}}\nAvailable: {}", list_conn_names(manager))))?; + .map_err(|e| { + logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + &format!("Usage: POST /find {{\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {{}}}}\nAvailable: {}", list_conn_names(manager))) + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); @@ -195,10 +198,16 @@ pub async fn find( if let Some(s) = req.skip { find = find.skip(s); } let mut cursor = find.await - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let mut documents = Vec::new(); - while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? { + while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| { + logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response(&e.to_string()) + })? { documents.push(bson_to_json(&result)); } @@ -222,14 +231,20 @@ pub async fn insert( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - &format!("Usage: POST /insert {{\"conn\": \"name\", \"collection\": \"coll\", \"documents\": [{{}}]}}")))?; + .map_err(|e| { + logger.log(&LogEntry::new("/insert", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + &format!("Usage: POST /insert {{\"conn\": \"name\", \"collection\": \"coll\", \"documents\": [{{}}]}}")) + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); let result = coll.insert_many(req.documents).await - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/insert", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let inserted_ids: Vec = result.inserted_ids.iter() .map(|(k, v)| format!("{}:{}", k, v)) @@ -254,8 +269,11 @@ pub async fn update( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - "Usage: POST /update {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}, \"update\": {\"$set\": {}}}"))?; + .map_err(|e| { + logger.log(&LogEntry::new("/update", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + "Usage: POST /update {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}, \"update\": {\"$set\": {}}}") + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); @@ -268,7 +286,10 @@ pub async fn update( let mut cmd = coll.update_one(req.filter, req.update); if req.upsert.unwrap_or(false) { cmd = cmd.upsert(true); } cmd.await - }.map_err(|e| error_response(&e.to_string()))?; + }.map_err(|e| { + logger.log(&LogEntry::new("/update", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let upserted_id = result.upserted_id.map(|id| format!("{:?}", id)); let duration = start.elapsed(); @@ -294,8 +315,11 @@ pub async fn delete( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - "Usage: POST /delete {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?; + .map_err(|e| { + logger.log(&LogEntry::new("/delete", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + "Usage: POST /delete {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}") + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); @@ -304,7 +328,10 @@ pub async fn delete( coll.delete_many(req.filter).await } else { coll.delete_one(req.filter).await - }.map_err(|e| error_response(&e.to_string()))?; + }.map_err(|e| { + logger.log(&LogEntry::new("/delete", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let deleted_count = result.deleted_count; let duration = start.elapsed(); @@ -325,17 +352,26 @@ pub async fn aggregate( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - "Usage: POST /aggregate {\"conn\": \"name\", \"collection\": \"coll\", \"pipeline\": [{\"$match\": {}}, ...]}"))?; + .map_err(|e| { + logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + "Usage: POST /aggregate {\"conn\": \"name\", \"collection\": \"coll\", \"pipeline\": [{\"$match\": {}}, ...]}") + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); let mut cursor = coll.aggregate(req.pipeline).await - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let mut documents = Vec::new(); - while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? { + while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| { + logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response(&e.to_string()) + })? { documents.push(bson_to_json(&result)); } @@ -359,14 +395,20 @@ pub async fn count( let start = Instant::now(); let client = manager.get_client(&req.conn).await - .map_err(|e| error_response_with_usage(&e.to_string(), - "Usage: POST /count {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?; + .map_err(|e| { + logger.log(&LogEntry::new("/count", "http").with_conn(&req.conn).with_error(&e.to_string())); + error_response_with_usage(&e.to_string(), + "Usage: POST /count {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}") + })?; let db = client.database(&get_database(manager, &req.conn)); let coll = db.collection::(&req.collection); let count = coll.count_documents(req.filter.unwrap_or_default()).await - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/count", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; let duration = start.elapsed(); @@ -415,7 +457,10 @@ pub async fn add_connection( }; manager.add_connection(cfg).await - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; logger.log(&LogEntry::new("/connections/add", "http") .with_conn(&req.name) diff --git a/mysql-proxy/src/handler.rs b/mysql-proxy/src/handler.rs index 964b77c..f654a39 100644 --- a/mysql-proxy/src/handler.rs +++ b/mysql-proxy/src/handler.rs @@ -100,6 +100,9 @@ impl ErrorResponse { // ============== 处理器 ============== +struct QueryResult { columns: Vec, rows: Vec>> } +struct ExecResult { affected: u64, last_id: u64 } + /// 查询处理器 pub async fn query( State(state): State, @@ -108,14 +111,6 @@ pub async fn query( let (manager, logger) = &*state; let start = Instant::now(); - // 获取连接 - let mut conn = manager.get_conn(&req.conn) - .map_err(|e| { - let usage = "Usage: POST /query {\"conn\": \"connection_name\", \"sql\": \"SELECT ...\"}\n" - .to_string() + &format!("Available connections: {}", list_conn_names(&manager)); - error_response_with_usage(&e.to_string(), &usage) - })?; - // 判断是否是查询语句 let sql_upper = req.sql.trim().to_uppercase(); let is_query = sql_upper.starts_with("SELECT") @@ -126,43 +121,45 @@ pub async fn query( || sql_upper.starts_with("WITH"); if !is_query { - let err = error_response_with_usage( + logger.log(&LogEntry::new("/query", "http") + .with_conn(&req.conn).with_sql(&req.sql).with_error("Not a SELECT query")); + return Err(error_response_with_usage( "Not a SELECT query. Use /execute for INSERT/UPDATE/DELETE", "Usage:\n POST /query {\"conn\": \"name\", \"sql\": \"SELECT ...\"}\n POST /execute {\"conn\": \"name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}" - ); - return Err(err); + )); } - // 执行查询 - let result = conn.query_iter(&req.sql) - .map_err(|e| { - let usage = format!("SQL Error: {}\n\nUsage: POST /query {{\"conn\": \"name\", \"sql\": \"SELECT ...\"}}", e); + let mgr = manager.clone(); + let conn_name = req.conn.clone(); + let sql = req.sql.clone(); + let conn_names = list_conn_names(manager); + + let result = tokio::task::spawn_blocking(move || -> Result)> { + let mut conn = mgr.get_conn(&conn_name).map_err(|e| { + let usage = format!("Usage: POST /query {{\"conn\": \"connection_name\", \"sql\": \"SELECT ...\"}}\nAvailable connections: {}", conn_names); error_response_with_usage(&e.to_string(), &usage) })?; - - let mut columns: Vec = Vec::new(); - let mut data: Vec>> = Vec::new(); - - // 获取数据并从第一行提取列名 - for row_result in result { - let row = row_result.map_err(|e| error_response(&e.to_string()))?; - - // 从第一行获取列名 - if columns.is_empty() { - columns = row.columns() - .iter() - .map(|c| c.name_str().to_string()) - .collect(); + let result = conn.query_iter(&sql).map_err(|e| error_response(&e.to_string()))?; + let mut columns: Vec = Vec::new(); + let mut data: Vec>> = Vec::new(); + for row_result in result { + let row = row_result.map_err(|e| error_response(&e.to_string()))?; + if columns.is_empty() { + columns = row.columns().iter().map(|c| c.name_str().to_string()).collect(); + } + data.push(row_to_strings(&row, columns.len())); } + Ok(QueryResult { columns, rows: data }) + }).await.map_err(|_| error_response("Task join error"))?; - let values = row_to_strings(&row, columns.len()); - data.push(values); - } + let result = result.map_err(|e| { + logger.log(&LogEntry::new("/query", "http").with_conn(&req.conn).with_sql(&req.sql).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); - let row_count = data.len(); + let row_count = result.rows.len(); - // 记录日志 logger.log(&LogEntry::new("/query", "http") .with_conn(&req.conn) .with_sql(&req.sql) @@ -170,8 +167,8 @@ pub async fn query( .with_rows(row_count)); Ok(Json(QueryResponse { - columns, - rows: data, + columns: result.columns, + rows: result.rows, row_count, duration_ms: duration.as_millis() as u64, })) @@ -185,31 +182,35 @@ pub async fn execute( let (manager, logger) = &*state; let start = Instant::now(); - // 获取连接 - let mut conn = manager.get_conn(&req.conn) - .map_err(|e| { - let usage = "Usage: POST /execute {\"conn\": \"connection_name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}\n" - .to_string() + &format!("Available connections: {}", list_conn_names(&manager)); + let mgr = manager.clone(); + let conn_name = req.conn.clone(); + let sql = req.sql.clone(); + let conn_names = list_conn_names(manager); + + let exec_result = tokio::task::spawn_blocking(move || -> Result)> { + let mut conn = mgr.get_conn(&conn_name).map_err(|e| { + let usage = format!("Usage: POST /execute {{\"conn\": \"connection_name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}}\nAvailable connections: {}", conn_names); error_response_with_usage(&e.to_string(), &usage) })?; + let result = conn.query_iter(&sql).map_err(|e| error_response(&e.to_string()))?; + Ok(ExecResult { affected: result.affected_rows(), last_id: result.last_insert_id().unwrap_or(0) }) + }).await.map_err(|_| error_response("Task join error"))?; - // 执行 - let result = conn.query_iter(&req.sql) - .map_err(|e| error_response(&e.to_string()))?; + let exec_result = exec_result.map_err(|e| { + logger.log(&LogEntry::new("/execute", "http").with_conn(&req.conn).with_sql(&req.sql).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); - let affected = result.affected_rows(); - let last_id = result.last_insert_id().unwrap_or(0); - // 记录日志 logger.log(&LogEntry::new("/execute", "http") .with_conn(&req.conn) .with_sql(&req.sql) .with_duration(duration.as_millis() as u64)); Ok(Json(ExecuteResponse { - affected_rows: affected, - last_insert_id: last_id, + affected_rows: exec_result.affected, + last_insert_id: exec_result.last_id, duration_ms: duration.as_millis() as u64, })) } @@ -254,7 +255,10 @@ pub async fn add_connection( }; manager.add_connection(cfg) - .map_err(|e| error_response(&e.to_string()))?; + .map_err(|e| { + logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string())); + error_response(&e.to_string()) + })?; // 记录日志 logger.log(&LogEntry::new("/connections/add", "http") diff --git a/redis-proxy/src/config.rs b/redis-proxy/src/config.rs index 7b6bcb5..0ee220b 100644 --- a/redis-proxy/src/config.rs +++ b/redis-proxy/src/config.rs @@ -77,8 +77,38 @@ impl Config { impl ConnectionConfig { pub fn build_url(&self) -> String { match &self.password { - Some(pass) => format!("redis://:{}@{}:{}/{}", pass, self.host, self.port, self.db), + Some(pass) => format!("redis://:{}@{}:{}/{}", url_encode_password(pass), self.host, self.port, self.db), None => format!("redis://{}:{}/{}", self.host, self.port, self.db), } } } + +fn url_encode_password(s: &str) -> String { + let mut result = String::new(); + for c in s.chars() { + match c { + '@' => result.push_str("%40"), + ':' => result.push_str("%3A"), + '/' => result.push_str("%2F"), + '?' => result.push_str("%3F"), + '#' => result.push_str("%23"), + '[' => result.push_str("%5B"), + ']' => result.push_str("%5D"), + '!' => result.push_str("%21"), + '$' => result.push_str("%24"), + '&' => result.push_str("%26"), + '\'' => result.push_str("%27"), + '(' => result.push_str("%28"), + ')' => result.push_str("%29"), + '*' => result.push_str("%2A"), + '+' => result.push_str("%2B"), + ',' => result.push_str("%2C"), + ';' => result.push_str("%3B"), + '=' => result.push_str("%3D"), + '%' => result.push_str("%25"), + ' ' => result.push_str("%20"), + _ => result.push(c), + } + } + result +} diff --git a/redis-proxy/src/db.rs b/redis-proxy/src/db.rs index f4f59a2..68d2405 100644 --- a/redis-proxy/src/db.rs +++ b/redis-proxy/src/db.rs @@ -38,8 +38,16 @@ impl ConnectionManager { } else { None } }; if let Some(client) = client { - let cfg = self.configs.lock().unwrap().get(name).unwrap().clone(); - return Ok((client, cfg)); + // Validate connection is still alive + if let Ok(mut conn) = client.get_connection() { + if redis::cmd("PING").query::(&mut conn).is_ok() { + let cfg = self.configs.lock().unwrap().get(name).unwrap().clone(); + return Ok((client, cfg)); + } + } + // Connection dead, remove stale client and recreate + println!("[Reconnect] Stale Redis client detected: {}, removing", name); + self.clients.lock().unwrap().remove(name); } let cfg = self.configs.lock().unwrap().get(name) .ok_or_else(|| anyhow::anyhow!("Connection '{}' not found", name))?.clone(); diff --git a/redis-proxy/src/handler.rs b/redis-proxy/src/handler.rs index 749caf0..4c8bd46 100644 --- a/redis-proxy/src/handler.rs +++ b/redis-proxy/src/handler.rs @@ -78,14 +78,20 @@ pub async fn run_cmd(State(state): State, Json(req): Json) let conn_name = req.conn.clone(); let cmd_str = format!("{} {}", req.command, req.args.join(" ")); - let result: serde_json::Value = tokio::task::spawn_blocking(move || -> Result { + let result: Result = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err_usage(&e.to_string(), "Usage: POST /run {\"conn\": \"name\", \"command\": \"GET\", \"args\": [\"key\"]}"))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; let mut cmd = redis::cmd(&req.command); for arg in &req.args { cmd.arg(arg); } let val: redis::Value = cmd.query(&mut conn).map_err(|e| err(&e.to_string()))?; Ok(redis_value_to_json(&val)) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + let result = result.map_err(|e| { + let msg = &e.1.error; + logger.log(&LogEntry::new("/run", "http").with_conn(&req.conn).with_command(&cmd_str).with_error(msg)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/run", "http").with_conn(&req.conn).with_command(&cmd_str).with_duration(duration.as_millis() as u64)); @@ -99,12 +105,17 @@ pub async fn get(State(state): State, Json(req): Json) -> let conn_name = req.conn.clone(); let key_name = req.key.clone(); - let value: Option = tokio::task::spawn_blocking(move || -> Result, ApiError> { + let value: Result, ApiError> = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; let val: Option = conn.get(&key_name).map_err(|e| err(&e.to_string()))?; Ok(val) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + let value = value.map_err(|e| { + logger.log(&LogEntry::new("/get", "http").with_conn(&req.conn).with_command(&format!("GET {}", req.key)).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/get", "http").with_conn(&req.conn).with_command(&format!("GET {}", req.key)).with_duration(duration.as_millis() as u64)); @@ -120,7 +131,7 @@ pub async fn set(State(state): State, Json(req): Json) -> let value_str = req.value.clone(); let ttl = req.ttl; - tokio::task::spawn_blocking(move || -> Result<(), ApiError> { + let result: Result<(), ApiError> = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; if let Some(secs) = ttl { @@ -129,7 +140,12 @@ pub async fn set(State(state): State, Json(req): Json) -> conn.set::<_, _, ()>(&key_name, &value_str).map_err(|e| err(&e.to_string()))?; } Ok(()) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + result.map_err(|e| { + logger.log(&LogEntry::new("/set", "http").with_conn(&req.conn).with_command(&format!("SET {} {}", req.key, req.value)).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/set", "http").with_conn(&req.conn).with_command(&format!("SET {} {}", req.key, req.value)).with_duration(duration.as_millis() as u64)); @@ -143,12 +159,17 @@ pub async fn del(State(state): State, Json(req): Json) -> let conn_name = req.conn.clone(); let del_keys = req.keys.clone(); - let deleted: u64 = tokio::task::spawn_blocking(move || -> Result { + let deleted: Result = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; let count: u64 = conn.del(&del_keys).map_err(|e| err(&e.to_string()))?; Ok(count) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + let deleted = deleted.map_err(|e| { + logger.log(&LogEntry::new("/del", "http").with_conn(&req.conn).with_command(&format!("DEL {}", req.keys.join(" "))).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/del", "http").with_conn(&req.conn).with_command(&format!("DEL {}", req.keys.join(" "))).with_duration(duration.as_millis() as u64)); @@ -162,12 +183,17 @@ pub async fn keys(State(state): State, Json(req): Json) - let conn_name = req.conn.clone(); let pattern = req.pattern.clone(); - let result: Vec = tokio::task::spawn_blocking(move || -> Result, ApiError> { + let result: Result, ApiError> = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; let keys: Vec = conn.keys(&pattern).map_err(|e| err(&e.to_string()))?; Ok(keys) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + let result = result.map_err(|e| { + logger.log(&LogEntry::new("/keys", "http").with_conn(&req.conn).with_command(&format!("KEYS {}", req.pattern)).with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/keys", "http").with_conn(&req.conn).with_command(&format!("KEYS {}", req.pattern)).with_duration(duration.as_millis() as u64)); @@ -180,12 +206,17 @@ pub async fn info(State(state): State, Json(req): Json) - let mgr = manager.clone(); let conn_name = req.conn.clone(); - let info: String = tokio::task::spawn_blocking(move || -> Result { + let info: Result = tokio::task::spawn_blocking(move || { let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?; let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?; let info: String = redis::cmd("INFO").query(&mut conn).map_err(|e| err(&e.to_string()))?; Ok(info) - }).await.map_err(|_| err("Task join error"))??; + }).await.map_err(|_| err("Task join error"))?; + + let info = info.map_err(|e| { + logger.log(&LogEntry::new("/info", "http").with_conn(&req.conn).with_command("INFO").with_error(&e.1.error)); + e + })?; let duration = start.elapsed(); logger.log(&LogEntry::new("/info", "http").with_conn(&req.conn).with_command("INFO").with_duration(duration.as_millis() as u64)); @@ -203,7 +234,10 @@ pub async fn health(State(state): State) -> Json { pub async fn add_connection(State(state): State, Json(req): Json) -> Result, ApiError> { let (manager, logger) = state.as_ref(); let cfg = crate::config::ConnectionConfig { name: req.name.clone(), host: req.host, port: req.port, password: req.password, db: req.db }; - manager.add_connection(cfg).map_err(|e| err(&e.to_string()))?; + manager.add_connection(cfg).map_err(|e| { + logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string())); + err(&e.to_string()) + })?; logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_duration(0)); Ok(Json(serde_json::json!({"success": true, "message": format!("Connection '{}' added (temporary)", req.name)}))) } diff --git a/ssh-proxy/src/handler.rs b/ssh-proxy/src/handler.rs index dd0b77e..44485e2 100644 --- a/ssh-proxy/src/handler.rs +++ b/ssh-proxy/src/handler.rs @@ -188,14 +188,50 @@ pub struct DownloadRequest { fn default_concurrency() -> usize { 4 } +/// 修复 MSYS/Git Bash 在 Windows 上的路径转换问题 +/// MSYS 会把 /tmp/file 转为 C:/Users/.../Temp/file,/home/user/file 转为 C:/Users/user/file +/// 检测并还原为 Unix 路径 +fn fix_msys_remote_path(path: &str) -> String { + #[cfg(target_os = "windows")] + { + // 检测 Windows 绝对路径 (X:\... 或 X:/...) + let bytes = path.as_bytes(); + if bytes.len() >= 2 && bytes[1] == b':' && (bytes[2] == b'/' || bytes[2] == b'\\') { + // 可能是 MSYS 转换后的路径,尝试还原 + // 常见映射:C:/Users//AppData/Local/Temp/ -> /tmp/ + if let Some(idx) = path.find("/AppData/Local/Temp/") { + let rest = &path[idx + "/AppData/Local/Temp/".len()..]; + return format!("/tmp/{}", rest); + } + // 常见映射:C:/Users//... -> /home//... (不太常见但以防万一) + if let Some(idx) = path.find("/Users/") { + let rest = &path[idx + "/Users/".len()..]; + // 如果 rest 以数字开头(MSYS 用户目录映射),跳过 + // 格式: C:/Users//file -> /home//file + if let Some(slash_pos) = rest.find('/') { + let user = &rest[..slash_pos]; + let file_path = &rest[slash_pos..]; + return format!("/home/{}{}", user, file_path); + } + } + } + } + #[cfg(not(target_os = "windows"))] + { + let _ = path; + } + path.to_string() +} + pub async fn upload( State(state): State, Json(req): Json, ) -> Result, (StatusCode, Json)> { let (manager, logger) = &*state; let server_names = list_server_names(manager); + let remote_path = fix_msys_remote_path(&req.remote_path); - let result = manager.upload(&req.server, &req.local_path, &req.remote_path, req.concurrency).await + let result = manager.upload(&req.server, &req.local_path, &remote_path, req.concurrency).await .map_err(|e| { logger.log(&LogEntry::new("/upload", "http").with_server(&req.server).with_error(&e.to_string())); let usage = format!( @@ -228,8 +264,9 @@ pub async fn download( ) -> Result, (StatusCode, Json)> { let (manager, logger) = &*state; let server_names = list_server_names(manager); + let remote_path = fix_msys_remote_path(&req.remote_path); - let result = manager.download(&req.server, &req.remote_path, &req.local_path, req.concurrency).await + let result = manager.download(&req.server, &remote_path, &req.local_path, req.concurrency).await .map_err(|e| { logger.log(&LogEntry::new("/download", "http").with_server(&req.server).with_error(&e.to_string())); let usage = format!(