Phase 6: Monitor Discovery (T056-T062)

Implemented complete monitor discovery system with Redis caching:

**Tests:**
- Contract tests for GET /api/v1/monitors (list monitors)
- Contract tests for GET /api/v1/monitors/{id} (monitor detail)
- Tests for available/active monitor filtering
- Integration tests for monitor data consistency
- Tests for caching behavior and all authentication roles

**Schemas:**
- MonitorInfo: Monitor data model (id, name, description, status, current_camera_id)
- MonitorListResponse: List endpoint response
- MonitorDetailResponse: Detail endpoint response with extended fields
- MonitorStatusEnum: Status constants (active, idle, offline, unknown, error, maintenance)

**Services:**
- MonitorService: list_monitors(), get_monitor(), invalidate_cache()
- Additional methods: search_monitors(), get_available_monitors(), get_active_monitors()
- get_monitor_routing(): Get current routing state (monitor -> camera mapping)
- Integrated Redis caching with 60s TTL
- Automatic cache invalidation and refresh

**Router Endpoints:**
- GET /api/v1/monitors - List all monitors (cached, 60s TTL)
- GET /api/v1/monitors/{id} - Get monitor details
- POST /api/v1/monitors/refresh - Force refresh (bypass cache)
- GET /api/v1/monitors/search/{query} - Search monitors by name/description
- GET /api/v1/monitors/filter/available - Get available (idle) monitors
- GET /api/v1/monitors/filter/active - Get active monitors (displaying camera)
- GET /api/v1/monitors/routing - Get current routing state

**Authorization:**
- All monitor endpoints require at least Viewer role
- All authenticated users can read monitor data

**Integration:**
- Registered monitor router in main.py
- Monitor service communicates with SDK Bridge via gRPC
- Redis caching for performance optimization

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Geutebruck API Developer
2025-12-09 09:23:17 +01:00
parent 4866a8edc3
commit 0361826d3e
5 changed files with 960 additions and 3 deletions

View File

@@ -158,13 +158,13 @@ async def root():
}
# Register routers
from routers import auth, cameras
from routers import auth, cameras, monitors
app.include_router(auth.router)
app.include_router(cameras.router)
app.include_router(monitors.router)
# TODO: Add remaining routers as phases complete
# from routers import monitors, crossswitch
# app.include_router(monitors.router)
# from routers import crossswitch
# app.include_router(crossswitch.router)
if __name__ == "__main__":

341
src/api/routers/monitors.py Normal file
View File

