refactor(desktop): streamStore sendMessage 拆分 Phase 3 — 提取 DeltaBuffer+4 Handler
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

- DeltaBuffer 类: ~60fps 文本/思考增量缓冲,替换内联 textBuffer/thinkBuffer
- createToolHandler: 工具步骤管理 (toolStart/toolEnd + artifact 自动创建)
- createHandHandler: Hand 能力消息生成
- createSubtaskHandler: 子任务状态映射
- createCompleteHandler: 完成回调 (token 统计+记忆提取+反思+建议)
- sendMessage 内联回调从 ~350 行缩减到 ~130 行 (-63%)
- TypeScript 类型检查通过, 8 个 seam 测试通过
This commit is contained in:
iven
2026-04-21 23:03:04 +08:00
parent ae7322e610
commit 191cc3097c

View File

@@ -144,6 +144,265 @@ interface ChatStoreAccess {
setChatStoreState: (partial: Record<string, unknown>) => void; setChatStoreState: (partial: Record<string, unknown>) => void;
} }
// ---------------------------------------------------------------------------
// Delta buffer — batches text/thinking deltas at ~60fps
// ---------------------------------------------------------------------------
class DeltaBuffer {
private text = '';
private think = '';
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private readonly assistantId: string,
private readonly chat: ChatStoreAccess,
) {}
appendText(delta: string) {
this.text += delta;
this.scheduleFlush();
}
appendThinking(delta: string) {
this.think += delta;
this.scheduleFlush();
}
flush() {
this.timer = null;
const text = this.text;
const think = this.think;
this.text = '';
this.think = '';
if (text || think) {
this.chat.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== this.assistantId) return m;
return {
...m,
...(text ? { content: m.content + text } : {}),
...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}),
};
})
);
}
}
clear() {
if (this.timer !== null) {
clearTimeout(this.timer);
this.timer = null;
}
this.text = '';
this.think = '';
}
flushRemaining() {
if (this.timer !== null) {
clearTimeout(this.timer);
this.timer = null;
}
this.flush();
}
private scheduleFlush() {
if (this.timer === null) {
this.timer = setTimeout(() => this.flush(), 0);
}
}
}
// ---------------------------------------------------------------------------
// Stream event handlers (extracted from sendMessage)
// ---------------------------------------------------------------------------
function createToolHandler(assistantId: string, chat: ChatStoreAccess) {
return (tool: string, input: string, output: string) => {
if (output) {
// toolEnd: complete the last running step for this tool
chat.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== assistantId) return m;
const steps = [...(m.toolSteps || [])];
for (let i = steps.length - 1; i >= 0; i--) {
if (steps[i].toolName === tool && steps[i].status === 'running') {
steps[i] = { ...steps[i], output, status: 'completed' as const };
break;
}
}
return { ...m, toolSteps: steps };
})
);
// Auto-create artifact when file_write tool produces output
if (tool === 'file_write') {
try {
const parsed = JSON.parse(output);
const filePath = parsed?.path || parsed?.file_path || '';
const content = parsed?.content || '';
if (filePath && content) {
const fileName = filePath.split('/').pop() || filePath;
const ext = fileName.split('.').pop()?.toLowerCase() || '';
const typeMap: Record<string, 'code' | 'markdown' | 'text'> = {
ts: 'code', tsx: 'code', js: 'code', jsx: 'code',
py: 'code', rs: 'code', go: 'code', java: 'code',
md: 'markdown', txt: 'text', json: 'code',
html: 'code', css: 'code', sql: 'code', sh: 'code',
};
const langMap: Record<string, string> = {
ts: 'typescript', tsx: 'typescript', js: 'javascript', jsx: 'javascript',
py: 'python', rs: 'rust', go: 'go', java: 'java',
html: 'html', css: 'css', sql: 'sql', sh: 'bash', json: 'json',
};
useArtifactStore.getState().addArtifact({
id: `artifact_${Date.now()}`,
name: fileName,
content: typeof content === 'string' ? content : JSON.stringify(content, null, 2),
type: typeMap[ext] || 'text',
language: langMap[ext],
createdAt: new Date(),
sourceStepId: assistantId,
});
}
} catch { /* non-critical: artifact creation from tool output */ }
}
} else {
// toolStart: create new running step
const step: ToolCallStep = {
id: `step_${Date.now()}_${generateRandomString(4)}`,
toolName: tool,
input,
output: '',
status: 'running',
timestamp: new Date(),
};
chat.updateMessages(msgs =>
msgs.map(m =>
m.id === assistantId
? { ...m, toolSteps: [...(m.toolSteps || []), step] }
: m
)
);
}
};
}
function createHandHandler(chat: ChatStoreAccess, runId: string) {
return (name: string, status: string, result?: unknown) => {
const handMsg: StreamMsg = {
id: `hand_${Date.now()}_${generateRandomString(4)}`,
role: 'hand',
content: result
? (typeof result === 'string' ? result : JSON.stringify(result, null, 2))
: `Hand: ${name} - ${status}`,
timestamp: new Date(),
runId,
handName: name,
handStatus: status,
handResult: result,
};
chat.updateMessages(msgs => [...msgs, handMsg]);
};
}
function createSubtaskHandler(assistantId: string, chat: ChatStoreAccess) {
return (taskId: string, description: string, status: string, detail?: string) => {
const statusMap: Record<string, Subtask['status']> = {
started: 'pending',
running: 'in_progress',
completed: 'completed',
failed: 'failed',
};
const mappedStatus = statusMap[status] || 'in_progress';
chat.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== assistantId) return m;
const subtasks = [...(m.subtasks || [])];
const existingIdx = subtasks.findIndex(st => st.id === taskId);
if (existingIdx >= 0) {
subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail };
} else {
subtasks.push({ id: taskId, description, status: mappedStatus, result: detail });
}
return { ...m, subtasks };
})
);
};
}
function createCompleteHandler(
assistantId: string,
_streamStartTime: number,
chat: ChatStoreAccess,
buffer: DeltaBuffer,
agentId: string,
set: (partial: Partial<StreamState>) => void,
) {
return (inputTokens?: number, outputTokens?: number) => {
buffer.flushRemaining();
const currentMsgs = chat.getMessages();
if (currentMsgs) {
useConversationStore.getState().upsertActiveConversation(currentMsgs);
}
chat.updateMessages(msgs =>
msgs.map(m => {
if (m.id === assistantId) return { ...m, streaming: false };
if (m.optimistic) return { ...m, optimistic: false };
return m;
})
);
set({ isStreaming: false, activeRunId: null });
if (inputTokens !== undefined && outputTokens !== undefined) {
useMessageStore.getState().addTokenUsage(inputTokens, outputTokens);
chat.setChatStoreState({
totalInputTokens: useMessageStore.getState().totalInputTokens,
totalOutputTokens: useMessageStore.getState().totalOutputTokens,
});
}
// Async memory extraction
const msgs = chat.getMessages() || [];
const filtered = msgs
.filter(m => m.role === 'user' || m.role === 'assistant')
.map(m => ({ role: m.role, content: m.content }));
const convId = useConversationStore.getState().currentConversationId;
getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined)
.then(() => {
if (typeof window !== 'undefined') {
window.dispatchEvent(new CustomEvent('zclaw:agent-profile-updated', {
detail: { agentId }
}));
}
})
.catch(err => log.warn('Memory extraction failed:', err));
intelligenceClient.reflection.recordConversation().catch(err => {
log.warn('Recording conversation failed:', err);
});
intelligenceClient.reflection.shouldReflect().then(shouldReflect => {
if (shouldReflect) {
intelligenceClient.reflection.reflect(agentId, []).catch(err => {
log.warn('Reflection failed:', err);
});
}
});
// Follow-up suggestions
const latestMsgs = chat.getMessages() || [];
const completedMsg = latestMsgs.find(m => m.id === assistantId);
if (completedMsg?.content) {
const suggestions = generateFollowUpSuggestions(completedMsg.content);
if (suggestions.length > 0) {
set({ suggestions });
}
}
};
}
export interface StreamState { export interface StreamState {
isStreaming: boolean; isStreaming: boolean;
/** Brief cooldown after cancelStream — prevents race with backend active-stream check */ /** Brief cooldown after cancelStream — prevents race with backend active-stream check */
@@ -322,40 +581,8 @@ export const useStreamStore = create<StreamState>()(
}); });
set({ isStreaming: true, activeRunId: null }); set({ isStreaming: true, activeRunId: null });
// ── Delta buffering ── // Delta buffer — batches updates at ~60fps
// Accumulate text/thinking deltas in local buffers and flush to store const buffer = new DeltaBuffer(assistantId, _chat);
// at ~60fps intervals. This prevents React "Maximum update depth exceeded"
// when the LLM emits many small deltas per frame (e.g. Kimi thinking).
let textBuffer = '';
let thinkBuffer = '';
let flushTimer: ReturnType<typeof setTimeout> | null = null;
const flushBuffers = () => {
flushTimer = null;
const text = textBuffer;
const think = thinkBuffer;
textBuffer = '';
thinkBuffer = '';
if (text || think) {
_chat?.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== assistantId) return m;
return {
...m,
...(text ? { content: m.content + text } : {}),
...(think ? { thinkingContent: (m.thinkingContent || '') + think } : {}),
};
})
);
}
};
const scheduleFlush = () => {
if (flushTimer === null) {
flushTimer = setTimeout(flushBuffers, 0);
}
};
try { try {
const client = getClient(); const client = getClient();
@@ -384,213 +611,16 @@ export const useStreamStore = create<StreamState>()(
const result = await client.chatStream( const result = await client.chatStream(
content, content,
{ {
onDelta: (delta: string) => { onDelta: (delta: string) => buffer.appendText(delta),
textBuffer += delta; onThinkingDelta: (delta: string) => buffer.appendThinking(delta),
scheduleFlush(); onTool: createToolHandler(assistantId, _chat),
}, onHand: createHandHandler(_chat, runId),
onThinkingDelta: (delta: string) => { onSubtaskStatus: createSubtaskHandler(assistantId, _chat),
thinkBuffer += delta; onComplete: createCompleteHandler(assistantId, streamStartTime, _chat, buffer, agentId, set),
scheduleFlush();
},
onTool: (tool: string, input: string, output: string) => {
if (output) {
// toolEnd: find the last running step for this tool and complete it
_chat?.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== assistantId) return m;
const steps = [...(m.toolSteps || [])];
for (let i = steps.length - 1; i >= 0; i--) {
if (steps[i].toolName === tool && steps[i].status === 'running') {
steps[i] = { ...steps[i], output, status: 'completed' as const };
break;
}
}
return { ...m, toolSteps: steps };
})
);
// Auto-create artifact when file_write tool produces output
if (tool === 'file_write' && output) {
try {
const parsed = JSON.parse(output);
const filePath = parsed?.path || parsed?.file_path || '';
const content = parsed?.content || '';
if (filePath && content) {
const fileName = filePath.split('/').pop() || filePath;
const ext = fileName.split('.').pop()?.toLowerCase() || '';
const typeMap: Record<string, 'code' | 'markdown' | 'text'> = {
ts: 'code', tsx: 'code', js: 'code', jsx: 'code',
py: 'code', rs: 'code', go: 'code', java: 'code',
md: 'markdown', txt: 'text', json: 'code',
html: 'code', css: 'code', sql: 'code', sh: 'code',
};
const langMap: Record<string, string> = {
ts: 'typescript', tsx: 'typescript', js: 'javascript', jsx: 'javascript',
py: 'python', rs: 'rust', go: 'go', java: 'java',
html: 'html', css: 'css', sql: 'sql', sh: 'bash', json: 'json',
};
useArtifactStore.getState().addArtifact({
id: `artifact_${Date.now()}`,
name: fileName,
content: typeof content === 'string' ? content : JSON.stringify(content, null, 2),
type: typeMap[ext] || 'text',
language: langMap[ext],
createdAt: new Date(),
sourceStepId: assistantId,
});
}
} catch { /* non-critical: artifact creation from tool output */ }
}
} else {
// toolStart: create new running step
const step: ToolCallStep = {
id: `step_${Date.now()}_${generateRandomString(4)}`,
toolName: tool,
input,
output: '',
status: 'running',
timestamp: new Date(),
};
_chat?.updateMessages(msgs =>
msgs.map(m =>
m.id === assistantId
? { ...m, toolSteps: [...(m.toolSteps || []), step] }
: m
)
);
}
},
onHand: (name: string, status: string, result?: unknown) => {
const handMsg: StreamMsg = {
id: `hand_${Date.now()}_${generateRandomString(4)}`,
role: 'hand',
content: result
? (typeof result === 'string' ? result : JSON.stringify(result, null, 2))
: `Hand: ${name} - ${status}`,
timestamp: new Date(),
runId,
handName: name,
handStatus: status,
handResult: result,
};
_chat?.updateMessages(msgs => [...msgs, handMsg]);
},
onSubtaskStatus: (taskId: string, description: string, status: string, detail?: string) => {
// Map backend status to frontend Subtask status
const statusMap: Record<string, Subtask['status']> = {
started: 'pending',
running: 'in_progress',
completed: 'completed',
failed: 'failed',
};
const mappedStatus = statusMap[status] || 'in_progress';
_chat?.updateMessages(msgs =>
msgs.map(m => {
if (m.id !== assistantId) return m;
const subtasks = [...(m.subtasks || [])];
const existingIdx = subtasks.findIndex(st => st.id === taskId);
if (existingIdx >= 0) {
subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail };
} else {
subtasks.push({
id: taskId,
description,
status: mappedStatus,
result: detail,
});
}
return { ...m, subtasks };
})
);
},
onComplete: (inputTokens?: number, outputTokens?: number) => {
// Flush any remaining buffered deltas before finalizing
if (flushTimer !== null) {
clearTimeout(flushTimer);
flushTimer = null;
}
flushBuffers();
const currentMsgs = _chat?.getMessages();
if (currentMsgs) {
useConversationStore.getState().upsertActiveConversation(currentMsgs);
}
_chat?.updateMessages(msgs =>
msgs.map(m => {
if (m.id === assistantId) {
return { ...m, streaming: false, runId };
}
if (m.optimistic) {
return { ...m, optimistic: false };
}
return m;
})
);
set({ isStreaming: false, activeRunId: null });
if (inputTokens !== undefined && outputTokens !== undefined) {
useMessageStore.getState().addTokenUsage(inputTokens, outputTokens);
_chat?.setChatStoreState({
totalInputTokens: useMessageStore.getState().totalInputTokens,
totalOutputTokens: useMessageStore.getState().totalOutputTokens,
});
}
// Async memory extraction — dispatch update event AFTER extraction completes
const msgs = _chat?.getMessages() || [];
const filtered = msgs
.filter(m => m.role === 'user' || m.role === 'assistant')
.map(m => ({ role: m.role, content: m.content }));
const convId = useConversationStore.getState().currentConversationId;
getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined)
.then(() => {
// Notify RightPanel to refresh UserProfile after memory extraction completes
if (typeof window !== 'undefined') {
window.dispatchEvent(new CustomEvent('zclaw:agent-profile-updated', {
detail: { agentId }
}));
}
})
.catch(err => {
log.warn('Memory extraction failed:', err);
});
intelligenceClient.reflection.recordConversation().catch(err => {
log.warn('Recording conversation failed:', err);
});
intelligenceClient.reflection.shouldReflect().then(shouldReflect => {
if (shouldReflect) {
intelligenceClient.reflection.reflect(agentId, []).catch(err => {
log.warn('Reflection failed:', err);
});
}
});
// Follow-up suggestions
const latestMsgs = _chat?.getMessages() || [];
const completedMsg = latestMsgs.find(m => m.id === assistantId);
if (completedMsg?.content) {
const suggestions = generateFollowUpSuggestions(completedMsg.content);
if (suggestions.length > 0) {
get().setSuggestions(suggestions);
}
}
},
onError: async (error: string) => { onError: async (error: string) => {
// Flush any remaining buffered deltas before erroring buffer.flushRemaining();
if (flushTimer !== null) {
clearTimeout(flushTimer);
flushTimer = null;
}
flushBuffers();
// Attempt 401 auth recovery (token refresh + kernel reconnect)
const recoveryMsg = await tryRecoverFromAuthError(error); const recoveryMsg = await tryRecoverFromAuthError(error);
const rawError = recoveryMsg || error; const displayError = formatUserError(recoveryMsg || error);
const displayError = formatUserError(rawError);
_chat?.updateMessages(msgs => _chat?.updateMessages(msgs =>
msgs.map(m => msgs.map(m =>
@@ -604,7 +634,6 @@ export const useStreamStore = create<StreamState>()(
set({ isStreaming: false, activeRunId: null }); set({ isStreaming: false, activeRunId: null });
}, },
}, },
// eslint-disable-next-line @typescript-eslint/no-explicit-any
{ {
sessionKey: effectiveSessionKey, sessionKey: effectiveSessionKey,
agentId: effectiveAgentId, agentId: effectiveAgentId,
@@ -632,21 +661,13 @@ export const useStreamStore = create<StreamState>()(
) )
); );
} catch (err: unknown) { } catch (err: unknown) {
// Flush remaining buffers on error buffer.clear();
if (flushTimer !== null) {
clearTimeout(flushTimer);
flushTimer = null;
}
textBuffer = '';
thinkBuffer = '';
let errorMessage = err instanceof Error ? err.message : '无法连接 Gateway'; let errorMessage = err instanceof Error ? err.message : '无法连接 Gateway';
// Attempt 401 auth recovery
const recoveryMsg = await tryRecoverFromAuthError(errorMessage); const recoveryMsg = await tryRecoverFromAuthError(errorMessage);
if (recoveryMsg) errorMessage = recoveryMsg; if (recoveryMsg) errorMessage = recoveryMsg;
_chat?.updateMessages(msgs => _chat.updateMessages(msgs =>
msgs.map(m => msgs.map(m =>
m.id === assistantId m.id === assistantId
? { ? {