use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait, QueryFilter, Set, Statement, }; use uuid::Uuid; use crate::dto::{MessageQuery, MessageResp, SendMessageReq, UnreadCountResp}; use crate::entity::message; use crate::error::{MessageError, MessageResult}; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::events::EventBus; /// 消息服务。 pub struct MessageService; impl MessageService { /// 查询消息列表(分页)。 pub async fn list( tenant_id: Uuid, recipient_id: Uuid, query: &MessageQuery, db: &sea_orm::DatabaseConnection, ) -> MessageResult<(Vec, u64)> { let page_size = query.safe_page_size(); let mut q = message::Entity::find() .filter(message::Column::TenantId.eq(tenant_id)) .filter(message::Column::RecipientId.eq(recipient_id)) .filter(message::Column::DeletedAt.is_null()); if let Some(is_read) = query.is_read { q = q.filter(message::Column::IsRead.eq(is_read)); } if let Some(ref priority) = query.priority { q = q.filter(message::Column::Priority.eq(priority.as_str())); } if let Some(ref business_type) = query.business_type { q = q.filter(message::Column::BusinessType.eq(business_type.as_str())); } if let Some(ref status) = query.status { q = q.filter(message::Column::Status.eq(status.as_str())); } let paginator = q.paginate(db, page_size); let total = paginator .num_items() .await .map_err(|e| MessageError::Validation(e.to_string()))?; let page_index = query.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await .map_err(|e| MessageError::Validation(e.to_string()))?; let resps = models.iter().map(Self::model_to_resp).collect(); Ok((resps, total)) } /// 获取未读消息数量。 pub async fn unread_count( tenant_id: Uuid, recipient_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> MessageResult { let count = message::Entity::find() .filter(message::Column::TenantId.eq(tenant_id)) .filter(message::Column::RecipientId.eq(recipient_id)) .filter(message::Column::IsRead.eq(false)) .filter(message::Column::DeletedAt.is_null()) .count(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; Ok(UnreadCountResp { count: count as i64, }) } /// 发送消息。 pub async fn send( tenant_id: Uuid, sender_id: Uuid, req: &SendMessageReq, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> MessageResult { let id = Uuid::now_v7(); let now = Utc::now(); let model = message::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), template_id: Set(req.template_id), sender_id: Set(Some(sender_id)), sender_type: Set("user".to_string()), recipient_id: Set(req.recipient_id), recipient_type: Set(req.recipient_type.clone()), title: Set(req.title.clone()), body: Set(req.body.clone()), priority: Set(req.priority.clone()), business_type: Set(req.business_type.clone()), business_id: Set(req.business_id), is_read: Set(false), read_at: Set(None), is_archived: Set(false), archived_at: Set(None), sent_at: Set(Some(now)), status: Set("sent".to_string()), created_at: Set(now), updated_at: Set(now), created_by: Set(sender_id), updated_by: Set(sender_id), deleted_at: Set(None), version: Set(1), }; let inserted = model .insert(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "message.sent", tenant_id, serde_json::json!({ "message_id": id, "recipient_id": req.recipient_id, "title": req.title, }), ), db, ) .await; audit_service::record( AuditLog::new(tenant_id, Some(sender_id), "message.send", "message") .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&inserted)) } /// 系统发送消息(由事件处理器调用)。 /// /// 幂等保证:当 `business_id` 存在时,若同 tenant + recipient + business_id 的消息已存在, /// 直接返回已有消息,避免 outbox relay 重放导致重复通知。 #[allow(clippy::too_many_arguments)] pub async fn send_system( tenant_id: Uuid, recipient_id: Uuid, title: String, body: String, priority: &str, business_type: Option, business_id: Option, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> MessageResult { // 幂等检查:防止 outbox relay 重放导致重复消息 if let Some(bid) = business_id { let existing = message::Entity::find() .filter(message::Column::TenantId.eq(tenant_id)) .filter(message::Column::RecipientId.eq(recipient_id)) .filter(message::Column::BusinessId.eq(bid)) .filter(message::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; if let Some(m) = existing { tracing::debug!( message_id = %m.id, business_id = %bid, "消息已存在,跳过重复创建(幂等保护)" ); return Ok(Self::model_to_resp(&m)); } } let id = Uuid::now_v7(); let now = Utc::now(); let system_user = Uuid::nil(); let model = message::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), template_id: Set(None), sender_id: Set(None), sender_type: Set("system".to_string()), recipient_id: Set(recipient_id), recipient_type: Set("user".to_string()), title: Set(title), body: Set(body), priority: Set(priority.to_string()), business_type: Set(business_type), business_id: Set(business_id), is_read: Set(false), read_at: Set(None), is_archived: Set(false), archived_at: Set(None), sent_at: Set(Some(now)), status: Set("sent".to_string()), created_at: Set(now), updated_at: Set(now), created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), version: Set(1), }; let inserted = model .insert(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "message.sent", tenant_id, serde_json::json!({ "message_id": id, "recipient_id": recipient_id, }), ), db, ) .await; audit_service::record( AuditLog::new( tenant_id, Some(system_user), "message.send_system", "message", ) .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&inserted)) } /// 标记消息已读。 pub async fn mark_read( id: Uuid, tenant_id: Uuid, user_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> MessageResult<()> { let model = message::Entity::find_by_id(id) .one(db) .await .map_err(|e| MessageError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?; if model.recipient_id != user_id { return Err(MessageError::Validation( "只能标记自己的消息为已读".to_string(), )); } if model.is_read { return Ok(()); } let current_version = model.version; let mut active: message::ActiveModel = model.into(); active.is_read = Set(true); active.read_at = Set(Some(Utc::now())); active.version = Set(current_version + 1); active.updated_at = Set(Utc::now()); active.updated_by = Set(user_id); active .update(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; audit_service::record( AuditLog::new(tenant_id, Some(user_id), "message.mark_read", "message") .with_resource_id(id), db, ) .await; Ok(()) } /// 标记所有消息已读(批量 UPDATE,避免 N+1)。 pub async fn mark_all_read( tenant_id: Uuid, user_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> MessageResult<()> { let now = Utc::now(); db.execute(Statement::from_sql_and_values( DatabaseBackend::Postgres, "UPDATE messages SET is_read = true, read_at = $1, updated_at = $2, updated_by = $3 WHERE tenant_id = $4 AND recipient_id = $5 AND is_read = false AND deleted_at IS NULL", [ now.into(), now.into(), user_id.into(), tenant_id.into(), user_id.into(), ], )) .await .map_err(|e| MessageError::Validation(e.to_string()))?; audit_service::record( AuditLog::new(tenant_id, Some(user_id), "message.mark_all_read", "message"), db, ) .await; Ok(()) } /// 删除消息(软删除)。 pub async fn delete( id: Uuid, tenant_id: Uuid, user_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> MessageResult<()> { let model = message::Entity::find_by_id(id) .one(db) .await .map_err(|e| MessageError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| MessageError::NotFound(format!("消息不存在: {id}")))?; if model.recipient_id != user_id { return Err(MessageError::Validation("只能删除自己的消息".to_string())); } let current_version = model.version; let mut active: message::ActiveModel = model.into(); active.version = Set(current_version + 1); active.deleted_at = Set(Some(Utc::now())); active.updated_at = Set(Utc::now()); active.updated_by = Set(user_id); active .update(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; audit_service::record( AuditLog::new(tenant_id, Some(user_id), "message.delete", "message") .with_resource_id(id), db, ) .await; Ok(()) } fn model_to_resp(m: &message::Model) -> MessageResp { MessageResp { id: m.id, tenant_id: m.tenant_id, template_id: m.template_id, sender_id: m.sender_id, sender_type: m.sender_type.clone(), recipient_id: m.recipient_id, recipient_type: m.recipient_type.clone(), title: m.title.clone(), body: m.body.clone(), priority: m.priority.clone(), business_type: m.business_type.clone(), business_id: m.business_id, is_read: m.is_read, read_at: m.read_at, is_archived: m.is_archived, status: m.status.clone(), sent_at: m.sent_at, created_at: m.created_at, updated_at: m.updated_at, version: m.version, } } }