ARROW-9382: [Rust][DataFusion] Simplified hash aggregations and added Boolean type #7687
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on JIRA? Then could you also rename the pull request title in the following format? See also:
andygrove left a comment
The addition of the boolean type for grouping keys looks good. A quick skim of the rest of the PR looks good too. I will review in more detail this week.
This reduces:
- the runtime complexity of this operation from O(N*(1 + M)) to O(N*M) (N = number of rows, M = number of aggregations),
- the memory footprint from O(N*M) accumulators to O(M) accumulators,
- the code complexity via DRY.
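To make the shape of the change concrete, here is a minimal, self-contained sketch of the single-lookup-per-row idea (toy types and a SUM stand-in, not the actual DataFusion code):

use std::collections::HashMap;

// Toy model: each row is (group key, one input value per aggregation).
fn aggregate(rows: &[(u64, Vec<f64>)], num_aggs: usize) -> HashMap<u64, Vec<f64>> {
    let mut groups: HashMap<u64, Vec<f64>> = HashMap::new();
    for (key, values) in rows {
        // one map lookup per row ...
        let accumulators = groups.entry(*key).or_insert_with(|| vec![0.0; num_aggs]);
        // ... then M in-place accumulator updates
        for (acc, v) in accumulators.iter_mut().zip(values.iter()) {
            *acc += *v; // SUM as a stand-in for an arbitrary accumulator
        }
    }
    groups
}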
($BUILDER:ident, $TY:ident, $MAP:expr, $COL_INDEX:expr) => {{
    let mut builder = $BUILDER::new($MAP.len());
    let mut err = false;
    for k in $MAP.keys() {
Side note: The current impl walks the keys and values from the map separately when building the final result. It would perhaps be safer and more efficient to walk the entries instead (as a separate PR).
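As a sketch of that suggestion (toy types, not the actual builder code), iterating the entries yields each key together with its value, so a single traversal suffices and the keys and values cannot get out of sync:

use std::collections::HashMap;

// Walk the entries once instead of map.keys() and map.values() separately.
fn build_result(map: &HashMap<Vec<u8>, f64>) -> (Vec<Vec<u8>>, Vec<f64>) {
    let mut keys = Vec::with_capacity(map.len());
    let mut values = Vec::with_capacity(map.len());
    for (k, v) in map {
        keys.push(k.clone());
        values.push(*v);
    }
    (keys, values)
}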
let value = get_scalar_value(&aggr_input_values[col], row)
    .map_err(ExecutionError::into_arrow_external_error)?;

match map.get(&key) {
It looks like this code is performing a map lookup for the same key once for each aggregate column. It would be more efficient to do the map lookup once per row.
I don't recall whether we have a cargo bench for hash aggregate that we could use to check for performance regressions, but maybe we should add one.
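For reference, a criterion bench target could look roughly like the sketch below (the hash_aggregate function is just a toy stand-in; a real benchmark would execute the hash-aggregate query itself):

use criterion::{criterion_group, criterion_main, Criterion};
use std::collections::HashMap;

// Toy stand-in workload for the query under test.
fn hash_aggregate(data: &[(u32, f64)]) -> HashMap<u32, f64> {
    let mut groups = HashMap::new();
    for (k, v) in data {
        *groups.entry(*k).or_insert(0.0) += *v;
    }
    groups
}

fn benchmark(c: &mut Criterion) {
    let data: Vec<(u32, f64)> = (0..4096u32).map(|i| (i % 16, f64::from(i))).collect();
    c.bench_function("hash_aggregate", |b| b.iter(|| hash_aggregate(&data)));
}

criterion_group!(benches, benchmark);
criterion_main!(benches);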
I agree. I think that this is already the case on line 345.
I was trying to keep this commit semantics-free and just focus on simplifying the code. However, I can expand the scope and fix these. Let me know what you think is best.
I'm pretty sure the current code performs one map lookup per row:

for row in 0..batch.num_rows() {
    // create grouping key for this row
    create_key(&group_values, row, &mut key)
        .map_err(ExecutionError::into_arrow_external_error)?;
    if let Some(accumulator_set) = map.get(&key) {

However, there are a number of inefficiencies in the current implementation and I was planning on contributing some smaller changes to fix those (such as https://issues.apache.org/jira/browse/ARROW-9550). I like some of the cleanup you have in this PR as well. I'm not sure of the best way to co-ordinate on the changes.
You are right, I am sorry for the wrong information. Very well spotted.
1. I can fix this.
2. We can scrap this PR.
3. You can go ahead and push your changes, and I will rebase this on top of them.
Any of these is fine with me. If you have already started your part on the current master, then we close this PR and go ahead with 2 or 3 (just ping me on your PR and I will follow up there). If not, and you would prefer to work on top of this code, I can work on this tomorrow so as not to block you any further. :)
On a non-technical note, I am fine with scrapping code I wrote if it is not sufficiently valuable given the circumstances, so no worries on that front whatsoever.
I fixed both issues for now.
(Of course I can also take ARROW-9550, if you want!)
}

// iterate over each row in the batch and create the accumulators for each grouping key
let mut accumulators: Vec<Rc<AccumulatorSet>> =
It isn't clear, but this is actually an optimization. By building the vec of accumulators, it allows better columnar processing and presumably makes better use of the CPU cache. This PR shows the same minor slowdown in performance as the other branch I created.
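A rough sketch of what that buys (heavily simplified, with hypothetical types): once the accumulator set for each row has been resolved up front, the update loop can walk one aggregate column at a time instead of interleaving map lookups with updates:

use std::cell::RefCell;
use std::rc::Rc;

type AccumulatorSet = Vec<f64>; // stand-in for the real accumulator set

// `accumulators[row]` points at the accumulator set of that row's group.
fn update_batch(accumulators: &[Rc<RefCell<AccumulatorSet>>], columns: &[Vec<f64>]) {
    for (col, values) in columns.iter().enumerate() {
        // column-at-a-time traversal: sequential reads over `values`
        for (row, value) in values.iter().enumerate() {
            accumulators[row].borrow_mut()[col] += *value;
        }
    }
}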
Before:
Running benchmarks with the following options: Opt { debug: false, iterations: 3, batch_size: 4096, path: "/mnt/nyctaxi/csv/year=2019", file_format: "csv" }
Executing 'fare_amt_by_passenger'
Query 'fare_amt_by_passenger' iteration 0 took 11058 ms
Query 'fare_amt_by_passenger' iteration 1 took 11314 ms
Query 'fare_amt_by_passenger' iteration 2 took 11479 ms
After:
Running benchmarks with the following options: Opt { debug: false, iterations: 3, batch_size: 4096, path: "/mnt/nyctaxi/csv/year=2019", file_format: "csv" }
Executing 'fare_amt_by_passenger'
Query 'fare_amt_by_passenger' iteration 0 took 11636 ms
Query 'fare_amt_by_passenger' iteration 1 took 12500 ms
Query 'fare_amt_by_passenger' iteration 2 took 12358 ms
This is from running the benchmark crate in this repo.
Mind-blowing. Still much to learn.
I struggle to run the benchmarks on my computer. Is there any design reason not to run the benchmarks as part of the pipeline? Are they too unstable across different hardware?
Ok, I was able to run the benchmarks, but I do not observe any statistically significant difference:
# the commit pre on master:
git checkout cd503c3f583dab4b94c9934d525664e5897ff06d
cargo bench
leaves me with
aggregate_query_no_group_by
time: [121.39 us 121.55 us 121.73 us]
Found 9 outliers among 100 measurements (9.00%)
3 (3.00%) high mild
6 (6.00%) high severe
aggregate_query_group_by
time: [170.22 us 170.75 us 171.47 us]
Found 12 outliers among 100 measurements (12.00%)
7 (7.00%) high mild
5 (5.00%) high severe
aggregate_query_group_by_with_filter
time: [279.00 us 279.34 us 279.71 us]
Found 9 outliers among 100 measurements (9.00%)
2 (2.00%) high mild
7 (7.00%) high severe
followed by
# the latest commit on this branch:
git checkout bbd9da7ce5b582587bce5c8ff8a228f5425e0113
cargo bench
leaves me with
aggregate_query_no_group_by
time: [122.19 us 122.38 us 122.58 us]
change: [-1.5876% +0.3639% +2.3348%] (p = 0.74 > 0.05)
No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
6 (6.00%) high severe
aggregate_query_group_by
time: [172.66 us 172.91 us 173.19 us]
change: [-0.9329% +1.2144% +3.2272%] (p = 0.28 > 0.05)
No change in performance detected.
Found 12 outliers among 100 measurements (12.00%)
6 (6.00%) high mild
6 (6.00%) high severe
aggregate_query_group_by_with_filter
time: [282.30 us 282.70 us 283.14 us]
change: [-1.1013% +0.9700% +2.7683%] (p = 0.40 > 0.05)
No change in performance detected.
Found 8 outliers among 100 measurements (8.00%)
2 (2.00%) high mild
6 (6.00%) high severe
@jorgecarleitao Out of curiosity, what are the hardware specs you ran this on?
macOS 10.14.5, 2.3 GHz Intel Core i5, 8 GB 2133 MHz LPDDR3.
I would be interested in finding out what AVX instruction sets are supported on the specific CPU that you have. For example, does it support AVX2?
I do not think so:
$ sysctl -a | grep machdep.cpu.features
machdep.cpu.features: FPU VME DE PSE TSC MSR PAE MCE CX8 APIC SEP MTRR PGE MCA CMOV PAT PSE36 CLFSH DS ACPI MMX FXSR SSE SSE2 SS HTT TM PBE SSE3 PCLMULQDQ DTES64 MON DSCPL VMX SMX EST TM2 SSSE3 FMA CX16 TPR PDCM SSE4.1 SSE4.2 x2APIC MOVBE POPCNT AES PCID XSAVE OSXSAVE SEGLIM64 TSCTMR AVX1.0 RDRAND F16C
There is an AVX1.0 there.
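As an aside, on macOS the AVX2 flag is reported under a separate sysctl key rather than machdep.cpu.features, so it would not necessarily show up in the output above even on a CPU that supports it. Checking the leaf-7 feature list should reveal it:

$ sysctl -a | grep machdep.cpu.leaf7_features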
Actually, this information is incorrect. My CPU family is Kaby Lake, which has the AVX2 extension.
Closing in preference of #7936.
See the individual commits for details. The last one is the commit that adds a new feature: the boolean type in group by.
The rationale for moving some of the code to hash.rs is that hashing goes beyond group by: repartitions and window functions all have a hashing mechanism. By decoupling the aggregates from the hashing, we are preparing the code base to support those other operations.
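As a sketch of the kind of shared routine this enables (hypothetical signature, not the actual hash.rs API), any operator that needs to hash rows could reuse a single key-building function:

// Build a byte key for `row` from one fixed-width value per grouping column.
// GROUP BY, hash repartitioning, and window partitioning could all share it.
fn create_key(columns: &[Vec<u64>], row: usize, key: &mut Vec<u8>) {
    key.clear();
    for column in columns {
        key.extend_from_slice(&column[row].to_le_bytes());
    }
}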