refactor(desktop): ChatStore structured split + IDB persistence + stream cancel
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

Split monolithic chatStore.ts (908 lines) into 4 focused stores:
- chatStore.ts: facade layer, owns messages[], backward-compatible selectors
- conversationStore.ts: conversation CRUD, agent switching, IndexedDB persistence
- streamStore.ts: streaming orchestration, chat mode, suggestions
- messageStore.ts: token tracking

Key fixes from 3-round deep audit:
- C1: Fix Rust serde camelCase vs TS snake_case mismatch (toolStart/toolEnd/iterationStart)
- C2: Fix IDB async rehydration race with persist.hasHydrated() subscribe
- C3: Add sessionKey to partialize to survive page refresh
- H3: Fix IDB migration retry on failure (don't set migrated=true in catch)
- M3: Fix ToolCallStep deduplication (toolStart creates, toolEnd updates)
- M-NEW-2: Clear sessionKey on cancelStream

Also adds:
- Rust backend stream cancellation via AtomicBool + cancel_stream command
- IndexedDB storage adapter with one-time localStorage migration
- HMR cleanup for cross-store subscriptions
This commit is contained in:
iven
2026-04-03 00:24:16 +08:00
parent da438ad868
commit 0a04b260a4
22 changed files with 1269 additions and 767 deletions

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter, State};
use zclaw_types::AgentId;
use super::{validate_agent_id, KernelState, SessionStreamGuard};
use super::{validate_agent_id, KernelState, SessionStreamGuard, StreamCancelFlags};
use crate::intelligence::validation::validate_string_length;
// ---------------------------------------------------------------------------
@@ -67,6 +67,7 @@ pub struct StreamChatRequest {
// ---------------------------------------------------------------------------
/// Send a message to an agent
// @connected
#[tauri::command]
pub async fn agent_chat(
state: State<'_, KernelState>,
@@ -99,6 +100,7 @@ pub async fn agent_chat(
/// This command initiates a streaming chat session. Events are emitted
/// via Tauri's event system with the name "stream:chunk" and include
/// the session_id for routing.
// @connected
#[tauri::command]
pub async fn agent_chat_stream(
app: AppHandle,
@@ -107,6 +109,7 @@ pub async fn agent_chat_stream(
heartbeat_state: State<'_, crate::intelligence::HeartbeatEngineState>,
reflection_state: State<'_, crate::intelligence::ReflectionEngineState>,
stream_guard: State<'_, SessionStreamGuard>,
cancel_flags: State<'_, StreamCancelFlags>,
request: StreamChatRequest,
) -> Result<(), String> {
validate_agent_id(&request.agent_id)?;
@@ -136,6 +139,21 @@ pub async fn agent_chat_stream(
return Err(format!("Session {} already has an active stream", session_id));
}
// Prepare cleanup resources for error paths (before spawn takes ownership)
let err_cleanup_guard = stream_guard.inner().clone();
let err_cleanup_cancel = cancel_flags.inner().clone();
let err_cleanup_session_id = session_id.clone();
let err_cleanup_flag = Arc::clone(&*session_active);
// Register cancellation flag for this session
let cancel_flag = cancel_flags
.entry(session_id.clone())
.or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false)));
// Ensure flag is reset (in case of stale entry from a previous stream)
cancel_flag.store(false, std::sync::atomic::Ordering::SeqCst);
let cancel_clone = Arc::clone(&*cancel_flag);
let cancel_flags_map: StreamCancelFlags = cancel_flags.inner().clone();
// AUTO-INIT HEARTBEAT
{
let mut engines = heartbeat_state.lock().await;
@@ -160,7 +178,13 @@ pub async fn agent_chat_stream(
let (mut rx, llm_driver) = {
let kernel_lock = state.lock().await;
let kernel = kernel_lock.as_ref()
.ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
.ok_or_else(|| {
// Cleanup on error: release guard + cancel flag
err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
err_cleanup_guard.remove(&err_cleanup_session_id);
err_cleanup_cancel.remove(&err_cleanup_session_id);
"Kernel not initialized. Call kernel_init first.".to_string()
})?;
let driver = Some(kernel.driver());
@@ -172,6 +196,10 @@ pub async fn agent_chat_stream(
match uuid::Uuid::parse_str(&session_id) {
Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)),
Err(e) => {
// Cleanup on error
err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
err_cleanup_guard.remove(&err_cleanup_session_id);
err_cleanup_cancel.remove(&err_cleanup_session_id);
return Err(format!(
"Invalid session_id '{}': {}. Cannot reuse conversation context.",
session_id, e
@@ -194,13 +222,22 @@ pub async fn agent_chat_stream(
Some(chat_mode_config),
)
.await
.map_err(|e| format!("Failed to start streaming: {}", e))?;
.map_err(|e| {
// Cleanup on error
err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
err_cleanup_guard.remove(&err_cleanup_session_id);
err_cleanup_cancel.remove(&err_cleanup_session_id);
format!("Failed to start streaming: {}", e)
})?;
(rx, driver)
};
let hb_state = heartbeat_state.inner().clone();
let rf_state = reflection_state.inner().clone();
// Clone the guard map for cleanup in the spawned task
let guard_map: SessionStreamGuard = stream_guard.inner().clone();
// Spawn a task to process stream events.
// The session_active flag is cleared when task completes.
let guard_clone = Arc::clone(&*session_active);
@@ -212,6 +249,16 @@ pub async fn agent_chat_stream(
let stream_timeout = tokio::time::Duration::from_secs(300);
loop {
// Check cancellation flag before each recv
if cancel_clone.load(std::sync::atomic::Ordering::SeqCst) {
tracing::info!("[agent_chat_stream] Stream cancelled for session: {}", session_id);
let _ = app.emit("stream:chunk", serde_json::json!({
"sessionId": session_id,
"event": StreamChatEvent::Error { message: "已取消".to_string() }
}));
break;
}
match tokio::time::timeout(stream_timeout, rx.recv()).await {
Ok(Some(event)) => {
let stream_event = match &event {
@@ -300,9 +347,37 @@ pub async fn agent_chat_stream(
tracing::debug!("[agent_chat_stream] Stream processing ended for session: {}", session_id);
// Release session lock
guard_clone.store(false, std::sync::atomic::Ordering::SeqCst);
// Release session lock and clean up DashMap entries to prevent memory leaks.
// Use compare_exchange to only remove if we still own the flag (guards against
// a new stream for the same session_id starting after we broke out of the loop).
if guard_clone.compare_exchange(true, false, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst).is_ok() {
guard_map.remove(&session_id);
}
// Clean up cancellation flag (always safe — cancel is session-scoped)
cancel_flags_map.remove(&session_id);
});
Ok(())
}
/// Cancel an active stream for a given session.
///
/// Sets the cancellation flag for the session, which the streaming task
/// checks on each iteration. The task will then emit an error event
/// and clean up.
// @connected
#[tauri::command]
pub async fn cancel_stream(
cancel_flags: State<'_, StreamCancelFlags>,
session_id: String,
) -> Result<(), String> {
if let Some(flag) = cancel_flags.get(&session_id) {
flag.store(true, std::sync::atomic::Ordering::SeqCst);
tracing::info!("[cancel_stream] Cancel requested for session: {}", session_id);
Ok(())
} else {
// No active stream for this session — not an error, just a no-op
tracing::debug!("[cancel_stream] No active stream for session: {}", session_id);
Ok(())
}
}