diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index f2ce88fddad4b..ea037a92ea393 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -66,6 +66,7 @@ use super::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; +use crate::arrow::array::BooleanBufferBuilder; use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::PhysicalExpr; use log::debug; @@ -425,9 +426,13 @@ impl ExecutionPlan for HashJoinExec { let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => { - vec![false; num_rows] + let mut buffer = BooleanBufferBuilder::new(num_rows); + + buffer.append_n(num_rows, false); + + buffer } - JoinType::Inner | JoinType::Right => vec![], + JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0), }; Ok(Box::pin(HashJoinStream::new( self.schema.clone(), @@ -527,7 +532,7 @@ struct HashJoinStream { /// Random state used for hashing initialization random_state: RandomState, /// Keeps track of the left side rows whether they are visited - visited_left_side: Vec, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240 + visited_left_side: BooleanBufferBuilder, /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, /// Metrics @@ -545,7 +550,7 @@ impl HashJoinStream { right: SendableRecordBatchStream, column_indices: Vec, random_state: RandomState, - visited_left_side: Vec, + visited_left_side: BooleanBufferBuilder, join_metrics: HashJoinMetrics, ) -> Self { HashJoinStream { @@ -831,7 +836,7 @@ fn equal_rows( // Produces a batch for left-side rows that have/have not been matched during the whole join fn produce_from_matched( - visited_left_side: &[bool], + visited_left_side: &BooleanBufferBuilder, schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, @@ -840,20 +845,18 @@ fn produce_from_matched( // Find indices which didn't match any right row (are false) let indices = if unmatched { UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| !value) - .map(|(index, _)| index as u64), + (0..visited_left_side.len()) + .into_iter() + .filter(|v| !visited_left_side.get_bit(*v)) + .map(|v| v as u64), ) } else { // produce those that did match UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64), + (0..visited_left_side.len()) + .into_iter() + .filter(|v| visited_left_side.get_bit(*v)) + .map(|v| v as u64), ) }; @@ -909,7 +912,7 @@ impl Stream for HashJoinStream { | JoinType::Semi | JoinType::Anti => { left_side.iter().flatten().for_each(|x| { - self.visited_left_side[x as usize] = true; + self.visited_left_side.set_bit(x as usize, true); }); } JoinType::Inner | JoinType::Right => {}