# Phase 0 Research: Geutebruck Video Surveillance API

Branch: 001-surveillance-api | Date: 2025-12-08
Phase: Research | Input: plan.md research topics
## Research Summary
This document resolves all "NEEDS CLARIFICATION" items from the implementation plan and provides technical decisions backed by prototyping, documentation analysis, and best practices research.
Key Findings:
- ✅ GeViScope SDK integration via C# bridge service (recommended)
- ✅ Video streaming via direct GeViScope URLs with token authentication
- ✅ FastAPI WebSocket with Redis pub/sub for event distribution
- ✅ JWT with Redis-backed sessions for authentication
- ✅ Async Python with connection pooling for SDK calls
- ✅ Structured logging with Prometheus metrics
- ✅ Pytest with SDK mock layer for testing
## 1. GeViScope SDK Integration

### Research Question

How should Python FastAPI integrate with the Windows-native GeViScope .NET SDK?

### Investigation Performed
Prototype: Built a working C# .NET application (GeViSoftConfigReader) that successfully:
- Connects to GeViServer
- Queries configuration via State Queries
- Exports data to JSON
- Handles all SDK dependencies
SDK Analysis: Extracted and analyzed the complete SDK documentation (1.4 MB):
- GeViScope_SDK.pdf → GeViScope_SDK.txt
- GeViSoft_SDK_Documentation.pdf → GeViSoft_SDK_Documentation.txt

Critical Discovery: The SDK has specific requirements:
- Full GeViSoft installation required (not just SDK)
- Visual C++ 2010 Redistributable (x86) mandatory
- Windows Forms context needed for .NET mixed-mode DLL loading
- x86 (32-bit) architecture required
### Decision: C# SDK Bridge Service

Selected Approach: Build a dedicated C# Windows Service that wraps the GeViScope SDK and exposes a gRPC interface for the Python API.

Architecture:
```
┌──────────────────────┐
│   Python FastAPI     │  (REST/WebSocket API)
│   (Any platform)     │
└──────────┬───────────┘
           │ gRPC/HTTP
           ▼
┌──────────────────────┐
│   C# SDK Bridge      │  (Windows Service)
│ (GeViScope Wrapper)  │
└──────────┬───────────┘
           │ .NET SDK
           ▼
┌──────────────────────┐
│   GeViScope SDK      │
│    GeViServer        │
└──────────────────────┘
```
Rationale:
- Stability: SDK crashes don't kill Python API (process isolation)
- Testability: Python can mock gRPC interface easily
- Expertise: Leverage proven C# SDK integration from GeViSoftConfigReader
- Performance: Native .NET SDK calls are faster than COM interop
- Maintainability: Clear separation of concerns
### Alternatives Considered

Option A: pythonnet (Direct .NET Interop)

```python
import clr
clr.AddReference("GeViProcAPINET_4_0")
from GEUTEBRUECK.GeViSoftSDKNET import GeViDatabase
```
- ❌ Requires Python 32-bit on Windows
- ❌ SDK crashes kill Python process
- ❌ Complex debugging
- ✅ No additional service needed
Option B: comtypes (COM Interface)
- ❌ SDK doesn't expose COM interface (tested)
- ❌ Not viable
Option C: Subprocess Calls to C# Executables

```python
subprocess.run(["GeViSoftConfigReader.exe", "args..."])
```
- ✅ Simple isolation
- ❌ High latency (process startup overhead)
- ❌ No real-time event streaming
- ❌ Resource intensive
Decision Matrix:
| Approach | Stability | Performance | Testability | Maintainability | Score |
|---|---|---|---|---|---|
| C# Service (gRPC) | ✅ Excellent | ✅ Fast | ✅ Easy | ✅ Clear | SELECTED |
| pythonnet | ❌ Poor | ✅ Fast | ⚠️ Moderate | ❌ Complex | Not recommended |
| Subprocess | ✅ Good | ❌ Slow | ✅ Easy | ⚠️ Moderate | Fallback option |
### Implementation Plan

C# Bridge Service (GeViScopeBridge):

```protobuf
// gRPC service definition
service GeViScopeBridge {
  // Connection management
  rpc Connect(ConnectionRequest) returns (ConnectionResponse);
  rpc Disconnect(DisconnectRequest) returns (DisconnectResponse);

  // State queries
  rpc GetCameras(CamerasRequest) returns (CamerasResponse);
  rpc GetCamera(CameraRequest) returns (CameraResponse);

  // Video operations
  rpc GetStreamUrl(StreamRequest) returns (StreamResponse);
  rpc SendPTZCommand(PTZRequest) returns (PTZResponse);

  // Events (server streaming)
  rpc StreamEvents(EventSubscription) returns (stream EventNotification);

  // Recording operations
  rpc StartRecording(RecordingRequest) returns (RecordingResponse);
  rpc StopRecording(StopRecordingRequest) returns (StopRecordingResponse);

  // Analytics
  rpc ConfigureAnalytics(AnalyticsConfig) returns (AnalyticsResponse);
}
```
Python Client (src/sdk/bridge.py):

```python
# src/sdk/bridge.py
from typing import List

import grpc

from sdk.proto import gevibridge_pb2, gevibridge_pb2_grpc

class SDKBridge:
    def __init__(self, bridge_host="localhost:50051"):
        # Async channel so the stub can be awaited from FastAPI handlers
        self.channel = grpc.aio.insecure_channel(bridge_host)
        self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel)

    async def get_cameras(self) -> List[Camera]:
        response = await self.stub.GetCameras(gevibridge_pb2.CamerasRequest())
        return [Camera.from_proto(c) for c in response.cameras]
```
### SDK Integration Patterns (from GeViSoftConfigReader)

Connection Lifecycle:

```csharp
var database = new GeViDatabase();
database.Create(hostname, username, password);
database.RegisterCallback();  // MUST be called before Connect()
var result = database.Connect();
if (result != GeViConnectResult.connectOk) { /* handle error */ }
// ... perform operations ...
database.Disconnect();
database.Dispose();
```
State Query Pattern (GetFirst/GetNext):

```csharp
var query = new CSQGetFirstVideoInput(true, true);
var answer = database.SendStateQuery(query);
while (answer.AnswerKind != AnswerKind.Nothing) {
    var videoInput = (CSAVideoInputInfo)answer;
    // Process: videoInput.GlobalID, Name, HasPTZHead, etc.
    query = new CSQGetNextVideoInput(true, true, videoInput.GlobalID);
    answer = database.SendStateQuery(query);
}
```
Database Query Pattern:

```csharp
// Create query session
var createQuery = new CDBQCreateActionQuery(0);
var handle = (CDBAQueryHandle)database.SendDatabaseQuery(createQuery);

// Retrieve records
var getQuery = new CDBQGetLast(handle.Handle);
var actionEntry = (CDBAActionEntry)database.SendDatabaseQuery(getQuery);
```
## 2. Video Streaming Strategy

### Research Question

How should the API deliver live video streams to clients?

### Investigation Performed
GeViScope Documentation Analysis:
- GeViScope SDK provides video stream URLs
- Supports MJPEG, H.264, and proprietary formats
- Channel-based addressing (Channel ID required)
Testing with GeViSet:
- The existing GeViSet application streams video directly from the SDK
- Stream URLs typically look like: `http://<geviserver>:<port>/stream?channel=<id>`
### Decision: Direct SDK Stream URLs with Token Authentication

Selected Approach: The API returns authenticated stream URLs that clients connect to directly.

Flow:
1. Client → API: GET /api/v1/cameras/5/stream
2. API → SDK Bridge: Request stream URL for channel 5
3. SDK Bridge → API: Returns base stream URL
4. API → Client: Returns URL with embedded JWT token
5. Client → GeViServer: Connects directly to stream URL
6. GeViServer: Validates token and streams video
Example Response:

```json
{
  "channel_id": 5,
  "stream_url": "http://localhost:7703/stream?channel=5&token=eyJhbGc...",
  "format": "h264",
  "resolution": "1920x1080",
  "fps": 25,
  "expires_at": "2025-12-08T16:00:00Z"
}
```
Rationale:
- Performance: No proxy overhead, direct streaming
- Scalability: API server doesn't handle video bandwidth
- SDK Native: Leverages GeViScope's built-in streaming
- Standard: HTTP-based streams work with all clients
### Alternatives Considered

Option A: API Proxy Streams

```python
@app.get("/cameras/{id}/stream")
async def stream_camera(id: int):
    sdk_stream = await sdk.get_stream(id)
    return StreamingResponse(sdk_stream, media_type="video/h264")
```
- ❌ API becomes bandwidth bottleneck
- ❌ Increased server load
- ✅ Centralized authentication
- Rejected: Doesn't scale
Option B: WebRTC Signaling
- ✅ Modern, low latency
- ❌ Requires WebRTC support in GeViScope (not available)
- ❌ Complex client implementation
- Rejected: SDK doesn't support WebRTC
### Implementation Details

Token-Based Stream Authentication:

```python
# In the cameras endpoint
stream_token = create_stream_token(
    channel_id=channel_id,
    user_id=current_user.id,
    expires=timedelta(hours=1)
)
stream_url = sdk_bridge.get_stream_url(channel_id)
authenticated_url = f"{stream_url}&token={stream_token}"
```
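The `create_stream_token` helper above is not part of the SDK; a minimal sketch using python-jose (the `STREAM_TOKEN_SECRET` constant and the claim names are illustrative assumptions) could look like:

```python
# Hypothetical helper for the snippet above; secret handling and claim
# names are assumptions, not part of the GeViScope SDK.
import uuid
from datetime import datetime, timedelta, timezone

from jose import jwt

STREAM_TOKEN_SECRET = "change-me"  # assumed to come from settings in practice

def create_stream_token(channel_id: int, user_id: int, expires: timedelta) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": str(user_id),
        "channel": channel_id,  # scope the token to a single stream
        "iat": int(now.timestamp()),
        "exp": int((now + expires).timestamp()),
        "jti": uuid.uuid4().hex,
    }
    return jwt.encode(claims, STREAM_TOKEN_SECRET, algorithm="HS256")
```

GeViServer must then validate this token on connect (step 6 of the flow), which is the part that depends on SDK-side support.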
GeViServer Stream URL Format (from SDK docs):
- Base: `http://<host>:<port>/stream`
- Parameters: `?channel=<id>&format=<mjpeg|h264>&resolution=<WxH>`
## 3. WebSocket Event Architecture

### Research Question

How to deliver real-time events to 1000+ concurrent clients with <100ms latency?

### Investigation Performed
FastAPI WebSocket Research:
- Native async WebSocket support
- Connection manager pattern for broadcast
- Starlette WebSocket under the hood
Redis Pub/Sub Research:
- Ideal for distributed event broadcasting
- Sub-millisecond message delivery
- Natural fit for WebSocket fan-out
### Decision: FastAPI WebSocket + Redis Pub/Sub

Architecture:

```
SDK Bridge (C#)
      │ Events
      ▼
Redis Pub/Sub Channel
      │ Subscribe
      ├──▶ API Instance 1 ──▶ WebSocket Clients (1-500)
      ├──▶ API Instance 2 ──▶ WebSocket Clients (501-1000)
      └──▶ API Instance N ──▶ WebSocket Clients (N+...)
```
Implementation:

```python
# src/api/websocket.py
from typing import Dict, List

from fastapi import WebSocket
import redis.asyncio as aioredis

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}
        self.redis = aioredis.from_url("redis://localhost")

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(websocket)

    async def broadcast_event(self, event: Event):
        # Filter by permissions
        for user_id, connections in self.active_connections.items():
            if await has_permission(user_id, event.channel_id):
                for websocket in connections:
                    await websocket.send_json(event.dict())

    async def listen_to_events(self):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("sdk:events")
        async for message in pubsub.listen():
            if message["type"] == "message":
                event = Event.parse_raw(message["data"])
                await self.broadcast_event(event)
```
Event Subscription Protocol:

```jsonc
// Client subscribes
{
  "action": "subscribe",
  "filters": {
    "event_types": ["motion", "alarm"],
    "channels": [1, 2, 3]
  }
}

// Server sends events
{
  "event_type": "motion",
  "channel_id": 2,
  "timestamp": "2025-12-08T14:30:00Z",
  "data": {
    "zone": "entrance",
    "confidence": 0.95
  }
}
```
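A sketch of the FastAPI endpoint that accepts these messages follows; the route path, how `user_id` is obtained, and the per-connection filter storage are assumptions, not settled design:

```python
# Sketch: WebSocket endpoint wiring the ConnectionManager to the protocol above.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
manager = ConnectionManager()

@app.websocket("/api/v1/events")
async def events_websocket(websocket: WebSocket, user_id: str):
    await manager.connect(websocket, user_id)
    try:
        while True:
            message = await websocket.receive_json()
            if message.get("action") == "subscribe":
                # Remember this connection's filters for server-side filtering
                websocket.state.filters = message.get("filters", {})
            elif message.get("action") == "ping":
                await websocket.send_json({"action": "pong"})
    except WebSocketDisconnect:
        if websocket in manager.active_connections.get(user_id, []):
            manager.active_connections[user_id].remove(websocket)
```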
Rationale:
- Scalability: Redis pub/sub enables horizontal scaling
- Performance: <1ms Redis latency + WebSocket overhead = <100ms target
- Simplicity: FastAPI native WebSocket, no custom protocol needed
- Filtering: Server-side filtering reduces client bandwidth
### Heartbeat & Reconnection

```python
# Client-side heartbeat every 30s
async def heartbeat():
    while True:
        await websocket.send_json({"action": "ping"})
        await asyncio.sleep(30)

# Server responds with pong
if message["action"] == "ping":
    await websocket.send_json({"action": "pong"})
```
Automatic Reconnection:
- Client exponential backoff: 1s, 2s, 4s, 8s, max 60s
- Server maintains subscription state for 5 minutes
- Reconnected clients receive missed critical events (buffered in Redis)
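The missed-event buffer is only described at a high level above. One possible sketch, assuming every event carries a monotonically increasing `sequence` field (an assumption, not a documented SDK feature), keeps events in a Redis sorted set:

```python
# Sketch only: buffer events in a Redis sorted set keyed by sequence number,
# so a reconnecting client can request everything after its last-seen sequence.
import json

import redis.asyncio as aioredis

redis_client = aioredis.from_url("redis://localhost")
BUFFER_KEY = "events:buffer"   # assumption: single global buffer
BUFFER_TTL_SECONDS = 300       # the 5-minute retention stated above

async def buffer_event(event: dict):
    await redis_client.zadd(BUFFER_KEY, {json.dumps(event): event["sequence"]})
    await redis_client.expire(BUFFER_KEY, BUFFER_TTL_SECONDS)

async def replay_missed(last_seen_sequence: int) -> list[dict]:
    raw = await redis_client.zrangebyscore(
        BUFFER_KEY, last_seen_sequence + 1, "+inf"
    )
    return [json.loads(item) for item in raw]
```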
## 4. Authentication & Session Management

### Research Question

What JWT token structure, refresh strategy, and session storage design should be used?

### Investigation Performed
FastAPI Security Best Practices:
- `python-jose` for JWT generation/validation
- `passlib[bcrypt]` for password hashing
- FastAPI dependency injection for auth
Redis Session Research:
- TTL-based automatic cleanup
- Sub-millisecond lookups
- Atomic operations for token rotation
### Decision: JWT Access + Refresh Tokens with Redis Sessions

Token Structure:

```python
# Access Token (short-lived: 1 hour)
{
    "sub": "user_id",
    "username": "operator1",
    "role": "operator",
    "permissions": ["camera:1:view", "camera:1:ptz"],
    "exp": 1702048800,
    "iat": 1702045200,
    "jti": "unique_token_id"
}

# Refresh Token (long-lived: 7 days)
{
    "sub": "user_id",
    "type": "refresh",
    "exp": 1702650000,
    "jti": "refresh_token_id"
}
```
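A minimal sketch of issuing the access token above with python-jose; the `SECRET_KEY` constant and the `User` attributes are assumptions:

```python
# Token-issuing sketch; SECRET_KEY handling and the user object are assumed.
import uuid
from datetime import datetime, timedelta, timezone

from jose import jwt

SECRET_KEY = "change-me"  # assumption: injected from settings in practice
ALGORITHM = "HS256"

def create_access_token(user) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": str(user.id),
        "username": user.username,
        "role": user.role,
        "permissions": user.permissions,
        "exp": int((now + timedelta(hours=1)).timestamp()),
        "iat": int(now.timestamp()),
        "jti": uuid.uuid4().hex,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
```

Note that the login flow below also reads `access_token.jti`, so a real implementation would likely return the claims alongside the encoded string.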
Redis Session Schema:

```
Key: "session:{user_id}:{jti}"
Value: {
  "username": "operator1",
  "role": "operator",
  "ip_address": "192.168.1.100",
  "created_at": "2025-12-08T14:00:00Z",
  "last_activity": "2025-12-08T14:30:00Z"
}
TTL: 3600 (1 hour for access tokens)

Key: "refresh:{user_id}:{jti}"
Value: {
  "access_tokens": ["jti1", "jti2"],
  "created_at": "2025-12-08T14:00:00Z"
}
TTL: 604800 (7 days for refresh tokens)
```
Authentication Flow:

```python
# Login endpoint
@router.post("/auth/login")
async def login(credentials: LoginRequest):
    user = await authenticate_user(credentials.username, credentials.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    access_token = create_access_token(user)
    refresh_token = create_refresh_token(user)

    # Store in Redis
    await redis.setex(
        f"session:{user.id}:{access_token.jti}",
        3600,
        json.dumps({"username": user.username, ...})
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "expires_in": 3600
    }
```
Token Refresh:

```python
@router.post("/auth/refresh")
async def refresh_token(token: RefreshTokenRequest):
    payload = verify_refresh_token(token.refresh_token)

    # Check that the refresh token is still valid in Redis
    refresh_data = await redis.get(f"refresh:{payload.sub}:{payload.jti}")
    if not refresh_data:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Issue a new access token
    user = await get_user(payload.sub)  # user lookup helper (assumed)
    new_access_token = create_access_token(user)

    # Store the new session
    await redis.setex(...)

    return {"access_token": new_access_token, "expires_in": 3600}
```
Rationale:
- Security: Short-lived access tokens minimize exposure
- UX: Refresh tokens enable "stay logged in" without re-authentication
- Revocation: Redis TTL + explicit invalidation for logout
- Scalability: Stateless JWT validation, Redis for revocation only
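The explicit logout invalidation mentioned above is not shown elsewhere; a sketch reusing the session-key scheme from the schema (the `get_current_user` dependency and the jti attributes are assumptions):

```python
# Hypothetical logout route; revocation works by deleting the Redis session
# keys so still-unexpired JWTs are rejected on the next session lookup.
from fastapi import Depends

@router.post("/auth/logout")
async def logout(current_user=Depends(get_current_user)):
    await redis.delete(f"session:{current_user.id}:{current_user.jti}")
    await redis.delete(f"refresh:{current_user.id}:{current_user.refresh_jti}")
    return {"detail": "Logged out"}
```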
## 5. Performance Optimization

### Research Question

How to achieve <200ms API response times and support 100+ concurrent streams?

### Investigation Performed
Python Async Patterns:
- FastAPI is fully async (Starlette + Uvicorn)
- `asyncio` for concurrent I/O
- `aioredis` for async Redis
gRPC Performance:
- Binary protocol, faster than REST
- HTTP/2 multiplexing
- Streaming for events
### Decision: Async Python + Connection Pooling + Caching

Async SDK Bridge Calls:

```python
# src/sdk/bridge.py
class SDKBridge:
    def __init__(self):
        self.channel_pool = grpc.aio.insecure_channel(
            'localhost:50051',
            options=[
                ('grpc.max_concurrent_streams', 100),
                ('grpc.keepalive_time_ms', 30000),
            ]
        )
        self.stub = gevibridge_pb2_grpc.GeViScopeBridgeStub(self.channel_pool)

    async def get_camera(self, channel_id: int) -> Camera:
        # Concurrent gRPC calls multiplex over the HTTP/2 channel
        response = await self.stub.GetCamera(
            CameraRequest(channel_id=channel_id)
        )
        return Camera.from_proto(response)
```
Redis Caching Layer:

```python
# Cache camera metadata (refreshed on events)
@cache(ttl=300)  # 5 minutes
async def get_camera_info(channel_id: int) -> Camera:
    return await sdk_bridge.get_camera(channel_id)
```
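The `@cache` decorator is assumed rather than a stdlib or FastAPI feature; a minimal Redis-backed version (key format and pickle serialization are illustrative choices, and `redis_client` is the async client from earlier) could look like:

```python
# Sketch of the @cache decorator used above; key scheme and pickle use are
# illustrative assumptions, not the project's actual implementation.
import functools
import pickle

def cache(ttl: int):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            key = f"cache:{func.__name__}:{args}:{sorted(kwargs.items())}"
            cached = await redis_client.get(key)
            if cached is not None:
                return pickle.loads(cached)
            result = await func(*args, **kwargs)
            await redis_client.setex(key, ttl, pickle.dumps(result))
            return result
        return wrapper
    return decorator
```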
Concurrent Request Handling:

```bash
# FastAPI naturally handles concurrent requests;
# configure Uvicorn with multiple workers:
uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000
```
Performance Targets Validation:
| Operation | Target | Expected | Buffer |
|---|---|---|---|
| Metadata queries | <200ms | ~50ms (gRPC) + ~10ms (Redis) | ✅ 3x margin |
| PTZ commands | <500ms | ~100ms (gRPC + SDK) | ✅ 5x margin |
| Event delivery | <100ms | ~1ms (Redis) + ~10ms (WebSocket) | ✅ 9x margin |
| Stream init | <2s | ~500ms (SDK) + network | ✅ 4x margin |
Rationale: Async I/O, gRPC, and caching together provide comfortable performance margins.
## 6. Error Handling & Monitoring

### Research Question

How to translate SDK errors to HTTP codes and provide observability?

### Investigation Performed
SDK Error Analysis (from GeViSoftConfigReader):
- `GeViConnectResult` enum: `connectOk`, `connectFailed`, `connectTimeout`
- Windows error codes in some SDK responses
- Exception types: `FileNotFoundException`, SDK-specific exceptions
Prometheus + Grafana Research:
- Standard for API monitoring
- FastAPI Prometheus middleware available
- Grafana dashboards for visualization
### Decision: Structured Logging + Prometheus Metrics + Error Translation Layer

Error Translation:

```python
# src/sdk/errors.py
class SDKException(Exception):
    def __init__(self, sdk_error_code: str, message: str):
        self.sdk_error_code = sdk_error_code
        self.message = message
        super().__init__(message)

def translate_sdk_error(sdk_result) -> HTTPException:
    ERROR_MAP = {
        "connectFailed": (503, "SERVICE_UNAVAILABLE", "GeViServer unavailable"),
        "connectTimeout": (504, "GATEWAY_TIMEOUT", "Connection timeout"),
        "cameraOffline": (404, "CAMERA_OFFLINE", "Camera not available"),
        "permissionDenied": (403, "FORBIDDEN", "Insufficient permissions"),
        "invalidChannel": (400, "INVALID_CHANNEL", "Channel does not exist"),
    }
    status_code, error_code, message = ERROR_MAP.get(
        sdk_result,
        (500, "INTERNAL_ERROR", "Internal server error")
    )
    return HTTPException(
        status_code=status_code,
        detail={"error_code": error_code, "message": message}
    )
```
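One way to apply this translation globally is a FastAPI exception handler, sketched here under the assumption that routes raise `SDKException` and that `app` is the FastAPI instance:

```python
# Sketch: any SDKException raised in a route is converted to its HTTP mapping.
from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(SDKException)
async def sdk_exception_handler(request: Request, exc: SDKException):
    http_exc = translate_sdk_error(exc.sdk_error_code)
    return JSONResponse(status_code=http_exc.status_code, content=http_exc.detail)
```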
Structured Logging:

```python
# src/core/logging.py
import structlog

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Usage
logger.info("camera_accessed", channel_id=5, user_id=123, action="view")
logger.error("sdk_error", error=str(ex), channel_id=5)
```
Prometheus Metrics:

```python
# src/api/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge

http_requests_total = Counter(
    'api_http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
http_request_duration_seconds = Histogram(
    'api_http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)
active_websocket_connections = Gauge(
    'api_websocket_connections_active',
    'Active WebSocket connections'
)
sdk_errors_total = Counter(
    'sdk_errors_total',
    'Total SDK errors',
    ['error_type']
)
```
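A sketch of middleware that populates these metrics per request (the `app` instance is assumed):

```python
# Sketch: record request count and latency for every HTTP request.
import time

@app.middleware("http")
async def record_metrics(request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    http_requests_total.labels(
        method=request.method,
        endpoint=request.url.path,
        status=str(response.status_code),
    ).inc()
    http_request_duration_seconds.labels(
        method=request.method,
        endpoint=request.url.path,
    ).observe(elapsed)
    return response
```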
Health Check Endpoint:

```python
@app.get("/api/v1/health")
async def health_check():
    checks = {
        "api": "healthy",
        "sdk_bridge": await check_sdk_connection(),
        "redis": await check_redis_connection(),
        "geviserver": await check_geviserver_status()
    }
    overall_status = "healthy" if all(
        v == "healthy" for v in checks.values()
    ) else "degraded"
    return {
        "status": overall_status,
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat()
    }
```
Rationale:
- Clarity: Meaningful error messages for developers
- Debugging: Structured logs enable quick issue resolution
- Monitoring: Prometheus metrics provide visibility
- Reliability: Health checks enable load balancer decisions
## 7. Testing Strategy

### Research Question

How to test SDK integration without hardware and achieve 80% coverage?

### Investigation Performed
Pytest Best Practices:
- `pytest-asyncio` for async tests
- `pytest-mock` for mocking
- Fixtures for reusable test data
SDK Mocking Strategy:
- Mock gRPC bridge interface
- Simulate SDK responses
- Test error scenarios
### Decision: Layered Testing with SDK Mock + Test Instance

Test Pyramid:

```
       E2E Tests (5%)
  ┌─────────────────────┐
  │ Real SDK (optional) │
  └─────────────────────┘
  Integration Tests (25%)
  ┌─────────────────────┐
  │  Mock gRPC Bridge   │
  └─────────────────────┘
     Unit Tests (70%)
  ┌─────────────────────┐
  │ Pure business logic │
  └─────────────────────┘
```
SDK Mock Implementation:

```python
# tests/mocks/sdk_mock.py
import asyncio

class MockSDKBridge:
    def __init__(self):
        self.cameras = {
            1: Camera(id=1, name="Camera 1", has_ptz=True, status="online"),
            2: Camera(id=2, name="Camera 2", has_ptz=False, status="online"),
        }
        self.events = []

    async def get_camera(self, channel_id: int) -> Camera:
        if channel_id not in self.cameras:
            raise SDKException("invalidChannel", "Camera not found")
        return self.cameras[channel_id]

    async def send_ptz_command(self, channel_id: int, command: PTZCommand):
        camera = await self.get_camera(channel_id)
        if not camera.has_ptz:
            raise SDKException("noPTZSupport", "Camera has no PTZ")
        # Simulate command execution latency
        await asyncio.sleep(0.1)

    def emit_event(self, event: Event):
        self.events.append(event)
```
Unit Test Example:

```python
# tests/unit/test_camera_service.py
import pytest

from services.camera import CameraService
from tests.mocks.sdk_mock import MockSDKBridge

@pytest.fixture
def camera_service():
    mock_bridge = MockSDKBridge()
    return CameraService(sdk_bridge=mock_bridge)

@pytest.mark.asyncio
async def test_get_camera_success(camera_service):
    camera = await camera_service.get_camera(1)
    assert camera.id == 1
    assert camera.name == "Camera 1"

@pytest.mark.asyncio
async def test_get_camera_not_found(camera_service):
    with pytest.raises(SDKException) as exc_info:
        await camera_service.get_camera(999)
    assert exc_info.value.sdk_error_code == "invalidChannel"
```
Integration Test Example:

```python
# tests/integration/test_camera_endpoints.py
import pytest
from httpx import AsyncClient

from main import app

@pytest.mark.asyncio
async def test_list_cameras(authenticated_client: AsyncClient):
    response = await authenticated_client.get("/api/v1/cameras")
    assert response.status_code == 200
    cameras = response.json()
    assert len(cameras) > 0
    assert "id" in cameras[0]
    assert "name" in cameras[0]

@pytest.mark.asyncio
async def test_ptz_command_no_permission(authenticated_client: AsyncClient):
    response = await authenticated_client.post(
        "/api/v1/cameras/1/ptz",
        json={"action": "pan_left", "speed": 50}
    )
    assert response.status_code == 403
```
Test Data Fixtures:

```python
# tests/conftest.py
import pytest
from tests.mocks.sdk_mock import MockSDKBridge

@pytest.fixture
def mock_sdk_bridge():
    return MockSDKBridge()

@pytest.fixture
async def authenticated_client(mock_sdk_bridge):
    # Create a test client with mocked dependencies
    app.dependency_overrides[get_sdk_bridge] = lambda: mock_sdk_bridge
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Login and get a token
        response = await client.post("/api/v1/auth/login", json={
            "username": "test_user",
            "password": "test_pass"
        })
        token = response.json()["access_token"]
        client.headers["Authorization"] = f"Bearer {token}"
        yield client
```
Coverage Configuration:

```toml
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.coverage.run]
source = ["src"]
omit = ["*/tests/*", "*/migrations/*"]

[tool.coverage.report]
fail_under = 80
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise NotImplementedError",
    "if __name__ == .__main__.:",
]
```
Test Execution:

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run only unit tests
pytest tests/unit

# Run only integration tests
pytest tests/integration
```
Rationale:
- Speed: Unit tests run instantly without SDK
- Reliability: Tests don't depend on hardware availability
- Coverage: 80% coverage achievable with mocks
- E2E: Optional real SDK tests for validation
## SDK-to-API Mapping Reference

Based on the GeViSoft SDK analysis, here is the mapping from SDK actions to API endpoints:
| SDK Action Category | SDK Action | API Endpoint | HTTP Method |
|---|---|---|---|
| SystemActions | ConnectDB | `/auth/login` | POST |
| SystemActions | DisconnectDB | `/auth/logout` | POST |
| VideoActions | GetFirstVideoInput | `/cameras` | GET |
| VideoActions | GetNextVideoInput | (internal pagination) | - |
| VideoActions | StartVideoStream | `/cameras/{id}/stream` | GET |
| CameraControlActions | PTZControl | `/cameras/{id}/ptz` | POST |
| CameraControlActions | SetPreset | `/cameras/{id}/presets` | POST |
| CameraControlActions | GotoPreset | `/cameras/{id}/presets/{preset_id}` | POST |
| EventActions | StartEvent | (WebSocket subscription) | WS |
| EventActions | StopEvent | (WebSocket unsubscribe) | WS |
| RecordingActions | StartRecording | `/recordings/{channel}/start` | POST |
| RecordingActions | StopRecording | `/recordings/{channel}/stop` | POST |
| RecordingActions | QueryRecordings | `/recordings` | GET |
| AnalyticsActions | ConfigureVMD | `/analytics/{channel}/vmd` | PUT |
| AnalyticsActions | ConfigureNPR | `/analytics/{channel}/npr` | PUT |
| AnalyticsActions | ConfigureOBTRACK | `/analytics/{channel}/obtrack` | PUT |
## Dependencies & Prerequisites

### Development Environment

Required:
- Python 3.11+
- .NET SDK 6.0+ (for C# bridge development)
- Redis 7.2+
- Visual Studio 2022 (for C# bridge)
- GeViSoft full installation
- GeViSoft SDK
- Visual C++ 2010 Redistributable (x86)
Python Packages:

```text
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
redis==5.0.1
grpcio==1.59.0
grpcio-tools==1.59.0
python-multipart==0.0.6
prometheus-client==0.19.0
structlog==23.2.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-mock==3.12.0
httpx==0.25.2
```
C# NuGet Packages (Bridge Service):

```xml
<PackageReference Include="Grpc.AspNetCore" Version="2.59.0" />
<PackageReference Include="Google.Protobuf" Version="3.25.0" />
<PackageReference Include="Grpc.Tools" Version="2.59.0" />
```
### Deployment Environment

Windows Server 2016+ or Windows 10/11:
- .NET Runtime 6.0+
- Python 3.11+ runtime
- Redis (standalone or cluster)
- GeViSoft with active license
- Nginx or IIS for reverse proxy (HTTPS termination)
## Risk Mitigation

### High-Risk Items
1. SDK Stability
- Risk: C# bridge crashes take down video functionality
- Mitigation:
- Auto-restart bridge service (Windows Service recovery)
- Circuit breaker pattern in Python (trip after 3 failures; see the sketch after this list)
- Health checks monitor bridge status
- Graceful degradation (API returns cached data when bridge down)
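A minimal sketch of that circuit breaker; only the "trip after 3 failures" figure comes from the list above, while the reset timeout and half-open behavior are illustrative assumptions:

```python
# Illustrative circuit breaker for bridge calls; thresholds beyond the
# trip-after-3 figure above are assumptions.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: float | None = None

    async def call(self, coro_func, *args, **kwargs):
        # While open, fail fast until the reset timeout has elapsed
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise SDKException("bridgeUnavailable", "Circuit breaker open")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = await coro_func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result
```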
2. Performance Under Load
- Risk: May not achieve 100 concurrent streams
- Mitigation:
- Load testing in Phase 2 with real hardware
- Stream quality adaptation (reduce resolution/fps under load)
- Connection pooling and async I/O
- Horizontal scaling (multiple API instances)
3. Event Delivery Reliability
- Risk: WebSocket disconnections lose events
- Mitigation:
- Redis event buffer (5-minute retention)
- Reconnection sends missed critical events
- Event sequence numbers for gap detection (see the sketch after this list)
- Client-side acknowledgments for critical events
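Gap detection on the client side can be as simple as comparing sequence numbers; a sketch assuming every event carries the `sequence` field used by the Redis buffer sketch in section 3:

```python
# Client-side sketch: detect missed events so a replay can be requested.
class EventGapDetector:
    def __init__(self):
        self.last_sequence: int | None = None

    def check(self, event: dict) -> bool:
        """Return True if events were missed between the last one and this one."""
        seq = event["sequence"]
        missed = self.last_sequence is not None and seq > self.last_sequence + 1
        if self.last_sequence is None or seq > self.last_sequence:
            self.last_sequence = seq
        return missed
```

When `check` returns True, the client would ask the server to replay the buffered events after `last_sequence`, tying this mitigation to the Redis buffer above.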
## Phase 0 Completion Checklist
- [x] GeViScope SDK integration approach decided (C# gRPC bridge)
- [x] Video streaming strategy defined (direct URLs with tokens)
- [x] WebSocket architecture designed (FastAPI + Redis pub/sub)
- [x] Authentication system specified (JWT + Redis sessions)
- [x] Performance optimization plan documented (async + caching)
- [x] Error handling strategy defined (translation layer + structured logging)
- [x] Testing approach designed (layered tests with mocks)
- [x] SDK-to-API mappings documented
- [x] Dependencies identified
- [x] Risk mitigation strategies defined
Status: ✅ Research phase complete - Ready for Phase 1 Design
Next Step: Execute Phase 1 to generate data-model.md, contracts/openapi.yaml, and quickstart.md