From 39c22f5acd0afb8276c9611a7a2510be78a0df3e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:27:32 +0800 Subject: [PATCH 01/12] Optimize HashJoinStream for empty build side Implement a staged mini-plan for HashJoinStream to immediately exit when the build side is empty and the join type's result is fully determined. This change avoids unnecessary entry into FetchProbeBatch for Inner, Left, LeftSemi, LeftAnti, LeftMark, and RightSemi joins without filters. Add tests to verify join behavior with empty build: - join_does_not_consume_probe_when_empty_build_fixes_output - join_still_consumes_probe_when_empty_build_needs_probe_rows These use MockExec to distinguish between short-circuiting and necessary probe row consumption. --- .../physical-plan/src/joins/hash_join/exec.rs | 103 ++++++++++++++++++ .../src/joins/hash_join/stream.rs | 32 +++++- 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 038eb96b7b45e..9eb70949c37b3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4983,6 +4983,109 @@ mod tests { } } + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output() { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + + let err = exec_err!("bad data error"); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, + )]; + let schema = right.schema(); + let right_input = + Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + 
) + .unwrap(); + + let join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ]; + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right_input) as Arc, + on.clone(), + &join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + let task_ctx = Arc::new(TaskContext::default()); + + let stream = join.execute(0, task_ctx).unwrap(); + let batches = common::collect(stream).await.unwrap(); + + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + + let err = exec_err!("bad data error"); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, + )]; + let schema = right.schema(); + let right_input = + Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); + + let join_types = vec![ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ]; + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right_input) as Arc, + on.clone(), + &join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + let task_ctx = Arc::new(TaskContext::default()); + + let stream = join.execute(0, task_ctx).unwrap(); + let result_string = common::collect(stream).await.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } + } + 
#[tokio::test] async fn join_split_batch() { let left = build_table( diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index ab630920184d3..c8c88b519fc8c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -406,6 +406,21 @@ impl HashJoinStream { } } + /// Returns true when an empty build side fully determines the join result, + /// so the probe side does not need to be consumed. + fn can_skip_probe_on_empty_build_side(&self) -> bool { + self.filter.is_none() + && matches!( + self.join_type, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -469,7 +484,14 @@ impl HashJoinStream { if let Some(ref mut fut) = self.build_waiter { ready!(fut.get_shared(cx))?; } - self.state = HashJoinStreamState::FetchProbeBatch; + let build_side = self.build_side.try_as_ready()?; + self.state = if build_side.left_data.map().is_empty() + && self.can_skip_probe_on_empty_build_side() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + }; Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -540,7 +562,13 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = HashJoinStreamState::FetchProbeBatch; + self.state = if left_data.map().is_empty() + && self.can_skip_probe_on_empty_build_side() + { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + }; } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); From 8e4c6eb9dc503cf4fa51664525935440e5e6a2be Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:32:04 +0800 Subject: [PATCH 02/12] Refactor post-build 
transition handling Extract duplicate post-build transition logic into next_state_after_build_ready in stream.rs. This centralizes the decision between Completed and FetchProbeBatch in one location and streamlines both collect_build_side and wait_for_partition_bounds_report to use the new helper function. --- .../src/joins/hash_join/stream.rs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index c8c88b519fc8c..650aae5cb4730 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -421,6 +421,16 @@ impl HashJoinStream { ) } + /// Returns the next state after the build side has been fully collected + /// and any required build-side coordination has completed. + fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + if left_data.map().is_empty() && self.can_skip_probe_on_empty_build_side() { + HashJoinStreamState::Completed + } else { + HashJoinStreamState::FetchProbeBatch + } + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -485,13 +495,7 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = if build_side.left_data.map().is_empty() - && self.can_skip_probe_on_empty_build_side() - { - HashJoinStreamState::Completed - } else { - HashJoinStreamState::FetchProbeBatch - }; + self.state = self.next_state_after_build_ready(build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -562,13 +566,7 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = if left_data.map().is_empty() - && self.can_skip_probe_on_empty_build_side() - { - HashJoinStreamState::Completed - } else { 
- HashJoinStreamState::FetchProbeBatch - }; + self.state = self.next_state_after_build_ready(left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); From dcb4cd920d84adb38912a4d62cb66c6cac06287b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:33:50 +0800 Subject: [PATCH 03/12] Refactor join type logic into utils.rs Move the pure JoinType semantic rule to utils.rs, placing it alongside the existing join behavior helpers. Update HashJoinStream in stream.rs to keep only its stream-specific concern: it still checks that no join filter is present, but delegates the join-type rule to the shared helper instead of matching on JoinType itself. --- .../src/joins/hash_join/stream.rs | 22 +++++-------------- datafusion/physical-plan/src/joins/utils.rs | 14 ++++++++++++ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 650aae5cb4730..523db10c83559 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - need_produce_result_in_final, + can_skip_probe_on_empty_build_side, need_produce_result_in_final, }, }; @@ -406,25 +406,13 @@ impl HashJoinStream { } } - /// Returns true when an empty build side fully determines the join result, - /// so the probe side does not need to be consumed. - fn can_skip_probe_on_empty_build_side(&self) -> bool { - self.filter.is_none() - && matches!( - self.join_type, - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti - | JoinType::LeftMark - | JoinType::RightSemi - ) - } - /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. 
fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { - if left_data.map().is_empty() && self.can_skip_probe_on_empty_build_side() { + if left_data.map().is_empty() + && self.filter.is_none() + && can_skip_probe_on_empty_build_side(self.join_type) + { HashJoinStreamState::Completed } else { HashJoinStreamState::FetchProbeBatch diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 3130134e253d9..ccb09e17f9b7b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -855,6 +855,20 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { ) } +/// Returns true when an empty build side fully determines the join result, +/// so the probe side does not need to be consumed. +pub(crate) fn can_skip_probe_on_empty_build_side(join_type: JoinType) -> bool { + matches!( + join_type, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) +} + pub(crate) fn get_final_indices_from_shared_bitmap( shared_bitmap: &SharedBitmapBuilder, join_type: JoinType, From 072cbea6cb4acf7c18d83d1195cc0c87bb727b36 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:36:35 +0800 Subject: [PATCH 04/12] Refactor test setup for shared empty build fixtures Extract shared empty-build/probe-error test setup into a new function, empty_build_with_probe_error_inputs(), in exec.rs. Both regression tests now reuse this setup, allowing each test to focus more on the join-type behavior it asserts rather than rebuilding the same fixture. 
--- .../physical-plan/src/joins/hash_join/exec.rs | 68 +++++++------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 9eb70949c37b3..752678ca050af 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2275,6 +2275,28 @@ mod tests { ) } + fn empty_build_with_probe_error_inputs() -> (Arc, Arc, JoinOn) { + let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let left_schema = left_batch.schema(); + let left: Arc = + TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) + .unwrap(); + + let err = exec_err!("bad data error"); + let right_batch = + build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let right_schema = right_batch.schema(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _, + )]; + let right: Arc = Arc::new( + MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false), + ); + + (left, right, on) + } + async fn join_collect( left: Arc, right: Arc, @@ -4985,26 +5007,7 @@ mod tests { #[tokio::test] async fn join_does_not_consume_probe_when_empty_build_fixes_output() { - let left_batch = - build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); - let left_schema = left_batch.schema(); - - let err = exec_err!("bad data error"); - let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); - - let on = vec![( - Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, - Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, - )]; - let schema = right.schema(); - let right_input = - Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); - let left: Arc = 
TestMemoryExec::try_new_exec( - &[vec![left_batch]], - Arc::clone(&left_schema), - None, - ) - .unwrap(); + let (left, right_input, on) = empty_build_with_probe_error_inputs(); let join_types = vec![ JoinType::Inner, @@ -5018,7 +5021,7 @@ mod tests { for join_type in join_types { let join = join( Arc::clone(&left), - Arc::clone(&right_input) as Arc, + Arc::clone(&right_input), on.clone(), &join_type, NullEquality::NullEqualsNothing, @@ -5038,26 +5041,7 @@ mod tests { #[tokio::test] async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { - let left_batch = - build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); - let left_schema = left_batch.schema(); - - let err = exec_err!("bad data error"); - let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); - - let on = vec![( - Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _, - Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _, - )]; - let schema = right.schema(); - let right_input = - Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false)); - let left: Arc = TestMemoryExec::try_new_exec( - &[vec![left_batch]], - Arc::clone(&left_schema), - None, - ) - .unwrap(); + let (left, right_input, on) = empty_build_with_probe_error_inputs(); let join_types = vec![ JoinType::Right, @@ -5069,7 +5053,7 @@ mod tests { for join_type in join_types { let join = join( Arc::clone(&left), - Arc::clone(&right_input) as Arc, + Arc::clone(&right_input), on.clone(), &join_type, NullEquality::NullEqualsNothing, From 30f2a3e30829d2b8855fe3235359248c7aed594f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:41:29 +0800 Subject: [PATCH 05/12] Add regression test for empty build side in hash join Implement test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report in exec.rs. Ensure that dynamic filtering is enabled by keeping a consumer reference alive. 
Verify that an Inner join with an empty build side correctly skips probe consumption, even when passing through the WaitPartitionBoundsReport path. --- .../physical-plan/src/joins/hash_join/exec.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 752678ca050af..1e3d221bff7b3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -5594,6 +5594,43 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report( + ) -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let (left, right, on) = empty_build_with_probe_error_inputs(); + + // Keep an extra consumer reference so execute() enables dynamic filter pushdown + // and enters the WaitPartitionBoundsReport path before deciding whether to poll + // the probe side. 
+ let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + build_accumulator: OnceLock::new(), + }); + + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + assert!(batches.is_empty()); + + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } + #[tokio::test] async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> { let task_ctx = prepare_task_ctx(8192, true); From 626f7710683dc13e9e96bc1c59e3e74826a43156 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 10:59:35 +0800 Subject: [PATCH 06/12] Consolidate empty-build probe tests and join setup Refactor exec.rs by consolidating empty-build probe-behavior tests into `assert_empty_build_probe_behavior(...)` and repeated dynamic filter join setup into `hash_join_with_dynamic_filter(...)`. Maintain existing runtime logic while reducing duplicate test boilerplate and redundant local setup for improved clarity and maintainability. 
--- .../physical-plan/src/joins/hash_join/exec.rs | 222 ++++++++---------- .../src/joins/hash_join/stream.rs | 5 +- 2 files changed, 104 insertions(+), 123 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 1e3d221bff7b3..74989fd7bd642 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2275,8 +2275,10 @@ mod tests { ) } - fn empty_build_with_probe_error_inputs() -> (Arc, Arc, JoinOn) { - let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + fn empty_build_with_probe_error_inputs() + -> (Arc, Arc, JoinOn) { + let left_batch = + build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); let left_schema = left_batch.schema(); let left: Arc = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) @@ -2297,6 +2299,69 @@ mod tests { (left, right, on) } + async fn assert_empty_build_probe_behavior( + join_types: &[JoinType], + expect_probe_error: bool, + ) { + let (left, right, on) = empty_build_with_probe_error_inputs(); + + for join_type in join_types { + let join = join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + + let result = common::collect( + join.execute(0, Arc::new(TaskContext::default())).unwrap(), + ) + .await; + + if expect_probe_error { + let result_string = result.unwrap_err().to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } else { + let batches = result.unwrap(); + assert!( + batches.is_empty(), + "expected no output batches for {join_type}, got {batches:?}" + ); + } + } + } + + fn hash_join_with_dynamic_filter( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let mut join = 
HashJoinExec::try_new( + left, + right, + on, + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + build_accumulator: OnceLock::new(), + }); + + Ok((join, dynamic_filter)) + } + async fn join_collect( left: Arc, right: Arc, @@ -5007,67 +5072,32 @@ mod tests { #[tokio::test] async fn join_does_not_consume_probe_when_empty_build_fixes_output() { - let (left, right_input, on) = empty_build_with_probe_error_inputs(); - - let join_types = vec![ - JoinType::Inner, - JoinType::Left, - JoinType::LeftSemi, - JoinType::LeftAnti, - JoinType::LeftMark, - JoinType::RightSemi, - ]; - - for join_type in join_types { - let join = join( - Arc::clone(&left), - Arc::clone(&right_input), - on.clone(), - &join_type, - NullEquality::NullEqualsNothing, - ) - .unwrap(); - let task_ctx = Arc::new(TaskContext::default()); - - let stream = join.execute(0, task_ctx).unwrap(); - let batches = common::collect(stream).await.unwrap(); - - assert!( - batches.is_empty(), - "expected no output batches for {join_type}, got {batches:?}" - ); - } + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + ) + .await; } #[tokio::test] async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() { - let (left, right_input, on) = empty_build_with_probe_error_inputs(); - - let join_types = vec![ - JoinType::Right, - JoinType::Full, - JoinType::RightAnti, - JoinType::RightMark, - ]; - - for join_type in join_types { - let join = join( - Arc::clone(&left), - Arc::clone(&right_input), - on.clone(), - &join_type, - NullEquality::NullEqualsNothing, - ) - .unwrap(); - let task_ctx = Arc::new(TaskContext::default()); - - let stream = join.execute(0, task_ctx).unwrap(); - let result_string = 
common::collect(stream).await.unwrap_err().to_string(); - assert!( - result_string.contains("bad data error"), - "actual: {result_string}" - ); - } + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + ) + .await; } #[tokio::test] @@ -5513,26 +5543,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5540,7 +5552,7 @@ mod tests { // After the join completes, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } @@ -5562,26 +5574,8 @@ mod tests { Arc::new(Column::new_with_schema("b1", &right.schema())?) 
as _, )]; - // Create a dynamic filter manually - let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - // Create HashJoinExec with the dynamic filter - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; // Execute the join let stream = join.execute(0, task_ctx)?; @@ -5589,44 +5583,28 @@ mod tests { // Even with empty build side, the dynamic filter should be marked as complete // wait_complete() should return immediately - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } #[tokio::test] - async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report( - ) -> Result<()> { + async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() + -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let (left, right, on) = empty_build_with_probe_error_inputs(); // Keep an extra consumer reference so execute() enables dynamic filter pushdown // and enters the WaitPartitionBoundsReport path before deciding whether to poll // the probe side. 
- let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); - let dynamic_filter_clone = Arc::clone(&dynamic_filter); - - let mut join = HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - )?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }); + let (join, dynamic_filter) = + hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?; let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; assert!(batches.is_empty()); - dynamic_filter_clone.wait_complete().await; + dynamic_filter.wait_complete().await; Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 523db10c83559..4609e198666e4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,7 +408,10 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. - fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + fn next_state_after_build_ready( + &self, + left_data: &JoinLeftData, + ) -> HashJoinStreamState { if left_data.map().is_empty() && self.filter.is_none() && can_skip_probe_on_empty_build_side(self.join_type) From 45ee6d17929fb614a49839322a4f0697faaaa0cb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 19 Mar 2026 22:40:11 +0800 Subject: [PATCH 07/12] Refactor empty build side handling Remove duplication by introducing a shared helper, empty_build_side_produces_empty_result, in utils.rs. Update build_batch_empty_build_side to use this helper directly, ensuring alignment in the short-circuit and batch-construction logic within the hash join state transition in stream.rs. 
--- .../src/joins/hash_join/stream.rs | 4 +- datafusion/physical-plan/src/joins/utils.rs | 76 +++++++++---------- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 4609e198666e4..1d11d4d0601d4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - can_skip_probe_on_empty_build_side, need_produce_result_in_final, + empty_build_side_produces_empty_result, need_produce_result_in_final, }, }; @@ -414,7 +414,7 @@ impl HashJoinStream { ) -> HashJoinStreamState { if left_data.map().is_empty() && self.filter.is_none() - && can_skip_probe_on_empty_build_side(self.join_type) + && empty_build_side_produces_empty_result(self.join_type) { HashJoinStreamState::Completed } else { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index ccb09e17f9b7b..aadb9715787e9 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -855,9 +855,11 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { ) } -/// Returns true when an empty build side fully determines the join result, -/// so the probe side does not need to be consumed. -pub(crate) fn can_skip_probe_on_empty_build_side(join_type: JoinType) -> bool { +/// Returns true when an empty build side necessarily produces an empty result. +/// +/// This is the shared source of truth for both state-machine short-circuiting +/// and `build_batch_empty_build_side`. 
+pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool { matches!( join_type, JoinType::Inner @@ -1074,46 +1076,38 @@ pub(crate) fn build_batch_empty_build_side( column_indices: &[ColumnIndex], join_type: JoinType, ) -> Result { - match join_type { - // these join types only return data if the left side is not empty, so we return an - // empty RecordBatch - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::RightSemi - | JoinType::LeftAnti - | JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))), - - // the remaining joins will return data for the right columns and null for the left ones - JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => { - let num_rows = probe_batch.num_rows(); - if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); - } - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; - - columns.push(array); - } + if empty_build_side_produces_empty_result(join_type) { + // These join types only return data if the left side is not empty. + Ok(RecordBatch::new_empty(Arc::new(schema.clone()))) + } else { + // The remaining joins return right-side rows and nulls for the left side. 
+ let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); + } + let mut columns: Vec> = + Vec::with_capacity(schema.fields().len()); + + for column_index in column_indices { + let array = match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => Arc::new(BooleanArray::new( + BooleanBuffer::new_unset(num_rows), + None, + )), + }; - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) + columns.push(array); } + + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } } From 12a52ef0d65cb43c58b61bbcd5f97d335f2a7f2b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 11:19:40 +0800 Subject: [PATCH 08/12] Simplify hash-join state machine and batch construction Refactor stream.rs and utils.rs to streamline the hash-join state machine. Compute the post-build state directly from inputs, eliminating unnecessary indirection. Update the empty-build-side batch construction to utilize early returns and iterator-based collection for columns, replacing manual Vec setup and push logic. 
--- .../src/joins/hash_join/stream.rs | 23 +++++--- datafusion/physical-plan/src/joins/utils.rs | 54 +++++++++---------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1d11d4d0601d4..7f0274e619c26 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,13 +408,14 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. - fn next_state_after_build_ready( - &self, + fn state_after_build_ready( + has_filter: bool, + join_type: JoinType, left_data: &JoinLeftData, ) -> HashJoinStreamState { - if left_data.map().is_empty() - && self.filter.is_none() - && empty_build_side_produces_empty_result(self.join_type) + if !has_filter + && left_data.map().is_empty() + && empty_build_side_produces_empty_result(join_type) { HashJoinStreamState::Completed } else { @@ -486,7 +487,11 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = self.next_state_after_build_ready(build_side.left_data.as_ref()); + self.state = Self::state_after_build_ready( + self.filter.is_some(), + self.join_type, + build_side.left_data.as_ref(), + ); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -557,7 +562,11 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = self.next_state_after_build_ready(left_data.as_ref()); + self.state = Self::state_after_build_ready( + self.filter.is_some(), + self.join_type, + left_data.as_ref(), + ); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index aadb9715787e9..540fa3c373aa4 100644 --- 
a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1078,37 +1078,33 @@ pub(crate) fn build_batch_empty_build_side( ) -> Result { if empty_build_side_produces_empty_result(join_type) { // These join types only return data if the left side is not empty. - Ok(RecordBatch::new_empty(Arc::new(schema.clone()))) - } else { - // The remaining joins return right-side rows and nulls for the left side. - let num_rows = probe_batch.num_rows(); - if schema.fields().is_empty() { - return new_empty_schema_batch(schema, num_rows); - } - let mut columns: Vec> = - Vec::with_capacity(schema.fields().len()); - - for column_index in column_indices { - let array = match column_index.side { - // left -> null array - JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), - num_rows, - ), - // right -> respective right array - JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), - // right mark -> unset boolean array as there are no matches on the left side - JoinSide::None => Arc::new(BooleanArray::new( - BooleanBuffer::new_unset(num_rows), - None, - )), - }; - - columns.push(array); - } + return Ok(RecordBatch::new_empty(Arc::new(schema.clone()))); + } - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) + // The remaining joins return right-side rows and nulls for the left side. 
+ let num_rows = probe_batch.num_rows(); + if schema.fields().is_empty() { + return new_empty_schema_batch(schema, num_rows); } + + let columns = column_indices + .iter() + .map(|column_index| match column_index.side { + // left -> null array + JoinSide::Left => new_null_array( + build_batch.column(column_index.index).data_type(), + num_rows, + ), + // right -> respective right array + JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)), + // right mark -> unset boolean array as there are no matches on the left side + JoinSide::None => { + Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)) + } + }) + .collect(); + + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } /// The input is the matched indices for left and right and From 48cdea9f1eef91dbbc20be214e8df584214ff659 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 13:06:02 +0800 Subject: [PATCH 09/12] clippy fix --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 74989fd7bd642..d75d40511f34a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2280,9 +2280,12 @@ mod tests { let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); let left_schema = left_batch.schema(); - let left: Arc = - TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None) - .unwrap(); + let left: Arc = TestMemoryExec::try_new_exec( + &[vec![left_batch]], + Arc::clone(&left_schema), + None, + ) + .unwrap(); let err = exec_err!("bad data error"); let right_batch = From 4d6367b285d60c3e1269bc134ef9d23a5bb87fc7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 14:37:13 +0800 Subject: [PATCH 10/12] Refactor join type handling for empty builds Move 
empty-build predicate to JoinType. Implement JoinType::empty_build_side_produces_empty_result method and update relevant call sites. Stream state machine now directly uses this method, simplifying logic. Remove old utility predicate and update tests, including new filtered regression tests. --- datafusion/common/src/join_type.rs | 14 ++++ .../physical-plan/src/joins/hash_join/exec.rs | 64 ++++++++++++++++--- .../src/joins/hash_join/stream.rs | 22 ++----- datafusion/physical-plan/src/joins/utils.rs | 18 +----- 4 files changed, 78 insertions(+), 40 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index 8855e993f2bc7..d517844db48b4 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -142,6 +142,20 @@ impl JoinType { | JoinType::RightMark ) } + + /// Returns true when an empty build side necessarily produces an empty + /// result for this join type. + pub fn empty_build_side_produces_empty_result(self) -> bool { + matches!( + self, + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::RightSemi + ) + } } impl Display for JoinType { diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index d75d40511f34a..becadaa9864a1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2305,18 +2305,32 @@ mod tests { async fn assert_empty_build_probe_behavior( join_types: &[JoinType], expect_probe_error: bool, + with_filter: bool, ) { let (left, right, on) = empty_build_with_probe_error_inputs(); + let filter = prepare_join_filter(); for join_type in join_types { - let join = join( - Arc::clone(&left), - Arc::clone(&right), - on.clone(), - join_type, - NullEquality::NullEqualsNothing, - ) - .unwrap(); + let join = if with_filter { + join_with_filter( + Arc::clone(&left), + 
Arc::clone(&right), + on.clone(), + filter.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + } else { + join( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + join_type, + NullEquality::NullEqualsNothing, + ) + .unwrap() + }; let result = common::collect( join.execute(0, Arc::new(TaskContext::default())).unwrap(), @@ -5085,6 +5099,24 @@ mod tests { JoinType::RightSemi, ], false, + false, + ) + .await; + } + + #[tokio::test] + async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Inner, + JoinType::Left, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::LeftMark, + JoinType::RightSemi, + ], + false, + true, ) .await; } @@ -5099,6 +5131,22 @@ mod tests { JoinType::RightMark, ], true, + false, + ) + .await; + } + + #[tokio::test] + async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() { + assert_empty_build_probe_behavior( + &[ + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + JoinType::RightMark, + ], + true, + true, ) .await; } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 7f0274e619c26..ce174494f3f67 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - empty_build_side_produces_empty_result, need_produce_result_in_final, + need_produce_result_in_final, }, }; @@ -409,13 +409,11 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. 
fn state_after_build_ready( - has_filter: bool, join_type: JoinType, left_data: &JoinLeftData, ) -> HashJoinStreamState { - if !has_filter - && left_data.map().is_empty() - && empty_build_side_produces_empty_result(join_type) + if left_data.map().is_empty() + && join_type.empty_build_side_produces_empty_result() { HashJoinStreamState::Completed } else { @@ -487,11 +485,8 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = Self::state_after_build_ready( - self.filter.is_some(), - self.join_type, - build_side.left_data.as_ref(), - ); + self.state = + Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -562,11 +557,8 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = Self::state_after_build_ready( - self.filter.is_some(), - self.join_type, - left_data.as_ref(), - ); + self.state = + Self::state_after_build_ready(self.join_type, left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 540fa3c373aa4..d93f88519e2f3 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -855,22 +855,6 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { ) } -/// Returns true when an empty build side necessarily produces an empty result. -/// -/// This is the shared source of truth for both state-machine short-circuiting -/// and `build_batch_empty_build_side`. 
-pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool { - matches!( - join_type, - JoinType::Inner - | JoinType::Left - | JoinType::LeftSemi - | JoinType::LeftAnti - | JoinType::LeftMark - | JoinType::RightSemi - ) -} - pub(crate) fn get_final_indices_from_shared_bitmap( shared_bitmap: &SharedBitmapBuilder, join_type: JoinType, @@ -1076,7 +1060,7 @@ pub(crate) fn build_batch_empty_build_side( column_indices: &[ColumnIndex], join_type: JoinType, ) -> Result { - if empty_build_side_produces_empty_result(join_type) { + if join_type.empty_build_side_produces_empty_result() { // These join types only return data if the left side is not empty. return Ok(RecordBatch::new_empty(Arc::new(schema.clone()))); } From c48f2120173cf000459d5d7bb6ffd669f01ee870 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 14:40:26 +0800 Subject: [PATCH 11/12] Refactor empty-build handling in join processing Keep build_batch_empty_build_side as the empty-build output constructor. Removed redundant gating in probe processing for join types that should terminate before probe polling. Updated the empty-build branch in stream.rs:661 to run on is_empty unconditionally, along with a debug_assert ensuring this path is only for join types that don't produce empty results by semantics. This simplifies the flow and improves performance in probe-dependent join types. 
--- datafusion/physical-plan/src/joins/hash_join/stream.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index ce174494f3f67..3bc905099c558 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -661,10 +661,12 @@ impl HashJoinStream { } } - // if the left side is empty, we can skip the (potentially expensive) join operation + // If the build side is empty, this stream only reaches ProcessProbeBatch for + // join types whose output still depends on probe rows. let is_empty = build_side.left_data.map().is_empty(); - if is_empty && self.filter.is_none() { + if is_empty { + debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), From f0681be2e1292b1a6f3f6f9c05f9c4e162cedf51 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Sat, 4 Apr 2026 14:18:33 +0800 Subject: [PATCH 12/12] Add debug assertion for empty build side join types in HashJoinStream --- datafusion/physical-plan/src/joins/hash_join/stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 3bc905099c558..1004fba3d4f45 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -666,6 +666,8 @@ impl HashJoinStream { let is_empty = build_side.left_data.map().is_empty(); if is_empty { + // Invariant: state_after_build_ready should have already completed + // join types whose result is fixed to empty when the build side is empty. debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); let result = build_batch_empty_build_side( &self.schema,