geutebruck-api/src/api/models/user.py
Geutebruck API Developer a4bde18d0f Phase 3 Complete: Python API Foundation (T027-T038)
Completed all Python API infrastructure tasks:

 Core Application (T027-T029):
- FastAPI app with CORS, error handling, structured logging
- Pydantic Settings for environment configuration
- SQLAlchemy async engine with connection pooling
- Alembic migration environment

 Infrastructure Clients (T030-T032):
- Redis async client with connection pooling
- gRPC SDK Bridge client (placeholder for protobuf generation)
- Alembic migration environment configured

 Utilities & Middleware (T033-T035):
- JWT utilities: create, decode, verify tokens (access & refresh)
- Error translation: gRPC status codes → HTTP status codes
- Error handler middleware for consistent error responses

 Database Models (T036-T038):
- User model with RBAC (viewer, operator, administrator)
- AuditLog model for tracking all operations
- Initial migration: creates users and audit_logs tables
- Default admin user (username: admin, password: admin123)

Features:
- Async/await throughout
- Type hints with Pydantic
- Structured JSON logging
- Connection pooling (DB, Redis, gRPC)
- Environment-based configuration
- Permission hierarchy system

Ready for Phase 4: Authentication Implementation

🤖 Generated with Claude Code
2025-12-09 08:52:48 +01:00

66 lines
2.3 KiB
Python

"""
User model for authentication and authorization
"""
import enum
import uuid
from datetime import datetime

from sqlalchemy import Column, String, DateTime, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID

from models import Base


class UserRole(str, enum.Enum):
    """User roles for RBAC"""
    VIEWER = "viewer"  # Read-only: view cameras, monitors, routing state
    OPERATOR = "operator"  # Viewer + execute cross-switch, clear monitors
    ADMINISTRATOR = "administrator"  # Full access: all operator + user management, config


class User(Base):
    """User model for authentication"""
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String(50), unique=True, nullable=False, index=True)
    password_hash = Column(String(255), nullable=False)
    role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.VIEWER)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    def __repr__(self):
        return f"<User(id={self.id}, username={self.username}, role={self.role})>"

    def has_permission(self, required_role: UserRole) -> bool:
        """
        Check if user has required permission level

        Permission hierarchy:
        ADMINISTRATOR > OPERATOR > VIEWER
        """
        role_hierarchy = {
            UserRole.VIEWER: 1,
            UserRole.OPERATOR: 2,
            UserRole.ADMINISTRATOR: 3
        }
        user_level = role_hierarchy.get(self.role, 0)
        required_level = role_hierarchy.get(required_role, 0)
        return user_level >= required_level

    def can_execute_crossswitch(self) -> bool:
        """Check if user can execute cross-switch operations"""
        return self.has_permission(UserRole.OPERATOR)

    def can_manage_users(self) -> bool:
        """Check if user can manage other users"""
        return self.role == UserRole.ADMINISTRATOR

    def to_dict(self):
        """Convert to dictionary (exclude password_hash)"""
        return {
            "id": str(self.id),
            "username": self.username,
            "role": self.role.value,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }
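
The permission hierarchy in `has_permission` can be exercised without a database. The following is a minimal sketch, not part of the repository: `FakeUser`, `ROLE_LEVEL`, and `permission_matrix` are hypothetical names introduced here, and `FakeUser` is a plain dataclass that only copies the comparison logic from `User.has_permission`.

```python
import enum
from dataclasses import dataclass


class UserRole(str, enum.Enum):
    """Same three roles as in user.py."""
    VIEWER = "viewer"
    OPERATOR = "operator"
    ADMINISTRATOR = "administrator"


# Same numeric ranking that User.has_permission builds internally.
ROLE_LEVEL = {
    UserRole.VIEWER: 1,
    UserRole.OPERATOR: 2,
    UserRole.ADMINISTRATOR: 3,
}


@dataclass
class FakeUser:
    """Hypothetical stand-in for the SQLAlchemy User model (no database needed)."""
    username: str
    role: UserRole

    def has_permission(self, required_role: UserRole) -> bool:
        # Unknown roles fall back to level 0, exactly as in the original method.
        return ROLE_LEVEL.get(self.role, 0) >= ROLE_LEVEL.get(required_role, 0)


def permission_matrix() -> dict:
    """Map (user role, required role) value pairs to the result of the check."""
    return {
        (user_role.value, required.value): FakeUser("demo", user_role).has_permission(required)
        for user_role in UserRole
        for required in UserRole
    }


if __name__ == "__main__":
    for (user_role, required), allowed in permission_matrix().items():
        print(f"{user_role:>13} needs {required:<13} -> {allowed}")
```

One consequence of the `.get(..., 0)` fallback is worth noting: a `required_role` that is not in the hierarchy resolves to level 0, so the check passes for every user. If that is not intended, callers may want to validate the argument against `UserRole` before calling `has_permission`.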