Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Root cause: Each LLM delta (text/thinking) triggered a synchronous setState via updateMessages → chatStore.setState. With Kimi thinking model emitting many deltas per frame, this caused a React render storm that hit the maximum update depth limit. Fix (two-layer approach): 1. streamStore: Buffer text/thinking deltas locally and flush to store via setTimeout(0), batching multiple deltas per frame 2. chatStore: Microtask batching in injectChatStore.updateMessages to coalesce rapid successive updates Verified: 2-round conversation (4 messages) with Kimi thinking model completes without crash. Previously crashed 100% on 2nd message.
765 lines
27 KiB
TypeScript
765 lines
27 KiB
TypeScript
/**
|
|
* StreamStore — manages streaming orchestration, chat mode, and suggestions.
|
|
*
|
|
* Extracted from chatStore.ts as part of the structured refactor (Phase 4).
|
|
* Responsible for:
|
|
* - Stream lifecycle (sendMessage, initStreamListener)
|
|
* - Chat mode state (chatMode, setChatMode, getChatModeConfig)
|
|
* - Follow-up suggestions
|
|
* - Skill search
|
|
*
|
|
* Design: streamStore holds its own `isStreaming` state and delegates
|
|
* message mutations to chatStore via an injected reference. This avoids
|
|
* circular imports while keeping high-frequency streaming updates
|
|
* (dozens of set() calls per second) on a single Zustand store.
|
|
*/
|
|
|
|
import { create } from 'zustand';
|
|
import { persist } from 'zustand/middleware';
|
|
import type { AgentStreamDelta } from '../../lib/gateway-client';
|
|
import { getClient } from '../../store/connectionStore';
|
|
import { intelligenceClient } from '../../lib/intelligence-client';
|
|
import { getMemoryExtractor } from '../../lib/memory-extractor';
|
|
import { getSkillDiscovery } from '../../lib/skill-discovery';
|
|
import { useOfflineStore, isOffline } from '../../store/offlineStore';
|
|
import { useConnectionStore } from '../../store/connectionStore';
|
|
import { createLogger } from '../../lib/logger';
|
|
import { speechSynth } from '../../lib/speech-synth';
|
|
import { generateRandomString } from '../../lib/crypto-utils';
|
|
import type { ChatModeType, ChatModeConfig, Subtask } from '../../components/ai';
|
|
import type { ToolCallStep } from '../../components/ai';
|
|
import { CHAT_MODES } from '../../components/ai';
|
|
import {
|
|
useConversationStore,
|
|
resolveGatewayAgentId,
|
|
} from './conversationStore';
|
|
import { useMessageStore } from './messageStore';
|
|
import { useArtifactStore } from './artifactStore';
|
|
|
|
const log = createLogger('StreamStore');
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** Message shape used internally by streamStore for typed callbacks. */
|
|
interface StreamMsg {
  /** Unique message id (e.g. `assistant_<timestamp>`). */
  id: string;
  /** Originator; 'hand'/'workflow' are agent-side event messages. */
  role: 'user' | 'assistant' | 'tool' | 'hand' | 'workflow' | 'system';
  /** Rendered message text (grows incrementally while streaming). */
  content: string;
  timestamp: Date;
  /** True while deltas for this message are still arriving. */
  streaming?: boolean;
  /** True for a locally-added user message not yet confirmed by the backend. */
  optimistic?: boolean;
  /** Backend run identifier associated with this message, if known. */
  runId?: string;
  /** Error text when the stream failed or was cancelled. */
  error?: string;
  /** Accumulated model "thinking" text (kept separate from `content`). */
  thinkingContent?: string;
  /** Tool invocations attached to this assistant message. */
  toolSteps?: ToolCallStep[];
  // Hand (agent capability) event fields — populated for role 'hand'.
  handName?: string;
  handStatus?: string;
  handResult?: unknown;
  // Workflow event fields — populated for role 'workflow'.
  workflowId?: string;
  workflowStep?: string;
  workflowStatus?: string;
  workflowResult?: unknown;
  /** Subtask progress entries (plan mode / subagent updates). */
  subtasks?: Subtask[];
}
|
|
|
|
/** Shape of the chatStore methods needed by streamStore. */
|
|
interface ChatStoreAccess {
  /** Append a message to the chat message list. */
  addMessage: (msg: StreamMsg) => void;
  /** Replace the message list via a pure read-modify-write updater. */
  updateMessages: (updater: (msgs: StreamMsg[]) => StreamMsg[]) => void;
  /** Snapshot of the current message list. */
  getMessages: () => StreamMsg[];
  /** Patch arbitrary chatStore state (e.g. token counters). */
  setChatStoreState: (partial: Record<string, unknown>) => void;
}
|
|
|
|
export interface StreamState {
  /** True while an LLM stream is in flight. */
  isStreaming: boolean;
  /** Generic loading flag for non-streaming async work. */
  isLoading: boolean;
  /** Current chat mode (the only persisted field of this store). */
  chatMode: ChatModeType;
  /** Follow-up suggestions shown after the last completed reply. */
  suggestions: string[];
  /** Run ID of the currently active stream (null when idle). */
  activeRunId: string | null;

