Files
hms/crates/erp-message/src/module.rs
iven b05b7c27a0
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
feat: 审计修复 Phase 6-7 — SSE 推送/工作流补全/消息群发/前端收尾
Phase 6 功能补全:
- P1-3: 消息 SSE 实时推送端点 + 前端 EventSource 连接
- P1-6: ServiceTask HTTP 调用能力 (reqwest GET/POST)
- P1-7: user.deleted 事件处理 — 终止相关流程实例
- P1-8: 任务认领 (claim) 端点 + handler
- P1-9: 超时检查器发布 task.timeout 事件
- P1-15: 组织/部门名称唯一性校验 (create + update)
- P1-18: 消息群发 fan-out (role/department/all 批量投递)

Phase 7 P3-P4 收尾:
- PluginAdmin purge 按钮状态修复
- ChangePassword 最小 8 字符 + 新旧密码不同验证
- AuditLogViewer 用户名缓存 + 扩展资源类型
- InstanceMonitor 通过 definition 缓存解析 node_name
- NotificationPreferences DND 时间范围校验
2026-04-26 19:44:04 +08:00

569 lines
20 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Router;
use axum::routing::{delete, get, put};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::EventBus;
use erp_core::module::ErpModule;
use crate::entity::message_subscription;
use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler};
/// Message center module; implements the `ErpModule` trait.
pub struct MessageModule;
impl MessageModule {
pub fn new() -> Self {
Self
}
/// 构建需要认证的路由。
pub fn protected_routes<S>() -> Router<S>
where
crate::message_state::MessageState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
Router::new()
// 消息路由
.route(
"/messages",
get(message_handler::list_messages).post(message_handler::send_message),
)
.route("/messages/unread-count", get(message_handler::unread_count))
.route("/messages/{id}/read", put(message_handler::mark_read))
.route("/messages/read-all", put(message_handler::mark_all_read))
.route("/messages/{id}", delete(message_handler::delete_message))
// SSE 实时推送
.route("/messages/stream", get(sse_handler::message_stream))
// 模板路由
.route(
"/message-templates",
get(template_handler::list_templates).post(template_handler::create_template),
)
.route(
"/message-templates/{id}",
put(template_handler::update_template).delete(template_handler::delete_template),
)
// 订阅偏好路由
.route(
"/message-subscriptions",
get(subscription_handler::get_subscription)
.put(subscription_handler::update_subscription),
)
}
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
///
/// 使用 Semaphore 限制最大并发数为 8防止事件突发时过度消耗资源。
/// 在 main.rs 中调用,因为需要 db 连接。
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let mut rx = event_bus.subscribe();
let semaphore = Arc::new(Semaphore::new(8));
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let db = db.clone();
let event_bus = event_bus.clone();
let permit = semaphore.clone();
// 先获取许可,再 spawn 任务
tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await {
tracing::warn!(
event_type = %event.event_type,
error = %e,
"Failed to handle workflow event for messages"
);
}
});
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "Event listener lagged, skipping events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, stopping message event listener");
break;
}
}
}
});
}
}
impl Default for MessageModule {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ErpModule for MessageModule {
    /// Module identifier: `"message"`.
    fn name(&self) -> &str {
        "message"
    }

    /// Crate version taken from `CARGO_PKG_VERSION` at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }

    /// This module declares a single dependency: `"auth"`.
    fn dependencies(&self) -> Vec<&str> {
        Vec::from(["auth"])
    }

    /// Intentionally a no-op; the event listener is started separately via
    /// `MessageModule::start_event_listener`.
    fn register_event_handlers(&self, _bus: &EventBus) {}

    /// Nothing to do when a tenant is created.
    async fn on_tenant_created(
        &self,
        _tenant_id: Uuid,
        _db: &sea_orm::DatabaseConnection,
        _event_bus: &EventBus,
    ) -> AppResult<()> {
        Ok(())
    }

    /// Nothing to do when a tenant is deleted.
    async fn on_tenant_deleted(
        &self,
        _tenant_id: Uuid,
        _db: &sea_orm::DatabaseConnection,
    ) -> AppResult<()> {
        Ok(())
    }

