Files
zclaw_openfang/src/gateway/manager.ts
2026-03-12 00:23:42 +08:00

329 lines
9.0 KiB
TypeScript

/**
* ZCLAW Gateway Manager
*
* Manages the OpenClaw Gateway subprocess lifecycle:
* - Start/stop Gateway daemon
* - Health checking
* - Auto-restart on crash
* - Configuration management
*/
import { spawn, ChildProcess, execSync } from 'child_process';
import { EventEmitter } from 'events';
import * as path from 'path';
import * as fs from 'fs';
/** Lifecycle state of the managed Gateway, as tracked by `GatewayManager`. */
export type GatewayStatus =
  | 'stopped'
  | 'starting'
  | 'running'
  | 'error'
  | 'stopping';
/**
 * Construction options for `GatewayManager`.
 *
 * Every field is optional; the manager's constructor fills in the default
 * noted on each field.
 */
export interface GatewayManagerOptions {
  /**
   * OpenClaw home directory, passed to the Gateway subprocess as `OPENCLAW_HOME`.
   * Default: `.openclaw` under `$HOME` (or `%USERPROFILE%`).
   */
  home?: string;

  /** TCP port the Gateway is reached on. Default: 18789. */
  port?: number;

  /** Host the Gateway is reached on. Default: 127.0.0.1. */
  host?: string;

  /**
   * Auth token for Gateway connections. When non-empty it is passed to the
   * subprocess as `OPENCLAW_GATEWAY_TOKEN`. Default: empty (not set).
   */
  token?: string;

  /** Whether to restart the Gateway automatically after a crash. Default: true. */
  autoRestart?: boolean;

  /** Maximum number of automatic restart attempts. Default: 5. */
  maxRestarts?: number;

  /** Interval between periodic health checks, in milliseconds. Default: 15000. */
  healthCheckInterval?: number;
}
/** Point-in-time snapshot of the Gateway, as returned by `GatewayManager.getInfo()`. */
export interface GatewayInfo {
  /** Current lifecycle state. */
  status: GatewayStatus;

  /** PID of the subprocess owned by the manager; absent when it owns none. */
  pid?: number;

  /** Configured Gateway port. */
  port: number;

  /** Configured Gateway host. */
  host: string;

  /** WebSocket endpoint, built as `ws://<host>:<port>`. */
  wsUrl: string;

  /** Output of `openclaw --version`; absent when the command fails. */
  version?: string;

  /** Milliseconds since the Gateway was marked running; only set while running. */
  uptime?: number;

  /** Most recent error message, if any. */
  error?: string;
}
/**
 * Supervises an OpenClaw Gateway.
 *
 * `start()` first probes the configured host/port; if a Gateway already
 * answers there it is adopted as an external instance, otherwise
 * `openclaw gateway` is spawned as a child process and watched.
 *
 * Events emitted:
 * - `status`    — `{ status, previous }` whenever the lifecycle state changes.
 * - `log`       — `{ level, message }` for child stdout/stderr and manager notices.
 * - `connected` — `{ external: true }` when an already-running Gateway is adopted.
 */
export class GatewayManager extends EventEmitter {
  /** Delay between a crash and the automatic restart attempt. */
  private static readonly RESTART_DELAY_MS = 2000;

  /**
   * A Gateway that stayed up at least this long before crashing is treated as
   * having been healthy, so the crash starts a fresh restart budget. Crashes
   * that come sooner count against `maxRestarts`.
   */
  private static readonly STABLE_UPTIME_MS = 60000;

  private process: ChildProcess | null = null;
  private status: GatewayStatus = 'stopped';
  private startTime = 0;
  private restartCount = 0;
  private healthTimer: ReturnType<typeof setInterval> | null = null;
  /** Pending auto-restart, tracked so that stop()/start() can cancel it. */
  private restartTimer: ReturnType<typeof setTimeout> | null = null;
  private readonly options: Required<GatewayManagerOptions>;
  private lastError: string | null = null;

