diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 0f26e5720c043..fde1a67ade035 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -726,7 +726,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand", - "smallvec", "sqlparser", "tempfile", "tokio", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index ed3b23a9af5ac..e0a2cba5b7da0 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -88,7 +88,6 @@ percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" rand = "0.8" rayon = { version = "1.5", optional = true } -smallvec = { version = "1.6", features = ["union"] } sqlparser = { version = "0.32", features = ["visitor"] } tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 46f2d92903f16..0dbd0dac6ce85 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -44,7 +44,6 @@ use arrow::{ }; use futures::{ready, Stream, StreamExt, TryStreamExt}; use hashbrown::raw::RawTable; -use smallvec::{smallvec, SmallVec}; use std::fmt; use std::sync::Arc; use std::task::Poll; @@ -100,7 +99,7 @@ use super::{ // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collision check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); +pub struct JoinHashMap(pub RawTable<(u64, Vec)>); impl fmt::Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { @@ -543,7 +542,7 @@ async fn collect_left_input( ) })? / 7) .next_power_of_two(); - // 32 bytes per `(u64, SmallVec<[u64; 1]>)` + // 32 bytes per `(u64, Vec<[u64; 1]>)` // + 1 byte for each bucket // + 16 bytes fixed let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16; @@ -594,16 +593,19 @@ pub fn update_hash( let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // insert hashes to key of the hashmap - for (row, hash_value) in hash_values.iter().enumerate() { + let row_start = offset; + let row_end = offset + hash_values.len(); + for (row, hash_value) in (row_start..row_end).zip(hash_values.iter()) { + // the hash value is the key, always true let item = hash_map .0 .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, indices)) = item { - indices.push((row + offset) as u64); + indices.push(row as u64); } else { hash_map.0.insert( *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), + (*hash_value, vec![row as u64]), |(hash, _)| *hash, ); } @@ -1291,7 +1293,6 @@ mod tests { use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; - use smallvec::smallvec; use datafusion_common::ScalarValue; use datafusion_expr::Operator; @@ -2649,8 +2650,8 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions (same hashes) - hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[0], (hashes[0], vec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], vec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]), diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index f5d06a8c0a48c..15050b28abb95 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -679,13 +679,13 @@ fn prune_hash_values( for (hash_value, index_set) in hash_value_map.iter() { if let Some((_, separation_chain)) = hashmap .0 - .get_mut(*hash_value, |(hash, _)| hash_value == hash) + .get_mut(*hash_value, |(hash, _)| *hash_value == *hash) { separation_chain.retain(|n| !index_set.contains(n)); if separation_chain.is_empty() { hashmap .0 - .remove_entry(*hash_value, |(hash, _)| hash_value == hash); + .remove_entry(*hash_value, |(hash, _)| *hash_value == *hash); } } }