Skip to content

Commit dcfe11d

Browse files
committed
Run control channel in separate thread
1 parent aba2179 commit dcfe11d

File tree

6 files changed

+89
-85
lines changed

6 files changed

+89
-85
lines changed

ipykernel/control.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
from threading import Thread
3+
import zmq
4+
if zmq.pyzmq_version_info() >= (17, 0):
5+
from tornado.ioloop import IOLoop
6+
else:
7+
# deprecated since pyzmq 17
8+
from zmq.eventloop.ioloop import IOLoop
9+
10+
11+
class ControlThread(Thread):
12+
13+
def __init__(self, context):
14+
Thread.__init__(self)
15+
self.context = context
16+
self.io_loop = IOLoop(make_current=False)
17+
18+
def run(self):
19+
self.io_loop.make_current()
20+
self.io_loop.start()
21+
self.io_loop.close(all_fds=True)

ipykernel/eventloops.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,7 @@ def loop_qt4(kernel):
114114

115115
kernel.app = get_app_qt4([" "])
116116
kernel.app.setQuitOnLastWindowClosed(False)
117-
118-
# Only register the eventloop for the shell stream because doing
119-
# it for the control stream is generating a bunch of unnecessary
120-
# warnings on Windows.
121-
_notify_stream_qt(kernel, kernel.shell_streams[0])
117+
_notify_stream_qt(kernel, kernel.shell_stream)
122118

123119
_loop_qt(kernel.app)
124120

@@ -160,10 +156,9 @@ def loop_wx(kernel):
160156

161157
def wake():
162158
"""wake from wx"""
163-
for stream in kernel.shell_streams:
164-
if stream.flush(limit=1):
165-
kernel.app.ExitMainLoop()
166-
return
159+
if kernel.shell_stream.flush(limit=1):
160+
kernel.app.ExitMainLoop()
161+
return
167162

168163
# We have to put the wx.Timer in a wx.Frame for it to fire properly.
169164
# We make the Frame hidden when we create it in the main app below.
@@ -237,13 +232,12 @@ def process_stream_events(stream, *a, **kw):
237232
# For Tkinter, we create a Tk object and call its withdraw method.
238233
kernel.app_wrapper = BasicAppWrapper(app)
239234

240-
for stream in kernel.shell_streams:
241-
notifier = partial(process_stream_events, stream)
242-
# seems to be needed for tk
243-
notifier.__name__ = "notifier"
244-
app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier)
245-
# schedule initial call after start
246-
app.after(0, notifier)
235+
notifier = partial(process_stream_events, shell_stream)
236+
# seems to be needed for tk
237+
notifier.__name__ = "notifier"
238+
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, notifier)
239+
# schedule initial call after start
240+
app.after(0, notifier)
247241

248242
app.mainloop()
249243

@@ -330,10 +324,9 @@ def handle_int(etype, value, tb):
330324
# don't let interrupts during mainloop invoke crash_handler:
331325
sys.excepthook = handle_int
332326
mainloop(kernel._poll_interval)
333-
for stream in kernel.shell_streams:
334-
if stream.flush(limit=1):
335-
# events to process, return control to kernel
336-
return
327+
if kernel_shell_stream.flush(limit=1):
328+
# events to process, return control to kernel
329+
return
337330
except:
338331
raise
339332
except KeyboardInterrupt:
@@ -371,11 +364,9 @@ def process_stream_events(stream):
371364
if stream.flush(limit=1):
372365
loop.stop()
373366

374-
for stream in kernel.shell_streams:
375-
fd = stream.getsockopt(zmq.FD)
376-
notifier = partial(process_stream_events, stream)
377-
loop.add_reader(fd, notifier)
378-
loop.call_soon(notifier)
367+
notifier = partial(process_stream_events, shell_stream)
368+
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
369+
loop.call_soon(notifier)
379370

380371
while True:
381372
error = None

ipykernel/inprocess/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def _dispatch_to_kernel(self, msg):
174174
stream = DummySocket()
175175
self.session.send(stream, msg)
176176
msg_parts = stream.recv_multipart()
177-
kernel.dispatch_shell(stream, msg_parts)
177+
kernel.dispatch_shell(msg_parts)
178178

179179
idents, reply_msg = self.session.recv(stream, copy=False)
180180
self.shell_channel.call_handlers_later(reply_msg)

ipykernel/inprocess/ipkernel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class InProcessKernel(IPythonKernel):
4949
#-------------------------------------------------------------------------
5050

5151
shell_class = Type(allow_none=True)
52-
shell_streams = List()
52+
shell_stream = Any()
5353
control_stream = Any()
5454
_underlying_iopub_socket = Instance(DummySocket, ())
5555
iopub_thread = Instance(IOPubThread)

