P0-1: 危急值告警消费者 — health_data.critical_alert 事件推送给责任医护 P0-2: 危急值阈值可配置化 — 硬编码改为数据库配置(critical_value_threshold表),支持科室/年龄差异化 P0-3: daily_monitoring合并后告警验证 — update_vital_signs也触发危急值检测 P0-4: 随访逾期通知+幂等保护 — 只通知本次新标记的逾期任务,避免重复 P0-5: 知情同意记录(consent) — 新增实体/迁移/Service/Handler,PIPL合规 P0-6: 审计日志补全 — 患者更新记录前后值(过敏史/病史/状态变更) P0-7: EventBus持久化增强 — 两阶段提交(pending→published)+启动时outbox relay恢复
78 lines
2.8 KiB
Rust
78 lines
2.8 KiB
Rust
use chrono::Utc;
|
||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set};
|
||
use std::time::Duration;
|
||
|
||
use erp_core::entity::domain_event;
|
||
use erp_core::events::{DomainEvent, EventBus};
|
||
|
||
const MAX_RETRY: i32 = 5;
|
||
|
||
/// 启动 outbox relay 后台任务。
|
||
///
|
||
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件),
|
||
/// 然后每 5 秒定期扫描 domain_events 表中 status = 'pending' 的事件。
|
||
pub fn start_outbox_relay(db: sea_orm::DatabaseConnection, event_bus: EventBus) {
|
||
let db_clone = db.clone();
|
||
let event_bus_clone = event_bus.clone();
|
||
tokio::spawn(async move {
|
||
// 启动时立即处理一次(恢复重启前未广播的事件)
|
||
match process_pending_events(&db_clone, &event_bus_clone).await {
|
||
Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"),
|
||
Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"),
|
||
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
|
||
}
|
||
|
||
// 定期轮询
|
||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||
loop {
|
||
interval.tick().await;
|
||
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
|
||
tracing::warn!(error = %e, "Outbox relay 处理失败");
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
async fn process_pending_events(
|
||
db: &sea_orm::DatabaseConnection,
|
||
event_bus: &EventBus,
|
||
) -> Result<usize, sea_orm::DbErr> {
|
||
let pending = domain_event::Entity::find()
|
||
.filter(domain_event::Column::Status.eq("pending"))
|
||
.filter(domain_event::Column::Attempts.lt(MAX_RETRY))
|
||
.order_by_asc(domain_event::Column::CreatedAt)
|
||
.limit(100)
|
||
.all(db)
|
||
.await?;
|
||
|
||
if pending.is_empty() {
|
||
return Ok(0);
|
||
}
|
||
|
||
let count = pending.len();
|
||
tracing::info!(count = count, "处理待发领域事件");
|
||
|
||
for event_model in pending {
|
||
// 重建 DomainEvent 并广播(保留原始 ID 和时间戳)
|
||
let domain_event = DomainEvent {
|
||
id: event_model.id,
|
||
event_type: event_model.event_type.clone(),
|
||
tenant_id: event_model.tenant_id,
|
||
payload: event_model.payload.clone().unwrap_or(serde_json::json!({})),
|
||
timestamp: event_model.created_at,
|
||
correlation_id: event_model.correlation_id.unwrap_or(event_model.id),
|
||
};
|
||
|
||
event_bus.broadcast(domain_event);
|
||
|
||
// 标记为 published,增加 attempts 计数
|
||
let mut active: domain_event::ActiveModel = event_model.into();
|
||
active.status = Set("published".to_string());
|
||
active.published_at = Set(Some(Utc::now()));
|
||
active.attempts = Set(active.attempts.unwrap() + 1);
|
||
active.update(db).await?;
|
||
}
|
||
|
||
Ok(count)
|
||
}
|