diff --git a/datafusion/core/tests/repro_deadlock_integration.rs b/datafusion/core/tests/repro_deadlock_integration.rs
new file mode 100644
index 0000000000000..7fd31b80bf2c7
--- /dev/null
+++ b/datafusion/core/tests/repro_deadlock_integration.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int32Array, StringArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion::assert_batches_eq;
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::sync::Arc;
+
+#[tokio::test]
+async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result<()> {
+    // Use 2 partitions and 2 rows, with join keys chosen so that all rows hash
+    // to one partition and the other partition gets zero rows.
+    let config = SessionConfig::new()
+        .with_target_partitions(2)
+        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false)
+        .set_bool("datafusion.optimizer.repartition_joins", true)
+        .set_bool("datafusion.optimizer.prefer_hash_join", true)
+        .set_usize(
+            "datafusion.optimizer.hash_join_single_partition_threshold",
+            0,
+        ); // Force Partitioned mode
+    let ctx = SessionContext::new_with_config(config);
+
+    // Create a customer table whose 2 rows share the same join key, so they
+    // must hash to the same partition in a Partitioned join.
+    let customer_schema = Arc::new(Schema::new(vec![
+        Field::new("c_custkey", DataType::Int32, false),
+        Field::new("c_name", DataType::Utf8, false),
+    ]));
+    // Both rows have c_custkey = 1
+    let customer_batch = RecordBatch::try_new(
+        customer_schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![1, 1])),
+            Arc::new(StringArray::from(vec!["A", "B"])),
+        ],
+    )?;
+    // Register as a table provider with 2 partitions to force Partitioned mode
+    let customer_source = datafusion::datasource::MemTable::try_new(
+        customer_schema,
+        vec![vec![customer_batch.clone()], vec![]],
+    )?;
+    ctx.register_table("customer", Arc::new(customer_source))?;
+
+    // Create orders table
+    let orders_schema = Arc::new(Schema::new(vec![
+        Field::new("o_orderkey", DataType::Int32, false),
+        Field::new("o_custkey", DataType::Int32, false),
+    ]));
+    let orders_batch = RecordBatch::try_new(
+        orders_schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![10, 20])),
+            Arc::new(Int32Array::from(vec![1, 1])),
+        ],
+    )?;
+    let orders_source = datafusion::datasource::MemTable::try_new(
+        orders_schema,
+        vec![vec![orders_batch.clone()], vec![]],
+    )?;
+    ctx.register_table("orders", Arc::new(orders_source))?;
+
+    // Query with a dynamic filter (IN subquery).
+    // The build side (right side of the join) will have 2 partitions.
+    // Since all o_custkey values are 1, one partition gets 2 rows and the other gets 0 rows.
+    let sql = "
+        SELECT c_name, o_orderkey
+        FROM customer
+        JOIN orders ON c_custkey = o_custkey
+        WHERE o_orderkey IN (SELECT o_orderkey FROM orders WHERE o_orderkey > 15)
+    ";
+
+    let dataframe = ctx.sql(sql).await?;
+    let plan = dataframe.create_physical_plan().await?;
+
+    // Verify a hash join is used
+    let plan_str = format!("{plan:?}");
+    println!("Plan: {plan_str}");
+    assert!(
+        plan_str.contains("HashJoinExec"),
+        "Plan should use a hash join but got: {plan_str}"
+    );
+
+    // Wrap collection in a timeout so a deadlock fails the test instead of hanging it
+    let task_ctx = ctx.task_ctx();
+    let results = tokio::time::timeout(std::time::Duration::from_secs(60), async move {
+        datafusion::physical_plan::collect(plan, task_ctx).await
+    })
+    .await
+    .expect("DEADLOCK DETECTED: Query timed out after 1 minute")?;
+
+    let expected = [
+        "+--------+------------+",
+        "| c_name | o_orderkey |",
+        "+--------+------------+",
+        "| A      | 20         |",
+        "| B      | 20         |",
+        "+--------+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
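The test above reproduces a missed rendezvous rather than a classic lock cycle: one partition must hear from its peer before the join can proceed, and the empty peer used to exit without reporting. A standalone sketch of that shape, using plain tokio and no DataFusion types (the `Barrier` here is only a stand-in for the per-partition bounds report):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;

#[tokio::main]
async fn main() {
    // Two "partitions" are expected to check in before the join may proceed.
    let barrier = Arc::new(Barrier::new(2));

    // Partition 0 has rows: it reports and waits for its peer.
    let b0 = Arc::clone(&barrier);
    let waiting = tokio::spawn(async move {
        b0.wait().await;
    });

    // Partition 1 has an empty build side. The buggy short-circuit returned
    // `Completed` here without ever calling `wait()`, so partition 0 blocks
    // forever.

    // A timeout is the only way to observe the hang, just like in the test.
    let hung = tokio::time::timeout(Duration::from_millis(100), waiting)
        .await
        .is_err();
    assert!(hung, "partition 0 should still be waiting on its missing peer");
}
```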
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 1004fba3d4f45..d4f837e3e7887 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -408,13 +408,13 @@ impl HashJoinStream {
 
     /// Returns the next state after the build side has been fully collected
     /// and any required build-side coordination has completed.
-    fn state_after_build_ready(
-        join_type: JoinType,
-        left_data: &JoinLeftData,
-    ) -> HashJoinStreamState {
-        if left_data.map().is_empty()
-            && join_type.empty_build_side_produces_empty_result()
-        {
+    fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState {
+        let is_empty = left_data.map().is_empty();
+        let produces_empty = self.join_type.empty_build_side_produces_empty_result();
+        let coordinated = self.build_accumulator.is_some();
+        let is_initial_collect = matches!(self.state, HashJoinStreamState::WaitBuildSide);
+
+        if is_empty && produces_empty && (!coordinated || !is_initial_collect) {
             HashJoinStreamState::Completed
         } else {
             HashJoinStreamState::FetchProbeBatch
@@ -485,8 +485,7 @@ impl HashJoinStream {
             ready!(fut.get_shared(cx))?;
         }
         let build_side = self.build_side.try_as_ready()?;
-        self.state =
-            Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
+        self.state = self.state_after_build_ready(build_side.left_data.as_ref());
 
         Poll::Ready(Ok(StatefulStreamResult::Continue))
     }
@@ -557,8 +556,7 @@ impl HashJoinStream {
                 }));
                 self.state = HashJoinStreamState::WaitPartitionBoundsReport;
             } else {
-                self.state =
-                    Self::state_after_build_ready(self.join_type, left_data.as_ref());
+                self.state = self.state_after_build_ready(left_data.as_ref());
             }
 
             self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
@@ -668,7 +666,21 @@ impl HashJoinStream {
         if is_empty {
             // Invariant: state_after_build_ready should have already completed
             // join types whose result is fixed to empty when the build side is empty.
-            debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
+            //
+            // However, when dynamic filtering is enabled, we skip the short-circuit
+            // in state_after_build_ready to ensure all partitions report their
+            // build-side data. In this case, we might reach here with an empty
+            // build side even for join types that produce empty results.
+            if self.build_accumulator.is_none() {
+                debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
+            }
+
+            if self.join_type.empty_build_side_produces_empty_result() {
+                timer.done();
+                self.state = HashJoinStreamState::FetchProbeBatch;
+                return Ok(StatefulStreamResult::Continue);
+            }
+
             let result = build_batch_empty_build_side(
                 &self.schema,
                 build_side.left_data.batch(),
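The heart of the stream.rs change is the new predicate in `state_after_build_ready`. A sketch of it as a pure function makes the interesting cases easy to check; the names here are hypothetical local stand-ins for the fields the real code reads off `self`:

```rust
// Hypothetical mirror of the condition added above; only the booleans and
// their combination come from the patch, the names are local stand-ins.
#[derive(Debug, PartialEq)]
enum Next {
    Completed,
    FetchProbeBatch,
}

fn next_state(
    build_is_empty: bool,     // left_data.map().is_empty()
    produces_empty: bool,     // join_type.empty_build_side_produces_empty_result()
    coordinated: bool,        // self.build_accumulator.is_some()
    is_initial_collect: bool, // matches!(self.state, WaitBuildSide)
) -> Next {
    if build_is_empty && produces_empty && (!coordinated || !is_initial_collect) {
        Next::Completed
    } else {
        Next::FetchProbeBatch
    }
}

fn main() {
    // No dynamic-filter coordination: an empty build side may finish at once.
    assert_eq!(next_state(true, true, false, true), Next::Completed);
    // Coordinated initial collect: keep going so this partition's (empty)
    // bounds still get reported -- this is the deadlock fix.
    assert_eq!(next_state(true, true, true, true), Next::FetchProbeBatch);
    // Re-entry after the bounds report: short-circuiting is safe again.
    assert_eq!(next_state(true, true, true, false), Next::Completed);
    // A non-empty build side always proceeds to probing.
    assert_eq!(next_state(false, true, true, true), Next::FetchProbeBatch);
}
```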
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
index 51ad998c02307..bdde1a87bdb45 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -219,16 +219,22 @@ statement ok
 set datafusion.execution.collect_statistics = true;
 
 statement ok
-set datafusion.explain.analyze_categories = 'rows';
+set datafusion.explain.analyze_level = summary;
+
+statement ok
+set datafusion.explain.analyze_categories = 'none';
 
 query TT
 EXPLAIN ANALYZE select max(column1) from agg_dyn_e2e where column1 > 1;
 ----
 Plan with Metrics
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=1, output_batches=1]
-02)--CoalescePartitionsExec, metrics=[output_rows=2, output_batches=2]
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=2, output_batches=2]
-04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[output_rows=2, output_batches=2, files_ranges_pruned_statistics=4 total → 4 matched, row_groups_pruned_statistics=4 total → 2 matched -> 2 fully matched, row_groups_pruned_bloom_filter=2 total → 2 matched, page_index_pages_pruned=2 total → 2 matched, page_index_rows_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=4, files_processed=4, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=0, predicate_cache_inner_records=2, predicate_cache_records=4, scan_efficiency_ratio=25.15% (130/517)]
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
+02)--CoalescePartitionsExec, metrics=[]
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
+04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[]
+
+statement ok
+reset datafusion.explain.analyze_level;
 
 statement ok
 reset datafusion.explain.analyze_categories;
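Note on the .slt change: the expectations now pin `analyze_level = summary` and `analyze_categories = 'none'` so that `EXPLAIN ANALYZE` prints empty `metrics=[]` blocks, presumably because the per-operator counters are sensitive to how rows land on partitions after this fix. To poke at the same output outside sqllogictest, a minimal sketch; the option names are taken from the .slt above and assumed present in this branch, and `set_str` is the generic string setter on `SessionConfig`:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Same settings as the .slt file, applied programmatically.
    let config = SessionConfig::new()
        .set_str("datafusion.explain.analyze_level", "summary")
        .set_str("datafusion.explain.analyze_categories", "none");
    let ctx = SessionContext::new_with_config(config);

    // With per-operator metrics suppressed, EXPLAIN ANALYZE output no longer
    // embeds run-dependent counters, keeping .slt expectations deterministic.
    ctx.sql("EXPLAIN ANALYZE SELECT 1").await?.show().await?;
    Ok(())
}
```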