/** * StreamStore — manages streaming orchestration, chat mode, and suggestions. * * Extracted from chatStore.ts as part of the structured refactor (Phase 4). * Responsible for: * - Stream lifecycle (sendMessage, initStreamListener) * - Chat mode state (chatMode, setChatMode, getChatModeConfig) * - Follow-up suggestions * - Skill search * * Design: streamStore holds its own `isStreaming` state and delegates * message mutations to chatStore via an injected reference. This avoids * circular imports while keeping high-frequency streaming updates * (dozens of set() calls per second) on a single Zustand store. */ import { create } from 'zustand'; import { persist } from 'zustand/middleware'; import type { AgentStreamDelta } from '../../lib/gateway-client'; import { getClient } from '../../store/connectionStore'; import { intelligenceClient } from '../../lib/intelligence-client'; import { getMemoryExtractor } from '../../lib/memory-extractor'; import { getSkillDiscovery } from '../../lib/skill-discovery'; import { useOfflineStore, isOffline } from '../../store/offlineStore'; import { useConnectionStore } from '../../store/connectionStore'; import { createLogger } from '../../lib/logger'; import { generateRandomString } from '../../lib/crypto-utils'; import type { ChatModeType, ChatModeConfig, Subtask } from '../../components/ai'; import type { ToolCallStep } from '../../components/ai'; import { CHAT_MODES } from '../../components/ai'; import { useConversationStore, resolveGatewayAgentId, } from './conversationStore'; import { useMessageStore } from './messageStore'; import { useArtifactStore } from './artifactStore'; import { llmSuggest } from '../../lib/llm-service'; import { detectNameSuggestion, detectAgentNameSuggestion } from '../../lib/cold-start-mapper'; const log = createLogger('StreamStore'); // --------------------------------------------------------------------------- // Error formatting — convert raw LLM/API errors to user-friendly messages // 
// ---------------------------------------------------------------------------

/**
 * Remove embedded JSON object fragments (`{ ... }`, including nested ones)
 * from an error string.
 *
 * A depth counter is used instead of a single regex so that nested payloads
 * such as `{"error":{"message":"x"}}` are removed completely instead of
 * leaving a stray `}` behind.
 *
 * - A `}` with no matching `{` is kept as-is.
 * - An unmatched `{` is kept verbatim when nothing after it closes a brace;
 *   otherwise the (truncated) fragment is dropped from the opener onwards.
 */
function stripJsonFragments(text: string): string {
  let kept = '';
  let depth = 0;
  let outermostOpen = -1; // index of the '{' that started the current fragment

  for (let i = 0; i < text.length; i++) {
    const ch = text[i];
    if (ch === '{') {
      if (depth === 0) outermostOpen = i;
      depth++;
    } else if (ch === '}' && depth > 0) {
      depth--;
    } else if (depth === 0) {
      kept += ch;
    }
  }

  if (depth > 0) {
    const tail = text.slice(outermostOpen);
    // A lone '{' in ordinary text is not JSON — leave it visible.
    if (!tail.includes('}')) kept += tail;
  }
  return kept;
}

/**
 * Convert a raw LLM/API error string into a short user-facing message.
 *
 * Known failure classes (missing API key, 404, 429, 401, 402/quota, timeout,
 * 502, 503) map to fixed messages; the first matching rule wins, in that
 * order. Anything else has JSON fragments stripped, whitespace collapsed and
 * is truncated to 80 characters. An empty result falls back to a generic
 * "request failed" message.
 *
 * @param raw - Error text as received from the client/backend.
 * @returns A message suitable for display in the chat bubble.
 */
function formatUserError(raw: string): string {
  // Status codes only count when they are not part of a longer digit run,
  // so "14290ms" or a request id is not mistaken for HTTP 429.
  const hasStatus = (code: number): boolean =>
    new RegExp(`(?<!\\d)${code}(?!\\d)`).test(raw);
  const hasAny = (...needles: string[]): boolean =>
    needles.some(needle => raw.includes(needle));

  if (hasAny('API Key')) {
    return '模型服务暂时不可用,请稍后重试';
  }
  if (hasStatus(404) || hasAny('NOT_FOUND')) {
    return '模型服务未找到,请检查模型配置';
  }
  if (hasStatus(429) || hasAny('rate_limit', 'Too Many Requests')) {
    return '请求过于频繁,请稍后重试';
  }
  if (hasStatus(401) || hasAny('Unauthorized')) {
    return '认证已过期,请重新登录';
  }
  if (hasStatus(402) || hasAny('quota', '配额')) {
    return '使用配额已用尽,请升级订阅或联系管理员';
  }
  if (hasAny('timeout', '超时', 'Timeout')) {
    return '请求超时,请稍后重试';
  }
  if (hasStatus(502) || hasAny('Bad Gateway')) {
    return '服务网关异常,请稍后重试';
  }
  if (hasStatus(503) || hasAny('Service Unavailable')) {
    return '服务暂时不可用,请稍后重试';
  }

  // Strip raw JSON from remaining errors
  const cleaned = stripJsonFragments(raw).replace(/\s+/g, ' ').trim().substring(0, 80);
  return cleaned || '请求失败,请稍后重试';
}

// ---------------------------------------------------------------------------
// 401 Auth Error Recovery
// ---------------------------------------------------------------------------

/**
 * Detect and handle 401 auth errors during chat streaming.
 * Attempts token refresh → kernel reconnect → auto-retry.
 * Returns a user-friendly error message if recovery fails.
/**
 * Detect a 401/unauthorized error raised during chat streaming and try to
 * recover by refreshing the SaaS token and reconnecting the kernel client.
 *
 * No automatic resend happens here: on success the caller gets a message
 * asking the user to send again.
 *
 * @param error - Raw error text from the stream or the thrown exception.
 * @returns `null` when the error is not auth-related; otherwise a
 *   user-facing message describing the outcome (token refreshed, session
 *   expired after a failed refresh, or generic auth failure).
 */
