From 13c09f3f2f23688e3cdca125659bd1673888a2fb Mon Sep 17 00:00:00 2001 From: kosiew Date: Sat, 4 Apr 2026 14:48:24 +0800 Subject: [PATCH] Skip probe-side consumption when hash join build side is empty (#21068) ## Which issue does this PR close? * Closes #20492. ## Rationale for this change `HashJoinExec` currently continues polling and consuming the probe side even after the build side has completed with zero rows. For join types whose output is guaranteed to be empty when the build side is empty, this work is unnecessary. In practice, it can trigger large avoidable scans and extra compute despite producing no output. This is especially costly for cases such as INNER, LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, and RIGHT SEMI joins. This change makes the stream state machine aware of that condition so execution can terminate as soon as the build side is known to be empty and no probe rows are needed to determine the final result. The change also preserves the existing behavior for join types that still require probe-side rows even when the build side is empty, such as RIGHT, FULL, RIGHT ANTI, and RIGHT MARK joins. ## What changes are included in this PR? * Added `JoinType::empty_build_side_produces_empty_result` to centralize logic determining when an empty build side guarantees empty output. * Updated `HashJoinStream` state transitions to: * Skip transitioning to `FetchProbeBatch` when the build side is empty and output is deterministically empty. * Immediately complete the stream in such cases. * Refactored logic in `build_batch_empty_build_side` to reuse the new helper method and simplify match branches. * Ensured probe-side consumption still occurs for join types that require probe rows (e.g., RIGHT, FULL). * Added helper `state_after_build_ready` to unify post-build decision logic. * Introduced reusable helper for constructing hash joins with dynamic filters in tests. ## Are these changes tested? Yes, comprehensive tests have been added: * Verified that probe side is **not consumed** when: * Build side is empty * Join type guarantees empty output * Verified that probe side **is still consumed** when required by join semantics (e.g., RIGHT, FULL joins) * Covered both filtered and non-filtered joins * Added tests ensuring correct behavior with dynamic filters * Added regression test ensuring correct behavior after partition bounds reporting These tests validate both correctness and the intended optimization behavior. ## Are there any user-facing changes? No API changes. However, this introduces a performance optimization: * Queries involving joins with empty build sides may complete significantly faster * Reduced unnecessary IO and compute No behavioral changes in query results. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested. (cherry picked from commit 6c5e241e6298e70077259b3a12840c3adab3c810) --- datafusion/common/src/join_type.rs | 14 ++ .../physical-plan/src/joins/hash_join/exec.rs | 237 ++++++++++++++---- .../src/joins/hash_join/stream.rs | 30 ++- datafusion/physical-plan/src/joins/utils.rs | 62 ++--- 4 files changed, 260 insertions(+), 83 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index e6a90db2dc3e..50a4d88f3f2c 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -113,6 +113,20 @@ impl JoinType { | JoinType::RightMark ) } + + /// Returns true when an empty build side necessarily produces an empty + /// result for this join type. + pub fn empty_build_side_produces_empty_result(self) -> bool { + matches!( + self, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) + } } impl Display for JoinType { diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 25b320f98550..781f7ce879b9 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2215,6 +2215,110 @@ mod tests { ) } + fn empty_build_with_probe_error_inputs() + -> (Arc, Arc, JoinOn) { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); + + let err = exec_err!("bad data error"); + let right_batch = + build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let right_schema = right_batch.schema(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _, + )]; + let right: Arc = Arc::new( + MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false), + ); + + (left, right, on) + } + + async fn assert_empty_build_probe_behavior( + join_types: &[JoinType], + expect_probe_error: bool, + with_filter: bool, + ) { + let (left, right, on) = empty_build_with_probe_error_inputs(); + let filter = prepare_join_filter(); + + for join_type in join_types { + let join = if with_filter { + join_with_filter( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + filter.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + } else { + join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + }; + + let result = common::collect( + join.execute(0, Arc::new(TaskContext::default())).unwrap(), + ) + .await; + + if expect_probe_error { + let result_string = result.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } else { + let batches = result.unwrap(); + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + } + + fn hash_join_with_dynamic_filter( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + build_accumulator: OnceLock::new(), + }); + + Ok((join, dynamic_filter)) + } + async fn join_collect( left: Arc, right: Arc, @@ -4923,6 +5027,70 @@ mod tests { } } + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output() { + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + false, + ) + .await; + } + + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + true, + ) + .await; + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + false, + ) + .await; + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + true, + ) + .await; + } + #[tokio::test] async fn join_split_batch() { let left = build_table( @@ -5366,26 +5534,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5393,7 +5543,7 @@ mod tests { // After the join completes, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } @@ -5415,26 +5565,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5442,7 +5574,28 @@ mod tests { // Even with empty build side, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; + + Ok(()) + } + + #[tokio::test] + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() + -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let (left, right, on) = empty_build_with_probe_error_inputs(); + + // Keep an extra consumer reference so execute() enables dynamic filter pushdown + // and enters the WaitPartitionBoundsReport path before deciding whether to poll + // the probe side. + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + assert!(batches.is_empty()); + + dynamic_filter.wait_complete().await; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index b31982ea3b7b..d64b35a4dd24 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -406,6 +406,21 @@ impl HashJoinStream { } } + /// Returns the next state after the build side has been fully collected + /// and any required build-side coordination has completed. + fn state_after_build_ready( + join_type: JoinType, + left_data: &JoinLeftData, + ) -> HashJoinStreamState { + if left_data.map().is_empty() + && join_type.empty_build_side_produces_empty_result() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + } + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -469,7 +484,9 @@ impl HashJoinStream { if let Some(ref mut fut) = self.build_waiter { ready!(fut.get_shared(cx))?; } - self.state = HashJoinStreamState::FetchProbeBatch; + let build_side = self.build_side.try_as_ready()?; + self.state = + Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -540,7 +557,8 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = HashJoinStreamState::FetchProbeBatch; + self.state = + Self::state_after_build_ready(self.join_type, left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); @@ -643,10 +661,14 @@ impl HashJoinStream { } } - // if the left side is empty, we can skip the (potentially expensive) join operation + // If the build side is empty, this stream only reaches ProcessProbeBatch for + // join types whose output still depends on probe rows. let is_empty = build_side.left_data.map().is_empty(); - if is_empty && self.filter.is_none() { + if is_empty { + // Invariant: state_after_build_ready should have already completed + // join types whose result is fixed to empty when the build side is empty. + debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf4bf2cd163f..56ab8366cfd8 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1060,47 +1060,35 @@ pub(crate) fn build_batch_empty_build_side( column_indices: &[ColumnIndex], join_type: JoinType, ) -> Result { - match join_type { - // these join types only return data if the left side is not empty, so we return an - // empty RecordBatch - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::RightSemi - | JoinType::LeftAnti - | JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))), + if join_type.empty_build_side_produces_empty_result() { + // These join types only return data if the left side is not empty. + return Ok(RecordBatch::new_empty(Arc::new(schema.clone()))); + } - // the remaining joins will return data for the right columns and null for the left ones - JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => { - let num_rows = probe_batch.num_rows(); - if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); - } - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; + // The remaining joins return right-side rows and nulls for the left side. + let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); + } - columns.push(array); + let columns = column_indices + .iter() + .map(|column_index| match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => { + Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)) } + }) + .collect(); - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) - } - } + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } /// The input is the matched indices for left and right and