diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 07abe57d874..793da7b3f33 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -140,10 +140,10 @@ distributed:

       # Fractions of worker process memory at which we take action to avoid memory
       # blowup. Set any of the values to False to turn off the behavior entirely.
-      target: 0.60  # target fraction to stay below
-      spill: 0.70  # fraction at which we spill to disk
-      pause: 0.80  # fraction at which we pause worker threads
-      terminate: 0.95  # fraction at which we terminate the worker
+      target: 0.60  # fraction of managed memory where we start spilling to disk
+      spill: 0.70  # fraction of process memory where we start spilling to disk
+      pause: 0.80  # fraction of process memory at which we pause worker threads
+      terminate: 0.95  # fraction of process memory at which we terminate the worker

       # Max size of the spill file on disk (e.g. "10 GB")
       # Set to false for no maximum.
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 0f735dece79..f6f33c4560b 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1192,6 +1192,7 @@ async def test_statistical_profiling_2(c, s, a, b):
             break


+@requires_zict
 @gen_cluster(
     client=True,
     nthreads=[("", 1)],
@@ -1302,8 +1303,8 @@ async def test_spill_spill_threshold(c, s, a):
     # FIXME https://github.com/dask/distributed/issues/5367
     # This works just by luck for the purpose of the spill and pause thresholds,
     # and does NOT work for the target threshold.
-    memory = s.workers[a.address].memory.process
-    a.memory_limit = memory / 0.7 + 400e6
+    memory = psutil.Process().memory_info().rss
+    a.memory_limit = (memory + 300e6) / 0.7

     class UnderReport:
         """100 MB process memory, 10 bytes reported managed memory"""
@@ -1324,6 +1325,21 @@ def __reduce__(self):
         await asyncio.sleep(0.01)


+async def assert_not_everything_is_spilled(w: Worker) -> None:
+    start = time()
+    while time() < start + 0.5:
+        assert w.data
+        if not w.data.memory:  # type: ignore
+            # The hysteresis system fails on Windows and MacOSX because process memory
+            # is very slow to shrink down after calls to PyFree. As a result,
+            # Worker.memory_monitor will continue spilling until there's nothing left.
+            # Nothing we can do about this short of finding either a way to change this
+            # behaviour at OS level or a better measure of allocated memory.
+            assert not LINUX, "All data was spilled to disk"
+            raise pytest.xfail("https://github.com/dask/distributed/issues/5840")
+        await asyncio.sleep(0)
+
+
 @requires_zict
 @gen_cluster(
     nthreads=[("", 1)],
@@ -1345,8 +1361,8 @@ async def test_spill_no_target_threshold(c, s, a):
     """Test that you can enable the spill threshold while leaving the target
     threshold to False
     """
-    memory = s.workers[a.address].memory.process
-    a.memory_limit = memory / 0.7 + 400e6
+    memory = psutil.Process().memory_info().rss
+    a.memory_limit = (memory + 300e6) / 0.7  # 300 MB before we start spilling

     class OverReport:
         """Configurable process memory, 10 GB reported managed memory"""
@@ -1365,12 +1381,68 @@ def __reduce__(self):
     await wait(f1)
     assert set(a.data.memory) == {"f1"}

+    # 800 MB. Use large chunks to stimulate timely release of process memory.
     futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))

     while not a.data.disk:
         await asyncio.sleep(0.01)
     assert "f1" in a.data.disk

+    # Spilling normally starts at the spill threshold and stops at the target threshold.
+    # In this special case, it stops as soon as the process memory goes below the spill
+    # threshold, i.e. without a hysteresis cycle. Test that we didn't instead dump the
+    # whole data to disk (memory_limit * target = 0)
+    await assert_not_everything_is_spilled(a)
+
+
+@pytest.mark.slow
+@requires_zict
+@gen_cluster(
+    nthreads=[("", 1)],
+    client=True,
+    worker_kwargs=dict(
+        memory_limit="1 GiB",  # See FIXME note in previous test
+        memory_monitor_interval="10ms",
+        memory_target_fraction=0.4,
+        memory_spill_fraction=0.7,
+        memory_pause_fraction=False,
+    ),
+)
+async def test_spill_hysteresis(c, s, a):
+    memory = psutil.Process().memory_info().rss
+    a.memory_limit = (memory + 1e9) / 0.7  # Start spilling after 1 GB
+
+    # Under-report managed memory, so that we reach the spill threshold for process
+    # memory without first reaching the target threshold for managed memory
+    class UnderReport:
+        def __init__(self):
+            self.data = "x" * int(100e6)  # 100 MB
+
+        def __sizeof__(self):
+            return 1
+
+        def __reduce__(self):
+            """Speed up test by writing very little to disk when spilling"""
+            return UnderReport, ()
+
+    max_in_memory = 0
+    futures = []
+    while not a.data.disk:
+        futures.append(c.submit(UnderReport, pure=False))
+        max_in_memory = max(max_in_memory, len(a.data.memory))
+        await wait(futures)
+        await asyncio.sleep(0.05)
+        max_in_memory = max(max_in_memory, len(a.data.memory))
+
+    # If there were no hysteresis, we would lose exactly 1 key.
+    # Note that, for this test to be meaningful, memory must shrink down readily when
+    # we deallocate Python objects. This is not always the case on Windows and MacOSX;
+    # on Linux we set MALLOC_TRIM to help in that regard.
+    # To verify that this test is useful, set target=spill and watch it fail.
+    while len(a.data.memory) > max_in_memory - 3:
+        await asyncio.sleep(0.01)
+    await assert_not_everything_is_spilled(a)
+

 @pytest.mark.slow
 @gen_cluster(
@@ -1386,11 +1458,17 @@ def __reduce__(self):
 async def test_pause_executor(c, s, a):
     # See notes in test_spill_spill_threshold
     memory = psutil.Process().memory_info().rss
-    a.memory_limit = memory / 0.8 + 200e6
+    a.memory_limit = (memory + 160e6) / 0.8  # Pause after 160 MB
+    # Note: it's crucial to have a very large single chunk of memory that gets descoped
+    # all at once in order to instigate release of process memory.
+    # Read: https://github.com/dask/distributed/issues/5840

     def f():
-        x = "x" * int(250e6)
-        sleep(1)
+        # Add 400 MB unmanaged memory
+        x = "x" * int(400e6)
+        w = get_worker()
+        while w.status != Status.paused:
+            sleep(0.01)

     with captured_logger(logging.getLogger("distributed.worker")) as logger:
         future = c.submit(f, key="x")
@@ -1469,22 +1547,19 @@ async def test_deque_handler(s):
     assert any(msg.msg == "foo456" for msg in deque_handler.deque)


-@gen_cluster(nthreads=[], client=True)
-async def test_avoid_memory_monitor_if_zero_limit(c, s):
-    worker = await Worker(
-        s.address, loop=s.loop, memory_limit=0, memory_monitor_interval=10
-    )
-    assert type(worker.data) is dict
-    assert "memory" not in worker.periodic_callbacks
-
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    worker_kwargs={"memory_limit": 0, "memory_monitor_interval": "10ms"},
+)
+async def test_avoid_memory_monitor_if_zero_limit(c, s, a):
+    assert type(a.data) is dict
+    assert "memory" not in a.periodic_callbacks
     future = c.submit(inc, 1)
     assert (await future) == 2
-    await asyncio.sleep(worker.memory_monitor_interval / 1000)
-
+    await asyncio.sleep(0.05)
     await c.submit(inc, 2)  # worker doesn't pause
-    await worker.close()
-
-
 @gen_cluster(
     nthreads=[("127.0.0.1", 1)],
diff --git a/distributed/worker.py b/distributed/worker.py
index 6ed8cb5c5dd..749ebb3ea02 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3743,7 +3743,14 @@ def check_pause(memory):
                     frac * 100,
                 )
             start = time()
-            target = self.memory_limit * self.memory_target_fraction
+            # Implement hysteresis cycle where spilling starts at the spill threshold
+            # and stops at the target threshold. Note that here the target threshold
+            # refers to process memory, whereas normally it refers to reported managed
+            # memory (i.e. the output of sizeof()).
+            # If target=False, disable hysteresis.
+            target = self.memory_limit * (
+                self.memory_target_fraction or self.memory_spill_fraction
+            )
             count = 0
             need = memory - target
             while memory > target:
diff --git a/docs/source/worker.rst b/docs/source/worker.rst
index 495865357fd..eafaee1568f 100644
--- a/docs/source/worker.rst
+++ b/docs/source/worker.rst
@@ -149,7 +149,6 @@ exceptions to this are when:
 Memory Management
 -----------------
-
 Workers are given a target memory limit to stay under with the command line
 ``--memory-limit`` keyword or the ``memory_limit=`` Python keyword argument,
 which sets the memory limit per worker processes launched
 by dask-worker ::
@@ -160,85 +159,73 @@ by dask-worker ::

 Workers use a few different heuristics to keep memory use beneath this limit:

-1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
-   to disk
-2. At 70% of memory load (as reported by the OS), spill least recently used data to
-   disk regardless of what is reported by ``sizeof``; this accounts for memory used by
-   the python interpreter, modules, global variables, memory leaks, etc.
-3. At 80% of memory load (as reported by the OS), stop accepting new work on local
-   thread pool
-4. At 95% of memory load (as reported by the OS), terminate and restart the worker
-
-These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
-file:
-
-.. code-block:: yaml
-
-   distributed:
-     worker:
-       # Fractions of worker memory at which we take action to avoid memory blowup
-       # Set any of the lower three values to False to turn off the behavior entirely
-       memory:
-         target: 0.60  # target fraction to stay below
-         spill: 0.70  # fraction at which we spill to disk
-         pause: 0.80  # fraction at which we pause worker threads
-         terminate: 0.95  # fraction at which we terminate the worker
-
-
-Spill data to disk
-~~~~~~~~~~~~~~~~~~
-
-Every time the worker finishes a task it estimates the size in bytes that the
-result costs to keep in memory using the ``sizeof`` function. This function
-defaults to :func:`sys.getsizeof` for arbitrary objects, which uses the standard
-Python ``__sizeof__`` protocol, but also has special-cased implementations for
-common data types like NumPy arrays and Pandas dataframes.
+Spilling based on managed memory
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Every time the worker finishes a task, it estimates the size in bytes that the result
+costs to keep in memory using the ``sizeof`` function. This function defaults to
+:func:`sys.getsizeof` for arbitrary objects, which uses the standard Python
+``__sizeof__`` protocol, but also has special-cased implementations for common data
+types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all data
+tracked by Dask is called :ref:`managed memory <memtypes>`.

-When the sum of the number of bytes of the data in memory exceeds 60% of the
-memory limit, the worker will begin to dump the least recently used data
-to disk. You can control this location with the ``--local-directory``
-keyword.::
+When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker
+will begin to dump the least recently used data to disk. You can control this location
+with the ``--local-directory`` keyword::

    $ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

-That data is still available and will be read back from disk when necessary.
-On the diagnostic dashboard status page disk I/O will show up in the task
-stream plot as orange blocks. Additionally, the memory plot in the upper left
-will become yellow and then red.
-
-
-Monitor process memory load
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+That data is still available and will be read back from disk when necessary. On the
+diagnostic dashboard status page, disk I/O will show up in the task stream plot as
+orange blocks. Additionally, the memory plot in the upper left will show a section of
+the bar colored in grey.

+Spilling based on process memory
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The approach above can fail for a few reasons:

 1. Custom objects may not report their memory size accurately
 2. User functions may take up more RAM than expected
 3. Significant amounts of data may accumulate in network I/O buffers

-To address this we periodically monitor the memory of the worker process every
-200 ms. If the system reported memory use is above 70% of the target memory
-usage then the worker will start dumping unused data to disk, even if internal
-``sizeof`` recording hasn't yet reached the normal 60% limit.
-
-
-Halt worker threads
-~~~~~~~~~~~~~~~~~~~
-
-At 80% load, the worker's thread pool will stop starting computation on
-additional tasks in the worker's queue. This gives time for the write-to-disk
-functionality to take effect even in the face of rapidly accumulating data.
-Currently executing tasks continue to run.
+To address this, we periodically monitor the :ref:`process memory <memtypes>` of the
+worker every 200 ms. If the system reported memory use is above 70% of the target memory
+usage (*spill threshold*), then the worker will start dumping unused data to disk, even
+if internal ``sizeof`` recording hasn't yet reached the normal 60% threshold. This
+more aggressive spilling will continue until process memory falls below 60%.

+Pause worker
+~~~~~~~~~~~~
+At 80% :ref:`process memory <memtypes>` load, the worker's thread pool will stop
+starting computation on additional tasks in the worker's queue. This gives time for the
+write-to-disk functionality to take effect even in the face of rapidly accumulating
+data. Currently executing tasks continue to run. Additionally, data transfers to/from
+other workers are throttled to a bare minimum.

 Kill Worker
 ~~~~~~~~~~~
+At 95% :ref:`process memory <memtypes>` load (*terminate threshold*), a worker's nanny
+process will terminate it. Tasks will be cancelled mid-execution and rescheduled
+elsewhere; all unique data on the worker will be lost and will need to be recomputed.
+This is to avoid having our worker job terminated by an external watchdog (like
+Kubernetes, YARN, Mesos, SGE, etc.). After termination, the nanny will restart the
+worker in a fresh state.
+
+Thresholds configuration
+~~~~~~~~~~~~~~~~~~~~~~~~
+These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
+file:

-At 95% memory load, a worker's nanny process will terminate it. This is to
-avoid having our worker job being terminated by an external job scheduler (like
-YARN, Mesos, SGE, etc..). After termination, the nanny will restart the worker
-in a fresh state.
+.. code-block:: yaml

+   distributed:
+     worker:
+       # Fractions of worker process memory at which we take action to avoid memory
+       # blowup. Set any of the values to False to turn off the behavior entirely.
+       memory:
+         target: 0.60  # fraction of managed memory where we start spilling to disk
+         spill: 0.70  # fraction of process memory where we start spilling to disk
+         pause: 0.80  # fraction of process memory at which we pause worker threads
+         terminate: 0.95  # fraction of process memory at which we terminate the worker

 Using the dashboard to monitor memory usage
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -246,16 +233,18 @@ The dashboard (typically available on port 8787) shows a summary of the overall
 memory usage on the cluster, as well as the individual usage on each worker.
 It provides different memory readings:

+.. _memtypes:
+
 process
     Overall memory used by the worker process (RSS), as measured by the OS

 managed
-    This is the sum of the ``sizeof`` of all dask data stored on the worker, excluding
+    Sum of the ``sizeof`` of all Dask data stored on the worker, excluding
     spilled data.

 unmanaged
-    This is the memory usage that dask is not directly aware of. It is estimated by
-    subtracting managed memory from the total process memory and typically includes:
+    Memory usage that Dask is not directly aware of. It is estimated by subtracting
+    managed memory from the total process memory and typically includes:

     - The Python interpreter code, loaded modules, and global variables
     - Memory temporarily used by running tasks
@@ -280,7 +269,7 @@ unmanaged recent
     By default, :meth:`distributed.Client.rebalance` and
     :meth:`distributed.scheduler.Scheduler.rebalance` ignore unmanaged recent memory.
-    This behaviour can also be tweaked using the dask config - see the methods'
+    This behaviour can also be tweaked using the Dask config - see the methods'
     documentation.

 spilled

@@ -399,7 +388,7 @@ documentation.

 Ignore process memory
 ~~~~~~~~~~~~~~~~~~~~~
-If all else fails, you may want to stop dask from using memory metrics from the OS (RSS)
+If all else fails, you may want to stop Dask from using memory metrics from the OS (RSS)
 in its decision-making:

 .. code-block:: yaml
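
The body of that final ``code-block`` is cut off by the end of the hunk. For illustration only, a minimal sketch of such a configuration follows. It assumes the goal is simply to disable the spill, pause, and terminate thresholds (the ones driven by OS-reported process memory), each of which the config comments above say can be set to ``False``; the actual contents of the block may differ::

   distributed:
     worker:
       memory:
         # Disable the thresholds that act on OS-reported (process) memory
         spill: false
         pause: false
         terminate: false

The ``target`` threshold can stay enabled, since it acts on managed (``sizeof``-based) memory rather than on OS metrics.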