
Conversation

@phofl (Contributor) commented Sep 1, 2023:

No description provided.

}
)

total.reset_index().compute().sort_values(["l_returnflag", "l_linestatus"])
Contributor:

nit: it feels a little like cheating to compute before sorting. In a perfect world, dask would know the data is small enough to collect and sort locally.

Contributor Author:

That's a good idea, I'll add this to my todo list for dask-expr. I'd like to avoid measuring the intermediate compute though, that's why I added it in this order.

Contributor:

Sorry, to be more explicit: dask should not sort locally, but reduce to a single worker, sort there, and return the result.
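The reduction being suggested can be sketched in pure Python (function and variable names are illustrative, not dask internals): instead of sorting partitions independently, gather every partition's rows onto one collector and sort once there.

```python
def gather_and_sort(partitions, key_cols):
    # "reduce to a single worker": concatenate all per-partition rows
    rows = [row for part in partitions for row in part]
    # then sort once on the collector and return the result
    return sorted(rows, key=lambda row: tuple(row[c] for c in key_cols))

# toy partitions with the column names from the query above
parts = [
    [{"l_returnflag": "R", "l_linestatus": "F"}],
    [{"l_returnflag": "A", "l_linestatus": "F"}],
]
result = gather_and_sort(parts, ["l_returnflag", "l_linestatus"])
```

For small outputs this avoids a distributed sort entirely; the sort happens on whichever worker collects the pieces.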

"s_phone",
"s_comment",
]
].persist().sort_values("s_acctbal", ascending=False).head(100)
Contributor:

why the persist?

I wonder how common something like this is. That could be easily improved with an optimized topk-like operation

Contributor Author:

Same as above: I wanted to avoid the intermediate compute and have something with persist in here.

Member:

> That could be easily improved with an optimized topk-like operation

sort+head -> TopK seems like a classic optimization we should probably have at some point. Should be easy to write too (if we have nlargest around already). If we wanted to onboard someone onto the project, this might be an interesting good first issue (for an experienced dask dev).
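The sort+head -> TopK rewrite can be sketched with the stdlib `heapq` module (the function and names here are illustrative, not dask internals): each partition reduces to its local top k, and a final reduction picks the top k of those candidates, so no global sort or shuffle is needed.

```python
import heapq

def topk(partitions, k, key):
    # per-partition reduction: each partition contributes at most k candidates
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nlargest(k, part, key=key))
    # final reduction on one worker: top-k of the pooled candidates
    return heapq.nlargest(k, candidates, key=key)

parts = [[3, 9, 1], [7, 2, 8]]
top = topk(parts, 2, key=lambda x: x)
# top == [9, 8]
```

This is also why the operation streams nicely: a partition can be reduced to its k candidates as soon as it is ready, and only O(k * npartitions) rows ever move.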

Member:

No obligation to wait though.

Also, doing the nlargest optimization removes the need for the persist/compute stuff. It'll all turn into a nicely streamable operation.

Member:

Also, just to convey optimization thoughts, this is why we do optimizations top-down. We would not want to touch the sort/shuffle/p2p lowering phase at all until the expressions above it (the head) have an opportunity to make all of that obsolete.

Contributor:

> sort+head -> TopK seems like a classic optimization we should probably have at some point.

apart from the optimization, I think we currently don't even have an API for this. We have topk for arrays. That's good. But in this case we want to return entire DF rows based on the TopK. That's not harder to implement, it's just not there, I think.

My question was not targeted at the optimization, but rather at whether we have (or should offer) this API.

Member:

Contributor:

ahhh, thanks 🤦

.revenue.agg("sum")
.reset_index()
)
result_df = result_df.compute()
Contributor:

Again, a premature compute that feels like cheating.

Comment on lines +27 to +29
lineitem_ds = read_data("lineitem")

lineitem_filtered = lineitem_ds[lineitem_ds.l_shipdate <= VAR1]
Contributor:

I think this is a little bit biased towards dask-expr. Ordinarily, we'd point users to using filters in the read_parquet call.
That's fine, I think; just wanted to point it out.

Member:

> Ordinarily, we'd point users to using filters in the read_parquet call.

I'm pretty comfortable with what's here. I rarely see users use the read_parquet kwargs in practice, even though they should.

More generally, I think that we should write things the way we think naive users would write them, rather than the way they should write them.

Contributor Author:

Actually, I don't think they should. I've done some measurements with filters in read_parquet and they are terribly slow if you don't have metadata available for the columns you are filtering.

I've explicitly avoided pushing the filters into read_parquet in my benchmarks because that was at least 3 times as slow as doing this with dask/pandas
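For reference, a sketch of the two styles being compared (left as comments rather than run here; the path and VAR1 are placeholders standing in for the benchmark's values):

```python
# Sketch only, not executed.
#
# import dask.dataframe as dd
#
# 1) predicate pushed into the reader; pruning relies on parquet row-group
#    statistics being available for the filter column:
# lineitem = dd.read_parquet("lineitem", filters=[("l_shipdate", "<=", VAR1)])
#
# 2) filter applied after reading, as written in this PR; the predicate is
#    evaluated in pandas after the data is loaded:
# lineitem = dd.read_parquet("lineitem")
# lineitem_filtered = lineitem[lineitem.l_shipdate <= VAR1]
```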

Contributor:

Well, that's sad and we should investigate. If this is indeed slower, read_parquet is trying to be smart where it shouldn't be, or something is just implemented poorly.

Contributor Author:

The slowdown is in arrow itself. We aren't doing anything fancy except passing the filters in.

"s_phone",
"s_comment",
]
].persist().sort_values("s_acctbal", ascending=False).head(100)
Contributor:

TPC-H query 2 defines the sort over multiple columns (see https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf)

[screenshot of the ORDER BY clause for query 2 from the TPC-H specification]

Contributor Author:

Yep, good call. I played with this and forgot to change it back.

Comment on lines 137 to 139
total.reset_index().head(10)[
["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]
]
Contributor:

I believe this is missing an order by.

Contributor Author:

Yep, thanks.

@fjetter (Contributor) left a comment:

There is one query where I think we're missing an ordering, but otherwise this LGTM.

@phofl (Contributor Author) commented Oct 4, 2023:

ok to merge?

@fjetter fjetter merged commit 99a8270 into main Oct 4, 2023
@fjetter fjetter deleted the phofl/tpch branch October 4, 2023 10:05

4 participants