Phase 5: Camera Discovery (T049-T055)

Implemented complete camera discovery system with Redis caching:

**Tests:**
- Contract tests for GET /api/v1/cameras (list cameras)
- Contract tests for GET /api/v1/cameras/{id} (camera detail)
- Integration tests for camera data consistency
- Tests for caching behavior and all authentication roles

**Schemas:**
- CameraInfo: Camera data model (id, name, description, has_ptz, has_video_sensor, status)
- CameraListResponse: List endpoint response
- CameraDetailResponse: Detail endpoint response with extended fields
- CameraStatusEnum: Status constants (online, offline, unknown, error, maintenance)

**Services:**
- CameraService: list_cameras(), get_camera(), invalidate_cache()
- Additional methods: search_cameras(), get_online_cameras(), get_ptz_cameras()
- Integrated Redis caching with 60s TTL
- Automatic cache invalidation and refresh

**Router Endpoints:**
- GET /api/v1/cameras - List all cameras (cached, 60s TTL)
- GET /api/v1/cameras/{id} - Get camera details
- POST /api/v1/cameras/refresh - Force refresh (bypass cache)
- GET /api/v1/cameras/search/{query} - Search cameras by name/description
- GET /api/v1/cameras/filter/online - Get online cameras only
- GET /api/v1/cameras/filter/ptz - Get PTZ cameras only

**Authorization:**
- All camera endpoints require at least Viewer role
- All authenticated users can read camera data

**Integration:**
- Registered camera router in main.py
- Camera service communicates with SDK Bridge via gRPC
- Redis caching for performance optimization

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Geutebruck API Developer
2025-12-09 09:19:27 +01:00
parent fbebe10711
commit 4866a8edc3
5 changed files with 869 additions and 3 deletions

View File

@@ -158,12 +158,12 @@ async def root():
}
# Register routers
from routers import auth
from routers import auth, cameras
app.include_router(auth.router)
app.include_router(cameras.router)
# TODO: Add remaining routers as phases complete
# from routers import cameras, monitors, crossswitch
# app.include_router(cameras.router)
# from routers import monitors, crossswitch
# app.include_router(monitors.router)
# app.include_router(crossswitch.router)

293
src/api/routers/cameras.py Normal file
View File

