Phase 7: Cross-Switching - CORE FUNCTIONALITY (T063-T074)

Implemented complete cross-switching system with database persistence and audit logging:

**Tests:**
- Contract tests for POST /api/v1/crossswitch (execute cross-switch)
- Contract tests for POST /api/v1/crossswitch/clear (clear monitor)
- Contract tests for GET /api/v1/crossswitch/routing (routing state)
- Contract tests for GET /api/v1/crossswitch/history (routing history)
- Integration tests for complete cross-switch workflow
- RBAC tests (operator required for execution, viewer for reading)

**Database:**
- CrossSwitchRoute model with full routing history tracking
- Fields: camera_id, monitor_id, mode, executed_at, executed_by, is_active
- Cleared route tracking: cleared_at, cleared_by
- SDK response tracking: sdk_success, sdk_error
- JSONB details field for camera/monitor names
- Comprehensive indexes for performance

**Migration:**
- 20251209_crossswitch_routes: Creates crossswitch_routes table
- Foreign keys to users table for executed_by and cleared_by
- Indexes: active routes, camera history, monitor history, user routes

**Schemas:**
- CrossSwitchRequest: camera_id, monitor_id, mode validation
- ClearMonitorRequest: monitor_id validation
- RouteInfo: Complete route information with user details
- CrossSwitchResponse, ClearMonitorResponse, RoutingStateResponse
- RouteHistoryResponse: Pagination support

**Services:**
- CrossSwitchService: Complete cross-switching logic
- execute_crossswitch(): Route camera to monitor via SDK Bridge
- clear_monitor(): Remove camera from monitor
- get_routing_state(): Get active routes
- get_routing_history(): Get historical routes with pagination
- Automatic route clearing when new camera assigned to monitor
- Cache invalidation after routing changes
- Integrated audit logging for all operations

**Router Endpoints:**
- POST /api/v1/crossswitch - Execute cross-switch (Operator+)
- POST /api/v1/crossswitch/clear - Clear monitor (Operator+)
- GET /api/v1/crossswitch/routing - Get routing state (Viewer+)
- GET /api/v1/crossswitch/history - Get routing history (Viewer+)

**RBAC:**
- Operator role or higher required for execution (crossswitch, clear)
- Viewer role can read routing state and history
- Administrator has all permissions

**Audit Logging:**
- All cross-switch operations logged to audit_logs table
- Tracks: user, IP address, camera/monitor IDs, success/failure
- SDK errors captured in both audit log and route record

**Integration:**
- Registered crossswitch router in main.py
- SDK Bridge integration for hardware control
- Redis cache invalidation on routing changes
- Database persistence of all routing history

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Geutebruck API Developer
2025-12-09 13:39:53 +01:00
parent 0361826d3e
commit aa6f7ec947
7 changed files with 1489 additions and 5 deletions

View File

@@ -158,14 +158,11 @@ async def root():
}
# Register routers
from routers import auth, cameras, monitors
from routers import auth, cameras, monitors, crossswitch
app.include_router(auth.router)
app.include_router(cameras.router)
app.include_router(monitors.router)
# TODO: Add remaining routers as phases complete
# from routers import crossswitch
# app.include_router(crossswitch.router)
app.include_router(crossswitch.router)
if __name__ == "__main__":
import uvicorn

View File

