feat: initialize Nuanji (Warm Notes) project

- Base platform from base.git (ERP base: auth, core, config, message, workflow, plugin)
- Created erp-diary module skeleton (lib.rs, dto.rs, error.rs, event.rs, state.rs)
- Integrated erp-diary into workspace and erp-server
- Added DiaryModule registration in main.rs
- Added DiaryState FromRef in state.rs
- Diary routes mounted (empty routes, ready for implementation)
- Product design spec v1.2 preserved in docs/
- Implementation plan preserved in plans/

Cargo check: OK
Cargo test: OK (78+ base tests passing)
This commit is contained in:
iven
2026-05-31 20:52:19 +08:00
commit c539e6fd83
285 changed files with 59156 additions and 0 deletions

View File

@@ -0,0 +1,205 @@
//! AI 行动闭环 BPMN 流程定义种子数据
//!
//! 三条流程:
//! - ai_followup_workflow — AI 随访建议审批
//! - ai_appointment_workflow — AI 预约建议审批
//! - ai_alert_workflow — AI 预警确认
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use uuid::Uuid;
use crate::entity::process_definition;
/// AI 随访审批流程
///
/// ```text
/// Start → ExclusiveGateway(风险分级)
/// → [low] → End (自动执行,由分发器直接处理)
/// → [medium] → UserTask(医生审批) → ExclusiveGateway → [approved] → End
/// → [rejected] → End
/// → [high] → UserTask(紧急确认) → ExclusiveGateway → [approved] → End
/// → [rejected] → End
/// ```
fn followup_nodes() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "start", "type": "StartEvent", "name": "AI 随访建议"},
{"id": "gw_risk", "type": "ExclusiveGateway", "name": "风险分级"},
{"id": "end_auto", "type": "EndEvent", "name": "自动完成"},
{"id": "doctor_review", "type": "UserTask", "name": "医生审批随访建议",
"candidate_groups": ["doctor"]},
{"id": "gw_outcome", "type": "ExclusiveGateway", "name": "审批结果"},
{"id": "end_approved", "type": "EndEvent", "name": "已批准"},
{"id": "end_rejected", "type": "EndEvent", "name": "已拒绝"}
]))
.unwrap()
}
fn followup_edges() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "e1", "source": "start", "target": "gw_risk"},
{"id": "e2", "source": "gw_risk", "target": "end_auto",
"condition": "risk_level == \"low\"", "label": "低风险"},
{"id": "e3", "source": "gw_risk", "target": "doctor_review",
"label": "中/高风险"},
{"id": "e4", "source": "doctor_review", "target": "gw_outcome"},
{"id": "e5", "source": "gw_outcome", "target": "end_approved",
"condition": "outcome == \"approved\"", "label": "批准"},
{"id": "e6", "source": "gw_outcome", "target": "end_rejected",
"condition": "outcome == \"rejected\"", "label": "拒绝"}
]))
.unwrap()
}
/// AI 预约审批流程
fn appointment_nodes() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "start", "type": "StartEvent", "name": "AI 预约建议"},
{"id": "gw_risk", "type": "ExclusiveGateway", "name": "风险分级"},
{"id": "end_auto", "type": "EndEvent", "name": "自动完成"},
{"id": "doctor_confirm", "type": "UserTask", "name": "医生确认预约建议",
"candidate_groups": ["doctor"]},
{"id": "gw_outcome", "type": "ExclusiveGateway", "name": "确认结果"},
{"id": "end_approved", "type": "EndEvent", "name": "已确认"},
{"id": "end_rejected", "type": "EndEvent", "name": "已拒绝"}
]))
.unwrap()
}
fn appointment_edges() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "e1", "source": "start", "target": "gw_risk"},
{"id": "e2", "source": "gw_risk", "target": "end_auto",
"condition": "risk_level == \"low\"", "label": "低风险"},
{"id": "e3", "source": "gw_risk", "target": "doctor_confirm",
"label": "中/高风险"},
{"id": "e4", "source": "doctor_confirm", "target": "gw_outcome"},
{"id": "e5", "source": "gw_outcome", "target": "end_approved",
"condition": "outcome == \"approved\"", "label": "确认"},
{"id": "e6", "source": "gw_outcome", "target": "end_rejected",
"condition": "outcome == \"rejected\"", "label": "拒绝"}
]))
.unwrap()
}
/// AI 预警确认流程
fn alert_nodes() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "start", "type": "StartEvent", "name": "AI 预警"},
{"id": "gw_risk", "type": "ExclusiveGateway", "name": "风险分级"},
{"id": "end_auto", "type": "EndEvent", "name": "已发送"},
{"id": "doctor_ack", "type": "UserTask", "name": "医生确认预警",
"candidate_groups": ["doctor"]},
{"id": "gw_outcome", "type": "ExclusiveGateway", "name": "确认结果"},
{"id": "end_acknowledged", "type": "EndEvent", "name": "已确认"},
{"id": "end_escalated", "type": "EndEvent", "name": "已升级"}
]))
.unwrap()
}
fn alert_edges() -> Vec<serde_json::Value> {
serde_json::from_value(serde_json::json!([
{"id": "e1", "source": "start", "target": "gw_risk"},
{"id": "e2", "source": "gw_risk", "target": "end_auto",
"condition": "risk_level == \"low\"", "label": "低风险"},
{"id": "e3", "source": "gw_risk", "target": "doctor_ack",
"label": "中/高风险"},
{"id": "e4", "source": "doctor_ack", "target": "gw_outcome"},
{"id": "e5", "source": "gw_outcome", "target": "end_acknowledged",
"condition": "outcome == \"approved\"", "label": "确认"},
{"id": "e6", "source": "gw_outcome", "target": "end_escalated",
"condition": "outcome == \"rejected\"", "label": "升级"}
]))
.unwrap()
}
struct WorkflowTemplate {
key: &'static str,
name: &'static str,
category: &'static str,
description: &'static str,
nodes: Vec<serde_json::Value>,
edges: Vec<serde_json::Value>,
}
fn all_templates() -> Vec<WorkflowTemplate> {
vec![
WorkflowTemplate {
key: "ai_followup_workflow",
name: "AI 随访建议审批",
category: "ai_action",
description: "AI 分析生成的随访建议,按风险等级自动执行或提交医生审批",
nodes: followup_nodes(),
edges: followup_edges(),
},
WorkflowTemplate {
key: "ai_appointment_workflow",
name: "AI 预约建议审批",
category: "ai_action",
description: "AI 分析生成的预约建议,按风险等级自动执行或提交医生确认",
nodes: appointment_nodes(),
edges: appointment_edges(),
},
WorkflowTemplate {
key: "ai_alert_workflow",
name: "AI 预警确认",
category: "ai_action",
description: "AI 分析生成的预警通知,按风险等级自动发送或提交医生确认",
nodes: alert_nodes(),
edges: alert_edges(),
},
]
}
/// 确保 AI 行动闭环的工作流定义存在(幂等)。
///
/// 对每个 tenant_id 检查 key 是否已存在,不存在则创建并发布。
pub async fn ensure_ai_workflows(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
) -> Result<(), sea_orm::DbErr> {
let system_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
for tmpl in all_templates() {
let exists = process_definition::Entity::find()
.filter(process_definition::Column::TenantId.eq(tenant_id))
.filter(process_definition::Column::Key.eq(tmpl.key))
.filter(process_definition::Column::DeletedAt.is_null())
.one(db)
.await?
.is_some();
if exists {
continue;
}
let now = Utc::now();
let id = Uuid::now_v7();
let active = process_definition::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
name: Set(tmpl.name.to_string()),
key: Set(tmpl.key.to_string()),
version: Set(1),
category: Set(Some(tmpl.category.to_string())),
description: Set(Some(tmpl.description.to_string())),
nodes: Set(serde_json::json!(tmpl.nodes)),
edges: Set(serde_json::json!(tmpl.edges)),
status: Set("published".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_id),
updated_by: Set(system_id),
deleted_at: Set(None),
version_field: Set(1),
};
active.insert(db).await?;
tracing::info!(
key = %tmpl.key,
tenant_id = %tenant_id,
"AI 工作流定义已创建"
);
}
Ok(())
}

