From 19d8befd4c5b276f8b8c2d30a3a37033fadb3cce Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:28:38 +0000 Subject: [PATCH 01/13] Fix flaky test_spill_hysteresis --- distributed/tests/test_worker.py | 162 +++++++++++++++++-------------- distributed/worker.py | 13 +-- 2 files changed, 94 insertions(+), 81 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0bce544f05b..fcaf09bec5c 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1282,6 +1282,38 @@ async def test_spill_constrained(c, s, w): assert set(w.data.disk) == {x.key} +class BadSizeof: + """Configurable actual process memory and reported managed memory""" + + # dummy *args is to facilitate Client.map + def __init__(self, *args, process: float, managed: float, pickle_time: float = 0.5): + self.process = int(process) + self.managed = int(managed) + self.pickle_time = pickle_time + self.data = "x" * self.process + + def __sizeof__(self) -> int: + return self.managed + + def __getstate__(self): + """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. + Also remove the impact of installing lz4 or blosc, which would shrink down + self.data to kilobytes. + + Make sure that the time spent writing to disk a key exceeds the time needed by + the OS kernel to react to the free() from the previous spilled key and shrink + down the process memory. + + In case of flakiness, read: https://github.com/dask/distributed/issues/5840 + """ + sleep(self.pickle_time) + return self.process, self.managed, self.pickle_time + + def __setstate__(self, state) -> None: + self.process, self.managed, self.pickle_time = state + self.data = "x" * self.process + + @requires_zict @gen_cluster( nthreads=[("", 1)], @@ -1306,44 +1338,36 @@ async def test_spill_spill_threshold(c, s, a): memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 - class UnderReport: - """100 MB process memory, 10 bytes reported managed memory""" - - def __init__(self, *args): - self.data = "x" * int(100e6) - - def __sizeof__(self): - return 10 - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () - - futures = c.map(UnderReport, range(8)) - + futures = c.map(BadSizeof, range(8), process=100e6, managed=10, pickle_time=0) while not a.data.disk: await asyncio.sleep(0.01) -async def assert_not_everything_is_spilled(w: Worker) -> None: +async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: + from distributed.spill import SpillBuffer + + assert isinstance(dask_worker.data, SpillBuffer) start = time() - while time() < start + 0.5: - assert w.data - if not w.data.memory: # type: ignore - # The hysteresis system fails on Windows and MacOSX because process memory - # is very slow to shrink down after calls to PyFree. As a result, - # Worker.memory_monitor will continue spilling until there's nothing left. - # Nothing we can do about this short of finding either a way to change this - # behaviour at OS level or a better measure of allocated memory. - assert not LINUX, "All data was spilled to disk" - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") - await asyncio.sleep(0) + nspilled = 0 + # This timeout must be longer than the 0.5s in BadSizeof.__reduce__ + while time() < start + 1.0: + assert dask_worker.data + assert dask_worker.data.memory + if len(dask_worker.data.disk) != nspilled: + nspilled = len(dask_worker.data.disk) + start = time() # We just spilled; reset timer + await asyncio.sleep(0.01) +@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, worker_kwargs=dict( # FIXME https://github.com/dask/distributed/issues/5367 # Can't reconfigure the absolute target threshold after the worker @@ -1357,42 +1381,45 @@ async def assert_not_everything_is_spilled(w: Worker) -> None: memory_pause_fraction=False, ), ) -async def test_spill_no_target_threshold(c, s, a): +async def test_spill_no_target_threshold(c, s, nanny): """Test that you can enable the spill threshold while leaving the target threshold to False """ - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling - class OverReport: - """Configurable process memory, 10 GB reported managed memory""" + def change_memory_limit(dask_worker): + memory = psutil.Process().memory_info().rss + # 300 MB before we start spilling + dask_worker.memory_limit = (memory + 300e6) / 0.7 - def __init__(self, size): - self.data = "x" * size + await c.run(change_memory_limit) - def __sizeof__(self): - return int(10e9) + async def get_keys() -> tuple[set[str], set[str]]: + def _(dask_worker): + return set(dask_worker.data.memory), set(dask_worker.data.disk) - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return OverReport, (len(self.data),) + out = await c.run(_) + return out[nanny.worker_address] - f1 = c.submit(OverReport, 0, key="f1") + f1 = c.submit(inc, 0, key="f1") await wait(f1) - assert set(a.data.memory) == {"f1"} + mem, spilled = await get_keys() + assert mem == {"f1"} + assert not spilled - # 800 MB. Use large chunks to stimulate timely release of process memory. - futures = c.map(OverReport, range(int(100e6), int(100e6) + 8)) + # 800 MB process memory, 10 GB bogous managed memory. + # Use large chunks to stimulate timely release of process memory. + futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - while not a.data.disk: + while not spilled: + mem, spilled = await get_keys() await asyncio.sleep(0.01) - assert "f1" in a.data.disk + assert "f1" in spilled # Spilling normally starts at the spill threshold and stops at the target threshold. # In this special case, it stops as soon as the process memory goes below the spill # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - await assert_not_everything_is_spilled(a) + await c.run(assert_not_everything_is_spilled) @pytest.mark.slow @@ -1400,48 +1427,39 @@ def __reduce__(self): @gen_cluster( nthreads=[("", 1)], client=True, + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, worker_kwargs=dict( - memory_limit="1 GiB", # See FIXME note in previous test + memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", memory_target_fraction=0.4, memory_spill_fraction=0.7, memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, a): - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB +async def test_spill_hysteresis(c, s, nanny): + def change_memory_limit(dask_worker): + memory = psutil.Process().memory_info().rss + dask_worker.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB + + await c.run(change_memory_limit) # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory - class UnderReport: - def __init__(self): - self.data = "x" * int(100e6) # 100 MB - - def __sizeof__(self): - return 1 - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () + futures = c.map(BadSizeof, range(11), process=100e6, managed=1) - max_in_memory = 0 - futures = [] - while not a.data.disk: - futures.append(c.submit(UnderReport, pure=False)) - max_in_memory = max(max_in_memory, len(a.data.memory)) - await wait(futures) - await asyncio.sleep(0.05) - max_in_memory = max(max_in_memory, len(a.data.memory)) + # Wait until spilling stops, but no less than 1s + await c.run(assert_not_everything_is_spilled) - # If there were no hysteresis, we would lose exactly 1 key. + # If there were no hysteresis, we would lose 1-2 keys. # Note that, for this test to be meaningful, memory must shrink down readily when # we deallocate Python objects. This is not always the case on Windows and MacOSX; # on Linux we set MALLOC_TRIM to help in that regard. # To verify that this test is useful, set target=spill and watch it fail. - while len(a.data.memory) > max_in_memory - 3: - await asyncio.sleep(0.01) - await assert_not_everything_is_spilled(a) + nspilled = await c.run(lambda dask_worker: len(dask_worker.data.disk)) + assert 3 < nspilled[nanny.worker_address] < 11 @pytest.mark.slow diff --git a/distributed/worker.py b/distributed/worker.py index 3f5761f7b9d..3b8738647f5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3743,7 +3743,6 @@ def check_pause(memory): "Worker is at %.0f%% memory usage. Start spilling data to disk.", frac * 100, ) - start = time() # Implement hysteresis cycle where spilling starts at the spill threshold # and stops at the target threshold. Normally that here the target threshold # defines process memory, whereas normally it defines reported managed @@ -3768,18 +3767,14 @@ def check_pause(memory): break weight = self.data.evict() if weight == -1: - # Failed to evict: disk full, spill size limit exceeded, or pickle error + # Failed to evict: + # disk full, spill size limit exceeded, or pickle error break total += weight count += 1 - # If the current buffer is filled with a lot of small values, - # evicting one at a time is very slow and the worker might - # generate new data faster than it is able to evict. Therefore, - # only pass on control if we spent at least 0.5s evicting - if time() - start > 0.5: - await asyncio.sleep(0) - start = time() + await asyncio.sleep(0) + memory = proc.memory_info().rss if total > need and memory > target: # Issue a GC to ensure that the evicted data is actually From 8f2ffa2fabacad8d0013e0b53112f2a39ee770c3 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:37:14 +0000 Subject: [PATCH 02/13] stress test --- .github/workflows/tests.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e087f99757..d1f8431977b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -129,8 +129,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed \ - -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ + pytest distributed/tests/test_worker.py \ + -m "not avoid_ci" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ From bddd7c3fa3489f765d91be4efd34dcd10027baea Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:56:47 +0000 Subject: [PATCH 03/13] xfail --- distributed/tests/test_worker.py | 82 ++++++++++---------------------- 1 file changed, 25 insertions(+), 57 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index fcaf09bec5c..8734cbe4395 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1286,10 +1286,9 @@ class BadSizeof: """Configurable actual process memory and reported managed memory""" # dummy *args is to facilitate Client.map - def __init__(self, *args, process: float, managed: float, pickle_time: float = 0.5): + def __init__(self, *args, process: float, managed: float): self.process = int(process) self.managed = int(managed) - self.pickle_time = pickle_time self.data = "x" * self.process def __sizeof__(self) -> int: @@ -1299,18 +1298,11 @@ def __getstate__(self): """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. Also remove the impact of installing lz4 or blosc, which would shrink down self.data to kilobytes. - - Make sure that the time spent writing to disk a key exceeds the time needed by - the OS kernel to react to the free() from the previous spilled key and shrink - down the process memory. - - In case of flakiness, read: https://github.com/dask/distributed/issues/5840 """ - sleep(self.pickle_time) - return self.process, self.managed, self.pickle_time + return self.process, self.managed def __setstate__(self, state) -> None: - self.process, self.managed, self.pickle_time = state + self.process, self.managed = state self.data = "x" * self.process @@ -1338,7 +1330,7 @@ async def test_spill_spill_threshold(c, s, a): memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 - futures = c.map(BadSizeof, range(8), process=100e6, managed=10, pickle_time=0) + futures = c.map(BadSizeof, range(8), process=100e6, managed=10) while not a.data.disk: await asyncio.sleep(0.01) @@ -1349,25 +1341,20 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: assert isinstance(dask_worker.data, SpillBuffer) start = time() nspilled = 0 - # This timeout must be longer than the 0.5s in BadSizeof.__reduce__ - while time() < start + 1.0: + while time() < start + 0.5: assert dask_worker.data - assert dask_worker.data.memory + if not dask_worker.data.memory: + raise pytest.xfail("https://github.com/dask/distributed/issues/5840") if len(dask_worker.data.disk) != nspilled: nspilled = len(dask_worker.data.disk) start = time() # We just spilled; reset timer await asyncio.sleep(0.01) -@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, worker_kwargs=dict( # FIXME https://github.com/dask/distributed/issues/5367 # Can't reconfigure the absolute target threshold after the worker @@ -1381,56 +1368,38 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: memory_pause_fraction=False, ), ) -async def test_spill_no_target_threshold(c, s, nanny): +async def test_spill_no_target_threshold(c, s, a): """Test that you can enable the spill threshold while leaving the target threshold to False """ - - def change_memory_limit(dask_worker): - memory = psutil.Process().memory_info().rss - # 300 MB before we start spilling - dask_worker.memory_limit = (memory + 300e6) / 0.7 - - await c.run(change_memory_limit) - - async def get_keys() -> tuple[set[str], set[str]]: - def _(dask_worker): - return set(dask_worker.data.memory), set(dask_worker.data.disk) - - out = await c.run(_) - return out[nanny.worker_address] + memory = psutil.Process().memory_info().rss + # 300 MB before we start spilling + a.memory_limit = (memory + 300e6) / 0.7 f1 = c.submit(inc, 0, key="f1") await wait(f1) - mem, spilled = await get_keys() - assert mem == {"f1"} - assert not spilled + assert set(a.data.memory) == {"f1"} + assert not a.data.disk # 800 MB process memory, 10 GB bogous managed memory. # Use large chunks to stimulate timely release of process memory. futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - while not spilled: - mem, spilled = await get_keys() + while not a.data.disk: await asyncio.sleep(0.01) - assert "f1" in spilled + assert "f1" in a.data.disk # Spilling normally starts at the spill threshold and stops at the target threshold. # In this special case, it stops as soon as the process memory goes below the spill # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - await c.run(assert_not_everything_is_spilled) + await assert_not_everything_is_spilled(a) -@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, worker_kwargs=dict( memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", @@ -1439,27 +1408,26 @@ def _(dask_worker): memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, nanny): - def change_memory_limit(dask_worker): - memory = psutil.Process().memory_info().rss - dask_worker.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB - - await c.run(change_memory_limit) +async def test_spill_hysteresis(c, s, a): + memory = psutil.Process().memory_info().rss + a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory futures = c.map(BadSizeof, range(11), process=100e6, managed=1) - # Wait until spilling stops, but no less than 1s - await c.run(assert_not_everything_is_spilled) + while not a.data.disk: + await asyncio.sleep(0.01) + + # Wait until spilling stops, but no less than 0.5s + await assert_not_everything_is_spilled(a) # If there were no hysteresis, we would lose 1-2 keys. # Note that, for this test to be meaningful, memory must shrink down readily when # we deallocate Python objects. This is not always the case on Windows and MacOSX; # on Linux we set MALLOC_TRIM to help in that regard. # To verify that this test is useful, set target=spill and watch it fail. - nspilled = await c.run(lambda dask_worker: len(dask_worker.data.disk)) - assert 3 < nspilled[nanny.worker_address] < 11 + assert 3 < len(a.data.disk) < 11 @pytest.mark.slow From df14e243d407de24c9cd3d736ad7882c23733d6f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 13:28:04 +0000 Subject: [PATCH 04/13] test resilience --- distributed/tests/test_worker.py | 42 ++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8734cbe4395..81c53dcd170 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1344,7 +1344,10 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: while time() < start + 0.5: assert dask_worker.data if not dask_worker.data.memory: - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") + raise pytest.xfail( + "The whole contents of the SpillBuffer was spilled to disk: " + "https://github.com/dask/distributed/issues/5840" + ) if len(dask_worker.data.disk) != nspilled: nspilled = len(dask_worker.data.disk) start = time() # We just spilled; reset timer @@ -1361,7 +1364,7 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: # started, so we're setting it here to something extremely small and then # increasing the memory_limit dynamically below in order to test the # spill threshold. - memory_limit=1, + memory_limit="1 kb", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=0.7, @@ -1381,7 +1384,7 @@ async def test_spill_no_target_threshold(c, s, a): assert set(a.data.memory) == {"f1"} assert not a.data.disk - # 800 MB process memory, 10 GB bogous managed memory. + # 800 MB process memory, 80 GB bogus managed memory. # Use large chunks to stimulate timely release of process memory. futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) @@ -1397,37 +1400,50 @@ async def test_spill_no_target_threshold(c, s, a): @requires_zict +@pytest.mark.parametrize( + "memory_target_fraction,min_spill,max_spill", + [ + (0.7, 1, 3), # target=spill -> no hysteresis + (0.4, 4, 8), # target hysteresis + ], +) @gen_cluster( nthreads=[("", 1)], client=True, worker_kwargs=dict( memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", - memory_target_fraction=0.4, memory_spill_fraction=0.7, memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, a): +async def test_spill_hysteresis(c, s, a, memory_target_fraction, min_spill, max_spill): memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB + a.memory_limit = (memory + 950e6) / 0.7 # Start spilling after 950MB + a.memory_target_fraction = memory_target_fraction # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory - futures = c.map(BadSizeof, range(11), process=100e6, managed=1) + futures = c.map(BadSizeof, range(10), process=100e6, managed=1) + start = time() while not a.data.disk: + # Different OSs/test environments can get stuck because of slightly different + # memory management algorithms. Add an extra 100MB every 2s to compensate. + # Can't do this too fast otherwise a very slow CI will fail on the number of + # elements actually spilled later. + if time() > start + 2: # pragma: nocover + print("Did not spill; adding a future") + futures.append(c.submit(BadSizeof, len(futures), process=100e6, managed=1)) + start = time() await asyncio.sleep(0.01) # Wait until spilling stops, but no less than 0.5s await assert_not_everything_is_spilled(a) - # If there were no hysteresis, we would lose 1-2 keys. - # Note that, for this test to be meaningful, memory must shrink down readily when - # we deallocate Python objects. This is not always the case on Windows and MacOSX; - # on Linux we set MALLOC_TRIM to help in that regard. - # To verify that this test is useful, set target=spill and watch it fail. - assert 3 < len(a.data.disk) < 11 + # Run this test twice, once with no hysteresis and another with hysteresis to show + # the different behaviour. + assert min_spill <= len(a.data.disk) <= max_spill @pytest.mark.slow From ad8803ecc8cd413c2eb667d8cb382e3c82bcc1c6 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 15:25:59 +0000 Subject: [PATCH 05/13] redesign with nannies --- distributed/nanny.py | 14 ++- distributed/tests/test_worker.py | 185 +++++++++++++++---------------- 2 files changed, 97 insertions(+), 102 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 55c7838d8f3..65a2d303785 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -13,7 +13,7 @@ from inspect import isawaitable from queue import Empty from time import sleep as sync_sleep -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar, Literal import psutil from tornado import gen @@ -45,6 +45,9 @@ ) from .worker import Worker, parse_memory_limit, run +if TYPE_CHECKING: + from .diagnostics.plugin import NannyPlugin + logger = logging.getLogger(__name__) @@ -94,6 +97,7 @@ def __init__( services=None, name=None, memory_limit="auto", + memory_terminate_fraction: float | Literal[False] | None = None, reconnect=True, validate=False, quiet=False, @@ -203,8 +207,10 @@ def __init__( self.worker_kwargs = worker_kwargs self.contact_address = contact_address - self.memory_terminate_fraction = dask.config.get( - "distributed.worker.memory.terminate" + self.memory_terminate_fraction = ( + memory_terminate_fraction + if memory_terminate_fraction is not None + else dask.config.get("distributed.worker.memory.terminate") ) self.services = services @@ -231,7 +237,7 @@ def __init__( "plugin_remove": self.plugin_remove, } - self.plugins = {} + self.plugins: dict[str, NannyPlugin] = {} super().__init__( handlers=handlers, io_loop=self.loop, connection_args=self.connection_args diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 81c53dcd170..a6e193f0ad8 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3,6 +3,7 @@ import asyncio import importlib import logging +import math import os import sys import threading @@ -1335,115 +1336,103 @@ async def test_spill_spill_threshold(c, s, a): await asyncio.sleep(0.01) -async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: - from distributed.spill import SpillBuffer - - assert isinstance(dask_worker.data, SpillBuffer) - start = time() - nspilled = 0 - while time() < start + 0.5: - assert dask_worker.data - if not dask_worker.data.memory: - raise pytest.xfail( - "The whole contents of the SpillBuffer was spilled to disk: " - "https://github.com/dask/distributed/issues/5840" - ) - if len(dask_worker.data.disk) != nspilled: - nspilled = len(dask_worker.data.disk) - start = time() # We just spilled; reset timer - await asyncio.sleep(0.01) - - -@requires_zict -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - # FIXME https://github.com/dask/distributed/issues/5367 - # Can't reconfigure the absolute target threshold after the worker - # started, so we're setting it here to something extremely small and then - # increasing the memory_limit dynamically below in order to test the - # spill threshold. - memory_limit="1 kb", - memory_monitor_interval="10ms", - memory_target_fraction=False, - memory_spill_fraction=0.7, - memory_pause_fraction=False, - ), -) -async def test_spill_no_target_threshold(c, s, a): - """Test that you can enable the spill threshold while leaving the target threshold - to False - """ - memory = psutil.Process().memory_info().rss - # 300 MB before we start spilling - a.memory_limit = (memory + 300e6) / 0.7 - - f1 = c.submit(inc, 0, key="f1") - await wait(f1) - assert set(a.data.memory) == {"f1"} - assert not a.data.disk - - # 800 MB process memory, 80 GB bogus managed memory. - # Use large chunks to stimulate timely release of process memory. - futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - - while not a.data.disk: - await asyncio.sleep(0.01) - assert "f1" in a.data.disk - - # Spilling normally starts at the spill threshold and stops at the target threshold. - # In this special case, it stops as soon as the process memory goes below the spill - # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the - # whole data to disk (memory_limit * target = 0) - await assert_not_everything_is_spilled(a) - - +@pytest.mark.slow @requires_zict @pytest.mark.parametrize( - "memory_target_fraction,min_spill,max_spill", + "memory_target_fraction,managed,min_spill,max_spill", [ - (0.7, 1, 3), # target=spill -> no hysteresis - (0.4, 4, 8), # target hysteresis + # no target -> no hysteresis + # Over-report managed memory to test that the automated LRU eviction based on + # target is never triggered + (False, 10e9, 1, 3), + # Under-report managed memory, so that we reach the spill threshold for process + # memory without first reaching the target threshold for managed memory + # target == spill -> no hysteresis + (0.7, 1, 1, 3), + # target < spill -> hysteresis from spill to target + (0.4, 1, 4, 8), ], ) -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - memory_limit="10 GiB", # See FIXME note in previous test +@gen_cluster(nthreads=[], client=True) +async def test_spill_hysteresis( + c, s, memory_target_fraction, managed, min_spill, max_spill +): + """ + 1. Test that you can enable the spill threshold while leaving the target threshold + to False + 2. Test the hysteresis system where, once you reach the spill threshold, the worker + won't stop spilling until the target threshold is reached + """ + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + async with Nanny( + s.address, + # Start spilling after ~950 MiB managed memory + # (assuming ~100 MiB unmanaged memory) + memory_limit="1500 MiB", memory_monitor_interval="10ms", + memory_target_fraction=memory_target_fraction, memory_spill_fraction=0.7, memory_pause_fraction=False, - ), -) -async def test_spill_hysteresis(c, s, a, memory_target_fraction, min_spill, max_spill): - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 950e6) / 0.7 # Start spilling after 950MB - a.memory_target_fraction = memory_target_fraction - - # Under-report managed memory, so that we reach the spill threshold for process - # memory without first reaching the target threshold for managed memory - futures = c.map(BadSizeof, range(10), process=100e6, managed=1) + memory_terminate_fraction=False, + ) as nanny: + + async def nspilled() -> int: + out = await c.run(lambda dask_worker: len(dask_worker.data.disk)) + return out[nanny.worker_address] + + nfuts_for_spilling = math.ceil((1500 * 0.7 - s.memory.process / 2**20) / 100) + print(f"Process memory: {s.memory.process / 2**20:.0f} MiB") + print(f"Initial load: {nfuts_for_spilling} * 100 MiB") + assert nfuts_for_spilling > 6 + + # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB + futures = [ + c.submit(BadSizeof, process=100 * 2**20, managed=managed, pure=False) + ] + await wait(futures) + await asyncio.sleep(0.2) + assert await nspilled() == 0 + + # Add another ~800MB process memory. This should start the spilling. + futures += c.map( + BadSizeof, + range(nfuts_for_spilling - 1), + process=100 * 2**20, + managed=managed, + ) - start = time() - while not a.data.disk: - # Different OSs/test environments can get stuck because of slightly different - # memory management algorithms. Add an extra 100MB every 2s to compensate. - # Can't do this too fast otherwise a very slow CI will fail on the number of - # elements actually spilled later. - if time() > start + 2: # pragma: nocover - print("Did not spill; adding a future") - futures.append(c.submit(BadSizeof, len(futures), process=100e6, managed=1)) - start = time() - await asyncio.sleep(0.01) + # Wait until spilling starts + start = time() + while not await nspilled(): + # Different OSs/test environments can get stuck because of slightly + # different memory management algorithms and base unmanaged memory. Add an + # extra 100MB every 0.5s to compensate. Can't do this too fast otherwise a + # very slow CI will fail on the number of elements actually spilled later. + if time() > start + 0.5: # pragma: nocover + print("Did not spill; adding a future") + futures.append( + c.submit(BadSizeof, process=100e6, managed=managed, pure=False) + ) + start = time() + await asyncio.sleep(0.1) - # Wait until spilling stops, but no less than 0.5s - await assert_not_everything_is_spilled(a) + # Wait until spilling stops + prev_n = -1 + while prev_n == -1 or time() < start + 0.5: + n = await nspilled() + if n == len(futures): + raise pytest.xfail( + "The whole contents of the SpillBuffer was spilled to disk; see " + "https://github.com/dask/distributed/issues/5840" + ) + if n != prev_n: + prev_n = n + start = time() # We just spilled; reset timer + await asyncio.sleep(0.1) - # Run this test twice, once with no hysteresis and another with hysteresis to show - # the different behaviour. - assert min_spill <= len(a.data.disk) <= max_spill + assert min_spill <= await nspilled() <= max_spill @pytest.mark.slow From 34a790a863fa6e782c30f9a7842f2493c074a9e5 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 15:47:24 +0000 Subject: [PATCH 06/13] Stress test --- .github/workflows/tests.yaml | 12 ++---------- distributed/tests/test_worker.py | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d1f8431977b..bbddd525e4f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -24,13 +24,6 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9"] - # Cherry-pick test modules to split the overall runtime roughly in half - partition: [ci1, not ci1] - include: - - partition: "ci1" - partition-label: "ci1" - - partition: "not ci1" - partition-label: "notci1" # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. @@ -38,11 +31,10 @@ jobs: # To avoid hamstringing other people, change 'on: [push, pull_request]' above # to just 'on: [push]'; this way the stress test will run exclusively in your # branch (https://github.com//distributed/actions). - # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} - # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.run }} steps: - name: Checkout source diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a6e193f0ad8..a4e0deed308 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1389,7 +1389,12 @@ async def nspilled() -> int: # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB futures = [ - c.submit(BadSizeof, process=100 * 2**20, managed=managed, pure=False) + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) ] await wait(futures) await asyncio.sleep(0.2) @@ -1413,7 +1418,12 @@ async def nspilled() -> int: if time() > start + 0.5: # pragma: nocover print("Did not spill; adding a future") futures.append( - c.submit(BadSizeof, process=100e6, managed=managed, pure=False) + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) ) start = time() await asyncio.sleep(0.1) @@ -1423,9 +1433,11 @@ async def nspilled() -> int: while prev_n == -1 or time() < start + 0.5: n = await nspilled() if n == len(futures): - raise pytest.xfail( - "The whole contents of the SpillBuffer was spilled to disk; see " - "https://github.com/dask/distributed/issues/5840" + # raise pytest.xfail( + raise AssertionError( + "The whole content of the SpillBuffer was spilled to disk; see " + "https://github.com/dask/distributed/issues/5840. " + "Consider converting this to xfail" ) if n != prev_n: prev_n = n From cb1f369c38785a5951a152bb32dd6430934f01c3 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 16:23:23 +0000 Subject: [PATCH 07/13] xfail on MacOS --- distributed/tests/test_worker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a4e0deed308..975d7b9d16d 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -36,7 +36,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -1433,11 +1433,10 @@ async def nspilled() -> int: while prev_n == -1 or time() < start + 0.5: n = await nspilled() if n == len(futures): - # raise pytest.xfail( - raise AssertionError( + exc_cls = pytest.xfail if MACOS else AssertionError + raise exc_cls( "The whole content of the SpillBuffer was spilled to disk; see " - "https://github.com/dask/distributed/issues/5840. " - "Consider converting this to xfail" + "https://github.com/dask/distributed/issues/5840." ) if n != prev_n: prev_n = n From ab561c229887a4ff04713b74e2bd0bf059ff9d7d Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 16:23:49 +0000 Subject: [PATCH 08/13] don't saturate dask org CI --- .github/workflows/tests.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index bbddd525e4f..7a47b1db825 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -2,9 +2,6 @@ name: Tests on: push: - pull_request: - schedule: - - cron: "0 6,18 * * *" # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch From ef0e44b905dd787e0feb13fa9847d9a27c133cb8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 17:08:36 +0000 Subject: [PATCH 09/13] improve resilience of test_pause_executor --- distributed/tests/test_worker.py | 47 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 975d7b9d16d..5685b3846b9 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1449,44 +1449,47 @@ async def nspilled() -> int: @pytest.mark.slow @gen_cluster( nthreads=[("", 1)], + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, client=True, worker_kwargs=dict( + # pause after ~300 MiB, assuming ~100 MiB unmanaged memory + memory_limit="500 MiB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=False, memory_pause_fraction=0.8, + memory_terminate_fraction=False, ), ) -async def test_pause_executor(c, s, a): - # See notes in test_spill_spill_threshold - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB +async def test_pause_executor(c, s, nanny): + ws = s.workers[nanny.worker_address] - # Note: it's crucial to have a very large single chunk of memory that gets descoped - # all at once in order to instigate release of process memory. - # Read: https://github.com/dask/distributed/issues/5840 def f(): - # Add 400 MB unmanaged memory - x = "x" * int(400e6) + # Add 1 GiB unmanaged memory + # Note: it's crucial to have a very large single chunk of memory that gets + # descoped all at once in order to instigate release of process memory. Read: + # https://github.com/dask/distributed/issues/5840 + x = "x" * 2**30 w = get_worker() while w.status != Status.paused: sleep(0.01) - with captured_logger(logging.getLogger("distributed.worker")) as logger: - future = c.submit(f, key="x") - futures = c.map(slowinc, range(30), delay=0.1) - - while a.status != Status.paused: - await asyncio.sleep(0.01) - - assert "Pausing worker" in logger.getvalue() - assert sum(f.status == "finished" for f in futures) < 4 + assert ws.status == Status.running + x = c.submit(f, key="x") + while "x" not in s.tasks: + await asyncio.sleep(0.01) + futures = c.map(slowinc, range(8), delay=0.1) - while a.status != Status.running: - await asyncio.sleep(0.01) + while ws.status != Status.paused: + await asyncio.sleep(0.01) - assert "Resuming worker" in logger.getvalue() - await wait(futures) + assert sum(f.status == "finished" for f in futures) < 4 + # Wait for unpause + await wait(futures) + assert ws.status == Status.running @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"}) From e484378a46a8be2fa842edf4106e9e8f151a277e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 17:08:36 +0000 Subject: [PATCH 10/13] Revert "improve resilience of test_pause_executor" This reverts commit ef0e44b905dd787e0feb13fa9847d9a27c133cb8. --- distributed/tests/test_worker.py | 47 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5685b3846b9..975d7b9d16d 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1449,47 +1449,44 @@ async def nspilled() -> int: @pytest.mark.slow @gen_cluster( nthreads=[("", 1)], - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, client=True, worker_kwargs=dict( - # pause after ~300 MiB, assuming ~100 MiB unmanaged memory - memory_limit="500 MiB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=False, memory_pause_fraction=0.8, - memory_terminate_fraction=False, ), ) -async def test_pause_executor(c, s, nanny): - ws = s.workers[nanny.worker_address] +async def test_pause_executor(c, s, a): + # See notes in test_spill_spill_threshold + memory = psutil.Process().memory_info().rss + a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB + # Note: it's crucial to have a very large single chunk of memory that gets descoped + # all at once in order to instigate release of process memory. + # Read: https://github.com/dask/distributed/issues/5840 def f(): - # Add 1 GiB unmanaged memory - # Note: it's crucial to have a very large single chunk of memory that gets - # descoped all at once in order to instigate release of process memory. Read: - # https://github.com/dask/distributed/issues/5840 - x = "x" * 2**30 + # Add 400 MB unmanaged memory + x = "x" * int(400e6) w = get_worker() while w.status != Status.paused: sleep(0.01) - assert ws.status == Status.running - x = c.submit(f, key="x") - while "x" not in s.tasks: - await asyncio.sleep(0.01) - futures = c.map(slowinc, range(8), delay=0.1) + with captured_logger(logging.getLogger("distributed.worker")) as logger: + future = c.submit(f, key="x") + futures = c.map(slowinc, range(30), delay=0.1) - while ws.status != Status.paused: - await asyncio.sleep(0.01) + while a.status != Status.paused: + await asyncio.sleep(0.01) - assert sum(f.status == "finished" for f in futures) < 4 - # Wait for unpause - await wait(futures) - assert ws.status == Status.running + assert "Pausing worker" in logger.getvalue() + assert sum(f.status == "finished" for f in futures) < 4 + + while a.status != Status.running: + await asyncio.sleep(0.01) + + assert "Resuming worker" in logger.getvalue() + await wait(futures) @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"}) From 8152265602a2a7cce5d076e78eaa6df81e7609b7 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 24 Feb 2022 11:27:41 +0000 Subject: [PATCH 11/13] Revert stress test --- .github/workflows/tests.yaml | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7a47b1db825..7e087f99757 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -2,6 +2,9 @@ name: Tests on: push: + pull_request: + schedule: + - cron: "0 6,18 * * *" # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch @@ -21,6 +24,13 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9"] + # Cherry-pick test modules to split the overall runtime roughly in half + partition: [ci1, not ci1] + include: + - partition: "ci1" + partition-label: "ci1" + - partition: "not ci1" + partition-label: "notci1" # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. @@ -28,10 +38,11 @@ jobs: # To avoid hamstringing other people, change 'on: [push, pull_request]' above # to just 'on: [push]'; this way the stress test will run exclusively in your # branch (https://github.com//distributed/actions). - run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} + # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} steps: - name: Checkout source @@ -118,8 +129,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed/tests/test_worker.py \ - -m "not avoid_ci" --runslow \ + pytest distributed \ + -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ From cce851312e50e64c2b909e635a50fc1ceb54ce15 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 25 Feb 2022 11:01:29 +0000 Subject: [PATCH 12/13] mock_rss --- distributed/tests/test_utils_test.py | 27 ++++++++++++++++++++++++++- distributed/utils_test.py | 20 +++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index db5cc866b5c..97ee33104e5 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -15,7 +15,7 @@ from distributed import Client, Nanny, Scheduler, Worker, config, default_client from distributed.compatibility import WINDOWS -from distributed.core import Server, rpc +from distributed.core import Server, Status, rpc from distributed.metrics import time from distributed.utils import mp_context from distributed.utils_test import ( @@ -28,6 +28,7 @@ gen_cluster, gen_test, inc, + mock_rss, new_config, tls_only_security, ) @@ -607,3 +608,27 @@ def test_start_failure_scheduler(): with pytest.raises(TypeError): with cluster(scheduler_kwargs={"foo": "bar"}): return + + +@gen_cluster( + client=True, + worker_kwargs={"heartbeat_interval": "10ms", "memory_monitor_interval": "10ms"}, +) +async def test_mock_rss(c, s, a, b): + # Test that it affects the readings sent to the Scheduler + mock_rss(a, 2e6) + while s.workers[a.address].memory.process != 2_000_000: + await asyncio.sleep(0.01) + + # Test that the instance has been mocked, not the class + assert s.workers[b.address].memory.process > 10e6 + + # Test that it's compatible with Client.run and can be used with Nannies + await c.run(mock_rss, nbytes=3e6, workers=[b.address]) + while s.workers[b.address].memory.process != 3_000_000: + await asyncio.sleep(0.01) + + # Test that it affects Worker.memory_monitor + mock_rss(a, 100e9) + while s.workers[a.address].status != Status.paused: + await asyncio.sleep(0.01) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index f6625e74180..d56b8e2b85c 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -20,13 +20,14 @@ import threading import uuid import weakref -from collections import defaultdict +from collections import defaultdict, namedtuple from collections.abc import Callable from contextlib import contextmanager, nullcontext, suppress from glob import glob from itertools import count from time import sleep from typing import Any, Literal +from unittest.mock import MagicMock from distributed.compatibility import MACOS from distributed.scheduler import Scheduler @@ -1965,3 +1966,20 @@ def has_pytestmark(test_func: Callable, name: str) -> bool: """ marks = getattr(test_func, "pytestmark", []) return any(mark.name == name for mark in marks) + + +# Variant of psutil._pslinux.pmem, psutil._psosx.pmem, psutil._pswindows.pmem +pmem = namedtuple("pmem", "rss") + + +def mock_rss(dask_worker: Worker, nbytes: float) -> None: + """Mock all the process memory readings on a worker. Does not impact other workers. + + Usage: + + When using Workers: + >>> mock_rss(a, 100e6) + When using Nannies: + >>> await client.run(mock_rss, nbytes=100e6, workers=[a.worker_address]) + """ + dask_worker.monitor.proc.memory_info = MagicMock(return_value=pmem(int(nbytes))) From deca19bb8dd155240e0638764dcb1f3966edb23c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 25 Feb 2022 17:49:29 +0000 Subject: [PATCH 13/13] test_worker.py --- distributed/tests/test_worker.py | 219 ++++++++++++------------------- 1 file changed, 82 insertions(+), 137 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8ae934f2158..f19acb724c1 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3,7 +3,6 @@ import asyncio import importlib import logging -import math import os import sys import threading @@ -28,6 +27,7 @@ import distributed from distributed import ( Client, + Event, Nanny, Reschedule, default_client, @@ -36,7 +36,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, MACOS, WINDOWS +from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -54,6 +54,7 @@ gen_cluster, gen_test, inc, + mock_rss, mul, nodebug, slowinc, @@ -1283,35 +1284,12 @@ async def test_spill_constrained(c, s, w): assert set(w.data.disk) == {x.key} -class BadSizeof: - """Configurable actual process memory and reported managed memory""" - - # dummy *args is to facilitate Client.map - def __init__(self, *args, process: float, managed: float): - self.process = int(process) - self.managed = int(managed) - self.data = "x" * self.process - - def __sizeof__(self) -> int: - return self.managed - - def __getstate__(self): - """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. - Also remove the impact of installing lz4 or blosc, which would shrink down - self.data to kilobytes. - """ - return self.process, self.managed - - def __setstate__(self, state) -> None: - self.process, self.managed = state - self.data = "x" * self.process - - @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, worker_kwargs=dict( + memory_limit="1000 MB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=0.7, @@ -1323,134 +1301,74 @@ async def test_spill_spill_threshold(c, s, a): Test that the spill threshold uses the process memory and not the managed memory reported by sizeof(), which may be inaccurate. """ - # Reach 'spill' threshold after 400MB of managed data. We need to be generous in - # order to avoid flakiness due to fluctuations in unmanaged memory. - # FIXME https://github.com/dask/distributed/issues/5367 - # This works just by luck for the purpose of the spill and pause thresholds, - # and does NOT work for the target threshold. - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 300e6) / 0.7 - - futures = c.map(BadSizeof, range(8), process=100e6, managed=10) + x = c.submit(inc, 0, key="x") + mock_rss(a, 800e6) while not a.data.disk: await asyncio.sleep(0.01) + assert await x == 1 -@pytest.mark.slow @requires_zict @pytest.mark.parametrize( - "memory_target_fraction,managed,min_spill,max_spill", + "memory_target_fraction,managed,expect_spilled", [ # no target -> no hysteresis # Over-report managed memory to test that the automated LRU eviction based on # target is never triggered - (False, 10e9, 1, 3), + (False, 10e9, 1), # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory # target == spill -> no hysteresis - (0.7, 1, 1, 3), + (0.7, 0, 1), # target < spill -> hysteresis from spill to target - (0.4, 1, 4, 8), + (0.4, 0, 7), ], ) @gen_cluster(nthreads=[], client=True) -async def test_spill_hysteresis( - c, s, memory_target_fraction, managed, min_spill, max_spill -): +async def test_spill_hysteresis(c, s, memory_target_fraction, managed, expect_spilled): """ 1. Test that you can enable the spill threshold while leaving the target threshold to False 2. Test the hysteresis system where, once you reach the spill threshold, the worker won't stop spilling until the target threshold is reached """ - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - async with Nanny( + + class C: + def __sizeof__(self): + return managed + + async with Worker( s.address, - # Start spilling after ~950 MiB managed memory - # (assuming ~100 MiB unmanaged memory) - memory_limit="1500 MiB", + memory_limit="1000 MB", memory_monitor_interval="10ms", memory_target_fraction=memory_target_fraction, memory_spill_fraction=0.7, memory_pause_fraction=False, - memory_terminate_fraction=False, - ) as nanny: - - async def nspilled() -> int: - out = await c.run(lambda dask_worker: len(dask_worker.data.disk)) - return out[nanny.worker_address] - - nfuts_for_spilling = math.ceil((1500 * 0.7 - s.memory.process / 2**20) / 100) - print(f"Process memory: {s.memory.process / 2**20:.0f} MiB") - print(f"Initial load: {nfuts_for_spilling} * 100 MiB") - assert nfuts_for_spilling > 6 - - # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB - futures = [ - c.submit( - BadSizeof, - process=100 * 2**20, - managed=managed, - pure=False, - ) - ] + ) as a: + # Add 500MB (reported) process memory. Spilling must not happen. + futures = [c.submit(C, pure=False) for _ in range(10)] + mock_rss(a, 500e6) await wait(futures) - await asyncio.sleep(0.2) - assert await nspilled() == 0 - - # Add another ~800MB process memory. This should start the spilling. - futures += c.map( - BadSizeof, - range(nfuts_for_spilling - 1), - process=100 * 2**20, - managed=managed, - ) - - # Wait until spilling starts - start = time() - while not await nspilled(): - # Different OSs/test environments can get stuck because of slightly - # different memory management algorithms and base unmanaged memory. Add an - # extra 100MB every 0.5s to compensate. Can't do this too fast otherwise a - # very slow CI will fail on the number of elements actually spilled later. - if time() > start + 0.5: # pragma: nocover - print("Did not spill; adding a future") - futures.append( - c.submit( - BadSizeof, - process=100 * 2**20, - managed=managed, - pure=False, - ) - ) - start = time() - await asyncio.sleep(0.1) - - # Wait until spilling stops - prev_n = -1 - while prev_n == -1 or time() < start + 0.5: - n = await nspilled() - if n == len(futures): - exc_cls = pytest.xfail if MACOS else AssertionError - raise exc_cls( - "The whole content of the SpillBuffer was spilled to disk; see " - "https://github.com/dask/distributed/issues/5840." - ) - if n != prev_n: - prev_n = n - start = time() # We just spilled; reset timer - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) + assert not a.data.disk + + # Add another 250MB unamanaged memory. This must trigger the spilling. + mock_rss(a, 750e6) + # Wait until spilling starts. Then, wait until it stops. + prev_n = 0 + while not a.data.disk or len(a.data.disk) > prev_n: + prev_n = len(a.data.disk) + mock_rss(a, 250e6 + 50e6 * len(a.data.memory)) + await asyncio.sleep(0) - assert min_spill <= await nspilled() <= max_spill + assert len(a.data.disk) == expect_spilled -@pytest.mark.slow @gen_cluster( - nthreads=[("", 1)], + nthreads=[("", 2)], client=True, worker_kwargs=dict( + memory_limit="1000 MB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=False, @@ -1458,35 +1376,62 @@ async def nspilled() -> int: ), ) async def test_pause_executor(c, s, a): - # See notes in test_spill_spill_threshold - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB + def f(ev_f): + ev_f.wait() - # Note: it's crucial to have a very large single chunk of memory that gets descoped - # all at once in order to instigate release of process memory. - # Read: https://github.com/dask/distributed/issues/5840 - def f(): - # Add 400 MB unmanaged memory - x = "x" * int(400e6) + def g(ev_g1, ev_g2): + ev_g1.wait() + # Add 900 MB unmanaged memory w = get_worker() - while w.status != Status.paused: - sleep(0.01) + mock_rss(w, 900e6) + ev_g2.wait() + mock_rss(w, 0) + + ev_f = Event() + ev_g1 = Event() + ev_g2 = Event() + + # Tasks that are running when the worker pauses + x = c.submit(f, ev_f, key="x") + y = c.submit(g, ev_g1, ev_g2, key="y") + while a.executing_count != 2: + await asyncio.sleep(0.01) - with captured_logger(logging.getLogger("distributed.worker")) as logger: - future = c.submit(f, key="x") - futures = c.map(slowinc, range(30), delay=0.1) + # Task that is queued on the worker when the worker pauses + z = c.submit(inc, 0, key="z") + while "z" not in a.tasks: + await asyncio.sleep(0.01) - while a.status != Status.paused: + with captured_logger(logging.getLogger("distributed.worker")) as logger: + # Hog the worker with 900MB memory + await ev_g1.set() + while s.workers[a.address].status != Status.paused: await asyncio.sleep(0.01) assert "Pausing worker" in logger.getvalue() - assert sum(f.status == "finished" for f in futures) < 4 - while a.status != Status.running: + # Task that is queued on the scheduler when the worker pauses. + # It is not sent to the worker + w = c.submit(inc, 0, key="w") + while "w" not in s.tasks or s.tasks["w"].state != "no-worker": await asyncio.sleep(0.01) + # Unlock a slot on the worker. It won't be used. + await ev_f.set() + await x + await asyncio.sleep(0.05) + + assert a.executing_count == 1 + assert len(a.ready) == 1 + assert a.tasks["z"].state == "ready" + assert "w" not in a.tasks + + # Release the memory + await ev_g2.set() + await wait([y, z, w]) + + assert a.status == Status.running assert "Resuming worker" in logger.getvalue() - await wait(futures) @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"})