From b25dfc967a4f9d4dd506ebf1075bc178ea6024a4 Mon Sep 17 00:00:00 2001 From: iven Date: Sat, 4 Apr 2026 01:19:53 +0800 Subject: [PATCH] feat(kernel): persist agent runtime state across restarts - Schema: migrations now execute ALTER TABLE ADD COLUMN for state/message_count - MemoryStore: add update_agent_runtime() and list_agents_with_runtime() - Registry: add register_with_runtime() to accept persisted state/message_count - Kernel boot: restore agents with their persisted state (not always Running) - Kernel shutdown: persist all agent states/message_counts before terminating Agents that were suspended stay suspended after restart. Message counts survive restarts instead of resetting to 0. --- crates/zclaw-kernel/src/kernel/agents.rs | 2 +- crates/zclaw-kernel/src/kernel/mod.rs | 38 +++++++++++++-- crates/zclaw-kernel/src/registry.rs | 16 +++++++ crates/zclaw-memory/src/schema.rs | 9 +++- crates/zclaw-memory/src/store.rs | 60 ++++++++++++++++++++++++ 5 files changed, 118 insertions(+), 7 deletions(-) diff --git a/crates/zclaw-kernel/src/kernel/agents.rs b/crates/zclaw-kernel/src/kernel/agents.rs index 7fcb859..07625da 100644 --- a/crates/zclaw-kernel/src/kernel/agents.rs +++ b/crates/zclaw-kernel/src/kernel/agents.rs @@ -1,6 +1,6 @@ //! Agent CRUD operations -use zclaw_types::{AgentConfig, AgentId, AgentInfo, Event, Result}; +use zclaw_types::{AgentConfig, AgentId, AgentInfo, AgentState, Event, Result}; #[cfg(feature = "multi-agent")] use std::sync::Arc; diff --git a/crates/zclaw-kernel/src/kernel/mod.rs b/crates/zclaw-kernel/src/kernel/mod.rs index 0d30603..a66fe21 100644 --- a/crates/zclaw-kernel/src/kernel/mod.rs +++ b/crates/zclaw-kernel/src/kernel/mod.rs @@ -12,7 +12,7 @@ mod a2a; use std::sync::Arc; use tokio::sync::{broadcast, Mutex}; -use zclaw_types::{Event, Result}; +use zclaw_types::{Event, Result, AgentState}; #[cfg(feature = "multi-agent")] use zclaw_types::AgentId; @@ -114,10 +114,21 @@ impl Kernel { // Initialize Growth system — shared VikingAdapter for memory storage let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory()); - // Restore persisted agents - let persisted = memory.list_agents().await?; - for agent in persisted { - registry.register(agent); + // Restore persisted agents with their runtime state + let persisted = memory.list_agents_with_runtime().await?; + for (agent, state_str, msg_count) in persisted { + let state = match state_str.as_str() { + "running" => AgentState::Running, + "suspended" => AgentState::Suspended, + _ => AgentState::Terminated, + }; + // Only auto-resume agents that were running; suspended/terminated stay as-is + let restored_state = if state == AgentState::Running { + AgentState::Running + } else { + state + }; + registry.register_with_runtime(agent, restored_state, msg_count); } // Initialize A2A router for multi-agent support @@ -287,6 +298,23 @@ impl Kernel { /// Shutdown the kernel pub async fn shutdown(&self) -> Result<()> { + // Persist all agent runtime states before shutdown + let agents = self.registry.list(); + for info in &agents { + let state_str = match info.state { + AgentState::Running => "running", + AgentState::Suspended => "suspended", + AgentState::Terminated => "terminated", + }; + if let Err(e) = self.memory + .update_agent_runtime(&info.id, state_str, info.message_count as u64) + .await + { + tracing::warn!("[Kernel] Failed to persist agent {} state: {}", info.id, e); + } + } + tracing::info!("[Kernel] Persisted runtime state for {} agents", agents.len()); + self.events.publish(Event::KernelShutdown); Ok(()) } diff --git a/crates/zclaw-kernel/src/registry.rs b/crates/zclaw-kernel/src/registry.rs index 7632e2d..0cdbfd8 100644 --- a/crates/zclaw-kernel/src/registry.rs +++ b/crates/zclaw-kernel/src/registry.rs @@ -30,6 +30,22 @@ impl AgentRegistry { self.created_at.insert(id, Utc::now()); } + /// Register an agent with persisted runtime state + pub fn register_with_runtime( + &self, + config: AgentConfig, + state: AgentState, + message_count: u64, + ) { + let id = config.id; + self.agents.insert(id, config); + self.states.insert(id, state); + self.created_at.insert(id, Utc::now()); + if message_count > 0 { + self.message_counts.insert(id, message_count); + } + } + /// Unregister an agent pub fn unregister(&self, id: &AgentId) { self.agents.remove(id); diff --git a/crates/zclaw-memory/src/schema.rs b/crates/zclaw-memory/src/schema.rs index abc71d9..15ade9d 100644 --- a/crates/zclaw-memory/src/schema.rs +++ b/crates/zclaw-memory/src/schema.rs @@ -1,7 +1,7 @@ //! Database schema definitions /// Current schema version -pub const SCHEMA_VERSION: i32 = 1; +pub const SCHEMA_VERSION: i32 = 2; /// Schema creation SQL pub const CREATE_SCHEMA: &str = r#" @@ -87,3 +87,10 @@ CREATE INDEX IF NOT EXISTS idx_facts_agent ON facts(agent_id); CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(agent_id, category); CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(agent_id, confidence DESC); "#; + +/// Incremental migrations (safe to run repeatedly via ALTER … ADD COLUMN + IF NOT EXISTS pattern) +pub const MIGRATIONS: &[&str] = &[ + // v1→v2: persist runtime state and message count + "ALTER TABLE agents ADD COLUMN state TEXT NOT NULL DEFAULT 'running'", + "ALTER TABLE agents ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0", +]; diff --git a/crates/zclaw-memory/src/store.rs b/crates/zclaw-memory/src/store.rs index bbfe065..5c60ff6 100644 --- a/crates/zclaw-memory/src/store.rs +++ b/crates/zclaw-memory/src/store.rs @@ -69,6 +69,26 @@ impl MemoryStore { .execute(&self.pool) .await .map_err(|e| ZclawError::StorageError(e.to_string()))?; + + // Run incremental migrations (ALTER … ADD COLUMN is idempotent with error suppression) + for migration in crate::schema::MIGRATIONS { + if let Err(e) = sqlx::query(migration) + .execute(&self.pool) + .await + { + // Column already exists — expected on repeated runs + tracing::debug!("[MemoryStore] Migration skipped (already applied): {}", e); + } + } + + // Persist current schema version + let version = crate::schema::SCHEMA_VERSION; + sqlx::query("INSERT OR REPLACE INTO schema_version (version) VALUES (?)") + .bind(version) + .execute(&self.pool) + .await + .map_err(|e| ZclawError::StorageError(e.to_string()))?; + Ok(()) } @@ -150,6 +170,46 @@ impl MemoryStore { Ok(()) } + /// Persist runtime state and message count for an agent + pub async fn update_agent_runtime( + &self, + id: &AgentId, + state: &str, + message_count: u64, + ) -> Result<()> { + let id_str = id.to_string(); + sqlx::query( + "UPDATE agents SET state = ?, message_count = ?, updated_at = datetime('now') WHERE id = ?", + ) + .bind(state) + .bind(message_count as i64) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(|e| ZclawError::StorageError(e.to_string()))?; + Ok(()) + } + + /// List all agents with their persisted runtime state + /// Returns (AgentConfig, state_string, message_count) + pub async fn list_agents_with_runtime(&self) -> Result> { + let rows = sqlx::query_as::<_, (String, String, i64)>( + "SELECT config, state, message_count FROM agents", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| ZclawError::StorageError(e.to_string()))?; + + let agents = rows + .into_iter() + .filter_map(|(config, state, mc)| { + let agent: AgentConfig = serde_json::from_str(&config).ok()?; + Some((agent, state, mc as u64)) + }) + .collect(); + Ok(agents) + } + // === Session Management === /// Create a new session for an agent