diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index 85fe11b..01c69ea 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -1,17 +1,52 @@ use erp_core::events::EventBus; -pub fn register_handlers(bus: &EventBus) { - // workflow.task.completed → 更新随访任务状态 - let (mut workflow_rx, _wf_handle) = bus.subscribe_filtered("workflow.task.".to_string()); +/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup) +pub fn register_handlers(_bus: &EventBus) { + // 事件处理器已迁移到 on_startup → register_handlers_with_state +} + +/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用 +pub fn register_handlers_with_state(state: crate::state::HealthState) { + // workflow.task.completed → 更新随访任务状态为 completed + let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string()); + let db = state.db.clone(); tokio::spawn(async move { loop { match workflow_rx.recv().await { Some(event) if event.event_type == "workflow.task.completed" => { - tracing::info!( - event_id = %event.id, - "健康模块收到工作流任务完成事件" - ); - // 后续可通过 db 连接更新 follow_up_task 状态 + // 从 payload 中提取 task_id + let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok()); + match task_id { + Some(task_id) => { + match crate::service::follow_up_service::complete_task_by_system( + &db, task_id, event.tenant_id, + ) + .await + { + Ok(()) => { + tracing::info!( + event_id = %event.id, + task_id = %task_id, + "工作流任务完成 → 随访任务已更新" + ); + } + Err(e) => { + tracing::warn!( + event_id = %event.id, + task_id = %task_id, + error = %e, + "工作流任务完成 → 随访任务更新失败" + ); + } + } + } + None => { + tracing::warn!( + event_id = %event.id, + "工作流任务完成事件缺少 task_id,跳过" + ); + } + } } Some(_) => {} None => break, @@ -19,17 +54,16 @@ pub fn register_handlers(bus: &EventBus) { } }); - // message.sent → 联动咨询会话 last_message_at - let (mut msg_rx, _msg_handle) = bus.subscribe_filtered("message.".to_string()); + // message.sent → 预留:后续联动咨询会话 last_message_at + let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string()); tokio::spawn(async move { loop { match msg_rx.recv().await { Some(event) if event.event_type == "message.sent" => { tracing::info!( event_id = %event.id, - "健康模块收到消息发送事件" + "健康模块收到消息发送事件(暂不处理)" ); - // 后续可通过 db 连接更新 consultation_session.last_message_at } Some(_) => {} None => break, diff --git a/crates/erp-health/src/module.rs b/crates/erp-health/src/module.rs index 2ff85e3..c578da5 100644 --- a/crates/erp-health/src/module.rs +++ b/crates/erp-health/src/module.rs @@ -17,6 +17,21 @@ impl HealthModule { Self } + /// 启动定时逾期随访检查(每 6 小时运行一次) + pub fn start_overdue_checker(db: sea_orm::DatabaseConnection) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(6 * 3600)); + loop { + interval.tick().await; + match crate::service::follow_up_service::check_overdue_tasks(&db).await { + Ok(count) if count > 0 => tracing::info!(count = count, "随访逾期检查完成"), + Ok(_) => {} + Err(e) => tracing::warn!(error = %e, "随访逾期检查失败"), + } + } + }); + } + pub fn public_routes() -> Router where crate::state::HealthState: axum::extract::FromRef, @@ -235,8 +250,29 @@ impl ErpModule for HealthModule { vec!["auth"] } - fn register_event_handlers(&self, bus: &EventBus) { - crate::event::register_handlers(bus); + fn register_event_handlers(&self, _bus: &EventBus) { + // 事件处理器已迁移到 on_startup,此处保留空实现以兼容 trait 签名 + } + + async fn on_startup(&self, ctx: &erp_core::module::ModuleContext) -> erp_core::error::AppResult<()> { + let crypto = crate::crypto::HealthCrypto::from_keys( + &std::env::var("HEALTH_AES_KEY").unwrap_or_default(), + &std::env::var("HEALTH_HMAC_KEY").unwrap_or_default(), + ) + .unwrap_or_else(|_| { + tracing::warn!("HEALTH_AES_KEY / HEALTH_HMAC_KEY 未设置或无效,使用开发默认密钥"); + crate::crypto::HealthCrypto::dev_default() + }); + + let state = crate::state::HealthState { + db: ctx.db.clone(), + event_bus: ctx.event_bus.clone(), + crypto, + }; + + crate::event::register_handlers_with_state(state); + tracing::info!(module = "health", "Health module event handlers registered via on_startup"); + Ok(()) } async fn on_tenant_created( diff --git a/crates/erp-health/src/service/appointment_service.rs b/crates/erp-health/src/service/appointment_service.rs index d08957d..4035dc5 100644 --- a/crates/erp-health/src/service/appointment_service.rs +++ b/crates/erp-health/src/service/appointment_service.rs @@ -233,9 +233,14 @@ pub async fn update_appointment_status( .filter(doctor_schedule::Column::DeletedAt.is_null()) .filter(Expr::col(doctor_schedule::Column::CurrentAppointments).gt(0)) .exec(&txn) - .await; - if let Err(e) = release_result { - tracing::error!(error = %e, "取消预约时释放排班名额失败"); + .await + .map_err(|e| HealthError::DbError(format!("取消预约时释放排班名额失败: {}", e)))?; + if release_result.rows_affected == 0 { + tracing::warn!( + doctor_id = %did, + date = %model.appointment_date, + "取消预约时未找到匹配排班记录,可能已被删除" + ); } } } @@ -386,6 +391,15 @@ pub async fn update_schedule( if let Some(ref s) = req.status { validate_schedule_status(s)?; } + // 不允许将 max_appointments 设为小于当前已预约数 + if let Some(new_max) = req.max_appointments { + if new_max < model.current_appointments { + return Err(HealthError::Validation( + format!("max_appointments ({}) 不能小于当前已预约数 ({})", new_max, model.current_appointments) + )); + } + } + let mut active: doctor_schedule::ActiveModel = model.into(); if let Some(v) = req.start_time { active.start_time = Set(v); } if let Some(v) = req.end_time { active.end_time = Set(v); } diff --git a/crates/erp-health/src/service/consultation_service.rs b/crates/erp-health/src/service/consultation_service.rs index ea595d7..fc3a9ca 100644 --- a/crates/erp-health/src/service/consultation_service.rs +++ b/crates/erp-health/src/service/consultation_service.rs @@ -5,7 +5,7 @@ use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::events::DomainEvent; use sea_orm::entity::prelude::*; -use sea_orm::{ActiveValue::Set, Condition, QueryOrder, QuerySelect}; +use sea_orm::{ActiveValue::Set, Condition, QueryOrder, QuerySelect, TransactionTrait}; use uuid::Uuid; use erp_core::error::check_version; @@ -273,6 +273,9 @@ pub async fn create_message( let is_patient = req.sender_role == "patient"; let should_activate = session.status == "waiting"; + // 事务包裹:消息 INSERT + 会话 CAS 更新,保证原子性 + let txn = state.db.begin().await?; + // 创建消息 let active = consultation_message::ActiveModel { id: Set(Uuid::now_v7()), @@ -290,7 +293,7 @@ pub async fn create_message( deleted_at: Set(None), version: Set(1), }; - let m = active.insert(&state.db).await?; + let m = active.insert(&txn).await?; // 更新会话的 last_message_at 和未读计数,waiting→active 自动触发 // 使用 CAS 防止并发发消息时丢失 unread_count 更新 @@ -317,11 +320,14 @@ pub async fn create_message( Expr::col(consultation_session::Column::UnreadCountPatient).add(1), ); } - let cas_result = cas.exec(&state.db).await?; + let cas_result = cas.exec(&txn).await?; if cas_result.rows_affected == 0 { + txn.rollback().await?; return Err(HealthError::VersionMismatch); } + txn.commit().await?; + audit_service::record( AuditLog::new(tenant_id, operator_id, "consultation.message_sent", "consultation") .with_resource_id(m.id), diff --git a/crates/erp-health/src/service/follow_up_service.rs b/crates/erp-health/src/service/follow_up_service.rs index 8c9cba2..30473d0 100644 --- a/crates/erp-health/src/service/follow_up_service.rs +++ b/crates/erp-health/src/service/follow_up_service.rs @@ -367,21 +367,73 @@ pub async fn list_records( Ok(PaginatedResponse { data, total, page, page_size: limit, total_pages }) } -/// 随访任务状态机: pending → in_progress/cancelled, in_progress → completed/cancelled +/// 随访任务状态机(委托给 validation 模块公共函数) fn validate_follow_up_status_transition(current: &str, new_status: &str) -> HealthResult<()> { - if current == new_status { - return Ok(()); - } - let allowed = match current { - "pending" => matches!(new_status, "in_progress" | "cancelled"), - "in_progress" => matches!(new_status, "completed" | "cancelled"), - _ => false, - }; - if allowed { - Ok(()) - } else { - Err(HealthError::InvalidStatusTransition(format!( - "follow_up_task.status: 不允许从 '{}' 转换到 '{}'", current, new_status - ))) + crate::service::validation::validate_follow_up_status_transition(current, new_status) +} + +// --------------------------------------------------------------------------- +// 系统自动化操作(由事件处理器调用) +// --------------------------------------------------------------------------- + +/// 工作流任务完成时自动将随访任务标记为 completed。 +/// 仅当当前状态为 pending 或 in_progress 时才更新,其他状态忽略。 +pub async fn complete_task_by_system( + db: &DatabaseConnection, + task_id: Uuid, + tenant_id: Uuid, +) -> HealthResult<()> { + let model = follow_up_task::Entity::find() + .filter(follow_up_task::Column::Id.eq(task_id)) + .filter(follow_up_task::Column::TenantId.eq(tenant_id)) + .filter(follow_up_task::Column::DeletedAt.is_null()) + .one(db) + .await?; + + match model { + Some(m) if m.status == "pending" || m.status == "in_progress" => { + let mut active: follow_up_task::ActiveModel = m.into(); + active.status = Set("completed".to_string()); + active.updated_at = Set(Utc::now()); + active.version = Set(active.version.unwrap() + 1); + active.update(db).await?; + Ok(()) + } + Some(_) => { + // 非 pending/in_progress 状态,不做任何更新 + Ok(()) + } + None => { + // 随访任务不存在,可能不属于健康模块 + Ok(()) + } } } + +// --------------------------------------------------------------------------- +// 定时任务:逾期随访检查 +// --------------------------------------------------------------------------- + +/// 批量将 planned_date < 今天 且 status = pending 的随访任务标记为 overdue。 +/// 返回受影响的行数。 +pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult { + use sea_orm::QueryFilter; + + let today = chrono::Utc::now().date_naive(); + let result = follow_up_task::Entity::update_many() + .col_expr( + follow_up_task::Column::Status, + sea_orm::sea_query::Expr::value("overdue".to_string()), + ) + .col_expr( + follow_up_task::Column::UpdatedAt, + sea_orm::sea_query::Expr::value(chrono::Utc::now()), + ) + .filter(follow_up_task::Column::Status.eq("pending")) + .filter(follow_up_task::Column::PlannedDate.lt(today)) + .filter(follow_up_task::Column::DeletedAt.is_null()) + .exec(db) + .await?; + + Ok(result.rows_affected) +} diff --git a/crates/erp-health/src/service/validation.rs b/crates/erp-health/src/service/validation.rs index 845a04e..825e878 100644 --- a/crates/erp-health/src/service/validation.rs +++ b/crates/erp-health/src/service/validation.rs @@ -130,3 +130,24 @@ pub fn validate_online_status(value: &str) -> HealthResult<()> { validate_enum!(value, "online_status", ["online", "offline", "busy"]); Ok(()) } + +/// follow_up_task.status 状态转换(含 overdue 状态) +pub fn validate_follow_up_status_transition(current: &str, new: &str) -> HealthResult<()> { + if current == new { + return Ok(()); + } + let allowed = match current { + "pending" => matches!(new, "in_progress" | "cancelled" | "overdue"), + "in_progress" => matches!(new, "completed" | "cancelled"), + "overdue" => matches!(new, "in_progress" | "cancelled"), + _ => false, + }; + if allowed { + Ok(()) + } else { + Err(HealthError::InvalidStatusTransition(format!( + "follow_up_task.status: 不允许从 '{}' 转换到 '{}'", + current, new + ))) + } +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 4ab5e3b..868ad7e 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -389,6 +389,10 @@ async fn main() -> anyhow::Result<()> { erp_workflow::WorkflowModule::start_timeout_checker(db.clone()); tracing::info!("Timeout checker started"); + // Start follow-up overdue checker (every 6 hours) + erp_health::HealthModule::start_overdue_checker(db.clone()); + tracing::info!("Follow-up overdue checker started"); + let host = config.server.host.clone(); let port = config.server.port;