    /// Exposes the module as `Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// 检查用户是否启用了 DND免打扰且当前时间在 DND 窗口内。
/// 返回 true 表示应该跳过发送。
async fn should_skip_for_dnd(
tenant_id: Uuid,
recipient_id: Uuid,
priority: &str,
db: &sea_orm::DatabaseConnection,
) -> bool {
// 紧急消息永远不跳过
if priority == "urgent" {
return false;
}
let sub = match message_subscription::Entity::find()
.filter(message_subscription::Column::TenantId.eq(tenant_id))
.filter(message_subscription::Column::UserId.eq(recipient_id))
.filter(message_subscription::Column::DeletedAt.is_null())
.one(db)
.await
{
Ok(Some(s)) => s,
_ => return false,
};
if !sub.dnd_enabled {
return false;
}
let (start, end) = match (sub.dnd_start, sub.dnd_end) {
(Some(s), Some(e)) => (s, e),
_ => return false,
};
let now = chrono::Local::now();
let now_time = now.format("%H:%M").to_string();
// DND 窗口比较(支持跨午夜,如 22:00-08:00
if start <= end {
now_time >= start && now_time < end
} else {
now_time >= start || now_time < end
}
}
/// 处理工作流事件,生成对应的消息通知。
async fn handle_workflow_event(
event: &erp_core::events::DomainEvent,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<(), String> {
match event.event_type.as_str() {
"process_instance.started" => {
let instance_id = event
.payload
.get("instance_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程已启动".to_string(),
format!("您的流程实例 {} 已启动执行。", instance_id),
"normal",
Some("workflow_instance".to_string()),
uuid::Uuid::parse_str(instance_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"task.completed" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程任务已完成".to_string(),
format!("流程任务 {} 已完成,请查看。", task_id),
"normal",
Some("workflow_task".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 预约事件通知
"appointment.created" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已创建".to_string(),
format!("您的新预约 {} 已创建,请等待确认。", &appointment_id[..8.min(appointment_id.len())]),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.confirmed" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let appointment_date = event
.payload
.get("appointment_date")
.and_then(|v| v.as_str())
.unwrap_or("");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "important", db).await {
return Ok(());
}
let date_info = if appointment_date.is_empty() {
String::new()
} else {
format!("{}", appointment_date)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已确认".to_string(),
format!("您的预约{}已确认,请按时就诊。", date_info),
"important",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.cancelled" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已取消".to_string(),
format!("您的预约 {} 已被取消。", &appointment_id[..8.min(appointment_id.len())]),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"health_data.critical_alert" => {
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("未知患者");
let alert = event.payload.get("alert");
let indicator = alert
.and_then(|a| a.get("indicator"))
.and_then(|v| v.as_str())
.unwrap_or("未知指标");
let value = alert
.and_then(|a| a.get("value"))
.map(|v| v.to_string())
.unwrap_or_else(|| "?".to_string());
let direction = alert
.and_then(|a| a.get("direction"))
.and_then(|v| v.as_str())
.unwrap_or("high");
let direction_text = match direction {
"low" => "偏低",
_ => "偏高",
};
// 通知责任医生(优先)— urgent 不跳过 DND
if let Some(doctor_uid) = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
doctor_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{}),请立即关注处理。",
patient_name, indicator, direction_text, value
),
"urgent",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
// 同时通知操作人(录入者)
if let Some(operator_uid) = event
.payload
.get("operator_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let is_doctor = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.map(|s| s == operator_uid.to_string())
.unwrap_or(false);
if !is_doctor {
if should_skip_for_dnd(event.tenant_id, operator_uid, "important", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
operator_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{})已触发危急值告警,已通知责任医生。",
patient_name, indicator, direction_text, value
),
"important",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
}
"follow_up.overdue" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let assigned_to = event
.payload
.get("assigned_to")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let planned_date = event
.payload
.get("planned_date")
.and_then(|v| v.as_str())
.unwrap_or("未知日期");
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(assignee) = assigned_to {
if should_skip_for_dnd(event.tenant_id, assignee, "important", db).await {
return Ok(());
}
let patient_info = if patient_name.is_empty() {
String::new()
} else {
format!("(患者:{}", patient_name)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
assignee,
"随访任务逾期提醒".to_string(),
format!(
"您的随访任务{}(计划日期:{})已逾期,请尽快处理。",
patient_info, planned_date
),
"important",
Some("follow_up".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 咨询新消息通知医生
"consultation.new_message" => {
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("患者");
let session_id = event
.payload
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(did) = doctor_id {
if should_skip_for_dnd(event.tenant_id, did, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
did,
format!("新咨询消息 — {}", patient_name),
format!("患者 {} 发来了一条咨询消息,请及时回复。", patient_name),
"normal",
Some("consultation".to_string()),
uuid::Uuid::parse_str(session_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 化验报告审核完成通知患者
"lab_report.reviewed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let report_type = event
.payload
.get("report_type")
.and_then(|v| v.as_str())
.unwrap_or("化验报告");
let report_id = event
.payload
.get("report_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
format!("{}已审核", report_type),
format!("您的{}已由医生审核完成,请查看医生注释。", report_type),
"normal",
Some("lab_report".to_string()),
uuid::Uuid::parse_str(report_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}
Ok(())
}