feat(server): 可观测性 Phase 1 — 健康检查路由 + Prometheus 指标 + 连接池/事件积压监控
- 添加 /health/live 存活探针别名(原 /health + /health/ready 保留) - 新增 metrics middleware:http_requests_total 计数器 + http_request_duration_seconds 直方图 - Prometheus exporter 独立端口 9090(可通过 ERP__SERVER__METRICS_PORT 覆盖) - 后台任务每 30s 采样 DB 连接池活跃/空闲连接数(pg_stat_activity) - 后台任务每 30s 采样 EventBus pending 事件积压数 - UUID 路径归一化避免高基数(/api/v1/users/:id/posts)
This commit is contained in:
@@ -36,6 +36,8 @@ anyhow.workspace = true
|
||||
uuid.workspace = true
|
||||
chrono.workspace = true
|
||||
moka = { version = "0.12", features = ["sync"] }
|
||||
metrics.workspace = true
|
||||
metrics-exporter-prometheus.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
erp-auth = { workspace = true }
|
||||
|
||||
@@ -20,6 +20,12 @@ pub struct AppConfig {
|
||||
pub struct ServerConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
#[serde(default = "default_metrics_port")]
|
||||
pub metrics_port: u16,
|
||||
}
|
||||
|
||||
fn default_metrics_port() -> u16 {
|
||||
9090
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
@@ -130,5 +130,6 @@ async fn check_redis(client: &redis::Client) -> ComponentStatus {
|
||||
pub fn health_check_router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/health", get(health_check))
|
||||
.route("/health/live", get(health_check))
|
||||
.route("/health/ready", get(readiness_check))
|
||||
}
|
||||
|
||||
@@ -432,6 +432,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Start event cleanup (archive old published events + purge processed_events)
|
||||
tasks::start_event_cleanup(db.clone());
|
||||
|
||||
// Start DB connection pool metrics sampling (every 30s)
|
||||
tasks::start_pool_metrics(db.clone());
|
||||
|
||||
// Start timeout checker (scan overdue tasks every 60s)
|
||||
erp_workflow::WorkflowModule::start_timeout_checker(db.clone(), event_bus.clone());
|
||||
tracing::info!("Timeout checker started");
|
||||
@@ -611,8 +614,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", unthrottled_routes.merge(public_routes).merge(protected_routes))
|
||||
.nest("/uploads", uploads_router)
|
||||
.layer(axum::middleware::from_fn(middleware::metrics::metrics_middleware))
|
||||
.layer(cors);
|
||||
|
||||
// Start Prometheus metrics exporter on a separate port
|
||||
let metrics_port = state.config.server.metrics_port;
|
||||
middleware::metrics::start_metrics_server(metrics_port);
|
||||
|
||||
let addr = format!("{}:{}", host, port);
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
tracing::info!(addr = %addr, "Server listening");
|
||||
|
||||
122
crates/erp-server/src/middleware/metrics.rs
Normal file
122
crates/erp-server/src/middleware/metrics.rs
Normal file
@@ -0,0 +1,122 @@
|
||||
use axum::extract::Request;
|
||||
use axum::http::Method;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use metrics::{counter, histogram};
|
||||
use std::time::Instant;
|
||||
|
||||
/// HTTP 请求指标中间件。
|
||||
///
|
||||
/// 记录两个 Prometheus 指标:
|
||||
/// - `http_requests_total` — 计数器,标签: method, path, status
|
||||
/// - `http_request_duration_seconds` — 直方图,标签: method, path, status
|
||||
pub async fn metrics_middleware(req: Request, next: Next) -> Response {
|
||||
let method = method_label(req.method());
|
||||
let path = path_label(req.uri().path());
|
||||
|
||||
let start = Instant::now();
|
||||
let resp = next.run(req).await;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
let status = resp.status().as_u16().to_string();
|
||||
|
||||
let labels = [
|
||||
("method", method.clone()),
|
||||
("path", path.clone()),
|
||||
("status", status.clone()),
|
||||
];
|
||||
|
||||
counter!("http_requests_total", &labels).increment(1);
|
||||
histogram!("http_request_duration_seconds", &labels).record(elapsed.as_secs_f64());
|
||||
|
||||
resp
|
||||
}
|
||||
|
||||
fn method_label(method: &Method) -> String {
|
||||
method.as_str().to_owned()
|
||||
}
|
||||
|
||||
/// 归一化路径:将 UUID 段替换为 `:id`,避免高基数。
|
||||
fn path_label(path: &str) -> String {
|
||||
let parts: Vec<&str> = path
|
||||
.split('/')
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|s| if looks_like_uuid(s) { ":id" } else { s })
|
||||
.collect();
|
||||
if parts.is_empty() {
|
||||
"/".to_string()
|
||||
} else {
|
||||
format!("/{}", parts.join("/"))
|
||||
}
|
||||
}
|
||||
|
||||
fn looks_like_uuid(s: &str) -> bool {
|
||||
s.len() == 36
|
||||
&& s.chars().filter(|c| *c == '-').count() == 4
|
||||
&& s.chars().all(|c| c.is_ascii_hexdigit() || c == '-')
|
||||
}
|
||||
|
||||
/// 在独立端口启动 Prometheus exporter。
|
||||
pub fn start_metrics_server(port: u16) {
|
||||
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
|
||||
let recorder = builder.build_recorder();
|
||||
let handle = recorder.handle();
|
||||
|
||||
if let Err(e) = metrics::set_global_recorder(recorder) {
|
||||
tracing::error!(error = %e, "Failed to install Prometheus recorder");
|
||||
return;
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/metrics",
|
||||
axum::routing::get(move || {
|
||||
let handle = handle.clone();
|
||||
async move {
|
||||
let body = handle.render();
|
||||
axum::response::IntoResponse::into_response(
|
||||
([(axum::http::header::CONTENT_TYPE, "text/plain; version=0.0.4")], body),
|
||||
)
|
||||
}
|
||||
}),
|
||||
)
|
||||
.fallback(|| async { axum::http::StatusCode::NOT_FOUND.into_response() as Response });
|
||||
|
||||
let addr = format!("0.0.0.0:{port}");
|
||||
match tokio::net::TcpListener::bind(&addr).await {
|
||||
Ok(listener) => {
|
||||
tracing::info!(addr = %addr, "Prometheus metrics server listening");
|
||||
if let Err(e) = axum::serve(listener, app).await {
|
||||
tracing::error!(error = %e, "Metrics server error");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, addr = %addr, "Failed to bind metrics server");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn path_label_normalizes_uuids() {
|
||||
assert_eq!(path_label("/api/v1/users"), "/api/v1/users");
|
||||
assert_eq!(
|
||||
path_label("/api/v1/users/01234567-89ab-cdef-0123-456789abcdef/posts"),
|
||||
"/api/v1/users/:id/posts"
|
||||
);
|
||||
assert_eq!(path_label("/"), "/");
|
||||
assert_eq!(path_label(""), "/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_uuid_checks_format() {
|
||||
assert!(looks_like_uuid("01234567-89ab-cdef-0123-456789abcdef"));
|
||||
assert!(!looks_like_uuid("not-a-uuid"));
|
||||
assert!(!looks_like_uuid("short"));
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod metrics;
|
||||
pub mod rate_limit;
|
||||
pub mod tenant_rls;
|
||||
|
||||
@@ -51,3 +51,69 @@ async fn run_cleanup(db: &sea_orm::DatabaseConnection) -> Result<(), sea_orm::Db
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 启动 DB 连接池 + EventBus 积压指标采样任务。
|
||||
///
|
||||
/// 每 30 秒采样一次并导出为 Prometheus gauge:
|
||||
/// - `db_pool_connections_active` — 当前活跃连接数
|
||||
/// - `db_pool_connections_idle` — 当前空闲连接数
|
||||
/// - `eventbus_pending_total` — pending 状态的领域事件数
|
||||
pub fn start_pool_metrics(db: sea_orm::DatabaseConnection) {
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
sample_pool_metrics(&db).await;
|
||||
sample_eventbus_backlog(&db).await;
|
||||
}
|
||||
});
|
||||
tracing::info!("DB 连接池 + EventBus 积压指标采样已启动(每 30 秒采样一次)");
|
||||
}
|
||||
|
||||
async fn sample_pool_metrics(db: &sea_orm::DatabaseConnection) {
|
||||
use sea_orm::FromQueryResult;
|
||||
|
||||
#[derive(FromQueryResult)]
|
||||
struct CountRow {
|
||||
cnt: i64,
|
||||
}
|
||||
|
||||
// 通过 pg_stat_activity 查询当前连接数
|
||||
let stmt = sea_orm::Statement::from_string(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'active'".to_string(),
|
||||
);
|
||||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||||
metrics::gauge!("db_pool_connections_active").set(row.cnt as f64);
|
||||
}
|
||||
|
||||
let stmt = sea_orm::Statement::from_string(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
"SELECT COUNT(*)::bigint AS cnt FROM pg_stat_activity WHERE state = 'idle'".to_string(),
|
||||
);
|
||||
if let Ok(Some(row)) = CountRow::find_by_statement(stmt).one(db).await {
|
||||
metrics::gauge!("db_pool_connections_idle").set(row.cnt as f64);
|
||||
}
|
||||
}
|
||||
|
||||
async fn sample_eventbus_backlog(db: &sea_orm::DatabaseConnection) {
|
||||
use sea_orm::FromQueryResult;
|
||||
|
||||
#[derive(FromQueryResult)]
|
||||
struct CountRow {
|
||||
cnt: i64,
|
||||
}
|
||||
|
||||
let stmt = sea_orm::Statement::from_string(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
"SELECT COUNT(*)::bigint AS cnt FROM domain_events WHERE status = 'pending'".to_string(),
|
||||
);
|
||||
match CountRow::find_by_statement(stmt).one(db).await {
|
||||
Ok(Some(row)) => {
|
||||
metrics::gauge!("eventbus_pending_total").set(row.cnt as f64);
|
||||
}
|
||||
_ => {
|
||||
tracing::debug!("EventBus 积压采样:无法获取 pending 事件数");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user