438 lines
16 KiB
Rust
438 lines
16 KiB
Rust
//! 积分账户 Service — 获取/创建账户、积分获取、流水查询、积分统计
|
||
|
||
use chrono::{Duration, Utc};
|
||
use sea_orm::entity::prelude::*;
|
||
use sea_orm::sea_query::Expr;
|
||
use sea_orm::{ActiveValue::Set, TransactionTrait};
|
||
use uuid::Uuid;
|
||
|
||
use erp_core::audit::AuditLog;
|
||
use erp_core::audit_service;
|
||
use erp_core::events::DomainEvent;
|
||
use erp_core::types::PaginatedResponse;
|
||
|
||
use crate::dto::points_dto::*;
|
||
use crate::entity::{points_account, points_rule, points_transaction};
|
||
use crate::error::{PointsError, PointsResult};
|
||
use crate::state::PointsState;
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 内部辅助:获取或创建账户
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 获取或创建患者的积分账户(支持事务和非事务连接)
|
||
pub(crate) async fn get_or_create_account<C: sea_orm::ConnectionTrait>(
|
||
db: &C,
|
||
tenant_id: Uuid,
|
||
patient_id: Uuid,
|
||
) -> PointsResult<points_account::Model> {
|
||
if let Some(acc) = points_account::Entity::find()
|
||
.filter(points_account::Column::TenantId.eq(tenant_id))
|
||
.filter(points_account::Column::PatientId.eq(patient_id))
|
||
.filter(points_account::Column::DeletedAt.is_null())
|
||
.one(db)
|
||
.await?
|
||
{
|
||
return Ok(acc);
|
||
}
|
||
let now = Utc::now();
|
||
let active = points_account::ActiveModel {
|
||
id: Set(Uuid::now_v7()),
|
||
tenant_id: Set(tenant_id),
|
||
patient_id: Set(patient_id),
|
||
balance: Set(0),
|
||
total_earned: Set(0),
|
||
total_spent: Set(0),
|
||
total_expired: Set(0),
|
||
version: Set(1),
|
||
created_at: Set(now),
|
||
updated_at: Set(now),
|
||
created_by: Set(None),
|
||
updated_by: Set(None),
|
||
deleted_at: Set(None),
|
||
};
|
||
Ok(active.insert(db).await?)
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 积分账户
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 获取患者积分账户
|
||
pub async fn get_account(
|
||
state: &PointsState,
|
||
tenant_id: Uuid,
|
||
patient_id: Uuid,
|
||
) -> PointsResult<PointsAccountResp> {
|
||
let acc = get_or_create_account(&state.db, tenant_id, patient_id).await?;
|
||
Ok(PointsAccountResp {
|
||
id: acc.id,
|
||
patient_id: acc.patient_id,
|
||
balance: acc.balance,
|
||
total_earned: acc.total_earned,
|
||
total_spent: acc.total_spent,
|
||
total_expired: acc.total_expired,
|
||
created_at: acc.created_at,
|
||
updated_at: acc.updated_at,
|
||
version: acc.version,
|
||
})
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 积分获取(事件触发)
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 核心方法:根据事件类型给患者加积分
|
||
pub async fn earn_points(
|
||
state: &PointsState,
|
||
tenant_id: Uuid,
|
||
patient_id: Uuid,
|
||
event_type: &str,
|
||
operator_id: Option<Uuid>,
|
||
) -> PointsResult<PointsTransactionResp> {
|
||
// 1. 查找匹配规则
|
||
let rule = points_rule::Entity::find()
|
||
.filter(points_rule::Column::TenantId.eq(tenant_id))
|
||
.filter(points_rule::Column::EventType.eq(event_type))
|
||
.filter(points_rule::Column::IsActive.eq(true))
|
||
.filter(points_rule::Column::DeletedAt.is_null())
|
||
.one(&state.db)
|
||
.await?
|
||
.ok_or_else(|| PointsError::Validation(format!("无匹配的积分规则: {}", event_type)))?;
|
||
|
||
// 2. 先获取/创建账户(需要 account_id 来做日上限查询)
|
||
let acc = get_or_create_account(&state.db, tenant_id, patient_id).await?;
|
||
|
||
// 3. 检查每日上限(用 account.id 而非 patient_id)
|
||
if rule.daily_cap > 0 {
|
||
let today = Utc::now().date_naive();
|
||
let today_start = today.and_hms_opt(0, 0, 0).unwrap().and_utc();
|
||
let earned_today: i32 = points_transaction::Entity::find()
|
||
.filter(points_transaction::Column::TenantId.eq(tenant_id))
|
||
.filter(points_transaction::Column::AccountId.eq(acc.id))
|
||
.filter(points_transaction::Column::TransactionType.eq("earn"))
|
||
.filter(points_transaction::Column::RuleId.eq(rule.id))
|
||
.filter(points_transaction::Column::CreatedAt.gte(today_start))
|
||
.all(&state.db)
|
||
.await?
|
||
.iter()
|
||
.map(|t| t.amount)
|
||
.sum();
|
||
|
||
if earned_today + rule.points_value > rule.daily_cap {
|
||
return Err(PointsError::Validation("今日该渠道积分已达上限".into()));
|
||
}
|
||
}
|
||
|
||
// 4. 在事务中执行积分获取
|
||
let txn = state.db.begin().await?;
|
||
// 重新读取账户以获取最新 version(事务内)
|
||
let acc = points_account::Entity::find_by_id(acc.id)
|
||
.one(&txn)
|
||
.await?
|
||
.ok_or(PointsError::Validation("积分账户不存在".into()))?;
|
||
|
||
// 使用数据库级 CAS 防止并发赚取导致余额丢失
|
||
let now = Utc::now();
|
||
let expires_at = now + Duration::days(365); // 12 个月过期
|
||
|
||
// 写入流水
|
||
let txn_record = points_transaction::ActiveModel {
|
||
id: Set(Uuid::now_v7()),
|
||
tenant_id: Set(tenant_id),
|
||
account_id: Set(acc.id),
|
||
transaction_type: Set("earn".to_string()),
|
||
amount: Set(rule.points_value),
|
||
remaining_amount: Set(rule.points_value),
|
||
status: Set("active".to_string()),
|
||
expires_at: Set(Some(expires_at)),
|
||
balance_after: Set(acc.balance + rule.points_value),
|
||
rule_id: Set(Some(rule.id)),
|
||
order_id: Set(None),
|
||
description: Set(Some(format!("{}: +{}", rule.name, rule.points_value))),
|
||
created_at: Set(now),
|
||
updated_at: Set(now),
|
||
created_by: Set(operator_id),
|
||
updated_by: Set(operator_id),
|
||
deleted_at: Set(None),
|
||
version: Set(1),
|
||
};
|
||
let inserted = txn_record.insert(&txn).await?;
|
||
|
||
// CAS 更新账户余额:基于 version 字段防止并发覆盖
|
||
let cas_result = points_account::Entity::update_many()
|
||
.col_expr(
|
||
points_account::Column::Balance,
|
||
Expr::col(points_account::Column::Balance).add(rule.points_value),
|
||
)
|
||
.col_expr(
|
||
points_account::Column::TotalEarned,
|
||
Expr::col(points_account::Column::TotalEarned).add(rule.points_value),
|
||
)
|
||
.col_expr(points_account::Column::UpdatedAt, Expr::value(now))
|
||
.col_expr(points_account::Column::UpdatedBy, Expr::value(operator_id))
|
||
.col_expr(
|
||
points_account::Column::Version,
|
||
Expr::col(points_account::Column::Version).add(1),
|
||
)
|
||
.filter(points_account::Column::Id.eq(acc.id))
|
||
.filter(points_account::Column::Version.eq(acc.version))
|
||
.exec(&txn)
|
||
.await?;
|
||
if cas_result.rows_affected == 0 {
|
||
txn.rollback().await?;
|
||
return Err(PointsError::VersionMismatch);
|
||
}
|
||
|
||
txn.commit().await?;
|
||
|
||
audit_service::record(
|
||
AuditLog::new(tenant_id, operator_id, "points.earned", "points_transaction")
|
||
.with_resource_id(inserted.id),
|
||
&state.db,
|
||
).await;
|
||
|
||
state.event_bus.publish(
|
||
DomainEvent::new(crate::event::POINTS_EARNED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({
|
||
"transaction_id": inserted.id, "account_id": inserted.account_id,
|
||
"amount": inserted.amount, "balance_after": inserted.balance_after,
|
||
}))),
|
||
&state.db,
|
||
).await;
|
||
|
||
Ok(PointsTransactionResp {
|
||
id: inserted.id,
|
||
account_id: inserted.account_id,
|
||
transaction_type: inserted.transaction_type,
|
||
amount: inserted.amount,
|
||
remaining_amount: inserted.remaining_amount,
|
||
status: inserted.status,
|
||
expires_at: inserted.expires_at,
|
||
balance_after: inserted.balance_after,
|
||
description: inserted.description,
|
||
created_at: inserted.created_at,
|
||
})
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 积分流水查询
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 查询积分流水(分页)
|
||
pub async fn list_transactions(
|
||
state: &PointsState,
|
||
tenant_id: Uuid,
|
||
patient_id: Uuid,
|
||
page: u64,
|
||
page_size: u64,
|
||
) -> PointsResult<PaginatedResponse<PointsTransactionResp>> {
|
||
let acc = get_or_create_account(&state.db, tenant_id, patient_id).await?;
|
||
let limit = page_size.min(100);
|
||
let offset = page.saturating_sub(1) * limit;
|
||
|
||
let query = points_transaction::Entity::find()
|
||
.filter(points_transaction::Column::TenantId.eq(tenant_id))
|
||
.filter(points_transaction::Column::AccountId.eq(acc.id));
|
||
|
||
let total = query.clone().count(&state.db).await?;
|
||
let models = query
|
||
.order_by_desc(points_transaction::Column::CreatedAt)
|
||
.offset(offset)
|
||
.limit(limit)
|
||
.all(&state.db)
|
||
.await?;
|
||
|
||
let total_pages = total.div_ceil(limit.max(1));
|
||
let data = models.into_iter().map(|m| PointsTransactionResp {
|
||
id: m.id, account_id: m.account_id, transaction_type: m.transaction_type,
|
||
amount: m.amount, remaining_amount: m.remaining_amount,
|
||
status: m.status, expires_at: m.expires_at,
|
||
balance_after: m.balance_after, description: m.description,
|
||
created_at: m.created_at,
|
||
}).collect();
|
||
|
||
Ok(PaginatedResponse { data, total, page, page_size: limit, total_pages })
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 积分统计 — 管理端
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 管理端:积分统计汇总
|
||
pub async fn get_points_statistics(
|
||
state: &PointsState,
|
||
tenant_id: Uuid,
|
||
) -> PointsResult<PointsStatisticsResp> {
|
||
use sea_orm::FromQueryResult;
|
||
|
||
#[derive(Debug, FromQueryResult)]
|
||
struct AggRow {
|
||
total_issued: Option<i64>,
|
||
total_spent: Option<i64>,
|
||
total_expired: Option<i64>,
|
||
active_accounts: Option<i64>,
|
||
}
|
||
|
||
#[derive(Debug, FromQueryResult)]
|
||
struct TopEarnerRow {
|
||
id: Uuid,
|
||
patient_id: Uuid,
|
||
total_earned: Option<i32>,
|
||
}
|
||
|
||
// 聚合查询:总发放/总消费/总过期/活跃账户数
|
||
let agg_sql = r#"
|
||
SELECT
|
||
COALESCE(SUM(total_earned), 0) AS total_issued,
|
||
COALESCE(SUM(total_spent), 0) AS total_spent,
|
||
COALESCE(SUM(total_expired), 0) AS total_expired,
|
||
COUNT(*) AS active_accounts
|
||
FROM points_account
|
||
WHERE tenant_id = $1 AND deleted_at IS NULL
|
||
"#;
|
||
let agg = AggRow::find_by_statement(
|
||
sea_orm::Statement::from_sql_and_values(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
agg_sql,
|
||
[tenant_id.into()],
|
||
),
|
||
)
|
||
.one(&state.db)
|
||
.await?
|
||
.unwrap_or(AggRow {
|
||
total_issued: Some(0),
|
||
total_spent: Some(0),
|
||
total_expired: Some(0),
|
||
active_accounts: Some(0),
|
||
});
|
||
|
||
// Top 10 积分获取者
|
||
let top_sql = r#"
|
||
SELECT id, patient_id, total_earned
|
||
FROM points_account
|
||
WHERE tenant_id = $1 AND deleted_at IS NULL
|
||
ORDER BY total_earned DESC
|
||
LIMIT 10
|
||
"#;
|
||
let top_rows = TopEarnerRow::find_by_statement(
|
||
sea_orm::Statement::from_sql_and_values(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
top_sql,
|
||
[tenant_id.into()],
|
||
),
|
||
)
|
||
.all(&state.db)
|
||
.await?;
|
||
|
||
let top_earners = top_rows.into_iter().map(|r| TopEarner {
|
||
account_id: r.id,
|
||
patient_id: r.patient_id,
|
||
total_earned: r.total_earned.unwrap_or(0),
|
||
}).collect();
|
||
|
||
Ok(PointsStatisticsResp {
|
||
total_issued: agg.total_issued.unwrap_or(0),
|
||
total_spent: agg.total_spent.unwrap_or(0),
|
||
total_expired: agg.total_expired.unwrap_or(0),
|
||
active_accounts: agg.active_accounts.unwrap_or(0),
|
||
top_earners,
|
||
})
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// 积分过期清理
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// 扫描已过期的 earn 交易,扣减账户余额,更新 total_expired。
|
||
/// 返回处理的过期交易数量。
|
||
pub async fn expire_points(
|
||
db: &sea_orm::DatabaseConnection,
|
||
event_bus: &erp_core::events::EventBus,
|
||
) -> PointsResult<u64> {
|
||
let now = Utc::now();
|
||
|
||
// 查找所有已过期但未标记 expired 的 earn 交易
|
||
let expired_txns: Vec<points_transaction::Model> = points_transaction::Entity::find()
|
||
.filter(points_transaction::Column::TransactionType.eq("earn"))
|
||
.filter(points_transaction::Column::Status.eq("active"))
|
||
.filter(points_transaction::Column::ExpiresAt.is_not_null())
|
||
.filter(points_transaction::Column::ExpiresAt.lt(now))
|
||
.filter(points_transaction::Column::DeletedAt.is_null())
|
||
.filter(points_transaction::Column::RemainingAmount.gt(0))
|
||
.all(db)
|
||
.await?;
|
||
|
||
if expired_txns.is_empty() {
|
||
return Ok(0);
|
||
}
|
||
|
||
let tenant_id = expired_txns.first().map(|t| t.tenant_id).unwrap_or_default();
|
||
|
||
let mut processed: u64 = 0;
|
||
|
||
for txn in expired_txns {
|
||
let txn_id = txn.id;
|
||
let account_id = txn.account_id;
|
||
let remaining = txn.remaining_amount;
|
||
|
||
let txn_result = db
|
||
.transaction::<_, (), PointsError>(|txn_db| {
|
||
Box::pin(async move {
|
||
// 标记交易为 expired
|
||
let mut active_txn: points_transaction::ActiveModel = txn.into();
|
||
active_txn.status = Set("expired".to_string());
|
||
active_txn.remaining_amount = Set(0);
|
||
active_txn.version = Set(active_txn.version.unwrap() + 1);
|
||
active_txn.updated_at = Set(Utc::now());
|
||
active_txn.update(txn_db).await?;
|
||
|
||
// 扣减账户余额,更新 total_expired
|
||
let account = points_account::Entity::find_by_id(account_id)
|
||
.one(txn_db)
|
||
.await?
|
||
.ok_or_else(|| PointsError::Validation("积分账户不存在".to_string()))?;
|
||
|
||
let new_balance = (account.balance - remaining).max(0);
|
||
let new_expired = account.total_expired + remaining;
|
||
|
||
let mut active_account: points_account::ActiveModel = account.into();
|
||
active_account.balance = Set(new_balance);
|
||
active_account.total_expired = Set(new_expired);
|
||
active_account.version = Set(active_account.version.unwrap() + 1);
|
||
active_account.updated_at = Set(Utc::now());
|
||
let expected_ver: i32 = match &active_account.version {
|
||
sea_orm::ActiveValue::Unchanged(v) | sea_orm::ActiveValue::Set(v) => *v,
|
||
_ => 0,
|
||
};
|
||
let _next_ver = erp_core::error::check_version(expected_ver, expected_ver)?;
|
||
active_account.update(txn_db).await?;
|
||
|
||
Ok(())
|
||
})
|
||
})
|
||
.await;
|
||
|
||
match txn_result {
|
||
Ok(()) => {
|
||
processed += 1;
|
||
tracing::debug!(txn_id = %txn_id, remaining = remaining, "积分过期处理完成");
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!(txn_id = %txn_id, error = %e, "积分过期处理失败,跳过");
|
||
}
|
||
}
|
||
}
|
||
|
||
if processed > 0 {
|
||
tracing::info!(count = processed, "积分过期清理完成");
|
||
let event = erp_core::events::DomainEvent::new(
|
||
crate::event::POINTS_EXPIRED,
|
||
tenant_id,
|
||
erp_core::events::build_event_payload(serde_json::json!({ "expired_count": processed })),
|
||
);
|
||
event_bus.publish(event, db).await;
|
||
}
|
||
|
||
Ok(processed)
|
||
}
|