From f29f6d76ee74b61c87da5fbf9bd6b50945fae0bc Mon Sep 17 00:00:00 2001 From: iven Date: Sat, 11 Apr 2026 14:16:45 +0800 Subject: [PATCH] fix(message): resolve Phase 5-6 audit findings - Add missing version column to all message tables (migration + entities) - Replace N+1 mark_all_read loop with single batch UPDATE query - Fix NotificationList infinite re-render (extract queryFilter to stable ref) - Fix NotificationPreferences dynamic import and remove unused Dayjs type - Add Semaphore (max 8) to event listener for backpressure control - Add /docs/openapi.json endpoint for API documentation - Add permission check to unread_count handler - Add version: Set(1) to all ActiveModel inserts --- apps/web/src/pages/Messages.tsx | 6 +- .../src/pages/messages/NotificationList.tsx | 24 ++-- .../messages/NotificationPreferences.tsx | 5 +- crates/erp-message/src/entity/message.rs | 1 + .../src/entity/message_subscription.rs | 1 + .../src/entity/message_template.rs | 1 + .../src/handler/message_handler.rs | 2 + crates/erp-message/src/module.rs | 9 ++ .../src/service/message_service.rs | 38 +++---- .../src/service/subscription_service.rs | 1 + .../src/service/template_service.rs | 1 + crates/erp-server/migration/src/lib.rs | 2 + ...15_000030_add_version_to_message_tables.rs | 104 ++++++++++++++++++ crates/erp-server/src/handlers/mod.rs | 1 + crates/erp-server/src/handlers/openapi.rs | 19 ++++ crates/erp-server/src/main.rs | 3 +- 16 files changed, 180 insertions(+), 38 deletions(-) create mode 100644 crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs create mode 100644 crates/erp-server/src/handlers/openapi.rs diff --git a/apps/web/src/pages/Messages.tsx b/apps/web/src/pages/Messages.tsx index 1adf7e5..38258e7 100644 --- a/apps/web/src/pages/Messages.tsx +++ b/apps/web/src/pages/Messages.tsx @@ -3,6 +3,10 @@ import { Tabs } from 'antd'; import NotificationList from './messages/NotificationList'; import MessageTemplates from './messages/MessageTemplates'; import NotificationPreferences from './messages/NotificationPreferences'; +import type { MessageQuery } from '../api/messages'; + +/** 预定义的过滤器,避免每次渲染创建新引用导致子组件无限重渲染。 */ +const UNREAD_FILTER: MessageQuery = { is_read: false }; export default function Messages() { const [activeKey, setActiveKey] = useState('all'); @@ -21,7 +25,7 @@ export default function Messages() { { key: 'unread', label: '未读消息', - children: , + children: , }, { key: 'templates', diff --git a/apps/web/src/pages/messages/NotificationList.tsx b/apps/web/src/pages/messages/NotificationList.tsx index 501ba14..c984feb 100644 --- a/apps/web/src/pages/messages/NotificationList.tsx +++ b/apps/web/src/pages/messages/NotificationList.tsx @@ -1,4 +1,4 @@ -import { useEffect, useState } from 'react'; +import { useEffect, useState, useMemo, useCallback } from 'react'; import { Table, Button, Tag, Space, Modal, Typography, message } from 'antd'; import type { ColumnsType } from 'antd/es/table'; import { listMessages, markRead, markAllRead, deleteMessage, type MessageInfo, type MessageQuery } from '../../api/messages'; @@ -15,10 +15,10 @@ export default function NotificationList({ queryFilter }: Props) { const [page, setPage] = useState(1); const [loading, setLoading] = useState(false); - const fetchData = async (p = page) => { + const fetchData = useCallback(async (p = page, filter?: MessageQuery) => { setLoading(true); try { - const result = await listMessages({ page: p, page_size: 20, ...queryFilter }); + const result = await listMessages({ page: p, page_size: 20, ...filter }); setData(result.data); setTotal(result.total); } catch { @@ -26,17 +26,19 @@ export default function NotificationList({ queryFilter }: Props) { } finally { setLoading(false); } - }; + }, [page]); + + // 使用 JSON 序列化比较确保只在 filter 内容变化时触发 + const filterKey = useMemo(() => JSON.stringify(queryFilter), [queryFilter]); useEffect(() => { - fetchData(1); - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [queryFilter]); + fetchData(1, queryFilter); + }, [filterKey, fetchData, queryFilter]); const handleMarkRead = async (id: string) => { try { await markRead(id); - fetchData(); + fetchData(page, queryFilter); } catch { message.error('操作失败'); } @@ -45,7 +47,7 @@ export default function NotificationList({ queryFilter }: Props) { const handleMarkAllRead = async () => { try { await markAllRead(); - fetchData(); + fetchData(page, queryFilter); message.success('已全部标记为已读'); } catch { message.error('操作失败'); @@ -55,7 +57,7 @@ export default function NotificationList({ queryFilter }: Props) { const handleDelete = async (id: string) => { try { await deleteMessage(id); - fetchData(); + fetchData(page, queryFilter); message.success('已删除'); } catch { message.error('删除失败'); @@ -158,7 +160,7 @@ export default function NotificationList({ queryFilter }: Props) { current: page, total, pageSize: 20, - onChange: (p) => { setPage(p); fetchData(p); }, + onChange: (p) => { setPage(p); fetchData(p, queryFilter); }, }} /> diff --git a/apps/web/src/pages/messages/NotificationPreferences.tsx b/apps/web/src/pages/messages/NotificationPreferences.tsx index 3db2a5d..d18e3eb 100644 --- a/apps/web/src/pages/messages/NotificationPreferences.tsx +++ b/apps/web/src/pages/messages/NotificationPreferences.tsx @@ -1,6 +1,6 @@ import { useEffect, useState } from 'react'; import { Form, Switch, TimePicker, Button, Card, message } from 'antd'; -import type { Dayjs } from 'dayjs'; +import client from '../../api/client'; interface PreferencesData { dnd_enabled: boolean; @@ -15,7 +15,6 @@ export default function NotificationPreferences() { useEffect(() => { // 加载当前偏好设置 - // 暂时使用默认值,后续连接 API form.setFieldsValue({ dnd_enabled: false, }); @@ -31,8 +30,6 @@ export default function NotificationPreferences() { dnd_end: values.dnd_range?.[1]?.format('HH:mm'), }; - // 调用 API 更新偏好 - const client = await import('../../api/client').then(m => m.default); await client.put('/message-subscriptions', { dnd_enabled: req.dnd_enabled, dnd_start: req.dnd_start, diff --git a/crates/erp-message/src/entity/message.rs b/crates/erp-message/src/entity/message.rs index 68ab012..d28b01b 100644 --- a/crates/erp-message/src/entity/message.rs +++ b/crates/erp-message/src/entity/message.rs @@ -36,6 +36,7 @@ pub struct Model { pub updated_by: Uuid, #[serde(skip_serializing_if = "Option::is_none")] pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-message/src/entity/message_subscription.rs b/crates/erp-message/src/entity/message_subscription.rs index 444e12e..56dbdce 100644 --- a/crates/erp-message/src/entity/message_subscription.rs +++ b/crates/erp-message/src/entity/message_subscription.rs @@ -23,6 +23,7 @@ pub struct Model { pub updated_by: Uuid, #[serde(skip_serializing_if = "Option::is_none")] pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-message/src/entity/message_template.rs b/crates/erp-message/src/entity/message_template.rs index 0a471bf..98aa839 100644 --- a/crates/erp-message/src/entity/message_template.rs +++ b/crates/erp-message/src/entity/message_template.rs @@ -19,6 +19,7 @@ pub struct Model { pub updated_by: Uuid, #[serde(skip_serializing_if = "Option::is_none")] pub deleted_at: Option, + pub version: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/erp-message/src/handler/message_handler.rs b/crates/erp-message/src/handler/message_handler.rs index 0b28b08..8d9f659 100644 --- a/crates/erp-message/src/handler/message_handler.rs +++ b/crates/erp-message/src/handler/message_handler.rs @@ -49,6 +49,8 @@ where MessageState: FromRef, S: Clone + Send + Sync + 'static, { + require_permission(&ctx, "message:list")?; + let result = MessageService::unread_count(ctx.tenant_id, ctx.user_id, &_state.db).await?; Ok(Json(ApiResponse::ok(result))) } diff --git a/crates/erp-message/src/module.rs b/crates/erp-message/src/module.rs index 9a1c566..d8287ef 100644 --- a/crates/erp-message/src/module.rs +++ b/crates/erp-message/src/module.rs @@ -1,5 +1,7 @@ use axum::Router; use axum::routing::{delete, get, put}; +use std::sync::Arc; +use tokio::sync::Semaphore; use uuid::Uuid; use erp_core::error::AppResult; @@ -60,16 +62,23 @@ impl MessageModule { /// 启动后台事件监听任务,将工作流事件转化为消息通知。 /// + /// 使用 Semaphore 限制最大并发数为 8,防止事件突发时过度消耗资源。 /// 在 main.rs 中调用,因为需要 db 连接。 pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) { let mut rx = event_bus.subscribe(); + let semaphore = Arc::new(Semaphore::new(8)); + tokio::spawn(async move { loop { match rx.recv().await { Ok(event) => { let db = db.clone(); let event_bus = event_bus.clone(); + let permit = semaphore.clone(); + + // 先获取许可,再 spawn 任务 tokio::spawn(async move { + let _permit = permit.acquire().await.unwrap(); if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await { diff --git a/crates/erp-message/src/service/message_service.rs b/crates/erp-message/src/service/message_service.rs index 0ab4146..a10acad 100644 --- a/crates/erp-message/src/service/message_service.rs +++ b/crates/erp-message/src/service/message_service.rs @@ -1,6 +1,7 @@ use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set, + Statement, ConnectionTrait, DatabaseBackend, }; use uuid::Uuid; @@ -111,6 +112,7 @@ impl MessageService { created_by: Set(sender_id), updated_by: Set(sender_id), deleted_at: Set(None), + version: Set(1), }; let inserted = model @@ -172,6 +174,7 @@ impl MessageService { created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), + version: Set(1), }; let inserted = model @@ -228,33 +231,26 @@ impl MessageService { Ok(()) } - /// 标记所有消息已读。 + /// 标记所有消息已读(批量 UPDATE,避免 N+1)。 pub async fn mark_all_read( tenant_id: Uuid, user_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> MessageResult<()> { - let unread = message::Entity::find() - .filter(message::Column::TenantId.eq(tenant_id)) - .filter(message::Column::RecipientId.eq(user_id)) - .filter(message::Column::IsRead.eq(false)) - .filter(message::Column::DeletedAt.is_null()) - .all(db) - .await - .map_err(|e| MessageError::Validation(e.to_string()))?; - let now = Utc::now(); - for m in unread { - let mut active: message::ActiveModel = m.into(); - active.is_read = Set(true); - active.read_at = Set(Some(now)); - active.updated_at = Set(now); - active.updated_by = Set(user_id); - active - .update(db) - .await - .map_err(|e| MessageError::Validation(e.to_string()))?; - } + db.execute(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL", + [ + now.into(), + now.into(), + user_id.into(), + tenant_id.into(), + user_id.into(), + ], + )) + .await + .map_err(|e| MessageError::Validation(e.to_string()))?; Ok(()) } diff --git a/crates/erp-message/src/service/subscription_service.rs b/crates/erp-message/src/service/subscription_service.rs index ca4d785..443532d 100644 --- a/crates/erp-message/src/service/subscription_service.rs +++ b/crates/erp-message/src/service/subscription_service.rs @@ -88,6 +88,7 @@ impl SubscriptionService { created_by: Set(user_id), updated_by: Set(user_id), deleted_at: Set(None), + version: Set(1), }; let inserted = model diff --git a/crates/erp-message/src/service/template_service.rs b/crates/erp-message/src/service/template_service.rs index a7d098a..705a8c5 100644 --- a/crates/erp-message/src/service/template_service.rs +++ b/crates/erp-message/src/service/template_service.rs @@ -79,6 +79,7 @@ impl TemplateService { created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), + version: Set(1), }; let inserted = model diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs index a996065..0e9fc9b 100644 --- a/crates/erp-server/migration/src/lib.rs +++ b/crates/erp-server/migration/src/lib.rs @@ -29,6 +29,7 @@ mod m20260413_000026_create_audit_logs; mod m20260414_000027_fix_unique_indexes_soft_delete; mod m20260414_000028_add_standard_fields_to_tokens; mod m20260414_000029_add_standard_fields_to_process_variables; +mod m20260415_000030_add_version_to_message_tables; pub struct Migrator; @@ -65,6 +66,7 @@ impl MigratorTrait for Migrator { Box::new(m20260414_000027_fix_unique_indexes_soft_delete::Migration), Box::new(m20260414_000028_add_standard_fields_to_tokens::Migration), Box::new(m20260414_000029_add_standard_fields_to_process_variables::Migration), + Box::new(m20260415_000030_add_version_to_message_tables::Migration), ] } } diff --git a/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs b/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs new file mode 100644 index 0000000..0c4caf8 --- /dev/null +++ b/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs @@ -0,0 +1,104 @@ +use sea_orm_migration::prelude::*; + +/// 为三个消息表添加缺失的 version 列(乐观锁字段)。 +/// +/// CLAUDE.md 要求所有表包含 version 字段用于乐观锁,但消息模块的三个表遗漏了。 +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // message_templates + manager + .alter_table( + Table::alter() + .table(MessageTemplates::Table) + .add_column( + ColumnDef::new(MessageTemplates::Version) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + // messages + manager + .alter_table( + Table::alter() + .table(Messages::Table) + .add_column( + ColumnDef::new(Messages::Version) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + // message_subscriptions + manager + .alter_table( + Table::alter() + .table(MessageSubscriptions::Table) + .add_column( + ColumnDef::new(MessageSubscriptions::Version) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(MessageTemplates::Table) + .drop_column(MessageTemplates::Version) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(Messages::Table) + .drop_column(Messages::Version) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MessageSubscriptions::Table) + .drop_column(MessageSubscriptions::Version) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum MessageTemplates { + Table, + Version, +} + +#[derive(DeriveIden)] +enum Messages { + Table, + Version, +} + +#[derive(DeriveIden)] +enum MessageSubscriptions { + Table, + Version, +} diff --git a/crates/erp-server/src/handlers/mod.rs b/crates/erp-server/src/handlers/mod.rs index 43a7c76..b3721d3 100644 --- a/crates/erp-server/src/handlers/mod.rs +++ b/crates/erp-server/src/handlers/mod.rs @@ -1 +1,2 @@ pub mod health; +pub mod openapi; diff --git a/crates/erp-server/src/handlers/openapi.rs b/crates/erp-server/src/handlers/openapi.rs new file mode 100644 index 0000000..66ea668 --- /dev/null +++ b/crates/erp-server/src/handlers/openapi.rs @@ -0,0 +1,19 @@ +use axum::response::Json; +use serde_json::Value; +use utoipa::openapi::OpenApiBuilder; + +/// GET /docs/openapi.json +/// +/// 返回 OpenAPI 3.0 规范 JSON 文档 +pub async fn openapi_spec() -> Json { + let mut info = utoipa::openapi::Info::new( + "ERP Platform API", + env!("CARGO_PKG_VERSION"), + ); + info.description = Some("ERP 平台底座 REST API 文档".to_string()); + + let spec = OpenApiBuilder::new() + .info(info) + .build(); + Json(serde_json::to_value(spec).unwrap_or_default()) +} diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index f00a795..5f402bb 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -3,7 +3,7 @@ mod db; mod handlers; mod state; -/// OpenAPI 规范定义。 +/// OpenAPI 规范定义(预留,未来可通过 utoipa derive 合并各模块 schema)。 #[derive(OpenApi)] #[openapi( info( @@ -168,6 +168,7 @@ async fn main() -> anyhow::Result<()> { let public_routes = Router::new() .merge(handlers::health::health_check_router()) .merge(erp_auth::AuthModule::public_routes()) + .route("/docs/openapi.json", axum::routing::get(handlers::openapi::openapi_spec)) .with_state(state.clone()); // Protected routes (JWT authentication required)