fix(server): 修复 device_readings 分区硬截止 + AI 队列 claim_next SQL 注入
PP-02: m000073 只静态建了 2026_05~2026_08 分区,2026-09-01 起 INSERT 将抛错导致小程序 BLE 数据上传全线中断。新增 m20260626_000170 补建 2026_09~2027_06 共 10 个月分区,解除确定性硬截止。 PP-05a: AnalysisQueue::claim_next 用 format! 拼 tenant_id(SQL 注入) 且 SELECT+UPDATE 不在事务内、无 FOR UPDATE SKIP LOCKED。改为参数化 \$1 + 事务内 FOR UPDATE SKIP LOCKED 原子 claim,防注入并防并发重复领取。 PP-01(死信接线)耦合 feat 分支进行中的 cron_heartbeat 工作,另行提交。
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
use sea_orm::{ActiveModelTrait, EntityTrait, FromQueryResult, Set, Statement};
|
use sea_orm::{ActiveModelTrait, EntityTrait, FromQueryResult, Set, Statement, TransactionTrait};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::entity::ai_analysis_queue;
|
use crate::entity::ai_analysis_queue;
|
||||||
@@ -93,43 +93,74 @@ impl AnalysisQueue {
|
|||||||
&self,
|
&self,
|
||||||
tenant_id: Option<Uuid>,
|
tenant_id: Option<Uuid>,
|
||||||
) -> AiResult<Option<ai_analysis_queue::Model>> {
|
) -> AiResult<Option<ai_analysis_queue::Model>> {
|
||||||
let sql = match tenant_id {
|
// 事务内 SELECT ... FOR UPDATE SKIP LOCKED + UPDATE:
|
||||||
Some(tid) => format!(
|
// - 参数化($1)消除原 format! 拼 tenant_id 的 SQL 注入风险
|
||||||
"SELECT * FROM ai_analysis_queue WHERE tenant_id = '{}' AND status = 'pending' AND deleted_at IS NULL AND scheduled_at <= NOW() ORDER BY priority DESC, scheduled_at ASC LIMIT 1",
|
// - FOR UPDATE SKIP LOCKED 在事务内持行锁到 UPDATE 完成,防多消费者并发重复 claim
|
||||||
tid
|
let claimed = self
|
||||||
),
|
.db
|
||||||
None => r#"
|
.transaction::<_, Option<ai_analysis_queue::Model>, AiError>(|txn| {
|
||||||
SELECT * FROM ai_analysis_queue
|
Box::pin(async move {
|
||||||
|
let row: Option<QueueRow> = match tenant_id {
|
||||||
|
Some(tid) => {
|
||||||
|
QueueRow::find_by_statement(Statement::from_sql_and_values(
|
||||||
|
sea_orm::DatabaseBackend::Postgres,
|
||||||
|
r#"SELECT * FROM ai_analysis_queue
|
||||||
|
WHERE tenant_id = $1
|
||||||
|
AND status = 'pending'
|
||||||
|
AND deleted_at IS NULL
|
||||||
|
AND scheduled_at <= NOW()
|
||||||
|
ORDER BY priority DESC, scheduled_at ASC
|
||||||
|
LIMIT 1
|
||||||
|
FOR UPDATE SKIP LOCKED"#,
|
||||||
|
[tid.into()],
|
||||||
|
))
|
||||||
|
.one(txn)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
QueueRow::find_by_statement(Statement::from_string(
|
||||||
|
sea_orm::DatabaseBackend::Postgres,
|
||||||
|
r#"SELECT * FROM ai_analysis_queue
|
||||||
WHERE status = 'pending'
|
WHERE status = 'pending'
|
||||||
AND deleted_at IS NULL
|
AND deleted_at IS NULL
|
||||||
AND scheduled_at <= NOW()
|
AND scheduled_at <= NOW()
|
||||||
ORDER BY priority DESC, scheduled_at ASC
|
ORDER BY priority DESC, scheduled_at ASC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
"#
|
FOR UPDATE SKIP LOCKED"#
|
||||||
.to_string(),
|
.to_string(),
|
||||||
};
|
|
||||||
|
|
||||||
let row: Option<QueueRow> = QueueRow::find_by_statement(Statement::from_string(
|
|
||||||
sea_orm::DatabaseBackend::Postgres,
|
|
||||||
sql.to_string(),
|
|
||||||
))
|
))
|
||||||
.one(&self.db)
|
.one(txn)
|
||||||
.await?;
|
.await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match row {
|
match row {
|
||||||
Some(r) => {
|
Some(r) => {
|
||||||
let now = chrono::Utc::now();
|
let now = chrono::Utc::now();
|
||||||
let mut active: ai_analysis_queue::ActiveModel =
|
let model = ai_analysis_queue::Entity::find_by_id(r.id)
|
||||||
self.find_by_id(r.id).await?.into();
|
.one(txn)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| {
|
||||||
|
AiError::QueueError(format!("队列任务 {} 未找到", r.id))
|
||||||
|
})?;
|
||||||
|
let mut active: ai_analysis_queue::ActiveModel = model.into();
|
||||||
active.status = Set("running".to_string());
|
active.status = Set("running".to_string());
|
||||||
active.started_at = Set(Some(now));
|
active.started_at = Set(Some(now));
|
||||||
active.updated_at = Set(now);
|
active.updated_at = Set(now);
|
||||||
active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1);
|
active.version_lock = Set(active.version_lock.take().unwrap_or(0) + 1);
|
||||||
let model = active.update(&self.db).await?;
|
let updated = active.update(txn).await?;
|
||||||
Ok(Some(model))
|
Ok(Some(updated))
|
||||||
}
|
}
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| match e {
|
||||||
|
sea_orm::TransactionError::Connection(d) => d.into(),
|
||||||
|
sea_orm::TransactionError::Transaction(a) => a,
|
||||||
|
})?;
|
||||||
|
Ok(claimed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn mark_completed(&self, id: Uuid, result_analysis_id: Uuid) -> AiResult<()> {
|
pub async fn mark_completed(&self, id: Uuid, result_analysis_id: Uuid) -> AiResult<()> {
|
||||||
|
|||||||
@@ -176,6 +176,7 @@ mod m20260526_000166_create_ai_knowledge_bases;
|
|||||||
mod m20260526_000167_create_ai_knowledge_documents;
|
mod m20260526_000167_create_ai_knowledge_documents;
|
||||||
mod m20260527_000168_ai_knowledge_v2_menu;
|
mod m20260527_000168_ai_knowledge_v2_menu;
|
||||||
mod m20260529_000169_supplement_rls_for_new_tables;
|
mod m20260529_000169_supplement_rls_for_new_tables;
|
||||||
|
mod m20260626_000170_extend_device_readings_partitions;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@@ -359,6 +360,7 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20260526_000167_create_ai_knowledge_documents::Migration),
|
Box::new(m20260526_000167_create_ai_knowledge_documents::Migration),
|
||||||
Box::new(m20260527_000168_ai_knowledge_v2_menu::Migration),
|
Box::new(m20260527_000168_ai_knowledge_v2_menu::Migration),
|
||||||
Box::new(m20260529_000169_supplement_rls_for_new_tables::Migration),
|
Box::new(m20260529_000169_supplement_rls_for_new_tables::Migration),
|
||||||
|
Box::new(m20260626_000170_extend_device_readings_partitions::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
/// 补建 device_readings 分区到 2027_06。
|
||||||
|
///
|
||||||
|
/// 背景:m000073 只静态建了 2026_05~2026_08 四个分区,2026-09-01 起 INSERT 将因
|
||||||
|
/// 无目标分区抛错,导致小程序 Veepoo M2 BLE 数据上传全线中断(确定性硬截止)。
|
||||||
|
/// 本迁移补建 2026_09~2027_06 共 10 个月分区解除截止;中期应引入 pg_partman
|
||||||
|
/// 或定时任务自动维护未来分区(见系统分析 PP-02)。
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
// 分区范围字面量为受控常量(非用户输入),与 m000073 写法一致
|
||||||
|
let partitions: [(&str, &str, &str); 10] = [
|
||||||
|
("2026_09", "2026-09-01", "2026-10-01"),
|
||||||
|
("2026_10", "2026-10-01", "2026-11-01"),
|
||||||
|
("2026_11", "2026-11-01", "2026-12-01"),
|
||||||
|
("2026_12", "2026-12-01", "2027-01-01"),
|
||||||
|
("2027_01", "2027-01-01", "2027-02-01"),
|
||||||
|
("2027_02", "2027-02-01", "2027-03-01"),
|
||||||
|
("2027_03", "2027-03-01", "2027-04-01"),
|
||||||
|
("2027_04", "2027-04-01", "2027-05-01"),
|
||||||
|
("2027_05", "2027-05-01", "2027-06-01"),
|
||||||
|
("2027_06", "2027-06-01", "2027-07-01"),
|
||||||
|
];
|
||||||
|
for (suffix, start, end) in partitions {
|
||||||
|
let sql = format!(
|
||||||
|
"CREATE TABLE IF NOT EXISTS device_readings_{suffix} PARTITION OF device_readings FOR VALUES FROM ('{start}') TO ('{end}');"
|
||||||
|
);
|
||||||
|
manager.get_connection().execute_unprepared(&sql).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
let suffixes = [
|
||||||
|
"2026_09", "2026_10", "2026_11", "2026_12", "2027_01", "2027_02", "2027_03", "2027_04",
|
||||||
|
"2027_05", "2027_06",
|
||||||
|
];
|
||||||
|
for suffix in suffixes {
|
||||||
|
let sql = format!("DROP TABLE IF EXISTS device_readings_{suffix};");
|
||||||
|
manager.get_connection().execute_unprepared(&sql).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user