- ErpModule trait hooks now accept db and event_bus parameters
- AuthModule.on_tenant_created: seeds default roles, permissions, and admin user for new tenants using existing seed_tenant_auth()
- AuthModule.on_tenant_deleted: soft-deletes all users for the tenant
- Updated all other modules (config, workflow, message) to match the new trait signature
202 lines
6.7 KiB
Rust
202 lines
6.7 KiB
Rust
use axum::Router;
|
||
use axum::routing::{delete, get, put};
|
||
use std::sync::Arc;
|
||
use tokio::sync::Semaphore;
|
||
use uuid::Uuid;
|
||
|
||
use erp_core::error::AppResult;
|
||
use erp_core::events::EventBus;
|
||
use erp_core::module::ErpModule;
|
||
|
||
use crate::handler::{message_handler, subscription_handler, template_handler};
|
||
|
||
/// Message-center module; implements the `ErpModule` trait.
///
/// The type is a stateless unit struct, so it is cheap to copy and safe to
/// print in diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct MessageModule;
|
||
|
||
impl MessageModule {
|
||
pub fn new() -> Self {
|
||
Self
|
||
}
|
||
|
||
/// 构建需要认证的路由。
|
||
pub fn protected_routes<S>() -> Router<S>
|
||
where
|
||
crate::message_state::MessageState: axum::extract::FromRef<S>,
|
||
S: Clone + Send + Sync + 'static,
|
||
{
|
||
Router::new()
|
||
// 消息路由
|
||
.route(
|
||
"/messages",
|
||
get(message_handler::list_messages).post(message_handler::send_message),
|
||
)
|
||
.route("/messages/unread-count", get(message_handler::unread_count))
|
||
.route("/messages/{id}/read", put(message_handler::mark_read))
|
||
.route("/messages/read-all", put(message_handler::mark_all_read))
|
||
.route("/messages/{id}", delete(message_handler::delete_message))
|
||
// 模板路由
|
||
.route(
|
||
"/message-templates",
|
||
get(template_handler::list_templates).post(template_handler::create_template),
|
||
)
|
||
// 订阅偏好路由
|
||
.route(
|
||
"/message-subscriptions",
|
||
put(subscription_handler::update_subscription),
|
||
)
|
||
}
|
||
|
||
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
|
||
///
|
||
/// 使用 Semaphore 限制最大并发数为 8,防止事件突发时过度消耗资源。
|
||
/// 在 main.rs 中调用,因为需要 db 连接。
|
||
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
|
||
let mut rx = event_bus.subscribe();
|
||
let semaphore = Arc::new(Semaphore::new(8));
|
||
|
||
tokio::spawn(async move {
|
||
loop {
|
||
match rx.recv().await {
|
||
Ok(event) => {
|
||
let db = db.clone();
|
||
let event_bus = event_bus.clone();
|
||
let permit = semaphore.clone();
|
||
|
||
// 先获取许可,再 spawn 任务
|
||
tokio::spawn(async move {
|
||
let _permit = permit.acquire().await.unwrap();
|
||
if let Err(e) = handle_workflow_event(&event, &db, &event_bus).await {
|
||
tracing::warn!(
|
||
event_type = %event.event_type,
|
||
error = %e,
|
||
"Failed to handle workflow event for messages"
|
||
);
|
||
}
|
||
});
|
||
}
|
||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||
tracing::warn!(skipped = n, "Event listener lagged, skipping events");
|
||
}
|
||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||
tracing::info!("Event bus closed, stopping message event listener");
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
});
|
||
}
|
||
}
|
||
|
||
impl Default for MessageModule {
|
||
fn default() -> Self {
|
||
Self::new()
|
||
}
|
||
}
|
||
|
||
#[async_trait::async_trait]
|
||
impl ErpModule for MessageModule {
|
||
fn name(&self) -> &str {
|
||
"message"
|
||
}
|
||
|
||
fn version(&self) -> &str {
|
||
env!("CARGO_PKG_VERSION")
|
||
}
|
||
|
||
fn dependencies(&self) -> Vec<&str> {
|
||
vec!["auth"]
|
||
}
|
||
|
||
fn register_event_handlers(&self, _bus: &EventBus) {}
|
||
|
||
async fn on_tenant_created(
|
||
&self,
|
||
_tenant_id: Uuid,
|
||
_db: &sea_orm::DatabaseConnection,
|
||
_event_bus: &EventBus,
|
||
) -> AppResult<()> {
|
||
Ok(())
|
||
}
|
||
|
||
async fn on_tenant_deleted(
|
||
&self,
|
||
_tenant_id: Uuid,
|
||
_db: &sea_orm::DatabaseConnection,
|
||
) -> AppResult<()> {
|
||
Ok(())
|
||
}
|
||
|
||
fn as_any(&self) -> &dyn std::any::Any {
|
||
self
|
||
}
|
||
}
|
||
|
||
/// 处理工作流事件,生成对应的消息通知。
|
||
async fn handle_workflow_event(
|
||
event: &erp_core::events::DomainEvent,
|
||
db: &sea_orm::DatabaseConnection,
|
||
event_bus: &EventBus,
|
||
) -> Result<(), String> {
|
||
match event.event_type.as_str() {
|
||
"process_instance.started" => {
|
||
let instance_id = event
|
||
.payload
|
||
.get("instance_id")
|
||
.and_then(|v| v.as_str())
|
||
.unwrap_or("unknown");
|
||
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
|
||
|
||
if let Some(starter) = starter_id {
|
||
let recipient = match uuid::Uuid::parse_str(starter) {
|
||
Ok(id) => id,
|
||
Err(_) => return Ok(()),
|
||
};
|
||
let _ = crate::service::message_service::MessageService::send_system(
|
||
event.tenant_id,
|
||
recipient,
|
||
"流程已启动".to_string(),
|
||
format!("您的流程实例 {} 已启动执行。", instance_id),
|
||
"normal",
|
||
Some("workflow_instance".to_string()),
|
||
uuid::Uuid::parse_str(instance_id).ok(),
|
||
db,
|
||
event_bus,
|
||
)
|
||
.await
|
||
.map_err(|e| e.to_string())?;
|
||
}
|
||
}
|
||
"task.completed" => {
|
||
// 任务完成时通知流程发起人
|
||
let task_id = event
|
||
.payload
|
||
.get("task_id")
|
||
.and_then(|v| v.as_str())
|
||
.unwrap_or("unknown");
|
||
let starter_id = event.payload.get("started_by").and_then(|v| v.as_str());
|
||
|
||
if let Some(starter) = starter_id {
|
||
let recipient = match uuid::Uuid::parse_str(starter) {
|
||
Ok(id) => id,
|
||
Err(_) => return Ok(()),
|
||
};
|
||
let _ = crate::service::message_service::MessageService::send_system(
|
||
event.tenant_id,
|
||
recipient,
|
||
"流程任务已完成".to_string(),
|
||
format!("流程任务 {} 已完成,请查看。", task_id),
|
||
"normal",
|
||
Some("workflow_task".to_string()),
|
||
uuid::Uuid::parse_str(task_id).ok(),
|
||
db,
|
||
event_bus,
|
||
)
|
||
.await
|
||
.map_err(|e| e.to_string())?;
|
||
}
|
||
}
|
||
_ => {}
|
||
}
|
||
Ok(())
|
||
}
|