From fa5ef9bd518425cda0aa85526d101b38cfb88dfb Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 6 Oct 2025 13:15:13 +0530 Subject: [PATCH 1/4] fix: Auto resume --- e6data_python_connector/cluster_manager.py | 102 +++++++++++++++++++-- e6data_python_connector/e6data_grpc.py | 57 +++++++----- setup.py | 2 +- 3 files changed, 128 insertions(+), 33 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index d60302c..d2905c9 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -1,3 +1,4 @@ +import logging import threading import time import e6data_python_connector.cluster_server.cluster_pb2 as cluster_pb2 @@ -6,6 +7,8 @@ from grpc._channel import _InactiveRpcError import multiprocessing +logger = logging.getLogger(__name__) + from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \ _get_grpc_header as _get_strategy_header @@ -137,7 +140,7 @@ class ClusterManager: """ def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5, - cluster_uuid=None, grpc_options=None): + cluster_uuid=None, grpc_options=None, debug=False): """ Initializes a new instance of the ClusterManager class. @@ -152,6 +155,7 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe defaults to 5 minutes. cluster_uuid (str, optional): The unique identifier for the target cluster; defaults to None. + debug (bool, optional): Enable debug logging; defaults to False. """ self._host = host @@ -164,6 +168,7 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe self._grpc_options = grpc_options if grpc_options is None: self._grpc_options = dict() + self._debug = debug @property def _get_connection(self): @@ -191,20 +196,22 @@ def _get_connection(self): def _try_cluster_request(self, request_type, payload=None): """ Execute a cluster request with strategy fallback for 456 errors. - + For efficiency: - If we have an active strategy, use it first - Only try authentication sequence (blue -> green) if no active strategy - On 456 error, switch to alternative strategy and update active strategy - + Args: request_type: Type of request ('status' or 'resume') payload: Request payload (optional, will be created if not provided) - + Returns: The response from the successful request """ current_strategy = _get_active_strategy() + if self._debug: + logger.info(f"Attempting {request_type} request with current strategy: {current_strategy}") # Create payload if not provided if payload is None: @@ -222,6 +229,8 @@ def _try_cluster_request(self, request_type, payload=None): # If we have an active strategy, use it first if current_strategy is not None: try: + if self._debug: + logger.info(f"Executing {request_type} with strategy '{current_strategy}'") if request_type == "status": response = self._get_connection.status( payload, @@ -239,16 +248,24 @@ def _try_cluster_request(self, request_type, payload=None): if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != current_strategy: + if self._debug: + logger.info(f"Server indicated strategy transition from '{current_strategy}' to '{new_strategy}'") _set_pending_strategy(new_strategy) + if self._debug: + logger.info(f"{request_type} request successful with strategy '{current_strategy}'") return response except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # 456 error - switch to alternative strategy alternative_strategy = 'green' if current_strategy == 'blue' else 'blue' + if self._debug: + logger.info(f"Received 456 error with strategy '{current_strategy}', switching to '{alternative_strategy}'") try: + if self._debug: + logger.info(f"Retrying {request_type} with alternative strategy '{alternative_strategy}'") if request_type == "status": response = self._get_connection.status( payload, @@ -261,27 +278,39 @@ def _try_cluster_request(self, request_type, payload=None): ) # Update active strategy since the alternative worked + if self._debug: + logger.info(f"Successfully switched to strategy '{alternative_strategy}'") _set_active_strategy(alternative_strategy) # Check for new strategy in response if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != alternative_strategy: + if self._debug: + logger.info(f"Server indicated future strategy transition to '{new_strategy}'") _set_pending_strategy(new_strategy) return response except _InactiveRpcError as e2: + if self._debug: + logger.error(f"Failed with alternative strategy '{alternative_strategy}': {e2}") raise e # Raise the original error else: # Non-456 error - don't retry + if self._debug: + logger.info(f"Non-456 error received, not retrying: {e}") raise e # No active strategy - start with authentication logic (blue first, then green) strategies_to_try = ['blue', 'green'] + if self._debug: + logger.info("No active strategy found, trying authentication sequence: blue -> green") for i, strategy in enumerate(strategies_to_try): try: + if self._debug: + logger.info(f"Trying {request_type} with strategy '{strategy}' (attempt {i+1}/{len(strategies_to_try)})") if request_type == "status": response = self._get_connection.status( @@ -297,12 +326,16 @@ def _try_cluster_request(self, request_type, payload=None): raise ValueError(f"Unknown request type: {request_type}") # Set the working strategy as active + if self._debug: + logger.info(f"Successfully authenticated with strategy '{strategy}'") _set_active_strategy(strategy) # Check for new strategy in response if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != strategy: + if self._debug: + logger.info(f"Server indicated strategy transition to '{new_strategy}'") _set_pending_strategy(new_strategy) return response @@ -311,14 +344,22 @@ def _try_cluster_request(self, request_type, payload=None): if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # 456 error - try next strategy if i < len(strategies_to_try) - 1: + if self._debug: + logger.info(f"Strategy '{strategy}' returned 456 error, trying next strategy") continue else: + if self._debug: + logger.error(f"All strategies failed with 456 errors") raise e else: # Non-456 error - don't retry + if self._debug: + logger.error(f"Strategy '{strategy}' failed with non-456 error: {e}") raise e # If we get here, all strategies failed + if self._debug: + logger.error("All authentication strategies exhausted") raise e def _check_cluster_status(self): @@ -326,8 +367,12 @@ def _check_cluster_status(self): try: # Use the unified strategy-aware request method response = self._try_cluster_request("status") + if self._debug: + logger.info(f"Cluster status check returned: {response.status}") yield response.status - except _InactiveRpcError: + except _InactiveRpcError as e: + if self._debug: + logger.warning(f"Status check failed with error: {e}") yield None def resume(self) -> bool: @@ -357,38 +402,77 @@ def resume(self) -> bool: - The operation is subject to the `_timeout` threshold; if the timeout expires, the method returns False. """ + if self._debug: + logger.info(f"Starting auto-resume for cluster {self.cluster_uuid} at {self._host}:{self._port}") with status_lock as lock: if lock.is_active: + if self._debug: + logger.info("Lock is already active, cluster appears to be running") return True # Retrieve the current cluster status with strategy header + if self._debug: + logger.info("Checking current cluster status") try: current_status = self._try_cluster_request("status") - except _InactiveRpcError: + if self._debug: + logger.info(f"Current cluster status: {current_status.status}") + except _InactiveRpcError as e: + if self._debug: + logger.error(f"Failed to get cluster status: {e}") return False + if current_status.status == 'suspended': # Send the resume request with strategy header + if self._debug: + logger.info("Cluster is suspended, sending resume request") try: response = self._try_cluster_request("resume") - except _InactiveRpcError: + if self._debug: + logger.info("Resume request sent successfully") + except _InactiveRpcError as e: + if self._debug: + logger.error(f"Failed to send resume request: {e}") return False elif current_status.status == 'active': + if self._debug: + logger.info("Cluster is already active, no action needed") return True elif current_status.status != 'resuming': """ - If cluster cannot be resumed due to its current state, + If cluster cannot be resumed due to its current state, or already in a process of resuming, terminate the operation. """ + if self._debug: + logger.warning(f"Cluster is in state '{current_status.status}' which cannot be resumed") return False + if self._debug: + logger.info("Monitoring cluster status until it becomes active...") + check_count = 0 for status in self._check_cluster_status(): + check_count += 1 if status == 'active': + if self._debug: + logger.info(f"Cluster became active after {check_count} status checks") return True - elif status == 'failed' or time.time() > self._timeout: + elif status == 'failed': + if self._debug: + logger.error(f"Cluster failed to resume after {check_count} status checks") + return False + elif time.time() > self._timeout: + if self._debug: + logger.error(f"Resume operation timed out after {check_count} status checks") return False + + if self._debug: + logger.info(f"Status check #{check_count}: {status}, waiting 5 seconds before next check") # Wait for 5 seconds before the next status check time.sleep(5) + + if self._debug: + logger.warning("Resume operation completed without reaching active state") return False def suspend(self): diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index d7e3a2e..76d272e 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -550,7 +550,12 @@ def get_session_id(self): _clear_strategy_cache() active_strategy = None else: - raise e + if self._perform_auto_resume(e): + # Cluster resumed, retry with cached strategy + _strategy_debug_log(f"Cluster resumed, retrying with cached strategy {active_strategy}") + active_strategy = None # Force retry + else: + raise e elif pending_strategy: # If there's a pending strategy, force re-authentication with new strategy active_strategy = None @@ -568,7 +573,7 @@ def get_session_id(self): _strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}") last_error = None for strategy in strategies: - _strategy_debug_log(f"Attempting authentication with strategy: {strategy}") + _strategy_debug_log(f"1Attempting authentication with strategy: {strategy}") try: authenticate_response = self._client.authenticate( authenticate_request, @@ -598,8 +603,13 @@ def get_session_id(self): last_error = e continue else: - # Different error, handle it normally - raise e + if self._perform_auto_resume(e): + # Cluster resumed successfully, retry authentication with current strategy + _strategy_debug_log(f"Cluster resumed, retrying authentication with {strategy}") + continue + else: + # Auto-resume failed, raise the error + raise e if not self._session_id and last_error: # Neither strategy worked @@ -608,30 +618,31 @@ def get_session_id(self): if not self._session_id: raise ValueError("Invalid credentials.") except _InactiveRpcError as e: - if self._auto_resume: - if e.code() == grpc.StatusCode.UNAVAILABLE and 'status: 503' in e.details(): - status = ClusterManager( - host=self._host, - port=self._port, - user=self.__username, - password=self.__password, - secure_channel=self._secure_channel, - cluster_uuid=self.cluster_name, - timeout=self.grpc_auto_resume_timeout_seconds - ).resume() - if status: - return self.get_session_id - else: - raise e - else: - raise e - else: - raise e + self._perform_auto_resume(e) except Exception as e: self._channel.close() raise e return self._session_id + def _perform_auto_resume(self, e: _InactiveRpcError): + if self._auto_resume: + if e.code() == grpc.StatusCode.UNAVAILABLE and 'status: 503' in e.details(): + status = ClusterManager( + host=self._host, + port=self._port, + user=self.__username, + password=self.__password, + secure_channel=self._secure_channel, + cluster_uuid=self.cluster_name, + timeout=self.grpc_auto_resume_timeout_seconds, + debug=self._debug + ).resume() + return status # Return boolean status directly + else: + return False # Non-503 error, cannot auto-resume + else: + return False # Auto-resume disabled + def __enter__(self): """ Enters the runtime context related to this object. diff --git a/setup.py b/setup.py index 7fa476f..2e74dba 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 10,) +VERSION = (2, 3, 11, 'rc1') def get_long_desc(): From 62e55cc0c62465caf218a9ca7939a754d12bd781 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 6 Oct 2025 14:39:43 +0530 Subject: [PATCH 2/4] fix: Auto resume --- e6data_python_connector/e6data_grpc.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 76d272e..e6fe8ce 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -7,6 +7,7 @@ from __future__ import unicode_literals import datetime +import logging import re import sys import time @@ -112,6 +113,9 @@ def escape_string(self, item): _escaper = HiveParamEscaper() +# Logger for the module +logger = logging.getLogger(__name__) + # Thread-safe and process-safe storage for active deployment strategy _strategy_lock = threading.Lock() _strategy_manager = None @@ -134,7 +138,7 @@ def escape_string(self, item): def _strategy_debug_log(message): """Log strategy debug messages if any connection has debug enabled.""" if _debug_connections: - print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}") + logger.info(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}") def _get_shared_strategy(): From f0267ee3f40729e9f44e19941631c6dcd7873012 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 6 Oct 2025 14:50:01 +0530 Subject: [PATCH 3/4] fix: Auto resume --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cd0c609..a9e594c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.10-blue.svg) +![version](https://img.shields.io/badge/version-2.3.11-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index 2e74dba..edc50e0 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 11, 'rc1') +VERSION = (2, 3, 11,) def get_long_desc(): From eba7e8f47da7a62c6c53a7f614658d844872ada6 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 7 Oct 2025 19:33:21 +0530 Subject: [PATCH 4/4] fix: Auto resume --- e6data_python_connector/e6data_grpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index e6fe8ce..a6a375c 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -577,7 +577,7 @@ def get_session_id(self): _strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}") last_error = None for strategy in strategies: - _strategy_debug_log(f"1Attempting authentication with strategy: {strategy}") + _strategy_debug_log(f"Attempting authentication with strategy: {strategy}.") try: authenticate_response = self._client.authenticate( authenticate_request,