fix(web): SSE 连接添加指数退避重连策略
useAlertSSE hook 和 message store 的 connectSSE 均改为手动重连: 1s→2s→4s→8s→16s→30s(cap),最大重试 10 次,随机 jitter 0.5-1.0x。 替代浏览器原生 EventSource 固定 ~3s 重连,避免服务端压力。
This commit is contained in:
@@ -24,49 +24,51 @@ export interface VitalUpdateSSEEvent {
|
||||
occurred_at?: string;
|
||||
}
|
||||
|
||||
type ConnectionState = 'connecting' | 'connected' | 'disconnected';
|
||||
|
||||
interface UseAlertSSEOptions {
|
||||
/** 是否启用 SSE 连接(默认 true) */
|
||||
enabled?: boolean;
|
||||
/** 收到 alert 事件的回调 */
|
||||
onAlert?: (data: AlertSSEEvent) => void;
|
||||
/** 收到 vital_update 事件的回调 */
|
||||
onVitalUpdate?: (data: VitalUpdateSSEEvent) => void;
|
||||
}
|
||||
|
||||
interface UseAlertSSEReturn {
|
||||
/** 连接状态 */
|
||||
connected: boolean;
|
||||
/** 最近收到的 alert 事件列表(最多保留 100 条) */
|
||||
connectionState: ConnectionState;
|
||||
recentAlerts: AlertSSEEvent[];
|
||||
/** 手动重连 */
|
||||
reconnect: () => void;
|
||||
}
|
||||
|
||||
const MAX_RECENT_ALERTS = 100;
|
||||
const INITIAL_DELAY_MS = 1000;
|
||||
const MAX_DELAY_MS = 30_000;
|
||||
const MAX_RETRIES = 10;
|
||||
|
||||
/**
|
||||
* SSE 实时告警订阅 Hook。
|
||||
*
|
||||
* 封装 EventSource 连接管理,支持:
|
||||
* - 自动重连(EventSource 内置)
|
||||
* - 事件分发(alert / vital_update)
|
||||
* - 连接状态追踪
|
||||
* - 最近告警缓存
|
||||
*/
|
||||
export function useAlertSSE(options: UseAlertSSEOptions = {}): UseAlertSSEReturn {
|
||||
const { enabled = true, onAlert, onVitalUpdate } = options;
|
||||
const eventSourceRef = useRef<EventSource | null>(null);
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const retryCountRef = useRef(0);
|
||||
const reconnectKeyRef = useRef(0);
|
||||
const [connected, setConnected] = useState(false);
|
||||
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected');
|
||||
const [recentAlerts, setRecentAlerts] = useState<AlertSSEEvent[]>([]);
|
||||
|
||||
const clearReconnectTimer = useCallback(() => {
|
||||
if (reconnectTimerRef.current) {
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = null;
|
||||
}
|
||||
}, []);
|
||||
|
||||
const connect = useCallback(() => {
|
||||
// 关闭旧连接
|
||||
if (eventSourceRef.current) {
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
setConnected(false);
|
||||
setConnectionState('disconnected');
|
||||
|
||||
if (!enabled) return;
|
||||
|
||||
@@ -75,11 +77,14 @@ export function useAlertSSE(options: UseAlertSSEOptions = {}): UseAlertSSEReturn
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_BASE_URL || '/api/v1';
|
||||
const url = `${baseUrl}/messages/stream?token=${encodeURIComponent(token)}`;
|
||||
setConnectionState('connecting');
|
||||
const es = new EventSource(url);
|
||||
eventSourceRef.current = es;
|
||||
|
||||
es.onopen = () => {
|
||||
retryCountRef.current = 0;
|
||||
setConnected(true);
|
||||
setConnectionState('connected');
|
||||
};
|
||||
|
||||
es.addEventListener('alert', (e: MessageEvent) => {
|
||||
@@ -90,25 +95,41 @@ export function useAlertSSE(options: UseAlertSSEOptions = {}): UseAlertSSEReturn
|
||||
const next = [data, ...prev];
|
||||
return next.length > MAX_RECENT_ALERTS ? next.slice(0, MAX_RECENT_ALERTS) : next;
|
||||
});
|
||||
} catch {
|
||||
// 忽略解析失败的事件
|
||||
}
|
||||
} catch { /* ignore parse errors */ }
|
||||
});
|
||||
|
||||
es.addEventListener('vital_update', (e: MessageEvent) => {
|
||||
try {
|
||||
const data = JSON.parse(e.data) as VitalUpdateSSEEvent;
|
||||
onVitalUpdate?.(data);
|
||||
} catch {
|
||||
// 忽略解析失败的事件
|
||||
}
|
||||
} catch { /* ignore parse errors */ }
|
||||
});
|
||||
|
||||
es.onerror = () => {
|
||||
setConnected(false);
|
||||
// EventSource 会自动重连,无需手动处理
|
||||
setConnectionState('disconnected');
|
||||
es.close();
|
||||
eventSourceRef.current = null;
|
||||
|
||||
retryCountRef.current += 1;
|
||||
if (retryCountRef.current > MAX_RETRIES) {
|
||||
return;
|
||||
}
|
||||
|
||||
const delay = Math.min(
|
||||
INITIAL_DELAY_MS * Math.pow(2, retryCountRef.current - 1),
|
||||
MAX_DELAY_MS,
|
||||
);
|
||||
const jitter = delay * (0.5 + Math.random() * 0.5);
|
||||
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
reconnectTimerRef.current = null;
|
||||
const tokenNow = localStorage.getItem('access_token');
|
||||
if (!tokenNow || !enabled) return;
|
||||
connect();
|
||||
}, jitter);
|
||||
};
|
||||
}, [enabled, onAlert, onVitalUpdate]);
|
||||
}, [enabled, onAlert, onVitalUpdate, clearReconnectTimer]);
|
||||
|
||||
useEffect(() => {
|
||||
connect();
|
||||
@@ -117,13 +138,16 @@ export function useAlertSSE(options: UseAlertSSEOptions = {}): UseAlertSSEReturn
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
setConnected(false);
|
||||
setConnectionState('disconnected');
|
||||
};
|
||||
}, [connect, reconnectKeyRef.current]);
|
||||
|
||||
const reconnect = useCallback(() => {
|
||||
retryCountRef.current = 0;
|
||||
reconnectKeyRef.current += 1;
|
||||
}, []);
|
||||
|
||||
return { connected, recentAlerts, reconnect };
|
||||
return { connected, connectionState, recentAlerts, reconnect };
|
||||
}
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { create } from 'zustand';
|
||||
import { getUnreadCount, listMessages, markRead, type MessageInfo } from '../api/messages';
|
||||
|
||||
const INITIAL_DELAY_MS = 1000;
|
||||
const MAX_DELAY_MS = 30_000;
|
||||
const MAX_RETRIES = 10;
|
||||
|
||||
interface MessageState {
|
||||
unreadCount: number;
|
||||
recentMessages: MessageInfo[];
|
||||
@@ -71,32 +75,81 @@ export const useMessageStore = create<MessageState>((set, get) => ({
|
||||
},
|
||||
|
||||
connectSSE: () => {
|
||||
const baseUrl = import.meta.env.VITE_API_BASE_URL || '/api/v1';
|
||||
const token = localStorage.getItem('access_token');
|
||||
if (!token) return () => {};
|
||||
let es: EventSource | null = null;
|
||||
let retryCount = 0;
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let disposed = false;
|
||||
|
||||
const url = `${baseUrl}/messages/stream?token=${encodeURIComponent(token)}`;
|
||||
const es = new EventSource(url);
|
||||
|
||||
es.addEventListener('message', () => {
|
||||
get().fetchUnreadCount();
|
||||
get().fetchRecentMessages();
|
||||
});
|
||||
|
||||
es.addEventListener('alert', () => {
|
||||
get().fetchUnreadCount();
|
||||
});
|
||||
|
||||
es.addEventListener('vital_update', () => {
|
||||
// 体征数据更新事件 — 预留:未来可触发趋势图刷新
|
||||
});
|
||||
|
||||
es.onerror = () => {
|
||||
// SSE 连接断开时 EventSource 会自动重连
|
||||
const clearReconnectTimer = () => {
|
||||
if (reconnectTimer) {
|
||||
clearTimeout(reconnectTimer);
|
||||
reconnectTimer = null;
|
||||
}
|
||||
};
|
||||
|
||||
const connect = () => {
|
||||
if (es) {
|
||||
es.close();
|
||||
es = null;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
|
||||
const token = localStorage.getItem('access_token');
|
||||
if (!token || disposed) return;
|
||||
|
||||
const baseUrl = import.meta.env.VITE_API_BASE_URL || '/api/v1';
|
||||
const url = `${baseUrl}/messages/stream?token=${encodeURIComponent(token)}`;
|
||||
es = new EventSource(url);
|
||||
|
||||
es.onopen = () => {
|
||||
retryCount = 0;
|
||||
};
|
||||
|
||||
es.addEventListener('message', () => {
|
||||
get().fetchUnreadCount();
|
||||
get().fetchRecentMessages();
|
||||
});
|
||||
|
||||
es.addEventListener('alert', () => {
|
||||
get().fetchUnreadCount();
|
||||
});
|
||||
|
||||
es.addEventListener('vital_update', () => {
|
||||
// 体征数据更新事件 — 预留:未来可触发趋势图刷新
|
||||
});
|
||||
|
||||
es.onerror = () => {
|
||||
es?.close();
|
||||
es = null;
|
||||
|
||||
retryCount += 1;
|
||||
if (retryCount > MAX_RETRIES || disposed) return;
|
||||
|
||||
const delay = Math.min(
|
||||
INITIAL_DELAY_MS * Math.pow(2, retryCount - 1),
|
||||
MAX_DELAY_MS,
|
||||
);
|
||||
const jitter = delay * (0.5 + Math.random() * 0.5);
|
||||
|
||||
reconnectTimer = setTimeout(() => {
|
||||
reconnectTimer = null;
|
||||
if (disposed) return;
|
||||
const tokenNow = localStorage.getItem('access_token');
|
||||
if (!tokenNow) return;
|
||||
connect();
|
||||
}, jitter);
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
es.close();
|
||||
disposed = true;
|
||||
if (es) {
|
||||
es.close();
|
||||
es = null;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
Reference in New Issue
Block a user