Files
zclaw_openfang/src/gateway/ws-client.ts
2026-03-12 00:23:42 +08:00

444 lines
12 KiB
TypeScript

/**
* ZCLAW WebSocket Client
*
* Typed WebSocket client for OpenClaw Gateway protocol.
* Handles:
* - Connection + handshake (connect challenge/response)
* - Request/response pattern (with timeout)
* - Event subscription
* - Auto-reconnect
* - Agent streaming
*/
import WebSocket from 'ws';
import { EventEmitter } from 'events';
import * as crypto from 'crypto';
// === Protocol Types ===
export interface ConnectParams {
minProtocol?: number;
maxProtocol?: number;
client: {
id: string;
version: string;
platform: string;
mode: 'operator' | 'node';
};
role: 'operator' | 'node';
scopes: string[];
auth?: { token?: string };
locale?: string;
userAgent?: string;
device?: {
id: string;
publicKey?: string;
signature?: string;
signedAt?: number;
nonce?: string;
};
}
export interface GatewayRequest {
type: 'req';
id: string;
method: string;
params?: Record<string, any>;
}
export interface GatewayResponse {
type: 'res';
id: string;
ok: boolean;
payload?: any;
error?: any;
}
export interface GatewayEvent {
type: 'event';
event: string;
payload?: any;
seq?: number;
stateVersion?: number;
}
export type GatewayFrame = GatewayRequest | GatewayResponse | GatewayEvent;
// Agent stream events
export interface AgentStreamEvent {
stream: 'assistant' | 'tool' | 'lifecycle';
delta?: string;
content?: string;
tool?: string;
phase?: 'start' | 'end' | 'error';
runId?: string;
error?: string;
}
export interface WsClientOptions {
/** Gateway WebSocket URL (default: ws://127.0.0.1:18789) */
url?: string;
/** Auth token */
token?: string;
/** Client identifier */
clientId?: string;
/** Client version */
clientVersion?: string;
/** Auto-reconnect on disconnect */
autoReconnect?: boolean;
/** Reconnect interval ms */
reconnectInterval?: number;
/** Max reconnect attempts (0 = infinite) */
maxReconnectAttempts?: number;
/** Request timeout ms */
requestTimeout?: number;
}
export type ConnectionState = 'disconnected' | 'connecting' | 'handshaking' | 'connected' | 'reconnecting';
export class GatewayWsClient extends EventEmitter {
private ws: WebSocket | null = null;
private state: ConnectionState = 'disconnected';
private requestId: number = 0;
private pendingRequests = new Map<string, {
resolve: (value: any) => void;
reject: (reason: any) => void;
timer: ReturnType<typeof setTimeout>;
}>();
private reconnectAttempts: number = 0;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private deviceId: string;
private options: Required<WsClientOptions>;
constructor(opts: WsClientOptions = {}) {
super();
this.options = {
url: opts.url || 'ws://127.0.0.1:18789',
token: opts.token || '',
clientId: opts.clientId || 'zclaw-tauri',
clientVersion: opts.clientVersion || '0.1.0',
autoReconnect: opts.autoReconnect ?? true,
reconnectInterval: opts.reconnectInterval || 3000,
maxReconnectAttempts: opts.maxReconnectAttempts || 0,
requestTimeout: opts.requestTimeout || 30000,
};
this.deviceId = crypto.randomBytes(16).toString('hex');
}
/** Current connection state */
getState(): ConnectionState {
return this.state;
}
/** Connect to Gateway */
async connect(): Promise<void> {
if (this.state === 'connected' || this.state === 'connecting') {
return;
}
this.setState('connecting');
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(this.options.url);
this.ws.on('open', () => {
this.setState('handshaking');
// Wait for connect.challenge event
});
this.ws.on('message', (data: Buffer) => {
try {
const frame: GatewayFrame = JSON.parse(data.toString());
this.handleFrame(frame, resolve);
} catch (err: any) {
this.emit('error', new Error(`Failed to parse frame: ${err.message}`));
}
});
this.ws.on('close', (code: number, reason: Buffer) => {
const wasConnected = this.state === 'connected';
this.cleanup();
if (wasConnected && this.options.autoReconnect) {
this.scheduleReconnect();
}
this.emit('close', { code, reason: reason.toString() });
});
this.ws.on('error', (err: Error) => {
if (this.state === 'connecting') {
this.cleanup();
reject(err);
}
this.emit('error', err);
});
} catch (err) {
this.cleanup();
reject(err);
}
});
}
/** Disconnect from Gateway */
disconnect() {
this.cancelReconnect();
this.options.autoReconnect = false;
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
}
this.cleanup();
}
/** Send a request and wait for response */
async request(method: string, params?: Record<string, any>): Promise<any> {
if (this.state !== 'connected') {
throw new Error(`Cannot send request in state: ${this.state}`);
}
const id = `req_${++this.requestId}`;
const frame: GatewayRequest = { type: 'req', id, method, params: params || {} };
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`Request ${method} timed out after ${this.options.requestTimeout}ms`));
}, this.options.requestTimeout);
this.pendingRequests.set(id, { resolve, reject, timer });
this.send(frame);
});
}
/** Send agent message (trigger agent loop) */
async sendAgentMessage(message: string, opts?: {
sessionKey?: string;
model?: string;
agentId?: string;
}): Promise<{ runId: string; acceptedAt: string }> {
return this.request('agent', {
message,
sessionKey: opts?.sessionKey,
model: opts?.model,
agentId: opts?.agentId,
});
}
/** Wait for agent run to complete */
async waitForAgent(runId: string): Promise<{ status: string; startedAt: string; endedAt: string; error?: string }> {
return this.request('agent.wait', { runId });
}
/** Send message through IM channel */
async sendChannelMessage(channel: string, chatId: string, text: string): Promise<any> {
return this.request('send', { channel, chatId, text });
}
/** Get Gateway health */
async getHealth(): Promise<any> {
return this.request('health');
}
/** Get Gateway status */
async getStatus(): Promise<any> {
return this.request('status');
}
// === ZCLAW Custom RPC Methods ===
async listClones(): Promise<any> {
return this.request('zclaw.clones.list');
}
async createClone(opts: { name: string; role?: string; nickname?: string; scenarios?: string[]; model?: string }): Promise<any> {
return this.request('zclaw.clones.create', opts);
}
async getUsageStats(): Promise<any> {
return this.request('zclaw.stats.usage');
}
async getSessionStats(): Promise<any> {
return this.request('zclaw.stats.sessions');
}
async getWorkspaceInfo(): Promise<any> {
return this.request('zclaw.workspace.info');
}
async getPluginStatus(): Promise<any> {
return this.request('zclaw.plugins.status');
}
async getQuickConfig(): Promise<any> {
return this.request('zclaw.config.quick', { get: true });
}
async saveQuickConfig(config: Record<string, any>): Promise<any> {
return this.request('zclaw.config.quick', config);
}
// === Internal ===
private handleFrame(frame: GatewayFrame, connectResolve?: (value: void) => void) {
switch (frame.type) {
case 'event':
this.handleEvent(frame as GatewayEvent, connectResolve);
break;
case 'res':
this.handleResponse(frame as GatewayResponse);
break;
default:
// Ignore unexpected frame types
break;
}
}
private handleEvent(event: GatewayEvent, connectResolve?: (value: void) => void) {
if (event.event === 'connect.challenge' && this.state === 'handshaking') {
// Respond to challenge with connect request
this.performHandshake(event.payload?.nonce, connectResolve);
return;
}
// Emit typed events
this.emit('event', event);
this.emit(`event:${event.event}`, event.payload);
// Specific agent stream events
if (event.event === 'agent') {
this.emit('agent:stream', event.payload as AgentStreamEvent);
}
}
private performHandshake(nonce: string, connectResolve?: (value: void) => void) {
const platform = process.platform === 'win32' ? 'windows' : process.platform === 'darwin' ? 'macos' : 'linux';
const connectReq: GatewayRequest = {
type: 'req',
id: `connect_${Date.now()}`,
method: 'connect',
params: {
minProtocol: 3,
maxProtocol: 3,
client: {
id: this.options.clientId,
version: this.options.clientVersion,
platform,
mode: 'operator',
},
role: 'operator',
scopes: ['operator.read', 'operator.write'],
auth: this.options.token ? { token: this.options.token } : {},
locale: 'zh-CN',
userAgent: `zclaw-tauri/${this.options.clientVersion}`,
device: {
id: this.deviceId,
},
},
};
// Listen for the connect response
const handler = (data: Buffer) => {
try {
const frame = JSON.parse(data.toString());
if (frame.type === 'res' && frame.id === connectReq.id) {
this.ws?.removeListener('message', handler);
if (frame.ok) {
this.setState('connected');
this.reconnectAttempts = 0;
this.emit('connected', frame.payload);
connectResolve?.();
} else {
const err = new Error(`Handshake failed: ${JSON.stringify(frame.error)}`);
this.emit('error', err);
this.cleanup();
}
}
} catch { /* ignore parse errors during handshake */ }
};
this.ws?.on('message', handler);
this.send(connectReq);
}
private handleResponse(res: GatewayResponse) {
const pending = this.pendingRequests.get(res.id);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(res.id);
if (res.ok) {
pending.resolve(res.payload);
} else {
pending.reject(new Error(JSON.stringify(res.error)));
}
}
}
private send(frame: GatewayFrame) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(frame));
}
}
private setState(state: ConnectionState) {
const prev = this.state;
this.state = state;
if (prev !== state) {
this.emit('state', { state, previous: prev });
}
}
private cleanup() {
// Clear all pending requests
for (const [id, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error('Connection closed'));
}
this.pendingRequests.clear();
if (this.ws) {
this.ws.removeAllListeners();
if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
try { this.ws.close(); } catch { /* ignore */ }
}
this.ws = null;
}
this.setState('disconnected');
}
private scheduleReconnect() {
if (this.options.maxReconnectAttempts > 0 && this.reconnectAttempts >= this.options.maxReconnectAttempts) {
this.emit('reconnect:failed', { attempts: this.reconnectAttempts });
return;
}
this.reconnectAttempts++;
this.setState('reconnecting');
const delay = Math.min(
this.options.reconnectInterval * Math.pow(1.5, this.reconnectAttempts - 1),
30000
);
this.reconnectTimer = setTimeout(async () => {
try {
await this.connect();
this.emit('reconnect:success', { attempts: this.reconnectAttempts });
} catch {
// Will trigger another reconnect via the close handler
}
}, delay);
}
private cancelReconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
}