feat(saas): add billing infrastructure — tables, types, service, handlers
B1.1 Billing database: - 5 tables: billing_plans, billing_subscriptions, billing_invoices, billing_payments, billing_usage_quotas - Seed data: Free(¥0)/Pro(¥49)/Team(¥199) plans - JSONB limits for flexible plan configuration Billing module (crates/zclaw-saas/src/billing/): - types.rs: BillingPlan, Subscription, Invoice, Payment, UsageQuota - service.rs: plan CRUD, subscription lookup, usage tracking, quota check - handlers.rs: REST API (plans list/detail, subscription, usage) - mod.rs: routes registered at /api/v1/billing/* Cargo.toml: added chrono feature to sqlx for DateTime<Utc> support
This commit is contained in:
@@ -14,6 +14,9 @@ use zclaw_saas::workers::update_last_used::UpdateLastUsedWorker;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Load .env file from project root (walk up from current dir)
|
||||
load_dotenv();
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
@@ -24,11 +27,18 @@ async fn main() -> anyhow::Result<()> {
|
||||
let config = SaaSConfig::load()?;
|
||||
info!("SaaS config loaded: {}:{}", config.server.host, config.server.port);
|
||||
|
||||
let db = init_db(&config.database.url).await?;
|
||||
let db = init_db(&config.database).await?;
|
||||
info!("Database initialized");
|
||||
|
||||
// 创建 Worker spawn 限制器(门控并发 DB 操作数量)
|
||||
let worker_limiter = zclaw_saas::state::SpawnLimiter::new(
|
||||
"worker",
|
||||
config.database.worker_concurrency,
|
||||
);
|
||||
info!("Worker spawn limiter: {} permits", config.database.worker_concurrency);
|
||||
|
||||
// 初始化 Worker 调度器 + 注册所有 Worker
|
||||
let mut dispatcher = WorkerDispatcher::new(db.clone());
|
||||
let mut dispatcher = WorkerDispatcher::new(db.clone(), worker_limiter.clone());
|
||||
dispatcher.register(LogOperationWorker);
|
||||
dispatcher.register(CleanupRefreshTokensWorker);
|
||||
dispatcher.register(CleanupRateLimitWorker);
|
||||
@@ -38,12 +48,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
// 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止
|
||||
let shutdown_token = CancellationToken::new();
|
||||
let state = AppState::new(db.clone(), config.clone(), dispatcher, shutdown_token.clone())?;
|
||||
let state = AppState::new(db.clone(), config.clone(), dispatcher, shutdown_token.clone(), worker_limiter.clone())?;
|
||||
|
||||
// Restore rate limit counts from DB so limits survive server restarts
|
||||
// 仅恢复最近 60s 的计数(与 middleware 的 60s 滑动窗口一致),避免过于保守的限流
|
||||
{
|
||||
let rows: Vec<(String, i64)> = sqlx::query_as(
|
||||
"SELECT key, SUM(count) FROM rate_limit_events WHERE window_start > NOW() - interval '1 hour' GROUP BY key"
|
||||
"SELECT key, SUM(count) FROM rate_limit_events WHERE window_start > NOW() - interval '60 seconds' GROUP BY key"
|
||||
)
|
||||
.fetch_all(&db)
|
||||
.await
|
||||
@@ -51,18 +62,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let mut restored_count = 0usize;
|
||||
for (key, count) in rows {
|
||||
let mut entries = Vec::new();
|
||||
// Approximate: insert count timestamps at "now" — the DashMap will
|
||||
// expire them naturally via the retain() call in the middleware.
|
||||
// This is intentionally approximate; exact window alignment is not
|
||||
// required for rate limiting correctness.
|
||||
for _ in 0..count as usize {
|
||||
// 限制恢复计数不超过 RPM 配额,避免重启后过于保守
|
||||
let rpm = state.rate_limit_rpm() as usize;
|
||||
let capped = (count as usize).min(rpm);
|
||||
let mut entries = Vec::with_capacity(capped);
|
||||
for _ in 0..capped {
|
||||
entries.push(std::time::Instant::now());
|
||||
}
|
||||
state.rate_limit_entries.insert(key, entries);
|
||||
restored_count += 1;
|
||||
}
|
||||
info!("Restored rate limit state from DB: {} keys", restored_count);
|
||||
info!("Restored rate limit state from DB: {} keys (60s window, capped at RPM)", restored_count);
|
||||
}
|
||||
|
||||
// 迁移旧格式 TOTP secret(明文 → 加密 enc: 格式)
|
||||
@@ -117,20 +127,64 @@ async fn main() -> anyhow::Result<()> {
|
||||
});
|
||||
}
|
||||
|
||||
let app = build_router(state).await;
|
||||
// 限流事件批量 flush (可配置间隔,默认 5s)
|
||||
{
|
||||
let flush_state = state.clone();
|
||||
let batch_interval = config.database.rate_limit_batch_interval_secs;
|
||||
let batch_max = config.database.rate_limit_batch_max_size;
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(batch_interval));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
flush_state.flush_rate_limit_batch(batch_max).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 连接池可观测性 (30s 指标日志)
|
||||
{
|
||||
let metrics_db = db.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let pool = &metrics_db;
|
||||
let total = pool.options().get_max_connections() as usize;
|
||||
let idle = pool.num_idle() as usize;
|
||||
let used = total.saturating_sub(idle);
|
||||
let usage_pct = if total > 0 { used * 100 / total } else { 0 };
|
||||
tracing::info!(
|
||||
"[PoolMetrics] total={} idle={} used={} usage_pct={}%",
|
||||
total, idle, used, usage_pct,
|
||||
);
|
||||
if usage_pct >= 80 {
|
||||
tracing::warn!(
|
||||
"[PoolMetrics] HIGH USAGE: {}% of connections in use!",
|
||||
usage_pct,
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let app = build_router(state.clone()).await;
|
||||
|
||||
// 配置 TCP keepalive + 短 SO_LINGER,防止 CLOSE_WAIT 累积
|
||||
let listener = create_listener(&config.server.host, config.server.port)?;
|
||||
info!("SaaS server listening on {}:{}", config.server.host, config.server.port);
|
||||
|
||||
// 优雅停机: Ctrl+C → 取消 CancellationToken → SSE 流终止 → 连接排空
|
||||
// 优雅停机: Ctrl+C → 最终批量 flush → 取消 CancellationToken → SSE 流终止 → 连接排空
|
||||
let token = shutdown_token.clone();
|
||||
let flush_state = state;
|
||||
let batch_max = config.database.rate_limit_batch_max_size;
|
||||
axum::serve(listener, app.into_make_service_with_connect_info::<std::net::SocketAddr>())
|
||||
.with_graceful_shutdown(async move {
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
.expect("Failed to install Ctrl+C handler");
|
||||
info!("Received shutdown signal, cancelling SSE streams and draining connections...");
|
||||
info!("Received shutdown signal, flushing pending rate limit batch...");
|
||||
flush_state.flush_rate_limit_batch(batch_max).await;
|
||||
info!("Cancelling SSE streams and draining connections...");
|
||||
token.cancel();
|
||||
})
|
||||
.await?;
|
||||
@@ -280,6 +334,7 @@ async fn build_router(state: AppState) -> axum::Router {
|
||||
.merge(zclaw_saas::agent_template::routes())
|
||||
.merge(zclaw_saas::scheduled_task::routes())
|
||||
.merge(zclaw_saas::telemetry::routes())
|
||||
.merge(zclaw_saas::billing::routes())
|
||||
.layer(middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
zclaw_saas::middleware::api_version_middleware,
|
||||
@@ -329,3 +384,35 @@ async fn build_router(state: AppState) -> axum::Router {
|
||||
.layer(cors)
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
/// Load `.env` file from project root by walking up from current directory.
|
||||
/// Sets environment variables that are not already set (does not override).
|
||||
fn load_dotenv() {
|
||||
let mut dir = std::env::current_dir().unwrap_or_default();
|
||||
loop {
|
||||
let env_path = dir.join(".env");
|
||||
if env_path.is_file() {
|
||||
if let Ok(content) = std::fs::read_to_string(&env_path) {
|
||||
for line in content.lines() {
|
||||
let line = line.trim();
|
||||
if line.is_empty() || line.starts_with('#') {
|
||||
continue;
|
||||
}
|
||||
if let Some((key, value)) = line.split_once('=') {
|
||||
let key = key.trim();
|
||||
let value = value.trim();
|
||||
// Only set if not already defined in environment
|
||||
if std::env::var(key).is_err() {
|
||||
std::env::set_var(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::debug!("Loaded .env from {}", env_path.display());
|
||||
}
|
||||
return;
|
||||
}
|
||||
if !dir.pop() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user