async function tryRecoverFromAuthError(error: string): Promise<string | null> {
  // "401" must stand alone: digits inside a longer number (durations, ids)
  // must not trigger a token refresh or a logout.
  const is401 = /(?<!\d)401(?!\d)|Unauthorized|UNAUTHORIZED|未认证|认证已过期/.test(error);
  if (!is401) return null;

  log.info('Detected 401 auth error, attempting token refresh...');
  try {
    const { saasClient } = await import('../../lib/saas-client');
    const newToken = await saasClient.refreshMutex();
    if (newToken) {
      // Update kernel config with refreshed token → triggers kernel re-init via changed api_key detection
      const { getKernelClient } = await import('../../lib/kernel-client');
      const kernelClient = getKernelClient();
      const currentConfig = kernelClient.getConfig();
      if (currentConfig) {
        kernelClient.setConfig({ ...currentConfig, apiKey: newToken });
        await kernelClient.connect();
        log.info('Kernel reconnected with refreshed token');
      }
      return '认证已刷新,请重新发送消息';
    }
  } catch (refreshErr) {
    // NOTE(review): a failure of kernelClient.connect() after a successful
    // refresh also lands here and logs the user out — confirm that is intended.
    log.warn('Token refresh failed, triggering logout:', refreshErr);
    try {
      const { useSaaSStore } = await import('../saasStore');
      useSaaSStore.getState().logout();
    } catch {
      /* non-critical */
    }
    return 'SaaS 会话已过期,请重新登录';
  }

  // Refresh completed without throwing but produced no token.
  return '认证失败,请重新登录';
}

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

/** Message shape used internally by streamStore for typed callbacks. */
interface StreamMsg {
  id: string;
  role: 'user' | 'assistant' | 'tool' | 'hand' | 'workflow' | 'system';
  content: string;
  timestamp: Date;
  /** True while the assistant message is still receiving deltas. */
  streaming?: boolean;
  /** True for a user message added before the stream has completed. */
  optimistic?: boolean;
  runId?: string;
  error?: string;
  thinkingContent?: string;
  toolSteps?: ToolCallStep[];
  handName?: string;
  handStatus?: string;
  handResult?: unknown;
  workflowId?: string;
  workflowStep?: string;
  workflowStatus?: string;
  workflowResult?: unknown;
  subtasks?: Subtask[];
}

/** Shape of the chatStore methods needed by streamStore.
/**
 * Shape of the chatStore methods needed by streamStore.
 * streamStore never imports chatStore directly; it only talks to it through
 * an object of this shape.
 */
interface ChatStoreAccess {
  addMessage: (msg: StreamMsg) => void;
  updateMessages: (updater: (msgs: StreamMsg[]) => StreamMsg[]) => void;
  getMessages: () => StreamMsg[];
  setChatStoreState: (partial: Record<string, unknown>) => void;
}

// ---------------------------------------------------------------------------
// Delta buffer — coalesces text/thinking deltas into one message update
// ---------------------------------------------------------------------------

/**
 * Accumulates streamed text and thinking deltas for one assistant message
 * and applies them to the chat store in a single `updateMessages` call.
 *
 * The flush is scheduled with a 0 ms timer, so every delta appended before
 * that timer fires lands in the same store update.
 */
class DeltaBuffer {
  private text = '';
  private think = '';
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private readonly assistantId: string,
    private readonly chat: ChatStoreAccess,
  ) {}

  /** Queue visible answer text. */
  appendText(delta: string) {
    this.text += delta;
    this.scheduleFlush();
  }

  /** Queue reasoning ("thinking") text. */
  appendThinking(delta: string) {
    this.think += delta;
    this.scheduleFlush();
  }

  /** Apply everything queued so far to the assistant message. */
  flush() {
    this.timer = null;
    const text = this.text;
    const think = this.think;
    // Reset before touching the store so a re-entrant append during the
    // update is queued for the next flush instead of being wiped.
    this.text = '';
    this.think = '';
    if (!text && !think) return;

    this.chat.updateMessages(msgs =>
      msgs.map(m => {
        if (m.id !== this.assistantId) return m;
        return {
          ...m,
          ...(text ? { content: m.content + text } : {}),
          ...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}),
        };
      }),
    );
  }

  /** Drop queued deltas without applying them and cancel any pending flush. */
  clear() {
    this.cancelTimer();
    this.text = '';
    this.think = '';
  }

  /** Cancel the pending timer and apply queued deltas immediately. */
  flushRemaining() {
    this.cancelTimer();
    this.flush();
  }

  private cancelTimer() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }

  private scheduleFlush() {
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), 0);
    }
  }
}

// ---------------------------------------------------------------------------
// Stream event handlers (extracted from sendMessage)
// ---------------------------------------------------------------------------

/** Artifact kind by lower-cased file extension; unknown extensions are 'text'. */
const ARTIFACT_TYPE_BY_EXT = new Map<string, 'code' | 'markdown' | 'text'>([
  ['ts', 'code'], ['tsx', 'code'], ['js', 'code'], ['jsx', 'code'],
  ['py', 'code'], ['rs', 'code'], ['go', 'code'], ['java', 'code'],
  ['md', 'markdown'], ['txt', 'text'], ['json', 'code'],
  ['html', 'code'], ['css', 'code'], ['sql', 'code'], ['sh', 'code'],
]);

/** Highlighting language by lower-cased file extension. */
const ARTIFACT_LANG_BY_EXT = new Map<string, string>([
  ['ts', 'typescript'], ['tsx', 'typescript'], ['js', 'javascript'], ['jsx', 'javascript'],
  ['py', 'python'], ['rs', 'rust'], ['go', 'go'], ['java', 'java'],
  ['html', 'html'], ['css', 'css'], ['sql', 'sql'], ['sh', 'bash'], ['json', 'json'],
]);

/**
 * Derive artifact display name, kind and language from a file path.
 * Both `/` and `\` are accepted as separators so Windows paths yield the
 * bare file name as well.
 */
