refactor(health): 集中管理事件类型常量 + 积分过期发布事件
- event.rs 新增 20 个事件类型常量(PATIENT_CREATED 等) - 10 个 service 文件引用常量替代硬编码字符串 - expire_points 增加 EventBus 参数,处理完成后发布 points.expired 事件 - start_points_expiration_checker 传入 EventBus
This commit is contained in:
@@ -1,6 +1,45 @@
|
||||
use erp_core::events::EventBus;
|
||||
use uuid::Uuid;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 事件类型常量 — 集中管理,避免硬编码字符串散布在 service 层
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// 预约
|
||||
pub const APPOINTMENT_CREATED: &str = "appointment.created";
|
||||
// appointment.confirmed / appointment.cancelled 等 — 动态拼接
|
||||
|
||||
// 告警
|
||||
pub const ALERT_TRIGGERED: &str = "alert.triggered";
|
||||
|
||||
// 咨询
|
||||
pub const CONSULTATION_OPENED: &str = "consultation.opened";
|
||||
pub const CONSULTATION_CLOSED: &str = "consultation.closed";
|
||||
|
||||
// 设备数据
|
||||
pub const DEVICE_READINGS_SYNCED: &str = "device.readings.synced";
|
||||
|
||||
// 医生
|
||||
pub const DOCTOR_ONLINE_STATUS_CHANGED: &str = "doctor.online_status_changed";
|
||||
|
||||
// 随访
|
||||
pub const FOLLOW_UP_CREATED: &str = "follow_up.created";
|
||||
pub const FOLLOW_UP_COMPLETED: &str = "follow_up.completed";
|
||||
pub const FOLLOW_UP_OVERDUE: &str = "follow_up.overdue";
|
||||
|
||||
// 健康数据
|
||||
pub const LAB_REPORT_UPLOADED: &str = "lab_report.uploaded";
|
||||
pub const HEALTH_DATA_CRITICAL_ALERT: &str = "health_data.critical_alert";
|
||||
|
||||
// 患者
|
||||
pub const PATIENT_CREATED: &str = "patient.created";
|
||||
pub const PATIENT_UPDATED: &str = "patient.updated";
|
||||
pub const PATIENT_VERIFIED: &str = "patient.verified";
|
||||
pub const PATIENT_DECEASED: &str = "patient.deceased";
|
||||
|
||||
// 积分
|
||||
pub const POINTS_EXPIRED: &str = "points.expired";
|
||||
|
||||
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup)
|
||||
pub fn register_handlers(_bus: &EventBus) {
|
||||
// 事件处理器已迁移到 on_startup → register_handlers_with_state
|
||||
@@ -79,7 +118,7 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match reading_rx.recv().await {
|
||||
Some(event) if event.event_type == "device.readings.synced" => {
|
||||
Some(event) if event.event_type == DEVICE_READINGS_SYNCED => {
|
||||
let patient_id = event.payload.get("patient_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok());
|
||||
|
||||
@@ -41,13 +41,13 @@ impl HealthModule {
|
||||
}
|
||||
|
||||
/// 启动积分过期清理(每 24 小时运行一次),返回 JoinHandle 用于优雅关闭
|
||||
pub fn start_points_expiration_checker(db: sea_orm::DatabaseConnection) -> tokio::task::JoinHandle<()> {
|
||||
pub fn start_points_expiration_checker(db: sea_orm::DatabaseConnection, event_bus: erp_core::events::EventBus) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 3600));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
match crate::service::points_service::expire_points(&db).await {
|
||||
match crate::service::points_service::expire_points(&db, &event_bus).await {
|
||||
Ok(count) if count > 0 => tracing::info!(count = count, "积分过期清理完成"),
|
||||
Ok(_) => {}
|
||||
Err(e) => tracing::warn!(error = %e, "积分过期清理失败"),
|
||||
@@ -641,15 +641,16 @@ impl ErpModule for HealthModule {
|
||||
// 启动积分过期清理(启动时执行一次 + 每 24 小时重复)
|
||||
{
|
||||
let db = ctx.db.clone();
|
||||
let event_bus = ctx.event_bus.clone();
|
||||
tokio::spawn(async move {
|
||||
match crate::service::points_service::expire_points(&db).await {
|
||||
match crate::service::points_service::expire_points(&db, &event_bus).await {
|
||||
Ok(count) if count > 0 => tracing::info!(count = count, "启动时积分过期清理完成"),
|
||||
Ok(_) => tracing::info!("启动时积分过期清理完成(无过期积分)"),
|
||||
Err(e) => tracing::warn!(error = %e, "启动时积分过期清理失败"),
|
||||
}
|
||||
});
|
||||
}
|
||||
let _expire_handle = Self::start_points_expiration_checker(ctx.db.clone());
|
||||
let _expire_handle = Self::start_points_expiration_checker(ctx.db.clone(), ctx.event_bus.clone());
|
||||
tracing::info!(module = "health", "Points expiration checker started");
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -198,7 +198,7 @@ async fn create_alert_and_notify(
|
||||
let alert = alert.insert(db).await?;
|
||||
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
"alert.triggered",
|
||||
crate::event::ALERT_TRIGGERED,
|
||||
tenant_id,
|
||||
json!({
|
||||
"alert_id": alert.id,
|
||||
|
||||
@@ -209,7 +209,7 @@ pub async fn create_appointment(
|
||||
txn.commit().await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"appointment.created",
|
||||
crate::event::APPOINTMENT_CREATED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "appointment_id": m.id, "patient_id": m.patient_id, "status": m.status }),
|
||||
);
|
||||
|
||||
@@ -75,7 +75,7 @@ pub async fn create_session(
|
||||
let m = active.insert(&state.db).await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"consultation.opened",
|
||||
crate::event::CONSULTATION_OPENED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id }),
|
||||
);
|
||||
@@ -173,7 +173,7 @@ pub async fn close_session(
|
||||
let m = active.update(&state.db).await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"consultation.closed",
|
||||
crate::event::CONSULTATION_CLOSED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id }),
|
||||
);
|
||||
|
||||
@@ -125,7 +125,7 @@ pub async fn batch_create_readings(
|
||||
|
||||
// 6. 发布 EventBus 事件
|
||||
let event = DomainEvent::new(
|
||||
"device.readings.synced",
|
||||
crate::event::DEVICE_READINGS_SYNCED,
|
||||
tenant_id,
|
||||
serde_json::json!({
|
||||
"patient_id": patient_id,
|
||||
|
||||
@@ -170,7 +170,7 @@ pub async fn update_doctor(
|
||||
active.online_status = Set(v.clone());
|
||||
if old_online_status != *v {
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
"doctor.online_status_changed",
|
||||
crate::event::DOCTOR_ONLINE_STATUS_CHANGED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "doctor_id": id, "old_status": old_online_status, "new_status": v }),
|
||||
);
|
||||
|
||||
@@ -142,7 +142,7 @@ pub async fn create_task(
|
||||
let m = active.insert(&state.db).await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"follow_up.created",
|
||||
crate::event::FOLLOW_UP_CREATED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "task_id": m.id, "patient_id": m.patient_id }),
|
||||
);
|
||||
@@ -355,7 +355,7 @@ pub async fn create_record(
|
||||
txn.commit().await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"follow_up.completed",
|
||||
crate::event::FOLLOW_UP_COMPLETED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "task_id": record.task_id, "patient_id": task_patient_id }),
|
||||
);
|
||||
@@ -538,7 +538,7 @@ pub async fn check_overdue_and_notify(state: &HealthState) -> HealthResult<u64>
|
||||
// 3. 只为本次新标记的任务发布事件
|
||||
for task in newly_overdue {
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
"follow_up.overdue",
|
||||
crate::event::FOLLOW_UP_OVERDUE,
|
||||
task.tenant_id,
|
||||
serde_json::json!({
|
||||
"task_id": task.id,
|
||||
|
||||
@@ -381,7 +381,7 @@ pub async fn create_lab_report(
|
||||
let m = active.insert(&state.db).await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"lab_report.uploaded",
|
||||
crate::event::LAB_REPORT_UPLOADED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "report_id": m.id, "patient_id": m.patient_id, "report_type": m.report_type }),
|
||||
);
|
||||
@@ -909,7 +909,7 @@ async fn check_vital_signs_alert(
|
||||
}
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"health_data.critical_alert",
|
||||
crate::event::HEALTH_DATA_CRITICAL_ALERT,
|
||||
tenant_id,
|
||||
payload,
|
||||
);
|
||||
|
||||
@@ -171,7 +171,7 @@ pub async fn create_patient(
|
||||
let model = active.insert(&state.db).await?;
|
||||
|
||||
let event = DomainEvent::new(
|
||||
"patient.created",
|
||||
crate::event::PATIENT_CREATED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "patient_id": model.id }),
|
||||
);
|
||||
@@ -287,11 +287,11 @@ pub async fn update_patient(
|
||||
|
||||
// 根据状态变更发布不同事件
|
||||
let event_type = if req.status.as_deref() == Some("deceased") {
|
||||
"patient.deceased"
|
||||
crate::event::PATIENT_DECEASED
|
||||
} else if req.verification_status.as_deref() == Some("verified") {
|
||||
"patient.verified"
|
||||
crate::event::PATIENT_VERIFIED
|
||||
} else {
|
||||
"patient.updated"
|
||||
crate::event::PATIENT_UPDATED
|
||||
};
|
||||
let event = DomainEvent::new(
|
||||
event_type,
|
||||
|
||||
@@ -1699,7 +1699,7 @@ pub async fn get_points_statistics(
|
||||
|
||||
/// 扫描已过期的 earn 交易,扣减账户余额,更新 total_expired
|
||||
/// 返回处理的过期交易数量
|
||||
pub async fn expire_points(db: &sea_orm::DatabaseConnection) -> HealthResult<u64> {
|
||||
pub async fn expire_points(db: &sea_orm::DatabaseConnection, event_bus: &erp_core::events::EventBus) -> HealthResult<u64> {
|
||||
let now = Utc::now();
|
||||
|
||||
// 查找所有已过期但未标记 expired 的 earn 交易
|
||||
@@ -1717,6 +1717,8 @@ pub async fn expire_points(db: &sea_orm::DatabaseConnection) -> HealthResult<u64
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let tenant_id = expired_txns.first().map(|t| t.tenant_id).unwrap_or_default();
|
||||
|
||||
let mut processed: u64 = 0;
|
||||
|
||||
for txn in expired_txns {
|
||||
@@ -1774,6 +1776,12 @@ pub async fn expire_points(db: &sea_orm::DatabaseConnection) -> HealthResult<u64
|
||||
|
||||
if processed > 0 {
|
||||
tracing::info!(count = processed, "积分过期清理完成");
|
||||
let event = erp_core::events::DomainEvent::new(
|
||||
crate::event::POINTS_EXPIRED,
|
||||
tenant_id,
|
||||
serde_json::json!({ "expired_count": processed }),
|
||||
);
|
||||
event_bus.publish(event, db).await;
|
||||
}
|
||||
|
||||
Ok(processed)
|
||||
|
||||
Reference in New Issue
Block a user