From 679b36ab780cb97a96b006dcaae0d97820cd4458 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 31 May 2022 17:55:10 -0600 Subject: [PATCH 1/8] Test for write-on-terminate deadlock The subprocess writes a bunch of output when it terminates. Using `Popen.wait()` here will deadlock, as the Python docs loudly warn you in numerous places. --- distributed/tests/test_utils_test.py | 58 ++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 3116fb3a8c8..ae4dda5122a 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -31,6 +31,7 @@ gen_test, inc, new_config, + popen, raises_with_cause, tls_only_security, ) @@ -772,3 +773,60 @@ async def test(s): with pytest.raises(CustomError): test() assert test_done + + +def test_popen_write_during_terminate_deadlock(): + # Fabricate a command which, when terminated, tries to write more than the pipe buffer can hold (65536 bytes). + # This would deadlock if `proc.wait()` was called, since the process will be trying to write to stdout, but + # stdout isn't being cleared because our process is blocked in `proc.wait()`. + # `proc.communicate()` is necessary: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait + with popen( + [ + sys.executable, + "-c", + "; ".join( + [ + "import signal", + "import time", + "import threading", + "e = threading.Event()", + "signal.signal(signal.SIGINT, lambda *args: [print('x' * 131072), e.set()])", + # ^ 131072 is 2x the size of the default Linux pipe buffer + "print('ready', flush=True)", + "e.wait()", + ] + ), + ], + flush_output=False, + ) as proc: + assert proc.stdout.readline().strip() == b"ready" + + # Exiting the context manager (terminating the subprocess) will raise `subprocess.TimeoutExpired` + # if this test breaks. + + # import subprocess + # import signal + # import time + + # proc = subprocess.Popen( + # # This shell magic: + # # - Starts `tail -f /dev/null` (which will hang forever) in the background. + # # It's in the background, because `trap`s don't run until subcommands finish, + # # so otherwise it would block the signal handler. + # # - When SIGINT occurs: + # # - Tries to print 131072 random bytes (more than pipe buffer size) + # # - Then kills the `tail -f`. Otherwise, the overall `proc` would still be running. + # 'trap "head -c 131072 /dev/urandom; kill %1" SIGINT; tail -f /dev/null & wait', + # shell=True, + # stdout=subprocess.PIPE, + # stderr=subprocess.STDOUT, + # ) + # time.sleep(0.5) + # proc.send_signal(signal.SIGINT) + # # time.sleep(0.5) + # proc.wait(1) + # # proc.kill() + # out, _ = proc.communicate(timeout=2) + # print(out) + # assert out + # # assert False From 1e22bb64d836b70de6f24d8e8a8ce06602a5dd2f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 31 May 2022 17:55:42 -0600 Subject: [PATCH 2/8] remove experimental code --- distributed/tests/test_utils_test.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index ae4dda5122a..3aecf7b5bfd 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -803,30 +803,3 @@ def test_popen_write_during_terminate_deadlock(): # Exiting the context manager (terminating the subprocess) will raise `subprocess.TimeoutExpired` # if this test breaks. - - # import subprocess - # import signal - # import time - - # proc = subprocess.Popen( - # # This shell magic: - # # - Starts `tail -f /dev/null` (which will hang forever) in the background. - # # It's in the background, because `trap`s don't run until subcommands finish, - # # so otherwise it would block the signal handler. - # # - When SIGINT occurs: - # # - Tries to print 131072 random bytes (more than pipe buffer size) - # # - Then kills the `tail -f`. Otherwise, the overall `proc` would still be running. - # 'trap "head -c 131072 /dev/urandom; kill %1" SIGINT; tail -f /dev/null & wait', - # shell=True, - # stdout=subprocess.PIPE, - # stderr=subprocess.STDOUT, - # ) - # time.sleep(0.5) - # proc.send_signal(signal.SIGINT) - # # time.sleep(0.5) - # proc.wait(1) - # # proc.kill() - # out, _ = proc.communicate(timeout=2) - # print(out) - # assert out - # # assert False From c4737b680c5cc8c53e899e6f3f491518c4982429 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 31 May 2022 17:56:46 -0600 Subject: [PATCH 3/8] `communicate` instead of `wait` in `_terminate` Not a huge fan of this; it's a weird argument to pass in. Maybe should just inline the function. --- distributed/utils_test.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index edd74bef17e..4bace2b4a8d 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1285,14 +1285,17 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc): +def _terminate_process(proc: subprocess.Popen, already_communicating: bool): if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) else: proc.send_signal(signal.SIGINT) try: - proc.wait(30) + if already_communicating: + proc.wait(timeout=30) + else: + proc.communicate(timeout=30) finally: # Make sure we don't leave the process lingering around with suppress(OSError): @@ -1351,7 +1354,7 @@ def popen( finally: try: - _terminate_process(proc) + _terminate_process(proc, already_communicating=flush_output) finally: # XXX Also dump stdout if return code != 0 ? if flush_output: From 6a8ad6e9f264ebf3da63668e97a9dfff5527f818 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 31 May 2022 19:07:00 -0600 Subject: [PATCH 4/8] Don't capture output in popen if unneeded Our `popen` helper would always capture stdout/stderr. Redirecting output via pipes carries the risk of deadlock (see admonition under https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr), so we would run `Popen.communicate` in a background thread to always be draining the pipe. If the test wasn't actually using stdout/stderr (most don't), it's just simpler to just not redirect it and let it print out as normal. As usual, pytest will hide the output if the test passes, and print it if it fails. This change isn't strictly necessary, it's just a simplification. And it makes it a little easier to implement the terminate-communicate logic for the `capture_output=True` case, since you don't have to worry about a background thread already running `communicate`. --- distributed/cli/tests/test_dask_scheduler.py | 4 +- distributed/cli/tests/test_dask_spec.py | 4 +- distributed/cli/tests/test_dask_ssh.py | 4 +- distributed/cli/tests/test_dask_worker.py | 14 ++-- distributed/tests/test_utils_test.py | 2 +- distributed/utils_test.py | 69 ++++++++------------ 6 files changed, 40 insertions(+), 57 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 308ee86a011..0a506dcd1de 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -66,7 +66,7 @@ def test_no_dashboard(loop): def test_dashboard(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler"], flush_output=False) as proc: + with popen(["dask-scheduler"], capture_output=True) as proc: line = wait_for_log_line(b"dashboard at", proc.stdout) dashboard_port = int(line.decode().split(":")[-1].strip()) @@ -218,7 +218,7 @@ def test_dashboard_port_zero(loop): pytest.importorskip("bokeh") with popen( ["dask-scheduler", "--dashboard-address", ":0"], - flush_output=False, + capture_output=True, ) as proc: line = wait_for_log_line(b"dashboard at", proc.stdout) dashboard_port = int(line.decode().split(":")[-1].strip()) diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 6935061efd4..eaca5bc318f 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -78,7 +78,7 @@ def test_errors(): "--spec-file", "foo.yaml", ], - flush_output=False, + capture_output=True, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line @@ -86,7 +86,7 @@ def test_errors(): with popen( [sys.executable, "-m", "distributed.cli.dask_spec"], - flush_output=False, + capture_output=True, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index d7b011737f7..1a2718dca00 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -23,7 +23,7 @@ def test_version_option(): def test_ssh_cli_nprocs_renamed_to_nworkers(loop): with popen( ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], - flush_output=False, + capture_output=True, ) as proc: with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c: c.wait_for_workers(2, timeout="15 seconds") @@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop): def test_ssh_cli_nworkers_with_nprocs_is_an_error(): with popen( ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], - flush_output=False, + capture_output=True, ) as proc: wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index ca8ed37aac5..56842ad7cc6 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s): "9686:9687", "--no-dashboard", ], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100) @@ -278,14 +278,14 @@ async def test_no_nanny(c, s): async def test_reconnect_deprecated(c, s): with popen( ["dask-worker", s.address, "--reconnect"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout) assert worker.wait() == 1 with popen( ["dask-worker", s.address, "--no-reconnect"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout) await c.wait_for_workers(1) @@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch): async def test_nworkers_requires_nanny(s): with popen( ["dask-worker", s.address, "--nworkers=2", "--no-nanny"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15) @@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s): async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): with popen( ["dask-worker", s.address, "--nprocs=2"], - flush_output=False, + capture_output=True, ) as worker: await c.wait_for_workers(2) wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15) @@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): with popen( ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15) @@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny): "--port", scheduler_port, ], - flush_output=False, + capture_output=True, ) as scheduler: start = time() # Wait for the scheduler to be up diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 3aecf7b5bfd..add77c2898e 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -797,7 +797,7 @@ def test_popen_write_during_terminate_deadlock(): ] ), ], - flush_output=False, + capture_output=True, ) as proc: assert proc.stdout.readline().strip() == b"ready" diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 4bace2b4a8d..613af5f4238 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1285,17 +1285,14 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc: subprocess.Popen, already_communicating: bool): +def _terminate_process(proc: subprocess.Popen): if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) else: proc.send_signal(signal.SIGINT) try: - if already_communicating: - proc.wait(timeout=30) - else: - proc.communicate(timeout=30) + proc.communicate(timeout=30) finally: # Make sure we don't leave the process lingering around with suppress(OSError): @@ -1304,32 +1301,41 @@ def _terminate_process(proc: subprocess.Popen, already_communicating: bool): @contextmanager def popen( - args: list[str], flush_output: bool = True, **kwargs + args: list[str], capture_output: bool = False, **kwargs ) -> Iterator[subprocess.Popen[bytes]]: """Start a shell command in a subprocess. Yields a subprocess.Popen object. - stderr is redirected to stdout. - stdout is redirected to a pipe. + On exit, the subprocess is terminated. Parameters ---------- args: list[str] Command line arguments - flush_output: bool, optional - If True (the default), the stdout/stderr pipe is emptied while it is being - filled. Set to False if you wish to read the output yourself. Note that setting - this to False and then failing to periodically read from the pipe may result in - a deadlock due to the pipe getting full. + capture_output: bool, default False + Set to True if you need to read output from the subprocess. + Stdout and stderr will both be piped to ``proc.stdout``. + + If False, the subprocess will write to stdout/stderr normally. + + When True, the test could deadlock if the stdout pipe's buffer gets full + (Linux default is 65536 bytes; macOS and Windows may be smaller). + Therefore, you may need to periodically read from ``proc.stdout``, or + use ``proc.communicate``. All the deadlock warnings apply from + https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr. + + Note that ``proc.communicate`` is called automatically when the + contextmanager exits. Calling code must not call ``proc.communicate`` + in a separate thread, since it's not thread-safe. kwargs: optional optional arguments to subprocess.Popen """ - kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.STDOUT + if capture_output: + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.STDOUT if sys.platform.startswith("win"): # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP - dump_stdout = False args = list(args) if sys.platform.startswith("win"): @@ -1338,37 +1344,14 @@ def popen( args[0] = os.path.join( os.environ.get("DESTDIR", "") + sys.prefix, "bin", args[0] ) - proc = subprocess.Popen(args, **kwargs) - - if flush_output: - ex = concurrent.futures.ThreadPoolExecutor(1) - flush_future = ex.submit(proc.communicate) - - try: - yield proc - - # asyncio.CancelledError is raised by @gen_test/@gen_cluster timeout - except (Exception, asyncio.CancelledError): - dump_stdout = True - raise - - finally: + with subprocess.Popen(args, **kwargs) as proc: try: - _terminate_process(proc, already_communicating=flush_output) + yield proc finally: - # XXX Also dump stdout if return code != 0 ? - if flush_output: - out, err = flush_future.result() - ex.shutdown() - else: - out, err = proc.communicate() + _terminate_process(proc) + out, err = proc.communicate() assert not err - if dump_stdout: - print("\n" + "-" * 27 + " Subprocess stdout/stderr" + "-" * 27) - print(out.decode().rstrip()) - print("-" * 80) - def wait_for(predicate, timeout, fail_func=None, period=0.05): deadline = time() + timeout From 90fe1b5d571d081fb9ed6c738963f20301157357 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 1 Jun 2022 09:10:08 -0600 Subject: [PATCH 5/8] full type annotations Co-authored-by: Thomas Grainger --- distributed/utils_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 613af5f4238..5f798cbaaf9 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1285,7 +1285,7 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc: subprocess.Popen): +def _terminate_process(proc: subprocess.Popen) -> None: if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) From 8f07c3d84091ed49e6c0f39bc3e1e7ee215b084b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 8 Jun 2022 10:43:22 -0600 Subject: [PATCH 6/8] Cosmetics Co-authored-by: crusaderky --- distributed/tests/test_utils_test.py | 30 +++++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index add77c2898e..44b96522cf8 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -776,7 +776,8 @@ async def test(s): def test_popen_write_during_terminate_deadlock(): - # Fabricate a command which, when terminated, tries to write more than the pipe buffer can hold (65536 bytes). + # Fabricate a command which, when terminated, tries to write more than the pipe buffer can hold + # (OS specific: on Linux it's typically 65536 bytes; on Windows it's less). # This would deadlock if `proc.wait()` was called, since the process will be trying to write to stdout, but # stdout isn't being cleared because our process is blocked in `proc.wait()`. # `proc.communicate()` is necessary: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait @@ -784,17 +785,22 @@ def test_popen_write_during_terminate_deadlock(): [ sys.executable, "-c", - "; ".join( - [ - "import signal", - "import time", - "import threading", - "e = threading.Event()", - "signal.signal(signal.SIGINT, lambda *args: [print('x' * 131072), e.set()])", - # ^ 131072 is 2x the size of the default Linux pipe buffer - "print('ready', flush=True)", - "e.wait()", - ] + textwrap.dedent( + """ + import signal + import threading + + e = threading.Event() + + def cb(signum, frame): + # 131072 is 2x the size of the default Linux pipe buffer + print('x' * 131072) + e.set() + + signal.signal(signal.SIGINT, cb) + print('ready', flush=True) + e.wait() + """ ), ], capture_output=True, From b0af22056d1eca1510fa0fc7f8811b43f684f92d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 8 Jun 2022 10:44:14 -0600 Subject: [PATCH 7/8] import textwrap --- distributed/tests/test_utils_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 44b96522cf8..5502f68bc7e 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -4,6 +4,7 @@ import signal import socket import sys +import textwrap import threading from contextlib import contextmanager from time import sleep From 89669f65d57e93da0200739af823e42fd62c0406 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 8 Jun 2022 10:44:50 -0600 Subject: [PATCH 8/8] 88 character comments --- distributed/tests/test_utils_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 5502f68bc7e..fa6ad2d9369 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -777,11 +777,12 @@ async def test(s): def test_popen_write_during_terminate_deadlock(): - # Fabricate a command which, when terminated, tries to write more than the pipe buffer can hold - # (OS specific: on Linux it's typically 65536 bytes; on Windows it's less). - # This would deadlock if `proc.wait()` was called, since the process will be trying to write to stdout, but - # stdout isn't being cleared because our process is blocked in `proc.wait()`. - # `proc.communicate()` is necessary: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait + # Fabricate a command which, when terminated, tries to write more than the pipe + # buffer can hold (OS specific: on Linux it's typically 65536 bytes; on Windows it's + # less). This would deadlock if `proc.wait()` was called, since the process will be + # trying to write to stdout, but stdout isn't being cleared because our process is + # blocked in `proc.wait()`. `proc.communicate()` is necessary: + # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait with popen( [ sys.executable, @@ -808,5 +809,5 @@ def cb(signum, frame): ) as proc: assert proc.stdout.readline().strip() == b"ready" - # Exiting the context manager (terminating the subprocess) will raise `subprocess.TimeoutExpired` - # if this test breaks. + # Exiting the context manager (terminating the subprocess) will raise + # `subprocess.TimeoutExpired` if this test breaks.