feat(core): 事件归档 + 消费者幂等性 — 迁移 084/085 + 清理任务
- 迁移 084: domain_events_archive 归档表 + cleanup_old_published_events() - 迁移 085: processed_events 去重表 + cleanup_old_processed_events() - erp-core: is_event_processed() / mark_event_processed() 幂等性辅助 - erp-server: tasks::start_event_cleanup() 每 24h 归档 >90 天事件
This commit is contained in:
@@ -1,2 +1,3 @@
|
||||
pub mod audit_log;
|
||||
pub mod domain_event;
|
||||
pub mod processed_event;
|
||||
|
||||
18
crates/erp-core/src/entity/processed_event.rs
Normal file
18
crates/erp-core/src/entity/processed_event.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// 已处理事件记录 — 幂等性去重表。
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "processed_events")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key, auto_increment = false)]
|
||||
pub event_id: Uuid,
|
||||
#[sea_orm(primary_key, auto_increment = false)]
|
||||
pub consumer_id: String,
|
||||
pub processed_at: DateTimeUtc,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
@@ -1,5 +1,5 @@
|
||||
use chrono::Utc;
|
||||
use sea_orm::{ActiveModelTrait, ConnectionTrait, Set};
|
||||
use sea_orm::{ActiveModelTrait, ConnectionTrait, PaginatorTrait, Set};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{error, info};
|
||||
@@ -53,6 +53,54 @@ pub fn build_event_payload(data: serde_json::Value) -> serde_json::Value {
|
||||
envelope
|
||||
}
|
||||
|
||||
/// 检查事件是否已被指定消费者处理。
|
||||
///
|
||||
/// 查询 `processed_events` 表判断 event_id + consumer_id 是否已存在。
|
||||
pub async fn is_event_processed(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
event_id: Uuid,
|
||||
consumer_id: &str,
|
||||
) -> Result<bool, sea_orm::DbErr> {
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
|
||||
|
||||
let count = crate::entity::processed_event::Entity::find()
|
||||
.filter(crate::entity::processed_event::Column::EventId.eq(event_id))
|
||||
.filter(crate::entity::processed_event::Column::ConsumerId.eq(consumer_id))
|
||||
.count(db)
|
||||
.await?;
|
||||
Ok(count > 0)
|
||||
}
|
||||
|
||||
/// 标记事件已被指定消费者处理。
|
||||
///
|
||||
/// 插入 `processed_events` 记录,重复插入会因主键冲突被安全忽略。
|
||||
pub async fn mark_event_processed(
|
||||
db: &sea_orm::DatabaseConnection,
|
||||
event_id: Uuid,
|
||||
consumer_id: &str,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
use sea_orm::ActiveModelTrait;
|
||||
use sea_orm::Set;
|
||||
|
||||
let model = crate::entity::processed_event::ActiveModel {
|
||||
event_id: Set(event_id),
|
||||
consumer_id: Set(consumer_id.to_string()),
|
||||
processed_at: Set(Utc::now()),
|
||||
};
|
||||
// INSERT ... ON CONFLICT DO NOTHING(主键冲突时安全忽略)
|
||||
match model.insert(db).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// 唯一约束冲突 = 已处理,不是错误
|
||||
if e.to_string().contains("duplicate") || e.to_string().contains("violates unique") {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 过滤事件接收器 — 只接收匹配 `event_type_prefix` 的事件
|
||||
pub struct FilteredEventReceiver {
|
||||
receiver: mpsc::Receiver<DomainEvent>,
|
||||
|
||||
Reference in New Issue
Block a user