Merged
116 changes: 115 additions & 1 deletion README.md
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.3.11-blue.svg)
![version](https://img.shields.io/badge/version-2.3.12-blue.svg)

## Introduction

@@ -83,9 +83,11 @@ The `Connection` class supports the following parameters:
| `catalog` | str | No | None | Catalog name |
| `cluster_name` | str | No | None | Name of the cluster for cluster-specific operations |
| `secure` | bool | No | False | Enable SSL/TLS for secure connections |
| `ssl_cert` | str/bytes | No | None | Path to CA certificate (PEM) or certificate bytes for HTTPS connections |
| `auto_resume` | bool | No | True | Automatically resume cluster if suspended |
| `grpc_options` | dict | No | None | Additional gRPC configuration options |
| `debug` | bool | No | False | Enable debug logging for troubleshooting |
| `require_fastbinary` | bool | No | True | Require the fastbinary module for Thrift deserialization. Set to False to fall back to the pure Python implementation when system dependencies cannot be installed |

#### Secure Connection Example

@@ -119,6 +121,117 @@ conn = Connection(
)
```

#### HTTPS Connection with HAProxy

When connecting through HAProxy over HTTPS, you can provide a custom CA certificate for the secure connection. The `ssl_cert` parameter accepts either the path to a PEM certificate file or the certificate content as bytes.

**Using a CA certificate file path:**

```python
conn = Connection(
host=host,
port=443,
username=username,
password=password,
database=database,
secure=True,
ssl_cert='/path/to/ca-cert.pem' # Path to your CA certificate
)
```

**Reading certificate content as bytes:**

```python
# Read certificate file and pass as bytes
with open('/path/to/ca-cert.pem', 'rb') as cert_file:
cert_data = cert_file.read()

conn = Connection(
host=host,
port=443,
username=username,
password=password,
database=database,
secure=True,
ssl_cert=cert_data # Certificate content as bytes
)
```
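When the certificate comes from a secret store rather than from disk, a cheap sanity check before passing the bytes to `ssl_cert` can save a confusing TLS handshake failure later. A minimal sketch; `looks_like_pem` is a hypothetical helper for illustration, not part of the connector:

```python
def looks_like_pem(cert_bytes: bytes) -> bool:
    """Cheap sanity check: PEM-encoded certificates start with this header line."""
    return cert_bytes.lstrip().startswith(b'-----BEGIN CERTIFICATE-----')

# A PEM body passes; raw DER bytes (a common mix-up) do not:
assert looks_like_pem(b'-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n')
assert not looks_like_pem(b'\x30\x82\x01\x0a')  # DER-encoded, not PEM
```
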

**Using system CA bundle for publicly signed certificates:**

```python
# When ssl_cert is None, system default CA bundle is used
conn = Connection(
host=host,
port=443,
username=username,
password=password,
database=database,
secure=True # Uses system CA bundle by default
)
```

**Connection pooling with custom CA certificate:**

```python
pool = ConnectionPool(
min_size=2,
max_size=10,
host=host,
port=443,
username=username,
password=password,
database=database,
secure=True,
ssl_cert='/path/to/ca-cert.pem' # Custom CA certificate for pool connections
)
```

#### Handling Missing System Dependencies

The e6data connector uses the `fastbinary` module (from Apache Thrift) for optimal performance when deserializing data. This module requires system-level dependencies (`python3-devel` and `gcc-c++`) to be installed.

**Default Behavior (Recommended):**
By default, the connector requires `fastbinary` to be available. If it's not found, the connection will fail immediately with a clear error message:

```python
conn = Connection(
host=host,
port=port,
username=username,
password=password,
database=database
)
# Raises exception if fastbinary is not available
```

**Fallback to Pure Python:**
If you cannot install system dependencies (e.g., in restricted environments, serverless platforms, or containers without build tools), you can disable the `fastbinary` requirement. The connector will fall back to a pure Python implementation with a performance penalty:

```python
conn = Connection(
host=host,
port=port,
username=username,
password=password,
database=database,
require_fastbinary=False # Allow operation without fastbinary
)
# Logs warning but continues with pure Python implementation
```

**When to use `require_fastbinary=False`:**
- Running in AWS Lambda or other serverless environments
- Docker containers built without compilation tools
- Restricted environments where system packages cannot be installed
- Development/testing environments where performance is not critical

**Performance Impact:**
- With `fastbinary`: Optimal performance for data deserialization
- Without `fastbinary` (pure Python): ~2-3x slower deserialization, but otherwise fully functional

**Note:** Install the system dependencies whenever possible for best performance; use `require_fastbinary=False` only when they cannot be installed.
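The availability check the connector performs internally can be reproduced up front, so the requirement is only relaxed when the extension is genuinely missing. A sketch — the `thrift.protocol.fastbinary` import path is Apache Thrift's accelerated C extension; the surrounding logic is illustrative:

```python
# Probe for the Thrift C extension used for fast deserialization.
try:
    from thrift.protocol import fastbinary  # noqa: F401  (building it needs python3-devel + gcc-c++)
    fastbinary_available = True
except ImportError:
    fastbinary_available = False

# Keep the strict default when the extension is present; relax it only when missing.
require_fastbinary = fastbinary_available
print('fastbinary available:', fastbinary_available)
```
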

### Perform a Query & Get Results

```python
@@ -652,5 +765,6 @@ python your_script.py
| 456 Strategy Error | Automatic blue-green failover will handle this |
| Memory issues with large results | Use `fetchall_buffer()` instead of `fetchall()` |
| gRPC message size errors | Configure `grpc_options` with appropriate message size limits |
| fastbinary import error | Install system dependencies (`python3-devel`, `gcc-c++`) or set `require_fastbinary=False` |

See [TECH_DOC.md](TECH_DOC.md) for detailed technical documentation.
29 changes: 7 additions & 22 deletions e6data_python_connector/cluster_manager.py
@@ -9,27 +9,9 @@

logger = logging.getLogger(__name__)

from e6data_python_connector.common import get_ssl_credentials
from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \
_get_grpc_header as _get_strategy_header


def _get_grpc_header(engine_ip=None, cluster=None, strategy=None):
"""
Generate gRPC metadata headers for the request.

This function creates a list of metadata headers to be used in gRPC requests.
It includes optional headers for the engine IP, cluster UUID, and deployment strategy.

Args:
engine_ip (str, optional): The IP address of the engine. Defaults to None.
cluster (str, optional): The UUID of the cluster. Defaults to None.
strategy (str, optional): The deployment strategy (blue/green). Defaults to None.

Returns:
list: A list of tuples representing the gRPC metadata headers.
"""
# Use the strategy module's implementation
return _get_strategy_header(engine_ip=engine_ip, cluster=cluster, strategy=strategy)
_get_grpc_header


class _StatusLock:
@@ -140,7 +122,7 @@ class ClusterManager:
"""

def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5,
cluster_uuid=None, grpc_options=None, debug=False):
cluster_uuid=None, grpc_options=None, debug=False, ssl_cert=None):
"""
Initializes a new instance of the ClusterManager class.

@@ -156,6 +138,8 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe
cluster_uuid (str, optional): The unique identifier for the target cluster;
defaults to None.
debug (bool, optional): Enable debug logging; defaults to False.
ssl_cert (str or bytes, optional): Path to CA certificate file (PEM format) or
certificate content as bytes for secure connections; defaults to None.
"""

self._host = host
@@ -169,6 +153,7 @@ def _get_connection(self):
if grpc_options is None:
self._grpc_options = dict()
self._debug = debug
self._ssl_cert = ssl_cert

@property
def _get_connection(self):
@@ -184,7 +169,7 @@ def _get_connection(self):
self._channel = grpc.secure_channel(
target='{}:{}'.format(self._host, self._port),
options=self._grpc_options,
credentials=grpc.ssl_channel_credentials()
credentials=get_ssl_credentials(self._ssl_cert)
)
else:
self._channel = grpc.insecure_channel(
44 changes: 44 additions & 0 deletions e6data_python_connector/common.py
@@ -15,6 +15,7 @@
from builtins import object
from builtins import str

import grpc
from future.utils import with_metaclass
from past.builtins import basestring

@@ -267,3 +268,46 @@ class UniversalSet(object):

def __contains__(self, item):
return True


def get_ssl_credentials(ssl_cert):
"""
Get SSL credentials for secure gRPC channel.

Handles three scenarios:
1. ssl_cert is a string (file path): Read the PEM certificate from the file
2. ssl_cert is bytes: Use the certificate content directly
3. ssl_cert is None: Use system default CA bundle

Parameters:
ssl_cert (str or bytes or None): SSL certificate as file path, bytes, or None

Returns:
grpc.ChannelCredentials: SSL credentials for gRPC channel

Raises:
FileNotFoundError: If ssl_cert is a file path but the file doesn't exist
IOError: If ssl_cert file cannot be read
"""
if ssl_cert is None:
# Use system default CA bundle
return grpc.ssl_channel_credentials()
elif isinstance(ssl_cert, str):
# ssl_cert is a file path - read the certificate from file
try:
with open(ssl_cert, 'rb') as cert_file:
root_ca_cert = cert_file.read()
return grpc.ssl_channel_credentials(root_certificates=root_ca_cert)
except FileNotFoundError:
_logger.error("SSL certificate file not found: {}".format(ssl_cert))
raise
except IOError as e:
_logger.error("Failed to read SSL certificate file {}: {}".format(ssl_cert, e))
raise
elif isinstance(ssl_cert, bytes):
# ssl_cert is certificate content as bytes
return grpc.ssl_channel_credentials(root_certificates=ssl_cert)
else:
# Invalid type - fall back to system default CA bundle with warning
_logger.warning("Invalid ssl_cert type: {}. Using system default CA bundle.".format(type(ssl_cert)))
return grpc.ssl_channel_credentials()
24 changes: 24 additions & 0 deletions e6data_python_connector/constants.py
@@ -1,5 +1,6 @@
import pytz

# Type definitions
PRIMITIVE_TYPES = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 15, 16, 17, 18, 19, 20, 21}
COMPLEX_TYPES = {10, 11, 12, 13, 14}
COLLECTION_TYPES = {10, 11}
@@ -30,3 +31,26 @@
PRECISION = "precision"
SCALE = "scale"
ZONE = pytz.timezone('UTC')

# Retry and timeout constants
MAX_RETRY_ATTEMPTS = 5
RETRY_SLEEP_SECONDS = 0.2
STRATEGY_CACHE_TIMEOUT_SECONDS = 300 # 5 minutes
DEFAULT_GRPC_PREPARE_TIMEOUT_SECONDS = 600 # 10 minutes
DEFAULT_AUTO_RESUME_TIMEOUT_SECONDS = 300 # 5 minutes
CLUSTER_STATUS_CHECK_SLEEP_SECONDS = 5
LOCK_TIMEOUT_MS = 500

# Connection pool constants
POOL_GET_TIMEOUT_SECONDS = 0.1
POOL_RETRY_SLEEP_SECONDS = 0.1

# Blue-green deployment strategy constants
STRATEGY_BLUE = 'blue'
STRATEGY_GREEN = 'green'
VALID_STRATEGIES = {STRATEGY_BLUE, STRATEGY_GREEN}

# gRPC error codes and messages
GRPC_ERROR_STRATEGY_MISMATCH = 'status: 456'
GRPC_ERROR_SERVICE_UNAVAILABLE = 'status: 503'
GRPC_ERROR_ACCESS_DENIED = 'Access denied'
31 changes: 24 additions & 7 deletions e6data_python_connector/datainputstream.py
@@ -12,20 +12,27 @@
from e6data_python_connector.constants import ZONE
from e6data_python_connector.date_time_utils import floor_div, floor_mod, timezone_from_offset

# Try to import fastbinary - it's optional but provides better performance
_fastbinary_available = False
try:
from thrift.protocol import fastbinary
_fastbinary_available = True
except ImportError:
raise Exception(
"""
Failed to import fastbinary.
Did you install system dependencies?
Please verify https://github.com/e6x-labs/e6data-python-connector#dependencies
"""
)
pass # Will check require_fastbinary flag at connection creation time

_logger = logging.getLogger(__name__)


def is_fastbinary_available():
"""
Check if fastbinary module is available.

Returns:
bool: True if fastbinary is available, False otherwise
"""
return _fastbinary_available


def _binary_to_decimal128(binary_data, scale=None):
"""
Convert binary data to Decimal128.
@@ -504,6 +511,16 @@ def read_values_from_array(query_columns_description: list, dis: DataInputStream


def read_rows_from_chunk(query_columns_description: list, buffer):
"""
Read rows from a Thrift-encoded chunk buffer.

Args:
query_columns_description: List of column descriptions
buffer: Thrift-encoded binary buffer

Returns:
List of rows
"""
# Create a transport and protocol instance for deserialization
transport = TTransport.TMemoryBuffer(buffer)
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)