View File

@@ -0,0 +1,417 @@
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set};
use uuid::Uuid;
use crate::dto::{CreateProcessDefinitionReq, ProcessDefinitionResp, UpdateProcessDefinitionReq};
use crate::engine::parser;
use crate::entity::process_definition;
use crate::error::{WorkflowError, WorkflowResult};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::error::check_version;
use erp_core::events::EventBus;
use erp_core::types::Pagination;
/// 流程定义 CRUD 服务。
pub struct DefinitionService;
impl DefinitionService {
/// 分页查询流程定义列表。
pub async fn list(
tenant_id: Uuid,
pagination: &Pagination,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<(Vec<ProcessDefinitionResp>, u64)> {
let paginator = process_definition::Entity::find()
.filter(process_definition::Column::TenantId.eq(tenant_id))
.filter(process_definition::Column::DeletedAt.is_null())
.paginate(db, pagination.limit());
let total = paginator
.num_items()
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let resps: Vec<ProcessDefinitionResp> = models.iter().map(Self::model_to_resp).collect();
Ok((resps, total))
}
/// 获取单个流程定义。
pub async fn get_by_id(
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
Ok(Self::model_to_resp(&model))
}
/// 创建流程定义。
pub async fn create(
tenant_id: Uuid,
operator_id: Uuid,
req: &CreateProcessDefinitionReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessDefinitionResp> {
// 验证流程图合法性
parser::parse_and_validate(&req.nodes, &req.edges)?;
let now = Utc::now();
let id = Uuid::now_v7();
let nodes_json = serde_json::to_value(&req.nodes)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let edges_json = serde_json::to_value(&req.edges)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let model = process_definition::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
name: Set(req.name.clone()),
key: Set(req.key.clone()),
version: Set(1),
category: Set(req.category.clone()),
description: Set(req.description.clone()),
nodes: Set(nodes_json),
edges: Set(edges_json),
status: Set("draft".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id),
updated_by: Set(operator_id),
deleted_at: Set(None),
version_field: Set(1),
};
model
.insert(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
event_bus
.publish(
erp_core::events::DomainEvent::new(
"process_definition.created",
tenant_id,
serde_json::json!({ "definition_id": id, "key": req.key }),
),
db,
)
.await;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_definition.create",
"process_definition",
)
.with_resource_id(id),
db,
)
.await;
Ok(ProcessDefinitionResp {
id,
name: req.name.clone(),
key: req.key.clone(),
version: 1,
category: req.category.clone(),
description: req.description.clone(),
nodes: serde_json::to_value(&req.nodes).unwrap_or_default(),
edges: serde_json::to_value(&req.edges).unwrap_or_default(),
status: "draft".to_string(),
created_at: now,
updated_at: now,
lock_version: 1,
})
}
/// 更新流程定义(仅 draft 状态可编辑)。
pub async fn update(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
req: &UpdateProcessDefinitionReq,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
if model.status != "draft" {
return Err(WorkflowError::InvalidState(
"只有 draft 状态的流程定义可以编辑".to_string(),
));
}
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
if let Some(name) = &req.name {
active.name = Set(name.clone());
}
if let Some(category) = &req.category {
active.category = Set(Some(category.clone()));
}
if let Some(description) = &req.description {
active.description = Set(Some(description.clone()));
}
// 当 nodes 或 edges 任一存在时,取最终值验证流程图完整性
let _final_nodes = req.nodes.as_ref().or_else(|| {
serde_json::from_value::<Vec<crate::dto::NodeDef>>(active.nodes.as_ref().clone())
.ok()
.as_ref()
.map(|_| unreachable!())
});
// 简化:如果提供了 nodes 或 edges将两者合并后验证
if req.nodes.is_some() || req.edges.is_some() {
let nodes_val = req
.nodes
.as_ref()
.map(|n| serde_json::to_value(n).unwrap_or_default())
.unwrap_or(active.nodes.as_ref().clone());
let edges_val = req
.edges
.as_ref()
.map(|e| serde_json::to_value(e).unwrap_or_default())
.unwrap_or(active.edges.as_ref().clone());
let nodes: Vec<crate::dto::NodeDef> = serde_json::from_value(nodes_val)
.map_err(|e| WorkflowError::Validation(format!("节点数据无效: {e}")))?;
let edges: Vec<crate::dto::EdgeDef> = serde_json::from_value(edges_val)
.map_err(|e| WorkflowError::Validation(format!("连线数据无效: {e}")))?;
parser::parse_and_validate(&nodes, &edges)?;
}
if let Some(nodes) = &req.nodes {
let nodes_json = serde_json::to_value(nodes)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
active.nodes = Set(nodes_json);
}
if let Some(edges) = &req.edges {
let edges_json = serde_json::to_value(edges)
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
active.edges = Set(edges_json);
}
let next_ver = check_version(req.version, current_version)
.map_err(|_| WorkflowError::VersionMismatch)?;
active.version_field = Set(next_ver);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_definition.update",
"process_definition",
)
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 发布流程定义draft → published
pub async fn publish(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
if model.status != "draft" {
return Err(WorkflowError::InvalidState(
"只有 draft 状态的流程定义可以发布".to_string(),
));
}
// 验证流程图
let nodes: Vec<crate::dto::NodeDef> = serde_json::from_value(model.nodes.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?;
let edges: Vec<crate::dto::EdgeDef> = serde_json::from_value(model.edges.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?;
parser::parse_and_validate(&nodes, &edges)?;
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
active.status = Set("published".to_string());
active.version_field = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
event_bus
.publish(
erp_core::events::DomainEvent::new(
"process_definition.published",
tenant_id,
serde_json::json!({ "definition_id": id }),
),
db,
)
.await;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_definition.publish",
"process_definition",
)
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 将已发布的流程定义标记为 deprecated。
pub async fn deprecate(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessDefinitionResp> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
if model.status != "published" {
return Err(WorkflowError::InvalidState(
"只有 published 状态的流程定义可以废弃".to_string(),
));
}
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
active.status = Set("deprecated".to_string());
active.version_field = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
event_bus
.publish(
erp_core::events::DomainEvent::new(
"process_definition.deprecated",
tenant_id,
serde_json::json!({ "definition_id": id }),
),
db,
)
.await;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_definition.deprecate",
"process_definition",
)
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 软删除流程定义。
pub async fn delete(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
let model = process_definition::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程定义不存在: {id}")))?;
let current_version = model.version_field;
let mut active: process_definition::ActiveModel = model.into();
active.version_field = Set(current_version + 1);
active.deleted_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_definition.delete",
"process_definition",
)
.with_resource_id(id),
db,
)
.await;
Ok(())
}
fn model_to_resp(m: &process_definition::Model) -> ProcessDefinitionResp {
ProcessDefinitionResp {
id: m.id,
name: m.name.clone(),
key: m.key.clone(),
version: m.version,
category: m.category.clone(),
description: m.description.clone(),
nodes: m.nodes.clone(),
edges: m.edges.clone(),
status: m.status.clone(),
created_at: m.created_at,
updated_at: m.updated_at,
lock_version: m.version_field,
}
}
}