ipykernel/kernelapp.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
# local imports
3838
from .iostream import IOPubThread
39+
from .control import ControlThread
3940
from .heartbeat import Heartbeat
4041
from .ipkernel import IPythonKernel
4142
from .parentpoller import ParentPollerUnix, ParentPollerWindows
@@ -124,6 +125,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
124125
stdin_socket = Any()
125126
iopub_socket = Any()
126127
iopub_thread = Any()
128+
control_thread = Any()
127129

128130
ports = Dict()
129131

@@ -276,6 +278,17 @@ def init_sockets(self):
276278
self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
277279
self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)
278280

281+
if hasattr(zmq, 'ROUTER_HANDOVER'):
282+
# set router-handover to workaround zeromq reconnect problems
283+
# in certain rare circumstances
284+
# see ipython/ipykernel#270 and zeromq/libzmq#2892
285+
self.shell_socket.router_handover = \
286+
self.stdin_socket.router_handover = 1
287+
288+
self.init_control(context)
289+
self.init_iopub(context)
290+
291+
def init_control(self, context):
279292
self.control_socket = context.socket(zmq.ROUTER)
280293
self.control_socket.linger = 1000
281294
self.control_port = self._bind_socket(self.control_socket, self.control_port)
@@ -285,11 +298,10 @@ def init_sockets(self):
285298
# set router-handover to workaround zeromq reconnect problems
286299
# in certain rare circumstances
287300
# see ipython/ipykernel#270 and zeromq/libzmq#2892
288-
self.shell_socket.router_handover = \
289-
self.control_socket.router_handover = \
290-
self.stdin_socket.router_handover = 1
301+
self.control_socket.router_handover = 1
291302

292-
self.init_iopub(context)
303+
self.control_thread = ControlThread(self.control_socket)
304+
self.control_thread.start()
293305

294306
def init_iopub(self, context):
295307
self.iopub_socket = context.socket(zmq.PUB)
@@ -437,13 +449,13 @@ def init_signal(self):
437449
def init_kernel(self):
438450
"""Create the Kernel object itself"""
439451
shell_stream = ZMQStream(self.shell_socket)
440-
control_stream = ZMQStream(self.control_socket)
452+
control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)
441453

442454
kernel_factory = self.kernel_class.instance
443455

