From 4ca12622503161adf0d06f7522cf1c3d3f428a09 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 12 Jun 2023 12:34:03 +0200 Subject: [PATCH 1/3] avoid ResourceWarning on implicitly closed event pipe sockets since there's no 'on thread close event', so we need to garbage collect event pipes periodically. There will only be something to do when a thread has: 1. triggered an event via print or flush, and 2. closed so run quite rarely (e.g. 10 seconds) --- ipykernel/iostream.py | 50 ++++++++++++++++++++++++++++++++------ ipykernel/tests/test_io.py | 35 ++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 8b5e47b30..6f0bfb975 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import io import os @@ -14,8 +15,7 @@ from collections import deque from io import StringIO, TextIOBase from threading import local -from typing import Any, Callable, Deque, Optional -from weakref import WeakSet +from typing import Any, Callable, Deque, Dict, Optional import zmq from jupyter_client.session import extract_header @@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False): self._setup_pipe_in() self._local = threading.local() self._events: Deque[Callable[..., Any]] = deque() - self._event_pipes: WeakSet[Any] = WeakSet() + self._event_pipes: Dict[threading.Thread, Any] = {} + self._event_pipe_gc_lock: threading.Lock = threading.Lock() + self._event_pipe_gc_seconds: float = 10 + self._event_pipe_gc_task: Optional[asyncio.Task] = None self._setup_event_pipe() self.thread = threading.Thread(target=self._thread_main, name="IOPub") self.thread.daemon = True @@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False): def _thread_main(self): """The inner loop that's actually run in a thread""" + + def _start_event_gc(): + self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc()) + + self.io_loop.run_sync(_start_event_gc) self.io_loop.start() + if self._event_pipe_gc_task is not None: + # cancel gc task to avoid pending task warnings + async def _cancel(): + self._event_pipe_gc_task.cancel() + + self.io_loop.run_sync(_cancel) self.io_loop.close(all_fds=True) def _setup_event_pipe(self): @@ -88,6 +102,26 @@ def _setup_event_pipe(self): self._event_puller = ZMQStream(pipe_in, self.io_loop) self._event_puller.on_recv(self._handle_event) + async def _run_event_pipe_gc(self): + """Task to run event pipe gc continuously""" + while True: + await asyncio.sleep(self._event_pipe_gc_seconds) + try: + await self._event_pipe_gc() + except Exception as e: + print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__) + + async def _event_pipe_gc(self): + """run a single garbage collection on event pipes""" + if not self._event_pipes: + # don't acquire the lock if there's nothing to do + return + with self._event_pipe_gc_lock: + for thread, socket in list(self._event_pipes.items()): + if not thread.is_alive(): + socket.close() + del self._event_pipes[thread] + @property def _event_pipe(self): """thread-local event pipe for signaling events that should be processed in the thread""" @@ -100,9 +134,11 @@ def _event_pipe(self): event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe - # WeakSet so that event pipes will be closed by garbage collection - # when their threads are terminated - self._event_pipes.add(event_pipe) + # associate event pipes to their threads + # so they can be closed explicitly + # implicit close on __del__ throws a ResourceWarning + with self._event_pipe_gc_lock: + self._event_pipes[threading.current_thread()] = event_pipe return event_pipe def _handle_event(self, msg): @@ -188,7 +224,7 @@ def stop(self): # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() # so after thread.join, this should be safe - for event_pipe in self._event_pipes: + for _thread, event_pipe in self._event_pipes.items(): event_pipe.close() def close(self): diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 6a9f65170..8ae160dcb 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -4,8 +4,10 @@ import os import subprocess import sys +import threading import time import warnings +from concurrent.futures import Future, ThreadPoolExecutor from unittest import mock import pytest @@ -114,6 +116,39 @@ def test_outstream(iopub_thread): assert stream.writable() +async def test_event_pipe_gc(iopub_thread): + session = Session(key=b'abc') + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + watchfd=False, + ) + save_stdout = sys.stdout + assert iopub_thread._event_pipes == {} + with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool: + pool.submit(print, "x").result() + pool_thread = pool.submit(threading.current_thread).result() + assert list(iopub_thread._event_pipes) == [pool_thread] + + # run gc once in the iopub thread + f = Future() + + async def run_gc(): + try: + await iopub_thread._event_pipe_gc() + except Exception as e: + f.set_exception(e) + else: + f.set_result(None) + + iopub_thread.io_loop.add_callback(run_gc) + # wait for call to finish in iopub thread + f.result() + assert iopub_thread._event_pipes == {} + + def subprocess_test_echo_watch(): # handshake Pub subscription session = Session(key=b'abc') From e313e56536790f1e50f4a6f8d3c233e529011a75 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 12 Jun 2023 11:56:05 -0500 Subject: [PATCH 2/3] lint --- ipykernel/iostream.py | 2 +- ipykernel/tests/test_io.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 6f0bfb975..a1e138452 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -85,7 +85,7 @@ def _start_event_gc(): if self._event_pipe_gc_task is not None: # cancel gc task to avoid pending task warnings async def _cancel(): - self._event_pipe_gc_task.cancel() + self._event_pipe_gc_task.cancel() # type:ignore self.io_loop.run_sync(_cancel) self.io_loop.close(all_fds=True) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 8ae160dcb..6256ecd67 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -133,7 +133,7 @@ async def test_event_pipe_gc(iopub_thread): assert list(iopub_thread._event_pipes) == [pool_thread] # run gc once in the iopub thread - f = Future() + f: Future = Future() async def run_gc(): try: From f745ceacd458553827a631d687bb48f56efe944c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 12 Jun 2023 16:58:17 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipykernel/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 6256ecd67..404657cbb 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -133,7 +133,7 @@ async def test_event_pipe_gc(iopub_thread): assert list(iopub_thread._event_pipes) == [pool_thread] # run gc once in the iopub thread - f: Future = Future() + f: Future = Future() async def run_gc(): try: