use std::collections::HashMap; use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait, QueryFilter, Set, Statement, TransactionTrait, }; use uuid::Uuid; use crate::dto::{CompleteTaskReq, DelegateTaskReq, TaskResp}; use crate::engine::executor::FlowExecutor; use crate::engine::parser; use crate::entity::{process_definition, process_instance, task}; use crate::error::{WorkflowError, WorkflowResult}; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::events::EventBus; use erp_core::types::Pagination; /// 任务服务。 pub struct TaskService; impl TaskService { /// 查询当前用户的待办任务。 pub async fn list_pending( tenant_id: Uuid, assignee_id: Uuid, pagination: &Pagination, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<(Vec, u64)> { let paginator = task::Entity::find() .filter(task::Column::TenantId.eq(tenant_id)) .filter(task::Column::AssigneeId.eq(assignee_id)) .filter(task::Column::Status.eq("pending")) .filter(task::Column::DeletedAt.is_null()) .paginate(db, pagination.limit()); let total = paginator .num_items() .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let page_index = pagination.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let mut resps = Vec::new(); for m in &models { let mut resp = Self::model_to_resp(m); // 附加实例信息 if let Some(inst) = process_instance::Entity::find_by_id(m.instance_id) .one(db) .await .ok() .flatten() { resp.business_key = inst.business_key; if let Some(def) = process_definition::Entity::find_by_id(inst.definition_id) .one(db) .await .ok() .flatten() { resp.definition_name = Some(def.name); } } resps.push(resp); } Ok((resps, total)) } /// 查询当前用户的已办任务。 pub async fn list_completed( tenant_id: Uuid, assignee_id: Uuid, pagination: &Pagination, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult<(Vec, u64)> { let paginator = task::Entity::find() .filter(task::Column::TenantId.eq(tenant_id)) .filter(task::Column::AssigneeId.eq(assignee_id)) .filter(task::Column::Status.is_in(["completed", "approved", "rejected", "delegated"])) .filter(task::Column::DeletedAt.is_null()) .paginate(db, pagination.limit()); let total = paginator .num_items() .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let page_index = pagination.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let mut resps = Vec::new(); for m in &models { let mut resp = Self::model_to_resp(m); if let Some(inst) = process_instance::Entity::find_by_id(m.instance_id) .one(db) .await .ok() .flatten() { resp.business_key = inst.business_key; if let Some(def) = process_definition::Entity::find_by_id(inst.definition_id) .one(db) .await .ok() .flatten() { resp.definition_name = Some(def.name); } } resps.push(resp); } Ok((resps, total)) } /// 完成任务:更新任务状态 + 推进 token。 pub async fn complete( id: Uuid, tenant_id: Uuid, operator_id: Uuid, req: &CompleteTaskReq, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> WorkflowResult { let task_model = task::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?; if task_model.status != "pending" { return Err(WorkflowError::InvalidState( "任务状态不是 pending,无法完成".to_string(), )); } // 验证操作者是当前处理人 if task_model.assignee_id != Some(operator_id) { return Err(WorkflowError::InvalidState( "只有当前处理人才能完成任务".to_string(), )); } let instance_id = task_model.instance_id; let token_id = task_model.token_id; // 获取流程定义和流程图 let instance = process_instance::Entity::find_by_id(instance_id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {instance_id}")))?; let definition = process_definition::Entity::find_by_id(instance.definition_id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|d| d.tenant_id == tenant_id && d.deleted_at.is_none()) .ok_or_else(|| { WorkflowError::NotFound(format!("流程定义不存在: {}", instance.definition_id)) })?; if instance.status != "running" { return Err(WorkflowError::InvalidState(format!( "流程实例状态不是 running: {}", instance.status ))); } let nodes: Vec = serde_json::from_value(definition.nodes.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?; let edges: Vec = serde_json::from_value(definition.edges.clone()) .map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?; let graph = parser::parse_and_validate(&nodes, &edges)?; // 准备变量(从 req.form_data 中提取) let mut variables = HashMap::new(); if let Some(form) = &req.form_data && let Some(obj) = form.as_object() { for (k, v) in obj { variables.insert(k.clone(), v.clone()); } } // 在事务中更新任务 + 推进 token let now = Utc::now(); let outcome = req.outcome.clone(); let form_data = req.form_data.clone(); db.transaction::<_, (), WorkflowError>(|txn| { let graph = graph.clone(); let variables = variables.clone(); let task_model = task_model.clone(); Box::pin(async move { // 更新任务状态 let current_version = task_model.version; let mut active: task::ActiveModel = task_model.clone().into(); active.status = Set("completed".to_string()); active.outcome = Set(Some(outcome)); active.form_data = Set(form_data); active.completed_at = Set(Some(now)); active.version = Set(current_version + 1); active.updated_at = Set(now); active.updated_by = Set(operator_id); active .update(txn) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; // 推进 token FlowExecutor::advance(token_id, instance_id, tenant_id, &graph, &variables, txn) .await?; Ok(()) }) }) .await?; event_bus .publish( erp_core::events::DomainEvent::new( "task.completed", tenant_id, serde_json::json!({ "task_id": id, "instance_id": instance_id, "started_by": instance.started_by, "outcome": req.outcome, }), ), db, ) .await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "task.complete", "task") .with_resource_id(id), db, ) .await; // 重新查询任务 let updated = task::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?; Ok(Self::model_to_resp(&updated)) } /// 委派任务给其他人。 pub async fn delegate( id: Uuid, tenant_id: Uuid, operator_id: Uuid, req: &DelegateTaskReq, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let task_model = task::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?; if task_model.status != "pending" { return Err(WorkflowError::InvalidState( "任务状态不是 pending,无法委派".to_string(), )); } // 验证操作者是当前处理人 if task_model.assignee_id != Some(operator_id) { return Err(WorkflowError::InvalidState( "只有当前处理人才能委派任务".to_string(), )); } // 验证目标用户属于同一租户(使用 raw SQL 避免跨模块依赖 erp-auth) let result = db.query_one(Statement::from_sql_and_values( DatabaseBackend::Postgres, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL AND status = 'active') AS ok", [req.delegate_to.into(), tenant_id.into()], )) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; let target_ok = result .and_then(|r| r.try_get::("", "ok").ok()) .unwrap_or(false); if !target_ok { return Err(WorkflowError::Validation( "委派目标用户不存在或不属于当前租户".to_string(), )); } let current_version = task_model.version; let mut active: task::ActiveModel = task_model.into(); active.assignee_id = Set(Some(req.delegate_to)); active.version = Set(current_version + 1); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); let updated = active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "task.delegate", "task") .with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&updated)) } /// 创建任务记录(由执行引擎调用)。 #[allow(clippy::too_many_arguments)] pub async fn create_task( instance_id: Uuid, tenant_id: Uuid, token_id: Uuid, node_id: &str, node_name: Option<&str>, assignee_id: Option, candidate_groups: Option>, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let id = Uuid::now_v7(); let now = Utc::now(); let system_user = Uuid::nil(); let model = task::ActiveModel { id: Set(id), tenant_id: Set(tenant_id), instance_id: Set(instance_id), token_id: Set(token_id), node_id: Set(node_id.to_string()), node_name: Set(node_name.map(|s| s.to_string())), assignee_id: Set(assignee_id), candidate_groups: Set( candidate_groups.map(|g| serde_json::to_value(g).unwrap_or_default()) ), status: Set("pending".to_string()), outcome: Set(None), form_data: Set(None), due_date: Set(None), completed_at: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(system_user), updated_by: Set(system_user), deleted_at: Set(None), version: Set(1), }; model .insert(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; Ok(id) } /// 认领任务:将 pending 状态的任务分配给当前用户。 /// /// 适用于 candidate_groups 群组任务池中的任务,用户主动认领后 /// 任务状态变为 in_progress,assignee_id 设置为认领用户。 pub async fn claim( id: Uuid, tenant_id: Uuid, user_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> WorkflowResult { let task_model = task::Entity::find_by_id(id) .one(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))? .filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none()) .ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?; if task_model.status != "pending" { return Err(WorkflowError::InvalidState(format!( "任务状态不是 pending(当前状态: {}),无法认领", task_model.status ))); } let current_version = task_model.version; let mut active: task::ActiveModel = task_model.into(); active.assignee_id = Set(Some(user_id)); active.status = Set("in_progress".to_string()); active.version = Set(current_version + 1); active.updated_at = Set(Utc::now()); active.updated_by = Set(user_id); let updated = active .update(db) .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; audit_service::record( AuditLog::new(tenant_id, Some(user_id), "task.claim", "task").with_resource_id(id), db, ) .await; Ok(Self::model_to_resp(&updated)) } fn model_to_resp(m: &task::Model) -> TaskResp { TaskResp { id: m.id, instance_id: m.instance_id, token_id: m.token_id, node_id: m.node_id.clone(), node_name: m.node_name.clone(), assignee_id: m.assignee_id, candidate_groups: m.candidate_groups.clone(), status: m.status.clone(), outcome: m.outcome.clone(), form_data: m.form_data.clone(), due_date: m.due_date, completed_at: m.completed_at, created_at: m.created_at, definition_name: None, business_key: None, version: m.version, } } }