feat(kernel): persist agent runtime state across restarts
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- Schema: migrations now execute ALTER TABLE ADD COLUMN for state/message_count - MemoryStore: add update_agent_runtime() and list_agents_with_runtime() - Registry: add register_with_runtime() to accept persisted state/message_count - Kernel boot: restore agents with their persisted state (not always Running) - Kernel shutdown: persist all agent states/message_counts before terminating Agents that were suspended stay suspended after restart. Message counts survive restarts instead of resetting to 0.
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
//! Agent CRUD operations
|
//! Agent CRUD operations
|
||||||
|
|
||||||
use zclaw_types::{AgentConfig, AgentId, AgentInfo, Event, Result};
|
use zclaw_types::{AgentConfig, AgentId, AgentInfo, AgentState, Event, Result};
|
||||||
|
|
||||||
#[cfg(feature = "multi-agent")]
|
#[cfg(feature = "multi-agent")]
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ mod a2a;
|
|||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{broadcast, Mutex};
|
use tokio::sync::{broadcast, Mutex};
|
||||||
use zclaw_types::{Event, Result};
|
use zclaw_types::{Event, Result, AgentState};
|
||||||
|
|
||||||
#[cfg(feature = "multi-agent")]
|
#[cfg(feature = "multi-agent")]
|
||||||
use zclaw_types::AgentId;
|
use zclaw_types::AgentId;
|
||||||
@@ -114,10 +114,21 @@ impl Kernel {
|
|||||||
// Initialize Growth system — shared VikingAdapter for memory storage
|
// Initialize Growth system — shared VikingAdapter for memory storage
|
||||||
let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory());
|
let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory());
|
||||||
|
|
||||||
// Restore persisted agents
|
// Restore persisted agents with their runtime state
|
||||||
let persisted = memory.list_agents().await?;
|
let persisted = memory.list_agents_with_runtime().await?;
|
||||||
for agent in persisted {
|
for (agent, state_str, msg_count) in persisted {
|
||||||
registry.register(agent);
|
let state = match state_str.as_str() {
|
||||||
|
"running" => AgentState::Running,
|
||||||
|
"suspended" => AgentState::Suspended,
|
||||||
|
_ => AgentState::Terminated,
|
||||||
|
};
|
||||||
|
// Only auto-resume agents that were running; suspended/terminated stay as-is
|
||||||
|
let restored_state = if state == AgentState::Running {
|
||||||
|
AgentState::Running
|
||||||
|
} else {
|
||||||
|
state
|
||||||
|
};
|
||||||
|
registry.register_with_runtime(agent, restored_state, msg_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize A2A router for multi-agent support
|
// Initialize A2A router for multi-agent support
|
||||||
@@ -287,6 +298,23 @@ impl Kernel {
|
|||||||
|
|
||||||
/// Shutdown the kernel
|
/// Shutdown the kernel
|
||||||
pub async fn shutdown(&self) -> Result<()> {
|
pub async fn shutdown(&self) -> Result<()> {
|
||||||
|
// Persist all agent runtime states before shutdown
|
||||||
|
let agents = self.registry.list();
|
||||||
|
for info in &agents {
|
||||||
|
let state_str = match info.state {
|
||||||
|
AgentState::Running => "running",
|
||||||
|
AgentState::Suspended => "suspended",
|
||||||
|
AgentState::Terminated => "terminated",
|
||||||
|
};
|
||||||
|
if let Err(e) = self.memory
|
||||||
|
.update_agent_runtime(&info.id, state_str, info.message_count as u64)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!("[Kernel] Failed to persist agent {} state: {}", info.id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracing::info!("[Kernel] Persisted runtime state for {} agents", agents.len());
|
||||||
|
|
||||||
self.events.publish(Event::KernelShutdown);
|
self.events.publish(Event::KernelShutdown);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,22 @@ impl AgentRegistry {
|
|||||||
self.created_at.insert(id, Utc::now());
|
self.created_at.insert(id, Utc::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register an agent with persisted runtime state
|
||||||
|
pub fn register_with_runtime(
|
||||||
|
&self,
|
||||||
|
config: AgentConfig,
|
||||||
|
state: AgentState,
|
||||||
|
message_count: u64,
|
||||||
|
) {
|
||||||
|
let id = config.id;
|
||||||
|
self.agents.insert(id, config);
|
||||||
|
self.states.insert(id, state);
|
||||||
|
self.created_at.insert(id, Utc::now());
|
||||||
|
if message_count > 0 {
|
||||||
|
self.message_counts.insert(id, message_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Unregister an agent
|
/// Unregister an agent
|
||||||
pub fn unregister(&self, id: &AgentId) {
|
pub fn unregister(&self, id: &AgentId) {
|
||||||
self.agents.remove(id);
|
self.agents.remove(id);
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
//! Database schema definitions
|
//! Database schema definitions
|
||||||
|
|
||||||
/// Current schema version
|
/// Current schema version
|
||||||
pub const SCHEMA_VERSION: i32 = 1;
|
pub const SCHEMA_VERSION: i32 = 2;
|
||||||
|
|
||||||
/// Schema creation SQL
|
/// Schema creation SQL
|
||||||
pub const CREATE_SCHEMA: &str = r#"
|
pub const CREATE_SCHEMA: &str = r#"
|
||||||
@@ -87,3 +87,10 @@ CREATE INDEX IF NOT EXISTS idx_facts_agent ON facts(agent_id);
|
|||||||
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(agent_id, category);
|
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(agent_id, category);
|
||||||
CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(agent_id, confidence DESC);
|
CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(agent_id, confidence DESC);
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
|
/// Incremental migrations (safe to run repeatedly via ALTER … ADD COLUMN + IF NOT EXISTS pattern)
|
||||||
|
pub const MIGRATIONS: &[&str] = &[
|
||||||
|
// v1→v2: persist runtime state and message count
|
||||||
|
"ALTER TABLE agents ADD COLUMN state TEXT NOT NULL DEFAULT 'running'",
|
||||||
|
"ALTER TABLE agents ADD COLUMN message_count INTEGER NOT NULL DEFAULT 0",
|
||||||
|
];
|
||||||
|
|||||||
@@ -69,6 +69,26 @@ impl MemoryStore {
|
|||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
|
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
|
||||||
|
|
||||||
|
// Run incremental migrations (ALTER … ADD COLUMN is idempotent with error suppression)
|
||||||
|
for migration in crate::schema::MIGRATIONS {
|
||||||
|
if let Err(e) = sqlx::query(migration)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
// Column already exists — expected on repeated runs
|
||||||
|
tracing::debug!("[MemoryStore] Migration skipped (already applied): {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist current schema version
|
||||||
|
let version = crate::schema::SCHEMA_VERSION;
|
||||||
|
sqlx::query("INSERT OR REPLACE INTO schema_version (version) VALUES (?)")
|
||||||
|
.bind(version)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,6 +170,46 @@ impl MemoryStore {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Persist runtime state and message count for an agent
|
||||||
|
pub async fn update_agent_runtime(
|
||||||
|
&self,
|
||||||
|
id: &AgentId,
|
||||||
|
state: &str,
|
||||||
|
message_count: u64,
|
||||||
|
) -> Result<()> {
|
||||||
|
let id_str = id.to_string();
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE agents SET state = ?, message_count = ?, updated_at = datetime('now') WHERE id = ?",
|
||||||
|
)
|
||||||
|
.bind(state)
|
||||||
|
.bind(message_count as i64)
|
||||||
|
.bind(&id_str)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all agents with their persisted runtime state
|
||||||
|
/// Returns (AgentConfig, state_string, message_count)
|
||||||
|
pub async fn list_agents_with_runtime(&self) -> Result<Vec<(AgentConfig, String, u64)>> {
|
||||||
|
let rows = sqlx::query_as::<_, (String, String, i64)>(
|
||||||
|
"SELECT config, state, message_count FROM agents",
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| ZclawError::StorageError(e.to_string()))?;
|
||||||
|
|
||||||
|
let agents = rows
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(config, state, mc)| {
|
||||||
|
let agent: AgentConfig = serde_json::from_str(&config).ok()?;
|
||||||
|
Some((agent, state, mc as u64))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(agents)
|
||||||
|
}
|
||||||
|
|
||||||
// === Session Management ===
|
// === Session Management ===
|
||||||
|
|
||||||
/// Create a new session for an agent
|
/// Create a new session for an agent
|
||||||
|
|||||||
Reference in New Issue
Block a user