//! Common test utilities for E2E testing. //! //! This module provides utilities for: //! - Spawning and managing test daemon instances //! - Creating test agents //! - Making API calls //! - Waiting for service readiness //! //! ## Example //! //! ```rust,ignore //! use e2e::common::*; //! //! #[tokio::test] //! async fn my_test() { //! let daemon = spawn_daemon().await; //! wait_for_health(&daemon.base_url()).await; //! //! let agent = create_test_agent(&daemon, DEFAULT_MANIFEST).await; //! let response = send_message(&daemon, &agent.id, "Hello!").await; //! //! assert!(!response.is_empty()); //! } use axum::Router; use openfang_api::middleware; use openfang_api::rate_limiter; use openfang_api::routes::{self, AppState}; use openfang_api::ws; use openfang_kernel::OpenFangKernel; use openfang_types::config::{DefaultModelConfig, KernelConfig}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; use tempfile::TempDir; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- /// Default test manifest using ollama (no API key required). pub const DEFAULT_MANIFEST: &str = r#" name = "test-agent" version = "0.1.0" description = "E2E test agent" author = "test" module = "builtin:chat" [model] provider = "ollama" model = "test-model" system_prompt = "You are a test agent. Reply concisely." [capabilities] tools = ["file_read"] memory_read = ["*"] memory_write = ["self.*"] "#; /// Manifest for real LLM tests using Groq. pub const LLM_MANIFEST: &str = r#" name = "llm-test-agent" version = "0.1.0" description = "E2E test agent with real LLM" author = "test" module = "builtin:chat" [model] provider = "groq" model = "llama-3.3-70b-versatile" system_prompt = "You are a test agent. Reply concisely in 5 words or less." [capabilities] tools = ["file_read"] memory_read = ["*"] memory_write = ["self.*"] "#; /// Maximum time to wait for daemon health check. pub const HEALTH_CHECK_TIMEOUT_SECS: u64 = 30; /// Interval between health check retries. pub const HEALTH_CHECK_INTERVAL_MS: u64 = 100; // --------------------------------------------------------------------------- // Test Daemon // --------------------------------------------------------------------------- /// A running test daemon instance. /// /// This struct manages the lifecycle of a test daemon: /// - Holds a reference to the temp directory (keeps it alive) /// - Holds a reference to the kernel state /// - Provides the base URL for API calls /// /// When dropped, the daemon is automatically shut down. pub struct TestDaemon { base_url: String, state: Arc, _tmp: TempDir, addr: SocketAddr, } impl Drop for TestDaemon { fn drop(&mut self) { self.state.kernel.shutdown(); } } impl TestDaemon { /// Get the base URL for API calls. pub fn base_url(&self) -> &str { &self.base_url } /// Get the socket address the daemon is listening on. pub fn addr(&self) -> SocketAddr { self.addr } /// Get a reference to the kernel state. pub fn state(&self) -> &Arc { &self.state } /// Get the temp directory path. pub fn temp_dir(&self) -> &PathBuf { self._tmp.path() } /// Trigger a graceful shutdown. pub async fn shutdown(&self) { self.state.shutdown_notify.notify_one(); } } /// Configuration for spawning a test daemon. #[derive(Debug, Clone)] pub struct DaemonConfig { /// LLM provider to use (e.g., "ollama", "groq"). pub provider: String, /// Model name for the LLM. pub model: String, /// Environment variable name for the API key. pub api_key_env: String, /// Optional API key for authentication. pub auth_key: Option, /// Base URL for the LLM API (optional). pub base_url: Option, } impl Default for DaemonConfig { fn default() -> Self { Self { provider: "ollama".to_string(), model: "test-model".to_string(), api_key_env: "OLLAMA_API_KEY".to_string(), auth_key: None, base_url: None, } } } impl DaemonConfig { /// Create a config for ollama (no API key needed). pub fn ollama() -> Self { Self::default() } /// Create a config for Groq (requires GROQ_API_KEY). pub fn groq() -> Self { Self { provider: "groq".to_string(), model: "llama-3.3-70b-versatile".to_string(), api_key_env: "GROQ_API_KEY".to_string(), auth_key: None, base_url: None, } } /// Set the authentication key for the daemon. pub fn with_auth_key(mut self, key: impl Into) -> Self { self.auth_key = Some(key.into()); self } } /// Spawn a test daemon with default configuration (ollama, no auth). /// /// This creates an isolated daemon instance with: /// - A random available port /// - A temporary directory for data /// - Ollama as the default provider (no API key needed) /// /// # Example /// /// ```rust,ignore /// let daemon = spawn_daemon().await; /// wait_for_health(&daemon.base_url()).await; /// ``` pub async fn spawn_daemon() -> TestDaemon { spawn_daemon_with_config(DaemonConfig::default()).await } /// Spawn a test daemon with custom configuration. /// /// # Example /// /// ```rust,ignore /// let config = DaemonConfig::groq(); /// let daemon = spawn_daemon_with_config(config).await; /// ``` pub async fn spawn_daemon_with_config(config: DaemonConfig) -> TestDaemon { let tmp = tempfile::tempdir().expect("Failed to create temp dir"); let kernel_config = KernelConfig { home_dir: tmp.path().to_path_buf(), data_dir: tmp.path().join("data"), api_key: config.auth_key.clone().unwrap_or_default(), default_model: DefaultModelConfig { provider: config.provider.clone(), model: config.model.clone(), api_key_env: config.api_key_env.clone(), base_url: config.base_url.clone(), }, ..KernelConfig::default() }; let kernel = OpenFangKernel::boot_with_config(kernel_config).expect("Kernel should boot"); let kernel = Arc::new(kernel); kernel.set_self_handle(); let state = Arc::new(AppState { kernel: kernel.clone(), started_at: Instant::now(), peer_registry: kernel.peer_registry.as_ref().map(|r| Arc::new(r.clone())), bridge_manager: tokio::sync::Mutex::new(None), channels_config: tokio::sync::RwLock::new(Default::default()), shutdown_notify: Arc::new(tokio::sync::Notify::new()), }); let app = build_test_router(state.clone(), config.auth_key.is_some()); // Bind to random available port let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("Failed to bind test server"); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); TestDaemon { base_url: format!("http://{}", addr), state, _tmp: tmp, addr, } } /// Build the test router with all routes. fn build_test_router(state: Arc, with_auth: bool) -> Router<()> { let api_key = if with_auth { state.kernel.config.api_key.clone() } else { String::new() }; let gcra_limiter = rate_limiter::create_rate_limiter(); let mut app = Router::new() // Health and status .route("/api/health", axum::routing::get(routes::health)) .route( "/api/health/detail", axum::routing::get(routes::health_detail), ) .route("/api/status", axum::routing::get(routes::status)) .route("/api/version", axum::routing::get(routes::version)) // Agent endpoints .route( "/api/agents", axum::routing::get(routes::list_agents).post(routes::spawn_agent), ) .route( "/api/agents/{id}", axum::routing::get(routes::get_agent).delete(routes::kill_agent), ) .route( "/api/agents/{id}/message", axum::routing::post(routes::send_message), ) .route( "/api/agents/{id}/message/stream", axum::routing::post(routes::send_message_stream), ) .route( "/api/agents/{id}/session", axum::routing::get(routes::get_agent_session), ) .route( "/api/agents/{id}/sessions", axum::routing::get(routes::list_agent_sessions) .post(routes::create_agent_session), ) .route( "/api/agents/{id}/sessions/{session_id}/switch", axum::routing::post(routes::switch_agent_session), ) .route( "/api/agents/{id}/session/reset", axum::routing::post(routes::reset_session), ) .route( "/api/agents/{id}/stop", axum::routing::post(routes::stop_agent), ) .route( "/api/agents/{id}/model", axum::routing::put(routes::set_model), ) .route( "/api/agents/{id}/skills", axum::routing::get(routes::get_agent_skills).put(routes::set_agent_skills), ) .route( "/api/agents/{id}/clone", axum::routing::post(routes::clone_agent), ) .route("/api/agents/{id}/ws", axum::routing::get(ws::agent_ws)) // Profile endpoints .route("/api/profiles", axum::routing::get(routes::list_profiles)) // Memory endpoints .route( "/api/memory/agents/{id}/kv", axum::routing::get(routes::get_agent_kv), ) .route( "/api/memory/agents/{id}/kv/{key}", axum::routing::get(routes::get_agent_kv_key) .put(routes::set_agent_kv_key) .delete(routes::delete_agent_kv_key), ) // Trigger endpoints .route( "/api/triggers", axum::routing::get(routes::list_triggers).post(routes::create_trigger), ) .route( "/api/triggers/{id}", axum::routing::delete(routes::delete_trigger).put(routes::update_trigger), ) // Schedule endpoints .route( "/api/schedules", axum::routing::get(routes::list_schedules).post(routes::create_schedule), ) .route( "/api/schedules/{id}", axum::routing::delete(routes::delete_schedule).put(routes::update_schedule), ) .route( "/api/schedules/{id}/run", axum::routing::post(routes::run_schedule), ) // Workflow endpoints .route( "/api/workflows", axum::routing::get(routes::list_workflows).post(routes::create_workflow), ) .route( "/api/workflows/{id}/run", axum::routing::post(routes::run_workflow), ) .route( "/api/workflows/{id}/runs", axum::routing::get(routes::list_workflow_runs), ) // Budget endpoints .route( "/api/budget", axum::routing::get(routes::budget_status).put(routes::update_budget), ) .route( "/api/budget/agents", axum::routing::get(routes::agent_budget_ranking), ) .route( "/api/budget/agents/{id}", axum::routing::get(routes::agent_budget_status), ) // Usage endpoints .route("/api/usage", axum::routing::get(routes::usage_stats)) .route( "/api/usage/summary", axum::routing::get(routes::usage_summary), ) .route( "/api/usage/by-model", axum::routing::get(routes::usage_by_model), ) .route("/api/usage/daily", axum::routing::get(routes::usage_daily)) // Peer/Network endpoints .route("/api/peers", axum::routing::get(routes::list_peers)) .route( "/api/network/status", axum::routing::get(routes::network_status), ) // A2A endpoints .route("/api/a2a/agents", axum::routing::get(routes::a2a_list_external_agents)) .route( "/api/a2a/discover", axum::routing::post(routes::a2a_discover_external), ) .route( "/api/a2a/send", axum::routing::post(routes::a2a_send_external), ) .route( "/api/a2a/tasks/{id}/status", axum::routing::get(routes::a2a_external_task_status), ) // Model endpoints .route("/api/models", axum::routing::get(routes::list_models)) .route("/api/providers", axum::routing::get(routes::list_providers)) // Config endpoints .route("/api/config", axum::routing::get(routes::get_config)) .route("/api/config/set", axum::routing::post(routes::config_set)) // Shutdown .route("/api/shutdown", axum::routing::post(routes::shutdown)) // Middleware .layer(axum::middleware::from_fn(middleware::security_headers)) .layer(axum::middleware::from_fn(middleware::request_logging)) .layer(TraceLayer::new_for_http()) .layer(CorsLayer::permissive()) .with_state(state.clone()); if with_auth { app = app.layer(axum::middleware::from_fn_with_state( api_key, middleware::auth, )); } app = app.layer(axum::middleware::from_fn_with_state( gcra_limiter, rate_limiter::gcra_rate_limit, )); app } // --------------------------------------------------------------------------- // Health Check Utilities // --------------------------------------------------------------------------- /// Wait for the daemon to become healthy. /// /// Polls the `/api/health` endpoint until it returns 200 or times out. /// /// # Panics /// /// Panics if the daemon doesn't become healthy within the timeout. pub async fn wait_for_health(base_url: &str) { wait_for_health_with_timeout(base_url, Duration::from_secs(HEALTH_CHECK_TIMEOUT_SECS)).await } /// Wait for the daemon to become healthy with a custom timeout. pub async fn wait_for_health_with_timeout(base_url: &str, timeout: Duration) { let client = reqwest::Client::builder() .timeout(Duration::from_millis(500)) .build() .unwrap(); let start = Instant::now(); while start.elapsed() < timeout { match client.get(format!("{}/api/health", base_url)).send().await { Ok(resp) if resp.status().is_success() => return, _ => tokio::time::sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await, } } panic!( "Daemon did not become healthy within {:?}", timeout ); } // --------------------------------------------------------------------------- // Agent Utilities // --------------------------------------------------------------------------- /// Information about a spawned test agent. #[derive(Debug, Clone)] pub struct TestAgent { /// Agent UUID. pub id: String, /// Agent name. pub name: String, /// Model provider. pub provider: String, /// Model name. pub model: String, } /// Create a test agent with the given manifest. /// /// # Example /// /// ```rust,ignore /// let daemon = spawn_daemon().await; /// let agent = create_test_agent(&daemon, DEFAULT_MANIFEST).await; /// println!("Created agent: {}", agent.id); /// ``` pub async fn create_test_agent(daemon: &TestDaemon, manifest: &str) -> TestAgent { let client = reqwest::Client::new(); let resp = client .post(format!("{}/api/agents", daemon.base_url())) .json(&serde_json::json!({"manifest_toml": manifest})) .send() .await .expect("Failed to create agent"); if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); panic!("Failed to create agent: {} - {}", status, body); } let body: serde_json::Value = resp.json().await.expect("Invalid JSON response"); TestAgent { id: body["agent_id"].as_str().unwrap().to_string(), name: body["name"].as_str().unwrap().to_string(), provider: body["model_provider"].as_str().unwrap_or("unknown").to_string(), model: body["model_name"].as_str().unwrap_or("unknown").to_string(), } } /// Create a test agent with a custom name. pub async fn create_named_test_agent(daemon: &TestDaemon, name: &str) -> TestAgent { let manifest = format!( r#" name = "{}" version = "0.1.0" description = "Named test agent" author = "test" module = "builtin:chat" [model] provider = "ollama" model = "test-model" system_prompt = "You are a test agent." [capabilities] memory_read = ["*"] memory_write = ["self.*"] "#, name ); create_test_agent(daemon, &manifest).await } /// Kill an agent by ID. pub async fn kill_agent(daemon: &TestDaemon, agent_id: &str) { let client = reqwest::Client::new(); let resp = client .delete(format!("{}/api/agents/{}", daemon.base_url(), agent_id)) .send() .await .expect("Failed to kill agent"); assert!(resp.status().is_success(), "Failed to kill agent: {}", resp.status()); } // --------------------------------------------------------------------------- // Message Utilities // --------------------------------------------------------------------------- /// Response from sending a message to an agent. #[derive(Debug, Clone)] pub struct MessageResponse { /// The agent's response text. pub response: String, /// Number of input tokens used. pub input_tokens: u64, /// Number of output tokens used. pub output_tokens: u64, /// Raw JSON response. pub raw: serde_json::Value, } /// Send a message to an agent. /// /// # Example /// /// ```rust,ignore /// let daemon = spawn_daemon().await; /// let agent = create_test_agent(&daemon, LLM_MANIFEST).await; /// let response = send_message(&daemon, &agent.id, "Hello!").await; /// println!("Response: {}", response.response); /// ``` pub async fn send_message(daemon: &TestDaemon, agent_id: &str, message: &str) -> MessageResponse { let client = reqwest::Client::new(); let resp = client .post(format!( "{}/api/agents/{}/message", daemon.base_url(), agent_id )) .json(&serde_json::json!({"message": message})) .send() .await .expect("Failed to send message"); if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); panic!("Failed to send message: {} - {}", status, body); } let raw: serde_json::Value = resp.json().await.expect("Invalid JSON response"); MessageResponse { response: raw["response"].as_str().unwrap_or_default().to_string(), input_tokens: raw["input_tokens"].as_u64().unwrap_or(0), output_tokens: raw["output_tokens"].as_u64().unwrap_or(0), raw, } } // --------------------------------------------------------------------------- // API Helpers // --------------------------------------------------------------------------- /// Make a GET request to the daemon. pub async fn get(daemon: &TestDaemon, path: &str) -> serde_json::Value { let client = reqwest::Client::new(); let resp = client .get(format!("{}{}", daemon.base_url(), path)) .send() .await .expect("GET request failed"); resp.json().await.expect("Invalid JSON response") } /// Make a POST request to the daemon. pub async fn post(daemon: &TestDaemon, path: &str, body: &serde_json::Value) -> serde_json::Value { let client = reqwest::Client::new(); let resp = client .post(format!("{}{}", daemon.base_url(), path)) .json(body) .send() .await .expect("POST request failed"); resp.json().await.expect("Invalid JSON response") } /// Make a PUT request to the daemon. pub async fn put(daemon: &TestDaemon, path: &str, body: &serde_json::Value) -> serde_json::Value { let client = reqwest::Client::new(); let resp = client .put(format!("{}{}", daemon.base_url(), path)) .json(body) .send() .await .expect("PUT request failed"); resp.json().await.expect("Invalid JSON response") } /// Make a DELETE request to the daemon. pub async fn delete(daemon: &TestDaemon, path: &str) -> serde_json::Value { let client = reqwest::Client::new(); let resp = client .delete(format!("{}{}", daemon.base_url(), path)) .send() .await .expect("DELETE request failed"); resp.json().await.expect("Invalid JSON response") } /// List all agents. pub async fn list_agents(daemon: &TestDaemon) -> Vec { get(daemon, "/api/agents").await.as_array().unwrap().clone() } /// Get agent count. pub async fn agent_count(daemon: &TestDaemon) -> usize { list_agents(daemon).await.len() } /// Check if LLM tests can run (API key is available). pub fn can_run_llm_tests() -> bool { std::env::var("GROQ_API_KEY").is_ok() } /// Skip reason for LLM tests. pub fn llm_skip_reason() -> &'static str { "GROQ_API_KEY not set, skipping LLM integration test" } // --------------------------------------------------------------------------- // Assertion Helpers // --------------------------------------------------------------------------- /// Assert that a response has a successful status code. pub fn assert_success(response: &reqwest::Response) { assert!( response.status().is_success(), "Expected success, got: {}", response.status() ); } /// Assert that a response has a specific status code. pub fn assert_status(response: &reqwest::Response, expected: u16) { let status = response.status(); assert_eq!( status.as_u16(), expected, "Expected status {}, got: {}", expected, status ); } /// Assert that an agent exists. pub async fn assert_agent_exists(daemon: &TestDaemon, agent_id: &str) { let agents = list_agents(daemon).await; let exists = agents.iter().any(|a| a["id"] == agent_id); assert!(exists, "Agent {} should exist", agent_id); } /// Assert that an agent does not exist. pub async fn assert_agent_not_exists(daemon: &TestDaemon, agent_id: &str) { let agents = list_agents(daemon).await; let exists = agents.iter().any(|a| a["id"] == agent_id); assert!(!exists, "Agent {} should not exist", agent_id); } // --------------------------------------------------------------------------- // Test Fixture Helpers // --------------------------------------------------------------------------- /// Create a workflow definition for testing. pub fn create_test_workflow(agent_name: &str) -> serde_json::Value { serde_json::json!({ "name": "test-workflow", "description": "E2E test workflow", "steps": [ { "name": "step1", "agent_name": agent_name, "prompt": "Echo: {{input}}", "mode": "sequential", "timeout_secs": 30 } ] }) } /// Create a trigger definition for testing. pub fn create_test_trigger(agent_id: &str) -> serde_json::Value { serde_json::json!({ "agent_id": agent_id, "pattern": "lifecycle", "prompt_template": "Handle: {{event}}", "max_fires": 5 }) } /// Create a schedule definition for testing. pub fn create_test_schedule(agent_id: &str) -> serde_json::Value { serde_json::json!({ "agent_id": agent_id, "cron": "0 * * * * *", // Every hour "prompt": "Scheduled task check", "enabled": true }) }