Files
zclaw_openfang/desktop/src-tauri/src/intelligence/heartbeat.rs
iven 215c079d29
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(intelligence): Heartbeat 统一健康系统 — 6处断链修复 + 健康面板 + SaaS自动恢复
Rust 后端 (heartbeat.rs):
- 告警实时推送: OnceLock<AppHandle> + Tauri emit heartbeat:alert
- 动态间隔: tokio::select! + Notify 替代不可变 interval
- Config 持久化: update_config 写入 VikingStorage
- heartbeat_init 从 VikingStorage 恢复 config
- 移除 dead code (subscribe, HeartbeatCheckFn)
- Memory stats fallback 分层处理

新增 health_snapshot.rs:
- HealthSnapshot Tauri 命令 — 按需查询引擎/记忆状态
- 注册到 lib.rs invoke_handler

前端修复:
- HeartbeatConfig handleSave 同步到 Rust 后端
- App.tsx 读 localStorage 持久化配置 + heartbeat:alert 监听 + toast
- saasStore 降级后指数退避探测恢复 + saas-recovered 事件
- 新增 HealthPanel.tsx 只读健康面板 (4卡片 + 告警列表)
- SettingsLayout 添加 health 导航入口

清理:
- 删除 intelligence-client/ 目录版 (9文件 -1640行, 单文件版是活跃代码)
2026-04-15 23:19:24 +08:00