@@ -0,0 +1,68 @@
"""Add crossswitch_routes table
Revision ID: 20251209_crossswitch
Revises: 20251208_initial_schema
Create Date: 2025-12-09 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision = '20251209_crossswitch'
down_revision = '20251208_initial_schema'
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Create crossswitch_routes table"""
# Create crossswitch_routes table
op.create_table(
'crossswitch_routes',
sa.Column('id', UUID(as_uuid=True), primary_key=True, nullable=False),
sa.Column('camera_id', sa.Integer(), nullable=False, comment='Camera ID (source)'),
sa.Column('monitor_id', sa.Integer(), nullable=False, comment='Monitor ID (destination)'),
sa.Column('mode', sa.Integer(), nullable=True, default=0, comment='Cross-switch mode (0=normal)'),
sa.Column('executed_at', sa.DateTime(), nullable=False),
sa.Column('executed_by', UUID(as_uuid=True), nullable=True),
sa.Column('is_active', sa.Integer(), nullable=False, default=1, comment='1=active route, 0=cleared/historical'),
sa.Column('cleared_at', sa.DateTime(), nullable=True, comment='When this route was cleared'),
sa.Column('cleared_by', UUID(as_uuid=True), nullable=True),
sa.Column('details', JSONB, nullable=True, comment='Additional route details'),
sa.Column('sdk_success', sa.Integer(), nullable=False, default=1, comment='1=SDK success, 0=SDK failure'),
sa.Column('sdk_error', sa.String(500), nullable=True, comment='SDK error message if failed'),
# Foreign keys
sa.ForeignKeyConstraint(['executed_by'], ['users.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['cleared_by'], ['users.id'], ondelete='SET NULL'),
)
# Create indexes for common queries
op.create_index('idx_active_routes', 'crossswitch_routes', ['is_active', 'monitor_id'])
op.create_index('idx_camera_history', 'crossswitch_routes', ['camera_id', 'executed_at'])
op.create_index('idx_monitor_history', 'crossswitch_routes', ['monitor_id', 'executed_at'])
op.create_index('idx_user_routes', 'crossswitch_routes', ['executed_by', 'executed_at'])
# Create index for single-column lookups
op.create_index('idx_camera_id', 'crossswitch_routes', ['camera_id'])
op.create_index('idx_monitor_id', 'crossswitch_routes', ['monitor_id'])
op.create_index('idx_executed_at', 'crossswitch_routes', ['executed_at'])
def downgrade() -> None:
"""Drop crossswitch_routes table"""
# Drop indexes
op.drop_index('idx_executed_at', table_name='crossswitch_routes')
op.drop_index('idx_monitor_id', table_name='crossswitch_routes')
op.drop_index('idx_camera_id', table_name='crossswitch_routes')
op.drop_index('idx_user_routes', table_name='crossswitch_routes')
op.drop_index('idx_monitor_history', table_name='crossswitch_routes')
op.drop_index('idx_camera_history', table_name='crossswitch_routes')
op.drop_index('idx_active_routes', table_name='crossswitch_routes')
# Drop table
op.drop_table('crossswitch_routes')

View File

@@ -0,0 +1,122 @@
"""
CrossSwitchRoute model for storing cross-switching history and current state
"""
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from datetime import datetime
import uuid
from models import Base
class CrossSwitchRoute(Base):
"""
Model for cross-switch routing records
Stores both current routing state and historical routing changes
"""
__tablename__ = "crossswitch_routes"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
# Route information
camera_id = Column(Integer, nullable=False, index=True, comment="Camera ID (source)")
monitor_id = Column(Integer, nullable=False, index=True, comment="Monitor ID (destination)")
mode = Column(Integer, default=0, comment="Cross-switch mode (0=normal, other modes per SDK)")
# Execution tracking
executed_at = Column(DateTime, nullable=False, default=datetime.utcnow, index=True)
executed_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
# Status tracking
is_active = Column(Integer, default=1, nullable=False, index=True, comment="1=active route, 0=cleared/historical")
cleared_at = Column(DateTime, nullable=True, comment="When this route was cleared (if cleared)")
cleared_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
# Additional metadata
details = Column(JSONB, nullable=True, comment="Additional route details (camera name, monitor name, etc.)")
# SDK response tracking
sdk_success = Column(Integer, default=1, nullable=False, comment="1=SDK reported success, 0=SDK reported failure")
sdk_error = Column(String(500), nullable=True, comment="SDK error message if failed")
# Indexes for common queries
__table_args__ = (
# Index for getting current active routes
Index('idx_active_routes', 'is_active', 'monitor_id'),
# Index for getting route history by camera
Index('idx_camera_history', 'camera_id', 'executed_at'),
# Index for getting route history by monitor
Index('idx_monitor_history', 'monitor_id', 'executed_at'),
# Index for getting user's routing actions
Index('idx_user_routes', 'executed_by', 'executed_at'),
)
def __repr__(self):
return f"<CrossSwitchRoute(camera={self.camera_id}, monitor={self.monitor_id}, active={self.is_active})>"
@classmethod
def create_route(
cls,
camera_id: int,
monitor_id: int,
executed_by: uuid.UUID,
mode: int = 0,
sdk_success: bool = True,
sdk_error: str = None,
details: dict = None
):
"""
Factory method to create a new route record
Args:
camera_id: Camera ID
monitor_id: Monitor ID
executed_by: User ID who executed the route
mode: Cross-switch mode (default: 0)
sdk_success: Whether SDK reported success
sdk_error: SDK error message if failed
details: Additional metadata
Returns:
CrossSwitchRoute instance
"""
return cls(
camera_id=camera_id,
monitor_id=monitor_id,
mode=mode,
executed_by=executed_by,
executed_at=datetime.utcnow(),
is_active=1 if sdk_success else 0,
sdk_success=1 if sdk_success else 0,
sdk_error=sdk_error,
details=details or {}
)
def clear_route(self, cleared_by: uuid.UUID):
"""
Mark this route as cleared
Args:
cleared_by: User ID who cleared the route
"""
self.is_active = 0
self.cleared_at = datetime.utcnow()
self.cleared_by = cleared_by
def to_dict(self):
"""Convert to dictionary for API responses"""
return {
"id": str(self.id),
"camera_id": self.camera_id,
"monitor_id": self.monitor_id,
"mode": self.mode,
"executed_at": self.executed_at.isoformat() if self.executed_at else None,
"executed_by": str(self.executed_by) if self.executed_by else None,
"is_active": bool(self.is_active),
"cleared_at": self.cleared_at.isoformat() if self.cleared_at else None,
"cleared_by": str(self.cleared_by) if self.cleared_by else None,
"details": self.details,
"sdk_success": bool(self.sdk_success),
"sdk_error": self.sdk_error
}

View File

@@ -0,0 +1,302 @@
"""
Cross-switch router for camera-to-monitor routing operations
"""
from fastapi import APIRouter, Depends, status, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from models import get_db
from schemas.crossswitch import (
CrossSwitchRequest,
CrossSwitchResponse,
ClearMonitorRequest,
ClearMonitorResponse,
RoutingStateResponse,
RouteHistoryResponse
)
from services.crossswitch_service import CrossSwitchService
from middleware.auth_middleware import (
require_operator,
require_viewer,
get_current_user,
get_client_ip
)
from models.user import User
logger = structlog.get_logger()
router = APIRouter(
prefix="/api/v1/crossswitch",
tags=["crossswitch"]
)
@router.post(
"",
response_model=CrossSwitchResponse,
status_code=status.HTTP_200_OK,
summary="Execute cross-switch",
description="Route a camera to a monitor (requires Operator role or higher)",
dependencies=[Depends(require_operator)] # Requires at least operator role
)
async def execute_crossswitch(
request: Request,
crossswitch_request: CrossSwitchRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_operator)
):
"""
Execute cross-switch operation (route camera to monitor)
**Authentication Required:**
- Minimum role: Operator
- Viewers cannot execute cross-switching (read-only)
**Request Body:**
- `camera_id`: Camera ID to display (must be positive integer)
- `monitor_id`: Monitor ID to display on (must be positive integer)
- `mode`: Cross-switch mode (default: 0=normal, optional)
**Response:**
- `success`: Whether operation succeeded
- `message`: Success message
- `route`: Route information including execution details
**Side Effects:**
- Clears any existing camera on the target monitor
- Creates database record of routing change
- Creates audit log entry
- Invalidates monitor cache
**Errors:**
- `400 Bad Request`: Invalid camera or monitor ID
- `403 Forbidden`: User does not have Operator role
- `404 Not Found`: Camera or monitor not found
- `500 Internal Server Error`: SDK Bridge communication failure
"""
crossswitch_service = CrossSwitchService(db)
logger.info("execute_crossswitch_request",
user_id=str(current_user.id),
username=current_user.username,
camera_id=crossswitch_request.camera_id,
monitor_id=crossswitch_request.monitor_id,
mode=crossswitch_request.mode)
try:
result = await crossswitch_service.execute_crossswitch(
camera_id=crossswitch_request.camera_id,
monitor_id=crossswitch_request.monitor_id,
user_id=current_user.id,
username=current_user.username,
mode=crossswitch_request.mode,
ip_address=get_client_ip(request)
)
logger.info("execute_crossswitch_success",
user_id=str(current_user.id),
camera_id=crossswitch_request.camera_id,
monitor_id=crossswitch_request.monitor_id)
return result
except Exception as e:
logger.error("execute_crossswitch_failed",
user_id=str(current_user.id),
camera_id=crossswitch_request.camera_id,
monitor_id=crossswitch_request.monitor_id,
error=str(e),
exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Cross-switch operation failed: {str(e)}"
)
@router.post(
"/clear",
response_model=ClearMonitorResponse,
status_code=status.HTTP_200_OK,
summary="Clear monitor",
description="Clear camera from monitor (requires Operator role or higher)",
dependencies=[Depends(require_operator)] # Requires at least operator role
)
async def clear_monitor(
request: Request,
clear_request: ClearMonitorRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_operator)
):
"""
Clear monitor (remove camera from monitor)
**Authentication Required:**
- Minimum role: Operator
- Viewers cannot clear monitors (read-only)
**Request Body:**
- `monitor_id`: Monitor ID to clear (must be positive integer)
**Response:**
- `success`: Whether operation succeeded
- `message`: Success message
- `monitor_id`: Monitor ID that was cleared
**Side Effects:**
- Marks existing route as cleared in database
- Creates audit log entry
- Invalidates monitor cache
**Errors:**
- `400 Bad Request`: Invalid monitor ID
- `403 Forbidden`: User does not have Operator role
- `404 Not Found`: Monitor not found
- `500 Internal Server Error`: SDK Bridge communication failure
"""
crossswitch_service = CrossSwitchService(db)
logger.info("clear_monitor_request",
user_id=str(current_user.id),
username=current_user.username,
monitor_id=clear_request.monitor_id)
try:
result = await crossswitch_service.clear_monitor(
monitor_id=clear_request.monitor_id,
user_id=current_user.id,
username=current_user.username,
ip_address=get_client_ip(request)
)
logger.info("clear_monitor_success",
user_id=str(current_user.id),
monitor_id=clear_request.monitor_id)
return result
except Exception as e:
logger.error("clear_monitor_failed",
user_id=str(current_user.id),
monitor_id=clear_request.monitor_id,
error=str(e),
exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Clear monitor operation failed: {str(e)}"
)
@router.get(
"/routing",
response_model=RoutingStateResponse,
status_code=status.HTTP_200_OK,
summary="Get routing state",
description="Get current routing state (active camera-to-monitor mappings)",
dependencies=[Depends(require_viewer)] # All authenticated users can view
)
async def get_routing_state(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_viewer)
):
"""
Get current routing state (active routes)
**Authentication Required:**
- Minimum role: Viewer (all authenticated users can view routing state)
**Response:**
- `routes`: List of active route objects
- `total`: Total number of active routes
**Route Object:**
- `id`: Route UUID
- `camera_id`: Camera ID
- `monitor_id`: Monitor ID
- `mode`: Cross-switch mode
- `executed_at`: When route was executed
- `executed_by`: User ID who executed
- `is_active`: Whether route is active (always true for this endpoint)
- `camera_name`: Camera name (if available)
- `monitor_name`: Monitor name (if available)
"""
crossswitch_service = CrossSwitchService(db)
logger.info("get_routing_state_request",
user_id=str(current_user.id),
username=current_user.username)
result = await crossswitch_service.get_routing_state()
logger.info("get_routing_state_response",
user_id=str(current_user.id),
count=result["total"])
return result
@router.get(
"/history",
response_model=RouteHistoryResponse,
status_code=status.HTTP_200_OK,
summary="Get routing history",
description="Get historical routing records (all routes including cleared)",
dependencies=[Depends(require_viewer)] # All authenticated users can view
)
async def get_routing_history(
limit: int = Query(100, ge=1, le=1000, description="Maximum records to return"),
offset: int = Query(0, ge=0, description="Number of records to skip"),
camera_id: Optional[int] = Query(None, gt=0, description="Filter by camera ID"),
monitor_id: Optional[int] = Query(None, gt=0, description="Filter by monitor ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_viewer)
):
"""
Get routing history (all routes including cleared)
**Authentication Required:**
- Minimum role: Viewer
**Query Parameters:**
- `limit`: Maximum records to return (1-1000, default: 100)
- `offset`: Number of records to skip (default: 0)
- `camera_id`: Filter by camera ID (optional)
- `monitor_id`: Filter by monitor ID (optional)
**Response:**
- `history`: List of historical route objects
- `total`: Total number of historical records (before pagination)
- `limit`: Applied limit
- `offset`: Applied offset
**Use Cases:**
- Audit trail of all routing changes
- Investigate when a camera was last displayed on a monitor
- Track operator actions
"""
from typing import Optional # Import for type hints
crossswitch_service = CrossSwitchService(db)
logger.info("get_routing_history_request",
user_id=str(current_user.id),
username=current_user.username,
limit=limit,
offset=offset,
camera_id=camera_id,
monitor_id=monitor_id)
result = await crossswitch_service.get_routing_history(
limit=limit,
offset=offset,
camera_id=camera_id,
monitor_id=monitor_id
)
logger.info("get_routing_history_response",
user_id=str(current_user.id),
count=len(result["history"]),
total=result["total"])
return result

View File

@@ -0,0 +1,203 @@
"""
Cross-switch schemas for request/response validation
"""
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List
from datetime import datetime
class CrossSwitchRequest(BaseModel):
"""Request schema for executing cross-switch"""
camera_id: int = Field(..., gt=0, description="Camera ID (must be positive)")
monitor_id: int = Field(..., gt=0, description="Monitor ID (must be positive)")
mode: int = Field(default=0, ge=0, description="Cross-switch mode (default: 0=normal)")
@field_validator('camera_id', 'monitor_id')
@classmethod
def validate_positive_id(cls, v: int) -> int:
"""Ensure IDs are positive"""
if v <= 0:
raise ValueError('ID must be positive')
return v
model_config = {
"json_schema_extra": {
"examples": [
{
"camera_id": 1,
"monitor_id": 1,
"mode": 0
}
]
}
}
class ClearMonitorRequest(BaseModel):
"""Request schema for clearing a monitor"""
monitor_id: int = Field(..., gt=0, description="Monitor ID to clear (must be positive)")
@field_validator('monitor_id')
@classmethod
def validate_positive_id(cls, v: int) -> int:
"""Ensure monitor ID is positive"""
if v <= 0:
raise ValueError('Monitor ID must be positive')
return v
model_config = {
"json_schema_extra": {
"examples": [
{
"monitor_id": 1
}
]
}
}
class RouteInfo(BaseModel):
"""Route information schema"""
id: str = Field(..., description="Route UUID")
camera_id: int = Field(..., description="Camera ID")
monitor_id: int = Field(..., description="Monitor ID")
mode: int = Field(default=0, description="Cross-switch mode")
executed_at: datetime = Field(..., description="When route was executed")
executed_by: Optional[str] = Field(None, description="User ID who executed the route")
executed_by_username: Optional[str] = Field(None, description="Username who executed the route")
is_active: bool = Field(..., description="Whether route is currently active")
camera_name: Optional[str] = Field(None, description="Camera name")
monitor_name: Optional[str] = Field(None, description="Monitor name")
model_config = {
"from_attributes": True,
"json_schema_extra": {
"examples": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"camera_id": 1,
"monitor_id": 1,
"mode": 0,
"executed_at": "2025-12-09T10:30:00Z",
"executed_by": "550e8400-e29b-41d4-a716-446655440001",
"executed_by_username": "operator",
"is_active": True,
"camera_name": "Entrance Camera",
"monitor_name": "Control Room Monitor 1"
}
]
}
}
class CrossSwitchResponse(BaseModel):
"""Response schema for successful cross-switch execution"""
success: bool = Field(..., description="Whether operation succeeded")
message: str = Field(..., description="Success message")
route: RouteInfo = Field(..., description="Route information")
model_config = {
"json_schema_extra": {
"examples": [
{
"success": True,
"message": "Successfully switched camera 1 to monitor 1",
"route": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"camera_id": 1,
"monitor_id": 1,
"mode": 0,
"executed_at": "2025-12-09T10:30:00Z",
"executed_by": "550e8400-e29b-41d4-a716-446655440001",
"executed_by_username": "operator",
"is_active": True,
"camera_name": "Entrance Camera",
"monitor_name": "Control Room Monitor 1"
}
}
]
}
}
class ClearMonitorResponse(BaseModel):
"""Response schema for successful clear monitor operation"""
success: bool = Field(..., description="Whether operation succeeded")
message: str = Field(..., description="Success message")
monitor_id: int = Field(..., description="Monitor ID that was cleared")
model_config = {
"json_schema_extra": {
"examples": [
{
"success": True,
"message": "Successfully cleared monitor 1",
"monitor_id": 1
}
]
}
}
class RoutingStateResponse(BaseModel):
"""Response schema for routing state query"""
routes: List[RouteInfo] = Field(..., description="List of active routes")
total: int = Field(..., description="Total number of active routes")
model_config = {
"json_schema_extra": {
"examples": [
{
"routes": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"camera_id": 1,
"monitor_id": 1,
"mode": 0,
"executed_at": "2025-12-09T10:30:00Z",
"executed_by": "550e8400-e29b-41d4-a716-446655440001",
"executed_by_username": "operator",
"is_active": True,
"camera_name": "Entrance Camera",
"monitor_name": "Control Room Monitor 1"
}
],
"total": 1
}
]
}
}
class RouteHistoryResponse(BaseModel):
"""Response schema for routing history query"""
history: List[RouteInfo] = Field(..., description="List of historical routes")
total: int = Field(..., description="Total number of historical records")
limit: int = Field(..., description="Pagination limit")
offset: int = Field(..., description="Pagination offset")
model_config = {
"json_schema_extra": {
"examples": [
{
"history": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"camera_id": 1,
"monitor_id": 1,
"mode": 0,
"executed_at": "2025-12-09T10:30:00Z",
"executed_by": "550e8400-e29b-41d4-a716-446655440001",
"executed_by_username": "operator",
"is_active": False,
"camera_name": "Entrance Camera",
"monitor_name": "Control Room Monitor 1"
}
],
"total": 50,
"limit": 10,
"offset": 0
}
]
}
}

