use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set, }; use sqlx::postgres::PgListener; use std::time::Duration; use erp_core::entity::domain_event; use erp_core::events::{DomainEvent, EventBus}; const MAX_RETRY: i32 = 5; const FALLBACK_POLL_INTERVAL_SECS: u64 = 30; const NOTIFY_CHANNEL: &str = "outbox_channel"; const RECONNECT_DELAY_SECS: u64 = 5; /// 启动 outbox relay 后台任务。 /// /// 先执行一次性扫描(处理服务重启前遗留的 pending 事件), /// 然后通过 PostgreSQL LISTEN/NOTIFY 监听新事件,配合 30s 兜底轮询。 pub fn start_outbox_relay( db: sea_orm::DatabaseConnection, event_bus: EventBus, database_url: String, ) { let db_clone = db.clone(); let event_bus_clone = event_bus.clone(); let url = database_url.clone(); tokio::spawn(async move { // 启动时立即处理一次(恢复重启前未广播的事件) match process_pending_events(&db_clone, &event_bus_clone).await { Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"), Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"), Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"), } // 进入 LISTEN/NOTIFY 主循环(带自动重连) loop { if let Err(e) = run_listener(&db_clone, &event_bus_clone, &url).await { tracing::warn!(error = %e, "PgListener 断开连接,{}s 后重连", RECONNECT_DELAY_SECS); } tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECS)).await; // 重连后执行一次兜底扫描 if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await { tracing::warn!(error = %e, "重连后 outbox relay 处理失败"); } } }); } /// 运行 PgListener 监听循环。 /// /// 使用 `tokio::select!` 在 LISTEN 通知和 30s 定时器之间竞争, /// 确保即使 NOTIFY 丢失也能兜底处理。 async fn run_listener( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, database_url: &str, ) -> Result<(), sqlx::Error> { let mut listener = PgListener::connect(database_url).await?; listener.listen(NOTIFY_CHANNEL).await?; tracing::info!("Outbox relay LISTEN/NOTIFY 已连接,监听 {}", NOTIFY_CHANNEL); let mut fallback = tokio::time::interval(Duration::from_secs(FALLBACK_POLL_INTERVAL_SECS)); loop { tokio::select! { // LISTEN/NOTIFY 通知触发 notification = listener.recv() => { match notification { Ok(notif) => { tracing::debug!( channel = %notif.channel(), payload = %notif.payload(), "收到 outbox NOTIFY" ); if let Err(e) = process_pending_events(db, event_bus).await { tracing::warn!(error = %e, "NOTIFY 触发的 outbox 处理失败"); } } Err(e) => return Err(e), } } // 30s 兜底轮询 _ = fallback.tick() => { tracing::debug!("outbox relay 兜底轮询触发"); if let Err(e) = process_pending_events(db, event_bus).await { tracing::warn!(error = %e, "兜底轮询 outbox 处理失败"); } } } } } async fn process_pending_events( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> Result { let pending = domain_event::Entity::find() .filter(domain_event::Column::Status.eq("pending")) .filter(domain_event::Column::Attempts.lt(MAX_RETRY)) .order_by_asc(domain_event::Column::CreatedAt) .limit(100) .all(db) .await?; if pending.is_empty() { return Ok(0); } let count = pending.len(); tracing::info!(count = count, "处理待发领域事件"); for event_model in pending { // 重建 DomainEvent 并广播(保留原始 ID 和时间戳) let domain_event = DomainEvent { id: event_model.id, event_type: event_model.event_type.clone(), tenant_id: event_model.tenant_id, payload: event_model.payload.clone().unwrap_or(serde_json::json!({})), timestamp: event_model.created_at, correlation_id: event_model.correlation_id.unwrap_or(event_model.id), }; event_bus.broadcast(domain_event); // 标记为 published,增加 attempts 计数 let mut active: domain_event::ActiveModel = event_model.into(); active.status = Set("published".to_string()); active.published_at = Set(Some(Utc::now())); active.attempts = Set(erp_core::sea_orm_ext::bump_version(&active.attempts)); active.update(db).await?; } Ok(count) }