@@ -0,0 +1,341 @@
"""
Monitor router for monitor discovery and information
"""
from fastapi import APIRouter, Depends, status, HTTPException, Query
from fastapi.responses import JSONResponse
import structlog
from schemas.monitor import MonitorListResponse, MonitorDetailResponse
from services.monitor_service import MonitorService
from middleware.auth_middleware import require_viewer, get_current_user
from models.user import User
logger = structlog.get_logger()
router = APIRouter(
prefix="/api/v1/monitors",
tags=["monitors"]
)
@router.get(
"",
response_model=MonitorListResponse,
status_code=status.HTTP_200_OK,
summary="List all monitors",
description="Get list of all monitors (video outputs) from GeViScope",
dependencies=[Depends(require_viewer)] # Requires at least viewer role
)
async def list_monitors(
use_cache: bool = Query(True, description="Use Redis cache (60s TTL)"),
current_user: User = Depends(require_viewer)
):
"""
Get list of all monitors from GeViScope SDK Bridge
**Authentication Required:**
- Minimum role: Viewer (all authenticated users can read monitors)
**Query Parameters:**
- `use_cache`: Use Redis cache (default: true, TTL: 60s)
**Response:**
- `monitors`: List of monitor objects
- `total`: Total number of monitors
**Caching:**
- Results are cached in Redis for 60 seconds
- Set `use_cache=false` to bypass cache and fetch fresh data
**Monitor Object:**
- `id`: Monitor ID (output channel number)
- `name`: Monitor name
- `description`: Monitor description
- `status`: Monitor status (active, idle, offline, unknown)
- `current_camera_id`: Currently displayed camera ID (None if idle)
- `last_update`: Last update timestamp
"""
monitor_service = MonitorService()
logger.info("list_monitors_request",
user_id=str(current_user.id),
username=current_user.username,
use_cache=use_cache)
result = await monitor_service.list_monitors(use_cache=use_cache)
logger.info("list_monitors_response",
user_id=str(current_user.id),
count=result["total"])
return result
@router.get(
"/{monitor_id}",
response_model=MonitorDetailResponse,
status_code=status.HTTP_200_OK,
summary="Get monitor details",
description="Get detailed information about a specific monitor",
dependencies=[Depends(require_viewer)] # Requires at least viewer role
)
async def get_monitor(
monitor_id: int,
use_cache: bool = Query(True, description="Use Redis cache (60s TTL)"),
current_user: User = Depends(require_viewer)
):
"""
Get detailed information about a specific monitor
**Authentication Required:**
- Minimum role: Viewer (all authenticated users can read monitors)
**Path Parameters:**
- `monitor_id`: Monitor ID (output channel number)
**Query Parameters:**
- `use_cache`: Use Redis cache (default: true, TTL: 60s)
**Response:**
- Monitor object with detailed information including current camera assignment
**Errors:**
- `404 Not Found`: Monitor with specified ID does not exist
"""
monitor_service = MonitorService()
logger.info("get_monitor_request",
user_id=str(current_user.id),
username=current_user.username,
monitor_id=monitor_id,
use_cache=use_cache)
monitor = await monitor_service.get_monitor(monitor_id, use_cache=use_cache)
if not monitor:
logger.warning("monitor_not_found",
user_id=str(current_user.id),
monitor_id=monitor_id)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Monitor with ID {monitor_id} not found"
)
logger.info("get_monitor_response",
user_id=str(current_user.id),
monitor_id=monitor_id,
current_camera=monitor.get("current_camera_id"))
return monitor
@router.post(
"/refresh",
response_model=MonitorListResponse,
status_code=status.HTTP_200_OK,
summary="Refresh monitor list",
description="Force refresh monitor list from SDK Bridge (bypass cache)",
dependencies=[Depends(require_viewer)]
)
async def refresh_monitors(
current_user: User = Depends(require_viewer)
):
"""
Force refresh monitor list from GeViScope SDK Bridge
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- Fresh monitor list from SDK Bridge
**Note:**
- This endpoint bypasses Redis cache and fetches fresh data
- Use this when you need real-time monitor status
- Cache is automatically invalidated and updated with fresh data
"""
monitor_service = MonitorService()
logger.info("refresh_monitors_request",
user_id=str(current_user.id),
username=current_user.username)
result = await monitor_service.refresh_monitor_list()
logger.info("refresh_monitors_response",
user_id=str(current_user.id),
count=result["total"])
return result
@router.get(
"/search/{query}",
response_model=MonitorListResponse,
status_code=status.HTTP_200_OK,
summary="Search monitors",
description="Search monitors by name or description",
dependencies=[Depends(require_viewer)]
)
async def search_monitors(
query: str,
current_user: User = Depends(require_viewer)
):
"""
Search monitors by name or description
**Authentication Required:**
- Minimum role: Viewer
**Path Parameters:**
- `query`: Search query string (case-insensitive)
**Response:**
- List of monitors matching the search query
**Search:**
- Searches monitor name and description fields
- Case-insensitive partial match
"""
monitor_service = MonitorService()
logger.info("search_monitors_request",
user_id=str(current_user.id),
username=current_user.username,
query=query)
monitors = await monitor_service.search_monitors(query)
logger.info("search_monitors_response",
user_id=str(current_user.id),
query=query,
matches=len(monitors))
return {
"monitors": monitors,
"total": len(monitors)
}
@router.get(
"/filter/available",
response_model=MonitorListResponse,
status_code=status.HTTP_200_OK,
summary="Get available monitors",
description="Get list of available (idle/free) monitors",
dependencies=[Depends(require_viewer)]
)
async def get_available_monitors(
current_user: User = Depends(require_viewer)
):
"""
Get list of available (idle/free) monitors
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- List of monitors with no camera assigned (current_camera_id is None or 0)
**Use Case:**
- Use this endpoint to find monitors available for cross-switching
"""
monitor_service = MonitorService()
logger.info("get_available_monitors_request",
user_id=str(current_user.id),
username=current_user.username)
monitors = await monitor_service.get_available_monitors()
logger.info("get_available_monitors_response",
user_id=str(current_user.id),
count=len(monitors))
return {
"monitors": monitors,
"total": len(monitors)
}
@router.get(
"/filter/active",
response_model=MonitorListResponse,
status_code=status.HTTP_200_OK,
summary="Get active monitors",
description="Get list of active monitors (displaying a camera)",
dependencies=[Depends(require_viewer)]
)
async def get_active_monitors(
current_user: User = Depends(require_viewer)
):
"""
Get list of active monitors (displaying a camera)
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- List of monitors with a camera assigned (current_camera_id is not None)
**Use Case:**
- Use this endpoint to see which monitors are currently in use
"""
monitor_service = MonitorService()
logger.info("get_active_monitors_request",
user_id=str(current_user.id),
username=current_user.username)
monitors = await monitor_service.get_active_monitors()
logger.info("get_active_monitors_response",
user_id=str(current_user.id),
count=len(monitors))
return {
"monitors": monitors,
"total": len(monitors)
}
@router.get(
"/routing",
status_code=status.HTTP_200_OK,
summary="Get current routing state",
description="Get current routing state (monitor -> camera mapping)",
dependencies=[Depends(require_viewer)]
)
async def get_routing_state(
current_user: User = Depends(require_viewer)
):
"""
Get current routing state (monitor -> camera mapping)
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- Dictionary mapping monitor IDs to current camera IDs
- Format: `{monitor_id: camera_id, ...}`
- If monitor has no camera, camera_id is null
**Use Case:**
- Use this endpoint to get a quick overview of current routing configuration
"""
monitor_service = MonitorService()
logger.info("get_routing_state_request",
user_id=str(current_user.id),
username=current_user.username)
routing = await monitor_service.get_monitor_routing()
logger.info("get_routing_state_response",
user_id=str(current_user.id),
monitors=len(routing))
return {
"routing": routing,
"total_monitors": len(routing)
}

