Skip to content

Commit 9bc9cc4

Browse files
committed
Replaced PUB socket with XPUB socket
1 parent 327589f commit 9bc9cc4

File tree

3 files changed

+67
-3
lines changed

3 files changed

+67
-3
lines changed

ipykernel/iostream.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import Any
2121

2222
import zmq
23-
from jupyter_client.session import extract_header
23+
from jupyter_client.session import extract_header, Session
2424
from tornado.ioloop import IOLoop
2525
from zmq.eventloop.zmqstream import ZMQStream
2626

@@ -73,12 +73,75 @@ def __init__(self, socket, pipe=False):
7373
self._event_pipe_gc_seconds: float = 10
7474
self._event_pipe_gc_task: asyncio.Task[Any] | None = None
7575
self._setup_event_pipe()
76+
self._setup_xpub_listener()
7677
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
7778
self.thread.daemon = True
7879
self.thread.pydev_do_not_trace = True # type:ignore[attr-defined]
7980
self.thread.is_pydev_daemon_thread = True # type:ignore[attr-defined]
8081
self.thread.name = "IOPub"
8182

83+
def _setup_xpub_listener(self):
84+
"""Setup listener for XPUB subscription events"""
85+
86+
# Checks the socket is not a DummySocket
87+
if not hasattr(self.socket, 'getsockopt'):
88+
return
89+
90+
socket_type = self.socket.getsockopt(zmq.TYPE)
91+
if socket_type == zmq.XPUB:
92+
self._xpub_stream = ZMQStream(self.socket, self.io_loop)
93+
self._xpub_stream.on_recv(self._handle_subscription)
94+
95+
def _handle_subscription(self, frames):
96+
"""Handle subscription/unsubscription events from XPUB socket
97+
98+
XPUB sockets receive:
99+
- subscribe: single frame with b'\\x01' + topic
100+
- unsubscribe: single frame with b'\\x00' + topic
101+
"""
102+
103+
for frame in frames:
104+
event_type = frame[0]
105+
if event_type == 1:
106+
subscription = frame[1:] if len(frame) > 1 else b''
107+
try:
108+
subscription_str = subscription.decode('utf-8')
109+
except UnicodeDecodeError:
110+
continue
111+
self._send_welcome_message(subscription_str)
112+
113+
def _send_welcome_message(self, subscription):
114+
"""Send iopub_welcome message for new subscription
115+
116+
Parameters
117+
----------
118+
subscription : str
119+
The subscription topic (UTF-8 decoded)
120+
"""
121+
122+
content = {
123+
'subscription': subscription
124+
}
125+
126+
header = self.session.msg_header('iopub_welcome')
127+
msg = {
128+
'header': header,
129+
'parent_header': {},
130+
'metadata': {},
131+
'content': content,
132+
'buffers': [],
133+
}
134+
135+
msg_list = self.session.serialize(msg)
136+
137+
if subscription:
138+
identity = subscription.encode('utf-8')
139+
full_msg = [identity] + msg_list
140+
else:
141+
full_msg = msg_list
142+
# Send directly on socket (we're already in IO thread context)
143+
self.socket.send_multipart(full_msg)
144+
82145
def _thread_main(self):
83146
"""The inner loop that's actually run in a thread"""
84147

ipykernel/kernelapp.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,13 @@ def init_control(self, context):
377377

378378
def init_iopub(self, context):
379379
"""Initialize the iopub channel."""
380-
self.iopub_socket = context.socket(zmq.PUB)
380+
self.iopub_socket = context.socket(zmq.XPUB)
381381
self.iopub_socket.linger = 1000
382382
self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
383383
self.log.debug("iopub PUB Channel on port: %i", self.iopub_port)
384384
self.configure_tornado_logger()
385385
self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
386+
self.iopub_thread.session = self.session
386387
self.iopub_thread.start()
387388
# backward-compat: wrap iopub socket API in background thread
388389
self.iopub_socket = self.iopub_thread.background_socket

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class KernelMixin:
4848

4949
def _initialize(self):
5050
self.context = context = zmq.Context()
51-
self.iopub_socket = context.socket(zmq.PUB)
51+
self.iopub_socket = context.socket(zmq.XPUB)
5252
self.stdin_socket = context.socket(zmq.ROUTER)
5353
self.session = Session()
5454
self.test_sockets = [self.iopub_socket]

0 commit comments

Comments
 (0)