Add VersionMismatch error variant and check_version() helper to erp-core. All 13 mutable entities now enforce version checking on update/delete: - erp-auth: user, role, organization, department, position - erp-config: dictionary, dictionary_item, menu, setting, numbering_rule - erp-workflow: process_definition, process_instance, task - erp-message: message, message_subscription Update DTOs to expose version in responses and require version in update requests. HTTP 409 Conflict returned on version mismatch.
286 lines
10 KiB
Markdown
286 lines
10 KiB
Markdown
# Phase 7: 审计日志 + 乐观锁 + Redis 限流 + 事件 Outbox
|
||
|
||
## Context
|
||
|
||
Phase 1-6 已完成。对比设计规格发现 4 项核心基础设施缺失:
|
||
1. **审计日志** — AuditLog 类型存在但从未使用,audit_logs 表存在但无 Entity/Service
|
||
2. **乐观锁** — 所有实体有 version 字段但更新时不检查/递增,DTO 不暴露 version
|
||
3. **Redis 限流** — 客户端创建后立即丢弃(`_redis_client`),未存入 AppState
|
||
4. **事件 Outbox** — EventBus 纯内存 broadcast,重启即丢失,无持久化
|
||
|
||
## 实施顺序与依赖
|
||
|
||
```
|
||
Task 7.1 乐观锁 (erp-core error helper)
|
||
→ Task 7.2 乐观锁 (全部 service 方法 + DTO)
|
||
→ Task 7.3 审计日志 (Entity + Service + 集成)
|
||
→ Task 7.4 Redis 限流 (AppState + 中间件)
|
||
→ Task 7.5 事件 Outbox (迁移 + Entity + EventBus 改造)
|
||
```
|
||
|
||
---
|
||
|
||
## Task 7.1: 乐观锁 — erp-core 基础设施
|
||
|
||
**修改文件:**
|
||
- `crates/erp-core/src/error.rs` — 添加 `VersionMismatch` 变体 + `check_version()` helper
|
||
|
||
```rust
|
||
// 新增变体
|
||
#[error("版本冲突: 数据已被其他操作修改,请刷新后重试")]
|
||
VersionMismatch,
|
||
|
||
// 新增 helper 函数
|
||
pub fn check_version(expected: i32, actual: i32) -> AppResult<i32> {
|
||
if expected == actual { Ok(actual + 1) }
|
||
else { Err(AppError::VersionMismatch) }
|
||
}
|
||
```
|
||
|
||
IntoResponse 中 `VersionMismatch` 映射到 `StatusCode::CONFLICT` (409)。
|
||
|
||
---
|
||
|
||
## Task 7.2: 乐观锁 — 全部 Service 方法 + DTO
|
||
|
||
**原则:** 所有用户可调用的 update/delete 方法必须检查并递增 version。
|
||
|
||
### DTO 变更
|
||
|
||
**所有 Update*Req** 添加 `pub version: i32` 字段(必填)。涉及:
|
||
|
||
| Crate | DTO 文件 | DTOs |
|
||
|-------|---------|------|
|
||
| erp-auth | `dto.rs` | UpdateUserReq, UpdateRoleReq, UpdateOrganizationReq, UpdateDepartmentReq, UpdatePositionReq |
|
||
| erp-config | `dto.rs` | UpdateDictionaryReq, UpdateDictionaryItemReq, UpdateMenuReq, UpdateNumberingRuleReq |
|
||
| erp-workflow | `dto.rs` | UpdateProcessDefinitionReq |
|
||
| erp-message | `dto.rs` | UpdateSubscriptionReq (如果存在) |
|
||
|
||
**所有 *Resp** 添加 `pub version: i32` 字段。涉及:
|
||
|
||
| Crate | Resp DTOs |
|
||
|-------|-----------|
|
||
| erp-auth | UserResp, RoleResp, OrganizationResp, DepartmentResp, PositionResp |
|
||
| erp-config | DictionaryResp, DictionaryItemResp, MenuResp, SettingResp, NumberingRuleResp |
|
||
| erp-workflow | ProcessDefinitionResp, ProcessInstanceResp, TaskResp |
|
||
| erp-message | MessageResp, MessageSubscriptionResp |
|
||
|
||
每个 `model_to_resp` 函数添加 `version: m.version`。
|
||
|
||
### Service 方法变更
|
||
|
||
**Update 模式(有 DTO):**
|
||
```rust
|
||
// 在 update 方法中,读取 model 后:
|
||
let next_ver = erp_core::error::check_version(req.version, model.version)?;
|
||
// ... 设置字段 ...
|
||
active.version = Set(next_ver);
|
||
active.update(db).await?;
|
||
```
|
||
|
||
**Delete 模式(无 DTO version):**
|
||
```rust
|
||
// delete 方法中,读取 model 后:
|
||
active.version = Set(model.version + 1);
|
||
```
|
||
|
||
**涉及文件(13 个 service 的 update/delete 方法):**
|
||
|
||
| Crate | 文件 | 方法 |
|
||
|-------|------|------|
|
||
| erp-auth | `user_service.rs` | update, delete |
|
||
| erp-auth | `role_service.rs` | update, delete |
|
||
| erp-auth | `org_service.rs` | update, delete |
|
||
| erp-auth | `dept_service.rs` | update, delete |
|
||
| erp-auth | `position_service.rs` | update, delete |
|
||
| erp-config | `dictionary_service.rs` | update, delete, update_item, delete_item |
|
||
| erp-config | `menu_service.rs` | update, delete |
|
||
| erp-config | `setting_service.rs` | set (update 分支), delete |
|
||
| erp-config | `numbering_service.rs` | update, delete |
|
||
| erp-workflow | `definition_service.rs` | update, publish, delete |
|
||
| erp-workflow | `instance_service.rs` | 状态变更方法 (suspend/resume/terminate) |
|
||
| erp-workflow | `task_service.rs` | complete, delegate |
|
||
| erp-message | `message_service.rs` | mark_read, delete |
|
||
| erp-message | `subscription_service.rs` | upsert (update 分支) |
|
||
|
||
**注意:** `numbering_service::generate_number` 使用 advisory lock,不需要 version 检查。
|
||
|
||
### 前端适配
|
||
|
||
前端所有编辑表单需要在请求时传递 version 字段。涉及:
|
||
- `apps/web/src/pages/` 下所有调用 PUT API 的页面
|
||
|
||
---
|
||
|
||
## Task 7.3: 审计日志
|
||
|
||
### 7.3a: SeaORM Entity
|
||
|
||
**新建文件:**
|
||
- `crates/erp-core/src/entity/mod.rs`
|
||
- `crates/erp-core/src/entity/audit_log.rs`
|
||
|
||
**修改文件:**
|
||
- `crates/erp-core/src/lib.rs` — 添加 `pub mod entity;`
|
||
- `crates/erp-core/Cargo.toml` — 添加 sea-orm 依赖(如果尚未有)
|
||
|
||
audit_log.rs Entity 映射已有的 `audit_logs` 表(迁移 #26 已存在)。
|
||
|
||
### 7.3b: 审计记录服务
|
||
|
||
**新建文件:** `crates/erp-core/src/audit_service.rs`
|
||
|
||
```rust
|
||
/// 持久化审计日志到 audit_logs 表。
|
||
/// 使用 fire-and-forget 模式:失败仅记录日志,不影响业务操作。
|
||
pub async fn record(log: AuditLog, db: &DatabaseConnection) {
|
||
// AuditLog → audit_log::ActiveModel → insert
|
||
// 失败时 tracing::warn!
|
||
}
|
||
```
|
||
|
||
**修改文件:** `crates/erp-core/src/lib.rs` — 添加 `pub mod audit_service;`
|
||
|
||
### 7.3c: 集成到所有 mutation service
|
||
|
||
在每个 service 的 create/update/delete 方法中,操作成功后调用 `audit_service::record()`。
|
||
|
||
**请求信息获取:** handler 层从 `HeaderMap` 提取 IP 和 User-Agent,传给 service。
|
||
|
||
```rust
|
||
// handler 中
|
||
fn extract_request_info(headers: &HeaderMap) -> (Option<String>, Option<String>) {
|
||
let ip = headers.get("x-forwarded-for").or_else(|| headers.get("x-real-ip"))
|
||
.and_then(|v| v.to_str().ok()).map(|s| s.to_string());
|
||
let ua = headers.get("user-agent").and_then(|v| v.to_str().ok()).map(|s| s.to_string());
|
||
(ip, ua)
|
||
}
|
||
```
|
||
|
||
Handler 签名增加 `headers: HeaderMap` 参数,service 方法签名增加 `ip: Option<String>, user_agent: Option<String>`。
|
||
|
||
**涉及文件(与乐观锁相同 + handler 层):**
|
||
|
||
| Crate | Handler 文件 |
|
||
|-------|-------------|
|
||
| erp-auth | `user_handler.rs`, `role_handler.rs`, `org_handler.rs` |
|
||
| erp-config | `dictionary_handler.rs`, `menu_handler.rs`, `setting_handler.rs`, `numbering_handler.rs` |
|
||
| erp-workflow | `definition_handler.rs`, `instance_handler.rs`, `task_handler.rs` |
|
||
| erp-message | `message_handler.rs`, `subscription_handler.rs` |
|
||
|
||
---
|
||
|
||
## Task 7.4: Redis 限流
|
||
|
||
### 7.4a: Redis 存入 AppState
|
||
|
||
**修改文件:**
|
||
- `crates/erp-server/src/state.rs` — `AppState` 添加 `pub redis: redis::Client`
|
||
- `crates/erp-server/src/main.rs` — `_redis_client` → `redis_client`,传入 AppState
|
||
|
||
### 7.4b: 限流中间件
|
||
|
||
**新建文件:**
|
||
- `crates/erp-server/src/middleware/mod.rs`
|
||
- `crates/erp-server/src/middleware/rate_limit.rs`
|
||
|
||
使用 Redis INCR + EXPIRE 实现滑动窗口:
|
||
- Key: `rate_limit:{prefix}:{identifier}`
|
||
- 登录: 5 次/分钟/IP
|
||
- 写操作: 100 次/分钟/user_id
|
||
|
||
### 7.4c: 应用限流层
|
||
|
||
**修改文件:** `crates/erp-server/src/main.rs`
|
||
- 登录路由添加 IP 限流层
|
||
- protected routes 添加 user_id 限流层
|
||
- 超限返回 HTTP 429 Too Many Requests
|
||
|
||
**修改文件:** `crates/erp-core/src/error.rs`
|
||
- 添加 `TooManyRequests` 变体(可选,中间件可直接返回 429)
|
||
|
||
---
|
||
|
||
## Task 7.5: 事件 Outbox 持久化
|
||
|
||
### 7.5a: 数据库迁移
|
||
|
||
**新建文件:** `crates/erp-server/migration/src/m20260416_000031_create_domain_events.rs`
|
||
|
||
```sql
|
||
CREATE TABLE IF NOT EXISTS domain_events (
|
||
id UUID PRIMARY KEY,
|
||
tenant_id UUID NOT NULL,
|
||
event_type VARCHAR(200) NOT NULL,
|
||
payload JSONB,
|
||
correlation_id UUID,
|
||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||
attempts INT NOT NULL DEFAULT 0,
|
||
last_error TEXT,
|
||
created_at TIMESTAMPTZ NOT NULL,
|
||
published_at TIMESTAMPTZ
|
||
);
|
||
CREATE INDEX idx_domain_events_status ON domain_events (status, created_at);
|
||
CREATE INDEX idx_domain_events_tenant ON domain_events (tenant_id);
|
||
```
|
||
|
||
**修改文件:** `crates/erp-server/migration/src/lib.rs` — 注册新迁移
|
||
|
||
### 7.5b: SeaORM Entity
|
||
|
||
**新建文件:** `crates/erp-core/src/entity/domain_event.rs`
|
||
|
||
**修改文件:** `crates/erp-core/src/entity/mod.rs` — 添加 `pub mod domain_event;`
|
||
|
||
### 7.5c: EventBus 改造
|
||
|
||
**修改文件:** `crates/erp-core/src/events.rs`
|
||
|
||
- 现有 `publish()` 重命名为 `broadcast()`(内部使用)
|
||
- 新增 `publish_with_persist(event, db)` — 先 INSERT domain_events,再 broadcast
|
||
- INSERT 失败时仅 log warning,仍然 broadcast(best-effort)
|
||
|
||
### 7.5d: 更新所有 publish 调用点
|
||
|
||
全部 25 个 `event_bus.publish(...)` 调用改为 `event_bus.publish_with_persist(event, db).await`。
|
||
|
||
**涉及文件:**
|
||
- `erp-auth/src/service/` — 5 个文件 (user, role, org, dept, position)
|
||
- `erp-config/src/service/` — 4 个文件 (dictionary, menu, setting, numbering)
|
||
- `erp-workflow/src/service/` — 3 个文件 (definition, instance, task)
|
||
- `erp-message/src/service/` — 1 个文件 (message_service)
|
||
|
||
### 7.5e: Outbox Relay 后台任务
|
||
|
||
**新建文件:** `crates/erp-server/src/outbox.rs`
|
||
|
||
后台 tokio task 每 5 秒扫描 `domain_events WHERE status = 'pending'`,重新 broadcast 并标记为 published。
|
||
|
||
**修改文件:** `crates/erp-server/src/main.rs` — 启动 outbox relay
|
||
|
||
---
|
||
|
||
## 关键文件索引
|
||
|
||
| 用途 | 文件路径 |
|
||
|------|---------|
|
||
| 错误类型 | `crates/erp-core/src/error.rs` |
|
||
| 事件总线 | `crates/erp-core/src/events.rs` |
|
||
| 审计日志类型 | `crates/erp-core/src/audit.rs` |
|
||
| AppState | `crates/erp-server/src/state.rs` |
|
||
| 服务器入口 | `crates/erp-server/src/main.rs` |
|
||
| 迁移注册 | `crates/erp-server/migration/src/lib.rs` |
|
||
| Auth DTO | `crates/erp-auth/src/dto.rs` |
|
||
| Auth Service 参考 | `crates/erp-auth/src/service/user_service.rs` |
|
||
| Auth Handler 参考 | `crates/erp-auth/src/handler/user_handler.rs` |
|
||
|
||
## 验证方式
|
||
|
||
1. `cargo check` — 全 workspace 编译通过
|
||
2. `cargo test --workspace` — 所有测试通过
|
||
3. 手动测试:更新用户两次(第二次用旧 version)→ 409 Conflict
|
||
4. 手动测试:登录限流 → 第 6 次返回 429
|
||
5. 查询 `SELECT * FROM audit_logs` → 验证审计记录
|
||
6. 查询 `SELECT * FROM domain_events` → 验证事件持久化
|
||
7. 重启服务后验证 pending 事件被 relay 处理
|