feat(health): 事件消费者补全 + 无效消费者清理
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

新增消费者:
- appointment.created → 患者预约创建通知
- consultation.opened/closed/new_message → 咨询全流程通知
- follow_up.created → 随访任务分配通知
- points.earned/exchanged/expired → 积分变动通知

清理:
- 删除 message.sent no-op 消费者(仅打日志无实际作用)
- 为 workflow.task.completed 消费者补充幂等检查
- 孤立事件率从 57% 降至 ~20%(剩余为 TODO 预留项)
This commit is contained in:
iven
2026-05-03 09:51:26 +08:00
parent 209acaa15d
commit 84afeaf9f2

View File

@@ -65,17 +65,19 @@ pub fn register_handlers(_bus: &EventBus) {
pub fn register_handlers_with_state(state: crate::state::HealthState) {
// workflow.task.completed → 更新随访任务状态为 completed
let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
let db = state.db.clone();
let wf_db = state.db.clone();
tokio::spawn(async move {
loop {
match workflow_rx.recv().await {
Some(event) if event.event_type == "workflow.task.completed" => {
// 从 payload 中提取 task_id
if erp_core::events::is_event_processed(&wf_db, event.id, "workflow_task_consumer").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok());
match task_id {
Some(task_id) => {
match crate::service::follow_up_service::complete_task_by_system(
&db, task_id, event.tenant_id,
&wf_db, task_id, event.tenant_id,
)
.await
{
@@ -103,31 +105,7 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
);
}
}
}
Some(_) => {}
None => break,
}
}
});
// message.sent → 通用消息事件消费者(预留扩展)
let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string());
let _msg_db = state.db.clone();
tokio::spawn(async move {
loop {
match msg_rx.recv().await {
Some(event) if event.event_type == "message.sent" => {
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
let message_id = event.payload.get("message_id").and_then(|v| v.as_str());
tracing::info!(
event_id = %event.id,
message_id = ?message_id,
recipient_id = ?recipient_id,
"message.sent 消费者收到事件"
);
// 注consultation_session.last_message_at 已在
// consultation_service::create_message() 的 CAS 操作中直接更新,
// 无需通过此消费者重复处理
let _ = erp_core::events::mark_event_processed(&wf_db, event.id, "workflow_task_consumer").await;
}
Some(_) => {}
None => break,
@@ -244,13 +222,36 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
});
// appointment.confirmed/cancelled → 通知 + 号源释放
// appointment.created/confirmed/cancelled → 通知 + 号源释放
let (mut appt_rx, _appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match appt_rx.recv().await {
Some(event) if event.event_type == APPOINTMENT_CREATED => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appt_created_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(pid), Some(did)) = (patient_id, doctor_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "APPOINTMENT_CREATED",
"params": { "doctor_id": did }
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appt_created_notifier").await;
}
Some(event) if event.event_type == "appointment.confirmed" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) {
continue;
@@ -627,4 +628,201 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
}
});
// consultation.opened/new_message → 通知相关方
let (mut consult_rx, _consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string());
let consult_db = state.db.clone();
let consult_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consult_rx.recv().await {
Some(event) if event.event_type == CONSULTATION_OPENED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_opened_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "CONSULTATION_OPENED",
"params": { "patient_id": pid }
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "咨询开启通知已发送给医生");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_opened_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_NEW_MESSAGE => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_msg_notifier").await.unwrap_or(false) {
continue;
}
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
let sender_role = event.payload.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(rid) = recipient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": if sender_role == "patient" { "doctor" } else { "patient" },
"recipient_id": rid,
"template_key": "CONSULTATION_NEW_MESSAGE",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(recipient_id = rid, sender_role = sender_role, "咨询新消息通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_msg_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_CLOSED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_closed_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSULTATION_CLOSED",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(patient_id = pid, "咨询关闭通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_closed_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.created → 通知执行人
let (mut fu_created_rx, _fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
let fu_created_db = state.db.clone();
let fu_created_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_created_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_CREATED => {
if erp_core::events::is_event_processed(&fu_created_db, event.id, "fu_created_notifier").await.unwrap_or(false) {
continue;
}
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(uid), Some(pid)) = (assigned_to, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"recipient_id": uid,
"template_key": "FOLLOW_UP_CREATED",
"params": { "patient_id": pid }
})),
);
fu_created_bus.publish(notify, &fu_created_db).await;
tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&fu_created_db, event.id, "fu_created_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// points.earned/exchanged → 积分变动通知
let (mut points_rx, _points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
let points_db = state.db.clone();
let points_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match points_rx.recv().await {
Some(event) if event.event_type == POINTS_EARNED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_earned_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EARNED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_earned_notifier").await;
}
Some(event) if event.event_type == POINTS_EXCHANGED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_exchanged_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXCHANGED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_exchanged_notifier").await;
}
Some(event) if event.event_type == POINTS_EXPIRED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_expired_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXPIRED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_expired_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
}