优化: 代理工具代码审查修复

This commit is contained in:
2026-04-01 18:11:06 +08:00
parent f77ed2572a
commit 38b16a6efc
9 changed files with 321 additions and 91 deletions

2
.gitignore vendored
View File

@@ -12,3 +12,5 @@ logs/
# 配置文件(含敏感信息)
mysql-proxy/mysql-proxy.toml
ssh-proxy/ssh-proxy.toml
mongo-proxy/mongo-proxy.toml
redis-proxy/redis-proxy.toml

View File

@@ -1,6 +1,6 @@
# 代理工具孵化记录
> 状态:孵化中 | 更新2026-03-24
> 状态:孵化中 | 更新2026-03-27
---
@@ -152,6 +152,54 @@ ssh <user>@<host> "<command>"
---
## 代码审查 (2026-03-27)
> 12 维度并行审查架构一致性、错误处理、安全性、性能、DRY、日志、配置管理、CLI 一致性、边界条件、依赖、资源清理、代码度量
### 必须修复
| # | 问题 | 文件 | 说明 | 状态 |
|---|------|------|------|------|
| 1 | .gitignore 未覆盖 mongo/redis 配置 | `.gitignore` | 含真实密码,随时可能被 `git add` 提交 | ✅ |
| 2 | Redis 密码无 URL 编码 | `redis-proxy/src/config.rs` | 密码含 `@` `:` 时 URL 解析失败 | ✅ |
| 3 | mongo urlencoding 不完整 | `mongo-proxy/src/config.rs` | 只编码 5 字符mysql 编码 19 字符 | ✅ |
| 4 | Redis 无重连机制 | `redis-proxy/src/db.rs` | Redis 重启后持续失败 5 分钟直到 idle timeout | ✅ |
| 5 | mysql-proxy handler 阻塞 tokio runtime | `mysql-proxy/src/handler.rs` | 同步 DB 操作未用 spawn_blocking阻塞 worker 线程 | ✅ |
| 6 | 全部失败路径无日志 | 所有 handler.rs (4 项目) | 约 36 个 map_err/?? 路径跳过 logger.log() | ✅ |
| 7 | with_error() 从未调用 | 所有 logger.rs (4 项目) | 错误日志 level 永远是 INFO | ✅ |
### 建议改进
| # | 问题 | 文件 | 说明 |
|---|------|------|------|
| 8 | get_conn TOCTOU 竞态 | mysql/mongo/redis `db.rs` | 释放锁后重建,可能重复创建连接 |
| 9 | mysql `--config` 占用 `-c` | `mysql-proxy/src/main.rs:22` | 与 `--conn` 冲突 |
| 10 | LogEntry 字段不一致 | 4 个 `logger.rs` | server/sql/rows/exitCode 各有缺失 |
| 11 | logger.rs 等 ~300 行重复 | 4 个项目 | RequestLogger/LogEntry/辅助函数高度重复 |
| 12 | ssh-proxy 无 SSH disconnect | `ssh-proxy/src/session.rs` | cleanup 只移除 HashMap 不发 disconnect |
| 13 | 无 graceful shutdown | 4 个 `main.rs` | Ctrl+C 不等待进行中请求 |
| 14 | redis-proxy 降级提示单行 | `redis-proxy/src/main.rs:52` | 其他 3 个用多行格式 |
| 15 | mongo-proxy JSON 模式打印耗时 | `mongo-proxy/src/cli.rs:86` | 破坏 JSON 输出纯度 |
| 16 | redis-proxy 无 JSON 输出 | `redis-proxy/src/cli.rs` | 其他 3 个都支持 -F json |
| 17 | mongo insert/update 缺 usage 连接列表 | `mongo-proxy/src/handler.rs:224,256` | find/count 有但 insert/update 没有 |
| 18 | ssh-proxy 未使用 russh-keys 依赖 | `ssh-proxy/Cargo.toml` | beta 版本,代码用 russh::keys |
| 19 | tokio features = ["full"] 过宽 | 4 个 `Cargo.toml` | 实际只需 rt-multi-thread/net/time/sync |
| 20 | 日志写入阻塞 tokio | 4 个 `logger.rs` | 同步 writeln 阻塞 worker 线程 |
### 代码度量
| 项目 | 行数 | 文件数 | 最长函数 | unwrap | clone | unsafe |
|------|------|--------|---------|-------|-------|
| mysql-proxy | 1,209 | 6 | query() 74行 | 10 | 18 | 0 |
| ssh-proxy | 997 | 6 | exec() 49行 | 0 | 17 | 0 |
| mongo-proxy | 1,422 | 6 | find() 38行 | 9 | 11 | 0 |
| redis-proxy | 964 | 6 | run_cmd() 19行 | 10 | 30 | 0 |
| **合计** | **4,592** | **24** | -- | **29** | **76** | **0** |
> 无 unsafe 代码,无 TODO/FIXME嵌套深度最大 3 层,函数最长 74 行。代码质量良好。
---
## 已知问题
### ssh-proxy
@@ -166,4 +214,11 @@ ssh <user>@<host> "<command>"
## 待优化
*由 AI 在实际使用过程中根据遇到的问题填写*
- [ ] 提取 proxy-common 公共 crate (logger/config/error/cli 基础)
- [ ] HTTP API 认证机制 (当前仅依赖 127.0.0.1 绑定)
- [ ] SSH 主机密钥验证 (当前无条件信任)
- [ ] redis-proxy 改用 redis::aio::ConnectionManager (避免 spawn_blocking)
- [ ] 日志写入改为 channel + 后台线程 (避免阻塞 tokio)
- [ ] 统一 graceful shutdown (with_graceful_shutdown)
- [x] .gitignore 补充 mongo/redis 配置文件
- [x] redis/mongo URL 编码对齐 mysql

