// Flutter web app replacing the legacy WPF CCTV surveillance keyboard
// controller. Includes wall overview, section view with monitor grid, camera
// input, PTZ control, alarm/lock/sequence BLoCs, and legacy-matching UI
// styling.
// Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
// Source listing: Dart, 488 lines, 15 KiB.
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:dio/dio.dart';
|
|
import 'package:logger/logger.dart';
|
|
import 'package:rxdart/rxdart.dart';
|
|
import 'package:web_socket_channel/web_socket_channel.dart';
|
|
|
|
import '../../domain/entities/server_config.dart';
|
|
import '../models/bridge_event.dart';
|
|
|
|
/// Service for communicating with all bridges.
///
/// Holds one HTTP client per enabled server plus an optional WebSocket event
/// channel. Lost connections are recovered by per-server reconnect timers
/// (exponential backoff) and by a periodic health sweep.
class BridgeService {
  final Logger _logger = Logger();
  final Map<String, Dio> _httpClients = {};
  final Map<String, WebSocketChannel> _wsChannels = {};
  final Map<String, StreamSubscription<dynamic>> _wsSubscriptions = {};
  final Map<String, ServerConfig> _servers = {};
  final Map<String, int> _reconnectAttempts = {};
  final Map<String, Timer> _reconnectTimers = {};

  /// Servers the caller disconnected on purpose.
  ///
  /// Automatic recovery skips these until [connect] is called for them again;
  /// otherwise the health sweep would undo [disconnect] within one interval.
  final Set<String> _suspended = {};

  /// Servers with a reconnect attempt currently in flight.
  ///
  /// Keeps the reconnect timer and the health sweep from opening two
  /// WebSocket channels for the same server at the same time.
  final Set<String> _reconnecting = {};

  Timer? _healthCheckTimer;
  bool _disposed = false;

  // Reconnection config
  static const _initialReconnectDelay = Duration(seconds: 1);
  static const _maxReconnectDelay = Duration(seconds: 30);
  static const _healthCheckInterval = Duration(seconds: 10);
  static const _commandRetryCount = 3;
  static const _commandRetryDelay = Duration(milliseconds: 200);

  // Event streams
  // NOTE(review): a BehaviorSubject replays the most recent event to every
  // new listener. Confirm late subscribers are meant to receive that stale
  // event; a PublishSubject would not replay it.
  final _eventController = BehaviorSubject<BridgeEvent>();
  final _connectionStatusController =
      BehaviorSubject<Map<String, bool>>.seeded({});

  /// Callback for state resync after a bridge has been reached again.
  void Function(String serverId)? onReconnected;

  /// Stream of all events from all bridges.
  Stream<BridgeEvent> get eventStream => _eventController.stream;

  /// Stream of connection status per server.
  Stream<Map<String, bool>> get connectionStatus =>
      _connectionStatusController.stream;

  /// The most recently published connection status per server.
  Map<String, bool> get currentConnectionStatus =>
      _connectionStatusController.value;

  /// Registers every enabled entry of [servers] and creates its HTTP client.
  ///
  /// Disabled servers are skipped. Each registered server starts out with a
  /// connection status of `false`; nothing is contacted until [connect] or
  /// [connectAll] is called.
  Future<void> initialize(List<ServerConfig> servers) async {
    _logger.i('Initializing BridgeService with ${servers.length} servers');

    for (final server in servers) {
      if (!server.enabled) {
        _logger.d('Skipping disabled server: ${server.id}');
        continue;
      }

      _servers[server.id] = server;

      // A repeated initialize() must not leak the client it replaces.
      _httpClients[server.id]?.close(force: true);
      _httpClients[server.id] = Dio(BaseOptions(
        baseUrl: server.bridgeUrl,
        connectTimeout: const Duration(seconds: 5),
        receiveTimeout: const Duration(seconds: 10),
      ));

      _updateConnectionStatus(server.id, false);
    }
  }

