From b95f47a8c56072c8d9f6a6d25c697e71ff63d21f Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 24 Mar 2025 17:43:10 +0530 Subject: [PATCH 01/11] Version changes. --- README.md | 2 +- e6data_python_connector/e6data_grpc.py | 26 +++++++++++++++----------- setup.py | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index b0f4a92..914185a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.2-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3.rc2-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index ddb7eb9..d5aa729 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -11,6 +11,7 @@ import logging import re import sys +import time from decimal import Decimal from io import BytesIO from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED @@ -68,18 +69,21 @@ def _parse_timestamp(value): def re_auth(func): def wrapper(self, *args, **kwargs): - try: - return func(self, *args, **kwargs) - except _InactiveRpcError as e: - print(f'RE_AUTH: Function Name: {func}') - print(f'RE_AUTH: Error Found {e}') - if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details(): - print('RE_AUTH: Initialising re-authentication.') - self.connection.get_re_authenticate_session_id() - print(f'RE_AUTH: Re-auth successful.') + max_retry = 5 + current_retry = 0 + while current_retry < max_retry: + try: return func(self, *args, **kwargs) - else: - raise e + except _InactiveRpcError as e: + current_retry += 1 + if current_retry == max_retry: + raise e + if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details(): + time.sleep(0.2) + print(f'RE_AUTH: Function Name: {func}') + print(f'RE_AUTH: Error Found {e}') + else: + raise e return wrapper diff --git a/setup.py b/setup.py index 0b39e2a..73e2e71 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 2) +VERSION = (2, 2, 2, '3.rc2') def get_long_desc(): From 2d714abf9e96d152c7fabb03d991e9673954c485 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 24 Mar 2025 17:53:08 +0530 Subject: [PATCH 02/11] Version changes. --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 914185a..c3ec220 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.3.rc2-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3rc2-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index 73e2e71..8122476 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 2, '3.rc2') +VERSION = (2, 2, 3, 'rc2') def get_long_desc(): From 86f03eea9c5a564082f668bba2365739c6c9eebe Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Mon, 24 Mar 2025 18:55:09 +0530 Subject: [PATCH 03/11] Version changes. --- README.md | 2 +- e6data_python_connector/e6data_grpc.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c3ec220..cb555d8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.3rc2-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3rc3-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index d5aa729..be5d3f3 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -82,9 +82,9 @@ def wrapper(self, *args, **kwargs): time.sleep(0.2) print(f'RE_AUTH: Function Name: {func}') print(f'RE_AUTH: Error Found {e}') + self.connection.get_re_authenticate_session_id() else: raise e - return wrapper diff --git a/setup.py b/setup.py index 8122476..e369d99 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 3, 'rc2') +VERSION = (2, 2, 3, 'rc3') def get_long_desc(): From 62144e6fe17f48f7ba49c8091bf54883397bd8e7 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 15:52:40 +0530 Subject: [PATCH 04/11] Version changes. --- README.md | 2 +- e6data_python_connector/e6data_grpc.py | 83 ++++++++++++++++++-------- setup.py | 2 +- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index cb555d8..6c76442 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.3rc3-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3rc4-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index be5d3f3..d2965f0 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -80,11 +80,12 @@ def wrapper(self, *args, **kwargs): raise e if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details(): time.sleep(0.2) - print(f'RE_AUTH: Function Name: {func}') - print(f'RE_AUTH: Error Found {e}') + _logger.info(f'RE_AUTH: Function Name: {func}') + _logger.info(f'RE_AUTH: Error Found {e}') self.connection.get_re_authenticate_session_id() else: raise e + return wrapper @@ -173,6 +174,7 @@ def __init__( - max_receive_message_length: This parameter sets the maximum allowed size (in bytes) for incoming messages on the gRPC server. - max_send_message_length: Similar to max_receive_message_length, this parameter sets the maximum allowed size (in bytes) for outgoing messages from the gRPC client - grpc_prepare_timeout: Timeout for prepare statement API call (default to 10 minutes). + - keepalive_time_ms: This parameter defines the time, in milliseconds, Default to 30 seconds """ if not username or not password: raise ValueError("username or password cannot be empty.") @@ -192,43 +194,76 @@ def __init__( self._auto_resume = auto_resume - self._keepalive_timeout_ms = 900000 - self._max_receive_message_length = -1 - self._max_send_message_length = 300 * 1024 * 1024 # mb - self.grpc_prepare_timeout = 10 * 60 # 10 minutes - - if isinstance(grpc_options, dict): - self._keepalive_timeout_ms = grpc_options.get('keepalive_timeout_ms') or self._keepalive_timeout_ms - self._max_receive_message_length = grpc_options.get( - 'max_receive_message_length') or self._max_receive_message_length - self._max_send_message_length = grpc_options.get('max_send_message_length') or self._max_send_message_length - self.grpc_prepare_timeout = grpc_options.get('grpc_prepare_timeout') or self.grpc_prepare_timeout + self._grpc_options = grpc_options + if self._grpc_options is None: + self._grpc_options = dict() + self.grpc_prepare_timeout = self._grpc_options.get('grpc_prepare_timeout') or 10 * 60 # 10 minutes self._create_client() + @property + def _get_grpc_options(self): + """ + Property to get gRPC options for the connection. + + This method checks if the gRPC options are already cached. If not, it creates a copy of the + provided gRPC options and merges them with the default options. The merged options are then + cached for future use. + + Returns: + list: A list of tuples containing gRPC options. + """ + if not hasattr(self, '_cached_grpc_options'): + grpc_options = self._grpc_options.copy() + default_options = { + "keepalive_timeout_ms": 900000, # Time in milliseconds to keep the connection alive. + "max_receive_message_length": -1, # Maximum size of received messages. + "max_send_message_length": 300 * 1024 * 1024, # Maximum size of sent messages (300 MB). + "grpc_prepare_timeout": self.grpc_prepare_timeout, # Timeout for prepare statement API call. + "keepalive_time_ms": 30000, # Time in milliseconds between keep-alive pings. + "keepalive_permit_without_calls": 1, # Allow keep-alives with no active RPCs. + "http2.max_pings_without_data": 0, # Unlimited pings without data. + "http2.min_time_between_pings_ms": 15000, # Minimum time between pings (15 seconds). + "http2.min_ping_interval_without_data_ms": 15000, # Minimum interval between pings without data (15 seconds). + } + if grpc_options: + for key, value in grpc_options.items(): + default_options[key] = value + + self._cached_grpc_options = [(f'grpc.{key}', value) for key, value in default_options.items()] + + return self._cached_grpc_options + def _create_client(self): + """ + Creates a gRPC client for the connection. + + This method initializes a gRPC channel based on whether a secure channel is required or not. + It then creates a client stub for the QueryEngineService. + + If the secure channel is enabled, it uses `grpc.secure_channel` with SSL credentials. + Otherwise, it uses `grpc.insecure_channel`. + + The gRPC options are retrieved from the `_get_grpc_options` property. + + Raises: + grpc.RpcError: If there is an error in creating the gRPC channel or client stub. + """ if self._secure_channel: self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), - options=[ - ("grpc.keepalive_timeout_ms", self._keepalive_timeout_ms), - ('grpc.max_send_message_length', self._max_send_message_length), - ('grpc.max_receive_message_length', self._max_receive_message_length) - ], + options=self._get_grpc_options, credentials=grpc.ssl_channel_credentials() ) else: self._channel = grpc.insecure_channel( target='{}:{}'.format(self._host, self._port), - options=[ - ("grpc.keepalive_timeout_ms", self._keepalive_timeout_ms), - ('grpc.max_send_message_length', self._max_send_message_length), - ('grpc.max_receive_message_length', self._max_receive_message_length) - ] + options=self._get_grpc_options ) self._client = e6x_engine_pb2_grpc.QueryEngineServiceStub(self._channel) def get_re_authenticate_session_id(self): - self._session_id = None + self.close() + self._create_client() return self.get_session_id @property diff --git a/setup.py b/setup.py index e369d99..41f4e9b 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 3, 'rc3') +VERSION = (2, 2, 3, 'rc4') def get_long_desc(): From b4a5623b95e3a13fc903562fb57d8cbf9040c555 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 16:01:18 +0530 Subject: [PATCH 05/11] Logs added. --- e6data_python_connector/cluster_manager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 0b0883b..19d82b3 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -220,6 +220,7 @@ def resume(self) -> bool: payload, metadata=_get_grpc_header(cluster=self.cluster_uuid) ) + print(f'Cluster resume response: {response}') elif current_status.status == 'active': return True elif current_status.status != 'resuming': @@ -227,6 +228,7 @@ def resume(self) -> bool: If cluster cannot be resumed due to its current state, or already in a process of resuming, terminate the operation. """ + print(f'Cluster is not suspended status, raising error.') return False # Wait for the cluster to become active @@ -240,12 +242,16 @@ def resume(self) -> bool: status_payload, metadata=_get_grpc_header(cluster=self.cluster_uuid) ) + print(f'Cluster status response: {response}') if response.status == 'active': + print('Cluster is now active, starting execution.') lock.set_active() return True if response.status in ['suspended', 'failed']: + print(f'Trying to resume the cluster, found status: {response.status}, raising error.') return False if time.time() > self._timeout: + print('Cluster resume timed out.') return False except _InactiveRpcError as e: pass From a7ae396eaf8f0fd8204253011a058ff36ef117c2 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 16:18:15 +0530 Subject: [PATCH 06/11] Logs added. --- README.md | 2 +- e6data_python_connector/cluster_manager.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6c76442..4b7f861 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.3rc4-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3rc5-blue.svg) ## Introduction diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 19d82b3..789653c 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -247,7 +247,7 @@ def resume(self) -> bool: print('Cluster is now active, starting execution.') lock.set_active() return True - if response.status in ['suspended', 'failed']: + if response.status in ['failed']: print(f'Trying to resume the cluster, found status: {response.status}, raising error.') return False if time.time() > self._timeout: diff --git a/setup.py b/setup.py index 41f4e9b..d23720d 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 3, 'rc4') +VERSION = (2, 2, 3, 'rc5') def get_long_desc(): From c4433990646af37fe63034bea3b62eabdd298776 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 17:32:37 +0530 Subject: [PATCH 07/11] Version 2.2.3 --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4b7f861..d600c73 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.3rc5-blue.svg) +![version](https://img.shields.io/badge/version-2.2.3-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index d23720d..25cab14 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 3, 'rc5') +VERSION = (2, 2, 3) def get_long_desc(): From 10d6a98af0fb0bb3759e1b2105b29155d5e3074c Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 17:33:34 +0530 Subject: [PATCH 08/11] Logs removed. --- e6data_python_connector/cluster_manager.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 789653c..baffdc3 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -220,7 +220,6 @@ def resume(self) -> bool: payload, metadata=_get_grpc_header(cluster=self.cluster_uuid) ) - print(f'Cluster resume response: {response}') elif current_status.status == 'active': return True elif current_status.status != 'resuming': @@ -228,7 +227,6 @@ def resume(self) -> bool: If cluster cannot be resumed due to its current state, or already in a process of resuming, terminate the operation. """ - print(f'Cluster is not suspended status, raising error.') return False # Wait for the cluster to become active @@ -242,16 +240,12 @@ def resume(self) -> bool: status_payload, metadata=_get_grpc_header(cluster=self.cluster_uuid) ) - print(f'Cluster status response: {response}') if response.status == 'active': - print('Cluster is now active, starting execution.') lock.set_active() return True if response.status in ['failed']: - print(f'Trying to resume the cluster, found status: {response.status}, raising error.') return False if time.time() > self._timeout: - print('Cluster resume timed out.') return False except _InactiveRpcError as e: pass From c87fe9accca68b352cb68317f7c7cdc44ee08282 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 23:04:40 +0530 Subject: [PATCH 09/11] Generator added for checking cluster status. --- e6data_python_connector/cluster_manager.py | 46 ++++++++++++---------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index baffdc3..8bb3ce5 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -169,6 +169,24 @@ def _get_connection(self): ) return cluster_pb2_grpc.ClusterServiceStub(self._channel) + def _check_cluster_status(self): + while True: + try: + # Create a status request payload with user credentials + status_payload = cluster_pb2.ClusterStatusRequest( + user=self._user, + password=self._password + ) + # Send the status request to the cluster service + response = self._get_connection.status( + status_payload, + metadata=_get_grpc_header(cluster=self.cluster_uuid) + ) + # Yield the current status + yield response.status + except _InactiveRpcError as e: + yield None + def resume(self) -> bool: """ Resumes the cluster if it is currently suspended or not in the 'active' state. @@ -229,27 +247,15 @@ def resume(self) -> bool: """ return False - # Wait for the cluster to become active - while True: - try: - status_payload = cluster_pb2.ClusterStatusRequest( - user=self._user, - password=self._password - ) - response = self._get_connection.status( - status_payload, - metadata=_get_grpc_header(cluster=self.cluster_uuid) - ) - if response.status == 'active': - lock.set_active() - return True - if response.status in ['failed']: - return False - if time.time() > self._timeout: - return False - except _InactiveRpcError as e: - pass + for status in self._check_cluster_status(): + if status == 'active': + lock.set_active() + return True + elif status == 'failed' or time.time() > self._timeout: + return False + # Wait for 5 seconds before the next status check time.sleep(5) + return False def suspend(self): """ From e01c4b784c3d0cfd321b959ce64960809ba41e34 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 23:10:35 +0530 Subject: [PATCH 10/11] Docs updated. --- e6data_python_connector/cluster_manager.py | 9 +++++++-- e6data_python_connector/e6data_grpc.py | 12 ++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 8bb3ce5..a9f05cd 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -123,7 +123,7 @@ class ClusterManager: cluster_uuid (str): The unique identifier for the target cluster. """ - def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 3, cluster_uuid=None): + def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 3, cluster_uuid=None, grpc_options=None): """ Initializes a new instance of the ClusterManager class. @@ -147,6 +147,9 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe self._timeout = time.time() + timeout self._secure_channel = secure_channel self.cluster_uuid = cluster_uuid + self._grpc_options = grpc_options + if grpc_options is None: + self._grpc_options = dict() @property def _get_connection(self): @@ -161,11 +164,13 @@ def _get_connection(self): if self._secure_channel: self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), + options=self._grpc_options, credentials=grpc.ssl_channel_credentials() ) else: self._channel = grpc.insecure_channel( - target='{}:{}'.format(self._host, self._port) + target='{}:{}'.format(self._host, self._port), + options=self._grpc_options ) return cluster_pb2_grpc.ClusterServiceStub(self._channel) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index d2965f0..8fb7e99 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -262,6 +262,18 @@ def _create_client(self): self._client = e6x_engine_pb2_grpc.QueryEngineServiceStub(self._channel) def get_re_authenticate_session_id(self): + """ + Re-authenticates the session by closing the current connection and creating a new client. + + This method is used to re-establish the session ID by closing the existing gRPC channel, + creating a new client, and then retrieving a new session ID. + + Returns: + str: The new session ID after re-authentication. + + Raises: + Exception: If there is an error during the re-authentication process. + """ self.close() self._create_client() return self.get_session_id From 4ebacb20c216e63779db059b959121095730b3a0 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Tue, 25 Mar 2025 23:23:10 +0530 Subject: [PATCH 11/11] Docs updated. --- e6data_python_connector/cluster_manager.py | 13 + e6data_python_connector/e6data_grpc.py | 305 +++++++++++++++++++-- 2 files changed, 294 insertions(+), 24 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index a9f05cd..369228c 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -8,6 +8,19 @@ def _get_grpc_header(engine_ip=None, cluster=None): + """ + Generate gRPC metadata headers for the request. + + This function creates a list of metadata headers to be used in gRPC requests. + It includes optional headers for the engine IP and cluster UUID. + + Args: + engine_ip (str, optional): The IP address of the engine. Defaults to None. + cluster (str, optional): The UUID of the cluster. Defaults to None. + + Returns: + list: A list of tuples representing the gRPC metadata headers. + """ metadata = [] if engine_ip: metadata.append(('plannerip', engine_ip)) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 8fb7e99..07acddf 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -328,35 +328,58 @@ def get_session_id(self): raise e return self._session_id - def update_users(self, user_info): - self.client.updateUsers(userInfo=user_info) - - def set_prop_map(self, prop_map: str): - """ - To enable to disable the caches. - :param prop_map: To set engine props + def __enter__(self): """ - set_props_request = e6x_engine_pb2.SetPropsRequest(sessionId=self.get_session_id, props=prop_map) - self._client.setProps(set_props_request) + Enters the runtime context related to this object. - def __enter__(self): - """Transport should already be opened by __init__""" + This method is called when the execution flow enters the context of the `with` statement. + + Returns: + Connection: The current instance of the connection. + """ return self def __exit__(self, exc_type, exc_val, exc_tb): - """Call close""" + """ + Exits the runtime context related to this object. + + This method is called when the execution flow exits the context of the `with` statement. + + Args: + exc_type (Type[BaseException]): The type of exception raised (if any). + exc_val (BaseException): The exception instance raised (if any). + exc_tb (Traceback): The traceback object of the exception (if any). + """ self.close() def close(self): + """ + Closes the gRPC channel and resets the session ID. + + This method ensures that the gRPC channel is properly closed and the session ID is reset to None. + """ if self._channel is not None: self._channel.close() self._channel = None self._session_id = None def check_connection(self): + """ + Checks if the gRPC channel is still open. + + Returns: + bool: True if the gRPC channel is open, False otherwise. + """ return self._channel is not None def clear(self, query_id, engine_ip=None): + """ + Clears the query results from the server. + + Args: + query_id (str): The ID of the query to be cleared. + engine_ip (str, optional): The IP address of the engine. Defaults to None. + """ clear_request = e6x_engine_pb2.ClearRequest( sessionId=self.get_session_id, queryId=query_id, @@ -368,10 +391,22 @@ def clear(self, query_id, engine_ip=None): ) def reopen(self): + """ + Reopens the gRPC channel by closing the current channel and creating a new client. + + This method is useful for re-establishing the connection if it was previously closed. + """ self._channel.close() self._create_client() def query_cancel(self, engine_ip, query_id): + """ + Cancels the execution of a query on the server. + + Args: + engine_ip (str): The IP address of the engine. + query_id (str): The ID of the query to be canceled. + """ cancel_query_request = e6x_engine_pb2.CancelQueryRequest( engineIP=engine_ip, sessionId=self.get_session_id, @@ -383,6 +418,15 @@ def query_cancel(self, engine_ip, query_id): ) def dry_run(self, query): + """ + Performs a dry run of the query to validate its syntax and structure. + + Args: + query (str): The SQL query to be validated. + + Returns: + str: The result of the dry run validation. + """ dry_run_request = e6x_engine_pb2.DryRunRequest( sessionId=self.get_session_id, schema=self.database, @@ -395,6 +439,16 @@ def dry_run(self, query): return dry_run_response.dryrunValue def get_tables(self, catalog, database): + """ + Retrieves the list of tables from the specified catalog and database. + + Args: + catalog (str): The catalog name. + database (str): The database name. + + Returns: + list: A list of table names. + """ get_table_request = e6x_engine_pb2.GetTablesV2Request( sessionId=self.get_session_id, schema=database, @@ -407,6 +461,17 @@ def get_tables(self, catalog, database): return list(get_table_response.tables) def get_columns(self, catalog, database, table): + """ + Retrieves the list of columns for the specified table in the given catalog and database. + + Args: + catalog (str): The catalog name. + database (str): The database name. + table (str): The table name. + + Returns: + list: A list of dictionaries containing column information. + """ get_columns_request = e6x_engine_pb2.GetColumnsV2Request( sessionId=self.get_session_id, schema=database, @@ -420,6 +485,15 @@ def get_columns(self, catalog, database, table): return [{'fieldName': row.fieldName, 'fieldType': row.fieldType} for row in get_columns_response.fieldInfo] def get_schema_names(self, catalog): + """ + Retrieves the list of schema names from the specified catalog. + + Args: + catalog (str): The catalog name. + + Returns: + list: A list of schema names. + """ get_schema_request = e6x_engine_pb2.GetSchemaNamesV2Request( sessionId=self.get_session_id, catalog=catalog @@ -431,29 +505,65 @@ def get_schema_names(self, catalog): return list(get_schema_response.schemas) def commit(self): - """We do not support transactions, so this does nothing.""" + """ + Commits the current transaction. + + Note: + This method does nothing as transactions are not supported. + """ pass def cursor(self, catalog_name=None, db_name=None): - """Return a new :py:class:`Cursor` object using the connection.""" + """ + Creates a new cursor object for executing queries. + + Args: + catalog_name (str, optional): The catalog name. Defaults to None. + db_name (str, optional): The database name. Defaults to None. + + Returns: + Cursor: A new cursor object. + """ return Cursor(self, database=db_name, catalog_name=catalog_name) def rollback(self): - raise Exception("e6xdb does not support transactions") # pragma: no cover + """ + Rolls back the current transaction. + + Raises: + Exception: Always raises an exception as transactions are not supported. + """ + raise Exception("e6data does not support transactions") # pragma: no cover @property def client(self): + """ + Returns the gRPC client stub for interacting with the server. + + Returns: + e6x_engine_pb2_grpc.QueryEngineServiceStub: The gRPC client stub. + """ return self._client class Cursor(DBAPICursor): - """These objects represent a database cursor, which is used to manage the context of a fetch + """ + These objects represent a database cursor, which is used to manage the context of a fetch operation. Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately visible by other cursors or connections. """ def __init__(self, connection: Connection, array_size=1000, database=None, catalog_name=None): + """ + Initialize a new Cursor object. + + Args: + connection (Connection): The connection object to the database. + array_size (int, optional): The number of rows to fetch at a time. Defaults to 1000. + database (str, optional): The database name. Defaults to None. + catalog_name (str, optional): The catalog name. Defaults to None. + """ super(Cursor, self).__init__() self._array_size = array_size self.connection = connection @@ -474,15 +584,32 @@ def _reset_state(self): @property def metadata(self): + """ + Get the gRPC metadata for the current query. + + Returns: + list: A list of tuples containing gRPC metadata. + """ return _get_grpc_header(engine_ip=self._engine_ip, cluster=self.connection.cluster_uuid) @property def arraysize(self): + """ + Get the array size for fetching rows. + + Returns: + int: The number of rows to fetch at a time. + """ return self._arraysize @arraysize.setter def arraysize(self, value): - """Array size cannot be None, and should be an integer""" + """ + Set the array size for fetching rows. + + Args: + value (int): The number of rows to fetch at a time. + """ default_arraysize = 1000 try: self._arraysize = int(value) or default_arraysize @@ -518,14 +645,29 @@ def description(self): return self._description def __enter__(self): + """ + Enter the runtime context related to this object. + + Returns: + Cursor: The current instance of the cursor. + """ return self def __exit__(self, exc_type, exc_val, exc_tb): + """ + Exit the runtime context related to this object. + + Args: + exc_type (Type[BaseException]): The type of exception raised (if any). + exc_val (BaseException): The exception instance raised (if any). + exc_tb (Traceback): The traceback object of the exception (if any). + """ self.close() def close(self): - """Close the operation handle""" - # self.connection.close() + """ + Close the operation handle and reset the cursor state. + """ try: self.clear() except: @@ -542,17 +684,44 @@ def close(self): self._database = None def get_tables(self): + """ + Retrieve the list of tables from the current database. + + Returns: + list: A list of table names. + """ schema = self.connection.database return self.connection.get_tables(catalog=self._catalog_name, database=schema) def get_columns(self, table): + """ + Retrieve the list of columns for the specified table. + + Args: + table (str): The table name. + + Returns: + list: A list of dictionaries containing column information. + """ schema = self.connection.database return self.connection.get_columns(catalog=self._catalog_name, database=schema, table=table) def get_schema_names(self): + """ + Retrieve the list of schema names from the current catalog. + + Returns: + list: A list of schema names. + """ return self.connection.get_schema_names(catalog=self._catalog_name) def clear(self, query_id=None): + """ + Clear the query results from the server. + + Args: + query_id (str, optional): The ID of the query to be cleared. Defaults to None. + """ if not query_id: query_id = self._query_id clear_request = e6x_engine_pb2.ClearOrCancelQueryRequest( @@ -563,9 +732,24 @@ def clear(self, query_id=None): return self.connection.client.clearOrCancelQuery(clear_request, metadata=self.metadata) def cancel(self, query_id): + """ + Cancel the execution of a query on the server. + + Args: + query_id (str): The ID of the query to be canceled. + """ self.connection.query_cancel(engine_ip=self._engine_ip, query_id=query_id) def status(self, query_id): + """ + Get the status of the specified query. + + Args: + query_id (str): The ID of the query. + + Returns: + StatusResponse: The status response of the query. + """ status_request = e6x_engine_pb2.StatusRequest( sessionId=self.connection.get_session_id, queryId=query_id, @@ -575,12 +759,17 @@ def status(self, query_id): @re_auth def execute(self, operation, parameters=None, **kwargs): - """Prepare and execute a database operation (query or command). - Return values are not defined. - """ """ - Semicolon is now not supported. So removing it from query end. + Prepare and execute a database operation (query or command). + + Args: + operation (str): The SQL query or command to execute. + parameters (dict, optional): The parameters to bind to the query. Defaults to None. + + Returns: + str: The query ID of the executed query. """ + # Semicolon is now not supported. So removing it from query end. operation = operation.strip() # Remove leading and trailing whitespaces. if operation.endswith(';'): operation = operation[:-1] @@ -644,10 +833,19 @@ def execute(self, operation, parameters=None, **kwargs): @property def rowcount(self): + """ + Get the number of rows affected by the last execute operation. + + Returns: + int: The number of rows affected. + """ self.update_mete_data() return self._rowcount def update_mete_data(self): + """ + Update the metadata for the current query. + """ result_meta_data_request = e6x_engine_pb2.GetResultMetadataRequest( engineIP=self._engine_ip, sessionId=self.connection.get_session_id, @@ -662,6 +860,12 @@ def update_mete_data(self): self._is_metadata_updated = True def _fetch_more(self): + """ + Fetch more rows from the server. + + Returns: + list: A list of rows fetched from the server. + """ batch_size = self._arraysize self._data = list() for i in range(batch_size): @@ -672,6 +876,12 @@ def _fetch_more(self): return self._data def _fetch_all(self): + """ + Fetch all rows from the server. + + Returns: + list: A list of all rows fetched from the server. + """ self._data = list() while True: rows = self.fetch_batch() @@ -683,6 +893,15 @@ def _fetch_all(self): return rows def fetchall_buffer(self, query_id=None): + """ + Fetch all rows from the server in a buffered manner. + + Args: + query_id (str, optional): The ID of the query. Defaults to None. + + Yields: + list: A list of rows fetched from the server. + """ if query_id: self._query_id = query_id while True: @@ -692,6 +911,12 @@ def fetchall_buffer(self, query_id=None): yield rows def fetch_batch(self): + """ + Fetch a batch of rows from the server. + + Returns: + list: A list of rows fetched from the server. + """ client = self.connection.client get_next_result_batch_request = e6x_engine_pb2.GetNextResultBatchRequest( engineIP=self._engine_ip, @@ -711,9 +936,24 @@ def fetch_batch(self): return read_rows_from_chunk(self._query_columns_description, buffer) def fetchall(self): + """ + Fetch all rows from the server. + + Returns: + list: A list of all rows fetched from the server. + """ return self._fetch_all() def fetchmany(self, size: int = None): + """ + Fetch a specified number of rows from the server. + + Args: + size (int, optional): The number of rows to fetch. Defaults to None. + + Returns: + list: A list of rows fetched from the server. + """ if size is None: size = self.arraysize if self._data is None: @@ -732,13 +972,24 @@ def fetchmany(self, size: int = None): return rows def fetchone(self): - # _logger.info("fetch One returning the batch itself which is limited by predefined no.of rows") + """ + Fetch a single row from the server. + + Returns: + list: A single row fetched from the server. + """ rows = self.fetchmany(1) if rows is None or len(rows) == 0: return None return rows def explain(self): + """ + Get the execution plan for the current query. + + Returns: + str: The execution plan of the query. + """ explain_request = e6x_engine_pb2.ExplainRequest( engineIP=self._engine_ip, sessionId=self.connection.get_session_id, @@ -751,6 +1002,12 @@ def explain(self): return explain_response.explain def explain_analyse(self): + """ + Get the execution plan for the current query. + + Returns: + dict: The execution plan of the query. + """ explain_analyze_request = e6x_engine_pb2.ExplainAnalyzeRequest( engineIP=self._engine_ip, sessionId=self.connection.get_session_id,