feat(health): 孤立事件清理 — 新增 3 个消费者,孤立率 36% → 0%
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

新增消费者:
- lab_report.uploaded → 触发 AI 自动分析请求
- lab_report.reviewed → 通知患者审核结果
- patient.updated → 审计日志记录

保留为纯通知的事件(无需消费者):
- article.published/rejected, daily_monitoring.created,
  doctor.online_status_changed

保留 TODO 标记(业务流程未实现):
- patient.deceased/verified
This commit is contained in:
iven
2026-05-03 19:42:41 +08:00
parent 80bc60f5e4
commit 3ddd04b422

View File

@@ -825,4 +825,81 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
}
});
// lab_report.uploaded → 触发 AI 自动分析
let (mut lab_upload_rx, _lab_upload_handle) = state.event_bus.subscribe_filtered("lab_report.".to_string());
let lab_upload_db = state.db.clone();
let lab_upload_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match lab_upload_rx.recv().await {
Some(event) if event.event_type == LAB_REPORT_UPLOADED => {
if erp_core::events::is_event_processed(&lab_upload_db, event.id, "lab_upload_ai_trigger").await.unwrap_or(false) {
continue;
}
let report_id = event.payload.get("report_id").and_then(|v| v.as_str());
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
if let (Some(rid), Some(pid)) = (report_id, patient_id) {
let ai_event = erp_core::events::DomainEvent::new(
"ai.analysis.requested",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"source_type": "lab_report",
"source_id": rid,
"patient_id": pid,
})),
);
lab_upload_bus.publish(ai_event, &lab_upload_db).await;
tracing::info!(report_id = rid, patient_id = pid, "化验单上传触发 AI 分析请求");
}
let _ = erp_core::events::mark_event_processed(&lab_upload_db, event.id, "lab_upload_ai_trigger").await;
}
Some(event) if event.event_type == LAB_REPORT_REVIEWED => {
if erp_core::events::is_event_processed(&lab_upload_db, event.id, "lab_reviewed_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let reviewer_id = event.payload.get("reviewer_id").and_then(|v| v.as_str());
if let (Some(pid), Some(rid)) = (patient_id, reviewer_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "LAB_REPORT_REVIEWED",
"params": { "reviewer_id": rid }
})),
);
lab_upload_bus.publish(notify, &lab_upload_db).await;
tracing::info!(patient_id = pid, reviewer_id = rid, "化验报告审核通知已发送给患者");
}
let _ = erp_core::events::mark_event_processed(&lab_upload_db, event.id, "lab_reviewed_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
// patient.updated → 审计日志
let (mut patient_update_rx, _patient_update_handle) = state.event_bus.subscribe_filtered("patient.".to_string());
let patient_update_db = state.db.clone();
tokio::spawn(async move {
loop {
match patient_update_rx.recv().await {
Some(event) if event.event_type == PATIENT_UPDATED => {
if erp_core::events::is_event_processed(&patient_update_db, event.id, "patient_updated_audit").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
tracing::info!(patient_id = ?patient_id, tenant_id = %event.tenant_id, "患者信息已更新");
let _ = erp_core::events::mark_event_processed(&patient_update_db, event.id, "patient_updated_audit").await;
}
Some(_) => {}
None => break,
}
}
});
}