From 19c84d784d5e9b9ab901e33a3a20011c00bd72ba Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 14:22:59 +0100 Subject: [PATCH 1/4] Calculate column indices upfront --- .../datafusion/src/physical_plan/hash_join.rs | 84 ++++++++++++------- 1 file changed, 54 insertions(+), 30 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 9ac7447a8ab..2cf82d21e65 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -202,12 +202,20 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| on.1.clone()) .collect::>(); + + let column_indices = column_indices_from_schema( + &self.left.schema(), + &self.right.schema(), + &self.schema(), + self.join_type, + )?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, join_type: self.join_type, left_data, right: stream, + column_indices, })) } } @@ -252,6 +260,8 @@ struct HashJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// Information of index and left / right placement of columns + column_indices: Vec<(usize, bool)>, } impl RecordBatchStream for HashJoinStream { @@ -260,6 +270,39 @@ impl RecordBatchStream for HashJoinStream { } } +/// Calculates column index based on input / output schemas and jointype +fn column_indices_from_schema( + left: &Schema, + right: &Schema, + output_schema: &Schema, + join_type: JoinType, +) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match join_type { + JoinType::Inner | JoinType::Left => (true, left, right), + JoinType::Right => (false, right, left), + }; + let mut column_indices = vec![]; + for field in output_schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( + format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + (is_primary && primary_is_left) || (!is_primary && !primary_is_left); + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) +} + /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// # Error @@ -269,18 +312,13 @@ fn build_batch_from_indices( schema: &Schema, left: &Vec, right: &RecordBatch, - join_type: &JoinType, indices: &[(JoinIndex, RightIndex)], + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); } - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()), - JoinType::Right => (false, right.schema(), left[0].schema()), - }; - // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches @@ -288,28 +326,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for field in schema.fields() { - // pick the column (left or right) based on the field name. - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - - let array = if is_left { + for (column_index, is_left) in column_indices { + let array = if *is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(column_index).data_ref().as_ref()) + .map(|batch| batch.column(*column_index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -323,7 +345,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(column_index); + let array = right.column(*column_index); compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); @@ -396,12 +418,13 @@ fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, on_right: &HashSet, - join_type: &JoinType, + join_type: JoinType, schema: &Schema, + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, batch, &indices, column_indices) } /// returns a vector with (index from left, index from right). @@ -434,7 +457,7 @@ fn build_batch( fn build_join_indexes( left: &JoinHashMap, right: &RecordBatch, - join_type: &JoinType, + join_type: JoinType, right_on: &HashSet, ) -> Result> { let keys_values = right_on @@ -531,8 +554,9 @@ impl Stream for HashJoinStream { &batch, &self.left_data, &self.on_right, - &self.join_type, + self.join_type, &self.schema, + &self.column_indices, )), other => other, }) From 45fa819569442951694fdb9cbb2a1dd223e41fcf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 15:58:04 +0100 Subject: [PATCH 2/4] Make function part of HashJoinExec --- .../datafusion/src/physical_plan/hash_join.rs | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 2cf82d21e65..566152f27b9 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -116,6 +116,36 @@ impl HashJoinExec { build_side: Arc::new(Mutex::new(None)), }) } + + /// Calculates column indices and left/right placement on input / output schemas and jointype + fn column_indices_from_schema(&self) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { + JoinType::Inner | JoinType::Left => { + (true, self.left.schema(), self.right.schema()) + } + JoinType::Right => (false, self.right.schema(), self.left.schema()), + }; + let mut column_indices = vec![]; + for field in self.schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( + format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + is_primary && primary_is_left || !is_primary && !primary_is_left; + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) + } } #[async_trait] @@ -203,12 +233,7 @@ impl ExecutionPlan for HashJoinExec { .map(|on| on.1.clone()) .collect::>(); - let column_indices = column_indices_from_schema( - &self.left.schema(), - &self.right.schema(), - &self.schema(), - self.join_type, - )?; + let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, @@ -270,39 +295,6 @@ impl RecordBatchStream for HashJoinStream { } } -/// Calculates column index based on input / output schemas and jointype -fn column_indices_from_schema( - left: &Schema, - right: &Schema, - output_schema: &Schema, - join_type: JoinType, -) -> ArrowResult> { - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, right), - JoinType::Right => (false, right, left), - }; - let mut column_indices = vec![]; - for field in output_schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - column_indices.push((column_index, is_left)); - } - - Ok(column_indices) -} - /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// # Error From baa5dcea80392f8f575ff1cf723c121dc6cdc9e6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 12:56:03 +0100 Subject: [PATCH 3/4] Small refactor by using struct --- .../datafusion/src/physical_plan/hash_join.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 566152f27b9..82032d5f133 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,6 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +struct ColumnIndex { + index: usize, + is_left: bool, +} + impl HashJoinExec { /// Tries to create a new [HashJoinExec]. /// # Error @@ -118,16 +123,16 @@ impl HashJoinExec { } /// Calculates column indices and left/right placement on input / output schemas and jointype - fn column_indices_from_schema(&self) -> ArrowResult> { + fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { JoinType::Inner | JoinType::Left => { (true, self.left.schema(), self.right.schema()) } JoinType::Right => (false, self.right.schema(), self.left.schema()), }; - let mut column_indices = vec![]; + let mut column_indices = Vec::with_capacity(self.schema.fields().len()); for field in self.schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + let (is_primary, index) = match primary_schema.index_of(field.name()) { Ok(i) => Ok((true, i)), Err(_) => { match secondary_schema.index_of(field.name()) { @@ -141,7 +146,7 @@ impl HashJoinExec { let is_left = is_primary && primary_is_left || !is_primary && !primary_is_left; - column_indices.push((column_index, is_left)); + column_indices.push(ColumnIndex { index, is_left }); } Ok(column_indices) @@ -286,7 +291,7 @@ struct HashJoinStream { /// right right: SendableRecordBatchStream, /// Information of index and left / right placement of columns - column_indices: Vec<(usize, bool)>, + column_indices: Vec, } impl RecordBatchStream for HashJoinStream { @@ -305,7 +310,7 @@ fn build_batch_from_indices( left: &Vec, right: &RecordBatch, indices: &[(JoinIndex, RightIndex)], - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); @@ -318,12 +323,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for (column_index, is_left) in column_indices { - let array = if *is_left { + for column_index in column_indices { + let array = if column_index.is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(*column_index).data_ref().as_ref()) + .map(|batch| batch.column(column_index.index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -337,7 +342,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(*column_index); + let array = right.column(column_index.index); compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); @@ -412,7 +417,7 @@ fn build_batch( on_right: &HashSet, join_type: JoinType, schema: &Schema, - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); From 5b6bf8316cc692ea01f9a0f185e1eb2d14994083 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:11:49 +0100 Subject: [PATCH 4/4] Add comments --- rust/datafusion/src/physical_plan/hash_join.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 82032d5f133..d6b3b937004 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,8 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +/// Information about the index and placement (left or right) of the columns struct ColumnIndex { + /// Index of the column index: usize, + /// Whether the column is at the left or right side is_left: bool, }