Skip to content

Commit 2150d10

Browse files
committed
Replaced PUB socket with XPUB socket
1 parent 327589f commit 2150d10

File tree

3 files changed

+64
-2
lines changed

3 files changed

+64
-2
lines changed

ipykernel/iostream.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,73 @@ def __init__(self, socket, pipe=False):
7373
self._event_pipe_gc_seconds: float = 10
7474
self._event_pipe_gc_task: asyncio.Task[Any] | None = None
7575
self._setup_event_pipe()
76+
self._setup_xpub_listener()
7677
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
7778
self.thread.daemon = True
7879
self.thread.pydev_do_not_trace = True # type:ignore[attr-defined]
7980
self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined]
8081
self.thread.name = "IOPub"
8182

83+
def _setup_xpub_listener(self):
84+
"""Setup listener for XPUB subscription events"""
85+
86+
# Checks the socket is not a DummySocket
87+
if not hasattr(self.socket, "getsockopt"):
88+
return
89+
90+
socket_type = self.socket.getsockopt(zmq.TYPE)
91+
if socket_type == zmq.XPUB:
92+
self._xpub_stream = ZMQStream(self.socket, self.io_loop)
93+
self._xpub_stream.on_recv(self._handle_subscription)
94+
95+
def _handle_subscription(self, frames):
96+
"""Handle subscription/unsubscription events from XPUB socket
97+
98+
XPUB sockets receive:
99+
- subscribe: single frame with b'\\x01' + topic
100+
- unsubscribe: single frame with b'\\x00' + topic
101+
"""
102+
103+
for frame in frames:
104+
event_type = frame[0]
105+
if event_type == 1:
106+
subscription = frame[1:] if len(frame) > 1 else b""
107+
try:
108+
subscription_str = subscription.decode("utf-8")
109+
except UnicodeDecodeError:
110+
continue
111+
self._send_welcome_message(subscription_str)
112+
113+
def _send_welcome_message(self, subscription):
114+
"""Send iopub_welcome message for new subscription
115+
116+
Parameters
117+
----------
118+
subscription : str
119+
The subscription topic (UTF-8 decoded)
120+
"""
121+
122+
content = {"subscription": subscription}
123+
124+
header = self.session.msg_header("iopub_welcome")
125+
msg = {
126+
"header": header,
127+
"parent_header": {},
128+
"metadata": {},
129+
"content": content,
130+
"buffers": [],
131+
}
132+
133+
msg_list = self.session.serialize(msg)
134+
135+
if subscription:
136+
identity = subscription.encode("utf-8")
137+
full_msg = [identity] + msg_list
138+
else:
139+
full_msg = msg_list
140+
# Send directly on socket (we're already in IO thread context)
141+
self.socket.send_multipart(full_msg)
142+
82143
def _thread_main(self):
83144
"""The inner loop that's actually run in a thread"""
84145

ipykernel/kernelapp.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,13 @@ def init_control(self, context):
377377

378378
def init_iopub(self, context):
379379
"""Initialize the iopub channel."""
380-
self.iopub_socket = context.socket(zmq.PUB)
380+
self.iopub_socket = context.socket(zmq.XPUB)
381381
self.iopub_socket.linger = 1000
382382
self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
383383
self.log.debug("iopub PUB Channel on port: %i", self.iopub_port)
384384
self.configure_tornado_logger()
385385
self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
386+
self.iopub_thread.session = self.session
386387
self.iopub_thread.start()
387388
# backward-compat: wrap iopub socket API in background thread
388389
self.iopub_socket = self.iopub_thread.background_socket

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class KernelMixin:
4848

4949
def _initialize(self):
5050
self.context = context = zmq.Context()
51-
self.iopub_socket = context.socket(zmq.PUB)
51+
self.iopub_socket = context.socket(zmq.XPUB)
5252
self.stdin_socket = context.socket(zmq.ROUTER)
5353
self.session = Session()
5454
self.test_sockets = [self.iopub_socket]

0 commit comments

Comments
 (0)