feat(health): 事件驱动集成 + 数据一致性修复 + 逾期随访检查

- event.rs 重写为有状态处理器(订阅 workflow.task.completed / message.sent)
- module.rs on_startup 初始化 HealthCrypto 并注册事件处理器
- consultation_service 消息发送改为事务包裹(INSERT + CAS 原子更新)
- appointment_service 取消预约释放排班名额增加下限保护
- appointment_service update_schedule 增加 max_appointments >= current_appointments 校验
- follow_up_service 新增 complete_task_by_system 和 check_overdue_tasks
- validation.rs 随访状态机增加 overdue 状态支持
- main.rs 启动时运行逾期随访检查后台任务
This commit is contained in:
iven
2026-04-25 00:30:32 +08:00
parent 6c70e2a783
commit 43e127d4f7
7 changed files with 202 additions and 35 deletions

View File

@@ -1,17 +1,52 @@
use erp_core::events::EventBus;
pub fn register_handlers(bus: &EventBus) {
// workflow.task.completed → 更新随访任务状态
let (mut workflow_rx, _wf_handle) = bus.subscribe_filtered("workflow.task.".to_string());
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup
pub fn register_handlers(_bus: &EventBus) {
// 事件处理器已迁移到 on_startup → register_handlers_with_state
}
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
pub fn register_handlers_with_state(state: crate::state::HealthState) {
// workflow.task.completed → 更新随访任务状态为 completed
let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
let db = state.db.clone();
tokio::spawn(async move {
loop {
match workflow_rx.recv().await {
Some(event) if event.event_type == "workflow.task.completed" => {
tracing::info!(
event_id = %event.id,
"健康模块收到工作流任务完成事件"
);
// 后续可通过 db 连接更新 follow_up_task 状态
// 从 payload 中提取 task_id
let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok());
match task_id {
Some(task_id) => {
match crate::service::follow_up_service::complete_task_by_system(
&db, task_id, event.tenant_id,
)
.await
{
Ok(()) => {
tracing::info!(
event_id = %event.id,
task_id = %task_id,
"工作流任务完成 → 随访任务已更新"
);
}
Err(e) => {
tracing::warn!(
event_id = %event.id,
task_id = %task_id,
error = %e,
"工作流任务完成 → 随访任务更新失败"
);
}
}
}
None => {
tracing::warn!(
event_id = %event.id,
"工作流任务完成事件缺少 task_id跳过"
);
}
}
}
Some(_) => {}
None => break,
@@ -19,17 +54,16 @@ pub fn register_handlers(bus: &EventBus) {
}
});
// message.sent → 联动咨询会话 last_message_at
let (mut msg_rx, _msg_handle) = bus.subscribe_filtered("message.".to_string());
// message.sent → 预留:后续联动咨询会话 last_message_at
let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string());
tokio::spawn(async move {
loop {
match msg_rx.recv().await {
Some(event) if event.event_type == "message.sent" => {
tracing::info!(
event_id = %event.id,
"健康模块收到消息发送事件"
"健康模块收到消息发送事件(暂不处理)"
);
// 后续可通过 db 连接更新 consultation_session.last_message_at
}
Some(_) => {}
None => break,

View File

@@ -17,6 +17,21 @@ impl HealthModule {
Self
}
/// 启动定时逾期随访检查(每 6 小时运行一次)
pub fn start_overdue_checker(db: sea_orm::DatabaseConnection) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(6 * 3600));
loop {
interval.tick().await;
match crate::service::follow_up_service::check_overdue_tasks(&db).await {
Ok(count) if count > 0 => tracing::info!(count = count, "随访逾期检查完成"),
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "随访逾期检查失败"),
}
}
});
}
pub fn public_routes<S>() -> Router<S>
where
crate::state::HealthState: axum::extract::FromRef<S>,
@@ -235,8 +250,29 @@ impl ErpModule for HealthModule {
vec!["auth"]
}
fn register_event_handlers(&self, bus: &EventBus) {
crate::event::register_handlers(bus);
fn register_event_handlers(&self, _bus: &EventBus) {
// 事件处理器已迁移到 on_startup此处保留空实现以兼容 trait 签名
}
async fn on_startup(&self, ctx: &erp_core::module::ModuleContext) -> erp_core::error::AppResult<()> {
let crypto = crate::crypto::HealthCrypto::from_keys(
&std::env::var("HEALTH_AES_KEY").unwrap_or_default(),
&std::env::var("HEALTH_HMAC_KEY").unwrap_or_default(),
)
.unwrap_or_else(|_| {
tracing::warn!("HEALTH_AES_KEY / HEALTH_HMAC_KEY 未设置或无效,使用开发默认密钥");
crate::crypto::HealthCrypto::dev_default()
});
let state = crate::state::HealthState {
db: ctx.db.clone(),
event_bus: ctx.event_bus.clone(),
crypto,
};
crate::event::register_handlers_with_state(state);
tracing::info!(module = "health", "Health module event handlers registered via on_startup");
Ok(())
}
async fn on_tenant_created(

View File

@@ -233,9 +233,14 @@ pub async fn update_appointment_status(
.filter(doctor_schedule::Column::DeletedAt.is_null())
.filter(Expr::col(doctor_schedule::Column::CurrentAppointments).gt(0))
.exec(&txn)
.await;
if let Err(e) = release_result {
tracing::error!(error = %e, "取消预约时释放排班名额失败");
.await
.map_err(|e| HealthError::DbError(format!("取消预约时释放排班名额失败: {}", e)))?;
if release_result.rows_affected == 0 {
tracing::warn!(
doctor_id = %did,
date = %model.appointment_date,
"取消预约时未找到匹配排班记录,可能已被删除"
);
}
}
}
@@ -386,6 +391,15 @@ pub async fn update_schedule(
if let Some(ref s) = req.status { validate_schedule_status(s)?; }
// 不允许将 max_appointments 设为小于当前已预约数
if let Some(new_max) = req.max_appointments {
if new_max < model.current_appointments {
return Err(HealthError::Validation(
format!("max_appointments ({}) 不能小于当前已预约数 ({})", new_max, model.current_appointments)
));
}
}
let mut active: doctor_schedule::ActiveModel = model.into();
if let Some(v) = req.start_time { active.start_time = Set(v); }
if let Some(v) = req.end_time { active.end_time = Set(v); }

View File

@@ -5,7 +5,7 @@ use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::DomainEvent;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, Condition, QueryOrder, QuerySelect};
use sea_orm::{ActiveValue::Set, Condition, QueryOrder, QuerySelect, TransactionTrait};
use uuid::Uuid;
use erp_core::error::check_version;
@@ -273,6 +273,9 @@ pub async fn create_message(
let is_patient = req.sender_role == "patient";
let should_activate = session.status == "waiting";
// 事务包裹:消息 INSERT + 会话 CAS 更新,保证原子性
let txn = state.db.begin().await?;
// 创建消息
let active = consultation_message::ActiveModel {
id: Set(Uuid::now_v7()),
@@ -290,7 +293,7 @@ pub async fn create_message(
deleted_at: Set(None),
version: Set(1),
};
let m = active.insert(&state.db).await?;
let m = active.insert(&txn).await?;
// 更新会话的 last_message_at 和未读计数waiting→active 自动触发
// 使用 CAS 防止并发发消息时丢失 unread_count 更新
@@ -317,11 +320,14 @@ pub async fn create_message(
Expr::col(consultation_session::Column::UnreadCountPatient).add(1),
);
}
let cas_result = cas.exec(&state.db).await?;
let cas_result = cas.exec(&txn).await?;
if cas_result.rows_affected == 0 {
txn.rollback().await?;
return Err(HealthError::VersionMismatch);
}
txn.commit().await?;
audit_service::record(
AuditLog::new(tenant_id, operator_id, "consultation.message_sent", "consultation")
.with_resource_id(m.id),

View File

@@ -367,21 +367,73 @@ pub async fn list_records(
Ok(PaginatedResponse { data, total, page, page_size: limit, total_pages })
}
/// 随访任务状态机: pending → in_progress/cancelled, in_progress → completed/cancelled
/// 随访任务状态机(委托给 validation 模块公共函数)
fn validate_follow_up_status_transition(current: &str, new_status: &str) -> HealthResult<()> {
if current == new_status {
return Ok(());
}
let allowed = match current {
"pending" => matches!(new_status, "in_progress" | "cancelled"),
"in_progress" => matches!(new_status, "completed" | "cancelled"),
_ => false,
};
if allowed {
Ok(())
} else {
Err(HealthError::InvalidStatusTransition(format!(
"follow_up_task.status: 不允许从 '{}' 转换到 '{}'", current, new_status
)))
crate::service::validation::validate_follow_up_status_transition(current, new_status)
}
// ---------------------------------------------------------------------------
// 系统自动化操作(由事件处理器调用)
// ---------------------------------------------------------------------------
/// 工作流任务完成时自动将随访任务标记为 completed。
/// 仅当当前状态为 pending 或 in_progress 时才更新,其他状态忽略。
pub async fn complete_task_by_system(
db: &DatabaseConnection,
task_id: Uuid,
tenant_id: Uuid,
) -> HealthResult<()> {
let model = follow_up_task::Entity::find()
.filter(follow_up_task::Column::Id.eq(task_id))
.filter(follow_up_task::Column::TenantId.eq(tenant_id))
.filter(follow_up_task::Column::DeletedAt.is_null())
.one(db)
.await?;
match model {
Some(m) if m.status == "pending" || m.status == "in_progress" => {
let mut active: follow_up_task::ActiveModel = m.into();
active.status = Set("completed".to_string());
active.updated_at = Set(Utc::now());
active.version = Set(active.version.unwrap() + 1);
active.update(db).await?;
Ok(())
}
Some(_) => {
// 非 pending/in_progress 状态,不做任何更新
Ok(())
}
None => {
// 随访任务不存在,可能不属于健康模块
Ok(())
}
}
}
// ---------------------------------------------------------------------------
// 定时任务:逾期随访检查
// ---------------------------------------------------------------------------
/// 批量将 planned_date < 今天 且 status = pending 的随访任务标记为 overdue。
/// 返回受影响的行数。
pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult<u64> {
use sea_orm::QueryFilter;
let today = chrono::Utc::now().date_naive();
let result = follow_up_task::Entity::update_many()
.col_expr(
follow_up_task::Column::Status,
sea_orm::sea_query::Expr::value("overdue".to_string()),
)
.col_expr(
follow_up_task::Column::UpdatedAt,
sea_orm::sea_query::Expr::value(chrono::Utc::now()),
)
.filter(follow_up_task::Column::Status.eq("pending"))
.filter(follow_up_task::Column::PlannedDate.lt(today))
.filter(follow_up_task::Column::DeletedAt.is_null())
.exec(db)
.await?;
Ok(result.rows_affected)
}

View File

@@ -130,3 +130,24 @@ pub fn validate_online_status(value: &str) -> HealthResult<()> {
validate_enum!(value, "online_status", ["online", "offline", "busy"]);
Ok(())
}
/// follow_up_task.status 状态转换(含 overdue 状态)
pub fn validate_follow_up_status_transition(current: &str, new: &str) -> HealthResult<()> {
if current == new {
return Ok(());
}
let allowed = match current {
"pending" => matches!(new, "in_progress" | "cancelled" | "overdue"),
"in_progress" => matches!(new, "completed" | "cancelled"),
"overdue" => matches!(new, "in_progress" | "cancelled"),
_ => false,
};
if allowed {
Ok(())
} else {
Err(HealthError::InvalidStatusTransition(format!(
"follow_up_task.status: 不允许从 '{}' 转换到 '{}'",
current, new
)))
}
}

View File

@@ -389,6 +389,10 @@ async fn main() -> anyhow::Result<()> {
erp_workflow::WorkflowModule::start_timeout_checker(db.clone());
tracing::info!("Timeout checker started");
// Start follow-up overdue checker (every 6 hours)
erp_health::HealthModule::start_overdue_checker(db.clone());
tracing::info!("Follow-up overdue checker started");
let host = config.server.host.clone();
let port = config.server.port;