import { create } from 'zustand';
import { persist } from 'zustand/middleware';
import { getGatewayClient, AgentStreamDelta } from '../lib/gateway-client';
import { getMemoryManager } from '../lib/agent-memory';
import { getAgentIdentityManager } from '../lib/agent-identity';
import { getMemoryExtractor } from '../lib/memory-extractor';
import { getContextCompactor } from '../lib/context-compactor';
import { getReflectionEngine } from '../lib/reflection-engine';

/** A file attached to a message. */
export interface MessageFile {
  name: string;
  path?: string;
  size?: number;
  type?: string;
}

/** A code snippet attached to a message. */
export interface CodeBlock {
  language?: string;
  filename?: string;
  content?: string;
}

/** A single entry in the chat transcript. */
export interface Message {
  id: string;
  role: 'user' | 'assistant' | 'tool' | 'hand' | 'workflow';
  content: string;
  timestamp: Date;
  /** Run identifier used to correlate stream events with this message. */
  runId?: string;
  /** True while the assistant reply is still being streamed in. */
  streaming?: boolean;
  toolName?: string;
  toolInput?: string;
  toolOutput?: string;
  error?: string;
  // Hand event fields
  handName?: string;
  handStatus?: string;
  handResult?: unknown;
  // Workflow event fields
  workflowId?: string;
  workflowStep?: string;
  workflowStatus?: string;
  workflowResult?: unknown;
  // Output files and code blocks
  files?: MessageFile[];
  codeBlocks?: CodeBlock[];
}

/** A stored conversation: a snapshot of messages plus the session and agent it used. */
export interface Conversation {
  id: string;
  title: string;
  messages: Message[];
  sessionKey: string | null;
  /** `null` when the conversation used the built-in default agent. */
  agentId: string | null;
  createdAt: Date;
  updatedAt: Date;
}

/** Agent entry as shown in the chat UI. */
export interface Agent {
  id: string;
  name: string;
  icon: string;
  color: string;
  lastMessage: string;
  time: string;
}

/** Minimal agent-profile shape accepted by {@link toChatAgent} and `syncAgents`. */
export interface AgentProfileLike {
  id: string;
  name: string;
  nickname?: string;
  role?: string;
}

/** State and actions of the chat store. */
interface ChatState {
  messages: Message[];
  conversations: Conversation[];
  currentConversationId: string | null;
  agents: Agent[];
  currentAgent: Agent | null;
  isStreaming: boolean;
  currentModel: string;
  sessionKey: string | null;

  addMessage: (message: Message) => void;
  updateMessage: (id: string, updates: Partial<Message>) => void;
  setCurrentAgent: (agent: Agent) => void;
  syncAgents: (profiles: AgentProfileLike[]) => void;
  setCurrentModel: (model: string) => void;
  sendMessage: (content: string) => Promise<void>;
  initStreamListener: () => () => void;
  newConversation: () => void;
  switchConversation: (id: string) => void;
  deleteConversation: (id: string) => void;
}

/** Maximum number of characters (code points) kept in a derived conversation title. */
const TITLE_MAX_CHARS = 30;

