fix(health): 危急值告警全链路修复 — 消费者生命周期 + payload 映射 + 阈值优先级
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

1. CRITICAL: 修复 SubscriptionHandle 提前 drop 导致所有事件消费者失效
   - register_handlers_with_state 中所有 handle 在函数返回时被 drop
   - cancel channel 关闭导致 subscribe_filtered 的过滤任务退出
   - 方案: 收集所有 handle 并 std::mem::forget,生命周期与进程一致

2. HIGH: 修复 critical_alert 消费者 payload 字段映射不匹配
   - 消费者读取 alert_type/metric_name 等顶层字段,但实际在 alert 嵌套对象中
   - 更新消费者从 alert 对象提取 indicator/value/threshold/level
   - handle_critical_alert_event 增加 severity 参数

3. MEDIUM: 修复 check_indicator 优先匹配最高严重级别
   - 原实现返回第一个匹配的阈值(可能匹配 warning 而非 critical)
   - 改为遍历所有匹配阈值,选择 severity 最高的(critical > warning)

4. MEDIUM: 修复危急值阈值页面不自动加载数据
   - CriticalValueThresholdList 添加 useEffect 初始化加载
This commit is contained in:
iven
2026-05-05 10:11:06 +08:00
parent 2acd9485c7
commit 15b5781dbb
4 changed files with 132 additions and 35 deletions

View File

