feat(core): EventBus dead-letter + consume_with_retry 辅助函数
- 新增 dead_letter_events 表 + Entity - consume_with_retry: 幂等检查 + 成功标记 + 失败转入 dead-letter - insert_dead_letter: 写入失败事件供后续排查和手动重试
This commit is contained in:
27
crates/erp-core/src/entity/dead_letter_event.rs
Normal file
27
crates/erp-core/src/entity/dead_letter_event.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "dead_letter_events")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key, auto_increment = false)]
|
||||
pub id: Uuid,
|
||||
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||
pub tenant_id: Option<Uuid>,
|
||||
pub original_event_id: Uuid,
|
||||
pub event_type: String,
|
||||
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||
pub payload: Option<serde_json::Value>,
|
||||
pub consumer_id: String,
|
||||
pub attempts: i32,
|
||||
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||
pub last_error: Option<String>,
|
||||
pub created_at: DateTimeUtc,
|
||||
#[sea_orm(skip_serializing_if = "Option::is_none")]
|
||||
pub resolved_at: Option<DateTimeUtc>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod audit_log;
|
||||
pub mod dead_letter_event;
|
||||
pub mod domain_event;
|
||||
pub mod processed_event;
|
||||
|
||||
@@ -5,6 +5,7 @@ use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::entity::dead_letter_event;
|
||||
use crate::entity::domain_event;
|
||||
|
||||
/// 领域事件
|
||||
@@ -101,6 +102,91 @@ pub async fn mark_event_processed(
|
||||
}
|
||||
}
|
||||
|
||||
/// 消费事件 — 带幂等检查和 dead-letter 兜底。
|
||||
///
|
||||
/// 如果事件已被处理(幂等),返回 `ConsumeResult::AlreadyProcessed`。
|
||||
/// 如果处理成功,标记为已处理并返回 `ConsumeResult::Success`。
|
||||
/// 如果处理失败,将事件转入 dead_letter_events 表并返回 `ConsumeResult::DeadLettered`。
|
||||
pub async fn consume_with_retry<F, Fut>(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
event: &DomainEvent,
|
||||
consumer_id: &str,
|
||||
handler: F,
|
||||
) -> ConsumeResult
|
||||
where
|
||||
F: FnOnce(&DomainEvent) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<(), String>>,
|
||||
{
|
||||
if is_event_processed(db, event.id, consumer_id)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return ConsumeResult::AlreadyProcessed;
|
||||
}
|
||||
|
||||
match handler(event).await {
|
||||
Ok(()) => {
|
||||
if let Err(e) = mark_event_processed(db, event.id, consumer_id).await {
|
||||
tracing::warn!(
|
||||
event_id = %event.id,
|
||||
consumer_id = consumer_id,
|
||||
error = %e,
|
||||
"标记事件已处理失败(非致命)"
|
||||
);
|
||||
}
|
||||
ConsumeResult::Success
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
event_id = %event.id,
|
||||
event_type = %event.event_type,
|
||||
consumer_id = consumer_id,
|
||||
error = %err,
|
||||
"事件消费失败,转入 dead-letter"
|
||||
);
|
||||
if let Err(e) = insert_dead_letter(db, event, consumer_id, &err).await {
|
||||
tracing::error!(
|
||||
event_id = %event.id,
|
||||
error = %e,
|
||||
"Dead-letter 写入失败"
|
||||
);
|
||||
}
|
||||
ConsumeResult::DeadLettered(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 消费结果
|
||||
#[derive(Debug)]
|
||||
pub enum ConsumeResult {
|
||||
Success,
|
||||
AlreadyProcessed,
|
||||
DeadLettered(String),
|
||||
}
|
||||
|
||||
/// 将失败事件写入 dead_letter_events 表
|
||||
pub async fn insert_dead_letter(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
event: &DomainEvent,
|
||||
consumer_id: &str,
|
||||
error_msg: &str,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
let model = dead_letter_event::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(Some(event.tenant_id)),
|
||||
original_event_id: Set(event.id),
|
||||
event_type: Set(event.event_type.clone()),
|
||||
payload: Set(Some(event.payload.clone())),
|
||||
consumer_id: Set(consumer_id.to_string()),
|
||||
attempts: Set(1),
|
||||
last_error: Set(Some(error_msg.to_string())),
|
||||
created_at: Set(Utc::now()),
|
||||
resolved_at: Set(None),
|
||||
};
|
||||
model.insert(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件
|
||||
pub struct FilteredEventReceiver {
|
||||
receiver: mpsc::Receiver<DomainEvent>,
|
||||
|
||||
@@ -90,6 +90,7 @@ mod m20260427_000087_audit_logs_hash_chain;
|
||||
mod m20260428_000088_rls_policy_strict;
|
||||
mod m20260428_000089_blind_indexes;
|
||||
mod m20260428_000090_critical_alerts;
|
||||
mod m20260428_000091_dead_letter_events;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@@ -187,6 +188,7 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20260428_000088_rls_policy_strict::Migration),
|
||||
Box::new(m20260428_000089_blind_indexes::Migration),
|
||||
Box::new(m20260428_000090_critical_alerts::Migration),
|
||||
Box::new(m20260428_000091_dead_letter_events::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[derive(Iden)]
|
||||
enum DeadLetterEvent {
|
||||
Table,
|
||||
Id,
|
||||
TenantId,
|
||||
OriginalEventId,
|
||||
EventType,
|
||||
Payload,
|
||||
ConsumerId,
|
||||
Attempts,
|
||||
LastError,
|
||||
CreatedAt,
|
||||
ResolvedAt,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(DeadLetterEvent::Table)
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::Id)
|
||||
.uuid()
|
||||
.not_null()
|
||||
.primary_key()
|
||||
.default(PgFunc::gen_random_uuid()),
|
||||
)
|
||||
.col(ColumnDef::new(DeadLetterEvent::TenantId).uuid())
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::OriginalEventId)
|
||||
.uuid()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::EventType)
|
||||
.string_len(128)
|
||||
.not_null(),
|
||||
)
|
||||
.col(ColumnDef::new(DeadLetterEvent::Payload).json_binary())
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::ConsumerId)
|
||||
.string_len(128)
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::Attempts)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(ColumnDef::new(DeadLetterEvent::LastError).text())
|
||||
.col(
|
||||
ColumnDef::new(DeadLetterEvent::CreatedAt)
|
||||
.timestamp_with_time_zone()
|
||||
.not_null()
|
||||
.default(Expr::current_timestamp()),
|
||||
)
|
||||
.col(ColumnDef::new(DeadLetterEvent::ResolvedAt).timestamp_with_time_zone())
|
||||
.to_owned(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.drop_table(Table::drop().table(DeadLetterEvent::Table).to_owned())
|
||||
.await
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user