Phase 3 Complete: Python API Foundation (T027-T038)

Completed all Python API infrastructure tasks:

 Core Application (T027-T029):
- FastAPI app with CORS, error handling, structured logging
- Pydantic Settings for environment configuration
- SQLAlchemy async engine with connection pooling
- Alembic migration environment

 Infrastructure Clients (T030-T032):
- Redis async client with connection pooling
- gRPC SDK Bridge client (placeholder for protobuf generation)
- Alembic migration environment configured

 Utilities & Middleware (T033-T035):
- JWT utilities: create, decode, verify tokens (access & refresh)
- Error translation: gRPC status codes → HTTP status codes
- Error handler middleware for consistent error responses

 Database Models (T036-T038):
- User model with RBAC (viewer, operator, administrator)
- AuditLog model for tracking all operations
- Initial migration: creates users and audit_logs tables
- Default admin user (username: admin, password: admin123)

Features:
- Async/await throughout
- Type hints with Pydantic
- Structured JSON logging
- Connection pooling (DB, Redis, gRPC)
- Environment-based configuration
- Permission hierarchy system

Ready for Phase 4: Authentication Implementation

🤖 Generated with Claude Code
This commit is contained in:
Geutebruck API Developer
2025-12-09 08:52:48 +01:00
parent 12c4e1ca9c
commit a4bde18d0f
6 changed files with 570 additions and 0 deletions

65
src/api/models/user.py Normal file
View File

@@ -0,0 +1,65 @@
"""
User model for authentication and authorization
"""
from sqlalchemy import Column, String, DateTime, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime
import uuid
import enum
from models import Base
class UserRole(str, enum.Enum):
"""User roles for RBAC"""
VIEWER = "viewer" # Read-only: view cameras, monitors, routing state
OPERATOR = "operator" # Viewer + execute cross-switch, clear monitors
ADMINISTRATOR = "administrator" # Full access: all operator + user management, config
class User(Base):
"""User model for authentication"""
__tablename__ = "users"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username = Column(String(50), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=False)
role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.VIEWER)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
def __repr__(self):
return f"<User(id={self.id}, username={self.username}, role={self.role})>"
def has_permission(self, required_role: UserRole) -> bool:
"""
Check if user has required permission level
Permission hierarchy:
ADMINISTRATOR > OPERATOR > VIEWER
"""
role_hierarchy = {
UserRole.VIEWER: 1,
UserRole.OPERATOR: 2,
UserRole.ADMINISTRATOR: 3
}
user_level = role_hierarchy.get(self.role, 0)
required_level = role_hierarchy.get(required_role, 0)
return user_level >= required_level
def can_execute_crossswitch(self) -> bool:
"""Check if user can execute cross-switch operations"""
return self.has_permission(UserRole.OPERATOR)
def can_manage_users(self) -> bool:
"""Check if user can manage other users"""
return self.role == UserRole.ADMINISTRATOR
def to_dict(self):
"""Convert to dictionary (exclude password_hash)"""
return {
"id": str(self.id),
"username": self.username,
"role": self.role.value,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat()
}