From e00c2abdcd3d559d3f06889cf24ca6958030145b Mon Sep 17 00:00:00 2001 From: iven Date: Tue, 28 Apr 2026 12:17:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(health):=20P1=20=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E8=A1=A5=E5=85=A8=20=E2=80=94=20pat?= =?UTF-8?q?ient/appointment/follow=5Fup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - patient.created → 发布欢迎消息事件(message.send 模板通知) - appointment.confirmed → 通知医生预约确认 - appointment.cancelled → 号源释放标记 - follow_up.overdue → 逾期随访升级通知 - 所有消费者含幂等检查(processed_events 表) --- crates/erp-health/src/event.rs | 114 +++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 4de0f07..90339ec 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -157,6 +157,120 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } }); + // ── P1 事件消费者补全 ── + + // patient.created → 欢迎消息通知 + let (mut patient_rx, _patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string()); + let patient_db = state.db.clone(); + let patient_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match patient_rx.recv().await { + Some(event) if event.event_type == PATIENT_CREATED => { + if erp_core::events::is_event_processed(&patient_db, event.id, "patient_welcome").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + if let Some(pid) = patient_id { + let welcome_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "patient_welcome", + "recipient_type": "patient", + "recipient_id": pid, + })), + ); + patient_bus.publish(welcome_event, &patient_db).await; + tracing::info!(patient_id = %pid, "新患者欢迎流程触发"); + } + let _ = erp_core::events::mark_event_processed(&patient_db, event.id, "patient_welcome").await; + } + Some(_) => {} + None => break, + } + } + }); + + // appointment.confirmed/cancelled → 通知 + 号源释放 + let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string()); + let appt_db = state.db.clone(); + let appt_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match appt_rx.recv().await { + Some(event) if event.event_type == "appointment.confirmed" => { + if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) { + continue; + } + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(did), Some(pid)) = (doctor_id, patient_id) { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "appointment_confirmed", + "recipient_type": "doctor", + "recipient_id": did, + "patient_id": pid, + })), + ); + appt_bus.publish(notify_event, &appt_db).await; + tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发"); + } + let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_notifier").await; + } + Some(event) if event.event_type == "appointment.cancelled" => { + if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_cancel_handler").await.unwrap_or(false) { + continue; + } + tracing::info!(event_id = %event.id, "预约取消,号源释放"); + let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_cancel_handler").await; + } + Some(_) => {} + None => break, + } + } + }); + + // follow_up.overdue → 升级通知 + let (mut fu_rx, _fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + let fu_db = state.db.clone(); + let fu_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match fu_rx.recv().await { + Some(event) if event.event_type == FOLLOW_UP_OVERDUE => { + if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator").await.unwrap_or(false) { + continue; + } + let task_id = event.payload.get("task_id").and_then(|v| v.as_str()); + let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); + if let (Some(tid), Some(uid)) = (task_id, assigned_to) { + let escalate_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "follow_up_overdue", + "recipient_type": "staff", + "recipient_id": uid, + "task_id": tid, + })), + ); + fu_bus.publish(escalate_event, &fu_db).await; + tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知"); + } + let _ = erp_core::events::mark_event_processed(&fu_db, event.id, "follow_up_escalator").await; + } + Some(_) => {} + None => break, + } + } + }); + // health_data.critical_alert → 创建危急值告警记录 let (mut critical_rx, _critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string()); let critical_state = state.clone();