// SSE 通知服务 — 监听服务端推送事件 import 'dart:async'; import 'dart:convert'; import 'package:dio/dio.dart'; /// SSE 通知事件 class NotificationEvent { final String type; final Map payload; final DateTime receivedAt; const NotificationEvent({ required this.type, required this.payload, required this.receivedAt, }); } /// SSE 通知服务 — 监听后端 Server-Sent Events 推送 /// /// 使用方式: /// ```dart /// final service = SseNotificationService(token: 'jwt-token'); /// service.events.listen((event) { /// // 处理通知 /// }); /// await service.connect(); /// ``` class SseNotificationService { final String _baseUrl; final String? _token; Dio? _dio; Response? _response; StreamController? _controller; bool _disposed = false; SseNotificationService({ required String token, required String baseUrl, }) : _token = token, _baseUrl = baseUrl; /// 通知事件流 Stream get events { _controller ??= StreamController.broadcast(); return _controller!.stream; } /// 连接到 SSE 端点 Future connect() async { if (_disposed) return; _dio = Dio(BaseOptions( baseUrl: _baseUrl, headers: { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', if (_token != null) 'Authorization': 'Bearer $_token', }, responseType: ResponseType.stream, )); try { _response = await _dio!.get('/message/stream'); if (_response?.data == null) return; _response!.data!.stream.listen( (data) { _parseSseData(data); }, onError: (error) { if (!_disposed) { // 自动重连逻辑(3秒延迟) Future.delayed(const Duration(seconds: 3), () { if (!_disposed) connect(); }); } }, onDone: () { if (!_disposed) { Future.delayed(const Duration(seconds: 3), () { if (!_disposed) connect(); }); } }, ); } catch (e) { // 连接失败,延迟重连 if (!_disposed) { Future.delayed(const Duration(seconds: 5), () { if (!_disposed) connect(); }); } } } /// 解析 SSE 数据帧 void _parseSseData(List data) { final text = utf8.decode(data); final lines = text.split('\n'); String? eventType; String? eventData; for (final line in lines) { if (line.startsWith('event:')) { eventType = line.substring(6).trim(); } else if (line.startsWith('data:')) { eventData = line.substring(5).trim(); } else if (line.isEmpty && eventType != null && eventData != null) { // 空行 = 事件结束 _emitEvent(eventType, eventData); eventType = null; eventData = null; } } } /// 发射通知事件到流 void _emitEvent(String type, String data) { if (_disposed || _controller == null) return; try { final payload = jsonDecode(data) as Map; _controller!.add(NotificationEvent( type: type, payload: payload, receivedAt: DateTime.now(), )); } catch (_) { // 忽略解析错误 } } /// 断开连接并释放资源 void dispose() { _disposed = true; _response?.data?.stream.listen((_) {}); _controller?.close(); _dio?.close(); } }