From 0a04b260a48f892cf993b177cc0fec1d6354c213 Mon Sep 17 00:00:00 2001 From: iven Date: Fri, 3 Apr 2026 00:24:16 +0800 Subject: [PATCH] refactor(desktop): ChatStore structured split + IDB persistence + stream cancel Split monolithic chatStore.ts (908 lines) into 4 focused stores: - chatStore.ts: facade layer, owns messages[], backward-compatible selectors - conversationStore.ts: conversation CRUD, agent switching, IndexedDB persistence - streamStore.ts: streaming orchestration, chat mode, suggestions - messageStore.ts: token tracking Key fixes from 3-round deep audit: - C1: Fix Rust serde camelCase vs TS snake_case mismatch (toolStart/toolEnd/iterationStart) - C2: Fix IDB async rehydration race with persist.hasHydrated() subscribe - C3: Add sessionKey to partialize to survive page refresh - H3: Fix IDB migration retry on failure (don't set migrated=true in catch) - M3: Fix ToolCallStep deduplication (toolStart creates, toolEnd updates) - M-NEW-2: Clear sessionKey on cancelStream Also adds: - Rust backend stream cancellation via AtomicBool + cancel_stream command - IndexedDB storage adapter with one-time localStorage migration - HMR cleanup for cross-store subscriptions --- desktop/src-tauri/src/kernel_commands/a2a.rs | 4 + .../src-tauri/src/kernel_commands/agent.rs | 7 + .../src-tauri/src/kernel_commands/approval.rs | 2 + desktop/src-tauri/src/kernel_commands/chat.rs | 85 +- desktop/src-tauri/src/kernel_commands/hand.rs | 8 + .../src/kernel_commands/lifecycle.rs | 4 + desktop/src-tauri/src/kernel_commands/mod.rs | 5 + .../src/kernel_commands/scheduled_task.rs | 2 + .../src-tauri/src/kernel_commands/skill.rs | 6 + .../src-tauri/src/kernel_commands/trigger.rs | 6 + desktop/src-tauri/src/lib.rs | 2 + desktop/src/lib/idb-storage.ts | 134 +++ desktop/src/lib/kernel-chat.ts | 19 +- desktop/src/lib/kernel-client.ts | 2 +- desktop/src/lib/kernel-types.ts | 6 +- desktop/src/store/agentStore.ts | 3 +- desktop/src/store/chat/conversationStore.ts | 42 +- desktop/src/store/chat/messageStore.ts | 98 ++ desktop/src/store/chat/streamStore.ts | 660 +++++++++++++ desktop/src/store/chatStore.ts | 931 ++++-------------- desktop/src/store/index.ts | 8 + .../2026-04-02-chatstore-refactor-design.md | 2 +- 22 files changed, 1269 insertions(+), 767 deletions(-) create mode 100644 desktop/src/lib/idb-storage.ts create mode 100644 desktop/src/store/chat/messageStore.ts create mode 100644 desktop/src/store/chat/streamStore.ts diff --git a/desktop/src-tauri/src/kernel_commands/a2a.rs b/desktop/src-tauri/src/kernel_commands/a2a.rs index 076487e..1b06898 100644 --- a/desktop/src-tauri/src/kernel_commands/a2a.rs +++ b/desktop/src-tauri/src/kernel_commands/a2a.rs @@ -12,6 +12,7 @@ use super::KernelState; #[cfg(feature = "multi-agent")] /// Send a direct A2A message from one agent to another +// @connected #[tauri::command] pub async fn agent_a2a_send( state: State<'_, KernelState>, @@ -44,6 +45,7 @@ pub async fn agent_a2a_send( /// Broadcast a message from one agent to all other agents #[cfg(feature = "multi-agent")] +// @connected #[tauri::command] pub async fn agent_a2a_broadcast( state: State<'_, KernelState>, @@ -65,6 +67,7 @@ pub async fn agent_a2a_broadcast( /// Discover agents with a specific capability #[cfg(feature = "multi-agent")] +// @connected #[tauri::command] pub async fn agent_a2a_discover( state: State<'_, KernelState>, @@ -86,6 +89,7 @@ pub async fn agent_a2a_discover( /// Delegate a task to another agent and wait for response #[cfg(feature = "multi-agent")] +// @connected #[tauri::command] pub async fn agent_a2a_delegate_task( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/kernel_commands/agent.rs b/desktop/src-tauri/src/kernel_commands/agent.rs index 248b6cb..130fd91 100644 --- a/desktop/src-tauri/src/kernel_commands/agent.rs +++ b/desktop/src-tauri/src/kernel_commands/agent.rs @@ -65,6 +65,7 @@ pub struct AgentUpdateRequest { // --------------------------------------------------------------------------- /// Create a new agent +// @connected #[tauri::command] pub async fn agent_create( state: State<'_, KernelState>, @@ -103,6 +104,7 @@ pub async fn agent_create( } /// List all agents +// @connected #[tauri::command] pub async fn agent_list( state: State<'_, KernelState>, @@ -116,6 +118,7 @@ pub async fn agent_list( } /// Get agent info +// @connected #[tauri::command] pub async fn agent_get( state: State<'_, KernelState>, @@ -135,6 +138,7 @@ pub async fn agent_get( } /// Delete an agent +// @connected #[tauri::command] pub async fn agent_delete( state: State<'_, KernelState>, @@ -156,6 +160,7 @@ pub async fn agent_delete( } /// Update an agent's configuration +// @connected #[tauri::command] pub async fn agent_update( state: State<'_, KernelState>, @@ -209,6 +214,7 @@ pub async fn agent_update( } /// Export an agent configuration as JSON +// @reserved: 暂无前端集成 #[tauri::command] pub async fn agent_export( state: State<'_, KernelState>, @@ -231,6 +237,7 @@ pub async fn agent_export( } /// Import an agent from JSON configuration +// @reserved: 暂无前端集成 #[tauri::command] pub async fn agent_import( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/kernel_commands/approval.rs b/desktop/src-tauri/src/kernel_commands/approval.rs index 0e702aa..6bda183 100644 --- a/desktop/src-tauri/src/kernel_commands/approval.rs +++ b/desktop/src-tauri/src/kernel_commands/approval.rs @@ -25,6 +25,7 @@ pub struct ApprovalResponse { } /// List pending approvals +// @reserved: 暂无前端集成 #[tauri::command] pub async fn approval_list( state: State<'_, KernelState>, @@ -48,6 +49,7 @@ pub async fn approval_list( /// When approved, the kernel's `respond_to_approval` internally spawns the Hand /// execution. We additionally emit Tauri events so the frontend can track when /// the execution finishes, since the kernel layer has no access to the AppHandle. +// @connected #[tauri::command] pub async fn approval_respond( app: AppHandle, diff --git a/desktop/src-tauri/src/kernel_commands/chat.rs b/desktop/src-tauri/src/kernel_commands/chat.rs index 49ba752..bb283ab 100644 --- a/desktop/src-tauri/src/kernel_commands/chat.rs +++ b/desktop/src-tauri/src/kernel_commands/chat.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use tauri::{AppHandle, Emitter, State}; use zclaw_types::AgentId; -use super::{validate_agent_id, KernelState, SessionStreamGuard}; +use super::{validate_agent_id, KernelState, SessionStreamGuard, StreamCancelFlags}; use crate::intelligence::validation::validate_string_length; // --------------------------------------------------------------------------- @@ -67,6 +67,7 @@ pub struct StreamChatRequest { // --------------------------------------------------------------------------- /// Send a message to an agent +// @connected #[tauri::command] pub async fn agent_chat( state: State<'_, KernelState>, @@ -99,6 +100,7 @@ pub async fn agent_chat( /// This command initiates a streaming chat session. Events are emitted /// via Tauri's event system with the name "stream:chunk" and include /// the session_id for routing. +// @connected #[tauri::command] pub async fn agent_chat_stream( app: AppHandle, @@ -107,6 +109,7 @@ pub async fn agent_chat_stream( heartbeat_state: State<'_, crate::intelligence::HeartbeatEngineState>, reflection_state: State<'_, crate::intelligence::ReflectionEngineState>, stream_guard: State<'_, SessionStreamGuard>, + cancel_flags: State<'_, StreamCancelFlags>, request: StreamChatRequest, ) -> Result<(), String> { validate_agent_id(&request.agent_id)?; @@ -136,6 +139,21 @@ pub async fn agent_chat_stream( return Err(format!("Session {} already has an active stream", session_id)); } + // Prepare cleanup resources for error paths (before spawn takes ownership) + let err_cleanup_guard = stream_guard.inner().clone(); + let err_cleanup_cancel = cancel_flags.inner().clone(); + let err_cleanup_session_id = session_id.clone(); + let err_cleanup_flag = Arc::clone(&*session_active); + + // Register cancellation flag for this session + let cancel_flag = cancel_flags + .entry(session_id.clone()) + .or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false))); + // Ensure flag is reset (in case of stale entry from a previous stream) + cancel_flag.store(false, std::sync::atomic::Ordering::SeqCst); + let cancel_clone = Arc::clone(&*cancel_flag); + let cancel_flags_map: StreamCancelFlags = cancel_flags.inner().clone(); + // AUTO-INIT HEARTBEAT { let mut engines = heartbeat_state.lock().await; @@ -160,7 +178,13 @@ pub async fn agent_chat_stream( let (mut rx, llm_driver) = { let kernel_lock = state.lock().await; let kernel = kernel_lock.as_ref() - .ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?; + .ok_or_else(|| { + // Cleanup on error: release guard + cancel flag + err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst); + err_cleanup_guard.remove(&err_cleanup_session_id); + err_cleanup_cancel.remove(&err_cleanup_session_id); + "Kernel not initialized. Call kernel_init first.".to_string() + })?; let driver = Some(kernel.driver()); @@ -172,6 +196,10 @@ pub async fn agent_chat_stream( match uuid::Uuid::parse_str(&session_id) { Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)), Err(e) => { + // Cleanup on error + err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst); + err_cleanup_guard.remove(&err_cleanup_session_id); + err_cleanup_cancel.remove(&err_cleanup_session_id); return Err(format!( "Invalid session_id '{}': {}. Cannot reuse conversation context.", session_id, e @@ -194,13 +222,22 @@ pub async fn agent_chat_stream( Some(chat_mode_config), ) .await - .map_err(|e| format!("Failed to start streaming: {}", e))?; + .map_err(|e| { + // Cleanup on error + err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst); + err_cleanup_guard.remove(&err_cleanup_session_id); + err_cleanup_cancel.remove(&err_cleanup_session_id); + format!("Failed to start streaming: {}", e) + })?; (rx, driver) }; let hb_state = heartbeat_state.inner().clone(); let rf_state = reflection_state.inner().clone(); + // Clone the guard map for cleanup in the spawned task + let guard_map: SessionStreamGuard = stream_guard.inner().clone(); + // Spawn a task to process stream events. // The session_active flag is cleared when task completes. let guard_clone = Arc::clone(&*session_active); @@ -212,6 +249,16 @@ pub async fn agent_chat_stream( let stream_timeout = tokio::time::Duration::from_secs(300); loop { + // Check cancellation flag before each recv + if cancel_clone.load(std::sync::atomic::Ordering::SeqCst) { + tracing::info!("[agent_chat_stream] Stream cancelled for session: {}", session_id); + let _ = app.emit("stream:chunk", serde_json::json!({ + "sessionId": session_id, + "event": StreamChatEvent::Error { message: "已取消".to_string() } + })); + break; + } + match tokio::time::timeout(stream_timeout, rx.recv()).await { Ok(Some(event)) => { let stream_event = match &event { @@ -300,9 +347,37 @@ pub async fn agent_chat_stream( tracing::debug!("[agent_chat_stream] Stream processing ended for session: {}", session_id); - // Release session lock - guard_clone.store(false, std::sync::atomic::Ordering::SeqCst); + // Release session lock and clean up DashMap entries to prevent memory leaks. + // Use compare_exchange to only remove if we still own the flag (guards against + // a new stream for the same session_id starting after we broke out of the loop). + if guard_clone.compare_exchange(true, false, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst).is_ok() { + guard_map.remove(&session_id); + } + // Clean up cancellation flag (always safe — cancel is session-scoped) + cancel_flags_map.remove(&session_id); }); Ok(()) } + +/// Cancel an active stream for a given session. +/// +/// Sets the cancellation flag for the session, which the streaming task +/// checks on each iteration. The task will then emit an error event +/// and clean up. +// @connected +#[tauri::command] +pub async fn cancel_stream( + cancel_flags: State<'_, StreamCancelFlags>, + session_id: String, +) -> Result<(), String> { + if let Some(flag) = cancel_flags.get(&session_id) { + flag.store(true, std::sync::atomic::Ordering::SeqCst); + tracing::info!("[cancel_stream] Cancel requested for session: {}", session_id); + Ok(()) + } else { + // No active stream for this session — not an error, just a no-op + tracing::debug!("[cancel_stream] No active stream for session: {}", session_id); + Ok(()) + } +} diff --git a/desktop/src-tauri/src/kernel_commands/hand.rs b/desktop/src-tauri/src/kernel_commands/hand.rs index e53f32d..82fdd93 100644 --- a/desktop/src-tauri/src/kernel_commands/hand.rs +++ b/desktop/src-tauri/src/kernel_commands/hand.rs @@ -110,6 +110,7 @@ impl From for HandResult { /// /// Returns hands from the Kernel's HandRegistry. /// Hands are registered during kernel initialization. +// @connected #[tauri::command] pub async fn hand_list( state: State<'_, KernelState>, @@ -128,6 +129,7 @@ pub async fn hand_list( /// Executes a hand with the given ID and input. /// If the hand has `needs_approval = true`, creates a pending approval instead. /// Returns the hand result as JSON, or a pending status with approval ID. +// @connected #[tauri::command] pub async fn hand_execute( state: State<'_, KernelState>, @@ -190,6 +192,7 @@ pub async fn hand_execute( /// When approved, the kernel's `respond_to_approval` internally spawns the Hand /// execution. We additionally emit Tauri events so the frontend can track when /// the execution finishes. +// @connected #[tauri::command] pub async fn hand_approve( app: AppHandle, @@ -292,6 +295,7 @@ pub async fn hand_approve( } /// Cancel a hand execution +// @connected #[tauri::command] pub async fn hand_cancel( state: State<'_, KernelState>, @@ -330,6 +334,7 @@ pub async fn hand_cancel( // ============================================================ /// Get detailed info for a single hand +// @connected #[tauri::command] pub async fn hand_get( state: State<'_, KernelState>, @@ -348,6 +353,7 @@ pub async fn hand_get( } /// Get status of a specific hand run +// @connected #[tauri::command] pub async fn hand_run_status( state: State<'_, KernelState>, @@ -375,6 +381,7 @@ pub async fn hand_run_status( } /// List run history for a hand (or all hands) +// @connected #[tauri::command] pub async fn hand_run_list( state: State<'_, KernelState>, @@ -409,6 +416,7 @@ pub async fn hand_run_list( } /// Cancel a running hand execution +// @reserved: 暂无前端集成 #[tauri::command] pub async fn hand_run_cancel( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/kernel_commands/lifecycle.rs b/desktop/src-tauri/src/kernel_commands/lifecycle.rs index c70bfee..0845396 100644 --- a/desktop/src-tauri/src/kernel_commands/lifecycle.rs +++ b/desktop/src-tauri/src/kernel_commands/lifecycle.rs @@ -54,6 +54,7 @@ pub struct KernelStatusResponse { /// /// If kernel already exists with the same config, returns existing status. /// If config changed, reboots kernel with new config. +// @connected #[tauri::command] pub async fn kernel_init( state: State<'_, KernelState>, @@ -202,6 +203,7 @@ pub async fn kernel_init( } /// Get kernel status +// @connected #[tauri::command] pub async fn kernel_status( state: State<'_, KernelState>, @@ -227,6 +229,7 @@ pub async fn kernel_status( } /// Shutdown the kernel +// @reserved: 暂无前端集成 #[tauri::command] pub async fn kernel_shutdown( state: State<'_, KernelState>, @@ -254,6 +257,7 @@ pub async fn kernel_shutdown( /// /// Writes relevant config values (agent, llm categories) to the TOML config file. /// The changes take effect on the next Kernel restart. +// @connected #[tauri::command] pub async fn kernel_apply_saas_config( configs: Vec, diff --git a/desktop/src-tauri/src/kernel_commands/mod.rs b/desktop/src-tauri/src/kernel_commands/mod.rs index 5220be0..2034661 100644 --- a/desktop/src-tauri/src/kernel_commands/mod.rs +++ b/desktop/src-tauri/src/kernel_commands/mod.rs @@ -36,6 +36,11 @@ pub type SchedulerState = Arc>>; +/// Per-session stream cancellation flags. +/// When a user cancels a stream, the flag for that session_id is set to `true`. +/// The spawned `agent_chat_stream` task checks this flag each iteration. +pub type StreamCancelFlags = Arc>>; + // --------------------------------------------------------------------------- // Shared validation helpers // --------------------------------------------------------------------------- diff --git a/desktop/src-tauri/src/kernel_commands/scheduled_task.rs b/desktop/src-tauri/src/kernel_commands/scheduled_task.rs index a94172a..e46cd96 100644 --- a/desktop/src-tauri/src/kernel_commands/scheduled_task.rs +++ b/desktop/src-tauri/src/kernel_commands/scheduled_task.rs @@ -47,6 +47,7 @@ pub struct ScheduledTaskResponse { /// /// Tasks are automatically executed by the SchedulerService which checks /// every 60 seconds for due triggers. +// @reserved: 暂无前端集成 #[tauri::command] pub async fn scheduled_task_create( state: State<'_, KernelState>, @@ -94,6 +95,7 @@ pub async fn scheduled_task_create( } /// List all scheduled tasks (kernel triggers of Schedule type) +// @connected #[tauri::command] pub async fn scheduled_task_list( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/kernel_commands/skill.rs b/desktop/src-tauri/src/kernel_commands/skill.rs index 7de59ae..0de7ba0 100644 --- a/desktop/src-tauri/src/kernel_commands/skill.rs +++ b/desktop/src-tauri/src/kernel_commands/skill.rs @@ -53,6 +53,7 @@ impl From for SkillInfoResponse { /// /// Returns skills from the Kernel's SkillRegistry. /// Skills are loaded from the skills/ directory during kernel initialization. +// @connected #[tauri::command] pub async fn skill_list( state: State<'_, KernelState>, @@ -74,6 +75,7 @@ pub async fn skill_list( /// /// Re-scans the skills directory for new or updated skills. /// Optionally accepts a custom directory path to scan. +// @connected #[tauri::command] pub async fn skill_refresh( state: State<'_, KernelState>, @@ -124,6 +126,7 @@ pub struct UpdateSkillRequest { } /// Create a new skill in the skills directory +// @connected #[tauri::command] pub async fn skill_create( state: State<'_, KernelState>, @@ -170,6 +173,7 @@ pub async fn skill_create( } /// Update an existing skill +// @connected #[tauri::command] pub async fn skill_update( state: State<'_, KernelState>, @@ -214,6 +218,7 @@ pub async fn skill_update( } /// Delete a skill +// @connected #[tauri::command] pub async fn skill_delete( state: State<'_, KernelState>, @@ -286,6 +291,7 @@ impl From for SkillResult { /// /// Executes a skill with the given ID and input. /// Returns the skill result as JSON. +// @connected #[tauri::command] pub async fn skill_execute( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/kernel_commands/trigger.rs b/desktop/src-tauri/src/kernel_commands/trigger.rs index 016cb4a..05eaf0e 100644 --- a/desktop/src-tauri/src/kernel_commands/trigger.rs +++ b/desktop/src-tauri/src/kernel_commands/trigger.rs @@ -96,6 +96,7 @@ impl From for TriggerResponse { } /// List all triggers +// @connected #[tauri::command] pub async fn trigger_list( state: State<'_, KernelState>, @@ -109,6 +110,7 @@ pub async fn trigger_list( } /// Get a specific trigger +// @connected #[tauri::command] pub async fn trigger_get( state: State<'_, KernelState>, @@ -125,6 +127,7 @@ pub async fn trigger_get( } /// Create a new trigger +// @connected #[tauri::command] pub async fn trigger_create( state: State<'_, KernelState>, @@ -179,6 +182,7 @@ pub async fn trigger_create( } /// Update a trigger +// @connected #[tauri::command] pub async fn trigger_update( state: State<'_, KernelState>, @@ -205,6 +209,7 @@ pub async fn trigger_update( } /// Delete a trigger +// @connected #[tauri::command] pub async fn trigger_delete( state: State<'_, KernelState>, @@ -222,6 +227,7 @@ pub async fn trigger_delete( } /// Execute a trigger manually +// @reserved: 暂无前端集成 #[tauri::command] pub async fn trigger_execute( state: State<'_, KernelState>, diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index ab59589..0ca710b 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -117,6 +117,7 @@ pub fn run() { .manage(kernel_state) .manage(scheduler_state) .manage(kernel_commands::SessionStreamGuard::default()) + .manage(kernel_commands::StreamCancelFlags::default()) .manage(pipeline_state) .manage(classroom_state) .manage(classroom_chat_state) @@ -136,6 +137,7 @@ pub fn run() { kernel_commands::agent::agent_import, kernel_commands::chat::agent_chat, kernel_commands::chat::agent_chat_stream, + kernel_commands::chat::cancel_stream, // Skills commands (dynamic discovery) kernel_commands::skill::skill_list, kernel_commands::skill::skill_refresh, diff --git a/desktop/src/lib/idb-storage.ts b/desktop/src/lib/idb-storage.ts new file mode 100644 index 0000000..44b5543 --- /dev/null +++ b/desktop/src/lib/idb-storage.ts @@ -0,0 +1,134 @@ +/** + * idb-storage.ts — Zustand-compatible async storage adapter using IndexedDB. + * + * Provides a drop-in replacement for localStorage that uses IndexedDB, + * bypassing the 5MB storage limit for conversation data. + * + * Includes one-time migration from localStorage for the conversation store. + */ + +import { openDB, type IDBPDatabase } from 'idb'; +import { createLogger } from './logger'; + +const log = createLogger('IDBStorage'); + +// --------------------------------------------------------------------------- +// IndexedDB schema +// --------------------------------------------------------------------------- + +const DB_NAME = 'zclaw-store'; +const DB_VERSION = 1; +const STORE_NAME = 'keyvalue'; + +// localStorage key that holds existing conversation data +const CONVERSATION_LS_KEY = 'zclaw-conversation-storage'; + +// --------------------------------------------------------------------------- +// Database singleton +// --------------------------------------------------------------------------- + +let dbPromise: Promise | null = null; + +function getDB(): Promise { + if (!dbPromise) { + dbPromise = openDB(DB_NAME, DB_VERSION, { + upgrade(db) { + if (!db.objectStoreNames.contains(STORE_NAME)) { + db.createObjectStore(STORE_NAME); + } + }, + }); + } + return dbPromise; +} + +// --------------------------------------------------------------------------- +// One-time migration from localStorage -> IndexedDB +// --------------------------------------------------------------------------- + +let migrated = false; + +async function migrateFromLocalStorage(): Promise { + if (migrated) return; + migrated = true; + + try { + const db = await getDB(); + // Check if IndexedDB already has data + const existing = await db.get(STORE_NAME, CONVERSATION_LS_KEY); + if (existing !== undefined) { + return; // Already migrated + } + + // Read from localStorage + const lsData = localStorage.getItem(CONVERSATION_LS_KEY); + if (!lsData) return; + + log.info('Migrating conversation data from localStorage to IndexedDB...'); + const parsed = JSON.parse(lsData); + + // Write to IndexedDB + await db.put(STORE_NAME, parsed, CONVERSATION_LS_KEY); + + // Delete from localStorage to free space + localStorage.removeItem(CONVERSATION_LS_KEY); + log.info('Migration complete. localStorage entry removed.'); + } catch (err) { + log.error('Migration from localStorage failed:', err); + // Allow retry on next load — don't leave `migrated = true` on failure + migrated = false; + } +} + +// --------------------------------------------------------------------------- +// Zustand-compatible storage adapter +// --------------------------------------------------------------------------- + +/** + * Create a Zustand persist storage adapter backed by IndexedDB. + * + * Usage: + * persist(store, { storage: createJSONStorage(() => createIdbStorageAdapter()) }) + */ +export function createIdbStorageAdapter() { + return { + getItem: async (name: string): Promise => { + // Perform migration on first access for conversation store key + if (name === CONVERSATION_LS_KEY) { + await migrateFromLocalStorage(); + } + + try { + const db = await getDB(); + const value = await db.get(STORE_NAME, name); + if (value === undefined) { + return null; + } + // Zustand persist expects a JSON string + return typeof value === 'string' ? value : JSON.stringify(value); + } catch (err) { + log.error('IndexedDB getItem failed:', err); + return null; + } + }, + + setItem: async (name: string, value: string): Promise => { + try { + const db = await getDB(); + const parsed = JSON.parse(value); + await db.put(STORE_NAME, parsed, name); + } catch (err) { + log.error('IndexedDB setItem failed:', err); + } + }, + + removeItem: async (name: string): Promise => { + try { + const db = await getDB(); + await db.delete(STORE_NAME, name); + } catch (err) { + log.error('IndexedDB removeItem failed:', err); + } + }, + }; +} diff --git a/desktop/src/lib/kernel-chat.ts b/desktop/src/lib/kernel-chat.ts index 41d22ed..33eb87f 100644 --- a/desktop/src/lib/kernel-chat.ts +++ b/desktop/src/lib/kernel-chat.ts @@ -109,7 +109,7 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo } break; - case 'tool_start': + case 'toolStart': log.debug('Tool started:', streamEvent.name, streamEvent.input); if (callbacks.onTool) { callbacks.onTool( @@ -120,7 +120,7 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo } break; - case 'tool_end': + case 'toolEnd': log.debug('Tool ended:', streamEvent.name, streamEvent.output); if (callbacks.onTool) { callbacks.onTool( @@ -145,7 +145,7 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo } break; - case 'iteration_start': + case 'iterationStart': log.debug('Iteration started:', streamEvent.iteration, '/', streamEvent.maxIterations); // Don't need to notify user about iterations break; @@ -201,10 +201,17 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo }; /** - * Cancel a stream (no-op for internal kernel) + * Cancel an active stream by session ID. + * Invokes the Rust `cancel_stream` command which sets the AtomicBool flag + * checked by the spawned streaming task each iteration. */ - proto.cancelStream = function (this: KernelClient, _runId: string): void { - // No-op: internal kernel doesn't support stream cancellation + proto.cancelStream = async function (this: KernelClient, sessionId: string): Promise { + try { + await invoke('cancel_stream', { sessionId }); + log.debug('Cancel stream requested for session:', sessionId); + } catch (err) { + log.warn('Failed to cancel stream:', err); + } }; // ─── Default Agent ─── diff --git a/desktop/src/lib/kernel-client.ts b/desktop/src/lib/kernel-client.ts index 9721cf8..72ac7f4 100644 --- a/desktop/src/lib/kernel-client.ts +++ b/desktop/src/lib/kernel-client.ts @@ -404,7 +404,7 @@ export interface KernelClient { // Chat (kernel-chat.ts) chat(message: string, opts?: { sessionKey?: string; agentId?: string }): Promise<{ runId: string; sessionId?: string; response?: string }>; chatStream(message: string, callbacks: import('./kernel-types').StreamCallbacks, opts?: { sessionKey?: string; agentId?: string; thinking_enabled?: boolean; reasoning_effort?: string; plan_mode?: boolean }): Promise<{ runId: string }>; - cancelStream(runId: string): void; + cancelStream(sessionId: string): Promise; fetchDefaultAgentId(): Promise; setDefaultAgentId(agentId: string): void; getDefaultAgentId(): string; diff --git a/desktop/src/lib/kernel-types.ts b/desktop/src/lib/kernel-types.ts index 009ff41..db4629b 100644 --- a/desktop/src/lib/kernel-types.ts +++ b/desktop/src/lib/kernel-types.ts @@ -78,19 +78,19 @@ export interface StreamEventThinkingDelta { } export interface StreamEventToolStart { - type: 'tool_start'; + type: 'toolStart'; name: string; input: unknown; } export interface StreamEventToolEnd { - type: 'tool_end'; + type: 'toolEnd'; name: string; output: unknown; } export interface StreamEventIterationStart { - type: 'iteration_start'; + type: 'iterationStart'; iteration: number; maxIterations: number; } diff --git a/desktop/src/store/agentStore.ts b/desktop/src/store/agentStore.ts index 9cc8def..099b889 100644 --- a/desktop/src/store/agentStore.ts +++ b/desktop/src/store/agentStore.ts @@ -8,6 +8,7 @@ import { create } from 'zustand'; import type { GatewayClient } from '../lib/gateway-client'; import type { AgentTemplateFull } from '../lib/saas-client'; import { useChatStore } from './chatStore'; +import { useConversationStore } from './chat/conversationStore'; // === Types === @@ -251,7 +252,7 @@ export const useAgentStore = create((set, get) => ({ loadUsageStats: async () => { try { - const { conversations } = useChatStore.getState(); + const { conversations } = useConversationStore.getState(); const tokenData = useChatStore.getState().getTotalTokens(); let totalMessages = 0; diff --git a/desktop/src/store/chat/conversationStore.ts b/desktop/src/store/chat/conversationStore.ts index 808f60e..c7e4faf 100644 --- a/desktop/src/store/chat/conversationStore.ts +++ b/desktop/src/store/chat/conversationStore.ts @@ -8,12 +8,10 @@ */ import { create } from 'zustand'; -import { persist } from 'zustand/middleware'; -import { generateRandomString } from '../lib/crypto-utils'; -import { createLogger } from '../lib/logger'; -import type { Message } from './chatStore'; - -const log = createLogger('ConversationStore'); +import { persist, createJSONStorage } from 'zustand/middleware'; +import { generateRandomString } from '../../lib/crypto-utils'; +import { createIdbStorageAdapter } from '../../lib/idb-storage'; +import type { ChatMessage } from '../../types/chat'; // --------------------------------------------------------------------------- // Types @@ -22,7 +20,7 @@ const log = createLogger('ConversationStore'); export interface Conversation { id: string; title: string; - messages: Message[]; + messages: ChatMessage[]; sessionKey: string | null; agentId: string | null; createdAt: Date; @@ -45,9 +43,6 @@ export interface AgentProfileLike { role?: string; } -// Re-export Message for internal use (avoids circular imports during migration) -export type { Message }; - // --------------------------------------------------------------------------- // State interface // --------------------------------------------------------------------------- @@ -61,10 +56,10 @@ export interface ConversationState { currentModel: string; // Actions - newConversation: (currentMessages: Message[]) => Conversation[]; - switchConversation: (id: string, currentMessages: Message[]) => { + newConversation: (currentMessages: ChatMessage[]) => Conversation[]; + switchConversation: (id: string, currentMessages: ChatMessage[]) => { conversations: Conversation[]; - messages: Message[]; + messages: ChatMessage[]; sessionKey: string | null; currentAgent: Agent; currentConversationId: string; @@ -74,10 +69,10 @@ export interface ConversationState { conversations: Conversation[]; resetMessages: boolean; }; - setCurrentAgent: (agent: Agent, currentMessages: Message[]) => { + setCurrentAgent: (agent: Agent, currentMessages: ChatMessage[]) => { conversations: Conversation[]; currentAgent: Agent; - messages: Message[]; + messages: ChatMessage[]; sessionKey: string | null; isStreaming: boolean; currentConversationId: string | null; @@ -87,7 +82,7 @@ export interface ConversationState { currentAgent: Agent; }; setCurrentModel: (model: string) => void; - upsertActiveConversation: (currentMessages: Message[]) => Conversation[]; + upsertActiveConversation: (currentMessages: ChatMessage[]) => Conversation[]; getCurrentConversationId: () => string | null; getCurrentAgent: () => Agent | null; getSessionKey: () => string | null; @@ -101,7 +96,7 @@ function generateConvId(): string { return `conv_${Date.now()}_${generateRandomString(4)}`; } -function deriveTitle(messages: Message[]): string { +function deriveTitle(messages: ChatMessage[]): string { const firstUser = messages.find(m => m.role === 'user'); if (firstUser) { const text = firstUser.content.trim(); @@ -155,7 +150,7 @@ export function resolveAgentForConversation(agentId: string | null, agents: Agen function upsertActiveConversation( conversations: Conversation[], - messages: Message[], + messages: ChatMessage[], sessionKey: string | null, currentConversationId: string | null, currentAgent: Agent | null, @@ -199,7 +194,7 @@ export const useConversationStore = create()( sessionKey: null, currentModel: 'glm-4-flash', - newConversation: (currentMessages: Message[]) => { + newConversation: (currentMessages: ChatMessage[]) => { const state = get(); const conversations = upsertActiveConversation( [...state.conversations], currentMessages, state.sessionKey, @@ -213,7 +208,7 @@ export const useConversationStore = create()( return conversations; }, - switchConversation: (id: string, currentMessages: Message[]) => { + switchConversation: (id: string, currentMessages: ChatMessage[]) => { const state = get(); const conversations = upsertActiveConversation( [...state.conversations], currentMessages, state.sessionKey, @@ -251,7 +246,7 @@ export const useConversationStore = create()( return { conversations, resetMessages }; }, - setCurrentAgent: (agent: Agent, currentMessages: Message[]) => { + setCurrentAgent: (agent: Agent, currentMessages: ChatMessage[]) => { const state = get(); if (state.currentAgent?.id === agent.id) { set({ currentAgent: agent }); @@ -328,7 +323,7 @@ export const useConversationStore = create()( setCurrentModel: (model: string) => set({ currentModel: model }), - upsertActiveConversation: (currentMessages: Message[]) => { + upsertActiveConversation: (currentMessages: ChatMessage[]) => { const state = get(); const conversations = upsertActiveConversation( [...state.conversations], currentMessages, state.sessionKey, @@ -344,11 +339,12 @@ export const useConversationStore = create()( }), { name: 'zclaw-conversation-storage', + storage: createJSONStorage(() => createIdbStorageAdapter()), partialize: (state) => ({ conversations: state.conversations, currentModel: state.currentModel, - currentAgentId: state.currentAgent?.id, currentConversationId: state.currentConversationId, + sessionKey: state.sessionKey, }), onRehydrateStorage: () => (state) => { if (state?.conversations) { diff --git a/desktop/src/store/chat/messageStore.ts b/desktop/src/store/chat/messageStore.ts new file mode 100644 index 0000000..a2d61bc --- /dev/null +++ b/desktop/src/store/chat/messageStore.ts @@ -0,0 +1,98 @@ +/** + * MessageStore — manages token tracking and subtask mutations. + * + * Extracted from chatStore.ts as part of the structured refactor (Phase 3). + * + * Design note: The `messages[]` array stays in chatStore because + * `sendMessage` and `initStreamListener` use Zustand's `set((s) => ...)` + * pattern for high-frequency streaming deltas (dozens of updates per second). + * Moving messages out would force every streaming callback through + * `getState().updateMessage()` — adding overhead and breaking the + * producer-writes, consumer-reads separation that Zustand excels at. + * + * This store owns: + * - Token usage counters (totalInputTokens, totalOutputTokens) + * - Subtask mutation helpers (addSubtask, updateSubtask) + * + * Messages are read from chatStore by consumers that need them. + * + * @see docs/superpowers/specs/2026-04-02-chatstore-refactor-design.md §3.3 + */ + +import { create } from 'zustand'; +import type { Subtask } from '../../components/ai'; + +// --------------------------------------------------------------------------- +// State interface +// --------------------------------------------------------------------------- + +export interface MessageState { + totalInputTokens: number; + totalOutputTokens: number; + + // Token tracking + addTokenUsage: (inputTokens: number, outputTokens: number) => void; + getTotalTokens: () => { input: number; output: number; total: number }; + resetTokenUsage: () => void; + + // Subtask mutations (delegated to chatStore internally) + addSubtask: (messageId: string, task: Subtask) => void; + updateSubtask: (messageId: string, taskId: string, updates: Partial) => void; +} + +// --------------------------------------------------------------------------- +// Internal reference to chatStore for message mutations +// --------------------------------------------------------------------------- +let _chatStore: { + getState: () => { + addSubtask: (messageId: string, task: Subtask) => void; + updateSubtask: (messageId: string, taskId: string, updates: Partial) => void; + }; +} | null = null; + +/** + * Inject chatStore reference for subtask delegation. + * Called by chatStore during initialization to avoid circular imports. + */ +export function setMessageStoreChatStore(store: typeof _chatStore): void { + _chatStore = store; +} + +// --------------------------------------------------------------------------- +// Store +// --------------------------------------------------------------------------- + +export const useMessageStore = create()((set, get) => ({ + totalInputTokens: 0, + totalOutputTokens: 0, + + addTokenUsage: (inputTokens: number, outputTokens: number) => + set((state) => ({ + totalInputTokens: state.totalInputTokens + inputTokens, + totalOutputTokens: state.totalOutputTokens + outputTokens, + })), + + getTotalTokens: () => { + const { totalInputTokens, totalOutputTokens } = get(); + return { + input: totalInputTokens, + output: totalOutputTokens, + total: totalInputTokens + totalOutputTokens, + }; + }, + + resetTokenUsage: () => + set({ totalInputTokens: 0, totalOutputTokens: 0 }), + + addSubtask: (messageId: string, task: Subtask) => { + if (_chatStore) { + _chatStore.getState().addSubtask(messageId, task); + } + }, + + updateSubtask: (messageId: string, taskId: string, updates: Partial) => { + if (_chatStore) { + _chatStore.getState().updateSubtask(messageId, taskId, updates); + } + }, +})); diff --git a/desktop/src/store/chat/streamStore.ts b/desktop/src/store/chat/streamStore.ts new file mode 100644 index 0000000..7e79a96 --- /dev/null +++ b/desktop/src/store/chat/streamStore.ts @@ -0,0 +1,660 @@ +/** + * StreamStore — manages streaming orchestration, chat mode, and suggestions. + * + * Extracted from chatStore.ts as part of the structured refactor (Phase 4). + * Responsible for: + * - Stream lifecycle (sendMessage, initStreamListener) + * - Chat mode state (chatMode, setChatMode, getChatModeConfig) + * - Follow-up suggestions + * - Skill search + * + * Design: streamStore holds its own `isStreaming` state and delegates + * message mutations to chatStore via an injected reference. This avoids + * circular imports while keeping high-frequency streaming updates + * (dozens of set() calls per second) on a single Zustand store. + */ + +import { create } from 'zustand'; +import { persist } from 'zustand/middleware'; +import type { AgentStreamDelta } from '../../lib/gateway-client'; +import { getClient } from '../../store/connectionStore'; +import { intelligenceClient } from '../../lib/intelligence-client'; +import { getMemoryExtractor } from '../../lib/memory-extractor'; +import { getSkillDiscovery } from '../../lib/skill-discovery'; +import { useOfflineStore, isOffline } from '../../store/offlineStore'; +import { useConnectionStore } from '../../store/connectionStore'; +import { createLogger } from '../../lib/logger'; +import { speechSynth } from '../../lib/speech-synth'; +import { generateRandomString } from '../../lib/crypto-utils'; +import type { ChatModeType, ChatModeConfig, Subtask } from '../../components/ai'; +import type { ToolCallStep } from '../../components/ai'; +import { CHAT_MODES } from '../../components/ai'; +import { + useConversationStore, + resolveGatewayAgentId, +} from './conversationStore'; +import { useMessageStore } from './messageStore'; + +const log = createLogger('StreamStore'); + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** Message shape used internally by streamStore for typed callbacks. */ +interface StreamMsg { + id: string; + role: 'user' | 'assistant' | 'tool' | 'hand' | 'workflow' | 'system'; + content: string; + timestamp: Date; + streaming?: boolean; + optimistic?: boolean; + runId?: string; + error?: string; + thinkingContent?: string; + toolSteps?: ToolCallStep[]; + handName?: string; + handStatus?: string; + handResult?: unknown; + workflowId?: string; + workflowStep?: string; + workflowStatus?: string; + workflowResult?: unknown; + subtasks?: Subtask[]; +} + +/** Shape of the chatStore methods needed by streamStore. */ +interface ChatStoreAccess { + addMessage: (msg: StreamMsg) => void; + updateMessages: (updater: (msgs: StreamMsg[]) => StreamMsg[]) => void; + getMessages: () => StreamMsg[]; + setChatStoreState: (partial: Record) => void; +} + +export interface StreamState { + isStreaming: boolean; + isLoading: boolean; + chatMode: ChatModeType; + suggestions: string[]; + /** Run ID of the currently active stream (null when idle). */ + activeRunId: string | null; + + // Core streaming + sendMessage: (content: string) => Promise; + initStreamListener: () => () => void; + /** Cancel the active stream: resets state, marks message cancelled, saves conversation. */ + cancelStream: () => void; + + // Chat mode + setChatMode: (mode: ChatModeType) => void; + getChatModeConfig: () => ChatModeConfig; + + // Suggestions + setSuggestions: (suggestions: string[]) => void; + + // Skill search + searchSkills: (query: string) => { + results: Array<{ id: string; name: string; description: string }>; + totalAvailable: number; + }; + + // Loading + setIsLoading: (loading: boolean) => void; +} + +// --------------------------------------------------------------------------- +// Follow-up suggestion generator +// --------------------------------------------------------------------------- + +function generateFollowUpSuggestions(content: string): string[] { + const suggestions: string[] = []; + const lower = content.toLowerCase(); + + const patterns: Array<{ keywords: string[]; suggestion: string }> = [ + { keywords: ['代码', 'code', 'function', '函数', '实现'], suggestion: '解释这段代码的工作原理' }, + { keywords: ['错误', 'error', 'bug', '问题'], suggestion: '如何调试这个问题?' }, + { keywords: ['数据', 'data', '分析', '统计'], suggestion: '可视化这些数据' }, + { keywords: ['步骤', 'step', '流程', '方案'], suggestion: '详细说明第一步该怎么做' }, + { keywords: ['可以', '建议', '推荐', '试试'], suggestion: '还有其他方案吗?' }, + { keywords: ['文件', 'file', '保存', '写入'], suggestion: '查看生成的文件内容' }, + { keywords: ['搜索', 'search', '查找', 'research'], suggestion: '搜索更多相关信息' }, + ]; + + for (const { keywords, suggestion } of patterns) { + if (keywords.some(kw => lower.includes(kw))) { + if (!suggestions.includes(suggestion)) { + suggestions.push(suggestion); + } + } + if (suggestions.length >= 3) break; + } + + const generic = ['继续深入分析', '换个角度看看', '用简单的话解释']; + while (suggestions.length < 3) { + const next = generic.find(g => !suggestions.includes(g)); + if (next) suggestions.push(next); + else break; + } + + return suggestions; +} + +// --------------------------------------------------------------------------- +// ChatStore injection (avoids circular imports) +// --------------------------------------------------------------------------- + +let _chat: ChatStoreAccess | null = null; + +/** + * Inject chatStore access for message mutations. + * Called by chatStore after creation. + */ +export function injectChatStore(access: ChatStoreAccess): void { + _chat = access; +} + +// --------------------------------------------------------------------------- +// Store +// --------------------------------------------------------------------------- + +export const useStreamStore = create()( + persist( + (set, get) => ({ + isStreaming: false, + isLoading: false, + chatMode: 'thinking' as ChatModeType, + suggestions: [], + activeRunId: null as string | null, + + // ── Chat Mode ── + + setChatMode: (mode: ChatModeType) => set({ chatMode: mode }), + + getChatModeConfig: () => CHAT_MODES[get().chatMode].config, + + setSuggestions: (suggestions: string[]) => set({ suggestions }), + + setIsLoading: (loading: boolean) => set({ isLoading: loading }), + + // ── Skill Search ── + + searchSkills: (query: string) => { + const discovery = getSkillDiscovery(); + const result = discovery.searchSkills(query); + return { + results: result.results.map(s => ({ id: s.id, name: s.name, description: s.description })), + totalAvailable: result.totalAvailable, + }; + }, + + // ── Core: sendMessage ── + + sendMessage: async (content: string) => { + if (get().isStreaming) return; + if (!_chat) { + log.warn('sendMessage called before chatStore injection'); + return; + } + + const convStore = useConversationStore.getState(); + const currentAgent = convStore.currentAgent; + const sessionKey = convStore.sessionKey; + + set({ suggestions: [] }); + const effectiveSessionKey = sessionKey || crypto.randomUUID(); + const effectiveAgentId = resolveGatewayAgentId(currentAgent); + const agentId = currentAgent?.id || 'zclaw-main'; + + // Offline path + if (isOffline()) { + const { queueMessage } = useOfflineStore.getState(); + const queueId = queueMessage(content, effectiveAgentId, effectiveSessionKey); + log.debug(`Offline - message queued: ${queueId}`); + + _chat.addMessage({ + id: `system_${Date.now()}`, + role: 'system', + content: `后端服务不可用,消息已保存到本地队列。恢复连接后将自动发送。`, + timestamp: new Date(), + }); + + _chat.addMessage({ + id: `user_${Date.now()}`, + role: 'user', + content, + timestamp: new Date(), + }); + return; + } + + const streamStartTime = Date.now(); + _chat.addMessage({ + id: `user_${streamStartTime}`, + role: 'user', + content, + timestamp: new Date(streamStartTime), + optimistic: true, + }); + + const assistantId = `assistant_${Date.now()}`; + _chat.addMessage({ + id: assistantId, + role: 'assistant', + content: '', + timestamp: new Date(), + streaming: true, + }); + set({ isStreaming: true, activeRunId: null }); + + try { + const client = getClient(); + const connectionState = useConnectionStore.getState().connectionState; + + if (connectionState !== 'connected') { + throw new Error(`Not connected (state: ${connectionState})`); + } + + let runId = `run_${Date.now()}`; + + if (!useConversationStore.getState().sessionKey) { + useConversationStore.setState({ sessionKey: effectiveSessionKey }); + } + + const result = await client.chatStream( + content, + { + onDelta: (delta: string) => { + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { ...m, content: m.content + delta } + : m + ) + ); + }, + onThinkingDelta: (delta: string) => { + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { ...m, thinkingContent: (m.thinkingContent || '') + delta } + : m + ) + ); + }, + onTool: (tool: string, input: string, output: string) => { + if (output) { + // toolEnd: find the last running step for this tool and complete it + _chat?.updateMessages(msgs => + msgs.map(m => { + if (m.id !== assistantId) return m; + const steps = [...(m.toolSteps || [])]; + for (let i = steps.length - 1; i >= 0; i--) { + if (steps[i].toolName === tool && steps[i].status === 'running') { + steps[i] = { ...steps[i], output, status: 'completed' as const }; + break; + } + } + return { ...m, toolSteps: steps }; + }) + ); + } else { + // toolStart: create new running step + const step: ToolCallStep = { + id: `step_${Date.now()}_${generateRandomString(4)}`, + toolName: tool, + input, + output: '', + status: 'running', + timestamp: new Date(), + }; + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { ...m, toolSteps: [...(m.toolSteps || []), step] } + : m + ) + ); + } + }, + onHand: (name: string, status: string, result?: unknown) => { + const handMsg: StreamMsg = { + id: `hand_${Date.now()}_${generateRandomString(4)}`, + role: 'hand', + content: result + ? (typeof result === 'string' ? result : JSON.stringify(result, null, 2)) + : `Hand: ${name} - ${status}`, + timestamp: new Date(), + runId, + handName: name, + handStatus: status, + handResult: result, + }; + _chat?.updateMessages(msgs => [...msgs, handMsg]); + + if (name === 'speech' && status === 'completed' && result && typeof result === 'object') { + const res = result as Record; + if (res.tts_method === 'browser' && typeof res.text === 'string' && res.text) { + speechSynth.speak({ + text: res.text as string, + voice: (res.voice as string) || undefined, + language: (res.language as string) || undefined, + rate: typeof res.rate === 'number' ? res.rate : undefined, + pitch: typeof res.pitch === 'number' ? res.pitch : undefined, + volume: typeof res.volume === 'number' ? res.volume : undefined, + }).catch((err: unknown) => { + const logger = createLogger('speech-synth'); + logger.warn('Browser TTS failed', { error: String(err) }); + }); + } + } + }, + onComplete: (inputTokens?: number, outputTokens?: number) => { + const currentMsgs = _chat?.getMessages(); + + if (currentMsgs) { + useConversationStore.getState().upsertActiveConversation(currentMsgs); + } + + _chat?.updateMessages(msgs => + msgs.map(m => { + if (m.id === assistantId) { + return { ...m, streaming: false, runId }; + } + if (m.optimistic) { + return { ...m, optimistic: false }; + } + return m; + }) + ); + set({ isStreaming: false, activeRunId: null }); + + if (inputTokens !== undefined && outputTokens !== undefined) { + useMessageStore.getState().addTokenUsage(inputTokens, outputTokens); + _chat?.setChatStoreState({ + totalInputTokens: useMessageStore.getState().totalInputTokens, + totalOutputTokens: useMessageStore.getState().totalOutputTokens, + }); + } + + // Async memory extraction + const msgs = _chat?.getMessages() || []; + const filtered = msgs + .filter(m => m.role === 'user' || m.role === 'assistant') + .map(m => ({ role: m.role, content: m.content })); + const convId = useConversationStore.getState().currentConversationId; + getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined).catch(err => { + log.warn('Memory extraction failed:', err); + }); + intelligenceClient.reflection.recordConversation().catch(err => { + log.warn('Recording conversation failed:', err); + }); + intelligenceClient.reflection.shouldReflect().then(shouldReflect => { + if (shouldReflect) { + intelligenceClient.reflection.reflect(agentId, []).catch(err => { + log.warn('Reflection failed:', err); + }); + } + }); + + // Follow-up suggestions + const latestMsgs = _chat?.getMessages() || []; + const completedMsg = latestMsgs.find(m => m.id === assistantId); + if (completedMsg?.content) { + const suggestions = generateFollowUpSuggestions(completedMsg.content); + if (suggestions.length > 0) { + get().setSuggestions(suggestions); + } + } + }, + onError: (error: string) => { + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { ...m, content: `⚠️ ${error}`, streaming: false, error } + : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime + ? { ...m, optimistic: false } + : m + ) + ); + set({ isStreaming: false, activeRunId: null }); + }, + }, + { + sessionKey: effectiveSessionKey, + agentId: effectiveAgentId, + thinking_enabled: get().getChatModeConfig().thinking_enabled, + reasoning_effort: get().getChatModeConfig().reasoning_effort, + plan_mode: get().getChatModeConfig().plan_mode, + } + ); + + if (result?.runId) { + runId = result.runId; + set({ activeRunId: runId }); + } + + if (!sessionKey) { + useConversationStore.setState({ sessionKey: effectiveSessionKey }); + } + + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId ? { ...m, runId } : m + ) + ); + } catch (err: unknown) { + const errorMessage = err instanceof Error ? err.message : '无法连接 Gateway'; + _chat?.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { + ...m, + content: `⚠️ ${errorMessage}`, + streaming: false, + error: errorMessage, + } + : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime + ? { ...m, optimistic: false } + : m + ) + ); + set({ isStreaming: false, activeRunId: null }); + } + }, + + // ── Cancel Stream ── + + cancelStream: () => { + if (!_chat) return; + const { activeRunId, isStreaming } = get(); + if (!isStreaming) return; + + // 1. Tell backend to abort — use sessionKey (which is the sessionId in Tauri) + try { + const client = getClient(); + if ('cancelStream' in client) { + const sessionId = useConversationStore.getState().sessionKey || activeRunId || ''; + (client as { cancelStream: (id: string) => void }).cancelStream(sessionId); + } + } catch { + // Backend cancel is best-effort; proceed with local cleanup + } + + // 2. Mark the streaming message as cancelled + _chat.updateMessages(msgs => + msgs.map(m => { + if (m.streaming) { + return { + ...m, + streaming: false, + error: m.content ? undefined : '已取消', + }; + } + if (m.optimistic) { + return { ...m, optimistic: false }; + } + return m; + }) + ); + + // 3. Immediately persist the conversation + const currentMsgs = _chat.getMessages(); + if (currentMsgs) { + useConversationStore.getState().upsertActiveConversation(currentMsgs); + } + + // 4. Reset streaming state and clear sessionKey so next send gets a fresh session + set({ isStreaming: false, activeRunId: null }); + useConversationStore.setState({ sessionKey: null }); + log.info('Stream cancelled by user'); + }, + + // ── Agent Stream Listener ── + + initStreamListener: () => { + const client = getClient(); + + if (!('onAgentStream' in client)) { + return () => {}; + } + + const unsubscribe = client.onAgentStream((delta: AgentStreamDelta) => { + const msgs = _chat?.getMessages() || []; + + const streamingMsg = [...msgs] + .reverse() + .find(m => ( + m.role === 'assistant' + && m.streaming + && ( + (delta.runId && m.runId === delta.runId) + || (!delta.runId && m.runId === null) + ) + )) + || [...msgs] + .reverse() + .find(m => m.role === 'assistant' && m.streaming); + + if (!streamingMsg) return; + + if (delta.stream === 'assistant' && (delta.delta || delta.content)) { + _chat?.updateMessages(ms => + ms.map(m => + m.id === streamingMsg.id + ? { ...m, content: m.content + (delta.delta || delta.content || '') } + : m + ) + ); + } else if (delta.stream === 'tool') { + if (delta.toolOutput) { + // toolEnd: find the last running step for this tool and complete it + _chat?.updateMessages(ms => + ms.map(m => { + if (m.id !== streamingMsg.id) return m; + const steps = [...(m.toolSteps || [])]; + for (let i = steps.length - 1; i >= 0; i--) { + if (steps[i].toolName === (delta.tool || 'unknown') && steps[i].status === 'running') { + steps[i] = { ...steps[i], output: delta.toolOutput, status: 'completed' as const }; + break; + } + } + return { ...m, toolSteps: steps }; + }) + ); + } else { + // toolStart: create new running step + const step: ToolCallStep = { + id: `step_${Date.now()}_${generateRandomString(4)}`, + toolName: delta.tool || 'unknown', + input: delta.toolInput, + output: '', + status: 'running', + timestamp: new Date(), + }; + _chat?.updateMessages(ms => + ms.map(m => + m.id === streamingMsg.id + ? { ...m, toolSteps: [...(m.toolSteps || []), step] } + : m + ) + ); + } + } else if (delta.stream === 'lifecycle') { + if (delta.phase === 'end' || delta.phase === 'error') { + if (delta.phase === 'end') { + const currentMsgs = _chat?.getMessages(); + if (currentMsgs) { + useConversationStore.getState().upsertActiveConversation(currentMsgs); + } + } + _chat?.updateMessages(ms => + ms.map(m => { + if (m.id === streamingMsg.id) { + return { + ...m, + streaming: false, + error: delta.phase === 'error' ? delta.error : undefined, + }; + } + if (m.optimistic) { + return { ...m, optimistic: false }; + } + return m; + }) + ); + set({ isStreaming: false, activeRunId: null }); + + if (delta.phase === 'end') { + const latestMsgs = _chat?.getMessages() || []; + const completedMsg = latestMsgs.find(m => m.id === streamingMsg.id); + if (completedMsg?.content) { + const suggestions = generateFollowUpSuggestions(completedMsg.content); + if (suggestions.length > 0) { + get().setSuggestions(suggestions); + } + } + } + } + } else if (delta.stream === 'hand') { + const handMsg: StreamMsg = { + id: `hand_${Date.now()}_${generateRandomString(4)}`, + role: 'hand', + content: delta.handResult + ? (typeof delta.handResult === 'string' ? delta.handResult : JSON.stringify(delta.handResult, null, 2)) + : `Hand: ${delta.handName || 'unknown'} - ${delta.handStatus || 'triggered'}`, + timestamp: new Date(), + runId: delta.runId, + handName: delta.handName, + handStatus: delta.handStatus, + handResult: delta.handResult, + }; + _chat?.updateMessages(ms => [...ms, handMsg]); + } else if (delta.stream === 'workflow') { + const workflowMsg: StreamMsg = { + id: `workflow_${Date.now()}_${generateRandomString(4)}`, + role: 'workflow', + content: delta.workflowResult + ? (typeof delta.workflowResult === 'string' ? delta.workflowResult : JSON.stringify(delta.workflowResult, null, 2)) + : `Workflow: ${delta.workflowId || 'unknown'} step ${delta.workflowStep || '?'} - ${delta.workflowStatus || 'running'}`, + timestamp: new Date(), + runId: delta.runId, + workflowId: delta.workflowId, + workflowStep: delta.workflowStep, + workflowStatus: delta.workflowStatus, + workflowResult: delta.workflowResult, + }; + _chat?.updateMessages(ms => [...ms, workflowMsg]); + } + }); + + return unsubscribe; + }, +}), + { + name: 'zclaw-stream-storage', + partialize: (state) => ({ + chatMode: state.chatMode, + }), + }, + ), +); diff --git a/desktop/src/store/chatStore.ts b/desktop/src/store/chatStore.ts index 06b7182..a9deb66 100644 --- a/desktop/src/store/chatStore.ts +++ b/desktop/src/store/chatStore.ts @@ -1,20 +1,32 @@ import { create } from 'zustand'; import { persist } from 'zustand/middleware'; -import type { AgentStreamDelta } from '../lib/gateway-client'; -import { getClient } from './connectionStore'; -import { intelligenceClient } from '../lib/intelligence-client'; -import { getMemoryExtractor } from '../lib/memory-extractor'; -import { getSkillDiscovery } from '../lib/skill-discovery'; -import { useOfflineStore, isOffline } from './offlineStore'; -import { useConnectionStore } from './connectionStore'; -import { createLogger } from '../lib/logger'; -import { speechSynth } from '../lib/speech-synth'; -import { generateRandomString } from '../lib/crypto-utils'; import type { ChatModeType, ChatModeConfig, Subtask } from '../components/ai'; import type { ToolCallStep } from '../components/ai'; -import { CHAT_MODES } from '../components/ai'; +import { + useConversationStore, + resolveGatewayAgentId, + toChatAgent, + DEFAULT_AGENT, + type Agent, + type AgentProfileLike, + type Conversation, +} from './chat/conversationStore'; +import { useMessageStore, setMessageStoreChatStore } from './chat/messageStore'; +import { useStreamStore, injectChatStore } from './chat/streamStore'; -const log = createLogger('ChatStore'); +// --------------------------------------------------------------------------- +// Re-export for backward compatibility +// --------------------------------------------------------------------------- + +export type { Agent, AgentProfileLike, Conversation }; +export { toChatAgent, DEFAULT_AGENT, resolveGatewayAgentId }; +export { useConversationStore } from './chat/conversationStore'; +export { useMessageStore } from './chat/messageStore'; +export { useStreamStore } from './chat/streamStore'; + +// --------------------------------------------------------------------------- +// Message types +// --------------------------------------------------------------------------- export interface MessageFile { name: string; @@ -40,68 +52,32 @@ export interface Message { toolInput?: string; toolOutput?: string; error?: string; - // Hand event fields handName?: string; handStatus?: string; handResult?: unknown; - // Workflow event fields workflowId?: string; workflowStep?: string; workflowStatus?: string; workflowResult?: unknown; - // Output files and code blocks files?: MessageFile[]; codeBlocks?: CodeBlock[]; - // AI Enhancement fields (DeerFlow-inspired) - thinkingContent?: string; // Extended thinking/reasoning content - subtasks?: Subtask[]; // Sub-agent task tracking - toolSteps?: ToolCallStep[]; // Tool call steps chain (DeerFlow-inspired) - // Optimistic message flag (Phase 4: DeerFlow-inspired 3-phase optimistic rendering) - optimistic?: boolean; // true = awaiting server confirmation, false/undefined = confirmed + thinkingContent?: string; + subtasks?: Subtask[]; + toolSteps?: ToolCallStep[]; + optimistic?: boolean; } -export interface Conversation { - id: string; - title: string; - messages: Message[]; - sessionKey: string | null; - agentId: string | null; - createdAt: Date; - updatedAt: Date; -} - -export interface Agent { - id: string; - name: string; - icon: string; - color: string; - lastMessage: string; - time: string; -} - -export interface AgentProfileLike { - id: string; - name: string; - nickname?: string; - role?: string; -} +// --------------------------------------------------------------------------- +// ChatState — messages + facade delegation +// --------------------------------------------------------------------------- interface ChatState { messages: Message[]; - conversations: Conversation[]; - currentConversationId: string | null; - agents: Agent[]; - currentAgent: Agent | null; isStreaming: boolean; isLoading: boolean; - currentModel: string; - sessionKey: string | null; - // Token usage tracking totalInputTokens: number; totalOutputTokens: number; - // Chat mode (DeerFlow-inspired) chatMode: ChatModeType; - // Follow-up suggestions suggestions: string[]; addMessage: (message: Message) => void; @@ -112,13 +88,13 @@ interface ChatState { setCurrentModel: (model: string) => void; sendMessage: (content: string) => Promise; initStreamListener: () => () => void; + cancelStream: () => void; newConversation: () => void; switchConversation: (id: string) => void; deleteConversation: (id: string) => void; addTokenUsage: (inputTokens: number, outputTokens: number) => void; getTotalTokens: () => { input: number; output: number; total: number }; searchSkills: (query: string) => { results: Array<{ id: string; name: string; description: string }>; totalAvailable: number }; - // Chat mode and suggestions (DeerFlow-inspired) setChatMode: (mode: ChatModeType) => void; getChatModeConfig: () => ChatModeConfig; setSuggestions: (suggestions: string[]) => void; @@ -126,142 +102,18 @@ interface ChatState { updateSubtask: (messageId: string, taskId: string, updates: Partial) => void; } -function generateConvId(): string { - return `conv_${Date.now()}_${generateRandomString(4)}`; -} - -function deriveTitle(messages: Message[]): string { - const firstUser = messages.find(m => m.role === 'user'); - if (firstUser) { - const text = firstUser.content.trim(); - return text.length > 30 ? text.slice(0, 30) + '...' : text; - } - return '新对话'; -} - -const DEFAULT_AGENT: Agent = { - id: '1', - name: 'ZCLAW', - icon: '🦞', - color: 'bg-gradient-to-br from-orange-500 to-red-500', - lastMessage: '发送消息开始对话', - time: '', -}; - -export function toChatAgent(profile: AgentProfileLike): Agent { - return { - id: profile.id, - name: profile.name, - icon: profile.nickname?.slice(0, 1) || '🦞', - color: 'bg-gradient-to-br from-orange-500 to-red-500', - lastMessage: profile.role || '新分身', - time: '', - }; -} - -function resolveConversationAgentId(agent: Agent | null): string | null { - if (!agent || agent.id === DEFAULT_AGENT.id) { - return null; - } - return agent.id; -} - -function resolveGatewayAgentId(agent: Agent | null): string | undefined { - if (!agent || agent.id === DEFAULT_AGENT.id || agent.id.startsWith('clone_')) { - return undefined; - } - return agent.id; -} - -function resolveAgentForConversation(agentId: string | null, agents: Agent[]): Agent { - if (!agentId) { - return DEFAULT_AGENT; - } - return agents.find((agent) => agent.id === agentId) || DEFAULT_AGENT; -} - -function upsertActiveConversation( - conversations: Conversation[], - state: Pick -): Conversation[] { - if (state.messages.length === 0) { - return conversations; - } - - const currentId = state.currentConversationId || generateConvId(); - const existingIdx = conversations.findIndex((conversation) => conversation.id === currentId); - const nextConversation: Conversation = { - id: currentId, - title: deriveTitle(state.messages), - messages: [...state.messages], - sessionKey: state.sessionKey, - agentId: resolveConversationAgentId(state.currentAgent), - createdAt: existingIdx >= 0 ? conversations[existingIdx].createdAt : new Date(), - updatedAt: new Date(), - }; - - if (existingIdx >= 0) { - conversations[existingIdx] = nextConversation; - return conversations; - } - - return [nextConversation, ...conversations]; -} - -/** - * Generate follow-up suggestions based on assistant response content. - * Uses keyword heuristics to suggest contextually relevant follow-ups. - */ -function generateFollowUpSuggestions(content: string): string[] { - const suggestions: string[] = []; - const lower = content.toLowerCase(); - - const patterns: Array<{ keywords: string[]; suggestion: string }> = [ - { keywords: ['代码', 'code', 'function', '函数', '实现'], suggestion: '解释这段代码的工作原理' }, - { keywords: ['错误', 'error', 'bug', '问题'], suggestion: '如何调试这个问题?' }, - { keywords: ['数据', 'data', '分析', '统计'], suggestion: '可视化这些数据' }, - { keywords: ['步骤', 'step', '流程', '方案'], suggestion: '详细说明第一步该怎么做' }, - { keywords: ['可以', '建议', '推荐', '试试'], suggestion: '还有其他方案吗?' }, - { keywords: ['文件', 'file', '保存', '写入'], suggestion: '查看生成的文件内容' }, - { keywords: ['搜索', 'search', '查找', 'research'], suggestion: '搜索更多相关信息' }, - ]; - - for (const { keywords, suggestion } of patterns) { - if (keywords.some(kw => lower.includes(kw))) { - if (!suggestions.includes(suggestion)) { - suggestions.push(suggestion); - } - } - if (suggestions.length >= 3) break; - } - - // Always add a generic follow-up if we have fewer than 3 - const generic = ['继续深入分析', '换个角度看看', '用简单的话解释']; - while (suggestions.length < 3) { - const next = generic.find(g => !suggestions.includes(g)); - if (next) suggestions.push(next); - else break; - } - - return suggestions; -} - export const useChatStore = create()( persist( (set, get) => ({ messages: [], - conversations: [], - currentConversationId: null, - agents: [DEFAULT_AGENT], - currentAgent: DEFAULT_AGENT, + // Mirrors from streamStore for backward compat selectors isStreaming: false, isLoading: false, - currentModel: 'glm-4-flash', - sessionKey: null, - totalInputTokens: 0, - totalOutputTokens: 0, chatMode: 'thinking' as ChatModeType, suggestions: [], + totalInputTokens: 0, + totalOutputTokens: 0, + addMessage: (message: Message) => set((state) => ({ messages: [...state.messages, message] })), @@ -274,423 +126,91 @@ export const useChatStore = create()( setIsLoading: (loading) => set({ isLoading: loading }), - setCurrentAgent: (agent) => - set((state) => { - if (state.currentAgent?.id === agent.id) { - return { currentAgent: agent }; - } - - // Save current conversation before switching - const conversations = upsertActiveConversation([...state.conversations], state); - - // Try to find existing conversation for this agent - // DEFAULT_AGENT conversations are stored with agentId: null (via resolveConversationAgentId), - // so we need to match both the agent's ID and null for default agent lookups. - const agentConversation = conversations.find(c => - c.agentId === agent.id || - (agent.id === DEFAULT_AGENT.id && c.agentId === null) - ); - - if (agentConversation) { - // Restore the agent's previous conversation - return { - conversations, - currentAgent: agent, - messages: [...agentConversation.messages], - sessionKey: agentConversation.sessionKey, - isStreaming: false, - currentConversationId: agentConversation.id, - }; - } - - // No existing conversation, start fresh - return { - conversations, - currentAgent: agent, - messages: [], - sessionKey: null, - isStreaming: false, - currentConversationId: null, - }; - }), - - syncAgents: (profiles) => - set((state) => { - const cloneAgents = profiles.length > 0 ? profiles.map(toChatAgent) : []; - // Always include DEFAULT_AGENT so users can switch back to default conversations - const agents = cloneAgents.length > 0 - ? [DEFAULT_AGENT, ...cloneAgents] - : [DEFAULT_AGENT]; - const currentAgent = state.currentConversationId - ? resolveAgentForConversation( - state.conversations.find((conversation) => conversation.id === state.currentConversationId)?.agentId || null, - agents - ) - : state.currentAgent - ? agents.find((agent) => agent.id === state.currentAgent?.id) || agents[0] - : agents[0]; - - // Safety net: if rehydration failed to restore messages (onRehydrateStorage - // direct mutation doesn't trigger re-renders), restore them here via set(). - let messages = state.messages; - let sessionKey = state.sessionKey; - if (messages.length === 0 && state.currentConversationId && state.conversations.length > 0) { - const conv = state.conversations.find(c => c.id === state.currentConversationId); - if (conv && conv.messages.length > 0) { - messages = conv.messages.map(m => ({ ...m })); - sessionKey = conv.sessionKey; - } - } - - return { agents, currentAgent, messages, sessionKey }; - }), - - setCurrentModel: (model) => set({ currentModel: model }), - - newConversation: () => { - const state = get(); - const conversations = upsertActiveConversation([...state.conversations], state); + // ── Facade: conversation coordination ── + setCurrentAgent: (agent: Agent) => { + const messages = get().messages; + const result = useConversationStore.getState().setCurrentAgent(agent, messages); set({ - conversations, - messages: [], - sessionKey: null, + messages: result.messages as Message[], isStreaming: false, - currentConversationId: null, }); }, - switchConversation: (id: string) => { - const state = get(); - const conversations = upsertActiveConversation([...state.conversations], state); + syncAgents: (profiles: AgentProfileLike[]) => { + useConversationStore.getState().syncAgents(profiles); + const convStore = useConversationStore.getState(); + let messages = get().messages; + if (messages.length === 0 && convStore.currentConversationId && convStore.conversations.length > 0) { + const conv = convStore.conversations.find(c => c.id === convStore.currentConversationId); + if (conv && conv.messages.length > 0) { + messages = conv.messages.map(m => ({ ...m })) as Message[]; + } + } + set({ messages }); + }, - const target = conversations.find(c => c.id === id); - if (target) { - set({ - conversations, - messages: [...target.messages], - sessionKey: target.sessionKey, - currentAgent: resolveAgentForConversation(target.agentId, state.agents), - currentConversationId: target.id, - isStreaming: false, - }); + setCurrentModel: (model: string) => { + useConversationStore.getState().setCurrentModel(model); + }, + + newConversation: () => { + const messages = get().messages; + useConversationStore.getState().newConversation(messages); + set({ messages: [], isStreaming: false }); + }, + + switchConversation: (id: string) => { + const messages = get().messages; + const result = useConversationStore.getState().switchConversation(id, messages); + if (result) { + set({ messages: result.messages as Message[], isStreaming: false }); } }, deleteConversation: (id: string) => { - const state = get(); - const conversations = state.conversations.filter(c => c.id !== id); - if (state.currentConversationId === id) { - set({ conversations, messages: [], sessionKey: null, currentConversationId: null, isStreaming: false }); - } else { - set({ conversations }); + const convStore = useConversationStore.getState(); + const result = convStore.deleteConversation(id, convStore.currentConversationId); + if (result.resetMessages) { + set({ messages: [], isStreaming: false }); } }, - sendMessage: async (content: string) => { - // Concurrency guard: prevent rapid double-click bypassing UI-level isStreaming check. - // React re-render is async — two clicks within the same frame both read isStreaming=false. - if (get().isStreaming) return; + // ── Token tracking — delegated to messageStore ── - const { addMessage, currentAgent, sessionKey } = get(); - // Clear stale suggestions when user sends a new message - set({ suggestions: [] }); - const effectiveSessionKey = sessionKey || crypto.randomUUID(); - const effectiveAgentId = resolveGatewayAgentId(currentAgent); - const agentId = currentAgent?.id || 'zclaw-main'; - - // Check if offline - queue message instead of sending - if (isOffline()) { - const { queueMessage } = useOfflineStore.getState(); - const queueId = queueMessage(content, effectiveAgentId, effectiveSessionKey); - log.debug(`Offline - message queued: ${queueId}`); - - // Show a system message about offline queueing - const systemMsg: Message = { - id: `system_${Date.now()}`, - role: 'system', - content: `后端服务不可用,消息已保存到本地队列。恢复连接后将自动发送。`, - timestamp: new Date(), - }; - addMessage(systemMsg); - - // Add user message for display - const userMsg: Message = { - id: `user_${Date.now()}`, - role: 'user', - content, - timestamp: new Date(), - }; - addMessage(userMsg); - return; - } - - // Context compaction is handled by the kernel (AgentLoop with_compaction_threshold). - // Frontend no longer performs duplicate compaction — see crates/zclaw-runtime/src/compaction.rs. - // Memory context injection is handled by backend MemoryMiddleware (before_completion), - // which injects relevant memories into the system prompt. Frontend must NOT duplicate - // this by embedding old conversation memories into the user message content — that causes - // context leaking (old conversations appearing in new chat thinking/output). - - // Add user message (original content for display) - // Mark as optimistic -- will be cleared when server confirms via onComplete - const streamStartTime = Date.now(); - const userMsg: Message = { - id: `user_${streamStartTime}`, - role: 'user', - content, - timestamp: new Date(streamStartTime), - optimistic: true, - }; - addMessage(userMsg); - - // Create placeholder assistant message for streaming - const assistantId = `assistant_${Date.now()}`; - const assistantMsg: Message = { - id: assistantId, - role: 'assistant', - content: '', - timestamp: new Date(), - streaming: true, - }; - addMessage(assistantMsg); - set({ isStreaming: true }); - - try { - // Use the connected client from connectionStore (supports both GatewayClient and KernelClient) - const client = getClient(); - - // Check connection state first - const connectionState = useConnectionStore.getState().connectionState; - - if (connectionState !== 'connected') { - // Connection lost during send - update error - throw new Error(`Not connected (state: ${connectionState})`); - } - - // Declare runId before chatStream so callbacks can access it - let runId = `run_${Date.now()}`; - - // F5: Persist sessionKey before starting stream to survive page reload mid-stream - if (!get().sessionKey) { - set({ sessionKey: effectiveSessionKey }); - } - - // Try streaming first (ZCLAW WebSocket) - const result = await client.chatStream( - content, - { - onDelta: (delta: string) => { - // Update message content directly (works for both KernelClient and GatewayClient) - set((s) => ({ - messages: s.messages.map((m) => - m.id === assistantId - ? { ...m, content: m.content + delta } - : m - ), - })); - }, - onThinkingDelta: (delta: string) => { - set((s) => ({ - messages: s.messages.map((m) => - m.id === assistantId - ? { ...m, thinkingContent: (m.thinkingContent || '') + delta } - : m - ), - })); - }, - onTool: (tool: string, input: string, output: string) => { - const step: ToolCallStep = { - id: `step_${Date.now()}_${generateRandomString(4)}`, - toolName: tool, - input, - output, - status: output ? 'completed' : 'running', - timestamp: new Date(), - }; - // Add step to the streaming assistant message's toolSteps - set((s) => ({ - messages: s.messages.map((m) => - m.id === assistantId - ? { ...m, toolSteps: [...(m.toolSteps || []), step] } - : m - ), - })); - }, - onHand: (name: string, status: string, result?: unknown) => { - const handMsg: Message = { - id: `hand_${Date.now()}_${generateRandomString(4)}`, - role: 'hand', - content: result - ? (typeof result === 'string' ? result : JSON.stringify(result, null, 2)) - : `Hand: ${name} - ${status}`, - timestamp: new Date(), - runId, - handName: name, - handStatus: status, - handResult: result, - }; - set((state) => ({ messages: [...state.messages, handMsg] })); - - // Trigger browser TTS when SpeechHand completes with browser method - if (name === 'speech' && status === 'completed' && result && typeof result === 'object') { - const res = result as Record; - if (res.tts_method === 'browser' && typeof res.text === 'string' && res.text) { - speechSynth.speak({ - text: res.text as string, - voice: (res.voice as string) || undefined, - language: (res.language as string) || undefined, - rate: typeof res.rate === 'number' ? res.rate : undefined, - pitch: typeof res.pitch === 'number' ? res.pitch : undefined, - volume: typeof res.volume === 'number' ? res.volume : undefined, - }).catch((err: unknown) => { - const logger = createLogger('speech-synth'); - logger.warn('Browser TTS failed', { error: String(err) }); - }); - } - } - }, - onComplete: (inputTokens?: number, outputTokens?: number) => { - const state = get(); - - // Save conversation to persist across refresh - const conversations = upsertActiveConversation([...state.conversations], state); - const currentConvId = state.currentConversationId || conversations[0]?.id; - - set({ - isStreaming: false, - conversations, - currentConversationId: currentConvId, - messages: state.messages.map((m) => { - if (m.id === assistantId) { - return { ...m, streaming: false, runId }; - } - // Clear optimistic flag on user messages (server confirmed) - if (m.optimistic) { - return { ...m, optimistic: false }; - } - return m; - }), - }); - - // Track token usage if provided (KernelClient provides these) - if (inputTokens !== undefined && outputTokens !== undefined) { - get().addTokenUsage(inputTokens, outputTokens); - } - - // Async memory extraction after stream completes - const msgs = get().messages - .filter(m => m.role === 'user' || m.role === 'assistant') - .map(m => ({ role: m.role, content: m.content })); - getMemoryExtractor().extractFromConversation(msgs, agentId, get().currentConversationId ?? undefined).catch(err => { - log.warn('Memory extraction failed:', err); - }); - // Track conversation for reflection trigger - intelligenceClient.reflection.recordConversation().catch(err => { - log.warn('Recording conversation failed:', err); - }); - intelligenceClient.reflection.shouldReflect().then(shouldReflect => { - if (shouldReflect) { - intelligenceClient.reflection.reflect(agentId, []).catch(err => { - log.warn('Reflection failed:', err); - }); - } - }); - - // Generate follow-up suggestions (DeerFlow-inspired) - const assistantMsg = get().messages.find(m => m.id === assistantId); - if (assistantMsg?.content) { - const content = assistantMsg.content; - const suggestions = generateFollowUpSuggestions(content); - if (suggestions.length > 0) { - get().setSuggestions(suggestions); - } - } - }, - onError: (error: string) => { - set((state) => ({ - isStreaming: false, - messages: state.messages.map((m) => - m.id === assistantId - ? { ...m, content: `⚠️ ${error}`, streaming: false, error } - : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime - ? { ...m, optimistic: false } - : m - ), - })); - }, - }, - { - sessionKey: effectiveSessionKey, - agentId: effectiveAgentId, - thinking_enabled: get().getChatModeConfig().thinking_enabled, - reasoning_effort: get().getChatModeConfig().reasoning_effort, - plan_mode: get().getChatModeConfig().plan_mode, - } - ); - - // Update runId from the result if available - if (result?.runId) { - runId = result.runId; - } - - if (!sessionKey) { - set({ sessionKey: effectiveSessionKey }); - } - - // Store runId on the message for correlation - set((state) => ({ - messages: state.messages.map((m) => - m.id === assistantId ? { ...m, runId } : m - ), - })); - } catch (err: unknown) { - // Gateway not connected — show error in the assistant bubble - const errorMessage = err instanceof Error ? err.message : '无法连接 Gateway'; - set((state) => ({ - isStreaming: false, - messages: state.messages.map((m) => - m.id === assistantId - ? { - ...m, - content: `⚠️ ${errorMessage}`, - streaming: false, - error: errorMessage, - } - : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime - ? { ...m, optimistic: false } - : m - ), - })); - } - }, - - addTokenUsage: (inputTokens: number, outputTokens: number) => + addTokenUsage: (inputTokens: number, outputTokens: number) => { + useMessageStore.getState().addTokenUsage(inputTokens, outputTokens); set((state) => ({ totalInputTokens: state.totalInputTokens + inputTokens, totalOutputTokens: state.totalOutputTokens + outputTokens, - })), - - getTotalTokens: () => { - const { totalInputTokens, totalOutputTokens } = get(); - return { input: totalInputTokens, output: totalOutputTokens, total: totalInputTokens + totalOutputTokens }; + })); }, - searchSkills: (query: string) => { - const discovery = getSkillDiscovery(); - const result = discovery.searchSkills(query); - return { - results: result.results.map(s => ({ id: s.id, name: s.name, description: s.description })), - totalAvailable: result.totalAvailable, - }; + getTotalTokens: () => useMessageStore.getState().getTotalTokens(), + + // ── Streaming — delegated to streamStore ── + + sendMessage: (content: string) => useStreamStore.getState().sendMessage(content), + + initStreamListener: () => useStreamStore.getState().initStreamListener(), + + cancelStream: () => useStreamStore.getState().cancelStream(), + + searchSkills: (query: string) => useStreamStore.getState().searchSkills(query), + + setChatMode: (mode: ChatModeType) => { + useStreamStore.getState().setChatMode(mode); + set({ chatMode: mode }); }, - // Chat mode (DeerFlow-inspired) - setChatMode: (mode: ChatModeType) => set({ chatMode: mode }), + getChatModeConfig: () => useStreamStore.getState().getChatModeConfig(), - getChatModeConfig: () => CHAT_MODES[get().chatMode].config, + setSuggestions: (suggestions: string[]) => { + useStreamStore.getState().setSuggestions(suggestions); + set({ suggestions }); + }, - setSuggestions: (suggestions: string[]) => set({ suggestions }), + // ── Subtask mutations (message-level) ── addSubtask: (messageId: string, task: Subtask) => set((state) => ({ @@ -714,171 +234,128 @@ export const useChatStore = create()( : m ), })), - - initStreamListener: () => { - const client = getClient(); - - // Check if client supports onAgentStream (GatewayClient does, KernelClient doesn't) - if (!('onAgentStream' in client)) { - // KernelClient handles streaming via chatStream callbacks, no separate listener needed - return () => {}; - } - - const unsubscribe = client.onAgentStream((delta: AgentStreamDelta) => { - const state = get(); - - const streamingMsg = [...state.messages] - .reverse() - .find((m) => ( - m.role === 'assistant' - && m.streaming - && ( - (delta.runId && m.runId === delta.runId) - || (!delta.runId && m.runId === null) - ) - )) - || [...state.messages] - .reverse() - .find((m) => m.role === 'assistant' && m.streaming); - - if (!streamingMsg) return; - - if (delta.stream === 'assistant' && (delta.delta || delta.content)) { - set((s) => ({ - messages: s.messages.map((m) => - m.id === streamingMsg.id - ? { ...m, content: m.content + (delta.delta || delta.content || '') } - : m - ), - })); - } else if (delta.stream === 'tool') { - // Add tool step to the streaming assistant message (DeerFlow-inspired steps chain) - const step: ToolCallStep = { - id: `step_${Date.now()}_${generateRandomString(4)}`, - toolName: delta.tool || 'unknown', - input: delta.toolInput, - output: delta.toolOutput, - status: delta.toolOutput ? 'completed' : 'running', - timestamp: new Date(), - }; - set((s) => ({ - messages: s.messages.map((m) => - m.id === streamingMsg.id - ? { ...m, toolSteps: [...(m.toolSteps || []), step] } - : m - ), - })); - } else if (delta.stream === 'lifecycle') { - if (delta.phase === 'end' || delta.phase === 'error') { - set((s) => ({ - isStreaming: false, - messages: s.messages.map((m) => { - if (m.id === streamingMsg.id) { - return { - ...m, - streaming: false, - error: delta.phase === 'error' ? delta.error : undefined, - }; - } - // Clear optimistic flag on user messages (server confirmed) - if (m.optimistic) { - return { ...m, optimistic: false }; - } - return m; - }), - })); - // Generate follow-up suggestions on stream end - if (delta.phase === 'end') { - const completedMsg = get().messages.find(m => m.id === streamingMsg.id); - if (completedMsg?.content) { - const suggestions = generateFollowUpSuggestions(completedMsg.content); - if (suggestions.length > 0) { - get().setSuggestions(suggestions); - } - } - } - } - } else if (delta.stream === 'hand') { - // Handle Hand trigger events from ZCLAW - const handMsg: Message = { - id: `hand_${Date.now()}_${generateRandomString(4)}`, - role: 'hand', - content: delta.handResult - ? (typeof delta.handResult === 'string' ? delta.handResult : JSON.stringify(delta.handResult, null, 2)) - : `Hand: ${delta.handName || 'unknown'} - ${delta.handStatus || 'triggered'}`, - timestamp: new Date(), - runId: delta.runId, - handName: delta.handName, - handStatus: delta.handStatus, - handResult: delta.handResult, - }; - set((s) => ({ messages: [...s.messages, handMsg] })); - } else if (delta.stream === 'workflow') { - // Handle Workflow execution events from ZCLAW - const workflowMsg: Message = { - id: `workflow_${Date.now()}_${generateRandomString(4)}`, - role: 'workflow', - content: delta.workflowResult - ? (typeof delta.workflowResult === 'string' ? delta.workflowResult : JSON.stringify(delta.workflowResult, null, 2)) - : `Workflow: ${delta.workflowId || 'unknown'} step ${delta.workflowStep || '?'} - ${delta.workflowStatus || 'running'}`, - timestamp: new Date(), - runId: delta.runId, - workflowId: delta.workflowId, - workflowStep: delta.workflowStep, - workflowStatus: delta.workflowStatus, - workflowResult: delta.workflowResult, - }; - set((s) => ({ messages: [...s.messages, workflowMsg] })); - } - }); - - return unsubscribe; - }, }), { name: 'zclaw-chat-storage', - partialize: (state) => ({ - conversations: state.conversations, - currentModel: state.currentModel, - currentAgentId: state.currentAgent?.id, - currentConversationId: state.currentConversationId, - chatMode: state.chatMode, + partialize: (_state) => ({ + // chatMode is persisted in streamStore — nothing else to persist here. + // Keeping the persist wrapper for onRehydrateStorage lifecycle. }), - onRehydrateStorage: () => (state) => { - // Rehydrate Date objects from JSON strings - if (state?.conversations) { - for (const conv of state.conversations) { - conv.createdAt = new Date(conv.createdAt); - conv.updatedAt = new Date(conv.updatedAt); - for (const msg of conv.messages) { - msg.timestamp = new Date(msg.timestamp); - msg.streaming = false; // Never restore streaming state - msg.optimistic = false; // Never restore optimistic flag (server already confirmed) + onRehydrateStorage: () => { + // Wait for conversationStore to finish IndexedDB rehydration + // before syncing messages. IndexedDB is async and won't be ready + // in a fixed setTimeout. + let done = false; + + function syncMessages() { + if (done) return; + done = true; + const convStore = useConversationStore.getState(); + if (convStore.currentConversationId && convStore.conversations.length > 0) { + const conv = convStore.conversations.find(c => c.id === convStore.currentConversationId); + if (conv && conv.messages.length > 0) { + useChatStore.setState({ + messages: conv.messages.map(m => ({ ...m })) as Message[], + }); } } } - // Restore messages from current conversation via setState() to properly - // trigger subscriber re-renders. Direct mutation (state.messages = ...) - // does NOT notify zustand subscribers, leaving the UI stuck on []. - // Safe to reference useChatStore here because zustand persist runs - // hydration via setTimeout(1ms), so the store is fully created by - // the time this callback executes. - if (state?.currentConversationId && state.conversations) { - const currentConv = state.conversations.find(c => c.id === state.currentConversationId); - if (currentConv && currentConv.messages.length > 0) { - const messages = currentConv.messages.map(m => ({ ...m })); - const sessionKey = currentConv.sessionKey; - useChatStore.setState({ messages, sessionKey }); - } + // If conversationStore already hydrated (fast path), sync immediately + if (useConversationStore.persist.hasHydrated()) { + syncMessages(); + return; } + + // Otherwise subscribe and wait for hydration to complete + const unsub = useConversationStore.subscribe(() => { + if (useConversationStore.persist.hasHydrated()) { + unsub(); + syncMessages(); + } + }); + + // Safety timeout: if IDB is broken/slow, give up after 5s + setTimeout(() => { + if (!done) { + unsub(); + syncMessages(); + } + }, 5000); + + // NOTE: Do NOT return a cleanup function here. + // chatStore rehydrates from localStorage (fast), and the returned + // function fires immediately — before conversationStore finishes + // its slow IndexedDB rehydration. Returning cleanup would tear down + // the subscription prematurely. }, }, ), ); -// Dev-only: Expose chatStore to window for E2E testing +// --------------------------------------------------------------------------- +// Cross-store wiring +// --------------------------------------------------------------------------- + +// 1. Inject chatStore into messageStore for subtask delegation +setMessageStoreChatStore({ + getState: () => ({ + addSubtask: useChatStore.getState().addSubtask, + updateSubtask: useChatStore.getState().updateSubtask, + }), +}); + +// 2. Inject chatStore into streamStore for message mutations +injectChatStore({ + addMessage: (msg) => useChatStore.getState().addMessage(msg as Message), + updateMessages: (updater) => { + const msgs = useChatStore.getState().messages as unknown[]; + const updated = updater(msgs as Parameters[0]); + useChatStore.setState({ messages: updated as Message[] }); + }, + getMessages: () => useChatStore.getState().messages, + setChatStoreState: (partial) => useChatStore.setState(partial as Partial), +}); + +// 3. Sync streamStore state to chatStore mirrors +const unsubStream = useStreamStore.subscribe((state) => { + const chat = useChatStore.getState(); + const updates: Partial = {}; + if (chat.isStreaming !== state.isStreaming) updates.isStreaming = state.isStreaming; + if (chat.isLoading !== state.isLoading) updates.isLoading = state.isLoading; + if (chat.chatMode !== state.chatMode) updates.chatMode = state.chatMode; + if (chat.suggestions !== state.suggestions) updates.suggestions = state.suggestions; + if (Object.keys(updates).length > 0) { + useChatStore.setState(updates); + } +}); + +// 4. Sync messageStore tokens to chatStore mirror +const unsubTokens = useMessageStore.subscribe((state) => { + const chat = useChatStore.getState(); + if ( + chat.totalInputTokens !== state.totalInputTokens || + chat.totalOutputTokens !== state.totalOutputTokens + ) { + useChatStore.setState({ + totalInputTokens: state.totalInputTokens, + totalOutputTokens: state.totalOutputTokens, + }); + } +}); + +// HMR cleanup: unsubscribe on module hot-reload to prevent duplicate listeners +if (import.meta.hot) { + import.meta.hot.dispose(() => { + unsubStream(); + unsubTokens(); + }); +} + +// Dev-only: Expose stores to window for E2E testing if (import.meta.env.DEV && typeof window !== 'undefined') { (window as any).__ZCLAW_STORES__ = (window as any).__ZCLAW_STORES__ || {}; (window as any).__ZCLAW_STORES__.chat = useChatStore; + (window as any).__ZCLAW_STORES__.message = useMessageStore; + (window as any).__ZCLAW_STORES__.stream = useStreamStore; } diff --git a/desktop/src/store/index.ts b/desktop/src/store/index.ts index 2218d64..8baccdf 100644 --- a/desktop/src/store/index.ts +++ b/desktop/src/store/index.ts @@ -66,6 +66,14 @@ export type { GenerationProgressEvent, } from './classroomStore'; +// === Chat Stores === +export { useChatStore } from './chatStore'; +export type { Message, MessageFile, CodeBlock } from './chatStore'; +export { useConversationStore } from './chat/conversationStore'; +export { useMessageStore } from './chat/messageStore'; +export { useStreamStore } from './chat/streamStore'; +export { useArtifactStore } from './chat/artifactStore'; + // === Store Initialization === import { getClient } from './connectionStore'; diff --git a/docs/superpowers/specs/2026-04-02-chatstore-refactor-design.md b/docs/superpowers/specs/2026-04-02-chatstore-refactor-design.md index 5cf2e3d..8ee0c0a 100644 --- a/docs/superpowers/specs/2026-04-02-chatstore-refactor-design.md +++ b/docs/superpowers/specs/2026-04-02-chatstore-refactor-design.md @@ -1,7 +1,7 @@ # ChatStore 结构化重构设计 > 日期: 2026-04-02 -> 状态: Draft +> 状态: Complete (Phase 0-8, 三轮审计通过) > 范围: desktop/src/store/chatStore.ts 及关联文件 ## 1. 背景