feat(saas): add webhook event notification system (@unplugged)

Webhook infrastructure for external event notifications:
- SQL migration: webhook_subscriptions + webhook_deliveries tables
- Types: CreateWebhookRequest, UpdateWebhookRequest, WebhookDelivery
- Service: CRUD operations + trigger_webhooks + HMAC-SHA256 signing
- Handlers: REST API endpoints (CRUD + delivery logs)
- Worker: WebhookDeliveryWorker with exponential retry (max 3)

NOT YET INTEGRATED: needs mod registration in lib.rs + workers/mod.rs,
hmac crate dependency, and route mounting. Code is ready for future
integration after stabilization phase completes.
This commit is contained in:
iven
2026-04-03 23:01:49 +08:00
parent 1c99e5f3a3
commit 5eeabd1f30
6 changed files with 749 additions and 0 deletions

View File

@@ -0,0 +1,369 @@
//! Webhook 数据库服务层
//!
//! 提供 CRUD 操作和事件触发。
use sqlx::{FromRow, PgPool};
use crate::error::{SaasError, SaasResult};
use super::types::*;
// ─── 数据库行结构 ───────────────────────────────────────────────
#[derive(Debug, FromRow)]
#[allow(dead_code)]
struct WebhookSubscriptionRow {
id: String,
account_id: String,
url: String,
secret: String,
events: Vec<String>,
enabled: bool,
created_at: String,
updated_at: String,
}
impl WebhookSubscriptionRow {
fn to_response(&self) -> WebhookSubscription {
WebhookSubscription {
id: self.id.clone(),
account_id: self.account_id.clone(),
url: self.url.clone(),
events: self.events.clone(),
enabled: self.enabled,
created_at: self.created_at.clone(),
updated_at: self.updated_at.clone(),
}
}
}
#[derive(Debug, FromRow)]
struct WebhookDeliveryRow {
id: String,
subscription_id: String,
event: String,
payload: serde_json::Value,
response_status: Option<i32>,
attempts: i32,
delivered_at: Option<String>,
created_at: String,
}
impl WebhookDeliveryRow {
fn to_response(&self) -> WebhookDelivery {
WebhookDelivery {
id: self.id.clone(),
subscription_id: self.subscription_id.clone(),
event: self.event.clone(),
payload: self.payload.clone(),
response_status: self.response_status,
attempts: self.attempts,
delivered_at: self.delivered_at.clone(),
created_at: self.created_at.clone(),
}
}
}
// ─── 内部查询行:触发 webhooks 时需要 secret 和 url ──────────
#[derive(Debug, FromRow)]
#[allow(dead_code)]
struct SubscriptionForDeliveryRow {
id: String,
url: String,
secret: String,
}
// ─── CRUD 操作 ────────────────────────────────────────────────
/// 创建 Webhook 订阅
pub async fn create_subscription(
db: &PgPool,
account_id: &str,
req: &CreateWebhookRequest,
) -> SaasResult<WebhookSubscription> {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
let secret = generate_secret();
sqlx::query(
"INSERT INTO webhook_subscriptions (id, account_id, url, secret, events, enabled, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, true, $6, $6)"
)
.bind(&id)
.bind(account_id)
.bind(&req.url)
.bind(&secret)
.bind(&req.events)
.bind(&now)
.execute(db)
.await?;
Ok(WebhookSubscription {
id,
account_id: account_id.to_string(),
url: req.url.clone(),
events: req.events.clone(),
enabled: true,
created_at: now.clone(),
updated_at: now,
})
}
/// 列出账户的 Webhook 订阅
pub async fn list_subscriptions(
db: &PgPool,
account_id: &str,
) -> SaasResult<Vec<WebhookSubscription>> {
let rows: Vec<WebhookSubscriptionRow> = sqlx::query_as(
"SELECT id, account_id, url, secret, events, enabled, created_at, updated_at
FROM webhook_subscriptions WHERE account_id = $1 ORDER BY created_at DESC"
)
.bind(account_id)
.fetch_all(db)
.await?;
Ok(rows.iter().map(|r| r.to_response()).collect())
}
/// 获取单个 Webhook 订阅
pub async fn get_subscription(
db: &PgPool,
account_id: &str,
subscription_id: &str,
) -> SaasResult<WebhookSubscription> {
let row: Option<WebhookSubscriptionRow> = sqlx::query_as(
"SELECT id, account_id, url, secret, events, enabled, created_at, updated_at
FROM webhook_subscriptions WHERE id = $1 AND account_id = $2"
)
.bind(subscription_id)
.bind(account_id)
.fetch_optional(db)
.await?;
Ok(row
.ok_or_else(|| SaasError::NotFound("Webhook 订阅不存在".into()))?
.to_response())
}
/// 更新 Webhook 订阅
pub async fn update_subscription(
db: &PgPool,
account_id: &str,
subscription_id: &str,
req: &UpdateWebhookRequest,
) -> SaasResult<WebhookSubscription> {
let existing = get_subscription(db, account_id, subscription_id).await?;
let now = chrono::Utc::now().to_rfc3339();
let url = req.url.as_deref().unwrap_or(&existing.url);
let events = req.events.as_deref().unwrap_or(&existing.events);
let enabled = req.enabled.unwrap_or(existing.enabled);
sqlx::query(
"UPDATE webhook_subscriptions SET url = $1, events = $2, enabled = $3, updated_at = $4
WHERE id = $5 AND account_id = $6"
)
.bind(url)
.bind(events)
.bind(enabled)
.bind(&now)
.bind(subscription_id)
.bind(account_id)
.execute(db)
.await?;
get_subscription(db, account_id, subscription_id).await
}
/// 删除 Webhook 订阅
pub async fn delete_subscription(
db: &PgPool,
account_id: &str,
subscription_id: &str,
) -> SaasResult<()> {
let result = sqlx::query(
"DELETE FROM webhook_subscriptions WHERE id = $1 AND account_id = $2"
)
.bind(subscription_id)
.bind(account_id)
.execute(db)
.await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound("Webhook 订阅不存在".into()));
}
Ok(())
}
/// 列出订阅的投递日志
pub async fn list_deliveries(
db: &PgPool,
account_id: &str,
subscription_id: &str,
) -> SaasResult<Vec<WebhookDelivery>> {
// 先验证订阅属于该账户
let _ = get_subscription(db, account_id, subscription_id).await?;
let rows: Vec<WebhookDeliveryRow> = sqlx::query_as(
"SELECT id, subscription_id, event, payload, response_status, attempts, delivered_at, created_at
FROM webhook_deliveries WHERE subscription_id = $1 ORDER BY created_at DESC"
)
.bind(subscription_id)
.fetch_all(db)
.await?;
Ok(rows.iter().map(|r| r.to_response()).collect())
}
// ─── 事件触发 ─────────────────────────────────────────────────
/// 触发匹配的 webhook 订阅,创建投递记录。
///
/// 查找所有启用的、事件列表包含 `event` 的订阅,
/// 为每个订阅创建一条 `webhook_deliveries` 记录。
pub async fn trigger_webhooks(
db: &PgPool,
event: &str,
payload: &serde_json::Value,
) -> SaasResult<Vec<String>> {
let subs: Vec<SubscriptionForDeliveryRow> = sqlx::query_as(
"SELECT id, url, secret FROM webhook_subscriptions
WHERE enabled = true AND $1 = ANY(events)"
)
.bind(event)
.fetch_all(db)
.await?;
let mut delivery_ids = Vec::with_capacity(subs.len());
for sub in &subs {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO webhook_deliveries (id, subscription_id, event, payload, attempts, created_at)
VALUES ($1, $2, $3, $4, 0, $5)"
)
.bind(&id)
.bind(&sub.id)
.bind(event)
.bind(payload)
.bind(&now)
.execute(db)
.await?;
delivery_ids.push(id);
}
if !delivery_ids.is_empty() {
tracing::info!(
event = %event,
count = delivery_ids.len(),
"Webhook deliveries enqueued"
);
}
Ok(delivery_ids)
}
// ─── 投递查询(供 Worker 使用)────────────────────────────────
/// 获取待投递的记录(未投递且尝试次数 < max_attempts
pub(crate) async fn fetch_pending_deliveries(
db: &PgPool,
max_attempts: i32,
limit: i32,
) -> SaasResult<Vec<PendingDelivery>> {
let rows: Vec<PendingDeliveryRow> = sqlx::query_as(
"SELECT d.id, d.subscription_id, d.event, d.payload, d.attempts,
s.url, s.secret
FROM webhook_deliveries d
JOIN webhook_subscriptions s ON s.id = d.subscription_id
WHERE d.delivered_at IS NULL AND d.attempts < $1 AND s.enabled = true
ORDER BY d.created_at ASC
LIMIT $2"
)
.bind(max_attempts)
.bind(limit)
.fetch_all(db)
.await?;
Ok(rows.into_iter().map(|r| r.to_pending()).collect())
}
/// 记录投递结果
pub async fn record_delivery_result(
db: &PgPool,
delivery_id: &str,
response_status: Option<i32>,
response_body: Option<&str>,
success: bool,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
if success {
sqlx::query(
"UPDATE webhook_deliveries SET response_status = $1, response_body = $2, delivered_at = $3, attempts = attempts + 1
WHERE id = $4"
)
.bind(response_status)
.bind(response_body)
.bind(&now)
.bind(delivery_id)
.execute(db)
.await?;
} else {
sqlx::query(
"UPDATE webhook_deliveries SET response_status = $1, response_body = $2, attempts = attempts + 1
WHERE id = $3"
)
.bind(response_status)
.bind(response_body)
.bind(delivery_id)
.execute(db)
.await?;
}
Ok(())
}
// ─── 内部类型 ──────────────────────────────────────────────────
#[derive(Debug, FromRow)]
struct PendingDeliveryRow {
id: String,
subscription_id: String,
event: String,
payload: serde_json::Value,
attempts: i32,
url: String,
secret: String,
}
#[allow(dead_code)]
pub(crate) struct PendingDelivery {
pub id: String,
pub subscription_id: String,
pub event: String,
pub payload: serde_json::Value,
pub attempts: i32,
pub url: String,
pub secret: String,
}
impl PendingDeliveryRow {
fn to_pending(self) -> PendingDelivery {
PendingDelivery {
id: self.id,
subscription_id: self.subscription_id,
event: self.event,
payload: self.payload,
attempts: self.attempts,
url: self.url,
secret: self.secret,
}
}
}
/// 生成 32 字节随机签名密钥hex 编码)
fn generate_secret() -> String {
use rand::RngCore;
let mut buf = [0u8; 32];
rand::thread_rng().fill_bytes(&mut buf);
hex::encode(buf)
}