444456
kernel = kernel_factory(parent=self, session=self.session,
445457
control_stream=control_stream,
446-
shell_streams=[shell_stream, control_stream],
458+
shell_stream=shell_stream,
447459
iopub_thread=self.iopub_thread,
448460
iopub_socket=self.iopub_socket,
449461
stdin_socket=self.stdin_socket,

ipykernel/kernelbase.py

Lines changed: 32 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from tornado import ioloop
2323
from tornado import gen
24-
from tornado.queues import PriorityQueue, QueueEmpty
24+
from tornado.queues import Queue, QueueEmpty
2525
import zmq
2626
from zmq.eventloop.zmqstream import ZMQStream
2727

@@ -38,9 +38,6 @@
3838

3939
from ._version import kernel_protocol_version
4040

41-
CONTROL_PRIORITY = 1
42-
SHELL_PRIORITY = 10
43-
4441

4542
class Kernel(SingletonConfigurable):
4643

@@ -60,7 +57,7 @@ def _update_eventloop(self, change):
6057

6158
session = Instance(Session, allow_none=True)
6259
profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
63-
shell_streams = List()
60+
shell_stream = Instance(ZMQStream, allow_none=True)
6461
control_stream = Instance(ZMQStream, allow_none=True)
6562
iopub_socket = Any()
6663
iopub_thread = Any()
@@ -215,7 +212,7 @@ def should_handle(self, stream, msg, idents):
215212
return True
216213

217214
@gen.coroutine
218-
def dispatch_shell(self, stream, msg):
215+
def dispatch_shell(self, msg):
219216
"""dispatch shell requests"""
220217
idents, msg = self.session.feed_identities(msg, copy=False)
221218
try:
@@ -232,11 +229,11 @@ def dispatch_shell(self, stream, msg):
232229

233230
# Only abort execute requests
234231
if self._aborting and msg_type == 'execute_request':
235-
self._send_abort_reply(stream, msg, idents)
232+
self._send_abort_reply(self.shell_stream, msg, idents)
236233
self._publish_status('idle')
237234
# flush to ensure reply is sent before
238235
# handling the next request
239-
stream.flush(zmq.POLLOUT)
236+
self.shell_stream.flush(zmq.POLLOUT)
240237
return
241238

242239
# Print some info about this message and leave a '--->' marker, so it's
@@ -245,7 +242,7 @@ def dispatch_shell(self, stream, msg):
245242
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
246243
self.log.debug(' Content: %s\n --->\n ', msg['content'])
247244

248-
if not self.should_handle(stream, msg, idents):
245+
if not self.should_handle(self.shell_stream, msg, idents):
249246
return
250247

251248
handler = self.shell_handlers.get(msg_type, None)
@@ -258,7 +255,7 @@ def dispatch_shell(self, stream, msg):
258255
except Exception:
259256
self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
260257
try:
261-
yield gen.maybe_future(handler(stream, idents, msg))
258+
yield gen.maybe_future(handler(self.shell_stream, idents, msg))
262259
except Exception:
263260
self.log.error("Exception in message handler:", exc_info=True)
264261
finally:
@@ -272,7 +269,7 @@ def dispatch_shell(self, stream, msg):
272269
self._publish_status('idle')
273270
# flush to ensure reply is sent before
274271
# handling the next request
275-
stream.flush(zmq.POLLOUT)
272+
self.shell_stream.flush(zmq.POLLOUT)
276273

277274
def pre_handler_hook(self):
278275
"""Hook to execute before calling message handler"""
@@ -332,27 +329,22 @@ def do_one_iteration(self):
332329
.. versionchanged:: 5
333330
This is now a coroutine
334331
"""
335-
# flush messages off of shell streams into the message queue
336-
for stream in self.shell_streams:
337-
stream.flush()
338-
# process all messages higher priority than shell (control),
339-
# and at most one shell message per iteration
340-
priority = 0
341-
while priority is not None and priority < SHELL_PRIORITY:
342-
priority = yield self.process_one(wait=False)
332+
# flush messages off of shell stream into the message queue
333+
self.shell_stream.flush()
334+
# process at most one shell message per iteration
335+
yield self.process_one(wait=False)
343336

344337
@gen.coroutine
345338
def process_one(self, wait=True):
346339
"""Process one request
347340
348-
Returns priority of the message handled.
349341
Returns None if no message was handled.
350342
"""
351343
if wait:
352-
priority, t, dispatch, args = yield self.msg_queue.get()
344+
t, dispatch, args = yield self.msg_queue.get()
353345
else:
354346
try:
355-
priority, t, dispatch, args = self.msg_queue.get_nowait()
347+
t, dispatch, args = self.msg_queue.get_nowait()
356348
except QueueEmpty:
357349
return None
358350
yield gen.maybe_future(dispatch(*args))
@@ -377,21 +369,18 @@ def dispatch_queue(self):
377369

378370
_message_counter = Any(
379371
help="""Monotonic counter of messages
380-
381-
Ensures messages of the same priority are handled in arrival order.
382372
""",
383373
)
384374
@default('_message_counter')
385375
def _message_counter_default(self):
386376
return itertools.count()
387377

388-
def schedule_dispatch(self, priority, dispatch, *args):
378+
def schedule_dispatch(self, dispatch, *args):
389379
"""schedule a message for dispatch"""
390380
idx = next(self._message_counter)
391381

392382
self.msg_queue.put_nowait(
393383
(
394-
priority,
395384
idx,
396385
dispatch,
397386
args,
@@ -403,32 +392,24 @@ def schedule_dispatch(self, priority, dispatch, *args):
403392
def start(self):
404393
"""register dispatchers for streams"""
405394
self.io_loop = ioloop.IOLoop.current()
406-
self.msg_queue = PriorityQueue()
395+
self.msg_queue = Queue()
407396
self.io_loop.add_callback(self.dispatch_queue)
408397

398+
self.control_stream.on_recv(
399+
partial(
400+
self.schedule_dispatch,
401+
self.dispatch_control,
402+
),
403+
copy=False,
404+
)
409405

410-
if self.control_stream:
411-
self.control_stream.on_recv(
412-
partial(
413-
self.schedule_dispatch,
414-
CONTROL_PRIORITY,
415-
self.dispatch_control,
416-
),
417-
copy=False,
418-
)
419-
420-
for s in self.shell_streams:
421-
if s is self.control_stream:
422-
continue
423-
s.on_recv(
424-
partial(
425-
self.schedule_dispatch,
426-
SHELL_PRIORITY,
427-
self.dispatch_shell,
428-
s,
429-
),
430-
copy=False,
431-
)
406+
self.shell_stream.on_recv(
407+
partial(
408+
self.schedule_dispatch,
409+
self.dispatch_shell,
410+
),
411+
copy=False,
412+
)
432413

433414
# publish idle status
434415
self._publish_status('starting')
@@ -784,8 +765,7 @@ def _topic(self, topic):
784765

785766
@gen.coroutine
786767
def _abort_queues(self):
787-
for stream in self.shell_streams:
788-
stream.flush()
768+
self.shell_stream.flush()
789769
self._aborting = True
790770

791771
def stop_aborting(f):
@@ -909,4 +889,4 @@ def _at_shutdown(self):
909889
if self._shutdown_message is not None:
910890
self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
911891
self.log.debug("%s", self._shutdown_message)
912-
[ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
892+
self.shell_stream.flush(zmq.POLLOUT)

0 commit comments

Comments
 (0)