- Base platform from base.git (ERP base: auth, core, config, message, workflow, plugin) - Created erp-diary module skeleton (lib.rs, dto.rs, error.rs, event.rs, state.rs) - Integrated erp-diary into workspace and erp-server - Added DiaryModule registration in main.rs - Added DiaryState FromRef in state.rs - Diary routes mounted (empty routes, ready for implementation) - Product design spec v1.2 preserved in docs/ - Implementation plan preserved in plans/ Cargo check: OK Cargo test: OK (78+ base tests passing)
126 lines
4.3 KiB
Rust
126 lines
4.3 KiB
Rust
use std::sync::atomic::{AtomicU64, Ordering};
|
||
use std::sync::Arc;
|
||
use std::time::Duration;
|
||
|
||
fn touch_heartbeat(heartbeat: &Arc<AtomicU64>) {
|
||
let now = std::time::SystemTime::now()
|
||
.duration_since(std::time::UNIX_EPOCH)
|
||
.unwrap_or_default()
|
||
.as_secs();
|
||
heartbeat.store(now, Ordering::Relaxed);
|
||
}
|
||
|
||
/// 启动事件清理后台任务。
|
||
///
|
||
/// 每日执行一次:
|
||
/// - 调用 `cleanup_old_published_events()` 归档 >7 天的已发布事件
|
||
/// - 调用 `cleanup_old_processed_events()` 清理 >7 天的去重记录
|
||
pub fn start_event_cleanup(db: sea_orm::DatabaseConnection, heartbeat: Arc<AtomicU64>) {
|
||
tokio::spawn(async move {
|
||
let mut interval = tokio::time::interval(Duration::from_secs(86400));
|
||
loop {
|
||
interval.tick().await;
|
||
if let Err(e) = run_cleanup(&db).await {
|
||
tracing::warn!(error = %e, "事件清理任务执行失败");
|
||
}
|
||
touch_heartbeat(&heartbeat);
|
||
}
|
||
});
|
||
tracing::info!("事件清理任务已启动(每 24 小时执行一次)");
|
||
}
|
||
|
||
async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::DbErr> {
|
||
use sea_orm::ConnectionTrait;
|
||
|
||
// 归档 >7 天的已发布事件
|
||
match db
|
||
.execute_unprepared("SELECT cleanup_old_published_events(7, 1000)")
|
||
.await
|
||
{
|
||
Ok(result) => {
|
||
tracing::info!(rows_affected = result.rows_affected(), "已发布事件归档完成");
|
||
}
|
||
Err(e) => tracing::warn!(error = %e, "已发布事件归档失败"),
|
||
}
|
||
|
||
// 清理 >7 天的去重记录
|
||
match db
|
||
.execute_unprepared("SELECT cleanup_old_processed_events(7, 1000)")
|
||
.await
|
||
{
|
||
Ok(result) => {
|
||
tracing::info!(rows_affected = result.rows_affected(), "去重记录清理完成");
|
||
}
|
||
Err(e) => tracing::warn!(error = %e, "去重记录清理失败"),
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 启动 DB 连接池 + EventBus 积压指标采样任务。
|
||
///
|
||
/// 每 30 秒采样一次并导出为 Prometheus gauge:
|
||
/// - `db_pool_connections_active` — 当前活跃连接数
|
||
/// - `db_pool_connections_idle` — 当前空闲连接数
|
||
/// - `eventbus_pending_total` — pending 状态的领域事件数
|
||
pub fn start_pool_metrics(db: sea_orm::DatabaseConnection, heartbeat: Arc<AtomicU64>) {
|
||
tokio::spawn(async move {
|
||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||
loop {
|
||
interval.tick().await;
|
||
sample_pool_metrics(&db).await;
|
||
sample_eventbus_backlog(&db).await;
|
||
touch_heartbeat(&heartbeat);
|
||
}
|
||
});
|
||
tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)");
|
||
}
|
||
|
||
async fn sample_pool_metrics(db: &sea_orm::DatabaseConnection) {
|
||
use sea_orm::FromQueryResult;
|
||
|
||
#[derive(FromQueryResult)]
|
||
struct CountRow {
|
||
cnt: i64,
|
||
}
|
||
|
||
// 通过 pg_stat_activity 查询当前连接数
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'active'".to_string(),
|
||
);
|
||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||
metrics::gauge!("db_pool_connections_active").set(row.cnt as f64);
|
||
}
|
||
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'idle'".to_string(),
|
||
);
|
||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||
metrics::gauge!("db_pool_connections_idle").set(row.cnt as f64);
|
||
}
|
||
}
|
||
|
||
async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) {
|
||
use sea_orm::FromQueryResult;
|
||
|
||
#[derive(FromQueryResult)]
|
||
struct CountRow {
|
||
cnt: i64,
|
||
}
|
||
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM domain_events WHERE status = 'pending'".to_string(),
|
||
);
|
||
match CountRow::find_by_statement(stmt).one(db).await {
|
||
Ok(Some(row)) => {
|
||
metrics::gauge!("eventbus_pending_total").set(row.cnt as f64);
|
||
}
|
||
_ => {
|
||
tracing::debug!("EventBus 积压采样:无法获取 pending 事件数");
|
||
}
|
||
}
|
||
}
|