Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# e6data Python Connector

![version](https://img.shields.io/badge/version-2.3.8-blue.svg)
![version](https://img.shields.io/badge/version-2.3.9-blue.svg)

## Introduction

Expand Down
4 changes: 3 additions & 1 deletion e6data_python_connector/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def create_connect_args(self, url):
self.cluster_name = url.query.get("cluster-name")
self.secure = url.query.get("secure") == "true"
self.auto_resume = url.query.get("auto-resume", "true") == "true" # default to True
self.debug = url.query.get("debug", "false") == "true" # default to True
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment incorrectly states 'default to True' when the code actually defaults to False. The comment should read '# default to False'.

Suggested change
self.debug = url.query.get("debug", "false") == "true" # default to True
self.debug = url.query.get("debug", "false") == "true" # default to False

Copilot uses AI. Check for mistakes.
if not self.catalog_name:
raise Exception('Please specify catalog in query parameter.')

Expand All @@ -250,7 +251,8 @@ def create_connect_args(self, url):
"cluster_name": self.cluster_name,
'secure': self.secure,
'auto_resume': self.auto_resume,
'grpc_options': grpc_options
'grpc_options': grpc_options,
'debug': self.debug
}
return [], kwargs

Expand Down
99 changes: 62 additions & 37 deletions e6data_python_connector/e6data_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,26 +128,18 @@ def escape_string(self, item):
# Strategy cache timeout in seconds (5 minutes)
STRATEGY_CACHE_TIMEOUT = 300

# Global set to track debug-enabled connections
_debug_connections = set()

def _strategy_debug_log(message):
"""Log strategy debug messages if any connection has debug enabled."""
if _debug_connections:
print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}")


def _get_shared_strategy():
"""Get or create the shared strategy storage."""
global _strategy_manager, _shared_strategy

# Try to use multiprocessing.Manager for process-safe storage
try:
if _strategy_manager is None:
_strategy_manager = multiprocessing.Manager()
_shared_strategy = _strategy_manager.dict()
_shared_strategy['active_strategy'] = None
_shared_strategy['last_check_time'] = 0
_shared_strategy['pending_strategy'] = None
_shared_strategy['query_strategy_map'] = _strategy_manager.dict()
_shared_strategy['last_transition_time'] = 0
_shared_strategy['session_invalidated'] = False
return _shared_strategy
except:
# Fall back to thread-local storage if Manager fails
return _local_strategy_cache
return _local_strategy_cache


def _get_active_strategy():
Expand Down Expand Up @@ -175,10 +167,14 @@ def _set_active_strategy(strategy):
with _strategy_lock:
shared_strategy = _get_shared_strategy()
current_time = time.time()
old_strategy = shared_strategy.get('active_strategy')

# Only update transition time if strategy actually changed
if shared_strategy['active_strategy'] != normalized_strategy:
if old_strategy != normalized_strategy:
shared_strategy['last_transition_time'] = current_time
_strategy_debug_log(f"Setting active strategy: {old_strategy} -> {normalized_strategy}")
else:
_strategy_debug_log(f"Active strategy unchanged: {normalized_strategy}")

shared_strategy['active_strategy'] = normalized_strategy
shared_strategy['last_check_time'] = current_time
Expand All @@ -188,6 +184,9 @@ def _clear_strategy_cache():
"""Clear the cached strategy to force re-detection."""
with _strategy_lock:
shared_strategy = _get_shared_strategy()
old_strategy = shared_strategy.get('active_strategy')
if old_strategy:
_strategy_debug_log(f"Clearing strategy cache (was: {old_strategy})")
shared_strategy['active_strategy'] = None
shared_strategy['last_check_time'] = 0
shared_strategy['pending_strategy'] = None
Expand All @@ -208,6 +207,8 @@ def _set_pending_strategy(strategy):

if normalized_strategy != current_active:
shared_strategy['pending_strategy'] = normalized_strategy
query_count = len(shared_strategy.get('query_strategy_map', {}))
_strategy_debug_log(f"Setting pending strategy: {normalized_strategy} (current: {current_active}, active queries: {query_count})")


def _apply_pending_strategy():
Expand All @@ -219,11 +220,15 @@ def _apply_pending_strategy():
new_strategy = shared_strategy['pending_strategy']
current_time = time.time()

_strategy_debug_log(f"Applying pending strategy: {old_strategy} -> {new_strategy}")

