Files
hms/crates/erp-health/src/event/alert.rs
iven fa1dc764a3 feat(health+ai): P2 咨询联动 + AI 巡检消费 — 全链路打通
业务链路打通 5/5 断点全部完成:
- 咨询→随访:医生端新增"创建随访"按钮,从咨询会话直接创建随访任务
- 咨询→AI:医生端新增"AI 分析"按钮,对咨询上下文触发 AI 分析
- 告警→咨询:小程序告警详情页新增"在线咨询"快捷入口
- AI 巡检消费:erp-ai 新增 patrol_consumer,订阅 ai.patrol.requested 事件
- 前端联动:Web ConsultationDetail + 小程序 alerts 页面联动实现

后端:2 新 API + 2 handler + 1 service + AI event consumer
前端:Web 2 API + 1 页面改造 + 小程序 2 页面改造
测试:Web consultations.test.ts 9/9 通过
2026-05-20 17:50:49 +08:00

152 lines
7.1 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
/// alert.triggered → 告警消息通知 + 告警聚合
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
// alert.triggered → 告警消息通知
let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
handles.push(alert_handle);
let alert_db = state.db.clone();
let alert_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match alert_rx.recv().await {
Some(event) if event.event_type == super::ALERT_TRIGGERED => {
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier")
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let severity = event
.payload
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("warning");
let rule_name = event
.payload
.get("rule_name")
.and_then(|v| v.as_str())
.unwrap_or("健康告警");
if let Some(pid) = patient_id {
// 检查患者是否有活跃咨询会话active / waiting
let patient_uuid = uuid::Uuid::parse_str(pid).ok();
let active_session = if let Some(puid) = patient_uuid {
crate::entity::consultation_session::Entity::find()
.filter(
crate::entity::consultation_session::Column::PatientId.eq(puid),
)
.filter(
crate::entity::consultation_session::Column::TenantId
.eq(event.tenant_id),
)
.filter(
crate::entity::consultation_session::Column::DeletedAt
.is_null(),
)
.filter(
sea_orm::Condition::any()
.add(
crate::entity::consultation_session::Column::Status
.eq("active"),
)
.add(
crate::entity::consultation_session::Column::Status
.eq("waiting"),
),
)
.one(&alert_db)
.await
.ok()
.flatten()
} else {
None
};
let consultation_session_id =
active_session.as_ref().map(|s| s.id.to_string());
let mut params = serde_json::json!({
"rule_name": rule_name,
"severity": severity,
"suggested_action": "consult",
});
if let Some(ref sid) = consultation_session_id {
params["consultation_session_id"] = serde_json::json!(sid);
}
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" },
"params": params,
})),
);
alert_bus.publish(notify_event, &alert_db).await;
tracing::info!(
patient_id = %pid,
severity = %severity,
consultation_session_id = ?consultation_session_id,
"告警通知已发送(含咨询联动建议)"
);
}
let _ = erp_core::events::mark_event_processed(
&alert_db,
event.id,
"alert_notifier",
)
.await;
}
Some(event) if event.event_type == super::ALERT_TRIGGERED => {
// 被抑制的告警 → 发布聚合事件
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator")
.await
.unwrap_or(false)
{
continue;
}
let is_suppressed = event
.payload
.get("suppressed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_suppressed {
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let aggregated_event = erp_core::events::DomainEvent::new(
super::ALERT_AGGREGATED,
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()),
"severity": event.payload.get("severity"),
})),
);
alert_bus.publish(aggregated_event, &alert_db).await;
tracing::info!(patient_id = %pid, "告警聚合事件已发布");
}
}
let _ = erp_core::events::mark_event_processed(
&alert_db,
event.id,
"alert_aggregator",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}