Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ unicode-segmentation = { version = "^1.7.1", optional = true }
regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
smallvec = { version = "1.6", features = ["union"] }
bitvec = "0.22.3"

[dev-dependencies]
rand = "0.8"
Expand Down
17 changes: 11 additions & 6 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::physical_plan::coalesce_batches::concat_batches;
use bitvec::prelude::*;
use log::debug;

// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
Expand Down Expand Up @@ -375,8 +376,8 @@ impl ExecutionPlan for HashJoinExec {
let column_indices = self.column_indices_from_schema()?;
let num_rows = left_data.1.num_rows();
let visited_left_side = match self.join_type {
JoinType::Left | JoinType::Full => vec![false; num_rows],
JoinType::Inner | JoinType::Right => vec![],
JoinType::Left | JoinType::Full => BitVec::repeat(false, num_rows),
JoinType::Inner | JoinType::Right => BitVec::new(),
};
Ok(Box::pin(HashJoinStream {
schema: self.schema.clone(),
Expand Down Expand Up @@ -480,7 +481,7 @@ struct HashJoinStream {
/// Random state used for hashing initialization
random_state: RandomState,
/// Tracks, for each left-side row, whether it has been visited
visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
visited_left_side: BitVec,
/// True when there is nothing left to process and, in the case of a left join, the left side has been processed as well
is_exhausted: bool,
}
Expand Down Expand Up @@ -1016,16 +1017,17 @@ pub fn create_hashes<'a>(

// Produces a batch for left-side rows that are not marked as being visited during the whole join
fn produce_unmatched(
visited_left_side: &[bool],
visited_left_side: &BitSlice,
schema: &SchemaRef,
column_indices: &[ColumnIndex],
left_data: &JoinLeftData,
) -> ArrowResult<RecordBatch> {
// Find indices which didn't match any right row (are false)
let unmatched_indices: Vec<u64> = visited_left_side
.iter()
.by_ref()
.enumerate()
.filter(|&(_, &value)| !value)
.filter(|(_, &value)| !value)
.map(|(index, _)| index as u64)
.collect();

Expand Down Expand Up @@ -1079,7 +1081,10 @@ impl Stream for HashJoinStream {
match self.join_type {
JoinType::Left | JoinType::Full => {
left_side.iter().flatten().for_each(|x| {
self.visited_left_side[x as usize] = true;
*self
.visited_left_side
.get_mut(x as usize)
.unwrap() = true;
});
}
JoinType::Inner | JoinType::Right => {}
Expand Down