Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.2.6-blue.svg)
![version](https://img.shields.io/badge/version-2.3.6-blue.svg)

## Introduction

Expand Down
63 changes: 22 additions & 41 deletions e6data_python_connector/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
import grpc
from grpc._channel import _InactiveRpcError
import multiprocessing
import logging

# Import strategy management functions
from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, _get_grpc_header as _get_strategy_header

# Set up logging
_logger = logging.getLogger(__name__)
from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \
_get_grpc_header as _get_strategy_header


def _get_grpc_header(engine_ip=None, cluster=None, strategy=None):
Expand Down Expand Up @@ -140,7 +136,8 @@ class ClusterManager:
cluster_uuid (str): The unique identifier for the target cluster.
"""

def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5, cluster_uuid=None, grpc_options=None):
def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5,
cluster_uuid=None, grpc_options=None):
"""
Initializes a new instance of the ClusterManager class.

Expand Down Expand Up @@ -208,7 +205,7 @@ def _try_cluster_request(self, request_type, payload=None):
The response from the successful request
"""
current_strategy = _get_active_strategy()

# Create payload if not provided
if payload is None:
if request_type == "status":
Expand All @@ -221,12 +218,10 @@ def _try_cluster_request(self, request_type, payload=None):
user=self._user,
password=self._password
)

# If we have an active strategy, use it first
if current_strategy is not None:
try:
_logger.info(f"ClusterManager: Trying {request_type} with established strategy: {current_strategy}")

if request_type == "status":
response = self._get_connection.status(
payload,
Expand All @@ -239,22 +234,20 @@ def _try_cluster_request(self, request_type, payload=None):
)
else:
raise ValueError(f"Unknown request type: {request_type}")

# Check for new strategy in response
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != current_strategy:
_logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}")
_set_pending_strategy(new_strategy)

return response

except _InactiveRpcError as e:
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# 456 error - switch to alternative strategy
alternative_strategy = 'green' if current_strategy == 'blue' else 'blue'
_logger.info(f"ClusterManager: {request_type} failed with 456 error on {current_strategy}, switching to: {alternative_strategy}")


try:
if request_type == "status":
response = self._get_connection.status(
Expand All @@ -266,36 +259,30 @@ def _try_cluster_request(self, request_type, payload=None):
payload,
metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=alternative_strategy)
)

# Update active strategy since the alternative worked
_set_active_strategy(alternative_strategy)
_logger.info(f"ClusterManager: {request_type} succeeded with alternative strategy: {alternative_strategy}")


# Check for new strategy in response
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != alternative_strategy:
_logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}")
_set_pending_strategy(new_strategy)

return response

except _InactiveRpcError as e2:
_logger.error(f"ClusterManager: Both strategies failed for {request_type}. Original error: {e}, Alternative error: {e2}")
raise e # Raise the original error
else:
# Non-456 error - don't retry
_logger.error(f"ClusterManager: {request_type} failed with non-456 error: {e}")
raise e

# No active strategy - start with authentication logic (blue first, then green)
_logger.info(f"ClusterManager: No active strategy, starting authentication sequence for {request_type}")
strategies_to_try = ['blue', 'green']

for i, strategy in enumerate(strategies_to_try):
try:
_logger.info(f"ClusterManager: Trying {request_type} with strategy: {strategy}")


if request_type == "status":
response = self._get_connection.status(
payload,
Expand All @@ -308,36 +295,30 @@ def _try_cluster_request(self, request_type, payload=None):
)
else:
raise ValueError(f"Unknown request type: {request_type}")

# Set the working strategy as active
_set_active_strategy(strategy)
_logger.info(f"ClusterManager: {request_type} succeeded with strategy: {strategy}")


# Check for new strategy in response
if hasattr(response, 'new_strategy') and response.new_strategy:
new_strategy = response.new_strategy.lower()
if new_strategy != strategy:
_logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}")
_set_pending_strategy(new_strategy)

return response

except _InactiveRpcError as e:
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# 456 error - try next strategy
if i < len(strategies_to_try) - 1:
_logger.info(f"ClusterManager: {request_type} failed with 456 error on {strategy}, trying next strategy: {strategies_to_try[i + 1]}")
continue
else:
_logger.error(f"ClusterManager: {request_type} failed with 456 error on all strategies")
raise e
else:
# Non-456 error - don't retry
_logger.error(f"ClusterManager: {request_type} failed with non-456 error: {e}")
raise e

# If we get here, all strategies failed
_logger.error(f"ClusterManager: All strategies failed for {request_type}")
raise e

def _check_cluster_status(self):
Expand Down
Loading