From 551acf02a767e6adfd0b06e6ab8b60d584ac2e7d Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 21 Nov 2025 15:45:58 +0530 Subject: [PATCH 1/6] Making fastbinary optional --- README.md | 49 +++++++++++++++++++++- e6data_python_connector/datainputstream.py | 31 ++++++++++---- e6data_python_connector/e6data_grpc.py | 35 +++++++++++++++- setup.py | 2 +- 4 files changed, 106 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index cc727f3..c9c7213 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.11-blue.svg) +![version](https://img.shields.io/badge/version-2.3.12rc31-blue.svg) ## Introduction @@ -86,6 +86,7 @@ The `Connection` class supports the following parameters: | `auto_resume` | bool | No | True | Automatically resume cluster if suspended | | `grpc_options` | dict | No | None | Additional gRPC configuration options | | `debug` | bool | No | False | Enable debug logging for troubleshooting | +| `require_fastbinary` | bool | No | True | Require fastbinary module for Thrift deserialization. Set to False to use pure Python implementation if system dependencies cannot be installed | #### Secure Connection Example @@ -119,6 +120,51 @@ conn = Connection( ) ``` +#### Handling Missing System Dependencies + +The e6data connector uses the `fastbinary` module (from Apache Thrift) for optimal performance when deserializing data. This module requires system-level dependencies (`python3-devel` and `gcc-c++`) to be installed. + +**Default Behavior (Recommended):** +By default, the connector requires `fastbinary` to be available. If it's not found, the connection will fail immediately with a clear error message: + +```python +conn = Connection( + host=host, + port=port, + username=username, + password=password, + database=database +) +# Raises exception if fastbinary is not available +``` + +**Fallback to Pure Python:** +If you cannot install system dependencies (e.g., in restricted environments, serverless platforms, or containers without build tools), you can disable the `fastbinary` requirement. The connector will fall back to a pure Python implementation with a performance penalty: + +```python +conn = Connection( + host=host, + port=port, + username=username, + password=password, + database=database, + require_fastbinary=False # Allow operation without fastbinary +) +# Logs warning but continues with pure Python implementation +``` + +**When to use `require_fastbinary=False`:** +- Running in AWS Lambda or other serverless environments +- Docker containers built without compilation tools +- Restricted environments where system packages cannot be installed +- Development/testing environments where performance is not critical + +**Performance Impact:** +- With `fastbinary`: Optimal performance for data deserialization +- Without `fastbinary` (pure Python): ~2-3x slower deserialization, but otherwise fully functional + +**Note:** It's strongly recommended to install system dependencies when possible for best performance. The `require_fastbinary=False` option should only be used when system dependencies cannot be installed. + ### Perform a Queries & Get Results ```python @@ -652,5 +698,6 @@ python your_script.py | 456 Strategy Error | Automatic blue-green failover will handle this | | Memory issues with large results | Use `fetchall_buffer()` instead of `fetchall()` | | gRPC message size errors | Configure `grpc_options` with appropriate message size limits | +| fastbinary import error | Install system dependencies (`python3-devel`, `gcc-c++`) or set `require_fastbinary=False` | See [TECH_DOC.md](TECH_DOC.md) for detailed technical documentation. diff --git a/e6data_python_connector/datainputstream.py b/e6data_python_connector/datainputstream.py index d9c81f1..4a496b9 100644 --- a/e6data_python_connector/datainputstream.py +++ b/e6data_python_connector/datainputstream.py @@ -12,20 +12,27 @@ from e6data_python_connector.constants import ZONE from e6data_python_connector.date_time_utils import floor_div, floor_mod, timezone_from_offset +# Try to import fastbinary - it's optional but provides better performance +_fastbinary_available = False try: from thrift.protocol import fastbinary + _fastbinary_available = True except ImportError: - raise Exception( - """ - Failed to import fastbinary. - Did you install system dependencies? - Please verify https://github.com/e6x-labs/e6data-python-connector#dependencies - """ - ) + pass # Will check require_fastbinary flag at connection creation time _logger = logging.getLogger(__name__) +def is_fastbinary_available(): + """ + Check if fastbinary module is available. + + Returns: + bool: True if fastbinary is available, False otherwise + """ + return _fastbinary_available + + def _binary_to_decimal128(binary_data, scale=None): """ Convert binary data to Decimal128. @@ -504,6 +511,16 @@ def read_values_from_array(query_columns_description: list, dis: DataInputStream def read_rows_from_chunk(query_columns_description: list, buffer): + """ + Read rows from a Thrift-encoded chunk buffer. + + Args: + query_columns_description: List of column descriptions + buffer: Thrift-encoded binary buffer + + Returns: + List of rows + """ # Create a transport and protocol instance for deserialization transport = TTransport.TMemoryBuffer(buffer) protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index db667c4..62591ee 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -26,7 +26,7 @@ from e6data_python_connector.strategy import _get_grpc_header as _get_strategy_header from e6data_python_connector.common import DBAPITypeObject, ParamEscaper, DBAPICursor from e6data_python_connector.constants import * -from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk +from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk, is_fastbinary_available from e6data_python_connector.server import e6x_engine_pb2_grpc, e6x_engine_pb2 from e6data_python_connector.typeId import * @@ -335,6 +335,7 @@ def __init__( scheme: str = 'e6data', grpc_options: dict = None, debug: bool = False, + require_fastbinary: bool = True, ): """ Parameters @@ -368,6 +369,10 @@ def __init__( - keepalive_time_ms: This parameter defines the time, in milliseconds, Default to 30 seconds debug: bool, Optional Flag to enable debug logging for blue-green deployment strategy changes + require_fastbinary: bool, Optional + Flag to require fastbinary module for Thrift deserialization. If True (default), + raises an exception if fastbinary is not available. If False, logs a warning + and continues with pure Python implementation (with reduced performance). """ if not username or not password: raise ValueError("username or password cannot be empty.") @@ -387,6 +392,29 @@ def __init__( self._auto_resume = auto_resume + # Store require_fastbinary flag + self._require_fastbinary = require_fastbinary + + # Check fastbinary availability at connection creation time + if not is_fastbinary_available(): + if require_fastbinary: + raise Exception( + """ + Failed to import fastbinary. + Did you install system dependencies? + Please verify https://github.com/e6x-labs/e6data-python-connector#dependencies + + To continue without fastbinary (with reduced performance), set require_fastbinary=False + in the connection parameters. + """ + ) + else: + logger.warning( + "fastbinary module is not available. Using pure Python implementation. " + "Performance may be degraded. To enable fastbinary, install system dependencies: " + "https://github.com/e6x-labs/e6data-python-connector#dependencies" + ) + self._grpc_options = grpc_options if self._grpc_options is None: self._grpc_options = dict() @@ -1464,7 +1492,10 @@ def fetch_batch(self): if not buffer or len(buffer) == 0: return None # one batch retrieves the predefined set of rows - return read_rows_from_chunk(self._query_columns_description, buffer) + return read_rows_from_chunk( + self._query_columns_description, + buffer + ) def fetchall(self): """ diff --git a/setup.py b/setup.py index edc50e0..136d36f 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 11,) +VERSION = (2, 3, 12, 'rc31',) def get_long_desc(): From 0e9067b40aabe6534893abafa5f03b5fac7de15c Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 21 Nov 2025 22:59:34 +0530 Subject: [PATCH 2/6] Making fastbinary optional --- README.md | 69 +++++++++++++++++++++- e6data_python_connector/cluster_manager.py | 46 ++++++++++++++- e6data_python_connector/e6data_grpc.py | 49 ++++++++++++++- setup.py | 2 +- 4 files changed, 160 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index c9c7213..d891731 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.12rc31-blue.svg) +![version](https://img.shields.io/badge/version-2.3.12rc32-blue.svg) ## Introduction @@ -83,6 +83,7 @@ The `Connection` class supports the following parameters: | `catalog` | str | No | None | Catalog name | | `cluster_name` | str | No | None | Name of the cluster for cluster-specific operations | | `secure` | bool | No | False | Enable SSL/TLS for secure connections | +| `ssl_cert` | str/bytes | No | None | Path to CA certificate (PEM) or certificate bytes for HTTPS connections | | `auto_resume` | bool | No | True | Automatically resume cluster if suspended | | `grpc_options` | dict | No | None | Additional gRPC configuration options | | `debug` | bool | No | False | Enable debug logging for troubleshooting | @@ -120,6 +121,72 @@ conn = Connection( ) ``` +#### HTTPS Connection with HAProxy + +When connecting through HAProxy with HTTPS, you can provide a custom CA certificate for secure connections. The `ssl_cert` parameter accepts either a file path to a PEM certificate or the certificate content as bytes. + +**Using a CA certificate file path:** + +```python +conn = Connection( + host=host, + port=443, + username=username, + password=password, + database=database, + secure=True, + ssl_cert='/path/to/ca-cert.pem' # Path to your CA certificate +) +``` + +**Reading certificate content as bytes:** + +```python +# Read certificate file and pass as bytes +with open('/path/to/ca-cert.pem', 'rb') as cert_file: + cert_data = cert_file.read() + +conn = Connection( + host=host, + port=443, + username=username, + password=password, + database=database, + secure=True, + ssl_cert=cert_data # Certificate content as bytes +) +``` + +**Using system CA bundle for publicly signed certificates:** + +```python +# When ssl_cert is None, system default CA bundle is used +conn = Connection( + host=host, + port=443, + username=username, + password=password, + database=database, + secure=True # Uses system CA bundle by default +) +``` + +**Connection pooling with custom CA certificate:** + +```python +pool = ConnectionPool( + min_size=2, + max_size=10, + host=host, + port=443, + username=username, + password=password, + database=database, + secure=True, + ssl_cert='/path/to/ca-cert.pem' # Custom CA certificate for pool connections +) +``` + #### Handling Missing System Dependencies The e6data connector uses the `fastbinary` module (from Apache Thrift) for optimal performance when deserializing data. This module requires system-level dependencies (`python3-devel` and `gcc-c++`) to be installed. diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index d2905c9..11aa68b 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -140,7 +140,7 @@ class ClusterManager: """ def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5, - cluster_uuid=None, grpc_options=None, debug=False): + cluster_uuid=None, grpc_options=None, debug=False, ssl_cert=None): """ Initializes a new instance of the ClusterManager class. @@ -156,6 +156,8 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe cluster_uuid (str, optional): The unique identifier for the target cluster; defaults to None. debug (bool, optional): Enable debug logging; defaults to False. + ssl_cert (str or bytes, optional): Path to CA certificate file (PEM format) or + certificate content as bytes for secure connections; defaults to None. """ self._host = host @@ -169,6 +171,46 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe if grpc_options is None: self._grpc_options = dict() self._debug = debug + self._ssl_cert = ssl_cert + + def _get_ssl_credentials(self): + """ + Get SSL credentials for secure gRPC channel. + + Handles three scenarios: + 1. ssl_cert is a string (file path): Read the PEM certificate from the file + 2. ssl_cert is bytes: Use the certificate content directly + 3. ssl_cert is None: Use system default CA bundle + + Returns: + grpc.ChannelCredentials: SSL credentials for secure channel + + Raises: + FileNotFoundError: If ssl_cert is a file path but the file doesn't exist + IOError: If ssl_cert file cannot be read + """ + if self._ssl_cert is None: + # Use system default CA bundle + return grpc.ssl_channel_credentials() + elif isinstance(self._ssl_cert, str): + # ssl_cert is a file path - read the certificate from file + try: + with open(self._ssl_cert, 'rb') as cert_file: + root_ca_cert = cert_file.read() + return grpc.ssl_channel_credentials(root_certificates=root_ca_cert) + except FileNotFoundError: + logger.error(f"SSL certificate file not found: {self._ssl_cert}") + raise + except IOError as e: + logger.error(f"Failed to read SSL certificate file {self._ssl_cert}: {e}") + raise + elif isinstance(self._ssl_cert, bytes): + # ssl_cert is certificate content as bytes + return grpc.ssl_channel_credentials(root_certificates=self._ssl_cert) + else: + # Invalid type - log warning and use system default + logger.warning(f"Invalid ssl_cert type: {type(self._ssl_cert)}. Using system default CA bundle.") + return grpc.ssl_channel_credentials() @property def _get_connection(self): @@ -184,7 +226,7 @@ def _get_connection(self): self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), options=self._grpc_options, - credentials=grpc.ssl_channel_credentials() + credentials=self._get_ssl_credentials() ) else: self._channel = grpc.insecure_channel( diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 62591ee..5f43070 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -331,6 +331,7 @@ def __init__( database: str = None, cluster_name: str = None, secure: bool = False, + ssl_cert = None, auto_resume: bool = True, scheme: str = 'e6data', grpc_options: dict = None, @@ -356,6 +357,9 @@ def __init__( Cluster's name secure: bool, Optional Flag to use a secure channel for data transfer + ssl_cert: str or bytes, Optional + Path to CA certificate file (PEM format) or certificate content as bytes for + secure connections. If None, system default CA bundle is used. auto_resume: bool, Optional Flag to enable auto resume of the cluster before the query execution scheme: string, Optional @@ -387,6 +391,7 @@ def __init__( self._port = port self._secure_channel = secure + self._ssl_cert = ssl_cert self.catalog_name = catalog @@ -511,6 +516,45 @@ def _get_grpc_options(self): return self._cached_grpc_options + def _get_ssl_credentials(self): + """ + Get SSL credentials for secure gRPC channel. + + Handles three scenarios: + 1. ssl_cert is a string (file path): Read the PEM certificate from the file + 2. ssl_cert is bytes: Use the certificate content directly + 3. ssl_cert is None: Use system default CA bundle + + Returns: + grpc.ChannelCredentials: SSL credentials for secure channel + + Raises: + FileNotFoundError: If ssl_cert is a file path but the file doesn't exist + IOError: If ssl_cert file cannot be read + """ + if self._ssl_cert is None: + # Use system default CA bundle + return grpc.ssl_channel_credentials() + elif isinstance(self._ssl_cert, str): + # ssl_cert is a file path - read the certificate from file + try: + with open(self._ssl_cert, 'rb') as cert_file: + root_ca_cert = cert_file.read() + return grpc.ssl_channel_credentials(root_certificates=root_ca_cert) + except FileNotFoundError: + logger.error(f"SSL certificate file not found: {self._ssl_cert}") + raise + except IOError as e: + logger.error(f"Failed to read SSL certificate file {self._ssl_cert}: {e}") + raise + elif isinstance(self._ssl_cert, bytes): + # ssl_cert is certificate content as bytes + return grpc.ssl_channel_credentials(root_certificates=self._ssl_cert) + else: + # Invalid type - log warning and use system default + logger.warning(f"Invalid ssl_cert type: {type(self._ssl_cert)}. Using system default CA bundle.") + return grpc.ssl_channel_credentials() + def _create_client(self): """ Creates a gRPC client for the connection. @@ -531,7 +575,7 @@ def _create_client(self): self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), options=self._get_grpc_options, - credentials=grpc.ssl_channel_credentials() + credentials=self._get_ssl_credentials() ) else: self._channel = grpc.insecure_channel( @@ -713,7 +757,8 @@ def _perform_auto_resume(self, e: _InactiveRpcError): secure_channel=self._secure_channel, cluster_uuid=self.cluster_name, timeout=self.grpc_auto_resume_timeout_seconds, - debug=self._debug + debug=self._debug, + ssl_cert=self._ssl_cert ).resume() return status # Return boolean status directly else: diff --git a/setup.py b/setup.py index 136d36f..68bf51f 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 12, 'rc31',) +VERSION = (2, 3, 12, 'rc32',) def get_long_desc(): From 62d0a15a35c88d875abb5678e29e3d056000c7c4 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Sun, 23 Nov 2025 10:52:03 +0530 Subject: [PATCH 3/6] Updated release version --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d891731..238bf6a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.12rc32-blue.svg) +![version](https://img.shields.io/badge/version-2.3.12-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index 68bf51f..d5ef466 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 12, 'rc32',) +VERSION = (2, 3, 12,) def get_long_desc(): From fec4307af4b116ece6f77f52b8a332e3fb6ec2a1 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Sun, 23 Nov 2025 11:02:32 +0530 Subject: [PATCH 4/6] Removed duplicate get_ssl_credentials function --- e6data_python_connector/cluster_manager.py | 42 +-------------------- e6data_python_connector/common.py | 44 ++++++++++++++++++++++ e6data_python_connector/e6data_grpc.py | 43 +-------------------- 3 files changed, 48 insertions(+), 81 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 11aa68b..b999a00 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) +from e6data_python_connector.common import get_ssl_credentials from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \ _get_grpc_header as _get_strategy_header @@ -173,45 +174,6 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe self._debug = debug self._ssl_cert = ssl_cert - def _get_ssl_credentials(self): - """ - Get SSL credentials for secure gRPC channel. - - Handles three scenarios: - 1. ssl_cert is a string (file path): Read the PEM certificate from the file - 2. ssl_cert is bytes: Use the certificate content directly - 3. ssl_cert is None: Use system default CA bundle - - Returns: - grpc.ChannelCredentials: SSL credentials for secure channel - - Raises: - FileNotFoundError: If ssl_cert is a file path but the file doesn't exist - IOError: If ssl_cert file cannot be read - """ - if self._ssl_cert is None: - # Use system default CA bundle - return grpc.ssl_channel_credentials() - elif isinstance(self._ssl_cert, str): - # ssl_cert is a file path - read the certificate from file - try: - with open(self._ssl_cert, 'rb') as cert_file: - root_ca_cert = cert_file.read() - return grpc.ssl_channel_credentials(root_certificates=root_ca_cert) - except FileNotFoundError: - logger.error(f"SSL certificate file not found: {self._ssl_cert}") - raise - except IOError as e: - logger.error(f"Failed to read SSL certificate file {self._ssl_cert}: {e}") - raise - elif isinstance(self._ssl_cert, bytes): - # ssl_cert is certificate content as bytes - return grpc.ssl_channel_credentials(root_certificates=self._ssl_cert) - else: - # Invalid type - log warning and use system default - logger.warning(f"Invalid ssl_cert type: {type(self._ssl_cert)}. Using system default CA bundle.") - return grpc.ssl_channel_credentials() - @property def _get_connection(self): """ @@ -226,7 +188,7 @@ def _get_connection(self): self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), options=self._grpc_options, - credentials=self._get_ssl_credentials() + credentials=get_ssl_credentials(self._ssl_cert) ) else: self._channel = grpc.insecure_channel( diff --git a/e6data_python_connector/common.py b/e6data_python_connector/common.py index 323a1ce..6dd887d 100644 --- a/e6data_python_connector/common.py +++ b/e6data_python_connector/common.py @@ -15,6 +15,7 @@ from builtins import object from builtins import str +import grpc from future.utils import with_metaclass from past.builtins import basestring @@ -267,3 +268,46 @@ class UniversalSet(object): def __contains__(self, item): return True + + +def get_ssl_credentials(ssl_cert): + """ + Get SSL credentials for secure gRPC channel. + + Handles three scenarios: + 1. ssl_cert is a string (file path): Read the PEM certificate from the file + 2. ssl_cert is bytes: Use the certificate content directly + 3. ssl_cert is None: Use system default CA bundle + + Parameters: + ssl_cert (str or bytes or None): SSL certificate as file path, bytes, or None + + Returns: + grpc.ChannelCredentials: SSL credentials for gRPC channel + + Raises: + FileNotFoundError: If ssl_cert is a file path but the file doesn't exist + IOError: If ssl_cert file cannot be read + """ + if ssl_cert is None: + # Use system default CA bundle + return grpc.ssl_channel_credentials() + elif isinstance(ssl_cert, str): + # ssl_cert is a file path - read the certificate from file + try: + with open(ssl_cert, 'rb') as cert_file: + root_ca_cert = cert_file.read() + return grpc.ssl_channel_credentials(root_certificates=root_ca_cert) + except FileNotFoundError: + _logger.error("SSL certificate file not found: {}".format(ssl_cert)) + raise + except IOError as e: + _logger.error("Failed to read SSL certificate file {}: {}".format(ssl_cert, e)) + raise + elif isinstance(ssl_cert, bytes): + # ssl_cert is certificate content as bytes + return grpc.ssl_channel_credentials(root_certificates=ssl_cert) + else: + # Invalid type - fall back to system default CA bundle with warning + _logger.warning("Invalid ssl_cert type: {}. Using system default CA bundle.".format(type(ssl_cert))) + return grpc.ssl_channel_credentials() diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 5f43070..0f9964b 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -24,7 +24,7 @@ from e6data_python_connector.cluster_manager import ClusterManager from e6data_python_connector.strategy import _get_grpc_header as _get_strategy_header -from e6data_python_connector.common import DBAPITypeObject, ParamEscaper, DBAPICursor +from e6data_python_connector.common import DBAPITypeObject, ParamEscaper, DBAPICursor, get_ssl_credentials from e6data_python_connector.constants import * from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk, is_fastbinary_available from e6data_python_connector.server import e6x_engine_pb2_grpc, e6x_engine_pb2 @@ -516,45 +516,6 @@ def _get_grpc_options(self): return self._cached_grpc_options - def _get_ssl_credentials(self): - """ - Get SSL credentials for secure gRPC channel. - - Handles three scenarios: - 1. ssl_cert is a string (file path): Read the PEM certificate from the file - 2. ssl_cert is bytes: Use the certificate content directly - 3. ssl_cert is None: Use system default CA bundle - - Returns: - grpc.ChannelCredentials: SSL credentials for secure channel - - Raises: - FileNotFoundError: If ssl_cert is a file path but the file doesn't exist - IOError: If ssl_cert file cannot be read - """ - if self._ssl_cert is None: - # Use system default CA bundle - return grpc.ssl_channel_credentials() - elif isinstance(self._ssl_cert, str): - # ssl_cert is a file path - read the certificate from file - try: - with open(self._ssl_cert, 'rb') as cert_file: - root_ca_cert = cert_file.read() - return grpc.ssl_channel_credentials(root_certificates=root_ca_cert) - except FileNotFoundError: - logger.error(f"SSL certificate file not found: {self._ssl_cert}") - raise - except IOError as e: - logger.error(f"Failed to read SSL certificate file {self._ssl_cert}: {e}") - raise - elif isinstance(self._ssl_cert, bytes): - # ssl_cert is certificate content as bytes - return grpc.ssl_channel_credentials(root_certificates=self._ssl_cert) - else: - # Invalid type - log warning and use system default - logger.warning(f"Invalid ssl_cert type: {type(self._ssl_cert)}. Using system default CA bundle.") - return grpc.ssl_channel_credentials() - def _create_client(self): """ Creates a gRPC client for the connection. @@ -575,7 +536,7 @@ def _create_client(self): self._channel = grpc.secure_channel( target='{}:{}'.format(self._host, self._port), options=self._get_grpc_options, - credentials=self._get_ssl_credentials() + credentials=get_ssl_credentials(self._ssl_cert) ) else: self._channel = grpc.insecure_channel( From 3e6fba955d4a7664ed79eb248bf61b28dcc4cbee Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Sun, 23 Nov 2025 11:21:13 +0530 Subject: [PATCH 5/6] Duplicate code removal and optimisation. --- e6data_python_connector/cluster_manager.py | 21 +--------------- e6data_python_connector/constants.py | 24 +++++++++++++++++++ e6data_python_connector/e6data_grpc.py | 28 ++++++++++------------ 3 files changed, 37 insertions(+), 36 deletions(-) diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index b999a00..1b15a87 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -11,26 +11,7 @@ from e6data_python_connector.common import get_ssl_credentials from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \ - _get_grpc_header as _get_strategy_header - - -def _get_grpc_header(engine_ip=None, cluster=None, strategy=None): - """ - Generate gRPC metadata headers for the request. - - This function creates a list of metadata headers to be used in gRPC requests. - It includes optional headers for the engine IP, cluster UUID, and deployment strategy. - - Args: - engine_ip (str, optional): The IP address of the engine. Defaults to None. - cluster (str, optional): The UUID of the cluster. Defaults to None. - strategy (str, optional): The deployment strategy (blue/green). Defaults to None. - - Returns: - list: A list of tuples representing the gRPC metadata headers. - """ - # Use the strategy module's implementation - return _get_strategy_header(engine_ip=engine_ip, cluster=cluster, strategy=strategy) + _get_grpc_header class _StatusLock: diff --git a/e6data_python_connector/constants.py b/e6data_python_connector/constants.py index d1c13de..742367b 100644 --- a/e6data_python_connector/constants.py +++ b/e6data_python_connector/constants.py @@ -1,5 +1,6 @@ import pytz +# Type definitions PRIMITIVE_TYPES = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 15, 16, 17, 18, 19, 20, 21} COMPLEX_TYPES = {10, 11, 12, 13, 14} COLLECTION_TYPES = {10, 11} @@ -30,3 +31,26 @@ PRECISION = "precision" SCALE = "scale" ZONE = pytz.timezone('UTC') + +# Retry and timeout constants +MAX_RETRY_ATTEMPTS = 5 +RETRY_SLEEP_SECONDS = 0.2 +STRATEGY_CACHE_TIMEOUT_SECONDS = 300 # 5 minutes +DEFAULT_GRPC_PREPARE_TIMEOUT_SECONDS = 600 # 10 minutes +DEFAULT_AUTO_RESUME_TIMEOUT_SECONDS = 300 # 5 minutes +CLUSTER_STATUS_CHECK_SLEEP_SECONDS = 5 +LOCK_TIMEOUT_MS = 500 + +# Connection pool constants +POOL_GET_TIMEOUT_SECONDS = 0.1 +POOL_RETRY_SLEEP_SECONDS = 0.1 + +# Blue-green deployment strategy constants +STRATEGY_BLUE = 'blue' +STRATEGY_GREEN = 'green' +VALID_STRATEGIES = {STRATEGY_BLUE, STRATEGY_GREEN} + +# gRPC error codes and messages +GRPC_ERROR_STRATEGY_MISMATCH = 'status: 456' +GRPC_ERROR_SERVICE_UNAVAILABLE = 'status: 503' +GRPC_ERROR_ACCESS_DENIED = 'Access denied' diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 0f9964b..ba2e1a5 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -9,25 +9,27 @@ import datetime import logging import os - import re import sys +import threading import time from decimal import Decimal from io import BytesIO from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED -import threading -import multiprocessing import grpc from grpc._channel import _InactiveRpcError from e6data_python_connector.cluster_manager import ClusterManager -from e6data_python_connector.strategy import _get_grpc_header as _get_strategy_header from e6data_python_connector.common import DBAPITypeObject, ParamEscaper, DBAPICursor, get_ssl_credentials -from e6data_python_connector.constants import * -from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk, is_fastbinary_available +from e6data_python_connector.constants import ( + MAX_RETRY_ATTEMPTS, RETRY_SLEEP_SECONDS, GRPC_ERROR_STRATEGY_MISMATCH, GRPC_ERROR_ACCESS_DENIED, + PRIMITIVE_TYPES +) +from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk, \ + is_fastbinary_available from e6data_python_connector.server import e6x_engine_pb2_grpc, e6x_engine_pb2 +from e6data_python_connector.strategy import _get_grpc_header from e6data_python_connector.typeId import * apilevel = '2.0' @@ -71,7 +73,7 @@ def _parse_timestamp(value): def re_auth(func): def wrapper(self, *args, **kwargs): - max_retry = 5 + max_retry = MAX_RETRY_ATTEMPTS current_retry = 0 while current_retry < max_retry: try: @@ -80,10 +82,10 @@ def wrapper(self, *args, **kwargs): current_retry += 1 if current_retry == max_retry: raise e - if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details(): - time.sleep(0.2) + if e.code() == grpc.StatusCode.INTERNAL and GRPC_ERROR_ACCESS_DENIED in e.details(): + time.sleep(RETRY_SLEEP_SECONDS) self.connection.get_re_authenticate_session_id() - elif 'status: 456' in e.details(): + elif GRPC_ERROR_STRATEGY_MISMATCH in e.details(): # Strategy changed, clear cache and retry _clear_strategy_cache() # Force re-authentication which will detect new strategy @@ -304,12 +306,6 @@ def _get_strategy_debug_info(): } -def _get_grpc_header(engine_ip=None, cluster=None, strategy=None): - """Generate gRPC metadata headers for the request.""" - # Use the strategy module's implementation - return _get_strategy_header(engine_ip=engine_ip, cluster=cluster, strategy=strategy) - - def connect(*args, **kwargs): """Constructor for creating a connection to the database. See class :py:class:`Connection` for arguments. From bcf2835bd05285f050c949bbf157de48b6c45274 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Sun, 23 Nov 2025 11:43:04 +0530 Subject: [PATCH 6/6] test updated --- test/test_code_optimizations.py | 322 ++++++++++++++++++++++++++++++++ test_e6data_date_functions.py | 207 ++++++++++++++++++++ 2 files changed, 529 insertions(+) create mode 100644 test/test_code_optimizations.py create mode 100644 test_e6data_date_functions.py diff --git a/test/test_code_optimizations.py b/test/test_code_optimizations.py new file mode 100644 index 0000000..c6601b3 --- /dev/null +++ b/test/test_code_optimizations.py @@ -0,0 +1,322 @@ +""" +Unit tests for code optimizations made to e6data-python-connector. + +Tests cover: +1. SSL credentials utility function (get_ssl_credentials) +2. Constants usage +3. Strategy-related optimizations +""" + +import unittest +import tempfile +import os +from unittest.mock import Mock, patch, MagicMock +import grpc + +# Import the modules we're testing +from e6data_python_connector.common import get_ssl_credentials +from e6data_python_connector import constants +from e6data_python_connector.strategy import _get_grpc_header + + +class TestSSLCredentialsUtility(unittest.TestCase): + """Test cases for the get_ssl_credentials utility function.""" + + def test_ssl_cert_none_returns_default_credentials(self): + """Test that None returns system default CA bundle.""" + credentials = get_ssl_credentials(None) + self.assertIsNotNone(credentials) + # Should return grpc.ssl_channel_credentials() with no args + + def test_ssl_cert_with_valid_file_path(self): + """Test that valid file path reads and returns credentials.""" + # Create a temporary certificate file + cert_content = b"""-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKL0UG+mRKKzMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +-----END CERTIFICATE-----""" + + with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pem') as f: + f.write(cert_content) + cert_file_path = f.name + + try: + credentials = get_ssl_credentials(cert_file_path) + self.assertIsNotNone(credentials) + finally: + os.unlink(cert_file_path) + + def test_ssl_cert_with_invalid_file_path_raises_error(self): + """Test that invalid file path raises FileNotFoundError.""" + with self.assertRaises(FileNotFoundError): + get_ssl_credentials('/nonexistent/path/to/cert.pem') + + def test_ssl_cert_with_bytes_content(self): + """Test that certificate content as bytes works correctly.""" + cert_content = b"""-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKL0UG+mRKKzMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +-----END CERTIFICATE-----""" + + credentials = get_ssl_credentials(cert_content) + self.assertIsNotNone(credentials) + + def test_ssl_cert_with_invalid_type_returns_default(self): + """Test that invalid type (not str/bytes/None) returns default credentials with warning.""" + # Should log warning and return default credentials + with patch('e6data_python_connector.common._logger') as mock_logger: + credentials = get_ssl_credentials(12345) # Invalid type + self.assertIsNotNone(credentials) + # Verify warning was logged + mock_logger.warning.assert_called() + + def test_ssl_cert_with_unreadable_file_raises_io_error(self): + """Test that unreadable file raises IOError.""" + # Create a file and make it unreadable + with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pem') as f: + f.write(b"test cert") + cert_file_path = f.name + + try: + os.chmod(cert_file_path, 0o000) # Remove all permissions + with self.assertRaises(IOError): + get_ssl_credentials(cert_file_path) + finally: + os.chmod(cert_file_path, 0o644) # Restore permissions + os.unlink(cert_file_path) + + +class TestConstants(unittest.TestCase): + """Test cases for constants module.""" + + def test_retry_constants_exist(self): + """Test that retry-related constants are defined.""" + self.assertTrue(hasattr(constants, 'MAX_RETRY_ATTEMPTS')) + self.assertTrue(hasattr(constants, 'RETRY_SLEEP_SECONDS')) + self.assertIsInstance(constants.MAX_RETRY_ATTEMPTS, int) + self.assertIsInstance(constants.RETRY_SLEEP_SECONDS, (int, float)) + + def test_timeout_constants_exist(self): + """Test that timeout-related constants are defined.""" + self.assertTrue(hasattr(constants, 'STRATEGY_CACHE_TIMEOUT_SECONDS')) + self.assertTrue(hasattr(constants, 'DEFAULT_GRPC_PREPARE_TIMEOUT_SECONDS')) + self.assertTrue(hasattr(constants, 'DEFAULT_AUTO_RESUME_TIMEOUT_SECONDS')) + self.assertTrue(hasattr(constants, 'CLUSTER_STATUS_CHECK_SLEEP_SECONDS')) + self.assertTrue(hasattr(constants, 'LOCK_TIMEOUT_MS')) + + def test_connection_pool_constants_exist(self): + """Test that connection pool constants are defined.""" + self.assertTrue(hasattr(constants, 'POOL_GET_TIMEOUT_SECONDS')) + self.assertTrue(hasattr(constants, 'POOL_RETRY_SLEEP_SECONDS')) + self.assertIsInstance(constants.POOL_GET_TIMEOUT_SECONDS, (int, float)) + self.assertIsInstance(constants.POOL_RETRY_SLEEP_SECONDS, (int, float)) + + def test_strategy_constants_exist(self): + """Test that strategy-related constants are defined.""" + self.assertTrue(hasattr(constants, 'STRATEGY_BLUE')) + self.assertTrue(hasattr(constants, 'STRATEGY_GREEN')) + self.assertTrue(hasattr(constants, 'VALID_STRATEGIES')) + + self.assertEqual(constants.STRATEGY_BLUE, 'blue') + self.assertEqual(constants.STRATEGY_GREEN, 'green') + self.assertIsInstance(constants.VALID_STRATEGIES, set) + self.assertIn('blue', constants.VALID_STRATEGIES) + self.assertIn('green', constants.VALID_STRATEGIES) + + def test_grpc_error_constants_exist(self): + """Test that gRPC error message constants are defined.""" + self.assertTrue(hasattr(constants, 'GRPC_ERROR_STRATEGY_MISMATCH')) + self.assertTrue(hasattr(constants, 'GRPC_ERROR_SERVICE_UNAVAILABLE')) + self.assertTrue(hasattr(constants, 'GRPC_ERROR_ACCESS_DENIED')) + + self.assertEqual(constants.GRPC_ERROR_STRATEGY_MISMATCH, 'status: 456') + self.assertEqual(constants.GRPC_ERROR_SERVICE_UNAVAILABLE, 'status: 503') + self.assertEqual(constants.GRPC_ERROR_ACCESS_DENIED, 'Access denied') + + def test_constant_values_are_sensible(self): + """Test that constant values are sensible/reasonable.""" + # Retry attempts should be positive + self.assertGreater(constants.MAX_RETRY_ATTEMPTS, 0) + + # Timeouts should be positive + self.assertGreater(constants.STRATEGY_CACHE_TIMEOUT_SECONDS, 0) + self.assertGreater(constants.DEFAULT_GRPC_PREPARE_TIMEOUT_SECONDS, 0) + self.assertGreater(constants.DEFAULT_AUTO_RESUME_TIMEOUT_SECONDS, 0) + + # Sleep durations should be positive + self.assertGreater(constants.RETRY_SLEEP_SECONDS, 0) + self.assertGreater(constants.CLUSTER_STATUS_CHECK_SLEEP_SECONDS, 0) + + +class TestGrpcHeaderFunction(unittest.TestCase): + """Test cases for _get_grpc_header function (ensure no duplicate wrappers).""" + + def test_get_grpc_header_imports_correctly(self): + """Test that _get_grpc_header is imported from strategy module.""" + # This should work without errors + from e6data_python_connector.strategy import _get_grpc_header + self.assertTrue(callable(_get_grpc_header)) + + def test_get_grpc_header_basic_call(self): + """Test basic functionality of _get_grpc_header.""" + headers = _get_grpc_header() + self.assertIsInstance(headers, list) + + def test_get_grpc_header_with_engine_ip(self): + """Test _get_grpc_header with engine IP.""" + headers = _get_grpc_header(engine_ip='192.168.1.1') + self.assertIsInstance(headers, list) + # Should contain plannerip header (actual header name used) + header_keys = [h[0] for h in headers] + self.assertIn('plannerip', header_keys) + + def test_get_grpc_header_with_cluster(self): + """Test _get_grpc_header with cluster UUID.""" + headers = _get_grpc_header(cluster='test-cluster-uuid') + self.assertIsInstance(headers, list) + # Should contain cluster-name header (actual header name used) + header_keys = [h[0] for h in headers] + self.assertIn('cluster-name', header_keys) + + def test_get_grpc_header_with_strategy(self): + """Test _get_grpc_header with strategy.""" + headers = _get_grpc_header(strategy='blue') + self.assertIsInstance(headers, list) + # Should contain strategy header + header_keys = [h[0] for h in headers] + self.assertIn('strategy', header_keys) + + def test_get_grpc_header_with_all_params(self): + """Test _get_grpc_header with all parameters.""" + headers = _get_grpc_header( + engine_ip='192.168.1.1', + cluster='test-cluster', + strategy='green' + ) + self.assertIsInstance(headers, list) + header_keys = [h[0] for h in headers] + self.assertIn('plannerip', header_keys) # Actual header name + self.assertIn('cluster-name', header_keys) # Actual header name + self.assertIn('strategy', header_keys) + + +class TestReAuthDecorator(unittest.TestCase): + """Test cases for re_auth decorator with constants.""" + + def test_reauth_decorator_source_uses_constants(self): + """Test that re_auth decorator source code uses constant names.""" + from e6data_python_connector.e6data_grpc import re_auth + import inspect + + # Get the source code of the re_auth decorator + source = inspect.getsource(re_auth) + + # Verify it uses the constant names instead of magic numbers + self.assertIn('MAX_RETRY_ATTEMPTS', source) + self.assertIn('RETRY_SLEEP_SECONDS', source) + self.assertIn('GRPC_ERROR_ACCESS_DENIED', source) + self.assertIn('GRPC_ERROR_STRATEGY_MISMATCH', source) + + # Verify it doesn't have hardcoded values + self.assertNotIn('max_retry = 5', source) + self.assertNotIn('time.sleep(0.2)', source, + "Should use RETRY_SLEEP_SECONDS constant instead of hardcoded 0.2") + + def test_reauth_error_messages_use_constants(self): + """Test that error message checks use constants.""" + from e6data_python_connector.e6data_grpc import re_auth + import inspect + + source = inspect.getsource(re_auth) + + # Should use constants for error message matching + # Instead of 'Access denied' or 'status: 456' + self.assertIn('GRPC_ERROR_ACCESS_DENIED', source) + self.assertIn('GRPC_ERROR_STRATEGY_MISMATCH', source) + + # Should not have hardcoded error strings + self.assertNotIn("'Access denied'", source, + "Should use GRPC_ERROR_ACCESS_DENIED constant") + self.assertNotIn("'status: 456'", source, + "Should use GRPC_ERROR_STRATEGY_MISMATCH constant") + + +class TestCodeDuplicationRemoval(unittest.TestCase): + """Test that code duplication has been properly removed.""" + + def test_no_duplicate_get_ssl_credentials_in_e6data_grpc(self): + """Test that get_ssl_credentials is not duplicated in e6data_grpc module.""" + import e6data_python_connector.e6data_grpc as grpc_module + + # Should not have its own _get_ssl_credentials method + # It should use the one from common + self.assertFalse(hasattr(grpc_module.Connection, '_get_ssl_credentials')) + + def test_no_duplicate_get_ssl_credentials_in_cluster_manager(self): + """Test that get_ssl_credentials is not duplicated in cluster_manager module.""" + import e6data_python_connector.cluster_manager as cm_module + + # Should not have its own _get_ssl_credentials method + self.assertFalse(hasattr(cm_module.ClusterManager, '_get_ssl_credentials')) + + def test_no_duplicate_get_grpc_header_in_e6data_grpc(self): + """Test that _get_grpc_header wrapper is not duplicated in e6data_grpc.""" + import e6data_python_connector.e6data_grpc as grpc_module + import inspect + + # Should import from strategy, not define its own + source = inspect.getsource(grpc_module) + + # Should have import statement + self.assertIn('from e6data_python_connector.strategy import _get_grpc_header', source) + + # Should NOT have its own definition (wrapper function) + # Look for function definition pattern + self.assertNotIn('def _get_grpc_header(engine_ip=None, cluster=None, strategy=None):', source) + + def test_no_duplicate_get_grpc_header_in_cluster_manager(self): + """Test that _get_grpc_header wrapper is not duplicated in cluster_manager.""" + import e6data_python_connector.cluster_manager as cm_module + import inspect + + source = inspect.getsource(cm_module) + + # Should have import statement + self.assertIn('from e6data_python_connector.strategy import', source) + self.assertIn('_get_grpc_header', source) + + # Should NOT have its own wrapper definition + self.assertNotIn('def _get_grpc_header(engine_ip=None, cluster=None, strategy=None):', source) + + +class TestConstantsIntegration(unittest.TestCase): + """Integration tests to ensure constants are used throughout codebase.""" + + def test_e6data_grpc_imports_constants(self): + """Test that e6data_grpc imports and uses constants.""" + import e6data_python_connector.e6data_grpc as grpc_module + import inspect + + source = inspect.getsource(grpc_module) + + # Should import constants + self.assertIn('from e6data_python_connector.constants import', source) + + # Should use the constants + self.assertIn('MAX_RETRY_ATTEMPTS', source) + self.assertIn('RETRY_SLEEP_SECONDS', source) + self.assertIn('GRPC_ERROR_STRATEGY_MISMATCH', source) + self.assertIn('GRPC_ERROR_ACCESS_DENIED', source) + + def test_constants_values_match_expected(self): + """Test that constant values match expected values from original code.""" + # These should match the original hardcoded values + self.assertEqual(constants.MAX_RETRY_ATTEMPTS, 5) + self.assertEqual(constants.RETRY_SLEEP_SECONDS, 0.2) + self.assertEqual(constants.STRATEGY_CACHE_TIMEOUT_SECONDS, 300) + self.assertEqual(constants.DEFAULT_GRPC_PREPARE_TIMEOUT_SECONDS, 600) + self.assertEqual(constants.DEFAULT_AUTO_RESUME_TIMEOUT_SECONDS, 300) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/test_e6data_date_functions.py b/test_e6data_date_functions.py new file mode 100644 index 0000000..3b14b88 --- /dev/null +++ b/test_e6data_date_functions.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +""" +Test script to identify which date/time functions e6data supports. +This helps determine the correct time grain expressions for Superset integration. + +Usage: + python test_e6data_date_functions.py + +Requirements: + - Set environment variables: ENGINE_IP, DB_NAME, EMAIL, PASSWORD, CATALOG, PORT + - Or modify the connection parameters in this script +""" + +import os +from e6data_python_connector import Connection + + +def test_date_functions(): + """Test various date/time functions to see which ones e6data supports""" + + # Connection parameters - modify these or use environment variables + host = os.getenv("ENGINE_IP", "your_host") + port = int(os.getenv("PORT", "80")) + username = os.getenv("EMAIL", "your_email") + password = os.getenv("PASSWORD", "your_token") + database = os.getenv("DB_NAME", "your_database") + catalog = os.getenv("CATALOG", "your_catalog") + + print("=" * 80) + print("E6DATA DATE/TIME FUNCTION COMPATIBILITY TEST") + print("=" * 80) + print() + + # Create connection + try: + conn = Connection( + host=host, + port=port, + username=username, + password=password, + database=database, + catalog=catalog + ) + print("✓ Connection established successfully") + print() + except Exception as e: + print(f"✗ Failed to connect: {e}") + print("\nPlease set environment variables or modify the script:") + print(" - ENGINE_IP: e6data cluster IP") + print(" - PORT: e6data port (default: 80)") + print(" - EMAIL: your e6data email") + print(" - PASSWORD: your access token") + print(" - DB_NAME: database name") + print(" - CATALOG: catalog name") + return + + # Test queries for different date functions + test_queries = { + "CURRENT_TIMESTAMP": "SELECT CURRENT_TIMESTAMP as result", + "CURRENT_DATE": "SELECT CURRENT_DATE() as result", + + # DATE_TRUNC function (most common in modern SQL engines) + "DATE_TRUNC (second)": "SELECT DATE_TRUNC('second', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (minute)": "SELECT DATE_TRUNC('minute', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (hour)": "SELECT DATE_TRUNC('hour', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (day)": "SELECT DATE_TRUNC('day', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (week)": "SELECT DATE_TRUNC('week', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (month)": "SELECT DATE_TRUNC('month', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (quarter)": "SELECT DATE_TRUNC('quarter', CURRENT_TIMESTAMP) as result", + "DATE_TRUNC (year)": "SELECT DATE_TRUNC('year', CURRENT_TIMESTAMP) as result", + + # TRUNC function (Oracle-style) + "TRUNC (day)": "SELECT TRUNC(CURRENT_TIMESTAMP, 'DD') as result", + "TRUNC (month)": "SELECT TRUNC(CURRENT_TIMESTAMP, 'MM') as result", + "TRUNC (year)": "SELECT TRUNC(CURRENT_TIMESTAMP, 'YYYY') as result", + + # DATE_FORMAT function (MySQL-style) + "DATE_FORMAT (day)": "SELECT DATE_FORMAT(CURRENT_TIMESTAMP, '%Y-%m-%d') as result", + "DATE_FORMAT (month)": "SELECT DATE_FORMAT(CURRENT_TIMESTAMP, '%Y-%m-01') as result", + "DATE_FORMAT (year)": "SELECT DATE_FORMAT(CURRENT_TIMESTAMP, '%Y-01-01') as result", + + # CAST to DATE (simple truncation to day) + "CAST AS DATE": "SELECT CAST(CURRENT_TIMESTAMP AS DATE) as result", + + # YEAR, MONTH, DAY functions + "YEAR function": "SELECT YEAR(CURRENT_TIMESTAMP) as result", + "MONTH function": "SELECT MONTH(CURRENT_TIMESTAMP) as result", + "DAY function": "SELECT DAY(CURRENT_TIMESTAMP) as result", + "QUARTER function": "SELECT QUARTER(CURRENT_TIMESTAMP) as result", + "DAYOFWEEK function": "SELECT DAYOFWEEK(CURRENT_TIMESTAMP) as result", + + # Presto/Trino-style date_trunc + "date_trunc (lowercase)": "SELECT date_trunc('day', CURRENT_TIMESTAMP) as result", + + # FROM_UNIXTIME (for epoch conversion) + "FROM_UNIXTIME": "SELECT FROM_UNIXTIME(1609459200) as result", + + # TO_TIMESTAMP (string to timestamp) + "TO_TIMESTAMP": "SELECT TO_TIMESTAMP('2023-01-01 12:00:00', 'YYYY-MM-DD HH24:MI:SS') as result", + } + + results = {} + cursor = conn.cursor() + + print("Testing date/time functions...") + print("-" * 80) + + for func_name, query in test_queries.items(): + try: + cursor.execute(query) + result = cursor.fetchone() + results[func_name] = { + "status": "✓ SUPPORTED", + "result": result[0] if result else None, + "error": None + } + print(f"✓ {func_name:30s} | {result[0]}") + except Exception as e: + error_msg = str(e) + results[func_name] = { + "status": "✗ NOT SUPPORTED", + "result": None, + "error": error_msg[:80] # Truncate error message + } + print(f"✗ {func_name:30s} | Error: {error_msg[:50]}...") + + cursor.close() + conn.close() + + # Summary and recommendations + print() + print("=" * 80) + print("RECOMMENDATIONS FOR SUPERSET ENGINE SPEC") + print("=" * 80) + print() + + # Determine which date truncation method works + if results.get("DATE_TRUNC (day)", {}).get("status") == "✓ SUPPORTED": + print("✓ E6data supports DATE_TRUNC function (Presto/Trino-style)") + print() + print("Use this in your Superset engine spec:") + print(""" +_time_grain_expressions = { + None: "{col}", + "PT1S": "DATE_TRUNC('second', {col})", + "PT1M": "DATE_TRUNC('minute', {col})", + "PT1H": "DATE_TRUNC('hour', {col})", + "P1D": "DATE_TRUNC('day', {col})", + "P1W": "DATE_TRUNC('week', {col})", + "P1M": "DATE_TRUNC('month', {col})", + "P3M": "DATE_TRUNC('quarter', {col})", + "P1Y": "DATE_TRUNC('year', {col})", +} +""") + elif results.get("TRUNC (day)", {}).get("status") == "✓ SUPPORTED": + print("✓ E6data supports TRUNC function (Oracle-style)") + print() + print("Use this in your Superset engine spec:") + print(""" +_time_grain_expressions = { + None: "{col}", + "P1D": "TRUNC({col}, 'DD')", + "P1M": "TRUNC({col}, 'MM')", + "P1Y": "TRUNC({col}, 'YYYY')", +} +""") + elif results.get("DATE_FORMAT (day)", {}).get("status") == "✓ SUPPORTED": + print("✓ E6data supports DATE_FORMAT function (MySQL-style)") + print() + print("Use this in your Superset engine spec:") + print(""" +_time_grain_expressions = { + None: "{col}", + "PT1H": "DATE_FORMAT({col}, '%Y-%m-%d %H:00:00')", + "P1D": "DATE_FORMAT({col}, '%Y-%m-%d')", + "P1M": "DATE_FORMAT({col}, '%Y-%m-01')", + "P1Y": "DATE_FORMAT({col}, '%Y-01-01')", +} +""") + elif results.get("CAST AS DATE", {}).get("status") == "✓ SUPPORTED": + print("✓ E6data supports CAST to DATE (basic truncation to day)") + print() + print("Note: This only provides day-level granularity") + print(""" +_time_grain_expressions = { + None: "{col}", + "P1D": "CAST({col} AS DATE)", +} +""") + else: + print("⚠ Warning: No standard date truncation functions detected") + print("You may need to contact e6data support for date manipulation syntax") + + print() + print("=" * 80) + print("NEXT STEPS") + print("=" * 80) + print() + print("1. Review the results above to identify supported functions") + print("2. Update SUPERSET_ENGINE_SPEC_SOLUTION.md with the correct expressions") + print("3. Create /app/superset/db_engine_specs/e6data.py in your Superset installation") + print("4. Restart Superset and test time-based charts") + print() + + +if __name__ == "__main__": + test_date_functions() \ No newline at end of file