From 30344d474f02b1fe617ba5706f0f693167cc22a1 Mon Sep 17 00:00:00 2001 From: iven Date: Wed, 29 Apr 2026 17:00:24 +0800 Subject: [PATCH] =?UTF-8?q?fix(health+ai+dialysis):=20=E5=AE=A1=E8=AE=A1?= =?UTF-8?q?=20P1=20=E6=89=B9=E6=AC=A1=E4=BF=AE=E5=A4=8D=20=E2=80=94=20Even?= =?UTF-8?q?tBus=E6=8E=A5=E5=85=A5/=E7=9B=B2=E7=B4=A2=E5=BC=95=E5=8E=BB?= =?UTF-8?q?=E9=87=8D/=E4=BA=8B=E4=BB=B6=E6=B6=88=E8=B4=B9=E8=80=85?= =?UTF-8?q?=E8=A1=A5=E5=85=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1-2: erp-ai EventBus 接入 - handler 层 SSE 流完成/失败时发布 ai.analysis.completed/failed 事件 - build_sse_stream 新增 tenant_id 参数 P1-2: erp-dialysis EventBus 接入 - create_dialysis_record 审计后发布 dialysis.record.created 事件 P1-5: message.sent 消费者改进 - 从占位 tracing::info 升级为带 payload 详情的结构化日志 P1-7: 盲索引去重 - create_patient 中新增 id_number HMAC 去重检查(查 blind_indexes 表) - 患者创建成功后写入 blind_indexes 表(id_number + phone) - 防止同租户重复建档 P1-1: 事件消费者补全 - 新增 ai.analysis.completed 消费者(幂等处理 + 日志) - 新增 dialysis.record.created 消费者(幂等处理 + 日志) --- crates/erp-ai/src/handler/mod.rs | 32 ++++++++++-- .../src/service/dialysis_service.rs | 13 +++++ crates/erp-health/src/event.rs | 52 +++++++++++++++++-- .../erp-health/src/service/patient_service.rs | 46 ++++++++++++++++ 4 files changed, 135 insertions(+), 8 deletions(-) diff --git a/crates/erp-ai/src/handler/mod.rs b/crates/erp-ai/src/handler/mod.rs index 7f55b59..0a9f521 100644 --- a/crates/erp-ai/src/handler/mod.rs +++ b/crates/erp-ai/src/handler/mod.rs @@ -72,7 +72,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report"); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "lab_report", ctx.tenant_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -140,7 +140,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend"); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "trend", ctx.tenant_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -197,7 +197,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan"); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "checkup_plan", ctx.tenant_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -254,7 +254,7 @@ where let analysis_id_clone = analysis_id; let state_clone = state.clone(); - let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary"); + let sse_stream = build_sse_stream(stream, analysis_id_clone, state_clone, "report_summary", ctx.tenant_id); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } @@ -452,6 +452,7 @@ fn build_sse_stream( analysis_id: uuid::Uuid, state: AiState, analysis_type: &'static str, + tenant_id: uuid::Uuid, ) -> impl futures::Stream> { async_stream::stream! { let mut full_content = String::new(); @@ -472,13 +473,34 @@ fn build_sse_stream( let data = serde_json::to_string(&event).unwrap_or_default(); yield Ok(Event::default().event("error").data(data)); let _ = state.analysis.fail_analysis(analysis_id, e.to_string()).await; + // 发布 AI 分析失败事件 + let fail_event = erp_core::events::DomainEvent::new( + "ai.analysis.failed", + tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "analysis_id": analysis_id, + "error": e.to_string(), + })), + ); + state.event_bus.publish(fail_event, &state.db).await; return; } } } let metadata = serde_json::json!({"analysis_type": analysis_type}); - let _ = state.analysis.complete_analysis(analysis_id, full_content, metadata).await; + let _ = state.analysis.complete_analysis(analysis_id, full_content.clone(), metadata).await; + + // 发布 AI 分析完成事件 + let event = erp_core::events::DomainEvent::new( + "ai.analysis.completed", + tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "analysis_id": analysis_id, + "analysis_type": analysis_type, + })), + ); + state.event_bus.publish(event, &state.db).await; let done_event = AnalysisSseEvent::Done { analysis_id, diff --git a/crates/erp-dialysis/src/service/dialysis_service.rs b/crates/erp-dialysis/src/service/dialysis_service.rs index b80aa3e..97de4a7 100644 --- a/crates/erp-dialysis/src/service/dialysis_service.rs +++ b/crates/erp-dialysis/src/service/dialysis_service.rs @@ -4,6 +4,7 @@ use chrono::Utc; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::crypto as pii; +use erp_core::events::DomainEvent; use num_traits::ToPrimitive; use sea_orm::entity::prelude::*; use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect}; @@ -130,6 +131,18 @@ pub async fn create_dialysis_record( &state.db, ).await; + // 发布透析记录创建事件 + let event = DomainEvent::new( + "dialysis.record.created", + tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "record_id": m.id, + "patient_id": m.patient_id, + "dialysis_type": m.dialysis_type, + })), + ); + state.event_bus.publish(event, &state.db).await; + Ok(to_resp(&state.crypto, m)) } diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 2d6de1f..2df9934 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -107,17 +107,23 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } }); - // message.sent → 预留:后续联动咨询会话 last_message_at + // message.sent → 通用消息事件消费者(预留咨询联动) let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string()); - let msg_db = state.db.clone(); + let _msg_db = state.db.clone(); tokio::spawn(async move { loop { match msg_rx.recv().await { Some(event) if event.event_type == "message.sent" => { + let recipient_id = event.payload.get("recipient_id").and_then(|v| v.as_str()); + let message_id = event.payload.get("message_id").and_then(|v| v.as_str()); tracing::info!( event_id = %event.id, - "健康模块收到消息发送事件(暂不处理)" + message_id = ?message_id, + recipient_id = ?recipient_id, + "message.sent 消费者收到事件" ); + // TODO: 若 message 关联了 consultation session(通过 metadata 或 reference), + // 可在此更新 consultation_session.last_message_at } Some(_) => {} None => break, @@ -368,4 +374,44 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } } }); + + // ai.analysis.completed → 通知关联医生 + let (mut ai_rx, _ai_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + let ai_db = state.db.clone(); + let ai_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match ai_rx.recv().await { + Some(event) if event.event_type == "ai.analysis.completed" => { + if erp_core::events::is_event_processed(&ai_db, event.id, "ai_analysis_notifier").await.unwrap_or(false) { + continue; + } + let analysis_id = event.payload.get("analysis_id").and_then(|v| v.as_str()); + let analysis_type = event.payload.get("analysis_type").and_then(|v| v.as_str()).unwrap_or("unknown"); + tracing::info!( + analysis_id = ?analysis_id, + analysis_type = %analysis_type, + "AI 分析完成,可触发后续通知" + ); + // TODO: 从 ai_analysis 记录中查询关联 patient/doctor,发送 message.send 事件 + let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "ai_analysis_notifier").await; + } + Some(event) if event.event_type == "dialysis.record.created" => { + if erp_core::events::is_event_processed(&ai_db, event.id, "dialysis_notifier").await.unwrap_or(false) { + continue; + } + let patient_id = event.payload.get("patient_id").and_then(|v| v.as_str()); + let record_id = event.payload.get("record_id").and_then(|v| v.as_str()); + tracing::info!( + record_id = ?record_id, + patient_id = ?patient_id, + "透析记录已创建" + ); + let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "dialysis_notifier").await; + } + Some(_) => {} + None => break, + } + } + }); } diff --git a/crates/erp-health/src/service/patient_service.rs b/crates/erp-health/src/service/patient_service.rs index 214dd35..f9c825f 100644 --- a/crates/erp-health/src/service/patient_service.rs +++ b/crates/erp-health/src/service/patient_service.rs @@ -140,6 +140,23 @@ pub async fn create_patient( .map(|m| pii::encrypt(state.crypto.kek(), m)) .transpose()?; + // 盲索引去重:同租户内相同身份证号不允许重复建档 + if let Some(ref hash) = id_number_hash { + let dup = crate::entity::blind_index::Entity::find() + .filter(crate::entity::blind_index::Column::TenantId.eq(tenant_id)) + .filter(crate::entity::blind_index::Column::EntityType.eq("patient")) + .filter(crate::entity::blind_index::Column::FieldName.eq("id_number")) + .filter(crate::entity::blind_index::Column::BlindHash.eq(hash.as_str())) + .one(&state.db) + .await?; + if dup.is_some() { + return Err(HealthError::Validation("该身份证号已存在患者档案".to_string())); + } + } + // 保留副本供写入 blind_indexes 表(active model 构建 会 move 原值) + let bi_id_hash = id_number_hash.clone(); + let bi_phone_hash = phone_hash.clone(); + let active = patient::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), @@ -170,6 +187,35 @@ pub async fn create_patient( let model = active.insert(&state.db).await?; + // 写入盲索引到统一索引表(用于跨系统去重查询) + let now_bi = Utc::now(); + if let Some(hash) = bi_id_hash { + let bi = crate::entity::blind_index::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + entity_type: Set("patient".to_string()), + entity_id: Set(model.id), + field_name: Set("id_number".to_string()), + blind_hash: Set(hash), + created_at: Set(now_bi), + updated_at: Set(now_bi), + }; + bi.insert(&state.db).await?; + } + if let Some(hash) = bi_phone_hash { + let bi = crate::entity::blind_index::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + entity_type: Set("patient".to_string()), + entity_id: Set(model.id), + field_name: Set("emergency_contact_phone".to_string()), + blind_hash: Set(hash), + created_at: Set(now_bi), + updated_at: Set(now_bi), + }; + bi.insert(&state.db).await?; + } + let event = DomainEvent::new( crate::event::PATIENT_CREATED, tenant_id,