Conversation

@mrocklin (Member)

This is just a proof of concept right now (many things fail), but it shows that we can ship graphs directly from the client to the scheduler without dealing with any of the pack/unpack machinery.

We pickle the entire graph and ship it to the scheduler, where we then do all of the graph manipulation that we used to do on the client.
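
Roughly, the flow looks like this (a minimal sketch with illustrative names; pack_for_scheduler and update_graph_from_bytes are not the PR's actual API, and the sketch assumes the HighLevelGraph is picklable):

    import pickle

    # Client side: pickle the HighLevelGraph untouched, together with the keys
    # the client wants back, and send the bytes to the scheduler.
    def pack_for_scheduler(hlg, keys):
        return pickle.dumps((hlg, list(keys)))

    # Scheduler side: unpickle, then do the materialization and per-layer
    # processing that previously happened on the client.
    def update_graph_from_bytes(payload):
        hlg, keys = pickle.loads(payload)
        dsk = dict(hlg)  # materialize layers into plain tasks
        # ... cull to the requested keys, resolve annotations, build task state ...
        return dsk, keys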

cc @rjzamora @madsbk @ian-r-rose

@mrocklin (Member, Author)

So far I am surprised by how far I was able to get with relatively few lines of code changed.

There is, I'm aware, a whole army of monsters waiting for me. Still, this feels pretty OK so far.

mrocklin added a commit to mrocklin/dask that referenced this pull request Mar 30, 2022
In dask/distributed#6028 we propose shipping the
entire graph to the scheduler with pickle.  This removes the need for
this custom protocol.
@mrocklin (Member, Author)

Dask PR here: #6028

@github-actions bot (Contributor) commented Mar 31, 2022

Unit Test Results

15 files ±0    15 suites ±0    7h 2m 6s ⏱️ −2s
2,730 tests +3:    2,645 ✔️ −1    81 💤 ±0     4 ❌ +4
20,201 runs +14:   19,242 ✔️ +11   937 💤 −19   22 ❌ +22

For more details on these failures, see this check.

Results for commit 517cd25. ± Comparison against base commit 5b6a64a.

♻️ This comment has been updated with latest results.

@mrocklin (Member, Author)

OK, here is a class of tests that we're going to have problems with:

    @gen_cluster(client=True)
    async def test_robust_unserializable(c, s, a, b):
        class Foo:
            def __getstate__(self):
                raise MyException()
    
        with pytest.raises(MyException):
>           future = c.submit(identity, Foo())

We're genuinely opening up the scheduler to more client issues. I think the rewards outweigh the risks, but there's definitely a regression here in terms of hardening.

@ian-r-rose self-requested a review March 31, 2022 16:26
@mrocklin (Member, Author)

OK, I spent a few hours hunting down a few of the test failures. Moving the graph handling onto the scheduler has a few effects on local behavior: some problems are only detected later in the process, which changes when users get negative feedback after doing something wrong. In practice these changes are small and, I think, not very likely to annoy people. They do exist though.

As an example, if someone has a future from another client, or a cancelled future, we don't actually learn about it until after the graph is constructed and things are resolved on the client. Whereas previously they would learn about this failure on a submit call, they now learn about it on a result call. This is a degradation of experience, but not a huge one.
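
A hypothetical illustration of that shift (not code from this PR; it assumes an async client c and a trivial inc function):

    f = c.submit(inc, 1)
    await c.cancel([f])

    # Before: packing the graph on the client noticed the cancelled dependency,
    # so the failure surfaced on the submit call itself.
    g = c.submit(inc, f)

    # After: the graph is only resolved once it reaches the scheduler, so the
    # problem surfaces when the result is requested.
    await g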

There are a few other things I can clean up (there is a long tail here) but in principle I think that the heavy parts of this are done. I would welcome feedback on if this is a direction that we want to pursue.

TODO on this PR

  • Fixup failing tests
  • Merge update_graph/update_graph_hlg
  • Jointly merge in the dask/dask PR

Future Work

  • Merge TaskGroups with Layers
  • See how best we can simplify existing layers logic
  • Remove cull, and replace with some nicer create_graph method

@ian-r-rose (Collaborator) left a comment


Here are some early thoughts; I'm still mulling this over. Correctly forwarding annotations from layers to tasks (especially in the presence of any optimizations) still feels fragile, though the approach you've taken here makes sense.

Pickling certainly seems like a win from the client perspective, as well as for the implementer of HLG layers. It does transfer some pain to the scheduler (which ultimately affects fewer maintainers, I suppose).

self.client_desires_keys(keys=keys, client=client)
return

dsk = dict(graph)
Collaborator

If we're moving more graph logic onto the scheduler, I wonder if it would make sense to also start doing some low-level graph optimization here (while retaining any output keys that we need for the result).

It would probably be tricky to get that right in the next block, however, since key names might be rewritten.
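
For example, something along these lines could run after materialization, preserving the requested output keys (a sketch only, not part of this PR; graph and keys are the values already available in update_graph):

    from dask.optimization import cull, fuse

    # Drop tasks that aren't needed to produce the requested output keys, then
    # fuse linear chains while telling fuse() to keep those keys intact.
    dsk = dict(graph)
    dsk, dependencies = cull(dsk, keys)
    dsk, dependencies = fuse(dsk, keys=keys, dependencies=dependencies)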

Member Author

Ideally, low-level graph optimization mostly goes away, I think. But in principle I agree that it can be done here. We'll want to be mindful of annotations, but we would need to do that anyway.

I'm hopeful though that high level graph optimization removes the need here.

with dask.config.set(optimization__fuse__active=False):
    x = await x.persist()

assert all({"workers": (a.address,)} == ts.annotations for ts in s.tasks.values())
Collaborator

Agree it's better to check a single source of truth here (though if I understand the above correctly, the annotations should still be there).

As far as I can tell, annotations still don't work here, correct? The intent seems reasonable to me, though.
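
For context, a sketch of how the annotation in the assert above would typically be attached on the client (assuming the public dask.annotate API; a is the worker fixture from the test, and this is not code from the PR):

    import dask
    import dask.array as da

    # Annotations set here are stored on the HighLevelGraph layers; with this PR
    # they must survive pickling and end up on the scheduler's TaskState.annotations.
    with dask.annotate(workers=[a.address]):
        x = da.ones(10, chunks=(5,))

    x = await x.persist()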

Member Author

It isn't as clear to me that this is desired. I don't mind pulling these out, personally. It's an open design choice, I think.

if layer.annotations and "retries" in layer.annotations:
    retries = retries or {}
    d = process(layer.annotations["retries"], keys=layer, string_keys=None)
    retries.update(d)  # TODO: there is an implicit ordering here
Collaborator

The ordering makes sense to me -- less specific to more specific.

Collaborator

Do you still consider these TODO? I think this ordering is fine (even better if it's documented somewhere)
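
For illustration, a hedged sketch of where the layer-by-layer ordering comes from (using the public dask.annotate API; not taken from this PR):

    import dask
    import dask.array as da

    # Broad annotation attached to the layers of the base collection...
    with dask.annotate(retries=2):
        x = da.ones((10, 10), chunks=(5, 5))

    # ...and a more specific one attached to the layers created afterwards.
    with dask.annotate(retries=5):
        total = x.sum()

    # The loop above updates a single retries mapping layer by layer, so the
    # order in which layers are visited determines which annotation wins if the
    # same key is covered more than once.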

@ian-r-rose (Collaborator)

A thought I had today (which might be off base):

Previously, using GPU-enabled Dask clusters could be somewhat painful if you don't have a GPU on your local machine (cf rapidsai/cudf#3661). That was one of the motivating use-cases of tools like afar. In this PR, the scheduler is doing much more graph manipulation, and in particular it is serializing tasks for workers. Does this mean that the scheduler would also need to have a GPU in those cases?

@jakirkham (Member)

That's already the case: the scheduler can deserialize things, or be directly involved with serialization, in some cases. This isn't desirable and is something we would ideally fix, but it is likely no worse with this PR than it already is. Others more in the weeds here should feel free to correct me if I've missed something.

@mrocklin (Member, Author)

Does this mean that the scheduler would also need to have a GPU in those cases?

yes

@mrocklin (Member, Author)

Heads-up, my plan with this PR is to wait until the stability work is done and we have strong evidence that it's solid. Then we can mint a new version that we're comfortable with and sit with that for a while.

Afterwards I'll work to merge this in and shake things up some more :)

@rjzamora (Member)

Regarding this comment in the HLG-roadmap issue: I'd like to help get this PR over the line, but my understanding of the client/scheduler code is quite limited compared to my understanding of dask/dask.

@mrocklin - Could you summarize what you expect the current state of this PR to be? Anything you know to be broken and/or missing? I noticed that many tests fail when I use the nuke-hlg branches in both distributed and dask, and that there are some minor conflicts with main. I will try to get these tests passing, but help/advice is very welcome.

@mrocklin (Member, Author)

@fjetter heads up, it looks like @rjzamora is becoming active here. In principle I think this is probably a good direction to go in (confidence 80% or so?), though it's likely to cause mild havoc.

@rjzamora (Member)

it looks like @rjzamora is becoming active here.

Yes - I started pushing on this a bit yesterday, and ~80% seems like a reasonable confidence level. On the dask/dask and HLG-development side, this is a huge win. However, we are making some clear trade-offs by always pickling HLGs (and there are probably issues we are not even considering yet).

One change that would probably make me feel more confident is if we preserved the old HLG-packing code path for the special case of materialized layers, and provided an option to materialize layers before shipping the hlg. We would still want to remove the dask_distributed_pack methods, but could preserve the pickle-free packing/unpacking logic somewhere. This would provide an escape hatch for users who are running into problems with pickle (maybe they have mismatched python environments, or no gpu on their scheduler, etc.).
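
A rough sketch of what that escape hatch could look like (the config key and helper name below are hypothetical, not part of this PR):

    import pickle
    import dask

    def pack_graph(hlg, keys):
        # Hypothetical client-side hook: optionally materialize the HighLevelGraph
        # into a plain dict of tasks before pickling, for users whose schedulers
        # can't safely unpickle layer objects (mismatched environments, no GPU, ...).
        if dask.config.get("distributed.client.materialize-graph", default=False):
            graph = dict(hlg)   # flatten all layers on the client
        else:
            graph = hlg         # ship layers as-is; materialize on the scheduler
        return pickle.dumps((graph, list(keys)))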

@mrocklin (Member, Author)

One change that would probably make me feel more confident is if we preserved the old HLG-packing code path for the special case of materialized layers, and provided an option to materialize layers before shipping the hlg. We would still want to remove the dask_distributed_pack methods, but could preserve the pickle-free packing/unpacking logic somewhere. This would provide an escape hatch for users who are running into problems with pickle (maybe they have mismatched python environments, or no gpu on their scheduler, etc.).

I'm inclined towards radical simplification here. I don't think that we should do more half-measures here. I think that we should burn things down. (but others may disagree)

@rjzamora (Member)

I'm inclined towards radical simplification here. I don't think that we should do more half-measures here. I think that we should burn things down. (but others may disagree)

I'll submit something to your branch and see what you think. It seems that we can run the exact same code to materialize and process the HLG on either the client or the scheduler. So, I don't see a huge benefit in avoiding the option to run it on the client (yet).

@mrocklin (Member, Author)

Could you summarize what you expect the current state of this PR to be? Anything you know to be broken and/or missing? I noticed that many tests fail when I use the nuke-hlg branches in both distributed and dask, and that there are some minor conflicts with main. I will try to get these tests passing, but help/advice is very welcome

I think that I had almost everything running smoothly. There was some trickiness around Variables/Semaphores that I think I worked out almost completely but not entirely.

@fjetter (Member) commented Aug 29, 2022

So far, I only skimmed this PR.

At first glance, it looks like we're putting more emphasis on Client.current, which makes me a bit nervous. The Client.current mechanism, and specifically the default-client mechanism, is very complex and at times pretty unreliable, especially in poorly isolated environments (many threads, asyncio, etc.).

This is a small thing, really, but I think we should use Client.current(allow_default=False) which actually returns None if there is no default client registered. I think this entire API should be cleaned up.
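
A small sketch of the suggested pattern; the allow_default keyword is taken from the description above, so treat the exact signature as an assumption:

    from distributed import Client

    # Prefer the explicit lookup that returns None over the default-client
    # fallback, so poorly isolated environments (threads, asyncio) fail loudly
    # instead of silently picking up an unrelated client.
    client = Client.current(allow_default=False)
    if client is None:
        # No client registered in this context; handle futures without one.
        ...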

Generally, if this is all about the serializability of Futures, maybe we should figure out how to (de-)serialize futures cleanly without touching client code? I might not have understood the problem, yet. Any pointers appreciated.


One change that would probably make me feel more confident is if we preserved the old HLG-packing code path for the special case of materialized layers, and provided an option to materialize layers before shipping the hlg. We would still want to remove the dask_distributed_pack methods, but could preserve the pickle-free packing/unpacking logic somewhere. This would provide an escape hatch for users who are running into problems with pickle (maybe they have mismatched python environments, or no gpu on their scheduler, etc.).

I'm inclined towards radical simplification here. I don't think that we should do more half-measures here. I think that we should burn things down. (but others may disagree)

How far would the "radical simplifications" go? Would this allow for simplifications in distributed.protocol.serialized, e.g. can we get rid of Serialized?

@mrocklin (Member, Author) commented Oct 11, 2022 via email
