From 2dece13d3815815eca8c6710e7eec8279ebf2a57 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 4 Dec 2020 22:16:14 -0800 Subject: [PATCH 1/2] Use `0` for `memory_limit` instead of `None` We need an integer to pass to `WorkerState` so set this to `0` instead of `None`. --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cf82573d2c5..2d84c6ce2cf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1888,7 +1888,7 @@ async def add_worker( now=None, resources=None, host_info=None, - memory_limit=None, + memory_limit=0, metrics=None, pid=0, services=None, From db8aef54e27be8d937f59b5a19daf1287b25085a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 4 Dec 2020 22:23:48 -0800 Subject: [PATCH 2/2] Use `list` comprehension to bind `WorkerState` As Cython may not type things based on outer scope variables when using a generator, convert the generator to a `list` comprehension, which does respect the type of external variables used within the loop. This should fix some attribute access errors seen here. --- distributed/scheduler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2d84c6ce2cf..e9a7846f9e0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3555,10 +3555,10 @@ async def replicate( del_worker_tasks[ws].add(ts) await asyncio.gather( - *( + *[ self._delete_worker_data(ws._address, [t.key for t in tasks]) for ws, tasks in del_worker_tasks.items() - ) + ] ) # Copy not-yet-filled data @@ -5536,8 +5536,10 @@ def profile_to_figure(state): tasks_timings=tasks_timings, address=self.address, nworkers=len(self.workers), - threads=sum(ws._nthreads for ws in self.workers.values()), - memory=format_bytes(sum(ws._memory_limit for ws in self.workers.values())), + threads=sum([ws._nthreads for ws in self.workers.values()]), + memory=format_bytes( + sum([ws._memory_limit for ws in self.workers.values()]) + ), code=code, dask_version=dask.__version__, distributed_version=distributed.__version__,