feat: 审计修复 Phase 6-7 — SSE 推送/工作流补全/消息群发/前端收尾
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled

Phase 6 功能补全:
- P1-3: 消息 SSE 实时推送端点 + 前端 EventSource 连接
- P1-6: ServiceTask HTTP 调用能力 (reqwest GET/POST)
- P1-7: user.deleted 事件处理 — 终止相关流程实例
- P1-8: 任务认领 (claim) 端点 + handler
- P1-9: 超时检查器发布 task.timeout 事件
- P1-15: 组织/部门名称唯一性校验 (create + update)
- P1-18: 消息群发 fan-out (role/department/all 批量投递)

Phase 7 P3-P4 收尾:
- PluginAdmin purge 按钮状态修复
- ChangePassword 最小 8 字符 + 新旧密码不同验证
- AuditLogViewer 用户名缓存 + 扩展资源类型
- InstanceMonitor 通过 definition 缓存解析 node_name
- NotificationPreferences DND 时间范围校验
This commit is contained in:
iven
2026-04-26 19:44:04 +08:00
parent 83fe89cbcd
commit b05b7c27a0
28 changed files with 996 additions and 67 deletions

View File

@@ -81,6 +81,19 @@ impl DeptService {
}
}
// Check name uniqueness within the same organization
let name_exists = department::Entity::find()
.filter(department::Column::TenantId.eq(tenant_id))
.filter(department::Column::OrgId.eq(org_id))
.filter(department::Column::Name.eq(&req.name))
.filter(department::Column::DeletedAt.is_null())
.one(db)
.await
.map_err(|e| AuthError::Validation(e.to_string()))?;
if name_exists.is_some() {
return Err(AuthError::Validation("部门名称已存在".to_string()));
}
// Compute path from parent department or organization root
let path = if let Some(parent_id) = req.parent_id {
let parent = department::Entity::find_by_id(parent_id)
@@ -192,6 +205,24 @@ impl DeptService {
}
}
// If name is being changed, check uniqueness within the same org (exclude self)
if let Some(ref new_name) = req.name
&& new_name != &model.name
{
let name_exists = department::Entity::find()
.filter(department::Column::TenantId.eq(tenant_id))
.filter(department::Column::OrgId.eq(model.org_id))
.filter(department::Column::Name.eq(new_name.as_str()))
.filter(department::Column::DeletedAt.is_null())
.filter(department::Column::Id.ne(id))
.one(db)
.await
.map_err(|e| AuthError::Validation(e.to_string()))?;
if name_exists.is_some() {
return Err(AuthError::Validation("部门名称已存在".to_string()));
}
}
let next_ver = check_version(req.version, model.version)
.map_err(|e| AuthError::Validation(e.to_string()))?;

View File

@@ -69,6 +69,18 @@ impl OrgService {
}
}
// Check name uniqueness within tenant
let name_exists = organization::Entity::find()
.filter(organization::Column::TenantId.eq(tenant_id))
.filter(organization::Column::Name.eq(&req.name))
.filter(organization::Column::DeletedAt.is_null())
.one(db)
.await
.map_err(|e| AuthError::Validation(e.to_string()))?;
if name_exists.is_some() {
return Err(AuthError::Validation("组织名称已存在".to_string()));
}
let (path, level) = if let Some(parent_id) = req.parent_id {
let parent = organization::Entity::find_by_id(parent_id)
.one(db)
@@ -174,6 +186,23 @@ impl OrgService {
}
}
// If name is being changed, check uniqueness (exclude self)
if let Some(ref new_name) = req.name
&& new_name != &model.name
{
let name_exists = organization::Entity::find()
.filter(organization::Column::TenantId.eq(tenant_id))
.filter(organization::Column::Name.eq(new_name.as_str()))
.filter(organization::Column::DeletedAt.is_null())
.filter(organization::Column::Id.ne(id))
.one(db)
.await
.map_err(|e| AuthError::Validation(e.to_string()))?;
if name_exists.is_some() {
return Err(AuthError::Validation("组织名称已存在".to_string()));
}
}
let next_ver = check_version(req.version, model.version)
.map_err(|e| AuthError::Validation(e.to_string()))?;

