import 'dart:async'; import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; class LastHeardWebSocketClient { WebSocketChannel? _channel; StreamController>? _messageController; Timer? _heartbeatTimer; Timer? _reconnectTimer; bool _isConnecting = false; bool _shouldReconnect = true; bool _isReady = false; int _reconnectAttempts = 0; static const int _maxReconnectAttempts = 5; static const Duration _reconnectDelay = Duration(seconds: 5); final List? _radioIds; int _receivedCount = 0; static const int _maxItems = 200; final Completer _readyCompleter = Completer(); LastHeardWebSocketClient({List? radioIds}) : _radioIds = radioIds; // Socket.IO Engine.IO v4 WebSocket URL for Last Heard static const String _wsUrl = 'wss://api.brandmeister.network/lh/?EIO=4&transport=websocket'; Stream> get messageStream => _messageController?.stream ?? const Stream.empty(); bool get isConnected => _channel != null; bool get isReady => _isReady; Future get ready => _readyCompleter.future; Future connect() async { if (_isConnecting || isConnected) { debugPrint('LastHeard WS: Already connected or connecting'); return; } _isConnecting = true; // Don't reconnect if radio IDs are supplied (one-time search query) _shouldReconnect = _radioIds == null || _radioIds.isEmpty; try { debugPrint('LastHeard WS: Connecting to $_wsUrl'); _messageController ??= StreamController>.broadcast(); _channel = WebSocketChannel.connect(Uri.parse(_wsUrl)); _channel!.stream.listen( _handleMessage, onError: _handleError, onDone: _handleDisconnect, cancelOnError: false, ); _reconnectAttempts = 0; _isConnecting = false; debugPrint('LastHeard WS: WebSocket connected'); } catch (e) { _isConnecting = false; debugPrint('LastHeard WS: Connection error: $e'); _scheduleReconnect(); } } void _handleMessage(dynamic message) { try { final messageStr = message.toString(); debugPrint('LastHeard WS: Received message: $messageStr'); if (messageStr.startsWith('0')) { _handleOpenPacket(messageStr); } else if (messageStr.startsWith('2')) { _sendPong(); } else if (messageStr.startsWith('40')) { _handleNamespaceConnect(); } else if (messageStr.startsWith('42')) { _handleDataPacket(messageStr); } else if (messageStr.startsWith('4')) { debugPrint('LastHeard WS: Received other type 4 message: $messageStr'); } } catch (e) { debugPrint('LastHeard WS: Error handling message: $e'); } } void _handleOpenPacket(String message) { try { final jsonStr = message.substring(1); final data = jsonDecode(jsonStr) as Map; final pingInterval = data['pingInterval'] as int? ?? 25000; _heartbeatTimer?.cancel(); _heartbeatTimer = Timer.periodic( Duration(milliseconds: pingInterval), (_) => _sendPing(), ); debugPrint('LastHeard WS: Open packet received, ping interval: $pingInterval ms'); _sendNamespaceConnect(); } catch (e) { debugPrint('LastHeard WS: Error parsing open packet: $e'); } } void _sendNamespaceConnect() { if (isConnected) { try { _channel!.sink.add('40'); debugPrint('LastHeard WS: Sent namespace connect (40)'); } catch (e) { debugPrint('LastHeard WS: Error sending namespace connect: $e'); } } } void _handleNamespaceConnect() { debugPrint('LastHeard WS: Namespace connected (received 40)'); _isReady = true; if (!_readyCompleter.isCompleted) { _readyCompleter.complete(); } // Send search query after namespace is connected _sendSearchQuery(); } void _sendSearchQuery() { if (!isConnected || !isReady) { debugPrint('LastHeard WS: Cannot send search query, not ready'); return; } if (_radioIds == null || _radioIds.isEmpty) { debugPrint('LastHeard WS: No radio IDs provided, skipping search query'); return; } try { // Build SQL query for multiple radio IDs: SourceID = 123 OR SourceID = 456 final sqlParts = _radioIds.map((id) => 'SourceID = $id').toList(); final sql = sqlParts.join(' OR '); final message = '42${jsonEncode([ 'searchMongo', { 'query': {'sql': sql}, 'amount': 200 } ])}'; _channel!.sink.add(message); debugPrint('LastHeard WS: Sent search query: $message'); } catch (e) { debugPrint('LastHeard WS: Error sending search query: $e'); } } void _handleDataPacket(String message) { try { String jsonStr = message.substring(1); if (jsonStr.startsWith('2')) { jsonStr = jsonStr.substring(1); } final data = jsonDecode(jsonStr); Map? eventData; if (data is Map) { eventData = data; } else if (data is List && data.isNotEmpty) { final eventName = data[0] as String; if (data.length > 1) { eventData = { 'event': eventName, 'data': data[1], }; } else { eventData = { 'event': eventName, }; } } if (eventData != null && _messageController != null && !_messageController!.isClosed) { _messageController!.add(eventData); // Track received items and disconnect after receiving max items (for search queries) if (_radioIds != null && _radioIds.isNotEmpty) { _receivedCount++; if (_receivedCount >= _maxItems) { debugPrint('LastHeard WS: Received $_maxItems items, disconnecting'); disconnect(); } } } } catch (e) { debugPrint('LastHeard WS: Error parsing data packet: $e'); } } void _sendPing() { if (isConnected) { try { _channel!.sink.add('2'); debugPrint('LastHeard WS: Sent ping'); } catch (e) { debugPrint('LastHeard WS: Error sending ping: $e'); } } } void _sendPong() { if (isConnected) { try { _channel!.sink.add('3'); debugPrint('LastHeard WS: Sent pong'); } catch (e) { debugPrint('LastHeard WS: Error sending pong: $e'); } } } void _handleError(Object error) { debugPrint('LastHeard WS: Stream error: $error'); } void _handleDisconnect() { debugPrint('LastHeard WS: Disconnected'); _cleanup(); if (_shouldReconnect) { _scheduleReconnect(); } } void _scheduleReconnect() { if (_reconnectAttempts >= _maxReconnectAttempts) { debugPrint('LastHeard WS: Max reconnect attempts reached'); return; } _reconnectAttempts++; debugPrint('LastHeard WS: Scheduling reconnect attempt $_reconnectAttempts'); _reconnectTimer?.cancel(); _reconnectTimer = Timer(_reconnectDelay, () { if (_shouldReconnect) { connect(); } }); } void _cleanup() { _heartbeatTimer?.cancel(); _heartbeatTimer = null; _channel = null; _isConnecting = false; } void disconnect() { debugPrint('LastHeard WS: Disconnecting'); _shouldReconnect = false; _reconnectTimer?.cancel(); _heartbeatTimer?.cancel(); try { _channel?.sink.close(); } catch (e) { debugPrint('LastHeard WS: Error closing channel: $e'); } _cleanup(); } void dispose() { disconnect(); _messageController?.close(); _messageController = null; } }