diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 11cc63bbdc3..0080bf58ddf 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -52,7 +52,6 @@ parquet = { path = "../parquet", version = "4.0.0-SNAPSHOT", features = ["arrow"
 sqlparser = "0.8.0"
 clap = "2.33"
 rustyline = {version = "7.0", optional = true}
-crossbeam = "0.8"
 paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
@@ -60,6 +59,7 @@ async-trait = "0.1.41"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio-stream = "0.1"
 log = "^0.4"
 md-5 = "^0.9.1"
 sha2 = "^0.9.1"
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 6ab26c2c9e0..348a924040a 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -30,8 +30,7 @@ use super::{
     planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream,
 };
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::{common, Partitioning};
+use crate::physical_plan::{common, ExecutionPlan, Partitioning};
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
@@ -55,14 +54,17 @@ use parquet::file::{
     statistics::Statistics as ParquetStatistics,
 };

-use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use tokio::task;
+use tokio::{
+    sync::mpsc::{channel, Receiver, Sender},
+    task,
+};
+use tokio_stream::wrappers::ReceiverStream;

 use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
-use futures::stream::Stream;
+use futures::stream::{Stream, StreamExt};

 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -773,9 +775,9 @@ impl ExecutionPlan for ParquetExec {
         // because the parquet implementation is not thread-safe, it is necessary to execute
         // on a thread and communicate with channels
         let (response_tx, response_rx): (
-            Sender<Option<ArrowResult<RecordBatch>>>,
-            Receiver<Option<ArrowResult<RecordBatch>>>,
-        ) = bounded(2);
+            Sender<ArrowResult<RecordBatch>>,
+            Receiver<ArrowResult<RecordBatch>>,
+        ) = channel(2);

         let filenames = self.partitions[partition].filenames.clone();
         let projection = self.projection.clone();
@@ -796,17 +798,18 @@

         Ok(Box::pin(ParquetStream {
             schema: self.schema.clone(),
-            response_rx,
+            inner: ReceiverStream::new(response_rx),
         }))
     }
 }

 fn send_result(
-    response_tx: &Sender<Option<ArrowResult<RecordBatch>>>,
-    result: Option<ArrowResult<RecordBatch>>,
+    response_tx: &Sender<ArrowResult<RecordBatch>>,
+    result: ArrowResult<RecordBatch>,
 ) -> Result<()> {
+    // Note this function is running on its own blocking tokio thread so blocking here is ok.
     response_tx
-        .send(result)
+        .blocking_send(result)
         .map_err(|e| DataFusionError::Execution(e.to_string()))?;
     Ok(())
 }

@@ -816,7 +819,7 @@ fn read_files(
     projection: &[usize],
     predicate_builder: &Option,
     batch_size: usize,
-    response_tx: Sender<Option<ArrowResult<RecordBatch>>>,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
 ) -> Result<()> {
     for filename in filenames {
         let file = File::open(&filename)?;
@@ -833,7 +836,7 @@ fn read_files(
             match batch_reader.next() {
                 Some(Ok(batch)) => {
                     //println!("ParquetExec got new batch from {}", filename);
-                    send_result(&response_tx, Some(Ok(batch)))?
+                    send_result(&response_tx, Ok(batch))?
                 }
                 None => {
                     break;
@@ -847,7 +850,7 @@ fn read_files(
                     // send error to operator
                     send_result(
                         &response_tx,
-                        Some(Err(ArrowError::ParquetError(err_msg.clone()))),
+                        Err(ArrowError::ParquetError(err_msg.clone())),
                     )?;
                     // terminate thread with error
                     return Err(DataFusionError::Execution(err_msg));
@@ -856,9 +859,8 @@
             }
         }
     }
-    // finished reading files
-    send_result(&response_tx, None)?;
-
+    // finished reading files (dropping response_tx will close
+    // channel)
     Ok(())
 }

@@ -872,21 +874,17 @@ fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {

 struct ParquetStream {
     schema: SchemaRef,
-    response_rx: Receiver<Option<ArrowResult<RecordBatch>>>,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
 }

 impl Stream for ParquetStream {
     type Item = ArrowResult<RecordBatch>;

     fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.response_rx.recv() {
-            Ok(batch) => Poll::Ready(batch),
-            // RecvError means receiver has exited and closed the channel
-            Err(RecvError) => Poll::Ready(None),
-        }
+        self.inner.poll_next_unpin(cx)
     }
 }
diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs
index 94c3aab64e1..f62ae233f0e 100644
--- a/rust/datafusion/src/physical_plan/repartition.rs
+++ b/rust/datafusion/src/physical_plan/repartition.rs
@@ -21,21 +21,24 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::{any::Any, vec};
+use std::{any::Any, collections::HashMap, vec};

 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{ExecutionPlan, Partitioning};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
+use tokio_stream::wrappers::UnboundedReceiverStream;

 use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;

-use crossbeam::channel::{unbounded, Receiver, Sender};
 use futures::stream::Stream;
 use futures::StreamExt;
-use tokio::sync::Mutex;
+use tokio::sync::{
+    mpsc::{UnboundedReceiver, UnboundedSender},
+    Mutex,
+};
 use tokio::task::JoinHandle;

 type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -48,9 +51,13 @@ pub struct RepartitionExec {
     input: Arc<dyn ExecutionPlan>,
     /// Partitioning scheme to use
     partitioning: Partitioning,
-    /// Channels for sending batches from input partitions to output partitions
-    /// there is one entry in this Vec for each output partition
-    channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
+    /// Channels for sending batches from input partitions to output partitions.
+    /// Key is the partition number
+    channels: Arc<
+        Mutex<
+            HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
+        >,
+    >,
 }

 impl RepartitionExec {
@@ -110,15 +117,17 @@
         // if this is the first partition to be invoked then we need to set up initial state
         if channels.is_empty() {
             // create one channel per *output* partition
-            for _ in 0..num_output_partitions {
+            for partition in 0..num_output_partitions {
                 // Note that this operator uses unbounded channels to avoid deadlocks because
                 // the output partitions can be read in any order and this could cause input
-                // partitions to be blocked when sending data to output receivers that are not
+                // partitions to be blocked when sending data to output UnboundedReceivers that are not
                 // being read yet. This may cause high memory usage if the next operator is
                 // reading output partitions in order rather than concurrently. One workaround
                 // for this would be to add spill-to-disk capabilities.
-                let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
-                channels.push((sender, receiver));
+                let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
+                    Option<ArrowResult<RecordBatch>>,
+                >();
+                channels.insert(partition, (sender, receiver));
             }

             let random = ahash::RandomState::new();
@@ -126,7 +135,10 @@
             for i in 0..num_input_partitions {
                 let random_state = random.clone();
                 let input = self.input.clone();
-                let mut channels = channels.clone();
+                let mut txs: HashMap<_, _> = channels
+                    .iter()
+                    .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
+                    .collect();
                 let partitioning = self.partitioning.clone();
                 let _: JoinHandle<Result<()>> = tokio::spawn(async move {
                     let mut stream = input.execute(i).await?;
@@ -135,7 +147,7 @@
                     match &partitioning {
                         Partitioning::RoundRobinBatch(_) => {
                             let output_partition = counter % num_output_partitions;
-                            let tx = &mut channels[output_partition].0;
+                            let tx = txs.get_mut(&output_partition).unwrap();
                             tx.send(Some(result)).map_err(|e| {
                                 DataFusionError::Execution(e.to_string())
                             })?;
@@ -180,7 +192,7 @@
                                     input_batch.schema(),
                                     columns,
                                 );
-                                let tx = &mut channels[num_output_partition].0;
+                                let tx = txs.get_mut(&num_output_partition).unwrap();
                                 tx.send(Some(output_batch)).map_err(|e| {
                                     DataFusionError::Execution(e.to_string())
                                 })?;
@@ -199,14 +211,12 @@
                     }

                     // notify each output partition that this input partition has no more data
-                    for channel in channels.iter_mut().take(num_output_partitions) {
-                        let tx = &mut channel.0;
+                    for (_, tx) in txs {
                         tx.send(None)
                             .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                     }
                     Ok(())
                 });
-                tokio::task::yield_now().await;
             }
         }

@@ -216,7 +226,7 @@
             num_input_partitions,
             num_input_partitions_processed: 0,
             schema: self.input.schema(),
-            input: channels[partition].1.clone(),
+            input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
         }))
     }
 }
@@ -230,7 +240,7 @@
         Ok(RepartitionExec {
             input,
             partitioning,
-            channels: Arc::new(Mutex::new(vec![])),
+            channels: Arc::new(Mutex::new(HashMap::new())),
         })
     }
 }
@@ -243,7 +253,7 @@ struct RepartitionStream {
     /// Schema
     schema: SchemaRef,
     /// channel containing the repartitioned batches
-    input: Receiver<Option<ArrowResult<RecordBatch>>>,
+    input: UnboundedReceiverStream<Option<ArrowResult<RecordBatch>>>,
 }

 impl Stream for RepartitionStream {
@@ -253,10 +263,9 @@
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match self.input.recv() {
-            Ok(Some(batch)) => Poll::Ready(Some(batch)),
-            // End of results from one input partition
-            Ok(None) => {
+        match self.input.poll_next_unpin(cx) {
+            Poll::Ready(Some(Some(v))) => Poll::Ready(Some(v)),
+            Poll::Ready(Some(None)) => {
                 self.num_input_partitions_processed += 1;
                 if self.num_input_partitions == self.num_input_partitions_processed {
                     // all input partitions have finished sending batches
@@ -266,8 +275,8 @@
                     self.poll_next(cx)
                 }
             }
-            // RecvError means receiver has exited and closed the channel
-            Err(_) => Poll::Ready(None),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
         }
     }
 }
@@ -287,7 +296,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn one_to_many_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -307,7 +316,7 @@
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_one_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -324,7 +333,7 @@
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_many_round_robin() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -345,7 +354,7 @@
         Ok(())
     }

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn many_to_many_hash_partition() -> Result<()> {
         // define input partitions
         let schema = test_schema();
@@ -415,4 +424,32 @@
         }
         Ok(output_partitions)
     }
+
+    #[tokio::test]
+    async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
+        let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
+            tokio::spawn(async move {
+                // define input partitions
+                let schema = test_schema();
+                let partition = create_vec_batches(&schema, 50);
+                let partitions =
+                    vec![partition.clone(), partition.clone(), partition.clone()];
+
+                // repartition from 3 input to 5 output
+                repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await
+            });
+
+        let output_partitions = join_handle
+            .await
+            .map_err(|e| DataFusionError::Internal(e.to_string()))??;
+
+        assert_eq!(5, output_partitions.len());
+        assert_eq!(30, output_partitions[0].len());
+        assert_eq!(30, output_partitions[1].len());
+        assert_eq!(30, output_partitions[2].len());
+        assert_eq!(30, output_partitions[3].len());
+        assert_eq!(30, output_partitions[4].len());
+
+        Ok(())
+    }
 }
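
For context, here is a minimal, self-contained sketch (not part of the patch) of the channel-backed stream pattern the diff switches ParquetExec to: a synchronous producer runs on a blocking tokio thread and pushes results into a bounded `tokio::sync::mpsc` channel with `blocking_send`, while the receiver side is exposed as a `futures::Stream` via `tokio_stream::wrappers::ReceiverStream`; dropping the sender closes the channel and ends the stream, which is how `read_files` now signals completion. The item type is simplified to `Result<String, String>` in place of `ArrowResult<RecordBatch>`; RepartitionExec applies the same idea with unbounded channels and `UnboundedReceiverStream`.

```rust
use futures::StreamExt;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    // Bounded channel: at most two results are buffered, mirroring the
    // `channel(2)` used in ParquetExec::execute.
    let (tx, rx) = channel::<Result<String, String>>(2);

    // The producer is synchronous (like the parquet reader), so it runs on a
    // dedicated blocking thread where calling `blocking_send` is allowed.
    tokio::task::spawn_blocking(move || {
        for i in 0..5 {
            if tx.blocking_send(Ok(format!("batch {}", i))).is_err() {
                // The receiver was dropped; stop producing.
                break;
            }
        }
        // `tx` is dropped here, closing the channel and ending the stream
        // below, the same way read_files now signals "no more batches".
    });

    // Wrapping the receiver yields a `Stream`, so a poll_next implementation
    // can simply delegate to it, as ParquetStream::poll_next does.
    let mut stream = ReceiverStream::new(rx);
    while let Some(item) = stream.next().await {
        println!("{:?}", item);
    }
}
```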