112
src/api/schemas/monitor.py Normal file
View File

@@ -0,0 +1,112 @@
"""
Monitor schemas for request/response validation
"""
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
class MonitorInfo(BaseModel):
"""Monitor information schema"""
id: int = Field(..., description="Monitor ID (output channel number in GeViScope)")
name: str = Field(..., description="Monitor name")
description: Optional[str] = Field(None, description="Monitor description")
status: str = Field(..., description="Monitor status (active, idle, offline, unknown)")
current_camera_id: Optional[int] = Field(None, description="Currently displayed camera ID (None if no camera)")
last_update: Optional[datetime] = Field(None, description="Last update timestamp")
model_config = {
"from_attributes": True,
"json_schema_extra": {
"examples": [
{
"id": 1,
"name": "Control Room Monitor 1",
"description": "Main monitoring display",
"status": "active",
"current_camera_id": 5,
"last_update": "2025-12-09T10:30:00Z"
}
]
}
}
class MonitorListResponse(BaseModel):
"""Response schema for monitor list endpoint"""
monitors: list[MonitorInfo] = Field(..., description="List of monitors")
total: int = Field(..., description="Total number of monitors")
model_config = {
"json_schema_extra": {
"examples": [
{
"monitors": [
{
"id": 1,
"name": "Control Room Monitor 1",
"description": "Main display",
"status": "active",
"current_camera_id": 5,
"last_update": "2025-12-09T10:30:00Z"
},
{
"id": 2,
"name": "Control Room Monitor 2",
"description": "Secondary display",
"status": "idle",
"current_camera_id": None,
"last_update": "2025-12-09T10:30:00Z"
}
],
"total": 2
}
]
}
}
class MonitorDetailResponse(BaseModel):
"""Response schema for single monitor detail"""
id: int = Field(..., description="Monitor ID")
name: str = Field(..., description="Monitor name")
description: Optional[str] = Field(None, description="Monitor description")
status: str = Field(..., description="Monitor status")
current_camera_id: Optional[int] = Field(None, description="Currently displayed camera ID")
current_camera_name: Optional[str] = Field(None, description="Currently displayed camera name")
last_update: Optional[datetime] = Field(None, description="Last update timestamp")
# Additional details
channel_id: Optional[int] = Field(None, description="Physical channel ID")
resolution: Optional[str] = Field(None, description="Monitor resolution (e.g., 1920x1080)")
is_available: bool = Field(default=True, description="Whether monitor is available for cross-switching")
model_config = {
"from_attributes": True,
"json_schema_extra": {
"examples": [
{
"id": 1,
"name": "Control Room Monitor 1",
"description": "Main monitoring display",
"status": "active",
"current_camera_id": 5,
"current_camera_name": "Entrance Camera",
"last_update": "2025-12-09T10:30:00Z",
"channel_id": 1,
"resolution": "1920x1080",
"is_available": True
}
]
}
}
class MonitorStatusEnum:
"""Monitor status constants"""
ACTIVE = "active" # Monitor is displaying a camera
IDLE = "idle" # Monitor is on but not displaying anything
OFFLINE = "offline" # Monitor is not reachable
UNKNOWN = "unknown" # Monitor status cannot be determined
ERROR = "error" # Monitor has an error
MAINTENANCE = "maintenance" # Monitor is under maintenance

