Files
hms/crates/erp-health/src/service/alert_engine.rs
iven 5b81a0051f docs: 修正测试策略 spec 的事实性错误
修正 spec review 发现的问题:
- C-1: TestDb 实际是本地 PostgreSQL 隔离,非 Testcontainers
- C-2: E2E 已有 4 spec/10 测试,非零测试
- 补充 6 个遗漏的 service(alert/daily_monitoring/critical_value_threshold 等)
- 增加 Phase 0 基础设施搭建
- 修正 CI 配置(增加 PostgreSQL service、验证链)
- 补充 5 个遗漏风险项和回退策略
- 统一"全量 80%"目标的准确含义
2026-04-27 00:21:02 +08:00

241 lines
7.5 KiB
Rust

use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect};
use serde_json::json;
use uuid::Uuid;
use crate::entity::{alert_rules, alerts, vital_signs_hourly};
use crate::error::{HealthError, HealthResult};
use crate::state::HealthState;
/// 评估所有适用规则,返回触发的告警列表
pub async fn evaluate_rules(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
) -> HealthResult<Vec<alerts::Model>> {
let rules = alert_rules::Entity::find()
.filter(alert_rules::Column::TenantId.eq(tenant_id))
.filter(alert_rules::Column::IsActive.eq(true))
.filter(alert_rules::Column::DeviceType.eq(device_type))
.filter(alert_rules::Column::DeletedAt.is_null())
.all(&state.db)
.await?;
let mut triggered_alerts = Vec::new();
for rule in rules {
if is_in_cooldown(&state.db, tenant_id, patient_id, rule.id, rule.cooldown_minutes).await? {
continue;
}
let params = &rule.condition_params;
let condition_type = rule.condition_type.as_str();
let is_triggered = match condition_type {
"single_threshold" => evaluate_single_threshold(
&state.db, tenant_id, patient_id, device_type, params
).await?,
"consecutive" => evaluate_consecutive(
&state.db, tenant_id, patient_id, device_type, params
).await?,
"trend" => evaluate_trend(
&state.db, tenant_id, patient_id, device_type, params
).await?,
_ => false,
};
if is_triggered {
let alert = create_alert_and_notify(
&state.db, &state.event_bus, tenant_id, patient_id, &rule
).await?;
triggered_alerts.push(alert);
}
}
Ok(triggered_alerts)
}
async fn is_in_cooldown(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
rule_id: Uuid,
cooldown_minutes: i32,
) -> HealthResult<bool> {
let cooldown_start = Utc::now() - chrono::Duration::minutes(cooldown_minutes as i64);
let recent = alerts::Entity::find()
.filter(alerts::Column::TenantId.eq(tenant_id))
.filter(alerts::Column::PatientId.eq(patient_id))
.filter(alerts::Column::RuleId.eq(rule_id))
.filter(alerts::Column::CreatedAt.gt(cooldown_start))
.filter(alerts::Column::DeletedAt.is_null())
.one(db)
.await?;
Ok(recent.is_some())
}
async fn evaluate_single_threshold(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
params: &serde_json::Value,
) -> HealthResult<bool> {
let direction = params["direction"].as_str().unwrap_or("above");
let threshold = params["value"].as_f64().unwrap_or(f64::MAX);
let latest = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.order_by_desc(vital_signs_hourly::Column::HourStart)
.one(db)
.await?;
match latest {
Some(record) => {
let val = record.avg_val;
Ok(match direction {
"above" => val > threshold,
"below" => val < threshold,
_ => false,
})
}
None => Ok(false),
}
}
async fn evaluate_consecutive(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
params: &serde_json::Value,
) -> HealthResult<bool> {
let count = params["count"].as_u64().unwrap_or(3) as u64;
let direction = params["direction"].as_str().unwrap_or("above");
let threshold = params["value"].as_f64().unwrap_or(f64::MAX);
let window_hours = params["window_hours"].as_i64();
use sea_orm::QueryOrder;
let mut query = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.order_by_desc(vital_signs_hourly::Column::HourStart);
if let Some(hours) = window_hours {
let since = Utc::now() - chrono::Duration::hours(hours);
query = query.filter(vital_signs_hourly::Column::HourStart.gt(since));
}
let records: Vec<_> = query
.limit(count)
.all(db)
.await?;
if records.len() < count as usize {
return Ok(false);
}
let all_exceed = records.iter().all(|r| {
match direction {
"above" => r.avg_val > threshold,
"below" => r.avg_val < threshold,
_ => false,
}
});
Ok(all_exceed)
}
async fn evaluate_trend(
db: &DatabaseConnection,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
params: &serde_json::Value,
) -> HealthResult<bool> {
let window_hours = params["window_hours"].as_i64().unwrap_or(168);
let delta_threshold = params["delta"].as_f64().unwrap_or(20.0);
let direction = params["direction"].as_str().unwrap_or("up");
let since = Utc::now() - chrono::Duration::hours(window_hours);
use sea_orm::QueryOrder;
let records: Vec<_> = vital_signs_hourly::Entity::find()
.filter(vital_signs_hourly::Column::TenantId.eq(tenant_id))
.filter(vital_signs_hourly::Column::PatientId.eq(patient_id))
.filter(vital_signs_hourly::Column::DeviceType.eq(device_type))
.filter(vital_signs_hourly::Column::HourStart.gt(since))
.order_by_asc(vital_signs_hourly::Column::HourStart)
.all(db)
.await?;
if records.len() < 2 {
return Ok(false);
}
let first = records.first().unwrap().avg_val;
let last = records.last().unwrap().avg_val;
let actual_delta = last - first;
Ok(match direction {
"up" => actual_delta > delta_threshold,
"down" => actual_delta < -delta_threshold,
_ => false,
})
}
async fn create_alert_and_notify(
db: &DatabaseConnection,
event_bus: &erp_core::events::EventBus,
tenant_id: Uuid,
patient_id: Uuid,
rule: &alert_rules::Model,
) -> HealthResult<alerts::Model> {
let alert_id = Uuid::now_v7();
let alert = alerts::ActiveModel {
id: Set(alert_id),
tenant_id: Set(tenant_id),
patient_id: Set(patient_id),
rule_id: Set(rule.id),
severity: Set(rule.severity.clone()),
title: Set(format!("{}触发", rule.name)),
detail: Set(Some(json!({
"rule_name": rule.name,
"condition_type": rule.condition_type,
"condition_params": rule.condition_params,
"device_type": rule.device_type,
}))),
status: Set("pending".to_string()),
acknowledged_by: Set(None),
acknowledged_at: Set(None),
resolved_at: Set(None),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
deleted_at: Set(None),
version: Set(1),
};
let alert = alert.insert(db).await?;
let event = erp_core::events::DomainEvent::new(
"alert.triggered",
tenant_id,
json!({
"alert_id": alert.id,
"patient_id": patient_id,
"rule_name": rule.name,
"severity": rule.severity,
"detail": alert.detail,
"notify_roles": rule.notify_roles,
}),
);
event_bus.publish(event, db).await;
Ok(alert)
}