@@ -0,0 +1,293 @@
"""
Camera router for camera discovery and information
"""
from fastapi import APIRouter, Depends, status, HTTPException, Query
from fastapi.responses import JSONResponse
import structlog
from schemas.camera import CameraListResponse, CameraDetailResponse
from services.camera_service import CameraService
from middleware.auth_middleware import require_viewer, get_current_user
from models.user import User
logger = structlog.get_logger()
router = APIRouter(
prefix="/api/v1/cameras",
tags=["cameras"]
)
@router.get(
"",
response_model=CameraListResponse,
status_code=status.HTTP_200_OK,
summary="List all cameras",
description="Get list of all cameras discovered from GeViScope",
dependencies=[Depends(require_viewer)] # Requires at least viewer role
)
async def list_cameras(
use_cache: bool = Query(True, description="Use Redis cache (60s TTL)"),
current_user: User = Depends(require_viewer)
):
"""
Get list of all cameras from GeViScope SDK Bridge
**Authentication Required:**
- Minimum role: Viewer (all authenticated users can read cameras)
**Query Parameters:**
- `use_cache`: Use Redis cache (default: true, TTL: 60s)
**Response:**
- `cameras`: List of camera objects
- `total`: Total number of cameras
**Caching:**
- Results are cached in Redis for 60 seconds
- Set `use_cache=false` to bypass cache and fetch fresh data
**Camera Object:**
- `id`: Camera ID (channel number)
- `name`: Camera name
- `description`: Camera description
- `has_ptz`: PTZ capability flag
- `has_video_sensor`: Video sensor flag
- `status`: Camera status (online, offline, unknown)
- `last_seen`: Last seen timestamp
"""
camera_service = CameraService()
logger.info("list_cameras_request",
user_id=str(current_user.id),
username=current_user.username,
use_cache=use_cache)
result = await camera_service.list_cameras(use_cache=use_cache)
logger.info("list_cameras_response",
user_id=str(current_user.id),
count=result["total"])
return result
@router.get(
"/{camera_id}",
response_model=CameraDetailResponse,
status_code=status.HTTP_200_OK,
summary="Get camera details",
description="Get detailed information about a specific camera",
dependencies=[Depends(require_viewer)] # Requires at least viewer role
)
async def get_camera(
camera_id: int,
use_cache: bool = Query(True, description="Use Redis cache (60s TTL)"),
current_user: User = Depends(require_viewer)
):
"""
Get detailed information about a specific camera
**Authentication Required:**
- Minimum role: Viewer (all authenticated users can read cameras)
**Path Parameters:**
- `camera_id`: Camera ID (channel number)
**Query Parameters:**
- `use_cache`: Use Redis cache (default: true, TTL: 60s)
**Response:**
- Camera object with detailed information
**Errors:**
- `404 Not Found`: Camera with specified ID does not exist
"""
camera_service = CameraService()
logger.info("get_camera_request",
user_id=str(current_user.id),
username=current_user.username,
camera_id=camera_id,
use_cache=use_cache)
camera = await camera_service.get_camera(camera_id, use_cache=use_cache)
if not camera:
logger.warning("camera_not_found",
user_id=str(current_user.id),
camera_id=camera_id)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Camera with ID {camera_id} not found"
)
logger.info("get_camera_response",
user_id=str(current_user.id),
camera_id=camera_id)
return camera
@router.post(
"/refresh",
response_model=CameraListResponse,
status_code=status.HTTP_200_OK,
summary="Refresh camera list",
description="Force refresh camera list from SDK Bridge (bypass cache)",
dependencies=[Depends(require_viewer)]
)
async def refresh_cameras(
current_user: User = Depends(require_viewer)
):
"""
Force refresh camera list from GeViScope SDK Bridge
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- Fresh camera list from SDK Bridge
**Note:**
- This endpoint bypasses Redis cache and fetches fresh data
- Use this when you need real-time camera status
- Cache is automatically invalidated and updated with fresh data
"""
camera_service = CameraService()
logger.info("refresh_cameras_request",
user_id=str(current_user.id),
username=current_user.username)
result = await camera_service.refresh_camera_list()
logger.info("refresh_cameras_response",
user_id=str(current_user.id),
count=result["total"])
return result
@router.get(
"/search/{query}",
response_model=CameraListResponse,
status_code=status.HTTP_200_OK,
summary="Search cameras",
description="Search cameras by name or description",
dependencies=[Depends(require_viewer)]
)
async def search_cameras(
query: str,
current_user: User = Depends(require_viewer)
):
"""
Search cameras by name or description
**Authentication Required:**
- Minimum role: Viewer
**Path Parameters:**
- `query`: Search query string (case-insensitive)
**Response:**
- List of cameras matching the search query
**Search:**
- Searches camera name and description fields
- Case-insensitive partial match
"""
camera_service = CameraService()
logger.info("search_cameras_request",
user_id=str(current_user.id),
username=current_user.username,
query=query)
cameras = await camera_service.search_cameras(query)
logger.info("search_cameras_response",
user_id=str(current_user.id),
query=query,
matches=len(cameras))
return {
"cameras": cameras,
"total": len(cameras)
}
@router.get(
"/filter/online",
response_model=CameraListResponse,
status_code=status.HTTP_200_OK,
summary="Get online cameras",
description="Get list of online cameras only",
dependencies=[Depends(require_viewer)]
)
async def get_online_cameras(
current_user: User = Depends(require_viewer)
):
"""
Get list of online cameras only
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- List of cameras with status="online"
"""
camera_service = CameraService()
logger.info("get_online_cameras_request",
user_id=str(current_user.id),
username=current_user.username)
cameras = await camera_service.get_online_cameras()
logger.info("get_online_cameras_response",
user_id=str(current_user.id),
count=len(cameras))
return {
"cameras": cameras,
"total": len(cameras)
}
@router.get(
"/filter/ptz",
response_model=CameraListResponse,
status_code=status.HTTP_200_OK,
summary="Get PTZ cameras",
description="Get list of cameras with PTZ capabilities",
dependencies=[Depends(require_viewer)]
)
async def get_ptz_cameras(
current_user: User = Depends(require_viewer)
):
"""
Get list of cameras with PTZ capabilities
**Authentication Required:**
- Minimum role: Viewer
**Response:**
- List of cameras with has_ptz=true
"""
camera_service = CameraService()
logger.info("get_ptz_cameras_request",
user_id=str(current_user.id),
username=current_user.username)
cameras = await camera_service.get_ptz_cameras()
logger.info("get_ptz_cameras_response",
user_id=str(current_user.id),
count=len(cameras))
return {
"cameras": cameras,
"total": len(cameras)
}

