diff --git a/crates/erp-message/src/handler/sse_handler.rs b/crates/erp-message/src/handler/sse_handler.rs index d434ff9..e3d1016 100644 --- a/crates/erp-message/src/handler/sse_handler.rs +++ b/crates/erp-message/src/handler/sse_handler.rs @@ -3,6 +3,8 @@ use std::convert::Infallible; use axum::extract::Extension; use axum::response::sse::{Event, KeepAlive, Sse}; use futures::stream::Stream; +use sea_orm::ConnectionTrait; +use uuid::Uuid; use erp_core::error::AppError; use erp_core::types::TenantContext; @@ -24,6 +26,7 @@ pub async fn message_stream( // 空前缀 = 订阅所有事件 let (mut rx, _handle) = state.event_bus.subscribe_filtered(String::new()); + let db = state.db.clone(); let sse_stream = async_stream::stream! { loop { match rx.recv().await { @@ -48,6 +51,20 @@ pub async fn message_stream( .data(data)); } "alert.triggered" => { + // 医患关系过滤:只推送给该患者的管床医生 + let patient_id = event.payload.get("patient_id") + .and_then(|v| v.as_str()); + if let Some(pid_str) = patient_id { + let pid = Uuid::parse_str(pid_str).ok(); + if let Some(pid) = pid { + let is_doctor = is_doctor_for_patient( + &db, tenant_id, user_id, pid, + ).await; + if !is_doctor { + continue; + } + } + } let data = serde_json::to_string(&event.payload) .unwrap_or_default(); yield Ok(Event::default() @@ -55,6 +72,20 @@ pub async fn message_stream( .data(data)); } "device.readings.synced" => { + // 医患关系过滤:只推送给该患者的管床医生 + let patient_id = event.payload.get("patient_id") + .and_then(|v| v.as_str()); + if let Some(pid_str) = patient_id { + let pid = Uuid::parse_str(pid_str).ok(); + if let Some(pid) = pid { + let is_doctor = is_doctor_for_patient( + &db, tenant_id, user_id, pid, + ).await; + if !is_doctor { + continue; + } + } + } let data = serde_json::to_string(&event.payload) .unwrap_or_default(); yield Ok(Event::default() @@ -73,3 +104,89 @@ pub async fn message_stream( Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } + +/// 检查 user_id 对应的医生是否是某患者的管床医生。 +/// +/// 查询 `patient_doctor_relation` 表: +/// - `doctor_id` 匹配 `user_id`(doctor_profile 主键即 user_id) +/// - `patient_id` 匹配目标患者 +/// - 未软删除 +/// +/// 查询失败时返回 false(宁可漏推不可误推)。 +async fn is_doctor_for_patient( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, + user_id: Uuid, + patient_id: Uuid, +) -> bool { + let sql = sea_orm::Statement::from_sql_and_values( + sea_orm::DatabaseBackend::Postgres, + r#"SELECT COUNT(*) AS cnt FROM patient_doctor_relation + WHERE tenant_id = $1 AND doctor_id = $2 AND patient_id = $3 AND deleted_at IS NULL"#, + [ + tenant_id.into(), + user_id.into(), + patient_id.into(), + ], + ); + match db.query_one(sql).await { + Ok(Some(row)) => { + let cnt: i64 = row.try_get::("", "cnt").unwrap_or(0); + cnt > 0 + } + _ => { + tracing::warn!( + user_id = %user_id, + patient_id = %patient_id, + "查询医患关系失败,跳过推送" + ); + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// 验证 is_doctor_for_patient 函数签名和基础逻辑。 + /// + /// 由于需要真实数据库连接,此处仅测试参数构造正确性。 + /// 完整的数据库集成测试在 erp-health 的集成测试中覆盖。 + #[test] + fn patient_id_parsing_from_payload() { + let payload = serde_json::json!({ + "patient_id": "550e8400-e29b-41d4-a716-446655440000", + "severity": "critical", + "rule_name": "心率过高", + }); + let pid_str = payload.get("patient_id").and_then(|v| v.as_str()); + assert!(pid_str.is_some()); + let pid = Uuid::parse_str(pid_str.unwrap()).ok(); + assert!(pid.is_some()); + assert_eq!( + pid.unwrap(), + Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap() + ); + } + + #[test] + fn patient_id_missing_returns_none() { + let payload = serde_json::json!({ + "severity": "warning", + }); + let pid_str = payload.get("patient_id").and_then(|v| v.as_str()); + assert!(pid_str.is_none()); + } + + #[test] + fn patient_id_invalid_uuid_returns_none() { + let payload = serde_json::json!({ + "patient_id": "not-a-uuid", + }); + let pid_str = payload.get("patient_id").and_then(|v| v.as_str()); + assert!(pid_str.is_some()); + let pid = Uuid::parse_str(pid_str.unwrap()).ok(); + assert!(pid.is_none()); + } +}