diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 927947c6208..4fb8d8b3bbf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5678,7 +5678,9 @@ def remove_plugin( f"Could not find plugin {name!r} among the current scheduler plugins" ) - async def register_scheduler_plugin(self, comm=None, plugin=None, name=None): + async def register_scheduler_plugin( + self, comm=None, plugin=None, name=None, idempotent=None + ): """Register a plugin on the scheduler.""" if not dask.config.get("distributed.scheduler.pickle"): raise ValueError( @@ -5689,12 +5691,18 @@ async def register_scheduler_plugin(self, comm=None, plugin=None, name=None): ) plugin = loads(plugin) + if name is None: + name = _get_plugin_name(plugin) + + if name in self.plugins and idempotent: + return + if hasattr(plugin, "start"): result = plugin.start(self) if inspect.isawaitable(result): await result - self.add_plugin(plugin, name=name) + self.add_plugin(plugin, name=name, idempotent=idempotent) def worker_send(self, worker, msg): """Send message to worker diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f83146cde07..c584377de74 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3271,3 +3271,49 @@ async def test__to_dict(c, s, a, b): "events", ] assert dct["tasks"][futs[0].key] + + +@gen_cluster(nthreads=[]) +async def test_idempotent_plugins(s): + + from distributed.diagnostics.plugin import SchedulerPlugin + + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_non_idempotent_plugins(s): + + from distributed.diagnostics.plugin import SchedulerPlugin + + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second"