chore: apply cargo fmt across workspace and update docs
- Run cargo fmt on all Rust crates for consistent formatting - Update CLAUDE.md with WASM plugin commands and dev.ps1 instructions - Update wiki: add WASM plugin architecture, rewrite dev environment docs - Minor frontend cleanup (unused imports)
This commit is contained in:
@@ -2,15 +2,14 @@ use std::collections::HashMap;
|
||||
|
||||
use chrono::Utc;
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, ConnectionTrait,
|
||||
PaginatorTrait,
|
||||
ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::dto::NodeType;
|
||||
use crate::engine::expression::ExpressionEvaluator;
|
||||
use crate::engine::model::FlowGraph;
|
||||
use crate::entity::{token, process_instance, task};
|
||||
use crate::entity::{process_instance, task, token};
|
||||
use crate::error::{WorkflowError, WorkflowResult};
|
||||
|
||||
/// Token 驱动的流程执行引擎。
|
||||
@@ -92,11 +91,16 @@ impl FlowExecutor {
|
||||
let mut active: token::ActiveModel = current_token.into();
|
||||
active.status = Set("consumed".to_string());
|
||||
active.consumed_at = Set(Some(Utc::now()));
|
||||
active.update(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
active
|
||||
.update(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
|
||||
// 获取当前节点的出边
|
||||
let outgoing = graph.get_outgoing_edges(&node_id);
|
||||
let current_node = graph.nodes.get(&node_id)
|
||||
let current_node = graph
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.ok_or_else(|| WorkflowError::InvalidDiagram(format!("节点不存在: {node_id}")))?;
|
||||
|
||||
match current_node.node_type {
|
||||
@@ -177,11 +181,9 @@ impl FlowExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
let target = matched_target
|
||||
.or(default_target)
|
||||
.ok_or_else(|| WorkflowError::ExpressionError(
|
||||
"排他网关没有匹配的条件分支".to_string(),
|
||||
))?;
|
||||
let target = matched_target.or(default_target).ok_or_else(|| {
|
||||
WorkflowError::ExpressionError("排他网关没有匹配的条件分支".to_string())
|
||||
})?;
|
||||
|
||||
Self::create_token_at_node(instance_id, tenant_id, target, graph, variables, txn).await
|
||||
}
|
||||
@@ -219,136 +221,139 @@ impl FlowExecutor {
|
||||
graph: &'a FlowGraph,
|
||||
variables: &'a HashMap<String, serde_json::Value>,
|
||||
txn: &'a impl ConnectionTrait,
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = WorkflowResult<Vec<Uuid>>> + Send + 'a>> {
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = WorkflowResult<Vec<Uuid>>> + Send + 'a>>
|
||||
{
|
||||
Box::pin(async move {
|
||||
let node = graph.nodes.get(node_id)
|
||||
.ok_or_else(|| WorkflowError::InvalidDiagram(format!("节点不存在: {node_id}")))?;
|
||||
let node = graph
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.ok_or_else(|| WorkflowError::InvalidDiagram(format!("节点不存在: {node_id}")))?;
|
||||
|
||||
match node.node_type {
|
||||
NodeType::EndEvent => {
|
||||
// 到达 EndEvent,不创建新 token
|
||||
// 检查实例是否所有 token 都完成
|
||||
Self::check_instance_completion(instance_id, tenant_id, txn).await?;
|
||||
Ok(vec![])
|
||||
}
|
||||
NodeType::ParallelGateway
|
||||
if Self::is_join_gateway(node_id, graph) =>
|
||||
{
|
||||
// 并行网关汇合:等待所有入边 token 到达
|
||||
Self::handle_join_gateway(
|
||||
instance_id,
|
||||
tenant_id,
|
||||
node_id,
|
||||
graph,
|
||||
variables,
|
||||
txn,
|
||||
)
|
||||
.await
|
||||
}
|
||||
NodeType::ServiceTask => {
|
||||
// ServiceTask 自动执行:当前阶段自动跳过(直接推进到后继节点)
|
||||
// 创建一个立即消费的 token 记录(用于审计追踪)
|
||||
let now = Utc::now();
|
||||
let system_user = uuid::Uuid::nil();
|
||||
let auto_token_id = Uuid::now_v7();
|
||||
|
||||
let token_model = token::ActiveModel {
|
||||
id: Set(auto_token_id),
|
||||
tenant_id: Set(tenant_id),
|
||||
instance_id: Set(instance_id),
|
||||
node_id: Set(node_id.to_string()),
|
||||
status: Set("consumed".to_string()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(system_user),
|
||||
updated_by: Set(system_user),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
consumed_at: Set(Some(now)),
|
||||
};
|
||||
token_model
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
|
||||
tracing::info!(node_id = node_id, node_name = %node.name, "ServiceTask 自动跳过(尚未实现 HTTP 调用)");
|
||||
|
||||
// 沿出边继续推进
|
||||
let outgoing = graph.get_outgoing_edges(node_id);
|
||||
let mut new_tokens = Vec::new();
|
||||
for edge in &outgoing {
|
||||
let tokens = Self::create_token_at_node(
|
||||
match node.node_type {
|
||||
NodeType::EndEvent => {
|
||||
// 到达 EndEvent,不创建新 token
|
||||
// 检查实例是否所有 token 都完成
|
||||
Self::check_instance_completion(instance_id, tenant_id, txn).await?;
|
||||
Ok(vec![])
|
||||
}
|
||||
NodeType::ParallelGateway if Self::is_join_gateway(node_id, graph) => {
|
||||
// 并行网关汇合:等待所有入边 token 到达
|
||||
Self::handle_join_gateway(
|
||||
instance_id,
|
||||
tenant_id,
|
||||
&edge.target,
|
||||
node_id,
|
||||
graph,
|
||||
variables,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
new_tokens.extend(tokens);
|
||||
}
|
||||
Ok(new_tokens)
|
||||
}
|
||||
_ => {
|
||||
// UserTask / 网关(分支)等:创建活跃 token
|
||||
let new_token_id = Uuid::now_v7();
|
||||
let now = Utc::now();
|
||||
|
||||
let system_user = uuid::Uuid::nil();
|
||||
|
||||
let token_model = token::ActiveModel {
|
||||
id: Set(new_token_id),
|
||||
tenant_id: Set(tenant_id),
|
||||
instance_id: Set(instance_id),
|
||||
node_id: Set(node_id.to_string()),
|
||||
status: Set("active".to_string()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(system_user),
|
||||
updated_by: Set(system_user),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
consumed_at: Set(None),
|
||||
};
|
||||
token_model
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
}
|
||||
NodeType::ServiceTask => {
|
||||
// ServiceTask 自动执行:当前阶段自动跳过(直接推进到后继节点)
|
||||
// 创建一个立即消费的 token 记录(用于审计追踪)
|
||||
let now = Utc::now();
|
||||
let system_user = uuid::Uuid::nil();
|
||||
let auto_token_id = Uuid::now_v7();
|
||||
|
||||
// UserTask: 同时创建 task 记录
|
||||
if node.node_type == NodeType::UserTask {
|
||||
let task_model = task::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
let token_model = token::ActiveModel {
|
||||
id: Set(auto_token_id),
|
||||
tenant_id: Set(tenant_id),
|
||||
instance_id: Set(instance_id),
|
||||
token_id: Set(new_token_id),
|
||||
node_id: Set(node_id.to_string()),
|
||||
node_name: Set(Some(node.name.clone())),
|
||||
assignee_id: Set(node.assignee_id),
|
||||
candidate_groups: Set(node.candidate_groups.as_ref()
|
||||
.map(|g| serde_json::to_value(g).unwrap_or_default())),
|
||||
status: Set("pending".to_string()),
|
||||
outcome: Set(None),
|
||||
form_data: Set(None),
|
||||
due_date: Set(None),
|
||||
completed_at: Set(None),
|
||||
status: Set("consumed".to_string()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(Uuid::nil()),
|
||||
updated_by: Set(Uuid::nil()),
|
||||
created_by: Set(system_user),
|
||||
updated_by: Set(system_user),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
consumed_at: Set(Some(now)),
|
||||
};
|
||||
task_model
|
||||
token_model
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(vec![new_token_id])
|
||||
tracing::info!(node_id = node_id, node_name = %node.name, "ServiceTask 自动跳过(尚未实现 HTTP 调用)");
|
||||
|
||||
// 沿出边继续推进
|
||||
let outgoing = graph.get_outgoing_edges(node_id);
|
||||
let mut new_tokens = Vec::new();
|
||||
for edge in &outgoing {
|
||||
let tokens = Self::create_token_at_node(
|
||||
instance_id,
|
||||
tenant_id,
|
||||
&edge.target,
|
||||
graph,
|
||||
variables,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
new_tokens.extend(tokens);
|
||||
}
|
||||
Ok(new_tokens)
|
||||
}
|
||||
_ => {
|
||||
// UserTask / 网关(分支)等:创建活跃 token
|
||||
let new_token_id = Uuid::now_v7();
|
||||
let now = Utc::now();
|
||||
|
||||
let system_user = uuid::Uuid::nil();
|
||||
|
||||
let token_model = token::ActiveModel {
|
||||
id: Set(new_token_id),
|
||||
tenant_id: Set(tenant_id),
|
||||
instance_id: Set(instance_id),
|
||||
node_id: Set(node_id.to_string()),
|
||||
status: Set("active".to_string()),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(system_user),
|
||||
updated_by: Set(system_user),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
consumed_at: Set(None),
|
||||
};
|
||||
token_model
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
|
||||
// UserTask: 同时创建 task 记录
|
||||
if node.node_type == NodeType::UserTask {
|
||||
let task_model = task::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
instance_id: Set(instance_id),
|
||||
token_id: Set(new_token_id),
|
||||
node_id: Set(node_id.to_string()),
|
||||
node_name: Set(Some(node.name.clone())),
|
||||
assignee_id: Set(node.assignee_id),
|
||||
candidate_groups: Set(node
|
||||
.candidate_groups
|
||||
.as_ref()
|
||||
.map(|g| serde_json::to_value(g).unwrap_or_default())),
|
||||
status: Set("pending".to_string()),
|
||||
outcome: Set(None),
|
||||
form_data: Set(None),
|
||||
due_date: Set(None),
|
||||
completed_at: Set(None),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(Uuid::nil()),
|
||||
updated_by: Set(Uuid::nil()),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
task_model
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(vec![new_token_id])
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -445,7 +450,10 @@ impl FlowExecutor {
|
||||
active.status = Set("completed".to_string());
|
||||
active.completed_at = Set(Some(Utc::now()));
|
||||
active.updated_at = Set(Utc::now());
|
||||
active.update(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
active
|
||||
.update(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
|
||||
// 写入完成事件到 outbox,由 relay 广播
|
||||
let now = Utc::now();
|
||||
@@ -461,7 +469,10 @@ impl FlowExecutor {
|
||||
created_at: Set(now),
|
||||
published_at: Set(None),
|
||||
};
|
||||
outbox_event.insert(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
outbox_event
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user