@@ -366,8 +366,9 @@ def should_handle(self, stream, msg, idents):
366366 return False
367367 return True
368368
369- async def dispatch_shell (self , idents , msg ):
369+ async def dispatch_shell (self , * args ):
370370 """dispatch shell requests"""
371+ idents , msg = args
371372
372373 # flush control queue before handling shell requests
373374 await self ._flush_control_queue ()
@@ -446,13 +447,13 @@ async def process_one(self, wait=True):
446447 Returns None if no message was handled.
447448 """
448449 if wait :
449- t , dispatch , idents , msg = await self .msg_queue .async_q .get ()
450+ t , dispatch , args = await self .msg_queue .async_q .get ()
450451 else :
451452 try :
452- t , dispatch , idents , msg = self .msg_queue .async_q .get_nowait ()
453+ t , dispatch , args = self .msg_queue .async_q .get_nowait ()
453454 except (asyncio .QueueEmpty , QueueEmpty ):
454455 return None
455- await dispatch (idents , msg )
456+ await dispatch (* args )
456457
457458 async def dispatch_queue (self ):
458459 """Coroutine to preserve order of message handling
@@ -476,28 +477,33 @@ async def dispatch_queue(self):
476477 def _message_counter_default (self ):
477478 return itertools .count ()
478479
479- def schedule_dispatch (self , dispatch , msg ):
480+ def schedule_dispatch (self , dispatch , * args ):
480481 """schedule a message for dispatch"""
481482 idx = next (self ._message_counter )
482-
483- idents , msg = self .session .feed_identities (msg , copy = False )
484- try :
485- msg = self .session .deserialize (msg , content = True , copy = False )
486- except Exception :
487- self .log .error ("Invalid Shell Message" , exc_info = True )
488- return
483+ if args :
484+ idents , msg = self .session .feed_identities (args [0 ], copy = False )
485+ try :
486+ msg = self .session .deserialize (msg , content = True , copy = False )
487+ except Exception :
488+ self .log .error ("Invalid Shell Message" , exc_info = True )
489+ return
490+ else :
491+ idents , msg = None , {}
489492
490493 shell_id = msg .get ("metadata" , {}).get ("shell_id" )
491494
492495 if shell_id :
493496 self .subshell_queues [shell_id ].put ((idents , msg ))
494497 else :
498+ if not msg :
499+ args = ()
500+ else :
501+ args = (idents , msg )
495502 self .msg_queue .sync_q .put (
496503 (
497504 idx ,
498505 dispatch ,
499- idents ,
500- msg ,
506+ args ,
501507 )
502508 )
503509 # ensure the eventloop wakes up
@@ -1092,7 +1098,7 @@ async def stop_aborting():
10921098
10931099 # if we have a delay, give messages this long to arrive on the queue
10941100 # before we stop aborting requests
1095- asyncio . get_event_loop () .call_later (self .stop_on_error_timeout , schedule_stop_aborting )
1101+ self . shell_thread . io_loop .call_later (self .stop_on_error_timeout , schedule_stop_aborting )
10961102
10971103 def _send_abort_reply (self , stream , msg , idents ):
10981104 """Send a reply to an aborted request"""
0 commit comments