117
src/api/schemas/camera.py Normal file
View File

@@ -0,0 +1,117 @@
"""
Camera schemas for request/response validation
"""
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
class CameraInfo(BaseModel):
"""Camera information schema"""
id: int = Field(..., description="Camera ID (channel number in GeViScope)")
name: str = Field(..., description="Camera name")
description: Optional[str] = Field(None, description="Camera description")
has_ptz: bool = Field(default=False, description="Whether camera has PTZ capabilities")
has_video_sensor: bool = Field(default=False, description="Whether camera has video sensor (motion detection)")
status: str = Field(..., description="Camera status (online, offline, unknown)")
last_seen: Optional[datetime] = Field(None, description="Last time camera was seen online")
model_config = {
"from_attributes": True,
"json_schema_extra": {
"examples": [
{
"id": 1,
"name": "Entrance Camera",
"description": "Main entrance monitoring",
"has_ptz": True,
"has_video_sensor": True,
"status": "online",
"last_seen": "2025-12-09T10:30:00Z"
}
]
}
}
class CameraListResponse(BaseModel):
"""Response schema for camera list endpoint"""
cameras: list[CameraInfo] = Field(..., description="List of cameras")
total: int = Field(..., description="Total number of cameras")
model_config = {
"json_schema_extra": {
"examples": [
{
"cameras": [
{
"id": 1,
"name": "Entrance Camera",
"description": "Main entrance",
"has_ptz": True,
"has_video_sensor": True,
"status": "online",
"last_seen": "2025-12-09T10:30:00Z"
},
{
"id": 2,
"name": "Parking Lot",
"description": "Parking area monitoring",
"has_ptz": False,
"has_video_sensor": True,
"status": "online",
"last_seen": "2025-12-09T10:30:00Z"
}
],
"total": 2
}
]
}
}
class CameraDetailResponse(BaseModel):
"""Response schema for single camera detail"""
id: int = Field(..., description="Camera ID")
name: str = Field(..., description="Camera name")
description: Optional[str] = Field(None, description="Camera description")
has_ptz: bool = Field(default=False, description="PTZ capability")
has_video_sensor: bool = Field(default=False, description="Video sensor capability")
status: str = Field(..., description="Camera status")
last_seen: Optional[datetime] = Field(None, description="Last seen timestamp")
# Additional details that might be available
channel_id: Optional[int] = Field(None, description="Physical channel ID")
ip_address: Optional[str] = Field(None, description="Camera IP address")
model: Optional[str] = Field(None, description="Camera model")
firmware_version: Optional[str] = Field(None, description="Firmware version")
model_config = {
"from_attributes": True,
"json_schema_extra": {
"examples": [
{
"id": 1,
"name": "Entrance Camera",
"description": "Main entrance monitoring",
"has_ptz": True,
"has_video_sensor": True,
"status": "online",
"last_seen": "2025-12-09T10:30:00Z",
"channel_id": 1,
"ip_address": "192.168.1.100",
"model": "Geutebruck G-Cam/E2510",
"firmware_version": "7.9.975.68"
}
]
}
}
class CameraStatusEnum:
"""Camera status constants"""
ONLINE = "online"
OFFLINE = "offline"
UNKNOWN = "unknown"
ERROR = "error"
MAINTENANCE = "maintenance"

