Skip to content

Commit 395ead6

Browse files
authored
Switching strategy (#68)
* <Vishal> Query isolation.
1 parent 6c1fe14 commit 395ead6

File tree

5 files changed

+68
-58
lines changed

5 files changed

+68
-58
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# e6data Python Connector
22

3-
![version](https://img.shields.io/badge/version-2.3.8-blue.svg)
3+
![version](https://img.shields.io/badge/version-2.3.9-blue.svg)
44

55
## Introduction
66

e6data_python_connector/dialect.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ def create_connect_args(self, url):
229229
self.cluster_name = url.query.get("cluster-name")
230230
self.secure = url.query.get("secure") == "true"
231231
self.auto_resume = url.query.get("auto-resume", "true") == "true" # default to True
232+
self.debug = url.query.get("debug", "false") == "true" # default to True
232233
if not self.catalog_name:
233234
raise Exception('Please specify catalog in query parameter.')
234235

@@ -250,7 +251,8 @@ def create_connect_args(self, url):
250251
"cluster_name": self.cluster_name,
251252
'secure': self.secure,
252253
'auto_resume': self.auto_resume,
253-
'grpc_options': grpc_options
254+
'grpc_options': grpc_options,
255+
'debug': self.debug
254256
}
255257
return [], kwargs
256258

e6data_python_connector/e6data_grpc.py

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -128,26 +128,18 @@ def escape_string(self, item):
128128
# Strategy cache timeout in seconds (5 minutes)
129129
STRATEGY_CACHE_TIMEOUT = 300
130130

131+
# Global set to track debug-enabled connections
132+
_debug_connections = set()
133+
134+
def _strategy_debug_log(message):
135+
"""Log strategy debug messages if any connection has debug enabled."""
136+
if _debug_connections:
137+
print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}")
138+
131139

132140
def _get_shared_strategy():
133141
"""Get or create the shared strategy storage."""
134-
global _strategy_manager, _shared_strategy
135-
136-
# Try to use multiprocessing.Manager for process-safe storage
137-
try:
138-
if _strategy_manager is None:
139-
_strategy_manager = multiprocessing.Manager()
140-
_shared_strategy = _strategy_manager.dict()
141-
_shared_strategy['active_strategy'] = None
142-
_shared_strategy['last_check_time'] = 0
143-
_shared_strategy['pending_strategy'] = None
144-
_shared_strategy['query_strategy_map'] = _strategy_manager.dict()
145-
_shared_strategy['last_transition_time'] = 0
146-
_shared_strategy['session_invalidated'] = False
147-
return _shared_strategy
148-
except:
149-
# Fall back to thread-local storage if Manager fails
150-
return _local_strategy_cache
142+
return _local_strategy_cache
151143

152144

153145
def _get_active_strategy():
@@ -175,10 +167,14 @@ def _set_active_strategy(strategy):
175167
with _strategy_lock:
176168
shared_strategy = _get_shared_strategy()
177169
current_time = time.time()
170+
old_strategy = shared_strategy.get('active_strategy')
178171

179172
# Only update transition time if strategy actually changed
180-
if shared_strategy['active_strategy'] != normalized_strategy:
173+
if old_strategy != normalized_strategy:
181174
shared_strategy['last_transition_time'] = current_time
175+
_strategy_debug_log(f"Setting active strategy: {old_strategy} -> {normalized_strategy}")
176+
else:
177+
_strategy_debug_log(f"Active strategy unchanged: {normalized_strategy}")
182178

183179
shared_strategy['active_strategy'] = normalized_strategy
184180
shared_strategy['last_check_time'] = current_time
@@ -188,6 +184,9 @@ def _clear_strategy_cache():
188184
"""Clear the cached strategy to force re-detection."""
189185
with _strategy_lock:
190186
shared_strategy = _get_shared_strategy()
187+
old_strategy = shared_strategy.get('active_strategy')
188+
if old_strategy:
189+
_strategy_debug_log(f"Clearing strategy cache (was: {old_strategy})")
191190
shared_strategy['active_strategy'] = None
192191
shared_strategy['last_check_time'] = 0
193192
shared_strategy['pending_strategy'] = None
@@ -208,6 +207,8 @@ def _set_pending_strategy(strategy):
208207

209208
if normalized_strategy != current_active:
210209
shared_strategy['pending_strategy'] = normalized_strategy
210+
query_count = len(shared_strategy.get('query_strategy_map', {}))
211+
_strategy_debug_log(f"Setting pending strategy: {normalized_strategy} (current: {current_active}, active queries: {query_count})")
211212

212213

213214
def _apply_pending_strategy():
@@ -219,11 +220,15 @@ def _apply_pending_strategy():
219220
new_strategy = shared_strategy['pending_strategy']
220221
current_time = time.time()
221222

223+
_strategy_debug_log(f"Applying pending strategy: {old_strategy} -> {new_strategy}")
224+
222225
shared_strategy['active_strategy'] = new_strategy
223226
shared_strategy['pending_strategy'] = None
224227
shared_strategy['last_check_time'] = current_time
225228
shared_strategy['last_transition_time'] = current_time
226229
shared_strategy['session_invalidated'] = True # Invalidate all sessions
230+
231+
_strategy_debug_log(f"Strategy transition complete. All sessions invalidated.")
227232

228233
return new_strategy
229234
return None
@@ -250,6 +255,7 @@ def _register_query_strategy(query_id, strategy):
250255
query_map = shared_strategy.get('query_strategy_map', {})
251256
query_map[query_id] = normalized_strategy
252257
shared_strategy['query_strategy_map'] = query_map
258+
_strategy_debug_log(f"Query {query_id} registered with strategy: {normalized_strategy}")
253259

254260

255261
def _get_query_strategy(query_id):
@@ -271,8 +277,11 @@ def _cleanup_query_strategy(query_id):
271277
shared_strategy = _get_shared_strategy()
272278
query_map = shared_strategy.get('query_strategy_map', {})
273279
if query_id in query_map:
280+
strategy = query_map[query_id]
274281
del query_map[query_id]
275282
shared_strategy['query_strategy_map'] = query_map
283+
remaining_queries = len(query_map)
284+
_strategy_debug_log(f"Query {query_id} completed (was using {strategy}). Remaining active queries: {remaining_queries}")
276285

277286

278287
def _get_strategy_debug_info():
@@ -319,6 +328,7 @@ def __init__(
319328
auto_resume: bool = True,
320329
scheme: str = 'e6data',
321330
grpc_options: dict = None,
331+
debug: bool = False,
322332
):
323333
"""
324334
Parameters
@@ -350,6 +360,8 @@ def __init__(
350360
- max_send_message_length: Similar to max_receive_message_length, this parameter sets the maximum allowed size (in bytes) for outgoing messages from the gRPC client
351361
- grpc_prepare_timeout: Timeout for prepare statement API call (default to 10 minutes).
352362
- keepalive_time_ms: This parameter defines the time, in milliseconds, Default to 30 seconds
363+
debug: bool, Optional
364+
Flag to enable debug logging for blue-green deployment strategy changes
353365
"""
354366
if not username or not password:
355367
raise ValueError("username or password cannot be empty.")
@@ -379,6 +391,13 @@ def __init__(
379391
The default maximum time on client side to wait for the cluster to resume is 5 minutes.
380392
"""
381393
self.grpc_auto_resume_timeout_seconds = self._grpc_options.pop('grpc_auto_resume_timeout_seconds')
394+
395+
# Store debug flag and register with debug connections
396+
self._debug = debug
397+
if self._debug:
398+
_debug_connections.add(id(self))
399+
_strategy_debug_log(f"Debug mode enabled for connection {id(self)}")
400+
382401
self._create_client()
383402

384403
@property
@@ -505,6 +524,7 @@ def get_session_id(self):
505524

506525
if active_strategy and not pending_strategy:
507526
# Use cached strategy only if there's no pending strategy
527+
_strategy_debug_log(f"Authenticating with cached strategy: {active_strategy}")
508528
try:
509529
authenticate_response = self._client.authenticate(
510530
authenticate_request,
@@ -518,10 +538,15 @@ def get_session_id(self):
518538
new_strategy = authenticate_response.new_strategy.lower()
519539
if new_strategy != active_strategy:
520540
_set_pending_strategy(new_strategy)
521-
# Don't apply immediately - let new queries use fresh connections
541+
_apply_pending_strategy()
542+
self._session_id = None
543+
self.close()
544+
self._create_client()
545+
return self.get_session_id
522546
except _InactiveRpcError as e:
523547
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
524548
# Strategy changed, clear cache and retry
549+
_strategy_debug_log(f"Got 456 error with strategy {active_strategy}, clearing cache and retrying")
525550
_clear_strategy_cache()
526551
active_strategy = None
527552
else:
@@ -540,8 +565,10 @@ def get_session_id(self):
540565
else:
541566
# Always try blue first, then green if it fails with 456
542567
strategies = ['blue', 'green']
568+
_strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}")
543569
last_error = None
544570
for strategy in strategies:
571+
_strategy_debug_log(f"Attempting authentication with strategy: {strategy}")
545572
try:
546573
authenticate_response = self._client.authenticate(
547574
authenticate_request,
@@ -550,19 +577,24 @@ def get_session_id(self):
550577
self._session_id = authenticate_response.sessionId
551578
if self._session_id:
552579
# Success! Cache this strategy
580+
_strategy_debug_log(f"Authentication successful with strategy: {strategy}")
553581
_set_active_strategy(strategy)
554582

555583
# Check for new strategy in authenticate response
556-
if hasattr(authenticate_response,
557-
'new_strategy') and authenticate_response.new_strategy:
584+
if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy:
558585
new_strategy = authenticate_response.new_strategy.lower()
559586
if new_strategy != strategy:
560587
_set_pending_strategy(new_strategy)
561-
# Don't apply immediately - let new queries use fresh connections
588+
_apply_pending_strategy()
589+
self.close()
590+
self._create_client()
591+
self._session_id = None
592+
return self.get_session_id
562593
break
563594
except _InactiveRpcError as e:
564595
if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details():
565596
# Wrong strategy, try the next one
597+
_strategy_debug_log(f"Strategy {strategy} failed with 456 error, trying next")
566598
last_error = e
567599
continue
568600
else:
@@ -589,21 +621,6 @@ def get_session_id(self):
589621
).resume()
590622
if status:
591623
return self.get_session_id
592-
# authenticate_request = e6x_engine_pb2.AuthenticateRequest(
593-
# user=self.__username,
594-
# password=self.__password
595-
# )
596-
# authenticate_response = self._client.authenticate(
597-
# authenticate_request,
598-
# metadata=_get_grpc_header(cluster=self.cluster_uuid, strategy=_get_active_strategy())
599-
# )
600-
# self._session_id = authenticate_response.sessionId
601-
# # Check for new strategy in authenticate response
602-
# if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy:
603-
# new_strategy = authenticate_response.new_strategy.lower()
604-
# if new_strategy != _get_active_strategy():
605-
# _set_pending_strategy(new_strategy)
606-
# # Don't apply immediately - let new queries use fresh connections
607624
else:
608625
raise e
609626
else:
@@ -649,6 +666,11 @@ def close(self):
649666
self._channel.close()
650667
self._channel = None
651668
self._session_id = None
669+
670+
# Remove from debug connections if debug was enabled
671+
if self._debug:
672+
_debug_connections.discard(id(self))
673+
_strategy_debug_log(f"Debug mode disabled for connection {id(self)}")
652674

653675
def check_connection(self):
654676
"""
@@ -1095,6 +1117,7 @@ def clear(self, query_id=None):
10951117
query_map = shared_strategy.get('query_strategy_map', {})
10961118

10971119
if pending_strategy and len(query_map) == 0:
1120+
_strategy_debug_log(f"Last query cleared, triggering pending strategy transition")
10981121
_apply_pending_strategy()
10991122

11001123
return clear_response
@@ -1118,6 +1141,7 @@ def cancel(self, query_id):
11181141
query_map = shared_strategy.get('query_strategy_map', {})
11191142

11201143
if pending_strategy and len(query_map) == 0:
1144+
_strategy_debug_log(f"Last query cleared, triggering pending strategy transition")
11211145
_apply_pending_strategy()
11221146

11231147
def status(self, query_id):
@@ -1230,6 +1254,7 @@ def execute(self, operation, parameters=None, **kwargs):
12301254

12311255
# Check for new strategy in prepare response
12321256
if hasattr(prepare_statement_response, 'new_strategy') and prepare_statement_response.new_strategy:
1257+
print(f"@@@@@@ New strategy: {prepare_statement_response.new_strategy}")
12331258
new_strategy = prepare_statement_response.new_strategy.lower()
12341259
if new_strategy != _get_active_strategy():
12351260
_set_pending_strategy(new_strategy)

e6data_python_connector/strategy.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,7 @@ def _initialize_shared_state():
4040
with _initialization_lock:
4141
if _shared_strategy is not None:
4242
return _shared_strategy
43-
44-
try:
45-
# Try to create multiprocessing Manager
46-
_manager = multiprocessing.Manager()
47-
_shared_strategy = _manager.dict({
48-
'active_strategy': None,
49-
'last_check_time': 0,
50-
'pending_strategy': None,
51-
'query_strategy_map': _manager.dict(),
52-
'last_transition_time': 0,
53-
'session_invalidated': False
54-
})
55-
_logger.debug("Successfully initialized multiprocessing Manager for strategy sharing")
56-
return _shared_strategy
57-
except Exception as e:
58-
# Fall back to thread-local storage if Manager fails
59-
_logger.warning(f"Failed to initialize multiprocessing Manager: {e}. Using thread-local storage.")
60-
return _local_strategy_cache
43+
return _local_strategy_cache
6144

6245

6346
def _get_shared_strategy():

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
import setuptools
1414

15-
VERSION = (2, 3, 8,)
15+
VERSION = (2, 3, 9,)
1616

1717

1818
def get_long_desc():

0 commit comments

Comments
 (0)