fix(saas-relay): eliminate DATABASE_ERROR by removing DB queries from critical path
Some checks failed:
CI / Lint & TypeCheck (push) — Has been cancelled
CI / Unit Tests (push) — Has been cancelled
CI / Build Frontend (push) — Has been cancelled
CI / Rust Check (push) — Has been cancelled
CI / Security Scan (push) — Has been cancelled
CI / E2E Tests (push) — Has been cancelled
Root cause: each relay request executes 13-17 serial DB queries, exhausting the 50-connection pool under concurrency. When pool is exhausted, sqlx returns PoolTimedOut which maps to 500 DATABASE_ERROR. Fixes: 1. log_operation → dispatch_log_operation (async Worker dispatch, non-blocking) 2. record_usage → tokio::spawn (3 DB queries moved off critical path) 3. DB pool: max_connections 50→100 (env-configurable), acquire_timeout 5s→8s Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -8,10 +8,22 @@ const SCHEMA_VERSION: i32 = 11;
 /// 初始化数据库
 pub async fn init_db(database_url: &str) -> SaasResult<PgPool> {
+    // 连接池大小可通过环境变量配置,默认 100(relay 请求每次 10+ 串行查询,50 偏紧)
+    let max_connections: u32 = std::env::var("ZCLAW_DB_MAX_CONNECTIONS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(100);
+    let min_connections: u32 = std::env::var("ZCLAW_DB_MIN_CONNECTIONS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(5);
+
+    tracing::info!("Database pool: max={}, min={}", max_connections, min_connections);
+
     let pool = PgPoolOptions::new()
-        .max_connections(50)
-        .min_connections(3)
-        .acquire_timeout(std::time::Duration::from_secs(5))
+        .max_connections(max_connections)
+        .min_connections(min_connections)
+        .acquire_timeout(std::time::Duration::from_secs(8))
         .idle_timeout(std::time::Duration::from_secs(180))
         .max_lifetime(std::time::Duration::from_secs(900))
         .connect(database_url)
@@ -9,7 +9,7 @@ use axum::{
 use crate::state::AppState;
 use crate::error::{SaasError, SaasResult};
 use crate::auth::types::AuthContext;
-use crate::auth::handlers::{log_operation, check_permission};
+use crate::auth::handlers::check_permission;
 use crate::model_config::service as model_service;
 use super::{types::*, service};
 
@@ -171,8 +171,11 @@ pub async fn chat_completions(
         max_attempts,
     ).await?;
 
-    log_operation(&state.db, &ctx.account_id, "relay.request", "relay_task", &task.id,
-        Some(serde_json::json!({"model": model_name, "stream": stream})), ctx.client_ip.as_deref()).await?;
+    // 异步派发操作日志(非阻塞,不占用关键路径 DB 连接)
+    state.dispatch_log_operation(
+        &ctx.account_id, "relay.request", "relay_task", &task.id,
+        Some(serde_json::json!({"model": model_name, "stream": stream})), ctx.client_ip.as_deref(),
+    ).await;
 
     // 执行中转 (Key Pool 自动选择 + 429 轮转)
     let response = service::execute_relay(
@@ -183,25 +186,39 @@ pub async fn chat_completions(
         &enc_key,
     ).await;
 
+    // 克隆用于异步 usage 记录
+    let db_usage = state.db.clone();
+    let account_id_usage = ctx.account_id.clone();
+    let provider_id_usage = target_model.provider_id.clone();
+    let model_id_usage = target_model.model_id.clone();
+
     match response {
         Ok(service::RelayResponse::Json(body)) => {
             let (input_tokens, output_tokens) = service::extract_token_usage_from_json(&body);
-            model_service::record_usage(
-                &state.db, &ctx.account_id, &target_model.provider_id,
-                &target_model.model_id, input_tokens, output_tokens,
-                None, "success", None,
-            ).await?;
+            // 异步记录 usage(不阻塞响应)
+            tokio::spawn(async move {
+                if let Err(e) = model_service::record_usage(
+                    &db_usage, &account_id_usage, &provider_id_usage,
+                    &model_id_usage, input_tokens, output_tokens,
+                    None, "success", None,
+                ).await {
+                    tracing::warn!("Failed to record relay usage: {}", e);
+                }
+            });
+
             Ok((StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "application/json")], body).into_response())
         }
         Ok(service::RelayResponse::Sse(body)) => {
-            // SSE 流的 usage 统计在 service 层异步处理
-            // 这里先记录一个占位记录,实际值会在流结束后更新
-            model_service::record_usage(
-                &state.db, &ctx.account_id, &target_model.provider_id,
-                &target_model.model_id, 0, 0,
-                None, "streaming", None,
-            ).await?;
+            // 异步记录 SSE 占位 usage
+            tokio::spawn(async move {
+                if let Err(e) = model_service::record_usage(
+                    &db_usage, &account_id_usage, &provider_id_usage,
+                    &model_id_usage, 0, 0,
+                    None, "streaming", None,
+                ).await {
+                    tracing::warn!("Failed to record SSE usage placeholder: {}", e);
+                }
+            });
+
             let response = axum::response::Response::builder()
                 .status(StatusCode::OK)
@@ -213,11 +230,17 @@ pub async fn chat_completions(
             Ok(response)
         }
         Err(e) => {
-            model_service::record_usage(
-                &state.db, &ctx.account_id, &target_model.provider_id,
-                &target_model.model_id, 0, 0,
-                None, "failed", Some(&e.to_string()),
-            ).await?;
+            // 异步记录失败 usage(不阻塞错误响应)
+            let error_msg = e.to_string();
+            tokio::spawn(async move {
+                if let Err(e2) = model_service::record_usage(
+                    &db_usage, &account_id_usage, &provider_id_usage,
+                    &model_id_usage, 0, 0,
+                    None, "failed", Some(&error_msg),
+                ).await {
+                    tracing::warn!("Failed to record relay failure usage: {}", e2);
+                }
+            });
             Err(e)
         }
     }
@@ -347,8 +370,11 @@ pub async fn retry_task(
         }
     });
 
-    log_operation(&state.db, &ctx.account_id, "relay.retry", "relay_task", &id,
-        None, ctx.client_ip.as_deref()).await?;
+    // 异步派发操作日志
+    state.dispatch_log_operation(
+        &ctx.account_id, "relay.retry", "relay_task", &id,
+        None, ctx.client_ip.as_deref(),
+    ).await;
 
     Ok(Json(serde_json::json!({"ok": true, "task_id": id})))
 }
@@ -407,9 +433,12 @@ pub async fn add_provider_key(
         req.priority, req.max_rpm, req.max_tpm,
     ).await?;
 
-    log_operation(&state.db, &ctx.account_id, "provider_key.add", "provider_key", &key_id,
+    // 异步派发操作日志
+    state.dispatch_log_operation(
+        &ctx.account_id, "provider_key.add", "provider_key", &key_id,
         Some(serde_json::json!({"provider_id": provider_id, "label": req.key_label})),
-        ctx.client_ip.as_deref()).await?;
+        ctx.client_ip.as_deref(),
+    ).await;
 
     Ok(Json(serde_json::json!({"ok": true, "key_id": key_id})))
 }
