diff --git a/apps/web/src/pages/Messages.tsx b/apps/web/src/pages/Messages.tsx
index 1adf7e5..38258e7 100644
--- a/apps/web/src/pages/Messages.tsx
+++ b/apps/web/src/pages/Messages.tsx
@@ -3,6 +3,10 @@ import { Tabs } from 'antd';
import NotificationList from './messages/NotificationList';
import MessageTemplates from './messages/MessageTemplates';
import NotificationPreferences from './messages/NotificationPreferences';
+import type { MessageQuery } from '../api/messages';
+
+/** 预定义的过滤器,避免每次渲染创建新引用导致子组件无限重渲染。 */
+const UNREAD_FILTER: MessageQuery = { is_read: false };
export default function Messages() {
const [activeKey, setActiveKey] = useState('all');
@@ -21,7 +25,7 @@ export default function Messages() {
{
key: 'unread',
label: '未读消息',
- children: ,
+ children: ,
},
{
key: 'templates',
diff --git a/apps/web/src/pages/messages/NotificationList.tsx b/apps/web/src/pages/messages/NotificationList.tsx
index 501ba14..c984feb 100644
--- a/apps/web/src/pages/messages/NotificationList.tsx
+++ b/apps/web/src/pages/messages/NotificationList.tsx
@@ -1,4 +1,4 @@
-import { useEffect, useState } from 'react';
+import { useEffect, useState, useMemo, useCallback } from 'react';
import { Table, Button, Tag, Space, Modal, Typography, message } from 'antd';
import type { ColumnsType } from 'antd/es/table';
import { listMessages, markRead, markAllRead, deleteMessage, type MessageInfo, type MessageQuery } from '../../api/messages';
@@ -15,10 +15,10 @@ export default function NotificationList({ queryFilter }: Props) {
const [page, setPage] = useState(1);
const [loading, setLoading] = useState(false);
- const fetchData = async (p = page) => {
+ const fetchData = useCallback(async (p = page, filter?: MessageQuery) => {
setLoading(true);
try {
- const result = await listMessages({ page: p, page_size: 20, ...queryFilter });
+ const result = await listMessages({ page: p, page_size: 20, ...filter });
setData(result.data);
setTotal(result.total);
} catch {
@@ -26,17 +26,19 @@ export default function NotificationList({ queryFilter }: Props) {
} finally {
setLoading(false);
}
- };
+ }, [page]);
+
+ // 使用 JSON 序列化比较确保只在 filter 内容变化时触发
+ const filterKey = useMemo(() => JSON.stringify(queryFilter), [queryFilter]);
useEffect(() => {
- fetchData(1);
- // eslint-disable-next-line react-hooks/exhaustive-deps
- }, [queryFilter]);
+ fetchData(1, queryFilter);
+ }, [filterKey, fetchData, queryFilter]);
const handleMarkRead = async (id: string) => {
try {
await markRead(id);
- fetchData();
+ fetchData(page, queryFilter);
} catch {
message.error('操作失败');
}
@@ -45,7 +47,7 @@ export default function NotificationList({ queryFilter }: Props) {
const handleMarkAllRead = async () => {
try {
await markAllRead();
- fetchData();
+ fetchData(page, queryFilter);
message.success('已全部标记为已读');
} catch {
message.error('操作失败');
@@ -55,7 +57,7 @@ export default function NotificationList({ queryFilter }: Props) {
const handleDelete = async (id: string) => {
try {
await deleteMessage(id);
- fetchData();
+ fetchData(page, queryFilter);
message.success('已删除');
} catch {
message.error('删除失败');
@@ -158,7 +160,7 @@ export default function NotificationList({ queryFilter }: Props) {
current: page,
total,
pageSize: 20,
- onChange: (p) => { setPage(p); fetchData(p); },
+ onChange: (p) => { setPage(p); fetchData(p, queryFilter); },
}}
/>
diff --git a/apps/web/src/pages/messages/NotificationPreferences.tsx b/apps/web/src/pages/messages/NotificationPreferences.tsx
index 3db2a5d..d18e3eb 100644
--- a/apps/web/src/pages/messages/NotificationPreferences.tsx
+++ b/apps/web/src/pages/messages/NotificationPreferences.tsx
@@ -1,6 +1,6 @@
import { useEffect, useState } from 'react';
import { Form, Switch, TimePicker, Button, Card, message } from 'antd';
-import type { Dayjs } from 'dayjs';
+import client from '../../api/client';
interface PreferencesData {
dnd_enabled: boolean;
@@ -15,7 +15,6 @@ export default function NotificationPreferences() {
useEffect(() => {
// 加载当前偏好设置
- // 暂时使用默认值,后续连接 API
form.setFieldsValue({
dnd_enabled: false,
});
@@ -31,8 +30,6 @@ export default function NotificationPreferences() {
dnd_end: values.dnd_range?.[1]?.format('HH:mm'),
};
- // 调用 API 更新偏好
- const client = await import('../../api/client').then(m => m.default);
await client.put('/message-subscriptions', {
dnd_enabled: req.dnd_enabled,
dnd_start: req.dnd_start,
diff --git a/crates/erp-message/src/entity/message.rs b/crates/erp-message/src/entity/message.rs
index 68ab012..d28b01b 100644
--- a/crates/erp-message/src/entity/message.rs
+++ b/crates/erp-message/src/entity/message.rs
@@ -36,6 +36,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option,
+ pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/crates/erp-message/src/entity/message_subscription.rs b/crates/erp-message/src/entity/message_subscription.rs
index 444e12e..56dbdce 100644
--- a/crates/erp-message/src/entity/message_subscription.rs
+++ b/crates/erp-message/src/entity/message_subscription.rs
@@ -23,6 +23,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option,
+ pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/crates/erp-message/src/entity/message_template.rs b/crates/erp-message/src/entity/message_template.rs
index 0a471bf..98aa839 100644
--- a/crates/erp-message/src/entity/message_template.rs
+++ b/crates/erp-message/src/entity/message_template.rs
@@ -19,6 +19,7 @@ pub struct Model {
pub updated_by: Uuid,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option,
+ pub version: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/crates/erp-message/src/handler/message_handler.rs b/crates/erp-message/src/handler/message_handler.rs
index 0b28b08..8d9f659 100644
--- a/crates/erp-message/src/handler/message_handler.rs
+++ b/crates/erp-message/src/handler/message_handler.rs
@@ -49,6 +49,8 @@ where
MessageState: FromRef,
S: Clone + Send + Sync + 'static,
{
+ require_permission(&ctx, "message:list")?;
+
let result = MessageService::unread_count(ctx.tenant_id, ctx.user_id, &_state.db).await?;
Ok(Json(ApiResponse::ok(result)))
}
diff --git a/crates/erp-message/src/module.rs b/crates/erp-message/src/module.rs
index 9a1c566..d8287ef 100644
--- a/crates/erp-message/src/module.rs
+++ b/crates/erp-message/src/module.rs
@@ -1,5 +1,7 @@
use axum::Router;
use axum::routing::{delete, get, put};
+use std::sync::Arc;
+use tokio::sync::Semaphore;
use uuid::Uuid;
use erp_core::error::AppResult;
@@ -60,16 +62,23 @@ impl MessageModule {
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
///
+ /// 使用 Semaphore 限制最大并发数为 8,防止事件突发时过度消耗资源。
/// 在 main.rs 中调用,因为需要 db 连接。
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
let mut rx = event_bus.subscribe();
+ let semaphore = Arc::new(Semaphore::new(8));
+
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let db = db.clone();
let event_bus = event_bus.clone();
+ let permit = semaphore.clone();
+
+ // 先获取许可,再 spawn 任务
tokio::spawn(async move {
+ let _permit = permit.acquire().await.unwrap();
if let Err(e) =
handle_workflow_event(&event, &db, &event_bus).await
{
diff --git a/crates/erp-message/src/service/message_service.rs b/crates/erp-message/src/service/message_service.rs
index 0ab4146..a10acad 100644
--- a/crates/erp-message/src/service/message_service.rs
+++ b/crates/erp-message/src/service/message_service.rs
@@ -1,6 +1,7 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
+ Statement, ConnectionTrait, DatabaseBackend,
};
use uuid::Uuid;
@@ -111,6 +112,7 @@ impl MessageService {
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
+ version: Set(1),
};
let inserted = model
@@ -172,6 +174,7 @@ impl MessageService {
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
+ version: Set(1),
};
let inserted = model
@@ -228,33 +231,26 @@ impl MessageService {
Ok(())
}
- /// 标记所有消息已读。
+ /// 标记所有消息已读(批量 UPDATE,避免 N+1)。
pub async fn mark_all_read(
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
- let unread = message::Entity::find()
- .filter(message::Column::TenantId.eq(tenant_id))
- .filter(message::Column::RecipientId.eq(user_id))
- .filter(message::Column::IsRead.eq(false))
- .filter(message::Column::DeletedAt.is_null())
- .all(db)
- .await
- .map_err(|e| MessageError::Validation(e.to_string()))?;
-
let now = Utc::now();
- for m in unread {
- let mut active: message::ActiveModel = m.into();
- active.is_read = Set(true);
- active.read_at = Set(Some(now));
- active.updated_at = Set(now);
- active.updated_by = Set(user_id);
- active
- .update(db)
- .await
- .map_err(|e| MessageError::Validation(e.to_string()))?;
- }
+ db.execute(Statement::from_sql_and_values(
+ DatabaseBackend::Postgres,
+ "UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL",
+ [
+ now.into(),
+ now.into(),
+ user_id.into(),
+ tenant_id.into(),
+ user_id.into(),
+ ],
+ ))
+ .await
+ .map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(())
}
diff --git a/crates/erp-message/src/service/subscription_service.rs b/crates/erp-message/src/service/subscription_service.rs
index ca4d785..443532d 100644
--- a/crates/erp-message/src/service/subscription_service.rs
+++ b/crates/erp-message/src/service/subscription_service.rs
@@ -88,6 +88,7 @@ impl SubscriptionService {
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
+ version: Set(1),
};
let inserted = model
diff --git a/crates/erp-message/src/service/template_service.rs b/crates/erp-message/src/service/template_service.rs
index a7d098a..705a8c5 100644
--- a/crates/erp-message/src/service/template_service.rs
+++ b/crates/erp-message/src/service/template_service.rs
@@ -79,6 +79,7 @@ impl TemplateService {
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
+ version: Set(1),
};
let inserted = model
diff --git a/crates/erp-server/migration/src/lib.rs b/crates/erp-server/migration/src/lib.rs
index a996065..0e9fc9b 100644
--- a/crates/erp-server/migration/src/lib.rs
+++ b/crates/erp-server/migration/src/lib.rs
@@ -29,6 +29,7 @@ mod m20260413_000026_create_audit_logs;
mod m20260414_000027_fix_unique_indexes_soft_delete;
mod m20260414_000028_add_standard_fields_to_tokens;
mod m20260414_000029_add_standard_fields_to_process_variables;
+mod m20260415_000030_add_version_to_message_tables;
pub struct Migrator;
@@ -65,6 +66,7 @@ impl MigratorTrait for Migrator {
Box::new(m20260414_000027_fix_unique_indexes_soft_delete::Migration),
Box::new(m20260414_000028_add_standard_fields_to_tokens::Migration),
Box::new(m20260414_000029_add_standard_fields_to_process_variables::Migration),
+ Box::new(m20260415_000030_add_version_to_message_tables::Migration),
]
}
}
diff --git a/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs b/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs
new file mode 100644
index 0000000..0c4caf8
--- /dev/null
+++ b/crates/erp-server/migration/src/m20260415_000030_add_version_to_message_tables.rs
@@ -0,0 +1,104 @@
+use sea_orm_migration::prelude::*;
+
+/// 为三个消息表添加缺失的 version 列(乐观锁字段)。
+///
+/// CLAUDE.md 要求所有表包含 version 字段用于乐观锁,但消息模块的三个表遗漏了。
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+ async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ // message_templates
+ manager
+ .alter_table(
+ Table::alter()
+ .table(MessageTemplates::Table)
+ .add_column(
+ ColumnDef::new(MessageTemplates::Version)
+ .integer()
+ .not_null()
+ .default(1),
+ )
+ .to_owned(),
+ )
+ .await?;
+
+ // messages
+ manager
+ .alter_table(
+ Table::alter()
+ .table(Messages::Table)
+ .add_column(
+ ColumnDef::new(Messages::Version)
+ .integer()
+ .not_null()
+ .default(1),
+ )
+ .to_owned(),
+ )
+ .await?;
+
+ // message_subscriptions
+ manager
+ .alter_table(
+ Table::alter()
+ .table(MessageSubscriptions::Table)
+ .add_column(
+ ColumnDef::new(MessageSubscriptions::Version)
+ .integer()
+ .not_null()
+ .default(1),
+ )
+ .to_owned(),
+ )
+ .await
+ }
+
+ async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+ manager
+ .alter_table(
+ Table::alter()
+ .table(MessageTemplates::Table)
+ .drop_column(MessageTemplates::Version)
+ .to_owned(),
+ )
+ .await?;
+
+ manager
+ .alter_table(
+ Table::alter()
+ .table(Messages::Table)
+ .drop_column(Messages::Version)
+ .to_owned(),
+ )
+ .await?;
+
+ manager
+ .alter_table(
+ Table::alter()
+ .table(MessageSubscriptions::Table)
+ .drop_column(MessageSubscriptions::Version)
+ .to_owned(),
+ )
+ .await
+ }
+}
+
+#[derive(DeriveIden)]
+enum MessageTemplates {
+ Table,
+ Version,
+}
+
+#[derive(DeriveIden)]
+enum Messages {
+ Table,
+ Version,
+}
+
+#[derive(DeriveIden)]
+enum MessageSubscriptions {
+ Table,
+ Version,
+}
diff --git a/crates/erp-server/src/handlers/mod.rs b/crates/erp-server/src/handlers/mod.rs
index 43a7c76..b3721d3 100644
--- a/crates/erp-server/src/handlers/mod.rs
+++ b/crates/erp-server/src/handlers/mod.rs
@@ -1 +1,2 @@
pub mod health;
+pub mod openapi;
diff --git a/crates/erp-server/src/handlers/openapi.rs b/crates/erp-server/src/handlers/openapi.rs
new file mode 100644
index 0000000..66ea668
--- /dev/null
+++ b/crates/erp-server/src/handlers/openapi.rs
@@ -0,0 +1,19 @@
+use axum::response::Json;
+use serde_json::Value;
+use utoipa::openapi::OpenApiBuilder;
+
+/// GET /docs/openapi.json
+///
+/// 返回 OpenAPI 3.0 规范 JSON 文档
+pub async fn openapi_spec() -> Json {
+ let mut info = utoipa::openapi::Info::new(
+ "ERP Platform API",
+ env!("CARGO_PKG_VERSION"),
+ );
+ info.description = Some("ERP 平台底座 REST API 文档".to_string());
+
+ let spec = OpenApiBuilder::new()
+ .info(info)
+ .build();
+ Json(serde_json::to_value(spec).unwrap_or_default())
+}
diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs
index f00a795..5f402bb 100644
--- a/crates/erp-server/src/main.rs
+++ b/crates/erp-server/src/main.rs
@@ -3,7 +3,7 @@ mod db;
mod handlers;
mod state;
-/// OpenAPI 规范定义。
+/// OpenAPI 规范定义(预留,未来可通过 utoipa derive 合并各模块 schema)。
#[derive(OpenApi)]
#[openapi(
info(
@@ -168,6 +168,7 @@ async fn main() -> anyhow::Result<()> {
let public_routes = Router::new()
.merge(handlers::health::health_check_router())
.merge(erp_auth::AuthModule::public_routes())
+ .route("/docs/openapi.json", axum::routing::get(handlers::openapi::openapi_spec))
.with_state(state.clone());
// Protected routes (JWT authentication required)