Files
hms/crates/erp-health/src/event.rs
iven 15b5781dbb
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix(health): 危急值告警全链路修复 — 消费者生命周期 + payload 映射 + 阈值优先级
1. CRITICAL: 修复 SubscriptionHandle 提前 drop 导致所有事件消费者失效
   - register_handlers_with_state 中所有 handle 在函数返回时被 drop
   - cancel channel 关闭导致 subscribe_filtered 的过滤任务退出
   - 方案: 收集所有 handle 并 std::mem::forget,生命周期与进程一致

2. HIGH: 修复 critical_alert 消费者 payload 字段映射不匹配
   - 消费者读取 alert_type/metric_name 等顶层字段,但实际在 alert 嵌套对象中
   - 更新消费者从 alert 对象提取 indicator/value/threshold/level
   - handle_critical_alert_event 增加 severity 参数

3. MEDIUM: 修复 check_indicator 优先匹配最高严重级别
   - 原实现返回第一个匹配的阈值(可能匹配 warning 而非 critical)
   - 改为遍历所有匹配阈值,选择 severity 最高的(critical > warning)

4. MEDIUM: 修复危急值阈值页面不自动加载数据
   - CriticalValueThresholdList 添加 useEffect 初始化加载
2026-05-05 10:11:06 +08:00

