import 'dart:async';
import 'dart:convert';

import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

/// WebSocket client that speaks a minimal subset of the Engine.IO / Socket.IO
/// text framing against the BrandMeister "last heard" endpoint.
///
/// After the namespace handshake completes, the client sends one
/// `searchMongo` query built from the configured radio IDs and forwards every
/// decoded `42…` data packet to [messageStream].
///
/// Packet prefixes handled by this class:
///  * `0`  – open packet (JSON payload, carries `pingInterval`)
///  * `2`  – ping from the server, answered with `3`
///  * `40` – namespace connect acknowledgement
///  * `42` – event/data packet
class MeActivityWebSocketClient {
  /// Creates a client for the given [radioIds].
  ///
  /// The list is stored by reference (not copied). Its element type is not
  /// constrained here; each element is interpolated into the search query.
  MeActivityWebSocketClient({
    required List radioIds,
  }) : _radioIds = radioIds;

  static const int _maxReconnectAttempts = 5;
  static const Duration _reconnectDelay = Duration(seconds: 5);
  static const String _wsUrl =
      'wss://api.brandmeister.network/lh/?EIO=4&transport=websocket';

  final List _radioIds;
  final StreamController<Map<String, dynamic>> _messageController =
      StreamController<Map<String, dynamic>>.broadcast();
  final Completer<void> _readyCompleter = Completer<void>();

  WebSocketChannel? _channel;

  // Kept so that a closed/replaced channel can no longer deliver callbacks
  // (in particular a late onDone) that would tear down a newer channel.
  StreamSubscription<dynamic>? _subscription;

  Timer? _heartbeatTimer;
  Timer? _reconnectTimer;
  bool _isConnecting = false;
  bool _shouldReconnect = true;
  bool _isReady = false;
  int _reconnectAttempts = 0;

  /// Broadcast stream of decoded data packets.
  ///
  /// Array payloads are delivered as `{'event': name}` or
  /// `{'event': name, 'data': secondElement}`; object payloads are delivered
  /// unchanged.
  Stream<Map<String, dynamic>> get messageStream => _messageController.stream;

  /// Whether a channel object currently exists.
  ///
  /// This is set as soon as the channel is created, before any handshake
  /// traffic has been exchanged.
  bool get isConnected => _channel != null;

  /// Whether the namespace handshake has completed on the current channel.
  bool get isReady => _isReady;

  /// Completes the first time the namespace handshake succeeds, or
  /// immediately when [connect] is called with no radio IDs.
  Future<void> get ready => _readyCompleter.future;

  /// The radio IDs supplied at construction.
  List get radioIds => _radioIds;

  /// Opens the WebSocket unless a connection already exists or is in
  /// progress.
  ///
  /// Does nothing (other than completing [ready]) when no radio IDs were
  /// provided. An explicit call re-enables automatic reconnection and grants
  /// a fresh reconnect budget.
  Future<void> connect() async {
    if (_isConnecting || isConnected) {
      debugPrint('MeActivity WS: Already connected or connecting');
      return;
    }

    if (_radioIds.isEmpty) {
      debugPrint('MeActivity WS: No radio IDs provided');
      if (!_readyCompleter.isCompleted) {
        _readyCompleter.complete();
      }
      return;
    }

    _shouldReconnect = true;
    // Only an explicit connect() resets the budget here. Timer-driven retries
    // go through _openChannel() directly so that failed attempts accumulate
    // and _maxReconnectAttempts is actually enforced.
    _reconnectAttempts = 0;
    _openChannel();
  }

