修正 spec review 发现的问题: - C-1: TestDb 实际是本地 PostgreSQL 隔离,非 Testcontainers - C-2: E2E 已有 4 spec/10 测试,非零测试 - 补充 6 个遗漏的 service(alert/daily_monitoring/critical_value_threshold 等) - 增加 Phase 0 基础设施搭建 - 修正 CI 配置(增加 PostgreSQL service、验证链) - 补充 5 个遗漏风险项和回退策略 - 统一"全量 80%"目标的准确含义
108 lines
4.6 KiB
Rust
108 lines
4.6 KiB
Rust
use erp_core::events::EventBus;
|
||
use uuid::Uuid;
|
||
|
||
/// 兼容旧签名 — 不做任何实际订阅(逻辑已迁移到 on_startup)
|
||
pub fn register_handlers(_bus: &EventBus) {
|
||
// 事件处理器已迁移到 on_startup → register_handlers_with_state
|
||
}
|
||
|
||
/// 带 HealthState 的事件订阅 — 在模块 on_startup 时调用
|
||
pub fn register_handlers_with_state(state: crate::state::HealthState) {
|
||
// workflow.task.completed → 更新随访任务状态为 completed
|
||
let (mut workflow_rx, _wf_handle) = state.event_bus.subscribe_filtered("workflow.task.".to_string());
|
||
let db = state.db.clone();
|
||
tokio::spawn(async move {
|
||
loop {
|
||
match workflow_rx.recv().await {
|
||
Some(event) if event.event_type == "workflow.task.completed" => {
|
||
// 从 payload 中提取 task_id
|
||
let task_id = event.payload.get("task_id").and_then(|v| v.as_str()).and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||
match task_id {
|
||
Some(task_id) => {
|
||
match crate::service::follow_up_service::complete_task_by_system(
|
||
&db, task_id, event.tenant_id,
|
||
)
|
||
.await
|
||
{
|
||
Ok(()) => {
|
||
tracing::info!(
|
||
event_id = %event.id,
|
||
task_id = %task_id,
|
||
"工作流任务完成 → 随访任务已更新"
|
||
);
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!(
|
||
event_id = %event.id,
|
||
task_id = %task_id,
|
||
error = %e,
|
||
"工作流任务完成 → 随访任务更新失败"
|
||
);
|
||
}
|
||
}
|
||
}
|
||
None => {
|
||
tracing::warn!(
|
||
event_id = %event.id,
|
||
"工作流任务完成事件缺少 task_id,跳过"
|
||
);
|
||
}
|
||
}
|
||
}
|
||
Some(_) => {}
|
||
None => break,
|
||
}
|
||
}
|
||
});
|
||
|
||
// message.sent → 预留:后续联动咨询会话 last_message_at
|
||
let (mut msg_rx, _msg_handle) = state.event_bus.subscribe_filtered("message.".to_string());
|
||
let msg_db = state.db.clone();
|
||
tokio::spawn(async move {
|
||
loop {
|
||
match msg_rx.recv().await {
|
||
Some(event) if event.event_type == "message.sent" => {
|
||
tracing::info!(
|
||
event_id = %event.id,
|
||
"健康模块收到消息发送事件(暂不处理)"
|
||
);
|
||
}
|
||
Some(_) => {}
|
||
None => break,
|
||
}
|
||
}
|
||
});
|
||
|
||
// device.readings.synced → 触发告警引擎评估
|
||
let (mut reading_rx, _reading_handle) = state.event_bus.subscribe_filtered("device.readings.".to_string());
|
||
let eval_state = state.clone();
|
||
tokio::spawn(async move {
|
||
loop {
|
||
match reading_rx.recv().await {
|
||
Some(event) if event.event_type == "device.readings.synced" => {
|
||
let patient_id = event.payload.get("patient_id")
|
||
.and_then(|v| v.as_str())
|
||
.and_then(|s| Uuid::parse_str(s).ok());
|
||
if let Some(pid) = patient_id {
|
||
// 对所有设备类型触发评估
|
||
for device_type in &["heart_rate", "blood_oxygen", "temperature"] {
|
||
if let Err(e) = crate::service::alert_engine::evaluate_rules(
|
||
&eval_state, event.tenant_id, pid, device_type,
|
||
).await {
|
||
tracing::error!(
|
||
patient_id = %pid,
|
||
device_type = device_type,
|
||
error = %e,
|
||
"告警评估失败"
|
||
);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
Some(_) => {}
|
||
None => break,
|
||
}
|
||
}
|
||
});
|
||
}
|