fix(health): P0 关键热修复 — 7 项数据完整性和安全问题
- 逾期随访检查器:on_startup 现在启动定时器 + 立即执行一次 - 积分并发:earn_points 使用数据库级 CAS 替代无效的 check_version - 签到奖励:check_streak_bonus 包裹在事务中 + CAS 保护 - 活动报名:register_event 包裹在事务中 + CAS 防超员 - 咨询消息:强制 sender_id 为认证用户,防止冒充 - 逾期更新:check_overdue_tasks 现在递增 version 字段 - 趋势生成:添加 365 天范围上限,防止内存溢出
This commit is contained in:
@@ -414,6 +414,21 @@ impl ErpModule for HealthModule {
|
||||
|
||||
crate::event::register_handlers_with_state(state);
|
||||
tracing::info!(module = "health", "Health module event handlers registered via on_startup");
|
||||
|
||||
// 启动逾期随访检查器(立即执行一次 + 每 6 小时重复)
|
||||
{
|
||||
let db = ctx.db.clone();
|
||||
tokio::spawn(async move {
|
||||
match crate::service::follow_up_service::check_overdue_tasks(&db).await {
|
||||
Ok(count) if count > 0 => tracing::info!(count = count, "启动时逾期随访检查完成"),
|
||||
Ok(_) => tracing::info!("启动时逾期随访检查完成(无逾期任务)"),
|
||||
Err(e) => tracing::warn!(error = %e, "启动时逾期随访检查失败"),
|
||||
}
|
||||
});
|
||||
}
|
||||
let _overdue_handle = Self::start_overdue_checker(ctx.db.clone());
|
||||
tracing::info!(module = "health", "Overdue follow-up checker started");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -285,6 +285,11 @@ pub async fn create_message(
|
||||
let is_patient = req.sender_role == "patient";
|
||||
let should_activate = session.status == "waiting";
|
||||
|
||||
// 强制 sender_id 为认证用户,防止冒充
|
||||
let sender_id = operator_id.ok_or_else(|| {
|
||||
HealthError::Validation("sender_id 必须与认证用户匹配".into())
|
||||
})?;
|
||||
|
||||
// 事务包裹:消息 INSERT + 会话 CAS 更新,保证原子性
|
||||
let txn = state.db.begin().await?;
|
||||
|
||||
@@ -293,7 +298,7 @@ pub async fn create_message(
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
session_id: Set(req.session_id),
|
||||
sender_id: Set(req.sender_id),
|
||||
sender_id: Set(sender_id),
|
||||
sender_role: Set(req.sender_role),
|
||||
content_type: Set(content_type),
|
||||
content: Set(req.content),
|
||||
|
||||
@@ -445,6 +445,10 @@ pub async fn check_overdue_tasks(db: &DatabaseConnection) -> HealthResult<u64> {
|
||||
follow_up_task::Column::UpdatedAt,
|
||||
sea_orm::sea_query::Expr::value(chrono::Utc::now()),
|
||||
)
|
||||
.col_expr(
|
||||
follow_up_task::Column::Version,
|
||||
sea_orm::sea_query::Expr::col(follow_up_task::Column::Version).add(1),
|
||||
)
|
||||
.filter(follow_up_task::Column::Status.eq("pending"))
|
||||
.filter(follow_up_task::Column::PlannedDate.lt(today))
|
||||
.filter(follow_up_task::Column::DeletedAt.is_null())
|
||||
|
||||
@@ -121,8 +121,8 @@ pub async fn earn_points(
|
||||
// 3. 在事务中执行积分获取
|
||||
let txn = state.db.begin().await?;
|
||||
let acc = get_or_create_account(&txn, tenant_id, patient_id).await?;
|
||||
let next_ver = check_version(acc.version, acc.version).unwrap_or(acc.version + 1);
|
||||
|
||||
// 使用数据库级 CAS 防止并发赚取导致余额丢失
|
||||
let now = Utc::now();
|
||||
let expires_at = now + Duration::days(365); // 12 个月过期
|
||||
|
||||
@@ -149,14 +149,31 @@ pub async fn earn_points(
|
||||
};
|
||||
let inserted = txn_record.insert(&txn).await?;
|
||||
|
||||
// 更新账户余额
|
||||
let mut acc_active: points_account::ActiveModel = acc.into();
|
||||
acc_active.balance = Set(acc_active.balance.unwrap() + rule.points_value);
|
||||
acc_active.total_earned = Set(acc_active.total_earned.unwrap() + rule.points_value);
|
||||
acc_active.updated_at = Set(now);
|
||||
acc_active.updated_by = Set(operator_id);
|
||||
acc_active.version = Set(next_ver);
|
||||
acc_active.update(&txn).await?;
|
||||
// CAS 更新账户余额:基于 version 字段防止并发覆盖
|
||||
use sea_orm::sea_query::Expr;
|
||||
let cas_result = points_account::Entity::update_many()
|
||||
.col_expr(
|
||||
points_account::Column::Balance,
|
||||
Expr::col(points_account::Column::Balance).add(rule.points_value),
|
||||
)
|
||||
.col_expr(
|
||||
points_account::Column::TotalEarned,
|
||||
Expr::col(points_account::Column::TotalEarned).add(rule.points_value),
|
||||
)
|
||||
.col_expr(points_account::Column::UpdatedAt, Expr::value(now))
|
||||
.col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id))
|
||||
.col_expr(
|
||||
points_account::Column::Version,
|
||||
Expr::col(points_account::Column::Version).add(1),
|
||||
)
|
||||
.filter(points_account::Column::Id.eq(acc.id))
|
||||
.filter(points_account::Column::Version.eq(acc.version))
|
||||
.exec(&txn)
|
||||
.await?;
|
||||
if cas_result.rows_affected == 0 {
|
||||
txn.rollback().await?;
|
||||
return Err(HealthError::VersionMismatch);
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
@@ -269,8 +286,9 @@ async fn check_streak_bonus(
|
||||
bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_30d_bonus").await?;
|
||||
}
|
||||
if bonus > 0 {
|
||||
// 额外奖励通过事件系统
|
||||
let acc = get_or_create_account(&state.db, tenant_id, patient_id).await?;
|
||||
// 额外奖励:在事务中执行流水写入 + 账户更新
|
||||
let txn = state.db.begin().await?;
|
||||
let acc = get_or_create_account(&txn, tenant_id, patient_id).await?;
|
||||
let now = Utc::now();
|
||||
let txn_record = points_transaction::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
@@ -292,13 +310,35 @@ async fn check_streak_bonus(
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
txn_record.insert(&state.db).await?;
|
||||
let mut acc_active: points_account::ActiveModel = acc.into();
|
||||
acc_active.balance = Set(acc_active.balance.unwrap() + bonus);
|
||||
acc_active.total_earned = Set(acc_active.total_earned.unwrap() + bonus);
|
||||
acc_active.updated_at = Set(now);
|
||||
acc_active.version = Set(acc_active.version.unwrap() + 1);
|
||||
acc_active.update(&state.db).await?;
|
||||
txn_record.insert(&txn).await?;
|
||||
|
||||
// CAS 更新账户
|
||||
use sea_orm::sea_query::Expr;
|
||||
let cas_result = points_account::Entity::update_many()
|
||||
.col_expr(
|
||||
points_account::Column::Balance,
|
||||
Expr::col(points_account::Column::Balance).add(bonus),
|
||||
)
|
||||
.col_expr(
|
||||
points_account::Column::TotalEarned,
|
||||
Expr::col(points_account::Column::TotalEarned).add(bonus),
|
||||
)
|
||||
.col_expr(points_account::Column::UpdatedAt, Expr::value(now))
|
||||
.col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id))
|
||||
.col_expr(
|
||||
points_account::Column::Version,
|
||||
Expr::col(points_account::Column::Version).add(1),
|
||||
)
|
||||
.filter(points_account::Column::Id.eq(acc.id))
|
||||
.filter(points_account::Column::Version.eq(acc.version))
|
||||
.exec(&txn)
|
||||
.await?;
|
||||
if cas_result.rows_affected == 0 {
|
||||
txn.rollback().await?;
|
||||
return Err(HealthError::VersionMismatch);
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
}
|
||||
Ok(bonus)
|
||||
}
|
||||
@@ -868,6 +908,10 @@ pub async fn register_event(
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
// 在事务中执行报名 + 参与人数 CAS 更新
|
||||
let txn = state.db.begin().await?;
|
||||
|
||||
let reg = offline_event_registration::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
@@ -884,14 +928,35 @@ pub async fn register_event(
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
reg.insert(&state.db).await?;
|
||||
reg.insert(&txn).await?;
|
||||
|
||||
// 更新参与人数
|
||||
let mut event_active: offline_event::ActiveModel = event.into();
|
||||
event_active.current_participants = Set(event_active.current_participants.unwrap() + 1);
|
||||
event_active.updated_at = Set(now);
|
||||
event_active.version = Set(event_active.version.unwrap() + 1);
|
||||
event_active.update(&state.db).await?;
|
||||
// CAS 更新参与人数:防止并发超出 max_participants
|
||||
use sea_orm::sea_query::Expr;
|
||||
let mut cas = offline_event::Entity::update_many()
|
||||
.col_expr(
|
||||
offline_event::Column::CurrentParticipants,
|
||||
Expr::col(offline_event::Column::CurrentParticipants).add(1),
|
||||
)
|
||||
.col_expr(offline_event::Column::UpdatedAt, Expr::value(now))
|
||||
.col_expr(
|
||||
offline_event::Column::Version,
|
||||
Expr::col(offline_event::Column::Version).add(1),
|
||||
)
|
||||
.filter(offline_event::Column::Id.eq(event_id))
|
||||
.filter(offline_event::Column::TenantId.eq(tenant_id))
|
||||
.filter(offline_event::Column::Version.eq(event.version));
|
||||
|
||||
if event.max_participants > 0 {
|
||||
cas = cas.filter(offline_event::Column::CurrentParticipants.lt(event.max_participants));
|
||||
}
|
||||
|
||||
let cas_result = cas.exec(&txn).await?;
|
||||
if cas_result.rows_affected == 0 {
|
||||
txn.rollback().await?;
|
||||
return Err(HealthError::Validation("活动报名已满或版本冲突,请重试".into()));
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use sea_orm::entity::prelude::*;
|
||||
use sea_orm::{ActiveValue::Set, QueryOrder, QuerySelect};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::HealthError;
|
||||
use erp_core::types::PaginatedResponse;
|
||||
|
||||
use crate::dto::health_data_dto::*;
|
||||
@@ -59,6 +60,12 @@ pub async fn generate_trend(
|
||||
period_start: chrono::NaiveDate,
|
||||
period_end: chrono::NaiveDate,
|
||||
) -> HealthResult<TrendResp> {
|
||||
// 限制日期范围,防止加载过多数据导致内存溢出
|
||||
let max_span = chrono::TimeDelta::days(365);
|
||||
if (period_end - period_start) > max_span {
|
||||
return Err(HealthError::Validation("趋势生成范围不能超过 365 天".to_string()));
|
||||
}
|
||||
|
||||
// 汇总该时间段内的体征数据
|
||||
let vitals = vital_signs::Entity::find()
|
||||
.filter(vital_signs::Column::TenantId.eq(tenant_id))
|
||||
|
||||
Reference in New Issue
Block a user