From df826f1f6d5a4adff294d8f7f932ee5170e89205 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 27 Feb 2021 12:48:46 +0100 Subject: [PATCH 1/5] Optimize builder usage --- rust/Cargo.toml | 3 ++ .../arrow-flight/src/arrow.flight.protocol.rs | 5 ++- rust/arrow/src/datatypes/mod.rs | 1 + rust/benchmarks/Cargo.toml | 3 ++ .../datafusion/src/physical_plan/hash_join.rs | 39 ++++++++++++++----- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 16e34de7f14..3934f0bbabb 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -31,3 +31,6 @@ members = [ # how it is compiled within the workspace, causing the whole workspace to be compiled from scratch # this way, this is a stand-alone package that compiles independently of the others. exclude = ["arrow-pyarrow-integration-testing"] + +[profile.release] +debug = true diff --git a/rust/arrow-flight/src/arrow.flight.protocol.rs b/rust/arrow-flight/src/arrow.flight.protocol.rs index 2a87d7b2683..8de767cb8a4 100644 --- a/rust/arrow-flight/src/arrow.flight.protocol.rs +++ b/rust/arrow-flight/src/arrow.flight.protocol.rs @@ -499,8 +499,9 @@ pub mod flight_service_server { #[async_trait] pub trait FlightService: Send + Sync + 'static { #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: Stream> - + Send + type HandshakeStream: Stream< + Item = Result, + > + Send + Sync + 'static; #[doc = ""] diff --git a/rust/arrow/src/datatypes/mod.rs b/rust/arrow/src/datatypes/mod.rs index 175b50b0177..ba149df4f09 100644 --- a/rust/arrow/src/datatypes/mod.rs +++ b/rust/arrow/src/datatypes/mod.rs @@ -40,6 +40,7 @@ pub use datatype::*; /// A reference-counted reference to a [`Schema`](crate::datatypes::Schema). pub type SchemaRef = Arc; + #[cfg(test)] mod tests { use super::*; diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml index 99655710017..34e158507d8 100644 --- a/rust/benchmarks/Cargo.toml +++ b/rust/benchmarks/Cargo.toml @@ -39,3 +39,6 @@ tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] } futures = "0.3" env_logger = "^0.8" snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] } + +[profile.release] +debug = true diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 25630a9ec8e..34db50897ff 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -23,11 +23,12 @@ use ahash::RandomState; use arrow::{ array::{ - ArrayRef, BooleanArray, LargeStringArray, TimestampMicrosecondArray, - TimestampNanosecondArray, UInt32Builder, UInt64Builder, + ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray, + TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder, + UInt32Builder, UInt64BufferBuilder, UInt64Builder, }, compute, - datatypes::TimeUnit, + datatypes::{TimeUnit, UInt32Type, UInt64Type}, }; use std::time::Instant; use std::{any::Any, collections::HashSet}; @@ -480,11 +481,12 @@ fn build_join_indexes( let hash_values = create_hashes(&keys_values, &random_state)?; let left = &left_data.0; - let mut left_indices = UInt64Builder::new(0); - let mut right_indices = UInt32Builder::new(0); - match join_type { JoinType::Inner => { + // Using a buffer builder to avoid slower normal builder + let mut left_indices = UInt64BufferBuilder::new(0); + let mut right_indices = UInt32BufferBuilder::new(0); + // Visit all of the right rows for (row, hash_value) in hash_values.iter().enumerate() { // Get the hash and find it in the build index @@ -496,15 +498,30 @@ fn build_join_indexes( for &i in indices { // Check hash collisions if equal_rows(i as usize, row, &left_join_values, &keys_values)? { - left_indices.append_value(i)?; - right_indices.append_value(row as u32)?; + left_indices.append(i); + right_indices.append(row as u32); } } } } - Ok((left_indices.finish(), right_indices.finish())) + let left = ArrayData::builder(DataType::UInt64) + .len(left_indices.len()) + .add_buffer(left_indices.finish()) + .build(); + let right = ArrayData::builder(DataType::UInt32) + .len(right_indices.len()) + .add_buffer(right_indices.finish()) + .build(); + + Ok(( + PrimitiveArray::::from(left), + PrimitiveArray::::from(right), + )) } JoinType::Left => { + let mut left_indices = UInt64Builder::new(0); + let mut right_indices = UInt32Builder::new(0); + // Keep track of which item is visited in the build input // TODO: this can be stored more efficiently with a marker // https://issues.apache.org/jira/browse/ARROW-11116 @@ -534,10 +551,12 @@ fn build_join_indexes( } } } - Ok((left_indices.finish(), right_indices.finish())) } JoinType::Right => { + let mut left_indices = UInt64Builder::new(0); + let mut right_indices = UInt32Builder::new(0); + for (row, hash_value) in hash_values.iter().enumerate() { match left.get(hash_value) { Some(indices) => { From a713f0e0e4059a8dd13001ac737f36efc92f421a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 27 Feb 2021 13:16:35 +0100 Subject: [PATCH 2/5] Cleanup --- rust/Cargo.toml | 3 --- rust/arrow-flight/src/arrow.flight.protocol.rs | 5 ++--- rust/arrow/src/datatypes/mod.rs | 1 - rust/benchmarks/Cargo.toml | 3 --- 4 files changed, 2 insertions(+), 10 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3934f0bbabb..16e34de7f14 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -31,6 +31,3 @@ members = [ # how it is compiled within the workspace, causing the whole workspace to be compiled from scratch # this way, this is a stand-alone package that compiles independently of the others. exclude = ["arrow-pyarrow-integration-testing"] - -[profile.release] -debug = true diff --git a/rust/arrow-flight/src/arrow.flight.protocol.rs b/rust/arrow-flight/src/arrow.flight.protocol.rs index 8de767cb8a4..2a87d7b2683 100644 --- a/rust/arrow-flight/src/arrow.flight.protocol.rs +++ b/rust/arrow-flight/src/arrow.flight.protocol.rs @@ -499,9 +499,8 @@ pub mod flight_service_server { #[async_trait] pub trait FlightService: Send + Sync + 'static { #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: Stream< - Item = Result, - > + Send + type HandshakeStream: Stream> + + Send + Sync + 'static; #[doc = ""] diff --git a/rust/arrow/src/datatypes/mod.rs b/rust/arrow/src/datatypes/mod.rs index ba149df4f09..175b50b0177 100644 --- a/rust/arrow/src/datatypes/mod.rs +++ b/rust/arrow/src/datatypes/mod.rs @@ -40,7 +40,6 @@ pub use datatype::*; /// A reference-counted reference to a [`Schema`](crate::datatypes::Schema). pub type SchemaRef = Arc; - #[cfg(test)] mod tests { use super::*; diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml index 34e158507d8..99655710017 100644 --- a/rust/benchmarks/Cargo.toml +++ b/rust/benchmarks/Cargo.toml @@ -39,6 +39,3 @@ tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] } futures = "0.3" env_logger = "^0.8" snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] } - -[profile.release] -debug = true From 5e0f91c7781bef44d1be16ce8553332d059641fe Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 28 Feb 2021 14:22:30 +0100 Subject: [PATCH 3/5] Save allocations in calculating hashes --- .../datafusion/src/physical_plan/hash_join.rs | 33 +++++++++++-------- .../src/physical_plan/repartition.rs | 4 ++- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 34db50897ff..fa2de996f53 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -238,19 +238,26 @@ impl ExecutionPlan for HashJoinExec { // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. - let initial = - (JoinHashMap::with_hasher(IdHashBuilder {}), Vec::new(), 0); - let (hashmap, batches, num_rows) = stream + let initial = ( + JoinHashMap::with_hasher(IdHashBuilder {}), + Vec::new(), + 0, + Vec::new(), + ); + let (hashmap, batches, num_rows, _) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; let offset = acc.2; + acc.3.clear(); + acc.3.resize(batch.num_rows(), 0); update_hash( &on_left, &batch, hash, offset, &self.random_state, + &mut acc.3, ) .unwrap(); acc.2 += batch.num_rows(); @@ -312,6 +319,7 @@ fn update_hash( hash: &mut JoinHashMap, offset: usize, random_state: &RandomState, + hashes_buffer: &mut Vec, ) -> Result<()> { // evaluate the keys let keys_values = on @@ -320,7 +328,7 @@ fn update_hash( .collect::>>()?; // update the hash map - let hash_values = create_hashes(&keys_values, &random_state)?; + let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?; // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { @@ -477,8 +485,8 @@ fn build_join_indexes( .into_array(left_data.1.num_rows())) }) .collect::>>()?; - - let hash_values = create_hashes(&keys_values, &random_state)?; + let hash_buff = &mut vec![0; keys_values[0].len()]; + let hash_values = create_hashes(&keys_values, &random_state, hash_buff)?; let left = &left_data.0; match join_type { @@ -718,13 +726,11 @@ macro_rules! hash_array { } /// Creates hash values for every element in the row based on the values in the columns -pub fn create_hashes( +pub fn create_hashes<'a>( arrays: &[ArrayRef], random_state: &RandomState, -) -> Result> { - let rows = arrays[0].len(); - let mut hashes = vec![0; rows]; - + hashes: &'a mut Vec, +) -> Result<&'a mut Vec> { for col in arrays { match col.data_type() { DataType::UInt8 => { @@ -1155,8 +1161,9 @@ mod tests { ); let random_state = RandomState::new(); - - let hashes = create_hashes(&[left.columns()[0].clone()], &random_state)?; + let hashes_buff = &mut vec![0; left.num_rows()]; + let hashes = + create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions hashmap_left.insert(hashes[0], vec![0, 1]); diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 94c3aab64e1..16426f246f9 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -151,7 +151,9 @@ impl ExecutionPlan for RepartitionExec { }) .collect::>>()?; // Hash arrays and compute buckets based on number of partitions - let hashes = create_hashes(&arrays, &random_state)?; + let hashes_buf = &mut vec![0; arrays[0].len()]; + let hashes = + create_hashes(&arrays, &random_state, hashes_buf)?; let mut indices = vec![vec![]; num_output_partitions]; for (index, hash) in hashes.iter().enumerate() { indices From 28690403c8ec34f469c02fcac9addabd7bb393ce Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 3 Mar 2021 19:39:53 +0100 Subject: [PATCH 4/5] Rename to hashes_buffer --- .../datafusion/src/physical_plan/hash_join.rs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index fa2de996f53..56983a501b5 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -485,8 +485,8 @@ fn build_join_indexes( .into_array(left_data.1.num_rows())) }) .collect::>>()?; - let hash_buff = &mut vec![0; keys_values[0].len()]; - let hash_values = create_hashes(&keys_values, &random_state, hash_buff)?; + let hashes_buffer = &mut vec![0; keys_values[0].len()]; + let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?; let left = &left_data.0; match join_type { @@ -729,45 +729,45 @@ macro_rules! hash_array { pub fn create_hashes<'a>( arrays: &[ArrayRef], random_state: &RandomState, - hashes: &'a mut Vec, + hashes_buffer: &'a mut Vec, ) -> Result<&'a mut Vec> { for col in arrays { match col.data_type() { DataType::UInt8 => { - hash_array!(UInt8Array, col, u8, hashes, random_state); + hash_array!(UInt8Array, col, u8, hashes_buffer, random_state); } DataType::UInt16 => { - hash_array!(UInt16Array, col, u16, hashes, random_state); + hash_array!(UInt16Array, col, u16, hashes_buffer, random_state); } DataType::UInt32 => { - hash_array!(UInt32Array, col, u32, hashes, random_state); + hash_array!(UInt32Array, col, u32, hashes_buffer, random_state); } DataType::UInt64 => { - hash_array!(UInt64Array, col, u64, hashes, random_state); + hash_array!(UInt64Array, col, u64, hashes_buffer, random_state); } DataType::Int8 => { - hash_array!(Int8Array, col, i8, hashes, random_state); + hash_array!(Int8Array, col, i8, hashes_buffer, random_state); } DataType::Int16 => { - hash_array!(Int16Array, col, i16, hashes, random_state); + hash_array!(Int16Array, col, i16, hashes_buffer, random_state); } DataType::Int32 => { - hash_array!(Int32Array, col, i32, hashes, random_state); + hash_array!(Int32Array, col, i32, hashes_buffer, random_state); } DataType::Int64 => { - hash_array!(Int64Array, col, i64, hashes, random_state); + hash_array!(Int64Array, col, i64, hashes_buffer, random_state); } DataType::Timestamp(TimeUnit::Microsecond, None) => { - hash_array!(TimestampMicrosecondArray, col, i64, hashes, random_state); + hash_array!(TimestampMicrosecondArray, col, i64, hashes_buffer, random_state); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { - hash_array!(TimestampNanosecondArray, col, i64, hashes, random_state); + hash_array!(TimestampNanosecondArray, col, i64, hashes_buffer, random_state); } DataType::Boolean => { - hash_array!(BooleanArray, col, u8, hashes, random_state); + hash_array!(BooleanArray, col, u8, hashes_buffer, random_state); } DataType::Utf8 => { - hash_array!(StringArray, col, str, hashes, random_state); + hash_array!(StringArray, col, str, hashes_buffer, random_state); } _ => { // This is internal because we should have caught this before. @@ -777,7 +777,7 @@ pub fn create_hashes<'a>( } } } - Ok(hashes) + Ok(hashes_buffer) } impl Stream for HashJoinStream { From be37882f5409a3d86402707ff03798da2ee42e5f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 4 Mar 2021 08:57:31 +0100 Subject: [PATCH 5/5] Fmt --- rust/datafusion/src/physical_plan/hash_join.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 56983a501b5..7ca769a5303 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -758,10 +758,22 @@ pub fn create_hashes<'a>( hash_array!(Int64Array, col, i64, hashes_buffer, random_state); } DataType::Timestamp(TimeUnit::Microsecond, None) => { - hash_array!(TimestampMicrosecondArray, col, i64, hashes_buffer, random_state); + hash_array!( + TimestampMicrosecondArray, + col, + i64, + hashes_buffer, + random_state + ); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { - hash_array!(TimestampNanosecondArray, col, i64, hashes_buffer, random_state); + hash_array!( + TimestampNanosecondArray, + col, + i64, + hashes_buffer, + random_state + ); } DataType::Boolean => { hash_array!(BooleanArray, col, u8, hashes_buffer, random_state);