import 'dart:async'; import 'dart:convert'; import 'package:dio/dio.dart'; import 'package:logger/logger.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import '../../domain/entities/server_config.dart'; import '../models/bridge_event.dart'; /// Service for communicating with all bridges. /// Includes auto-reconnection with exponential backoff and health polling. class BridgeService { final Logger _logger = Logger(); final Map _httpClients = {}; final Map _wsChannels = {}; final Map _wsSubscriptions = {}; final Map _servers = {}; final Map _reconnectAttempts = {}; final Map _reconnectTimers = {}; Timer? _healthCheckTimer; bool _disposed = false; // Reconnection config static const _initialReconnectDelay = Duration(seconds: 1); static const _maxReconnectDelay = Duration(seconds: 30); static const _healthCheckInterval = Duration(seconds: 10); static const _commandRetryCount = 3; static const _commandRetryDelay = Duration(milliseconds: 200); // Event streams final _eventController = BehaviorSubject(); final _connectionStatusController = BehaviorSubject>.seeded({}); // Callback for state resync after reconnection void Function(String serverId)? onReconnected; /// Stream of all events from all bridges Stream get eventStream => _eventController.stream; /// Stream of connection status per server Stream> get connectionStatus => _connectionStatusController.stream; /// Get current connection status Map get currentConnectionStatus => _connectionStatusController.value; /// Initialize the service with server configurations Future initialize(List servers) async { _logger.i('Initializing BridgeService with ${servers.length} servers'); for (final server in servers) { if (!server.enabled) { _logger.d('Skipping disabled server: ${server.id}'); continue; } _servers[server.id] = server; // Create HTTP client _httpClients[server.id] = Dio(BaseOptions( baseUrl: server.bridgeUrl, connectTimeout: const Duration(seconds: 5), receiveTimeout: const Duration(seconds: 10), )); _updateConnectionStatus(server.id, false); } } /// Connect to a specific server's bridge Future connect(String serverId) async { final server = _servers[serverId]; if (server == null) { _logger.w('Unknown server: $serverId'); return false; } try { // Check bridge health final response = await _httpClients[serverId]!.get('/health'); if (response.statusCode == 200) { _updateConnectionStatus(serverId, true); _logger.i('Connected to bridge: $serverId'); // Connect WebSocket for events if available if (server.websocketUrl != null) { await _connectWebSocket(serverId, server.websocketUrl!); } return true; } } catch (e) { _logger.e('Failed to connect to $serverId: $e'); } _updateConnectionStatus(serverId, false); return false; } /// Connect to all configured servers and start health monitoring Future connectAll() async { for (final serverId in _servers.keys) { await connect(serverId); } _startHealthChecks(); } /// Disconnect from a specific server Future disconnect(String serverId) async { await _disconnectWebSocket(serverId); _updateConnectionStatus(serverId, false); _logger.i('Disconnected from bridge: $serverId'); } /// Disconnect from all servers Future disconnectAll() async { for (final serverId in _servers.keys) { await disconnect(serverId); } } // ============================================================ // Command Methods // ============================================================ /// Switch camera to monitor (ViewerConnectLive). /// Critical command — retries up to 3 times with backoff. Future viewerConnectLive(int viewer, int channel) async { final serverId = _findServerForMonitor(viewer); if (serverId == null) { _logger.w('No server found for monitor $viewer'); return false; } return _retryCommand('viewerConnectLive', () async { final response = await _httpClients[serverId]!.post( '/viewer/connect-live', data: {'Viewer': viewer, 'Channel': channel}, ); return response.statusCode == 200; }); } /// Clear monitor (ViewerClear) Future viewerClear(int viewer) async { final serverId = _findServerForMonitor(viewer); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/viewer/clear', data: {'Viewer': viewer}, ); return response.statusCode == 200; } catch (e) { _logger.e('viewerClear failed: $e'); return false; } } /// PTZ Pan control Future ptzPan(int camera, String direction, int speed) async { final serverId = _findServerForCamera(camera); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/camera/pan', data: {'Camera': camera, 'Direction': direction, 'Speed': speed}, ); return response.statusCode == 200; } catch (e) { _logger.e('ptzPan failed: $e'); return false; } } /// PTZ Tilt control Future ptzTilt(int camera, String direction, int speed) async { final serverId = _findServerForCamera(camera); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/camera/tilt', data: {'Camera': camera, 'Direction': direction, 'Speed': speed}, ); return response.statusCode == 200; } catch (e) { _logger.e('ptzTilt failed: $e'); return false; } } /// PTZ Zoom control Future ptzZoom(int camera, String direction, int speed) async { final serverId = _findServerForCamera(camera); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/camera/zoom', data: {'Camera': camera, 'Direction': direction, 'Speed': speed}, ); return response.statusCode == 200; } catch (e) { _logger.e('ptzZoom failed: $e'); return false; } } /// PTZ Stop all movement Future ptzStop(int camera) async { final serverId = _findServerForCamera(camera); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/camera/stop', data: {'Camera': camera}, ); return response.statusCode == 200; } catch (e) { _logger.e('ptzStop failed: $e'); return false; } } /// PTZ Go to preset Future ptzPreset(int camera, int preset) async { final serverId = _findServerForCamera(camera); if (serverId == null) return false; try { final response = await _httpClients[serverId]!.post( '/camera/preset', data: {'Camera': camera, 'Preset': preset}, ); return response.statusCode == 200; } catch (e) { _logger.e('ptzPreset failed: $e'); return false; } } // ============================================================ // Query Methods // ============================================================ /// Get current monitor states from a bridge Future>> getMonitorStates(String serverId) async { try { final response = await _httpClients[serverId]!.get('/monitors'); if (response.statusCode == 200) { final data = response.data as Map; final monitors = data['monitors'] as List? ?? []; return monitors.cast>(); } } catch (e) { _logger.e('getMonitorStates failed for $serverId: $e'); } return []; } /// Get active alarms from a bridge Future>> getActiveAlarms(String serverId) async { try { final response = await _httpClients[serverId]!.get('/alarms/active'); if (response.statusCode == 200) { final data = response.data as Map; final alarms = data['alarms'] as List? ?? []; return alarms.cast>(); } } catch (e) { _logger.e('getActiveAlarms failed for $serverId: $e'); } return []; } /// Get bridge status Future?> getBridgeStatus(String serverId) async { try { final response = await _httpClients[serverId]!.get('/status'); if (response.statusCode == 200) { return response.data as Map; } } catch (e) { _logger.e('getBridgeStatus failed for $serverId: $e'); } return null; } // ============================================================ // Private Methods // ============================================================ String? _findServerForCamera(int cameraId) { for (final entry in _servers.entries) { if (entry.value.ownsCamera(cameraId)) { return entry.key; } } return null; } String? _findServerForMonitor(int monitorId) { for (final entry in _servers.entries) { if (entry.value.ownsMonitor(monitorId)) { return entry.key; } } return null; } void _updateConnectionStatus(String serverId, bool connected) { final current = Map.from(_connectionStatusController.value); current[serverId] = connected; _connectionStatusController.add(current); } Future _connectWebSocket(String serverId, String url) async { try { await _disconnectWebSocket(serverId); _logger.d('Connecting WebSocket to $url'); final channel = WebSocketChannel.connect(Uri.parse(url)); _wsChannels[serverId] = channel; _reconnectAttempts[serverId] = 0; // Reset on successful connection _wsSubscriptions[serverId] = channel.stream.listen( (message) { try { final json = jsonDecode(message as String) as Map; if (json['type'] == 'connected') { _logger.d('WebSocket connected to $serverId'); return; } final event = BridgeEvent.fromJson(json, serverId); _eventController.add(event); } catch (e) { _logger.e('Failed to parse WebSocket message: $e'); } }, onError: (error) { _logger.e('WebSocket error for $serverId: $error'); _updateConnectionStatus(serverId, false); _scheduleReconnect(serverId); }, onDone: () { _logger.w('WebSocket closed for $serverId'); _updateConnectionStatus(serverId, false); _scheduleReconnect(serverId); }, ); } catch (e) { _logger.e('Failed to connect WebSocket to $serverId: $e'); _scheduleReconnect(serverId); } } Future _disconnectWebSocket(String serverId) async { await _wsSubscriptions[serverId]?.cancel(); _wsSubscriptions.remove(serverId); await _wsChannels[serverId]?.sink.close(); _wsChannels.remove(serverId); } /// Retry a critical command with exponential backoff. /// Used for CrossSwitch and other critical operations. Future _retryCommand(String name, Future Function() command) async { for (int attempt = 1; attempt <= _commandRetryCount; attempt++) { try { final result = await command(); if (result) return true; } catch (e) { if (attempt == _commandRetryCount) { _logger.e('$name failed after $attempt attempts: $e'); return false; } _logger.w('$name attempt $attempt failed, retrying: $e'); await Future.delayed(_commandRetryDelay * attempt); } } return false; } /// Schedule WebSocket reconnection with exponential backoff. void _scheduleReconnect(String serverId) { if (_disposed) return; _reconnectTimers[serverId]?.cancel(); final attempts = _reconnectAttempts[serverId] ?? 0; final delay = Duration( milliseconds: (_initialReconnectDelay.inMilliseconds * (1 << attempts.clamp(0, 5))) // 1s, 2s, 4s, 8s, 16s, 32s .clamp(0, _maxReconnectDelay.inMilliseconds), ); _logger.i('Scheduling reconnect for $serverId in ${delay.inSeconds}s (attempt ${attempts + 1})'); _reconnectTimers[serverId] = Timer(delay, () async { if (_disposed) return; _reconnectAttempts[serverId] = attempts + 1; final server = _servers[serverId]; if (server == null) return; // Check if bridge is healthy before reconnecting WebSocket try { final response = await _httpClients[serverId]!.get('/health'); if (response.statusCode == 200) { _updateConnectionStatus(serverId, true); if (server.websocketUrl != null) { await _connectWebSocket(serverId, server.websocketUrl!); } _logger.i('Reconnected to $serverId'); onReconnected?.call(serverId); } } catch (e) { _logger.d('Reconnect health check failed for $serverId: $e'); _scheduleReconnect(serverId); // Try again } }); } /// Start periodic health checks for all bridges. /// Detects when a bridge comes back online after failure. void _startHealthChecks() { _healthCheckTimer?.cancel(); _healthCheckTimer = Timer.periodic(_healthCheckInterval, (_) async { if (_disposed) return; for (final serverId in _servers.keys) { final isConnected = currentConnectionStatus[serverId] ?? false; if (!isConnected) { // Bridge is down — try to reconnect try { final response = await _httpClients[serverId]!.get('/health'); if (response.statusCode == 200) { _logger.i('Bridge $serverId is back online'); _updateConnectionStatus(serverId, true); _reconnectAttempts[serverId] = 0; final server = _servers[serverId]; if (server?.websocketUrl != null) { await _connectWebSocket(serverId, server!.websocketUrl!); } onReconnected?.call(serverId); } } catch (_) { // Still down, will check again next cycle } } } }); } /// Dispose of all resources void dispose() { _disposed = true; _healthCheckTimer?.cancel(); for (final timer in _reconnectTimers.values) { timer.cancel(); } _reconnectTimers.clear(); _eventController.close(); _connectionStatusController.close(); for (final sub in _wsSubscriptions.values) { sub.cancel(); } _wsSubscriptions.clear(); for (final channel in _wsChannels.values) { channel.sink.close(); } _wsChannels.clear(); _httpClients.clear(); _servers.clear(); } }