//! 计费用量聚合 Worker //! //! 从 usage_records 聚合当月用量到 billing_usage_quotas 表。 //! 由 Scheduler 每小时触发,或在 relay 请求完成时直接派发。 use async_trait::async_trait; use chrono::{Datelike, Timelike}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use crate::error::SaasResult; use super::Worker; /// 用量聚合参数 #[derive(Debug, Serialize, Deserialize)] pub struct AggregateUsageArgs { /// 聚合的目标账户 ID(None = 聚合所有活跃账户) pub account_id: Option, } pub struct AggregateUsageWorker; #[async_trait] impl Worker for AggregateUsageWorker { type Args = AggregateUsageArgs; fn name(&self) -> &str { "aggregate_usage" } async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> { match args.account_id { Some(account_id) => { aggregate_single_account(db, &account_id).await?; } None => { aggregate_all_accounts(db).await?; } } Ok(()) } } /// 聚合单个账户的当月用量 async fn aggregate_single_account(db: &PgPool, account_id: &str) -> SaasResult<()> { // 获取或创建用量记录(确保存在) let usage = crate::billing::service::get_or_create_usage(db, account_id).await?; // 从 usage_records 聚合当月实际 token 用量 let now = chrono::Utc::now(); let period_start = now .with_day(1).unwrap_or(now) .with_hour(0).unwrap_or(now) .with_minute(0).unwrap_or(now) .with_second(0).unwrap_or(now) .with_nanosecond(0).unwrap_or(now); let aggregated: Option<(i64, i64, i64)> = sqlx::query_as( "SELECT COALESCE(SUM(input_tokens), 0)::bigint, \ COALESCE(SUM(output_tokens), 0)::bigint, \ COUNT(*) \ FROM usage_records \ WHERE account_id = $1 AND created_at >= $2 AND status = 'success'" ) .bind(account_id) .bind(period_start) .fetch_optional(db) .await?; if let Some((input_tokens, output_tokens, request_count)) = aggregated { sqlx::query( "UPDATE billing_usage_quotas \ SET input_tokens = $1, \ output_tokens = $2, \ relay_requests = GREATEST(relay_requests, $3::int), \ updated_at = NOW() \ WHERE id = $4" ) .bind(input_tokens) .bind(output_tokens) .bind(request_count as i32) .bind(&usage.id) .execute(db) .await?; tracing::debug!( "Aggregated usage for account {}: in={}, out={}, reqs={}", account_id, input_tokens, output_tokens, request_count ); } Ok(()) } /// 聚合所有活跃账户 async fn aggregate_all_accounts(db: &PgPool) -> SaasResult<()> { let account_ids: Vec = sqlx::query_scalar( "SELECT DISTINCT account_id FROM billing_subscriptions \ WHERE status IN ('trial', 'active', 'past_due') \ UNION \ SELECT DISTINCT account_id FROM billing_usage_quotas \ WHERE period_start >= date_trunc('month', NOW())" ) .fetch_all(db) .await?; let total = account_ids.len(); let mut errors = 0; for account_id in &account_ids { if let Err(e) = aggregate_single_account(db, account_id).await { tracing::warn!("Failed to aggregate usage for {}: {}", account_id, e); errors += 1; } } tracing::info!( "Usage aggregation complete: {} accounts, {} errors", total, errors ); Ok(()) }