Files
BrandManager/lib/services/lastheard_websocket_client.dart

269 lines
6.7 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class LastHeardWebSocketClient {
WebSocketChannel? _channel;
StreamController<Map<String, dynamic>>? _messageController;
Timer? _heartbeatTimer;
Timer? _reconnectTimer;
bool _isConnecting = false;
bool _shouldReconnect = true;
bool _isReady = false;
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
static const Duration _reconnectDelay = Duration(seconds: 5);
final Completer<void> _readyCompleter = Completer<void>();
// Socket.IO Engine.IO v4 WebSocket URL for Last Heard
static const String _wsUrl =
'wss://api.brandmeister.network/lh/?EIO=4&transport=websocket';
Stream<Map<String, dynamic>> get messageStream =>
_messageController?.stream ?? const Stream.empty();
bool get isConnected => _channel != null;
bool get isReady => _isReady;
Future<void> get ready => _readyCompleter.future;
Future<void> connect() async {
if (_isConnecting || isConnected) {
debugPrint('LastHeard WS: Already connected or connecting');
return;
}
_isConnecting = true;
_shouldReconnect = true;
try {
debugPrint('LastHeard WS: Connecting to $_wsUrl');
_messageController ??= StreamController<Map<String, dynamic>>.broadcast();
_channel = WebSocketChannel.connect(Uri.parse(_wsUrl));
_channel!.stream.listen(
_handleMessage,
onError: _handleError,
onDone: _handleDisconnect,
cancelOnError: false,
);
_reconnectAttempts = 0;
_isConnecting = false;
debugPrint('LastHeard WS: WebSocket connected');
} catch (e) {
_isConnecting = false;
debugPrint('LastHeard WS: Connection error: $e');
_scheduleReconnect();
}
}
void _handleMessage(dynamic message) {
try {
final messageStr = message.toString();
debugPrint('LastHeard WS: Received message: $messageStr');
if (messageStr.startsWith('0')) {
_handleOpenPacket(messageStr);
} else if (messageStr.startsWith('2')) {
_sendPong();
} else if (messageStr == '40') {
_handleNamespaceConnect();
} else if (messageStr.startsWith('42')) {
_handleDataPacket(messageStr);
} else if (messageStr.startsWith('4')) {
debugPrint('LastHeard WS: Received other type 4 message: $messageStr');
}
} catch (e) {
debugPrint('LastHeard WS: Error handling message: $e');
}
}
void _handleOpenPacket(String message) {
try {
final jsonStr = message.substring(1);
final data = jsonDecode(jsonStr) as Map<String, dynamic>;
final pingInterval = data['pingInterval'] as int? ?? 25000;
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer.periodic(
Duration(milliseconds: pingInterval),
(_) => _sendPing(),
);
debugPrint('LastHeard WS: Open packet received, ping interval: $pingInterval ms');
_sendNamespaceConnect();
} catch (e) {
debugPrint('LastHeard WS: Error parsing open packet: $e');
}
}
void _sendNamespaceConnect() {
if (isConnected) {
try {
_channel!.sink.add('40');
debugPrint('LastHeard WS: Sent namespace connect (40)');
} catch (e) {
debugPrint('LastHeard WS: Error sending namespace connect: $e');
}
}
}
void _handleNamespaceConnect() {
debugPrint('LastHeard WS: Namespace connected (received 40)');
_isReady = true;
if (!_readyCompleter.isCompleted) {
_readyCompleter.complete();
}
// Send search query after namespace is connected
_sendSearchQuery();
}
void _sendSearchQuery() {
if (!isConnected || !isReady) {
debugPrint('LastHeard WS: Cannot send search query, not ready');
return;
}
try {
final message = '42${jsonEncode([
'searchMongo',
{
'query': {'sql': ''},
'amount': 200
}
])}';
_channel!.sink.add(message);
debugPrint('LastHeard WS: Sent search query: $message');
} catch (e) {
debugPrint('LastHeard WS: Error sending search query: $e');
}
}
void _handleDataPacket(String message) {
try {
String jsonStr = message.substring(1);
if (jsonStr.startsWith('2')) {
jsonStr = jsonStr.substring(1);
}
final data = jsonDecode(jsonStr);
Map<String, dynamic>? eventData;
if (data is Map<String, dynamic>) {
eventData = data;
} else if (data is List && data.isNotEmpty) {
final eventName = data[0] as String;
if (data.length > 1) {
eventData = {
'event': eventName,
'data': data[1],
};
} else {
eventData = {
'event': eventName,
};
}
}
if (eventData != null &&
_messageController != null &&
!_messageController!.isClosed) {
_messageController!.add(eventData);
}
} catch (e) {
debugPrint('LastHeard WS: Error parsing data packet: $e');
}
}
void _sendPing() {
if (isConnected) {
try {
_channel!.sink.add('2');
debugPrint('LastHeard WS: Sent ping');
} catch (e) {
debugPrint('LastHeard WS: Error sending ping: $e');
}
}
}
void _sendPong() {
if (isConnected) {
try {
_channel!.sink.add('3');
debugPrint('LastHeard WS: Sent pong');
} catch (e) {
debugPrint('LastHeard WS: Error sending pong: $e');
}
}
}
void _handleError(Object error) {
debugPrint('LastHeard WS: Stream error: $error');
}
void _handleDisconnect() {
debugPrint('LastHeard WS: Disconnected');
_cleanup();
if (_shouldReconnect) {
_scheduleReconnect();
}
}
void _scheduleReconnect() {
if (_reconnectAttempts >= _maxReconnectAttempts) {
debugPrint('LastHeard WS: Max reconnect attempts reached');
return;
}
_reconnectAttempts++;
debugPrint('LastHeard WS: Scheduling reconnect attempt $_reconnectAttempts');
_reconnectTimer?.cancel();
_reconnectTimer = Timer(_reconnectDelay, () {
if (_shouldReconnect) {
connect();
}
});
}
void _cleanup() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
_channel = null;
_isConnecting = false;
}
void disconnect() {
debugPrint('LastHeard WS: Disconnecting');
_shouldReconnect = false;
_reconnectTimer?.cancel();
_heartbeatTimer?.cancel();
try {
_channel?.sink.close();
} catch (e) {
debugPrint('LastHeard WS: Error closing channel: $e');
}
_cleanup();
}
void dispose() {
disconnect();
_messageController?.close();
_messageController = null;
}
}