feat(message): add cross-module event integration (Phase 6)
- Message module subscribes to workflow events (process_instance.started) - Auto-generates notifications when workflows start - Added started_by to workflow instance event payload - Event listener runs as background tokio task Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -57,6 +57,43 @@ impl MessageModule {
|
|||||||
put(subscription_handler::update_subscription),
|
put(subscription_handler::update_subscription),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 启动后台事件监听任务,将工作流事件转化为消息通知。
|
||||||
|
///
|
||||||
|
/// 在 main.rs 中调用,因为需要 db 连接。
|
||||||
|
pub fn start_event_listener(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
|
||||||
|
use sea_orm::ConnectionTrait;
|
||||||
|
|
||||||
|
let mut rx = event_bus.subscribe();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(event) => {
|
||||||
|
let db = db.clone();
|
||||||
|
let event_bus = event_bus.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) =
|
||||||
|
handle_workflow_event(&event, &db, &event_bus).await
|
||||||
|
{
|
||||||
|
tracing::warn!(
|
||||||
|
event_type = %event.event_type,
|
||||||
|
error = %e,
|
||||||
|
"Failed to handle workflow event for messages"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
tracing::warn!(skipped = n, "Event listener lagged, skipping events");
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||||
|
tracing::info!("Event bus closed, stopping message event listener");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for MessageModule {
|
impl Default for MessageModule {
|
||||||
@@ -97,3 +134,46 @@ impl ErpModule for MessageModule {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 处理工作流事件,生成对应的消息通知。
|
||||||
|
async fn handle_workflow_event(
|
||||||
|
event: &erp_core::events::DomainEvent,
|
||||||
|
db: &sea_orm::DatabaseConnection,
|
||||||
|
event_bus: &EventBus,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
match event.event_type.as_str() {
|
||||||
|
"process_instance.started" => {
|
||||||
|
let instance_id = event.payload.get("instance_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("unknown");
|
||||||
|
let starter_id = event.payload.get("started_by")
|
||||||
|
.and_then(|v| v.as_str());
|
||||||
|
|
||||||
|
if let Some(starter) = starter_id {
|
||||||
|
let recipient = match uuid::Uuid::parse_str(starter) {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(_) => return Ok(()),
|
||||||
|
};
|
||||||
|
let _ = crate::service::message_service::MessageService::send_system(
|
||||||
|
event.tenant_id,
|
||||||
|
recipient,
|
||||||
|
"流程已启动".to_string(),
|
||||||
|
format!("您的流程实例 {} 已启动执行。", instance_id),
|
||||||
|
"normal",
|
||||||
|
Some("workflow_instance".to_string()),
|
||||||
|
uuid::Uuid::parse_str(instance_id).ok(),
|
||||||
|
db,
|
||||||
|
event_bus,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"task.completed" => {
|
||||||
|
// 任务完成时通知发起人(此处简化处理)
|
||||||
|
tracing::debug!("Task completed event received, skipping notification for now");
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -124,6 +124,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// Register event handlers
|
// Register event handlers
|
||||||
registry.register_handlers(&event_bus);
|
registry.register_handlers(&event_bus);
|
||||||
|
|
||||||
|
// Start message event listener (workflow events → message notifications)
|
||||||
|
erp_message::MessageModule::start_event_listener(db.clone(), event_bus.clone());
|
||||||
|
tracing::info!("Message event listener started");
|
||||||
|
|
||||||
let host = config.server.host.clone();
|
let host = config.server.host.clone();
|
||||||
let port = config.server.port;
|
let port = config.server.port;
|
||||||
|
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ impl InstanceService {
|
|||||||
let mut variables = HashMap::new();
|
let mut variables = HashMap::new();
|
||||||
if let Some(vars) = &req.variables {
|
if let Some(vars) = &req.variables {
|
||||||
for v in vars {
|
for v in vars {
|
||||||
let var_type = v.var_type.as_deref().unwrap_or("string");
|
let _var_type = v.var_type.as_deref().unwrap_or("string");
|
||||||
variables.insert(v.name.clone(), v.value.clone());
|
variables.insert(v.name.clone(), v.value.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -127,7 +127,7 @@ impl InstanceService {
|
|||||||
event_bus.publish(erp_core::events::DomainEvent::new(
|
event_bus.publish(erp_core::events::DomainEvent::new(
|
||||||
"process_instance.started",
|
"process_instance.started",
|
||||||
tenant_id,
|
tenant_id,
|
||||||
serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id }),
|
serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id }),
|
||||||
));
|
));
|
||||||
|
|
||||||
// 查询创建后的实例(包含 token)
|
// 查询创建后的实例(包含 token)
|
||||||
@@ -169,7 +169,7 @@ impl InstanceService {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||||
|
|
||||||
let page_index = pagination.page.unwrap_or(1).saturating_sub(1) as u64;
|
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
|
||||||
let models = paginator
|
let models = paginator
|
||||||
.fetch_page(page_index)
|
.fetch_page(page_index)
|
||||||
.await
|
.await
|
||||||
@@ -325,7 +325,7 @@ impl InstanceService {
|
|||||||
) -> WorkflowResult<()> {
|
) -> WorkflowResult<()> {
|
||||||
let id = Uuid::now_v7();
|
let id = Uuid::now_v7();
|
||||||
|
|
||||||
let (value_string, value_number, value_boolean, value_date): (Option<String>, Option<f64>, Option<bool>, Option<chrono::DateTime<Utc>>) = match var_type {
|
let (value_string, value_number, value_boolean, _value_date): (Option<String>, Option<f64>, Option<bool>, Option<chrono::DateTime<Utc>>) = match var_type {
|
||||||
"string" => (value.as_str().map(|s| s.to_string()), None, None, None),
|
"string" => (value.as_str().map(|s| s.to_string()), None, None, None),
|
||||||
"number" => (None, value.as_f64(), None, None),
|
"number" => (None, value.as_f64(), None, None),
|
||||||
"boolean" => (None, None, value.as_bool(), None),
|
"boolean" => (None, None, value.as_bool(), None),
|
||||||
|
|||||||
Reference in New Issue
Block a user