功能修复: 1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查 2. 仪表盘统计容错:单个查询失败返回零值而非 500 3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致 4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径 5. 积分端点权限码:health.health-data.list → health.points.list 6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage 7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档 Clippy 全 workspace 清零(14→0 errors): - erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处 - erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处 - erp-ai: 修复 dead_code、unused import 等 11 处 - erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处 - erp-server-migration: 修复 enum_variant_names 5 处 - erp-auth/config/workflow/message: 各 1-3 处 工程改进: - lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy) - cargo fmt 统一格式化
114 lines
3.8 KiB
Rust
114 lines
3.8 KiB
Rust
use std::time::Duration;
|
||
|
||
/// 启动事件清理后台任务。
|
||
///
|
||
/// 每日执行一次:
|
||
/// - 调用 `cleanup_old_published_events()` 归档 >7 天的已发布事件
|
||
/// - 调用 `cleanup_old_processed_events()` 清理 >7 天的去重记录
|
||
pub fn start_event_cleanup(db: sea_orm::DatabaseConnection) {
|
||
tokio::spawn(async move {
|
||
let mut interval = tokio::time::interval(Duration::from_secs(86400));
|
||
loop {
|
||
interval.tick().await;
|
||
if let Err(e) = run_cleanup(&db).await {
|
||
tracing::warn!(error = %e, "事件清理任务执行失败");
|
||
}
|
||
}
|
||
});
|
||
tracing::info!("事件清理任务已启动(每 24 小时执行一次)");
|
||
}
|
||
|
||
async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::DbErr> {
|
||
use sea_orm::ConnectionTrait;
|
||
|
||
// 归档 >7 天的已发布事件
|
||
match db
|
||
.execute_unprepared("SELECT cleanup_old_published_events(7, 1000)")
|
||
.await
|
||
{
|
||
Ok(result) => {
|
||
tracing::info!(rows_affected = result.rows_affected(), "已发布事件归档完成");
|
||
}
|
||
Err(e) => tracing::warn!(error = %e, "已发布事件归档失败"),
|
||
}
|
||
|
||
// 清理 >7 天的去重记录
|
||
match db
|
||
.execute_unprepared("SELECT cleanup_old_processed_events(7, 1000)")
|
||
.await
|
||
{
|
||
Ok(result) => {
|
||
tracing::info!(rows_affected = result.rows_affected(), "去重记录清理完成");
|
||
}
|
||
Err(e) => tracing::warn!(error = %e, "去重记录清理失败"),
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// 启动 DB 连接池 + EventBus 积压指标采样任务。
|
||
///
|
||
/// 每 30 秒采样一次并导出为 Prometheus gauge:
|
||
/// - `db_pool_connections_active` — 当前活跃连接数
|
||
/// - `db_pool_connections_idle` — 当前空闲连接数
|
||
/// - `eventbus_pending_total` — pending 状态的领域事件数
|
||
pub fn start_pool_metrics(db: sea_orm::DatabaseConnection) {
|
||
tokio::spawn(async move {
|
||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||
loop {
|
||
interval.tick().await;
|
||
sample_pool_metrics(&db).await;
|
||
sample_eventbus_backlog(&db).await;
|
||
}
|
||
});
|
||
tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)");
|
||
}
|
||
|
||
async fn sample_pool_metrics(db: &sea_orm::DatabaseConnection) {
|
||
use sea_orm::FromQueryResult;
|
||
|
||
#[derive(FromQueryResult)]
|
||
struct CountRow {
|
||
cnt: i64,
|
||
}
|
||
|
||
// 通过 pg_stat_activity 查询当前连接数
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'active'".to_string(),
|
||
);
|
||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||
metrics::gauge!("db_pool_connections_active").set(row.cnt as f64);
|
||
}
|
||
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'idle'".to_string(),
|
||
);
|
||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||
metrics::gauge!("db_pool_connections_idle").set(row.cnt as f64);
|
||
}
|
||
}
|
||
|
||
async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) {
|
||
use sea_orm::FromQueryResult;
|
||
|
||
#[derive(FromQueryResult)]
|
||
struct CountRow {
|
||
cnt: i64,
|
||
}
|
||
|
||
let stmt = sea_orm::Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT COUNT(*)::bigint AS cnt FROM domain_events WHERE status = 'pending'".to_string(),
|
||
);
|
||
match CountRow::find_by_statement(stmt).one(db).await {
|
||
Ok(Some(row)) => {
|
||
metrics::gauge!("eventbus_pending_total").set(row.cnt as f64);
|
||
}
|
||
_ => {
|
||
tracing::debug!("EventBus 积压采样:无法获取 pending 事件数");
|
||
}
|
||
}
|
||
}
|