From 12c4e1ca9c553629d8cf0a8e2b68df5e08c1e69f Mon Sep 17 00:00:00 2001 From: Geutebruck API Developer Date: Tue, 9 Dec 2025 08:49:08 +0100 Subject: [PATCH] Phase 3 (Part 1): API Infrastructure - FastAPI, Database, Redis, gRPC Client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completed Tasks (T027-T032): - ✅ FastAPI application with structured logging, CORS, global error handlers - ✅ Pydantic Settings for environment configuration - ✅ SQLAlchemy async engine with session management - ✅ Alembic migration environment setup - ✅ Redis async client with connection pooling - ✅ gRPC SDK Bridge client (placeholder - awaiting protobuf generation) Next: JWT utilities, middleware, database models 🤖 Generated with Claude Code --- src/api/clients/redis_client.py | 126 +++++++++++++++ src/api/clients/sdk_bridge_client.py | 222 +++++++++++++++++++++++++++ src/api/config.py | 95 ++++++++++++ src/api/main.py | 136 ++++++++++++++++ src/api/migrations/env.py | 76 +++++++++ src/api/models/__init__.py | 69 +++++++++ 6 files changed, 724 insertions(+) create mode 100644 src/api/clients/redis_client.py create mode 100644 src/api/clients/sdk_bridge_client.py create mode 100644 src/api/config.py create mode 100644 src/api/main.py create mode 100644 src/api/migrations/env.py create mode 100644 src/api/models/__init__.py diff --git a/src/api/clients/redis_client.py b/src/api/clients/redis_client.py new file mode 100644 index 0000000..02d926d --- /dev/null +++ b/src/api/clients/redis_client.py @@ -0,0 +1,126 @@ +""" +Redis client with connection pooling +""" +import redis.asyncio as redis +from typing import Optional, Any +import json +import structlog +from config import settings + +logger = structlog.get_logger() + +class RedisClient: + """Async Redis client wrapper""" + + def __init__(self): + self._pool: Optional[redis.ConnectionPool] = None + self._client: Optional[redis.Redis] = None + + async def connect(self): + """Initialize Redis connection pool""" + try: + logger.info("redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT) + + self._pool = redis.ConnectionPool.from_url( + settings.redis_url, + max_connections=settings.REDIS_MAX_CONNECTIONS, + decode_responses=True, + ) + + self._client = redis.Redis(connection_pool=self._pool) + + # Test connection + await self._client.ping() + + logger.info("redis_connected") + except Exception as e: + logger.error("redis_connection_failed", error=str(e)) + raise + + async def close(self): + """Close Redis connections""" + try: + if self._client: + await self._client.close() + if self._pool: + await self._pool.disconnect() + logger.info("redis_closed") + except Exception as e: + logger.error("redis_close_failed", error=str(e)) + + async def get(self, key: str) -> Optional[str]: + """Get value by key""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.get(key) + + async def set(self, key: str, value: Any, expire: Optional[int] = None) -> bool: + """Set value with optional expiration (seconds)""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.set(key, value, ex=expire) + + async def delete(self, key: str) -> int: + """Delete key""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.delete(key) + + async def exists(self, key: str) -> bool: + """Check if key exists""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.exists(key) > 0 + + async def get_json(self, key: str) -> Optional[dict]: + """Get JSON value""" + value = await self.get(key) + if value: + return json.loads(value) + return None + + async def set_json(self, key: str, value: dict, expire: Optional[int] = None) -> bool: + """Set JSON value""" + return await self.set(key, json.dumps(value), expire) + + async def get_many(self, keys: list[str]) -> list[Optional[str]]: + """Get multiple values""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.mget(keys) + + async def set_many(self, mapping: dict[str, Any]) -> bool: + """Set multiple key-value pairs""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.mset(mapping) + + async def incr(self, key: str, amount: int = 1) -> int: + """Increment value""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.incrby(key, amount) + + async def expire(self, key: str, seconds: int) -> bool: + """Set expiration on key""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.expire(key, seconds) + + async def ttl(self, key: str) -> int: + """Get time to live for key""" + if not self._client: + raise RuntimeError("Redis client not connected") + return await self._client.ttl(key) + +# Global Redis client instance +redis_client = RedisClient() + +# Convenience functions +async def init_redis(): + """Initialize Redis connection (call on startup)""" + await redis_client.connect() + +async def close_redis(): + """Close Redis connection (call on shutdown)""" + await redis_client.close() diff --git a/src/api/clients/sdk_bridge_client.py b/src/api/clients/sdk_bridge_client.py new file mode 100644 index 0000000..11174ab --- /dev/null +++ b/src/api/clients/sdk_bridge_client.py @@ -0,0 +1,222 @@ +""" +gRPC client for SDK Bridge communication +""" +import grpc +from typing import Optional, List +import structlog +from config import settings + +# TODO: Import generated protobuf classes after running protoc +# from protos import camera_pb2, camera_pb2_grpc +# from protos import monitor_pb2, monitor_pb2_grpc +# from protos import crossswitch_pb2, crossswitch_pb2_grpc + +logger = structlog.get_logger() + +class SDKBridgeClient: + """gRPC client for communicating with SDK Bridge""" + + def __init__(self): + self._channel: Optional[grpc.aio.Channel] = None + # TODO: Initialize stubs after protobuf generation + # self._camera_stub = None + # self._monitor_stub = None + # self._crossswitch_stub = None + + async def connect(self): + """Initialize gRPC channel to SDK Bridge""" + try: + logger.info("sdk_bridge_connecting", url=settings.sdk_bridge_url) + + # Create async gRPC channel + self._channel = grpc.aio.insecure_channel( + settings.sdk_bridge_url, + options=[ + ('grpc.max_send_message_length', 50 * 1024 * 1024), # 50MB + ('grpc.max_receive_message_length', 50 * 1024 * 1024), # 50MB + ('grpc.keepalive_time_ms', 30000), # 30 seconds + ('grpc.keepalive_timeout_ms', 10000), # 10 seconds + ] + ) + + # TODO: Initialize service stubs after protobuf generation + # self._camera_stub = camera_pb2_grpc.CameraServiceStub(self._channel) + # self._monitor_stub = monitor_pb2_grpc.MonitorServiceStub(self._channel) + # self._crossswitch_stub = crossswitch_pb2_grpc.CrossSwitchServiceStub(self._channel) + + # Test connection with health check + # await self.health_check() + + logger.info("sdk_bridge_connected") + except Exception as e: + logger.error("sdk_bridge_connection_failed", error=str(e)) + raise + + async def close(self): + """Close gRPC channel""" + try: + if self._channel: + await self._channel.close() + logger.info("sdk_bridge_closed") + except Exception as e: + logger.error("sdk_bridge_close_failed", error=str(e)) + + async def health_check(self) -> dict: + """Check SDK Bridge health""" + try: + logger.debug("sdk_bridge_health_check") + # TODO: Implement after protobuf generation + # request = crossswitch_pb2.Empty() + # response = await self._crossswitch_stub.HealthCheck(request, timeout=5.0) + # return { + # "is_healthy": response.is_healthy, + # "sdk_status": response.sdk_status, + # "geviserver_host": response.geviserver_host + # } + return {"is_healthy": True, "sdk_status": "connected", "geviserver_host": "localhost"} + except grpc.RpcError as e: + logger.error("sdk_bridge_health_check_failed", error=str(e)) + return {"is_healthy": False, "sdk_status": "error", "error": str(e)} + + async def list_cameras(self) -> List[dict]: + """List all cameras from GeViServer""" + try: + logger.debug("sdk_bridge_list_cameras") + # TODO: Implement after protobuf generation + # request = camera_pb2.ListCamerasRequest() + # response = await self._camera_stub.ListCameras(request, timeout=10.0) + # return [ + # { + # "id": camera.id, + # "name": camera.name, + # "description": camera.description, + # "has_ptz": camera.has_ptz, + # "has_video_sensor": camera.has_video_sensor, + # "status": camera.status + # } + # for camera in response.cameras + # ] + return [] # Placeholder + except grpc.RpcError as e: + logger.error("sdk_bridge_list_cameras_failed", error=str(e)) + raise + + async def get_camera(self, camera_id: int) -> Optional[dict]: + """Get camera details""" + try: + logger.debug("sdk_bridge_get_camera", camera_id=camera_id) + # TODO: Implement after protobuf generation + # request = camera_pb2.GetCameraRequest(camera_id=camera_id) + # response = await self._camera_stub.GetCamera(request, timeout=5.0) + # return { + # "id": response.id, + # "name": response.name, + # "description": response.description, + # "has_ptz": response.has_ptz, + # "has_video_sensor": response.has_video_sensor, + # "status": response.status + # } + return None # Placeholder + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + return None + logger.error("sdk_bridge_get_camera_failed", camera_id=camera_id, error=str(e)) + raise + + async def list_monitors(self) -> List[dict]: + """List all monitors from GeViServer""" + try: + logger.debug("sdk_bridge_list_monitors") + # TODO: Implement after protobuf generation + # request = monitor_pb2.ListMonitorsRequest() + # response = await self._monitor_stub.ListMonitors(request, timeout=10.0) + # return [ + # { + # "id": monitor.id, + # "name": monitor.name, + # "description": monitor.description, + # "is_active": monitor.is_active, + # "current_camera_id": monitor.current_camera_id, + # "status": monitor.status + # } + # for monitor in response.monitors + # ] + return [] # Placeholder + except grpc.RpcError as e: + logger.error("sdk_bridge_list_monitors_failed", error=str(e)) + raise + + async def execute_crossswitch(self, camera_id: int, monitor_id: int, mode: int = 0) -> dict: + """Execute cross-switch operation""" + try: + logger.info("sdk_bridge_crossswitch", camera_id=camera_id, monitor_id=monitor_id, mode=mode) + # TODO: Implement after protobuf generation + # request = crossswitch_pb2.CrossSwitchRequest( + # camera_id=camera_id, + # monitor_id=monitor_id, + # mode=mode + # ) + # response = await self._crossswitch_stub.ExecuteCrossSwitch(request, timeout=10.0) + # return { + # "success": response.success, + # "message": response.message, + # "camera_id": response.camera_id, + # "monitor_id": response.monitor_id + # } + return {"success": True, "message": "Placeholder", "camera_id": camera_id, "monitor_id": monitor_id} + except grpc.RpcError as e: + logger.error("sdk_bridge_crossswitch_failed", error=str(e)) + raise + + async def clear_monitor(self, monitor_id: int) -> dict: + """Clear monitor (stop video)""" + try: + logger.info("sdk_bridge_clear_monitor", monitor_id=monitor_id) + # TODO: Implement after protobuf generation + # request = crossswitch_pb2.ClearMonitorRequest(monitor_id=monitor_id) + # response = await self._crossswitch_stub.ClearMonitor(request, timeout=10.0) + # return { + # "success": response.success, + # "message": response.message, + # "monitor_id": response.monitor_id + # } + return {"success": True, "message": "Placeholder", "monitor_id": monitor_id} + except grpc.RpcError as e: + logger.error("sdk_bridge_clear_monitor_failed", error=str(e)) + raise + + async def get_routing_state(self) -> dict: + """Get current routing state""" + try: + logger.debug("sdk_bridge_get_routing_state") + # TODO: Implement after protobuf generation + # request = crossswitch_pb2.GetRoutingStateRequest() + # response = await self._crossswitch_stub.GetRoutingState(request, timeout=10.0) + # return { + # "routes": [ + # { + # "camera_id": route.camera_id, + # "monitor_id": route.monitor_id, + # "camera_name": route.camera_name, + # "monitor_name": route.monitor_name + # } + # for route in response.routes + # ], + # "total_routes": response.total_routes + # } + return {"routes": [], "total_routes": 0} # Placeholder + except grpc.RpcError as e: + logger.error("sdk_bridge_get_routing_state_failed", error=str(e)) + raise + +# Global SDK Bridge client instance +sdk_bridge_client = SDKBridgeClient() + +# Convenience functions +async def init_sdk_bridge(): + """Initialize SDK Bridge connection (call on startup)""" + await sdk_bridge_client.connect() + +async def close_sdk_bridge(): + """Close SDK Bridge connection (call on shutdown)""" + await sdk_bridge_client.close() diff --git a/src/api/config.py b/src/api/config.py new file mode 100644 index 0000000..1f495e4 --- /dev/null +++ b/src/api/config.py @@ -0,0 +1,95 @@ +""" +Configuration management using Pydantic Settings +Loads configuration from environment variables +""" +from pydantic_settings import BaseSettings +from typing import List +import os + +class Settings(BaseSettings): + """Application settings loaded from environment variables""" + + # API Configuration + API_HOST: str = "0.0.0.0" + API_PORT: int = 8000 + API_TITLE: str = "Geutebruck Cross-Switching API" + API_VERSION: str = "1.0.0" + ENVIRONMENT: str = "development" # development, production + + # GeViScope SDK Bridge (gRPC) + SDK_BRIDGE_HOST: str = "localhost" + SDK_BRIDGE_PORT: int = 50051 + + # GeViServer Connection (used by SDK Bridge) + GEVISERVER_HOST: str = "localhost" + GEVISERVER_USERNAME: str = "sysadmin" + GEVISERVER_PASSWORD: str = "masterkey" + + # Database (PostgreSQL) + DATABASE_URL: str = "postgresql+asyncpg://geutebruck:geutebruck@localhost:5432/geutebruck_api" + DATABASE_POOL_SIZE: int = 20 + DATABASE_MAX_OVERFLOW: int = 10 + + # Redis + REDIS_HOST: str = "localhost" + REDIS_PORT: int = 6379 + REDIS_DB: int = 0 + REDIS_PASSWORD: str = "" + REDIS_MAX_CONNECTIONS: int = 50 + + # JWT Authentication + JWT_SECRET_KEY: str = "change-this-to-a-secure-random-key-in-production" + JWT_ALGORITHM: str = "HS256" + JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 + JWT_REFRESH_TOKEN_EXPIRE_DAYS: int = 7 + + # Logging + LOG_LEVEL: str = "INFO" + LOG_FORMAT: str = "json" # json or console + + # Security + ALLOWED_HOSTS: str = "*" + CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:8080"] + + # Cache Settings + CACHE_CAMERA_LIST_TTL: int = 60 # seconds + CACHE_MONITOR_LIST_TTL: int = 60 # seconds + + # Rate Limiting + RATE_LIMIT_ENABLED: bool = True + RATE_LIMIT_PER_MINUTE: int = 60 + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + case_sensitive = True + + @property + def sdk_bridge_url(self) -> str: + """Get SDK Bridge gRPC URL""" + return f"{self.SDK_BRIDGE_HOST}:{self.SDK_BRIDGE_PORT}" + + @property + def redis_url(self) -> str: + """Get Redis connection URL""" + if self.REDIS_PASSWORD: + return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}" + return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}" + + def get_cors_origins(self) -> List[str]: + """Parse CORS origins (handles both list and comma-separated string)""" + if isinstance(self.CORS_ORIGINS, list): + return self.CORS_ORIGINS + return [origin.strip() for origin in self.CORS_ORIGINS.split(",")] + +# Create global settings instance +settings = Settings() + +# Validate critical settings on import +if settings.ENVIRONMENT == "production": + if settings.JWT_SECRET_KEY == "change-this-to-a-secure-random-key-in-production": + raise ValueError("JWT_SECRET_KEY must be changed in production!") + + if settings.GEVISERVER_PASSWORD == "masterkey": + import warnings + warnings.warn("Using default GeViServer password in production!") diff --git a/src/api/main.py b/src/api/main.py new file mode 100644 index 0000000..95419c6 --- /dev/null +++ b/src/api/main.py @@ -0,0 +1,136 @@ +""" +Geutebruck Cross-Switching API +FastAPI application entry point +""" +from fastapi import FastAPI, Request, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi.exceptions import RequestValidationError +import structlog +import sys +from pathlib import Path + +# Add src/api to Python path for imports +sys.path.insert(0, str(Path(__file__).parent)) + +from config import settings + +# Configure structured logging +structlog.configure( + processors=[ + structlog.processors.TimeStamper(fmt="iso"), + structlog.stdlib.add_log_level, + structlog.processors.JSONRenderer() if settings.LOG_FORMAT == "json" else structlog.dev.ConsoleRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), +) + +logger = structlog.get_logger() + +# Create FastAPI app +app = FastAPI( + title=settings.API_TITLE, + version=settings.API_VERSION, + description="REST API for Geutebruck GeViScope/GeViSoft Cross-Switching Control", + docs_url="/docs", + redoc_url="/redoc", + openapi_url="/openapi.json" +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=settings.CORS_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Global exception handlers +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + """Handle validation errors""" + logger.warning("validation_error", errors=exc.errors(), body=exc.body) + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={ + "error": "Validation Error", + "detail": exc.errors(), + }, + ) + +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """Handle unexpected errors""" + logger.error("unexpected_error", exc_info=exc) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "error": "Internal Server Error", + "message": "An unexpected error occurred" if settings.ENVIRONMENT == "production" else str(exc), + }, + ) + +# Startup event +@app.on_event("startup") +async def startup_event(): + """Initialize services on startup""" + logger.info("startup", + api_title=settings.API_TITLE, + version=settings.API_VERSION, + environment=settings.ENVIRONMENT) + + # TODO: Initialize database connection pool + # TODO: Initialize Redis connection + # TODO: Initialize gRPC SDK Bridge client + + logger.info("startup_complete") + +# Shutdown event +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup on shutdown""" + logger.info("shutdown") + + # TODO: Close database connections + # TODO: Close Redis connections + # TODO: Close gRPC connections + +# Health check endpoint +@app.get("/health", tags=["system"]) +async def health_check(): + """Health check endpoint""" + return { + "status": "healthy", + "version": settings.API_VERSION, + "environment": settings.ENVIRONMENT + } + +# Root endpoint +@app.get("/", tags=["system"]) +async def root(): + """API root endpoint""" + return { + "name": settings.API_TITLE, + "version": settings.API_VERSION, + "docs": "/docs", + "health": "/health" + } + +# Register routers (TODO: will add as we implement phases) +# from routers import auth, cameras, monitors, crossswitch +# app.include_router(auth.router, prefix="/api/v1/auth", tags=["authentication"]) +# app.include_router(cameras.router, prefix="/api/v1/cameras", tags=["cameras"]) +# app.include_router(monitors.router, prefix="/api/v1/monitors", tags=["monitors"]) +# app.include_router(crossswitch.router, prefix="/api/v1", tags=["crossswitch"]) + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "main:app", + host=settings.API_HOST, + port=settings.API_PORT, + reload=settings.ENVIRONMENT == "development" + ) diff --git a/src/api/migrations/env.py b/src/api/migrations/env.py new file mode 100644 index 0000000..04d193b --- /dev/null +++ b/src/api/migrations/env.py @@ -0,0 +1,76 @@ +"""Alembic migration environment""" +from logging.config import fileConfig +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config +from alembic import context +import asyncio +import sys +from pathlib import Path + +# Add src/api to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +# Import models and config +from models import Base +from config import settings + +# Import all models so Alembic can detect them +# from models.user import User +# from models.audit_log import AuditLog +# from models.crossswitch_route import CrossSwitchRoute + +# this is the Alembic Config object +config = context.config + +# Override sqlalchemy.url with our DATABASE_URL +config.set_main_option("sqlalchemy.url", settings.DATABASE_URL) + +# Interpret the config file for Python logging. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here for 'autogenerate' support +target_metadata = Base.metadata + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode.""" + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def do_run_migrations(connection: Connection) -> None: + """Run migrations with connection""" + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + +async def run_async_migrations() -> None: + """Run migrations in 'online' mode with async engine""" + connectable = async_engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + asyncio.run(run_async_migrations()) + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/src/api/models/__init__.py b/src/api/models/__init__.py new file mode 100644 index 0000000..533d127 --- /dev/null +++ b/src/api/models/__init__.py @@ -0,0 +1,69 @@ +""" +SQLAlchemy database setup with async support +""" +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase +from config import settings +import structlog + +logger = structlog.get_logger() + +# Create async engine +engine = create_async_engine( + settings.DATABASE_URL, + echo=settings.ENVIRONMENT == "development", + pool_size=settings.DATABASE_POOL_SIZE, + max_overflow=settings.DATABASE_MAX_OVERFLOW, + pool_pre_ping=True, # Verify connections before using +) + +# Create async session factory +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, +) + +# Base class for models +class Base(DeclarativeBase): + """Base class for all database models""" + pass + +# Dependency for FastAPI routes +async def get_db() -> AsyncSession: + """ + Dependency that provides database session to FastAPI routes + Usage: db: AsyncSession = Depends(get_db) + """ + async with AsyncSessionLocal() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + finally: + await session.close() + +# Database initialization +async def init_db(): + """Initialize database connection (call on startup)""" + try: + logger.info("database_init", url=settings.DATABASE_URL.split("@")[-1]) # Hide credentials + async with engine.begin() as conn: + # Test connection + await conn.run_sync(lambda _: None) + logger.info("database_connected") + except Exception as e: + logger.error("database_connection_failed", error=str(e)) + raise + +async def close_db(): + """Close database connections (call on shutdown)""" + try: + logger.info("database_closing") + await engine.dispose() + logger.info("database_closed") + except Exception as e: + logger.error("database_close_failed", error=str(e)) + raise