diff --git a/crates/zclaw-saas/src/db.rs b/crates/zclaw-saas/src/db.rs index 73f2ea9..f070a6e 100644 --- a/crates/zclaw-saas/src/db.rs +++ b/crates/zclaw-saas/src/db.rs @@ -8,10 +8,22 @@ const SCHEMA_VERSION: i32 = 11; /// 初始化数据库 pub async fn init_db(database_url: &str) -> SaasResult { + // 连接池大小可通过环境变量配置,默认 100(relay 请求每次 10+ 串行查询,50 偏紧) + let max_connections: u32 = std::env::var("ZCLAW_DB_MAX_CONNECTIONS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(100); + let min_connections: u32 = std::env::var("ZCLAW_DB_MIN_CONNECTIONS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5); + + tracing::info!("Database pool: max={}, min={}", max_connections, min_connections); + let pool = PgPoolOptions::new() - .max_connections(50) - .min_connections(3) - .acquire_timeout(std::time::Duration::from_secs(5)) + .max_connections(max_connections) + .min_connections(min_connections) + .acquire_timeout(std::time::Duration::from_secs(8)) .idle_timeout(std::time::Duration::from_secs(180)) .max_lifetime(std::time::Duration::from_secs(900)) .connect(database_url) diff --git a/crates/zclaw-saas/src/relay/handlers.rs b/crates/zclaw-saas/src/relay/handlers.rs index 8075c58..79d6f91 100644 --- a/crates/zclaw-saas/src/relay/handlers.rs +++ b/crates/zclaw-saas/src/relay/handlers.rs @@ -9,7 +9,7 @@ use axum::{ use crate::state::AppState; use crate::error::{SaasError, SaasResult}; use crate::auth::types::AuthContext; -use crate::auth::handlers::{log_operation, check_permission}; +use crate::auth::handlers::check_permission; use crate::model_config::service as model_service; use super::{types::*, service}; @@ -171,8 +171,11 @@ pub async fn chat_completions( max_attempts, ).await?; - log_operation(&state.db, &ctx.account_id, "relay.request", "relay_task", &task.id, - Some(serde_json::json!({"model": model_name, "stream": stream})), ctx.client_ip.as_deref()).await?; + // 异步派发操作日志(非阻塞,不占用关键路径 DB 连接) + state.dispatch_log_operation( + &ctx.account_id, "relay.request", "relay_task", &task.id, + Some(serde_json::json!({"model": model_name, "stream": stream})), ctx.client_ip.as_deref(), + ).await; // 执行中转 (Key Pool 自动选择 + 429 轮转) let response = service::execute_relay( @@ -183,25 +186,39 @@ pub async fn chat_completions( &enc_key, ).await; + // 克隆用于异步 usage 记录 + let db_usage = state.db.clone(); + let account_id_usage = ctx.account_id.clone(); + let provider_id_usage = target_model.provider_id.clone(); + let model_id_usage = target_model.model_id.clone(); + match response { Ok(service::RelayResponse::Json(body)) => { let (input_tokens, output_tokens) = service::extract_token_usage_from_json(&body); - model_service::record_usage( - &state.db, &ctx.account_id, &target_model.provider_id, - &target_model.model_id, input_tokens, output_tokens, - None, "success", None, - ).await?; + // 异步记录 usage(不阻塞响应) + tokio::spawn(async move { + if let Err(e) = model_service::record_usage( + &db_usage, &account_id_usage, &provider_id_usage, + &model_id_usage, input_tokens, output_tokens, + None, "success", None, + ).await { + tracing::warn!("Failed to record relay usage: {}", e); + } + }); Ok((StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "application/json")], body).into_response()) } Ok(service::RelayResponse::Sse(body)) => { - // SSE 流的 usage 统计在 service 层异步处理 - // 这里先记录一个占位记录,实际值会在流结束后更新 - model_service::record_usage( - &state.db, &ctx.account_id, &target_model.provider_id, - &target_model.model_id, 0, 0, - None, "streaming", None, - ).await?; + // 异步记录 SSE 占位 usage + tokio::spawn(async move { + if let Err(e) = model_service::record_usage( + &db_usage, &account_id_usage, &provider_id_usage, + &model_id_usage, 0, 0, + None, "streaming", None, + ).await { + tracing::warn!("Failed to record SSE usage placeholder: {}", e); + } + }); let response = axum::response::Response::builder() .status(StatusCode::OK) @@ -213,11 +230,17 @@ pub async fn chat_completions( Ok(response) } Err(e) => { - model_service::record_usage( - &state.db, &ctx.account_id, &target_model.provider_id, - &target_model.model_id, 0, 0, - None, "failed", Some(&e.to_string()), - ).await?; + // 异步记录失败 usage(不阻塞错误响应) + let error_msg = e.to_string(); + tokio::spawn(async move { + if let Err(e2) = model_service::record_usage( + &db_usage, &account_id_usage, &provider_id_usage, + &model_id_usage, 0, 0, + None, "failed", Some(&error_msg), + ).await { + tracing::warn!("Failed to record relay failure usage: {}", e2); + } + }); Err(e) } } @@ -347,8 +370,11 @@ pub async fn retry_task( } }); - log_operation(&state.db, &ctx.account_id, "relay.retry", "relay_task", &id, - None, ctx.client_ip.as_deref()).await?; + // 异步派发操作日志 + state.dispatch_log_operation( + &ctx.account_id, "relay.retry", "relay_task", &id, + None, ctx.client_ip.as_deref(), + ).await; Ok(Json(serde_json::json!({"ok": true, "task_id": id}))) } @@ -407,9 +433,12 @@ pub async fn add_provider_key( req.priority, req.max_rpm, req.max_tpm, ).await?; - log_operation(&state.db, &ctx.account_id, "provider_key.add", "provider_key", &key_id, + // 异步派发操作日志 + state.dispatch_log_operation( + &ctx.account_id, "provider_key.add", "provider_key", &key_id, Some(serde_json::json!({"provider_id": provider_id, "label": req.key_label})), - ctx.client_ip.as_deref()).await?; + ctx.client_ip.as_deref(), + ).await; Ok(Json(serde_json::json!({"ok": true, "key_id": key_id}))) } @@ -430,9 +459,12 @@ pub async fn toggle_provider_key( super::key_pool::toggle_key_active(&state.db, &key_id, req.active).await?; - log_operation(&state.db, &ctx.account_id, "provider_key.toggle", "provider_key", &key_id, + // 异步派发操作日志 + state.dispatch_log_operation( + &ctx.account_id, "provider_key.toggle", "provider_key", &key_id, Some(serde_json::json!({"provider_id": provider_id, "active": req.active})), - ctx.client_ip.as_deref()).await?; + ctx.client_ip.as_deref(), + ).await; Ok(Json(serde_json::json!({"ok": true}))) } @@ -447,9 +479,12 @@ pub async fn delete_provider_key( super::key_pool::delete_provider_key(&state.db, &key_id).await?; - log_operation(&state.db, &ctx.account_id, "provider_key.delete", "provider_key", &key_id, + // 异步派发操作日志 + state.dispatch_log_operation( + &ctx.account_id, "provider_key.delete", "provider_key", &key_id, Some(serde_json::json!({"provider_id": provider_id})), - ctx.client_ip.as_deref()).await?; + ctx.client_ip.as_deref(), + ).await; Ok(Json(serde_json::json!({"ok": true}))) }