limit intermediate batch size in nested_loop_join #16443
Conversation
Force-pushed from ec04210 to 00140d3
Benchmark: I use this script to do the benchmark. I'll find out why there is a performance improvement. TPC-H benchmark result:

Memory usage: I use this script to get SQL memory usage.
| let enforce_batch_size_in_joins = | ||
| context.session_config().enforce_batch_size_in_joins(); |
We can remove the enforce_batch_size_in_joins configuration for nested loop join since
- The new implementation in this PR achieves both improved performance and lower memory usage. This surpasses the previous state where
`enforce_batch_size_in_joins` was used to toggle between better performance (false) and lower memory usage (true).
datafusion/datafusion/common/src/config.rs
Lines 404 to 408 in e6df27c
```rust
/// Should DataFusion enforce batch size in joins or not. By default,
/// DataFusion will not enforce batch size in joins. Enforcing batch size
/// in joins can reduce memory usage when joining large
/// tables with a highly-selective join filter, but is also slightly slower.
pub enforce_batch_size_in_joins: bool, default = false
```
- Verification confirms that results remain correct without this configuration.
Those benchmark helper functions are really cool, I'll see if I can take a look today.
From the flame graph (when executing the SQL …):
But I still can't explain why these two functions performed better. 😂
When you are running the benchmarks, do they stay consistent?

Yes, the benchmark results are almost consistent. I ran the benchmarks a few minutes ago on commit. It's worth noting that the join type in query 3 (q3) was modified from a right join to a left join.
jonathanc-n left a comment
I'm a bit confused where the performance increase is coming from as well. I noted down some nits; I'll take a better look tonight.
| datafusion_common::_internal_datafusion_err!( | ||
| "should have join_result_status" |
I think we can change this to be more verbose, and we can use `internal_err!`.
| datafusion_common::_internal_datafusion_err!( | |
| "should have join_result_status" | |
| internal_err!( | |
| "get_next_join_result called without initializing join_result_status" |
> we can use internal_err!

`internal_err!` returns a `Result`, but `ok_or_else` requires a closure that returns an error.
Should we import _internal_datafusion_err! so we do not need the path qualifier? small nit, just looks cleaner that way
@korowa do you by any chance have time to review this PR?
| // - probe_indices: row indices from probe-side table (right table) | ||
| // - processed_count: number of index pairs already processed into output batches | ||
| // We have completed join result for indices [0..processed_count) | ||
| join_result_status: Option<( |
It may be better to create a separate struct for ProcessProbeBatch state, extended with all attributes required to track join progress (example for hash join)
In a hash join, ProcessProbeBatch is solely responsible for tracking the join progress on the probe side. In contrast, join_result_status serves a broader purpose: it tracks progress for both the probe side and for the unmatched rows from the build side.
I still think we should make this into a struct
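For illustration, a stdlib-only sketch of what such a struct could look like (field and method names are hypothetical; the real code would hold Arrow `PrimitiveArray<UInt64Type>` / `PrimitiveArray<UInt32Type>` rather than `Vec`s):

```rust
/// Hypothetical replacement for the (build_indices, probe_indices,
/// processed_count) tuple; tracks incremental join-result production.
#[derive(Debug)]
struct JoinResultStatus {
    build_indices: Vec<u64>, // row indices from the build (left) side
    probe_indices: Vec<u32>, // row indices from the probe (right) side
    processed_count: usize,  // pairs in [0..processed_count) already emitted
}

impl JoinResultStatus {
    /// Return the next chunk of at most `max_size` unprocessed index pairs,
    /// or None once every pair has been turned into output batches.
    fn next_chunk(&mut self, max_size: usize) -> Option<(&[u64], &[u32])> {
        if self.processed_count >= self.build_indices.len() {
            return None;
        }
        let start = self.processed_count;
        let end = (start + max_size).min(self.build_indices.len());
        self.processed_count = end;
        Some((&self.build_indices[start..end], &self.probe_indices[start..end]))
    }
}
```

A struct like this also gives the progress-tracking logic a single place to live, instead of destructuring a three-element tuple at each call site.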
| fn new_task_ctx() -> Arc<TaskContext> { | ||
| let base = TaskContext::default(); | ||
| // limit max size of intermediate batch used in nlj to 1 | ||
| let cfg = base.session_config().clone().with_batch_size(1); |
Could you, please, parameterize batch_size value and run all unit tests for various batch sizes (e.g. 1, 2, 4, 10, 8192)?
DONE
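As a rough sketch of that parameterization (a plain loop, no extra test framework; `run_join_with_batch_size` is a hypothetical stand-in for the real test body):

```rust
/// Hypothetical stand-in: runs the join under test with the given batch size
/// and returns how many output batches were produced.
fn run_join_with_batch_size(batch_size: usize) -> usize {
    let total_output_rows: usize = 100; // stand-in for the known result size
    total_output_rows.div_ceil(batch_size)
}

/// Run the same correctness check for every batch size of interest.
fn check_all_batch_sizes() {
    for batch_size in [1usize, 2, 4, 10, 8192] {
        let batches = run_join_with_batch_size(batch_size);
        assert!(batches >= 1, "batch_size {batch_size} produced no batches");
    }
}
```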
| .expression() | ||
| .evaluate(&intermediate_batch)? | ||
| .into_array(intermediate_batch.num_rows())?; | ||
| let filter_result = if let Some(max_size) = max_intermediate_size { |
Why batch_size enforcement should take place during filtering? Can we enforce it before filtering, while calculating build/probe_indices args for this function (in NestedLoopJoinExec::build_join_indices)?
> Can we enforce it before filtering, while calculating build/probe_indices args for this function (in NestedLoopJoinExec::build_join_indices)?

I'll do it in the next PR. #16364 (comment)

> Why batch_size enforcement should take place during filtering

- Although the "Process the Cartesian Product Incrementally" step is designed to limit the input size for `apply_join_filter_to_indices`, the size of a single batch can still be very large (up to `left_table.num_rows() * N`). When the left table itself is large, this can lead to the creation of a large `record_batch`.
- Benchmarks indicate that executing joins is faster with this enforcement in place. limit intermediate batch size in nested_loop_join #16443 (comment)
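The enforcement idea can be sketched in plain Rust (a simplified model: the closure stands in for evaluating the real `JoinFilter`, and each slice stands in for a bounded intermediate `RecordBatch`):

```rust
/// Apply a per-pair filter in chunks of at most `max_size` pairs, so the
/// intermediate data materialized at any one time stays bounded.
fn apply_filter_chunked(
    build_indices: &[u64],
    probe_indices: &[u32],
    max_size: usize,
    filter: impl Fn(u64, u32) -> bool,
) -> (Vec<u64>, Vec<u32>) {
    let mut out_build = Vec::new();
    let mut out_probe = Vec::new();
    for chunk_start in (0..build_indices.len()).step_by(max_size) {
        let end = (chunk_start + max_size).min(build_indices.len());
        // In the real code this slice becomes a small intermediate
        // RecordBatch whose row count never exceeds max_size.
        for i in chunk_start..end {
            if filter(build_indices[i], probe_indices[i]) {
                out_build.push(build_indices[i]);
                out_probe.push(probe_indices[i]);
            }
        }
    }
    (out_build, out_probe)
}
```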
Shouldn't we have this done in this pull request? I think it would make more sense (just moving this logic to build_join_indices).
Yes but I believe this removes the purpose of the pull request if we are building the entire amount of indices? I may be missing something though
> I believe this removes the purpose of the pull request if we are building the entire amount of indices

This PR only limits the size of the intermediate `record_batch`. The Cartesian product of the entire `left_table` and `right_batch` is still generated at once (this will be limited in a subsequent PR).
> I believe this removes the purpose of the pull request if we are building the entire amount of indices

This PR only limits the size of the intermediate `record_batch`. The Cartesian product of the entire `left_table` and `right_batch` is still generated at once (this will be limited in a subsequent PR).
Additionally, making the Cartesian product step incremental likely requires a larger refactor (compared to this PR), so it may be better suited for a separate PR.
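For a sense of what that follow-up could involve, here is a stdlib-only sketch (hypothetical names, not the actual planned implementation) of producing the Cartesian-product index pairs in bounded chunks instead of all at once:

```rust
/// Produce one chunk of the left_rows x right_rows Cartesian product,
/// starting at flat offset `start` and containing at most `max_size` pairs.
fn cartesian_chunk(
    left_rows: usize,
    right_rows: usize,
    start: usize,
    max_size: usize,
) -> (Vec<u64>, Vec<u32>) {
    let total = left_rows * right_rows;
    let end = (start + max_size).min(total);
    let mut build = Vec::with_capacity(end.saturating_sub(start));
    let mut probe = Vec::with_capacity(end.saturating_sub(start));
    for flat in start..end {
        build.push((flat / right_rows) as u64); // build-side (left) row
        probe.push((flat % right_rows) as u32); // probe-side (right) row
    }
    (build, probe)
}
```

The caller would advance `start` by the chunk length after each call until it reaches `left_rows * right_rows`.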
| ); | ||
| timer.done(); | ||
| if self.join_result_status.is_none() { | ||
| self.join_metrics.input_batches.add(1); |
fetch_probe_batch seems to be a better fit for tracking these two metrics (input_batches/input_rows).
| /// Current state of the stream | ||
| state: NestedLoopJoinStreamState, | ||
| #[allow(dead_code)] | ||
| // TODO: remove this field ?? |
Since there is no more need in splitting output batch, and the output is generating progressively, I suppose it can be removed.
DONE
| let current_start = *start; | ||
| if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { |
If both index arrays are empty, maybe it is ok to simply return None here, instead of building batch and setting start to 1?
That was my initial approach. However, it resulted in an output with 0 rows and 0 columns, which seems to be incorrect and caused the test to fail.
You can see the failed CI run here:
https://github.com/apache/datafusion/actions/runs/15734253347/job/44343070926?pr=16443#step:5:1208
Can we build and return an empty batch instead of calling build_batch_from_indices?
I don't get this status.processed_count = 1 logic either, perhaps you can add a quick comment to explain it?
> That was my initial approach. However, it resulted in an output with 0 rows and 0 columns, which seems to be incorrect and caused the test to fail.
> You can see the failed CI run here:
> https://github.com/apache/datafusion/actions/runs/15734253347/job/44343070926?pr=16443#step:5:1208
Now I understand why this test passed after I changed the return value from None to RecordBatch::new_empty.
In this unit test, the join result is converted to a string and then compared with the expected output. When converting to a string, it retrieves the schema from the record_batch (as the passed schema_opt is None).
https://github.com/apache/arrow-rs/blob/7b219f98c25fcd318a0c207f51a41398d1b23724/arrow-cast/src/pretty.rs#L183-L187
When executed in the CLI, there's no issue even if the Nested Loop Join (NLJ) returns 0 record batches.
> select t1.value from range(1) t1 join range(1) t2 on t1.value + t2.value >100;
+-------+
| value |
+-------+
+-------+
0 row(s) fetched.
From a compatibility standpoint, I think it's better to keep it consistent with the previous behavior.
| if self.join_result_status.is_none() { | ||
| self.join_metrics.input_batches.add(1); | ||
| self.join_metrics.input_rows.add(batch.num_rows()); | ||
| let _timer = self.join_metrics.join_time.timer(); |
Maybe only one timer covering the whole function will be enough (instead of this timer, and the one on L1023)?
> Maybe only one timer covering the whole function will be enough (instead of this timer, and the one on L1023)?

That was my initial approach, but it caused a borrow checker error (E0502).
The issue is a conflict between an immutable borrow for the timer (self.join_metrics.join_time) and a mutable borrow required by self.get_next_join_result.
We can create the timer inside get_next_join_result itself, rather than in the caller.
I think you can clone it, the underlying structure of this metric is Arc<AtomicUsize>, so the cloned version points to the same counter.
Done. I've updated the code to clone join_metrics.join_time.
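The pattern can be illustrated with a simplified stand-in for DataFusion's `metrics::Time` (the real type differs; this only shows why cloning an `Arc`-backed metric sidesteps the borrow conflict):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for a shared metric: clones share the same counter.
#[derive(Clone, Default)]
struct Time(Arc<AtomicUsize>);

impl Time {
    fn add(&self, nanos: usize) {
        self.0.fetch_add(nanos, Ordering::Relaxed);
    }
    fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

struct Stream {
    join_time: Time,
}

impl Stream {
    fn poll(&mut self) {
        // Clone the handle first: the clone is independent of `self`, so the
        // `&mut self` call below no longer conflicts with the timer borrow.
        let timer = self.join_time.clone();
        self.mutate(); // stands in for self.get_next_join_result(...)
        timer.add(42); // recorded on the same underlying counter
    }
    fn mutate(&mut self) {}
}
```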
Pull Request Overview
This PR refactors join logic to limit the intermediate batch size during filtering and to yield partial batches on demand.
- Updated helper functions and state management to support an optional maximum intermediate batch size
- Refactored nested loop join execution logic and test contexts to integrate the new batching mechanism
- Propagated changes across related join implementations (symmetric hash join and hash join)
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/utils.rs | Adds an import of std::cmp::min and updates apply_join_filter_to_indices to support an optional max intermediate batch size. |
| datafusion/physical-plan/src/joins/symmetric_hash_join.rs | Passes None for the new intermediate batch size parameter to maintain compatibility. |
| datafusion/physical-plan/src/joins/nested_loop_join.rs | Introduces incremental join result production, state transitions, and test context updates for limiting batch sizes. |
| datafusion/physical-plan/src/joins/hash_join.rs | Updated join filter call to pass None for the intermediate batch size parameter. |
Comments suppressed due to low confidence (1)
datafusion/physical-plan/src/joins/nested_loop_join.rs:1068
- The error message uses 'OutputUnmatchBatch' which is inconsistent with the enum variant named 'OutputUnmatchedBuildRows'. Please update the error message for consistency.
return internal_err!(
| // TODO: remove this field ?? | ||
| /// Transforms the output batch before returning. | ||
| batch_transformer: T, | ||
| /// Result of the left data future | ||
| left_data: Option<Arc<JoinLeftData>>, | ||
| // Tracks progress when building join result batches incrementally | ||
| // Contains (build_indices, probe_indices, processed_count) where: | ||
| // - build_indices: row indices from build-side table (left table) | ||
| // - probe_indices: row indices from probe-side table (right table) | ||
| // - processed_count: number of index pairs already processed into output batches | ||
| // We have completed join result for indices [0..processed_count) | ||
| join_result_status: Option<( | ||
| PrimitiveArray<UInt64Type>, | ||
| PrimitiveArray<UInt32Type>, | ||
| usize, | ||
| )>, | ||
Copilot AI, Jun 29, 2025
[nitpick] The 'join_result_status' field contains a TODO comment indicating uncertainty. Consider either removing this field if it is no longer necessary or clarifying its purpose.
| // TODO: remove this field ?? | |
| /// Transforms the output batch before returning. | |
| batch_transformer: T, | |
| /// Result of the left data future | |
| left_data: Option<Arc<JoinLeftData>>, | |
| // Tracks progress when building join result batches incrementally | |
| // Contains (build_indices, probe_indices, processed_count) where: | |
| // - build_indices: row indices from build-side table (left table) | |
| // - probe_indices: row indices from probe-side table (right table) | |
| // - processed_count: number of index pairs already processed into output batches | |
| // We have completed join result for indices [0..processed_count) | |
| join_result_status: Option<( | |
| PrimitiveArray<UInt64Type>, | |
| PrimitiveArray<UInt32Type>, | |
| usize, | |
| )>, | |
| /// Transforms the output batch before returning. | |
| batch_transformer: T, | |
| /// Result of the left data future | |
| left_data: Option<Arc<JoinLeftData>>, |
jonathanc-n left a comment
Thanks @UBarney, just some comments
| .expression() | ||
| .evaluate(&intermediate_batch)? | ||
| .into_array(intermediate_batch.num_rows())?; | ||
| let filter_result = if let Some(max_size) = max_intermediate_size { |
Shouldn't we have this done in this pull request? I think it would make more sense (just moving this logic to build_join_indices).
| fn build_unmatched_output( | ||
| &mut self, | ||
| ) -> Result<StatefulStreamResult<Option<RecordBatch>>> { | ||
| if matches!( |
I do not think we need this check, it is already guaranteed that this function will only run when we have the OutputUnmatchedBuildRows state
Yes, we don't need this check
| datafusion_common::_internal_datafusion_err!( | ||
| "should have join_result_status" |
Should we import _internal_datafusion_err! so we do not need the path qualifier? small nit, just looks cleaner that way
| let current_start = *start; | ||
| if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { |
Can we build and return an empty batch instead of calling build_batch_from_indices?
| // - probe_indices: row indices from probe-side table (right table) | ||
| // - processed_count: number of index pairs already processed into output batches | ||
| // We have completed join result for indices [0..processed_count) | ||
| join_result_status: Option<( |
I still think we should make this into a struct
| NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, | ||
| NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { |
| NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, | |
| NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { | |
| NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { |
| } | ||
| _ => { | ||
| return internal_err!( | ||
| "state should be ProcessProbeBatch or OutputUnmatchBatch" |
| "state should be ProcessProbeBatch or OutputUnmatchBatch" | |
| "State should be ProcessProbeBatch or OutputUnmatchedBuildRows" |
| } | ||
| } | ||
| } else { | ||
| internal_err!("state should be OutputUnmatchBatch") |
| internal_err!("state should be OutputUnmatchBatch") | |
| internal_err!("State should be OutputUnmatchedBuildRows") |
Thanks @jonathanc-n for reviewing. I have addressed all of your comments.
jonathanc-n left a comment
Thanks @UBarney, this looks good to me! Looking forward to reviewing the follow-up PRs.
@korowa @2010YOUY01 Are you able to take a quick look? Thanks!
Thank you so much for this optimization. It's on my list, but due to the complexity of the join operator, I need to find a time when my mind is clear to review it — which is challenging, as I often feel slow recently 😇 BTW I think the micro-benchmarks for NLJ are quite valuable; it would be great to see them in df's benchmark suite. The same goes for the memory profiling functionality in the benchmark scripts.
| filter.column_indices(), | ||
| build_side, | ||
| )?; | ||
| let filter_result = filter |
Perhaps the performance improvement comes from the fact that the data is still in cache when doing the filtering step in the subsequent operation?
When doing it on the entire array, it will be wiped out from the cache if it is large enough.
This version boosts performance with a much higher IPC of 1.75 (vs 0.87), achieved by dramatically cutting LLC misses from 109M to 25M, even with a similar L1 miss rate.
Details
sudo perf stat -e cycles,instructions,L1-dcache-load-misses,L1-dcache-loads,LLC-loads,LLC-load-misses ./limit_batch_size@36991aca -c 'select t1.value from range(100) t1 join range(819200) t2 on (t1.value + t2.value) % 1000 = 0; ' --maxrows 1
sudo perf stat -e cycles,instructions,L1-dcache-load-misses,L1-dcache-loads,LLC-loads,LLC-load-misses ./join_base@6965fd32 -c 'select t1.value from range(100) t1 join range(819200) t2 on (t1.value + t2.value) % 1000 = 0; ' --maxrows 1
DataFusion CLI v48.0.0
+-------+
| value |
+-------+
| 40 |
| . |
| . |
| . |
+-------+
81901 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 0.067 seconds.
Performance counter stats for './limit_batch_size@36991aca -c select t1.value from range(100) t1 join range(819200) t2 on (t1.value + t2.value) % 1000 = 0; --maxrows 1':
1901401922 cycles
3325776634 instructions # 1.75 insn per cycle
32419611 L1-dcache-load-misses # 5.27% of all L1-dcache accesses
614645891 L1-dcache-loads
<not supported> LLC-loads
<not supported> LLC-load-misses
0.073244586 seconds time elapsed
0.448238000 seconds user
0.044823000 seconds sys
DataFusion CLI v48.0.0
+-------+
| value |
+-------+
| 99 |
| . |
| . |
| . |
+-------+
81901 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 0.131 seconds.
Performance counter stats for './join_base@6965fd32 -c select t1.value from range(100) t1 join range(819200) t2 on (t1.value + t2.value) % 1000 = 0; --maxrows 1':
3696196789 cycles
3201132508 instructions # 0.87 insn per cycle
21781750 L1-dcache-load-misses # 3.68% of all L1-dcache accesses
592094439 L1-dcache-loads
<not supported> LLC-loads
<not supported> LLC-load-misses
0.139081088 seconds time elapsed
0.835575000 seconds user
0.111277000 seconds sys
(venv) √ devhomeinsp ~/c/d/t/release > valgrind --cache-sim=yes --tool=cachegrind ./join_base@6965fd32 -c 'select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value;' --maxrows 1
==94454== Cachegrind, a high-precision tracing profiler
==94454== Copyright (C) 2002-2017, and GNU GPL'd, by Nicholas Nethercote et al.
==94454== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==94454== Command: ./join_base@6965fd32 -c select\ t1.value\ from\ range(8192)\ t1\ join\ range(8192)\ t2\ on\ t1.value\ +\ t2.value\ \>\ t1.value\ *\ t2.value; --maxrows 1
==94454==
--94454-- warning: L3 cache found, using its data for the LL simulation.
--94454-- warning: specified LL cache: line_size 64 assoc 12 total_size 31,457,280
--94454-- warning: simulated LL cache: line_size 64 assoc 15 total_size 31,457,280
DataFusion CLI v48.0.0
+-------+
| value |
+-------+
| 1 |
| . |
| . |
| . |
+-------+
32763 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 7.948 seconds.
==94454==
==94454== I refs: 3,555,994,712
==94454== I1 misses: 66,444
==94454== LLi misses: 26,028
==94454== I1 miss rate: 0.00%
==94454== LLi miss rate: 0.00%
==94454==
==94454== D refs: 813,250,121 (475,263,085 rd + 337,987,036 wr)
==94454== D1 misses: 118,285,307 ( 71,937,864 rd + 46,347,443 wr)
==94454== LLd misses: 109,455,796 ( 63,122,399 rd + 46,333,397 wr)
==94454== D1 miss rate: 14.5% ( 15.1% + 13.7% )
==94454== LLd miss rate: 13.5% ( 13.3% + 13.7% )
==94454==
==94454== LL refs: 118,351,751 ( 72,004,308 rd + 46,347,443 wr)
==94454== LL misses: 109,481,824 ( 63,148,427 rd + 46,333,397 wr)
==94454== LL miss rate: 2.5% ( 1.6% + 13.7% )
(venv) √ devhomeinsp ~/c/d/t/release > valgrind --cache-sim=yes --tool=cachegrind ./limit_batch_size@36991aca -c 'select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value;' --maxrows 1
==96086== Cachegrind, a high-precision tracing profiler
==96086== Copyright (C) 2002-2017, and GNU GPL'd, by Nicholas Nethercote et al.
==96086== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==96086== Command: ./limit_batch_size@36991aca -c select\ t1.value\ from\ range(8192)\ t1\ join\ range(8192)\ t2\ on\ t1.value\ +\ t2.value\ \>\ t1.value\ *\ t2.value; --maxrows 1
==96086==
--96086-- warning: L3 cache found, using its data for the LL simulation.
--96086-- warning: specified LL cache: line_size 64 assoc 12 total_size 31,457,280
--96086-- warning: simulated LL cache: line_size 64 assoc 15 total_size 31,457,280
DataFusion CLI v48.0.0
+-------+
| value |
+-------+
| 1 |
| . |
| . |
| . |
+-------+
32763 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 8.163 seconds.
==96086==
==96086== I refs: 3,663,944,959
==96086== I1 misses: 944,257
==96086== LLi misses: 27,378
==96086== I1 miss rate: 0.03%
==96086== LLi miss rate: 0.00%
==96086==
==96086== D refs: 847,265,289 (495,073,876 rd + 352,191,413 wr)
==96086== D1 misses: 122,392,985 ( 74,620,328 rd + 47,772,657 wr)
==96086== LLd misses: 25,750,761 ( 12,815,239 rd + 12,935,522 wr)
==96086== D1 miss rate: 14.4% ( 15.1% + 13.6% )
==96086== LLd miss rate: 3.0% ( 2.6% + 3.7% )
==96086==
==96086== LL refs: 123,337,242 ( 75,564,585 rd + 47,772,657 wr)
==96086== LL misses: 25,778,139 ( 12,842,617 rd + 12,935,522 wr)
==96086== LL miss rate: 0.6% ( 0.3% + 3.7% )
| let filter_refs: Vec<&dyn Array> = | ||
| filter_results.iter().map(|a| a.as_ref()).collect(); | ||
| compute::concat(&filter_refs)? |
Would be nice to avoid this and rely on CoalesceBatches instead
oh wait this is filter, I have another suggestion
| .expression() | ||
| .evaluate(&intermediate_batch)? | ||
| .into_array(intermediate_batch.num_rows())?; | ||
| filter_results.push(filter_result); |
What about executing the filter directly on build/probe indices slice here and concatenating the indices later?
Ideally, I think the filtering operation should be done here, directly on the sub-batch (and using coalescing kernel (apache/arrow-rs#7652 ) to push results.
coalesce is now available in datafusion (we have upgraded to a new arrow version)
I hope to continue improving coalesce over time (especially for this common usecase of building up the output of filter)
I'm not sure I fully understand your suggestions. Could you please elaborate?
What about executing the filter directly on build/probe indices slice here and concatenating the indices later?
I'm a bit confused by this. Do you mean we can avoid constructing the intermediate_batch? It seems this approach would require rewriting the filter logic to work directly on index slices
and using coalescing kernel
I'm also not sure why the coalesce kernel should be used here. The current function takes build_indices and probe_indices, builds an intermediate_batch, executes the filter, and returns the (build_indices, probe_indices) that passed the filter.
My understanding is that coalesce is used to merge multiple RecordBatches with a row count less than a target into new RecordBatches with a row count greater than the target. How would that apply in this situation?
> I'm not sure I fully understand your suggestions. Could you please elaborate?
>
> > What about executing the filter directly on build/probe indices slice here and concatenating the indices later?
>
> I'm a bit confused by this. Do you mean we can avoid constructing the `intermediate_batch`? It seems this approach would require rewriting the filter logic to work directly on index slices.
>
> > and using coalescing kernel
>
> I'm also not sure why the `coalesce` kernel should be used here. The current function takes `build_indices` and `probe_indices`, builds an `intermediate_batch`, executes the filter, and returns the `(build_indices, probe_indices)` that passed the filter. My understanding is that `coalesce` is used to merge multiple `RecordBatch`es with a row count less than a target into new `RecordBatch`es with a row count greater than the target. How would that apply in this situation?
This is an idea for future optimization:
If we use this interface https://docs.rs/arrow-select/55.2.0/src/arrow_select/coalesce.rs.html#189-193 instead of the current `concat_batches()` approach, it can (1) use less memory and (2) run faster.
Note that the interface above hasn't been implemented with the fast path yet.
You can see the motivations in apache/arrow-rs#6692
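As a rough illustration of the coalescing idea being discussed (a hypothetical, simplified API sketch, not the actual arrow-rs `BatchCoalescer`): buffer rows that pass the filter and emit fixed-size batches as you go, instead of concatenating everything at the end.

```rust
// Hypothetical sketch of the coalescing idea: buffer filtered rows and
// emit them as batches of a target size, instead of concatenating
// everything in one final pass.
struct Coalescer {
    target: usize,
    buffer: Vec<u64>,
    completed: Vec<Vec<u64>>,
}

impl Coalescer {
    fn new(target: usize) -> Self {
        Self { target, buffer: Vec::new(), completed: Vec::new() }
    }

    // Push rows that pass a filter mask; flush a batch whenever the
    // buffer reaches the target size.
    fn push_with_filter(&mut self, rows: &[u64], mask: &[bool]) {
        for (&row, &keep) in rows.iter().zip(mask) {
            if keep {
                self.buffer.push(row);
                if self.buffer.len() == self.target {
                    self.completed.push(std::mem::take(&mut self.buffer));
                }
            }
        }
    }

    // Flush any remaining buffered rows as a final (possibly short) batch.
    fn finish(mut self) -> Vec<Vec<u64>> {
        if !self.buffer.is_empty() {
            self.completed.push(self.buffer);
        }
        self.completed
    }
}

fn main() {
    let mut c = Coalescer::new(2);
    c.push_with_filter(&[1, 2, 3], &[true, false, true]);
    c.push_with_filter(&[4, 5], &[true, true]);
    println!("{:?}", c.finish()); // prints [[1, 3], [4, 5]]
}
```

The real `BatchCoalescer` works on `RecordBatch`es and arrow arrays, but the buffering-and-flushing shape is the same.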
However, after replacing `concat` with `coalescer.push_batch_with_filter`, performance actually decreased.
code
```rust
pub(crate) fn apply_join_filter_to_indices(
    build_input_buffer: &RecordBatch,
    probe_batch: &RecordBatch,
    build_indices: UInt64Array,
    probe_indices: UInt32Array,
    filter: &JoinFilter,
    build_side: JoinSide,
    max_intermediate_size: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    if build_indices.is_empty() && probe_indices.is_empty() {
        return Ok((build_indices, probe_indices));
    };
    if let Some(max_size) = max_intermediate_size {
        let indices_schema = Arc::new(Schema::new(vec![
            Field::new("build_indices", arrow::datatypes::DataType::UInt64, false),
            Field::new("probe_indices", arrow::datatypes::DataType::UInt32, false),
        ]));
        let build_indices = Arc::new(build_indices);
        let probe_indices = Arc::new(probe_indices);
        let indices_batch = RecordBatch::try_new(
            indices_schema,
            vec![
                Arc::clone(&build_indices) as Arc<dyn Array>,
                Arc::clone(&probe_indices) as Arc<dyn Array>,
            ],
        )?;
        let mut coalescer =
            BatchCoalescer::new(indices_batch.schema(), indices_batch.num_rows());
        for i in (0..build_indices.len()).step_by(max_size) {
            let end = min(build_indices.len(), i + max_size);
            let len = end - i;
            let intermediate_batch = build_batch_from_indices(
                filter.schema(),
                build_input_buffer,
                probe_batch,
                &build_indices.slice(i, len),
                &probe_indices.slice(i, len),
                filter.column_indices(),
                build_side,
            )?;
            let filter_result = filter
                .expression()
                .evaluate(&intermediate_batch)?
                .into_array(intermediate_batch.num_rows())?;
            coalescer.push_batch_with_filter(
                indices_batch.slice(i, len),
                as_boolean_array(&filter_result)?,
            )?;
        }
        coalescer.finish_buffered_batch()?;
        let result = coalescer.next_completed_batch();
        if result.is_none() {
            return Ok((build_indices.slice(0, 0), probe_indices.slice(0, 0)));
        }
        if coalescer.has_completed_batch() {
            return internal_err!("should not have completed_batch");
        }
        let (_, arrays, _) = result.unwrap().into_parts();
        return Ok((
            downcast_array(arrays[0].as_ref()),
            downcast_array(arrays[1].as_ref()),
        ));
    }
    let intermediate_batch = build_batch_from_indices(
        filter.schema(),
        build_input_buffer,
        probe_batch,
        &build_indices,
        &probe_indices,
        filter.column_indices(),
        build_side,
    )?;
    let filter_result = filter
        .expression()
        .evaluate(&intermediate_batch)?
        .into_array(intermediate_batch.num_rows())?;
    let mask = as_boolean_array(&filter_result)?;
    let left_filtered = compute::filter(&build_indices, mask)?;
    let right_filtered = compute::filter(&probe_indices, mask)?;
    Ok((
        downcast_array(left_filtered.as_ref()),
        downcast_array(right_filtered.as_ref()),
    ))
}
```
bench result
| ID | SQL | join_limit_join_batch_size Time(s) | use_BatchCoalescer Time(s) | Performance Change |
|---|---|---|---|---|
| 1 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value; | 0.559 | 0.671 | 1.20x slower 🐌 |
| 2 | select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.377 | 0.371 | +1.02x faster 🚀 |
| 3 | select t1.value from range(8192) t1 left join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 0.363 | 0.363 | +1.00x faster 🚀 |
| 4 | select t1.value from range(8192) t1 join range(81920) t2 on t1.value + t2.value < t1.value * t2.value; | 1.556 | 2.031 | 1.30x slower 🐌 |
| 5 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value > t1.value * t2.value; | 0.063 | 0.057 | +1.11x faster 🚀 |
| 6 | select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value < t1.value * t2.value; | 0.153 | 0.194 | 1.27x slower 🐌 |

| SQL Query | join_limit_join_batch_size Memory | use_BatchCoalescer Memory | Improvement |
|---|---|---|---|
| select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value; | 1.57 GB | 2.31 GB | 1.47x more 🐌 |
| select t1.value from range(8192) t1 join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 841.5 MB | 824.9 MB | +1.02x saved 🚀 |
| select t1.value from range(8192) t1 left join range(8192) t2 on t1.value + t2.value > t1.value * t2.value; | 845.3 MB | 824.6 MB | +1.03x saved 🚀 |
| select t1.value from range(8192) t1 join range(81920) t2 on t1.value + t2.value < t1.value * t2.value; | 15.00 GB | 20.36 GB | 1.36x more 🐌 |
| select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value > t1.value * t2.value; | 328.1 MB | 327.6 MB | +1.00x saved 🚀 |
| select t1.value from range(100) t1 join range(819200) t2 on t1.value + t2.value < t1.value * t2.value; | 659.8 MB | 810.4 MB | 1.23x more 🐌 |
I think batch coalescer won't make this faster as this is buffering everything in memory anyway.
The main idea would be to apply the filters iteratively to the incoming RecordBatch instead of the indices, so we have to change the API / implementation a bit more.
It's fine to leave this as a future change.
Yes, that's expected, now we only got the interface ready, the efficient implementation is still WIP.
See https://docs.rs/arrow-select/55.2.0/src/arrow_select/coalesce.rs.html#194
2010YOUY01 left a comment
I think this PR's idea is great, and the implementation overall looks good to me.
I recommend documenting more of the high-level ideas in the key functions, to make this module easier to maintain in the future; specifically: `build_unmatched_output()`, `prepare_unmatched_output_indices()`, and `get_next_join_result()`.
```rust
.expression()
.evaluate(&intermediate_batch)?
.into_array(intermediate_batch.num_rows())?;
let filter_result = if let Some(max_size) = max_intermediate_size {
```
I believe this defeats the purpose of the pull request if we are still building the entire set of indices.
This PR only limits the size of the intermediate `record_batch`. The Cartesian product of the entire `left_table` and `right_batch` is still generated at once (this will be limited in a subsequent PR).
Additionally, making the Cartesian product step incremental likely requires a larger refactor (comparing to this PR), so it may be better suited for a separate PR.
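The chunking scheme this PR applies to the intermediate batch can be sketched like this (plain Rust; `process` is a stand-in for the real `build_batch_from_indices` plus filter evaluation): walk the matched index range in windows of at most `max_size` rows, so only one window's worth of rows is materialized at a time.

```rust
// Sketch of the chunking scheme: instead of materializing one huge
// intermediate batch, walk the index range in windows of at most
// `max_size` rows and process each window separately.
fn process_in_chunks(total: usize, max_size: usize, mut process: impl FnMut(usize, usize)) {
    for start in (0..total).step_by(max_size) {
        let end = std::cmp::min(total, start + max_size);
        process(start, end - start); // (offset, length), like Array::slice
    }
}

fn main() {
    let mut chunks = Vec::new();
    process_in_chunks(10, 4, |off, len| chunks.push((off, len)));
    println!("{:?}", chunks); // prints [(0, 4), (4, 4), (8, 2)]
}
```

This mirrors the `(0..build_indices.len()).step_by(max_size)` loop in the code above, which slices the index arrays by `(offset, length)` before building each intermediate batch.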
```rust
}
}

/// Tracks progress when building join result batches incrementally.
```
```suggestion
/// Tracks incremental output of join result batches.
///
/// Initialized with all matching pairs that satisfy the join predicate.
/// Pairs are stored as indices in `build_indices` and `probe_indices`.
/// Each poll outputs a batch within the configured size limit and updates
/// `processed_count` until all pairs are consumed.
///
/// Example: 5000 matches, batch size limit is 100
/// - Poll 1: output batch[0..100], processed_count = 100
/// - Poll 2: output batch[100..200], processed_count = 200
/// - ...continues until processed_count = 5000
```
It would be helpful to doc high-level ideas and examples, for key structs and functions.
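The incremental output described in the doc-comment example can be sketched as follows (illustrative types only; the real struct holds arrow index arrays rather than a `Vec` of pairs): each poll emits at most `limit` matched pairs and advances a cursor until everything is consumed.

```rust
// Illustrative sketch of the processed_count bookkeeping: each poll
// emits the next slice of matched pairs and advances the cursor.
struct JoinResultProgress {
    pairs: Vec<(u64, u32)>, // matched (build, probe) index pairs
    processed_count: usize,
    limit: usize,
}

impl JoinResultProgress {
    // Emit the next slice of pairs, or None once all are consumed.
    fn poll(&mut self) -> Option<&[(u64, u32)]> {
        if self.processed_count >= self.pairs.len() {
            return None;
        }
        let start = self.processed_count;
        let end = (start + self.limit).min(self.pairs.len());
        self.processed_count = end;
        Some(&self.pairs[start..end])
    }
}

fn main() {
    let mut p = JoinResultProgress {
        pairs: (0..5).map(|i| (i as u64, i as u32)).collect(),
        processed_count: 0,
        limit: 2,
    };
    let mut sizes = Vec::new();
    while let Some(batch) = p.poll() {
        sizes.push(batch.len());
    }
    println!("{:?}", sizes); // prints [2, 2, 1]
}
```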
```rust
match join_result {
    Some(res) => {
        self.join_metrics.output_batches.add(1);
        self.join_metrics.output_rows.add(res.num_rows());
```
We don't have to count `output_rows` here: it is automatically counted in the outer poll.
This was made possible by a recent change: #16500
While `output_batches` still needs to be manually tracked here, it could also be automatically counted in the future.
```rust
fn build_unmatched_output(
    &mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
    let start = Instant::now();
```
nit: I think we can just construct a timer guard here and let it stop on drop.
```rust
ProcessProbeBatch(RecordBatch),
/// Indicates that probe-side has been fully processed
ExhaustedProbeSide,
/// Output unmatched build-side rows
```
```suggestion
/// Output unmatched build-side rows.
/// The indices for rows to output have already been calculated in the previous
/// `ExhaustedProbeSide` state. In this state the final batch will be materialized
/// incrementally.
/// The inner `RecordBatch` is an empty dummy batch used to get the right schema.
```
Maybe we can also rename `ExhaustedProbeSide` to `PrepareUnmatchedBuildRows` to be more accurate.
I have addressed all of your comments. @2010YOUY01 please take another look.

Since the logic of …
I'm starting a second pass. I haven't fully grasped the internal logic of … yet; I think documenting the semantics of left/right indices for different join types can help readability, like …
@2010YOUY01 Special types need to return only the matching rows, so only one side needs to return rows while the other side can return a null array and not be projected in the final result. This functionality was only moved and had already existed before this pull request, where most of it sits in …
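As a toy illustration of the "only one side returns rows" point (plain Rust, not the DataFusion implementation): a LeftSemi result reduces to the distinct build-side indices that matched at least once, and the probe side is never projected.

```rust
use std::collections::BTreeSet;

// Hedged sketch: a LeftSemi join output is just the distinct build-side
// indices that matched at least once, deduplicated and kept sorted here
// via a BTreeSet.
fn left_semi_indices(matched_build_indices: &[u64]) -> Vec<u64> {
    matched_build_indices
        .iter()
        .copied()
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}

fn main() {
    // Build rows 0, 1, 2 each matched at least once; duplicates collapse.
    println!("{:?}", left_semi_indices(&[2, 0, 2, 1])); // prints [0, 1, 2]
}
```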
LGTM, thanks again. 🙏🏼
I’ll keep it open for a few more days in case other reviewers have additional concerns they'd like to raise.
```rust
let current_start = *start;

if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 {
```
I don't get this `status.processed_count = 1` logic either; perhaps you can add a quick comment to explain it?
```rust
    return Ok(Some(res));
}

if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) {
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think from here to the end of the function, it would look nicer if we structured it like this:

```rust
match self.join_type {
    JoinType::RightSemi | JoinType::RightAnti => { ... }
    JoinType::RightMark => { ... }
    JoinType::...(others) => { ... }
    _ => unreachable!(),
}
```
I'd prefer to stick with the current implementation. The reason is that the code block from L925 to L939 is shared by several `JoinType`s, including `RightMark`, `Inner`, `LeftSemi`, etc.
If we refactored this into the match structure as suggested, we would have to duplicate that block of logic in multiple match arms.
```rust
/// - Poll 1: output batch[0..100], processed_count = 100
/// - Poll 2: output batch[100..200], processed_count = 200
/// - ...continues until processed_count = 5000
struct JoinResultStatus {
```
nit: `Status` is most commonly used for error codes / state flags; perhaps we can use `JoinResultProgress` here to avoid confusion?
done
Is this one ready to merge?
@alamb Yes, I believe all comments have been addressed. I think we have two notable follow-ups: …
Awesome -- thanks @jonathanc-n and @UBarney -- I am very happy to see this moving along!
Yes, I'll work on this very soon.

I'm happy to include this benchmark in the bench suite this week, unless you were already planning to add it yourself @UBarney.

That would be fantastic, thank you! I hadn't planned on adding it myself, so your help is much appreciated. Please go right ahead.


Which issue does this PR close?
part of #16364
Rationale for this change
see issue
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?