将单体 event.rs 按业务域拆分为 event/ 模块目录: - mod.rs (219 行): 31 事件常量 + 调度器 + 测试 - 12 个消费者文件: workflow/device/alert/patient/appointment/ follow_up/health_data/ai/consent/consultation/points/lab_report 每个消费者文件 50-215 行,独立可维护。 编译零错误,测试全部通过。
92 lines
3.6 KiB
Rust
92 lines
3.6 KiB
Rust
/// patient.created → 欢迎消息通知 + patient.updated → 审计日志
|
|
pub fn spawn(state: &crate::state::HealthState) -> Vec<erp_core::events::SubscriptionHandle> {
|
|
let mut handles = Vec::new();
|
|
|
|
// patient.created → 欢迎消息通知
|
|
let (mut patient_rx, patient_handle) =
|
|
state.event_bus.subscribe_filtered("patient.".to_string());
|
|
handles.push(patient_handle);
|
|
let patient_db = state.db.clone();
|
|
let patient_bus = state.event_bus.clone();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match patient_rx.recv().await {
|
|
Some(event) if event.event_type == super::PATIENT_CREATED => {
|
|
if erp_core::events::is_event_processed(
|
|
&patient_db,
|
|
event.id,
|
|
"patient_welcome",
|
|
)
|
|
.await
|
|
.unwrap_or(false)
|
|
{
|
|
continue;
|
|
}
|
|
let patient_id = event
|
|
.payload
|
|
.get("patient_id")
|
|
.and_then(|v| v.as_str())
|
|
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
|
if let Some(pid) = patient_id {
|
|
let welcome_event = erp_core::events::DomainEvent::new(
|
|
"message.send",
|
|
event.tenant_id,
|
|
erp_core::events::build_event_payload(serde_json::json!({
|
|
"template": "patient_welcome",
|
|
"recipient_type": "patient",
|
|
"recipient_id": pid,
|
|
})),
|
|
);
|
|
patient_bus.publish(welcome_event, &patient_db).await;
|
|
tracing::info!(patient_id = %pid, "新患者欢迎流程触发");
|
|
}
|
|
let _ = erp_core::events::mark_event_processed(
|
|
&patient_db,
|
|
event.id,
|
|
"patient_welcome",
|
|
)
|
|
.await;
|
|
}
|
|
Some(_) => {}
|
|
None => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
// patient.updated → 审计日志
|
|
let (mut patient_update_rx, patient_update_handle) =
|
|
state.event_bus.subscribe_filtered("patient.".to_string());
|
|
handles.push(patient_update_handle);
|
|
let patient_update_db = state.db.clone();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match patient_update_rx.recv().await {
|
|
Some(event) if event.event_type == super::PATIENT_UPDATED => {
|
|
if erp_core::events::is_event_processed(
|
|
&patient_update_db,
|
|
event.id,
|
|
"patient_updated_audit",
|
|
)
|
|
.await
|
|
.unwrap_or(false)
|
|
{
|
|
continue;
|
|
}
|
|
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
|
tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新");
|
|
let _ = erp_core::events::mark_event_processed(
|
|
&patient_update_db,
|
|
event.id,
|
|
"patient_updated_audit",
|
|
)
|
|
.await;
|
|
}
|
|
Some(_) => {}
|
|
None => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
handles
|
|
}
|