diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index fda2fe63fb3..92031247c26 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -128,10 +128,9 @@ def map(self, fn, *iterables, **kwargs): fs = self._client.map(fn, *iterables, **self._kwargs) - if isinstance(fs, list): - # Below iterator relies on this being a generator to cancel - # remaining futures - fs = (val for val in fs) + # Below iterator relies on fs being an iterator itself, and not just an iterable + # (such as a list), in order to cancel remaining futures + fs = iter(fs) # Yield must be hidden in closure so that the tasks are submitted # before the first iterator value is required. @@ -148,8 +147,7 @@ def result_iterator(): yield future.result() finally: remaining = list(fs) - for future in remaining: - self._futures.add(future) + self._futures.update(remaining) self._client.cancel(remaining) return result_iterator()