@@ -205,6 +205,9 @@ def _default_ident(self):
205205 # see https://github.com/jupyterlab/jupyterlab/issues/17785
206206 _parent_ident : Mapping [str , bytes ]
207207
208+ # Asyncio lock for main shell thread.
209+ _main_asyncio_lock : asyncio .Lock
210+
208211 @property
209212 def _parent_header (self ):
210213 warnings .warn (
@@ -327,6 +330,8 @@ def __init__(self, **kwargs):
327330 }
328331 )
329332
333+ self ._main_asyncio_lock = asyncio .Lock ()
334+
330335 async def dispatch_control (self , msg ):
331336 """Dispatch a control request, ensuring only one message is processed at a time."""
332337 # Ensure only one control message is processed at a time
@@ -649,56 +654,53 @@ async def shell_channel_thread_main(self, msg):
649654 """Handler for shell messages received on shell_channel_thread"""
650655 assert threading .current_thread () == self .shell_channel_thread
651656
652- if self .session is None :
653- return
657+ async with self .shell_channel_thread .asyncio_lock :
658+ if self .session is None :
659+ return
654660
655- # deserialize only the header to get subshell_id
656- # Keep original message to send to subshell_id unmodified.
657- _ , msg2 = self .session .feed_identities (msg , copy = False )
658- try :
659- msg3 = self .session .deserialize (msg2 , content = False , copy = False )
660- subshell_id = msg3 ["header" ].get ("subshell_id" )
661-
662- # Find inproc pair socket to use to send message to correct subshell.
663- subshell_manager = self .shell_channel_thread .manager
664- socket = subshell_manager .get_shell_channel_to_subshell_socket (subshell_id )
665- assert socket is not None
666- socket .send_multipart (msg , copy = False )
667- except Exception :
668- self .log .error ("Invalid message" , exc_info = True ) # noqa: G201
661+ # deserialize only the header to get subshell_id
662+ # Keep original message to send to subshell_id unmodified.
663+ _ , msg2 = self .session .feed_identities (msg , copy = False )
664+ try :
665+ msg3 = self .session .deserialize (msg2 , content = False , copy = False )
666+ subshell_id = msg3 ["header" ].get ("subshell_id" )
667+
668+ # Find inproc pair socket to use to send message to correct subshell.
669+ subshell_manager = self .shell_channel_thread .manager
670+ socket = subshell_manager .get_shell_channel_to_subshell_socket (subshell_id )
671+ assert socket is not None
672+ socket .send_multipart (msg , copy = False )
673+ except Exception :
674+ self .log .error ("Invalid message" , exc_info = True ) # noqa: G201
669675
670- if self .shell_stream :
671- self .shell_stream .flush ()
676+ if self .shell_stream :
677+ self .shell_stream .flush ()
672678
673679 async def shell_main (self , subshell_id : str | None , msg ):
674680 """Handler of shell messages for a single subshell"""
675681 if self ._supports_kernel_subshells :
676682 if subshell_id is None :
677683 assert threading .current_thread () == threading .main_thread ()
684+ asyncio_lock = self ._main_asyncio_lock
678685 else :
679686 assert threading .current_thread () not in (
680687 self .shell_channel_thread ,
681688 threading .main_thread (),
682689 )
683- socket_pair = self .shell_channel_thread .manager .get_shell_channel_to_subshell_pair (
684- subshell_id
685- )
690+ asyncio_lock = self .shell_channel_thread .manager .get_subshell_asyncio_lock (
691+ subshell_id
692+ )
686693 else :
687694 assert subshell_id is None
688695 assert threading .current_thread () == threading .main_thread ()
689- socket_pair = None
696+ asyncio_lock = self . _main_asyncio_lock
690697
691- try :
692- # Whilst executing a shell message, do not accept any other shell messages on the
693- # same subshell, so that cells are run sequentially. Without this we can run multiple
694- # async cells at the same time which would be a nice feature to have but is an API
695- # change.
696- if socket_pair :
697- socket_pair .pause_on_recv ()
698+ # Whilst executing a shell message, do not accept any other shell messages on the
699+ # same subshell, so that cells are run sequentially. Without this we can run multiple
700+ # async cells at the same time which would be a nice feature to have but is an API
701+ # change.
702+ async with asyncio_lock :
698703 await self .dispatch_shell (msg , subshell_id = subshell_id )
699- finally :
700- if socket_pair :
701- socket_pair .resume_on_recv ()
702704
703705 def record_ports (self , ports ):
704706 """Record the ports that this kernel is using.
0 commit comments