Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
ts._dependents = dependents
ts._dependencies = dependencies
parent._tasks[ts._key] = ts
for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.transition(key, start, finish2, *args, **kwargs)
except Exception:
Expand Down Expand Up @@ -3949,7 +3949,9 @@ def del_scheduler_file():
for preload in self.preloads:
await preload.start()

await asyncio.gather(*[plugin.start(self) for plugin in self.plugins.values()])
await asyncio.gather(
*[plugin.start(self) for plugin in list(self.plugins.values())]
)

self.start_periodic_callbacks()

Expand Down Expand Up @@ -3988,7 +3990,9 @@ async def close(self, comm=None, fast=False, close_workers=False):
else:
break

await asyncio.gather(*[plugin.close() for plugin in self.plugins.values()])
await asyncio.gather(
*[plugin.close() for plugin in list(self.plugins.values())]
)

for pc in self.periodic_callbacks.values():
pc.stop()
Expand Down Expand Up @@ -4239,7 +4243,7 @@ async def add_worker(
if ws._nthreads > len(ws._processing):
parent._idle[ws._address] = ws

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
result = plugin.add_worker(scheduler=self, worker=address)
if inspect.isawaitable(result):
Expand Down Expand Up @@ -4653,7 +4657,7 @@ def update_graph(
recommendations[ts._key] = "erred"
break

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.update_graph(
self,
Expand Down Expand Up @@ -4916,7 +4920,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True):

self.transitions(recommendations)

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
result = plugin.remove_worker(scheduler=self, worker=address)
if inspect.isawaitable(result):
Expand Down Expand Up @@ -5219,7 +5223,7 @@ async def add_client(self, comm, client=None, versions=None):
self.log_event(["all", client], {"action": "add-client", "client": client})
parent._clients[client] = ClientState(client, versions=versions)

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.add_client(scheduler=self, client=client)
except Exception as e:
Expand Down Expand Up @@ -5274,7 +5278,7 @@ def remove_client(self, client=None):
)
del parent._clients[client]

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.remove_client(scheduler=self, client=client)
except Exception as e:
Expand Down Expand Up @@ -5511,8 +5515,8 @@ def remove_plugin(self, plugin=None, name=None):
self.plugins.pop(plugin.name)
else:
# TODO: Remove this block of code once removing plugins by value is disabled
if plugin in self.plugins.values():
if sum(plugin is p for p in self.plugins.values()) > 1:
if plugin in list(self.plugins.values()):
if sum(plugin is p for p in list(self.plugins.values())) > 1:
raise ValueError(
f"Multiple instances of {plugin} were found in the current scheduler "
"plugins, we cannot remove this plugin."
Expand Down Expand Up @@ -5755,7 +5759,7 @@ async def restart(self, client=None, timeout=30):

self.clear_task_state()

for plugin in self.plugins.values():
for plugin in list(self.plugins.values()):
try:
plugin.restart(self)
except Exception as e:
Expand Down Expand Up @@ -7023,7 +7027,7 @@ def start_task_metadata(self, comm=None, name=None):
def stop_task_metadata(self, comm=None, name=None):
plugins = [
p
for p in self.plugins.values()
for p in list(self.plugins.values())
if isinstance(p, CollectTaskMetaDataPlugin) and p.name == name
]
if len(plugins) != 1:
Expand Down