From d89e9d0947c22718078b2c0cf020cadfdf75c098 Mon Sep 17 00:00:00 2001 From: kosiew Date: Sat, 4 Apr 2026 14:48:24 +0800 Subject: [PATCH 1/2] Skip probe-side consumption when hash join build side is empty (#21068) * Closes #20492. `HashJoinExec` currently continues polling and consuming the probe side even after the build side has completed with zero rows. For join types whose output is guaranteed to be empty when the build side is empty, this work is unnecessary. In practice, it can trigger large avoidable scans and extra compute despite producing no output. This is especially costly for cases such as INNER, LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, and RIGHT SEMI joins. This change makes the stream state machine aware of that condition so execution can terminate as soon as the build side is known to be empty and no probe rows are needed to determine the final result. The change also preserves the existing behavior for join types that still require probe-side rows even when the build side is empty, such as RIGHT, FULL, RIGHT ANTI, and RIGHT MARK joins. * Added `JoinType::empty_build_side_produces_empty_result` to centralize logic determining when an empty build side guarantees empty output. * Updated `HashJoinStream` state transitions to: * Skip transitioning to `FetchProbeBatch` when the build side is empty and output is deterministically empty. * Immediately complete the stream in such cases. * Refactored logic in `build_batch_empty_build_side` to reuse the new helper method and simplify match branches. * Ensured probe-side consumption still occurs for join types that require probe rows (e.g., RIGHT, FULL). * Added helper `state_after_build_ready` to unify post-build decision logic. * Introduced reusable helper for constructing hash joins with dynamic filters in tests. 
Testing: comprehensive tests have been added: * Verified that probe side is **not consumed** when: * Build side is empty * Join type guarantees empty output * Verified that probe side **is still consumed** when required by join semantics (e.g., RIGHT, FULL joins) * Covered both filtered and non-filtered joins * Added tests ensuring correct behavior with dynamic filters * Added regression test ensuring correct behavior after partition bounds reporting These tests validate both correctness and the intended optimization behavior. User-facing changes: no breaking API changes (one public helper, `JoinType::empty_build_side_produces_empty_result`, is added). However, this introduces a performance optimization: * Queries involving joins with empty build sides may complete significantly faster * Reduced unnecessary IO and compute No behavioral changes in query results. This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested. (cherry picked from commit 6c5e241e6298e70077259b3a12840c3adab3c810) --- datafusion/common/src/join_type.rs | 14 ++ .../physical-plan/src/joins/hash_join/exec.rs | 235 +++++++++++++++--- .../src/joins/hash_join/stream.rs | 70 +++++- datafusion/physical-plan/src/joins/utils.rs | 61 ++--- 4 files changed, 301 insertions(+), 79 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index e6a90db2dc3e..50a4d88f3f2c 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -113,6 +113,20 @@ impl JoinType { | JoinType::RightMark ) } + + /// Returns true when an empty build side necessarily produces an empty + /// result for this join type. 
+ pub fn empty_build_side_produces_empty_result(self) -> bool { + matches!( + self, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) + } } impl Display for JoinType { diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index eb2dacf58aa7..ed164b0a0761 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1655,6 +1655,110 @@ mod tests { ) } + fn empty_build_with_probe_error_inputs() + -> (Arc, Arc, JoinOn) { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); + + let err = exec_err!("bad data error"); + let right_batch = + build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let right_schema = right_batch.schema(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _, + )]; + let right: Arc = Arc::new( + MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false), + ); + + (left, right, on) + } + + async fn assert_empty_build_probe_behavior( + join_types: &[JoinType], + expect_probe_error: bool, + with_filter: bool, + ) { + let (left, right, on) = empty_build_with_probe_error_inputs(); + let filter = prepare_join_filter(); + + for join_type in join_types { + let join = if with_filter { + join_with_filter( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + filter.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + } else { + join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + }; + + let result = 
common::collect( + join.execute(0, Arc::new(TaskContext::default())).unwrap(), + ) + .await; + + if expect_probe_error { + let result_string = result.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } else { + let batches = result.unwrap(); + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + } + + fn hash_join_with_dynamic_filter( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + build_accumulator: OnceLock::new(), + }); + + Ok((join, dynamic_filter)) + } + async fn join_collect( left: Arc, right: Arc, @@ -4187,6 +4291,70 @@ mod tests { } } + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output() { + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + false, + ) + .await; + } + + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + true, + ) + .await; + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + false, + ) + .await; + } + + #[tokio::test] + async fn 
join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + true, + ) + .await; + } + #[tokio::test] async fn join_split_batch() { let left = build_table( @@ -4629,25 +4797,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -4655,7 +4806,7 @@ mod tests { // After the join completes, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } @@ -4677,25 +4828,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) 
as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -4703,7 +4837,28 @@ mod tests { // Even with empty build side, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; + + Ok(()) + } + + #[tokio::test] + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() + -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let (left, right, on) = empty_build_with_probe_error_inputs(); + + // Keep an extra consumer reference so execute() enables dynamic filter pushdown + // and enters the WaitPartitionBoundsReport path before deciding whether to poll + // the probe side. 
+ let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + assert!(batches.is_empty()); + + dynamic_filter.wait_complete().await; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index c5c794f5a8c6..82f4b3b3f66b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -404,6 +404,21 @@ impl HashJoinStream { } } + /// Returns the next state after the build side has been fully collected + /// and any required build-side coordination has completed. + fn state_after_build_ready( + join_type: JoinType, + left_data: &JoinLeftData, + ) -> HashJoinStreamState { + if left_data.map().is_empty() + && join_type.empty_build_side_produces_empty_result() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + } + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -462,7 +477,9 @@ impl HashJoinStream { if let Some(ref mut fut) = self.build_waiter { ready!(fut.get_shared(cx))?; } - self.state = HashJoinStreamState::FetchProbeBatch; + let build_side = self.build_side.try_as_ready()?; + self.state = + Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -529,7 +546,8 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = HashJoinStreamState::FetchProbeBatch; + self.state = + Self::state_after_build_ready(self.join_type, left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); @@ -588,8 +606,52 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); - // if the left side is empty, we 
can skip the (potentially expensive) join operation - if build_side.left_data.hash_map.is_empty() && self.filter.is_none() { + // Null-aware anti join semantics: + // For LeftAnti: output LEFT (build) rows where LEFT.key NOT IN RIGHT.key + // 1. If RIGHT (probe) contains NULL in any batch, no LEFT rows should be output + // 2. LEFT rows with NULL keys should not be output (handled in final stage) + if self.null_aware { + // Mark that we've seen a probe batch with actual rows (probe side is non-empty) + // Only set this if batch has rows - empty batches don't count + // Use shared atomic state so all partitions can see this global information + if state.batch.num_rows() > 0 { + build_side + .left_data + .probe_side_non_empty + .store(true, Ordering::Relaxed); + } + + // Check if probe side (RIGHT) contains NULL + // Since null_aware validation ensures single column join, we only check the first column + let probe_key_column = &state.values[0]; + if probe_key_column.null_count() > 0 { + // Found NULL in probe side - set shared flag to prevent any output + build_side + .left_data + .probe_side_has_null + .store(true, Ordering::Relaxed); + } + + // If probe side has NULL (detected in this or any other partition), return empty result + if build_side + .left_data + .probe_side_has_null + .load(Ordering::Relaxed) + { + timer.done(); + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } + } + + // If the build side is empty, this stream only reaches ProcessProbeBatch for + // join types whose output still depends on probe rows. + let is_empty = build_side.left_data.map().is_empty(); + + if is_empty { + // Invariant: state_after_build_ready should have already completed + // join types whose result is fixed to empty when the build side is empty. 
+ debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 53b4c4f80236..139df78ce199 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1057,44 +1057,35 @@ pub(crate) fn build_batch_empty_build_side( column_indices: &[ColumnIndex], join_type: JoinType, ) -> Result { - match join_type { - // these join types only return data if the left side is not empty, so we return an - // empty RecordBatch - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::RightSemi - | JoinType::LeftAnti - | JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))), - - // the remaining joins will return data for the right columns and null for the left ones - JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => { - let num_rows = probe_batch.num_rows(); - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; + if join_type.empty_build_side_produces_empty_result() { + // These join types only return data if the left side is not empty. + return Ok(RecordBatch::new_empty(Arc::new(schema.clone()))); + } + + // The remaining joins return right-side rows and nulls for the left side. 
+ let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); + } - columns.push(array); + let columns = column_indices + .iter() + .map(|column_index| match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => { + Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)) } + }) + .collect(); - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) - } - } + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } /// The input is the matched indices for left and right and From 6b9fbb838f8e41128cd57b181ba6c353a1b12305 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 6 Apr 2026 10:48:54 +0200 Subject: [PATCH 2/2] Fix cherry-pick of #21068: remove null-aware code and fix missing helpers The cherry-pick of apache PR #21068 incorrectly included null-aware anti-join code (referencing nonexistent fields `null_aware`, `probe_side_non_empty`, `probe_side_has_null` on `HashJoinStream`/ `JoinLeftData`) from a different PR. 
Also fixes: - `.map()` -> `.hash_map()` to match this branch's `JoinLeftData` API - Replace `new_empty_schema_batch()` (undefined in this branch) with an inline `RecordBatch::try_new_with_options` equivalent Co-Authored-By: Claude Sonnet 4.6 --- .../physical-plan/src/joins/hash_join/exec.rs | 1 - .../src/joins/hash_join/stream.rs | 42 +------------------ datafusion/physical-plan/src/joins/utils.rs | 7 +++- 3 files changed, 8 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index ed164b0a0761..3604b6cebdee 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1749,7 +1749,6 @@ mod tests { None, PartitionMode::CollectLeft, NullEquality::NullEqualsNothing, - false, )?; join.dynamic_filter = Some(HashJoinExecDynamicFilter { filter: Arc::clone(&dynamic_filter), diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 82f4b3b3f66b..fd299e07e6fd 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -410,7 +410,7 @@ impl HashJoinStream { join_type: JoinType, left_data: &JoinLeftData, ) -> HashJoinStreamState { - if left_data.map().is_empty() + if left_data.hash_map().is_empty() && join_type.empty_build_side_produces_empty_result() { HashJoinStreamState::Completed @@ -606,47 +606,9 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); - // Null-aware anti join semantics: - // For LeftAnti: output LEFT (build) rows where LEFT.key NOT IN RIGHT.key - // 1. If RIGHT (probe) contains NULL in any batch, no LEFT rows should be output - // 2. 
LEFT rows with NULL keys should not be output (handled in final stage) - if self.null_aware { - // Mark that we've seen a probe batch with actual rows (probe side is non-empty) - // Only set this if batch has rows - empty batches don't count - // Use shared atomic state so all partitions can see this global information - if state.batch.num_rows() > 0 { - build_side - .left_data - .probe_side_non_empty - .store(true, Ordering::Relaxed); - } - - // Check if probe side (RIGHT) contains NULL - // Since null_aware validation ensures single column join, we only check the first column - let probe_key_column = &state.values[0]; - if probe_key_column.null_count() > 0 { - // Found NULL in probe side - set shared flag to prevent any output - build_side - .left_data - .probe_side_has_null - .store(true, Ordering::Relaxed); - } - - // If probe side has NULL (detected in this or any other partition), return empty result - if build_side - .left_data - .probe_side_has_null - .load(Ordering::Relaxed) - { - timer.done(); - self.state = HashJoinStreamState::FetchProbeBatch; - return Ok(StatefulStreamResult::Continue); - } - } - // If the build side is empty, this stream only reaches ProcessProbeBatch for // join types whose output still depends on probe rows. - let is_empty = build_side.left_data.map().is_empty(); + let is_empty = build_side.left_data.hash_map().is_empty(); if is_empty { // Invariant: state_after_build_ready should have already completed diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 139df78ce199..812a52957aa6 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1065,7 +1065,12 @@ pub(crate) fn build_batch_empty_build_side( // The remaining joins return right-side rows and nulls for the left side. 
let num_rows = probe_batch.num_rows(); if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); + return RecordBatch::try_new_with_options( + Arc::new(schema.clone()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(num_rows)), + ) + .map_err(Into::into); } let columns = column_indices