Skip to content

Conversation

@mrocklin
Copy link
Member

This changes behavior for WorkerPlugins to overwrite the previous plugin
if there is a colliding name.

  • Closes #xxxx
  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

This changes behavior for WorkerPlugins to overwrite the previous plugin
if there is a colliding name.
@mrocklin
Copy link
Member Author

I'm going to go ahead and merge this. I'm happy to revisit it at any point in the future if folks have concerns. This unifies behavior across scheduler/worker/nanny plugins with respect to overwriting by name.

@mrocklin mrocklin merged commit 6dfe5fa into dask:main Aug 22, 2021
@mrocklin mrocklin deleted the worker-plugin-overwrite branch August 22, 2021 19:27
@jrbourbeau
Copy link
Member

Thanks @mrocklin. Just for reference, it was already the plan the plan to update this behavior and was implemented over in #5142

@marouane-miftah
Copy link

marouane-miftah commented Nov 19, 2021

I'm going to go ahead and merge this. I'm happy to revisit it at any point in the future if folks have concerns. This unifies behavior across scheduler/worker/nanny plugins with respect to overwriting by name.

@mrocklin, @jrbourbeau, @crusaderky I have code that needs the plugin to be registered exactly once per worker and any subsequent registration will make the system work in unexpected ways. For example the code below needs to create the asyncio loop only the first time the registration happens.

import asyncio

loop: Optional[asyncio.AbstractEventLoop] = None

class InitWorker(distributed.WorkerPlugin):
    # Prevent the plugin from being registered twice on the same worker
    name = "Init"

    def setup(self, worker) -> None:
        assert not loop
        new_loop = asyncio.new_event_loop()

        future = asyncio.run_coroutine_threadsafe(init_app(), new_loop)
        future.result()
        assert loop is new_loop

    def teardown(self, worker) -> None:
        assert loop
        loop.call_soon_threadsafe(loop.stop)

I tried to change my setup method to something like this, but this does not work

    def setup(self, worker) -> None:
       # plugin has already been registered by worker
       if self.name in worker.plugins:
             return

        assert not loop
        new_loop = asyncio.new_event_loop()

        future = asyncio.run_coroutine_threadsafe(init_app(), new_loop)
        future.result()
        assert loop is new_loop

Is there a work around to get the previous behaviour?

Thank you

Also as a side note, some of the pydoc may not be uptodate

If the plugin has a ``name`` attribute, or if the ``name=`` keyword is
used then that will control idempotency. If a plugin with that name has
already been registered then any future plugins will not run.

@crusaderky
Copy link
Collaborator

Is there a work around to get the previous behaviour?

You could subclass Worker and override Worker.register_worker_plugin. Then you would start workers with
dask-worker --worker-class your.module.CustomWorker

P.S. adding a comment to a closed PR is generally a bad idea in terms of visibility.
Could you open a new issue and xref this PR if you wish to argue for your use case to be supported upstream?

@crusaderky
Copy link
Collaborator

alternatively, you could do

class InitWorker(distributed.WorkerPlugin):
    ...
    async def register(self, dask_scheduler):
        if self.name not in dask_scheduler.worker_plugins:
            return await dask_scheduler.register_worker_plugin(plugin=self, name=self.name)

client.run_on_scheduler(InitWorker().register)

@marouane-miftah
Copy link

Thank you @crusaderky,

You could subclass Worker and override Worker.register_worker_plugin. Then you would start workers with dask-worker --worker-class your.module.CustomWorker

did you mean subclass Worker.plugin_add?

P.S. adding a comment to a closed PR is generally a bad idea in terms of visibility. Could you open a new issue and xref this PR if you wish to argue for your use case to be supported upstream?

I have opened a new issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants