Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions datafusion/core/tests/repro_deadlock_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::test]
async fn test_hash_join_dynamic_filter_deadlock_with_empty_partition() -> Result<()> {
    // Regression test: a partitioned hash join with a dynamic filter must not
    // deadlock when one of the build-side partitions receives zero rows.
    //
    // The setup uses 2 target partitions but only 2 rows, all sharing the same
    // join key, so every row hashes into one partition and the other stays
    // empty. Round-robin repartitioning is disabled so nothing rebalances the
    // empty partition away.
    let session_config = SessionConfig::new()
        .with_target_partitions(2)
        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false)
        .set_bool("datafusion.optimizer.repartition_joins", true)
        .set_bool("datafusion.optimizer.prefer_hash_join", true)
        .set_usize(
            "datafusion.optimizer.hash_join_single_partition_threshold",
            0,
        ); // Force Partitioned mode
    let context = SessionContext::new_with_config(session_config);

    // "customer" table: both rows carry c_custkey = 1, so in Partitioned mode
    // they are guaranteed to hash into the same partition.
    let cust_schema = Arc::new(Schema::new(vec![
        Field::new("c_custkey", DataType::Int32, false),
        Field::new("c_name", DataType::Utf8, false),
    ]));
    let cust_batch = RecordBatch::try_new(
        Arc::clone(&cust_schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 1])),
            Arc::new(StringArray::from(vec!["A", "B"])),
        ],
    )?;
    // Register with two MemTable partitions (the second one empty) so the
    // planner keeps the join in Partitioned mode.
    let cust_table = datafusion::datasource::MemTable::try_new(
        cust_schema,
        vec![vec![cust_batch], vec![]],
    )?;
    context.register_table("customer", Arc::new(cust_table))?;

    // "orders" table: same trick — all o_custkey values are 1, two partitions,
    // one of them empty.
    let orders_schema = Arc::new(Schema::new(vec![
        Field::new("o_orderkey", DataType::Int32, false),
        Field::new("o_custkey", DataType::Int32, false),
    ]));
    let orders_batch = RecordBatch::try_new(
        Arc::clone(&orders_schema),
        vec![
            Arc::new(Int32Array::from(vec![10, 20])),
            Arc::new(Int32Array::from(vec![1, 1])),
        ],
    )?;
    let orders_table = datafusion::datasource::MemTable::try_new(
        orders_schema,
        vec![vec![orders_batch], vec![]],
    )?;
    context.register_table("orders", Arc::new(orders_table))?;

    // The IN subquery introduces a dynamic filter. With all o_custkey equal to
    // 1, one build-side partition ends up with 2 rows and the other with 0.
    let sql = "
        SELECT c_name, o_orderkey
        FROM customer
        JOIN orders ON c_custkey = o_custkey
        WHERE o_orderkey IN (SELECT o_orderkey FROM orders WHERE o_orderkey > 15)
    ";

    let df = context.sql(sql).await?;
    let physical_plan = df.create_physical_plan().await?;

    // Sanity-check the plan shape: a hash join must be present.
    let plan_str = format!("{physical_plan:?}");
    println!("Plan: {plan_str}");
    assert!(
        plan_str.contains("HashJoinExec"),
        "Plan should use a hash join but got: {plan_str}"
    );

    // Execute under a timeout; if the join deadlocks, the timeout fires
    // instead of hanging the test suite forever.
    let task_ctx = context.task_ctx();
    let collect_fut = datafusion::physical_plan::collect(physical_plan, task_ctx);
    let results = tokio::time::timeout(std::time::Duration::from_secs(60), collect_fut)
        .await
        .expect("DEADLOCK DETECTED: Query timed out after 1 minute")?;

    let expected = [
        "+--------+------------+",
        "| c_name | o_orderkey |",
        "+--------+------------+",
        "| A | 20 |",
        "| B | 20 |",
        "+--------+------------+",
    ];

    assert_batches_eq!(expected, &results);

    Ok(())
}
36 changes: 24 additions & 12 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ impl HashJoinStream {

/// Returns the next state after the build side has been fully collected
/// and any required build-side coordination has completed.
fn state_after_build_ready(
join_type: JoinType,
left_data: &JoinLeftData,
) -> HashJoinStreamState {
if left_data.map().is_empty()
&& join_type.empty_build_side_produces_empty_result()
{
fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState {
let is_empty = left_data.map().is_empty();
let produces_empty = self.join_type.empty_build_side_produces_empty_result();
let coordinated = self.build_accumulator.is_some();
let is_initial_collect = matches!(self.state, HashJoinStreamState::WaitBuildSide);

if is_empty && produces_empty && (!coordinated || !is_initial_collect) {
HashJoinStreamState::Completed
} else {
HashJoinStreamState::FetchProbeBatch
Expand Down Expand Up @@ -485,8 +485,7 @@ impl HashJoinStream {
ready!(fut.get_shared(cx))?;
}
let build_side = self.build_side.try_as_ready()?;
self.state =
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
self.state = self.state_after_build_ready(build_side.left_data.as_ref());
Poll::Ready(Ok(StatefulStreamResult::Continue))
}

Expand Down Expand Up @@ -557,8 +556,7 @@ impl HashJoinStream {
}));
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
} else {
self.state =
Self::state_after_build_ready(self.join_type, left_data.as_ref());
self.state = self.state_after_build_ready(left_data.as_ref());
}

self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
Expand Down Expand Up @@ -668,7 +666,21 @@ impl HashJoinStream {
if is_empty {
// Invariant: state_after_build_ready should have already completed
// join types whose result is fixed to empty when the build side is empty.
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
//
// However, when dynamic filtering is enabled, we skip the short-circuit
// in state_after_build_ready to ensure all partitions report their
// build-side data. In this case, we might reach here with an empty
// build side even for join types that produce empty results.
if self.build_accumulator.is_none() {
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
}

if self.join_type.empty_build_side_produces_empty_result() {
timer.done();
self.state = HashJoinStreamState::FetchProbeBatch;
return Ok(StatefulStreamResult::Continue);
}

let result = build_batch_empty_build_side(
&self.schema,
build_side.left_data.batch(),
Expand Down
16 changes: 11 additions & 5 deletions datafusion/sqllogictest/test_files/push_down_filter_regression.slt
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,22 @@ statement ok
set datafusion.execution.collect_statistics = true;

statement ok
set datafusion.explain.analyze_categories = 'rows';
set datafusion.explain.analyze_level = summary;

statement ok
set datafusion.explain.analyze_categories = 'none';

query TT
EXPLAIN ANALYZE select max(column1) from agg_dyn_e2e where column1 > 1;
----
Plan with Metrics
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=1, output_batches=1]
02)--CoalescePartitionsExec, metrics=[output_rows=2, output_batches=2]
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[output_rows=2, output_batches=2]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[output_rows=2, output_batches=2, files_ranges_pruned_statistics=4 total → 4 matched, row_groups_pruned_statistics=4 total → 2 matched -> 2 fully matched, row_groups_pruned_bloom_filter=2 total → 2 matched, page_index_pages_pruned=2 total → 2 matched, page_index_rows_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=4, files_processed=4, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=0, predicate_cache_inner_records=2, predicate_cache_records=4, scan_efficiency_ratio=25.15% (130/517)]
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
02)--CoalescePartitionsExec, metrics=[]
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[]

statement ok
reset datafusion.explain.analyze_level;

statement ok
reset datafusion.explain.analyze_categories;
Expand Down
Loading