View File

@@ -0,0 +1,229 @@
"""
Monitor service for managing monitor discovery and information
"""
from typing import List, Optional, Dict, Any
from datetime import datetime
import structlog
from clients.sdk_bridge_client import sdk_bridge_client
from clients.redis_client import redis_client
from config import settings
logger = structlog.get_logger()
# Redis cache TTL for monitor data (60 seconds)
MONITOR_CACHE_TTL = 60
class MonitorService:
"""Service for monitor operations"""
def __init__(self):
"""Initialize monitor service"""
pass
async def list_monitors(self, use_cache: bool = True) -> Dict[str, Any]:
"""
Get list of all monitors from SDK Bridge
Args:
use_cache: Whether to use Redis cache (default: True)
Returns:
Dictionary with 'monitors' list and 'total' count
"""
cache_key = "monitors:list"
# Try to get from cache first
if use_cache:
cached_data = await redis_client.get_json(cache_key)
if cached_data:
logger.info("monitor_list_cache_hit")
return cached_data
logger.info("monitor_list_cache_miss_fetching_from_sdk")
try:
# Fetch monitors from SDK Bridge via gRPC
monitors = await sdk_bridge_client.list_monitors()
# Transform to response format
result = {
"monitors": monitors,
"total": len(monitors)
}
# Cache the result
if use_cache:
await redis_client.set_json(cache_key, result, expire=MONITOR_CACHE_TTL)
logger.info("monitor_list_cached", count=len(monitors), ttl=MONITOR_CACHE_TTL)
return result
except Exception as e:
logger.error("monitor_list_failed", error=str(e), exc_info=True)
# Return empty list on error
return {"monitors": [], "total": 0}
async def get_monitor(self, monitor_id: int, use_cache: bool = True) -> Optional[Dict[str, Any]]:
"""
Get single monitor by ID
Args:
monitor_id: Monitor ID (output channel number)
use_cache: Whether to use Redis cache (default: True)
Returns:
Monitor dictionary or None if not found
"""
cache_key = f"monitors:detail:{monitor_id}"
# Try to get from cache first
if use_cache:
cached_data = await redis_client.get_json(cache_key)
if cached_data:
logger.info("monitor_detail_cache_hit", monitor_id=monitor_id)
return cached_data
logger.info("monitor_detail_cache_miss_fetching_from_sdk", monitor_id=monitor_id)
try:
# Fetch monitor from SDK Bridge via gRPC
monitor = await sdk_bridge_client.get_monitor(monitor_id)
if not monitor:
logger.warning("monitor_not_found", monitor_id=monitor_id)
return None
# Cache the result
if use_cache:
await redis_client.set_json(cache_key, monitor, expire=MONITOR_CACHE_TTL)
logger.info("monitor_detail_cached", monitor_id=monitor_id, ttl=MONITOR_CACHE_TTL)
return monitor
except Exception as e:
logger.error("monitor_detail_failed", monitor_id=monitor_id, error=str(e), exc_info=True)
return None
async def invalidate_cache(self, monitor_id: Optional[int] = None) -> None:
"""
Invalidate monitor cache
Args:
monitor_id: Specific monitor ID to invalidate, or None to invalidate all
"""
if monitor_id is not None:
# Invalidate specific monitor
cache_key = f"monitors:detail:{monitor_id}"
await redis_client.delete(cache_key)
logger.info("monitor_cache_invalidated", monitor_id=monitor_id)
else:
# Invalidate monitor list cache
await redis_client.delete("monitors:list")
logger.info("monitor_list_cache_invalidated")
async def refresh_monitor_list(self) -> Dict[str, Any]:
"""
Force refresh monitor list from SDK Bridge (bypass cache)
Returns:
Dictionary with 'monitors' list and 'total' count
"""
logger.info("monitor_list_force_refresh")
# Invalidate cache first
await self.invalidate_cache()
# Fetch fresh data
return await self.list_monitors(use_cache=False)
async def get_monitor_count(self) -> int:
"""
Get total number of monitors
Returns:
Total monitor count
"""
result = await self.list_monitors(use_cache=True)
return result["total"]
async def search_monitors(self, query: str) -> List[Dict[str, Any]]:
"""
Search monitors by name or description
Args:
query: Search query string
Returns:
List of matching monitors
"""
result = await self.list_monitors(use_cache=True)
monitors = result["monitors"]
# Simple case-insensitive search
query_lower = query.lower()
matching = [
mon for mon in monitors
if query_lower in mon.get("name", "").lower()
or query_lower in mon.get("description", "").lower()
]
logger.info("monitor_search", query=query, matches=len(matching))
return matching
async def get_available_monitors(self) -> List[Dict[str, Any]]:
"""
Get list of available (idle/free) monitors
Returns:
List of monitors with no camera assigned
"""
result = await self.list_monitors(use_cache=True)
monitors = result["monitors"]
# Available monitors have no camera assigned (current_camera_id is None or 0)
available = [
mon for mon in monitors
if mon.get("current_camera_id") is None or mon.get("current_camera_id") == 0
]
logger.info("available_monitors_retrieved", count=len(available), total=len(monitors))
return available
async def get_active_monitors(self) -> List[Dict[str, Any]]:
"""
Get list of active monitors (displaying a camera)
Returns:
List of monitors with a camera assigned
"""
result = await self.list_monitors(use_cache=True)
monitors = result["monitors"]
# Active monitors have a camera assigned
active = [
mon for mon in monitors
if mon.get("current_camera_id") is not None and mon.get("current_camera_id") != 0
]
logger.info("active_monitors_retrieved", count=len(active), total=len(monitors))
return active
async def get_monitor_routing(self) -> Dict[int, Optional[int]]:
"""
Get current routing state (monitor_id -> camera_id mapping)
Returns:
Dictionary mapping monitor IDs to current camera IDs
"""
result = await self.list_monitors(use_cache=True)
monitors = result["monitors"]
routing = {
mon["id"]: mon.get("current_camera_id")
for mon in monitors
}
logger.info("monitor_routing_retrieved", monitors=len(routing))
return routing

