diff --git a/crates/zclaw-saas/src/relay/key_pool.rs b/crates/zclaw-saas/src/relay/key_pool.rs index be40cca..d4eb6c5 100644 --- a/crates/zclaw-saas/src/relay/key_pool.rs +++ b/crates/zclaw-saas/src/relay/key_pool.rs @@ -137,6 +137,7 @@ pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32]) } /// 记录 Key 使用量(滑动窗口) +/// 合并为 2 次查询:1 次 upsert 滑动窗口 + 1 次更新 provider_keys 累计统计(含 last_used_at) pub async fn record_key_usage( db: &PgPool, key_id: &str, @@ -144,6 +145,7 @@ pub async fn record_key_usage( ) -> SaasResult<()> { let current_minute = chrono::Utc::now().format("%Y-%m-%dT%H:%M").to_string(); + // 1. Upsert sliding window sqlx::query( "INSERT INTO key_usage_window (key_id, window_minute, request_count, token_count) VALUES ($1, $2, 1, $3) @@ -154,20 +156,18 @@ pub async fn record_key_usage( .bind(key_id).bind(&current_minute).bind(tokens.unwrap_or(0)) .execute(db).await?; - // 更新 Key 的累计统计 + // 2. Update cumulative stats + last_used_at in one query sqlx::query( - "UPDATE provider_keys SET total_requests = total_requests + 1, total_tokens = total_tokens + COALESCE($1, 0), updated_at = $2 - WHERE id = $3" + "UPDATE provider_keys + SET total_requests = total_requests + 1, + total_tokens = total_tokens + COALESCE($1, 0), + last_used_at = NOW(), + updated_at = NOW() + WHERE id = $2" ) - .bind(tokens).bind(&chrono::Utc::now().to_rfc3339()).bind(key_id) + .bind(tokens).bind(key_id) .execute(db).await?; - // 更新最后使用时间 (LRU 排序依据) - sqlx::query("UPDATE provider_keys SET last_used_at = NOW() WHERE id = $1") - .bind(key_id) - .execute(db) - .await?; - Ok(()) } diff --git a/crates/zclaw-saas/src/relay/service.rs b/crates/zclaw-saas/src/relay/service.rs index 6c68cd4..399e705 100644 --- a/crates/zclaw-saas/src/relay/service.rs +++ b/crates/zclaw-saas/src/relay/service.rs @@ -314,20 +314,26 @@ pub async fn execute_relay( tokio::spawn(async move { let _permit = permit; // 持有 permit 直到任务完成 - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // Brief 
delay to allow SSE stream to settle before recording + tokio::time::sleep(std::time::Duration::from_millis(500)).await; let capture = usage_capture.lock().await; let (input, output) = ( if capture.input_tokens > 0 { Some(capture.input_tokens) } else { None }, if capture.output_tokens > 0 { Some(capture.output_tokens) } else { None }, ); - // 记录任务状态 - if let Err(e) = update_task_status(&db_clone, &task_id_clone, "completed", input, output, None).await { - tracing::warn!("Failed to update task status after SSE stream: {}", e); - } - // 记录 Key 使用量 - let total_tokens = input.unwrap_or(0) + output.unwrap_or(0); - if let Err(e) = super::key_pool::record_key_usage(&db_clone, &key_id_for_spawn, Some(total_tokens)).await { - tracing::warn!("Failed to record key usage: {}", e); + // Record task status with timeout to avoid holding DB connections + let db_op = async { + if let Err(e) = update_task_status(&db_clone, &task_id_clone, "completed", input, output, None).await { + tracing::warn!("Failed to update task status after SSE stream: {}", e); + } + // Record key usage (now 2 queries instead of 3) + let total_tokens = input.unwrap_or(0) + output.unwrap_or(0); + if let Err(e) = super::key_pool::record_key_usage(&db_clone, &key_id_for_spawn, Some(total_tokens)).await { + tracing::warn!("Failed to record key usage: {}", e); + } + }; + if tokio::time::timeout(std::time::Duration::from_secs(5), db_op).await.is_err() { + tracing::warn!("SSE usage recording timed out for task {}", task_id_clone); } });