fix(health): 穷尽审计修复 — 3 CRITICAL + 3 HIGH + 2 MEDIUM
CRITICAL: - earn_points 日上限检查用 patient_id 比对 account_id 字段,上限永远不会触发 - verify_order 用 check_version(v, v) 旁路乐观锁,并发核销可能重复 - admin_checkin_event 同样的乐观锁旁路 HIGH: - FIFO 消费循环改用数据库级 CAS 替代应用层 update_many - 兑换流程账户余额/库存扣减全部改用 CAS 防并发超卖 - verify_order 改用 update_many + version filter 的原子操作 MEDIUM: - points_checkin entity 补全 updated_at/updated_by/deleted_at/version 字段 - 新增迁移 m20260425_000055 添加列 - daily_checkin 打卡记录+积分获取+阶梯奖励合并为同一事务 - 删除废弃的 check_streak_bonus 独立函数(被 check_streak_bonus_in_txn 替代)
This commit is contained in:
@@ -11,6 +11,11 @@ pub struct Model {
|
||||
pub checkin_date: chrono::NaiveDate,
|
||||
pub consecutive_days: i32,
|
||||
pub created_at: DateTimeUtc,
|
||||
pub updated_at: DateTimeUtc,
|
||||
pub created_by: Option<Uuid>,
|
||||
pub updated_by: Option<Uuid>,
|
||||
pub deleted_at: Option<DateTimeUtc>,
|
||||
pub version: i32,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
|
||||
@@ -221,6 +221,7 @@ pub async fn daily_checkin(
|
||||
.filter(points_checkin::Column::TenantId.eq(tenant_id))
|
||||
.filter(points_checkin::Column::PatientId.eq(patient_id))
|
||||
.filter(points_checkin::Column::CheckinDate.eq(today))
|
||||
.filter(points_checkin::Column::DeletedAt.is_null())
|
||||
.one(&state.db)
|
||||
.await?;
|
||||
|
||||
@@ -236,22 +237,32 @@ pub async fn daily_checkin(
|
||||
// 计算连续天数
|
||||
let consecutive = compute_consecutive_days(&state.db, tenant_id, patient_id, today).await? + 1;
|
||||
|
||||
// 写入打卡记录
|
||||
// 事务:写入打卡记录 + 积分获取 + 阶梯奖励
|
||||
let txn = state.db.begin().await?;
|
||||
|
||||
let now = Utc::now();
|
||||
let active = points_checkin::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
patient_id: Set(patient_id),
|
||||
checkin_date: Set(today),
|
||||
consecutive_days: Set(consecutive),
|
||||
created_at: Set(Utc::now()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(operator_id),
|
||||
updated_by: Set(operator_id),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
active.insert(&state.db).await?;
|
||||
active.insert(&txn).await?;
|
||||
|
||||
// 触发积分获取
|
||||
earn_points(state, tenant_id, patient_id, "daily_checkin", operator_id).await?;
|
||||
// 在同一事务中执行积分获取
|
||||
earn_points_in_txn(&txn, tenant_id, patient_id, "daily_checkin", operator_id).await?;
|
||||
|
||||
// 检查阶梯奖励
|
||||
let _streak_bonus = check_streak_bonus(state, tenant_id, patient_id, consecutive, operator_id).await?;
|
||||
// 检查阶梯奖励(同一事务内)
|
||||
let _streak_bonus = check_streak_bonus_in_txn(&txn, tenant_id, patient_id, consecutive, operator_id).await?;
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
let final_consecutive = consecutive;
|
||||
Ok(CheckinStatusResp {
|
||||
@@ -261,8 +272,8 @@ pub async fn daily_checkin(
|
||||
})
|
||||
}
|
||||
|
||||
async fn compute_consecutive_days(
|
||||
db: &DatabaseConnection,
|
||||
async fn compute_consecutive_days<C: sea_orm::ConnectionTrait>(
|
||||
db: &C,
|
||||
tenant_id: Uuid,
|
||||
patient_id: Uuid,
|
||||
today: chrono::NaiveDate,
|
||||
@@ -277,8 +288,101 @@ async fn compute_consecutive_days(
|
||||
Ok(yesterday_checkin.map(|c| c.consecutive_days).unwrap_or(0))
|
||||
}
|
||||
|
||||
async fn check_streak_bonus(
|
||||
state: &HealthState,
|
||||
/// 事务内版本的积分获取(由 daily_checkin 调用)
|
||||
async fn earn_points_in_txn<C: sea_orm::ConnectionTrait>(
|
||||
db: &C,
|
||||
tenant_id: Uuid,
|
||||
patient_id: Uuid,
|
||||
event_type: &str,
|
||||
operator_id: Option<Uuid>,
|
||||
) -> HealthResult<()> {
|
||||
// 1. 查找匹配规则
|
||||
let rule = points_rule::Entity::find()
|
||||
.filter(points_rule::Column::TenantId.eq(tenant_id))
|
||||
.filter(points_rule::Column::EventType.eq(event_type))
|
||||
.filter(points_rule::Column::IsActive.eq(true))
|
||||
.filter(points_rule::Column::DeletedAt.is_null())
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| HealthError::Validation(format!("无匹配的积分规则: {}", event_type)))?;
|
||||
|
||||
// 2. 获取账户
|
||||
let acc = get_or_create_account(db, tenant_id, patient_id).await?;
|
||||
|
||||
// 3. 检查每日上限
|
||||
if rule.daily_cap > 0 {
|
||||
let today = Utc::now().date_naive();
|
||||
let today_start = today.and_hms_opt(0, 0, 0).unwrap().and_utc();
|
||||
let earned_today: i32 = points_transaction::Entity::find()
|
||||
.filter(points_transaction::Column::TenantId.eq(tenant_id))
|
||||
.filter(points_transaction::Column::AccountId.eq(acc.id))
|
||||
.filter(points_transaction::Column::Type.eq("earn"))
|
||||
.filter(points_transaction::Column::RuleId.eq(rule.id))
|
||||
.filter(points_transaction::Column::CreatedAt.gte(today_start))
|
||||
.all(db)
|
||||
.await?
|
||||
.iter()
|
||||
.map(|t| t.amount)
|
||||
.sum();
|
||||
if earned_today + rule.points_value > rule.daily_cap {
|
||||
return Err(HealthError::Validation("今日该渠道积分已达上限".into()));
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 写入流水
|
||||
let now = Utc::now();
|
||||
let txn_record = points_transaction::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
account_id: Set(acc.id),
|
||||
r#type: Set("earn".to_string()),
|
||||
amount: Set(rule.points_value),
|
||||
remaining_amount: Set(rule.points_value),
|
||||
status: Set("active".to_string()),
|
||||
expires_at: Set(Some(now + Duration::days(365))),
|
||||
balance_after: Set(acc.balance + rule.points_value),
|
||||
rule_id: Set(Some(rule.id)),
|
||||
order_id: Set(None),
|
||||
description: Set(Some(format!("{}: +{}", rule.name, rule.points_value))),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(operator_id),
|
||||
updated_by: Set(operator_id),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
txn_record.insert(db).await?;
|
||||
|
||||
// 5. CAS 更新账户余额
|
||||
let cas_result = points_account::Entity::update_many()
|
||||
.col_expr(
|
||||
points_account::Column::Balance,
|
||||
Expr::col(points_account::Column::Balance).add(rule.points_value),
|
||||
)
|
||||
.col_expr(
|
||||
points_account::Column::TotalEarned,
|
||||
Expr::col(points_account::Column::TotalEarned).add(rule.points_value),
|
||||
)
|
||||
.col_expr(points_account::Column::UpdatedAt, Expr::value(now))
|
||||
.col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id))
|
||||
.col_expr(
|
||||
points_account::Column::Version,
|
||||
Expr::col(points_account::Column::Version).add(1),
|
||||
)
|
||||
.filter(points_account::Column::Id.eq(acc.id))
|
||||
.filter(points_account::Column::Version.eq(acc.version))
|
||||
.exec(db)
|
||||
.await?;
|
||||
if cas_result.rows_affected == 0 {
|
||||
return Err(HealthError::VersionMismatch);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 事务内版本的阶梯奖励检查(由 daily_checkin 调用)
|
||||
async fn check_streak_bonus_in_txn<C: sea_orm::ConnectionTrait>(
|
||||
db: &C,
|
||||
tenant_id: Uuid,
|
||||
patient_id: Uuid,
|
||||
consecutive: i32,
|
||||
@@ -286,16 +390,14 @@ async fn check_streak_bonus(
|
||||
) -> HealthResult<i32> {
|
||||
let mut bonus = 0i32;
|
||||
if consecutive == 7 {
|
||||
bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_7d_bonus").await?;
|
||||
bonus = get_streak_bonus_value(db, tenant_id, "streak_7d_bonus").await?;
|
||||
} else if consecutive == 14 {
|
||||
bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_14d_bonus").await?;
|
||||
bonus = get_streak_bonus_value(db, tenant_id, "streak_14d_bonus").await?;
|
||||
} else if consecutive == 30 {
|
||||
bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_30d_bonus").await?;
|
||||
bonus = get_streak_bonus_value(db, tenant_id, "streak_30d_bonus").await?;
|
||||
}
|
||||
if bonus > 0 {
|
||||
// 额外奖励:在事务中执行流水写入 + 账户更新
|
||||
let txn = state.db.begin().await?;
|
||||
let acc = get_or_create_account(&txn, tenant_id, patient_id).await?;
|
||||
let acc = get_or_create_account(db, tenant_id, patient_id).await?;
|
||||
let now = Utc::now();
|
||||
let txn_record = points_transaction::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
@@ -317,10 +419,8 @@ async fn check_streak_bonus(
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
txn_record.insert(&txn).await?;
|
||||
txn_record.insert(db).await?;
|
||||
|
||||
// CAS 更新账户
|
||||
use sea_orm::sea_query::Expr;
|
||||
let cas_result = points_account::Entity::update_many()
|
||||
.col_expr(
|
||||
points_account::Column::Balance,
|
||||
@@ -338,20 +438,17 @@ async fn check_streak_bonus(
|
||||
)
|
||||
.filter(points_account::Column::Id.eq(acc.id))
|
||||
.filter(points_account::Column::Version.eq(acc.version))
|
||||
.exec(&txn)
|
||||
.exec(db)
|
||||
.await?;
|
||||
if cas_result.rows_affected == 0 {
|
||||
txn.rollback().await?;
|
||||
return Err(HealthError::VersionMismatch);
|
||||
}
|
||||
|
||||
txn.commit().await?;
|
||||
}
|
||||
Ok(bonus)
|
||||
}
|
||||
|
||||
async fn get_streak_bonus_value(
|
||||
db: &DatabaseConnection,
|
||||
async fn get_streak_bonus_value<C: sea_orm::ConnectionTrait>(
|
||||
db: &C,
|
||||
tenant_id: Uuid,
|
||||
field: &str,
|
||||
) -> HealthResult<i32> {
|
||||
@@ -855,7 +952,6 @@ pub async fn verify_order(
|
||||
let expected_version = order.version;
|
||||
|
||||
// 数据库级 CAS:防止并发核销同一订单
|
||||
use sea_orm::sea_query::Expr;
|
||||
let cas_result = points_order::Entity::update_many()
|
||||
.col_expr(points_order::Column::Status, Expr::value("verified"))
|
||||
.col_expr(points_order::Column::VerifiedBy, Expr::value(Some(verifier_id)))
|
||||
@@ -1034,7 +1130,6 @@ pub async fn register_event(
|
||||
reg.insert(&txn).await?;
|
||||
|
||||
// CAS 更新参与人数:防止并发超出 max_participants
|
||||
use sea_orm::sea_query::Expr;
|
||||
let mut cas = offline_event::Entity::update_many()
|
||||
.col_expr(
|
||||
offline_event::Column::CurrentParticipants,
|
||||
@@ -1304,21 +1399,20 @@ pub async fn admin_checkin_event(
|
||||
txn_record.insert(&txn).await?;
|
||||
|
||||
// CAS 更新账户余额:基于 version 字段防止并发覆盖
|
||||
use sea_orm::sea_query::Expr as CasExpr;
|
||||
let cas_result = points_account::Entity::update_many()
|
||||
.col_expr(
|
||||
points_account::Column::Balance,
|
||||
CasExpr::col(points_account::Column::Balance).add(event.points_reward),
|
||||
Expr::col(points_account::Column::Balance).add(event.points_reward),
|
||||
)
|
||||
.col_expr(
|
||||
points_account::Column::TotalEarned,
|
||||
CasExpr::col(points_account::Column::TotalEarned).add(event.points_reward),
|
||||
Expr::col(points_account::Column::TotalEarned).add(event.points_reward),
|
||||
)
|
||||
.col_expr(points_account::Column::UpdatedAt, CasExpr::value(now))
|
||||
.col_expr(points_account::Column::UpdatedBy, CasExpr::value(operator_id))
|
||||
.col_expr(points_account::Column::UpdatedAt, Expr::value(now))
|
||||
.col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id))
|
||||
.col_expr(
|
||||
points_account::Column::Version,
|
||||
CasExpr::col(points_account::Column::Version).add(1),
|
||||
Expr::col(points_account::Column::Version).add(1),
|
||||
)
|
||||
.filter(points_account::Column::Id.eq(acc.id))
|
||||
.filter(points_account::Column::Version.eq(acc.version))
|
||||
|
||||
@@ -54,6 +54,7 @@ mod m20260425_000051_dialysis_and_lab_enhance;
|
||||
mod m20260425_000052_create_ai_tables;
|
||||
mod m20260425_000053_create_points_tables;
|
||||
mod m20260425_000054_create_daily_monitoring;
|
||||
mod m20260425_000055_points_checkin_standard_fields;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@@ -115,6 +116,7 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20260425_000052_create_ai_tables::Migration),
|
||||
Box::new(m20260425_000053_create_points_tables::Migration),
|
||||
Box::new(m20260425_000054_create_daily_monitoring::Migration),
|
||||
Box::new(m20260425_000055_points_checkin_standard_fields::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
/// 为 points_checkin 表补全标准字段:updated_at, created_by, updated_by, deleted_at, version
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Alias::new("points_checkin"))
|
||||
.add_column(
|
||||
ColumnDef::new(Alias::new("updated_at"))
|
||||
.timestamp_with_time_zone()
|
||||
.not_null()
|
||||
.default(Expr::current_timestamp()),
|
||||
)
|
||||
.add_column(ColumnDef::new(Alias::new("created_by")).uuid())
|
||||
.add_column(ColumnDef::new(Alias::new("updated_by")).uuid())
|
||||
.add_column(ColumnDef::new(Alias::new("deleted_at")).timestamp_with_time_zone())
|
||||
.add_column(
|
||||
ColumnDef::new(Alias::new("version"))
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(1),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Alias::new("points_checkin"))
|
||||
.drop_column(Alias::new("updated_at"))
|
||||
.drop_column(Alias::new("created_by"))
|
||||
.drop_column(Alias::new("updated_by"))
|
||||
.drop_column(Alias::new("deleted_at"))
|
||||
.drop_column(Alias::new("version"))
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user