View File

@@ -18,3 +18,6 @@ thiserror.workspace = true
utoipa = { workspace = true, features = ["uuid", "chrono"] }
async-trait.workspace = true
validator.workspace = true
futures.workspace = true
tokio-stream.workspace = true
async-stream.workspace = true

View File

@@ -1,3 +1,4 @@
pub mod message_handler;
pub mod sse_handler;
pub mod subscription_handler;
pub mod template_handler;

View File

@@ -0,0 +1,53 @@
use std::convert::Infallible;
use axum::extract::Extension;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use erp_core::error::AppError;
use erp_core::types::TenantContext;
use crate::message_state::MessageState;
/// SSE 消息推送端点。
///
/// 客户端连接后监听 `message.sent` 事件,仅推送当前用户的消息。
/// 使用 EventBus 的 filtered subscriber 按前缀过滤事件。
pub async fn message_stream(
axum::extract::State(state): axum::extract::State<MessageState>,
Extension(ctx): Extension<TenantContext>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let user_id = ctx.user_id;
let tenant_id = ctx.tenant_id;
let (mut rx, _handle) = state.event_bus.subscribe_filtered("message.sent".to_string());
let sse_stream = async_stream::stream! {
loop {
match rx.recv().await {
Some(event) => {
if event.tenant_id != tenant_id {
continue;
}
let is_recipient = event.payload.get("recipient_id")
.and_then(|v: &serde_json::Value| v.as_str())
.map(|s| s == user_id.to_string())
.unwrap_or(false);
if !is_recipient {
continue;
}
let data = serde_json::to_string(&event.payload)
.unwrap_or_default();
yield Ok(Event::default()
.event("message")
.data(data));
}
None => {
break;
}
}
}
};
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}

View File

