use erp_core::events::EventBus; use erp_core::types::Pagination; use erp_workflow::dto::{ CompleteTaskReq, CreateProcessDefinitionReq, EdgeDef, NodeDef, NodeType, StartInstanceReq, }; use erp_workflow::service::definition_service::DefinitionService; use erp_workflow::service::instance_service::InstanceService; use erp_workflow::service::task_service::TaskService; use super::test_db::TestDb; /// 构建一个最简单的线性流程:开始 → 审批 → 结束 /// assignee 指向 operator_id,使 list_pending 能查到任务 fn make_simple_definition(name: &str, key: &str, assignee_id: Option) -> CreateProcessDefinitionReq { CreateProcessDefinitionReq { name: name.to_string(), key: key.to_string(), category: Some("test".to_string()), description: Some("集成测试流程".to_string()), nodes: vec![ NodeDef { id: "start".to_string(), node_type: NodeType::StartEvent, name: "开始".to_string(), assignee_id: None, candidate_groups: None, service_type: None, position: None, }, NodeDef { id: "approve".to_string(), node_type: NodeType::UserTask, name: "审批".to_string(), assignee_id, candidate_groups: None, service_type: None, position: None, }, NodeDef { id: "end".to_string(), node_type: NodeType::EndEvent, name: "结束".to_string(), assignee_id: None, candidate_groups: None, service_type: None, position: None, }, ], edges: vec![ EdgeDef { id: "e1".to_string(), source: "start".to_string(), target: "approve".to_string(), condition: None, label: None, }, EdgeDef { id: "e2".to_string(), source: "approve".to_string(), target: "end".to_string(), condition: None, label: None, }, ], } } #[tokio::test] async fn test_workflow_definition_crud() { let test_db = TestDb::new().await; let db = test_db.db(); let tenant_id = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); let def = DefinitionService::create( tenant_id, operator_id, &make_simple_definition("测试流程", "test-flow-1", None), db, &event_bus, ) .await .expect("创建流程定义失败"); assert_eq!(def.name, "测试流程"); assert_eq!(def.status, "draft"); let (defs, total) = DefinitionService::list( tenant_id, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询流程定义列表失败"); assert_eq!(total, 1); assert_eq!(defs[0].name, "测试流程"); let found = DefinitionService::get_by_id(def.id, tenant_id, db) .await .expect("查询流程定义失败"); assert_eq!(found.id, def.id); let published = DefinitionService::publish(def.id, tenant_id, operator_id, db, &event_bus) .await .expect("发布流程定义失败"); assert_eq!(published.status, "published"); } #[tokio::test] async fn test_workflow_instance_lifecycle() { let test_db = TestDb::new().await; let db = test_db.db(); let tenant_id = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); let def = DefinitionService::create( tenant_id, operator_id, &make_simple_definition("生命周期测试", "lifecycle-flow", Some(operator_id)), db, &event_bus, ) .await .expect("创建流程定义失败"); let def = DefinitionService::publish(def.id, tenant_id, operator_id, db, &event_bus) .await .expect("发布流程定义失败"); let instance = InstanceService::start( tenant_id, operator_id, &StartInstanceReq { definition_id: def.id, business_key: Some("测试实例".to_string()), variables: None, }, db, &event_bus, ) .await .expect("启动流程实例失败"); assert_eq!(instance.status, "running"); let (tasks, task_total) = TaskService::list_pending( tenant_id, operator_id, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询待办任务失败"); assert_eq!(task_total, 1); assert_eq!(tasks[0].status, "pending"); let completed = TaskService::complete( tasks[0].id, tenant_id, operator_id, &CompleteTaskReq { outcome: "approved".to_string(), form_data: Some(serde_json::json!({"comment": "同意"})), }, db, &event_bus, ) .await .expect("完成任务失败"); assert_eq!(completed.status, "completed"); } #[tokio::test] async fn test_workflow_tenant_isolation() { let test_db = TestDb::new().await; let db = test_db.db(); let tenant_a = uuid::Uuid::new_v4(); let tenant_b = uuid::Uuid::new_v4(); let operator_id = uuid::Uuid::new_v4(); let event_bus = EventBus::new(100); let def_a = DefinitionService::create( tenant_a, operator_id, &make_simple_definition("租户A流程", "tenant-a-flow", None), db, &event_bus, ) .await .expect("创建流程定义失败"); let (defs_b, total_b) = DefinitionService::list( tenant_b, &Pagination { page: Some(1), page_size: Some(10), }, db, ) .await .expect("查询流程定义列表失败"); assert_eq!(total_b, 0); assert!(defs_b.is_empty()); let result = DefinitionService::get_by_id(def_a.id, tenant_b, db).await; assert!(result.is_err()); } #[tokio::test] async fn test_event_bus_pub_sub() { let event_bus = EventBus::new(100); let tenant_id = uuid::Uuid::new_v4(); let (mut receiver, _handle) = event_bus.subscribe_filtered("user.".to_string()); let event = erp_core::events::DomainEvent::new( "user.created", tenant_id, serde_json::json!({"username": "test"}), ); event_bus.broadcast(event); let other_event = erp_core::events::DomainEvent::new( "workflow.started", tenant_id, serde_json::json!({}), ); event_bus.broadcast(other_event); let received = receiver.recv().await; assert!(received.is_some()); let received = received.unwrap(); assert_eq!(received.event_type, "user.created"); assert_eq!(received.payload["username"], "test"); }