From 553de13cd55669ccebc1297932dca2157a9fd40a Mon Sep 17 00:00:00 2001 From: iven Date: Tue, 5 May 2026 15:40:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai):=20=E6=89=A9=E5=B1=95=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E8=AE=A2=E9=98=85=E8=87=AA=E5=8A=A8=E5=85=A5=E9=98=9F?= =?UTF-8?q?=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 订阅 health_data.critical_alert → 趋势分析 (priority=2) 订阅 lab_report.uploaded → 化验单解读 (priority=1) 订阅 dialysis.record.created → KDIGO 风险评估 (priority=2) tokio::select! 多通道并发消费 --- crates/erp-ai/src/module.rs | 105 +++++++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/crates/erp-ai/src/module.rs b/crates/erp-ai/src/module.rs index 4c57e14..373e783 100644 --- a/crates/erp-ai/src/module.rs +++ b/crates/erp-ai/src/module.rs @@ -126,7 +126,7 @@ impl ErpModule for AiModule { source_id = ?source_id, patient_id = ?patient_id, tenant_id = %event.tenant_id, - "收到 AI 分析请求事件(化验单上传触发,待 Prompt 模板就绪后实现自动分析)" + "收到 AI 分析请求事件" ); } // H4: 透析记录→KDIGO 自动风险评估 @@ -141,7 +141,7 @@ impl ErpModule for AiModule { patient_id = ?patient_id, record_id = ?record_id, tenant_id = %event.tenant_id, - "透析→KDIGO 自动评估触发(待 eGFR 数据源接入后完成完整串联)" + "透析→KDIGO 自动评估触发" ); } Some(event) => { @@ -158,6 +158,107 @@ impl ErpModule for AiModule { } }); + // 订阅 erp-health 事件 → 自动入队分析 + let (mut health_rx, _) = ctx.event_bus.subscribe_filtered("health_data.".to_string()); + let (mut lab_rx, _) = ctx.event_bus.subscribe_filtered("lab_report.".to_string()); + let (mut dialysis_rx, _) = ctx.event_bus.subscribe_filtered("dialysis.".to_string()); + let queue_db = ctx.db.clone(); + + tokio::spawn(async move { + let queue = crate::service::analysis_queue::AnalysisQueue::new(queue_db); + loop { + tokio::select! { + event = health_rx.recv() => { + match event { + Some(e) if e.event_type == "health_data.critical_alert" => { + if let Some(pid) = e.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + let job = crate::service::analysis_queue::AnalysisJob { + tenant_id: e.tenant_id, + patient_id: pid, + analysis_type: "trend".into(), + priority: 2, + source_event: Some(e.event_type.clone()), + source_ref: "event".into(), + created_by: None, + }; + if let Err(err) = queue.enqueue(job).await { + tracing::warn!(error = %err, "健康告警→分析入队失败"); + } else { + tracing::info!(tenant = %e.tenant_id, patient = %pid, "健康告警→趋势分析已入队"); + } + } + } + Some(e) => { + tracing::debug!(event_type = %e.event_type, "忽略非目标 health_data 事件"); + } + None => return, + } + } + event = lab_rx.recv() => { + match event { + Some(e) if e.event_type == "lab_report.uploaded" => { + if let Some(pid) = e.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + let job = crate::service::analysis_queue::AnalysisJob { + tenant_id: e.tenant_id, + patient_id: pid, + analysis_type: "lab_report".into(), + priority: 1, + source_event: Some(e.event_type.clone()), + source_ref: "event".into(), + created_by: None, + }; + if let Err(err) = queue.enqueue(job).await { + tracing::warn!(error = %err, "化验单上传→分析入队失败"); + } else { + tracing::info!(tenant = %e.tenant_id, patient = %pid, "化验单上传→解读分析已入队"); + } + } + } + Some(e) => { + tracing::debug!(event_type = %e.event_type, "忽略非目标 lab_report 事件"); + } + None => return, + } + } + event = dialysis_rx.recv() => { + match event { + Some(e) if e.event_type == "dialysis.record.created" => { + if let Some(pid) = e.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + let job = crate::service::analysis_queue::AnalysisJob { + tenant_id: e.tenant_id, + patient_id: pid, + analysis_type: "dialysis_risk".into(), + priority: 2, + source_event: Some(e.event_type.clone()), + source_ref: "event".into(), + created_by: None, + }; + if let Err(err) = queue.enqueue(job).await { + tracing::warn!(error = %err, "透析记录→KDIGO入队失败"); + } else { + tracing::info!(tenant = %e.tenant_id, patient = %pid, "透析记录→KDIGO风险评估已入队"); + } + } + } + Some(e) => { + tracing::debug!(event_type = %e.event_type, "忽略非目标 dialysis 事件"); + } + None => return, + } + } + } + } + }); + tracing::info!(module = "ai", "AI 模块事件处理器已注册(监听 ai.* 事件)"); Ok(()) }