Files
hms/crates/erp-server/src/outbox.rs
iven 6d5a711d2c
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix: 修复测试发现的 7 个问题 + 全 workspace clippy 清零
功能修复:
1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查
2. 仪表盘统计容错:单个查询失败返回零值而非 500
3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致
4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径
5. 积分端点权限码:health.health-data.list → health.points.list
6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage
7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档

Clippy 全 workspace 清零(14→0 errors):
- erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处
- erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处
- erp-ai: 修复 dead_code、unused import 等 11 处
- erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处
- erp-server-migration: 修复 enum_variant_names 5 处
- erp-auth/config/workflow/message: 各 1-3 处

工程改进:
- lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy)
- cargo fmt 统一格式化
2026-05-07 23:43:14 +08:00

138 lines
5.0 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set,
};
use sqlx::postgres::PgListener;
use std::time::Duration;
use erp_core::entity::domain_event;
use erp_core::events::{DomainEvent, EventBus};
const MAX_RETRY: i32 = 5;
const FALLBACK_POLL_INTERVAL_SECS: u64 = 30;
const NOTIFY_CHANNEL: &str = "outbox_channel";
const RECONNECT_DELAY_SECS: u64 = 5;
/// 启动 outbox relay 后台任务。
///
/// 先执行一次性扫描(处理服务重启前遗留的 pending 事件),
/// 然后通过 PostgreSQL LISTEN/NOTIFY 监听新事件,配合 30s 兜底轮询。
pub fn start_outbox_relay(
db: sea_orm::DatabaseConnection,
event_bus: EventBus,
database_url: String,
) {
let db_clone = db.clone();
let event_bus_clone = event_bus.clone();
let url = database_url.clone();
tokio::spawn(async move {
// 启动时立即处理一次(恢复重启前未广播的事件)
match process_pending_events(&db_clone, &event_bus_clone).await {
Ok(count) if count > 0 => tracing::info!(count = count, "启动时 outbox relay 恢复完成"),
Ok(_) => tracing::info!("启动时 outbox relay 无待处理事件"),
Err(e) => tracing::warn!(error = %e, "启动时 outbox relay 处理失败"),
}
// 进入 LISTEN/NOTIFY 主循环(带自动重连)
loop {
if let Err(e) = run_listener(&db_clone, &event_bus_clone, &url).await {
tracing::warn!(error = %e, "PgListener 断开连接,{}s 后重连", RECONNECT_DELAY_SECS);
}
tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECS)).await;
// 重连后执行一次兜底扫描
if let Err(e) = process_pending_events(&db_clone, &event_bus_clone).await {
tracing::warn!(error = %e, "重连后 outbox relay 处理失败");
}
}
});
}
/// 运行 PgListener 监听循环。
///
/// 使用 `tokio::select!` 在 LISTEN 通知和 30s 定时器之间竞争,
/// 确保即使 NOTIFY 丢失也能兜底处理。
async fn run_listener(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
database_url: &str,
) -> Result<(), sqlx::Error> {
let mut listener = PgListener::connect(database_url).await?;
listener.listen(NOTIFY_CHANNEL).await?;
tracing::info!("Outbox relay LISTEN/NOTIFY 已连接,监听 {}", NOTIFY_CHANNEL);
let mut fallback = tokio::time::interval(Duration::from_secs(FALLBACK_POLL_INTERVAL_SECS));
loop {
tokio::select! {
// LISTEN/NOTIFY 通知触发
notification = listener.recv() => {
match notification {
Ok(notif) => {
tracing::debug!(
channel = %notif.channel(),
payload = %notif.payload(),
"收到 outbox NOTIFY"
);
if let Err(e) = process_pending_events(db, event_bus).await {
tracing::warn!(error = %e, "NOTIFY 触发的 outbox 处理失败");
}
}
Err(e) => return Err(e),
}
}
// 30s 兜底轮询
_ = fallback.tick() => {
tracing::debug!("outbox relay 兜底轮询触发");
if let Err(e) = process_pending_events(db, event_bus).await {
tracing::warn!(error = %e, "兜底轮询 outbox 处理失败");
}
}
}
}
}
async fn process_pending_events(
db: &sea_orm::DatabaseConnection,
event_bus: &EventBus,
) -> Result<usize, sea_orm::DbErr> {
let pending = domain_event::Entity::find()
.filter(domain_event::Column::Status.eq("pending"))
.filter(domain_event::Column::Attempts.lt(MAX_RETRY))
.order_by_asc(domain_event::Column::CreatedAt)
.limit(100)
.all(db)
.await?;
if pending.is_empty() {
return Ok(0);
}
let count = pending.len();
tracing::info!(count = count, "处理待发领域事件");
for event_model in pending {
// 重建 DomainEvent 并广播(保留原始 ID 和时间戳)
let domain_event = DomainEvent {
id: event_model.id,
event_type: event_model.event_type.clone(),
tenant_id: event_model.tenant_id,
payload: event_model.payload.clone().unwrap_or(serde_json::json!({})),
timestamp: event_model.created_at,
correlation_id: event_model.correlation_id.unwrap_or(event_model.id),
};
event_bus.broadcast(domain_event);
// 标记为 published增加 attempts 计数
let mut active: domain_event::ActiveModel = event_model.into();
active.status = Set("published".to_string());
active.published_at = Set(Some(Utc::now()));
active.attempts = Set(erp_core::sea_orm_ext::bump_version(&active.attempts));
active.update(db).await?;
}
Ok(count)
}