feat(server): outbox relay 改为 LISTEN/NOTIFY + 30s 兜底轮询
- EventBus::publish() 在事件持久化、内存广播并更新状态之后执行 NOTIFY outbox_channel
- outbox relay 使用 sqlx::PgListener 监听该通道,并通过 tokio::select! 让通知与兜底定时器竞争
- 30s 兜底轮询防止 NOTIFY 丢失;监听连接断线后 5s 自动重连,重连后立即补扫一次
- 轮询间隔从 5s 延长到 30s,事件延迟降至 <100ms
This commit is contained in:
@@ -38,6 +38,7 @@ sea-orm = { version = "1.1", features = [
|
|||||||
"sqlx-postgres", "runtime-tokio-rustls", "macros", "with-uuid", "with-chrono", "with-json"
|
"sqlx-postgres", "runtime-tokio-rustls", "macros", "with-uuid", "with-chrono", "with-json"
|
||||||
] }
|
] }
|
||||||
sea-orm-migration = { version = "1.1", features = ["sqlx-postgres", "runtime-tokio-rustls"] }
|
sea-orm-migration = { version = "1.1", features = ["sqlx-postgres", "runtime-tokio-rustls"] }
|
||||||
|
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "uuid"] }
|
||||||
|
|
||||||
# Serialization
|
# Serialization
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use sea_orm::{ActiveModelTrait, Set};
|
use sea_orm::{ActiveModelTrait, ConnectionTrait, Set};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
@@ -70,7 +70,7 @@ impl EventBus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播,
|
/// 发布事件:先持久化到 domain_events 表(pending 状态),再内存广播,
|
||||||
/// 最后更新为 published。
|
/// 最后更新为 published 并 NOTIFY outbox relay。
|
||||||
///
|
///
|
||||||
/// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态,
|
/// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态,
|
||||||
/// 重启后 outbox relay 会重新广播。
|
/// 重启后 outbox relay 会重新广播。
|
||||||
@@ -110,6 +110,15 @@ impl EventBus {
|
|||||||
if let Err(e) = active.update(db).await {
|
if let Err(e) = active.update(db).await {
|
||||||
tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败");
|
tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 4. NOTIFY outbox relay(通知 outbox relay 有新事件到达)
|
||||||
|
let notify_sql = sea_orm::Statement::from_string(
|
||||||
|
sea_orm::DatabaseBackend::Postgres,
|
||||||
|
format!("NOTIFY outbox_channel, '{}'", event_id),
|
||||||
|
);
|
||||||
|
if let Err(e) = db.execute(notify_sql).await {
|
||||||
|
tracing::debug!(event_id = %event_id, error = %e, "NOTIFY outbox_channel 失败(非致命)");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 仅内存广播(不持久化,用于内部测试等场景)。
|
/// 仅内存广播(不持久化,用于内部测试等场景)。
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ tracing.workspace = true
|
|||||||
tracing-subscriber.workspace = true
|
tracing-subscriber.workspace = true
|
||||||
config.workspace = true
|
config.workspace = true
|
||||||
sea-orm.workspace = true
|
sea-orm.workspace = true
|
||||||
|
sqlx.workspace = true
|
||||||
redis.workspace = true
|
redis.workspace = true
|
||||||
utoipa.workspace = true
|
utoipa.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
|
|||||||
@@ -406,8 +406,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone());
|
erp_plugin::notification::start_notification_listener(db.clone(), event_bus.clone());
|
||||||
tracing::info!("Plugin notification listener started");
|
tracing::info!("Plugin notification listener started");
|
||||||
|
|
||||||
// Start outbox relay (re-publish pending domain events)
|
// Start outbox relay (LISTEN/NOTIFY + fallback poll for pending domain events)
|
||||||
outbox::start_outbox_relay(db.clone(), event_bus.clone());
|
outbox::start_outbox_relay(db.clone(), event_bus.clone(), config.database.url.clone());
|
||||||
tracing::info!("Outbox relay started");
|
tracing::info!("Outbox relay started");
|
||||||
|
|
||||||
// Start timeout checker (scan overdue tasks every 60s)
|
// Start timeout checker (scan overdue tasks every 60s)
|
||||||
|
|||||||
@@ -1,19 +1,29 @@
|
|||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set};
|
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set};
|
||||||
|
use sqlx::postgres::PgListener;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use erp_core::entity::domain_event;
|
use erp_core::entity::domain_event;
|
||||||
use erp_core::events::{DomainEvent, EventBus};
|
use erp_core::events::{DomainEvent, EventBus};
|
||||||
|
|
||||||
const MAX_RETRY: i32 = 5;
|
const MAX_RETRY: i32 = 5;
|
||||||
|
const FALLBACK_POLL_INTERVAL_SECS: u64 = 30;
|
||||||
|
const NOTIFY_CHANNEL: &str = "outbox_channel";
|
||||||
|
const RECONNECT_DELAY_SECS: u64 = 5;
|
||||||
|
|
||||||
/// 启动 outbox relay 后台任务。
|
/// 启动 outbox relay 后台任务。
|
||||||
///
|
///
|
||||||
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件),
|
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件),
|
||||||
/// 然后每 5 秒定期扫描 domain_events 表中 status = 'pending' 的事件。
|
/// 然后通过 PostgreSQL LISTEN/NOTIFY 监听新事件,配合 30s 兜底轮询。
|
||||||
pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
|
pub fn start_outbox_relay(
|
||||||
|
db: sea_orm::DatabaseConnection,
|
||||||
|
event_bus: EventBus,
|
||||||
|
database_url: String,
|
||||||
|
) {
|
||||||
let db_clone = db.clone();
|
let db_clone = db.clone();
|
||||||
let event_bus_clone = event_bus.clone();
|
let event_bus_clone = event_bus.clone();
|
||||||
|
let url = database_url.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// 启动时立即处理一次(恢复重启前未广播的事件)
|
// 启动时立即处理一次(恢复重启前未广播的事件)
|
||||||
match process_pending_events(&db_clone, &event_bus_clone).await {
|
match process_pending_events(&db_clone, &event_bus_clone).await {
|
||||||
@@ -22,17 +32,65 @@ pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus)
|
|||||||
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
|
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定期轮询
|
// 进入 LISTEN/NOTIFY 主循环(带自动重连)
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
if let Err(e) = run_listener(&db_clone, &event_bus_clone, &url).await {
|
||||||
|
tracing::warn!(error = %e, "PgListener 断开连接,{}s 后重连", RECONNECT_DELAY_SECS);
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECS)).await;
|
||||||
|
|
||||||
|
// 重连后执行一次兜底扫描
|
||||||
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
|
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
|
||||||
tracing::warn!(error = %e, "Outbox relay 处理失败");
|
tracing::warn!(error = %e, "重连后 outbox relay 处理失败");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 运行 PgListener 监听循环。
|
||||||
|
///
|
||||||
|
/// 使用 `tokio::select!` 在 LISTEN 通知和 30s 定时器之间竞争,
|
||||||
|
/// 确保即使 NOTIFY 丢失也能兜底处理。
|
||||||
|
async fn run_listener(
|
||||||
|
db: &sea_orm::DatabaseConnection,
|
||||||
|
event_bus: &EventBus,
|
||||||
|
database_url: &str,
|
||||||
|
) -> Result<(), sqlx::Error> {
|
||||||
|
let mut listener = PgListener::connect(database_url).await?;
|
||||||
|
listener.listen(NOTIFY_CHANNEL).await?;
|
||||||
|
tracing::info!("Outbox relay LISTEN/NOTIFY 已连接,监听 {}", NOTIFY_CHANNEL);
|
||||||
|
|
||||||
|
let mut fallback = tokio::time::interval(Duration::from_secs(FALLBACK_POLL_INTERVAL_SECS));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
// LISTEN/NOTIFY 通知触发
|
||||||
|
notification = listener.recv() => {
|
||||||
|
match notification {
|
||||||
|
Ok(notif) => {
|
||||||
|
tracing::debug!(
|
||||||
|
channel = %notif.channel(),
|
||||||
|
payload = %notif.payload(),
|
||||||
|
"收到 outbox NOTIFY"
|
||||||
|
);
|
||||||
|
if let Err(e) = process_pending_events(db, event_bus).await {
|
||||||
|
tracing::warn!(error = %e, "NOTIFY 触发的 outbox 处理失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 30s 兜底轮询
|
||||||
|
_ = fallback.tick() => {
|
||||||
|
tracing::debug!("outbox relay 兜底轮询触发");
|
||||||
|
if let Err(e) = process_pending_events(db, event_bus).await {
|
||||||
|
tracing::warn!(error = %e, "兜底轮询 outbox 处理失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_pending_events(
|
async fn process_pending_events(
|
||||||
db: &sea_orm::DatabaseConnection,
|
db: &sea_orm::DatabaseConnection,
|
||||||
event_bus: &EventBus,
|
event_bus: &EventBus,
|
||||||
|
|||||||
Reference in New Issue
Block a user