/** Builds an id of the form `<prefix>_<epoch ms>_<4 random base-36 chars>`. */
function makeUniqueId(prefix: string): string {
  return `${prefix}_${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
}

/** Generates an id for a new conversation. */
function generateConvId(): string {
  return makeUniqueId('conv');
}

/**
 * Derives a conversation title from the first user message.
 *
 * Truncation counts code points rather than UTF-16 code units, so an emoji
 * sitting on the cut-off boundary is never split into a broken half.
 */
function deriveTitle(messages: Message[]): string {
  const firstUser = messages.find((m) => m.role === 'user');
  if (!firstUser) {
    return '新对话';
  }
  const text = firstUser.content.trim();
  const chars = Array.from(text);
  return chars.length > TITLE_MAX_CHARS
    ? `${chars.slice(0, TITLE_MAX_CHARS).join('')}...`
    : text;
}

/** Built-in agent used when no profile is selected or available. */
const DEFAULT_AGENT: Agent = {
  id: '1',
  name: 'ZCLAW',
  icon: '🦞',
  color: 'bg-gradient-to-br from-orange-500 to-red-500',
  lastMessage: '发送消息开始对话',
  time: '',
};

/**
 * Converts an agent profile into the UI {@link Agent} shape.
 *
 * The icon is the first code point of the nickname (not the first UTF-16
 * unit, which would break emoji), falling back to the default icon when the
 * nickname is missing or empty.
 */
export function toChatAgent(profile: AgentProfileLike): Agent {
  return {
    id: profile.id,
    name: profile.name,
    icon: Array.from(profile.nickname ?? '')[0] || '🦞',
    color: 'bg-gradient-to-br from-orange-500 to-red-500',
    lastMessage: profile.role || '新分身',
    time: '',
  };
}

/** Agent id to store on a conversation; `null` stands for the default agent. */
function resolveConversationAgentId(agent: Agent | null): string | null {
  if (!agent || agent.id === DEFAULT_AGENT.id) {
    return null;
  }
  return agent.id;
}

/**
 * Agent id to send to the gateway. Returns `undefined` for the default agent
 * and for ids starting with `clone_`.
 */
// NOTE(review): presumably `clone_` ids exist only locally — confirm against the gateway.
function resolveGatewayAgentId(agent: Agent | null): string | undefined {
  if (!agent || agent.id === DEFAULT_AGENT.id || agent.id.startsWith('clone_')) {
    return undefined;
  }
  return agent.id;
}

/** Looks up the agent for a stored conversation, falling back to the default agent. */
function resolveAgentForConversation(agentId: string | null, agents: Agent[]): Agent {
  if (!agentId) {
    return DEFAULT_AGENT;
  }
  return agents.find((agent) => agent.id === agentId) || DEFAULT_AGENT;
}

/**
 * Returns the conversation list with the active transcript saved into it.
 *
 * - With no active messages the input list is returned as-is.
 * - If the active conversation id already exists, that entry is replaced
 *   (keeping its `createdAt`).
 * - Otherwise a new conversation is prepended.
 *
 * The input array is never mutated.
 */
function upsertActiveConversation(
  conversations: Conversation[],
  state: Pick<ChatState, 'messages' | 'sessionKey' | 'currentConversationId' | 'currentAgent'>,
): Conversation[] {
  if (state.messages.length === 0) {
    return conversations;
  }

  const currentId = state.currentConversationId || generateConvId();
  const existing = conversations.find((conversation) => conversation.id === currentId);
  const nextConversation: Conversation = {
    id: currentId,
    title: deriveTitle(state.messages),
    messages: [...state.messages],
    sessionKey: state.sessionKey,
    agentId: resolveConversationAgentId(state.currentAgent),
    createdAt: existing ? existing.createdAt : new Date(),
    updatedAt: new Date(),
  };

  if (existing) {
    return conversations.map((conversation) =>
      conversation === existing ? nextConversation : conversation,
    );
  }
  return [nextConversation, ...conversations];
}

/** Returns a copy of `messages` where the message with `id` is merged with `patch(message)`. */
function patchMessage(
  messages: Message[],
  id: string,
  patch: (message: Message) => Partial<Message>,
): Message[] {
  return messages.map((m) => (m.id === id ? { ...m, ...patch(m) } : m));
}

/** Renders a hand/workflow result as message text, or `fallback` when the result is falsy. */
function formatEventContent(result: unknown, fallback: string): string {
  if (!result) {
    return fallback;
  }
  return typeof result === 'string' ? result : JSON.stringify(result, null, 2);
}

/**
 * Runs the context compactor when its threshold check asks for it.
 *
 * @returns the compacted transcript, or `undefined` when no compaction was
 *   needed or the compactor failed (failures are logged, never thrown).
 */
async function compactMessagesIfNeeded(
  messages: Message[],
  agentId: string,
  conversationId: string | null,
): Promise<Message[] | undefined> {
  try {
    const compactor = getContextCompactor();
    const check = compactor.checkThreshold(
      messages.map((m) => ({ role: m.role, content: m.content })),
    );
    if (!check.shouldCompact) {
      return undefined;
    }

    console.log(`[Chat] Context compaction triggered (${check.urgency}): ${check.currentTokens} tokens`);
    const result = await compactor.compact(
      messages.map((m) => ({ role: m.role, content: m.content, id: m.id, timestamp: m.timestamp })),
      agentId,
      conversationId ?? undefined,
    );
    return result.compactedMessages.map((m, i) => ({
      id: m.id || `compacted_${i}_${Date.now()}`,
      role: m.role as Message['role'],
      content: m.content,
      timestamp: m.timestamp || new Date(),
    }));
  } catch (err) {
    console.warn('[Chat] Context compaction check failed:', err);
    return undefined;
  }
}

/**
 * Prepends the agent's system prompt (including relevant memories) to the
 * user's text. Falls back to the plain text when no prompt is produced or
 * when memory lookup fails (failures are logged, never thrown).
 */
async function buildEnhancedContent(content: string, agentId: string): Promise<string> {
  try {
    const memoryMgr = getMemoryManager();
    const identityMgr = getAgentIdentityManager();

    const relevantMemories = await memoryMgr.search(content, {
      agentId,
      limit: 8,
      minImportance: 3,
    });
    const memoryContext = relevantMemories.length > 0
      ? `\n\n## 相关记忆\n${relevantMemories.map((m) => `- [${m.type}] ${m.content}`).join('\n')}`
      : '';

    const systemPrompt = identityMgr.buildSystemPrompt(agentId, memoryContext);
    // NOTE(review): prompt and user text are separated by blank lines only —
    // confirm the gateway does not expect an explicit delimiter here.
    return systemPrompt ? `\n${systemPrompt}\n\n\n${content}` : content;
  } catch (err) {
    console.warn('[Chat] Memory enhancement failed, proceeding without:', err);
    return content;
  }
}

