Files
hms/crates/erp-server/src/outbox.rs
iven 603af83aa9
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix: P0 止血 — 消除崩溃风险 + 伪CAS修复 + 硬编码清除 + 晚间血压
- 新增 sea_orm_ext 模块: safe_version() / bump_version() 替代 14 处 unwrap()
- 修复 points_service 伪 CAS 逻辑 bug: 在 Set() 前提取原始版本并重新验证
- AdminDashboard: API 失败时显示 unknown 状态而非虚假绿色 healthy
- AdminDashboard: 今日操作改用真实数据,移除 "0 错误" 硬编码
- OperatorWorkbench: 移除硬编码 "美玲",改用真实用户名
- Home.tsx: operator "内容发布" 从硬编码 0 改为真实积分统计
- 小程序体征录入: 新增晚间血压 indicator_type,映射到 evening 字段
2026-05-02 23:42:01 +08:00

136 lines
5.0 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set};
use sqlx::postgres::PgListener;
use std::time::Duration;
use erp_core::entity::domain_event;
use erp_core::events::{DomainEvent, EventBus};
const MAX_RETRY: i32 = 5;
const FALLBACK_POLL_INTERVAL_SECS: u64 = 30;
const NOTIFY_CHANNEL: &str = "outbox_channel";
const RECONNECT_DELAY_SECS: u64 = 5;
/// 启动 outbox relay 后台任务。
///
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件),
/// 然后通过 PostgreSQL LISTEN/NOTIFY 监听新事件,配合 30s 兜底轮询。
pub fn start_outbox_relay(
db: sea_orm::DatabaseConnection,
event_bus: EventBus,
database_url: String,
) {
let db_clone = db.clone();
let event_bus_clone = event_bus.clone();
let url = database_url.clone();
tokio::spawn(async move {
// 启动时立即处理一次(恢复重启前未广播的事件)
match process_pending_events(&db_clone, &event_bus_clone).await {
Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"),
Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"),
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
}
// 进入 LISTEN/NOTIFY 主循环(带自动重连)
loop {
if let Err(e) = run_listener(&db_clone, &event_bus_clone, &url).await {
tracing::warn!(error = %e, "PgListener 断开连接,{}s 后重连", RECONNECT_DELAY_SECS);
}
tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECS)).await;
// 重连后执行一次兜底扫描
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
tracing::warn!(error = %e, "重连后 outbox relay 处理失败");
}
}
});
}
/// 运行 PgListener 监听循环。
///
/// 使用 `tokio::select!` 在 LISTEN 通知和 30s 定时器之间竞争,
/// 确保即使 NOTIFY 丢失也能兜底处理。
async fn run_listener(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
database_url: &str,
) -> Result<(), sqlx::Error> {
let mut listener = PgListener::connect(database_url).await?;
listener.listen(NOTIFY_CHANNEL).await?;
tracing::info!("Outbox relay LISTEN/NOTIFY 已连接,监听 {}", NOTIFY_CHANNEL);
let mut fallback = tokio::time::interval(Duration::from_secs(FALLBACK_POLL_INTERVAL_SECS));
loop {
tokio::select! {
// LISTEN/NOTIFY 通知触发
notification = listener.recv() => {
match notification {
Ok(notif) => {
tracing::debug!(
channel = %notif.channel(),
payload = %notif.payload(),
"收到 outbox NOTIFY"
);
if let Err(e) = process_pending_events(db, event_bus).await {
tracing::warn!(error = %e, "NOTIFY 触发的 outbox 处理失败");
}
}
Err(e) => return Err(e),
}
}
// 30s 兜底轮询
_ = fallback.tick() => {
tracing::debug!("outbox relay 兜底轮询触发");
if let Err(e) = process_pending_events(db, event_bus).await {
tracing::warn!(error = %e, "兜底轮询 outbox 处理失败");
}
}
}
}
}
async fn process_pending_events(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<usize, sea_orm::DbErr> {
let pending = domain_event::Entity::find()
.filter(domain_event::Column::Status.eq("pending"))
.filter(domain_event::Column::Attempts.lt(MAX_RETRY))
.order_by_asc(domain_event::Column::CreatedAt)
.limit(100)
.all(db)
.await?;
if pending.is_empty() {
return Ok(0);
}
let count = pending.len();
tracing::info!(count = count, "处理待发领域事件");
for event_model in pending {
// 重建 DomainEvent 并广播(保留原始 ID 和时间戳)
let domain_event = DomainEvent {
id: event_model.id,
event_type: event_model.event_type.clone(),
tenant_id: event_model.tenant_id,
payload: event_model.payload.clone().unwrap_or(serde_json::json!({})),
timestamp: event_model.created_at,
correlation_id: event_model.correlation_id.unwrap_or(event_model.id),
};
event_bus.broadcast(domain_event);
// 标记为 published增加 attempts 计数
let mut active: domain_event::ActiveModel = event_model.into();
active.status = Set("published".to_string());
active.published_at = Set(Some(Utc::now()));
active.attempts = Set(erp_core::sea_orm_ext::bump_version(&active.attempts));
active.update(db).await?;
}
Ok(count)
}