feat(health): device_readings 双写 vital_signs — 血压/血糖自动归档
This commit is contained in:
@@ -8,7 +8,7 @@ use uuid::Uuid;
|
||||
use erp_core::events::DomainEvent;
|
||||
use erp_core::types::PaginatedResponse;
|
||||
|
||||
use crate::entity::{device_readings, patient, patient_devices, vital_signs_hourly};
|
||||
use crate::entity::{device_readings, patient, patient_devices, vital_signs, vital_signs_hourly};
|
||||
use crate::error::{HealthError, HealthResult};
|
||||
use crate::service::validation::validate_device_type;
|
||||
use crate::state::HealthState;
|
||||
@@ -118,6 +118,13 @@ pub async fn batch_create_readings(
|
||||
&parsed_readings,
|
||||
).await?;
|
||||
|
||||
// 4.5 双写 vital_signs(血压/血糖自动归档)
|
||||
if let Err(e) = sync_bp_glucose_to_vital_signs(
|
||||
&state.db, tenant_id, patient_id, &parsed_readings,
|
||||
).await {
|
||||
tracing::warn!(error = %e, "双写 vital_signs 失败(不影响主流程)");
|
||||
}
|
||||
|
||||
// 5. 降采样 upsert
|
||||
upsert_hourly_aggregates(
|
||||
&state.db, tenant_id, patient_id, &parsed_readings,
|
||||
@@ -440,3 +447,88 @@ pub async fn query_hourly_readings(
|
||||
total_pages: total.div_ceil(limit.max(1)),
|
||||
})
|
||||
}
|
||||
|
||||
/// 将血压/血糖设备数据同步到 vital_signs 表
|
||||
async fn sync_bp_glucose_to_vital_signs(
|
||||
db: &DatabaseConnection,
|
||||
tenant_id: Uuid,
|
||||
patient_id: Uuid,
|
||||
readings: &[(&ReadingInput, DateTime<Utc>)],
|
||||
) -> HealthResult<()> {
|
||||
let bp_readings: Vec<_> = readings
|
||||
.iter()
|
||||
.filter(|(r, _)| matches!(r.device_type.as_str(), "blood_pressure" | "blood_glucose"))
|
||||
.collect();
|
||||
|
||||
if bp_readings.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let today = Utc::now().date_naive();
|
||||
|
||||
let existing = vital_signs::Entity::find()
|
||||
.filter(vital_signs::Column::TenantId.eq(tenant_id))
|
||||
.filter(vital_signs::Column::PatientId.eq(patient_id))
|
||||
.filter(vital_signs::Column::RecordDate.eq(today))
|
||||
.filter(vital_signs::Column::DeletedAt.is_null())
|
||||
.one(db)
|
||||
.await
|
||||
.map_err(|e| HealthError::DbError(e.to_string()))?;
|
||||
|
||||
let mut model = if let Some(rec) = existing {
|
||||
let mut m: vital_signs::ActiveModel = rec.into();
|
||||
m.updated_at = Set(Utc::now());
|
||||
m.version = Set(m.version.unwrap() + 1);
|
||||
m
|
||||
} else {
|
||||
vital_signs::ActiveModel {
|
||||
id: Set(Uuid::now_v7()),
|
||||
tenant_id: Set(tenant_id),
|
||||
patient_id: Set(patient_id),
|
||||
record_date: Set(today),
|
||||
source: Set("device_auto".into()),
|
||||
created_at: Set(Utc::now()),
|
||||
updated_at: Set(Utc::now()),
|
||||
created_by: Set(None),
|
||||
updated_by: Set(None),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
..Default::default()
|
||||
}
|
||||
};
|
||||
|
||||
let mut changed = false;
|
||||
|
||||
for (r, _) in &bp_readings {
|
||||
let metric = r.values.get("metric").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let value = r.values.get("value").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||||
|
||||
match (r.device_type.as_str(), metric) {
|
||||
("blood_pressure", "systolic") => {
|
||||
model.systolic_bp_morning = Set(Some(value.round() as i32));
|
||||
changed = true;
|
||||
}
|
||||
("blood_pressure", "diastolic") => {
|
||||
model.diastolic_bp_morning = Set(Some(value.round() as i32));
|
||||
changed = true;
|
||||
}
|
||||
("blood_glucose", _) => {
|
||||
let dec_val = sea_orm::prelude::Decimal::from_f64_retain(value).unwrap_or_default();
|
||||
model.blood_sugar = Set(Some(dec_val));
|
||||
changed = true;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
let is_update = matches!(model.version, Set(v) if v > 1);
|
||||
if is_update {
|
||||
model.update(db).await.map_err(|e| HealthError::DbError(e.to_string()))?;
|
||||
} else {
|
||||
model.insert(db).await.map_err(|e| HealthError::DbError(e.to_string()))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user