From 5b12bb800a8d03263987b0bf58e4b16cfdad7b54 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 Nov 2021 16:17:46 +0000 Subject: [PATCH 1/5] Allow idempotent scheduler plugins to be registered via the comm --- distributed/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 927947c6208..5fdbed8189a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5678,7 +5678,9 @@ def remove_plugin( f"Could not find plugin {name!r} among the current scheduler plugins" ) - async def register_scheduler_plugin(self, comm=None, plugin=None, name=None): + async def register_scheduler_plugin( + self, comm=None, plugin=None, name=None, idempotent=False + ): """Register a plugin on the scheduler.""" if not dask.config.get("distributed.scheduler.pickle"): raise ValueError( @@ -5694,7 +5696,7 @@ async def register_scheduler_plugin(self, comm=None, plugin=None, name=None): if inspect.isawaitable(result): await result - self.add_plugin(plugin, name=name) + self.add_plugin(plugin, name=name, idempotent=idempotent) def worker_send(self, worker, msg): """Send message to worker From 469f506b0a54ca5892e17b07ead1a97026c3c4aa Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 24 Nov 2021 16:20:14 +0000 Subject: [PATCH 2/5] Avoid setting default in two places --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5fdbed8189a..9c2188184d6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5679,7 +5679,7 @@ def remove_plugin( ) async def register_scheduler_plugin( - self, comm=None, plugin=None, name=None, idempotent=False + self, comm=None, plugin=None, name=None, idempotent=None ): """Register a plugin on the scheduler.""" if not dask.config.get("distributed.scheduler.pickle"): From 971172a3bf248ba92a81007b5b7c51aeda58f462 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 30 Nov 2021 09:54:47 +0000 Subject: [PATCH 3/5] Add tests and another idempotent check --- distributed/scheduler.py | 6 ++++ distributed/tests/test_scheduler.py | 51 +++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9c2188184d6..4fb8d8b3bbf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5691,6 +5691,12 @@ async def register_scheduler_plugin( ) plugin = loads(plugin) + if name is None: + name = _get_plugin_name(plugin) + + if name in self.plugins and idempotent: + return + if hasattr(plugin, "start"): result = plugin.start(self) if inspect.isawaitable(result): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f83146cde07..38bf20df08e 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3271,3 +3271,54 @@ async def test__to_dict(c, s, a, b): "events", ] assert dct["tasks"][futs[0].key] + + +@gen_cluster(nthreads=[]) +async def test_idempotent_plugins(s): + + from distributed.diagnostics.plugin import SchedulerPlugin + + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.status = None + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_non_idempotent_plugins(s): + + from distributed.diagnostics.plugin import SchedulerPlugin + + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.status = None + + def start(self, scheduler): + pass + + first = NonIdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) + assert "idempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "second" From 34ea895c00781dae967051080665c0cbb99a7f57 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 30 Nov 2021 09:57:03 +0000 Subject: [PATCH 4/5] Correct plugin name --- distributed/tests/test_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 38bf20df08e..2f538766e6a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3307,7 +3307,7 @@ async def test_non_idempotent_plugins(s): class NonIdempotentPlugin(SchedulerPlugin): def __init__(self, instance=None): - self.name = "idempotentplugin" + self.name = "nonidempotentplugin" self.instance = instance self.status = None @@ -3316,9 +3316,9 @@ def start(self, scheduler): first = NonIdempotentPlugin(instance="first") await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) - assert "idempotentplugin" in s.plugins + assert "nonidempotentplugin" in s.plugins second = NonIdempotentPlugin(instance="second") await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) - assert "idempotentplugin" in s.plugins - assert s.plugins["idempotentplugin"].instance == "second" + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" From 4361774c4b61df2ecd9f233510847c41bb8ae69f Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 30 Nov 2021 09:57:38 +0000 Subject: [PATCH 5/5] Remove unnecessary code --- distributed/tests/test_scheduler.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 2f538766e6a..c584377de74 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3282,7 +3282,6 @@ class IdempotentPlugin(SchedulerPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" self.instance = instance - self.status = None def start(self, scheduler): if self.instance != "first": @@ -3309,10 +3308,6 @@ class NonIdempotentPlugin(SchedulerPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" self.instance = instance - self.status = None - - def start(self, scheduler): - pass first = NonIdempotentPlugin(instance="first") await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False)