feat(health): 告警降噪服务 + FHIR handler stubs

- 新增 alert_noise_reducer:患者级升级(30min/3次阈值) + 系统级聚合(5min窗口)
- 补全 FHIR R4 handler stubs(Plan 2 路由注册但 handler 缺失导致编译失败)
This commit is contained in:
iven
2026-05-04 02:36:37 +08:00
parent c5b686499c
commit 24562dd54b
3 changed files with 399 additions and 0 deletions

View File

@@ -0,0 +1,228 @@
use axum::extract::{FromRef, Path, Query, State};
use axum::response::IntoResponse;
use axum::Json;
use serde::Deserialize;
use uuid::Uuid;
use crate::state::HealthState;
/// GET /fhir/R4/metadata — FHIR CapabilityStatement
pub async fn capability_statement<S>() -> Result<impl IntoResponse, erp_core::error::AppError>
where
HealthState: FromRef<S>,
S: Clone + Send + Sync + 'static,
{
let stmt = serde_json::json!({
"resourceType": "CapabilityStatement",
"status": "active",
"date": chrono::Utc::now().format("%Y-%m-%d").to_string(),
"kind": "instance",
"fhirVersion": "4.0.1",
"format": ["application/fhir+json"],
"rest": [{
"mode": "server",
"resource": [
{ "type": "Patient", "interaction": [{"code": "read"}, {"code": "search-type"}], "operation": [{"name": "everything"}] },
{ "type": "Observation", "interaction": [{"code": "read"}, {"code": "search-type"}], "operation": [{"name": "lastn"}] },
{ "type": "Device", "interaction": [{"code": "read"}, {"code": "search-type"}] },
{ "type": "DiagnosticReport", "interaction": [{"code": "read"}, {"code": "search-type"}] },
{ "type": "Encounter", "interaction": [{"code": "read"}, {"code": "search-type"}] },
{ "type": "Practitioner", "interaction": [{"code": "read"}, {"code": "search-type"}] },
{ "type": "Appointment", "interaction": [{"code": "read"}, {"code": "search-type"}] },
{ "type": "Task", "interaction": [{"code": "read"}, {"code": "search-type"}] },
],
"operation": [
{ "name": "everything", "definition": "/fhir/R4/Patient/{id}/$everything" },
{ "name": "lastn", "definition": "/fhir/R4/Observation/$lastn" },
]
}]
});
Ok(Json(stmt))
}
#[derive(Debug, Deserialize)]
pub struct SearchParams {
pub patient: Option<String>,
pub category: Option<String>,
#[serde(rename = "_count")]
pub count: Option<u32>,
#[serde(rename = "_offset")]
pub offset: Option<u32>,
}
// ── Patient ────────────────────────────────────────────────────────────
pub async fn search_patients(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_patient(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Patient not implemented yet"}]
})))
}
pub async fn patient_everything(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
// ── Observation ────────────────────────────────────────────────────────
pub async fn search_observations(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn observation_lastn(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
// ── Device ─────────────────────────────────────────────────────────────
pub async fn search_devices(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_device(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Device not implemented yet"}]
})))
}
// ── Practitioner ───────────────────────────────────────────────────────
pub async fn search_practitioners(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_practitioner(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Practitioner not implemented yet"}]
})))
}
// ── Appointment ────────────────────────────────────────────────────────
pub async fn search_appointments(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_appointment(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Appointment not implemented yet"}]
})))
}
// ── DiagnosticReport ───────────────────────────────────────────────────
pub async fn search_diagnostic_reports(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_diagnostic_report(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "DiagnosticReport not implemented yet"}]
})))
}
// ── Encounter ──────────────────────────────────────────────────────────
pub async fn search_encounters(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_encounter(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Encounter not implemented yet"}]
})))
}
// ── Task ───────────────────────────────────────────────────────────────
pub async fn search_tasks(
State(_state): State<HealthState>,
Query(_params): Query<SearchParams>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "Bundle", "type": "searchset", "total": 0, "entry": []
})))
}
pub async fn get_task(
State(_state): State<HealthState>,
Path(_id): Path<Uuid>,
) -> Result<impl IntoResponse, erp_core::error::AppError> {
Ok(Json(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "information", "code": "not-found", "diagnostics": "Task not implemented yet"}]
})))
}

View File