  /**
   * @param opts Optional overrides; unspecified fields fall back to defaults.
   */
  constructor(opts: GatewayManagerOptions = {}) {
    super();
    this.options = {
      home: opts.home || path.join(process.env.HOME || process.env.USERPROFILE || '', '.openclaw'),
      port: opts.port || 18789,
      host: opts.host || '127.0.0.1',
      token: opts.token || '',
      autoRestart: opts.autoRestart ?? true,
      // `??` rather than `||`: an explicit 0 means "never auto-restart" and
      // must not be silently replaced by the default.
      maxRestarts: opts.maxRestarts ?? 5,
      healthCheckInterval: opts.healthCheckInterval || 15000,
    };
  }

  /**
   * Get current gateway info.
   *
   * Note: this runs `openclaw --version` synchronously on every call (via
   * `getVersion()`), so avoid calling it in a tight loop.
   */
  getInfo(): GatewayInfo {
    return {
      status: this.status,
      pid: this.process?.pid,
      port: this.options.port,
      host: this.options.host,
      wsUrl: `ws://${this.options.host}:${this.options.port}`,
      version: this.getVersion(),
      uptime: this.status === 'running' ? Date.now() - this.startTime : undefined,
      error: this.lastError || undefined,
    };
  }

  /** Check if OpenClaw is installed, i.e. `openclaw --version` runs successfully. */
  isInstalled(): boolean {
    return this.getVersion() !== undefined;
  }

  /**
   * Get OpenClaw version.
   *
   * @returns Trimmed output of `openclaw --version`, or `undefined` if the
   *          command cannot be run or exits non-zero.
   */
  getVersion(): string | undefined {
    try {
      return execSync('openclaw --version', { encoding: 'utf-8', stdio: 'pipe' }).trim();
    } catch {
      return undefined;
    }
  }

