Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions distributed/tests/test_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,23 @@ async def test_metrics(c, s, a, b):
def test_threadpoolworkers_pick_correct_ioloop(cleanup):
# gh4057

# About picking appropriate values for the various timings
# * Sleep time in `access_limited` impacts test runtime but is arbitrary
# * `lease-timeout` should be smaller than the sleep time. This is what the
# test builds on. assuming the leases cannot be refreshed, e.g. wrong
# event loop picked / PeriodicCallback never scheduled, the semaphore
# would become oversubscribed and the len(protected_ressources) becomes
# non zero. This should also trigger a log message about "unknown leases"
# and fails the test.
# * `lease-validation-interval` interval should be the smallest quantity.
# How often leases are checked for staleness is hard coded atm and a fifth
# of the `lease-timeout`. Accounting for this and some jitter, this should
# be sufficiently small to ensure smooth operation.

with dask.config.set(
{
"distributed.scheduler.locks.lease-validation-interval": 0.01,
"distributed.scheduler.locks.lease-timeout": 0.05,
"distributed.scheduler.locks.lease-timeout": 0.1,
}
):
with Client(processes=False, threads_per_worker=4) as client:
Expand All @@ -526,7 +539,7 @@ def access_limited(val, sem):
assert len(protected_ressource) == 0
protected_ressource.append(val)
# Interact with the DB
time.sleep(0.1)
time.sleep(0.2)
protected_ressource.remove(val)

client.gather(client.map(access_limited, range(10), sem=sem))
Expand Down