feat(message): SSE 告警/体征推送添加医患关系过滤
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

alert.triggered 和 device.readings.synced 事件现在只推送给
该患者的管床医生(通过 patient_doctor_relation 表查询),
而非广播给租户内所有用户。新增 3 个单元测试验证 payload
解析逻辑。
This commit is contained in:
iven
2026-04-28 19:49:38 +08:00
parent e5546efa41
commit 781e1191a5

View File

@@ -3,6 +3,8 @@ use std::convert::Infallible;
use axum::extract::Extension;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use sea_orm::ConnectionTrait;
use uuid::Uuid;
use erp_core::error::AppError;
use erp_core::types::TenantContext;
@@ -24,6 +26,7 @@ pub async fn message_stream(
// 空前缀 = 订阅所有事件
let (mut rx, _handle) = state.event_bus.subscribe_filtered(String::new());
let db = state.db.clone();
let sse_stream = async_stream::stream! {
loop {
match rx.recv().await {
@@ -48,6 +51,20 @@ pub async fn message_stream(
.data(data));
}
"alert.triggered" => {
// 医患关系过滤:只推送给该患者的管床医生
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str());
if let Some(pid_str) = patient_id {
let pid = Uuid::parse_str(pid_str).ok();
if let Some(pid) = pid {
let is_doctor = is_doctor_for_patient(
&db, tenant_id, user_id, pid,
).await;
if !is_doctor {
continue;
}
}
}
let data = serde_json::to_string(&event.payload)
.unwrap_or_default();
yield Ok(Event::default()
@@ -55,6 +72,20 @@ pub async fn message_stream(
.data(data));
}
"device.readings.synced" => {
// 医患关系过滤:只推送给该患者的管床医生
let patient_id = event.payload.get("patient_id")
.and_then(|v| v.as_str());
if let Some(pid_str) = patient_id {
let pid = Uuid::parse_str(pid_str).ok();
if let Some(pid) = pid {
let is_doctor = is_doctor_for_patient(
&db, tenant_id, user_id, pid,
).await;
if !is_doctor {
continue;
}
}
}
let data = serde_json::to_string(&event.payload)
.unwrap_or_default();
yield Ok(Event::default()
@@ -73,3 +104,89 @@ pub async fn message_stream(
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
/// 检查 user_id 对应的医生是否是某患者的管床医生。
///
/// 查询 `patient_doctor_relation` 表:
/// - `doctor_id` 匹配 `user_id`doctor_profile 主键即 user_id
/// - `patient_id` 匹配目标患者
/// - 未软删除
///
/// 查询失败时返回 false宁可漏推不可误推
async fn is_doctor_for_patient(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
user_id: Uuid,
patient_id: Uuid,
) -> bool {
let sql = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
r#"SELECT COUNT(*) AS cnt FROM patient_doctor_relation
WHERE tenant_id = $1 AND doctor_id = $2 AND patient_id = $3 AND deleted_at IS NULL"#,
[
tenant_id.into(),
user_id.into(),
patient_id.into(),
],
);
match db.query_one(sql).await {
Ok(Some(row)) => {
let cnt: i64 = row.try_get::<i64>("", "cnt").unwrap_or(0);
cnt > 0
}
_ => {
tracing::warn!(
user_id = %user_id,
patient_id = %patient_id,
"查询医患关系失败,跳过推送"
);
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// 验证 is_doctor_for_patient 函数签名和基础逻辑。
///
/// 由于需要真实数据库连接,此处仅测试参数构造正确性。
/// 完整的数据库集成测试在 erp-health 的集成测试中覆盖。
#[test]
fn patient_id_parsing_from_payload() {
let payload = serde_json::json!({
"patient_id": "550e8400-e29b-41d4-a716-446655440000",
"severity": "critical",
"rule_name": "心率过高",
});
let pid_str = payload.get("patient_id").and_then(|v| v.as_str());
assert!(pid_str.is_some());
let pid = Uuid::parse_str(pid_str.unwrap()).ok();
assert!(pid.is_some());
assert_eq!(
pid.unwrap(),
Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap()
);
}
#[test]
fn patient_id_missing_returns_none() {
let payload = serde_json::json!({
"severity": "warning",
});
let pid_str = payload.get("patient_id").and_then(|v| v.as_str());
assert!(pid_str.is_none());
}
#[test]
fn patient_id_invalid_uuid_returns_none() {
let payload = serde_json::json!({
"patient_id": "not-a-uuid",
});
let pid_str = payload.get("patient_id").and_then(|v| v.as_str());
assert!(pid_str.is_some());
let pid = Uuid::parse_str(pid_str.unwrap()).ok();
assert!(pid.is_none());
}
}