Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self, socket, pipe=False):
piped from subprocesses.
"""
self.socket = socket
self._stopped = False
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
Expand All @@ -83,13 +84,21 @@ def _start_event_gc():
self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

self.io_loop.run_sync(_start_event_gc)
self.io_loop.start()

if not self._stopped:
# avoid race if stop called before start thread gets here
# probably only comes up in tests
self.io_loop.start()

if self._event_pipe_gc_task is not None:
# cancel gc task to avoid pending task warnings
async def _cancel():
self._event_pipe_gc_task.cancel() # type:ignore

self.io_loop.run_sync(_cancel)
try:
self.io_loop.run_sync(_cancel)
except TimeoutError:
pass
self.io_loop.close(all_fds=True)

def _setup_event_pipe(self):
Expand Down Expand Up @@ -219,10 +228,16 @@ def start(self):

def stop(self):
"""Stop the IOPub thread"""
self._stopped = True
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()

self.thread.join(timeout=30)
if self.thread.is_alive():
# avoid infinite hang if stop fails
msg = "IOPub thread did not terminate in 30 seconds"
raise TimeoutError(msg)
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
Expand Down