use chrono::Utc; use sea_orm::{ActiveModelTrait, Set}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, mpsc}; use tracing::{error, info}; use uuid::Uuid; use crate::entity::domain_event; /// 领域事件 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DomainEvent { pub id: Uuid, pub event_type: String, pub tenant_id: Uuid, pub payload: serde_json::Value, pub timestamp: chrono::DateTime, pub correlation_id: Uuid, } impl DomainEvent { pub fn new(event_type: impl Into, tenant_id: Uuid, payload: serde_json::Value) -> Self { Self { id: Uuid::now_v7(), event_type: event_type.into(), tenant_id, payload, timestamp: Utc::now(), correlation_id: Uuid::now_v7(), } } } /// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件 pub struct FilteredEventReceiver { receiver: mpsc::Receiver, } impl FilteredEventReceiver { /// 接收下一个匹配的事件 pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } /// 订阅句柄 — 用于取消过滤订阅 pub struct SubscriptionHandle { cancel_tx: mpsc::Sender<()>, join_handle: tokio::task::JoinHandle<()>, } impl SubscriptionHandle { /// 取消订阅并等待后台任务结束 pub async fn cancel(self) { let _ = self.cancel_tx.send(()).await; let _ = self.join_handle.await; } } /// 进程内事件总线 #[derive(Clone)] pub struct EventBus { sender: broadcast::Sender, } impl EventBus { pub fn new(capacity: usize) -> Self { let (sender, _) = broadcast::channel(capacity); Self { sender } } /// 发布事件:先持久化到 domain_events 表,再内存广播。 /// /// 持久化失败时仅记录 warning,仍然广播(best-effort)。 pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) { // 持久化到 domain_events 表 let model = domain_event::ActiveModel { id: Set(event.id), tenant_id: Set(event.tenant_id), event_type: Set(event.event_type.clone()), payload: Set(Some(event.payload.clone())), correlation_id: Set(Some(event.correlation_id)), status: Set("published".to_string()), attempts: Set(0), last_error: Set(None), created_at: Set(event.timestamp), published_at: Set(Some(Utc::now())), }; match model.insert(db).await { Ok(_) => {} Err(e) => { tracing::warn!(event_id = %event.id, error = %e, "领域事件持久化失败"); } } // 内存广播 self.broadcast(event); } /// 仅内存广播(不持久化,用于内部测试等场景)。 pub fn broadcast(&self, event: DomainEvent) { info!(event_type = %event.event_type, event_id = %event.id, "Event broadcast"); if let Err(e) = self.sender.send(event) { error!("Failed to broadcast event: {}", e); } } /// 订阅所有事件,返回接收端 pub fn subscribe(&self) -> broadcast::Receiver { self.sender.subscribe() } /// 按事件类型前缀过滤订阅。 /// /// 为每次调用 spawn 一个 Tokio task 从 broadcast channel 读取, /// 只转发匹配 `event_type_prefix` 的事件到 mpsc channel(capacity 256)。 pub fn subscribe_filtered( &self, event_type_prefix: String, ) -> (FilteredEventReceiver, SubscriptionHandle) { let mut broadcast_rx = self.sender.subscribe(); let (mpsc_tx, mpsc_rx) = mpsc::channel(256); let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); let prefix = event_type_prefix.clone(); let join_handle = tokio::spawn(async move { loop { tokio::select! { biased; _ = cancel_rx.recv() => { tracing::info!(prefix = %prefix, "Filtered subscription cancelled"); break; } event = broadcast_rx.recv() => { match event { Ok(event) => { if event.event_type.starts_with(&prefix) { if mpsc_tx.send(event).await.is_err() { break; } } } Err(broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(prefix = %prefix, lagged = n, "Filtered subscriber lagged"); } Err(broadcast::error::RecvError::Closed) => { break; } } } } } }); tracing::info!(prefix = %event_type_prefix, "Filtered subscription created"); ( FilteredEventReceiver { receiver: mpsc_rx }, SubscriptionHandle { cancel_tx, join_handle, }, ) } }