Skip to content

Conversation

@quasiben
Copy link
Member

  • Closes #xxxx
  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

Fixes an issue first identified by @crusaderky #4853 (comment)

cc @jrbourbeau

@quasiben
Copy link
Member Author

There are a number of places inside of scheduler.py where we rely on dictionary methods which didn't play nicely. For example:

recipient.has_what.add(ts)

In b9c42fc, I thought it best just to handle the return explicitly for async clients. So we could keep the fancy repr for ipython while we sort through a nice model for leveraging HTML in the repr

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix @quasiben

await s.close()


@gen_cluster(client=False, timeout=None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why timeout=None is needed here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed -- I'll take it out


@gen_cluster(client=False, timeout=None)
async def test_async_whowhat(s, a, b):
c = await Client(s.address, asynchronous=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set client=True in gen_cluster? That should return an asynchronous Client. It will also automatically be closed at the end, so we can avoid an explicit .close() call

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines 3243 to 3248
if self.asynchronous:
return self.sync(self.scheduler.has_what, workers=workers, **kwargs)
else:
return HasWhat(
self.sync(self.scheduler.has_what, workers=workers, **kwargs)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here

@jrbourbeau
Copy link
Member

jrbourbeau commented May 27, 2021

There are a number of places inside of scheduler.py where we rely on dictionary methods which didn't play nicely

FWIW I think that's because has_what is a set on the scheduler

_has_what: set

Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@jakirkham
Copy link
Member

Should it be a dict or a set?

Or is it both? If the latter, maybe it should be object

It's worth noting when variables are too flexible it really makes it hard to Cythonize things efficiently

@jrbourbeau
Copy link
Member

jrbourbeau commented May 27, 2021

Things like TaskState._who_has are sets

_who_has: set

However, client-side methods like Client.who_has result in some additional formatting code being called

def get_who_has(self, comm=None, keys=None):
parent: SchedulerState = cast(SchedulerState, self)
ws: WorkerState
ts: TaskState
if keys is not None:
return {
k: [ws._address for ws in parent._tasks[k].who_has]
if k in parent._tasks
else []
for k in keys
}
else:
return {
key: [ws._address for ws in ts._who_has]
for key, ts in parent._tasks.items()
}

where the thing that is actually returned to the (client-side) user is a dict. Perhaps we should have the dict returned by Scheduler.get_who_has be the place we add the WhoHas dict subclass? What do you think @jakirkham @quasiben?

@quasiben
Copy link
Member Author

where the thing that is actually returned to the (client-side) user is a dict. Perhaps we should have the dict returned by Scheduler.get_who_has be the place we add the WhoHas dict subclass? What do you think @jakirkham @quasiben?

Admittedly, I didn't know the get_who_has method existed. I'll test that out

@jakirkham
Copy link
Member

Appears to be working. @jrbourbeau do you have any more thoughts on this? 🙂

@jrbourbeau
Copy link
Member

Hmm, unfortunately it looks like the HasWhat / WhoHas subclasses aren't surviving the trip from the scheduler to the client. Instead we just get normal dicts at the client. For example, this change

diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 93aae7aa..c4b0c96f 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -58,6 +58,7 @@ from distributed.comm import CommClosedError
 from distributed.compatibility import MACOS, WINDOWS
 from distributed.core import Status
 from distributed.metrics import time
+from distributed.objects import HasWhat, WhoHas
 from distributed.scheduler import (
     COMPILED,
     CollectTaskMetaDataPlugin,
@@ -3620,6 +3621,9 @@ async def test_async_whowhat(c, s, a, b):
     who_has = await c.who_has()
     has_what = await c.has_what()

+    assert type(who_has) is WhoHas
+    assert type(has_what) is HasWhat
+
     assert who_has == {x.key: (a.address,)}
     assert has_what == {a.address: (x.key,), b.address: ()}

results in test_async_whowhat failing:

    @gen_cluster(client=True)
    async def test_async_whowhat(c, s, a, b):
        [x] = await c.scatter([1], workers=a.address)

        who_has = await c.who_has()
        has_what = await c.has_what()

>       assert type(who_has) is WhoHas
E       AssertionError: assert <class 'dict'> is WhoHas
E        +  where <class 'dict'> = type({'int-c0a8a20f903a4915b94db8de3ea63195': ('tcp://127.0.0.1:59649',)})

distributed/tests/test_client.py:3624: AssertionError

I'm not sure why the HasWhat / WhoHas subclasses aren't making it to the client process right now. If we don't have time to dig in further, I think we can revert ac50228 and merge to fix things for the upcoming release.

@quasiben
Copy link
Member Author

reverted but also added a test based on your last comment

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @quasiben, the changes here look good to me. Let's merge after CI finishes.

I'm not sure why the HasWhat / WhoHas subclasses aren't making it to the client process right now

FWIW I think this has to do with msgpack's handling of dict subclasses

In [1]: from distributed.objects import HasWhat

In [2]: x = HasWhat()

In [3]: type(x)
Out[3]: distributed.objects.HasWhat

In [4]: from distributed.protocol.serialize import msgpack_dumps, msgpack_loads

In [5]: x_roundtrip = msgpack_loads(*msgpack_dumps(x))

In [6]: type(x_roundtrip)
Out[6]: dict

@jakirkham @madsbk may find the above snippet interesting

@jakirkham
Copy link
Member

jakirkham commented May 27, 2021

Yeah was wondering if we needed to do something special with serialization (like defining how to serialize this object). Though still would have expected the Client side code to fix this anyways

That said, this seems like we are getting into the weeds right before a release. So probably best to punt and revisit after

@quasiben
Copy link
Member Author

CI finished green. Thanks @jrbourbeau and @jakirkham

@quasiben quasiben merged commit 4f83686 into dask:main May 27, 2021
@quasiben quasiben deleted the fix-whowhat-repr branch May 27, 2021 23:21
douglasdavis pushed a commit to douglasdavis/distributed that referenced this pull request May 28, 2021
* move object WhoHas/HasWhat to scheduler

* remove whohas from scheduler -- bail on fancy repr for async

* lint

* Update distributed/client.py

Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>

* code clean up and cleaner tests

* use Whohas/HasWht in get_whohas methods

* revert objects usage in scheduler and add assertive test for objects when using sync client

Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants