diff --git a/crates/erp-auth/src/service/auth_service.rs b/crates/erp-auth/src/service/auth_service.rs index db96f6e..7e4ef30 100644 --- a/crates/erp-auth/src/service/auth_service.rs +++ b/crates/erp-auth/src/service/auth_service.rs @@ -161,7 +161,7 @@ impl AuthService { event_bus.publish(erp_core::events::DomainEvent::new( "user.login", tenant_id, - serde_json::json!({ "user_id": user_model.id, "username": user_model.username }), + erp_core::events::build_event_payload(serde_json::json!({ "user_id": user_model.id, "username": user_model.username })), ), db).await; // 审计:登录成功 diff --git a/crates/erp-core/src/events.rs b/crates/erp-core/src/events.rs index 21c1ac0..a01cc4a 100644 --- a/crates/erp-core/src/events.rs +++ b/crates/erp-core/src/events.rs @@ -31,6 +31,28 @@ impl DomainEvent { } } +/// 当前事件 payload schema 版本 +pub const EVENT_SCHEMA_VERSION: &str = "v1"; + +/// 构造统一信封格式的事件 payload。 +/// +/// 自动注入 `schema_version` 和 `occurred_at`,业务数据通过 `data` 传入。 +/// 用法:`build_event_payload(serde_json::json!({ "patient_id": ..., }))` +pub fn build_event_payload(data: serde_json::Value) -> serde_json::Value { + let mut envelope = serde_json::json!({ + "schema_version": EVENT_SCHEMA_VERSION, + "occurred_at": Utc::now().to_rfc3339(), + }); + if let serde_json::Value::Object(ref mut map) = envelope { + if let serde_json::Value::Object(data_map) = data { + for (k, v) in data_map { + map.insert(k, v); + } + } + } + envelope +} + /// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件 pub struct FilteredEventReceiver { receiver: mpsc::Receiver, diff --git a/crates/erp-health/src/service/alert_engine.rs b/crates/erp-health/src/service/alert_engine.rs index a2aa4c7..9e506ee 100644 --- a/crates/erp-health/src/service/alert_engine.rs +++ b/crates/erp-health/src/service/alert_engine.rs @@ -200,14 +200,14 @@ async fn create_alert_and_notify( let event = erp_core::events::DomainEvent::new( crate::event::ALERT_TRIGGERED, tenant_id, - json!({ + erp_core::events::build_event_payload(json!({ "alert_id": alert.id, "patient_id": patient_id, "rule_name": rule.name, "severity": rule.severity, "detail": alert.detail, "notify_roles": rule.notify_roles, - }), + })), ); event_bus.publish(event, db).await; diff --git a/crates/erp-health/src/service/appointment_service.rs b/crates/erp-health/src/service/appointment_service.rs index 489e9b7..1cc2196 100644 --- a/crates/erp-health/src/service/appointment_service.rs +++ b/crates/erp-health/src/service/appointment_service.rs @@ -211,7 +211,7 @@ pub async fn create_appointment( let event = DomainEvent::new( crate::event::APPOINTMENT_CREATED, tenant_id, - serde_json::json!({ "appointment_id": m.id, "patient_id": m.patient_id, "status": m.status }), + erp_core::events::build_event_payload(serde_json::json!({ "appointment_id": m.id, "patient_id": m.patient_id, "status": m.status })), ); state.event_bus.publish(event, &state.db).await; @@ -299,7 +299,7 @@ pub async fn update_appointment_status( let event = DomainEvent::new( event_type, tenant_id, - serde_json::json!({ "appointment_id": m.id, "patient_id": m.patient_id, "status": m.status }), + erp_core::events::build_event_payload(serde_json::json!({ "appointment_id": m.id, "patient_id": m.patient_id, "status": m.status })), ); state.event_bus.publish(event, &state.db).await; @@ -573,13 +573,13 @@ pub async fn send_reminders( DomainEvent::new( "appointment.reminder", app.tenant_id, - json!({ + erp_core::events::build_event_payload(json!({ "appointment_id": app.id, "patient_id": app.patient_id, "doctor_id": app.doctor_id, "appointment_date": app.appointment_date, "time_slot": format!("{}-{}", app.start_time.format("%H:%M"), app.end_time.format("%H:%M")), - }), + })), ), db, ).await; diff --git a/crates/erp-health/src/service/article_service.rs b/crates/erp-health/src/service/article_service.rs index a6e8150..018eb00 100644 --- a/crates/erp-health/src/service/article_service.rs +++ b/crates/erp-health/src/service/article_service.rs @@ -186,9 +186,9 @@ pub async fn approve_article( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::ARTICLE_PUBLISHED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::ARTICLE_PUBLISHED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "article_id": m.id, "title": m.title, "category": m.category, - })), + }))), &state.db, ).await; @@ -229,9 +229,9 @@ pub async fn reject_article( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::ARTICLE_REJECTED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::ARTICLE_REJECTED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "article_id": m.id, "title": m.title, - })), + }))), &state.db, ).await; diff --git a/crates/erp-health/src/service/consent_service.rs b/crates/erp-health/src/service/consent_service.rs index 4735931..96fb855 100644 --- a/crates/erp-health/src/service/consent_service.rs +++ b/crates/erp-health/src/service/consent_service.rs @@ -109,10 +109,10 @@ pub async fn grant_consent( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::CONSENT_GRANTED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::CONSENT_GRANTED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "consent_id": m.id, "patient_id": m.patient_id, "consent_type": m.consent_type, "consent_scope": m.consent_scope, - })), + }))), &state.db, ).await; @@ -155,10 +155,10 @@ pub async fn revoke_consent( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::CONSENT_REVOKED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::CONSENT_REVOKED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "consent_id": m.id, "patient_id": m.patient_id, "consent_type": m.consent_type, - })), + }))), &state.db, ).await; diff --git a/crates/erp-health/src/service/consultation_service.rs b/crates/erp-health/src/service/consultation_service.rs index 2b3e6a9..c484b5d 100644 --- a/crates/erp-health/src/service/consultation_service.rs +++ b/crates/erp-health/src/service/consultation_service.rs @@ -77,7 +77,7 @@ pub async fn create_session( let event = DomainEvent::new( crate::event::CONSULTATION_OPENED, tenant_id, - serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id }), + erp_core::events::build_event_payload(serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id })), ); state.event_bus.publish(event, &state.db).await; @@ -175,7 +175,7 @@ pub async fn close_session( let event = DomainEvent::new( crate::event::CONSULTATION_CLOSED, tenant_id, - serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id }), + erp_core::events::build_event_payload(serde_json::json!({ "session_id": m.id, "patient_id": m.patient_id })), ); state.event_bus.publish(event, &state.db).await; diff --git a/crates/erp-health/src/service/daily_monitoring_service.rs b/crates/erp-health/src/service/daily_monitoring_service.rs index aab797d..57ce25e 100644 --- a/crates/erp-health/src/service/daily_monitoring_service.rs +++ b/crates/erp-health/src/service/daily_monitoring_service.rs @@ -102,13 +102,13 @@ pub async fn create_daily_monitoring( let event = DomainEvent::new( DAILY_MONITORING_CREATED, tenant_id, - serde_json::json!({ + erp_core::events::build_event_payload(serde_json::json!({ "record_id": dm.id, "patient_id": dm.patient_id, "record_date": dm.record_date, "weight": dm.weight, "blood_sugar": dm.blood_sugar, - }), + })), ); state.event_bus.publish(event, &state.db).await; diff --git a/crates/erp-health/src/service/device_reading_service.rs b/crates/erp-health/src/service/device_reading_service.rs index b131a5e..d3c41f8 100644 --- a/crates/erp-health/src/service/device_reading_service.rs +++ b/crates/erp-health/src/service/device_reading_service.rs @@ -127,7 +127,7 @@ pub async fn batch_create_readings( let event = DomainEvent::new( crate::event::DEVICE_READINGS_SYNCED, tenant_id, - serde_json::json!({ + erp_core::events::build_event_payload(serde_json::json!({ "patient_id": patient_id, "count": inserted, "device_model": req.device_model, @@ -135,7 +135,7 @@ pub async fn batch_create_readings( "from": earliest.map(|t| t.to_rfc3339()), "to": latest.map(|t| t.to_rfc3339()), } - }), + })), ); state.event_bus.publish(event, &state.db).await; diff --git a/crates/erp-health/src/service/doctor_service.rs b/crates/erp-health/src/service/doctor_service.rs index 1e48d5f..9510c59 100644 --- a/crates/erp-health/src/service/doctor_service.rs +++ b/crates/erp-health/src/service/doctor_service.rs @@ -172,7 +172,7 @@ pub async fn update_doctor( let event = erp_core::events::DomainEvent::new( crate::event::DOCTOR_ONLINE_STATUS_CHANGED, tenant_id, - serde_json::json!({ "doctor_id": id, "old_status": old_online_status, "new_status": v }), + erp_core::events::build_event_payload(serde_json::json!({ "doctor_id": id, "old_status": old_online_status, "new_status": v })), ); state.event_bus.publish(event, &state.db).await; } diff --git a/crates/erp-health/src/service/follow_up_service.rs b/crates/erp-health/src/service/follow_up_service.rs index 3a6d500..c3d994d 100644 --- a/crates/erp-health/src/service/follow_up_service.rs +++ b/crates/erp-health/src/service/follow_up_service.rs @@ -167,7 +167,7 @@ pub async fn create_task( let event = DomainEvent::new( crate::event::FOLLOW_UP_CREATED, tenant_id, - serde_json::json!({ "task_id": m.id, "patient_id": m.patient_id }), + erp_core::events::build_event_payload(serde_json::json!({ "task_id": m.id, "patient_id": m.patient_id })), ); state.event_bus.publish(event, &state.db).await; @@ -359,7 +359,7 @@ pub async fn batch_create_tasks( DomainEvent::new( crate::event::FOLLOW_UP_CREATED, tenant_id, - serde_json::json!({ "batch_count": count, "assigned_to": req.assigned_to }), + erp_core::events::build_event_payload(serde_json::json!({ "batch_count": count, "assigned_to": req.assigned_to })), ), &state.db, ).await; @@ -487,7 +487,7 @@ pub async fn batch_complete_tasks( DomainEvent::new( crate::event::FOLLOW_UP_COMPLETED, tenant_id, - serde_json::json!({ "batch_count": succeeded }), + erp_core::events::build_event_payload(serde_json::json!({ "batch_count": succeeded })), ), &state.db, ).await; @@ -594,7 +594,7 @@ pub async fn create_record( let event = DomainEvent::new( crate::event::FOLLOW_UP_COMPLETED, tenant_id, - serde_json::json!({ "task_id": record.task_id, "patient_id": task_patient_id }), + erp_core::events::build_event_payload(serde_json::json!({ "task_id": record.task_id, "patient_id": task_patient_id })), ); state.event_bus.publish(event, &state.db).await; @@ -777,12 +777,12 @@ pub async fn check_overdue_and_notify(state: &HealthState) -> HealthResult let event = erp_core::events::DomainEvent::new( crate::event::FOLLOW_UP_OVERDUE, task.tenant_id, - serde_json::json!({ + erp_core::events::build_event_payload(serde_json::json!({ "task_id": task.id, "patient_id": task.patient_id, "assigned_to": task.assigned_to, "planned_date": task.planned_date, - }), + })), ); state.event_bus.publish(event, db).await; } diff --git a/crates/erp-health/src/service/health_data_service.rs b/crates/erp-health/src/service/health_data_service.rs index 4a4719d..853e0fc 100644 --- a/crates/erp-health/src/service/health_data_service.rs +++ b/crates/erp-health/src/service/health_data_service.rs @@ -401,7 +401,7 @@ pub async fn create_lab_report( let event = DomainEvent::new( crate::event::LAB_REPORT_UPLOADED, tenant_id, - serde_json::json!({ "report_id": m.id, "patient_id": m.patient_id, "report_type": m.report_type }), + erp_core::events::build_event_payload(serde_json::json!({ "report_id": m.id, "patient_id": m.patient_id, "report_type": m.report_type })), ); state.event_bus.publish(event, &state.db).await; @@ -929,7 +929,7 @@ async fn check_vital_signs_alert( let event = DomainEvent::new( crate::event::HEALTH_DATA_CRITICAL_ALERT, tenant_id, - payload, + erp_core::events::build_event_payload(payload), ); state.event_bus.publish(event, &state.db).await; tracing::warn!( diff --git a/crates/erp-health/src/service/patient_service.rs b/crates/erp-health/src/service/patient_service.rs index 85c4a7b..214dd35 100644 --- a/crates/erp-health/src/service/patient_service.rs +++ b/crates/erp-health/src/service/patient_service.rs @@ -173,7 +173,7 @@ pub async fn create_patient( let event = DomainEvent::new( crate::event::PATIENT_CREATED, tenant_id, - serde_json::json!({ "patient_id": model.id }), + erp_core::events::build_event_payload(serde_json::json!({ "patient_id": model.id })), ); state.event_bus.publish(event, &state.db).await; @@ -296,7 +296,7 @@ pub async fn update_patient( let event = DomainEvent::new( event_type, tenant_id, - serde_json::json!({ "patient_id": updated.id }), + erp_core::events::build_event_payload(serde_json::json!({ "patient_id": updated.id })), ); state.event_bus.publish(event, &state.db).await; diff --git a/crates/erp-health/src/service/points_service.rs b/crates/erp-health/src/service/points_service.rs index b7e416a..3086848 100644 --- a/crates/erp-health/src/service/points_service.rs +++ b/crates/erp-health/src/service/points_service.rs @@ -192,10 +192,10 @@ pub async fn earn_points( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::POINTS_EARNED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::POINTS_EARNED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "transaction_id": inserted.id, "account_id": inserted.account_id, "amount": inserted.amount, "balance_after": inserted.balance_after, - })), + }))), &state.db, ).await; @@ -933,10 +933,10 @@ pub async fn exchange_product( ).await; state.event_bus.publish( - DomainEvent::new(crate::event::POINTS_EXCHANGED, tenant_id, serde_json::json!({ + DomainEvent::new(crate::event::POINTS_EXCHANGED, tenant_id, erp_core::events::build_event_payload(serde_json::json!({ "order_id": inserted_order.id, "patient_id": inserted_order.patient_id, "product_id": inserted_order.product_id, "points_cost": inserted_order.points_cost, - })), + }))), &state.db, ).await; @@ -1796,7 +1796,7 @@ pub async fn expire_points(db: &sea_orm::DatabaseConnection, event_bus: &erp_cor let event = erp_core::events::DomainEvent::new( crate::event::POINTS_EXPIRED, tenant_id, - serde_json::json!({ "expired_count": processed }), + erp_core::events::build_event_payload(serde_json::json!({ "expired_count": processed })), ); event_bus.publish(event, db).await; } diff --git a/crates/erp-workflow/src/module.rs b/crates/erp-workflow/src/module.rs index 0a9cab4..03744e5 100644 --- a/crates/erp-workflow/src/module.rs +++ b/crates/erp-workflow/src/module.rs @@ -115,11 +115,11 @@ impl WorkflowModule { let event = erp_core::events::DomainEvent::new( "task.timeout", *tenant_id, - serde_json::json!({ + erp_core::events::build_event_payload(serde_json::json!({ "task_id": task_id, "instance_id": instance_id, "assignee_id": assignee_id, - }), + })), ); event_bus.publish(event, &db).await; } diff --git a/crates/erp-workflow/src/service/instance_service.rs b/crates/erp-workflow/src/service/instance_service.rs index dc4dc09..665119a 100644 --- a/crates/erp-workflow/src/service/instance_service.rs +++ b/crates/erp-workflow/src/service/instance_service.rs @@ -126,7 +126,7 @@ impl InstanceService { event_bus.publish(erp_core::events::DomainEvent::new( "process_instance.started", tenant_id, - serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id }), + erp_core::events::build_event_payload(serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id })), ), db).await; audit_service::record( @@ -323,7 +323,7 @@ impl InstanceService { tenant_id: Set(tenant_id), event_type: Set(event_type), payload: Set(Some( - serde_json::json!({ "instance_id": id, "changed_by": operator_id }), + erp_core::events::build_event_payload(serde_json::json!({ "instance_id": id, "changed_by": operator_id })), )), correlation_id: Set(Some(Uuid::now_v7())), status: Set("pending".to_string()),