perf(health): upsert_hourly_aggregates 批量化 — 批量查询+insert_many
将逐组查询+更新/插入改为一次批量查询所有已存在记录, 分为"新增"和"更新"两组,新增用 insert_many() 一次性插入。 查询次数从 N 降为 1+更新数。
This commit is contained in:
@@ -234,7 +234,6 @@ async fn upsert_hourly_aggregates(
|
||||
let mut groups: HashMap<(String, DateTime<Utc>), Vec<f64>> = HashMap::new();
|
||||
|
||||
for (r, measured_at) in readings {
|
||||
// 尝试从 values 中提取数值用于聚合
|
||||
let hour_start = measured_at
|
||||
.with_minute(0)
|
||||
.and_then(|t| t.with_second(0))
|
||||
@@ -247,22 +246,32 @@ async fn upsert_hourly_aggregates(
|
||||
}
|
||||
}
|
||||
|
||||
if groups.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 批量查出所有已存在的聚合记录(一次查询)
|
||||
let existing_records = vital_signs_hourly::Entity::find()
|
||||
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
|
||||
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
let existing_map: HashMap<(String, DateTime<Utc>), vital_signs_hourly::Model> = existing_records
|
||||
.into_iter()
|
||||
.map(|r| ((r.device_type.clone(), r.hour_start), r))
|
||||
.collect();
|
||||
|
||||
let now = Utc::now();
|
||||
let mut to_insert: Vec<vital_signs_hourly::ActiveModel> = Vec::new();
|
||||
|
||||
for ((device_type, hour_start), values) in groups {
|
||||
let min_val = values.iter().cloned().reduce(f64::min);
|
||||
let max_val = values.iter().cloned().reduce(f64::max);
|
||||
let avg_val = values.iter().sum::<f64>() / values.len() as f64;
|
||||
let sample_count = values.len() as i32;
|
||||
|
||||
// 尝试查找已存在的聚合记录
|
||||
let existing = vital_signs_hourly::Entity::find()
|
||||
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
|
||||
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
|
||||
.filter(vital_signs_hourly::Column::DeviceType.eq(&device_type))
|
||||
.filter(vital_signs_hourly::Column::HourStart.eq(hour_start))
|
||||
.one(db)
|
||||
.await?;
|
||||
|
||||
if let Some(rec) = existing {
|
||||
if let Some(rec) = existing_map.get(&(device_type.clone(), hour_start)) {
|
||||
// 合并:重新计算聚合
|
||||
let total_count = rec.sample_count + sample_count;
|
||||
let combined_avg = (rec.avg_val * rec.sample_count as f64 + avg_val * sample_count as f64)
|
||||
@@ -270,16 +279,16 @@ async fn upsert_hourly_aggregates(
|
||||
let combined_min = rec.min_val.map_or(min_val, |m| min_val.map_or(Some(m), |v| Some(m.min(v)))).or(min_val);
|
||||
let combined_max = rec.max_val.map_or(max_val, |m| max_val.map_or(Some(m), |v| Some(m.max(v)))).or(max_val);
|
||||
|
||||
let mut active: vital_signs_hourly::ActiveModel = rec.into();
|
||||
let mut active: vital_signs_hourly::ActiveModel = rec.clone().into();
|
||||
active.min_val = Set(combined_min);
|
||||
active.max_val = Set(combined_max);
|
||||
active.avg_val = Set(combined_avg);
|
||||
active.sample_count = Set(total_count);
|
||||
active.updated_at = Set(Utc::now());
|
||||
active.updated_at = Set(now);
|
||||
active.version = Set(active.version.unwrap() + 1);
|
||||
active.update(db).await?;
|
||||
} else {
|
||||
let model = vital_signs_hourly::ActiveModel {
|
||||
to_insert.push(vital_signs_hourly::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
patient_id: Set(patient_id),
|
||||
@@ -289,14 +298,21 @@ async fn upsert_hourly_aggregates(
|
||||
max_val: Set(max_val),
|
||||
avg_val: Set(avg_val),
|
||||
sample_count: Set(sample_count),
|
||||
created_at: Set(Utc::now()),
|
||||
updated_at: Set(Utc::now()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
version: Set(1),
|
||||
};
|
||||
model.insert(db).await?;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// 批量插入新增记录
|
||||
if !to_insert.is_empty() {
|
||||
vital_signs_hourly::Entity::insert_many(to_insert)
|
||||
.exec(db)
|
||||
.await
|
||||
.map_err(|e| HealthError::DbError(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user