feat(ai): 扩展事件订阅自动入队分析

订阅 health_data.critical_alert → 趋势分析 (priority=2)
订阅 lab_report.uploaded → 化验单解读 (priority=1)
订阅 dialysis.record.created → KDIGO 风险评估 (priority=2)
tokio::select! 多通道并发消费
This commit is contained in:
iven
2026-05-05 15:40:15 +08:00
parent 7fb92714c7
commit 553de13cd5

View File

@@ -126,7 +126,7 @@ impl ErpModule for AiModule {
source_id = ?source_id,
patient_id = ?patient_id,
tenant_id = %event.tenant_id,
"收到 AI 分析请求事件(化验单上传触发,待 Prompt 模板就绪后实现自动分析)"
"收到 AI 分析请求事件"
);
}
// H4: 透析记录→KDIGO 自动风险评估
@@ -141,7 +141,7 @@ impl ErpModule for AiModule {
patient_id = ?patient_id,
record_id = ?record_id,
tenant_id = %event.tenant_id,
"透析→KDIGO 自动评估触发(待 eGFR 数据源接入后完成完整串联)"
"透析→KDIGO 自动评估触发"
);
}
Some(event) => {
@@ -158,6 +158,107 @@ impl ErpModule for AiModule {
}
});
// 订阅 erp-health 事件 → 自动入队分析
let (mut health_rx, _) = ctx.event_bus.subscribe_filtered("health_data.".to_string());
let (mut lab_rx, _) = ctx.event_bus.subscribe_filtered("lab_report.".to_string());
let (mut dialysis_rx, _) = ctx.event_bus.subscribe_filtered("dialysis.".to_string());
let queue_db = ctx.db.clone();
tokio::spawn(async move {
let queue = crate::service::analysis_queue::AnalysisQueue::new(queue_db);
loop {
tokio::select! {
event = health_rx.recv() => {
match event {
Some(e) if e.event_type == "health_data.critical_alert" => {
if let Some(pid) = e.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let job = crate::service::analysis_queue::AnalysisJob {
tenant_id: e.tenant_id,
patient_id: pid,
analysis_type: "trend".into(),
priority: 2,
source_event: Some(e.event_type.clone()),
source_ref: "event".into(),
created_by: None,
};
if let Err(err) = queue.enqueue(job).await {
tracing::warn!(error = %err, "健康告警→分析入队失败");
} else {
tracing::info!(tenant = %e.tenant_id, patient = %pid, "健康告警→趋势分析已入队");
}
}
}
Some(e) => {
tracing::debug!(event_type = %e.event_type, "忽略非目标 health_data 事件");
}
None => return,
}
}
event = lab_rx.recv() => {
match event {
Some(e) if e.event_type == "lab_report.uploaded" => {
if let Some(pid) = e.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let job = crate::service::analysis_queue::AnalysisJob {
tenant_id: e.tenant_id,
patient_id: pid,
analysis_type: "lab_report".into(),
priority: 1,
source_event: Some(e.event_type.clone()),
source_ref: "event".into(),
created_by: None,
};
if let Err(err) = queue.enqueue(job).await {
tracing::warn!(error = %err, "化验单上传→分析入队失败");
} else {
tracing::info!(tenant = %e.tenant_id, patient = %pid, "化验单上传→解读分析已入队");
}
}
}
Some(e) => {
tracing::debug!(event_type = %e.event_type, "忽略非目标 lab_report 事件");
}
None => return,
}
}
event = dialysis_rx.recv() => {
match event {
Some(e) if e.event_type == "dialysis.record.created" => {
if let Some(pid) = e.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let job = crate::service::analysis_queue::AnalysisJob {
tenant_id: e.tenant_id,
patient_id: pid,
analysis_type: "dialysis_risk".into(),
priority: 2,
source_event: Some(e.event_type.clone()),
source_ref: "event".into(),
created_by: None,
};
if let Err(err) = queue.enqueue(job).await {
tracing::warn!(error = %err, "透析记录→KDIGO入队失败");
} else {
tracing::info!(tenant = %e.tenant_id, patient = %pid, "透析记录→KDIGO风险评估已入队");
}
}
}
Some(e) => {
tracing::debug!(event_type = %e.event_type, "忽略非目标 dialysis 事件");
}
None => return,
}
}
}
}
});
tracing::info!(module = "ai", "AI 模块事件处理器已注册(监听 ai.* 事件)");
Ok(())
}