  /// Creates the channel and starts listening; shared by [connect] and the
  /// reconnect timer.
  void _openChannel() {
    if (_isConnecting || isConnected) {
      debugPrint('MeActivity WS: Already connected or connecting');
      return;
    }

    _isConnecting = true;
    // A pending retry is redundant once a channel is being opened.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;

    try {
      debugPrint('MeActivity WS: Connecting to $_wsUrl');
      final channel = WebSocketChannel.connect(Uri.parse(_wsUrl));
      _channel = channel;
      _subscription = channel.stream.listen(
        _handleMessage,
        onError: _handleError,
        onDone: _handleDisconnect,
        cancelOnError: false,
      );
      _isConnecting = false;
      debugPrint('MeActivity WS: WebSocket connected');
    } catch (e) {
      _isConnecting = false;
      debugPrint('MeActivity WS: Connection error: $e');
      _scheduleReconnect();
    }
  }

  /// Dispatches an incoming frame according to its packet-type prefix.
  void _handleMessage(dynamic message) {
    try {
      final messageStr = message.toString();
      debugPrint('MeActivity WS: Received message: $messageStr');

      // The two-character prefixes must be tested before the bare '4'.
      if (messageStr.startsWith('0')) {
        _handleOpenPacket(messageStr);
      } else if (messageStr.startsWith('2')) {
        _sendPong();
      } else if (messageStr.startsWith('40')) {
        _handleNamespaceConnect();
      } else if (messageStr.startsWith('42')) {
        _handleDataPacket(messageStr);
      } else if (messageStr.startsWith('4')) {
        debugPrint('MeActivity WS: Received other type 4 message: $messageStr');
      }
    } catch (e) {
      debugPrint('MeActivity WS: Error handling message: $e');
    }
  }

  /// Parses the open packet, starts the heartbeat timer using the advertised
  /// `pingInterval` (default 25000 ms), and requests the namespace.
  void _handleOpenPacket(String message) {
    try {
      final jsonStr = message.substring(1);
      final data = jsonDecode(jsonStr) as Map<String, dynamic>;
      final pingInterval = data['pingInterval'] as int? ?? 25000;

      _heartbeatTimer?.cancel();
      _heartbeatTimer = Timer.periodic(
        Duration(milliseconds: pingInterval),
        (_) => _sendPing(),
      );

      debugPrint(
          'MeActivity WS: Open packet received, ping interval: $pingInterval ms');
      _sendNamespaceConnect();
    } catch (e) {
      debugPrint('MeActivity WS: Error parsing open packet: $e');
    }
  }

  /// Sends the namespace connect request (`40`).
  void _sendNamespaceConnect() {
    final channel = _channel;
    if (channel == null) return;

    try {
      channel.sink.add('40');
      debugPrint('MeActivity WS: Sent namespace connect (40)');
    } catch (e) {
      debugPrint('MeActivity WS: Error sending namespace connect: $e');
    }
  }

  /// Marks the session ready, completes [ready], and issues the search query.
  void _handleNamespaceConnect() {
    debugPrint('MeActivity WS: Namespace connected (received 40)');
    _isReady = true;
    // The handshake finished, so the link is genuinely up: restore the full
    // reconnect budget for any future drop.
    _reconnectAttempts = 0;

    if (!_readyCompleter.isCompleted) {
      _readyCompleter.complete();
    }

    // Send search query to fetch historical activity for user's devices
    _sendSearchQuery();
  }

