Files
hms/crates/erp-message/src/module.rs
iven 26aa66d6e3
Some checks failed
CI / frontend-build (push) Has been cancelled
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / security-audit (push) Has been cancelled
test(message): erp-message 从 45 增至 69 个单元测试 — DND 时间窗 + TransactionError + model_to_resp
- module.rs: 提取 is_in_dnd_window 纯函数 + 14 个 DND 时间窗测试(正常范围/跨午夜/边界)
- error.rs: 2 个 TransactionError 转换测试(Connection/Transaction)
- message_service: 2 个 model_to_resp 字段映射测试
- template_service: 1 个 model_to_resp 字段映射测试
- subscription_service: 1 个 model_to_resp 字段映射测试
2026-04-28 18:26:36 +08:00

695 lines
25 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Router;
use axum::routing::{delete, get, put};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::EventBus;
use erp_core::module::ErpModule;
use crate::entity::message_subscription;
use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler};
/// Message-center module; implements the `ErpModule` trait.
///
/// The type carries no state: routes are built by `protected_routes` and the
/// background event listener is started via `start_event_listener`.
pub struct MessageModule;
impl MessageModule {
pub fn new() -> Self {
Self
}
/// 构建需要认证的路由。
pub fn protected_routes<S>() -> Router<S>
where
crate::message_state::MessageState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
Router::new()
// 消息路由
.route(
"/messages",
get(message_handler::list_messages).post(message_handler::send_message),
)
.route("/messages/unread-count", get(message_handler::unread_count))
.route("/messages/{id}/read", put(message_handler::mark_read))
.route("/messages/read-all", put(message_handler::mark_all_read))
.route("/messages/{id}", delete(message_handler::delete_message))
// SSE 实时推送
.route("/messages/stream", get(sse_handler::message_stream))
// 模板路由
.route(
"/message-templates",
get(template_handler::list_templates).post(template_handler::create_template),
)
.route(
"/message-templates/{id}",
put(template_handler::update_template).delete(template_handler::delete_template),
)
// 订阅偏好路由
.route(
"/message-subscriptions",
get(subscription_handler::get_subscription)
.put(subscription_handler::update_subscription),
)
}
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
///
/// 使用 Semaphore 限制最大并发数为 8防止事件突发时过度消耗资源。
/// 在 main.rs 中调用,因为需要 db 连接。
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let mut rx = event_bus.subscribe();
let semaphore = Arc::new(Semaphore::new(8));
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let db = db.clone();
let event_bus = event_bus.clone();
let permit = semaphore.clone();
// 先获取许可,再 spawn 任务
tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await {
tracing::warn!(
event_type = %event.event_type,
error = %e,
"Failed to handle workflow event for messages"
);
}
});
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "Event listener lagged, skipping events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, stopping message event listener");
break;
}
}
}
});
}
}
impl Default for MessageModule {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ErpModule for MessageModule {
fn name(&self) -> &str {
"message"
}
fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
fn dependencies(&self) -> Vec<&str> {
vec!["auth"]
}
fn register_event_handlers(&self, _bus: &EventBus) {}
async fn on_tenant_created(
&self,
_tenant_id: Uuid,
_db: &sea_orm::DatabaseConnection,
_event_bus: &EventBus,
) -> AppResult<()> {
Ok(())
}
async fn on_tenant_deleted(
&self,
_tenant_id: Uuid,
_db: &sea_orm::DatabaseConnection,
) -> AppResult<()> {
Ok(())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// 检查用户是否启用了 DND免打扰且当前时间在 DND 窗口内。
/// 返回 true 表示应该跳过发送。
async fn should_skip_for_dnd(
tenant_id: Uuid,
recipient_id: Uuid,
priority: &str,
db: &sea_orm::DatabaseConnection,
) -> bool {
// 紧急消息永远不跳过
if priority == "urgent" {
return false;
}
let sub = match message_subscription::Entity::find()
.filter(message_subscription::Column::TenantId.eq(tenant_id))
.filter(message_subscription::Column::UserId.eq(recipient_id))
.filter(message_subscription::Column::DeletedAt.is_null())
.one(db)
.await
{
Ok(Some(s)) => s,
_ => return false,
};
if !sub.dnd_enabled {
return false;
}
let (start, end) = match (sub.dnd_start, sub.dnd_end) {
(Some(s), Some(e)) => (s, e),
_ => return false,
};
let now = chrono::Local::now();
let now_time = now.format("%H:%M").to_string();
is_in_dnd_window(&now_time, &start, &end)
}
/// Returns whether `now_time` falls inside the DND window `[start, end)`.
///
/// Times are clock times such as `"22:00"`. The window is half-open: `start`
/// is inside, `end` is not. When `start` is later than `end` the window wraps
/// past midnight (e.g. `22:00`-`06:00`). `start == end` is an empty window and
/// never matches.
///
/// All three values are compared as minutes since midnight, so `"9:00"`,
/// `"09:00"` and `"09:00:00"` are treated as the same instant. If any value is
/// not a valid `H:MM`, `HH:MM` or `HH:MM:SS` time, the function falls back to
/// plain string comparison of the raw inputs.
pub(crate) fn is_in_dnd_window(now_time: &str, start: &str, end: &str) -> bool {
    /// Parses `H:MM`, `HH:MM` or `HH:MM:SS` into minutes since midnight.
    fn minute_of_day(text: &str) -> Option<u32> {
        let mut parts = text.trim().split(':');
        let hour: u32 = parts.next()?.parse().ok()?;
        let minute: u32 = parts.next()?.parse().ok()?;
        if let Some(second) = parts.next() {
            // Seconds are validated but ignored: the window has minute precision.
            let second: u32 = second.parse().ok()?;
            if second >= 60 {
                return None;
            }
        }
        if parts.next().is_some() {
            return None;
        }
        (hour < 24 && minute < 60).then_some(hour * 60 + minute)
    }

    /// Half-open containment test that also handles windows wrapping midnight.
    fn window_contains<T: PartialOrd>(now: T, start: T, end: T) -> bool {
        if start <= end {
            now >= start && now < end
        } else {
            now >= start || now < end
        }
    }

    match (minute_of_day(now_time), minute_of_day(start), minute_of_day(end)) {
        (Some(now), Some(from), Some(until)) => window_contains(now, from, until),
        // Unparseable input: keep the lexicographic comparison of the raw text.
        _ => window_contains(now_time, start, end),
    }
}
/// 处理工作流事件,生成对应的消息通知。
async fn handle_workflow_event(
event: &erp_core::events::DomainEvent,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<(), String> {
match event.event_type.as_str() {
"process_instance.started" => {
let instance_id = event
.payload
.get("instance_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程已启动".to_string(),
format!("您的流程实例 {} 已启动执行。", instance_id),
"normal",
Some("workflow_instance".to_string()),
uuid::Uuid::parse_str(instance_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"task.completed" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程任务已完成".to_string(),
format!("流程任务 {} 已完成,请查看。", task_id),
"normal",
Some("workflow_task".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 预约事件通知
"appointment.created" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已创建".to_string(),
format!("您的新预约 {} 已创建,请等待确认。", &appointment_id[..8.min(appointment_id.len())]),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.confirmed" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let appointment_date = event
.payload
.get("appointment_date")
.and_then(|v| v.as_str())
.unwrap_or("");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "important", db).await {
return Ok(());
}
let date_info = if appointment_date.is_empty() {
String::new()
} else {
format!("{}", appointment_date)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已确认".to_string(),
format!("您的预约{}已确认,请按时就诊。", date_info),
"important",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.cancelled" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已取消".to_string(),
format!("您的预约 {} 已被取消。", &appointment_id[..8.min(appointment_id.len())]),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.reminder" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let appointment_date = event
.payload
.get("appointment_date")
.and_then(|v| v.as_str())
.unwrap_or("明天");
let time_slot = event
.payload
.get("time_slot")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约提醒".to_string(),
format!("您明天({})有一个预约,时间段:{},请准时就诊。", appointment_date, time_slot),
"normal",
Some("appointment".to_string()),
event.payload.get("appointment_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"health_data.critical_alert" => {
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("未知患者");
let alert = event.payload.get("alert");
let indicator = alert
.and_then(|a| a.get("indicator"))
.and_then(|v| v.as_str())
.unwrap_or("未知指标");
let value = alert
.and_then(|a| a.get("value"))
.map(|v| v.to_string())
.unwrap_or_else(|| "?".to_string());
let direction = alert
.and_then(|a| a.get("direction"))
.and_then(|v| v.as_str())
.unwrap_or("high");
let direction_text = match direction {
"low" => "偏低",
_ => "偏高",
};
// 通知责任医生(优先)— urgent 不跳过 DND
if let Some(doctor_uid) = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
doctor_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{}),请立即关注处理。",
patient_name, indicator, direction_text, value
),
"urgent",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
// 同时通知操作人(录入者)
if let Some(operator_uid) = event
.payload
.get("operator_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let is_doctor = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.map(|s| s == operator_uid.to_string())
.unwrap_or(false);
if !is_doctor {
if should_skip_for_dnd(event.tenant_id, operator_uid, "important", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
operator_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{})已触发危急值告警,已通知责任医生。",
patient_name, indicator, direction_text, value
),
"important",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
}
"follow_up.overdue" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let assigned_to = event
.payload
.get("assigned_to")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let planned_date = event
.payload
.get("planned_date")
.and_then(|v| v.as_str())
.unwrap_or("未知日期");
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(assignee) = assigned_to {
if should_skip_for_dnd(event.tenant_id, assignee, "important", db).await {
return Ok(());
}
let patient_info = if patient_name.is_empty() {
String::new()
} else {
format!("(患者:{}", patient_name)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
assignee,
"随访任务逾期提醒".to_string(),
format!(
"您的随访任务{}(计划日期:{})已逾期,请尽快处理。",
patient_info, planned_date
),
"important",
Some("follow_up".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 咨询新消息通知医生
"consultation.new_message" => {
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("患者");
let session_id = event
.payload
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(did) = doctor_id {
if should_skip_for_dnd(event.tenant_id, did, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
did,
format!("新咨询消息 — {}", patient_name),
format!("患者 {} 发来了一条咨询消息,请及时回复。", patient_name),
"normal",
Some("consultation".to_string()),
uuid::Uuid::parse_str(session_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 化验报告审核完成通知患者
"lab_report.reviewed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let report_type = event
.payload
.get("report_type")
.and_then(|v| v.as_str())
.unwrap_or("化验报告");
let report_id = event
.payload
.get("report_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
format!("{}已审核", report_type),
format!("您的{}已由医生审核完成,请查看医生注释。", report_type),
"normal",
Some("lab_report".to_string()),
uuid::Uuid::parse_str(report_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// ---- DND time-window logic ----
// Every test calls `is_in_dnd_window(now, start, end)` with "HH:MM" strings.
#[test]
fn dnd_normal_range_inside() {
// Window 09:00-17:00, now 12:00 -> inside the window.
assert!(is_in_dnd_window("12:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_before() {
// Window 09:00-17:00, now 08:00 -> outside the window.
assert!(!is_in_dnd_window("08:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_after() {
// Window 09:00-17:00, now 18:00 -> outside the window.
assert!(!is_in_dnd_window("18:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_at_start() {
// Window 09:00-17:00, now 09:00 -> inside (the start bound is inclusive).
assert!(is_in_dnd_window("09:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_at_end() {
// Window 09:00-17:00, now 17:00 -> outside (the end bound is exclusive).
assert!(!is_in_dnd_window("17:00", "09:00", "17:00"));
}
#[test]
fn dnd_cross_midnight_night_time() {
// Window 22:00-06:00, now 23:30 -> inside the window.
assert!(is_in_dnd_window("23:30", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_early_morning() {
// Window 22:00-06:00, now 03:00 -> inside the window.
assert!(is_in_dnd_window("03:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_daytime() {
// Window 22:00-06:00, now 14:00 -> outside the window.
assert!(!is_in_dnd_window("14:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_at_start() {
// Window 22:00-06:00, now 22:00 -> inside (the start bound is inclusive).
assert!(is_in_dnd_window("22:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_at_end() {
// Window 22:00-06:00, now 06:00 -> outside (the end bound is exclusive).
assert!(!is_in_dnd_window("06:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_just_before_end() {
// Window 22:00-06:00, now 05:59 -> still inside, one minute before the end.
assert!(is_in_dnd_window("05:59", "22:00", "06:00"));
}
#[test]
fn dnd_same_start_end_always_in() {
// NOTE: despite the test name, start == end is an EMPTY window: it takes the
// non-wrapping branch (start <= end), and `now >= start && now < end` cannot
// hold when both bounds are equal.
// now 00:00, window 12:00-12:00 -> outside.
assert!(!is_in_dnd_window("00:00", "12:00", "12:00"));
// now 15:00, window 12:00-12:00 -> outside.
assert!(!is_in_dnd_window("15:00", "12:00", "12:00"));
}
#[test]
fn dnd_single_minute_window() {
// Window 23:59-00:00 (wraps midnight, one minute long).
assert!(is_in_dnd_window("23:59", "23:59", "00:00"));
assert!(!is_in_dnd_window("00:00", "23:59", "00:00"));
}
}