From 84b08243c7ae215765fd0c5ba69ef10771969dc3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Dec 2020 11:35:05 -0700 Subject: [PATCH 01/21] rough out new repartition physical operator --- rust/datafusion/src/physical_plan/mod.rs | 1 + .../src/physical_plan/repartition.rs | 170 ++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 rust/datafusion/src/physical_plan/repartition.rs diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 1a8d1ccf450..1f7fac24ab6 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -260,6 +260,7 @@ pub mod merge; pub mod parquet; pub mod planner; pub mod projection; +pub mod repartition; pub mod sort; pub mod string_expressions; pub mod type_coercion; diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs new file mode 100644 index 00000000000..5ef07df0c3d --- /dev/null +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +///! The repartition operator maps N input partitions to M output partitions based on a +///! partitioning scheme. +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use super::{RecordBatchStream, SendableRecordBatchStream}; +use async_trait::async_trait; + +use futures::stream::Stream; + +/// Partitioning schemes +#[derive(Debug, Clone)] +pub enum PartitioningScheme { + /// Allocate batches using a round-robin algorithm + RoundRobinBatch, + /// Allocate rows using a round-robin algorithm. This provides finer-grained partitioning + /// than `RoundRobinBatch` but also has much more overhead. + RoundRobinRow, + /// Allocate rows based on a hash of one of more expressions + Hash(Vec>), +} + +/// partition. No guarantees are made about the order of the resulting partition. 
+#[derive(Debug)] +pub struct RepartitionExec { + /// Input execution plan + input: Arc, + /// Partitioning scheme to use + partitioning_scheme: PartitioningScheme, + /// Number of output partitions + num_partitions: usize, +} + +#[async_trait] +impl ExecutionPlan for RepartitionExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 1 => Ok(Arc::new(RepartitionExec::try_new( + children[0].clone(), + self.partitioning_scheme.clone(), + self.num_partitions, + )?)), + _ => Err(DataFusionError::Internal( + "RepartitionExec wrong number of children".to_string(), + )), + } + } + + fn output_partitioning(&self) -> Partitioning { + //TODO needs more work + Partitioning::UnknownPartitioning(self.num_partitions) + } + + async fn execute(&self, partition: usize) -> Result { + // lock mutex + // if first call to this method { + // create one channel per *output* partition + // launch one async task per *input* partition + // } + + // now return stream for the specified *output* partition which will + // read from the channel + + Ok(Box::pin(RepartitionStream { + schema: self.input.schema(), + //input: the *output* channel to read from + })) + } +} + +impl RepartitionExec { + /// Create a new MergeExec + pub fn try_new( + input: Arc, + partioning_scheme: PartitioningScheme, + num_partitions: usize, + ) -> Result { + match &partioning_scheme { + PartitioningScheme::RoundRobinBatch => Ok(RepartitionExec { + input, + partitioning_scheme: partioning_scheme, + num_partitions, + }), + other => Err(DataFusionError::NotImplemented(format!( + "Partitioning scheme not supported yet: {:?}", + other + ))), + } + } + + async fn process_input_partition(&self, partition: usize) -> Result<()> { + let input = self.input.execute(partition).await?; + // for each input batch { + // compute output partition based on partitioning schema + // send batch to the appropriate output channel, or split batch into + // multiple batches if using row-based partitioning + // } + // } + Ok(()) + } +} + +struct RepartitionStream { + schema: SchemaRef, + //input: Channel to read +} + +impl Stream for RepartitionStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + fn size_hint(&self) -> (usize, Option) { + unimplemented!() + } +} + +impl RecordBatchStream for RepartitionStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} From 3c35af9126144af11c6e20f90be5a606aeaf3a21 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Dec 2020 12:06:47 -0700 Subject: [PATCH 02/21] combine partitioning enums --- rust/datafusion/src/physical_plan/mod.rs | 10 ++++++ .../src/physical_plan/repartition.rs | 36 +++++-------------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 1f7fac24ab6..c0a466e7bd9 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -107,6 +107,13 @@ pub async fn collect(plan: Arc) -> Result> { /// Partitioning schemes supported by operators. 
#[derive(Debug, Clone)] pub enum Partitioning { + /// Allocate batches using a round-robin algorithm + RoundRobinBatch(usize), + /// Allocate rows using a round-robin algorithm. This provides finer-grained partitioning + /// than `RoundRobinBatch` but also has much more overhead. + RoundRobinRow(usize), + /// Allocate rows based on a hash of one of more expressions + Hash(Vec>, usize), /// Unknown partitioning scheme UnknownPartitioning(usize), } @@ -116,6 +123,9 @@ impl Partitioning { pub fn partition_count(&self) -> usize { use Partitioning::*; match self { + RoundRobinBatch(n) => *n, + RoundRobinRow(n) => *n, + Hash(_, n) => *n, UnknownPartitioning(n) => *n, } } diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 5ef07df0c3d..4a5779833a5 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; +use crate::physical_plan::{ExecutionPlan, Partitioning}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -33,27 +33,13 @@ use async_trait::async_trait; use futures::stream::Stream; -/// Partitioning schemes -#[derive(Debug, Clone)] -pub enum PartitioningScheme { - /// Allocate batches using a round-robin algorithm - RoundRobinBatch, - /// Allocate rows using a round-robin algorithm. This provides finer-grained partitioning - /// than `RoundRobinBatch` but also has much more overhead. - RoundRobinRow, - /// Allocate rows based on a hash of one of more expressions - Hash(Vec>), -} - /// partition. No guarantees are made about the order of the resulting partition. 
#[derive(Debug)] pub struct RepartitionExec { /// Input execution plan input: Arc, /// Partitioning scheme to use - partitioning_scheme: PartitioningScheme, - /// Number of output partitions - num_partitions: usize, + partitioning: Partitioning, } #[async_trait] @@ -79,8 +65,7 @@ impl ExecutionPlan for RepartitionExec { match children.len() { 1 => Ok(Arc::new(RepartitionExec::try_new( children[0].clone(), - self.partitioning_scheme.clone(), - self.num_partitions, + self.partitioning.clone(), )?)), _ => Err(DataFusionError::Internal( "RepartitionExec wrong number of children".to_string(), @@ -89,8 +74,7 @@ impl ExecutionPlan for RepartitionExec { } fn output_partitioning(&self) -> Partitioning { - //TODO needs more work - Partitioning::UnknownPartitioning(self.num_partitions) + self.partitioning.clone() } async fn execute(&self, partition: usize) -> Result { @@ -111,17 +95,15 @@ impl ExecutionPlan for RepartitionExec { } impl RepartitionExec { - /// Create a new MergeExec + /// Create a new RepartitionExec pub fn try_new( input: Arc, - partioning_scheme: PartitioningScheme, - num_partitions: usize, + partitioning: Partitioning, ) -> Result { - match &partioning_scheme { - PartitioningScheme::RoundRobinBatch => Ok(RepartitionExec { + match &partitioning { + Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { input, - partitioning_scheme: partioning_scheme, - num_partitions, + partitioning, }), other => Err(DataFusionError::NotImplemented(format!( "Partitioning scheme not supported yet: {:?}", From 7472e250b0276a53b355492fd54652b192d60dbe Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Dec 2020 12:24:58 -0700 Subject: [PATCH 03/21] roughing out more of the impl --- .../src/physical_plan/repartition.rs | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 4a5779833a5..cc5f0aafedb 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -31,7 +31,9 @@ use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; +use futures::channel::mpsc::{self, Receiver, Sender}; use futures::stream::Stream; +use tokio::sync::Mutex; /// partition. No guarantees are made about the order of the resulting partition. #[derive(Debug)] @@ -40,6 +42,9 @@ pub struct RepartitionExec { input: Arc, /// Partitioning scheme to use partitioning: Partitioning, + /// Channels for output batches + channels: + Arc>, Receiver>)>>>, } #[async_trait] @@ -78,19 +83,29 @@ impl ExecutionPlan for RepartitionExec { } async fn execute(&self, partition: usize) -> Result { - // lock mutex - // if first call to this method { - // create one channel per *output* partition - // launch one async task per *input* partition - // } + let mut channels = self.channels.lock().await; + if channels.is_empty() { + // allocate output channels once + for _ in 0..self.partitioning.partition_count() { + let buffer_size = 64; // TODO: configurable? 
+ let (sender, receiver) = + mpsc::channel::>(buffer_size); + + //TODO + //channels.push((sender, receiver)); + } + //TODO launch one async task per *input* partition + } // now return stream for the specified *output* partition which will // read from the channel - Ok(Box::pin(RepartitionStream { - schema: self.input.schema(), - //input: the *output* channel to read from - })) + // Ok(Box::pin(RepartitionStream { + // schema: self.input.schema(), + // input: channels[partition].1.clone() + // })) + + unimplemented!() } } @@ -104,6 +119,7 @@ impl RepartitionExec { Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { input, partitioning, + channels: Arc::new(Mutex::new(vec![])), }), other => Err(DataFusionError::NotImplemented(format!( "Partitioning scheme not supported yet: {:?}", @@ -125,8 +141,10 @@ impl RepartitionExec { } struct RepartitionStream { + /// Schema schema: SchemaRef, - //input: Channel to read + /// channel containing the repartitioned batches + input: Receiver>, } impl Stream for RepartitionStream { From 634c6260bddf52ab4e0fe1fb7d0e99a87efa52fd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Dec 2020 12:50:08 -0700 Subject: [PATCH 04/21] save progresS --- .../src/physical_plan/repartition.rs | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index cc5f0aafedb..2086a1735bb 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -33,7 +33,9 @@ use async_trait::async_trait; use futures::channel::mpsc::{self, Receiver, Sender}; use futures::stream::Stream; +use futures::StreamExt; use tokio::sync::Mutex; +use tokio::task::JoinHandle; /// partition. No guarantees are made about the order of the resulting partition. #[derive(Debug)] @@ -43,8 +45,14 @@ pub struct RepartitionExec { /// Partitioning scheme to use partitioning: Partitioning, /// Channels for output batches - channels: - Arc>, Receiver>)>>>, + channels: Arc< + Mutex< + Vec<( + Sender>, + Receiver>, + )>, + >, + >, } #[async_trait] @@ -85,16 +93,22 @@ impl ExecutionPlan for RepartitionExec { async fn execute(&self, partition: usize) -> Result { let mut channels = self.channels.lock().await; if channels.is_empty() { - // allocate output channels once + // create one channel per *output* partition + let buffer_size = 64; // TODO: configurable? for _ in 0..self.partitioning.partition_count() { - let buffer_size = 64; // TODO: configurable? 
- let (sender, receiver) = - mpsc::channel::>(buffer_size); - - //TODO - //channels.push((sender, receiver)); + channels.push(mpsc::channel::>(buffer_size)); + } + // launch one async task per *input* partition + for i in 0..self.input.output_partitioning().partition_count() { + let input = self.input.clone(); + let handle: JoinHandle> = tokio::spawn(async move { + let mut stream = input.execute(i).await?; + while let Some(batch) = stream.next().await { + //TODO + } + Ok(()) + }); } - //TODO launch one async task per *input* partition } // now return stream for the specified *output* partition which will From ff1dbbc018832903d0e3828210c28aa79dbabe3e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Dec 2020 13:05:04 -0700 Subject: [PATCH 05/21] save progress --- .../src/physical_plan/repartition.rs | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 2086a1735bb..d375c2889b6 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -44,15 +44,10 @@ pub struct RepartitionExec { input: Arc, /// Partitioning scheme to use partitioning: Partitioning, - /// Channels for output batches - channels: Arc< - Mutex< - Vec<( - Sender>, - Receiver>, - )>, - >, - >, + /// Receivers for output batches + rx: Arc>>>>, + /// Senders for output batches + tx: Arc>>>>, } #[async_trait] @@ -91,20 +86,36 @@ impl ExecutionPlan for RepartitionExec { } async fn execute(&self, partition: usize) -> Result { - let mut channels = self.channels.lock().await; - if channels.is_empty() { + let mut tx = self.tx.lock().await; + let mut rx = self.rx.lock().await; + if tx.is_empty() { // create one channel per *output* partition let buffer_size = 64; // TODO: configurable? 
for _ in 0..self.partitioning.partition_count() { - channels.push(mpsc::channel::>(buffer_size)); + let (sender, receiver) = + mpsc::channel::>(buffer_size); + tx.push(sender); + rx.push(receiver); } // launch one async task per *input* partition for i in 0..self.input.output_partitioning().partition_count() { let input = self.input.clone(); - let handle: JoinHandle> = tokio::spawn(async move { + let mut tx = tx.clone(); + let partitioning = self.partitioning.clone(); + let _handle: JoinHandle> = tokio::spawn(async move { let mut stream = input.execute(i).await?; while let Some(batch) = stream.next().await { - //TODO + //TODO error handling + let batch = batch?; + match partitioning { + Partitioning::RoundRobinBatch(n) => { + //TODO pick a channel based on round-robin + let output_partition = 0; + let mut tx = &mut tx[output_partition]; + tx.try_send(Ok(batch)).unwrap(); + } + _ => unimplemented!(), + } } Ok(()) }); @@ -133,7 +144,8 @@ impl RepartitionExec { Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec { input, partitioning, - channels: Arc::new(Mutex::new(vec![])), + tx: Arc::new(Mutex::new(vec![])), + rx: Arc::new(Mutex::new(vec![])), }), other => Err(DataFusionError::NotImplemented(format!( "Partitioning scheme not supported yet: {:?}", From f5dff600e363d562cb9ad9edac296f3a98f41e1c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 10:04:32 -0700 Subject: [PATCH 06/21] use async-channel --- rust/datafusion/src/physical_plan/mod.rs | 4 -- .../src/physical_plan/repartition.rs | 58 +++++++++---------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index c0a466e7bd9..78550a721cf 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -109,9 +109,6 @@ pub async fn collect(plan: Arc) -> Result> { pub enum Partitioning { /// Allocate batches using a round-robin algorithm RoundRobinBatch(usize), - /// Allocate rows using a round-robin algorithm. This provides finer-grained partitioning - /// than `RoundRobinBatch` but also has much more overhead. - RoundRobinRow(usize), /// Allocate rows based on a hash of one of more expressions Hash(Vec>, usize), /// Unknown partitioning scheme @@ -124,7 +121,6 @@ impl Partitioning { use Partitioning::*; match self { RoundRobinBatch(n) => *n, - RoundRobinRow(n) => *n, Hash(_, n) => *n, UnknownPartitioning(n) => *n, } diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index d375c2889b6..f1c003fc4e6 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -31,7 +31,7 @@ use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; -use futures::channel::mpsc::{self, Receiver, Sender}; +use async_channel::{self, Receiver, Sender}; use futures::stream::Stream; use futures::StreamExt; use tokio::sync::Mutex; @@ -86,36 +86,39 @@ impl ExecutionPlan for RepartitionExec { } async fn execute(&self, partition: usize) -> Result { + // lock mutexes let mut tx = self.tx.lock().await; let mut rx = self.rx.lock().await; + + // if this is the first partition to be invoked then we need to set up initial state if tx.is_empty() { // create one channel per *output* partition - let buffer_size = 64; // TODO: configurable? 
for _ in 0..self.partitioning.partition_count() { let (sender, receiver) = - mpsc::channel::>(buffer_size); + async_channel::bounded::>(1); tx.push(sender); rx.push(receiver); } // launch one async task per *input* partition - for i in 0..self.input.output_partitioning().partition_count() { + let num_output_partitions = + self.input.output_partitioning().partition_count(); + for i in 0..num_output_partitions { let input = self.input.clone(); let mut tx = tx.clone(); let partitioning = self.partitioning.clone(); let _handle: JoinHandle> = tokio::spawn(async move { let mut stream = input.execute(i).await?; - while let Some(batch) = stream.next().await { - //TODO error handling - let batch = batch?; + let mut counter = 0; + while let Some(result) = stream.next().await { match partitioning { - Partitioning::RoundRobinBatch(n) => { - //TODO pick a channel based on round-robin - let output_partition = 0; - let mut tx = &mut tx[output_partition]; - tx.try_send(Ok(batch)).unwrap(); + Partitioning::RoundRobinBatch(_) => { + let output_partition = counter % num_output_partitions; + let tx = &mut tx[output_partition]; + tx.try_send(result).unwrap(); //TODO remove unwrap } _ => unimplemented!(), } + counter += 1; } Ok(()) }); @@ -124,13 +127,10 @@ impl ExecutionPlan for RepartitionExec { // now return stream for the specified *output* partition which will // read from the channel - - // Ok(Box::pin(RepartitionStream { - // schema: self.input.schema(), - // input: channels[partition].1.clone() - // })) - - unimplemented!() + Ok(Box::pin(RepartitionStream { + schema: self.input.schema(), + input: rx[partition].clone(), + })) } } @@ -153,17 +153,6 @@ impl RepartitionExec { ))), } } - - async fn process_input_partition(&self, partition: usize) -> Result<()> { - let input = self.input.execute(partition).await?; - // for each input batch { - // compute output partition based on partitioning schema - // send batch to the appropriate output channel, or split batch into - // multiple batches if using row-based partitioning - // } - // } - Ok(()) - } } struct RepartitionStream { @@ -180,6 +169,15 @@ impl Stream for RepartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + // TODO I need help here probably + + //self.input.poll_next() + // match self.input.recv() { + // Ok(batch) => Poll::Ready(batch), + // // RecvError means receiver has exited and closed the channel + // Err(RecvError) => Poll::Ready(None), + // } + unimplemented!() } From d546890a967de42a4bfb70a9a9490b4edfa6b224 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 10:14:35 -0700 Subject: [PATCH 07/21] operator maybe code complete --- .../src/physical_plan/repartition.rs | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index f1c003fc4e6..08197c4513a 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -///! The repartition operator maps N input partitions to M output partitions based on a -///! partitioning scheme. +//! The repartition operator maps N input partitions to M output partitions based on a +//! partitioning scheme. 
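The module doc above describes the N-to-M partition mapping. For orientation, the assignment used for `Partitioning::RoundRobinBatch` reduces to a modulo over the output partition count; the standalone sketch below (illustrative only, no DataFusion types, and the function name is hypothetical) mirrors the `counter % num_output_partitions` logic the operator uses in later patches:

```rust
/// Round-robin batch assignment: the i-th batch read from an input partition
/// is routed to output partition `i % num_output_partitions`.
fn round_robin_output_partition(batch_index: usize, num_output_partitions: usize) -> usize {
    batch_index % num_output_partitions
}

fn main() {
    let num_output_partitions = 4;
    // ten input batches spread over four outputs: 0, 1, 2, 3, 0, 1, 2, 3, 0, 1
    for batch_index in 0..10 {
        println!(
            "batch {} -> output partition {}",
            batch_index,
            round_robin_output_partition(batch_index, num_output_partitions)
        );
    }
}
```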
+ use std::any::Any; use std::pin::Pin; use std::sync::Arc; @@ -31,7 +32,7 @@ use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; -use async_channel::{self, Receiver, Sender}; +use crossbeam::channel::{bounded, Receiver, RecvError, Sender}; use futures::stream::Stream; use futures::StreamExt; use tokio::sync::Mutex; @@ -94,8 +95,7 @@ impl ExecutionPlan for RepartitionExec { if tx.is_empty() { // create one channel per *output* partition for _ in 0..self.partitioning.partition_count() { - let (sender, receiver) = - async_channel::bounded::>(1); + let (sender, receiver) = bounded::>(1); tx.push(sender); rx.push(receiver); } @@ -166,23 +166,14 @@ impl Stream for RepartitionStream { type Item = ArrowResult; fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + self: Pin<&mut Self>, + _cx: &mut Context<'_>, ) -> Poll> { - // TODO I need help here probably - - //self.input.poll_next() - // match self.input.recv() { - // Ok(batch) => Poll::Ready(batch), - // // RecvError means receiver has exited and closed the channel - // Err(RecvError) => Poll::Ready(None), - // } - - unimplemented!() - } - - fn size_hint(&self) -> (usize, Option) { - unimplemented!() + match self.input.recv() { + Ok(batch) => Poll::Ready(Some(batch)), + // RecvError means receiver has exited and closed the channel + Err(RecvError) => Poll::Ready(None), + } } } From 72869d47326e0b4b91aa89a28f5da535eb12e7e8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 10:17:17 -0700 Subject: [PATCH 08/21] improve error handling --- rust/datafusion/src/physical_plan/repartition.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 08197c4513a..04a3b31492b 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -114,9 +114,18 @@ impl ExecutionPlan for RepartitionExec { Partitioning::RoundRobinBatch(_) => { let output_partition = counter % num_output_partitions; let tx = &mut tx[output_partition]; - tx.try_send(result).unwrap(); //TODO remove unwrap + tx.send(result).map_err(|e| { + DataFusionError::Execution(e.to_string()) + })?; + } + other => { + // this should be unreachable as long as the validation logic + // in the constructor is kept up-to-date + return Err(DataFusionError::NotImplemented(format!( + "Unsupported repartitioning scheme {:?}", + other + ))); } - _ => unimplemented!(), } counter += 1; } From 9c7716bf02e38d5ae8106ab0152547234b982863 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 10:48:18 -0700 Subject: [PATCH 09/21] plumb through to LogicalPlan and DataFrame --- rust/datafusion/src/dataframe.rs | 22 +++++++++- .../src/execution/dataframe_impl.rs | 11 +++++ rust/datafusion/src/logical_plan/builder.rs | 10 ++++- rust/datafusion/src/logical_plan/mod.rs | 4 +- rust/datafusion/src/logical_plan/plan.rs | 42 ++++++++++++++++++- .../src/optimizer/hash_build_probe_order.rs | 1 + .../src/optimizer/projection_push_down.rs | 1 + rust/datafusion/src/optimizer/utils.rs | 23 +++++++++- rust/datafusion/src/physical_plan/planner.rs | 32 ++++++++++++-- .../src/physical_plan/repartition.rs | 11 +++++ rust/datafusion/src/prelude.rs | 1 + 11 files changed, 150 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs index 6cab9299f43..826f2c53a11 100644 --- 
a/rust/datafusion/src/dataframe.rs +++ b/rust/datafusion/src/dataframe.rs @@ -19,7 +19,9 @@ use crate::arrow::record_batch::RecordBatch; use crate::error::Result; -use crate::logical_plan::{DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan}; +use crate::logical_plan::{ + DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning, +}; use std::sync::Arc; use async_trait::async_trait; @@ -172,6 +174,24 @@ pub trait DataFrame { right_cols: &[&str], ) -> Result>; + /// Repartition a DataFrame based on a logical partitioning scheme. + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # fn main() -> Result<()> { + /// let mut ctx = ExecutionContext::new(); + /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; + /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?; + /// let df2 = df.repartition(Partitioning::Hash(vec![col("a")], 4))?; + /// # Ok(()) + /// # } + /// ``` + fn repartition( + &self, + partitioning_scheme: Partitioning, + ) -> Result>; + /// Executes this DataFrame and collects all results into a vector of RecordBatch. /// /// ``` diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index 3badb4fc389..22be5513224 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -24,6 +24,7 @@ use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder, + Partitioning, }; use crate::{arrow::record_batch::RecordBatch, physical_plan::collect}; @@ -111,6 +112,16 @@ impl DataFrame for DataFrameImpl { Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) } + fn repartition( + &self, + partitioning_scheme: Partitioning, + ) -> Result> { + let plan = LogicalPlanBuilder::from(&self.plan) + .repartition(partitioning_scheme)? 
+ .build()?; + Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) + } + /// Convert to logical plan fn to_logical_plan(&self) -> LogicalPlan { self.plan.clone() diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs index 3a98f676320..23bf2c5ac33 100644 --- a/rust/datafusion/src/logical_plan/builder.rs +++ b/rust/datafusion/src/logical_plan/builder.rs @@ -35,7 +35,7 @@ use super::dfschema::ToDFSchema; use super::{ col, exprlist_to_fields, Expr, JoinType, LogicalPlan, PlanType, StringifiedPlan, }; -use crate::logical_plan::{DFField, DFSchema, DFSchemaRef}; +use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, Partitioning}; use std::collections::HashSet; /// Builder for logical plans @@ -207,6 +207,14 @@ impl LogicalPlanBuilder { } } + /// Repartition + pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { + Ok(Self::from(&LogicalPlan::Repartition { + input: Arc::new(self.plan.clone()), + partitioning_scheme, + })) + } + /// Apply an aggregate pub fn aggregate(&self, group_expr: Vec, aggr_expr: Vec) -> Result { let mut all_expr: Vec = group_expr.clone(); diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs index 4cd4d99587b..f810b0162c4 100644 --- a/rust/datafusion/src/logical_plan/mod.rs +++ b/rust/datafusion/src/logical_plan/mod.rs @@ -41,5 +41,7 @@ pub use expr::{ }; pub use extension::UserDefinedLogicalNode; pub use operators::Operator; -pub use plan::{JoinType, LogicalPlan, PlanType, PlanVisitor, StringifiedPlan}; +pub use plan::{ + JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, StringifiedPlan, +}; pub use registry::FunctionRegistry; diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs index 957f2bb3e4d..3710cfa6b80 100644 --- a/rust/datafusion/src/logical_plan/plan.rs +++ b/rust/datafusion/src/logical_plan/plan.rs @@ -110,6 +110,13 @@ pub enum LogicalPlan { /// The output schema, containing fields from the left and right inputs schema: DFSchemaRef, }, + /// Repartition the plan based on a partitioning scheme. + Repartition { + /// The incoming logical plan + input: Arc, + /// The partitioning scheme + partitioning_scheme: Partitioning, + }, /// Produces rows from a table provider by reference or from the context TableScan { /// The name of the table @@ -182,6 +189,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => &schema, + LogicalPlan::Repartition { input, .. } => input.schema(), LogicalPlan::Limit { input, .. } => input.schema(), LogicalPlan::CreateExternalTable { schema, .. } => &schema, LogicalPlan::Explain { schema, .. } => &schema, @@ -198,6 +206,15 @@ impl LogicalPlan { } } +/// Logical partitioning schemes supported by the repartition operator. +#[derive(Debug, Clone)] +pub enum Partitioning { + /// Allocate batches using a round-robin algorithm + RoundRobinBatch(usize), + /// Allocate rows based on a hash of one of more expressions + Hash(Vec, usize), +} + /// Trait that implements the [Visitor /// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a /// depth first walk of `LogicalPlan` nodes. `pre_visit` is called @@ -261,6 +278,7 @@ impl LogicalPlan { let recurse = match self { LogicalPlan::Projection { input, .. } => input.accept(visitor)?, LogicalPlan::Filter { input, .. } => input.accept(visitor)?, + LogicalPlan::Repartition { input, .. 
} => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, LogicalPlan::Join { left, right, .. } => { @@ -464,7 +482,7 @@ impl LogicalPlan { struct Wrapper<'a>(&'a LogicalPlan); impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self.0 { + match &*self.0 { LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"), LogicalPlan::TableScan { ref table_name, @@ -523,6 +541,28 @@ impl LogicalPlan { keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); write!(f, "Join: {}", join_expr.join(", ")) } + LogicalPlan::Repartition { + partitioning_scheme, + .. + } => match partitioning_scheme { + Partitioning::RoundRobinBatch(n) => { + write!( + f, + "Repartition: RoundRobinBatch partition_count={}", + n + ) + } + Partitioning::Hash(expr, n) => { + let hash_expr: Vec = + expr.iter().map(|e| format!("{:?}", e)).collect(); + write!( + f, + "Repartition: Hash({}) partition_count={}", + hash_expr.join(", "), + n + ) + } + }, LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n), LogicalPlan::CreateExternalTable { ref name, .. } => { write!(f, "CreateExternalTable: {:?}", name) diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs index df05076bc39..179149b4891 100644 --- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs @@ -117,6 +117,7 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::TableScan { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Filter { .. } + | LogicalPlan::Repartition { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index 92236b97c6e..bb87ed823d5 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -275,6 +275,7 @@ fn optimize_plan( // expressions in this node to the list of required columns LogicalPlan::Limit { .. } | LogicalPlan::Filter { .. } + | LogicalPlan::Repartition { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index da13b73ae78..2e950fd2be3 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -24,7 +24,7 @@ use arrow::datatypes::Schema; use super::optimizer::OptimizerRule; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ - Expr, LogicalPlan, Operator, PlanType, StringifiedPlan, ToDFSchema, + Expr, LogicalPlan, Operator, Partitioning, PlanType, StringifiedPlan, ToDFSchema, }; use crate::prelude::{col, lit}; use crate::scalar::ScalarValue; @@ -140,6 +140,13 @@ pub fn expressions(plan: &LogicalPlan) -> Vec { match plan { LogicalPlan::Projection { expr, .. } => expr.clone(), LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], + LogicalPlan::Repartition { + partitioning_scheme, + .. + } => match partitioning_scheme { + Partitioning::Hash(expr, _) => expr.clone(), + _ => vec![], + }, LogicalPlan::Aggregate { group_expr, aggr_expr, @@ -168,6 +175,7 @@ pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan> { match plan { LogicalPlan::Projection { input, .. 
} => vec![input], LogicalPlan::Filter { input, .. } => vec![input], + LogicalPlan::Repartition { input, .. } => vec![input], LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], @@ -197,6 +205,19 @@ pub fn from_plan( predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), }), + LogicalPlan::Repartition { + partitioning_scheme, + .. + } => match partitioning_scheme { + Partitioning::RoundRobinBatch(n) => Ok(LogicalPlan::Repartition { + partitioning_scheme: Partitioning::RoundRobinBatch(*n), + input: Arc::new(inputs[0].clone()), + }), + Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition { + partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n), + input: Arc::new(inputs[0].clone()), + }), + }, LogicalPlan::Aggregate { group_expr, schema, .. } => Ok(LogicalPlan::Aggregate { diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 1d3a479ea03..310f07b3d3e 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -23,21 +23,22 @@ use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf}; use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ - DFSchema, Expr, LogicalPlan, Operator, PlanType, StringifiedPlan, - UserDefinedLogicalNode, + DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, + StringifiedPlan, UserDefinedLogicalNode, }; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::hash_join::HashJoinExec; -use crate::physical_plan::hash_utils; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::merge::MergeExec; use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sort::SortExec; use crate::physical_plan::udf; use crate::physical_plan::{expressions, Distribution}; +use crate::physical_plan::{hash_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner}; use crate::prelude::JoinType; use crate::variable::VarType; @@ -228,6 +229,31 @@ impl DefaultPhysicalPlanner { self.create_physical_expr(predicate, &input_schema, ctx_state)?; Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?)) } + LogicalPlan::Repartition { + input, + partitioning_scheme, + } => { + let input = self.create_physical_plan(input, ctx_state)?; + let input_schema = input.schema(); + let physical_partitioning = match partitioning_scheme { + LogicalPartitioning::RoundRobinBatch(n) => { + Partitioning::RoundRobinBatch(*n) + } + LogicalPartitioning::Hash(expr, n) => { + let runtime_expr = expr + .iter() + .map(|e| { + self.create_physical_expr(e, &input_schema, &ctx_state) + }) + .collect::>>()?; + Partitioning::Hash(runtime_expr, *n) + } + }; + Ok(Arc::new(RepartitionExec::try_new( + input, + physical_partitioning, + )?)) + } LogicalPlan::Sort { expr, input, .. 
} => { let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 04a3b31492b..1e23ca79096 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -192,3 +192,14 @@ impl RecordBatchStream for RepartitionStream { self.schema.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test() -> Result<()> { + // TODO write tests for the physical operator + Ok(()) + } +} diff --git a/rust/datafusion/src/prelude.rs b/rust/datafusion/src/prelude.rs index 20bbbe47c97..309b75bc6b1 100644 --- a/rust/datafusion/src/prelude.rs +++ b/rust/datafusion/src/prelude.rs @@ -29,5 +29,6 @@ pub use crate::dataframe::DataFrame; pub use crate::execution::context::{ExecutionConfig, ExecutionContext}; pub use crate::logical_plan::{ array, avg, col, concat, count, create_udf, length, lit, max, min, sum, JoinType, + Partitioning, }; pub use crate::physical_plan::csv::CsvReadOptions; From 1441d85e7b02c04c91067ecd0b83648fe47275f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 10:59:47 -0700 Subject: [PATCH 10/21] Update TPC-H file conversion to support optional repartitioning --- rust/benchmarks/src/bin/tpch.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 6c5d06cf37f..8a6fca78930 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -18,7 +18,6 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. use std::path::{Path, PathBuf}; -use std::sync::Arc; use std::time::Instant; use arrow::datatypes::{DataType, DateUnit, Field, Schema}; @@ -28,7 +27,6 @@ use datafusion::datasource::{CsvFile, MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::collect; -use datafusion::physical_plan::csv::CsvExec; use datafusion::prelude::*; use parquet::basic::Compression; @@ -87,6 +85,10 @@ struct ConvertOpt { /// Compression to use when writing Parquet files #[structopt(short = "c", long = "compression", default_value = "snappy")] compression: String, + + /// Number of partitions to produce + #[structopt(short = "p", long = "partitions", default_value = "1")] + partitions: usize, } #[derive(Debug, StructOpt)] @@ -1017,8 +1019,21 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { .delimiter(b'|') .file_extension(".tbl"); - let ctx = ExecutionContext::new(); - let csv = Arc::new(CsvExec::try_new(&input_path, options, None, 4096)?); + let mut ctx = ExecutionContext::new(); + + // build plan to read the TBL file + let mut csv = ctx.read_csv(&input_path, options)?; + + // optionally, repartition the file + if opt.partitions > 1 { + csv = csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))? 
+ } + + // create the physical plan + let csv = csv.to_logical_plan(); + let csv = ctx.optimize(&csv)?; + let csv = ctx.create_physical_plan(&csv)?; + let output_path = output_root_path.join(table); let output_path = output_path.to_str().unwrap().to_owned(); From a9f4b42b52a44e53549f9f98e97cb6f20b07cc28 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 11:48:09 -0700 Subject: [PATCH 11/21] repartition operator works functionally --- .../src/physical_plan/repartition.rs | 55 ++++++++++++++----- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 1e23ca79096..7f8ffd05925 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; -use crossbeam::channel::{bounded, Receiver, RecvError, Sender}; +use crossbeam::channel::{unbounded, Receiver, Sender}; use futures::stream::Stream; use futures::StreamExt; use tokio::sync::Mutex; @@ -46,9 +46,9 @@ pub struct RepartitionExec { /// Partitioning scheme to use partitioning: Partitioning, /// Receivers for output batches - rx: Arc>>>>, + rx: Arc>>>>>, /// Senders for output batches - tx: Arc>>>>, + tx: Arc>>>>>, } #[async_trait] @@ -91,30 +91,34 @@ impl ExecutionPlan for RepartitionExec { let mut tx = self.tx.lock().await; let mut rx = self.rx.lock().await; + let num_input_partitions = self.input.output_partitioning().partition_count(); + let num_output_partition = self.partitioning.partition_count(); + // if this is the first partition to be invoked then we need to set up initial state if tx.is_empty() { // create one channel per *output* partition - for _ in 0..self.partitioning.partition_count() { - let (sender, receiver) = bounded::>(1); + for _ in 0..num_output_partition { + //TODO this operator currently uses unbounded channels to avoid deadlocks and + // this is far from ideal + + let (sender, receiver) = unbounded::>>(); tx.push(sender); rx.push(receiver); } // launch one async task per *input* partition - let num_output_partitions = - self.input.output_partitioning().partition_count(); - for i in 0..num_output_partitions { + for i in 0..num_input_partitions { let input = self.input.clone(); let mut tx = tx.clone(); let partitioning = self.partitioning.clone(); - let _handle: JoinHandle> = tokio::spawn(async move { + let _: JoinHandle> = tokio::spawn(async move { let mut stream = input.execute(i).await?; let mut counter = 0; while let Some(result) = stream.next().await { match partitioning { Partitioning::RoundRobinBatch(_) => { - let output_partition = counter % num_output_partitions; + let output_partition = counter % num_output_partition; let tx = &mut tx[output_partition]; - tx.send(result).map_err(|e| { + tx.send(Some(result)).map_err(|e| { DataFusionError::Execution(e.to_string()) })?; } @@ -129,6 +133,13 @@ impl ExecutionPlan for RepartitionExec { } counter += 1; } + + // notify each output partition that this input partition has no more data + for i in 0..num_output_partition { + let tx = &mut tx[i]; + tx.send(None) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + } Ok(()) }); } @@ -137,6 +148,8 @@ impl ExecutionPlan for RepartitionExec { // now return stream for the specified *output* partition which will // read from the channel Ok(Box::pin(RepartitionStream { + num_input_partitions, + 
num_input_partitions_processed: 0, schema: self.input.schema(), input: rx[partition].clone(), })) @@ -165,23 +178,35 @@ impl RepartitionExec { } struct RepartitionStream { + num_input_partitions: usize, + num_input_partitions_processed: usize, /// Schema schema: SchemaRef, /// channel containing the repartitioned batches - input: Receiver>, + input: Receiver>>, } impl Stream for RepartitionStream { type Item = ArrowResult; fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { match self.input.recv() { - Ok(batch) => Poll::Ready(Some(batch)), + Ok(Some(batch)) => Poll::Ready(Some(batch)), + // End of results from one input partition + Ok(None) => { + self.num_input_partitions_processed += 1; + if self.num_input_partitions == self.num_input_partitions_processed { + // all input partitions have finished sending batches + Poll::Ready(None) + } else { + Poll::Pending + } + } // RecvError means receiver has exited and closed the channel - Err(RecvError) => Poll::Ready(None), + Err(_) => Poll::Ready(None), } } } From d845f927ced825e8e6a3a5b283a7e2b8cdef2426 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 11:49:23 -0700 Subject: [PATCH 12/21] docs --- rust/datafusion/src/physical_plan/repartition.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 7f8ffd05925..6768cb1cba7 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -178,7 +178,9 @@ impl RepartitionExec { } struct RepartitionStream { + /// Number of input partitions that will be sending batches to this output channel num_input_partitions: usize, + /// Number of input partitions that have finished sending batches to this output channel num_input_partitions_processed: usize, /// Schema schema: SchemaRef, @@ -202,6 +204,7 @@ impl Stream for RepartitionStream { // all input partitions have finished sending batches Poll::Ready(None) } else { + // other partitions still have data to send Poll::Pending } } From 16b69f730aa9ce2b7168169939ee938a6424f9d5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 12:14:27 -0700 Subject: [PATCH 13/21] add unit test --- .../src/physical_plan/repartition.rs | 69 ++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 6768cb1cba7..6a84e49004f 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -224,10 +224,75 @@ impl RecordBatchStream for RepartitionStream { #[cfg(test)] mod tests { use super::*; + use crate::physical_plan::memory::MemoryExec; + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; #[tokio::test] - async fn test() -> Result<()> { - // TODO write tests for the physical operator + async fn one_to_many_round_robin() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(&schema, 50)?; + let single_partition_data = vec![partition]; + + // repartition from 1 input to 4 output + let output_partitions = repartition( + &schema, + single_partition_data, + Partitioning::RoundRobinBatch(4), + ) + .await?; + + assert_eq!(4, output_partitions.len()); + assert_eq!(13, output_partitions[0].len()); + assert_eq!(13, 
output_partitions[1].len()); + assert_eq!(12, output_partitions[2].len()); + assert_eq!(12, output_partitions[3].len()); + Ok(()) } + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) + } + + fn create_vec_batches(schema: &Arc, n: usize) -> Result> { + let batch = create_batch(schema); + let mut vec = Vec::with_capacity(n); + for _ in 0..n { + vec.push(batch.clone()); + } + Ok(vec) + } + + fn create_batch(schema: &Arc) -> RecordBatch { + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))], + ) + .unwrap() + } + + async fn repartition( + schema: &SchemaRef, + input_partitions: Vec>, + partitioning_scheme: Partitioning, + ) -> Result>> { + // create physical plan + let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = RepartitionExec::try_new(Arc::new(exec), partitioning_scheme)?; + // execute and collect results + let mut output_partitions = vec![]; + for i in 0..input_partitions.len() { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i).await?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + Ok(output_partitions) + } } From 655426bfa33fef2742cecc70015aaf429e0c8be2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 12:29:22 -0700 Subject: [PATCH 14/21] fix test --- rust/datafusion/src/physical_plan/repartition.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 6a84e49004f..e4aaaa731b1 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -229,7 +229,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; - #[tokio::test] + #[tokio::test(threaded_scheduler)] async fn one_to_many_round_robin() -> Result<()> { // define input partitions let schema = test_schema(); @@ -282,9 +282,10 @@ mod tests { // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; let exec = RepartitionExec::try_new(Arc::new(exec), partitioning_scheme)?; + // execute and collect results let mut output_partitions = vec![]; - for i in 0..input_partitions.len() { + for i in 0..exec.partitioning.partition_count() { // execute this *output* partition and collect all batches let mut stream = exec.execute(i).await?; let mut batches = vec![]; From a7483df59494558a0879fde64638e61b90766f5c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 12:34:00 -0700 Subject: [PATCH 15/21] more tests --- .../src/physical_plan/repartition.rs | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index e4aaaa731b1..c53163d6b4a 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -234,15 +234,11 @@ mod tests { // define input partitions let schema = test_schema(); let partition = create_vec_batches(&schema, 50)?; - let single_partition_data = vec![partition]; + let partitions = vec![partition]; // repartition from 1 input to 4 output - let output_partitions = repartition( - &schema, - single_partition_data, - Partitioning::RoundRobinBatch(4), - ) - .await?; + let 
output_partitions = + repartition(&schema, partitions, Partitioning::RoundRobinBatch(4)).await?; assert_eq!(4, output_partitions.len()); assert_eq!(13, output_partitions[0].len()); @@ -253,6 +249,44 @@ mod tests { Ok(()) } + #[tokio::test(threaded_scheduler)] + async fn many_to_one_round_robin() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(&schema, 50)?; + let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; + + // repartition from 3 input to 1 output + let output_partitions = + repartition(&schema, partitions, Partitioning::RoundRobinBatch(1)).await?; + + assert_eq!(1, output_partitions.len()); + assert_eq!(150, output_partitions[0].len()); + + Ok(()) + } + + #[tokio::test(threaded_scheduler)] + async fn many_to_many_round_robin() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(&schema, 50)?; + let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; + + // repartition from 3 input to 5 output + let output_partitions = + repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await?; + + assert_eq!(5, output_partitions.len()); + assert_eq!(150, output_partitions[0].len()); + assert_eq!(150, output_partitions[1].len()); + assert_eq!(150, output_partitions[2].len()); + assert_eq!(150, output_partitions[3].len()); + assert_eq!(150, output_partitions[4].len()); + + Ok(()) + } + fn test_schema() -> Arc { Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) } From 67ee55eee3d15cbef72f0e241dae9ad3462eadc8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 12:38:17 -0700 Subject: [PATCH 16/21] tests pass --- rust/datafusion/src/physical_plan/repartition.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index c53163d6b4a..a683d1a3bec 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -193,7 +193,7 @@ impl Stream for RepartitionStream { fn poll_next( mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll> { match self.input.recv() { Ok(Some(batch)) => Poll::Ready(Some(batch)), @@ -205,7 +205,7 @@ impl Stream for RepartitionStream { Poll::Ready(None) } else { // other partitions still have data to send - Poll::Pending + self.poll_next(cx) } } // RecvError means receiver has exited and closed the channel @@ -278,11 +278,11 @@ mod tests { repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await?; assert_eq!(5, output_partitions.len()); - assert_eq!(150, output_partitions[0].len()); - assert_eq!(150, output_partitions[1].len()); - assert_eq!(150, output_partitions[2].len()); - assert_eq!(150, output_partitions[3].len()); - assert_eq!(150, output_partitions[4].len()); + assert_eq!(30, output_partitions[0].len()); + assert_eq!(30, output_partitions[1].len()); + assert_eq!(30, output_partitions[2].len()); + assert_eq!(30, output_partitions[3].len()); + assert_eq!(30, output_partitions[4].len()); Ok(()) } From 23c0b6c1eb712025c2ce2afc6171e95bca483331 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 12:50:40 -0700 Subject: [PATCH 17/21] Remove TODO comment --- rust/datafusion/src/physical_plan/repartition.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs 
b/rust/datafusion/src/physical_plan/repartition.rs index a683d1a3bec..12aa66e5abc 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -98,9 +98,8 @@ impl ExecutionPlan for RepartitionExec { if tx.is_empty() { // create one channel per *output* partition for _ in 0..num_output_partition { - //TODO this operator currently uses unbounded channels to avoid deadlocks and - // this is far from ideal - + // note that this operator uses unbounded channels to avoid deadlocks because + // the output partitions can be read in any order and block the input partitions let (sender, receiver) = unbounded::>>(); tx.push(sender); rx.push(receiver); From 6c765d7e79514e54bc67df12ee8edaad3fa6d01f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 22 Dec 2020 14:42:50 -0700 Subject: [PATCH 18/21] rename num_output_partition to num_output_partitions and improve documentation --- rust/datafusion/src/physical_plan/repartition.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 12aa66e5abc..6eae399a8ef 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -92,14 +92,18 @@ impl ExecutionPlan for RepartitionExec { let mut rx = self.rx.lock().await; let num_input_partitions = self.input.output_partitioning().partition_count(); - let num_output_partition = self.partitioning.partition_count(); + let num_output_partitions = self.partitioning.partition_count(); // if this is the first partition to be invoked then we need to set up initial state if tx.is_empty() { // create one channel per *output* partition - for _ in 0..num_output_partition { - // note that this operator uses unbounded channels to avoid deadlocks because - // the output partitions can be read in any order and block the input partitions + for _ in 0..num_output_partitions { + // Note that this operator uses unbounded channels to avoid deadlocks because + // the output partitions can be read in any order and this could cause input + // partitions to be blocked when sending data to output receivers that are not + // being read yet. This may cause high memory usage if the next operator is + // reading output partitions in order rather than concurrently. One workaround + // for this would be to add spill-to-disk capabilities. 
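The comment block above motivates the use of unbounded channels; the surrounding code also relies on a `None` sentinel per input partition so that each output stream knows when every input has finished. The sketch below illustrates that channel-and-sentinel pattern only; it deliberately substitutes `std::sync::mpsc` and plain integers for the crossbeam channels and `RecordBatch`es used in the patch, so it is an analogy rather than the operator's code:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

fn main() {
    let num_input_partitions = 3;
    let num_output_partitions = 2;

    // one channel per *output* partition, as in RepartitionExec::execute
    let (senders, receivers): (Vec<Sender<Option<u64>>>, Vec<_>) =
        (0..num_output_partitions).map(|_| channel()).unzip();

    // one producer per *input* partition
    for input in 0..num_input_partitions {
        let senders = senders.clone();
        thread::spawn(move || {
            for batch in 0..4u64 {
                // round-robin assignment of this input's batches to output partitions
                let output = (batch as usize) % num_output_partitions;
                senders[output].send(Some(100 * input as u64 + batch)).unwrap();
            }
            // `None` tells every output partition that this input has no more data
            for tx in &senders {
                tx.send(None).unwrap();
            }
        });
    }
    // only the producer threads need to hold senders from here on
    drop(senders);

    // each output partition drains its channel until all inputs have finished
    for (i, rx) in receivers.into_iter().enumerate() {
        let mut finished_inputs = 0;
        let mut batches = vec![];
        while finished_inputs < num_input_partitions {
            match rx.recv().unwrap() {
                Some(value) => batches.push(value),
                None => finished_inputs += 1,
            }
        }
        println!("output partition {} got {:?}", i, batches);
    }
}
```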
From 6c765d7e79514e54bc67df12ee8edaad3fa6d01f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 22 Dec 2020 14:42:50 -0700
Subject: [PATCH 18/21] rename num_output_partition to num_output_partitions
 and improve documentation

---
 rust/datafusion/src/physical_plan/repartition.rs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 12aa66e5abc..6eae399a8ef 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -92,14 +92,18 @@ impl ExecutionPlan for RepartitionExec {
         let mut rx = self.rx.lock().await;
 
         let num_input_partitions = self.input.output_partitioning().partition_count();
-        let num_output_partition = self.partitioning.partition_count();
+        let num_output_partitions = self.partitioning.partition_count();
 
         // if this is the first partition to be invoked then we need to set up initial state
         if tx.is_empty() {
             // create one channel per *output* partition
-            for _ in 0..num_output_partition {
-                // note that this operator uses unbounded channels to avoid deadlocks because
-                // the output partitions can be read in any order and block the input partitions
+            for _ in 0..num_output_partitions {
+                // Note that this operator uses unbounded channels to avoid deadlocks because
+                // the output partitions can be read in any order and this could cause input
+                // partitions to be blocked when sending data to output receivers that are not
+                // being read yet. This may cause high memory usage if the next operator is
+                // reading output partitions in order rather than concurrently. One workaround
+                // for this would be to add spill-to-disk capabilities.
                 let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
                 tx.push(sender);
                 rx.push(receiver);
@@ -115,7 +119,7 @@ impl ExecutionPlan for RepartitionExec {
                 while let Some(result) = stream.next().await {
                     match partitioning {
                         Partitioning::RoundRobinBatch(_) => {
-                            let output_partition = counter % num_output_partition;
+                            let output_partition = counter % num_output_partitions;
                             let tx = &mut tx[output_partition];
                             tx.send(Some(result)).map_err(|e| {
                                 DataFusionError::Execution(e.to_string())
                             })?;
@@ -134,7 +138,7 @@ impl ExecutionPlan for RepartitionExec {
                     }
 
                     // notify each output partition that this input partition has no more data
-                    for i in 0..num_output_partition {
+                    for i in 0..num_output_partitions {
                         let tx = &mut tx[i];
                         tx.send(None)
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
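The expanded comment spells out the cost of the unbounded-channel choice: if the downstream operator drains output partition 0 to completion before it ever polls another output, the inputs will have finished producing by then, and every batch destined for the remaining outputs is still sitting in its channel. A back-of-the-envelope sketch of that worst case, using hypothetical sizes that are not taken from the patch:

    fn main() {
        // Hypothetical sizes, just to make the arithmetic concrete.
        let num_output_partitions: usize = 8;
        let total_batches: usize = 10_000;
        let avg_batch_bytes: usize = 1 << 20; // assume roughly 1 MiB per batch

        // Round-robin spreads batches evenly across outputs.
        let batches_per_output = total_batches / num_output_partitions;

        // If outputs are read strictly in order, then by the time output 0 is exhausted
        // the inputs have produced everything, and the other outputs still hold all of
        // their batches in their unbounded channels.
        let buffered_batches = batches_per_output * (num_output_partitions - 1);
        let buffered_bytes = buffered_batches * avg_batch_bytes;

        println!(
            "worst case: {} batches (~{} GiB) buffered in channels",
            buffered_batches,
            buffered_bytes >> 30
        );
        // Reading the outputs concurrently keeps the buffers close to empty, which is
        // why the comment above mentions spill-to-disk only as a possible workaround.
    }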
From dc992df0c0c52acdb407204859b79516be74e98d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 22 Dec 2020 15:20:05 -0700
Subject: [PATCH 19/21] combine senders and receivers in single vec to reduce
 mutex use

---
 .../src/physical_plan/repartition.rs             | 27 +++++++++----------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 6eae399a8ef..254c511a8fb 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -38,6 +38,8 @@ use futures::StreamExt;
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 
+type MaybeBatch = Option<ArrowResult<RecordBatch>>;
+
 /// partition. No guarantees are made about the order of the resulting partition.
 #[derive(Debug)]
 pub struct RepartitionExec {
@@ -45,10 +47,8 @@ pub struct RepartitionExec {
     input: Arc<dyn ExecutionPlan>,
     /// Partitioning scheme to use
     partitioning: Partitioning,
-    /// Receivers for output batches
-    rx: Arc<Mutex<Vec<Receiver<Option<ArrowResult<RecordBatch>>>>>>,
-    /// Senders for output batches
-    tx: Arc<Mutex<Vec<Sender<Option<ArrowResult<RecordBatch>>>>>>,
+    /// Channels for sending batches from input partitions to output partitions
+    channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
 }
 
 #[async_trait]
@@ -88,14 +88,13 @@ impl ExecutionPlan for RepartitionExec {
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
         // lock mutexes
-        let mut tx = self.tx.lock().await;
-        let mut rx = self.rx.lock().await;
+        let mut channels = self.channels.lock().await;
 
         let num_input_partitions = self.input.output_partitioning().partition_count();
         let num_output_partitions = self.partitioning.partition_count();
 
         // if this is the first partition to be invoked then we need to set up initial state
-        if tx.is_empty() {
+        if channels.is_empty() {
             // create one channel per *output* partition
             for _ in 0..num_output_partitions {
                 // Note that this operator uses unbounded channels to avoid deadlocks because
                 // the output partitions can be read in any order and this could cause input
                 // partitions to be blocked when sending data to output receivers that are not
                 // being read yet. This may cause high memory usage if the next operator is
                 // reading output partitions in order rather than concurrently. One workaround
                 // for this would be to add spill-to-disk capabilities.
                 let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
-                tx.push(sender);
-                rx.push(receiver);
+                channels.push((sender, receiver));
             }
             // launch one async task per *input* partition
             for i in 0..num_input_partitions {
                 let input = self.input.clone();
-                let mut tx = tx.clone();
+                let mut channels = channels.clone();
                 let partitioning = self.partitioning.clone();
                 let _: JoinHandle<Result<()>> = tokio::spawn(async move {
                     let mut stream = input.execute(i).await?;
@@ -120,7 +118,7 @@ impl ExecutionPlan for RepartitionExec {
                     match partitioning {
                         Partitioning::RoundRobinBatch(_) => {
                             let output_partition = counter % num_output_partitions;
-                            let tx = &mut tx[output_partition];
+                            let tx = &mut channels[output_partition].0;
                             tx.send(Some(result)).map_err(|e| {
                                 DataFusionError::Execution(e.to_string())
                             })?;
@@ -139,7 +137,7 @@ impl ExecutionPlan for RepartitionExec {
 
                     // notify each output partition that this input partition has no more data
                     for i in 0..num_output_partitions {
-                        let tx = &mut tx[i];
+                        let tx = &mut channels[i].0;
                         tx.send(None)
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                     }
@@ -154,7 +152,7 @@ impl ExecutionPlan for RepartitionExec {
             num_input_partitions,
             num_input_partitions_processed: 0,
             schema: self.input.schema(),
-            input: rx[partition].clone(),
+            input: channels[partition].1.clone(),
         }))
     }
 }
@@ -169,8 +167,7 @@ impl RepartitionExec {
             Partitioning::RoundRobinBatch(_) => Ok(RepartitionExec {
                 input,
                 partitioning,
-                tx: Arc::new(Mutex::new(vec![])),
-                rx: Arc::new(Mutex::new(vec![])),
+                channels: Arc::new(Mutex::new(vec![])),
             }),
             other => Err(DataFusionError::NotImplemented(format!(
                 "Partitioning scheme not supported yet: {:?}",

From 66aa0fc9a0a9a6377bf237243524f5d7456c4869 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 24 Dec 2020 08:49:13 -0700
Subject: [PATCH 20/21] Update rust/datafusion/src/physical_plan/repartition.rs

Co-authored-by: Andrew Lamb
---
 rust/datafusion/src/physical_plan/repartition.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 254c511a8fb..3d5a785dbb3 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -48,6 +48,7 @@ pub struct RepartitionExec {
     /// Partitioning scheme to use
     partitioning: Partitioning,
     /// Channels for sending batches from input partitions to output partitions
+    /// there is one entry in this Vec for each output partition
     channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
 }
 
 #[async_trait]
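Patches 19 and 20 fold the two separately locked vectors into a single Vec of (sender, receiver) pairs, one entry per output partition, so execute() takes one lock instead of two and the first caller can initialize everything in one place. A reduced sketch of that lock-once, initialize-on-first-call pattern; std::sync::Mutex and usize payloads stand in for the tokio::sync::Mutex and Arrow record batches of the real operator, and the helper name is made up:

    use std::sync::mpsc::{channel, Receiver, Sender};
    use std::sync::{Arc, Mutex};

    /// One entry per output partition, guarded by a single mutex.
    type Channels = Arc<Mutex<Vec<(Sender<usize>, Receiver<usize>)>>>;

    fn get_or_init(channels: &Channels, num_output_partitions: usize) {
        // Lock once; both halves of every channel live behind the same mutex.
        let mut guard = channels.lock().unwrap();

        // The first caller sets up the state; later callers see a non-empty Vec and skip this.
        if guard.is_empty() {
            for _ in 0..num_output_partitions {
                guard.push(channel::<usize>());
            }
        }
    }

    fn main() {
        let channels: Channels = Arc::new(Mutex::new(vec![]));
        get_or_init(&channels, 4);
        get_or_init(&channels, 4); // second call is a no-op
        assert_eq!(4, channels.lock().unwrap().len());
    }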
From aabdf944aed08185349424d8c9b82e5abcdb3f89 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 24 Dec 2020 09:00:25 -0700
Subject: [PATCH 21/21] address feedback

---
 rust/datafusion/src/dataframe.rs                 | 1 -
 rust/datafusion/src/logical_plan/plan.rs         | 6 ++++--
 rust/datafusion/src/physical_plan/mod.rs         | 8 +++++---
 rust/datafusion/src/physical_plan/repartition.rs | 7 ++++---
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
index 826f2c53a11..1e3c1f17442 100644
--- a/rust/datafusion/src/dataframe.rs
+++ b/rust/datafusion/src/dataframe.rs
@@ -183,7 +183,6 @@ pub trait DataFrame {
     /// let mut ctx = ExecutionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
     /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
-    /// let df2 = df.repartition(Partitioning::Hash(vec![col("a")], 4))?;
     /// # Ok(())
     /// # }
     /// ```
diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs
index 3710cfa6b80..b228402ca10 100644
--- a/rust/datafusion/src/logical_plan/plan.rs
+++ b/rust/datafusion/src/logical_plan/plan.rs
@@ -209,9 +209,11 @@ impl LogicalPlan {
 /// Logical partitioning schemes supported by the repartition operator.
 #[derive(Debug, Clone)]
 pub enum Partitioning {
-    /// Allocate batches using a round-robin algorithm
+    /// Allocate batches using a round-robin algorithm and the specified number of partitions
     RoundRobinBatch(usize),
-    /// Allocate rows based on a hash of one of more expressions
+    /// Allocate rows based on a hash of one of more expressions and the specified number
+    /// of partitions.
+    /// This partitioning scheme is not yet fully supported. See https://issues.apache.org/jira/browse/ARROW-11011
     Hash(Vec<Expr>, usize),
 }
 
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 78550a721cf..d0dcede78d0 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -107,11 +107,13 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
 /// Partitioning schemes supported by operators.
 #[derive(Debug, Clone)]
 pub enum Partitioning {
-    /// Allocate batches using a round-robin algorithm
+    /// Allocate batches using a round-robin algorithm and the specified number of partitions
     RoundRobinBatch(usize),
-    /// Allocate rows based on a hash of one of more expressions
+    /// Allocate rows based on a hash of one of more expressions and the specified
+    /// number of partitions
+    /// This partitioning scheme is not yet fully supported. See https://issues.apache.org/jira/browse/ARROW-11011
     Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
-    /// Unknown partitioning scheme
+    /// Unknown partitioning scheme with a known number of partitions
     UnknownPartitioning(usize),
 }
 
diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 3d5a785dbb3..fb975034a32 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -40,7 +40,8 @@ use tokio::task::JoinHandle;
 
 type MaybeBatch = Option<ArrowResult<RecordBatch>>;
 
-/// partition. No guarantees are made about the order of the resulting partition.
+/// The repartition operator maps N input partitions to M output partitions based on a
+/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
 pub struct RepartitionExec {
     /// Input execution plan
@@ -312,11 +313,11 @@ mod tests {
     async fn repartition(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
-        partitioning_scheme: Partitioning,
+        partitioning: Partitioning,
     ) -> Result<Vec<Vec<RecordBatch>>> {
         // create physical plan
         let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
-        let exec = RepartitionExec::try_new(Arc::new(exec), partitioning_scheme)?;
+        let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
 
         // execute and collect results
         let mut output_partitions = vec![];