use axum::Router; use axum::routing::{delete, get, put}; use sea_orm::{ColumnTrait, EntityTrait, FromQueryResult, QueryFilter}; use std::sync::Arc; use tokio::sync::Semaphore; use uuid::Uuid; use erp_core::error::AppResult; use erp_core::events::EventBus; use erp_core::module::{ErpModule, PermissionDescriptor}; use crate::entity::message_subscription; use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler}; /// 消息中心模块,实现 ErpModule trait。 pub struct MessageModule; impl MessageModule { pub fn new() -> Self { Self } /// 构建需要认证的路由。 pub fn protected_routes() -> Router where crate::message_state::MessageState: axum::extract::FromRef, S: Clone + Send + Sync + 'static, { Router::new() // 消息路由 .route( "/messages", get(message_handler::list_messages).post(message_handler::send_message), ) .route("/messages/unread-count", get(message_handler::unread_count)) .route("/messages/{id}/read", put(message_handler::mark_read)) .route("/messages/read-all", put(message_handler::mark_all_read)) .route("/messages/{id}", delete(message_handler::delete_message)) // SSE 实时推送 .route("/messages/stream", get(sse_handler::message_stream)) // 模板路由 .route( "/message-templates", get(template_handler::list_templates).post(template_handler::create_template), ) .route( "/message-templates/{id}", put(template_handler::update_template).delete(template_handler::delete_template), ) // 订阅偏好路由 .route( "/message-subscriptions", get(subscription_handler::get_subscription) .put(subscription_handler::update_subscription), ) } /// 启动后台事件监听任务,将工作流事件转化为消息通知。 /// /// 使用 Semaphore 限制最大并发数为 8,防止事件突发时过度消耗资源。 /// 在 main.rs 中调用,因为需要 db 连接。 pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { let mut rx = event_bus.subscribe(); let semaphore = Arc::new(Semaphore::new(8)); tokio::spawn(async move { loop { match rx.recv().await { Ok(event) => { let db = db.clone(); let event_bus = event_bus.clone(); let permit = semaphore.clone(); // 先获取许可,再 spawn 任务 tokio::spawn(async move { let _permit = match permit.acquire().await { Ok(p) => p, Err(_) => { tracing::warn!("信号量已关闭,跳过工作流事件处理"); return; } }; if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await { tracing::warn!( event_type = %event.event_type, error = %e, "Failed to handle workflow event for messages" ); } }); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(skipped = n, "Event listener lagged, skipping events"); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { tracing::info!("Event bus closed, stopping message event listener"); break; } } } }); } } impl Default for MessageModule { fn default() -> Self { Self::new() } } #[async_trait::async_trait] impl ErpModule for MessageModule { fn name(&self) -> &str { "message" } fn version(&self) -> &str { env!("CARGO_PKG_VERSION") } fn dependencies(&self) -> Vec<&str> { vec!["auth"] } fn register_event_handlers(&self, _bus: &EventBus) {} async fn on_tenant_created( &self, _tenant_id: Uuid, _db: &sea_orm::DatabaseConnection, _event_bus: &EventBus, ) -> AppResult<()> { Ok(()) } async fn on_tenant_deleted( &self, _tenant_id: Uuid, _db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { Ok(()) } fn permissions(&self) -> Vec { vec![ PermissionDescriptor { code: "message.list".into(), name: "查看消息".into(), description: "查看消息列表".into(), module: "message".into(), }, PermissionDescriptor { code: "message.send".into(), name: "发送消息".into(), description: "发送新消息".into(), module: "message".into(), }, PermissionDescriptor { code: "message.template.list".into(), name: "查看消息模板".into(), description: "查看消息模板列表".into(), module: "message".into(), }, PermissionDescriptor { code: "message.template.create".into(), name: "创建消息模板".into(), description: "创建消息模板".into(), module: "message".into(), }, PermissionDescriptor { code: "message.template.manage".into(), name: "管理消息模板".into(), description: "编辑、删除消息模板".into(), module: "message".into(), }, ] } fn as_any(&self) -> &dyn std::any::Any { self } } /// 检查用户是否启用了 DND(免打扰)且当前时间在 DND 窗口内。 /// 返回 true 表示应该跳过发送。 async fn should_skip_for_dnd( tenant_id: Uuid, recipient_id: Uuid, priority: &str, db: &sea_orm::DatabaseConnection, ) -> bool { // 紧急消息永远不跳过 if priority == "urgent" { return false; } let sub = match message_subscription::Entity::find() .filter(message_subscription::Column::TenantId.eq(tenant_id)) .filter(message_subscription::Column::UserId.eq(recipient_id)) .filter(message_subscription::Column::DeletedAt.is_null()) .one(db) .await { Ok(Some(s)) => s, _ => return false, }; if !sub.dnd_enabled { return false; } let (start, end) = match (sub.dnd_start, sub.dnd_end) { (Some(s), Some(e)) => (s, e), _ => return false, }; let now = chrono::Local::now(); let now_time = now.format("%H:%M").to_string(); is_in_dnd_window(&now_time, &start, &end) } /// 判断当前时间是否在 DND 窗口内。支持跨午夜窗口(如 22:00-06:00)。 pub(crate) fn is_in_dnd_window(now_time: &str, start: &str, end: &str) -> bool { if start <= end { now_time >= start && now_time < end } else { now_time >= start || now_time < end } } /// 处理工作流事件,生成对应的消息通知。 async fn handle_workflow_event( event: &erp_core::events::DomainEvent, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> Result<(), String> { match event.event_type.as_str() { "process_instance.started" => { let instance_id = event .payload .get("instance_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let starter_id = event.payload.get("started_by").and_then(|v| v.as_str()); if let Some(starter) = starter_id { let recipient = match uuid::Uuid::parse_str(starter) { Ok(id) => id, Err(_) => return Ok(()), }; if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, recipient, "流程已启动".to_string(), format!("您的流程实例 {} 已启动执行。", instance_id), "normal", Some("workflow_instance".to_string()), uuid::Uuid::parse_str(instance_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "task.completed" => { let task_id = event .payload .get("task_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let starter_id = event.payload.get("started_by").and_then(|v| v.as_str()); if let Some(starter) = starter_id { let recipient = match uuid::Uuid::parse_str(starter) { Ok(id) => id, Err(_) => return Ok(()), }; if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, recipient, "流程任务已完成".to_string(), format!("流程任务 {} 已完成,请查看。", task_id), "normal", Some("workflow_task".to_string()), uuid::Uuid::parse_str(task_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 预约事件通知 "appointment.created" => { let appointment_id = event .payload .get("appointment_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "预约已创建".to_string(), format!( "您的新预约 {} 已创建,请等待确认。", &appointment_id[..8.min(appointment_id.len())] ), "normal", Some("appointment".to_string()), uuid::Uuid::parse_str(appointment_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "appointment.confirmed" => { let appointment_id = event .payload .get("appointment_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let appointment_date = event .payload .get("appointment_date") .and_then(|v| v.as_str()) .unwrap_or(""); let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "important", db).await { return Ok(()); } let date_info = if appointment_date.is_empty() { String::new() } else { format!("({})", appointment_date) }; let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "预约已确认".to_string(), format!("您的预约{}已确认,请按时就诊。", date_info), "important", Some("appointment".to_string()), uuid::Uuid::parse_str(appointment_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "appointment.cancelled" => { let appointment_id = event .payload .get("appointment_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "预约已取消".to_string(), format!( "您的预约 {} 已被取消。", &appointment_id[..8.min(appointment_id.len())] ), "normal", Some("appointment".to_string()), uuid::Uuid::parse_str(appointment_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "appointment.reminder" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let appointment_date = event .payload .get("appointment_date") .and_then(|v| v.as_str()) .unwrap_or("明天"); let time_slot = event .payload .get("time_slot") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "预约提醒".to_string(), format!( "您明天({})有一个预约,时间段:{},请准时就诊。", appointment_date, time_slot ), "normal", Some("appointment".to_string()), event .payload .get("appointment_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } "health_data.critical_alert" => { let patient_name = event .payload .get("patient_name") .and_then(|v| v.as_str()) .unwrap_or("未知患者"); let alert = event.payload.get("alert"); let indicator = alert .and_then(|a| a.get("indicator")) .and_then(|v| v.as_str()) .unwrap_or("未知指标"); let value = alert .and_then(|a| a.get("value")) .map(|v| v.to_string()) .unwrap_or_else(|| "?".to_string()); let direction = alert .and_then(|a| a.get("direction")) .and_then(|v| v.as_str()) .unwrap_or("high"); let direction_text = match direction { "low" => "偏低", _ => "偏高", }; // 通知责任医生(优先)— urgent 不跳过 DND if let Some(doctor_uid) = event .payload .get("doctor_user_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()) { let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, doctor_uid, format!("危急值告警:患者 {}", patient_name), format!( "患者 {} 的{}{}(值:{}),请立即关注处理。", patient_name, indicator, direction_text, value ), "urgent", Some("critical_alert".to_string()), Some(event.id), db, event_bus, ) .await .map_err(|e| e.to_string())?; } // 同时通知操作人(录入者) if let Some(operator_uid) = event .payload .get("operator_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()) { let is_doctor = event .payload .get("doctor_user_id") .and_then(|v| v.as_str()) .map(|s| s == operator_uid.to_string()) .unwrap_or(false); if !is_doctor { if should_skip_for_dnd(event.tenant_id, operator_uid, "important", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, operator_uid, format!("危急值告警:患者 {}", patient_name), format!( "患者 {} 的{}{}(值:{})已触发危急值告警,已通知责任医生。", patient_name, indicator, direction_text, value ), "important", Some("critical_alert".to_string()), Some(event.id), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } } "follow_up.overdue" => { let task_id = event .payload .get("task_id") .and_then(|v| v.as_str()) .unwrap_or("unknown"); let assigned_to = event .payload .get("assigned_to") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let planned_date = event .payload .get("planned_date") .and_then(|v| v.as_str()) .unwrap_or("未知日期"); let patient_name = event .payload .get("patient_name") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(assignee) = assigned_to { if should_skip_for_dnd(event.tenant_id, assignee, "important", db).await { return Ok(()); } let patient_info = if patient_name.is_empty() { String::new() } else { format!("(患者:{})", patient_name) }; let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, assignee, "随访任务逾期提醒".to_string(), format!( "您的随访任务{}(计划日期:{})已逾期,请尽快处理。", patient_info, planned_date ), "important", Some("follow_up".to_string()), uuid::Uuid::parse_str(task_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 咨询新消息通知医生 "consultation.new_message" => { let doctor_id = event .payload .get("doctor_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let patient_name = event .payload .get("patient_name") .and_then(|v| v.as_str()) .unwrap_or("患者"); let session_id = event .payload .get("session_id") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(did) = doctor_id { if should_skip_for_dnd(event.tenant_id, did, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, did, format!("新咨询消息 — {}", patient_name), format!("患者 {} 发来了一条咨询消息,请及时回复。", patient_name), "normal", Some("consultation".to_string()), uuid::Uuid::parse_str(session_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 化验报告审核完成通知患者 "lab_report.reviewed" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let report_type = event .payload .get("report_type") .and_then(|v| v.as_str()) .unwrap_or("化验报告"); let report_id = event .payload .get("report_id") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, format!("{}已审核", report_type), format!("您的{}已由医生审核完成,请查看医生注释。", report_type), "normal", Some("lab_report".to_string()), uuid::Uuid::parse_str(report_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 咨询会话开启通知医生 "consultation.opened" => { let doctor_id = event .payload .get("doctor_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let patient_name = event .payload .get("patient_name") .and_then(|v| v.as_str()) .unwrap_or("患者"); let session_id = event .payload .get("session_id") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(did) = doctor_id { if should_skip_for_dnd(event.tenant_id, did, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, did, format!("新咨询会话 — {}", patient_name), format!("患者 {} 发起了咨询会话,请及时响应。", patient_name), "normal", Some("consultation".to_string()), uuid::Uuid::parse_str(session_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 咨询会话关闭通知患者 "consultation.closed" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let session_id = event .payload .get("session_id") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "咨询会话已结束".to_string(), "您的咨询会话已由医生关闭,感谢使用。".to_string(), "normal", Some("consultation".to_string()), uuid::Uuid::parse_str(session_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 随访任务创建通知被分配人 "follow_up.created" => { let assigned_to = event .payload .get("assigned_to") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let patient_name = event .payload .get("patient_name") .and_then(|v| v.as_str()) .unwrap_or(""); let planned_date = event .payload .get("planned_date") .and_then(|v| v.as_str()) .unwrap_or("未知日期"); let task_id = event .payload .get("task_id") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(assignee) = assigned_to { if should_skip_for_dnd(event.tenant_id, assignee, "normal", db).await { return Ok(()); } let patient_info = if patient_name.is_empty() { String::new() } else { format!("(患者:{})", patient_name) }; let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, assignee, format!("新随访任务{}", patient_info), format!( "您被分配了一个随访任务{},计划日期:{}。", patient_info, planned_date ), "normal", Some("follow_up".to_string()), uuid::Uuid::parse_str(task_id).ok(), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 随访完成通知患者 "follow_up.completed" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "随访已完成".to_string(), "您的随访任务已完成,感谢配合。".to_string(), "normal", Some("follow_up".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 积分获得通知患者 "points.earned" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let points = event .payload .get("points") .and_then(|v| v.as_i64()) .unwrap_or(0); let reason = event .payload .get("reason") .and_then(|v| v.as_str()) .unwrap_or("系统奖励"); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "low", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "积分到账".to_string(), format!("您获得了 {} 积分({})。", points, reason), "low", Some("points".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 积分兑换通知患者 "points.exchanged" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let product_name = event .payload .get("product_name") .and_then(|v| v.as_str()) .unwrap_or("商品"); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "兑换成功".to_string(), format!("您已成功兑换「{}」,请留意后续通知。", product_name), "normal", Some("points".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 积分过期通知患者 "points.expired" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let points = event .payload .get("points") .and_then(|v| v.as_i64()) .unwrap_or(0); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "low", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "积分过期提醒".to_string(), format!("您有 {} 积分已过期。", points), "low", Some("points".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 文章发布通知(暂记录日志,无直接用户推送) "article.published" => { tracing::info!( article_id = %event.payload.get("article_id").and_then(|v| v.as_str()).unwrap_or(""), "文章已发布" ); } // 文章驳回通知作者 "article.rejected" => { let author_id = event .payload .get("author_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let title = event .payload .get("title") .and_then(|v| v.as_str()) .unwrap_or("文章"); if let Some(aid) = author_id { if should_skip_for_dnd(event.tenant_id, aid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, aid, format!("文章未通过审核 — {}", title), format!("您的文章「{}」未通过审核,请修改后重新提交。", title), "normal", Some("article".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 患者信息更新通知(审计日志用途) "patient.updated" => { tracing::info!( patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""), "患者信息已更新" ); } // AI 分析失败通知医生 "ai.analysis.failed" => { let doctor_id = event .payload .get("doctor_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let error = event .payload .get("error") .and_then(|v| v.as_str()) .unwrap_or("未知错误"); if let Some(did) = doctor_id { if should_skip_for_dnd(event.tenant_id, did, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, did, "AI 分析失败".to_string(), format!("AI 分析任务执行失败:{},请稍后重试。", error), "normal", Some("ai".to_string()), None, db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 化验单上传记录日志 "lab_report.uploaded" => { tracing::info!( patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""), "化验单已上传" ); } // 日常监测记录日志 "daily_monitoring.created" => { tracing::info!( patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""), "日常监测数据已记录" ); } // 医生在线状态变更记录日志 "doctor.online_status_changed" => { tracing::info!( doctor_id = %event.payload.get("doctor_id").and_then(|v| v.as_str()).unwrap_or(""), status = %event.payload.get("status").and_then(|v| v.as_str()).unwrap_or(""), "医生在线状态变更" ); } // AI Copilot 洞察生成 → 通知主管医生 "copilot.insight.created" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let severity = event .payload .get("severity") .and_then(|v| v.as_str()) .unwrap_or("warning"); let title = event .payload .get("title") .and_then(|v| v.as_str()) .unwrap_or("AI 健康洞察"); if let Some(pid) = patient_id { // 查询患者的责任医生(通过 follow_up_task 的 assigned_to) #[derive(sea_orm::FromQueryResult)] struct DoctorRow { assigned_to: uuid::Uuid, } let doctor: Option = DoctorRow::find_by_statement( sea_orm::Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, "SELECT assigned_to FROM follow_up_task WHERE tenant_id = $1 AND patient_id = $2 AND assigned_to IS NOT NULL AND deleted_at IS NULL AND status IN ('pending', 'in_progress') ORDER BY created_at DESC LIMIT 1", [event.tenant_id.into(), pid.into()], ), ) .one(db) .await .unwrap_or(None); if let Some(doc) = doctor { let priority = match severity { "critical" => "urgent", "warning" | "high" => "important", _ => "normal", }; if should_skip_for_dnd(event.tenant_id, doc.assigned_to, priority, db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, doc.assigned_to, format!("AI 健康洞察:{}", title), format!( "AI 系统检测到患者存在「{}」级别的健康风险,请及时关注。洞察内容:{}", severity, title ), priority, Some("ai_insight".to_string()), Some(event.id), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } } // 关怀计划激活 — 温暖通知患者 "care_plan.activated" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "护理计划已启动".to_string(), "您的护理计划已启动,我们将持续关注您的健康状况。如有任何疑问,请随时咨询您的护理团队。".to_string(), "normal", Some("care_plan".to_string()), event.payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 关怀计划完成 — 温暖通知患者 "care_plan.completed" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await { return Ok(()); } let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, "护理计划已完成".to_string(), "您的护理计划已完成,感谢您这段时间的配合!我们将继续关注您的健康。" .to_string(), "normal", Some("care_plan".to_string()), event .payload .get("plan_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } // 关怀行动执行 — 温暖通知患者(护理项完成、测量数据记录等) "care.action.performed" => { let patient_id = event .payload .get("patient_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()); let action = event .payload .get("action") .and_then(|v| v.as_str()) .unwrap_or(""); if let Some(pid) = patient_id { if should_skip_for_dnd(event.tenant_id, pid, "low", db).await { return Ok(()); } let (title, body) = match action { "item_completed" => { let item_title = event .payload .get("item_title") .and_then(|v| v.as_str()) .unwrap_or("护理项目"); ( "关怀已送达".to_string(), format!("您的护理团队已完成「{}」,感谢您的配合。", item_title), ) } "outcome_measured" => { let metric = event .payload .get("metric") .and_then(|v| v.as_str()) .unwrap_or("健康指标"); ( "健康数据已更新".to_string(), format!("您的{}数据已记录,护理团队正在持续关注。", metric), ) } _ => ( "关怀已送达".to_string(), "您的护理团队正在关注您的健康状况。".to_string(), ), }; let _ = crate::service::message_service::MessageService::send_system( event.tenant_id, pid, title, body, "low", Some("care_action".to_string()), event .payload .get("plan_id") .and_then(|v| v.as_str()) .and_then(|s| uuid::Uuid::parse_str(s).ok()), db, event_bus, ) .await .map_err(|e| e.to_string())?; } } _ => {} } Ok(()) } #[cfg(test)] mod tests { use super::*; // ---- DND 时间窗逻辑 ---- #[test] fn dnd_normal_range_inside() { // 09:00-17:00,当前 12:00 → 在窗口内 assert!(is_in_dnd_window("12:00", "09:00", "17:00")); } #[test] fn dnd_normal_range_before() { // 09:00-17:00,当前 08:00 → 不在窗口内 assert!(!is_in_dnd_window("08:00", "09:00", "17:00")); } #[test] fn dnd_normal_range_after() { // 09:00-17:00,当前 18:00 → 不在窗口内 assert!(!is_in_dnd_window("18:00", "09:00", "17:00")); } #[test] fn dnd_normal_range_at_start() { // 09:00-17:00,当前 09:00 → 在窗口内(>= start) assert!(is_in_dnd_window("09:00", "09:00", "17:00")); } #[test] fn dnd_normal_range_at_end() { // 09:00-17:00,当前 17:00 → 不在窗口内(< end 排除了 end 本身) assert!(!is_in_dnd_window("17:00", "09:00", "17:00")); } #[test] fn dnd_cross_midnight_night_time() { // 22:00-06:00,当前 23:30 → 在窗口内 assert!(is_in_dnd_window("23:30", "22:00", "06:00")); } #[test] fn dnd_cross_midnight_early_morning() { // 22:00-06:00,当前 03:00 → 在窗口内 assert!(is_in_dnd_window("03:00", "22:00", "06:00")); } #[test] fn dnd_cross_midnight_daytime() { // 22:00-06:00,当前 14:00 → 不在窗口内 assert!(!is_in_dnd_window("14:00", "22:00", "06:00")); } #[test] fn dnd_cross_midnight_at_start() { assert!(is_in_dnd_window("22:00", "22:00", "06:00")); } #[test] fn dnd_cross_midnight_at_end() { assert!(!is_in_dnd_window("06:00", "22:00", "06:00")); } #[test] fn dnd_cross_midnight_just_before_end() { assert!(is_in_dnd_window("05:59", "22:00", "06:00")); } #[test] fn dnd_same_start_end_always_in() { // start == end 意味着 start <= end,所以 now >= start && now < end // "00:00" >= "00:00" && "00:00" < "00:00" → false assert!(!is_in_dnd_window("00:00", "12:00", "12:00")); // "15:00" >= "12:00" && "15:00" < "12:00" → false assert!(!is_in_dnd_window("15:00", "12:00", "12:00")); } #[test] fn dnd_single_minute_window() { // 23:59-00:00(跨午夜 1 分钟) assert!(is_in_dnd_window("23:59", "23:59", "00:00")); assert!(!is_in_dnd_window("00:00", "23:59", "00:00")); } }