  /// Sends a `searchMongo` event asking for up to 50 items matching any of
  /// the configured radio IDs.
  void _sendSearchQuery() {
    final channel = _channel;
    if (channel == null || !isReady) {
      debugPrint('MeActivity WS: Cannot send search query, not ready');
      return;
    }

    try {
      // Build SQL query for multiple device IDs:
      //   ContextID = 123 OR ContextID = 456
      // NOTE(review): IDs are interpolated verbatim; if they can ever be
      // free-form strings they should be validated before reaching here.
      final sql = _radioIds.map((id) => 'ContextID = $id').join(' OR ');

      final message = '42${jsonEncode([
            'searchMongo',
            {
              'query': {'sql': sql},
              'amount': 50, // Fetch up to 50 historical items
            }
          ])}';

      channel.sink.add(message);
      debugPrint('MeActivity WS: Sent search query: $message');
    } catch (e) {
      debugPrint('MeActivity WS: Error sending search query: $e');
    }
  }

  /// Decodes a `42…` packet and publishes it on [messageStream].
  void _handleDataPacket(String message) {
    try {
      // Drop the leading '4', then the '2' that follows it when present.
      var jsonStr = message.substring(1);
      if (jsonStr.startsWith('2')) {
        jsonStr = jsonStr.substring(1);
      }

      final data = jsonDecode(jsonStr);
      Map<String, dynamic>? eventData;

      if (data is Map<String, dynamic>) {
        eventData = data;
      } else if (data is List && data.isNotEmpty) {
        // Array form: [eventName, payload?]
        final eventName = data[0] as String;
        eventData = data.length > 1
            ? <String, dynamic>{'event': eventName, 'data': data[1]}
            : <String, dynamic>{'event': eventName};
      }

      if (eventData != null && !_messageController.isClosed) {
        _messageController.add(eventData);
      }
    } catch (e) {
      debugPrint('MeActivity WS: Error parsing data packet: $e');
    }
  }

  /// Sends a ping frame (`2`) from the heartbeat timer.
  ///
  // NOTE(review): the URL requests EIO=4, where heartbeats are expected to be
  // server-initiated (server ping, client pong). Confirm that this server
  // tolerates client-originated pings before relying on this timer.
  void _sendPing() {
    final channel = _channel;
    if (channel == null) return;

    try {
      channel.sink.add('2');
      debugPrint('MeActivity WS: Sent ping');
    } catch (e) {
      debugPrint('MeActivity WS: Error sending ping: $e');
    }
  }

  /// Answers a server ping with a pong frame (`3`).
  void _sendPong() {
    final channel = _channel;
    if (channel == null) return;

    try {
      channel.sink.add('3');
      debugPrint('MeActivity WS: Sent pong');
    } catch (e) {
      debugPrint('MeActivity WS: Error sending pong: $e');
    }
  }

  /// Logs a stream error. Teardown is left to the stream's done callback.
  void _handleError(Object error) {
    debugPrint('MeActivity WS: Stream error: $error');
  }

  /// Runs when the channel's stream ends: resets state and, unless
  /// [disconnect] was requested, schedules a reconnect.
  void _handleDisconnect() {
    debugPrint('MeActivity WS: Disconnected');
    _cleanup();

    if (_shouldReconnect) {
      _scheduleReconnect();
    }
  }

  /// Schedules one reconnect attempt after [_reconnectDelay], up to
  /// [_maxReconnectAttempts] consecutive failures.
  void _scheduleReconnect() {
    if (_reconnectAttempts >= _maxReconnectAttempts) {
      debugPrint('MeActivity WS: Max reconnect attempts reached');
      return;
    }

    _reconnectAttempts++;
    debugPrint(
        'MeActivity WS: Scheduling reconnect attempt $_reconnectAttempts');

    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay, () {
      if (_shouldReconnect) {
        // Bypass connect() so the attempt counter is not reset.
        _openChannel();
      }
    });
  }

  /// Releases per-connection state so that a later connect starts clean.
  void _cleanup() {
    _heartbeatTimer?.cancel();
    _heartbeatTimer = null;

    // Detach from the old stream so it cannot invoke _handleDisconnect later
    // and wipe out a channel created in the meantime.
    _subscription?.cancel();
    _subscription = null;

    _channel = null;
    _isConnecting = false;
    // Readiness belongs to a specific channel; without one we are not ready.
    _isReady = false;
  }

  /// Closes the connection and disables automatic reconnection.
  void disconnect() {
    debugPrint('MeActivity WS: Disconnecting');
    _shouldReconnect = false;
    _reconnectTimer?.cancel();
    _heartbeatTimer?.cancel();

    try {
      _channel?.sink.close();
    } catch (e) {
      debugPrint('MeActivity WS: Error closing channel: $e');
    }

    _cleanup();
  }

  /// Disconnects and closes [messageStream]. The instance must not be reused
  /// for receiving messages afterwards.
  void dispose() {
    disconnect();
    _messageController.close();
  }
}