diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 727d1c68ebccd..8cb2f44db2817 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -68,6 +68,7 @@ use super::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; +use crate::arrow::array::BooleanBufferBuilder; use crate::arrow::datatypes::TimeUnit; use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::PhysicalExpr; @@ -401,9 +402,13 @@ impl ExecutionPlan for HashJoinExec { let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => { - vec![false; num_rows] + let mut buffer = BooleanBufferBuilder::new(num_rows); + + buffer.append_n(num_rows, false); + + buffer } - JoinType::Inner | JoinType::Right => vec![], + JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0), }; Ok(Box::pin(HashJoinStream::new( self.schema.clone(), @@ -502,8 +507,7 @@ struct HashJoinStream { /// Random state used for hashing initialization random_state: RandomState, /// Keeps track of the left side rows whether they are visited - visited_left_side: Vec, - // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240 + visited_left_side: BooleanBufferBuilder, /// There is nothing to process anymore and left side is processed in case of left join is_exhausted: bool, /// Metrics @@ -525,7 +529,7 @@ impl HashJoinStream { right: SendableRecordBatchStream, column_indices: Vec, random_state: RandomState, - visited_left_side: Vec, + visited_left_side: BooleanBufferBuilder, join_metrics: HashJoinMetrics, null_equals_null: bool, ) -> Self { @@ -909,29 +913,21 @@ fn equal_rows( // Produces a batch for left-side rows that have/have not been matched during the whole join fn produce_from_matched( - visited_left_side: &[bool], + visited_left_side: &BooleanBufferBuilder, schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, unmatched: bool, ) -> ArrowResult { - // Find indices which didn't match any right row (are false) let indices = if unmatched { UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| !value) - .map(|(index, _)| index as u64), + (0..visited_left_side.len()) + .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)), ) } else { - // produce those that did match UInt64Array::from_iter_values( - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64), + (0..visited_left_side.len()) + .filter_map(|v| (visited_left_side.get_bit(v)).then(|| v as u64)), ) }; @@ -991,7 +987,7 @@ impl Stream for HashJoinStream { | JoinType::Semi | JoinType::Anti => { left_side.iter().flatten().for_each(|x| { - self.visited_left_side[x as usize] = true; + self.visited_left_side.set_bit(x as usize, true); }); } JoinType::Inner | JoinType::Right => {}