From a0371dcae941f82e0847d98e8a5a77fb9636f00c Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 14 Aug 2021 19:18:09 +0300 Subject: [PATCH 1/5] Left join could use bitmap for left join instead of Vec --- datafusion/src/physical_plan/hash_join.rs | 36 ++++++++++++----------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 727d1c68ebccd..9a1f51964ee82 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -68,6 +68,7 @@ use super::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; +use crate::arrow::array::BooleanBufferBuilder; use crate::arrow::datatypes::TimeUnit; use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::PhysicalExpr; @@ -401,9 +402,13 @@ impl ExecutionPlan for HashJoinExec { let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => { - vec![false; num_rows] + let mut buffer = BooleanBufferBuilder::new(num_rows); + + buffer.append_n(num_rows, false); + + buffer } - JoinType::Inner | JoinType::Right => vec![], + JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0), }; Ok(Box::pin(HashJoinStream::new( self.schema.clone(), @@ -502,8 +507,7 @@ struct HashJoinStream { /// Random state used for hashing initialization random_state: RandomState, /// Keeps track of the left side rows whether they are visited - visited_left_side: Vec, - // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240 + visited_left_side: BooleanBufferBuilder, /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, /// Metrics @@ -525,7 +529,7 @@ impl HashJoinStream { right: SendableRecordBatchStream, column_indices: Vec, random_state: RandomState, - visited_left_side: Vec, + visited_left_side: BooleanBufferBuilder, join_metrics: HashJoinMetrics, null_equals_null: bool, ) -> Self { @@ -909,7 +913,7 @@ fn equal_rows( // Produces a batch for left-side rows that have/have not been matched during the whole join fn produce_from_matched( - visited_left_side: &[bool], + visited_left_side: &BooleanBufferBuilder, schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, @@ -918,20 +922,18 @@ fn produce_from_matched( // Find indices which didn't match any right row (are false) let indices = if unmatched { UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| !value) - .map(|(index, _)| index as u64), + (1..visited_left_side.len()) + .into_iter() + .filter(|v| !visited_left_side.get_bit(v)) + .map(|v| v as u64), ) } else { // produce those that did match UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64), + (1..visited_left_side.len()) + .into_iter() + .filter(|v| visited_left_side.get_bit(v)) + .map(|v| v as u64), ) }; @@ -991,7 +993,7 @@ impl Stream for HashJoinStream { | JoinType::Semi | JoinType::Anti => { left_side.iter().flatten().for_each(|x| { - self.visited_left_side[x as usize] = true; + self.visited_left_side.set_bit(x as usize, true); }); } JoinType::Inner | JoinType::Right => {} From 744f8b5356361053927de418ecba385499c8a589 Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 18 Sep 2021 13:12:00 +0300 Subject: [PATCH 2/5] fix --- datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 9a1f51964ee82..b29b7c0113795 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -924,7 +924,7 @@ fn produce_from_matched( UInt64Array::from_iter_values( (1..visited_left_side.len()) .into_iter() - .filter(|v| !visited_left_side.get_bit(v)) + .filter(|v| !visited_left_side.get_bit(*v)) .map(|v| v as u64), ) } else { @@ -932,7 +932,7 @@ fn produce_from_matched( UInt64Array::from_iter_values( (1..visited_left_side.len()) .into_iter() - .filter(|v| visited_left_side.get_bit(v)) + .filter(|v| visited_left_side.get_bit(*v)) .map(|v| v as u64), ) }; From b57c9232de238b8928155e578e38b060cd73774f Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 18 Sep 2021 13:19:40 +0300 Subject: [PATCH 3/5] Fix --- datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index b29b7c0113795..e1865eb1efaa8 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -922,7 +922,7 @@ fn produce_from_matched( // Find indices which didn't match any right row (are false) let indices = if unmatched { UInt64Array::from_iter_values( - (1..visited_left_side.len()) + (0..visited_left_side.len()) .into_iter() .filter(|v| !visited_left_side.get_bit(*v)) .map(|v| v as u64), @@ -930,7 +930,7 @@ fn produce_from_matched( } else { // produce those that did match UInt64Array::from_iter_values( - (1..visited_left_side.len()) + (0..visited_left_side.len()) .into_iter() .filter(|v| visited_left_side.get_bit(*v)) .map(|v| v as u64), From fb19ea1e77752b56f6e753f0c4616a3ab6b0f5df Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 13 Nov 2021 16:24:30 +0200 Subject: [PATCH 4/5] Finish implementation --- datafusion/src/physical_plan/hash_join.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index e1865eb1efaa8..888bf5a2a7a9f 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -919,23 +919,10 @@ fn produce_from_matched( left_data: &JoinLeftData, unmatched: bool, ) -> ArrowResult { - // Find indices which didn't match any right row (are false) - let indices = if unmatched { - UInt64Array::from_iter_values( - (0..visited_left_side.len()) - .into_iter() - .filter(|v| !visited_left_side.get_bit(*v)) - .map(|v| v as u64), - ) - } else { - // produce those that did match - UInt64Array::from_iter_values( - (0..visited_left_side.len()) - .into_iter() - .filter(|v| visited_left_side.get_bit(*v)) - .map(|v| v as u64), - ) - }; + let indices = + UInt64Array::from_iter_values((0..visited_left_side.len()).filter_map(|v| { + (unmatched ^ visited_left_side.get_bit(v)).then(|| v as u64) + })); // generate batches by taking values from the left side and generating columns filled with null on the right side let num_rows = indices.len(); From a4324056d83002a653bb40ad8bb639d2489806ec Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 18 Dec 2021 14:55:16 +0200 Subject: [PATCH 5/5] Update hash_join.rs --- datafusion/src/physical_plan/hash_join.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 888bf5a2a7a9f..8cb2f44db2817 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -919,10 +919,17 @@ fn produce_from_matched( left_data: &JoinLeftData, unmatched: bool, ) -> ArrowResult { - let indices = - UInt64Array::from_iter_values((0..visited_left_side.len()).filter_map(|v| { - (unmatched ^ visited_left_side.get_bit(v)).then(|| v as u64) - })); + let indices = if unmatched { + UInt64Array::from_iter_values( + (0..visited_left_side.len()) + .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)), + ) + } else { + UInt64Array::from_iter_values( + (0..visited_left_side.len()) + .filter_map(|v| (visited_left_side.get_bit(v)).then(|| v as u64)), + ) + }; // generate batches by taking values from the left side and generating columns filled with null on the right side let num_rows = indices.len();