Files
geutebruck-api/src/api/clients/redis_client.py
2025-12-09 14:52:35 +01:00

140 lines
4.5 KiB
Python

"""
Redis client with connection pooling
"""
import redis.asyncio as redis
from typing import Optional, Any
import json
import structlog
from config import settings
logger = structlog.get_logger()
class RedisClient:
"""Async Redis client wrapper"""
def __init__(self):
self._pool: Optional[redis.ConnectionPool] = None
self._client: Optional[redis.Redis] = None
async def connect(self):
"""Initialize Redis connection pool"""
try:
logger.info("redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT)
self._pool = redis.ConnectionPool.from_url(
settings.redis_url,
max_connections=settings.REDIS_MAX_CONNECTIONS,
decode_responses=True,
)
self._client = redis.Redis(connection_pool=self._pool)
# Test connection
await self._client.ping()
logger.info("redis_connected")
except Exception as e:
logger.error("redis_connection_failed", error=str(e))
raise
async def disconnect(self):
"""Disconnect Redis (alias for close)"""
await self.close()
async def close(self):
"""Close Redis connections"""
try:
if self._client:
await self._client.close()
if self._pool:
await self._pool.disconnect()
logger.info("redis_closed")
except Exception as e:
logger.error("redis_close_failed", error=str(e))
async def ping(self) -> bool:
"""Ping Redis to check connectivity"""
if not self._client:
return False
try:
return await self._client.ping()
except Exception:
return False
async def get(self, key: str) -> Optional[str]:
"""Get value by key"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.get(key)
async def set(self, key: str, value: Any, expire: Optional[int] = None) -> bool:
"""Set value with optional expiration (seconds)"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.set(key, value, ex=expire)
async def delete(self, key: str) -> int:
"""Delete key"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.delete(key)
async def exists(self, key: str) -> bool:
"""Check if key exists"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.exists(key) > 0
async def get_json(self, key: str) -> Optional[dict]:
"""Get JSON value"""
value = await self.get(key)
if value:
return json.loads(value)
return None
async def set_json(self, key: str, value: dict, expire: Optional[int] = None) -> bool:
"""Set JSON value"""
return await self.set(key, json.dumps(value), expire)
async def get_many(self, keys: list[str]) -> list[Optional[str]]:
"""Get multiple values"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.mget(keys)
async def set_many(self, mapping: dict[str, Any]) -> bool:
"""Set multiple key-value pairs"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.mset(mapping)
async def incr(self, key: str, amount: int = 1) -> int:
"""Increment value"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.incrby(key, amount)
async def expire(self, key: str, seconds: int) -> bool:
"""Set expiration on key"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.expire(key, seconds)
async def ttl(self, key: str) -> int:
"""Get time to live for key"""
if not self._client:
raise RuntimeError("Redis client not connected")
return await self._client.ttl(key)
# Global Redis client instance
redis_client = RedisClient()
# Convenience functions
async def init_redis():
"""Initialize Redis connection (call on startup)"""
await redis_client.connect()
async def close_redis():
"""Close Redis connection (call on shutdown)"""
await redis_client.close()