diff --git a/crates/erp-health/src/entity/points_checkin.rs b/crates/erp-health/src/entity/points_checkin.rs index 734f8ac..bd1fa62 100644 --- a/crates/erp-health/src/entity/points_checkin.rs +++ b/crates/erp-health/src/entity/points_checkin.rs @@ -11,6 +11,11 @@ pub struct Model { pub checkin_date: chrono::NaiveDate, pub consecutive_days: i32, pub created_at: DateTimeUtc, + pub updated_at: DateTimeUtc, + pub created_by: Option, + pub updated_by: Option, + pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-health/src/service/points_service.rs b/crates/erp-health/src/service/points_service.rs index e531886..b2739aa 100644 --- a/crates/erp-health/src/service/points_service.rs +++ b/crates/erp-health/src/service/points_service.rs @@ -221,6 +221,7 @@ pub async fn daily_checkin( .filter(points_checkin::Column::TenantId.eq(tenant_id)) .filter(points_checkin::Column::PatientId.eq(patient_id)) .filter(points_checkin::Column::CheckinDate.eq(today)) + .filter(points_checkin::Column::DeletedAt.is_null()) .one(&state.db) .await?; @@ -236,22 +237,32 @@ pub async fn daily_checkin( // 计算连续天数 let consecutive = compute_consecutive_days(&state.db, tenant_id, patient_id, today).await? + 1; - // 写入打卡记录 + // 事务:写入打卡记录 + 积分获取 + 阶梯奖励 + let txn = state.db.begin().await?; + + let now = Utc::now(); let active = points_checkin::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), patient_id: Set(patient_id), checkin_date: Set(today), consecutive_days: Set(consecutive), - created_at: Set(Utc::now()), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(operator_id), + updated_by: Set(operator_id), + deleted_at: Set(None), + version: Set(1), }; - active.insert(&state.db).await?; + active.insert(&txn).await?; - // 触发积分获取 - earn_points(state, tenant_id, patient_id, "daily_checkin", operator_id).await?; + // 在同一事务中执行积分获取 + earn_points_in_txn(&txn, tenant_id, patient_id, "daily_checkin", operator_id).await?; - // 检查阶梯奖励 - let _streak_bonus = check_streak_bonus(state, tenant_id, patient_id, consecutive, operator_id).await?; + // 检查阶梯奖励(同一事务内) + let _streak_bonus = check_streak_bonus_in_txn(&txn, tenant_id, patient_id, consecutive, operator_id).await?; + + txn.commit().await?; let final_consecutive = consecutive; Ok(CheckinStatusResp { @@ -261,8 +272,8 @@ pub async fn daily_checkin( }) } -async fn compute_consecutive_days( - db: &DatabaseConnection, +async fn compute_consecutive_days( + db: &C, tenant_id: Uuid, patient_id: Uuid, today: chrono::NaiveDate, @@ -277,8 +288,101 @@ async fn compute_consecutive_days( Ok(yesterday_checkin.map(|c| c.consecutive_days).unwrap_or(0)) } -async fn check_streak_bonus( - state: &HealthState, +/// 事务内版本的积分获取(由 daily_checkin 调用) +async fn earn_points_in_txn( + db: &C, + tenant_id: Uuid, + patient_id: Uuid, + event_type: &str, + operator_id: Option, +) -> HealthResult<()> { + // 1. 查找匹配规则 + let rule = points_rule::Entity::find() + .filter(points_rule::Column::TenantId.eq(tenant_id)) + .filter(points_rule::Column::EventType.eq(event_type)) + .filter(points_rule::Column::IsActive.eq(true)) + .filter(points_rule::Column::DeletedAt.is_null()) + .one(db) + .await? + .ok_or_else(|| HealthError::Validation(format!("无匹配的积分规则: {}", event_type)))?; + + // 2. 获取账户 + let acc = get_or_create_account(db, tenant_id, patient_id).await?; + + // 3. 检查每日上限 + if rule.daily_cap > 0 { + let today = Utc::now().date_naive(); + let today_start = today.and_hms_opt(0, 0, 0).unwrap().and_utc(); + let earned_today: i32 = points_transaction::Entity::find() + .filter(points_transaction::Column::TenantId.eq(tenant_id)) + .filter(points_transaction::Column::AccountId.eq(acc.id)) + .filter(points_transaction::Column::Type.eq("earn")) + .filter(points_transaction::Column::RuleId.eq(rule.id)) + .filter(points_transaction::Column::CreatedAt.gte(today_start)) + .all(db) + .await? + .iter() + .map(|t| t.amount) + .sum(); + if earned_today + rule.points_value > rule.daily_cap { + return Err(HealthError::Validation("今日该渠道积分已达上限".into())); + } + } + + // 4. 写入流水 + let now = Utc::now(); + let txn_record = points_transaction::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + account_id: Set(acc.id), + r#type: Set("earn".to_string()), + amount: Set(rule.points_value), + remaining_amount: Set(rule.points_value), + status: Set("active".to_string()), + expires_at: Set(Some(now + Duration::days(365))), + balance_after: Set(acc.balance + rule.points_value), + rule_id: Set(Some(rule.id)), + order_id: Set(None), + description: Set(Some(format!("{}: +{}", rule.name, rule.points_value))), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(operator_id), + updated_by: Set(operator_id), + deleted_at: Set(None), + version: Set(1), + }; + txn_record.insert(db).await?; + + // 5. CAS 更新账户余额 + let cas_result = points_account::Entity::update_many() + .col_expr( + points_account::Column::Balance, + Expr::col(points_account::Column::Balance).add(rule.points_value), + ) + .col_expr( + points_account::Column::TotalEarned, + Expr::col(points_account::Column::TotalEarned).add(rule.points_value), + ) + .col_expr(points_account::Column::UpdatedAt, Expr::value(now)) + .col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id)) + .col_expr( + points_account::Column::Version, + Expr::col(points_account::Column::Version).add(1), + ) + .filter(points_account::Column::Id.eq(acc.id)) + .filter(points_account::Column::Version.eq(acc.version)) + .exec(db) + .await?; + if cas_result.rows_affected == 0 { + return Err(HealthError::VersionMismatch); + } + + Ok(()) +} + +/// 事务内版本的阶梯奖励检查(由 daily_checkin 调用) +async fn check_streak_bonus_in_txn( + db: &C, tenant_id: Uuid, patient_id: Uuid, consecutive: i32, @@ -286,16 +390,14 @@ async fn check_streak_bonus( ) -> HealthResult { let mut bonus = 0i32; if consecutive == 7 { - bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_7d_bonus").await?; + bonus = get_streak_bonus_value(db, tenant_id, "streak_7d_bonus").await?; } else if consecutive == 14 { - bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_14d_bonus").await?; + bonus = get_streak_bonus_value(db, tenant_id, "streak_14d_bonus").await?; } else if consecutive == 30 { - bonus = get_streak_bonus_value(&state.db, tenant_id, "streak_30d_bonus").await?; + bonus = get_streak_bonus_value(db, tenant_id, "streak_30d_bonus").await?; } if bonus > 0 { - // 额外奖励:在事务中执行流水写入 + 账户更新 - let txn = state.db.begin().await?; - let acc = get_or_create_account(&txn, tenant_id, patient_id).await?; + let acc = get_or_create_account(db, tenant_id, patient_id).await?; let now = Utc::now(); let txn_record = points_transaction::ActiveModel { id: Set(Uuid::now_v7()), @@ -317,10 +419,8 @@ async fn check_streak_bonus( deleted_at: Set(None), version: Set(1), }; - txn_record.insert(&txn).await?; + txn_record.insert(db).await?; - // CAS 更新账户 - use sea_orm::sea_query::Expr; let cas_result = points_account::Entity::update_many() .col_expr( points_account::Column::Balance, @@ -338,20 +438,17 @@ async fn check_streak_bonus( ) .filter(points_account::Column::Id.eq(acc.id)) .filter(points_account::Column::Version.eq(acc.version)) - .exec(&txn) + .exec(db) .await?; if cas_result.rows_affected == 0 { - txn.rollback().await?; return Err(HealthError::VersionMismatch); } - - txn.commit().await?; } Ok(bonus) } -async fn get_streak_bonus_value( - db: &DatabaseConnection, +async fn get_streak_bonus_value( + db: &C, tenant_id: Uuid, field: &str, ) -> HealthResult { @@ -855,7 +952,6 @@ pub async fn verify_order( let expected_version = order.version; // 数据库级 CAS:防止并发核销同一订单 - use sea_orm::sea_query::Expr; let cas_result = points_order::Entity::update_many() .col_expr(points_order::Column::Status, Expr::value("verified")) .col_expr(points_order::Column::VerifiedBy, Expr::value(Some(verifier_id))) @@ -1034,7 +1130,6 @@ pub async fn register_event( reg.insert(&txn).await?; // CAS 更新参与人数:防止并发超出 max_participants - use sea_orm::sea_query::Expr; let mut cas = offline_event::Entity::update_many() .col_expr( offline_event::Column::CurrentParticipants, @@ -1304,21 +1399,20 @@ pub async fn admin_checkin_event( txn_record.insert(&txn).await?; // CAS 更新账户余额:基于 version 字段防止并发覆盖 - use sea_orm::sea_query::Expr as CasExpr; let cas_result = points_account::Entity::update_many() .col_expr( points_account::Column::Balance, - CasExpr::col(points_account::Column::Balance).add(event.points_reward), + Expr::col(points_account::Column::Balance).add(event.points_reward), ) .col_expr( points_account::Column::TotalEarned, - CasExpr::col(points_account::Column::TotalEarned).add(event.points_reward), + Expr::col(points_account::Column::TotalEarned).add(event.points_reward), ) - .col_expr(points_account::Column::UpdatedAt, CasExpr::value(now)) - .col_expr(points_account::Column::UpdatedBy, CasExpr::value(operator_id)) + .col_expr(points_account::Column::UpdatedAt, Expr::value(now)) + .col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id)) .col_expr( points_account::Column::Version, - CasExpr::col(points_account::Column::Version).add(1), + Expr::col(points_account::Column::Version).add(1), ) .filter(points_account::Column::Id.eq(acc.id)) .filter(points_account::Column::Version.eq(acc.version)) diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index 4b6f47b..ad3271a 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -54,6 +54,7 @@ mod m20260425_000051_dialysis_and_lab_enhance; mod m20260425_000052_create_ai_tables; mod m20260425_000053_create_points_tables; mod m20260425_000054_create_daily_monitoring; +mod m20260425_000055_points_checkin_standard_fields; pub struct Migrator; @@ -115,6 +116,7 @@ impl MigratorTrait for Migrator { Box::new(m20260425_000052_create_ai_tables::Migration), Box::new(m20260425_000053_create_points_tables::Migration), Box::new(m20260425_000054_create_daily_monitoring::Migration), + Box::new(m20260425_000055_points_checkin_standard_fields::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260425_000055_points_checkin_standard_fields.rs b/crates/erp-server/migration/src/m20260425_000055_points_checkin_standard_fields.rs new file mode 100644 index 0000000..4725b59 --- /dev/null +++ b/crates/erp-server/migration/src/m20260425_000055_points_checkin_standard_fields.rs @@ -0,0 +1,52 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +/// 为 points_checkin 表补全标准字段:updated_at, created_by, updated_by, deleted_at, version +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("points_checkin")) + .add_column( + ColumnDef::new(Alias::new("updated_at")) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .add_column(ColumnDef::new(Alias::new("created_by")).uuid()) + .add_column(ColumnDef::new(Alias::new("updated_by")).uuid()) + .add_column(ColumnDef::new(Alias::new("deleted_at")).timestamp_with_time_zone()) + .add_column( + ColumnDef::new(Alias::new("version")) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Alias::new("points_checkin")) + .drop_column(Alias::new("updated_at")) + .drop_column(Alias::new("created_by")) + .drop_column(Alias::new("updated_by")) + .drop_column(Alias::new("deleted_at")) + .drop_column(Alias::new("version")) + .to_owned(), + ) + .await?; + + Ok(()) + } +}