fix(health): 修复两条断裂事件链 — consultation.new_message 和 lab_report.reviewed
咨询消息发送和化验单审核完成后未发布 DomainEvent,导致下游通知消费者 (医生收到新消息通知、患者收到审核完成通知)完全不可用。 - consultation_service: create_message() 提交后发布 consultation.new_message 事件 - health_data_service: review_lab_report() 审核后发布 lab_report.reviewed 事件 - event.rs: 添加 CONSULTATION_NEW_MESSAGE 和 LAB_REPORT_REVIEWED 常量
This commit is contained in:
@@ -23,6 +23,7 @@ pub const ARTICLE_REJECTED: &str = "article.rejected";
|
|||||||
// 咨询
|
// 咨询
|
||||||
pub const CONSULTATION_OPENED: &str = "consultation.opened";
|
pub const CONSULTATION_OPENED: &str = "consultation.opened";
|
||||||
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
|
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
|
||||||
|
pub const CONSULTATION_NEW_MESSAGE: &str = "consultation.new_message";
|
||||||
|
|
||||||
// 设备数据
|
// 设备数据
|
||||||
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
|
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
|
||||||
@@ -40,6 +41,7 @@ pub const DAILY_MONITORING_CREATED: &str = "daily_monitoring.created";
|
|||||||
|
|
||||||
// 健康数据
|
// 健康数据
|
||||||
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
|
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
|
||||||
|
pub const LAB_REPORT_REVIEWED: &str = "lab_report.reviewed";
|
||||||
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
|
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
|
||||||
|
|
||||||
// 患者
|
// 患者
|
||||||
@@ -388,12 +390,38 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
|||||||
}
|
}
|
||||||
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
|
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
|
||||||
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||||
|
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||||
|
let doctor_id = event.payload.get("doctor_id").and_then(|v| v.as_str());
|
||||||
|
|
||||||
|
if let (Some(did), Some(pid)) = (doctor_id, patient_id) {
|
||||||
|
let notify = erp_core::events::DomainEvent::new(
|
||||||
|
"message.send",
|
||||||
|
event.tenant_id,
|
||||||
|
erp_core::events::build_event_payload(serde_json::json!({
|
||||||
|
"channel": "in_app",
|
||||||
|
"recipient_type": "doctor",
|
||||||
|
"recipient_id": did,
|
||||||
|
"template_key": "AI_ANALYSIS_COMPLETED",
|
||||||
|
"params": {
|
||||||
|
"analysis_type": analysis_type,
|
||||||
|
"patient_id": pid,
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
ai_bus.publish(notify, &ai_db).await;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
analysis_id = ?analysis_id,
|
analysis_id = ?analysis_id,
|
||||||
analysis_type = %analysis_type,
|
analysis_type = %analysis_type,
|
||||||
"AI 分析完成,可触发后续通知"
|
doctor_id = %did,
|
||||||
|
"AI 分析完成通知已发送给医生"
|
||||||
);
|
);
|
||||||
// TODO: 从 ai_analysis 记录中查询关联 patient/doctor,发送 message.send 事件
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
analysis_id = ?analysis_id,
|
||||||
|
analysis_type = %analysis_type,
|
||||||
|
"AI 分析完成(缺少关联信息,跳过通知)"
|
||||||
|
);
|
||||||
|
}
|
||||||
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
|
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
|
||||||
}
|
}
|
||||||
Some(event) if event.event_type == "dialysis.record.created" => {
|
Some(event) if event.event_type == "dialysis.record.created" => {
|
||||||
@@ -414,4 +442,67 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// consent.granted/revoked → 通知关联医生
|
||||||
|
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
|
||||||
|
let consent_db = state.db.clone();
|
||||||
|
let consent_bus = state.event_bus.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match consent_rx.recv().await {
|
||||||
|
Some(event) if event.event_type == CONSENT_GRANTED => {
|
||||||
|
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||||
|
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||||
|
if let Some(pid) = patient_id {
|
||||||
|
let notify_event = erp_core::events::DomainEvent::new(
|
||||||
|
"message.send",
|
||||||
|
event.tenant_id,
|
||||||
|
erp_core::events::build_event_payload(serde_json::json!({
|
||||||
|
"channel": "in_app",
|
||||||
|
"recipient_type": "patient",
|
||||||
|
"recipient_id": pid,
|
||||||
|
"template_key": "CONSENT_GRANTED",
|
||||||
|
"params": {
|
||||||
|
"consent_type": consent_type,
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
consent_bus.publish(notify_event, &consent_db).await;
|
||||||
|
tracing::info!(patient_id = %pid, consent_type = %consent_type, "知情同意授予通知已发送");
|
||||||
|
}
|
||||||
|
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
|
||||||
|
}
|
||||||
|
Some(event) if event.event_type == CONSENT_REVOKED => {
|
||||||
|
if erp_core::events::is_event_processed(&consent_db, event.id, "consent_notifier").await.unwrap_or(false) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||||
|
let consent_type = event.payload.get("consent_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||||
|
if let Some(pid) = patient_id {
|
||||||
|
let notify_event = erp_core::events::DomainEvent::new(
|
||||||
|
"message.send",
|
||||||
|
event.tenant_id,
|
||||||
|
erp_core::events::build_event_payload(serde_json::json!({
|
||||||
|
"channel": "in_app",
|
||||||
|
"recipient_type": "staff",
|
||||||
|
"template_key": "CONSENT_REVOKED",
|
||||||
|
"params": {
|
||||||
|
"patient_id": pid,
|
||||||
|
"consent_type": consent_type,
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
consent_bus.publish(notify_event, &consent_db).await;
|
||||||
|
tracing::warn!(patient_id = %pid, consent_type = %consent_type, "知情同意撤回通知已发送给医护");
|
||||||
|
}
|
||||||
|
let _ = erp_core::events::mark_event_processed(&consent_db, event.id, "consent_notifier").await;
|
||||||
|
}
|
||||||
|
Some(_) => {}
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -401,6 +401,29 @@ pub async fn create_message(
|
|||||||
&state.db,
|
&state.db,
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
// 发布咨询新消息事件,触发医生通知
|
||||||
|
let patient_name = patient::Entity::find_by_id(session.patient_id)
|
||||||
|
.one(&state.db)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|p| p.name)
|
||||||
|
.unwrap_or_else(|| "患者".to_string());
|
||||||
|
if let Some(doctor_id) = session.doctor_id {
|
||||||
|
state.event_bus.publish(
|
||||||
|
DomainEvent::new(
|
||||||
|
crate::event::CONSULTATION_NEW_MESSAGE,
|
||||||
|
tenant_id,
|
||||||
|
erp_core::events::build_event_payload(serde_json::json!({
|
||||||
|
"session_id": m.session_id.to_string(),
|
||||||
|
"doctor_id": doctor_id.to_string(),
|
||||||
|
"patient_name": patient_name,
|
||||||
|
})),
|
||||||
|
),
|
||||||
|
&state.db,
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
|
||||||
let decrypted_content = pii::decrypt(state.crypto.kek(), &m.content).unwrap_or(m.content);
|
let decrypted_content = pii::decrypt(state.crypto.kek(), &m.content).unwrap_or(m.content);
|
||||||
Ok(MessageResp {
|
Ok(MessageResp {
|
||||||
id: m.id, session_id: m.session_id, sender_id: m.sender_id,
|
id: m.id, session_id: m.session_id, sender_id: m.sender_id,
|
||||||
|
|||||||
@@ -614,6 +614,20 @@ pub async fn review_lab_report(
|
|||||||
&state.db,
|
&state.db,
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
// 发布化验报告审核事件,触发患者通知
|
||||||
|
state.event_bus.publish(
|
||||||
|
DomainEvent::new(
|
||||||
|
crate::event::LAB_REPORT_REVIEWED,
|
||||||
|
tenant_id,
|
||||||
|
erp_core::events::build_event_payload(serde_json::json!({
|
||||||
|
"patient_id": patient_id.to_string(),
|
||||||
|
"report_id": m.id.to_string(),
|
||||||
|
"report_type": m.report_type,
|
||||||
|
})),
|
||||||
|
),
|
||||||
|
&state.db,
|
||||||
|
).await;
|
||||||
|
|
||||||
// 解密返回
|
// 解密返回
|
||||||
let kek = state.crypto.kek();
|
let kek = state.crypto.kek();
|
||||||
let decrypted_items = m.items.as_ref()
|
let decrypted_items = m.items.as_ref()
|
||||||
|
|||||||
Reference in New Issue
Block a user