From 2f02d1cdae9a6e01f9a02b91ca730a015388d2c7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 16:12:45 +0100 Subject: [PATCH 1/8] Refactor usage of right indices --- .../datafusion/src/physical_plan/hash_join.rs | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index def1cc09a17..ead9991d7a3 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,10 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::array::ArrayRef; +use arrow::{ + array::{ArrayRef, UInt32Builder}, + compute, +}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -289,12 +292,11 @@ fn build_batch_from_indices( let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); for field in schema.fields() { // pick the column (left or right) based on the field name. - // Note that we take `.data_ref()` to gather the [ArrayData] of each array. - let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) { - Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::>())), + let (is_primary, column_index) = match primary[0].schema().index_of(field.name()) { + Ok(i) => Ok((true, i)), Err(_) => { match secondary[0].schema().index_of(field.name()) { - Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::>())), + Ok(i) => Ok((false, i)), _ => Err(DataFusionError::Internal( format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() )) @@ -302,12 +304,17 @@ fn build_batch_from_indices( } }.map_err(DataFusionError::into_arrow_external_error)?; - let capacity = arrays.iter().map(|array| array.len()).sum(); - let mut mutable = MutableArrayData::new(arrays, true, capacity); - let is_left = (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - if is_left { + + let array = if is_left { + // Note that we take `.data_ref()` to gather the [ArrayData] of each array. + let arrays = left.iter() + .map(|batch| batch.column(column_index).data_ref().as_ref()) + .collect::>(); + + let capacity = arrays.iter().map(|array| array.len()).sum(); + let mut mutable = MutableArrayData::new(arrays, true, capacity); // use the left indices for (join_index, _) in indices { match join_index { @@ -315,16 +322,22 @@ fn build_batch_from_indices( None => mutable.extend_nulls(1), } } + make_array(Arc::new(mutable.freeze())) } else { // use the right indices + + let array = right[0].column(column_index); + let mut builder = UInt32Builder::new(indices.len()); for (_, join_index) in indices { match join_index { - Some(row) => mutable.extend(0, *row, *row + 1), - None => mutable.extend_nulls(1), + Some(row) => builder.append_value(*row as u32)?, + None => { + builder.append_null()?; + } } } + compute::take(array.as_ref(), &builder.finish(), None)? }; - let array = make_array(Arc::new(mutable.freeze())); columns.push(array); } Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) From 3ba0d9b28e7b18a1592538ea7d8af37da6967aa1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 16:48:49 +0100 Subject: [PATCH 2/8] Format --- .../datafusion/src/physical_plan/hash_join.rs | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index ead9991d7a3..a661619c9f8 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,10 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::{ - array::{ArrayRef, UInt32Builder}, - compute, -}; +use arrow::{array::ArrayRef, compute}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -58,7 +55,7 @@ type Index = (usize, usize); // as a left join may issue None indices, in which case type JoinIndex = Option<(usize, usize)>; // An index of row uniquely identifying a row in a batch -type RightIndex = Option; +type RightIndex = Option; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true @@ -309,7 +306,8 @@ fn build_batch_from_indices( let array = if is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. - let arrays = left.iter() + let arrays = left + .iter() .map(|batch| batch.column(column_index).data_ref().as_ref()) .collect::>(); @@ -324,19 +322,12 @@ fn build_batch_from_indices( } make_array(Arc::new(mutable.freeze())) } else { - // use the right indices - let array = right[0].column(column_index); - let mut builder = UInt32Builder::new(indices.len()); - for (_, join_index) in indices { - match join_index { - Some(row) => builder.append_value(*row as u32)?, - None => { - builder.append_null()?; - } - } - } - compute::take(array.as_ref(), &builder.finish(), None)? + let ind = indices + .iter() + .map(|(_, join_index)| join_index) + .collect::(); + compute::take(array.as_ref(), &ind, None)? }; columns.push(array); } @@ -469,7 +460,7 @@ fn build_join_indexes( // for every item on the left and right with this key, add the respective pair left_indexes.unwrap_or(&vec![]).iter().for_each(|x| { // on an inner join, left and right indices are present - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }) } Ok(indexes) @@ -490,7 +481,7 @@ fn build_join_indexes( is_visited.insert(key.clone()); indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }) }; } @@ -515,12 +506,12 @@ fn build_join_indexes( match left_indices { Some(indices) => { indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }); } None => { // when no match, add the row with None for the left side - indexes.push((None, Some(row))); + indexes.push((None, Some(row as u32))); } } } From 20cb09115d2ff01a5a702e3843cdee00497cee71 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 17:00:19 +0100 Subject: [PATCH 3/8] Use indices.len() --- rust/datafusion/src/physical_plan/hash_join.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index a661619c9f8..f878d94de94 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -311,8 +311,7 @@ fn build_batch_from_indices( .map(|batch| batch.column(column_index).data_ref().as_ref()) .collect::>(); - let capacity = arrays.iter().map(|array| array.len()).sum(); - let mut mutable = MutableArrayData::new(arrays, true, capacity); + let mut mutable = MutableArrayData::new(arrays, true, indices.len()); // use the left indices for (join_index, _) in indices { match join_index { From 7b031035fd9bc1afd752cb7d61343c964e886dc9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 17:06:49 +0100 Subject: [PATCH 4/8] Small simplification --- rust/datafusion/src/physical_plan/hash_join.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index f878d94de94..3f8aa23b146 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -276,11 +276,11 @@ fn build_batch_from_indices( todo!("Create empty record batch"); } // this is just for symmetry of the code below. - let right = vec![right.clone()]; + let right_batches = vec![right.clone()]; let (primary_is_left, primary, secondary) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, &right), - JoinType::Right => (false, &right, left), + JoinType::Inner | JoinType::Left => (true, left, &right_batches), + JoinType::Right => (false, &right_batches, left), }; // build the columns of the new [RecordBatch]: @@ -321,7 +321,7 @@ fn build_batch_from_indices( } make_array(Arc::new(mutable.freeze())) } else { - let array = right[0].column(column_index); + let array = right.column(column_index); let ind = indices .iter() .map(|(_, join_index)| join_index) From b9fe4e4065dcf7019ef7dbae277d2be05e0fe74d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 17:26:57 +0100 Subject: [PATCH 5/8] Naming improvement --- rust/datafusion/src/physical_plan/hash_join.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 3f8aa23b146..fd4150eb23e 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -321,12 +321,13 @@ fn build_batch_from_indices( } make_array(Arc::new(mutable.freeze())) } else { + // use the right indices let array = right.column(column_index); - let ind = indices + let right_indices = indices .iter() .map(|(_, join_index)| join_index) .collect::(); - compute::take(array.as_ref(), &ind, None)? + compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); } From 6017e496574f66e562d0d325d35e69d2bf325495 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 17:30:38 +0100 Subject: [PATCH 6/8] Reuse right indices array --- rust/datafusion/src/physical_plan/hash_join.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index fd4150eb23e..ab60104932f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -287,6 +287,9 @@ fn build_batch_from_indices( // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + + let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); + for field in schema.fields() { // pick the column (left or right) based on the field name. let (is_primary, column_index) = match primary[0].schema().index_of(field.name()) { @@ -323,10 +326,6 @@ fn build_batch_from_indices( } else { // use the right indices let array = right.column(column_index); - let right_indices = indices - .iter() - .map(|(_, join_index)| join_index) - .collect::(); compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); From 098b171efce111ae1a0d8c3cff1c0740d15f1a81 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 20:31:20 +0100 Subject: [PATCH 7/8] Small simplification, avoid clone --- rust/datafusion/src/physical_plan/hash_join.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index ab60104932f..672646a5049 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -276,11 +276,10 @@ fn build_batch_from_indices( todo!("Create empty record batch"); } // this is just for symmetry of the code below. - let right_batches = vec![right.clone()]; - let (primary_is_left, primary, secondary) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, &right_batches), - JoinType::Right => (false, &right_batches, left), + let (primary_is_left, primary_schema, secondary_schema) = match join_type { + JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()), + JoinType::Right => (false, right.schema(), left[0].schema()), }; // build the columns of the new [RecordBatch]: @@ -292,10 +291,10 @@ fn build_batch_from_indices( for field in schema.fields() { // pick the column (left or right) based on the field name. - let (is_primary, column_index) = match primary[0].schema().index_of(field.name()) { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { Ok(i) => Ok((true, i)), Err(_) => { - match secondary[0].schema().index_of(field.name()) { + match secondary_schema.index_of(field.name()) { Ok(i) => Ok((false, i)), _ => Err(DataFusionError::Internal( format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() @@ -403,7 +402,7 @@ fn build_batch( ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices) } /// returns a vector with (index from left, index from right). From cde79c7c8d61055b0190bb6766ea4097a8ba69ae Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 30 Dec 2020 20:32:33 +0100 Subject: [PATCH 8/8] Remove outdated comment --- rust/datafusion/src/physical_plan/hash_join.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 672646a5049..9ac7447a8ab 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -275,7 +275,6 @@ fn build_batch_from_indices( if left.is_empty() { todo!("Create empty record batch"); } - // this is just for symmetry of the code below. let (primary_is_left, primary_schema, secondary_schema) = match join_type { JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),