fix(health): 设备数据管线 Phase 1 缺陷修复 + AI 产品策略讨论
- device_readings 批量插入添加 ON CONFLICT 去重唯一索引 - 小程序 BLEManager 增加离线缓存(Storage 持久化 + 启动重传) - 新增 device_readings 90 天数据保留清理定时任务 - 小米手环适配器增加 RACP 历史心率读取支持 - SSE 告警按医生过滤已确认实现(patient_doctor_relation) - 新增 AI 产品策略与设备数据医院场景讨论记录
This commit is contained in:
@@ -84,6 +84,28 @@ impl HealthModule {
|
||||
})
|
||||
}
|
||||
|
||||
/// 启动设备原始数据清理(每 24 小时运行一次),删除超过 90 天的 device_readings
|
||||
pub fn start_device_readings_cleanup(db: sea_orm::DatabaseConnection) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 3600));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
match crate::service::device_reading_service::cleanup_stale_readings(&db).await {
|
||||
Ok(count) if count > 0 => tracing::info!(count = count, "设备原始数据清理完成"),
|
||||
Ok(_) => {}
|
||||
Err(e) => tracing::warn!(error = %e, "设备原始数据清理失败"),
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
tracing::info!("设备原始数据清理任务收到关闭信号,正在停止");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn public_routes<S>() -> Router<S>
|
||||
where
|
||||
crate::state::HealthState: axum::extract::FromRef<S>,
|
||||
@@ -478,7 +500,8 @@ impl HealthModule {
|
||||
)
|
||||
.route(
|
||||
"/health/admin/points/products",
|
||||
axum::routing::post(points_handler::admin_create_product),
|
||||
axum::routing::get(points_handler::admin_list_products)
|
||||
.post(points_handler::admin_create_product),
|
||||
)
|
||||
.route(
|
||||
"/health/admin/points/products/{id}",
|
||||
@@ -713,6 +736,10 @@ impl ErpModule for HealthModule {
|
||||
let _expire_handle = Self::start_points_expiration_checker(ctx.db.clone(), ctx.event_bus.clone());
|
||||
tracing::info!(module = "health", "Points expiration checker started");
|
||||
|
||||
// 启动设备原始数据清理(每 24 小时删除超过 90 天的数据)
|
||||
let _cleanup_handle = Self::start_device_readings_cleanup(ctx.db.clone());
|
||||
tracing::info!(module = "health", "Device readings cleanup task started");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -223,13 +223,26 @@ async fn batch_insert_readings(
|
||||
})
|
||||
.collect();
|
||||
|
||||
let count = models.len() as u64;
|
||||
let total = models.len() as u64;
|
||||
device_readings::Entity::insert_many(models)
|
||||
.on_conflict(
|
||||
sea_orm::sea_query::OnConflict::columns([
|
||||
device_readings::Column::TenantId,
|
||||
device_readings::Column::PatientId,
|
||||
device_readings::Column::DeviceId,
|
||||
device_readings::Column::Metric,
|
||||
device_readings::Column::MeasuredAt,
|
||||
])
|
||||
.do_nothing()
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(db)
|
||||
.await
|
||||
.map_err(|e| HealthError::DbError(e.to_string()))?;
|
||||
|
||||
Ok(count)
|
||||
// ON CONFLICT DO NOTHING 不返回精确插入数,返回提交总数
|
||||
// 调用方通过 BatchResult.duplicates 字段语义不变
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
async fn upsert_hourly_aggregates(
|
||||
@@ -532,3 +545,33 @@ async fn sync_bp_glucose_to_vital_signs(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 清理超过 90 天的设备原始数据,分批删除避免长事务
|
||||
pub async fn cleanup_stale_readings(
|
||||
db: &DatabaseConnection,
|
||||
) -> HealthResult<u64> {
|
||||
let cutoff = Utc::now() - chrono::Duration::days(90);
|
||||
let batch_size = 1000i64;
|
||||
let mut total_deleted = 0u64;
|
||||
|
||||
loop {
|
||||
let result = db.execute(sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
"DELETE FROM device_readings WHERE measured_at < $1 AND id IN (SELECT id FROM device_readings WHERE measured_at < $1 LIMIT $2)",
|
||||
[cutoff.into(), (batch_size as i32).into()],
|
||||
)).await;
|
||||
|
||||
match result {
|
||||
Ok(res) => {
|
||||
let rows = res.rows_affected();
|
||||
total_deleted += rows;
|
||||
if rows < batch_size as u64 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(HealthError::DbError(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(total_deleted)
|
||||
}
|
||||
|
||||
@@ -93,6 +93,7 @@ mod m20260428_000090_critical_alerts;
|
||||
mod m20260428_000091_dead_letter_events;
|
||||
mod m20260429_000092_device_readings_metric;
|
||||
mod m20260429_000093_trend_analysis_prompt_v2;
|
||||
mod m20260429_000094_device_readings_unique_constraint;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@@ -193,6 +194,7 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20260428_000091_dead_letter_events::Migration),
|
||||
Box::new(m20260429_000092_device_readings_metric::Migration),
|
||||
Box::new(m20260429_000093_trend_analysis_prompt_v2::Migration),
|
||||
Box::new(m20260429_000094_device_readings_unique_constraint::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
//! device_readings 添加去重唯一约束
|
||||
//!
|
||||
// 分区表的唯一约束必须包含分区键 (measured_at)
|
||||
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let db = manager.get_connection();
|
||||
|
||||
// 分区表唯一索引必须包含分区键 measured_at
|
||||
// 同一患者、同一设备、同一指标、同一测量时间只允许一条记录
|
||||
db.execute_unprepared(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS uq_device_readings_dedup
|
||||
ON device_readings (tenant_id, patient_id, device_id, metric, measured_at);"
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(
|
||||
"DROP INDEX IF EXISTS uq_device_readings_dedup;"
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user