feat(health): 告警聚合事件消费者 — alert.aggregated

- 新增 ALERT_AGGREGATED 常量
- alert_notifier 消费者中处理 suppressed=true 告警并发布聚合事件
- 更新事件常量测试和 consumer_id 唯一性测试
This commit is contained in:
iven
2026-05-04 02:51:13 +08:00
parent bb5298ee0f
commit 4c1d98116a

View File

@@ -11,6 +11,7 @@ pub const APPOINTMENT_CREATED: &str = "appointment.created";
// 告警
pub const ALERT_TRIGGERED: &str = "alert.triggered";
pub const ALERT_AGGREGATED: &str = "alert.aggregated";
// 知情同意
pub const CONSENT_GRANTED: &str = "consent.granted";
@@ -181,6 +182,33 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await;
}
Some(event) if event.event_type == ALERT_TRIGGERED => {
// 被抑制的告警 → 发布聚合事件
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator").await.unwrap_or(false) {
continue;
}
let is_suppressed = event.payload.get("suppressed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_suppressed {
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let aggregated_event = erp_core::events::DomainEvent::new(
ALERT_AGGREGATED,
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()),
"severity": event.payload.get("severity"),
})),
);
alert_bus.publish(aggregated_event, &alert_db).await;
tracing::info!(patient_id = %pid, "告警聚合事件已发布");
}
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_aggregator").await;
}
Some(_) => {}
None => break,
}
@@ -937,6 +965,7 @@ mod tests {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
@@ -971,6 +1000,7 @@ mod tests {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
@@ -1008,6 +1038,7 @@ mod tests {
// 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致
assert_eq!(APPOINTMENT_CREATED, "appointment.created");
assert_eq!(ALERT_TRIGGERED, "alert.triggered");
assert_eq!(ALERT_AGGREGATED, "alert.aggregated");
assert_eq!(CONSENT_GRANTED, "consent.granted");
assert_eq!(CONSENT_REVOKED, "consent.revoked");
assert_eq!(ARTICLE_PUBLISHED, "article.published");
@@ -1570,6 +1601,12 @@ mod tests {
prefix,
ALERT_TRIGGERED
);
assert!(
ALERT_AGGREGATED.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
ALERT_AGGREGATED
);
}
#[test]
@@ -1683,6 +1720,7 @@ mod tests {
// 收集所有消费者的 consumer_id从 mark_event_processed 调用中提取)
let consumer_ids = [
"workflow_task_consumer",
"alert_aggregator",
"alert_notifier",
"patient_welcome",
"appt_created_notifier",