Files
base/crates/erp-message/src/module.rs
iven 3772afd987 chore: 干净 ERP 基座 — 删除 health/ai/wechat 业务代码
删除内容:
- 前端: health/(67文件), ai/(2文件), Copilot, MediaPicker, 相关API/Store/Hook
- 后端: wechat_handler, wechat_service, wechat_user entity, analytics handler, ai_workflow_seed
- 配置: WechatConfig, AppConfig.wechat, AuthState wechat 字段
- 启动: 微信凭据检查块, ensure_ai_workflows() 调用
- 迁移: 新增 m20260613_000170_drop_wechat_users.rs
- 脚本: api_test_health_alert.py, api_test_mp.py, mpsync.sh/ps1
- E2E: health-data page, flows/ 目录

保留: erp-core/auth/workflow/message/config/plugin + 基座前端 + 通用组件
2026-06-13 00:32:50 +08:00

1284 lines
47 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::Router;
use axum::routing::{delete, get, put};
use sea_orm::{ColumnTrait, EntityTrait, FromQueryResult, QueryFilter};
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use erp_core::error::AppResult;
use erp_core::events::EventBus;
use erp_core::module::{ErpModule, PermissionDescriptor};
use crate::entity::message_subscription;
use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler};
/// Message-center module; implements the `ErpModule` trait.
pub struct MessageModule;
impl MessageModule {
pub fn new() -> Self {
Self
}
/// 构建需要认证的路由。
pub fn protected_routes<S>() -> Router<S>
where
crate::message_state::MessageState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
Router::new()
// 消息路由
.route(
"/messages",
get(message_handler::list_messages).post(message_handler::send_message),
)
.route("/messages/unread-count", get(message_handler::unread_count))
.route("/messages/{id}/read", put(message_handler::mark_read))
.route("/messages/read-all", put(message_handler::mark_all_read))
.route("/messages/{id}", delete(message_handler::delete_message))
// SSE 实时推送
.route("/messages/stream", get(sse_handler::message_stream))
// 模板路由
.route(
"/message-templates",
get(template_handler::list_templates).post(template_handler::create_template),
)
.route(
"/message-templates/{id}",
put(template_handler::update_template).delete(template_handler::delete_template),
)
// 订阅偏好路由
.route(
"/message-subscriptions",
get(subscription_handler::get_subscription)
.put(subscription_handler::update_subscription),
)
}
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
///
/// 使用 Semaphore 限制最大并发数为 8防止事件突发时过度消耗资源。
/// 在 main.rs 中调用,因为需要 db 连接。
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let mut rx = event_bus.subscribe();
let semaphore = Arc::new(Semaphore::new(8));
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let db = db.clone();
let event_bus = event_bus.clone();
let permit = semaphore.clone();
// 先获取许可,再 spawn 任务
tokio::spawn(async move {
let _permit = match permit.acquire().await {
Ok(p) => p,
Err(_) => {
tracing::warn!("信号量已关闭,跳过工作流事件处理");
return;
}
};
if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await {
tracing::warn!(
event_type = %event.event_type,
error = %e,
"Failed to handle workflow event for messages"
);
}
});
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "Event listener lagged, skipping events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, stopping message event listener");
break;
}
}
}
});
}
}
impl Default for MessageModule {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ErpModule for MessageModule {
    /// Module identifier.
    fn name(&self) -> &str {
        "message"
    }

    /// Crate version baked in at compile time.
    fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }

    /// Names of the modules this one depends on.
    fn dependencies(&self) -> Vec<&str> {
        Vec::from(["auth"])
    }

    /// Intentionally empty: this method registers nothing on the bus.
    fn register_event_handlers(&self, _bus: &EventBus) {}

    /// Tenant-created hook; no per-tenant setup is performed.
    async fn on_tenant_created(
        &self,
        _tenant_id: Uuid,
        _db: &sea_orm::DatabaseConnection,
        _event_bus: &EventBus,
    ) -> AppResult<()> {
        Ok(())
    }

