From b1dee417073ed20451ad57a0889b37cba7e5bb14 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Apr 2023 09:36:09 +0200 Subject: [PATCH 01/11] Avoid echoing onto a captured FD --- ipykernel/iostream.py | 38 +++++++++--- ipykernel/tests/test_io.py | 123 +++++++++++++++++++++++-------------- 2 files changed, 109 insertions(+), 52 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 9de6156b3..02d326225 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -364,7 +364,7 @@ def __init__( echo : bool whether to echo output watchfd : bool (default, True) - Watch the file descripttor corresponding to the replaced stream. + Watch the file descriptor corresponding to the replaced stream. This is useful if you know some underlying code will write directly the file descriptor by its number. It will spawn a watching thread, that will swap the give file descriptor for a pipe, read from the @@ -408,19 +408,39 @@ def __init__( if ( watchfd - and (sys.platform.startswith("linux") or sys.platform.startswith("darwin")) - and ("PYTEST_CURRENT_TEST" not in os.environ) + and ( + (sys.platform.startswith("linux") or sys.platform.startswith("darwin")) + # Pytest set its own capture. Don't redirect from within pytest. + and ("PYTEST_CURRENT_TEST" not in os.environ) + ) + # allow forcing watchfd (mainly for tests) + or watchfd == "force" ): - # Pytest set its own capture. Dont redirect from within pytest. - self._should_watch = True self._setup_stream_redirects(name) if echo: if hasattr(echo, "read") and hasattr(echo, "write"): + # make sure we aren't trying to echo on the FD we're watching! + # that would cause an infinite loop, always echoing on itself + if self._should_watch: + try: + echo_fd = echo.fileno() + except Exception: + echo_fd = None + + if echo_fd is not None and echo_fd == self._original_stdstream_fd: + # echo on the _copy_ we made during + # this is the actual terminal FD now + echo = io.TextIOWrapper( + io.FileIO( + self._original_stdstream_copy, + "w", + ) + ) self.echo = echo else: - msg = "echo argument must be a file like object" + msg = "echo argument must be a file-like object" raise ValueError(msg) def isatty(self): @@ -433,7 +453,7 @@ def isatty(self): def _setup_stream_redirects(self, name): pr, pw = os.pipe() - fno = getattr(sys, name).fileno() + fno = self._original_stdstream_fd = getattr(sys, name).fileno() self._original_stdstream_copy = os.dup(fno) os.dup2(pw, fno) @@ -456,6 +476,10 @@ def close(self): if self._should_watch: self._should_watch = False self.watch_fd_thread.join() + # restore original FDs + os.dup2(self._original_stdstream_copy, self._original_stdstream_fd) + os.close(self._original_stdstream_copy) + print("closing", self) if self._exc: etype, value, tb = self._exc traceback.print_exception(etype, value, tb) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 221af1f8b..b55cf53c3 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -1,6 +1,7 @@ """Test IO capturing functionality""" import io +import sys import warnings import pytest @@ -10,20 +11,29 @@ from ipykernel.iostream import MASTER, BackgroundSocket, IOPubThread, OutStream -def test_io_api(): - """Test that wrapped stdout has the same API as a normal TextIO object""" - session = Session() +@pytest.fixture +def ctx(): ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - thread = IOPubThread(pub) - thread.start() + return ctx + # yield ctx + ctx.destroy() - stream = OutStream(session, thread, "stdout") - # cleanup unused zmq objects before we start testing - thread.stop() - thread.close() - ctx.term() +@pytest.fixture +def iopub_thread(ctx): + with ctx.socket(zmq.PUB) as pub: + thread = IOPubThread(pub) + thread.start() + + yield thread + thread.stop() + thread.close() + + +def test_io_api(iopub_thread): + """Test that wrapped stdout has the same API as a normal TextIO object""" + session = Session() + stream = OutStream(session, iopub_thread, "stdout") assert stream.errors is None assert not stream.isatty() @@ -43,21 +53,14 @@ def test_io_api(): stream.write(b"") # type:ignore -def test_io_isatty(): +def test_io_isatty(iopub_thread): session = Session() - ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - thread = IOPubThread(pub) - thread.start() - - stream = OutStream(session, thread, "stdout", isatty=True) + stream = OutStream(session, iopub_thread, "stdout", isatty=True) assert stream.isatty() -def test_io_thread(): - ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - thread = IOPubThread(pub) +def test_io_thread(iopub_thread): + thread = iopub_thread thread._setup_pipe_in() msg = [thread._pipe_uuid, b"a"] thread._handle_pipe_msg(msg) @@ -72,40 +75,70 @@ def test_io_thread(): thread._really_send(None) -def test_background_socket(): - ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - thread = IOPubThread(pub) - sock = BackgroundSocket(thread) +def test_background_socket(iopub_thread): + sock = BackgroundSocket(iopub_thread) assert sock.__class__ == BackgroundSocket with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) sock.linger = 101 - assert thread.socket.linger == 101 - assert sock.io_thread == thread + assert iopub_thread.socket.linger == 101 + assert sock.io_thread == iopub_thread sock.send(b"hi") -def test_outstream(): +def test_outstream(iopub_thread): session = Session() - ctx = zmq.Context() - pub = ctx.socket(zmq.PUB) - thread = IOPubThread(pub) - thread.start() - + pub = iopub_thread.socket with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) stream = OutStream(session, pub, "stdout") - stream = OutStream(session, thread, "stdout", pipe=object()) + stream.close() + stream = OutStream(session, iopub_thread, "stdout", pipe=object()) + stream.close() - stream = OutStream(session, thread, "stdout", watchfd=False) + stream = OutStream(session, iopub_thread, "stdout", watchfd=False) stream.close() - stream = OutStream(session, thread, "stdout", isatty=True, echo=io.StringIO()) - with pytest.raises(io.UnsupportedOperation): - stream.fileno() - stream._watch_pipe_fd() - stream.flush() - stream.write("hi") - stream.writelines(["ab", "cd"]) - assert stream.writable() + stream = OutStream(session, iopub_thread, "stdout", isatty=True, echo=io.StringIO()) + + with stream: + with pytest.raises(io.UnsupportedOperation): + stream.fileno() + stream._watch_pipe_fd() + stream.flush() + stream.write("hi") + stream.writelines(["ab", "cd"]) + assert stream.writable() + + +def test_echo_watch(capfd, iopub_thread): + """Test echo on underlying FD while capturing the same FD + + If not careful, this + """ + session = Session() + import os + + fd_stdout = os.fdopen(sys.stdout.fileno(), "wb") + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + echo=sys.stdout, + ) + # fd_stdout.close() + save_stdout = sys.stdout + with stream: + fd_stdout.write(b"fd\n") + fd_stdout.flush() + sys.stdout = stream + sys.__stdout__.write("__stdout__\n") + sys.__stdout__.flush() + sys.stdout.write("stdout") + sys.stdout.flush() + sys.stdout = save_stdout + + out, err = capfd.readouterr() + print(out, err) + assert out.strip() == "fd\n__stdout__\nstdout" From d04e117bacb21a953c1b9104dc0fe48420b04e3c Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 27 Apr 2023 09:09:18 +0200 Subject: [PATCH 02/11] test capture_fd + echo combination --- ipykernel/iostream.py | 1 - ipykernel/tests/test_io.py | 25 +++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 02d326225..562b7512a 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -479,7 +479,6 @@ def close(self): # restore original FDs os.dup2(self._original_stdstream_copy, self._original_stdstream_fd) os.close(self._original_stdstream_copy) - print("closing", self) if self._exc: etype, value, tb = self._exc traceback.print_exception(etype, value, tb) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index b55cf53c3..295f59c00 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -1,8 +1,10 @@ """Test IO capturing functionality""" import io +import os import sys import warnings +from unittest import mock import pytest import zmq @@ -111,15 +113,15 @@ def test_outstream(iopub_thread): assert stream.writable() +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") def test_echo_watch(capfd, iopub_thread): """Test echo on underlying FD while capturing the same FD If not careful, this """ session = Session() - import os - fd_stdout = os.fdopen(sys.stdout.fileno(), "wb") + stdout_fd = sys.stdout.fileno() stream = OutStream( session, iopub_thread, @@ -127,18 +129,21 @@ def test_echo_watch(capfd, iopub_thread): isatty=True, echo=sys.stdout, ) - # fd_stdout.close() save_stdout = sys.stdout - with stream: - fd_stdout.write(b"fd\n") - fd_stdout.flush() - sys.stdout = stream + with stream, mock.patch.object(sys, "stdout", stream): + # write to low-level FD + os.write(stdout_fd, b"fd\n") + os.fsync(stdout_fd) + # write to unwrapped __stdout__ (should also go to original FD) sys.__stdout__.write("__stdout__\n") sys.__stdout__.flush() - sys.stdout.write("stdout") + # write to original sys.stdout (should be the same as __stdout__) + save_stdout.write("stdout\n") + save_stdout.flush() + # print (writes to stream) + print("print") sys.stdout.flush() - sys.stdout = save_stdout out, err = capfd.readouterr() print(out, err) - assert out.strip() == "fd\n__stdout__\nstdout" + assert out.strip() == "fd\n__stdout__\nstdout\nprint" From eba36dd08fc79f1e95c4c22cfa5b89c5a5f3f72d Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 27 Apr 2023 09:56:27 +0200 Subject: [PATCH 03/11] wake watcher thread before joining avoids hang if there's nothing to read could also close first, which would wake the thread, but that gets complicated with the redirect --- ipykernel/iostream.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 562b7512a..8b5e47b30 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -475,6 +475,9 @@ def close(self): """Close the stream.""" if self._should_watch: self._should_watch = False + # thread won't wake unless there's something to read + # writing something after _should_watch will not be echoed + os.write(self._original_stdstream_fd, b'\0') self.watch_fd_thread.join() # restore original FDs os.dup2(self._original_stdstream_copy, self._original_stdstream_fd) From a8ddfa182a3a197de4328376f0018a375d85102e Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 27 Apr 2023 09:57:14 +0200 Subject: [PATCH 04/11] run echo_watch test in a subprocess to avoid messing with pytest output capture I'm pretty sure it's _possible_ to do this in-process, but I can't seem to figure it out --- ipykernel/tests/test_io.py | 123 ++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 29 deletions(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 295f59c00..1623b104b 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -2,7 +2,9 @@ import io import os +import subprocess import sys +import time import warnings from unittest import mock @@ -113,37 +115,100 @@ def test_outstream(iopub_thread): assert stream.writable() +def subprocess_test_echo_watch(): + # handshake Pub subscription + session = Session(key=b'abc') + + # use PUSH socket to avoid subscription issues + with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as pub: + pub.connect(os.environ["IOPUB_URL"]) + iopub_thread = IOPubThread(pub) + stdout_fd = sys.stdout.fileno() + sys.stdout.flush() + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + echo=sys.stdout, + watchfd="force", + ) + save_stdout = sys.stdout + with stream, mock.patch.object(sys, "stdout", stream): + # write to low-level FD + os.write(stdout_fd, b"fd\n") + # print (writes to stream) + print("print") + sys.stdout.flush() + # write to unwrapped __stdout__ (should also go to original FD) + sys.__stdout__.write("__stdout__\n") + sys.__stdout__.flush() + # write to original sys.stdout (should be the same as __stdout__) + save_stdout.write("stdout\n") + save_stdout.flush() + stream.flush() + # we don't have a sync flush on _reading_ from the watched pipe + time.sleep(0.5) + iopub_thread.stop() + iopub_thread.close() + + @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") -def test_echo_watch(capfd, iopub_thread): +def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD - If not careful, this + Test runs in a subprocess to avoid messing with pytest output capturing. """ - session = Session() - - stdout_fd = sys.stdout.fileno() - stream = OutStream( - session, - iopub_thread, + s = ctx.socket(zmq.PULL) + port = s.bind_to_random_port("tcp://127.0.0.1") + url = f"tcp://127.0.0.1:{port}" + session = Session(key=b'abc') + messages = [] + stdout_chunks = [] + with s: + env = dict(os.environ) + env["IOPUB_URL"] = url + env["PYTHONUNBUFFERED"] = "1" + env.pop("PYTEST_CURRENT_TEST", None) + p = subprocess.run( + [ + sys.executable, + "-c", + f"import {__name__}; {__name__}.subprocess_test_echo_watch()", + ], + env=env, + capture_output=True, + text=True, + timeout=10, + ) + print(f"{p.stdout=}") + print(f"{p.stderr}=", file=sys.stderr) + assert p.returncode == 0 + while s.poll(timeout=100): + ident, msg = session.recv(s) + if msg["header"]["msg_type"] == "stream" and msg["content"]["name"] == "stdout": + stdout_chunks.append(msg["content"]["text"]) + + # check outputs + # use sets of lines to ignore ordering issues with + # async flush and watchfd thread + + # Check the stream output forwarded over zmq + zmq_stdout = "".join(stdout_chunks) + assert set(zmq_stdout.strip().splitlines()) == { + "fd", + "print", + # original stdout streams don't get captured, + # they write directly to the terminal + # "stdout", + # "__stdout__", + } + + # Check what was written to the process stdout (kernel terminal) + # just check that each output source went to the terminal + assert set(p.stdout.strip().splitlines()) == { + "fd", + "print", "stdout", - isatty=True, - echo=sys.stdout, - ) - save_stdout = sys.stdout - with stream, mock.patch.object(sys, "stdout", stream): - # write to low-level FD - os.write(stdout_fd, b"fd\n") - os.fsync(stdout_fd) - # write to unwrapped __stdout__ (should also go to original FD) - sys.__stdout__.write("__stdout__\n") - sys.__stdout__.flush() - # write to original sys.stdout (should be the same as __stdout__) - save_stdout.write("stdout\n") - save_stdout.flush() - # print (writes to stream) - print("print") - sys.stdout.flush() - - out, err = capfd.readouterr() - print(out, err) - assert out.strip() == "fd\n__stdout__\nstdout\nprint" + "__stdout__", + } From 1d7db8e8d02d26d84a3779992992a49157298898 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 5 May 2023 13:45:23 +0200 Subject: [PATCH 05/11] try to get echo_watch test passing --- ipykernel/tests/test_io.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 1623b104b..349ad943a 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -146,9 +146,12 @@ def subprocess_test_echo_watch(): # write to original sys.stdout (should be the same as __stdout__) save_stdout.write("stdout\n") save_stdout.flush() - stream.flush() + # is there another way to flush on the FD? + fd_file = os.fdopen(stdout_fd, "w") + fd_file.flush() # we don't have a sync flush on _reading_ from the watched pipe - time.sleep(0.5) + time.sleep(1) + stream.flush() iopub_thread.stop() iopub_thread.close() @@ -198,10 +201,8 @@ def test_echo_watch(ctx): assert set(zmq_stdout.strip().splitlines()) == { "fd", "print", - # original stdout streams don't get captured, - # they write directly to the terminal - # "stdout", - # "__stdout__", + "stdout", + "__stdout__", } # Check what was written to the process stdout (kernel terminal) From 02e998fcb2cc87b3493acaba36f58f4bfb0ad634 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 8 May 2023 14:48:15 +0200 Subject: [PATCH 06/11] narrow type for test_io assert that the unreachable None case doesn't happen --- ipykernel/tests/test_io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 349ad943a..17a7ae28c 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -189,6 +189,7 @@ def test_echo_watch(ctx): assert p.returncode == 0 while s.poll(timeout=100): ident, msg = session.recv(s) + assert msg is not None # for type narrowing if msg["header"]["msg_type"] == "stream" and msg["content"]["name"] == "stdout": stdout_chunks.append(msg["content"]["text"]) From 65139afd525da00eaf2acb7443213d694b934cfa Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 8 May 2023 15:07:09 +0200 Subject: [PATCH 07/11] lint: satisfy pedantry --- ipykernel/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 17a7ae28c..607d41bc2 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -71,7 +71,7 @@ def test_io_thread(iopub_thread): ctx1, pipe = thread._setup_pipe_out() pipe.close() thread._pipe_in.close() - thread._check_mp_mode = lambda: MASTER # type:ignore + thread._check_mp_mode = lambda: MASTER thread._really_send([b"hi"]) ctx1.destroy() thread.close() From d8a6f29b88a5b0b315b9a677655cacb81c32c65b Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 8 May 2023 15:24:31 +0200 Subject: [PATCH 08/11] show packages in minimal-dependency env otherwise, we have no idea why things break --- .github/workflows/ci.yml | 5 +++++ pyproject.toml | 1 + 2 files changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ad5d4fece..8781622d5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,6 +150,11 @@ jobs: uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1 with: dependency_type: minimum + + - name: List installed packages + run: | + hatch run test:list + - name: Run the unit tests run: | hatch run test:nowarn || hatch run test:nowarn --lf diff --git a/pyproject.toml b/pyproject.toml index b042fac27..3494fefae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,7 @@ api = "sphinx-apidoc -o docs/api -f -E ipykernel ipykernel/tests ipykernel/inpro [tool.hatch.envs.test] features = ["test"] [tool.hatch.envs.test.scripts] +list = "python -m pip freeze" test = "python -m pytest -vv {args}" nowarn = "test -W default {args}" From 6c12316eadd508cb361796bc4937a1dc7ff9f82a Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 8 May 2023 15:52:12 +0200 Subject: [PATCH 09/11] make sure iopub thread is started in test stop closes the event puller socket, but only if start was called --- ipykernel/tests/test_io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 607d41bc2..a2045b47e 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -123,6 +123,7 @@ def subprocess_test_echo_watch(): with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as pub: pub.connect(os.environ["IOPUB_URL"]) iopub_thread = IOPubThread(pub) + iopub_thread.start() stdout_fd = sys.stdout.fileno() sys.stdout.flush() stream = OutStream( From d5bcf548b7ebea4e3a54fdefd0cfc214da5bc89a Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 9 May 2023 15:24:31 +0200 Subject: [PATCH 10/11] make sure we don't split newline in print call --- ipykernel/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index a2045b47e..75c31e331 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -139,7 +139,7 @@ def subprocess_test_echo_watch(): # write to low-level FD os.write(stdout_fd, b"fd\n") # print (writes to stream) - print("print") + print("print\n", end="") sys.stdout.flush() # write to unwrapped __stdout__ (should also go to original FD) sys.__stdout__.write("__stdout__\n") From f4cbd7634d2871c0ba09cc6281c616f7774636e7 Mon Sep 17 00:00:00 2001 From: Min RK Date: Wed, 10 May 2023 14:41:36 +0200 Subject: [PATCH 11/11] TST: yield context in fixture --- ipykernel/tests/test_io.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 75c31e331..6a9f65170 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -18,8 +18,7 @@ @pytest.fixture def ctx(): ctx = zmq.Context() - return ctx - # yield ctx + yield ctx ctx.destroy()