diff --git a/crates/erp-health/src/service/alert_engine.rs b/crates/erp-health/src/service/alert_engine.rs index 5375f6b..d5918ad 100644 --- a/crates/erp-health/src/service/alert_engine.rs +++ b/crates/erp-health/src/service/alert_engine.rs @@ -2,6 +2,7 @@ use chrono::Utc; use sea_orm::entity::prelude::*; use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect}; use serde_json::json; +use std::collections::HashSet; use uuid::Uuid; use crate::entity::{alert_rules, alerts, vital_signs_hourly}; @@ -23,10 +24,37 @@ pub async fn evaluate_rules( .all(&state.db) .await?; + if rules.is_empty() { + return Ok(Vec::new()); + } + + // 批量查询 cooldown 期间的 alerts + let max_cooldown: i64 = rules.iter().map(|r| r.cooldown_minutes as i64).max().unwrap_or(60); + let cooldown_start = Utc::now() - chrono::Duration::minutes(max_cooldown); + let recent_alerts = alerts::Entity::find() + .filter(alerts::Column::TenantId.eq(tenant_id)) + .filter(alerts::Column::PatientId.eq(patient_id)) + .filter(alerts::Column::CreatedAt.gt(cooldown_start)) + .filter(alerts::Column::DeletedAt.is_null()) + .all(&state.db) + .await?; + let cooldown_set: HashSet = recent_alerts.iter().map(|a| a.rule_id).collect(); + + // 批量查询最近的 hourly 记录(最多取最近 168 小时用于 trend 判断) + let hourly_records = vital_signs_hourly::Entity::find() + .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) + .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) + .filter(vital_signs_hourly::Column::DeviceType.eq(device_type)) + .filter(vital_signs_hourly::Column::HourStart.gt(Utc::now() - chrono::Duration::hours(168))) + .order_by_desc(vital_signs_hourly::Column::HourStart) + .all(&state.db) + .await?; + let mut triggered_alerts = Vec::new(); for rule in rules { - if is_in_cooldown(&state.db, tenant_id, patient_id, rule.id, rule.cooldown_minutes).await? { + // 检查 cooldown(使用预先查询的集合) + if cooldown_set.contains(&rule.id) { continue; } @@ -34,15 +62,9 @@ pub async fn evaluate_rules( let condition_type = rule.condition_type.as_str(); let is_triggered = match condition_type { - "single_threshold" => evaluate_single_threshold( - &state.db, tenant_id, patient_id, device_type, params - ).await?, - "consecutive" => evaluate_consecutive( - &state.db, tenant_id, patient_id, device_type, params - ).await?, - "trend" => evaluate_trend( - &state.db, tenant_id, patient_id, device_type, params - ).await?, + "single_threshold" => evaluate_single_threshold_in_memory(&hourly_records, params), + "consecutive" => evaluate_consecutive_in_memory(&hourly_records, params), + "trend" => evaluate_trend_in_memory(&hourly_records, params), _ => false, }; @@ -57,90 +79,49 @@ pub async fn evaluate_rules( Ok(triggered_alerts) } -async fn is_in_cooldown( - db: &DatabaseConnection, - tenant_id: Uuid, - patient_id: Uuid, - rule_id: Uuid, - cooldown_minutes: i32, -) -> HealthResult { - let cooldown_start = Utc::now() - chrono::Duration::minutes(cooldown_minutes as i64); - let recent = alerts::Entity::find() - .filter(alerts::Column::TenantId.eq(tenant_id)) - .filter(alerts::Column::PatientId.eq(patient_id)) - .filter(alerts::Column::RuleId.eq(rule_id)) - .filter(alerts::Column::CreatedAt.gt(cooldown_start)) - .filter(alerts::Column::DeletedAt.is_null()) - .one(db) - .await?; - Ok(recent.is_some()) -} - -async fn evaluate_single_threshold( - db: &DatabaseConnection, - tenant_id: Uuid, - patient_id: Uuid, - device_type: &str, +fn evaluate_single_threshold_in_memory( + records: &[vital_signs_hourly::Model], params: &serde_json::Value, -) -> HealthResult { +) -> bool { let direction = params["direction"].as_str().unwrap_or("above"); let threshold = params["value"].as_f64().unwrap_or(f64::MAX); - let latest = vital_signs_hourly::Entity::find() - .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) - .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) - .filter(vital_signs_hourly::Column::DeviceType.eq(device_type)) - .order_by_desc(vital_signs_hourly::Column::HourStart) - .one(db) - .await?; - - match latest { + // records 已按 HourStart DESC 排序,第一条即最新 + match records.first() { Some(record) => { let val = record.avg_val; - Ok(match direction { + match direction { "above" => val > threshold, "below" => val < threshold, _ => false, - }) + } } - None => Ok(false), + None => false, } } -async fn evaluate_consecutive( - db: &DatabaseConnection, - tenant_id: Uuid, - patient_id: Uuid, - device_type: &str, +fn evaluate_consecutive_in_memory( + records: &[vital_signs_hourly::Model], params: &serde_json::Value, -) -> HealthResult { - let count = params["count"].as_u64().unwrap_or(3) as u64; +) -> bool { + let count = params["count"].as_u64().unwrap_or(3) as usize; let direction = params["direction"].as_str().unwrap_or("above"); let threshold = params["value"].as_f64().unwrap_or(f64::MAX); let window_hours = params["window_hours"].as_i64(); - use sea_orm::QueryOrder; - let mut query = vital_signs_hourly::Entity::find() - .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) - .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) - .filter(vital_signs_hourly::Column::DeviceType.eq(device_type)) - .order_by_desc(vital_signs_hourly::Column::HourStart); + // records 已按 HourStart DESC 排序 + let cutoff = window_hours.map(|h| Utc::now() - chrono::Duration::hours(h)); + let recent: Vec<_> = records + .iter() + .take_while(|r| cutoff.map_or(true, |c| r.hour_start > c)) + .take(count) + .collect(); - if let Some(hours) = window_hours { - let since = Utc::now() - chrono::Duration::hours(hours); - query = query.filter(vital_signs_hourly::Column::HourStart.gt(since)); + if recent.len() < count { + return false; } - let records: Vec<_> = query - .limit(count) - .all(db) - .await?; - - if records.len() < count as usize { - return Ok(false); - } - - let all_exceed = records.iter().all(|r| { + let all_exceed = recent.iter().all(|r| { match direction { "above" => r.avg_val > threshold, "below" => r.avg_val < threshold, @@ -148,45 +129,39 @@ async fn evaluate_consecutive( } }); - Ok(all_exceed) + all_exceed } -async fn evaluate_trend( - db: &DatabaseConnection, - tenant_id: Uuid, - patient_id: Uuid, - device_type: &str, +fn evaluate_trend_in_memory( + records: &[vital_signs_hourly::Model], params: &serde_json::Value, -) -> HealthResult { +) -> bool { let window_hours = params["window_hours"].as_i64().unwrap_or(168); let delta_threshold = params["delta"].as_f64().unwrap_or(20.0); let direction = params["direction"].as_str().unwrap_or("up"); let since = Utc::now() - chrono::Duration::hours(window_hours); - use sea_orm::QueryOrder; - let records: Vec<_> = vital_signs_hourly::Entity::find() - .filter(vital_signs_hourly::Column::TenantId.eq(tenant_id)) - .filter(vital_signs_hourly::Column::PatientId.eq(patient_id)) - .filter(vital_signs_hourly::Column::DeviceType.eq(device_type)) - .filter(vital_signs_hourly::Column::HourStart.gt(since)) - .order_by_asc(vital_signs_hourly::Column::HourStart) - .all(db) - .await?; + // records 已按 HourStart DESC 排序,需要按时间正序取首尾 + let mut in_window: Vec<_> = records + .iter() + .filter(|r| r.hour_start > since) + .collect(); + in_window.sort_by_key(|r| r.hour_start); - if records.len() < 2 { - return Ok(false); + if in_window.len() < 2 { + return false; } - let first = records.first().unwrap().avg_val; - let last = records.last().unwrap().avg_val; + let first = in_window.first().unwrap().avg_val; + let last = in_window.last().unwrap().avg_val; let actual_delta = last - first; - Ok(match direction { + match direction { "up" => actual_delta > delta_threshold, "down" => actual_delta < -delta_threshold, _ => false, - }) + } } async fn create_alert_and_notify(