Files
erp/crates/erp-workflow/src/service/definition_service.rs
iven 685df5e458 feat(core): implement event outbox persistence
Add domain_events migration and SeaORM entity. Modify EventBus::publish
to persist events before broadcasting (best-effort: DB failure logs
warning but still broadcasts in-memory). Update all 19 publish call
sites across 4 crates to pass db reference.

Add outbox relay background task that polls pending events every 5s
and re-broadcasts them, ensuring no events are lost on server restart.
2026-04-12 00:10:49 +08:00

312 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
};
use uuid::Uuid;
use crate::dto::{
CreateProcessDefinitionReq, ProcessDefinitionResp, UpdateProcessDefinitionReq,
};
use crate::engine::parser;
use crate::entity::process_definition;
use crate::error::{WorkflowError, WorkflowResult};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::error::check_version;
use erp_core::events::EventBus;
use erp_core::types::Pagination;
/// 流程定义 CRUD 服务。
pub struct DefinitionService;
impl DefinitionService {
/// 分页查询流程定义列表。
pub async fn list(
tenant_id: Uuid,
pagination: &Pagination,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<(Vec<ProcessDefinitionResp>, u64)> {
let paginator = process_definition::Entity::find()
.filter(process_definition::Column::TenantId.eq(tenant_id))
.filter(process_definition::Column::DeletedAt.is_null())
.paginate(db, pagination.limit());
let total = paginator
.num_items()
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let resps: Vec<ProcessDefinitionResp> = models.iter().map(Self::model_to_resp).collect();
Ok((resps, total))
}
/// 获取单个流程定义。
pub async fn get_by_id(
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
Ok(Self::model_to_resp(&model))
}
/// 创建流程定义。
pub async fn create(
tenant_id: Uuid,
operator_id: Uuid,
req: &CreateProcessDefinitionReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessDefinitionResp> {
// 验证流程图合法性
parser::parse_and_validate(&req.nodes, &req.edges)?;
let now = Utc::now();
let id = Uuid::now_v7();
let nodes_json = serde_json::to_value(&req.nodes)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let edges_json = serde_json::to_value(&req.edges)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let model = process_definition::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
name: Set(req.name.clone()),
key: Set(req.key.clone()),
version: Set(1),
category: Set(req.category.clone()),
description: Set(req.description.clone()),
nodes: Set(nodes_json),
edges: Set(edges_json),
status: Set("draft".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version_field: Set(1),
};
model
.insert(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"process_definition.created",
tenant_id,
serde_json::json!({ "definition_id": id, "key": req.key }),
), db).await;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "process_definition.create", "process_definition")
.with_resource_id(id),
db,
)
.await;
Ok(ProcessDefinitionResp {
id,
name: req.name.clone(),
key: req.key.clone(),
version: 1,
category: req.category.clone(),
description: req.description.clone(),
nodes: serde_json::to_value(&req.nodes).unwrap_or_default(),
edges: serde_json::to_value(&req.edges).unwrap_or_default(),
status: "draft".to_string(),
created_at: now,
updated_at: now,
lock_version: 1,
})
}
/// 更新流程定义(仅 draft 状态可编辑)。
pub async fn update(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
req: &UpdateProcessDefinitionReq,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
if model.status != "draft" {
return Err(WorkflowError::InvalidState(
"只有 draft 状态的流程定义可以编辑".to_string(),
));
}
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
if let Some(name) = &req.name {
active.name = Set(name.clone());
}
if let Some(category) = &req.category {
active.category = Set(Some(category.clone()));
}
if let Some(description) = &req.description {
active.description = Set(Some(description.clone()));
}
if let Some(nodes) = &req.nodes {
// 验证新流程图
if let Some(edges) = &req.edges {
parser::parse_and_validate(nodes, edges)?;
}
let nodes_json = serde_json::to_value(nodes)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
active.nodes = Set(nodes_json);
}
if let Some(edges) = &req.edges {
let edges_json = serde_json::to_value(edges)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
active.edges = Set(edges_json);
}
let next_ver = check_version(req.version, current_version)
.map_err(|_| WorkflowError::VersionMismatch)?;
active.version_field = Set(next_ver);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "process_definition.update", "process_definition")
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 发布流程定义draft → published
pub async fn publish(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
if model.status != "draft" {
return Err(WorkflowError::InvalidState(
"只有 draft 状态的流程定义可以发布".to_string(),
));
}
// 验证流程图
let nodes: Vec<crate::dto::NodeDef> = serde_json::from_value(model.nodes.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?;
let edges: Vec<crate::dto::EdgeDef> = serde_json::from_value(model.edges.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?;
parser::parse_and_validate(&nodes, &edges)?;
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
active.status = Set("published".to_string());
active.version_field = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
event_bus.publish(erp_core::events::DomainEvent::new(
"process_definition.published",
tenant_id,
serde_json::json!({ "definition_id": id }),
), db).await;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "process_definition.publish", "process_definition")
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 软删除流程定义。
pub async fn delete(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
active.version_field = Set(current_version + 1);
active.deleted_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "process_definition.delete", "process_definition")
.with_resource_id(id),
db,
)
.await;
Ok(())
}
fn model_to_resp(m: &process_definition::Model) -> ProcessDefinitionResp {
ProcessDefinitionResp {
id: m.id,
name: m.name.clone(),
key: m.key.clone(),
version: m.version,
category: m.category.clone(),
description: m.description.clone(),
nodes: m.nodes.clone(),
edges: m.edges.clone(),
status: m.status.clone(),
created_at: m.created_at,
updated_at: m.updated_at,
lock_version: m.version_field,
}
}
}