From 5e71a287ac94f3e7424cfe0b4dc4855ace3fe42f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 22 Feb 2021 20:34:35 +0100 Subject: [PATCH 01/11] WIP hash repartition --- .../datafusion/src/physical_plan/hash_join.rs | 2 +- .../src/physical_plan/repartition.rs | 42 +++++++++++++------ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index abec10d7fc6..3a7d3b01349 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -699,7 +699,7 @@ macro_rules! hash_array { } /// Creates hash values for every element in the row based on the values in the columns -fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result> { +pub fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result> { let rows = arrays[0].len(); let mut hashes = vec![0; rows]; diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index edabfde27c4..cf0c46cc775 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -29,7 +29,7 @@ use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use super::{RecordBatchStream, SendableRecordBatchStream}; +use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; use crossbeam::channel::{unbounded, Receiver, Sender}; @@ -120,8 +120,11 @@ impl ExecutionPlan for RepartitionExec { let (sender, receiver) = unbounded::>>(); channels.push((sender, receiver)); } + let random = ahash::RandomState::new(); + // launch one async task per *input* partition for i in 0..num_input_partitions { + let random_state = random.clone(); let input = self.input.clone(); let mut channels = channels.clone(); let partitioning = self.partitioning.clone(); @@ -129,13 +132,32 @@ impl ExecutionPlan for RepartitionExec { let mut stream = input.execute(i).await?; let mut counter = 0; while let Some(result) = stream.next().await { - match partitioning { + match &partitioning { Partitioning::RoundRobinBatch(_) => { let output_partition = counter % num_output_partitions; let tx = &mut channels[output_partition].0; - tx.send(Some(result)).map_err(|e| { - DataFusionError::Execution(e.to_string()) - })?; + tx.send(Some(result)) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + } + Partitioning::Hash(expr, _) => { + let batch = result?; + let arrays = expr + .iter() + .map(|b| Ok(b.evaluate(&batch)?.into_array(batch.num_rows()))) + .collect::>>()?; + // Hash arrays and compute buckets based on number of partitions + let hashes = create_hashes(&arrays, &random_state)?; + let partions: Vec = hashes.iter().map(|x| x % num_output_partitions as u64).collect(); + //let mut cols = vec![]; + for c in batch.columns() { + for i in 0..batch.num_rows() { + let partition = partions[i]; + //TODO split array on partition values + } + + } + + } other => { // this should be unreachable as long as the validation logic @@ -177,10 +199,7 @@ impl ExecutionPlan for RepartitionExec { impl RepartitionExec { /// Create a new RepartitionExec - pub fn try_new( - input: Arc, - partitioning: Partitioning, - ) -> Result { + pub fn try_new(input: Arc, partitioning: Partitioning) -> Result { match &partitioning { Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { input, @@ -209,10 +228,7 @@ struct 
RepartitionStream { impl Stream for RepartitionStream { type Item = ArrowResult; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.input.recv() { Ok(Some(batch)) => Poll::Ready(Some(batch)), // End of results from one input partition From 708cf97fd4cdcb03a2fe5296f2194da0afa3e55c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 22 Feb 2021 20:48:49 +0100 Subject: [PATCH 02/11] WIP --- .../src/physical_plan/repartition.rs | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index cf0c46cc775..39e815d4139 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -18,10 +18,10 @@ //! The repartition operator maps N input partitions to M output partitions based on a //! partitioning scheme. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, Partitioning}; @@ -145,19 +145,23 @@ impl ExecutionPlan for RepartitionExec { .iter() .map(|b| Ok(b.evaluate(&batch)?.into_array(batch.num_rows()))) .collect::>>()?; - // Hash arrays and compute buckets based on number of partitions + // Hash arrays and compute buckets based on number of partitions let hashes = create_hashes(&arrays, &random_state)?; - let partions: Vec = hashes.iter().map(|x| x % num_output_partitions as u64).collect(); - //let mut cols = vec![]; - for c in batch.columns() { - for i in 0..batch.num_rows() { - let partition = partions[i]; - //TODO split array on partition values + let partitions: Vec = hashes + .iter() + .map(|x| x % num_output_partitions as u64) + .collect(); + let mut indices = vec![vec![]; num_output_partitions]; + for (i, partition) in partitions.iter().enumerate() { + indices[*partition as usize].push(i) + } + for i in 0..num_output_partitions { + for c in batch.columns() { + for i in 0..batch.num_rows() { + //TODO take based on indices + } } - } - - } other => { // this should be unreachable as long as the validation logic From f310373940128215c198541c759b08e07887cf11 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 08:19:36 +0100 Subject: [PATCH 03/11] Make it compile --- .../src/physical_plan/repartition.rs | 63 +++++++++++++------ 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 39e815d4139..6b8e4f21543 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -25,9 +25,9 @@ use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, Partitioning}; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +use arrow::{array::Array, error::Result as ArrowResult}; +use arrow::{compute::take, datatypes::SchemaRef}; use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; @@ -136,31 +136,50 @@ impl ExecutionPlan for RepartitionExec { Partitioning::RoundRobinBatch(_) => { let output_partition = counter % num_output_partitions; let tx = &mut channels[output_partition].0; - 
tx.send(Some(result)) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; + tx.send(Some(result)).map_err(|e| { + DataFusionError::Execution(e.to_string()) + })?; } Partitioning::Hash(expr, _) => { let batch = result?; let arrays = expr .iter() - .map(|b| Ok(b.evaluate(&batch)?.into_array(batch.num_rows()))) + .map(|b| { + Ok(b.evaluate(&batch)? + .into_array(batch.num_rows())) + }) .collect::>>()?; // Hash arrays and compute buckets based on number of partitions let hashes = create_hashes(&arrays, &random_state)?; - let partitions: Vec = hashes - .iter() - .map(|x| x % num_output_partitions as u64) - .collect(); let mut indices = vec![vec![]; num_output_partitions]; - for (i, partition) in partitions.iter().enumerate() { - indices[*partition as usize].push(i) + for (index, hash) in hashes.iter().enumerate() { + indices + [(*hash % num_output_partitions as u64) as usize] + .push(index as u64) } - for i in 0..num_output_partitions { - for c in batch.columns() { - for i in 0..batch.num_rows() { - //TODO take based on indices - } - } + for num_output_partition in 0..num_output_partitions { + let col_indices = + indices[num_output_partition].clone().into(); + + let columns = batch + .columns() + .iter() + .map(|c| { + take(c.as_ref(), &col_indices, None).map_err( + |e| { + DataFusionError::Execution( + e.to_string(), + ) + }, + ) + }) + .collect::>>>()?; + let res_batch = + RecordBatch::try_new(batch.schema(), columns); + let tx = &mut channels[num_output_partition].0; + tx.send(Some(res_batch)).map_err(|e| { + DataFusionError::Execution(e.to_string()) + })?; } } other => { @@ -203,7 +222,10 @@ impl ExecutionPlan for RepartitionExec { impl RepartitionExec { /// Create a new RepartitionExec - pub fn try_new(input: Arc, partitioning: Partitioning) -> Result { + pub fn try_new( + input: Arc, + partitioning: Partitioning, + ) -> Result { match &partitioning { Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { input, @@ -232,7 +254,10 @@ struct RepartitionStream { impl Stream for RepartitionStream { type Item = ArrowResult; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { match self.input.recv() { Ok(Some(batch)) => Poll::Ready(Some(batch)), // End of results from one input partition From a192ed09b9506bed03cd1ce94490511753223471 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 08:36:05 +0100 Subject: [PATCH 04/11] Add test --- .../datafusion/src/physical_plan/hash_join.rs | 5 ++- .../src/physical_plan/repartition.rs | 45 ++++++++++++++----- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 3a7d3b01349..25630a9ec8e 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -699,7 +699,10 @@ macro_rules! 
hash_array { } /// Creates hash values for every element in the row based on the values in the columns -pub fn create_hashes(arrays: &[ArrayRef], random_state: &RandomState) -> Result> { +pub fn create_hashes( + arrays: &[ArrayRef], + random_state: &RandomState, +) -> Result> { let rows = arrays[0].len(); let mut hashes = vec![0; rows]; diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 6b8e4f21543..1b9af47435b 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -160,7 +160,7 @@ impl ExecutionPlan for RepartitionExec { for num_output_partition in 0..num_output_partitions { let col_indices = indices[num_output_partition].clone().into(); - + // Produce batched based on column indices let columns = batch .columns() .iter() @@ -226,17 +226,11 @@ impl RepartitionExec { input: Arc, partitioning: Partitioning, ) -> Result { - match &partitioning { - Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { - input, - partitioning, - channels: Arc::new(Mutex::new(vec![])), - }), - other => Err(DataFusionError::NotImplemented(format!( - "Partitioning scheme not supported yet: {:?}", - other - ))), - } + Ok(RepartitionExec { + input, + partitioning, + channels: Arc::new(Mutex::new(vec![])), + }) } } @@ -350,6 +344,33 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn many_to_many_hash_partition() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(&schema, 50); + let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; + + // repartition from 3 input to 5 output + let output_partitions = repartition( + &schema, + partitions, + Partitioning::Hash( + vec![Arc::new(crate::physical_plan::expressions::Column::new( + &"c0", + ))], + 8, + ), + ) + .await?; + + let total_rows: usize = output_partitions.iter().map(|x| x.len()).sum(); + + assert_eq!(total_rows, 8 * 50 * 3); + + Ok(()) + } + fn test_schema() -> Arc { Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) } From 56e47fbf7e49d02799eb837cb80f02dae475f8f5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 08:39:19 +0100 Subject: [PATCH 05/11] Add assert to test --- rust/datafusion/src/physical_plan/repartition.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 1b9af47435b..2b4ae469de4 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -366,6 +366,7 @@ mod tests { let total_rows: usize = output_partitions.iter().map(|x| x.len()).sum(); + assert_eq!(8, output_partitions.len()); assert_eq!(total_rows, 8 * 50 * 3); Ok(()) From bbba43f545881b261907b526ee58c1bb4868d999 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 08:46:54 +0100 Subject: [PATCH 06/11] Remove comment --- rust/datafusion/src/physical_plan/repartition.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 2b4ae469de4..875ed9883e7 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -351,7 +351,6 @@ mod tests { let partition = create_vec_batches(&schema, 50); let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; - // 
repartition from 3 input to 5 output let output_partitions = repartition( &schema, partitions, From c120b50f6d7ceb8a8558b42917d027119accb55f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 17:47:08 +0100 Subject: [PATCH 07/11] Small cleanup --- rust/datafusion/src/physical_plan/repartition.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 875ed9883e7..3e76e1333bf 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -140,12 +140,13 @@ impl ExecutionPlan for RepartitionExec { DataFusionError::Execution(e.to_string()) })?; } - Partitioning::Hash(expr, _) => { + Partitioning::Hash(exprs, _) => { let batch = result?; - let arrays = expr + let arrays = exprs .iter() - .map(|b| { - Ok(b.evaluate(&batch)? + .map(|expr| { + Ok(expr + .evaluate(&batch)? .into_array(batch.num_rows())) }) .collect::>>()?; @@ -158,14 +159,14 @@ impl ExecutionPlan for RepartitionExec { .push(index as u64) } for num_output_partition in 0..num_output_partitions { - let col_indices = + let indices = indices[num_output_partition].clone().into(); - // Produce batched based on column indices + // Produce batches based on indices let columns = batch .columns() .iter() .map(|c| { - take(c.as_ref(), &col_indices, None).map_err( + take(c.as_ref(), &indices, None).map_err( |e| { DataFusionError::Execution( e.to_string(), From ccf2bbd4981c8bae5e3e80596c464e06ec5d640d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 17:58:43 +0100 Subject: [PATCH 08/11] Small cleanup --- rust/datafusion/src/physical_plan/repartition.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 3e76e1333bf..54e87abb575 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -141,13 +141,13 @@ impl ExecutionPlan for RepartitionExec { })?; } Partitioning::Hash(exprs, _) => { - let batch = result?; + let input_batch = result?; let arrays = exprs .iter() .map(|expr| { Ok(expr - .evaluate(&batch)? - .into_array(batch.num_rows())) + .evaluate(&input_batch)? 
+ .into_array(input_batch.num_rows())) }) .collect::>>()?; // Hash arrays and compute buckets based on number of partitions @@ -175,10 +175,12 @@ impl ExecutionPlan for RepartitionExec { ) }) .collect::>>>()?; - let res_batch = - RecordBatch::try_new(batch.schema(), columns); + let output_batch = RecordBatch::try_new( + input_batch.schema(), + columns, + ); let tx = &mut channels[num_output_partition].0; - tx.send(Some(res_batch)).map_err(|e| { + tx.send(Some(input_batch)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; } From eea275494e4d54bf957ebe10a61bf3ab58f59305 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 17:58:59 +0100 Subject: [PATCH 09/11] Small cleanup --- rust/datafusion/src/physical_plan/repartition.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 54e87abb575..2bb6614f219 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -180,7 +180,7 @@ impl ExecutionPlan for RepartitionExec { columns, ); let tx = &mut channels[num_output_partition].0; - tx.send(Some(input_batch)).map_err(|e| { + tx.send(Some(output_batch)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; } From 9917b0a93531829c59429c04807a3b292b91d95d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 23 Feb 2021 17:59:22 +0100 Subject: [PATCH 10/11] Small cleanup --- rust/datafusion/src/physical_plan/repartition.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 2bb6614f219..1326af8ed78 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -162,7 +162,7 @@ impl ExecutionPlan for RepartitionExec { let indices = indices[num_output_partition].clone().into(); // Produce batches based on indices - let columns = batch + let columns = input_batch .columns() .iter() .map(|c| { From 82b75c2fb63ecb1a33dac5e61b1f0829cacc378f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 24 Feb 2021 07:23:20 +0100 Subject: [PATCH 11/11] Remove clone --- rust/datafusion/src/physical_plan/repartition.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 1326af8ed78..63854988fa5 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -158,9 +158,10 @@ impl ExecutionPlan for RepartitionExec { [(*hash % num_output_partitions as u64) as usize] .push(index as u64) } - for num_output_partition in 0..num_output_partitions { - let indices = - indices[num_output_partition].clone().into(); + for (num_output_partition, partition_indices) in + indices.into_iter().enumerate() + { + let indices = partition_indices.into(); // Produce batches based on indices let columns = input_batch .columns()
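
For reference, the scheme the patches above converge on inside RepartitionExec::execute is: evaluate the Partitioning::Hash expressions against each input batch, hash the resulting arrays with create_hashes and an ahash::RandomState, bucket every row index by hash modulo the number of output partitions, gather each bucket into its own RecordBatch with arrow's take kernel, and send that batch down the matching output channel. Below is a minimal, self-contained sketch of that row-bucketing step only. It is illustrative, not the DataFusion code: plain Vec<u64> columns stand in for Arrow arrays, std's RandomState stands in for create_hashes with ahash, and take_rows is a hypothetical helper playing the role of arrow::compute::take.

// Sketch of the hash-repartition row bucketing (simplified stand-in types).
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

/// Assign each row to an output partition: hash the key column values for
/// that row, then reduce the hash modulo the number of output partitions.
fn hash_partition_indices(
    key_columns: &[Vec<u64>],
    num_output_partitions: usize,
    random_state: &RandomState,
) -> Vec<Vec<usize>> {
    let num_rows = key_columns[0].len();
    // One list of row indices per output partition, mirroring the
    // `vec![vec![]; num_output_partitions]` buffers in the patches.
    let mut indices = vec![vec![]; num_output_partitions];
    for row in 0..num_rows {
        let mut hasher = random_state.build_hasher();
        for column in key_columns {
            column[row].hash(&mut hasher);
        }
        indices[(hasher.finish() % num_output_partitions as u64) as usize].push(row);
    }
    indices
}

/// Gather the listed rows out of one column; this plays the role of the
/// per-column arrow::compute::take call in the real operator.
fn take_rows(column: &[u64], indices: &[usize]) -> Vec<u64> {
    indices.iter().map(|&i| column[i]).collect()
}

fn main() {
    let batch: Vec<Vec<u64>> = vec![vec![1, 2, 3, 4, 5, 6, 7, 8]]; // one key column
    let random_state = RandomState::new();
    let partition_indices = hash_partition_indices(&batch, 3, &random_state);

    // Build one output "batch" per partition, as the operator does with
    // RecordBatch::try_new before sending it to that partition's channel.
    for (partition, rows) in partition_indices.iter().enumerate() {
        let columns: Vec<Vec<u64>> =
            batch.iter().map(|column| take_rows(column, rows)).collect();
        println!("partition {}: {:?}", partition, columns);
    }
}

Bucketing indices first and gathering once per column per output partition (rather than copying row by row) is why the intermediate index buffers exist in the patches; it is also what the final "Remove clone" commit streamlines, by consuming the index vectors with into_iter instead of cloning them.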