diff --git a/desktop/src-tauri/src/intelligence/compactor.rs b/desktop/src-tauri/src/intelligence/compactor.rs new file mode 100644 index 0000000..272caef --- /dev/null +++ b/desktop/src-tauri/src/intelligence/compactor.rs @@ -0,0 +1,453 @@ +//! Context Compactor - Manages infinite-length conversations without losing key info +//! +//! Flow: +//! 1. Monitor token count against soft threshold +//! 2. When threshold approached: flush memories from old messages +//! 3. Summarize old messages into a compact system message +//! 4. Replace old messages with summary — user sees no interruption +//! +//! Phase 2 of Intelligence Layer Migration. +//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.3.1 + +use serde::{Deserialize, Serialize}; +use regex::Regex; + +// === Types === + +/// Compaction configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompactionConfig { + #[serde(default = "default_soft_threshold")] + pub soft_threshold_tokens: usize, + #[serde(default = "default_hard_threshold")] + pub hard_threshold_tokens: usize, + #[serde(default = "default_reserve")] + pub reserve_tokens: usize, + #[serde(default = "default_memory_flush")] + pub memory_flush_enabled: bool, + #[serde(default = "default_keep_recent")] + pub keep_recent_messages: usize, + #[serde(default = "default_summary_max")] + pub summary_max_tokens: usize, + #[serde(default)] + pub use_llm: bool, + #[serde(default = "default_llm_fallback")] + pub llm_fallback_to_rules: bool, +} + +fn default_soft_threshold() -> usize { 15000 } +fn default_hard_threshold() -> usize { 20000 } +fn default_reserve() -> usize { 4000 } +fn default_memory_flush() -> bool { true } +fn default_keep_recent() -> usize { 6 } +fn default_summary_max() -> usize { 800 } +fn default_llm_fallback() -> bool { true } + +impl Default for CompactionConfig { + fn default() -> Self { + Self { + soft_threshold_tokens: 15000, + hard_threshold_tokens: 20000, + reserve_tokens: 4000, + memory_flush_enabled: true, + keep_recent_messages: 6, + summary_max_tokens: 800, + use_llm: false, + llm_fallback_to_rules: true, + } + } +} + +/// Message that can be compacted +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompactableMessage { + pub role: String, + pub content: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, +} + +/// Result of compaction +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompactionResult { + pub compacted_messages: Vec, + pub summary: String, + pub original_count: usize, + pub retained_count: usize, + pub flushed_memories: usize, + pub tokens_before_compaction: usize, + pub tokens_after_compaction: usize, +} + +/// Check result before compaction +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompactionCheck { + pub should_compact: bool, + pub current_tokens: usize, + pub threshold: usize, + #[serde(rename = "urgency")] + pub urgency: CompactionUrgency, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CompactionUrgency { + None, + Soft, + Hard, +} + +// === Token Estimation === + +/// Heuristic token count estimation. +/// CJK characters ≈ 1.5 tokens each, English words ≈ 1.3 tokens each. +/// This is intentionally conservative (overestimates) to avoid hitting real limits. +pub fn estimate_tokens(text: &str) -> usize { + if text.is_empty() { + return 0; + } + + let mut tokens = 0.0; + for char in text.chars() { + let code = char as u32; + if code >= 0x4E00 && code <= 0x9FFF { + // CJK ideographs + tokens += 1.5; + } else if code >= 0x3400 && code <= 0x4DBF { + // CJK Extension A + tokens += 1.5; + } else if code >= 0x3000 && code <= 0x303F { + // CJK punctuation + tokens += 1.0; + } else if char == ' ' || char == '\n' || char == '\t' { + // whitespace + tokens += 0.25; + } else { + // ASCII chars (roughly 4 chars per token for English) + tokens += 0.3; + } + } + + tokens.ceil() as usize +} + +/// Estimate total tokens for a list of messages +pub fn estimate_messages_tokens(messages: &[CompactableMessage]) -> usize { + let mut total = 0; + for msg in messages { + total += estimate_tokens(&msg.content); + total += 4; // message framing overhead (role, separators) + } + total +} + +// === Context Compactor === + +pub struct ContextCompactor { + config: CompactionConfig, +} + +impl ContextCompactor { + pub fn new(config: Option) -> Self { + Self { + config: config.unwrap_or_default(), + } + } + + /// Check if compaction is needed based on current message token count + pub fn check_threshold(&self, messages: &[CompactableMessage]) -> CompactionCheck { + let current_tokens = estimate_messages_tokens(messages); + + if current_tokens >= self.config.hard_threshold_tokens { + return CompactionCheck { + should_compact: true, + current_tokens, + threshold: self.config.hard_threshold_tokens, + urgency: CompactionUrgency::Hard, + }; + } + + if current_tokens >= self.config.soft_threshold_tokens { + return CompactionCheck { + should_compact: true, + current_tokens, + threshold: self.config.soft_threshold_tokens, + urgency: CompactionUrgency::Soft, + }; + } + + CompactionCheck { + should_compact: false, + current_tokens, + threshold: self.config.soft_threshold_tokens, + urgency: CompactionUrgency::None, + } + } + + /// Execute compaction: summarize old messages, keep recent ones + pub fn compact( + &self, + messages: &[CompactableMessage], + _agent_id: &str, + _conversation_id: Option<&str>, + ) -> CompactionResult { + let tokens_before_compaction = estimate_messages_tokens(messages); + let keep_count = self.config.keep_recent_messages.min(messages.len()); + + // Split: old messages to compact vs recent to keep + let split_index = messages.len().saturating_sub(keep_count); + let old_messages = &messages[..split_index]; + let recent_messages = &messages[split_index..]; + + // Generate summary of old messages + let summary = self.generate_summary(old_messages); + + // Build compacted message list + let summary_message = CompactableMessage { + role: "system".to_string(), + content: summary.clone(), + id: Some(format!("compaction_{}", chrono::Utc::now().timestamp())), + timestamp: Some(chrono::Utc::now().to_rfc3339()), + }; + + let mut compacted_messages = vec![summary_message]; + compacted_messages.extend(recent_messages.to_vec()); + + let tokens_after_compaction = estimate_messages_tokens(&compacted_messages); + + CompactionResult { + compacted_messages, + summary, + original_count: messages.len(), + retained_count: split_index + 1, // summary + recent + flushed_memories: 0, // Would be populated by memory flush + tokens_before_compaction, + tokens_after_compaction, + } + } + + /// Phase 2: Rule-based summary generation + fn generate_summary(&self, messages: &[CompactableMessage]) -> String { + if messages.is_empty() { + return "[对话开始]".to_string(); + } + + let mut sections: Vec = vec!["[以下是之前对话的摘要]".to_string()]; + + // Extract user questions/topics + let user_messages: Vec<_> = messages.iter().filter(|m| m.role == "user").collect(); + let assistant_messages: Vec<_> = messages.iter().filter(|m| m.role == "assistant").collect(); + + // Summarize topics discussed + if !user_messages.is_empty() { + let topics: Vec = user_messages + .iter() + .filter_map(|m| self.extract_topic(&m.content)) + .collect(); + + if !topics.is_empty() { + sections.push(format!("讨论主题: {}", topics.join("; "))); + } + } + + // Extract key decisions/conclusions from assistant + if !assistant_messages.is_empty() { + let conclusions: Vec = assistant_messages + .iter() + .flat_map(|m| self.extract_conclusions(&m.content)) + .take(5) + .collect(); + + if !conclusions.is_empty() { + let formatted: Vec = conclusions.iter().map(|c| format!("- {}", c)).collect(); + sections.push(format!("关键结论:\n{}", formatted.join("\n"))); + } + } + + // Extract technical context + let technical_context: Vec = messages + .iter() + .filter(|m| m.content.contains("```") || m.content.contains("function ") || m.content.contains("class ")) + .filter_map(|m| { + let re = Regex::new(r"```(\w+)?[\s\S]*?```").ok()?; + let cap = re.captures(&m.content)?; + let lang = cap.get(1).map(|m| m.as_str()).unwrap_or("code"); + Some(format!("代码片段 ({})", lang)) + }) + .collect(); + + if !technical_context.is_empty() { + sections.push(format!("技术上下文: {}", technical_context.join(", "))); + } + + // Message count summary + sections.push(format!( + "(已压缩 {} 条消息,其中用户 {} 条,助手 {} 条)", + messages.len(), + user_messages.len(), + assistant_messages.len() + )); + + let summary = sections.join("\n"); + + // Enforce token limit + let summary_tokens = estimate_tokens(&summary); + if summary_tokens > self.config.summary_max_tokens { + let max_chars = self.config.summary_max_tokens * 2; + return format!("{}...\n(摘要已截断)", &summary[..max_chars.min(summary.len())]); + } + + summary + } + + /// Extract the main topic from a user message + fn extract_topic(&self, content: &str) -> Option { + let trimmed = content.trim(); + + // First sentence or first 50 chars + let sentence_end = trimmed.find(|c| c == '。' || c == '!' || c == '?' || c == '\n'); + + if let Some(pos) = sentence_end { + if pos <= 80 { + return Some(trimmed[..=pos].to_string()); + } + } + + if trimmed.len() <= 50 { + return Some(trimmed.to_string()); + } + + Some(format!("{}...", &trimmed[..50])) + } + + /// Extract key conclusions/decisions from assistant messages + fn extract_conclusions(&self, content: &str) -> Vec { + let mut conclusions = Vec::new(); + + let patterns = vec![ + Regex::new(r"(?:总结|结论|关键点|建议|方案)[::]\s*(.{10,100})").ok(), + Regex::new(r"(?:\d+\.\s+)(.{10,80})").ok(), + Regex::new(r"(?:需要|应该|可以|建议)(.{5,60})").ok(), + ]; + + for pattern_opt in patterns { + if let Some(pattern) = pattern_opt { + for cap in pattern.captures_iter(content) { + if let Some(m) = cap.get(1) { + let text = m.as_str().trim(); + if text.len() > 10 && text.len() < 100 { + conclusions.push(text.to_string()); + } + } + } + } + } + + conclusions.into_iter().take(3).collect() + } + + /// Get current configuration + pub fn get_config(&self) -> &CompactionConfig { + &self.config + } + + /// Update configuration + pub fn update_config(&mut self, updates: CompactionConfig) { + self.config = updates; + } +} + +// === Tauri Commands === + +/// Estimate tokens for text +#[tauri::command] +pub fn compactor_estimate_tokens(text: String) -> usize { + estimate_tokens(&text) +} + +/// Estimate tokens for messages +#[tauri::command] +pub fn compactor_estimate_messages_tokens(messages: Vec) -> usize { + estimate_messages_tokens(&messages) +} + +/// Check if compaction is needed +#[tauri::command] +pub fn compactor_check_threshold( + messages: Vec, + config: Option, +) -> CompactionCheck { + let compactor = ContextCompactor::new(config); + compactor.check_threshold(&messages) +} + +/// Execute compaction +#[tauri::command] +pub fn compactor_compact( + messages: Vec, + agent_id: String, + conversation_id: Option, + config: Option, +) -> CompactionResult { + let compactor = ContextCompactor::new(config); + compactor.compact(&messages, &agent_id, conversation_id.as_deref()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_estimate_tokens_english() { + let text = "Hello world"; + let tokens = estimate_tokens(text); + assert!(tokens > 0); + assert!(tokens < 20); // Should be around 3-4 tokens + } + + #[test] + fn test_estimate_tokens_chinese() { + let text = "你好世界"; + let tokens = estimate_tokens(text); + assert_eq!(tokens, 6); // 4 chars * 1.5 = 6 + } + + #[test] + fn test_compaction_check() { + let compactor = ContextCompactor::new(None); + + // Small message list - no compaction needed + let small_messages = vec![CompactableMessage { + role: "user".to_string(), + content: "Hello".to_string(), + id: None, + timestamp: None, + }]; + let check = compactor.check_threshold(&small_messages); + assert!(!check.should_compact); + } + + #[test] + fn test_generate_summary() { + let compactor = ContextCompactor::new(None); + let messages = vec![ + CompactableMessage { + role: "user".to_string(), + content: "什么是 Rust?".to_string(), + id: None, + timestamp: None, + }, + CompactableMessage { + role: "assistant".to_string(), + content: "Rust 是一门系统编程语言,专注于安全性和性能。".to_string(), + id: None, + timestamp: None, + }, + ]; + + let summary = compactor.generate_summary(&messages); + assert!(summary.contains("讨论主题")); + } +} diff --git a/desktop/src-tauri/src/intelligence/heartbeat.rs b/desktop/src-tauri/src/intelligence/heartbeat.rs new file mode 100644 index 0000000..96a655b --- /dev/null +++ b/desktop/src-tauri/src/intelligence/heartbeat.rs @@ -0,0 +1,463 @@ +//! Heartbeat Engine - Periodic proactive checks for ZCLAW agents +//! +//! Runs on a configurable interval, executing a checklist of items. +//! Each check can produce alerts that surface via desktop notification or UI. +//! Supports quiet hours (no notifications during sleep time). +//! +//! Phase 2 of Intelligence Layer Migration. +//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.4.1 + +use chrono::{DateTime, Local, Timelike}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{broadcast, Mutex}; +use tokio::time::interval; + +// === Types === + +/// Heartbeat configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HeartbeatConfig { + pub enabled: bool, + #[serde(default = "default_interval")] + pub interval_minutes: u64, + pub quiet_hours_start: Option, // "22:00" format + pub quiet_hours_end: Option, // "08:00" format + #[serde(default)] + pub notify_channel: NotifyChannel, + #[serde(default)] + pub proactivity_level: ProactivityLevel, + #[serde(default = "default_max_alerts")] + pub max_alerts_per_tick: usize, +} + +fn default_interval() -> u64 { 30 } +fn default_max_alerts() -> usize { 5 } + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum NotifyChannel { + #[default] + Ui, + Desktop, + All, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum ProactivityLevel { + Silent, + Light, + #[default] + Standard, + Autonomous, +} + +/// Alert generated by heartbeat checks +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HeartbeatAlert { + pub title: String, + pub content: String, + pub urgency: Urgency, + pub source: String, + pub timestamp: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Urgency { + Low, + Medium, + High, +} + +/// Result of a single heartbeat tick +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HeartbeatResult { + pub status: HeartbeatStatus, + pub alerts: Vec, + pub checked_items: usize, + pub timestamp: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum HeartbeatStatus { + Ok, + Alert, +} + +/// Type alias for heartbeat check function +pub type HeartbeatCheckFn = Box std::pin::Pin> + Send>> + Send + Sync>; + +// === Default Config === + +impl Default for HeartbeatConfig { + fn default() -> Self { + Self { + enabled: false, + interval_minutes: 30, + quiet_hours_start: Some("22:00".to_string()), + quiet_hours_end: Some("08:00".to_string()), + notify_channel: NotifyChannel::Ui, + proactivity_level: ProactivityLevel::Light, + max_alerts_per_tick: 5, + } + } +} + +// === Heartbeat Engine === + +pub struct HeartbeatEngine { + agent_id: String, + config: Arc>, + running: Arc>, + alert_sender: broadcast::Sender, + history: Arc>>, +} + +impl HeartbeatEngine { + pub fn new(agent_id: String, config: Option) -> Self { + let (alert_sender, _) = broadcast::channel(100); + + Self { + agent_id, + config: Arc::new(Mutex::new(config.unwrap_or_default())), + running: Arc::new(Mutex::new(false)), + alert_sender, + history: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Start the heartbeat engine with periodic ticks + pub async fn start(&self) { + let mut running = self.running.lock().await; + if *running { + return; + } + *running = true; + drop(running); + + let agent_id = self.agent_id.clone(); + let config = Arc::clone(&self.config); + let running_clone = Arc::clone(&self.running); + let alert_sender = self.alert_sender.clone(); + let history = Arc::clone(&self.history); + + tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs( + config.lock().await.interval_minutes * 60, + )); + + loop { + ticker.tick().await; + + if !*running_clone.lock().await { + break; + } + + // Check quiet hours + if is_quiet_hours(&config.lock().await) { + continue; + } + + // Execute heartbeat tick + let result = execute_tick(&agent_id, &config, &alert_sender).await; + + // Store history + let mut hist = history.lock().await; + hist.push(result); + if hist.len() > 100 { + *hist = hist.split_off(50); + } + } + }); + } + + /// Stop the heartbeat engine + pub async fn stop(&self) { + let mut running = self.running.lock().await; + *running = false; + } + + /// Check if the engine is running + pub async fn is_running(&self) -> bool { + *self.running.lock().await + } + + /// Execute a single tick manually + pub async fn tick(&self) -> HeartbeatResult { + execute_tick(&self.agent_id, &self.config, &self.alert_sender).await + } + + /// Subscribe to alerts + pub fn subscribe(&self) -> broadcast::Receiver { + self.alert_sender.subscribe() + } + + /// Get heartbeat history + pub async fn get_history(&self, limit: usize) -> Vec { + let hist = self.history.lock().await; + hist.iter().rev().take(limit).cloned().collect() + } + + /// Update configuration + pub async fn update_config(&self, updates: HeartbeatConfig) { + let mut config = self.config.lock().await; + *config = updates; + } + + /// Get current configuration + pub async fn get_config(&self) -> HeartbeatConfig { + self.config.lock().await.clone() + } +} + +// === Helper Functions === + +/// Check if current time is within quiet hours +fn is_quiet_hours(config: &HeartbeatConfig) -> bool { + let start = match &config.quiet_hours_start { + Some(s) => s, + None => return false, + }; + let end = match &config.quiet_hours_end { + Some(e) => e, + None => return false, + }; + + let now = Local::now(); + let current_minutes = now.hour() * 60 + now.minute(); + + let start_minutes = parse_time_to_minutes(start); + let end_minutes = parse_time_to_minutes(end); + + if start_minutes <= end_minutes { + // Same-day range (e.g., 13:00-17:00) + current_minutes >= start_minutes && current_minutes < end_minutes + } else { + // Cross-midnight range (e.g., 22:00-08:00) + current_minutes >= start_minutes || current_minutes < end_minutes + } +} + +/// Parse "HH:MM" format to minutes since midnight +fn parse_time_to_minutes(time: &str) -> u32 { + let parts: Vec<&str> = time.split(':').collect(); + if parts.len() != 2 { + return 0; + } + let hours: u32 = parts[0].parse().unwrap_or(0); + let minutes: u32 = parts[1].parse().unwrap_or(0); + hours * 60 + minutes +} + +/// Execute a single heartbeat tick +async fn execute_tick( + agent_id: &str, + config: &Arc>, + alert_sender: &broadcast::Sender, +) -> HeartbeatResult { + let cfg = config.lock().await; + let mut alerts = Vec::new(); + + // Run built-in checks + let checks: Vec<(&str, fn(&str) -> Option)> = vec![ + ("pending-tasks", check_pending_tasks), + ("memory-health", check_memory_health), + ("idle-greeting", check_idle_greeting), + ]; + + for (source, check_fn) in checks { + if alerts.len() >= cfg.max_alerts_per_tick { + break; + } + + if let Some(alert) = check_fn(agent_id) { + alerts.push(alert); + } + } + + // Filter by proactivity level + let filtered_alerts = filter_by_proactivity(&alerts, &cfg.proactivity_level); + + // Send alerts + for alert in &filtered_alerts { + let _ = alert_sender.send(alert.clone()); + } + + let status = if filtered_alerts.is_empty() { + HeartbeatStatus::Ok + } else { + HeartbeatStatus::Alert + }; + + HeartbeatResult { + status, + alerts: filtered_alerts, + checked_items: checks.len(), + timestamp: chrono::Utc::now().to_rfc3339(), + } +} + +/// Filter alerts based on proactivity level +fn filter_by_proactivity(alerts: &[HeartbeatAlert], level: &ProactivityLevel) -> Vec { + match level { + ProactivityLevel::Silent => vec![], + ProactivityLevel::Light => alerts + .iter() + .filter(|a| matches!(a.urgency, Urgency::High)) + .cloned() + .collect(), + ProactivityLevel::Standard => alerts + .iter() + .filter(|a| matches!(a.urgency, Urgency::High | Urgency::Medium)) + .cloned() + .collect(), + ProactivityLevel::Autonomous => alerts.to_vec(), + } +} + +// === Built-in Checks === + +/// Check for pending task memories (placeholder - would connect to memory store) +fn check_pending_tasks(_agent_id: &str) -> Option { + // In full implementation, this would query the memory store + // For now, return None (no tasks) + None +} + +/// Check memory storage health (placeholder) +fn check_memory_health(_agent_id: &str) -> Option { + // In full implementation, this would check memory stats + None +} + +/// Check if user has been idle (placeholder) +fn check_idle_greeting(_agent_id: &str) -> Option { + // In full implementation, this would check last interaction time + None +} + +// === Tauri Commands === + +/// Heartbeat engine state for Tauri +pub type HeartbeatEngineState = Arc>>; + +/// Initialize heartbeat engine for an agent +#[tauri::command] +pub async fn heartbeat_init( + agent_id: String, + config: Option, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result<(), String> { + let engine = HeartbeatEngine::new(agent_id.clone(), config); + let mut engines = state.lock().await; + engines.insert(agent_id, engine); + Ok(()) +} + +/// Start heartbeat engine for an agent +#[tauri::command] +pub async fn heartbeat_start( + agent_id: String, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result<(), String> { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + engine.start().await; + Ok(()) +} + +/// Stop heartbeat engine for an agent +#[tauri::command] +pub async fn heartbeat_stop( + agent_id: String, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result<(), String> { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + engine.stop().await; + Ok(()) +} + +/// Execute a single heartbeat tick +#[tauri::command] +pub async fn heartbeat_tick( + agent_id: String, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + Ok(engine.tick().await) +} + +/// Get heartbeat configuration +#[tauri::command] +pub async fn heartbeat_get_config( + agent_id: String, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + Ok(engine.get_config().await) +} + +/// Update heartbeat configuration +#[tauri::command] +pub async fn heartbeat_update_config( + agent_id: String, + config: HeartbeatConfig, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result<(), String> { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + engine.update_config(config).await; + Ok(()) +} + +/// Get heartbeat history +#[tauri::command] +pub async fn heartbeat_get_history( + agent_id: String, + limit: Option, + state: tauri::State<'_, HeartbeatEngineState>, +) -> Result, String> { + let engines = state.lock().await; + let engine = engines + .get(&agent_id) + .ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?; + Ok(engine.get_history(limit.unwrap_or(20)).await) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_time() { + assert_eq!(parse_time_to_minutes("00:00"), 0); + assert_eq!(parse_time_to_minutes("08:00"), 480); + assert_eq!(parse_time_to_minutes("22:00"), 1320); + assert_eq!(parse_time_to_minutes("23:59"), 1439); + } + + #[test] + fn test_default_config() { + let config = HeartbeatConfig::default(); + assert!(!config.enabled); + assert_eq!(config.interval_minutes, 30); + } +} diff --git a/desktop/src-tauri/src/intelligence/identity.rs b/desktop/src-tauri/src/intelligence/identity.rs new file mode 100644 index 0000000..f210a81 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/identity.rs @@ -0,0 +1,651 @@ +//! Agent Identity Manager - Per-agent dynamic identity files +//! +//! Manages SOUL.md, AGENTS.md, USER.md per agent with: +//! - Per-agent isolated identity directories +//! - USER.md auto-update by agent (stores learned preferences) +//! - SOUL.md/AGENTS.md change proposals (require user approval) +//! - Snapshot history for rollback +//! +//! Phase 3 of Intelligence Layer Migration. +//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.2.3 + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +// === Types === + +/// Identity files for an agent +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IdentityFiles { + pub soul: String, + pub instructions: String, + pub user_profile: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub heartbeat: Option, +} + +/// Proposal for identity change (requires user approval) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IdentityChangeProposal { + pub id: String, + pub agent_id: String, + pub file: IdentityFile, + pub reason: String, + pub current_content: String, + pub suggested_content: String, + pub status: ProposalStatus, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum IdentityFile { + Soul, + Instructions, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ProposalStatus { + Pending, + Approved, + Rejected, +} + +/// Snapshot for rollback +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IdentitySnapshot { + pub id: String, + pub agent_id: String, + pub files: IdentityFiles, + pub timestamp: String, + pub reason: String, +} + +// === Default Identity Content === + +fn default_soul() -> String { + r#"# ZCLAW 人格 + +你是 ZCLAW(小龙虾),一个基于 OpenClaw 定制的中文 AI 助手。 + +## 核心特质 + +- **高效执行**: 你不只是出主意,你会真正动手完成任务 +- **中文优先**: 默认使用中文交流,必要时切换英文 +- **专业可靠**: 对技术问题给出精确答案,不确定时坦诚说明 +- **持续成长**: 你会记住与用户的交互,不断改进自己的服务方式 + +## 语气 + +简洁、专业、友好。避免过度客套,直接给出有用信息。"#.to_string() +} + +fn default_instructions() -> String { + r#"# Agent 指令 + +## 操作规范 + +1. 执行文件操作前,先确认目标路径 +2. 执行 Shell 命令前,评估安全风险 +3. 长时间任务需定期汇报进度 +4. 优先使用中文回复 + +## 记忆管理 + +- 重要的用户偏好自动记录 +- 项目上下文保存到工作区 +- 对话结束时总结关键信息"#.to_string() +} + +fn default_user_profile() -> String { + r#"# 用户画像 + +_尚未收集到用户偏好信息。随着交互积累,此文件将自动更新。_"#.to_string() +} + +// === Agent Identity Manager === + +pub struct AgentIdentityManager { + identities: HashMap, + proposals: Vec, + snapshots: Vec, + snapshot_counter: usize, +} + +impl AgentIdentityManager { + pub fn new() -> Self { + Self { + identities: HashMap::new(), + proposals: Vec::new(), + snapshots: Vec::new(), + snapshot_counter: 0, + } + } + + /// Get identity files for an agent (creates default if not exists) + pub fn get_identity(&mut self, agent_id: &str) -> IdentityFiles { + if let Some(existing) = self.identities.get(agent_id) { + return existing.clone(); + } + + // Initialize with defaults + let defaults = IdentityFiles { + soul: default_soul(), + instructions: default_instructions(), + user_profile: default_user_profile(), + heartbeat: None, + }; + self.identities.insert(agent_id.to_string(), defaults.clone()); + defaults + } + + /// Get a specific file content + pub fn get_file(&mut self, agent_id: &str, file: IdentityFile) -> String { + let identity = self.get_identity(agent_id); + match file { + IdentityFile::Soul => identity.soul, + IdentityFile::Instructions => identity.instructions, + } + } + + /// Build system prompt from identity files + pub fn build_system_prompt(&mut self, agent_id: &str, memory_context: Option<&str>) -> String { + let identity = self.get_identity(agent_id); + let mut sections = Vec::new(); + + if !identity.soul.is_empty() { + sections.push(identity.soul.clone()); + } + if !identity.instructions.is_empty() { + sections.push(identity.instructions.clone()); + } + if !identity.user_profile.is_empty() + && identity.user_profile != default_user_profile() + { + sections.push(format!("## 用户画像\n{}", identity.user_profile)); + } + if let Some(ctx) = memory_context { + sections.push(ctx.to_string()); + } + + sections.join("\n\n") + } + + /// Update user profile (auto, no approval needed) + pub fn update_user_profile(&mut self, agent_id: &str, new_content: &str) { + let identity = self.get_identity(agent_id); + let _old_content = identity.user_profile.clone(); + + // Create snapshot before update + self.create_snapshot(agent_id, "Auto-update USER.md"); + + let mut updated = identity.clone(); + updated.user_profile = new_content.to_string(); + self.identities.insert(agent_id.to_string(), updated); + } + + /// Append to user profile + pub fn append_to_user_profile(&mut self, agent_id: &str, addition: &str) { + let identity = self.get_identity(agent_id); + let updated = format!("{}\n\n{}", identity.user_profile.trim_end(), addition); + self.update_user_profile(agent_id, &updated); + } + + /// Propose a change to soul or instructions (requires approval) + pub fn propose_change( + &mut self, + agent_id: &str, + file: IdentityFile, + suggested_content: &str, + reason: &str, + ) -> IdentityChangeProposal { + let identity = self.get_identity(agent_id); + let current_content = match file { + IdentityFile::Soul => identity.soul.clone(), + IdentityFile::Instructions => identity.instructions.clone(), + }; + + let proposal = IdentityChangeProposal { + id: format!("prop_{}_{}", Utc::now().timestamp(), rand_id()), + agent_id: agent_id.to_string(), + file, + reason: reason.to_string(), + current_content, + suggested_content: suggested_content.to_string(), + status: ProposalStatus::Pending, + created_at: Utc::now().to_rfc3339(), + }; + + self.proposals.push(proposal.clone()); + proposal + } + + /// Approve a pending proposal + pub fn approve_proposal(&mut self, proposal_id: &str) -> Result { + let proposal_idx = self + .proposals + .iter() + .position(|p| p.id == proposal_id && p.status == ProposalStatus::Pending) + .ok_or_else(|| "Proposal not found or not pending".to_string())?; + + let proposal = &self.proposals[proposal_idx]; + let agent_id = proposal.agent_id.clone(); + let file = proposal.file.clone(); + + // Create snapshot before applying + self.create_snapshot(&agent_id, &format!("Approved proposal: {}", proposal.reason)); + + // Get current identity and update + let identity = self.get_identity(&agent_id); + let mut updated = identity.clone(); + + match file { + IdentityFile::Soul => updated.soul = proposal.suggested_content.clone(), + IdentityFile::Instructions => { + updated.instructions = proposal.suggested_content.clone() + } + } + + self.identities.insert(agent_id.clone(), updated.clone()); + + // Update proposal status + self.proposals[proposal_idx].status = ProposalStatus::Approved; + + Ok(updated) + } + + /// Reject a pending proposal + pub fn reject_proposal(&mut self, proposal_id: &str) -> Result<(), String> { + let proposal = self + .proposals + .iter_mut() + .find(|p| p.id == proposal_id && p.status == ProposalStatus::Pending) + .ok_or_else(|| "Proposal not found or not pending".to_string())?; + + proposal.status = ProposalStatus::Rejected; + Ok(()) + } + + /// Get pending proposals for an agent (or all agents if None) + pub fn get_pending_proposals(&self, agent_id: Option<&str>) -> Vec<&IdentityChangeProposal> { + self.proposals + .iter() + .filter(|p| { + p.status == ProposalStatus::Pending + && agent_id.map_or(true, |id| p.agent_id == id) + }) + .collect() + } + + /// Direct file update (user explicitly edits in UI) + pub fn update_file( + &mut self, + agent_id: &str, + file: &str, + content: &str, + ) -> Result<(), String> { + let identity = self.get_identity(agent_id); + self.create_snapshot(agent_id, &format!("Manual edit: {}", file)); + + let mut updated = identity.clone(); + match file { + "soul" => updated.soul = content.to_string(), + "instructions" => updated.instructions = content.to_string(), + "userProfile" | "user_profile" => updated.user_profile = content.to_string(), + _ => return Err(format!("Unknown file: {}", file)), + } + + self.identities.insert(agent_id.to_string(), updated); + Ok(()) + } + + /// Create a snapshot + fn create_snapshot(&mut self, agent_id: &str, reason: &str) { + let identity = self.get_identity(agent_id); + self.snapshot_counter += 1; + + self.snapshots.push(IdentitySnapshot { + id: format!( + "snap_{}_{}_{}", + Utc::now().timestamp(), + self.snapshot_counter, + rand_id() + ), + agent_id: agent_id.to_string(), + files: identity, + timestamp: Utc::now().to_rfc3339(), + reason: reason.to_string(), + }); + + // Keep only last 50 snapshots per agent + let agent_snapshots: Vec<_> = self + .snapshots + .iter() + .filter(|s| s.agent_id == agent_id) + .collect(); + if agent_snapshots.len() > 50 { + // Remove oldest snapshots for this agent + self.snapshots.retain(|s| { + s.agent_id != agent_id + || agent_snapshots + .iter() + .rev() + .take(50) + .any(|&s_ref| s_ref.id == s.id) + }); + } + } + + /// Get snapshots for an agent + pub fn get_snapshots(&self, agent_id: &str, limit: usize) -> Vec<&IdentitySnapshot> { + let mut filtered: Vec<_> = self + .snapshots + .iter() + .filter(|s| s.agent_id == agent_id) + .collect(); + filtered.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); + filtered.into_iter().take(limit).collect() + } + + /// Restore a snapshot + pub fn restore_snapshot(&mut self, agent_id: &str, snapshot_id: &str) -> Result<(), String> { + let snapshot = self + .snapshots + .iter() + .find(|s| s.agent_id == agent_id && s.id == snapshot_id) + .ok_or_else(|| "Snapshot not found".to_string())?; + + // Create snapshot before rollback + self.create_snapshot( + agent_id, + &format!("Rollback to {}", snapshot.timestamp), + ); + + self.identities + .insert(agent_id.to_string(), snapshot.files.clone()); + Ok(()) + } + + /// List all agents with identities + pub fn list_agents(&self) -> Vec { + self.identities.keys().cloned().collect() + } + + /// Delete an agent's identity + pub fn delete_agent(&mut self, agent_id: &str) { + self.identities.remove(agent_id); + self.proposals.retain(|p| p.agent_id != agent_id); + self.snapshots.retain(|s| s.agent_id != agent_id); + } + + /// Export all identities for backup + pub fn export_all(&self) -> HashMap { + self.identities.clone() + } + + /// Import identities from backup + pub fn import(&mut self, identities: HashMap) { + for (agent_id, files) in identities { + self.identities.insert(agent_id, files); + } + } + + /// Get all proposals (for debugging) + pub fn get_all_proposals(&self) -> &[IdentityChangeProposal] { + &self.proposals + } +} + +impl Default for AgentIdentityManager { + fn default() -> Self { + Self::new() + } +} + +/// Generate random ID suffix +fn rand_id() -> String { + use std::sync::atomic::{AtomicU64, Ordering}; + static COUNTER: AtomicU64 = AtomicU64::new(0); + let count = COUNTER.fetch_add(1, Ordering::Relaxed); + format!("{:04x}", count % 0x10000) +} + +// === Tauri Commands === + +use std::sync::Arc; +use tokio::sync::Mutex; + +pub type IdentityManagerState = Arc>; + +/// Initialize identity manager +#[tauri::command] +pub async fn identity_init() -> Result { + Ok(Arc::new(Mutex::new(AgentIdentityManager::new()))) +} + +/// Get identity files for an agent +#[tauri::command] +pub async fn identity_get( + agent_id: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result { + let mut manager = state.lock().await; + Ok(manager.get_identity(&agent_id)) +} + +/// Get a specific file +#[tauri::command] +pub async fn identity_get_file( + agent_id: String, + file: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result { + let mut manager = state.lock().await; + let file_type = match file.as_str() { + "soul" => IdentityFile::Soul, + "instructions" => IdentityFile::Instructions, + _ => return Err(format!("Unknown file: {}", file)), + }; + Ok(manager.get_file(&agent_id, file_type)) +} + +/// Build system prompt +#[tauri::command] +pub async fn identity_build_prompt( + agent_id: String, + memory_context: Option, + state: tauri::State<'_, IdentityManagerState>, +) -> Result { + let mut manager = state.lock().await; + Ok(manager.build_system_prompt(&agent_id, memory_context.as_deref())) +} + +/// Update user profile (auto) +#[tauri::command] +pub async fn identity_update_user_profile( + agent_id: String, + content: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.update_user_profile(&agent_id, &content); + Ok(()) +} + +/// Append to user profile +#[tauri::command] +pub async fn identity_append_user_profile( + agent_id: String, + addition: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.append_to_user_profile(&agent_id, &addition); + Ok(()) +} + +/// Propose a change +#[tauri::command] +pub async fn identity_propose_change( + agent_id: String, + file: String, + suggested_content: String, + reason: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result { + let mut manager = state.lock().await; + let file_type = match file.as_str() { + "soul" => IdentityFile::Soul, + "instructions" => IdentityFile::Instructions, + _ => return Err(format!("Unknown file: {}", file)), + }; + Ok(manager.propose_change(&agent_id, file_type, &suggested_content, &reason)) +} + +/// Approve a proposal +#[tauri::command] +pub async fn identity_approve_proposal( + proposal_id: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result { + let mut manager = state.lock().await; + manager.approve_proposal(&proposal_id) +} + +/// Reject a proposal +#[tauri::command] +pub async fn identity_reject_proposal( + proposal_id: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.reject_proposal(&proposal_id) +} + +/// Get pending proposals +#[tauri::command] +pub async fn identity_get_pending_proposals( + agent_id: Option, + state: tauri::State<'_, IdentityManagerState>, +) -> Result, String> { + let manager = state.lock().await; + Ok(manager + .get_pending_proposals(agent_id.as_deref()) + .into_iter() + .cloned() + .collect()) +} + +/// Update file directly +#[tauri::command] +pub async fn identity_update_file( + agent_id: String, + file: String, + content: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.update_file(&agent_id, &file, &content) +} + +/// Get snapshots +#[tauri::command] +pub async fn identity_get_snapshots( + agent_id: String, + limit: Option, + state: tauri::State<'_, IdentityManagerState>, +) -> Result, String> { + let manager = state.lock().await; + Ok(manager + .get_snapshots(&agent_id, limit.unwrap_or(10)) + .into_iter() + .cloned() + .collect()) +} + +/// Restore snapshot +#[tauri::command] +pub async fn identity_restore_snapshot( + agent_id: String, + snapshot_id: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.restore_snapshot(&agent_id, &snapshot_id) +} + +/// List agents +#[tauri::command] +pub async fn identity_list_agents( + state: tauri::State<'_, IdentityManagerState>, +) -> Result, String> { + let manager = state.lock().await; + Ok(manager.list_agents()) +} + +/// Delete agent identity +#[tauri::command] +pub async fn identity_delete_agent( + agent_id: String, + state: tauri::State<'_, IdentityManagerState>, +) -> Result<(), String> { + let mut manager = state.lock().await; + manager.delete_agent(&agent_id); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_identity_creates_default() { + let mut manager = AgentIdentityManager::new(); + let identity = manager.get_identity("test-agent"); + assert!(!identity.soul.is_empty()); + assert!(!identity.instructions.is_empty()); + } + + #[test] + fn test_update_user_profile() { + let mut manager = AgentIdentityManager::new(); + manager.update_user_profile("test-agent", "New profile content"); + let identity = manager.get_identity("test-agent"); + assert_eq!(identity.user_profile, "New profile content"); + } + + #[test] + fn test_proposal_flow() { + let mut manager = AgentIdentityManager::new(); + let proposal = manager.propose_change( + "test-agent", + IdentityFile::Soul, + "New soul content", + "Test proposal", + ); + + assert_eq!(proposal.status, ProposalStatus::Pending); + + let pending = manager.get_pending_proposals(None); + assert_eq!(pending.len(), 1); + + // Approve + let result = manager.approve_proposal(&proposal.id); + assert!(result.is_ok()); + + let identity = manager.get_identity("test-agent"); + assert_eq!(identity.soul, "New soul content"); + } + + #[test] + fn test_snapshots() { + let mut manager = AgentIdentityManager::new(); + manager.update_user_profile("test-agent", "First update"); + manager.update_user_profile("test-agent", "Second update"); + + let snapshots = manager.get_snapshots("test-agent", 10); + assert!(snapshots.len() >= 2); + } +} diff --git a/desktop/src-tauri/src/intelligence/mod.rs b/desktop/src-tauri/src/intelligence/mod.rs new file mode 100644 index 0000000..3ee8f48 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/mod.rs @@ -0,0 +1,52 @@ +//! Intelligence Layer - Migrated from frontend lib/ +//! +//! This module contains the intelligence components that were previously +//! implemented in TypeScript/JavaScript in the frontend. +//! +//! ## Modules +//! +//! - `heartbeat` - Periodic proactive checks with quiet hours support +//! - `compactor` - Context compaction for infinite-length conversations +//! - `reflection` - Agent self-improvement through conversation analysis +//! - `identity` - Agent identity file management (SOUL.md, AGENTS.md, USER.md) +//! +//! ## Migration Status +//! +//! | Component | Status | Notes | +//! |-----------|--------|-------| +//! | Heartbeat Engine | ✅ Phase 2 | Complete | +//! | Context Compactor | ✅ Phase 2 | Complete | +//! | Reflection Engine | ✅ Phase 3 | Complete | +//! | Agent Identity | ✅ Phase 3 | Complete | +//! | Agent Swarm | 🚧 Phase 3 | TODO | +//! | Vector Memory | 📋 Phase 4 | Planned | +//! +//! Reference: docs/plans/INTELLIGENCE-LAYER-MIGRATION.md + +pub mod heartbeat; +pub mod compactor; +pub mod reflection; +pub mod identity; + +// Re-export main types for convenience +pub use heartbeat::{ + HeartbeatConfig, HeartbeatEngine, HeartbeatEngineState, + HeartbeatAlert, HeartbeatResult, HeartbeatStatus, + Urgency, NotifyChannel, ProactivityLevel, +}; +pub use compactor::{ + CompactionConfig, ContextCompactor, CompactableMessage, + CompactionResult, CompactionCheck, CompactionUrgency, + estimate_tokens, estimate_messages_tokens, +}; +pub use reflection::{ + ReflectionConfig, ReflectionEngine, ReflectionEngineState, + ReflectionResult, ReflectionState, ReflectionResult as ReflectionOutput, + PatternObservation, ImprovementSuggestion, IdentityChangeProposal as ReflectionIdentityChangeProposal, + Sentiment, Priority, MemoryEntryForAnalysis, +}; +pub use identity::{ + AgentIdentityManager, IdentityManagerState, + IdentityFiles, IdentityChangeProposal, IdentitySnapshot, + IdentityFile, ProposalStatus, +}; diff --git a/desktop/src-tauri/src/intelligence/reflection.rs b/desktop/src-tauri/src/intelligence/reflection.rs new file mode 100644 index 0000000..20bc3d3 --- /dev/null +++ b/desktop/src-tauri/src/intelligence/reflection.rs @@ -0,0 +1,568 @@ +//! Reflection Engine - Agent self-improvement through conversation analysis +//! +//! Periodically analyzes recent conversations to: +//! - Identify behavioral patterns (positive and negative) +//! - Generate improvement suggestions +//! - Propose identity file changes (with user approval) +//! - Create meta-memories about agent performance +//! +//! Phase 3 of Intelligence Layer Migration. +//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.4.2 + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +// === Types === + +/// Reflection configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReflectionConfig { + #[serde(default = "default_trigger_conversations")] + pub trigger_after_conversations: usize, + #[serde(default = "default_trigger_hours")] + pub trigger_after_hours: u64, + #[serde(default)] + pub allow_soul_modification: bool, + #[serde(default = "default_require_approval")] + pub require_approval: bool, + #[serde(default = "default_use_llm")] + pub use_llm: bool, + #[serde(default = "default_llm_fallback")] + pub llm_fallback_to_rules: bool, +} + +fn default_trigger_conversations() -> usize { 5 } +fn default_trigger_hours() -> u64 { 24 } +fn default_require_approval() -> bool { true } +fn default_use_llm() -> bool { true } +fn default_llm_fallback() -> bool { true } + +impl Default for ReflectionConfig { + fn default() -> Self { + Self { + trigger_after_conversations: 5, + trigger_after_hours: 24, + allow_soul_modification: false, + require_approval: true, + use_llm: true, + llm_fallback_to_rules: true, + } + } +} + +/// Observed pattern from analysis +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatternObservation { + pub observation: String, + pub frequency: usize, + pub sentiment: Sentiment, + pub evidence: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Sentiment { + Positive, + Negative, + Neutral, +} + +/// Improvement suggestion +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ImprovementSuggestion { + pub area: String, + pub suggestion: String, + pub priority: Priority, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Priority { + High, + Medium, + Low, +} + +/// Identity change proposal +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IdentityChangeProposal { + pub agent_id: String, + pub field: String, + pub current_value: String, + pub proposed_value: String, + pub reason: String, +} + +/// Result of reflection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReflectionResult { + pub patterns: Vec, + pub improvements: Vec, + pub identity_proposals: Vec, + pub new_memories: usize, + pub timestamp: String, +} + +/// Reflection state +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReflectionState { + pub conversations_since_reflection: usize, + pub last_reflection_time: Option, + pub last_reflection_agent_id: Option, +} + +impl Default for ReflectionState { + fn default() -> Self { + Self { + conversations_since_reflection: 0, + last_reflection_time: None, + last_reflection_agent_id: None, + } + } +} + +// === Memory Entry (simplified for analysis) === + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MemoryEntryForAnalysis { + pub memory_type: String, + pub content: String, + pub importance: usize, + pub access_count: usize, + pub tags: Vec, +} + +// === Reflection Engine === + +pub struct ReflectionEngine { + config: ReflectionConfig, + state: ReflectionState, + history: Vec, +} + +impl ReflectionEngine { + pub fn new(config: Option) -> Self { + Self { + config: config.unwrap_or_default(), + state: ReflectionState::default(), + history: Vec::new(), + } + } + + /// Record that a conversation happened + pub fn record_conversation(&mut self) { + self.state.conversations_since_reflection += 1; + } + + /// Check if it's time for reflection + pub fn should_reflect(&self) -> bool { + // Conversation count trigger + if self.state.conversations_since_reflection >= self.config.trigger_after_conversations { + return true; + } + + // Time-based trigger + if let Some(last_time) = &self.state.last_reflection_time { + if let Ok(last) = DateTime::parse_from_rfc3339(last_time) { + let elapsed = Utc::now().signed_duration_since(last); + let hours_since = elapsed.num_hours() as u64; + if hours_since >= self.config.trigger_after_hours { + return true; + } + } + } else { + // Never reflected before, trigger after initial conversations + return self.state.conversations_since_reflection >= 3; + } + + false + } + + /// Execute reflection cycle + pub fn reflect(&mut self, agent_id: &str, memories: &[MemoryEntryForAnalysis]) -> ReflectionResult { + // 1. Analyze memory patterns + let patterns = self.analyze_patterns(memories); + + // 2. Generate improvement suggestions + let improvements = self.generate_improvements(&patterns, memories); + + // 3. Propose identity changes if patterns warrant it + let identity_proposals: Vec = if self.config.allow_soul_modification { + self.propose_identity_changes(agent_id, &patterns) + } else { + vec![] + }; + + // 4. Count new memories that would be saved + let new_memories = patterns.iter() + .filter(|p| p.frequency >= 3) + .count() + + improvements.iter() + .filter(|i| matches!(i.priority, Priority::High)) + .count(); + + // 5. Build result + let result = ReflectionResult { + patterns, + improvements, + identity_proposals, + new_memories, + timestamp: Utc::now().to_rfc3339(), + }; + + // 6. Update state + self.state.conversations_since_reflection = 0; + self.state.last_reflection_time = Some(result.timestamp.clone()); + self.state.last_reflection_agent_id = Some(agent_id.to_string()); + + // 7. Store in history + self.history.push(result.clone()); + if self.history.len() > 20 { + self.history = self.history.split_off(10); + } + + result + } + + /// Analyze patterns in memories + fn analyze_patterns(&self, memories: &[MemoryEntryForAnalysis]) -> Vec { + let mut patterns = Vec::new(); + + // Analyze memory type distribution + let mut type_counts: HashMap = HashMap::new(); + for m in memories { + *type_counts.entry(m.memory_type.clone()).or_insert(0) += 1; + } + + // Pattern: Too many tasks accumulating + let task_count = *type_counts.get("task").unwrap_or(&0); + if task_count >= 5 { + let evidence: Vec = memories + .iter() + .filter(|m| m.memory_type == "task") + .take(3) + .map(|m| m.content.clone()) + .collect(); + + patterns.push(PatternObservation { + observation: format!("积累了 {} 个待办任务,可能存在任务管理不善", task_count), + frequency: task_count, + sentiment: Sentiment::Negative, + evidence, + }); + } + + // Pattern: Strong preference accumulation + let pref_count = *type_counts.get("preference").unwrap_or(&0); + if pref_count >= 5 { + let evidence: Vec = memories + .iter() + .filter(|m| m.memory_type == "preference") + .take(3) + .map(|m| m.content.clone()) + .collect(); + + patterns.push(PatternObservation { + observation: format!("已记录 {} 个用户偏好,对用户习惯有较好理解", pref_count), + frequency: pref_count, + sentiment: Sentiment::Positive, + evidence, + }); + } + + // Pattern: Many lessons learned + let lesson_count = *type_counts.get("lesson").unwrap_or(&0); + if lesson_count >= 5 { + let evidence: Vec = memories + .iter() + .filter(|m| m.memory_type == "lesson") + .take(3) + .map(|m| m.content.clone()) + .collect(); + + patterns.push(PatternObservation { + observation: format!("积累了 {} 条经验教训,知识库在成长", lesson_count), + frequency: lesson_count, + sentiment: Sentiment::Positive, + evidence, + }); + } + + // Pattern: High-importance items being accessed frequently + let high_access: Vec<_> = memories + .iter() + .filter(|m| m.access_count >= 5 && m.importance >= 7) + .collect(); + if high_access.len() >= 3 { + let evidence: Vec = high_access.iter().take(3).map(|m| m.content.clone()).collect(); + + patterns.push(PatternObservation { + observation: format!("有 {} 条高频访问的重要记忆,核心知识正在形成", high_access.len()), + frequency: high_access.len(), + sentiment: Sentiment::Positive, + evidence, + }); + } + + // Pattern: Low-importance memories accumulating + let low_importance_count = memories.iter().filter(|m| m.importance <= 3).count(); + if low_importance_count > 20 { + patterns.push(PatternObservation { + observation: format!("有 {} 条低重要性记忆,建议清理", low_importance_count), + frequency: low_importance_count, + sentiment: Sentiment::Neutral, + evidence: vec![], + }); + } + + // Pattern: Tag analysis - recurring topics + let mut tag_counts: HashMap = HashMap::new(); + for m in memories { + for tag in &m.tags { + if tag != "auto-extracted" { + *tag_counts.entry(tag.clone()).or_insert(0) += 1; + } + } + } + + let mut frequent_tags: Vec<_> = tag_counts + .iter() + .filter(|(_, count)| **count >= 5) + .map(|(tag, count)| (tag.clone(), *count)) + .collect(); + frequent_tags.sort_by(|a, b| b.1.cmp(&a.1)); + + if !frequent_tags.is_empty() { + let tag_str: Vec = frequent_tags + .iter() + .take(5) + .map(|(tag, count)| format!("{}({}次)", tag, count)) + .collect(); + + patterns.push(PatternObservation { + observation: format!("反复出现的主题: {}", tag_str.join(", ")), + frequency: frequent_tags[0].1, + sentiment: Sentiment::Neutral, + evidence: frequent_tags.iter().take(5).map(|(t, _)| t.clone()).collect(), + }); + } + + patterns + } + + /// Generate improvement suggestions + fn generate_improvements( + &self, + patterns: &[PatternObservation], + memories: &[MemoryEntryForAnalysis], + ) -> Vec { + let mut improvements = Vec::new(); + + // Suggestion: Clear pending tasks + if patterns.iter().any(|p| p.observation.contains("待办任务")) { + improvements.push(ImprovementSuggestion { + area: "任务管理".to_string(), + suggestion: "清理已完成的任务记忆,对长期未处理的任务降低重要性或标记为已取消".to_string(), + priority: Priority::High, + }); + } + + // Suggestion: Prune low-importance memories + if patterns.iter().any(|p| p.observation.contains("低重要性")) { + improvements.push(ImprovementSuggestion { + area: "记忆管理".to_string(), + suggestion: "执行记忆清理,移除30天以上未访问且重要性低于3的记忆".to_string(), + priority: Priority::Medium, + }); + } + + // Suggestion: User profile enrichment + let pref_count = memories.iter().filter(|m| m.memory_type == "preference").count(); + if pref_count < 3 { + improvements.push(ImprovementSuggestion { + area: "用户理解".to_string(), + suggestion: "主动在对话中了解用户偏好(沟通风格、技术栈、工作习惯),丰富用户画像".to_string(), + priority: Priority::Medium, + }); + } + + // Suggestion: Knowledge consolidation + let fact_count = memories.iter().filter(|m| m.memory_type == "fact").count(); + if fact_count > 20 { + improvements.push(ImprovementSuggestion { + area: "知识整合".to_string(), + suggestion: "合并相似的事实记忆,提高检索效率。可将相关事实整合为结构化的项目/用户档案".to_string(), + priority: Priority::Low, + }); + } + + improvements + } + + /// Propose identity changes based on patterns + fn propose_identity_changes( + &self, + agent_id: &str, + patterns: &[PatternObservation], + ) -> Vec { + let mut proposals = Vec::new(); + + // If many negative patterns, propose instruction update + let negative_patterns: Vec<_> = patterns + .iter() + .filter(|p| matches!(p.sentiment, Sentiment::Negative)) + .collect(); + + if negative_patterns.len() >= 2 { + let additions: Vec = negative_patterns + .iter() + .map(|p| format!("- 注意: {}", p.observation)) + .collect(); + + proposals.push(IdentityChangeProposal { + agent_id: agent_id.to_string(), + field: "instructions".to_string(), + current_value: "...".to_string(), + proposed_value: format!("\n\n## 自我反思改进\n{}", additions.join("\n")), + reason: format!( + "基于 {} 个负面模式观察,建议在指令中增加自我改进提醒", + negative_patterns.len() + ), + }); + } + + proposals + } + + /// Get reflection history + pub fn get_history(&self, limit: usize) -> Vec<&ReflectionResult> { + self.history.iter().rev().take(limit).collect() + } + + /// Get last reflection result + pub fn get_last_result(&self) -> Option<&ReflectionResult> { + self.history.last() + } + + /// Get current state + pub fn get_state(&self) -> &ReflectionState { + &self.state + } + + /// Get configuration + pub fn get_config(&self) -> &ReflectionConfig { + &self.config + } + + /// Update configuration + pub fn update_config(&mut self, config: ReflectionConfig) { + self.config = config; + } +} + +// === Tauri Commands === + +use std::sync::Arc; +use tokio::sync::Mutex; + +pub type ReflectionEngineState = Arc>; + +/// Initialize reflection engine +#[tauri::command] +pub async fn reflection_init( + config: Option, +) -> Result { + Ok(Arc::new(Mutex::new(ReflectionEngine::new(config)))) +} + +/// Record a conversation +#[tauri::command] +pub async fn reflection_record_conversation( + state: tauri::State<'_, ReflectionEngineState>, +) -> Result<(), String> { + let mut engine = state.lock().await; + engine.record_conversation(); + Ok(()) +} + +/// Check if reflection should run +#[tauri::command] +pub async fn reflection_should_reflect( + state: tauri::State<'_, ReflectionEngineState>, +) -> Result { + let engine = state.lock().await; + Ok(engine.should_reflect()) +} + +/// Execute reflection +#[tauri::command] +pub async fn reflection_reflect( + agent_id: String, + memories: Vec, + state: tauri::State<'_, ReflectionEngineState>, +) -> Result { + let mut engine = state.lock().await; + Ok(engine.reflect(&agent_id, &memories)) +} + +/// Get reflection history +#[tauri::command] +pub async fn reflection_get_history( + limit: Option, + state: tauri::State<'_, ReflectionEngineState>, +) -> Result, String> { + let engine = state.lock().await; + Ok(engine.get_history(limit.unwrap_or(10)).into_iter().cloned().collect()) +} + +/// Get reflection state +#[tauri::command] +pub async fn reflection_get_state( + state: tauri::State<'_, ReflectionEngineState>, +) -> Result { + let engine = state.lock().await; + Ok(engine.get_state().clone()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_should_reflect_initial() { + let mut engine = ReflectionEngine::new(None); + assert!(!engine.should_reflect()); + + // After 3 conversations + for _ in 0..3 { + engine.record_conversation(); + } + assert!(engine.should_reflect()); + } + + #[test] + fn test_analyze_patterns() { + let engine = ReflectionEngine::new(None); + let memories = vec![ + MemoryEntryForAnalysis { + memory_type: "task".to_string(), + content: "Task 1".to_string(), + importance: 7, + access_count: 1, + tags: vec![], + }, + MemoryEntryForAnalysis { + memory_type: "task".to_string(), + content: "Task 2".to_string(), + importance: 8, + access_count: 2, + tags: vec![], + }, + ]; + + let patterns = engine.analyze_patterns(&memories); + // Should not trigger (only 2 tasks, threshold is 5) + assert!(!patterns.iter().any(|p| p.observation.contains("待办任务"))); + } +} diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 759bd0f..276e433 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -21,6 +21,9 @@ mod secure_storage; // Memory commands for persistent storage mod memory_commands; +// Intelligence Layer (migrated from frontend lib/) +mod intelligence; + use serde::Serialize; use serde_json::{json, Value}; use std::fs; @@ -1300,10 +1303,18 @@ pub fn run() { // Initialize memory store state let memory_state: memory_commands::MemoryStoreState = std::sync::Arc::new(tokio::sync::Mutex::new(None)); + // Initialize intelligence layer state + let heartbeat_state: intelligence::HeartbeatEngineState = std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())); + let reflection_state: intelligence::ReflectionEngineState = std::sync::Arc::new(tokio::sync::Mutex::new(intelligence::ReflectionEngine::new(None))); + let identity_state: intelligence::IdentityManagerState = std::sync::Arc::new(tokio::sync::Mutex::new(intelligence::AgentIdentityManager::new())); + tauri::Builder::default() .plugin(tauri_plugin_opener::init()) .manage(browser_state) .manage(memory_state) + .manage(heartbeat_state) + .manage(reflection_state) + .manage(identity_state) .invoke_handler(tauri::generate_handler![ // OpenFang commands (new naming) openfang_status, @@ -1390,7 +1401,43 @@ pub fn run() { memory_commands::memory_stats, memory_commands::memory_export, memory_commands::memory_import, - memory_commands::memory_db_path + memory_commands::memory_db_path, + // Intelligence Layer commands (Phase 2-3) + // Heartbeat Engine + intelligence::heartbeat::heartbeat_init, + intelligence::heartbeat::heartbeat_start, + intelligence::heartbeat::heartbeat_stop, + intelligence::heartbeat::heartbeat_tick, + intelligence::heartbeat::heartbeat_get_config, + intelligence::heartbeat::heartbeat_update_config, + intelligence::heartbeat::heartbeat_get_history, + // Context Compactor + intelligence::compactor::compactor_estimate_tokens, + intelligence::compactor::compactor_estimate_messages_tokens, + intelligence::compactor::compactor_check_threshold, + intelligence::compactor::compactor_compact, + // Reflection Engine + intelligence::reflection::reflection_init, + intelligence::reflection::reflection_record_conversation, + intelligence::reflection::reflection_should_reflect, + intelligence::reflection::reflection_reflect, + intelligence::reflection::reflection_get_history, + intelligence::reflection::reflection_get_state, + // Agent Identity Manager + intelligence::identity::identity_get, + intelligence::identity::identity_get_file, + intelligence::identity::identity_build_prompt, + intelligence::identity::identity_update_user_profile, + intelligence::identity::identity_append_user_profile, + intelligence::identity::identity_propose_change, + intelligence::identity::identity_approve_proposal, + intelligence::identity::identity_reject_proposal, + intelligence::identity::identity_get_pending_proposals, + intelligence::identity::identity_update_file, + intelligence::identity::identity_get_snapshots, + intelligence::identity::identity_restore_snapshot, + intelligence::identity::identity_list_agents, + intelligence::identity::identity_delete_agent ]) .run(tauri::generate_context!()) .expect("error while running tauri application");