Files
COPILOT/copilot_keyboard/lib/data/services/bridge_service.dart
klas 40143734fc Initial commit: COPILOT D6 Flutter keyboard controller
Flutter web app replacing legacy WPF CCTV surveillance keyboard controller.
Includes wall overview, section view with monitor grid, camera input,
PTZ control, alarm/lock/sequence BLoCs, and legacy-matching UI styling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:57:38 +01:00

488 lines
15 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:dio/dio.dart';
import 'package:logger/logger.dart';
import 'package:rxdart/rxdart.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../../domain/entities/server_config.dart';
import '../models/bridge_event.dart';
/// Service for communicating with all bridges.
///
/// Every enabled [ServerConfig] gets its own [Dio] HTTP client for commands
/// and queries and, when a WebSocket URL is configured, a [WebSocketChannel]
/// whose messages are forwarded as [BridgeEvent]s on [eventStream].
///
/// A lost WebSocket is retried with exponential backoff, and a periodic
/// health poll (started by [connectAll]) probes every bridge that is
/// currently marked disconnected. A bridge that was disconnected explicitly
/// through [disconnect] is left alone until [connect] is called for it again.
class BridgeService {
  final Logger _logger = Logger();

  // Per-server state, all keyed by server id.
  final Map<String, Dio> _httpClients = {};
  final Map<String, WebSocketChannel> _wsChannels = {};
  final Map<String, StreamSubscription<dynamic>> _wsSubscriptions = {};
  final Map<String, ServerConfig> _servers = {};
  final Map<String, int> _reconnectAttempts = {};
  final Map<String, Timer> _reconnectTimers = {};

  /// Servers the caller disconnected on purpose; automatic recovery skips
  /// them until [connect] is called again.
  final Set<String> _manuallyDisconnected = {};

  /// Servers with a recovery attempt currently awaiting the bridge, so the
  /// backoff timer and the health poll never reconnect one server twice.
  final Set<String> _reconnectsInFlight = {};

  Timer? _healthCheckTimer;
  bool _healthCheckRunning = false;
  bool _disposed = false;

  // Reconnection config
  static const _initialReconnectDelay = Duration(seconds: 1);
  static const _maxReconnectDelay = Duration(seconds: 30);
  static const _maxReconnectExponent = 5;
  static const _healthCheckInterval = Duration(seconds: 10);
  static const _commandRetryCount = 3;
  static const _commandRetryDelay = Duration(milliseconds: 200);

  // Event streams
  final _eventController = BehaviorSubject<BridgeEvent>();
  final _connectionStatusController =
      BehaviorSubject<Map<String, bool>>.seeded({});

  /// Callback for state resync after a bridge has been reconnected.
  void Function(String serverId)? onReconnected;

  /// Stream of all events from all bridges.
  Stream<BridgeEvent> get eventStream => _eventController.stream;

  /// Stream of connection status per server id.
  Stream<Map<String, bool>> get connectionStatus =>
      _connectionStatusController.stream;

  /// The most recently published connection status per server id.
  Map<String, bool> get currentConnectionStatus =>
      _connectionStatusController.value;

  /// Registers every enabled entry of [servers] and creates its HTTP client.
  ///
  /// Disabled servers are skipped. Each registered server starts out marked
  /// as disconnected; no network traffic happens here.
  Future<void> initialize(List<ServerConfig> servers) async {
    _logger.i('Initializing BridgeService with ${servers.length} servers');
    for (final server in servers) {
      if (!server.enabled) {
        _logger.d('Skipping disabled server: ${server.id}');
        continue;
      }
      _servers[server.id] = server;

      // Release a client left over from an earlier initialize() call.
      _httpClients[server.id]?.close();
      _httpClients[server.id] = Dio(BaseOptions(
        baseUrl: server.bridgeUrl,
        connectTimeout: const Duration(seconds: 5),
        receiveTimeout: const Duration(seconds: 10),
      ));
      _updateConnectionStatus(server.id, false);
    }
  }

