//! Trajectory Store -- record and compress tool-call chains for analysis. //! //! Stores raw trajectory events (user requests, tool calls, LLM generations) //! and compressed trajectory summaries. Used by the Hermes Intelligence Pipeline //! to analyze agent behaviour patterns and improve routing over time. use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; use zclaw_types::{Result, ZclawError}; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- /// Step type in a trajectory. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TrajectoryStepType { UserRequest, IntentClassification, SkillSelection, ToolExecution, LlmGeneration, UserFeedback, } impl TrajectoryStepType { /// Serialize to the string stored in SQLite. pub fn as_str(&self) -> &'static str { match self { Self::UserRequest => "user_request", Self::IntentClassification => "intent_classification", Self::SkillSelection => "skill_selection", Self::ToolExecution => "tool_execution", Self::LlmGeneration => "llm_generation", Self::UserFeedback => "user_feedback", } } /// Deserialize from the SQLite string representation. pub fn from_str_lossy(s: &str) -> Self { match s { "user_request" => Self::UserRequest, "intent_classification" => Self::IntentClassification, "skill_selection" => Self::SkillSelection, "tool_execution" => Self::ToolExecution, "llm_generation" => Self::LlmGeneration, "user_feedback" => Self::UserFeedback, _ => Self::UserRequest, } } } /// Single trajectory event. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TrajectoryEvent { pub id: String, pub session_id: String, pub agent_id: String, pub step_index: usize, pub step_type: TrajectoryStepType, /// Summarised input (max 200 chars). pub input_summary: String, /// Summarised output (max 200 chars). pub output_summary: String, pub duration_ms: u64, pub timestamp: DateTime, } /// Satisfaction signal inferred from user feedback. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum SatisfactionSignal { Positive, Negative, Neutral, } impl SatisfactionSignal { pub fn as_str(&self) -> &'static str { match self { Self::Positive => "positive", Self::Negative => "negative", Self::Neutral => "neutral", } } pub fn from_str_lossy(s: &str) -> Option { match s { "positive" => Some(Self::Positive), "negative" => Some(Self::Negative), "neutral" => Some(Self::Neutral), _ => None, } } } /// Completion status of a compressed trajectory. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum CompletionStatus { Success, Partial, Failed, Abandoned, } impl CompletionStatus { pub fn as_str(&self) -> &'static str { match self { Self::Success => "success", Self::Partial => "partial", Self::Failed => "failed", Self::Abandoned => "abandoned", } } pub fn from_str_lossy(s: &str) -> Self { match s { "success" => Self::Success, "partial" => Self::Partial, "failed" => Self::Failed, "abandoned" => Self::Abandoned, _ => Self::Success, } } } /// Compressed trajectory (generated at session end). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CompressedTrajectory { pub id: String, pub session_id: String, pub agent_id: String, pub request_type: String, pub tools_used: Vec, pub outcome: CompletionStatus, pub total_steps: usize, pub total_duration_ms: u64, pub total_tokens: u32, /// Serialised JSON execution chain for analysis. pub execution_chain: String, pub satisfaction_signal: Option, pub created_at: DateTime, } // --------------------------------------------------------------------------- // Store // --------------------------------------------------------------------------- /// Persistent store for trajectory events and compressed trajectories. pub struct TrajectoryStore { pool: SqlitePool, } impl TrajectoryStore { /// Create a new `TrajectoryStore` backed by the given SQLite pool. pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Create the required tables. Idempotent -- safe to call on startup. pub async fn initialize_schema(&self) -> Result<()> { sqlx::query( r#" CREATE TABLE IF NOT EXISTS trajectory_events ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, agent_id TEXT NOT NULL, step_index INTEGER NOT NULL, step_type TEXT NOT NULL, input_summary TEXT, output_summary TEXT, duration_ms INTEGER DEFAULT 0, timestamp TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_trajectory_session ON trajectory_events(session_id); "#, ) .execute(&self.pool) .await .map_err(|e| ZclawError::StorageError(e.to_string()))?; sqlx::query( r#" CREATE TABLE IF NOT EXISTS compressed_trajectories ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, agent_id TEXT NOT NULL, request_type TEXT NOT NULL, tools_used TEXT, outcome TEXT NOT NULL, total_steps INTEGER DEFAULT 0, total_duration_ms INTEGER DEFAULT 0, total_tokens INTEGER DEFAULT 0, execution_chain TEXT NOT NULL, satisfaction_signal TEXT, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_ct_request_type ON compressed_trajectories(request_type); CREATE INDEX IF NOT EXISTS idx_ct_outcome ON compressed_trajectories(outcome); "#, ) .execute(&self.pool) .await .map_err(|e| ZclawError::StorageError(e.to_string()))?; Ok(()) } /// Insert a raw trajectory event. pub async fn insert_event(&self, event: &TrajectoryEvent) -> Result<()> { sqlx::query( r#" INSERT INTO trajectory_events (id, session_id, agent_id, step_index, step_type, input_summary, output_summary, duration_ms, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) "#, ) .bind(&event.id) .bind(&event.session_id) .bind(&event.agent_id) .bind(event.step_index as i64) .bind(event.step_type.as_str()) .bind(&event.input_summary) .bind(&event.output_summary) .bind(event.duration_ms as i64) .bind(event.timestamp.to_rfc3339()) .execute(&self.pool) .await .map_err(|e| { tracing::warn!("[TrajectoryStore] insert_event failed: {}", e); ZclawError::StorageError(e.to_string()) })?; Ok(()) } /// Retrieve all raw events for a session, ordered by step_index. pub async fn get_events_by_session(&self, session_id: &str) -> Result> { let rows = sqlx::query_as::<_, (String, String, String, i64, String, Option, Option, Option, String)>( r#" SELECT id, session_id, agent_id, step_index, step_type, input_summary, output_summary, duration_ms, timestamp FROM trajectory_events WHERE session_id = ? ORDER BY step_index ASC "#, ) .bind(session_id) .fetch_all(&self.pool) .await .map_err(|e| ZclawError::StorageError(e.to_string()))?; let mut events = Vec::with_capacity(rows.len()); for (id, sid, aid, step_idx, stype, input_s, output_s, dur_ms, ts) in rows { let timestamp = DateTime::parse_from_rfc3339(&ts) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); events.push(TrajectoryEvent { id, session_id: sid, agent_id: aid, step_index: step_idx as usize, step_type: TrajectoryStepType::from_str_lossy(&stype), input_summary: input_s.unwrap_or_default(), output_summary: output_s.unwrap_or_default(), duration_ms: dur_ms.unwrap_or(0) as u64, timestamp, }); } Ok(events) } /// Insert a compressed trajectory. pub async fn insert_compressed(&self, trajectory: &CompressedTrajectory) -> Result<()> { let tools_json = serde_json::to_string(&trajectory.tools_used) .map_err(|e| ZclawError::StorageError(e.to_string()))?; sqlx::query( r#" INSERT INTO compressed_trajectories (id, session_id, agent_id, request_type, tools_used, outcome, total_steps, total_duration_ms, total_tokens, execution_chain, satisfaction_signal, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "#, ) .bind(&trajectory.id) .bind(&trajectory.session_id) .bind(&trajectory.agent_id) .bind(&trajectory.request_type) .bind(&tools_json) .bind(trajectory.outcome.as_str()) .bind(trajectory.total_steps as i64) .bind(trajectory.total_duration_ms as i64) .bind(trajectory.total_tokens as i64) .bind(&trajectory.execution_chain) .bind(trajectory.satisfaction_signal.map(|s| s.as_str())) .bind(trajectory.created_at.to_rfc3339()) .execute(&self.pool) .await .map_err(|e| { tracing::warn!("[TrajectoryStore] insert_compressed failed: {}", e); ZclawError::StorageError(e.to_string()) })?; Ok(()) } /// Retrieve the compressed trajectory for a session, if any. pub async fn get_compressed_by_session(&self, session_id: &str) -> Result> { let row = sqlx::query_as::<_, ( String, String, String, String, Option, String, i64, i64, i64, String, Option, String, )>( r#" SELECT id, session_id, agent_id, request_type, tools_used, outcome, total_steps, total_duration_ms, total_tokens, execution_chain, satisfaction_signal, created_at FROM compressed_trajectories WHERE session_id = ? "#, ) .bind(session_id) .fetch_optional(&self.pool) .await .map_err(|e| ZclawError::StorageError(e.to_string()))?; match row { Some((id, sid, aid, req_type, tools_json, outcome_str, steps, dur_ms, tokens, chain, sat, created)) => { let tools_used: Vec = tools_json .as_deref() .and_then(|j| serde_json::from_str(j).ok()) .unwrap_or_default(); let timestamp = DateTime::parse_from_rfc3339(&created) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); Ok(Some(CompressedTrajectory { id, session_id: sid, agent_id: aid, request_type: req_type, tools_used, outcome: CompletionStatus::from_str_lossy(&outcome_str), total_steps: steps as usize, total_duration_ms: dur_ms as u64, total_tokens: tokens as u32, execution_chain: chain, satisfaction_signal: sat.as_deref().and_then(SatisfactionSignal::from_str_lossy), created_at: timestamp, })) } None => Ok(None), } } /// Delete raw trajectory events older than `days` days. Returns count deleted. pub async fn delete_events_older_than(&self, days: i64) -> Result { let result = sqlx::query( r#" DELETE FROM trajectory_events WHERE timestamp < datetime('now', ?) "#, ) .bind(format!("-{} days", days)) .execute(&self.pool) .await .map_err(|e| { tracing::warn!("[TrajectoryStore] delete_events_older_than failed: {}", e); ZclawError::StorageError(e.to_string()) })?; Ok(result.rows_affected()) } /// Delete compressed trajectories older than `days` days. Returns count deleted. pub async fn delete_compressed_older_than(&self, days: i64) -> Result { let result = sqlx::query( r#" DELETE FROM compressed_trajectories WHERE created_at < datetime('now', ?) "#, ) .bind(format!("-{} days", days)) .execute(&self.pool) .await .map_err(|e| { tracing::warn!("[TrajectoryStore] delete_compressed_older_than failed: {}", e); ZclawError::StorageError(e.to_string()) })?; Ok(result.rows_affected()) } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; async fn test_store() -> TrajectoryStore { let pool = SqlitePool::connect("sqlite::memory:") .await .expect("in-memory pool"); let store = TrajectoryStore::new(pool); store.initialize_schema().await.expect("schema init"); store } fn sample_event(index: usize) -> TrajectoryEvent { TrajectoryEvent { id: format!("evt-{}", index), session_id: "sess-1".to_string(), agent_id: "agent-1".to_string(), step_index: index, step_type: TrajectoryStepType::ToolExecution, input_summary: "search query".to_string(), output_summary: "3 results found".to_string(), duration_ms: 150, timestamp: Utc::now(), } } #[tokio::test] async fn test_insert_and_get_events() { let store = test_store().await; let e1 = sample_event(0); let e2 = TrajectoryEvent { id: "evt-1".to_string(), step_index: 1, step_type: TrajectoryStepType::LlmGeneration, ..sample_event(0) }; store.insert_event(&e1).await.unwrap(); store.insert_event(&e2).await.unwrap(); let events = store.get_events_by_session("sess-1").await.unwrap(); assert_eq!(events.len(), 2); assert_eq!(events[0].step_index, 0); assert_eq!(events[1].step_index, 1); assert_eq!(events[0].step_type, TrajectoryStepType::ToolExecution); assert_eq!(events[1].step_type, TrajectoryStepType::LlmGeneration); } #[tokio::test] async fn test_get_events_empty_session() { let store = test_store().await; let events = store.get_events_by_session("nonexistent").await.unwrap(); assert!(events.is_empty()); } #[tokio::test] async fn test_insert_and_get_compressed() { let store = test_store().await; let ct = CompressedTrajectory { id: "ct-1".to_string(), session_id: "sess-1".to_string(), agent_id: "agent-1".to_string(), request_type: "data_query".to_string(), tools_used: vec!["search".to_string(), "calculate".to_string()], outcome: CompletionStatus::Success, total_steps: 5, total_duration_ms: 1200, total_tokens: 350, execution_chain: r#"[{"step":0,"type":"tool_execution"}]"#.to_string(), satisfaction_signal: Some(SatisfactionSignal::Positive), created_at: Utc::now(), }; store.insert_compressed(&ct).await.unwrap(); let loaded = store.get_compressed_by_session("sess-1").await.unwrap(); assert!(loaded.is_some()); let loaded = loaded.unwrap(); assert_eq!(loaded.id, "ct-1"); assert_eq!(loaded.request_type, "data_query"); assert_eq!(loaded.tools_used.len(), 2); assert_eq!(loaded.outcome, CompletionStatus::Success); assert_eq!(loaded.satisfaction_signal, Some(SatisfactionSignal::Positive)); } #[tokio::test] async fn test_get_compressed_nonexistent() { let store = test_store().await; let result = store.get_compressed_by_session("nonexistent").await.unwrap(); assert!(result.is_none()); } #[tokio::test] async fn test_step_type_roundtrip() { let all_types = [ TrajectoryStepType::UserRequest, TrajectoryStepType::IntentClassification, TrajectoryStepType::SkillSelection, TrajectoryStepType::ToolExecution, TrajectoryStepType::LlmGeneration, TrajectoryStepType::UserFeedback, ]; for st in all_types { assert_eq!(TrajectoryStepType::from_str_lossy(st.as_str()), st); } } #[tokio::test] async fn test_satisfaction_signal_roundtrip() { let signals = [SatisfactionSignal::Positive, SatisfactionSignal::Negative, SatisfactionSignal::Neutral]; for sig in signals { assert_eq!(SatisfactionSignal::from_str_lossy(sig.as_str()), Some(sig)); } assert_eq!(SatisfactionSignal::from_str_lossy("bogus"), None); } #[tokio::test] async fn test_completion_status_roundtrip() { let statuses = [CompletionStatus::Success, CompletionStatus::Partial, CompletionStatus::Failed, CompletionStatus::Abandoned]; for s in statuses { assert_eq!(CompletionStatus::from_str_lossy(s.as_str()), s); } } #[tokio::test] async fn test_delete_events_older_than() { let store = test_store().await; // Insert an event with a timestamp far in the past let old_event = TrajectoryEvent { id: "old-evt".to_string(), timestamp: Utc::now() - chrono::Duration::days(100), ..sample_event(0) }; store.insert_event(&old_event).await.unwrap(); // Insert a recent event let recent_event = TrajectoryEvent { id: "recent-evt".to_string(), step_index: 1, ..sample_event(0) }; store.insert_event(&recent_event).await.unwrap(); let deleted = store.delete_events_older_than(30).await.unwrap(); assert_eq!(deleted, 1); let remaining = store.get_events_by_session("sess-1").await.unwrap(); assert_eq!(remaining.len(), 1); assert_eq!(remaining[0].id, "recent-evt"); } }