feat(message): add message center module (Phase 5)

Implement the complete message center with:
- Database migrations for message_templates, messages, message_subscriptions tables
- erp-message crate with entities, DTOs, services, handlers
- Message CRUD, send, read/unread tracking, soft delete
- Template management with variable interpolation
- Subscription preferences with DND support
- Frontend: messages page, notification panel, unread count badge
- Server integration with module registration and routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
iven
2026-04-11 12:25:05 +08:00
parent 91ecaa3ed7
commit 5ceed71e62
35 changed files with 2252 additions and 15 deletions

View File

@@ -0,0 +1,316 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
};
use uuid::Uuid;
use crate::dto::{MessageQuery, MessageResp, SendMessageReq, UnreadCountResp};
use crate::entity::message;
use crate::error::{MessageError, MessageResult};
use erp_core::events::EventBus;
/// 消息服务。
pub struct MessageService;
impl MessageService {
/// 查询消息列表(分页)。
pub async fn list(
tenant_id: Uuid,
recipient_id: Uuid,
query: &MessageQuery,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<(Vec<MessageResp>, u64)> {
let page_size = query.page_size.unwrap_or(20);
let mut q = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(recipient_id))
.filter(message::Column::DeletedAt.is_null());
if let Some(is_read) = query.is_read {
q = q.filter(message::Column::IsRead.eq(is_read));
}
if let Some(ref priority) = query.priority {
q = q.filter(message::Column::Priority.eq(priority.as_str()));
}
if let Some(ref business_type) = query.business_type {
q = q.filter(message::Column::BusinessType.eq(business_type.as_str()));
}
if let Some(ref status) = query.status {
q = q.filter(message::Column::Status.eq(status.as_str()));
}
let paginator = q.paginate(db, page_size);
let total = paginator
.num_items()
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let page_index = query.page.unwrap_or(1).saturating_sub(1) as u64;
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let resps = models.iter().map(Self::model_to_resp).collect();
Ok((resps, total))
}
/// 获取未读消息数量。
pub async fn unread_count(
tenant_id: Uuid,
recipient_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<UnreadCountResp> {
let count = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(recipient_id))
.filter(message::Column::IsRead.eq(false))
.filter(message::Column::DeletedAt.is_null())
.count(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(UnreadCountResp {
count: count as i64,
})
}
/// 发送消息。
pub async fn send(
tenant_id: Uuid,
sender_id: Uuid,
req: &SendMessageReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> MessageResult<MessageResp> {
let id = Uuid::now_v7();
let now = Utc::now();
let model = message::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
template_id: Set(req.template_id),
sender_id: Set(Some(sender_id)),
sender_type: Set("user".to_string()),
recipient_id: Set(req.recipient_id),
recipient_type: Set(req.recipient_type.clone()),
title: Set(req.title.clone()),
body: Set(req.body.clone()),
priority: Set(req.priority.clone()),
business_type: Set(req.business_type.clone()),
business_id: Set(req.business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"message.sent",
tenant_id,
serde_json::json!({
"message_id": id,
"recipient_id": req.recipient_id,
"title": req.title,
}),
));
Ok(Self::model_to_resp(&inserted))
}
/// 系统发送消息(由事件处理器调用)。
pub async fn send_system(
tenant_id: Uuid,
recipient_id: Uuid,
title: String,
body: String,
priority: &str,
business_type: Option<String>,
business_id: Option<Uuid>,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> MessageResult<MessageResp> {
let id = Uuid::now_v7();
let now = Utc::now();
let system_user = Uuid::nil();
let model = message::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
template_id: Set(None),
sender_id: Set(None),
sender_type: Set("system".to_string()),
recipient_id: Set(recipient_id),
recipient_type: Set("user".to_string()),
title: Set(title),
body: Set(body),
priority: Set(priority.to_string()),
business_type: Set(business_type),
business_id: Set(business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"message.sent",
tenant_id,
serde_json::json!({
"message_id": id,
"recipient_id": recipient_id,
}),
));
Ok(Self::model_to_resp(&inserted))
}
/// 标记消息已读。
pub async fn mark_read(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let model = message::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
if model.recipient_id != user_id {
return Err(MessageError::Validation(
"只能标记自己的消息为已读".to_string(),
));
}
if model.is_read {
return Ok(());
}
let mut active: message::ActiveModel = model.into();
active.is_read = Set(true);
active.read_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(())
}
/// 标记所有消息已读。
pub async fn mark_all_read(
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let unread = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(user_id))
.filter(message::Column::IsRead.eq(false))
.filter(message::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let now = Utc::now();
for m in unread {
let mut active: message::ActiveModel = m.into();
active.is_read = Set(true);
active.read_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
}
Ok(())
}
/// 删除消息(软删除)。
pub async fn delete(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let model = message::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
if model.recipient_id != user_id {
return Err(MessageError::Validation(
"只能删除自己的消息".to_string(),
));
}
let mut active: message::ActiveModel = model.into();
active.deleted_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(())
}
fn model_to_resp(m: &message::Model) -> MessageResp {
MessageResp {
id: m.id,
tenant_id: m.tenant_id,
template_id: m.template_id,
sender_id: m.sender_id,
sender_type: m.sender_type.clone(),
recipient_id: m.recipient_id,
recipient_type: m.recipient_type.clone(),
title: m.title.clone(),
body: m.body.clone(),
priority: m.priority.clone(),
business_type: m.business_type.clone(),
business_id: m.business_id,
is_read: m.is_read,
read_at: m.read_at,
is_archived: m.is_archived,
status: m.status.clone(),
sent_at: m.sent_at,
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}

View File

@@ -0,0 +1,3 @@
pub mod message_service;
pub mod subscription_service;
pub mod template_service;

View File

@@ -0,0 +1,116 @@
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use uuid::Uuid;
use crate::dto::{MessageSubscriptionResp, UpdateSubscriptionReq};
use crate::entity::message_subscription;
use crate::error::{MessageError, MessageResult};
/// 消息订阅偏好服务。
pub struct SubscriptionService;
impl SubscriptionService {
/// 获取用户订阅偏好。
pub async fn get(
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<MessageSubscriptionResp> {
let model = message_subscription::Entity::find()
.filter(message_subscription::Column::TenantId.eq(tenant_id))
.filter(message_subscription::Column::UserId.eq(user_id))
.filter(message_subscription::Column::DeletedAt.is_null())
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?
.ok_or_else(|| MessageError::NotFound("订阅偏好不存在".to_string()))?;
Ok(Self::model_to_resp(&model))
}
/// 创建或更新用户订阅偏好upsert
pub async fn upsert(
tenant_id: Uuid,
user_id: Uuid,
req: &UpdateSubscriptionReq,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<MessageSubscriptionResp> {
let existing = message_subscription::Entity::find()
.filter(message_subscription::Column::TenantId.eq(tenant_id))
.filter(message_subscription::Column::UserId.eq(user_id))
.filter(message_subscription::Column::DeletedAt.is_null())
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let now = Utc::now();
if let Some(model) = existing {
let mut active: message_subscription::ActiveModel = model.into();
if let Some(types) = &req.notification_types {
active.notification_types = Set(Some(types.clone()));
}
if let Some(prefs) = &req.channel_preferences {
active.channel_preferences = Set(Some(prefs.clone()));
}
if let Some(dnd) = req.dnd_enabled {
active.dnd_enabled = Set(dnd);
}
if let Some(ref start) = req.dnd_start {
active.dnd_start = Set(Some(start.clone()));
}
if let Some(ref end) = req.dnd_end {
active.dnd_end = Set(Some(end.clone()));
}
active.updated_at = Set(now);
active.updated_by = Set(user_id);
let updated = active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(Self::model_to_resp(&updated))
} else {
let id = Uuid::now_v7();
let model = message_subscription::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
user_id: Set(user_id),
notification_types: Set(req.notification_types.clone()),
channel_preferences: Set(req.channel_preferences.clone()),
dnd_enabled: Set(req.dnd_enabled.unwrap_or(false)),
dnd_start: Set(req.dnd_start.clone()),
dnd_end: Set(req.dnd_end.clone()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(Self::model_to_resp(&inserted))
}
}
fn model_to_resp(m: &message_subscription::Model) -> MessageSubscriptionResp {
MessageSubscriptionResp {
id: m.id,
tenant_id: m.tenant_id,
user_id: m.user_id,
notification_types: m.notification_types.clone(),
channel_preferences: m.channel_preferences.clone(),
dnd_enabled: m.dnd_enabled,
dnd_start: m.dnd_start.clone(),
dnd_end: m.dnd_end.clone(),
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}

View File

@@ -0,0 +1,116 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
};
use uuid::Uuid;
use crate::dto::{CreateTemplateReq, MessageTemplateResp};
use crate::entity::message_template;
use crate::error::{MessageError, MessageResult};
/// 消息模板服务。
pub struct TemplateService;
impl TemplateService {
/// 查询模板列表。
pub async fn list(
tenant_id: Uuid,
page: u64,
page_size: u64,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<(Vec<MessageTemplateResp>, u64)> {
let paginator = message_template::Entity::find()
.filter(message_template::Column::TenantId.eq(tenant_id))
.filter(message_template::Column::DeletedAt.is_null())
.paginate(db, page_size);
let total = paginator
.num_items()
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let page_index = page.saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let resps = models.iter().map(Self::model_to_resp).collect();
Ok((resps, total))
}
/// 创建消息模板。
pub async fn create(
tenant_id: Uuid,
operator_id: Uuid,
req: &CreateTemplateReq,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<MessageTemplateResp> {
// 检查编码唯一性
let existing = message_template::Entity::find()
.filter(message_template::Column::TenantId.eq(tenant_id))
.filter(message_template::Column::Code.eq(&req.code))
.filter(message_template::Column::DeletedAt.is_null())
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
if existing.is_some() {
return Err(MessageError::DuplicateTemplateCode(format!(
"模板编码已存在: {}",
req.code
)));
}
let id = Uuid::now_v7();
let now = Utc::now();
let model = message_template::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
name: Set(req.name.clone()),
code: Set(req.code.clone()),
channel: Set(req.channel.clone()),
title_template: Set(req.title_template.clone()),
body_template: Set(req.body_template.clone()),
language: Set(req.language.clone()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(Self::model_to_resp(&inserted))
}
/// 使用模板渲染消息内容。
/// 支持 {{variable}} 格式的变量插值。
pub fn render(template: &str, variables: &std::collections::HashMap<String, String>) -> String {
let mut result = template.to_string();
for (key, value) in variables {
result = result.replace(&format!("{{{{{}}}}}", key), value);
}
result
}
fn model_to_resp(m: &message_template::Model) -> MessageTemplateResp {
MessageTemplateResp {
id: m.id,
tenant_id: m.tenant_id,
name: m.name.clone(),
code: m.code.clone(),
channel: m.channel.clone(),
title_template: m.title_template.clone(),
body_template: m.body_template.clone(),
language: m.language.clone(),
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}