View File

@@ -0,0 +1,424 @@
use std::collections::HashMap;
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set,
TransactionTrait,
};
use uuid::Uuid;
use crate::dto::{ProcessInstanceResp, StartInstanceReq, TokenResp};
use crate::engine::executor::FlowExecutor;
use crate::engine::parser;
use crate::entity::{process_definition, process_instance, process_variable, token};
use crate::error::{WorkflowError, WorkflowResult};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::EventBus;
use erp_core::types::Pagination;
/// 流程实例服务。
pub struct InstanceService;
impl InstanceService {
/// 启动流程实例。
pub async fn start(
tenant_id: Uuid,
operator_id: Uuid,
req: &StartInstanceReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<ProcessInstanceResp> {
// 查找流程定义
let definition = process_definition::Entity::find_by_id(req.definition_id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|d| d.tenant_id == tenant_id && d.deleted_at.is_none())
.ok_or_else(|| {
WorkflowError::NotFound(format!("流程定义不存在: {}", req.definition_id))
})?;
if definition.status != "published" {
return Err(WorkflowError::InvalidState(
"只能启动已发布的流程定义".to_string(),
));
}
// 解析流程图
let nodes: Vec<crate::dto::NodeDef> = serde_json::from_value(definition.nodes.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?;
let edges: Vec<crate::dto::EdgeDef> = serde_json::from_value(definition.edges.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?;
let graph = parser::parse_and_validate(&nodes, &edges)?;
// 准备流程变量
let mut variables = HashMap::new();
if let Some(vars) = &req.variables {
for v in vars {
let _var_type = v.var_type.as_deref().unwrap_or("string");
variables.insert(v.name.clone(), v.value.clone());
}
}
let instance_id = Uuid::now_v7();
let now = Utc::now();
// 在事务中创建实例、变量和 token
let instance_id_clone = instance_id;
let tenant_id_clone = tenant_id;
let operator_id_clone = operator_id;
let business_key = req.business_key.clone();
let definition_id = definition.id;
let definition_name = definition.name.clone();
let vars_to_save = req.variables.clone();
db.transaction::<_, (), WorkflowError>(|txn| {
let graph = graph.clone();
let variables = variables.clone();
Box::pin(async move {
// 创建流程实例
let instance = process_instance::ActiveModel {
id: Set(instance_id_clone),
tenant_id: Set(tenant_id_clone),
definition_id: Set(definition_id),
business_key: Set(business_key),
status: Set("running".to_string()),
started_by: Set(operator_id_clone),
started_at: Set(now),
completed_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(operator_id_clone),
updated_by: Set(operator_id_clone),
deleted_at: Set(None),
version: Set(1),
};
instance
.insert(txn)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
// 保存初始变量
if let Some(vars) = vars_to_save {
for v in vars {
Self::save_variable(
instance_id_clone,
tenant_id_clone,
&v.name,
v.var_type.as_deref().unwrap_or("string"),
&v.value,
txn,
)
.await?;
}
}
// 启动执行引擎
FlowExecutor::start(instance_id_clone, tenant_id_clone, &graph, &variables, txn)
.await?;
Ok(())
})
})
.await?;
event_bus.publish(erp_core::events::DomainEvent::new(
"process_instance.started",
tenant_id,
erp_core::events::build_event_payload(serde_json::json!({ "instance_id": instance_id, "definition_id": definition.id, "started_by": operator_id })),
), db).await;
audit_service::record(
AuditLog::new(
tenant_id,
Some(operator_id),
"process_instance.start",
"process_instance",
)
.with_resource_id(instance_id),
db,
)
.await;
// 查询创建后的实例(包含 token
let instance = process_instance::Entity::find_by_id(instance_id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {instance_id}")))?;
let active_tokens = Self::get_active_tokens(instance_id, db).await?;
Ok(ProcessInstanceResp {
id: instance.id,
definition_id: instance.definition_id,
definition_name: Some(definition_name),
business_key: instance.business_key,
status: instance.status,
started_by: instance.started_by,
started_at: instance.started_at,
completed_at: instance.completed_at,
created_at: instance.created_at,
active_tokens,
version: instance.version,
})
}
/// 分页查询流程实例。
pub async fn list(
tenant_id: Uuid,
pagination: &Pagination,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<(Vec<ProcessInstanceResp>, u64)> {
let paginator = process_instance::Entity::find()
.filter(process_instance::Column::TenantId.eq(tenant_id))
.filter(process_instance::Column::DeletedAt.is_null())
.paginate(db, pagination.limit());
let total = paginator
.num_items()
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let mut resps = Vec::new();
for m in &models {
let active_tokens = Self::get_active_tokens(m.id, db).await.unwrap_or_default();
let def_name = process_definition::Entity::find_by_id(m.definition_id)
.one(db)
.await
.ok()
.flatten()
.map(|d| d.name);
resps.push(ProcessInstanceResp {
id: m.id,
definition_id: m.definition_id,
definition_name: def_name,
business_key: m.business_key.clone(),
status: m.status.clone(),
started_by: m.started_by,
started_at: m.started_at,
completed_at: m.completed_at,
created_at: m.created_at,
active_tokens,
version: m.version,
});
}
Ok((resps, total))
}
/// 获取单个流程实例详情。
pub async fn get_by_id(
id: Uuid,
tenant_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<ProcessInstanceResp> {
let instance = process_instance::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {id}")))?;
let def_name = process_definition::Entity::find_by_id(instance.definition_id)
.one(db)
.await
.ok()
.flatten()
.map(|d| d.name);
let active_tokens = Self::get_active_tokens(id, db).await?;
Ok(ProcessInstanceResp {
id: instance.id,
definition_id: instance.definition_id,
definition_name: def_name,
business_key: instance.business_key,
status: instance.status,
started_by: instance.started_by,
started_at: instance.started_at,
completed_at: instance.completed_at,
created_at: instance.created_at,
active_tokens,
version: instance.version,
})
}
/// 挂起流程实例。
pub async fn suspend(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
Self::change_status(id, tenant_id, operator_id, "running", "suspended", db).await
}
/// 终止流程实例。
pub async fn terminate(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
Self::change_status(id, tenant_id, operator_id, "running", "terminated", db).await
}
/// 恢复已挂起的流程实例。
pub async fn resume(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
Self::change_status(id, tenant_id, operator_id, "suspended", "running", db).await
}
async fn change_status(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
from_status: &str,
to_status: &str,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<()> {
let instance = process_instance::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {id}")))?;
if instance.status != from_status {
return Err(WorkflowError::InvalidState(format!(
"流程实例状态不是 {},无法变更为 {}",
from_status, to_status
)));
}
let current_version = instance.version;
let mut active: process_instance::ActiveModel = instance.into();
active.status = Set(to_status.to_string());
active.version = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
// 发布状态变更领域事件(通过 outbox 模式,由 relay 广播)
let event_type = format!("process_instance.{}", to_status);
let event_id = Uuid::now_v7();
let now = Utc::now();
let outbox_event = erp_core::entity::domain_event::ActiveModel {
id: Set(event_id),
tenant_id: Set(tenant_id),
event_type: Set(event_type),
payload: Set(Some(erp_core::events::build_event_payload(
serde_json::json!({ "instance_id": id, "changed_by": operator_id }),
))),
correlation_id: Set(Some(Uuid::now_v7())),
status: Set("pending".to_string()),
attempts: Set(0),
last_error: Set(None),
created_at: Set(now),
published_at: Set(None),
};
match outbox_event.insert(db).await {
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "领域事件持久化失败"),
}
let action = format!("process_instance.{}", to_status);
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), action, "process_instance")
.with_resource_id(id),
db,
)
.await;
Ok(())
}
/// 获取实例的活跃 token 列表。
pub async fn get_active_tokens(
instance_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<Vec<TokenResp>> {
let tokens = token::Entity::find()
.filter(token::Column::InstanceId.eq(instance_id))
.filter(token::Column::Status.eq("active"))
.all(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
Ok(tokens
.iter()
.map(|t| TokenResp {
id: t.id,
node_id: t.node_id.clone(),
status: t.status.clone(),
created_at: t.created_at,
})
.collect())
}
/// 保存流程变量。
pub async fn save_variable(
instance_id: Uuid,
tenant_id: Uuid,
name: &str,
var_type: &str,
value: &serde_json::Value,
txn: &impl ConnectionTrait,
) -> WorkflowResult<()> {
let id = Uuid::now_v7();
let (value_string, value_number, value_boolean, _value_date): (
Option<String>,
Option<f64>,
Option<bool>,
Option<chrono::DateTime<Utc>>,
) = match var_type {
"string" => (value.as_str().map(|s| s.to_string()), None, None, None),
"number" => (None, value.as_f64(), None, None),
"boolean" => (None, None, value.as_bool(), None),
_ => (Some(value.to_string()), None, None, None),
};
let now = chrono::Utc::now();
let system_user = uuid::Uuid::nil();
let model = process_variable::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
instance_id: Set(instance_id),
name: Set(name.to_string()),
var_type: Set(var_type.to_string()),
value_string: Set(value_string),
value_number: Set(value_number),
value_boolean: Set(value_boolean),
value_date: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
};
model
.insert(txn)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
Ok(())
}
}

View File

@@ -0,0 +1,4 @@
pub mod ai_workflow_seed;
pub mod definition_service;
pub mod instance_service;
pub mod task_service;

View File

@@ -0,0 +1,445 @@
use std::collections::HashMap;
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait,
QueryFilter, Set, Statement, TransactionTrait,
};
use uuid::Uuid;
use crate::dto::{CompleteTaskReq, DelegateTaskReq, TaskResp};
use crate::engine::executor::FlowExecutor;
use crate::engine::parser;
use crate::entity::{process_definition, process_instance, task};
use crate::error::{WorkflowError, WorkflowResult};
use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::EventBus;
use erp_core::types::Pagination;
/// 任务服务。
pub struct TaskService;
impl TaskService {
/// 查询当前用户的待办任务。
pub async fn list_pending(
tenant_id: Uuid,
assignee_id: Uuid,
pagination: &Pagination,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<(Vec<TaskResp>, u64)> {
let paginator = task::Entity::find()
.filter(task::Column::TenantId.eq(tenant_id))
.filter(task::Column::AssigneeId.eq(assignee_id))
.filter(task::Column::Status.eq("pending"))
.filter(task::Column::DeletedAt.is_null())
.paginate(db, pagination.limit());
let total = paginator
.num_items()
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let mut resps = Vec::new();
for m in &models {
let mut resp = Self::model_to_resp(m);
// 附加实例信息
if let Some(inst) = process_instance::Entity::find_by_id(m.instance_id)
.one(db)
.await
.ok()
.flatten()
{
resp.business_key = inst.business_key;
if let Some(def) = process_definition::Entity::find_by_id(inst.definition_id)
.one(db)
.await
.ok()
.flatten()
{
resp.definition_name = Some(def.name);
}
}
resps.push(resp);
}
Ok((resps, total))
}
/// 查询当前用户的已办任务。
pub async fn list_completed(
tenant_id: Uuid,
assignee_id: Uuid,
pagination: &Pagination,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<(Vec<TaskResp>, u64)> {
let paginator = task::Entity::find()
.filter(task::Column::TenantId.eq(tenant_id))
.filter(task::Column::AssigneeId.eq(assignee_id))
.filter(task::Column::Status.is_in(["completed", "approved", "rejected", "delegated"]))
.filter(task::Column::DeletedAt.is_null())
.paginate(db, pagination.limit());
let total = paginator
.num_items()
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let page_index = pagination.page.unwrap_or(1).saturating_sub(1);
let models = paginator
.fetch_page(page_index)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let mut resps = Vec::new();
for m in &models {
let mut resp = Self::model_to_resp(m);
if let Some(inst) = process_instance::Entity::find_by_id(m.instance_id)
.one(db)
.await
.ok()
.flatten()
{
resp.business_key = inst.business_key;
if let Some(def) = process_definition::Entity::find_by_id(inst.definition_id)
.one(db)
.await
.ok()
.flatten()
{
resp.definition_name = Some(def.name);
}
}
resps.push(resp);
}
Ok((resps, total))
}
/// 完成任务:更新任务状态 + 推进 token。
pub async fn complete(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
req: &CompleteTaskReq,
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> WorkflowResult<TaskResp> {
let task_model = task::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?;
if task_model.status != "pending" {
return Err(WorkflowError::InvalidState(
"任务状态不是 pending无法完成".to_string(),
));
}
// 验证操作者是当前处理人
if task_model.assignee_id != Some(operator_id) {
return Err(WorkflowError::InvalidState(
"只有当前处理人才能完成任务".to_string(),
));
}
let instance_id = task_model.instance_id;
let token_id = task_model.token_id;
// 获取流程定义和流程图
let instance = process_instance::Entity::find_by_id(instance_id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|i| i.tenant_id == tenant_id && i.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("流程实例不存在: {instance_id}")))?;
let definition = process_definition::Entity::find_by_id(instance.definition_id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|d| d.tenant_id == tenant_id && d.deleted_at.is_none())
.ok_or_else(|| {
WorkflowError::NotFound(format!("流程定义不存在: {}", instance.definition_id))
})?;
if instance.status != "running" {
return Err(WorkflowError::InvalidState(format!(
"流程实例状态不是 running: {}",
instance.status
)));
}
let nodes: Vec<crate::dto::NodeDef> = serde_json::from_value(definition.nodes.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("节点数据无效: {e}")))?;
let edges: Vec<crate::dto::EdgeDef> = serde_json::from_value(definition.edges.clone())
.map_err(|e| WorkflowError::InvalidDiagram(format!("连线数据无效: {e}")))?;
let graph = parser::parse_and_validate(&nodes, &edges)?;
// 准备变量(从 req.form_data 中提取)
let mut variables = HashMap::new();
if let Some(form) = &req.form_data
&& let Some(obj) = form.as_object()
{
for (k, v) in obj {
variables.insert(k.clone(), v.clone());
}
}
// 在事务中更新任务 + 推进 token
let now = Utc::now();
let outcome = req.outcome.clone();
let form_data = req.form_data.clone();
db.transaction::<_, (), WorkflowError>(|txn| {
let graph = graph.clone();
let variables = variables.clone();
let task_model = task_model.clone();
Box::pin(async move {
// 更新任务状态
let current_version = task_model.version;
let mut active: task::ActiveModel = task_model.clone().into();
active.status = Set("completed".to_string());
active.outcome = Set(Some(outcome));
active.form_data = Set(form_data);
active.completed_at = Set(Some(now));
active.version = Set(current_version + 1);
active.updated_at = Set(now);
active.updated_by = Set(operator_id);
active
.update(txn)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
// 推进 token
FlowExecutor::advance(token_id, instance_id, tenant_id, &graph, &variables, txn)
.await?;
Ok(())
})
})
.await?;
event_bus
.publish(
erp_core::events::DomainEvent::new(
"task.completed",
tenant_id,
serde_json::json!({
"task_id": id,
"instance_id": instance_id,
"started_by": instance.started_by,
"outcome": req.outcome,
}),
),
db,
)
.await;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "task.complete", "task")
.with_resource_id(id),
db,
)
.await;
// 重新查询任务
let updated = task::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?;
Ok(Self::model_to_resp(&updated))
}
/// 委派任务给其他人。
pub async fn delegate(
id: Uuid,
tenant_id: Uuid,
operator_id: Uuid,
req: &DelegateTaskReq,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<TaskResp> {
let task_model = task::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?;
if task_model.status != "pending" {
return Err(WorkflowError::InvalidState(
"任务状态不是 pending无法委派".to_string(),
));
}
// 验证操作者是当前处理人
if task_model.assignee_id != Some(operator_id) {
return Err(WorkflowError::InvalidState(
"只有当前处理人才能委派任务".to_string(),
));
}
// 验证目标用户属于同一租户(使用 raw SQL 避免跨模块依赖 erp-auth
let result = db.query_one(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"SELECT EXISTS(SELECT 1 FROM users WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL AND status = 'active') AS ok",
[req.delegate_to.into(), tenant_id.into()],
))
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
let target_ok = result
.and_then(|r| r.try_get::<bool>("", "ok").ok())
.unwrap_or(false);
if !target_ok {
return Err(WorkflowError::Validation(
"委派目标用户不存在或不属于当前租户".to_string(),
));
}
let current_version = task_model.version;
let mut active: task::ActiveModel = task_model.into();
active.assignee_id = Set(Some(req.delegate_to));
active.version = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(operator_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), "task.delegate", "task")
.with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
/// 创建任务记录(由执行引擎调用)。
#[allow(clippy::too_many_arguments)]
pub async fn create_task(
instance_id: Uuid,
tenant_id: Uuid,
token_id: Uuid,
node_id: &str,
node_name: Option<&str>,
assignee_id: Option<Uuid>,
candidate_groups: Option<Vec<String>>,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<Uuid> {
let id = Uuid::now_v7();
let now = Utc::now();
let system_user = Uuid::nil();
let model = task::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
instance_id: Set(instance_id),
token_id: Set(token_id),
node_id: Set(node_id.to_string()),
node_name: Set(node_name.map(|s| s.to_string())),
assignee_id: Set(assignee_id),
candidate_groups: Set(
candidate_groups.map(|g| serde_json::to_value(g).unwrap_or_default())
),
status: Set("pending".to_string()),
outcome: Set(None),
form_data: Set(None),
due_date: Set(None),
completed_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
};
model
.insert(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
Ok(id)
}
/// 认领任务:将 pending 状态的任务分配给当前用户。
///
/// 适用于 candidate_groups 群组任务池中的任务,用户主动认领后
/// 任务状态变为 in_progressassignee_id 设置为认领用户。
pub async fn claim(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<TaskResp> {
let task_model = task::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?;
if task_model.status != "pending" {
return Err(WorkflowError::InvalidState(format!(
"任务状态不是 pending当前状态: {}),无法认领",
task_model.status
)));
}
let current_version = task_model.version;
let mut active: task::ActiveModel = task_model.into();
active.assignee_id = Set(Some(user_id));
active.status = Set("in_progress".to_string());
active.version = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(user_id), "task.claim", "task").with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
fn model_to_resp(m: &task::Model) -> TaskResp {
TaskResp {
id: m.id,
instance_id: m.instance_id,
token_id: m.token_id,
node_id: m.node_id.clone(),
node_name: m.node_name.clone(),
assignee_id: m.assignee_id,
candidate_groups: m.candidate_groups.clone(),
status: m.status.clone(),
outcome: m.outcome.clone(),
form_data: m.form_data.clone(),
due_date: m.due_date,
completed_at: m.completed_at,
created_at: m.created_at,
definition_name: None,
business_key: None,
version: m.version,
}
}
}