perf(health): alert_engine 批量预加载 + 内存匹配替代逐规则DB查询

批量查询 cooldown 期间所有 alerts 和最近 hourly 记录,
在内存中完成 cooldown 检查和规则匹配。
N规则评估从 2N+ 次查询降为 2 次批量查询。
This commit is contained in:
iven
2026-04-27 09:55:39 +08:00
parent 0a387c189a
commit 0929825ae7

View File

@@ -2,6 +2,7 @@ use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect};
use serde_json::json;
use std::collections::HashSet;
use uuid::Uuid;
use crate::entity::{alert_rules, alerts, vital_signs_hourly};
@@ -23,10 +24,37 @@ pub async fn evaluate_rules(
.all(&state.db)
.await?;
if rules.is_empty() {
return Ok(Vec::new());
}
// 批量查询 cooldown 期间的 alerts
let max_cooldown: i64 = rules.iter().map(|r| r.cooldown_minutes as i64).max().unwrap_or(60);
let cooldown_start = Utc::now() - chrono::Duration::minutes(max_cooldown);
let recent_alerts = alerts::Entity::find()
.filter(alerts::Column::TenantId.eq(tenant_id))
.filter(alerts::Column::PatientId.eq(patient_id))
.filter(alerts::Column::CreatedAt.gt(cooldown_start))
.filter(alerts::Column::DeletedAt.is_null())
.all(&state.db)
.await?;
let cooldown_set: HashSet<Uuid> = recent_alerts.iter().map(|a| a.rule_id).collect();
// 批量查询最近的 hourly 记录(最多取最近 168 小时用于 trend 判断)
let hourly_records = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.filter(vital_signs_hourly::Column::HourStart.gt(Utc::now() - chrono::Duration::hours(168)))
.order_by_desc(vital_signs_hourly::Column::HourStart)
.all(&state.db)
.await?;
let mut triggered_alerts = Vec::new();
for rule in rules {
if is_in_cooldown(&state.db, tenant_id, patient_id, rule.id, rule.cooldown_minutes).await? {
// 检查 cooldown使用预先查询的集合
if cooldown_set.contains(&rule.id) {
continue;
}
@@ -34,15 +62,9 @@ pub async fn evaluate_rules(
let condition_type = rule.condition_type.as_str();
let is_triggered = match condition_type {
"single_threshold" => evaluate_single_threshold(
&state.db, tenant_id, patient_id, device_type, params
).await?,
"consecutive" => evaluate_consecutive(
&state.db, tenant_id, patient_id, device_type, params
).await?,
"trend" => evaluate_trend(
&state.db, tenant_id, patient_id, device_type, params
).await?,
"single_threshold" => evaluate_single_threshold_in_memory(&hourly_records, params),
"consecutive" => evaluate_consecutive_in_memory(&hourly_records, params),
"trend" => evaluate_trend_in_memory(&hourly_records, params),
_ => false,
};
@@ -57,90 +79,49 @@ pub async fn evaluate_rules(
Ok(triggered_alerts)
}
async fn is_in_cooldown(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
rule_id: Uuid,
cooldown_minutes: i32,
) -> HealthResult<bool> {
let cooldown_start = Utc::now() - chrono::Duration::minutes(cooldown_minutes as i64);
let recent = alerts::Entity::find()
.filter(alerts::Column::TenantId.eq(tenant_id))
.filter(alerts::Column::PatientId.eq(patient_id))
.filter(alerts::Column::RuleId.eq(rule_id))
.filter(alerts::Column::CreatedAt.gt(cooldown_start))
.filter(alerts::Column::DeletedAt.is_null())
.one(db)
.await?;
Ok(recent.is_some())
}
async fn evaluate_single_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
fn evaluate_single_threshold_in_memory(
records: &[vital_signs_hourly::Model],
params: &serde_json::Value,
) -> HealthResult<bool> {
) -> bool {
let direction = params["direction"].as_str().unwrap_or("above");
let threshold = params["value"].as_f64().unwrap_or(f64::MAX);
let latest = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.order_by_desc(vital_signs_hourly::Column::HourStart)
.one(db)
.await?;
match latest {
// records 已按 HourStart DESC 排序,第一条即最新
match records.first() {
Some(record) => {
let val = record.avg_val;
Ok(match direction {
match direction {
"above" => val > threshold,
"below" => val < threshold,
_ => false,
})
}
}
None => Ok(false),
None => false,
}
}
async fn evaluate_consecutive(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
fn evaluate_consecutive_in_memory(
records: &[vital_signs_hourly::Model],
params: &serde_json::Value,
) -> HealthResult<bool> {
let count = params["count"].as_u64().unwrap_or(3) as u64;
) -> bool {
let count = params["count"].as_u64().unwrap_or(3) as usize;
let direction = params["direction"].as_str().unwrap_or("above");
let threshold = params["value"].as_f64().unwrap_or(f64::MAX);
let window_hours = params["window_hours"].as_i64();
use sea_orm::QueryOrder;
let mut query = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.order_by_desc(vital_signs_hourly::Column::HourStart);
// records 已按 HourStart DESC 排序
let cutoff = window_hours.map(|h| Utc::now() - chrono::Duration::hours(h));
let recent: Vec<_> = records
.iter()
.take_while(|r| cutoff.map_or(true, |c| r.hour_start > c))
.take(count)
.collect();
if let Some(hours) = window_hours {
let since = Utc::now() - chrono::Duration::hours(hours);
query = query.filter(vital_signs_hourly::Column::HourStart.gt(since));
if recent.len() < count {
return false;
}
let records: Vec<_> = query
.limit(count)
.all(db)
.await?;
if records.len() < count as usize {
return Ok(false);
}
let all_exceed = records.iter().all(|r| {
let all_exceed = recent.iter().all(|r| {
match direction {
"above" => r.avg_val > threshold,
"below" => r.avg_val < threshold,
@@ -148,45 +129,39 @@ async fn evaluate_consecutive(
}
});
Ok(all_exceed)
all_exceed
}
async fn evaluate_trend(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
fn evaluate_trend_in_memory(
records: &[vital_signs_hourly::Model],
params: &serde_json::Value,
) -> HealthResult<bool> {
) -> bool {
let window_hours = params["window_hours"].as_i64().unwrap_or(168);
let delta_threshold = params["delta"].as_f64().unwrap_or(20.0);
let direction = params["direction"].as_str().unwrap_or("up");
let since = Utc::now() - chrono::Duration::hours(window_hours);
use sea_orm::QueryOrder;
let records: Vec<_> = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.filter(vital_signs_hourly::Column::HourStart.gt(since))
.order_by_asc(vital_signs_hourly::Column::HourStart)
.all(db)
.await?;
// records 已按 HourStart DESC 排序,需要按时间正序取首尾
let mut in_window: Vec<_> = records
.iter()
.filter(|r| r.hour_start > since)
.collect();
in_window.sort_by_key(|r| r.hour_start);
if records.len() < 2 {
return Ok(false);
if in_window.len() < 2 {
return false;
}
let first = records.first().unwrap().avg_val;
let last = records.last().unwrap().avg_val;
let first = in_window.first().unwrap().avg_val;
let last = in_window.last().unwrap().avg_val;
let actual_delta = last - first;
Ok(match direction {
match direction {
"up" => actual_delta > delta_threshold,
"down" => actual_delta < -delta_threshold,
_ => false,
})
}
}
async fn create_alert_and_notify(