feat(health): 消息推送集成 — 定时任务启动 + 预约提醒事件
- erp-server: 启动逾期随访检查(6h)、积分过期(24h)、预约提醒(1h) 定时任务 - appointment_service: 新增 send_reminders 扫描明日确认预约发送事件 - erp-message: 订阅 appointment.reminder 事件,向患者发送提醒消息
This commit is contained in:
@@ -62,6 +62,28 @@ impl HealthModule {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 启动预约提醒调度(每 1 小时运行一次),扫描明天有预约的患者发送提醒
|
||||||
|
pub fn start_appointment_reminder(db: sea_orm::DatabaseConnection, event_bus: erp_core::events::EventBus) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = interval.tick() => {
|
||||||
|
match crate::service::appointment_service::send_reminders(&db, &event_bus).await {
|
||||||
|
Ok(count) if count > 0 => tracing::info!(count = count, "预约提醒发送完成"),
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => tracing::warn!(error = %e, "预约提醒发送失败"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = tokio::signal::ctrl_c() => {
|
||||||
|
tracing::info!("预约提醒调度任务收到关闭信号,正在停止");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn public_routes<S>() -> Router<S>
|
pub fn public_routes<S>() -> Router<S>
|
||||||
where
|
where
|
||||||
crate::state::HealthState: axum::extract::FromRef<S>,
|
crate::state::HealthState: axum::extract::FromRef<S>,
|
||||||
|
|||||||
@@ -547,3 +547,43 @@ pub async fn calendar_view(
|
|||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 扫描明天有预约的患者,通过事件总线发送预约提醒
|
||||||
|
/// 由定时任务每小时调用一次
|
||||||
|
pub async fn send_reminders(
|
||||||
|
db: &sea_orm::DatabaseConnection,
|
||||||
|
event_bus: &erp_core::events::EventBus,
|
||||||
|
) -> crate::error::HealthResult<usize> {
|
||||||
|
use chrono::{Local, NaiveDate};
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
let tomorrow = Local::now().date_naive() + chrono::Duration::days(1);
|
||||||
|
|
||||||
|
// 查找明天所有确认状态的预约
|
||||||
|
let apps = appointment::Entity::find()
|
||||||
|
.filter(appointment::Column::AppointmentDate.eq(tomorrow))
|
||||||
|
.filter(appointment::Column::Status.eq("confirmed"))
|
||||||
|
.filter(appointment::Column::DeletedAt.is_null())
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let count = apps.len();
|
||||||
|
for app in apps {
|
||||||
|
event_bus.publish(
|
||||||
|
DomainEvent::new(
|
||||||
|
"appointment.reminder",
|
||||||
|
app.tenant_id,
|
||||||
|
json!({
|
||||||
|
"appointment_id": app.id,
|
||||||
|
"patient_id": app.patient_id,
|
||||||
|
"doctor_id": app.doctor_id,
|
||||||
|
"appointment_date": app.appointment_date,
|
||||||
|
"time_slot": format!("{}-{}", app.start_time.format("%H:%M"), app.end_time.format("%H:%M")),
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
db,
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|||||||
@@ -352,6 +352,42 @@ async fn handle_workflow_event(
|
|||||||
.map_err(|e| e.to_string())?;
|
.map_err(|e| e.to_string())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
"appointment.reminder" => {
|
||||||
|
let patient_id = event
|
||||||
|
.payload
|
||||||
|
.get("patient_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||||
|
let appointment_date = event
|
||||||
|
.payload
|
||||||
|
.get("appointment_date")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("明天");
|
||||||
|
let time_slot = event
|
||||||
|
.payload
|
||||||
|
.get("time_slot")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("");
|
||||||
|
|
||||||
|
if let Some(pid) = patient_id {
|
||||||
|
if should_skip_for_dnd(event.tenant_id, pid, "normal", db).await {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let _ = crate::service::message_service::MessageService::send_system(
|
||||||
|
event.tenant_id,
|
||||||
|
pid,
|
||||||
|
"预约提醒".to_string(),
|
||||||
|
format!("您明天({})有一个预约,时间段:{},请准时就诊。", appointment_date, time_slot),
|
||||||
|
"normal",
|
||||||
|
Some("appointment".to_string()),
|
||||||
|
event.payload.get("appointment_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()),
|
||||||
|
db,
|
||||||
|
event_bus,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
"health_data.critical_alert" => {
|
"health_data.critical_alert" => {
|
||||||
let patient_name = event
|
let patient_name = event
|
||||||
.payload
|
.payload
|
||||||
|
|||||||
@@ -414,8 +414,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
erp_workflow::WorkflowModule::start_timeout_checker(db.clone(), event_bus.clone());
|
erp_workflow::WorkflowModule::start_timeout_checker(db.clone(), event_bus.clone());
|
||||||
tracing::info!("Timeout checker started");
|
tracing::info!("Timeout checker started");
|
||||||
|
|
||||||
// Start follow-up overdue checker (handled by HealthModule::on_startup)
|
// Start follow-up overdue checker (every 6h)
|
||||||
tracing::info!("Follow-up overdue checker delegated to module on_startup");
|
erp_health::HealthModule::start_overdue_checker(db.clone());
|
||||||
|
tracing::info!("Follow-up overdue checker started");
|
||||||
|
|
||||||
|
// Start points expiration checker (every 24h)
|
||||||
|
erp_health::HealthModule::start_points_expiration_checker(db.clone(), event_bus.clone());
|
||||||
|
tracing::info!("Points expiration checker started");
|
||||||
|
|
||||||
|
// Start appointment reminder scheduler (every 1h)
|
||||||
|
erp_health::HealthModule::start_appointment_reminder(db.clone(), event_bus.clone());
|
||||||
|
tracing::info!("Appointment reminder scheduler started");
|
||||||
|
|
||||||
let host = config.server.host.clone();
|
let host = config.server.host.clone();
|
||||||
let port = config.server.port;
|
let port = config.server.port;
|
||||||
|
|||||||
Reference in New Issue
Block a user