  /// Connects to the bridge of [serverId].
  ///
  /// Probes `/health` and, when the server has a WebSocket URL, opens the
  /// event channel. Returns `true` when the health probe answered with 200;
  /// a later WebSocket failure is reported through [connectionStatus] and
  /// retried automatically. Returns `false` for unknown servers and for any
  /// probe failure.
  Future<bool> connect(String serverId) async {
    final server = _servers[serverId];
    final client = _httpClients[serverId];
    if (server == null || client == null) {
      _logger.w('Unknown server: $serverId');
      return false;
    }

    // An explicit connect re-enables automatic recovery for this server.
    _suspended.remove(serverId);

    try {
      // Check bridge health
      final response = await client.get('/health');
      if (_disposed) return false;

      if (response.statusCode == 200) {
        // This attempt supersedes any retry that is still queued.
        _reconnectTimers.remove(serverId)?.cancel();
        _updateConnectionStatus(serverId, true);
        _logger.i('Connected to bridge: $serverId');

        // Connect WebSocket for events if available
        final wsUrl = server.websocketUrl;
        if (wsUrl != null) {
          await _connectWebSocket(serverId, wsUrl);
        }
        return true;
      }
    } catch (e) {
      _logger.e('Failed to connect to $serverId: $e');
    }

    _updateConnectionStatus(serverId, false);
    return false;
  }

  /// Connects to all configured servers and starts health monitoring.
  ///
  /// Servers are contacted concurrently, so one unreachable bridge does not
  /// delay the others by its connect timeout.
  Future<void> connectAll() async {
    // Snapshot the ids: the map may change while the probes are in flight.
    await Future.wait(_servers.keys.toList().map(connect));

    // dispose() may have run meanwhile; do not start a timer it cannot stop.
    if (_disposed) return;
    _startHealthChecks();
  }

  /// Disconnects from [serverId] and keeps it disconnected.
  ///
  /// Cancels any pending reconnect and excludes the server from automatic
  /// recovery until [connect] is called for it again.
  Future<void> disconnect(String serverId) async {
    _suspended.add(serverId);
    _reconnectTimers.remove(serverId)?.cancel();
    _reconnectAttempts.remove(serverId);

    await _disconnectWebSocket(serverId);
    _updateConnectionStatus(serverId, false);
    _logger.i('Disconnected from bridge: $serverId');
  }

  /// Disconnects from all servers.
  Future<void> disconnectAll() async {
    // Snapshot the ids: the map may change while channels are closing.
    await Future.wait(_servers.keys.toList().map(disconnect));
  }

  // ============================================================
  // Command Methods
  // ============================================================

  /// Switches camera [channel] onto monitor [viewer] (ViewerConnectLive).
  ///
  /// Critical command: it is attempted up to [_commandRetryCount] times with
  /// an increasing delay between attempts. Returns `true` as soon as one
  /// attempt is answered with 200.
  Future<bool> viewerConnectLive(int viewer, int channel) async {
    final serverId = _findServerForMonitor(viewer);
    final client = serverId == null ? null : _httpClients[serverId];
    if (serverId == null || client == null) {
      _logger.w('No server found for monitor $viewer');
      return false;
    }

    return _retryCommand('viewerConnectLive', () async {
      final response = await client.post(
        '/viewer/connect-live',
        data: {'Viewer': viewer, 'Channel': channel},
      );
      return response.statusCode == 200;
    });
  }

  /// Clears monitor [viewer] (ViewerClear).
  Future<bool> viewerClear(int viewer) => _sendCommand(
        'viewerClear',
        _findServerForMonitor(viewer),
        '/viewer/clear',
        {'Viewer': viewer},
      );

