From 36aa7f38dcb3b26ca18761db0bff1d5b3c650ee4 Mon Sep 17 00:00:00 2001 From: Alex Ford Date: Wed, 11 May 2022 13:32:24 -0700 Subject: [PATCH] Add idempotent to register_scheduler_plugin client Updates client-side interface of register_scheduler_plugin to pass through idempotent flag. Extending https://github.com/dask/distributed/pull/5545 Finalizing removal of kwargs support deprecated in https://github.com/dask/distributed/pull/5699 and removed in https://github.com/dask/distributed/pull/6144 --- distributed/client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 240695cc73c..175f0fc4f41 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4418,16 +4418,14 @@ async def _get_task_stream( else: return msgs - async def _register_scheduler_plugin(self, plugin, name, **kwargs): - if isinstance(plugin, type): - plugin = plugin(**kwargs) - + async def _register_scheduler_plugin(self, plugin, name, idempotent=False): return await self.scheduler.register_scheduler_plugin( plugin=dumps(plugin, protocol=4), name=name, + idempotent=idempotent, ) - def register_scheduler_plugin(self, plugin, name=None): + def register_scheduler_plugin(self, plugin, name=None, idempotent=False): """Register a scheduler plugin. See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins @@ -4439,6 +4437,8 @@ def register_scheduler_plugin(self, plugin, name=None): name : str Name for the plugin; if None, a name is taken from the plugin instance or automatically generated if not present. + idempotent : bool + Do not re-register if a plugin of the given name already exists. """ if name is None: name = _get_plugin_name(plugin) @@ -4447,6 +4447,7 @@ def register_scheduler_plugin(self, plugin, name=None): self._register_scheduler_plugin, plugin=plugin, name=name, + idempotent=idempotent, ) def register_worker_callbacks(self, setup=None):