  /**
   * Start the OpenClaw Gateway.
   *
   * No-op when the Gateway is already running or starting. Resolves once the
   * Gateway is ready (adopted external instance, readiness line on stdout, or
   * a successful health probe at the 30 s deadline).
   *
   * @throws Error if OpenClaw is not installed, the child fails or exits
   *         before becoming ready, startup times out, or `stop()` is called
   *         while startup is in progress.
   */
  async start(): Promise<void> {
    if (this.status === 'running' || this.status === 'starting') {
      return;
    }
    // This call supersedes any auto-restart still waiting on its timer.
    this.cancelPendingRestart();

    if (!this.isInstalled()) {
      // Record the error before emitting so `status` listeners see it.
      this.lastError = 'OpenClaw is not installed';
      this.setStatus('error');
      throw new Error('OpenClaw is not installed. Run: npm install -g openclaw@latest');
    }

    this.lastError = null;
    this.setStatus('starting');

    try {
      // Check if Gateway is already running (external instance)
      const alreadyRunning = await this.checkHealth();
      if (this.status !== 'starting') {
        // stop() ran while the probe was in flight.
        throw new Error('Gateway was stopped during startup');
      }
      if (alreadyRunning) {
        this.markRunning();
        this.emit('connected', { external: true });
        return;
      }

      // Start Gateway as subprocess
      const env: NodeJS.ProcessEnv = {
        ...process.env,
        OPENCLAW_HOME: this.options.home,
      };
      if (this.options.token) {
        env.OPENCLAW_GATEWAY_TOKEN = this.options.token;
      }
      const child = spawn('openclaw', ['gateway'], {
        env,
        stdio: ['ignore', 'pipe', 'pipe'],
        detached: false,
      });
      this.process = child;

      // Capture stdout
      child.stdout?.on('data', (data: Buffer) => {
        const output = data.toString();
        this.emit('log', { level: 'info', message: output.trim() });
        // Detect when Gateway is ready. Only honoured while starting: a later
        // log line that happens to contain "ready" must not reset the uptime
        // or resurrect a gateway that is stopping.
        const announcesReady = output.includes('Gateway listening') || output.includes('ready');
        if (announcesReady && this.status === 'starting' && this.process === child) {
          this.markRunning();
        }
      });

      // Capture stderr
      child.stderr?.on('data', (data: Buffer) => {
        const output = data.toString();
        this.emit('log', { level: 'error', message: output.trim() });
      });

      // Handle process exit
      child.on('exit', (code, signal) => {
        if (this.process !== child) {
          // Stale event from a child we already let go of (force-killed by
          // stop() or terminated after a failed startup); its state is settled.
          return;
        }
        const wasRunning = this.status === 'running';
        this.process = null;
        this.stopHealthCheck();

        if (this.status === 'stopping') {
          this.setStatus('stopped');
          return;
        }

        if (code !== 0 && wasRunning && this.options.autoRestart) {
          if (Date.now() - this.startTime >= GatewayManager.STABLE_UPTIME_MS) {
            // Long healthy run before this crash: start a fresh budget.
            this.restartCount = 0;
          }
          this.restartCount++;
          if (this.restartCount <= this.options.maxRestarts) {
            this.lastError = `Gateway crashed (exit code: ${code}), restarting (${this.restartCount}/${this.options.maxRestarts})`;
            this.emit('log', { level: 'warn', message: this.lastError });
            // Leave 'running' before scheduling: start() is a no-op while the
            // status is 'running', so the restart would otherwise never happen.
            this.setStatus('error');
            this.scheduleRestart();
            return;
          }
        }

        this.lastError = `Gateway exited with code ${code}, signal ${signal}`;
        this.setStatus(code === 0 ? 'stopped' : 'error');
      });

      // Handle process error (e.g. the executable could not be spawned)
      child.on('error', (err) => {
        if (this.process !== child) {
          return;
        }
        this.lastError = err.message;
        this.process = null;
        this.stopHealthCheck();
        this.setStatus('error');
      });

      // Wait for Gateway to be ready (timeout 30s)
      await this.waitForReady(30000);
    } catch (err: unknown) {
      if (this.status === 'stopping' || this.status === 'stopped') {
        // Startup was cancelled by stop() or the child exited cleanly; that is
        // not a failure state, so keep the status that was already set.
        throw err;
      }
      this.lastError = err instanceof Error ? err.message : String(err);
      // Do not leave a half-started child running unmanaged. Detach it first
      // so its exit event is ignored and cannot overwrite lastError.
      const orphan = this.process;
      if (orphan) {
        this.process = null;
        orphan.kill('SIGTERM');
      }
      this.stopHealthCheck();
      this.setStatus('error');
      throw err;
    }
  }

  /**
   * Stop the Gateway.
   *
   * Sends SIGTERM to the owned subprocess and waits up to 10 s for it to exit
   * before sending SIGKILL. Also cancels any pending auto-restart and resets
   * the restart budget. For an adopted external Gateway (no subprocess) only
   * the manager's own state is reset.
   */
  async stop(): Promise<void> {
    // A queued auto-restart must never bring back a gateway the caller stopped.
    this.cancelPendingRestart();
    if (this.status === 'stopped' || this.status === 'stopping') {
      return;
    }
    this.restartCount = 0;
    this.setStatus('stopping');
    this.stopHealthCheck();

    const child = this.process;
    if (child) {
      await new Promise<void>((resolve) => {
        // Wait up to 10s for graceful exit, then force-kill.
        const forceKill = setTimeout(() => {
          child.kill('SIGKILL');
          resolve();
        }, 10000);
        child.once('exit', () => {
          clearTimeout(forceKill);
          resolve();
        });
        // Send SIGTERM for graceful shutdown
        child.kill('SIGTERM');
      });
    }

    this.process = null;
    this.setStatus('stopped');
  }

  /** Restart the Gateway: `stop()` followed by `start()`. */
  async restart(): Promise<void> {
    await this.stop();
    await this.start();
  }

