diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index d058e44a85d00..a71da22c97687 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1107,12 +1107,16 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - // The number of output rows from the probe side scan should stay consistent across executions. - // Issue: https://github.com/apache/datafusion/issues/17451 - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Previously this test asserted `probe_scan.metrics().output_rows() == 2` + // — that the probe scan emitted only the rows matching the finalized + // dynamic filter. That relied on all partitions blocking on a shared + // barrier inside `SharedBuildAccumulator` before any probing started, + // which deadlocked with `RepartitionExec`'s global backpressure gate + // under Q18-style plans (#21625). With the barrier removed, probing + // can begin before the last partition has reported, so the scan may + // emit its first batch against the placeholder filter. The join + // itself still applies the filter correctly (result batches are + // snapshot-asserted below). insta::assert_snapshot!( result, @@ -1285,12 +1289,16 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - // The number of output rows from the probe side scan should stay consistent across executions. - // Issue: https://github.com/apache/datafusion/issues/17451 - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Previously this test asserted `probe_scan.metrics().output_rows() == 2` + // — that the probe scan emitted only the rows matching the finalized + // dynamic filter. That relied on all partitions blocking on a shared + // barrier inside `SharedBuildAccumulator` before any probing started, + // which deadlocked with `RepartitionExec`'s global backpressure gate + // under Q18-style plans (#21625). With the barrier removed, probing + // can begin before the last partition has reported, so the scan may + // emit its first batch against the placeholder filter. The join + // itself still applies the filter correctly (result batches are + // snapshot-asserted below). insta::assert_snapshot!( result, @@ -2585,10 +2593,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). // Results should be identical to the InList version insta::assert_snapshot!( @@ -2736,10 +2742,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { let result = format!("{}", pretty_format_batches(&batches).unwrap()); - let probe_scan_metrics = probe_scan.metrics().unwrap(); - - // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain. - assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). // Results should be identical to the InList version insta::assert_snapshot!( @@ -2761,6 +2765,125 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { // already covered by the simpler CollectLeft port in push_down_filter_parquet.slt; // the with_support(false) branch has no SQL analog (parquet always supports // pushdown). +/// Test HashTable strategy with integer multi-column join keys. +/// Verifies that hash_lookup works correctly with integer data types. +#[tokio::test] +async fn test_hashjoin_hash_table_pushdown_integer_keys() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with integer keys + let build_batches = vec![ + record_batch!( + ("id1", Int32, [1, 2]), + ("id2", Int32, [10, 20]), + ("value", Float64, [100.0, 200.0]) + ) + .unwrap(), + ]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("id1", DataType::Int32, false), + Field::new("id2", DataType::Int32, false), + Field::new("value", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more integer rows + let probe_batches = vec![ + record_batch!( + ("id1", Int32, [1, 2, 3, 4]), + ("id2", Int32, [10, 20, 30, 40]), + ("data", Utf8, ["a", "b", "c", "d"]) + ) + .unwrap(), + ]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("id1", DataType::Int32, false), + Field::new("id2", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create join on multiple integer columns + let on = vec![ + ( + col("id1", &build_side_schema).unwrap(), + col("id1", &probe_side_schema).unwrap(), + ), + ( + col("id2", &build_side_schema).unwrap(), + col("id2", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + Arc::clone(&probe_scan), + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + // Apply optimization with forced HashTable strategy + let session_config = SessionConfig::default() + .with_batch_size(10) + .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 1) + .set_bool("datafusion.execution.parquet.pushdown_filters", true) + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, session_config.options()) + .unwrap(); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + + // Verify hash_lookup is used + let plan_str = format_plan_for_test(&plan).to_string(); + assert!( + plan_str.contains("hash_lookup"), + "Expected hash_lookup in plan but got: {plan_str}" + ); + assert!( + !plan_str.contains("IN (SET)"), + "Expected no IN (SET) in plan but got: {plan_str}" + ); + + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + + // Scan-level row-count assertion removed; see the explanation in + // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625). + + insta::assert_snapshot!( + result, + @r" + +-----+-----+-------+-----+-----+------+ + | id1 | id2 | value | id1 | id2 | data | + +-----+-----+-------+-----+-----+------+ + | 1 | 10 | 100.0 | 1 | 10 | a | + | 2 | 20 | 200.0 | 2 | 20 | b | + +-----+-----+-------+-----+-----+------+ + ", + ); +} #[tokio::test] async fn test_hashjoin_dynamic_filter_pushdown_is_used() { use datafusion_common::JoinType; diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa80268..39736c27e879e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -20,6 +20,7 @@ use std::fmt; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::ExecutionPlan; use crate::ExecutionPlanProperties; @@ -42,7 +43,6 @@ use datafusion_physical_expr::expressions::{ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; use parking_lot::Mutex; -use tokio::sync::Barrier; /// Represents the minimum and maximum values for a specific column. /// Used in dynamic filter pushdown to establish value boundaries. @@ -216,7 +216,10 @@ fn create_bounds_predicate( pub(crate) struct SharedBuildAccumulator { /// Build-side data protected by a single mutex to avoid ordering concerns inner: Mutex, - barrier: Barrier, + /// Number of `report_build_data` calls still expected before the dynamic + /// filter can be finalized. Each call decrements the counter; the caller + /// that brings it to zero is responsible for publishing the filter. + remaining: AtomicUsize, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, /// Right side join expressions needed for creating filter expressions @@ -337,7 +340,7 @@ impl SharedBuildAccumulator { Self { inner: Mutex::new(mode_data), - barrier: Barrier::new(expected_calls), + remaining: AtomicUsize::new(expected_calls), dynamic_filter, on_right, repartition_random_state, @@ -357,7 +360,7 @@ impl SharedBuildAccumulator { /// /// # Returns /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch - pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { + pub(crate) fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { // Store data in the accumulator { let mut guard = self.inner.lock(); @@ -393,8 +396,13 @@ impl SharedBuildAccumulator { } } - // Wait for all partitions to report - if self.barrier.wait().await.is_leader() { + // Decrement the expected-calls counter. The caller that brings it to + // zero is the leader and publishes the final filter expression. + // + // `AcqRel` pairs with the other reporters' stores into `inner` above: + // the leader's subsequent read of `inner` happens-after every + // store by any partition that decremented before it. + if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 { // All partitions have reported, so we can create and update the filter let inner = self.inner.lock(); @@ -592,3 +600,155 @@ impl fmt::Debug for SharedBuildAccumulator { write!(f, "SharedBuildAccumulator") } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_physical_expr::expressions::Column; + use std::time::Duration; + + /// How long a single `report_build_data` call is allowed to take before + /// the test fails. `report_build_data` is synchronous and should return + /// in microseconds; this bound exists so a regression to a blocking + /// implementation fails the test instead of hanging the test runner. + const REPORT_TIMEOUT: Duration = Duration::from_secs(1); + + /// Build an accumulator in Partitioned mode expecting `expected_calls` + /// reports over `num_partitions` partitions, plus the dynamic filter it + /// guards so tests can inspect its completion state. + fn partitioned_accumulator( + expected_calls: usize, + num_partitions: usize, + ) -> (SharedBuildAccumulator, Arc) { + let probe_schema = + Arc::new(Schema::new(vec![Field::new("k", DataType::Int64, false)])); + let col = Arc::new(Column::new("k", 0)) as PhysicalExprRef; + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col)], + lit(true), + )); + let accum = SharedBuildAccumulator { + inner: Mutex::new(AccumulatedBuildData::Partitioned { + partitions: vec![None; num_partitions], + }), + remaining: AtomicUsize::new(expected_calls), + dynamic_filter: Arc::clone(&filter), + on_right: vec![col], + repartition_random_state: HASH_JOIN_SEED, + probe_schema, + }; + (accum, filter) + } + + fn empty_partitioned_report(partition_id: usize) -> PartitionBuildData { + PartitionBuildData::Partitioned { + partition_id, + pushdown: PushdownStrategy::Empty, + bounds: PartitionBounds::new(vec![]), + } + } + + /// Call `report_build_data` under a timeout so a regression to a blocking + /// implementation fails the test instead of hanging. The body of the + /// inner `async` block is what differs between the current (synchronous) + /// implementation and any past or future blocking one. + async fn report_with_timeout( + accum: &SharedBuildAccumulator, + data: PartitionBuildData, + what: &str, + ) { + tokio::time::timeout(REPORT_TIMEOUT, async { + accum.report_build_data(data).unwrap(); + }) + .await + .unwrap_or_else(|_| { + panic!( + "report_build_data blocked for >{REPORT_TIMEOUT:?} while reporting \ + {what} — SharedBuildAccumulator must never wait on other partitions" + ) + }); + } + + /// Regression test for #21625. + /// + /// `report_build_data` must return without waiting for other partitions. + /// Previously it parked on a `tokio::sync::Barrier`, which combined with + /// `RepartitionExec`'s global backpressure gate could deadlock the + /// pipeline. This test creates an accumulator expecting four partitions + /// and reports from only one — the call must still return immediately + /// and the dynamic filter must remain in its placeholder (not-yet- + /// complete) state. + #[tokio::test] + async fn report_build_data_does_not_block_on_partial_reports() { + let (accum, filter) = partitioned_accumulator(4, 4); + + report_with_timeout(&accum, empty_partitioned_report(0), "partition 0").await; + + // With 3 partitions still outstanding, the filter must NOT have + // been published yet. + assert!( + tokio::time::timeout(Duration::from_millis(50), filter.wait_complete()) + .await + .is_err(), + "dynamic filter was marked complete after only 1/4 partitions reported" + ); + } + + /// Exactly one caller — the one that decrements `remaining` to zero — + /// must run the leader body (publish the filter expression and mark it + /// complete). All earlier callers leave the filter in the `InProgress` + /// placeholder state. Each report is itself under a timeout so a + /// blocking implementation fails instead of hanging. + #[tokio::test] + async fn last_reporter_publishes_filter_and_marks_complete() { + let (accum, filter) = partitioned_accumulator(2, 2); + + report_with_timeout(&accum, empty_partitioned_report(0), "first of two").await; + // First of two reporters: filter still in placeholder state. + assert!( + tokio::time::timeout(Duration::from_millis(50), filter.wait_complete()) + .await + .is_err(), + "first reporter (out of 2) must not mark the dynamic filter complete" + ); + + report_with_timeout(&accum, empty_partitioned_report(1), "second of two").await; + // Second (last) reporter: filter must be complete. + tokio::time::timeout(Duration::from_millis(500), filter.wait_complete()) + .await + .expect("last reporter must mark the dynamic filter complete"); + } + + /// Concurrent reports from every partition must still elect exactly + /// one leader. The test exercises the `AcqRel` decrement and its + /// happens-before pairing with the `inner` writes. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn concurrent_reporters_elect_exactly_one_leader() { + use datafusion_common_runtime::SpawnedTask; + + let (accum, filter) = partitioned_accumulator(8, 8); + let accum = Arc::new(accum); + + let handles: Vec<_> = (0..8) + .map(|pid| { + let accum = Arc::clone(&accum); + SpawnedTask::spawn(async move { + tokio::time::timeout(REPORT_TIMEOUT, async { + accum.report_build_data(empty_partitioned_report(pid)) + }) + .await + .expect("report_build_data must not block") + }) + }) + .collect(); + + for h in handles { + h.join().await.unwrap().unwrap(); + } + + // Every partition reported; the leader must have run mark_complete. + tokio::time::timeout(Duration::from_secs(1), filter.wait_complete()) + .await + .expect("filter must be complete after all partitions report"); + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f45..da5cb83ef2722 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -125,8 +125,6 @@ impl BuildSide { pub(super) enum HashJoinStreamState { /// Initial state for HashJoinStream indicating that build-side data not collected yet WaitBuildSide, - /// Waiting for bounds to be reported by all partitions - WaitPartitionBoundsReport, /// Indicates that build-side has been collected, and stream is ready for fetching probe-side FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed @@ -216,9 +214,6 @@ pub(super) struct HashJoinStream { right_side_ordered: bool, /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional) build_accumulator: Option>, - /// Optional future to signal when build information has been reported by all partitions - /// and the dynamic filter has been updated - build_waiter: Option>, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -399,21 +394,19 @@ impl HashJoinStream { build_indices_buffer: Vec::with_capacity(batch_size), right_side_ordered, build_accumulator, - build_waiter: None, mode, output_buffer, null_aware, } } - /// Returns the next state after the build side has been fully collected - /// and any required build-side coordination has completed. - fn state_after_build_ready( - join_type: JoinType, - left_data: &JoinLeftData, - ) -> HashJoinStreamState { + /// Returns the next state after the build side has been fully collected. + /// + /// Short-circuits to `Completed` when the build side is empty and the + /// join semantics guarantee an empty result. + fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { if left_data.map().is_empty() - && join_type.empty_build_side_produces_empty_result() + && self.join_type.empty_build_side_produces_empty_result() { HashJoinStreamState::Completed } else { @@ -445,9 +438,6 @@ impl HashJoinStream { HashJoinStreamState::WaitBuildSide => { handle_state!(ready!(self.collect_build_side(cx))) } - HashJoinStreamState::WaitPartitionBoundsReport => { - handle_state!(ready!(self.wait_for_partition_bounds_report(cx))) - } HashJoinStreamState::FetchProbeBatch => { handle_state!(ready!(self.fetch_probe_batch(cx))) } @@ -468,28 +458,6 @@ impl HashJoinStream { } } - /// Optional step to wait until build-side information (hash maps or bounds) has been reported by all partitions. - /// This state is only entered if a build accumulator is present. - /// - /// ## Why wait? - /// - /// The dynamic filter is only built once all partitions have reported their information (hash maps or bounds). - /// If we do not wait here, the probe-side scan may start before the filter is ready. - /// This can lead to the probe-side scan missing the opportunity to apply the filter - /// and skip reading unnecessary data. - fn wait_for_partition_bounds_report( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>>> { - if let Some(ref mut fut) = self.build_waiter { - ready!(fut.get_shared(cx))?; - } - let build_side = self.build_side.try_as_ready()?; - self.state = - Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); - Poll::Ready(Ok(StatefulStreamResult::Continue)) - } - /// Collects build-side data by polling `OnceFut` future from initialized build-side /// /// Updates build-side to `Ready`, and state to `FetchProbeSide` @@ -517,8 +485,6 @@ impl HashJoinStream { // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator // which will handle synchronization and filter updates if let Some(ref build_accumulator) = self.build_accumulator { - let build_accumulator = Arc::clone(build_accumulator); - let left_side_partition_id = match self.mode { PartitionMode::Partitioned => self.partition, PartitionMode::CollectLeft => 0, @@ -552,15 +518,10 @@ impl HashJoinStream { ), }; - self.build_waiter = Some(OnceFut::new(async move { - build_accumulator.report_build_data(build_data).await - })); - self.state = HashJoinStreamState::WaitPartitionBoundsReport; - } else { - self.state = - Self::state_after_build_ready(self.join_type, left_data.as_ref()); + build_accumulator.report_build_data(build_data)?; } + self.state = self.state_after_build_ready(left_data.as_ref()); self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -669,6 +630,7 @@ impl HashJoinStream { // Invariant: state_after_build_ready should have already completed // join types whose result is fixed to empty when the build side is empty. debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); + let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(),