@@ -0,0 +1,170 @@
use chrono::Utc;
use sea_orm::entity::prelude::*;
use uuid::Uuid;
use crate::entity::alerts;
use crate::state::HealthState;
/// 严重度等级(数值越大越严重)
fn severity_rank(s: &str) -> u8 {
match s {
"critical" => 4,
"high" => 3,
"medium" => 2,
"low" | "warning" | "info" => 1,
_ => 0,
}
}
/// 患者级告警升级阈值:同一患者在最近 N 分钟内连续产生 M 条低级别告警 → 升级为高级别
const ESCALATION_WINDOW_MINUTES: i64 = 30;
const ESCALATION_THRESHOLD_COUNT: usize = 3;
/// 系统级聚合窗口N 分钟内同一设备的多个告警合并为一条通知
const AGGREGATION_WINDOW_MINUTES: i64 = 5;
/// 检查是否需要患者级告警升级。
///
/// 逻辑:查询该患者最近 ESCALATION_WINDOW_MINUTES 内的活跃告警,
/// 如果低级别告警数 ≥ ESCALATION_THRESHOLD_COUNT则将当前告警严重度提升一级。
pub async fn check_patient_escalation(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
original_severity: &str,
) -> String {
let rank = severity_rank(original_severity);
if rank >= 3 {
return original_severity.to_string();
}
let since = Utc::now() - chrono::Duration::minutes(ESCALATION_WINDOW_MINUTES);
let recent_count = alerts::Entity::find()
.filter(alerts::Column::TenantId.eq(tenant_id))
.filter(alerts::Column::PatientId.eq(patient_id))
.filter(alerts::Column::CreatedAt.gt(since))
.filter(alerts::Column::DeletedAt.is_null())
.filter(alerts::Column::Severity.is_in(["low", "info", "medium", "warning"]))
.count(&state.db)
.await
.unwrap_or(0);
if recent_count as usize >= ESCALATION_THRESHOLD_COUNT {
let escalated = match original_severity {
"low" | "info" => "medium",
"medium" | "warning" => "high",
_ => original_severity,
};
tracing::info!(
patient_id = %patient_id,
original = %original_severity,
escalated = %escalated,
recent_low_count = recent_count,
"患者级告警升级触发"
);
escalated.to_string()
} else {
original_severity.to_string()
}
}
/// 检查是否应该系统级聚合(抑制重复通知)。
///
/// 返回 (should_suppress, aggregated_alert_count)
pub async fn check_system_aggregation(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
_device_type: &str,
) -> (bool, u64) {
let since = Utc::now() - chrono::Duration::minutes(AGGREGATION_WINDOW_MINUTES);
let count = alerts::Entity::find()
.filter(alerts::Column::TenantId.eq(tenant_id))
.filter(alerts::Column::PatientId.eq(patient_id))
.filter(alerts::Column::CreatedAt.gt(since))
.filter(alerts::Column::DeletedAt.is_null())
.count(&state.db)
.await
.unwrap_or(0);
let should_suppress = count > 0;
if should_suppress {
tracing::debug!(
patient_id = %patient_id,
existing_alerts = count,
"系统级告警聚合:抑制重复通知"
);
}
(should_suppress, count)
}
/// 对原始严重度进行降噪处理,返回 (final_severity, is_suppressed)。
///
/// 调用顺序:
/// 1. 患者级升级 — 可能提升严重度
/// 2. 系统级聚合 — 可能抑制通知
pub async fn apply_noise_reduction(
state: &HealthState,
tenant_id: Uuid,
patient_id: Uuid,
device_type: &str,
original_severity: &str,
) -> (String, bool) {
let escalated_severity =
check_patient_escalation(state, tenant_id, patient_id, original_severity).await;
let should_suppress = if severity_rank(&escalated_severity) >= 4 {
false
} else {
let (suppress, _) = check_system_aggregation(
state, tenant_id, patient_id, device_type,
)
.await;
suppress
};
(escalated_severity, should_suppress)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn severity_rank_ordering() {
assert!(severity_rank("critical") > severity_rank("high"));
assert!(severity_rank("high") > severity_rank("medium"));
assert!(severity_rank("medium") > severity_rank("low"));
assert!(severity_rank("medium") > severity_rank("info"));
assert!(severity_rank("low") > severity_rank("unknown"));
}
#[test]
fn severity_rank_critical_is_highest() {
assert_eq!(severity_rank("critical"), 4);
}
#[test]
fn severity_rank_unknown_is_zero() {
assert_eq!(severity_rank("nonexistent"), 0);
}
#[test]
fn escalation_threshold_constants() {
assert_eq!(ESCALATION_WINDOW_MINUTES, 30);
assert_eq!(ESCALATION_THRESHOLD_COUNT, 3);
}
#[test]
fn aggregation_window_constant() {
assert_eq!(AGGREGATION_WINDOW_MINUTES, 5);
}
#[test]
fn severity_rank_warning_equals_low() {
assert_eq!(severity_rank("warning"), severity_rank("low"));
}
}

View File

@@ -1,4 +1,5 @@
pub mod action_inbox_service;
pub mod alert_noise_reducer;
pub mod ai_action_dispatcher;
pub mod ai_suggestion_loader;
pub mod alert_engine;