function describeArtifactFile(filePath: string): {
  name: string;
  type: 'code' | 'markdown' | 'text';
  language: string | undefined;
} {
  const name = filePath.split(/[\\/]/).pop() || filePath;
  const ext = name.split('.').pop()?.toLowerCase() || '';
  return {
    name,
    type: ARTIFACT_TYPE_BY_EXT.get(ext) ?? 'text',
    language: ARTIFACT_LANG_BY_EXT.get(ext),
  };
}

/**
 * Build the tool callback for one assistant message.
 *
 * A call with empty `output` is treated as "tool started" and appends a
 * running step; a call with non-empty `output` completes the most recent
 * running step of the same tool. Output of the `file_write` tool is also
 * turned into an artifact when it parses as JSON carrying a path and content.
 */
function createToolHandler(assistantId: string, chat: ChatStoreAccess) {
  return (tool: string, input: string, output: string) => {
    if (!output) {
      // toolStart: create new running step
      const step: ToolCallStep = {
        id: `step_${Date.now()}_${generateRandomString(4)}`,
        toolName: tool,
        input,
        output: '',
        status: 'running',
        timestamp: new Date(),
      };
      chat.updateMessages(msgs =>
        msgs.map(m =>
          m.id === assistantId ? { ...m, toolSteps: [...(m.toolSteps || []), step] } : m,
        ),
      );
      return;
    }

    // toolEnd: complete the last running step for this tool
    chat.updateMessages(msgs =>
      msgs.map(m => {
        if (m.id !== assistantId) return m;
        const steps = [...(m.toolSteps || [])];
        for (let i = steps.length - 1; i >= 0; i--) {
          if (steps[i].toolName === tool && steps[i].status === 'running') {
            steps[i] = { ...steps[i], output, status: 'completed' as const };
            break;
          }
        }
        return { ...m, toolSteps: steps };
      }),
    );

    // Auto-create artifact when file_write tool produces output
    if (tool !== 'file_write') return;
    try {
      const parsed = JSON.parse(output);
      const filePath = parsed?.path || parsed?.file_path || '';
      const content = parsed?.content || '';
      if (filePath && content) {
        const file = describeArtifactFile(String(filePath));
        useArtifactStore.getState().addArtifact({
          // Random suffix (as for step/hand ids) keeps ids unique when two
          // writes finish within the same millisecond.
          id: `artifact_${Date.now()}_${generateRandomString(4)}`,
          name: file.name,
          content: typeof content === 'string' ? content : JSON.stringify(content, null, 2),
          type: file.type,
          language: file.language,
          createdAt: new Date(),
          sourceStepId: assistantId,
        });
      }
    } catch {
      /* non-critical: artifact creation from tool output */
    }
  };
}

/**
 * Build the hand callback: every hand event is appended to the chat as its
 * own `role: 'hand'` message tagged with the given run id.
 */
function createHandHandler(chat: ChatStoreAccess, runId: string) {
  return (name: string, status: string, result?: unknown) => {
    let content = `Hand: ${name} - ${status}`;
    if (result) {
      content = typeof result === 'string' ? result : JSON.stringify(result, null, 2);
    }
    const handMsg: StreamMsg = {
      id: `hand_${Date.now()}_${generateRandomString(4)}`,
      role: 'hand',
      content,
      timestamp: new Date(),
      runId,
      handName: name,
      handStatus: status,
      handResult: result,
    };
    chat.updateMessages(msgs => [...msgs, handMsg]);
  };
}

/**
 * Build the subtask callback: inserts or updates (by task id) an entry in
 * the assistant message's `subtasks` list. Unknown backend statuses are
 * shown as 'in_progress'.
 */
function createSubtaskHandler(assistantId: string, chat: ChatStoreAccess) {
  const statusMap: Record<string, Subtask['status']> = {
    started: 'pending',
    running: 'in_progress',
    completed: 'completed',
    failed: 'failed',
  };

  return (taskId: string, description: string, status: string, detail?: string) => {
    const mappedStatus = statusMap[status] || 'in_progress';
    chat.updateMessages(msgs =>
      msgs.map(m => {
        if (m.id !== assistantId) return m;
        const subtasks = [...(m.subtasks || [])];
        const existingIdx = subtasks.findIndex(st => st.id === taskId);
        if (existingIdx >= 0) {
          subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail };
        } else {
          subtasks.push({ id: taskId, description, status: mappedStatus, result: detail });
        }
        return { ...m, subtasks };
      }),
    );
  };
}

/**
 * Build the stream-complete callback.
 *
 * In order it: flushes buffered deltas, persists the conversation, clears
 * the `streaming`/`optimistic` flags, resets stream state, records token
 * usage, and then starts background work — name detection, memory
 * extraction, reflection and follow-up suggestions. Background failures are
 * logged and never thrown.
 *
 * @param _streamStartTime - Unused; kept so the call site stays unchanged.
 */
