Files
zclaw_openfang/src/core/remote-execution/engine.ts
2026-03-12 00:23:42 +08:00

167 lines
5.2 KiB
TypeScript

// 远程执行系统 - 实现类
import type {
RemoteExecutionSystem,
Device,
Task,
TaskStatus,
StatusHandler,
Result,
DeviceStatus
} from './types';
import { generateId } from '../../utils/id';
import { createLogger } from '../../utils/logger';
const log = createLogger('RemoteExecution');
export class RemoteExecutionEngine implements RemoteExecutionSystem {
private devices: Map<string, Device> = new Map();
private tasks: Map<string, Task> = new Map();
private subscriptions: Map<string, StatusHandler[]> = new Map();
private runningCount = 0;
private maxConcurrent: number;
private pendingQueue: Task[] = [];
constructor(maxConcurrent: number = 5) {
this.maxConcurrent = maxConcurrent;
}
async registerDevice(device: Device): Promise<void> {
this.devices.set(device.id, device);
log.info(`Device registered: ${device.id} (${device.platform})`);
}
async heartbeat(deviceId: string): Promise<DeviceStatus> {
const device = this.devices.get(deviceId);
if (!device) {
throw new Error(`Device not found: ${deviceId}`);
}
device.lastHeartbeat = new Date();
return device.status;
}
async submitTask(task: Task): Promise<string> {
task.id = task.id || generateId('task');
task.status = 'pending';
task.createdAt = new Date();
this.tasks.set(task.id, task);
log.info(`Task submitted: ${task.id} (priority: ${task.priority})`);
if (this.runningCount < this.maxConcurrent) {
this.executeTask(task).catch(err => log.error(`Task execution error: ${err.message}`));
} else {
this.pendingQueue.push(task);
log.debug(`Task queued (${this.pendingQueue.length} pending)`);
}
return task.id;
}
async cancelTask(taskId: string): Promise<void> {
const task = this.tasks.get(taskId);
if (!task) throw new Error(`Task not found: ${taskId}`);
task.status = 'cancelled';
this.notifySubscribers(taskId, 'cancelled');
log.info(`Task cancelled: ${taskId}`);
}
async getStatus(taskId: string): Promise<TaskStatus> {
const task = this.tasks.get(taskId);
if (!task) throw new Error(`Task not found: ${taskId}`);
return task.status;
}
getTask(taskId: string): Task | undefined {
return this.tasks.get(taskId);
}
getDevice(deviceId: string): Device | undefined {
return this.devices.get(deviceId);
}
listDevices(): Device[] {
return Array.from(this.devices.values());
}
listTasks(filter?: { status?: TaskStatus; userId?: string }): Task[] {
let tasks = Array.from(this.tasks.values());
if (filter?.status) tasks = tasks.filter(t => t.status === filter.status);
if (filter?.userId) tasks = tasks.filter(t => t.userId === filter.userId);
return tasks.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());
}
subscribe(taskId: string, handler: StatusHandler): void {
if (!this.subscriptions.has(taskId)) {
this.subscriptions.set(taskId, []);
}
this.subscriptions.get(taskId)!.push(handler);
}
async pushResult(taskId: string, result: Result): Promise<void> {
const task = this.tasks.get(taskId);
if (!task) throw new Error(`Task not found: ${taskId}`);
task.result = result;
task.status = result.success ? 'completed' : 'failed';
task.completedAt = new Date();
this.notifySubscribers(taskId, task.status);
}
getStats(): { total: number; running: number; pending: number; completed: number; failed: number } {
const tasks = Array.from(this.tasks.values());
return {
total: tasks.length,
running: tasks.filter(t => t.status === 'running').length,
pending: tasks.filter(t => t.status === 'pending').length,
completed: tasks.filter(t => t.status === 'completed').length,
failed: tasks.filter(t => t.status === 'failed').length,
};
}
private async executeTask(task: Task): Promise<void> {
this.runningCount++;
try {
task.status = 'running';
task.startedAt = new Date();
this.notifySubscribers(task.id, 'running');
log.info(`Executing task: ${task.id}`);
// TODO: 集成 OpenClaw SDK 实际执行
await new Promise(resolve => setTimeout(resolve, 1000));
await this.pushResult(task.id, {
taskId: task.id,
success: true,
data: { message: 'Task completed successfully' }
});
} catch (error: any) {
await this.pushResult(task.id, {
taskId: task.id,
success: false,
error: error.message
});
} finally {
this.runningCount--;
this.processNextInQueue();
}
}
private processNextInQueue(): void {
if (this.pendingQueue.length > 0 && this.runningCount < this.maxConcurrent) {
// 按优先级排序
this.pendingQueue.sort((a, b) => {
const priority: Record<string, number> = { high: 0, normal: 1, low: 2 };
return (priority[a.priority] ?? 1) - (priority[b.priority] ?? 1);
});
const next = this.pendingQueue.shift()!;
this.executeTask(next).catch(err => log.error(`Queued task error: ${err.message}`));
}
}
private notifySubscribers(taskId: string, status: TaskStatus, progress?: number): void {
const handlers = this.subscriptions.get(taskId);
if (handlers) {
handlers.forEach(handler => handler(status, progress));
}
}
}