2327 lines
103 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use erp_core::events::EventBus;
use uuid::Uuid;
// ---------------------------------------------------------------------------
// 事件类型常量 — 集中管理,避免硬编码字符串散布在 service 层
// ---------------------------------------------------------------------------
// 预约
pub const APPOINTMENT_CREATED: &str = "appointment.created";
// appointment.confirmed / appointment.cancelled 等 — 动态拼接
// 告警
pub const ALERT_TRIGGERED: &str = "alert.triggered";
pub const ALERT_AGGREGATED: &str = "alert.aggregated";
// 知情同意
pub const CONSENT_GRANTED: &str = "consent.granted";
pub const CONSENT_REVOKED: &str = "consent.revoked";
// 文章
pub const ARTICLE_PUBLISHED: &str = "article.published";
pub const ARTICLE_REJECTED: &str = "article.rejected";
// 咨询
pub const CONSULTATION_OPENED: &str = "consultation.opened";
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
// 设备数据
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
// 医生
pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed";
// 随访
pub const FOLLOW_UP_CREATED: &str = "follow_up.created";
pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed";
pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue";
// 日常监测
pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
// 健康数据
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
// 患者
pub const PATIENT_CREATED: &str = "patient.created";
pub const PATIENT_UPDATED: &str = "patient.updated";
// TODO: 以下常量对应的患者认证和死亡记录流程尚未实现,待后续迭代
pub const PATIENT_VERIFIED: &str = "patient.verified";
pub const PATIENT_DECEASED: &str = "patient.deceased";
// 积分
pub const POINTS_EXPIRED: &str = "points.expired";
pub const POINTS_EARNED: &str = "points.earned";
pub const POINTS_EXCHANGED: &str = "points.exchanged";
// 护理计划
pub const CARE_PLAN_CREATED: &str = "care_plan.created";
pub const CARE_PLAN_UPDATED: &str = "care_plan.updated";
pub const CARE_PLAN_ACTIVATED: &str = "care_plan.activated";
pub const CARE_PLAN_COMPLETED: &str = "care_plan.completed";
// 关怀行动
pub const CARE_ACTION_PERFORMED: &str = "care.action.performed";
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup
pub fn register_handlers(_bus: &EventBus) {
// 事件处理器已迁移到 on_startup → register_handlers_with_state
}
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
pub fn register_handlers_with_state(state: crate::state::HealthState) {
// 收集所有 SubscriptionHandle 并 forget防止函数返回时 handle 被 drop
// 导致 cancel channel 关闭、过滤任务退出、消费者全部失效
let mut _handles: Vec<erp_core::events::SubscriptionHandle> = Vec::new();
// workflow.task.completed → 更新随访任务状态为 completed
let (mut workflow_rx, wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
_handles.push(wf_handle);
let wf_db = state.db.clone();
tokio::spawn(async move {
loop {
match workflow_rx.recv().await {
Some(event) if event.event_type == "workflow.task.completed" => {
if erp_core::events::is_event_processed(&wf_db, event.id, "workflow_task_consumer").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok());
match task_id {
Some(task_id) => {
match crate::service::follow_up_service::complete_task_by_system(
&wf_db, task_id, event.tenant_id,
)
.await
{
Ok(()) => {
tracing::info!(
event_id = %event.id,
task_id = %task_id,
"工作流任务完成 → 随访任务已更新"
);
}
Err(e) => {
tracing::warn!(
event_id = %event.id,
task_id = %task_id,
error = %e,
"工作流任务完成 → 随访任务更新失败"
);
}
}
}
None => {
tracing::warn!(
event_id = %event.id,
"工作流任务完成事件缺少 task_id跳过"
);
}
}
let _ = erp_core::events::mark_event_processed(&wf_db, event.id, "workflow_task_consumer").await;
}
Some(_) => {}
None => break,
}
}
});
// device.readings.synced → 触发告警引擎评估
let (mut reading_rx, reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string());
_handles.push(reading_handle);
let eval_state = state.clone();
tokio::spawn(async move {
loop {
match reading_rx.recv().await {
Some(event) if event.event_type == DEVICE_READINGS_SYNCED => {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
// 对所有设备类型触发评估
for device_type in &["heart_rate", "blood_oxygen", "temperature", "blood_pressure", "blood_glucose"] {
if let Err(e) = crate::service::alert_engine::evaluate_rules(
&eval_state, event.tenant_id, pid, device_type,
).await {
tracing::error!(
patient_id = %pid,
device_type = device_type,
error = %e,
"告警评估失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// ── P1 事件消费者补全 ──
// alert.triggered → 告警消息通知
let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
_handles.push(alert_handle);
let alert_db = state.db.clone();
let alert_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match alert_rx.recv().await {
Some(event) if event.event_type == ALERT_TRIGGERED => {
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let severity = event.payload.get("severity").and_then(|v| v.as_str()).unwrap_or("warning");
let rule_name = event.payload.get("rule_name").and_then(|v| v.as_str()).unwrap_or("健康告警");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" },
"params": {
"rule_name": rule_name,
"severity": severity,
}
})),
);
alert_bus.publish(notify_event, &alert_db).await;
tracing::info!(patient_id = %pid, severity = %severity, "告警通知已发送");
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_notifier").await;
}
Some(event) if event.event_type == ALERT_TRIGGERED => {
// 被抑制的告警 → 发布聚合事件
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator").await.unwrap_or(false) {
continue;
}
let is_suppressed = event.payload.get("suppressed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_suppressed {
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let aggregated_event = erp_core::events::DomainEvent::new(
ALERT_AGGREGATED,
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()),
"severity": event.payload.get("severity"),
})),
);
alert_bus.publish(aggregated_event, &alert_db).await;
tracing::info!(patient_id = %pid, "告警聚合事件已发布");
}
}
let _ = erp_core::events::mark_event_processed(&alert_db, event.id, "alert_aggregator").await;
}
Some(_) => {}
None => break,
}
}
});
// patient.created → 欢迎消息通知
let (mut patient_rx, patient_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
_handles.push(patient_handle);
let patient_db = state.db.clone();
let patient_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match patient_rx.recv().await {
Some(event) if event.event_type == PATIENT_CREATED => {
if erp_core::events::is_event_processed(&patient_db, event.id, "patient_welcome").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
let welcome_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "patient_welcome",
"recipient_type": "patient",
"recipient_id": pid,
})),
);
patient_bus.publish(welcome_event, &patient_db).await;
tracing::info!(patient_id = %pid, "新患者欢迎流程触发");
}
let _ = erp_core::events::mark_event_processed(&patient_db, event.id, "patient_welcome").await;
}
Some(_) => {}
None => break,
}
}
});
// appointment.created/confirmed/cancelled → 通知 + 号源释放
let (mut appt_rx, appt_handle) = state.event_bus.subscribe_filtered("appointment.".to_string());
_handles.push(appt_handle);
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match appt_rx.recv().await {
Some(event) if event.event_type == APPOINTMENT_CREATED => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appt_created_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(pid), Some(did)) = (patient_id, doctor_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "APPOINTMENT_CREATED",
"params": { "doctor_id": did }
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appt_created_notifier").await;
}
Some(event) if event.event_type == "appointment.confirmed" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "appointment_confirmed",
"recipient_type": "doctor",
"recipient_id": did,
"patient_id": pid,
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发");
}
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_notifier").await;
}
Some(event) if event.event_type == "appointment.cancelled" => {
if erp_core::events::is_event_processed(&appt_db, event.id, "appointment_cancel_handler").await.unwrap_or(false) {
continue;
}
tracing::info!(event_id = %event.id, "预约取消,号源释放");
let _ = erp_core::events::mark_event_processed(&appt_db, event.id, "appointment_cancel_handler").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.overdue → 升级通知
let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
_handles.push(fu_handle);
let fu_db = state.db.clone();
let fu_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_OVERDUE => {
if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator").await.unwrap_or(false) {
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str());
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
if let (Some(tid), Some(uid)) = (task_id, assigned_to) {
let escalate_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "follow_up_overdue",
"recipient_type": "staff",
"recipient_id": uid,
"task_id": tid,
})),
);
fu_bus.publish(escalate_event, &fu_db).await;
tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知");
}
let _ = erp_core::events::mark_event_processed(&fu_db, event.id, "follow_up_escalator").await;
}
Some(event) if event.event_type == FOLLOW_UP_COMPLETED => {
// 随访完成 → 检查是否由 AI 触发,触发再分析
if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str()) {
if let Ok(task_id) = uuid::Uuid::parse_str(task_id_str) {
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(patient_id) = patient_id {
// 通过 ai_suggestion_loader 查找关联的 AI 建议
if let Some(suggestion_id) = crate::service::ai_suggestion_loader::find_by_followup_task(
&fu_db, event.tenant_id, task_id,
).await.unwrap_or(None) {
let reanalysis_event = erp_core::events::DomainEvent::new(
"ai.reanalysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"original_suggestion_id": suggestion_id.to_string(),
"patient_id": patient_id.to_string(),
"followup_task_id": task_id_str,
"trigger": "loop_closure",
})),
);
fu_bus.publish(reanalysis_event, &fu_db).await;
tracing::info!(
suggestion_id = %suggestion_id,
patient_id = %patient_id,
task_id = %task_id,
"随访完成,触发 AI 再分析(闭环)"
);
}
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// health_data.critical_alert → 创建危急值告警记录
let (mut critical_rx, critical_handle) = state.event_bus.subscribe_filtered("health_data.".to_string());
_handles.push(critical_handle);
let critical_state = state.clone();
tokio::spawn(async move {
loop {
match critical_rx.recv().await {
Some(event) if event.event_type == HEALTH_DATA_CRITICAL_ALERT => {
// 幂等检查
if erp_core::events::is_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
// alert 数据在嵌套的 "alert" 对象中
let alert_obj = event.payload.get("alert");
let alert_type = "vital_sign";
let metric_name = alert_obj.and_then(|a| a.get("indicator")).and_then(|v| v.as_str()).unwrap_or("unknown");
let metric_value = alert_obj.and_then(|a| a.get("value")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default();
let threshold_value = alert_obj.and_then(|a| a.get("threshold")).and_then(|v| v.as_f64()).map(|v| v.to_string()).unwrap_or_default();
let severity = alert_obj.and_then(|a| a.get("level")).and_then(|v| v.as_str()).unwrap_or("critical");
if let Some(pid) = patient_id {
match crate::service::critical_alert_service::handle_critical_alert_event(
&critical_state,
event.tenant_id,
pid,
alert_type,
metric_name,
&metric_value,
&threshold_value,
None,
severity,
).await {
Ok(alert_id) => {
tracing::info!(
event_id = %event.id,
alert_id = %alert_id,
patient_id = %pid,
metric = %metric_name,
"危急值告警已创建"
);
let _ = erp_core::events::mark_event_processed(&critical_state.db, event.id, "critical_alert_consumer").await;
}
Err(e) => {
tracing::error!(
event_id = %event.id,
patient_id = %pid,
error = %e,
"危急值告警创建失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// ai.analysis.completed → 通知关联医生
let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
_handles.push(ai_handle);
let ai_db = state.db.clone();
let ai_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(&ai_db, event.id, "ai_analysis_notifier").await.unwrap_or(false) {
continue;
}
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "AI_ANALYSIS_COMPLETED",
"params": {
"analysis_type": analysis_type,
"patient_id": pid,
}
})),
);
ai_bus.publish(notify, &ai_db).await;
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
doctor_id = %did,
"AI 分析完成通知已发送给医生"
);
} else {
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
"AI 分析完成(缺少关联信息,跳过通知)"
);
}
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
}
Some(event) if event.event_type == "dialysis.record.created" => {
if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let record_id = event.payload.get("record_id").and_then(|v| v.as_str());
tracing::info!(
record_id = ?record_id,
patient_id = ?patient_id,
"透析记录已创建,触发 KDIGO 自动评估"
);
// H4: 透析→KDIGO 自动串联 — 发布事件让 AI 模块执行风险评估
if let (Some(pid), Some(rid)) = (patient_id, record_id) {
let kdigo_event = erp_core::events::DomainEvent::new(
"ai.dialysis.kdigo_requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"dialysis_record_id": rid,
"source": "dialysis_notifier",
})),
);
ai_bus.publish(kdigo_event, &ai_db).await;
}
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "dialysis_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// ai.analysis.completed → AI→行动闭环消费者行动分发
let (mut ai_action_rx, ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
_handles.push(ai_action_handle);
let action_db = state.db.clone();
let action_event_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_action_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(&action_db, event.id, "ai_action_dispatcher").await.unwrap_or(false) {
continue;
}
let tenant_id = event.tenant_id;
let analysis_id = event.payload.get("analysis_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let doctor_id = event.payload.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let risk_level = event.payload.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium");
let suggestion_count = event.payload.get("suggestion_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if suggestion_count > 0 {
if let (Some(aid), Some(pid)) = (analysis_id, patient_id) {
let loader_result: Result<Vec<serde_json::Value>, sea_orm::DbErr> =
crate::service::ai_suggestion_loader::load_by_analysis(
&action_db, tenant_id, aid,
).await;
match loader_result {
Ok(suggestions) if !suggestions.is_empty() => {
crate::service::ai_action_dispatcher::handle_ai_suggestions(
&action_db,
&action_event_bus,
tenant_id,
aid,
pid,
doctor_id,
&suggestions,
risk_level,
).await;
tracing::info!(
analysis_id = %aid,
patient_id = %pid,
suggestion_count = suggestions.len(),
risk_level = %risk_level,
"AI 行动分发完成"
);
}
Ok(_) => {
tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发");
}
Err(e) => {
tracing::warn!(
analysis_id = %aid,
error = %e,
"加载建议列表失败"
);
}
}
}
}
let _ = erp_core::events::mark_event_processed(&action_db, event.id, "ai_action_dispatcher").await;
}
Some(_) => {}
None => break,
}
}
});
// consent.granted/revoked → 通知关联医生
let (mut consent_rx, consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
_handles.push(consent_handle);
let consent_db = state.db.clone();
let consent_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consent_rx.recv().await {
Some(event) if event.event_type == CONSENT_GRANTED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSENT_GRANTED",
"params": {
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(event) if event.event_type == CONSENT_REVOKED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"template_key": "CONSENT_REVOKED",
"params": {
"patient_id": pid,
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// consultation.opened/new_message → 通知相关方
let (mut consult_rx, consult_handle) = state.event_bus.subscribe_filtered("consultation.".to_string());
_handles.push(consult_handle);
let consult_db = state.db.clone();
let consult_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consult_rx.recv().await {
Some(event) if event.event_type == CONSULTATION_OPENED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_opened_notifier").await.unwrap_or(false) {
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "CONSULTATION_OPENED",
"params": { "patient_id": pid }
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "咨询开启通知已发送给医生");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_opened_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_NEW_MESSAGE => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_msg_notifier").await.unwrap_or(false) {
continue;
}
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
let sender_role = event.payload.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(rid) = recipient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": if sender_role == "patient" { "doctor" } else { "patient" },
"recipient_id": rid,
"template_key": "CONSULTATION_NEW_MESSAGE",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(recipient_id = rid, sender_role = sender_role, "咨询新消息通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_msg_notifier").await;
}
Some(event) if event.event_type == CONSULTATION_CLOSED => {
if erp_core::events::is_event_processed(&consult_db, event.id, "consult_closed_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSULTATION_CLOSED",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(patient_id = pid, "咨询关闭通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consult_db, event.id, "consult_closed_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// follow_up.created → 通知执行人
let (mut fu_created_rx, fu_created_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
_handles.push(fu_created_handle);
let fu_created_db = state.db.clone();
let fu_created_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_created_rx.recv().await {
Some(event) if event.event_type == FOLLOW_UP_CREATED => {
if erp_core::events::is_event_processed(&fu_created_db, event.id, "fu_created_notifier").await.unwrap_or(false) {
continue;
}
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(uid), Some(pid)) = (assigned_to, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"recipient_id": uid,
"template_key": "FOLLOW_UP_CREATED",
"params": { "patient_id": pid }
})),
);
fu_created_bus.publish(notify, &fu_created_db).await;
tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(&fu_created_db, event.id, "fu_created_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// points.earned/exchanged → 积分变动通知
let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
_handles.push(points_handle);
let points_db = state.db.clone();
let points_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match points_rx.recv().await {
Some(event) if event.event_type == POINTS_EARNED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_earned_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EARNED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_earned_notifier").await;
}
Some(event) if event.event_type == POINTS_EXCHANGED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_exchanged_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXCHANGED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_exchanged_notifier").await;
}
Some(event) if event.event_type == POINTS_EXPIRED => {
if erp_core::events::is_event_processed(&points_db, event.id, "points_expired_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXPIRED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送");
}
let _ = erp_core::events::mark_event_processed(&points_db, event.id, "points_expired_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// lab_report.uploaded → 触发 AI 自动分析
let (mut lab_upload_rx, lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string());
_handles.push(lab_upload_handle);
let lab_upload_db = state.db.clone();
let lab_upload_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match lab_upload_rx.recv().await {
Some(event) if event.event_type == LAB_REPORT_UPLOADED => {
if erp_core::events::is_event_processed(&lab_upload_db, event.id, "lab_upload_ai_trigger").await.unwrap_or(false) {
continue;
}
let report_id = event.payload.get("report_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(rid), Some(pid)) = (report_id, patient_id) {
let ai_event = erp_core::events::DomainEvent::new(
"ai.analysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"source_type": "lab_report",
"source_id": rid,
"patient_id": pid,
})),
);
lab_upload_bus.publish(ai_event, &lab_upload_db).await;
tracing::info!(report_id = rid, patient_id = pid, "化验单上传触发 AI 分析请求");
}
let _ = erp_core::events::mark_event_processed(&lab_upload_db, event.id, "lab_upload_ai_trigger").await;
}
Some(event) if event.event_type == LAB_REPORT_REVIEWED => {
if erp_core::events::is_event_processed(&lab_upload_db, event.id, "lab_reviewed_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let reviewer_id = event.payload.get("reviewer_id").and_then(|v| v.as_str());
if let (Some(pid), Some(rid)) = (patient_id, reviewer_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "LAB_REPORT_REVIEWED",
"params": { "reviewer_id": rid }
})),
);
lab_upload_bus.publish(notify, &lab_upload_db).await;
tracing::info!(patient_id = pid, reviewer_id = rid, "化验报告审核通知已发送给患者");
}
let _ = erp_core::events::mark_event_processed(&lab_upload_db, event.id, "lab_reviewed_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// patient.updated → 审计日志
let (mut patient_update_rx, patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
_handles.push(patient_update_handle);
let patient_update_db = state.db.clone();
tokio::spawn(async move {
loop {
match patient_update_rx.recv().await {
Some(event) if event.event_type == PATIENT_UPDATED => {
if erp_core::events::is_event_processed(&patient_update_db, event.id, "patient_updated_audit").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新");
let _ = erp_core::events::mark_event_processed(&patient_update_db, event.id, "patient_updated_audit").await;
}
Some(_) => {}
None => break,
}
}
});
// 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭
// 所有过滤订阅的生命周期应与进程一致
std::mem::forget(_handles);
}
// 事件处理器本身依赖 tokio::spawn + channel + DB无法纯单元测试。
// 以下测试覆盖:
// 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配)
// 2. register_handlers 不 panic空函数
// 3. 事件 payload 构造格式与消费者解析逻辑的契约
// 4. EventBus 过滤订阅的内存行为(无需 DB
// 5. 消费者从 payload 中提取字段的边界条件
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use erp_core::events::{build_event_payload, DomainEvent, EventBus};
use serde_json::json;
use std::collections::HashSet;
// ── 事件类型常量 ──────────────────────────────────────────────────────
/// 所有事件类型常量必须遵循 `{domain}.{action}` 格式
fn assert_valid_event_type(name: &str) {
let parts: Vec<&str> = name.split('.').collect();
assert!(
parts.len() >= 2,
"事件类型 '{}' 不符合 domain.action 格式",
name
);
assert!(
!parts[0].is_empty() && !parts[1].is_empty(),
"事件类型 '{}' 的 domain 或 action 为空",
name
);
}
#[test]
fn event_constants_follow_naming_convention() {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
ARTICLE_REJECTED,
CONSULTATION_OPENED,
CONSULTATION_CLOSED,
CONSULTATION_NEW_MESSAGE,
DEVICE_READINGS_SYNCED,
DOCTOR_ONLINE_STATUS_CHANGED,
FOLLOW_UP_CREATED,
FOLLOW_UP_COMPLETED,
FOLLOW_UP_OVERDUE,
DAILY_MONITORING_CREATED,
LAB_REPORT_UPLOADED,
LAB_REPORT_REVIEWED,
HEALTH_DATA_CRITICAL_ALERT,
PATIENT_CREATED,
PATIENT_UPDATED,
PATIENT_VERIFIED,
PATIENT_DECEASED,
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
for t in &all_types {
assert_valid_event_type(t);
}
}
#[test]
fn event_constants_are_unique() {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
ARTICLE_REJECTED,
CONSULTATION_OPENED,
CONSULTATION_CLOSED,
CONSULTATION_NEW_MESSAGE,
DEVICE_READINGS_SYNCED,
DOCTOR_ONLINE_STATUS_CHANGED,
FOLLOW_UP_CREATED,
FOLLOW_UP_COMPLETED,
FOLLOW_UP_OVERDUE,
DAILY_MONITORING_CREATED,
LAB_REPORT_UPLOADED,
LAB_REPORT_REVIEWED,
HEALTH_DATA_CRITICAL_ALERT,
PATIENT_CREATED,
PATIENT_UPDATED,
PATIENT_VERIFIED,
PATIENT_DECEASED,
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
let set: HashSet<&&str> = all_types.iter().collect();
assert_eq!(
set.len(),
all_types.len(),
"存在重复的事件类型常量"
);
}
#[test]
fn event_constants_match_expected_values() {
// 确保常量值与消费者 switch 匹配中使用的硬编码字符串一致
assert_eq!(APPOINTMENT_CREATED, "appointment.created");
assert_eq!(ALERT_TRIGGERED, "alert.triggered");
assert_eq!(ALERT_AGGREGATED, "alert.aggregated");
assert_eq!(CONSENT_GRANTED, "consent.granted");
assert_eq!(CONSENT_REVOKED, "consent.revoked");
assert_eq!(ARTICLE_PUBLISHED, "article.published");
assert_eq!(ARTICLE_REJECTED, "article.rejected");
assert_eq!(CONSULTATION_OPENED, "consultation.opened");
assert_eq!(CONSULTATION_CLOSED, "consultation.closed");
assert_eq!(CONSULTATION_NEW_MESSAGE, "consultation.new_message");
assert_eq!(DEVICE_READINGS_SYNCED, "device.readings.synced");
assert_eq!(DOCTOR_ONLINE_STATUS_CHANGED, "doctor.online_status_changed");
assert_eq!(FOLLOW_UP_CREATED, "follow_up.created");
assert_eq!(FOLLOW_UP_COMPLETED, "follow_up.completed");
assert_eq!(FOLLOW_UP_OVERDUE, "follow_up.overdue");
assert_eq!(DAILY_MONITORING_CREATED, "daily_monitoring.created");
assert_eq!(LAB_REPORT_UPLOADED, "lab_report.uploaded");
assert_eq!(LAB_REPORT_REVIEWED, "lab_report.reviewed");
assert_eq!(HEALTH_DATA_CRITICAL_ALERT, "health_data.critical_alert");
assert_eq!(PATIENT_CREATED, "patient.created");
assert_eq!(PATIENT_UPDATED, "patient.updated");
assert_eq!(PATIENT_VERIFIED, "patient.verified");
assert_eq!(PATIENT_DECEASED, "patient.deceased");
assert_eq!(POINTS_EXPIRED, "points.expired");
assert_eq!(POINTS_EARNED, "points.earned");
assert_eq!(POINTS_EXCHANGED, "points.exchanged");
assert_eq!(CARE_PLAN_CREATED, "care_plan.created");
assert_eq!(CARE_PLAN_UPDATED, "care_plan.updated");
assert_eq!(CARE_PLAN_ACTIVATED, "care_plan.activated");
assert_eq!(CARE_PLAN_COMPLETED, "care_plan.completed");
assert_eq!(CARE_ACTION_PERFORMED, "care.action.performed");
}
/// 消费者中硬编码的事件类型(非通过常量引用)也必须可被常量覆盖
#[test]
fn hardcoded_event_types_in_consumers_are_covered() {
// event.rs 中消费者使用的事件类型字符串(未通过常量引用)
let hardcoded = [
"workflow.task.completed",
"appointment.confirmed",
"appointment.cancelled",
"ai.analysis.completed",
"dialysis.record.created",
// 消费者产出的事件类型
"message.send",
"ai.analysis.requested",
"ai.reanalysis.requested",
];
// 这些硬编码类型不与 erp-health 常量重复
// 它们来自 erp-workflow / erp-core / erp-ai 等其他模块
// 验证 erp-health 常量不会与外部模块事件类型冲突
let health_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
CONSENT_GRANTED,
CONSENT_REVOKED,
CONSULTATION_OPENED,
FOLLOW_UP_CREATED,
FOLLOW_UP_COMPLETED,
FOLLOW_UP_OVERDUE,
DEVICE_READINGS_SYNCED,
LAB_REPORT_UPLOADED,
HEALTH_DATA_CRITICAL_ALERT,
PATIENT_CREATED,
PATIENT_UPDATED,
];
for t in &hardcoded {
for ht in &health_types {
assert_ne!(
*t, *ht,
"外部模块事件类型 '{}' 与 erp-health 常量 '{}' 冲突",
t, ht
);
}
}
}
// ── register_handlers 不 panic ──────────────────────────────────────
#[test]
fn register_handlers_does_not_panic() {
let bus = EventBus::new(64);
// register_handlers 是空函数,不应 panic
register_handlers(&bus);
}
// ── DomainEvent 构造与 payload 契约 ─────────────────────────────────
#[test]
fn domain_event_new_sets_correct_event_type() {
let tenant_id = Uuid::now_v7();
let event = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({}));
assert_eq!(event.event_type, PATIENT_CREATED);
assert_eq!(event.tenant_id, tenant_id);
}
#[test]
fn domain_event_new_generates_unique_ids() {
let tenant_id = Uuid::now_v7();
let e1 = DomainEvent::new("test.a", tenant_id, json!({}));
let e2 = DomainEvent::new("test.b", tenant_id, json!({}));
assert_ne!(e1.id, e2.id, "每个事件应有唯一 ID");
assert_ne!(e1.correlation_id, e2.correlation_id, "每个事件应有唯一 correlation_id");
}
#[test]
fn build_event_payload_injects_schema_version() {
let payload = build_event_payload(json!({ "patient_id": "abc" }));
assert_eq!(payload["schema_version"], "v1");
assert!(payload.get("occurred_at").is_some(), "必须包含 occurred_at 时间戳");
}
#[test]
fn build_event_payload_merges_data_fields() {
let payload = build_event_payload(json!({
"patient_id": "test-123",
"severity": "critical",
}));
assert_eq!(payload["schema_version"], "v1");
assert_eq!(payload["patient_id"], "test-123");
assert_eq!(payload["severity"], "critical");
}
// ── 消费者 payload 解析契约测试 ─────────────────────────────────────
//
// 验证消费者从 payload 中提取字段的方式与 build_event_payload 的输出兼容。
// 这些测试模拟消费者使用的 serde_json::Value 提取模式。
/// 模拟消费者提取 patient_idUUID 字符串)
fn extract_patient_id(payload: &serde_json::Value) -> Option<Uuid> {
payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
}
#[test]
fn payload_extraction_patient_id_from_uuid_string() {
let pid = Uuid::now_v7();
let payload = build_event_payload(json!({
"patient_id": pid.to_string(),
}));
assert_eq!(extract_patient_id(&payload), Some(pid));
}
#[test]
fn payload_extraction_patient_id_missing_field() {
let payload = build_event_payload(json!({}));
assert_eq!(extract_patient_id(&payload), None);
}
#[test]
fn payload_extraction_patient_id_invalid_uuid() {
let payload = build_event_payload(json!({
"patient_id": "not-a-uuid",
}));
assert_eq!(extract_patient_id(&payload), None);
}
#[test]
fn payload_extraction_patient_id_wrong_type() {
// patient_id 是数字而非字符串
let payload = build_event_payload(json!({
"patient_id": 12345,
}));
assert_eq!(extract_patient_id(&payload), None);
}
/// 模拟消费者提取 severity带默认值
fn extract_severity(payload: &serde_json::Value) -> &str {
payload
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("warning")
}
#[test]
fn payload_extraction_severity_present() {
let payload = build_event_payload(json!({ "severity": "critical" }));
assert_eq!(extract_severity(&payload), "critical");
}
#[test]
fn payload_extraction_severity_defaults_to_warning() {
let payload = build_event_payload(json!({}));
assert_eq!(extract_severity(&payload), "warning");
}
/// 模拟消费者提取 task_idUUID 字符串)
fn extract_task_id(payload: &serde_json::Value) -> Option<Uuid> {
payload
.get("task_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
}
#[test]
fn payload_extraction_task_id_valid() {
let tid = Uuid::now_v7();
let payload = build_event_payload(json!({ "task_id": tid.to_string() }));
assert_eq!(extract_task_id(&payload), Some(tid));
}
#[test]
fn payload_extraction_task_id_missing() {
let payload = build_event_payload(json!({ "other_field": "value" }));
assert_eq!(extract_task_id(&payload), None);
}
/// 模拟消费者提取 amountu64
fn extract_amount(payload: &serde_json::Value) -> Option<u64> {
payload.get("amount").and_then(|v| v.as_u64())
}
#[test]
fn payload_extraction_amount_valid() {
let payload = build_event_payload(json!({ "amount": 100 }));
assert_eq!(extract_amount(&payload), Some(100));
}
#[test]
fn payload_extraction_amount_zero() {
let payload = build_event_payload(json!({ "amount": 0 }));
assert_eq!(extract_amount(&payload), Some(0));
}
#[test]
fn payload_extraction_amount_missing() {
let payload = build_event_payload(json!({}));
assert_eq!(extract_amount(&payload), None);
}
#[test]
fn payload_extraction_amount_negative_returns_none() {
// serde_json u64 不能表示负数
let payload = build_event_payload(json!({ "amount": -5 }));
assert_eq!(extract_amount(&payload), None);
}
/// 模拟消费者提取 suggestion_countu64用于条件判断
#[test]
fn payload_extraction_suggestion_count_zero() {
let payload = build_event_payload(json!({ "suggestion_count": 0 }));
let count = payload.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert_eq!(count, 0);
assert!(!(count > 0), "suggestion_count=0 时不触发行动分发");
}
#[test]
fn payload_extraction_suggestion_count_positive() {
let payload = build_event_payload(json!({ "suggestion_count": 3 }));
let count = payload.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert!(count > 0, "suggestion_count>0 时应触发行动分发");
}
// ── 完整 payload 契约测试 ─────────────────────────────────────────
//
// 验证 service 层构建的 payload 能被消费者正确解析。
// 这些测试模拟 service 层的 build_event_payload 调用,然后
// 用消费者中的提取逻辑验证字段可达。
/// appointment.created 事件 payload 契约
#[test]
fn appointment_created_payload_contract() {
let patient_id = Uuid::now_v7();
let appointment_id = Uuid::now_v7();
// 模拟 appointment_service 的 payload 构造
let payload = build_event_payload(json!({
"appointment_id": appointment_id.to_string(),
"patient_id": patient_id.to_string(),
"status": "pending",
}));
// 消费者提取 patient_id字符串形式
let pid_str = payload.get("patient_id").and_then(|v| v.as_str());
assert!(pid_str.is_some(), "消费者需要 patient_id 字符串");
assert_eq!(pid_str.unwrap(), patient_id.to_string());
}
/// patient.created 事件 payload 契约
#[test]
fn patient_created_payload_contract() {
let patient_id = Uuid::now_v7();
// 模拟 patient_service 的 payload 构造
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
}));
let extracted = extract_patient_id(&payload);
assert_eq!(extracted, Some(patient_id));
}
/// alert.triggered 事件 payload 契约
#[test]
fn alert_triggered_payload_contract() {
let alert_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
// 模拟 alert_engine 的 payload 构造
let payload = build_event_payload(json!({
"alert_id": alert_id.to_string(),
"patient_id": patient_id.to_string(),
"rule_name": "心率过高",
"severity": "critical",
"detail": "心率超过阈值",
"notify_roles": ["doctor"],
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str());
assert!(pid.is_some(), "消费者需要 patient_id");
let severity = extract_severity(&payload);
assert_eq!(severity, "critical");
let rule_name = payload.get("rule_name").and_then(|v| v.as_str()).unwrap_or("健康告警");
assert_eq!(rule_name, "心率过高");
}
/// health_data.critical_alert 事件 payload 契约
#[test]
fn critical_alert_payload_contract() {
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"alert_type": "vital_sign",
"metric_name": "heart_rate",
"metric_value": "180",
"threshold_value": "150",
}));
assert_eq!(extract_patient_id(&payload), Some(patient_id));
assert_eq!(
payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign"),
"vital_sign"
);
assert_eq!(
payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown"),
"heart_rate"
);
assert_eq!(
payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or(""),
"180"
);
assert_eq!(
payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or(""),
"150"
);
}
/// follow_up.overdue 事件 payload 契约
#[test]
fn follow_up_overdue_payload_contract() {
let task_id = Uuid::now_v7();
let assigned_to = Uuid::now_v7();
let payload = build_event_payload(json!({
"task_id": task_id.to_string(),
"assigned_to": assigned_to.to_string(),
}));
let tid = payload.get("task_id").and_then(|v| v.as_str());
let uid = payload.get("assigned_to").and_then(|v| v.as_str());
assert!(tid.is_some(), "消费者需要 task_id");
assert!(uid.is_some(), "消费者需要 assigned_to");
}
/// consultation.new_message 事件 payload 契约 — sender_role 决定通知目标
#[test]
fn consultation_new_message_recipient_logic() {
// 患者发送 → 通知医生
let payload_patient = build_event_payload(json!({
"recipient_id": "doctor-123",
"sender_role": "patient",
}));
let sender_role = payload_patient.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
let recipient_type = if sender_role == "patient" { "doctor" } else { "patient" };
assert_eq!(recipient_type, "doctor");
// 医生发送 → 通知患者
let payload_doctor = build_event_payload(json!({
"recipient_id": "patient-456",
"sender_role": "doctor",
}));
let sender_role = payload_doctor.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
let recipient_type = if sender_role == "patient" { "doctor" } else { "patient" };
assert_eq!(recipient_type, "patient");
}
/// lab_report.uploaded 事件 payload 契约 — 触发 AI 分析
#[test]
fn lab_report_uploaded_payload_contract() {
let report_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"source_type": "lab_report",
"source_id": report_id.to_string(),
"patient_id": patient_id.to_string(),
}));
let rid = payload.get("source_id").and_then(|v| v.as_str());
let pid = payload.get("patient_id").and_then(|v| v.as_str());
assert!(rid.is_some(), "消费者需要 source_id (report_id)");
assert!(pid.is_some(), "消费者需要 patient_id");
}
/// points.earned 事件 payload 契约
#[test]
fn points_earned_payload_contract() {
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"amount": 50,
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str());
let amt = payload.get("amount").and_then(|v| v.as_u64());
assert!(pid.is_some());
assert_eq!(amt, Some(50));
}
/// ai.analysis.completed 事件 payload 契约 — 含 suggestion_count
#[test]
fn ai_analysis_completed_payload_contract() {
let analysis_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let doctor_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"analysis_id": analysis_id.to_string(),
"analysis_type": "lab_report",
"patient_id": patient_id.to_string(),
"doctor_id": doctor_id.to_string(),
"risk_level": "high",
"suggestion_count": 2,
}));
// AI 分析完成通知消费者需要的字段
let did = payload.get("doctor_id").and_then(|v| v.as_str());
let pid = payload.get("patient_id").and_then(|v| v.as_str());
assert!(did.is_some(), "通知消费者需要 doctor_id");
assert!(pid.is_some(), "通知消费者需要 patient_id");
// 行动分发消费者需要的字段
let aid = payload.get("analysis_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert_eq!(aid, Some(analysis_id));
let suggestion_count = payload.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert!(suggestion_count > 0, "有建议时应触发行动分发");
let risk_level = payload.get("risk_level").and_then(|v| v.as_str()).unwrap_or("medium");
assert_eq!(risk_level, "high");
}
// ── EventBus 过滤订阅行为测试 ──────────────────────────────────────
#[tokio::test]
async fn eventbus_broadcast_and_subscribe_filtered() {
let bus = EventBus::new(64);
let (mut rx, _handle) = bus.subscribe_filtered("patient.".to_string());
let tenant_id = Uuid::now_v7();
// 广播一个匹配的事件
let patient_event = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({}));
bus.broadcast(patient_event.clone());
// 广播一个不匹配的事件
let other_event = DomainEvent::new(APPOINTMENT_CREATED, tenant_id, json!({}));
bus.broadcast(other_event);
// 只应收到 patient.created
let received = tokio::time::timeout(
std::time::Duration::from_millis(100),
rx.recv(),
)
.await
.expect("超时:应收到匹配的事件")
.expect("channel 不应关闭");
assert_eq!(received.event_type, PATIENT_CREATED);
}
#[tokio::test]
async fn eventbus_subscribe_filtered_ignores_non_matching() {
let bus = EventBus::new(64);
let (mut rx, _handle) = bus.subscribe_filtered("follow_up.".to_string());
let tenant_id = Uuid::now_v7();
// 广播不匹配的事件
let unmatched = DomainEvent::new(PATIENT_CREATED, tenant_id, json!({}));
bus.broadcast(unmatched);
// 应该收不到任何事件
let result = tokio::time::timeout(
std::time::Duration::from_millis(50),
rx.recv(),
)
.await;
assert!(result.is_err(), "不应收到不匹配的事件");
}
#[tokio::test]
async fn eventbus_subscribe_filtered_receives_multiple_matching() {
let bus = EventBus::new(64);
let (mut rx, _handle) = bus.subscribe_filtered("appointment.".to_string());
let tenant_id = Uuid::now_v7();
// 广播多个匹配前缀的事件
let types = [
APPOINTMENT_CREATED,
"appointment.confirmed",
"appointment.cancelled",
];
for t in &types {
let event = DomainEvent::new(*t, tenant_id, json!({}));
bus.broadcast(event);
}
// 应收到全部 3 个
let mut received_types = Vec::new();
for _ in 0..3 {
let event = tokio::time::timeout(
std::time::Duration::from_millis(100),
rx.recv(),
)
.await
.expect("超时")
.expect("channel 关闭");
received_types.push(event.event_type);
}
assert_eq!(received_types.len(), 3);
assert!(received_types.contains(&APPOINTMENT_CREATED.to_string()));
assert!(received_types.contains(&"appointment.confirmed".to_string()));
assert!(received_types.contains(&"appointment.cancelled".to_string()));
}
// ── 消费者前缀与常量匹配测试 ─────────────────────────────────────
//
// 验证 subscribe_filtered 的前缀能覆盖该通道需要接收的所有事件类型。
#[test]
fn subscribe_prefix_covers_all_workflow_events() {
let prefix = "workflow.task.";
let event_type = "workflow.task.completed";
assert!(
event_type.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
event_type
);
}
#[test]
fn subscribe_prefix_covers_all_device_events() {
let prefix = "device.readings.";
assert!(
DEVICE_READINGS_SYNCED.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
DEVICE_READINGS_SYNCED
);
}
#[test]
fn subscribe_prefix_covers_all_alert_events() {
let prefix = "alert.";
assert!(
ALERT_TRIGGERED.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
ALERT_TRIGGERED
);
assert!(
ALERT_AGGREGATED.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
ALERT_AGGREGATED
);
}
#[test]
fn subscribe_prefix_covers_all_patient_events() {
let prefix = "patient.";
assert!(PATIENT_CREATED.starts_with(prefix));
assert!(PATIENT_UPDATED.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_appointment_events() {
let prefix = "appointment.";
assert!(APPOINTMENT_CREATED.starts_with(prefix));
assert!("appointment.confirmed".starts_with(prefix));
assert!("appointment.cancelled".starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_follow_up_events() {
let prefix = "follow_up.";
assert!(FOLLOW_UP_CREATED.starts_with(prefix));
assert!(FOLLOW_UP_COMPLETED.starts_with(prefix));
assert!(FOLLOW_UP_OVERDUE.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_health_data_events() {
let prefix = "health_data.";
assert!(
HEALTH_DATA_CRITICAL_ALERT.starts_with(prefix),
"前缀 '{}' 应覆盖 '{}'",
prefix,
HEALTH_DATA_CRITICAL_ALERT
);
}
#[test]
fn subscribe_prefix_covers_all_ai_events() {
let prefix = "ai.";
assert!("ai.analysis.completed".starts_with(prefix));
assert!("ai.reanalysis.requested".starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_consent_events() {
let prefix = "consent.";
assert!(CONSENT_GRANTED.starts_with(prefix));
assert!(CONSENT_REVOKED.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_consultation_events() {
let prefix = "consultation.";
assert!(CONSULTATION_OPENED.starts_with(prefix));
assert!(CONSULTATION_CLOSED.starts_with(prefix));
assert!(CONSULTATION_NEW_MESSAGE.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_points_events() {
let prefix = "points.";
assert!(POINTS_EARNED.starts_with(prefix));
assert!(POINTS_EXCHANGED.starts_with(prefix));
assert!(POINTS_EXPIRED.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_lab_report_events() {
let prefix = "lab_report.";
assert!(LAB_REPORT_UPLOADED.starts_with(prefix));
assert!(LAB_REPORT_REVIEWED.starts_with(prefix));
}
#[test]
fn subscribe_prefix_covers_all_care_plan_events() {
let prefix = "care_plan.";
assert!(CARE_PLAN_CREATED.starts_with(prefix));
assert!(CARE_PLAN_UPDATED.starts_with(prefix));
assert!(CARE_PLAN_ACTIVATED.starts_with(prefix));
assert!(CARE_PLAN_COMPLETED.starts_with(prefix));
}
// ── device.readings.synced 消费者设备类型列表测试 ──────────────────
#[test]
fn device_types_for_alert_evaluation_are_comprehensive() {
// 消费者硬编码的设备类型列表
let device_types = ["heart_rate", "blood_oxygen", "temperature", "blood_pressure", "blood_glucose"];
assert_eq!(device_types.len(), 5, "设备类型列表应包含 5 种类型");
// 确保没有重复
let set: HashSet<&&str> = device_types.iter().collect();
assert_eq!(set.len(), device_types.len(), "设备类型列表不应有重复");
}
// ── 告警严重度模板选择逻辑 ─────────────────────────────────────
#[test]
fn alert_severity_template_selection() {
let severity_critical = "critical";
let template = if severity_critical == "critical" {
"CRITICAL_HEALTH_ALERT"
} else {
"HEALTH_DATA_ABNORMAL"
};
assert_eq!(template, "CRITICAL_HEALTH_ALERT");
let severity_warning = "warning";
let template = if severity_warning == "critical" {
"CRITICAL_HEALTH_ALERT"
} else {
"HEALTH_DATA_ABNORMAL"
};
assert_eq!(template, "HEALTH_DATA_ABNORMAL");
}
// ── 消费者幂等 consumer_id 唯一性 ──────────────────────────────────
#[test]
fn consumer_ids_are_unique() {
// 收集所有消费者的 consumer_id从 mark_event_processed 调用中提取)
let consumer_ids = [
"workflow_task_consumer",
"alert_aggregator",
"alert_notifier",
"patient_welcome",
"appt_created_notifier",
"appointment_notifier",
"appointment_cancel_handler",
"follow_up_escalator",
"critical_alert_consumer",
"ai_analysis_notifier",
"dialysis_notifier",
"ai_action_dispatcher",
"consent_notifier",
"consult_opened_notifier",
"consult_msg_notifier",
"consult_closed_notifier",
"fu_created_notifier",
"points_earned_notifier",
"points_exchanged_notifier",
"points_expired_notifier",
"lab_upload_ai_trigger",
"lab_reviewed_notifier",
"patient_updated_audit",
];
let set: HashSet<&&str> = consumer_ids.iter().collect();
assert_eq!(
set.len(),
consumer_ids.len(),
"存在重复的 consumer_id"
);
}
// ── 消费者通知构造逻辑测试 ─────────────────────────────────────────
//
// 消费者的核心逻辑是闭包内的,无法直接调用。这里测试的是:
// 1. 通知消息的构造模式是否正确
// 2. 缺失字段时是否安全跳过(不 panic
// 3. 关键分支决策是否正确
/// 告警通知消费者severity 分支决定 template_key
#[test]
fn alert_notifier_constructs_correct_template_for_severity() {
let patient_id = Uuid::now_v7();
let tenant_id = Uuid::now_v7();
for (severity, expected_template) in [
("critical", "CRITICAL_HEALTH_ALERT"),
("warning", "HEALTH_DATA_ABNORMAL"),
("info", "HEALTH_DATA_ABNORMAL"),
] {
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"severity": severity,
"rule_name": "血压偏高",
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str());
let sev = payload.get("severity").and_then(|v| v.as_str()).unwrap_or("warning");
let rule = payload.get("rule_name").and_then(|v| v.as_str()).unwrap_or("健康告警");
assert!(pid.is_some(), "severity={} 时 patient_id 应存在", severity);
let template_key = if sev == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" };
assert_eq!(template_key, expected_template);
let notify = json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid.unwrap(),
"template_key": template_key,
"params": { "rule_name": rule, "severity": sev }
});
assert_eq!(notify["template_key"], expected_template);
}
}
/// 告警聚合消费者suppressed=true 时发布 ALERT_AGGREGATED
#[test]
fn alert_aggregator_only_publishes_when_suppressed() {
let patient_id = Uuid::now_v7();
// suppressed=true → 发布聚合事件
let payload_suppressed = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"suppressed": true,
"alert_id": "alert-123",
"severity": "warning",
}));
let is_suppressed = payload_suppressed.get("suppressed").and_then(|v| v.as_bool()).unwrap_or(false);
assert!(is_suppressed, "suppressed=true 时应触发聚合");
// suppressed=false → 不发布
let payload_normal = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"suppressed": false,
}));
let is_suppressed_2 = payload_normal.get("suppressed").and_then(|v| v.as_bool()).unwrap_or(false);
assert!(!is_suppressed_2, "suppressed=false 时不应触发聚合");
// 无 suppressed 字段 → 默认 false
let payload_no_field = build_event_payload(json!({
"patient_id": patient_id.to_string(),
}));
let is_suppressed_3 = payload_no_field.get("suppressed").and_then(|v| v.as_bool()).unwrap_or(false);
assert!(!is_suppressed_3, "缺少 suppressed 字段时默认不聚合");
}
/// AI 分析完成消费者:缺少 doctor_id 或 patient_id 时安全跳过
#[test]
fn ai_analysis_notifier_skips_when_missing_ids() {
// 完整 payload → 构造通知
let full_payload = build_event_payload(json!({
"analysis_id": Uuid::now_v7().to_string(),
"analysis_type": "lab_report",
"patient_id": Uuid::now_v7().to_string(),
"doctor_id": Uuid::now_v7().to_string(),
}));
let did = full_payload.get("doctor_id").and_then(|v| v.as_str());
let pid = full_payload.get("patient_id").and_then(|v| v.as_str());
assert!(did.is_some() && pid.is_some(), "完整 payload 应能提取 doctor_id 和 patient_id");
// 缺少 doctor_id → 跳过通知
let no_doctor = build_event_payload(json!({
"analysis_id": Uuid::now_v7().to_string(),
"patient_id": Uuid::now_v7().to_string(),
}));
let did2 = no_doctor.get("doctor_id").and_then(|v| v.as_str());
assert!(did2.is_none(), "缺少 doctor_id 应跳过通知");
// 完全空 payload → 不 panic
let empty = build_event_payload(json!({}));
let did3 = empty.get("doctor_id").and_then(|v| v.as_str());
let pid3 = empty.get("patient_id").and_then(|v| v.as_str());
assert!(did3.is_none() && pid3.is_none(), "空 payload 应安全返回 None");
}
/// AI 行动分发消费者suggestion_count=0 时跳过行动分发
#[test]
fn ai_action_dispatcher_skips_when_no_suggestions() {
// suggestion_count > 0 → 分发
let with_suggestions = build_event_payload(json!({
"analysis_id": Uuid::now_v7().to_string(),
"patient_id": Uuid::now_v7().to_string(),
"suggestion_count": 3,
}));
let count = with_suggestions.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert!(count > 0, "有建议时应触发分发");
// suggestion_count = 0 → 跳过
let no_suggestions = build_event_payload(json!({
"analysis_id": Uuid::now_v7().to_string(),
"patient_id": Uuid::now_v7().to_string(),
"suggestion_count": 0,
}));
let count2 = no_suggestions.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert_eq!(count2, 0, "无建议时应跳过分发");
// 无 suggestion_count 字段 → 默认 0
let no_field = build_event_payload(json!({
"analysis_id": Uuid::now_v7().to_string(),
}));
let count3 = no_field.get("suggestion_count").and_then(|v| v.as_u64()).unwrap_or(0);
assert_eq!(count3, 0, "缺少 suggestion_count 字段时默认为 0");
}
/// 预约创建消费者:缺少 patient_id 或 doctor_id 时安全跳过
#[test]
fn appointment_created_notifier_skips_when_missing_ids() {
let patient_id = Uuid::now_v7();
let doctor_id = Uuid::now_v7();
// 完整 → 构造通知
let full = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"doctor_id": doctor_id.to_string(),
}));
let (pid, did) = (
full.get("patient_id").and_then(|v| v.as_str()),
full.get("doctor_id").and_then(|v| v.as_str()),
);
assert!(pid.is_some() && did.is_some());
// 缺 doctor_id
let no_doc = build_event_payload(json!({
"patient_id": patient_id.to_string(),
}));
let pid2 = no_doc.get("patient_id").and_then(|v| v.as_str());
let did2 = no_doc.get("doctor_id").and_then(|v| v.as_str());
assert!(pid2.is_some() && did2.is_none());
// 缺 patient_id
let no_pat = build_event_payload(json!({
"doctor_id": doctor_id.to_string(),
}));
let pid3 = no_pat.get("patient_id").and_then(|v| v.as_str());
let did3 = no_pat.get("doctor_id").and_then(|v| v.as_str());
assert!(pid3.is_none() && did3.is_some());
}
/// 随访逾期升级消费者:缺少 task_id 或 assigned_to 时安全跳过
#[test]
fn follow_up_escalator_skips_when_missing_ids() {
// 完整 → 构造升级通知
let full = build_event_payload(json!({
"task_id": Uuid::now_v7().to_string(),
"assigned_to": Uuid::now_v7().to_string(),
}));
let (tid, uid) = (
full.get("task_id").and_then(|v| v.as_str()),
full.get("assigned_to").and_then(|v| v.as_str()),
);
assert!(tid.is_some() && uid.is_some());
// 缺 assigned_to
let no_assignee = build_event_payload(json!({
"task_id": Uuid::now_v7().to_string(),
}));
let tid2 = no_assignee.get("task_id").and_then(|v| v.as_str());
let uid2 = no_assignee.get("assigned_to").and_then(|v| v.as_str());
assert!(tid2.is_some() && uid2.is_none());
}
/// 危急值告警消费者:从 payload 提取所有必需字段
#[test]
fn critical_alert_consumer_extracts_all_fields() {
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"alert_type": "vital_sign",
"metric_name": "systolic_bp",
"metric_value": "185",
"threshold_value": "140",
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
let alert_type = payload.get("alert_type").and_then(|v| v.as_str()).unwrap_or("vital_sign");
let metric_name = payload.get("metric_name").and_then(|v| v.as_str()).unwrap_or("unknown");
let metric_value = payload.get("metric_value").and_then(|v| v.as_str()).unwrap_or("");
let threshold = payload.get("threshold_value").and_then(|v| v.as_str()).unwrap_or("");
assert!(pid.is_some());
assert_eq!(alert_type, "vital_sign");
assert_eq!(metric_name, "systolic_bp");
assert_eq!(metric_value, "185");
assert_eq!(threshold, "140");
}
/// 危急值告警消费者:缺失 patient_id 时安全跳过
#[test]
fn critical_alert_consumer_skips_without_patient_id() {
let payload = build_event_payload(json!({
"alert_type": "vital_sign",
"metric_name": "heart_rate",
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_none(), "缺少 patient_id 应安全跳过");
}
/// 咨询消息消费者sender_role 决定通知方向
#[test]
fn consultation_message_direction_logic() {
// 患者发送 → 通知医生
let from_patient = build_event_payload(json!({
"consultation_id": Uuid::now_v7().to_string(),
"sender_role": "patient",
"recipient_id": "doctor-789",
}));
let sender = from_patient.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
let recipient_type = if sender == "patient" { "doctor" } else { "patient" };
assert_eq!(recipient_type, "doctor");
// 医生发送 → 通知患者
let from_doctor = build_event_payload(json!({
"consultation_id": Uuid::now_v7().to_string(),
"sender_role": "doctor",
"recipient_id": "patient-456",
}));
let sender2 = from_doctor.get("sender_role").and_then(|v| v.as_str()).unwrap_or("unknown");
let recipient_type2 = if sender2 == "patient" { "doctor" } else { "patient" };
assert_eq!(recipient_type2, "patient");
}
/// 知情同意消费者granted 和 revoked 使用不同 template_key
#[test]
fn consent_notifier_uses_correct_template() {
let patient_id = Uuid::now_v7();
// granted
let granted = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"consent_type": "data_sharing",
}));
let consent_type = granted.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
assert_eq!(consent_type, "data_sharing");
// 消费者会用 template_key: "CONSENT_GRANTED"
// revoked
let revoked = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"consent_type": "data_sharing",
"reason": "患者主动撤销",
}));
let reason = revoked.get("reason").and_then(|v| v.as_str()).unwrap_or("未知原因");
assert_eq!(reason, "患者主动撤销");
}
/// 随访创建消费者:缺少 assigned_to 时安全跳过
#[test]
fn follow_up_created_notifier_skips_without_assignee() {
let patient_id = Uuid::now_v7();
let no_assignee = build_event_payload(json!({
"patient_id": patient_id.to_string(),
}));
let uid = no_assignee.get("assigned_to").and_then(|v| v.as_str());
assert!(uid.is_none(), "缺少 assigned_to 时不应构造通知");
let with_assignee = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"assigned_to": Uuid::now_v7().to_string(),
}));
let uid2 = with_assignee.get("assigned_to").and_then(|v| v.as_str());
let pid2 = with_assignee.get("patient_id").and_then(|v| v.as_str());
assert!(uid2.is_some() && pid2.is_some(), "两者都存在时构造通知");
}
/// 积分消费者:缺失 amount 时安全跳过
#[test]
fn points_notifiers_skip_without_amount() {
let patient_id = Uuid::now_v7();
let no_amount = build_event_payload(json!({
"patient_id": patient_id.to_string(),
}));
let amt = no_amount.get("amount").and_then(|v| v.as_u64());
assert!(amt.is_none(), "缺少 amount 时安全跳过");
let with_amount = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"amount": 100,
}));
let pid = with_amount.get("patient_id").and_then(|v| v.as_str());
let amt2 = with_amount.get("amount").and_then(|v| v.as_u64());
assert!(pid.is_some() && amt2.is_some());
}
/// 设备读数消费者:设备类型列表与代码内硬编码一致
#[test]
fn device_readings_consumer_device_types_match_code() {
let expected_types = ["heart_rate", "blood_oxygen", "temperature", "blood_pressure", "blood_glucose"];
// 验证列表包含 5 种类型且无重复
assert_eq!(expected_types.len(), 5);
let set: std::collections::HashSet<&&str> = expected_types.iter().collect();
assert_eq!(set.len(), 5);
// 验证每种类型都能从 payload 中提取
let device_type_field = "heart_rate";
assert!(expected_types.contains(&device_type_field));
}
/// workflow.task.completed 消费者:从 payload 提取 task_id
#[test]
fn workflow_task_consumer_extracts_task_id() {
let task_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"task_id": task_id.to_string(),
}));
let extracted = payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert_eq!(extracted, Some(task_id));
// 无效 UUID
let bad_uuid = build_event_payload(json!({
"task_id": "not-a-uuid",
}));
let extracted_bad = bad_uuid.get("task_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(extracted_bad.is_none(), "无效 UUID 应返回 None");
}
/// patient.updated 消费者:幂等审计记录
#[test]
fn patient_updated_audit_extracts_patient_id() {
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"patient_id": patient_id.to_string(),
"changed_fields": ["name", "phone"],
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some());
}
/// 消费者总数与 consumer_ids 列表一致
#[test]
fn consumer_count_matches_expected() {
let consumer_ids = [
"workflow_task_consumer",
"alert_aggregator",
"alert_notifier",
"patient_welcome",
"appt_created_notifier",
"appointment_notifier",
"appointment_cancel_handler",
"follow_up_escalator",
"critical_alert_consumer",
"ai_analysis_notifier",
"dialysis_notifier",
"ai_action_dispatcher",
"consent_notifier",
"consult_opened_notifier",
"consult_msg_notifier",
"consult_closed_notifier",
"fu_created_notifier",
"points_earned_notifier",
"points_exchanged_notifier",
"points_expired_notifier",
"lab_upload_ai_trigger",
"lab_reviewed_notifier",
"patient_updated_audit",
];
assert_eq!(consumer_ids.len(), 23, "消费者总数应为 23");
}
// ── care.action.performed 事件 payload 契约测试 ──────────────────
/// care.action.performed (item_completed) payload 契约
#[test]
fn care_action_item_completed_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"action": "item_completed",
"item_title": "血压监测",
"item_type": "monitoring",
"operator_id": Uuid::now_v7().to_string(),
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "消费者需要 patient_id");
let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
assert_eq!(action, "item_completed");
let item_title = payload.get("item_title").and_then(|v| v.as_str()).unwrap_or("护理项目");
assert_eq!(item_title, "血压监测");
}
/// care.action.performed (outcome_measured) payload 契约
#[test]
fn care_action_outcome_measured_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"action": "outcome_measured",
"metric": "血压",
"operator_id": Uuid::now_v7().to_string(),
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "消费者需要 patient_id");
let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
assert_eq!(action, "outcome_measured");
let metric = payload.get("metric").and_then(|v| v.as_str()).unwrap_or("健康指标");
assert_eq!(metric, "血压");
}
/// care_plan.activated / care_plan.completed payload 契约
#[test]
fn care_plan_lifecycle_payload_contract() {
let plan_id = Uuid::now_v7();
let patient_id = Uuid::now_v7();
for (event_type, status) in [
(CARE_PLAN_ACTIVATED, "active"),
(CARE_PLAN_COMPLETED, "completed"),
] {
let payload = build_event_payload(json!({
"plan_id": plan_id.to_string(),
"patient_id": patient_id.to_string(),
"status": status,
}));
let pid = payload.get("patient_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(pid.is_some(), "{} 消费者需要 patient_id", event_type);
let plid = payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok());
assert!(plid.is_some(), "{} 消费者需要 plan_id", event_type);
}
}
/// 关怀行动通知消息分支逻辑
#[test]
fn care_action_notification_branch_logic() {
// item_completed → 温暖通知
let item_payload = build_event_payload(json!({
"action": "item_completed",
"item_title": "血压监测",
}));
let action = item_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title, _body) = match action {
"item_completed" => ("关怀已送达".to_string(), "您的护理团队已完成「血压监测」".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "数据已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title, "关怀已送达");
// outcome_measured → 数据更新通知
let outcome_payload = build_event_payload(json!({
"action": "outcome_measured",
"metric": "血压",
}));
let action2 = outcome_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title2, _body2) = match action2 {
"item_completed" => ("关怀已送达".to_string(), "已完成".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "血压数据已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title2, "健康数据已更新");
// 未知 action → 默认通知
let unknown_payload = build_event_payload(json!({ "action": "unknown" }));
let action3 = unknown_payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
let (title3, _body3) = match action3 {
"item_completed" => ("关怀已送达".to_string(), "已完成".to_string()),
"outcome_measured" => ("健康数据已更新".to_string(), "已记录".to_string()),
_ => ("关怀已送达".to_string(), "正在关注".to_string()),
};
assert_eq!(title3, "关怀已送达");
}
}