function createCompleteHandler(
  assistantId: string,
  _streamStartTime: number,
  chat: ChatStoreAccess,
  buffer: DeltaBuffer,
  agentId: string,
  set: (partial: Partial<StreamState>) => void,
) {
  return (inputTokens?: number, outputTokens?: number) => {
    buffer.flushRemaining();

    // NOTE(review): the snapshot is persisted before the streaming/optimistic
    // flags are cleared below, whereas cancelStream persists after clearing —
    // confirm which order is intended.
    const currentMsgs = chat.getMessages();
    if (currentMsgs) {
      useConversationStore.getState().upsertActiveConversation(currentMsgs);
    }

    chat.updateMessages(msgs =>
      msgs.map(m => {
        if (m.id === assistantId) return { ...m, streaming: false };
        if (m.optimistic) return { ...m, optimistic: false };
        return m;
      }),
    );
    set({ isStreaming: false, activeRunId: null });

    if (inputTokens !== undefined && outputTokens !== undefined) {
      useMessageStore.getState().addTokenUsage(inputTokens, outputTokens);
      chat.setChatStoreState({
        totalInputTokens: useMessageStore.getState().totalInputTokens,
        totalOutputTokens: useMessageStore.getState().totalOutputTokens,
      });
    }

    // Detect name changes from last user message (independent of memory extraction)
    const msgs = chat.getMessages() || [];
    const lastUserMsg = [...msgs].reverse().find(m => m.role === 'user');
    const lastContent = typeof lastUserMsg?.content === 'string' ? lastUserMsg.content : '';
    if (lastContent && agentId) {
      // User name detection (e.g. "叫我小王")
      const detectedName = detectNameSuggestion(lastContent);
      if (detectedName) {
        // Flat chain: the trailing catch also covers a failed dynamic import.
        import('../agentStore')
          .then(({ useAgentStore }) =>
            useAgentStore.getState().updateClone(agentId, { userName: detectedName }),
          )
          .then(() => log.info(`Updated userName to "${detectedName}" from conversation`))
          .catch(e => log.warn('Failed to persist detected userName:', e));
      }

      // Agent name detection (e.g. "叫你小马", "名称改为小芳")
      const detectedAgentName = detectAgentNameSuggestion(lastContent);
      if (detectedAgentName) {
        import('../agentStore')
          .then(({ useAgentStore }) =>
            useAgentStore.getState().updateClone(agentId, { name: detectedAgentName }),
          )
          .then(() => {
            log.info(`Updated agent name to "${detectedAgentName}" from conversation`);
            if (typeof window !== 'undefined') {
              window.dispatchEvent(
                new CustomEvent('zclaw:agent-profile-updated', { detail: { agentId } }),
              );
            }
          })
          .catch(e => log.warn('Failed to persist detected agent name:', e));
      }
    }

    // Async memory extraction (independent — failures don't block name detection)
    const filtered = msgs
      .filter(m => m.role === 'user' || m.role === 'assistant')
      .map(m => ({ role: m.role, content: m.content }));
    const convId = useConversationStore.getState().currentConversationId;
    getMemoryExtractor()
      .extractFromConversation(filtered, agentId, convId ?? undefined)
      .catch(err => log.warn('Memory extraction failed:', err));

    intelligenceClient.reflection.recordConversation().catch(err => {
      log.warn('Recording conversation failed:', err);
    });
    intelligenceClient.reflection
      .shouldReflect()
      .then(shouldReflect => {
        if (shouldReflect) {
          intelligenceClient.reflection.reflect(agentId, []).catch(err => {
            log.warn('Reflection failed:', err);
          });
        }
      })
      // Without this a rejected shouldReflect() is an unhandled rejection.
      .catch(err => {
        log.warn('Reflection check failed:', err);
      });

    // Follow-up suggestions (async LLM call with keyword fallback)
    const latestMsgs = chat.getMessages() || [];
    const conversationMessages = latestMsgs
      .filter(m => m.role === 'user' || m.role === 'assistant')
      .filter(m => !m.streaming)
      .map(m => ({ role: m.role, content: m.content }));
    generateLLMSuggestions(conversationMessages, set).catch(err => {
      log.warn('Suggestion generation error:', err);
      set({ suggestionsLoading: false });
    });
  };
}

/** Public state and actions of the stream store. */
export interface StreamState {
  isStreaming: boolean;
  /** Brief cooldown after cancelStream — prevents race with backend active-stream check */
  cancelCooldown: boolean;
  isLoading: boolean;
  chatMode: ChatModeType;
  suggestions: string[];
  /** Whether LLM-generated suggestions are being fetched. */
  suggestionsLoading: boolean;
  /** Run ID of the currently active stream (null when idle). */
  activeRunId: string | null;

  // Core streaming
  sendMessage: (content: string) => Promise<void>;
  /** Subscribes to agent stream events; the returned function unsubscribes. */
  initStreamListener: () => () => void;
  /** Cancel the active stream: resets state, marks message cancelled, saves conversation. */
  cancelStream: () => void;

  // Chat mode
  setChatMode: (mode: ChatModeType) => void;
  getChatModeConfig: () => ChatModeConfig;

  // Suggestions
  setSuggestions: (suggestions: string[]) => void;
  setSuggestionsLoading: (loading: boolean) => void;

  // Skill search
  searchSkills: (query: string) => {
    results: Array<{ id: string; name: string; description: string }>;
    totalAvailable: number;
  };

  // Loading
  setIsLoading: (loading: boolean) => void;
}

// ---------------------------------------------------------------------------
// Follow-up suggestion generator
// ---------------------------------------------------------------------------

/**
 * Keyword-based suggestions used when the LLM path yields nothing.
 *
 * Scans `content` (case-insensitively) against fixed keyword groups in
 * order, collecting at most three matches, then pads with generic prompts
 * so that three suggestions are returned.
 */
function generateKeywordFallback(content: string): string[] {
  const suggestions: string[] = [];
  const lower = content.toLowerCase();
  const patterns: Array<{ keywords: string[]; suggestion: string }> = [
    { keywords: ['代码', 'code', 'function', '函数', '实现'], suggestion: '解释这段代码的工作原理' },
    { keywords: ['错误', 'error', 'bug', '问题'], suggestion: '如何调试这个问题?' },
    { keywords: ['数据', 'data', '分析', '统计'], suggestion: '可视化这些数据' },
    { keywords: ['步骤', 'step', '流程', '方案'], suggestion: '详细说明第一步该怎么做' },
    { keywords: ['可以', '建议', '推荐', '试试'], suggestion: '还有其他方案吗?' },
    { keywords: ['文件', 'file', '保存', '写入'], suggestion: '查看生成的文件内容' },
    { keywords: ['搜索', 'search', '查找', 'research'], suggestion: '搜索更多相关信息' },
  ];

  for (const { keywords, suggestion } of patterns) {
    const matches = keywords.some(kw => lower.includes(kw));
    if (matches && !suggestions.includes(suggestion)) {
      suggestions.push(suggestion);
    }
    if (suggestions.length >= 3) break;
  }

  const generic = ['继续深入分析', '换个角度看看', '用简单的话解释'];
  while (suggestions.length < 3) {
    const next = generic.find(g => !suggestions.includes(g));
    if (!next) break;
    suggestions.push(next);
  }
  return suggestions;
}

