Files
erp/plans/bubbly-squishing-lerdorf.md
iven 5d6e1dc394 feat(core): implement optimistic locking across all entities
Add VersionMismatch error variant and check_version() helper to erp-core.
All 13 mutable entities now enforce version checking on update/delete:
- erp-auth: user, role, organization, department, position
- erp-config: dictionary, dictionary_item, menu, setting, numbering_rule
- erp-workflow: process_definition, process_instance, task
- erp-message: message, message_subscription

Update DTOs to expose version in responses and require version in update
requests. HTTP 409 Conflict returned on version mismatch.
2026-04-11 23:25:43 +08:00

10 KiB
Raw Permalink Blame History

Phase 7: 审计日志 + 乐观锁 + Redis 限流 + 事件 Outbox

Context

Phase 1-6 已完成。对比设计规格发现 4 项核心基础设施缺失:

  1. 审计日志 — AuditLog 类型存在但从未使用audit_logs 表存在但无 Entity/Service
  2. 乐观锁 — 所有实体有 version 字段但更新时不检查/递增DTO 不暴露 version
  3. Redis 限流 — 客户端创建后立即丢弃(_redis_client),未存入 AppState
  4. 事件 Outbox — EventBus 纯内存 broadcast重启即丢失无持久化

实施顺序与依赖

Task 7.1 乐观锁 (erp-core error helper)
  → Task 7.2 乐观锁 (全部 service 方法 + DTO)
    → Task 7.3 审计日志 (Entity + Service + 集成)
      → Task 7.4 Redis 限流 (AppState + 中间件)
        → Task 7.5 事件 Outbox (迁移 + Entity + EventBus 改造)

Task 7.1: 乐观锁 — erp-core 基础设施

修改文件:

  • crates/erp-core/src/error.rs — 添加 VersionMismatch 变体 + check_version() helper
// 新增变体
#[error("版本冲突: 数据已被其他操作修改,请刷新后重试")]
VersionMismatch,

// 新增 helper 函数
pub fn check_version(expected: i32, actual: i32) -> AppResult<i32> {
    if expected == actual { Ok(actual + 1) }
    else { Err(AppError::VersionMismatch) }
}

IntoResponse 中 VersionMismatch 映射到 StatusCode::CONFLICT (409)。


Task 7.2: 乐观锁 — 全部 Service 方法 + DTO

原则: 所有用户可调用的 update/delete 方法必须检查并递增 version。

DTO 变更

所有 Update*Req 添加 pub version: i32 字段(必填)。涉及:

Crate DTO 文件 DTOs
erp-auth dto.rs UpdateUserReq, UpdateRoleReq, UpdateOrganizationReq, UpdateDepartmentReq, UpdatePositionReq
erp-config dto.rs UpdateDictionaryReq, UpdateDictionaryItemReq, UpdateMenuReq, UpdateNumberingRuleReq
erp-workflow dto.rs UpdateProcessDefinitionReq
erp-message dto.rs UpdateSubscriptionReq (如果存在)

*所有 Resp 添加 pub version: i32 字段。涉及:

Crate Resp DTOs
erp-auth UserResp, RoleResp, OrganizationResp, DepartmentResp, PositionResp
erp-config DictionaryResp, DictionaryItemResp, MenuResp, SettingResp, NumberingRuleResp
erp-workflow ProcessDefinitionResp, ProcessInstanceResp, TaskResp
erp-message MessageResp, MessageSubscriptionResp

每个 model_to_resp 函数添加 version: m.version

Service 方法变更

