From fd9f2e5ccc8151ccc2bc6414d8946798dcc5c763 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 10 Apr 2023 18:42:34 +0800 Subject: [PATCH 1/4] Remove unnecessary equality check for JoinHashMap --- .../core/src/physical_plan/joins/hash_join.rs | 18 ++++++++---------- .../physical_plan/joins/symmetric_hash_join.rs | 9 ++------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 46f2d92903f16..482ca4651c678 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -594,16 +594,17 @@ pub fn update_hash( let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // insert hashes to key of the hashmap - for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .0 - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + let row_start = offset; + let row_end = offset + hash_values.len(); + for (row, hash_value) in (row_start..row_end).zip(hash_values.iter()) { + // the hash value is the key, always true + let item = hash_map.0.get_mut(*hash_value, |_| true); if let Some((_, indices)) = item { - indices.push((row + offset) as u64); + indices.push(row as u64); } else { hash_map.0.insert( *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), + (*hash_value, smallvec![row as u64]), |(hash, _)| *hash, ); } @@ -760,10 +761,7 @@ pub fn build_equal_condition_join_indices( // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, indices)) = build_hashmap - .0 - .get(*hash_value, |(hash, _)| *hash_value == *hash) - { + if let Some((_, indices)) = build_hashmap.0.get(*hash_value, |_| true) { for &i in indices { // Check hash collisions let offset_build_index = i as usize - offset_value; diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index f5d06a8c0a48c..e0a24014982ff 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -677,15 +677,10 @@ fn prune_hash_values( } } for (hash_value, index_set) in hash_value_map.iter() { - if let Some((_, separation_chain)) = hashmap - .0 - .get_mut(*hash_value, |(hash, _)| hash_value == hash) - { + if let Some((_, separation_chain)) = hashmap.0.get_mut(*hash_value, |_| true) { separation_chain.retain(|n| !index_set.contains(n)); if separation_chain.is_empty() { - hashmap - .0 - .remove_entry(*hash_value, |(hash, _)| hash_value == hash); + hashmap.0.remove_entry(*hash_value, |_| true); } } } From 1ca95468c90424ec9352f24b6244118d7739b440 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 10 Apr 2023 19:11:36 +0800 Subject: [PATCH 2/4] Change back SmallVec to Vec for JoinHashMap --- datafusion/core/Cargo.toml | 1 - datafusion/core/src/physical_plan/joins/hash_join.rs | 12 +++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index ed3b23a9af5ac..e0a2cba5b7da0 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -88,7 +88,6 @@ percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" rand = "0.8" rayon = { version = "1.5", optional = true } -smallvec = { version = "1.6", features = ["union"] } sqlparser = { version = "0.32", features = ["visitor"] } tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 482ca4651c678..b9d0e196240d6 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -44,7 +44,6 @@ use arrow::{ }; use futures::{ready, Stream, StreamExt, TryStreamExt}; use hashbrown::raw::RawTable; -use smallvec::{smallvec, SmallVec}; use std::fmt; use std::sync::Arc; use std::task::Poll; @@ -100,7 +99,7 @@ use super::{ // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collision check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); +pub struct JoinHashMap(pub RawTable<(u64, Vec)>); impl fmt::Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { @@ -543,7 +542,7 @@ async fn collect_left_input( ) })? / 7) .next_power_of_two(); - // 32 bytes per `(u64, SmallVec<[u64; 1]>)` + // 32 bytes per `(u64, Vec<[u64; 1]>)` // + 1 byte for each bucket // + 16 bytes fixed let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16; @@ -604,7 +603,7 @@ pub fn update_hash( } else { hash_map.0.insert( *hash_value, - (*hash_value, smallvec![row as u64]), + (*hash_value, vec![row as u64]), |(hash, _)| *hash, ); } @@ -1289,7 +1288,6 @@ mod tests { use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; - use smallvec::smallvec; use datafusion_common::ScalarValue; use datafusion_expr::Operator; @@ -2647,8 +2645,8 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions (same hashes) - hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[0], (hashes[0], vec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], vec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]), From 6e9d776a1cceffc9069de5996bd27686c5a434de Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 10 Apr 2023 19:50:30 +0800 Subject: [PATCH 3/4] Fix Cargo check --- datafusion-cli/Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 0f26e5720c043..fde1a67ade035 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -726,7 +726,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand", - "smallvec", "sqlparser", "tempfile", "tokio", From 0ea3c31988c868825f302ba8fcd75e676f0048e2 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 11 Apr 2023 10:35:21 +0800 Subject: [PATCH 4/4] Add back equality check for JoinHashMap --- datafusion/core/src/physical_plan/joins/hash_join.rs | 9 +++++++-- .../core/src/physical_plan/joins/symmetric_hash_join.rs | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index b9d0e196240d6..0dbd0dac6ce85 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -597,7 +597,9 @@ pub fn update_hash( let row_end = offset + hash_values.len(); for (row, hash_value) in (row_start..row_end).zip(hash_values.iter()) { // the hash value is the key, always true - let item = hash_map.0.get_mut(*hash_value, |_| true); + let item = hash_map + .0 + .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, indices)) = item { indices.push(row as u64); } else { @@ -760,7 +762,10 @@ pub fn build_equal_condition_join_indices( // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, indices)) = build_hashmap.0.get(*hash_value, |_| true) { + if let Some((_, indices)) = build_hashmap + .0 + .get(*hash_value, |(hash, _)| *hash_value == *hash) + { for &i in indices { // Check hash collisions let offset_build_index = i as usize - offset_value; diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index e0a24014982ff..15050b28abb95 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -677,10 +677,15 @@ fn prune_hash_values( } } for (hash_value, index_set) in hash_value_map.iter() { - if let Some((_, separation_chain)) = hashmap.0.get_mut(*hash_value, |_| true) { + if let Some((_, separation_chain)) = hashmap + .0 + .get_mut(*hash_value, |(hash, _)| *hash_value == *hash) + { separation_chain.retain(|n| !index_set.contains(n)); if separation_chain.is_empty() { - hashmap.0.remove_entry(*hash_value, |_| true); + hashmap + .0 + .remove_entry(*hash_value, |(hash, _)| *hash_value == *hash); } } }