Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.3.10-blue.svg)
![version](https://img.shields.io/badge/version-2.3.11-blue.svg)

## Introduction

Expand Down
102 changes: 93 additions & 9 deletions e6data_python_connector/cluster_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import threading
import time
import e6data_python_connector.cluster_server.cluster_pb2 as cluster_pb2
Expand All @@ -6,6 +7,8 @@
from grpc._channel import _InactiveRpcError
import multiprocessing

logger = logging.getLogger(__name__)

from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \
_get_grpc_header as _get_strategy_header

Expand Down Expand Up @@ -137,7 +140,7 @@ class ClusterManager:
"""

def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5,
cluster_uuid=None, grpc_options=None):
cluster_uuid=None, grpc_options=None, debug=False):
"""
Initializes a new instance of the ClusterManager class.

Expand All @@ -152,6 +155,7 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe
defaults to 5 minutes.
cluster_uuid (str, optional): The unique identifier for the target cluster;
defaults to None.
debug (bool, optional): Enable debug logging; defaults to False.
"""

self._host = host
Expand All @@ -164,6 +168,7 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe
self._grpc_options = grpc_options
if grpc_options is None:
self._grpc_options = dict()
self._debug = debug

@property
def _get_connection(self):
Expand Down Expand Up @@ -191,20 +196,22 @@ def _get_connection(self):
def _try_cluster_request(self, request_type, payload=None):
"""
Execute a cluster request with strategy fallback for 456 errors.

For efficiency:
- If we have an active strategy, use it first
- Only try authentication sequence (blue -> green) if no active strategy
- On 456 error, switch to alternative strategy and update active strategy

Args:
request_type: Type of request ('status' or 'resume')
payload: Request payload (optional, will be created if not provided)

Returns:
The response from the successful request
"""
current_strategy = _get_active_strategy()
if self._debug:
logger.info(f"Attempting {request_type} request with current strategy: {current_strategy}")

# Create payload if not provided
if payload is None:
Expand All @@ -222,6 +229,8 @@ def _try_cluster_request(self, request_type, payload=None):
# If we have an active strategy, use it first
if current_strategy is not None:
try:
if self._debug:
logger.info(f"Executing {request_type} with strategy '{current_strategy}'")
if request_type == "status":
response = self._get_connection.status(
payload,
Expand All @@ -239,16 +248,24 @@ def _try_cluster_request(self, request_type, payload=None):
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != current_strategy:
if self._debug:
logger.info(f"Server indicated strategy transition from '{current_strategy}' to '{new_strategy}'")
_set_pending_strategy(new_strategy)

if self._debug:
logger.info(f"{request_type} request successful with strategy '{current_strategy}'")
return response

except _InactiveRpcError as e:
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# 456 error - switch to alternative strategy
alternative_strategy = 'green' if current_strategy == 'blue' else 'blue'
if self._debug:
logger.info(f"Received 456 error with strategy '{current_strategy}', switching to '{alternative_strategy}'")

try:
if self._debug:
logger.info(f"Retrying {request_type} with alternative strategy '{alternative_strategy}'")
if request_type == "status":
response = self._get_connection.status(
payload,
Expand All @@ -261,27 +278,39 @@ def _try_cluster_request(self, request_type, payload=None):
)

# Update active strategy since the alternative worked
if self._debug:
logger.info(f"Successfully switched to strategy '{alternative_strategy}'")
_set_active_strategy(alternative_strategy)

# Check for new strategy in response
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != alternative_strategy:
if self._debug:
logger.info(f"Server indicated future strategy transition to '{new_strategy}'")
_set_pending_strategy(new_strategy)

return response

except _InactiveRpcError as e2:
if self._debug:
logger.error(f"Failed with alternative strategy '{alternative_strategy}': {e2}")
raise e # Raise the original error
else:
# Non-456 error - don't retry
if self._debug:
logger.info(f"Non-456 error received, not retrying: {e}")
raise e

# No active strategy - start with authentication logic (blue first, then green)
strategies_to_try = ['blue', 'green']
if self._debug:
logger.info("No active strategy found, trying authentication sequence: blue -> green")

for i, strategy in enumerate(strategies_to_try):
try:
if self._debug:
logger.info(f"Trying {request_type} with strategy '{strategy}' (attempt {i+1}/{len(strategies_to_try)})")

if request_type == "status":
response = self._get_connection.status(
Expand All @@ -297,12 +326,16 @@ def _try_cluster_request(self, request_type, payload=None):
raise ValueError(f"Unknown request type: {request_type}")

# Set the working strategy as active
if self._debug:
logger.info(f"Successfully authenticated with strategy '{strategy}'")
_set_active_strategy(strategy)

# Check for new strategy in response
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != strategy:
if self._debug:
logger.info(f"Server indicated strategy transition to '{new_strategy}'")
_set_pending_strategy(new_strategy)

return response
Expand All @@ -311,23 +344,35 @@ def _try_cluster_request(self, request_type, payload=None):
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# 456 error - try next strategy
if i < len(strategies_to_try) - 1:
if self._debug:
logger.info(f"Strategy '{strategy}' returned 456 error, trying next strategy")
continue
else:
if self._debug:
logger.error(f"All strategies failed with 456 errors")
raise e
else:
# Non-456 error - don't retry
if self._debug:
logger.error(f"Strategy '{strategy}' failed with non-456 error: {e}")
raise e

# If we get here, all strategies failed
if self._debug:
logger.error("All authentication strategies exhausted")
raise e

def _check_cluster_status(self):
while True:
try:
# Use the unified strategy-aware request method
response = self._try_cluster_request("status")
if self._debug:
logger.info(f"Cluster status check returned: {response.status}")
yield response.status
except _InactiveRpcError:
except _InactiveRpcError as e:
if self._debug:
logger.warning(f"Status check failed with error: {e}")
yield None

def resume(self) -> bool:
Expand Down Expand Up @@ -357,38 +402,77 @@ def resume(self) -> bool:
- The operation is subject to the `_timeout` threshold;
if the timeout expires, the method returns False.
"""
if self._debug:
logger.info(f"Starting auto-resume for cluster {self.cluster_uuid} at {self._host}:{self._port}")

with status_lock as lock:
if lock.is_active:
if self._debug:
logger.info("Lock is already active, cluster appears to be running")
return True

# Retrieve the current cluster status with strategy header
if self._debug:
logger.info("Checking current cluster status")
try:
current_status = self._try_cluster_request("status")
except _InactiveRpcError:
if self._debug:
logger.info(f"Current cluster status: {current_status.status}")
except _InactiveRpcError as e:
if self._debug:
logger.error(f"Failed to get cluster status: {e}")
return False

if current_status.status == 'suspended':
# Send the resume request with strategy header
if self._debug:
logger.info("Cluster is suspended, sending resume request")
try:
response = self._try_cluster_request("resume")
except _InactiveRpcError:
if self._debug:
logger.info("Resume request sent successfully")
except _InactiveRpcError as e:
if self._debug:
logger.error(f"Failed to send resume request: {e}")
return False
elif current_status.status == 'active':
if self._debug:
logger.info("Cluster is already active, no action needed")
return True
elif current_status.status != 'resuming':
"""
If cluster cannot be resumed due to its current state,
If cluster cannot be resumed due to its current state,
or already in a process of resuming, terminate the operation.
"""
if self._debug:
logger.warning(f"Cluster is in state '{current_status.status}' which cannot be resumed")
return False

if self._debug:
logger.info("Monitoring cluster status until it becomes active...")
check_count = 0
for status in self._check_cluster_status():
check_count += 1
if status == 'active':
if self._debug:
logger.info(f"Cluster became active after {check_count} status checks")
return True
elif status == 'failed' or time.time() > self._timeout:
elif status == 'failed':
if self._debug:
logger.error(f"Cluster failed to resume after {check_count} status checks")
return False
elif time.time() > self._timeout:
if self._debug:
logger.error(f"Resume operation timed out after {check_count} status checks")
return False

if self._debug:
logger.info(f"Status check #{check_count}: {status}, waiting 5 seconds before next check")
# Wait for 5 seconds before the next status check
time.sleep(5)

if self._debug:
logger.warning("Resume operation completed without reaching active state")
return False

def suspend(self):
Expand Down
62 changes: 38 additions & 24 deletions e6data_python_connector/e6data_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def escape_string(self, item):

_escaper = HiveParamEscaper()

# Logger for the module
logger = logging.getLogger(__name__)

# Thread-safe and process-safe storage for active deployment strategy
_strategy_lock = threading.Lock()
_strategy_manager = None
Expand All @@ -137,7 +140,7 @@ def escape_string(self, item):
def _strategy_debug_log(message):
"""Log strategy debug messages if any connection has debug enabled."""
if _debug_connections:
print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}")
logger.info(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}")


