From 19c84d784d5e9b9ab901e33a3a20011c00bd72ba Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 14:22:59 +0100 Subject: [PATCH 01/43] Calculate column indices upfront --- .../datafusion/src/physical_plan/hash_join.rs | 84 ++++++++++++------- 1 file changed, 54 insertions(+), 30 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 9ac7447a8ab..2cf82d21e65 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -202,12 +202,20 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| on.1.clone()) .collect::>(); + + let column_indices = column_indices_from_schema( + &self.left.schema(), + &self.right.schema(), + &self.schema(), + self.join_type, + )?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, join_type: self.join_type, left_data, right: stream, + column_indices, })) } } @@ -252,6 +260,8 @@ struct HashJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// Information of index and left / right placement of columns + column_indices: Vec<(usize, bool)>, } impl RecordBatchStream for HashJoinStream { @@ -260,6 +270,39 @@ impl RecordBatchStream for HashJoinStream { } } +/// Calculates column index based on input / output schemas and jointype +fn column_indices_from_schema( + left: &Schema, + right: &Schema, + output_schema: &Schema, + join_type: JoinType, +) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match join_type { + JoinType::Inner | JoinType::Left => (true, left, right), + JoinType::Right => (false, right, left), + }; + let mut column_indices = vec![]; + for field in output_schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( 
+ format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + (is_primary && primary_is_left) || (!is_primary && !primary_is_left); + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) +} + /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// # Error @@ -269,18 +312,13 @@ fn build_batch_from_indices( schema: &Schema, left: &Vec, right: &RecordBatch, - join_type: &JoinType, indices: &[(JoinIndex, RightIndex)], + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); } - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()), - JoinType::Right => (false, right.schema(), left[0].schema()), - }; - // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches @@ -288,28 +326,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for field in schema.fields() { - // pick the column (left or right) based on the field name. 
- let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - - let array = if is_left { + for (column_index, is_left) in column_indices { + let array = if *is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(column_index).data_ref().as_ref()) + .map(|batch| batch.column(*column_index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -323,7 +345,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(column_index); + let array = right.column(*column_index); compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); @@ -396,12 +418,13 @@ fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, on_right: &HashSet, - join_type: &JoinType, + join_type: JoinType, schema: &Schema, + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, batch, &indices, column_indices) } /// returns a vector with (index from left, index from right). 
@@ -434,7 +457,7 @@ fn build_batch( fn build_join_indexes( left: &JoinHashMap, right: &RecordBatch, - join_type: &JoinType, + join_type: JoinType, right_on: &HashSet, ) -> Result> { let keys_values = right_on @@ -531,8 +554,9 @@ impl Stream for HashJoinStream { &batch, &self.left_data, &self.on_right, - &self.join_type, + self.join_type, &self.schema, + &self.column_indices, )), other => other, }) From 45fa819569442951694fdb9cbb2a1dd223e41fcf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 15:58:04 +0100 Subject: [PATCH 02/43] Make function part of HashJoinExec --- .../datafusion/src/physical_plan/hash_join.rs | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 2cf82d21e65..566152f27b9 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -116,6 +116,36 @@ impl HashJoinExec { build_side: Arc::new(Mutex::new(None)), }) } + + /// Calculates column indices and left/right placement on input / output schemas and jointype + fn column_indices_from_schema(&self) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { + JoinType::Inner | JoinType::Left => { + (true, self.left.schema(), self.right.schema()) + } + JoinType::Right => (false, self.right.schema(), self.left.schema()), + }; + let mut column_indices = vec![]; + for field in self.schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( + format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + 
is_primary && primary_is_left || !is_primary && !primary_is_left; + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) + } } #[async_trait] @@ -203,12 +233,7 @@ impl ExecutionPlan for HashJoinExec { .map(|on| on.1.clone()) .collect::>(); - let column_indices = column_indices_from_schema( - &self.left.schema(), - &self.right.schema(), - &self.schema(), - self.join_type, - )?; + let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, @@ -270,39 +295,6 @@ impl RecordBatchStream for HashJoinStream { } } -/// Calculates column index based on input / output schemas and jointype -fn column_indices_from_schema( - left: &Schema, - right: &Schema, - output_schema: &Schema, - join_type: JoinType, -) -> ArrowResult> { - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, right), - JoinType::Right => (false, right, left), - }; - let mut column_indices = vec![]; - for field in output_schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - column_indices.push((column_index, is_left)); - } - - Ok(column_indices) -} - /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. 
/// # Error From baa5dcea80392f8f575ff1cf723c121dc6cdc9e6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 12:56:03 +0100 Subject: [PATCH 03/43] Small refactor by using struct --- .../datafusion/src/physical_plan/hash_join.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 566152f27b9..82032d5f133 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,6 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +struct ColumnIndex { + index: usize, + is_left: bool, +} + impl HashJoinExec { /// Tries to create a new [HashJoinExec]. /// # Error @@ -118,16 +123,16 @@ impl HashJoinExec { } /// Calculates column indices and left/right placement on input / output schemas and jointype - fn column_indices_from_schema(&self) -> ArrowResult> { + fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { JoinType::Inner | JoinType::Left => { (true, self.left.schema(), self.right.schema()) } JoinType::Right => (false, self.right.schema(), self.left.schema()), }; - let mut column_indices = vec![]; + let mut column_indices = Vec::with_capacity(self.schema.fields().len()); for field in self.schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + let (is_primary, index) = match primary_schema.index_of(field.name()) { Ok(i) => Ok((true, i)), Err(_) => { match secondary_schema.index_of(field.name()) { @@ -141,7 +146,7 @@ impl HashJoinExec { let is_left = is_primary && primary_is_left || !is_primary && !primary_is_left; - column_indices.push((column_index, is_left)); + column_indices.push(ColumnIndex { index, is_left }); } Ok(column_indices) @@ -286,7 +291,7 @@ struct HashJoinStream { /// right right: SendableRecordBatchStream, /// Information 
of index and left / right placement of columns - column_indices: Vec<(usize, bool)>, + column_indices: Vec, } impl RecordBatchStream for HashJoinStream { @@ -305,7 +310,7 @@ fn build_batch_from_indices( left: &Vec, right: &RecordBatch, indices: &[(JoinIndex, RightIndex)], - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); @@ -318,12 +323,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for (column_index, is_left) in column_indices { - let array = if *is_left { + for column_index in column_indices { + let array = if column_index.is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(*column_index).data_ref().as_ref()) + .map(|batch| batch.column(column_index.index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -337,7 +342,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(*column_index); + let array = right.column(column_index.index); compute::take(array.as_ref(), &right_indices, None)? 
}; columns.push(array); @@ -412,7 +417,7 @@ fn build_batch( on_right: &HashSet, join_type: JoinType, schema: &Schema, - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); From 5b6bf8316cc692ea01f9a0f185e1eb2d14994083 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:11:49 +0100 Subject: [PATCH 04/43] Add comments --- rust/datafusion/src/physical_plan/hash_join.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 82032d5f133..d6b3b937004 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,8 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +/// Information about the index and placement (left or right) of the columns struct ColumnIndex { + /// Index of the column index: usize, + /// Whether the column is at the left or right side is_left: bool, } From 38d465121cd2a37ba4998bca223b39f90ec1307d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:27:09 +0100 Subject: [PATCH 05/43] Experiment with single batch --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 2 +- rust/datafusion/src/physical_plan/hash_join.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 1be8c2e8221..8ab2fdf6d3c 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -194,7 +194,7 @@ impl RecordBatchStream for CoalesceBatchesStream { } } -fn concat_batches( +pub fn concat_batches( schema: &SchemaRef, batches: &[RecordBatch], row_count: usize, diff --git a/rust/datafusion/src/physical_plan/hash_join.rs 
b/rust/datafusion/src/physical_plan/hash_join.rs index d6b3b937004..47e614a2dce 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -46,6 +46,7 @@ use super::{ use crate::error::{DataFusionError, Result}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; // An index of (batch, row) uniquely identifying a row in a part. @@ -83,7 +84,7 @@ pub struct HashJoinExec { /// Information about the index and placement (left or right) of the columns struct ColumnIndex { - /// Index of the column + /// Index of the column index: usize, /// Whether the column is at the left or right side is_left: bool, @@ -207,24 +208,24 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| on.0.clone()) .collect::>(); - // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
let initial = (JoinHashMap::default(), Vec::new(), 0); - let left_data = stream + let (hashmap, batches, _len) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; let index = acc.2; - update_hash(&on_left, &batch, hash, index).unwrap(); + update_hash(&on_left, &batch, hash, 0).unwrap(); values.push(batch); acc.2 += 1; Ok(acc) }) .await?; + let single_batch = vec![concat_batches(&batches[0].schema(), &batches, 32768)?]; - let left_side = Arc::new((left_data.0, left_data.1)); + let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); left_side } From 54b9c3ae2de2dedbcfaa6fe9e3faa61d8ca5c73a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:34:28 +0100 Subject: [PATCH 06/43] Use index as offset --- rust/datafusion/src/physical_plan/hash_join.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 47e614a2dce..c5ea87dfc57 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -217,9 +217,9 @@ impl ExecutionPlan for HashJoinExec { let hash = &mut acc.0; let values = &mut acc.1; let index = acc.2; - update_hash(&on_left, &batch, hash, 0).unwrap(); + update_hash(&on_left, &batch, hash, index).unwrap(); + acc.2 += batch.num_rows(); values.push(batch); - acc.2 += 1; Ok(acc) }) .await?; @@ -276,8 +276,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push((index, row))) - .or_insert_with(|| (key.clone(), vec![(index, row)])); + .and_modify(|_, v| v.push((0, row + index))) + .or_insert_with(|| (key.clone(), vec![(0, row + index)])); } Ok(()) } From 2db270c303a5879687f27120f15b18704ec599f0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 14:01:11 +0100 Subject: [PATCH 07/43] Use rows --- rust/datafusion/src/physical_plan/hash_join.rs | 6 ++++-- 
1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c5ea87dfc57..4fe9f67cfca 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -212,7 +212,7 @@ impl ExecutionPlan for HashJoinExec { // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. let initial = (JoinHashMap::default(), Vec::new(), 0); - let (hashmap, batches, _len) = stream + let (hashmap, batches, num_rows) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; @@ -223,7 +223,9 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let single_batch = vec![concat_batches(&batches[0].schema(), &batches, 32768)?]; + + let single_batch = + vec![concat_batches(&batches[0].schema(), &batches, num_rows)?]; let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); From 5842c730048c91812e7eb10e69590e25ff0411d3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 14:57:05 +0100 Subject: [PATCH 08/43] Refactor with single batch --- .../datafusion/src/physical_plan/hash_join.rs | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 4fe9f67cfca..33150588feb 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -27,7 +27,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use hashbrown::HashMap; use tokio::sync::Mutex; -use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::array::Array; use arrow::datatypes::DataType; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -49,12 +49,12 @@ use super::{ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchS use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; -// An index of (batch, row) uniquely identifying a row in a part. -type Index = (usize, usize); +// An index of uniquely identifying a row. +type Index = usize; // A pair (left index, right index) // Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different // as a left join may issue None indices, in which case -type JoinIndex = Option<(usize, usize)>; +type JoinIndex = Option; // An index of row uniquely identifying a row in a batch type RightIndex = Option; @@ -62,7 +62,7 @@ type RightIndex = Option; // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. type JoinHashMap = HashMap, Vec, RandomState>; -type JoinLeftData = Arc<(JoinHashMap, Vec)>; +type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. 
@@ -225,7 +225,7 @@ impl ExecutionPlan for HashJoinExec { .await?; let single_batch = - vec![concat_batches(&batches[0].schema(), &batches, num_rows)?]; + concat_batches(&batches[0].schema(), &batches, num_rows)?; let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); @@ -278,8 +278,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push((0, row + index))) - .or_insert_with(|| (key.clone(), vec![(0, row + index)])); + .and_modify(|_, v| v.push(row + index)) + .or_insert_with(|| (key.clone(), vec![row + index])); } Ok(()) } @@ -313,39 +313,28 @@ impl RecordBatchStream for HashJoinStream { /// * fn build_batch_from_indices( schema: &Schema, - left: &Vec, + left: &RecordBatch, right: &RecordBatch, indices: &[(JoinIndex, RightIndex)], column_indices: &Vec, ) -> ArrowResult { - if left.is_empty() { - todo!("Create empty record batch"); - } - // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + // TODO: u64 + let left_indices = indices + .iter() + .map(|(left_index, _)| left_index.map(|x| x as u32)) + .collect(); + let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); for column_index in column_indices { let array = if column_index.is_left { - // Note that we take `.data_ref()` to gather the [ArrayData] of each array. 
- let arrays = left - .iter() - .map(|batch| batch.column(column_index.index).data_ref().as_ref()) - .collect::>(); - - let mut mutable = MutableArrayData::new(arrays, true, indices.len()); - // use the left indices - for (join_index, _) in indices { - match join_index { - Some((batch, row)) => mutable.extend(*batch, *row, *row + 1), - None => mutable.extend_nulls(1), - } - } - make_array(Arc::new(mutable.freeze())) + let array = left.column(column_index.index); + compute::take(array.as_ref(), &left_indices, None)? } else { // use the right indices let array = right.column(column_index.index); From 1511f0eb770f3332d6559bb141dec10df68d39ee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 19:45:40 +0100 Subject: [PATCH 09/43] Refactor --- .../datafusion/src/physical_plan/hash_join.rs | 96 +++++++++---------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 33150588feb..22be430e69f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,10 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::{array::ArrayRef, compute}; +use arrow::{ + array::{ArrayRef, UInt32Builder}, + compute, +}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -49,19 +52,10 @@ use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchS use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; -// An index of uniquely identifying a row. 
-type Index = usize; -// A pair (left index, right index) -// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different -// as a left join may issue None indices, in which case -type JoinIndex = Option; -// An index of row uniquely identifying a row in a batch -type RightIndex = Option; - // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. -type JoinHashMap = HashMap, Vec, RandomState>; +type JoinHashMap = HashMap, Vec, RandomState>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -315,22 +309,15 @@ fn build_batch_from_indices( schema: &Schema, left: &RecordBatch, right: &RecordBatch, - indices: &[(JoinIndex, RightIndex)], + left_indices: UInt32Array, + right_indices: UInt32Array, column_indices: &Vec, ) -> ArrowResult { // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right - // 2. based on the pick, `take` items from the different recordBatches + // 2. based on the pick, `take` items from the different RecordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); - // TODO: u64 - let left_indices = indices - .iter() - .map(|(left_index, _)| left_index.map(|x| x as u32)) - .collect(); - - let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for column_index in column_indices { let array = if column_index.is_left { let array = left.column(column_index.index); @@ -342,7 +329,7 @@ fn build_batch_from_indices( }; columns.push(array); } - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) 
+ RecordBatch::try_new(Arc::new(schema.clone()), columns) } /// Create a key `Vec` that is used as key for the hashmap @@ -414,9 +401,17 @@ fn build_batch( schema: &Schema, column_indices: &Vec, ) -> ArrowResult { - let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - - build_batch_from_indices(schema, &left_data.1, batch, &indices, column_indices) + let (left_indices, right_indices) = + build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); + + build_batch_from_indices( + schema, + &left_data.1, + batch, + left_indices, + right_indices, + column_indices, + ) } /// returns a vector with (index from left, index from right). @@ -451,7 +446,7 @@ fn build_join_indexes( right: &RecordBatch, join_type: JoinType, right_on: &HashSet, -) -> Result> { +) -> Result<(UInt32Array, UInt32Array)> { let keys_values = right_on .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) @@ -459,10 +454,11 @@ fn build_join_indexes( let mut key = Vec::with_capacity(keys_values.len()); + let mut left_indices = UInt32Builder::new(0); + let mut right_indices = UInt32Builder::new(0); + match join_type { JoinType::Inner => { - let mut indexes = Vec::new(); // unknown a prior size - // Visit all of the right rows for row in 0..right.num_rows() { // Get the key and find it in the build index @@ -470,16 +466,15 @@ fn build_join_indexes( let left_indexes = left.get(&key); // for every item on the left and right with this key, add the respective pair - left_indexes.unwrap_or(&vec![]).iter().for_each(|x| { + for x in left_indexes.unwrap_or(&vec![]) { // on an inner join, left and right indices are present - indexes.push((Some(*x), Some(row as u32))); - }) + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } JoinType::Left => { - let mut indexes = Vec::new(); // unknown a prior size - // Keep track of which item is 
visited in the build input // TODO: this can be stored more efficiently with a marker let mut is_visited = HashSet::new(); @@ -492,42 +487,45 @@ fn build_join_indexes( if let Some(indices) = left_indexes { is_visited.insert(key.clone()); - indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row as u32))); - }) + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } }; } // Add the remaining left rows to the result set with None on the right side for (key, indices) in left { if !is_visited.contains(key) { - indices.iter().for_each(|x| { - indexes.push((Some(*x), None)); - }); + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_null()?; + }; } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } JoinType::Right => { - let mut indexes = Vec::new(); // unknown a prior size for row in 0..right.num_rows() { create_key(&keys_values, row, &mut key)?; - let left_indices = left.get(&key); + let left_indexes = left.get(&key); - match left_indices { + match left_indexes { Some(indices) => { - indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row as u32))); - }); + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } } None => { // when no match, add the row with None for the left side - indexes.push((None, Some(row as u32))); + left_indices.append_null()?; + right_indices.append_value(row as u32)?; } } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } } } From a976ace567e18c01337e4e6f94e852ff7ccc901f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 2 Jan 2021 17:31:17 +0100 Subject: [PATCH 10/43] Small simplification --- rust/datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 22be430e69f..4372feb796d 100644 --- 
a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -380,7 +380,7 @@ pub(crate) fn create_key( // store the size vec.extend(value.len().to_le_bytes().iter()); // store the string value - vec.extend(array.value(row).as_bytes().iter()); + vec.extend(value.as_bytes().iter()); } _ => { // This is internal because we should have caught this before. From 797ffb6c8a41cbe9fd0c06e1d0379680b83a1197 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 2 Jan 2021 19:17:24 +0100 Subject: [PATCH 11/43] Add comment --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 8ab2fdf6d3c..2c5209018f9 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -194,6 +194,7 @@ impl RecordBatchStream for CoalesceBatchesStream { } } +/// Concatenates an array of `RecordBatch` into one batch pub fn concat_batches( schema: &SchemaRef, batches: &[RecordBatch], From b865b8a1d876956e969e7c4263f033db45a8cc52 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:11:50 +0100 Subject: [PATCH 12/43] Small simplifiction --- .../datafusion/src/physical_plan/hash_join.rs | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 3f8c42a7bcd..23033ec366c 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,8 +18,11 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. 
-use arrow::{array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit}; use arrow::{array::ArrayRef, compute}; +use arrow::{ + array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, + datatypes::TimeUnit, +}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -342,23 +345,23 @@ pub(crate) fn create_key( match col.data_type() { DataType::UInt8 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt16 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt32 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt64 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int8 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int16 => { let array = col.as_any().downcast_ref::().unwrap(); @@ -366,33 +369,33 @@ pub(crate) fn create_key( } DataType::Int32 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int64 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Timestamp(TimeUnit::Microsecond, None) => { let array = col .as_any() .downcast_ref::() .unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + 
vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { let array = col .as_any() .downcast_ref::() .unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Utf8 => { let array = col.as_any().downcast_ref::().unwrap(); let value = array.value(row); // store the size - vec.extend(value.len().to_le_bytes().iter()); + vec.extend_from_slice(&value.len().to_le_bytes()); // store the string value - vec.extend(value.as_bytes().iter()); + vec.extend_from_slice(value.as_bytes()); } _ => { // This is internal because we should have caught this before. From 0efb5a7bd2f692040ccc5c70c5fd607a5c995577 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:16:12 +0100 Subject: [PATCH 13/43] Reuse num rows --- rust/datafusion/src/physical_plan/hash_join.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index a4081d68fbc..862a5577106 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -222,8 +222,6 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let num_rows: usize = - batches.iter().map(|batch| batch.num_rows()).sum(); let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; From bbec07005b090bbb4aa063a7df543069aeadbcaf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:17:08 +0100 Subject: [PATCH 14/43] Rename as offset --- rust/datafusion/src/physical_plan/hash_join.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 862a5577106..b6d02fe5e7f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -215,8 +215,8 @@ impl 
ExecutionPlan for HashJoinExec { .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; - let index = acc.2; - update_hash(&on_left, &batch, hash, index).unwrap(); + let offset = acc.2; + update_hash(&on_left, &batch, hash, offset).unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -274,7 +274,7 @@ fn update_hash( on: &HashSet, batch: &RecordBatch, hash: &mut JoinHashMap, - index: usize, + offset: usize, ) -> Result<()> { // evaluate the keys let keys_values = on @@ -290,8 +290,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row + index)) - .or_insert_with(|| (key.clone(), vec![row + index])); + .and_modify(|_, v| v.push(row + offset)) + .or_insert_with(|| (key.clone(), vec![row + offset])); } Ok(()) } From 6d182c957ad1947778c6012fea86c9fc224dbd54 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:23:23 +0100 Subject: [PATCH 15/43] Use u64 for left indices --- rust/datafusion/src/physical_plan/hash_join.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index b6d02fe5e7f..32e16276981 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. 
-use arrow::{array::ArrayRef, compute}; +use arrow::{array::{ArrayRef, UInt64Builder}, compute}; use arrow::{ array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit, @@ -337,7 +337,7 @@ fn build_batch_from_indices( schema: &Schema, left: &RecordBatch, right: &RecordBatch, - left_indices: UInt32Array, + left_indices: UInt64Array, right_indices: UInt32Array, column_indices: &Vec, ) -> ArrowResult { @@ -487,7 +487,7 @@ fn build_join_indexes( right: &RecordBatch, join_type: JoinType, right_on: &HashSet, -) -> Result<(UInt32Array, UInt32Array)> { +) -> Result<(UInt64Array, UInt32Array)> { let keys_values = right_on .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) @@ -495,7 +495,7 @@ fn build_join_indexes( let mut key = Vec::with_capacity(keys_values.len()); - let mut left_indices = UInt32Builder::new(0); + let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); match join_type { @@ -509,7 +509,7 @@ fn build_join_indexes( // for every item on the left and right with this key, add the respective pair for x in left_indexes.unwrap_or(&vec![]) { // on an inner join, left and right indices are present - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_value(row as u32)?; } } @@ -529,7 +529,7 @@ fn build_join_indexes( is_visited.insert(key.clone()); for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_value(row as u32)?; } }; @@ -538,7 +538,7 @@ fn build_join_indexes( for (key, indices) in left { if !is_visited.contains(key) { for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_null()?; } } @@ -555,7 +555,7 @@ fn build_join_indexes( match left_indexes { Some(indices) => { for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; 
right_indices.append_value(row as u32)?; } } From d58abf638a2a311d1657c29733f89ce395db40b3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:12:13 +0100 Subject: [PATCH 16/43] Use appending by slice, small refactor --- .../datafusion/src/physical_plan/hash_join.rs | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 32e16276981..5a7226741a7 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,10 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::{array::{ArrayRef, UInt64Builder}, compute}; +use arrow::{ + array::{ArrayRef, UInt64Builder}, + compute, +}; use arrow::{ array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit, @@ -58,7 +61,7 @@ use log::debug; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. 
-type JoinHashMap = HashMap, Vec, RandomState>; +type JoinHashMap = HashMap, Vec, RandomState>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -223,6 +226,8 @@ impl ExecutionPlan for HashJoinExec { }) .await?; + // Merge all batches into a single batch, so we + // can directly index into the arrays let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; @@ -290,8 +295,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row + offset)) - .or_insert_with(|| (key.clone(), vec![row + offset])); + .and_modify(|_, v| v.push(row as u64 + offset as u64)) + .or_insert_with(|| (key.clone(), vec![row as u64 + offset as u64])); } Ok(()) } @@ -505,12 +510,15 @@ fn build_join_indexes( // Get the key and find it in the build index create_key(&keys_values, row, &mut key)?; let left_indexes = left.get(&key); - // for every item on the left and right with this key, add the respective pair - for x in left_indexes.unwrap_or(&vec![]) { - // on an inner join, left and right indices are present - left_indices.append_value(*x as u64)?; - right_indices.append_value(row as u32)?; + + if let Some(indices) = left_indexes { + left_indices.append_slice(&indices)?; + + for _ in 0..indices.len() { + // on an inner join, left and right indices are present + right_indices.append_value(row as u32)?; + } } } Ok((left_indices.finish(), right_indices.finish())) @@ -527,9 +535,9 @@ fn build_join_indexes( if let Some(indices) = left_indexes { is_visited.insert(key.clone()); - - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + for _ in 0..indices.len() { + // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } }; @@ -537,8 +545,8 @@ fn build_join_indexes( // Add the remaining left rows to the result set with None on the right side for (key, indices) in left { 
if !is_visited.contains(key) { - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + for _ in 0..indices.len() { right_indices.append_null()?; } } @@ -554,8 +562,9 @@ fn build_join_indexes( match left_indexes { Some(indices) => { - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + + for _ in 0..indices.len() { right_indices.append_value(row as u32)?; } } From 69d328663a9434bb4f58748d802e854411e6d49b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:29:51 +0100 Subject: [PATCH 17/43] Style tweak --- rust/datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 5a7226741a7..20535a2c49e 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -295,8 +295,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row as u64 + offset as u64)) - .or_insert_with(|| (key.clone(), vec![row as u64 + offset as u64])); + .and_modify(|_, v| v.push((row + offset) as u64)) + .or_insert_with(|| (key.clone(), vec![(row + offset) as u64])); } Ok(()) } From c96ba9fa533c8973e76d2cbada55a948e8331513 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:39:09 +0100 Subject: [PATCH 18/43] Style --- rust/datafusion/src/physical_plan/hash_join.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 20535a2c49e..c297eccf37c 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -509,10 +509,9 @@ fn build_join_indexes( for row in 0..right.num_rows() { // Get the key and find it in the build index create_key(&keys_values, row, 
&mut key)?; - let left_indexes = left.get(&key); - // for every item on the left and right with this key, add the respective pair - if let Some(indices) = left_indexes { + // for every item on the left and right with this key, add the respective pair + if let Some(indices) = left.get(&key) { left_indices.append_slice(&indices)?; for _ in 0..indices.len() { @@ -531,9 +530,8 @@ fn build_join_indexes( // First visit all of the rows for row in 0..right.num_rows() { create_key(&keys_values, row, &mut key)?; - let left_indexes = left.get(&key); - if let Some(indices) = left_indexes { + if let Some(indices) = left.get(&key) { is_visited.insert(key.clone()); left_indices.append_slice(&indices)?; for _ in 0..indices.len() { @@ -558,9 +556,7 @@ fn build_join_indexes( for row in 0..right.num_rows() { create_key(&keys_values, row, &mut key)?; - let left_indexes = left.get(&key); - - match left_indexes { + match left.get(&key) { Some(indices) => { left_indices.append_slice(&indices)?; From 33b96f3c667ab4d824a56f16e6166c2107d051ec Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 12:05:56 +0100 Subject: [PATCH 19/43] Doc updates --- rust/datafusion/src/physical_plan/hash_join.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c297eccf37c..31056dc47d0 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -515,7 +515,6 @@ fn build_join_indexes( left_indices.append_slice(&indices)?; for _ in 0..indices.len() { - // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } } @@ -525,6 +524,9 @@ fn build_join_indexes( JoinType::Left => { // Keep track of which item is visited in the build input // TODO: this can be stored more efficiently with a marker + // https://issues.apache.org/jira/browse/ARROW-11116 + // TODO: Fix LEFT join with multiple 
right batches + // https://issues.apache.org/jira/browse/ARROW-10971 let mut is_visited = HashSet::new(); // First visit all of the rows @@ -535,7 +537,6 @@ fn build_join_indexes( is_visited.insert(key.clone()); left_indices.append_slice(&indices)?; for _ in 0..indices.len() { - // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } }; From c3bdde147ffa9f18e9002a8824a6369326833913 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 6 Jan 2021 21:10:42 +0100 Subject: [PATCH 20/43] Start vectorized hashing --- .../datafusion/src/physical_plan/hash_join.rs | 234 ++++++++++++++---- 1 file changed, 186 insertions(+), 48 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 31056dc47d0..10ea2521ab0 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -26,9 +26,9 @@ use arrow::{ array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit, }; -use std::sync::Arc; use std::time::Instant; use std::{any::Any, collections::HashSet}; +use std::{hash::Hasher, sync::Arc}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; @@ -61,7 +61,7 @@ use log::debug; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. 
-type JoinHashMap = HashMap, Vec, RandomState>; +type JoinHashMap = HashMap, RandomState>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -194,6 +194,7 @@ impl ExecutionPlan for HashJoinExec { async fn execute(&self, partition: usize) -> Result { // we only want to compute the build side once + let random_state = RandomState::new(); let left_data = { let mut build_side = self.build_side.lock().await; match build_side.as_ref() { @@ -209,17 +210,18 @@ impl ExecutionPlan for HashJoinExec { .on .iter() .map(|on| on.0.clone()) - .collect::>(); + .collect::>(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = (JoinHashMap::default(), Vec::new(), 0); - let (hashmap, batches, num_rows) = stream + let initial = (JoinHashMap::default(), Vec::new(), 0, Vec::new()); + let (hashmap, batches, num_rows, _) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; let offset = acc.2; - update_hash(&on_left, &batch, hash, offset).unwrap(); + update_hash(&on_left, &batch, hash, offset, &mut acc.3, &random_state) + .unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -254,12 +256,12 @@ impl ExecutionPlan for HashJoinExec { .on .iter() .map(|on| on.1.clone()) - .collect::>(); + .collect::>(); let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), - on_right, + on_right: on_right, join_type: self.join_type, left_data, right: stream, @@ -269,6 +271,7 @@ impl ExecutionPlan for HashJoinExec { num_output_batches: 0, num_output_rows: 0, join_time: 0, + random_state })) } } @@ -276,10 +279,12 @@ impl ExecutionPlan for HashJoinExec { /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, /// assuming that the [RecordBatch] 
corresponds to the `index`th fn update_hash( - on: &HashSet, + on: &[String], batch: &RecordBatch, hash: &mut JoinHashMap, offset: usize, + hash_buf: &mut Vec, + random_state: &RandomState ) -> Result<()> { // evaluate the keys let keys_values = on @@ -287,16 +292,15 @@ fn update_hash( .map(|name| Ok(col(name).evaluate(batch)?.into_array(batch.num_rows()))) .collect::>>()?; - let mut key = Vec::with_capacity(keys_values.len()); - // update the hash map - for row in 0..batch.num_rows() { - create_key(&keys_values, row, &mut key)?; + let hash_values = create_hashes(&keys_values, &random_state, hash_buf)?; + // insert hashes to key of the hashmap + for (row, hash_value) in hash_values.iter().enumerate() { hash.raw_entry_mut() - .from_key(&key) + .from_key(hash_value) .and_modify(|_, v| v.push((row + offset) as u64)) - .or_insert_with(|| (key.clone(), vec![(row + offset) as u64])); + .or_insert_with(|| (*hash_value, vec![(row + offset) as u64])); } Ok(()) } @@ -306,7 +310,7 @@ struct HashJoinStream { /// Input schema schema: Arc, /// columns from the right used to compute the hash - on_right: HashSet, + on_right: Vec, /// type of the join join_type: JoinType, /// information from the left @@ -325,6 +329,7 @@ struct HashJoinStream { num_output_rows: usize, /// total time for joining probe-side batches to the build-side batches join_time: usize, + random_state: RandomState, } impl RecordBatchStream for HashJoinStream { @@ -442,13 +447,15 @@ pub(crate) fn create_key( fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, - on_right: &HashSet, + on_right: &[String], join_type: JoinType, schema: &Schema, column_indices: &Vec, + random_state: &RandomState, ) -> ArrowResult { let (left_indices, right_indices) = - build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); + build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state) + .unwrap(); build_batch_from_indices( schema, @@ -491,31 +498,34 @@ fn build_join_indexes( left: 
&JoinHashMap, right: &RecordBatch, join_type: JoinType, - right_on: &HashSet, + right_on: &[String], + random_state: &RandomState, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = right_on .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) .collect::>>()?; - let mut key = Vec::with_capacity(keys_values.len()); - let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); + let buf = &mut Vec::new(); + let hash_values = create_hashes(&keys_values, &random_state, buf)?; match join_type { JoinType::Inner => { // Visit all of the right rows for row in 0..right.num_rows() { - // Get the key and find it in the build index - create_key(&keys_values, row, &mut key)?; - - // for every item on the left and right with this key, add the respective pair - if let Some(indices) = left.get(&key) { - left_indices.append_slice(&indices)?; - - for _ in 0..indices.len() { - right_indices.append_value(row as u32)?; + // Get the hash and find it in the build index + + // for every item on the left and right we check if it matches + if let Some(indices) = left.get(&hash_values[row]) + { + for &i in indices { + // TODO: collision check + if true { + left_indices.append_value(i)?; + right_indices.append_value(row as u32)?; + } } } } @@ -531,21 +541,23 @@ fn build_join_indexes( // First visit all of the rows for row in 0..right.num_rows() { - create_key(&keys_values, row, &mut key)?; - - if let Some(indices) = left.get(&key) { - is_visited.insert(key.clone()); - left_indices.append_slice(&indices)?; - for _ in 0..indices.len() { - right_indices.append_value(row as u32)?; + if let Some(indices) = left.get(&hash_values[row]) + { + for &i in indices { + // Collision check + if true { + left_indices.append_value(i)?; + right_indices.append_value(row as u32)?; + is_visited.insert(i); + } } }; } // Add the remaining left rows to the result set with None on the right side - for (key, indices) in left { - if 
!is_visited.contains(key) { - left_indices.append_slice(&indices)?; - for _ in 0..indices.len() { + for (_, indices) in left { + for i in indices { + if !is_visited.contains(i) { + left_indices.append_slice(&indices)?; right_indices.append_null()?; } } @@ -555,14 +567,14 @@ fn build_join_indexes( } JoinType::Right => { for row in 0..right.num_rows() { - create_key(&keys_values, row, &mut key)?; - - match left.get(&key) { + match left.get(&hash_values[row]) + { Some(indices) => { - left_indices.append_slice(&indices)?; - - for _ in 0..indices.len() { - right_indices.append_value(row as u32)?; + for &i in indices { + if true { + left_indices.append_value(i)?; + right_indices.append_value(row as u32)?; + } } } None => { @@ -576,6 +588,131 @@ fn build_join_indexes( } } } +use core::hash::BuildHasher; + +/// Creates hash values for every +fn create_hashes<'a>( + arrays: &[ArrayRef], + random_state: &RandomState, + buf: &'a mut Vec, +) -> Result> { + let rows = arrays[0].len(); + buf.fill(0); + buf.resize(rows, 0); + + let mut hashes = vec![0; rows]; + + for col in arrays { + match col.data_type() { + DataType::UInt8 => { + let array = col.as_any().downcast_ref::().unwrap(); + + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_u8(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::UInt16 => { + let array = col.as_any().downcast_ref::().unwrap(); + + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_u16(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::UInt32 => { + let array = col.as_any().downcast_ref::().unwrap(); + + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_u32(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::UInt64 => { + let array = 
col.as_any().downcast_ref::().unwrap(); + + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_u64(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Int8 => { + let array = col.as_any().downcast_ref::().unwrap(); + + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i8(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Int16 => { + let array = col.as_any().downcast_ref::().unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i16(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Int32 => { + let array = col.as_any().downcast_ref::().unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i32(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Int64 => { + let array = col.as_any().downcast_ref::().unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i64(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i64(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write_i64(array.value(i)); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + DataType::Utf8 => { + let array = 
col.as_any().downcast_ref::().unwrap(); + for (i, hash) in hashes.iter_mut().enumerate() { + let mut hasher = random_state.build_hasher(); + hasher.write(array.value(i).as_bytes()); + *hash = hasher.finish().overflowing_add(*hash).0; + } + } + _ => { + // This is internal because we should have caught this before. + return Err(DataFusionError::Internal( + "Unsupported GROUP BY data type".to_string(), + )); + } + } + } + Ok(hashes) +} impl Stream for HashJoinStream { type Item = ArrowResult; @@ -596,6 +733,7 @@ impl Stream for HashJoinStream { self.join_type, &self.schema, &self.column_indices, + &self.random_state, ); self.num_input_batches += 1; self.num_input_rows += batch.num_rows(); From 9cf80237f743c9cb58650b05c77237a1035f3bdc Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 6 Jan 2021 22:10:12 +0100 Subject: [PATCH 21/43] Fmt --- .../datafusion/src/physical_plan/hash_join.rs | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 10ea2521ab0..5fac5cf3350 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -206,11 +206,8 @@ impl ExecutionPlan for HashJoinExec { let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; - let on_left = self - .on - .iter() - .map(|on| on.0.clone()) - .collect::>(); + let on_left = + self.on.iter().map(|on| on.0.clone()).collect::>(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
@@ -220,8 +217,15 @@ impl ExecutionPlan for HashJoinExec { let hash = &mut acc.0; let values = &mut acc.1; let offset = acc.2; - update_hash(&on_left, &batch, hash, offset, &mut acc.3, &random_state) - .unwrap(); + update_hash( + &on_left, + &batch, + hash, + offset, + &mut acc.3, + &random_state, + ) + .unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -252,11 +256,7 @@ impl ExecutionPlan for HashJoinExec { // over the right that uses this information to issue new batches. let stream = self.right.execute(partition).await?; - let on_right = self - .on - .iter() - .map(|on| on.1.clone()) - .collect::>(); + let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { @@ -271,7 +271,7 @@ impl ExecutionPlan for HashJoinExec { num_output_batches: 0, num_output_rows: 0, join_time: 0, - random_state + random_state, })) } } @@ -284,7 +284,7 @@ fn update_hash( hash: &mut JoinHashMap, offset: usize, hash_buf: &mut Vec, - random_state: &RandomState + random_state: &RandomState, ) -> Result<()> { // evaluate the keys let keys_values = on @@ -518,8 +518,7 @@ fn build_join_indexes( // Get the hash and find it in the build index // for every item on the left and right we check if it matches - if let Some(indices) = left.get(&hash_values[row]) - { + if let Some(indices) = left.get(&hash_values[row]) { for &i in indices { // TODO: collision check if true { @@ -541,8 +540,7 @@ fn build_join_indexes( // First visit all of the rows for row in 0..right.num_rows() { - if let Some(indices) = left.get(&hash_values[row]) - { + if let Some(indices) = left.get(&hash_values[row]) { for &i in indices { // Collision check if true { @@ -567,8 +565,7 @@ fn build_join_indexes( } JoinType::Right => { for row in 0..right.num_rows() { - match left.get(&hash_values[row]) - { + match left.get(&hash_values[row]) { Some(indices) => { for &i in indices { if true { @@ -590,7 +587,7 @@ fn 
build_join_indexes( } use core::hash::BuildHasher; -/// Creates hash values for every +/// Creates hash values for every fn create_hashes<'a>( arrays: &[ArrayRef], random_state: &RandomState, From dc0c03675c0e6d1fc69b1be54dcc9a27e0386db3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 6 Jan 2021 22:27:21 +0100 Subject: [PATCH 22/43] Function to combine hash values --- .../datafusion/src/physical_plan/hash_join.rs | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 5fac5cf3350..fc316afc95a 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -587,6 +587,13 @@ fn build_join_indexes( } use core::hash::BuildHasher; +// Simple function to combine two hashes +fn combine_hashes(l: u64, r: u64) -> u64{ + let mut hash = (17 * 37u64).overflowing_add(l).0; + hash = hash.overflowing_mul(37).0.overflowing_add(r).0; + return hash; +} + /// Creates hash values for every fn create_hashes<'a>( arrays: &[ArrayRef], @@ -607,7 +614,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_u8(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::UInt16 => { @@ -616,7 +623,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_u16(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::UInt32 => { @@ -625,7 +632,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_u32(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } 
DataType::UInt64 => { @@ -634,7 +641,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_u64(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Int8 => { @@ -643,7 +650,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_i8(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Int16 => { @@ -651,7 +658,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_i16(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Int32 => { @@ -659,7 +666,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_i32(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Int64 => { @@ -667,7 +674,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_i64(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Timestamp(TimeUnit::Microsecond, None) => { @@ -678,7 +685,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write_i64(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Timestamp(TimeUnit::Nanosecond, None) => { @@ -689,7 +696,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = 
random_state.build_hasher(); hasher.write_i64(array.value(i)); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } DataType::Utf8 => { @@ -697,7 +704,7 @@ fn create_hashes<'a>( for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); hasher.write(array.value(i).as_bytes()); - *hash = hasher.finish().overflowing_add(*hash).0; + *hash = combine_hashes(hasher.finish(), *hash); } } _ => { From a2537ef3c674188ef78738f8de43c89a201e39d9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 7 Jan 2021 08:37:49 +0100 Subject: [PATCH 23/43] Fix test --- .../datafusion/src/physical_plan/hash_join.rs | 68 ++++++++----------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index fc316afc95a..d18c9711f73 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -80,6 +80,8 @@ pub struct HashJoinExec { schema: SchemaRef, /// Build-side build_side: Arc>>, + + random_state: Arc, } /// Information about the index and placement (left or right) of the columns @@ -116,6 +118,8 @@ impl HashJoinExec { .map(|(l, r)| (l.to_string(), r.to_string())) .collect(); + let random_state = RandomState::new(); + Ok(HashJoinExec { left, right, @@ -123,15 +127,14 @@ impl HashJoinExec { join_type: *join_type, schema, build_side: Arc::new(Mutex::new(None)), + random_state: Arc::new(random_state), }) } /// Calculates column indices and left/right placement on input / output schemas and jointype fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { - JoinType::Inner | JoinType::Left => { - (true, self.left.schema(), self.right.schema()) - } + JoinType::Inner | JoinType::Left => (true, self.left.schema(), self.right.schema()), JoinType::Right => (false, 
self.right.schema(), self.left.schema()), }; let mut column_indices = Vec::with_capacity(self.schema.fields().len()); @@ -148,8 +151,7 @@ impl HashJoinExec { } }.map_err(DataFusionError::into_arrow_external_error)?; - let is_left = - is_primary && primary_is_left || !is_primary && !primary_is_left; + let is_left = is_primary && primary_is_left || !is_primary && !primary_is_left; column_indices.push(ColumnIndex { index, is_left }); } @@ -194,7 +196,6 @@ impl ExecutionPlan for HashJoinExec { async fn execute(&self, partition: usize) -> Result { // we only want to compute the build side once - let random_state = RandomState::new(); let left_data = { let mut build_side = self.build_side.lock().await; match build_side.as_ref() { @@ -206,8 +207,7 @@ impl ExecutionPlan for HashJoinExec { let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; - let on_left = - self.on.iter().map(|on| on.0.clone()).collect::>(); + let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
@@ -217,15 +217,8 @@ impl ExecutionPlan for HashJoinExec { let hash = &mut acc.0; let values = &mut acc.1; let offset = acc.2; - update_hash( - &on_left, - &batch, - hash, - offset, - &mut acc.3, - &random_state, - ) - .unwrap(); + update_hash(&on_left, &batch, hash, offset, &mut acc.3, &self.random_state) + .unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -234,8 +227,7 @@ impl ExecutionPlan for HashJoinExec { // Merge all batches into a single batch, so we // can directly index into the arrays - let single_batch = - concat_batches(&batches[0].schema(), &batches, num_rows)?; + let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; let left_side = Arc::new((hashmap, single_batch)); @@ -271,7 +263,7 @@ impl ExecutionPlan for HashJoinExec { num_output_batches: 0, num_output_rows: 0, join_time: 0, - random_state, + random_state: self.random_state.clone(), })) } } @@ -329,7 +321,7 @@ struct HashJoinStream { num_output_rows: usize, /// total time for joining probe-side batches to the build-side batches join_time: usize, - random_state: RandomState, + random_state: Arc, } impl RecordBatchStream for HashJoinStream { @@ -370,11 +362,7 @@ fn build_batch_from_indices( } /// Create a key `Vec` that is used as key for the hashmap -pub(crate) fn create_key( - group_by_keys: &[ArrayRef], - row: usize, - vec: &mut Vec, -) -> Result<()> { +pub(crate) fn create_key(group_by_keys: &[ArrayRef], row: usize, vec: &mut Vec) -> Result<()> { vec.clear(); for i in 0..group_by_keys.len() { let col = &group_by_keys[i]; @@ -454,8 +442,7 @@ fn build_batch( random_state: &RandomState, ) -> ArrowResult { let (left_indices, right_indices) = - build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state) - .unwrap(); + build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state).unwrap(); build_batch_from_indices( schema, @@ -510,6 +497,7 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); let buf = &mut 
Vec::new(); let hash_values = create_hashes(&keys_values, &random_state, buf)?; + println!("{:?}", left); match join_type { JoinType::Inner => { @@ -587,8 +575,8 @@ fn build_join_indexes( } use core::hash::BuildHasher; -// Simple function to combine two hashes -fn combine_hashes(l: u64, r: u64) -> u64{ +// Simple function to combine two hashes +fn combine_hashes(l: u64, r: u64) -> u64 { let mut hash = (17 * 37u64).overflowing_add(l).0; hash = hash.overflowing_mul(37).0.overflowing_add(r).0; return hash; @@ -601,10 +589,10 @@ fn create_hashes<'a>( buf: &'a mut Vec, ) -> Result> { let rows = arrays[0].len(); - buf.fill(0); buf.resize(rows, 0); let mut hashes = vec![0; rows]; + println!("{:?}", hashes); for col in arrays { match col.data_type() { @@ -665,6 +653,7 @@ fn create_hashes<'a>( let array = col.as_any().downcast_ref::().unwrap(); for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); + println!("{}", array.value(i)); hasher.write_i32(array.value(i)); *hash = combine_hashes(hasher.finish(), *hash); } @@ -715,6 +704,7 @@ fn create_hashes<'a>( } } } + println!("{:?}", hashes); Ok(hashes) } @@ -911,12 +901,10 @@ mod tests { ("b2", &vec![1, 2]), ("c1", &vec![7, 8]), ); - let batch2 = - build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9])); + let batch2 = build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9])); let schema = batch1.schema(); - let left = Arc::new( - MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap(), - ); + let left = + Arc::new(MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap()); let right = build_table( ("a1", &vec![1, 2, 3]), @@ -956,12 +944,10 @@ mod tests { ("b1", &vec![4, 6]), ("c2", &vec![70, 80]), ); - let batch2 = - build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90])); + let batch2 = build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90])); let schema = batch1.schema(); - let right = 
Arc::new( - MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap(), - ); + let right = + Arc::new(MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap()); let on = &[("b1", "b1")]; From 703438f41f82f7874b6a1cc85dc82f2d3a0e0fe7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 7 Jan 2021 08:41:30 +0100 Subject: [PATCH 24/43] Remove prints --- rust/datafusion/src/physical_plan/hash_join.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index d18c9711f73..106a4e713f7 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -497,7 +497,6 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); let buf = &mut Vec::new(); let hash_values = create_hashes(&keys_values, &random_state, buf)?; - println!("{:?}", left); match join_type { JoinType::Inner => { @@ -592,7 +591,6 @@ fn create_hashes<'a>( buf.resize(rows, 0); let mut hashes = vec![0; rows]; - println!("{:?}", hashes); for col in arrays { match col.data_type() { @@ -653,7 +651,6 @@ fn create_hashes<'a>( let array = col.as_any().downcast_ref::().unwrap(); for (i, hash) in hashes.iter_mut().enumerate() { let mut hasher = random_state.build_hasher(); - println!("{}", array.value(i)); hasher.write_i32(array.value(i)); *hash = combine_hashes(hasher.finish(), *hash); } @@ -704,7 +701,6 @@ fn create_hashes<'a>( } } } - println!("{:?}", hashes); Ok(hashes) } From 24537dad1ae7ff8cbf9477312677762d8fe39c21 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 7 Jan 2021 08:51:27 +0100 Subject: [PATCH 25/43] fmt --- .../datafusion/src/physical_plan/hash_join.rs | 49 +++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 106a4e713f7..11b930019c8 100644 --- 
a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -134,7 +134,9 @@ impl HashJoinExec { /// Calculates column indices and left/right placement on input / output schemas and jointype fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { - JoinType::Inner | JoinType::Left => (true, self.left.schema(), self.right.schema()), + JoinType::Inner | JoinType::Left => { + (true, self.left.schema(), self.right.schema()) + } JoinType::Right => (false, self.right.schema(), self.left.schema()), }; let mut column_indices = Vec::with_capacity(self.schema.fields().len()); @@ -151,7 +153,8 @@ impl HashJoinExec { } }.map_err(DataFusionError::into_arrow_external_error)?; - let is_left = is_primary && primary_is_left || !is_primary && !primary_is_left; + let is_left = + is_primary && primary_is_left || !is_primary && !primary_is_left; column_indices.push(ColumnIndex { index, is_left }); } @@ -207,7 +210,8 @@ impl ExecutionPlan for HashJoinExec { let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; - let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); + let on_left = + self.on.iter().map(|on| on.0.clone()).collect::>(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
@@ -217,8 +221,15 @@ impl ExecutionPlan for HashJoinExec { let hash = &mut acc.0; let values = &mut acc.1; let offset = acc.2; - update_hash(&on_left, &batch, hash, offset, &mut acc.3, &self.random_state) - .unwrap(); + update_hash( + &on_left, + &batch, + hash, + offset, + &mut acc.3, + &self.random_state, + ) + .unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -227,7 +238,8 @@ impl ExecutionPlan for HashJoinExec { // Merge all batches into a single batch, so we // can directly index into the arrays - let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; + let single_batch = + concat_batches(&batches[0].schema(), &batches, num_rows)?; let left_side = Arc::new((hashmap, single_batch)); @@ -362,7 +374,11 @@ fn build_batch_from_indices( } /// Create a key `Vec` that is used as key for the hashmap -pub(crate) fn create_key(group_by_keys: &[ArrayRef], row: usize, vec: &mut Vec) -> Result<()> { +pub(crate) fn create_key( + group_by_keys: &[ArrayRef], + row: usize, + vec: &mut Vec, +) -> Result<()> { vec.clear(); for i in 0..group_by_keys.len() { let col = &group_by_keys[i]; @@ -442,7 +458,8 @@ fn build_batch( random_state: &RandomState, ) -> ArrowResult { let (left_indices, right_indices) = - build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state).unwrap(); + build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state) + .unwrap(); build_batch_from_indices( schema, @@ -897,10 +914,12 @@ mod tests { ("b2", &vec![1, 2]), ("c1", &vec![7, 8]), ); - let batch2 = build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9])); + let batch2 = + build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9])); let schema = batch1.schema(); - let left = - Arc::new(MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap()); + let left = Arc::new( + MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap(), + ); let right = build_table( ("a1", &vec![1, 
2, 3]), @@ -940,10 +959,12 @@ mod tests { ("b1", &vec![4, 6]), ("c2", &vec![70, 80]), ); - let batch2 = build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90])); + let batch2 = + build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90])); let schema = batch1.schema(); - let right = - Arc::new(MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap()); + let right = Arc::new( + MemoryExec::try_new(&vec![vec![batch1], vec![batch2]], schema, None).unwrap(), + ); let on = &[("b1", "b1")]; From 28cd876098641c210080ca7e68cf8c9eba8e467f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 7 Jan 2021 22:05:32 +0100 Subject: [PATCH 26/43] Clippy --- .../datafusion/src/physical_plan/hash_join.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index ba048e0e581..5da95b49217 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -265,7 +265,7 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), - on_right: on_right, + on_right, join_type: self.join_type, left_data, right: stream, @@ -518,11 +518,11 @@ fn build_join_indexes( match join_type { JoinType::Inner => { // Visit all of the right rows - for row in 0..right.num_rows() { + for (row, hash_value) in hash_values.iter().enumerate() { // Get the hash and find it in the build index // for every item on the left and right we check if it matches - if let Some(indices) = left.get(&hash_values[row]) { + if let Some(indices) = left.get(hash_value) { for &i in indices { // TODO: collision check if true { @@ -543,8 +543,8 @@ fn build_join_indexes( let mut is_visited = HashSet::new(); // First visit all of the rows - for row in 0..right.num_rows() { - if let Some(indices) = 
left.get(&hash_values[row]) { + for (row, hash_value) in hash_values.iter().enumerate() { + if let Some(indices) = left.get(hash_value) { for &i in indices { // Collision check if true { @@ -568,8 +568,8 @@ fn build_join_indexes( Ok((left_indices.finish(), right_indices.finish())) } JoinType::Right => { - for row in 0..right.num_rows() { - match left.get(&hash_values[row]) { + for (row, hash_value) in hash_values.iter().enumerate() { + match left.get(hash_value) { Some(indices) => { for &i in indices { if true { @@ -593,9 +593,8 @@ use core::hash::BuildHasher; // Simple function to combine two hashes fn combine_hashes(l: u64, r: u64) -> u64 { - let mut hash = (17 * 37u64).overflowing_add(l).0; - hash = hash.overflowing_mul(37).0.overflowing_add(r).0; - return hash; + let hash = (17 * 37u64).overflowing_add(l).0; + hash.overflowing_mul(37).0.overflowing_add(r).0 } /// Creates hash values for every From b167b3b4f96b5d4fbd32066728847ca83b9f4361 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 7 Jan 2021 22:31:57 +0100 Subject: [PATCH 27/43] Cleanup --- rust/datafusion/src/physical_plan/hash_join.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 5da95b49217..9b0964149fb 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -215,8 +215,8 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
- let initial = (JoinHashMap::default(), Vec::new(), 0, Vec::new()); - let (hashmap, batches, num_rows, _) = stream + let initial = (JoinHashMap::default(), Vec::new(), 0); + let (hashmap, batches, num_rows) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; @@ -226,7 +226,6 @@ impl ExecutionPlan for HashJoinExec { &batch, hash, offset, - &mut acc.3, &self.random_state, ) .unwrap(); @@ -287,7 +286,6 @@ fn update_hash( batch: &RecordBatch, hash: &mut JoinHashMap, offset: usize, - hash_buf: &mut Vec, random_state: &RandomState, ) -> Result<()> { // evaluate the keys @@ -297,7 +295,7 @@ fn update_hash( .collect::>>()?; // update the hash map - let hash_values = create_hashes(&keys_values, &random_state, hash_buf)?; + let hash_values = create_hashes(&keys_values, &random_state)?; // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { @@ -509,8 +507,7 @@ fn build_join_indexes( .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) .collect::>>()?; - let buf = &mut Vec::new(); - let hash_values = create_hashes(&keys_values, &random_state, buf)?; + let hash_values = create_hashes(&keys_values, &random_state)?; let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); @@ -601,11 +598,8 @@ fn combine_hashes(l: u64, r: u64) -> u64 { fn create_hashes<'a>( arrays: &[ArrayRef], random_state: &RandomState, - buf: &'a mut Vec, ) -> Result> { let rows = arrays[0].len(); - buf.resize(rows, 0); - let mut hashes = vec![0; rows]; for col in arrays { From e25cf38093c696f0631276d815bdff778aea3fb5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 8 Jan 2021 07:42:46 +0100 Subject: [PATCH 28/43] Avoid rehashing using custom Hasher --- .../datafusion/src/physical_plan/hash_join.rs | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs 
b/rust/datafusion/src/physical_plan/hash_join.rs index 9b0964149fb..ed9bb27c6f2 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,6 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. +use ahash::RandomState; use arrow::{ array::{ArrayRef, UInt64Builder}, compute, @@ -55,13 +56,12 @@ use crate::error::{DataFusionError, Result}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; use crate::physical_plan::coalesce_batches::concat_batches; -use ahash::RandomState; use log::debug; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. -type JoinHashMap = HashMap, RandomState>; +type JoinHashMap = HashMap, IdHashBuilder>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -215,7 +215,7 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
- let initial = (JoinHashMap::default(), Vec::new(), 0); + let initial = (JoinHashMap::with_hasher(IdHashBuilder{}), Vec::new(), 0); let (hashmap, batches, num_rows) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; @@ -300,7 +300,7 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { hash.raw_entry_mut() - .from_key(hash_value) + .from_key_hashed_nocheck(*hash_value, hash_value) .and_modify(|_, v| v.push((row + offset) as u64)) .or_insert_with(|| (*hash_value, vec![(row + offset) as u64])); } @@ -588,6 +588,35 @@ fn build_join_indexes( } use core::hash::BuildHasher; +struct IdHasher { + hash: u64, +} + +impl Hasher for IdHasher { + fn finish(&self) -> u64 { + self.hash + } + + fn write_u64(&mut self, i: u64) { + self.hash = i; + } + + fn write(&mut self, _bytes: &[u8]) { + unreachable!("IdHasher should only be used for u64 keys") + } +} + +#[derive(Debug)] +struct IdHashBuilder {} + +impl BuildHasher for IdHashBuilder { + type Hasher = IdHasher; + + fn build_hasher(&self) -> Self::Hasher { + IdHasher { hash: 0 } + } +} + // Simple function to combine two hashes fn combine_hashes(l: u64, r: u64) -> u64 { let hash = (17 * 37u64).overflowing_add(l).0; From 0106769afa9126ba9b50270eb7d690eee9fdfef8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 8 Jan 2021 19:01:28 +0100 Subject: [PATCH 29/43] Doc and remove `Arc` --- rust/datafusion/src/physical_plan/hash_join.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index ed9bb27c6f2..a90f449dd29 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,7 +81,7 @@ pub struct HashJoinExec { /// Build-side build_side: Arc>>, - random_state: Arc, + random_state: RandomState, } /// Information about the index and placement (left or right) of 
the columns @@ -127,7 +127,7 @@ impl HashJoinExec { join_type: *join_type, schema, build_side: Arc::new(Mutex::new(None)), - random_state: Arc::new(random_state), + random_state: random_state, }) } @@ -331,7 +331,8 @@ struct HashJoinStream { num_output_rows: usize, /// total time for joining probe-side batches to the build-side batches join_time: usize, - random_state: Arc, + /// Random state used for hashing initialization + random_state: RandomState, } impl RecordBatchStream for HashJoinStream { @@ -587,7 +588,9 @@ fn build_join_indexes( } } use core::hash::BuildHasher; - + +/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing +/// it when inserting/indexing or regrowing the `HashMap` struct IdHasher { hash: u64, } @@ -617,7 +620,7 @@ impl BuildHasher for IdHashBuilder { } } -// Simple function to combine two hashes +// Combines two hashes into one hash fn combine_hashes(l: u64, r: u64) -> u64 { let hash = (17 * 37u64).overflowing_add(l).0; hash.overflowing_mul(37).0.overflowing_add(r).0 From dcc9045d18cf8d8501cfb7d7e40090dcfbf96c05 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 8 Jan 2021 22:25:00 +0100 Subject: [PATCH 30/43] Implement collision detection --- .../datafusion/src/physical_plan/hash_join.rs | 178 ++++++++++++++++-- 1 file changed, 161 insertions(+), 17 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index a90f449dd29..1f2076f88ed 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -20,7 +20,7 @@ use ahash::RandomState; use arrow::{ - array::{ArrayRef, UInt64Builder}, + array::{ArrayRef, BooleanArray, LargeStringArray, UInt64Builder}, compute, }; use arrow::{ @@ -198,6 +198,8 @@ impl ExecutionPlan for HashJoinExec { async fn execute(&self, partition: usize) -> Result { + let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); + // we only want to compute the build 
side once let left_data = { let mut build_side = self.build_side.lock().await; @@ -210,12 +212,11 @@ impl ExecutionPlan for HashJoinExec { let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; - let on_left = - self.on.iter().map(|on| on.0.clone()).collect::>(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = (JoinHashMap::with_hasher(IdHashBuilder{}), Vec::new(), 0); + let initial = + (JoinHashMap::with_hasher(IdHashBuilder {}), Vec::new(), 0); let (hashmap, batches, num_rows) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; @@ -264,6 +265,7 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), + on_left, on_right, join_type: self.join_type, left_data, @@ -311,6 +313,8 @@ fn update_hash( struct HashJoinStream { /// Input schema schema: Arc, + /// columns from the left + on_left: Vec, /// columns from the right used to compute the hash on_right: Vec, /// type of the join @@ -352,7 +356,7 @@ fn build_batch_from_indices( right: &RecordBatch, left_indices: UInt64Array, right_indices: UInt32Array, - column_indices: &Vec, + column_indices: &[ColumnIndex], ) -> ArrowResult { // build the columns of the new [RecordBatch]: // 1. 
pick whether the column is from the left or right @@ -449,15 +453,22 @@ pub(crate) fn create_key( fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, + on_left: &[String], on_right: &[String], join_type: JoinType, schema: &Schema, - column_indices: &Vec, + column_indices: &[ColumnIndex], random_state: &RandomState, ) -> ArrowResult { - let (left_indices, right_indices) = - build_join_indexes(&left_data.0, &batch, join_type, on_right, random_state) - .unwrap(); + let (left_indices, right_indices) = build_join_indexes( + &left_data, + &batch, + join_type, + on_left, + on_right, + random_state, + ) + .unwrap(); build_batch_from_indices( schema, @@ -497,9 +508,10 @@ fn build_batch( // (1, 1) (1, 1) // (1, 0) (1, 2) fn build_join_indexes( - left: &JoinHashMap, + left_data: &JoinLeftData, right: &RecordBatch, join_type: JoinType, + left_on: &[String], right_on: &[String], random_state: &RandomState, ) -> Result<(UInt64Array, UInt32Array)> { @@ -507,8 +519,17 @@ fn build_join_indexes( .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) .collect::>>()?; + let left_join_values = left_on + .iter() + .map(|name| { + Ok(col(name) + .evaluate(&left_data.1)? + .into_array(left_data.1.num_rows())) + }) + .collect::>>()?; let hash_values = create_hashes(&keys_values, &random_state)?; + let left = &left_data.0; let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); @@ -522,8 +543,7 @@ fn build_join_indexes( // for every item on the left and right we check if it matches if let Some(indices) = left.get(hash_value) { for &i in indices { - // TODO: collision check - if true { + if is_eq(i as usize, row, &left_join_values, &keys_values)? { left_indices.append_value(i)?; right_indices.append_value(row as u32)?; } @@ -545,7 +565,7 @@ fn build_join_indexes( if let Some(indices) = left.get(hash_value) { for &i in indices { // Collision check - if true { + if is_eq(i as usize, row, &left_join_values, &keys_values)? 
{ left_indices.append_value(i)?; right_indices.append_value(row as u32)?; is_visited.insert(i); @@ -555,7 +575,7 @@ fn build_join_indexes( } // Add the remaining left rows to the result set with None on the right side for (_, indices) in left { - for i in indices { + for i in indices.iter() { if !is_visited.contains(i) { left_indices.append_slice(&indices)?; right_indices.append_null()?; @@ -570,7 +590,7 @@ fn build_join_indexes( match left.get(hash_value) { Some(indices) => { for &i in indices { - if true { + if is_eq(i as usize, row, &left_join_values, &keys_values)? { left_indices.append_value(i)?; right_indices.append_value(row as u32)?; } @@ -588,7 +608,7 @@ fn build_join_indexes( } } use core::hash::BuildHasher; - + /// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing /// it when inserting/indexing or regrowing the `HashMap` struct IdHasher { @@ -626,6 +646,129 @@ fn combine_hashes(l: u64, r: u64) -> u64 { hash.overflowing_mul(37).0.overflowing_add(r).0 } +fn is_eq( + left: usize, + right: usize, + left_arrays: &[ArrayRef], + right_arrays: &[ArrayRef], +) -> Result { + let mut err = None; + let res = left_arrays + .iter() + .zip(right_arrays) + .all(|(l, r)| match l.data_type() { + DataType::Null => true, + DataType::Boolean => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::Int8 => { + l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any().downcast_ref::().unwrap().value(right) + } + DataType::Int16 => { + l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::Int32 => { + l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::Int64 => { + l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::UInt8 => { + 
l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::UInt16 => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::UInt32 => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::UInt64 => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::Timestamp(_, None) => { + l.as_any().downcast_ref::().unwrap().value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::Utf8 => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + DataType::LargeUtf8 => { + l.as_any() + .downcast_ref::() + .unwrap() + .value(left) + == r.as_any() + .downcast_ref::() + .unwrap() + .value(right) + } + _ => { + // This is internal because we should have caught this before. + err = Some(Err(DataFusionError::Internal( + "Unsupported data type in hasher".to_string(), + ))); + false + } + }); + + err.unwrap_or(Ok(res)) +} + /// Creates hash values for every fn create_hashes<'a>( arrays: &[ArrayRef], @@ -738,7 +881,7 @@ fn create_hashes<'a>( _ => { // This is internal because we should have caught this before. 
return Err(DataFusionError::Internal( - "Unsupported GROUP BY data type".to_string(), + "Unsupported data type in hasher".to_string(), )); } } @@ -761,6 +904,7 @@ impl Stream for HashJoinStream { let result = build_batch( &batch, &self.left_data, + &self.on_left, &self.on_right, self.join_type, &self.schema, From 4c9e2a0fe0ff1263b7da0c698f543a205cd09831 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 9 Jan 2021 10:31:23 +0100 Subject: [PATCH 31/43] Clippy --- rust/datafusion/src/physical_plan/hash_join.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 1f2076f88ed..f2ed91564a3 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -127,7 +127,7 @@ impl HashJoinExec { join_type: *join_type, schema, build_side: Arc::new(Mutex::new(None)), - random_state: random_state, + random_state, }) } @@ -450,6 +450,7 @@ pub(crate) fn create_key( Ok(()) } +#[allow(clippy::too_many_arguments)] fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, @@ -770,7 +771,7 @@ fn is_eq( } /// Creates hash values for every -fn create_hashes<'a>( +fn create_hashes( arrays: &[ArrayRef], random_state: &RandomState, ) -> Result> { From d14cff3abd1c96101992c6ae9202cfa1e785ef08 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 9 Jan 2021 10:31:39 +0100 Subject: [PATCH 32/43] Fmt --- rust/datafusion/src/physical_plan/hash_join.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index f2ed91564a3..256982e7142 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -771,10 +771,7 @@ fn is_eq( } /// Creates hash values for every -fn create_hashes( - arrays: &[ArrayRef], - random_state: &RandomState, -) -> Result> { +fn 
create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result> { let rows = arrays[0].len(); let mut hashes = vec![0; rows]; From 413660e25d40e533d74e0dc815226c60b1623087 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 9 Jan 2021 10:52:30 +0100 Subject: [PATCH 33/43] Docs --- .../datafusion/src/physical_plan/hash_join.rs | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 256982e7142..c1eeaaa0350 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -58,9 +58,9 @@ use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchS use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; -// Maps ["on" value] -> [list of indices with this key's value] -// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true -// for rows 3 and 8 from batch 0 and row 6 from batch 1. +// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. +// E.g. 
[1, 2] -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 +// As the key is a hash value, we need to check possible hash collisions in the probe stage type JoinHashMap = HashMap, IdHashBuilder>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; @@ -80,7 +80,7 @@ pub struct HashJoinExec { schema: SchemaRef, /// Build-side build_side: Arc>>, - + /// Shares the `RandomState` for the hashing algorithm random_state: RandomState, } @@ -541,10 +541,13 @@ fn build_join_indexes( for (row, hash_value) in hash_values.iter().enumerate() { // Get the hash and find it in the build index - // for every item on the left and right we check if it matches + // For every item on the left and right we check if it matches + // This possibly contains rows with hash collisions, + // So we have to check here whether rows are equal or not if let Some(indices) = left.get(hash_value) { for &i in indices { - if is_eq(i as usize, row, &left_join_values, &keys_values)? { + // Check hash collisions + if equal_rows(i as usize, row, &left_join_values, &keys_values)? { left_indices.append_value(i)?; right_indices.append_value(row as u32)?; } @@ -566,7 +569,7 @@ fn build_join_indexes( if let Some(indices) = left.get(hash_value) { for &i in indices { // Collision check - if is_eq(i as usize, row, &left_join_values, &keys_values)? { + if equal_rows(i as usize, row, &left_join_values, &keys_values)? { left_indices.append_value(i)?; right_indices.append_value(row as u32)?; is_visited.insert(i); @@ -591,7 +594,7 @@ fn build_join_indexes( match left.get(hash_value) { Some(indices) => { for &i in indices { - if is_eq(i as usize, row, &left_join_values, &keys_values)? { + if equal_rows(i as usize, row, &left_join_values, &keys_values)? 
{ left_indices.append_value(i)?; right_indices.append_value(row as u32)?; } @@ -647,7 +650,8 @@ fn combine_hashes(l: u64, r: u64) -> u64 { hash.overflowing_mul(37).0.overflowing_add(r).0 } -fn is_eq( +/// Left and right row have equal values +fn equal_rows( left: usize, right: usize, left_arrays: &[ArrayRef], From 60739c4088b4e2027ad1085c79d56eefad39b7e2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 10 Jan 2021 20:00:22 +0100 Subject: [PATCH 34/43] Add test for hash collision --- .../datafusion/src/physical_plan/hash_join.rs | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c1eeaaa0350..738c75dc884 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -594,7 +594,12 @@ fn build_join_indexes( match left.get(hash_value) { Some(indices) => { for &i in indices { - if equal_rows(i as usize, row, &left_join_values, &keys_values)? { + if equal_rows( + i as usize, + row, + &left_join_values, + &keys_values, + )? 
{ left_indices.append_value(i)?; right_indices.append_value(row as u32)?; } @@ -940,7 +945,6 @@ impl Stream for HashJoinStream { #[cfg(test)] mod tests { - use crate::{ physical_plan::{common, memory::MemoryExec}, test::{build_table_i32, columns, format_batch}, @@ -1224,4 +1228,53 @@ mod tests { Ok(()) } + + #[test] + fn join_with_hash_collision() -> Result<()> { + let mut hashmap_left = HashMap::with_hasher(IdHashBuilder {}); + let left = build_table_i32( + ("a", &vec![10, 20]), + ("x", &vec![100, 200]), + ("y", &vec![200, 300]), + ); + + let random_state = RandomState::new(); + + let hashes = create_hashes(&[left.columns()[0].clone()], &random_state)?; + + // Create hash collisions + hashmap_left.insert(hashes[0], vec![0, 1]); + hashmap_left.insert(hashes[1], vec![0, 1]); + + let right = build_table_i32( + ("a", &vec![10, 20]), + ("b", &vec![0, 0]), + ("c", &vec![30, 40]), + ); + + let left_data = JoinLeftData::new((hashmap_left, left)); + let (l, r) = build_join_indexes( + &left_data, + &right, + JoinType::Inner, + &["a".to_string()], + &["a".to_string()], + &random_state, + )?; + + let mut left_ids = UInt64Builder::new(0); + left_ids.append_value(0)?; + left_ids.append_value(1)?; + + let mut right_ids = UInt32Builder::new(0); + + right_ids.append_value(0)?; + right_ids.append_value(1)?; + + assert_eq!(left_ids.finish(), l); + + assert_eq!(right_ids.finish(), r); + + Ok(()) + } } From 0f43bff46e110dba5cb23f16b80c2e1ee9e8044f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 10 Jan 2021 20:02:36 +0100 Subject: [PATCH 35/43] Move create_key --- .../src/physical_plan/hash_aggregate.rs | 74 ++++++++++++++++++- .../datafusion/src/physical_plan/hash_join.rs | 74 ------------------- 2 files changed, 72 insertions(+), 76 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 864bc78a154..f8123542a4f 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ 
b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -43,8 +43,8 @@ use arrow::{ use pin_project_lite::pin_project; use super::{ - common, expressions::Column, group_scalar::GroupByScalar, hash_join::create_key, - RecordBatchStream, SendableRecordBatchStream, + common, expressions::Column, group_scalar::GroupByScalar, RecordBatchStream, + SendableRecordBatchStream, }; use ahash::RandomState; use hashbrown::HashMap; @@ -329,6 +329,76 @@ fn group_aggregate_batch( Ok(accumulators) } +/// Create a key `Vec` that is used as key for the hashmap +fn create_key(group_by_keys: &[ArrayRef], row: usize, vec: &mut Vec) -> Result<()> { + vec.clear(); + for col in group_by_keys { + match col.data_type() { + DataType::UInt8 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::UInt16 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::UInt32 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::UInt64 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::Int8 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::Int16 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend(array.value(row).to_le_bytes().iter()); + } + DataType::Int32 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::Int64 => { + let array = col.as_any().downcast_ref::().unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + 
DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec.extend_from_slice(&array.value(row).to_le_bytes()); + } + DataType::Utf8 => { + let array = col.as_any().downcast_ref::().unwrap(); + let value = array.value(row); + // store the size + vec.extend_from_slice(&value.len().to_le_bytes()); + // store the string value + vec.extend_from_slice(value.as_bytes()); + } + _ => { + // This is internal because we should have caught this before. + return Err(DataFusionError::Internal( + "Unsupported GROUP BY data type".to_string(), + )); + } + } + } + Ok(()) +} + async fn compute_grouped_hash_aggregate( mode: AggregateMode, schema: SchemaRef, diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 738c75dc884..0c2dbda1424 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -376,80 +376,6 @@ fn build_batch_from_indices( RecordBatch::try_new(Arc::new(schema.clone()), columns) } -/// Create a key `Vec` that is used as key for the hashmap -pub(crate) fn create_key( - group_by_keys: &[ArrayRef], - row: usize, - vec: &mut Vec, -) -> Result<()> { - vec.clear(); - for col in group_by_keys { - match col.data_type() { - DataType::UInt8 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::UInt16 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::UInt32 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::UInt64 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::Int8 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); 
- } - DataType::Int16 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); - } - DataType::Int32 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::Int64 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::Timestamp(TimeUnit::Microsecond, None) => { - let array = col - .as_any() - .downcast_ref::() - .unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - let array = col - .as_any() - .downcast_ref::() - .unwrap(); - vec.extend_from_slice(&array.value(row).to_le_bytes()); - } - DataType::Utf8 => { - let array = col.as_any().downcast_ref::().unwrap(); - let value = array.value(row); - // store the size - vec.extend_from_slice(&value.len().to_le_bytes()); - // store the string value - vec.extend_from_slice(value.as_bytes()); - } - _ => { - // This is internal because we should have caught this before. - return Err(DataFusionError::Internal( - "Unsupported GROUP BY data type".to_string(), - )); - } - } - } - Ok(()) -} - #[allow(clippy::too_many_arguments)] fn build_batch( batch: &RecordBatch, From d6102c9cd2711ddcddefb5ddf599c3f195cabe4a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 10 Jan 2021 20:24:02 +0100 Subject: [PATCH 36/43] Use macro to reduce verbosity --- .../datafusion/src/physical_plan/hash_join.rs | 107 ++++++------------ 1 file changed, 35 insertions(+), 72 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 0c2dbda1424..425866fd602 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -705,7 +705,19 @@ fn equal_rows( err.unwrap_or(Ok(res)) } -/// Creates hash values for every +macro_rules! 
hash_array { + ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => { + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + + for (i, hash) in $hashes.iter_mut().enumerate() { + let mut hasher = $random_state.build_hasher(); + hasher.$f(array.value(i)); + *hash = combine_hashes(hasher.finish(), *hash); + } + }; +} + +/// Creates hash values for every element in the row based on the values in the columns fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result> { let rows = arrays[0].len(); let mut hashes = vec![0; rows]; @@ -713,95 +725,46 @@ fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result { - let array = col.as_any().downcast_ref::().unwrap(); - - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_u8(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(UInt8Array, col, write_u8, hashes, random_state); } DataType::UInt16 => { - let array = col.as_any().downcast_ref::().unwrap(); - - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_u16(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(UInt16Array, col, write_u16, hashes, random_state); } DataType::UInt32 => { - let array = col.as_any().downcast_ref::().unwrap(); - - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_u32(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(UInt32Array, col, write_u32, hashes, random_state); } DataType::UInt64 => { - let array = col.as_any().downcast_ref::().unwrap(); - - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_u64(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(UInt64Array, col, write_u64, hashes, 
random_state); } DataType::Int8 => { - let array = col.as_any().downcast_ref::().unwrap(); - - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i8(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(Int8Array, col, write_i8, hashes, random_state); } DataType::Int16 => { - let array = col.as_any().downcast_ref::().unwrap(); - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i16(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(Int16Array, col, write_i16, hashes, random_state); } DataType::Int32 => { - let array = col.as_any().downcast_ref::().unwrap(); - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i32(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(Int32Array, col, write_i32, hashes, random_state); } DataType::Int64 => { - let array = col.as_any().downcast_ref::().unwrap(); - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i64(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!(Int64Array, col, write_i64, hashes, random_state); } DataType::Timestamp(TimeUnit::Microsecond, None) => { - let array = col - .as_any() - .downcast_ref::() - .unwrap(); - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i64(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); - } + hash_array!( + TimestampMicrosecondArray, + col, + write_i64, + hashes, + random_state + ); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { - let array = col - .as_any() - .downcast_ref::() - .unwrap(); - for (i, hash) in hashes.iter_mut().enumerate() { - let mut hasher = random_state.build_hasher(); - hasher.write_i64(array.value(i)); - *hash 
= combine_hashes(hasher.finish(), *hash); - } + hash_array!( + TimestampNanosecondArray, + col, + write_i64, + hashes, + random_state + ); } DataType::Utf8 => { let array = col.as_any().downcast_ref::().unwrap(); From 1823c50268026dde5515fe931c29bff1a9851d8c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 10 Jan 2021 20:31:32 +0100 Subject: [PATCH 37/43] Use macro to reduce verbosity --- .../datafusion/src/physical_plan/hash_join.rs | 103 +++++------------- 1 file changed, 27 insertions(+), 76 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 425866fd602..48cf90d3613 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -59,7 +59,7 @@ use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. -// E.g. [1, 2] -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 +// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 // As the key is a hash value, we need to check possible hash collisions in the probe stage type JoinHashMap = HashMap, IdHashBuilder>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; @@ -581,6 +581,20 @@ fn combine_hashes(l: u64, r: u64) -> u64 { hash.overflowing_mul(37).0.overflowing_add(r).0 } +macro_rules! 
equal_rows_elem { + ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => { + $l.as_any() + .downcast_ref::<$array_type>() + .unwrap() + .value($left) + == $r + .as_any() + .downcast_ref::<$array_type>() + .unwrap() + .value($right) + }; +} + /// Left and right row have equal values fn equal_rows( left: usize, @@ -595,103 +609,40 @@ fn equal_rows( .all(|(l, r)| match l.data_type() { DataType::Null => true, DataType::Boolean => { - l.as_any() - .downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(BooleanArray, l, r, left, right) } DataType::Int8 => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any().downcast_ref::().unwrap().value(right) + equal_rows_elem!(Int8Array, l, r, left, right) } DataType::Int16 => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(Int16Array, l, r, left, right) } DataType::Int32 => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(Int32Array, l, r, left, right) } DataType::Int64 => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(Int64Array, l, r, left, right) } DataType::UInt8 => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(UInt8Array, l, r, left, right) } DataType::UInt16 => { - l.as_any() - .downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(UInt16Array, l, r, left, right) } DataType::UInt32 => { - l.as_any() - .downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(UInt32Array, l, r, left, right) } DataType::UInt64 => { - l.as_any() - 
.downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(UInt64Array, l, r, left, right) } DataType::Timestamp(_, None) => { - l.as_any().downcast_ref::().unwrap().value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(Int64Array, l, r, left, right) } DataType::Utf8 => { - l.as_any() - .downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(StringArray, l, r, left, right) } DataType::LargeUtf8 => { - l.as_any() - .downcast_ref::() - .unwrap() - .value(left) - == r.as_any() - .downcast_ref::() - .unwrap() - .value(right) + equal_rows_elem!(LargeStringArray, l, r, left, right) } _ => { // This is internal because we should have caught this before. From 5deec7b4eb86ef9ef54dcafdbec380f93eefb4fd Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 14 Jan 2021 22:16:37 +0100 Subject: [PATCH 38/43] Fix merge --- rust/datafusion/src/physical_plan/hash_join.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 78a6b1eca67..c43c38528f1 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -20,16 +20,12 @@ use ahash::RandomState; use arrow::{ -<<<<<<< HEAD - array::{ArrayRef, BooleanArray, LargeStringArray, UInt64Builder}, -======= - array::{ArrayRef, Float32Array, Float64Array, UInt64Builder}, ->>>>>>> upstream/master + array::{ + ArrayRef, BooleanArray, LargeStringArray, + UInt64Builder, TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder + }, compute, -}; -use arrow::{ - array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, - datatypes::TimeUnit, + datatypes::TimeUnit }; use std::time::Instant; use std::{any::Any, collections::HashSet}; From 
f87daf41fcd8d14940b376cf250da0e234d2ef27 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 14 Jan 2021 22:16:55 +0100 Subject: [PATCH 39/43] fmt --- rust/datafusion/src/physical_plan/hash_join.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c43c38528f1..6cb53fcde91 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -21,11 +21,11 @@ use ahash::RandomState; use arrow::{ array::{ - ArrayRef, BooleanArray, LargeStringArray, - UInt64Builder, TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder + ArrayRef, BooleanArray, LargeStringArray, TimestampMicrosecondArray, + TimestampNanosecondArray, UInt32Builder, UInt64Builder, }, compute, - datatypes::TimeUnit + datatypes::TimeUnit, }; use std::time::Instant; use std::{any::Any, collections::HashSet}; From 567bc673737e91ca3e399c023fdfe2c359448a77 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 15 Jan 2021 08:37:58 +0100 Subject: [PATCH 40/43] Null handling --- .../datafusion/src/physical_plan/hash_join.rs | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 6cb53fcde91..eda3f270962 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -602,17 +602,16 @@ fn combine_hashes(l: u64, r: u64) -> u64 { } macro_rules! 
equal_rows_elem { - ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => { - $l.as_any() - .downcast_ref::<$array_type>() - .unwrap() - .value($left) - == $r - .as_any() - .downcast_ref::<$array_type>() - .unwrap() - .value($right) - }; + ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{ + let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap(); + let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap(); + + match (left_array.is_null($left), left_array.is_null($right)) { + (true, true) => true, + (false, false) => left_array.value($left) == right_array.value($right), + _ => false + } + }}; } /// Left and right row have equal values @@ -679,11 +678,20 @@ fn equal_rows( macro_rules! hash_array { ($array_type:ident, $column: ident, $f: ident, $hashes: ident, $random_state: ident) => { let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); - - for (i, hash) in $hashes.iter_mut().enumerate() { - let mut hasher = $random_state.build_hasher(); - hasher.$f(array.value(i)); - *hash = combine_hashes(hasher.finish(), *hash); + if array.null_count() == 0 { + for (i, hash) in $hashes.iter_mut().enumerate() { + let mut hasher = $random_state.build_hasher(); + hasher.$f(array.value(i)); + *hash = combine_hashes(hasher.finish(), *hash); + } + } else { + for (i, hash) in $hashes.iter_mut().enumerate() { + let mut hasher = $random_state.build_hasher(); + if !array.is_null(i) { + hasher.$f(array.value(i)); + *hash = combine_hashes(hasher.finish(), *hash); + } + } } }; } From 749ea4c651e177f07e00da53fa0547628801a604 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 15 Jan 2021 20:49:49 +0100 Subject: [PATCH 41/43] Clippy --- rust/datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index eda3f270962..4707d78db2e 100644 --- 
a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -609,7 +609,7 @@ macro_rules! equal_rows_elem { match (left_array.is_null($left), left_array.is_null($right)) { (true, true) => true, (false, false) => left_array.value($left) == right_array.value($right), - _ => false + _ => false, } }}; } From 086d642cf18b7adee559f3cdc28f14e2c1767d0b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 15 Jan 2021 20:58:07 +0100 Subject: [PATCH 42/43] Testing module --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index d6c4deb22c4..b4eeafdec6f 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d6c4deb22c4b4e9e3247a2f291046e3c671ad235 +Subproject commit b4eeafdec6fb5284c4aaf269f2ebdb3be2c63ed5 From 28935563b6a8891bac2e826cfc080fa3aeb63d2c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 16 Jan 2021 22:52:14 +0100 Subject: [PATCH 43/43] Small cleanup --- rust/datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 4707d78db2e..a0961710e7f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -597,8 +597,8 @@ impl BuildHasher for IdHashBuilder { // Combines two hashes into one hash fn combine_hashes(l: u64, r: u64) -> u64 { - let hash = (17 * 37u64).overflowing_add(l).0; - hash.overflowing_mul(37).0.overflowing_add(r).0 + let hash = (17 * 37u64).wrapping_add(l); + hash.wrapping_mul(37).wrapping_add(r) } macro_rules! equal_rows_elem {