From 7df0c12e0afaa6d88441a3c57e06e9cf8c1c132c Mon Sep 17 00:00:00 2001
From: Mike Seddon
Date: Mon, 1 Mar 2021 11:34:05 +1100
Subject: [PATCH 1/7] add test for tokio tasks within tasks

---
 .../src/physical_plan/repartition.rs          | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 94c3aab64e1..12b29cbeabe 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -415,4 +415,32 @@ mod tests {
         }
         Ok(output_partitions)
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
+        let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
+            tokio::spawn(async move {
+                // define input partitions
+                let schema = test_schema();
+                let partition = create_vec_batches(&schema, 50);
+                let partitions =
+                    vec![partition.clone(), partition.clone(), partition.clone()];
+
+                // repartition from 3 input to 5 output
+                repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await
+            });
+
+        let output_partitions = join_handle
+            .await
+            .map_err(|e| DataFusionError::Internal(e.to_string()))??;
+
+        assert_eq!(5, output_partitions.len());
+        assert_eq!(30, output_partitions[0].len());
+        assert_eq!(30, output_partitions[1].len());
+        assert_eq!(30, output_partitions[2].len());
+        assert_eq!(30, output_partitions[3].len());
+        assert_eq!(30, output_partitions[4].len());
+
+        Ok(())
+    }
 }

From 3859b87d563c1e242ce69722f354d857ac852d5b Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 1 Mar 2021 06:27:16 -0500
Subject: [PATCH 2/7] Use single threaded test harness

---
 rust/datafusion/src/physical_plan/repartition.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 12b29cbeabe..427a6322855 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -287,7 +287,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn one_to_many_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -307,7 +307,7 @@ mod tests {
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_one_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -324,7 +324,7 @@ mod tests {
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_many_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -345,7 +345,7 @@ mod tests {
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_many_hash_partition() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -416,7 +416,7 @@ mod tests {
         Ok(output_partitions)
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
         let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
             tokio::spawn(async move {
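For context on PATCH 2: the harness change swaps the runtime the repartition tests execute on. Roughly, the two attributes expand to the following (tokio 1.x; a sketch for illustration, not part of the patch):

#[test]
fn single_threaded_harness() {
    // #[tokio::test] builds a current-thread runtime: every spawned task must run
    // on the test thread itself, so code that blocks that thread shows up as a hang.
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { /* test body */ });
}

#[test]
fn multi_threaded_harness() {
    // #[tokio::test(flavor = "multi_thread")] builds a worker pool, which can mask
    // thread-blocking bugs because other workers keep polling spawned tasks.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { /* test body */ });
}

Running the repartition tests on the current-thread runtime means any accidental thread-blocking inside the operator surfaces as a deterministic hang rather than passing by luck.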
From 3b28871e8c4d02f2a08bcaa9cfcd3e6d0f34ec0a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 1 Mar 2021 06:37:44 -0500
Subject: [PATCH 3/7] remove yield_now, causes deadlock

---
 rust/datafusion/src/physical_plan/repartition.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 427a6322855..8c0881eb32c 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -206,7 +206,6 @@ impl ExecutionPlan for RepartitionExec {
                     }
                     Ok(())
                 });
-                tokio::task::yield_now().await;
             }
         }

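For context on PATCH 3: the commit message attributes a deadlock to the yield_now call. One way such deadlocks arise on a current-thread runtime is when the only runtime thread is blocked while spawned tasks still need it to make progress. A minimal sketch of that failure mode, using std::sync::mpsc as a stand-in for any channel whose recv() blocks the thread (illustrative only, not code from the patch):

#[tokio::test]
async fn blocking_recv_deadlocks_current_thread_runtime() {
    // A blocking channel; the old crossbeam-based operator behaved similarly.
    let (tx, rx) = std::sync::mpsc::channel::<i32>();

    tokio::spawn(async move {
        // This task can only run on the test thread, which is about to block below.
        tx.send(1).unwrap();
    });

    // Deadlock: recv() parks the only runtime thread, so the sender task is never polled
    // and recv() never returns.
    let _ = rx.recv();
}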
From 321ec55833e48e594e22ba939c169cf39c76d369 Mon Sep 17 00:00:00 2001
From: Ximo Guanter
Date: Fri, 26 Feb 2021 18:00:34 +0100
Subject: [PATCH 4/7] remove crossbeam from repartition

---
 rust/datafusion/Cargo.toml                    |  1 +
 .../src/physical_plan/repartition.rs          | 52 ++++++++++---------
 2 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 11cc63bbdc3..ef3f22cb25c 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -60,6 +60,7 @@ async-trait = "0.1.41"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio-stream = "0.1"
 log = "^0.4"
 md-5 = "^0.9.1"
 sha2 = "^0.9.1"
diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 8c0881eb32c..174a4b3ef24 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -18,24 +18,24 @@
 //! The repartition operator maps N input partitions to M output partitions based on a
 //! partitioning scheme.

+use std::{any::Any, collections::HashMap, vec};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::{any::Any, vec};

 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{ExecutionPlan, Partitioning};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
+use tokio_stream::wrappers::UnboundedReceiverStream;

 use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;

-use crossbeam::channel::{unbounded, Receiver, Sender};
 use futures::stream::Stream;
 use futures::StreamExt;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, mpsc::{UnboundedReceiver, UnboundedSender}};
 use tokio::task::JoinHandle;

 type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -48,9 +48,9 @@ pub struct RepartitionExec {
     input: Arc<dyn ExecutionPlan>,
     /// Partitioning scheme to use
     partitioning: Partitioning,
-    /// Channels for sending batches from input partitions to output partitions
-    /// there is one entry in this Vec for each output partition
-    channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
+    /// Channels for sending batches from input partitions to output partitions.
+    /// Key is the partition number
+    channels: Arc<Mutex<HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>>>,
 }

 impl RepartitionExec {
@@ -110,15 +110,15 @@ impl ExecutionPlan for RepartitionExec {
         // if this is the first partition to be invoked then we need to set up initial state
         if channels.is_empty() {
             // create one channel per *output* partition
-            for _ in 0..num_output_partitions {
+            for partition in 0..num_output_partitions {
                 // Note that this operator uses unbounded channels to avoid deadlocks because
                 // the output partitions can be read in any order and this could cause input
-                // partitions to be blocked when sending data to output receivers that are not
+                // partitions to be blocked when sending data to output UnboundedReceivers that are not
                 // being read yet. This may cause high memory usage if the next operator is
                 // reading output partitions in order rather than concurrently. One workaround
                 // for this would be to add spill-to-disk capabilities.
-                let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
-                channels.push((sender, receiver));
+                let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
+                channels.insert(partition, (sender, receiver));
             }
             let random = ahash::RandomState::new();

@@ -126,7 +126,10 @@ impl ExecutionPlan for RepartitionExec {
             for i in 0..num_input_partitions {
                 let random_state = random.clone();
                 let input = self.input.clone();
-                let mut channels = channels.clone();
+                let mut txs: HashMap<_, _> = channels
+                    .iter()
+                    .map(|(partition, (tx, _rx))| (*partition, tx.clone()) )
+                    .collect();
                 let partitioning = self.partitioning.clone();
                 let _: JoinHandle<Result<()>> = tokio::spawn(async move {
                     let mut stream = input.execute(i).await?;
@@ -135,7 +138,7 @@ impl ExecutionPlan for RepartitionExec {
                         match &partitioning {
                             Partitioning::RoundRobinBatch(_) => {
                                 let output_partition = counter % num_output_partitions;
-                                let tx = &mut channels[output_partition].0;
+                                let tx = txs.get_mut(&output_partition).unwrap();
                                 tx.send(Some(result)).map_err(|e| {
                                     DataFusionError::Execution(e.to_string())
                                 })?;
@@ -180,7 +183,7 @@ impl ExecutionPlan for RepartitionExec {
                                         input_batch.schema(),
                                         columns,
                                     );
-                                    let tx = &mut channels[num_output_partition].0;
+                                    let tx = txs.get_mut(&num_output_partition).unwrap();
                                     tx.send(Some(output_batch)).map_err(|e| {
                                         DataFusionError::Execution(e.to_string())
                                     })?;
@@ -199,8 +202,7 @@ impl ExecutionPlan for RepartitionExec {
                     }

                     // notify each output partition that this input partition has no more data
-                    for channel in channels.iter_mut().take(num_output_partitions) {
-                        let tx = &mut channel.0;
+                    for (_, tx) in txs {
                         tx.send(None)
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                     }
@@ -209,13 +211,14 @@ impl ExecutionPlan for RepartitionExec {
             }
         }

+
         // now return stream for the specified *output* partition which will
        // read from the channel
         Ok(Box::pin(RepartitionStream {
             num_input_partitions,
             num_input_partitions_processed: 0,
             schema: self.input.schema(),
-            input: channels[partition].1.clone(),
+            input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
         }))
     }
 }
@@ -229,7 +232,7 @@ impl RepartitionExec {
         Ok(RepartitionExec {
             input,
             partitioning,
-            channels: Arc::new(Mutex::new(vec![])),
+            channels: Arc::new(Mutex::new(HashMap::new())),
         })
     }
 }
@@ -242,7 +245,7 @@ struct RepartitionStream {
     /// Schema
     schema: SchemaRef,
     /// channel containing the repartitioned batches
-    input: Receiver<Option<ArrowResult<RecordBatch>>>,
+    input: UnboundedReceiverStream<Option<ArrowResult<RecordBatch>>>,
 }

 impl Stream for RepartitionStream {
@@ -252,10 +255,9 @@ impl Stream for RepartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.input.recv() {
-            Ok(Some(batch)) => Poll::Ready(Some(batch)),
-            // End of results from one input partition
-            Ok(None) => {
+        match self.input.poll_next_unpin(cx) {
+            Poll::Ready(Some(Some(v))) => Poll::Ready(Some(v)),
+            Poll::Ready(Some(None)) => {
                 self.num_input_partitions_processed += 1;
                 if self.num_input_partitions == self.num_input_partitions_processed {
                     // all input partitions have finished sending batches
@@ -264,9 +266,9 @@ impl Stream for RepartitionStream {
                     // other partitions still have data to send
                     self.poll_next(cx)
                 }
-            }
-            // RecvError means receiver has exited and closed the channel
-            Err(_) => Poll::Ready(None),
+            },
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
         }
     }
 }
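The substance of PATCH 4 is the channel swap: each input partition gets a clone of a tokio UnboundedSender per output partition, and the output side wraps the UnboundedReceiver in an UnboundedReceiverStream so it can be polled without blocking a thread. A self-contained sketch of that producer/consumer shape (tokio 1.x and tokio-stream 0.1; the names and payload type are illustrative, not the DataFusion types):

use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<usize>();

    // Producer: sending on an unbounded channel never blocks and never awaits,
    // so it is safe inside a spawned task on any runtime flavor.
    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).expect("receiver still alive");
        }
        // Dropping tx closes the channel; the consumer then sees end-of-stream.
    });

    // Consumer: wrap the receiver so it can be driven as a Stream, the same way
    // RepartitionStream::poll_next delegates to poll_next_unpin above.
    let mut stream = UnboundedReceiverStream::new(rx);
    while let Some(i) = stream.next().await {
        println!("got {}", i);
    }
}

Because nothing here parks a thread, the same pattern works on the current-thread runtime that the tests now use.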
From 70111f3c7eb7702e34b1cc83c708e774d8fc3644 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 1 Mar 2021 06:45:21 -0500
Subject: [PATCH 5/7] cargo fmt

---
 .../src/physical_plan/repartition.rs          | 22 +++++++++++++------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 174a4b3ef24..f62ae233f0e 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -18,10 +18,10 @@
 //! The repartition operator maps N input partitions to M output partitions based on a
 //! partitioning scheme.

-use std::{any::Any, collections::HashMap, vec};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::{any::Any, collections::HashMap, vec};

 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{ExecutionPlan, Partitioning};
@@ -35,7 +35,10 @@ use arrow::{compute::take, datatypes::SchemaRef};

 use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;

 use futures::stream::Stream;
 use futures::StreamExt;
-use tokio::sync::{Mutex, mpsc::{UnboundedReceiver, UnboundedSender}};
+use tokio::sync::{
+    mpsc::{UnboundedReceiver, UnboundedSender},
+    Mutex,
+};
 use tokio::task::JoinHandle;

 type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -50,7 +53,11 @@ pub struct RepartitionExec {
     partitioning: Partitioning,
     /// Channels for sending batches from input partitions to output partitions.
     /// Key is the partition number
-    channels: Arc<Mutex<HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>>>,
+    channels: Arc<
+        Mutex<
+            HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
+        >,
+    >,
 }

 impl RepartitionExec {
@@ -117,7 +124,9 @@ impl ExecutionPlan for RepartitionExec {
                 // being read yet. This may cause high memory usage if the next operator is
                 // reading output partitions in order rather than concurrently. One workaround
                 // for this would be to add spill-to-disk capabilities.
-                let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
+                let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
+                    Option<ArrowResult<RecordBatch>>,
+                >();
                 channels.insert(partition, (sender, receiver));
             }
             let random = ahash::RandomState::new();
@@ -128,7 +137,7 @@ impl ExecutionPlan for RepartitionExec {
                 let input = self.input.clone();
                 let mut txs: HashMap<_, _> = channels
                     .iter()
-                    .map(|(partition, (tx, _rx))| (*partition, tx.clone()) )
+                    .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                     .collect();
                 let partitioning = self.partitioning.clone();
                 let _: JoinHandle<Result<()>> = tokio::spawn(async move {
@@ -211,7 +220,6 @@ impl ExecutionPlan for RepartitionExec {
             }
         }

-
         // now return stream for the specified *output* partition which will
         // read from the channel
         Ok(Box::pin(RepartitionStream {
@@ -266,7 +274,7 @@ impl Stream for RepartitionStream {
                     // other partitions still have data to send
                     self.poll_next(cx)
                 }
-            },
+            }
             Poll::Ready(None) => Poll::Ready(None),
             Poll::Pending => Poll::Pending,
         }
From c3f1435ba509f5e963dce407b527eec2209180ad Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 1 Mar 2021 07:41:45 -0500
Subject: [PATCH 6/7] Use tokio channels in parquet reader

---
 rust/datafusion/src/physical_plan/parquet.rs | 50 ++++++++++----------
 1 file changed, 24 insertions(+), 26 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 6ab26c2c9e0..348a924040a 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,8 +30,7 @@ use super::{
     planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream,
 };
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::{common, Partitioning};
+use crate::physical_plan::{common, ExecutionPlan, Partitioning};
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
@@ -55,14 +54,17 @@ use parquet::file::{
     statistics::Statistics as ParquetStatistics,
 };

-use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use tokio::task;
+use tokio::{
+    sync::mpsc::{channel, Receiver, Sender},
+    task,
+};
+use tokio_stream::wrappers::ReceiverStream;

 use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
-use futures::stream::Stream;
+use futures::stream::{Stream, StreamExt};

 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -773,9 +775,9 @@ impl ExecutionPlan for ParquetExec {
         // because the parquet implementation is not thread-safe, it is necessary to execute
         // on a thread and communicate with channels
         let (response_tx, response_rx): (
-            Sender<Option<ArrowResult<RecordBatch>>>,
-            Receiver<Option<ArrowResult<RecordBatch>>>,
-        ) = bounded(2);
+            Sender<ArrowResult<RecordBatch>>,
+            Receiver<ArrowResult<RecordBatch>>,
+        ) = channel(2);

         let filenames = self.partitions[partition].filenames.clone();
         let projection = self.projection.clone();
@@ -796,17 +798,18 @@ impl ExecutionPlan for ParquetExec {

         Ok(Box::pin(ParquetStream {
             schema: self.schema.clone(),
-            response_rx,
+            inner: ReceiverStream::new(response_rx),
         }))
     }
 }

 fn send_result(
-    response_tx: &Sender<Option<ArrowResult<RecordBatch>>>,
-    result: Option<ArrowResult<RecordBatch>>,
+    response_tx: &Sender<ArrowResult<RecordBatch>>,
+    result: ArrowResult<RecordBatch>,
 ) -> Result<()> {
+    // Note this function is running on its own blocking tokio thread, so blocking here is ok.
     response_tx
-        .send(result)
+        .blocking_send(result)
         .map_err(|e| DataFusionError::Execution(e.to_string()))?;
     Ok(())
 }
@@ -816,7 +819,7 @@ fn read_files(
     projection: &[usize],
     predicate_builder: &Option<RowGroupPredicateBuilder>,
     batch_size: usize,
-    response_tx: Sender<Option<ArrowResult<RecordBatch>>>,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
 ) -> Result<()> {
     for filename in filenames {
         let file = File::open(&filename)?;
@@ -833,7 +836,7 @@ fn read_files(
             match batch_reader.next() {
                 Some(Ok(batch)) => {
                     //println!("ParquetExec got new batch from {}", filename);
-                    send_result(&response_tx, Some(Ok(batch)))?
+                    send_result(&response_tx, Ok(batch))?
                 }
                 None => {
                     break;
@@ -847,7 +850,7 @@ fn read_files(
                     // send error to operator
                     send_result(
                         &response_tx,
-                        Some(Err(ArrowError::ParquetError(err_msg.clone()))),
+                        Err(ArrowError::ParquetError(err_msg.clone())),
                     )?;
                     // terminate thread with error
                     return Err(DataFusionError::Execution(err_msg));
@@ -856,9 +859,8 @@ fn read_files(
         }
     }

-    // finished reading files
-    send_result(&response_tx, None)?;
-
+    // finished reading files (dropping response_tx will close
+    // channel)
     Ok(())
 }
@@ -872,21 +874,17 @@ fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {

 struct ParquetStream {
     schema: SchemaRef,
-    response_rx: Receiver<Option<ArrowResult<RecordBatch>>>,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
 }

 impl Stream for ParquetStream {
     type Item = ArrowResult<RecordBatch>;

     fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.response_rx.recv() {
-            Ok(batch) => Poll::Ready(batch),
-            // RecvError means receiver has exited and closed the channel
-            Err(RecvError) => Poll::Ready(None),
-        }
+        self.inner.poll_next_unpin(cx)
     }
 }
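For context on PATCH 6: the parquet reader keeps its dedicated reader thread (the parquet implementation is not thread-safe, per the unchanged comment), but the bridge to the async side becomes a bounded tokio channel plus a ReceiverStream, and end-of-input is now signalled by dropping the sender instead of sending an explicit None. A self-contained sketch of that bridge (tokio 1.x and tokio-stream 0.1; names and payload type are illustrative):

use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    // A small bound (the patch uses 2) provides backpressure against the reader thread.
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<u64, String>>(2);

    // The producer runs on a blocking thread, so blocking_send is allowed here;
    // it must not be called from async code on a runtime worker thread.
    tokio::task::spawn_blocking(move || {
        for i in 0..10u64 {
            if tx.blocking_send(Ok(i)).is_err() {
                break; // receiver was dropped, stop producing
            }
        }
        // tx is dropped here, closing the channel and ending the stream below.
    });

    // The consumer side is just a Stream, like ParquetStream::poll_next delegating
    // to poll_next_unpin on the wrapped receiver.
    let mut stream = ReceiverStream::new(rx);
    while let Some(item) = stream.next().await {
        println!("{:?}", item);
    }
}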
From 9b9af1f02a59ab4d6898c4e6c7920e297a404b7c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 1 Mar 2021 07:42:16 -0500
Subject: [PATCH 7/7] Remove crossbeam entirely

---
 rust/datafusion/Cargo.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index ef3f22cb25c..0080bf58ddf 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -52,7 +52,6 @@ parquet = { path = "../parquet", version = "4.0.0-SNAPSHOT", features = ["arrow"
 sqlparser = "0.8.0"
 clap = "2.33"
 rustyline = {version = "7.0", optional = true}
-crossbeam = "0.8"
 paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"