From a33e490a1b6ea400832df89cbfdac41dccecb10a Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Tue, 2 Jul 2024 22:45:59 +0200 Subject: [PATCH 1/7] Fixes to 10749 and generalization --- .../physical-plan/src/joins/hash_join.rs | 110 +++++++++++++++++- 1 file changed, 107 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b2f9ef5607458..5c971391f177e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1212,11 +1212,16 @@ fn eq_dyn_null( right: &dyn Array, null_equals_null: bool, ) -> Result { - // Nested datatypes cannot use the underlying not_distinct function and must use a special + // Nested datatypes cannot use the underlying not_distinct/eq function and must use a special // implementation // - if left.data_type().is_nested() && null_equals_null { - return Ok(compare_op_for_nested(&Operator::Eq, &left, &right)?); + if left.data_type().is_nested() { + let op = if null_equals_null { + Operator::IsNotDistinctFrom + } else { + Operator::Eq + }; + return Ok(compare_op_for_nested(&op, &left, &right)?); } match (left.data_type(), right.data_type()) { _ if null_equals_null => not_distinct(&left, &right), @@ -1546,6 +1551,8 @@ mod tests { use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field}; + use arrow_array::StructArray; + use arrow_buffer::NullBuffer; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, ScalarValue, @@ -3844,6 +3851,103 @@ mod tests { Ok(()) } + pub fn build_table_struct( + struct_name: &str, + a: (&str, &Vec>), + nulls: Option, + ) -> Arc { + let inner_fields = vec![Field::new(a.0, DataType::Int32, true)]; + let schema = Schema::new(vec![Field::new( + struct_name, + DataType::Struct(inner_fields.clone().into()), + nulls.is_some(), + )]); + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(StructArray::new( + inner_fields.into(), + vec![Arc::new(Int32Array::from(a.1.clone()))], + nulls, + ))], + ) + .unwrap(); + let schema_ref = batch.schema(); + Arc::new(MemoryExec::try_new(&[vec![batch]], schema_ref, None).unwrap()) + } + + #[tokio::test] + async fn join_on_struct() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = + build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None); + let right = + build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None); + let on = vec![( + Arc::new(Column::new_with_schema("n1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("n2", &right.schema())?) as _, + )]; + + let (columns, batches) = + join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?; + + assert_eq!(columns, vec!["n1", "n2"]); + + let expected = [ + "+--------+--------+", + "| n1 | n2 |", + "+--------+--------+", + "| {a: } | {a: } |", + "| {a: 1} | {a: 1} |", + "| {a: 2} | {a: 2} |", + "+--------+--------+", + ]; + assert_batches_eq!(expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn join_on_struct_with_nulls() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = + build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1))); + let right = + build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1))); + let on = vec![( + Arc::new(Column::new_with_schema("n1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("n2", &right.schema())?) as _, + )]; + + let (_, batches_null_eq) = join_collect( + left.clone(), + right.clone(), + on.clone(), + &JoinType::Inner, + true, + task_ctx.clone(), + ) + .await?; + + let expected_null_eq = [ + "+----+----+", + "| n1 | n2 |", + "+----+----+", + "| | |", + "+----+----+", + ]; + assert_batches_eq!(expected_null_eq, &batches_null_eq); + + let (_, batches_null_neq) = + join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?; + + let expected_null_neq = + ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"]; + assert_batches_eq!(expected_null_neq, &batches_null_neq); + + Ok(()) + } + /// Returns the column names on the schema fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() From b747c4c81f379d162c2f4a9a26631959593e2f76 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Wed, 3 Jul 2024 10:19:34 +0200 Subject: [PATCH 2/7] Add e2e tests for joins on struct --- datafusion/sqllogictest/test_files/joins.slt | 37 ++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 3cbeea0f92221..5e6661d563270 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -53,6 +53,20 @@ AS VALUES (44, 'x', 3), (55, 'w', 3); +statement ok +CREATE TABLE join_t3(s3 struct) + AS VALUES + (NULL), + (struct(1)), + (struct(2)); + +statement ok +CREATE TABLE join_t4(s4 struct) + AS VALUES + (NULL), + (struct(2)), + (struct(3)); + # Left semi anti join statement ok @@ -1336,6 +1350,29 @@ physical_plan 10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 11)------------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + + +# Join on struct +query ?? +select join_t3.s3, join_t4.s4 +from join_t3 +inner join join_t4 on join_t3.s3 = join_t4.s4 +---- +{id: 2} {id: 2} + +# Join on struct with null +query ?? +select join_t3.s3, join_t4.s4 +from join_t3 +inner join join_t4 on join_t3.s3 IS NOT DISTINCT FROM join_t4.s4 +---- +NULL NULL +{id: 2} {id: 2} + + query TT EXPLAIN select count(*) From 29302c5ab02f8c7c31bcedba19eea1055f0e6c74 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Wed, 3 Jul 2024 20:37:08 +0200 Subject: [PATCH 3/7] PR comments --- datafusion/physical-plan/src/joins/hash_join.rs | 9 +++++---- datafusion/sqllogictest/test_files/joins.slt | 6 +----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 5c971391f177e..c6ef9936b9c5a 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3851,12 +3851,13 @@ mod tests { Ok(()) } - pub fn build_table_struct( + fn build_table_struct( struct_name: &str, - a: (&str, &Vec>), + field_name_and_values: (&str, &Vec>), nulls: Option, ) -> Arc { - let inner_fields = vec![Field::new(a.0, DataType::Int32, true)]; + let (field_name, values) = field_name_and_values; + let inner_fields = vec![Field::new(field_name, DataType::Int32, true)]; let schema = Schema::new(vec![Field::new( struct_name, DataType::Struct(inner_fields.clone().into()), @@ -3867,7 +3868,7 @@ mod tests { Arc::new(schema), vec![Arc::new(StructArray::new( inner_fields.into(), - vec![Arc::new(Int32Array::from(a.1.clone()))], + vec![Arc::new(Int32Array::from(values.clone()))], nulls, ))], ) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 5e6661d563270..180bf526dbe3f 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1351,10 +1351,6 @@ physical_plan 11)------------MemoryExec: partitions=1, partition_sizes=[1] -statement ok -set datafusion.optimizer.prefer_hash_join = true; - - # Join on struct query ?? select join_t3.s3, join_t4.s4 @@ -1363,7 +1359,7 @@ inner join join_t4 on join_t3.s3 = join_t4.s4 ---- {id: 2} {id: 2} -# Join on struct with null +# Join on struct using IS NOT DISTINCT FROM query ?? select join_t3.s3, join_t4.s4 from join_t3 From 711f444c12d919ef4f6853c1a584a39e2f03348b Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Wed, 3 Jul 2024 21:11:10 +0200 Subject: [PATCH 4/7] Add Struct to can_hash method --- datafusion/expr/src/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index e3b8db676c98e..34e007207427b 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -864,6 +864,7 @@ pub fn can_hash(data_type: &DataType) -> bool { DataType::List(_) => true, DataType::LargeList(_) => true, DataType::FixedSizeList(_, _) => true, + DataType::Struct(fields) => fields.iter().all(|f| can_hash(f.data_type())), _ => false, } } From c1b4c8e3872c6975988b1191b5a0333955e27bd1 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Wed, 3 Jul 2024 21:59:04 +0200 Subject: [PATCH 5/7] Add explain query as well --- datafusion/sqllogictest/test_files/joins.slt | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 180bf526dbe3f..8ad5512c50ad2 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1352,6 +1352,28 @@ physical_plan # Join on struct +query TT +explain select join_t3.s3, join_t4.s4 +from join_t3 +inner join join_t4 on join_t3.s3 = join_t4.s4 +---- +logical_plan +01)Inner Join: join_t3.s3 = join_t4.s4 +02)--TableScan: join_t3 projection=[s3] +03)--TableScan: join_t4 projection=[s4] +physical_plan +01)CoalesceBatchesExec: target_batch_size=2 +02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s3@0, s4@0)] +03)----CoalesceBatchesExec: target_batch_size=2 +04)------RepartitionExec: partitioning=Hash([s3@0], 2), input_partitions=2 +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------MemoryExec: partitions=1, partition_sizes=[1] +07)----CoalesceBatchesExec: target_batch_size=2 +08)------RepartitionExec: partitioning=Hash([s4@0], 2), input_partitions=2 +09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +10)----------MemoryExec: partitions=1, partition_sizes=[1] + + query ?? select join_t3.s3, join_t4.s4 from join_t3 From 47a2e02b301cb0ad532efef2f059e7d4077c77ba Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Thu, 4 Jul 2024 22:05:41 +0200 Subject: [PATCH 6/7] Use EXCEPT to trigger failure --- datafusion/sqllogictest/test_files/joins.slt | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 8ad5512c50ad2..df9b8ad452000 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1350,7 +1350,6 @@ physical_plan 10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 11)------------MemoryExec: partitions=1, partition_sizes=[1] - # Join on struct query TT explain select join_t3.s3, join_t4.s4 @@ -1373,7 +1372,6 @@ physical_plan 09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 10)----------MemoryExec: partitions=1, partition_sizes=[1] - query ?? select join_t3.s3, join_t4.s4 from join_t3 @@ -1381,15 +1379,13 @@ inner join join_t4 on join_t3.s3 = join_t4.s4 ---- {id: 2} {id: 2} -# Join on struct using IS NOT DISTINCT FROM -query ?? -select join_t3.s3, join_t4.s4 -from join_t3 -inner join join_t4 on join_t3.s3 IS NOT DISTINCT FROM join_t4.s4 +# join with struct key and nulls +query ? +SELECT * FROM join_t3 +EXCEPT +SELECT * FROM join_t4 ---- -NULL NULL -{id: 2} {id: 2} - +{id: 1} query TT EXPLAIN From 4a1fcf08dd1e7107c2673c972c2210191f31a04a Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Fri, 5 Jul 2024 07:52:19 +0200 Subject: [PATCH 7/7] Update datafusion/sqllogictest/test_files/joins.slt Co-authored-by: Liang-Chi Hsieh --- datafusion/sqllogictest/test_files/joins.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index df9b8ad452000..593de07f7d26e 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1380,6 +1380,7 @@ inner join join_t4 on join_t3.s3 = join_t4.s4 {id: 2} {id: 2} # join with struct key and nulls +# Note that intersect or except applies `null_equals_null` as true for Join. query ? SELECT * FROM join_t3 EXCEPT