From 61191ea0076347aa81d89ab3b3b6896696fd9802 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 14:05:16 +0530 Subject: [PATCH 1/6] Query isolation. --- README.md | 2 +- e6data_python_connector/e6data_grpc.py | 27 +++++++------------------- setup.py | 2 +- 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 87db824..7758caf 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.8-blue.svg) +![version](https://img.shields.io/badge/version-2.3.9.rc7-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 68d7bd4..dfe69a8 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -215,7 +215,6 @@ def _apply_pending_strategy(): with _strategy_lock: shared_strategy = _get_shared_strategy() if shared_strategy['pending_strategy']: - old_strategy = shared_strategy['active_strategy'] new_strategy = shared_strategy['pending_strategy'] current_time = time.time() @@ -518,7 +517,9 @@ def get_session_id(self): new_strategy = authenticate_response.new_strategy.lower() if new_strategy != active_strategy: _set_pending_strategy(new_strategy) - # Don't apply immediately - let new queries use fresh connections + _apply_pending_strategy() + self._session_id = None + return self.get_session_id except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # Strategy changed, clear cache and retry @@ -553,12 +554,13 @@ def get_session_id(self): _set_active_strategy(strategy) # Check for new strategy in authenticate response - if hasattr(authenticate_response, - 'new_strategy') and authenticate_response.new_strategy: + if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy: new_strategy = authenticate_response.new_strategy.lower() if new_strategy != strategy: _set_pending_strategy(new_strategy) - # Don't apply immediately - let new queries use fresh connections + _apply_pending_strategy() + self._session_id = None + return self.get_session_id break except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): @@ -589,21 +591,6 @@ def get_session_id(self): ).resume() if status: return self.get_session_id - # authenticate_request = e6x_engine_pb2.AuthenticateRequest( - # user=self.__username, - # password=self.__password - # ) - # authenticate_response = self._client.authenticate( - # authenticate_request, - # metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy()) - # ) - # self._session_id = authenticate_response.sessionId - # # Check for new strategy in authenticate response - # if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy: - # new_strategy = authenticate_response.new_strategy.lower() - # if new_strategy != _get_active_strategy(): - # _set_pending_strategy(new_strategy) - # # Don't apply immediately - let new queries use fresh connections else: raise e else: diff --git a/setup.py b/setup.py index 7acc7df..911f72e 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 8,) +VERSION = (2, 3, 9, 'rc7') def get_long_desc(): From 189b872e771dfb8aa449ed63cb9def7aed6a3ce8 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 14:08:56 +0530 Subject: [PATCH 2/6] Query isolation. --- README.md | 2 +- e6data_python_connector/e6data_grpc.py | 4 ++++ setup.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7758caf..8959570 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.9.rc7-blue.svg) +![version](https://img.shields.io/badge/version-2.3.9.rc8-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index dfe69a8..f0b467f 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -519,6 +519,8 @@ def get_session_id(self): _set_pending_strategy(new_strategy) _apply_pending_strategy() self._session_id = None + self.close() + self._create_client() return self.get_session_id except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): @@ -559,6 +561,8 @@ def get_session_id(self): if new_strategy != strategy: _set_pending_strategy(new_strategy) _apply_pending_strategy() + self.close() + self._create_client() self._session_id = None return self.get_session_id break diff --git a/setup.py b/setup.py index 911f72e..8869fd3 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 9, 'rc7') +VERSION = (2, 3, 9, 'rc8') def get_long_desc(): From 84680d77b0c02fb74f307d949a7aee2feafb10e4 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 14:25:18 +0530 Subject: [PATCH 3/6] Query isolation. --- e6data_python_connector/dialect.py | 4 +- e6data_python_connector/e6data_grpc.py | 68 +++++++++++++++++++------- e6data_python_connector/strategy.py | 19 +------ 3 files changed, 55 insertions(+), 36 deletions(-) diff --git a/e6data_python_connector/dialect.py b/e6data_python_connector/dialect.py index c87d66f..a3944d8 100644 --- a/e6data_python_connector/dialect.py +++ b/e6data_python_connector/dialect.py @@ -229,6 +229,7 @@ def create_connect_args(self, url): self.cluster_name = url.query.get("cluster-name") self.secure = url.query.get("secure") == "true" self.auto_resume = url.query.get("auto-resume", "true") == "true" # default to True + self.debug = url.query.get("debug", "false") == "true" # default to True if not self.catalog_name: raise Exception('Please specify catalog in query parameter.') @@ -250,7 +251,8 @@ def create_connect_args(self, url): "cluster_name": self.cluster_name, 'secure': self.secure, 'auto_resume': self.auto_resume, - 'grpc_options': grpc_options + 'grpc_options': grpc_options, + 'debug': self.debug } return [], kwargs diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index f0b467f..6475c77 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -128,26 +128,19 @@ def escape_string(self, item): # Strategy cache timeout in seconds (5 minutes) STRATEGY_CACHE_TIMEOUT = 300 +# Global set to track debug-enabled connections +_debug_connections = set() + +def _strategy_debug_log(message): + """Log strategy debug messages if any connection has debug enabled.""" + if _debug_connections: + print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}") + def _get_shared_strategy(): """Get or create the shared strategy storage.""" global _strategy_manager, _shared_strategy - - # Try to use multiprocessing.Manager for process-safe storage - try: - if _strategy_manager is None: - _strategy_manager = multiprocessing.Manager() - _shared_strategy = _strategy_manager.dict() - _shared_strategy['active_strategy'] = None - _shared_strategy['last_check_time'] = 0 - _shared_strategy['pending_strategy'] = None - _shared_strategy['query_strategy_map'] = _strategy_manager.dict() - _shared_strategy['last_transition_time'] = 0 - _shared_strategy['session_invalidated'] = False - return _shared_strategy - except: - # Fall back to thread-local storage if Manager fails - return _local_strategy_cache + return _local_strategy_cache def _get_active_strategy(): @@ -175,10 +168,14 @@ def _set_active_strategy(strategy): with _strategy_lock: shared_strategy = _get_shared_strategy() current_time = time.time() + old_strategy = shared_strategy.get('active_strategy') # Only update transition time if strategy actually changed - if shared_strategy['active_strategy'] != normalized_strategy: + if old_strategy != normalized_strategy: shared_strategy['last_transition_time'] = current_time + _strategy_debug_log(f"Setting active strategy: {old_strategy} -> {normalized_strategy}") + else: + _strategy_debug_log(f"Active strategy unchanged: {normalized_strategy}") shared_strategy['active_strategy'] = normalized_strategy shared_strategy['last_check_time'] = current_time @@ -188,6 +185,9 @@ def _clear_strategy_cache(): """Clear the cached strategy to force re-detection.""" with _strategy_lock: shared_strategy = _get_shared_strategy() + old_strategy = shared_strategy.get('active_strategy') + if old_strategy: + _strategy_debug_log(f"Clearing strategy cache (was: {old_strategy})") shared_strategy['active_strategy'] = None shared_strategy['last_check_time'] = 0 shared_strategy['pending_strategy'] = None @@ -208,6 +208,8 @@ def _set_pending_strategy(strategy): if normalized_strategy != current_active: shared_strategy['pending_strategy'] = normalized_strategy + query_count = len(shared_strategy.get('query_strategy_map', {})) + _strategy_debug_log(f"Setting pending strategy: {normalized_strategy} (current: {current_active}, active queries: {query_count})") def _apply_pending_strategy(): @@ -215,14 +217,19 @@ def _apply_pending_strategy(): with _strategy_lock: shared_strategy = _get_shared_strategy() if shared_strategy['pending_strategy']: + old_strategy = shared_strategy['active_strategy'] new_strategy = shared_strategy['pending_strategy'] current_time = time.time() + _strategy_debug_log(f"Applying pending strategy: {old_strategy} -> {new_strategy}") + shared_strategy['active_strategy'] = new_strategy shared_strategy['pending_strategy'] = None shared_strategy['last_check_time'] = current_time shared_strategy['last_transition_time'] = current_time shared_strategy['session_invalidated'] = True # Invalidate all sessions + + _strategy_debug_log(f"Strategy transition complete. All sessions invalidated.") return new_strategy return None @@ -249,6 +256,7 @@ def _register_query_strategy(query_id, strategy): query_map = shared_strategy.get('query_strategy_map', {}) query_map[query_id] = normalized_strategy shared_strategy['query_strategy_map'] = query_map + _strategy_debug_log(f"Query {query_id} registered with strategy: {normalized_strategy}") def _get_query_strategy(query_id): @@ -270,8 +278,11 @@ def _cleanup_query_strategy(query_id): shared_strategy = _get_shared_strategy() query_map = shared_strategy.get('query_strategy_map', {}) if query_id in query_map: + strategy = query_map[query_id] del query_map[query_id] shared_strategy['query_strategy_map'] = query_map + remaining_queries = len(query_map) + _strategy_debug_log(f"Query {query_id} completed (was using {strategy}). Remaining active queries: {remaining_queries}") def _get_strategy_debug_info(): @@ -318,6 +329,7 @@ def __init__( auto_resume: bool = True, scheme: str = 'e6data', grpc_options: dict = None, + debug: bool = False, ): """ Parameters @@ -349,6 +361,8 @@ def __init__( - max_send_message_length: Similar to max_receive_message_length, this parameter sets the maximum allowed size (in bytes) for outgoing messages from the gRPC client - grpc_prepare_timeout: Timeout for prepare statement API call (default to 10 minutes). - keepalive_time_ms: This parameter defines the time, in milliseconds, Default to 30 seconds + debug: bool, Optional + Flag to enable debug logging for blue-green deployment strategy changes """ if not username or not password: raise ValueError("username or password cannot be empty.") @@ -378,6 +392,13 @@ def __init__( The default maximum time on client side to wait for the cluster to resume is 5 minutes. """ self.grpc_auto_resume_timeout_seconds = self._grpc_options.pop('grpc_auto_resume_timeout_seconds') + + # Store debug flag and register with debug connections + self._debug = debug + if self._debug: + _debug_connections.add(id(self)) + _strategy_debug_log(f"Debug mode enabled for connection {id(self)}") + self._create_client() @property @@ -504,6 +525,7 @@ def get_session_id(self): if active_strategy and not pending_strategy: # Use cached strategy only if there's no pending strategy + _strategy_debug_log(f"Authenticating with cached strategy: {active_strategy}") try: authenticate_response = self._client.authenticate( authenticate_request, @@ -525,6 +547,7 @@ def get_session_id(self): except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # Strategy changed, clear cache and retry + _strategy_debug_log(f"Got 456 error with strategy {active_strategy}, clearing cache and retrying") _clear_strategy_cache() active_strategy = None else: @@ -543,8 +566,10 @@ def get_session_id(self): else: # Always try blue first, then green if it fails with 456 strategies = ['blue', 'green'] + _strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}") last_error = None for strategy in strategies: + _strategy_debug_log(f"Attempting authentication with strategy: {strategy}") try: authenticate_response = self._client.authenticate( authenticate_request, @@ -553,6 +578,7 @@ def get_session_id(self): self._session_id = authenticate_response.sessionId if self._session_id: # Success! Cache this strategy + _strategy_debug_log(f"Authentication successful with strategy: {strategy}") _set_active_strategy(strategy) # Check for new strategy in authenticate response @@ -569,6 +595,7 @@ def get_session_id(self): except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # Wrong strategy, try the next one + _strategy_debug_log(f"Strategy {strategy} failed with 456 error, trying next") last_error = e continue else: @@ -640,6 +667,11 @@ def close(self): self._channel.close() self._channel = None self._session_id = None + + # Remove from debug connections if debug was enabled + if self._debug: + _debug_connections.discard(id(self)) + _strategy_debug_log(f"Debug mode disabled for connection {id(self)}") def check_connection(self): """ @@ -1086,6 +1118,7 @@ def clear(self, query_id=None): query_map = shared_strategy.get('query_strategy_map', {}) if pending_strategy and len(query_map) == 0: + _strategy_debug_log(f"Last query cleared, triggering pending strategy transition") _apply_pending_strategy() return clear_response @@ -1109,6 +1142,7 @@ def cancel(self, query_id): query_map = shared_strategy.get('query_strategy_map', {}) if pending_strategy and len(query_map) == 0: + _strategy_debug_log(f"Last query cleared, triggering pending strategy transition") _apply_pending_strategy() def status(self, query_id): diff --git a/e6data_python_connector/strategy.py b/e6data_python_connector/strategy.py index fe40a55..519d2ef 100644 --- a/e6data_python_connector/strategy.py +++ b/e6data_python_connector/strategy.py @@ -40,24 +40,7 @@ def _initialize_shared_state(): with _initialization_lock: if _shared_strategy is not None: return _shared_strategy - - try: - # Try to create multiprocessing Manager - _manager = multiprocessing.Manager() - _shared_strategy = _manager.dict({ - 'active_strategy': None, - 'last_check_time': 0, - 'pending_strategy': None, - 'query_strategy_map': _manager.dict(), - 'last_transition_time': 0, - 'session_invalidated': False - }) - _logger.debug("Successfully initialized multiprocessing Manager for strategy sharing") - return _shared_strategy - except Exception as e: - # Fall back to thread-local storage if Manager fails - _logger.warning(f"Failed to initialize multiprocessing Manager: {e}. Using thread-local storage.") - return _local_strategy_cache + return _local_strategy_cache def _get_shared_strategy(): From 344d3254cc286683ffcbd0e8ad736584f09a594e Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 14:26:17 +0530 Subject: [PATCH 4/6] Query isolation. --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8959570..8f91d16 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.9.rc8-blue.svg) +![version](https://img.shields.io/badge/version-2.3.9.rc9-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index 8869fd3..2e88597 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 9, 'rc8') +VERSION = (2, 3, 9, 'rc9') def get_long_desc(): From c55a3044e953c1666aba5805c6cd7527631f8884 Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:45:41 +0530 Subject: [PATCH 5/6] Query isolation. --- e6data_python_connector/e6data_grpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 6475c77..93cff30 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -139,7 +139,6 @@ def _strategy_debug_log(message): def _get_shared_strategy(): """Get or create the shared strategy storage.""" - global _strategy_manager, _shared_strategy return _local_strategy_cache @@ -1255,6 +1254,7 @@ def execute(self, operation, parameters=None, **kwargs): # Check for new strategy in prepare response if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy: + print(f"@@@@@@ New strategy: {prepare_statement_response.new_strategy}") new_strategy = prepare_statement_response.new_strategy.lower() if new_strategy != _get_active_strategy(): _set_pending_strategy(new_strategy) From 0cbd5986a5cf530783f3652e12d48e8ea0c0424f Mon Sep 17 00:00:00 2001 From: Vishal Anand <101251245+vishale6x@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:51:57 +0530 Subject: [PATCH 6/6] Query isolation. --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8f91d16..98686be 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.9.rc9-blue.svg) +![version](https://img.shields.io/badge/version-2.3.9-blue.svg) ## Introduction diff --git a/setup.py b/setup.py index 2e88597..051ca53 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 9, 'rc9') +VERSION = (2, 3, 9,) def get_long_desc():