Files
hms/crates/erp-message/src/service/message_service.rs
iven db2cd24259 feat(core): add audit logging to all mutation operations
Create audit_log SeaORM entity and audit_service::record() helper.
Integrate audit recording into 35 mutation endpoints across all modules:
- erp-auth: user/role/organization/department/position CRUD (15 actions)
- erp-config: dictionary/menu/setting/numbering_rule CRUD (15 actions)
- erp-workflow: definition/instance/task operations (8 actions)
- erp-message: send/system/mark_read/delete (5 actions)

Uses fire-and-forget pattern — audit failures logged but non-blocking.
2026-04-11 23:48:45 +08:00

355 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
Statement, ConnectionTrait, DatabaseBackend,
};
use uuid::Uuid;
use crate::dto::{MessageQuery, MessageResp, SendMessageReq, UnreadCountResp};
use crate::entity::message;
use crate::error::{MessageError, MessageResult};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::EventBus;
/// 消息服务。
pub struct MessageService;
impl MessageService {
/// 查询消息列表(分页)。
pub async fn list(
tenant_id: Uuid,
recipient_id: Uuid,
query: &MessageQuery,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<(Vec<MessageResp>, u64)> {
let page_size = query.safe_page_size();
let mut q = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(recipient_id))
.filter(message::Column::DeletedAt.is_null());
if let Some(is_read) = query.is_read {
q = q.filter(message::Column::IsRead.eq(is_read));
}
if let Some(ref priority) = query.priority {
q = q.filter(message::Column::Priority.eq(priority.as_str()));
}
if let Some(ref business_type) = query.business_type {
q = q.filter(message::Column::BusinessType.eq(business_type.as_str()));
}
if let Some(ref status) = query.status {
q = q.filter(message::Column::Status.eq(status.as_str()));
}
let paginator = q.paginate(db, page_size);
let total = paginator
.num_items()
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let page_index = query.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
let resps = models.iter().map(Self::model_to_resp).collect();
Ok((resps, total))
}
/// 获取未读消息数量。
pub async fn unread_count(
tenant_id: Uuid,
recipient_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<UnreadCountResp> {
let count = message::Entity::find()
.filter(message::Column::TenantId.eq(tenant_id))
.filter(message::Column::RecipientId.eq(recipient_id))
.filter(message::Column::IsRead.eq(false))
.filter(message::Column::DeletedAt.is_null())
.count(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(UnreadCountResp {
count: count as i64,
})
}
/// 发送消息。
pub async fn send(
tenant_id: Uuid,
sender_id: Uuid,
req: &SendMessageReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> MessageResult<MessageResp> {
let id = Uuid::now_v7();
let now = Utc::now();
let model = message::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
template_id: Set(req.template_id),
sender_id: Set(Some(sender_id)),
sender_type: Set("user".to_string()),
recipient_id: Set(req.recipient_id),
recipient_type: Set(req.recipient_type.clone()),
title: Set(req.title.clone()),
body: Set(req.body.clone()),
priority: Set(req.priority.clone()),
business_type: Set(req.business_type.clone()),
business_id: Set(req.business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"message.sent",
tenant_id,
serde_json::json!({
"message_id": id,
"recipient_id": req.recipient_id,
"title": req.title,
}),
));
audit_service::record(
AuditLog::new(tenant_id, Some(sender_id), "message.send", "message")
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&inserted))
}
/// 系统发送消息(由事件处理器调用)。
#[allow(clippy::too_many_arguments)]
pub async fn send_system(
tenant_id: Uuid,
recipient_id: Uuid,
title: String,
body: String,
priority: &str,
business_type: Option<String>,
business_id: Option<Uuid>,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> MessageResult<MessageResp> {
let id = Uuid::now_v7();
let now = Utc::now();
let system_user = Uuid::nil();
let model = message::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
template_id: Set(None),
sender_id: Set(None),
sender_type: Set("system".to_string()),
recipient_id: Set(recipient_id),
recipient_type: Set("user".to_string()),
title: Set(title),
body: Set(body),
priority: Set(priority.to_string()),
business_type: Set(business_type),
business_id: Set(business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
};
let inserted = model
.insert(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"message.sent",
tenant_id,
serde_json::json!({
"message_id": id,
"recipient_id": recipient_id,
}),
));
audit_service::record(
AuditLog::new(tenant_id, Some(system_user), "message.send_system", "message")
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&inserted))
}
/// 标记消息已读。
pub async fn mark_read(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let model = message::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
if model.recipient_id != user_id {
return Err(MessageError::Validation(
"只能标记自己的消息为已读".to_string(),
));
}
if model.is_read {
return Ok(());
}
let current_version = model.version;
let mut active: message::ActiveModel = model.into();
active.is_read = Set(true);
active.read_at = Set(Some(Utc::now()));
active.version = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(user_id), "message.mark_read", "message")
.with_resource_id(id),
db,
)
.await;
Ok(())
}
/// 标记所有消息已读(批量 UPDATE避免 N+1
pub async fn mark_all_read(
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let now = Utc::now();
db.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL",
[
now.into(),
now.into(),
user_id.into(),
tenant_id.into(),
user_id.into(),
],
))
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(user_id), "message.mark_all_read", "message"),
db,
)
.await;
Ok(())
}
/// 删除消息(软删除)。
pub async fn delete(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> MessageResult<()> {
let model = message::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?;
if model.recipient_id != user_id {
return Err(MessageError::Validation(
"只能删除自己的消息".to_string(),
));
}
let current_version = model.version;
let mut active: message::ActiveModel = model.into();
active.version = Set(current_version + 1);
active.deleted_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
active
.update(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(user_id), "message.delete", "message")
.with_resource_id(id),
db,
)
.await;
Ok(())
}
fn model_to_resp(m: &message::Model) -> MessageResp {
MessageResp {
id: m.id,
tenant_id: m.tenant_id,
template_id: m.template_id,
sender_id: m.sender_id,
sender_type: m.sender_type.clone(),
recipient_id: m.recipient_id,
recipient_type: m.recipient_type.clone(),
title: m.title.clone(),
body: m.body.clone(),
priority: m.priority.clone(),
business_type: m.business_type.clone(),
business_id: m.business_id,
is_read: m.is_read,
read_at: m.read_at,
is_archived: m.is_archived,
status: m.status.clone(),
sent_at: m.sent_at,
created_at: m.created_at,
updated_at: m.updated_at,
version: m.version,
}
}
}