8 changes: 4 additions & 4 deletions distributed/distributed.yaml
@@ -140,10 +140,10 @@ distributed:

# Fractions of worker process memory at which we take action to avoid memory
# blowup. Set any of the values to False to turn off the behavior entirely.
target: 0.60 # target fraction to stay below
spill: 0.70 # fraction at which we spill to disk
pause: 0.80 # fraction at which we pause worker threads
terminate: 0.95 # fraction at which we terminate the worker
target: 0.60 # fraction of managed memory where we start spilling to disk
spill: 0.70 # fraction of process memory where we start spilling to disk
pause: 0.80 # fraction of process memory at which we pause worker threads
terminate: 0.95 # fraction of process memory at which we terminate the worker

# Max size of the spill file on disk (e.g. "10 GB")
# Set to false for no maximum.
113 changes: 94 additions & 19 deletions distributed/tests/test_worker.py
@@ -1192,6 +1192,7 @@ async def test_statistical_profiling_2(c, s, a, b):
break


@requires_zict
@gen_cluster(
client=True,
nthreads=[("", 1)],
@@ -1302,8 +1303,8 @@ async def test_spill_spill_threshold(c, s, a):
# FIXME https://github.com/dask/distributed/issues/5367
# This works just by luck for the purpose of the spill and pause thresholds,
# and does NOT work for the target threshold.
memory = s.workers[a.address].memory.process
a.memory_limit = memory / 0.7 + 400e6
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7

class UnderReport:
"""100 MB process memory, 10 bytes reported managed memory"""
@@ -1324,6 +1325,21 @@ def __reduce__(self):
await asyncio.sleep(0.01)


async def assert_not_everything_is_spilled(w: Worker) -> None:
start = time()
while time() < start + 0.5:
assert w.data
if not w.data.memory: # type: ignore
# The hysteresis system fails on Windows and MacOSX because process memory
# is very slow to shrink down after calls to PyFree. As a result,
# Worker.memory_monitor will continue spilling until there's nothing left.
# Nothing we can do about this short of finding either a way to change this
# behaviour at OS level or a better measure of allocated memory.
assert not LINUX, "All data was spilled to disk"
raise pytest.xfail("https://github.com/dask/distributed/issues/5840")
await asyncio.sleep(0)


@requires_zict
@gen_cluster(
nthreads=[("", 1)],
@@ -1345,8 +1361,8 @@ async def test_spill_no_target_threshold(c, s, a):
"""Test that you can enable the spill threshold while leaving the target threshold
set to False
"""
memory = s.workers[a.address].memory.process
a.memory_limit = memory / 0.7 + 400e6
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling

class OverReport:
"""Configurable process memory, 10 GB reported managed memory"""
@@ -1365,12 +1381,68 @@ def __reduce__(self):
await wait(f1)
assert set(a.data.memory) == {"f1"}

# 800 MB. Use large chunks to stimulate timely release of process memory.
futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))

while not a.data.disk:
await asyncio.sleep(0.01)
assert "f1" in a.data.disk

# Spilling normally starts at the spill threshold and stops at the target threshold.
# In this special case, it stops as soon as the process memory goes below the spill
# threshold, i.e. without a hysteresis cycle. Test that we didn't instead dump all
# the data to disk (memory_limit * target = 0)
await assert_not_everything_is_spilled(a)


@pytest.mark.slow
@requires_zict
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1 GiB", # See FIXME note in previous test
memory_monitor_interval="10ms",
memory_target_fraction=0.4,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
)
async def test_spill_hysteresis(c, s, a):
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB

# Under-report managed memory, so that we reach the spill threshold for process
# memory without first reaching the target threshold for managed memory
class UnderReport:
def __init__(self):
self.data = "x" * int(100e6) # 100 MB

def __sizeof__(self):
return 1

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()

max_in_memory = 0
futures = []
while not a.data.disk:
futures.append(c.submit(UnderReport, pure=False))
max_in_memory = max(max_in_memory, len(a.data.memory))
await wait(futures)
await asyncio.sleep(0.05)
max_in_memory = max(max_in_memory, len(a.data.memory))

# If there were no hysteresis, we would spill exactly 1 key.
# Note that, for this test to be meaningful, memory must shrink down readily when
# we deallocate Python objects. This is not always the case on Windows and MacOSX;
# on Linux we set MALLOC_TRIM to help in that regard.
# To verify that this test is useful, set target=spill and watch it fail.
while len(a.data.memory) > max_in_memory - 3:
await asyncio.sleep(0.01)
await assert_not_everything_is_spilled(a)


@pytest.mark.slow
@gen_cluster(
@@ -1386,11 +1458,17 @@ def __reduce__(self):
async def test_pause_executor(c, s, a):
# See notes in test_spill_spill_threshold
memory = psutil.Process().memory_info().rss
a.memory_limit = memory / 0.8 + 200e6
a.memory_limit = (memory + 160e6) / 0.8  # Pause after 160 MB

# Note: it's crucial to have a very large single chunk of memory that goes out of
# scope all at once, in order to trigger prompt release of process memory.
# Read: https://github.com/dask/distributed/issues/5840
def f():
x = "x" * int(250e6)
sleep(1)
# Add 400 MB unmanaged memory
x = "x" * int(400e6)
w = get_worker()
while w.status != Status.paused:
sleep(0.01)

with captured_logger(logging.getLogger("distributed.worker")) as logger:
future = c.submit(f, key="x")
Expand Down Expand Up @@ -1469,22 +1547,19 @@ async def test_deque_handler(s):
assert any(msg.msg == "foo456" for msg in deque_handler.deque)


@gen_cluster(nthreads=[], client=True)
async def test_avoid_memory_monitor_if_zero_limit(c, s):
worker = await Worker(
s.address, loop=s.loop, memory_limit=0, memory_monitor_interval=10
)
assert type(worker.data) is dict
assert "memory" not in worker.periodic_callbacks

@gen_cluster(
client=True,
nthreads=[("", 1)],
worker_kwargs={"memory_limit": 0, "memory_monitor_interval": "10ms"},
)
async def test_avoid_memory_monitor_if_zero_limit(c, s, a):
assert type(a.data) is dict
assert "memory" not in a.periodic_callbacks
future = c.submit(inc, 1)
assert (await future) == 2
await asyncio.sleep(worker.memory_monitor_interval / 1000)

await asyncio.sleep(0.05)
await c.submit(inc, 2) # worker doesn't pause

await worker.close()


@gen_cluster(
nthreads=[("127.0.0.1", 1)],
9 changes: 8 additions & 1 deletion distributed/worker.py
@@ -3743,7 +3743,14 @@ def check_pause(memory):
frac * 100,
)
start = time()
target = self.memory_limit * self.memory_target_fraction
# Implement hysteresis cycle where spilling starts at the spill threshold
# and stops at the target threshold. Note that here the target threshold is
# measured against process memory, whereas normally it is measured against
# reported managed memory (i.e. the output of sizeof()).
# If target=False, disable hysteresis.
target = self.memory_limit * (
self.memory_target_fraction or self.memory_spill_fraction
)
count = 0
need = memory - target
while memory > target:
125 changes: 57 additions & 68 deletions docs/source/worker.rst
@@ -149,7 +149,6 @@ exceptions to this are when:

Memory Management
-----------------

Workers are given a target memory limit to stay under with the
command line ``--memory-limit`` keyword or the ``memory_limit=`` Python
keyword argument, which sets the memory limit per worker process launched
@@ -160,102 +159,92 @@ by dask-worker ::

Workers use a few different heuristics to keep memory use beneath this limit:

1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
to disk
2. At 70% of memory load (as reported by the OS), spill least recently used data to
disk regardless of what is reported by ``sizeof``; this accounts for memory used by
the python interpreter, modules, global variables, memory leaks, etc.
3. At 80% of memory load (as reported by the OS), stop accepting new work on local
thread pool
4. At 95% of memory load (as reported by the OS), terminate and restart the worker

These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
file:

.. code-block:: yaml

distributed:
worker:
# Fractions of worker memory at which we take action to avoid memory blowup
# Set any of the lower three values to False to turn off the behavior entirely
memory:
target: 0.60 # target fraction to stay below
spill: 0.70 # fraction at which we spill to disk
pause: 0.80 # fraction at which we pause worker threads
terminate: 0.95 # fraction at which we terminate the worker


Spill data to disk
~~~~~~~~~~~~~~~~~~

Every time the worker finishes a task it estimates the size in bytes that the
result costs to keep in memory using the ``sizeof`` function. This function
defaults to :func:`sys.getsizeof` for arbitrary objects, which uses the standard
Python ``__sizeof__`` protocol, but also has special-cased implementations for
common data types like NumPy arrays and Pandas dataframes.
Spilling based on managed memory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Every time the worker finishes a task, it estimates the size in bytes that the result
costs to keep in memory using the ``sizeof`` function. This function defaults to
:func:`sys.getsizeof` for arbitrary objects, which uses the standard Python
``__sizeof__`` protocol, but also has special-cased implementations for common data
types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all data
tracked by Dask is called :ref:`managed memory <memtypes>`.
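
To make the ``sizeof`` bookkeeping concrete, here is a minimal sketch (illustrative
only, not part of this changeset) of how an object that under-reports ``__sizeof__``
skews the managed-memory figure, much like the ``UnderReport`` helpers in the tests
above; the ``ReportsOneByte`` class is a made-up name:

.. code-block:: python

    from dask.sizeof import sizeof  # the measure used for managed memory


    class ReportsOneByte:
        """Holds ~100 MB of process memory but reports ~1 byte to Dask"""

        def __init__(self):
            self.data = "x" * int(100e6)

        def __sizeof__(self):
            return 1


    sizeof("x" * int(100e6))  # ~100e6: plain objects fall back to sys.getsizeof
    sizeof(ReportsOneByte())  # a few bytes: managed memory barely grows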

When the sum of the number of bytes of the data in memory exceeds 60% of the
memory limit, the worker will begin to dump the least recently used data
to disk. You can control this location with the ``--local-directory``
keyword.::
When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker
will begin to dump the least recently used data to disk. You can control this location
with the ``--local-directory`` keyword::

$ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

That data is still available and will be read back from disk when necessary.
On the diagnostic dashboard status page disk I/O will show up in the task
stream plot as orange blocks. Additionally, the memory plot in the upper left
will become yellow and then red.


Monitor process memory load
~~~~~~~~~~~~~~~~~~~~~~~~~~~
That data is still available and will be read back from disk when necessary. On the
diagnostic dashboard status page, disk I/O will show up in the task stream plot as
orange blocks. Additionally, the memory plot in the upper left will show a section of
the bar colored in grey.
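
As an aside (a sketch, not from this changeset), you can inspect which keys currently
sit in RAM versus on disk by querying each worker's spill buffer, the same
``data.memory`` / ``data.disk`` mappings that the tests above assert on; the scheduler
address and the ``spill_report`` helper are hypothetical:

.. code-block:: python

    from dask.distributed import Client

    client = Client("tcp://scheduler:port")  # hypothetical address


    def spill_report(dask_worker):
        # When spilling is enabled, the worker's data is a spill buffer:
        # .memory holds keys kept in RAM, .disk holds keys spilled to disk
        return {
            "in_memory": list(dask_worker.data.memory),
            "spilled": list(dask_worker.data.disk),
        }


    client.run(spill_report)  # returns one report per worker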

Spilling based on process memory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The approach above can fail for a few reasons:

1. Custom objects may not report their memory size accurately
2. User functions may take up more RAM than expected
3. Significant amounts of data may accumulate in network I/O buffers

To address this we periodically monitor the memory of the worker process every
200 ms. If the system reported memory use is above 70% of the target memory
usage then the worker will start dumping unused data to disk, even if internal
``sizeof`` recording hasn't yet reached the normal 60% limit.


Halt worker threads
~~~~~~~~~~~~~~~~~~~

At 80% load, the worker's thread pool will stop starting computation on
additional tasks in the worker's queue. This gives time for the write-to-disk
functionality to take effect even in the face of rapidly accumulating data.
Currently executing tasks continue to run.
To address this, we monitor the :ref:`process memory <memtypes>` of the worker every
200 ms. If the memory use reported by the OS rises above 70% of the memory limit
(*spill threshold*), then the worker will start dumping unused data to disk, even if
internal ``sizeof`` accounting hasn't yet reached the normal 60% target threshold. This
more aggressive spilling continues until process memory falls back below the 60%
threshold.
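
Expressed as a sketch (illustrative only; ``process_memory`` and ``evict_oldest`` are
stand-in names for the worker's internals), the memory monitor's spill loop, with the
hysteresis target introduced by this change, behaves roughly like this:

.. code-block:: python

    def maybe_spill(worker):
        limit = worker.memory_limit
        spill_at = limit * worker.memory_spill_fraction  # e.g. 0.7
        # Hysteresis: once triggered, keep spilling until we drop below the target
        # threshold; if target is disabled, stop at the spill threshold instead.
        stop_at = limit * (
            worker.memory_target_fraction or worker.memory_spill_fraction
        )

        memory = process_memory(worker)  # RSS as reported by the OS
        if memory <= spill_at:
            return
        while memory > stop_at and worker.data.memory:
            evict_oldest(worker)  # move the least recently used key to disk
            memory = process_memory(worker)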

Pause worker
~~~~~~~~~~~~
At 80% :ref:`process memory <memtypes>` load (*pause threshold*), the worker's thread
pool will stop starting computation on additional tasks in the worker's queue. This
gives time for the write-to-disk functionality to take effect even in the face of
rapidly accumulating data. Currently executing tasks continue to run. Additionally,
data transfers to/from other workers are throttled to a bare minimum.
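
For reference, a running task can observe the paused state through the worker API in
the same way the tests in this pull request do (a minimal sketch; polling like this is
really only appropriate in tests):

.. code-block:: python

    from distributed import get_worker
    from distributed.core import Status


    def am_i_paused():
        """Task that reports whether the worker executing it is currently paused"""
        return get_worker().status == Status.paused


    # e.g. client.submit(am_i_paused).result()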

Kill Worker
~~~~~~~~~~~
At 95% :ref:`process memory <memtypes>` load (*terminate threshold*), a worker's nanny
process will terminate it. Tasks will be cancelled mid-execution and rescheduled
elsewhere; all unique data on the worker will be lost and will need to be recomputed.
This is to avoid having the worker job terminated by an external watchdog (like
Kubernetes, YARN, Mesos, SGE, etc.). After termination, the nanny will restart the
worker in a fresh state.

Thresholds configuration
~~~~~~~~~~~~~~~~~~~~~~~~
These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
file:

At 95% memory load, a worker's nanny process will terminate it. This is to
avoid having our worker job being terminated by an external job scheduler (like
YARN, Mesos, SGE, etc..). After termination, the nanny will restart the worker
in a fresh state.
.. code-block:: yaml

distributed:
worker:
# Fractions of worker process memory at which we take action to avoid memory
# blowup. Set any of the values to False to turn off the behavior entirely.
memory:
target: 0.60 # fraction of managed memory where we start spilling to disk
spill: 0.70 # fraction of process memory where we start spilling to disk
pause: 0.80 # fraction of process memory at which we pause worker threads
terminate: 0.95 # fraction of process memory at which we terminate the worker
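
The same thresholds can also be set programmatically before the workers start, for
example through ``dask.config.set`` (a sketch; the keys mirror the YAML above):

.. code-block:: python

    import dask
    from dask.distributed import LocalCluster

    # Must be set before the cluster / workers are created
    dask.config.set(
        {
            "distributed.worker.memory.target": 0.60,
            "distributed.worker.memory.spill": 0.70,
            "distributed.worker.memory.pause": 0.80,
            "distributed.worker.memory.terminate": 0.95,
        }
    )

    cluster = LocalCluster(n_workers=2, memory_limit="4 GiB")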

Using the dashboard to monitor memory usage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The dashboard (typically available on port 8787) shows a summary of the overall memory
usage on the cluster, as well as the individual usage on each worker. It provides
different memory readings:

.. _memtypes:

process
Overall memory used by the worker process (RSS), as measured by the OS

managed
This is the sum of the ``sizeof`` of all dask data stored on the worker, excluding
Sum of the ``sizeof`` of all Dask data stored on the worker, excluding
spilled data.

unmanaged
This is the memory usage that dask is not directly aware of. It is estimated by
subtracting managed memory from the total process memory and typically includes:
Memory usage that Dask is not directly aware of. It is estimated by subtracting
managed memory from the total process memory and typically includes:

- The Python interpreter code, loaded modules, and global variables
- Memory temporarily used by running tasks
Expand All @@ -280,7 +269,7 @@ unmanaged recent

By default, :meth:`distributed.Client.rebalance` and
:meth:`distributed.scheduler.Scheduler.rebalance` ignore unmanaged recent memory.
This behaviour can also be tweaked using the dask config - see the methods'
This behaviour can also be tweaked using the Dask config - see the methods'
documentation.

spilled
@@ -399,7 +388,7 @@ documentation.

Ignore process memory
~~~~~~~~~~~~~~~~~~~~~
If all else fails, you may want to stop dask from using memory metrics from the OS (RSS)
If all else fails, you may want to stop Dask from using memory metrics from the OS (RSS)
in its decision-making:

.. code-block:: yaml