From c5ccb7dc7ba320bb834629adc4c34f246b6a6f8a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 14 Jun 2022 08:35:16 +0200 Subject: [PATCH 1/9] Support subshells --- ipykernel/kernelbase.py | 66 +++++++++++++++++++++++++++-------------- ipykernel/shell.py | 31 +++++++++++++++++++ 2 files changed, 75 insertions(+), 22 deletions(-) create mode 100644 ipykernel/shell.py diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 30ab173f6..ca2115066 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -57,6 +57,7 @@ from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version +from .shell import ShellThread def _accepts_cell_id(meth): @@ -245,6 +246,7 @@ def _parent_header(self): "kernel_info_request", "connect_request", "shutdown_request", + "subshell_request", "is_complete_request", "interrupt_request", # deprecated: @@ -270,6 +272,9 @@ def __init__(self, **kwargs): self.control_handlers[msg_type] = getattr(self, msg_type) self.control_queue: Queue[t.Any] = Queue() + mainshell_thread = ShellThread(shell_id="main", daemon=True) + self.shell_threads: dict[str, ShellThread] = dict(main=mainshell_thread) + mainshell_thread.start() def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -367,9 +372,11 @@ async def dispatch_shell(self, msg): self.log.error("Invalid Message", exc_info=True) return - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") + shell_id = msg.get("metadata", {}).get("shell_id", "main") + if shell_id == "main": + # Set the parent message for side effects. + self.set_parent(idents, msg, channel="shell") + self._publish_status("busy", "shell") msg_type = msg["header"]["msg_type"] @@ -396,28 +403,33 @@ async def dispatch_shell(self, msg): self.log.warning("Unknown message type: %r", msg_type) else: self.log.debug("%s: %s", msg_type, msg) + asyncio.run_coroutine_threadsafe(self.call_handler(shell_id, handler, idents, msg), self.shell_threads[shell_id].io_loop.asyncio_loop) + + async def call_handler(self, shell_id, handler, idents, msg): + try: + self.pre_handler_hook() + except Exception: + self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) + try: + result = handler(self.shell_stream, idents, msg) + if inspect.isawaitable(result): + await result + except Exception: + self.log.error("Exception in message handler:", exc_info=True) + except KeyboardInterrupt: + # Ctrl-c shouldn't crash the kernel here. + self.log.error("KeyboardInterrupt caught in kernel.") + finally: try: - self.pre_handler_hook() - except Exception: - self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) - try: - result = handler(self.shell_stream, idents, msg) - if inspect.isawaitable(result): - await result + self.post_handler_hook() except Exception: - self.log.error("Exception in message handler:", exc_info=True) - except KeyboardInterrupt: - # Ctrl-c shouldn't crash the kernel here. - self.log.error("KeyboardInterrupt caught in kernel.") - finally: - try: - self.post_handler_hook() - except Exception: - self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) + self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) + + if shell_id == "main": + sys.stdout.flush() + sys.stderr.flush() + self._publish_status("idle", "shell") - sys.stdout.flush() - sys.stderr.flush() - self._publish_status("idle", "shell") # flush to ensure reply is sent before # handling the next request self.shell_stream.flush(zmq.POLLOUT) @@ -924,6 +936,16 @@ async def shutdown_request(self, stream, ident, parent): shell_io_loop = self.shell_stream.io_loop shell_io_loop.add_callback(shell_io_loop.stop) + async def subshell_request(self, stream, ident, parent): + shell_id = parent.get("content", {}).get("shell_id") + if shell_id is None: + shell_id = str(uuid.uuid4()) + self.log.debug(f"Creating new shell with ID: {shell_id}") + self.shell_threads[shell_id] = subshell_thread = ShellThread(shell_id=shell_id, daemon=True) + subshell_thread.start() + content = dict(shell_id=shell_id) + self.session.send(stream, "subshell_reply", content, parent, ident=ident) + def do_shutdown(self, restart): """Override in subclasses to do things when the frontend shuts down the kernel. diff --git a/ipykernel/shell.py b/ipykernel/shell.py new file mode 100644 index 000000000..d5be0070b --- /dev/null +++ b/ipykernel/shell.py @@ -0,0 +1,31 @@ +from threading import Thread + +import zmq + +if zmq.pyzmq_version_info() >= (17, 0): + from tornado.ioloop import IOLoop +else: + # deprecated since pyzmq 17 + from zmq.eventloop.ioloop import IOLoop + + +class ShellThread(Thread): + def __init__(self, **kwargs): + self._shell_id = kwargs.pop("shell_id") + Thread.__init__(self, name=f"Shell_{self._shell_id}", **kwargs) + self.io_loop = IOLoop(make_current=False) + + def run(self): + self.name = f"Shell_{self._shell_id}" + self.io_loop.make_current() + try: + self.io_loop.start() + finally: + self.io_loop.close() + + def stop(self): + """Stop the thread. + + This method is threadsafe. + """ + self.io_loop.add_callback(self.io_loop.stop) From 4cec1933628300abdca283c80748e2b09d658d24 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jun 2022 07:07:20 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipykernel/kernelbase.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index ca2115066..f2de8674d 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -403,7 +403,10 @@ async def dispatch_shell(self, msg): self.log.warning("Unknown message type: %r", msg_type) else: self.log.debug("%s: %s", msg_type, msg) - asyncio.run_coroutine_threadsafe(self.call_handler(shell_id, handler, idents, msg), self.shell_threads[shell_id].io_loop.asyncio_loop) + asyncio.run_coroutine_threadsafe( + self.call_handler(shell_id, handler, idents, msg), + self.shell_threads[shell_id].io_loop.asyncio_loop, + ) async def call_handler(self, shell_id, handler, idents, msg): try: From 0bde221b630a76ac758a41c35c5bf964995330ec Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 14 Jun 2022 17:39:02 +0200 Subject: [PATCH 3/9] Handle main shell requests in main thread --- ipykernel/kernelapp.py | 8 +++- ipykernel/kernelbase.py | 81 ++++++------------------------------- ipykernel/shell.py | 90 ++++++++++++++++++++++++++++++++++++----- pyproject.toml | 1 + 4 files changed, 100 insertions(+), 80 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 8e6084671..bed73f002 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -43,6 +43,7 @@ from traitlets.utils.importstring import import_item from zmq.eventloop.zmqstream import ZMQStream +from .shell import ShellThread from .control import ControlThread from .heartbeat import Heartbeat @@ -137,6 +138,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix stdin_socket = Any() iopub_socket = Any() iopub_thread = Any() + shell_thread = Any() control_thread = Any() _ports = Dict() @@ -318,6 +320,8 @@ def init_sockets(self): # see ipython/ipykernel#270 and zeromq/libzmq#2892 self.shell_socket.router_handover = self.stdin_socket.router_handover = 1 + self.shell_thread = ShellThread() + self.init_control(context) self.init_iopub(context) @@ -519,9 +523,10 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" - shell_stream = ZMQStream(self.shell_socket) + shell_stream = ZMQStream(self.shell_socket, self.shell_thread.io_loop) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop) + self.shell_thread.start() self.control_thread.start() kernel_factory = self.kernel_class.instance @@ -532,6 +537,7 @@ def init_kernel(self): debugpy_stream=debugpy_stream, debug_shell_socket=self.debug_shell_socket, shell_stream=shell_stream, + shell_thread=self.shell_thread, control_thread=self.control_thread, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index f2de8674d..e335167d3 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -32,6 +32,7 @@ # jupyter_client < 5, use local now() now = datetime.now +import janus import psutil import zmq from IPython.core.error import StdinNotImplementedError @@ -57,7 +58,7 @@ from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version -from .shell import ShellThread +from .shell import SubshellThread, handle_messages def _accepts_cell_id(meth): @@ -134,6 +135,7 @@ def _shell_streams_changed(self, change): debug_shell_socket = Any() + shell_thread = Any() control_thread = Any() iopub_socket = Any() iopub_thread = Any() @@ -272,9 +274,11 @@ def __init__(self, **kwargs): self.control_handlers[msg_type] = getattr(self, msg_type) self.control_queue: Queue[t.Any] = Queue() - mainshell_thread = ShellThread(shell_id="main", daemon=True) - self.shell_threads: dict[str, ShellThread] = dict(main=mainshell_thread) - mainshell_thread.start() + self.subshell_threads: dict[str, SubshellThread] = dict() + + async def handle_main_shell(self): + self.shell_queues: dict[str, janus.Queue] = dict(main=janus.Queue()) + await handle_messages(self.shell_queues["main"].async_q, self) def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -373,69 +377,8 @@ async def dispatch_shell(self, msg): return shell_id = msg.get("metadata", {}).get("shell_id", "main") - if shell_id == "main": - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") - - msg_type = msg["header"]["msg_type"] - - # Only abort execute requests - if self._aborting and msg_type == "execute_request": - self._send_abort_reply(self.shell_stream, msg, idents) - self._publish_status("idle", "shell") - # flush to ensure reply is sent before - # handling the next request - self.shell_stream.flush(zmq.POLLOUT) - return - - # Print some info about this message and leave a '--->' marker, so it's - # easier to trace visually the message chain when debugging. Each - # handler prints its message at the end. - self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) - self.log.debug(" Content: %s\n --->\n ", msg["content"]) - - if not self.should_handle(self.shell_stream, msg, idents): - return - - handler = self.shell_handlers.get(msg_type, None) - if handler is None: - self.log.warning("Unknown message type: %r", msg_type) - else: - self.log.debug("%s: %s", msg_type, msg) - asyncio.run_coroutine_threadsafe( - self.call_handler(shell_id, handler, idents, msg), - self.shell_threads[shell_id].io_loop.asyncio_loop, - ) - - async def call_handler(self, shell_id, handler, idents, msg): - try: - self.pre_handler_hook() - except Exception: - self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) - try: - result = handler(self.shell_stream, idents, msg) - if inspect.isawaitable(result): - await result - except Exception: - self.log.error("Exception in message handler:", exc_info=True) - except KeyboardInterrupt: - # Ctrl-c shouldn't crash the kernel here. - self.log.error("KeyboardInterrupt caught in kernel.") - finally: - try: - self.post_handler_hook() - except Exception: - self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) - - if shell_id == "main": - sys.stdout.flush() - sys.stderr.flush() - self._publish_status("idle", "shell") - - # flush to ensure reply is sent before - # handling the next request - self.shell_stream.flush(zmq.POLLOUT) + is_main = shell_id == "main" + self.shell_queues[shell_id].sync_q.put((is_main, idents, msg)) def pre_handler_hook(self): """Hook to execute before calling message handler""" @@ -553,6 +496,7 @@ def start(self): """register dispatchers for streams""" self.io_loop = ioloop.IOLoop.current() self.msg_queue: Queue[t.Any] = Queue() + self.io_loop.add_callback(self.handle_main_shell) self.io_loop.add_callback(self.dispatch_queue) self.control_stream.on_recv(self.dispatch_control, copy=False) @@ -944,7 +888,8 @@ async def subshell_request(self, stream, ident, parent): if shell_id is None: shell_id = str(uuid.uuid4()) self.log.debug(f"Creating new shell with ID: {shell_id}") - self.shell_threads[shell_id] = subshell_thread = ShellThread(shell_id=shell_id, daemon=True) + self.shell_queues[shell_id] = subshell_queue = janus.Queue() + self.subshell_threads[shell_id] = subshell_thread = SubshellThread(shell_id, subshell_queue, self) subshell_thread.start() content = dict(shell_id=shell_id) self.session.send(stream, "subshell_reply", content, parent, ident=ident) diff --git a/ipykernel/shell.py b/ipykernel/shell.py index d5be0070b..15a8c7ef2 100644 --- a/ipykernel/shell.py +++ b/ipykernel/shell.py @@ -1,3 +1,6 @@ +import asyncio +import inspect +import sys from threading import Thread import zmq @@ -9,23 +12,88 @@ from zmq.eventloop.ioloop import IOLoop +async def handle_messages(msg_queue, kernel): + while True: + is_main, idents, msg = await msg_queue.get() + if is_main: + # Set the parent message for side effects. + kernel.set_parent(idents, msg, channel="shell") + kernel._publish_status("busy", "shell") + + msg_type = msg["header"]["msg_type"] + + # Only abort execute requests + if kernel._aborting and msg_type == "execute_request": + kernel._send_abort_reply(kernel.shell_stream, msg, idents) + kernel._publish_status("idle", "shell") + # flush to ensure reply is sent before + # handling the next request + kernel.shell_stream.flush(zmq.POLLOUT) + return + + # Print some info about this message and leave a '--->' marker, so it's + # easier to trace visually the message chain when debugging. Each + # handler prints its message at the end. + kernel.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) + kernel.log.debug(" Content: %s\n --->\n ", msg["content"]) + + if not kernel.should_handle(kernel.shell_stream, msg, idents): + return + + handler = kernel.shell_handlers.get(msg_type, None) + if handler is None: + kernel.log.warning("Unknown message type: %r", msg_type) + else: + kernel.log.debug("%s: %s", msg_type, msg) + try: + kernel.pre_handler_hook() + except Exception: + kernel.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) + try: + result = handler(kernel.shell_stream, idents, msg) + if inspect.isawaitable(result): + await result + except Exception: + kernel.log.error("Exception in message handler:", exc_info=True) + except KeyboardInterrupt: + # Ctrl-c shouldn't crash the kernel here. + kernel.log.error("KeyboardInterrupt caught in kernel.") + finally: + try: + kernel.post_handler_hook() + except Exception: + kernel.log.debug("Unable to signal in post_handler_hook:", exc_info=True) + + if is_main: + sys.stdout.flush() + sys.stderr.flush() + kernel._publish_status("idle", "shell") + + # flush to ensure reply is sent before + # handling the next request + kernel.shell_stream.flush(zmq.POLLOUT) + + +class SubshellThread(Thread): + def __init__(self, shell_id, msg_queue, kernel): + self.msg_queue = msg_queue + self.kernel = kernel + super().__init__(name=f"Subshell_{shell_id}", daemon=True) + + def run(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + asyncio.run(handle_messages(self.msg_queue, self.kernel)) + + class ShellThread(Thread): - def __init__(self, **kwargs): - self._shell_id = kwargs.pop("shell_id") - Thread.__init__(self, name=f"Shell_{self._shell_id}", **kwargs) + def __init__(self): + super().__init__(name="Shell", daemon=True) self.io_loop = IOLoop(make_current=False) def run(self): - self.name = f"Shell_{self._shell_id}" self.io_loop.make_current() try: self.io_loop.start() finally: self.io_loop.close() - - def stop(self): - """Stop the thread. - - This method is threadsafe. - """ - self.io_loop.add_callback(self.io_loop.stop) diff --git a/pyproject.toml b/pyproject.toml index 54cd0ba2b..253616c43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "psutil", "nest_asyncio", "packaging", + "janus>=1.0.0", ] [project.optional-dependencies] From 094c07c5b074168f1d7a2e1fe1081ef4d6e00d05 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jun 2022 15:39:33 +0000 Subject: [PATCH 4/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipykernel/kernelapp.py | 2 +- ipykernel/kernelbase.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index bed73f002..ab8532058 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -43,7 +43,6 @@ from traitlets.utils.importstring import import_item from zmq.eventloop.zmqstream import ZMQStream -from .shell import ShellThread from .control import ControlThread from .heartbeat import Heartbeat @@ -51,6 +50,7 @@ from .iostream import IOPubThread from .ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows +from .shell import ShellThread from .zmqshell import ZMQInteractiveShell # ----------------------------------------------------------------------------- diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e335167d3..a901fa1cd 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -889,7 +889,9 @@ async def subshell_request(self, stream, ident, parent): shell_id = str(uuid.uuid4()) self.log.debug(f"Creating new shell with ID: {shell_id}") self.shell_queues[shell_id] = subshell_queue = janus.Queue() - self.subshell_threads[shell_id] = subshell_thread = SubshellThread(shell_id, subshell_queue, self) + self.subshell_threads[shell_id] = subshell_thread = SubshellThread( + shell_id, subshell_queue, self + ) subshell_thread.start() content = dict(shell_id=shell_id) self.session.send(stream, "subshell_reply", content, parent, ident=ident) From a64cf72868f9053345ab19149e5e22222c2889bb Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 15 Jun 2022 11:20:44 +0200 Subject: [PATCH 5/9] Add subshell test --- ipykernel/kernelbase.py | 73 +++++++++++++++++--------------- ipykernel/shell.py | 10 +++-- ipykernel/tests/test_subshell.py | 60 ++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 38 deletions(-) create mode 100644 ipykernel/tests/test_subshell.py diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index a901fa1cd..84f63bd35 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -9,6 +9,7 @@ import itertools import logging import os +import queue import socket import sys import time @@ -277,8 +278,9 @@ def __init__(self, **kwargs): self.subshell_threads: dict[str, SubshellThread] = dict() async def handle_main_shell(self): - self.shell_queues: dict[str, janus.Queue] = dict(main=janus.Queue()) - await handle_messages(self.shell_queues["main"].async_q, self) + self.msg_queue: janus.Queue[t.Any] = janus.Queue() + self.shell_queues: dict[str, t.Union[queue.Queue, janus.Queue]] = dict(main=janus.Queue()) + await handle_messages(self.shell_queues["main"].async_q, self, True) def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -315,15 +317,8 @@ def _flush(): control_loop.add_callback(_flush) return awaitable_future - async def process_control(self, msg): + async def process_control(self, idents, msg): """dispatch control requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Control Message", exc_info=True) - return - self.log.debug("Control received: %s", msg) # Set the parent message for side effects. @@ -363,22 +358,17 @@ def should_handle(self, stream, msg, idents): return False return True - async def dispatch_shell(self, msg): + async def dispatch_shell(self, idents, msg): """dispatch shell requests""" # flush control queue before handling shell requests await self._flush_control_queue() - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Message", exc_info=True) - return - shell_id = msg.get("metadata", {}).get("shell_id", "main") - is_main = shell_id == "main" - self.shell_queues[shell_id].sync_q.put((is_main, idents, msg)) + shell_queue = self.shell_queues[shell_id] + if shell_id == "main": + shell_queue = shell_queue.sync_q + shell_queue.put((idents, msg)) def pre_handler_hook(self): """Hook to execute before calling message handler""" @@ -448,13 +438,13 @@ async def process_one(self, wait=True): Returns None if no message was handled. """ if wait: - t, dispatch, args = await self.msg_queue.get() + t, dispatch, idents, msg = await self.msg_queue.async_q.get() else: try: - t, dispatch, args = self.msg_queue.get_nowait() + t, dispatch, idents, msg = self.msg_queue.async_q.get_nowait() except (asyncio.QueueEmpty, QueueEmpty): return None - await dispatch(*args) + await dispatch(idents, msg) async def dispatch_queue(self): """Coroutine to preserve order of message handling @@ -478,24 +468,37 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, *args): + def schedule_dispatch(self, dispatch, msg): """schedule a message for dispatch""" idx = next(self._message_counter) - self.msg_queue.put_nowait( - ( - idx, - dispatch, - args, + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except Exception: + self.log.error("Invalid Message", exc_info=True) + return + + shell_id = msg.get("metadata", {}).get("shell_id", "main") + + if shell_id == "main": + msg_queue = self.msg_queue.sync_q + self.msg_queue.sync_q.put( + ( + idx, + dispatch, + idents, + msg, + ) ) - ) - # ensure the eventloop wakes up - self.io_loop.add_callback(lambda: None) + # ensure the eventloop wakes up + self.io_loop.add_callback(lambda: None) + else: + self.shell_queues[shell_id].put((idents, msg)) def start(self): """register dispatchers for streams""" self.io_loop = ioloop.IOLoop.current() - self.msg_queue: Queue[t.Any] = Queue() self.io_loop.add_callback(self.handle_main_shell) self.io_loop.add_callback(self.dispatch_queue) @@ -658,6 +661,8 @@ async def execute_request(self, stream, ident, parent): stop_on_error = content.get("stop_on_error", True) metadata = self.init_metadata(parent) + if "shell_id" in parent["metadata"]: + metadata["shell_id"] = parent["metadata"]["shell_id"] # Re-broadcast our input for the benefit of listening clients, and # start computing output @@ -888,7 +893,7 @@ async def subshell_request(self, stream, ident, parent): if shell_id is None: shell_id = str(uuid.uuid4()) self.log.debug(f"Creating new shell with ID: {shell_id}") - self.shell_queues[shell_id] = subshell_queue = janus.Queue() + self.shell_queues[shell_id] = subshell_queue = queue.Queue() self.subshell_threads[shell_id] = subshell_thread = SubshellThread( shell_id, subshell_queue, self ) diff --git a/ipykernel/shell.py b/ipykernel/shell.py index 15a8c7ef2..e1b0f94bc 100644 --- a/ipykernel/shell.py +++ b/ipykernel/shell.py @@ -12,9 +12,12 @@ from zmq.eventloop.ioloop import IOLoop -async def handle_messages(msg_queue, kernel): +async def handle_messages(msg_queue, kernel, is_main): while True: - is_main, idents, msg = await msg_queue.get() + res = msg_queue.get() + if is_main: + res = await res + idents, msg = res if is_main: # Set the parent message for side effects. kernel.set_parent(idents, msg, channel="shell") @@ -82,8 +85,7 @@ def __init__(self, shell_id, msg_queue, kernel): def run(self): loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - asyncio.run(handle_messages(self.msg_queue, self.kernel)) + loop.run_until_complete(handle_messages(self.msg_queue, self.kernel, False)) class ShellThread(Thread): diff --git a/ipykernel/tests/test_subshell.py b/ipykernel/tests/test_subshell.py new file mode 100644 index 000000000..7b0e427ee --- /dev/null +++ b/ipykernel/tests/test_subshell.py @@ -0,0 +1,60 @@ +import time +from textwrap import dedent + +from jupyter_client.manager import start_new_kernel + + +def test_subshell(): + km, kc = start_new_kernel() + + shell_id = "foo" + content = dict(shell_id=shell_id) + msg = kc.session.msg("subshell_request", content) + kc.shell_channel.send(msg) + msg = kc.get_shell_msg() + assert msg["content"]["shell_id"] == shell_id + + def get_content(t): + code = dedent( + f""" + import time + + time.sleep({t}) + """ + ) + content = dict( + code=code, + silent=False, + ) + return content + + # launch execution in main shell + t0 = time.time() + msg0 = kc.session.msg("execute_request", get_content(0.3)) + kc.shell_channel.send(msg0) + + time.sleep(0.1) + + # launch execution in subshell while main shell is executing + t1 = time.time() + metadata = dict(shell_id=shell_id) + msg1 = kc.session.msg("execute_request", get_content(0.1), metadata=metadata) + kc.shell_channel.send(msg1) + + msg_cnt = 0 + while True: + msg = kc.get_shell_msg() + t = time.time() + if msg["parent_header"]["msg_id"] == msg0["msg_id"]: + # main shell execution should take ~0.3s + assert 0.3 < t - t0 < 0.4 + msg_cnt += 1 + elif msg["parent_header"]["msg_id"] == msg1["msg_id"]: + # subshell execution should take ~0.1s if done in parallel + assert 0.1 < t - t1 < 0.2 + msg_cnt += 1 + if msg_cnt == 2: + break + + kc.stop_channels() + km.shutdown_kernel() From f924f8a671528d9dd25c7016dca9015c7b288355 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 15 Jun 2022 12:05:33 +0200 Subject: [PATCH 6/9] Fix control channel --- ipykernel/kernelbase.py | 12 +++++++++--- ipykernel/tests/test_subshell.py | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 84f63bd35..31371e86c 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -317,8 +317,15 @@ def _flush(): control_loop.add_callback(_flush) return awaitable_future - async def process_control(self, idents, msg): + async def process_control(self, msg): """dispatch control requests""" + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except Exception: + self.log.error("Invalid Control Message", exc_info=True) + return + self.log.debug("Control received: %s", msg) # Set the parent message for side effects. @@ -476,13 +483,12 @@ def schedule_dispatch(self, dispatch, msg): try: msg = self.session.deserialize(msg, content=True, copy=False) except Exception: - self.log.error("Invalid Message", exc_info=True) + self.log.error("Invalid Shell Message", exc_info=True) return shell_id = msg.get("metadata", {}).get("shell_id", "main") if shell_id == "main": - msg_queue = self.msg_queue.sync_q self.msg_queue.sync_q.put( ( idx, diff --git a/ipykernel/tests/test_subshell.py b/ipykernel/tests/test_subshell.py index 7b0e427ee..b033ec0bf 100644 --- a/ipykernel/tests/test_subshell.py +++ b/ipykernel/tests/test_subshell.py @@ -10,8 +10,8 @@ def test_subshell(): shell_id = "foo" content = dict(shell_id=shell_id) msg = kc.session.msg("subshell_request", content) - kc.shell_channel.send(msg) - msg = kc.get_shell_msg() + kc.control_channel.send(msg) + msg = kc.get_control_msg() assert msg["content"]["shell_id"] == shell_id def get_content(t): From 7349b470cda01dffb790b783436a73e9e136f770 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 15 Jun 2022 12:52:36 +0200 Subject: [PATCH 7/9] Fix shutdown --- ipykernel/kernelbase.py | 34 +++++++++++++++++----------------- ipykernel/shell.py | 4 ++-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 31371e86c..228fb7311 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -275,12 +275,13 @@ def __init__(self, **kwargs): self.control_handlers[msg_type] = getattr(self, msg_type) self.control_queue: Queue[t.Any] = Queue() - self.subshell_threads: dict[str, SubshellThread] = dict() + self.subshell_threads: dict[str, SubshellThread] = {} async def handle_main_shell(self): self.msg_queue: janus.Queue[t.Any] = janus.Queue() - self.shell_queues: dict[str, t.Union[queue.Queue, janus.Queue]] = dict(main=janus.Queue()) - await handle_messages(self.shell_queues["main"].async_q, self, True) + self.subshell_queues: dict[str, queue.Queue[t.Any]] = {} + self.main_shell_queue: janus.Queue[t.Any] = janus.Queue() + await handle_messages(self.main_shell_queue.async_q, self, True) def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -371,11 +372,11 @@ async def dispatch_shell(self, idents, msg): # flush control queue before handling shell requests await self._flush_control_queue() - shell_id = msg.get("metadata", {}).get("shell_id", "main") - shell_queue = self.shell_queues[shell_id] - if shell_id == "main": - shell_queue = shell_queue.sync_q - shell_queue.put((idents, msg)) + shell_id = msg.get("metadata", {}).get("shell_id") + if shell_id: + self.subshell_queues[shell_id].put((idents, msg)) + else: + self.main_shell_queue.sync_q.put((idents, msg)) def pre_handler_hook(self): """Hook to execute before calling message handler""" @@ -486,9 +487,11 @@ def schedule_dispatch(self, dispatch, msg): self.log.error("Invalid Shell Message", exc_info=True) return - shell_id = msg.get("metadata", {}).get("shell_id", "main") + shell_id = msg.get("metadata", {}).get("shell_id") - if shell_id == "main": + if shell_id: + self.subshell_queues[shell_id].put((idents, msg)) + else: self.msg_queue.sync_q.put( ( idx, @@ -499,8 +502,6 @@ def schedule_dispatch(self, dispatch, msg): ) # ensure the eventloop wakes up self.io_loop.add_callback(lambda: None) - else: - self.shell_queues[shell_id].put((idents, msg)) def start(self): """register dispatchers for streams""" @@ -890,18 +891,17 @@ async def shutdown_request(self, stream, ident, parent): control_io_loop = self.control_stream.io_loop control_io_loop.add_callback(control_io_loop.stop) - self.log.debug("Stopping shell ioloop") - shell_io_loop = self.shell_stream.io_loop - shell_io_loop.add_callback(shell_io_loop.stop) + self.log.debug("Stopping main ioloop") + self.io_loop.add_callback(self.io_loop.stop) async def subshell_request(self, stream, ident, parent): shell_id = parent.get("content", {}).get("shell_id") if shell_id is None: shell_id = str(uuid.uuid4()) self.log.debug(f"Creating new shell with ID: {shell_id}") - self.shell_queues[shell_id] = subshell_queue = queue.Queue() + self.subshell_queues[shell_id] = queue.Queue() self.subshell_threads[shell_id] = subshell_thread = SubshellThread( - shell_id, subshell_queue, self + shell_id, self.subshell_queues[shell_id], self ) subshell_thread.start() content = dict(shell_id=shell_id) diff --git a/ipykernel/shell.py b/ipykernel/shell.py index e1b0f94bc..0520e9af0 100644 --- a/ipykernel/shell.py +++ b/ipykernel/shell.py @@ -32,7 +32,7 @@ async def handle_messages(msg_queue, kernel, is_main): # flush to ensure reply is sent before # handling the next request kernel.shell_stream.flush(zmq.POLLOUT) - return + continue # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each @@ -41,7 +41,7 @@ async def handle_messages(msg_queue, kernel, is_main): kernel.log.debug(" Content: %s\n --->\n ", msg["content"]) if not kernel.should_handle(kernel.shell_stream, msg, idents): - return + continue handler = kernel.shell_handlers.get(msg_type, None) if handler is None: From 93d3dd04f97963d84651988f6622b88e0ffd5440 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 15 Jun 2022 16:29:52 +0200 Subject: [PATCH 8/9] Fix execution abort --- ipykernel/kernelbase.py | 36 +++++++++++++++++++------------- ipykernel/tests/test_subshell.py | 27 ++++++++++++------------ 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 228fb7311..8ba73357d 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -366,8 +366,9 @@ def should_handle(self, stream, msg, idents): return False return True - async def dispatch_shell(self, idents, msg): + async def dispatch_shell(self, *args): """dispatch shell requests""" + idents, msg = args # flush control queue before handling shell requests await self._flush_control_queue() @@ -446,13 +447,13 @@ async def process_one(self, wait=True): Returns None if no message was handled. """ if wait: - t, dispatch, idents, msg = await self.msg_queue.async_q.get() + t, dispatch, args = await self.msg_queue.async_q.get() else: try: - t, dispatch, idents, msg = self.msg_queue.async_q.get_nowait() + t, dispatch, args = self.msg_queue.async_q.get_nowait() except (asyncio.QueueEmpty, QueueEmpty): return None - await dispatch(idents, msg) + await dispatch(*args) async def dispatch_queue(self): """Coroutine to preserve order of message handling @@ -476,28 +477,33 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, msg): + def schedule_dispatch(self, dispatch, *args): """schedule a message for dispatch""" idx = next(self._message_counter) - - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Shell Message", exc_info=True) - return + if args: + idents, msg = self.session.feed_identities(args[0], copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except Exception: + self.log.error("Invalid Shell Message", exc_info=True) + return + else: + idents, msg = None, {} shell_id = msg.get("metadata", {}).get("shell_id") if shell_id: self.subshell_queues[shell_id].put((idents, msg)) else: + if not msg: + args = () + else: + args = (idents, msg) self.msg_queue.sync_q.put( ( idx, dispatch, - idents, - msg, + args, ) ) # ensure the eventloop wakes up @@ -1092,7 +1098,7 @@ async def stop_aborting(): # if we have a delay, give messages this long to arrive on the queue # before we stop aborting requests - asyncio.get_event_loop().call_later(self.stop_on_error_timeout, schedule_stop_aborting) + self.shell_thread.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting) def _send_abort_reply(self, stream, msg, idents): """Send a reply to an aborted request""" diff --git a/ipykernel/tests/test_subshell.py b/ipykernel/tests/test_subshell.py index b033ec0bf..700e622e3 100644 --- a/ipykernel/tests/test_subshell.py +++ b/ipykernel/tests/test_subshell.py @@ -41,20 +41,19 @@ def get_content(t): msg1 = kc.session.msg("execute_request", get_content(0.1), metadata=metadata) kc.shell_channel.send(msg1) - msg_cnt = 0 - while True: - msg = kc.get_shell_msg() - t = time.time() - if msg["parent_header"]["msg_id"] == msg0["msg_id"]: - # main shell execution should take ~0.3s - assert 0.3 < t - t0 < 0.4 - msg_cnt += 1 - elif msg["parent_header"]["msg_id"] == msg1["msg_id"]: - # subshell execution should take ~0.1s if done in parallel - assert 0.1 < t - t1 < 0.2 - msg_cnt += 1 - if msg_cnt == 2: - break + msg = kc.get_shell_msg() + t = time.time() + # subshell should have finished execution first + assert msg["parent_header"]["msg_id"] == msg1["msg_id"] + # subshell execution should take ~0.1s if done in parallel + assert 0.1 < t - t1 < 0.2 + + msg = kc.get_shell_msg() + t = time.time() + # main shell shoud have finished execution last + assert msg["parent_header"]["msg_id"] == msg0["msg_id"] + # main shell execution should take ~0.3s + assert 0.3 < t - t0 < 0.4 kc.stop_channels() km.shutdown_kernel() From f94ad8f26bbd61bc53b0ccefaa897fc7dfe619af Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 15 Jun 2022 17:11:40 +0200 Subject: [PATCH 9/9] Fix in-process kernel --- ipykernel/inprocess/client.py | 3 +- ipykernel/inprocess/ipkernel.py | 1 + ipykernel/kernelbase.py | 19 ++++- ipykernel/shell.py | 118 +++++++++++++++++--------------- 4 files changed, 79 insertions(+), 62 deletions(-) diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 807cad760..ea70bf961 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -179,8 +179,7 @@ def _dispatch_to_kernel(self, msg): stream = kernel.shell_stream self.session.send(stream, msg) msg_parts = stream.recv_multipart() - loop = asyncio.get_event_loop() - loop.run_until_complete(kernel.dispatch_shell(msg_parts)) + asyncio.run(kernel.dispatch_shell(msg_parts)) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index aaaa5099f..b66dc3043 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -71,6 +71,7 @@ def __init__(self, **traits): self._underlying_iopub_socket.observe(self._io_dispatch, names=["message_sent"]) self.shell.kernel = self + self.main_shell_queue = None async def execute_request(self, stream, ident, parent): """Override for temporary IO redirection.""" diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 8ba73357d..98771447e 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -59,7 +59,7 @@ from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version -from .shell import SubshellThread, handle_messages +from .shell import SubshellThread, handle_message, handle_messages def _accepts_cell_id(meth): @@ -368,7 +368,16 @@ def should_handle(self, stream, msg, idents): async def dispatch_shell(self, *args): """dispatch shell requests""" - idents, msg = args + if len(args) == 1: + # in-process kernel + idents, msg = self.session.feed_identities(args[0], copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except Exception: + self.log.error("Invalid Shell Message", exc_info=True) + return + else: + idents, msg = args # flush control queue before handling shell requests await self._flush_control_queue() @@ -377,7 +386,11 @@ async def dispatch_shell(self, *args): if shell_id: self.subshell_queues[shell_id].put((idents, msg)) else: - self.main_shell_queue.sync_q.put((idents, msg)) + if self.main_shell_queue is None: + # in-process kernel + await handle_message(idents, msg, self, True) + else: + self.main_shell_queue.sync_q.put((idents, msg)) def pre_handler_hook(self): """Hook to execute before calling message handler""" diff --git a/ipykernel/shell.py b/ipykernel/shell.py index 0520e9af0..1bd2f72e6 100644 --- a/ipykernel/shell.py +++ b/ipykernel/shell.py @@ -12,69 +12,73 @@ from zmq.eventloop.ioloop import IOLoop +async def handle_message(idents, msg, kernel, is_main): + if is_main: + # Set the parent message for side effects. + kernel.set_parent(idents, msg, channel="shell") + kernel._publish_status("busy", "shell") + + msg_type = msg["header"]["msg_type"] + + # Only abort execute requests + if kernel._aborting and msg_type == "execute_request": + kernel._send_abort_reply(kernel.shell_stream, msg, idents) + kernel._publish_status("idle", "shell") + # flush to ensure reply is sent before + # handling the next request + kernel.shell_stream.flush(zmq.POLLOUT) + return + + # Print some info about this message and leave a '--->' marker, so it's + # easier to trace visually the message chain when debugging. Each + # handler prints its message at the end. + kernel.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) + kernel.log.debug(" Content: %s\n --->\n ", msg["content"]) + + if not kernel.should_handle(kernel.shell_stream, msg, idents): + return + + handler = kernel.shell_handlers.get(msg_type, None) + if handler is None: + kernel.log.warning("Unknown message type: %r", msg_type) + else: + kernel.log.debug("%s: %s", msg_type, msg) + try: + kernel.pre_handler_hook() + except Exception: + kernel.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) + try: + result = handler(kernel.shell_stream, idents, msg) + if inspect.isawaitable(result): + await result + except Exception: + kernel.log.error("Exception in message handler:", exc_info=True) + except KeyboardInterrupt: + # Ctrl-c shouldn't crash the kernel here. + kernel.log.error("KeyboardInterrupt caught in kernel.") + finally: + try: + kernel.post_handler_hook() + except Exception: + kernel.log.debug("Unable to signal in post_handler_hook:", exc_info=True) + + if is_main: + sys.stdout.flush() + sys.stderr.flush() + kernel._publish_status("idle", "shell") + + # flush to ensure reply is sent before + # handling the next request + kernel.shell_stream.flush(zmq.POLLOUT) + + async def handle_messages(msg_queue, kernel, is_main): while True: res = msg_queue.get() if is_main: res = await res idents, msg = res - if is_main: - # Set the parent message for side effects. - kernel.set_parent(idents, msg, channel="shell") - kernel._publish_status("busy", "shell") - - msg_type = msg["header"]["msg_type"] - - # Only abort execute requests - if kernel._aborting and msg_type == "execute_request": - kernel._send_abort_reply(kernel.shell_stream, msg, idents) - kernel._publish_status("idle", "shell") - # flush to ensure reply is sent before - # handling the next request - kernel.shell_stream.flush(zmq.POLLOUT) - continue - - # Print some info about this message and leave a '--->' marker, so it's - # easier to trace visually the message chain when debugging. Each - # handler prints its message at the end. - kernel.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) - kernel.log.debug(" Content: %s\n --->\n ", msg["content"]) - - if not kernel.should_handle(kernel.shell_stream, msg, idents): - continue - - handler = kernel.shell_handlers.get(msg_type, None) - if handler is None: - kernel.log.warning("Unknown message type: %r", msg_type) - else: - kernel.log.debug("%s: %s", msg_type, msg) - try: - kernel.pre_handler_hook() - except Exception: - kernel.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) - try: - result = handler(kernel.shell_stream, idents, msg) - if inspect.isawaitable(result): - await result - except Exception: - kernel.log.error("Exception in message handler:", exc_info=True) - except KeyboardInterrupt: - # Ctrl-c shouldn't crash the kernel here. - kernel.log.error("KeyboardInterrupt caught in kernel.") - finally: - try: - kernel.post_handler_hook() - except Exception: - kernel.log.debug("Unable to signal in post_handler_hook:", exc_info=True) - - if is_main: - sys.stdout.flush() - sys.stderr.flush() - kernel._publish_status("idle", "shell") - - # flush to ensure reply is sent before - # handling the next request - kernel.shell_stream.flush(zmq.POLLOUT) + await handle_message(idents, msg, kernel, is_main) class SubshellThread(Thread):