@@ -430,9 +459,12 @@ pub async fn toggle_provider_key(
 
     super::key_pool::toggle_key_active(&state.db, &key_id, req.active).await?;
 
-    log_operation(&state.db, &ctx.account_id, "provider_key.toggle", "provider_key", &key_id,
+    // 异步派发操作日志
+    state.dispatch_log_operation(
+        &ctx.account_id, "provider_key.toggle", "provider_key", &key_id,
         Some(serde_json::json!({"provider_id": provider_id, "active": req.active})),
-        ctx.client_ip.as_deref()).await?;
+        ctx.client_ip.as_deref(),
+    ).await;
 
     Ok(Json(serde_json::json!({"ok": true})))
 }
@@ -447,9 +479,12 @@ pub async fn delete_provider_key(
 
     super::key_pool::delete_provider_key(&state.db, &key_id).await?;
 
-    log_operation(&state.db, &ctx.account_id, "provider_key.delete", "provider_key", &key_id,
+    // 异步派发操作日志
+    state.dispatch_log_operation(
+        &ctx.account_id, "provider_key.delete", "provider_key", &key_id,
         Some(serde_json::json!({"provider_id": provider_id})),
-        ctx.client_ip.as_deref()).await?;
+        ctx.client_ip.as_deref(),
+    ).await;
 
     Ok(Json(serde_json::json!({"ok": true})))
 }
Reference in New Issue
Block a user