From 49dc129e38d2aa7a3f1c5e8ac9b935d50522f5d6 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 4 Aug 2024 17:01:27 +0300 Subject: [PATCH 1/4] tests: hash join tests with hash collisions --- .../physical-plan/src/joins/hash_join.rs | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 063f35059fb8..3660a1b924ae 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1583,7 +1583,6 @@ mod tests { use rstest::*; use rstest_reuse::*; - #[cfg(not(feature = "force_hash_collisions"))] fn div_ceil(a: usize, b: usize) -> usize { (a + b - 1) / b } @@ -1931,9 +1930,6 @@ mod tests { Ok(()) } - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_two(batch_size: usize) -> Result<()> { @@ -1964,12 +1960,21 @@ mod tests { assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]); - // expected joined records = 3 - // in case batch_size is 1 - additional empty batch for remaining 3-2 row - let mut expected_batch_count = div_ceil(3, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches = 3 + // in case batch_size is 1 - additional empty batch for remaining 3-2 row + let mut expected_batch_count = div_ceil(3, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + // Expected number of matches = 9 + div_ceil(9, batch_size) + }; + assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -1989,9 +1994,6 @@ mod tests { } /// Test where the left has 2 parts, the right with 1 part => 1 part - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> { @@ -2029,12 +2031,21 @@ mod tests { assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]); - // expected joined records = 3 - // in case batch_size is 1 - additional empty batch for remaining 3-2 row - let mut expected_batch_count = div_ceil(3, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches = 3 + // in case batch_size is 1 - additional empty batch for remaining 3-2 row + let mut expected_batch_count = div_ceil(3, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + // Expected number of matches = 9 + div_ceil(9, batch_size) + }; + assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -2104,9 +2115,6 @@ mod tests { } /// Test where the left has 1 part, the right has 2 parts => 2 parts - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> { @@ -2143,12 +2151,20 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - // expected joined records = 1 (first right batch) - // and additional empty batch for non-joined 20-6-80 - let mut expected_batch_count = div_ceil(1, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches for first right batch = 1 + // and additional empty batch for non-joined 20-6-80 + let mut expected_batch_count = div_ceil(1, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + // Expected number of matches = 6 + div_ceil(6, batch_size) + }; assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -2166,8 +2182,15 @@ mod tests { let stream = join.execute(1, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - // expected joined records = 2 (second right batch) - let expected_batch_count = div_ceil(2, batch_size); + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches for second right batch = 2 + div_ceil(2, batch_size) + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + // Expected number of matches = 3 + div_ceil(3, batch_size) + }; assert_eq!(batches.len(), expected_batch_count); let expected = [ From 10e6b7434266e164c5faba9e82fbc95b4878a07e Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 4 Aug 2024 21:23:23 +0300 Subject: [PATCH 2/4] replace div_ceil expression with function call --- datafusion/physical-plan/src/joins/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 3660a1b924ae..54132598d9f5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3755,9 +3755,9 @@ mod tests { | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - (expected_resultset_records + batch_size - 1) / batch_size + div_ceil(expected_resultset_records, batch_size) } - _ => (expected_resultset_records + batch_size - 1) / batch_size + 1, + _ => div_ceil(expected_resultset_records, batch_size) + 1, }; assert_eq!( batches.len(), From 436b1d5e141ddf432861dc8a0797ffe8ab917722 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 6 Aug 2024 07:01:35 +0300 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/joins/hash_join.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 54132598d9f5..24b1c588c9d0 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1971,7 +1971,6 @@ mod tests { } else { // With hash collisions enabled, all records will match each other // and filtered later. - // Expected number of matches = 9 div_ceil(9, batch_size) }; @@ -2042,7 +2041,6 @@ mod tests { } else { // With hash collisions enabled, all records will match each other // and filtered later. - // Expected number of matches = 9 div_ceil(9, batch_size) }; From e63938e98ebd6e3817ff3290695ad2a40d1c2cc5 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 6 Aug 2024 07:03:39 +0300 Subject: [PATCH 4/4] remove redundant comments --- datafusion/physical-plan/src/joins/hash_join.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 24b1c588c9d0..14835f717ea3 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -2160,7 +2160,6 @@ mod tests { } else { // With hash collisions enabled, all records will match each other // and filtered later. - // Expected number of matches = 6 div_ceil(6, batch_size) }; assert_eq!(batches.len(), expected_batch_count); @@ -2186,7 +2185,6 @@ mod tests { } else { // With hash collisions enabled, all records will match each other // and filtered later. - // Expected number of matches = 3 div_ceil(3, batch_size) }; assert_eq!(batches.len(), expected_batch_count);