diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 6b6058c..7cfe356 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -445,6 +445,83 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { } }); + // ai.analysis.completed → AI→行动闭环消费者(行动分发) + let (mut ai_action_rx, _ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string()); + let action_db = state.db.clone(); + let action_event_bus = state.event_bus.clone(); + tokio::spawn(async move { + loop { + match ai_action_rx.recv().await { + Some(event) if event.event_type == "ai.analysis.completed" => { + if erp_core::events::is_event_processed(&action_db, event.id, "ai_action_dispatcher").await.unwrap_or(false) { + continue; + } + + let tenant_id = event.tenant_id; + let analysis_id = event.payload.get("analysis_id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + let patient_id = event.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + let doctor_id = event.payload.get("doctor_id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()); + let risk_level = event.payload.get("risk_level") + .and_then(|v| v.as_str()) + .unwrap_or("medium"); + let suggestion_count = event.payload.get("suggestion_count") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + if suggestion_count > 0 { + if let (Some(aid), Some(pid)) = (analysis_id, patient_id) { + let loader_result: Result, sea_orm::DbErr> = + crate::service::ai_suggestion_loader::load_by_analysis( + &action_db, tenant_id, aid, + ).await; + match loader_result { + Ok(suggestions) if !suggestions.is_empty() => { + crate::service::ai_action_dispatcher::handle_ai_suggestions( + &action_db, + &action_event_bus, + tenant_id, + aid, + pid, + doctor_id, + &suggestions, + risk_level, + ).await; + tracing::info!( + analysis_id = %aid, + patient_id = %pid, + suggestion_count = suggestions.len(), + risk_level = %risk_level, + "AI 行动分发完成" + ); + } + Ok(_) => { + tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发"); + } + Err(e) => { + tracing::warn!( + analysis_id = %aid, + error = %e, + "加载建议列表失败" + ); + } + } + } + } + + let _ = erp_core::events::mark_event_processed(&action_db, event.id, "ai_action_dispatcher").await; + } + Some(_) => {} + None => break, + } + } + }); + // consent.granted/revoked → 通知关联医生 let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string()); let consent_db = state.db.clone(); diff --git a/crates/erp-health/src/service/ai_suggestion_loader.rs b/crates/erp-health/src/service/ai_suggestion_loader.rs new file mode 100644 index 0000000..fba7f0b --- /dev/null +++ b/crates/erp-health/src/service/ai_suggestion_loader.rs @@ -0,0 +1,40 @@ +use sea_orm::{DatabaseConnection, FromQueryResult, Statement}; +use uuid::Uuid; + +#[derive(Debug, FromQueryResult)] +struct SuggestionRow { + params: Option, + suggestion_type: Option, + risk_level: Option, +} + +/// 跨 crate 读取 ai_suggestion 表(通过 raw SQL) +pub async fn load_by_analysis( + db: &DatabaseConnection, + tenant_id: Uuid, + analysis_id: Uuid, +) -> Result, sea_orm::DbErr> { + let rows: Vec = SuggestionRow::find_by_statement(Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + r#" + SELECT params, suggestion_type, risk_level + FROM ai_suggestion + WHERE tenant_id = $1 AND analysis_id = $2 AND deleted_at IS NULL + ORDER BY created_at ASC + "#, + [tenant_id.into(), analysis_id.into()], + )) + .all(db) + .await?; + + Ok(rows + .into_iter() + .map(|r| { + serde_json::json!({ + "params": r.params.unwrap_or(serde_json::Value::Null), + "suggestion_type": r.suggestion_type.unwrap_or_default(), + "risk_level": r.risk_level.unwrap_or_default(), + }) + }) + .collect()) +} diff --git a/crates/erp-health/src/service/mod.rs b/crates/erp-health/src/service/mod.rs index 7c244e3..87db442 100644 --- a/crates/erp-health/src/service/mod.rs +++ b/crates/erp-health/src/service/mod.rs @@ -1,4 +1,5 @@ pub mod ai_action_dispatcher; +pub mod ai_suggestion_loader; pub mod alert_engine; pub mod alert_rule_service; pub mod alert_service;