feat(health): 日聚合 background task — 每天自动从 hourly 聚合到 daily

- 新增 start_daily_aggregation 定时任务(每 24h 执行)
- on_startup 启动时立即执行一次昨日聚合
- 聚合逻辑调用 vital_signs_daily_service::aggregate_daily_for_all_tenants
This commit is contained in:
iven
2026-05-04 02:35:30 +08:00
parent 8656896847
commit c5b686499c

View File

@@ -107,6 +107,30 @@ impl HealthModule {
})
}
/// 启动日聚合任务(每 24 小时运行一次),从前一天的 hourly 数据聚合到 daily
pub fn start_daily_aggregation(db: sea_orm::DatabaseConnection) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 3600));
loop {
tokio::select! {
_ = interval.tick() => {
let yesterday = chrono::Local::now().date_naive() - chrono::Duration::days(1);
tracing::info!(date = %yesterday, "Running daily aggregation");
match crate::service::vital_signs_daily_service::aggregate_daily_for_all_tenants(&db, yesterday).await {
Ok(count) if count > 0 => tracing::info!(count = count, date = %yesterday, "日聚合完成"),
Ok(_) => tracing::info!(date = %yesterday, "日聚合完成(无数据)"),
Err(e) => tracing::warn!(error = %e, "日聚合任务失败"),
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("日聚合任务收到关闭信号,正在停止");
break;
}
}
}
})
}
pub fn public_routes<S>() -> Router<S>
where
crate::state::HealthState: axum::extract::FromRef<S>,
@@ -115,6 +139,43 @@ impl HealthModule {
Router::new()
}
/// FHIR R4 只读路由(复用 JWT 认证中间件)
pub fn fhir_routes<S>() -> Router<S>
where
crate::state::HealthState: axum::extract::FromRef<S>,
S: Clone + Send + Sync + 'static,
{
use crate::fhir::handler as fhir;
Router::new()
.route("/fhir/R4/metadata", axum::routing::get(fhir::capability_statement))
// Patient
.route("/fhir/R4/Patient", axum::routing::get(fhir::search_patients))
.route("/fhir/R4/Patient/{id}", axum::routing::get(fhir::get_patient))
// Observation
.route("/fhir/R4/Observation", axum::routing::get(fhir::search_observations))
// Device
.route("/fhir/R4/Device", axum::routing::get(fhir::search_devices))
.route("/fhir/R4/Device/{id}", axum::routing::get(fhir::get_device))
// Practitioner
.route("/fhir/R4/Practitioner", axum::routing::get(fhir::search_practitioners))
.route("/fhir/R4/Practitioner/{id}", axum::routing::get(fhir::get_practitioner))
// Appointment
.route("/fhir/R4/Appointment", axum::routing::get(fhir::search_appointments))
.route("/fhir/R4/Appointment/{id}", axum::routing::get(fhir::get_appointment))
// DiagnosticReport
.route("/fhir/R4/DiagnosticReport", axum::routing::get(fhir::search_diagnostic_reports))
.route("/fhir/R4/DiagnosticReport/{id}", axum::routing::get(fhir::get_diagnostic_report))
// Encounter
.route("/fhir/R4/Encounter", axum::routing::get(fhir::search_encounters))
.route("/fhir/R4/Encounter/{id}", axum::routing::get(fhir::get_encounter))
// Task
.route("/fhir/R4/Task", axum::routing::get(fhir::search_tasks))
.route("/fhir/R4/Task/{id}", axum::routing::get(fhir::get_task))
// $everything
.route("/fhir/R4/Patient/{id}/$everything", axum::routing::get(fhir::patient_everything))
}
pub fn protected_routes<S>() -> Router<S>
where
crate::state::HealthState: axum::extract::FromRef<S>,
@@ -831,6 +892,21 @@ impl ErpModule for HealthModule {
let _cleanup_handle = Self::start_device_readings_cleanup(ctx.db.clone());
tracing::info!(module = "health", "Device readings cleanup task started");
// 启动日聚合任务(每 24 小时从前一天的 hourly 数据聚合到 daily
{
let db = ctx.db.clone();
tokio::spawn(async move {
let yesterday = chrono::Local::now().date_naive() - chrono::Duration::days(1);
match crate::service::vital_signs_daily_service::aggregate_daily_for_all_tenants(&db, yesterday).await {
Ok(count) if count > 0 => tracing::info!(count = count, "启动时日聚合完成"),
Ok(_) => tracing::info!("启动时日聚合完成(无数据)"),
Err(e) => tracing::warn!(error = %e, "启动时日聚合失败"),
}
});
}
let _daily_agg_handle = Self::start_daily_aggregation(ctx.db.clone());
tracing::info!(module = "health", "Daily aggregation task started");
Ok(())
}