  // Core streaming
  sendMessage: (content: string) => Promise<void>;
  /** Subscribes to agent stream deltas; returns an unsubscribe function. */
  initStreamListener: () => () => void;
  /** Cancel the active stream: resets state, marks message cancelled, saves conversation. */
  cancelStream: () => void;

  // Chat mode
  setChatMode: (mode: ChatModeType) => void;
  getChatModeConfig: () => ChatModeConfig;

  // Suggestions
  setSuggestions: (suggestions: string[]) => void;

  // Skill search
  searchSkills: (query: string) => {
    results: Array<{ id: string; name: string; description: string }>;
    totalAvailable: number;
  };

  // Loading
  setIsLoading: (loading: boolean) => void;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Follow-up suggestion generator
|
|
// ---------------------------------------------------------------------------
|
|
|
|
function generateFollowUpSuggestions(content: string): string[] {
|
|
const suggestions: string[] = [];
|
|
const lower = content.toLowerCase();
|
|
|
|
const patterns: Array<{ keywords: string[]; suggestion: string }> = [
|
|
{ keywords: ['代码', 'code', 'function', '函数', '实现'], suggestion: '解释这段代码的工作原理' },
|
|
{ keywords: ['错误', 'error', 'bug', '问题'], suggestion: '如何调试这个问题?' },
|
|
{ keywords: ['数据', 'data', '分析', '统计'], suggestion: '可视化这些数据' },
|
|
{ keywords: ['步骤', 'step', '流程', '方案'], suggestion: '详细说明第一步该怎么做' },
|
|
{ keywords: ['可以', '建议', '推荐', '试试'], suggestion: '还有其他方案吗?' },
|
|
{ keywords: ['文件', 'file', '保存', '写入'], suggestion: '查看生成的文件内容' },
|
|
{ keywords: ['搜索', 'search', '查找', 'research'], suggestion: '搜索更多相关信息' },
|
|
];
|
|
|
|
for (const { keywords, suggestion } of patterns) {
|
|
if (keywords.some(kw => lower.includes(kw))) {
|
|
if (!suggestions.includes(suggestion)) {
|
|
suggestions.push(suggestion);
|
|
}
|
|
}
|
|
if (suggestions.length >= 3) break;
|
|
}
|
|
|
|
const generic = ['继续深入分析', '换个角度看看', '用简单的话解释'];
|
|
while (suggestions.length < 3) {
|
|
const next = generic.find(g => !suggestions.includes(g));
|
|
if (next) suggestions.push(next);
|
|
else break;
|
|
}
|
|
|
|
return suggestions;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// ChatStore injection (avoids circular imports)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// Injected chatStore accessor; null until chatStore calls injectChatStore().
let _chat: ChatStoreAccess | null = null;

/**
 * Inject chatStore access for message mutations.
 * Called by chatStore after creation.
 */
export function injectChatStore(access: ChatStoreAccess): void {
  _chat = access;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Store
|
|
// ---------------------------------------------------------------------------
|
|
|
|
export const useStreamStore = create<StreamState>()(
  persist(
    (set, get) => ({
      // ── Initial state ──
      isStreaming: false,
      isLoading: false,
      // Only chatMode survives reloads (see the persist partialize config).
      chatMode: 'thinking' as ChatModeType,
      suggestions: [],
      activeRunId: null as string | null,

      // ── Chat Mode ──

      setChatMode: (mode: ChatModeType) => set({ chatMode: mode }),

      // Resolve the full config object for the currently selected mode.
      getChatModeConfig: () => CHAT_MODES[get().chatMode].config,

      setSuggestions: (suggestions: string[]) => set({ suggestions }),

      setIsLoading: (loading: boolean) => set({ isLoading: loading }),
|
|
|
|
// ── Skill Search ──
|
|
|
|
searchSkills: (query: string) => {
|
|
const discovery = getSkillDiscovery();
|
|
const result = discovery.searchSkills(query);
|
|
return {
|
|
results: result.results.map(s => ({ id: s.id, name: s.name, description: s.description })),
|
|
totalAvailable: result.totalAvailable,
|
|
};
|
|
},
|
|
|
|
      // ── Core: sendMessage ──

      /**
       * Send one user message and drive the resulting LLM stream.
       * Handles the offline queue path, optimistic message insertion,
       * buffered delta flushing, tool/hand/subtask events, completion
       * bookkeeping (persistence, tokens, memory extraction, suggestions),
       * and error cleanup.
       */
      sendMessage: async (content: string) => {
        // Re-entrancy guard: only one active stream at a time.
        if (get().isStreaming) return;
        if (!_chat) {
          log.warn('sendMessage called before chatStore injection');
          return;
        }

        const convStore = useConversationStore.getState();
        const currentAgent = convStore.currentAgent;
        const sessionKey = convStore.sessionKey;

        // Clear stale follow-up suggestions from the previous turn.
        set({ suggestions: [] });
        const effectiveSessionKey = sessionKey || crypto.randomUUID();
        const effectiveAgentId = resolveGatewayAgentId(currentAgent);
        const agentId = currentAgent?.id || 'zclaw-main';

        // Offline path: queue the message locally and tell the user.
        if (isOffline()) {
          const { queueMessage } = useOfflineStore.getState();
          const queueId = queueMessage(content, effectiveAgentId, effectiveSessionKey);
          log.debug(`Offline - message queued: ${queueId}`);

          _chat.addMessage({
            id: `system_${Date.now()}`,
            role: 'system',
            content: `后端服务不可用,消息已保存到本地队列。恢复连接后将自动发送。`,
            timestamp: new Date(),
          });

          _chat.addMessage({
            id: `user_${Date.now()}`,
            role: 'user',
            content,
            timestamp: new Date(),
          });
          return;
        }

        // Optimistic user message; confirmed (optimistic=false) on completion.
        const streamStartTime = Date.now();
        _chat.addMessage({
          id: `user_${streamStartTime}`,
          role: 'user',
          content,
          timestamp: new Date(streamStartTime),
          optimistic: true,
        });

        // Placeholder assistant message that the stream appends into.
        const assistantId = `assistant_${Date.now()}`;
        _chat.addMessage({
          id: assistantId,
          role: 'assistant',
          content: '',
          timestamp: new Date(),
          streaming: true,
        });
        set({ isStreaming: true, activeRunId: null });

        // ── Delta buffering ──
        // Accumulate text/thinking deltas in local buffers and flush to store
        // at ~60fps intervals. This prevents React "Maximum update depth exceeded"
        // when the LLM emits many small deltas per frame (e.g. Kimi thinking).
        let textBuffer = '';
        let thinkBuffer = '';
        let flushTimer: ReturnType<typeof setTimeout> | null = null;

        // Drain both buffers into the assistant message in a single update.
        const flushBuffers = () => {
          flushTimer = null;
          const text = textBuffer;
          const think = thinkBuffer;
          textBuffer = '';
          thinkBuffer = '';

          if (text || think) {
            _chat?.updateMessages(msgs =>
              msgs.map(m => {
                if (m.id !== assistantId) return m;
                return {
                  ...m,
                  ...(text ? { content: m.content + text } : {}),
                  ...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}),
                };
              })
            );
          }
        };

        // Coalesce: at most one pending flush at any time.
        const scheduleFlush = () => {
          if (flushTimer === null) {
            flushTimer = setTimeout(flushBuffers, 0);
          }
        };

        try {
          const client = getClient();
          const connectionState = useConnectionStore.getState().connectionState;

          if (connectionState !== 'connected') {
            throw new Error(`Not connected (state: ${connectionState})`);
          }

          // Provisional run id; replaced by the backend-assigned id below.
          let runId = `run_${Date.now()}`;

          if (!useConversationStore.getState().sessionKey) {
            useConversationStore.setState({ sessionKey: effectiveSessionKey });
          }

          const result = await client.chatStream(
            content,
            {
              onDelta: (delta: string) => {
                textBuffer += delta;
                scheduleFlush();
              },
              onThinkingDelta: (delta: string) => {
                thinkBuffer += delta;
                scheduleFlush();
              },
              onTool: (tool: string, input: string, output: string) => {
                if (output) {
                  // toolEnd: find the last running step for this tool and complete it
                  _chat?.updateMessages(msgs =>
                    msgs.map(m => {
                      if (m.id !== assistantId) return m;
                      const steps = [...(m.toolSteps || [])];
                      for (let i = steps.length - 1; i >= 0; i--) {
                        if (steps[i].toolName === tool && steps[i].status === 'running') {
                          steps[i] = { ...steps[i], output, status: 'completed' as const };
                          break;
                        }
                      }
                      return { ...m, toolSteps: steps };
                    })
                  );

                  // Auto-create artifact when file_write tool produces output
                  if (tool === 'file_write' && output) {
                    try {
                      const parsed = JSON.parse(output);
                      const filePath = parsed?.path || parsed?.file_path || '';
                      const content = parsed?.content || '';
                      if (filePath && content) {
                        const fileName = filePath.split('/').pop() || filePath;
                        const ext = fileName.split('.').pop()?.toLowerCase() || '';
                        // Extension → artifact type / syntax-highlight language.
                        const typeMap: Record<string, 'code' | 'markdown' | 'text'> = {
                          ts: 'code', tsx: 'code', js: 'code', jsx: 'code',
                          py: 'code', rs: 'code', go: 'code', java: 'code',
                          md: 'markdown', txt: 'text', json: 'code',
                          html: 'code', css: 'code', sql: 'code', sh: 'code',
                        };
                        const langMap: Record<string, string> = {
                          ts: 'typescript', tsx: 'typescript', js: 'javascript', jsx: 'javascript',
                          py: 'python', rs: 'rust', go: 'go', java: 'java',
                          html: 'html', css: 'css', sql: 'sql', sh: 'bash', json: 'json',
                        };
                        useArtifactStore.getState().addArtifact({
                          id: `artifact_${Date.now()}`,
                          name: fileName,
                          content: typeof content === 'string' ? content : JSON.stringify(content, null, 2),
                          type: typeMap[ext] || 'text',
                          language: langMap[ext],
                          createdAt: new Date(),
                          sourceStepId: assistantId,
                        });
                      }
                    } catch { /* non-critical: artifact creation from tool output */ }
                  }
                } else {
                  // toolStart: create new running step
                  const step: ToolCallStep = {
                    id: `step_${Date.now()}_${generateRandomString(4)}`,
                    toolName: tool,
                    input,
                    output: '',
                    status: 'running',
                    timestamp: new Date(),
                  };
                  _chat?.updateMessages(msgs =>
                    msgs.map(m =>
                      m.id === assistantId
                        ? { ...m, toolSteps: [...(m.toolSteps || []), step] }
                        : m
                    )
                  );
                }
              },
              onHand: (name: string, status: string, result?: unknown) => {
                // Hand events become standalone messages rather than deltas.
                const handMsg: StreamMsg = {
                  id: `hand_${Date.now()}_${generateRandomString(4)}`,
                  role: 'hand',
                  content: result
                    ? (typeof result === 'string' ? result : JSON.stringify(result, null, 2))
                    : `Hand: ${name} - ${status}`,
                  timestamp: new Date(),
                  runId,
                  handName: name,
                  handStatus: status,
                  handResult: result,
                };
                _chat?.updateMessages(msgs => [...msgs, handMsg]);

                // Browser-side TTS when the speech hand requests it.
                if (name === 'speech' && status === 'completed' && result && typeof result === 'object') {
                  const res = result as Record<string, unknown>;
                  if (res.tts_method === 'browser' && typeof res.text === 'string' && res.text) {
                    speechSynth.speak({
                      text: res.text as string,
                      voice: (res.voice as string) || undefined,
                      language: (res.language as string) || undefined,
                      rate: typeof res.rate === 'number' ? res.rate : undefined,
                      pitch: typeof res.pitch === 'number' ? res.pitch : undefined,
                      volume: typeof res.volume === 'number' ? res.volume : undefined,
                    }).catch((err: unknown) => {
                      const logger = createLogger('speech-synth');
                      logger.warn('Browser TTS failed', { error: String(err) });
                    });
                  }
                }
              },
              onSubtaskStatus: (taskId: string, description: string, status: string, detail?: string) => {
                // Map backend status to frontend Subtask status
                const statusMap: Record<string, Subtask['status']> = {
                  started: 'pending',
                  running: 'in_progress',
                  completed: 'completed',
                  failed: 'failed',
                };
                const mappedStatus = statusMap[status] || 'in_progress';

                // Upsert the subtask entry on the assistant message.
                _chat?.updateMessages(msgs =>
                  msgs.map(m => {
                    if (m.id !== assistantId) return m;
                    const subtasks = [...(m.subtasks || [])];
                    const existingIdx = subtasks.findIndex(st => st.id === taskId);
                    if (existingIdx >= 0) {
                      subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail };
                    } else {
                      subtasks.push({
                        id: taskId,
                        description,
                        status: mappedStatus,
                        result: detail,
                      });
                    }
                    return { ...m, subtasks };
                  })
                );
              },
              onComplete: (inputTokens?: number, outputTokens?: number) => {
                // Flush any remaining buffered deltas before finalizing
                if (flushTimer !== null) {
                  clearTimeout(flushTimer);
                  flushTimer = null;
                }
                flushBuffers();

                const currentMsgs = _chat?.getMessages();

                // Persist the conversation with the finished reply.
                if (currentMsgs) {
                  useConversationStore.getState().upsertActiveConversation(currentMsgs);
                }

                // Finalize: stop streaming flag, confirm optimistic messages.
                _chat?.updateMessages(msgs =>
                  msgs.map(m => {
                    if (m.id === assistantId) {
                      return { ...m, streaming: false, runId };
                    }
                    if (m.optimistic) {
                      return { ...m, optimistic: false };
                    }
                    return m;
                  })
                );
                set({ isStreaming: false, activeRunId: null });

                // Propagate token accounting to messageStore and chatStore.
                if (inputTokens !== undefined && outputTokens !== undefined) {
                  useMessageStore.getState().addTokenUsage(inputTokens, outputTokens);
                  _chat?.setChatStoreState({
                    totalInputTokens: useMessageStore.getState().totalInputTokens,
                    totalOutputTokens: useMessageStore.getState().totalOutputTokens,
                  });
                }

                // Async memory extraction
                const msgs = _chat?.getMessages() || [];
                const filtered = msgs
                  .filter(m => m.role === 'user' || m.role === 'assistant')
                  .map(m => ({ role: m.role, content: m.content }));
                const convId = useConversationStore.getState().currentConversationId;
                getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined).catch(err => {
                  log.warn('Memory extraction failed:', err);
                });
                intelligenceClient.reflection.recordConversation().catch(err => {
                  log.warn('Recording conversation failed:', err);
                });
                intelligenceClient.reflection.shouldReflect().then(shouldReflect => {
                  if (shouldReflect) {
                    intelligenceClient.reflection.reflect(agentId, []).catch(err => {
                      log.warn('Reflection failed:', err);
                    });
                  }
                });

                // Follow-up suggestions
                const latestMsgs = _chat?.getMessages() || [];
                const completedMsg = latestMsgs.find(m => m.id === assistantId);
                if (completedMsg?.content) {
                  const suggestions = generateFollowUpSuggestions(completedMsg.content);
                  if (suggestions.length > 0) {
                    get().setSuggestions(suggestions);
                  }
                }
              },
              onError: (error: string) => {
                // NOTE(review): unlike onComplete and the catch block below, this
                // path does not clear a pending flushTimer, so an already-queued
                // flushBuffers() may still append buffered deltas to the errored
                // message afterwards — confirm whether that is intended.
                _chat?.updateMessages(msgs =>
                  msgs.map(m =>
                    m.id === assistantId
                      ? { ...m, content: '', streaming: false, error }
                      : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime
                        ? { ...m, optimistic: false }
                        : m
                  )
                );
                set({ isStreaming: false, activeRunId: null });
              },
            },
            {
              sessionKey: effectiveSessionKey,
              agentId: effectiveAgentId,
              thinking_enabled: get().getChatModeConfig().thinking_enabled,
              reasoning_effort: get().getChatModeConfig().reasoning_effort,
              plan_mode: get().getChatModeConfig().plan_mode,
              subagent_enabled: get().getChatModeConfig().subagent_enabled,
            }
          );

          // Adopt the backend-assigned run id (enables cancellation).
          if (result?.runId) {
            runId = result.runId;
            set({ activeRunId: runId });
          }

          if (!sessionKey) {
            useConversationStore.setState({ sessionKey: effectiveSessionKey });
          }

          // Stamp the assistant message with the final run id.
          _chat?.updateMessages(msgs =>
            msgs.map(m =>
              m.id === assistantId ? { ...m, runId } : m
            )
          );
        } catch (err: unknown) {
          // Flush remaining buffers on error
          if (flushTimer !== null) {
            clearTimeout(flushTimer);
            flushTimer = null;
          }
          textBuffer = '';
          thinkBuffer = '';

          const errorMessage = err instanceof Error ? err.message : '无法连接 Gateway';
          _chat?.updateMessages(msgs =>
            msgs.map(m =>
              m.id === assistantId
                ? {
                    ...m,
                    content: `⚠️ ${errorMessage}`,
                    streaming: false,
                    error: errorMessage,
                  }
                : m.role === 'user' && m.optimistic && m.timestamp.getTime() >= streamStartTime
                  ? { ...m, optimistic: false }
                  : m
            )
          );
          set({ isStreaming: false, activeRunId: null });
        }
      },
|
|
|
|
// ── Cancel Stream ──
|
|
|
|
cancelStream: () => {
|
|
if (!_chat) return;
|
|
const { activeRunId, isStreaming } = get();
|
|
if (!isStreaming) return;
|
|
|
|
// 1. Tell backend to abort — use sessionKey (which is the sessionId in Tauri)
|
|
try {
|
|
const client = getClient();
|
|
if ('cancelStream' in client) {
|
|
const sessionId = useConversationStore.getState().sessionKey || activeRunId || '';
|
|
(client as { cancelStream: (id: string) => void }).cancelStream(sessionId);
|
|
}
|
|
} catch {
|
|
// Backend cancel is best-effort; proceed with local cleanup
|
|
}
|
|
|
|
// 2. Mark the streaming message as cancelled
|
|
_chat.updateMessages(msgs =>
|
|
msgs.map(m => {
|
|
if (m.streaming) {
|
|
return {
|
|
...m,
|
|
streaming: false,
|
|
error: m.content ? undefined : '已取消',
|
|
};
|
|
}
|
|
if (m.optimistic) {
|
|
return { ...m, optimistic: false };
|
|
}
|
|
return m;
|
|
})
|
|
);
|
|
|
|
// 3. Immediately persist the conversation
|
|
const currentMsgs = _chat.getMessages();
|
|
if (currentMsgs) {
|
|
useConversationStore.getState().upsertActiveConversation(currentMsgs);
|
|
}
|
|
|
|
// 4. Reset streaming state and clear sessionKey so next send gets a fresh session
|
|
set({ isStreaming: false, activeRunId: null });
|
|
useConversationStore.setState({ sessionKey: null });
|
|
log.info('Stream cancelled by user');
|
|
},
|
|
|
|
// ── Agent Stream Listener ──
|
|
|
|
initStreamListener: () => {
|
|
const client = getClient();
|
|
|
|
if (!('onAgentStream' in client)) {
|
|
return () => {};
|
|
}
|
|
|
|
const unsubscribe = client.onAgentStream((delta: AgentStreamDelta) => {
|
|
const msgs = _chat?.getMessages() || [];
|
|
|
|
const streamingMsg = [...msgs]
|
|
.reverse()
|
|
.find(m => (
|
|
m.role === 'assistant'
|
|
&& m.streaming
|
|
&& (
|
|
(delta.runId && m.runId === delta.runId)
|
|
|| (!delta.runId && m.runId === null)
|
|
)
|
|
))
|
|
|| [...msgs]
|
|
.reverse()
|
|
.find(m => m.role === 'assistant' && m.streaming);
|
|
|
|
if (!streamingMsg) return;
|
|
|
|
if (delta.stream === 'assistant' && (delta.delta || delta.content)) {
|
|
_chat?.updateMessages(ms =>
|
|
ms.map(m =>
|
|
m.id === streamingMsg.id
|
|
? { ...m, content: m.content + (delta.delta || delta.content || '') }
|
|
: m
|
|
)
|
|
);
|
|
} else if (delta.stream === 'tool') {
|
|
if (delta.toolOutput) {
|
|
// toolEnd: find the last running step for this tool and complete it
|
|
_chat?.updateMessages(ms =>
|
|
ms.map(m => {
|
|
if (m.id !== streamingMsg.id) return m;
|
|
const steps = [...(m.toolSteps || [])];
|
|
for (let i = steps.length - 1; i >= 0; i--) {
|
|
if (steps[i].toolName === (delta.tool || 'unknown') && steps[i].status === 'running') {
|
|
steps[i] = { ...steps[i], output: delta.toolOutput, status: 'completed' as const };
|
|
break;
|
|
}
|
|
}
|
|
return { ...m, toolSteps: steps };
|
|
})
|
|
);
|
|
} else {
|
|
// toolStart: create new running step
|
|
const step: ToolCallStep = {
|
|
id: `step_${Date.now()}_${generateRandomString(4)}`,
|
|
toolName: delta.tool || 'unknown',
|
|
input: delta.toolInput,
|
|
output: '',
|
|
status: 'running',
|
|
timestamp: new Date(),
|
|
};
|
|
_chat?.updateMessages(ms =>
|
|
ms.map(m =>
|
|
m.id === streamingMsg.id
|
|
? { ...m, toolSteps: [...(m.toolSteps || []), step] }
|
|
: m
|
|
)
|
|
);
|
|
}
|
|
} else if (delta.stream === 'lifecycle') {
|
|
if (delta.phase === 'end' || delta.phase === 'error') {
|
|
if (delta.phase === 'end') {
|
|
const currentMsgs = _chat?.getMessages();
|
|
if (currentMsgs) {
|
|
useConversationStore.getState().upsertActiveConversation(currentMsgs);
|
|
}
|
|
}
|
|
_chat?.updateMessages(ms =>
|
|
ms.map(m => {
|
|
if (m.id === streamingMsg.id) {
|
|
return {
|
|
...m,
|
|
streaming: false,
|
|
error: delta.phase === 'error' ? delta.error : undefined,
|
|
};
|
|
}
|
|
if (m.optimistic) {
|
|
return { ...m, optimistic: false };
|
|
}
|
|
return m;
|
|
})
|
|
);
|
|
set({ isStreaming: false, activeRunId: null });
|
|
|
|
if (delta.phase === 'end') {
|
|
const latestMsgs = _chat?.getMessages() || [];
|
|
const completedMsg = latestMsgs.find(m => m.id === streamingMsg.id);
|
|
if (completedMsg?.content) {
|
|
const suggestions = generateFollowUpSuggestions(completedMsg.content);
|
|
if (suggestions.length > 0) {
|
|
get().setSuggestions(suggestions);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else if (delta.stream === 'hand') {
|
|
const handMsg: StreamMsg = {
|
|
id: `hand_${Date.now()}_${generateRandomString(4)}`,
|
|
role: 'hand',
|
|
content: delta.handResult
|
|
? (typeof delta.handResult === 'string' ? delta.handResult : JSON.stringify(delta.handResult, null, 2))
|
|
: `Hand: ${delta.handName || 'unknown'} - ${delta.handStatus || 'triggered'}`,
|
|
timestamp: new Date(),
|
|
runId: delta.runId,
|
|
handName: delta.handName,
|
|
handStatus: delta.handStatus,
|
|
handResult: delta.handResult,
|
|
};
|
|
_chat?.updateMessages(ms => [...ms, handMsg]);
|
|
} else if (delta.stream === 'workflow') {
|
|
const workflowMsg: StreamMsg = {
|
|
id: `workflow_${Date.now()}_${generateRandomString(4)}`,
|
|
role: 'workflow',
|
|
content: delta.workflowResult
|
|
? (typeof delta.workflowResult === 'string' ? delta.workflowResult : JSON.stringify(delta.workflowResult, null, 2))
|
|
: `Workflow: ${delta.workflowId || 'unknown'} step ${delta.workflowStep || '?'} - ${delta.workflowStatus || 'running'}`,
|
|
timestamp: new Date(),
|
|
runId: delta.runId,
|
|
workflowId: delta.workflowId,
|
|
workflowStep: delta.workflowStep,
|
|
workflowStatus: delta.workflowStatus,
|
|
workflowResult: delta.workflowResult,
|
|
};
|
|
_chat?.updateMessages(ms => [...ms, workflowMsg]);
|
|
}
|
|
});
|
|
|
|
return unsubscribe;
|
|
},
|
|
    }),
    {
      name: 'zclaw-stream-storage',
      // Persist only the user's chosen chat mode; all streaming state is
      // transient and rebuilt per session.
      partialize: (state) => ({
        chatMode: state.chatMode,
      }),
    },
  ),
);
|