use std::collections::HashMap; use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set, }; use uuid::Uuid; use crate::dto::NodeType; use crate::engine::expression::ExpressionEvaluator; use crate::engine::model::FlowGraph; use crate::entity::{process_instance, task, token}; use crate::error::{WorkflowError, WorkflowResult}; /// Token 驱动的流程执行引擎。 /// /// 核心职责: /// - 在流程启动时,于 StartEvent 创建第一个 token /// - 在任务完成时推进 token 到下一个节点 /// - 处理网关分支/汇合逻辑 /// - 在 EndEvent 完成实例 pub struct FlowExecutor; impl FlowExecutor { /// 启动流程:在 StartEvent 的后继节点创建 token。 /// /// 返回创建的 token ID 列表。 pub async fn start( instance_id: Uuid, tenant_id: Uuid, graph: &FlowGraph, variables: &HashMap, txn: &impl ConnectionTrait, ) -> WorkflowResult> { let start_id = graph .start_node_id .as_ref() .ok_or_else(|| WorkflowError::InvalidDiagram("流程图没有开始事件".to_string()))?; // 获取 StartEvent 的出边,推进到后继节点 let outgoing = graph.get_outgoing_edges(start_id); if outgoing.is_empty() { return Err(WorkflowError::InvalidDiagram( "开始事件没有出边".to_string(), )); } // StartEvent 只有一条出边 let first_edge = &outgoing[0]; let target_node_id = &first_edge.target; Self::create_token_at_node( instance_id, tenant_id, target_node_id, graph, variables, txn, ) .await } /// 推进 token:消费当前 token,在下一节点创建新 token。 /// /// 返回新创建的 token ID 列表。 pub async fn advance( token_id: Uuid, instance_id: Uuid, tenant_id: Uuid, graph: &FlowGraph, variables: &HashMap, txn: &impl ConnectionTrait, ) -> WorkflowResult> { // 读取当前 token let current_token = token::Entity::find_by_id(token_id) .one(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .ok_or_else(|| WorkflowError::NotFound(format!("Token 不存在: {token_id}")))?; if current_token.status != "active" { return Err(WorkflowError::InvalidState(format!( "Token 状态不是 active: {}", current_token.status ))); } let node_id = current_token.node_id.clone(); // 消费当前 token let mut active: token::ActiveModel = current_token.into(); active.version = Set(active.version.take().unwrap_or(0) + 1); active.status = Set("consumed".to_string()); active.consumed_at = Set(Some(Utc::now())); active .update(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; // 获取当前节点的出边 let outgoing = graph.get_outgoing_edges(&node_id); let current_node = graph .nodes .get(&node_id) .ok_or_else(|| WorkflowError::InvalidDiagram(format!("节点不存在: {node_id}")))?; match current_node.node_type { NodeType::ExclusiveGateway => { // 排他网关:求值条件,选择一条分支 Self::advance_exclusive_gateway( instance_id, tenant_id, &outgoing, graph, variables, txn, ) .await } NodeType::ParallelGateway => { // 并行网关:为每条出边创建 token Self::advance_parallel_gateway( instance_id, tenant_id, &outgoing, graph, variables, txn, ) .await } _ => { // 普通节点:沿出边前进 if outgoing.is_empty() { // 没有出边(理论上只有 EndEvent 会到这里) Ok(vec![]) } else { let mut new_tokens = Vec::new(); for edge in &outgoing { let tokens = Self::create_token_at_node( instance_id, tenant_id, &edge.target, graph, variables, txn, ) .await?; new_tokens.extend(tokens); } Ok(new_tokens) } } } } /// 排他网关分支:求值条件,选择第一个满足条件的分支。 async fn advance_exclusive_gateway( instance_id: Uuid, tenant_id: Uuid, outgoing: &[&crate::engine::model::FlowEdge], graph: &FlowGraph, variables: &HashMap, txn: &impl ConnectionTrait, ) -> WorkflowResult> { let mut default_target: Option<&str> = None; let mut matched_target: Option<&str> = None; for edge in outgoing { if let Some(condition) = &edge.condition { match ExpressionEvaluator::eval(condition, variables) { Ok(true) => { matched_target = Some(&edge.target); break; } Ok(false) => continue, Err(_) => continue, // 条件求值失败,跳过 } } else { // 无条件的边作为默认分支 default_target = Some(&edge.target); } } let target = matched_target.or(default_target).ok_or_else(|| { WorkflowError::ExpressionError("排他网关没有匹配的条件分支".to_string()) })?; Self::create_token_at_node(instance_id, tenant_id, target, graph, variables, txn).await } /// 并行网关分支:为每条出边创建 token。 async fn advance_parallel_gateway( instance_id: Uuid, tenant_id: Uuid, outgoing: &[&crate::engine::model::FlowEdge], graph: &FlowGraph, variables: &HashMap, txn: &impl ConnectionTrait, ) -> WorkflowResult> { let mut new_tokens = Vec::new(); for edge in outgoing { let tokens = Self::create_token_at_node( instance_id, tenant_id, &edge.target, graph, variables, txn, ) .await?; new_tokens.extend(tokens); } Ok(new_tokens) } /// 在指定节点创建 token,并根据节点类型执行相应逻辑。 fn create_token_at_node<'a>( instance_id: Uuid, tenant_id: Uuid, node_id: &'a str, graph: &'a FlowGraph, variables: &'a HashMap, txn: &'a impl ConnectionTrait, ) -> std::pin::Pin>> + Send + 'a>> { Box::pin(async move { let node = graph .nodes .get(node_id) .ok_or_else(|| WorkflowError::InvalidDiagram(format!("节点不存在: {node_id}")))?; match node.node_type { NodeType::EndEvent => { // 到达 EndEvent,不创建新 token // 检查实例是否所有 token 都完成 Self::check_instance_completion(instance_id, tenant_id, txn).await?; Ok(vec![]) } NodeType::ParallelGateway if Self::is_join_gateway(node_id, graph) => { // 并行网关汇合:等待所有入边 token 到达 Self::handle_join_gateway( instance_id, tenant_id, node_id, graph, variables, txn, ) .await } NodeType::ServiceTask => { // ServiceTask 自动执行 HTTP 调用 let now = Utc::now(); let system_user = uuid::Uuid::nil(); let auto_token_id = Uuid::now_v7(); let token_model = token::ActiveModel { id: Set(auto_token_id), tenant_id: Set(tenant_id), instance_id: Set(instance_id), node_id: Set(node_id.to_string()), status: Set("consumed".to_string()), created_at: Set(now), updated_at: Set(now), created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), version: Set(1), consumed_at: Set(Some(now)), }; token_model .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; // 执行 HTTP 调用(如果配置了 service_config) let var_name = format!("service_task_{node_id}_result"); let result_value = Self::execute_service_task(node, variables).await; // 将结果存储为流程变量 Self::set_process_variable( instance_id, tenant_id, &var_name, &result_value, txn, ) .await?; // 沿出边继续推进 let outgoing = graph.get_outgoing_edges(node_id); let mut new_tokens = Vec::new(); for edge in &outgoing { let tokens = Self::create_token_at_node( instance_id, tenant_id, &edge.target, graph, variables, txn, ) .await?; new_tokens.extend(tokens); } Ok(new_tokens) } _ => { // UserTask / 网关(分支)等:创建活跃 token let new_token_id = Uuid::now_v7(); let now = Utc::now(); let system_user = uuid::Uuid::nil(); let token_model = token::ActiveModel { id: Set(new_token_id), tenant_id: Set(tenant_id), instance_id: Set(instance_id), node_id: Set(node_id.to_string()), status: Set("active".to_string()), created_at: Set(now), updated_at: Set(now), created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), version: Set(1), consumed_at: Set(None), }; token_model .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; // UserTask: 同时创建 task 记录 if node.node_type == NodeType::UserTask { let task_model = task::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), instance_id: Set(instance_id), token_id: Set(new_token_id), node_id: Set(node_id.to_string()), node_name: Set(Some(node.name.clone())), assignee_id: Set(node.assignee_id), candidate_groups: Set(node .candidate_groups .as_ref() .map(|g| serde_json::to_value(g).unwrap_or_default())), status: Set("pending".to_string()), outcome: Set(None), form_data: Set(None), due_date: Set(None), completed_at: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(Uuid::nil()), updated_by: Set(Uuid::nil()), deleted_at: Set(None), version: Set(1), }; task_model .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; } Ok(vec![new_token_id]) } } }) } /// 判断并行网关是否是汇合模式(入边数 > 出边数,或者入边数 > 1)。 fn is_join_gateway(node_id: &str, graph: &FlowGraph) -> bool { let incoming = graph.get_incoming_edges(node_id); incoming.len() > 1 } /// 处理并行网关汇合逻辑。 /// /// 当所有入边的源节点都有已消费的 token 时,创建新 token 推进到后继。 async fn handle_join_gateway( instance_id: Uuid, tenant_id: Uuid, node_id: &str, graph: &FlowGraph, variables: &HashMap, txn: &impl ConnectionTrait, ) -> WorkflowResult> { let incoming = graph.get_incoming_edges(node_id); // 检查所有入边的源节点是否都有已消费/已完成的 token for edge in &incoming { let has_consumed = token::Entity::find() .filter(token::Column::InstanceId.eq(instance_id)) .filter(token::Column::NodeId.eq(&edge.source)) .filter(token::Column::Status.is_in(["consumed", "active"])) .one(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; if has_consumed.is_none() { // 还有分支没有到达,等待 return Ok(vec![]); } // 检查是否还有活跃的 token(来自其他分支) let has_active = token::Entity::find() .filter(token::Column::InstanceId.eq(instance_id)) .filter(token::Column::NodeId.eq(&edge.source)) .filter(token::Column::Status.eq("active")) .one(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; if has_active.is_some() { // 还有分支在执行中,等待 return Ok(vec![]); } } // 所有分支都完成了,先将 consumed tokens 标记为 completed 防止并发重复触发 for edge in &incoming { let consumed_tokens = token::Entity::find() .filter(token::Column::InstanceId.eq(instance_id)) .filter(token::Column::NodeId.eq(&edge.source)) .filter(token::Column::Status.eq("consumed")) .all(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; for t in consumed_tokens { let ver = t.version; let mut active: token::ActiveModel = t.into(); active.status = Set("completed".to_string()); active.version = Set(ver + 1); active.updated_at = Set(chrono::Utc::now()); active .update(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; } } // 沿出边继续创建新 token let outgoing = graph.get_outgoing_edges(node_id); let mut new_tokens = Vec::new(); for edge in &outgoing { let tokens = Self::create_token_at_node( instance_id, tenant_id, &edge.target, graph, variables, txn, ) .await?; new_tokens.extend(tokens); } Ok(new_tokens) } /// 执行 ServiceTask HTTP 调用。 /// /// 根据 `service_config` 中的 url/method/body 发起 HTTP 请求。 /// 如果没有配置 `service_config` 或调用失败,返回错误信息 JSON 而不是阻塞流程。 async fn execute_service_task( node: &crate::engine::model::FlowNode, variables: &HashMap, ) -> serde_json::Value { let config = match &node.service_config { Some(c) => c, None => { tracing::warn!( node_id = &node.id, node_name = %node.name, "ServiceTask 没有 service_config 配置,跳过 HTTP 调用" ); return serde_json::json!({ "status": "skipped", "reason": "未配置 service_config" }); } }; let method = config.method.to_uppercase(); let url = &config.url; tracing::info!( node_id = &node.id, node_name = %node.name, method = %method, url = %url, "ServiceTask 开始 HTTP 调用" ); let client = reqwest::Client::new(); let result = match method.as_str() { "POST" => { let body = config.body.as_ref().map(|b| { // 简单变量替换:${var_name} → variables 中的值 let mut body_str = b.to_string(); for (key, val) in variables { let placeholder = format!("${{{key}}}"); body_str = body_str.replace(&placeholder, &val.to_string()); } body_str }); client.post(url).body(body.unwrap_or_default()).send().await } _ => { // 默认 GET client.get(url).send().await } }; match result { Ok(resp) => { let status = resp.status().as_u16(); let body = resp.text().await.unwrap_or_default(); tracing::info!( node_id = &node.id, status = status, "ServiceTask HTTP 调用完成" ); serde_json::json!({ "status": "success", "http_status": status, "body": body, }) } Err(e) => { tracing::warn!( node_id = &node.id, error = %e, "ServiceTask HTTP 调用失败(流程继续推进)" ); serde_json::json!({ "status": "error", "error": e.to_string(), }) } } } /// 将流程变量写入 process_variables 表。 async fn set_process_variable( instance_id: Uuid, tenant_id: Uuid, name: &str, value: &serde_json::Value, txn: &impl ConnectionTrait, ) -> WorkflowResult<()> { use crate::entity::process_variable; let now = Utc::now(); let system_user = Uuid::nil(); let var_model = process_variable::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), instance_id: Set(instance_id), name: Set(name.to_string()), var_type: Set("json".to_string()), value_string: Set(Some(value.to_string())), value_number: Set(None), value_boolean: Set(None), value_date: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), version: Set(1), }; var_model .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; Ok(()) } /// 检查实例是否所有 token 都已完成,如果是则完成实例。 async fn check_instance_completion( instance_id: Uuid, tenant_id: Uuid, txn: &impl ConnectionTrait, ) -> WorkflowResult<()> { let active_count = token::Entity::find() .filter(token::Column::InstanceId.eq(instance_id)) .filter(token::Column::Status.eq("active")) .count(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; if active_count == 0 { // 所有 token 都完成,标记实例完成 let instance = process_instance::Entity::find_by_id(instance_id) .one(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {instance_id}")))?; let mut active: process_instance::ActiveModel = instance.into(); active.version = Set(active.version.take().unwrap_or(0) + 1); active.status = Set("completed".to_string()); active.completed_at = Set(Some(Utc::now())); active.updated_at = Set(Utc::now()); active .update(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; // 写入完成事件到 outbox,由 relay 广播 let now = Utc::now(); let outbox_event = erp_core::entity::domain_event::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), event_type: Set("process_instance.completed".to_string()), payload: Set(Some(serde_json::json!({ "instance_id": instance_id }))), correlation_id: Set(Some(Uuid::now_v7())), status: Set("pending".to_string()), attempts: Set(0), last_error: Set(None), created_at: Set(now), published_at: Set(None), }; outbox_event .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; } Ok(()) } } #[cfg(test)] mod tests { use super::*; use crate::dto::{EdgeDef, NodeDef, NodeType}; fn make_node(id: &str, node_type: NodeType) -> NodeDef { NodeDef { id: id.to_string(), node_type, name: id.to_string(), assignee_id: None, candidate_groups: None, service_type: None, service_config: None, position: None, } } fn make_edge(id: &str, source: &str, target: &str) -> EdgeDef { EdgeDef { id: id.to_string(), source: source.to_string(), target: target.to_string(), condition: None, label: None, } } #[test] fn test_is_join_gateway_with_multiple_incoming() { let nodes = vec![ make_node("start", NodeType::StartEvent), make_node("a", NodeType::UserTask), make_node("b", NodeType::ServiceTask), make_node("join", NodeType::ParallelGateway), make_node("end", NodeType::EndEvent), ]; let edges = vec![ make_edge("e1", "start", "a"), make_edge("e2", "start", "b"), make_edge("e3", "a", "join"), make_edge("e4", "b", "join"), make_edge("e5", "join", "end"), ]; let graph = FlowGraph::build(&nodes, &edges); assert!(FlowExecutor::is_join_gateway("join", &graph)); } #[test] fn test_is_not_join_gateway_single_incoming() { let nodes = vec![ make_node("start", NodeType::StartEvent), make_node("fork", NodeType::ParallelGateway), make_node("end", NodeType::EndEvent), ]; let edges = vec![ make_edge("e1", "start", "fork"), make_edge("e2", "fork", "end"), ]; let graph = FlowGraph::build(&nodes, &edges); assert!(!FlowExecutor::is_join_gateway("fork", &graph)); } #[test] fn test_is_not_join_gateway_for_nonexistent_node() { let graph = FlowGraph::build(&[], &[]); assert!(!FlowExecutor::is_join_gateway("nonexistent", &graph)); } }