From 62738aa2b292ec02022603782a4a0a13daccb5a7 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Thu, 5 Jan 2023 10:51:15 +0100 Subject: [PATCH 1/4] add tests that break stagger and restart --- distributed/cli/tests/test_dask_worker.py | 33 +++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 3750b24e0bb..b2fce4f93f2 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -610,6 +610,39 @@ def dask_setup(worker): assert foo == "setup" +@pytest.mark.slow +@gen_cluster(nthreads=[], client=True) +async def test_set_lifetime_stagger_via_env_var(c, s): + # Ensure that lifetime stagger can be set via an environment variable + env = os.environ.copy() + env["DASK_DISTRIBUTED__WORKER__LIFETIME__DURATION"] = "10 seconds" + env["DASK_DISTRIBUTED__WORKER__LIFETIME__STAGGER"] = "2 seconds" + with popen(["dask", "worker", s.address], env=env), popen( + ["dask", "worker", s.address], env=env + ): + await c.wait_for_workers(1) + [lifetime1, lifetime2] = ( + await c.run(lambda dask_worker: dask_worker.lifetime) + ).values() + assert lifetime1 != lifetime2 + assert 8 <= lifetime1 <= 12 + assert 8 <= lifetime2 <= 12 + + +@pytest.mark.slow +@gen_cluster(nthreads=[], client=True) +async def test_set_lifetime_restart_via_env_var(c, s): + # Ensure that lifetime restart can be set via an environment variable + env = os.environ.copy() + env["DASK_DISTRIBUTED__WORKER__LIFETIME__RESTART"] = "True" + with popen(["dask", "worker", s.address], env=env): + await c.wait_for_workers(1) + [lifetime_restart] = ( + await c.run(lambda dask_worker: dask_worker.lifetime_restart) + ).values() + assert lifetime_restart + + @pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_timeout(nanny): From 4c1dcb9d1a7cdd9284ddaa28beec75d72489ce60 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Tue, 3 Jan 2023 17:31:56 +0100 Subject: [PATCH 2/4] set `--lifetime-stagger` default value to `None` --- distributed/cli/dask_worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index c000ed1d2d2..24393a6c6bb 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -190,8 +190,7 @@ @click.option( "--lifetime-stagger", type=str, - default="0 seconds", - show_default=True, + default=None, help="Random amount by which to stagger lifetime values", ) @click.option( From afacf2e58d410b787fee3a3fb7f5cd00c51101a0 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Thu, 5 Jan 2023 10:47:30 +0100 Subject: [PATCH 3/4] set `--lifetime-restart` default value to `None` --- distributed/cli/dask_worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 24393a6c6bb..ad2b3c754a0 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -203,8 +203,7 @@ @click.option( "--lifetime-restart/--no-lifetime-restart", "lifetime_restart", - default=False, - show_default=True, + default=None, required=False, help="Whether or not to restart the worker after the lifetime lapses. " "This assumes that you are using the --lifetime and --nanny keywords", From 4ce53dfdc78b5975da3012b8fd7910626c08fc52 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Thu, 5 Jan 2023 14:05:52 +0100 Subject: [PATCH 4/4] wait for two workers --- distributed/cli/tests/test_dask_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index b2fce4f93f2..7212ee36d76 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -620,7 +620,7 @@ async def test_set_lifetime_stagger_via_env_var(c, s): with popen(["dask", "worker", s.address], env=env), popen( ["dask", "worker", s.address], env=env ): - await c.wait_for_workers(1) + await c.wait_for_workers(2) [lifetime1, lifetime2] = ( await c.run(lambda dask_worker: dask_worker.lifetime) ).values()