Some checks failed
CI / Build Frontend (push) Has been cancelled
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- P1-02: Heartbeat auto-initialized in kernel_init for default agent - P1-05: CloneManager shows warning when deleting active agent + auto-switch - P1-06: AgentInfo returns soul/system_prompt/temperature/max_tokens - P1-01: Browser Hand marked experimental (requires Fantoccini bridge) - Updated DEFECT_LIST.md: all P1 resolved (0 active) - Updated RELEASE_READINESS.md: all P1 sections reflect current status
903 lines
30 KiB
Rust
903 lines
30 KiB
Rust
//! Heartbeat Engine - Periodic proactive checks for ZCLAW agents
|
||
//!
|
||
//! Runs on a configurable interval, executing a checklist of items.
|
||
//! Each check can produce alerts that surface via desktop notification or UI.
|
||
//! Supports quiet hours (no notifications during sleep time).
|
||
//!
|
||
//! Phase 2 of Intelligence Layer Migration.
|
||
//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.4.1
|
||
//!
|
||
//! NOTE: Some methods are reserved for future proactive features.
|
||
|
||
use chrono::{Local, Timelike};
|
||
use serde::{Deserialize, Serialize};
|
||
use std::collections::HashMap;
|
||
use std::sync::Arc;
|
||
use std::time::Duration;
|
||
use tokio::sync::{broadcast, Mutex};
|
||
use tokio::time::interval;
|
||
|
||
// === Types ===
|
||
|
||
/// Heartbeat configuration
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct HeartbeatConfig {
|
||
pub enabled: bool,
|
||
#[serde(default = "default_interval")]
|
||
pub interval_minutes: u64,
|
||
pub quiet_hours_start: Option<String>, // "22:00" format
|
||
pub quiet_hours_end: Option<String>, // "08:00" format
|
||
#[serde(default)]
|
||
pub notify_channel: NotifyChannel,
|
||
#[serde(default)]
|
||
pub proactivity_level: ProactivityLevel,
|
||
#[serde(default = "default_max_alerts")]
|
||
pub max_alerts_per_tick: usize,
|
||
}
|
||
|
||
fn default_interval() -> u64 { 30 }
|
||
fn default_max_alerts() -> usize { 5 }
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||
#[serde(rename_all = "lowercase")]
|
||
pub enum NotifyChannel {
|
||
#[default]
|
||
Ui,
|
||
Desktop,
|
||
All,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||
#[serde(rename_all = "lowercase")]
|
||
pub enum ProactivityLevel {
|
||
Silent,
|
||
Light,
|
||
#[default]
|
||
Standard,
|
||
Autonomous,
|
||
}
|
||
|
||
/// Alert generated by heartbeat checks
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct HeartbeatAlert {
|
||
pub title: String,
|
||
pub content: String,
|
||
pub urgency: Urgency,
|
||
pub source: String,
|
||
pub timestamp: String,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
#[serde(rename_all = "lowercase")]
|
||
pub enum Urgency {
|
||
Low,
|
||
Medium,
|
||
High,
|
||
}
|
||
|
||
/// Result of a single heartbeat tick
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct HeartbeatResult {
|
||
pub status: HeartbeatStatus,
|
||
pub alerts: Vec<HeartbeatAlert>,
|
||
pub checked_items: usize,
|
||
pub timestamp: String,
|
||
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
#[serde(rename_all = "lowercase")]
|
||
pub enum HeartbeatStatus {
|
||
Ok,
|
||
Alert,
|
||
}
|
||
|
||
/// Type alias for heartbeat check function
|
||
#[allow(dead_code)] // Reserved for future proactive check registration
|
||
type HeartbeatCheckFn = Box<dyn Fn(String) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<HeartbeatAlert>> + Send>> + Send + Sync>;
|
||
|
||
// === Default Config ===
|
||
|
||
impl Default for HeartbeatConfig {
|
||
fn default() -> Self {
|
||
Self {
|
||
enabled: true,
|
||
interval_minutes: 30,
|
||
quiet_hours_start: Some("22:00".to_string()),
|
||
quiet_hours_end: Some("08:00".to_string()),
|
||
notify_channel: NotifyChannel::Ui,
|
||
proactivity_level: ProactivityLevel::Standard,
|
||
max_alerts_per_tick: 5,
|
||
}
|
||
}
|
||
}
|
||
|
||
// === Heartbeat Engine ===
|
||
|
||
pub struct HeartbeatEngine {
|
||
agent_id: String,
|
||
config: Arc<Mutex<HeartbeatConfig>>,
|
||
running: Arc<Mutex<bool>>,
|
||
alert_sender: broadcast::Sender<HeartbeatAlert>,
|
||
history: Arc<Mutex<Vec<HeartbeatResult>>>,
|
||
}
|
||
|
||
impl HeartbeatEngine {
|
||
pub fn new(agent_id: String, config: Option<HeartbeatConfig>) -> Self {
|
||
let (alert_sender, _) = broadcast::channel(100);
|
||
|
||
Self {
|
||
agent_id,
|
||
config: Arc::new(Mutex::new(config.unwrap_or_default())),
|
||
running: Arc::new(Mutex::new(false)),
|
||
alert_sender,
|
||
history: Arc::new(Mutex::new(Vec::new())),
|
||
}
|
||
}
|
||
|
||
/// Start the heartbeat engine with periodic ticks
|
||
pub async fn start(&self) {
|
||
let mut running = self.running.lock().await;
|
||
if *running {
|
||
return;
|
||
}
|
||
*running = true;
|
||
drop(running);
|
||
|
||
let agent_id = self.agent_id.clone();
|
||
let config = Arc::clone(&self.config);
|
||
let running_clone = Arc::clone(&self.running);
|
||
let alert_sender = self.alert_sender.clone();
|
||
let history = Arc::clone(&self.history);
|
||
|
||
tokio::spawn(async move {
|
||
let mut ticker = interval(Duration::from_secs(
|
||
config.lock().await.interval_minutes * 60,
|
||
));
|
||
|
||
loop {
|
||
ticker.tick().await;
|
||
|
||
if !*running_clone.lock().await {
|
||
break;
|
||
}
|
||
|
||
// Check quiet hours
|
||
if is_quiet_hours(&*config.lock().await) {
|
||
continue;
|
||
}
|
||
|
||
// Execute heartbeat tick
|
||
let result = execute_tick(&agent_id, &config, &alert_sender).await;
|
||
|
||
// Store history in-memory
|
||
let mut hist = history.lock().await;
|
||
hist.push(result);
|
||
if hist.len() > 100 {
|
||
*hist = hist.split_off(50);
|
||
}
|
||
|
||
// Persist history to VikingStorage (fire-and-forget)
|
||
let history_to_persist: Vec<HeartbeatResult> = hist.clone();
|
||
let aid = agent_id.clone();
|
||
tokio::spawn(async move {
|
||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||
let key = format!("heartbeat:history:{}", aid);
|
||
if let Ok(json) = serde_json::to_string(&history_to_persist) {
|
||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||
&*storage, &key, &json,
|
||
).await {
|
||
tracing::warn!("[heartbeat] Failed to persist history: {}", e);
|
||
}
|
||
}
|
||
}
|
||
});
|
||
}
|
||
});
|
||
}
|
||
|
||
/// Stop the heartbeat engine
|
||
pub async fn stop(&self) {
|
||
let mut running = self.running.lock().await;
|
||
*running = false;
|
||
}
|
||
|
||
/// Check if the engine is running
|
||
#[allow(dead_code)] // Reserved for UI status display
|
||
pub async fn is_running(&self) -> bool {
|
||
*self.running.lock().await
|
||
}
|
||
|
||
/// Execute a single tick manually and persist the result to history
|
||
pub async fn tick(&self) -> HeartbeatResult {
|
||
let result = execute_tick(&self.agent_id, &self.config, &self.alert_sender).await;
|
||
|
||
// Store in history (same as the periodic loop)
|
||
let mut hist = self.history.lock().await;
|
||
hist.push(result.clone());
|
||
if hist.len() > 100 {
|
||
*hist = hist.split_off(50);
|
||
}
|
||
|
||
// Persist to VikingStorage
|
||
let history_to_persist: Vec<HeartbeatResult> = hist.clone();
|
||
let aid = self.agent_id.clone();
|
||
tokio::spawn(async move {
|
||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||
let key = format!("heartbeat:history:{}", aid);
|
||
if let Ok(json) = serde_json::to_string(&history_to_persist) {
|
||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||
&*storage, &key, &json,
|
||
).await {
|
||
tracing::warn!("[heartbeat] Failed to persist history: {}", e);
|
||
}
|
||
}
|
||
}
|
||
});
|
||
|
||
result
|
||
}
|
||
|
||
/// Subscribe to alerts
|
||
#[allow(dead_code)] // Reserved for future UI notification integration
|
||
pub fn subscribe(&self) -> broadcast::Receiver<HeartbeatAlert> {
|
||
self.alert_sender.subscribe()
|
||
}
|
||
|
||
/// Get heartbeat history
|
||
pub async fn get_history(&self, limit: usize) -> Vec<HeartbeatResult> {
|
||
let hist = self.history.lock().await;
|
||
hist.iter().rev().take(limit).cloned().collect()
|
||
}
|
||
|
||
/// Restore heartbeat history from VikingStorage metadata (called during init)
|
||
pub async fn restore_history(&self) {
|
||
let key = format!("heartbeat:history:{}", self.agent_id);
|
||
match crate::viking_commands::get_storage().await {
|
||
Ok(storage) => {
|
||
match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await {
|
||
Ok(Some(json)) => {
|
||
if let Ok(persisted) = serde_json::from_str::<Vec<HeartbeatResult>>(&json) {
|
||
let count = persisted.len();
|
||
let mut hist = self.history.lock().await;
|
||
*hist = persisted;
|
||
tracing::info!(
|
||
"[heartbeat] Restored {} history entries for {}",
|
||
count, self.agent_id
|
||
);
|
||
}
|
||
}
|
||
Ok(None) => {
|
||
tracing::debug!("[heartbeat] No persisted history for {}", self.agent_id);
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!("[heartbeat] Failed to restore history: {}", e);
|
||
}
|
||
}
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!("[heartbeat] Storage unavailable during init: {}", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Update configuration
|
||
pub async fn update_config(&self, updates: HeartbeatConfig) {
|
||
let mut config = self.config.lock().await;
|
||
*config = updates;
|
||
}
|
||
|
||
/// Get current configuration
|
||
pub async fn get_config(&self) -> HeartbeatConfig {
|
||
self.config.lock().await.clone()
|
||
}
|
||
}
|
||
|
||
// === Helper Functions ===
|
||
|
||
/// Check if current time is within quiet hours
|
||
fn is_quiet_hours(config: &HeartbeatConfig) -> bool {
|
||
let start = match &config.quiet_hours_start {
|
||
Some(s) => s,
|
||
None => return false,
|
||
};
|
||
let end = match &config.quiet_hours_end {
|
||
Some(e) => e,
|
||
None => return false,
|
||
};
|
||
|
||
let now = Local::now();
|
||
let current_minutes = now.hour() * 60 + now.minute();
|
||
|
||
let start_minutes = parse_time_to_minutes(start);
|
||
let end_minutes = parse_time_to_minutes(end);
|
||
|
||
if start_minutes <= end_minutes {
|
||
// Same-day range (e.g., 13:00-17:00)
|
||
current_minutes >= start_minutes && current_minutes < end_minutes
|
||
} else {
|
||
// Cross-midnight range (e.g., 22:00-08:00)
|
||
current_minutes >= start_minutes || current_minutes < end_minutes
|
||
}
|
||
}
|
||
|
||
/// Parse "HH:MM" format to minutes since midnight
|
||
fn parse_time_to_minutes(time: &str) -> u32 {
|
||
let parts: Vec<&str> = time.split(':').collect();
|
||
if parts.len() != 2 {
|
||
return 0;
|
||
}
|
||
let hours: u32 = parts[0].parse().unwrap_or(0);
|
||
let minutes: u32 = parts[1].parse().unwrap_or(0);
|
||
hours * 60 + minutes
|
||
}
|
||
|
||
/// Execute a single heartbeat tick
|
||
async fn execute_tick(
|
||
agent_id: &str,
|
||
config: &Arc<Mutex<HeartbeatConfig>>,
|
||
alert_sender: &broadcast::Sender<HeartbeatAlert>,
|
||
) -> HeartbeatResult {
|
||
let cfg = config.lock().await;
|
||
let mut alerts = Vec::new();
|
||
|
||
// Run built-in checks
|
||
let checks: Vec<(&str, fn(&str) -> Option<HeartbeatAlert>)> = vec![
|
||
("pending-tasks", check_pending_tasks),
|
||
("memory-health", check_memory_health),
|
||
("idle-greeting", check_idle_greeting),
|
||
("personality-improvement", check_personality_improvement),
|
||
("learning-opportunities", check_learning_opportunities),
|
||
];
|
||
|
||
let checks_count = checks.len();
|
||
|
||
for (source, check_fn) in checks {
|
||
if alerts.len() >= cfg.max_alerts_per_tick {
|
||
break;
|
||
}
|
||
|
||
if let Some(alert) = check_fn(agent_id) {
|
||
// Add source to alert
|
||
alerts.push(HeartbeatAlert {
|
||
source: source.to_string(),
|
||
..alert
|
||
});
|
||
}
|
||
}
|
||
|
||
// Filter by proactivity level
|
||
let filtered_alerts = filter_by_proactivity(&alerts, &cfg.proactivity_level);
|
||
|
||
// Send alerts
|
||
for alert in &filtered_alerts {
|
||
let _ = alert_sender.send(alert.clone());
|
||
}
|
||
|
||
let status = if filtered_alerts.is_empty() {
|
||
HeartbeatStatus::Ok
|
||
} else {
|
||
HeartbeatStatus::Alert
|
||
};
|
||
|
||
HeartbeatResult {
|
||
status,
|
||
alerts: filtered_alerts,
|
||
checked_items: checks_count,
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
}
|
||
}
|
||
|
||
/// Filter alerts based on proactivity level
|
||
fn filter_by_proactivity(alerts: &[HeartbeatAlert], level: &ProactivityLevel) -> Vec<HeartbeatAlert> {
|
||
match level {
|
||
ProactivityLevel::Silent => vec![],
|
||
ProactivityLevel::Light => alerts
|
||
.iter()
|
||
.filter(|a| matches!(a.urgency, Urgency::High))
|
||
.cloned()
|
||
.collect(),
|
||
ProactivityLevel::Standard => alerts
|
||
.iter()
|
||
.filter(|a| matches!(a.urgency, Urgency::High | Urgency::Medium))
|
||
.cloned()
|
||
.collect(),
|
||
ProactivityLevel::Autonomous => alerts.to_vec(),
|
||
}
|
||
}
|
||
|
||
// === Built-in Checks ===
|
||
|
||
/// Pattern detection counters (shared state for personality detection)
|
||
use std::collections::HashMap as StdHashMap;
|
||
use std::sync::RwLock;
|
||
use std::sync::OnceLock;
|
||
|
||
/// Global correction counters
|
||
static CORRECTION_COUNTERS: OnceLock<RwLock<StdHashMap<String, usize>>> = OnceLock::new();
|
||
|
||
/// Global memory stats cache (updated by frontend via Tauri command)
|
||
/// Key: agent_id, Value: (task_count, total_memories, storage_bytes)
|
||
static MEMORY_STATS_CACHE: OnceLock<RwLock<StdHashMap<String, MemoryStatsCache>>> = OnceLock::new();
|
||
|
||
/// Global last interaction timestamps
|
||
/// Key: agent_id, Value: last interaction timestamp (RFC3339)
|
||
static LAST_INTERACTION: OnceLock<RwLock<StdHashMap<String, String>>> = OnceLock::new();
|
||
|
||
/// Cached memory stats for an agent
|
||
#[derive(Clone, Debug, Default)]
|
||
pub struct MemoryStatsCache {
|
||
pub task_count: usize,
|
||
pub total_entries: usize,
|
||
pub storage_size_bytes: usize,
|
||
#[allow(dead_code)] // Reserved for UI display; will be exposed via heartbeat_get_memory_stats
|
||
pub last_updated: Option<String>,
|
||
}
|
||
|
||
fn get_correction_counters() -> &'static RwLock<StdHashMap<String, usize>> {
|
||
CORRECTION_COUNTERS.get_or_init(|| RwLock::new(StdHashMap::new()))
|
||
}
|
||
|
||
fn get_memory_stats_cache() -> &'static RwLock<StdHashMap<String, MemoryStatsCache>> {
|
||
MEMORY_STATS_CACHE.get_or_init(|| RwLock::new(StdHashMap::new()))
|
||
}
|
||
|
||
fn get_last_interaction_map() -> &'static RwLock<StdHashMap<String, String>> {
|
||
LAST_INTERACTION.get_or_init(|| RwLock::new(StdHashMap::new()))
|
||
}
|
||
|
||
/// Record an interaction for an agent (call from frontend when user sends message)
|
||
pub fn record_interaction(agent_id: &str) {
|
||
let now = chrono::Utc::now().to_rfc3339();
|
||
|
||
// Store in-memory map (fast path)
|
||
let map = get_last_interaction_map();
|
||
if let Ok(mut map) = map.write() {
|
||
map.insert(agent_id.to_string(), now.clone());
|
||
}
|
||
|
||
// Persist to VikingStorage metadata (survives restarts)
|
||
let key = format!("heartbeat:last_interaction:{}", agent_id);
|
||
tokio::spawn(async move {
|
||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(&*storage, &key, &now).await {
|
||
tracing::warn!("[heartbeat] Failed to persist interaction time: {}", e);
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
/// Update memory stats cache for an agent
|
||
/// Call this from frontend via Tauri command after fetching memory stats
|
||
pub fn update_memory_stats_cache(agent_id: &str, task_count: usize, total_entries: usize, storage_size_bytes: usize) {
|
||
let cache = get_memory_stats_cache();
|
||
if let Ok(mut cache) = cache.write() {
|
||
cache.insert(agent_id.to_string(), MemoryStatsCache {
|
||
task_count,
|
||
total_entries,
|
||
storage_size_bytes,
|
||
last_updated: Some(chrono::Utc::now().to_rfc3339()),
|
||
});
|
||
}
|
||
}
|
||
|
||
/// Get memory stats for an agent
|
||
fn get_cached_memory_stats(agent_id: &str) -> Option<MemoryStatsCache> {
|
||
let cache = get_memory_stats_cache();
|
||
if let Ok(cache) = cache.read() {
|
||
cache.get(agent_id).cloned()
|
||
} else {
|
||
None
|
||
}
|
||
}
|
||
|
||
/// Record a user correction for pattern detection
|
||
/// Call this when user corrects agent behavior
|
||
pub fn record_user_correction(agent_id: &str, correction_type: &str) {
|
||
let key = format!("{}:{}", agent_id, correction_type);
|
||
let counters = get_correction_counters();
|
||
if let Ok(mut counters) = counters.write() {
|
||
*counters.entry(key).or_insert(0) += 1;
|
||
}
|
||
}
|
||
|
||
/// Get and reset correction count
|
||
fn get_correction_count(agent_id: &str, correction_type: &str) -> usize {
|
||
let key = format!("{}:{}", agent_id, correction_type);
|
||
let counters = get_correction_counters();
|
||
if let Ok(mut counters) = counters.write() {
|
||
counters.remove(&key).unwrap_or(0)
|
||
} else {
|
||
0
|
||
}
|
||
}
|
||
|
||
/// Check all correction patterns for an agent
|
||
fn check_correction_patterns(agent_id: &str) -> Vec<HeartbeatAlert> {
|
||
let patterns = [
|
||
("communication_style", "简洁", "用户偏好简洁回复,建议减少冗长解释"),
|
||
("tone", "轻松", "用户偏好轻松语气,建议减少正式用语"),
|
||
("detail_level", "概要", "用户偏好概要性回答,建议先给结论再展开"),
|
||
("language", "中文", "用户语言偏好,建议优先使用中文"),
|
||
("code_first", "代码优先", "用户偏好代码优先,建议先展示代码再解释"),
|
||
];
|
||
|
||
let mut alerts = Vec::new();
|
||
for (pattern_type, _keyword, suggestion) in patterns {
|
||
let count = get_correction_count(agent_id, pattern_type);
|
||
if count >= 3 {
|
||
alerts.push(HeartbeatAlert {
|
||
title: "人格改进建议".to_string(),
|
||
content: format!("{} (检测到 {} 次相关纠正)", suggestion, count),
|
||
urgency: Urgency::Medium,
|
||
source: "personality-improvement".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
});
|
||
}
|
||
}
|
||
alerts
|
||
}
|
||
|
||
/// Check for pending task memories
|
||
/// Uses cached memory stats to detect task backlog
|
||
fn check_pending_tasks(agent_id: &str) -> Option<HeartbeatAlert> {
|
||
match get_cached_memory_stats(agent_id) {
|
||
Some(stats) if stats.task_count >= 5 => {
|
||
// Alert if there are 5+ pending tasks
|
||
Some(HeartbeatAlert {
|
||
title: "待办任务积压".to_string(),
|
||
content: format!("当前有 {} 个待办任务未完成,建议处理或重新评估优先级", stats.task_count),
|
||
urgency: if stats.task_count >= 10 {
|
||
Urgency::High
|
||
} else {
|
||
Urgency::Medium
|
||
},
|
||
source: "pending-tasks".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
})
|
||
},
|
||
Some(_) => None, // Stats available but no alert needed
|
||
None => {
|
||
// Cache is empty - warn about missing sync
|
||
tracing::warn!("[Heartbeat] Memory stats cache is empty for agent {}, waiting for frontend sync", agent_id);
|
||
Some(HeartbeatAlert {
|
||
title: "记忆统计未同步".to_string(),
|
||
content: "心跳引擎未能获取记忆统计信息,部分检查被跳过。请确保记忆系统正常运行。".to_string(),
|
||
urgency: Urgency::Low,
|
||
source: "pending-tasks".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
})
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Check memory storage health
|
||
/// Uses cached memory stats to detect storage issues
|
||
fn check_memory_health(agent_id: &str) -> Option<HeartbeatAlert> {
|
||
match get_cached_memory_stats(agent_id) {
|
||
Some(stats) => {
|
||
// Alert if storage is very large (> 50MB)
|
||
if stats.storage_size_bytes > 50 * 1024 * 1024 {
|
||
return Some(HeartbeatAlert {
|
||
title: "记忆存储过大".to_string(),
|
||
content: format!(
|
||
"记忆存储已达 {:.1}MB,建议清理低重要性记忆或归档旧记忆",
|
||
stats.storage_size_bytes as f64 / (1024.0 * 1024.0)
|
||
),
|
||
urgency: Urgency::Medium,
|
||
source: "memory-health".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
});
|
||
}
|
||
|
||
// Alert if too many memories (> 1000)
|
||
if stats.total_entries > 1000 {
|
||
return Some(HeartbeatAlert {
|
||
title: "记忆条目过多".to_string(),
|
||
content: format!(
|
||
"当前有 {} 条记忆,可能影响检索效率,建议清理或归档",
|
||
stats.total_entries
|
||
),
|
||
urgency: Urgency::Low,
|
||
source: "memory-health".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
});
|
||
}
|
||
None
|
||
},
|
||
None => {
|
||
// Cache is empty - skip check (already reported in check_pending_tasks)
|
||
None
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Check if user has been idle and might benefit from a greeting
|
||
fn check_idle_greeting(agent_id: &str) -> Option<HeartbeatAlert> {
|
||
let map = get_last_interaction_map();
|
||
|
||
// Try to get the last interaction time
|
||
let last_interaction = {
|
||
let read_result = map.read();
|
||
match read_result {
|
||
Ok(map) => map.get(agent_id).cloned(),
|
||
Err(_) => return None, // Skip if lock fails
|
||
}
|
||
};
|
||
|
||
// If no interaction recorded yet, skip
|
||
let last_interaction = last_interaction?;
|
||
|
||
// Parse the timestamp and convert to UTC for comparison
|
||
let last_time = chrono::DateTime::parse_from_rfc3339(&last_interaction)
|
||
.ok()?
|
||
.with_timezone(&chrono::Utc);
|
||
let now = chrono::Utc::now();
|
||
let idle_hours = (now - last_time).num_hours();
|
||
|
||
// Alert if idle for more than 24 hours
|
||
if idle_hours >= 24 {
|
||
Some(HeartbeatAlert {
|
||
title: "用户长时间未互动".to_string(),
|
||
content: format!(
|
||
"距离上次互动已过去 {} 小时,可以考虑主动问候或检查用户是否需要帮助",
|
||
idle_hours
|
||
),
|
||
urgency: Urgency::Low,
|
||
source: "idle-greeting".to_string(),
|
||
timestamp: now.to_rfc3339(),
|
||
})
|
||
} else {
|
||
None
|
||
}
|
||
}
|
||
|
||
/// Check for personality improvement opportunities
|
||
///
|
||
/// Detects patterns that suggest the agent's personality could be improved:
|
||
/// - User repeatedly corrects behavior (e.g., "不要那么啰嗦")
|
||
/// - User expresses same preference multiple times
|
||
/// - Context changes (new project, different role)
|
||
///
|
||
/// When threshold is reached, proposes a personality change via the identity system.
|
||
fn check_personality_improvement(agent_id: &str) -> Option<HeartbeatAlert> {
|
||
// Check all correction patterns and return the first one that triggers
|
||
let alerts = check_correction_patterns(agent_id);
|
||
alerts.into_iter().next()
|
||
}
|
||
|
||
/// Check for learning opportunities from recent conversations
|
||
///
|
||
/// Identifies opportunities to capture user preferences or behavioral patterns
|
||
/// that could enhance agent effectiveness.
|
||
fn check_learning_opportunities(agent_id: &str) -> Option<HeartbeatAlert> {
|
||
// Check if any correction patterns are approaching threshold
|
||
let counters = get_correction_counters();
|
||
let mut approaching_threshold: Vec<String> = Vec::new();
|
||
|
||
if let Ok(counters) = counters.read() {
|
||
for (key, count) in counters.iter() {
|
||
if key.starts_with(&format!("{}:", agent_id)) && *count >= 2 && *count < 3 {
|
||
let pattern_type = key.split(':').nth(1).unwrap_or("unknown").to_string();
|
||
approaching_threshold.push(pattern_type);
|
||
}
|
||
}
|
||
}
|
||
|
||
if !approaching_threshold.is_empty() {
|
||
Some(HeartbeatAlert {
|
||
title: "学习机会".to_string(),
|
||
content: format!(
|
||
"检测到用户可能有偏好调整倾向 ({}),继续观察将触发人格改进建议",
|
||
approaching_threshold.join(", ")
|
||
),
|
||
urgency: Urgency::Low,
|
||
source: "learning-opportunities".to_string(),
|
||
timestamp: chrono::Utc::now().to_rfc3339(),
|
||
})
|
||
} else {
|
||
None
|
||
}
|
||
}
|
||
|
||
// === Tauri Commands ===
|
||
|
||
/// Heartbeat engine state for Tauri
|
||
pub type HeartbeatEngineState = Arc<Mutex<HashMap<String, HeartbeatEngine>>>;
|
||
|
||
/// Initialize heartbeat engine for an agent
|
||
///
|
||
/// Restores persisted interaction time from VikingStorage so idle-greeting
|
||
/// check works correctly across app restarts.
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_init(
|
||
agent_id: String,
|
||
config: Option<HeartbeatConfig>,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<(), String> {
|
||
let engine = HeartbeatEngine::new(agent_id.clone(), config);
|
||
|
||
// Restore last interaction time from VikingStorage metadata
|
||
restore_last_interaction(&agent_id).await;
|
||
|
||
// Restore heartbeat history from VikingStorage metadata
|
||
engine.restore_history().await;
|
||
|
||
let mut engines = state.lock().await;
|
||
engines.insert(agent_id, engine);
|
||
Ok(())
|
||
}
|
||
|
||
/// Restore the last interaction timestamp for an agent from VikingStorage.
|
||
/// Called during heartbeat_init so the idle-greeting check works after restart.
|
||
pub async fn restore_last_interaction(agent_id: &str) {
|
||
let key = format!("heartbeat:last_interaction:{}", agent_id);
|
||
match crate::viking_commands::get_storage().await {
|
||
Ok(storage) => {
|
||
match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await {
|
||
Ok(Some(timestamp)) => {
|
||
let map = get_last_interaction_map();
|
||
if let Ok(mut map) = map.write() {
|
||
map.insert(agent_id.to_string(), timestamp);
|
||
}
|
||
tracing::info!("[heartbeat] Restored last interaction for {}", agent_id);
|
||
}
|
||
Ok(None) => {
|
||
tracing::debug!("[heartbeat] No persisted interaction for {}", agent_id);
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!("[heartbeat] Failed to restore interaction: {}", e);
|
||
}
|
||
}
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!("[heartbeat] Storage unavailable during init: {}", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Start heartbeat engine for an agent
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_start(
|
||
agent_id: String,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<(), String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
engine.start().await;
|
||
Ok(())
|
||
}
|
||
|
||
/// Stop heartbeat engine for an agent
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_stop(
|
||
agent_id: String,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<(), String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
engine.stop().await;
|
||
Ok(())
|
||
}
|
||
|
||
/// Execute a single heartbeat tick
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_tick(
|
||
agent_id: String,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<HeartbeatResult, String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
Ok(engine.tick().await)
|
||
}
|
||
|
||
/// Get heartbeat configuration
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_get_config(
|
||
agent_id: String,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<HeartbeatConfig, String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
Ok(engine.get_config().await)
|
||
}
|
||
|
||
/// Update heartbeat configuration
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_update_config(
|
||
agent_id: String,
|
||
config: HeartbeatConfig,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<(), String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
engine.update_config(config).await;
|
||
Ok(())
|
||
}
|
||
|
||
/// Get heartbeat history
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_get_history(
|
||
agent_id: String,
|
||
limit: Option<usize>,
|
||
state: tauri::State<'_, HeartbeatEngineState>,
|
||
) -> Result<Vec<HeartbeatResult>, String> {
|
||
let engines = state.lock().await;
|
||
let engine = engines
|
||
.get(&agent_id)
|
||
.ok_or_else(|| format!("Heartbeat engine not initialized for agent: {}", agent_id))?;
|
||
Ok(engine.get_history(limit.unwrap_or(20)).await)
|
||
}
|
||
|
||
/// Update memory stats cache for heartbeat checks
|
||
/// This should be called by the frontend after fetching memory stats
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_update_memory_stats(
|
||
agent_id: String,
|
||
task_count: usize,
|
||
total_entries: usize,
|
||
storage_size_bytes: usize,
|
||
) -> Result<(), String> {
|
||
update_memory_stats_cache(&agent_id, task_count, total_entries, storage_size_bytes);
|
||
Ok(())
|
||
}
|
||
|
||
/// Record a user correction for personality improvement detection
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_record_correction(
|
||
agent_id: String,
|
||
correction_type: String,
|
||
) -> Result<(), String> {
|
||
record_user_correction(&agent_id, &correction_type);
|
||
Ok(())
|
||
}
|
||
|
||
/// Record a user interaction for idle greeting detection
|
||
/// Call this from frontend whenever user sends a message
|
||
// @connected
|
||
#[tauri::command]
|
||
pub async fn heartbeat_record_interaction(
|
||
agent_id: String,
|
||
) -> Result<(), String> {
|
||
record_interaction(&agent_id);
|
||
Ok(())
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn test_parse_time() {
|
||
assert_eq!(parse_time_to_minutes("00:00"), 0);
|
||
assert_eq!(parse_time_to_minutes("08:00"), 480);
|
||
assert_eq!(parse_time_to_minutes("22:00"), 1320);
|
||
assert_eq!(parse_time_to_minutes("23:59"), 1439);
|
||
}
|
||
|
||
#[test]
|
||
fn test_default_config() {
|
||
let config = HeartbeatConfig::default();
|
||
assert!(config.enabled);
|
||
assert_eq!(config.interval_minutes, 30);
|
||
}
|
||
}
|