基于全景审计分析,产出 5 份跨领域设计规格: 1. 性能优化 — 后端批量INSERT/合并COUNT/告警预加载 + 前端N+1内联name 2. 安全纵深防御 — PostgreSQL RLS/行级数据范围/session_key Redis/审计哈希链 3. 事件驱动架构增强 — 6个业务域11个缺失事件补发 + Outbox LISTEN/NOTIFY 4. 前端工程化 — 14个大组件拆分 + 3个重复模式统一 + Bundle优化 5. 可观测性与运维 — 深度健康检查/Prometheus/OpenTelemetry/生产Docker
9.7 KiB
事件驱动架构增强设计规格
日期: 2026-04-26 | 状态: draft | 主题: 缺失事件补发 + Outbox relay 优化 + 事件 schema 版本化
1. 背景
HMS 已有完整的事件总线基础设施:
- EventBus (
erp-core/src/events.rs): 两阶段发布(先持久化 pending → 广播 → 更新 published) - Outbox relay (
erp-server/src/outbox.rs): 5 秒轮询 domain_events 表,重发 pending 事件 - domain_events 表: id, tenant_id, event_type, payload, status, attempts, created_at, published_at
已有事件发布的模块: patient, appointment, follow_up, consultation, health_data, alert_engine, device_reading, doctor。
2. 问题分析
2.1 缺失事件清单
以下 6 个业务域的 service 文件中无任何 event_bus.publish 调用:
| 业务域 | Service 文件 | 缺失事件 |
|---|---|---|
| 透析记录 | dialysis_service.rs |
dialysis_record.created/reviewed |
| 诊断 | diagnosis_service.rs |
diagnosis.created/updated |
| 知情同意 | consent_service.rs |
consent.granted/revoked |
| 日常监测 | daily_monitoring_service.rs |
daily_monitoring.created |
| 积分 | points_service.rs |
points.earned/exchanged |
| 资讯文章 | article_service.rs |
article.published/rejected |
2.2 基础设施改进项
| 编号 | 问题 | 影响 |
|---|---|---|
| I-1 | Outbox relay 5 秒轮询延迟高 | 事件从产生到广播最长 5 秒延迟 |
| I-2 | 事件 payload 无 schema 版本 | 消费者无法安全演进,字段增删破坏兼容性 |
| I-3 | 无事件幂等性保证 | 消费者重复消费可能导致业务异常 |
| I-4 | domain_events 表无清理策略 | 表无限增长影响查询性能 |
3. 解决方案
3.1 缺失事件补发
3.1.1 事件优先级排序
| 优先级 | 事件 | 理由 |
|---|---|---|
| P0 | dialysis_record.created/reviewed |
透析是核心医疗流程,需触发统计更新和告警检查 |
| P0 | diagnosis.created/updated |
诊断关联后续治疗方案,影响预约/随访 |
| P1 | consent.granted/revoked |
合规要求,知情同意变更需通知医护 |
| P1 | article.published/rejected |
内容审核流程依赖事件驱动 |
| P2 | daily_monitoring.created |
日常监测触发趋势分析 |
| P2 | points.earned/exchanged |
积分变动通知用户 |
3.1.2 统一事件信封格式
所有事件 payload 遵循统一信封:
{
"schema_version": "v1",
"entity_id": "uuid",
"entity_type": "dialysis_record",
"action": "created",
"tenant_id": "uuid",
"operator_id": "uuid | null",
"timestamp": "ISO 8601",
"data": { /* 实体快照或变更字段 */ },
"metadata": { "source": "erp-health", "trace_id": "uuid" }
}
3.1.3 各事件 data 字段设计
| 事件 | data 关键字段 | 说明 |
|---|---|---|
dialysis_record.created |
patient_id, dialysis_type, status, dialysis_date, duration, ultrafiltration_volume | 新建透析记录 |
dialysis_record.reviewed |
patient_id, reviewer_id, dialysis_type, complication_notes | 医生审核完成 |
diagnosis.created |
patient_id, icd_code, diagnosis_name, diagnosis_type, severity, diagnosed_at | 新诊断录入 |
diagnosis.updated |
patient_id, changed_fields[], old_values{}, new_values{} | 诊断信息变更(含 diff) |
consent.granted |
patient_id, consent_type, consent_scope, granted_by, expires_at | 知情同意签署 |
consent.revoked |
patient_id, consent_type, revoked_by, reason | 知情同意撤销 |
article.published |
title, author_id, category_id, tags[] | 文章审核通过发布 |
article.rejected |
title, reviewer_id, reason | 文章审核驳回 |
daily_monitoring.created |
patient_id, monitoring_date, monitoring_type, values{} | 日常监测数据录入 |
points.earned |
patient_id, points, source_type, source_id, balance_after | 积分获得 |
points.exchanged |
patient_id, points, product_name, order_id, balance_after | 积分兑换 |
3.2 Outbox relay 优化
3.2.1 PostgreSQL LISTEN/NOTIFY 替代轮询
当前: 5 秒轮询 domain_events 表(outbox.rs 第 26-32 行)
优化方案:
- 在
EventBus::publish()持久化事件后执行NOTIFY:
// erp-core/src/events.rs publish() 末尾添加
let notify_sql = format!("NOTIFY outbox_channel, '{}'", event.id);
sqlx::query(¬ify_sql).execute(db).await.ok();
- Outbox relay 使用
LISTEN+ 30 秒兜底轮询:
let mut listener = PgListener::connect_with(&db).await?;
listener.listen("outbox_channel").await?;
loop {
tokio::select! {
_ = listener.recv() => { process_pending_events(&db, &event_bus).await.ok(); }
_ = tokio::time::sleep(Duration::from_secs(30)) => {
process_pending_events(&db, &event_bus).await.ok();
}
}
}
收益: 事件延迟 0-5s → <100ms,DB 轮询压力降低 6x。复杂度: 低。
3.2.2 domain_events 表清理
方案: 按月分区 + 90 天归档
CREATE TABLE domain_events_new (LIKE domain_events INCLUDING ALL)
PARTITION BY RANGE (created_at);
-- 按月创建分区
CREATE TABLE domain_events_2026_04 PARTITION OF domain_events_new
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
已 published 且 > 90 天的事件迁移到 domain_events_archive 表。
3.3 事件 schema 版本化
在 payload 中嵌入 schema_version 字段,消费者按 event_type + schema_version 路由:
fn handle_event(event: &DomainEvent) {
let version = event.payload["schema_version"].as_str().unwrap_or("v1");
match (event.event_type.as_str(), version) {
("dialysis_record.created", "v1") => handle_v1(event),
("dialysis_record.created", "v2") => handle_v2(event),
_ => tracing::warn!("Unknown event version"),
}
}
演进规则: 新增字段兼容(不升版),删除/重命名字段不兼容(升版)。
3.4 事件幂等性保证
消费者维护 processed_events 去重表:
CREATE TABLE processed_events (
event_id UUID NOT NULL,
consumer_id VARCHAR(64) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (event_id, consumer_id)
);
流程: 收到事件 → 查已处理 → 已存在则跳过 → 否则执行业务 + 插入记录。7 天 TTL 定期清理。
4. 实施步骤
Phase 1: P0 事件补发(预估 2 天)
| 步骤 | 任务 | 修改文件 |
|---|---|---|
| 1.1 | dialysis_service 添加 created/reviewed 事件 | dialysis_service.rs |
| 1.2 | diagnosis_service 添加 created/updated 事件 | diagnosis_service.rs |
| 1.3 | 验证: 事件发布 + payload 格式正确 | - |
Phase 2: P1 事件补发(预估 1-2 天)
| 步骤 | 任务 | 修改文件 |
|---|---|---|
| 2.1 | consent_service 添加 granted/revoked 事件 | consent_service.rs |
| 2.2 | article_service 添加 published/rejected 事件 | article_service.rs |
| 2.3 | 验证: 事件发布正确触发 | - |
Phase 3: P2 事件补发(预估 1 天)
| 步骤 | 任务 | 修改文件 |
|---|---|---|
| 3.1 | daily_monitoring_service 添加 created 事件 | daily_monitoring_service.rs |
| 3.2 | points_service 添加 earned/exchanged 事件 | points_service.rs |
| 3.3 | 验证: 积分变动事件触发 | - |
Phase 4: 基础设施优化(预估 2-3 天)
| 步骤 | 任务 | 修改文件 |
|---|---|---|
| 4.1 | Outbox relay 改用 LISTEN/NOTIFY | outbox.rs, events.rs |
| 4.2 | 添加事件 schema_version 字段 | 所有事件发布处 |
| 4.3 | 创建 processed_events 去重表 | migration |
| 4.4 | domain_events 按月分区 + 清理策略 | migration + 后台任务 |
| 4.5 | 验证: 事件延迟 < 100ms + 去重测试 | - |
5. 风险与缓解
5.1 LISTEN/NOTIFY 连接管理
风险: PostgreSQL LISTEN 使用独立连接,连接断开需重建。
缓解: sqlx::PgListener 自动重连 + 30 秒兜底轮询确保不遗漏。
5.2 事件发布失败
风险: event_bus.publish() 失败但业务操作已提交。
缓解: 两阶段发布已处理 — 事件写入 pending,outbox relay 重发。publish 失败仅 warn 日志,不阻塞业务。
5.3 去重表增长
风险: processed_events 表快速增长。
缓解: 7 天 TTL 定期清理,或使用 Redis SET NX + TTL 替代。
5.4 Schema 演进兼容性
风险: 新版本消费者无法处理老版本事件。 缓解: 消费者必须支持 N-1 版本 schema。升版前确保所有消费者已升级。
6. 已有事件 vs 缺失事件汇总
已发布事件(8 个模块)
| 模块 | 事件类型 | 触发位置 |
|---|---|---|
| patient | patient.created, patient.updated |
patient_service.rs |
| appointment | appointment.created, appointment.status_changed |
appointment_service.rs |
| follow_up | follow_up_task.created, follow_up_task.status_changed, follow_up_record.completed |
follow_up_service.rs |
| consultation | consultation_session.created, consultation_session.status_changed |
consultation_service.rs |
| health_data | vital_signs.created, lab_report.uploaded |
health_data_service.rs |
| alert | alert.triggered |
alert_engine.rs |
| device | device.readings.synced |
device_reading_service.rs |
| doctor | doctor.schedule.updated |
doctor_service.rs |
待补发事件(6 个模块,11 个事件)
| 模块 | 事件类型 | 优先级 |
|---|---|---|
| dialysis | dialysis_record.created, dialysis_record.reviewed |
P0 |
| diagnosis | diagnosis.created, diagnosis.updated |
P0 |
| consent | consent.granted, consent.revoked |
P1 |
| article | article.published, article.rejected |
P1 |
| daily_monitoring | daily_monitoring.created |
P2 |
| points | points.earned, points.exchanged |
P2 |