//! Webhook 数据库服务层 //! //! 提供 CRUD 操作和事件触发。 use sqlx::{FromRow, PgPool}; use crate::error::{SaasError, SaasResult}; use super::types::*; // ─── 数据库行结构 ─────────────────────────────────────────────── #[derive(Debug, FromRow)] #[allow(dead_code)] struct WebhookSubscriptionRow { id: String, account_id: String, url: String, secret: String, events: Vec, enabled: bool, created_at: String, updated_at: String, } impl WebhookSubscriptionRow { fn to_response(&self) -> WebhookSubscription { WebhookSubscription { id: self.id.clone(), account_id: self.account_id.clone(), url: self.url.clone(), events: self.events.clone(), enabled: self.enabled, created_at: self.created_at.clone(), updated_at: self.updated_at.clone(), } } } #[derive(Debug, FromRow)] struct WebhookDeliveryRow { id: String, subscription_id: String, event: String, payload: serde_json::Value, response_status: Option, attempts: i32, delivered_at: Option, created_at: String, } impl WebhookDeliveryRow { fn to_response(&self) -> WebhookDelivery { WebhookDelivery { id: self.id.clone(), subscription_id: self.subscription_id.clone(), event: self.event.clone(), payload: self.payload.clone(), response_status: self.response_status, attempts: self.attempts, delivered_at: self.delivered_at.clone(), created_at: self.created_at.clone(), } } } // ─── 内部查询行:触发 webhooks 时需要 secret 和 url ────────── #[derive(Debug, FromRow)] #[allow(dead_code)] struct SubscriptionForDeliveryRow { id: String, url: String, secret: String, } // ─── CRUD 操作 ──────────────────────────────────────────────── /// 创建 Webhook 订阅 pub async fn create_subscription( db: &PgPool, account_id: &str, req: &CreateWebhookRequest, ) -> SaasResult { let id = uuid::Uuid::new_v4().to_string(); let now = chrono::Utc::now().to_rfc3339(); let secret = generate_secret(); sqlx::query( "INSERT INTO webhook_subscriptions (id, account_id, url, secret, events, enabled, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, true, $6, $6)" ) .bind(&id) .bind(account_id) .bind(&req.url) .bind(&secret) .bind(&req.events) .bind(&now) .execute(db) .await?; Ok(WebhookSubscription { id, account_id: account_id.to_string(), url: req.url.clone(), events: req.events.clone(), enabled: true, created_at: now.clone(), updated_at: now, }) } /// 列出账户的 Webhook 订阅 pub async fn list_subscriptions( db: &PgPool, account_id: &str, ) -> SaasResult> { let rows: Vec = sqlx::query_as( "SELECT id, account_id, url, secret, events, enabled, created_at, updated_at FROM webhook_subscriptions WHERE account_id = $1 ORDER BY created_at DESC" ) .bind(account_id) .fetch_all(db) .await?; Ok(rows.iter().map(|r| r.to_response()).collect()) } /// 获取单个 Webhook 订阅 pub async fn get_subscription( db: &PgPool, account_id: &str, subscription_id: &str, ) -> SaasResult { let row: Option = sqlx::query_as( "SELECT id, account_id, url, secret, events, enabled, created_at, updated_at FROM webhook_subscriptions WHERE id = $1 AND account_id = $2" ) .bind(subscription_id) .bind(account_id) .fetch_optional(db) .await?; Ok(row .ok_or_else(|| SaasError::NotFound("Webhook 订阅不存在".into()))? .to_response()) } /// 更新 Webhook 订阅 pub async fn update_subscription( db: &PgPool, account_id: &str, subscription_id: &str, req: &UpdateWebhookRequest, ) -> SaasResult { let existing = get_subscription(db, account_id, subscription_id).await?; let now = chrono::Utc::now().to_rfc3339(); let url = req.url.as_deref().unwrap_or(&existing.url); let events = req.events.as_deref().unwrap_or(&existing.events); let enabled = req.enabled.unwrap_or(existing.enabled); sqlx::query( "UPDATE webhook_subscriptions SET url = $1, events = $2, enabled = $3, updated_at = $4 WHERE id = $5 AND account_id = $6" ) .bind(url) .bind(events) .bind(enabled) .bind(&now) .bind(subscription_id) .bind(account_id) .execute(db) .await?; get_subscription(db, account_id, subscription_id).await } /// 删除 Webhook 订阅 pub async fn delete_subscription( db: &PgPool, account_id: &str, subscription_id: &str, ) -> SaasResult<()> { let result = sqlx::query( "DELETE FROM webhook_subscriptions WHERE id = $1 AND account_id = $2" ) .bind(subscription_id) .bind(account_id) .execute(db) .await?; if result.rows_affected() == 0 { return Err(SaasError::NotFound("Webhook 订阅不存在".into())); } Ok(()) } /// 列出订阅的投递日志 pub async fn list_deliveries( db: &PgPool, account_id: &str, subscription_id: &str, ) -> SaasResult> { // 先验证订阅属于该账户 let _ = get_subscription(db, account_id, subscription_id).await?; let rows: Vec = sqlx::query_as( "SELECT id, subscription_id, event, payload, response_status, attempts, delivered_at, created_at FROM webhook_deliveries WHERE subscription_id = $1 ORDER BY created_at DESC" ) .bind(subscription_id) .fetch_all(db) .await?; Ok(rows.iter().map(|r| r.to_response()).collect()) } // ─── 事件触发 ───────────────────────────────────────────────── /// 触发匹配的 webhook 订阅,创建投递记录。 /// /// 查找所有启用的、事件列表包含 `event` 的订阅, /// 为每个订阅创建一条 `webhook_deliveries` 记录。 pub async fn trigger_webhooks( db: &PgPool, event: &str, payload: &serde_json::Value, ) -> SaasResult> { let subs: Vec = sqlx::query_as( "SELECT id, url, secret FROM webhook_subscriptions WHERE enabled = true AND $1 = ANY(events)" ) .bind(event) .fetch_all(db) .await?; let mut delivery_ids = Vec::with_capacity(subs.len()); for sub in &subs { let id = uuid::Uuid::new_v4().to_string(); let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO webhook_deliveries (id, subscription_id, event, payload, attempts, created_at) VALUES ($1, $2, $3, $4, 0, $5)" ) .bind(&id) .bind(&sub.id) .bind(event) .bind(payload) .bind(&now) .execute(db) .await?; delivery_ids.push(id); } if !delivery_ids.is_empty() { tracing::info!( event = %event, count = delivery_ids.len(), "Webhook deliveries enqueued" ); } Ok(delivery_ids) } // ─── 投递查询(供 Worker 使用)──────────────────────────────── /// 获取待投递的记录(未投递且尝试次数 < max_attempts) pub(crate) async fn fetch_pending_deliveries( db: &PgPool, max_attempts: i32, limit: i32, ) -> SaasResult> { let rows: Vec = sqlx::query_as( "SELECT d.id, d.subscription_id, d.event, d.payload, d.attempts, s.url, s.secret FROM webhook_deliveries d JOIN webhook_subscriptions s ON s.id = d.subscription_id WHERE d.delivered_at IS NULL AND d.attempts < $1 AND s.enabled = true ORDER BY d.created_at ASC LIMIT $2" ) .bind(max_attempts) .bind(limit) .fetch_all(db) .await?; Ok(rows.into_iter().map(|r| r.to_pending()).collect()) } /// 记录投递结果 pub async fn record_delivery_result( db: &PgPool, delivery_id: &str, response_status: Option, response_body: Option<&str>, success: bool, ) -> SaasResult<()> { let now = chrono::Utc::now().to_rfc3339(); if success { sqlx::query( "UPDATE webhook_deliveries SET response_status = $1, response_body = $2, delivered_at = $3, attempts = attempts + 1 WHERE id = $4" ) .bind(response_status) .bind(response_body) .bind(&now) .bind(delivery_id) .execute(db) .await?; } else { sqlx::query( "UPDATE webhook_deliveries SET response_status = $1, response_body = $2, attempts = attempts + 1 WHERE id = $3" ) .bind(response_status) .bind(response_body) .bind(delivery_id) .execute(db) .await?; } Ok(()) } // ─── 内部类型 ────────────────────────────────────────────────── #[derive(Debug, FromRow)] struct PendingDeliveryRow { id: String, subscription_id: String, event: String, payload: serde_json::Value, attempts: i32, url: String, secret: String, } #[allow(dead_code)] pub(crate) struct PendingDelivery { pub id: String, pub subscription_id: String, pub event: String, pub payload: serde_json::Value, pub attempts: i32, pub url: String, pub secret: String, } impl PendingDeliveryRow { fn to_pending(self) -> PendingDelivery { PendingDelivery { id: self.id, subscription_id: self.subscription_id, event: self.event, payload: self.payload, attempts: self.attempts, url: self.url, secret: self.secret, } } } /// 生成 32 字节随机签名密钥(hex 编码) fn generate_secret() -> String { use rand::RngCore; let mut buf = [0u8; 32]; rand::thread_rng().fill_bytes(&mut buf); hex::encode(buf) }