From b9e794d7010c7de19522556e5be599313c933629 Mon Sep 17 00:00:00 2001 From: iven Date: Sat, 25 Apr 2026 19:30:02 +0800 Subject: [PATCH] =?UTF-8?q?fix(health):=20P0=20=E5=85=B3=E9=94=AE=E7=83=AD?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20=E2=80=94=207=20=E9=A1=B9=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AE=8C=E6=95=B4=E6=80=A7=E5=92=8C=E5=AE=89=E5=85=A8?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 逾期随访检查器:on_startup 现在启动定时器 + 立即执行一次 - 积分并发:earn_points 使用数据库级 CAS 替代无效的 check_version - 签到奖励:check_streak_bonus 包裹在事务中 + CAS 保护 - 活动报名:register_event 包裹在事务中 + CAS 防超员 - 咨询消息:强制 sender_id 为认证用户,防止冒充 - 逾期更新:check_overdue_tasks 现在递增 version 字段 - 趋势生成:添加 365 天范围上限,防止内存溢出 --- crates/erp-health/src/module.rs | 15 +++ .../src/service/consultation_service.rs | 7 +- .../src/service/follow_up_service.rs | 4 + .../erp-health/src/service/points_service.rs | 115 ++++++++++++++---- .../erp-health/src/service/trend_service.rs | 7 ++ 5 files changed, 122 insertions(+), 26 deletions(-) diff --git a/crates/erp-health/src/module.rs b/crates/erp-health/src/module.rs index 4b4e2a6..e50e877 100644 --- a/crates/erp-health/src/module.rs +++ b/crates/erp-health/src/module.rs @@ -414,6 +414,21 @@ impl ErpModule for HealthModule { crate::event::register_handlers_with_state(state); tracing::info!(module = "health", "Health module event handlers registered via on_startup"); + + // 启动逾期随访检查器(立即执行一次 + 每 6 小时重复) + { + let db = ctx.db.clone(); + tokio::spawn(async move { + match crate::service::follow_up_service::check_overdue_tasks(&db).await { + Ok(count) if count > 0 => tracing::info!(count = count, "启动时逾期随访检查完成"), + Ok(_) => tracing::info!("启动时逾期随访检查完成(无逾期任务)"), + Err(e) => tracing::warn!(error = %e, "启动时逾期随访检查失败"), + } + }); + } + let _overdue_handle = Self::start_overdue_checker(ctx.db.clone()); + tracing::info!(module = "health", "Overdue follow-up checker started"); + Ok(()) } diff --git a/crates/erp-health/src/service/consultation_service.rs b/crates/erp-health/src/service/consultation_service.rs index a6da53c..0917102 100644 --- a/crates/erp-health/src/service/consultation_service.rs +++ b/crates/erp-health/src/service/consultation_service.rs @@ -285,6 +285,11 @@ pub async fn create_message( let is_patient = req.sender_role == "patient"; let should_activate = session.status == "waiting"; + // 强制 sender_id 为认证用户,防止冒充 + let sender_id = operator_id.ok_or_else(|| { + HealthError::Validation("sender_id 必须与认证用户匹配".into()) + })?; + // 事务包裹:消息 INSERT + 会话 CAS 更新,保证原子性 let txn = state.db.begin().await?; @@ -293,7 +298,7 @@ pub async fn create_message( id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), session_id: Set(req.session_id), - sender_id: Set(req.sender_id), + sender_id: Set(sender_id), sender_role: Set(req.sender_role), content_type: Set(content_type), content: Set(req.content), diff --git a/crates/erp-health/src/service/follow_up_service.rs b/crates/erp-health/src/service/follow_up_service.rs index 8d45230..c8666bd 100644 --- a/crates/erp-health/src/service/follow_up_service.rs +++ b/crates/erp-health/src/service/follow_up_service.rs @@ -445,6 +445,10 @@ pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult { follow_up_task::Column::UpdatedAt, sea_orm::sea_query::Expr::value(chrono::Utc::now()), ) + .col_expr( + follow_up_task::Column::Version, + sea_orm::sea_query::Expr::col(follow_up_task::Column::Version).add(1), + ) .filter(follow_up_task::Column::Status.eq("pending")) .filter(follow_up_task::Column::PlannedDate.lt(today)) .filter(follow_up_task::Column::DeletedAt.is_null()) diff --git a/crates/erp-health/src/service/points_service.rs b/crates/erp-health/src/service/points_service.rs index eef6a81..1c2dbf9 100644 --- a/crates/erp-health/src/service/points_service.rs +++ b/crates/erp-health/src/service/points_service.rs @@ -121,8 +121,8 @@ pub async fn earn_points( // 3. 在事务中执行积分获取 let txn = state.db.begin().await?; let acc = get_or_create_account(&txn, tenant_id, patient_id).await?; - let next_ver = check_version(acc.version, acc.version).unwrap_or(acc.version + 1); + // 使用数据库级 CAS 防止并发赚取导致余额丢失 let now = Utc::now(); let expires_at = now + Duration::days(365); // 12 个月过期 @@ -149,14 +149,31 @@ pub async fn earn_points( }; let inserted = txn_record.insert(&txn).await?; - // 更新账户余额 - let mut acc_active: points_account::ActiveModel = acc.into(); - acc_active.balance = Set(acc_active.balance.unwrap() + rule.points_value); - acc_active.total_earned = Set(acc_active.total_earned.unwrap() + rule.points_value); - acc_active.updated_at = Set(now); - acc_active.updated_by = Set(operator_id); - acc_active.version = Set(next_ver); - acc_active.update(&txn).await?; + // CAS 更新账户余额:基于 version 字段防止并发覆盖 + use sea_orm::sea_query::Expr; + let cas_result = points_account::Entity::update_many() + .col_expr( + points_account::Column::Balance, + Expr::col(points_account::Column::Balance).add(rule.points_value), + ) + .col_expr( + points_account::Column::TotalEarned, + Expr::col(points_account::Column::TotalEarned).add(rule.points_value), + ) + .col_expr(points_account::Column::UpdatedAt, Expr::value(now)) + .col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id)) + .col_expr( + points_account::Column::Version, + Expr::col(points_account::Column::Version).add(1), + ) + .filter(points_account::Column::Id.eq(acc.id)) + .filter(points_account::Column::Version.eq(acc.version)) + .exec(&txn) + .await?; + if cas_result.rows_affected == 0 { + txn.rollback().await?; + return Err(HealthError::VersionMismatch); + } txn.commit().await?; @@ -269,8 +286,9 @@ async fn check_streak_bonus( bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_30d_bonus").await?; } if bonus > 0 { - // 额外奖励通过事件系统 - let acc = get_or_create_account(&state.db, tenant_id, patient_id).await?; + // 额外奖励:在事务中执行流水写入 + 账户更新 + let txn = state.db.begin().await?; + let acc = get_or_create_account(&txn, tenant_id, patient_id).await?; let now = Utc::now(); let txn_record = points_transaction::ActiveModel { id: Set(Uuid::now_v7()), @@ -292,13 +310,35 @@ async fn check_streak_bonus( deleted_at: Set(None), version: Set(1), }; - txn_record.insert(&state.db).await?; - let mut acc_active: points_account::ActiveModel = acc.into(); - acc_active.balance = Set(acc_active.balance.unwrap() + bonus); - acc_active.total_earned = Set(acc_active.total_earned.unwrap() + bonus); - acc_active.updated_at = Set(now); - acc_active.version = Set(acc_active.version.unwrap() + 1); - acc_active.update(&state.db).await?; + txn_record.insert(&txn).await?; + + // CAS 更新账户 + use sea_orm::sea_query::Expr; + let cas_result = points_account::Entity::update_many() + .col_expr( + points_account::Column::Balance, + Expr::col(points_account::Column::Balance).add(bonus), + ) + .col_expr( + points_account::Column::TotalEarned, + Expr::col(points_account::Column::TotalEarned).add(bonus), + ) + .col_expr(points_account::Column::UpdatedAt, Expr::value(now)) + .col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id)) + .col_expr( + points_account::Column::Version, + Expr::col(points_account::Column::Version).add(1), + ) + .filter(points_account::Column::Id.eq(acc.id)) + .filter(points_account::Column::Version.eq(acc.version)) + .exec(&txn) + .await?; + if cas_result.rows_affected == 0 { + txn.rollback().await?; + return Err(HealthError::VersionMismatch); + } + + txn.commit().await?; } Ok(bonus) } @@ -868,6 +908,10 @@ pub async fn register_event( } let now = Utc::now(); + + // 在事务中执行报名 + 参与人数 CAS 更新 + let txn = state.db.begin().await?; + let reg = offline_event_registration::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), @@ -884,14 +928,35 @@ pub async fn register_event( deleted_at: Set(None), version: Set(1), }; - reg.insert(&state.db).await?; + reg.insert(&txn).await?; - // 更新参与人数 - let mut event_active: offline_event::ActiveModel = event.into(); - event_active.current_participants = Set(event_active.current_participants.unwrap() + 1); - event_active.updated_at = Set(now); - event_active.version = Set(event_active.version.unwrap() + 1); - event_active.update(&state.db).await?; + // CAS 更新参与人数:防止并发超出 max_participants + use sea_orm::sea_query::Expr; + let mut cas = offline_event::Entity::update_many() + .col_expr( + offline_event::Column::CurrentParticipants, + Expr::col(offline_event::Column::CurrentParticipants).add(1), + ) + .col_expr(offline_event::Column::UpdatedAt, Expr::value(now)) + .col_expr( + offline_event::Column::Version, + Expr::col(offline_event::Column::Version).add(1), + ) + .filter(offline_event::Column::Id.eq(event_id)) + .filter(offline_event::Column::TenantId.eq(tenant_id)) + .filter(offline_event::Column::Version.eq(event.version)); + + if event.max_participants > 0 { + cas = cas.filter(offline_event::Column::CurrentParticipants.lt(event.max_participants)); + } + + let cas_result = cas.exec(&txn).await?; + if cas_result.rows_affected == 0 { + txn.rollback().await?; + return Err(HealthError::Validation("活动报名已满或版本冲突,请重试".into())); + } + + txn.commit().await?; Ok(()) } diff --git a/crates/erp-health/src/service/trend_service.rs b/crates/erp-health/src/service/trend_service.rs index 7ab78c7..2ce542a 100644 --- a/crates/erp-health/src/service/trend_service.rs +++ b/crates/erp-health/src/service/trend_service.rs @@ -6,6 +6,7 @@ use sea_orm::entity::prelude::*; use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect}; use uuid::Uuid; +use crate::error::HealthError; use erp_core::types::PaginatedResponse; use crate::dto::health_data_dto::*; @@ -59,6 +60,12 @@ pub async fn generate_trend( period_start: chrono::NaiveDate, period_end: chrono::NaiveDate, ) -> HealthResult { + // 限制日期范围,防止加载过多数据导致内存溢出 + let max_span = chrono::TimeDelta::days(365); + if (period_end - period_start) > max_span { + return Err(HealthError::Validation("趋势生成范围不能超过 365 天".to_string())); + } + // 汇总该时间段内的体征数据 let vitals = vital_signs::Entity::find() .filter(vital_signs::Column::TenantId.eq(tenant_id))