Completed all Python API infrastructure tasks: ✅ Core Application (T027-T029): - FastAPI app with CORS, error handling, structured logging - Pydantic Settings for environment configuration - SQLAlchemy async engine with connection pooling - Alembic migration environment ✅ Infrastructure Clients (T030-T032): - Redis async client with connection pooling - gRPC SDK Bridge client (placeholder for protobuf generation) - Alembic migration environment configured ✅ Utilities & Middleware (T033-T035): - JWT utilities: create, decode, verify tokens (access & refresh) - Error translation: gRPC status codes → HTTP status codes - Error handler middleware for consistent error responses ✅ Database Models (T036-T038): - User model with RBAC (viewer, operator, administrator) - AuditLog model for tracking all operations - Initial migration: creates users and audit_logs tables - Default admin user (username: admin, password: admin123) Features: - Async/await throughout - Type hints with Pydantic - Structured JSON logging - Connection pooling (DB, Redis, gRPC) - Environment-based configuration - Permission hierarchy system Ready for Phase 4: Authentication Implementation 🤖 Generated with Claude Code
66 lines
2.3 KiB
Python
66 lines
2.3 KiB
Python
"""
|
|
User model for authentication and authorization
|
|
"""
|
|
from sqlalchemy import Column, String, DateTime, Enum as SQLEnum
|
|
from sqlalchemy.dialects.postgresql import UUID
|
|
from datetime import datetime
|
|
import uuid
|
|
import enum
|
|
from models import Base
|
|
|
|
class UserRole(str, enum.Enum):
|
|
"""User roles for RBAC"""
|
|
VIEWER = "viewer" # Read-only: view cameras, monitors, routing state
|
|
OPERATOR = "operator" # Viewer + execute cross-switch, clear monitors
|
|
ADMINISTRATOR = "administrator" # Full access: all operator + user management, config
|
|
|
|
class User(Base):
|
|
"""User model for authentication"""
|
|
__tablename__ = "users"
|
|
|
|
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
username = Column(String(50), unique=True, nullable=False, index=True)
|
|
password_hash = Column(String(255), nullable=False)
|
|
role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.VIEWER)
|
|
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
|
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
|
|
|
def __repr__(self):
|
|
return f"<User(id={self.id}, username={self.username}, role={self.role})>"
|
|
|
|
def has_permission(self, required_role: UserRole) -> bool:
|
|
"""
|
|
Check if user has required permission level
|
|
|
|
Permission hierarchy:
|
|
ADMINISTRATOR > OPERATOR > VIEWER
|
|
"""
|
|
role_hierarchy = {
|
|
UserRole.VIEWER: 1,
|
|
UserRole.OPERATOR: 2,
|
|
UserRole.ADMINISTRATOR: 3
|
|
}
|
|
|
|
user_level = role_hierarchy.get(self.role, 0)
|
|
required_level = role_hierarchy.get(required_role, 0)
|
|
|
|
return user_level >= required_level
|
|
|
|
def can_execute_crossswitch(self) -> bool:
|
|
"""Check if user can execute cross-switch operations"""
|
|
return self.has_permission(UserRole.OPERATOR)
|
|
|
|
def can_manage_users(self) -> bool:
|
|
"""Check if user can manage other users"""
|
|
return self.role == UserRole.ADMINISTRATOR
|
|
|
|
def to_dict(self):
|
|
"""Convert to dictionary (exclude password_hash)"""
|
|
return {
|
|
"id": str(self.id),
|
|
"username": self.username,
|
|
"role": self.role.value,
|
|
"created_at": self.created_at.isoformat(),
|
|
"updated_at": self.updated_at.isoformat()
|
|
}
|