View File

@@ -0,0 +1,203 @@
"""
Camera service for managing camera discovery and information
"""
from typing import List, Optional, Dict, Any
from datetime import datetime
import structlog
from clients.sdk_bridge_client import sdk_bridge_client
from clients.redis_client import redis_client
from config import settings
logger = structlog.get_logger()
# Redis cache TTL for camera data (60 seconds)
CAMERA_CACHE_TTL = 60
class CameraService:
"""Service for camera operations"""
def __init__(self):
"""Initialize camera service"""
pass
async def list_cameras(self, use_cache: bool = True) -> Dict[str, Any]:
"""
Get list of all cameras from SDK Bridge
Args:
use_cache: Whether to use Redis cache (default: True)
Returns:
Dictionary with 'cameras' list and 'total' count
"""
cache_key = "cameras:list"
# Try to get from cache first
if use_cache:
cached_data = await redis_client.get_json(cache_key)
if cached_data:
logger.info("camera_list_cache_hit")
return cached_data
logger.info("camera_list_cache_miss_fetching_from_sdk")
try:
# Fetch cameras from SDK Bridge via gRPC
cameras = await sdk_bridge_client.list_cameras()
# Transform to response format
result = {
"cameras": cameras,
"total": len(cameras)
}
# Cache the result
if use_cache:
await redis_client.set_json(cache_key, result, expire=CAMERA_CACHE_TTL)
logger.info("camera_list_cached", count=len(cameras), ttl=CAMERA_CACHE_TTL)
return result
except Exception as e:
logger.error("camera_list_failed", error=str(e), exc_info=True)
# Return empty list on error
return {"cameras": [], "total": 0}
async def get_camera(self, camera_id: int, use_cache: bool = True) -> Optional[Dict[str, Any]]:
"""
Get single camera by ID
Args:
camera_id: Camera ID (channel number)
use_cache: Whether to use Redis cache (default: True)
Returns:
Camera dictionary or None if not found
"""
cache_key = f"cameras:detail:{camera_id}"
# Try to get from cache first
if use_cache:
cached_data = await redis_client.get_json(cache_key)
if cached_data:
logger.info("camera_detail_cache_hit", camera_id=camera_id)
return cached_data
logger.info("camera_detail_cache_miss_fetching_from_sdk", camera_id=camera_id)
try:
# Fetch camera from SDK Bridge via gRPC
camera = await sdk_bridge_client.get_camera(camera_id)
if not camera:
logger.warning("camera_not_found", camera_id=camera_id)
return None
# Cache the result
if use_cache:
await redis_client.set_json(cache_key, camera, expire=CAMERA_CACHE_TTL)
logger.info("camera_detail_cached", camera_id=camera_id, ttl=CAMERA_CACHE_TTL)
return camera
except Exception as e:
logger.error("camera_detail_failed", camera_id=camera_id, error=str(e), exc_info=True)
return None
async def invalidate_cache(self, camera_id: Optional[int] = None) -> None:
"""
Invalidate camera cache
Args:
camera_id: Specific camera ID to invalidate, or None to invalidate all
"""
if camera_id is not None:
# Invalidate specific camera
cache_key = f"cameras:detail:{camera_id}"
await redis_client.delete(cache_key)
logger.info("camera_cache_invalidated", camera_id=camera_id)
else:
# Invalidate camera list cache
await redis_client.delete("cameras:list")
logger.info("camera_list_cache_invalidated")
async def refresh_camera_list(self) -> Dict[str, Any]:
"""
Force refresh camera list from SDK Bridge (bypass cache)
Returns:
Dictionary with 'cameras' list and 'total' count
"""
logger.info("camera_list_force_refresh")
# Invalidate cache first
await self.invalidate_cache()
# Fetch fresh data
return await self.list_cameras(use_cache=False)
async def get_camera_count(self) -> int:
"""
Get total number of cameras
Returns:
Total camera count
"""
result = await self.list_cameras(use_cache=True)
return result["total"]
async def search_cameras(self, query: str) -> List[Dict[str, Any]]:
"""
Search cameras by name or description
Args:
query: Search query string
Returns:
List of matching cameras
"""
result = await self.list_cameras(use_cache=True)
cameras = result["cameras"]
# Simple case-insensitive search
query_lower = query.lower()
matching = [
cam for cam in cameras
if query_lower in cam.get("name", "").lower()
or query_lower in cam.get("description", "").lower()
]
logger.info("camera_search", query=query, matches=len(matching))
return matching
async def get_online_cameras(self) -> List[Dict[str, Any]]:
"""
Get list of online cameras only
Returns:
List of online cameras
"""
result = await self.list_cameras(use_cache=True)
cameras = result["cameras"]
online = [cam for cam in cameras if cam.get("status") == "online"]
logger.info("online_cameras_retrieved", count=len(online), total=len(cameras))
return online
async def get_ptz_cameras(self) -> List[Dict[str, Any]]:
"""
Get list of cameras with PTZ capabilities
Returns:
List of PTZ cameras
"""
result = await self.list_cameras(use_cache=True)
cameras = result["cameras"]
ptz_cameras = [cam for cam in cameras if cam.get("has_ptz", False)]
logger.info("ptz_cameras_retrieved", count=len(ptz_cameras), total=len(cameras))
return ptz_cameras

