diff --git a/src/api/main.py b/src/api/main.py index 800e0ba..657610d 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -158,14 +158,11 @@ async def root(): } # Register routers -from routers import auth, cameras, monitors +from routers import auth, cameras, monitors, crossswitch app.include_router(auth.router) app.include_router(cameras.router) app.include_router(monitors.router) - -# TODO: Add remaining routers as phases complete -# from routers import crossswitch -# app.include_router(crossswitch.router) +app.include_router(crossswitch.router) if __name__ == "__main__": import uvicorn diff --git a/src/api/migrations/versions/20251209_crossswitch_routes.py b/src/api/migrations/versions/20251209_crossswitch_routes.py new file mode 100644 index 0000000..5e80131 --- /dev/null +++ b/src/api/migrations/versions/20251209_crossswitch_routes.py @@ -0,0 +1,68 @@ +"""Add crossswitch_routes table + +Revision ID: 20251209_crossswitch +Revises: 20251208_initial_schema +Create Date: 2025-12-09 12:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + +# revision identifiers, used by Alembic. +revision = '20251209_crossswitch' +down_revision = '20251208_initial_schema' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Create crossswitch_routes table""" + + # Create crossswitch_routes table + op.create_table( + 'crossswitch_routes', + sa.Column('id', UUID(as_uuid=True), primary_key=True, nullable=False), + sa.Column('camera_id', sa.Integer(), nullable=False, comment='Camera ID (source)'), + sa.Column('monitor_id', sa.Integer(), nullable=False, comment='Monitor ID (destination)'), + sa.Column('mode', sa.Integer(), nullable=True, default=0, comment='Cross-switch mode (0=normal)'), + sa.Column('executed_at', sa.DateTime(), nullable=False), + sa.Column('executed_by', UUID(as_uuid=True), nullable=True), + sa.Column('is_active', sa.Integer(), nullable=False, default=1, comment='1=active route, 0=cleared/historical'), + sa.Column('cleared_at', sa.DateTime(), nullable=True, comment='When this route was cleared'), + sa.Column('cleared_by', UUID(as_uuid=True), nullable=True), + sa.Column('details', JSONB, nullable=True, comment='Additional route details'), + sa.Column('sdk_success', sa.Integer(), nullable=False, default=1, comment='1=SDK success, 0=SDK failure'), + sa.Column('sdk_error', sa.String(500), nullable=True, comment='SDK error message if failed'), + + # Foreign keys + sa.ForeignKeyConstraint(['executed_by'], ['users.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['cleared_by'], ['users.id'], ondelete='SET NULL'), + ) + + # Create indexes for common queries + op.create_index('idx_active_routes', 'crossswitch_routes', ['is_active', 'monitor_id']) + op.create_index('idx_camera_history', 'crossswitch_routes', ['camera_id', 'executed_at']) + op.create_index('idx_monitor_history', 'crossswitch_routes', ['monitor_id', 'executed_at']) + op.create_index('idx_user_routes', 'crossswitch_routes', ['executed_by', 'executed_at']) + + # Create index for single-column lookups + op.create_index('idx_camera_id', 'crossswitch_routes', ['camera_id']) + op.create_index('idx_monitor_id', 'crossswitch_routes', ['monitor_id']) + op.create_index('idx_executed_at', 'crossswitch_routes', ['executed_at']) + + +def downgrade() -> None: + """Drop crossswitch_routes table""" + + # Drop indexes + op.drop_index('idx_executed_at', table_name='crossswitch_routes') + op.drop_index('idx_monitor_id', table_name='crossswitch_routes') + op.drop_index('idx_camera_id', table_name='crossswitch_routes') + op.drop_index('idx_user_routes', table_name='crossswitch_routes') + op.drop_index('idx_monitor_history', table_name='crossswitch_routes') + op.drop_index('idx_camera_history', table_name='crossswitch_routes') + op.drop_index('idx_active_routes', table_name='crossswitch_routes') + + # Drop table + op.drop_table('crossswitch_routes') diff --git a/src/api/models/crossswitch_route.py b/src/api/models/crossswitch_route.py new file mode 100644 index 0000000..c7ccbe5 --- /dev/null +++ b/src/api/models/crossswitch_route.py @@ -0,0 +1,122 @@ +""" +CrossSwitchRoute model for storing cross-switching history and current state +""" +from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Index +from sqlalchemy.dialects.postgresql import UUID, JSONB +from datetime import datetime +import uuid + +from models import Base + + +class CrossSwitchRoute(Base): + """ + Model for cross-switch routing records + + Stores both current routing state and historical routing changes + """ + __tablename__ = "crossswitch_routes" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + + # Route information + camera_id = Column(Integer, nullable=False, index=True, comment="Camera ID (source)") + monitor_id = Column(Integer, nullable=False, index=True, comment="Monitor ID (destination)") + mode = Column(Integer, default=0, comment="Cross-switch mode (0=normal, other modes per SDK)") + + # Execution tracking + executed_at = Column(DateTime, nullable=False, default=datetime.utcnow, index=True) + executed_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + + # Status tracking + is_active = Column(Integer, default=1, nullable=False, index=True, comment="1=active route, 0=cleared/historical") + cleared_at = Column(DateTime, nullable=True, comment="When this route was cleared (if cleared)") + cleared_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + + # Additional metadata + details = Column(JSONB, nullable=True, comment="Additional route details (camera name, monitor name, etc.)") + + # SDK response tracking + sdk_success = Column(Integer, default=1, nullable=False, comment="1=SDK reported success, 0=SDK reported failure") + sdk_error = Column(String(500), nullable=True, comment="SDK error message if failed") + + # Indexes for common queries + __table_args__ = ( + # Index for getting current active routes + Index('idx_active_routes', 'is_active', 'monitor_id'), + # Index for getting route history by camera + Index('idx_camera_history', 'camera_id', 'executed_at'), + # Index for getting route history by monitor + Index('idx_monitor_history', 'monitor_id', 'executed_at'), + # Index for getting user's routing actions + Index('idx_user_routes', 'executed_by', 'executed_at'), + ) + + def __repr__(self): + return f"" + + @classmethod + def create_route( + cls, + camera_id: int, + monitor_id: int, + executed_by: uuid.UUID, + mode: int = 0, + sdk_success: bool = True, + sdk_error: str = None, + details: dict = None + ): + """ + Factory method to create a new route record + + Args: + camera_id: Camera ID + monitor_id: Monitor ID + executed_by: User ID who executed the route + mode: Cross-switch mode (default: 0) + sdk_success: Whether SDK reported success + sdk_error: SDK error message if failed + details: Additional metadata + + Returns: + CrossSwitchRoute instance + """ + return cls( + camera_id=camera_id, + monitor_id=monitor_id, + mode=mode, + executed_by=executed_by, + executed_at=datetime.utcnow(), + is_active=1 if sdk_success else 0, + sdk_success=1 if sdk_success else 0, + sdk_error=sdk_error, + details=details or {} + ) + + def clear_route(self, cleared_by: uuid.UUID): + """ + Mark this route as cleared + + Args: + cleared_by: User ID who cleared the route + """ + self.is_active = 0 + self.cleared_at = datetime.utcnow() + self.cleared_by = cleared_by + + def to_dict(self): + """Convert to dictionary for API responses""" + return { + "id": str(self.id), + "camera_id": self.camera_id, + "monitor_id": self.monitor_id, + "mode": self.mode, + "executed_at": self.executed_at.isoformat() if self.executed_at else None, + "executed_by": str(self.executed_by) if self.executed_by else None, + "is_active": bool(self.is_active), + "cleared_at": self.cleared_at.isoformat() if self.cleared_at else None, + "cleared_by": str(self.cleared_by) if self.cleared_by else None, + "details": self.details, + "sdk_success": bool(self.sdk_success), + "sdk_error": self.sdk_error + } diff --git a/src/api/routers/crossswitch.py b/src/api/routers/crossswitch.py new file mode 100644 index 0000000..dd0803c --- /dev/null +++ b/src/api/routers/crossswitch.py @@ -0,0 +1,302 @@ +""" +Cross-switch router for camera-to-monitor routing operations +""" +from fastapi import APIRouter, Depends, status, HTTPException, Query, Request +from sqlalchemy.ext.asyncio import AsyncSession +import structlog + +from models import get_db +from schemas.crossswitch import ( + CrossSwitchRequest, + CrossSwitchResponse, + ClearMonitorRequest, + ClearMonitorResponse, + RoutingStateResponse, + RouteHistoryResponse +) +from services.crossswitch_service import CrossSwitchService +from middleware.auth_middleware import ( + require_operator, + require_viewer, + get_current_user, + get_client_ip +) +from models.user import User + +logger = structlog.get_logger() + +router = APIRouter( + prefix="/api/v1/crossswitch", + tags=["crossswitch"] +) + + +@router.post( + "", + response_model=CrossSwitchResponse, + status_code=status.HTTP_200_OK, + summary="Execute cross-switch", + description="Route a camera to a monitor (requires Operator role or higher)", + dependencies=[Depends(require_operator)] # Requires at least operator role +) +async def execute_crossswitch( + request: Request, + crossswitch_request: CrossSwitchRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_operator) +): + """ + Execute cross-switch operation (route camera to monitor) + + **Authentication Required:** + - Minimum role: Operator + - Viewers cannot execute cross-switching (read-only) + + **Request Body:** + - `camera_id`: Camera ID to display (must be positive integer) + - `monitor_id`: Monitor ID to display on (must be positive integer) + - `mode`: Cross-switch mode (default: 0=normal, optional) + + **Response:** + - `success`: Whether operation succeeded + - `message`: Success message + - `route`: Route information including execution details + + **Side Effects:** + - Clears any existing camera on the target monitor + - Creates database record of routing change + - Creates audit log entry + - Invalidates monitor cache + + **Errors:** + - `400 Bad Request`: Invalid camera or monitor ID + - `403 Forbidden`: User does not have Operator role + - `404 Not Found`: Camera or monitor not found + - `500 Internal Server Error`: SDK Bridge communication failure + """ + crossswitch_service = CrossSwitchService(db) + + logger.info("execute_crossswitch_request", + user_id=str(current_user.id), + username=current_user.username, + camera_id=crossswitch_request.camera_id, + monitor_id=crossswitch_request.monitor_id, + mode=crossswitch_request.mode) + + try: + result = await crossswitch_service.execute_crossswitch( + camera_id=crossswitch_request.camera_id, + monitor_id=crossswitch_request.monitor_id, + user_id=current_user.id, + username=current_user.username, + mode=crossswitch_request.mode, + ip_address=get_client_ip(request) + ) + + logger.info("execute_crossswitch_success", + user_id=str(current_user.id), + camera_id=crossswitch_request.camera_id, + monitor_id=crossswitch_request.monitor_id) + + return result + + except Exception as e: + logger.error("execute_crossswitch_failed", + user_id=str(current_user.id), + camera_id=crossswitch_request.camera_id, + monitor_id=crossswitch_request.monitor_id, + error=str(e), + exc_info=True) + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Cross-switch operation failed: {str(e)}" + ) + + +@router.post( + "/clear", + response_model=ClearMonitorResponse, + status_code=status.HTTP_200_OK, + summary="Clear monitor", + description="Clear camera from monitor (requires Operator role or higher)", + dependencies=[Depends(require_operator)] # Requires at least operator role +) +async def clear_monitor( + request: Request, + clear_request: ClearMonitorRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_operator) +): + """ + Clear monitor (remove camera from monitor) + + **Authentication Required:** + - Minimum role: Operator + - Viewers cannot clear monitors (read-only) + + **Request Body:** + - `monitor_id`: Monitor ID to clear (must be positive integer) + + **Response:** + - `success`: Whether operation succeeded + - `message`: Success message + - `monitor_id`: Monitor ID that was cleared + + **Side Effects:** + - Marks existing route as cleared in database + - Creates audit log entry + - Invalidates monitor cache + + **Errors:** + - `400 Bad Request`: Invalid monitor ID + - `403 Forbidden`: User does not have Operator role + - `404 Not Found`: Monitor not found + - `500 Internal Server Error`: SDK Bridge communication failure + """ + crossswitch_service = CrossSwitchService(db) + + logger.info("clear_monitor_request", + user_id=str(current_user.id), + username=current_user.username, + monitor_id=clear_request.monitor_id) + + try: + result = await crossswitch_service.clear_monitor( + monitor_id=clear_request.monitor_id, + user_id=current_user.id, + username=current_user.username, + ip_address=get_client_ip(request) + ) + + logger.info("clear_monitor_success", + user_id=str(current_user.id), + monitor_id=clear_request.monitor_id) + + return result + + except Exception as e: + logger.error("clear_monitor_failed", + user_id=str(current_user.id), + monitor_id=clear_request.monitor_id, + error=str(e), + exc_info=True) + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Clear monitor operation failed: {str(e)}" + ) + + +@router.get( + "/routing", + response_model=RoutingStateResponse, + status_code=status.HTTP_200_OK, + summary="Get routing state", + description="Get current routing state (active camera-to-monitor mappings)", + dependencies=[Depends(require_viewer)] # All authenticated users can view +) +async def get_routing_state( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer) +): + """ + Get current routing state (active routes) + + **Authentication Required:** + - Minimum role: Viewer (all authenticated users can view routing state) + + **Response:** + - `routes`: List of active route objects + - `total`: Total number of active routes + + **Route Object:** + - `id`: Route UUID + - `camera_id`: Camera ID + - `monitor_id`: Monitor ID + - `mode`: Cross-switch mode + - `executed_at`: When route was executed + - `executed_by`: User ID who executed + - `is_active`: Whether route is active (always true for this endpoint) + - `camera_name`: Camera name (if available) + - `monitor_name`: Monitor name (if available) + """ + crossswitch_service = CrossSwitchService(db) + + logger.info("get_routing_state_request", + user_id=str(current_user.id), + username=current_user.username) + + result = await crossswitch_service.get_routing_state() + + logger.info("get_routing_state_response", + user_id=str(current_user.id), + count=result["total"]) + + return result + + +@router.get( + "/history", + response_model=RouteHistoryResponse, + status_code=status.HTTP_200_OK, + summary="Get routing history", + description="Get historical routing records (all routes including cleared)", + dependencies=[Depends(require_viewer)] # All authenticated users can view +) +async def get_routing_history( + limit: int = Query(100, ge=1, le=1000, description="Maximum records to return"), + offset: int = Query(0, ge=0, description="Number of records to skip"), + camera_id: Optional[int] = Query(None, gt=0, description="Filter by camera ID"), + monitor_id: Optional[int] = Query(None, gt=0, description="Filter by monitor ID"), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(require_viewer) +): + """ + Get routing history (all routes including cleared) + + **Authentication Required:** + - Minimum role: Viewer + + **Query Parameters:** + - `limit`: Maximum records to return (1-1000, default: 100) + - `offset`: Number of records to skip (default: 0) + - `camera_id`: Filter by camera ID (optional) + - `monitor_id`: Filter by monitor ID (optional) + + **Response:** + - `history`: List of historical route objects + - `total`: Total number of historical records (before pagination) + - `limit`: Applied limit + - `offset`: Applied offset + + **Use Cases:** + - Audit trail of all routing changes + - Investigate when a camera was last displayed on a monitor + - Track operator actions + """ + from typing import Optional # Import for type hints + + crossswitch_service = CrossSwitchService(db) + + logger.info("get_routing_history_request", + user_id=str(current_user.id), + username=current_user.username, + limit=limit, + offset=offset, + camera_id=camera_id, + monitor_id=monitor_id) + + result = await crossswitch_service.get_routing_history( + limit=limit, + offset=offset, + camera_id=camera_id, + monitor_id=monitor_id + ) + + logger.info("get_routing_history_response", + user_id=str(current_user.id), + count=len(result["history"]), + total=result["total"]) + + return result diff --git a/src/api/schemas/crossswitch.py b/src/api/schemas/crossswitch.py new file mode 100644 index 0000000..62cc797 --- /dev/null +++ b/src/api/schemas/crossswitch.py @@ -0,0 +1,203 @@ +""" +Cross-switch schemas for request/response validation +""" +from pydantic import BaseModel, Field, field_validator +from typing import Optional, List +from datetime import datetime + + +class CrossSwitchRequest(BaseModel): + """Request schema for executing cross-switch""" + camera_id: int = Field(..., gt=0, description="Camera ID (must be positive)") + monitor_id: int = Field(..., gt=0, description="Monitor ID (must be positive)") + mode: int = Field(default=0, ge=0, description="Cross-switch mode (default: 0=normal)") + + @field_validator('camera_id', 'monitor_id') + @classmethod + def validate_positive_id(cls, v: int) -> int: + """Ensure IDs are positive""" + if v <= 0: + raise ValueError('ID must be positive') + return v + + model_config = { + "json_schema_extra": { + "examples": [ + { + "camera_id": 1, + "monitor_id": 1, + "mode": 0 + } + ] + } + } + + +class ClearMonitorRequest(BaseModel): + """Request schema for clearing a monitor""" + monitor_id: int = Field(..., gt=0, description="Monitor ID to clear (must be positive)") + + @field_validator('monitor_id') + @classmethod + def validate_positive_id(cls, v: int) -> int: + """Ensure monitor ID is positive""" + if v <= 0: + raise ValueError('Monitor ID must be positive') + return v + + model_config = { + "json_schema_extra": { + "examples": [ + { + "monitor_id": 1 + } + ] + } + } + + +class RouteInfo(BaseModel): + """Route information schema""" + id: str = Field(..., description="Route UUID") + camera_id: int = Field(..., description="Camera ID") + monitor_id: int = Field(..., description="Monitor ID") + mode: int = Field(default=0, description="Cross-switch mode") + executed_at: datetime = Field(..., description="When route was executed") + executed_by: Optional[str] = Field(None, description="User ID who executed the route") + executed_by_username: Optional[str] = Field(None, description="Username who executed the route") + is_active: bool = Field(..., description="Whether route is currently active") + camera_name: Optional[str] = Field(None, description="Camera name") + monitor_name: Optional[str] = Field(None, description="Monitor name") + + model_config = { + "from_attributes": True, + "json_schema_extra": { + "examples": [ + { + "id": "550e8400-e29b-41d4-a716-446655440000", + "camera_id": 1, + "monitor_id": 1, + "mode": 0, + "executed_at": "2025-12-09T10:30:00Z", + "executed_by": "550e8400-e29b-41d4-a716-446655440001", + "executed_by_username": "operator", + "is_active": True, + "camera_name": "Entrance Camera", + "monitor_name": "Control Room Monitor 1" + } + ] + } + } + + +class CrossSwitchResponse(BaseModel): + """Response schema for successful cross-switch execution""" + success: bool = Field(..., description="Whether operation succeeded") + message: str = Field(..., description="Success message") + route: RouteInfo = Field(..., description="Route information") + + model_config = { + "json_schema_extra": { + "examples": [ + { + "success": True, + "message": "Successfully switched camera 1 to monitor 1", + "route": { + "id": "550e8400-e29b-41d4-a716-446655440000", + "camera_id": 1, + "monitor_id": 1, + "mode": 0, + "executed_at": "2025-12-09T10:30:00Z", + "executed_by": "550e8400-e29b-41d4-a716-446655440001", + "executed_by_username": "operator", + "is_active": True, + "camera_name": "Entrance Camera", + "monitor_name": "Control Room Monitor 1" + } + } + ] + } + } + + +class ClearMonitorResponse(BaseModel): + """Response schema for successful clear monitor operation""" + success: bool = Field(..., description="Whether operation succeeded") + message: str = Field(..., description="Success message") + monitor_id: int = Field(..., description="Monitor ID that was cleared") + + model_config = { + "json_schema_extra": { + "examples": [ + { + "success": True, + "message": "Successfully cleared monitor 1", + "monitor_id": 1 + } + ] + } + } + + +class RoutingStateResponse(BaseModel): + """Response schema for routing state query""" + routes: List[RouteInfo] = Field(..., description="List of active routes") + total: int = Field(..., description="Total number of active routes") + + model_config = { + "json_schema_extra": { + "examples": [ + { + "routes": [ + { + "id": "550e8400-e29b-41d4-a716-446655440000", + "camera_id": 1, + "monitor_id": 1, + "mode": 0, + "executed_at": "2025-12-09T10:30:00Z", + "executed_by": "550e8400-e29b-41d4-a716-446655440001", + "executed_by_username": "operator", + "is_active": True, + "camera_name": "Entrance Camera", + "monitor_name": "Control Room Monitor 1" + } + ], + "total": 1 + } + ] + } + } + + +class RouteHistoryResponse(BaseModel): + """Response schema for routing history query""" + history: List[RouteInfo] = Field(..., description="List of historical routes") + total: int = Field(..., description="Total number of historical records") + limit: int = Field(..., description="Pagination limit") + offset: int = Field(..., description="Pagination offset") + + model_config = { + "json_schema_extra": { + "examples": [ + { + "history": [ + { + "id": "550e8400-e29b-41d4-a716-446655440000", + "camera_id": 1, + "monitor_id": 1, + "mode": 0, + "executed_at": "2025-12-09T10:30:00Z", + "executed_by": "550e8400-e29b-41d4-a716-446655440001", + "executed_by_username": "operator", + "is_active": False, + "camera_name": "Entrance Camera", + "monitor_name": "Control Room Monitor 1" + } + ], + "total": 50, + "limit": 10, + "offset": 0 + } + ] + } + } diff --git a/src/api/services/crossswitch_service.py b/src/api/services/crossswitch_service.py new file mode 100644 index 0000000..65579e1 --- /dev/null +++ b/src/api/services/crossswitch_service.py @@ -0,0 +1,410 @@ +""" +Cross-switch service for managing camera-to-monitor routing +""" +from typing import List, Optional, Dict, Any +from datetime import datetime +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_, desc +import uuid +import structlog + +from models.crossswitch_route import CrossSwitchRoute +from models.audit_log import AuditLog +from clients.sdk_bridge_client import sdk_bridge_client +from clients.redis_client import redis_client + +logger = structlog.get_logger() + + +class CrossSwitchService: + """Service for cross-switching operations""" + + def __init__(self, db_session: AsyncSession): + self.db = db_session + + async def execute_crossswitch( + self, + camera_id: int, + monitor_id: int, + user_id: uuid.UUID, + username: str, + mode: int = 0, + ip_address: Optional[str] = None + ) -> Dict[str, Any]: + """ + Execute cross-switch operation (route camera to monitor) + + Args: + camera_id: Camera ID + monitor_id: Monitor ID + user_id: User ID executing the operation + username: Username executing the operation + mode: Cross-switch mode (default: 0) + ip_address: Client IP address for audit logging + + Returns: + Dictionary with success status, message, and route info + + Raises: + Exception: If SDK Bridge communication fails + """ + logger.info("crossswitch_execute_request", + camera_id=camera_id, + monitor_id=monitor_id, + user_id=str(user_id), + username=username, + mode=mode) + + # First, clear any existing route for this monitor + await self._clear_monitor_routes(monitor_id, user_id) + + # Execute cross-switch via SDK Bridge + try: + success = await sdk_bridge_client.execute_crossswitch( + camera_id=camera_id, + monitor_id=monitor_id, + mode=mode + ) + + sdk_success = True + sdk_error = None + + except Exception as e: + logger.error("crossswitch_sdk_failed", + camera_id=camera_id, + monitor_id=monitor_id, + error=str(e), + exc_info=True) + sdk_success = False + sdk_error = str(e) + + # Get camera and monitor names for details + details = await self._get_route_details(camera_id, monitor_id) + + # Create database record + route = CrossSwitchRoute.create_route( + camera_id=camera_id, + monitor_id=monitor_id, + executed_by=user_id, + mode=mode, + sdk_success=sdk_success, + sdk_error=sdk_error, + details=details + ) + + self.db.add(route) + await self.db.commit() + await self.db.refresh(route) + + # Create audit log + await self._create_audit_log( + action="crossswitch.execute", + target=f"camera:{camera_id}->monitor:{monitor_id}", + outcome="success" if sdk_success else "failure", + details={ + "camera_id": camera_id, + "monitor_id": monitor_id, + "mode": mode, + "sdk_success": sdk_success, + "sdk_error": sdk_error + }, + user_id=user_id, + ip_address=ip_address + ) + + # Invalidate caches + await redis_client.delete("monitors:list") + await redis_client.delete(f"monitors:detail:{monitor_id}") + + if not sdk_success: + logger.error("crossswitch_failed", + camera_id=camera_id, + monitor_id=monitor_id, + error=sdk_error) + raise Exception(f"Cross-switch failed: {sdk_error}") + + logger.info("crossswitch_success", + camera_id=camera_id, + monitor_id=monitor_id, + route_id=str(route.id)) + + return { + "success": True, + "message": f"Successfully switched camera {camera_id} to monitor {monitor_id}", + "route": { + "id": str(route.id), + "camera_id": route.camera_id, + "monitor_id": route.monitor_id, + "mode": route.mode, + "executed_at": route.executed_at, + "executed_by": str(route.executed_by), + "executed_by_username": username, + "is_active": bool(route.is_active), + "camera_name": details.get("camera_name"), + "monitor_name": details.get("monitor_name") + } + } + + async def clear_monitor( + self, + monitor_id: int, + user_id: uuid.UUID, + username: str, + ip_address: Optional[str] = None + ) -> Dict[str, Any]: + """ + Clear monitor (remove camera from monitor) + + Args: + monitor_id: Monitor ID to clear + user_id: User ID executing the operation + username: Username executing the operation + ip_address: Client IP address for audit logging + + Returns: + Dictionary with success status and message + + Raises: + Exception: If SDK Bridge communication fails + """ + logger.info("clear_monitor_request", + monitor_id=monitor_id, + user_id=str(user_id), + username=username) + + # Execute clear via SDK Bridge + try: + success = await sdk_bridge_client.clear_monitor(monitor_id) + sdk_success = True + sdk_error = None + + except Exception as e: + logger.error("clear_monitor_sdk_failed", + monitor_id=monitor_id, + error=str(e), + exc_info=True) + sdk_success = False + sdk_error = str(e) + + # Mark existing routes as cleared in database + await self._clear_monitor_routes(monitor_id, user_id) + + # Create audit log + await self._create_audit_log( + action="crossswitch.clear", + target=f"monitor:{monitor_id}", + outcome="success" if sdk_success else "failure", + details={ + "monitor_id": monitor_id, + "sdk_success": sdk_success, + "sdk_error": sdk_error + }, + user_id=user_id, + ip_address=ip_address + ) + + # Invalidate caches + await redis_client.delete("monitors:list") + await redis_client.delete(f"monitors:detail:{monitor_id}") + + if not sdk_success: + logger.error("clear_monitor_failed", + monitor_id=monitor_id, + error=sdk_error) + raise Exception(f"Clear monitor failed: {sdk_error}") + + logger.info("clear_monitor_success", monitor_id=monitor_id) + + return { + "success": True, + "message": f"Successfully cleared monitor {monitor_id}", + "monitor_id": monitor_id + } + + async def get_routing_state(self) -> Dict[str, Any]: + """ + Get current routing state (active routes) + + Returns: + Dictionary with list of active routes + """ + logger.info("get_routing_state_request") + + # Query active routes from database + result = await self.db.execute( + select(CrossSwitchRoute) + .where(CrossSwitchRoute.is_active == 1) + .order_by(desc(CrossSwitchRoute.executed_at)) + ) + routes = result.scalars().all() + + # Transform to response format + routes_list = [ + { + "id": str(route.id), + "camera_id": route.camera_id, + "monitor_id": route.monitor_id, + "mode": route.mode, + "executed_at": route.executed_at, + "executed_by": str(route.executed_by) if route.executed_by else None, + "is_active": bool(route.is_active), + "camera_name": route.details.get("camera_name") if route.details else None, + "monitor_name": route.details.get("monitor_name") if route.details else None + } + for route in routes + ] + + logger.info("get_routing_state_response", count=len(routes_list)) + + return { + "routes": routes_list, + "total": len(routes_list) + } + + async def get_routing_history( + self, + limit: int = 100, + offset: int = 0, + camera_id: Optional[int] = None, + monitor_id: Optional[int] = None + ) -> Dict[str, Any]: + """ + Get routing history (all routes including cleared) + + Args: + limit: Maximum number of records to return + offset: Number of records to skip + camera_id: Filter by camera ID (optional) + monitor_id: Filter by monitor ID (optional) + + Returns: + Dictionary with historical routes and pagination info + """ + logger.info("get_routing_history_request", + limit=limit, + offset=offset, + camera_id=camera_id, + monitor_id=monitor_id) + + # Build query with optional filters + query = select(CrossSwitchRoute).order_by(desc(CrossSwitchRoute.executed_at)) + + if camera_id is not None: + query = query.where(CrossSwitchRoute.camera_id == camera_id) + + if monitor_id is not None: + query = query.where(CrossSwitchRoute.monitor_id == monitor_id) + + # Get total count + count_result = await self.db.execute(query) + total = len(count_result.scalars().all()) + + # Apply pagination + query = query.limit(limit).offset(offset) + + result = await self.db.execute(query) + routes = result.scalars().all() + + # Transform to response format + history_list = [route.to_dict() for route in routes] + + logger.info("get_routing_history_response", + count=len(history_list), + total=total) + + return { + "history": history_list, + "total": total, + "limit": limit, + "offset": offset + } + + async def _clear_monitor_routes(self, monitor_id: int, cleared_by: uuid.UUID) -> None: + """ + Mark all active routes for a monitor as cleared + + Args: + monitor_id: Monitor ID + cleared_by: User ID who is clearing the routes + """ + result = await self.db.execute( + select(CrossSwitchRoute) + .where(and_( + CrossSwitchRoute.monitor_id == monitor_id, + CrossSwitchRoute.is_active == 1 + )) + ) + active_routes = result.scalars().all() + + for route in active_routes: + route.clear_route(cleared_by) + + if active_routes: + await self.db.commit() + logger.info("monitor_routes_cleared", + monitor_id=monitor_id, + count=len(active_routes)) + + async def _get_route_details(self, camera_id: int, monitor_id: int) -> Dict[str, Any]: + """ + Get additional details for route (camera/monitor names) + + Args: + camera_id: Camera ID + monitor_id: Monitor ID + + Returns: + Dictionary with camera and monitor names + """ + details = {} + + try: + # Get camera name (from cache if available) + camera_data = await redis_client.get_json(f"cameras:detail:{camera_id}") + if camera_data: + details["camera_name"] = camera_data.get("name") + + # Get monitor name (from cache if available) + monitor_data = await redis_client.get_json(f"monitors:detail:{monitor_id}") + if monitor_data: + details["monitor_name"] = monitor_data.get("name") + + except Exception as e: + logger.warning("failed_to_get_route_details", error=str(e)) + + return details + + async def _create_audit_log( + self, + action: str, + target: str, + outcome: str, + details: Optional[Dict[str, Any]] = None, + user_id: Optional[uuid.UUID] = None, + ip_address: Optional[str] = None + ) -> None: + """ + Create audit log entry + + Args: + action: Action name + target: Target of action + outcome: Outcome (success, failure, error) + details: Additional details + user_id: User ID + ip_address: Client IP address + """ + try: + audit_log = AuditLog( + user_id=user_id, + action=action, + target=target, + outcome=outcome, + details=details, + ip_address=ip_address + ) + self.db.add(audit_log) + await self.db.commit() + except Exception as e: + logger.error("audit_log_creation_failed", action=action, error=str(e)) + await self.db.rollback() diff --git a/src/api/tests/test_crossswitch_api.py b/src/api/tests/test_crossswitch_api.py new file mode 100644 index 0000000..c6362af --- /dev/null +++ b/src/api/tests/test_crossswitch_api.py @@ -0,0 +1,382 @@ +""" +Contract tests for cross-switch API endpoints +These tests define the expected behavior - they will FAIL until implementation is complete +""" +import pytest +from httpx import AsyncClient +from fastapi import status + + +@pytest.mark.asyncio +class TestCrossSwitchExecution: + """Contract tests for POST /api/v1/crossswitch""" + + async def test_crossswitch_success_operator(self, async_client: AsyncClient, operator_token: str): + """Test successful cross-switch with operator role""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "monitor_id": 1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + + # Verify response structure + assert "success" in data + assert data["success"] is True + assert "message" in data + assert "route" in data + + # Verify route details + route = data["route"] + assert route["camera_id"] == 1 + assert route["monitor_id"] == 1 + assert "executed_at" in route + assert "executed_by" in route + + async def test_crossswitch_success_administrator(self, async_client: AsyncClient, auth_token: str): + """Test successful cross-switch with administrator role""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 2, + "monitor_id": 2, + "mode": 0 + }, + headers={"Authorization": f"Bearer {auth_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["success"] is True + + async def test_crossswitch_forbidden_viewer(self, async_client: AsyncClient, viewer_token: str): + """Test that viewer role cannot execute cross-switch""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "monitor_id": 1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + data = response.json() + assert "error" in data or "detail" in data + + async def test_crossswitch_no_auth(self, async_client: AsyncClient): + """Test cross-switch without authentication""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "monitor_id": 1, + "mode": 0 + } + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + async def test_crossswitch_invalid_camera(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with invalid camera ID""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 99999, # Non-existent camera + "monitor_id": 1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + # Should return 400 or 404 depending on implementation + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND] + + async def test_crossswitch_invalid_monitor(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with invalid monitor ID""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "monitor_id": 99999, # Non-existent monitor + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND] + + async def test_crossswitch_missing_camera_id(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with missing camera_id""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "monitor_id": 1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + async def test_crossswitch_missing_monitor_id(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with missing monitor_id""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + async def test_crossswitch_negative_ids(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with negative IDs""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": -1, + "monitor_id": -1, + "mode": 0 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY] + + async def test_crossswitch_default_mode(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch with default mode (mode not specified)""" + response = await async_client.post( + "/api/v1/crossswitch", + json={ + "camera_id": 1, + "monitor_id": 1 + }, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + # Should succeed with default mode=0 + assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND] + + +@pytest.mark.asyncio +class TestClearMonitor: + """Contract tests for POST /api/v1/crossswitch/clear""" + + async def test_clear_monitor_success_operator(self, async_client: AsyncClient, operator_token: str): + """Test successful clear monitor with operator role""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 1}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + + assert "success" in data + assert data["success"] is True + assert "message" in data + assert "monitor_id" in data + assert data["monitor_id"] == 1 + + async def test_clear_monitor_success_administrator(self, async_client: AsyncClient, auth_token: str): + """Test successful clear monitor with administrator role""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 2}, + headers={"Authorization": f"Bearer {auth_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["success"] is True + + async def test_clear_monitor_forbidden_viewer(self, async_client: AsyncClient, viewer_token: str): + """Test that viewer role cannot clear monitor""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 1}, + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + async def test_clear_monitor_no_auth(self, async_client: AsyncClient): + """Test clear monitor without authentication""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 1} + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + async def test_clear_monitor_invalid_id(self, async_client: AsyncClient, operator_token: str): + """Test clear monitor with invalid monitor ID""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 99999}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND] + + async def test_clear_monitor_missing_id(self, async_client: AsyncClient, operator_token: str): + """Test clear monitor with missing monitor_id""" + response = await async_client.post( + "/api/v1/crossswitch/clear", + json={}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + +@pytest.mark.asyncio +class TestRoutingState: + """Contract tests for GET /api/v1/crossswitch/routing""" + + async def test_get_routing_state_viewer(self, async_client: AsyncClient, viewer_token: str): + """Test getting routing state with viewer role""" + response = await async_client.get( + "/api/v1/crossswitch/routing", + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + + # Verify response structure + assert "routes" in data + assert isinstance(data["routes"], list) + + async def test_get_routing_state_operator(self, async_client: AsyncClient, operator_token: str): + """Test getting routing state with operator role""" + response = await async_client.get( + "/api/v1/crossswitch/routing", + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert "routes" in data + + async def test_get_routing_state_no_auth(self, async_client: AsyncClient): + """Test getting routing state without authentication""" + response = await async_client.get("/api/v1/crossswitch/routing") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + async def test_routing_state_structure(self, async_client: AsyncClient, viewer_token: str): + """Test routing state response structure""" + response = await async_client.get( + "/api/v1/crossswitch/routing", + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + + # Verify structure + if data["routes"]: + route = data["routes"][0] + assert "monitor_id" in route + assert "camera_id" in route + assert "executed_at" in route + assert "executed_by" in route + + +@pytest.mark.asyncio +class TestRoutingHistory: + """Contract tests for GET /api/v1/crossswitch/history""" + + async def test_get_routing_history_viewer(self, async_client: AsyncClient, viewer_token: str): + """Test getting routing history with viewer role""" + response = await async_client.get( + "/api/v1/crossswitch/history", + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + + assert "history" in data + assert "total" in data + assert isinstance(data["history"], list) + + async def test_get_routing_history_pagination(self, async_client: AsyncClient, viewer_token: str): + """Test routing history with pagination""" + response = await async_client.get( + "/api/v1/crossswitch/history?limit=10&offset=0", + headers={"Authorization": f"Bearer {viewer_token}"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert len(data["history"]) <= 10 + + async def test_get_routing_history_no_auth(self, async_client: AsyncClient): + """Test getting routing history without authentication""" + response = await async_client.get("/api/v1/crossswitch/history") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +@pytest.mark.asyncio +class TestCrossSwitchIntegration: + """Integration tests for complete cross-switch workflow""" + + async def test_crossswitch_then_query_state(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch execution followed by state query""" + # Execute cross-switch + switch_response = await async_client.post( + "/api/v1/crossswitch", + json={"camera_id": 1, "monitor_id": 1, "mode": 0}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + if switch_response.status_code != status.HTTP_200_OK: + pytest.skip("Cross-switch not available") + + # Query routing state + state_response = await async_client.get( + "/api/v1/crossswitch/routing", + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert state_response.status_code == status.HTTP_200_OK + routes = state_response.json()["routes"] + + # Verify the route exists in state + assert any(r["monitor_id"] == 1 and r["camera_id"] == 1 for r in routes) + + async def test_crossswitch_then_clear(self, async_client: AsyncClient, operator_token: str): + """Test cross-switch followed by clear monitor""" + # Execute cross-switch + switch_response = await async_client.post( + "/api/v1/crossswitch", + json={"camera_id": 1, "monitor_id": 1, "mode": 0}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + if switch_response.status_code != status.HTTP_200_OK: + pytest.skip("Cross-switch not available") + + # Clear the monitor + clear_response = await async_client.post( + "/api/v1/crossswitch/clear", + json={"monitor_id": 1}, + headers={"Authorization": f"Bearer {operator_token}"} + ) + + assert clear_response.status_code == status.HTTP_200_OK + assert clear_response.json()["success"] is True