diff --git a/Cargo.toml b/Cargo.toml index de443a8..09863f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ sea-orm = { version = "1.1", features = [ "sqlx-postgres", "runtime-tokio-rustls", "macros", "with-uuid", "with-chrono", "with-json" ] } sea-orm-migration = { version = "1.1", features = ["sqlx-postgres", "runtime-tokio-rustls"] } +sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "uuid"] } # Serialization serde = { version = "1", features = ["derive"] } diff --git a/crates/erp-core/src/events.rs b/crates/erp-core/src/events.rs index 79f1c3f..21c1ac0 100644 --- a/crates/erp-core/src/events.rs +++ b/crates/erp-core/src/events.rs @@ -1,5 +1,5 @@ use chrono::Utc; -use sea_orm::{ActiveModelTrait, Set}; +use sea_orm::{ActiveModelTrait, ConnectionTrait, Set}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, mpsc}; use tracing::{error, info}; @@ -70,7 +70,7 @@ impl EventBus { } /// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播, - /// 最后更新为 published。 + /// 最后更新为 published 并 NOTIFY outbox relay。 /// /// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态, /// 重启后 outbox relay 会重新广播。 @@ -110,6 +110,15 @@ impl EventBus { if let Err(e) = active.update(db).await { tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败"); } + + // 4. NOTIFY outbox relay(通知 outbox relay 有新事件到达) + let notify_sql = sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Postgres, + format!("NOTIFY outbox_channel, '{}'", event_id), + ); + if let Err(e) = db.execute(notify_sql).await { + tracing::debug!(event_id = %event_id, error = %e, "NOTIFY outbox_channel 失败(非致命)"); + } } /// 仅内存广播(不持久化,用于内部测试等场景)。 diff --git a/crates/erp-server/Cargo.toml b/crates/erp-server/Cargo.toml index d84bcd8..3c9f7d7 100644 --- a/crates/erp-server/Cargo.toml +++ b/crates/erp-server/Cargo.toml @@ -17,6 +17,7 @@ tracing.workspace = true tracing-subscriber.workspace = true config.workspace = true sea-orm.workspace = true +sqlx.workspace = true redis.workspace = true utoipa.workspace = true serde_json.workspace = true diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index aa64510..93787fd 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -406,8 +406,8 @@ async fn main() -> anyhow::Result<()> { erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone()); tracing::info!("Plugin notification listener started"); - // Start outbox relay (re-publish pending domain events) - outbox::start_outbox_relay(db.clone(), event_bus.clone()); + // Start outbox relay (LISTEN/NOTIFY + fallback poll for pending domain events) + outbox::start_outbox_relay(db.clone(), event_bus.clone(), config.database.url.clone()); tracing::info!("Outbox relay started"); // Start timeout checker (scan overdue tasks every 60s) diff --git a/crates/erp-server/src/outbox.rs b/crates/erp-server/src/outbox.rs index b7ebbf1..19dde6a 100644 --- a/crates/erp-server/src/outbox.rs +++ b/crates/erp-server/src/outbox.rs @@ -1,19 +1,29 @@ use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set}; +use sqlx::postgres::PgListener; use std::time::Duration; use erp_core::entity::domain_event; use erp_core::events::{DomainEvent, EventBus}; const MAX_RETRY: i32 = 5; +const FALLBACK_POLL_INTERVAL_SECS: u64 = 30; +const NOTIFY_CHANNEL: &str = "outbox_channel"; +const RECONNECT_DELAY_SECS: u64 = 5; /// 启动 outbox relay 后台任务。 /// /// 先执行一次性扫描(处理服务重启前遗留的 pending 事件), -/// 然后每 5 秒定期扫描 domain_events 表中 status = 'pending' 的事件。 -pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) { +/// 然后通过 PostgreSQL LISTEN/NOTIFY 监听新事件,配合 30s 兜底轮询。 +pub fn start_outbox_relay( + db: sea_orm::DatabaseConnection, + event_bus: EventBus, + database_url: String, +) { let db_clone = db.clone(); let event_bus_clone = event_bus.clone(); + let url = database_url.clone(); + tokio::spawn(async move { // 启动时立即处理一次(恢复重启前未广播的事件) match process_pending_events(&db_clone, &event_bus_clone).await { @@ -22,17 +32,65 @@ pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"), } - // 定期轮询 - let mut interval = tokio::time::interval(Duration::from_secs(5)); + // 进入 LISTEN/NOTIFY 主循环(带自动重连) loop { - interval.tick().await; + if let Err(e) = run_listener(&db_clone, &event_bus_clone, &url).await { + tracing::warn!(error = %e, "PgListener 断开连接,{}s 后重连", RECONNECT_DELAY_SECS); + } + tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECS)).await; + + // 重连后执行一次兜底扫描 if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await { - tracing::warn!(error = %e, "Outbox relay 处理失败"); + tracing::warn!(error = %e, "重连后 outbox relay 处理失败"); } } }); } +/// 运行 PgListener 监听循环。 +/// +/// 使用 `tokio::select!` 在 LISTEN 通知和 30s 定时器之间竞争, +/// 确保即使 NOTIFY 丢失也能兜底处理。 +async fn run_listener( + db: &sea_orm::DatabaseConnection, + event_bus: &EventBus, + database_url: &str, +) -> Result<(), sqlx::Error> { + let mut listener = PgListener::connect(database_url).await?; + listener.listen(NOTIFY_CHANNEL).await?; + tracing::info!("Outbox relay LISTEN/NOTIFY 已连接,监听 {}", NOTIFY_CHANNEL); + + let mut fallback = tokio::time::interval(Duration::from_secs(FALLBACK_POLL_INTERVAL_SECS)); + + loop { + tokio::select! { + // LISTEN/NOTIFY 通知触发 + notification = listener.recv() => { + match notification { + Ok(notif) => { + tracing::debug!( + channel = %notif.channel(), + payload = %notif.payload(), + "收到 outbox NOTIFY" + ); + if let Err(e) = process_pending_events(db, event_bus).await { + tracing::warn!(error = %e, "NOTIFY 触发的 outbox 处理失败"); + } + } + Err(e) => return Err(e), + } + } + // 30s 兜底轮询 + _ = fallback.tick() => { + tracing::debug!("outbox relay 兜底轮询触发"); + if let Err(e) = process_pending_events(db, event_bus).await { + tracing::warn!(error = %e, "兜底轮询 outbox 处理失败"); + } + } + } + } +} + async fn process_pending_events( db: &sea_orm::DatabaseConnection, event_bus: &EventBus,