Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.2.2-blue.svg)
![version](https://img.shields.io/badge/version-2.2.3-blue.svg)

## Introduction

Expand Down
68 changes: 46 additions & 22 deletions e6data_python_connector/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@


def _get_grpc_header(engine_ip=None, cluster=None):
"""
Generate gRPC metadata headers for the request.

This function creates a list of metadata headers to be used in gRPC requests.
It includes optional headers for the engine IP and cluster UUID.

Args:
engine_ip (str, optional): The IP address of the engine. Defaults to None.
cluster (str, optional): The UUID of the cluster. Defaults to None.

Returns:
list: A list of tuples representing the gRPC metadata headers.
"""
metadata = []
if engine_ip:
metadata.append(('plannerip', engine_ip))
Expand Down Expand Up @@ -123,7 +136,7 @@ class ClusterManager:
cluster_uuid (str): The unique identifier for the target cluster.
"""

def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 3, cluster_uuid=None):
def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 3, cluster_uuid=None, grpc_options=None):
"""
Initializes a new instance of the ClusterManager class.

Expand All @@ -147,6 +160,9 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe
self._timeout = time.time() + timeout
self._secure_channel = secure_channel
self.cluster_uuid = cluster_uuid
self._grpc_options = grpc_options
if grpc_options is None:
self._grpc_options = dict()

@property
def _get_connection(self):
Expand All @@ -161,14 +177,34 @@ def _get_connection(self):
if self._secure_channel:
self._channel = grpc.secure_channel(
target='{}:{}'.format(self._host, self._port),
options=self._grpc_options,
credentials=grpc.ssl_channel_credentials()
)
else:
self._channel = grpc.insecure_channel(
target='{}:{}'.format(self._host, self._port)
target='{}:{}'.format(self._host, self._port),
options=self._grpc_options
)
return cluster_pb2_grpc.ClusterServiceStub(self._channel)

def _check_cluster_status(self):
while True:
try:
# Create a status request payload with user credentials
status_payload = cluster_pb2.ClusterStatusRequest(
user=self._user,
password=self._password
)
# Send the status request to the cluster service
response = self._get_connection.status(
status_payload,
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
# Yield the current status
yield response.status
except _InactiveRpcError as e:
yield None

def resume(self) -> bool:
"""
Resumes the cluster if it is currently suspended or not in the 'active' state.
Expand Down Expand Up @@ -229,27 +265,15 @@ def resume(self) -> bool:
"""
return False

# Wait for the cluster to become active
while True:
try:
status_payload = cluster_pb2.ClusterStatusRequest(
user=self._user,
password=self._password
)
response = self._get_connection.status(
status_payload,
metadata=_get_grpc_header(cluster=self.cluster_uuid)
)
if response.status == 'active':
lock.set_active()
return True
if response.status in ['suspended', 'failed']:
return False
if time.time() > self._timeout:
return False
except _InactiveRpcError as e:
pass
for status in self._check_cluster_status():
if status == 'active':
lock.set_active()
return True
elif status == 'failed' or time.time() > self._timeout:
return False
# Wait for 5 seconds before the next status check
time.sleep(5)
return False

def suspend(self):
"""
Expand Down
Loading