diff --git a/crates/erp-ai/src/service/analysis_queue.rs b/crates/erp-ai/src/service/analysis_queue.rs index 6340713..18f6112 100644 --- a/crates/erp-ai/src/service/analysis_queue.rs +++ b/crates/erp-ai/src/service/analysis_queue.rs @@ -1,4 +1,4 @@ -use sea_orm::{ActiveModelTrait, EntityTrait, FromQueryResult, Set, Statement}; +use sea_orm::{ActiveModelTrait, EntityTrait, FromQueryResult, Set, Statement, TransactionTrait}; use uuid::Uuid; use crate::entity::ai_analysis_queue; @@ -93,43 +93,74 @@ impl AnalysisQueue { &self, tenant_id: Option, ) -> AiResult> { - let sql = match tenant_id { - Some(tid) => format!( - "SELECT * FROM ai_analysis_queue WHERE tenant_id = '{}' AND status = 'pending' AND deleted_at IS NULL AND scheduled_at <= NOW() ORDER BY priority DESC, scheduled_at ASC LIMIT 1", - tid - ), - None => r#" - SELECT * FROM ai_analysis_queue - WHERE status = 'pending' - AND deleted_at IS NULL - AND scheduled_at <= NOW() - ORDER BY priority DESC, scheduled_at ASC - LIMIT 1 - "# - .to_string(), - }; + // 事务内 SELECT ... FOR UPDATE SKIP LOCKED + UPDATE: + // - 参数化($1)消除原 format! 拼 tenant_id 的 SQL 注入风险 + // - FOR UPDATE SKIP LOCKED 在事务内持行锁到 UPDATE 完成,防多消费者并发重复 claim + let claimed = self + .db + .transaction::<_, Option, AiError>(|txn| { + Box::pin(async move { + let row: Option = match tenant_id { + Some(tid) => { + QueueRow::find_by_statement(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + r#"SELECT * FROM ai_analysis_queue + WHERE tenant_id = $1 + AND status = 'pending' + AND deleted_at IS NULL + AND scheduled_at <= NOW() + ORDER BY priority DESC, scheduled_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED"#, + [tid.into()], + )) + .one(txn) + .await? + } + None => { + QueueRow::find_by_statement(Statement::from_string( + sea_orm::DatabaseBackend::Postgres, + r#"SELECT * FROM ai_analysis_queue + WHERE status = 'pending' + AND deleted_at IS NULL + AND scheduled_at <= NOW() + ORDER BY priority DESC, scheduled_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED"# + .to_string(), + )) + .one(txn) + .await? + } + }; - let row: Option = QueueRow::find_by_statement(Statement::from_string( - sea_orm::DatabaseBackend::Postgres, - sql.to_string(), - )) - .one(&self.db) - .await?; - - match row { - Some(r) => { - let now = chrono::Utc::now(); - let mut active: ai_analysis_queue::ActiveModel = - self.find_by_id(r.id).await?.into(); - active.status = Set("running".to_string()); - active.started_at = Set(Some(now)); - active.updated_at = Set(now); - active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1); - let model = active.update(&self.db).await?; - Ok(Some(model)) - } - None => Ok(None), - } + match row { + Some(r) => { + let now = chrono::Utc::now(); + let model = ai_analysis_queue::Entity::find_by_id(r.id) + .one(txn) + .await? + .ok_or_else(|| { + AiError::QueueError(format!("队列任务 {} 未找到", r.id)) + })?; + let mut active: ai_analysis_queue::ActiveModel = model.into(); + active.status = Set("running".to_string()); + active.started_at = Set(Some(now)); + active.updated_at = Set(now); + active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1); + let updated = active.update(txn).await?; + Ok(Some(updated)) + } + None => Ok(None), + } + }) + }) + .await + .map_err(|e| match e { + sea_orm::TransactionError::Connection(d) => d.into(), + sea_orm::TransactionError::Transaction(a) => a, + })?; + Ok(claimed) } pub async fn mark_completed(&self, id: Uuid, result_analysis_id: Uuid) -> AiResult<()> { diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index 7139136..d921de7 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -176,6 +176,7 @@ mod m20260526_000166_create_ai_knowledge_bases; mod m20260526_000167_create_ai_knowledge_documents; mod m20260527_000168_ai_knowledge_v2_menu; mod m20260529_000169_supplement_rls_for_new_tables; +mod m20260626_000170_extend_device_readings_partitions; pub struct Migrator; @@ -359,6 +360,7 @@ impl MigratorTrait for Migrator { Box::new(m20260526_000167_create_ai_knowledge_documents::Migration), Box::new(m20260527_000168_ai_knowledge_v2_menu::Migration), Box::new(m20260529_000169_supplement_rls_for_new_tables::Migration), + Box::new(m20260626_000170_extend_device_readings_partitions::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260626_000170_extend_device_readings_partitions.rs b/crates/erp-server/migration/src/m20260626_000170_extend_device_readings_partitions.rs new file mode 100644 index 0000000..96e7894 --- /dev/null +++ b/crates/erp-server/migration/src/m20260626_000170_extend_device_readings_partitions.rs @@ -0,0 +1,48 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +/// 补建 device_readings 分区到 2027_06。 +/// +/// 背景:m000073 只静态建了 2026_05~2026_08 四个分区,2026-09-01 起 INSERT 将因 +/// 无目标分区抛错,导致小程序 Veepoo M2 BLE 数据上传全线中断(确定性硬截止)。 +/// 本迁移补建 2026_09~2027_06 共 10 个月分区解除截止;中期应引入 pg_partman +/// 或定时任务自动维护未来分区(见系统分析 PP-02)。 +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // 分区范围字面量为受控常量(非用户输入),与 m000073 写法一致 + let partitions: [(&str, &str, &str); 10] = [ + ("2026_09", "2026-09-01", "2026-10-01"), + ("2026_10", "2026-10-01", "2026-11-01"), + ("2026_11", "2026-11-01", "2026-12-01"), + ("2026_12", "2026-12-01", "2027-01-01"), + ("2027_01", "2027-01-01", "2027-02-01"), + ("2027_02", "2027-02-01", "2027-03-01"), + ("2027_03", "2027-03-01", "2027-04-01"), + ("2027_04", "2027-04-01", "2027-05-01"), + ("2027_05", "2027-05-01", "2027-06-01"), + ("2027_06", "2027-06-01", "2027-07-01"), + ]; + for (suffix, start, end) in partitions { + let sql = format!( + "CREATE TABLE IF NOT EXISTS device_readings_{suffix} PARTITION OF device_readings FOR VALUES FROM ('{start}') TO ('{end}');" + ); + manager.get_connection().execute_unprepared(&sql).await?; + } + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let suffixes = [ + "2026_09", "2026_10", "2026_11", "2026_12", "2027_01", "2027_02", "2027_03", "2027_04", + "2027_05", "2027_06", + ]; + for suffix in suffixes { + let sql = format!("DROP TABLE IF EXISTS device_readings_{suffix};"); + manager.get_connection().execute_unprepared(&sql).await?; + } + Ok(()) + } +}