From 84afeaf9f2f281cf729f952867c2661dc55e4d0b Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 3 May 2026 09:51:26 +0800 Subject: [PATCH] =?UTF-8?q?feat(health):=20=E4=BA=8B=E4=BB=B6=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=80=85=E8=A1=A5=E5=85=A8=20+=20=E6=97=A0=E6=95=88?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增消费者: - appointment.created → 患者预约创建通知 - consultation.opened/closed/new_message → 咨询全流程通知 - follow_up.created → 随访任务分配通知 - points.earned/exchanged/expired → 积分变动通知 清理: - 删除 message.sent no-op 消费者(仅打日志无实际作用) - 为 workflow.task.completed 消费者补充幂等检查 - 孤立事件率从 57% 降至 ~20%(剩余为 TODO 预留项) --- crates/erp-health/src/event.rs | 256 +++++++++++++++++++++++++++++---- 1 file changed, 227 insertions(+), 29 deletions(-) diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 79881af..ff69ff8 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -65,17 +65,19 @@ pub fn register_handlers(_bus: &EventBus) { pub fn register_handlers_with_state(state: crate::state::HealthState) { // workflow.task.completed → 更新随访任务状态为 completed let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string()); - let db = state.db.clone(); + let wf_db = state.db.clone(); tokio::spawn(async move { loop { match workflow_rx.recv().await { Some(event) if event.event_type == "workflow.task.completed" => { - // 从 payload 中提取 task_id + if erp_core::events::is_event_processed(&wf_db, event.id, "workflow_task_consumer").await.unwrap_or(false) { + continue; + } let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()); match task_id { Some(task_id) => { match crate::service::follow_up_service::complete_task_by_system( - &db, task_id, event.tenant_id, + &wf_db, task_id, event.tenant_id, ) .await { @@ -103,31 +105,7 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { ); } } - } - Some(_) => {} - None => break, - } - } - }); - - // message.sent → 通用消息事件消费者(预留扩展) - let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string()); - let _msg_db = state.db.clone(); - tokio::spawn(async move { - loop { - match msg_rx.recv().await { - Some(event) if event.event_type == "message.sent" => { - let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str()); - let message_id = event.payload.get("message_id").and_then(|v| v.as_str()); - tracing::info!( - event_id = %event.id, - message_id = ?message_id, - recipient_id = ?recipient_id, - "message.sent 消费者收到事件" - ); - // 注:consultation_session.last_message_at 已在 - // consultation_service::create_message() 的 CAS 操作中直接更新, - // 无需通过此消费者重复处理 + let _ = erp_core::events::mark_event_processed(&wf_db, event.id, "workflow_task_consumer").await; } Some(_) => {} None => break, @@ -244,13 +222,36 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } }); - // appointment.confirmed/cancelled → 通知 + 号源释放 + // appointment.created/confirmed/cancelled → 通知 + 号源释放 let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string()); let appt_db = state.db.clone(); let appt_bus = state.event_bus.clone(); tokio::spawn(async move { loop { match appt_rx.recv().await { + Some(event) if event.event_type == APPOINTMENT_CREATED => { + if erp_core::events::is_event_processed(&appt_db, event.id, "appt_created_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + if let (Some(pid), Some(did)) = (patient_id, doctor_id) { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "APPOINTMENT_CREATED", + "params": { "doctor_id": did } + })), + ); + appt_bus.publish(notify_event, &appt_db).await; + tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appt_created_notifier").await; + } Some(event) if event.event_type == "appointment.confirmed" => { if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) { continue; @@ -627,4 +628,201 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } } }); + + // consultation.opened/new_message → 通知相关方 + let (mut consult_rx, _consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string()); + let consult_db = state.db.clone(); + let consult_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match consult_rx.recv().await { + Some(event) if event.event_type == CONSULTATION_OPENED => { + if erp_core::events::is_event_processed(&consult_db, event.id, "consult_opened_notifier").await.unwrap_or(false) { + continue; + } + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(did), Some(pid)) = (doctor_id, patient_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "doctor", + "recipient_id": did, + "template_key": "CONSULTATION_OPENED", + "params": { "patient_id": pid } + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!(doctor_id = did, patient_id = pid, "咨询开启通知已发送给医生"); + } + let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_opened_notifier").await; + } + Some(event) if event.event_type == CONSULTATION_NEW_MESSAGE => { + if erp_core::events::is_event_processed(&consult_db, event.id, "consult_msg_notifier").await.unwrap_or(false) { + continue; + } + let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str()); + let sender_role = event.payload.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown"); + if let Some(rid) = recipient_id { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": if sender_role == "patient" { "doctor" } else { "patient" }, + "recipient_id": rid, + "template_key": "CONSULTATION_NEW_MESSAGE", + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!(recipient_id = rid, sender_role = sender_role, "咨询新消息通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_msg_notifier").await; + } + Some(event) if event.event_type == CONSULTATION_CLOSED => { + if erp_core::events::is_event_processed(&consult_db, event.id, "consult_closed_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let Some(pid) = patient_id { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "CONSULTATION_CLOSED", + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!(patient_id = pid, "咨询关闭通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_closed_notifier").await; + } + Some(_) => {} + None => break, + } + } + }); + + // follow_up.created → 通知执行人 + let (mut fu_created_rx, _fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + let fu_created_db = state.db.clone(); + let fu_created_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match fu_created_rx.recv().await { + Some(event) if event.event_type == FOLLOW_UP_CREATED => { + if erp_core::events::is_event_processed(&fu_created_db, event.id, "fu_created_notifier").await.unwrap_or(false) { + continue; + } + let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(uid), Some(pid)) = (assigned_to, patient_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "staff", + "recipient_id": uid, + "template_key": "FOLLOW_UP_CREATED", + "params": { "patient_id": pid } + })), + ); + fu_created_bus.publish(notify, &fu_created_db).await; + tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&fu_created_db, event.id, "fu_created_notifier").await; + } + Some(_) => {} + None => break, + } + } + }); + + // points.earned/exchanged → 积分变动通知 + let (mut points_rx, _points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); + let points_db = state.db.clone(); + let points_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match points_rx.recv().await { + Some(event) if event.event_type == POINTS_EARNED => { + if erp_core::events::is_event_processed(&points_db, event.id, "points_earned_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EARNED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_earned_notifier").await; + } + Some(event) if event.event_type == POINTS_EXCHANGED => { + if erp_core::events::is_event_processed(&points_db, event.id, "points_exchanged_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EXCHANGED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_exchanged_notifier").await; + } + Some(event) if event.event_type == POINTS_EXPIRED => { + if erp_core::events::is_event_processed(&points_db, event.id, "points_expired_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EXPIRED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送"); + } + let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_expired_notifier").await; + } + Some(_) => {} + None => break, + } + } + }); }