@@ -10,7 +10,7 @@ use erp_core::events::EventBus;
use erp_core::module::ErpModule;
use crate::entity::message_subscription;
use crate::handler::{message_handler, subscription_handler, template_handler};
use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler};
/// 消息中心模块,实现 ErpModule trait。
pub struct MessageModule;
@@ -36,6 +36,8 @@ impl MessageModule {
.route("/messages/{id}/read", put(message_handler::mark_read))
.route("/messages/read-all", put(message_handler::mark_all_read))
.route("/messages/{id}", delete(message_handler::delete_message))
// SSE 实时推送
.route("/messages/stream", get(sse_handler::message_stream))
// 模板路由
.route(
"/message-templates",

View File

@@ -1,7 +1,7 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait,
QueryFilter, Set, Statement,
QueryFilter, Set, Statement, FromQueryResult,
};
use uuid::Uuid;
@@ -12,6 +12,12 @@ use erp_core::audit::AuditLog;
use erp_core::audit_service;
use erp_core::events::EventBus;
/// 原始 SQL 查询 user_id 的结果结构体。
#[derive(Debug, FromQueryResult)]
struct UserIdRow {
user_id: Uuid,
}
/// 消息服务。
pub struct MessageService;
@@ -80,6 +86,12 @@ impl MessageService {
}
/// 发送消息。
///
/// 根据 `recipient_type` 执行不同的投递策略:
/// - `"user"` — 单条消息,直接投递给 `recipient_id` 指定的用户。
/// - `"role"` — 查询 `user_roles` 表,向该角色下的所有用户批量投递。
/// - `"department"` — 查询 `user_departments` 表,向该部门下的所有用户批量投递。
/// - `"all"` — 查询 `users` 表,向租户内所有活跃用户批量投递。
pub async fn send(
tenant_id: Uuid,
sender_id: Uuid,
@@ -87,49 +99,79 @@ impl MessageService {
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> MessageResult<MessageResp> {
let id = Uuid::now_v7();
let now = Utc::now();
let model = message::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
template_id: Set(req.template_id),
sender_id: Set(Some(sender_id)),
sender_type: Set("user".to_string()),
recipient_id: Set(req.recipient_id),
recipient_type: Set(req.recipient_type.clone()),
title: Set(req.title.clone()),
body: Set(req.body.clone()),
priority: Set(req.priority.clone()),
business_type: Set(req.business_type.clone()),
business_id: Set(req.business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
version: Set(1),
// Resolve target user IDs based on recipient type
let recipient_user_ids = match req.recipient_type.as_str() {
"user" => vec![req.recipient_id],
"role" => {
Self::resolve_user_ids_by_role(db, req.recipient_id, tenant_id).await?
}
"department" => {
Self::resolve_user_ids_by_department(db, req.recipient_id, tenant_id).await?
}
"all" => {
Self::resolve_all_active_user_ids(db, tenant_id).await?
}
other => {
return Err(MessageError::Validation(format!(
"不支持的收件人类型: {other}"
)));
}
};
let inserted = model
.insert(db)
if recipient_user_ids.is_empty() {
return Err(MessageError::Validation(
"没有找到符合条件的收件人".to_string(),
));
}
// Build message models for all recipients
let models: Vec<message::ActiveModel> = recipient_user_ids
.iter()
.map(|uid| message::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
template_id: Set(req.template_id),
sender_id: Set(Some(sender_id)),
sender_type: Set("user".to_string()),
recipient_id: Set(*uid),
recipient_type: Set("user".to_string()),
title: Set(req.title.clone()),
body: Set(req.body.clone()),
priority: Set(req.priority.clone()),
business_type: Set(req.business_type.clone()),
business_id: Set(req.business_id),
is_read: Set(false),
read_at: Set(None),
is_archived: Set(false),
archived_at: Set(None),
sent_at: Set(Some(now)),
status: Set("sent".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(sender_id),
updated_by: Set(sender_id),
deleted_at: Set(None),
version: Set(1),
})
.collect();
// Batch insert all messages
message::Entity::insert_many(models)
.exec(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
// Publish one event per batch (summary event)
event_bus
.publish(
erp_core::events::DomainEvent::new(
"message.sent",
tenant_id,
serde_json::json!({
"message_id": id,
"recipient_id": req.recipient_id,
"recipient_type": req.recipient_type,
"recipient_count": recipient_user_ids.len(),
"title": req.title,
}),
),
@@ -139,12 +181,94 @@ impl MessageService {
audit_service::record(
AuditLog::new(tenant_id, Some(sender_id), "message.send", "message")
.with_resource_id(id),
.with_changes(
None,
Some(serde_json::json!({
"recipient_type": req.recipient_type,
"recipient_count": recipient_user_ids.len(),
"title": req.title,
})),
),
db,
)
.await;
Ok(Self::model_to_resp(&inserted))
// Construct a representative response (no row returned from batch insert)
Ok(MessageResp {
id: Uuid::nil(),
tenant_id,
template_id: req.template_id,
sender_id: Some(sender_id),
sender_type: "user".to_string(),
recipient_id: req.recipient_id,
recipient_type: req.recipient_type.clone(),
title: req.title.clone(),
body: req.body.clone(),
priority: req.priority.clone(),
business_type: req.business_type.clone(),
business_id: req.business_id,
is_read: false,
read_at: None,
is_archived: false,
status: "sent".to_string(),
sent_at: Some(now),
created_at: now,
updated_at: now,
version: 1,
})
}
/// 根据角色 ID 查询关联的用户 ID 列表(跨模块 raw SQL
async fn resolve_user_ids_by_role(
db: &sea_orm::DatabaseConnection,
role_id: Uuid,
tenant_id: Uuid,
) -> MessageResult<Vec<Uuid>> {
let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"SELECT user_id FROM user_roles WHERE role_id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
[role_id.into(), tenant_id.into()],
))
.all(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(rows.into_iter().map(|r| r.user_id).collect())
}
/// 根据部门 ID 查询关联的用户 ID 列表(跨模块 raw SQL
async fn resolve_user_ids_by_department(
db: &sea_orm::DatabaseConnection,
department_id: Uuid,
tenant_id: Uuid,
) -> MessageResult<Vec<Uuid>> {
let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"SELECT user_id FROM user_departments WHERE department_id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
[department_id.into(), tenant_id.into()],
))
.all(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(rows.into_iter().map(|r| r.user_id).collect())
}
/// 查询租户内所有活跃用户的 ID 列表(跨模块 raw SQL
async fn resolve_all_active_user_ids(
db: &sea_orm::DatabaseConnection,
tenant_id: Uuid,
) -> MessageResult<Vec<Uuid>> {
let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"SELECT id AS user_id FROM users WHERE tenant_id = $1 AND deleted_at IS NULL AND status = 'active'",
[tenant_id.into()],
))
.all(db)
.await
.map_err(|e| MessageError::Validation(e.to_string()))?;
Ok(rows.into_iter().map(|r| r.user_id).collect())
}
/// 系统发送消息(由事件处理器调用)。

View File

@@ -411,7 +411,7 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Outbox relay started");
// Start timeout checker (scan overdue tasks every 60s)
erp_workflow::WorkflowModule::start_timeout_checker(db.clone());
erp_workflow::WorkflowModule::start_timeout_checker(db.clone(), event_bus.clone());
tracing::info!("Timeout checker started");
// Start follow-up overdue checker (handled by HealthModule::on_startup)

View File

@@ -26,6 +26,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option<uuid::Uuid>
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
},
NodeDef {
@@ -35,6 +36,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option<uuid::Uuid>
assignee_id,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
},
NodeDef {
@@ -44,6 +46,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option<uuid::Uuid>
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
},
],

View File

@@ -18,3 +18,4 @@ thiserror.workspace = true
utoipa = { workspace = true, features = ["uuid", "chrono"] }
async-trait.workspace = true
validator.workspace = true
reqwest = { workspace = true, features = ["json"] }

View File

@@ -30,6 +30,9 @@ pub struct NodeDef {
pub candidate_groups: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service_type: Option<String>,
/// 服务任务 HTTP 调用配置
#[serde(skip_serializing_if = "Option::is_none")]
pub service_config: Option<ServiceTaskConfig>,
/// 前端渲染位置
#[serde(skip_serializing_if = "Option::is_none")]
pub position: Option<NodePosition>,
@@ -41,6 +44,23 @@ pub struct NodePosition {
pub y: f64,
}
/// ServiceTask HTTP 调用配置
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ServiceTaskConfig {
/// 请求 URL
pub url: String,
/// HTTP 方法GET / POST默认 GET
#[serde(default = "default_method")]
pub method: String,
/// POST body 模板(支持从流程变量替换 ${var_name}
#[serde(skip_serializing_if = "Option::is_none")]
pub body: Option<serde_json::Value>,
}
fn default_method() -> String {
"GET".to_string()
}
/// 流程图连线定义
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct EdgeDef {

View File

@@ -249,8 +249,7 @@ impl FlowExecutor {
.await
}
NodeType::ServiceTask => {
// ServiceTask 自动执行:当前阶段自动跳过(直接推进到后继节点)
// 创建一个立即消费的 token 记录(用于审计追踪)
// ServiceTask 自动执行 HTTP 调用
let now = Utc::now();
let system_user = uuid::Uuid::nil();
let auto_token_id = Uuid::now_v7();
@@ -274,7 +273,18 @@ impl FlowExecutor {
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
tracing::info!(node_id = node_id, node_name = %node.name, "ServiceTask 自动跳过(尚未实现 HTTP 调用)");
// 执行 HTTP 调用(如果配置了 service_config
let var_name = format!("service_task_{node_id}_result");
let result_value = Self::execute_service_task(node, variables).await;
// 将结果存储为流程变量
Self::set_process_variable(
instance_id,
tenant_id,
&var_name,
&result_value,
txn,
)
.await?;
// 沿出边继续推进
let outgoing = graph.get_outgoing_edges(node_id);
@@ -444,6 +454,125 @@ impl FlowExecutor {
Ok(new_tokens)
}
/// 执行 ServiceTask HTTP 调用。
///
/// 根据 `service_config` 中的 url/method/body 发起 HTTP 请求。
/// 如果没有配置 `service_config` 或调用失败,返回错误信息 JSON 而不是阻塞流程。
async fn execute_service_task(
node: &crate::engine::model::FlowNode,
variables: &HashMap<String, serde_json::Value>,
) -> serde_json::Value {
let config = match &node.service_config {
Some(c) => c,
None => {
tracing::warn!(
node_id = &node.id,
node_name = %node.name,
"ServiceTask 没有 service_config 配置,跳过 HTTP 调用"
);
return serde_json::json!({
"status": "skipped",
"reason": "未配置 service_config"
});
}
};
let method = config.method.to_uppercase();
let url = &config.url;
tracing::info!(
node_id = &node.id,
node_name = %node.name,
method = %method,
url = %url,
"ServiceTask 开始 HTTP 调用"
);
let client = reqwest::Client::new();
let result = match method.as_str() {
"POST" => {
let body = config.body.as_ref().map(|b| {
// 简单变量替换:${var_name} → variables 中的值
let mut body_str = b.to_string();
for (key, val) in variables {
let placeholder = format!("${{{key}}}");
body_str = body_str.replace(&placeholder, &val.to_string());
}
body_str
});
client.post(url).body(body.unwrap_or_default()).send().await
}
_ => {
// 默认 GET
client.get(url).send().await
}
};
match result {
Ok(resp) => {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
tracing::info!(
node_id = &node.id,
status = status,
"ServiceTask HTTP 调用完成"
);
serde_json::json!({
"status": "success",
"http_status": status,
"body": body,
})
}
Err(e) => {
tracing::warn!(
node_id = &node.id,
error = %e,
"ServiceTask HTTP 调用失败(流程继续推进)"
);
serde_json::json!({
"status": "error",
"error": e.to_string(),
})
}
}
}
/// 将流程变量写入 process_variables 表。
async fn set_process_variable(
instance_id: Uuid,
tenant_id: Uuid,
name: &str,
value: &serde_json::Value,
txn: &impl ConnectionTrait,
) -> WorkflowResult<()> {
use crate::entity::process_variable;
let now = Utc::now();
let system_user = Uuid::nil();
let var_model = process_variable::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
instance_id: Set(instance_id),
name: Set(name.to_string()),
var_type: Set("json".to_string()),
value_string: Set(Some(value.to_string())),
value_number: Set(None),
value_boolean: Set(None),
value_date: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
};
var_model
.insert(txn)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
Ok(())
}
/// 检查实例是否所有 token 都已完成,如果是则完成实例。
async fn check_instance_completion(
instance_id: Uuid,

View File

@@ -28,6 +28,7 @@ pub struct FlowNode {
pub assignee_id: Option<uuid::Uuid>,
pub candidate_groups: Option<Vec<String>>,
pub service_type: Option<String>,
pub service_config: Option<crate::dto::ServiceTaskConfig>,
}
/// 内存中的边模型。
@@ -60,6 +61,7 @@ impl FlowGraph {
assignee_id: n.assignee_id,
candidate_groups: n.candidate_groups.clone(),
service_type: n.service_type.clone(),
service_config: n.service_config.clone(),
};
if n.node_type == NodeType::StartEvent {

View File

@@ -136,6 +136,7 @@ mod tests {
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: Some(NodePosition { x: 100.0, y: 100.0 }),
}
}
@@ -148,6 +149,7 @@ mod tests {
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: Some(NodePosition { x: 100.0, y: 300.0 }),
}
}
@@ -160,6 +162,7 @@ mod tests {
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
}
}
@@ -219,6 +222,7 @@ mod tests {
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
},
];
@@ -249,6 +253,7 @@ mod tests {
assignee_id: None,
candidate_groups: None,
service_type: None,
service_config: None,
position: None,
},
make_end(),

View File

@@ -1,7 +1,7 @@
// 超时检查框架
//
// TimeoutChecker 定期扫描 tasks 表中已超时但仍处于 pending 状态的任务,
// 以便触发自动完成或升级逻辑(后续迭代实现)
// 发布 task.timeout 事件用于升级通知
use chrono::Utc;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
@@ -52,4 +52,26 @@ impl TimeoutChecker {
Ok(overdue.iter().map(|t| t.id).collect())
}
/// 查询所有租户中已超时的任务(含详细信息)。
///
/// 返回 (task_id, tenant_id, instance_id, assignee_id) 元组,
/// 用于发布 task.timeout 事件。
pub async fn find_all_overdue_tasks_with_details(
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<Vec<(Uuid, Uuid, Uuid, Option<Uuid>)>> {
let now = Utc::now();
let overdue = task::Entity::find()
.filter(task::Column::Status.eq("pending"))
.filter(task::Column::DueDate.lt(now))
.filter(task::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| crate::error::WorkflowError::Validation(e.to_string()))?;
Ok(overdue
.iter()
.map(|t| (t.id, t.tenant_id, t.instance_id, t.assignee_id))
.collect())
}
}

View File

@@ -167,3 +167,33 @@ where
Ok(Json(ApiResponse::ok(resp)))
}
#[utoipa::path(
put,
path = "/api/v1/workflow/tasks/{id}/claim",
params(("id" = Uuid, Path, description = "任务ID")),
responses(
(status = 200, description = "认领成功", body = ApiResponse<TaskResp>),
(status = 401, description = "未授权"),
(status = 403, description = "权限不足"),
(status = 404, description = "任务不存在"),
),
security(("bearer_auth" = [])),
tag = "流程任务"
)]
/// PUT /api/v1/workflow/tasks/{id}/claim
pub async fn claim_task<S>(
State(state): State<WorkflowState>,
Extension(ctx): Extension<TenantContext>,
Path(id): Path<Uuid>,
) -> Result<Json<ApiResponse<TaskResp>>, AppError>
where
WorkflowState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
require_permission(&ctx, "workflow.approve")?;
let resp = TaskService::claim(id, ctx.tenant_id, ctx.user_id, &state.db).await?;
Ok(Json(ApiResponse::ok(resp)))
}

