use chrono::Utc; use sea_orm::{ActiveModelTrait, ConnectionTrait, PaginatorTrait, Set}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, mpsc}; use tracing::{error, info}; use uuid::Uuid; use crate::entity::dead_letter_event; use crate::entity::domain_event; /// 领域事件 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DomainEvent { pub id: Uuid, pub event_type: String, pub tenant_id: Uuid, pub payload: serde_json::Value, pub timestamp: chrono::DateTime, pub correlation_id: Uuid, } impl DomainEvent { pub fn new(event_type: impl Into, tenant_id: Uuid, payload: serde_json::Value) -> Self { Self { id: Uuid::now_v7(), event_type: event_type.into(), tenant_id, payload, timestamp: Utc::now(), correlation_id: Uuid::now_v7(), } } } /// 当前事件 payload schema 版本 pub const EVENT_SCHEMA_VERSION: &str = "v1"; /// 构造统一信封格式的事件 payload。 /// /// 自动注入 `schema_version` 和 `occurred_at`,业务数据通过 `data` 传入。 /// 用法:`build_event_payload(serde_json::json!({ "patient_id": ..., }))` pub fn build_event_payload(data: serde_json::Value) -> serde_json::Value { let mut envelope = serde_json::json!({ "schema_version": EVENT_SCHEMA_VERSION, "occurred_at": Utc::now().to_rfc3339(), }); if let serde_json::Value::Object(ref mut map) = envelope && let serde_json::Value::Object(data_map) = data { for (k, v) in data_map { map.insert(k, v); } } envelope } /// 检查事件是否已被指定消费者处理。 /// /// 查询 `processed_events` 表判断 event_id + consumer_id 是否已存在。 pub async fn is_event_processed( db: &sea_orm::DatabaseConnection, event_id: Uuid, consumer_id: &str, ) -> Result { use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; let count = crate::entity::processed_event::Entity::find() .filter(crate::entity::processed_event::Column::EventId.eq(event_id)) .filter(crate::entity::processed_event::Column::ConsumerId.eq(consumer_id)) .count(db) .await?; Ok(count > 0) } /// 标记事件已被指定消费者处理。 /// /// 插入 `processed_events` 记录,重复插入会因主键冲突被安全忽略。 pub async fn mark_event_processed( db: &sea_orm::DatabaseConnection, event_id: Uuid, consumer_id: &str, ) -> Result<(), sea_orm::DbErr> { use sea_orm::ActiveModelTrait; use sea_orm::Set; let model = crate::entity::processed_event::ActiveModel { event_id: Set(event_id), consumer_id: Set(consumer_id.to_string()), processed_at: Set(Utc::now()), }; // INSERT ... ON CONFLICT DO NOTHING(主键冲突时安全忽略) match model.insert(db).await { Ok(_) => Ok(()), Err(e) => { // 唯一约束冲突 = 已处理,不是错误 if e.to_string().contains("duplicate") || e.to_string().contains("violates unique") { Ok(()) } else { Err(e) } } } } /// 消费事件 — 带幂等检查和 dead-letter 兜底。 /// /// 如果事件已被处理(幂等),返回 `ConsumeResult::AlreadyProcessed`。 /// 如果处理成功,标记为已处理并返回 `ConsumeResult::Success`。 /// 如果处理失败,将事件转入 dead_letter_events 表并返回 `ConsumeResult::DeadLettered`。 pub async fn consume_with_retry( db: &sea_orm::DatabaseConnection, event: &DomainEvent, consumer_id: &str, handler: F, ) -> ConsumeResult where F: FnOnce(&DomainEvent) -> Fut, Fut: std::future::Future>, { if is_event_processed(db, event.id, consumer_id) .await .unwrap_or(false) { return ConsumeResult::AlreadyProcessed; } match handler(event).await { Ok(()) => { if let Err(e) = mark_event_processed(db, event.id, consumer_id).await { tracing::warn!( event_id = %event.id, consumer_id = consumer_id, error = %e, "标记事件已处理失败(非致命)" ); } ConsumeResult::Success } Err(err) => { tracing::error!( event_id = %event.id, event_type = %event.event_type, consumer_id = consumer_id, error = %err, "事件消费失败,转入 dead-letter" ); if let Err(e) = insert_dead_letter(db, event, consumer_id, &err).await { tracing::error!( event_id = %event.id, error = %e, "Dead-letter 写入失败" ); } ConsumeResult::DeadLettered(err) } } } /// 消费结果 #[derive(Debug)] pub enum ConsumeResult { Success, AlreadyProcessed, DeadLettered(String), } /// 将失败事件写入 dead_letter_events 表 pub async fn insert_dead_letter( db: &sea_orm::DatabaseConnection, event: &DomainEvent, consumer_id: &str, error_msg: &str, ) -> Result<(), sea_orm::DbErr> { let model = dead_letter_event::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(Some(event.tenant_id)), original_event_id: Set(event.id), event_type: Set(event.event_type.clone()), payload: Set(Some(event.payload.clone())), consumer_id: Set(consumer_id.to_string()), attempts: Set(1), last_error: Set(Some(error_msg.to_string())), created_at: Set(Utc::now()), resolved_at: Set(None), }; model.insert(db).await?; Ok(()) } /// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件 pub struct FilteredEventReceiver { receiver: mpsc::Receiver, } impl FilteredEventReceiver { /// 接收下一个匹配的事件 pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } /// 订阅句柄 — 用于取消过滤订阅 pub struct SubscriptionHandle { cancel_tx: mpsc::Sender<()>, join_handle: tokio::task::JoinHandle<()>, } impl SubscriptionHandle { /// 取消订阅并等待后台任务结束 pub async fn cancel(self) { let _ = self.cancel_tx.send(()).await; let _ = self.join_handle.await; } } /// 进程内事件总线 #[derive(Clone)] pub struct EventBus { sender: broadcast::Sender, } impl EventBus { pub fn new(capacity: usize) -> Self { let (sender, _) = broadcast::channel(capacity); Self { sender } } /// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播, /// 最后更新为 published 并 NOTIFY outbox relay。 /// /// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态, /// 重启后 outbox relay 会重新广播。 pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) { // 1. 持久化为 pending 状态 let event_id = event.id; let model = domain_event::ActiveModel { id: Set(event.id), tenant_id: Set(event.tenant_id), event_type: Set(event.event_type.clone()), payload: Set(Some(event.payload.clone())), correlation_id: Set(Some(event.correlation_id)), status: Set("pending".to_string()), attempts: Set(0), last_error: Set(None), created_at: Set(event.timestamp), published_at: Set(None), }; let saved = match model.insert(db).await { Ok(m) => m, Err(e) => { tracing::warn!(event_id = %event_id, error = %e, "领域事件持久化失败"); // 持久化失败仍然广播(best-effort) self.broadcast(event); return; } }; // 2. 内存广播 self.broadcast(event); // 3. 更新为 published let mut active: domain_event::ActiveModel = saved.into(); active.status = Set("published".to_string()); active.published_at = Set(Some(Utc::now())); if let Err(e) = active.update(db).await { tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败"); } // 4. NOTIFY outbox relay(通知 outbox relay 有新事件到达) let notify_sql = sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, format!("NOTIFY outbox_channel, '{}'", event_id), ); if let Err(e) = db.execute(notify_sql).await { tracing::debug!(event_id = %event_id, error = %e, "NOTIFY outbox_channel 失败(非致命)"); } } /// 仅内存广播(不持久化,用于内部测试等场景)。 pub fn broadcast(&self, event: DomainEvent) { info!(event_type = %event.event_type, event_id = %event.id, "Event broadcast"); if let Err(e) = self.sender.send(event) { error!("Failed to broadcast event: {}", e); } } /// 订阅所有事件,返回接收端 pub fn subscribe(&self) -> broadcast::Receiver { self.sender.subscribe() } /// 按事件类型前缀过滤订阅。 /// /// 为每次调用 spawn 一个 Tokio task 从 broadcast channel 读取, /// 只转发匹配 `event_type_prefix` 的事件到 mpsc channel(capacity 256)。 pub fn subscribe_filtered( &self, event_type_prefix: String, ) -> (FilteredEventReceiver, SubscriptionHandle) { let mut broadcast_rx = self.sender.subscribe(); let (mpsc_tx, mpsc_rx) = mpsc::channel(256); let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); let prefix = event_type_prefix.clone(); let join_handle = tokio::spawn(async move { loop { tokio::select! { biased; _ = cancel_rx.recv() => { tracing::info!(prefix = %prefix, "Filtered subscription cancelled"); break; } event = broadcast_rx.recv() => { match event { Ok(event) => { if event.event_type.starts_with(&prefix) && mpsc_tx.send(event).await.is_err() { break; } } Err(broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(prefix = %prefix, lagged = n, "Filtered subscriber lagged"); } Err(broadcast::error::RecvError::Closed) => { break; } } } } } }); tracing::info!(prefix = %event_type_prefix, "Filtered subscription created"); ( FilteredEventReceiver { receiver: mpsc_rx }, SubscriptionHandle { cancel_tx, join_handle, }, ) } }