From 685df5e458b14522f1cf485ef864b08f7dd1064c Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 12 Apr 2026 00:10:49 +0800 Subject: [PATCH] feat(core): implement event outbox persistence Add domain_events migration and SeaORM entity. Modify EventBus::publish to persist events before broadcasting (best-effort: DB failure logs warning but still broadcasts in-memory). Update all 19 publish call sites across 4 crates to pass db reference. Add outbox relay background task that polls pending events every 5s and re-broadcasts them, ensuring no events are lost on server restart. --- Cargo.lock | 1 + crates/erp-auth/src/service/auth_service.rs | 2 +- crates/erp-auth/src/service/dept_service.rs | 4 +- crates/erp-auth/src/service/org_service.rs | 4 +- .../erp-auth/src/service/position_service.rs | 4 +- crates/erp-auth/src/service/role_service.rs | 4 +- crates/erp-auth/src/service/user_service.rs | 4 +- .../src/service/dictionary_service.rs | 4 +- crates/erp-config/src/service/menu_service.rs | 4 +- .../src/service/numbering_service.rs | 4 +- .../erp-config/src/service/setting_service.rs | 4 +- crates/erp-core/src/entity/domain_event.rs | 24 ++++++ crates/erp-core/src/entity/mod.rs | 1 + crates/erp-core/src/events.rs | 44 ++++++++-- .../src/service/message_service.rs | 4 +- crates/erp-server/Cargo.toml | 1 + crates/erp-server/migration/src/lib.rs | 2 + .../m20260416_000031_create_domain_events.rs | 80 +++++++++++++++++++ crates/erp-server/src/main.rs | 5 ++ crates/erp-server/src/outbox.rs | 58 ++++++++++++++ .../src/service/definition_service.rs | 4 +- .../src/service/instance_service.rs | 2 +- .../erp-workflow/src/service/task_service.rs | 2 +- 23 files changed, 235 insertions(+), 31 deletions(-) create mode 100644 crates/erp-core/src/entity/domain_event.rs create mode 100644 crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs create mode 100644 crates/erp-server/src/outbox.rs diff --git a/Cargo.lock b/Cargo.lock index 871f394..33a7322 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -896,6 +896,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "chrono", "config", "erp-auth", "erp-common", diff --git a/crates/erp-auth/src/service/auth_service.rs b/crates/erp-auth/src/service/auth_service.rs index df13fa6..f2a51c5 100644 --- a/crates/erp-auth/src/service/auth_service.rs +++ b/crates/erp-auth/src/service/auth_service.rs @@ -130,7 +130,7 @@ impl AuthService { "user.login", tenant_id, serde_json::json!({ "user_id": user_model.id, "username": user_model.username }), - )); + ), db).await; Ok(LoginResp { access_token, diff --git a/crates/erp-auth/src/service/dept_service.rs b/crates/erp-auth/src/service/dept_service.rs index 48d048f..b5aefbc 100644 --- a/crates/erp-auth/src/service/dept_service.rs +++ b/crates/erp-auth/src/service/dept_service.rs @@ -124,7 +124,7 @@ impl DeptService { "department.created", tenant_id, serde_json::json!({ "dept_id": id, "org_id": org_id, "name": req.name }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "department.create", "department") @@ -271,7 +271,7 @@ impl DeptService { "department.deleted", tenant_id, serde_json::json!({ "dept_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "department.delete", "department") diff --git a/crates/erp-auth/src/service/org_service.rs b/crates/erp-auth/src/service/org_service.rs index 4686056..8ef0276 100644 --- a/crates/erp-auth/src/service/org_service.rs +++ b/crates/erp-auth/src/service/org_service.rs @@ -110,7 +110,7 @@ impl OrgService { "organization.created", tenant_id, serde_json::json!({ "org_id": id, "name": req.name }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "organization.create", "organization") @@ -250,7 +250,7 @@ impl OrgService { "organization.deleted", tenant_id, serde_json::json!({ "org_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "organization.delete", "organization") diff --git a/crates/erp-auth/src/service/position_service.rs b/crates/erp-auth/src/service/position_service.rs index 8d5aa0b..fa6a8bd 100644 --- a/crates/erp-auth/src/service/position_service.rs +++ b/crates/erp-auth/src/service/position_service.rs @@ -109,7 +109,7 @@ impl PositionService { "position.created", tenant_id, serde_json::json!({ "position_id": id, "dept_id": dept_id, "name": req.name }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "position.create", "position") @@ -234,7 +234,7 @@ impl PositionService { "position.deleted", tenant_id, serde_json::json!({ "position_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "position.delete", "position") diff --git a/crates/erp-auth/src/service/role_service.rs b/crates/erp-auth/src/service/role_service.rs index f444cfe..e4e0250 100644 --- a/crates/erp-auth/src/service/role_service.rs +++ b/crates/erp-auth/src/service/role_service.rs @@ -131,7 +131,7 @@ impl RoleService { "role.created", tenant_id, serde_json::json!({ "role_id": id, "code": code }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "role.create", "role") @@ -242,7 +242,7 @@ impl RoleService { "role.deleted", tenant_id, serde_json::json!({ "role_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "role.delete", "role") diff --git a/crates/erp-auth/src/service/user_service.rs b/crates/erp-auth/src/service/user_service.rs index a10c9c9..97d997a 100644 --- a/crates/erp-auth/src/service/user_service.rs +++ b/crates/erp-auth/src/service/user_service.rs @@ -94,7 +94,7 @@ impl UserService { "user.created", tenant_id, serde_json::json!({ "user_id": user_id, "username": req.username }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.create", "user") @@ -265,7 +265,7 @@ impl UserService { "user.deleted", tenant_id, serde_json::json!({ "user_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.delete", "user") diff --git a/crates/erp-config/src/service/dictionary_service.rs b/crates/erp-config/src/service/dictionary_service.rs index 9359162..1e299f8 100644 --- a/crates/erp-config/src/service/dictionary_service.rs +++ b/crates/erp-config/src/service/dictionary_service.rs @@ -137,7 +137,7 @@ impl DictionaryService { "dictionary.created", tenant_id, serde_json::json!({ "dictionary_id": id, "code": code }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "dictionary.create", "dictionary") @@ -248,7 +248,7 @@ impl DictionaryService { "dictionary.deleted", tenant_id, serde_json::json!({ "dictionary_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "dictionary.delete", "dictionary") diff --git a/crates/erp-config/src/service/menu_service.rs b/crates/erp-config/src/service/menu_service.rs index 0da88c7..3399e12 100644 --- a/crates/erp-config/src/service/menu_service.rs +++ b/crates/erp-config/src/service/menu_service.rs @@ -156,7 +156,7 @@ impl MenuService { "menu.created", tenant_id, serde_json::json!({ "menu_id": id, "title": req.title }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "menu.create", "menu") @@ -289,7 +289,7 @@ impl MenuService { "menu.deleted", tenant_id, serde_json::json!({ "menu_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "menu.delete", "menu") diff --git a/crates/erp-config/src/service/numbering_service.rs b/crates/erp-config/src/service/numbering_service.rs index 4d285ca..31d67ee 100644 --- a/crates/erp-config/src/service/numbering_service.rs +++ b/crates/erp-config/src/service/numbering_service.rs @@ -107,7 +107,7 @@ impl NumberingService { "numbering_rule.created", tenant_id, serde_json::json!({ "rule_id": id, "code": req.code }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "numbering_rule.create", "numbering_rule") @@ -223,7 +223,7 @@ impl NumberingService { "numbering_rule.deleted", tenant_id, serde_json::json!({ "rule_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "numbering_rule.delete", "numbering_rule") diff --git a/crates/erp-config/src/service/setting_service.rs b/crates/erp-config/src/service/setting_service.rs index 1621bca..f57135e 100644 --- a/crates/erp-config/src/service/setting_service.rs +++ b/crates/erp-config/src/service/setting_service.rs @@ -117,7 +117,7 @@ impl SettingService { "key": params.key, "scope": params.scope, }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "setting.upsert", "setting") @@ -158,7 +158,7 @@ impl SettingService { "key": params.key, "scope": params.scope, }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "setting.upsert", "setting") diff --git a/crates/erp-core/src/entity/domain_event.rs b/crates/erp-core/src/entity/domain_event.rs new file mode 100644 index 0000000..0d72db5 --- /dev/null +++ b/crates/erp-core/src/entity/domain_event.rs @@ -0,0 +1,24 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +/// 领域事件实体 — 映射 domain_events 表。 +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "domain_events")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: Uuid, + pub tenant_id: Uuid, + pub event_type: String, + pub payload: Option, + pub correlation_id: Option, + pub status: String, + pub attempts: i32, + pub last_error: Option, + pub created_at: DateTimeUtc, + pub published_at: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/erp-core/src/entity/mod.rs b/crates/erp-core/src/entity/mod.rs index 9c4a543..f9b0997 100644 --- a/crates/erp-core/src/entity/mod.rs +++ b/crates/erp-core/src/entity/mod.rs @@ -1 +1,2 @@ pub mod audit_log; +pub mod domain_event; diff --git a/crates/erp-core/src/events.rs b/crates/erp-core/src/events.rs index 7fea753..5a00dd3 100644 --- a/crates/erp-core/src/events.rs +++ b/crates/erp-core/src/events.rs @@ -1,9 +1,12 @@ -use chrono::{DateTime, Utc}; +use chrono::Utc; +use sea_orm::{ActiveModelTrait, Set}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use tracing::{error, info}; use uuid::Uuid; +use crate::entity::domain_event; + /// 领域事件 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DomainEvent { @@ -11,7 +14,7 @@ pub struct DomainEvent { pub event_type: String, pub tenant_id: Uuid, pub payload: serde_json::Value, - pub timestamp: DateTime, + pub timestamp: chrono::DateTime, pub correlation_id: Uuid, } @@ -46,11 +49,40 @@ impl EventBus { Self { sender } } - /// 发布事件 - pub fn publish(&self, event: DomainEvent) { - info!(event_type = %event.event_type, event_id = %event.id, "Event published"); + /// 发布事件:先持久化到 domain_events 表,再内存广播。 + /// + /// 持久化失败时仅记录 warning,仍然广播(best-effort)。 + pub async fn publish(&self, event: DomainEvent, db: &sea_orm::DatabaseConnection) { + // 持久化到 domain_events 表 + let model = domain_event::ActiveModel { + id: Set(event.id), + tenant_id: Set(event.tenant_id), + event_type: Set(event.event_type.clone()), + payload: Set(Some(event.payload.clone())), + correlation_id: Set(Some(event.correlation_id)), + status: Set("published".to_string()), + attempts: Set(0), + last_error: Set(None), + created_at: Set(event.timestamp), + published_at: Set(Some(Utc::now())), + }; + + match model.insert(db).await { + Ok(_) => {} + Err(e) => { + tracing::warn!(event_id = %event.id, error = %e, "领域事件持久化失败"); + } + } + + // 内存广播 + self.broadcast(event); + } + + /// 仅内存广播(不持久化,用于内部测试等场景)。 + pub fn broadcast(&self, event: DomainEvent) { + info!(event_type = %event.event_type, event_id = %event.id, "Event broadcast"); if let Err(e) = self.sender.send(event) { - error!("Failed to publish event: {}", e); + error!("Failed to broadcast event: {}", e); } } diff --git a/crates/erp-message/src/service/message_service.rs b/crates/erp-message/src/service/message_service.rs index 8ad346b..af1d917 100644 --- a/crates/erp-message/src/service/message_service.rs +++ b/crates/erp-message/src/service/message_service.rs @@ -130,7 +130,7 @@ impl MessageService { "recipient_id": req.recipient_id, "title": req.title, }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(sender_id), "message.send", "message") @@ -198,7 +198,7 @@ impl MessageService { "message_id": id, "recipient_id": recipient_id, }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(system_user), "message.send_system", "message") diff --git a/crates/erp-server/Cargo.toml b/crates/erp-server/Cargo.toml index 878ae07..5454cd1 100644 --- a/crates/erp-server/Cargo.toml +++ b/crates/erp-server/Cargo.toml @@ -29,3 +29,4 @@ erp-workflow.workspace = true erp-message.workspace = true anyhow.workspace = true uuid.workspace = true +chrono.workspace = true diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index 0e9fc9b..1af0fd9 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -30,6 +30,7 @@ mod m20260414_000027_fix_unique_indexes_soft_delete; mod m20260414_000028_add_standard_fields_to_tokens; mod m20260414_000029_add_standard_fields_to_process_variables; mod m20260415_000030_add_version_to_message_tables; +mod m20260416_000031_create_domain_events; pub struct Migrator; @@ -67,6 +68,7 @@ impl MigratorTrait for Migrator { Box::new(m20260414_000028_add_standard_fields_to_tokens::Migration), Box::new(m20260414_000029_add_standard_fields_to_process_variables::Migration), Box::new(m20260415_000030_add_version_to_message_tables::Migration), + Box::new(m20260416_000031_create_domain_events::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs b/crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs new file mode 100644 index 0000000..6c35a60 --- /dev/null +++ b/crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs @@ -0,0 +1,80 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Alias::new("domain_events")) + .if_not_exists() + .col( + ColumnDef::new(Alias::new("id")) + .uuid() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(Alias::new("tenant_id")).uuid().not_null()) + .col(ColumnDef::new(Alias::new("event_type")).string_len(200).not_null()) + .col(ColumnDef::new(Alias::new("payload")).json().null()) + .col(ColumnDef::new(Alias::new("correlation_id")).uuid().null()) + .col( + ColumnDef::new(Alias::new("status")) + .string_len(20) + .not_null() + .default("pending"), + ) + .col( + ColumnDef::new(Alias::new("attempts")) + .integer() + .not_null() + .default(0), + ) + .col(ColumnDef::new(Alias::new("last_error")).text().null()) + .col( + ColumnDef::new(Alias::new("created_at")) + .timestamp_with_time_zone() + .not_null(), + ) + .col( + ColumnDef::new(Alias::new("published_at")) + .timestamp_with_time_zone() + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_domain_events_status") + .table(Alias::new("domain_events")) + .col(Alias::new("status")) + .col(Alias::new("created_at")) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_domain_events_tenant") + .table(Alias::new("domain_events")) + .col(Alias::new("tenant_id")) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Alias::new("domain_events")).to_owned()) + .await + } +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index dbd056f..1c7af8b 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -2,6 +2,7 @@ mod config; mod db; mod handlers; mod middleware; +mod outbox; mod state; /// OpenAPI 规范定义(预留,未来可通过 utoipa derive 合并各模块 schema)。 @@ -142,6 +143,10 @@ async fn main() -> anyhow::Result<()> { erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone()); tracing::info!("Message event listener started"); + // Start outbox relay (re-publish pending domain events) + outbox::start_outbox_relay(db.clone(), event_bus.clone()); + tracing::info!("Outbox relay started"); + let host = config.server.host.clone(); let port = config.server.port; diff --git a/crates/erp-server/src/outbox.rs b/crates/erp-server/src/outbox.rs new file mode 100644 index 0000000..926920b --- /dev/null +++ b/crates/erp-server/src/outbox.rs @@ -0,0 +1,58 @@ +use chrono::Utc; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; +use std::time::Duration; + +use erp_core::entity::domain_event; +use erp_core::events::{DomainEvent, EventBus}; + +/// 启动 outbox relay 后台任务。 +/// +/// 定期扫描 domain_events 表中 status = 'pending' 的事件, +/// 重新广播并标记为 published。 +pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + if let Err(e) = process_pending_events(&db, &event_bus).await { + tracing::warn!(error = %e, "Outbox relay 处理失败"); + } + } + }); +} + +async fn process_pending_events( + db: &sea_orm::DatabaseConnection, + event_bus: &EventBus, +) -> Result<(), sea_orm::DbErr> { + let pending = domain_event::Entity::find() + .filter(domain_event::Column::Status.eq("pending")) + .filter(domain_event::Column::Attempts.lt(3)) + .all(db) + .await?; + + if pending.is_empty() { + return Ok(()); + } + + tracing::info!(count = pending.len(), "处理待发领域事件"); + + for event_model in pending { + // 重建 DomainEvent 并广播 + let domain_event = DomainEvent::new( + &event_model.event_type, + event_model.tenant_id, + event_model.payload.clone().unwrap_or(serde_json::json!({})), + ); + + event_bus.broadcast(domain_event); + + // 标记为 published + let mut active: domain_event::ActiveModel = event_model.into(); + active.status = Set("published".to_string()); + active.published_at = Set(Some(Utc::now())); + active.update(db).await?; + } + + Ok(()) +} diff --git a/crates/erp-workflow/src/service/definition_service.rs b/crates/erp-workflow/src/service/definition_service.rs index 955bdd2..b254cf4 100644 --- a/crates/erp-workflow/src/service/definition_service.rs +++ b/crates/erp-workflow/src/service/definition_service.rs @@ -107,7 +107,7 @@ impl DefinitionService { "process_definition.created", tenant_id, serde_json::json!({ "definition_id": id, "key": req.key }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "process_definition.create", "process_definition") @@ -245,7 +245,7 @@ impl DefinitionService { "process_definition.published", tenant_id, serde_json::json!({ "definition_id": id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "process_definition.publish", "process_definition") diff --git a/crates/erp-workflow/src/service/instance_service.rs b/crates/erp-workflow/src/service/instance_service.rs index 4f79de2..6faa488 100644 --- a/crates/erp-workflow/src/service/instance_service.rs +++ b/crates/erp-workflow/src/service/instance_service.rs @@ -130,7 +130,7 @@ impl InstanceService { "process_instance.started", tenant_id, serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "process_instance.start", "process_instance") diff --git a/crates/erp-workflow/src/service/task_service.rs b/crates/erp-workflow/src/service/task_service.rs index 88f2009..357657d 100644 --- a/crates/erp-workflow/src/service/task_service.rs +++ b/crates/erp-workflow/src/service/task_service.rs @@ -242,7 +242,7 @@ impl TaskService { "task.completed", tenant_id, serde_json::json!({ "task_id": id, "outcome": req.outcome }), - )); + ), db).await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "task.complete", "task")