diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 728b4b4c144..465bdc70447 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -42,6 +42,19 @@ async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=No results = dict() all_bad_keys = set() + async def get_data_from_worker_error_busy(address, keys): + resp = await get_data_from_worker( + rpc, + keys, + address, + who=who, + serializers=serializers, + max_connections=False, + ) + if resp.get("state", "") == "busy": + raise OSError(-1, f"Worker {address} is busy") + return resp + while len(results) + len(all_bad_keys) < len(who_has): d = defaultdict(list) rev = dict() @@ -62,14 +75,12 @@ async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=No try: coroutines = { address: asyncio.ensure_future( - get_data_from_worker( - rpc, - keys, + retry_operation( + get_data_from_worker_error_busy, address, - who=who, - serializers=serializers, - max_connections=False, - ) + keys, + operation="Get worker status", + ), ) for address, keys in d.items() }