
Phase 0 Research: Geutebruck Video Surveillance API

Branch: 001-surveillance-api | Date: 2025-12-08 | Phase: Research | Input: plan.md research topics


Research Summary

This document resolves all "NEEDS CLARIFICATION" items from the implementation plan and provides technical decisions backed by prototyping, documentation analysis, and best practices research.

Key Findings:

  • GeViScope SDK integration via C# bridge service (recommended)
  • Video streaming via direct GeViScope URLs with token authentication
  • FastAPI WebSocket with Redis pub/sub for event distribution
  • JWT with Redis-backed sessions for authentication
  • Async Python with connection pooling for SDK calls
  • Structured logging with Prometheus metrics
  • Pytest with SDK mock layer for testing

1. GeViScope SDK Integration

Research Question

How should Python FastAPI integrate with the Windows-native GeViScope .NET SDK?

Investigation Performed

Prototype: Built working C# .NET application (GeViSoftConfigReader) that successfully:

  • Connects to GeViServer
  • Queries configuration via State Queries
  • Exports data to JSON
  • Handles all SDK dependencies

SDK Analysis: Extracted and analyzed complete SDK documentation (1.4MB):

  • GeViScope_SDK.pdf → GeViScope_SDK.txt
  • GeViSoft_SDK_Documentation.pdf → GeViSoft_SDK_Documentation.txt

Critical Discovery: SDK has specific requirements:

  • Full GeViSoft installation required (not just SDK)
  • Visual C++ 2010 Redistributable (x86) mandatory
  • Windows Forms context needed for .NET mixed-mode DLL loading
  • x86 (32-bit) architecture required

Decision: C# SDK Bridge Service

Selected Approach: Build dedicated C# Windows Service that wraps GeViScope SDK and exposes gRPC interface for Python API.

Architecture:

┌──────────────────────┐
│  Python FastAPI      │  (REST/WebSocket API)
│  (Any platform)      │
└──────────┬───────────┘
           │ gRPC/HTTP
           ▼
┌──────────────────────┐
│  C# SDK Bridge       │  (Windows Service)
│  (GeViScope Wrapper) │
└──────────┬───────────┘
           │ .NET SDK
           ▼
┌──────────────────────┐
│  GeViScope SDK       │
│  GeViServer          │
└──────────────────────┘

Rationale:

  1. Stability: SDK crashes don't kill Python API (process isolation)
  2. Testability: Python can mock gRPC interface easily
  3. Expertise: Leverage proven C# SDK integration from GeViSoftConfigReader
  4. Performance: Native .NET SDK calls are faster than COM interop
  5. Maintainability: Clear separation of concerns

Alternatives Considered

Option A: pythonnet (Direct .NET Interop)

import clr
clr.AddReference("GeViProcAPINET_4_0")
from GEUTEBRUECK.GeViSoftSDKNET import GeViDatabase
  • Con: requires 32-bit Python on Windows
  • Con: SDK crashes kill the Python process
  • Con: complex debugging across the interop boundary
  • Pro: no additional service needed

Option B: comtypes (COM Interface)

  • SDK doesn't expose COM interface (tested)
  • Not viable

Option C: Subprocess Calls to C# Executables

subprocess.run(["GeViSoftConfigReader.exe", "args..."])
  • Pro: simple isolation
  • Con: high latency (process startup overhead)
  • Con: no real-time event streaming
  • Con: resource intensive

Decision Matrix:

| Approach          | Stability | Performance | Testability  | Maintainability | Verdict         |
|-------------------|-----------|-------------|--------------|-----------------|-----------------|
| C# Service (gRPC) | Excellent | Fast        | Easy         | Clear           | SELECTED        |
| pythonnet         | Poor      | Fast        | ⚠️ Moderate  | Complex         | Not recommended |
| Subprocess        | Good      | Slow        | Easy         | ⚠️ Moderate     | Fallback option |

Implementation Plan

C# Bridge Service (GeViScopeBridge):

// gRPC service definition
service GeViScopeBridge {
  // Connection management
  rpc Connect(ConnectionRequest) returns (ConnectionResponse);
  rpc Disconnect(DisconnectRequest) returns (DisconnectResponse);

  // State queries
  rpc GetCameras(CamerasRequest) returns (CamerasResponse);
  rpc GetCamera(CameraRequest) returns (CameraResponse);

  // Video operations
  rpc GetStreamUrl(StreamRequest) returns (StreamResponse);
  rpc SendPTZCommand(PTZRequest) returns (PTZResponse);

  // Events (server streaming)
  rpc StreamEvents(EventSubscription) returns (stream EventNotification);

  // Recording operations
  rpc StartRecording(RecordingRequest) returns (RecordingResponse);
  rpc StopRecording(StopRecordingRequest) returns (StopRecordingResponse);

  // Analytics
  rpc ConfigureAnalytics(AnalyticsConfig) returns (AnalyticsResponse);
}
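
The Python stubs used below (gevibridge_pb2, gevibridge_pb2_grpc) can be generated with grpcio-tools; a hedged example, assuming the proto file lives at src/sdk/proto/gevibridge.proto:

python -m grpc_tools.protoc -I src/sdk/proto \
    --python_out=src/sdk/proto \
    --grpc_python_out=src/sdk/proto \
    src/sdk/proto/gevibridge.proto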

Python Client (src/sdk/bridge.py):

import grpc
from typing import List

from sdk.proto import gevibridge_pb2, gevibridge_pb2_grpc

class SDKBridge:
    def __init__(self, bridge_host="localhost:50051"):
        # grpc.aio gives an awaitable stub; a plain channel would block the event loop
        self.channel = grpc.aio.insecure_channel(bridge_host)
        self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel)

    async def get_cameras(self) -> List[Camera]:
        response = await self.stub.GetCameras(gevibridge_pb2.CamerasRequest())
        return [Camera.from_proto(c) for c in response.cameras]

SDK Integration Patterns (from GeViSoftConfigReader)

Connection Lifecycle:

var database = new GeViDatabase();
database.Create(hostname, username, password);
database.RegisterCallback();   // MUST be called before Connect()
var result = database.Connect();
if (result != GeViConnectResult.connectOk) { /* handle error */ }
// ... perform operations ...
database.Disconnect();
database.Dispose();

State Query Pattern (GetFirst/GetNext):

var query = new CSQGetFirstVideoInput(true, true);
var answer = database.SendStateQuery(query);

while (answer.AnswerKind != AnswerKind.Nothing) {
    var videoInput = (CSAVideoInputInfo)answer;
    // Process: videoInput.GlobalID, Name, HasPTZHead, etc.

    query = new CSQGetNextVideoInput(true, true, videoInput.GlobalID);
    answer = database.SendStateQuery(query);
}

Database Query Pattern:

// Create query session
var createQuery = new CDBQCreateActionQuery(0);
var handle = (CDBAQueryHandle)database.SendDatabaseQuery(createQuery);

// Retrieve records
var getQuery = new CDBQGetLast(handle.Handle);
var actionEntry = (CDBAActionEntry)database.SendDatabaseQuery(getQuery);

2. Video Streaming Strategy

Research Question

How should the API deliver live video streams to clients?

Investigation Performed

GeViScope Documentation Analysis:

  • GeViScope SDK provides video stream URLs
  • Supports MJPEG, H.264, and proprietary formats
  • Channel-based addressing (Channel ID required)

Testing with GeViSet:

  • Existing GeViSet application streams video directly from SDK
  • URLs typically: http://<geviserver>:<port>/stream?channel=<id>

Decision: Direct SDK Stream URLs with Token Authentication

Selected Approach: API returns authenticated stream URLs that clients connect to directly

Flow:

1. Client → API: GET /api/v1/cameras/5/stream
2. API → SDK Bridge: Request stream URL for channel 5
3. SDK Bridge → API: Returns base stream URL
4. API → Client: Returns URL with embedded JWT token
5. Client → GeViServer: Connects directly to stream URL
6. GeViServer: Validates token and streams video

Example Response:

{
  "channel_id": 5,
  "stream_url": "http://localhost:7703/stream?channel=5&token=eyJhbGc...",
  "format": "h264",
  "resolution": "1920x1080",
  "fps": 25,
  "expires_at": "2025-12-08T16:00:00Z"
}

Rationale:

  1. Performance: No proxy overhead, direct streaming
  2. Scalability: API server doesn't handle video bandwidth
  3. SDK Native: Leverages GeViScope's built-in streaming
  4. Standard: HTTP-based streams work with all clients

Alternatives Considered

Option A: API Proxy Streams

@app.get("/cameras/{id}/stream")
async def stream_camera(id: int):
    sdk_stream = await sdk.get_stream(id)
    return StreamingResponse(sdk_stream, media_type="video/h264")
  • Con: API becomes a bandwidth bottleneck
  • Con: increased server load
  • Pro: centralized authentication
  • Rejected: doesn't scale

Option B: WebRTC Signaling

  • Pro: modern, low latency
  • Con: requires WebRTC support in GeViScope (not available)
  • Con: complex client implementation
  • Rejected: SDK doesn't support WebRTC

Implementation Details

Token-Based Stream Authentication:

# In cameras endpoint
stream_token = create_stream_token(
    channel_id=channel_id,
    user_id=current_user.id,
    expires=timedelta(hours=1)
)

stream_url = sdk_bridge.get_stream_url(channel_id)
authenticated_url = f"{stream_url}&token={stream_token}"
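
A minimal sketch of create_stream_token, assuming python-jose and a module-level SECRET_KEY (both hypothetical here; the real helper belongs in the auth layer):

from datetime import datetime, timedelta, timezone
from jose import jwt

SECRET_KEY = "change-me"  # assumption: loaded from configuration in practice

def create_stream_token(channel_id: int, user_id: int, expires: timedelta) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": str(user_id),
        "channel_id": channel_id,
        "scope": "stream:view",
        "iat": int(now.timestamp()),
        "exp": int((now + expires).timestamp()),
    }
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")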

GeViServer Stream URL Format (from SDK docs):

  • Base: http://<host>:<port>/stream
  • Parameters: ?channel=<id>&format=<mjpeg|h264>&resolution=<WxH>

3. WebSocket Event Architecture

Research Question

How to deliver real-time events to 1000+ concurrent clients with <100ms latency?

Investigation Performed

FastAPI WebSocket Research:

  • Native async WebSocket support
  • Connection manager pattern for broadcast
  • Starlette WebSocket under the hood

Redis Pub/Sub Research:

  • Ideal for distributed event broadcasting
  • Sub-millisecond message delivery
  • Natural fit for WebSocket fan-out

Decision: FastAPI WebSocket + Redis Pub/Sub

Architecture:

SDK Bridge (C#)
    │ Events
    ▼
Redis Pub/Sub Channel
    │ Subscribe
    ├──▶ API Instance 1 ──▶ WebSocket Clients (1-500)
    ├──▶ API Instance 2 ──▶ WebSocket Clients (501-1000)
    └──▶ API Instance N ──▶ WebSocket Clients (N+...)

Implementation:

# src/api/websocket.py
from typing import Dict, List

from fastapi import WebSocket
import redis.asyncio as aioredis

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}
        self.redis = aioredis.from_url("redis://localhost")

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(websocket)

    async def broadcast_event(self, event: Event):
        # Filter by permissions
        for user_id, connections in self.active_connections.items():
            if await has_permission(user_id, event.channel_id):
                for websocket in connections:
                    await websocket.send_json(event.dict())

    async def listen_to_events(self):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("sdk:events")

        async for message in pubsub.listen():
            if message["type"] == "message":
                event = Event.parse_raw(message["data"])
                await self.broadcast_event(event)

Event Subscription Protocol:

// Client subscribes
{
  "action": "subscribe",
  "filters": {
    "event_types": ["motion", "alarm"],
    "channels": [1, 2, 3]
  }
}

// Server sends events
{
  "event_type": "motion",
  "channel_id": 2,
  "timestamp": "2025-12-08T14:30:00Z",
  "data": {
    "zone": "entrance",
    "confidence": 0.95
  }
}
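
For local testing, an event can be injected into the same channel from Python; this sketch stands in for the C# bridge, which is the real publisher:

import asyncio
import json

import redis.asyncio as aioredis

async def publish_test_event():
    r = aioredis.from_url("redis://localhost")
    event = {
        "event_type": "motion",
        "channel_id": 2,
        "timestamp": "2025-12-08T14:30:00Z",
        "data": {"zone": "entrance", "confidence": 0.95},
    }
    # Every API instance subscribed to "sdk:events" fans this out to its clients
    await r.publish("sdk:events", json.dumps(event))

asyncio.run(publish_test_event())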

Rationale:

  1. Scalability: Redis pub/sub enables horizontal scaling
  2. Performance: <1ms Redis latency + WebSocket overhead = <100ms target
  3. Simplicity: FastAPI native WebSocket, no custom protocol needed
  4. Filtering: Server-side filtering reduces client bandwidth

Heartbeat & Reconnection

# Client-side heartbeat every 30s (websocket here is the client's connection object)
import asyncio

async def heartbeat(websocket):
    while True:
        await websocket.send_json({"action": "ping"})
        await asyncio.sleep(30)

# Server responds with pong
if message["action"] == "ping":
    await websocket.send_json({"action": "pong"})

Automatic Reconnection:

  • Client exponential backoff: 1s, 2s, 4s, 8s, max 60s (see the sketch after this list)
  • Server maintains subscription state for 5 minutes
  • Reconnected clients receive missed critical events (buffered in Redis)
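
A client-side reconnection loop implementing this backoff might look like the following sketch, assuming the websockets package and a hypothetical consume_events handler:

import asyncio
import websockets

async def run_client(url: str):
    backoff = 1
    while True:
        try:
            async with websockets.connect(url) as ws:
                backoff = 1  # reset after a successful connection
                await consume_events(ws)  # hypothetical event handler
        except (OSError, websockets.ConnectionClosed):
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 60)  # 1s, 2s, 4s, 8s, ... capped at 60s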

4. Authentication & Session Management

Research Question

JWT token structure, refresh strategy, and session storage design?

Investigation Performed

FastAPI Security Best Practices:

  • python-jose for JWT generation/validation
  • passlib[bcrypt] for password hashing
  • FastAPI dependency injection for auth

Redis Session Research:

  • TTL-based automatic cleanup
  • Sub-millisecond lookups
  • Atomic operations for token rotation

Decision: JWT Access + Refresh Tokens with Redis Sessions

Token Structure:

# Access Token (short-lived: 1 hour)
{
  "sub": "user_id",
  "username": "operator1",
  "role": "operator",
  "permissions": ["camera:1:view", "camera:1:ptz"],
  "exp": 1702048800,
  "iat": 1702045200,
  "jti": "unique_token_id"
}

# Refresh Token (long-lived: 7 days)
{
  "sub": "user_id",
  "type": "refresh",
  "exp": 1702650000,
  "jti": "refresh_token_id"
}
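
A sketch of create_access_token under these assumptions (python-jose, a User model, and a small dataclass so the login handler below can read the token's jti):

import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from jose import jwt

@dataclass
class IssuedToken:
    token: str
    jti: str

def create_access_token(user) -> IssuedToken:
    now = datetime.now(timezone.utc)
    jti = str(uuid.uuid4())
    claims = {
        "sub": str(user.id),
        "username": user.username,
        "role": user.role,
        "permissions": user.permissions,
        "exp": int((now + timedelta(hours=1)).timestamp()),
        "iat": int(now.timestamp()),
        "jti": jti,
    }
    return IssuedToken(token=jwt.encode(claims, SECRET_KEY, algorithm="HS256"), jti=jti)

With this shape, the login handler below keys the Redis session on access_token.jti and returns access_token.token in the response body.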

Redis Session Schema:

Key: "session:{user_id}:{jti}"
Value: {
  "username": "operator1",
  "role": "operator",
  "ip_address": "192.168.1.100",
  "created_at": "2025-12-08T14:00:00Z",
  "last_activity": "2025-12-08T14:30:00Z"
}
TTL: 3600 (1 hour for access tokens)

Key: "refresh:{user_id}:{jti}"
Value: {
  "access_tokens": ["jti1", "jti2"],
  "created_at": "2025-12-08T14:00:00Z"
}
TTL: 604800 (7 days for refresh tokens)

Authentication Flow:

# Login endpoint
@router.post("/auth/login")
async def login(credentials: LoginRequest):
    user = await authenticate_user(credentials.username, credentials.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    access_token = create_access_token(user)
    refresh_token = create_refresh_token(user)

    # Store in Redis
    await redis.setex(
        f"session:{user.id}:{access_token.jti}",
        3600,
        json.dumps({"username": user.username, ...})
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "expires_in": 3600
    }

Token Refresh:

@router.post("/auth/refresh")
async def refresh_token(token: RefreshTokenRequest):
    payload = verify_refresh_token(token.refresh_token)

    # Check if refresh token is valid in Redis
    refresh_data = await redis.get(f"refresh:{payload.sub}:{payload.jti}")
    if not refresh_data:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Issue a new access token for the user identified by the refresh token
    user = await get_user(payload.sub)  # user lookup helper assumed
    new_access_token = create_access_token(user)

    # Store new session
    await redis.setex(...)

    return {"access_token": new_access_token, "expires_in": 3600}
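
For protected endpoints, a FastAPI dependency can combine stateless JWT validation with the Redis revocation check; a minimal sketch, assuming python-jose plus the SECRET_KEY and redis objects from above:

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Stateless validation passed; confirm the session was not revoked
    session = await redis.get(f"session:{payload['sub']}:{payload['jti']}")
    if not session:
        raise HTTPException(status_code=401, detail="Session expired or revoked")
    return payload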

Rationale:

  1. Security: Short-lived access tokens minimize exposure
  2. UX: Refresh tokens enable "stay logged in" without re-authentication
  3. Revocation: Redis TTL + explicit invalidation for logout
  4. Scalability: Stateless JWT validation, Redis for revocation only

5. Performance Optimization

Research Question

How to achieve <200ms API response times and support 100+ concurrent streams?

Investigation Performed

Python Async Patterns:

  • FastAPI fully async (Starlette + Uvicorn)
  • asyncio for concurrent I/O
  • aioredis for async Redis

gRPC Performance:

  • Binary protocol, faster than REST
  • HTTP/2 multiplexing
  • Streaming for events

Decision: Async Python + Connection Pooling + Caching

Async SDK Bridge Calls:

# src/sdk/bridge.py
class SDKBridge:
    def __init__(self):
        self.channel_pool = grpc.aio.insecure_channel(
            'localhost:50051',
            options=[
                ('grpc.max_concurrent_streams', 100),
                ('grpc.keepalive_time_ms', 30000),
            ]
        )
        self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel_pool)

    async def get_camera(self, channel_id: int) -> Camera:
        # Concurrent gRPC calls
        response = await self.stub.GetCamera(
            CameraRequest(channel_id=channel_id)
        )
        return Camera.from_proto(response)

Redis Caching Layer:

# Cache camera metadata (updated on events); `cache` is a project helper,
# sketched after this block
@cache(ttl=300)  # 5 minutes
async def get_camera_info(channel_id: int) -> Camera:
    return await sdk_bridge.get_camera(channel_id)
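
The cache decorator above is a project helper, not a library import; one possible shape, assuming an aioredis client and Pydantic models:

import functools

def cache(ttl: int):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(channel_id: int) -> Camera:
            key = f"cache:{fn.__name__}:{channel_id}"
            cached = await redis.get(key)
            if cached is not None:
                return Camera.parse_raw(cached)  # cache hit: skip the gRPC round trip
            result = await fn(channel_id)
            await redis.setex(key, ttl, result.json())
            return result
        return wrapper
    return decorator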

Concurrent Request Handling:

# FastAPI naturally handles concurrent requests
# Configure Uvicorn with workers
uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

Performance Targets Validation:

| Operation        | Target | Expected                         | Buffer    |
|------------------|--------|----------------------------------|-----------|
| Metadata queries | <200ms | ~50ms (gRPC) + ~10ms (Redis)     | 3x margin |
| PTZ commands     | <500ms | ~100ms (gRPC + SDK)              | 5x margin |
| Event delivery   | <100ms | ~1ms (Redis) + ~10ms (WebSocket) | 9x margin |
| Stream init      | <2s    | ~500ms (SDK) + network           | 4x margin |

Rationale: Async + gRPC + caching provides comfortable performance margins


6. Error Handling & Monitoring

Research Question

How to translate SDK errors to HTTP codes and provide observability?

Investigation Performed

SDK Error Analysis (from GeViSoftConfigReader):

  • GeViConnectResult enum: connectOk, connectFailed, connectTimeout
  • Windows error codes in some SDK responses
  • Exception types: FileNotFoundException, SDK-specific exceptions

Prometheus + Grafana Research:

  • Standard for API monitoring
  • FastAPI Prometheus middleware available
  • Grafana dashboards for visualization

Decision: Structured Logging + Prometheus Metrics + Error Translation Layer

Error Translation:

# src/sdk/errors.py
class SDKException(Exception):
    def __init__(self, sdk_error_code: str, message: str):
        self.sdk_error_code = sdk_error_code
        self.message = message
        super().__init__(message)

def translate_sdk_error(sdk_result) -> HTTPException:
    ERROR_MAP = {
        "connectFailed": (503, "SERVICE_UNAVAILABLE", "GeViServer unavailable"),
        "connectTimeout": (504, "GATEWAY_TIMEOUT", "Connection timeout"),
        "cameraOffline": (404, "CAMERA_OFFLINE", "Camera not available"),
        "permissionDenied": (403, "FORBIDDEN", "Insufficient permissions"),
        "invalidChannel": (400, "INVALID_CHANNEL", "Channel does not exist"),
    }

    status_code, error_code, message = ERROR_MAP.get(
        sdk_result,
        (500, "INTERNAL_ERROR", "Internal server error")
    )

    return HTTPException(
        status_code=status_code,
        detail={"error_code": error_code, "message": message}
    )
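
A service-layer call would then surface SDK failures as consistent HTTP errors; an illustrative sketch, assuming the SDK bridge raises SDKException:

async def get_camera_or_error(channel_id: int) -> Camera:
    try:
        return await sdk_bridge.get_camera(channel_id)
    except SDKException as ex:
        raise translate_sdk_error(ex.sdk_error_code)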

Structured Logging:

# src/core/logging.py
import logging
import structlog

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Usage
logger.info("camera_accessed", channel_id=5, user_id=123, action="view")
logger.error("sdk_error", error=str(ex), channel_id=5)

Prometheus Metrics:

# src/api/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge

http_requests_total = Counter(
    'api_http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration_seconds = Histogram(
    'api_http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

active_websocket_connections = Gauge(
    'api_websocket_connections_active',
    'Active WebSocket connections'
)

sdk_errors_total = Counter(
    'sdk_errors_total',
    'Total SDK errors',
    ['error_type']
)
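
Wiring these metrics into FastAPI can be done with a timing middleware; a sketch, assuming the app object from main:

import time

from fastapi import Request

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    http_requests_total.labels(
        method=request.method,
        endpoint=request.url.path,
        status=str(response.status_code),
    ).inc()
    http_request_duration_seconds.labels(
        method=request.method,
        endpoint=request.url.path,
    ).observe(elapsed)
    return response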

Health Check Endpoint:

@app.get("/api/v1/health")
async def health_check():
    checks = {
        "api": "healthy",
        "sdk_bridge": await check_sdk_connection(),
        "redis": await check_redis_connection(),
        "geviserver": await check_geviserver_status()
    }

    overall_status = "healthy" if all(
        v == "healthy" for v in checks.values()
    ) else "degraded"

    return {
        "status": overall_status,
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat()
    }

Rationale:

  1. Clarity: Meaningful error messages for developers
  2. Debugging: Structured logs enable quick issue resolution
  3. Monitoring: Prometheus metrics provide visibility
  4. Reliability: Health checks enable load balancer decisions

7. Testing Strategy

Research Question

How to test SDK integration without hardware and achieve 80% coverage?

Investigation Performed

Pytest Best Practices:

  • pytest-asyncio for async tests
  • pytest-mock for mocking
  • Fixtures for reusable test data

SDK Mocking Strategy:

  • Mock gRPC bridge interface
  • Simulate SDK responses
  • Test error scenarios

Decision: Layered Testing with SDK Mock + Test Instance

Test Pyramid:

         E2E Tests (5%)
    ┌─────────────────────┐
    │ Real SDK (optional) │
    └─────────────────────┘

    Integration Tests (25%)
    ┌─────────────────────┐
    │  Mock gRPC Bridge   │
    └─────────────────────┘

    Unit Tests (70%)
    ┌─────────────────────┐
    │  Pure business logic│
    └─────────────────────┘

SDK Mock Implementation:

# tests/mocks/sdk_mock.py
import asyncio

class MockSDKBridge:
    def __init__(self):
        self.cameras = {
            1: Camera(id=1, name="Camera 1", has_ptz=True, status="online"),
            2: Camera(id=2, name="Camera 2", has_ptz=False, status="online"),
        }
        self.events = []

    async def get_camera(self, channel_id: int) -> Camera:
        if channel_id not in self.cameras:
            raise SDKException("invalidChannel", "Camera not found")
        return self.cameras[channel_id]

    async def send_ptz_command(self, channel_id: int, command: PTZCommand):
        camera = await self.get_camera(channel_id)
        if not camera.has_ptz:
            raise SDKException("noPTZSupport", "Camera has no PTZ")
        # Simulate command execution
        await asyncio.sleep(0.1)

    def emit_event(self, event: Event):
        self.events.append(event)

Unit Test Example:

# tests/unit/test_camera_service.py
import pytest
from services.camera import CameraService
from tests.mocks.sdk_mock import MockSDKBridge

@pytest.fixture
def camera_service():
    mock_bridge = MockSDKBridge()
    return CameraService(sdk_bridge=mock_bridge)

@pytest.mark.asyncio
async def test_get_camera_success(camera_service):
    camera = await camera_service.get_camera(1)
    assert camera.id == 1
    assert camera.name == "Camera 1"

@pytest.mark.asyncio
async def test_get_camera_not_found(camera_service):
    with pytest.raises(SDKException) as exc_info:
        await camera_service.get_camera(999)
    assert exc_info.value.sdk_error_code == "invalidChannel"

Integration Test Example:

# tests/integration/test_camera_endpoints.py
import pytest
from httpx import AsyncClient
from main import app

@pytest.mark.asyncio
async def test_list_cameras(authenticated_client: AsyncClient):
    response = await authenticated_client.get("/api/v1/cameras")
    assert response.status_code == 200
    cameras = response.json()
    assert len(cameras) > 0
    assert "id" in cameras[0]
    assert "name" in cameras[0]

@pytest.mark.asyncio
async def test_ptz_command_no_permission(authenticated_client: AsyncClient):
    response = await authenticated_client.post(
        "/api/v1/cameras/1/ptz",
        json={"action": "pan_left", "speed": 50}
    )
    assert response.status_code == 403

Test Data Fixtures:

# tests/conftest.py
import pytest
from httpx import AsyncClient

from main import app  # get_sdk_bridge is the FastAPI dependency being overridden
from tests.mocks.sdk_mock import MockSDKBridge

@pytest.fixture
def mock_sdk_bridge():
    return MockSDKBridge()

@pytest.fixture
def authenticated_client(mock_sdk_bridge):
    # Create test client with mocked dependencies
    app.dependency_overrides[get_sdk_bridge] = lambda: mock_sdk_bridge

    async with AsyncClient(app=app, base_url="http://test") as client:
        # Login and get token
        response = await client.post("/api/v1/auth/login", json={
            "username": "test_user",
            "password": "test_pass"
        })
        token = response.json()["access_token"]
        client.headers["Authorization"] = f"Bearer {token}"
        yield client

Coverage Configuration:

# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.coverage.run]
source = ["src"]
omit = ["*/tests/*", "*/migrations/*"]

[tool.coverage.report]
fail_under = 80
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise NotImplementedError",
    "if __name__ == .__main__.:",
]

Test Execution:

# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run only unit tests
pytest tests/unit

# Run only integration tests
pytest tests/integration

Rationale:

  1. Speed: Unit tests run instantly without SDK
  2. Reliability: Tests don't depend on hardware availability
  3. Coverage: 80% coverage achievable with mocks
  4. E2E: Optional real SDK tests for validation

SDK-to-API Mapping Reference

Based on GeViSoft SDK analysis, here's the mapping from SDK actions to API endpoints:

| SDK Action Category  | SDK Action         | API Endpoint                      | HTTP Method |
|----------------------|--------------------|-----------------------------------|-------------|
| SystemActions        | ConnectDB          | /auth/login                       | POST        |
|                      | DisconnectDB       | /auth/logout                      | POST        |
| VideoActions         | GetFirstVideoInput | /cameras                          | GET         |
|                      | GetNextVideoInput  | (internal pagination)             | -           |
|                      | StartVideoStream   | /cameras/{id}/stream              | GET         |
| CameraControlActions | PTZControl         | /cameras/{id}/ptz                 | POST        |
|                      | SetPreset          | /cameras/{id}/presets             | POST        |
|                      | GotoPreset         | /cameras/{id}/presets/{preset_id} | POST        |
| EventActions         | StartEvent         | (WebSocket subscription)          | WS          |
|                      | StopEvent          | (WebSocket unsubscribe)           | WS          |
| RecordingActions     | StartRecording     | /recordings/{channel}/start       | POST        |
|                      | StopRecording      | /recordings/{channel}/stop        | POST        |
|                      | QueryRecordings    | /recordings                       | GET         |
| AnalyticsActions     | ConfigureVMD       | /analytics/{channel}/vmd          | PUT         |
|                      | ConfigureNPR       | /analytics/{channel}/npr          | PUT         |
|                      | ConfigureOBTRACK   | /analytics/{channel}/obtrack      | PUT         |

Dependencies & Prerequisites

Development Environment

Required:

  • Python 3.11+
  • .NET SDK 6.0+ (for C# bridge development)
  • Redis 7.2+
  • Visual Studio 2022 (for C# bridge)
  • GeViSoft full installation
  • GeViSoft SDK
  • Visual C++ 2010 Redistributable (x86)

Python Packages:

# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
redis==5.0.1
grpcio==1.59.0
grpcio-tools==1.59.0
python-multipart==0.0.6
prometheus-client==0.19.0
structlog==23.2.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-mock==3.12.0
httpx==0.25.2

C# NuGet Packages (Bridge Service):

<PackageReference Include="Grpc.AspNetCore" Version="2.59.0" />
<PackageReference Include="Google.Protobuf" Version="3.25.0" />
<PackageReference Include="Grpc.Tools" Version="2.59.0" />

Deployment Environment

Windows Server 2016+ or Windows 10/11:

  • .NET Runtime 6.0+
  • Python 3.11+ runtime
  • Redis (standalone or cluster)
  • GeViSoft with active license
  • Nginx or IIS for reverse proxy (HTTPS termination)

Risk Mitigation

High-Risk Items

1. SDK Stability

  • Risk: C# bridge crashes take down video functionality
  • Mitigation:
    • Auto-restart bridge service (Windows Service recovery)
    • Circuit breaker pattern in Python (trip after 3 failures; sketched after this list)
    • Health checks monitor bridge status
    • Graceful degradation (API returns cached data when bridge down)
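
A minimal circuit-breaker sketch (an assumed helper, not an existing library API): after three consecutive failures it rejects bridge calls for a cooldown period so endpoints can fall back to cached data:

import time

class CircuitBreaker:
    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = 0.0

    async def call(self, coro_fn, *args):
        if self.failures >= self.max_failures:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise SDKException("bridgeUnavailable", "SDK bridge circuit open")
            self.failures = 0  # half-open: let one probe call through
        try:
            result = await coro_fn(*args)
            self.failures = 0  # success closes the circuit
            return result
        except Exception:
            self.failures += 1
            self.opened_at = time.monotonic()
            raise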

2. Performance Under Load

  • Risk: May not achieve 100 concurrent streams
  • Mitigation:
    • Load testing in Phase 2 with real hardware
    • Stream quality adaptation (reduce resolution/fps under load)
    • Connection pooling and async I/O
    • Horizontal scaling (multiple API instances)

3. Event Delivery Reliability

  • Risk: WebSocket disconnections lose events
  • Mitigation:
    • Redis event buffer (5-minute retention; see the sketch after this list)
    • Reconnection sends missed critical events
    • Event sequence numbers for gap detection
    • Client-side acknowledgments for critical events
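
One way to realize the buffer-and-replay mitigation, sketched here with Redis Streams rather than a hand-rolled sequence buffer (the stream name and length cap are assumptions):

import json

async def buffer_event(r, event: dict):
    # Length-capped stream approximates the 5-minute retention window
    await r.xadd("events:buffer", {"data": json.dumps(event)}, maxlen=10000, approximate=True)

async def replay_missed(r, last_seen_id: str):
    # Redis 6.2+ supports exclusive ranges, so "(id" returns only newer entries
    return await r.xrange("events:buffer", min=f"({last_seen_id}", max="+")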

Phase 0 Completion Checklist

  • GeViScope SDK integration approach decided (C# gRPC bridge)
  • Video streaming strategy defined (direct URLs with tokens)
  • WebSocket architecture designed (FastAPI + Redis pub/sub)
  • Authentication system specified (JWT + Redis sessions)
  • Performance optimization plan documented (async + caching)
  • Error handling strategy defined (translation layer + structured logging)
  • Testing approach designed (layered tests with mocks)
  • SDK-to-API mappings documented
  • Dependencies identified
  • Risk mitigation strategies defined

Status: Research phase complete - Ready for Phase 1 Design


Next Step: Execute Phase 1 to generate data-model.md, contracts/openapi.yaml, and quickstart.md