Conversation

@martindurant
Member

Fixes #4224

@martindurant
Member Author

I tested manually with the following code, to show that blocking methods, async methods, and properties all work:

import dask.distributed
client = dask.distributed.Client(n_workers=1)

class Minimal:
    def __init__(self):
        self.value = 0

    def inc(self):
        self.value += 1
        return self.value

    async def ainc(self):
        self.value += 1
        return self.value

    @property
    def val(self):
        return self.value

class UsesMinimal:

    def do_inc(self, ac, N):
        return ac.inc().result()

    async def ado_inc(self, ac, N):
        return await ac.ainc()

    def do_val(self, ac):
        return ac.val

ac = client.submit(Minimal, actor=True).result()
ac2 = client.submit(UsesMinimal, actor=True, workers=[ac._address]).result()

@mrocklin
Member

mrocklin commented Nov 7, 2020

Cool. I'm glad to see this come together so fast. Do you think it would be doable to include your manual test as a pytest test?

@martindurant
Member Author

martindurant commented Nov 7, 2020 via email

attr = getattr(actor, key)

if iscoroutinefunction(attr):
    return lambda *args, **kwargs: attr(*args, **kwargs)
Member

I'm confused by this. It seems like a no-op? Would return attr be equivalent?

Member Author

That seems ... likely :). It must have been a little more complex at some point while I was working through this by trial and error.
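To make the question above concrete, here is a minimal sketch outside of distributed. The `Counter` class is a hypothetical stand-in for an actor, not code from this PR. It suggests that calling the lambda wrapper and calling `attr` directly behave the same, with one difference I'm aware of: the lambda is no longer reported as a coroutine function by `inspect.iscoroutinefunction`.

```python
import asyncio
from inspect import iscoroutinefunction


class Counter:
    """Hypothetical stand-in for an actor class; not part of distributed."""

    def __init__(self):
        self.value = 0

    async def ainc(self):
        self.value += 1
        return self.value


actor = Counter()
attr = getattr(actor, "ainc")

# The wrapper from the diff: it forwards every argument unchanged.
wrapped = lambda *args, **kwargs: attr(*args, **kwargs)

# Calling either one returns a coroutine that resolves to the method's result.
direct_result = asyncio.run(attr())
wrapped_result = asyncio.run(wrapped())

# Introspection differs: the bound async method is a coroutine function,
# while the lambda is an ordinary function that happens to return a coroutine.
attr_is_coro_fn = iscoroutinefunction(attr)
wrapped_is_coro_fn = iscoroutinefunction(wrapped)
```

So `return attr` would only change behaviour for code that later inspects the returned callable rather than just calling it.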

@mrocklin
Member

mrocklin commented Nov 7, 2020

Yes, of course. I wasn't certain whether what I wrote made sense or not.

I don't see anything that rings any warning bells, but all of my experience here is somewhat dated. And hey, "if it passes tests ... " :)

@martindurant
Member Author

This took a little more work, because the new code path I made only applies to workers calling workers, not normal submitted tasks calling workers (because they go via the scheduler and get executed in the normal pool).

@martindurant martindurant marked this pull request as ready for review November 9, 2020 14:47
@martindurant
Member Author

This passed on Travis at https://travis-ci.com/github/martindurant/distributed/builds/199375854 but the status is not updating here.

@martindurant
Member Author

The following also fails. In fact, I think any exception in an actor might kill the worker. I'll put the fix in a new PR as soon as I have it.

import pytest
from distributed import Client
from distributed.utils_test import cluster


def test_exception():
    class MyException(Exception):
        pass

    class Broken:
        def method(self):
            raise MyException

    with cluster(nworkers=2) as (cl, w):
        client = Client(cl["address"])
        ac = client.submit(Broken, actor=True).result()
        acfut = ac.method()
        with pytest.raises(MyException):
            acfut.result()

if (
    self._worker
    and self._worker.address == self._address
    and threading.current_thread().name.startswith("Dask-Actor-Threads")
Member

Typically we use threadlocals for this. You might want to look into the use of the thread_state variable in worker.py
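As a rough illustration of the suggestion, the sketch below compares the two detection styles using only the standard library. The `thread_state` object here is a local `threading.local()` standing in for the one in worker.py, and `mark_actor_thread` and the two `in_actor_thread_*` helpers are hypothetical names, not distributed APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the thread_state object in worker.py.
thread_state = threading.local()


def mark_actor_thread():
    # Runs once in each pool thread when that thread starts.
    thread_state.actor = True


def in_actor_thread_by_name():
    # The approach in the diff: rely on the executor's thread name prefix.
    return threading.current_thread().name.startswith("Dask-Actor-Threads")


def in_actor_thread_by_state():
    # The thread-local approach: use a default, because threads that were
    # never marked do not have the attribute at all.
    return getattr(thread_state, "actor", False)


with ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix="Dask-Actor-Threads",
    initializer=mark_actor_thread,
) as pool:
    pool_by_name = pool.submit(in_actor_thread_by_name).result()
    pool_by_state = pool.submit(in_actor_thread_by_state).result()

# Outside the pool, neither check reports an actor thread.
main_by_name = in_actor_thread_by_name()
main_by_state = in_actor_thread_by_state()
```

Both checks agree in this sketch; the thread-local version just avoids coupling the logic to how the executor happens to name its threads.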


thread_state.execution_state = execution_state
thread_state.key = key
thread_state.actor = True
Member

Maybe we should set this to False just after calling the function?

Member Author

The intent, I think, is to state "yes, this is an actor-specific thread" rather than "we happen to be running an actor function right now". This use is more similar to checking the thread name.
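The difference between the two readings can be sketched with a single-thread pool. This is an illustration only: `thread_state` is a local stand-in, and `run_sticky` and `run_scoped` are hypothetical helpers, not functions from worker.py. A flag that is set and never cleared marks the thread for the rest of its life, while a flag that is reset after the call only says "an actor function is running right now".

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the thread_state object in worker.py.
thread_state = threading.local()


def read_flag():
    return getattr(thread_state, "actor", False)


def run_sticky(func):
    # Set the flag and leave it: "this is an actor-specific thread".
    thread_state.actor = True
    return func()


def run_scoped(func):
    # Set the flag only for the duration of the call:
    # "we happen to be running an actor function right now".
    thread_state.actor = True
    try:
        return func()
    finally:
        thread_state.actor = False


# One worker thread, so every submission below runs on the same thread.
with ThreadPoolExecutor(max_workers=1) as pool:
    before = pool.submit(read_flag).result()
    during_sticky = pool.submit(run_sticky, read_flag).result()
    after_sticky = pool.submit(read_flag).result()
    during_scoped = pool.submit(run_scoped, read_flag).result()
    after_scoped = pool.submit(read_flag).result()
```

With a dedicated actor thread pool, the sticky form behaves much like the thread-name check it replaces, which matches the intent described above.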

@mrocklin mrocklin merged commit 7d769b8 into dask:master Nov 10, 2020
@martindurant martindurant deleted the direct_actors branch November 10, 2020 19:02

Successfully merging this pull request may close these issues.

Actors on same worker cause deadlock

2 participants