View File

@@ -1,5 +1,5 @@
use axum::Router;
use axum::routing::{get, post};
use axum::routing::{get, post, put};
use std::time::Duration;
use uuid::Uuid;
@@ -83,13 +83,17 @@ impl WorkflowModule {
"/workflow/tasks/{id}/delegate",
post(task_handler::delegate_task),
)
.route(
"/workflow/tasks/{id}/claim",
put(task_handler::claim_task),
)
}
/// 启动超时检查后台任务。
///
/// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。
/// 发现超时任务时记录 warning 日志,后续迭代将实现自动完成/升级逻辑
pub fn start_timeout_checker(db: sea_orm::DatabaseConnection) {
/// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。
pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
@@ -99,14 +103,26 @@ impl WorkflowModule {
loop {
interval.tick().await;
match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks(&db).await {
match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details(&db).await {
Ok(overdue) => {
if !overdue.is_empty() {
tracing::warn!(
count = overdue.len(),
task_ids = ?overdue,
"发现超时未完成的任务 — TODO: 实现自动完成/升级逻辑"
"发现超时未完成的任务,发布 task.timeout 事件"
);
for (task_id, tenant_id, instance_id, assignee_id) in &overdue {
// 发布超时事件
let event = erp_core::events::DomainEvent::new(
"task.timeout",
*tenant_id,
serde_json::json!({
"task_id": task_id,
"instance_id": instance_id,
"assignee_id": assignee_id,
}),
);
event_bus.publish(event, &db).await;
}
}
}
Err(e) => {
@@ -138,7 +154,140 @@ impl ErpModule for WorkflowModule {
vec!["auth"]
}
fn register_event_handlers(&self, _bus: &EventBus) {}
fn register_event_handlers(&self, _bus: &EventBus) {
// 事件处理器已迁移到 on_startup需要 DB 连接),此处保留空实现以兼容 trait 签名
}
async fn on_startup(
&self,
ctx: &erp_core::module::ModuleContext,
) -> erp_core::error::AppResult<()> {
let db = ctx.db.clone();
let bus = ctx.event_bus.clone();
// 订阅 user. 前缀事件,处理 user.deleted
let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string());
tokio::spawn(async move {
loop {
match receiver.recv().await {
Some(event) if event.event_type == "user.deleted" => {
let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) {
Some(id) => match Uuid::parse_str(id) {
Ok(u) => u,
Err(e) => {
tracing::warn!(
error = %e,
"user.deleted 事件的 user_id 解析失败,跳过"
);
continue;
}
},
_ => {
tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过");
continue;
}
};
tracing::info!(
user_id = %user_id,
tenant_id = %event.tenant_id,
"收到 user.deleted 事件,查找并终止相关流程实例"
);
// 查找该用户有活跃任务的流程实例
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use chrono::Utc;
// 查找该用户作为 assignee 的 pending 任务
let active_tasks = crate::entity::task::Entity::find()
.filter(crate::entity::task::Column::TenantId.eq(event.tenant_id))
.filter(crate::entity::task::Column::AssigneeId.eq(user_id))
.filter(crate::entity::task::Column::Status.eq("pending"))
.filter(crate::entity::task::Column::DeletedAt.is_null())
.all(&db)
.await;
match active_tasks {
Ok(tasks) if tasks.is_empty() => {
tracing::info!(
user_id = %user_id,
"该用户没有活跃的待办任务,无需终止流程"
);
}
Ok(tasks) => {
// 收集需要终止的实例 ID
let instance_ids: std::collections::HashSet<Uuid> =
tasks.iter().map(|t| t.instance_id).collect();
for instance_id in &instance_ids {
// 将实例状态设置为 terminated
let instance = crate::entity::process_instance::Entity::find_by_id(*instance_id)
.one(&db)
.await;
if let Ok(Some(inst)) = instance {
if inst.tenant_id == event.tenant_id
&& inst.deleted_at.is_none()
&& inst.status == "running"
{
let ver = inst.version;
let mut active: crate::entity::process_instance::ActiveModel = inst.into();
active.status = Set("terminated".to_string());
active.updated_at = Set(Utc::now());
active.version = Set(ver + 1);
match active.update(&db).await {
Ok(_) => {
tracing::info!(
instance_id = %instance_id,
"流程实例已终止(用户被删除)"
);
}
Err(e) => {
tracing::warn!(
instance_id = %instance_id,
error = %e,
"终止流程实例失败"
);
}
}
}
}
}
tracing::info!(
user_id = %user_id,
instance_count = instance_ids.len(),
task_count = tasks.len(),
"用户删除事件处理完成"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"查询用户活跃任务失败"
);
}
}
}
Some(event) => {
// 其他 user. 前缀事件,忽略
tracing::debug!(
event_type = %event.event_type,
"忽略非 user.deleted 事件"
);
}
None => {
// 通道关闭,退出循环
tracing::info!("Workflow 事件订阅通道已关闭");
break;
}
}
}
});
tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted");
Ok(())
}
async fn on_tenant_created(
&self,

View File

@@ -376,6 +376,52 @@ impl TaskService {
Ok(id)
}
/// 认领任务:将 pending 状态的任务分配给当前用户。
///
/// 适用于 candidate_groups 群组任务池中的任务,用户主动认领后
/// 任务状态变为 in_progressassignee_id 设置为认领用户。
pub async fn claim(
id: Uuid,
tenant_id: Uuid,
user_id: Uuid,
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<TaskResp> {
let task_model = task::Entity::find_by_id(id)
.one(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?
.filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none())
.ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?;
if task_model.status != "pending" {
return Err(WorkflowError::InvalidState(format!(
"任务状态不是 pending当前状态: {}),无法认领",
task_model.status
)));
}
let current_version = task_model.version;
let mut active: task::ActiveModel = task_model.into();
active.assignee_id = Set(Some(user_id));
active.status = Set("in_progress".to_string());
active.version = Set(current_version + 1);
active.updated_at = Set(Utc::now());
active.updated_by = Set(user_id);
let updated = active
.update(db)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
audit_service::record(
AuditLog::new(tenant_id, Some(user_id), "task.claim", "task").with_resource_id(id),
db,
)
.await;
Ok(Self::model_to_resp(&updated))
}
fn model_to_resp(m: &task::Model) -> TaskResp {
TaskResp {
id: m.id,