View File

@@ -132,6 +132,21 @@ fn urlencoding(s: &str) -> String {
'/' => result.push_str("%2F"),
'?' => result.push_str("%3F"),
'#' => result.push_str("%23"),
'[' => result.push_str("%5B"),
']' => result.push_str("%5D"),
'!' => result.push_str("%21"),
'$' => result.push_str("%24"),
'&' => result.push_str("%26"),
'\'' => result.push_str("%27"),
'(' => result.push_str("%28"),
')' => result.push_str("%29"),
'*' => result.push_str("%2A"),
'+' => result.push_str("%2B"),
',' => result.push_str("%2C"),
';' => result.push_str("%3B"),
'=' => result.push_str("%3D"),
'%' => result.push_str("%25"),
' ' => result.push_str("%20"),
_ => result.push(c),
}
}

View File

@@ -182,8 +182,11 @@ pub async fn find(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
&format!("Usage: POST /find {{\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {{}}}}\nAvailable: {}", list_conn_names(manager))))?;
.map_err(|e| {
logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
&format!("Usage: POST /find {{\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {{}}}}\nAvailable: {}", list_conn_names(manager)))
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
@@ -195,10 +198,16 @@ pub async fn find(
if let Some(s) = req.skip { find = find.skip(s); }
let mut cursor = find.await
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let mut documents = Vec::new();
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? {
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| {
logger.log(&LogEntry::new("/find", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response(&e.to_string())
})? {
documents.push(bson_to_json(&result));
}
@@ -222,14 +231,20 @@ pub async fn insert(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
&format!("Usage: POST /insert {{\"conn\": \"name\", \"collection\": \"coll\", \"documents\": [{{}}]}}")))?;
.map_err(|e| {
logger.log(&LogEntry::new("/insert", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
&format!("Usage: POST /insert {{\"conn\": \"name\", \"collection\": \"coll\", \"documents\": [{{}}]}}"))
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
let result = coll.insert_many(req.documents).await
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/insert", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let inserted_ids: Vec<String> = result.inserted_ids.iter()
.map(|(k, v)| format!("{}:{}", k, v))
@@ -254,8 +269,11 @@ pub async fn update(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
"Usage: POST /update {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}, \"update\": {\"$set\": {}}}"))?;
.map_err(|e| {
logger.log(&LogEntry::new("/update", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
"Usage: POST /update {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}, \"update\": {\"$set\": {}}}")
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
@@ -268,7 +286,10 @@ pub async fn update(
let mut cmd = coll.update_one(req.filter, req.update);
if req.upsert.unwrap_or(false) { cmd = cmd.upsert(true); }
cmd.await
}.map_err(|e| error_response(&e.to_string()))?;
}.map_err(|e| {
logger.log(&LogEntry::new("/update", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let upserted_id = result.upserted_id.map(|id| format!("{:?}", id));
let duration = start.elapsed();
@@ -294,8 +315,11 @@ pub async fn delete(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
"Usage: POST /delete {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?;
.map_err(|e| {
logger.log(&LogEntry::new("/delete", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
"Usage: POST /delete {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}")
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
@@ -304,7 +328,10 @@ pub async fn delete(
coll.delete_many(req.filter).await
} else {
coll.delete_one(req.filter).await
}.map_err(|e| error_response(&e.to_string()))?;
}.map_err(|e| {
logger.log(&LogEntry::new("/delete", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let deleted_count = result.deleted_count;
let duration = start.elapsed();
@@ -325,17 +352,26 @@ pub async fn aggregate(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
"Usage: POST /aggregate {\"conn\": \"name\", \"collection\": \"coll\", \"pipeline\": [{\"$match\": {}}, ...]}"))?;
.map_err(|e| {
logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
"Usage: POST /aggregate {\"conn\": \"name\", \"collection\": \"coll\", \"pipeline\": [{\"$match\": {}}, ...]}")
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
let mut cursor = coll.aggregate(req.pipeline).await
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let mut documents = Vec::new();
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| error_response(&e.to_string()))? {
while let Some(result) = cursor.try_next().await.map_err(|e: mongodb::error::Error| {
logger.log(&LogEntry::new("/aggregate", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response(&e.to_string())
})? {
documents.push(bson_to_json(&result));
}
@@ -359,14 +395,20 @@ pub async fn count(
let start = Instant::now();
let client = manager.get_client(&req.conn).await
.map_err(|e| error_response_with_usage(&e.to_string(),
"Usage: POST /count {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}"))?;
.map_err(|e| {
logger.log(&LogEntry::new("/count", "http").with_conn(&req.conn).with_error(&e.to_string()));
error_response_with_usage(&e.to_string(),
"Usage: POST /count {\"conn\": \"name\", \"collection\": \"coll\", \"filter\": {}}")
})?;
let db = client.database(&get_database(manager, &req.conn));
let coll = db.collection::<Document>(&req.collection);
let count = coll.count_documents(req.filter.unwrap_or_default()).await
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/count", "http").with_conn(&req.conn).with_command(&format!("{}.{}", req.conn, req.collection)).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
let duration = start.elapsed();
@@ -415,7 +457,10 @@ pub async fn add_connection(
};
manager.add_connection(cfg).await
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
logger.log(&LogEntry::new("/connections/add", "http")
.with_conn(&req.name)

View File

@@ -100,6 +100,9 @@ impl ErrorResponse {
// ============== 处理器 ==============
struct QueryResult { columns: Vec<String>, rows: Vec<Vec<Option<String>>> }
struct ExecResult { affected: u64, last_id: u64 }
/// 查询处理器
pub async fn query(
State(state): State<AppState>,
@@ -108,14 +111,6 @@ pub async fn query(
let (manager, logger) = &*state;
let start = Instant::now();
// 获取连接
let mut conn = manager.get_conn(&req.conn)
.map_err(|e| {
let usage = "Usage: POST /query {\"conn\": \"connection_name\", \"sql\": \"SELECT ...\"}\n"
.to_string() + &format!("Available connections: {}", list_conn_names(&manager));
error_response_with_usage(&e.to_string(), &usage)
})?;
// 判断是否是查询语句
let sql_upper = req.sql.trim().to_uppercase();
let is_query = sql_upper.starts_with("SELECT")
@@ -126,43 +121,45 @@ pub async fn query(
|| sql_upper.starts_with("WITH");
if !is_query {
let err = error_response_with_usage(
logger.log(&LogEntry::new("/query", "http")
.with_conn(&req.conn).with_sql(&req.sql).with_error("Not a SELECT query"));
return Err(error_response_with_usage(
"Not a SELECT query. Use /execute for INSERT/UPDATE/DELETE",
"Usage:\n POST /query {\"conn\": \"name\", \"sql\": \"SELECT ...\"}\n POST /execute {\"conn\": \"name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}"
);
return Err(err);
));
}
// 执行查询
let result = conn.query_iter(&req.sql)
.map_err(|e| {
let usage = format!("SQL Error: {}\n\nUsage: POST /query {{\"conn\": \"name\", \"sql\": \"SELECT ...\"}}", e);
let mgr = manager.clone();
let conn_name = req.conn.clone();
let sql = req.sql.clone();
let conn_names = list_conn_names(manager);
let result = tokio::task::spawn_blocking(move || -> Result<QueryResult, (StatusCode, Json<ErrorResponse>)> {
let mut conn = mgr.get_conn(&conn_name).map_err(|e| {
let usage = format!("Usage: POST /query {{\"conn\": \"connection_name\", \"sql\": \"SELECT ...\"}}\nAvailable connections: {}", conn_names);
error_response_with_usage(&e.to_string(), &usage)
})?;
let result = conn.query_iter(&sql).map_err(|e| error_response(&e.to_string()))?;
let mut columns: Vec<String> = Vec::new();
let mut data: Vec<Vec<Option<String>>> = Vec::new();
// 获取数据并从第一行提取列名
for row_result in result {
let row = row_result.map_err(|e| error_response(&e.to_string()))?;
// 从第一行获取列名
if columns.is_empty() {
columns = row.columns()
.iter()
.map(|c| c.name_str().to_string())
.collect();
columns = row.columns().iter().map(|c| c.name_str().to_string()).collect();
}
data.push(row_to_strings(&row, columns.len()));
}
Ok(QueryResult { columns, rows: data })
}).await.map_err(|_| error_response("Task join error"))?;
let values = row_to_strings(&row, columns.len());
data.push(values);
}
let result = result.map_err(|e| {
logger.log(&LogEntry::new("/query", "http").with_conn(&req.conn).with_sql(&req.sql).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
let row_count = data.len();
let row_count = result.rows.len();
// 记录日志
logger.log(&LogEntry::new("/query", "http")
.with_conn(&req.conn)
.with_sql(&req.sql)
@@ -170,8 +167,8 @@ pub async fn query(
.with_rows(row_count));
Ok(Json(QueryResponse {
columns,
rows: data,
columns: result.columns,
rows: result.rows,
row_count,
duration_ms: duration.as_millis() as u64,
}))
@@ -185,31 +182,35 @@ pub async fn execute(
let (manager, logger) = &*state;
let start = Instant::now();
// 获取连接
let mut conn = manager.get_conn(&req.conn)
.map_err(|e| {
let usage = "Usage: POST /execute {\"conn\": \"connection_name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}\n"
.to_string() + &format!("Available connections: {}", list_conn_names(&manager));
let mgr = manager.clone();
let conn_name = req.conn.clone();
let sql = req.sql.clone();
let conn_names = list_conn_names(manager);
let exec_result = tokio::task::spawn_blocking(move || -> Result<ExecResult, (StatusCode, Json<ErrorResponse>)> {
let mut conn = mgr.get_conn(&conn_name).map_err(|e| {
let usage = format!("Usage: POST /execute {{\"conn\": \"connection_name\", \"sql\": \"INSERT/UPDATE/DELETE ...\"}}\nAvailable connections: {}", conn_names);
error_response_with_usage(&e.to_string(), &usage)
})?;
let result = conn.query_iter(&sql).map_err(|e| error_response(&e.to_string()))?;
Ok(ExecResult { affected: result.affected_rows(), last_id: result.last_insert_id().unwrap_or(0) })
}).await.map_err(|_| error_response("Task join error"))?;
// 执行
let result = conn.query_iter(&req.sql)
.map_err(|e| error_response(&e.to_string()))?;
let exec_result = exec_result.map_err(|e| {
logger.log(&LogEntry::new("/execute", "http").with_conn(&req.conn).with_sql(&req.sql).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
let affected = result.affected_rows();
let last_id = result.last_insert_id().unwrap_or(0);
// 记录日志
logger.log(&LogEntry::new("/execute", "http")
.with_conn(&req.conn)
.with_sql(&req.sql)
.with_duration(duration.as_millis() as u64));
Ok(Json(ExecuteResponse {
affected_rows: affected,
last_insert_id: last_id,
affected_rows: exec_result.affected,
last_insert_id: exec_result.last_id,
duration_ms: duration.as_millis() as u64,
}))
}
@@ -254,7 +255,10 @@ pub async fn add_connection(
};
manager.add_connection(cfg)
.map_err(|e| error_response(&e.to_string()))?;
.map_err(|e| {
logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string()));
error_response(&e.to_string())
})?;
// 记录日志
logger.log(&LogEntry::new("/connections/add", "http")

View File

@@ -77,8 +77,38 @@ impl Config {
impl ConnectionConfig {
pub fn build_url(&self) -> String {
match &self.password {
Some(pass) => format!("redis://:{}@{}:{}/{}", pass, self.host, self.port, self.db),
Some(pass) => format!("redis://:{}@{}:{}/{}", url_encode_password(pass), self.host, self.port, self.db),
None => format!("redis://{}:{}/{}", self.host, self.port, self.db),
}
}
}
fn url_encode_password(s: &str) -> String {
let mut result = String::new();
for c in s.chars() {
match c {
'@' => result.push_str("%40"),
':' => result.push_str("%3A"),
'/' => result.push_str("%2F"),
'?' => result.push_str("%3F"),
'#' => result.push_str("%23"),
'[' => result.push_str("%5B"),
']' => result.push_str("%5D"),
'!' => result.push_str("%21"),
'$' => result.push_str("%24"),
'&' => result.push_str("%26"),
'\'' => result.push_str("%27"),
'(' => result.push_str("%28"),
')' => result.push_str("%29"),
'*' => result.push_str("%2A"),
'+' => result.push_str("%2B"),
',' => result.push_str("%2C"),
';' => result.push_str("%3B"),
'=' => result.push_str("%3D"),
'%' => result.push_str("%25"),
' ' => result.push_str("%20"),
_ => result.push(c),
}
}
result
}

View File

@@ -38,9 +38,17 @@ impl ConnectionManager {
} else { None }
};
if let Some(client) = client {
// Validate connection is still alive
if let Ok(mut conn) = client.get_connection() {
if redis::cmd("PING").query::<String>(&mut conn).is_ok() {
let cfg = self.configs.lock().unwrap().get(name).unwrap().clone();
return Ok((client, cfg));
}
}
// Connection dead, remove stale client and recreate
println!("[Reconnect] Stale Redis client detected: {}, removing", name);
self.clients.lock().unwrap().remove(name);
}
let cfg = self.configs.lock().unwrap().get(name)
.ok_or_else(|| anyhow::anyhow!("Connection '{}' not found", name))?.clone();
println!("[LazyInit] Creating Redis client for: {}", name);

View File

@@ -78,14 +78,20 @@ pub async fn run_cmd(State(state): State<AppState>, Json(req): Json<RunRequest>)
let conn_name = req.conn.clone();
let cmd_str = format!("{} {}", req.command, req.args.join(" "));
let result: serde_json::Value = tokio::task::spawn_blocking(move || -> Result<serde_json::Value, ApiError> {
let result: Result<serde_json::Value, ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err_usage(&e.to_string(), "Usage: POST /run {\"conn\": \"name\", \"command\": \"GET\", \"args\": [\"key\"]}"))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
let mut cmd = redis::cmd(&req.command);
for arg in &req.args { cmd.arg(arg); }
let val: redis::Value = cmd.query(&mut conn).map_err(|e| err(&e.to_string()))?;
Ok(redis_value_to_json(&val))
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
let result = result.map_err(|e| {
let msg = &e.1.error;
logger.log(&LogEntry::new("/run", "http").with_conn(&req.conn).with_command(&cmd_str).with_error(msg));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/run", "http").with_conn(&req.conn).with_command(&cmd_str).with_duration(duration.as_millis() as u64));
@@ -99,12 +105,17 @@ pub async fn get(State(state): State<AppState>, Json(req): Json<GetRequest>) ->
let conn_name = req.conn.clone();
let key_name = req.key.clone();
let value: Option<String> = tokio::task::spawn_blocking(move || -> Result<Option<String>, ApiError> {
let value: Result<Option<String>, ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
let val: Option<String> = conn.get(&key_name).map_err(|e| err(&e.to_string()))?;
Ok(val)
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
let value = value.map_err(|e| {
logger.log(&LogEntry::new("/get", "http").with_conn(&req.conn).with_command(&format!("GET {}", req.key)).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/get", "http").with_conn(&req.conn).with_command(&format!("GET {}", req.key)).with_duration(duration.as_millis() as u64));
@@ -120,7 +131,7 @@ pub async fn set(State(state): State<AppState>, Json(req): Json<SetRequest>) ->
let value_str = req.value.clone();
let ttl = req.ttl;
tokio::task::spawn_blocking(move || -> Result<(), ApiError> {
let result: Result<(), ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
if let Some(secs) = ttl {
@@ -129,7 +140,12 @@ pub async fn set(State(state): State<AppState>, Json(req): Json<SetRequest>) ->
conn.set::<_, _, ()>(&key_name, &value_str).map_err(|e| err(&e.to_string()))?;
}
Ok(())
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
result.map_err(|e| {
logger.log(&LogEntry::new("/set", "http").with_conn(&req.conn).with_command(&format!("SET {} {}", req.key, req.value)).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/set", "http").with_conn(&req.conn).with_command(&format!("SET {} {}", req.key, req.value)).with_duration(duration.as_millis() as u64));
@@ -143,12 +159,17 @@ pub async fn del(State(state): State<AppState>, Json(req): Json<DelRequest>) ->
let conn_name = req.conn.clone();
let del_keys = req.keys.clone();
let deleted: u64 = tokio::task::spawn_blocking(move || -> Result<u64, ApiError> {
let deleted: Result<u64, ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
let count: u64 = conn.del(&del_keys).map_err(|e| err(&e.to_string()))?;
Ok(count)
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
let deleted = deleted.map_err(|e| {
logger.log(&LogEntry::new("/del", "http").with_conn(&req.conn).with_command(&format!("DEL {}", req.keys.join(" "))).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/del", "http").with_conn(&req.conn).with_command(&format!("DEL {}", req.keys.join(" "))).with_duration(duration.as_millis() as u64));
@@ -162,12 +183,17 @@ pub async fn keys(State(state): State<AppState>, Json(req): Json<KeysRequest>) -
let conn_name = req.conn.clone();
let pattern = req.pattern.clone();
let result: Vec<String> = tokio::task::spawn_blocking(move || -> Result<Vec<String>, ApiError> {
let result: Result<Vec<String>, ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
let keys: Vec<String> = conn.keys(&pattern).map_err(|e| err(&e.to_string()))?;
Ok(keys)
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
let result = result.map_err(|e| {
logger.log(&LogEntry::new("/keys", "http").with_conn(&req.conn).with_command(&format!("KEYS {}", req.pattern)).with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/keys", "http").with_conn(&req.conn).with_command(&format!("KEYS {}", req.pattern)).with_duration(duration.as_millis() as u64));
@@ -180,12 +206,17 @@ pub async fn info(State(state): State<AppState>, Json(req): Json<InfoRequest>) -
let mgr = manager.clone();
let conn_name = req.conn.clone();
let info: String = tokio::task::spawn_blocking(move || -> Result<String, ApiError> {
let info: Result<String, ApiError> = tokio::task::spawn_blocking(move || {
let (client, _) = mgr.get_conn(&conn_name).map_err(|e| err(&e.to_string()))?;
let mut conn = client.get_connection().map_err(|e| err(&e.to_string()))?;
let info: String = redis::cmd("INFO").query(&mut conn).map_err(|e| err(&e.to_string()))?;
Ok(info)
}).await.map_err(|_| err("Task join error"))??;
}).await.map_err(|_| err("Task join error"))?;
let info = info.map_err(|e| {
logger.log(&LogEntry::new("/info", "http").with_conn(&req.conn).with_command("INFO").with_error(&e.1.error));
e
})?;
let duration = start.elapsed();
logger.log(&LogEntry::new("/info", "http").with_conn(&req.conn).with_command("INFO").with_duration(duration.as_millis() as u64));
@@ -203,7 +234,10 @@ pub async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
pub async fn add_connection(State(state): State<AppState>, Json(req): Json<AddConnectionRequest>) -> Result<Json<serde_json::Value>, ApiError> {
let (manager, logger) = state.as_ref();
let cfg = crate::config::ConnectionConfig { name: req.name.clone(), host: req.host, port: req.port, password: req.password, db: req.db };
manager.add_connection(cfg).map_err(|e| err(&e.to_string()))?;
manager.add_connection(cfg).map_err(|e| {
logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_error(&e.to_string()));
err(&e.to_string())
})?;
logger.log(&LogEntry::new("/connections/add", "http").with_conn(&req.name).with_duration(0));
Ok(Json(serde_json::json!({"success": true, "message": format!("Connection '{}' added (temporary)", req.name)})))
}

View File

@@ -188,14 +188,50 @@ pub struct DownloadRequest {
fn default_concurrency() -> usize { 4 }
/// 修复 MSYS/Git Bash 在 Windows 上的路径转换问题
/// MSYS 会把 /tmp/file 转为 C:/Users/.../Temp/file/home/user/file 转为 C:/Users/user/file
/// 检测并还原为 Unix 路径
fn fix_msys_remote_path(path: &str) -> String {
#[cfg(target_os = "windows")]
{
// 检测 Windows 绝对路径 (X:\... 或 X:/...)
let bytes = path.as_bytes();
if bytes.len() >= 2 && bytes[1] == b':' && (bytes[2] == b'/' || bytes[2] == b'\\') {
// 可能是 MSYS 转换后的路径,尝试还原
// 常见映射C:/Users/<user>/AppData/Local/Temp/<name> -> /tmp/<name>
if let Some(idx) = path.find("/AppData/Local/Temp/") {
let rest = &path[idx + "/AppData/Local/Temp/".len()..];
return format!("/tmp/{}", rest);
}
// 常见映射C:/Users/<user>/... -> /home/<user>/... (不太常见但以防万一)
if let Some(idx) = path.find("/Users/") {
let rest = &path[idx + "/Users/".len()..];
// 如果 rest 以数字开头MSYS 用户目录映射),跳过
// 格式: C:/Users/<user>/file -> /home/<user>/file
if let Some(slash_pos) = rest.find('/') {
let user = &rest[..slash_pos];
let file_path = &rest[slash_pos..];
return format!("/home/{}{}", user, file_path);
}
}
}
}
#[cfg(not(target_os = "windows"))]
{
let _ = path;
}
path.to_string()
}
pub async fn upload(
State(state): State<AppState>,
Json(req): Json<UploadRequest>,
) -> Result<Json<TransferResponse>, (StatusCode, Json<ErrorResponse>)> {
let (manager, logger) = &*state;
let server_names = list_server_names(manager);
let remote_path = fix_msys_remote_path(&req.remote_path);
let result = manager.upload(&req.server, &req.local_path, &req.remote_path, req.concurrency).await
let result = manager.upload(&req.server, &req.local_path, &remote_path, req.concurrency).await
.map_err(|e| {
logger.log(&LogEntry::new("/upload", "http").with_server(&req.server).with_error(&e.to_string()));
let usage = format!(
@@ -228,8 +264,9 @@ pub async fn download(
) -> Result<Json<TransferResponse>, (StatusCode, Json<ErrorResponse>)> {
let (manager, logger) = &*state;
let server_names = list_server_names(manager);
let remote_path = fix_msys_remote_path(&req.remote_path);
let result = manager.download(&req.server, &req.remote_path, &req.local_path, req.concurrency).await
let result = manager.download(&req.server, &remote_path, &req.local_path, req.concurrency).await
.map_err(|e| {
logger.log(&LogEntry::new("/download", "http").with_server(&req.server).with_error(&e.to_string()));
let usage = format!(