/**
 * Parse `text` as JSON and, if it is an array, return its non-blank string
 * items (trimmed, at most three). Returns `null` when `text` is not a JSON
 * array, so the caller can try the next strategy.
 */
function tryParseSuggestionArray(text: string): string[] | null {
  try {
    const parsed: unknown = JSON.parse(text);
    if (!Array.isArray(parsed)) return null;
    return parsed
      .filter((item): item is string => typeof item === 'string' && item.trim().length > 0)
      .map(item => item.trim())
      .slice(0, 3);
  } catch {
    return null;
  }
}

/**
 * Parse LLM response into an array of suggestion strings.
 * Handles: raw JSON array, markdown-fenced JSON, trailing/leading text.
 *
 * Strategies, first hit wins:
 *  1. the whole (de-fenced) text as JSON;
 *  2. the span from the first `[` to the last `]` — survives suggestions
 *     that themselves contain `]`;
 *  3. the shortest `[...]` span;
 *  4. one suggestion per line with list markers stripped (lines of 60+
 *     characters are ignored).
 *
 * @returns Up to three suggestions; empty when nothing usable was found.
 */
function parseSuggestionResponse(raw: string): string[] {
  // Strip markdown code fences
  const cleaned = raw
    .trim()
    .replace(/^```(?:json)?\s*\n?/i, '')
    .replace(/\n?```\s*$/i, '')
    .trim();

  const candidates: string[] = [cleaned];
  const firstBracket = cleaned.indexOf('[');
  const lastBracket = cleaned.lastIndexOf(']');
  if (firstBracket >= 0 && lastBracket > firstBracket) {
    candidates.push(cleaned.slice(firstBracket, lastBracket + 1));
  }
  const shortest = cleaned.match(/\[[\s\S]*?\]/);
  if (shortest) candidates.push(shortest[0]);

  for (const candidate of candidates) {
    const parsed = tryParseSuggestionArray(candidate);
    if (parsed) return parsed;
  }

  // Last resort: split by newlines, strip list markers
  return cleaned
    .split(/\n/)
    .map(l => l.replace(/^[-*\d.)\]]+\s*/, '').trim())
    .filter(l => l.length > 0 && l.length < 60)
    .slice(0, 3);
}

/**
 * Generate contextual follow-up suggestions via LLM.
 * Routes through SaaS relay or local kernel based on connection mode.
 * Falls back to keyword-based approach on any failure.
/**
 * Generate contextual follow-up suggestions via LLM and publish them through
 * `set`.
 *
 * Uses the last six messages as context. The request goes through the SaaS
 * relay when localStorage key `zclaw-connection-mode` is `'saas'`, otherwise
 * through `llmSuggest`. If the call throws or yields no parsable
 * suggestions, keyword-based suggestions derived from the last assistant
 * message are used instead. `suggestionsLoading` is set to true on entry
 * and to false together with the suggestions.
 *
 * NOTE(review): nothing here checks whether a newer message was sent while
 * the request was in flight, so late results may overwrite the cleared list
 * — confirm whether that matters to the UI.
 *
 * @param messages - Conversation so far (user/assistant turns).
 * @param set - Store setter used to publish suggestions and loading state.
 */
async function generateLLMSuggestions(
  messages: Array<{ role: string; content: string }>,
  set: (partial: Partial<StreamState>) => void,
): Promise<void> {
  set({ suggestionsLoading: true });

  const applyKeywordFallback = (): void => {
    const lastAssistant = messages.filter(m => m.role === 'assistant').pop()?.content || '';
    set({ suggestions: generateKeywordFallback(lastAssistant), suggestionsLoading: false });
  };

  try {
    const recentMessages = messages.slice(-6);
    const context = recentMessages
      .map(m => `${m.role === 'user' ? '用户' : '助手'}: ${m.content}`)
      .join('\n\n');

    const connectionMode =
      typeof localStorage !== 'undefined' ? localStorage.getItem('zclaw-connection-mode') : null;
    const raw =
      connectionMode === 'saas' ? await llmSuggestViaSaaS(context) : await llmSuggest(context);

    const suggestions = parseSuggestionResponse(raw);
    if (suggestions.length > 0) {
      set({ suggestions, suggestionsLoading: false });
    } else {
      applyKeywordFallback();
    }
  } catch (err) {
    log.warn('LLM suggestion generation failed, using keyword fallback:', err);
    applyKeywordFallback();
  }
}

/**
 * Extract the assistant text from a chat-completion response body.
 *
 * Primary path: the body is an SSE transcript; every `data:` line (with or
 * without a space after the colon) is parsed as JSON and the
 * `choices[0].delta.content` strings are concatenated. `[DONE]`, empty and
 * malformed payloads are skipped.
 *
 * Fallback: when no delta text was found, the whole body is tried as one
 * JSON document with `choices[0].message.content`.
 *
 * @returns The collected text, or `''` when nothing could be extracted.
 */
function extractCompletionText(rawText: string): string {
  const FIELD = 'data:';
  let accumulated = '';

  for (const line of rawText.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed.startsWith(FIELD)) continue;
    const payload = trimmed.slice(FIELD.length).trim();
    if (payload === '' || payload === '[DONE]') continue;
    try {
      const content: unknown = JSON.parse(payload)?.choices?.[0]?.delta?.content;
      if (typeof content === 'string') accumulated += content;
    } catch {
      /* skip malformed lines */
    }
  }
  if (accumulated) return accumulated;

  try {
    const content: unknown = JSON.parse(rawText)?.choices?.[0]?.message?.content;
    return typeof content === 'string' ? content : '';
  } catch {
    return '';
  }
}

/**
 * Generate suggestions via the SaaS relay.
 *
 * The request is sent with `stream: true`, but the response body is read in
 * one piece with `response.text()` and the text is extracted afterwards.
 * The request is aborted after 60 s.
 *
 * @param context - Recent conversation rendered as plain text.
 * @returns The raw model output (expected to contain a JSON array).
 * @throws Error when no model is available or the relay answers non-2xx.
 */
