diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs deleted file mode 100644 index 88cda7a..0000000 --- a/crates/erp-health/src/event.rs +++ /dev/null @@ -1,2871 +0,0 @@ -use erp_core::events::EventBus; -use uuid::Uuid; - -// --------------------------------------------------------------------------- -// 事件类型常量 — 集中管理,避免硬编码字符串散布在 service 层 -// --------------------------------------------------------------------------- - -// 预约 -pub const APPOINTMENT_CREATED: &str = "appointment.created"; -// appointment.confirmed / appointment.cancelled 等 — 动态拼接 - -// 告警 -pub const ALERT_TRIGGERED: &str = "alert.triggered"; -pub const ALERT_AGGREGATED: &str = "alert.aggregated"; - -// 知情同意 -pub const CONSENT_GRANTED: &str = "consent.granted"; -pub const CONSENT_REVOKED: &str = "consent.revoked"; - -// 文章 -pub const ARTICLE_PUBLISHED: &str = "article.published"; -pub const ARTICLE_REJECTED: &str = "article.rejected"; - -// 咨询 -pub const CONSULTATION_OPENED: &str = "consultation.opened"; -pub const CONSULTATION_CLOSED: &str = "consultation.closed"; -pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message"; - -// 设备数据 -pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced"; - -// 医生 -pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed"; - -// 随访 -pub const FOLLOW_UP_CREATED: &str = "follow_up.created"; -pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed"; -pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue"; - -// 日常监测 -pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created"; - -// 健康数据 -pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded"; -pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed"; -pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert"; - -// 患者 -pub const PATIENT_CREATED: &str = "patient.created"; -pub const PATIENT_UPDATED: &str = "patient.updated"; -// TODO: 以下常量对应的患者认证和死亡记录流程尚未实现,待后续迭代 -pub const PATIENT_VERIFIED: &str = "patient.verified"; -pub const PATIENT_DECEASED: &str = "patient.deceased"; - -// 积分 -pub const POINTS_EXPIRED: &str = "points.expired"; -pub const POINTS_EARNED: &str = "points.earned"; -pub const POINTS_EXCHANGED: &str = "points.exchanged"; - -// 护理计划 -pub const CARE_PLAN_CREATED: &str = "care_plan.created"; -pub const CARE_PLAN_UPDATED: &str = "care_plan.updated"; -pub const CARE_PLAN_ACTIVATED: &str = "care_plan.activated"; -pub const CARE_PLAN_COMPLETED: &str = "care_plan.completed"; - -// 关怀行动 -pub const CARE_ACTION_PERFORMED: &str = "care.action.performed"; - -/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup) -pub fn register_handlers(_bus: &EventBus) { - // 事件处理器已迁移到 on_startup → register_handlers_with_state -} - -/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用 -pub fn register_handlers_with_state(state: crate::state::HealthState) { - // 收集所有 SubscriptionHandle 并 forget,防止函数返回时 handle 被 drop - // 导致 cancel channel 关闭、过滤任务退出、消费者全部失效 - let mut _handles: Vec = Vec::new(); - - // workflow.task.completed → 更新随访任务状态为 completed - let (mut workflow_rx, wf_handle) = state - .event_bus - .subscribe_filtered("workflow.task.".to_string()); - _handles.push(wf_handle); - let wf_db = state.db.clone(); - tokio::spawn(async move { - loop { - match workflow_rx.recv().await { - Some(event) if event.event_type == "workflow.task.completed" => { - if erp_core::events::is_event_processed( - &wf_db, - event.id, - "workflow_task_consumer", - ) - .await - .unwrap_or(false) - { - continue; - } - let task_id = event - .payload - .get("task_id") - .and_then(|v| v.as_str()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - match task_id { - Some(task_id) => { - match crate::service::follow_up_service::complete_task_by_system( - &wf_db, - task_id, - event.tenant_id, - ) - .await - { - Ok(()) => { - tracing::info!( - event_id = %event.id, - task_id = %task_id, - "工作流任务完成 → 随访任务已更新" - ); - } - Err(e) => { - tracing::warn!( - event_id = %event.id, - task_id = %task_id, - error = %e, - "工作流任务完成 → 随访任务更新失败" - ); - } - } - } - None => { - tracing::warn!( - event_id = %event.id, - "工作流任务完成事件缺少 task_id,跳过" - ); - } - } - let _ = erp_core::events::mark_event_processed( - &wf_db, - event.id, - "workflow_task_consumer", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // device.readings.synced → 触发告警引擎评估 - let (mut reading_rx, reading_handle) = state - .event_bus - .subscribe_filtered("device.readings.".to_string()); - _handles.push(reading_handle); - let eval_state = state.clone(); - tokio::spawn(async move { - loop { - match reading_rx.recv().await { - Some(event) if event.event_type == DEVICE_READINGS_SYNCED => { - let patient_id = event - .payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - if let Some(pid) = patient_id { - // 对所有设备类型触发评估 - for device_type in &[ - "heart_rate", - "blood_oxygen", - "temperature", - "blood_pressure", - "blood_glucose", - ] { - if let Err(e) = crate::service::alert_engine::evaluate_rules( - &eval_state, - event.tenant_id, - pid, - device_type, - ) - .await - { - tracing::error!( - patient_id = %pid, - device_type = device_type, - error = %e, - "告警评估失败" - ); - } - } - } - } - Some(_) => {} - None => break, - } - } - }); - - // ── P1 事件消费者补全 ── - - // alert.triggered → 告警消息通知 - let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string()); - _handles.push(alert_handle); - let alert_db = state.db.clone(); - let alert_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match alert_rx.recv().await { - Some(event) if event.event_type == ALERT_TRIGGERED => { - if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier") - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let severity = event - .payload - .get("severity") - .and_then(|v| v.as_str()) - .unwrap_or("warning"); - let rule_name = event - .payload - .get("rule_name") - .and_then(|v| v.as_str()) - .unwrap_or("健康告警"); - if let Some(pid) = patient_id { - let notify_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" }, - "params": { - "rule_name": rule_name, - "severity": severity, - } - })), - ); - alert_bus.publish(notify_event, &alert_db).await; - tracing::info!(patient_id = %pid, severity = %severity, "告警通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &alert_db, - event.id, - "alert_notifier", - ) - .await; - } - Some(event) if event.event_type == ALERT_TRIGGERED => { - // 被抑制的告警 → 发布聚合事件 - if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator") - .await - .unwrap_or(false) - { - continue; - } - let is_suppressed = event - .payload - .get("suppressed") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - - if is_suppressed { - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let Some(pid) = patient_id { - let aggregated_event = erp_core::events::DomainEvent::new( - ALERT_AGGREGATED, - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "patient_id": pid, - "triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()), - "severity": event.payload.get("severity"), - })), - ); - alert_bus.publish(aggregated_event, &alert_db).await; - tracing::info!(patient_id = %pid, "告警聚合事件已发布"); - } - } - let _ = erp_core::events::mark_event_processed( - &alert_db, - event.id, - "alert_aggregator", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // patient.created → 欢迎消息通知 - let (mut patient_rx, patient_handle) = - state.event_bus.subscribe_filtered("patient.".to_string()); - _handles.push(patient_handle); - let patient_db = state.db.clone(); - let patient_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match patient_rx.recv().await { - Some(event) if event.event_type == PATIENT_CREATED => { - if erp_core::events::is_event_processed( - &patient_db, - event.id, - "patient_welcome", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event - .payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - if let Some(pid) = patient_id { - let welcome_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "template": "patient_welcome", - "recipient_type": "patient", - "recipient_id": pid, - })), - ); - patient_bus.publish(welcome_event, &patient_db).await; - tracing::info!(patient_id = %pid, "新患者欢迎流程触发"); - } - let _ = erp_core::events::mark_event_processed( - &patient_db, - event.id, - "patient_welcome", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // appointment.created/confirmed/cancelled → 通知 + 号源释放 - let (mut appt_rx, appt_handle) = state - .event_bus - .subscribe_filtered("appointment.".to_string()); - _handles.push(appt_handle); - let appt_db = state.db.clone(); - let appt_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match appt_rx.recv().await { - Some(event) if event.event_type == APPOINTMENT_CREATED => { - if erp_core::events::is_event_processed( - &appt_db, - event.id, - "appt_created_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); - if let (Some(pid), Some(did)) = (patient_id, doctor_id) { - let notify_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "APPOINTMENT_CREATED", - "params": { "doctor_id": did } - })), - ); - appt_bus.publish(notify_event, &appt_db).await; - tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &appt_db, - event.id, - "appt_created_notifier", - ) - .await; - } - Some(event) if event.event_type == "appointment.confirmed" => { - if erp_core::events::is_event_processed( - &appt_db, - event.id, - "appointment_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let (Some(did), Some(pid)) = (doctor_id, patient_id) { - let notify_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "template": "appointment_confirmed", - "recipient_type": "doctor", - "recipient_id": did, - "patient_id": pid, - })), - ); - appt_bus.publish(notify_event, &appt_db).await; - tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发"); - } - let _ = erp_core::events::mark_event_processed( - &appt_db, - event.id, - "appointment_notifier", - ) - .await; - } - Some(event) if event.event_type == "appointment.cancelled" => { - if erp_core::events::is_event_processed( - &appt_db, - event.id, - "appointment_cancel_handler", - ) - .await - .unwrap_or(false) - { - continue; - } - tracing::info!(event_id = %event.id, "预约取消,号源释放"); - let _ = erp_core::events::mark_event_processed( - &appt_db, - event.id, - "appointment_cancel_handler", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // follow_up.overdue → 升级通知 - let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); - _handles.push(fu_handle); - let fu_db = state.db.clone(); - let fu_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match fu_rx.recv().await { - Some(event) if event.event_type == FOLLOW_UP_OVERDUE => { - if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator") - .await - .unwrap_or(false) - { - continue; - } - let task_id = event.payload.get("task_id").and_then(|v| v.as_str()); - let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); - if let (Some(tid), Some(uid)) = (task_id, assigned_to) { - let escalate_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "template": "follow_up_overdue", - "recipient_type": "staff", - "recipient_id": uid, - "task_id": tid, - })), - ); - fu_bus.publish(escalate_event, &fu_db).await; - tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知"); - } - let _ = erp_core::events::mark_event_processed( - &fu_db, - event.id, - "follow_up_escalator", - ) - .await; - } - Some(event) if event.event_type == FOLLOW_UP_COMPLETED => { - // 随访完成 → 检查是否由 AI 触发,触发再分析 - if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str()) - && let Ok(task_id) = uuid::Uuid::parse_str(task_id_str) - { - let patient_id = event - .payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - - if let Some(patient_id) = patient_id { - // 通过 ai_suggestion_loader 查找关联的 AI 建议 - if let Some(suggestion_id) = - crate::service::ai_suggestion_loader::find_by_followup_task( - &fu_db, - event.tenant_id, - task_id, - ) - .await - .unwrap_or(None) - { - let reanalysis_event = erp_core::events::DomainEvent::new( - "ai.reanalysis.requested", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "original_suggestion_id": suggestion_id.to_string(), - "patient_id": patient_id.to_string(), - "followup_task_id": task_id_str, - "trigger": "loop_closure", - })), - ); - fu_bus.publish(reanalysis_event, &fu_db).await; - tracing::info!( - suggestion_id = %suggestion_id, - patient_id = %patient_id, - task_id = %task_id, - "随访完成,触发 AI 再分析(闭环)" - ); - } - } - } - } - Some(_) => {} - None => break, - } - } - }); - - // health_data.critical_alert → 创建危急值告警记录 - let (mut critical_rx, critical_handle) = state - .event_bus - .subscribe_filtered("health_data.".to_string()); - _handles.push(critical_handle); - let critical_state = state.clone(); - tokio::spawn(async move { - loop { - match critical_rx.recv().await { - Some(event) if event.event_type == HEALTH_DATA_CRITICAL_ALERT => { - // 幂等检查 - if erp_core::events::is_event_processed( - &critical_state.db, - event.id, - "critical_alert_consumer", - ) - .await - .unwrap_or(false) - { - continue; - } - - let patient_id = event - .payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - // alert 数据在嵌套的 "alert" 对象中 - let alert_obj = event.payload.get("alert"); - let alert_type = "vital_sign"; - let metric_name = alert_obj - .and_then(|a| a.get("indicator")) - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let metric_value = alert_obj - .and_then(|a| a.get("value")) - .and_then(|v| v.as_f64()) - .map(|v| v.to_string()) - .unwrap_or_default(); - let threshold_value = alert_obj - .and_then(|a| a.get("threshold")) - .and_then(|v| v.as_f64()) - .map(|v| v.to_string()) - .unwrap_or_default(); - let severity = alert_obj - .and_then(|a| a.get("level")) - .and_then(|v| v.as_str()) - .unwrap_or("critical"); - - if let Some(pid) = patient_id { - match crate::service::critical_alert_service::handle_critical_alert_event( - &critical_state, - event.tenant_id, - pid, - alert_type, - metric_name, - &metric_value, - &threshold_value, - None, - severity, - ) - .await - { - Ok(alert_id) => { - tracing::info!( - event_id = %event.id, - alert_id = %alert_id, - patient_id = %pid, - metric = %metric_name, - "危急值告警已创建" - ); - let _ = erp_core::events::mark_event_processed( - &critical_state.db, - event.id, - "critical_alert_consumer", - ) - .await; - } - Err(e) => { - tracing::error!( - event_id = %event.id, - patient_id = %pid, - error = %e, - "危急值告警创建失败" - ); - } - } - } - } - Some(_) => {} - None => break, - } - } - }); - - // ai.analysis.completed → 通知关联医生 - let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); - _handles.push(ai_handle); - let ai_db = state.db.clone(); - let ai_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match ai_rx.recv().await { - Some(event) if event.event_type == "ai.analysis.completed" => { - if erp_core::events::is_event_processed( - &ai_db, - event.id, - "ai_analysis_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str()); - let analysis_type = event - .payload - .get("analysis_type") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); - - if let (Some(did), Some(pid)) = (doctor_id, patient_id) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "doctor", - "recipient_id": did, - "template_key": "AI_ANALYSIS_COMPLETED", - "params": { - "analysis_type": analysis_type, - "patient_id": pid, - } - })), - ); - ai_bus.publish(notify, &ai_db).await; - tracing::info!( - analysis_id = ?analysis_id, - analysis_type = %analysis_type, - doctor_id = %did, - "AI 分析完成通知已发送给医生" - ); - } else { - tracing::info!( - analysis_id = ?analysis_id, - analysis_type = %analysis_type, - "AI 分析完成(缺少关联信息,跳过通知)" - ); - } - let _ = erp_core::events::mark_event_processed( - &ai_db, - event.id, - "ai_analysis_notifier", - ) - .await; - } - Some(event) if event.event_type == "dialysis.record.created" => { - if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier") - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let record_id = event.payload.get("record_id").and_then(|v| v.as_str()); - tracing::info!( - record_id = ?record_id, - patient_id = ?patient_id, - "透析记录已创建,触发 KDIGO 自动评估" - ); - - // H4: 透析→KDIGO 自动串联 — 发布事件让 AI 模块执行风险评估 - if let (Some(pid), Some(rid)) = (patient_id, record_id) { - let kdigo_event = erp_core::events::DomainEvent::new( - "ai.dialysis.kdigo_requested", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "patient_id": pid, - "dialysis_record_id": rid, - "source": "dialysis_notifier", - })), - ); - ai_bus.publish(kdigo_event, &ai_db).await; - } - - let _ = erp_core::events::mark_event_processed( - &ai_db, - event.id, - "dialysis_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // ai.analysis.completed → AI→行动闭环消费者(行动分发) - let (mut ai_action_rx, ai_action_handle) = - state.event_bus.subscribe_filtered("ai.".to_string()); - _handles.push(ai_action_handle); - let action_db = state.db.clone(); - let action_event_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match ai_action_rx.recv().await { - Some(event) if event.event_type == "ai.analysis.completed" => { - if erp_core::events::is_event_processed( - &action_db, - event.id, - "ai_action_dispatcher", - ) - .await - .unwrap_or(false) - { - continue; - } - - let tenant_id = event.tenant_id; - let analysis_id = event - .payload - .get("analysis_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - let patient_id = event - .payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - let doctor_id = event - .payload - .get("doctor_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - let risk_level = event - .payload - .get("risk_level") - .and_then(|v| v.as_str()) - .unwrap_or("medium"); - let suggestion_count = event - .payload - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - - if suggestion_count > 0 - && let (Some(aid), Some(pid)) = (analysis_id, patient_id) - { - let loader_result: Result, sea_orm::DbErr> = - crate::service::ai_suggestion_loader::load_by_analysis( - &action_db, tenant_id, aid, - ) - .await; - match loader_result { - Ok(suggestions) if !suggestions.is_empty() => { - crate::service::ai_action_dispatcher::handle_ai_suggestions( - &action_db, - &action_event_bus, - tenant_id, - aid, - pid, - doctor_id, - &suggestions, - risk_level, - ) - .await; - tracing::info!( - analysis_id = %aid, - patient_id = %pid, - suggestion_count = suggestions.len(), - risk_level = %risk_level, - "AI 行动分发完成" - ); - } - Ok(_) => { - tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发"); - } - Err(e) => { - tracing::warn!( - analysis_id = %aid, - error = %e, - "加载建议列表失败" - ); - } - } - } - - let _ = erp_core::events::mark_event_processed( - &action_db, - event.id, - "ai_action_dispatcher", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // consent.granted/revoked → 通知关联医生 - let (mut consent_rx, consent_handle) = - state.event_bus.subscribe_filtered("consent.".to_string()); - _handles.push(consent_handle); - let consent_db = state.db.clone(); - let consent_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match consent_rx.recv().await { - Some(event) if event.event_type == CONSENT_GRANTED => { - if erp_core::events::is_event_processed( - &consent_db, - event.id, - "consent_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let consent_type = event - .payload - .get("consent_type") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - if let Some(pid) = patient_id { - let notify_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "CONSENT_GRANTED", - "params": { - "consent_type": consent_type, - } - })), - ); - consent_bus.publish(notify_event, &consent_db).await; - tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &consent_db, - event.id, - "consent_notifier", - ) - .await; - } - Some(event) if event.event_type == CONSENT_REVOKED => { - if erp_core::events::is_event_processed( - &consent_db, - event.id, - "consent_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let consent_type = event - .payload - .get("consent_type") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - if let Some(pid) = patient_id { - let notify_event = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "staff", - "template_key": "CONSENT_REVOKED", - "params": { - "patient_id": pid, - "consent_type": consent_type, - } - })), - ); - consent_bus.publish(notify_event, &consent_db).await; - tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护"); - } - let _ = erp_core::events::mark_event_processed( - &consent_db, - event.id, - "consent_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // consultation.opened/new_message → 通知相关方 - let (mut consult_rx, consult_handle) = state - .event_bus - .subscribe_filtered("consultation.".to_string()); - _handles.push(consult_handle); - let consult_db = state.db.clone(); - let consult_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match consult_rx.recv().await { - Some(event) if event.event_type == CONSULTATION_OPENED => { - if erp_core::events::is_event_processed( - &consult_db, - event.id, - "consult_opened_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let (Some(did), Some(pid)) = (doctor_id, patient_id) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "doctor", - "recipient_id": did, - "template_key": "CONSULTATION_OPENED", - "params": { "patient_id": pid } - })), - ); - consult_bus.publish(notify, &consult_db).await; - tracing::info!( - doctor_id = did, - patient_id = pid, - "咨询开启通知已发送给医生" - ); - } - let _ = erp_core::events::mark_event_processed( - &consult_db, - event.id, - "consult_opened_notifier", - ) - .await; - } - Some(event) if event.event_type == CONSULTATION_NEW_MESSAGE => { - if erp_core::events::is_event_processed( - &consult_db, - event.id, - "consult_msg_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str()); - let sender_role = event - .payload - .get("sender_role") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - if let Some(rid) = recipient_id { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": if sender_role == "patient" { "doctor" } else { "patient" }, - "recipient_id": rid, - "template_key": "CONSULTATION_NEW_MESSAGE", - })), - ); - consult_bus.publish(notify, &consult_db).await; - tracing::info!( - recipient_id = rid, - sender_role = sender_role, - "咨询新消息通知已发送" - ); - } - let _ = erp_core::events::mark_event_processed( - &consult_db, - event.id, - "consult_msg_notifier", - ) - .await; - } - Some(event) if event.event_type == CONSULTATION_CLOSED => { - if erp_core::events::is_event_processed( - &consult_db, - event.id, - "consult_closed_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let Some(pid) = patient_id { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "CONSULTATION_CLOSED", - })), - ); - consult_bus.publish(notify, &consult_db).await; - tracing::info!(patient_id = pid, "咨询关闭通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &consult_db, - event.id, - "consult_closed_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // follow_up.created → 通知执行人 - let (mut fu_created_rx, fu_created_handle) = - state.event_bus.subscribe_filtered("follow_up.".to_string()); - _handles.push(fu_created_handle); - let fu_created_db = state.db.clone(); - let fu_created_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match fu_created_rx.recv().await { - Some(event) if event.event_type == FOLLOW_UP_CREATED => { - if erp_core::events::is_event_processed( - &fu_created_db, - event.id, - "fu_created_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let (Some(uid), Some(pid)) = (assigned_to, patient_id) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "staff", - "recipient_id": uid, - "template_key": "FOLLOW_UP_CREATED", - "params": { "patient_id": pid } - })), - ); - fu_created_bus.publish(notify, &fu_created_db).await; - tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &fu_created_db, - event.id, - "fu_created_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // points.earned/exchanged → 积分变动通知 - let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); - _handles.push(points_handle); - let points_db = state.db.clone(); - let points_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match points_rx.recv().await { - Some(event) if event.event_type == POINTS_EARNED => { - if erp_core::events::is_event_processed( - &points_db, - event.id, - "points_earned_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let amount = event.payload.get("amount").and_then(|v| v.as_u64()); - if let (Some(pid), Some(amt)) = (patient_id, amount) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "POINTS_EARNED", - "params": { "amount": amt } - })), - ); - points_bus.publish(notify, &points_db).await; - tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &points_db, - event.id, - "points_earned_notifier", - ) - .await; - } - Some(event) if event.event_type == POINTS_EXCHANGED => { - if erp_core::events::is_event_processed( - &points_db, - event.id, - "points_exchanged_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let amount = event.payload.get("amount").and_then(|v| v.as_u64()); - if let (Some(pid), Some(amt)) = (patient_id, amount) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "POINTS_EXCHANGED", - "params": { "amount": amt } - })), - ); - points_bus.publish(notify, &points_db).await; - tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &points_db, - event.id, - "points_exchanged_notifier", - ) - .await; - } - Some(event) if event.event_type == POINTS_EXPIRED => { - if erp_core::events::is_event_processed( - &points_db, - event.id, - "points_expired_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let amount = event.payload.get("amount").and_then(|v| v.as_u64()); - if let (Some(pid), Some(amt)) = (patient_id, amount) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "POINTS_EXPIRED", - "params": { "amount": amt } - })), - ); - points_bus.publish(notify, &points_db).await; - tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送"); - } - let _ = erp_core::events::mark_event_processed( - &points_db, - event.id, - "points_expired_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // lab_report.uploaded → 触发 AI 自动分析 - let (mut lab_upload_rx, lab_upload_handle) = state - .event_bus - .subscribe_filtered("lab_report.".to_string()); - _handles.push(lab_upload_handle); - let lab_upload_db = state.db.clone(); - let lab_upload_bus = state.event_bus.clone(); - tokio::spawn(async move { - loop { - match lab_upload_rx.recv().await { - Some(event) if event.event_type == LAB_REPORT_UPLOADED => { - if erp_core::events::is_event_processed( - &lab_upload_db, - event.id, - "lab_upload_ai_trigger", - ) - .await - .unwrap_or(false) - { - continue; - } - let report_id = event.payload.get("report_id").and_then(|v| v.as_str()); - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - if let (Some(rid), Some(pid)) = (report_id, patient_id) { - let ai_event = erp_core::events::DomainEvent::new( - "ai.analysis.requested", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "source_type": "lab_report", - "source_id": rid, - "patient_id": pid, - })), - ); - lab_upload_bus.publish(ai_event, &lab_upload_db).await; - tracing::info!( - report_id = rid, - patient_id = pid, - "化验单上传触发 AI 分析请求" - ); - } - let _ = erp_core::events::mark_event_processed( - &lab_upload_db, - event.id, - "lab_upload_ai_trigger", - ) - .await; - } - Some(event) if event.event_type == LAB_REPORT_REVIEWED => { - if erp_core::events::is_event_processed( - &lab_upload_db, - event.id, - "lab_reviewed_notifier", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - let reviewer_id = event.payload.get("reviewer_id").and_then(|v| v.as_str()); - if let (Some(pid), Some(rid)) = (patient_id, reviewer_id) { - let notify = erp_core::events::DomainEvent::new( - "message.send", - event.tenant_id, - erp_core::events::build_event_payload(serde_json::json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid, - "template_key": "LAB_REPORT_REVIEWED", - "params": { "reviewer_id": rid } - })), - ); - lab_upload_bus.publish(notify, &lab_upload_db).await; - tracing::info!( - patient_id = pid, - reviewer_id = rid, - "化验报告审核通知已发送给患者" - ); - } - let _ = erp_core::events::mark_event_processed( - &lab_upload_db, - event.id, - "lab_reviewed_notifier", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // patient.updated → 审计日志 - let (mut patient_update_rx, patient_update_handle) = - state.event_bus.subscribe_filtered("patient.".to_string()); - _handles.push(patient_update_handle); - let patient_update_db = state.db.clone(); - tokio::spawn(async move { - loop { - match patient_update_rx.recv().await { - Some(event) if event.event_type == PATIENT_UPDATED => { - if erp_core::events::is_event_processed( - &patient_update_db, - event.id, - "patient_updated_audit", - ) - .await - .unwrap_or(false) - { - continue; - } - let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); - tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新"); - let _ = erp_core::events::mark_event_processed( - &patient_update_db, - event.id, - "patient_updated_audit", - ) - .await; - } - Some(_) => {} - None => break, - } - } - }); - - // 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭 - // 所有过滤订阅的生命周期应与进程一致 - std::mem::forget(_handles); -} -// 事件处理器本身依赖 tokio::spawn + channel + DB,无法纯单元测试。 -// 以下测试覆盖: -// 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配) -// 2. register_handlers 不 panic(空函数) -// 3. 事件 payload 构造格式与消费者解析逻辑的契约 -// 4. EventBus 过滤订阅的内存行为(无需 DB) -// 5. 消费者从 payload 中提取字段的边界条件 -// --------------------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - use erp_core::events::{DomainEvent, EventBus, build_event_payload}; - use serde_json::json; - use std::collections::HashSet; - - // ── 事件类型常量 ────────────────────────────────────────────────────── - - /// 所有事件类型常量必须遵循 `{domain}.{action}` 格式 - fn assert_valid_event_type(name: &str) { - let parts: Vec<&str> = name.split('.').collect(); - assert!( - parts.len() >= 2, - "事件类型 '{}' 不符合 domain.action 格式", - name - ); - assert!( - !parts[0].is_empty() && !parts[1].is_empty(), - "事件类型 '{}' 的 domain 或 action 为空", - name - ); - } - - #[test] - fn event_constants_follow_naming_convention() { - let all_types = [ - APPOINTMENT_CREATED, - ALERT_TRIGGERED, - ALERT_AGGREGATED, - CONSENT_GRANTED, - CONSENT_REVOKED, - ARTICLE_PUBLISHED, - ARTICLE_REJECTED, - CONSULTATION_OPENED, - CONSULTATION_CLOSED, - CONSULTATION_NEW_MESSAGE, - DEVICE_READINGS_SYNCED, - DOCTOR_ONLINE_STATUS_CHANGED, - FOLLOW_UP_CREATED, - FOLLOW_UP_COMPLETED, - FOLLOW_UP_OVERDUE, - DAILY_MONITORING_CREATED, - LAB_REPORT_UPLOADED, - LAB_REPORT_REVIEWED, - HEALTH_DATA_CRITICAL_ALERT, - PATIENT_CREATED, - PATIENT_UPDATED, - PATIENT_VERIFIED, - PATIENT_DECEASED, - POINTS_EXPIRED, - POINTS_EARNED, - POINTS_EXCHANGED, - CARE_PLAN_CREATED, - CARE_PLAN_UPDATED, - CARE_PLAN_ACTIVATED, - CARE_PLAN_COMPLETED, - CARE_ACTION_PERFORMED, - ]; - for t in &all_types { - assert_valid_event_type(t); - } - } - - #[test] - fn event_constants_are_unique() { - let all_types = [ - APPOINTMENT_CREATED, - ALERT_TRIGGERED, - ALERT_AGGREGATED, - CONSENT_GRANTED, - CONSENT_REVOKED, - ARTICLE_PUBLISHED, - ARTICLE_REJECTED, - CONSULTATION_OPENED, - CONSULTATION_CLOSED, - CONSULTATION_NEW_MESSAGE, - DEVICE_READINGS_SYNCED, - DOCTOR_ONLINE_STATUS_CHANGED, - FOLLOW_UP_CREATED, - FOLLOW_UP_COMPLETED, - FOLLOW_UP_OVERDUE, - DAILY_MONITORING_CREATED, - LAB_REPORT_UPLOADED, - LAB_REPORT_REVIEWED, - HEALTH_DATA_CRITICAL_ALERT, - PATIENT_CREATED, - PATIENT_UPDATED, - PATIENT_VERIFIED, - PATIENT_DECEASED, - POINTS_EXPIRED, - POINTS_EARNED, - POINTS_EXCHANGED, - CARE_PLAN_CREATED, - CARE_PLAN_UPDATED, - CARE_PLAN_ACTIVATED, - CARE_PLAN_COMPLETED, - CARE_ACTION_PERFORMED, - ]; - let set: HashSet<&&str> = all_types.iter().collect(); - assert_eq!(set.len(), all_types.len(), "存在重复的事件类型常量"); - } - - #[test] - fn event_constants_match_expected_values() { - // 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致 - assert_eq!(APPOINTMENT_CREATED, "appointment.created"); - assert_eq!(ALERT_TRIGGERED, "alert.triggered"); - assert_eq!(ALERT_AGGREGATED, "alert.aggregated"); - assert_eq!(CONSENT_GRANTED, "consent.granted"); - assert_eq!(CONSENT_REVOKED, "consent.revoked"); - assert_eq!(ARTICLE_PUBLISHED, "article.published"); - assert_eq!(ARTICLE_REJECTED, "article.rejected"); - assert_eq!(CONSULTATION_OPENED, "consultation.opened"); - assert_eq!(CONSULTATION_CLOSED, "consultation.closed"); - assert_eq!(CONSULTATION_NEW_MESSAGE, "consultation.new_message"); - assert_eq!(DEVICE_READINGS_SYNCED, "device.readings.synced"); - assert_eq!(DOCTOR_ONLINE_STATUS_CHANGED, "doctor.online_status_changed"); - assert_eq!(FOLLOW_UP_CREATED, "follow_up.created"); - assert_eq!(FOLLOW_UP_COMPLETED, "follow_up.completed"); - assert_eq!(FOLLOW_UP_OVERDUE, "follow_up.overdue"); - assert_eq!(DAILY_MONITORING_CREATED, "daily_monitoring.created"); - assert_eq!(LAB_REPORT_UPLOADED, "lab_report.uploaded"); - assert_eq!(LAB_REPORT_REVIEWED, "lab_report.reviewed"); - assert_eq!(HEALTH_DATA_CRITICAL_ALERT, "health_data.critical_alert"); - assert_eq!(PATIENT_CREATED, "patient.created"); - assert_eq!(PATIENT_UPDATED, "patient.updated"); - assert_eq!(PATIENT_VERIFIED, "patient.verified"); - assert_eq!(PATIENT_DECEASED, "patient.deceased"); - assert_eq!(POINTS_EXPIRED, "points.expired"); - assert_eq!(POINTS_EARNED, "points.earned"); - assert_eq!(POINTS_EXCHANGED, "points.exchanged"); - assert_eq!(CARE_PLAN_CREATED, "care_plan.created"); - assert_eq!(CARE_PLAN_UPDATED, "care_plan.updated"); - assert_eq!(CARE_PLAN_ACTIVATED, "care_plan.activated"); - assert_eq!(CARE_PLAN_COMPLETED, "care_plan.completed"); - assert_eq!(CARE_ACTION_PERFORMED, "care.action.performed"); - } - - /// 消费者中硬编码的事件类型(非通过常量引用)也必须可被常量覆盖 - #[test] - fn hardcoded_event_types_in_consumers_are_covered() { - // event.rs 中消费者使用的事件类型字符串(未通过常量引用) - let hardcoded = [ - "workflow.task.completed", - "appointment.confirmed", - "appointment.cancelled", - "ai.analysis.completed", - "dialysis.record.created", - // 消费者产出的事件类型 - "message.send", - "ai.analysis.requested", - "ai.reanalysis.requested", - ]; - // 这些硬编码类型不与 erp-health 常量重复 - // 它们来自 erp-workflow / erp-core / erp-ai 等其他模块 - // 验证 erp-health 常量不会与外部模块事件类型冲突 - let health_types = [ - APPOINTMENT_CREATED, - ALERT_TRIGGERED, - CONSENT_GRANTED, - CONSENT_REVOKED, - CONSULTATION_OPENED, - FOLLOW_UP_CREATED, - FOLLOW_UP_COMPLETED, - FOLLOW_UP_OVERDUE, - DEVICE_READINGS_SYNCED, - LAB_REPORT_UPLOADED, - HEALTH_DATA_CRITICAL_ALERT, - PATIENT_CREATED, - PATIENT_UPDATED, - ]; - for t in &hardcoded { - for ht in &health_types { - assert_ne!( - *t, *ht, - "外部模块事件类型 '{}' 与 erp-health 常量 '{}' 冲突", - t, ht - ); - } - } - } - - // ── register_handlers 不 panic ────────────────────────────────────── - - #[test] - fn register_handlers_does_not_panic() { - let bus = EventBus::new(64); - // register_handlers 是空函数,不应 panic - register_handlers(&bus); - } - - // ── DomainEvent 构造与 payload 契约 ───────────────────────────────── - - #[test] - fn domain_event_new_sets_correct_event_type() { - let tenant_id = Uuid::now_v7(); - let event = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({})); - assert_eq!(event.event_type, PATIENT_CREATED); - assert_eq!(event.tenant_id, tenant_id); - } - - #[test] - fn domain_event_new_generates_unique_ids() { - let tenant_id = Uuid::now_v7(); - let e1 = DomainEvent::new("test.a", tenant_id, json!({})); - let e2 = DomainEvent::new("test.b", tenant_id, json!({})); - assert_ne!(e1.id, e2.id, "每个事件应有唯一 ID"); - assert_ne!( - e1.correlation_id, e2.correlation_id, - "每个事件应有唯一 correlation_id" - ); - } - - #[test] - fn build_event_payload_injects_schema_version() { - let payload = build_event_payload(json!({ "patient_id": "abc" })); - assert_eq!(payload["schema_version"], "v1"); - assert!( - payload.get("occurred_at").is_some(), - "必须包含 occurred_at 时间戳" - ); - } - - #[test] - fn build_event_payload_merges_data_fields() { - let payload = build_event_payload(json!({ - "patient_id": "test-123", - "severity": "critical", - })); - assert_eq!(payload["schema_version"], "v1"); - assert_eq!(payload["patient_id"], "test-123"); - assert_eq!(payload["severity"], "critical"); - } - - // ── 消费者 payload 解析契约测试 ───────────────────────────────────── - // - // 验证消费者从 payload 中提取字段的方式与 build_event_payload 的输出兼容。 - // 这些测试模拟消费者使用的 serde_json::Value 提取模式。 - - /// 模拟消费者提取 patient_id(UUID 字符串) - fn extract_patient_id(payload: &serde_json::Value) -> Option { - payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()) - } - - #[test] - fn payload_extraction_patient_id_from_uuid_string() { - let pid = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "patient_id": pid.to_string(), - })); - assert_eq!(extract_patient_id(&payload), Some(pid)); - } - - #[test] - fn payload_extraction_patient_id_missing_field() { - let payload = build_event_payload(json!({})); - assert_eq!(extract_patient_id(&payload), None); - } - - #[test] - fn payload_extraction_patient_id_invalid_uuid() { - let payload = build_event_payload(json!({ - "patient_id": "not-a-uuid", - })); - assert_eq!(extract_patient_id(&payload), None); - } - - #[test] - fn payload_extraction_patient_id_wrong_type() { - // patient_id 是数字而非字符串 - let payload = build_event_payload(json!({ - "patient_id": 12345, - })); - assert_eq!(extract_patient_id(&payload), None); - } - - /// 模拟消费者提取 severity(带默认值) - fn extract_severity(payload: &serde_json::Value) -> &str { - payload - .get("severity") - .and_then(|v| v.as_str()) - .unwrap_or("warning") - } - - #[test] - fn payload_extraction_severity_present() { - let payload = build_event_payload(json!({ "severity": "critical" })); - assert_eq!(extract_severity(&payload), "critical"); - } - - #[test] - fn payload_extraction_severity_defaults_to_warning() { - let payload = build_event_payload(json!({})); - assert_eq!(extract_severity(&payload), "warning"); - } - - /// 模拟消费者提取 task_id(UUID 字符串) - fn extract_task_id(payload: &serde_json::Value) -> Option { - payload - .get("task_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()) - } - - #[test] - fn payload_extraction_task_id_valid() { - let tid = Uuid::now_v7(); - let payload = build_event_payload(json!({ "task_id": tid.to_string() })); - assert_eq!(extract_task_id(&payload), Some(tid)); - } - - #[test] - fn payload_extraction_task_id_missing() { - let payload = build_event_payload(json!({ "other_field": "value" })); - assert_eq!(extract_task_id(&payload), None); - } - - /// 模拟消费者提取 amount(u64) - fn extract_amount(payload: &serde_json::Value) -> Option { - payload.get("amount").and_then(|v| v.as_u64()) - } - - #[test] - fn payload_extraction_amount_valid() { - let payload = build_event_payload(json!({ "amount": 100 })); - assert_eq!(extract_amount(&payload), Some(100)); - } - - #[test] - fn payload_extraction_amount_zero() { - let payload = build_event_payload(json!({ "amount": 0 })); - assert_eq!(extract_amount(&payload), Some(0)); - } - - #[test] - fn payload_extraction_amount_missing() { - let payload = build_event_payload(json!({})); - assert_eq!(extract_amount(&payload), None); - } - - #[test] - fn payload_extraction_amount_negative_returns_none() { - // serde_json u64 不能表示负数 - let payload = build_event_payload(json!({ "amount": -5 })); - assert_eq!(extract_amount(&payload), None); - } - - /// 模拟消费者提取 suggestion_count(u64)用于条件判断 - #[test] - fn payload_extraction_suggestion_count_zero() { - let payload = build_event_payload(json!({ "suggestion_count": 0 })); - let count = payload - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert_eq!(count, 0); - assert!(!(count > 0), "suggestion_count=0 时不触发行动分发"); - } - - #[test] - fn payload_extraction_suggestion_count_positive() { - let payload = build_event_payload(json!({ "suggestion_count": 3 })); - let count = payload - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert!(count > 0, "suggestion_count>0 时应触发行动分发"); - } - - // ── 完整 payload 契约测试 ───────────────────────────────────────── - // - // 验证 service 层构建的 payload 能被消费者正确解析。 - // 这些测试模拟 service 层的 build_event_payload 调用,然后 - // 用消费者中的提取逻辑验证字段可达。 - - /// appointment.created 事件 payload 契约 - #[test] - fn appointment_created_payload_contract() { - let patient_id = Uuid::now_v7(); - let appointment_id = Uuid::now_v7(); - // 模拟 appointment_service 的 payload 构造 - let payload = build_event_payload(json!({ - "appointment_id": appointment_id.to_string(), - "patient_id": patient_id.to_string(), - "status": "pending", - })); - - // 消费者提取 patient_id(字符串形式) - let pid_str = payload.get("patient_id").and_then(|v| v.as_str()); - assert!(pid_str.is_some(), "消费者需要 patient_id 字符串"); - assert_eq!(pid_str.unwrap(), patient_id.to_string()); - } - - /// patient.created 事件 payload 契约 - #[test] - fn patient_created_payload_contract() { - let patient_id = Uuid::now_v7(); - // 模拟 patient_service 的 payload 构造 - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - })); - - let extracted = extract_patient_id(&payload); - assert_eq!(extracted, Some(patient_id)); - } - - /// alert.triggered 事件 payload 契约 - #[test] - fn alert_triggered_payload_contract() { - let alert_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - // 模拟 alert_engine 的 payload 构造 - let payload = build_event_payload(json!({ - "alert_id": alert_id.to_string(), - "patient_id": patient_id.to_string(), - "rule_name": "心率过高", - "severity": "critical", - "detail": "心率超过阈值", - "notify_roles": ["doctor"], - })); - - let pid = payload.get("patient_id").and_then(|v| v.as_str()); - assert!(pid.is_some(), "消费者需要 patient_id"); - - let severity = extract_severity(&payload); - assert_eq!(severity, "critical"); - - let rule_name = payload - .get("rule_name") - .and_then(|v| v.as_str()) - .unwrap_or("健康告警"); - assert_eq!(rule_name, "心率过高"); - } - - /// health_data.critical_alert 事件 payload 契约 - #[test] - fn critical_alert_payload_contract() { - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "alert_type": "vital_sign", - "metric_name": "heart_rate", - "metric_value": "180", - "threshold_value": "150", - })); - - assert_eq!(extract_patient_id(&payload), Some(patient_id)); - assert_eq!( - payload - .get("alert_type") - .and_then(|v| v.as_str()) - .unwrap_or("vital_sign"), - "vital_sign" - ); - assert_eq!( - payload - .get("metric_name") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"), - "heart_rate" - ); - assert_eq!( - payload - .get("metric_value") - .and_then(|v| v.as_str()) - .unwrap_or(""), - "180" - ); - assert_eq!( - payload - .get("threshold_value") - .and_then(|v| v.as_str()) - .unwrap_or(""), - "150" - ); - } - - /// follow_up.overdue 事件 payload 契约 - #[test] - fn follow_up_overdue_payload_contract() { - let task_id = Uuid::now_v7(); - let assigned_to = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "task_id": task_id.to_string(), - "assigned_to": assigned_to.to_string(), - })); - - let tid = payload.get("task_id").and_then(|v| v.as_str()); - let uid = payload.get("assigned_to").and_then(|v| v.as_str()); - assert!(tid.is_some(), "消费者需要 task_id"); - assert!(uid.is_some(), "消费者需要 assigned_to"); - } - - /// consultation.new_message 事件 payload 契约 — sender_role 决定通知目标 - #[test] - fn consultation_new_message_recipient_logic() { - // 患者发送 → 通知医生 - let payload_patient = build_event_payload(json!({ - "recipient_id": "doctor-123", - "sender_role": "patient", - })); - let sender_role = payload_patient - .get("sender_role") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let recipient_type = if sender_role == "patient" { - "doctor" - } else { - "patient" - }; - assert_eq!(recipient_type, "doctor"); - - // 医生发送 → 通知患者 - let payload_doctor = build_event_payload(json!({ - "recipient_id": "patient-456", - "sender_role": "doctor", - })); - let sender_role = payload_doctor - .get("sender_role") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let recipient_type = if sender_role == "patient" { - "doctor" - } else { - "patient" - }; - assert_eq!(recipient_type, "patient"); - } - - /// lab_report.uploaded 事件 payload 契约 — 触发 AI 分析 - #[test] - fn lab_report_uploaded_payload_contract() { - let report_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "source_type": "lab_report", - "source_id": report_id.to_string(), - "patient_id": patient_id.to_string(), - })); - - let rid = payload.get("source_id").and_then(|v| v.as_str()); - let pid = payload.get("patient_id").and_then(|v| v.as_str()); - assert!(rid.is_some(), "消费者需要 source_id (report_id)"); - assert!(pid.is_some(), "消费者需要 patient_id"); - } - - /// points.earned 事件 payload 契约 - #[test] - fn points_earned_payload_contract() { - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "amount": 50, - })); - - let pid = payload.get("patient_id").and_then(|v| v.as_str()); - let amt = payload.get("amount").and_then(|v| v.as_u64()); - assert!(pid.is_some()); - assert_eq!(amt, Some(50)); - } - - /// ai.analysis.completed 事件 payload 契约 — 含 suggestion_count - #[test] - fn ai_analysis_completed_payload_contract() { - let analysis_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - let doctor_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "analysis_id": analysis_id.to_string(), - "analysis_type": "lab_report", - "patient_id": patient_id.to_string(), - "doctor_id": doctor_id.to_string(), - "risk_level": "high", - "suggestion_count": 2, - })); - - // AI 分析完成通知消费者需要的字段 - let did = payload.get("doctor_id").and_then(|v| v.as_str()); - let pid = payload.get("patient_id").and_then(|v| v.as_str()); - assert!(did.is_some(), "通知消费者需要 doctor_id"); - assert!(pid.is_some(), "通知消费者需要 patient_id"); - - // 行动分发消费者需要的字段 - let aid = payload - .get("analysis_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert_eq!(aid, Some(analysis_id)); - - let suggestion_count = payload - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert!(suggestion_count > 0, "有建议时应触发行动分发"); - - let risk_level = payload - .get("risk_level") - .and_then(|v| v.as_str()) - .unwrap_or("medium"); - assert_eq!(risk_level, "high"); - } - - // ── EventBus 过滤订阅行为测试 ────────────────────────────────────── - - #[tokio::test] - async fn eventbus_broadcast_and_subscribe_filtered() { - let bus = EventBus::new(64); - let (mut rx, _handle) = bus.subscribe_filtered("patient.".to_string()); - - let tenant_id = Uuid::now_v7(); - - // 广播一个匹配的事件 - let patient_event = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({})); - bus.broadcast(patient_event.clone()); - - // 广播一个不匹配的事件 - let other_event = DomainEvent::new(APPOINTMENT_CREATED, tenant_id, json!({})); - bus.broadcast(other_event); - - // 只应收到 patient.created - let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()) - .await - .expect("超时:应收到匹配的事件") - .expect("channel 不应关闭"); - assert_eq!(received.event_type, PATIENT_CREATED); - } - - #[tokio::test] - async fn eventbus_subscribe_filtered_ignores_non_matching() { - let bus = EventBus::new(64); - let (mut rx, _handle) = bus.subscribe_filtered("follow_up.".to_string()); - - let tenant_id = Uuid::now_v7(); - - // 广播不匹配的事件 - let unmatched = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({})); - bus.broadcast(unmatched); - - // 应该收不到任何事件 - let result = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await; - assert!(result.is_err(), "不应收到不匹配的事件"); - } - - #[tokio::test] - async fn eventbus_subscribe_filtered_receives_multiple_matching() { - let bus = EventBus::new(64); - let (mut rx, _handle) = bus.subscribe_filtered("appointment.".to_string()); - - let tenant_id = Uuid::now_v7(); - - // 广播多个匹配前缀的事件 - let types = [ - APPOINTMENT_CREATED, - "appointment.confirmed", - "appointment.cancelled", - ]; - for t in &types { - let event = DomainEvent::new(*t, tenant_id, json!({})); - bus.broadcast(event); - } - - // 应收到全部 3 个 - let mut received_types = Vec::new(); - for _ in 0..3 { - let event = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()) - .await - .expect("超时") - .expect("channel 关闭"); - received_types.push(event.event_type); - } - assert_eq!(received_types.len(), 3); - assert!(received_types.contains(&APPOINTMENT_CREATED.to_string())); - assert!(received_types.contains(&"appointment.confirmed".to_string())); - assert!(received_types.contains(&"appointment.cancelled".to_string())); - } - - // ── 消费者前缀与常量匹配测试 ───────────────────────────────────── - // - // 验证 subscribe_filtered 的前缀能覆盖该通道需要接收的所有事件类型。 - - #[test] - fn subscribe_prefix_covers_all_workflow_events() { - let prefix = "workflow.task."; - let event_type = "workflow.task.completed"; - assert!( - event_type.starts_with(prefix), - "前缀 '{}' 应覆盖 '{}'", - prefix, - event_type - ); - } - - #[test] - fn subscribe_prefix_covers_all_device_events() { - let prefix = "device.readings."; - assert!( - DEVICE_READINGS_SYNCED.starts_with(prefix), - "前缀 '{}' 应覆盖 '{}'", - prefix, - DEVICE_READINGS_SYNCED - ); - } - - #[test] - fn subscribe_prefix_covers_all_alert_events() { - let prefix = "alert."; - assert!( - ALERT_TRIGGERED.starts_with(prefix), - "前缀 '{}' 应覆盖 '{}'", - prefix, - ALERT_TRIGGERED - ); - assert!( - ALERT_AGGREGATED.starts_with(prefix), - "前缀 '{}' 应覆盖 '{}'", - prefix, - ALERT_AGGREGATED - ); - } - - #[test] - fn subscribe_prefix_covers_all_patient_events() { - let prefix = "patient."; - assert!(PATIENT_CREATED.starts_with(prefix)); - assert!(PATIENT_UPDATED.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_appointment_events() { - let prefix = "appointment."; - assert!(APPOINTMENT_CREATED.starts_with(prefix)); - assert!("appointment.confirmed".starts_with(prefix)); - assert!("appointment.cancelled".starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_follow_up_events() { - let prefix = "follow_up."; - assert!(FOLLOW_UP_CREATED.starts_with(prefix)); - assert!(FOLLOW_UP_COMPLETED.starts_with(prefix)); - assert!(FOLLOW_UP_OVERDUE.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_health_data_events() { - let prefix = "health_data."; - assert!( - HEALTH_DATA_CRITICAL_ALERT.starts_with(prefix), - "前缀 '{}' 应覆盖 '{}'", - prefix, - HEALTH_DATA_CRITICAL_ALERT - ); - } - - #[test] - fn subscribe_prefix_covers_all_ai_events() { - let prefix = "ai."; - assert!("ai.analysis.completed".starts_with(prefix)); - assert!("ai.reanalysis.requested".starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_consent_events() { - let prefix = "consent."; - assert!(CONSENT_GRANTED.starts_with(prefix)); - assert!(CONSENT_REVOKED.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_consultation_events() { - let prefix = "consultation."; - assert!(CONSULTATION_OPENED.starts_with(prefix)); - assert!(CONSULTATION_CLOSED.starts_with(prefix)); - assert!(CONSULTATION_NEW_MESSAGE.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_points_events() { - let prefix = "points."; - assert!(POINTS_EARNED.starts_with(prefix)); - assert!(POINTS_EXCHANGED.starts_with(prefix)); - assert!(POINTS_EXPIRED.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_lab_report_events() { - let prefix = "lab_report."; - assert!(LAB_REPORT_UPLOADED.starts_with(prefix)); - assert!(LAB_REPORT_REVIEWED.starts_with(prefix)); - } - - #[test] - fn subscribe_prefix_covers_all_care_plan_events() { - let prefix = "care_plan."; - assert!(CARE_PLAN_CREATED.starts_with(prefix)); - assert!(CARE_PLAN_UPDATED.starts_with(prefix)); - assert!(CARE_PLAN_ACTIVATED.starts_with(prefix)); - assert!(CARE_PLAN_COMPLETED.starts_with(prefix)); - } - - // ── device.readings.synced 消费者设备类型列表测试 ────────────────── - - #[test] - fn device_types_for_alert_evaluation_are_comprehensive() { - // 消费者硬编码的设备类型列表 - let device_types = [ - "heart_rate", - "blood_oxygen", - "temperature", - "blood_pressure", - "blood_glucose", - ]; - assert_eq!(device_types.len(), 5, "设备类型列表应包含 5 种类型"); - // 确保没有重复 - let set: HashSet<&&str> = device_types.iter().collect(); - assert_eq!(set.len(), device_types.len(), "设备类型列表不应有重复"); - } - - // ── 告警严重度模板选择逻辑 ───────────────────────────────────── - - #[test] - fn alert_severity_template_selection() { - let severity_critical = "critical"; - let template = if severity_critical == "critical" { - "CRITICAL_HEALTH_ALERT" - } else { - "HEALTH_DATA_ABNORMAL" - }; - assert_eq!(template, "CRITICAL_HEALTH_ALERT"); - - let severity_warning = "warning"; - let template = if severity_warning == "critical" { - "CRITICAL_HEALTH_ALERT" - } else { - "HEALTH_DATA_ABNORMAL" - }; - assert_eq!(template, "HEALTH_DATA_ABNORMAL"); - } - - // ── 消费者幂等 consumer_id 唯一性 ────────────────────────────────── - - #[test] - fn consumer_ids_are_unique() { - // 收集所有消费者的 consumer_id(从 mark_event_processed 调用中提取) - let consumer_ids = [ - "workflow_task_consumer", - "alert_aggregator", - "alert_notifier", - "patient_welcome", - "appt_created_notifier", - "appointment_notifier", - "appointment_cancel_handler", - "follow_up_escalator", - "critical_alert_consumer", - "ai_analysis_notifier", - "dialysis_notifier", - "ai_action_dispatcher", - "consent_notifier", - "consult_opened_notifier", - "consult_msg_notifier", - "consult_closed_notifier", - "fu_created_notifier", - "points_earned_notifier", - "points_exchanged_notifier", - "points_expired_notifier", - "lab_upload_ai_trigger", - "lab_reviewed_notifier", - "patient_updated_audit", - ]; - let set: HashSet<&&str> = consumer_ids.iter().collect(); - assert_eq!(set.len(), consumer_ids.len(), "存在重复的 consumer_id"); - } - - // ── 消费者通知构造逻辑测试 ───────────────────────────────────────── - // - // 消费者的核心逻辑是闭包内的,无法直接调用。这里测试的是: - // 1. 通知消息的构造模式是否正确 - // 2. 缺失字段时是否安全跳过(不 panic) - // 3. 关键分支决策是否正确 - - /// 告警通知消费者:severity 分支决定 template_key - #[test] - fn alert_notifier_constructs_correct_template_for_severity() { - let patient_id = Uuid::now_v7(); - let tenant_id = Uuid::now_v7(); - - for (severity, expected_template) in [ - ("critical", "CRITICAL_HEALTH_ALERT"), - ("warning", "HEALTH_DATA_ABNORMAL"), - ("info", "HEALTH_DATA_ABNORMAL"), - ] { - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "severity": severity, - "rule_name": "血压偏高", - })); - - let pid = payload.get("patient_id").and_then(|v| v.as_str()); - let sev = payload - .get("severity") - .and_then(|v| v.as_str()) - .unwrap_or("warning"); - let rule = payload - .get("rule_name") - .and_then(|v| v.as_str()) - .unwrap_or("健康告警"); - - assert!(pid.is_some(), "severity={} 时 patient_id 应存在", severity); - - let template_key = if sev == "critical" { - "CRITICAL_HEALTH_ALERT" - } else { - "HEALTH_DATA_ABNORMAL" - }; - assert_eq!(template_key, expected_template); - - let notify = json!({ - "channel": "in_app", - "recipient_type": "patient", - "recipient_id": pid.unwrap(), - "template_key": template_key, - "params": { "rule_name": rule, "severity": sev } - }); - assert_eq!(notify["template_key"], expected_template); - } - } - - /// 告警聚合消费者:suppressed=true 时发布 ALERT_AGGREGATED - #[test] - fn alert_aggregator_only_publishes_when_suppressed() { - let patient_id = Uuid::now_v7(); - - // suppressed=true → 发布聚合事件 - let payload_suppressed = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "suppressed": true, - "alert_id": "alert-123", - "severity": "warning", - })); - let is_suppressed = payload_suppressed - .get("suppressed") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - assert!(is_suppressed, "suppressed=true 时应触发聚合"); - - // suppressed=false → 不发布 - let payload_normal = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "suppressed": false, - })); - let is_suppressed_2 = payload_normal - .get("suppressed") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - assert!(!is_suppressed_2, "suppressed=false 时不应触发聚合"); - - // 无 suppressed 字段 → 默认 false - let payload_no_field = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - })); - let is_suppressed_3 = payload_no_field - .get("suppressed") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - assert!(!is_suppressed_3, "缺少 suppressed 字段时默认不聚合"); - } - - /// AI 分析完成消费者:缺少 doctor_id 或 patient_id 时安全跳过 - #[test] - fn ai_analysis_notifier_skips_when_missing_ids() { - // 完整 payload → 构造通知 - let full_payload = build_event_payload(json!({ - "analysis_id": Uuid::now_v7().to_string(), - "analysis_type": "lab_report", - "patient_id": Uuid::now_v7().to_string(), - "doctor_id": Uuid::now_v7().to_string(), - })); - let did = full_payload.get("doctor_id").and_then(|v| v.as_str()); - let pid = full_payload.get("patient_id").and_then(|v| v.as_str()); - assert!( - did.is_some() && pid.is_some(), - "完整 payload 应能提取 doctor_id 和 patient_id" - ); - - // 缺少 doctor_id → 跳过通知 - let no_doctor = build_event_payload(json!({ - "analysis_id": Uuid::now_v7().to_string(), - "patient_id": Uuid::now_v7().to_string(), - })); - let did2 = no_doctor.get("doctor_id").and_then(|v| v.as_str()); - assert!(did2.is_none(), "缺少 doctor_id 应跳过通知"); - - // 完全空 payload → 不 panic - let empty = build_event_payload(json!({})); - let did3 = empty.get("doctor_id").and_then(|v| v.as_str()); - let pid3 = empty.get("patient_id").and_then(|v| v.as_str()); - assert!( - did3.is_none() && pid3.is_none(), - "空 payload 应安全返回 None" - ); - } - - /// AI 行动分发消费者:suggestion_count=0 时跳过行动分发 - #[test] - fn ai_action_dispatcher_skips_when_no_suggestions() { - // suggestion_count > 0 → 分发 - let with_suggestions = build_event_payload(json!({ - "analysis_id": Uuid::now_v7().to_string(), - "patient_id": Uuid::now_v7().to_string(), - "suggestion_count": 3, - })); - let count = with_suggestions - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert!(count > 0, "有建议时应触发分发"); - - // suggestion_count = 0 → 跳过 - let no_suggestions = build_event_payload(json!({ - "analysis_id": Uuid::now_v7().to_string(), - "patient_id": Uuid::now_v7().to_string(), - "suggestion_count": 0, - })); - let count2 = no_suggestions - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert_eq!(count2, 0, "无建议时应跳过分发"); - - // 无 suggestion_count 字段 → 默认 0 - let no_field = build_event_payload(json!({ - "analysis_id": Uuid::now_v7().to_string(), - })); - let count3 = no_field - .get("suggestion_count") - .and_then(|v| v.as_u64()) - .unwrap_or(0); - assert_eq!(count3, 0, "缺少 suggestion_count 字段时默认为 0"); - } - - /// 预约创建消费者:缺少 patient_id 或 doctor_id 时安全跳过 - #[test] - fn appointment_created_notifier_skips_when_missing_ids() { - let patient_id = Uuid::now_v7(); - let doctor_id = Uuid::now_v7(); - - // 完整 → 构造通知 - let full = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "doctor_id": doctor_id.to_string(), - })); - let (pid, did) = ( - full.get("patient_id").and_then(|v| v.as_str()), - full.get("doctor_id").and_then(|v| v.as_str()), - ); - assert!(pid.is_some() && did.is_some()); - - // 缺 doctor_id - let no_doc = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - })); - let pid2 = no_doc.get("patient_id").and_then(|v| v.as_str()); - let did2 = no_doc.get("doctor_id").and_then(|v| v.as_str()); - assert!(pid2.is_some() && did2.is_none()); - - // 缺 patient_id - let no_pat = build_event_payload(json!({ - "doctor_id": doctor_id.to_string(), - })); - let pid3 = no_pat.get("patient_id").and_then(|v| v.as_str()); - let did3 = no_pat.get("doctor_id").and_then(|v| v.as_str()); - assert!(pid3.is_none() && did3.is_some()); - } - - /// 随访逾期升级消费者:缺少 task_id 或 assigned_to 时安全跳过 - #[test] - fn follow_up_escalator_skips_when_missing_ids() { - // 完整 → 构造升级通知 - let full = build_event_payload(json!({ - "task_id": Uuid::now_v7().to_string(), - "assigned_to": Uuid::now_v7().to_string(), - })); - let (tid, uid) = ( - full.get("task_id").and_then(|v| v.as_str()), - full.get("assigned_to").and_then(|v| v.as_str()), - ); - assert!(tid.is_some() && uid.is_some()); - - // 缺 assigned_to - let no_assignee = build_event_payload(json!({ - "task_id": Uuid::now_v7().to_string(), - })); - let tid2 = no_assignee.get("task_id").and_then(|v| v.as_str()); - let uid2 = no_assignee.get("assigned_to").and_then(|v| v.as_str()); - assert!(tid2.is_some() && uid2.is_none()); - } - - /// 危急值告警消费者:从 payload 提取所有必需字段 - #[test] - fn critical_alert_consumer_extracts_all_fields() { - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "alert_type": "vital_sign", - "metric_name": "systolic_bp", - "metric_value": "185", - "threshold_value": "140", - })); - - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - let alert_type = payload - .get("alert_type") - .and_then(|v| v.as_str()) - .unwrap_or("vital_sign"); - let metric_name = payload - .get("metric_name") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let metric_value = payload - .get("metric_value") - .and_then(|v| v.as_str()) - .unwrap_or(""); - let threshold = payload - .get("threshold_value") - .and_then(|v| v.as_str()) - .unwrap_or(""); - - assert!(pid.is_some()); - assert_eq!(alert_type, "vital_sign"); - assert_eq!(metric_name, "systolic_bp"); - assert_eq!(metric_value, "185"); - assert_eq!(threshold, "140"); - } - - /// 危急值告警消费者:缺失 patient_id 时安全跳过 - #[test] - fn critical_alert_consumer_skips_without_patient_id() { - let payload = build_event_payload(json!({ - "alert_type": "vital_sign", - "metric_name": "heart_rate", - })); - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(pid.is_none(), "缺少 patient_id 应安全跳过"); - } - - /// 咨询消息消费者:sender_role 决定通知方向 - #[test] - fn consultation_message_direction_logic() { - // 患者发送 → 通知医生 - let from_patient = build_event_payload(json!({ - "consultation_id": Uuid::now_v7().to_string(), - "sender_role": "patient", - "recipient_id": "doctor-789", - })); - let sender = from_patient - .get("sender_role") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let recipient_type = if sender == "patient" { - "doctor" - } else { - "patient" - }; - assert_eq!(recipient_type, "doctor"); - - // 医生发送 → 通知患者 - let from_doctor = build_event_payload(json!({ - "consultation_id": Uuid::now_v7().to_string(), - "sender_role": "doctor", - "recipient_id": "patient-456", - })); - let sender2 = from_doctor - .get("sender_role") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - let recipient_type2 = if sender2 == "patient" { - "doctor" - } else { - "patient" - }; - assert_eq!(recipient_type2, "patient"); - } - - /// 知情同意消费者:granted 和 revoked 使用不同 template_key - #[test] - fn consent_notifier_uses_correct_template() { - let patient_id = Uuid::now_v7(); - - // granted - let granted = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "consent_type": "data_sharing", - })); - let consent_type = granted - .get("consent_type") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - assert_eq!(consent_type, "data_sharing"); - // 消费者会用 template_key: "CONSENT_GRANTED" - - // revoked - let revoked = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "consent_type": "data_sharing", - "reason": "患者主动撤销", - })); - let reason = revoked - .get("reason") - .and_then(|v| v.as_str()) - .unwrap_or("未知原因"); - assert_eq!(reason, "患者主动撤销"); - } - - /// 随访创建消费者:缺少 assigned_to 时安全跳过 - #[test] - fn follow_up_created_notifier_skips_without_assignee() { - let patient_id = Uuid::now_v7(); - - let no_assignee = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - })); - let uid = no_assignee.get("assigned_to").and_then(|v| v.as_str()); - assert!(uid.is_none(), "缺少 assigned_to 时不应构造通知"); - - let with_assignee = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "assigned_to": Uuid::now_v7().to_string(), - })); - let uid2 = with_assignee.get("assigned_to").and_then(|v| v.as_str()); - let pid2 = with_assignee.get("patient_id").and_then(|v| v.as_str()); - assert!(uid2.is_some() && pid2.is_some(), "两者都存在时构造通知"); - } - - /// 积分消费者:缺失 amount 时安全跳过 - #[test] - fn points_notifiers_skip_without_amount() { - let patient_id = Uuid::now_v7(); - - let no_amount = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - })); - let amt = no_amount.get("amount").and_then(|v| v.as_u64()); - assert!(amt.is_none(), "缺少 amount 时安全跳过"); - - let with_amount = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "amount": 100, - })); - let pid = with_amount.get("patient_id").and_then(|v| v.as_str()); - let amt2 = with_amount.get("amount").and_then(|v| v.as_u64()); - assert!(pid.is_some() && amt2.is_some()); - } - - /// 设备读数消费者:设备类型列表与代码内硬编码一致 - #[test] - fn device_readings_consumer_device_types_match_code() { - let expected_types = [ - "heart_rate", - "blood_oxygen", - "temperature", - "blood_pressure", - "blood_glucose", - ]; - // 验证列表包含 5 种类型且无重复 - assert_eq!(expected_types.len(), 5); - let set: std::collections::HashSet<&&str> = expected_types.iter().collect(); - assert_eq!(set.len(), 5); - - // 验证每种类型都能从 payload 中提取 - let device_type_field = "heart_rate"; - assert!(expected_types.contains(&device_type_field)); - } - - /// workflow.task.completed 消费者:从 payload 提取 task_id - #[test] - fn workflow_task_consumer_extracts_task_id() { - let task_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "task_id": task_id.to_string(), - })); - let extracted = payload - .get("task_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert_eq!(extracted, Some(task_id)); - - // 无效 UUID - let bad_uuid = build_event_payload(json!({ - "task_id": "not-a-uuid", - })); - let extracted_bad = bad_uuid - .get("task_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(extracted_bad.is_none(), "无效 UUID 应返回 None"); - } - - /// patient.updated 消费者:幂等审计记录 - #[test] - fn patient_updated_audit_extracts_patient_id() { - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "patient_id": patient_id.to_string(), - "changed_fields": ["name", "phone"], - })); - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(pid.is_some()); - } - - /// 消费者总数与 consumer_ids 列表一致 - #[test] - fn consumer_count_matches_expected() { - let consumer_ids = [ - "workflow_task_consumer", - "alert_aggregator", - "alert_notifier", - "patient_welcome", - "appt_created_notifier", - "appointment_notifier", - "appointment_cancel_handler", - "follow_up_escalator", - "critical_alert_consumer", - "ai_analysis_notifier", - "dialysis_notifier", - "ai_action_dispatcher", - "consent_notifier", - "consult_opened_notifier", - "consult_msg_notifier", - "consult_closed_notifier", - "fu_created_notifier", - "points_earned_notifier", - "points_exchanged_notifier", - "points_expired_notifier", - "lab_upload_ai_trigger", - "lab_reviewed_notifier", - "patient_updated_audit", - ]; - assert_eq!(consumer_ids.len(), 23, "消费者总数应为 23"); - } - - // ── care.action.performed 事件 payload 契约测试 ────────────────── - - /// care.action.performed (item_completed) payload 契约 - #[test] - fn care_action_item_completed_payload_contract() { - let plan_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "plan_id": plan_id.to_string(), - "patient_id": patient_id.to_string(), - "action": "item_completed", - "item_title": "血压监测", - "item_type": "monitoring", - "operator_id": Uuid::now_v7().to_string(), - })); - - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(pid.is_some(), "消费者需要 patient_id"); - - let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or(""); - assert_eq!(action, "item_completed"); - - let item_title = payload - .get("item_title") - .and_then(|v| v.as_str()) - .unwrap_or("护理项目"); - assert_eq!(item_title, "血压监测"); - } - - /// care.action.performed (outcome_measured) payload 契约 - #[test] - fn care_action_outcome_measured_payload_contract() { - let plan_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - let payload = build_event_payload(json!({ - "plan_id": plan_id.to_string(), - "patient_id": patient_id.to_string(), - "action": "outcome_measured", - "metric": "血压", - "operator_id": Uuid::now_v7().to_string(), - })); - - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(pid.is_some(), "消费者需要 patient_id"); - - let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or(""); - assert_eq!(action, "outcome_measured"); - - let metric = payload - .get("metric") - .and_then(|v| v.as_str()) - .unwrap_or("健康指标"); - assert_eq!(metric, "血压"); - } - - /// care_plan.activated / care_plan.completed payload 契约 - #[test] - fn care_plan_lifecycle_payload_contract() { - let plan_id = Uuid::now_v7(); - let patient_id = Uuid::now_v7(); - - for (event_type, status) in [ - (CARE_PLAN_ACTIVATED, "active"), - (CARE_PLAN_COMPLETED, "completed"), - ] { - let payload = build_event_payload(json!({ - "plan_id": plan_id.to_string(), - "patient_id": patient_id.to_string(), - "status": status, - })); - - let pid = payload - .get("patient_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(pid.is_some(), "{} 消费者需要 patient_id", event_type); - - let plid = payload - .get("plan_id") - .and_then(|v| v.as_str()) - .and_then(|s| Uuid::parse_str(s).ok()); - assert!(plid.is_some(), "{} 消费者需要 plan_id", event_type); - } - } - - /// 关怀行动通知消息分支逻辑 - #[test] - fn care_action_notification_branch_logic() { - // item_completed → 温暖通知 - let item_payload = build_event_payload(json!({ - "action": "item_completed", - "item_title": "血压监测", - })); - let action = item_payload - .get("action") - .and_then(|v| v.as_str()) - .unwrap_or(""); - let (title, _body) = match action { - "item_completed" => ( - "关怀已送达".to_string(), - "您的护理团队已完成「血压监测」".to_string(), - ), - "outcome_measured" => ("健康数据已更新".to_string(), "数据已记录".to_string()), - _ => ("关怀已送达".to_string(), "正在关注".to_string()), - }; - assert_eq!(title, "关怀已送达"); - - // outcome_measured → 数据更新通知 - let outcome_payload = build_event_payload(json!({ - "action": "outcome_measured", - "metric": "血压", - })); - let action2 = outcome_payload - .get("action") - .and_then(|v| v.as_str()) - .unwrap_or(""); - let (title2, _body2) = match action2 { - "item_completed" => ("关怀已送达".to_string(), "已完成".to_string()), - "outcome_measured" => ("健康数据已更新".to_string(), "血压数据已记录".to_string()), - _ => ("关怀已送达".to_string(), "正在关注".to_string()), - }; - assert_eq!(title2, "健康数据已更新"); - - // 未知 action → 默认通知 - let unknown_payload = build_event_payload(json!({ "action": "unknown" })); - let action3 = unknown_payload - .get("action") - .and_then(|v| v.as_str()) - .unwrap_or(""); - let (title3, _body3) = match action3 { - "item_completed" => ("关怀已送达".to_string(), "已完成".to_string()), - "outcome_measured" => ("健康数据已更新".to_string(), "已记录".to_string()), - _ => ("关怀已送达".to_string(), "正在关注".to_string()), - }; - assert_eq!(title3, "关怀已送达"); - } -} diff --git a/crates/erp-health/src/event/ai.rs b/crates/erp-health/src/event/ai.rs new file mode 100644 index 0000000..ea1e77b --- /dev/null +++ b/crates/erp-health/src/event/ai.rs @@ -0,0 +1,215 @@ +/// ai.analysis.completed → 通知关联医生 + AI→行动闭环消费者 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + // ai.analysis.completed → 通知关联医生 + let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + handles.push(ai_handle); + let ai_db = state.db.clone(); + let ai_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match ai_rx.recv().await { + Some(event) if event.event_type == "ai.analysis.completed" => { + if erp_core::events::is_event_processed( + &ai_db, + event.id, + "ai_analysis_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str()); + let analysis_type = event + .payload + .get("analysis_type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + + if let (Some(did), Some(pid)) = (doctor_id, patient_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "doctor", + "recipient_id": did, + "template_key": "AI_ANALYSIS_COMPLETED", + "params": { + "analysis_type": analysis_type, + "patient_id": pid, + } + })), + ); + ai_bus.publish(notify, &ai_db).await; + tracing::info!( + analysis_id = ?analysis_id, + analysis_type = %analysis_type, + doctor_id = %did, + "AI 分析完成通知已发送给医生" + ); + } else { + tracing::info!( + analysis_id = ?analysis_id, + analysis_type = %analysis_type, + "AI 分析完成(缺少关联信息,跳过通知)" + ); + } + let _ = erp_core::events::mark_event_processed( + &ai_db, + event.id, + "ai_analysis_notifier", + ) + .await; + } + Some(event) if event.event_type == "dialysis.record.created" => { + if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier") + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let record_id = event.payload.get("record_id").and_then(|v| v.as_str()); + tracing::info!( + record_id = ?record_id, + patient_id = ?patient_id, + "透析记录已创建,触发 KDIGO 自动评估" + ); + + // H4: 透析→KDIGO 自动串联 — 发布事件让 AI 模块执行风险评估 + if let (Some(pid), Some(rid)) = (patient_id, record_id) { + let kdigo_event = erp_core::events::DomainEvent::new( + "ai.dialysis.kdigo_requested", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "patient_id": pid, + "dialysis_record_id": rid, + "source": "dialysis_notifier", + })), + ); + ai_bus.publish(kdigo_event, &ai_db).await; + } + + let _ = erp_core::events::mark_event_processed( + &ai_db, + event.id, + "dialysis_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + // ai.analysis.completed → AI→行动闭环消费者(行动分发) + let (mut ai_action_rx, ai_action_handle) = + state.event_bus.subscribe_filtered("ai.".to_string()); + handles.push(ai_action_handle); + let action_db = state.db.clone(); + let action_event_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match ai_action_rx.recv().await { + Some(event) if event.event_type == "ai.analysis.completed" => { + if erp_core::events::is_event_processed( + &action_db, + event.id, + "ai_action_dispatcher", + ) + .await + .unwrap_or(false) + { + continue; + } + + let tenant_id = event.tenant_id; + let analysis_id = event + .payload + .get("analysis_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + let doctor_id = event + .payload + .get("doctor_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + let risk_level = event + .payload + .get("risk_level") + .and_then(|v| v.as_str()) + .unwrap_or("medium"); + let suggestion_count = event + .payload + .get("suggestion_count") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + if suggestion_count > 0 + && let (Some(aid), Some(pid)) = (analysis_id, patient_id) + { + let loader_result: Result, sea_orm::DbErr> = + crate::service::ai_suggestion_loader::load_by_analysis( + &action_db, tenant_id, aid, + ) + .await; + match loader_result { + Ok(suggestions) if !suggestions.is_empty() => { + crate::service::ai_action_dispatcher::handle_ai_suggestions( + &action_db, + &action_event_bus, + tenant_id, + aid, + pid, + doctor_id, + &suggestions, + risk_level, + ) + .await; + tracing::info!( + analysis_id = %aid, + patient_id = %pid, + suggestion_count = suggestions.len(), + risk_level = %risk_level, + "AI 行动分发完成" + ); + } + Ok(_) => { + tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发"); + } + Err(e) => { + tracing::warn!( + analysis_id = %aid, + error = %e, + "加载建议列表失败" + ); + } + } + } + + let _ = erp_core::events::mark_event_processed( + &action_db, + event.id, + "ai_action_dispatcher", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/alert.rs b/crates/erp-health/src/event/alert.rs new file mode 100644 index 0000000..c15f47a --- /dev/null +++ b/crates/erp-health/src/event/alert.rs @@ -0,0 +1,100 @@ +/// alert.triggered → 告警消息通知 + 告警聚合 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + // alert.triggered → 告警消息通知 + let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string()); + handles.push(alert_handle); + let alert_db = state.db.clone(); + let alert_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match alert_rx.recv().await { + Some(event) if event.event_type == super::ALERT_TRIGGERED => { + if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier") + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let severity = event + .payload + .get("severity") + .and_then(|v| v.as_str()) + .unwrap_or("warning"); + let rule_name = event + .payload + .get("rule_name") + .and_then(|v| v.as_str()) + .unwrap_or("健康告警"); + if let Some(pid) = patient_id { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" }, + "params": { + "rule_name": rule_name, + "severity": severity, + } + })), + ); + alert_bus.publish(notify_event, &alert_db).await; + tracing::info!(patient_id = %pid, severity = %severity, "告警通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &alert_db, + event.id, + "alert_notifier", + ) + .await; + } + Some(event) if event.event_type == super::ALERT_TRIGGERED => { + // 被抑制的告警 → 发布聚合事件 + if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator") + .await + .unwrap_or(false) + { + continue; + } + let is_suppressed = event + .payload + .get("suppressed") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if is_suppressed { + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let Some(pid) = patient_id { + let aggregated_event = erp_core::events::DomainEvent::new( + super::ALERT_AGGREGATED, + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "patient_id": pid, + "triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()), + "severity": event.payload.get("severity"), + })), + ); + alert_bus.publish(aggregated_event, &alert_db).await; + tracing::info!(patient_id = %pid, "告警聚合事件已发布"); + } + } + let _ = erp_core::events::mark_event_processed( + &alert_db, + event.id, + "alert_aggregator", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/appointment.rs b/crates/erp-health/src/event/appointment.rs new file mode 100644 index 0000000..d0b002e --- /dev/null +++ b/crates/erp-health/src/event/appointment.rs @@ -0,0 +1,109 @@ +/// appointment.created/confirmed/cancelled → 通知 + 号源释放 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut appt_rx, appt_handle) = state + .event_bus + .subscribe_filtered("appointment.".to_string()); + handles.push(appt_handle); + let appt_db = state.db.clone(); + let appt_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match appt_rx.recv().await { + Some(event) if event.event_type == super::APPOINTMENT_CREATED => { + if erp_core::events::is_event_processed( + &appt_db, + event.id, + "appt_created_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + if let (Some(pid), Some(did)) = (patient_id, doctor_id) { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "APPOINTMENT_CREATED", + "params": { "doctor_id": did } + })), + ); + appt_bus.publish(notify_event, &appt_db).await; + tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &appt_db, + event.id, + "appt_created_notifier", + ) + .await; + } + Some(event) if event.event_type == "appointment.confirmed" => { + if erp_core::events::is_event_processed( + &appt_db, + event.id, + "appointment_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(did), Some(pid)) = (doctor_id, patient_id) { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "appointment_confirmed", + "recipient_type": "doctor", + "recipient_id": did, + "patient_id": pid, + })), + ); + appt_bus.publish(notify_event, &appt_db).await; + tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发"); + } + let _ = erp_core::events::mark_event_processed( + &appt_db, + event.id, + "appointment_notifier", + ) + .await; + } + Some(event) if event.event_type == "appointment.cancelled" => { + if erp_core::events::is_event_processed( + &appt_db, + event.id, + "appointment_cancel_handler", + ) + .await + .unwrap_or(false) + { + continue; + } + tracing::info!(event_id = %event.id, "预约取消,号源释放"); + let _ = erp_core::events::mark_event_processed( + &appt_db, + event.id, + "appointment_cancel_handler", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/consent.rs b/crates/erp-health/src/event/consent.rs new file mode 100644 index 0000000..2b48984 --- /dev/null +++ b/crates/erp-health/src/event/consent.rs @@ -0,0 +1,102 @@ +/// consent.granted/revoked → 通知关联医生 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut consent_rx, consent_handle) = + state.event_bus.subscribe_filtered("consent.".to_string()); + handles.push(consent_handle); + let consent_db = state.db.clone(); + let consent_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match consent_rx.recv().await { + Some(event) if event.event_type == super::CONSENT_GRANTED => { + if erp_core::events::is_event_processed( + &consent_db, + event.id, + "consent_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let consent_type = event + .payload + .get("consent_type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + if let Some(pid) = patient_id { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "CONSENT_GRANTED", + "params": { + "consent_type": consent_type, + } + })), + ); + consent_bus.publish(notify_event, &consent_db).await; + tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &consent_db, + event.id, + "consent_notifier", + ) + .await; + } + Some(event) if event.event_type == super::CONSENT_REVOKED => { + if erp_core::events::is_event_processed( + &consent_db, + event.id, + "consent_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let consent_type = event + .payload + .get("consent_type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + if let Some(pid) = patient_id { + let notify_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "staff", + "template_key": "CONSENT_REVOKED", + "params": { + "patient_id": pid, + "consent_type": consent_type, + } + })), + ); + consent_bus.publish(notify_event, &consent_db).await; + tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护"); + } + let _ = erp_core::events::mark_event_processed( + &consent_db, + event.id, + "consent_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/consultation.rs b/crates/erp-health/src/event/consultation.rs new file mode 100644 index 0000000..6534eea --- /dev/null +++ b/crates/erp-health/src/event/consultation.rs @@ -0,0 +1,135 @@ +/// consultation.opened/new_message/closed → 通知相关方 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut consult_rx, consult_handle) = state + .event_bus + .subscribe_filtered("consultation.".to_string()); + handles.push(consult_handle); + let consult_db = state.db.clone(); + let consult_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match consult_rx.recv().await { + Some(event) if event.event_type == super::CONSULTATION_OPENED => { + if erp_core::events::is_event_processed( + &consult_db, + event.id, + "consult_opened_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(did), Some(pid)) = (doctor_id, patient_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "doctor", + "recipient_id": did, + "template_key": "CONSULTATION_OPENED", + "params": { "patient_id": pid } + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!( + doctor_id = did, + patient_id = pid, + "咨询开启通知已发送给医生" + ); + } + let _ = erp_core::events::mark_event_processed( + &consult_db, + event.id, + "consult_opened_notifier", + ) + .await; + } + Some(event) if event.event_type == super::CONSULTATION_NEW_MESSAGE => { + if erp_core::events::is_event_processed( + &consult_db, + event.id, + "consult_msg_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str()); + let sender_role = event + .payload + .get("sender_role") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + if let Some(rid) = recipient_id { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": if sender_role == "patient" { "doctor" } else { "patient" }, + "recipient_id": rid, + "template_key": "CONSULTATION_NEW_MESSAGE", + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!( + recipient_id = rid, + sender_role = sender_role, + "咨询新消息通知已发送" + ); + } + let _ = erp_core::events::mark_event_processed( + &consult_db, + event.id, + "consult_msg_notifier", + ) + .await; + } + Some(event) if event.event_type == super::CONSULTATION_CLOSED => { + if erp_core::events::is_event_processed( + &consult_db, + event.id, + "consult_closed_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let Some(pid) = patient_id { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "CONSULTATION_CLOSED", + })), + ); + consult_bus.publish(notify, &consult_db).await; + tracing::info!(patient_id = pid, "咨询关闭通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &consult_db, + event.id, + "consult_closed_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/device.rs b/crates/erp-health/src/event/device.rs new file mode 100644 index 0000000..d594c90 --- /dev/null +++ b/crates/erp-health/src/event/device.rs @@ -0,0 +1,53 @@ +/// device.readings.synced → 触发告警引擎评估 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut reading_rx, reading_handle) = state + .event_bus + .subscribe_filtered("device.readings.".to_string()); + handles.push(reading_handle); + let eval_state = state.clone(); + tokio::spawn(async move { + loop { + match reading_rx.recv().await { + Some(event) if event.event_type == super::DEVICE_READINGS_SYNCED => { + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + if let Some(pid) = patient_id { + // 对所有设备类型触发评估 + for device_type in &[ + "heart_rate", + "blood_oxygen", + "temperature", + "blood_pressure", + "blood_glucose", + ] { + if let Err(e) = crate::service::alert_engine::evaluate_rules( + &eval_state, + event.tenant_id, + pid, + device_type, + ) + .await + { + tracing::error!( + patient_id = %pid, + device_type = device_type, + error = %e, + "告警评估失败" + ); + } + } + } + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/follow_up.rs b/crates/erp-health/src/event/follow_up.rs new file mode 100644 index 0000000..6657ccb --- /dev/null +++ b/crates/erp-health/src/event/follow_up.rs @@ -0,0 +1,143 @@ +/// follow_up.overdue → 升级通知 + follow_up.created → 通知执行人 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + // follow_up.overdue → 升级通知 + let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string()); + handles.push(fu_handle); + let fu_db = state.db.clone(); + let fu_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match fu_rx.recv().await { + Some(event) if event.event_type == super::FOLLOW_UP_OVERDUE => { + if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator") + .await + .unwrap_or(false) + { + continue; + } + let task_id = event.payload.get("task_id").and_then(|v| v.as_str()); + let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); + if let (Some(tid), Some(uid)) = (task_id, assigned_to) { + let escalate_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "follow_up_overdue", + "recipient_type": "staff", + "recipient_id": uid, + "task_id": tid, + })), + ); + fu_bus.publish(escalate_event, &fu_db).await; + tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知"); + } + let _ = erp_core::events::mark_event_processed( + &fu_db, + event.id, + "follow_up_escalator", + ) + .await; + } + Some(event) if event.event_type == super::FOLLOW_UP_COMPLETED => { + // 随访完成 → 检查是否由 AI 触发,触发再分析 + if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str()) + && let Ok(task_id) = uuid::Uuid::parse_str(task_id_str) + { + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + + if let Some(patient_id) = patient_id { + // 通过 ai_suggestion_loader 查找关联的 AI 建议 + if let Some(suggestion_id) = + crate::service::ai_suggestion_loader::find_by_followup_task( + &fu_db, + event.tenant_id, + task_id, + ) + .await + .unwrap_or(None) + { + let reanalysis_event = erp_core::events::DomainEvent::new( + "ai.reanalysis.requested", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "original_suggestion_id": suggestion_id.to_string(), + "patient_id": patient_id.to_string(), + "followup_task_id": task_id_str, + "trigger": "loop_closure", + })), + ); + fu_bus.publish(reanalysis_event, &fu_db).await; + tracing::info!( + suggestion_id = %suggestion_id, + patient_id = %patient_id, + task_id = %task_id, + "随访完成,触发 AI 再分析(闭环)" + ); + } + } + } + } + Some(_) => {} + None => break, + } + } + }); + + // follow_up.created → 通知执行人 + let (mut fu_created_rx, fu_created_handle) = + state.event_bus.subscribe_filtered("follow_up.".to_string()); + handles.push(fu_created_handle); + let fu_created_db = state.db.clone(); + let fu_created_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match fu_created_rx.recv().await { + Some(event) if event.event_type == super::FOLLOW_UP_CREATED => { + if erp_core::events::is_event_processed( + &fu_created_db, + event.id, + "fu_created_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(uid), Some(pid)) = (assigned_to, patient_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "staff", + "recipient_id": uid, + "template_key": "FOLLOW_UP_CREATED", + "params": { "patient_id": pid } + })), + ); + fu_created_bus.publish(notify, &fu_created_db).await; + tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &fu_created_db, + event.id, + "fu_created_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/health_data.rs b/crates/erp-health/src/event/health_data.rs new file mode 100644 index 0000000..0598c76 --- /dev/null +++ b/crates/erp-health/src/event/health_data.rs @@ -0,0 +1,100 @@ +/// health_data.critical_alert → 创建危急值告警记录 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut critical_rx, critical_handle) = state + .event_bus + .subscribe_filtered("health_data.".to_string()); + handles.push(critical_handle); + let critical_state = state.clone(); + tokio::spawn(async move { + loop { + match critical_rx.recv().await { + Some(event) if event.event_type == super::HEALTH_DATA_CRITICAL_ALERT => { + // 幂等检查 + if erp_core::events::is_event_processed( + &critical_state.db, + event.id, + "critical_alert_consumer", + ) + .await + .unwrap_or(false) + { + continue; + } + + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + // alert 数据在嵌套的 "alert" 对象中 + let alert_obj = event.payload.get("alert"); + let alert_type = "vital_sign"; + let metric_name = alert_obj + .and_then(|a| a.get("indicator")) + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let metric_value = alert_obj + .and_then(|a| a.get("value")) + .and_then(|v| v.as_f64()) + .map(|v| v.to_string()) + .unwrap_or_default(); + let threshold_value = alert_obj + .and_then(|a| a.get("threshold")) + .and_then(|v| v.as_f64()) + .map(|v| v.to_string()) + .unwrap_or_default(); + let severity = alert_obj + .and_then(|a| a.get("level")) + .and_then(|v| v.as_str()) + .unwrap_or("critical"); + + if let Some(pid) = patient_id { + match crate::service::critical_alert_service::handle_critical_alert_event( + &critical_state, + event.tenant_id, + pid, + alert_type, + metric_name, + &metric_value, + &threshold_value, + None, + severity, + ) + .await + { + Ok(alert_id) => { + tracing::info!( + event_id = %event.id, + alert_id = %alert_id, + patient_id = %pid, + metric = %metric_name, + "危急值告警已创建" + ); + let _ = erp_core::events::mark_event_processed( + &critical_state.db, + event.id, + "critical_alert_consumer", + ) + .await; + } + Err(e) => { + tracing::error!( + event_id = %event.id, + patient_id = %pid, + error = %e, + "危急值告警创建失败" + ); + } + } + } + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/lab_report.rs b/crates/erp-health/src/event/lab_report.rs new file mode 100644 index 0000000..c3b581e --- /dev/null +++ b/crates/erp-health/src/event/lab_report.rs @@ -0,0 +1,97 @@ +/// lab_report.uploaded → 触发 AI 自动分析 + lab_report.reviewed → 通知患者 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut lab_upload_rx, lab_upload_handle) = state + .event_bus + .subscribe_filtered("lab_report.".to_string()); + handles.push(lab_upload_handle); + let lab_upload_db = state.db.clone(); + let lab_upload_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match lab_upload_rx.recv().await { + Some(event) if event.event_type == super::LAB_REPORT_UPLOADED => { + if erp_core::events::is_event_processed( + &lab_upload_db, + event.id, + "lab_upload_ai_trigger", + ) + .await + .unwrap_or(false) + { + continue; + } + let report_id = event.payload.get("report_id").and_then(|v| v.as_str()); + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + if let (Some(rid), Some(pid)) = (report_id, patient_id) { + let ai_event = erp_core::events::DomainEvent::new( + "ai.analysis.requested", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "source_type": "lab_report", + "source_id": rid, + "patient_id": pid, + })), + ); + lab_upload_bus.publish(ai_event, &lab_upload_db).await; + tracing::info!( + report_id = rid, + patient_id = pid, + "化验单上传触发 AI 分析请求" + ); + } + let _ = erp_core::events::mark_event_processed( + &lab_upload_db, + event.id, + "lab_upload_ai_trigger", + ) + .await; + } + Some(event) if event.event_type == super::LAB_REPORT_REVIEWED => { + if erp_core::events::is_event_processed( + &lab_upload_db, + event.id, + "lab_reviewed_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let reviewer_id = event.payload.get("reviewer_id").and_then(|v| v.as_str()); + if let (Some(pid), Some(rid)) = (patient_id, reviewer_id) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "LAB_REPORT_REVIEWED", + "params": { "reviewer_id": rid } + })), + ); + lab_upload_bus.publish(notify, &lab_upload_db).await; + tracing::info!( + patient_id = pid, + reviewer_id = rid, + "化验报告审核通知已发送给患者" + ); + } + let _ = erp_core::events::mark_event_processed( + &lab_upload_db, + event.id, + "lab_reviewed_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/mod.rs b/crates/erp-health/src/event/mod.rs new file mode 100644 index 0000000..a2be03b --- /dev/null +++ b/crates/erp-health/src/event/mod.rs @@ -0,0 +1,219 @@ +use erp_core::events::EventBus; + +mod ai; +mod alert; +mod appointment; +mod consent; +mod consultation; +mod device; +mod follow_up; +mod health_data; +mod lab_report; +mod patient; +mod points; +mod workflow; + +// --------------------------------------------------------------------------- +// 事件类型常量 — 集中管理,避免硬编码字符串散布在 service 层 +// --------------------------------------------------------------------------- + +// 预约 +pub const APPOINTMENT_CREATED: &str = "appointment.created"; +// appointment.confirmed / appointment.cancelled 等 — 动态拼接 + +// 告警 +pub const ALERT_TRIGGERED: &str = "alert.triggered"; +pub const ALERT_AGGREGATED: &str = "alert.aggregated"; + +// 知情同意 +pub const CONSENT_GRANTED: &str = "consent.granted"; +pub const CONSENT_REVOKED: &str = "consent.revoked"; + +// 文章 +pub const ARTICLE_PUBLISHED: &str = "article.published"; +pub const ARTICLE_REJECTED: &str = "article.rejected"; + +// 咨询 +pub const CONSULTATION_OPENED: &str = "consultation.opened"; +pub const CONSULTATION_CLOSED: &str = "consultation.closed"; +pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message"; + +// 设备数据 +pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced"; + +// 医生 +pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed"; + +// 随访 +pub const FOLLOW_UP_CREATED: &str = "follow_up.created"; +pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed"; +pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue"; + +// 日常监测 +pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created"; + +// 健康数据 +pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded"; +pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed"; +pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert"; + +// 患者 +pub const PATIENT_CREATED: &str = "patient.created"; +pub const PATIENT_UPDATED: &str = "patient.updated"; +// TODO: 以下常量对应的患者认证和死亡记录流程尚未实现,待后续迭代 +pub const PATIENT_VERIFIED: &str = "patient.verified"; +pub const PATIENT_DECEASED: &str = "patient.deceased"; + +// 积分 +pub const POINTS_EXPIRED: &str = "points.expired"; +pub const POINTS_EARNED: &str = "points.earned"; +pub const POINTS_EXCHANGED: &str = "points.exchanged"; + +// 护理计划 +pub const CARE_PLAN_CREATED: &str = "care_plan.created"; +pub const CARE_PLAN_UPDATED: &str = "care_plan.updated"; +pub const CARE_PLAN_ACTIVATED: &str = "care_plan.activated"; +pub const CARE_PLAN_COMPLETED: &str = "care_plan.completed"; + +// 关怀行动 +pub const CARE_ACTION_PERFORMED: &str = "care.action.performed"; + +/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup) +pub fn register_handlers(_bus: &EventBus) { + // 事件处理器已迁移到 on_startup → register_handlers_with_state +} + +/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用 +pub fn register_handlers_with_state(state: crate::state::HealthState) { + let mut handles: Vec = Vec::new(); + + handles.extend(workflow::spawn(&state)); + handles.extend(device::spawn(&state)); + handles.extend(alert::spawn(&state)); + handles.extend(patient::spawn(&state)); + handles.extend(appointment::spawn(&state)); + handles.extend(follow_up::spawn(&state)); + handles.extend(health_data::spawn(&state)); + handles.extend(ai::spawn(&state)); + handles.extend(consent::spawn(&state)); + handles.extend(consultation::spawn(&state)); + handles.extend(points::spawn(&state)); + handles.extend(lab_report::spawn(&state)); + + // 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭 + // 所有过滤订阅的生命周期应与进程一致 + std::mem::forget(handles); +} + +// 事件处理器本身依赖 tokio::spawn + channel + DB,无法纯单元测试。 +// 以下测试覆盖: +// 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配) +// 2. register_handlers 不 panic(空函数) +// 3. 事件 payload 构造格式与消费者解析逻辑的契约 +// 4. EventBus 过滤订阅的内存行为(无需 DB) +// 5. 消费者从 payload 中提取字段的边界条件 +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use erp_core::events::{DomainEvent, EventBus, build_event_payload}; + use serde_json::json; + use std::collections::HashSet; + + // ── 事件类型常量 ────────────────────────────────────────────────────── + + /// 所有事件类型常量必须遵循 `{domain}.{action}` 格式 + fn assert_valid_event_type(name: &str) { + let parts: Vec<&str> = name.split('.').collect(); + assert!( + parts.len() >= 2, + "事件类型 '{}' 不符合 domain.action 格式", + name + ); + assert!( + !parts[0].is_empty() && !parts[1].is_empty(), + "事件类型 '{}' 的 domain 或 action 为空", + name + ); + } + + #[test] + fn event_constants_follow_naming_convention() { + let all_types = [ + APPOINTMENT_CREATED, + ALERT_TRIGGERED, + ALERT_AGGREGATED, + CONSENT_GRANTED, + CONSENT_REVOKED, + ARTICLE_PUBLISHED, + ARTICLE_REJECTED, + CONSULTATION_OPENED, + CONSULTATION_CLOSED, + CONSULTATION_NEW_MESSAGE, + DEVICE_READINGS_SYNCED, + DOCTOR_ONLINE_STATUS_CHANGED, + FOLLOW_UP_CREATED, + FOLLOW_UP_COMPLETED, + FOLLOW_UP_OVERDUE, + DAILY_MONITORING_CREATED, + LAB_REPORT_UPLOADED, + LAB_REPORT_REVIEWED, + HEALTH_DATA_CRITICAL_ALERT, + PATIENT_CREATED, + PATIENT_UPDATED, + PATIENT_VERIFIED, + PATIENT_DECEASED, + POINTS_EXPIRED, + POINTS_EARNED, + POINTS_EXCHANGED, + CARE_PLAN_CREATED, + CARE_PLAN_UPDATED, + CARE_PLAN_ACTIVATED, + CARE_PLAN_COMPLETED, + CARE_ACTION_PERFORMED, + ]; + for t in &all_types { + assert_valid_event_type(t); + } + } + + #[test] + fn event_constants_are_unique() { + let all_types = [ + APPOINTMENT_CREATED, + ALERT_TRIGGERED, + ALERT_AGGREGATED, + CONSENT_GRANTED, + CONSENT_REVOKED, + ARTICLE_PUBLISHED, + ARTICLE_REJECTED, + CONSULTATION_OPENED, + CONSULTATION_CLOSED, + CONSULTATION_NEW_MESSAGE, + DEVICE_READINGS_SYNCED, + DOCTOR_ONLINE_STATUS_CHANGED, + FOLLOW_UP_CREATED, + FOLLOW_UP_COMPLETED, + FOLLOW_UP_OVERDUE, + DAILY_MONITORING_CREATED, + LAB_REPORT_UPLOADED, + LAB_REPORT_REVIEWED, + HEALTH_DATA_CRITICAL_ALERT, + PATIENT_CREATED, + PATIENT_UPDATED, + PATIENT_VERIFIED, + PATIENT_DECEASED, + POINTS_EXPIRED, + POINTS_EARNED, + POINTS_EXCHANGED, + CARE_PLAN_CREATED, + CARE_PLAN_UPDATED, + CARE_PLAN_ACTIVATED, + CARE_PLAN_COMPLETED, + CARE_ACTION_PERFORMED, + ]; + let set: HashSet<_> = all_types.into_iter().collect(); + assert_eq!(set.len(), all_types.len(), "存在重复的事件类型常量"); + } +} diff --git a/crates/erp-health/src/event/patient.rs b/crates/erp-health/src/event/patient.rs new file mode 100644 index 0000000..add2651 --- /dev/null +++ b/crates/erp-health/src/event/patient.rs @@ -0,0 +1,91 @@ +/// patient.created → 欢迎消息通知 + patient.updated → 审计日志 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + // patient.created → 欢迎消息通知 + let (mut patient_rx, patient_handle) = + state.event_bus.subscribe_filtered("patient.".to_string()); + handles.push(patient_handle); + let patient_db = state.db.clone(); + let patient_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match patient_rx.recv().await { + Some(event) if event.event_type == super::PATIENT_CREATED => { + if erp_core::events::is_event_processed( + &patient_db, + event.id, + "patient_welcome", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event + .payload + .get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + if let Some(pid) = patient_id { + let welcome_event = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "template": "patient_welcome", + "recipient_type": "patient", + "recipient_id": pid, + })), + ); + patient_bus.publish(welcome_event, &patient_db).await; + tracing::info!(patient_id = %pid, "新患者欢迎流程触发"); + } + let _ = erp_core::events::mark_event_processed( + &patient_db, + event.id, + "patient_welcome", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + // patient.updated → 审计日志 + let (mut patient_update_rx, patient_update_handle) = + state.event_bus.subscribe_filtered("patient.".to_string()); + handles.push(patient_update_handle); + let patient_update_db = state.db.clone(); + tokio::spawn(async move { + loop { + match patient_update_rx.recv().await { + Some(event) if event.event_type == super::PATIENT_UPDATED => { + if erp_core::events::is_event_processed( + &patient_update_db, + event.id, + "patient_updated_audit", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新"); + let _ = erp_core::events::mark_event_processed( + &patient_update_db, + event.id, + "patient_updated_audit", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/points.rs b/crates/erp-health/src/event/points.rs new file mode 100644 index 0000000..87d1487 --- /dev/null +++ b/crates/erp-health/src/event/points.rs @@ -0,0 +1,124 @@ +/// points.earned/exchanged/expired → 积分变动通知 +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string()); + handles.push(points_handle); + let points_db = state.db.clone(); + let points_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match points_rx.recv().await { + Some(event) if event.event_type == super::POINTS_EARNED => { + if erp_core::events::is_event_processed( + &points_db, + event.id, + "points_earned_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EARNED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &points_db, + event.id, + "points_earned_notifier", + ) + .await; + } + Some(event) if event.event_type == super::POINTS_EXCHANGED => { + if erp_core::events::is_event_processed( + &points_db, + event.id, + "points_exchanged_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EXCHANGED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &points_db, + event.id, + "points_exchanged_notifier", + ) + .await; + } + Some(event) if event.event_type == super::POINTS_EXPIRED => { + if erp_core::events::is_event_processed( + &points_db, + event.id, + "points_expired_notifier", + ) + .await + .unwrap_or(false) + { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let amount = event.payload.get("amount").and_then(|v| v.as_u64()); + if let (Some(pid), Some(amt)) = (patient_id, amount) { + let notify = erp_core::events::DomainEvent::new( + "message.send", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "channel": "in_app", + "recipient_type": "patient", + "recipient_id": pid, + "template_key": "POINTS_EXPIRED", + "params": { "amount": amt } + })), + ); + points_bus.publish(notify, &points_db).await; + tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送"); + } + let _ = erp_core::events::mark_event_processed( + &points_db, + event.id, + "points_expired_notifier", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +} diff --git a/crates/erp-health/src/event/workflow.rs b/crates/erp-health/src/event/workflow.rs new file mode 100644 index 0000000..2e22520 --- /dev/null +++ b/crates/erp-health/src/event/workflow.rs @@ -0,0 +1,76 @@ +/// workflow.task.completed → 更新随访任务状态为 completed +pub fn spawn(state: &crate::state::HealthState) -> Vec { + let mut handles = Vec::new(); + + let (mut workflow_rx, wf_handle) = state + .event_bus + .subscribe_filtered("workflow.task.".to_string()); + handles.push(wf_handle); + let wf_db = state.db.clone(); + tokio::spawn(async move { + loop { + match workflow_rx.recv().await { + Some(event) if event.event_type == "workflow.task.completed" => { + if erp_core::events::is_event_processed( + &wf_db, + event.id, + "workflow_task_consumer", + ) + .await + .unwrap_or(false) + { + continue; + } + let task_id = event + .payload + .get("task_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + match task_id { + Some(task_id) => { + match crate::service::follow_up_service::complete_task_by_system( + &wf_db, + task_id, + event.tenant_id, + ) + .await + { + Ok(()) => { + tracing::info!( + event_id = %event.id, + task_id = %task_id, + "工作流任务完成 → 随访任务已更新" + ); + } + Err(e) => { + tracing::warn!( + event_id = %event.id, + task_id = %task_id, + error = %e, + "工作流任务完成 → 随访任务更新失败" + ); + } + } + } + None => { + tracing::warn!( + event_id = %event.id, + "工作流任务完成事件缺少 task_id,跳过" + ); + } + } + let _ = erp_core::events::mark_event_processed( + &wf_db, + event.id, + "workflow_task_consumer", + ) + .await; + } + Some(_) => {} + None => break, + } + } + }); + + handles +}