From 1f5644dcbe5ff6d34c333f16dfca98266e4c62fb Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 03:12:00 +0530 Subject: [PATCH 1/6] fix: prevent hash join deadlock when dynamic filtering is enabled Empty partitions were short-circuiting to Completed state without reporting to the shared accumulator. This caused other partitions to wait indefinitely at the barrier. This fix ensures that even empty partitions proceed to report their data if a build accumulator is present. Made-with: Cursor --- .../src/joins/hash_join/stream.rs | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f45..d4f837e3e7887 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -408,13 +408,13 @@ impl HashJoinStream { /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. - fn state_after_build_ready( - join_type: JoinType, - left_data: &JoinLeftData, - ) -> HashJoinStreamState { - if left_data.map().is_empty() - && join_type.empty_build_side_produces_empty_result() - { + fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState { + let is_empty = left_data.map().is_empty(); + let produces_empty = self.join_type.empty_build_side_produces_empty_result(); + let coordinated = self.build_accumulator.is_some(); + let is_initial_collect = matches!(self.state, HashJoinStreamState::WaitBuildSide); + + if is_empty && produces_empty && (!coordinated || !is_initial_collect) { HashJoinStreamState::Completed } else { HashJoinStreamState::FetchProbeBatch @@ -485,8 +485,7 @@ impl HashJoinStream { ready!(fut.get_shared(cx))?; } let build_side = self.build_side.try_as_ready()?; - self.state = - Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); + self.state = self.state_after_build_ready(build_side.left_data.as_ref()); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -557,8 +556,7 @@ impl HashJoinStream { })); self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { - self.state = - Self::state_after_build_ready(self.join_type, left_data.as_ref()); + self.state = self.state_after_build_ready(left_data.as_ref()); } self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); @@ -668,7 +666,21 @@ impl HashJoinStream { if is_empty { // Invariant: state_after_build_ready should have already completed // join types whose result is fixed to empty when the build side is empty. - debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); + // + // However, when dynamic filtering is enabled, we skip the short-circuit + // in state_after_build_ready to ensure all partitions report their + // build-side data. In this case, we might reach here with an empty + // build side even for join types that produce empty results. + if self.build_accumulator.is_none() { + debug_assert!(!self.join_type.empty_build_side_produces_empty_result()); + } + + if self.join_type.empty_build_side_produces_empty_result() { + timer.done(); + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } + let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), From 3d3d1d2543334238f77a08bdde6ccfe1a141d4f3 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 22:32:56 +0530 Subject: [PATCH 2/6] test: add deterministic integration test for hash join deadlock This test forces a Partitioned join with an empty partition to ensure the dynamic filter coordination deadlock does not regress. Made-with: Cursor --- .config/nextest.toml | 0 .../core/tests/repro_deadlock_integration.rs | 90 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 .config/nextest.toml create mode 100644 datafusion/core/tests/repro_deadlock_integration.rs diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/datafusion/core/tests/repro_deadlock_integration.rs b/datafusion/core/tests/repro_deadlock_integration.rs new file mode 100644 index 0000000000000..eb6f97176bcdd --- /dev/null +++ b/datafusion/core/tests/repro_deadlock_integration.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::assert_batches_eq; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow::array::{Int32Array, StringArray}; + +#[tokio::test] +async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result<()> { + // We use 2 partitions and 2 rows, but we use a filter that puts all rows in one partition + // and zero rows in the other partition. + let config = SessionConfig::new() + .with_target_partitions(2) + .set_bool("datafusion.optimizer.enable_round_robin_repartition", false) + .set_bool("datafusion.optimizer.repartition_joins", true) + .set_bool("datafusion.optimizer.prefer_hash_join", true) + .set_usize("datafusion.optimizer.hash_join_single_partition_threshold", 0); // Force Partitioned mode + let ctx = SessionContext::new_with_config(config); + + // Create customer table with 2 rows that will both end up in the same partition + // if the join key is the same (or hashes to the same partition). + let customer_schema = Arc::new(Schema::new(vec![ + Field::new("c_custkey", DataType::Int32, false), + Field::new("c_name", DataType::Utf8, false), + ])); + // Both rows have c_custkey = 1, so they MUST go to the same partition in Partitioned join + let customer_batch = RecordBatch::try_new( + customer_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 1])), + Arc::new(StringArray::from(vec!["A", "B"])), + ], + )?; + // Register as a table provider with 2 partitions to force Partitioned mode + let customer_source = datafusion::datasource::MemTable::try_new(customer_schema, vec![vec![customer_batch.clone()], vec![]])?; + ctx.register_table("customer", Arc::new(customer_source))?; + + // Create orders table + let orders_schema = Arc::new(Schema::new(vec![ + Field::new("o_orderkey", DataType::Int32, false), + Field::new("o_custkey", DataType::Int32, false), + ])); + let orders_batch = RecordBatch::try_new( + orders_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![10, 20])), + Arc::new(Int32Array::from(vec![1, 1])), + ], + )?; + let orders_source = datafusion::datasource::MemTable::try_new(orders_schema, vec![vec![orders_batch.clone()], vec![]])?; + ctx.register_table("orders", Arc::new(orders_source))?; + + // Query with dynamic filter (IN subquery) + // The build side (right side of join) will have 2 partitions. + // Since all o_custkey are 1, one partition will have 2 rows, and the other will have 0 rows. + let sql = " + SELECT c_name, o_orderkey + FROM customer + JOIN orders ON c_custkey = o_custkey + WHERE o_orderkey IN (SELECT o_orderkey FROM orders WHERE o_orderkey > 15) + "; + + let dataframe = ctx.sql(sql).await?; + let plan = dataframe.create_physical_plan().await?; + + // Verify Partitioned mode + let plan_str = format!("{:?}", plan); + println!("Plan: {}", plan_str); + assert!(plan_str.contains("HashJoinExec"), "Plan should use a hash join but got: {}", plan_str); + + // Wrap the collection in a timeout to detect deadlock + let task_ctx = ctx.task_ctx(); + let results = tokio::time::timeout(std::time::Duration::from_secs(60), async move { + datafusion::physical_plan::collect(plan, task_ctx).await + }).await.expect("DEADLOCK DETECTED: Query timed out after 1 minute")?; + + let expected = vec![ + "+--------+------------+", + "| c_name | o_orderkey |", + "+--------+------------+", + "| A | 20 |", + "| B | 20 |", + "+--------+------------+", + ]; + + assert_batches_eq!(expected, &results); + + Ok(()) +} From 51d359f62b00fea2ee8c38194509dd117150c5c0 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 22:37:56 +0530 Subject: [PATCH 3/6] test: add deterministic integration test for hash join deadlock This test forces a Partitioned join with an empty partition to ensure the dynamic filter coordination deadlock does not regress. Made-with: Cursor --- .config/nextest.toml | 0 .../core/tests/repro_deadlock_integration.rs | 17 +++++++++++++++++ 2 files changed, 17 insertions(+) delete mode 100644 .config/nextest.toml diff --git a/.config/nextest.toml b/.config/nextest.toml deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/datafusion/core/tests/repro_deadlock_integration.rs b/datafusion/core/tests/repro_deadlock_integration.rs index eb6f97176bcdd..2876ebf8aac76 100644 --- a/datafusion/core/tests/repro_deadlock_integration.rs +++ b/datafusion/core/tests/repro_deadlock_integration.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use datafusion::prelude::*; use datafusion::error::Result; From 64b73360d5e857907814c848d2faa20ee2173397 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 22:46:20 +0530 Subject: [PATCH 4/6] test: add deterministic integration test for hash join deadlock This test forces a Partitioned join with an empty partition to ensure the dynamic filter coordination deadlock does not regress. Made-with: Cursor --- .../core/tests/repro_deadlock_integration.rs | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/repro_deadlock_integration.rs b/datafusion/core/tests/repro_deadlock_integration.rs index 2876ebf8aac76..7fd31b80bf2c7 100644 --- a/datafusion/core/tests/repro_deadlock_integration.rs +++ b/datafusion/core/tests/repro_deadlock_integration.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; -use datafusion::prelude::*; -use datafusion::error::Result; -use datafusion::assert_batches_eq; +use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; -use arrow::array::{Int32Array, StringArray}; +use datafusion::assert_batches_eq; +use datafusion::error::Result; +use datafusion::prelude::*; +use std::sync::Arc; #[tokio::test] async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result<()> { @@ -32,7 +32,10 @@ async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result .set_bool("datafusion.optimizer.enable_round_robin_repartition", false) .set_bool("datafusion.optimizer.repartition_joins", true) .set_bool("datafusion.optimizer.prefer_hash_join", true) - .set_usize("datafusion.optimizer.hash_join_single_partition_threshold", 0); // Force Partitioned mode + .set_usize( + "datafusion.optimizer.hash_join_single_partition_threshold", + 0, + ); // Force Partitioned mode let ctx = SessionContext::new_with_config(config); // Create customer table with 2 rows that will both end up in the same partition @@ -50,7 +53,10 @@ async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result ], )?; // Register as a table provider with 2 partitions to force Partitioned mode - let customer_source = datafusion::datasource::MemTable::try_new(customer_schema, vec![vec![customer_batch.clone()], vec![]])?; + let customer_source = datafusion::datasource::MemTable::try_new( + customer_schema, + vec![vec![customer_batch.clone()], vec![]], + )?; ctx.register_table("customer", Arc::new(customer_source))?; // Create orders table @@ -65,7 +71,10 @@ async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result Arc::new(Int32Array::from(vec![1, 1])), ], )?; - let orders_source = datafusion::datasource::MemTable::try_new(orders_schema, vec![vec![orders_batch.clone()], vec![]])?; + let orders_source = datafusion::datasource::MemTable::try_new( + orders_schema, + vec![vec![orders_batch.clone()], vec![]], + )?; ctx.register_table("orders", Arc::new(orders_source))?; // Query with dynamic filter (IN subquery) @@ -80,19 +89,24 @@ async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result let dataframe = ctx.sql(sql).await?; let plan = dataframe.create_physical_plan().await?; - + // Verify Partitioned mode - let plan_str = format!("{:?}", plan); - println!("Plan: {}", plan_str); - assert!(plan_str.contains("HashJoinExec"), "Plan should use a hash join but got: {}", plan_str); + let plan_str = format!("{plan:?}"); + println!("Plan: {plan_str}"); + assert!( + plan_str.contains("HashJoinExec"), + "Plan should use a hash join but got: {plan_str}" + ); // Wrap the collection in a timeout to detect deadlock let task_ctx = ctx.task_ctx(); let results = tokio::time::timeout(std::time::Duration::from_secs(60), async move { datafusion::physical_plan::collect(plan, task_ctx).await - }).await.expect("DEADLOCK DETECTED: Query timed out after 1 minute")?; - - let expected = vec![ + }) + .await + .expect("DEADLOCK DETECTED: Query timed out after 1 minute")?; + + let expected = [ "+--------+------------+", "| c_name | o_orderkey |", "+--------+------------+", From f21e089e620d73ef8409419f6cb34c22b62e0b24 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 23:18:52 +0530 Subject: [PATCH 5/6] trigger ci Made-with: Cursor From 2fa71e058e9a5418ac578e86bbc0afd860623bc1 Mon Sep 17 00:00:00 2001 From: Ratul Dawar Date: Wed, 15 Apr 2026 23:31:30 +0530 Subject: [PATCH 6/6] test: make agg_dyn_e2e test deterministic by ignoring metrics Made-with: Cursor --- .../test_files/push_down_filter_regression.slt | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt index 51ad998c02307..bdde1a87bdb45 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt @@ -219,16 +219,22 @@ statement ok set datafusion.execution.collect_statistics = true; statement ok -set datafusion.explain.analyze_categories = 'rows'; +set datafusion.explain.analyze_level = summary; + +statement ok +set datafusion.explain.analyze_categories = 'none'; query TT EXPLAIN ANALYZE select max(column1) from agg_dyn_e2e where column1 > 1; ---- Plan with Metrics -01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=1, output_batches=1] -02)--CoalescePartitionsExec, metrics=[output_rows=2, output_batches=2] -03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=2, output_batches=2] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[output_rows=2, output_batches=2, files_ranges_pruned_statistics=4 total → 4 matched, row_groups_pruned_statistics=4 total → 2 matched -> 2 fully matched, row_groups_pruned_bloom_filter=2 total → 2 matched, page_index_pages_pruned=2 total → 2 matched, page_index_rows_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=4, files_processed=4, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=0, predicate_cache_inner_records=2, predicate_cache_records=4, scan_efficiency_ratio=25.15% (130/517)] +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[] +02)--CoalescePartitionsExec, metrics=[] +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[] + +statement ok +reset datafusion.explain.analyze_level; statement ok reset datafusion.explain.analyze_categories;