Update 模式(有 DTO

// 在 update 方法中,读取 model 后:
let next_ver = erp_core::error::check_version(req.version, model.version)?;
// ... 设置字段 ...
active.version = Set(next_ver);
active.update(db).await?;

Delete 模式(无 DTO version

// delete 方法中,读取 model 后:
active.version = Set(model.version + 1);

涉及文件13 个 service 的 update/delete 方法):

Crate 文件 方法
erp-auth user_service.rs update, delete
erp-auth role_service.rs update, delete
erp-auth org_service.rs update, delete
erp-auth dept_service.rs update, delete
erp-auth position_service.rs update, delete
erp-config dictionary_service.rs update, delete, update_item, delete_item
erp-config menu_service.rs update, delete
erp-config setting_service.rs set (update 分支), delete
erp-config numbering_service.rs update, delete
erp-workflow definition_service.rs update, publish, delete
erp-workflow instance_service.rs 状态变更方法 (suspend/resume/terminate)
erp-workflow task_service.rs complete, delegate
erp-message message_service.rs mark_read, delete
erp-message subscription_service.rs upsert (update 分支)

注意: numbering_service::generate_number 使用 advisory lock不需要 version 检查。

前端适配

前端所有编辑表单需要在请求时传递 version 字段。涉及:

  • apps/web/src/pages/ 下所有调用 PUT API 的页面

Task 7.3: 审计日志

7.3a: SeaORM Entity

新建文件:

  • crates/erp-core/src/entity/mod.rs
  • crates/erp-core/src/entity/audit_log.rs

修改文件:

  • crates/erp-core/src/lib.rs — 添加 pub mod entity;
  • crates/erp-core/Cargo.toml — 添加 sea-orm 依赖(如果尚未有)

audit_log.rs Entity 映射已有的 audit_logs 表(迁移 #26 已存在)。

7.3b: 审计记录服务

新建文件: crates/erp-core/src/audit_service.rs

/// 持久化审计日志到 audit_logs 表。
/// 使用 fire-and-forget 模式:失败仅记录日志,不影响业务操作。
pub async fn record(log: AuditLog, db: &DatabaseConnection) {
    // AuditLog → audit_log::ActiveModel → insert
    // 失败时 tracing::warn!
}

修改文件: crates/erp-core/src/lib.rs — 添加 pub mod audit_service;

7.3c: 集成到所有 mutation service

在每个 service 的 create/update/delete 方法中,操作成功后调用 audit_service::record()

请求信息获取: handler 层从 HeaderMap 提取 IP 和 User-Agent传给 service。

// handler 中
fn extract_request_info(headers: &HeaderMap) -> (Option<String>, Option<String>) {
    let ip = headers.get("x-forwarded-for").or_else(|| headers.get("x-real-ip"))
        .and_then(|v| v.to_str().ok()).map(|s| s.to_string());
    let ua = headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string());
    (ip, ua)
}

Handler 签名增加 headers: HeaderMap 参数service 方法签名增加 ip: Option<String>, user_agent: Option<String>

涉及文件(与乐观锁相同 + handler 层):

Crate Handler 文件
erp-auth user_handler.rs, role_handler.rs, org_handler.rs
erp-config dictionary_handler.rs, menu_handler.rs, setting_handler.rs, numbering_handler.rs
erp-workflow definition_handler.rs, instance_handler.rs, task_handler.rs
erp-message message_handler.rs, subscription_handler.rs

Task 7.4: Redis 限流

7.4a: Redis 存入 AppState

修改文件:

  • crates/erp-server/src/state.rsAppState 添加 pub redis: redis::Client
  • crates/erp-server/src/main.rs_redis_clientredis_client,传入 AppState

7.4b: 限流中间件

新建文件:

  • crates/erp-server/src/middleware/mod.rs
  • crates/erp-server/src/middleware/rate_limit.rs

使用 Redis INCR + EXPIRE 实现滑动窗口:

  • Key: rate_limit:{prefix}:{identifier}
  • 登录: 5 次/分钟/IP
  • 写操作: 100 次/分钟/user_id

7.4c: 应用限流层

修改文件: crates/erp-server/src/main.rs

  • 登录路由添加 IP 限流层
  • protected routes 添加 user_id 限流层
  • 超限返回 HTTP 429 Too Many Requests

修改文件: crates/erp-core/src/error.rs

  • 添加 TooManyRequests 变体(可选,中间件可直接返回 429

Task 7.5: 事件 Outbox 持久化

7.5a: 数据库迁移

新建文件: crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs

CREATE TABLE IF NOT EXISTS domain_events (
    id              UUID PRIMARY KEY,
    tenant_id       UUID NOT NULL,
    event_type      VARCHAR(200) NOT NULL,
    payload         JSONB,
    correlation_id  UUID,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts        INT NOT NULL DEFAULT 0,
    last_error      TEXT,
    created_at      TIMESTAMPTZ NOT NULL,
    published_at    TIMESTAMPTZ
);
CREATE INDEX idx_domain_events_status ON domain_events (status, created_at);
CREATE INDEX idx_domain_events_tenant ON domain_events (tenant_id);

修改文件: crates/erp-server/migration/src/lib.rs — 注册新迁移

7.5b: SeaORM Entity

新建文件: crates/erp-core/src/entity/domain_event.rs

修改文件: crates/erp-core/src/entity/mod.rs — 添加 pub mod domain_event;

7.5c: EventBus 改造

修改文件: crates/erp-core/src/events.rs

  • 现有 publish() 重命名为 broadcast()(内部使用)
  • 新增 publish_with_persist(event, db) — 先 INSERT domain_events再 broadcast
  • INSERT 失败时仅 log warning仍然 broadcastbest-effort

7.5d: 更新所有 publish 调用点

全部 25 个 event_bus.publish(...) 调用改为 event_bus.publish_with_persist(event, db).await

涉及文件:

  • erp-auth/src/service/ — 5 个文件 (user, role, org, dept, position)
  • erp-config/src/service/ — 4 个文件 (dictionary, menu, setting, numbering)
  • erp-workflow/src/service/ — 3 个文件 (definition, instance, task)
  • erp-message/src/service/ — 1 个文件 (message_service)

7.5e: Outbox Relay 后台任务

新建文件: crates/erp-server/src/outbox.rs

后台 tokio task 每 5 秒扫描 domain_events WHERE status = 'pending',重新 broadcast 并标记为 published。

修改文件: crates/erp-server/src/main.rs — 启动 outbox relay


关键文件索引

用途 文件路径
错误类型 crates/erp-core/src/error.rs
事件总线 crates/erp-core/src/events.rs
审计日志类型 crates/erp-core/src/audit.rs
AppState crates/erp-server/src/state.rs
服务器入口 crates/erp-server/src/main.rs
迁移注册 crates/erp-server/migration/src/lib.rs
Auth DTO crates/erp-auth/src/dto.rs
Auth Service 参考 crates/erp-auth/src/service/user_service.rs
Auth Handler 参考 crates/erp-auth/src/handler/user_handler.rs

验证方式

  1. cargo check — 全 workspace 编译通过
  2. cargo test --workspace — 所有测试通过
  3. 手动测试:更新用户两次(第二次用旧 version→ 409 Conflict
  4. 手动测试:登录限流 → 第 6 次返回 429
  5. 查询 SELECT * FROM audit_logs → 验证审计记录
  6. 查询 SELECT * FROM domain_events → 验证事件持久化
  7. 重启服务后验证 pending 事件被 relay 处理