From 28c662e07e7bb12a85bd6a3fe7751136437d790b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Sat, 11 Jun 2022 17:33:29 -0600 Subject: [PATCH 1/5] Log popen stdout/err when subprocess times out --- distributed/tests/test_utils_test.py | 36 ++++++++++++++++++++++ distributed/utils_test.py | 46 ++++++++++++++++++++++++---- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 585fbad0811..81d6c20ef36 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -3,6 +3,7 @@ import pathlib import signal import socket +import subprocess import sys import textwrap import threading @@ -825,6 +826,41 @@ def cb(signum, frame): # `subprocess.TimeoutExpired` if this test breaks. +def test_popen_timeout(capsys: pytest.CaptureFixture): + with pytest.raises(subprocess.TimeoutExpired): + with popen( + [ + sys.executable, + "-c", + textwrap.dedent( + """ + import time + + print('ready', flush=True) + while True: + try: + time.sleep(0.1) + print("slept", flush=True) + except KeyboardInterrupt: + print("interrupted", flush=True) + """ + ), + ], + capture_output=True, + terminate_timeout=1, + ) as proc: + assert proc.stdout + assert proc.stdout.readline().strip() == b"ready" + # Exiting contextmanager sends SIGINT, waits 1s for shutdown. + # Our script ignores SIGINT, so after 1s it sends SIGKILL. 
+ # The contextmanager raises `TimeoutExpired` once the process is killed, + # because it failed the 1s timeout + captured = capsys.readouterr() + assert "stdout of" in captured.out + assert "interrupted" in captured.out + assert "slept" in captured.out + + @gen_test() async def test_freeze_batched_send(): async with EchoServer() as e: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 34956571281..344936eec54 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1283,23 +1283,30 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc: subprocess.Popen) -> None: +def _terminate_process( + proc: subprocess.Popen, terminate_timeout: float, kill_timeout: float +) -> None: if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) else: proc.send_signal(signal.SIGINT) try: - proc.communicate(timeout=30) + proc.communicate(timeout=terminate_timeout) finally: # Make sure we don't leave the process lingering around with suppress(OSError): proc.kill() + proc.communicate(timeout=kill_timeout) @contextmanager def popen( - args: list[str], capture_output: bool = False, **kwargs + args: list[str], + capture_output: bool = False, + terminate_timeout: float = 30, + kill_timeout: float = 10, + **kwargs, ) -> Iterator[subprocess.Popen[bytes]]: """Start a shell command in a subprocess. Yields a subprocess.Popen object. @@ -1325,6 +1332,18 @@ def popen( Note that ``proc.communicate`` is called automatically when the contextmanager exits. Calling code must not call ``proc.communicate`` in a separate thread, since it's not thread-safe. + terminate_timeout: optional, default 30 + When the contextmanager exits, SIGINT is sent to the subprocess. + ``terminate_timeout`` sets how many seconds to wait for the subprocess + to terminate after that. If the timeout expires, SIGKILL is sent to + the subprocess (which cannot be blocked); see ``kill_timeout``. 
+ If this timeout expires, `subprocess.TimeoutExpired` is raised. + kill_timeout: optional, default 10 + When the contextmanager exits, if the subprocess does not shut down + after ``terminate_timeout`` seconds in response to SIGINT, SIGKILL + is sent to the subprocess (which cannot be blocked). ``kill_timeout`` + controls how long to wait after SIGKILL to join the process. + If this timeout expires, `subprocess.TimeoutExpired` is raised. kwargs: optional optional arguments to subprocess.Popen """ @@ -1346,9 +1365,24 @@ def popen( try: yield proc finally: - _terminate_process(proc) - out, err = proc.communicate() - assert not err + try: + _terminate_process(proc, terminate_timeout, kill_timeout) + except subprocess.TimeoutExpired as err: + if err.stdout: + print(f"------ stdout of {err.cmd} ------") + print( + err.stdout.decode() + if isinstance(err.stdout, bytes) + else err.stdout + ) + if err.stderr: + print(f"------ stderr of {err.cmd} ------") + print( + err.stderr.decode() + if isinstance(err.stderr, bytes) + else err.stderr + ) + raise def wait_for(predicate, timeout, fail_func=None, period=0.05): From abdd846964b2fdff89762c7fcc84b595b2918472 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 14 Jun 2022 18:26:18 -0600 Subject: [PATCH 2/5] Always print captured output, even without failure pytest will suppress it if the test passes --- distributed/tests/test_utils_test.py | 13 ++++++++++- distributed/utils_test.py | 34 +++++++++++----------------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 81d6c20ef36..d146f6315ca 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -856,11 +856,22 @@ def test_popen_timeout(capsys: pytest.CaptureFixture): # The contextmanager raises `TimeoutExpired` once the process is killed, # because it failed the 1s timeout captured = capsys.readouterr() - assert "stdout of" in captured.out + 
assert "stdout: returncode -9" in captured.out assert "interrupted" in captured.out assert "slept" in captured.out +def test_popen_always_prints_output(capsys: pytest.CaptureFixture): + # We always print stdout even if there was no error, in case some other assertion + # later in the test fails and the output would be useful. + with popen(["/bin/echo", "foo"], capture_output=True) as proc: + proc.communicate(timeout=5) + + captured = capsys.readouterr() + assert "stdout: returncode 0" in captured.out + assert "foo" in captured.out + + @gen_test() async def test_freeze_batched_send(): async with EchoServer() as e: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 344936eec54..a8d0df68c97 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1283,9 +1283,7 @@ def raises(func, exc=Exception): return True -def _terminate_process( - proc: subprocess.Popen, terminate_timeout: float, kill_timeout: float -) -> None: +def _terminate_process(proc: subprocess.Popen, terminate_timeout: float) -> None: if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) @@ -1297,7 +1295,6 @@ def _terminate_process( # Make sure we don't leave the process lingering around with suppress(OSError): proc.kill() - proc.communicate(timeout=kill_timeout) @contextmanager @@ -1332,6 +1329,9 @@ def popen( Note that ``proc.communicate`` is called automatically when the contextmanager exits. Calling code must not call ``proc.communicate`` in a separate thread, since it's not thread-safe. + + When captured, the stdout/stderr of the process is always printed + when the process exits for easier test debugging. terminate_timeout: optional, default 30 When the contextmanager exits, SIGINT is sent to the subprocess. 
``terminate_timeout`` sets how many seconds to wait for the subprocess @@ -1366,23 +1366,15 @@ def popen( yield proc finally: try: - _terminate_process(proc, terminate_timeout, kill_timeout) - except subprocess.TimeoutExpired as err: - if err.stdout: - print(f"------ stdout of {err.cmd} ------") - print( - err.stdout.decode() - if isinstance(err.stdout, bytes) - else err.stdout - ) - if err.stderr: - print(f"------ stderr of {err.cmd} ------") - print( - err.stderr.decode() - if isinstance(err.stderr, bytes) - else err.stderr - ) - raise + _terminate_process(proc, terminate_timeout) + finally: + out, err = proc.communicate(timeout=kill_timeout) + if out: + print(f"------ stdout: returncode {proc.returncode}, {args} ------") + print(out.decode() if isinstance(out, bytes) else out) + if err: + print(f"------ stderr: returncode {proc.returncode}, {args} ------") + print(err.decode() if isinstance(err, bytes) else err) def wait_for(predicate, timeout, fail_func=None, period=0.05): From 224da83b0e56d9c182045318e224c12c6267dfe4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 14 Jun 2022 18:30:09 -0600 Subject: [PATCH 3/5] Use `popen` helper in `test_quiet_close_process` Printing the logs will likely help diagnose why this test is hanging. 
--- distributed/tests/test_client.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 28d490f15b5..c644fb796b7 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -90,6 +90,7 @@ inc, map_varying, nodebug, + popen, pristine_loop, randominc, save_sys_modules, @@ -7507,7 +7508,7 @@ async def test_wait_for_workers_updates_info(c, s): client_script = """ from dask.distributed import Client if __name__ == "__main__": - client = Client(processes=%s, n_workers=1) + client = Client(processes=%s, n_workers=1, scheduler_port=0, dashboard_address=":0") """ @@ -7516,13 +7517,8 @@ def test_quiet_close_process(processes, tmp_path): with open(tmp_path / "script.py", mode="w") as f: f.write(client_script % processes) - proc = subprocess.Popen( - [sys.executable, tmp_path / "script.py"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - out, err = proc.communicate(timeout=10) + with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc: + out, err = proc.communicate(timeout=10) assert not out assert not err From d2f03e2ec68933382db63d7ceb613ad7885a22b9 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 14 Jun 2022 18:56:41 -0600 Subject: [PATCH 4/5] Attempt to make test work on windows This is a shot in the dark --- distributed/tests/test_utils_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index d146f6315ca..c00f056e2f6 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -834,8 +834,14 @@ def test_popen_timeout(capsys: pytest.CaptureFixture): "-c", textwrap.dedent( """ + import signal + import sys import time + if sys.platform == "win32": + signal.signal(signal.SIGBREAK, signal.default_int_handler) + # ^ Cause `CTRL_BREAK_EVENT` on Windows to raise `KeyboardInterrupt` + 
print('ready', flush=True) while True: try: From 9309c4ff868b327eaa8ccaf4c385357837729c04 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 14 Jun 2022 20:11:05 -0600 Subject: [PATCH 5/5] fix windows tests * different exit code on windows (not -9) * `/bin/echo` obvs doesn't exist --- distributed/tests/test_utils_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index c00f056e2f6..89babb27e83 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -862,7 +862,7 @@ def test_popen_timeout(capsys: pytest.CaptureFixture): # The contextmanager raises `TimeoutExpired` once the process is killed, # because it failed the 1s timeout captured = capsys.readouterr() - assert "stdout: returncode -9" in captured.out + assert "stdout: returncode" in captured.out assert "interrupted" in captured.out assert "slept" in captured.out @@ -870,7 +870,7 @@ def test_popen_timeout(capsys: pytest.CaptureFixture): def test_popen_always_prints_output(capsys: pytest.CaptureFixture): # We always print stdout even if there was no error, in case some other assertion # later in the test fails and the output would be useful. - with popen(["/bin/echo", "foo"], capture_output=True) as proc: + with popen([sys.executable, "-c", "print('foo')"], capture_output=True) as proc: proc.communicate(timeout=5) captured = capsys.readouterr()