  /**
   * Check Gateway health via HTTP.
   *
   * Issues a GET to `http://<host>:<port>` with a 3 s timeout.
   *
   * @returns `true` for a 2xx (or 101) response; `false` on any other status,
   *          network error, or timeout. Never rejects.
   */
  async checkHealth(): Promise<boolean> {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 3000);
    try {
      const url = `http://${this.options.host}:${this.options.port}`;
      const response = await fetch(url, { signal: controller.signal });
      return response.ok || response.status === 101;
    } catch {
      return false;
    } finally {
      // Always release the abort timer, including when fetch throws.
      clearTimeout(timeout);
    }
  }

  /**
   * Wait for Gateway to become ready.
   *
   * Polls the status once per second. Resolves when it becomes 'running';
   * rejects when it becomes 'error', when startup is abandoned (status leaves
   * 'starting'), or when `timeoutMs` elapses and a final health probe fails.
   */
  private waitForReady(timeoutMs: number): Promise<void> {
    return new Promise((resolve, reject) => {
      const startedAt = Date.now();

      const poll = async (): Promise<void> => {
        if (this.status === 'running') {
          resolve();
          return;
        }
        if (this.status === 'error') {
          reject(new Error(this.lastError || 'Gateway failed to start'));
          return;
        }
        if (this.status !== 'starting') {
          // stop() was called, or the child exited cleanly before readiness.
          reject(new Error(this.lastError || 'Gateway was stopped during startup'));
          return;
        }
        if (Date.now() - startedAt > timeoutMs) {
          // Try health check as last resort
          const healthy = await this.checkHealth();
          if (this.status === 'running') {
            // Readiness was announced on stdout while the probe was in flight.
            resolve();
            return;
          }
          if (healthy && this.status === 'starting') {
            this.markRunning();
            resolve();
            return;
          }
          reject(new Error('Gateway startup timed out'));
          return;
        }
        setTimeout(() => {
          poll().catch(reject);
        }, 1000);
      };

      // Route unexpected failures (e.g. a throwing event listener) into the
      // promise instead of leaving it pending forever.
      poll().catch(reject);
    });
  }

  /** Record the ready transition: stamp the start time, go 'running', begin health checks. */
  private markRunning(): void {
    // Stamp first so `status` listeners calling getInfo() see a correct uptime.
    this.startTime = Date.now();
    this.setStatus('running');
    this.startHealthCheck();
  }

  /** Queue a single auto-restart attempt after RESTART_DELAY_MS. */
  private scheduleRestart(): void {
    this.cancelPendingRestart();
    this.restartTimer = setTimeout(() => {
      this.restartTimer = null;
      this.start().catch((err: unknown) => {
        // start() already recorded the failure in status/lastError; report it
        // here so the rejection is not left unhandled.
        const reason = err instanceof Error ? err.message : String(err);
        this.emit('log', { level: 'error', message: `Auto-restart failed: ${reason}` });
      });
    }, GatewayManager.RESTART_DELAY_MS);
  }

  /** Cancel a queued auto-restart, if any. */
  private cancelPendingRestart(): void {
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
      this.restartTimer = null;
    }
  }

  /** Start periodic health checks (replacing any existing timer). */
  private startHealthCheck(): void {
    this.stopHealthCheck();
    this.healthTimer = setInterval(async () => {
      const healthy = await this.checkHealth();
      if (!healthy && this.status === 'running') {
        this.emit('log', { level: 'warn', message: 'Health check failed' });
        // Don't immediately mark as error — may be transient
      }
    }, this.options.healthCheckInterval);
  }

  /** Stop health checks */
  private stopHealthCheck(): void {
    if (this.healthTimer) {
      clearInterval(this.healthTimer);
      this.healthTimer = null;
    }
  }

  /** Update status and emit a `status` event if it actually changed. */
  private setStatus(status: GatewayStatus): void {
    const prev = this.status;
    this.status = status;
    if (prev !== status) {
      this.emit('status', { status, previous: prev });
    }
  }
}