From f2fd0af0143f71a91dabe466748e172040877259 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 19 Aug 2021 15:54:46 +0100 Subject: [PATCH] Minor polish on cfexecutor --- distributed/cfexecutor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index fda2fe63fb3..92031247c26 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -128,10 +128,9 @@ def map(self, fn, *iterables, **kwargs): fs = self._client.map(fn, *iterables, **self._kwargs) - if isinstance(fs, list): - # Below iterator relies on this being a generator to cancel - # remaining futures - fs = (val for val in fs) + # Below iterator relies on fs being an iterator itself, and not just an iterable + # (such as a list), in order to cancel remaining futures + fs = iter(fs) # Yield must be hidden in closure so that the tasks are submitted # before the first iterator value is required. @@ -148,8 +147,7 @@ def result_iterator(): yield future.result() finally: remaining = list(fs) - for future in remaining: - self._futures.add(future) + self._futures.update(remaining) self._client.cancel(remaining) return result_iterator()