use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set}; use uuid::Uuid; use crate::dto::{CreateProcessDefinitionReq, ProcessDefinitionResp, UpdateProcessDefinitionReq}; use crate::engine::parser; use crate::entity::process_definition; use crate::error::{WorkflowError, WorkflowResult}; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::error::check_version; use erp_core::events::EventBus; use erp_core::types::Pagination; /// 流程定义 CRUD 服务。 pub struct DefinitionService; impl DefinitionService { /// 分页查询流程定义列表。 pub async fn list( tenant_id: Uuid, pagination: &Pagination, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<(Vec, u64)> { let paginator = process_definition::Entity::find() .filter(process_definition::Column::TenantId.eq(tenant_id)) .filter(process_definition::Column::DeletedAt.is_null()) .paginate(db, pagination.limit()); let total = paginator .num_items() .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let page_index = pagination.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let resps: Vec = models.iter().map(Self::model_to_resp).collect(); Ok((resps, total)) } /// 获取单个流程定义。 pub async fn get_by_id( id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let model = process_definition::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?; Ok(Self::model_to_resp(&model)) } /// 创建流程定义。 pub async fn create( tenant_id: Uuid, operator_id: Uuid, req: &CreateProcessDefinitionReq, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> WorkflowResult { // 验证流程图合法性 parser::parse_and_validate(&req.nodes, &req.edges)?; let now = Utc::now(); let id = Uuid::now_v7(); let nodes_json = serde_json::to_value(&req.nodes) .map_err(|e| WorkflowError::Validation(e.to_string()))?; let edges_json = serde_json::to_value(&req.edges) .map_err(|e| WorkflowError::Validation(e.to_string()))?; let model = process_definition::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), name: Set(req.name.clone()), key: Set(req.key.clone()), version: Set(1), category: Set(req.category.clone()), description: Set(req.description.clone()), nodes: Set(nodes_json), edges: Set(edges_json), status: Set("draft".to_string()), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), version_field: Set(1), }; model .insert(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "process_definition.created", tenant_id, serde_json::json!({ "definition_id": id, "key": req.key }), ), db, ) .await; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "process_definition.create", "process_definition", ) .with_resource_id(id), db, ) .await; Ok(ProcessDefinitionResp { id, name: req.name.clone(), key: req.key.clone(), version: 1, category: req.category.clone(), description: req.description.clone(), nodes: serde_json::to_value(&req.nodes).unwrap_or_default(), edges: serde_json::to_value(&req.edges).unwrap_or_default(), status: "draft".to_string(), created_at: now, updated_at: now, lock_version: 1, }) } /// 更新流程定义(仅 draft 状态可编辑)。 pub async fn update( id: Uuid, tenant_id: Uuid, operator_id: Uuid, req: &UpdateProcessDefinitionReq, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let model = process_definition::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?; if model.status != "draft" { return Err(WorkflowError::InvalidState( "只有 draft 状态的流程定义可以编辑".to_string(), )); } let current_version = model.version_field; let mut active: process_definition::ActiveModel = model.into(); if let Some(name) = &req.name { active.name = Set(name.clone()); } if let Some(category) = &req.category { active.category = Set(Some(category.clone())); } if let Some(description) = &req.description { active.description = Set(Some(description.clone())); } // 当 nodes 或 edges 任一存在时,取最终值验证流程图完整性 let final_nodes = req.nodes.as_ref().or_else(|| { serde_json::from_value::>(active.nodes.as_ref().clone()).ok().as_ref().map(|_| unreachable!()) }); // 简化:如果提供了 nodes 或 edges,将两者合并后验证 if req.nodes.is_some() || req.edges.is_some() { let nodes_val = req.nodes.as_ref().map(|n| serde_json::to_value(n).unwrap()).unwrap_or(active.nodes.as_ref().clone()); let edges_val = req.edges.as_ref().map(|e| serde_json::to_value(e).unwrap()).unwrap_or(active.edges.as_ref().clone()); let nodes: Vec = serde_json::from_value(nodes_val) .map_err(|e| WorkflowError::Validation(format!("节点数据无效: {e}")))?; let edges: Vec = serde_json::from_value(edges_val) .map_err(|e| WorkflowError::Validation(format!("连线数据无效: {e}")))?; parser::parse_and_validate(&nodes, &edges)?; } if let Some(nodes) = &req.nodes { let nodes_json = serde_json::to_value(nodes) .map_err(|e| WorkflowError::Validation(e.to_string()))?; active.nodes = Set(nodes_json); } if let Some(edges) = &req.edges { let edges_json = serde_json::to_value(edges) .map_err(|e| WorkflowError::Validation(e.to_string()))?; active.edges = Set(edges_json); } let next_ver = check_version(req.version, current_version) .map_err(|_| WorkflowError::VersionMismatch)?; active.version_field = Set(next_ver); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); let updated = active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "process_definition.update", "process_definition", ) .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&updated)) } /// 发布流程定义(draft → published)。 pub async fn publish( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> WorkflowResult { let model = process_definition::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?; if model.status != "draft" { return Err(WorkflowError::InvalidState( "只有 draft 状态的流程定义可以发布".to_string(), )); } // 验证流程图 let nodes: Vec = serde_json::from_value(model.nodes.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?; let edges: Vec = serde_json::from_value(model.edges.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?; parser::parse_and_validate(&nodes, &edges)?; let current_version = model.version_field; let mut active: process_definition::ActiveModel = model.into(); active.status = Set("published".to_string()); active.version_field = Set(current_version + 1); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); let updated = active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "process_definition.published", tenant_id, serde_json::json!({ "definition_id": id }), ), db, ) .await; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "process_definition.publish", "process_definition", ) .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&updated)) } /// 将已发布的流程定义标记为 deprecated。 pub async fn deprecate( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> WorkflowResult { let model = process_definition::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?; if model.status != "published" { return Err(WorkflowError::InvalidState( "只有 published 状态的流程定义可以废弃".to_string(), )); } let current_version = model.version_field; let mut active: process_definition::ActiveModel = model.into(); active.status = Set("deprecated".to_string()); active.version_field = Set(current_version + 1); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); let updated = active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "process_definition.deprecated", tenant_id, serde_json::json!({ "definition_id": id }), ), db, ) .await; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "process_definition.deprecate", "process_definition", ) .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&updated)) } /// 软删除流程定义。 pub async fn delete( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<()> { let model = process_definition::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?; let current_version = model.version_field; let mut active: process_definition::ActiveModel = model.into(); active.version_field = Set(current_version + 1); active.deleted_at = Set(Some(Utc::now())); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; audit_service::record( AuditLog::new( tenant_id, Some(operator_id), "process_definition.delete", "process_definition", ) .with_resource_id(id), db, ) .await; Ok(()) } fn model_to_resp(m: &process_definition::Model) -> ProcessDefinitionResp { ProcessDefinitionResp { id: m.id, name: m.name.clone(), key: m.key.clone(), version: m.version, category: m.category.clone(), description: m.description.clone(), nodes: m.nodes.clone(), edges: m.edges.clone(), status: m.status.clone(), created_at: m.created_at, updated_at: m.updated_at, lock_version: m.version_field, } } }