优化: ssh-proxy russh 迁移后代码整理

- 移除未使用的 async-trait 依赖
- 添加断线重连逻辑,会话失效时自动重连
- 修复 get_or_create_session TOCTOU 竞态条件
- 日志智能分级: 慢请求告警、退出码识别
- 用 time crate 替换手写日期计算 (删除40行)
- UTF-8 安全截断修复
- 同步优化 mysql-proxy 日志模块
This commit is contained in:
2026-03-21 00:53:27 +08:00
parent 11203f036f
commit f59ed9aae0
9 changed files with 260 additions and 213 deletions

View File

@@ -1,6 +1,6 @@
# 代理工具孵化记录
> 状态:孵化中 | 更新2026-03-19
> 状态:孵化中 | 更新2026-03-21
---
@@ -15,6 +15,33 @@
## 新增功能
### 2026-03-21
1. **ssh-proxy 断线重连** (P0)
- exec 方法检测到会话失效时自动清除并重连
- 修复 get_or_create_session TOCTOU 竞态条件
2. **日志智能分级** (P1)
- 慢请求告警: >3s WARN, >10s ERROR
- 退出码识别: exitCode 0/1/-1 视为正常,>=2 或 127 标记 ERROR
- UTF-8 安全截断
3. **服务器配置扩展** (P1)
- 新增 server1 (一号机)、server2 (二号机)、ai_sg (新加坡 AI 机器)
- 共 5 台服务器
4. **代码优化** (P2)
- 移除未使用的 async-trait 依赖
- 用 time crate 替换 40 行手写日期计算
- 清理未使用字段
### 2026-03-20
1. **ssh-proxy russh 迁移** (P0)
- 从 ssh2 迁移到 russh原生支持 ed25519 密钥
- 修复了 ed25519 认证失败问题
- 使用 async/await 原生 API
### 2026-03-19
1. **请求日志** (P0)
@@ -59,20 +86,51 @@
## AI 推荐使用方式
> **优先使用 CLI 模式**,命令简洁
```bash
# MySQL 查询 (AI 友好)
# MySQL 查询 (CLI 优先)
mysql-proxy cli -c flux_dev -e "SELECT VERSION()"
# MySQL JSON 输出
mysql-proxy cli -c flux_dev -e "SHOW TABLES" -F json
# MySQL 执行 DML
mysql-proxy cli -c flux_dev -e "UPDATE users SET status=1" -x
# SSH 执行 (CLI 优先)
ssh-proxy exec -n flux_dev -c "docker ps"
# SSH JSON 输出
ssh-proxy exec -n flux_dev -c "docker ps" -F json
# 列出服务器
ssh-proxy servers
mysql-proxy connections
```
**HTTP API (复杂场景)**
```bash
# MySQL
curl -X POST http://127.0.0.1:3307/query \
-H "Content-Type: application/json" \
-d '{"conn":"flux_dev","sql":"SELECT VERSION()"}'
# SSH 执行 (AI 友好)
# SSH
curl -X POST http://127.0.0.1:3308/exec \
-H "Content-Type: application/json" \
-d '{"server":"flux_dev","command":"docker ps"}'
```
**降级方案** (代理不可用)
```bash
mysql -h<host> -u<user> -p<password> -D<database> -e "<SQL>"
ssh <user>@<host> "<command>"
```
**优势**
- JSON 格式,易于解析
- CLI 命令简洁AI 友好
- JSON 格式输出,易于解析
- 会话复用,响应快
- 错误提示友好
@@ -82,12 +140,7 @@ curl -X POST http://127.0.0.1:3308/exec \
### ssh-proxy
1. **ssh2 不支持 ed25519 密钥**
- 错误: `[Session(-19)] Callback returned error`
- 解决: 使用 PEM 格式 RSA 密钥
```bash
ssh-keygen -t rsa -b 2048 -f ~/.ssh/id_rsa_pem -m PEM -N ""
```
- 暂无 (ed25519 已支持)
### mysql-proxy

View File

@@ -14,6 +14,7 @@ toml = "0.8"
anyhow = "1"
clap = { version = "4", features = ["derive"] }
ureq = "2" # CLI HTTP client (2.x has simpler API)
time = { version = "0.3", features = ["formatting", "macros"] }
[profile.release]
opt-level = "z"

View File

