fix(desktop): resolve 2 release-blocking P1 defects
P1-04: GenerationPipeline hardcoded model="default" causing classroom generation 404. Added model field to GenerationPipeline struct, passed from kernel config via with_driver(driver, model). Static scene generation now receives model parameter. P1-03: LLM API concurrent 500 DATABASE_ERROR. Added transient DB error retry (PoolTimedOut/Io) in create_relay_task with 200ms backoff. Recommend setting ZCLAW_DB_MIN_CONNECTIONS=10 for burst resilience.
This commit is contained in:
@@ -248,6 +248,7 @@ pub struct GenerationPipeline {
|
||||
scenes: Arc<RwLock<Vec<GeneratedScene>>>,
|
||||
agents_store: Arc<RwLock<Vec<AgentProfile>>>,
|
||||
driver: Option<Arc<dyn LlmDriver>>,
|
||||
model: String,
|
||||
}
|
||||
|
||||
impl GenerationPipeline {
|
||||
@@ -265,12 +266,14 @@ impl GenerationPipeline {
|
||||
scenes: Arc::new(RwLock::new(Vec::new())),
|
||||
agents_store: Arc::new(RwLock::new(Vec::new())),
|
||||
driver: None,
|
||||
model: "default".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_driver(driver: Arc<dyn LlmDriver>) -> Self {
|
||||
pub fn with_driver(driver: Arc<dyn LlmDriver>, model: String) -> Self {
|
||||
Self {
|
||||
driver: Some(driver),
|
||||
model,
|
||||
..Self::new()
|
||||
}
|
||||
}
|
||||
@@ -353,7 +356,7 @@ impl GenerationPipeline {
|
||||
let item = item.clone();
|
||||
async move {
|
||||
if let Some(d) = driver {
|
||||
Self::generate_scene_with_llm_static(d.as_ref(), &item, i).await
|
||||
Self::generate_scene_with_llm_static(d.as_ref(), &self.model, &item, i).await
|
||||
} else {
|
||||
Self::generate_scene_for_item_static(&item, i)
|
||||
}
|
||||
@@ -413,7 +416,7 @@ impl GenerationPipeline {
|
||||
request: &GenerationRequest,
|
||||
) -> Result<Vec<OutlineItem>> {
|
||||
let llm_request = CompletionRequest {
|
||||
model: "default".to_string(),
|
||||
model: self.model.clone(),
|
||||
system: Some(self.get_outline_system_prompt()),
|
||||
messages: vec![zclaw_types::Message::User {
|
||||
content: prompt.to_string(),
|
||||
@@ -469,6 +472,7 @@ Use Chinese if the topic is in Chinese. Include vivid metaphors and analogies."#
|
||||
|
||||
async fn generate_scene_with_llm_static(
|
||||
driver: &dyn LlmDriver,
|
||||
model: &str,
|
||||
item: &OutlineItem,
|
||||
order: usize,
|
||||
) -> Result<GeneratedScene> {
|
||||
@@ -488,7 +492,7 @@ Use Chinese if the topic is in Chinese. Include vivid metaphors and analogies."#
|
||||
);
|
||||
|
||||
let llm_request = CompletionRequest {
|
||||
model: "default".to_string(),
|
||||
model: model.to_string(),
|
||||
system: Some(Self::get_scene_system_prompt_static()),
|
||||
messages: vec![zclaw_types::Message::User {
|
||||
content: prompt,
|
||||
|
||||
@@ -33,6 +33,11 @@ fn is_retryable_error(e: &reqwest::Error) -> bool {
|
||||
|
||||
// ============ Relay Task Management ============
|
||||
|
||||
/// 判断 sqlx 错误是否为可重试的瞬态错误(连接池耗尽、临时网络故障)
|
||||
fn is_transient_db_error(e: &sqlx::Error) -> bool {
|
||||
matches!(e, sqlx::Error::PoolTimedOut | sqlx::Error::Io(_))
|
||||
}
|
||||
|
||||
pub async fn create_relay_task(
|
||||
db: &PgPool,
|
||||
account_id: &str,
|
||||
@@ -47,8 +52,21 @@ pub async fn create_relay_task(
|
||||
let request_hash = hash_request(request_body);
|
||||
let max_attempts = max_attempts.max(1).min(5);
|
||||
|
||||
// INSERT ... RETURNING 合并两次 DB 往返为一次
|
||||
let row: RelayTaskRow = sqlx::query_as(
|
||||
let query = sqlx::query_as::<_, RelayTaskRow>(
|
||||
"INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, request_body, status, priority, attempt_count, max_attempts, queued_at, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, 'queued', $7, 0, $8, $9, $9)
|
||||
RETURNING id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at"
|
||||
)
|
||||
.bind(&id).bind(account_id).bind(provider_id).bind(model_id)
|
||||
.bind(&request_hash).bind(request_body).bind(priority).bind(max_attempts as i64).bind(&now);
|
||||
|
||||
// 对瞬时 DB 错误(连接池耗尽/超时)重试一次
|
||||
let row = match query.fetch_one(db).await {
|
||||
Ok(row) => row,
|
||||
Err(e) if is_transient_db_error(&e) => {
|
||||
tracing::warn!("Transient DB error in create_relay_task, retrying: {}", e);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
sqlx::query_as::<_, RelayTaskRow>(
|
||||
"INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, request_body, status, priority, attempt_count, max_attempts, queued_at, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, 'queued', $7, 0, $8, $9, $9)
|
||||
RETURNING id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at"
|
||||
@@ -56,7 +74,10 @@ pub async fn create_relay_task(
|
||||
.bind(&id).bind(account_id).bind(provider_id).bind(model_id)
|
||||
.bind(&request_hash).bind(request_body).bind(priority).bind(max_attempts as i64).bind(&now)
|
||||
.fetch_one(db)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
Ok(RelayTaskInfo {
|
||||
id: row.id, account_id: row.account_id, provider_id: row.provider_id, model_id: row.model_id,
|
||||
|
||||
@@ -132,7 +132,7 @@ pub async fn classroom_generate(
|
||||
let pipeline = {
|
||||
let ks = kernel_state.lock().await;
|
||||
if let Some(kernel) = ks.as_ref() {
|
||||
GenerationPipeline::with_driver(kernel.driver())
|
||||
GenerationPipeline::with_driver(kernel.driver(), kernel.config().model().to_string())
|
||||
} else {
|
||||
GenerationPipeline::new()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user