Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions rust/datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Defines the join plan for executing partitions in parallel and then joining the results
//! into a set of partitions.

use arrow::array::ArrayRef;
use arrow::{array::ArrayRef, compute};
use std::sync::Arc;
use std::{any::Any, collections::HashSet};

Expand Down Expand Up @@ -55,7 +55,7 @@ type Index = (usize, usize);
// as a left join may issue None indices, in which case
type JoinIndex = Option<(usize, usize)>;
// An index of row uniquely identifying a row in a batch
type RightIndex = Option<usize>;
type RightIndex = Option<u32>;

// Maps ["on" value] -> [list of indices with this key's value]
// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
Expand Down Expand Up @@ -275,56 +275,57 @@ fn build_batch_from_indices(
if left.is_empty() {
todo!("Create empty record batch");
}
// this is just for symmetry of the code below.
let right = vec![right.clone()];

let (primary_is_left, primary, secondary) = match join_type {
JoinType::Inner | JoinType::Left => (true, left, &right),
JoinType::Right => (false, &right, left),
let (primary_is_left, primary_schema, secondary_schema) = match join_type {
JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),
JoinType::Right => (false, right.schema(), left[0].schema()),
};

// build the columns of the new [RecordBatch]:
// 1. pick whether the column is from the left or right
// 2. based on the pick, `take` items from the different recordBatches
let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());

let right_indices = indices.iter().map(|(_, join_index)| join_index).collect();

for field in schema.fields() {
// pick the column (left or right) based on the field name.
// Note that we take `.data_ref()` to gather the [ArrayData] of each array.
let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
let (is_primary, column_index) = match primary_schema.index_of(field.name()) {
Ok(i) => Ok((true, i)),
Err(_) => {
match secondary[0].schema().index_of(field.name()) {
Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
match secondary_schema.index_of(field.name()) {
Ok(i) => Ok((false, i)),
_ => Err(DataFusionError::Internal(
format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
))
}
}
}.map_err(DataFusionError::into_arrow_external_error)?;

let capacity = arrays.iter().map(|array| array.len()).sum();
let mut mutable = MutableArrayData::new(arrays, true, capacity);

let is_left =
(is_primary && primary_is_left) || (!is_primary && !primary_is_left);
if is_left {

let array = if is_left {
// Note that we take `.data_ref()` to gather the [ArrayData] of each array.
let arrays = left
.iter()
.map(|batch| batch.column(column_index).data_ref().as_ref())
.collect::<Vec<_>>();

let mut mutable = MutableArrayData::new(arrays, true, indices.len());
// use the left indices
for (join_index, _) in indices {
match join_index {
Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
None => mutable.extend_nulls(1),
}
}
make_array(Arc::new(mutable.freeze()))
} else {
// use the right indices
for (_, join_index) in indices {
match join_index {
Some(row) => mutable.extend(0, *row, *row + 1),
None => mutable.extend_nulls(1),
}
}
let array = right.column(column_index);
compute::take(array.as_ref(), &right_indices, None)?
};
let array = make_array(Arc::new(mutable.freeze()));
columns.push(array);
}
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
Expand Down Expand Up @@ -400,7 +401,7 @@ fn build_batch(
) -> ArrowResult<RecordBatch> {
let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap();

build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices)
build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices)
}

/// returns a vector with (index from left, index from right).
Expand Down Expand Up @@ -456,7 +457,7 @@ fn build_join_indexes(
// for every item on the left and right with this key, add the respective pair
left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
// on an inner join, left and right indices are present
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
})
}
Ok(indexes)
Expand All @@ -477,7 +478,7 @@ fn build_join_indexes(
is_visited.insert(key.clone());

indices.iter().for_each(|x| {
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
})
};
}
Expand All @@ -502,12 +503,12 @@ fn build_join_indexes(
match left_indices {
Some(indices) => {
indices.iter().for_each(|x| {
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
});
}
None => {
// when no match, add the row with None for the left side
indexes.push((None, Some(row)));
indexes.push((None, Some(row as u32)));
}
}
}
Expand Down