Add VersionMismatch error variant and check_version() helper to erp-core.

All 15 mutable entities now enforce version checking on update/delete:
- erp-auth: user, role, organization, department, position
- erp-config: dictionary, dictionary_item, menu, setting, numbering_rule
- erp-workflow: process_definition, process_instance, task
- erp-message: message, message_subscription

Update DTOs to expose version in responses and require version in update requests.
HTTP 409 Conflict is returned on a version mismatch.
319 lines
10 KiB
Rust
319 lines
10 KiB
Rust
use chrono::Utc;
use erp_core::events::EventBus;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait,
    QueryFilter, QueryOrder, Set, Statement,
};
use uuid::Uuid;

use crate::dto::{MessageQuery, MessageResp, SendMessageReq, UnreadCountResp};
use crate::entity::message;
use crate::error::{MessageError, MessageResult};
|
||
|
||
/// Message service: a stateless namespace for the message operations
/// implemented as associated functions on this type.
|
||
pub struct MessageService;
|
||
|
||
impl MessageService {
|
||
/// 查询消息列表(分页)。
|
||
pub async fn list(
|
||
tenant_id: Uuid,
|
||
recipient_id: Uuid,
|
||
query: &MessageQuery,
|
||
db: &sea_orm::DatabaseConnection,
|
||
) -> MessageResult<(Vec<MessageResp>, u64)> {
|
||
let page_size = query.safe_page_size();
|
||
let mut q = message::Entity::find()
|
||
.filter(message::Column::TenantId.eq(tenant_id))
|
||
.filter(message::Column::RecipientId.eq(recipient_id))
|
||
.filter(message::Column::DeletedAt.is_null());
|
||
|
||
if let Some(is_read) = query.is_read {
|
||
q = q.filter(message::Column::IsRead.eq(is_read));
|
||
}
|
||
if let Some(ref priority) = query.priority {
|
||
q = q.filter(message::Column::Priority.eq(priority.as_str()));
|
||
}
|
||
if let Some(ref business_type) = query.business_type {
|
||
q = q.filter(message::Column::BusinessType.eq(business_type.as_str()));
|
||
}
|
||
if let Some(ref status) = query.status {
|
||
q = q.filter(message::Column::Status.eq(status.as_str()));
|
||
}
|
||
|
||
let paginator = q.paginate(db, page_size);
|
||
|
||
let total = paginator
|
||
.num_items()
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
let page_index = query.page.unwrap_or(1).saturating_sub(1);
|
||
let models = paginator
|
||
.fetch_page(page_index)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
let resps = models.iter().map(Self::model_to_resp).collect();
|
||
Ok((resps, total))
|
||
}
|
||
|
||
/// 获取未读消息数量。
|
||
pub async fn unread_count(
|
||
tenant_id: Uuid,
|
||
recipient_id: Uuid,
|
||
db: &sea_orm::DatabaseConnection,
|
||
) -> MessageResult<UnreadCountResp> {
|
||
let count = message::Entity::find()
|
||
.filter(message::Column::TenantId.eq(tenant_id))
|
||
.filter(message::Column::RecipientId.eq(recipient_id))
|
||
.filter(message::Column::IsRead.eq(false))
|
||
.filter(message::Column::DeletedAt.is_null())
|
||
.count(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
Ok(UnreadCountResp {
|
||
count: count as i64,
|
||
})
|
||
}
|
||
|
||
/// 发送消息。
|
||
pub async fn send(
|
||
tenant_id: Uuid,
|
||
sender_id: Uuid,
|
||
req: &SendMessageReq,
|
||
db: &sea_orm::DatabaseConnection,
|
||
event_bus: &EventBus,
|
||
) -> MessageResult<MessageResp> {
|
||
let id = Uuid::now_v7();
|
||
let now = Utc::now();
|
||
|
||
let model = message::ActiveModel {
|
||
id: Set(id),
|
||
tenant_id: Set(tenant_id),
|
||
template_id: Set(req.template_id),
|
||
sender_id: Set(Some(sender_id)),
|
||
sender_type: Set("user".to_string()),
|
||
recipient_id: Set(req.recipient_id),
|
||
recipient_type: Set(req.recipient_type.clone()),
|
||
title: Set(req.title.clone()),
|
||
body: Set(req.body.clone()),
|
||
priority: Set(req.priority.clone()),
|
||
business_type: Set(req.business_type.clone()),
|
||
business_id: Set(req.business_id),
|
||
is_read: Set(false),
|
||
read_at: Set(None),
|
||
is_archived: Set(false),
|
||
archived_at: Set(None),
|
||
sent_at: Set(Some(now)),
|
||
status: Set("sent".to_string()),
|
||
created_at: Set(now),
|
||
updated_at: Set(now),
|
||
created_by: Set(sender_id),
|
||
updated_by: Set(sender_id),
|
||
deleted_at: Set(None),
|
||
version: Set(1),
|
||
};
|
||
|
||
let inserted = model
|
||
.insert(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
event_bus.publish(erp_core::events::DomainEvent::new(
|
||
"message.sent",
|
||
tenant_id,
|
||
serde_json::json!({
|
||
"message_id": id,
|
||
"recipient_id": req.recipient_id,
|
||
"title": req.title,
|
||
}),
|
||
));
|
||
|
||
Ok(Self::model_to_resp(&inserted))
|
||
}
|
||
|
||
/// 系统发送消息(由事件处理器调用)。
|
||
#[allow(clippy::too_many_arguments)]
|
||
pub async fn send_system(
|
||
tenant_id: Uuid,
|
||
recipient_id: Uuid,
|
||
title: String,
|
||
body: String,
|
||
priority: &str,
|
||
business_type: Option<String>,
|
||
business_id: Option<Uuid>,
|
||
db: &sea_orm::DatabaseConnection,
|
||
event_bus: &EventBus,
|
||
) -> MessageResult<MessageResp> {
|
||
let id = Uuid::now_v7();
|
||
let now = Utc::now();
|
||
let system_user = Uuid::nil();
|
||
|
||
let model = message::ActiveModel {
|
||
id: Set(id),
|
||
tenant_id: Set(tenant_id),
|
||
template_id: Set(None),
|
||
sender_id: Set(None),
|
||
sender_type: Set("system".to_string()),
|
||
recipient_id: Set(recipient_id),
|
||
recipient_type: Set("user".to_string()),
|
||
title: Set(title),
|
||
body: Set(body),
|
||
priority: Set(priority.to_string()),
|
||
business_type: Set(business_type),
|
||
business_id: Set(business_id),
|
||
is_read: Set(false),
|
||
read_at: Set(None),
|
||
is_archived: Set(false),
|
||
archived_at: Set(None),
|
||
sent_at: Set(Some(now)),
|
||
status: Set("sent".to_string()),
|
||
created_at: Set(now),
|
||
updated_at: Set(now),
|
||
created_by: Set(system_user),
|
||
updated_by: Set(system_user),
|
||
deleted_at: Set(None),
|
||
version: Set(1),
|
||
};
|
||
|
||
let inserted = model
|
||
.insert(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
event_bus.publish(erp_core::events::DomainEvent::new(
|
||
"message.sent",
|
||
tenant_id,
|
||
serde_json::json!({
|
||
"message_id": id,
|
||
"recipient_id": recipient_id,
|
||
}),
|
||
));
|
||
|
||
Ok(Self::model_to_resp(&inserted))
|
||
}
|
||
|
||
/// 标记消息已读。
|
||
pub async fn mark_read(
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
db: &sea_orm::DatabaseConnection,
|
||
) -> MessageResult<()> {
|
||
let model = message::Entity::find_by_id(id)
|
||
.one(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?
|
||
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
|
||
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
|
||
|
||
if model.recipient_id != user_id {
|
||
return Err(MessageError::Validation(
|
||
"只能标记自己的消息为已读".to_string(),
|
||
));
|
||
}
|
||
|
||
if model.is_read {
|
||
return Ok(());
|
||
}
|
||
|
||
let current_version = model.version;
|
||
let mut active: message::ActiveModel = model.into();
|
||
active.is_read = Set(true);
|
||
active.read_at = Set(Some(Utc::now()));
|
||
active.version = Set(current_version + 1);
|
||
active.updated_at = Set(Utc::now());
|
||
active.updated_by = Set(user_id);
|
||
active
|
||
.update(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 标记所有消息已读(批量 UPDATE,避免 N+1)。
|
||
pub async fn mark_all_read(
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
db: &sea_orm::DatabaseConnection,
|
||
) -> MessageResult<()> {
|
||
let now = Utc::now();
|
||
db.execute(Statement::from_sql_and_values(
|
||
DatabaseBackend::Postgres,
|
||
"UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL",
|
||
[
|
||
now.into(),
|
||
now.into(),
|
||
user_id.into(),
|
||
tenant_id.into(),
|
||
user_id.into(),
|
||
],
|
||
))
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 删除消息(软删除)。
|
||
pub async fn delete(
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
db: &sea_orm::DatabaseConnection,
|
||
) -> MessageResult<()> {
|
||
let model = message::Entity::find_by_id(id)
|
||
.one(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?
|
||
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
|
||
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
|
||
|
||
if model.recipient_id != user_id {
|
||
return Err(MessageError::Validation(
|
||
"只能删除自己的消息".to_string(),
|
||
));
|
||
}
|
||
|
||
let current_version = model.version;
|
||
let mut active: message::ActiveModel = model.into();
|
||
active.version = Set(current_version + 1);
|
||
active.deleted_at = Set(Some(Utc::now()));
|
||
active.updated_at = Set(Utc::now());
|
||
active.updated_by = Set(user_id);
|
||
active
|
||
.update(db)
|
||
.await
|
||
.map_err(|e| MessageError::Validation(e.to_string()))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn model_to_resp(m: &message::Model) -> MessageResp {
|
||
MessageResp {
|
||
id: m.id,
|
||
tenant_id: m.tenant_id,
|
||
template_id: m.template_id,
|
||
sender_id: m.sender_id,
|
||
sender_type: m.sender_type.clone(),
|
||
recipient_id: m.recipient_id,
|
||
recipient_type: m.recipient_type.clone(),
|
||
title: m.title.clone(),
|
||
body: m.body.clone(),
|
||
priority: m.priority.clone(),
|
||
business_type: m.business_type.clone(),
|
||
business_id: m.business_id,
|
||
is_read: m.is_read,
|
||
read_at: m.read_at,
|
||
is_archived: m.is_archived,
|
||
status: m.status.clone(),
|
||
sent_at: m.sent_at,
|
||
created_at: m.created_at,
|
||
updated_at: m.updated_at,
|
||
version: m.version,
|
||
}
|
||
}
|
||
}
|