use erp_core::events::EventBus; use erp_core::types::Pagination; use erp_workflow::dto::{ CompleteTaskReq, CreateProcessDefinitionReq, StartInstanceReq, }; use erp_workflow::service::{DefinitionService, InstanceService, TaskService}; use super::test_db::TestDb; /// 构建一个最简单的线性流程:开始 → 审批 → 结束 fn make_simple_definition(name: &str) -> CreateProcessDefinitionReq { use erp_workflow::dto::{EdgeDef, NodeDef}; CreateProcessDefinitionReq { name: name.to_string(), description: Some("集成测试流程".to_string()), nodes: vec![ NodeDef { id: "start".to_string(), node_type: "start".to_string(), label: "开始".to_string(), ..Default::default() }, NodeDef { id: "approve".to_string(), node_type: "userTask".to_string(), label: "审批".to_string(), assignee: Some("${initiator}".to_string()), ..Default::default() }, NodeDef { id: "end".to_string(), node_type: "end".to_string(), label: "结束".to_string(), ..Default::default() }, ], edges: vec![ EdgeDef { id: "e1".to_string(), source: "start".to_string(), target: "approve".to_string(), ..Default::default() }, EdgeDef { id: "e2".to_string(), source: "approve".to_string(), target: "end".to_string(), ..Default::default() }, ], } } #[tokio::test] async fn test_workflow_definition_crud() { let test_db = TestDb::new().await; let db = &test_db.db; let tenant_id = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); // 创建流程定义 let def = DefinitionService::create( tenant_id, operator_id, &make_simple_definition("测试流程"), db, &event_bus, ) .await .expect("创建流程定义失败"); assert_eq!(def.name, "测试流程"); assert_eq!(def.status, "draft"); // 查询列表 let (defs, total) = DefinitionService::list( tenant_id, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询流程定义列表失败"); assert_eq!(total, 1); assert_eq!(defs[0].name, "测试流程"); // 按 ID 查询 let found = DefinitionService::get_by_id(def.id, tenant_id, db) .await .expect("查询流程定义失败"); assert_eq!(found.id, def.id); // 发布 let published = DefinitionService::publish(def.id, tenant_id, operator_id, db, &event_bus) .await .expect("发布流程定义失败"); assert_eq!(published.status, "published"); } #[tokio::test] async fn test_workflow_instance_lifecycle() { let test_db = TestDb::new().await; let db = &test_db.db; let tenant_id = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); // 创建并发布流程定义 let def = DefinitionService::create( tenant_id, operator_id, &make_simple_definition("生命周期测试"), db, &event_bus, ) .await .expect("创建流程定义失败"); let def = DefinitionService::publish(def.id, tenant_id, operator_id, db, &event_bus) .await .expect("发布流程定义失败"); // 启动流程实例 let instance = InstanceService::start( tenant_id, operator_id, &StartInstanceReq { definition_id: def.id, title: Some("测试实例".to_string()), variables: None, }, db, &event_bus, ) .await .expect("启动流程实例失败"); assert_eq!(instance.status, "running"); // 查询待办任务 let (tasks, task_total) = TaskService::list_pending( tenant_id, operator_id, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询待办任务失败"); assert_eq!(task_total, 1); assert_eq!(tasks[0].status, "pending"); // 完成任务 let completed = TaskService::complete( tasks[0].id, tenant_id, operator_id, &CompleteTaskReq { outcome: Some("approved".to_string()), comment: Some("同意".to_string()), }, db, &event_bus, ) .await .expect("完成任务失败"); assert_eq!(completed.status, "completed"); } #[tokio::test] async fn test_workflow_tenant_isolation() { let test_db = TestDb::new().await; let db = &test_db.db; let tenant_a = uuid::Uuid::new_v4(); let tenant_b = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); // 租户 A 创建流程定义 let def_a = DefinitionService::create( tenant_a, operator_id, &make_simple_definition("租户A流程"), db, &event_bus, ) .await .expect("创建流程定义失败"); // 租户 B 查询不应看到租户 A 的定义 let (defs_b, total_b) = DefinitionService::list( tenant_b, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询流程定义列表失败"); assert_eq!(total_b, 0); assert!(defs_b.is_empty()); // 租户 B 按 ID 查询租户 A 的定义应返回错误 let result = DefinitionService::get_by_id(def_a.id, tenant_b, db).await; assert!(result.is_err()); } #[tokio::test] async fn test_event_bus_pub_sub() { let event_bus = EventBus::new(100); let tenant_id = uuid::Uuid::new_v4(); // 订阅 "user." 前缀事件 let (mut receiver, _handle) = event_bus.subscribe_filtered("user.".to_string()); // 发布匹配事件 let event = erp_core::events::DomainEvent::new( "user.created", tenant_id, serde_json::json!({"username": "test"}), ); event_bus.broadcast(event); // 发布不匹配事件 let other_event = erp_core::events::DomainEvent::new( "workflow.started", tenant_id, serde_json::json!({}), ); event_bus.broadcast(other_event); // 应只收到匹配事件 let received = receiver.recv().await; assert!(received.is_some()); let received = received.unwrap(); assert_eq!(received.event_type, "user.created"); assert_eq!(received.payload["username"], "test"); // 不匹配事件不应出现 // broadcast channel 不会发送不匹配的事件到 filtered receiver }