  /// Connects to the bridge of [serverId].
  ///
  /// Probes `/health`; on HTTP 200 the server is marked connected and, when a
  /// WebSocket URL is configured, the event socket is (re)opened. Returns
  /// whether the health probe succeeded. Calling this also lifts an earlier
  /// explicit [disconnect] for the server.
  Future<bool> connect(String serverId) async {
    final server = _servers[serverId];
    final client = _httpClients[serverId];
    if (server == null || client == null) {
      _logger.w('Unknown server: $serverId');
      return false;
    }

    // An explicit connect supersedes both a manual disconnect and any
    // reconnect that is still waiting on its backoff timer.
    _manuallyDisconnected.remove(serverId);
    _reconnectTimers.remove(serverId)?.cancel();

    try {
      final response = await client.get('/health');
      if (_disposed) return false;
      if (response.statusCode == 200) {
        _updateConnectionStatus(serverId, true);
        _logger.i('Connected to bridge: $serverId');
        final wsUrl = server.websocketUrl;
        if (wsUrl != null) {
          await _connectWebSocket(serverId, wsUrl);
        }
        return true;
      }
    } catch (e) {
      _logger.e('Failed to connect to $serverId: $e');
    }
    _updateConnectionStatus(serverId, false);
    return false;
  }

  /// Connects to all configured servers and starts health monitoring.
  ///
  /// The servers are probed concurrently so that one unreachable bridge does
  /// not delay the others by its connect timeout.
  Future<void> connectAll() async {
    // Snapshot the ids: the map may be cleared by dispose() while awaiting.
    await Future.wait(_servers.keys.toList().map(connect));
    if (_disposed) return;
    _startHealthChecks();
  }

  /// Disconnects from [serverId] and keeps it disconnected.
  ///
  /// Cancels a pending reconnect, closes the event socket and marks the
  /// server as disconnected. Automatic recovery ignores the server until
  /// [connect] is called for it again.
  Future<void> disconnect(String serverId) async {
    _manuallyDisconnected.add(serverId);
    _reconnectTimers.remove(serverId)?.cancel();
    await _disconnectWebSocket(serverId);
    _updateConnectionStatus(serverId, false);
    _logger.i('Disconnected from bridge: $serverId');
  }

  /// Disconnects from all configured servers, one after another.
  Future<void> disconnectAll() async {
    for (final serverId in _servers.keys.toList()) {
      await disconnect(serverId);
    }
  }

  // ============================================================
  // Command Methods
  // ============================================================

  /// Switches camera [channel] onto monitor [viewer] (ViewerConnectLive).
  ///
  /// Critical command: attempted up to [_commandRetryCount] times with a
  /// growing delay between attempts. Returns whether the bridge answered
  /// with HTTP 200.
  Future<bool> viewerConnectLive(int viewer, int channel) async {
    final serverId = _findServerForMonitor(viewer);
    final client = serverId == null ? null : _httpClients[serverId];
    if (client == null) {
      _logger.w('No server found for monitor $viewer');
      return false;
    }
    return _retryCommand('viewerConnectLive', () async {
      final response = await client.post(
        '/viewer/connect-live',
        data: {'Viewer': viewer, 'Channel': channel},
      );
      return response.statusCode == 200;
    });
  }

  /// Clears monitor [viewer] (ViewerClear).
  ///
  /// Returns whether the bridge answered with HTTP 200.
  Future<bool> viewerClear(int viewer) async {
    final serverId = _findServerForMonitor(viewer);
    if (serverId == null) {
      _logger.w('No server found for monitor $viewer');
      return false;
    }
    return _postCommand(
      'viewerClear',
      serverId,
      '/viewer/clear',
      {'Viewer': viewer},
    );
  }

