From d2e306ed69f3d33891eaa673639306adce5ebb13 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Wed, 27 Jan 2021 23:30:07 +0100 Subject: [PATCH 01/12] Run control channel in separate thread --- ipykernel/control.py | 21 +++++++++ ipykernel/eventloops.py | 41 +++++++--------- ipykernel/inprocess/client.py | 5 +- ipykernel/inprocess/ipkernel.py | 4 +- ipykernel/kernelapp.py | 24 +++++++--- ipykernel/kernelbase.py | 84 +++++++++++++-------------------- 6 files changed, 91 insertions(+), 88 deletions(-) create mode 100644 ipykernel/control.py diff --git a/ipykernel/control.py b/ipykernel/control.py new file mode 100644 index 000000000..d06d9ea34 --- /dev/null +++ b/ipykernel/control.py @@ -0,0 +1,21 @@ + +from threading import Thread +import zmq +if zmq.pyzmq_version_info() >= (17, 0): + from tornado.ioloop import IOLoop +else: + # deprecated since pyzmq 17 + from zmq.eventloop.ioloop import IOLoop + + +class ControlThread(Thread): + + def __init__(self, context): + Thread.__init__(self) + self.context = context + self.io_loop = IOLoop(make_current=False) + + def run(self): + self.io_loop.make_current() + self.io_loop.start() + self.io_loop.close(all_fds=True) diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index 087fad7fc..a71121bdd 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -114,11 +114,7 @@ def loop_qt4(kernel): kernel.app = get_app_qt4([" "]) kernel.app.setQuitOnLastWindowClosed(False) - - # Only register the eventloop for the shell stream because doing - # it for the control stream is generating a bunch of unnecessary - # warnings on Windows. - _notify_stream_qt(kernel, kernel.shell_streams[0]) + _notify_stream_qt(kernel, kernel.shell_stream) _loop_qt(kernel.app) @@ -160,10 +156,9 @@ def loop_wx(kernel): def wake(): """wake from wx""" - for stream in kernel.shell_streams: - if stream.flush(limit=1): - kernel.app.ExitMainLoop() - return + if kernel.shell_stream.flush(limit=1): + kernel.app.ExitMainLoop() + return # We have to put the wx.Timer in a wx.Frame for it to fire properly. # We make the Frame hidden when we create it in the main app below. @@ -237,13 +232,12 @@ def process_stream_events(stream, *a, **kw): # For Tkinter, we create a Tk object and call its withdraw method. kernel.app_wrapper = BasicAppWrapper(app) - for stream in kernel.shell_streams: - notifier = partial(process_stream_events, stream) - # seems to be needed for tk - notifier.__name__ = "notifier" - app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier) - # schedule initial call after start - app.after(0, notifier) + notifier = partial(process_stream_events, shell_stream) + # seems to be needed for tk + notifier.__name__ = "notifier" + app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, notifier) + # schedule initial call after start + app.after(0, notifier) app.mainloop() @@ -330,10 +324,9 @@ def handle_int(etype, value, tb): # don't let interrupts during mainloop invoke crash_handler: sys.excepthook = handle_int mainloop(kernel._poll_interval) - for stream in kernel.shell_streams: - if stream.flush(limit=1): - # events to process, return control to kernel - return + if kernel_shell_stream.flush(limit=1): + # events to process, return control to kernel + return except: raise except KeyboardInterrupt: @@ -371,11 +364,9 @@ def process_stream_events(stream): if stream.flush(limit=1): loop.stop() - for stream in kernel.shell_streams: - fd = stream.getsockopt(zmq.FD) - notifier = partial(process_stream_events, stream) - loop.add_reader(fd, notifier) - loop.call_soon(notifier) + notifier = partial(process_stream_events, shell_stream) + loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier) + loop.call_soon(notifier) while True: error = None diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 5de4f774d..e6b3b358a 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -12,7 +12,6 @@ #----------------------------------------------------------------------------- # IPython imports -from ipykernel.inprocess.socket import DummySocket from traitlets import Type, Instance, default from jupyter_client.clientabc import KernelClientABC from jupyter_client.client import KernelClient @@ -171,10 +170,10 @@ def _dispatch_to_kernel(self, msg): if kernel is None: raise RuntimeError('Cannot send request. No kernel exists.') - stream = DummySocket() + stream = kernel.shell_stream self.session.send(stream, msg) msg_parts = stream.recv_multipart() - kernel.dispatch_shell(stream, msg_parts) + kernel.dispatch_shell(msg_parts) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index 3b51c8e5b..dd1bc6581 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -49,11 +49,11 @@ class InProcessKernel(IPythonKernel): #------------------------------------------------------------------------- shell_class = Type(allow_none=True) - shell_streams = List() - control_stream = Any() _underlying_iopub_socket = Instance(DummySocket, ()) iopub_thread = Instance(IOPubThread) + shell_stream = Instance(DummySocket, ()) + @default('iopub_thread') def _default_iopub_thread(self): thread = IOPubThread(self._underlying_iopub_socket) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index ee691508e..6bda6f29a 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -36,6 +36,7 @@ # local imports from .iostream import IOPubThread +from .control import ControlThread from .heartbeat import Heartbeat from .ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows @@ -124,6 +125,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, stdin_socket = Any() iopub_socket = Any() iopub_thread = Any() + control_thread = Any() ports = Dict() @@ -276,6 +278,17 @@ def init_sockets(self): self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) + if hasattr(zmq, 'ROUTER_HANDOVER'): + # set router-handover to workaround zeromq reconnect problems + # in certain rare circumstances + # see ipython/ipykernel#270 and zeromq/libzmq#2892 + self.shell_socket.router_handover = \ + self.stdin_socket.router_handover = 1 + + self.init_control(context) + self.init_iopub(context) + + def init_control(self, context): self.control_socket = context.socket(zmq.ROUTER) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) @@ -285,11 +298,10 @@ def init_sockets(self): # set router-handover to workaround zeromq reconnect problems # in certain rare circumstances # see ipython/ipykernel#270 and zeromq/libzmq#2892 - self.shell_socket.router_handover = \ - self.control_socket.router_handover = \ - self.stdin_socket.router_handover = 1 + self.control_socket.router_handover = 1 - self.init_iopub(context) + self.control_thread = ControlThread(self.control_socket) + self.control_thread.start() def init_iopub(self, context): self.iopub_socket = context.socket(zmq.PUB) @@ -437,13 +449,13 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" shell_stream = ZMQStream(self.shell_socket) - control_stream = ZMQStream(self.control_socket) + control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) kernel_factory = self.kernel_class.instance kernel = kernel_factory(parent=self, session=self.session, control_stream=control_stream, - shell_streams=[shell_stream, control_stream], + shell_stream=shell_stream, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index aba3223a9..67dd939fe 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -21,7 +21,7 @@ from tornado import ioloop from tornado import gen -from tornado.queues import PriorityQueue, QueueEmpty +from tornado.queues import Queue, QueueEmpty import zmq from zmq.eventloop.zmqstream import ZMQStream @@ -38,9 +38,6 @@ from ._version import kernel_protocol_version -CONTROL_PRIORITY = 1 -SHELL_PRIORITY = 10 - class Kernel(SingletonConfigurable): @@ -60,7 +57,7 @@ def _update_eventloop(self, change): session = Instance(Session, allow_none=True) profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True) - shell_streams = List() + shell_stream = Instance(ZMQStream, allow_none=True) control_stream = Instance(ZMQStream, allow_none=True) iopub_socket = Any() iopub_thread = Any() @@ -215,7 +212,7 @@ def should_handle(self, stream, msg, idents): return True @gen.coroutine - def dispatch_shell(self, stream, msg): + def dispatch_shell(self, msg): """dispatch shell requests""" idents, msg = self.session.feed_identities(msg, copy=False) try: @@ -232,11 +229,11 @@ def dispatch_shell(self, stream, msg): # Only abort execute requests if self._aborting and msg_type == 'execute_request': - self._send_abort_reply(stream, msg, idents) + self._send_abort_reply(self.shell_stream, msg, idents) self._publish_status('idle') # flush to ensure reply is sent before # handling the next request - stream.flush(zmq.POLLOUT) + self.shell_stream.flush(zmq.POLLOUT) return # Print some info about this message and leave a '--->' marker, so it's @@ -245,7 +242,7 @@ def dispatch_shell(self, stream, msg): self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) self.log.debug(' Content: %s\n --->\n ', msg['content']) - if not self.should_handle(stream, msg, idents): + if not self.should_handle(self.shell_stream, msg, idents): return handler = self.shell_handlers.get(msg_type, None) @@ -258,7 +255,7 @@ def dispatch_shell(self, stream, msg): except Exception: self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) try: - yield gen.maybe_future(handler(stream, idents, msg)) + yield gen.maybe_future(handler(self.shell_stream, idents, msg)) except Exception: self.log.error("Exception in message handler:", exc_info=True) finally: @@ -272,7 +269,7 @@ def dispatch_shell(self, stream, msg): self._publish_status('idle') # flush to ensure reply is sent before # handling the next request - stream.flush(zmq.POLLOUT) + self.shell_stream.flush(zmq.POLLOUT) def pre_handler_hook(self): """Hook to execute before calling message handler""" @@ -332,27 +329,22 @@ def do_one_iteration(self): .. versionchanged:: 5 This is now a coroutine """ - # flush messages off of shell streams into the message queue - for stream in self.shell_streams: - stream.flush() - # process all messages higher priority than shell (control), - # and at most one shell message per iteration - priority = 0 - while priority is not None and priority < SHELL_PRIORITY: - priority = yield self.process_one(wait=False) + # flush messages off of shell stream into the message queue + self.shell_stream.flush() + # process at most one shell message per iteration + yield self.process_one(wait=False) @gen.coroutine def process_one(self, wait=True): """Process one request - Returns priority of the message handled. Returns None if no message was handled. """ if wait: - priority, t, dispatch, args = yield self.msg_queue.get() + t, dispatch, args = yield self.msg_queue.get() else: try: - priority, t, dispatch, args = self.msg_queue.get_nowait() + t, dispatch, args = self.msg_queue.get_nowait() except QueueEmpty: return None yield gen.maybe_future(dispatch(*args)) @@ -377,21 +369,18 @@ def dispatch_queue(self): _message_counter = Any( help="""Monotonic counter of messages - - Ensures messages of the same priority are handled in arrival order. """, ) @default('_message_counter') def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, priority, dispatch, *args): + def schedule_dispatch(self, dispatch, *args): """schedule a message for dispatch""" idx = next(self._message_counter) self.msg_queue.put_nowait( ( - priority, idx, dispatch, args, @@ -403,32 +392,24 @@ def schedule_dispatch(self, priority, dispatch, *args): def start(self): """register dispatchers for streams""" self.io_loop = ioloop.IOLoop.current() - self.msg_queue = PriorityQueue() + self.msg_queue = Queue() self.io_loop.add_callback(self.dispatch_queue) + self.control_stream.on_recv( + partial( + self.schedule_dispatch, + self.dispatch_control, + ), + copy=False, + ) - if self.control_stream: - self.control_stream.on_recv( - partial( - self.schedule_dispatch, - CONTROL_PRIORITY, - self.dispatch_control, - ), - copy=False, - ) - - for s in self.shell_streams: - if s is self.control_stream: - continue - s.on_recv( - partial( - self.schedule_dispatch, - SHELL_PRIORITY, - self.dispatch_shell, - s, - ), - copy=False, - ) + self.shell_stream.on_recv( + partial( + self.schedule_dispatch, + self.dispatch_shell, + ), + copy=False, + ) # publish idle status self._publish_status('starting') @@ -784,8 +765,7 @@ def _topic(self, topic): @gen.coroutine def _abort_queues(self): - for stream in self.shell_streams: - stream.flush() + self.shell_stream.flush() self._aborting = True def stop_aborting(f): @@ -909,4 +889,4 @@ def _at_shutdown(self): if self._shutdown_message is not None: self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message) - [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] + self.shell_stream.flush(zmq.POLLOUT) From 3fe14eecf1c3aadf561db3c387466dc02a9352c4 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Fri, 5 Feb 2021 16:48:17 +0100 Subject: [PATCH 02/12] Per-channel parent header --- ipykernel/ipkernel.py | 6 +++--- ipykernel/kernelbase.py | 28 ++++++++++++++-------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 661627352..62628971a 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -145,11 +145,11 @@ def start(self): self.shell.exit_now = False super(IPythonKernel, self).start() - def set_parent(self, ident, parent): + def set_parent(self, ident, parent, channel='shell'): """Overridden from parent to tell the display hook and output streams about the parent message. """ - super(IPythonKernel, self).set_parent(ident, parent) + super(IPythonKernel, self).set_parent(ident, parent, channel) self.shell.set_parent(parent) def init_metadata(self, parent): @@ -509,7 +509,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata): reply_content['engine_info'] = e_info self.send_response(self.iopub_socket, 'error', reply_content, - ident=self._topic('error')) + ident=self._topic('error'), channel='shell') self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) result_buf = [] reply_content['status'] = 'error' diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 67dd939fe..fb74206da 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -90,8 +90,8 @@ def _default_ident(self): # track associations with current request _allow_stdin = Bool(False) - _parent_header = Dict() - _parent_ident = Any(b'') + _parent_header = Dict({'shell': {}, 'control': {}}) + _parent_ident = Dict({'shell': b'', 'control': b''}) # Time to sleep after flushing the stdout/err buffers in each execute # cycle. While this introduces a hard limit on the minimal latency of the # execute cycle, it helps prevent output synchronization problems for @@ -176,7 +176,7 @@ def dispatch_control(self, msg): self.log.debug("Control received: %s", msg) # Set the parent message for side effects. - self.set_parent(idents, msg) + self.set_parent(idents, msg, channel='control') self._publish_status('busy') header = msg['header'] @@ -222,7 +222,7 @@ def dispatch_shell(self, msg): return # Set the parent message for side effects. - self.set_parent(idents, msg) + self.set_parent(idents, msg, channel='shell') self._publish_status('busy') msg_type = msg['header']['msg_type'] @@ -440,11 +440,11 @@ def _publish_status(self, status, parent=None): self.session.send(self.iopub_socket, 'status', {'execution_state': status}, - parent=parent or self._parent_header, + parent=parent or self._parent_header['shell'], ident=self._topic('status'), ) - def set_parent(self, ident, parent): + def set_parent(self, ident, parent, channel='shell'): """Set the current parent_header Side effects (IOPub messages) and replies are associated with @@ -453,11 +453,11 @@ def set_parent(self, ident, parent): The parent identity is used to route input_request messages on the stdin channel. """ - self._parent_ident = ident - self._parent_header = parent + self._parent_ident[channel] = ident + self._parent_header[channel] = parent def send_response(self, stream, msg_or_type, content=None, ident=None, - buffers=None, track=False, header=None, metadata=None): + buffers=None, track=False, header=None, metadata=None, channel='shell'): """Send a response to the message we're currently processing. This accepts all the parameters of :meth:`jupyter_client.session.Session.send` @@ -466,7 +466,7 @@ def send_response(self, stream, msg_or_type, content=None, ident=None, This relies on :meth:`set_parent` having been called for the current message. """ - return self.session.send(stream, msg_or_type, content, self._parent_header, + return self.session.send(stream, msg_or_type, content, self._parent_header[channel], ident, buffers, track, header, metadata) def init_metadata(self, parent): @@ -809,8 +809,8 @@ def getpass(self, prompt='', stream=None): warnings.warn("The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel", UserWarning, stacklevel=2) return self._input_request(prompt, - self._parent_ident, - self._parent_header, + self._parent_ident['shell'], + self._parent_header['shell'], password=True, ) @@ -826,8 +826,8 @@ def raw_input(self, prompt=''): "raw_input was called, but this frontend does not support input requests." ) return self._input_request(str(prompt), - self._parent_ident, - self._parent_header, + self._parent_ident['shell'], + self._parent_header['shell'], password=False, ) From fe0999999cb992d8088de6ca577932021993ff52 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Fri, 5 Feb 2021 16:56:39 +0100 Subject: [PATCH 03/12] Add deprecated property shell_streams --- ipykernel/kernelbase.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index fb74206da..d61937556 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -11,6 +11,7 @@ import sys import time import uuid +import warnings try: # jupyter_client >= 5, use tz-aware now @@ -58,6 +59,15 @@ def _update_eventloop(self, change): session = Instance(Session, allow_none=True) profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True) shell_stream = Instance(ZMQStream, allow_none=True) + + @property + def shell_streams(self): + warnings.warn( + 'Property shell_streams is deprecated in favor of shell_stream', + DeprecationWarning + ) + return [shell_stream] + control_stream = Instance(ZMQStream, allow_none=True) iopub_socket = Any() iopub_thread = Any() From c46dca1765f24c361461170f15b136a0312fe395 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Fri, 5 Feb 2021 22:20:48 +0100 Subject: [PATCH 04/12] Make ControlThread daemonic --- ipykernel/control.py | 5 ++--- ipykernel/kernelapp.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ipykernel/control.py b/ipykernel/control.py index d06d9ea34..9220dffba 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -10,9 +10,8 @@ class ControlThread(Thread): - def __init__(self, context): - Thread.__init__(self) - self.context = context + def __init__(self, **kwargs): + Thread.__init__(self, **kwargs) self.io_loop = IOLoop(make_current=False) def run(self): diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 6bda6f29a..da8f5598d 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -300,7 +300,7 @@ def init_control(self, context): # see ipython/ipykernel#270 and zeromq/libzmq#2892 self.control_socket.router_handover = 1 - self.control_thread = ControlThread(self.control_socket) + self.control_thread = ControlThread(daemon=True) self.control_thread.start() def init_iopub(self, context): From deeca80266f0ae5087196348e1b5cd504c1f60f1 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Sat, 6 Feb 2021 16:49:12 +0100 Subject: [PATCH 05/12] Attempt fixing shutdown --- ipykernel/control.py | 1 - ipykernel/kernelbase.py | 14 +++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/ipykernel/control.py b/ipykernel/control.py index 9220dffba..10853046d 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -1,4 +1,3 @@ - from threading import Thread import zmq if zmq.pyzmq_version_info() >= (17, 0): diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index d61937556..38b12e82d 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -672,9 +672,17 @@ def shutdown_request(self, stream, ident, parent): ) self._at_shutdown() - # call sys.exit after a short delay - loop = ioloop.IOLoop.current() - loop.add_timeout(time.time()+0.1, loop.stop) + + # Flush to ensure reply is sent before stopping loop + self.control_stream.flush(zmq.POLLOUT) + + self.log.debug('Stopping control ioloop') + control_io_loop = self.control_stream.io_loop + control_io_loop.add_callback(control_io_loop.stop) + + self.log.debug('Stopping shell ioloop') + shell_io_loop = self.shell_stream.io_loop + shell_io_loop.add_callback(shell_io_loop.stop) def do_shutdown(self, restart): """Override in subclasses to do things when the frontend shuts down the From 03bc0e3d592e2c0f960452e6d91ae32469997824 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Mon, 8 Feb 2021 12:16:31 +0100 Subject: [PATCH 06/12] Fix control dispatch --- ipykernel/kernelbase.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 38b12e82d..c21e3cc1e 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -405,13 +405,7 @@ def start(self): self.msg_queue = Queue() self.io_loop.add_callback(self.dispatch_queue) - self.control_stream.on_recv( - partial( - self.schedule_dispatch, - self.dispatch_control, - ), - copy=False, - ) + self.control_stream.on_recv(self.dispatch_control, copy=False) self.shell_stream.on_recv( partial( From fefec0d03b18a336355d1169aa6712c2a79f5077 Mon Sep 17 00:00:00 2001 From: Min RK Date: Wed, 10 Feb 2021 11:47:27 +0100 Subject: [PATCH 07/12] Use WeakSet to close all event pipes leaving an event_pipe in a thread that doesn't exit results in ctx.term() hanging --- ipykernel/iostream.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 4d1e6c95d..486e80e27 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -11,6 +11,7 @@ import sys import threading import warnings +from weakref import WeakSet from io import StringIO, TextIOBase import zmq @@ -66,6 +67,7 @@ def __init__(self, socket, pipe=False): self._setup_pipe_in() self._local = threading.local() self._events = deque() + self._event_pipes = WeakSet() self._setup_event_pipe() self.thread = threading.Thread(target=self._thread_main) self.thread.daemon = True @@ -100,6 +102,9 @@ def _event_pipe(self): event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe + # WeakSet so that event pipes will be closed by garbage collection + # when their threads are terminated + self._event_pipes.add(event_pipe) return event_pipe def _handle_event(self, msg): @@ -179,8 +184,11 @@ def stop(self): return self.io_loop.add_callback(self.io_loop.stop) self.thread.join() - if hasattr(self._local, 'event_pipe'): - self._local.event_pipe.close() + # close *all* event pipes, created in any thread + # event pipes can only be used from other threads while self.thread.is_alive() + # so after thread.join, this should be safe + for event_pipe in self._event_pipes: + event_pipe.close() def close(self): if self.closed: From d9500ce7ab91b95018de2a4a28e4bd105a7d98dc Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Wed, 10 Feb 2021 17:25:40 +0100 Subject: [PATCH 08/12] Remove flush --- ipykernel/kernelbase.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c21e3cc1e..1ee2dcca0 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -366,12 +366,7 @@ def dispatch_queue(self): Ensures that only one message is processing at a time, even when the handler is async """ - while True: - # ensure control stream is flushed before processing shell messages - if self.control_stream: - self.control_stream.flush() - # receive the next message and handle it try: yield self.process_one() except Exception: @@ -667,9 +662,6 @@ def shutdown_request(self, stream, ident, parent): self._at_shutdown() - # Flush to ensure reply is sent before stopping loop - self.control_stream.flush(zmq.POLLOUT) - self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop control_io_loop.add_callback(control_io_loop.stop) From 132e8cf9185e2e8b0303fa3ece12a829811cce5b Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Thu, 11 Feb 2021 15:54:29 +0100 Subject: [PATCH 09/12] Start control thread after creating the control stream --- ipykernel/kernelapp.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index da8f5598d..d99c7ab9f 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -289,6 +289,7 @@ def init_sockets(self): self.init_iopub(context) def init_control(self, context): + self.log.debug('IN INIT_CONTROL') self.control_socket = context.socket(zmq.ROUTER) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) @@ -301,7 +302,6 @@ def init_control(self, context): self.control_socket.router_handover = 1 self.control_thread = ControlThread(daemon=True) - self.control_thread.start() def init_iopub(self, context): self.iopub_socket = context.socket(zmq.PUB) @@ -448,9 +448,10 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" + self.log.debug('IN INIT_KERNEL') shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) - + self.control_thread.start() kernel_factory = self.kernel_class.instance kernel = kernel_factory(parent=self, session=self.session, @@ -569,6 +570,7 @@ def init_pdb(self): def initialize(self, argv=None): self._init_asyncio_patch() super(IPKernelApp, self).initialize(argv) + self.log.debug('IN INITIALIZE') if self.subapp is not None: return @@ -605,6 +607,7 @@ def initialize(self, argv=None): sys.stderr.flush() def start(self): + self.log.debug('IN START') if self.subapp is not None: return self.subapp.start() if self.poller is not None: From 326ea899d7ac7e5252a052dcae433c01a022c5c4 Mon Sep 17 00:00:00 2001 From: Johan Mabille Date: Fri, 12 Feb 2021 11:33:28 +0100 Subject: [PATCH 10/12] Added stream parameter to _publish_status method --- ipykernel/kernelbase.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 1ee2dcca0..4d26a73a7 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -187,7 +187,7 @@ def dispatch_control(self, msg): # Set the parent message for side effects. self.set_parent(idents, msg, channel='control') - self._publish_status('busy') + self._publish_status('busy', 'control') header = msg['header'] msg_type = header['msg_type'] @@ -203,7 +203,7 @@ def dispatch_control(self, msg): sys.stdout.flush() sys.stderr.flush() - self._publish_status('idle') + self._publish_status('idle', 'control') # flush to ensure reply is sent self.control_stream.flush(zmq.POLLOUT) @@ -233,14 +233,14 @@ def dispatch_shell(self, msg): # Set the parent message for side effects. self.set_parent(idents, msg, channel='shell') - self._publish_status('busy') + self._publish_status('busy', 'shell') msg_type = msg['header']['msg_type'] # Only abort execute requests if self._aborting and msg_type == 'execute_request': self._send_abort_reply(self.shell_stream, msg, idents) - self._publish_status('idle') + self._publish_status('idle', 'shell') # flush to ensure reply is sent before # handling the next request self.shell_stream.flush(zmq.POLLOUT) @@ -276,7 +276,7 @@ def dispatch_shell(self, msg): sys.stdout.flush() sys.stderr.flush() - self._publish_status('idle') + self._publish_status('idle', 'shell') # flush to ensure reply is sent before # handling the next request self.shell_stream.flush(zmq.POLLOUT) @@ -411,7 +411,7 @@ def start(self): ) # publish idle status - self._publish_status('starting') + self._publish_status('starting', 'shell') def record_ports(self, ports): @@ -434,12 +434,12 @@ def _publish_execute_input(self, code, parent, execution_count): parent=parent, ident=self._topic('execute_input') ) - def _publish_status(self, status, parent=None): + def _publish_status(self, status, channel, parent=None): """send status (busy/idle) on IOPub""" self.session.send(self.iopub_socket, 'status', {'execution_state': status}, - parent=parent or self._parent_header['shell'], + parent=parent or self._parent_header[channel], ident=self._topic('status'), ) From c03e3436b23314652ac43cf0790b2c9d6835fb35 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Tue, 23 Feb 2021 22:49:52 +0100 Subject: [PATCH 11/12] Remove debug logs --- ipykernel/kernelapp.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index d99c7ab9f..c7756fa2c 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -289,7 +289,6 @@ def init_sockets(self): self.init_iopub(context) def init_control(self, context): - self.log.debug('IN INIT_CONTROL') self.control_socket = context.socket(zmq.ROUTER) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) @@ -448,7 +447,6 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" - self.log.debug('IN INIT_KERNEL') shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) self.control_thread.start() @@ -570,7 +568,6 @@ def init_pdb(self): def initialize(self, argv=None): self._init_asyncio_patch() super(IPKernelApp, self).initialize(argv) - self.log.debug('IN INITIALIZE') if self.subapp is not None: return @@ -607,7 +604,6 @@ def initialize(self, argv=None): sys.stderr.flush() def start(self): - self.log.debug('IN START') if self.subapp is not None: return self.subapp.start() if self.poller is not None: From 208ad24669be43610da964b8c5ab074a3b14c137 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Wed, 24 Feb 2021 10:03:37 +0100 Subject: [PATCH 12/12] Change version to 6.0.0dev0 --- ipykernel/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/_version.py b/ipykernel/_version.py index 7b5660bea..b4c4984ae 100644 --- a/ipykernel/_version.py +++ b/ipykernel/_version.py @@ -1,4 +1,4 @@ -version_info = (5, 6, 0, 'dev0') +version_info = (6, 0, 0, 'dev0') __version__ = '.'.join(map(str, version_info[:3])) # pep440 is annoying, beta/alpha/rc should _not_ have dots or pip/setuptools