fix(health+ai+dialysis): 审计 P1 批次修复 — EventBus接入/盲索引去重/事件消费者补全
P1-2: erp-ai EventBus 接入 - handler 层 SSE 流完成/失败时发布 ai.analysis.completed/failed 事件 - build_sse_stream 新增 tenant_id 参数 P1-2: erp-dialysis EventBus 接入 - create_dialysis_record 审计后发布 dialysis.record.created 事件 P1-5: message.sent 消费者改进 - 从占位 tracing::info 升级为带 payload 详情的结构化日志 P1-7: 盲索引去重 - create_patient 中新增 id_number HMAC 去重检查(查 blind_indexes 表) - 患者创建成功后写入 blind_indexes 表(id_number + phone) - 防止同租户重复建档 P1-1: 事件消费者补全 - 新增 ai.analysis.completed 消费者(幂等处理 + 日志) - 新增 dialysis.record.created 消费者(幂等处理 + 日志)
This commit is contained in:
@@ -72,7 +72,7 @@ where
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report");
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id);
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ where
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend");
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id);
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ where
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan");
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id);
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
|
||||
@@ -254,7 +254,7 @@ where
|
||||
let analysis_id_clone = analysis_id;
|
||||
let state_clone = state.clone();
|
||||
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary");
|
||||
let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id);
|
||||
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
|
||||
@@ -452,6 +452,7 @@ fn build_sse_stream(
|
||||
analysis_id: uuid::Uuid,
|
||||
state: AiState,
|
||||
analysis_type: &'static str,
|
||||
tenant_id: uuid::Uuid,
|
||||
) -> impl futures::Stream<Item = Result<Event, Infallible>> {
|
||||
async_stream::stream! {
|
||||
let mut full_content = String::new();
|
||||
@@ -472,13 +473,34 @@ fn build_sse_stream(
|
||||
let data = serde_json::to_string(&event).unwrap_or_default();
|
||||
yield Ok(Event::default().event("error").data(data));
|
||||
let _ = state.analysis.fail_analysis(analysis_id, e.to_string()).await;
|
||||
// 发布 AI 分析失败事件
|
||||
let fail_event = erp_core::events::DomainEvent::new(
|
||||
"ai.analysis.failed",
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"analysis_id": analysis_id,
|
||||
"error": e.to_string(),
|
||||
})),
|
||||
);
|
||||
state.event_bus.publish(fail_event, &state.db).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let metadata = serde_json::json!({"analysis_type": analysis_type});
|
||||
let _ = state.analysis.complete_analysis(analysis_id, full_content, metadata).await;
|
||||
let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await;
|
||||
|
||||
// 发布 AI 分析完成事件
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
"ai.analysis.completed",
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"analysis_id": analysis_id,
|
||||
"analysis_type": analysis_type,
|
||||
})),
|
||||
);
|
||||
state.event_bus.publish(event, &state.db).await;
|
||||
|
||||
let done_event = AnalysisSseEvent::Done {
|
||||
analysis_id,
|
||||
|
||||
@@ -4,6 +4,7 @@ use chrono::Utc;
|
||||
use erp_core::audit::AuditLog;
|
||||
use erp_core::audit_service;
|
||||
use erp_core::crypto as pii;
|
||||
use erp_core::events::DomainEvent;
|
||||
use num_traits::ToPrimitive;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect};
|
||||
@@ -130,6 +131,18 @@ pub async fn create_dialysis_record(
|
||||
&state.db,
|
||||
).await;
|
||||
|
||||
// 发布透析记录创建事件
|
||||
let event = DomainEvent::new(
|
||||
"dialysis.record.created",
|
||||
tenant_id,
|
||||
erp_core::events::build_event_payload(serde_json::json!({
|
||||
"record_id": m.id,
|
||||
"patient_id": m.patient_id,
|
||||
"dialysis_type": m.dialysis_type,
|
||||
})),
|
||||
);
|
||||
state.event_bus.publish(event, &state.db).await;
|
||||
|
||||
Ok(to_resp(&state.crypto, m))
|
||||
}
|
||||
|
||||
|
||||
@@ -107,17 +107,23 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
}
|
||||
});
|
||||
|
||||
// message.sent → 预留:后续联动咨询会话 last_message_at
|
||||
// message.sent → 通用消息事件消费者(预留咨询联动)
|
||||
let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string());
|
||||
let msg_db = state.db.clone();
|
||||
let _msg_db = state.db.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match msg_rx.recv().await {
|
||||
Some(event) if event.event_type == "message.sent" => {
|
||||
let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str());
|
||||
let message_id = event.payload.get("message_id").and_then(|v| v.as_str());
|
||||
tracing::info!(
|
||||
event_id = %event.id,
|
||||
"健康模块收到消息发送事件(暂不处理)"
|
||||
message_id = ?message_id,
|
||||
recipient_id = ?recipient_id,
|
||||
"message.sent 消费者收到事件"
|
||||
);
|
||||
// TODO: 若 message 关联了 consultation session(通过 metadata 或 reference),
|
||||
// 可在此更新 consultation_session.last_message_at
|
||||
}
|
||||
Some(_) => {}
|
||||
None => break,
|
||||
@@ -368,4 +374,44 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// ai.analysis.completed → 通知关联医生
|
||||
let (mut ai_rx, _ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
|
||||
let ai_db = state.db.clone();
|
||||
let ai_bus = state.event_bus.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match ai_rx.recv().await {
|
||||
Some(event) if event.event_type == "ai.analysis.completed" => {
|
||||
if erp_core::events::is_event_processed(&ai_db, event.id, "ai_analysis_notifier").await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str());
|
||||
let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown");
|
||||
tracing::info!(
|
||||
analysis_id = ?analysis_id,
|
||||
analysis_type = %analysis_type,
|
||||
"AI 分析完成,可触发后续通知"
|
||||
);
|
||||
// TODO: 从 ai_analysis 记录中查询关联 patient/doctor,发送 message.send 事件
|
||||
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await;
|
||||
}
|
||||
Some(event) if event.event_type == "dialysis.record.created" => {
|
||||
if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier").await.unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str());
|
||||
let record_id = event.payload.get("record_id").and_then(|v| v.as_str());
|
||||
tracing::info!(
|
||||
record_id = ?record_id,
|
||||
patient_id = ?patient_id,
|
||||
"透析记录已创建"
|
||||
);
|
||||
let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "dialysis_notifier").await;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -140,6 +140,23 @@ pub async fn create_patient(
|
||||
.map(|m| pii::encrypt(state.crypto.kek(), m))
|
||||
.transpose()?;
|
||||
|
||||
// 盲索引去重:同租户内相同身份证号不允许重复建档
|
||||
if let Some(ref hash) = id_number_hash {
|
||||
let dup = crate::entity::blind_index::Entity::find()
|
||||
.filter(crate::entity::blind_index::Column::TenantId.eq(tenant_id))
|
||||
.filter(crate::entity::blind_index::Column::EntityType.eq("patient"))
|
||||
.filter(crate::entity::blind_index::Column::FieldName.eq("id_number"))
|
||||
.filter(crate::entity::blind_index::Column::BlindHash.eq(hash.as_str()))
|
||||
.one(&state.db)
|
||||
.await?;
|
||||
if dup.is_some() {
|
||||
return Err(HealthError::Validation("该身份证号已存在患者档案".to_string()));
|
||||
}
|
||||
}
|
||||
// 保留副本供写入 blind_indexes 表(active model 构建 会 move 原值)
|
||||
let bi_id_hash = id_number_hash.clone();
|
||||
let bi_phone_hash = phone_hash.clone();
|
||||
|
||||
let active = patient::ActiveModel {
|
||||
id: Set(id),
|
||||
tenant_id: Set(tenant_id),
|
||||
@@ -170,6 +187,35 @@ pub async fn create_patient(
|
||||
|
||||
let model = active.insert(&state.db).await?;
|
||||
|
||||
// 写入盲索引到统一索引表(用于跨系统去重查询)
|
||||
let now_bi = Utc::now();
|
||||
if let Some(hash) = bi_id_hash {
|
||||
let bi = crate::entity::blind_index::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
entity_type: Set("patient".to_string()),
|
||||
entity_id: Set(model.id),
|
||||
field_name: Set("id_number".to_string()),
|
||||
blind_hash: Set(hash),
|
||||
created_at: Set(now_bi),
|
||||
updated_at: Set(now_bi),
|
||||
};
|
||||
bi.insert(&state.db).await?;
|
||||
}
|
||||
if let Some(hash) = bi_phone_hash {
|
||||
let bi = crate::entity::blind_index::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
entity_type: Set("patient".to_string()),
|
||||
entity_id: Set(model.id),
|
||||
field_name: Set("emergency_contact_phone".to_string()),
|
||||
blind_hash: Set(hash),
|
||||
created_at: Set(now_bi),
|
||||
updated_at: Set(now_bi),
|
||||
};
|
||||
bi.insert(&state.db).await?;
|
||||
}
|
||||
|
||||
let event = DomainEvent::new(
|
||||
crate::event::PATIENT_CREATED,
|
||||
tenant_id,
|
||||
|
||||
Reference in New Issue
Block a user