use std::collections::HashMap; use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set, TransactionTrait, ConnectionTrait, }; use uuid::Uuid; use crate::dto::{ProcessInstanceResp, StartInstanceReq, TokenResp}; use crate::engine::executor::FlowExecutor; use crate::engine::parser; use crate::entity::{process_definition, process_instance, process_variable, token}; use crate::error::{WorkflowError, WorkflowResult}; use erp_core::events::EventBus; use erp_core::types::Pagination; /// 流程实例服务。 pub struct InstanceService; impl InstanceService { /// 启动流程实例。 pub async fn start( tenant_id: Uuid, operator_id: Uuid, req: &StartInstanceReq, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> WorkflowResult { // 查找流程定义 let definition = process_definition::Entity::find_by_id(req.definition_id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|d| d.tenant_id == tenant_id && d.deleted_at.is_none()) .ok_or_else(|| { WorkflowError::NotFound(format!("流程定义不存在: {}", req.definition_id)) })?; if definition.status != "published" { return Err(WorkflowError::InvalidState( "只能启动已发布的流程定义".to_string(), )); } // 解析流程图 let nodes: Vec = serde_json::from_value(definition.nodes.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?; let edges: Vec = serde_json::from_value(definition.edges.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?; let graph = parser::parse_and_validate(&nodes, &edges)?; // 准备流程变量 let mut variables = HashMap::new(); if let Some(vars) = &req.variables { for v in vars { let var_type = v.var_type.as_deref().unwrap_or("string"); variables.insert(v.name.clone(), v.value.clone()); } } let instance_id = Uuid::now_v7(); let now = Utc::now(); // 在事务中创建实例、变量和 token let instance_id_clone = instance_id; let tenant_id_clone = tenant_id; let operator_id_clone = operator_id; let business_key = req.business_key.clone(); let definition_id = definition.id; let definition_name = definition.name.clone(); let vars_to_save = req.variables.clone(); db.transaction::<_, (), WorkflowError>(|txn| { let graph = graph.clone(); let variables = variables.clone(); Box::pin(async move { // 创建流程实例 let instance = process_instance::ActiveModel { id: Set(instance_id_clone), tenant_id: Set(tenant_id_clone), definition_id: Set(definition_id), business_key: Set(business_key), status: Set("running".to_string()), started_by: Set(operator_id_clone), started_at: Set(now), completed_at: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id_clone), updated_by: Set(operator_id_clone), deleted_at: Set(None), version: Set(1), }; instance.insert(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?; // 保存初始变量 if let Some(vars) = vars_to_save { for v in vars { Self::save_variable( instance_id_clone, tenant_id_clone, &v.name, v.var_type.as_deref().unwrap_or("string"), &v.value, txn, ) .await?; } } // 启动执行引擎 FlowExecutor::start( instance_id_clone, tenant_id_clone, &graph, &variables, txn, ) .await?; Ok(()) }) }) .await?; event_bus.publish(erp_core::events::DomainEvent::new( "process_instance.started", tenant_id, serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id }), )); // 查询创建后的实例(包含 token) let instance = process_instance::Entity::find_by_id(instance_id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {instance_id}")))?; let active_tokens = Self::get_active_tokens(instance_id, db).await?; Ok(ProcessInstanceResp { id: instance.id, definition_id: instance.definition_id, definition_name: Some(definition_name), business_key: instance.business_key, status: instance.status, started_by: instance.started_by, started_at: instance.started_at, completed_at: instance.completed_at, created_at: instance.created_at, active_tokens, }) } /// 分页查询流程实例。 pub async fn list( tenant_id: Uuid, pagination: &Pagination, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<(Vec, u64)> { let paginator = process_instance::Entity::find() .filter(process_instance::Column::TenantId.eq(tenant_id)) .filter(process_instance::Column::DeletedAt.is_null()) .paginate(db, pagination.limit()); let total = paginator .num_items() .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let page_index = pagination.page.unwrap_or(1).saturating_sub(1) as u64; let models = paginator .fetch_page(page_index) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let mut resps = Vec::new(); for m in &models { let active_tokens = Self::get_active_tokens(m.id, db).await.unwrap_or_default(); let def_name = process_definition::Entity::find_by_id(m.definition_id) .one(db) .await .ok() .flatten() .map(|d| d.name); resps.push(ProcessInstanceResp { id: m.id, definition_id: m.definition_id, definition_name: def_name, business_key: m.business_key.clone(), status: m.status.clone(), started_by: m.started_by, started_at: m.started_at, completed_at: m.completed_at, created_at: m.created_at, active_tokens, }); } Ok((resps, total)) } /// 获取单个流程实例详情。 pub async fn get_by_id( id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let instance = process_instance::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {id}")))?; let def_name = process_definition::Entity::find_by_id(instance.definition_id) .one(db) .await .ok() .flatten() .map(|d| d.name); let active_tokens = Self::get_active_tokens(id, db).await?; Ok(ProcessInstanceResp { id: instance.id, definition_id: instance.definition_id, definition_name: def_name, business_key: instance.business_key, status: instance.status, started_by: instance.started_by, started_at: instance.started_at, completed_at: instance.completed_at, created_at: instance.created_at, active_tokens, }) } /// 挂起流程实例。 pub async fn suspend( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<()> { Self::change_status(id, tenant_id, operator_id, "running", "suspended", db).await } /// 终止流程实例。 pub async fn terminate( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<()> { Self::change_status(id, tenant_id, operator_id, "running", "terminated", db).await } async fn change_status( id: Uuid, tenant_id: Uuid, operator_id: Uuid, from_status: &str, to_status: &str, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<()> { let instance = process_instance::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {id}")))?; if instance.status != from_status { return Err(WorkflowError::InvalidState(format!( "流程实例状态不是 {},无法变更为 {}", from_status, to_status ))); } let mut active: process_instance::ActiveModel = instance.into(); active.status = Set(to_status.to_string()); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; Ok(()) } /// 获取实例的活跃 token 列表。 pub async fn get_active_tokens( instance_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult> { let tokens = token::Entity::find() .filter(token::Column::InstanceId.eq(instance_id)) .filter(token::Column::Status.eq("active")) .all(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; Ok(tokens .iter() .map(|t| TokenResp { id: t.id, node_id: t.node_id.clone(), status: t.status.clone(), created_at: t.created_at, }) .collect()) } /// 保存流程变量。 pub async fn save_variable( instance_id: Uuid, tenant_id: Uuid, name: &str, var_type: &str, value: &serde_json::Value, txn: &impl ConnectionTrait, ) -> WorkflowResult<()> { let id = Uuid::now_v7(); let (value_string, value_number, value_boolean, value_date): (Option, Option, Option, Option>) = match var_type { "string" => (value.as_str().map(|s| s.to_string()), None, None, None), "number" => (None, value.as_f64(), None, None), "boolean" => (None, None, value.as_bool(), None), _ => (Some(value.to_string()), None, None, None), }; let model = process_variable::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), instance_id: Set(instance_id), name: Set(name.to_string()), var_type: Set(var_type.to_string()), value_string: Set(value_string), value_number: Set(value_number), value_boolean: Set(value_boolean), value_date: Set(None), }; model .insert(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; Ok(()) } }