View File

@@ -0,0 +1,253 @@
"""
Contract tests for camera API endpoints
These tests define the expected behavior - they will FAIL until implementation is complete
"""
import pytest
from httpx import AsyncClient
from fastapi import status
@pytest.mark.asyncio
class TestCamerasList:
"""Contract tests for GET /api/v1/cameras"""
async def test_list_cameras_success_admin(self, async_client: AsyncClient, auth_token: str):
"""Test listing cameras with admin authentication"""
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify response structure
assert "cameras" in data
assert "total" in data
assert isinstance(data["cameras"], list)
assert isinstance(data["total"], int)
# If cameras exist, verify camera structure
if data["cameras"]:
camera = data["cameras"][0]
assert "id" in camera
assert "name" in camera
assert "description" in camera
assert "has_ptz" in camera
assert "has_video_sensor" in camera
assert "status" in camera
async def test_list_cameras_success_operator(self, async_client: AsyncClient, operator_token: str):
"""Test listing cameras with operator role"""
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "cameras" in data
async def test_list_cameras_success_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test listing cameras with viewer role (read-only)"""
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "cameras" in data
async def test_list_cameras_no_auth(self, async_client: AsyncClient):
"""Test listing cameras without authentication"""
response = await async_client.get("/api/v1/cameras")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
data = response.json()
assert "error" in data or "detail" in data
async def test_list_cameras_invalid_token(self, async_client: AsyncClient):
"""Test listing cameras with invalid token"""
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": "Bearer invalid_token_here"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_list_cameras_expired_token(self, async_client: AsyncClient, expired_token: str):
"""Test listing cameras with expired token"""
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_list_cameras_caching(self, async_client: AsyncClient, auth_token: str):
"""Test that camera list is cached (second request should be faster)"""
# First request - cache miss
response1 = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response1.status_code == status.HTTP_200_OK
# Second request - cache hit
response2 = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response2.status_code == status.HTTP_200_OK
# Results should be identical
assert response1.json() == response2.json()
async def test_list_cameras_empty_result(self, async_client: AsyncClient, auth_token: str):
"""Test listing cameras when none are available"""
# This test assumes SDK Bridge might return empty list
response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "cameras" in data
assert data["total"] >= 0 # Can be 0 if no cameras
@pytest.mark.asyncio
class TestCameraDetail:
"""Contract tests for GET /api/v1/cameras/{camera_id}"""
async def test_get_camera_success(self, async_client: AsyncClient, auth_token: str):
"""Test getting single camera details"""
# First get list to find a valid camera ID
list_response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
cameras = list_response.json()["cameras"]
if not cameras:
pytest.skip("No cameras available for testing")
camera_id = cameras[0]["id"]
# Now get camera detail
response = await async_client.get(
f"/api/v1/cameras/{camera_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify camera structure
assert data["id"] == camera_id
assert "name" in data
assert "description" in data
assert "has_ptz" in data
assert "has_video_sensor" in data
assert "status" in data
async def test_get_camera_not_found(self, async_client: AsyncClient, auth_token: str):
"""Test getting non-existent camera"""
response = await async_client.get(
"/api/v1/cameras/99999", # Non-existent ID
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert "error" in data or "detail" in data
async def test_get_camera_invalid_id(self, async_client: AsyncClient, auth_token: str):
"""Test getting camera with invalid ID format"""
response = await async_client.get(
"/api/v1/cameras/invalid",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Should return 422 (validation error) or 404 (not found)
assert response.status_code in [status.HTTP_422_UNPROCESSABLE_ENTITY, status.HTTP_404_NOT_FOUND]
async def test_get_camera_no_auth(self, async_client: AsyncClient):
"""Test getting camera without authentication"""
response = await async_client.get("/api/v1/cameras/1")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_get_camera_all_roles(self, async_client: AsyncClient, auth_token: str,
operator_token: str, viewer_token: str):
"""Test that all roles can read camera details"""
# All roles (viewer, operator, administrator) should be able to read cameras
for token in [viewer_token, operator_token, auth_token]:
response = await async_client.get(
"/api/v1/cameras/1",
headers={"Authorization": f"Bearer {token}"}
)
# Should succeed or return 404 (if camera doesn't exist), but not 403
assert response.status_code in [status.HTTP_200_OK, status.HTTP_404_NOT_FOUND]
async def test_get_camera_caching(self, async_client: AsyncClient, auth_token: str):
"""Test that camera details are cached"""
camera_id = 1
# First request - cache miss
response1 = await async_client.get(
f"/api/v1/cameras/{camera_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Second request - cache hit (if camera exists)
response2 = await async_client.get(
f"/api/v1/cameras/{camera_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
# Both should have same status code
assert response1.status_code == response2.status_code
# If successful, results should be identical
if response1.status_code == status.HTTP_200_OK:
assert response1.json() == response2.json()
@pytest.mark.asyncio
class TestCameraIntegration:
"""Integration tests for camera endpoints with SDK Bridge"""
async def test_camera_data_consistency(self, async_client: AsyncClient, auth_token: str):
"""Test that camera data is consistent between list and detail endpoints"""
# Get camera list
list_response = await async_client.get(
"/api/v1/cameras",
headers={"Authorization": f"Bearer {auth_token}"}
)
if list_response.status_code != status.HTTP_200_OK:
pytest.skip("Camera list not available")
cameras = list_response.json()["cameras"]
if not cameras:
pytest.skip("No cameras available")
# Get first camera detail
camera_id = cameras[0]["id"]
detail_response = await async_client.get(
f"/api/v1/cameras/{camera_id}",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert detail_response.status_code == status.HTTP_200_OK
# Verify consistency
list_camera = cameras[0]
detail_camera = detail_response.json()
assert list_camera["id"] == detail_camera["id"]
assert list_camera["name"] == detail_camera["name"]
assert list_camera["status"] == detail_camera["status"]