@@ -3,7 +3,6 @@ use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
/// 日志记录器
pub struct RequestLogger {
@@ -136,6 +135,12 @@ impl LogEntry {
pub fn with_duration(mut self, ms: u64) -> Self {
self.duration_ms = ms;
// 慢请求告警: >2s 为 WARN, >5s 为 ERROR
if ms > 5000 && self.level == "INFO" {
self.level = "ERROR".to_string();
} else if ms > 2000 && self.level == "INFO" {
self.level = "WARN".to_string();
}
self
}
@@ -157,62 +162,16 @@ impl LogEntry {
}
fn current_timestamp() -> String {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs();
let datetime = chrono_timestamp(secs);
format!("{}Z", datetime)
}
fn chrono_timestamp(secs: u64) -> String {
let days = secs / 86400;
let remaining = secs % 86400;
let hours = remaining / 3600;
let minutes = (remaining % 3600) / 60;
let seconds = remaining % 60;
// 从 1970-01-01 开始计算日期
let mut year = 1970;
let mut days_left = days;
loop {
let days_in_year = if is_leap_year(year) { 366 } else { 365 };
if days_left < days_in_year {
break;
}
days_left -= days_in_year;
year += 1;
}
let month_days = if is_leap_year(year) {
[31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
} else {
[31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
};
let mut month = 1;
for &days_in_month in &month_days {
if days_left < days_in_month {
break;
}
days_left -= days_in_month;
month += 1;
}
let day = days_left + 1;
format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}", year, month, day, hours, minutes, seconds)
}
fn is_leap_year(year: u64) -> bool {
(year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
time::OffsetDateTime::now_utc()
.format(&time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"))
.unwrap_or_default()
}
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
if s.chars().count() <= max_len {
s.to_string()
} else {
format!("{}... (truncated)", &s[..max_len])
format!("{}... (truncated)", s.chars().take(max_len).collect::<String>())
}
}

View File

@@ -5,7 +5,8 @@ edition = "2021"
description = "SSH HTTP proxy with session pooling"
[dependencies]
ssh2 = "0.9"
russh = { version = "0.50", features = ["legacy-ed25519-pkcs8-parser"] }
russh-keys = "0.50.0-beta.7"
tokio = { version = "1", features = ["full"] }
axum = "0.7"
serde = { version = "1", features = ["derive"] }
@@ -14,6 +15,7 @@ toml = "0.8"
anyhow = "1"
clap = { version = "4", features = ["derive"] }
ureq = "2" # CLI HTTP client (2.x has simpler API)
time = { version = "0.3", features = ["formatting", "macros"] }
[profile.release]
opt-level = "z"

View File

@@ -196,14 +196,8 @@ impl Cli {
anyhow::bail!("{}", err.error);
}
#[derive(Deserialize)]
struct AddResponse {
success: bool,
message: String,
}
let result: AddResponse = serde_json::from_str(&body)?;
println!("{}", result.message);
let result: serde_json::Value = serde_json::from_str(&body)?;
println!("{}", result["message"]);
Ok(())
}

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Instant;
use crate::session::SessionManager;
use crate::config::SshServerConfig;
use crate::logger::{LogEntry, RequestLogger};
pub type AppState = Arc<(Arc<SessionManager>, Arc<RequestLogger>)>;
@@ -75,24 +76,17 @@ pub async fn exec(
let start = Instant::now();
let (manager, logger) = &*state;
let manager_clone = manager.clone();
let server = req.server.clone();
let command = req.command.clone();
// 获取服务器列表用于错误提示
let server_names = list_server_names(manager);
let result = tokio::task::spawn_blocking(move || {
manager_clone.exec(&server, &command)
})
.await
.map_err(|e| {
error_response_with_usage(&e.to_string(), "Internal error occurred")
})?
.map_err(|e| {
let usage = format!(
"Usage: POST /exec {{\"server\": \"server_name\", \"command\": \"your command\"}}\n\nAvailable servers: {}",
list_server_names(&manager)
);
error_response_with_usage(&e.to_string(), &usage)
})?;
let result = manager.exec(&req.server, &req.command).await
.map_err(|e| {
let usage = format!(
"Usage: POST /exec {{\"server\": \"server_name\", \"command\": \"your command\"}}\n\nAvailable servers: {}",
server_names
);
error_response_with_usage(&e.to_string(), &usage)
})?;
let duration_ms = start.elapsed().as_millis() as u64;
@@ -123,7 +117,6 @@ pub async fn add_server(
Json(req): Json<AddServerRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
let (manager, logger) = &*state;
use crate::config::SshServerConfig;
// 验证必须有密码或私钥
if req.password.is_none() && req.private_key.is_none() {
@@ -142,7 +135,7 @@ pub async fn add_server(
private_key: req.private_key,
};
manager.add_server(cfg)
manager.add_server(cfg).await
.map_err(|e| error_response(&e.to_string()))?;
// 记录日志

View File

@@ -3,7 +3,6 @@ use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
/// 日志记录器
pub struct RequestLogger {
@@ -136,6 +135,13 @@ impl LogEntry {
pub fn with_duration(mut self, ms: u64) -> Self {
self.duration_ms = ms;
// 慢请求告警: >3s 为 WARN, >10s 为 ERROR
// 注意: 后台命令(nohup)可能耗时很长但 exitCode=0不标记错误
if ms > 10000 && self.level == "INFO" {
self.level = "ERROR".to_string();
} else if ms > 3000 && self.level == "INFO" {
self.level = "WARN".to_string();
}
self
}
@@ -146,6 +152,13 @@ impl LogEntry {
pub fn with_exit_code(mut self, code: i32) -> Self {
self.exit_code = Some(code);
// 只标记真正的错误 (>=2 或 127)
// exitCode: -1 (pkill未找到), 0 (成功), 1 (grep未匹配) 视为正常
if code >= 2 || code == 127 {
if self.level == "INFO" {
self.level = "ERROR".to_string();
}
}
self
}
@@ -157,62 +170,16 @@ impl LogEntry {
}
fn current_timestamp() -> String {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs();
let datetime = chrono_timestamp(secs);
format!("{}Z", datetime)
}
fn chrono_timestamp(secs: u64) -> String {
let days = secs / 86400;
let remaining = secs % 86400;
let hours = remaining / 3600;
let minutes = (remaining % 3600) / 60;
let seconds = remaining % 60;
// 从 1970-01-01 开始计算日期
let mut year = 1970;
let mut days_left = days;
loop {
let days_in_year = if is_leap_year(year) { 366 } else { 365 };
if days_left < days_in_year {
break;
}
days_left -= days_in_year;
year += 1;
}
let month_days = if is_leap_year(year) {
[31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
} else {
[31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
};
let mut month = 1;
for &days_in_month in &month_days {
if days_left < days_in_month {
break;
}
days_left -= days_in_month;
month += 1;
}
let day = days_left + 1;
format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}", year, month, day, hours, minutes, seconds)
}
fn is_leap_year(year: u64) -> bool {
(year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
time::OffsetDateTime::now_utc()
.format(&time::macros::format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"))
.unwrap_or_default()
}
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
if s.chars().count() <= max_len {
s.to_string()
} else {
format!("{}... (truncated)", &s[..max_len])
format!("{}... (truncated)", s.chars().take(max_len).collect::<String>())
}
}

View File

@@ -92,7 +92,7 @@ async fn run_server(config_path: &str, port: Option<u16>, host: Option<String>)
let mut interval = tokio::time::interval(std::time::Duration::from_secs(check_interval));
loop {
interval.tick().await;
manager_clone.cleanup_idle(idle_timeout);
manager_clone.cleanup_idle(idle_timeout).await;
}
});

View File

@@ -1,16 +1,16 @@
use anyhow::{Result, bail};
use ssh2::Session;
use std::collections::HashMap;
use std::net::TcpStream;
use std::sync::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::io::Read;
use std::path::PathBuf;
use std::path::Path;
use tokio::sync::Mutex;
use russh::client::{self, Config};
use russh::keys::{load_secret_key, PrivateKeyWithHashAlg, ssh_key};
use crate::config::SshServerConfig;
struct SessionState {
session: Session,
session: Arc<client::Handle<Client>>,
last_used: Instant,
}
@@ -19,6 +19,20 @@ pub struct SessionManager {
configs: Mutex<HashMap<String, SshServerConfig>>,
}
// 简单的 SSH 客户端处理器
struct Client;
impl client::Handler for Client {
type Error = russh::Error;
async fn check_server_key(
&mut self,
_server_public_key: &ssh_key::PublicKey,
) -> Result<bool, russh::Error> {
Ok(true)
}
}
impl SessionManager {
pub fn new(configs: &[SshServerConfig]) -> Result<Self> {
let mut config_map = HashMap::new();
@@ -33,83 +47,127 @@ impl SessionManager {
})
}
fn get_or_create_session(&self, name: &str) -> Result<Session> {
async fn get_or_create_session(&self, name: &str) -> Result<Arc<client::Handle<Client>>> {
// 先检查现有会话(快速路径)
{
let mut sessions = self.sessions.lock().unwrap();
let mut sessions = self.sessions.lock().await;
if let Some(state) = sessions.get_mut(name) {
if state.session.authenticated() {
state.last_used = Instant::now();
return Ok(state.session.clone());
} else {
println!("[Session] Session expired, reconnecting: {}", name);
sessions.remove(name);
}
state.last_used = Instant::now();
return Ok(state.session.clone());
}
}
let cfg = self.configs.lock().unwrap()
let cfg = self.configs.lock().await
.get(name)
.ok_or_else(|| anyhow::anyhow!("Server '{}' not found", name))?
.clone();
println!("[LazyInit] Connecting to: {}", name);
let session = self.create_session(&cfg)?;
let session = Arc::new(self.create_session(&cfg).await?);
{
let mut sessions = self.sessions.lock().unwrap();
sessions.insert(name.to_string(), SessionState {
session: session.clone(),
last_used: Instant::now(),
});
// 插入时检查是否已被其他线程创建(避免竞态重复)
let mut sessions = self.sessions.lock().await;
if let Some(state) = sessions.get_mut(name) {
return Ok(state.session.clone());
}
sessions.insert(name.to_string(), SessionState {
session: session.clone(),
last_used: Instant::now(),
});
println!("[LazyInit] Connected: {}", name);
Ok(session)
}
fn create_session(&self, cfg: &SshServerConfig) -> Result<Session> {
async fn create_session(&self, cfg: &SshServerConfig) -> Result<client::Handle<Client>> {
let addr = format!("{}:{}", cfg.host, cfg.port);
let tcp = TcpStream::connect(&addr)
let ssh_config = Arc::new(Config::default());
let mut session = client::connect(
ssh_config,
&addr,
Client,
).await
.map_err(|e| anyhow::anyhow!("Failed to connect to {}: {}", addr, e))?;
let mut session = Session::new()?;
session.set_tcp_stream(tcp);
session.handshake()?;
// 认证
if let Some(key_path) = cfg.get_private_key_path() {
let pubkey_path: PathBuf = key_path.with_extension("pub");
session.userauth_pubkey_file(&cfg.user, Some(&pubkey_path), &key_path, None)?;
let key_path_str = key_path.to_string_lossy();
let key_pair = load_secret_key(Path::new(&*key_path_str), None)
.map_err(|e| anyhow::anyhow!("Failed to load key {}: {}", key_path_str, e))?;
// 获取最佳 RSA hash如果使用 RSA 密钥)
let best_hash = session.best_supported_rsa_hash().await?;
let auth_result = session.authenticate_publickey(
&cfg.user,
PrivateKeyWithHashAlg::new(
Arc::new(key_pair),
best_hash.flatten(),
),
).await
.map_err(|e| anyhow::anyhow!("Public key auth failed: {}", e))?;
if !auth_result.success() {
bail!("Public key authentication failed");
}
} else if let Some(ref password) = cfg.password {
session.userauth_password(&cfg.user, password)?;
let auth_result = session.authenticate_password(&cfg.user, password).await
.map_err(|e| anyhow::anyhow!("Password auth failed: {}", e))?;
if !auth_result.success() {
bail!("Password authentication failed");
}
} else {
bail!("No authentication method configured");
}
if !session.authenticated() {
bail!("SSH authentication failed");
}
Ok(session)
}
pub fn exec(&self, name: &str, command: &str) -> Result<ExecResult> {
pub async fn exec(&self, name: &str, command: &str) -> Result<ExecResult> {
let start = Instant::now();
let session = self.get_or_create_session(name)?;
let session = self.get_or_create_session(name).await?;
let mut channel = session.channel_session()?;
channel.exec(command)?;
let mut channel = match session.channel_open_session().await {
Ok(ch) => ch,
Err(e) => {
// 会话可能已失效,清除并重连
println!("[Reconnect] Session stale for {}: {}, reconnecting...", name, e);
self.remove_session(name).await;
let session = self.get_or_create_session(name).await?;
session.channel_open_session().await
.map_err(|e| anyhow::anyhow!("Failed to open channel after reconnect: {}", e))?
}
};
channel.exec(true, command).await
.map_err(|e| anyhow::anyhow!("Failed to exec command: {}", e))?;
let mut stdout = String::new();
let mut stderr = String::new();
if let Err(e) = channel.read_to_string(&mut stdout) {
eprintln!("[SSH] stdout read error: {}", e);
}
if let Err(e) = channel.stderr().read_to_string(&mut stderr) {
eprintln!("[SSH] stderr read error: {}", e);
}
let mut exit_code: i32 = -1;
channel.wait_close()?;
let exit_code = channel.exit_status()?;
// 读取输出
while let Some(msg) = channel.wait().await {
match msg {
russh::ChannelMsg::Data { data } => {
stdout.push_str(&String::from_utf8_lossy(&data));
}
russh::ChannelMsg::ExtendedData { data, ext } => {
if ext == 1 { // stderr
stderr.push_str(&String::from_utf8_lossy(&data));
}
}
russh::ChannelMsg::ExitStatus { exit_status } => {
exit_code = exit_status as i32;
}
russh::ChannelMsg::Eof => {
// 继续等待 ExitStatus
}
_ => {}
}
}
Ok(ExecResult {
stdout,
@@ -120,47 +178,62 @@ impl SessionManager {
}
pub fn list_servers(&self) -> Vec<ServerInfo> {
let sessions = self.sessions.lock().unwrap();
let configs = self.configs.lock().unwrap();
configs.iter().map(|(name, cfg)| {
let status = if sessions.contains_key(name) { "connected" } else { "pending" };
ServerInfo {
name: name.clone(),
host: cfg.host.clone(),
port: cfg.port,
user: cfg.user.clone(),
status: status.to_string(),
// 使用 try_lock 避免阻塞,如果锁不可用则返回 pending
let sessions = self.sessions.try_lock();
let configs = self.configs.try_lock();
match (sessions, configs) {
(Ok(sessions), Ok(configs)) => {
configs.iter().map(|(name, cfg)| {
let status = if sessions.contains_key(name) { "connected" } else { "pending" };
ServerInfo {
name: name.clone(),
host: cfg.host.clone(),
port: cfg.port,
user: cfg.user.clone(),
status: status.to_string(),
}
}).collect()
}
}).collect()
_ => {
// 如果锁不可用,返回空列表
vec![]
}
}
}
/// 动态添加服务器 (临时,重启后消失)
pub fn add_server(&self, cfg: SshServerConfig) -> Result<()> {
pub async fn add_server(&self, cfg: SshServerConfig) -> Result<()> {
let name = cfg.name.clone();
let mut configs = self.configs.lock().unwrap();
if configs.contains_key(&name) {
anyhow::bail!("Server '{}' already exists", name);
{
let configs = self.configs.lock().await;
if configs.contains_key(&name) {
anyhow::bail!("Server '{}' already exists", name);
}
}
println!("[Dynamic] Adding: {} ({}:{})", name, cfg.host, cfg.port);
// 测试连接
let session = self.create_session(&cfg)?;
let session = self.create_session(&cfg).await?;
{
let mut sessions = self.sessions.lock().unwrap();
let mut sessions = self.sessions.lock().await;
sessions.insert(name.clone(), SessionState {
session,
session: Arc::new(session),
last_used: Instant::now(),
});
}
configs.insert(name.clone(), cfg);
{
let mut configs = self.configs.lock().await;
configs.insert(name.clone(), cfg);
}
println!("[Dynamic] ✓ Added: {}", name);
Ok(())
}
pub fn cleanup_idle(&self, timeout_secs: u64) {
let mut sessions = self.sessions.lock().unwrap();
pub async fn cleanup_idle(&self, timeout_secs: u64) {
let mut sessions = self.sessions.lock().await;
let now = Instant::now();
sessions.retain(|name, state| {
let elapsed = now.duration_since(state.last_used);
@@ -172,6 +245,11 @@ impl SessionManager {
}
});
}
async fn remove_session(&self, name: &str) {
let mut sessions = self.sessions.lock().await;
sessions.remove(name);
}
}
#[derive(Debug, Clone, serde::Serialize)]
@@ -189,4 +267,4 @@ pub struct ServerInfo {
pub port: u16,
pub user: String,
pub status: String,
}
}