# ============ SERVER CONFIGURATION (G-CORE & GEVISCOPE) ============ # Parent endpoint - List all servers (both types) @router.get( "/servers", response_model=AllServersResponse, status_code=status.HTTP_200_OK, summary="List all servers (both G-Core and GeViScope)", description="Get combined list of all server types" ) async def list_all_servers( current_user: User = Depends(require_viewer) ): """List all servers - both G-Core and GeViScope""" service = ConfigurationService() try: # Get G-Core servers gcore_servers = await _get_gcore_servers_list(service) # Get GeViScope servers geviscope_servers = await _get_geviscope_servers_list(service) return AllServersResponse( gcore_servers=gcore_servers, geviscope_servers=geviscope_servers, total_gcore=len(gcore_servers), total_geviscope=len(geviscope_servers), total=len(gcore_servers) + len(geviscope_servers) ) except Exception as e: logger.error("list_all_servers_error", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to list servers: {str(e)}" ) # ============ G-CORE SERVERS ============ @router.get( "/servers/gcore", response_model=GCoreServerListResponse, status_code=status.HTTP_200_OK, summary="List G-Core servers", description="Get all G-Core servers from GeViGCoreServer folder" ) async def list_gcore_servers( current_user: User = Depends(require_viewer) ): """List all G-Core servers""" service = ConfigurationService() try: servers = await _get_gcore_servers_list(service) return GCoreServerListResponse(servers=servers, total=len(servers)) except Exception as e: logger.error("list_gcore_servers_error", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to list G-Core servers: {str(e)}" ) @router.get( "/servers/gcore/{server_id}", response_model=GCoreServerResponse, status_code=status.HTTP_200_OK, summary="Get G-Core server", description="Get details of a specific G-Core server by ID" ) async def get_gcore_server( server_id: str, current_user: User = Depends(require_viewer) ): """Get single G-Core server by ID""" service = ConfigurationService() try: server = await _get_gcore_server_by_id(service, server_id) if not server: raise ValueError(f"G-Core server '{server_id}' not found") return server except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("get_gcore_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get G-Core server: {str(e)}" ) @router.post( "/servers/gcore", response_model=ServerOperationResponse, status_code=status.HTTP_201_CREATED, summary="Create G-Core server", description="Create a new G-Core server" ) async def create_gcore_server( server_data: GCoreServerCreate, current_user: User = Depends(require_administrator) ): """Create new G-Core server""" service = ConfigurationService() try: result = await service.create_server({ "alias": server_data.alias, "host": server_data.host, "user": server_data.user, "password": server_data.password, "enabled": server_data.enabled, "deactivate_echo": server_data.deactivate_echo, "deactivate_live_check": server_data.deactivate_live_check }) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=result.get("server_id") ) except Exception as e: logger.error("create_gcore_server_error", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create G-Core server: {str(e)}" ) @router.put( "/servers/gcore/{server_id}", response_model=ServerOperationResponse, status_code=status.HTTP_200_OK, summary="Update G-Core server", description="Update an existing G-Core server" ) async def update_gcore_server( server_id: str, server_data: GCoreServerUpdate, current_user: User = Depends(require_administrator) ): """Update existing G-Core server""" service = ConfigurationService() try: result = await service.update_server(server_id, { "alias": server_data.alias, "host": server_data.host, "user": server_data.user, "password": server_data.password, "enabled": server_data.enabled, "deactivate_echo": server_data.deactivate_echo, "deactivate_live_check": server_data.deactivate_live_check }) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=server_id ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("update_gcore_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update G-Core server: {str(e)}" ) @router.delete( "/servers/gcore/{server_id}", response_model=ServerOperationResponse, status_code=status.HTTP_200_OK, summary="Delete G-Core server", description="Delete a G-Core server" ) async def delete_gcore_server( server_id: str, current_user: User = Depends(require_administrator) ): """Delete G-Core server""" service = ConfigurationService() try: result = await service.delete_server(server_id) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=server_id ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("delete_gcore_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete G-Core server: {str(e)}" ) # ============ GEVISCOPE SERVERS ============ @router.get( "/servers/geviscope", response_model=GeViScopeServerListResponse, status_code=status.HTTP_200_OK, summary="List GeViScope servers", description="Get all GeViScope servers from GeViGscServer folder" ) async def list_geviscope_servers( current_user: User = Depends(require_viewer) ): """List all GeViScope servers""" service = ConfigurationService() try: servers = await _get_geviscope_servers_list(service) return GeViScopeServerListResponse(servers=servers, total=len(servers)) except Exception as e: logger.error("list_geviscope_servers_error", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to list GeViScope servers: {str(e)}" ) @router.get( "/servers/geviscope/{server_id}", response_model=GeViScopeServerResponse, status_code=status.HTTP_200_OK, summary="Get GeViScope server", description="Get details of a specific GeViScope server by ID" ) async def get_geviscope_server( server_id: str, current_user: User = Depends(require_viewer) ): """Get single GeViScope server by ID""" service = ConfigurationService() try: server = await _get_geviscope_server_by_id(service, server_id) if not server: raise ValueError(f"GeViScope server '{server_id}' not found") return server except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("get_geviscope_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get GeViScope server: {str(e)}" ) @router.post( "/servers/geviscope", response_model=ServerOperationResponse, status_code=status.HTTP_201_CREATED, summary="Create GeViScope server", description="Create a new GeViScope server" ) async def create_geviscope_server( server_data: GeViScopeServerCreate, current_user: User = Depends(require_administrator) ): """Create new GeViScope server""" service = ConfigurationService() try: result = await service.create_geviscope_server({ "alias": server_data.alias, "host": server_data.host, "user": server_data.user, "password": server_data.password, "enabled": server_data.enabled, "deactivate_echo": server_data.deactivate_echo, "deactivate_live_check": server_data.deactivate_live_check, "dialup_broadcast_aware": server_data.dialup_broadcast_aware, "dialup_connection": server_data.dialup_connection, "dialup_cpa_connection": server_data.dialup_cpa_connection, "dialup_cpa_connection_interval": server_data.dialup_cpa_connection_interval, "dialup_cpa_time_settings": server_data.dialup_cpa_time_settings, "dialup_keep_alive": server_data.dialup_keep_alive, "dialup_keep_alive_retrigger": server_data.dialup_keep_alive_retrigger, "dialup_keep_alive_time": server_data.dialup_keep_alive_time }) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=result.get("server_id") ) except Exception as e: logger.error("create_geviscope_server_error", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create GeViScope server: {str(e)}" ) @router.put( "/servers/geviscope/{server_id}", response_model=ServerOperationResponse, status_code=status.HTTP_200_OK, summary="Update GeViScope server", description="Update an existing GeViScope server" ) async def update_geviscope_server( server_id: str, server_data: GeViScopeServerUpdate, current_user: User = Depends(require_administrator) ): """Update existing GeViScope server""" service = ConfigurationService() try: result = await service.update_geviscope_server(server_id, { "alias": server_data.alias, "host": server_data.host, "user": server_data.user, "password": server_data.password, "enabled": server_data.enabled, "deactivate_echo": server_data.deactivate_echo, "deactivate_live_check": server_data.deactivate_live_check, "dialup_broadcast_aware": server_data.dialup_broadcast_aware, "dialup_connection": server_data.dialup_connection, "dialup_cpa_connection": server_data.dialup_cpa_connection, "dialup_cpa_connection_interval": server_data.dialup_cpa_connection_interval, "dialup_cpa_time_settings": server_data.dialup_cpa_time_settings, "dialup_keep_alive": server_data.dialup_keep_alive, "dialup_keep_alive_retrigger": server_data.dialup_keep_alive_retrigger, "dialup_keep_alive_time": server_data.dialup_keep_alive_time }) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=server_id ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("update_geviscope_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update GeViScope server: {str(e)}" ) @router.delete( "/servers/geviscope/{server_id}", response_model=ServerOperationResponse, status_code=status.HTTP_200_OK, summary="Delete GeViScope server", description="Delete a GeViScope server" ) async def delete_geviscope_server( server_id: str, current_user: User = Depends(require_administrator) ): """Delete GeViScope server""" service = ConfigurationService() try: result = await service.delete_geviscope_server(server_id) return ServerOperationResponse( success=result.get("success", False), message=result.get("message", ""), server_id=server_id ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.error("delete_geviscope_server_error", server_id=server_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete GeViScope server: {str(e)}" ) # ============ HELPER FUNCTIONS ============ async def _get_gcore_servers_list(service: ConfigurationService) -> List[GCoreServerResponse]: """Helper to get list of G-Core servers""" gcore_folder = await service.read_configuration_path("GeViGCoreServer") servers = [] if gcore_folder.get("type") == "folder" and "children" in gcore_folder: for child in gcore_folder["children"]: if child.get("type") != "folder": continue server = _parse_gcore_server(child) if server: servers.append(server) return servers async def _get_gcore_server_by_id(service: ConfigurationService, server_id: str) -> Optional[GCoreServerResponse]: """Helper to get single G-Core server by ID""" gcore_folder = await service.read_configuration_path("GeViGCoreServer") if gcore_folder.get("type") != "folder" or "children" not in gcore_folder: return None for child in gcore_folder["children"]: if child.get("type") == "folder" and child.get("name") == server_id: return _parse_gcore_server(child) return None def _parse_gcore_server(child: dict) -> Optional[GCoreServerResponse]: """Parse G-Core server from config tree node""" server_id = child.get("name") children_dict = {c.get("name"): c for c in child.get("children", [])} return GCoreServerResponse( id=server_id, alias=children_dict.get("Alias", {}).get("value", ""), host=children_dict.get("Host", {}).get("value", ""), user=children_dict.get("User", {}).get("value", ""), password=children_dict.get("Password", {}).get("value", ""), enabled=bool(children_dict.get("Enabled", {}).get("value", 0)), deactivate_echo=bool(children_dict.get("DeactivateEcho", {}).get("value", 0)), deactivate_live_check=bool(children_dict.get("DeactivateLiveCheck", {}).get("value", 0)) ) async def _get_geviscope_servers_list(service: ConfigurationService) -> List[GeViScopeServerResponse]: """Helper to get list of GeViScope servers""" gsc_folder = await service.read_configuration_path("GeViGscServer") servers = [] if gsc_folder.get("type") == "folder" and "children" in gsc_folder: for child in gsc_folder["children"]: if child.get("type") != "folder": continue # Skip global settings (non-folder items) server = _parse_geviscope_server(child) if server: servers.append(server) return servers async def _get_geviscope_server_by_id(service: ConfigurationService, server_id: str) -> Optional[GeViScopeServerResponse]: """Helper to get single GeViScope server by ID""" gsc_folder = await service.read_configuration_path("GeViGscServer") if gsc_folder.get("type") != "folder" or "children" not in gsc_folder: return None for child in gsc_folder["children"]: if child.get("type") == "folder" and child.get("name") == server_id: return _parse_geviscope_server(child) return None def _parse_geviscope_server(child: dict) -> Optional[GeViScopeServerResponse]: """Parse GeViScope server from config tree node""" server_id = child.get("name") children_dict = {c.get("name"): c for c in child.get("children", [])} return GeViScopeServerResponse( id=server_id, alias=children_dict.get("Alias", {}).get("value", ""), host=children_dict.get("Host", {}).get("value", ""), user=children_dict.get("User", {}).get("value", ""), password=children_dict.get("Password", {}).get("value", ""), enabled=bool(children_dict.get("Enabled", {}).get("value", 0)), deactivate_echo=bool(children_dict.get("DeactivateEcho", {}).get("value", 0)), deactivate_live_check=bool(children_dict.get("DeactivateLiveCheck", {}).get("value", 0)), dialup_broadcast_aware=bool(children_dict.get("DialUpBroadcastAware", {}).get("value", 0)), dialup_connection=bool(children_dict.get("DialUpConnection", {}).get("value", 0)), dialup_cpa_connection=bool(children_dict.get("DialUpCPAConnection", {}).get("value", 0)), dialup_cpa_connection_interval=int(children_dict.get("DialUpCPAConnectionInterval", {}).get("value", 3600)), dialup_cpa_time_settings=int(children_dict.get("DialUpCPATimeSettings", {}).get("value", 16777215)), dialup_keep_alive=bool(children_dict.get("DialUpKeepAlive", {}).get("value", 0)), dialup_keep_alive_retrigger=bool(children_dict.get("DialUpKeepAliveRetrigger", {}).get("value", 0)), dialup_keep_alive_time=int(children_dict.get("DialUpKeepAliveTime", {}).get("value", 10)) )