    /// Tenant-deleted hook; no per-tenant cleanup is performed.
    async fn on_tenant_deleted(
        &self,
        _tenant_id: Uuid,
        _db: &sea_orm::DatabaseConnection,
    ) -> AppResult<()> {
        Ok(())
    }

    /// Permission descriptors contributed by the message module.
    fn permissions(&self) -> Vec<PermissionDescriptor> {
        // Every descriptor belongs to the "message" module; only the other three
        // fields vary from entry to entry.
        let describe = |code: &'static str, name: &'static str, description: &'static str| {
            PermissionDescriptor {
                code: code.into(),
                name: name.into(),
                description: description.into(),
                module: "message".into(),
            }
        };

        vec![
            describe("message.list", "查看消息", "查看消息列表"),
            describe("message.send", "发送消息", "发送新消息"),
            describe("message.template.list", "查看消息模板", "查看消息模板列表"),
            describe("message.template.create", "创建消息模板", "创建消息模板"),
            describe("message.template.manage", "管理消息模板", "编辑、删除消息模板"),
        ]
    }

    /// Exposes the module as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// 检查用户是否启用了 DND免打扰且当前时间在 DND 窗口内。
/// 返回 true 表示应该跳过发送。
async fn should_skip_for_dnd(
tenant_id: Uuid,
recipient_id: Uuid,
priority: &str,
db: &sea_orm::DatabaseConnection,
) -> bool {
// 紧急消息永远不跳过
if priority == "urgent" {
return false;
}
let sub = match message_subscription::Entity::find()
.filter(message_subscription::Column::TenantId.eq(tenant_id))
.filter(message_subscription::Column::UserId.eq(recipient_id))
.filter(message_subscription::Column::DeletedAt.is_null())
.one(db)
.await
{
Ok(Some(s)) => s,
_ => return false,
};
if !sub.dnd_enabled {
return false;
}
let (start, end) = match (sub.dnd_start, sub.dnd_end) {
(Some(s), Some(e)) => (s, e),
_ => return false,
};
let now = chrono::Local::now();
let now_time = now.format("%H:%M").to_string();
is_in_dnd_window(&now_time, &start, &end)
}
/// Parses a clock time written as `H:MM`, `HH:MM` or `HH:MM:SS` into seconds since midnight.
///
/// Returns `None` for anything else (wrong number of fields, non-numeric fields,
/// or out-of-range hour/minute/second).
fn parse_clock_seconds(text: &str) -> Option<u32> {
    let mut fields = text.trim().split(':');
    let hour: u32 = fields.next()?.parse().ok()?;
    let minute: u32 = fields.next()?.parse().ok()?;
    let second: u32 = match fields.next() {
        Some(raw) => raw.parse().ok()?,
        None => 0,
    };
    if fields.next().is_some() || hour > 23 || minute > 59 || second > 59 {
        return None;
    }
    Some(hour * 3600 + minute * 60 + second)
}