async function llmSuggestViaSaaS(context: string): Promise<string> {
  const { saasClient } = await import('../../lib/saas-client');
  const { useSaaSStore } = await import('../saasStore');

  const currentModel = useConversationStore.getState().currentModel;
  const availableModels = useSaaSStore.getState().availableModels;
  const model = currentModel || (availableModels.length > 0 ? availableModels[0]?.id : undefined);
  if (!model) throw new Error('No model available for suggestions');

  // Delay to avoid concurrent relay requests with memory extraction
  await new Promise(r => setTimeout(r, 2000));

  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), 60000);
  try {
    const response = await saasClient.chatCompletion(
      {
        model,
        messages: [
          { role: 'system', content: LLM_PROMPTS_SYSTEM },
          { role: 'user', content: `以下是对话中最近的消息:\n\n${context}\n\n请生成 3 个后续问题。` },
        ],
        max_tokens: 500,
        temperature: 0.7,
        stream: true,
      },
      controller.signal,
    );

    if (!response.ok) {
      const errText = await response.text().catch(() => 'unknown error');
      throw new Error(`SaaS relay error ${response.status}: ${errText.substring(0, 100)}`);
    }

    // Read full response as text — suggestion responses are small (max 500 tokens),
    // so streaming is unnecessary. This avoids ReadableStream compatibility issues
    // in Tauri WebView2 where body.getReader() may not yield SSE chunks correctly.
    const rawText = await response.text();
    log.debug('[Suggest] Raw response length:', rawText.length);

    const accumulated = extractCompletionText(rawText);
    log.debug('[Suggest] Accumulated length:', accumulated.length);
    return accumulated;
  } finally {
    clearTimeout(timeoutId);
  }
}

// System prompt for follow-up suggestion generation. The literal is kept on a
// single line exactly as found in this file.
const LLM_PROMPTS_SYSTEM = `你是对话分析助手。根据最近的对话内容,生成 3 个用户可能想继续探讨的问题。 要求: - 每个问题必须与对话内容直接相关,具体且有针对性 - 帮助用户深入理解、实际操作或拓展思路 - 每个问题不超过 30 个中文字符 - 不要重复对话中已讨论过的内容 - 使用与用户相同的语言 只输出 JSON 数组,包含恰好 3 个字符串。不要输出任何其他内容。 示例:["如何在生产环境中部署?", "这个方案的成本如何?", "有没有更简单的替代方案?"]`;

// ---------------------------------------------------------------------------
// ChatStore injection (avoids circular imports)
// ---------------------------------------------------------------------------

let _chat: ChatStoreAccess | null = null;

/**
 * Inject chatStore access for message mutations.
 * Called by chatStore after creation.
/**
 * Inject chatStore access for message mutations.
 * Until this has been called, `sendMessage` and `cancelStream` return
 * without doing anything.
 */
export function injectChatStore(access: ChatStoreAccess): void {
  _chat = access;
}

// ---------------------------------------------------------------------------
// Store
// ---------------------------------------------------------------------------

/**
 * Zustand store for streaming orchestration, chat mode and follow-up
 * suggestions. Only `chatMode` is persisted (storage key
 * `zclaw-stream-storage`).
 */
