Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 196 additions & 29 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec {
.with_category(MetricCategory::Rows)
.counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);

// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
let on_right = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
Comment on lines +1319 to +1332
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I'm trying to deduce what this change does, but I'm seeing that it's not related to the fix and it's just a refactor? Is that right, or did I miss something?

self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
repartition_random_state,
))
})))
})
})
.flatten()
.flatten();

let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;
Expand Down Expand Up @@ -1343,7 +1370,6 @@ impl ExecutionPlan for HashJoinExec {
let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
self.random_state.random_state().clone(),
left_stream,
Expand All @@ -1368,33 +1394,6 @@ impl ExecutionPlan for HashJoinExec {

let batch_size = context.session_config().batch_size();

// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
let on_right = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
repartition_random_state,
))
})))
})
})
.flatten()
.flatten();

// We have the batches and the hash map with their keys. We can now create a stream
// over the right that uses this information to issue new batches.
let right_stream = self.right.execute(partition, context)?;
Expand Down Expand Up @@ -2347,6 +2346,22 @@ mod tests {
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
hash_join_with_dynamic_filter_and_mode(
left,
right,
on,
join_type,
PartitionMode::CollectLeft,
)
}

fn hash_join_with_dynamic_filter_and_mode(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
mode: PartitionMode,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let mut join = HashJoinExec::try_new(
Expand All @@ -2356,7 +2371,7 @@ mod tests {
None,
&join_type,
None,
PartitionMode::CollectLeft,
mode,
NullEquality::NullEqualsNothing,
false,
)?;
Expand Down Expand Up @@ -5628,6 +5643,158 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checked with main and indeed this test reproduces the error 👍

-> Result<()> {
let mut session_config = SessionConfig::default();
session_config
.options_mut()
.optimizer
.enable_dynamic_filter_pushdown = true;
let task_ctx =
Arc::new(TaskContext::default().with_session_config(session_config));

let child_left_schema = Arc::new(Schema::new(vec![
Field::new("child_left_payload", DataType::Int32, false),
Field::new("child_key", DataType::Int32, false),
Field::new("child_left_extra", DataType::Int32, false),
]));
let child_right_schema = Arc::new(Schema::new(vec![
Field::new("child_right_payload", DataType::Int32, false),
Field::new("child_right_key", DataType::Int32, false),
Field::new("child_right_extra", DataType::Int32, false),
]));
let parent_left_schema = Arc::new(Schema::new(vec![
Field::new("parent_payload", DataType::Int32, false),
Field::new("parent_key", DataType::Int32, false),
Field::new("parent_extra", DataType::Int32, false),
]));

let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
&[
vec![build_table_i32(
("child_left_payload", &vec![10]),
("child_key", &vec![0]),
("child_left_extra", &vec![100]),
)],
vec![build_table_i32(
("child_left_payload", &vec![11]),
("child_key", &vec![1]),
("child_left_extra", &vec![101]),
)],
vec![build_table_i32(
("child_left_payload", &vec![12]),
("child_key", &vec![2]),
("child_left_extra", &vec![102]),
)],
vec![build_table_i32(
("child_left_payload", &vec![13]),
("child_key", &vec![3]),
("child_left_extra", &vec![103]),
)],
],
Arc::clone(&child_left_schema),
None,
)?;
let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
&[
vec![build_table_i32(
("child_right_payload", &vec![20]),
("child_right_key", &vec![0]),
("child_right_extra", &vec![200]),
)],
vec![build_table_i32(
("child_right_payload", &vec![21]),
("child_right_key", &vec![1]),
("child_right_extra", &vec![201]),
)],
vec![build_table_i32(
("child_right_payload", &vec![22]),
("child_right_key", &vec![2]),
("child_right_extra", &vec![202]),
)],
vec![build_table_i32(
("child_right_payload", &vec![23]),
("child_right_key", &vec![3]),
("child_right_extra", &vec![203]),
)],
],
Arc::clone(&child_right_schema),
None,
)?;
let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
&[
vec![build_table_i32(
("parent_payload", &vec![30]),
("parent_key", &vec![0]),
("parent_extra", &vec![300]),
)],
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
vec![build_table_i32(
("parent_payload", &vec![32]),
("parent_key", &vec![2]),
("parent_extra", &vec![302]),
)],
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
],
Arc::clone(&parent_left_schema),
None,
)?;

let child_on = vec![(
Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
Arc::new(Column::new_with_schema(
"child_right_key",
&child_right_schema,
)?) as _,
)];
let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
child_left,
child_right,
child_on,
JoinType::Inner,
PartitionMode::Partitioned,
)?;
let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);

let parent_on = vec![(
Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
)];
let parent_join = HashJoinExec::try_new(
parent_left,
child_join,
parent_on,
None,
&JoinType::RightSemi,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)?;

let batches = tokio::time::timeout(
std::time::Duration::from_secs(5),
crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
)
.await
.expect("partitioned right-semi join should not hang")?;

assert_batches_sorted_eq!(
[
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
"| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
"| 10 | 0 | 100 | 20 | 0 | 200 |",
"| 12 | 2 | 102 | 22 | 2 | 202 |",
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
],
&batches
);

Ok(())
}

#[tokio::test]
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
-> Result<()> {
Expand Down
Loading
Loading