perf: Optimize multi_group_by when there are a lot of unique groups #17592
Conversation
This optimization is fairly simple: if the row indices to append are consecutive (i.e. `append_row_indices[i] + 1 == append_row_indices[i + 1]`), we call an optimized function for that case. The optimized function copies all the data in a single pass, making it very fast as opposed to copying item by item.
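To make the idea concrete, here is a minimal std-only sketch of the fast-path dispatch; a plain `Vec<u64>` stands in for an Arrow array, and `is_consecutive` / `append_rows` are illustrative names, not the PR's actual functions:
```
fn is_consecutive(indices: &[usize]) -> bool {
    indices.windows(2).all(|w| w[0] + 1 == w[1])
}

fn append_rows(out: &mut Vec<u64>, array: &[u64], append_row_indices: &[usize]) {
    if !append_row_indices.is_empty() && is_consecutive(append_row_indices) {
        // Fast path: the rows form one contiguous run, so copy them in a single pass.
        let start = append_row_indices[0];
        out.extend_from_slice(&array[start..start + append_row_indices.len()]);
    } else {
        // Fallback: gather the requested rows one by one.
        out.extend(append_row_indices.iter().map(|&row| array[row]));
    }
}

fn main() {
    let array: Vec<u64> = (0..8).collect();
    let mut out = Vec::new();
    append_rows(&mut out, &array, &[2, 3, 4, 5]); // consecutive -> single-pass copy
    append_rows(&mut out, &array, &[0, 7]);       // not consecutive -> per-item copy
    assert_eq!(out, vec![2, 3, 4, 5, 0, 7]);
}
```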
|
@alamb Are there any benchmarks that you can run that use that flow? I only see the following, but it benchmarks byte views, which is the only non-optimized case: I see that the original PR that added it ran ClickBench, but I don't know if ClickBench has a lot of unique groups: |
|
🤖 |
Several of the ClickBench queries have many distinct groups so hopefully that will cover it. I kicked off the run and will check back shortly. Thank you @rluvaton -- this PR sounds quite cool. |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
|
@alamb I added fuzz tests, and this PR is ready for review |
|
@jayzhan211 would love your review as well |
|
I might be missing something, but the benchmark showed that q22 is really fast now. However, q22 has a single group by column, so it should not go through the path I just changed:
```
SELECT
    SearchPhrase,
    MIN(URL),
    MIN(Title),
    COUNT(*) AS c,
    COUNT(DISTINCT UserID)
FROM hits
WHERE
    Title LIKE '%Google%' AND
    URL NOT LIKE '%.google.%' AND
    SearchPhrase <> ''
GROUP BY SearchPhrase
ORDER BY c DESC
LIMIT 10;
```
I verified that the codepath was not used by adding |
|
Will rerun to see if we can reproduce the results |
|
🤖 |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
|
Most of the benefit would come from bytes, but all bytes are treated as views by default when parsing from SQL due to: so no query would show that unless that default is disabled
This is done to make sure we don't allocate the indices again when it is not supported.
```
/// Whether this builder supports [`Self::append_array_slice`] optimization
/// In case it returns true, [`Self::append_array_slice`] must be implemented
fn support_append_array_slice(&self) -> bool {
    false
}

/// Append slice of values from `array`, starting at `start` for `length` rows
///
/// This is a special case of `vectorized_append` when the rows are continuous
///
/// You should implement this to optimize large copies of contiguous values.
///
/// This does not get the sliced array even though it would be more user-friendly
/// to allow optimization that avoid the additional computation that can happen in a slice
///
/// Note: in order for this to be used, [`Self::support_append_array_slice`] must return true
fn append_array_slice(
    &mut self,
    _array: &ArrayRef,
    _start: usize,
    _length: usize,
) -> Result<()> {
    assert!(!self.support_append_array_slice(), "support_append_array_slice() return true while append_array_slice() is not implemented");
    not_impl_err!(
        "append_array_slice is not implemented for this GroupColumn, please implement it as well as support_append_array_slice"
    )
}
```
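To illustrate how this trait pair fits together, here is a minimal std-only sketch. The names `MiniGroupColumn`, `PrimitiveBuilder`, and `append_consecutive` are illustrative stand-ins, not DataFusion's actual `GroupColumn` API (which works on `ArrayRef` and returns `Result<()>`, as shown above):
```
trait MiniGroupColumn {
    /// Fallback path: append the given rows one by one.
    fn vectorized_append(&mut self, array: &[u64], rows: &[usize]);

    /// Whether this builder supports the contiguous-slice optimization.
    fn support_append_array_slice(&self) -> bool {
        false
    }

    /// Only called when `support_append_array_slice()` returns true.
    fn append_array_slice(&mut self, _array: &[u64], _start: usize, _length: usize) {
        unreachable!("append_array_slice called on a builder that does not support it");
    }
}

/// Primitive-like builder: a consecutive run of rows can be copied in one pass.
struct PrimitiveBuilder {
    values: Vec<u64>,
}

impl MiniGroupColumn for PrimitiveBuilder {
    fn vectorized_append(&mut self, array: &[u64], rows: &[usize]) {
        self.values.extend(rows.iter().map(|&r| array[r]));
    }

    fn support_append_array_slice(&self) -> bool {
        true
    }

    fn append_array_slice(&mut self, array: &[u64], start: usize, length: usize) {
        self.values.extend_from_slice(&array[start..start + length]);
    }
}

/// Caller side: consult the capability flag before taking the fast path, so
/// builders that keep the default never reach the default `append_array_slice`.
fn append_consecutive(col: &mut dyn MiniGroupColumn, array: &[u64], start: usize, length: usize) {
    if col.support_append_array_slice() {
        col.append_array_slice(array, start, length);
    } else {
        let rows: Vec<usize> = (start..start + length).collect();
        col.vectorized_append(array, &rows);
    }
}

fn main() {
    let array: Vec<u64> = (10..20).collect();
    let mut builder = PrimitiveBuilder { values: Vec::new() };
    append_consecutive(&mut builder, &array, 3, 4);
    assert_eq!(builder.values, vec![13, 14, 15, 16]);
}
```
The capability flag lets the caller decide up front whether the contiguous fast path is worth taking, so builders that do not opt in never pay for it.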
Originally I did not have `support_append_array_slice`, and I had this default implementation for `append_array_slice`:
```
fn append_array_slice(
    &mut self,
    array: &ArrayRef,
    start: usize,
    length: usize,
) -> Result<()> {
    let rows = (start..start + length).collect::<Vec<_>>();
    self.vectorized_append(array, &rows)
}
```
but I moved away from this to avoid the allocation here, as we already hold `append_row_indices`, and because I saw from the benchmarks that it can be slower for non-supported implementations.
|
I ran out of time for a thorough review today -- I need to do this one with a clean 🧠 in the morning |
|
Second run shows no real change for Query 22 🤔
|
I tried one query that has a large number of groups:
```
SELECT "UserID", "SearchPhrase", COUNT(*)
FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet'
GROUP BY "UserID", "SearchPhrase"
ORDER BY COUNT(*) DESC
LIMIT 10;
```
and it didn't seem to show any difference |
alamb left a comment
Thank you @rluvaton
I reviewed this code carefully and it makes a lot of sense to me. In general I think it is almost ready to merge.
I also verified that the new fuzz test covers the newly added code
The only thing I think is required is some sort of benchmark that shows this actually improves performance in some case (there are more guidelines here):

> In general, the performance improvement from a change should be “enough” to justify any added code complexity. How much is “enough” is a judgement made by the committers, but generally means that the improvement should be noticeable in a real-world scenario and is greater than the noise of the benchmarking system.
Do you have any ideas / ways to create one? Perhaps a SELECT DISTINCT query with some strings 🤔 ?
```
/// The `vectorized append` row indices buffer
append_row_indices: Vec<usize>,

/// If all the values in `append_row_indices` are consecutive
```
Suggested change:
```
/// If all the values in `append_row_indices` are consecutive.
/// This is updated by [`Self::add_append_row_index`]
```
```
    col,
    &self.vectorized_operation_buffers.append_row_indices,
)?;
if self
```
I suggest putting this check into a function with a descriptive name, something like
```
if let Some((start, length)) = self.consecutive_row_indices() {
    ...
} else {
    ...
}
```
I think that would make the intent clearer.
But this is not necessary for this PR
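For illustration only, one possible shape of that helper as a std-only sketch; `VectorizedOperationBuffers` here is a simplified stand-in for the PR's actual buffer struct, and the body is an assumption rather than the PR's code:
```
struct VectorizedOperationBuffers {
    /// The `vectorized append` row indices buffer
    append_row_indices: Vec<usize>,
}

impl VectorizedOperationBuffers {
    /// Returns `Some((start, length))` when the buffered row indices form one
    /// consecutive run, and `None` otherwise.
    fn consecutive_row_indices(&self) -> Option<(usize, usize)> {
        let indices = &self.append_row_indices;
        let first = *indices.first()?;
        if indices.windows(2).all(|w| w[0] + 1 == w[1]) {
            Some((first, indices.len()))
        } else {
            None
        }
    }
}

fn main() {
    let buf = VectorizedOperationBuffers { append_row_indices: vec![4, 5, 6] };
    assert_eq!(buf.consecutive_row_indices(), Some((4, 3)));

    let buf = VectorizedOperationBuffers { append_row_indices: vec![4, 6] };
    assert_eq!(buf.consecutive_row_indices(), None);
}
```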
This reverts commit cc2e725.
|
Any update on the benchmark / on showing some query where this improves performance? |
Sorry, we are in a holiday season; creating one now |
```
    self.remaining_row_indices.clear();
}

fn add_append_row_index(&mut self, row: usize) {
```
It would probably be faster and somewhat cleaner to do this in a single check on `append_row_indices` in `vectorized_append`; something like
```
append_row_indices.windows(2).all(|w| w[0] + 1 == w[1])
```
will do.
I think even better, it should also be possible to just check the length of append_row_indices. If this is equal to the size of the incoming batch, all of the indices should be consecutive already (i.e. all values are unique), so it becomes a really cheap check.
> I think even better, it should also be possible to just check the length of `append_row_indices`. If this is equal to the size of the incoming batch, all of the indices should be consecutive already (i.e. all values are unique), so it becomes a really cheap check.

But what if we only need to add the last half of the column? Then the length would not be equal, but it is still consecutive.
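Putting the two ideas from this exchange together, a sketch (std-only, illustrative names): the cheap length comparison handles the case where every incoming row started a new group, and a full scan still catches runs that cover only part of the batch, such as its last half:
```
fn indices_are_consecutive(append_row_indices: &[usize], batch_num_rows: usize) -> bool {
    // Cheap check: if every row of the incoming batch produced an append index,
    // the indices must be exactly 0..batch_num_rows, hence consecutive.
    if append_row_indices.len() == batch_num_rows {
        return true;
    }
    // Fallback: scan for a consecutive run (covers e.g. only the last half of
    // the batch being appended).
    append_row_indices.windows(2).all(|w| w[0] + 1 == w[1])
}

fn main() {
    assert!(indices_are_consecutive(&[0, 1, 2, 3], 4)); // whole batch: cheap path
    assert!(indices_are_consecutive(&[4, 5, 6, 7], 8)); // last half: full scan
    assert!(!indices_are_consecutive(&[0, 2, 5], 8));   // scattered rows
}
```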
Also added a benchmark for testing pure grouping performance for more than one column.
----
I ran this query on the data:
```
SELECT
COUNT(*) AS total_count,
COUNT(DISTINCT u64_wide) AS unique_count,
COUNT(DISTINCT u64_wide) * 1.0 / COUNT(*) AS cardinality
FROM t;
```
Before:
```
| total_count | unique_count | cardinality |
| ----------- | ------------ | ----------- |
| 65536 | 2048 | 0.03125 |
```
After:
```
| total_count | unique_count | cardinality |
| ----------- | ------------ | ----------- |
| 65536 | 65536 | 1.0 |
```
Which issue does this PR close?
N/A
Rationale for this change
I want fast grouping when there are a lot of columns to group by and a lot of unique groups.
What changes are included in this PR?
This optimization is fairly simple:
if the row indices to append are consecutive (i.e. `append_row_indices[i] + 1 == append_row_indices[i + 1]`), we call an optimized function for that case. The optimized function copies all the data in a single pass, making it very fast as opposed to copying item by item.
I did not implement this for byte views at the moment, as I don't think it would be very beneficial: there is no single chunk of data that we can copy once and be done with.
I also added fuzz tests for grouping on multiple columns, where each column I tested has a different optimized implementation. All rows are unique in the test, so we assert that we get the same output.
Are these changes tested?
Yes, and I also tried to insert bugs (manual mutation tests) to verify that the tests are solid.
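For flavor, here is a small std-only differential check in the same spirit (this is not the PR's actual fuzz test; the xorshift generator and all names are illustrative): it compares the single-pass contiguous copy against the per-item gather over randomly chosen consecutive runs and asserts they agree.
```
fn xorshift(state: &mut u64) -> u64 {
    *state ^= *state << 13;
    *state ^= *state >> 7;
    *state ^= *state << 17;
    *state
}

fn main() {
    let mut state = 0x9E37_79B9_7F4A_7C15_u64;
    for _ in 0..100 {
        // Random "batch" of values and a random consecutive run inside it.
        let len = (xorshift(&mut state) % 64 + 1) as usize;
        let array: Vec<u64> = (0..len).map(|_| xorshift(&mut state)).collect();
        let start = (xorshift(&mut state) as usize) % len;
        let run = (xorshift(&mut state) as usize) % (len - start) + 1;

        // Fast path: one contiguous copy of the run.
        let fast: Vec<u64> = array[start..start + run].to_vec();
        // Fallback path: gather item by item through explicit row indices.
        let slow: Vec<u64> = (start..start + run).map(|row| array[row]).collect();

        assert_eq!(fast, slow, "fast and fallback paths must agree");
    }
    println!("100 random runs: fast path and fallback agree");
}
```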
Are there any user-facing changes?
Nope