  /// Sends a PTZ pan command for [camera].
  Future<bool> ptzPan(int camera, String direction, int speed) =>
      _sendCameraCommand(
        'ptzPan',
        camera,
        '/camera/pan',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Sends a PTZ tilt command for [camera].
  Future<bool> ptzTilt(int camera, String direction, int speed) =>
      _sendCameraCommand(
        'ptzTilt',
        camera,
        '/camera/tilt',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Sends a PTZ zoom command for [camera].
  Future<bool> ptzZoom(int camera, String direction, int speed) =>
      _sendCameraCommand(
        'ptzZoom',
        camera,
        '/camera/zoom',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Stops all PTZ movement of [camera].
  Future<bool> ptzStop(int camera) => _sendCameraCommand(
        'ptzStop',
        camera,
        '/camera/stop',
        {'Camera': camera},
      );

  /// Moves [camera] to the stored position [preset].
  Future<bool> ptzPreset(int camera, int preset) => _sendCameraCommand(
        'ptzPreset',
        camera,
        '/camera/preset',
        {'Camera': camera, 'Preset': preset},
      );

  // ============================================================
  // Query Methods
  // ============================================================

  /// Returns the `monitors` list reported by the bridge of [serverId].
  ///
  /// Returns an empty list when the request fails or the list is missing.
  /// Entries that are not JSON objects are dropped.
  Future<List<Map<String, dynamic>>> getMonitorStates(String serverId) async {
    final body = await _getJson('getMonitorStates', serverId, '/monitors');
    return _objectList(body?['monitors']);
  }

  /// Returns the `alarms` list reported by the bridge of [serverId].
  ///
  /// Returns an empty list when the request fails or the list is missing.
  /// Entries that are not JSON objects are dropped.
  Future<List<Map<String, dynamic>>> getActiveAlarms(String serverId) async {
    final body = await _getJson('getActiveAlarms', serverId, '/alarms/active');
    return _objectList(body?['alarms']);
  }

  /// Returns the `/status` document of [serverId], or `null` on failure.
  Future<Map<String, dynamic>?> getBridgeStatus(String serverId) =>
      _getJson('getBridgeStatus', serverId, '/status');

  // ============================================================
  // Private Methods
  // ============================================================

  /// Returns the id of the first server that owns [cameraId], if any.
  String? _findServerForCamera(int cameraId) {
    for (final MapEntry(key: id, value: server) in _servers.entries) {
      if (server.ownsCamera(cameraId)) return id;
    }
    return null;
  }

  /// Returns the id of the first server that owns [monitorId], if any.
  String? _findServerForMonitor(int monitorId) {
    for (final MapEntry(key: id, value: server) in _servers.entries) {
      if (server.ownsMonitor(monitorId)) return id;
    }
    return null;
  }

  /// Publishes a new status map with [serverId] set to [connected].
  ///
  /// Does nothing after [dispose]: adding to a closed subject throws, and
  /// HTTP calls that were in flight during disposal still end up here.
  void _updateConnectionStatus(String serverId, bool connected) {
    if (_disposed || _connectionStatusController.isClosed) return;
    _connectionStatusController.add({
      ..._connectionStatusController.value,
      serverId: connected,
    });
  }

  /// Resolves the server owning [camera] and posts [data] to [path] on it.
  Future<bool> _sendCameraCommand(
    String name,
    int camera,
    String path,
    Map<String, Object?> data,
  ) async {
    final serverId = _findServerForCamera(camera);
    if (serverId == null) {
      _logger.w('No server found for camera $camera');
      return false;
    }
    return _postCommand(name, serverId, path, data);
  }

  /// Posts [data] to [path] on the bridge of [serverId], without retrying.
  ///
  /// Returns whether the bridge answered with HTTP 200; failures are logged
  /// under [name] and reported as `false`.
  Future<bool> _postCommand(
    String name,
    String serverId,
    String path,
    Map<String, Object?> data,
  ) async {
    final client = _httpClients[serverId];
    if (client == null) {
      _logger.w('$name: no HTTP client for server $serverId');
      return false;
    }
    try {
      final response = await client.post(path, data: data);
      return response.statusCode == 200;
    } catch (e) {
      _logger.e('$name failed: $e');
      return false;
    }
  }

  /// Fetches [path] from the bridge of [serverId] as a JSON object.
  ///
  /// Returns `null` when the server is unknown, the request fails, the
  /// status is not 200, or the body is not a JSON object.
  Future<Map<String, dynamic>?> _getJson(
    String name,
    String serverId,
    String path,
  ) async {
    final client = _httpClients[serverId];
    if (client == null) {
      _logger.w('$name: unknown server $serverId');
      return null;
    }
    try {
      final response = await client.get(path);
      if (response.statusCode != 200) return null;
      final body = response.data;
      if (body is Map<String, dynamic>) return body;
      _logger.e('$name failed for $serverId: unexpected response body');
    } catch (e) {
      _logger.e('$name failed for $serverId: $e');
    }
    return null;
  }

  /// Converts a decoded JSON value into a list of JSON objects.
  ///
  /// The conversion is eager so that a malformed entry cannot surface later
  /// as a cast error in the caller's code.
  List<Map<String, dynamic>> _objectList(Object? raw) => raw is List
      ? raw.whereType<Map<String, dynamic>>().toList()
      : <Map<String, dynamic>>[];

  /// Opens the event WebSocket of [serverId], replacing any existing one.
  Future<void> _connectWebSocket(String serverId, String url) async {
    try {
      await _disconnectWebSocket(serverId);

      // State may have changed while the old socket was closing: the service
      // was disposed, the caller disconnected the server, or a concurrent
      // attempt already installed a fresh channel.
      if (_disposed ||
          _manuallyDisconnected.contains(serverId) ||
          _wsChannels.containsKey(serverId)) {
        return;
      }

      _logger.d('Connecting WebSocket to $url');
      final channel = WebSocketChannel.connect(Uri.parse(url));
      _wsChannels[serverId] = channel;

      var sawMessage = false;
      var lossHandled = false;

      // An error is normally followed by a done event; react only once.
      void handleLoss() {
        if (lossHandled) return;
        lossHandled = true;
        _updateConnectionStatus(serverId, false);
        _scheduleReconnect(serverId);
      }

      _wsSubscriptions[serverId] = channel.stream.listen(
        (message) {
          if (!sawMessage) {
            // Only a received message proves the socket really connected,
            // so the backoff counter is reset here rather than right after
            // WebSocketChannel.connect().
            sawMessage = true;
            _reconnectAttempts[serverId] = 0;
          }
          try {
            final json = jsonDecode(message as String) as Map<String, dynamic>;
            if (json['type'] == 'connected') {
              _logger.d('WebSocket connected to $serverId');
              return;
            }
            final event = BridgeEvent.fromJson(json, serverId);
            if (!_eventController.isClosed) _eventController.add(event);
          } catch (e) {
            _logger.e('Failed to parse WebSocket message: $e');
          }
        },
        onError: (Object error) {
          _logger.e('WebSocket error for $serverId: $error');
          handleLoss();
        },
        onDone: () {
          _logger.w('WebSocket closed for $serverId');
          handleLoss();
        },
      );
    } catch (e) {
      _logger.e('Failed to connect WebSocket to $serverId: $e');
      _scheduleReconnect(serverId);
    }
  }

  /// Cancels the subscription and closes the socket of [serverId], if any.
  Future<void> _disconnectWebSocket(String serverId) async {
    // Detach both entries before awaiting so that a channel installed by a
    // concurrent connect is never removed by this call.
    final subscription = _wsSubscriptions.remove(serverId);
    final channel = _wsChannels.remove(serverId);
    await subscription?.cancel();
    await channel?.sink.close();
  }

  /// Runs a critical [command] up to [_commandRetryCount] times.
  ///
  /// Both a thrown error and a `false` result count as a failed attempt. The
  /// pause before the next attempt doubles each time, starting at
  /// [_commandRetryDelay]. Returns `true` as soon as one attempt succeeds.
  Future<bool> _retryCommand(
    String name,
    Future<bool> Function() command,
  ) async {
    for (var attempt = 1; attempt <= _commandRetryCount; attempt++) {
      if (_disposed) return false;

      Object failure = 'command was not accepted';
      try {
        if (await command()) return true;
      } catch (e) {
        failure = e;
      }

      if (attempt == _commandRetryCount) {
        _logger.e('$name failed after $attempt attempts: $failure');
        return false;
      }
      _logger.w('$name attempt $attempt failed, retrying: $failure');
      await Future<void>.delayed(_commandRetryDelay * (1 << (attempt - 1)));
    }
    return false;
  }

  /// Schedules a reconnect of [serverId] with exponential backoff.
  ///
  /// The delay doubles per recorded attempt (1s, 2s, 4s, 8s, 16s) and is
  /// capped at [_maxReconnectDelay]. A timer already pending for the server
  /// is replaced.
  void _scheduleReconnect(String serverId) {
    if (_disposed || _manuallyDisconnected.contains(serverId)) return;
    _reconnectTimers.remove(serverId)?.cancel();

    final attempts = _reconnectAttempts[serverId] ?? 0;
    final exponent =
        attempts > _maxReconnectExponent ? _maxReconnectExponent : attempts;
    final uncappedMs = _initialReconnectDelay.inMilliseconds << exponent;
    final maxMs = _maxReconnectDelay.inMilliseconds;
    final delay = Duration(
      milliseconds: uncappedMs > maxMs ? maxMs : uncappedMs,
    );

    _logger.i('Scheduling reconnect for $serverId in ${delay.inSeconds}s (attempt ${attempts + 1})');
    _reconnectTimers[serverId] = Timer(
      delay,
      () => unawaited(_runScheduledReconnect(serverId, attempts)),
    );
  }

  /// Body of a fired reconnect timer for [serverId].
  ///
  /// [attempts] is the counter value that was used to compute the delay.
  Future<void> _runScheduledReconnect(String serverId, int attempts) async {
    _reconnectTimers.remove(serverId);
    if (_disposed || !_servers.containsKey(serverId)) return;
    // The health poll is already recovering this server and will keep
    // polling on failure, so a second chain of timers is not needed.
    if (_reconnectsInFlight.contains(serverId)) return;

    _reconnectAttempts[serverId] = attempts + 1;
    if (await _attemptReconnect(serverId)) {
      _logger.i('Reconnected to $serverId');
    } else {
      // Covers thrown errors as well as a health answer other than 200.
      _scheduleReconnect(serverId);
    }
  }

  /// Probes `/health` of [serverId] and re-establishes the connection.
  ///
  /// On HTTP 200 the server is marked connected, a pending backoff timer is
  /// cancelled, the WebSocket is reopened when configured, and
  /// [onReconnected] is invoked. Returns `false` when the bridge is still
  /// down, the server must not be reconnected, or another attempt for the
  /// same server is already running.
  Future<bool> _attemptReconnect(String serverId) async {
    final server = _servers[serverId];
    final client = _httpClients[serverId];
    if (_disposed || server == null || client == null) return false;
    if (_manuallyDisconnected.contains(serverId)) return false;
    if (!_reconnectsInFlight.add(serverId)) return false;

    try {
      final response = await client.get('/health');
      if (_disposed || _manuallyDisconnected.contains(serverId)) return false;
      if (response.statusCode != 200) return false;

      _updateConnectionStatus(serverId, true);
      // Cancel before opening the socket so that a reconnect scheduled by a
      // failing socket below is kept.
      _reconnectTimers.remove(serverId)?.cancel();

      final wsUrl = server.websocketUrl;
      if (wsUrl == null) {
        // No socket will ever deliver a message to reset the counter.
        _reconnectAttempts[serverId] = 0;
      } else {
        await _connectWebSocket(serverId, wsUrl);
      }
    } catch (e) {
      _logger.d('Reconnect health check failed for $serverId: $e');
      return false;
    } finally {
      _reconnectsInFlight.remove(serverId);
    }

    if (_disposed || _manuallyDisconnected.contains(serverId)) return false;
    _notifyReconnected(serverId);
    return true;
  }

  /// Invokes [onReconnected], shielding recovery from a throwing callback.
  void _notifyReconnected(String serverId) {
    try {
      onReconnected?.call(serverId);
    } catch (e) {
      _logger.e('onReconnected callback failed for $serverId: $e');
    }
  }

  /// Starts the periodic health poll, replacing a poll already running.
  void _startHealthChecks() {
    _healthCheckTimer?.cancel();
    _healthCheckTimer = Timer.periodic(
      _healthCheckInterval,
      (_) => unawaited(_runHealthCheck()),
    );
  }

  /// Probes every server that is currently marked disconnected.
  ///
  /// Servers marked connected are not probed. A tick is skipped while the
  /// previous one is still waiting on slow bridges.
  Future<void> _runHealthCheck() async {
    if (_disposed || _healthCheckRunning) return;
    _healthCheckRunning = true;
    try {
      // Snapshot the ids: the map may be cleared by dispose() while awaiting.
      for (final serverId in _servers.keys.toList()) {
        if (_disposed) return;
        final isConnected = currentConnectionStatus[serverId] ?? false;
        if (isConnected || _manuallyDisconnected.contains(serverId)) continue;
        if (await _attemptReconnect(serverId)) {
          _logger.i('Bridge $serverId is back online');
        }
      }
    } finally {
      _healthCheckRunning = false;
    }
  }

  /// Releases all timers, sockets, HTTP clients and streams.
  ///
  /// Safe to call more than once. Requests still in flight are aborted and
  /// their completions are ignored.
  void dispose() {
    if (_disposed) return;
    _disposed = true;

    _healthCheckTimer?.cancel();
    _healthCheckTimer = null;
    for (final timer in _reconnectTimers.values) {
      timer.cancel();
    }
    _reconnectTimers.clear();

    // Stop listening before closing the sockets so that no loss handler runs.
    for (final subscription in _wsSubscriptions.values) {
      unawaited(subscription.cancel());
    }
    _wsSubscriptions.clear();
    for (final channel in _wsChannels.values) {
      unawaited(channel.sink.close());
    }
    _wsChannels.clear();

    for (final client in _httpClients.values) {
      client.close(force: true);
    }
    _httpClients.clear();

    _servers.clear();
    _reconnectAttempts.clear();
    _reconnectsInFlight.clear();
    _manuallyDisconnected.clear();

    unawaited(_eventController.close());
    unawaited(_connectionStatusController.close());
  }
}