export const useStreamStore = create<StreamState>()(
  persist(
    (set, get) => ({
      isStreaming: false,
      cancelCooldown: false,
      isLoading: false,
      chatMode: 'thinking' as ChatModeType,
      suggestions: [],
      suggestionsLoading: false,
      activeRunId: null as string | null,

      // ── Chat Mode ──
      setChatMode: (mode: ChatModeType) => set({ chatMode: mode }),
      getChatModeConfig: () => CHAT_MODES[get().chatMode].config,

      setSuggestions: (suggestions: string[]) => set({ suggestions }),
      setSuggestionsLoading: (loading: boolean) => set({ suggestionsLoading: loading }),
      setIsLoading: (loading: boolean) => set({ isLoading: loading }),

      // ── Skill Search ──
      searchSkills: (query: string) => {
        const discovery = getSkillDiscovery();
        const result = discovery.searchSkills(query);
        return {
          results: result.results.map(s => ({ id: s.id, name: s.name, description: s.description })),
          totalAvailable: result.totalAvailable,
        };
      },

      // ── Core: sendMessage ──
      /**
       * Send one user message and stream the assistant reply.
       *
       * Does nothing while a stream is active, during the cancel cooldown,
       * or before chatStore has been injected. When offline the message is
       * queued and a system notice is shown. Otherwise an optimistic user
       * message and an empty streaming assistant message are added, and the
       * client's `chatStream` drives updates through the callbacks below.
       */
      sendMessage: async (content: string) => {
        if (get().isStreaming || get().cancelCooldown) return;
        if (!_chat) {
          log.warn('sendMessage called before chatStore injection');
          return;
        }
        // One stable reference for the whole send, including async callbacks.
        const chat = _chat;

        const convStore = useConversationStore.getState();
        const currentAgent = convStore.currentAgent;
        const sessionKey = convStore.sessionKey;

        set({ suggestions: [], suggestionsLoading: false });

        const effectiveSessionKey = sessionKey || crypto.randomUUID();
        const effectiveAgentId = resolveGatewayAgentId(currentAgent);
        const agentId = currentAgent?.id || 'zclaw-main';

        // Offline path
        if (isOffline()) {
          const { queueMessage } = useOfflineStore.getState();
          const queueId = queueMessage(content, effectiveAgentId, effectiveSessionKey);
          log.debug(`Offline - message queued: ${queueId}`);
          chat.addMessage({
            id: `system_${Date.now()}`,
            role: 'system',
            content: `后端服务不可用,消息已保存到本地队列。恢复连接后将自动发送。`,
            timestamp: new Date(),
          });
          chat.addMessage({
            id: `user_${Date.now()}`,
            role: 'user',
            content,
            timestamp: new Date(),
          });
          return;
        }

        const streamStartTime = Date.now();
        chat.addMessage({
          id: `user_${streamStartTime}`,
          role: 'user',
          content,
          timestamp: new Date(streamStartTime),
          optimistic: true,
        });

        const assistantId = `assistant_${Date.now()}`;
        chat.addMessage({
          id: assistantId,
          role: 'assistant',
          content: '',
          timestamp: new Date(),
          streaming: true,
        });
        set({ isStreaming: true, activeRunId: null });

        // Delta buffer — coalesces delta updates into batched store writes
        const buffer = new DeltaBuffer(assistantId, chat);

        // Shared failure path: replace the assistant bubble with the error,
        // confirm this send's optimistic user message, reset stream state.
        const failStream = (message: string): void => {
          chat.updateMessages(msgs =>
            msgs.map(m =>
              m.id === assistantId
                ? { ...m, content: message, streaming: false, error: message }
                : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime
                  ? { ...m, optimistic: false }
                  : m,
            ),
          );
          set({ isStreaming: false, activeRunId: null });
        };

        try {
          const client = getClient();
          const connectionState = useConnectionStore.getState().connectionState;
          if (connectionState !== 'connected') {
            throw new Error(`Not connected (state: ${connectionState})`);
          }

          let runId = `run_${Date.now()}`;
          if (!useConversationStore.getState().sessionKey) {
            useConversationStore.setState({ sessionKey: effectiveSessionKey });
          }

          // Build conversation history for relay clients (last 20 messages ≈ 10 turns)
          const history = (chat.getMessages() || [])
            .filter(m => m.role === 'user' || m.role === 'assistant')
            .filter(m => !m.streaming && !m.optimistic)
            .map(m => ({ role: m.role, content: m.content }))
            .slice(-20);

          // The model picked in the UI selector overrides the agent's default model.
          const currentModel = useConversationStore.getState().currentModel;
          const modeConfig = get().getChatModeConfig();

          const result = await client.chatStream(
            content,
            {
              onDelta: (delta: string) => buffer.appendText(delta),
              onThinkingDelta: (delta: string) => buffer.appendThinking(delta),
              onTool: createToolHandler(assistantId, chat),
              // The handler keeps the provisional run id captured here.
              onHand: createHandHandler(chat, runId),
              onSubtaskStatus: createSubtaskHandler(assistantId, chat),
              onComplete: createCompleteHandler(assistantId, streamStartTime, chat, buffer, agentId, set),
              onError: async (error: string) => {
                buffer.flushRemaining();
                const recoveryMsg = await tryRecoverFromAuthError(error);
                failStream(formatUserError(recoveryMsg || error));
              },
            },
            {
              sessionKey: effectiveSessionKey,
              agentId: effectiveAgentId,
              thinking_enabled: modeConfig.thinking_enabled,
              reasoning_effort: modeConfig.reasoning_effort,
              plan_mode: modeConfig.plan_mode,
              subagent_enabled: modeConfig.subagent_enabled,
              history,
              model: currentModel || undefined,
            } as Parameters<typeof client.chatStream>[2],
          );

          if (result?.runId) {
            runId = result.runId;
            set({ activeRunId: runId });
          }
          if (!sessionKey) {
            useConversationStore.setState({ sessionKey: effectiveSessionKey });
          }

          chat.updateMessages(msgs =>
            msgs.map(m => (m.id === assistantId ? { ...m, runId } : m)),
          );
        } catch (err: unknown) {
          buffer.clear();
          const rawMessage = err instanceof Error ? err.message : '无法连接 Gateway';
          const recoveryMsg = await tryRecoverFromAuthError(rawMessage);
          // Same formatting as the onError path, so thrown errors do not leak
          // raw backend text into the chat bubble.
          failStream(formatUserError(recoveryMsg || rawMessage));
        }
      },

      // ── Cancel Stream ──
      /**
       * Cancel the active stream: best-effort backend abort, mark streaming
       * messages as stopped, persist the conversation, reset state and
       * session key, then block sends for a 500 ms cooldown.
       */
      cancelStream: () => {
        if (!_chat) return;
        const { activeRunId, isStreaming } = get();
        if (!isStreaming) return;

        // 1. Tell backend to abort — use sessionKey (which is the sessionId in Tauri)
        //    Also abort the frontend SSE fetch via cancelStream()
        try {
          const client = getClient() as unknown as Record<string, unknown>;
          if ('cancelStream' in client) {
            const fn = client.cancelStream;
            if (typeof fn === 'function') {
              // Invoke with the client as receiver: a detached call would run
              // a method with `this` undefined and the catch below would hide
              // the resulting failure.
              // Call with or without sessionId depending on arity
              if (fn.length > 0) {
                const sessionId = useConversationStore.getState().sessionKey || activeRunId || '';
                (fn as (id: string) => void).call(client, sessionId);
              } else {
                (fn as () => void).call(client);
              }
            }
          }
        } catch {
          // Backend cancel is best-effort; proceed with local cleanup
        }

        // 2. Mark the streaming message as cancelled
        _chat.updateMessages(msgs =>
          msgs.map(m => {
            if (m.streaming) {
              return {
                ...m,
                streaming: false,
                // Only an empty bubble gets the "cancelled" marker.
                error: m.content ? undefined : '已取消',
              };
            }
            if (m.optimistic) {
              return { ...m, optimistic: false };
            }
            return m;
          }),
        );

        // 3. Immediately persist the conversation
        const currentMsgs = _chat.getMessages();
        if (currentMsgs) {
          useConversationStore.getState().upsertActiveConversation(currentMsgs);
        }

        // 4. Reset streaming state and clear sessionKey so next send gets a fresh session
        set({ isStreaming: false, activeRunId: null, cancelCooldown: true });
        useConversationStore.setState({ sessionKey: null });
        log.info('Stream cancelled by user');

        // 5. Brief cooldown to prevent race with backend active-stream check
        setTimeout(() => set({ cancelCooldown: false }), 500);
      },

      // ── Agent Stream Listener ──
      /**
       * Subscribe to the client's agent stream (when the client offers
       * `onAgentStream`) and mirror its events into the chat store.
       *
       * @returns The unsubscribe function (a no-op when unsupported).
       */
      initStreamListener: () => {
        const client = getClient();
        if (!('onAgentStream' in client)) {
          return () => {};
        }

        const unsubscribe = client.onAgentStream((delta: AgentStreamDelta) => {
          // Ignore events that belong to a different run than the active one.
          const activeRunId = get().activeRunId;
          if (activeRunId && delta.runId && delta.runId !== activeRunId) return;

          const msgs = _chat?.getMessages() || [];
          // NOTE(review): `runId` is declared optional (string | undefined) on
          // StreamMsg, so the `=== null` comparison below only matches if some
          // writer stores an explicit null — confirm whether run-less deltas
          // are meant to match messages without a runId.
          const streamingMsg = [...msgs]
            .reverse()
            .find(m => (
              m.role === 'assistant'
              && m.streaming
              && (
                (delta.runId && m.runId === delta.runId)
                || (!delta.runId && m.runId === null)
              )
            ));
          if (!streamingMsg) return;

          if (delta.stream === 'assistant' && (delta.delta || delta.content)) {
            _chat?.updateMessages(ms =>
              ms.map(m =>
                m.id === streamingMsg.id
                  ? { ...m, content: m.content + (delta.delta || delta.content || '') }
                  : m,
              ),
            );
          } else if (delta.stream === 'tool') {
            if (delta.toolOutput) {
              // toolEnd: find the last running step for this tool and complete it
              _chat?.updateMessages(ms =>
                ms.map(m => {
                  if (m.id !== streamingMsg.id) return m;
                  const steps = [...(m.toolSteps || [])];
                  for (let i = steps.length - 1; i >= 0; i--) {
                    if (steps[i].toolName === (delta.tool || 'unknown') && steps[i].status === 'running') {
                      steps[i] = { ...steps[i], output: delta.toolOutput, status: 'completed' as const };
                      break;
                    }
                  }
                  return { ...m, toolSteps: steps };
                }),
              );
            } else {
              // toolStart: create new running step
              const step: ToolCallStep = {
                id: `step_${Date.now()}_${generateRandomString(4)}`,
                toolName: delta.tool || 'unknown',
                input: delta.toolInput,
                output: '',
                status: 'running',
                timestamp: new Date(),
              };
              _chat?.updateMessages(ms =>
                ms.map(m =>
                  m.id === streamingMsg.id
                    ? { ...m, toolSteps: [...(m.toolSteps || []), step] }
                    : m,
                ),
              );
            }
          } else if (delta.stream === 'lifecycle') {
            if (delta.phase === 'end' || delta.phase === 'error') {
              if (delta.phase === 'end') {
                const currentMsgs = _chat?.getMessages();
                if (currentMsgs) {
                  useConversationStore.getState().upsertActiveConversation(currentMsgs);
                }
              }

              _chat?.updateMessages(ms =>
                ms.map(m => {
                  if (m.id === streamingMsg.id) {
                    return {
                      ...m,
                      streaming: false,
                      error: delta.phase === 'error' ? delta.error : undefined,
                    };
                  }
                  if (m.optimistic) {
                    return { ...m, optimistic: false };
                  }
                  return m;
                }),
              );
              set({ isStreaming: false, activeRunId: null });

              if (delta.phase === 'end') {
                // Record token usage if present in lifecycle:end event
                const inputTokens = delta.input_tokens;
                const outputTokens = delta.output_tokens;
                if (
                  typeof inputTokens === 'number'
                  && typeof outputTokens === 'number'
                  && inputTokens > 0
                  && outputTokens > 0
                ) {
                  useMessageStore.getState().addTokenUsage(inputTokens, outputTokens);
                }

                const latestMsgs = _chat?.getMessages() || [];
                const conversationMessages = latestMsgs
                  .filter(m => m.role === 'user' || m.role === 'assistant')
                  .filter(m => !m.streaming)
                  .map(m => ({ role: m.role, content: m.content }));
                generateLLMSuggestions(conversationMessages, set).catch(err => {
                  log.warn('Suggestion generation error:', err);
                  set({ suggestionsLoading: false });
                });
              }
            }
          } else if (delta.stream === 'hand') {
            // Hand events are only shown for the currently active run.
            const runId = get().activeRunId;
            if (!runId || (delta.runId && delta.runId !== runId)) return;
            const handMsg: StreamMsg = {
              id: `hand_${Date.now()}_${generateRandomString(4)}`,
              role: 'hand',
              content: delta.handResult
                ? (typeof delta.handResult === 'string'
                  ? delta.handResult
                  : JSON.stringify(delta.handResult, null, 2))
                : `Hand: ${delta.handName || 'unknown'} - ${delta.handStatus || 'triggered'}`,
              timestamp: new Date(),
              runId: delta.runId,
              handName: delta.handName,
              handStatus: delta.handStatus,
              handResult: delta.handResult,
            };
            _chat?.updateMessages(ms => [...ms, handMsg]);
          } else if (delta.stream === 'workflow') {
            // Workflow events are only shown for the currently active run.
            const runId = get().activeRunId;
            if (!runId || (delta.runId && delta.runId !== runId)) return;
            const workflowMsg: StreamMsg = {
              id: `workflow_${Date.now()}_${generateRandomString(4)}`,
              role: 'workflow',
              content: delta.workflowResult
                ? (typeof delta.workflowResult === 'string'
                  ? delta.workflowResult
                  : JSON.stringify(delta.workflowResult, null, 2))
                : `Workflow: ${delta.workflowId || 'unknown'} step ${delta.workflowStep || '?'} - ${delta.workflowStatus || 'running'}`,
              timestamp: new Date(),
              runId: delta.runId,
              workflowId: delta.workflowId,
              workflowStep: delta.workflowStep,
              workflowStatus: delta.workflowStatus,
              workflowResult: delta.workflowResult,
            };
            _chat?.updateMessages(ms => [...ms, workflowMsg]);
          }
        });

        return unsubscribe;
      },
    }),
    {
      name: 'zclaw-stream-storage',
      // Persist the chat mode only; all streaming state is per session.
      partialize: (state) => ({
        chatMode: state.chatMode,
      }),
    },
  ),
);