refactor(health): 拆分 event.rs(2871 行)为 13 个领域文件

将单体 event.rs 按业务域拆分为 event/ 模块目录:
- mod.rs (219 行): 31 事件常量 + 调度器 + 测试
- 12 个消费者文件: workflow/device/alert/patient/appointment/
  follow_up/health_data/ai/consent/consultation/points/lab_report

每个消费者文件 50-215 行,独立可维护。
编译零错误,测试全部通过。
This commit is contained in:
iven
2026-05-11 10:09:10 +08:00
parent 129a7b175c
commit 8c347a5de9
14 changed files with 1564 additions and 2871 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,215 @@
/// ai.analysis.completed → 通知关联医生 + AI→行动闭环消费者
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
// ai.analysis.completed → 通知关联医生
let (mut ai_rx, ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
handles.push(ai_handle);
let ai_db = state.db.clone();
let ai_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(
&ai_db,
event.id,
"ai_analysis_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
let analysis_type = event
.payload
.get("analysis_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "AI_ANALYSIS_COMPLETED",
"params": {
"analysis_type": analysis_type,
"patient_id": pid,
}
})),
);
ai_bus.publish(notify, &ai_db).await;
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
doctor_id = %did,
"AI 分析完成通知已发送给医生"
);
} else {
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
"AI 分析完成(缺少关联信息,跳过通知)"
);
}
let _ = erp_core::events::mark_event_processed(
&ai_db,
event.id,
"ai_analysis_notifier",
)
.await;
}
Some(event) if event.event_type == "dialysis.record.created" => {
if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier")
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let record_id = event.payload.get("record_id").and_then(|v| v.as_str());
tracing::info!(
record_id = ?record_id,
patient_id = ?patient_id,
"透析记录已创建,触发 KDIGO 自动评估"
);
// H4: 透析→KDIGO 自动串联 — 发布事件让 AI 模块执行风险评估
if let (Some(pid), Some(rid)) = (patient_id, record_id) {
let kdigo_event = erp_core::events::DomainEvent::new(
"ai.dialysis.kdigo_requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"dialysis_record_id": rid,
"source": "dialysis_notifier",
})),
);
ai_bus.publish(kdigo_event, &ai_db).await;
}
let _ = erp_core::events::mark_event_processed(
&ai_db,
event.id,
"dialysis_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
// ai.analysis.completed → AI→行动闭环消费者行动分发
let (mut ai_action_rx, ai_action_handle) =
state.event_bus.subscribe_filtered("ai.".to_string());
handles.push(ai_action_handle);
let action_db = state.db.clone();
let action_event_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_action_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(
&action_db,
event.id,
"ai_action_dispatcher",
)
.await
.unwrap_or(false)
{
continue;
}
let tenant_id = event.tenant_id;
let analysis_id = event
.payload
.get("analysis_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let risk_level = event
.payload
.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium");
let suggestion_count = event
.payload
.get("suggestion_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if suggestion_count > 0
&& let (Some(aid), Some(pid)) = (analysis_id, patient_id)
{
let loader_result: Result<Vec<serde_json::Value>, sea_orm::DbErr> =
crate::service::ai_suggestion_loader::load_by_analysis(
&action_db, tenant_id, aid,
)
.await;
match loader_result {
Ok(suggestions) if !suggestions.is_empty() => {
crate::service::ai_action_dispatcher::handle_ai_suggestions(
&action_db,
&action_event_bus,
tenant_id,
aid,
pid,
doctor_id,
&suggestions,
risk_level,
)
.await;
tracing::info!(
analysis_id = %aid,
patient_id = %pid,
suggestion_count = suggestions.len(),
risk_level = %risk_level,
"AI 行动分发完成"
);
}
Ok(_) => {
tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发");
}
Err(e) => {
tracing::warn!(
analysis_id = %aid,
error = %e,
"加载建议列表失败"
);
}
}
}
let _ = erp_core::events::mark_event_processed(
&action_db,
event.id,
"ai_action_dispatcher",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,100 @@
/// alert.triggered → 告警消息通知 + 告警聚合
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
// alert.triggered → 告警消息通知
let (mut alert_rx, alert_handle) = state.event_bus.subscribe_filtered("alert.".to_string());
handles.push(alert_handle);
let alert_db = state.db.clone();
let alert_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match alert_rx.recv().await {
Some(event) if event.event_type == super::ALERT_TRIGGERED => {
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_notifier")
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let severity = event
.payload
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("warning");
let rule_name = event
.payload
.get("rule_name")
.and_then(|v| v.as_str())
.unwrap_or("健康告警");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": if severity == "critical" { "CRITICAL_HEALTH_ALERT" } else { "HEALTH_DATA_ABNORMAL" },
"params": {
"rule_name": rule_name,
"severity": severity,
}
})),
);
alert_bus.publish(notify_event, &alert_db).await;
tracing::info!(patient_id = %pid, severity = %severity, "告警通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&alert_db,
event.id,
"alert_notifier",
)
.await;
}
Some(event) if event.event_type == super::ALERT_TRIGGERED => {
// 被抑制的告警 → 发布聚合事件
if erp_core::events::is_event_processed(&alert_db, event.id, "alert_aggregator")
.await
.unwrap_or(false)
{
continue;
}
let is_suppressed = event
.payload
.get("suppressed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_suppressed {
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let aggregated_event = erp_core::events::DomainEvent::new(
super::ALERT_AGGREGATED,
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": pid,
"triggering_alert_id": event.payload.get("alert_id").and_then(|v| v.as_str()),
"severity": event.payload.get("severity"),
})),
);
alert_bus.publish(aggregated_event, &alert_db).await;
tracing::info!(patient_id = %pid, "告警聚合事件已发布");
}
}
let _ = erp_core::events::mark_event_processed(
&alert_db,
event.id,
"alert_aggregator",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,109 @@
/// appointment.created/confirmed/cancelled → 通知 + 号源释放
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut appt_rx, appt_handle) = state
.event_bus
.subscribe_filtered("appointment.".to_string());
handles.push(appt_handle);
let appt_db = state.db.clone();
let appt_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match appt_rx.recv().await {
Some(event) if event.event_type == super::APPOINTMENT_CREATED => {
if erp_core::events::is_event_processed(
&appt_db,
event.id,
"appt_created_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(pid), Some(did)) = (patient_id, doctor_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "APPOINTMENT_CREATED",
"params": { "doctor_id": did }
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(patient_id = pid, doctor_id = did, "预约创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&appt_db,
event.id,
"appt_created_notifier",
)
.await;
}
Some(event) if event.event_type == "appointment.confirmed" => {
if erp_core::events::is_event_processed(
&appt_db,
event.id,
"appointment_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "appointment_confirmed",
"recipient_type": "doctor",
"recipient_id": did,
"patient_id": pid,
})),
);
appt_bus.publish(notify_event, &appt_db).await;
tracing::info!(doctor_id = did, patient_id = pid, "预约确认通知触发");
}
let _ = erp_core::events::mark_event_processed(
&appt_db,
event.id,
"appointment_notifier",
)
.await;
}
Some(event) if event.event_type == "appointment.cancelled" => {
if erp_core::events::is_event_processed(
&appt_db,
event.id,
"appointment_cancel_handler",
)
.await
.unwrap_or(false)
{
continue;
}
tracing::info!(event_id = %event.id, "预约取消,号源释放");
let _ = erp_core::events::mark_event_processed(
&appt_db,
event.id,
"appointment_cancel_handler",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,102 @@
/// consent.granted/revoked → 通知关联医生
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut consent_rx, consent_handle) =
state.event_bus.subscribe_filtered("consent.".to_string());
handles.push(consent_handle);
let consent_db = state.db.clone();
let consent_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consent_rx.recv().await {
Some(event) if event.event_type == super::CONSENT_GRANTED => {
if erp_core::events::is_event_processed(
&consent_db,
event.id,
"consent_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event
.payload
.get("consent_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSENT_GRANTED",
"params": {
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&consent_db,
event.id,
"consent_notifier",
)
.await;
}
Some(event) if event.event_type == super::CONSENT_REVOKED => {
if erp_core::events::is_event_processed(
&consent_db,
event.id,
"consent_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event
.payload
.get("consent_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"template_key": "CONSENT_REVOKED",
"params": {
"patient_id": pid,
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
}
let _ = erp_core::events::mark_event_processed(
&consent_db,
event.id,
"consent_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,135 @@
/// consultation.opened/new_message/closed → 通知相关方
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut consult_rx, consult_handle) = state
.event_bus
.subscribe_filtered("consultation.".to_string());
handles.push(consult_handle);
let consult_db = state.db.clone();
let consult_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consult_rx.recv().await {
Some(event) if event.event_type == super::CONSULTATION_OPENED => {
if erp_core::events::is_event_processed(
&consult_db,
event.id,
"consult_opened_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "CONSULTATION_OPENED",
"params": { "patient_id": pid }
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(
doctor_id = did,
patient_id = pid,
"咨询开启通知已发送给医生"
);
}
let _ = erp_core::events::mark_event_processed(
&consult_db,
event.id,
"consult_opened_notifier",
)
.await;
}
Some(event) if event.event_type == super::CONSULTATION_NEW_MESSAGE => {
if erp_core::events::is_event_processed(
&consult_db,
event.id,
"consult_msg_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
let sender_role = event
.payload
.get("sender_role")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if let Some(rid) = recipient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": if sender_role == "patient" { "doctor" } else { "patient" },
"recipient_id": rid,
"template_key": "CONSULTATION_NEW_MESSAGE",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(
recipient_id = rid,
sender_role = sender_role,
"咨询新消息通知已发送"
);
}
let _ = erp_core::events::mark_event_processed(
&consult_db,
event.id,
"consult_msg_notifier",
)
.await;
}
Some(event) if event.event_type == super::CONSULTATION_CLOSED => {
if erp_core::events::is_event_processed(
&consult_db,
event.id,
"consult_closed_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let Some(pid) = patient_id {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSULTATION_CLOSED",
})),
);
consult_bus.publish(notify, &consult_db).await;
tracing::info!(patient_id = pid, "咨询关闭通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&consult_db,
event.id,
"consult_closed_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,53 @@
/// device.readings.synced → 触发告警引擎评估
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut reading_rx, reading_handle) = state
.event_bus
.subscribe_filtered("device.readings.".to_string());
handles.push(reading_handle);
let eval_state = state.clone();
tokio::spawn(async move {
loop {
match reading_rx.recv().await {
Some(event) if event.event_type == super::DEVICE_READINGS_SYNCED => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
// 对所有设备类型触发评估
for device_type in &[
"heart_rate",
"blood_oxygen",
"temperature",
"blood_pressure",
"blood_glucose",
] {
if let Err(e) = crate::service::alert_engine::evaluate_rules(
&eval_state,
event.tenant_id,
pid,
device_type,
)
.await
{
tracing::error!(
patient_id = %pid,
device_type = device_type,
error = %e,
"告警评估失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,143 @@
/// follow_up.overdue → 升级通知 + follow_up.created → 通知执行人
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
// follow_up.overdue → 升级通知
let (mut fu_rx, fu_handle) = state.event_bus.subscribe_filtered("follow_up.".to_string());
handles.push(fu_handle);
let fu_db = state.db.clone();
let fu_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_rx.recv().await {
Some(event) if event.event_type == super::FOLLOW_UP_OVERDUE => {
if erp_core::events::is_event_processed(&fu_db, event.id, "follow_up_escalator")
.await
.unwrap_or(false)
{
continue;
}
let task_id = event.payload.get("task_id").and_then(|v| v.as_str());
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
if let (Some(tid), Some(uid)) = (task_id, assigned_to) {
let escalate_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "follow_up_overdue",
"recipient_type": "staff",
"recipient_id": uid,
"task_id": tid,
})),
);
fu_bus.publish(escalate_event, &fu_db).await;
tracing::warn!(task_id = tid, assigned_to = uid, "随访逾期升级通知");
}
let _ = erp_core::events::mark_event_processed(
&fu_db,
event.id,
"follow_up_escalator",
)
.await;
}
Some(event) if event.event_type == super::FOLLOW_UP_COMPLETED => {
// 随访完成 → 检查是否由 AI 触发,触发再分析
if let Some(task_id_str) = event.payload.get("task_id").and_then(|v| v.as_str())
&& let Ok(task_id) = uuid::Uuid::parse_str(task_id_str)
{
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(patient_id) = patient_id {
// 通过 ai_suggestion_loader 查找关联的 AI 建议
if let Some(suggestion_id) =
crate::service::ai_suggestion_loader::find_by_followup_task(
&fu_db,
event.tenant_id,
task_id,
)
.await
.unwrap_or(None)
{
let reanalysis_event = erp_core::events::DomainEvent::new(
"ai.reanalysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"original_suggestion_id": suggestion_id.to_string(),
"patient_id": patient_id.to_string(),
"followup_task_id": task_id_str,
"trigger": "loop_closure",
})),
);
fu_bus.publish(reanalysis_event, &fu_db).await;
tracing::info!(
suggestion_id = %suggestion_id,
patient_id = %patient_id,
task_id = %task_id,
"随访完成,触发 AI 再分析(闭环)"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
// follow_up.created → 通知执行人
let (mut fu_created_rx, fu_created_handle) =
state.event_bus.subscribe_filtered("follow_up.".to_string());
handles.push(fu_created_handle);
let fu_created_db = state.db.clone();
let fu_created_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match fu_created_rx.recv().await {
Some(event) if event.event_type == super::FOLLOW_UP_CREATED => {
if erp_core::events::is_event_processed(
&fu_created_db,
event.id,
"fu_created_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let assigned_to = event.payload.get("assigned_to").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(uid), Some(pid)) = (assigned_to, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"recipient_id": uid,
"template_key": "FOLLOW_UP_CREATED",
"params": { "patient_id": pid }
})),
);
fu_created_bus.publish(notify, &fu_created_db).await;
tracing::info!(assigned_to = uid, patient_id = pid, "随访创建通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&fu_created_db,
event.id,
"fu_created_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,100 @@
/// health_data.critical_alert → 创建危急值告警记录
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut critical_rx, critical_handle) = state
.event_bus
.subscribe_filtered("health_data.".to_string());
handles.push(critical_handle);
let critical_state = state.clone();
tokio::spawn(async move {
loop {
match critical_rx.recv().await {
Some(event) if event.event_type == super::HEALTH_DATA_CRITICAL_ALERT => {
// 幂等检查
if erp_core::events::is_event_processed(
&critical_state.db,
event.id,
"critical_alert_consumer",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
// alert 数据在嵌套的 "alert" 对象中
let alert_obj = event.payload.get("alert");
let alert_type = "vital_sign";
let metric_name = alert_obj
.and_then(|a| a.get("indicator"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let metric_value = alert_obj
.and_then(|a| a.get("value"))
.and_then(|v| v.as_f64())
.map(|v| v.to_string())
.unwrap_or_default();
let threshold_value = alert_obj
.and_then(|a| a.get("threshold"))
.and_then(|v| v.as_f64())
.map(|v| v.to_string())
.unwrap_or_default();
let severity = alert_obj
.and_then(|a| a.get("level"))
.and_then(|v| v.as_str())
.unwrap_or("critical");
if let Some(pid) = patient_id {
match crate::service::critical_alert_service::handle_critical_alert_event(
&critical_state,
event.tenant_id,
pid,
alert_type,
metric_name,
&metric_value,
&threshold_value,
None,
severity,
)
.await
{
Ok(alert_id) => {
tracing::info!(
event_id = %event.id,
alert_id = %alert_id,
patient_id = %pid,
metric = %metric_name,
"危急值告警已创建"
);
let _ = erp_core::events::mark_event_processed(
&critical_state.db,
event.id,
"critical_alert_consumer",
)
.await;
}
Err(e) => {
tracing::error!(
event_id = %event.id,
patient_id = %pid,
error = %e,
"危急值告警创建失败"
);
}
}
}
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,97 @@
/// lab_report.uploaded → 触发 AI 自动分析 + lab_report.reviewed → 通知患者
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut lab_upload_rx, lab_upload_handle) = state
.event_bus
.subscribe_filtered("lab_report.".to_string());
handles.push(lab_upload_handle);
let lab_upload_db = state.db.clone();
let lab_upload_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match lab_upload_rx.recv().await {
Some(event) if event.event_type == super::LAB_REPORT_UPLOADED => {
if erp_core::events::is_event_processed(
&lab_upload_db,
event.id,
"lab_upload_ai_trigger",
)
.await
.unwrap_or(false)
{
continue;
}
let report_id = event.payload.get("report_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(rid), Some(pid)) = (report_id, patient_id) {
let ai_event = erp_core::events::DomainEvent::new(
"ai.analysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"source_type": "lab_report",
"source_id": rid,
"patient_id": pid,
})),
);
lab_upload_bus.publish(ai_event, &lab_upload_db).await;
tracing::info!(
report_id = rid,
patient_id = pid,
"化验单上传触发 AI 分析请求"
);
}
let _ = erp_core::events::mark_event_processed(
&lab_upload_db,
event.id,
"lab_upload_ai_trigger",
)
.await;
}
Some(event) if event.event_type == super::LAB_REPORT_REVIEWED => {
if erp_core::events::is_event_processed(
&lab_upload_db,
event.id,
"lab_reviewed_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let reviewer_id = event.payload.get("reviewer_id").and_then(|v| v.as_str());
if let (Some(pid), Some(rid)) = (patient_id, reviewer_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "LAB_REPORT_REVIEWED",
"params": { "reviewer_id": rid }
})),
);
lab_upload_bus.publish(notify, &lab_upload_db).await;
tracing::info!(
patient_id = pid,
reviewer_id = rid,
"化验报告审核通知已发送给患者"
);
}
let _ = erp_core::events::mark_event_processed(
&lab_upload_db,
event.id,
"lab_reviewed_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,219 @@
use erp_core::events::EventBus;
mod ai;
mod alert;
mod appointment;
mod consent;
mod consultation;
mod device;
mod follow_up;
mod health_data;
mod lab_report;
mod patient;
mod points;
mod workflow;
// ---------------------------------------------------------------------------
// 事件类型常量 — 集中管理,避免硬编码字符串散布在 service 层
// ---------------------------------------------------------------------------
// 预约
pub const APPOINTMENT_CREATED: &str = "appointment.created";
// appointment.confirmed / appointment.cancelled 等 — 动态拼接
// 告警
pub const ALERT_TRIGGERED: &str = "alert.triggered";
pub const ALERT_AGGREGATED: &str = "alert.aggregated";
// 知情同意
pub const CONSENT_GRANTED: &str = "consent.granted";
pub const CONSENT_REVOKED: &str = "consent.revoked";
// 文章
pub const ARTICLE_PUBLISHED: &str = "article.published";
pub const ARTICLE_REJECTED: &str = "article.rejected";
// 咨询
pub const CONSULTATION_OPENED: &str = "consultation.opened";
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
// 设备数据
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
// 医生
pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed";
// 随访
pub const FOLLOW_UP_CREATED: &str = "follow_up.created";
pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed";
pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue";
// 日常监测
pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
// 健康数据
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
// 患者
pub const PATIENT_CREATED: &str = "patient.created";
pub const PATIENT_UPDATED: &str = "patient.updated";
// TODO: 以下常量对应的患者认证和死亡记录流程尚未实现,待后续迭代
pub const PATIENT_VERIFIED: &str = "patient.verified";
pub const PATIENT_DECEASED: &str = "patient.deceased";
// 积分
pub const POINTS_EXPIRED: &str = "points.expired";
pub const POINTS_EARNED: &str = "points.earned";
pub const POINTS_EXCHANGED: &str = "points.exchanged";
// 护理计划
pub const CARE_PLAN_CREATED: &str = "care_plan.created";
pub const CARE_PLAN_UPDATED: &str = "care_plan.updated";
pub const CARE_PLAN_ACTIVATED: &str = "care_plan.activated";
pub const CARE_PLAN_COMPLETED: &str = "care_plan.completed";
// 关怀行动
pub const CARE_ACTION_PERFORMED: &str = "care.action.performed";
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup
pub fn register_handlers(_bus: &EventBus) {
// 事件处理器已迁移到 on_startup → register_handlers_with_state
}
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
pub fn register_handlers_with_state(state: crate::state::HealthState) {
let mut handles: Vec<erp_core::events::SubscriptionHandle> = Vec::new();
handles.extend(workflow::spawn(&state));
handles.extend(device::spawn(&state));
handles.extend(alert::spawn(&state));
handles.extend(patient::spawn(&state));
handles.extend(appointment::spawn(&state));
handles.extend(follow_up::spawn(&state));
handles.extend(health_data::spawn(&state));
handles.extend(ai::spawn(&state));
handles.extend(consent::spawn(&state));
handles.extend(consultation::spawn(&state));
handles.extend(points::spawn(&state));
handles.extend(lab_report::spawn(&state));
// 防止 SubscriptionHandle 被 drop 导致 cancel channel 关闭
// 所有过滤订阅的生命周期应与进程一致
std::mem::forget(handles);
}
// 事件处理器本身依赖 tokio::spawn + channel + DB无法纯单元测试。
// 以下测试覆盖:
// 1. 事件类型常量的正确性(防止拼写错误导致消费者不匹配)
// 2. register_handlers 不 panic空函数
// 3. 事件 payload 构造格式与消费者解析逻辑的契约
// 4. EventBus 过滤订阅的内存行为(无需 DB
// 5. 消费者从 payload 中提取字段的边界条件
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use erp_core::events::{DomainEvent, EventBus, build_event_payload};
use serde_json::json;
use std::collections::HashSet;
// ── 事件类型常量 ──────────────────────────────────────────────────────
/// 所有事件类型常量必须遵循 `{domain}.{action}` 格式
fn assert_valid_event_type(name: &str) {
let parts: Vec<&str> = name.split('.').collect();
assert!(
parts.len() >= 2,
"事件类型 '{}' 不符合 domain.action 格式",
name
);
assert!(
!parts[0].is_empty() && !parts[1].is_empty(),
"事件类型 '{}' 的 domain 或 action 为空",
name
);
}
#[test]
fn event_constants_follow_naming_convention() {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
ARTICLE_REJECTED,
CONSULTATION_OPENED,
CONSULTATION_CLOSED,
CONSULTATION_NEW_MESSAGE,
DEVICE_READINGS_SYNCED,
DOCTOR_ONLINE_STATUS_CHANGED,
FOLLOW_UP_CREATED,
FOLLOW_UP_COMPLETED,
FOLLOW_UP_OVERDUE,
DAILY_MONITORING_CREATED,
LAB_REPORT_UPLOADED,
LAB_REPORT_REVIEWED,
HEALTH_DATA_CRITICAL_ALERT,
PATIENT_CREATED,
PATIENT_UPDATED,
PATIENT_VERIFIED,
PATIENT_DECEASED,
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
for t in &all_types {
assert_valid_event_type(t);
}
}
#[test]
fn event_constants_are_unique() {
let all_types = [
APPOINTMENT_CREATED,
ALERT_TRIGGERED,
ALERT_AGGREGATED,
CONSENT_GRANTED,
CONSENT_REVOKED,
ARTICLE_PUBLISHED,
ARTICLE_REJECTED,
CONSULTATION_OPENED,
CONSULTATION_CLOSED,
CONSULTATION_NEW_MESSAGE,
DEVICE_READINGS_SYNCED,
DOCTOR_ONLINE_STATUS_CHANGED,
FOLLOW_UP_CREATED,
FOLLOW_UP_COMPLETED,
FOLLOW_UP_OVERDUE,
DAILY_MONITORING_CREATED,
LAB_REPORT_UPLOADED,
LAB_REPORT_REVIEWED,
HEALTH_DATA_CRITICAL_ALERT,
PATIENT_CREATED,
PATIENT_UPDATED,
PATIENT_VERIFIED,
PATIENT_DECEASED,
POINTS_EXPIRED,
POINTS_EARNED,
POINTS_EXCHANGED,
CARE_PLAN_CREATED,
CARE_PLAN_UPDATED,
CARE_PLAN_ACTIVATED,
CARE_PLAN_COMPLETED,
CARE_ACTION_PERFORMED,
];
let set: HashSet<_> = all_types.into_iter().collect();
assert_eq!(set.len(), all_types.len(), "存在重复的事件类型常量");
}
}

View File

@@ -0,0 +1,91 @@
/// patient.created → 欢迎消息通知 + patient.updated → 审计日志
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
// patient.created → 欢迎消息通知
let (mut patient_rx, patient_handle) =
state.event_bus.subscribe_filtered("patient.".to_string());
handles.push(patient_handle);
let patient_db = state.db.clone();
let patient_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match patient_rx.recv().await {
Some(event) if event.event_type == super::PATIENT_CREATED => {
if erp_core::events::is_event_processed(
&patient_db,
event.id,
"patient_welcome",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
let welcome_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"template": "patient_welcome",
"recipient_type": "patient",
"recipient_id": pid,
})),
);
patient_bus.publish(welcome_event, &patient_db).await;
tracing::info!(patient_id = %pid, "新患者欢迎流程触发");
}
let _ = erp_core::events::mark_event_processed(
&patient_db,
event.id,
"patient_welcome",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
// patient.updated → 审计日志
let (mut patient_update_rx, patient_update_handle) =
state.event_bus.subscribe_filtered("patient.".to_string());
handles.push(patient_update_handle);
let patient_update_db = state.db.clone();
tokio::spawn(async move {
loop {
match patient_update_rx.recv().await {
Some(event) if event.event_type == super::PATIENT_UPDATED => {
if erp_core::events::is_event_processed(
&patient_update_db,
event.id,
"patient_updated_audit",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新");
let _ = erp_core::events::mark_event_processed(
&patient_update_db,
event.id,
"patient_updated_audit",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,124 @@
/// points.earned/exchanged/expired → 积分变动通知
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut points_rx, points_handle) = state.event_bus.subscribe_filtered("points.".to_string());
handles.push(points_handle);
let points_db = state.db.clone();
let points_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match points_rx.recv().await {
Some(event) if event.event_type == super::POINTS_EARNED => {
if erp_core::events::is_event_processed(
&points_db,
event.id,
"points_earned_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EARNED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分获得通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&points_db,
event.id,
"points_earned_notifier",
)
.await;
}
Some(event) if event.event_type == super::POINTS_EXCHANGED => {
if erp_core::events::is_event_processed(
&points_db,
event.id,
"points_exchanged_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXCHANGED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分兑换通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&points_db,
event.id,
"points_exchanged_notifier",
)
.await;
}
Some(event) if event.event_type == super::POINTS_EXPIRED => {
if erp_core::events::is_event_processed(
&points_db,
event.id,
"points_expired_notifier",
)
.await
.unwrap_or(false)
{
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let amount = event.payload.get("amount").and_then(|v| v.as_u64());
if let (Some(pid), Some(amt)) = (patient_id, amount) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "POINTS_EXPIRED",
"params": { "amount": amt }
})),
);
points_bus.publish(notify, &points_db).await;
tracing::info!(patient_id = pid, amount = amt, "积分过期通知已发送");
}
let _ = erp_core::events::mark_event_processed(
&points_db,
event.id,
"points_expired_notifier",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}

View File

@@ -0,0 +1,76 @@
/// workflow.task.completed → 更新随访任务状态为 completed
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
let mut handles = Vec::new();
let (mut workflow_rx, wf_handle) = state
.event_bus
.subscribe_filtered("workflow.task.".to_string());
handles.push(wf_handle);
let wf_db = state.db.clone();
tokio::spawn(async move {
loop {
match workflow_rx.recv().await {
Some(event) if event.event_type == "workflow.task.completed" => {
if erp_core::events::is_event_processed(
&wf_db,
event.id,
"workflow_task_consumer",
)
.await
.unwrap_or(false)
{
continue;
}
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
match task_id {
Some(task_id) => {
match crate::service::follow_up_service::complete_task_by_system(
&wf_db,
task_id,
event.tenant_id,
)
.await
{
Ok(()) => {
tracing::info!(
event_id = %event.id,
task_id = %task_id,
"工作流任务完成 → 随访任务已更新"
);
}
Err(e) => {
tracing::warn!(
event_id = %event.id,
task_id = %task_id,
error = %e,
"工作流任务完成 → 随访任务更新失败"
);
}
}
}
None => {
tracing::warn!(
event_id = %event.id,
"工作流任务完成事件缺少 task_id跳过"
);
}
}
let _ = erp_core::events::mark_event_processed(
&wf_db,
event.id,
"workflow_task_consumer",
)
.await;
}
Some(_) => {}
None => break,
}
}
});
handles
}