feat(health): 危急值告警消费者 — 幂等处理 + Handler + 路由
- event.rs: 消费 health_data.critical_alert 事件创建告警记录
- handler: list/get/acknowledge 三个端点
- 路由: /health/critical-alerts, /health/critical-alerts/{id}/acknowledge
- 权限: health.critical-alert.list / health.critical-alert.manage
This commit is contained in:
@@ -156,4 +156,62 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// health_data.critical_alert → 创建危急值告警记录
|
||||
let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
|
||||
let critical_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match critical_rx.recv().await {
|
||||
Some(event) if event.event_type == HEALTH_DATA_CRITICAL_ALERT => {
|
||||
// 幂等检查
|
||||
if erp_core::events::is_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let patient_id = event.payload.get("patient_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok());
|
||||
let alert_type = event.payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign");
|
||||
let metric_name = event.payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||
let metric_value = event.payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let threshold_value = event.payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or("");
|
||||
|
||||
if let Some(pid) = patient_id {
|
||||
match crate::service::critical_alert_service::handle_critical_alert_event(
|
||||
&critical_state,
|
||||
event.tenant_id,
|
||||
pid,
|
||||
alert_type,
|
||||
metric_name,
|
||||
metric_value,
|
||||
threshold_value,
|
||||
None,
|
||||
).await {
|
||||
Ok(alert_id) => {
|
||||
tracing::info!(
|
||||
event_id = %event.id,
|
||||
alert_id = %alert_id,
|
||||
patient_id = %pid,
|
||||
metric = %metric_name,
|
||||
"危急值告警已创建"
|
||||
);
|
||||
let _ = erp_core::events::mark_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
event_id = %event.id,
|
||||
patient_id = %pid,
|
||||
error = %e,
|
||||
"危急值告警创建失败"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(_) => {}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user