From 90855dc83e611f4bc425fd392d0c4a9640c7cb9a Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 5 Apr 2026 19:18:41 +0800 Subject: [PATCH] fix(desktop): resolve 2 release-blocking P1 defects P1-04: GenerationPipeline hardcoded model="default" causing classroom generation 404. Added model field to GenerationPipeline struct, passed from kernel config via with_driver(driver, model). Static scene generation now receives model parameter. P1-03: LLM API concurrent 500 DATABASE_ERROR. Added transient DB error retry (PoolTimedOut/Io) in create_relay_task with 200ms backoff. Recommend setting ZCLAW_DB_MIN_CONNECTIONS=10 for burst resilience. --- crates/zclaw-kernel/src/generation/mod.rs | 12 ++++--- crates/zclaw-saas/src/relay/service.rs | 31 ++++++++++++++++--- .../src/classroom_commands/generate.rs | 2 +- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/crates/zclaw-kernel/src/generation/mod.rs b/crates/zclaw-kernel/src/generation/mod.rs index a4aaf68..8722388 100644 --- a/crates/zclaw-kernel/src/generation/mod.rs +++ b/crates/zclaw-kernel/src/generation/mod.rs @@ -248,6 +248,7 @@ pub struct GenerationPipeline { scenes: Arc>>, agents_store: Arc>>, driver: Option>, + model: String, } impl GenerationPipeline { @@ -265,12 +266,14 @@ impl GenerationPipeline { scenes: Arc::new(RwLock::new(Vec::new())), agents_store: Arc::new(RwLock::new(Vec::new())), driver: None, + model: "default".to_string(), } } - pub fn with_driver(driver: Arc) -> Self { + pub fn with_driver(driver: Arc, model: String) -> Self { Self { driver: Some(driver), + model, ..Self::new() } } @@ -353,7 +356,7 @@ impl GenerationPipeline { let item = item.clone(); async move { if let Some(d) = driver { - Self::generate_scene_with_llm_static(d.as_ref(), &item, i).await + Self::generate_scene_with_llm_static(d.as_ref(), &self.model, &item, i).await } else { Self::generate_scene_for_item_static(&item, i) } @@ -413,7 +416,7 @@ impl GenerationPipeline { request: &GenerationRequest, ) -> Result> { let llm_request = CompletionRequest { - model: "default".to_string(), + model: self.model.clone(), system: Some(self.get_outline_system_prompt()), messages: vec![zclaw_types::Message::User { content: prompt.to_string(), @@ -469,6 +472,7 @@ Use Chinese if the topic is in Chinese. Include vivid metaphors and analogies."# async fn generate_scene_with_llm_static( driver: &dyn LlmDriver, + model: &str, item: &OutlineItem, order: usize, ) -> Result { @@ -488,7 +492,7 @@ Use Chinese if the topic is in Chinese. Include vivid metaphors and analogies."# ); let llm_request = CompletionRequest { - model: "default".to_string(), + model: model.to_string(), system: Some(Self::get_scene_system_prompt_static()), messages: vec![zclaw_types::Message::User { content: prompt, diff --git a/crates/zclaw-saas/src/relay/service.rs b/crates/zclaw-saas/src/relay/service.rs index 4cdb2c3..9e16371 100644 --- a/crates/zclaw-saas/src/relay/service.rs +++ b/crates/zclaw-saas/src/relay/service.rs @@ -33,6 +33,11 @@ fn is_retryable_error(e: &reqwest::Error) -> bool { // ============ Relay Task Management ============ +/// 判断 sqlx 错误是否为可重试的瞬态错误(连接池耗尽、临时网络故障) +fn is_transient_db_error(e: &sqlx::Error) -> bool { + matches!(e, sqlx::Error::PoolTimedOut | sqlx::Error::Io(_)) +} + pub async fn create_relay_task( db: &PgPool, account_id: &str, @@ -47,16 +52,32 @@ pub async fn create_relay_task( let request_hash = hash_request(request_body); let max_attempts = max_attempts.max(1).min(5); - // INSERT ... RETURNING 合并两次 DB 往返为一次 - let row: RelayTaskRow = sqlx::query_as( + let query = sqlx::query_as::<_, RelayTaskRow>( "INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, request_body, status, priority, attempt_count, max_attempts, queued_at, created_at) VALUES ($1, $2, $3, $4, $5, $6, 'queued', $7, 0, $8, $9, $9) RETURNING id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at" ) .bind(&id).bind(account_id).bind(provider_id).bind(model_id) - .bind(&request_hash).bind(request_body).bind(priority).bind(max_attempts as i64).bind(&now) - .fetch_one(db) - .await?; + .bind(&request_hash).bind(request_body).bind(priority).bind(max_attempts as i64).bind(&now); + + // 对瞬时 DB 错误(连接池耗尽/超时)重试一次 + let row = match query.fetch_one(db).await { + Ok(row) => row, + Err(e) if is_transient_db_error(&e) => { + tracing::warn!("Transient DB error in create_relay_task, retrying: {}", e); + tokio::time::sleep(Duration::from_millis(200)).await; + sqlx::query_as::<_, RelayTaskRow>( + "INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, request_body, status, priority, attempt_count, max_attempts, queued_at, created_at) + VALUES ($1, $2, $3, $4, $5, $6, 'queued', $7, 0, $8, $9, $9) + RETURNING id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at" + ) + .bind(&id).bind(account_id).bind(provider_id).bind(model_id) + .bind(&request_hash).bind(request_body).bind(priority).bind(max_attempts as i64).bind(&now) + .fetch_one(db) + .await? + } + Err(e) => return Err(e.into()), + }; Ok(RelayTaskInfo { id: row.id, account_id: row.account_id, provider_id: row.provider_id, model_id: row.model_id, diff --git a/desktop/src-tauri/src/classroom_commands/generate.rs b/desktop/src-tauri/src/classroom_commands/generate.rs index 1de0f61..da55475 100644 --- a/desktop/src-tauri/src/classroom_commands/generate.rs +++ b/desktop/src-tauri/src/classroom_commands/generate.rs @@ -132,7 +132,7 @@ pub async fn classroom_generate( let pipeline = { let ks = kernel_state.lock().await; if let Some(kernel) = ks.as_ref() { - GenerationPipeline::with_driver(kernel.driver()) + GenerationPipeline::with_driver(kernel.driver(), kernel.config().model().to_string()) } else { GenerationPipeline::new() }