1017 lines
35 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Heartbeat Engine - Periodic proactive checks for ZCLAW agents
//!
//! Runs on a configurable interval, executing a checklist of items.
//! Each check can produce alerts that surface via desktop notification or UI.
//! Supports quiet hours (no notifications during sleep time).
//!
//! Phase 2 of Intelligence Layer Migration.
//! Reference: ZCLAW_AGENT_INTELLIGENCE_EVOLUTION.md §6.4.1
//!
//! NOTE: Some methods are reserved for future proactive features.
use chrono::{Local, Timelike};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use tokio::sync::{broadcast, Mutex, Notify};
use tauri::{AppHandle, Emitter};
// === Types ===
/// Heartbeat configuration
///
/// Serialized with serde. `HeartbeatEngine::update_config` persists it as JSON
/// and `restore_config_from_storage` reads it back during `heartbeat_init`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatConfig {
/// NOTE(review): not read by any logic in this file; presumably the caller
/// decides whether to start the engine based on it — confirm.
pub enabled: bool,
/// Minutes between periodic ticks; falls back to `default_interval()` when
/// the field is missing from the deserialized input.
#[serde(default = "default_interval")]
pub interval_minutes: u64,
pub quiet_hours_start: Option<String>, // "22:00" format
pub quiet_hours_end: Option<String>, // "08:00" format
/// NOTE(review): not consulted by the tick logic in this file — confirm
/// where the channel is honoured.
#[serde(default)]
pub notify_channel: NotifyChannel,
/// Selects which alert urgencies pass `filter_by_proactivity`.
#[serde(default)]
pub proactivity_level: ProactivityLevel,
/// Alert cap checked by `execute_tick` while collecting alerts; falls back
/// to `default_max_alerts()` when missing from the deserialized input.
#[serde(default = "default_max_alerts")]
pub max_alerts_per_tick: usize,
}
/// Serde fallback for `HeartbeatConfig::interval_minutes` (in minutes).
fn default_interval() -> u64 {
    const DEFAULT_INTERVAL_MINUTES: u64 = 30;
    DEFAULT_INTERVAL_MINUTES
}
/// Serde fallback for `HeartbeatConfig::max_alerts_per_tick`.
fn default_max_alerts() -> usize {
    const DEFAULT_MAX_ALERTS_PER_TICK: usize = 5;
    DEFAULT_MAX_ALERTS_PER_TICK
}
/// Notification channel selected in `HeartbeatConfig`.
///
/// Variant names are (de)serialized in lowercase; `Ui` is the default.
// NOTE(review): no code in this file branches on this value — confirm where it is used.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum NotifyChannel {
#[default]
Ui,
Desktop,
All,
}
/// How many alerts the engine is allowed to surface (see `filter_by_proactivity`).
///
/// Variant names are (de)serialized in lowercase; `Standard` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ProactivityLevel {
// Drops every alert.
Silent,
// Keeps only `Urgency::High` alerts.
Light,
// Keeps `Urgency::High` and `Urgency::Medium` alerts.
#[default]
Standard,
// Keeps every alert.
Autonomous,
}
/// Alert generated by heartbeat checks
///
/// Sent over the engine's broadcast channel and emitted to the frontend as the
/// payload of the `heartbeat:alert` event by `execute_tick`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatAlert {
pub title: String,
pub content: String,
pub urgency: Urgency,
// Name of the check that produced the alert; `execute_tick` overwrites it
// with the check's registered name.
pub source: String,
// RFC 3339 timestamp (the built-in checks use `chrono::Utc::now().to_rfc3339()`).
pub timestamp: String,
}
/// Severity of a `HeartbeatAlert`; compared against the configured
/// `ProactivityLevel` in `filter_by_proactivity`.
///
/// Variant names are (de)serialized in lowercase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Urgency {
Low,
Medium,
High,
}
/// Result of a single heartbeat tick
///
/// Stored in the engine's in-memory history and persisted as JSON under the
/// `heartbeat:history:<agent_id>` metadata key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatResult {
// `Alert` when at least one alert survived filtering, otherwise `Ok`.
pub status: HeartbeatStatus,
// Alerts that survived the proactivity filter.
pub alerts: Vec<HeartbeatAlert>,
// Number of built-in checks registered for the tick.
pub checked_items: usize,
// RFC 3339 timestamp taken when the result was built.
pub timestamp: String,
}
/// Overall outcome of a tick: `Ok` when no alert was produced, `Alert` otherwise.
///
/// Variant names are (de)serialized in lowercase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HeartbeatStatus {
Ok,
Alert,
}
/// Global AppHandle for emitting heartbeat alerts to frontend
/// Set by heartbeat_init, used by background tick task
///
/// Being a `OnceLock`, only the first `set` succeeds; later `heartbeat_init`
/// calls keep the handle that was stored first.
static HEARTBEAT_APP_HANDLE: OnceLock<AppHandle> = OnceLock::new();
// === Default Config ===
impl Default for HeartbeatConfig {
fn default() -> Self {
Self {
enabled: true,
interval_minutes: 30,
quiet_hours_start: Some("22:00".to_string()),
quiet_hours_end: Some("08:00".to_string()),
notify_channel: NotifyChannel::Ui,
proactivity_level: ProactivityLevel::Standard,
max_alerts_per_tick: 5,
}
}
}
// === Heartbeat Engine ===
/// Per-agent heartbeat engine.
///
/// All mutable state lives behind `Arc`s so the background task spawned by
/// `start` can share it with the engine.
pub struct HeartbeatEngine {
// Agent this engine belongs to; also used to build storage keys.
agent_id: String,
// Live configuration; re-read by the background loop on every cycle.
config: Arc<Mutex<HeartbeatConfig>>,
// True between `start` and `stop`.
running: Arc<Mutex<bool>>,
// Signalled by `stop` to interrupt the loop's sleep.
stop_notify: Arc<Notify>,
// Broadcast sender that `execute_tick` publishes alerts on.
// NOTE(review): no receiver is created in this file — confirm subscribers exist elsewhere.
alert_sender: broadcast::Sender<HeartbeatAlert>,
// Tick results, oldest first; trimmed once it grows past 100 entries.
history: Arc<Mutex<Vec<HeartbeatResult>>>,
}
impl HeartbeatEngine {
pub fn new(agent_id: String, config: Option<HeartbeatConfig>) -> Self {
let (alert_sender, _) = broadcast::channel(100);
Self {
agent_id,
config: Arc::new(Mutex::new(config.unwrap_or_default())),
running: Arc::new(Mutex::new(false)),
stop_notify: Arc::new(Notify::new()),
alert_sender,
history: Arc::new(Mutex::new(Vec::new())),
}
}
/// Start the heartbeat engine with periodic ticks
///
/// Does nothing when the engine is already running. Otherwise marks it as
/// running and spawns a background task that, in a loop:
/// 1. sleeps for the configured interval (re-read every cycle), waking early
///    when `stop` signals `stop_notify`;
/// 2. exits once `running` is false;
/// 3. skips the tick during quiet hours;
/// 4. runs `execute_tick`, appends the result to the in-memory history and
///    persists the history to VikingStorage (fire-and-forget).
pub async fn start(&self) {
    {
        let mut running = self.running.lock().await;
        if *running {
            return;
        }
        *running = true;
    }
    let agent_id = self.agent_id.clone();
    let config = Arc::clone(&self.config);
    let running_clone = Arc::clone(&self.running);
    let stop_notify = Arc::clone(&self.stop_notify);
    let alert_sender = self.alert_sender.clone();
    let history = Arc::clone(&self.history);
    tokio::spawn(async move {
        loop {
            // Re-read interval every loop — supports dynamic config changes.
            // Clamp to at least one minute so a zero interval (e.g. a config
            // that never went through command validation) cannot busy-loop,
            // and saturate so a huge value cannot overflow the multiplication.
            let sleep_secs = config
                .lock()
                .await
                .interval_minutes
                .max(1)
                .saturating_mul(60);
            // Interruptible sleep: stop_notify wakes immediately on stop()
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(sleep_secs)) => {},
                _ = stop_notify.notified() => { break; }
            };
            if !*running_clone.lock().await {
                break;
            }
            // Check quiet hours
            if is_quiet_hours(&*config.lock().await) {
                continue;
            }
            // Execute heartbeat tick
            let result = execute_tick(&agent_id, &config, &alert_sender).await;
            // Store history in-memory; the lock is released before the
            // persistence task is spawned.
            let history_to_persist: Vec<HeartbeatResult> = {
                let mut hist = history.lock().await;
                hist.push(result);
                if hist.len() > 100 {
                    *hist = hist.split_off(50);
                }
                hist.clone()
            };
            // Persist history to VikingStorage (fire-and-forget)
            let aid = agent_id.clone();
            tokio::spawn(async move {
                if let Ok(storage) = crate::viking_commands::get_storage().await {
                    let key = format!("heartbeat:history:{}", aid);
                    if let Ok(json) = serde_json::to_string(&history_to_persist) {
                        if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
                            &*storage, &key, &json,
                        ).await {
                            tracing::warn!("[heartbeat] Failed to persist history: {}", e);
                        }
                    }
                }
            });
        }
    });
}
/// Stop the heartbeat engine
///
/// Clears the `running` flag and, only if the engine was actually running,
/// signals `stop_notify` so the background loop wakes from its sleep and exits.
///
/// `Notify::notify_one` stores a permit when no task is waiting. Signalling an
/// engine that is not running would leave that permit behind, and the loop
/// spawned by the next `start` would consume it and exit immediately while
/// `running` stays true — so the signal is skipped in that case.
pub async fn stop(&self) {
    let mut running = self.running.lock().await;
    let was_running = std::mem::replace(&mut *running, false);
    if was_running {
        self.stop_notify.notify_one(); // Wake up sleep immediately
    }
}
/// Report the current value of the `running` flag.
pub async fn is_running(&self) -> bool {
    let flag = self.running.lock().await;
    *flag
}
/// Run one tick on demand and record it like a periodic tick.
///
/// The result is appended to the in-memory history (trimmed once it exceeds
/// 100 entries), a snapshot of the history is persisted to VikingStorage in a
/// detached task, and the result is returned to the caller. Quiet hours are
/// not checked here.
pub async fn tick(&self) -> HeartbeatResult {
    let outcome = execute_tick(&self.agent_id, &self.config, &self.alert_sender).await;

    // Record in history (same bookkeeping as the periodic loop).
    let mut entries = self.history.lock().await;
    entries.push(outcome.clone());
    if entries.len() > 100 {
        let tail = entries.split_off(50);
        *entries = tail;
    }

    // Persist a snapshot without blocking the caller.
    let snapshot: Vec<HeartbeatResult> = entries.clone();
    let storage_key = format!("heartbeat:history:{}", self.agent_id);
    tokio::spawn(async move {
        let Ok(storage) = crate::viking_commands::get_storage().await else {
            return;
        };
        let Ok(json) = serde_json::to_string(&snapshot) else {
            return;
        };
        let stored =
            zclaw_growth::VikingStorage::store_metadata_json(&*storage, &storage_key, &json).await;
        if let Err(e) = stored {
            tracing::warn!("[heartbeat] Failed to persist history: {}", e);
        }
    });

    outcome
}
/// Return up to `limit` of the most recent tick results, newest first.
pub async fn get_history(&self, limit: usize) -> Vec<HeartbeatResult> {
    let entries = self.history.lock().await;
    // Keep only the trailing `limit` entries, then flip them to newest-first.
    let first_kept = entries.len().saturating_sub(limit);
    entries[first_kept..].iter().rev().cloned().collect()
}
/// Restore heartbeat history from VikingStorage metadata (called during init)
///
/// Reads the JSON stored under `heartbeat:history:<agent_id>` and, when it
/// parses as a list of `HeartbeatResult`, replaces the in-memory history with
/// it. Every failure path (storage unavailable, read error, malformed JSON) is
/// logged and leaves the current history untouched.
pub async fn restore_history(&self) {
    let key = format!("heartbeat:history:{}", self.agent_id);
    let storage = match crate::viking_commands::get_storage().await {
        Ok(storage) => storage,
        Err(e) => {
            tracing::warn!("[heartbeat] Storage unavailable during init: {}", e);
            return;
        }
    };
    let json = match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await {
        Ok(Some(json)) => json,
        Ok(None) => {
            tracing::debug!("[heartbeat] No persisted history for {}", self.agent_id);
            return;
        }
        Err(e) => {
            tracing::warn!("[heartbeat] Failed to restore history: {}", e);
            return;
        }
    };
    match serde_json::from_str::<Vec<HeartbeatResult>>(&json) {
        Ok(persisted) => {
            let count = persisted.len();
            let mut hist = self.history.lock().await;
            *hist = persisted;
            tracing::info!(
                "[heartbeat] Restored {} history entries for {}",
                count, self.agent_id
            );
        }
        // Previously swallowed silently; surface it so corrupt or
        // schema-incompatible history is visible in the logs.
        Err(e) => {
            tracing::warn!(
                "[heartbeat] Failed to parse persisted history for {}: {}",
                self.agent_id, e
            );
        }
    }
}
/// Replace the live configuration and persist it.
///
/// The in-memory config is swapped first; the JSON copy is then written to
/// VikingStorage under `heartbeat:config:<agent_id>` from a detached task, so
/// persistence failures are only logged.
pub async fn update_config(&self, updates: HeartbeatConfig) {
    let storage_key = format!("heartbeat:config:{}", self.agent_id);
    {
        let mut current = self.config.lock().await;
        *current = updates.clone();
    }
    tokio::spawn(async move {
        let Ok(storage) = crate::viking_commands::get_storage().await else {
            return;
        };
        let Ok(json) = serde_json::to_string(&updates) else {
            return;
        };
        let stored =
            zclaw_growth::VikingStorage::store_metadata_json(&*storage, &storage_key, &json).await;
        if let Err(e) = stored {
            tracing::warn!("[heartbeat] Failed to persist config: {}", e);
        }
    });
}
/// Get current configuration
pub async fn get_config(&self) -> HeartbeatConfig {
self.config.lock().await.clone()
}
}
// === Helper Functions ===
/// Tell whether the local wall-clock time falls inside the configured quiet window.
///
/// Returns `false` when either bound is missing. The window is half-open
/// (`start` inclusive, `end` exclusive) and may wrap past midnight.
fn is_quiet_hours(config: &HeartbeatConfig) -> bool {
    let (Some(start), Some(end)) = (&config.quiet_hours_start, &config.quiet_hours_end) else {
        return false;
    };
    let now = Local::now();
    let current = now.hour() * 60 + now.minute();
    let window_start = parse_time_to_minutes(start);
    let window_end = parse_time_to_minutes(end);

    let at_or_after_start = current >= window_start;
    let before_end = current < window_end;
    if window_start > window_end {
        // Window wraps past midnight (e.g., 22:00-08:00).
        at_or_after_start || before_end
    } else {
        // Window within a single day (e.g., 13:00-17:00).
        at_or_after_start && before_end
    }
}
/// Parse "HH:MM" format to minutes since midnight
///
/// Surrounding whitespace is ignored. Anything that is not exactly two numeric
/// fields with `HH` in `0..24` and `MM` in `0..60` yields `0`, so the result is
/// always a valid minute-of-day (`0..1440`).
fn parse_time_to_minutes(time: &str) -> u32 {
    let Some((hours, minutes)) = time.trim().split_once(':') else {
        return 0;
    };
    match (hours.trim().parse::<u32>(), minutes.trim().parse::<u32>()) {
        // Reject out-of-range fields instead of producing values past 23:59.
        (Ok(h), Ok(m)) if h < 24 && m < 60 => h * 60 + m,
        // Malformed input (non-numeric, extra ':' segments, out of range).
        _ => 0,
    }
}
/// Execute a single heartbeat tick
///
/// Runs the built-in checks in a fixed order, tags each alert with the name of
/// the check that produced it, and keeps only alerts allowed by the configured
/// proactivity level. Collection stops once `max_alerts_per_tick` deliverable
/// alerts have been gathered. The kept alerts are sent on `alert_sender` and,
/// when an `AppHandle` has been registered, emitted to the frontend as a
/// `heartbeat:alert` event.
///
/// The proactivity filter is applied before counting against the cap, so an
/// alert that would be filtered out neither uses up a slot nor prevents later
/// checks from running.
///
/// `checked_items` in the result is the number of registered checks.
async fn execute_tick(
    agent_id: &str,
    config: &Arc<Mutex<HeartbeatConfig>>,
    alert_sender: &broadcast::Sender<HeartbeatAlert>,
) -> HeartbeatResult {
    // Copy the two settings the tick needs and release the config lock right
    // away instead of holding it for the whole tick.
    let (max_alerts, proactivity_level) = {
        let cfg = config.lock().await;
        (cfg.max_alerts_per_tick, cfg.proactivity_level.clone())
    };
    // Run built-in checks
    let checks: Vec<(&str, fn(&str) -> Option<HeartbeatAlert>)> = vec![
        ("pending-tasks", check_pending_tasks),
        ("memory-health", check_memory_health),
        ("idle-greeting", check_idle_greeting),
        ("personality-improvement", check_personality_improvement),
        ("learning-opportunities", check_learning_opportunities),
    ];
    let checks_count = checks.len();
    let mut filtered_alerts: Vec<HeartbeatAlert> = Vec::new();
    for (source, check_fn) in checks {
        if filtered_alerts.len() >= max_alerts {
            break;
        }
        let Some(alert) = check_fn(agent_id) else {
            continue;
        };
        // Add source to alert
        let tagged = HeartbeatAlert {
            source: source.to_string(),
            ..alert
        };
        // Yields zero or one alert depending on the proactivity level.
        filtered_alerts.extend(filter_by_proactivity(
            std::slice::from_ref(&tagged),
            &proactivity_level,
        ));
    }
    // Send alerts via broadcast channel (internal); a send error only means
    // there is currently no receiver, which is not a failure.
    for alert in &filtered_alerts {
        let _ = alert_sender.send(alert.clone());
    }
    // Emit alerts to frontend via Tauri event (real-time toast)
    if !filtered_alerts.is_empty() {
        if let Some(app) = HEARTBEAT_APP_HANDLE.get() {
            if let Err(e) = app.emit("heartbeat:alert", &filtered_alerts) {
                tracing::warn!("[heartbeat] Failed to emit alert: {}", e);
            }
        }
    }
    let status = if filtered_alerts.is_empty() {
        HeartbeatStatus::Ok
    } else {
        HeartbeatStatus::Alert
    };
    HeartbeatResult {
        status,
        alerts: filtered_alerts,
        checked_items: checks_count,
        timestamp: chrono::Utc::now().to_rfc3339(),
    }
}
/// Filter alerts based on proactivity level
fn filter_by_proactivity(alerts: &[HeartbeatAlert], level: &ProactivityLevel) -> Vec<HeartbeatAlert> {
match level {
ProactivityLevel::Silent => vec![],
ProactivityLevel::Light => alerts
.iter()
.filter(|a| matches!(a.urgency, Urgency::High))
.cloned()
.collect(),
ProactivityLevel::Standard => alerts
.iter()
.filter(|a| matches!(a.urgency, Urgency::High | Urgency::Medium))
.cloned()
.collect(),
ProactivityLevel::Autonomous => alerts.to_vec(),
}
}
// === Built-in Checks ===
/// Pattern detection counters (shared state for personality detection)
use std::collections::HashMap as StdHashMap;
use std::sync::RwLock;
/// Global correction counters
/// Key: "<agent_id>:<correction_type>", Value: number of recorded corrections
static CORRECTION_COUNTERS: OnceLock<RwLock<StdHashMap<String, usize>>> = OnceLock::new();
/// Global memory stats cache (updated by frontend via Tauri command)
/// Key: agent_id, Value: `MemoryStatsCache` (task count, total entries, storage bytes, update time)
static MEMORY_STATS_CACHE: OnceLock<RwLock<StdHashMap<String, MemoryStatsCache>>> = OnceLock::new();
/// Global last interaction timestamps
/// Key: agent_id, Value: last interaction timestamp (RFC3339)
static LAST_INTERACTION: OnceLock<RwLock<StdHashMap<String, String>>> = OnceLock::new();
/// Cached memory stats for an agent
///
/// Written by `update_memory_stats_cache` and read by the pending-tasks and
/// memory-health checks.
#[derive(Clone, Debug, Default)]
pub struct MemoryStatsCache {
// Compared against the pending-task thresholds (5 and 10) in `check_pending_tasks`.
pub task_count: usize,
// Compared against the 1000-entry threshold in `check_memory_health`.
pub total_entries: usize,
// Compared against the 50 MiB threshold in `check_memory_health`.
pub storage_size_bytes: usize,
// RFC 3339 time of the last cache update.
#[allow(dead_code)] // Reserved for UI display; will be exposed via heartbeat_get_memory_stats
pub last_updated: Option<String>,
}
/// Access the global correction counters, creating the empty map on first use.
fn get_correction_counters() -> &'static RwLock<StdHashMap<String, usize>> {
    CORRECTION_COUNTERS.get_or_init(RwLock::default)
}
/// Access the global memory stats cache, creating the empty map on first use.
pub fn get_memory_stats_cache() -> &'static RwLock<StdHashMap<String, MemoryStatsCache>> {
    MEMORY_STATS_CACHE.get_or_init(RwLock::default)
}
/// Access the global last-interaction map, creating the empty map on first use.
fn get_last_interaction_map() -> &'static RwLock<StdHashMap<String, String>> {
    LAST_INTERACTION.get_or_init(RwLock::default)
}
/// Record an interaction for an agent (call from frontend when user sends message)
///
/// Updates the in-memory timestamp immediately and then persists it to
/// VikingStorage under `heartbeat:last_interaction:<agent_id>` in a detached
/// task. Persistence is best-effort: failures are logged, and when the caller
/// is not inside a Tokio runtime the persistence step is skipped instead of
/// panicking (`tokio::spawn` panics outside a runtime).
pub fn record_interaction(agent_id: &str) {
    let now = chrono::Utc::now().to_rfc3339();
    // Store in-memory map (fast path)
    if let Ok(mut map) = get_last_interaction_map().write() {
        map.insert(agent_id.to_string(), now.clone());
    }
    // Persist to VikingStorage metadata (survives restarts)
    let Ok(runtime) = tokio::runtime::Handle::try_current() else {
        tracing::warn!(
            "[heartbeat] No async runtime available; interaction time for {} not persisted",
            agent_id
        );
        return;
    };
    let key = format!("heartbeat:last_interaction:{}", agent_id);
    // NOTE(review): the value written is the bare RFC 3339 string, not a
    // JSON-quoted one; `restore_last_interaction` reads it back verbatim.
    // Confirm the storage API does not require valid JSON.
    runtime.spawn(async move {
        if let Ok(storage) = crate::viking_commands::get_storage().await {
            if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(&*storage, &key, &now).await {
                tracing::warn!("[heartbeat] Failed to persist interaction time: {}", e);
            }
        }
    });
}
/// Store fresh memory statistics for `agent_id` in the global cache.
///
/// Intended to be called (via a Tauri command) after the frontend has fetched
/// memory stats. The entry is stamped with the current UTC time. If the cache
/// lock is poisoned the update is dropped.
pub fn update_memory_stats_cache(agent_id: &str, task_count: usize, total_entries: usize, storage_size_bytes: usize) {
    let Ok(mut entries) = get_memory_stats_cache().write() else {
        return;
    };
    let snapshot = MemoryStatsCache {
        task_count,
        total_entries,
        storage_size_bytes,
        last_updated: Some(chrono::Utc::now().to_rfc3339()),
    };
    entries.insert(agent_id.to_string(), snapshot);
}
/// Look up the cached memory stats for `agent_id`.
///
/// Returns `None` when nothing is cached or the cache lock is poisoned.
fn get_cached_memory_stats(agent_id: &str) -> Option<MemoryStatsCache> {
    let entries = get_memory_stats_cache().read().ok()?;
    let found = entries.get(agent_id).cloned();
    found
}
/// Count one user correction of `correction_type` for `agent_id`.
///
/// Call this when the user corrects agent behavior. Counters are keyed by
/// `"<agent_id>:<correction_type>"`. If the counters lock is poisoned the
/// correction is dropped.
pub fn record_user_correction(agent_id: &str, correction_type: &str) {
    let counter_key = format!("{}:{}", agent_id, correction_type);
    let Ok(mut counters) = get_correction_counters().write() else {
        return;
    };
    let tally = counters.entry(counter_key).or_insert(0);
    *tally += 1;
}
/// Take the correction count for `agent_id` / `correction_type`, resetting it.
///
/// The counter entry is removed, so the next call returns 0 until new
/// corrections are recorded. Returns 0 as well when the lock is poisoned.
fn get_correction_count(agent_id: &str, correction_type: &str) -> usize {
    let counter_key = format!("{}:{}", agent_id, correction_type);
    match get_correction_counters().write() {
        Ok(mut counters) => counters.remove(&counter_key).unwrap_or(0),
        Err(_) => 0,
    }
}
/// Check all correction patterns for an agent
fn check_correction_patterns(agent_id: &str) -> Vec<HeartbeatAlert> {
let patterns = [
("communication_style", "简洁", "用户偏好简洁回复,建议减少冗长解释"),
("tone", "轻松", "用户偏好轻松语气,建议减少正式用语"),
("detail_level", "概要", "用户偏好概要性回答,建议先给结论再展开"),
("language", "中文", "用户语言偏好,建议优先使用中文"),
("code_first", "代码优先", "用户偏好代码优先,建议先展示代码再解释"),
];
let mut alerts = Vec::new();
for (pattern_type, _keyword, suggestion) in patterns {
let count = get_correction_count(agent_id, pattern_type);
if count >= 3 {
alerts.push(HeartbeatAlert {
title: "人格改进建议".to_string(),
content: format!("{} (检测到 {} 次相关纠正)", suggestion, count),
urgency: Urgency::Medium,
source: "personality-improvement".to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
});
}
}
alerts
}
/// Fallback used when the frontend-fed memory stats cache has no entry.
///
/// Currently a deliberate no-op that always returns `None`: counting
/// VikingStorage entries is async and cannot be done from the synchronous
/// check functions, so the data is expected to arrive through the periodic
/// frontend sync or the health snapshot command instead.
fn query_memory_stats_fallback(agent_id: &str) -> Option<MemoryStatsCache> {
    // Parameter kept for a future real implementation.
    let _unused_agent_id = agent_id;
    None
}
/// Detect a backlog of pending tasks from the memory stats.
///
/// Stats come from the cache, or from `query_memory_stats_fallback` when the
/// cache has no entry. With stats available, 5 or more pending tasks raise a
/// medium-urgency alert (high from 10 upwards) and fewer raise nothing. With
/// no stats at all, a warning is logged and a low-urgency "stats not synced"
/// alert is returned.
fn check_pending_tasks(agent_id: &str) -> Option<HeartbeatAlert> {
    let stats = get_cached_memory_stats(agent_id)
        .or_else(|| query_memory_stats_fallback(agent_id));

    let Some(stats) = stats else {
        // Neither the cache nor the fallback produced any stats.
        tracing::warn!("[Heartbeat] Memory stats unavailable for agent {} (cache + fallback empty)", agent_id);
        return Some(HeartbeatAlert {
            title: "记忆统计未同步".to_string(),
            content: "心跳引擎未能获取记忆统计信息,部分检查被跳过。请确保记忆系统正常运行。".to_string(),
            urgency: Urgency::Low,
            source: "pending-tasks".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        });
    };

    // Stats available but below the backlog threshold: nothing to report.
    if stats.task_count < 5 {
        return None;
    }

    let urgency = if stats.task_count >= 10 {
        Urgency::High
    } else {
        Urgency::Medium
    };
    Some(HeartbeatAlert {
        title: "待办任务积压".to_string(),
        content: format!("当前有 {} 个待办任务未完成,建议处理或重新评估优先级", stats.task_count),
        urgency,
        source: "pending-tasks".to_string(),
        timestamp: chrono::Utc::now().to_rfc3339(),
    })
}
/// Flag oversized memory storage based on the cached stats.
///
/// Storage above 50 MiB yields a medium-urgency alert; otherwise more than
/// 1000 entries yields a low-urgency alert. Returns `None` when neither limit
/// is exceeded or when no stats are cached (that situation is reported by
/// `check_pending_tasks`).
fn check_memory_health(agent_id: &str) -> Option<HeartbeatAlert> {
    const MAX_STORAGE_BYTES: usize = 50 * 1024 * 1024;
    const MAX_ENTRIES: usize = 1000;

    let stats = get_cached_memory_stats(agent_id)?;

    // Size is checked first; only one alert is produced per call.
    if stats.storage_size_bytes > MAX_STORAGE_BYTES {
        let size_mb = stats.storage_size_bytes as f64 / (1024.0 * 1024.0);
        return Some(HeartbeatAlert {
            title: "记忆存储过大".to_string(),
            content: format!(
                "记忆存储已达 {:.1}MB建议清理低重要性记忆或归档旧记忆",
                size_mb
            ),
            urgency: Urgency::Medium,
            source: "memory-health".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        });
    }

    if stats.total_entries > MAX_ENTRIES {
        return Some(HeartbeatAlert {
            title: "记忆条目过多".to_string(),
            content: format!(
                "当前有 {} 条记忆,可能影响检索效率,建议清理或归档",
                stats.total_entries
            ),
            urgency: Urgency::Low,
            source: "memory-health".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        });
    }

    None
}
/// Suggest a greeting when the user has not interacted for a day or more.
///
/// Returns `None` when no interaction is recorded for the agent, when the
/// stored timestamp is not valid RFC 3339, when the map lock is poisoned, or
/// when fewer than 24 whole hours have elapsed.
fn check_idle_greeting(agent_id: &str) -> Option<HeartbeatAlert> {
    // Copy the timestamp out so the read lock is released immediately.
    let recorded = {
        let interactions = get_last_interaction_map().read().ok()?;
        interactions.get(agent_id).cloned()
    }?;

    // Normalise to UTC before comparing with the current time.
    let last_seen = chrono::DateTime::parse_from_rfc3339(&recorded)
        .ok()?
        .with_timezone(&chrono::Utc);
    let now = chrono::Utc::now();
    let idle_hours = (now - last_seen).num_hours();

    if idle_hours < 24 {
        return None;
    }
    Some(HeartbeatAlert {
        title: "用户长时间未互动".to_string(),
        content: format!(
            "距离上次互动已过去 {} 小时,可以考虑主动问候或检查用户是否需要帮助",
            idle_hours
        ),
        urgency: Urgency::Low,
        source: "idle-greeting".to_string(),
        timestamp: now.to_rfc3339(),
    })
}
/// Surface a personality improvement suggestion, if any correction pattern triggered.
///
/// Delegates to `check_correction_patterns` and returns the first alert it
/// produced (patterns are evaluated in the order they are listed there).
// NOTE(review): any further alerts returned by `check_correction_patterns` are
// dropped here even though their counters have already been reset.
fn check_personality_improvement(agent_id: &str) -> Option<HeartbeatAlert> {
    let mut triggered = check_correction_patterns(agent_id);
    if triggered.is_empty() {
        None
    } else {
        Some(triggered.remove(0))
    }
}
/// Check for learning opportunities from recent conversations
///
/// Looks for correction counters of this agent that sit one step below the
/// personality-improvement threshold (exactly 2 recorded corrections) and, if
/// any exist, returns a low-urgency alert listing the affected correction
/// types in sorted order.
///
/// The correction type is recovered by stripping the `"<agent_id>:"` prefix
/// from the counter key, so agent ids or correction types that themselves
/// contain `:` are reported intact.
fn check_learning_opportunities(agent_id: &str) -> Option<HeartbeatAlert> {
    // Built once instead of once per counter.
    let prefix = format!("{}:", agent_id);
    let mut approaching_threshold: Vec<String> = match get_correction_counters().read() {
        Ok(counters) => counters
            .iter()
            .filter(|(_, count)| **count == 2)
            .filter_map(|(key, _)| key.strip_prefix(prefix.as_str()))
            .map(String::from)
            .collect(),
        // Poisoned lock: treat as "nothing to report".
        Err(_) => Vec::new(),
    };
    if approaching_threshold.is_empty() {
        return None;
    }
    // HashMap iteration order is arbitrary; sort for a stable message.
    approaching_threshold.sort();
    Some(HeartbeatAlert {
        title: "学习机会".to_string(),
        content: format!(
            "检测到用户可能有偏好调整倾向 ({}),继续观察将触发人格改进建议",
            approaching_threshold.join(", ")
        ),
        urgency: Urgency::Low,
        source: "learning-opportunities".to_string(),
        timestamp: chrono::Utc::now().to_rfc3339(),
    })
}
// === Tauri Commands ===
/// Heartbeat engine state for Tauri
///
/// Maps agent id to that agent's `HeartbeatEngine`; every command in this file
/// locks the map to look up its engine.
pub type HeartbeatEngineState = Arc<Mutex<HashMap<String, HeartbeatEngine>>>;
/// Initialize heartbeat engine for an agent
///
/// Restores persisted interaction time and config from VikingStorage so
/// idle-greeting check and config changes survive across app restarts.
///
/// A persisted config takes precedence over the `config` argument, but only
/// when it satisfies the same minimum-interval rule as the argument; an
/// invalid persisted config is ignored with a warning.
///
/// If an engine is already registered for `agent_id`, it is stopped after
/// being replaced so its background loop does not keep ticking alongside the
/// new engine.
///
/// # Errors
/// Returns an error when `config` is provided with `interval_minutes` below
/// one minute.
// @connected
#[tauri::command]
pub async fn heartbeat_init(
    app: AppHandle,
    agent_id: String,
    config: Option<HeartbeatConfig>,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<(), String> {
    // Store AppHandle globally for real-time alert emission
    if HEARTBEAT_APP_HANDLE.set(app).is_err() {
        tracing::warn!("[heartbeat] APP_HANDLE already set (multiple init calls)");
    }
    // P2-06: Validate minimum interval (prevent busy-loop)
    const MIN_INTERVAL_MINUTES: u64 = 1;
    if let Some(ref cfg) = config {
        if cfg.interval_minutes < MIN_INTERVAL_MINUTES {
            return Err(format!(
                "interval_minutes must be >= {} (got {})",
                MIN_INTERVAL_MINUTES, cfg.interval_minutes
            ));
        }
    }
    // Restore config from VikingStorage (overrides passed-in default), applying
    // the same interval rule so a bad persisted value cannot cause a busy-loop.
    let restored_config = restore_config_from_storage(&agent_id)
        .await
        .filter(|cfg| {
            let valid = cfg.interval_minutes >= MIN_INTERVAL_MINUTES;
            if !valid {
                tracing::warn!(
                    "[heartbeat] Ignoring persisted config for {}: interval_minutes {} is below {}",
                    agent_id, cfg.interval_minutes, MIN_INTERVAL_MINUTES
                );
            }
            valid
        })
        .or(config);
    let engine = HeartbeatEngine::new(agent_id.clone(), restored_config);
    // Restore last interaction time from VikingStorage metadata
    restore_last_interaction(&agent_id).await;
    // Restore heartbeat history from VikingStorage metadata
    engine.restore_history().await;
    // Swap the engine in, releasing the registry lock before stopping the old one.
    let previous = {
        let mut engines = state.lock().await;
        engines.insert(agent_id.clone(), engine)
    };
    if let Some(previous) = previous {
        previous.stop().await;
        tracing::info!("[heartbeat] Replaced existing engine for {}", agent_id);
    }
    Ok(())
}
/// Load the persisted `HeartbeatConfig` for `agent_id`, if there is one.
///
/// Reads the JSON stored under `heartbeat:config:<agent_id>`. Returns `None`
/// when nothing is stored, and also (after logging a warning) when storage is
/// unavailable, the read fails, or the JSON does not parse.
async fn restore_config_from_storage(agent_id: &str) -> Option<HeartbeatConfig> {
    let key = format!("heartbeat:config:{}", agent_id);

    let storage = match crate::viking_commands::get_storage().await {
        Ok(storage) => storage,
        Err(e) => {
            tracing::warn!("[heartbeat] Storage unavailable for config restore: {}", e);
            return None;
        }
    };

    let json = match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await {
        Ok(Some(json)) => json,
        Ok(None) => return None,
        Err(e) => {
            tracing::warn!("[heartbeat] Failed to read persisted config: {}", e);
            return None;
        }
    };

    match serde_json::from_str::<HeartbeatConfig>(&json) {
        Ok(cfg) => {
            tracing::info!("[heartbeat] Restored config for {}", agent_id);
            Some(cfg)
        }
        Err(e) => {
            tracing::warn!("[heartbeat] Failed to parse persisted config: {}", e);
            None
        }
    }
}
/// Load the persisted last-interaction timestamp for `agent_id` into memory.
///
/// Called during `heartbeat_init` so the idle-greeting check keeps working
/// after a restart. The value stored under
/// `heartbeat:last_interaction:<agent_id>` is inserted into the in-memory map
/// as-is. Missing data and failures are only logged.
pub async fn restore_last_interaction(agent_id: &str) {
    let key = format!("heartbeat:last_interaction:{}", agent_id);

    let storage = match crate::viking_commands::get_storage().await {
        Ok(storage) => storage,
        Err(e) => {
            tracing::warn!("[heartbeat] Storage unavailable during init: {}", e);
            return;
        }
    };

    let lookup = zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await;
    match lookup {
        Err(e) => {
            tracing::warn!("[heartbeat] Failed to restore interaction: {}", e);
        }
        Ok(None) => {
            tracing::debug!("[heartbeat] No persisted interaction for {}", agent_id);
        }
        Ok(Some(timestamp)) => {
            if let Ok(mut interactions) = get_last_interaction_map().write() {
                interactions.insert(agent_id.to_string(), timestamp);
            }
            tracing::info!("[heartbeat] Restored last interaction for {}", agent_id);
        }
    }
}
/// Tauri command: start the periodic heartbeat loop for `agent_id`.
///
/// # Errors
/// Fails when no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_start(
    agent_id: String,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<(), String> {
    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => {
            engine.start().await;
            Ok(())
        }
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: stop the periodic heartbeat loop for `agent_id`.
///
/// # Errors
/// Fails when no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_stop(
    agent_id: String,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<(), String> {
    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => {
            engine.stop().await;
            Ok(())
        }
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: run one heartbeat tick for `agent_id` and return its result.
///
/// # Errors
/// Fails when no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_tick(
    agent_id: String,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<HeartbeatResult, String> {
    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => Ok(engine.tick().await),
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: return the configuration currently used by the agent's engine.
///
/// # Errors
/// Fails when no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_get_config(
    agent_id: String,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<HeartbeatConfig, String> {
    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => Ok(engine.get_config().await),
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: replace the agent's heartbeat configuration.
///
/// # Errors
/// Fails when `interval_minutes` is below one minute (checked first), or when
/// no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_update_config(
    agent_id: String,
    config: HeartbeatConfig,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<(), String> {
    // P2-06: same minimum-interval rule as heartbeat_init.
    const MIN_INTERVAL_MINUTES: u64 = 1;
    let requested = config.interval_minutes;
    if requested < MIN_INTERVAL_MINUTES {
        return Err(format!(
            "interval_minutes must be >= {} (got {})",
            MIN_INTERVAL_MINUTES, requested
        ));
    }

    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => {
            engine.update_config(config).await;
            Ok(())
        }
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: return the agent's most recent tick results, newest first.
///
/// `limit` defaults to 20 when omitted.
///
/// # Errors
/// Fails when no engine has been initialized for the agent.
// @connected
#[tauri::command]
pub async fn heartbeat_get_history(
    agent_id: String,
    limit: Option<usize>,
    state: tauri::State<'_, HeartbeatEngineState>,
) -> Result<Vec<HeartbeatResult>, String> {
    let max_entries = limit.unwrap_or(20);
    let registry = state.lock().await;
    match registry.get(&agent_id) {
        Some(engine) => Ok(engine.get_history(max_entries).await),
        None => Err(format!("Heartbeat engine not initialized for agent: {}", agent_id)),
    }
}
/// Tauri command: refresh the cached memory stats used by heartbeat checks.
///
/// Meant to be called by the frontend after it has fetched memory stats.
/// Always succeeds.
// @connected
#[tauri::command]
pub async fn heartbeat_update_memory_stats(
    agent_id: String,
    task_count: usize,
    total_entries: usize,
    storage_size_bytes: usize,
) -> Result<(), String> {
    let agent = agent_id.as_str();
    update_memory_stats_cache(agent, task_count, total_entries, storage_size_bytes);
    Ok(())
}
/// Tauri command: count one user correction for personality improvement detection.
///
/// Always succeeds.
// @connected
#[tauri::command]
pub async fn heartbeat_record_correction(
    agent_id: String,
    correction_type: String,
) -> Result<(), String> {
    let (agent, kind) = (agent_id.as_str(), correction_type.as_str());
    record_user_correction(agent, kind);
    Ok(())
}
/// Tauri command: note that the user just interacted with `agent_id`.
///
/// Call this from the frontend whenever the user sends a message; it feeds the
/// idle-greeting check. Always succeeds.
// @connected
#[tauri::command]
pub async fn heartbeat_record_interaction(
    agent_id: String,
) -> Result<(), String> {
    let agent = agent_id.as_str();
    record_interaction(agent);
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// "HH:MM" strings map to minutes since midnight.
    #[test]
    fn test_parse_time() {
        let cases = [
            ("00:00", 0),
            ("08:00", 480),
            ("22:00", 1320),
            ("23:59", 1439),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_time_to_minutes(input), expected);
        }
    }

    /// The default config is enabled and ticks every 30 minutes.
    #[test]
    fn test_default_config() {
        let HeartbeatConfig {
            enabled,
            interval_minutes,
            ..
        } = HeartbeatConfig::default();
        assert!(enabled);
        assert_eq!(interval_minutes, 30);
    }
}