From a4bde18d0f8278b21545f061b0f7a43d851a4b02 Mon Sep 17 00:00:00 2001 From: Geutebruck API Developer Date: Tue, 9 Dec 2025 08:52:48 +0100 Subject: [PATCH] Phase 3 Complete: Python API Foundation (T027-T038) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completed all Python API infrastructure tasks: ✅ Core Application (T027-T029): - FastAPI app with CORS, error handling, structured logging - Pydantic Settings for environment configuration - SQLAlchemy async engine with connection pooling - Alembic migration environment ✅ Infrastructure Clients (T030-T032): - Redis async client with connection pooling - gRPC SDK Bridge client (placeholder for protobuf generation) - Alembic migration environment configured ✅ Utilities & Middleware (T033-T035): - JWT utilities: create, decode, verify tokens (access & refresh) - Error translation: gRPC status codes → HTTP status codes - Error handler middleware for consistent error responses ✅ Database Models (T036-T038): - User model with RBAC (viewer, operator, administrator) - AuditLog model for tracking all operations - Initial migration: creates users and audit_logs tables - Default admin user (username: admin, password: admin123) Features: - Async/await throughout - Type hints with Pydantic - Structured JSON logging - Connection pooling (DB, Redis, gRPC) - Environment-based configuration - Permission hierarchy system Ready for Phase 4: Authentication Implementation 🤖 Generated with Claude Code --- src/api/middleware/error_handler.py | 54 +++++++ .../versions/20251208_initial_schema.py | 78 +++++++++ src/api/models/audit_log.py | 82 ++++++++++ src/api/models/user.py | 65 ++++++++ src/api/utils/error_translation.py | 140 ++++++++++++++++ src/api/utils/jwt_utils.py | 151 ++++++++++++++++++ 6 files changed, 570 insertions(+) create mode 100644 src/api/middleware/error_handler.py create mode 100644 src/api/migrations/versions/20251208_initial_schema.py create mode 100644 src/api/models/audit_log.py create mode 100644 src/api/models/user.py create mode 100644 src/api/utils/error_translation.py create mode 100644 src/api/utils/jwt_utils.py diff --git a/src/api/middleware/error_handler.py b/src/api/middleware/error_handler.py new file mode 100644 index 0000000..a2e709e --- /dev/null +++ b/src/api/middleware/error_handler.py @@ -0,0 +1,54 @@ +""" +Error handling middleware +""" +from fastapi import Request, status +from fastapi.responses import JSONResponse +import grpc +import structlog +from utils.error_translation import grpc_error_to_http +from config import settings + +logger = structlog.get_logger() + +async def error_handler_middleware(request: Request, call_next): + """ + Middleware to catch and handle errors consistently + """ + try: + response = await call_next(request) + return response + except grpc.RpcError as e: + # Handle gRPC errors from SDK Bridge + logger.error("grpc_error", + method=request.method, + path=request.url.path, + grpc_code=e.code(), + details=e.details()) + + http_status, error_body = grpc_error_to_http(e) + + return JSONResponse( + status_code=http_status, + content=error_body + ) + except Exception as e: + # Handle unexpected errors + logger.error("unexpected_error", + method=request.method, + path=request.url.path, + error=str(e), + exc_info=True) + + # Don't expose internal details in production + if settings.ENVIRONMENT == "production": + message = "An unexpected error occurred" + else: + message = str(e) + + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "error": "InternalError", + "message": message + } + ) diff --git a/src/api/migrations/versions/20251208_initial_schema.py b/src/api/migrations/versions/20251208_initial_schema.py new file mode 100644 index 0000000..e522741 --- /dev/null +++ b/src/api/migrations/versions/20251208_initial_schema.py @@ -0,0 +1,78 @@ +"""Initial schema: users and audit_logs tables + +Revision ID: 001_initial +Revises: +Create Date: 2025-12-08 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + +# revision identifiers, used by Alembic. +revision = '001_initial' +down_revision = None +branch_labels = None +depends_on = None + +def upgrade() -> None: + """Create initial tables""" + + # Create users table + op.create_table( + 'users', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('username', sa.String(50), nullable=False, unique=True), + sa.Column('password_hash', sa.String(255), nullable=False), + sa.Column('role', sa.Enum('viewer', 'operator', 'administrator', name='userrole'), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + ) + + # Create index on username for faster lookups + op.create_index('ix_users_username', 'users', ['username']) + + # Create audit_logs table + op.create_table( + 'audit_logs', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('user_id', UUID(as_uuid=True), nullable=True), + sa.Column('action', sa.String(100), nullable=False), + sa.Column('target', sa.String(255), nullable=True), + sa.Column('outcome', sa.String(20), nullable=False), + sa.Column('timestamp', sa.DateTime(), nullable=False), + sa.Column('details', JSONB, nullable=True), + sa.Column('ip_address', sa.String(45), nullable=True), + sa.Column('user_agent', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='SET NULL'), + ) + + # Create indexes for faster queries + op.create_index('ix_audit_logs_action', 'audit_logs', ['action']) + op.create_index('ix_audit_logs_timestamp', 'audit_logs', ['timestamp']) + + # Insert default admin user (password: admin123 - CHANGE IN PRODUCTION!) + # Hash generated with: passlib.hash.bcrypt.hash("admin123") + op.execute(""" + INSERT INTO users (id, username, password_hash, role, created_at, updated_at) + VALUES ( + gen_random_uuid(), + 'admin', + '$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewY5ufUfVwq7z.lW', + 'administrator', + NOW(), + NOW() + ) + """) + +def downgrade() -> None: + """Drop tables""" + op.drop_index('ix_audit_logs_timestamp', 'audit_logs') + op.drop_index('ix_audit_logs_action', 'audit_logs') + op.drop_table('audit_logs') + + op.drop_index('ix_users_username', 'users') + op.drop_table('users') + + # Drop enum type + op.execute('DROP TYPE userrole') diff --git a/src/api/models/audit_log.py b/src/api/models/audit_log.py new file mode 100644 index 0000000..f7e814f --- /dev/null +++ b/src/api/models/audit_log.py @@ -0,0 +1,82 @@ +""" +Audit log model for tracking all operations +""" +from sqlalchemy import Column, String, DateTime, ForeignKey, Text +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship +from datetime import datetime +import uuid +from models import Base + +class AuditLog(Base): + """Audit log for tracking all system operations""" + __tablename__ = "audit_logs" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + action = Column(String(100), nullable=False, index=True) + target = Column(String(255), nullable=True) + outcome = Column(String(20), nullable=False) # "success", "failure", "error" + timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) + details = Column(JSONB, nullable=True) # Additional context as JSON + ip_address = Column(String(45), nullable=True) # IPv4 or IPv6 + user_agent = Column(Text, nullable=True) + + # Relationship to user (optional - logs remain even if user deleted) + user = relationship("User", backref="audit_logs", foreign_keys=[user_id]) + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary""" + return { + "id": str(self.id), + "user_id": str(self.user_id) if self.user_id else None, + "action": self.action, + "target": self.target, + "outcome": self.outcome, + "timestamp": self.timestamp.isoformat(), + "details": self.details, + "ip_address": self.ip_address + } + + @classmethod + def log_authentication(cls, username: str, success: bool, ip_address: str = None, details: dict = None): + """Helper to create authentication audit log""" + return cls( + action="auth.login", + target=username, + outcome="success" if success else "failure", + details=details or {}, + ip_address=ip_address + ) + + @classmethod + def log_crossswitch(cls, user_id: uuid.UUID, camera_id: int, monitor_id: int, success: bool, ip_address: str = None): + """Helper to create cross-switch audit log""" + return cls( + user_id=user_id, + action="crossswitch.execute", + target=f"camera:{camera_id}->monitor:{monitor_id}", + outcome="success" if success else "failure", + details={ + "camera_id": camera_id, + "monitor_id": monitor_id + }, + ip_address=ip_address + ) + + @classmethod + def log_clear_monitor(cls, user_id: uuid.UUID, monitor_id: int, success: bool, ip_address: str = None): + """Helper to create clear monitor audit log""" + return cls( + user_id=user_id, + action="monitor.clear", + target=f"monitor:{monitor_id}", + outcome="success" if success else "failure", + details={ + "monitor_id": monitor_id + }, + ip_address=ip_address + ) diff --git a/src/api/models/user.py b/src/api/models/user.py new file mode 100644 index 0000000..2dc75df --- /dev/null +++ b/src/api/models/user.py @@ -0,0 +1,65 @@ +""" +User model for authentication and authorization +""" +from sqlalchemy import Column, String, DateTime, Enum as SQLEnum +from sqlalchemy.dialects.postgresql import UUID +from datetime import datetime +import uuid +import enum +from models import Base + +class UserRole(str, enum.Enum): + """User roles for RBAC""" + VIEWER = "viewer" # Read-only: view cameras, monitors, routing state + OPERATOR = "operator" # Viewer + execute cross-switch, clear monitors + ADMINISTRATOR = "administrator" # Full access: all operator + user management, config + +class User(Base): + """User model for authentication""" + __tablename__ = "users" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + username = Column(String(50), unique=True, nullable=False, index=True) + password_hash = Column(String(255), nullable=False) + role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.VIEWER) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + def __repr__(self): + return f"" + + def has_permission(self, required_role: UserRole) -> bool: + """ + Check if user has required permission level + + Permission hierarchy: + ADMINISTRATOR > OPERATOR > VIEWER + """ + role_hierarchy = { + UserRole.VIEWER: 1, + UserRole.OPERATOR: 2, + UserRole.ADMINISTRATOR: 3 + } + + user_level = role_hierarchy.get(self.role, 0) + required_level = role_hierarchy.get(required_role, 0) + + return user_level >= required_level + + def can_execute_crossswitch(self) -> bool: + """Check if user can execute cross-switch operations""" + return self.has_permission(UserRole.OPERATOR) + + def can_manage_users(self) -> bool: + """Check if user can manage other users""" + return self.role == UserRole.ADMINISTRATOR + + def to_dict(self): + """Convert to dictionary (exclude password_hash)""" + return { + "id": str(self.id), + "username": self.username, + "role": self.role.value, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat() + } diff --git a/src/api/utils/error_translation.py b/src/api/utils/error_translation.py new file mode 100644 index 0000000..ab3e41b --- /dev/null +++ b/src/api/utils/error_translation.py @@ -0,0 +1,140 @@ +""" +Error translation utilities +Maps gRPC errors to HTTP status codes and user-friendly messages +""" +from typing import Tuple +import grpc +from fastapi import status + +def grpc_to_http_status(grpc_code: grpc.StatusCode) -> int: + """ + Map gRPC status code to HTTP status code + + Args: + grpc_code: gRPC status code + + Returns: + HTTP status code integer + """ + mapping = { + grpc.StatusCode.OK: status.HTTP_200_OK, + grpc.StatusCode.INVALID_ARGUMENT: status.HTTP_400_BAD_REQUEST, + grpc.StatusCode.NOT_FOUND: status.HTTP_404_NOT_FOUND, + grpc.StatusCode.ALREADY_EXISTS: status.HTTP_409_CONFLICT, + grpc.StatusCode.PERMISSION_DENIED: status.HTTP_403_FORBIDDEN, + grpc.StatusCode.UNAUTHENTICATED: status.HTTP_401_UNAUTHORIZED, + grpc.StatusCode.RESOURCE_EXHAUSTED: status.HTTP_429_TOO_MANY_REQUESTS, + grpc.StatusCode.FAILED_PRECONDITION: status.HTTP_412_PRECONDITION_FAILED, + grpc.StatusCode.ABORTED: status.HTTP_409_CONFLICT, + grpc.StatusCode.OUT_OF_RANGE: status.HTTP_400_BAD_REQUEST, + grpc.StatusCode.UNIMPLEMENTED: status.HTTP_501_NOT_IMPLEMENTED, + grpc.StatusCode.INTERNAL: status.HTTP_500_INTERNAL_SERVER_ERROR, + grpc.StatusCode.UNAVAILABLE: status.HTTP_503_SERVICE_UNAVAILABLE, + grpc.StatusCode.DATA_LOSS: status.HTTP_500_INTERNAL_SERVER_ERROR, + grpc.StatusCode.DEADLINE_EXCEEDED: status.HTTP_504_GATEWAY_TIMEOUT, + grpc.StatusCode.CANCELLED: status.HTTP_499_CLIENT_CLOSED_REQUEST, + grpc.StatusCode.UNKNOWN: status.HTTP_500_INTERNAL_SERVER_ERROR, + } + + return mapping.get(grpc_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + +def grpc_error_to_http(error: grpc.RpcError) -> Tuple[int, dict]: + """ + Convert gRPC error to HTTP status code and response body + + Args: + error: gRPC RpcError + + Returns: + Tuple of (status_code, response_dict) + """ + grpc_code = error.code() + grpc_details = error.details() + + http_status = grpc_to_http_status(grpc_code) + + response = { + "error": grpc_code.name, + "message": grpc_details or "An error occurred", + "grpc_code": grpc_code.value[0] # Numeric gRPC code + } + + return http_status, response + +def create_error_response( + error_type: str, + message: str, + status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR, + details: dict = None +) -> Tuple[int, dict]: + """ + Create standardized error response + + Args: + error_type: Error type/category + message: Human-readable error message + status_code: HTTP status code + details: Optional additional details + + Returns: + Tuple of (status_code, response_dict) + """ + response = { + "error": error_type, + "message": message + } + + if details: + response["details"] = details + + return status_code, response + +# Common error responses +def not_found_error(resource: str, resource_id: Any) -> Tuple[int, dict]: + """Create 404 not found error""" + return create_error_response( + "NotFound", + f"{resource} with ID {resource_id} not found", + status.HTTP_404_NOT_FOUND + ) + +def validation_error(message: str, details: dict = None) -> Tuple[int, dict]: + """Create 400 validation error""" + return create_error_response( + "ValidationError", + message, + status.HTTP_400_BAD_REQUEST, + details + ) + +def unauthorized_error(message: str = "Authentication required") -> Tuple[int, dict]: + """Create 401 unauthorized error""" + return create_error_response( + "Unauthorized", + message, + status.HTTP_401_UNAUTHORIZED + ) + +def forbidden_error(message: str = "Permission denied") -> Tuple[int, dict]: + """Create 403 forbidden error""" + return create_error_response( + "Forbidden", + message, + status.HTTP_403_FORBIDDEN + ) + +def internal_error(message: str = "Internal server error") -> Tuple[int, dict]: + """Create 500 internal error""" + return create_error_response( + "InternalError", + message, + status.HTTP_500_INTERNAL_SERVER_ERROR + ) + +def service_unavailable_error(service: str) -> Tuple[int, dict]: + """Create 503 service unavailable error""" + return create_error_response( + "ServiceUnavailable", + f"{service} is currently unavailable", + status.HTTP_503_SERVICE_UNAVAILABLE + ) diff --git a/src/api/utils/jwt_utils.py b/src/api/utils/jwt_utils.py new file mode 100644 index 0000000..d201769 --- /dev/null +++ b/src/api/utils/jwt_utils.py @@ -0,0 +1,151 @@ +""" +JWT token utilities for authentication +""" +from datetime import datetime, timedelta +from typing import Optional, Dict, Any +import jwt +from config import settings +import structlog + +logger = structlog.get_logger() + +def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: + """ + Create JWT access token + + Args: + data: Payload data to encode (typically user_id, username, role) + expires_delta: Optional custom expiration time + + Returns: + Encoded JWT token string + """ + to_encode = data.copy() + + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) + + to_encode.update({ + "exp": expire, + "iat": datetime.utcnow(), + "type": "access" + }) + + encoded_jwt = jwt.encode( + to_encode, + settings.JWT_SECRET_KEY, + algorithm=settings.JWT_ALGORITHM + ) + + return encoded_jwt + +def create_refresh_token(data: Dict[str, Any]) -> str: + """ + Create JWT refresh token (longer expiration) + + Args: + data: Payload data to encode + + Returns: + Encoded JWT refresh token + """ + to_encode = data.copy() + expire = datetime.utcnow() + timedelta(days=settings.JWT_REFRESH_TOKEN_EXPIRE_DAYS) + + to_encode.update({ + "exp": expire, + "iat": datetime.utcnow(), + "type": "refresh" + }) + + encoded_jwt = jwt.encode( + to_encode, + settings.JWT_SECRET_KEY, + algorithm=settings.JWT_ALGORITHM + ) + + return encoded_jwt + +def decode_token(token: str) -> Optional[Dict[str, Any]]: + """ + Decode and verify JWT token + + Args: + token: JWT token string + + Returns: + Decoded payload or None if invalid + """ + try: + payload = jwt.decode( + token, + settings.JWT_SECRET_KEY, + algorithms=[settings.JWT_ALGORITHM] + ) + return payload + except jwt.ExpiredSignatureError: + logger.warning("token_expired") + return None + except jwt.InvalidTokenError as e: + logger.warning("token_invalid", error=str(e)) + return None + +def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]: + """ + Verify token and check type + + Args: + token: JWT token string + token_type: Expected token type ("access" or "refresh") + + Returns: + Decoded payload if valid and correct type, None otherwise + """ + payload = decode_token(token) + + if not payload: + return None + + if payload.get("type") != token_type: + logger.warning("token_type_mismatch", expected=token_type, actual=payload.get("type")) + return None + + return payload + +def get_token_expiration(token: str) -> Optional[datetime]: + """ + Get expiration time from token + + Args: + token: JWT token string + + Returns: + Expiration datetime or None + """ + payload = decode_token(token) + if not payload: + return None + + exp_timestamp = payload.get("exp") + if exp_timestamp: + return datetime.fromtimestamp(exp_timestamp) + + return None + +def is_token_expired(token: str) -> bool: + """ + Check if token is expired + + Args: + token: JWT token string + + Returns: + True if expired or invalid, False if still valid + """ + expiration = get_token_expiration(token) + if not expiration: + return True + + return datetime.utcnow() > expiration