From 4c1d98116ad714269438024b2726181e04e212fb Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 4 May 2026 02:51:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(health):=20=E5=91=8A=E8=AD=A6=E8=81=9A?= =?UTF-8?q?=E5=90=88=E4=BA=8B=E4=BB=B6=E6=B6=88=E8=B4=B9=E8=80=85=20?= =?UTF-8?q?=E2=80=94=20alert.aggregated?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ALERT_AGGREGATED 常量 - alert_notifier 消费者中处理 suppressed=true 告警并发布聚合事件 - 更新事件常量测试和 consumer_id 唯一性测试 --- crates/erp-health/src/event.rs | 38 ++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 6a865a6..71ffe70 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -11,6 +11,7 @@ pub const APPOINTMENT_CREATED: &str = "appointment.created"; // 告警 pub const ALERT_TRIGGERED: &str = "alert.triggered"; +pub const ALERT_AGGREGATED: &str = "alert.aggregated"; // 知情同意 pub const CONSENT_GRANTED: &str = "consent.granted"; @@ -181,6 +182,33 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await; } + Some(event) if event.event_type == ALERT_TRIGGERED => { + // 被抑制的告警 → 发布聚合事件 + if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator").await.unwrap_or(false) { + continue; + } + let is_suppressed = event.payload.get("suppressed") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if is_suppressed { + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let Some(pid) = patient_id { + let aggregated_event = erp_core::events::DomainEvent::new( + ALERT_AGGREGATED, + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "patient_id": pid, + "triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()), + "severity": event.payload.get("severity"), + })), + ); + alert_bus.publish(aggregated_event, &alert_db).await; + tracing::info!(patient_id = %pid, "告警聚合事件已发布"); + } + } + let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_aggregator").await; + } Some(_) => {} None => break, } @@ -937,6 +965,7 @@ mod tests { let all_types = [ APPOINTMENT_CREATED, ALERT_TRIGGERED, + ALERT_AGGREGATED, CONSENT_GRANTED, CONSENT_REVOKED, ARTICLE_PUBLISHED, @@ -971,6 +1000,7 @@ mod tests { let all_types = [ APPOINTMENT_CREATED, ALERT_TRIGGERED, + ALERT_AGGREGATED, CONSENT_GRANTED, CONSENT_REVOKED, ARTICLE_PUBLISHED, @@ -1008,6 +1038,7 @@ mod tests { // 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致 assert_eq!(APPOINTMENT_CREATED, "appointment.created"); assert_eq!(ALERT_TRIGGERED, "alert.triggered"); + assert_eq!(ALERT_AGGREGATED, "alert.aggregated"); assert_eq!(CONSENT_GRANTED, "consent.granted"); assert_eq!(CONSENT_REVOKED, "consent.revoked"); assert_eq!(ARTICLE_PUBLISHED, "article.published"); @@ -1570,6 +1601,12 @@ mod tests { prefix, ALERT_TRIGGERED ); + assert!( + ALERT_AGGREGATED.starts_with(prefix), + "前缀 '{}' 应覆盖 '{}'", + prefix, + ALERT_AGGREGATED + ); } #[test] @@ -1683,6 +1720,7 @@ mod tests { // 收集所有消费者的 consumer_id(从 mark_event_processed 调用中提取) let consumer_ids = [ "workflow_task_consumer", + "alert_aggregator", "alert_notifier", "patient_welcome", "appt_created_notifier",