/**
 * Chat store: active transcript, saved conversations, agent selection and
 * message sending. Only `conversations` and `currentModel` are persisted
 * (storage key `zclaw-chat-storage`).
 */
export const useChatStore = create<ChatState>()(
  persist(
    (set, get) => ({
      messages: [],
      conversations: [],
      currentConversationId: null,
      agents: [DEFAULT_AGENT],
      currentAgent: DEFAULT_AGENT,
      isStreaming: false,
      currentModel: 'glm-5',
      sessionKey: null,

      addMessage: (message) =>
        set((state) => ({ messages: [...state.messages, message] })),

      updateMessage: (id, updates) =>
        set((state) => ({
          messages: patchMessage(state.messages, id, () => updates),
        })),

      // Switching to a different agent saves the active transcript and starts
      // from an empty one; re-selecting the same agent only refreshes its data.
      setCurrentAgent: (agent) =>
        set((state) => {
          if (state.currentAgent?.id === agent.id) {
            return { currentAgent: agent };
          }
          return {
            conversations: upsertActiveConversation(state.conversations, state),
            currentAgent: agent,
            messages: [],
            sessionKey: null,
            isStreaming: false,
            currentConversationId: null,
          };
        }),

      syncAgents: (profiles) =>
        set((state) => {
          const agents = profiles.length > 0 ? profiles.map(toChatAgent) : [DEFAULT_AGENT];

          let currentAgent: Agent | null;
          if (state.currentConversationId) {
            // An open conversation dictates the agent.
            const activeConversation = state.conversations.find(
              (conversation) => conversation.id === state.currentConversationId,
            );
            currentAgent = resolveAgentForConversation(activeConversation?.agentId || null, agents);
          } else if (state.currentAgent) {
            // Keep the current selection if it still exists, else take the first agent.
            currentAgent = agents.find((agent) => agent.id === state.currentAgent?.id) || agents[0];
          } else {
            currentAgent = agents[0];
          }

          return { agents, currentAgent };
        }),

      setCurrentModel: (model) => set({ currentModel: model }),

      newConversation: () => {
        const state = get();
        set({
          conversations: upsertActiveConversation(state.conversations, state),
          messages: [],
          sessionKey: null,
          isStreaming: false,
          currentConversationId: null,
        });
      },

      switchConversation: (id: string) => {
        const state = get();
        const conversations = upsertActiveConversation(state.conversations, state);
        const target = conversations.find((c) => c.id === id);
        if (!target) {
          return;
        }
        set({
          conversations,
          messages: [...target.messages],
          sessionKey: target.sessionKey,
          currentAgent: resolveAgentForConversation(target.agentId, state.agents),
          currentConversationId: target.id,
          isStreaming: false,
        });
      },

      deleteConversation: (id: string) => {
        const state = get();
        const conversations = state.conversations.filter((c) => c.id !== id);
        if (state.currentConversationId === id) {
          // Deleting the open conversation also clears the active transcript.
          set({
            conversations,
            messages: [],
            sessionKey: null,
            currentConversationId: null,
            isStreaming: false,
          });
        } else {
          set({ conversations });
        }
      },

      sendMessage: async (content: string) => {
        const { addMessage, currentAgent, sessionKey } = get();
        const effectiveSessionKey = sessionKey || `session_${Date.now()}`;
        const effectiveAgentId = resolveGatewayAgentId(currentAgent);
        const agentId = currentAgent?.id || 'zclaw-main';

        // Check context compaction threshold before adding new message
        const compacted = await compactMessagesIfNeeded(
          get().messages,
          agentId,
          get().currentConversationId,
        );
        if (compacted) {
          // Replace messages with compacted version
          set({ messages: compacted });
        }

        // Build memory-enhanced content
        const enhancedContent = await buildEnhancedContent(content, agentId);

        // Add user message (original content for display)
        const userMsg: Message = {
          id: `user_${Date.now()}`,
          role: 'user',
          content,
          timestamp: new Date(),
        };
        addMessage(userMsg);

        // Create placeholder assistant message for streaming
        const assistantId = `assistant_${Date.now()}`;
        const assistantMsg: Message = {
          id: assistantId,
          role: 'assistant',
          content: '',
          timestamp: new Date(),
          streaming: true,
        };
        addMessage(assistantMsg);
        set({ isStreaming: true });

        try {
          const client = getGatewayClient();

          // Try streaming first (OpenFang WebSocket)
          if (client.getState() === 'connected') {
            // The callbacks below can fire before chatStream() resolves. Reading a
            // `const` destructured from the awaited result at that point would throw
            // (temporal dead zone), so the run id lives in a binding declared up
            // front and stays `undefined` until it is known.
            let streamRunId: string | undefined;

            const { runId } = await client.chatStream(
              enhancedContent,
              {
                onDelta: (delta: string) => {
                  set((state) => ({
                    messages: patchMessage(state.messages, assistantId, (m) => ({
                      content: m.content + delta,
                    })),
                  }));
                },
                onTool: (tool: string, input: string, output: string) => {
                  const toolMsg: Message = {
                    id: makeUniqueId('tool'),
                    role: 'tool',
                    content: output || input,
                    timestamp: new Date(),
                    runId: streamRunId,
                    toolName: tool,
                    toolInput: input,
                    toolOutput: output,
                  };
                  set((state) => ({ messages: [...state.messages, toolMsg] }));
                },
                onHand: (name: string, status: string, result?: unknown) => {
                  const handMsg: Message = {
                    id: makeUniqueId('hand'),
                    role: 'hand',
                    content: formatEventContent(result, `Hand: ${name} - ${status}`),
                    timestamp: new Date(),
                    runId: streamRunId,
                    handName: name,
                    handStatus: status,
                    handResult: result,
                  };
                  set((state) => ({ messages: [...state.messages, handMsg] }));
                },
                onComplete: () => {
                  set((state) => ({
                    isStreaming: false,
                    messages: patchMessage(state.messages, assistantId, () => ({ streaming: false })),
                  }));

                  // Async memory extraction after stream completes
                  const msgs = get().messages
                    .filter((m) => m.role === 'user' || m.role === 'assistant')
                    .map((m) => ({ role: m.role, content: m.content }));
                  getMemoryExtractor()
                    .extractFromConversation(msgs, agentId, get().currentConversationId ?? undefined)
                    .catch((err: unknown) => console.warn('[Chat] Memory extraction failed:', err));

                  // Track conversation for reflection trigger
                  const reflectionEngine = getReflectionEngine();
                  reflectionEngine.recordConversation();
                  if (reflectionEngine.shouldReflect()) {
                    reflectionEngine
                      .reflect(agentId)
                      .catch((err: unknown) => console.warn('[Chat] Reflection failed:', err));
                  }
                },
                onError: (error: string) => {
                  set((state) => ({
                    isStreaming: false,
                    messages: patchMessage(state.messages, assistantId, () => ({
                      content: `⚠️ ${error}`,
                      streaming: false,
                      error,
                    })),
                  }));
                },
              },
              {
                sessionKey: effectiveSessionKey,
                agentId: effectiveAgentId,
              },
            );
            streamRunId = runId;

            if (!sessionKey) {
              set({ sessionKey: effectiveSessionKey });
            }

            // Store runId on the message for correlation
            set((state) => ({
              messages: patchMessage(state.messages, assistantId, () => ({ runId })),
            }));
            return;
          }

          // Fallback to REST API (non-streaming)
          const result = await client.chat(enhancedContent, {
            sessionKey: effectiveSessionKey,
            agentId: effectiveAgentId,
          });

          if (!sessionKey) {
            set({ sessionKey: effectiveSessionKey });
          }

          // OpenFang returns response directly (no WebSocket streaming)
          if (result.response) {
            set((state) => ({
              isStreaming: false,
              messages: patchMessage(state.messages, assistantId, () => ({
                content: result.response || '',
                streaming: false,
              })),
            }));
            return;
          }

          // The actual streaming content comes via the 'agent' event listener
          // set in initStreamListener(). The runId links events to this message.
          set((state) => ({
            messages: patchMessage(state.messages, assistantId, () => ({ runId: result.runId })),
          }));
        } catch (err: unknown) {
          // Gateway not connected — show error in the assistant bubble
          const errorMessage = err instanceof Error ? err.message : '无法连接 Gateway';
          set((state) => ({
            isStreaming: false,
            messages: patchMessage(state.messages, assistantId, () => ({
              content: `⚠️ ${errorMessage}`,
              streaming: false,
              error: errorMessage,
            })),
          }));
        }
      },

      initStreamListener: () => {
        const client = getGatewayClient();

        const unsubscribe = client.onAgentStream((delta: AgentStreamDelta) => {
          const newestFirst = [...get().messages].reverse();

          // Prefer the newest streaming assistant message whose runId matches the
          // event (or that has no runId when the event has none); otherwise fall
          // back to the newest streaming assistant message of any run.
          const streamingMsg =
            newestFirst.find((m) => (
              m.role === 'assistant'
              && m.streaming
              && (
                (delta.runId && m.runId === delta.runId)
                || (!delta.runId && m.runId == null)
              )
            ))
            || newestFirst.find((m) => m.role === 'assistant' && m.streaming);

          if (!streamingMsg) return;
          const targetId = streamingMsg.id;

          switch (delta.stream) {
            case 'assistant': {
              if (delta.delta || delta.content) {
                set((s) => ({
                  messages: patchMessage(s.messages, targetId, (m) => ({
                    content: m.content + (delta.delta || delta.content || ''),
                  })),
                }));
              }
              break;
            }
            case 'tool': {
              const toolMsg: Message = {
                id: makeUniqueId('tool'),
                role: 'tool',
                content: delta.toolOutput || '',
                timestamp: new Date(),
                runId: delta.runId,
                toolName: delta.tool,
                toolInput: delta.toolInput,
                toolOutput: delta.toolOutput,
              };
              set((s) => ({ messages: [...s.messages, toolMsg] }));
              break;
            }
            case 'lifecycle': {
              if (delta.phase === 'end' || delta.phase === 'error') {
                set((s) => ({
                  isStreaming: false,
                  messages: patchMessage(s.messages, targetId, () => ({
                    streaming: false,
                    error: delta.phase === 'error' ? delta.error : undefined,
                  })),
                }));
              }
              break;
            }
            case 'hand': {
              // Handle Hand trigger events from OpenFang
              const handMsg: Message = {
                id: makeUniqueId('hand'),
                role: 'hand',
                content: formatEventContent(
                  delta.handResult,
                  `Hand: ${delta.handName || 'unknown'} - ${delta.handStatus || 'triggered'}`,
                ),
                timestamp: new Date(),
                runId: delta.runId,
                handName: delta.handName,
                handStatus: delta.handStatus,
                handResult: delta.handResult,
              };
              set((s) => ({ messages: [...s.messages, handMsg] }));
              break;
            }
            case 'workflow': {
              // Handle Workflow execution events from OpenFang
              const workflowMsg: Message = {
                id: makeUniqueId('workflow'),
                role: 'workflow',
                content: formatEventContent(
                  delta.workflowResult,
                  `Workflow: ${delta.workflowId || 'unknown'} step ${delta.workflowStep || '?'} - ${delta.workflowStatus || 'running'}`,
                ),
                timestamp: new Date(),
                runId: delta.runId,
                workflowId: delta.workflowId,
                workflowStep: delta.workflowStep,
                workflowStatus: delta.workflowStatus,
                workflowResult: delta.workflowResult,
              };
              set((s) => ({ messages: [...s.messages, workflowMsg] }));
              break;
            }
            default:
              break;
          }
        });

        return unsubscribe;
      },
    }),
    {
      name: 'zclaw-chat-storage',
      partialize: (state) => ({
        conversations: state.conversations,
        currentModel: state.currentModel,
      }),
      onRehydrateStorage: () => (state) => {
        // Rehydrate Date objects from JSON strings
        if (!state?.conversations) {
          return;
        }
        for (const conv of state.conversations) {
          conv.createdAt = new Date(conv.createdAt);
          conv.updatedAt = new Date(conv.updatedAt);
          for (const msg of conv.messages) {
            msg.timestamp = new Date(msg.timestamp);
            msg.streaming = false; // Never restore streaming state
          }
        }
      },
    },
  ),
);