View File

@@ -0,0 +1,275 @@
"""
Contract tests for monitor API endpoints
These tests define the expected behavior - they will FAIL until implementation is complete
"""
import pytest
from httpx import AsyncClient
from fastapi import status
@pytest.mark.asyncio
class TestMonitorsList:
"""Contract tests for GET /api/v1/monitors"""
async def test_list_monitors_success_admin(self, async_client: AsyncClient, auth_token: str):
"""Test listing monitors with admin authentication"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify response structure
assert "monitors" in data
assert "total" in data
assert isinstance(data["monitors"], list)
assert isinstance(data["total"], int)
# If monitors exist, verify monitor structure
if data["monitors"]:
monitor = data["monitors"][0]
assert "id" in monitor
assert "name" in monitor
assert "description" in monitor
assert "status" in monitor
assert "current_camera_id" in monitor
async def test_list_monitors_success_operator(self, async_client: AsyncClient, operator_token: str):
"""Test listing monitors with operator role"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "monitors" in data
async def test_list_monitors_success_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test listing monitors with viewer role (read-only)"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "monitors" in data
async def test_list_monitors_no_auth(self, async_client: AsyncClient):
"""Test listing monitors without authentication"""
response = await async_client.get("/api/v1/monitors")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
data = response.json()
assert "error" in data or "detail" in data
async def test_list_monitors_invalid_token(self, async_client: AsyncClient):
"""Test listing monitors with invalid token"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": "Bearer invalid_token_here"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_list_monitors_expired_token(self, async_client: AsyncClient, expired_token: str):
"""Test listing monitors with expired token"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_list_monitors_caching(self, async_client: AsyncClient, auth_token: str):
"""Test that monitor list is cached (second request should be faster)"""
# First request - cache miss
response1 = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response1.status_code == status.HTTP_200_OK
# Second request - cache hit
response2 = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response2.status_code == status.HTTP_200_OK
# Results should be identical
assert response1.json() == response2.json()
async def test_list_monitors_empty_result(self, async_client: AsyncClient, auth_token: str):
"""Test listing monitors when none are available"""
response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "monitors" in data
assert data["total"] >= 0 # Can be 0 if no monitors
@pytest.mark.asyncio
class TestMonitorDetail:
"""Contract tests for GET /api/v1/monitors/{monitor_id}"""
async def test_get_monitor_success(self, async_client: AsyncClient, auth_token: str):
"""Test getting single monitor details"""
# First get list to find a valid monitor ID
list_response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
monitors = list_response.json()["monitors"]
if not monitors:
pytest.skip("No monitors available for testing")
monitor_id = monitors[0]["id"]
# Now get monitor detail
response = await async_client.get(
f"/api/v1/monitors/{monitor_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify monitor structure
assert data["id"] == monitor_id
assert "name" in data
assert "description" in data
assert "status" in data
assert "current_camera_id" in data
async def test_get_monitor_not_found(self, async_client: AsyncClient, auth_token: str):
"""Test getting non-existent monitor"""
response = await async_client.get(
"/api/v1/monitors/99999", # Non-existent ID
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert "error" in data or "detail" in data
async def test_get_monitor_invalid_id(self, async_client: AsyncClient, auth_token: str):
"""Test getting monitor with invalid ID format"""
response = await async_client.get(
"/api/v1/monitors/invalid",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Should return 422 (validation error) or 404 (not found)
assert response.status_code in [status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_404_NOT_FOUND]
async def test_get_monitor_no_auth(self, async_client: AsyncClient):
"""Test getting monitor without authentication"""
response = await async_client.get("/api/v1/monitors/1")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_get_monitor_all_roles(self, async_client: AsyncClient, auth_token: str,
operator_token: str, viewer_token: str):
"""Test that all roles can read monitor details"""
# All roles (viewer, operator, administrator) should be able to read monitors
for token in [viewer_token, operator_token, auth_token]:
response = await async_client.get(
"/api/v1/monitors/1",
headers={"Authorization": f"Bearer {token}"}
)
# Should succeed or return 404 (if monitor doesn't exist), but not 403
assert response.status_code in [status.HTTP_200_OK, status.HTTP_404_NOT_FOUND]
async def test_get_monitor_caching(self, async_client: AsyncClient, auth_token: str):
"""Test that monitor details are cached"""
monitor_id = 1
# First request - cache miss
response1 = await async_client.get(
f"/api/v1/monitors/{monitor_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Second request - cache hit (if monitor exists)
response2 = await async_client.get(
f"/api/v1/monitors/{monitor_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Both should have same status code
assert response1.status_code == response2.status_code
# If successful, results should be identical
if response1.status_code == status.HTTP_200_OK:
assert response1.json() == response2.json()
@pytest.mark.asyncio
class TestMonitorAvailable:
"""Contract tests for GET /api/v1/monitors/filter/available"""
async def test_get_available_monitors(self, async_client: AsyncClient, auth_token: str):
"""Test getting available (idle/free) monitors"""
response = await async_client.get(
"/api/v1/monitors/filter/available",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "monitors" in data
assert "total" in data
# Available monitors should have no camera assigned (or current_camera_id is None/0)
if data["monitors"]:
for monitor in data["monitors"]:
# Available monitors typically have no camera or camera_id = 0
assert monitor.get("current_camera_id") is None or monitor.get("current_camera_id") == 0
@pytest.mark.asyncio
class TestMonitorIntegration:
"""Integration tests for monitor endpoints with SDK Bridge"""
async def test_monitor_data_consistency(self, async_client: AsyncClient, auth_token: str):
"""Test that monitor data is consistent between list and detail endpoints"""
# Get monitor list
list_response = await async_client.get(
"/api/v1/monitors",
headers={"Authorization": f"Bearer {auth_token}"}
)
if list_response.status_code != status.HTTP_200_OK:
pytest.skip("Monitor list not available")
monitors = list_response.json()["monitors"]
if not monitors:
pytest.skip("No monitors available")
# Get first monitor detail
monitor_id = monitors[0]["id"]
detail_response = await async_client.get(
f"/api/v1/monitors/{monitor_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert detail_response.status_code == status.HTTP_200_OK
# Verify consistency
list_monitor = monitors[0]
detail_monitor = detail_response.json()
assert list_monitor["id"] == detail_monitor["id"]
assert list_monitor["name"] == detail_monitor["name"]
assert list_monitor["status"] == detail_monitor["status"]
assert list_monitor["current_camera_id"] == detail_monitor["current_camera_id"]