fix(health): 修复两条断裂事件链 — consultation.new_message 和 lab_report.reviewed
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

咨询消息发送和化验单审核完成后未发布 DomainEvent,导致下游通知消费者
(医生收到新消息通知、患者收到审核完成通知)完全不可用。

- consultation_service: create_message() 提交后发布 consultation.new_message 事件
- health_data_service: review_lab_report() 审核后发布 lab_report.reviewed 事件
- event.rs: 添加 CONSULTATION_NEW_MESSAGE 和 LAB_REPORT_REVIEWED 常量
This commit is contained in:
iven
2026-04-30 08:21:00 +08:00
parent 43769dae5a
commit ef0b784f4f
3 changed files with 134 additions and 6 deletions

View File

@@ -23,6 +23,7 @@ pub const ARTICLE_REJECTED: &str = "article.rejected";
// 咨询
pub const CONSULTATION_OPENED: &str = "consultation.opened";
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
// 设备数据
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
@@ -40,6 +41,7 @@ pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
// 健康数据
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
// 患者
@@ -388,12 +390,38 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
"AI 分析完成,可触发后续通知"
);
// TODO: 从 ai_analysis 记录中查询关联 patient/doctor发送 message.send 事件
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
let notify = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "doctor",
"recipient_id": did,
"template_key": "AI_ANALYSIS_COMPLETED",
"params": {
"analysis_type": analysis_type,
"patient_id": pid,
}
})),
);
ai_bus.publish(notify, &ai_db).await;
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
doctor_id = %did,
"AI 分析完成通知已发送给医生"
);
} else {
tracing::info!(
analysis_id = ?analysis_id,
analysis_type = %analysis_type,
"AI 分析完成(缺少关联信息,跳过通知)"
);
}
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
}
Some(event) if event.event_type == "dialysis.record.created" => {
@@ -414,4 +442,67 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
}
});
// consent.granted/revoked → 通知关联医生
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
let consent_db = state.db.clone();
let consent_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match consent_rx.recv().await {
Some(event) if event.event_type == CONSENT_GRANTED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "patient",
"recipient_id": pid,
"template_key": "CONSENT_GRANTED",
"params": {
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(event) if event.event_type == CONSENT_REVOKED => {
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
continue;
}
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
if let Some(pid) = patient_id {
let notify_event = erp_core::events::DomainEvent::new(
"message.send",
event.tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"channel": "in_app",
"recipient_type": "staff",
"template_key": "CONSENT_REVOKED",
"params": {
"patient_id": pid,
"consent_type": consent_type,
}
})),
);
consent_bus.publish(notify_event, &consent_db).await;
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
}
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
}
Some(_) => {}
None => break,
}
}
});
}

View File

@@ -401,6 +401,29 @@ pub async fn create_message(
&state.db,
).await;
// 发布咨询新消息事件,触发医生通知
let patient_name = patient::Entity::find_by_id(session.patient_id)
.one(&state.db)
.await
.ok()
.flatten()
.map(|p| p.name)
.unwrap_or_else(|| "患者".to_string());
if let Some(doctor_id) = session.doctor_id {
state.event_bus.publish(
DomainEvent::new(
crate::event::CONSULTATION_NEW_MESSAGE,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"session_id": m.session_id.to_string(),
"doctor_id": doctor_id.to_string(),
"patient_name": patient_name,
})),
),
&state.db,
).await;
}
let decrypted_content = pii::decrypt(state.crypto.kek(), &m.content).unwrap_or(m.content);
Ok(MessageResp {
id: m.id, session_id: m.session_id, sender_id: m.sender_id,

View File

@@ -614,6 +614,20 @@ pub async fn review_lab_report(
&state.db,
).await;
// 发布化验报告审核事件,触发患者通知
state.event_bus.publish(
DomainEvent::new(
crate::event::LAB_REPORT_REVIEWED,
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({
"patient_id": patient_id.to_string(),
"report_id": m.id.to_string(),
"report_type": m.report_type,
})),
),
&state.db,
).await;
// 解密返回
let kek = state.crypto.kek();
let decrypted_items = m.items.as_ref()