From 8509d2f682e104e1fbda7111af86c5d9008fcbe9 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 03:12:00 +0530 Subject: [PATCH 1/7] fix: prevent hash join deadlock when dynamic filtering is enabled Empty partitions were short-circuiting to Completed state without reporting to the shared accumulator. This caused other partitions to wait indefinitely at the barrier. This fix ensures that even empty partitions proceed to report their data if a build accumulator is present. Made-with: Cursor --- .../src/joins/hash_join/stream.rs | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f45..d4f837e3e7887 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,13 +408,13 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. 
- fn state_after_build_ready( - join_type: JoinType, - left_data: &JoinLeftData, - ) -> HashJoinStreamState { - if left_data.map().is_empty() - && join_type.empty_build_side_produces_empty_result() - { + fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + let is_empty = left_data.map().is_empty(); + let produces_empty = self.join_type.empty_build_side_produces_empty_result(); + let coordinated = self.build_accumulator.is_some(); + let is_initial_collect = matches!(self.state, HashJoinStreamState::WaitBuildSide); + + if is_empty && produces_empty && (!coordinated || !is_initial_collect) { HashJoinStreamState::Completed } else { HashJoinStreamState::FetchProbeBatch @@ -485,8 +485,7 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = - Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); + self.state = self.state_after_build_ready(build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -557,8 +556,7 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = - Self::state_after_build_ready(self.join_type, left_data.as_ref()); + self.state = self.state_after_build_ready(left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); @@ -668,7 +666,21 @@ impl HashJoinStream { if is_empty { // Invariant: state_after_build_ready should have already completed // join types whose result is fixed to empty when the build side is empty. - debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); + // + // However, when dynamic filtering is enabled, we skip the short-circuit + // in state_after_build_ready to ensure all partitions report their + // build-side data. In this case, we might reach here with an empty + // build side even for join types that produce empty results. 
+ if self.build_accumulator.is_none() { + debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); + } + + if self.join_type.empty_build_side_produces_empty_result() { + timer.done(); + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } + let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), From 3c741c0fa0e62086835535126a445fb25f886e48 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 09:00:34 -0500 Subject: [PATCH 2/7] fix: remove barrier from SharedBuildAccumulator to resolve Q18 deadlock The prior fix (disabling the #21068 short-circuit when a build accumulator is present) addressed the symptom but not the cause. The actual deadlock is an interaction between tokio::sync::Barrier and RepartitionExec's global backpressure gate in distributor_channels.rs: - SharedBuildAccumulator parks every partition on a Barrier sized to the total partition count so the leader can publish the dynamic filter atomically. - RepartitionExec's channels use a global "all channels non-empty -> gate closed" rule. A receiver dropped while its channel still has buffered data does NOT free a gate slot, because empty_channels is only decremented when a channel drains to empty before being closed. - When N-k partitions reach the barrier and drop their receivers with residual data, the gate counter stays pinned. The remaining k partitions then starve on the RepartitionExec that feeds their build side, never arrive at the barrier, and the N-k parked partitions block forever. TPCH Q18 at DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 triggered this: the inner subquery produces ~57 distinct l_orderkey values, leaving 3 of 24 hash buckets empty. Issue #21625. Fix: replace the Barrier with an AtomicUsize counter. 
report_build_data stores the partition's data under the existing Mutex, then fetch_sub(1, AcqRel); the caller that brings the counter to zero runs the leader work (build filter, update, mark_complete). All other partitions return immediately and start probing. No one parks, so no one holds the gate closed, so the pipeline drains normally. Trade-off: a small number of probe batches may evaluate against the placeholder lit(true) filter before the leader publishes the real one. DynamicFilterPhysicalExpr is explicitly designed to be read live, and the filter is an optimization not a correctness constraint, so this is safe. The #21068 short-circuit optimization is preserved because state_after_build_ready's guard from the prior fix is no longer needed. Dead code removed: HashJoinStreamState::WaitPartitionBoundsReport, HashJoinStream::build_waiter, wait_for_partition_bounds_report, and the process_probe_batch empty-build fallback that existed only to satisfy the relaxed short-circuit guard. Verified: - Q18 at 24 partitions with dynamic filter pushdown on: 57 rows in ~0.9s (previously hung indefinitely). - cargo test -p datafusion-physical-plan joins::hash_join: 376 passed. - cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings: clean. Closes #21625. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/joins/hash_join/shared_bounds.rs | 28 +++++-- .../src/joins/hash_join/stream.rs | 77 ++++--------------- 2 files changed, 39 insertions(+), 66 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa80268..7060e60202fa6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -20,6 +20,7 @@ use std::fmt; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::ExecutionPlan; use crate::ExecutionPlanProperties; @@ -42,7 +43,6 @@ use datafusion_physical_expr::expressions::{ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; use parking_lot::Mutex; -use tokio::sync::Barrier; /// Represents the minimum and maximum values for a specific column. /// Used in dynamic filter pushdown to establish value boundaries. @@ -216,7 +216,18 @@ fn create_bounds_predicate( pub(crate) struct SharedBuildAccumulator { /// Build-side data protected by a single mutex to avoid ordering concerns inner: Mutex, - barrier: Barrier, + /// Number of `report_build_data` calls still expected before the dynamic + /// filter can be finalized. Each call decrements the counter; the caller + /// that brings it to zero is responsible for publishing the filter. + /// + /// This intentionally does NOT block partitions on a barrier: callers to + /// `report_build_data` return as soon as they have stored their build-side + /// slice. Blocking all partitions until every partition arrived used to + /// cause a deadlock with RepartitionExec's global-gate backpressure, because + /// a partition parked on the barrier held its build-side receiver alive + /// long enough for the shared gate to close across unrelated hash join + /// instances. See issue #21625 for the TPCH Q18 reproducer. 
+ remaining: AtomicUsize, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, /// Right side join expressions needed for creating filter expressions @@ -337,7 +348,7 @@ impl SharedBuildAccumulator { Self { inner: Mutex::new(mode_data), - barrier: Barrier::new(expected_calls), + remaining: AtomicUsize::new(expected_calls), dynamic_filter, on_right, repartition_random_state, @@ -357,7 +368,7 @@ impl SharedBuildAccumulator { /// /// # Returns /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch - pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { + pub(crate) fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { // Store data in the accumulator { let mut guard = self.inner.lock(); @@ -393,8 +404,13 @@ impl SharedBuildAccumulator { } } - // Wait for all partitions to report - if self.barrier.wait().await.is_leader() { + // Decrement the expected-calls counter. The caller that brings it to + // zero is the leader and publishes the final filter expression. + // + // `AcqRel` pairs with the other reporters' stores into `inner` above: + // the leader's subsequent read of `inner` happens-after every + // store by any partition that decremented before it. 
+ if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 { // All partitions have reported, so we can create and update the filter let inner = self.inner.lock(); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index d4f837e3e7887..0a35a4441e3f5 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -125,8 +125,6 @@ impl BuildSide { pub(super) enum HashJoinStreamState { /// Initial state for HashJoinStream indicating that build-side data not collected yet WaitBuildSide, - /// Waiting for bounds to be reported by all partitions - WaitPartitionBoundsReport, /// Indicates that build-side has been collected, and stream is ready for fetching probe-side FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed @@ -216,9 +214,6 @@ pub(super) struct HashJoinStream { right_side_ordered: bool, /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional) build_accumulator: Option>, - /// Optional future to signal when build information has been reported by all partitions - /// and the dynamic filter has been updated - build_waiter: Option>, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -399,22 +394,20 @@ impl HashJoinStream { build_indices_buffer: Vec::with_capacity(batch_size), right_side_ordered, build_accumulator, - build_waiter: None, mode, output_buffer, null_aware, } } - /// Returns the next state after the build side has been fully collected - /// and any required build-side coordination has completed. + /// Returns the next state after the build side has been fully collected. + /// + /// Short-circuits to `Completed` when the build side is empty and the + /// join semantics guarantee an empty result. 
fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { - let is_empty = left_data.map().is_empty(); - let produces_empty = self.join_type.empty_build_side_produces_empty_result(); - let coordinated = self.build_accumulator.is_some(); - let is_initial_collect = matches!(self.state, HashJoinStreamState::WaitBuildSide); - - if is_empty && produces_empty && (!coordinated || !is_initial_collect) { + if left_data.map().is_empty() + && self.join_type.empty_build_side_produces_empty_result() + { HashJoinStreamState::Completed } else { HashJoinStreamState::FetchProbeBatch @@ -445,9 +438,6 @@ impl HashJoinStream { HashJoinStreamState::WaitBuildSide => { handle_state!(ready!(self.collect_build_side(cx))) } - HashJoinStreamState::WaitPartitionBoundsReport => { - handle_state!(ready!(self.wait_for_partition_bounds_report(cx))) - } HashJoinStreamState::FetchProbeBatch => { handle_state!(ready!(self.fetch_probe_batch(cx))) } @@ -468,27 +458,6 @@ impl HashJoinStream { } } - /// Optional step to wait until build-side information (hash maps or bounds) has been reported by all partitions. - /// This state is only entered if a build accumulator is present. - /// - /// ## Why wait? - /// - /// The dynamic filter is only built once all partitions have reported their information (hash maps or bounds). - /// If we do not wait here, the probe-side scan may start before the filter is ready. - /// This can lead to the probe-side scan missing the opportunity to apply the filter - /// and skip reading unnecessary data. 
- fn wait_for_partition_bounds_report( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>>> { - if let Some(ref mut fut) = self.build_waiter { - ready!(fut.get_shared(cx))?; - } - let build_side = self.build_side.try_as_ready()?; - self.state = self.state_after_build_ready(build_side.left_data.as_ref()); - Poll::Ready(Ok(StatefulStreamResult::Continue)) - } - /// Collects build-side data by polling `OnceFut` future from initialized build-side /// /// Updates build-side to `Ready`, and state to `FetchProbeSide` @@ -516,8 +485,6 @@ impl HashJoinStream { // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator // which will handle synchronization and filter updates if let Some(ref build_accumulator) = self.build_accumulator { - let build_accumulator = Arc::clone(build_accumulator); - let left_side_partition_id = match self.mode { PartitionMode::Partitioned => self.partition, PartitionMode::CollectLeft => 0, @@ -551,14 +518,17 @@ impl HashJoinStream { ), }; - self.build_waiter = Some(OnceFut::new(async move { - build_accumulator.report_build_data(build_data).await - })); - self.state = HashJoinStreamState::WaitPartitionBoundsReport; - } else { - self.state = self.state_after_build_ready(left_data.as_ref()); + // Report build-side data without blocking this stream. The + // accumulator publishes the dynamic filter when the last + // partition reports; all other partitions return immediately and + // start probing. Any probe batches read before the filter is + // finalized simply see the placeholder filter, which is safe — + // the dynamic filter is an optimization, not a correctness + // constraint. 
+ build_accumulator.report_build_data(build_data)?; } + self.state = self.state_after_build_ready(left_data.as_ref()); self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -666,20 +636,7 @@ impl HashJoinStream { if is_empty { // Invariant: state_after_build_ready should have already completed // join types whose result is fixed to empty when the build side is empty. - // - // However, when dynamic filtering is enabled, we skip the short-circuit - // in state_after_build_ready to ensure all partitions report their - // build-side data. In this case, we might reach here with an empty - // build side even for join types that produce empty results. - if self.build_accumulator.is_none() { - debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); - } - - if self.join_type.empty_build_side_produces_empty_result() { - timer.done(); - self.state = HashJoinStreamState::FetchProbeBatch; - return Ok(StatefulStreamResult::Continue); - } + debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); let result = build_batch_empty_build_side( &self.schema, From 00819f516c2139498bca5c2c66acad2dd2d3b5e5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 09:05:51 -0500 Subject: [PATCH 3/7] tweak --- .../physical-plan/src/joins/hash_join/shared_bounds.rs | 8 -------- datafusion/physical-plan/src/joins/hash_join/stream.rs | 7 ------- 2 files changed, 15 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 7060e60202fa6..7a991809f57a6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -219,14 +219,6 @@ pub(crate) struct SharedBuildAccumulator { /// Number of `report_build_data` calls still expected before the dynamic /// 
filter can be finalized. Each call decrements the counter; the caller /// that brings it to zero is responsible for publishing the filter. - /// - /// This intentionally does NOT block partitions on a barrier: callers to - /// `report_build_data` return as soon as they have stored their build-side - /// slice. Blocking all partitions until every partition arrived used to - /// cause a deadlock with RepartitionExec's global-gate backpressure, because - /// a partition parked on the barrier held its build-side receiver alive - /// long enough for the shared gate to close across unrelated hash join - /// instances. See issue #21625 for the TPCH Q18 reproducer. remaining: AtomicUsize, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 0a35a4441e3f5..da5cb83ef2722 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -518,13 +518,6 @@ impl HashJoinStream { ), }; - // Report build-side data without blocking this stream. The - // accumulator publishes the dynamic filter when the last - // partition reports; all other partitions return immediately and - // start probing. Any probe batches read before the filter is - // finalized simply see the placeholder filter, which is safe — - // the dynamic filter is an optimization, not a correctness - // constraint. 
build_accumulator.report_build_data(build_data)?; } From c1db602ba24425769f8d74114272cff52c41fbc6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 09:27:11 -0500 Subject: [PATCH 4/7] test: add regression tests for SharedBuildAccumulator non-blocking contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three unit tests guarding the contract that `report_build_data` must never wait on other partitions: - `report_build_data_does_not_block_on_partial_reports` — the core regression test for #21625. Builds an accumulator expecting four reports, submits only one, and asserts the call returns within 1s (it used to block forever on `tokio::sync::Barrier::wait()`). - `last_reporter_publishes_filter_and_marks_complete` — verifies the leader-election contract: earlier reporters leave the dynamic filter in its placeholder state; exactly the last reporter publishes it and marks it complete. - `concurrent_reporters_elect_exactly_one_leader` — spawns every partition concurrently and asserts the filter ends up complete. Each `report_build_data` call is wrapped in a 1s `tokio::time::timeout` so a regression to a blocking implementation fails the test with a clear message instead of hanging the test runner. Verified by porting the tests to the pre-fix `main` (adding `.await` to match the async signature): tests 1 and 2 fail within ~1s with the expected panic message, test 3 still passes (leader election works under either design). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/joins/hash_join/shared_bounds.rs | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 7a991809f57a6..29e071d332969 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -600,3 +600,157 @@ impl fmt::Debug for SharedBuildAccumulator { write!(f, "SharedBuildAccumulator") } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_physical_expr::expressions::Column; + use std::time::Duration; + + /// How long a single `report_build_data` call is allowed to take before + /// the test fails. `report_build_data` is synchronous and should return + /// in microseconds; this bound exists so a regression to a blocking + /// implementation fails the test instead of hanging the test runner. + const REPORT_TIMEOUT: Duration = Duration::from_secs(1); + + /// Build an accumulator in Partitioned mode expecting `expected_calls` + /// reports over `num_partitions` partitions, plus the dynamic filter it + /// guards so tests can inspect its completion state. 
+ fn partitioned_accumulator( + expected_calls: usize, + num_partitions: usize, + ) -> (SharedBuildAccumulator, Arc) { + let probe_schema = Arc::new(Schema::new(vec![Field::new( + "k", + DataType::Int64, + false, + )])); + let col = Arc::new(Column::new("k", 0)) as PhysicalExprRef; + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col)], + lit(true), + )); + let accum = SharedBuildAccumulator { + inner: Mutex::new(AccumulatedBuildData::Partitioned { + partitions: vec![None; num_partitions], + }), + remaining: AtomicUsize::new(expected_calls), + dynamic_filter: Arc::clone(&filter), + on_right: vec![col], + repartition_random_state: HASH_JOIN_SEED, + probe_schema, + }; + (accum, filter) + } + + fn empty_partitioned_report(partition_id: usize) -> PartitionBuildData { + PartitionBuildData::Partitioned { + partition_id, + pushdown: PushdownStrategy::Empty, + bounds: PartitionBounds::new(vec![]), + } + } + + /// Call `report_build_data` under a timeout so a regression to a blocking + /// implementation fails the test instead of hanging. The body of the + /// inner `async` block is what differs between the current (synchronous) + /// implementation and any past or future blocking one. + async fn report_with_timeout( + accum: &SharedBuildAccumulator, + data: PartitionBuildData, + what: &str, + ) { + tokio::time::timeout(REPORT_TIMEOUT, async { + accum.report_build_data(data).unwrap(); + }) + .await + .unwrap_or_else(|_| { + panic!( + "report_build_data blocked for >{:?} while reporting {what} — \ + SharedBuildAccumulator must never wait on other partitions", + REPORT_TIMEOUT + ) + }); + } + + /// Regression test for #21625. + /// + /// `report_build_data` must return without waiting for other partitions. + /// Previously it parked on a `tokio::sync::Barrier`, which combined with + /// `RepartitionExec`'s global backpressure gate could deadlock the + /// pipeline. 
This test creates an accumulator expecting four partitions + /// and reports from only one — the call must still return immediately + /// and the dynamic filter must remain in its placeholder (not-yet- + /// complete) state. + #[tokio::test] + async fn report_build_data_does_not_block_on_partial_reports() { + let (accum, filter) = partitioned_accumulator(4, 4); + + report_with_timeout(&accum, empty_partitioned_report(0), "partition 0").await; + + // With 3 partitions still outstanding, the filter must NOT have + // been published yet. + assert!( + tokio::time::timeout(Duration::from_millis(50), filter.wait_complete()) + .await + .is_err(), + "dynamic filter was marked complete after only 1/4 partitions reported" + ); + } + + /// Exactly one caller — the one that decrements `remaining` to zero — + /// must run the leader body (publish the filter expression and mark it + /// complete). All earlier callers leave the filter in the `InProgress` + /// placeholder state. Each report is itself under a timeout so a + /// blocking implementation fails instead of hanging. + #[tokio::test] + async fn last_reporter_publishes_filter_and_marks_complete() { + let (accum, filter) = partitioned_accumulator(2, 2); + + report_with_timeout(&accum, empty_partitioned_report(0), "first of two").await; + // First of two reporters: filter still in placeholder state. + assert!( + tokio::time::timeout(Duration::from_millis(50), filter.wait_complete()) + .await + .is_err(), + "first reporter (out of 2) must not mark the dynamic filter complete" + ); + + report_with_timeout(&accum, empty_partitioned_report(1), "second of two").await; + // Second (last) reporter: filter must be complete. + tokio::time::timeout(Duration::from_millis(500), filter.wait_complete()) + .await + .expect("last reporter must mark the dynamic filter complete"); + } + + /// Concurrent reports from every partition must still elect exactly + /// one leader. 
The test exercises the `AcqRel` decrement and its + /// happens-before pairing with the `inner` writes. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn concurrent_reporters_elect_exactly_one_leader() { + let (accum, filter) = partitioned_accumulator(8, 8); + let accum = Arc::new(accum); + + let handles: Vec<_> = (0..8) + .map(|pid| { + let accum = Arc::clone(&accum); + tokio::spawn(async move { + tokio::time::timeout(REPORT_TIMEOUT, async { + accum.report_build_data(empty_partitioned_report(pid)) + }) + .await + .expect("report_build_data must not block") + }) + }) + .collect(); + + for h in handles { + h.await.unwrap().unwrap(); + } + + // Every partition reported; the leader must have run mark_complete. + tokio::time::timeout(Duration::from_secs(1), filter.wait_complete()) + .await + .expect("filter must be complete after all partitions report"); + } +} From 863e73b00486d038bbd4bea1f78664e1786b6e3a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:12:52 -0500 Subject: [PATCH 5/7] lint --- .../src/joins/hash_join/shared_bounds.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 29e071d332969..39736c27e879e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -620,11 +620,8 @@ mod tests { expected_calls: usize, num_partitions: usize, ) -> (SharedBuildAccumulator, Arc) { - let probe_schema = Arc::new(Schema::new(vec![Field::new( - "k", - DataType::Int64, - false, - )])); + let probe_schema = + Arc::new(Schema::new(vec![Field::new("k", DataType::Int64, false)])); let col = Arc::new(Column::new("k", 0)) as PhysicalExprRef; let filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![Arc::clone(&col)], @@ -666,9 
+663,8 @@ mod tests { .await .unwrap_or_else(|_| { panic!( - "report_build_data blocked for >{:?} while reporting {what} — \ - SharedBuildAccumulator must never wait on other partitions", - REPORT_TIMEOUT + "report_build_data blocked for >{REPORT_TIMEOUT:?} while reporting \ + {what} — SharedBuildAccumulator must never wait on other partitions" ) }); } @@ -728,13 +724,15 @@ mod tests { /// happens-before pairing with the `inner` writes. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn concurrent_reporters_elect_exactly_one_leader() { + use datafusion_common_runtime::SpawnedTask; + let (accum, filter) = partitioned_accumulator(8, 8); let accum = Arc::new(accum); let handles: Vec<_> = (0..8) .map(|pid| { let accum = Arc::clone(&accum); - tokio::spawn(async move { + SpawnedTask::spawn(async move { tokio::time::timeout(REPORT_TIMEOUT, async { accum.report_build_data(empty_partitioned_report(pid)) }) @@ -745,7 +743,7 @@ mod tests { .collect(); for h in handles { - h.await.unwrap().unwrap(); + h.join().await.unwrap().unwrap(); } // Every partition reported; the leader must have run mark_complete. From 7dd78797d84b34af4d534d141f1d76de3c9fed43 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:17:44 -0500 Subject: [PATCH 6/7] test: drop scan-level row-count assertions in dynamic filter pushdown tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With the barrier removed from SharedBuildAccumulator, probing can begin before the leader partition has published the finalized dynamic filter. The scan may therefore emit its first batch against the placeholder `lit(true)` filter, so scan-level metrics no longer reflect the finalized filter's row count. 
The join itself still applies the filter correctly (result batches are still snapshot-asserted), and the plan-shape snapshots still verify the CASE expression is pushed into the probe-side DataSource. Restoring scan-level early filtering under the non-blocking design is a follow-up optimization — most likely via a bounded wait_update on the scan side so the scan waits briefly for the filter before its first read, without reintroducing the backpressure deadlock from #21625. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 165 +++++++++++++++--- 1 file changed, 145 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index d058e44a85d00..97a81d8a05cd5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1107,12 +1107,17 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - // The number of output rows from the probe side scan should stay consistent across executions. - // Issue: https://github.com/apache/datafusion/issues/17451 - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Previously this test asserted `probe_scan.metrics().output_rows() == 2` + // — that the probe scan emitted only the rows matching the finalized + // dynamic filter. That relied on all partitions blocking on a shared + // barrier inside `SharedBuildAccumulator` before any probing started, + // which deadlocked with `RepartitionExec`'s global backpressure gate + // under Q18-style plans (#21625).
With the barrier removed, probing + // can begin before the last partition has reported, so the scan may + // emit its first batch against the placeholder filter. The join + // itself still applies the filter correctly (result batches are + // snapshot-asserted below). Restoring scan-level early filtering + // under the non-blocking design is a follow-up optimization. insta::assert_snapshot!( result, @@ -1285,12 +1290,17 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - // The number of output rows from the probe side scan should stay consistent across executions. - // Issue: https://github.com/apache/datafusion/issues/17451 - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Previously this test asserted `probe_scan.metrics().output_rows() == 2` + // — that the probe scan emitted only the rows matching the finalized + // dynamic filter. That relied on all partitions blocking on a shared + // barrier inside `SharedBuildAccumulator` before any probing started, + // which deadlocked with `RepartitionExec`'s global backpressure gate + // under Q18-style plans (#21625). With the barrier removed, probing + // can begin before the last partition has reported, so the scan may + // emit its first batch against the placeholder filter. The join + // itself still applies the filter correctly (result batches are + // snapshot-asserted below). Restoring scan-level early filtering + // under the non-blocking design is a follow-up optimization. 
insta::assert_snapshot!( result, @@ -2585,10 +2595,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). // Results should be identical to the InList version insta::assert_snapshot!( @@ -2736,10 +2744,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). // Results should be identical to the InList version insta::assert_snapshot!( @@ -2761,6 +2767,125 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { // already covered by the simpler CollectLeft port in push_down_filter_parquet.slt; // the with_support(false) branch has no SQL analog (parquet always supports // pushdown). +/// Test HashTable strategy with integer multi-column join keys. +/// Verifies that hash_lookup works correctly with integer data types. 
+#[tokio::test] +async fn test_hashjoin_hash_table_pushdown_integer_keys() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with integer keys + let build_batches = vec![ + record_batch!( + ("id1", Int32, [1, 2]), + ("id2", Int32, [10, 20]), + ("value", Float64, [100.0, 200.0]) + ) + .unwrap(), + ]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("id1", DataType::Int32, false), + Field::new("id2", DataType::Int32, false), + Field::new("value", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more integer rows + let probe_batches = vec![ + record_batch!( + ("id1", Int32, [1, 2, 3, 4]), + ("id2", Int32, [10, 20, 30, 40]), + ("data", Utf8, ["a", "b", "c", "d"]) + ) + .unwrap(), + ]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("id1", DataType::Int32, false), + Field::new("id2", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create join on multiple integer columns + let on = vec![ + ( + col("id1", &build_side_schema).unwrap(), + col("id1", &probe_side_schema).unwrap(), + ), + ( + col("id2", &build_side_schema).unwrap(), + col("id2", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + Arc::clone(&probe_scan), + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + // Apply optimization with forced HashTable strategy + let session_config = SessionConfig::default() + .with_batch_size(10) + .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 1) + 
.set_bool("datafusion.execution.parquet.pushdown_filters", true) + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, session_config.options()) + .unwrap(); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + + // Verify hash_lookup is used + let plan_str = format_plan_for_test(&plan).to_string(); + assert!( + plan_str.contains("hash_lookup"), + "Expected hash_lookup in plan but got: {plan_str}" + ); + assert!( + !plan_str.contains("IN (SET)"), + "Expected no IN (SET) in plan but got: {plan_str}" + ); + + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). 
+ + insta::assert_snapshot!( + result, + @r" + +-----+-----+-------+-----+-----+------+ + | id1 | id2 | value | id1 | id2 | data | + +-----+-----+-------+-----+-----+------+ + | 1 | 10 | 100.0 | 1 | 10 | a | + | 2 | 20 | 200.0 | 2 | 20 | b | + +-----+-----+-------+-----+-----+------+ + ", + ); +} #[tokio::test] async fn test_hashjoin_dynamic_filter_pushdown_is_used() { use datafusion_common::JoinType; From 40565a05f13aae9cd6c49f012619829f3500ab95 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:08:30 -0500 Subject: [PATCH 7/7] test: drop follow-up note from filter_pushdown test comments Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/core/tests/physical_optimizer/filter_pushdown.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 97a81d8a05cd5..a71da22c97687 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1116,8 +1116,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { // can begin before the last partition has reported, so the scan may // emit its first batch against the placeholder filter. The join // itself still applies the filter correctly (result batches are - // snapshot-asserted below). Restoring scan-level early filtering - // under the non-blocking design is a follow-up optimization. + // snapshot-asserted below). insta::assert_snapshot!( result, @@ -1299,8 +1298,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { // can begin before the last partition has reported, so the scan may // emit its first batch against the placeholder filter. The join // itself still applies the filter correctly (result batches are - // snapshot-asserted below). 
Restoring scan-level early filtering - // under the non-blocking design is a follow-up optimization. + // snapshot-asserted below). insta::assert_snapshot!( result,