Files
zclaw_openfang/src/core/multi-agent/message-bus.ts
2026-03-12 00:23:42 +08:00

72 lines
2.2 KiB
TypeScript

// 多 Agent 消息总线 - Agent 间通信的核心
import type { AgentMessage, MessageHandler } from './types';
import { generateId } from '../../utils/id';
import { createLogger } from '../../utils/logger';
// Module-level logger shared by everything in this file; 'MessageBus' is the name handed to createLogger.
const log = createLogger('MessageBus');
/**
 * Multi-agent message bus: the hub through which agents exchange messages.
 *
 * Agents register handlers under their own id with {@link MessageBus.subscribe}.
 * A message whose `to` field is `'*'` is treated as a broadcast and is delivered
 * to every broadcast handler and to every subscribed agent handler; any other
 * `to` value is delivered only to the handlers registered under that id.
 * Every message passed to {@link MessageBus.send} is also appended to an
 * in-memory log that can be queried with {@link MessageBus.getMessages}.
 */
export class MessageBus {
  /** Handlers for directed messages, keyed by the receiving agent's id. */
  private subscribers: Map<string, MessageHandler[]> = new Map();
  /** Handlers registered through `onBroadcast`; they only receive `'*'` messages. */
  private broadcastHandlers: MessageHandler[] = [];
  /** Every message sent so far, oldest first. It is only emptied by `clear()`. */
  private messageLog: AgentMessage[] = [];

  /**
   * Registers a handler for messages addressed to `agentId`.
   * Several handlers may be registered under the same id; all of them are called.
   *
   * @param agentId - Id of the agent the handler belongs to.
   * @param handler - Callback invoked with each delivered message.
   */
  subscribe(agentId: string, handler: MessageHandler): void {
    const existing = this.subscribers.get(agentId);
    if (existing) {
      existing.push(handler);
    } else {
      this.subscribers.set(agentId, [handler]);
    }
    log.debug(`Agent ${agentId} subscribed to message bus`);
  }

  /**
   * Removes every handler registered under `agentId`.
   * Handlers added through `onBroadcast` are not affected.
   *
   * @param agentId - Id of the agent to detach from the bus.
   */
  unsubscribe(agentId: string): void {
    this.subscribers.delete(agentId);
    log.debug(`Agent ${agentId} unsubscribed from message bus`);
  }

  /**
   * Registers a handler that is called for broadcast (`to === '*'`) messages only.
   *
   * @param handler - Callback invoked with each broadcast message.
   */
  onBroadcast(handler: MessageHandler): void {
    this.broadcastHandlers.push(handler);
  }

  /**
   * Stamps the message with a generated id and the current time, records it in
   * the message log, and delivers it.
   *
   * The returned promise resolves once every targeted handler has settled. A
   * failing handler never makes `send` reject and never prevents the other
   * handlers from running; each failure is reported with `log.warn`.
   *
   * @param message - The message without its `id` and `timestamp`.
   */
  async send(message: Omit<AgentMessage, 'id' | 'timestamp'>): Promise<void> {
    const fullMessage: AgentMessage = {
      ...message,
      id: generateId('msg'),
      timestamp: new Date(),
    };
    this.messageLog.push(fullMessage);
    log.debug(`Message: ${fullMessage.from} -> ${fullMessage.to} [${fullMessage.type}]`);

    if (fullMessage.to === '*') {
      // Broadcast: broadcast handlers first, then every agent's handlers.
      const allHandlers = [...this.broadcastHandlers];
      for (const handlers of this.subscribers.values()) {
        allHandlers.push(...handlers);
      }
      await this.dispatch(allHandlers, fullMessage);
      return;
    }

    // Directed delivery.
    const handlers = this.subscribers.get(fullMessage.to);
    if (!handlers) {
      log.warn(`No handlers for agent ${fullMessage.to}`);
      return;
    }
    await this.dispatch(handlers, fullMessage);
  }

  /**
   * Returns the most recent messages from the log, oldest first.
   *
   * @param agentId - When given (and non-empty), only messages sent by this
   *   agent, addressed to it, or broadcast to `'*'` are considered.
   * @param limit - Maximum number of messages to return. A limit that is not
   *   greater than zero yields an empty array.
   * @returns A new array; mutating it does not affect the log.
   */
  getMessages(agentId?: string, limit: number = 50): AgentMessage[] {
    // `slice(-0)` is `slice(0)` and would return the whole log, and a negative
    // limit would drop messages from the front, so handle both explicitly.
    if (!(limit > 0)) {
      return [];
    }
    const messages = agentId
      ? this.messageLog.filter(
          (m) => m.from === agentId || m.to === agentId || m.to === '*',
        )
      : this.messageLog;
    return messages.slice(-limit);
  }

  /** Drops all agent subscriptions, all broadcast handlers and the message log. */
  clear(): void {
    this.subscribers.clear();
    this.broadcastHandlers = [];
    this.messageLog = [];
  }

  /**
   * Calls every handler with `message` and waits for all of them to settle.
   *
   * Each call happens inside an async wrapper so that a handler throwing
   * synchronously turns into a rejected promise instead of aborting the
   * remaining handlers. Rejections are logged rather than silently discarded.
   */
  private async dispatch(
    handlers: readonly MessageHandler[],
    message: AgentMessage,
  ): Promise<void> {
    const outcomes = await Promise.allSettled(
      handlers.map(async (handler) => {
        await handler(message);
      }),
    );
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        const reason: unknown = outcome.reason;
        const detail = reason instanceof Error ? reason.message : String(reason);
        log.warn(
          `Handler failed for message ${message.id} (${message.from} -> ${message.to}): ${detail}`,
        );
      }
    }
  }
}