
Client/Scheduler gather not robust to busy worker #4698

@fbriol

When I run a lot of tasks on the CNES HPC with a big Dask cluster (512 threads / 128 workers), I sometimes get communication errors between the scheduler and the workers. The error is raised in the module "distributed/utils_comm.py" (in gather_from_workers): the code reads the key 'data' from a worker reply dictionary that does not contain it.

    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in retry_operation
          383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
          384     )
    ----> 385     return await retry(
          386         partial(coro, *args, **kwargs),
          387         count=retry_count,
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in retry
          368                 delay *= 1 + random.random() * jitter_fraction
          369             await asyncio.sleep(delay)
    ----> 370     return await coro()
          371 
          372 
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc
          860             name, comm.name = comm.name, "ConnectionPool." + key
          861             try:
    ----> 862                 result = await send_recv(comm=comm, op=key, **kwargs)
          863             finally:
          864                 self.pool.reuse(self.addr, comm)
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in send_recv
          659         if comm.deserialize:
          660             typ, exc, tb = clean_exception(**response)
    ----> 661             raise exc.with_traceback(tb)
          662         else:
          663             raise Exception(response["text"])
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in handle_comm
          499                             result = asyncio.ensure_future(result)
          500                             self._ongoing_coroutines.add(result)
    ----> 501                             result = await result
          502                     except (CommClosedError, CancelledError) as e:
          503                         if self.status == Status.running:
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/scheduler.py in gather
          5052                 who_has[key] = []
          5053 
    ----> 5054         data, missing_keys, missing_workers = await gather_from_workers(
          5055             who_has, rpc=self.rpc, close=False, serializers=serializers
          5056         )
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in gather_from_workers
          86                     missing_workers.add(worker)
          87                 else:
    ----> 88                     response.update(r["data"])
          89         finally:
          90             for r in rpcs.values():
    
    KeyError: 'data'
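The failing frame can be reduced to a few self-contained lines. This is a simplified, hypothetical model of the end of gather_from_workers. It is reconstructed only from the three lines visible in the last frame of the traceback. The helper name merge_replies, the exception check, and the worker addresses are assumptions for illustration, not the library's code.

```python
# Hypothetical, simplified model of the end of
# distributed.utils_comm.gather_from_workers; not the library's actual code.


def merge_replies(replies):
    """Merge per-worker replies into a single {key: value} dict.

    replies maps a worker address to either an exception (the request
    failed) or a reply dict. As in the traceback, every reply dict is
    assumed to contain a "data" entry.
    """
    response = {}
    missing_workers = set()
    for worker, r in replies.items():
        if isinstance(r, Exception):
            missing_workers.add(worker)
        else:
            response.update(r["data"])  # KeyError when "data" is absent
    return response, missing_workers


# Placeholder addresses; the second reply is a hypothetical reply without "data".
replies = {
    "tcp://10.0.0.1:40000": {"status": "OK", "data": {"x": 1}},
    "tcp://10.0.0.2:40000": {"status": "unexpected"},
}
try:
    merge_replies(replies)
except KeyError as exc:
    print("KeyError:", exc)  # prints: KeyError: 'data'
```

Any reply dict that is neither an exception nor a success payload falls through to the `else` branch. That branch is where the `KeyError: 'data'` above comes from.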

I modified the module to print the content of this dictionary when the error is raised. In this case, the dictionary contains only dict(status='busy'): the worker replied that it was busy instead of sending the data.
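One way a gather loop could tolerate such replies is to treat a busy worker as "ask again later", rather than as a success or a hard failure. Below is a minimal asyncio sketch under stated assumptions:

- gather_tolerant and get_data are hypothetical names.
- get_data(worker, keys) stands in for the worker request and returns either {"status": "busy"} or {"status": "OK", "data": {...}}.
- The retry policy (max_attempts, base_delay, exponential backoff with jitter) is illustrative.

It is not taken from distributed.

```python
import asyncio
import random
from collections import defaultdict


async def gather_tolerant(who_has, get_data, max_attempts=5, base_delay=0.01):
    """Fetch keys from workers, retrying workers that reply {"status": "busy"}.

    who_has: dict mapping key -> list of worker addresses holding it.
    get_data: hypothetical coroutine get_data(worker, keys) -> reply dict.
    Returns (data, missing_keys, missing_workers).
    """
    data = {}
    missing_workers = set()
    pending = set(who_has)
    for attempt in range(max_attempts):
        # For each pending key, ask the first holder not known to have failed.
        to_fetch = defaultdict(list)
        for key in pending:
            holders = [w for w in who_has[key] if w not in missing_workers]
            if holders:
                to_fetch[holders[0]].append(key)
        if not to_fetch:
            break  # every remaining key has only failed holders
        workers = list(to_fetch)
        replies = await asyncio.gather(
            *(get_data(w, to_fetch[w]) for w in workers), return_exceptions=True
        )
        any_busy = False
        for worker, reply in zip(workers, replies):
            if isinstance(reply, BaseException):
                missing_workers.add(worker)  # try other holders next round
            elif reply.get("status") == "busy":
                any_busy = True  # keys stay pending; ask again after a pause
            else:
                data.update(reply["data"])
                pending.difference_update(reply["data"])
        if not pending:
            break
        if any_busy and attempt < max_attempts - 1:
            # Exponential backoff with jitter before polling busy workers again.
            await asyncio.sleep(base_delay * 2**attempt * (1 + random.random()))
    return data, pending, missing_workers


async def demo():
    calls = defaultdict(int)

    async def fake_get_data(worker, keys):
        # Hypothetical worker "w1" is busy for its first two requests.
        calls[worker] += 1
        if worker == "w1" and calls[worker] <= 2:
            return {"status": "busy"}
        return {"status": "OK", "data": {k: k.upper() for k in keys}}

    return await gather_tolerant({"x": ["w1"], "y": ["w2"]}, fake_get_data)


print(asyncio.run(demo()))
```

In the demo, key "y" arrives on the first round. Key "x" is fetched on the third round, once "w1" stops answering busy. A worker that stays busy for all max_attempts rounds ends up with its keys reported as missing rather than raising a KeyError.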

I cannot provide a reproducible example because the software I use is not public, and I have not found a simple snippet that reproduces the problem. I can run further tests if you need more information.

Environment:

  • Dask version: 2021.4.0
  • Python version: 3.8.8
  • Operating System: CentOS Linux release 7.6.1810
  • Install method (conda, pip, source): conda, channel conda-forge
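The environment details above can be collected with the standard library alone. This is a minimal sketch assuming Python 3.8+ (for importlib.metadata) and packages installed with conda or pip, so that their metadata is visible. environment_report is a hypothetical helper name. Note that platform.platform() returns a kernel and libc string, not the "CentOS Linux release" line.

```python
import platform
from importlib import metadata  # available since Python 3.8


def environment_report(packages=("dask", "distributed")):
    """Return the version lines usually requested in a Dask bug report."""
    lines = []
    for name in packages:
        try:
            version = metadata.version(name)
        except metadata.PackageNotFoundError:
            version = "not installed"
        lines.append(f"{name} version: {version}")
    lines.append(f"Python version: {platform.python_version()}")
    lines.append(f"Operating System: {platform.platform()}")
    return "\n".join(lines)


print(environment_report())
```

On a running cluster, distributed's Client.get_versions(check=True) can also compare package versions across the client, the scheduler and the workers. This helps rule out a version mismatch as the cause of unexpected replies.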

Labels: bug (Something is broken)