fix(message): resolve Phase 5-6 audit findings

- Add missing version column to all message tables (migration + entities)
- Replace N+1 mark_all_read loop with single batch UPDATE query
- Fix NotificationList infinite re-render (extract queryFilter to stable ref)
- Fix NotificationPreferences dynamic import and remove unused Dayjs type
- Add Semaphore (max 8) to event listener for backpressure control
- Add /docs/openapi.json endpoint for API documentation
- Add permission check to unread_count handler
- Add version: Set(1) to all ActiveModel inserts
This commit is contained in:
iven
2026-04-11 14:16:45 +08:00
parent 97d3c9026b
commit f29f6d76ee
16 changed files with 180 additions and 38 deletions

View File

@@ -36,6 +36,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTimeUtc>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -23,6 +23,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTimeUtc>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -19,6 +19,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTimeUtc>,
pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -49,6 +49,8 @@ where
MessageState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "message:list")?;
let result = MessageService::unread_count(ctx.tenant_id, ctx.user_id, &_state.db).await?;
Ok(Json(ApiResponse::ok(result)))
}

View File

@@ -1,5 +1,7 @@
use axum::Router;
use axum::routing::{delete, get, put};
use std::sync::Arc;
use tokio::sync::Semaphore;
use uuid::Uuid;
use erp_core::error::AppResult;
@@ -60,16 +62,23 @@ impl MessageModule {
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
///
/// 使用 Semaphore 限制最大并发数为 8防止事件突发时过度消耗资源。
/// 在 main.rs 中调用,因为需要 db 连接。
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let mut rx = event_bus.subscribe();
let semaphore = Arc::new(Semaphore::new(8));
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let db = db.clone();
let event_bus = event_bus.clone();
let permit = semaphore.clone();
// 先获取许可,再 spawn 任务
tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
if let Err(e) =
handle_workflow_event(&event, &db, &event_bus).await
{

View File

@@ -1,6 +1,7 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
Statement, ConnectionTrait, DatabaseBackend,
};
use uuid::Uuid;
@@ -111,6 +112,7 @@ impl MessageService {
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model
@@ -172,6 +174,7 @@ impl MessageService {
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model
@@ -228,33 +231,26 @@ impl MessageService {
Ok(())
}
/// 标记所有消息已读。
/// 标记所有消息已读(批量 UPDATE避免 N+1
pub async fn mark_all_read(
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let unread = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(user_id))
.filter(message::Column::IsRead.eq(false))
.filter(message::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let now = Utc::now();
for m in unread {
let mut active: message::ActiveModel = m.into();
active.is_read = Set(true);
active.read_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
}
db.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL",
[
now.into(),
now.into(),
user_id.into(),
tenant_id.into(),
user_id.into(),
],
))
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(())
}

View File

@@ -88,6 +88,7 @@ impl SubscriptionService {
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model

View File

@@ -79,6 +79,7 @@ impl TemplateService {
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model

View File

@@ -29,6 +29,7 @@ mod m20260413_000026_create_audit_logs;
mod m20260414_000027_fix_unique_indexes_soft_delete;
mod m20260414_000028_add_standard_fields_to_tokens;
mod m20260414_000029_add_standard_fields_to_process_variables;
mod m20260415_000030_add_version_to_message_tables;
pub struct Migrator;
@@ -65,6 +66,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260414_000027_fix_unique_indexes_soft_delete::Migration),
Box::new(m20260414_000028_add_standard_fields_to_tokens::Migration),
Box::new(m20260414_000029_add_standard_fields_to_process_variables::Migration),
Box::new(m20260415_000030_add_version_to_message_tables::Migration),
]
}
}

View File

@@ -0,0 +1,104 @@
use sea_orm_migration::prelude::*;
/// 为三个消息表添加缺失的 version 列(乐观锁字段)。
///
/// CLAUDE.md 要求所有表包含 version 字段用于乐观锁,但消息模块的三个表遗漏了。
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// message_templates
manager
.alter_table(
Table::alter()
.table(MessageTemplates::Table)
.add_column(
ColumnDef::new(MessageTemplates::Version)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await?;
// messages
manager
.alter_table(
Table::alter()
.table(Messages::Table)
.add_column(
ColumnDef::new(Messages::Version)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await?;
// message_subscriptions
manager
.alter_table(
Table::alter()
.table(MessageSubscriptions::Table)
.add_column(
ColumnDef::new(MessageSubscriptions::Version)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(MessageTemplates::Table)
.drop_column(MessageTemplates::Version)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Messages::Table)
.drop_column(Messages::Version)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(MessageSubscriptions::Table)
.drop_column(MessageSubscriptions::Version)
.to_owned(),
)
.await
}
}
#[derive(DeriveIden)]
enum MessageTemplates {
Table,
Version,
}
#[derive(DeriveIden)]
enum Messages {
Table,
Version,
}
#[derive(DeriveIden)]
enum MessageSubscriptions {
Table,
Version,
}

View File

@@ -1 +1,2 @@
pub mod health;
pub mod openapi;

View File

@@ -0,0 +1,19 @@
use axum::response::Json;
use serde_json::Value;
use utoipa::openapi::OpenApiBuilder;
/// GET /docs/openapi.json
///
/// 返回 OpenAPI 3.0 规范 JSON 文档
pub async fn openapi_spec() -> Json<Value> {
let mut info = utoipa::openapi::Info::new(
"ERP Platform API",
env!("CARGO_PKG_VERSION"),
);
info.description = Some("ERP 平台底座 REST API 文档".to_string());
let spec = OpenApiBuilder::new()
.info(info)
.build();
Json(serde_json::to_value(spec).unwrap_or_default())
}

View File

@@ -3,7 +3,7 @@ mod db;
mod handlers;
mod state;
/// OpenAPI 规范定义。
/// OpenAPI 规范定义(预留,未来可通过 utoipa derive 合并各模块 schema
#[derive(OpenApi)]
#[openapi(
info(
@@ -168,6 +168,7 @@ async fn main() -> anyhow::Result<()> {
let public_routes = Router::new()
.merge(handlers::health::health_check_router())
.merge(erp_auth::AuthModule::public_routes())
.route("/docs/openapi.json", axum::routing::get(handlers::openapi::openapi_spec))
.with_state(state.clone());
// Protected routes (JWT authentication required)