feat: systematic functional audit — fix 18 issues across Phase A/B

Phase A (P1 production blockers):
- A1: Apply IP rate limiting to public routes (login/refresh)
- A2: Publish domain events for workflow instance state transitions
  (completed/suspended/resumed/terminated) via outbox pattern
- A3: Replace hardcoded nil UUID default tenant with dynamic DB lookup
- A4: Add GET /api/v1/audit-logs query endpoint with pagination
- A5: Enhance CORS wildcard warning for production environments

Phase B (P2 functional gaps):
- B1: Remove dead erp-common crate (zero references in codebase)
- B2: Refactor 5 settings pages to use typed API modules instead of
  direct client calls; create api/themes.ts; delete dead errors.ts
- B3: Add resume/suspend buttons to InstanceMonitor page
- B4: Remove unused EventHandler trait from erp-core
- B5: Handle task.completed events in message module (send notifications)
- B6: Wire TimeoutChecker as 60s background task
- B7: Auto-skip ServiceTask nodes instead of crashing the process
- B8: Remove empty register_routes() from ErpModule trait and modules
This commit is contained in:
iven
2026-04-12 15:22:28 +08:00
parent 685df5e458
commit 14f431efff
34 changed files with 785 additions and 304 deletions

View File

@@ -5,7 +5,6 @@ edition.workspace = true
[dependencies]
erp-core.workspace = true
erp-common.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -129,24 +129,17 @@ impl ErpModule for AuthModule {
vec![]
}
fn register_routes(&self, router: Router) -> Router {
// The ErpModule trait uses Router<()> (no state type).
// Actual route registration with typed state is done
// via public_routes() and protected_routes(), called by erp-server.
router
}
fn register_event_handlers(&self, _bus: &EventBus) {
// Phase 2: subscribe to events from other modules if needed
// Auth 模块暂无跨模块事件订阅需求
}
async fn on_tenant_created(&self, _tenant_id: Uuid) -> AppResult<()> {
// Phase 2+: create default roles and admin user for new tenant
// TODO: 创建默认角色和管理员用户
Ok(())
}
async fn on_tenant_deleted(&self, _tenant_id: Uuid) -> AppResult<()> {
// Phase 2+: soft-delete all users belonging to the tenant
// TODO: 软删除该租户下所有用户
Ok(())
}

View File

@@ -1,11 +0,0 @@
[package]
name = "erp-common"
version.workspace = true
edition.workspace = true
[dependencies]
uuid.workspace = true
chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true

View File

@@ -1 +0,0 @@
pub mod utils;

View File

@@ -1,55 +0,0 @@
use chrono::{DateTime, Utc};
use uuid::Uuid;
/// 生成 UUID v7时间排序 + 唯一性)
pub fn generate_id() -> Uuid {
Uuid::now_v7()
}
/// 获取当前 UTC 时间
pub fn now() -> DateTime<Utc> {
Utc::now()
}
/// 软删除时间戳 — 返回 None 表示未删除
pub const fn not_deleted() -> Option<DateTime<Utc>> {
None
}
/// 生成租户级别的编号前缀
/// 格式: {prefix}-{timestamp_seconds}-{random_4hex}
pub fn generate_code(prefix: &str) -> String {
let ts = Utc::now().timestamp() as u32;
let random = (Uuid::now_v7().as_u128() & 0xFFFF) as u16;
format!("{}-{:08x}-{:04x}", prefix, ts, random)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_generate_id_returns_valid_uuid() {
let id = generate_id();
assert!(!id.is_nil());
}
#[test]
fn test_generate_code_format() {
let code = generate_code("USR");
assert!(code.starts_with("USR-"));
assert_eq!(code.len(), "USR-".len() + 8 + 1 + 4);
}
#[test]
fn test_not_deleted_returns_none() {
assert!(not_deleted().is_none());
}
#[test]
fn test_generate_ids_are_unique() {
let ids: std::collections::HashSet<Uuid> =
(0..100).map(|_| generate_id()).collect();
assert_eq!(ids.len(), 100);
}
}

View File

