diff --git a/desktop/src/store/chat/streamStore.ts b/desktop/src/store/chat/streamStore.ts index 9eb178a..b11e746 100644 --- a/desktop/src/store/chat/streamStore.ts +++ b/desktop/src/store/chat/streamStore.ts @@ -144,6 +144,265 @@ interface ChatStoreAccess { setChatStoreState: (partial: Record) => void; } +// --------------------------------------------------------------------------- +// Delta buffer — batches text/thinking deltas at ~60fps +// --------------------------------------------------------------------------- + +class DeltaBuffer { + private text = ''; + private think = ''; + private timer: ReturnType | null = null; + + constructor( + private readonly assistantId: string, + private readonly chat: ChatStoreAccess, + ) {} + + appendText(delta: string) { + this.text += delta; + this.scheduleFlush(); + } + + appendThinking(delta: string) { + this.think += delta; + this.scheduleFlush(); + } + + flush() { + this.timer = null; + const text = this.text; + const think = this.think; + this.text = ''; + this.think = ''; + if (text || think) { + this.chat.updateMessages(msgs => + msgs.map(m => { + if (m.id !== this.assistantId) return m; + return { + ...m, + ...(text ? { content: m.content + text } : {}), + ...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}), + }; + }) + ); + } + } + + clear() { + if (this.timer !== null) { + clearTimeout(this.timer); + this.timer = null; + } + this.text = ''; + this.think = ''; + } + + flushRemaining() { + if (this.timer !== null) { + clearTimeout(this.timer); + this.timer = null; + } + this.flush(); + } + + private scheduleFlush() { + if (this.timer === null) { + this.timer = setTimeout(() => this.flush(), 0); + } + } +} + +// --------------------------------------------------------------------------- +// Stream event handlers (extracted from sendMessage) +// --------------------------------------------------------------------------- + +function createToolHandler(assistantId: string, chat: ChatStoreAccess) { + return (tool: string, input: string, output: string) => { + if (output) { + // toolEnd: complete the last running step for this tool + chat.updateMessages(msgs => + msgs.map(m => { + if (m.id !== assistantId) return m; + const steps = [...(m.toolSteps || [])]; + for (let i = steps.length - 1; i >= 0; i--) { + if (steps[i].toolName === tool && steps[i].status === 'running') { + steps[i] = { ...steps[i], output, status: 'completed' as const }; + break; + } + } + return { ...m, toolSteps: steps }; + }) + ); + + // Auto-create artifact when file_write tool produces output + if (tool === 'file_write') { + try { + const parsed = JSON.parse(output); + const filePath = parsed?.path || parsed?.file_path || ''; + const content = parsed?.content || ''; + if (filePath && content) { + const fileName = filePath.split('/').pop() || filePath; + const ext = fileName.split('.').pop()?.toLowerCase() || ''; + const typeMap: Record = { + ts: 'code', tsx: 'code', js: 'code', jsx: 'code', + py: 'code', rs: 'code', go: 'code', java: 'code', + md: 'markdown', txt: 'text', json: 'code', + html: 'code', css: 'code', sql: 'code', sh: 'code', + }; + const langMap: Record = { + ts: 'typescript', tsx: 'typescript', js: 'javascript', jsx: 'javascript', + py: 'python', rs: 'rust', go: 'go', java: 'java', + html: 'html', css: 'css', sql: 'sql', sh: 'bash', json: 'json', + }; + useArtifactStore.getState().addArtifact({ + id: `artifact_${Date.now()}`, + name: fileName, + content: typeof content === 'string' ? content : JSON.stringify(content, null, 2), + type: typeMap[ext] || 'text', + language: langMap[ext], + createdAt: new Date(), + sourceStepId: assistantId, + }); + } + } catch { /* non-critical: artifact creation from tool output */ } + } + } else { + // toolStart: create new running step + const step: ToolCallStep = { + id: `step_${Date.now()}_${generateRandomString(4)}`, + toolName: tool, + input, + output: '', + status: 'running', + timestamp: new Date(), + }; + chat.updateMessages(msgs => + msgs.map(m => + m.id === assistantId + ? { ...m, toolSteps: [...(m.toolSteps || []), step] } + : m + ) + ); + } + }; +} + +function createHandHandler(chat: ChatStoreAccess, runId: string) { + return (name: string, status: string, result?: unknown) => { + const handMsg: StreamMsg = { + id: `hand_${Date.now()}_${generateRandomString(4)}`, + role: 'hand', + content: result + ? (typeof result === 'string' ? result : JSON.stringify(result, null, 2)) + : `Hand: ${name} - ${status}`, + timestamp: new Date(), + runId, + handName: name, + handStatus: status, + handResult: result, + }; + chat.updateMessages(msgs => [...msgs, handMsg]); + }; +} + +function createSubtaskHandler(assistantId: string, chat: ChatStoreAccess) { + return (taskId: string, description: string, status: string, detail?: string) => { + const statusMap: Record = { + started: 'pending', + running: 'in_progress', + completed: 'completed', + failed: 'failed', + }; + const mappedStatus = statusMap[status] || 'in_progress'; + + chat.updateMessages(msgs => + msgs.map(m => { + if (m.id !== assistantId) return m; + const subtasks = [...(m.subtasks || [])]; + const existingIdx = subtasks.findIndex(st => st.id === taskId); + if (existingIdx >= 0) { + subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail }; + } else { + subtasks.push({ id: taskId, description, status: mappedStatus, result: detail }); + } + return { ...m, subtasks }; + }) + ); + }; +} + +function createCompleteHandler( + assistantId: string, + _streamStartTime: number, + chat: ChatStoreAccess, + buffer: DeltaBuffer, + agentId: string, + set: (partial: Partial) => void, +) { + return (inputTokens?: number, outputTokens?: number) => { + buffer.flushRemaining(); + + const currentMsgs = chat.getMessages(); + if (currentMsgs) { + useConversationStore.getState().upsertActiveConversation(currentMsgs); + } + + chat.updateMessages(msgs => + msgs.map(m => { + if (m.id === assistantId) return { ...m, streaming: false }; + if (m.optimistic) return { ...m, optimistic: false }; + return m; + }) + ); + set({ isStreaming: false, activeRunId: null }); + + if (inputTokens !== undefined && outputTokens !== undefined) { + useMessageStore.getState().addTokenUsage(inputTokens, outputTokens); + chat.setChatStoreState({ + totalInputTokens: useMessageStore.getState().totalInputTokens, + totalOutputTokens: useMessageStore.getState().totalOutputTokens, + }); + } + + // Async memory extraction + const msgs = chat.getMessages() || []; + const filtered = msgs + .filter(m => m.role === 'user' || m.role === 'assistant') + .map(m => ({ role: m.role, content: m.content })); + const convId = useConversationStore.getState().currentConversationId; + getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined) + .then(() => { + if (typeof window !== 'undefined') { + window.dispatchEvent(new CustomEvent('zclaw:agent-profile-updated', { + detail: { agentId } + })); + } + }) + .catch(err => log.warn('Memory extraction failed:', err)); + + intelligenceClient.reflection.recordConversation().catch(err => { + log.warn('Recording conversation failed:', err); + }); + intelligenceClient.reflection.shouldReflect().then(shouldReflect => { + if (shouldReflect) { + intelligenceClient.reflection.reflect(agentId, []).catch(err => { + log.warn('Reflection failed:', err); + }); + } + }); + + // Follow-up suggestions + const latestMsgs = chat.getMessages() || []; + const completedMsg = latestMsgs.find(m => m.id === assistantId); + if (completedMsg?.content) { + const suggestions = generateFollowUpSuggestions(completedMsg.content); + if (suggestions.length > 0) { + set({ suggestions }); + } + } + }; +} + export interface StreamState { isStreaming: boolean; /** Brief cooldown after cancelStream — prevents race with backend active-stream check */ @@ -322,40 +581,8 @@ export const useStreamStore = create()( }); set({ isStreaming: true, activeRunId: null }); - // ── Delta buffering ── - // Accumulate text/thinking deltas in local buffers and flush to store - // at ~60fps intervals. This prevents React "Maximum update depth exceeded" - // when the LLM emits many small deltas per frame (e.g. Kimi thinking). - let textBuffer = ''; - let thinkBuffer = ''; - let flushTimer: ReturnType | null = null; - - const flushBuffers = () => { - flushTimer = null; - const text = textBuffer; - const think = thinkBuffer; - textBuffer = ''; - thinkBuffer = ''; - - if (text || think) { - _chat?.updateMessages(msgs => - msgs.map(m => { - if (m.id !== assistantId) return m; - return { - ...m, - ...(text ? { content: m.content + text } : {}), - ...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}), - }; - }) - ); - } - }; - - const scheduleFlush = () => { - if (flushTimer === null) { - flushTimer = setTimeout(flushBuffers, 0); - } - }; + // Delta buffer — batches updates at ~60fps + const buffer = new DeltaBuffer(assistantId, _chat); try { const client = getClient(); @@ -384,213 +611,16 @@ export const useStreamStore = create()( const result = await client.chatStream( content, { - onDelta: (delta: string) => { - textBuffer += delta; - scheduleFlush(); - }, - onThinkingDelta: (delta: string) => { - thinkBuffer += delta; - scheduleFlush(); - }, - onTool: (tool: string, input: string, output: string) => { - if (output) { - // toolEnd: find the last running step for this tool and complete it - _chat?.updateMessages(msgs => - msgs.map(m => { - if (m.id !== assistantId) return m; - const steps = [...(m.toolSteps || [])]; - for (let i = steps.length - 1; i >= 0; i--) { - if (steps[i].toolName === tool && steps[i].status === 'running') { - steps[i] = { ...steps[i], output, status: 'completed' as const }; - break; - } - } - return { ...m, toolSteps: steps }; - }) - ); - - // Auto-create artifact when file_write tool produces output - if (tool === 'file_write' && output) { - try { - const parsed = JSON.parse(output); - const filePath = parsed?.path || parsed?.file_path || ''; - const content = parsed?.content || ''; - if (filePath && content) { - const fileName = filePath.split('/').pop() || filePath; - const ext = fileName.split('.').pop()?.toLowerCase() || ''; - const typeMap: Record = { - ts: 'code', tsx: 'code', js: 'code', jsx: 'code', - py: 'code', rs: 'code', go: 'code', java: 'code', - md: 'markdown', txt: 'text', json: 'code', - html: 'code', css: 'code', sql: 'code', sh: 'code', - }; - const langMap: Record = { - ts: 'typescript', tsx: 'typescript', js: 'javascript', jsx: 'javascript', - py: 'python', rs: 'rust', go: 'go', java: 'java', - html: 'html', css: 'css', sql: 'sql', sh: 'bash', json: 'json', - }; - useArtifactStore.getState().addArtifact({ - id: `artifact_${Date.now()}`, - name: fileName, - content: typeof content === 'string' ? content : JSON.stringify(content, null, 2), - type: typeMap[ext] || 'text', - language: langMap[ext], - createdAt: new Date(), - sourceStepId: assistantId, - }); - } - } catch { /* non-critical: artifact creation from tool output */ } - } - } else { - // toolStart: create new running step - const step: ToolCallStep = { - id: `step_${Date.now()}_${generateRandomString(4)}`, - toolName: tool, - input, - output: '', - status: 'running', - timestamp: new Date(), - }; - _chat?.updateMessages(msgs => - msgs.map(m => - m.id === assistantId - ? { ...m, toolSteps: [...(m.toolSteps || []), step] } - : m - ) - ); - } - }, - onHand: (name: string, status: string, result?: unknown) => { - const handMsg: StreamMsg = { - id: `hand_${Date.now()}_${generateRandomString(4)}`, - role: 'hand', - content: result - ? (typeof result === 'string' ? result : JSON.stringify(result, null, 2)) - : `Hand: ${name} - ${status}`, - timestamp: new Date(), - runId, - handName: name, - handStatus: status, - handResult: result, - }; - _chat?.updateMessages(msgs => [...msgs, handMsg]); - - }, - onSubtaskStatus: (taskId: string, description: string, status: string, detail?: string) => { - // Map backend status to frontend Subtask status - const statusMap: Record = { - started: 'pending', - running: 'in_progress', - completed: 'completed', - failed: 'failed', - }; - const mappedStatus = statusMap[status] || 'in_progress'; - - _chat?.updateMessages(msgs => - msgs.map(m => { - if (m.id !== assistantId) return m; - const subtasks = [...(m.subtasks || [])]; - const existingIdx = subtasks.findIndex(st => st.id === taskId); - if (existingIdx >= 0) { - subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail }; - } else { - subtasks.push({ - id: taskId, - description, - status: mappedStatus, - result: detail, - }); - } - return { ...m, subtasks }; - }) - ); - }, - onComplete: (inputTokens?: number, outputTokens?: number) => { - // Flush any remaining buffered deltas before finalizing - if (flushTimer !== null) { - clearTimeout(flushTimer); - flushTimer = null; - } - flushBuffers(); - - const currentMsgs = _chat?.getMessages(); - - if (currentMsgs) { - useConversationStore.getState().upsertActiveConversation(currentMsgs); - } - - _chat?.updateMessages(msgs => - msgs.map(m => { - if (m.id === assistantId) { - return { ...m, streaming: false, runId }; - } - if (m.optimistic) { - return { ...m, optimistic: false }; - } - return m; - }) - ); - set({ isStreaming: false, activeRunId: null }); - - if (inputTokens !== undefined && outputTokens !== undefined) { - useMessageStore.getState().addTokenUsage(inputTokens, outputTokens); - _chat?.setChatStoreState({ - totalInputTokens: useMessageStore.getState().totalInputTokens, - totalOutputTokens: useMessageStore.getState().totalOutputTokens, - }); - } - - // Async memory extraction — dispatch update event AFTER extraction completes - const msgs = _chat?.getMessages() || []; - const filtered = msgs - .filter(m => m.role === 'user' || m.role === 'assistant') - .map(m => ({ role: m.role, content: m.content })); - const convId = useConversationStore.getState().currentConversationId; - getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined) - .then(() => { - // Notify RightPanel to refresh UserProfile after memory extraction completes - if (typeof window !== 'undefined') { - window.dispatchEvent(new CustomEvent('zclaw:agent-profile-updated', { - detail: { agentId } - })); - } - }) - .catch(err => { - log.warn('Memory extraction failed:', err); - }); - intelligenceClient.reflection.recordConversation().catch(err => { - log.warn('Recording conversation failed:', err); - }); - intelligenceClient.reflection.shouldReflect().then(shouldReflect => { - if (shouldReflect) { - intelligenceClient.reflection.reflect(agentId, []).catch(err => { - log.warn('Reflection failed:', err); - }); - } - }); - - // Follow-up suggestions - const latestMsgs = _chat?.getMessages() || []; - const completedMsg = latestMsgs.find(m => m.id === assistantId); - if (completedMsg?.content) { - const suggestions = generateFollowUpSuggestions(completedMsg.content); - if (suggestions.length > 0) { - get().setSuggestions(suggestions); - } - } - }, + onDelta: (delta: string) => buffer.appendText(delta), + onThinkingDelta: (delta: string) => buffer.appendThinking(delta), + onTool: createToolHandler(assistantId, _chat), + onHand: createHandHandler(_chat, runId), + onSubtaskStatus: createSubtaskHandler(assistantId, _chat), + onComplete: createCompleteHandler(assistantId, streamStartTime, _chat, buffer, agentId, set), onError: async (error: string) => { - // Flush any remaining buffered deltas before erroring - if (flushTimer !== null) { - clearTimeout(flushTimer); - flushTimer = null; - } - flushBuffers(); - - // Attempt 401 auth recovery (token refresh + kernel reconnect) + buffer.flushRemaining(); const recoveryMsg = await tryRecoverFromAuthError(error); - const rawError = recoveryMsg || error; - const displayError = formatUserError(rawError); + const displayError = formatUserError(recoveryMsg || error); _chat?.updateMessages(msgs => msgs.map(m => @@ -604,7 +634,6 @@ export const useStreamStore = create()( set({ isStreaming: false, activeRunId: null }); }, }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any { sessionKey: effectiveSessionKey, agentId: effectiveAgentId, @@ -632,21 +661,13 @@ export const useStreamStore = create()( ) ); } catch (err: unknown) { - // Flush remaining buffers on error - if (flushTimer !== null) { - clearTimeout(flushTimer); - flushTimer = null; - } - textBuffer = ''; - thinkBuffer = ''; + buffer.clear(); let errorMessage = err instanceof Error ? err.message : '无法连接 Gateway'; - - // Attempt 401 auth recovery const recoveryMsg = await tryRecoverFromAuthError(errorMessage); if (recoveryMsg) errorMessage = recoveryMsg; - _chat?.updateMessages(msgs => + _chat.updateMessages(msgs => msgs.map(m => m.id === assistantId ? {