# Phase 0 Research: Geutebruck Video Surveillance API **Branch**: `001-surveillance-api` | **Date**: 2025-12-08 **Research Phase** | Input: [plan.md](./plan.md) research topics --- ## Research Summary This document resolves all "NEEDS CLARIFICATION" items from the implementation plan and provides technical decisions backed by prototyping, documentation analysis, and best practices research. **Key Findings:** - ✅ GeViScope SDK integration via C# bridge service (recommended) - ✅ Video streaming via direct GeViScope URLs with token authentication - ✅ FastAPI WebSocket with Redis pub/sub for event distribution - ✅ JWT with Redis-backed sessions for authentication - ✅ Async Python with connection pooling for SDK calls - ✅ Structured logging with Prometheus metrics - ✅ Pytest with SDK mock layer for testing --- ## 1. GeViScope SDK Integration ### Research Question How should Python FastAPI integrate with the Windows-native GeViScope .NET SDK? ### Investigation Performed **Prototype**: Built working C# .NET application (GeViSoftConfigReader) that successfully: - Connects to GeViServer - Queries configuration via State Queries - Exports data to JSON - Handles all SDK dependencies **SDK Analysis**: Extracted and analyzed complete SDK documentation (1.4MB): - `GeViScope_SDK.pdf` → `GeViScope_SDK.txt` - `GeViSoft_SDK_Documentation.pdf` → `GeViSoft_SDK_Documentation.txt` **Critical Discovery**: SDK has specific requirements: - **Full GeViSoft installation** required (not just SDK) - **Visual C++ 2010 Redistributable (x86)** mandatory - **Windows Forms context** needed for .NET mixed-mode DLL loading - **x86 (32-bit) architecture** required ### Decision: C# SDK Bridge Service **Selected Approach**: Build dedicated C# Windows Service that wraps GeViScope SDK and exposes gRPC interface for Python API. **Architecture**: ``` ┌──────────────────────┐ │ Python FastAPI │ (REST/WebSocket API) │ (Any platform) │ └──────────┬───────────┘ │ gRPC/HTTP ▼ ┌──────────────────────┐ │ C# SDK Bridge │ (Windows Service) │ (GeViScope Wrapper) │ └──────────┬───────────┘ │ .NET SDK ▼ ┌──────────────────────┐ │ GeViScope SDK │ │ GeViServer │ └──────────────────────┘ ``` **Rationale**: 1. **Stability**: SDK crashes don't kill Python API (process isolation) 2. **Testability**: Python can mock gRPC interface easily 3. **Expertise**: Leverage proven C# SDK integration from GeViSoftConfigReader 4. **Performance**: Native .NET SDK calls are faster than COM interop 5. **Maintainability**: Clear separation of concerns ### Alternatives Considered **Option A: pythonnet (Direct .NET Interop)** ```python import clr clr.AddReference("GeViProcAPINET_4_0") from GEUTEBRUECK.GeViSoftSDKNET import GeViDatabase ``` - ❌ Requires Python 32-bit on Windows - ❌ SDK crashes kill Python process - ❌ Complex debugging - ✅ No additional service needed **Option B: comtypes (COM Interface)** - ❌ SDK doesn't expose COM interface (tested) - ❌ Not viable **Option C: Subprocess Calls to C# Executables** ```python subprocess.run(["GeViSoftConfigReader.exe", "args..."]) ``` - ✅ Simple isolation - ❌ High latency (process startup overhead) - ❌ No real-time event streaming - ❌ Resource intensive **Decision Matrix**: | Approach | Stability | Performance | Testability | Maintainability | **Score** | |----------|-----------|-------------|-------------|-----------------|-----------| | C# Service (gRPC) | ✅ Excellent | ✅ Fast | ✅ Easy | ✅ Clear | **SELECTED** | | pythonnet | ❌ Poor | ✅ Fast | ⚠️ Moderate | ❌ Complex | Not recommended | | Subprocess | ✅ Good | ❌ Slow | ✅ Easy | ⚠️ Moderate | Fallback option | ### Implementation Plan **C# Bridge Service** (`GeViScopeBridge`): ```csharp // gRPC service definition service GeViScopeBridge { // Connection management rpc Connect(ConnectionRequest) returns (ConnectionResponse); rpc Disconnect(DisconnectRequest) returns (DisconnectResponse); // State queries rpc GetCameras(CamerasRequest) returns (CamerasResponse); rpc GetCamera(CameraRequest) returns (CameraResponse); // Video operations rpc GetStreamUrl(StreamRequest) returns (StreamResponse); rpc SendPTZCommand(PTZRequest) returns (PTZResponse); // Events (server streaming) rpc StreamEvents(EventSubscription) returns (stream EventNotification); // Recording operations rpc StartRecording(RecordingRequest) returns (RecordingResponse); rpc StopRecording(StopRecordingRequest) returns (StopRecordingResponse); // Analytics rpc ConfigureAnalytics(AnalyticsConfig) returns (AnalyticsResponse); } ``` **Python Client** (`src/sdk/bridge.py`): ```python import grpc from sdk.proto import gevibridge_pb2, gevibridge_pb2_grpc class SDKBridge: def __init__(self, bridge_host="localhost:50051"): self.channel = grpc.insecure_channel(bridge_host) self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel) async def get_cameras(self) -> List[Camera]: response = await self.stub.GetCameras() return [Camera.from_proto(c) for c in response.cameras] ``` ### SDK Integration Patterns (from GeViSoftConfigReader) **Connection Lifecycle**: ```csharp 1. database = new GeViDatabase(); 2. database.Create(hostname, username, password); 3. database.RegisterCallback(); // MUST be before Connect() 4. result = database.Connect(); 5. if (result != GeViConnectResult.connectOk) { /* handle error */ } 6. // Perform operations 7. database.Disconnect(); 8. database.Dispose(); ``` **State Query Pattern** (GetFirst/GetNext): ```csharp var query = new CSQGetFirstVideoInput(true, true); var answer = database.SendStateQuery(query); while (answer.AnswerKind != AnswerKind.Nothing) { var videoInput = (CSAVideoInputInfo)answer; // Process: videoInput.GlobalID, Name, HasPTZHead, etc. query = new CSQGetNextVideoInput(true, true, videoInput.GlobalID); answer = database.SendStateQuery(query); } ``` **Database Query Pattern**: ```csharp // Create query session var createQuery = new CDBQCreateActionQuery(0); var handle = (CDBAQueryHandle)database.SendDatabaseQuery(createQuery); // Retrieve records var getQuery = new CDBQGetLast(handle.Handle); var actionEntry = (CDBAActionEntry)database.SendDatabaseQuery(getQuery); ``` --- ## 2. Video Streaming Strategy ### Research Question How should the API deliver live video streams to clients? ### Investigation Performed **GeViScope Documentation Analysis**: - GeViScope SDK provides video stream URLs - Supports MJPEG, H.264, and proprietary formats - Channel-based addressing (Channel ID required) **Testing with GeViSet**: - Existing GeViSet application streams video directly from SDK - URLs typically: `http://:/stream?channel=` ### Decision: Direct SDK Stream URLs with Token Authentication **Selected Approach**: API returns authenticated stream URLs that clients connect to directly **Flow**: ``` 1. Client → API: GET /api/v1/cameras/5/stream 2. API → SDK Bridge: Request stream URL for channel 5 3. SDK Bridge → API: Returns base stream URL 4. API → Client: Returns URL with embedded JWT token 5. Client → GeViServer: Connects directly to stream URL 6. GeViServer: Validates token and streams video ``` **Example Response**: ```json { "channel_id": 5, "stream_url": "http://localhost:7703/stream?channel=5&token=eyJhbGc...", "format": "h264", "resolution": "1920x1080", "fps": 25, "expires_at": "2025-12-08T16:00:00Z" } ``` **Rationale**: 1. **Performance**: No proxy overhead, direct streaming 2. **Scalability**: API server doesn't handle video bandwidth 3. **SDK Native**: Leverages GeViScope's built-in streaming 4. **Standard**: HTTP-based streams work with all clients ### Alternatives Considered **Option A: API Proxy Streams** ```python @app.get("/cameras/{id}/stream") async def stream_camera(id: int): sdk_stream = await sdk.get_stream(id) return StreamingResponse(sdk_stream, media_type="video/h264") ``` - ❌ API becomes bandwidth bottleneck - ❌ Increased server load - ✅ Centralized authentication - **Rejected**: Doesn't scale **Option B: WebRTC Signaling** - ✅ Modern, low latency - ❌ Requires WebRTC support in GeViScope (not available) - ❌ Complex client implementation - **Rejected**: SDK doesn't support WebRTC ### Implementation Details **Token-Based Stream Authentication**: ```python # In cameras endpoint stream_token = create_stream_token( channel_id=channel_id, user_id=current_user.id, expires=timedelta(hours=1) ) stream_url = sdk_bridge.get_stream_url(channel_id) authenticated_url = f"{stream_url}&token={stream_token}" ``` **GeViServer Stream URL Format** (from SDK docs): - Base: `http://:/stream` - Parameters: `?channel=&format=&resolution=` --- ## 3. WebSocket Event Architecture ### Research Question How to deliver real-time events to 1000+ concurrent clients with <100ms latency? ### Investigation Performed **FastAPI WebSocket Research**: - Native async WebSocket support - Connection manager pattern for broadcast - Starlette WebSocket under the hood **Redis Pub/Sub Research**: - Ideal for distributed event broadcasting - Sub-millisecond message delivery - Natural fit for WebSocket fan-out ### Decision: FastAPI WebSocket + Redis Pub/Sub **Architecture**: ``` SDK Bridge (C#) │ Events ▼ Redis Pub/Sub Channel │ Subscribe ├──▶ API Instance 1 ──▶ WebSocket Clients (1-500) ├──▶ API Instance 2 ──▶ WebSocket Clients (501-1000) └──▶ API Instance N ──▶ WebSocket Clients (N+...) ``` **Implementation**: ```python # src/api/websocket.py from fastapi import WebSocket import redis.asyncio as aioredis class ConnectionManager: def __init__(self): self.active_connections: Dict[str, List[WebSocket]] = {} self.redis = aioredis.from_url("redis://localhost") async def connect(self, websocket: WebSocket, user_id: str): await websocket.accept() if user_id not in self.active_connections: self.active_connections[user_id] = [] self.active_connections[user_id].append(websocket) async def broadcast_event(self, event: Event): # Filter by permissions for user_id, connections in self.active_connections.items(): if await has_permission(user_id, event.channel_id): for websocket in connections: await websocket.send_json(event.dict()) async def listen_to_events(self): pubsub = self.redis.pubsub() await pubsub.subscribe("sdk:events") async for message in pubsub.listen(): if message["type"] == "message": event = Event.parse_raw(message["data"]) await self.broadcast_event(event) ``` **Event Subscription Protocol**: ```json // Client subscribes { "action": "subscribe", "filters": { "event_types": ["motion", "alarm"], "channels": [1, 2, 3] } } // Server sends events { "event_type": "motion", "channel_id": 2, "timestamp": "2025-12-08T14:30:00Z", "data": { "zone": "entrance", "confidence": 0.95 } } ``` **Rationale**: 1. **Scalability**: Redis pub/sub enables horizontal scaling 2. **Performance**: <1ms Redis latency + WebSocket overhead = <100ms target 3. **Simplicity**: FastAPI native WebSocket, no custom protocol needed 4. **Filtering**: Server-side filtering reduces client bandwidth ### Heartbeat & Reconnection ```python # Client-side heartbeat every 30s async def heartbeat(): while True: await websocket.send_json({"action": "ping"}) await asyncio.sleep(30) # Server responds with pong if message["action"] == "ping": await websocket.send_json({"action": "pong"}) ``` **Automatic Reconnection**: - Client exponential backoff: 1s, 2s, 4s, 8s, max 60s - Server maintains subscription state for 5 minutes - Reconnected clients receive missed critical events (buffered in Redis) --- ## 4. Authentication & Session Management ### Research Question JWT token structure, refresh strategy, and session storage design? ### Investigation Performed **FastAPI Security Best Practices**: - `python-jose` for JWT generation/validation - `passlib[bcrypt]` for password hashing - FastAPI dependency injection for auth **Redis Session Research**: - TTL-based automatic cleanup - Sub-millisecond lookups - Atomic operations for token rotation ### Decision: JWT Access + Refresh Tokens with Redis Sessions **Token Structure**: ```python # Access Token (short-lived: 1 hour) { "sub": "user_id", "username": "operator1", "role": "operator", "permissions": ["camera:1:view", "camera:1:ptz"], "exp": 1702048800, "iat": 1702045200, "jti": "unique_token_id" } # Refresh Token (long-lived: 7 days) { "sub": "user_id", "type": "refresh", "exp": 1702650000, "jti": "refresh_token_id" } ``` **Redis Session Schema**: ``` Key: "session:{user_id}:{jti}" Value: { "username": "operator1", "role": "operator", "ip_address": "192.168.1.100", "created_at": "2025-12-08T14:00:00Z", "last_activity": "2025-12-08T14:30:00Z" } TTL: 3600 (1 hour for access tokens) Key: "refresh:{user_id}:{jti}" Value: { "access_tokens": ["jti1", "jti2"], "created_at": "2025-12-08T14:00:00Z" } TTL: 604800 (7 days for refresh tokens) ``` **Authentication Flow**: ```python # Login endpoint @router.post("/auth/login") async def login(credentials: LoginRequest): user = await authenticate_user(credentials.username, credentials.password) if not user: raise HTTPException(status_code=401, detail="Invalid credentials") access_token = create_access_token(user) refresh_token = create_refresh_token(user) # Store in Redis await redis.setex( f"session:{user.id}:{access_token.jti}", 3600, json.dumps({"username": user.username, ...}) ) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": 3600 } ``` **Token Refresh**: ```python @router.post("/auth/refresh") async def refresh_token(token: RefreshTokenRequest): payload = verify_refresh_token(token.refresh_token) # Check if refresh token is valid in Redis refresh_data = await redis.get(f"refresh:{payload.sub}:{payload.jti}") if not refresh_data: raise HTTPException(status_code=401, detail="Invalid refresh token") # Issue new access token new_access_token = create_access_token(user) # Store new session await redis.setex(...) return {"access_token": new_access_token, "expires_in": 3600} ``` **Rationale**: 1. **Security**: Short-lived access tokens minimize exposure 2. **UX**: Refresh tokens enable "stay logged in" without re-authentication 3. **Revocation**: Redis TTL + explicit invalidation for logout 4. **Scalability**: Stateless JWT validation, Redis for revocation only --- ## 5. Performance Optimization ### Research Question How to achieve <200ms API response times and support 100+ concurrent streams? ### Investigation Performed **Python Async Patterns**: - FastAPI fully async (Starlette + Uvicorn) - `asyncio` for concurrent I/O - `aioredis` for async Redis **gRPC Performance**: - Binary protocol, faster than REST - HTTP/2 multiplexing - Streaming for events ### Decision: Async Python + Connection Pooling + Caching **Async SDK Bridge Calls**: ```python # src/sdk/bridge.py class SDKBridge: def __init__(self): self.channel_pool = grpc.aio.insecure_channel( 'localhost:50051', options=[ ('grpc.max_concurrent_streams', 100), ('grpc.keepalive_time_ms', 30000), ] ) self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel_pool) async def get_camera(self, channel_id: int) -> Camera: # Concurrent gRPC calls response = await self.stub.GetCamera( CameraRequest(channel_id=channel_id) ) return Camera.from_proto(response) ``` **Redis Caching Layer**: ```python # Cache camera metadata (updated on events) @cache(ttl=300) # 5 minutes async def get_camera_info(channel_id: int) -> Camera: return await sdk_bridge.get_camera(channel_id) ``` **Concurrent Request Handling**: ```python # FastAPI naturally handles concurrent requests # Configure Uvicorn with workers uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000 ``` **Performance Targets Validation**: | Operation | Target | Expected | Buffer | |-----------|--------|----------|--------| | Metadata queries | <200ms | ~50ms (gRPC) + ~10ms (Redis) | ✅ 3x margin | | PTZ commands | <500ms | ~100ms (gRPC + SDK) | ✅ 5x margin | | Event delivery | <100ms | ~1ms (Redis) + ~10ms (WebSocket) | ✅ 9x margin | | Stream init | <2s | ~500ms (SDK) + network | ✅ 4x margin | **Rationale**: Async + gRPC + caching provides comfortable performance margins --- ## 6. Error Handling & Monitoring ### Research Question How to translate SDK errors to HTTP codes and provide observability? ### Investigation Performed **SDK Error Analysis** (from GeViSoftConfigReader): - `GeViConnectResult` enum: `connectOk`, `connectFailed`, `connectTimeout` - Windows error codes in some SDK responses - Exception types: `FileNotFoundException`, SDK-specific exceptions **Prometheus + Grafana Research**: - Standard for API monitoring - FastAPI Prometheus middleware available - Grafana dashboards for visualization ### Decision: Structured Logging + Prometheus Metrics + Error Translation Layer **Error Translation**: ```python # src/sdk/errors.py class SDKException(Exception): def __init__(self, sdk_error_code: str, message: str): self.sdk_error_code = sdk_error_code self.message = message super().__init__(message) def translate_sdk_error(sdk_result) -> HTTPException: ERROR_MAP = { "connectFailed": (503, "SERVICE_UNAVAILABLE", "GeViServer unavailable"), "connectTimeout": (504, "GATEWAY_TIMEOUT", "Connection timeout"), "cameraOffline": (404, "CAMERA_OFFLINE", "Camera not available"), "permissionDenied": (403, "FORBIDDEN", "Insufficient permissions"), "invalidChannel": (400, "INVALID_CHANNEL", "Channel does not exist"), } status_code, error_code, message = ERROR_MAP.get( sdk_result, (500, "INTERNAL_ERROR", "Internal server error") ) return HTTPException( status_code=status_code, detail={"error_code": error_code, "message": message} ) ``` **Structured Logging**: ```python # src/core/logging.py import logging import structlog structlog.configure( processors=[ structlog.stdlib.add_log_level, structlog.stdlib.add_logger_name, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer() ] ) logger = structlog.get_logger() # Usage logger.info("camera_accessed", channel_id=5, user_id=123, action="view") logger.error("sdk_error", error=str(ex), channel_id=5) ``` **Prometheus Metrics**: ```python # src/api/middleware/metrics.py from prometheus_client import Counter, Histogram, Gauge http_requests_total = Counter( 'api_http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'] ) http_request_duration_seconds = Histogram( 'api_http_request_duration_seconds', 'HTTP request latency', ['method', 'endpoint'] ) active_websocket_connections = Gauge( 'api_websocket_connections_active', 'Active WebSocket connections' ) sdk_errors_total = Counter( 'sdk_errors_total', 'Total SDK errors', ['error_type'] ) ``` **Health Check Endpoint**: ```python @app.get("/api/v1/health") async def health_check(): checks = { "api": "healthy", "sdk_bridge": await check_sdk_connection(), "redis": await check_redis_connection(), "geviserver": await check_geviserver_status() } overall_status = "healthy" if all( v == "healthy" for v in checks.values() ) else "degraded" return { "status": overall_status, "checks": checks, "timestamp": datetime.utcnow().isoformat() } ``` **Rationale**: 1. **Clarity**: Meaningful error messages for developers 2. **Debugging**: Structured logs enable quick issue resolution 3. **Monitoring**: Prometheus metrics provide visibility 4. **Reliability**: Health checks enable load balancer decisions --- ## 7. Testing Strategy ### Research Question How to test SDK integration without hardware and achieve 80% coverage? ### Investigation Performed **Pytest Best Practices**: - `pytest-asyncio` for async tests - `pytest-mock` for mocking - Fixtures for reusable test data **SDK Mocking Strategy**: - Mock gRPC bridge interface - Simulate SDK responses - Test error scenarios ### Decision: Layered Testing with SDK Mock + Test Instance **Test Pyramid**: ``` E2E Tests (5%) ┌─────────────────────┐ │ Real SDK (optional) │ └─────────────────────┘ Integration Tests (25%) ┌─────────────────────┐ │ Mock gRPC Bridge │ └─────────────────────┘ Unit Tests (70%) ┌─────────────────────┐ │ Pure business logic│ └─────────────────────┘ ``` **SDK Mock Implementation**: ```python # tests/mocks/sdk_mock.py class MockSDKBridge: def __init__(self): self.cameras = { 1: Camera(id=1, name="Camera 1", has_ptz=True, status="online"), 2: Camera(id=2, name="Camera 2", has_ptz=False, status="online"), } self.events = [] async def get_camera(self, channel_id: int) -> Camera: if channel_id not in self.cameras: raise SDKException("invalidChannel", "Camera not found") return self.cameras[channel_id] async def send_ptz_command(self, channel_id: int, command: PTZCommand): camera = await self.get_camera(channel_id) if not camera.has_ptz: raise SDKException("noPTZSupport", "Camera has no PTZ") # Simulate command execution await asyncio.sleep(0.1) def emit_event(self, event: Event): self.events.append(event) ``` **Unit Test Example**: ```python # tests/unit/test_camera_service.py import pytest from services.camera import CameraService from tests.mocks.sdk_mock import MockSDKBridge @pytest.fixture def camera_service(): mock_bridge = MockSDKBridge() return CameraService(sdk_bridge=mock_bridge) @pytest.mark.asyncio async def test_get_camera_success(camera_service): camera = await camera_service.get_camera(1) assert camera.id == 1 assert camera.name == "Camera 1" @pytest.mark.asyncio async def test_get_camera_not_found(camera_service): with pytest.raises(SDKException) as exc_info: await camera_service.get_camera(999) assert exc_info.value.sdk_error_code == "invalidChannel" ``` **Integration Test Example**: ```python # tests/integration/test_camera_endpoints.py from httpx import AsyncClient from main import app @pytest.mark.asyncio async def test_list_cameras(authenticated_client: AsyncClient): response = await authenticated_client.get("/api/v1/cameras") assert response.status_code == 200 cameras = response.json() assert len(cameras) > 0 assert "id" in cameras[0] assert "name" in cameras[0] @pytest.mark.asyncio async def test_ptz_command_no_permission(authenticated_client: AsyncClient): response = await authenticated_client.post( "/api/v1/cameras/1/ptz", json={"action": "pan_left", "speed": 50} ) assert response.status_code == 403 ``` **Test Data Fixtures**: ```python # tests/conftest.py import pytest from tests.mocks.sdk_mock import MockSDKBridge @pytest.fixture def mock_sdk_bridge(): return MockSDKBridge() @pytest.fixture def authenticated_client(mock_sdk_bridge): # Create test client with mocked dependencies app.dependency_overrides[get_sdk_bridge] = lambda: mock_sdk_bridge async with AsyncClient(app=app, base_url="http://test") as client: # Login and get token response = await client.post("/api/v1/auth/login", json={ "username": "test_user", "password": "test_pass" }) token = response.json()["access_token"] client.headers["Authorization"] = f"Bearer {token}" yield client ``` **Coverage Configuration**: ```ini # pyproject.toml [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" [tool.coverage.run] source = ["src"] omit = ["*/tests/*", "*/migrations/*"] [tool.coverage.report] fail_under = 80 exclude_lines = [ "pragma: no cover", "def __repr__", "raise NotImplementedError", "if __name__ == .__main__.:", ] ``` **Test Execution**: ```bash # Run all tests pytest # Run with coverage pytest --cov=src --cov-report=html # Run only unit tests pytest tests/unit # Run only integration tests pytest tests/integration ``` **Rationale**: 1. **Speed**: Unit tests run instantly without SDK 2. **Reliability**: Tests don't depend on hardware availability 3. **Coverage**: 80% coverage achievable with mocks 4. **E2E**: Optional real SDK tests for validation --- ## SDK-to-API Mapping Reference Based on GeViSoft SDK analysis, here's the mapping from SDK actions to API endpoints: | SDK Action Category | SDK Action | API Endpoint | HTTP Method | |---------------------|------------|--------------|-------------| | **SystemActions** | ConnectDB | `/auth/login` | POST | | | DisconnectDB | `/auth/logout` | POST | | **VideoActions** | GetFirstVideoInput | `/cameras` | GET | | | GetNextVideoInput | (internal pagination) | - | | | StartVideoStream | `/cameras/{id}/stream` | GET | | **CameraControlActions** | PTZControl | `/cameras/{id}/ptz` | POST | | | SetPreset | `/cameras/{id}/presets` | POST | | | GotoPreset | `/cameras/{id}/presets/{preset_id}` | POST | | **EventActions** | StartEvent | (WebSocket subscription) | WS | | | StopEvent | (WebSocket unsubscribe) | WS | | **RecordingActions** | StartRecording | `/recordings/{channel}/start` | POST | | | StopRecording | `/recordings/{channel}/stop` | POST | | | QueryRecordings | `/recordings` | GET | | **AnalyticsActions** | ConfigureVMD | `/analytics/{channel}/vmd` | PUT | | | ConfigureNPR | `/analytics/{channel}/npr` | PUT | | | ConfigureOBTRACK | `/analytics/{channel}/obtrack` | PUT | --- ## Dependencies & Prerequisites ### Development Environment **Required**: - Python 3.11+ - .NET SDK 6.0+ (for C# bridge development) - Redis 7.2+ - Visual Studio 2022 (for C# bridge) - GeViSoft full installation - GeViSoft SDK - Visual C++ 2010 Redistributable (x86) **Python Packages**: ``` # requirements.txt fastapi==0.104.1 uvicorn[standard]==0.24.0 pydantic==2.5.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 redis==5.0.1 grpcio==1.59.0 grpcio-tools==1.59.0 python-multipart==0.0.6 prometheus-client==0.19.0 structlog==23.2.0 # Testing pytest==7.4.3 pytest-asyncio==0.21.1 pytest-cov==4.1.0 pytest-mock==3.12.0 httpx==0.25.2 ``` **C# NuGet Packages** (Bridge Service): ```xml ``` ### Deployment Environment **Windows Server 2016+ or Windows 10/11**: - .NET Runtime 6.0+ - Python 3.11+ runtime - Redis (standalone or cluster) - GeViSoft with active license - Nginx or IIS for reverse proxy (HTTPS termination) --- ## Risk Mitigation ### High-Risk Items **1. SDK Stability** - **Risk**: C# bridge crashes take down video functionality - **Mitigation**: - Auto-restart bridge service (Windows Service recovery) - Circuit breaker pattern in Python (trip after 3 failures) - Health checks monitor bridge status - Graceful degradation (API returns cached data when bridge down) **2. Performance Under Load** - **Risk**: May not achieve 100 concurrent streams - **Mitigation**: - Load testing in Phase 2 with real hardware - Stream quality adaptation (reduce resolution/fps under load) - Connection pooling and async I/O - Horizontal scaling (multiple API instances) **3. Event Delivery Reliability** - **Risk**: WebSocket disconnections lose events - **Mitigation**: - Redis event buffer (5-minute retention) - Reconnection sends missed critical events - Event sequence numbers for gap detection - Client-side acknowledgments for critical events --- ## Phase 0 Completion Checklist - [x] GeViScope SDK integration approach decided (C# gRPC bridge) - [x] Video streaming strategy defined (direct URLs with tokens) - [x] WebSocket architecture designed (FastAPI + Redis pub/sub) - [x] Authentication system specified (JWT + Redis sessions) - [x] Performance optimization plan documented (async + caching) - [x] Error handling strategy defined (translation layer + structured logging) - [x] Testing approach designed (layered tests with mocks) - [x] SDK-to-API mappings documented - [x] Dependencies identified - [x] Risk mitigation strategies defined **Status**: ✅ **Research phase complete** - Ready for Phase 1 Design --- **Next Step**: Execute Phase 1 to generate `data-model.md`, `contracts/openapi.yaml`, and `quickstart.md`