9 changes: 6 additions & 3 deletions distributed/semaphore.py
@@ -64,15 +64,17 @@ def __init__(self, scheduler):
             dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
             default="s",
         )
-        self._pc_lease_timeout = PeriodicCallback(
+        self.scheduler.periodic_callbacks[
+            "semaphore-lease-timeout"
+        ] = pc = PeriodicCallback(
             self._check_lease_timeout, validation_callback_time * 1000
         )
-        self._pc_lease_timeout.start()
+        pc.start()
         self.lease_timeout = parse_timedelta(
             dask.config.get("distributed.scheduler.locks.lease-timeout"), default="s"
         )
 
-    async def get_value(self, name=None):
+    def get_value(self, name=None):
         return len(self.leases[name])
 
     # `comm` here is required by the handler interface
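
The first hunk re-homes the lease-validation PeriodicCallback: instead of living on a private extension attribute, it is registered in the scheduler's periodic_callbacks mapping, so the scheduler's own shutdown machinery can stop it. get_value also drops its async qualifier, since it only reads a dictionary. A minimal sketch of the registration pattern, assuming Tornado's PeriodicCallback (interval given in milliseconds) and a hypothetical stand-in scheduler:

    from tornado.ioloop import PeriodicCallback

    class MiniScheduler:
        """Hypothetical stand-in for distributed.Scheduler."""

        def __init__(self):
            self.periodic_callbacks = {}

        def stop(self):
            # Every callback registered in the mapping can be stopped
            # uniformly on shutdown; nothing is left ticking on the loop.
            for pc in self.periodic_callbacks.values():
                pc.stop()

    scheduler = MiniScheduler()

    def check_lease_timeout():
        ...  # placeholder for the extension's _check_lease_timeout

    # Register under a well-known key, keep a handle, start ticking.
    scheduler.periodic_callbacks["semaphore-lease-timeout"] = pc = PeriodicCallback(
        check_lease_timeout, 100  # fire every 100 ms
    )
    pc.start()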
@@ -527,6 +529,7 @@ def __setstate__(self, state):
         )
 
     def close(self):
+        self.refresh_callback.stop()
         return self.sync(self.scheduler.semaphore_close, name=self.name)
 
     def __del__(self):
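
The close() hunk stops the client-side lease-refresh callback before asking the scheduler to drop the semaphore's state, so a closed semaphore no longer tries to refresh leases the scheduler has already forgotten. User-facing behaviour is unchanged; a typical lifecycle, sketched under the assumption that a default local Client is acceptable:

    from distributed import Client, Semaphore

    client = Client()
    sem = Semaphore(max_leases=2, name="database")

    with sem:  # acquire on entry, release on exit
        ...  # talk to the protected resource here

    sem.close()  # stops the refresh callback, then clears scheduler state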
56 changes: 15 additions & 41 deletions distributed/tests/test_semaphore.py
@@ -129,11 +129,11 @@ async def test_async_ctx(s, a, b):
     assert await sem.acquire()
 
 
-@pytest.mark.slow
 def test_worker_dies(loop):
     with cluster(
         config={
-            "distributed.scheduler.locks.lease-timeout": "0.1s",
+            "distributed.scheduler.locks.lease-timeout": "50ms",
+            "distributed.scheduler.locks.lease-validation-interval": "10ms",
         }
     ) as (scheduler, workers):
         with Client(scheduler["address"], loop=loop) as client:
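
Shrinking the lease-timeout (how long an unrefreshed lease survives) to 50 ms and setting the new validation interval (how often the scheduler scans for stale leases) to 10 ms makes the dead-worker scenario resolve quickly, which is what lets the test shed its @pytest.mark.slow marker. Both keys go through ordinary dask configuration; a sketch with test-oriented values:

    import dask

    # Aggressive values so a dead worker's lease is reclaimed almost
    # immediately; production defaults are far more forgiving.
    with dask.config.set(
        {
            "distributed.scheduler.locks.lease-timeout": "50ms",
            "distributed.scheduler.locks.lease-validation-interval": "10ms",
        }
    ):
        ...  # start the cluster and client inside this block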
@@ -191,9 +191,8 @@ def f(x, release=True):
     assert result.count(False) == 9
 
 
-@pytest.mark.slow
-@gen_cluster(client=True, timeout=120)
-async def test_close_async(c, s, a, b):
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_close_async(c, s, a):
     sem = await Semaphore(name="test")
 
     assert await sem.acquire()
@@ -212,7 +211,7 @@ async def test_close_async(c, s, a, b):
     assert await sem2.acquire()
 
     def f(sem_):
-        return sem_.acquire()
+        sem_.acquire(timeout="0.5s")
 
     semaphore_object = s.extensions["semaphores"]
     fire_and_forget(c.submit(f, sem_=sem2))
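
Because the helper is launched via fire_and_forget, nothing ever gathers its result; giving acquire a timeout keeps the submitted task from blocking forever once the semaphore is closed out from under it. A condensed sketch of the pattern (the client setup and names are illustrative):

    from distributed import Client, Semaphore, fire_and_forget

    client = Client()
    sem = Semaphore(max_leases=1, name="test")

    def try_acquire(sem_):
        # Returns False instead of hanging if no lease arrives in time,
        # e.g. because the semaphore was closed in the meantime.
        return sem_.acquire(timeout="0.5s")

    # Keep the task alive without ever holding its future.
    fire_and_forget(client.submit(try_acquire, sem_=sem))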
@@ -517,7 +516,7 @@ def access_limited(val, sem):
             assert len(protected_resource) == 0
             protected_resource.append(val)
             # Interact with the DB
-            time.sleep(0.2)
+            time.sleep(0.01)
             protected_resource.remove(val)
 
     client.gather(client.map(access_limited, range(10), sem=sem))
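
This hunk only trims the simulated database call from 200 ms to 10 ms to speed up the suite; the surrounding test is the canonical use of a Semaphore to cap cluster-wide concurrency on a shared resource. A condensed sketch of that pattern, with the resource interaction reduced to a sleep:

    import time
    from distributed import Client, Semaphore

    client = Client()
    sem = Semaphore(max_leases=3, name="database")

    def access_limited(val, sem):
        with sem:  # at most max_leases tasks inside this block at once
            time.sleep(0.01)  # stand-in for the real resource access
            return val

    # The semaphore throttles the mapped tasks across the whole cluster.
    results = client.gather(client.map(access_limited, range(10), sem=sem))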
@@ -556,50 +555,25 @@ async def test_release_retry(c, s, a, b):
         "distributed.scheduler.locks.lease-validation-interval": "100ms",
     },
 )
-async def test_release_failure(c, s, a, b):
+async def test_release_failure(c, s, a, b, caplog):
     """Don't raise even if release fails: lease will be cleaned up by the
     lease-validation after a specified interval anyway (see config parameters used).
     """
 
     with dask.config.set({"distributed.comm.retry.count": 1}):
         pool = await FlakyConnectionPool(failing_connections=5)
 
+        ext = s.extensions["semaphores"]
+        name = "foo"
         semaphore = await Semaphore(
             max_leases=2,
-            name="resource_we_want_to_limit",
+            name=name,
             scheduler_rpc=pool(s.address),
         )
         await semaphore.acquire()
         pool.activate()  # Comm chaos starts
-
-        # Release fails (after a single retry) because of broken connections
-        with captured_logger(
-            "distributed.semaphore", level=logging.ERROR
-        ) as semaphore_log:
-            with captured_logger("distributed.utils_comm") as retry_log:
-                assert await semaphore.release() is False
-
-        with captured_logger(
-            "distributed.semaphore", level=logging.DEBUG
-        ) as semaphore_cleanup_log:
-            pool.deactivate()  # comm chaos stops
-            assert await semaphore.get_value() == 1  # lease is still registered
-            await asyncio.sleep(0.2)  # Wait for lease to be cleaned up
-
-        # Check release was retried
-        retry_log = retry_log.getvalue().split("\n")[0]
-        assert retry_log.startswith(
-            "Retrying semaphore release:"
-        ) and retry_log.endswith("after exception in attempt 0/1: ")
-        # Check release failed
-        semaphore_log = semaphore_log.getvalue().split("\n")[0]
-        assert semaphore_log.startswith(
-            "Release failed for id="
-        ) and semaphore_log.endswith("Cluster network might be unstable?")
-
-        # Check lease has timed out
-        assert any(
-            log.startswith("Lease") and "timed out after" in log
-            for log in semaphore_cleanup_log.getvalue().split("\n")
-        )
-        assert await semaphore.get_value() == 0
+        assert await semaphore.release() is False
+        pool.deactivate()  # comm chaos stops
+        assert ext.get_value(name) == 1  # lease is still registered
+        while not (await semaphore.get_value() == 0):
+            await asyncio.sleep(0.01)
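
The rewritten test no longer scrapes three loggers and sleeps a fixed 200 ms; it asserts on observable state instead: right after the failed release the scheduler still counts one lease, and the loop polls until lease validation reclaims it. The same wait-for-condition shape is broadly useful in async tests; a minimal sketch:

    import asyncio

    async def wait_until_released(semaphore, poll_interval=0.01):
        # Poll the scheduler-side lease count until the lease-validation
        # callback has cleaned up the orphaned lease.
        while await semaphore.get_value() != 0:
            await asyncio.sleep(poll_interval)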