Conversation

@phofl
Collaborator

@phofl phofl commented Nov 16, 2023

First step towards a new optimization organisation

@mrocklin
Member

This may be the right approach. I'll raise the concern we mentioned when we spoke last, just to be explicit.

df = dd.read_parquet(...)
train = df[["x", "y", "z"]]

df.y.sum().compute()

When we're computing df.y.sum() we may ask df what columns its dependents need and it will say "all of x, y, and z", but for this computation we really only need y. It could be that dependent tracking is only helpful within the context of a single computation.

@phofl phofl marked this pull request as draft November 16, 2023 22:44
@phofl phofl changed the title Keep track of dependents for every expression [DNM] Rewrite optimisations Nov 16, 2023
@phofl
Collaborator Author

phofl commented Nov 16, 2023

Yeah, you are correct; this was causing trouble in other places too. I removed that part of the logic and replaced it with an explicit trigger that collects dependents.

This is just a POC. We do one simplify step at a time, then refresh our dependents, and repeat. This is an example implementation for Blockwise ops (I am pretty sure that we can make this more efficient than it currently is, but it's a good start):

df = pd.DataFrame({"a": [1, 2, 3, 1, 2, 2], "b": [1, 2, 3, 4, 5, 6], "c": 1, "d": 1})

df = from_pandas(df, npartitions=2)

df = df.replace(1, 5)
df["x"] = df[["b", "c"]].b * 2
df["y"] = df.replace(2, 6).fillna(10).c * 3
df = df[["a", "x", "y"]].simplify()

This results in:

Assign: key='y'
  Assign: key='x'
    Projection: columns=['a']
      Replace: to_replace=1 value=5
        FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b', 'c']
    Mul: right=2
      Projection: columns='b'
        Replace: to_replace=1 value=5
          FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b', 'c']
  Mul: right=3
    Projection: columns='c'
      Fillna: value=10
        Replace: to_replace=2 value=6
          Projection: columns=['c']
            Replace: to_replace=1 value=5
              FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b', 'c']

So we can avoid over-eager pushdown of projections.
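
For context, here is a minimal sketch of the outer loop this describes, assuming the simplify_once/collect_dependents names from this PR's diff (the driver itself is illustrative, not the exact implementation):

def simplify(expr):
    while True:
        dependents = collect_dependents(expr)  # refresh dependents from the output node
        new = expr.simplify_once(dependents)   # apply one round of rewrites
        if new._name == expr._name:            # nothing changed: fixed point reached
            return new
        expr = new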

The cache is very important if we want to keep the dependents tracking on the Expression itself: we can't have the same logical expression exist as multiple Python objects.
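
A minimal sketch of what such a cache could look like; the WeakValueDictionary keyed on the expression name and the __new__ hook below are assumptions for illustration, not the exact code in this PR:

import weakref

from dask.base import tokenize

_instances = weakref.WeakValueDictionary()  # hypothetical cache, keyed by expression name


class Expr:
    def __new__(cls, *operands):
        obj = object.__new__(cls)
        obj.operands = list(operands)
        # Reuse the existing instance for an identical expression so that
        # dependents tracked on the instance stay consistent
        existing = _instances.get(obj._name)
        if existing is not None:
            return existing
        _instances[obj._name] = obj
        return obj

    @property
    def _name(self):
        return f"{type(self).__name__.lower()}-{tokenize(*self.operands)}"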

@phofl phofl marked this pull request as ready for review December 7, 2023 21:33
@phofl phofl changed the title [DNM] Rewrite optimisations Rewrite optimisations to avoid over-eager pushdown Dec 8, 2023
Member

@fjetter fjetter left a comment

Re: runtime complexity. I haven't reviewed the old code so I don't know if this was a regression or not. I assume that most expression graphs have very few edges (and are small-ish overall) so I guess this is not a huge deal but it is a little worrisome since whatever is being written in simplify_up has to be fast. Passing a collection of the size of the entire graph to that function feels... tempting to do the wrong thing.
The largest HLG graphs that were reported on the dask/dask tracker were a couple of (hundred) thousand layers (which I believe was a user error and in the array landscape). Still, I could see this grow for complex queries.

(Note I'm coming in here very biased by dask.order which handles possibly millions of nodes. My gut tells me we can ignore this but at least I wanted to raise awareness)

Other than this, I like the direction, considering that the code complexity seems to go down a lot.

return dask.core.get(graph, name)


def collect_dependents(expr) -> defaultdict:
Member

This will only return the correct result if expr is a leaf, won't it?

Member

Also, it will only work for graphs where expr is the only leaf. I assume this is generally true in dask-expr, or are there ways around this?

e.g. would `dask.compute(a, b, c)` be optimized as one? (We discussed having the possibility of a tuple expr for this, just checking on the status quo.)

Collaborator Author

This currently does not work as it should if you do dask.compute(a, b, c), but neither does the status quo.

The tuple expression is what will address this problem.
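
For illustration, a hypothetical shape of such a tuple expression (not part of this PR, just sketching the idea):

class Tuple(Expr):
    # A single root wrapping several outputs, so that collect_dependents /
    # simplify starting from this one node see the whole combined graph
    # for dask.compute(a, b, c)
    def dependencies(self):
        return [op for op in self.operands if isinstance(op, Expr)]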

This will only return the correct result if expr is a leaf, won't it?

Depends on what you are looking for: it would be correct if you are only interested in a subtree, otherwise not.

Member

Depends on what you are looking for: it would be correct if you are only interested in a subtree, otherwise not.

Well, we're not always dealing with trees, are we? Consider

graph BT;
    A1-->B1;
    A2-->B1;
    A3-->B2;
    A4-->B2;
    B1-->C1;
    B2-->C1;
    B2-->C2;
    C1-->D;
    C2 -->D

Unless you start with D you will not get the dependents of B2 right since you're not walking the entire graph.

I assume this is fine since we are always starting at a terminal node?

Collaborator Author

Sorry I wasn't clear enough.

We are always starting with the node that is the leaf node of the thing that we want to compute, so yes.
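
Expanded a bit, a minimal sketch of that traversal, matching the diff fragment quoted below (the deduplication via seen is an assumption):

import weakref
from collections import defaultdict

def collect_dependents(expr) -> defaultdict:
    dependents = defaultdict(list)
    stack = [expr]  # start from the single output expression
    seen = set()
    while stack:
        node = stack.pop()
        if node._name in seen:
            continue
        seen.add(node._name)
        for dep in node.dependencies():
            stack.append(dep)
            # record a weak reference from the dependency back to its dependent
            dependents[dep._name].append(weakref.ref(node))
    return dependents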


for dep in node.dependencies():
stack.append(dep)
dependents[dep._name].append(weakref.ref(node))
Member

Why is this required to be a weakref?

Collaborator Author

This is a little bit of a shortcut: it lets us prune whole branches so that we don't have to consider them when doing optimisations.

You could also interpret this as premature optimization, but I was a bit worried that hard refs would keep things alive that would screw me over later.

Collaborator Author

Additionally, weakrefs give me the option to do more than one step at a time. Consider this:

df = ...

df.replace().fillna()["a"]

The first optimization is the following:

df.replace()["a"].fillna()

Replace now only depends on ["a"], but if I keep a hard reference alive, the original fillna()["a"] won't go out of scope, because the replace expression does not change in this step.
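
In other words, when the dependents are read later, references to expressions that have since gone out of scope resolve to None and can simply be skipped (a small illustration, not the exact code):

# Dependents that were garbage collected after an earlier rewrite
# step drop out on their own
live_dependents = [
    node for node in (ref() for ref in dependents[expr._name])
    if node is not None
]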

Member

I'm not blocking on this, but it feels a little brittle to rely on weakref semantics to get this right. I'd be worried if this logic did not work without weakrefs.

Collaborator Author

No, the logic works without them; it would just need more simplify steps to get to the final result.

return expr

def simplify(self):
def simplify_once(self, dependents: defaultdict):
Member

I'm a little worried about runtime complexity here. I know that expr graphs are supposed to be small, but what I see is definitely non-linear runtime in the number of edges E (at the least).

Generally, this function is called more than once per node. In fact, due to line 365, it is called E times.
_simplify_up in L350 then accepts the dependents and may call things like determine_column_projection, which iterates over the dependents, effectively bringing us to quadratic runtime in E.

Member

This is not necessarily a showstopper, but it is concerning, particularly if simplify_up becomes more complex down the road.

Member

substitute_parameters, which is also called occasionally in simplify_up, iterates over the edges as well, so that's a second source (besides determine_column_projection) of this runtime complexity.

Collaborator Author

@fjetter and I chatted offline a little about this

It's a potential issue for us that I intend to investigate in January. We can avoid some of this by caching the projected columns on every simplify step, which would hopefully solve the runtime complexity issue. Current main exhibits the same pattern though, so this does not block this PR.
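
A sketch of that mitigation; the helper name and cache shape are hypothetical:

def projected_columns(expr, dependents, cache):
    # cache: plain dict, cleared between simplify passes, keyed by expression name
    if expr._name not in cache:
        cols = set()
        for ref in dependents[expr._name]:
            node = ref()  # skip dependents that were garbage collected
            if node is not None:
                cols |= set(getattr(node, "columns", []))
        cache[expr._name] = cols
    return cache[expr._name]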

Member

@fjetter - Do you think this complexity may have contributed to #796? Sorry - I never got a chance to look through these changes, so I'm not really sure how the new system works yet.

Member

Do you think this complexity may have contributed to #796?

Possibly. I'll try to have a look at this shortly

# Conflicts:
#	dask_expr/_collection.py
#	dask_expr/_concat.py
#	dask_expr/_expr.py
#	dask_expr/_groupby.py
#	dask_expr/_reductions.py
#	dask_expr/_shuffle.py
@phofl
Collaborator Author

phofl commented Dec 19, 2023

Rebased and I intend to merge later today
