From c09f6ecdc88afde4e141733e54ad3eacc06968d4 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 27 Apr 2026 09:29:55 +0800 Subject: [PATCH] =?UTF-8?q?perf(health):=20upsert=5Fhourly=5Faggregates=20?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=8C=96=20=E2=80=94=20=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2+insert=5Fmany?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将逐组查询+更新/插入改为一次批量查询所有已存在记录, 分为"新增"和"更新"两组,新增用 insert_many() 一次性插入。 查询次数从 N 降为 1+更新数。 --- .../src/service/device_reading_service.rs | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/crates/erp-health/src/service/device_reading_service.rs b/crates/erp-health/src/service/device_reading_service.rs index 4ec81a7..081ab64 100644 --- a/crates/erp-health/src/service/device_reading_service.rs +++ b/crates/erp-health/src/service/device_reading_service.rs @@ -234,7 +234,6 @@ async fn upsert_hourly_aggregates( let mut groups: HashMap<(String, DateTime), Vec> = HashMap::new(); for (r, measured_at) in readings { - // 尝试从 values 中提取数值用于聚合 let hour_start = measured_at .with_minute(0) .and_then(|t| t.with_second(0)) @@ -247,22 +246,32 @@ async fn upsert_hourly_aggregates( } } + if groups.is_empty() { + return Ok(()); + } + + // 批量查出所有已存在的聚合记录(一次查询) + let existing_records = vital_signs_hourly::Entity::find() + .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) + .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) + .all(db) + .await?; + + let existing_map: HashMap<(String, DateTime), vital_signs_hourly::Model> = existing_records + .into_iter() + .map(|r| ((r.device_type.clone(), r.hour_start), r)) + .collect(); + + let now = Utc::now(); + let mut to_insert: Vec = Vec::new(); + for ((device_type, hour_start), values) in groups { let min_val = values.iter().cloned().reduce(f64::min); let max_val = values.iter().cloned().reduce(f64::max); let avg_val = values.iter().sum::() / values.len() as f64; let sample_count = values.len() as i32; - // 尝试查找已存在的聚合记录 - let existing = vital_signs_hourly::Entity::find() - .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) - .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) - .filter(vital_signs_hourly::Column::DeviceType.eq(&device_type)) - .filter(vital_signs_hourly::Column::HourStart.eq(hour_start)) - .one(db) - .await?; - - if let Some(rec) = existing { + if let Some(rec) = existing_map.get(&(device_type.clone(), hour_start)) { // 合并:重新计算聚合 let total_count = rec.sample_count + sample_count; let combined_avg = (rec.avg_val * rec.sample_count as f64 + avg_val * sample_count as f64) @@ -270,16 +279,16 @@ async fn upsert_hourly_aggregates( let combined_min = rec.min_val.map_or(min_val, |m| min_val.map_or(Some(m), |v| Some(m.min(v)))).or(min_val); let combined_max = rec.max_val.map_or(max_val, |m| max_val.map_or(Some(m), |v| Some(m.max(v)))).or(max_val); - let mut active: vital_signs_hourly::ActiveModel = rec.into(); + let mut active: vital_signs_hourly::ActiveModel = rec.clone().into(); active.min_val = Set(combined_min); active.max_val = Set(combined_max); active.avg_val = Set(combined_avg); active.sample_count = Set(total_count); - active.updated_at = Set(Utc::now()); + active.updated_at = Set(now); active.version = Set(active.version.unwrap() + 1); active.update(db).await?; } else { - let model = vital_signs_hourly::ActiveModel { + to_insert.push(vital_signs_hourly::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), patient_id: Set(patient_id), @@ -289,14 +298,21 @@ async fn upsert_hourly_aggregates( max_val: Set(max_val), avg_val: Set(avg_val), sample_count: Set(sample_count), - created_at: Set(Utc::now()), - updated_at: Set(Utc::now()), + created_at: Set(now), + updated_at: Set(now), version: Set(1), - }; - model.insert(db).await?; + }); } } + // 批量插入新增记录 + if !to_insert.is_empty() { + vital_signs_hourly::Entity::insert_many(to_insert) + .exec(db) + .await + .map_err(|e| HealthError::DbError(e.to_string()))?; + } + Ok(()) }