use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; use std::time::Duration; use erp_core::entity::domain_event; use erp_core::events::{DomainEvent, EventBus}; /// 启动 outbox relay 后台任务。 /// /// 定期扫描 domain_events 表中 status = 'pending' 的事件, /// 重新广播并标记为 published。 pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { interval.tick().await; if let Err(e) = process_pending_events(&db, &event_bus).await { tracing::warn!(error = %e, "Outbox relay 处理失败"); } } }); } async fn process_pending_events( db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> Result<(), sea_orm::DbErr> { let pending = domain_event::Entity::find() .filter(domain_event::Column::Status.eq("pending")) .filter(domain_event::Column::Attempts.lt(3)) .all(db) .await?; if pending.is_empty() { return Ok(()); } tracing::info!(count = pending.len(), "处理待发领域事件"); for event_model in pending { // 重建 DomainEvent 并广播 let domain_event = DomainEvent::new( &event_model.event_type, event_model.tenant_id, event_model.payload.clone().unwrap_or(serde_json::json!({})), ); event_bus.broadcast(domain_event); // 标记为 published let mut active: domain_event::ActiveModel = event_model.into(); active.status = Set("published".to_string()); active.published_at = Set(Some(Utc::now())); active.update(db).await?; } Ok(()) }