/// Tells whether `now_time` falls inside the DND window `[start, end)`.
///
/// A window whose start is later than its end wraps around midnight
/// (e.g. `22:00`-`06:00`). A window with `start == end` is empty.
///
/// The three values are compared as times of day, so unpadded hours (`9:00`) and
/// values carrying seconds (`22:00:00`) are ordered correctly. If any value cannot be
/// parsed as a time, the function falls back to comparing the raw strings, which gives
/// the same answer for zero-padded `HH:MM` input.
pub(crate) fn is_in_dnd_window(now_time: &str, start: &str, end: &str) -> bool {
    /// Half-open interval test that also handles the wrap-around case.
    fn within<T: PartialOrd>(now: T, start: T, end: T) -> bool {
        if start <= end {
            now >= start && now < end
        } else {
            now >= start || now < end
        }
    }

    match (
        parse_clock_seconds(now_time),
        parse_clock_seconds(start),
        parse_clock_seconds(end),
    ) {
        (Some(now), Some(from), Some(until)) => within(now, from, until),
        _ => within(now_time, start, end),
    }
}
/// 处理工作流事件,生成对应的消息通知。
async fn handle_workflow_event(
event: &erp_core::events::DomainEvent,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<(), String> {
match event.event_type.as_str() {
"process_instance.started" => {
let instance_id = event
.payload
.get("instance_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程已启动".to_string(),
format!("您的流程实例 {} 已启动执行。", instance_id),
"normal",
Some("workflow_instance".to_string()),
uuid::Uuid::parse_str(instance_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"task.completed" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
if should_skip_for_dnd(event.tenant_id, recipient, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程任务已完成".to_string(),
format!("流程任务 {} 已完成,请查看。", task_id),
"normal",
Some("workflow_task".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 预约事件通知
"appointment.created" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已创建".to_string(),
format!(
"您的新预约 {} 已创建,请等待确认。",
&appointment_id[..8.min(appointment_id.len())]
),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.confirmed" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let appointment_date = event
.payload
.get("appointment_date")
.and_then(|v| v.as_str())
.unwrap_or("");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "important", db).await {
return Ok(());
}
let date_info = if appointment_date.is_empty() {
String::new()
} else {
format!("{}", appointment_date)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已确认".to_string(),
format!("您的预约{}已确认,请按时就诊。", date_info),
"important",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.cancelled" => {
let appointment_id = event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约已取消".to_string(),
format!(
"您的预约 {} 已被取消。",
&appointment_id[..8.min(appointment_id.len())]
),
"normal",
Some("appointment".to_string()),
uuid::Uuid::parse_str(appointment_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"appointment.reminder" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let appointment_date = event
.payload
.get("appointment_date")
.and_then(|v| v.as_str())
.unwrap_or("明天");
let time_slot = event
.payload
.get("time_slot")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"预约提醒".to_string(),
format!(
"您明天({})有一个预约,时间段:{},请准时就诊。",
appointment_date, time_slot
),
"normal",
Some("appointment".to_string()),
event
.payload
.get("appointment_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
"health_data.critical_alert" => {
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("未知患者");
let alert = event.payload.get("alert");
let indicator = alert
.and_then(|a| a.get("indicator"))
.and_then(|v| v.as_str())
.unwrap_or("未知指标");
let value = alert
.and_then(|a| a.get("value"))
.map(|v| v.to_string())
.unwrap_or_else(|| "?".to_string());
let direction = alert
.and_then(|a| a.get("direction"))
.and_then(|v| v.as_str())
.unwrap_or("high");
let direction_text = match direction {
"low" => "偏低",
_ => "偏高",
};
// 通知责任医生(优先)— urgent 不跳过 DND
if let Some(doctor_uid) = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
doctor_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{}),请立即关注处理。",
patient_name, indicator, direction_text, value
),
"urgent",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
// 同时通知操作人(录入者)
if let Some(operator_uid) = event
.payload
.get("operator_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
{
let is_doctor = event
.payload
.get("doctor_user_id")
.and_then(|v| v.as_str())
.map(|s| s == operator_uid.to_string())
.unwrap_or(false);
if !is_doctor {
if should_skip_for_dnd(event.tenant_id, operator_uid, "important", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
operator_uid,
format!("危急值告警:患者 {}", patient_name),
format!(
"患者 {}{}{}(值:{})已触发危急值告警,已通知责任医生。",
patient_name, indicator, direction_text, value
),
"important",
Some("critical_alert".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
}
"follow_up.overdue" => {
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let assigned_to = event
.payload
.get("assigned_to")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let planned_date = event
.payload
.get("planned_date")
.and_then(|v| v.as_str())
.unwrap_or("未知日期");
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(assignee) = assigned_to {
if should_skip_for_dnd(event.tenant_id, assignee, "important", db).await {
return Ok(());
}
let patient_info = if patient_name.is_empty() {
String::new()
} else {
format!("(患者:{}", patient_name)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
assignee,
"随访任务逾期提醒".to_string(),
format!(
"您的随访任务{}(计划日期:{})已逾期,请尽快处理。",
patient_info, planned_date
),
"important",
Some("follow_up".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 咨询新消息通知医生
"consultation.new_message" => {
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("患者");
let session_id = event
.payload
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(did) = doctor_id {
if should_skip_for_dnd(event.tenant_id, did, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
did,
format!("新咨询消息 — {}", patient_name),
format!("患者 {} 发来了一条咨询消息,请及时回复。", patient_name),
"normal",
Some("consultation".to_string()),
uuid::Uuid::parse_str(session_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 化验报告审核完成通知患者
"lab_report.reviewed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let report_type = event
.payload
.get("report_type")
.and_then(|v| v.as_str())
.unwrap_or("化验报告");
let report_id = event
.payload
.get("report_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
format!("{}已审核", report_type),
format!("您的{}已由医生审核完成,请查看医生注释。", report_type),
"normal",
Some("lab_report".to_string()),
uuid::Uuid::parse_str(report_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 咨询会话开启通知医生
"consultation.opened" => {
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("患者");
let session_id = event
.payload
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(did) = doctor_id {
if should_skip_for_dnd(event.tenant_id, did, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
did,
format!("新咨询会话 — {}", patient_name),
format!("患者 {} 发起了咨询会话,请及时响应。", patient_name),
"normal",
Some("consultation".to_string()),
uuid::Uuid::parse_str(session_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 咨询会话关闭通知患者
"consultation.closed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let session_id = event
.payload
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"咨询会话已结束".to_string(),
"您的咨询会话已由医生关闭,感谢使用。".to_string(),
"normal",
Some("consultation".to_string()),
uuid::Uuid::parse_str(session_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 随访任务创建通知被分配人
"follow_up.created" => {
let assigned_to = event
.payload
.get("assigned_to")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let patient_name = event
.payload
.get("patient_name")
.and_then(|v| v.as_str())
.unwrap_or("");
let planned_date = event
.payload
.get("planned_date")
.and_then(|v| v.as_str())
.unwrap_or("未知日期");
let task_id = event
.payload
.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(assignee) = assigned_to {
if should_skip_for_dnd(event.tenant_id, assignee, "normal", db).await {
return Ok(());
}
let patient_info = if patient_name.is_empty() {
String::new()
} else {
format!("(患者:{}", patient_name)
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
assignee,
format!("新随访任务{}", patient_info),
format!(
"您被分配了一个随访任务{},计划日期:{}",
patient_info, planned_date
),
"normal",
Some("follow_up".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 随访完成通知患者
"follow_up.completed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"随访已完成".to_string(),
"您的随访任务已完成,感谢配合。".to_string(),
"normal",
Some("follow_up".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 积分获得通知患者
"points.earned" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let points = event
.payload
.get("points")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let reason = event
.payload
.get("reason")
.and_then(|v| v.as_str())
.unwrap_or("系统奖励");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "low", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"积分到账".to_string(),
format!("您获得了 {} 积分({})。", points, reason),
"low",
Some("points".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 积分兑换通知患者
"points.exchanged" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let product_name = event
.payload
.get("product_name")
.and_then(|v| v.as_str())
.unwrap_or("商品");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"兑换成功".to_string(),
format!("您已成功兑换「{}」,请留意后续通知。", product_name),
"normal",
Some("points".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 积分过期通知患者
"points.expired" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let points = event
.payload
.get("points")
.and_then(|v| v.as_i64())
.unwrap_or(0);
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "low", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"积分过期提醒".to_string(),
format!("您有 {} 积分已过期。", points),
"low",
Some("points".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 文章发布通知(暂记录日志,无直接用户推送)
"article.published" => {
tracing::info!(
article_id = %event.payload.get("article_id").and_then(|v| v.as_str()).unwrap_or(""),
"文章已发布"
);
}
// 文章驳回通知作者
"article.rejected" => {
let author_id = event
.payload
.get("author_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let title = event
.payload
.get("title")
.and_then(|v| v.as_str())
.unwrap_or("文章");
if let Some(aid) = author_id {
if should_skip_for_dnd(event.tenant_id, aid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
aid,
format!("文章未通过审核 — {}", title),
format!("您的文章「{}」未通过审核,请修改后重新提交。", title),
"normal",
Some("article".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 患者信息更新通知(审计日志用途)
"patient.updated" => {
tracing::info!(
patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""),
"患者信息已更新"
);
}
// AI 分析失败通知医生
"ai.analysis.failed" => {
let doctor_id = event
.payload
.get("doctor_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let error = event
.payload
.get("error")
.and_then(|v| v.as_str())
.unwrap_or("未知错误");
if let Some(did) = doctor_id {
if should_skip_for_dnd(event.tenant_id, did, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
did,
"AI 分析失败".to_string(),
format!("AI 分析任务执行失败:{},请稍后重试。", error),
"normal",
Some("ai".to_string()),
None,
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 化验单上传记录日志
"lab_report.uploaded" => {
tracing::info!(
patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""),
"化验单已上传"
);
}
// 日常监测记录日志
"daily_monitoring.created" => {
tracing::info!(
patient_id = %event.payload.get("patient_id").and_then(|v| v.as_str()).unwrap_or(""),
"日常监测数据已记录"
);
}
// 医生在线状态变更记录日志
"doctor.online_status_changed" => {
tracing::info!(
doctor_id = %event.payload.get("doctor_id").and_then(|v| v.as_str()).unwrap_or(""),
status = %event.payload.get("status").and_then(|v| v.as_str()).unwrap_or(""),
"医生在线状态变更"
);
}
// AI Copilot 洞察生成 → 通知主管医生
"copilot.insight.created" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let severity = event
.payload
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("warning");
let title = event
.payload
.get("title")
.and_then(|v| v.as_str())
.unwrap_or("AI 健康洞察");
if let Some(pid) = patient_id {
// 查询患者的责任医生(通过 follow_up_task 的 assigned_to
#[derive(sea_orm::FromQueryResult)]
struct DoctorRow {
assigned_to: uuid::Uuid,
}
let doctor: Option<DoctorRow> = DoctorRow::find_by_statement(
sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
"SELECT assigned_to FROM follow_up_task WHERE tenant_id = $1 AND patient_id = $2 AND assigned_to IS NOT NULL AND deleted_at IS NULL AND status IN ('pending', 'in_progress') ORDER BY created_at DESC LIMIT 1",
[event.tenant_id.into(), pid.into()],
),
)
.one(db)
.await
.unwrap_or(None);
if let Some(doc) = doctor {
let priority = match severity {
"critical" => "urgent",
"warning" | "high" => "important",
_ => "normal",
};
if should_skip_for_dnd(event.tenant_id, doc.assigned_to, priority, db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
doc.assigned_to,
format!("AI 健康洞察:{}", title),
format!(
"AI 系统检测到患者存在「{}」级别的健康风险,请及时关注。洞察内容:{}",
severity, title
),
priority,
Some("ai_insight".to_string()),
Some(event.id),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
}
// 关怀计划激活 — 温暖通知患者
"care_plan.activated" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"护理计划已启动".to_string(),
"您的护理计划已启动,我们将持续关注您的健康状况。如有任何疑问,请随时咨询您的护理团队。".to_string(),
"normal",
Some("care_plan".to_string()),
event.payload.get("plan_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 关怀计划完成 — 温暖通知患者
"care_plan.completed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
return Ok(());
}
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
"护理计划已完成".to_string(),
"您的护理计划已完成,感谢您这段时间的配合!我们将继续关注您的健康。"
.to_string(),
"normal",
Some("care_plan".to_string()),
event
.payload
.get("plan_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
// 关怀行动执行 — 温暖通知患者(护理项完成、测量数据记录等)
"care.action.performed" => {
let patient_id = event
.payload
.get("patient_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
let action = event
.payload
.get("action")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(pid) = patient_id {
if should_skip_for_dnd(event.tenant_id, pid, "low", db).await {
return Ok(());
}
let (title, body) = match action {
"item_completed" => {
let item_title = event
.payload
.get("item_title")
.and_then(|v| v.as_str())
.unwrap_or("护理项目");
(
"关怀已送达".to_string(),
format!("您的护理团队已完成「{}」,感谢您的配合。", item_title),
)
}
"outcome_measured" => {
let metric = event
.payload
.get("metric")
.and_then(|v| v.as_str())
.unwrap_or("健康指标");
(
"健康数据已更新".to_string(),
format!("您的{}数据已记录,护理团队正在持续关注。", metric),
)
}
_ => (
"关怀已送达".to_string(),
"您的护理团队正在关注您的健康状况。".to_string(),
),
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
pid,
title,
body,
"low",
Some("care_action".to_string()),
event
.payload
.get("plan_id")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok()),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// ---- DND time-window logic ----
#[test]
fn dnd_normal_range_inside() {
// Window 09:00-17:00, now 12:00 -> inside the window.
assert!(is_in_dnd_window("12:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_before() {
// Window 09:00-17:00, now 08:00 -> outside the window.
assert!(!is_in_dnd_window("08:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_after() {
// Window 09:00-17:00, now 18:00 -> outside the window.
assert!(!is_in_dnd_window("18:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_at_start() {
// Window 09:00-17:00, now 09:00 -> inside (the start bound is inclusive).
assert!(is_in_dnd_window("09:00", "09:00", "17:00"));
}
#[test]
fn dnd_normal_range_at_end() {
// Window 09:00-17:00, now 17:00 -> outside (the end bound is exclusive).
assert!(!is_in_dnd_window("17:00", "09:00", "17:00"));
}
#[test]
fn dnd_cross_midnight_night_time() {
// Window 22:00-06:00, now 23:30 -> inside the window.
assert!(is_in_dnd_window("23:30", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_early_morning() {
// Window 22:00-06:00, now 03:00 -> inside the window.
assert!(is_in_dnd_window("03:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_daytime() {
// Window 22:00-06:00, now 14:00 -> outside the window.
assert!(!is_in_dnd_window("14:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_at_start() {
// Cross-midnight window: the start bound is inclusive.
assert!(is_in_dnd_window("22:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_at_end() {
// Cross-midnight window: the end bound is exclusive.
assert!(!is_in_dnd_window("06:00", "22:00", "06:00"));
}
#[test]
fn dnd_cross_midnight_just_before_end() {
// Cross-midnight window: one minute before the end is still inside.
assert!(is_in_dnd_window("05:59", "22:00", "06:00"));
}
#[test]
fn dnd_same_start_end_always_in() {
// NOTE: despite the test name, start == end yields an EMPTY window. It takes the
// start <= end branch, where `now >= start && now < end` can never hold.
// "00:00" >= "12:00" is already false -> not in the window.
assert!(!is_in_dnd_window("00:00", "12:00", "12:00"));
// "15:00" >= "12:00" holds, but "15:00" < "12:00" does not -> not in the window.
assert!(!is_in_dnd_window("15:00", "12:00", "12:00"));
}
#[test]
fn dnd_single_minute_window() {
// Window 23:59-00:00: a one-minute window that crosses midnight.
assert!(is_in_dnd_window("23:59", "23:59", "00:00"));
assert!(!is_in_dnd_window("00:00", "23:59", "00:00"));
}
}