use chrono::Utc; use sea_orm::{ActiveModelTrait, ConnectionTrait, Set}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, mpsc}; use tracing::{error, info}; use uuid::Uuid; use crate::entity::domain_event; /// 领域事件 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DomainEvent { pub id: Uuid, pub event_type: String, pub tenant_id: Uuid, pub payload: serde_json::Value, pub timestamp: chrono::DateTime, pub correlation_id: Uuid, } impl DomainEvent { pub fn new(event_type: impl Into, tenant_id: Uuid, payload: serde_json::Value) -> Self { Self { id: Uuid::now_v7(), event_type: event_type.into(), tenant_id, payload, timestamp: Utc::now(), correlation_id: Uuid::now_v7(), } } } /// 当前事件 payload schema 版本 pub const EVENT_SCHEMA_VERSION: &str = "v1"; /// 构造统一信封格式的事件 payload。 /// /// 自动注入 `schema_version` 和 `occurred_at`,业务数据通过 `data` 传入。 /// 用法:`build_event_payload(serde_json::json!({ "patient_id": ..., }))` pub fn build_event_payload(data: serde_json::Value) -> serde_json::Value { let mut envelope = serde_json::json!({ "schema_version": EVENT_SCHEMA_VERSION, "occurred_at": Utc::now().to_rfc3339(), }); if let serde_json::Value::Object(ref mut map) = envelope { if let serde_json::Value::Object(data_map) = data { for (k, v) in data_map { map.insert(k, v); } } } envelope } /// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件 pub struct FilteredEventReceiver { receiver: mpsc::Receiver, } impl FilteredEventReceiver { /// 接收下一个匹配的事件 pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } /// 订阅句柄 — 用于取消过滤订阅 pub struct SubscriptionHandle { cancel_tx: mpsc::Sender<()>, join_handle: tokio::task::JoinHandle<()>, } impl SubscriptionHandle { /// 取消订阅并等待后台任务结束 pub async fn cancel(self) { let _ = self.cancel_tx.send(()).await; let _ = self.join_handle.await; } } /// 进程内事件总线 #[derive(Clone)] pub struct EventBus { sender: broadcast::Sender, } impl EventBus { pub fn new(capacity: usize) -> Self { let (sender, _) = broadcast::channel(capacity); Self { sender } } /// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播, /// 最后更新为 published 并 NOTIFY outbox relay。 /// /// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态, /// 重启后 outbox relay 会重新广播。 pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) { // 1. 持久化为 pending 状态 let event_id = event.id; let model = domain_event::ActiveModel { id: Set(event.id), tenant_id: Set(event.tenant_id), event_type: Set(event.event_type.clone()), payload: Set(Some(event.payload.clone())), correlation_id: Set(Some(event.correlation_id)), status: Set("pending".to_string()), attempts: Set(0), last_error: Set(None), created_at: Set(event.timestamp), published_at: Set(None), }; let saved = match model.insert(db).await { Ok(m) => m, Err(e) => { tracing::warn!(event_id = %event_id, error = %e, "领域事件持久化失败"); // 持久化失败仍然广播(best-effort) self.broadcast(event); return; } }; // 2. 内存广播 self.broadcast(event); // 3. 更新为 published let mut active: domain_event::ActiveModel = saved.into(); active.status = Set("published".to_string()); active.published_at = Set(Some(Utc::now())); if let Err(e) = active.update(db).await { tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败"); } // 4. NOTIFY outbox relay(通知 outbox relay 有新事件到达) let notify_sql = sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, format!("NOTIFY outbox_channel, '{}'", event_id), ); if let Err(e) = db.execute(notify_sql).await { tracing::debug!(event_id = %event_id, error = %e, "NOTIFY outbox_channel 失败(非致命)"); } } /// 仅内存广播(不持久化,用于内部测试等场景)。 pub fn broadcast(&self, event: DomainEvent) { info!(event_type = %event.event_type, event_id = %event.id, "Event broadcast"); if let Err(e) = self.sender.send(event) { error!("Failed to broadcast event: {}", e); } } /// 订阅所有事件,返回接收端 pub fn subscribe(&self) -> broadcast::Receiver { self.sender.subscribe() } /// 按事件类型前缀过滤订阅。 /// /// 为每次调用 spawn 一个 Tokio task 从 broadcast channel 读取, /// 只转发匹配 `event_type_prefix` 的事件到 mpsc channel(capacity 256)。 pub fn subscribe_filtered( &self, event_type_prefix: String, ) -> (FilteredEventReceiver, SubscriptionHandle) { let mut broadcast_rx = self.sender.subscribe(); let (mpsc_tx, mpsc_rx) = mpsc::channel(256); let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); let prefix = event_type_prefix.clone(); let join_handle = tokio::spawn(async move { loop { tokio::select! { biased; _ = cancel_rx.recv() => { tracing::info!(prefix = %prefix, "Filtered subscription cancelled"); break; } event = broadcast_rx.recv() => { match event { Ok(event) => { if event.event_type.starts_with(&prefix) { if mpsc_tx.send(event).await.is_err() { break; } } } Err(broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(prefix = %prefix, lagged = n, "Filtered subscriber lagged"); } Err(broadcast::error::RecvError::Closed) => { break; } } } } } }); tracing::info!(prefix = %event_type_prefix, "Filtered subscription created"); ( FilteredEventReceiver { receiver: mpsc_rx }, SubscriptionHandle { cancel_tx, join_handle, }, ) } }