From 1d2f1b408a7ae2bad5fc0cf213c3623dec911981 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Wed, 16 Feb 2022 11:13:26 +0000
Subject: [PATCH 01/12] Document and test spill->target hysteresis cycle try fix

---
 distributed/tests/test_worker.py | 59 ++++++++++++++++++++++++++++++--
 distributed/worker.py            |  9 ++++-
 docs/source/worker.rst           | 12 ++++---
 3 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 0f735dece79..0d63ced9974 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1365,11 +1365,66 @@ def __reduce__(self):
     await wait(f1)
     assert set(a.data.memory) == {"f1"}
 
-    futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))
+    with captured_logger("distributed.worker") as logger:
+        futures = c.map(OverReport, range(int(100e6), int(100e6) + 5))
+
+        while not a.data.disk:
+            await asyncio.sleep(0.01)
+        assert "f1" in a.data.disk
+        await asyncio.sleep(0.5)
+
+    # Spilling normally starts at the spill threshold and stops at the target threshold.
+    # In this special case, it stops as soon as the process memory goes below the spill
+    # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the
+    # whole data to disk (memory_limit * target = 0)
+    assert "Unmanaged memory use is high" not in logger.getvalue()
+
+@pytest.mark.slow
+@gen_cluster(
+    nthreads=[("", 1)],
+    client=True,
+    worker_kwargs=dict(
+        memory_limit="1 GiB",  # See FIXME note in previous test
+        memory_monitor_interval="10ms",
+        memory_target_fraction=0.4,
+        memory_spill_fraction=0.7,
+        memory_pause_fraction=False,
+    ),
+)
+async def test_spill_hysteresis(c, s, a):
+    memory = s.workers[a.address].memory.process
+    a.memory_limit = memory + 2**30
+
+    # Under-report managed memory, so that we reach the spill threshold for process
+    # memory without first reaching the target threshold for managed memory
+    class UnderReport:
+        def __init__(self):
+            self.data = "x" * (50 * 2**20)  # 50 MiB
+
+        def __sizeof__(self):
+            return 1
+
+        def __reduce__(self):
+            """Speed up test by writing very little to disk when spilling"""
+            return UnderReport, ()
+
+    max_in_memory = 0
+    futures = []
 
     while not a.data.disk:
+        futures.append(c.submit(UnderReport, pure=False))
+        max_in_memory = max(max_in_memory, len(a.data.memory))
+        await wait(futures)
+        await asyncio.sleep(0.1)
+        max_in_memory = max(max_in_memory, len(a.data.memory))
+
+    # If there were no hysteresis, we would lose exactly 1 key.
+    # Note that, for this test to be meaningful, memory must shrink down readily when
+    # we deallocate Python objects. This is not always the case on Windows and MacOSX;
+    # on Linux we set MALLOC_TRIM to help in that regard.
+    # To verify that this test is useful, set target=spill and watch it fail.
+    while len(a.data.memory) > max_in_memory - 3:
         await asyncio.sleep(0.01)
-    assert "f1" in a.data.disk
 
 
 @pytest.mark.slow
diff --git a/distributed/worker.py b/distributed/worker.py
index 92f34fffa0a..f5862d27e78 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3743,7 +3743,14 @@ def check_pause(memory):
                 frac * 100,
             )
             start = time()
-            target = self.memory_limit * self.memory_target_fraction
+            # Implement hysteresis cycle where spilling starts at the spill threshold
+            # and stops at the target threshold. Note that here the target threshold
+            # defines process memory, whereas normally it defines reported managed
+            # memory (e.g. output of sizeof()).
+            # If target=False, disable hysteresis.
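+            # Example (illustrative only, assuming the default fractions target=0.60
+            # and spill=0.70 with a 10 GiB memory_limit): spilling kicks in once
+            # process memory exceeds 7 GiB and keeps evicting the least recently used
+            # keys until process memory drops back below 6 GiB.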
+            target = self.memory_limit * (
+                self.memory_target_fraction or self.memory_spill_fraction
+            )
             count = 0
             need = memory - target
             while memory > target:
diff --git a/docs/source/worker.rst b/docs/source/worker.rst
index 495865357fd..9a4cad0c519 100644
--- a/docs/source/worker.rst
+++ b/docs/source/worker.rst
@@ -161,13 +161,14 @@ by dask-worker ::
 Workers use a few different heuristics to keep memory use beneath this limit:
 
 1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
-   to disk
+   to disk.
 2. At 70% of memory load (as reported by the OS), spill least recently used data to
    disk regardless of what is reported by ``sizeof``; this accounts for memory used by
-   the python interpreter, modules, global variables, memory leaks, etc.
+   the python interpreter, modules, global variables, memory leaks, etc. The spilling
+   stops when the memory goes below 60%, in a hysteresis cycle.
 3. At 80% of memory load (as reported by the OS), stop accepting new work on local
-   thread pool
-4. At 95% of memory load (as reported by the OS), terminate and restart the worker
+   thread pool.
+4. At 95% of memory load (as reported by the OS), terminate and restart the worker.
 
 These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
 file:
@@ -184,6 +185,9 @@ file:
         pause: 0.80  # fraction at which we pause worker threads
         terminate: 0.95  # fraction at which we terminate the worker
 
+It is possible to individually disable any of these by setting its value to False.
+Setting 'target' to False while leaving 'spill' active disables the hysteresis cycle.
+
 Spill data to disk
 ~~~~~~~~~~~~~~~~~~
 

From 4d0d30102ce9102094a44b8016d693013c8f416f Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Wed, 16 Feb 2022 21:23:34 +0000
Subject: [PATCH 02/12] attempt fix

---
 distributed/tests/test_worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 0d63ced9974..6b266c1e226 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1366,7 +1366,7 @@ def __reduce__(self):
     assert set(a.data.memory) == {"f1"}
 
     with captured_logger("distributed.worker") as logger:
-        futures = c.map(OverReport, range(int(100e6), int(100e6) + 5))
+        futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))
 
         while not a.data.disk:
             await asyncio.sleep(0.01)

From eaf34109b4187020c8fd4029076cc66cff1064b0 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 17 Feb 2022 11:50:05 +0000
Subject: [PATCH 03/12] fix test

---
 distributed/tests/test_worker.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 6b266c1e226..0f490e98c6c 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1365,19 +1365,20 @@ def __reduce__(self):
     await wait(f1)
     assert set(a.data.memory) == {"f1"}
 
-    with captured_logger("distributed.worker") as logger:
-        futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))
+    futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))
 
-        while not a.data.disk:
-            await asyncio.sleep(0.01)
-        assert "f1" in a.data.disk
-        await asyncio.sleep(0.5)
+    while not a.data.disk:
+        await asyncio.sleep(0.01)
+    assert "f1" in a.data.disk
 
     # Spilling normally starts at the spill threshold and stops at the target threshold.
     # In this special case, it stops as soon as the process memory goes below the spill
     # threshold, e.g. without a hysteresis cycle.
Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - assert "Unmanaged memory use is high" not in logger.getvalue() + start = time() + while time() < start + 0.5: + assert a.data.memory + await asyncio.sleep(0) @pytest.mark.slow From 3b1a889ae1d1dfeaf509c35f0e68632b2138e750 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 11:24:06 +0000 Subject: [PATCH 04/12] xfail --- distributed/tests/test_worker.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0f490e98c6c..5b64d7f01bb 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -35,7 +35,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -1377,7 +1377,14 @@ def __reduce__(self): # whole data to disk (memory_limit * target = 0) start = time() while time() < start + 0.5: - assert a.data.memory + if not a.data.memory: + # The hysteresis system fails on MacOSX because process memory is very slow + # to shrink down after calls to PyFree. As a result, Worker.memory_monitor + # will continue spilling until there's nothing left. Nothing we can do about + # this short of finding either a way to change this behaviour at OS level or + # a better measure of allocated memory. + assert MACOS + raise pytest.xfail("Process memory is slow to shrink down on MacOSX") await asyncio.sleep(0) From ad8f75d1d1b95e2cc211581ae40cd00ef921f07b Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 11:40:37 +0000 Subject: [PATCH 05/12] fix failures --- distributed/tests/test_worker.py | 38 +++++++++++++++++++------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5b64d7f01bb..fd8376e8b37 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1192,6 +1192,7 @@ async def test_statistical_profiling_2(c, s, a, b): break +@requires_zict @gen_cluster( client=True, nthreads=[("", 1)], @@ -1303,7 +1304,7 @@ async def test_spill_spill_threshold(c, s, a): # This works just by luck for the purpose of the spill and pause thresholds, # and does NOT work for the target threshold. memory = s.workers[a.address].memory.process - a.memory_limit = memory / 0.7 + 400e6 + a.memory_limit = (memory + 300e6) / 0.7 class UnderReport: """100 MB process memory, 10 bytes reported managed memory""" @@ -1324,6 +1325,21 @@ def __reduce__(self): await asyncio.sleep(0.01) +async def assert_not_everything_is_spilled(w: Worker) -> None: + start = time() + while time() < start + 0.5: + assert w.data + if not w.data.memory: # type: ignore + # The hysteresis system fails on MacOSX because process memory is very slow + # to shrink down after calls to PyFree. As a result, Worker.memory_monitor + # will continue spilling until there's nothing left. Nothing we can do about + # this short of finding either a way to change this behaviour at OS level or + # a better measure of allocated memory. 
+ assert MACOS, "All data was spilled to disk" + raise pytest.xfail("Process memory is slow to shrink down on MacOSX") + await asyncio.sleep(0) + + @requires_zict @gen_cluster( nthreads=[("", 1)], @@ -1346,7 +1362,7 @@ async def test_spill_no_target_threshold(c, s, a): to False """ memory = s.workers[a.address].memory.process - a.memory_limit = memory / 0.7 + 400e6 + a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling class OverReport: """Configurable process memory, 10 GB reported managed memory""" @@ -1365,7 +1381,7 @@ def __reduce__(self): await wait(f1) assert set(a.data.memory) == {"f1"} - futures = c.map(OverReport, range(int(100e6), int(100e6) + 8)) + futures = c.map(OverReport, range(int(20e6), int(20e6) + 25)) # 500 MB while not a.data.disk: await asyncio.sleep(0.01) @@ -1375,20 +1391,11 @@ def __reduce__(self): # In this special case, it stops as soon as the process memory goes below the spill # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - start = time() - while time() < start + 0.5: - if not a.data.memory: - # The hysteresis system fails on MacOSX because process memory is very slow - # to shrink down after calls to PyFree. As a result, Worker.memory_monitor - # will continue spilling until there's nothing left. Nothing we can do about - # this short of finding either a way to change this behaviour at OS level or - # a better measure of allocated memory. - assert MACOS - raise pytest.xfail("Process memory is slow to shrink down on MacOSX") - await asyncio.sleep(0) + await assert_not_everything_is_spilled(a) @pytest.mark.slow +@requires_zict @gen_cluster( nthreads=[("", 1)], client=True, @@ -1433,6 +1440,7 @@ def __reduce__(self): # To verify that this test is useful, set target=spill and watch it fail. 
while len(a.data.memory) > max_in_memory - 3: await asyncio.sleep(0.01) + await assert_not_everything_is_spilled(a) @pytest.mark.slow @@ -1449,7 +1457,7 @@ def __reduce__(self): async def test_pause_executor(c, s, a): # See notes in test_spill_spill_threshold memory = psutil.Process().memory_info().rss - a.memory_limit = memory / 0.8 + 200e6 + a.memory_limit = (memory + 150e6) / 0.8 def f(): x = "x" * int(250e6) From 1637ef144e73029b421885fd1310c427a2e8e3ce Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 11:46:14 +0000 Subject: [PATCH 06/12] revisit test --- distributed/tests/test_worker.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index fd8376e8b37..90a12100470 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1540,22 +1540,19 @@ async def test_deque_handler(s): assert any(msg.msg == "foo456" for msg in deque_handler.deque) -@gen_cluster(nthreads=[], client=True) -async def test_avoid_memory_monitor_if_zero_limit(c, s): - worker = await Worker( - s.address, loop=s.loop, memory_limit=0, memory_monitor_interval=10 - ) - assert type(worker.data) is dict - assert "memory" not in worker.periodic_callbacks - +@gen_cluster( + client=True, + nthreads=[("", 1)], + worker_kwargs={"memory_limit": 0, "memory_monitor_interval": "10ms"}, +) +async def test_avoid_memory_monitor_if_zero_limit(c, s, a): + assert type(a.data) is dict + assert "memory" not in a.periodic_callbacks future = c.submit(inc, 1) assert (await future) == 2 - await asyncio.sleep(worker.memory_monitor_interval / 1000) - + await asyncio.sleep(0.05) await c.submit(inc, 2) # worker doesn't pause - await worker.close() - @gen_cluster( nthreads=[("127.0.0.1", 1)], From cc3ab6ee412ccef861d4b60e854799d0dd66384c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 12:01:58 +0000 Subject: [PATCH 07/12] xfail test_pause_executor on MacOSX --- distributed/tests/test_worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 90a12100470..9a41cf8eeb2 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1336,7 +1336,7 @@ async def assert_not_everything_is_spilled(w: Worker) -> None: # this short of finding either a way to change this behaviour at OS level or # a better measure of allocated memory. 
assert MACOS, "All data was spilled to disk" - raise pytest.xfail("Process memory is slow to shrink down on MacOSX") + raise pytest.xfail("https://github.com/dask/distributed/issues/5840") await asyncio.sleep(0) @@ -1473,7 +1473,11 @@ def f(): assert "Pausing worker" in logger.getvalue() assert sum(f.status == "finished" for f in futures) < 4 + start = time() while a.status != Status.running: + if time() > start + 10: + assert MACOS + raise pytest.xfail("https://github.com/dask/distributed/issues/5840") await asyncio.sleep(0.01) assert "Resuming worker" in logger.getvalue() From ed4d3dcb7560dc33a0dffe1c4c65642ecc9c067f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 12:04:22 +0000 Subject: [PATCH 08/12] xfail on windows --- distributed/tests/test_worker.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 9a41cf8eeb2..136ed249b0e 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -35,7 +35,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, MACOS, WINDOWS +from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -1330,12 +1330,12 @@ async def assert_not_everything_is_spilled(w: Worker) -> None: while time() < start + 0.5: assert w.data if not w.data.memory: # type: ignore - # The hysteresis system fails on MacOSX because process memory is very slow - # to shrink down after calls to PyFree. As a result, Worker.memory_monitor - # will continue spilling until there's nothing left. Nothing we can do about - # this short of finding either a way to change this behaviour at OS level or - # a better measure of allocated memory. - assert MACOS, "All data was spilled to disk" + # The hysteresis system fails on Windows and MacOSX because process memory + # is very slow to shrink down after calls to PyFree. As a result, + # Worker.memory_monitor will continue spilling until there's nothing left. + # Nothing we can do about this short of finding either a way to change this + # behaviour at OS level or a better measure of allocated memory. + assert not LINUX, "All data was spilled to disk" raise pytest.xfail("https://github.com/dask/distributed/issues/5840") await asyncio.sleep(0) @@ -1476,7 +1476,7 @@ def f(): start = time() while a.status != Status.running: if time() > start + 10: - assert MACOS + assert not LINUX raise pytest.xfail("https://github.com/dask/distributed/issues/5840") await asyncio.sleep(0.01) From cdb9f69a860593c7d7a5e4dfd5d5596113965ebb Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 15:45:25 +0000 Subject: [PATCH 09/12] fix flaky test --- distributed/tests/test_worker.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 136ed249b0e..9963020b364 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1303,7 +1303,7 @@ async def test_spill_spill_threshold(c, s, a): # FIXME https://github.com/dask/distributed/issues/5367 # This works just by luck for the purpose of the spill and pause thresholds, # and does NOT work for the target threshold. 
- memory = s.workers[a.address].memory.process + memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 class UnderReport: @@ -1361,7 +1361,7 @@ async def test_spill_no_target_threshold(c, s, a): """Test that you can enable the spill threshold while leaving the target threshold to False """ - memory = s.workers[a.address].memory.process + memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling class OverReport: @@ -1381,7 +1381,8 @@ def __reduce__(self): await wait(f1) assert set(a.data.memory) == {"f1"} - futures = c.map(OverReport, range(int(20e6), int(20e6) + 25)) # 500 MB + # 500 MB. Use large chunks to stimulate timely release of process memory. + futures = c.map(OverReport, range(int(100e6), int(100e6) + 5)) while not a.data.disk: await asyncio.sleep(0.01) @@ -1408,7 +1409,7 @@ def __reduce__(self): ), ) async def test_spill_hysteresis(c, s, a): - memory = s.workers[a.address].memory.process + memory = psutil.Process().memory_info().rss a.memory_limit = memory + 2**30 # Under-report managed memory, so that we reach the spill threshold for process From 78858dfde32e43e1235e9b4eb7a766bb27a88d92 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 17:01:58 +0000 Subject: [PATCH 10/12] fix --- distributed/tests/test_worker.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 9963020b364..f9e92111667 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1410,13 +1410,13 @@ def __reduce__(self): ) async def test_spill_hysteresis(c, s, a): memory = psutil.Process().memory_info().rss - a.memory_limit = memory + 2**30 + a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB # Under-report managed memory, so that we reach the spill threshold for process - # memory withouth first reaching the target threshold for managed memory + # memory without first reaching the target threshold for managed memory class UnderReport: def __init__(self): - self.data = "x" * (50 * 2**20) # 50 MiB + self.data = "x" * int(100e6) # 100 MB def __sizeof__(self): return 1 @@ -1431,7 +1431,7 @@ def __reduce__(self): futures.append(c.submit(UnderReport, pure=False)) max_in_memory = max(max_in_memory, len(a.data.memory)) await wait(futures) - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) max_in_memory = max(max_in_memory, len(a.data.memory)) # If there were no hysteresis, we would lose exactly 1 key. @@ -1458,11 +1458,17 @@ def __reduce__(self): async def test_pause_executor(c, s, a): # See notes in test_spill_spill_threshold memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 150e6) / 0.8 + a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB + # Note: it's crucial to have a very large single chunk of memory that gets descoped + # all at once in order to instigate release of process memory. 
+ # Read: https://github.com/dask/distributed/issues/5840 def f(): - x = "x" * int(250e6) - sleep(1) + # Add 400 MB unmanaged memory + x = "x" * int(400e6) + w = get_worker() + while w.status != Status.paused: + sleep(0.01) with captured_logger(logging.getLogger("distributed.worker")) as logger: future = c.submit(f, key="x") @@ -1474,11 +1480,7 @@ def f(): assert "Pausing worker" in logger.getvalue() assert sum(f.status == "finished" for f in futures) < 4 - start = time() while a.status != Status.running: - if time() > start + 10: - assert not LINUX - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") await asyncio.sleep(0.01) assert "Resuming worker" in logger.getvalue() From ead70b4124a292223d322a9587d845940ffdbc77 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Sun, 20 Feb 2022 17:41:36 +0000 Subject: [PATCH 11/12] fix --- distributed/tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index f9e92111667..f6f33c4560b 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1381,8 +1381,8 @@ def __reduce__(self): await wait(f1) assert set(a.data.memory) == {"f1"} - # 500 MB. Use large chunks to stimulate timely release of process memory. - futures = c.map(OverReport, range(int(100e6), int(100e6) + 5)) + # 800 MB. Use large chunks to stimulate timely release of process memory. + futures = c.map(OverReport, range(int(100e6), int(100e6) + 8)) while not a.data.disk: await asyncio.sleep(0.01) From f32a7cdd2f0f355cc89f976784a4aa65f024db9b Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 21 Feb 2022 16:58:18 +0000 Subject: [PATCH 12/12] Revisit documentation --- distributed/distributed.yaml | 8 +-- docs/source/worker.rst | 129 ++++++++++++++++------------------- 2 files changed, 61 insertions(+), 76 deletions(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 07abe57d874..793da7b3f33 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -140,10 +140,10 @@ distributed: # Fractions of worker process memory at which we take action to avoid memory # blowup. Set any of the values to False to turn off the behavior entirely. - target: 0.60 # target fraction to stay below - spill: 0.70 # fraction at which we spill to disk - pause: 0.80 # fraction at which we pause worker threads - terminate: 0.95 # fraction at which we terminate the worker + target: 0.60 # fraction of managed memory where we start spilling to disk + spill: 0.70 # fraction of process memory where we start spilling to disk + pause: 0.80 # fraction of process memory at which we pause worker threads + terminate: 0.95 # fraction of process memory at which we terminate the worker # Max size of the spill file on disk (e.g. "10 GB") # Set to false for no maximum. diff --git a/docs/source/worker.rst b/docs/source/worker.rst index 9a4cad0c519..eafaee1568f 100644 --- a/docs/source/worker.rst +++ b/docs/source/worker.rst @@ -149,7 +149,6 @@ exceptions to this are when: Memory Management ----------------- - Workers are given a target memory limit to stay under with the command line ``--memory-limit`` keyword or the ``memory_limit=`` Python keyword argument, which sets the memory limit per worker processes launched @@ -160,89 +159,73 @@ by dask-worker :: Workers use a few different heuristics to keep memory use beneath this limit: -1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data - to disk. -2. 
At 70% of memory load (as reported by the OS), spill least recently used data to - disk regardless of what is reported by ``sizeof``; this accounts for memory used by - the python interpreter, modules, global variables, memory leaks, etc. The spilling - stops when the memory goes below 60%, in a hysteresis cycle. -3. At 80% of memory load (as reported by the OS), stop accepting new work on local - thread pool. -4. At 95% of memory load (as reported by the OS), terminate and restart the worker. - -These values can be configured by modifying the ``~/.config/dask/distributed.yaml`` -file: - -.. code-block:: yaml - - distributed: - worker: - # Fractions of worker memory at which we take action to avoid memory blowup - # Set any of the lower three values to False to turn off the behavior entirely - memory: - target: 0.60 # target fraction to stay below - spill: 0.70 # fraction at which we spill to disk - pause: 0.80 # fraction at which we pause worker threads - terminate: 0.95 # fraction at which we terminate the worker - -It is possible to individually disable any of these by setting its value to False. -Setting 'target' while leaving 'spill' active disables the spill hysteresis cycle. - - -Spill data to disk -~~~~~~~~~~~~~~~~~~ +Spilling based on managed memory +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Every time the worker finishes a task, it estimates the size in bytes that the result +costs to keep in memory using the ``sizeof`` function. This function defaults to +:func:`sys.getsizeof` for arbitrary objects, which uses the standard Python +``__sizeof__`` protocol, but also has special-cased implementations for common data +types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all data +tracked by Dask is called :ref:`managed memory `_. -Every time the worker finishes a task it estimates the size in bytes that the -result costs to keep in memory using the ``sizeof`` function. This function -defaults to :func:`sys.getsizeof` for arbitrary objects, which uses the standard -Python ``__sizeof__`` protocol, but also has special-cased implementations for -common data types like NumPy arrays and Pandas dataframes. - -When the sum of the number of bytes of the data in memory exceeds 60% of the -memory limit, the worker will begin to dump the least recently used data -to disk. You can control this location with the ``--local-directory`` -keyword.:: +When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker +will begin to dump the least recently used data to disk. You can control this location +with the ``--local-directory`` keyword:: $ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch -That data is still available and will be read back from disk when necessary. -On the diagnostic dashboard status page disk I/O will show up in the task -stream plot as orange blocks. Additionally, the memory plot in the upper left -will become yellow and then red. - - -Monitor process memory load -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +That data is still available and will be read back from disk when necessary. On the +diagnostic dashboard status page, disk I/O will show up in the task stream plot as +orange blocks. Additionally, the memory plot in the upper left will show a section of +the bar colored in grey. +Spilling based on process memory +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The approach above can fail for a few reasons: 1. Custom objects may not report their memory size accurately 2. User functions may take up more RAM than expected 3. 
Significant amounts of data may accumulate in network I/O buffers -To address this we periodically monitor the memory of the worker process every -200 ms. If the system reported memory use is above 70% of the target memory -usage then the worker will start dumping unused data to disk, even if internal -``sizeof`` recording hasn't yet reached the normal 60% limit. - - -Halt worker threads -~~~~~~~~~~~~~~~~~~~ - -At 80% load, the worker's thread pool will stop starting computation on -additional tasks in the worker's queue. This gives time for the write-to-disk -functionality to take effect even in the face of rapidly accumulating data. -Currently executing tasks continue to run. +To address this, we periodically monitor the :ref:`process memory `_ of the +worker every 200 ms. If the system reported memory use is above 70% of the target memory +usage (*spill threshold*), then the worker will start dumping unused data to disk, even +if internal ``sizeof`` recording hasn't yet reached the normal 60% threshold. This +more aggressive spilling will continue until process memory falls below 60%. +Pause worker +~~~~~~~~~~~~ +At 80% :ref:`process memory `_ load, the worker's thread pool will stop +starting computation on additional tasks in the worker's queue. This gives time for the +write-to-disk functionality to take effect even in the face of rapidly accumulating +data. Currently executing tasks continue to run. Additionally, data transfers to/from +other workers are throttled to a bare minimum. Kill Worker ~~~~~~~~~~~ +At 95% :ref:`process memory `_ load (*terminate threshold*), a worker's nanny +process will terminate it. Tasks will be cancelled mid-execution and rescheduled +elsewhere; all unique data on the worker will be lost and will need to be recomputed. +This is to avoid having our worker job being terminated by an external watchdog (like +Kubernetes, YARN, Mesos, SGE, etc..). After termination, the nanny will restart the +worker in a fresh state. + +Thresholds configuration +~~~~~~~~~~~~~~~~~~~~~~~~ +These values can be configured by modifying the ``~/.config/dask/distributed.yaml`` +file: -At 95% memory load, a worker's nanny process will terminate it. This is to -avoid having our worker job being terminated by an external job scheduler (like -YARN, Mesos, SGE, etc..). After termination, the nanny will restart the worker -in a fresh state. +.. code-block:: yaml + distributed: + worker: + # Fractions of worker process memory at which we take action to avoid memory + # blowup. Set any of the values to False to turn off the behavior entirely. + memory: + target: 0.60 # fraction of managed memory where we start spilling to disk + spill: 0.70 # fraction of process memory where we start spilling to disk + pause: 0.80 # fraction of process memory at which we pause worker threads + terminate: 0.95 # fraction of process memory at which we terminate the worker Using the dashboard to monitor memory usage ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -250,16 +233,18 @@ The dashboard (typically available on port 8787) shows a summary of the overall usage on the cluster, as well as the individual usage on each worker. It provides different memory readings: +.. _memtypes: + process Overall memory used by the worker process (RSS), as measured by the OS managed - This is the sum of the ``sizeof`` of all dask data stored on the worker, excluding + Sum of the ``sizeof`` of all Dask data stored on the worker, excluding spilled data. 
unmanaged - This is the memory usage that dask is not directly aware of. It is estimated by - subtracting managed memory from the total process memory and typically includes: + Memory usage that Dask is not directly aware of. It is estimated by subtracting + managed memory from the total process memory and typically includes: - The Python interpreter code, loaded modules, and global variables - Memory temporarily used by running tasks @@ -284,7 +269,7 @@ unmanaged recent By default, :meth:`distributed.Client.rebalance` and :meth:`distributed.scheduler.Scheduler.rebalance` ignore unmanaged recent memory. - This behaviour can also be tweaked using the dask config - see the methods' + This behaviour can also be tweaked using the Dask config - see the methods' documentation. spilled @@ -403,7 +388,7 @@ documentation. Ignore process memory ~~~~~~~~~~~~~~~~~~~~~ -If all else fails, you may want to stop dask from using memory metrics from the OS (RSS) +If all else fails, you may want to stop Dask from using memory metrics from the OS (RSS) in its decision-making: .. code-block:: yaml
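   # Illustrative sketch only: it simply sets the three process-memory based actions
   # documented above to False; check distributed.yaml for the exact keys available
   # in your version.
   distributed:
     worker:
       memory:
         spill: false      # don't spill based on OS-reported process memory
         pause: false      # don't pause worker threads based on process memory
         terminate: false  # don't terminate the worker based on process memory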