feat(health): AI 行动分发事件消费者 — 订阅 ai.analysis.completed

- 新增 ai_suggestion_loader:跨 crate 通过 raw SQL 读取 ai_suggestion 表
- 事件消费者 ai_action_dispatcher 订阅 ai. 事件
- 根据 suggestion_count > 0 触发行动分发路由
- 低风险自动执行,中/高风险进入医生审核队列
This commit is contained in:
iven
2026-05-01 08:41:14 +08:00
parent 69f9e1a61a
commit 5053908444
3 changed files with 118 additions and 0 deletions

View File

@@ -445,6 +445,83 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
}
});
// ai.analysis.completed → AI→行动闭环消费者行动分发
let (mut ai_action_rx, _ai_action_handle) = state.event_bus.subscribe_filtered("ai.".to_string());
let action_db = state.db.clone();
let action_event_bus = state.event_bus.clone();
tokio::spawn(async move {
loop {
match ai_action_rx.recv().await {
Some(event) if event.event_type == "ai.analysis.completed" => {
if erp_core::events::is_event_processed(&action_db, event.id, "ai_action_dispatcher").await.unwrap_or(false) {
continue;
}
let tenant_id = event.tenant_id;
let analysis_id = event.payload.get("analysis_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let doctor_id = event.payload.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let risk_level = event.payload.get("risk_level")
.and_then(|v| v.as_str())
.unwrap_or("medium");
let suggestion_count = event.payload.get("suggestion_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if suggestion_count > 0 {
if let (Some(aid), Some(pid)) = (analysis_id, patient_id) {
let loader_result: Result<Vec<serde_json::Value>, sea_orm::DbErr> =
crate::service::ai_suggestion_loader::load_by_analysis(
&action_db, tenant_id, aid,
).await;
match loader_result {
Ok(suggestions) if !suggestions.is_empty() => {
crate::service::ai_action_dispatcher::handle_ai_suggestions(
&action_db,
&action_event_bus,
tenant_id,
aid,
pid,
doctor_id,
&suggestions,
risk_level,
).await;
tracing::info!(
analysis_id = %aid,
patient_id = %pid,
suggestion_count = suggestions.len(),
risk_level = %risk_level,
"AI 行动分发完成"
);
}
Ok(_) => {
tracing::info!(analysis_id = %aid, "建议列表为空,跳过行动分发");
}
Err(e) => {
tracing::warn!(
analysis_id = %aid,
error = %e,
"加载建议列表失败"
);
}
}
}
}
let _ = erp_core::events::mark_event_processed(&action_db, event.id, "ai_action_dispatcher").await;
}
Some(_) => {}
None => break,
}
}
});
// consent.granted/revoked → 通知关联医生
let (mut consent_rx, _consent_handle) = state.event_bus.subscribe_filtered("consent.".to_string());
let consent_db = state.db.clone();

View File

@@ -0,0 +1,40 @@
use sea_orm::{DatabaseConnection, FromQueryResult, Statement};
use uuid::Uuid;
#[derive(Debug, FromQueryResult)]
struct SuggestionRow {
params: Option<serde_json::Value>,
suggestion_type: Option<String>,
risk_level: Option<String>,
}
/// 跨 crate 读取 ai_suggestion 表(通过 raw SQL
pub async fn load_by_analysis(
db: &DatabaseConnection,
tenant_id: Uuid,
analysis_id: Uuid,
) -> Result<Vec<serde_json::Value>, sea_orm::DbErr> {
let rows: Vec<SuggestionRow> = SuggestionRow::find_by_statement(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"
SELECT params, suggestion_type, risk_level
FROM ai_suggestion
WHERE tenant_id = $1 AND analysis_id = $2 AND deleted_at IS NULL
ORDER BY created_at ASC
"#,
[tenant_id.into(), analysis_id.into()],
))
.all(db)
.await?;
Ok(rows
.into_iter()
.map(|r| {
serde_json::json!({
"params": r.params.unwrap_or(serde_json::Value::Null),
"suggestion_type": r.suggestion_type.unwrap_or_default(),
"risk_level": r.risk_level.unwrap_or_default(),
})
})
.collect())
}

View File

@@ -1,4 +1,5 @@
pub mod ai_action_dispatcher;
pub mod ai_suggestion_loader;
pub mod alert_engine;
pub mod alert_rule_service;
pub mod alert_service;