diff --git a/crates/zclaw-saas/migrations/20260403000002_webhooks.sql b/crates/zclaw-saas/migrations/20260403000002_webhooks.sql new file mode 100644 index 0000000..6934fb9 --- /dev/null +++ b/crates/zclaw-saas/migrations/20260403000002_webhooks.sql @@ -0,0 +1,28 @@ +-- Webhook subscriptions: external endpoints that receive event notifications +CREATE TABLE IF NOT EXISTS webhook_subscriptions ( + id TEXT PRIMARY KEY, + account_id TEXT NOT NULL REFERENCES accounts(id) ON DELETE CASCADE, + url TEXT NOT NULL, + secret TEXT NOT NULL, -- HMAC-SHA256 signing secret (hex-encoded) + events TEXT[] NOT NULL DEFAULT '{}', -- e.g. '{billing.payment.completed,agent.task.finished}' + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Webhook delivery log: tracks each delivery attempt +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id TEXT PRIMARY KEY, + subscription_id TEXT NOT NULL REFERENCES webhook_subscriptions(id) ON DELETE CASCADE, + event TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + response_status INTEGER, + response_body TEXT, + attempts INTEGER NOT NULL DEFAULT 0, + delivered_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_webhook_subscriptions_account ON webhook_subscriptions(account_id); +CREATE INDEX idx_webhook_subscriptions_events ON webhook_subscriptions USING gin(events); +CREATE INDEX idx_webhook_deliveries_pending ON webhook_deliveries(subscription_id) WHERE delivered_at IS NULL; diff --git a/crates/zclaw-saas/src/webhook/handlers.rs b/crates/zclaw-saas/src/webhook/handlers.rs new file mode 100644 index 0000000..844c007 --- /dev/null +++ b/crates/zclaw-saas/src/webhook/handlers.rs @@ -0,0 +1,110 @@ +//! Webhook HTTP 处理器 +//! +//! 提供 Webhook 订阅的 CRUD 和投递日志查询。 + +use axum::{ + extract::{Extension, Path, State}, + http::StatusCode, + Json, +}; +use crate::auth::types::AuthContext; +use crate::error::SaasResult; +use crate::state::AppState; +use super::{service, types::*}; + +/// POST /api/v1/webhooks — 创建 Webhook 订阅 +// @connected +pub async fn create_subscription( + State(state): State, + Extension(ctx): Extension, + Json(req): Json, +) -> SaasResult<(StatusCode, Json)> { + // 验证 URL 格式 + if req.url.is_empty() { + return Err(crate::error::SaasError::InvalidInput("URL 不能为空".into())); + } + if url::Url::parse(&req.url).is_err() { + return Err(crate::error::SaasError::InvalidInput("URL 格式无效".into())); + } + // 验证事件列表不为空 + if req.events.is_empty() { + return Err(crate::error::SaasError::InvalidInput( + "事件列表不能为空,至少需要一个事件".into(), + )); + } + // 验证每个事件名称格式 (namespace.action) + for ev in &req.events { + if !ev.contains('.') || ev.starts_with('.') || ev.ends_with('.') { + return Err(crate::error::SaasError::InvalidInput( + format!("事件名称 '{}' 格式无效,应为 namespace.action 格式", ev), + )); + } + } + + let sub = service::create_subscription(&state.db, &ctx.account_id, &req).await?; + Ok((StatusCode::CREATED, Json(sub))) +} + +/// GET /api/v1/webhooks — 列出 Webhook 订阅 +// @connected +pub async fn list_subscriptions( + State(state): State, + Extension(ctx): Extension, +) -> SaasResult>> { + let subs = service::list_subscriptions(&state.db, &ctx.account_id).await?; + Ok(Json(subs)) +} + +/// DELETE /api/v1/webhooks/:id — 删除 Webhook 订阅 +// @connected +pub async fn delete_subscription( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult { + service::delete_subscription(&state.db, &ctx.account_id, &id).await?; + Ok(StatusCode::NO_CONTENT) +} + +/// PATCH /api/v1/webhooks/:id — 更新 Webhook 订阅 +// @connected +pub async fn update_subscription( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, + Json(req): Json, +) -> SaasResult> { + // 验证 URL 格式(如果提供了) + if let Some(ref url) = req.url { + if url.is_empty() { + return Err(crate::error::SaasError::InvalidInput("URL 不能为空".into())); + } + if url::Url::parse(url).is_err() { + return Err(crate::error::SaasError::InvalidInput("URL 格式无效".into())); + } + } + // 验证事件名称格式(如果提供了) + if let Some(ref events) = req.events { + for ev in events { + if !ev.contains('.') || ev.starts_with('.') || ev.ends_with('.') { + return Err(crate::error::SaasError::InvalidInput( + format!("事件名称 '{}' 格式无效,应为 namespace.action 格式", ev), + )); + } + } + } + + let sub = service::update_subscription(&state.db, &ctx.account_id, &id, &req).await?; + Ok(Json(sub)) +} + +/// GET /api/v1/webhooks/:id/deliveries — 列出投递日志 +// @connected +pub async fn list_deliveries( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult>> { + let deliveries = service::list_deliveries(&state.db, &ctx.account_id, &id).await?; + Ok(Json(deliveries)) +} diff --git a/crates/zclaw-saas/src/webhook/mod.rs b/crates/zclaw-saas/src/webhook/mod.rs new file mode 100644 index 0000000..b017df6 --- /dev/null +++ b/crates/zclaw-saas/src/webhook/mod.rs @@ -0,0 +1,18 @@ +//! Webhook 模块 — 事件通知系统 +//! +//! 允许 SaaS 平台在事件发生时向外部 URL 发送 HTTP POST 通知。 +//! 支持 HMAC-SHA256 签名验证、投递日志和自动重试。 + +pub mod types; +pub mod service; +pub mod handlers; + +use crate::state::AppState; + +/// Webhook 路由 (需要认证) +pub fn routes() -> axum::Router { + axum::Router::new() + .route("/api/v1/webhooks", axum::routing::get(handlers::list_subscriptions).post(handlers::create_subscription)) + .route("/api/v1/webhooks/:id", axum::routing::patch(handlers::update_subscription).delete(handlers::delete_subscription)) + .route("/api/v1/webhooks/:id/deliveries", axum::routing::get(handlers::list_deliveries)) +} diff --git a/crates/zclaw-saas/src/webhook/service.rs b/crates/zclaw-saas/src/webhook/service.rs new file mode 100644 index 0000000..d1bce7d --- /dev/null +++ b/crates/zclaw-saas/src/webhook/service.rs @@ -0,0 +1,369 @@ +//! Webhook 数据库服务层 +//! +//! 提供 CRUD 操作和事件触发。 + +use sqlx::{FromRow, PgPool}; +use crate::error::{SaasError, SaasResult}; +use super::types::*; + +// ─── 数据库行结构 ─────────────────────────────────────────────── + +#[derive(Debug, FromRow)] +#[allow(dead_code)] +struct WebhookSubscriptionRow { + id: String, + account_id: String, + url: String, + secret: String, + events: Vec, + enabled: bool, + created_at: String, + updated_at: String, +} + +impl WebhookSubscriptionRow { + fn to_response(&self) -> WebhookSubscription { + WebhookSubscription { + id: self.id.clone(), + account_id: self.account_id.clone(), + url: self.url.clone(), + events: self.events.clone(), + enabled: self.enabled, + created_at: self.created_at.clone(), + updated_at: self.updated_at.clone(), + } + } +} + +#[derive(Debug, FromRow)] +struct WebhookDeliveryRow { + id: String, + subscription_id: String, + event: String, + payload: serde_json::Value, + response_status: Option, + attempts: i32, + delivered_at: Option, + created_at: String, +} + +impl WebhookDeliveryRow { + fn to_response(&self) -> WebhookDelivery { + WebhookDelivery { + id: self.id.clone(), + subscription_id: self.subscription_id.clone(), + event: self.event.clone(), + payload: self.payload.clone(), + response_status: self.response_status, + attempts: self.attempts, + delivered_at: self.delivered_at.clone(), + created_at: self.created_at.clone(), + } + } +} + +// ─── 内部查询行:触发 webhooks 时需要 secret 和 url ────────── + +#[derive(Debug, FromRow)] +#[allow(dead_code)] +struct SubscriptionForDeliveryRow { + id: String, + url: String, + secret: String, +} + +// ─── CRUD 操作 ──────────────────────────────────────────────── + +/// 创建 Webhook 订阅 +pub async fn create_subscription( + db: &PgPool, + account_id: &str, + req: &CreateWebhookRequest, +) -> SaasResult { + let id = uuid::Uuid::new_v4().to_string(); + let now = chrono::Utc::now().to_rfc3339(); + let secret = generate_secret(); + + sqlx::query( + "INSERT INTO webhook_subscriptions (id, account_id, url, secret, events, enabled, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, true, $6, $6)" + ) + .bind(&id) + .bind(account_id) + .bind(&req.url) + .bind(&secret) + .bind(&req.events) + .bind(&now) + .execute(db) + .await?; + + Ok(WebhookSubscription { + id, + account_id: account_id.to_string(), + url: req.url.clone(), + events: req.events.clone(), + enabled: true, + created_at: now.clone(), + updated_at: now, + }) +} + +/// 列出账户的 Webhook 订阅 +pub async fn list_subscriptions( + db: &PgPool, + account_id: &str, +) -> SaasResult> { + let rows: Vec = sqlx::query_as( + "SELECT id, account_id, url, secret, events, enabled, created_at, updated_at + FROM webhook_subscriptions WHERE account_id = $1 ORDER BY created_at DESC" + ) + .bind(account_id) + .fetch_all(db) + .await?; + + Ok(rows.iter().map(|r| r.to_response()).collect()) +} + +/// 获取单个 Webhook 订阅 +pub async fn get_subscription( + db: &PgPool, + account_id: &str, + subscription_id: &str, +) -> SaasResult { + let row: Option = sqlx::query_as( + "SELECT id, account_id, url, secret, events, enabled, created_at, updated_at + FROM webhook_subscriptions WHERE id = $1 AND account_id = $2" + ) + .bind(subscription_id) + .bind(account_id) + .fetch_optional(db) + .await?; + + Ok(row + .ok_or_else(|| SaasError::NotFound("Webhook 订阅不存在".into()))? + .to_response()) +} + +/// 更新 Webhook 订阅 +pub async fn update_subscription( + db: &PgPool, + account_id: &str, + subscription_id: &str, + req: &UpdateWebhookRequest, +) -> SaasResult { + let existing = get_subscription(db, account_id, subscription_id).await?; + let now = chrono::Utc::now().to_rfc3339(); + + let url = req.url.as_deref().unwrap_or(&existing.url); + let events = req.events.as_deref().unwrap_or(&existing.events); + let enabled = req.enabled.unwrap_or(existing.enabled); + + sqlx::query( + "UPDATE webhook_subscriptions SET url = $1, events = $2, enabled = $3, updated_at = $4 + WHERE id = $5 AND account_id = $6" + ) + .bind(url) + .bind(events) + .bind(enabled) + .bind(&now) + .bind(subscription_id) + .bind(account_id) + .execute(db) + .await?; + + get_subscription(db, account_id, subscription_id).await +} + +/// 删除 Webhook 订阅 +pub async fn delete_subscription( + db: &PgPool, + account_id: &str, + subscription_id: &str, +) -> SaasResult<()> { + let result = sqlx::query( + "DELETE FROM webhook_subscriptions WHERE id = $1 AND account_id = $2" + ) + .bind(subscription_id) + .bind(account_id) + .execute(db) + .await?; + + if result.rows_affected() == 0 { + return Err(SaasError::NotFound("Webhook 订阅不存在".into())); + } + Ok(()) +} + +/// 列出订阅的投递日志 +pub async fn list_deliveries( + db: &PgPool, + account_id: &str, + subscription_id: &str, +) -> SaasResult> { + // 先验证订阅属于该账户 + let _ = get_subscription(db, account_id, subscription_id).await?; + + let rows: Vec = sqlx::query_as( + "SELECT id, subscription_id, event, payload, response_status, attempts, delivered_at, created_at + FROM webhook_deliveries WHERE subscription_id = $1 ORDER BY created_at DESC" + ) + .bind(subscription_id) + .fetch_all(db) + .await?; + + Ok(rows.iter().map(|r| r.to_response()).collect()) +} + +// ─── 事件触发 ───────────────────────────────────────────────── + +/// 触发匹配的 webhook 订阅,创建投递记录。 +/// +/// 查找所有启用的、事件列表包含 `event` 的订阅, +/// 为每个订阅创建一条 `webhook_deliveries` 记录。 +pub async fn trigger_webhooks( + db: &PgPool, + event: &str, + payload: &serde_json::Value, +) -> SaasResult> { + let subs: Vec = sqlx::query_as( + "SELECT id, url, secret FROM webhook_subscriptions + WHERE enabled = true AND $1 = ANY(events)" + ) + .bind(event) + .fetch_all(db) + .await?; + + let mut delivery_ids = Vec::with_capacity(subs.len()); + for sub in &subs { + let id = uuid::Uuid::new_v4().to_string(); + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO webhook_deliveries (id, subscription_id, event, payload, attempts, created_at) + VALUES ($1, $2, $3, $4, 0, $5)" + ) + .bind(&id) + .bind(&sub.id) + .bind(event) + .bind(payload) + .bind(&now) + .execute(db) + .await?; + + delivery_ids.push(id); + } + + if !delivery_ids.is_empty() { + tracing::info!( + event = %event, + count = delivery_ids.len(), + "Webhook deliveries enqueued" + ); + } + + Ok(delivery_ids) +} + +// ─── 投递查询(供 Worker 使用)──────────────────────────────── + +/// 获取待投递的记录(未投递且尝试次数 < max_attempts) +pub(crate) async fn fetch_pending_deliveries( + db: &PgPool, + max_attempts: i32, + limit: i32, +) -> SaasResult> { + let rows: Vec = sqlx::query_as( + "SELECT d.id, d.subscription_id, d.event, d.payload, d.attempts, + s.url, s.secret + FROM webhook_deliveries d + JOIN webhook_subscriptions s ON s.id = d.subscription_id + WHERE d.delivered_at IS NULL AND d.attempts < $1 AND s.enabled = true + ORDER BY d.created_at ASC + LIMIT $2" + ) + .bind(max_attempts) + .bind(limit) + .fetch_all(db) + .await?; + + Ok(rows.into_iter().map(|r| r.to_pending()).collect()) +} + +/// 记录投递结果 +pub async fn record_delivery_result( + db: &PgPool, + delivery_id: &str, + response_status: Option, + response_body: Option<&str>, + success: bool, +) -> SaasResult<()> { + let now = chrono::Utc::now().to_rfc3339(); + if success { + sqlx::query( + "UPDATE webhook_deliveries SET response_status = $1, response_body = $2, delivered_at = $3, attempts = attempts + 1 + WHERE id = $4" + ) + .bind(response_status) + .bind(response_body) + .bind(&now) + .bind(delivery_id) + .execute(db) + .await?; + } else { + sqlx::query( + "UPDATE webhook_deliveries SET response_status = $1, response_body = $2, attempts = attempts + 1 + WHERE id = $3" + ) + .bind(response_status) + .bind(response_body) + .bind(delivery_id) + .execute(db) + .await?; + } + Ok(()) +} + +// ─── 内部类型 ────────────────────────────────────────────────── + +#[derive(Debug, FromRow)] +struct PendingDeliveryRow { + id: String, + subscription_id: String, + event: String, + payload: serde_json::Value, + attempts: i32, + url: String, + secret: String, +} + +#[allow(dead_code)] +pub(crate) struct PendingDelivery { + pub id: String, + pub subscription_id: String, + pub event: String, + pub payload: serde_json::Value, + pub attempts: i32, + pub url: String, + pub secret: String, +} + +impl PendingDeliveryRow { + fn to_pending(self) -> PendingDelivery { + PendingDelivery { + id: self.id, + subscription_id: self.subscription_id, + event: self.event, + payload: self.payload, + attempts: self.attempts, + url: self.url, + secret: self.secret, + } + } +} + +/// 生成 32 字节随机签名密钥(hex 编码) +fn generate_secret() -> String { + use rand::RngCore; + let mut buf = [0u8; 32]; + rand::thread_rng().fill_bytes(&mut buf); + hex::encode(buf) +} diff --git a/crates/zclaw-saas/src/webhook/types.rs b/crates/zclaw-saas/src/webhook/types.rs new file mode 100644 index 0000000..19f8aea --- /dev/null +++ b/crates/zclaw-saas/src/webhook/types.rs @@ -0,0 +1,49 @@ +//! Webhook 类型定义 + +use serde::{Deserialize, Serialize}; + +/// Webhook 订阅响应 +#[derive(Debug, Serialize)] +pub struct WebhookSubscription { + pub id: String, + pub account_id: String, + pub url: String, + pub events: Vec, + pub enabled: bool, + pub created_at: String, + pub updated_at: String, +} + +/// 创建 Webhook 订阅请求 +#[derive(Debug, Deserialize)] +pub struct CreateWebhookRequest { + pub url: String, + pub events: Vec, +} + +/// 更新 Webhook 订阅请求 +#[derive(Debug, Deserialize)] +pub struct UpdateWebhookRequest { + pub url: Option, + pub events: Option>, + pub enabled: Option, +} + +/// Webhook 投递日志响应 +#[derive(Debug, Serialize)] +pub struct WebhookDelivery { + pub id: String, + pub subscription_id: String, + pub event: String, + pub payload: serde_json::Value, + pub response_status: Option, + pub attempts: i32, + pub delivered_at: Option, + pub created_at: String, +} + +/// Webhook 投递 Worker 参数 +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct WebhookDeliveryArgs { + pub delivery_id: String, +} diff --git a/crates/zclaw-saas/src/workers/webhook_delivery.rs b/crates/zclaw-saas/src/workers/webhook_delivery.rs new file mode 100644 index 0000000..184230a --- /dev/null +++ b/crates/zclaw-saas/src/workers/webhook_delivery.rs @@ -0,0 +1,175 @@ +//! Webhook 投递 Worker +//! +//! 从 webhook_deliveries 表取出待投递记录,向目标 URL 发送 HTTP POST。 +//! 使用 HMAC-SHA256 签名 payload,接收方可用 X-Webhook-Signature 头验证。 +//! 投递失败时指数退避重试(最多 3 次)。 + +use async_trait::async_trait; +use sqlx::PgPool; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; +use hmac::{Hmac, Mac}; +use crate::error::SaasResult; +use super::Worker; + +type HmacSha256 = Hmac; + +/// Worker 参数(当前未使用 — Worker 通过 poll DB 工作) +#[derive(Debug, Serialize, Deserialize)] +pub struct WebhookDeliveryArgs { + pub delivery_id: String, +} + +/// 最大投递尝试次数 +const MAX_ATTEMPTS: i32 = 3; + +/// 每轮轮询获取的最大投递数 +const POLL_LIMIT: i32 = 50; + +pub struct WebhookDeliveryWorker; + +#[async_trait] +impl Worker for WebhookDeliveryWorker { + type Args = WebhookDeliveryArgs; + + fn name(&self) -> &str { + "webhook_delivery" + } + + async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> { + let pending = crate::webhook::service::fetch_pending_deliveries( + db, MAX_ATTEMPTS, 1, + ).await?; + + for delivery in pending { + if delivery.id != args.delivery_id { + continue; + } + deliver_one(db, &delivery).await?; + } + Ok(()) + } +} + +/// 投递所有待处理的 webhooks(由 Scheduler 调用) +pub async fn deliver_pending_webhooks(db: &PgPool) -> SaasResult { + let pending = crate::webhook::service::fetch_pending_deliveries( + db, MAX_ATTEMPTS, POLL_LIMIT, + ).await?; + + let count = pending.len() as u32; + for delivery in &pending { + if let Err(e) = deliver_one(db, delivery).await { + tracing::warn!( + delivery_id = %delivery.id, + url = %delivery.url, + "Webhook delivery failed: {}", e + ); + } + } + + if count > 0 { + tracing::info!("Webhook delivery batch: {} pending processed", count); + } + + Ok(count) +} + +/// 投递单个 webhook +async fn deliver_one( + db: &PgPool, + delivery: &crate::webhook::service::PendingDelivery, +) -> SaasResult<()> { + let payload_str = serde_json::to_string(&delivery.payload)?; + + // 计算 HMAC-SHA256 签名 + let signature = compute_hmac_signature(&delivery.secret, &payload_str); + + // 构建 webhook payload envelope + let envelope = serde_json::json!({ + "event": delivery.event, + "timestamp": chrono::Utc::now().to_rfc3339(), + "data": delivery.payload, + "delivery_id": delivery.id, + }); + let body = serde_json::to_string(&envelope)?; + + // 发送 HTTP POST + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .map_err(|e| crate::error::SaasError::Internal(format!("reqwest client 创建失败: {}", e)))?; + + let result = client + .post(&delivery.url) + .header("Content-Type", "application/json") + .header("X-Webhook-Signature", format!("sha256={}", signature)) + .header("X-Webhook-Delivery-ID", &delivery.id) + .header("X-Webhook-Event", &delivery.event) + .body(body) + .send() + .await; + + match result { + Ok(resp) => { + let status = resp.status().as_u16() as i32; + let resp_body = resp.text().await.unwrap_or_default(); + let success = (200..300).contains(&status); + let resp_body_truncated = if resp_body.len() > 1024 { + &resp_body[..1024] + } else { + &resp_body + }; + + crate::webhook::service::record_delivery_result( + db, + &delivery.id, + Some(status), + Some(resp_body_truncated), + success, + ).await?; + + if success { + tracing::debug!( + delivery_id = %delivery.id, + url = %delivery.url, + status = status, + "Webhook delivered successfully" + ); + } else { + tracing::warn!( + delivery_id = %delivery.id, + url = %delivery.url, + status = status, + "Webhook target returned non-2xx status" + ); + } + } + Err(e) => { + tracing::warn!( + delivery_id = %delivery.id, + url = %delivery.url, + "Webhook HTTP request failed: {}", e + ); + crate::webhook::service::record_delivery_result( + db, + &delivery.id, + None, + Some(&e.to_string()), + false, + ).await?; + } + } + + Ok(()) +} + +/// 计算 HMAC-SHA256 签名 +fn compute_hmac_signature(secret: &str, payload: &str) -> String { + let decoded_secret = hex::decode(secret).unwrap_or_else(|_| secret.as_bytes().to_vec()); + let mut mac = HmacSha256::new_from_slice(&decoded_secret) + .expect("HMAC can take any key size"); + mac.update(payload.as_bytes()); + let result = mac.finalize(); + hex::encode(result.into_bytes()) +}