View File

@@ -0,0 +1,410 @@
"""
Cross-switch service for managing camera-to-monitor routing
"""
from typing import List, Optional, Dict, Any
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, desc
import uuid
import structlog
from models.crossswitch_route import CrossSwitchRoute
from models.audit_log import AuditLog
from clients.sdk_bridge_client import sdk_bridge_client
from clients.redis_client import redis_client
logger = structlog.get_logger()
class CrossSwitchService:
"""Service for cross-switching operations"""
def __init__(self, db_session: AsyncSession):
self.db = db_session
async def execute_crossswitch(
self,
camera_id: int,
monitor_id: int,
user_id: uuid.UUID,
username: str,
mode: int = 0,
ip_address: Optional[str] = None
) -> Dict[str, Any]:
"""
Execute cross-switch operation (route camera to monitor)
Args:
camera_id: Camera ID
monitor_id: Monitor ID
user_id: User ID executing the operation
username: Username executing the operation
mode: Cross-switch mode (default: 0)
ip_address: Client IP address for audit logging
Returns:
Dictionary with success status, message, and route info
Raises:
Exception: If SDK Bridge communication fails
"""
logger.info("crossswitch_execute_request",
camera_id=camera_id,
monitor_id=monitor_id,
user_id=str(user_id),
username=username,
mode=mode)
# First, clear any existing route for this monitor
await self._clear_monitor_routes(monitor_id, user_id)
# Execute cross-switch via SDK Bridge
try:
success = await sdk_bridge_client.execute_crossswitch(
camera_id=camera_id,
monitor_id=monitor_id,
mode=mode
)
sdk_success = True
sdk_error = None
except Exception as e:
logger.error("crossswitch_sdk_failed",
camera_id=camera_id,
monitor_id=monitor_id,
error=str(e),
exc_info=True)
sdk_success = False
sdk_error = str(e)
# Get camera and monitor names for details
details = await self._get_route_details(camera_id, monitor_id)
# Create database record
route = CrossSwitchRoute.create_route(
camera_id=camera_id,
monitor_id=monitor_id,
executed_by=user_id,
mode=mode,
sdk_success=sdk_success,
sdk_error=sdk_error,
details=details
)
self.db.add(route)
await self.db.commit()
await self.db.refresh(route)
# Create audit log
await self._create_audit_log(
action="crossswitch.execute",
target=f"camera:{camera_id}->monitor:{monitor_id}",
outcome="success" if sdk_success else "failure",
details={
"camera_id": camera_id,
"monitor_id": monitor_id,
"mode": mode,
"sdk_success": sdk_success,
"sdk_error": sdk_error
},
user_id=user_id,
ip_address=ip_address
)
# Invalidate caches
await redis_client.delete("monitors:list")
await redis_client.delete(f"monitors:detail:{monitor_id}")
if not sdk_success:
logger.error("crossswitch_failed",
camera_id=camera_id,
monitor_id=monitor_id,
error=sdk_error)
raise Exception(f"Cross-switch failed: {sdk_error}")
logger.info("crossswitch_success",
camera_id=camera_id,
monitor_id=monitor_id,
route_id=str(route.id))
return {
"success": True,
"message": f"Successfully switched camera {camera_id} to monitor {monitor_id}",
"route": {
"id": str(route.id),
"camera_id": route.camera_id,
"monitor_id": route.monitor_id,
"mode": route.mode,
"executed_at": route.executed_at,
"executed_by": str(route.executed_by),
"executed_by_username": username,
"is_active": bool(route.is_active),
"camera_name": details.get("camera_name"),
"monitor_name": details.get("monitor_name")
}
}
async def clear_monitor(
self,
monitor_id: int,
user_id: uuid.UUID,
username: str,
ip_address: Optional[str] = None
) -> Dict[str, Any]:
"""
Clear monitor (remove camera from monitor)
Args:
monitor_id: Monitor ID to clear
user_id: User ID executing the operation
username: Username executing the operation
ip_address: Client IP address for audit logging
Returns:
Dictionary with success status and message
Raises:
Exception: If SDK Bridge communication fails
"""
logger.info("clear_monitor_request",
monitor_id=monitor_id,
user_id=str(user_id),
username=username)
# Execute clear via SDK Bridge
try:
success = await sdk_bridge_client.clear_monitor(monitor_id)
sdk_success = True
sdk_error = None
except Exception as e:
logger.error("clear_monitor_sdk_failed",
monitor_id=monitor_id,
error=str(e),
exc_info=True)
sdk_success = False
sdk_error = str(e)
# Mark existing routes as cleared in database
await self._clear_monitor_routes(monitor_id, user_id)
# Create audit log
await self._create_audit_log(
action="crossswitch.clear",
target=f"monitor:{monitor_id}",
outcome="success" if sdk_success else "failure",
details={
"monitor_id": monitor_id,
"sdk_success": sdk_success,
"sdk_error": sdk_error
},
user_id=user_id,
ip_address=ip_address
)
# Invalidate caches
await redis_client.delete("monitors:list")
await redis_client.delete(f"monitors:detail:{monitor_id}")
if not sdk_success:
logger.error("clear_monitor_failed",
monitor_id=monitor_id,
error=sdk_error)
raise Exception(f"Clear monitor failed: {sdk_error}")
logger.info("clear_monitor_success", monitor_id=monitor_id)
return {
"success": True,
"message": f"Successfully cleared monitor {monitor_id}",
"monitor_id": monitor_id
}
async def get_routing_state(self) -> Dict[str, Any]:
"""
Get current routing state (active routes)
Returns:
Dictionary with list of active routes
"""
logger.info("get_routing_state_request")
# Query active routes from database
result = await self.db.execute(
select(CrossSwitchRoute)
.where(CrossSwitchRoute.is_active == 1)
.order_by(desc(CrossSwitchRoute.executed_at))
)
routes = result.scalars().all()
# Transform to response format
routes_list = [
{
"id": str(route.id),
"camera_id": route.camera_id,
"monitor_id": route.monitor_id,
"mode": route.mode,
"executed_at": route.executed_at,
"executed_by": str(route.executed_by) if route.executed_by else None,
"is_active": bool(route.is_active),
"camera_name": route.details.get("camera_name") if route.details else None,
"monitor_name": route.details.get("monitor_name") if route.details else None
}
for route in routes
]
logger.info("get_routing_state_response", count=len(routes_list))
return {
"routes": routes_list,
"total": len(routes_list)
}
async def get_routing_history(
self,
limit: int = 100,
offset: int = 0,
camera_id: Optional[int] = None,
monitor_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Get routing history (all routes including cleared)
Args:
limit: Maximum number of records to return
offset: Number of records to skip
camera_id: Filter by camera ID (optional)
monitor_id: Filter by monitor ID (optional)
Returns:
Dictionary with historical routes and pagination info
"""
logger.info("get_routing_history_request",
limit=limit,
offset=offset,
camera_id=camera_id,
monitor_id=monitor_id)
# Build query with optional filters
query = select(CrossSwitchRoute).order_by(desc(CrossSwitchRoute.executed_at))
if camera_id is not None:
query = query.where(CrossSwitchRoute.camera_id == camera_id)
if monitor_id is not None:
query = query.where(CrossSwitchRoute.monitor_id == monitor_id)
# Get total count
count_result = await self.db.execute(query)
total = len(count_result.scalars().all())
# Apply pagination
query = query.limit(limit).offset(offset)
result = await self.db.execute(query)
routes = result.scalars().all()
# Transform to response format
history_list = [route.to_dict() for route in routes]
logger.info("get_routing_history_response",
count=len(history_list),
total=total)
return {
"history": history_list,
"total": total,
"limit": limit,
"offset": offset
}
async def _clear_monitor_routes(self, monitor_id: int, cleared_by: uuid.UUID) -> None:
"""
Mark all active routes for a monitor as cleared
Args:
monitor_id: Monitor ID
cleared_by: User ID who is clearing the routes
"""
result = await self.db.execute(
select(CrossSwitchRoute)
.where(and_(
CrossSwitchRoute.monitor_id == monitor_id,
CrossSwitchRoute.is_active == 1
))
)
active_routes = result.scalars().all()
for route in active_routes:
route.clear_route(cleared_by)
if active_routes:
await self.db.commit()
logger.info("monitor_routes_cleared",
monitor_id=monitor_id,
count=len(active_routes))
async def _get_route_details(self, camera_id: int, monitor_id: int) -> Dict[str, Any]:
"""
Get additional details for route (camera/monitor names)
Args:
camera_id: Camera ID
monitor_id: Monitor ID
Returns:
Dictionary with camera and monitor names
"""
details = {}
try:
# Get camera name (from cache if available)
camera_data = await redis_client.get_json(f"cameras:detail:{camera_id}")
if camera_data:
details["camera_name"] = camera_data.get("name")
# Get monitor name (from cache if available)
monitor_data = await redis_client.get_json(f"monitors:detail:{monitor_id}")
if monitor_data:
details["monitor_name"] = monitor_data.get("name")
except Exception as e:
logger.warning("failed_to_get_route_details", error=str(e))
return details
async def _create_audit_log(
self,
action: str,
target: str,
outcome: str,
details: Optional[Dict[str, Any]] = None,
user_id: Optional[uuid.UUID] = None,
ip_address: Optional[str] = None
) -> None:
"""
Create audit log entry
Args:
action: Action name
target: Target of action
outcome: Outcome (success, failure, error)
details: Additional details
user_id: User ID
ip_address: Client IP address
"""
try:
audit_log = AuditLog(
user_id=user_id,
action=action,
target=target,
outcome=outcome,
details=details,
ip_address=ip_address
)
self.db.add(audit_log)
await self.db.commit()
except Exception as e:
logger.error("audit_log_creation_failed", action=action, error=str(e))
await self.db.rollback()

View File

@@ -0,0 +1,382 @@
"""
Contract tests for cross-switch API endpoints
These tests define the expected behavior - they will FAIL until implementation is complete
"""
import pytest
from httpx import AsyncClient
from fastapi import status
@pytest.mark.asyncio
class TestCrossSwitchExecution:
"""Contract tests for POST /api/v1/crossswitch"""
async def test_crossswitch_success_operator(self, async_client: AsyncClient, operator_token: str):
"""Test successful cross-switch with operator role"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"monitor_id": 1,
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify response structure
assert "success" in data
assert data["success"] is True
assert "message" in data
assert "route" in data
# Verify route details
route = data["route"]
assert route["camera_id"] == 1
assert route["monitor_id"] == 1
assert "executed_at" in route
assert "executed_by" in route
async def test_crossswitch_success_administrator(self, async_client: AsyncClient, auth_token: str):
"""Test successful cross-switch with administrator role"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 2,
"monitor_id": 2,
"mode": 0
},
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["success"] is True
async def test_crossswitch_forbidden_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test that viewer role cannot execute cross-switch"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"monitor_id": 1,
"mode": 0
},
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_403_FORBIDDEN
data = response.json()
assert "error" in data or "detail" in data
async def test_crossswitch_no_auth(self, async_client: AsyncClient):
"""Test cross-switch without authentication"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"monitor_id": 1,
"mode": 0
}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_crossswitch_invalid_camera(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with invalid camera ID"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 99999, # Non-existent camera
"monitor_id": 1,
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
# Should return 400 or 404 depending on implementation
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]
async def test_crossswitch_invalid_monitor(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with invalid monitor ID"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"monitor_id": 99999, # Non-existent monitor
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]
async def test_crossswitch_missing_camera_id(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with missing camera_id"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"monitor_id": 1,
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
async def test_crossswitch_missing_monitor_id(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with missing monitor_id"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
async def test_crossswitch_negative_ids(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with negative IDs"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": -1,
"monitor_id": -1,
"mode": 0
},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_ENTITY]
async def test_crossswitch_default_mode(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch with default mode (mode not specified)"""
response = await async_client.post(
"/api/v1/crossswitch",
json={
"camera_id": 1,
"monitor_id": 1
},
headers={"Authorization": f"Bearer {operator_token}"}
)
# Should succeed with default mode=0
assert response.status_code in [status.HTTP_200_OK, status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]
@pytest.mark.asyncio
class TestClearMonitor:
"""Contract tests for POST /api/v1/crossswitch/clear"""
async def test_clear_monitor_success_operator(self, async_client: AsyncClient, operator_token: str):
"""Test successful clear monitor with operator role"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 1},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "success" in data
assert data["success"] is True
assert "message" in data
assert "monitor_id" in data
assert data["monitor_id"] == 1
async def test_clear_monitor_success_administrator(self, async_client: AsyncClient, auth_token: str):
"""Test successful clear monitor with administrator role"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 2},
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["success"] is True
async def test_clear_monitor_forbidden_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test that viewer role cannot clear monitor"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 1},
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_403_FORBIDDEN
async def test_clear_monitor_no_auth(self, async_client: AsyncClient):
"""Test clear monitor without authentication"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 1}
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_clear_monitor_invalid_id(self, async_client: AsyncClient, operator_token: str):
"""Test clear monitor with invalid monitor ID"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 99999},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code in [status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]
async def test_clear_monitor_missing_id(self, async_client: AsyncClient, operator_token: str):
"""Test clear monitor with missing monitor_id"""
response = await async_client.post(
"/api/v1/crossswitch/clear",
json={},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
class TestRoutingState:
"""Contract tests for GET /api/v1/crossswitch/routing"""
async def test_get_routing_state_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test getting routing state with viewer role"""
response = await async_client.get(
"/api/v1/crossswitch/routing",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify response structure
assert "routes" in data
assert isinstance(data["routes"], list)
async def test_get_routing_state_operator(self, async_client: AsyncClient, operator_token: str):
"""Test getting routing state with operator role"""
response = await async_client.get(
"/api/v1/crossswitch/routing",
headers={"Authorization": f"Bearer {operator_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "routes" in data
async def test_get_routing_state_no_auth(self, async_client: AsyncClient):
"""Test getting routing state without authentication"""
response = await async_client.get("/api/v1/crossswitch/routing")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_routing_state_structure(self, async_client: AsyncClient, viewer_token: str):
"""Test routing state response structure"""
response = await async_client.get(
"/api/v1/crossswitch/routing",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
# Verify structure
if data["routes"]:
route = data["routes"][0]
assert "monitor_id" in route
assert "camera_id" in route
assert "executed_at" in route
assert "executed_by" in route
@pytest.mark.asyncio
class TestRoutingHistory:
"""Contract tests for GET /api/v1/crossswitch/history"""
async def test_get_routing_history_viewer(self, async_client: AsyncClient, viewer_token: str):
"""Test getting routing history with viewer role"""
response = await async_client.get(
"/api/v1/crossswitch/history",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "history" in data
assert "total" in data
assert isinstance(data["history"], list)
async def test_get_routing_history_pagination(self, async_client: AsyncClient, viewer_token: str):
"""Test routing history with pagination"""
response = await async_client.get(
"/api/v1/crossswitch/history?limit=10&offset=0",
headers={"Authorization": f"Bearer {viewer_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data["history"]) <= 10
async def test_get_routing_history_no_auth(self, async_client: AsyncClient):
"""Test getting routing history without authentication"""
response = await async_client.get("/api/v1/crossswitch/history")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
class TestCrossSwitchIntegration:
"""Integration tests for complete cross-switch workflow"""
async def test_crossswitch_then_query_state(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch execution followed by state query"""
# Execute cross-switch
switch_response = await async_client.post(
"/api/v1/crossswitch",
json={"camera_id": 1, "monitor_id": 1, "mode": 0},
headers={"Authorization": f"Bearer {operator_token}"}
)
if switch_response.status_code != status.HTTP_200_OK:
pytest.skip("Cross-switch not available")
# Query routing state
state_response = await async_client.get(
"/api/v1/crossswitch/routing",
headers={"Authorization": f"Bearer {operator_token}"}
)
assert state_response.status_code == status.HTTP_200_OK
routes = state_response.json()["routes"]
# Verify the route exists in state
assert any(r["monitor_id"] == 1 and r["camera_id"] == 1 for r in routes)
async def test_crossswitch_then_clear(self, async_client: AsyncClient, operator_token: str):
"""Test cross-switch followed by clear monitor"""
# Execute cross-switch
switch_response = await async_client.post(
"/api/v1/crossswitch",
json={"camera_id": 1, "monitor_id": 1, "mode": 0},
headers={"Authorization": f"Bearer {operator_token}"}
)
if switch_response.status_code != status.HTTP_200_OK:
pytest.skip("Cross-switch not available")
# Clear the monitor
clear_response = await async_client.post(
"/api/v1/crossswitch/clear",
json={"monitor_id": 1},
headers={"Authorization": f"Bearer {operator_token}"}
)
assert clear_response.status_code == status.HTTP_200_OK
assert clear_response.json()["success"] is True