diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 308ee86a011..0a506dcd1de 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -66,7 +66,7 @@ def test_no_dashboard(loop): def test_dashboard(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler"], flush_output=False) as proc: + with popen(["dask-scheduler"], capture_output=True) as proc: line = wait_for_log_line(b"dashboard at", proc.stdout) dashboard_port = int(line.decode().split(":")[-1].strip()) @@ -218,7 +218,7 @@ def test_dashboard_port_zero(loop): pytest.importorskip("bokeh") with popen( ["dask-scheduler", "--dashboard-address", ":0"], - flush_output=False, + capture_output=True, ) as proc: line = wait_for_log_line(b"dashboard at", proc.stdout) dashboard_port = int(line.decode().split(":")[-1].strip()) diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 6935061efd4..eaca5bc318f 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -78,7 +78,7 @@ def test_errors(): "--spec-file", "foo.yaml", ], - flush_output=False, + capture_output=True, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line @@ -86,7 +86,7 @@ def test_errors(): with popen( [sys.executable, "-m", "distributed.cli.dask_spec"], - flush_output=False, + capture_output=True, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index d7b011737f7..1a2718dca00 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -23,7 +23,7 @@ def test_version_option(): def test_ssh_cli_nprocs_renamed_to_nworkers(loop): with popen( ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], - flush_output=False, + capture_output=True, ) as proc: with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c: c.wait_for_workers(2, timeout="15 seconds") @@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop): def test_ssh_cli_nworkers_with_nprocs_is_an_error(): with popen( ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], - flush_output=False, + capture_output=True, ) as proc: wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index ca8ed37aac5..56842ad7cc6 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s): "9686:9687", "--no-dashboard", ], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100) @@ -278,14 +278,14 @@ async def test_no_nanny(c, s): async def test_reconnect_deprecated(c, s): with popen( ["dask-worker", s.address, "--reconnect"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout) assert worker.wait() == 1 with popen( ["dask-worker", s.address, "--no-reconnect"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout) await c.wait_for_workers(1) @@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch): async def test_nworkers_requires_nanny(s): with popen( ["dask-worker", s.address, "--nworkers=2", "--no-nanny"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15) @@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s): async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): with popen( ["dask-worker", s.address, "--nprocs=2"], - flush_output=False, + capture_output=True, ) as worker: await c.wait_for_workers(2) wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15) @@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): with popen( ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"], - flush_output=False, + capture_output=True, ) as worker: wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15) @@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny): "--port", scheduler_port, ], - flush_output=False, + capture_output=True, ) as scheduler: start = time() # Wait for the scheduler to be up diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 745ae59caa9..585fbad0811 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -4,6 +4,7 @@ import signal import socket import sys +import textwrap import threading from contextlib import contextmanager from time import sleep @@ -35,6 +36,7 @@ gen_test, inc, new_config, + popen, raises_with_cause, tls_only_security, ) @@ -787,6 +789,42 @@ async def test(s): assert test_done +def test_popen_write_during_terminate_deadlock(): + # Fabricate a command which, when terminated, tries to write more than the pipe + # buffer can hold (OS specific: on Linux it's typically 65536 bytes; on Windows it's + # less). This would deadlock if `proc.wait()` was called, since the process will be + # trying to write to stdout, but stdout isn't being cleared because our process is + # blocked in `proc.wait()`. `proc.communicate()` is necessary: + # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait + with popen( + [ + sys.executable, + "-c", + textwrap.dedent( + """ + import signal + import threading + + e = threading.Event() + + def cb(signum, frame): + # 131072 is 2x the size of the default Linux pipe buffer + print('x' * 131072) + e.set() + + signal.signal(signal.SIGINT, cb) + print('ready', flush=True) + e.wait() + """ + ), + ], + capture_output=True, + ) as proc: + assert proc.stdout.readline().strip() == b"ready" + # Exiting the context manager (terminating the subprocess) will raise + # `subprocess.TimeoutExpired` if this test breaks. + + @gen_test() async def test_freeze_batched_send(): async with EchoServer() as e: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 644d3339291..34956571281 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1283,14 +1283,14 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc): +def _terminate_process(proc: subprocess.Popen) -> None: if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) else: proc.send_signal(signal.SIGINT) try: - proc.wait(30) + proc.communicate(timeout=30) finally: # Make sure we don't leave the process lingering around with suppress(OSError): @@ -1299,32 +1299,41 @@ def _terminate_process(proc): @contextmanager def popen( - args: list[str], flush_output: bool = True, **kwargs + args: list[str], capture_output: bool = False, **kwargs ) -> Iterator[subprocess.Popen[bytes]]: """Start a shell command in a subprocess. Yields a subprocess.Popen object. - stderr is redirected to stdout. - stdout is redirected to a pipe. + On exit, the subprocess is terminated. Parameters ---------- args: list[str] Command line arguments - flush_output: bool, optional - If True (the default), the stdout/stderr pipe is emptied while it is being - filled. Set to False if you wish to read the output yourself. Note that setting - this to False and then failing to periodically read from the pipe may result in - a deadlock due to the pipe getting full. + capture_output: bool, default False + Set to True if you need to read output from the subprocess. + Stdout and stderr will both be piped to ``proc.stdout``. + + If False, the subprocess will write to stdout/stderr normally. + + When True, the test could deadlock if the stdout pipe's buffer gets full + (Linux default is 65536 bytes; macOS and Windows may be smaller). + Therefore, you may need to periodically read from ``proc.stdout``, or + use ``proc.communicate``. All the deadlock warnings apply from + https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr. + + Note that ``proc.communicate`` is called automatically when the + contextmanager exits. Calling code must not call ``proc.communicate`` + in a separate thread, since it's not thread-safe. kwargs: optional optional arguments to subprocess.Popen """ - kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.STDOUT + if capture_output: + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.STDOUT if sys.platform.startswith("win"): # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP - dump_stdout = False args = list(args) if sys.platform.startswith("win"): @@ -1333,37 +1342,14 @@ def popen( args[0] = os.path.join( os.environ.get("DESTDIR", "") + sys.prefix, "bin", args[0] ) - proc = subprocess.Popen(args, **kwargs) - - if flush_output: - ex = concurrent.futures.ThreadPoolExecutor(1) - flush_future = ex.submit(proc.communicate) - - try: - yield proc - - # asyncio.CancelledError is raised by @gen_test/@gen_cluster timeout - except (Exception, asyncio.CancelledError): - dump_stdout = True - raise - - finally: + with subprocess.Popen(args, **kwargs) as proc: try: - _terminate_process(proc) + yield proc finally: - # XXX Also dump stdout if return code != 0 ? - if flush_output: - out, err = flush_future.result() - ex.shutdown() - else: - out, err = proc.communicate() + _terminate_process(proc) + out, err = proc.communicate() assert not err - if dump_stdout: - print("\n" + "-" * 27 + " Subprocess stdout/stderr" + "-" * 27) - print(out.decode().rstrip()) - print("-" * 80) - def wait_for(predicate, timeout, fail_func=None, period=0.05): deadline = time() + timeout