From be8fca1d7639994240ccfa4fca80fd653f0190d2 Mon Sep 17 00:00:00 2001 From: iven Date: Tue, 28 Apr 2026 11:47:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(core):=20EventBus=20dead-letter=20+=20cons?= =?UTF-8?q?ume=5Fwith=5Fretry=20=E8=BE=85=E5=8A=A9=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 dead_letter_events 表 + Entity - consume_with_retry: 幂等检查 + 成功标记 + 失败转入 dead-letter - insert_dead_letter: 写入失败事件供后续排查和手动重试 --- .../erp-core/src/entity/dead_letter_event.rs | 27 ++++++ crates/erp-core/src/entity/mod.rs | 1 + crates/erp-core/src/events.rs | 86 +++++++++++++++++++ crates/erp-server/migration/src/lib.rs | 2 + .../m20260428_000091_dead_letter_events.rs | 76 ++++++++++++++++ 5 files changed, 192 insertions(+) create mode 100644 crates/erp-core/src/entity/dead_letter_event.rs create mode 100644 crates/erp-server/migration/src/m20260428_000091_dead_letter_events.rs diff --git a/crates/erp-core/src/entity/dead_letter_event.rs b/crates/erp-core/src/entity/dead_letter_event.rs new file mode 100644 index 0000000..e8ec2f9 --- /dev/null +++ b/crates/erp-core/src/entity/dead_letter_event.rs @@ -0,0 +1,27 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "dead_letter_events")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub tenant_id: Option, + pub original_event_id: Uuid, + pub event_type: String, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub payload: Option, + pub consumer_id: String, + pub attempts: i32, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub last_error: Option, + pub created_at: DateTimeUtc, + #[sea_orm(skip_serializing_if = "Option::is_none")] + pub resolved_at: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/erp-core/src/entity/mod.rs b/crates/erp-core/src/entity/mod.rs index 00a81aa..34dd114 100644 --- a/crates/erp-core/src/entity/mod.rs +++ b/crates/erp-core/src/entity/mod.rs @@ -1,3 +1,4 @@ pub mod audit_log; +pub mod dead_letter_event; pub mod domain_event; pub mod processed_event; diff --git a/crates/erp-core/src/events.rs b/crates/erp-core/src/events.rs index 2d78d7e..c0a686e 100644 --- a/crates/erp-core/src/events.rs +++ b/crates/erp-core/src/events.rs @@ -5,6 +5,7 @@ use tokio::sync::{broadcast, mpsc}; use tracing::{error, info}; use uuid::Uuid; +use crate::entity::dead_letter_event; use crate::entity::domain_event; /// 领域事件 @@ -101,6 +102,91 @@ pub async fn mark_event_processed( } } +/// 消费事件 — 带幂等检查和 dead-letter 兜底。 +/// +/// 如果事件已被处理(幂等),返回 `ConsumeResult::AlreadyProcessed`。 +/// 如果处理成功,标记为已处理并返回 `ConsumeResult::Success`。 +/// 如果处理失败,将事件转入 dead_letter_events 表并返回 `ConsumeResult::DeadLettered`。 +pub async fn consume_with_retry( + db: &sea_orm::DatabaseConnection, + event: &DomainEvent, + consumer_id: &str, + handler: F, +) -> ConsumeResult +where + F: FnOnce(&DomainEvent) -> Fut, + Fut: std::future::Future>, +{ + if is_event_processed(db, event.id, consumer_id) + .await + .unwrap_or(false) + { + return ConsumeResult::AlreadyProcessed; + } + + match handler(event).await { + Ok(()) => { + if let Err(e) = mark_event_processed(db, event.id, consumer_id).await { + tracing::warn!( + event_id = %event.id, + consumer_id = consumer_id, + error = %e, + "标记事件已处理失败(非致命)" + ); + } + ConsumeResult::Success + } + Err(err) => { + tracing::error!( + event_id = %event.id, + event_type = %event.event_type, + consumer_id = consumer_id, + error = %err, + "事件消费失败,转入 dead-letter" + ); + if let Err(e) = insert_dead_letter(db, event, consumer_id, &err).await { + tracing::error!( + event_id = %event.id, + error = %e, + "Dead-letter 写入失败" + ); + } + ConsumeResult::DeadLettered(err) + } + } +} + +/// 消费结果 +#[derive(Debug)] +pub enum ConsumeResult { + Success, + AlreadyProcessed, + DeadLettered(String), +} + +/// 将失败事件写入 dead_letter_events 表 +pub async fn insert_dead_letter( + db: &sea_orm::DatabaseConnection, + event: &DomainEvent, + consumer_id: &str, + error_msg: &str, +) -> Result<(), sea_orm::DbErr> { + let model = dead_letter_event::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(Some(event.tenant_id)), + original_event_id: Set(event.id), + event_type: Set(event.event_type.clone()), + payload: Set(Some(event.payload.clone())), + consumer_id: Set(consumer_id.to_string()), + attempts: Set(1), + last_error: Set(Some(error_msg.to_string())), + created_at: Set(Utc::now()), + resolved_at: Set(None), + }; + model.insert(db).await?; + Ok(()) +} + /// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件 pub struct FilteredEventReceiver { receiver: mpsc::Receiver, diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index a2b71f7..6d65f6e 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -90,6 +90,7 @@ mod m20260427_000087_audit_logs_hash_chain; mod m20260428_000088_rls_policy_strict; mod m20260428_000089_blind_indexes; mod m20260428_000090_critical_alerts; +mod m20260428_000091_dead_letter_events; pub struct Migrator; @@ -187,6 +188,7 @@ impl MigratorTrait for Migrator { Box::new(m20260428_000088_rls_policy_strict::Migration), Box::new(m20260428_000089_blind_indexes::Migration), Box::new(m20260428_000090_critical_alerts::Migration), + Box::new(m20260428_000091_dead_letter_events::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260428_000091_dead_letter_events.rs b/crates/erp-server/migration/src/m20260428_000091_dead_letter_events.rs new file mode 100644 index 0000000..47d3c42 --- /dev/null +++ b/crates/erp-server/migration/src/m20260428_000091_dead_letter_events.rs @@ -0,0 +1,76 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[derive(Iden)] +enum DeadLetterEvent { + Table, + Id, + TenantId, + OriginalEventId, + EventType, + Payload, + ConsumerId, + Attempts, + LastError, + CreatedAt, + ResolvedAt, +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(DeadLetterEvent::Table) + .col( + ColumnDef::new(DeadLetterEvent::Id) + .uuid() + .not_null() + .primary_key() + .default(PgFunc::gen_random_uuid()), + ) + .col(ColumnDef::new(DeadLetterEvent::TenantId).uuid()) + .col( + ColumnDef::new(DeadLetterEvent::OriginalEventId) + .uuid() + .not_null(), + ) + .col( + ColumnDef::new(DeadLetterEvent::EventType) + .string_len(128) + .not_null(), + ) + .col(ColumnDef::new(DeadLetterEvent::Payload).json_binary()) + .col( + ColumnDef::new(DeadLetterEvent::ConsumerId) + .string_len(128) + .not_null(), + ) + .col( + ColumnDef::new(DeadLetterEvent::Attempts) + .integer() + .not_null() + .default(0), + ) + .col(ColumnDef::new(DeadLetterEvent::LastError).text()) + .col( + ColumnDef::new(DeadLetterEvent::CreatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .col(ColumnDef::new(DeadLetterEvent::ResolvedAt).timestamp_with_time_zone()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(DeadLetterEvent::Table).to_owned()) + .await + } +}