  /// Pans [camera] in [direction] at [speed].
  Future<bool> ptzPan(int camera, String direction, int speed) =>
      _sendCommand(
        'ptzPan',
        _findServerForCamera(camera),
        '/camera/pan',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Tilts [camera] in [direction] at [speed].
  Future<bool> ptzTilt(int camera, String direction, int speed) =>
      _sendCommand(
        'ptzTilt',
        _findServerForCamera(camera),
        '/camera/tilt',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Zooms [camera] in [direction] at [speed].
  Future<bool> ptzZoom(int camera, String direction, int speed) =>
      _sendCommand(
        'ptzZoom',
        _findServerForCamera(camera),
        '/camera/zoom',
        {'Camera': camera, 'Direction': direction, 'Speed': speed},
      );

  /// Stops all PTZ movement of [camera].
  Future<bool> ptzStop(int camera) => _sendCommand(
        'ptzStop',
        _findServerForCamera(camera),
        '/camera/stop',
        {'Camera': camera},
      );

  /// Moves [camera] to the stored position [preset].
  Future<bool> ptzPreset(int camera, int preset) => _sendCommand(
        'ptzPreset',
        _findServerForCamera(camera),
        '/camera/preset',
        {'Camera': camera, 'Preset': preset},
      );

  // ============================================================
  // Query Methods
  // ============================================================

  /// Returns the `monitors` list reported by `/monitors` on [serverId].
  ///
  /// Returns an empty list when the request fails or the payload does not
  /// have the expected shape.
  Future<List<Map<String, dynamic>>> getMonitorStates(String serverId) =>
      _fetchObjectList('getMonitorStates', serverId, '/monitors', 'monitors');

  /// Returns the `alarms` list reported by `/alarms/active` on [serverId].
  ///
  /// Returns an empty list when the request fails or the payload does not
  /// have the expected shape.
  Future<List<Map<String, dynamic>>> getActiveAlarms(String serverId) =>
      _fetchObjectList('getActiveAlarms', serverId, '/alarms/active', 'alarms');

  /// Returns the JSON object served by `/status` on [serverId], or `null`
  /// when the request fails.
  Future<Map<String, dynamic>?> getBridgeStatus(String serverId) async {
    final client = _httpClients[serverId];
    if (client == null) {
      _logger.w('getBridgeStatus: unknown server $serverId');
      return null;
    }

    try {
      final response = await client.get('/status');
      if (response.statusCode == 200) {
        return response.data as Map<String, dynamic>;
      }
    } catch (e) {
      _logger.e('getBridgeStatus failed for $serverId: $e');
    }
    return null;
  }

  // ============================================================
  // Private Methods
  // ============================================================

  /// Returns the id of the first server that owns [cameraId], if any.
  String? _findServerForCamera(int cameraId) {
    for (final entry in _servers.entries) {
      if (entry.value.ownsCamera(cameraId)) return entry.key;
    }
    return null;
  }

  /// Returns the id of the first server that owns [monitorId], if any.
  String? _findServerForMonitor(int monitorId) {
    for (final entry in _servers.entries) {
      if (entry.value.ownsMonitor(monitorId)) return entry.key;
    }
    return null;
  }

  /// Posts [payload] to [path] on [serverId] as a single, non-retried command.
  ///
  /// Returns `true` only for a 200 response. A missing server, a missing
  /// client and any request error are logged under [name] and yield `false`.
  Future<bool> _sendCommand(
    String name,
    String? serverId,
    String path,
    Map<String, dynamic> payload,
  ) async {
    final client = serverId == null ? null : _httpClients[serverId];
    if (client == null) {
      _logger.w('$name: no bridge available for the requested device');
      return false;
    }

    try {
      final response = await client.post(path, data: payload);
      return response.statusCode == 200;
    } catch (e) {
      _logger.e('$name failed: $e');
      return false;
    }
  }

  /// Fetches [path] from [serverId] and returns the object list under [key].
  ///
  /// The entries are converted eagerly, so a malformed payload is caught and
  /// logged here instead of throwing later when the caller first reads an
  /// element of a lazily cast list.
  Future<List<Map<String, dynamic>>> _fetchObjectList(
    String name,
    String serverId,
    String path,
    String key,
  ) async {
    final client = _httpClients[serverId];
    if (client == null) {
      _logger.w('$name: unknown server $serverId');
      return [];
    }

    try {
      final response = await client.get(path);
      if (response.statusCode == 200) {
        final data = response.data as Map<String, dynamic>;
        final items = data[key] as List<dynamic>? ?? const <dynamic>[];
        return [
          for (final item in items) Map<String, dynamic>.from(item as Map),
        ];
      }
    } catch (e) {
      _logger.e('$name failed for $serverId: $e');
    }
    return [];
  }

  /// Publishes a new status map in which [serverId] is set to [connected].
  void _updateConnectionStatus(String serverId, bool connected) {
    // Work that was in flight during dispose() still lands here; adding to a
    // closed subject would throw a StateError.
    if (_disposed || _connectionStatusController.isClosed) return;

    _connectionStatusController.add({
      ..._connectionStatusController.value,
      serverId: connected,
    });
  }

  /// Opens the event WebSocket of [serverId] at [url], replacing any channel
  /// that is already open for it.
  Future<void> _connectWebSocket(String serverId, String url) async {
    try {
      await _disconnectWebSocket(serverId);

      // dispose() or a competing connect may have run while the old channel
      // was closing; never open a second channel for the same server.
      if (_disposed || _wsChannels.containsKey(serverId)) return;

      _logger.d('Connecting WebSocket to $url');
      final channel = WebSocketChannel.connect(Uri.parse(url));
      _wsChannels[serverId] = channel;

      _wsSubscriptions[serverId] = channel.stream.listen(
        (message) => _handleWebSocketMessage(serverId, message),
        onError: (Object error) {
          _logger.e('WebSocket error for $serverId: $error');
          _updateConnectionStatus(serverId, false);
          _scheduleReconnect(serverId);
        },
        onDone: () {
          _logger.w('WebSocket closed for $serverId');
          _updateConnectionStatus(serverId, false);
          _scheduleReconnect(serverId);
        },
      );

      // WebSocketChannel.connect returns before the handshake has finished,
      // so the backoff counter is cleared only once the channel is ready.
      // Clearing it earlier pins every retry to the initial delay.
      unawaited(channel.ready.then<void>(
        (_) {
          if (!_disposed && identical(_wsChannels[serverId], channel)) {
            _reconnectAttempts[serverId] = 0;
          }
        },
        // Handshake failures are already handled by the stream callbacks.
        onError: (Object _) {},
      ));
    } catch (e) {
      _logger.e('Failed to connect WebSocket to $serverId: $e');
      _scheduleReconnect(serverId);
    }
  }

  /// Decodes one WebSocket [message] from [serverId] and publishes it on
  /// [eventStream]. The bridge's `connected` greeting is only logged.
  void _handleWebSocketMessage(String serverId, dynamic message) {
    try {
      final json = jsonDecode(message as String) as Map<String, dynamic>;
      if (json['type'] == 'connected') {
        _logger.d('WebSocket connected to $serverId');
        return;
      }
      if (_eventController.isClosed) return;
      _eventController.add(BridgeEvent.fromJson(json, serverId));
    } catch (e) {
      _logger.e('Failed to parse WebSocket message: $e');
    }
  }

  /// Cancels the subscription and closes the channel of [serverId], if any.
  Future<void> _disconnectWebSocket(String serverId) async {
    // Detach both entries first so an overlapping call cannot close the same
    // channel twice.
    final subscription = _wsSubscriptions.remove(serverId);
    final channel = _wsChannels.remove(serverId);

    await subscription?.cancel();
    await channel?.sink.close();
  }

  /// Runs [command] up to [_commandRetryCount] times.
  ///
  /// Used for CrossSwitch and other critical operations. An attempt fails
  /// when [command] throws or returns `false`. After failed attempt *n* the
  /// method waits `n * _commandRetryDelay` (a linearly growing delay) before
  /// trying again. Returns `false` once every attempt has failed.
  Future<bool> _retryCommand(
    String name,
    Future<bool> Function() command,
  ) async {
    for (var attempt = 1; attempt <= _commandRetryCount; attempt++) {
      Object failure;
      try {
        if (await command()) return true;
        failure = 'bridge returned a non-success response';
      } catch (e) {
        failure = e;
      }

      if (attempt == _commandRetryCount) {
        _logger.e('$name failed after $attempt attempts: $failure');
        return false;
      }

      _logger.w('$name attempt $attempt failed, retrying: $failure');
      await Future<void>.delayed(_commandRetryDelay * attempt);

      // Nothing left to talk to once the service has been disposed.
      if (_disposed) return false;
    }
    return false;
  }

  /// Returns the reconnect delay to use after [attempts] failed attempts:
  /// 1s, 2s, 4s, 8s, 16s, then capped at [_maxReconnectDelay].
  static Duration _backoffDelay(int attempts) {
    final exponent = attempts < 0 ? 0 : (attempts > 5 ? 5 : attempts);
    final uncapped = _initialReconnectDelay * (1 << exponent);
    return uncapped > _maxReconnectDelay ? _maxReconnectDelay : uncapped;
  }

  /// Probes the bridge of [serverId] and restores its connection.
  ///
  /// Shared by the reconnect timer and the health sweep. Returns `true` when
  /// `/health` answered with 200; in that case the status is set to
  /// connected, the WebSocket (if configured) is reopened and [onReconnected]
  /// is invoked. Returns `false` when the probe fails, when another attempt
  /// for the same server is already running, or when the server was
  /// disconnected or the service disposed in the meantime.
  Future<bool> _tryReconnect(String serverId) async {
    final server = _servers[serverId];
    final client = _httpClients[serverId];
    if (server == null || client == null) return false;

    // Set.add returns false when an attempt is already in flight.
    if (!_reconnecting.add(serverId)) return false;

    try {
      final response = await client.get('/health');
      if (_disposed || _suspended.contains(serverId)) return false;
      if (response.statusCode != 200) return false;

      // This attempt supersedes any retry that is still queued; a WebSocket
      // failure below queues a fresh one.
      _reconnectTimers.remove(serverId)?.cancel();
      _updateConnectionStatus(serverId, true);

      final wsUrl = server.websocketUrl;
      if (wsUrl != null) {
        await _connectWebSocket(serverId, wsUrl);
        if (_disposed) return false;
      }

      // A failing listener must not be mistaken for a failed reconnect.
      try {
        onReconnected?.call(serverId);
      } catch (e) {
        _logger.e('onReconnected callback failed for $serverId: $e');
      }
      return true;
    } catch (e) {
      _logger.d('Reconnect health check failed for $serverId: $e');
      return false;
    } finally {
      _reconnecting.remove(serverId);
    }
  }

  /// Schedules a WebSocket reconnection with exponential backoff.
  ///
  /// Replaces any reconnect already queued for [serverId]. Does nothing for
  /// unknown or deliberately disconnected servers, or after [dispose].
  void _scheduleReconnect(String serverId) {
    if (_disposed ||
        _suspended.contains(serverId) ||
        !_servers.containsKey(serverId)) {
      return;
    }
    _reconnectTimers.remove(serverId)?.cancel();

    final attempts = _reconnectAttempts[serverId] ?? 0;
    final delay = _backoffDelay(attempts);

    _logger.i('Scheduling reconnect for $serverId in ${delay.inSeconds}s '
        '(attempt ${attempts + 1})');

    _reconnectTimers[serverId] = Timer(delay, () async {
      _reconnectTimers.remove(serverId);
      if (_disposed ||
          _suspended.contains(serverId) ||
          !_servers.containsKey(serverId)) {
        return;
      }
      // The health sweep is already working on this server.
      if (_reconnecting.contains(serverId)) return;

      _reconnectAttempts[serverId] = attempts + 1;

      if (await _tryReconnect(serverId)) {
        _logger.i('Reconnected to $serverId');
      } else {
        // Covers thrown errors as well as non-200 health answers.
        _scheduleReconnect(serverId);
      }
    });
  }

  /// Starts periodic health checks for all bridges.
  ///
  /// Every [_healthCheckInterval] each server whose status is not connected
  /// is probed, which detects a bridge coming back online after a failure.
  void _startHealthChecks() {
    _healthCheckTimer?.cancel();

    // A sweep can outlast the interval when bridges time out; skip ticks
    // that fire while the previous sweep is still running.
    var sweepInProgress = false;

    _healthCheckTimer = Timer.periodic(_healthCheckInterval, (_) async {
      if (_disposed || sweepInProgress) return;
      sweepInProgress = true;
      try {
        // Build the list up front: iterating the live map across awaits
        // would throw if it is modified in between.
        final down = [
          for (final serverId in _servers.keys)
            if (!_suspended.contains(serverId) &&
                !(currentConnectionStatus[serverId] ?? false))
              serverId,
        ];

        await Future.wait(down.map((serverId) async {
          if (await _tryReconnect(serverId)) {
            _logger.i('Bridge $serverId is back online');
          }
        }));
      } finally {
        sweepInProgress = false;
      }
    });
  }

  /// Releases all timers, WebSocket channels, HTTP clients and streams.
  ///
  /// The service must not be used afterwards. Calling it again is a no-op.
  void dispose() {
    if (_disposed) return;
    _disposed = true;

    _healthCheckTimer?.cancel();
    _healthCheckTimer = null;

    for (final timer in _reconnectTimers.values) {
      timer.cancel();
    }
    _reconnectTimers.clear();

    for (final subscription in _wsSubscriptions.values) {
      unawaited(subscription.cancel());
    }
    _wsSubscriptions.clear();

    for (final channel in _wsChannels.values) {
      unawaited(channel.sink.close());
    }
    _wsChannels.clear();

    // Close the clients instead of only dropping them; force aborts
    // requests that are still in flight.
    for (final client in _httpClients.values) {
      client.close(force: true);
    }
    _httpClients.clear();

    _servers.clear();
    _reconnectAttempts.clear();
    _suspended.clear();
    _reconnecting.clear();

    unawaited(_eventController.close());
    unawaited(_connectionStatusController.close());
  }
}
|