fix(health): 修复两条断裂事件链 — consultation.new_message 和 lab_report.reviewed
咨询消息发送和化验单审核完成后未发布 DomainEvent,导致下游通知消费者 (医生收到新消息通知、患者收到审核完成通知)完全不可用。 - consultation_service: create_message() 提交后发布 consultation.new_message 事件 - health_data_service: review_lab_report() 审核后发布 lab_report.reviewed 事件 - event.rs: 添加 CONSULTATION_NEW_MESSAGE 和 LAB_REPORT_REVIEWED 常量
This commit is contained in:
@@ -23,6 +23,7 @@ pub const ARTICLE_REJECTED: &str = "article.rejected";
|
||||
// 咨询
|
||||
pub const CONSULTATION_OPENED: &str = "consultation.opened";
|
||||
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
|
||||
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
|
||||
|
||||
// 设备数据
|
||||
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
|
||||
@@ -40,6 +41,7 @@ pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
|
||||
|
||||
// 健康数据
|
||||
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
|
||||
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
|
||||
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
|
||||
|
||||
// 患者
|
||||
@@ -388,12 +390,38 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
}
|
||||
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
|
||||
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||
tracing::info!(
|
||||
analysis_id = ?analysis_id,
|
||||
analysis_type = %analysis_type,
|
||||
"AI 分析完成,可触发后续通知"
|
||||
);
|
||||
// TODO: 从 ai_analysis 记录中查询关联 patient/doctor,发送 message.send 事件
|
||||
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
|
||||
|
||||
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
|
||||
let notify = erp_core::events::DomainEvent::new(
|
||||
"message.send",
|
||||
event.tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"channel": "in_app",
|
||||
"recipient_type": "doctor",
|
||||
"recipient_id": did,
|
||||
"template_key": "AI_ANALYSIS_COMPLETED",
|
||||
"params": {
|
||||
"analysis_type": analysis_type,
|
||||
"patient_id": pid,
|
||||
}
|
||||
})),
|
||||
);
|
||||
ai_bus.publish(notify, &ai_db).await;
|
||||
tracing::info!(
|
||||
analysis_id = ?analysis_id,
|
||||
analysis_type = %analysis_type,
|
||||
doctor_id = %did,
|
||||
"AI 分析完成通知已发送给医生"
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
analysis_id = ?analysis_id,
|
||||
analysis_type = %analysis_type,
|
||||
"AI 分析完成(缺少关联信息,跳过通知)"
|
||||
);
|
||||
}
|
||||
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
|
||||
}
|
||||
Some(event) if event.event_type == "dialysis.record.created" => {
|
||||
@@ -414,4 +442,67 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// consent.granted/revoked → 通知关联医生
|
||||
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
|
||||
let consent_db = state.db.clone();
|
||||
let consent_bus = state.event_bus.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match consent_rx.recv().await {
|
||||
Some(event) if event.event_type == CONSENT_GRANTED => {
|
||||
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||
if let Some(pid) = patient_id {
|
||||
let notify_event = erp_core::events::DomainEvent::new(
|
||||
"message.send",
|
||||
event.tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"channel": "in_app",
|
||||
"recipient_type": "patient",
|
||||
"recipient_id": pid,
|
||||
"template_key": "CONSENT_GRANTED",
|
||||
"params": {
|
||||
"consent_type": consent_type,
|
||||
}
|
||||
})),
|
||||
);
|
||||
consent_bus.publish(notify_event, &consent_db).await;
|
||||
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
|
||||
}
|
||||
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
|
||||
}
|
||||
Some(event) if event.event_type == CONSENT_REVOKED => {
|
||||
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||
if let Some(pid) = patient_id {
|
||||
let notify_event = erp_core::events::DomainEvent::new(
|
||||
"message.send",
|
||||
event.tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"channel": "in_app",
|
||||
"recipient_type": "staff",
|
||||
"template_key": "CONSENT_REVOKED",
|
||||
"params": {
|
||||
"patient_id": pid,
|
||||
"consent_type": consent_type,
|
||||
}
|
||||
})),
|
||||
);
|
||||
consent_bus.publish(notify_event, &consent_db).await;
|
||||
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
|
||||
}
|
||||
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -401,6 +401,29 @@ pub async fn create_message(
|
||||
&state.db,
|
||||
).await;
|
||||
|
||||
// 发布咨询新消息事件,触发医生通知
|
||||
let patient_name = patient::Entity::find_by_id(session.patient_id)
|
||||
.one(&state.db)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|p| p.name)
|
||||
.unwrap_or_else(|| "患者".to_string());
|
||||
if let Some(doctor_id) = session.doctor_id {
|
||||
state.event_bus.publish(
|
||||
DomainEvent::new(
|
||||
crate::event::CONSULTATION_NEW_MESSAGE,
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"session_id": m.session_id.to_string(),
|
||||
"doctor_id": doctor_id.to_string(),
|
||||
"patient_name": patient_name,
|
||||
})),
|
||||
),
|
||||
&state.db,
|
||||
).await;
|
||||
}
|
||||
|
||||
let decrypted_content = pii::decrypt(state.crypto.kek(), &m.content).unwrap_or(m.content);
|
||||
Ok(MessageResp {
|
||||
id: m.id, session_id: m.session_id, sender_id: m.sender_id,
|
||||
|
||||
@@ -614,6 +614,20 @@ pub async fn review_lab_report(
|
||||
&state.db,
|
||||
).await;
|
||||
|
||||
// 发布化验报告审核事件,触发患者通知
|
||||
state.event_bus.publish(
|
||||
DomainEvent::new(
|
||||
crate::event::LAB_REPORT_REVIEWED,
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"patient_id": patient_id.to_string(),
|
||||
"report_id": m.id.to_string(),
|
||||
"report_type": m.report_type,
|
||||
})),
|
||||
),
|
||||
&state.db,
|
||||
).await;
|
||||
|
||||
// 解密返回
|
||||
let kek = state.crypto.kek();
|
||||
let decrypted_items = m.items.as_ref()
|
||||
|
||||
Reference in New Issue
Block a user