Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,33 +1180,39 @@ async def test_statistical_profiling_2(c, s, a, b):


@gen_cluster(
nthreads=[("127.0.0.1", 1)],
nthreads=[("", 1)],
client=True,
worker_kwargs={"memory_monitor_interval": 10},
config={
"distributed.worker.memory.target": False,
"distributed.worker.memory.spill": 0.7,
},
Copy link
Collaborator Author

@crusaderky crusaderky Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No change from the default for spill, but I like to spell out the relevant settings explicitly - also in case the defaults change in the future.

worker_kwargs={"memory_monitor_interval": "10ms"},
)
async def test_robust_to_bad_sizeof_estimates(c, s, a):
np = pytest.importorskip("numpy")
memory = psutil.Process().memory_info().rss
"""Test that the spill threshold uses the process memory and not the managed memory
reported by sizeof(), which may be inaccurate
"""
memory = s.workers[a.address].memory.process
# Reach 'spill' threshold after 400MB of managed data
a.memory_limit = memory / 0.7 + 400e6
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that, today, this works only by luck for the purpose of the spill threshold, and it does NOT work for the target threshold. See #5367


class BadAccounting:
def __init__(self, data):
self.data = data
"""100 MB process memory, 10 bytes reported managed memory"""

def __init__(self, *args):
self.data = "x" * int(100e6)

def __sizeof__(self):
return 10

def f(n):
x = np.ones(int(n), dtype="u1")
result = BadAccounting(x)
return result
def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return BadAccounting, ()

futures = c.map(f, [100e6] * 8, pure=False)
futures = c.map(BadAccounting, range(8))

start = time()
while not a.data.disk:
await asyncio.sleep(0.1)
assert time() < start + 5
await asyncio.sleep(0.01)


@pytest.mark.slow
Expand Down