def _get_shared_strategy():
Expand Down Expand Up @@ -597,7 +600,12 @@ def get_session_id(self):
_clear_strategy_cache()
active_strategy = None
else:
raise e
if self._perform_auto_resume(e):
# Cluster resumed, retry with cached strategy
_strategy_debug_log(f"Cluster resumed, retrying with cached strategy {active_strategy}")
active_strategy = None # Force retry
else:
raise e
elif pending_strategy:
# If there's a pending strategy, force re-authentication with new strategy
active_strategy = None
Expand All @@ -615,7 +623,7 @@ def get_session_id(self):
_strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}")
last_error = None
for strategy in strategies:
_strategy_debug_log(f"Attempting authentication with strategy: {strategy}")
_strategy_debug_log(f"Attempting authentication with strategy: {strategy}.")
try:
authenticate_response = self._client.authenticate(
authenticate_request,
Expand Down Expand Up @@ -645,8 +653,13 @@ def get_session_id(self):
last_error = e
continue
else:
# Different error, handle it normally
raise e
if self._perform_auto_resume(e):
# Cluster resumed successfully, retry authentication with current strategy
_strategy_debug_log(f"Cluster resumed, retrying authentication with {strategy}")
continue
else:
# Auto-resume failed, raise the error
raise e

if not self._session_id and last_error:
# Neither strategy worked
Expand All @@ -655,30 +668,31 @@ def get_session_id(self):
if not self._session_id:
raise ValueError("Invalid credentials.")
except _InactiveRpcError as e:
if self._auto_resume:
if e.code() == grpc.StatusCode.UNAVAILABLE and 'status: 503' in e.details():
status = ClusterManager(
host=self._host,
port=self._port,
user=self.__username,
password=self.__password,
secure_channel=self._secure_channel,
cluster_uuid=self.cluster_name,
timeout=self.grpc_auto_resume_timeout_seconds
).resume()
if status:
return self.get_session_id
else:
raise e
else:
raise e
else:
raise e
self._perform_auto_resume(e)
except Exception as e:
self._channel.close()
raise e
return self._session_id

def _perform_auto_resume(self, e: _InactiveRpcError):
    """
    Try to resume the cluster in response to a failed gRPC call.

    A resume is attempted only when auto-resume is enabled on this object
    and the error has status code UNAVAILABLE with 'status: 503' in its
    details. In every other case no request is sent.

    Args:
        e (_InactiveRpcError): The gRPC error raised by the failed call.

    Returns:
        The value returned by ClusterManager.resume() when a resume was
        attempted; False when auto-resume is disabled or the error is not
        a 503.
    """
    if not self._auto_resume:
        return False  # Auto-resume disabled

    is_unavailable_503 = (
        e.code() == grpc.StatusCode.UNAVAILABLE
        and 'status: 503' in e.details()
    )
    if not is_unavailable_503:
        return False  # Non-503 error, cannot auto-resume

    # Build a manager from this object's own connection settings and hand
    # its resume() result straight back to the caller.
    manager = ClusterManager(
        host=self._host,
        port=self._port,
        user=self.__username,
        password=self.__password,
        secure_channel=self._secure_channel,
        cluster_uuid=self.cluster_name,
        timeout=self.grpc_auto_resume_timeout_seconds,
        debug=self._debug,
    )
    return manager.resume()

def __enter__(self):
"""
Enters the runtime context related to this object.
Expand Down
Loading