feat(health): P1 事件消费者补全 — patient/appointment/follow_up

- patient.created → 发布欢迎消息事件(message.send 模板通知)
- appointment.confirmed → 通知医生预约确认
- appointment.cancelled → 号源释放标记
- follow_up.overdue → 逾期随访升级通知
- 所有消费者含幂等检查(processed_events 表)
This commit is contained in:
iven
2026-04-28 12:17:54 +08:00
parent 147fd886e3
commit e00c2abdcd

View File

@@ -157,6 +157,120 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
});
// ── P1 事件消费者补全 ──
// patient.created → 欢迎消息通知
let (mut patient_rx, _patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
let patient_db = state.db.clone();
let patient_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match patient_rx.recv().await {
Some(event) if event.event_type == PATIENT_CREATED => {
if erp_core::events::is_event_processed(&patient_db, event.id, "patient_welcome").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
let welcome_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "patient_welcome",
"recipient_type": "patient",
"recipient_id": pid,
})),
);
patient_bus.publish(welcome_event, &patient_db).await;
tracing::info!(patient_id = %pid, "新患者欢迎流程触发");
}
let _ = erp_core::events::mark_event_processed(&patient_db, event.id, "patient_welcome").await;
}
Some(_) => {}
None => break,
}
}
});
// appointment.confirmed/cancelled → 通知 + 号源释放
let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match appt_rx.recv().await {
Some(event) if event.event_type == "appointment.confirmed" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "appointment_confirmed",
"recipient_type": "doctor",
"recipient_id": did,
"patient_id": pid,
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_notifier").await;
}
Some(event) if event.event_type == "appointment.cancelled" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_cancel_handler").await.unwrap_or(false) {
continue;
}
tracing::info!(event_id = %event.id, "预约取消,号源释放");
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_cancel_handler").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.overdue → 升级通知
let (mut fu_rx, _fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let fu_db = state.db.clone();
let fu_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_OVERDUE => {
if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str());
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
if let (Some(tid), Some(uid)) = (task_id, assigned_to) {
let escalate_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "follow_up_overdue",
"recipient_type": "staff",
"recipient_id": uid,
"task_id": tid,
})),
);
fu_bus.publish(escalate_event, &fu_db).await;
tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知");
}
let _ = erp_core::events::mark_event_processed(&fu_db, event.id, "follow_up_escalator").await;
}
Some(_) => {}
None => break,
}
}
});
// health_data.critical_alert → 创建危急值告警记录
let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
let critical_state = state.clone();