shared_strategy['active_strategy'] = new_strategy
shared_strategy['pending_strategy'] = None
shared_strategy['last_check_time'] = current_time
shared_strategy['last_transition_time'] = current_time
shared_strategy['session_invalidated'] = True # Invalidate all sessions

_strategy_debug_log(f"Strategy transition complete. All sessions invalidated.")

return new_strategy
return None
Expand All @@ -250,6 +255,7 @@ def _register_query_strategy(query_id, strategy):
query_map = shared_strategy.get('query_strategy_map', {})
query_map[query_id] = normalized_strategy
shared_strategy['query_strategy_map'] = query_map
_strategy_debug_log(f"Query {query_id} registered with strategy: {normalized_strategy}")


def _get_query_strategy(query_id):
Expand All @@ -271,8 +277,11 @@ def _cleanup_query_strategy(query_id):
shared_strategy = _get_shared_strategy()
query_map = shared_strategy.get('query_strategy_map', {})
if query_id in query_map:
strategy = query_map[query_id]
del query_map[query_id]
shared_strategy['query_strategy_map'] = query_map
remaining_queries = len(query_map)
_strategy_debug_log(f"Query {query_id} completed (was using {strategy}). Remaining active queries: {remaining_queries}")


def _get_strategy_debug_info():
Expand Down Expand Up @@ -319,6 +328,7 @@ def __init__(
auto_resume: bool = True,
scheme: str = 'e6data',
grpc_options: dict = None,
debug: bool = False,
):
"""
Parameters
Expand Down Expand Up @@ -350,6 +360,8 @@ def __init__(
- max_send_message_length: Similar to max_receive_message_length, this parameter sets the maximum allowed size (in bytes) for outgoing messages from the gRPC client
- grpc_prepare_timeout: Timeout for prepare statement API call (default to 10 minutes).
- keepalive_time_ms: This parameter defines the time, in milliseconds, Default to 30 seconds
debug: bool, Optional
Flag to enable debug logging for blue-green deployment strategy changes
"""
if not username or not password:
raise ValueError("username or password cannot be empty.")
Expand Down Expand Up @@ -379,6 +391,13 @@ def __init__(
The default maximum time on client side to wait for the cluster to resume is 5 minutes.
"""
self.grpc_auto_resume_timeout_seconds = self._grpc_options.pop('grpc_auto_resume_timeout_seconds')

# Store debug flag and register with debug connections
self._debug = debug
if self._debug:
_debug_connections.add(id(self))
_strategy_debug_log(f"Debug mode enabled for connection {id(self)}")

self._create_client()

@property
Expand Down Expand Up @@ -505,6 +524,7 @@ def get_session_id(self):

if active_strategy and not pending_strategy:
# Use cached strategy only if there's no pending strategy
_strategy_debug_log(f"Authenticating with cached strategy: {active_strategy}")
try:
authenticate_response = self._client.authenticate(
authenticate_request,
Expand All @@ -518,10 +538,15 @@ def get_session_id(self):
new_strategy = authenticate_response.new_strategy.lower()
if new_strategy != active_strategy:
_set_pending_strategy(new_strategy)
# Don't apply immediately - let new queries use fresh connections
_apply_pending_strategy()
self._session_id = None
self.close()
self._create_client()
return self.get_session_id
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing function call parentheses. This should be return self.get_session_id() to actually call the method, not return the method object.

Suggested change
return self.get_session_id
return self.get_session_id()

Copilot uses AI. Check for mistakes.
except _InactiveRpcError as e:
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# Strategy changed, clear cache and retry
_strategy_debug_log(f"Got 456 error with strategy {active_strategy}, clearing cache and retrying")
_clear_strategy_cache()
active_strategy = None
else:
Expand All @@ -540,8 +565,10 @@ def get_session_id(self):
else:
# Always try blue first, then green if it fails with 456
strategies = ['blue', 'green']
_strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}")
last_error = None
for strategy in strategies:
_strategy_debug_log(f"Attempting authentication with strategy: {strategy}")
try:
authenticate_response = self._client.authenticate(
authenticate_request,
Expand All @@ -550,19 +577,24 @@ def get_session_id(self):
self._session_id = authenticate_response.sessionId
if self._session_id:
# Success! Cache this strategy
_strategy_debug_log(f"Authentication successful with strategy: {strategy}")
_set_active_strategy(strategy)

# Check for new strategy in authenticate response
if hasattr(authenticate_response,
'new_strategy') and authenticate_response.new_strategy:
if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy:
new_strategy = authenticate_response.new_strategy.lower()
if new_strategy != strategy:
_set_pending_strategy(new_strategy)
# Don't apply immediately - let new queries use fresh connections
_apply_pending_strategy()
self.close()
self._create_client()
self._session_id = None
return self.get_session_id
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing function call parentheses. This should be return self.get_session_id() to actually call the method, not return the method object.

Suggested change
return self.get_session_id
return self.get_session_id()

Copilot uses AI. Check for mistakes.
break
except _InactiveRpcError as e:
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
# Wrong strategy, try the next one
_strategy_debug_log(f"Strategy {strategy} failed with 456 error, trying next")
last_error = e
continue
else:
Expand All @@ -589,21 +621,6 @@ def get_session_id(self):
).resume()
if status:
return self.get_session_id
# authenticate_request = e6x_engine_pb2.AuthenticateRequest(
# user=self.__username,
# password=self.__password
# )
# authenticate_response = self._client.authenticate(
# authenticate_request,
# metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy())
# )
# self._session_id = authenticate_response.sessionId
# # Check for new strategy in authenticate response
# if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy:
# new_strategy = authenticate_response.new_strategy.lower()
# if new_strategy != _get_active_strategy():
# _set_pending_strategy(new_strategy)
# # Don't apply immediately - let new queries use fresh connections
else:
raise e
else:
Expand Down Expand Up @@ -649,6 +666,11 @@ def close(self):
self._channel.close()
self._channel = None
self._session_id = None

# Remove from debug connections if debug was enabled
if self._debug:
_debug_connections.discard(id(self))
_strategy_debug_log(f"Debug mode disabled for connection {id(self)}")

def check_connection(self):
"""
Expand Down Expand Up @@ -1095,6 +1117,7 @@ def clear(self, query_id=None):
query_map = shared_strategy.get('query_strategy_map', {})

if pending_strategy and len(query_map) == 0:
_strategy_debug_log(f"Last query cleared, triggering pending strategy transition")
_apply_pending_strategy()

return clear_response
Expand All @@ -1118,6 +1141,7 @@ def cancel(self, query_id):
query_map = shared_strategy.get('query_strategy_map', {})

if pending_strategy and len(query_map) == 0:
_strategy_debug_log(f"Last query cleared, triggering pending strategy transition")
_apply_pending_strategy()

def status(self, query_id):
Expand Down Expand Up @@ -1230,6 +1254,7 @@ def execute(self, operation, parameters=None, **kwargs):

# Check for new strategy in prepare response
if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy:
print(f"@@@@@@ New strategy: {prepare_statement_response.new_strategy}")
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug print statement with temporary markers should be removed or replaced with proper logging using the _strategy_debug_log function.

Suggested change
print(f"@@@@@@ New strategy: {prepare_statement_response.new_strategy}")
_strategy_debug_log(f"New strategy detected: {prepare_statement_response.new_strategy}")

Copilot uses AI. Check for mistakes.
new_strategy = prepare_statement_response.new_strategy.lower()
if new_strategy != _get_active_strategy():
_set_pending_strategy(new_strategy)
Expand Down
19 changes: 1 addition & 18 deletions e6data_python_connector/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,7 @@ def _initialize_shared_state():
with _initialization_lock:
if _shared_strategy is not None:
return _shared_strategy

try:
# Try to create multiprocessing Manager
_manager = multiprocessing.Manager()
_shared_strategy = _manager.dict({
'active_strategy': None,
'last_check_time': 0,
'pending_strategy': None,
'query_strategy_map': _manager.dict(),
'last_transition_time': 0,
'session_invalidated': False
})
_logger.debug("Successfully initialized multiprocessing Manager for strategy sharing")
return _shared_strategy
except Exception as e:
# Fall back to thread-local storage if Manager fails
_logger.warning(f"Failed to initialize multiprocessing Manager: {e}. Using thread-local storage.")
return _local_strategy_cache
return _local_strategy_cache


def _get_shared_strategy():
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import setuptools

VERSION = (2, 3, 8,)
VERSION = (2, 3, 9,)


def get_long_desc():
Expand Down