@@ -1,4 +1,4 @@
import { useState, useCallback } from 'react';
import { useState, useCallback, useEffect } from 'react';
import {
Button, Form, Input, InputNumber, message, Modal, Popconfirm, Result, Select, Space, Switch, Table, Tag,
} from 'antd';
@@ -45,6 +45,10 @@ export default function CriticalValueThresholdList() {
}
}, []);
useEffect(() => {
fetchData();
}, [fetchData]);
const handleCreate = () => {
setEditRecord(null);
form.resetFields();

View File

@@ -73,8 +73,13 @@ pub fn register_handlers(_bus: &EventBus) {
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
pub fn register_handlers_with_state(state: crate::state::HealthState) {
// 收集所有 SubscriptionHandle 并 forget防止函数返回时 handle 被 drop
// 导致 cancel channel 关闭、过滤任务退出、消费者全部失效
let mut _handles: Vec<erp_core::events::SubscriptionHandle> = Vec::new();
// workflow.task.completed → 更新随访任务状态为 completed
let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
let (mut workflow_rx, wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
_handles.push(wf_handle);
let wf_db = state.db.clone();
tokio::spawn(async move {
loop {
@@ -124,7 +129,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// device.readings.synced → 触发告警引擎评估
let (mut reading_rx, _reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string());
let (mut reading_rx, reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string());
_handles.push(reading_handle);
let eval_state = state.clone();
tokio::spawn(async move {
loop {
@@ -158,7 +164,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
// ── P1 事件消费者补全 ──
// alert.triggered → 告警消息通知
let (mut alert_rx, _alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
_handles.push(alert_handle);
let alert_db = state.db.clone();
let alert_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -225,7 +232,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// patient.created → 欢迎消息通知
let (mut patient_rx, _patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
let (mut patient_rx, patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
_handles.push(patient_handle);
let patient_db = state.db.clone();
let patient_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -260,7 +268,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// appointment.created/confirmed/cancelled → 通知 + 号源释放
let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
let (mut appt_rx, appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
_handles.push(appt_handle);
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -325,7 +334,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// follow_up.overdue → 升级通知
let (mut fu_rx, _fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
_handles.push(fu_handle);
let fu_db = state.db.clone();
let fu_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -395,7 +405,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// health_data.critical_alert → 创建危急值告警记录
let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
let (mut critical_rx, critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
_handles.push(critical_handle);
let critical_state = state.clone();
tokio::spawn(async move {
loop {
@@ -409,10 +420,13 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let alert_type = event.payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign");
let metric_name = event.payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown");
let metric_value = event.payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or("");
let threshold_value = event.payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or("");
// alert 数据在嵌套的 "alert" 对象中
let alert_obj = event.payload.get("alert");
let alert_type = "vital_sign";
let metric_name = alert_obj.and_then(|a| a.get("indicator")).and_then(|v| v.as_str()).unwrap_or("unknown");
let metric_value = alert_obj.and_then(|a| a.get("value")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default();
let threshold_value = alert_obj.and_then(|a| a.get("threshold")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default();
let severity = alert_obj.and_then(|a| a.get("level")).and_then(|v| v.as_str()).unwrap_or("critical");
if let Some(pid) = patient_id {
match crate::service::critical_alert_service::handle_critical_alert_event(
@@ -421,9 +435,10 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
pid,
alert_type,
metric_name,
metric_value,
threshold_value,
&metric_value,
&threshold_value,
None,
severity,
).await {
Ok(alert_id) => {
tracing::info!(
@@ -453,7 +468,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// ai.analysis.completed → 通知关联医生
let (mut ai_rx, _ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
_handles.push(ai_handle);
let ai_db = state.db.clone();
let ai_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -534,7 +550,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// ai.analysis.completed → AI→行动闭环消费者行动分发
let (mut ai_action_rx, _ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
let (mut ai_action_rx, ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
_handles.push(ai_action_handle);
let action_db = state.db.clone();
let action_event_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -611,7 +628,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// consent.granted/revoked → 通知关联医生
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
let (mut consent_rx, consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
_handles.push(consent_handle);
let consent_db = state.db.clone();
let consent_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -674,7 +692,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// consultation.opened/new_message → 通知相关方
let (mut consult_rx, _consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string());
let (mut consult_rx, consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string());
_handles.push(consult_handle);
let consult_db = state.db.clone();
let consult_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -753,7 +772,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// follow_up.created → 通知执行人
let (mut fu_created_rx, _fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let (mut fu_created_rx, fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
_handles.push(fu_created_handle);
let fu_created_db = state.db.clone();
let fu_created_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -789,7 +809,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// points.earned/exchanged → 积分变动通知
let (mut points_rx, _points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
_handles.push(points_handle);
let points_db = state.db.clone();
let points_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -871,7 +892,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// lab_report.uploaded → 触发 AI 自动分析
let (mut lab_upload_rx, _lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string());
let (mut lab_upload_rx, lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string());
_handles.push(lab_upload_handle);
let lab_upload_db = state.db.clone();
let lab_upload_bus = state.event_bus.clone();
tokio::spawn(async move {
@@ -928,7 +950,8 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
});
// patient.updated → 审计日志
let (mut patient_update_rx, _patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
let (mut patient_update_rx, patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
_handles.push(patient_update_handle);
let patient_update_db = state.db.clone();
tokio::spawn(async move {
loop {
@@ -946,11 +969,11 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
}
});
}
// ---------------------------------------------------------------------------
// 单元测试
// ---------------------------------------------------------------------------
// 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭
// 所有过滤订阅的生命周期应与进程一致
std::mem::forget(_handles);
}
// 事件处理器本身依赖 tokio::spawn + channel + DB无法纯单元测试。
// 以下测试覆盖:
// 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配)

View File

@@ -19,6 +19,7 @@ pub async fn handle_critical_alert_event(
metric_value: &str,
threshold_value: &str,
created_by: Option<Uuid>,
severity: &str,
) -> HealthResult<Uuid> {
let id = Uuid::now_v7();
let now = Utc::now();
@@ -30,7 +31,7 @@ pub async fn handle_critical_alert_event(
metric_name: Set(metric_name.to_string()),
metric_value: Set(metric_value.to_string()),
threshold_value: Set(threshold_value.to_string()),
severity: Set("critical".to_string()),
severity: Set(severity.to_string()),
status: Set("pending".to_string()),
acknowledged_by: Set(None),
acknowledged_at: Set(None),

View File

@@ -128,12 +128,20 @@ pub(crate) async fn check_vital_signs_alert(
}
/// 根据阈值配置检查单个指标值,匹配则添加到 alerts。
/// 优先匹配 critical 级别,其次 warning。
fn check_indicator(
thresholds: &[crate::entity::critical_value_threshold::Model],
indicator: &str,
value: f64,
alerts: &mut Vec<serde_json::Value>,
) {
let severity_order = |level: &str| match level {
"critical" => 0,
"warning" => 1,
_ => 2,
};
let mut best: Option<&crate::entity::critical_value_threshold::Model> = None;
for t in thresholds {
if t.indicator != indicator {
continue;
@@ -144,14 +152,75 @@ fn check_indicator(
_ => false,
};
if triggered {
alerts.push(serde_json::json!({
"indicator": indicator,
"value": value,
"threshold": t.threshold_value,
"level": t.level,
"direction": t.direction,
}));
return;
let is_better = match best {
None => true,
Some(prev) => severity_order(&t.level) < severity_order(&prev.level),
};
if is_better {
best = Some(t);
}
}
}
if let Some(t) = best {
alerts.push(serde_json::json!({
"indicator": indicator,
"value": value,
"threshold": t.threshold_value,
"level": t.level,
"direction": t.direction,
}));
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_threshold(indicator: &str, direction: &str, value: f64, level: &str) -> crate::entity::critical_value_threshold::Model {
crate::entity::critical_value_threshold::Model {
id: uuid::Uuid::now_v7(),
tenant_id: uuid::Uuid::now_v7(),
indicator: indicator.to_string(),
direction: direction.to_string(),
threshold_value: value,
level: level.to_string(),
department: None,
age_min: None,
age_max: None,
is_active: true,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
created_by: None,
updated_by: None,
deleted_at: None,
version: 1,
}
}
#[test]
fn check_indicator_prefers_critical_over_warning() {
let thresholds = vec![
make_threshold("diastolic_bp", "high", 90.0, "warning"),
make_threshold("diastolic_bp", "high", 110.0, "critical"),
make_threshold("diastolic_bp", "low", 50.0, "critical"),
];
let mut alerts = Vec::new();
check_indicator(&thresholds, "diastolic_bp", 145.0, &mut alerts);
assert_eq!(alerts.len(), 1);
assert_eq!(alerts[0]["level"].as_str(), Some("critical"));
assert_eq!(alerts[0]["threshold"].as_f64(), Some(110.0));
}
#[test]
fn check_indicator_critical_first_in_list() {
let thresholds = vec![
make_threshold("systolic_bp", "high", 180.0, "critical"),
make_threshold("systolic_bp", "high", 140.0, "warning"),
];
let mut alerts = Vec::new();
check_indicator(&thresholds, "systolic_bp", 250.0, &mut alerts);
assert_eq!(alerts.len(), 1);
assert_eq!(alerts[0]["level"].as_str(), Some("critical"));
}
}