diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 34119dc70..622f1ca05 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -671,7 +671,7 @@ async def execute_request(self, stream, ident, parent): self.log.debug("%s", reply_msg) if not silent and reply_msg['content']['status'] == 'error' and stop_on_error: - await self._abort_queues() + self._abort_queues() def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): @@ -974,13 +974,31 @@ def _topic(self, topic): _aborting = Bool(False) - async def _abort_queues(self): - self.shell_stream.flush() + def _abort_queues(self): + # while this flag is true, + # execute requests will be aborted self._aborting = True + self.log.info("Aborting queue") + + # flush streams, so all currently waiting messages + # are added to the queue + self.shell_stream.flush() + + # Callback to signal that we are done aborting def stop_aborting(): self.log.info("Finishing abort") self._aborting = False - asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting) + + # put the stop-aborting event on the message queue + # so that all messages already waiting in the queue are aborted + # before we reset the flag + schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting) + + # if we have a delay, give messages this long to arrive on the queue + # before we stop aborting requests + asyncio.get_event_loop().call_later( + self.stop_on_error_timeout, schedule_stop_aborting + ) def _send_abort_reply(self, stream, msg, idents): """Send a reply to an aborted request""" diff --git a/ipykernel/tests/test_message_spec.py b/ipykernel/tests/test_message_spec.py index b66c124d6..f550b898e 100644 --- a/ipykernel/tests/test_message_spec.py +++ b/ipykernel/tests/test_message_spec.py @@ -341,15 +341,23 @@ def test_execute_stop_on_error(): """execute request should not abort execution queue with stop_on_error False""" flush_channels() - fail = '\n'.join([ - # sleep to ensure subsequent message is waiting in the queue to be aborted - 'import time', - 'time.sleep(0.5)', - 'raise ValueError', - ]) + fail = "\n".join( + [ + # sleep to ensure subsequent message is waiting in the queue to be aborted + # async sleep to ensure coroutines are processing while this happens + "import asyncio", + "await asyncio.sleep(1)", + "raise ValueError()", + ] + ) KC.execute(code=fail) KC.execute(code='print("Hello")') - KC.get_shell_msg(timeout=TIMEOUT) + KC.execute(code='print("world")') + reply = KC.get_shell_msg(timeout=TIMEOUT) + print(reply) + reply = KC.get_shell_msg(timeout=TIMEOUT) + assert reply["content"]["status"] == "aborted" + # second message, too reply = KC.get_shell_msg(timeout=TIMEOUT) assert reply['content']['status'] == 'aborted'