Skip to content

Commit 3106e2a

Browse files
committed
Use zmq stream not socket to send messages to shell channel thread
1 parent c90ab05 commit 3106e2a

File tree

4 files changed

+26
-10
lines changed

4 files changed

+26
-10
lines changed

ipykernel/kernelbase.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ async def dispatch_shell(self, msg, /, subshell_id: str | None = None):
387387
assert msg["header"].get("subshell_id") == subshell_id
388388

389389
if self._supports_kernel_subshells:
390-
stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_socket(
390+
stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_stream(
391391
subshell_id
392392
)
393393
else:
@@ -690,7 +690,7 @@ def _publish_status(self, status, channel, parent=None):
690690
def _publish_status_and_flush(self, status, channel, stream, parent=None):
691691
"""send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply"""
692692
self._publish_status(status, channel, parent)
693-
if stream and hasattr(stream, "flush"):
693+
if stream:
694694
stream.flush(zmq.POLLOUT)
695695

696696
def _publish_debug_event(self, event):

ipykernel/socket_pair.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,30 @@ class SocketPair:
1515
"""
1616

1717
from_socket: zmq.Socket[Any]
18+
from_stream: ZMQStream | None = None
1819
to_socket: zmq.Socket[Any]
1920
to_stream: ZMQStream | None = None
2021
on_recv_callback: Any
2122
on_recv_copy: bool
2223

23-
def __init__(self, context: zmq.Context[Any], name: str):
24+
def __init__(
25+
self,
26+
context: zmq.Context[Any],
27+
name: str,
28+
from_io_loop: IOLoop | None = None,
29+
):
2430
self.from_socket = context.socket(zmq.PAIR)
2531
self.to_socket = context.socket(zmq.PAIR)
2632
address = self._address(name)
2733
self.from_socket.bind(address)
28-
self.to_socket.connect(address) # Or do I need to do this in another thread?
34+
self.to_socket.connect(address)
35+
36+
if from_io_loop is not None:
37+
self.from_stream = ZMQStream(self.from_socket, from_io_loop)
2938

3039
def close(self):
40+
if self.from_stream is not None:
41+
self.from_stream.close()
3142
self.from_socket.close()
3243

3344
if self.to_stream is not None:

ipykernel/subshell.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""A thread for a subshell."""
2+
from __future__ import annotations
23

34
from typing import Any
45

@@ -21,7 +22,7 @@ def __init__(
2122
super().__init__(name=f"subshell-{subshell_id}", **kwargs)
2223

2324
self.shell_channel_to_subshell = SocketPair(context, subshell_id)
24-
self.subshell_to_shell_channel = SocketPair(context, subshell_id + "-reverse")
25+
self.subshell_to_shell_channel = SocketPair(context, subshell_id + "-reverse", self.io_loop)
2526

2627
# When aborting flag is set, execute_request messages to this subshell will be aborted.
2728
self.aborting = False

ipykernel/subshell_manager.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import zmq
1111
from tornado.ioloop import IOLoop
12+
from zmq.eventloop.zmqstream import ZMQStream
1213

1314
from .socket_pair import SocketPair
1415
from .subshell import SubshellThread
@@ -58,7 +59,7 @@ def __init__(
5859

5960
# Inproc socket pair for communication from main thread to shell channel thread.
6061
# such as for execute_reply messages.
61-
self._main_to_shell_channel = SocketPair(self._context, "main-reverse")
62+
self._main_to_shell_channel = SocketPair(self._context, "main-reverse", IOLoop.current())
6263
self._main_to_shell_channel.on_recv(
6364
self._shell_channel_io_loop, self._send_on_shell_channel
6465
)
@@ -84,11 +85,14 @@ def get_shell_channel_to_subshell_pair(self, subshell_id: str | None) -> SocketP
8485
with self._lock_cache:
8586
return self._cache[subshell_id].shell_channel_to_subshell
8687

87-
def get_subshell_to_shell_channel_socket(self, subshell_id: str | None) -> zmq.Socket[t.Any]:
88+
def get_subshell_to_shell_channel_stream(self, subshell_id: str | None) -> ZMQStream:
8889
if subshell_id is None:
89-
return self._main_to_shell_channel.from_socket
90-
with self._lock_cache:
91-
return self._cache[subshell_id].subshell_to_shell_channel.from_socket
90+
stream = self._main_to_shell_channel.from_stream
91+
else:
92+
with self._lock_cache:
93+
stream = self._cache[subshell_id].subshell_to_shell_channel.from_stream
94+
assert stream is not None
95+
return stream
9296

9397
def get_shell_channel_to_subshell_socket(self, subshell_id: str | None) -> zmq.Socket[t.Any]:
9498
return self.get_shell_channel_to_subshell_pair(subshell_id).from_socket

0 commit comments

Comments
 (0)