diff --git a/apps/web/src/pages/health/CriticalValueThresholdList.tsx b/apps/web/src/pages/health/CriticalValueThresholdList.tsx index 0c07add..c7778c2 100644 --- a/apps/web/src/pages/health/CriticalValueThresholdList.tsx +++ b/apps/web/src/pages/health/CriticalValueThresholdList.tsx @@ -1,4 +1,4 @@ -import { useState, useCallback } from 'react'; +import { useState, useCallback, useEffect } from 'react'; import { Button, Form, Input, InputNumber, message, Modal, Popconfirm, Result, Select, Space, Switch, Table, Tag, } from 'antd'; @@ -45,6 +45,10 @@ export default function CriticalValueThresholdList() { } }, []); + useEffect(() => { + fetchData(); + }, [fetchData]); + const handleCreate = () => { setEditRecord(null); form.resetFields(); diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 4fd6f87..90c237f 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -73,8 +73,13 @@ pub fn register_handlers(_bus: &EventBus) { /// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用 pub fn register_handlers_with_state(state: crate::state::HealthState) { + // 收集所有 SubscriptionHandle 并 forget,防止函数返回时 handle 被 drop + // 导致 cancel channel 关闭、过滤任务退出、消费者全部失效 + let mut _handles: Vec = Vec::new(); + // workflow.task.completed → 更新随访任务状态为 completed - let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string()); + let (mut workflow_rx, wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string()); + _handles.push(wf_handle); let wf_db = state.db.clone(); tokio::spawn(async move { loop { @@ -124,7 +129,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // device.readings.synced → 触发告警引擎评估 - let (mut reading_rx, _reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string()); + let (mut reading_rx, reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string()); + _handles.push(reading_handle); let eval_state = state.clone(); tokio::spawn(async move { loop { @@ -158,7 +164,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { // ── P1 事件消费者补全 ── // alert.triggered → 告警消息通知 - let (mut alert_rx, _alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string()); + let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string()); + _handles.push(alert_handle); let alert_db = state.db.clone(); let alert_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -225,7 +232,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // patient.created → 欢迎消息通知 - let (mut patient_rx, _patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string()); + let (mut patient_rx, patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string()); + _handles.push(patient_handle); let patient_db = state.db.clone(); let patient_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -260,7 +268,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // appointment.created/confirmed/cancelled → 通知 + 号源释放 - let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string()); + let (mut appt_rx, appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string()); + _handles.push(appt_handle); let appt_db = state.db.clone(); let appt_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -325,7 +334,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // follow_up.overdue → 升级通知 - let (mut fu_rx, _fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + _handles.push(fu_handle); let fu_db = state.db.clone(); let fu_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -395,7 +405,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // health_data.critical_alert → 创建危急值告警记录 - let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string()); + let (mut critical_rx, critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string()); + _handles.push(critical_handle); let critical_state = state.clone(); tokio::spawn(async move { loop { @@ -409,10 +420,13 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { let patient_id = event.payload.get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()); - let alert_type = event.payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign"); - let metric_name = event.payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown"); - let metric_value = event.payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or(""); - let threshold_value = event.payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or(""); + // alert 数据在嵌套的 "alert" 对象中 + let alert_obj = event.payload.get("alert"); + let alert_type = "vital_sign"; + let metric_name = alert_obj.and_then(|a| a.get("indicator")).and_then(|v| v.as_str()).unwrap_or("unknown"); + let metric_value = alert_obj.and_then(|a| a.get("value")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default(); + let threshold_value = alert_obj.and_then(|a| a.get("threshold")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default(); + let severity = alert_obj.and_then(|a| a.get("level")).and_then(|v| v.as_str()).unwrap_or("critical"); if let Some(pid) = patient_id { match crate::service::critical_alert_service::handle_critical_alert_event( @@ -421,9 +435,10 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { pid, alert_type, metric_name, - metric_value, - threshold_value, + &metric_value, + &threshold_value, None, + severity, ).await { Ok(alert_id) => { tracing::info!( @@ -453,7 +468,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // ai.analysis.completed → 通知关联医生 - let (mut ai_rx, _ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + _handles.push(ai_handle); let ai_db = state.db.clone(); let ai_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -534,7 +550,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // ai.analysis.completed → AI→行动闭环消费者(行动分发) - let (mut ai_action_rx, _ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + let (mut ai_action_rx, ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + _handles.push(ai_action_handle); let action_db = state.db.clone(); let action_event_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -611,7 +628,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // consent.granted/revoked → 通知关联医生 - let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string()); + let (mut consent_rx, consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string()); + _handles.push(consent_handle); let consent_db = state.db.clone(); let consent_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -674,7 +692,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // consultation.opened/new_message → 通知相关方 - let (mut consult_rx, _consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string()); + let (mut consult_rx, consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string()); + _handles.push(consult_handle); let consult_db = state.db.clone(); let consult_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -753,7 +772,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // follow_up.created → 通知执行人 - let (mut fu_created_rx, _fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + let (mut fu_created_rx, fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + _handles.push(fu_created_handle); let fu_created_db = state.db.clone(); let fu_created_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -789,7 +809,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // points.earned/exchanged → 积分变动通知 - let (mut points_rx, _points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); + let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); + _handles.push(points_handle); let points_db = state.db.clone(); let points_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -871,7 +892,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // lab_report.uploaded → 触发 AI 自动分析 - let (mut lab_upload_rx, _lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string()); + let (mut lab_upload_rx, lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string()); + _handles.push(lab_upload_handle); let lab_upload_db = state.db.clone(); let lab_upload_bus = state.event_bus.clone(); tokio::spawn(async move { @@ -928,7 +950,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { }); // patient.updated → 审计日志 - let (mut patient_update_rx, _patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string()); + let (mut patient_update_rx, patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string()); + _handles.push(patient_update_handle); let patient_update_db = state.db.clone(); tokio::spawn(async move { loop { @@ -946,11 +969,11 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } } }); -} -// --------------------------------------------------------------------------- -// 单元测试 -// --------------------------------------------------------------------------- + // 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭 + // 所有过滤订阅的生命周期应与进程一致 + std::mem::forget(_handles); +} // 事件处理器本身依赖 tokio::spawn + channel + DB,无法纯单元测试。 // 以下测试覆盖: // 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配) diff --git a/crates/erp-health/src/service/critical_alert_service.rs b/crates/erp-health/src/service/critical_alert_service.rs index 3f5718e..71c12e5 100644 --- a/crates/erp-health/src/service/critical_alert_service.rs +++ b/crates/erp-health/src/service/critical_alert_service.rs @@ -19,6 +19,7 @@ pub async fn handle_critical_alert_event( metric_value: &str, threshold_value: &str, created_by: Option, + severity: &str, ) -> HealthResult { let id = Uuid::now_v7(); let now = Utc::now(); @@ -30,7 +31,7 @@ pub async fn handle_critical_alert_event( metric_name: Set(metric_name.to_string()), metric_value: Set(metric_value.to_string()), threshold_value: Set(threshold_value.to_string()), - severity: Set("critical".to_string()), + severity: Set(severity.to_string()), status: Set("pending".to_string()), acknowledged_by: Set(None), acknowledged_at: Set(None), diff --git a/crates/erp-health/src/service/health_data_service/alert.rs b/crates/erp-health/src/service/health_data_service/alert.rs index d19f844..12ab2c1 100644 --- a/crates/erp-health/src/service/health_data_service/alert.rs +++ b/crates/erp-health/src/service/health_data_service/alert.rs @@ -128,12 +128,20 @@ pub(crate) async fn check_vital_signs_alert( } /// 根据阈值配置检查单个指标值,匹配则添加到 alerts。 +/// 优先匹配 critical 级别,其次 warning。 fn check_indicator( thresholds: &[crate::entity::critical_value_threshold::Model], indicator: &str, value: f64, alerts: &mut Vec, ) { + let severity_order = |level: &str| match level { + "critical" => 0, + "warning" => 1, + _ => 2, + }; + + let mut best: Option<&crate::entity::critical_value_threshold::Model> = None; for t in thresholds { if t.indicator != indicator { continue; @@ -144,14 +152,75 @@ fn check_indicator( _ => false, }; if triggered { - alerts.push(serde_json::json!({ - "indicator": indicator, - "value": value, - "threshold": t.threshold_value, - "level": t.level, - "direction": t.direction, - })); - return; + let is_better = match best { + None => true, + Some(prev) => severity_order(&t.level) < severity_order(&prev.level), + }; + if is_better { + best = Some(t); + } } } + + if let Some(t) = best { + alerts.push(serde_json::json!({ + "indicator": indicator, + "value": value, + "threshold": t.threshold_value, + "level": t.level, + "direction": t.direction, + })); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_threshold(indicator: &str, direction: &str, value: f64, level: &str) -> crate::entity::critical_value_threshold::Model { + crate::entity::critical_value_threshold::Model { + id: uuid::Uuid::now_v7(), + tenant_id: uuid::Uuid::now_v7(), + indicator: indicator.to_string(), + direction: direction.to_string(), + threshold_value: value, + level: level.to_string(), + department: None, + age_min: None, + age_max: None, + is_active: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + created_by: None, + updated_by: None, + deleted_at: None, + version: 1, + } + } + + #[test] + fn check_indicator_prefers_critical_over_warning() { + let thresholds = vec![ + make_threshold("diastolic_bp", "high", 90.0, "warning"), + make_threshold("diastolic_bp", "high", 110.0, "critical"), + make_threshold("diastolic_bp", "low", 50.0, "critical"), + ]; + let mut alerts = Vec::new(); + check_indicator(&thresholds, "diastolic_bp", 145.0, &mut alerts); + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0]["level"].as_str(), Some("critical")); + assert_eq!(alerts[0]["threshold"].as_f64(), Some(110.0)); + } + + #[test] + fn check_indicator_critical_first_in_list() { + let thresholds = vec![ + make_threshold("systolic_bp", "high", 180.0, "critical"), + make_threshold("systolic_bp", "high", 140.0, "warning"), + ]; + let mut alerts = Vec::new(); + check_indicator(&thresholds, "systolic_bp", 250.0, &mut alerts); + assert_eq!(alerts.len(), 1); + assert_eq!(alerts[0]["level"].as_str(), Some("critical")); + } }