diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 6a865a6..71ffe70 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -11,6 +11,7 @@ pub const APPOINTMENT_CREATED: &str = "appointment.created"; // 告警 pub const ALERT_TRIGGERED: &str = "alert.triggered"; +pub const ALERT_AGGREGATED: &str = "alert.aggregated"; // 知情同意 pub const CONSENT_GRANTED: &str = "consent.granted"; @@ -181,6 +182,33 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await; } + Some(event) if event.event_type == ALERT_TRIGGERED => { + // 被抑制的告警 → 发布聚合事件 + if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator").await.unwrap_or(false) { + continue; + } + let is_suppressed = event.payload.get("suppressed") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if is_suppressed { + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let Some(pid) = patient_id { + let aggregated_event = erp_core::events::DomainEvent::new( + ALERT_AGGREGATED, + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "patient_id": pid, + "triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()), + "severity": event.payload.get("severity"), + })), + ); + alert_bus.publish(aggregated_event, &alert_db).await; + tracing::info!(patient_id = %pid, "告警聚合事件已发布"); + } + } + let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_aggregator").await; + } Some(_) => {} None => break, } @@ -937,6 +965,7 @@ mod tests { let all_types = [ APPOINTMENT_CREATED, ALERT_TRIGGERED, + ALERT_AGGREGATED, CONSENT_GRANTED, CONSENT_REVOKED, ARTICLE_PUBLISHED, @@ -971,6 +1000,7 @@ mod tests { let all_types = [ APPOINTMENT_CREATED, ALERT_TRIGGERED, + ALERT_AGGREGATED, CONSENT_GRANTED, CONSENT_REVOKED, ARTICLE_PUBLISHED, @@ -1008,6 +1038,7 @@ mod tests { // 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致 assert_eq!(APPOINTMENT_CREATED, "appointment.created"); assert_eq!(ALERT_TRIGGERED, "alert.triggered"); + assert_eq!(ALERT_AGGREGATED, "alert.aggregated"); assert_eq!(CONSENT_GRANTED, "consent.granted"); assert_eq!(CONSENT_REVOKED, "consent.revoked"); assert_eq!(ARTICLE_PUBLISHED, "article.published"); @@ -1570,6 +1601,12 @@ mod tests { prefix, ALERT_TRIGGERED ); + assert!( + ALERT_AGGREGATED.starts_with(prefix), + "前缀 '{}' 应覆盖 '{}'", + prefix, + ALERT_AGGREGATED + ); } #[test] @@ -1683,6 +1720,7 @@ mod tests { // 收集所有消费者的 consumer_id(从 mark_event_processed 调用中提取) let consumer_ids = [ "workflow_task_consumer", + "alert_aggregator", "alert_notifier", "patient_welcome", "appt_created_notifier",