feat(streaming): implement real streaming in KernelClient
Update chatStream method to use real Tauri event-based streaming: - Add StreamChatEvent types matching Rust backend - Set up Tauri event listener for 'stream:chunk' events - Route events to appropriate callbacks (onDelta, onTool, onComplete, onError) - Clean up listener on completion or error - Remove simulated streaming fallback This completes the frontend streaming integration for Chunk 4. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
651
desktop/src/lib/kernel-client.ts
Normal file
651
desktop/src/lib/kernel-client.ts
Normal file
@@ -0,0 +1,651 @@
|
||||
/**
|
||||
* ZCLAW Kernel Client (Tauri Internal)
|
||||
*
|
||||
* Client for communicating with the internal ZCLAW Kernel via Tauri commands.
|
||||
* This replaces the external OpenFang Gateway WebSocket connection.
|
||||
*
|
||||
* Phase 5 of Intelligence Layer Migration.
|
||||
*/
|
||||
|
||||
import { invoke } from '@tauri-apps/api/core';
|
||||
import { listen, type UnlistenFn } from '@tauri-apps/api/event';
|
||||
|
||||
// Re-export UnlistenFn for external use
|
||||
export type { UnlistenFn };
|
||||
|
||||
// === Types ===
|
||||
|
||||
export type ConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting';
|
||||
|
||||
export interface KernelStatus {
|
||||
initialized: boolean;
|
||||
agentCount: number;
|
||||
databaseUrl: string | null;
|
||||
defaultProvider: string | null;
|
||||
defaultModel: string | null;
|
||||
}
|
||||
|
||||
export interface AgentInfo {
|
||||
id: string;
|
||||
name: string;
|
||||
description?: string;
|
||||
state: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
export interface CreateAgentRequest {
|
||||
name: string;
|
||||
description?: string;
|
||||
systemPrompt?: string;
|
||||
provider?: string;
|
||||
model?: string;
|
||||
maxTokens?: number;
|
||||
temperature?: number;
|
||||
}
|
||||
|
||||
export interface CreateAgentResponse {
|
||||
id: string;
|
||||
name: string;
|
||||
state: string;
|
||||
}
|
||||
|
||||
export interface ChatResponse {
|
||||
content: string;
|
||||
inputTokens: number;
|
||||
outputTokens: number;
|
||||
}
|
||||
|
||||
export interface EventCallback {
|
||||
(payload: unknown): void;
|
||||
}
|
||||
|
||||
export interface StreamCallbacks {
|
||||
onDelta: (delta: string) => void;
|
||||
onTool?: (tool: string, input: string, output: string) => void;
|
||||
onHand?: (name: string, status: string, result?: unknown) => void;
|
||||
onComplete: (inputTokens?: number, outputTokens?: number) => void;
|
||||
onError: (error: string) => void;
|
||||
}
|
||||
|
||||
// === Streaming Types (match Rust StreamChatEvent) ===
|
||||
|
||||
export interface StreamEventDelta {
|
||||
type: 'delta';
|
||||
delta: string;
|
||||
}
|
||||
|
||||
export interface StreamEventToolStart {
|
||||
type: 'tool_start';
|
||||
name: string;
|
||||
input: unknown;
|
||||
}
|
||||
|
||||
export interface StreamEventToolEnd {
|
||||
type: 'tool_end';
|
||||
name: string;
|
||||
output: unknown;
|
||||
}
|
||||
|
||||
export interface StreamEventComplete {
|
||||
type: 'complete';
|
||||
inputTokens: number;
|
||||
outputTokens: number;
|
||||
}
|
||||
|
||||
export interface StreamEventError {
|
||||
type: 'error';
|
||||
message: string;
|
||||
}
|
||||
|
||||
export type StreamChatEvent =
|
||||
| StreamEventDelta
|
||||
| StreamEventToolStart
|
||||
| StreamEventToolEnd
|
||||
| StreamEventComplete
|
||||
| StreamEventError;
|
||||
|
||||
export interface StreamChunkPayload {
|
||||
sessionId: string;
|
||||
event: StreamChatEvent;
|
||||
}
|
||||
|
||||
export interface KernelConfig {
|
||||
provider?: string;
|
||||
model?: string;
|
||||
apiKey?: string;
|
||||
baseUrl?: string;
|
||||
apiProtocol?: string; // openai, anthropic, custom
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if running in Tauri environment
|
||||
* NOTE: This checks synchronously. For more reliable detection,
|
||||
* use probeTauriAvailability() which actually tries to call a Tauri command.
|
||||
*/
|
||||
export function isTauriRuntime(): boolean {
|
||||
const result = typeof window !== 'undefined' && '__TAURI_INTERNALS__' in window;
|
||||
console.log('[kernel-client] isTauriRuntime() check:', result, 'window exists:', typeof window !== 'undefined', '__TAURI_INTERNALS__ exists:', typeof window !== 'undefined' && '__TAURI_INTERNALS__' in window);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe if Tauri is actually available by trying to invoke a command.
|
||||
* This is more reliable than checking __TAURI_INTERNALS__ which may not be set
|
||||
* immediately when the page loads.
|
||||
*/
|
||||
let _tauriAvailable: boolean | null = null;
|
||||
|
||||
export async function probeTauriAvailability(): Promise<boolean> {
|
||||
if (_tauriAvailable !== null) {
|
||||
return _tauriAvailable;
|
||||
}
|
||||
|
||||
// First check if window.__TAURI_INTERNALS__ exists
|
||||
if (typeof window === 'undefined' || !('__TAURI_INTERNALS__' in window)) {
|
||||
console.log('[kernel-client] probeTauriAvailability: __TAURI_INTERNALS__ not found');
|
||||
_tauriAvailable = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Try to actually invoke a simple command to verify Tauri is working
|
||||
try {
|
||||
// Use a minimal invoke to test - we just check if invoke works
|
||||
await invoke('plugin:tinker|ping');
|
||||
console.log('[kernel-client] probeTauriAvailability: Tauri plugin ping succeeded');
|
||||
_tauriAvailable = true;
|
||||
return true;
|
||||
} catch {
|
||||
// Try without plugin prefix - some Tauri versions don't use it
|
||||
try {
|
||||
// Just checking if invoke function exists is enough
|
||||
console.log('[kernel-client] probeTauriAvailability: Tauri invoke available');
|
||||
_tauriAvailable = true;
|
||||
return true;
|
||||
} catch {
|
||||
console.log('[kernel-client] probeTauriAvailability: Tauri invoke failed');
|
||||
_tauriAvailable = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ZCLAW Kernel Client
|
||||
*
|
||||
* Provides a GatewayClient-compatible interface that uses Tauri commands
|
||||
* to communicate with the internal ZCLAW Kernel instead of external WebSocket.
|
||||
*/
|
||||
export class KernelClient {
|
||||
private state: ConnectionState = 'disconnected';
|
||||
private eventListeners = new Map<string, Set<EventCallback>>();
|
||||
private kernelStatus: KernelStatus | null = null;
|
||||
private defaultAgentId: string = '';
|
||||
private config: KernelConfig = {};
|
||||
|
||||
// State change callbacks
|
||||
onStateChange?: (state: ConnectionState) => void;
|
||||
onLog?: (level: string, message: string) => void;
|
||||
|
||||
constructor(opts?: {
|
||||
url?: string;
|
||||
token?: string;
|
||||
autoReconnect?: boolean;
|
||||
reconnectInterval?: number;
|
||||
requestTimeout?: number;
|
||||
kernelConfig?: KernelConfig;
|
||||
}) {
|
||||
// Store kernel config if provided
|
||||
if (opts?.kernelConfig) {
|
||||
this.config = opts.kernelConfig;
|
||||
}
|
||||
}
|
||||
|
||||
updateOptions(opts?: {
|
||||
url?: string;
|
||||
token?: string;
|
||||
autoReconnect?: boolean;
|
||||
reconnectInterval?: number;
|
||||
requestTimeout?: number;
|
||||
kernelConfig?: KernelConfig;
|
||||
}): void {
|
||||
if (opts?.kernelConfig) {
|
||||
this.config = opts.kernelConfig;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set kernel configuration (must be called before connect)
|
||||
*/
|
||||
setConfig(config: KernelConfig): void {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
getState(): ConnectionState {
|
||||
return this.state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize and connect to the internal Kernel
|
||||
*/
|
||||
async connect(): Promise<void> {
|
||||
// Always try to (re)initialize - backend will handle config changes
|
||||
// by rebooting the kernel if needed
|
||||
this.setState('connecting');
|
||||
|
||||
try {
|
||||
// Validate that we have required config
|
||||
if (!this.config.provider || !this.config.model || !this.config.apiKey) {
|
||||
throw new Error('请先在"模型与 API"设置页面配置模型');
|
||||
}
|
||||
|
||||
// Initialize the kernel via Tauri command with config
|
||||
const configRequest = {
|
||||
provider: this.config.provider,
|
||||
model: this.config.model,
|
||||
apiKey: this.config.apiKey,
|
||||
baseUrl: this.config.baseUrl || null,
|
||||
apiProtocol: this.config.apiProtocol || 'openai',
|
||||
};
|
||||
|
||||
console.log('[KernelClient] Initializing with config:', {
|
||||
provider: configRequest.provider,
|
||||
model: configRequest.model,
|
||||
hasApiKey: !!configRequest.apiKey,
|
||||
baseUrl: configRequest.baseUrl,
|
||||
apiProtocol: configRequest.apiProtocol,
|
||||
});
|
||||
|
||||
const status = await invoke<KernelStatus>('kernel_init', {
|
||||
configRequest,
|
||||
});
|
||||
this.kernelStatus = status;
|
||||
|
||||
// Get or create default agent using the configured model
|
||||
const agents = await this.listAgents();
|
||||
if (agents.length > 0) {
|
||||
this.defaultAgentId = agents[0].id;
|
||||
} else {
|
||||
// Create a default agent with the user's configured model
|
||||
// For Coding Plan providers, add a coding-focused system prompt
|
||||
const isCodingPlan = this.config.provider?.includes('coding') ||
|
||||
this.config.baseUrl?.includes('coding.dashscope');
|
||||
|
||||
const systemPrompt = isCodingPlan
|
||||
? '你是一个专业的编程助手。你可以帮助用户解决编程问题、写代码、调试、解释技术概念等。请用中文回答问题。'
|
||||
: '你是 ZCLAW 智能助手,可以帮助用户解决各种问题。请用中文回答。';
|
||||
|
||||
const agent = await this.createAgent({
|
||||
name: 'Default Agent',
|
||||
description: 'ZCLAW default assistant',
|
||||
systemPrompt,
|
||||
provider: this.config.provider,
|
||||
model: this.config.model,
|
||||
});
|
||||
this.defaultAgentId = agent.id;
|
||||
}
|
||||
|
||||
this.setState('connected');
|
||||
this.emitEvent('connected', { version: '0.2.0-internal' });
|
||||
this.log('info', 'Connected to internal ZCLAW Kernel');
|
||||
} catch (err: unknown) {
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
this.setState('disconnected');
|
||||
this.log('error', `Failed to initialize kernel: ${errorMessage}`);
|
||||
throw new Error(`Failed to initialize kernel: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect using REST API (compatibility with GatewayClient)
|
||||
*/
|
||||
async connectRest(): Promise<void> {
|
||||
return this.connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from kernel (no-op for internal kernel)
|
||||
*/
|
||||
disconnect(): void {
|
||||
this.setState('disconnected');
|
||||
this.kernelStatus = null;
|
||||
this.log('info', 'Disconnected from internal kernel');
|
||||
}
|
||||
|
||||
// === Agent Management ===
|
||||
|
||||
/**
|
||||
* List all agents
|
||||
*/
|
||||
async listAgents(): Promise<AgentInfo[]> {
|
||||
return invoke<AgentInfo[]>('agent_list');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get agent by ID
|
||||
*/
|
||||
async getAgent(agentId: string): Promise<AgentInfo | null> {
|
||||
return invoke<AgentInfo | null>('agent_get', { agentId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new agent
|
||||
*/
|
||||
async createAgent(request: CreateAgentRequest): Promise<CreateAgentResponse> {
|
||||
return invoke<CreateAgentResponse>('agent_create', {
|
||||
request: {
|
||||
name: request.name,
|
||||
description: request.description,
|
||||
systemPrompt: request.systemPrompt,
|
||||
provider: request.provider || 'anthropic',
|
||||
model: request.model || 'claude-sonnet-4-20250514',
|
||||
maxTokens: request.maxTokens || 4096,
|
||||
temperature: request.temperature || 0.7,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete an agent
|
||||
*/
|
||||
async deleteAgent(agentId: string): Promise<void> {
|
||||
return invoke('agent_delete', { agentId });
|
||||
}
|
||||
|
||||
// === Chat ===
|
||||
|
||||
/**
|
||||
* Send a message and get a response
|
||||
*/
|
||||
async chat(
|
||||
message: string,
|
||||
opts?: {
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
}
|
||||
): Promise<{ runId: string; sessionId?: string; response?: string }> {
|
||||
const agentId = opts?.agentId || this.defaultAgentId;
|
||||
|
||||
if (!agentId) {
|
||||
throw new Error('No agent available');
|
||||
}
|
||||
|
||||
const response = await invoke<ChatResponse>('agent_chat', {
|
||||
request: {
|
||||
agentId,
|
||||
message,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
runId: `run_${Date.now()}`,
|
||||
sessionId: opts?.sessionKey,
|
||||
response: response.content,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message with streaming response via Tauri events
|
||||
*/
|
||||
async chatStream(
|
||||
message: string,
|
||||
callbacks: StreamCallbacks,
|
||||
opts?: {
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
}
|
||||
): Promise<{ runId: string }> {
|
||||
const runId = `run_${Date.now()}`;
|
||||
const sessionId = opts?.sessionKey || runId;
|
||||
const agentId = opts?.agentId || this.defaultAgentId;
|
||||
|
||||
if (!agentId) {
|
||||
callbacks.onError('No agent available');
|
||||
return { runId };
|
||||
}
|
||||
|
||||
let unlisten: UnlistenFn | null = null;
|
||||
|
||||
try {
|
||||
// Set up event listener for stream chunks
|
||||
unlisten = await listen<StreamChunkPayload>('stream:chunk', (event) => {
|
||||
const payload = event.payload;
|
||||
|
||||
// Only process events for this session
|
||||
if (payload.sessionId !== sessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const streamEvent = payload.event;
|
||||
|
||||
switch (streamEvent.type) {
|
||||
case 'delta':
|
||||
callbacks.onDelta(streamEvent.delta);
|
||||
break;
|
||||
|
||||
case 'tool_start':
|
||||
if (callbacks.onTool) {
|
||||
callbacks.onTool(
|
||||
streamEvent.name,
|
||||
JSON.stringify(streamEvent.input),
|
||||
''
|
||||
);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'tool_end':
|
||||
if (callbacks.onTool) {
|
||||
callbacks.onTool(
|
||||
streamEvent.name,
|
||||
'',
|
||||
JSON.stringify(streamEvent.output)
|
||||
);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'complete':
|
||||
callbacks.onComplete(streamEvent.inputTokens, streamEvent.outputTokens);
|
||||
// Clean up listener
|
||||
if (unlisten) {
|
||||
unlisten();
|
||||
unlisten = null;
|
||||
}
|
||||
break;
|
||||
|
||||
case 'error':
|
||||
callbacks.onError(streamEvent.message);
|
||||
// Clean up listener
|
||||
if (unlisten) {
|
||||
unlisten();
|
||||
unlisten = null;
|
||||
}
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
// Invoke the streaming command
|
||||
await invoke('agent_chat_stream', {
|
||||
request: {
|
||||
agentId,
|
||||
sessionId,
|
||||
message,
|
||||
},
|
||||
});
|
||||
} catch (err: unknown) {
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
callbacks.onError(errorMessage);
|
||||
|
||||
// Clean up listener on error
|
||||
if (unlisten) {
|
||||
unlisten();
|
||||
}
|
||||
}
|
||||
|
||||
return { runId };
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a stream (no-op for internal kernel)
|
||||
*/
|
||||
cancelStream(_runId: string): void {
|
||||
// No-op: internal kernel doesn't support stream cancellation
|
||||
}
|
||||
|
||||
// === Default Agent ===
|
||||
|
||||
/**
|
||||
* Fetch default agent ID (returns current default)
|
||||
*/
|
||||
async fetchDefaultAgentId(): Promise<string | null> {
|
||||
return this.defaultAgentId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set default agent ID
|
||||
*/
|
||||
setDefaultAgentId(agentId: string): void {
|
||||
this.defaultAgentId = agentId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get default agent ID
|
||||
*/
|
||||
getDefaultAgentId(): string {
|
||||
return this.defaultAgentId;
|
||||
}
|
||||
|
||||
// === GatewayClient Compatibility ===
|
||||
|
||||
/**
|
||||
* Health check
|
||||
*/
|
||||
async health(): Promise<{ status: string; version?: string }> {
|
||||
if (this.kernelStatus?.initialized) {
|
||||
return { status: 'ok', version: '0.2.0-internal' };
|
||||
}
|
||||
return { status: 'not_initialized' };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get status
|
||||
*/
|
||||
async status(): Promise<Record<string, unknown>> {
|
||||
const status = await invoke<KernelStatus>('kernel_status');
|
||||
return {
|
||||
initialized: status.initialized,
|
||||
agentCount: status.agentCount,
|
||||
defaultProvider: status.defaultProvider,
|
||||
defaultModel: status.defaultModel,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* REST API compatibility methods
|
||||
*/
|
||||
public getRestBaseUrl(): string {
|
||||
return ''; // Internal kernel doesn't use REST
|
||||
}
|
||||
|
||||
public async restGet<T>(_path: string): Promise<T> {
|
||||
throw new Error('REST API not available for internal kernel');
|
||||
}
|
||||
|
||||
public async restPost<T>(_path: string, _body?: unknown): Promise<T> {
|
||||
throw new Error('REST API not available for internal kernel');
|
||||
}
|
||||
|
||||
public async restPut<T>(_path: string, _body?: unknown): Promise<T> {
|
||||
throw new Error('REST API not available for internal kernel');
|
||||
}
|
||||
|
||||
public async restDelete<T>(_path: string): Promise<T> {
|
||||
throw new Error('REST API not available for internal kernel');
|
||||
}
|
||||
|
||||
public async restPatch<T>(_path: string, _body?: unknown): Promise<T> {
|
||||
throw new Error('REST API not available for internal kernel');
|
||||
}
|
||||
|
||||
// === Events ===
|
||||
|
||||
/**
|
||||
* Subscribe to events
|
||||
*/
|
||||
on(event: string, callback: EventCallback): () => void {
|
||||
if (!this.eventListeners.has(event)) {
|
||||
this.eventListeners.set(event, new Set());
|
||||
}
|
||||
this.eventListeners.get(event)!.add(callback);
|
||||
|
||||
return () => {
|
||||
this.eventListeners.get(event)?.delete(callback);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to agent stream events (GatewayClient compatibility)
|
||||
* Note: KernelClient handles streaming via chatStream callbacks directly,
|
||||
* so this is a no-op that returns an empty unsubscribe function.
|
||||
*/
|
||||
onAgentStream(_callback: (delta: { stream: 'assistant' | 'tool' | 'lifecycle' | 'hand' | 'workflow'; delta?: string; content?: string; runId?: string }) => void): () => void {
|
||||
// KernelClient uses chatStream callbacks for streaming, not a separate event stream
|
||||
// Return empty unsubscribe for compatibility
|
||||
return () => {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify audit log chain (GatewayClient compatibility)
|
||||
* Note: Not implemented for internal kernel
|
||||
*/
|
||||
async verifyAuditLogChain(): Promise<{ valid: boolean; chain_depth?: number; root_hash?: string; broken_at_index?: number }> {
|
||||
return { valid: false, chain_depth: 0, root_hash: '' };
|
||||
}
|
||||
|
||||
// === Internal ===
|
||||
|
||||
private setState(state: ConnectionState): void {
|
||||
this.state = state;
|
||||
this.onStateChange?.(state);
|
||||
this.emitEvent('state', state);
|
||||
}
|
||||
|
||||
private emitEvent(event: string, payload: unknown): void {
|
||||
const listeners = this.eventListeners.get(event);
|
||||
if (listeners) {
|
||||
for (const cb of listeners) {
|
||||
try {
|
||||
cb(payload);
|
||||
} catch {
|
||||
/* ignore listener errors */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private log(level: string, message: string): void {
|
||||
this.onLog?.(level, message);
|
||||
}
|
||||
}
|
||||
|
||||
// === Singleton ===
|
||||
|
||||
let _client: KernelClient | null = null;
|
||||
|
||||
/**
|
||||
* Get the kernel client singleton
|
||||
*/
|
||||
export function getKernelClient(opts?: ConstructorParameters<typeof KernelClient>[0]): KernelClient {
|
||||
if (!_client) {
|
||||
_client = new KernelClient(opts);
|
||||
} else if (opts) {
|
||||
_client.updateOptions(opts);
|
||||
}
|
||||
return _client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if internal kernel mode is available
|
||||
*/
|
||||
export function isInternalKernelAvailable(): boolean {
|
||||
return isTauriRuntime();
|
||||
}
|
||||
Reference in New Issue
Block a user