@@ -124,10 +124,6 @@ impl ErpModule for ConfigModule {
vec!["auth"]
}
fn register_routes(&self, router: Router) -> Router {
router
}
fn register_event_handlers(&self, _bus: &EventBus) {}
async fn on_tenant_created(&self, _tenant_id: Uuid) -> AppResult<()> {

View File

@@ -31,12 +31,6 @@ impl DomainEvent {
}
}
/// 事件处理器 trait
pub trait EventHandler: Send + Sync {
fn event_types(&self) -> Vec<String>;
fn handle(&self, event: &DomainEvent) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}
/// 进程内事件总线
#[derive(Clone)]
pub struct EventBus {

View File

@@ -1,7 +1,6 @@
use std::any::Any;
use std::sync::Arc;
use axum::Router;
use uuid::Uuid;
use crate::error::AppResult;
@@ -24,9 +23,6 @@ pub trait ErpModule: Send + Sync {
vec![]
}
/// 注册 Axum 路由
fn register_routes(&self, router: Router) -> Router;
/// 注册事件处理器
fn register_event_handlers(&self, _bus: &EventBus) {}
@@ -68,12 +64,6 @@ impl ModuleRegistry {
self
}
pub fn build_router(&self, base: Router) -> Router {
self.modules
.iter()
.fold(base, |router, m| m.register_routes(router))
}
pub fn register_handlers(&self, bus: &EventBus) {
for module in self.modules.iter() {
module.register_event_handlers(bus);

View File

@@ -123,10 +123,6 @@ impl ErpModule for MessageModule {
vec!["auth"]
}
fn register_routes(&self, router: Router) -> Router {
router
}
fn register_event_handlers(&self, _bus: &EventBus) {}
async fn on_tenant_created(&self, _tenant_id: Uuid) -> AppResult<()> {
@@ -177,8 +173,32 @@ async fn handle_workflow_event(
}
}
"task.completed" => {
// 任务完成时通知发起人(此处简化处理)
tracing::debug!("Task completed event received, skipping notification for now");
// 任务完成时通知流程发起人
let task_id = event.payload.get("task_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let starter_id = event.payload.get("started_by")
.and_then(|v| v.as_str());
if let Some(starter) = starter_id {
let recipient = match uuid::Uuid::parse_str(starter) {
Ok(id) => id,
Err(_) => return Ok(()),
};
let _ = crate::service::message_service::MessageService::send_system(
event.tenant_id,
recipient,
"流程任务已完成".to_string(),
format!("流程任务 {} 已完成,请查看。", task_id),
"normal",
Some("workflow_task".to_string()),
uuid::Uuid::parse_str(task_id).ok(),
db,
event_bus,
)
.await
.map_err(|e| e.to_string())?;
}
}
_ => {}
}

View File

@@ -9,7 +9,6 @@ path = "src/main.rs"
[dependencies]
erp-core.workspace = true
erp-common.workspace = true
tokio.workspace = true
axum.workspace = true
tower.workspace = true

View File

@@ -0,0 +1,75 @@
use axum::extract::{Query, State};
use axum::response::Json;
use axum::routing::get;
use axum::Router;
use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder};
use serde::{Deserialize, Serialize};
use crate::state::AppState;
use erp_core::entity::audit_log;
use erp_core::error::AppError;
/// 审计日志查询参数。
#[derive(Debug, Deserialize)]
pub struct AuditLogQuery {
pub resource_type: Option<String>,
pub user_id: Option<uuid::Uuid>,
pub page: Option<u64>,
pub page_size: Option<u64>,
}
/// 审计日志分页响应。
#[derive(Debug, Serialize)]
pub struct AuditLogResponse {
pub items: Vec<audit_log::Model>,
pub total: u64,
pub page: u64,
pub page_size: u64,
}
/// GET /audit-logs
///
/// 分页查询审计日志,支持按 resource_type 和 user_id 过滤。
pub async fn list_audit_logs(
State(state): State<AppState>,
Query(params): Query<AuditLogQuery>,
) -> Result<Json<AuditLogResponse>, AppError> {
let page = params.page.unwrap_or(1).max(1);
let page_size = params.page_size.unwrap_or(20).min(100);
let tenant_id = state.default_tenant_id;
let mut q = audit_log::Entity::find()
.filter(audit_log::Column::TenantId.eq(tenant_id));
if let Some(rt) = &params.resource_type {
q = q.filter(audit_log::Column::ResourceType.eq(rt.clone()));
}
if let Some(uid) = &params.user_id {
q = q.filter(audit_log::Column::UserId.eq(*uid));
}
let paginator = q
.order_by_desc(audit_log::Column::CreatedAt)
.paginate(&state.db, page_size);
let total = paginator
.num_items()
.await
.map_err(|e| AppError::Internal(format!("查询审计日志失败: {e}")))?;
let items = paginator
.fetch_page(page - 1)
.await
.map_err(|e| AppError::Internal(format!("查询审计日志失败: {e}")))?;
Ok(Json(AuditLogResponse {
items,
total,
page,
page_size,
}))
}
pub fn audit_log_router() -> Router<AppState> {
Router::new().route("/audit-logs", get(list_audit_logs))
}

View File

@@ -1,2 +1,3 @@
pub mod audit_log;
pub mod health;
pub mod openapi;

View File

@@ -53,8 +53,8 @@ async fn main() -> anyhow::Result<()> {
erp_server_migration::Migrator::up(&db, None).await?;
tracing::info!("Database migrations applied");
// Seed default tenant and auth data if not present
{
// Seed default tenant and auth data if not present, and resolve the actual tenant ID
let default_tenant_id = {
#[derive(sea_orm::FromQueryResult)]
struct TenantId {
id: uuid::Uuid,
@@ -71,6 +71,7 @@ async fn main() -> anyhow::Result<()> {
match existing {
Some(row) => {
tracing::info!(tenant_id = %row.id, "Default tenant already exists, skipping seed");
row.id
}
None => {
let new_tenant_id = uuid::Uuid::now_v7();
@@ -101,9 +102,10 @@ async fn main() -> anyhow::Result<()> {
.map_err(|e| anyhow::anyhow!("Failed to seed auth data: {}", e))?;
tracing::info!(tenant_id = %new_tenant_id, "Default tenant ready with auth seed data");
new_tenant_id
}
}
}
};
// Connect to Redis
let redis_client = redis::Client::open(&config.redis.url[..])?;
@@ -147,6 +149,10 @@ async fn main() -> anyhow::Result<()> {
outbox::start_outbox_relay(db.clone(), event_bus.clone());
tracing::info!("Outbox relay started");
// Start timeout checker (scan overdue tasks every 60s)
erp_workflow::WorkflowModule::start_timeout_checker(db.clone());
tracing::info!("Timeout checker started");
let host = config.server.host.clone();
let port = config.server.port;
@@ -160,6 +166,7 @@ async fn main() -> anyhow::Result<()> {
event_bus,
module_registry: registry,
redis: redis_client.clone(),
default_tenant_id,
};
// --- Build the router ---
@@ -171,11 +178,15 @@ async fn main() -> anyhow::Result<()> {
// Both layers share the same AppState. The protected layer wraps routes
// with the jwt_auth_middleware_fn.
// Public routes (no authentication)
// Public routes (no authentication, but IP-based rate limiting)
let public_routes = Router::new()
.merge(handlers::health::health_check_router())
.merge(erp_auth::AuthModule::public_routes())
.route("/docs/openapi.json", axum::routing::get(handlers::openapi::openapi_spec))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
middleware::rate_limit::rate_limit_by_ip,
))
.with_state(state.clone());
// Protected routes (JWT authentication required)
@@ -184,6 +195,7 @@ async fn main() -> anyhow::Result<()> {
.merge(erp_config::ConfigModule::protected_routes())
.merge(erp_workflow::WorkflowModule::protected_routes())
.merge(erp_message::MessageModule::protected_routes())
.merge(handlers::audit_log::audit_log_router())
.layer(axum::middleware::from_fn_with_state(
state.clone(),
middleware::rate_limit::rate_limit_by_user,
@@ -229,31 +241,34 @@ fn build_cors_layer(allowed_origins: &str) -> tower_http::cors::CorsLayer {
.collect::<Vec<_>>();
if origins.len() == 1 && origins[0] == "*" {
tracing::warn!("CORS: allowing all origins — only use in development!");
tower_http::cors::CorsLayer::permissive()
} else {
let allowed: Vec<HeaderValue> = origins
.iter()
.filter_map(|o| o.parse::<HeaderValue>().ok())
.collect();
tracing::info!(origins = ?origins, "CORS: restricting to allowed origins");
tower_http::cors::CorsLayer::new()
.allow_origin(AllowOrigin::list(allowed))
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
axum::http::Method::PATCH,
])
.allow_headers([
axum::http::header::AUTHORIZATION,
axum::http::header::CONTENT_TYPE,
])
.allow_credentials(true)
tracing::warn!(
"⚠️ CORS 允许所有来源 — 仅限开发环境使用!\
生产环境请通过 ERP__CORS__ALLOWED_ORIGINS 设置具体的来源域名"
);
return tower_http::cors::CorsLayer::permissive();
}
let allowed: Vec<HeaderValue> = origins
.iter()
.filter_map(|o| o.parse::<HeaderValue>().ok())
.collect();
tracing::info!(origins = ?origins, "CORS: restricting to allowed origins");
tower_http::cors::CorsLayer::new()
.allow_origin(AllowOrigin::list(allowed))
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
axum::http::Method::PATCH,
])
.allow_headers([
axum::http::header::AUTHORIZATION,
axum::http::header::CONTENT_TYPE,
])
.allow_credentials(true)
}
async fn shutdown_signal() {

View File

@@ -14,6 +14,8 @@ pub struct AppState {
pub event_bus: EventBus,
pub module_registry: ModuleRegistry,
pub redis: redis::Client,
/// 实际的默认租户 ID从数据库种子数据中获取。
pub default_tenant_id: uuid::Uuid,
}
/// Allow handlers to extract `DatabaseConnection` directly from `State<AppState>`.
@@ -44,10 +46,7 @@ impl FromRef<AppState> for erp_auth::AuthState {
jwt_secret: state.config.jwt.secret.clone(),
access_ttl_secs: parse_ttl(&state.config.jwt.access_token_ttl),
refresh_ttl_secs: parse_ttl(&state.config.jwt.refresh_token_ttl),
// Default tenant ID: during bootstrap, use a well-known UUID.
// In production, tenant resolution middleware will override this.
default_tenant_id: uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000000")
.unwrap(),
default_tenant_id: state.default_tenant_id,
}
}
}

View File

@@ -246,10 +246,49 @@ impl FlowExecutor {
.await
}
NodeType::ServiceTask => {
// ServiceTask 尚未实现:无法自动执行服务调用,直接报错
return Err(WorkflowError::Validation(
format!("ServiceTask ({}) 尚未实现,流程无法继续", node.name),
));
// ServiceTask 自动执行:当前阶段自动跳过(直接推进到后继节点)
// 创建一个立即消费的 token 记录(用于审计追踪)
let now = Utc::now();
let system_user = uuid::Uuid::nil();
let auto_token_id = Uuid::now_v7();
let token_model = token::ActiveModel {
id: Set(auto_token_id),
tenant_id: Set(tenant_id),
instance_id: Set(instance_id),
node_id: Set(node_id.to_string()),
status: Set("consumed".to_string()),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(system_user),
updated_by: Set(system_user),
deleted_at: Set(None),
version: Set(1),
consumed_at: Set(Some(now)),
};
token_model
.insert(txn)
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
tracing::info!(node_id = node_id, node_name = %node.name, "ServiceTask 自动跳过(尚未实现 HTTP 调用)");
// 沿出边继续推进
let outgoing = graph.get_outgoing_edges(node_id);
let mut new_tokens = Vec::new();
for edge in &outgoing {
let tokens = Self::create_token_at_node(
instance_id,
tenant_id,
&edge.target,
graph,
variables,
txn,
)
.await?;
new_tokens.extend(tokens);
}
Ok(new_tokens)
}
_ => {
// UserTask / 网关(分支)等:创建活跃 token
@@ -407,6 +446,22 @@ impl FlowExecutor {
active.completed_at = Set(Some(Utc::now()));
active.updated_at = Set(Utc::now());
active.update(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?;
// 写入完成事件到 outbox由 relay 广播
let now = Utc::now();
let outbox_event = erp_core::entity::domain_event::ActiveModel {
id: Set(Uuid::now_v7()),
tenant_id: Set(tenant_id),
event_type: Set("process_instance.completed".to_string()),
payload: Set(Some(serde_json::json!({ "instance_id": instance_id }))),
correlation_id: Set(Some(Uuid::now_v7())),
status: Set("pending".to_string()),
attempts: Set(0),
last_error: Set(None),
created_at: Set(now),
published_at: Set(None),
};
outbox_event.insert(txn).await.map_err(|e| WorkflowError::Validation(e.to_string()))?;
}
Ok(())

View File

@@ -1,7 +1,7 @@
// 超时检查框架 — 占位实现
// 超时检查框架
//
// 当前版本仅提供接口定义,实际超时检查逻辑将在后续迭代中实现。
// Task 表的 due_date 字段已支持设置超时时间
// TimeoutChecker 定期扫描 tasks 表中已超时但仍处于 pending 状态的任务,
// 以便触发自动完成或升级逻辑(后续迭代实现)
use chrono::Utc;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
@@ -10,11 +10,11 @@ use uuid::Uuid;
use crate::entity::task;
use crate::error::WorkflowResult;
/// 超时检查服务(占位)
/// 超时检查服务。
pub struct TimeoutChecker;
impl TimeoutChecker {
/// 查询已超时但未完成的任务列表。
/// 查询指定租户下已超时但未完成的任务列表。
///
/// 返回 due_date < now 且 status = 'pending' 的任务 ID。
pub async fn find_overdue_tasks(
@@ -33,4 +33,23 @@ impl TimeoutChecker {
Ok(overdue.iter().map(|t| t.id).collect())
}
/// 查询所有租户中已超时但未完成的任务列表。
///
/// 返回 due_date < now 且 status = 'pending' 的任务 ID。
/// 用于后台定时任务的全量扫描。
pub async fn find_all_overdue_tasks(
db: &sea_orm::DatabaseConnection,
) -> WorkflowResult<Vec<Uuid>> {
let now = Utc::now();
let overdue = task::Entity::find()
.filter(task::Column::Status.eq("pending"))
.filter(task::Column::DueDate.lt(now))
.filter(task::Column::DeletedAt.is_null())
.all(db)
.await
.map_err(|e| crate::error::WorkflowError::Validation(e.to_string()))?;
Ok(overdue.iter().map(|t| t.id).collect())
}
}

View File

@@ -1,5 +1,6 @@
use axum::Router;
use axum::routing::{get, post};
use std::time::Duration;
use uuid::Uuid;
use erp_core::error::AppResult;
@@ -83,6 +84,38 @@ impl WorkflowModule {
post(task_handler::delegate_task),
)
}
/// 启动超时检查后台任务。
///
/// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。
/// 发现超时任务时记录 warning 日志,后续迭代将实现自动完成/升级逻辑。
pub fn start_timeout_checker(db: sea_orm::DatabaseConnection) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
// 首次跳过,等一个完整间隔再执行
interval.tick().await;
loop {
interval.tick().await;
match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks(&db).await {
Ok(overdue) => {
if !overdue.is_empty() {
tracing::warn!(
count = overdue.len(),
task_ids = ?overdue,
"发现超时未完成的任务 — TODO: 实现自动完成/升级逻辑"
);
}
}
Err(e) => {
tracing::warn!(error = %e, "超时检查任务执行失败");
}
}
}
});
}
}
impl Default for WorkflowModule {
@@ -105,11 +138,6 @@ impl ErpModule for WorkflowModule {
vec!["auth"]
}
fn register_routes(&self, router: Router) -> Router {
// Actual route registration is done via protected_routes(), called by erp-server.
router
}
fn register_event_handlers(&self, _bus: &EventBus) {}
async fn on_tenant_created(&self, _tenant_id: Uuid) -> AppResult<()> {

View File

@@ -312,6 +312,27 @@ impl InstanceService {
.await
.map_err(|e| WorkflowError::Validation(e.to_string()))?;
// 发布状态变更领域事件(通过 outbox 模式,由 relay 广播)
let event_type = format!("process_instance.{}", to_status);
let event_id = Uuid::now_v7();
let now = Utc::now();
let outbox_event = erp_core::entity::domain_event::ActiveModel {
id: Set(event_id),
tenant_id: Set(tenant_id),
event_type: Set(event_type),
payload: Set(Some(serde_json::json!({ "instance_id": id, "changed_by": operator_id }))),
correlation_id: Set(Some(Uuid::now_v7())),
status: Set("pending".to_string()),
attempts: Set(0),
last_error: Set(None),
created_at: Set(now),
published_at: Set(None),
};
match outbox_event.insert(db).await {
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "领域事件持久化失败"),
}
let action = format!("process_instance.{}", to_status);
audit_service::record(
AuditLog::new(tenant_id, Some(operator_id), action, "process_instance")