/// points.earned/exchanged/expired → 积分变动通知 /// daily_monitoring.created → 健康数据上报积分 /// lab_report.uploaded → 化验报告上传积分 /// follow_up.completed → 随访完成积分 pub fn spawn(state: &crate::state::HealthState) -> Vec { let mut handles = Vec::new(); let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); handles.push(points_handle); let points_db = state.db.clone(); let points_bus = state.event_bus.clone(); tokio::spawn(async move { loop { match points_rx.recv().await { Some(event) if event.event_type == super::POINTS_EARNED => { if erp_core::events::is_event_processed( &points_db, event.id, "points_earned_notifier", ) .await .unwrap_or(false) { continue; } let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); let amount = event.payload.get("amount").and_then(|v| v.as_u64()); if let (Some(pid), Some(amt)) = (patient_id, amount) { let notify = erp_core::events::DomainEvent::new( "message.send", event.tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "channel": "in_app", "recipient_type": "patient", "recipient_id": pid, "template_key": "POINTS_EARNED", "params": { "amount": amt } })), ); points_bus.publish(notify, &points_db).await; tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送"); } let _ = erp_core::events::mark_event_processed( &points_db, event.id, "points_earned_notifier", ) .await; } Some(event) if event.event_type == super::POINTS_EXCHANGED => { if erp_core::events::is_event_processed( &points_db, event.id, "points_exchanged_notifier", ) .await .unwrap_or(false) { continue; } let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); let amount = event.payload.get("amount").and_then(|v| v.as_u64()); if let (Some(pid), Some(amt)) = (patient_id, amount) { let notify = erp_core::events::DomainEvent::new( "message.send", event.tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "channel": "in_app", "recipient_type": "patient", "recipient_id": pid, "template_key": "POINTS_EXCHANGED", "params": { "amount": amt } })), ); points_bus.publish(notify, &points_db).await; tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送"); } let _ = erp_core::events::mark_event_processed( &points_db, event.id, "points_exchanged_notifier", ) .await; } Some(event) if event.event_type == super::POINTS_EXPIRED => { if erp_core::events::is_event_processed( &points_db, event.id, "points_expired_notifier", ) .await .unwrap_or(false) { continue; } let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); let amount = event.payload.get("amount").and_then(|v| v.as_u64()); if let (Some(pid), Some(amt)) = (patient_id, amount) { let notify = erp_core::events::DomainEvent::new( "message.send", event.tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "channel": "in_app", "recipient_type": "patient", "recipient_id": pid, "template_key": "POINTS_EXPIRED", "params": { "amount": amt } })), ); points_bus.publish(notify, &points_db).await; tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送"); } let _ = erp_core::events::mark_event_processed( &points_db, event.id, "points_expired_notifier", ) .await; } Some(_) => {} None => break, } } }); // daily_monitoring.created → 健康数据上报积分 let (mut dm_rx, dm_handle) = state .event_bus .subscribe_filtered("daily_monitoring.".to_string()); handles.push(dm_handle); let dm_state = state.clone(); tokio::spawn(async move { loop { match dm_rx.recv().await { Some(event) if event.event_type == super::DAILY_MONITORING_CREATED => { if erp_core::events::is_event_processed( &dm_state.db, event.id, "daily_monitoring_points", ) .await .unwrap_or(false) { continue; } let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { match crate::service::points_service::earn_points( &dm_state, event.tenant_id, pid, "health_data_report", None, ) .await { Ok(tx) => { tracing::info!( patient_id = %pid, points = tx.amount, "健康数据上报积分已发放" ); } Err(e) => { // 无匹配规则时不告警(租户可能未配置该规则) let err_str = e.to_string(); if !err_str.contains("无匹配的积分规则") { tracing::warn!( patient_id = %pid, error = %e, "健康数据上报积分发放失败" ); } } } } let _ = erp_core::events::mark_event_processed( &dm_state.db, event.id, "daily_monitoring_points", ) .await; } Some(_) => {} None => break, } } }); // lab_report.uploaded → 化验报告上传积分 let (mut lr_rx, lr_handle) = state .event_bus .subscribe_filtered("lab_report.".to_string()); handles.push(lr_handle); let lr_state = state.clone(); tokio::spawn(async move { loop { match lr_rx.recv().await { Some(event) if event.event_type == super::LAB_REPORT_UPLOADED => { if erp_core::events::is_event_processed( &lr_state.db, event.id, "lab_report_points", ) .await .unwrap_or(false) { continue; } let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { match crate::service::points_service::earn_points( &lr_state, event.tenant_id, pid, "lab_report_upload", None, ) .await { Ok(tx) => { tracing::info!( patient_id = %pid, points = tx.amount, "化验报告上传积分已发放" ); } Err(e) => { let err_str = e.to_string(); if !err_str.contains("无匹配的积分规则") { tracing::warn!( patient_id = %pid, error = %e, "化验报告上传积分发放失败" ); } } } } let _ = erp_core::events::mark_event_processed( &lr_state.db, event.id, "lab_report_points", ) .await; } Some(_) => {} None => break, } } }); // follow_up.completed → 随访完成积分 let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); handles.push(fu_handle); let fu_state = state.clone(); tokio::spawn(async move { loop { match fu_rx.recv().await { Some(event) if event.event_type == super::FOLLOW_UP_COMPLETED => { if erp_core::events::is_event_processed( &fu_state.db, event.id, "follow_up_points", ) .await .unwrap_or(false) { continue; } let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { match crate::service::points_service::earn_points( &fu_state, event.tenant_id, pid, "follow_up_completion", None, ) .await { Ok(tx) => { tracing::info!( patient_id = %pid, points = tx.amount, "随访完成积分已发放" ); } Err(e) => { let err_str = e.to_string(); if !err_str.contains("无匹配的积分规则") { tracing::warn!( patient_id = %pid, error = %e, "随访完成积分发放失败" ); } } } } let _ = erp_core::events::mark_event_processed( &fu_state.db, event.id, "follow_up_points", ) .await; } Some(_) => {} None => break, } } }); handles }