Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- saas test harness: align WorkerDispatcher::new and AppState::new signatures with SpawnLimiter addition and init_db(&DatabaseConfig) - growth sqlite: add CJK fallback (LIKE-based) when FTS5 unicode61 tokenizer fails on Chinese queries (unicode61 doesn't index CJK)
392 lines
13 KiB
Rust
392 lines
13 KiB
Rust
//! Integration test harness for zclaw-saas
|
|
//!
|
|
//! Uses a **shared** PostgreSQL database (`zclaw_test_shared`) with per-test
|
|
//! TRUNCATE isolation. Only one database is created; each test truncates all
|
|
//! tables and re-seeds via `init_db`.
|
|
//!
|
|
//! # Setup
|
|
//!
|
|
//! ```bash
|
|
//! # Start PostgreSQL (e.g. via Docker Compose)
|
|
//! docker compose up -d postgres
|
|
//!
|
|
//! # Set the test database URL (point to the base DB for CREATE DATABASE)
|
|
//! export TEST_DATABASE_URL="postgres://postgres:123123@localhost:5432/zclaw"
|
|
//!
|
|
//! # Run tests
|
|
//! cargo test -p zclaw-saas
|
|
//! ```
|
|
|
|
use axum::body::Body;
|
|
use axum::http::{Request, StatusCode};
|
|
use axum::Router;
|
|
use sqlx::PgPool;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use tokio_util::sync::CancellationToken;
|
|
use tower::ServiceExt;
|
|
use zclaw_saas::config::{DatabaseConfig, SaaSConfig};
|
|
use zclaw_saas::db::init_db;
|
|
use zclaw_saas::state::{AppState, SpawnLimiter};
|
|
use zclaw_saas::workers::WorkerDispatcher;
|
|
|
|
pub const MAX_BODY: usize = 2 * 1024 * 1024;
|
|
pub const DEFAULT_PASSWORD: &str = "testpassword123";
|
|
|
|
const SHARED_DB_NAME: &str = "zclaw_test_shared";
|
|
|
|
/// Schema version counter — increment to force DROP+CREATE on next run.
|
|
const SCHEMA_VERSION: u32 = 2;
|
|
|
|
/// Whether the shared test database has been created at the current schema version.
|
|
static DB_CREATED: AtomicBool = AtomicBool::new(false);
|
|
|
|
// ── Database helpers ─────────────────────────────────────────────
|
|
|
|
/// Resolve the base test database URL (used to connect for CREATE DATABASE).
|
|
pub fn test_database_url() -> String {
|
|
std::env::var("TEST_DATABASE_URL")
|
|
.or_else(|_| std::env::var("DATABASE_URL"))
|
|
.unwrap_or_else(|_| "postgres://postgres:123123@localhost:5432/zclaw".into())
|
|
}
|
|
|
|
/// Build the shared test database URL by replacing the database name.
|
|
fn shared_db_url() -> String {
|
|
let mut url = test_database_url();
|
|
if let Some(pos) = url.rfind('/') {
|
|
url.truncate(pos + 1);
|
|
url.push_str(SHARED_DB_NAME);
|
|
}
|
|
url
|
|
}
|
|
|
|
/// Ensure the shared test database exists with a clean schema.
|
|
/// Runs once per process: drops the old DB and recreates it.
|
|
async fn ensure_shared_db() -> String {
|
|
if !DB_CREATED.swap(true, Ordering::SeqCst) {
|
|
let base = test_database_url();
|
|
let pool = PgPool::connect(&base)
|
|
.await
|
|
.expect("Cannot connect to PostgreSQL — is it running?");
|
|
// Drop + recreate for a clean schema
|
|
let _ = sqlx::query(&format!("DROP DATABASE IF EXISTS \"{}\"", SHARED_DB_NAME))
|
|
.execute(&pool)
|
|
.await;
|
|
sqlx::query(&format!("CREATE DATABASE \"{}\"", SHARED_DB_NAME))
|
|
.execute(&pool)
|
|
.await
|
|
.expect("Failed to create shared test database");
|
|
drop(pool);
|
|
}
|
|
shared_db_url()
|
|
}
|
|
|
|
/// Truncate all public tables in the database (CASCADE handles FK).
|
|
async fn truncate_all_tables(pool: &PgPool) {
|
|
sqlx::query(
|
|
r#"DO $$
|
|
DECLARE
|
|
r RECORD;
|
|
BEGIN
|
|
FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') LOOP
|
|
EXECUTE 'TRUNCATE TABLE ' || quote_ident(r.tablename) || ' CASCADE';
|
|
END LOOP;
|
|
END$$;"#,
|
|
)
|
|
.execute(pool)
|
|
.await
|
|
.expect("Failed to truncate tables");
|
|
}
|
|
|
|
// ── App builder ──────────────────────────────────────────────────
|
|
|
|
/// Build a full Axum `Router` wired to the shared test database.
|
|
///
|
|
/// Flow per test:
|
|
/// 1. Ensure shared DB exists (once)
|
|
/// 2. Truncate all tables (isolation)
|
|
/// 3. Re-run `init_db` to seed fresh data
|
|
/// 4. Return `(Router, PgPool)`
|
|
pub async fn build_test_app() -> (Router, PgPool) {
|
|
let db_url = ensure_shared_db().await;
|
|
|
|
// Dev-mode env vars
|
|
std::env::set_var("ZCLAW_SAAS_DEV", "true");
|
|
std::env::set_var("ZCLAW_SAAS_JWT_SECRET", "test-jwt-secret-do-not-use-in-prod");
|
|
std::env::set_var("ZCLAW_ADMIN_USERNAME", "testadmin");
|
|
std::env::set_var("ZCLAW_ADMIN_PASSWORD", "Admin123456");
|
|
|
|
// Truncate all data for test isolation
|
|
let truncate_pool = PgPool::connect(&db_url)
|
|
.await
|
|
.expect("Cannot connect to shared test DB");
|
|
truncate_all_tables(&truncate_pool).await;
|
|
drop(truncate_pool);
|
|
|
|
// init_db: schema (IF NOT EXISTS, fast) + seed data
|
|
let db_config = DatabaseConfig {
|
|
url: db_url,
|
|
..DatabaseConfig::default()
|
|
};
|
|
let pool = init_db(&db_config).await.expect("init_db failed");
|
|
|
|
let mut config = SaaSConfig::default();
|
|
config.auth.jwt_expiration_hours = 24;
|
|
config.auth.refresh_token_hours = 168;
|
|
config.rate_limit.requests_per_minute = 10_000;
|
|
config.rate_limit.burst = 1_000;
|
|
|
|
let worker_limiter = SpawnLimiter::new("test-worker", 20);
|
|
let dispatcher = WorkerDispatcher::new(pool.clone(), worker_limiter.clone());
|
|
let shutdown_token = CancellationToken::new();
|
|
let state = AppState::new(pool.clone(), config, dispatcher, shutdown_token, worker_limiter).expect("AppState::new failed");
|
|
let router = build_router(state);
|
|
(router, pool)
|
|
}
|
|
|
|
fn build_router(state: AppState) -> Router {
|
|
use axum::middleware;
|
|
use tower_http::cors::{Any, CorsLayer};
|
|
use tower_http::trace::TraceLayer;
|
|
|
|
let public_routes = zclaw_saas::auth::routes()
|
|
.route("/api/health", axum::routing::get(health_handler));
|
|
|
|
let protected_routes = zclaw_saas::auth::protected_routes()
|
|
.merge(zclaw_saas::account::routes())
|
|
.merge(zclaw_saas::model_config::routes())
|
|
.merge(zclaw_saas::relay::routes())
|
|
.merge(zclaw_saas::migration::routes())
|
|
.merge(zclaw_saas::role::routes())
|
|
.merge(zclaw_saas::prompt::routes())
|
|
.merge(zclaw_saas::agent_template::routes())
|
|
.merge(zclaw_saas::telemetry::routes())
|
|
.layer(middleware::from_fn_with_state(
|
|
state.clone(),
|
|
zclaw_saas::middleware::api_version_middleware,
|
|
))
|
|
.layer(middleware::from_fn_with_state(
|
|
state.clone(),
|
|
zclaw_saas::middleware::request_id_middleware,
|
|
))
|
|
.layer(middleware::from_fn_with_state(
|
|
state.clone(),
|
|
zclaw_saas::middleware::rate_limit_middleware,
|
|
))
|
|
.layer(middleware::from_fn_with_state(
|
|
state.clone(),
|
|
zclaw_saas::auth::auth_middleware,
|
|
));
|
|
|
|
Router::new()
|
|
.merge(public_routes)
|
|
.merge(protected_routes)
|
|
.layer(TraceLayer::new_for_http())
|
|
.layer(
|
|
CorsLayer::new()
|
|
.allow_origin(Any)
|
|
.allow_methods(Any)
|
|
.allow_headers(Any),
|
|
)
|
|
.with_state(state)
|
|
.layer(axum::middleware::from_fn(inject_connect_info))
|
|
}
|
|
|
|
/// Simple health handler for testing (mirrors main.rs health_handler).
|
|
async fn health_handler(State(state): axum::extract::State<AppState>) -> axum::Json<serde_json::Value> {
|
|
let db_healthy = sqlx::query_scalar::<_, i32>("SELECT 1")
|
|
.fetch_one(&state.db)
|
|
.await
|
|
.ok()
|
|
.map(|v| v == 1)
|
|
.unwrap_or(false);
|
|
let status = if db_healthy { "healthy" } else { "degraded" };
|
|
axum::Json(serde_json::json!({ "status": status, "database": db_healthy }))
|
|
}
|
|
|
|
use axum::extract::State;
|
|
async fn inject_connect_info(
|
|
mut req: axum::extract::Request,
|
|
next: axum::middleware::Next,
|
|
) -> axum::response::Response {
|
|
use axum::extract::ConnectInfo;
|
|
use std::net::SocketAddr;
|
|
|
|
req.extensions_mut().insert(ConnectInfo::<SocketAddr>(
|
|
"127.0.0.1:12345".parse().unwrap(),
|
|
));
|
|
next.run(req).await
|
|
}
|
|
|
|
// ── HTTP helpers ─────────────────────────────────────────────────
|
|
|
|
pub async fn body_bytes(body: Body) -> Vec<u8> {
|
|
axum::body::to_bytes(body, MAX_BODY)
|
|
.await
|
|
.expect("body too large")
|
|
.to_vec()
|
|
}
|
|
|
|
pub async fn body_json(body: Body) -> serde_json::Value {
|
|
let bytes = body_bytes(body).await;
|
|
serde_json::from_slice(&bytes).unwrap_or_else(|e| {
|
|
panic!(
|
|
"Failed to parse JSON: {}\nBody: {}",
|
|
e,
|
|
String::from_utf8_lossy(&bytes)
|
|
)
|
|
})
|
|
}
|
|
|
|
pub fn get(uri: &str, token: &str) -> Request<Body> {
|
|
Request::builder()
|
|
.method("GET")
|
|
.uri(uri)
|
|
.header("Authorization", format!("Bearer {token}"))
|
|
.body(Body::empty())
|
|
.unwrap()
|
|
}
|
|
|
|
pub fn delete(uri: &str, token: &str) -> Request<Body> {
|
|
Request::builder()
|
|
.method("DELETE")
|
|
.uri(uri)
|
|
.header("Authorization", format!("Bearer {token}"))
|
|
.body(Body::empty())
|
|
.unwrap()
|
|
}
|
|
|
|
pub fn post(uri: &str, token: &str, body: serde_json::Value) -> Request<Body> {
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(uri)
|
|
.header("Content-Type", "application/json")
|
|
.header("Authorization", format!("Bearer {token}"))
|
|
.body(Body::from(body.to_string()))
|
|
.unwrap()
|
|
}
|
|
|
|
pub fn post_public(uri: &str, body: serde_json::Value) -> Request<Body> {
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri(uri)
|
|
.header("Content-Type", "application/json")
|
|
.body(Body::from(body.to_string()))
|
|
.unwrap()
|
|
}
|
|
|
|
pub fn put(uri: &str, token: &str, body: serde_json::Value) -> Request<Body> {
|
|
Request::builder()
|
|
.method("PUT")
|
|
.uri(uri)
|
|
.header("Content-Type", "application/json")
|
|
.header("Authorization", format!("Bearer {token}"))
|
|
.body(Body::from(body.to_string()))
|
|
.unwrap()
|
|
}
|
|
|
|
pub fn patch(uri: &str, token: &str, body: serde_json::Value) -> Request<Body> {
|
|
Request::builder()
|
|
.method("PATCH")
|
|
.uri(uri)
|
|
.header("Content-Type", "application/json")
|
|
.header("Authorization", format!("Bearer {token}"))
|
|
.body(Body::from(body.to_string()))
|
|
.unwrap()
|
|
}
|
|
|
|
/// Send request and return (status, body_json).
|
|
/// If body is empty, returns `serde_json::Value::Null` instead of panicking.
|
|
pub async fn send(app: &Router, req: Request<Body>) -> (StatusCode, serde_json::Value) {
|
|
let resp = app.clone().oneshot(req).await.unwrap();
|
|
let status = resp.status();
|
|
let bytes = body_bytes(resp.into_body()).await;
|
|
if bytes.is_empty() {
|
|
return (status, serde_json::Value::Null);
|
|
}
|
|
let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap_or_else(|e| {
|
|
panic!(
|
|
"Failed to parse JSON: {}\nBody: {}",
|
|
e,
|
|
String::from_utf8_lossy(&bytes)
|
|
)
|
|
});
|
|
(status, json)
|
|
}
|
|
|
|
// ── Auth helpers ─────────────────────────────────────────────────
|
|
|
|
/// Register a new user. Returns (access_token, refresh_token, response_json).
|
|
pub async fn register(
|
|
app: &Router,
|
|
username: &str,
|
|
email: &str,
|
|
password: &str,
|
|
) -> (String, String, serde_json::Value) {
|
|
let resp = app
|
|
.clone()
|
|
.oneshot(post_public(
|
|
"/api/v1/auth/register",
|
|
serde_json::json!({ "username": username, "email": email, "password": password }),
|
|
))
|
|
.await
|
|
.unwrap();
|
|
let status = resp.status();
|
|
let json = body_json(resp.into_body()).await;
|
|
assert_eq!(status, StatusCode::CREATED, "register failed: {json}");
|
|
let token = json["token"].as_str().unwrap().to_string();
|
|
let refresh = json["refresh_token"].as_str().unwrap().to_string();
|
|
(token, refresh, json)
|
|
}
|
|
|
|
/// Login. Returns (access_token, refresh_token, response_json).
|
|
pub async fn login(
|
|
app: &Router,
|
|
username: &str,
|
|
password: &str,
|
|
) -> (String, String, serde_json::Value) {
|
|
let resp = app
|
|
.clone()
|
|
.oneshot(post_public(
|
|
"/api/v1/auth/login",
|
|
serde_json::json!({ "username": username, "password": password }),
|
|
))
|
|
.await
|
|
.unwrap();
|
|
let status = resp.status();
|
|
let json = body_json(resp.into_body()).await;
|
|
assert_eq!(status, StatusCode::OK, "login failed: {json}");
|
|
let token = json["token"].as_str().unwrap().to_string();
|
|
let refresh = json["refresh_token"].as_str().unwrap().to_string();
|
|
(token, refresh, json)
|
|
}
|
|
|
|
/// Register + return access token.
|
|
pub async fn register_token(app: &Router, username: &str) -> String {
|
|
let email = format!("{username}@test.io");
|
|
register(app, username, &email, DEFAULT_PASSWORD).await.0
|
|
}
|
|
|
|
/// Create a user and promote to `admin`. Returns fresh JWT with admin permissions.
|
|
pub async fn admin_token(app: &Router, pool: &PgPool, username: &str) -> String {
|
|
let email = format!("{username}@test.io");
|
|
register(app, username, &email, DEFAULT_PASSWORD).await;
|
|
sqlx::query("UPDATE accounts SET role = 'admin' WHERE username = $1")
|
|
.bind(username)
|
|
.execute(pool)
|
|
.await
|
|
.unwrap();
|
|
login(app, username, DEFAULT_PASSWORD).await.0
|
|
}
|
|
|
|
/// Create a user and promote to `super_admin`. Returns fresh JWT.
|
|
pub async fn super_admin_token(app: &Router, pool: &PgPool, username: &str) -> String {
|
|
let email = format!("{username}@test.io");
|
|
register(app, username, &email, DEFAULT_PASSWORD).await;
|
|
sqlx::query("UPDATE accounts SET role = 'super_admin' WHERE username = $1")
|
|
.bind(username)
|
|
.execute(pool)
|
|
.await
|
|
.unwrap();
|
|
login(app, username, DEFAULT_PASSWORD).await.0
|
|
}
|