基于全景审计分析,产出 5 份跨领域设计规格: 1. 性能优化 — 后端批量INSERT/合并COUNT/告警预加载 + 前端N+1内联name 2. 安全纵深防御 — PostgreSQL RLS/行级数据范围/session_key Redis/审计哈希链 3. 事件驱动架构增强 — 6个业务域11个缺失事件补发 + Outbox LISTEN/NOTIFY 4. 前端工程化 — 14个大组件拆分 + 3个重复模式统一 + Bundle优化 5. 可观测性与运维 — 深度健康检查/Prometheus/OpenTelemetry/生产Docker
250 lines
9.7 KiB
Markdown
250 lines
9.7 KiB
Markdown
# 事件驱动架构增强设计规格
|
||
|
||
> 日期: 2026-04-26 | 状态: draft | 主题: 缺失事件补发 + Outbox relay 优化 + 事件 schema 版本化
|
||
|
||
## 1. 背景
|
||
|
||
HMS 已有完整的事件总线基础设施:
|
||
|
||
- **EventBus** (`erp-core/src/events.rs`): 两阶段发布(先持久化 pending → 广播 → 更新 published)
|
||
- **Outbox relay** (`erp-server/src/outbox.rs`): 5 秒轮询 domain_events 表,重发 pending 事件
|
||
- **domain_events 表**: id, tenant_id, event_type, payload, status, attempts, created_at, published_at
|
||
|
||
已有事件发布的模块: patient, appointment, follow_up, consultation, health_data, alert_engine, device_reading, doctor。
|
||
|
||
## 2. 问题分析
|
||
|
||
### 2.1 缺失事件清单
|
||
|
||
以下 6 个业务域的 service 文件中无任何 `event_bus.publish` 调用:
|
||
|
||
| 业务域 | Service 文件 | 缺失事件 |
|
||
|--------|-------------|----------|
|
||
| 透析记录 | `dialysis_service.rs` | `dialysis_record.created/reviewed` |
|
||
| 诊断 | `diagnosis_service.rs` | `diagnosis.created/updated` |
|
||
| 知情同意 | `consent_service.rs` | `consent.granted/revoked` |
|
||
| 日常监测 | `daily_monitoring_service.rs` | `daily_monitoring.created` |
|
||
| 积分 | `points_service.rs` | `points.earned/exchanged` |
|
||
| 资讯文章 | `article_service.rs` | `article.published/rejected` |
|
||
|
||
### 2.2 基础设施改进项
|
||
|
||
| 编号 | 问题 | 影响 |
|
||
|------|------|------|
|
||
| I-1 | Outbox relay 5 秒轮询延迟高 | 事件从产生到广播最长 5 秒延迟 |
|
||
| I-2 | 事件 payload 无 schema 版本 | 消费者无法安全演进,字段增删破坏兼容性 |
|
||
| I-3 | 无事件幂等性保证 | 消费者重复消费可能导致业务异常 |
|
||
| I-4 | domain_events 表无清理策略 | 表无限增长影响查询性能 |
|
||
|
||
## 3. 解决方案
|
||
|
||
### 3.1 缺失事件补发
|
||
|
||
#### 3.1.1 事件优先级排序
|
||
|
||
| 优先级 | 事件 | 理由 |
|
||
|--------|------|------|
|
||
| P0 | `dialysis_record.created/reviewed` | 透析是核心医疗流程,需触发统计更新和告警检查 |
|
||
| P0 | `diagnosis.created/updated` | 诊断关联后续治疗方案,影响预约/随访 |
|
||
| P1 | `consent.granted/revoked` | 合规要求,知情同意变更需通知医护 |
|
||
| P1 | `article.published/rejected` | 内容审核流程依赖事件驱动 |
|
||
| P2 | `daily_monitoring.created` | 日常监测触发趋势分析 |
|
||
| P2 | `points.earned/exchanged` | 积分变动通知用户 |
|
||
|
||
#### 3.1.2 统一事件信封格式
|
||
|
||
所有事件 payload 遵循统一信封:
|
||
|
||
```json
|
||
{
|
||
"schema_version": "v1",
|
||
"entity_id": "uuid",
|
||
"entity_type": "dialysis_record",
|
||
"action": "created",
|
||
"tenant_id": "uuid",
|
||
"operator_id": "uuid | null",
|
||
"timestamp": "ISO 8601",
|
||
"data": { /* 实体快照或变更字段 */ },
|
||
"metadata": { "source": "erp-health", "trace_id": "uuid" }
|
||
}
|
||
```
|
||
|
||
#### 3.1.3 各事件 data 字段设计
|
||
|
||
| 事件 | data 关键字段 | 说明 |
|
||
|------|--------------|------|
|
||
| `dialysis_record.created` | patient_id, dialysis_type, status, dialysis_date, duration, ultrafiltration_volume | 新建透析记录 |
|
||
| `dialysis_record.reviewed` | patient_id, reviewer_id, dialysis_type, complication_notes | 医生审核完成 |
|
||
| `diagnosis.created` | patient_id, icd_code, diagnosis_name, diagnosis_type, severity, diagnosed_at | 新诊断录入 |
|
||
| `diagnosis.updated` | patient_id, changed_fields[], old_values{}, new_values{} | 诊断信息变更(含 diff) |
|
||
| `consent.granted` | patient_id, consent_type, consent_scope, granted_by, expires_at | 知情同意签署 |
|
||
| `consent.revoked` | patient_id, consent_type, revoked_by, reason | 知情同意撤销 |
|
||
| `article.published` | title, author_id, category_id, tags[] | 文章审核通过发布 |
|
||
| `article.rejected` | title, reviewer_id, reason | 文章审核驳回 |
|
||
| `daily_monitoring.created` | patient_id, monitoring_date, monitoring_type, values{} | 日常监测数据录入 |
|
||
| `points.earned` | patient_id, points, source_type, source_id, balance_after | 积分获得 |
|
||
| `points.exchanged` | patient_id, points, product_name, order_id, balance_after | 积分兑换 |
|
||
|
||
### 3.2 Outbox relay 优化
|
||
|
||
#### 3.2.1 PostgreSQL LISTEN/NOTIFY 替代轮询
|
||
|
||
**当前**: 5 秒轮询 `domain_events` 表(`outbox.rs` 第 26-32 行)
|
||
|
||
**优化方案**:
|
||
|
||
1. 在 `EventBus::publish()` 持久化事件后执行 `NOTIFY`:
|
||
|
||
```rust
|
||
// erp-core/src/events.rs publish() 末尾添加
|
||
let notify_sql = format!("NOTIFY outbox_channel, '{}'", event.id);
|
||
sqlx::query(¬ify_sql).execute(db).await.ok();
|
||
```
|
||
|
||
2. Outbox relay 使用 `LISTEN` + 30 秒兜底轮询:
|
||
|
||
```rust
|
||
let mut listener = PgListener::connect_with(&db).await?;
|
||
listener.listen("outbox_channel").await?;
|
||
loop {
|
||
tokio::select! {
|
||
_ = listener.recv() => { process_pending_events(&db, &event_bus).await.ok(); }
|
||
_ = tokio::time::sleep(Duration::from_secs(30)) => {
|
||
process_pending_events(&db, &event_bus).await.ok();
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
**收益**: 事件延迟 0-5s → <100ms,DB 轮询压力降低 6x。**复杂度**: 低。
|
||
|
||
#### 3.2.2 domain_events 表清理
|
||
|
||
**方案**: 按月分区 + 90 天归档
|
||
|
||
```sql
|
||
CREATE TABLE domain_events_new (LIKE domain_events INCLUDING ALL)
|
||
PARTITION BY RANGE (created_at);
|
||
-- 按月创建分区
|
||
CREATE TABLE domain_events_2026_04 PARTITION OF domain_events_new
|
||
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
|
||
```
|
||
|
||
已 published 且 > 90 天的事件迁移到 `domain_events_archive` 表。
|
||
|
||
### 3.3 事件 schema 版本化
|
||
|
||
在 payload 中嵌入 `schema_version` 字段,消费者按 `event_type` + `schema_version` 路由:
|
||
|
||
```rust
|
||
fn handle_event(event: &DomainEvent) {
|
||
let version = event.payload["schema_version"].as_str().unwrap_or("v1");
|
||
match (event.event_type.as_str(), version) {
|
||
("dialysis_record.created", "v1") => handle_v1(event),
|
||
("dialysis_record.created", "v2") => handle_v2(event),
|
||
_ => tracing::warn!("Unknown event version"),
|
||
}
|
||
}
|
||
```
|
||
|
||
**演进规则**: 新增字段兼容(不升版),删除/重命名字段不兼容(升版)。
|
||
|
||
### 3.4 事件幂等性保证
|
||
|
||
消费者维护 `processed_events` 去重表:
|
||
|
||
```sql
|
||
CREATE TABLE processed_events (
|
||
event_id UUID NOT NULL,
|
||
consumer_id VARCHAR(64) NOT NULL,
|
||
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
PRIMARY KEY (event_id, consumer_id)
|
||
);
|
||
```
|
||
|
||
流程: 收到事件 → 查已处理 → 已存在则跳过 → 否则执行业务 + 插入记录。7 天 TTL 定期清理。
|
||
|
||
## 4. 实施步骤
|
||
|
||
### Phase 1: P0 事件补发(预估 2 天)
|
||
|
||
| 步骤 | 任务 | 修改文件 |
|
||
|------|------|----------|
|
||
| 1.1 | dialysis_service 添加 created/reviewed 事件 | `dialysis_service.rs` |
|
||
| 1.2 | diagnosis_service 添加 created/updated 事件 | `diagnosis_service.rs` |
|
||
| 1.3 | 验证: 事件发布 + payload 格式正确 | - |
|
||
|
||
### Phase 2: P1 事件补发(预估 1-2 天)
|
||
|
||
| 步骤 | 任务 | 修改文件 |
|
||
|------|------|----------|
|
||
| 2.1 | consent_service 添加 granted/revoked 事件 | `consent_service.rs` |
|
||
| 2.2 | article_service 添加 published/rejected 事件 | `article_service.rs` |
|
||
| 2.3 | 验证: 事件发布正确触发 | - |
|
||
|
||
### Phase 3: P2 事件补发(预估 1 天)
|
||
|
||
| 步骤 | 任务 | 修改文件 |
|
||
|------|------|----------|
|
||
| 3.1 | daily_monitoring_service 添加 created 事件 | `daily_monitoring_service.rs` |
|
||
| 3.2 | points_service 添加 earned/exchanged 事件 | `points_service.rs` |
|
||
| 3.3 | 验证: 积分变动事件触发 | - |
|
||
|
||
### Phase 4: 基础设施优化(预估 2-3 天)
|
||
|
||
| 步骤 | 任务 | 修改文件 |
|
||
|------|------|----------|
|
||
| 4.1 | Outbox relay 改用 LISTEN/NOTIFY | `outbox.rs`, `events.rs` |
|
||
| 4.2 | 添加事件 schema_version 字段 | 所有事件发布处 |
|
||
| 4.3 | 创建 processed_events 去重表 | migration |
|
||
| 4.4 | domain_events 按月分区 + 清理策略 | migration + 后台任务 |
|
||
| 4.5 | 验证: 事件延迟 < 100ms + 去重测试 | - |
|
||
|
||
## 5. 风险与缓解
|
||
|
||
### 5.1 LISTEN/NOTIFY 连接管理
|
||
|
||
**风险**: PostgreSQL LISTEN 使用独立连接,连接断开需重建。
|
||
**缓解**: `sqlx::PgListener` 自动重连 + 30 秒兜底轮询确保不遗漏。
|
||
|
||
### 5.2 事件发布失败
|
||
|
||
**风险**: `event_bus.publish()` 失败但业务操作已提交。
|
||
**缓解**: 两阶段发布已处理 — 事件写入 pending,outbox relay 重发。publish 失败仅 warn 日志,不阻塞业务。
|
||
|
||
### 5.3 去重表增长
|
||
|
||
**风险**: `processed_events` 表快速增长。
|
||
**缓解**: 7 天 TTL 定期清理,或使用 Redis SET NX + TTL 替代。
|
||
|
||
### 5.4 Schema 演进兼容性
|
||
|
||
**风险**: 新版本消费者无法处理老版本事件。
|
||
**缓解**: 消费者必须支持 N-1 版本 schema。升版前确保所有消费者已升级。
|
||
|
||
## 6. 已有事件 vs 缺失事件汇总
|
||
|
||
### 已发布事件(8 个模块)
|
||
|
||
| 模块 | 事件类型 | 触发位置 |
|
||
|------|----------|----------|
|
||
| patient | `patient.created`, `patient.updated` | `patient_service.rs` |
|
||
| appointment | `appointment.created`, `appointment.status_changed` | `appointment_service.rs` |
|
||
| follow_up | `follow_up_task.created`, `follow_up_task.status_changed`, `follow_up_record.completed` | `follow_up_service.rs` |
|
||
| consultation | `consultation_session.created`, `consultation_session.status_changed` | `consultation_service.rs` |
|
||
| health_data | `vital_signs.created`, `lab_report.uploaded` | `health_data_service.rs` |
|
||
| alert | `alert.triggered` | `alert_engine.rs` |
|
||
| device | `device.readings.synced` | `device_reading_service.rs` |
|
||
| doctor | `doctor.schedule.updated` | `doctor_service.rs` |
|
||
|
||
### 待补发事件(6 个模块,11 个事件)
|
||
|
||
| 模块 | 事件类型 | 优先级 |
|
||
|------|----------|--------|
|
||
| dialysis | `dialysis_record.created`, `dialysis_record.reviewed` | P0 |
|
||
| diagnosis | `diagnosis.created`, `diagnosis.updated` | P0 |
|
||
| consent | `consent.granted`, `consent.revoked` | P1 |
|
||
| article | `article.published`, `article.rejected` | P1 |
|
||
| daily_monitoring | `daily_monitoring.created` | P2 |
|
||
| points | `points.earned`, `points.exchanged` | P2 |
|