From 00b4a845dc80aee59c1de36c9450f0b8f9ee2f36 Mon Sep 17 00:00:00 2001 From: Boaz Berman Date: Sat, 15 May 2021 17:41:56 +0300 Subject: [PATCH] Left join could use bitmap for left join instead of Vec<bool> Closes https://github.com/apache/arrow-datafusion/issues/240 --- datafusion/Cargo.toml | 1 + datafusion/src/physical_plan/hash_join.rs | 17 +++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 4d98fdb1b2075..8bf6c057e4e41 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -65,6 +65,7 @@ unicode-segmentation = { version = "^1.7.1", optional = true } regex = { version = "^1.4.3", optional = true } lazy_static = { version = "^1.4.0", optional = true } smallvec = { version = "1.6", features = ["union"] } +bitvec = "0.22.3" [dev-dependencies] rand = "0.8" diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 0bf5a2857fdee..9f2a67a3b502a 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -63,6 +63,7 @@ use super::{ SendableRecordBatchStream, }; use crate::physical_plan::coalesce_batches::concat_batches; +use bitvec::prelude::*; use log::debug; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. 
@@ -375,8 +376,8 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { - JoinType::Left | JoinType::Full => vec![false; num_rows], - JoinType::Inner | JoinType::Right => vec![], + JoinType::Left | JoinType::Full => BitVec::repeat(false, num_rows), + JoinType::Inner | JoinType::Right => BitVec::new(), }; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), @@ -480,7 +481,7 @@ struct HashJoinStream { /// Random state used for hashing initialization random_state: RandomState, /// Keeps track of the left side rows whether they are visited - visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240 + visited_left_side: BitVec, /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, } @@ -1016,7 +1017,7 @@ pub fn create_hashes<'a>( // Produces a batch for left-side rows that are not marked as being visited during the whole join fn produce_unmatched( - visited_left_side: &[bool], + visited_left_side: &BitSlice, schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, @@ -1024,8 +1025,9 @@ fn produce_unmatched( // Find indices which didn't match any right row (are false) let unmatched_indices: Vec<u64> = visited_left_side .iter() + .by_ref() .enumerate() - .filter(|&(_, &value)| !value) + .filter(|(_, &value)| !value) .map(|(index, _)| index as u64) .collect(); @@ -1079,7 +1081,10 @@ impl Stream for HashJoinStream { match self.join_type { JoinType::Left | JoinType::Full => { left_side.iter().flatten().for_each(|x| { - self.visited_left_side[x as usize] = true; + *self + .visited_left_side + .get_mut(x as usize) + .unwrap() = true; }); } JoinType::Inner | JoinType::Right => {}