
Add initial draft of large scale computation class #4972

Closed
mrocklin wants to merge 9 commits into dask:main from mrocklin:computations-draft

Conversation

@mrocklin
Member

Fixes #4613

This is very much an early work in progress / proof of concept. Mostly this is here to generate discussion.

cc @fjetter

dependencies = dependencies or {}

if len(tasks) > 1 or Computation._recent is None:
    # TODO: maybe reuse old computation based on time interval?
Member

I would propose adding a ctx manager here instead of trying to be smart, something like:

with computation(id="my-favourite-computation"):
    while True:
        client.submit(foo)

Most users wouldn't need to use this since we'd assign every compute its own computation. The ctx manager would work similarly to annotations; maybe the computation ID is an annotation? I don't know.

We can iterate on this, of course. I would like to avoid any fragile timing based heuristics if possible
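
A minimal sketch of what such a context manager could look like, assuming a contextvars-based implementation; the computation() helper and _current_computation variable are hypothetical, not part of this PR:

import contextvars
import uuid
from contextlib import contextmanager

# Hypothetical: track the active computation ID, similar to how annotations work
_current_computation = contextvars.ContextVar("computation_id", default=None)

@contextmanager
def computation(id=None):
    # Set the current computation ID for everything submitted inside the block
    token = _current_computation.set(id or uuid.uuid4().hex)
    try:
        yield
    finally:
        _current_computation.reset(token)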

Member

Not applying heuristics here would also allow us to avoid storing state. I believe there would be no reason for _recent if we made it explicit.

Member Author

My current thinking is that we continue to use the previous computation until it finishes. Once it finishes then we close it out and start a new one.

So the following would probably result in a single computation:

x = x.persist()
y = y.persist()
z = (x + y).sum().compute()

Member Author

We now use the previous computation if Scheduler.total_occupancy > epsilon, so there is no longer a magic time interval.

I do think that a magic time interval might at some point be welcome, but I'm happy to wait to feel the need for that.
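
A rough sketch of that rule on the scheduler side; total_occupancy and the _computations deque exist in this PR, but the method name and the epsilon value here are made up for illustration:

def _current_computation(self):
    # Reuse the most recent computation while work is still running;
    # once the cluster is idle, close it out and start a new one.
    if self._computations and self.total_occupancy > 1e-9:
        return self._computations[-1]
    computation = Computation()
    self._computations.append(computation)
    return computation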

@fjetter
Member

fjetter commented Jun 25, 2021

The one thing I would add to get started is a unique ID for every computation, probably a UUID or a token (hash) with a timestamp.
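
For illustration, a minimal way to build such an identifier (hypothetical, not in the PR):

import uuid
from time import time

# Hypothetical: combine a random UUID with the creation timestamp
computation_id = f"{uuid.uuid4().hex}-{int(time())}"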

@mrocklin
Member Author

The one thing I would add to get started is a unique ID for every computation, probably a UUID or a token (hash) with a timestamp.

We currently store a timestamp. The scheduler has a UUID. Maybe that's enough? I thought about adding a UUID but I didn't need it for anything, and so I stopped. I'm inclined to find a use case before we add this.

@fjetter
Member

fjetter commented Jun 25, 2021

We currently store a timestamp. The scheduler has a UUID. Maybe that's enough?

Should be enough. I wasn't aware.

def __repr__(self):
    return (
        f"Computation: {format_time(time() - self._start)} ago with {len(self.groups)} groups\n\n"
        + "\n---------------------------\n".join(self.code)
    )
Member

I don't think the plain repr should be that long. The code could be thousands of lines long and the repr is something we might see in a console, an IDE, a log message, CI, etc.

Member Author

Yeah, I don't disagree. This was useful during debugging / writing this PR. I'm happy to strip out the code. We could also consider collecting only ±50 lines of code around the relevant line.
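
A hypothetical helper illustrating that idea; the function name and signature are made up:

def trim_code(source: str, lineno: int, context: int = 50) -> str:
    # Keep only +-50 lines around the line that issued the compute call
    lines = source.splitlines()
    start = max(lineno - 1 - context, 0)
    return "\n".join(lines[start : lineno + context])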

    self._tasks = tasks
else:
    self._tasks = dict()
self._computations = deque(maxlen=100)
Member

I believe this should be configurable from the start. We've seen in the past that overly long deques can cause problems for long-running clusters, and reducing the deque length is often the only way to deal with this.

We do not know where people issue computations from or how large those modules are. With large modules, these Computation objects might easily reach MB size (for reference, our scheduler.py is ~260 KiB) since we might be storing multiple code snapshots per Computation.
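
For illustration, the length could be read from dask config instead of being hard-coded; the config key below is invented:

import dask
from collections import deque

# Hypothetical config key for the bounded history of Computation objects
maxlen = dask.config.get("distributed.scheduler.computation-history-length", 100)
computations = deque(maxlen=maxlen)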

Member Author

No disagreement from me

@mrocklin
Member Author

Also TODO: the sys._getframe(4) solution works for x.compute() but not for client.submit(...). We'll need to be slightly more clever here. Probably we'll want to walk up the frames until we're out of a library that we know about (dask, distributed, xarray, prefect, ...).
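
A sketch of that frame walk, assuming a hard-coded tuple of library prefixes; the helper name is hypothetical:

import sys

KNOWN_LIBRARIES = ("dask", "distributed", "xarray", "prefect")

def find_user_frame():
    # Walk up the stack until we leave modules that belong to known libraries
    frame = sys._getframe(1)
    while frame is not None:
        module = frame.f_globals.get("__name__", "")
        if not module.startswith(KNOWN_LIBRARIES):
            return frame
        frame = frame.f_back
    return None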

@mrocklin
Member Author

@fjetter any interest in taking this over?

@fjetter
Member

fjetter commented Jun 28, 2021

Also TODO: the sys._getframe(4) solution works for x.compute() but not for client.submit(...). We'll need to be slightly more clever here. Probably we'll want to walk up the frames until we're out of a library that we know about (dask, distributed, xarray, prefect, ...)

How important is this feature to us? I remember us having this argument before, and you mentioned it was useful. I have a different perspective here, but that is obviously very biased and depends on the way one uses dask.

In the past years, most of the code I regularly used with dask lived in libraries. The compute call would be the outermost shell of a service and would not reveal any useful logic.

from distributed import Client
from my_library.dask_stuff import construct_dask_graph

def endpoint(foo, bar, cluster_addr):
    # Instead of a web service endpoint, this could be a Jupyter notebook or something else
    params = parse_params(foo, bar)
    with Client(cluster_addr):
        graph = construct_dask_graph(**params)
        graph.compute()

For these situations the frame inspection doesn't work at all, or rather it is not very informative, since we can only capture the immediate context of the compute. I would expect the pattern of putting dask code into libraries to be not that uncommon.


@fjetter any interest in taking this over?

sure

@mrocklin
Member Author

The compute would be the most outer shell of a service and would not reveal any useful logic.

Probably we'll want to walk up the frames until we're out of a library that we know about (dask, distributed, xarray, prefect, ...)

Maybe we can solve this problem by making this list of known libraries configurable. We could also include a brief stack.

To demonstrate value I'll point to the first page of performance reports, which includes code in a similar way. As someone debugging someone else's code I find that looking at the context of the compute call is pretty helpful. Often I look at the task stream and say "hrm, that's odd, why so much white space?" Then I look at their code and say "Oh! Your chunks are way too small." Getting insight into user code has been, for me, quite helpful.
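
For illustration, the list of known libraries could come from dask config; the key name and default here are assumptions, not confirmed by this PR:

import dask

# Hypothetical config key: module prefixes whose frames are skipped
# when collecting user code for a computation
ignore_modules = dask.config.get(
    "distributed.diagnostics.computations.ignore-modules",
    ["dask", "distributed", "xarray", "prefect"],
)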

@fjetter fjetter mentioned this pull request Jun 30, 2021
@fjetter
Member

fjetter commented Jul 22, 2021

Closed by #5001

@fjetter fjetter closed this Jul 22, 2021
