diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 5c2818dcc..74e65e4d9 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -57,6 +57,7 @@ def __init__(self, socket, pipe=False): piped from subprocesses. """ self.socket = socket + self._stopped = False self.background_socket = BackgroundSocket(self) self._master_pid = os.getpid() self._pipe_flag = pipe @@ -83,13 +84,21 @@ def _start_event_gc(): self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc()) self.io_loop.run_sync(_start_event_gc) - self.io_loop.start() + + if not self._stopped: + # avoid race if stop called before start thread gets here + # probably only comes up in tests + self.io_loop.start() + if self._event_pipe_gc_task is not None: # cancel gc task to avoid pending task warnings async def _cancel(): self._event_pipe_gc_task.cancel() # type:ignore - self.io_loop.run_sync(_cancel) + try: + self.io_loop.run_sync(_cancel) + except TimeoutError: + pass self.io_loop.close(all_fds=True) def _setup_event_pipe(self): @@ -219,10 +228,16 @@ def start(self): def stop(self): """Stop the IOPub thread""" + self._stopped = True if not self.thread.is_alive(): return self.io_loop.add_callback(self.io_loop.stop) - self.thread.join() + + self.thread.join(timeout=30) + if self.thread.is_alive(): + # avoid infinite hang if stop fails + msg = "IOPub thread did not terminate in 30 seconds" + raise TimeoutError(msg) # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() # so after thread.join, this should be safe