diff --git a/README.md b/README.md index fcbf71a..13b80ba 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.2.6-blue.svg) +![version](https://img.shields.io/badge/version-2.3.6-blue.svg) ## Introduction diff --git a/e6data_python_connector/cluster_manager.py b/e6data_python_connector/cluster_manager.py index 5a63456..d60302c 100644 --- a/e6data_python_connector/cluster_manager.py +++ b/e6data_python_connector/cluster_manager.py @@ -5,13 +5,9 @@ import grpc from grpc._channel import _InactiveRpcError import multiprocessing -import logging -# Import strategy management functions -from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, _get_grpc_header as _get_strategy_header - -# Set up logging -_logger = logging.getLogger(__name__) +from e6data_python_connector.strategy import _get_active_strategy, _set_active_strategy, _set_pending_strategy, \ + _get_grpc_header as _get_strategy_header def _get_grpc_header(engine_ip=None, cluster=None, strategy=None): @@ -140,7 +136,8 @@ class ClusterManager: cluster_uuid (str): The unique identifier for the target cluster. """ - def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5, cluster_uuid=None, grpc_options=None): + def __init__(self, host: str, port: int, user: str, password: str, secure_channel: bool = False, timeout=60 * 5, + cluster_uuid=None, grpc_options=None): """ Initializes a new instance of the ClusterManager class. @@ -208,7 +205,7 @@ def _try_cluster_request(self, request_type, payload=None): The response from the successful request """ current_strategy = _get_active_strategy() - + # Create payload if not provided if payload is None: if request_type == "status": @@ -221,12 +218,10 @@ def _try_cluster_request(self, request_type, payload=None): user=self._user, password=self._password ) - + # If we have an active strategy, use it first if current_strategy is not None: try: - _logger.info(f"ClusterManager: Trying {request_type} with established strategy: {current_strategy}") - if request_type == "status": response = self._get_connection.status( payload, @@ -239,22 +234,20 @@ def _try_cluster_request(self, request_type, payload=None): ) else: raise ValueError(f"Unknown request type: {request_type}") - + # Check for new strategy in response if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != current_strategy: - _logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}") _set_pending_strategy(new_strategy) - + return response - + except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # 456 error - switch to alternative strategy alternative_strategy = 'green' if current_strategy == 'blue' else 'blue' - _logger.info(f"ClusterManager: {request_type} failed with 456 error on {current_strategy}, switching to: {alternative_strategy}") - + try: if request_type == "status": response = self._get_connection.status( @@ -266,36 +259,30 @@ def _try_cluster_request(self, request_type, payload=None): payload, metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=alternative_strategy) ) - + # Update active strategy since the alternative worked _set_active_strategy(alternative_strategy) - _logger.info(f"ClusterManager: {request_type} succeeded with alternative strategy: {alternative_strategy}") - + # Check for new strategy in response if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != alternative_strategy: - _logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}") _set_pending_strategy(new_strategy) - + return response - + except _InactiveRpcError as e2: - _logger.error(f"ClusterManager: Both strategies failed for {request_type}. Original error: {e}, Alternative error: {e2}") raise e # Raise the original error else: # Non-456 error - don't retry - _logger.error(f"ClusterManager: {request_type} failed with non-456 error: {e}") raise e - + # No active strategy - start with authentication logic (blue first, then green) - _logger.info(f"ClusterManager: No active strategy, starting authentication sequence for {request_type}") strategies_to_try = ['blue', 'green'] - + for i, strategy in enumerate(strategies_to_try): try: - _logger.info(f"ClusterManager: Trying {request_type} with strategy: {strategy}") - + if request_type == "status": response = self._get_connection.status( payload, @@ -308,36 +295,30 @@ def _try_cluster_request(self, request_type, payload=None): ) else: raise ValueError(f"Unknown request type: {request_type}") - + # Set the working strategy as active _set_active_strategy(strategy) - _logger.info(f"ClusterManager: {request_type} succeeded with strategy: {strategy}") - + # Check for new strategy in response if hasattr(response, 'new_strategy') and response.new_strategy: new_strategy = response.new_strategy.lower() if new_strategy != strategy: - _logger.info(f"ClusterManager: Server indicated new strategy during {request_type}: {new_strategy}") _set_pending_strategy(new_strategy) - + return response - + except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # 456 error - try next strategy if i < len(strategies_to_try) - 1: - _logger.info(f"ClusterManager: {request_type} failed with 456 error on {strategy}, trying next strategy: {strategies_to_try[i + 1]}") continue else: - _logger.error(f"ClusterManager: {request_type} failed with 456 error on all strategies") raise e else: # Non-456 error - don't retry - _logger.error(f"ClusterManager: {request_type} failed with non-456 error: {e}") raise e - + # If we get here, all strategies failed - _logger.error(f"ClusterManager: All strategies failed for {request_type}") raise e def _check_cluster_status(self): diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 84ab913..298c9f4 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -7,7 +7,6 @@ from __future__ import unicode_literals import datetime -import logging import re import sys import time @@ -32,9 +31,6 @@ threadsafety = 2 # Threads may share the e6xdb and connections. paramstyle = 'pyformat' # Python extended format codes, e.g. ...WHERE name=%(name)s -logging.basicConfig(level=logging.DEBUG) -_logger = logging.getLogger(__name__) - _TIMESTAMP_PATTERN = re.compile(r'(\d+-\d+-\d+ \d+:\d+:\d+(\.\d{,6})?)') ssl_cert_parameter_map = { @@ -121,10 +117,10 @@ def escape_string(self, item): _strategy_manager = None _shared_strategy = None _local_strategy_cache = { - 'active_strategy': None, + 'active_strategy': None, 'last_check_time': 0, 'pending_strategy': None, # Strategy to use for next query - 'query_strategy_map': {}, # Map of query_id to strategy used + 'query_strategy_map': {}, # Map of query_id to strategy used 'last_transition_time': 0, # Timestamp of last strategy transition 'session_invalidated': False # Flag to invalidate all sessions } @@ -132,10 +128,11 @@ def escape_string(self, item): # Strategy cache timeout in seconds (5 minutes) STRATEGY_CACHE_TIMEOUT = 300 + def _get_shared_strategy(): """Get or create the shared strategy storage.""" global _strategy_manager, _shared_strategy - + # Try to use multiprocessing.Manager for process-safe storage try: if _strategy_manager is None: @@ -174,15 +171,15 @@ def _set_active_strategy(strategy): normalized_strategy = strategy.lower() if normalized_strategy not in ['blue', 'green']: return - + with _strategy_lock: shared_strategy = _get_shared_strategy() current_time = time.time() - + # Only update transition time if strategy actually changed if shared_strategy['active_strategy'] != normalized_strategy: shared_strategy['last_transition_time'] = current_time - + shared_strategy['active_strategy'] = normalized_strategy shared_strategy['last_check_time'] = current_time @@ -204,11 +201,11 @@ def _set_pending_strategy(strategy): normalized_strategy = strategy.lower() if normalized_strategy not in ['blue', 'green']: return - + with _strategy_lock: shared_strategy = _get_shared_strategy() current_active = shared_strategy['active_strategy'] - + if normalized_strategy != current_active: shared_strategy['pending_strategy'] = normalized_strategy @@ -221,13 +218,13 @@ def _apply_pending_strategy(): old_strategy = shared_strategy['active_strategy'] new_strategy = shared_strategy['pending_strategy'] current_time = time.time() - + shared_strategy['active_strategy'] = new_strategy shared_strategy['pending_strategy'] = None shared_strategy['last_check_time'] = current_time shared_strategy['last_transition_time'] = current_time shared_strategy['session_invalidated'] = True # Invalidate all sessions - + return new_strategy return None @@ -247,7 +244,7 @@ def _register_query_strategy(query_id, strategy): normalized_strategy = strategy.lower() if normalized_strategy not in ['blue', 'green']: return - + with _strategy_lock: shared_strategy = _get_shared_strategy() query_map = shared_strategy.get('query_strategy_map', {}) @@ -407,7 +404,8 @@ def _get_grpc_options(self): "keepalive_permit_without_calls": 1, # Allow keep-alives with no active RPCs. "http2.max_pings_without_data": 0, # Unlimited pings without data. "http2.min_time_between_pings_ms": 15000, # Minimum time between pings (15 seconds). - "http2.min_ping_interval_without_data_ms": 15000, # Minimum interval between pings without data (15 seconds). + "http2.min_ping_interval_without_data_ms": 15000, + # Minimum interval between pings without data (15 seconds). } if grpc_options: for key, value in grpc_options.items(): @@ -481,7 +479,7 @@ def get_session_id(self): self._create_client() # Clear the invalidation flag shared_strategy['session_invalidated'] = False - + # Only create fresh connection if we have no active queries elif self._session_id and pending_strategy and pending_strategy != active_strategy: query_map = shared_strategy.get('query_strategy_map', {}) @@ -531,7 +529,7 @@ def get_session_id(self): elif pending_strategy: # If there's a pending strategy, force re-authentication with new strategy active_strategy = None - + if not active_strategy: # Check if we have a pending strategy to use if pending_strategy: @@ -555,7 +553,8 @@ def get_session_id(self): _set_active_strategy(strategy) # Check for new strategy in authenticate response - if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy: + if hasattr(authenticate_response, + 'new_strategy') and authenticate_response.new_strategy: new_strategy = authenticate_response.new_strategy.lower() if new_strategy != strategy: _set_pending_strategy(new_strategy) @@ -569,11 +568,11 @@ def get_session_id(self): else: # Different error, handle it normally raise e - + if not self._session_id and last_error: # Neither strategy worked raise last_error - + if not self._session_id: raise ValueError("Invalid credentials.") except _InactiveRpcError as e: @@ -671,7 +670,7 @@ def check_strategy_change(self): pending_strategy = shared_strategy.get('pending_strategy') active_strategy = shared_strategy.get('active_strategy') query_map = shared_strategy.get('query_strategy_map', {}) - + if pending_strategy and pending_strategy != active_strategy and len(query_map) == 0: _apply_pending_strategy() # Force new authentication with new strategy @@ -690,11 +689,11 @@ def _should_create_new_connection(self): pending_strategy = shared_strategy.get('pending_strategy') active_strategy = shared_strategy.get('active_strategy') query_map = shared_strategy.get('query_strategy_map', {}) - + # Create new connection if: # 1. No session exists # 2. There's a pending strategy change and no active queries - return (not self._session_id or + return (not self._session_id or (pending_strategy and pending_strategy != active_strategy and len(query_map) == 0)) def clear(self, query_id, engine_ip=None): @@ -714,7 +713,7 @@ def clear(self, query_id, engine_ip=None): clear_request, metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_uuid, strategy=_get_active_strategy()) ) - + # Check for new strategy in clear response if hasattr(clear_response, 'new_strategy') and clear_response.new_strategy: _set_pending_strategy(clear_response.new_strategy) @@ -745,7 +744,7 @@ def query_cancel(self, engine_ip, query_id): cancel_query_request, metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_uuid, strategy=_get_active_strategy()) ) - + # Check for new strategy in cancel response if hasattr(cancel_response, 'new_strategy') and cancel_response.new_strategy: _set_pending_strategy(cancel_response.new_strategy) @@ -791,7 +790,7 @@ def get_tables(self, catalog, database): get_table_request, metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy()) ) - + # Check for new strategy in get tables response if hasattr(get_table_response, 'new_strategy') and get_table_response.new_strategy: _set_pending_strategy(get_table_response.new_strategy) @@ -819,7 +818,7 @@ def get_columns(self, catalog, database, table): get_columns_request, metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy()) ) - + # Check for new strategy in get columns response if hasattr(get_columns_response, 'new_strategy') and get_columns_response.new_strategy: _set_pending_strategy(get_columns_response.new_strategy) @@ -843,11 +842,11 @@ def get_schema_names(self, catalog): get_schema_request, metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy()) ) - + # Check for new strategy in get schema names response if hasattr(get_schema_response, 'new_strategy') and get_schema_response.new_strategy: _set_pending_strategy(get_schema_response.new_strategy) - + return list(get_schema_response.schemas) def commit(self): @@ -1072,7 +1071,7 @@ def clear(self, query_id=None): """ if not query_id: query_id = self._query_id - + clear_request = e6x_engine_pb2.ClearOrCancelQueryRequest( sessionId=self.connection.get_session_id, queryId=query_id, @@ -1081,7 +1080,7 @@ def clear(self, query_id=None): # Get fresh client after session access (may have been invalidated) client = self.connection.client clear_response = client.clearOrCancelQuery(clear_request, metadata=self.metadata) - + # Check for new strategy in clear response if hasattr(clear_response, 'new_strategy') and clear_response.new_strategy: _set_pending_strategy(clear_response.new_strategy) @@ -1094,7 +1093,7 @@ def clear(self, query_id=None): shared_strategy = _get_shared_strategy() pending_strategy = shared_strategy.get('pending_strategy') query_map = shared_strategy.get('query_strategy_map', {}) - + if pending_strategy and len(query_map) == 0: _apply_pending_strategy() @@ -1117,7 +1116,7 @@ def cancel(self, query_id): shared_strategy = _get_shared_strategy() pending_strategy = shared_strategy.get('pending_strategy') query_map = shared_strategy.get('query_strategy_map', {}) - + if pending_strategy and len(query_map) == 0: _apply_pending_strategy() @@ -1137,11 +1136,11 @@ def status(self, query_id): engineIP=self._engine_ip ) status_response = self.connection.client.status(status_request, metadata=self.metadata) - + # Check for new strategy in status response if hasattr(status_response, 'new_strategy') and status_response.new_strategy: _set_pending_strategy(status_response.new_strategy) - + return status_response @re_auth @@ -1183,18 +1182,17 @@ def execute(self, operation, parameters=None, **kwargs): self._query_id = prepare_statement_response.queryId self._engine_ip = prepare_statement_response.engineIP - # Check for new strategy in prepare response if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy: new_strategy = prepare_statement_response.new_strategy.lower() if new_strategy != _get_active_strategy(): _set_pending_strategy(new_strategy) - + # Register this query with the current strategy current_strategy = _get_active_strategy() if current_strategy: _register_query_strategy(self._query_id, current_strategy) - + execute_statement_request = e6x_engine_pb2.ExecuteStatementRequest( engineIP=self._engine_ip, sessionId=self.connection.get_session_id, @@ -1206,7 +1204,7 @@ def execute(self, operation, parameters=None, **kwargs): execute_statement_request, metadata=self.metadata ) - + # Check for new strategy in execute response if hasattr(execute_response, 'new_strategy') and execute_response.new_strategy: new_strategy = execute_response.new_strategy.lower() @@ -1235,7 +1233,7 @@ def execute(self, operation, parameters=None, **kwargs): new_strategy = prepare_statement_response.new_strategy.lower() if new_strategy != _get_active_strategy(): _set_pending_strategy(new_strategy) - + # Register this query with the current strategy current_strategy = _get_active_strategy() @@ -1288,13 +1286,13 @@ def update_mete_data(self): result_meta_data_request, metadata=self.metadata ) - + # Check for new strategy in metadata response if hasattr(get_result_metadata_response, 'new_strategy') and get_result_metadata_response.new_strategy: new_strategy = get_result_metadata_response.new_strategy.lower() if new_strategy != _get_active_strategy(): _set_pending_strategy(new_strategy) - + buffer = BytesIO(get_result_metadata_response.resultMetaData) self._rowcount, self._query_columns_description = get_query_columns_info(buffer) self._is_metadata_updated = True @@ -1368,13 +1366,13 @@ def fetch_batch(self): get_next_result_batch_request, metadata=self.metadata ) - + # Check for new strategy in batch response if hasattr(get_next_result_batch_response, 'new_strategy') and get_next_result_batch_response.new_strategy: new_strategy = get_next_result_batch_response.new_strategy.lower() if new_strategy != _get_active_strategy(): _set_pending_strategy(new_strategy) - + buffer = get_next_result_batch_response.resultBatch if not self._is_metadata_updated: self.update_mete_data() @@ -1467,11 +1465,11 @@ def explain_analyse(self): explain_analyze_request, metadata=self.metadata ) - + # Check for new strategy in explain analyze response if hasattr(explain_analyze_response, 'new_strategy') and explain_analyze_response.new_strategy: _set_pending_strategy(explain_analyze_response.new_strategy) - + return dict( is_cached=explain_analyze_response.isCached, parsing_time=explain_analyze_response.parsingTime, diff --git a/setup.py b/setup.py index 5feb0a4..82b28dd 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 2, 6,) +VERSION = (2, 3, 6,) def get_long_desc():