From 76cdfd0c0082adde63b267f8d8008b9326de998d Mon Sep 17 00:00:00 2001 From: iven Date: Wed, 15 Apr 2026 01:40:27 +0800 Subject: [PATCH] =?UTF-8?q?fix(saas):=20SSE=20=E7=94=A8=E9=87=8F=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E4=B8=80=E8=87=B4=E6=80=A7=E4=BF=AE=E5=A4=8D=20?= =?UTF-8?q?=E2=80=94=20=E5=9B=9E=E5=86=99=20usage=5Frecords=20+=20?= =?UTF-8?q?=E6=B6=88=E9=99=A4=20relay=5Frequests=20=E5=8F=8C=E9=87=8D?= =?UTF-8?q?=E8=AE=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - service.rs: SSE 流结束后回写 usage_records 真实 token (status=success) - service.rs: spawned task 中调用 increment_usage 统一递增 tokens + relay_requests - handlers.rs: 移除 SSE 路径的 increment_dimension("relay_requests") 消除双重计数 - 从 request_body 提取 model_id 用于 usage_records 精准归因 --- crates/zclaw-saas/src/relay/handlers.rs | 8 +------- crates/zclaw-saas/src/relay/service.rs | 21 +++++++++++++++++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/crates/zclaw-saas/src/relay/handlers.rs b/crates/zclaw-saas/src/relay/handlers.rs index d949bc4..a5e9182 100644 --- a/crates/zclaw-saas/src/relay/handlers.rs +++ b/crates/zclaw-saas/src/relay/handlers.rs @@ -333,14 +333,8 @@ pub async fn chat_completions( } } - // SSE: relay_requests 实时递增(tokens 由 AggregateUsageWorker 对账修正) - if let Err(e) = crate::billing::service::increment_dimension( - &state.db, &account_id_usage, "relay_requests", - ).await { - tracing::warn!("Failed to increment billing relay_requests for {}: {}", account_id_usage, e); - } - // SSE 流已返回,递减队列计数器(流式任务开始处理) + // 注意: relay_requests 和 tokens 统一由 execute_relay spawned task 中的 increment_usage 递增 state.cache.relay_dequeue(&account_id_usage); let response = axum::response::Response::builder() diff --git a/crates/zclaw-saas/src/relay/service.rs b/crates/zclaw-saas/src/relay/service.rs index d1b476e..03d9560 100644 --- a/crates/zclaw-saas/src/relay/service.rs +++ b/crates/zclaw-saas/src/relay/service.rs @@ -333,6 +333,12 @@ pub async fn execute_relay( let task_id_clone = task_id.to_string(); let key_id_for_spawn = key_id.clone(); let account_id_clone = account_id.to_string(); + let provider_id_clone = provider_id.to_string(); + // 从 request_body 提取 model_id 用于 usage_records 归因 + let model_id_clone = serde_json::from_str::(request_body) + .ok() + .and_then(|v| v.get("model").and_then(|m| m.as_str()).map(String::from)) + .unwrap_or_default(); // Bounded channel for backpressure: 128 chunks (~128KB) buffer. // If the client reads slowly, the upstream is signaled via @@ -434,16 +440,23 @@ pub async fn execute_relay( let input_opt = if input > 0 { Some(input) } else { None }; let output_opt = if output > 0 { Some(output) } else { None }; - // Record task status + billing usage + key usage + // Record task status + billing usage + key usage + usage_records let db_op = async { if let Err(e) = update_task_status(&db_clone, &task_id_clone, "completed", input_opt, output_opt, None).await { tracing::warn!("Failed to update task status after SSE stream: {}", e); } - // P2-9 修复: SSE 路径也更新 billing_usage_quotas + // SSE 路径回写 usage_records + billing 配额 if input > 0 || output > 0 { + // 回写 usage_records 真实 token(补全 handlers.rs 中 token=0 的占位记录) + if let Err(e) = crate::model_config::service::record_usage( + &db_clone, &account_id_clone, &provider_id_clone, &model_id_clone, + input, output, None, "success", None, + ).await { + tracing::warn!("Failed to record SSE usage for task {}: {}", task_id_clone, e); + } + // 更新 billing_usage_quotas(tokens + relay_requests 同步递增) if let Err(e) = crate::billing::service::increment_usage( - &db_clone, &account_id_clone, - input, output, + &db_clone, &account_id_clone, input, output, ).await { tracing::warn!("Failed to increment billing usage for SSE task {}: {}", task_id_clone, e); }