use std::time::Duration; /// 启动事件清理后台任务。 /// /// 每日执行一次: /// - 调用 `cleanup_old_published_events()` 归档 >7 天的已发布事件 /// - 调用 `cleanup_old_processed_events()` 清理 >7 天的去重记录 pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(86400)); loop { interval.tick().await; if let Err(e) = run_cleanup(&db).await { tracing::warn!(error = %e, "事件清理任务执行失败"); } } }); tracing::info!("事件清理任务已启动(每 24 小时执行一次)"); } async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::DbErr> { use sea_orm::ConnectionTrait; // 归档 >7 天的已发布事件 match db .execute_unprepared("SELECT cleanup_old_published_events(7, 1000)") .await { Ok(result) => { tracing::info!( rows_affected = result.rows_affected(), "已发布事件归档完成" ); } Err(e) => tracing::warn!(error = %e, "已发布事件归档失败"), } // 清理 >7 天的去重记录 match db .execute_unprepared("SELECT cleanup_old_processed_events(7, 1000)") .await { Ok(result) => { tracing::info!( rows_affected = result.rows_affected(), "去重记录清理完成" ); } Err(e) => tracing::warn!(error = %e, "去重记录清理失败"), } Ok(()) } /// 启动 DB 连接池 + EventBus 积压指标采样任务。 /// /// 每 30 秒采样一次并导出为 Prometheus gauge: /// - `db_pool_connections_active` — 当前活跃连接数 /// - `db_pool_connections_idle` — 当前空闲连接数 /// - `eventbus_pending_total` — pending 状态的领域事件数 pub fn start_pool_metrics(db: sea_orm::DatabaseConnection) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; sample_pool_metrics(&db).await; sample_eventbus_backlog(&db).await; } }); tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)"); } async fn sample_pool_metrics(db: &sea_orm::DatabaseConnection) { use sea_orm::FromQueryResult; #[derive(FromQueryResult)] struct CountRow { cnt: i64, } // 通过 pg_stat_activity 查询当前连接数 let stmt = sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, "SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'active'".to_string(), ); if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await { metrics::gauge!("db_pool_connections_active").set(row.cnt as f64); } let stmt = sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, "SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'idle'".to_string(), ); if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await { metrics::gauge!("db_pool_connections_idle").set(row.cnt as f64); } } async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) { use sea_orm::FromQueryResult; #[derive(FromQueryResult)] struct CountRow { cnt: i64, } let stmt = sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, "SELECT COUNT(*)::bigint AS cnt FROM domain_events WHERE status = 'pending'".to_string(), ); match CountRow::find_by_statement(stmt).one(db).await { Ok(Some(row)) => { metrics::gauge!("eventbus_pending_total").set(row.cnt as f64); } _ => { tracing::debug!("EventBus 积压采样:无法获取 pending 事件数"); } } }