Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import io
import os
Expand All @@ -14,8 +15,7 @@
from collections import deque
from io import StringIO, TextIOBase
from threading import local
from typing import Any, Callable, Deque, Optional
from weakref import WeakSet
from typing import Any, Callable, Deque, Dict, Optional

import zmq
from jupyter_client.session import extract_header
Expand Down Expand Up @@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False):
self._setup_pipe_in()
self._local = threading.local()
self._events: Deque[Callable[..., Any]] = deque()
self._event_pipes: WeakSet[Any] = WeakSet()
self._event_pipes: Dict[threading.Thread, Any] = {}
self._event_pipe_gc_lock: threading.Lock = threading.Lock()
self._event_pipe_gc_seconds: float = 10
self._event_pipe_gc_task: Optional[asyncio.Task] = None
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
Expand All @@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False):

def _thread_main(self):
"""The inner loop that's actually run in a thread"""

def _start_event_gc():
self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

self.io_loop.run_sync(_start_event_gc)
self.io_loop.start()
if self._event_pipe_gc_task is not None:
# cancel gc task to avoid pending task warnings
async def _cancel():
self._event_pipe_gc_task.cancel() # type:ignore

self.io_loop.run_sync(_cancel)
self.io_loop.close(all_fds=True)

def _setup_event_pipe(self):
Expand All @@ -88,6 +102,26 @@ def _setup_event_pipe(self):
self._event_puller = ZMQStream(pipe_in, self.io_loop)
self._event_puller.on_recv(self._handle_event)

async def _run_event_pipe_gc(self):
"""Task to run event pipe gc continuously"""
while True:
await asyncio.sleep(self._event_pipe_gc_seconds)
try:
await self._event_pipe_gc()
except Exception as e:
print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)

async def _event_pipe_gc(self):
"""run a single garbage collection on event pipes"""
if not self._event_pipes:
# don't acquire the lock if there's nothing to do
return
with self._event_pipe_gc_lock:
for thread, socket in list(self._event_pipes.items()):
if not thread.is_alive():
socket.close()
del self._event_pipes[thread]

@property
def _event_pipe(self):
"""thread-local event pipe for signaling events that should be processed in the thread"""
Expand All @@ -100,9 +134,11 @@ def _event_pipe(self):
event_pipe.linger = 0
event_pipe.connect(self._event_interface)
self._local.event_pipe = event_pipe
# WeakSet so that event pipes will be closed by garbage collection
# when their threads are terminated
self._event_pipes.add(event_pipe)
# associate event pipes to their threads
# so they can be closed explicitly
# implicit close on __del__ throws a ResourceWarning
with self._event_pipe_gc_lock:
self._event_pipes[threading.current_thread()] = event_pipe
return event_pipe

def _handle_event(self, msg):
Expand Down Expand Up @@ -188,7 +224,7 @@ def stop(self):
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
for _thread, event_pipe in self._event_pipes.items():
event_pipe.close()

def close(self):
Expand Down
35 changes: 35 additions & 0 deletions ipykernel/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import os
import subprocess
import sys
import threading
import time
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from unittest import mock

import pytest
Expand Down Expand Up @@ -114,6 +116,39 @@ def test_outstream(iopub_thread):
assert stream.writable()


async def test_event_pipe_gc(iopub_thread):
session = Session(key=b'abc')
stream = OutStream(
session,
iopub_thread,
"stdout",
isatty=True,
watchfd=False,
)
save_stdout = sys.stdout
assert iopub_thread._event_pipes == {}
with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool:
pool.submit(print, "x").result()
pool_thread = pool.submit(threading.current_thread).result()
assert list(iopub_thread._event_pipes) == [pool_thread]

# run gc once in the iopub thread
f: Future = Future()

async def run_gc():
try:
await iopub_thread._event_pipe_gc()
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

iopub_thread.io_loop.add_callback(run_gc)
# wait for call to finish in iopub thread
f.result()
assert iopub_thread._event_pipes == {}


def subprocess_test_echo_watch():
# handshake Pub subscription
session = Session(key=b'abc')
Expand Down