""" Monitor service for managing monitor discovery and information """ from typing import List, Optional, Dict, Any from datetime import datetime import structlog from clients.sdk_bridge_client import sdk_bridge_client from clients.redis_client import redis_client from config import settings logger = structlog.get_logger() # Redis cache TTL for monitor data (60 seconds) MONITOR_CACHE_TTL = 60 class MonitorService: """Service for monitor operations""" def __init__(self): """Initialize monitor service""" pass async def list_monitors(self, use_cache: bool = True) -> Dict[str, Any]: """ Get list of all monitors from SDK Bridge Args: use_cache: Whether to use Redis cache (default: True) Returns: Dictionary with 'monitors' list and 'total' count """ cache_key = "monitors:list" # Try to get from cache first if use_cache: cached_data = await redis_client.get_json(cache_key) if cached_data: logger.info("monitor_list_cache_hit") return cached_data logger.info("monitor_list_cache_miss_fetching_from_sdk") try: # Fetch monitors from SDK Bridge via gRPC monitors = await sdk_bridge_client.list_monitors() # Transform to response format result = { "monitors": monitors, "total": len(monitors) } # Cache the result if use_cache: await redis_client.set_json(cache_key, result, expire=MONITOR_CACHE_TTL) logger.info("monitor_list_cached", count=len(monitors), ttl=MONITOR_CACHE_TTL) return result except Exception as e: logger.error("monitor_list_failed", error=str(e), exc_info=True) # Return empty list on error return {"monitors": [], "total": 0} async def get_monitor(self, monitor_id: int, use_cache: bool = True) -> Optional[Dict[str, Any]]: """ Get single monitor by ID Args: monitor_id: Monitor ID (output channel number) use_cache: Whether to use Redis cache (default: True) Returns: Monitor dictionary or None if not found """ cache_key = f"monitors:detail:{monitor_id}" # Try to get from cache first if use_cache: cached_data = await redis_client.get_json(cache_key) if cached_data: logger.info("monitor_detail_cache_hit", monitor_id=monitor_id) return cached_data logger.info("monitor_detail_cache_miss_fetching_from_sdk", monitor_id=monitor_id) try: # Fetch monitor from SDK Bridge via gRPC monitor = await sdk_bridge_client.get_monitor(monitor_id) if not monitor: logger.warning("monitor_not_found", monitor_id=monitor_id) return None # Cache the result if use_cache: await redis_client.set_json(cache_key, monitor, expire=MONITOR_CACHE_TTL) logger.info("monitor_detail_cached", monitor_id=monitor_id, ttl=MONITOR_CACHE_TTL) return monitor except Exception as e: logger.error("monitor_detail_failed", monitor_id=monitor_id, error=str(e), exc_info=True) return None async def invalidate_cache(self, monitor_id: Optional[int] = None) -> None: """ Invalidate monitor cache Args: monitor_id: Specific monitor ID to invalidate, or None to invalidate all """ if monitor_id is not None: # Invalidate specific monitor cache_key = f"monitors:detail:{monitor_id}" await redis_client.delete(cache_key) logger.info("monitor_cache_invalidated", monitor_id=monitor_id) else: # Invalidate monitor list cache await redis_client.delete("monitors:list") logger.info("monitor_list_cache_invalidated") async def refresh_monitor_list(self) -> Dict[str, Any]: """ Force refresh monitor list from SDK Bridge (bypass cache) Returns: Dictionary with 'monitors' list and 'total' count """ logger.info("monitor_list_force_refresh") # Invalidate cache first await self.invalidate_cache() # Fetch fresh data return await self.list_monitors(use_cache=False) async def get_monitor_count(self) -> int: """ Get total number of monitors Returns: Total monitor count """ result = await self.list_monitors(use_cache=True) return result["total"] async def search_monitors(self, query: str) -> List[Dict[str, Any]]: """ Search monitors by name or description Args: query: Search query string Returns: List of matching monitors """ result = await self.list_monitors(use_cache=True) monitors = result["monitors"] # Simple case-insensitive search query_lower = query.lower() matching = [ mon for mon in monitors if query_lower in mon.get("name", "").lower() or query_lower in mon.get("description", "").lower() ] logger.info("monitor_search", query=query, matches=len(matching)) return matching async def get_available_monitors(self) -> List[Dict[str, Any]]: """ Get list of available (idle/free) monitors Returns: List of monitors with no camera assigned """ result = await self.list_monitors(use_cache=True) monitors = result["monitors"] # Available monitors have no camera assigned (current_camera_id is None or 0) available = [ mon for mon in monitors if mon.get("current_camera_id") is None or mon.get("current_camera_id") == 0 ] logger.info("available_monitors_retrieved", count=len(available), total=len(monitors)) return available async def get_active_monitors(self) -> List[Dict[str, Any]]: """ Get list of active monitors (displaying a camera) Returns: List of monitors with a camera assigned """ result = await self.list_monitors(use_cache=True) monitors = result["monitors"] # Active monitors have a camera assigned active = [ mon for mon in monitors if mon.get("current_camera_id") is not None and mon.get("current_camera_id") != 0 ] logger.info("active_monitors_retrieved", count=len(active), total=len(monitors)) return active async def get_monitor_routing(self) -> Dict[int, Optional[int]]: """ Get current routing state (monitor_id -> camera_id mapping) Returns: Dictionary mapping monitor IDs to current camera IDs """ result = await self.list_monitors(use_cache=True) monitors = result["monitors"] routing = { mon["id"]: mon.get("current_camera_id") for mon in monitors } logger.info("monitor_routing_retrieved", monitors=len(routing)) return routing