Files
hms/crates/erp-core/src/events.rs
iven be8fca1d76
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
feat(core): EventBus dead-letter + consume_with_retry 辅助函数
- 新增 dead_letter_events 表 + Entity
- consume_with_retry: 幂等检查 + 成功标记 + 失败转入 dead-letter
- insert_dead_letter: 写入失败事件供后续排查和手动重试
2026-04-28 11:47:44 +08:00

346 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ConnectionTrait, PaginatorTrait, Set};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tracing::{error, info};
use uuid::Uuid;
use crate::entity::dead_letter_event;
use crate::entity::domain_event;
/// 领域事件
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainEvent {
pub id: Uuid,
pub event_type: String,
pub tenant_id: Uuid,
pub payload: serde_json::Value,
pub timestamp: chrono::DateTime<Utc>,
pub correlation_id: Uuid,
}
impl DomainEvent {
pub fn new(event_type: impl Into<String>, tenant_id: Uuid, payload: serde_json::Value) -> Self {
Self {
id: Uuid::now_v7(),
event_type: event_type.into(),
tenant_id,
payload,
timestamp: Utc::now(),
correlation_id: Uuid::now_v7(),
}
}
}
/// 当前事件 payload schema 版本
pub const EVENT_SCHEMA_VERSION: &str = "v1";
/// 构造统一信封格式的事件 payload。
///
/// 自动注入 `schema_version` 和 `occurred_at`,业务数据通过 `data` 传入。
/// 用法:`build_event_payload(serde_json::json!({ "patient_id": ..., }))`
pub fn build_event_payload(data: serde_json::Value) -> serde_json::Value {
let mut envelope = serde_json::json!({
"schema_version": EVENT_SCHEMA_VERSION,
"occurred_at": Utc::now().to_rfc3339(),
});
if let serde_json::Value::Object(ref mut map) = envelope {
if let serde_json::Value::Object(data_map) = data {
for (k, v) in data_map {
map.insert(k, v);
}
}
}
envelope
}
/// 检查事件是否已被指定消费者处理。
///
/// 查询 `processed_events` 表判断 event_id + consumer_id 是否已存在。
pub async fn is_event_processed(
db: &sea_orm::DatabaseConnection,
event_id: Uuid,
consumer_id: &str,
) -> Result<bool, sea_orm::DbErr> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let count = crate::entity::processed_event::Entity::find()
.filter(crate::entity::processed_event::Column::EventId.eq(event_id))
.filter(crate::entity::processed_event::Column::ConsumerId.eq(consumer_id))
.count(db)
.await?;
Ok(count > 0)
}
/// 标记事件已被指定消费者处理。
///
/// 插入 `processed_events` 记录,重复插入会因主键冲突被安全忽略。
pub async fn mark_event_processed(
db: &sea_orm::DatabaseConnection,
event_id: Uuid,
consumer_id: &str,
) -> Result<(), sea_orm::DbErr> {
use sea_orm::ActiveModelTrait;
use sea_orm::Set;
let model = crate::entity::processed_event::ActiveModel {
event_id: Set(event_id),
consumer_id: Set(consumer_id.to_string()),
processed_at: Set(Utc::now()),
};
// INSERT ... ON CONFLICT DO NOTHING主键冲突时安全忽略
match model.insert(db).await {
Ok(_) => Ok(()),
Err(e) => {
// 唯一约束冲突 = 已处理,不是错误
if e.to_string().contains("duplicate") || e.to_string().contains("violates unique") {
Ok(())
} else {
Err(e)
}
}
}
}
/// 消费事件 — 带幂等检查和 dead-letter 兜底。
///
/// 如果事件已被处理(幂等),返回 `ConsumeResult::AlreadyProcessed`。
/// 如果处理成功,标记为已处理并返回 `ConsumeResult::Success`。
/// 如果处理失败,将事件转入 dead_letter_events 表并返回 `ConsumeResult::DeadLettered`。
pub async fn consume_with_retry<F, Fut>(
db: &sea_orm::DatabaseConnection,
event: &DomainEvent,
consumer_id: &str,
handler: F,
) -> ConsumeResult
where
F: FnOnce(&DomainEvent) -> Fut,
Fut: std::future::Future<Output = Result<(), String>>,
{
if is_event_processed(db, event.id, consumer_id)
.await
.unwrap_or(false)
{
return ConsumeResult::AlreadyProcessed;
}
match handler(event).await {
Ok(()) => {
if let Err(e) = mark_event_processed(db, event.id, consumer_id).await {
tracing::warn!(
event_id = %event.id,
consumer_id = consumer_id,
error = %e,
"标记事件已处理失败(非致命)"
);
}
ConsumeResult::Success
}
Err(err) => {
tracing::error!(
event_id = %event.id,
event_type = %event.event_type,
consumer_id = consumer_id,
error = %err,
"事件消费失败,转入 dead-letter"
);
if let Err(e) = insert_dead_letter(db, event, consumer_id, &err).await {
tracing::error!(
event_id = %event.id,
error = %e,
"Dead-letter 写入失败"
);
}
ConsumeResult::DeadLettered(err)
}
}
}
/// 消费结果
#[derive(Debug)]
pub enum ConsumeResult {
Success,
AlreadyProcessed,
DeadLettered(String),
}
/// 将失败事件写入 dead_letter_events 表
pub async fn insert_dead_letter(
db: &sea_orm::DatabaseConnection,
event: &DomainEvent,
consumer_id: &str,
error_msg: &str,
) -> Result<(), sea_orm::DbErr> {
let model = dead_letter_event::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(Some(event.tenant_id)),
original_event_id: Set(event.id),
event_type: Set(event.event_type.clone()),
payload: Set(Some(event.payload.clone())),
consumer_id: Set(consumer_id.to_string()),
attempts: Set(1),
last_error: Set(Some(error_msg.to_string())),
created_at: Set(Utc::now()),
resolved_at: Set(None),
};
model.insert(db).await?;
Ok(())
}
/// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件
pub struct FilteredEventReceiver {
receiver: mpsc::Receiver<DomainEvent>,
}
impl FilteredEventReceiver {
/// 接收下一个匹配的事件
pub async fn recv(&mut self) -> Option<DomainEvent> {
self.receiver.recv().await
}
}
/// 订阅句柄 — 用于取消过滤订阅
pub struct SubscriptionHandle {
cancel_tx: mpsc::Sender<()>,
join_handle: tokio::task::JoinHandle<()>,
}
impl SubscriptionHandle {
/// 取消订阅并等待后台任务结束
pub async fn cancel(self) {
let _ = self.cancel_tx.send(()).await;
let _ = self.join_handle.await;
}
}
/// 进程内事件总线
#[derive(Clone)]
pub struct EventBus {
sender: broadcast::Sender<DomainEvent>,
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
/// 发布事件:先持久化到 domain_events 表pending 状态),再内存广播,
/// 最后更新为 published 并 NOTIFY outbox relay。
///
/// 两阶段提交保证:即使广播后服务崩溃,事件仍为 pending 状态,
/// 重启后 outbox relay 会重新广播。
pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) {
// 1. 持久化为 pending 状态
let event_id = event.id;
let model = domain_event::ActiveModel {
id: Set(event.id),
tenant_id: Set(event.tenant_id),
event_type: Set(event.event_type.clone()),
payload: Set(Some(event.payload.clone())),
correlation_id: Set(Some(event.correlation_id)),
status: Set("pending".to_string()),
attempts: Set(0),
last_error: Set(None),
created_at: Set(event.timestamp),
published_at: Set(None),
};
let saved = match model.insert(db).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(event_id = %event_id, error = %e, "领域事件持久化失败");
// 持久化失败仍然广播best-effort
self.broadcast(event);
return;
}
};
// 2. 内存广播
self.broadcast(event);
// 3. 更新为 published
let mut active: domain_event::ActiveModel = saved.into();
active.status = Set("published".to_string());
active.published_at = Set(Some(Utc::now()));
if let Err(e) = active.update(db).await {
tracing::warn!(event_id = %event_id, error = %e, "领域事件状态更新为 published 失败");
}
// 4. NOTIFY outbox relay通知 outbox relay 有新事件到达)
let notify_sql = sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!("NOTIFY outbox_channel, '{}'", event_id),
);
if let Err(e) = db.execute(notify_sql).await {
tracing::debug!(event_id = %event_id, error = %e, "NOTIFY outbox_channel 失败(非致命)");
}
}
/// 仅内存广播(不持久化,用于内部测试等场景)。
pub fn broadcast(&self, event: DomainEvent) {
info!(event_type = %event.event_type, event_id = %event.id, "Event broadcast");
if let Err(e) = self.sender.send(event) {
error!("Failed to broadcast event: {}", e);
}
}
/// 订阅所有事件,返回接收端
pub fn subscribe(&self) -> broadcast::Receiver<DomainEvent> {
self.sender.subscribe()
}
/// 按事件类型前缀过滤订阅。
///
/// 为每次调用 spawn 一个 Tokio task 从 broadcast channel 读取,
/// 只转发匹配 `event_type_prefix` 的事件到 mpsc channelcapacity 256
pub fn subscribe_filtered(
&self,
event_type_prefix: String,
) -> (FilteredEventReceiver, SubscriptionHandle) {
let mut broadcast_rx = self.sender.subscribe();
let (mpsc_tx, mpsc_rx) = mpsc::channel(256);
let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
let prefix = event_type_prefix.clone();
let join_handle = tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = cancel_rx.recv() => {
tracing::info!(prefix = %prefix, "Filtered subscription cancelled");
break;
}
event = broadcast_rx.recv() => {
match event {
Ok(event) => {
if event.event_type.starts_with(&prefix) {
if mpsc_tx.send(event).await.is_err() {
break;
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(prefix = %prefix, lagged = n, "Filtered subscriber lagged");
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
}
}
}
}
});
tracing::info!(prefix = %event_type_prefix, "Filtered subscription created");
(
FilteredEventReceiver { receiver: mpsc_rx },
SubscriptionHandle {
cancel_tx,
join_handle,
},
)
}
}