From ee5193346921571f0d57fd1bcc265491bd38234d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 21 Apr 2022 19:02:57 +0100 Subject: [PATCH 1/3] Don't sort batches during plan --- .../core/src/physical_plan/sorts/sort.rs | 84 ++++++++++++++++--- 1 file changed, 71 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 8edd20a8b7a85..81862b752d202 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -42,12 +42,13 @@ use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; pub use arrow::compute::SortOptions; use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; +use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use futures::future::BoxFuture; use futures::lock::Mutex; -use futures::{Stream, StreamExt}; +use futures::{ready, FutureExt, Stream, StreamExt}; use log::{debug, error}; use std::any::Any; use std::cmp::min; @@ -56,6 +57,7 @@ use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tempfile::NamedTempFile; @@ -779,17 +781,19 @@ impl ExecutionPlan for SortExec { debug!("End SortExec's input.execute for partition: {}", partition); - let result = do_sort( - input, - partition, - self.expr.clone(), - self.metrics_set.clone(), - context, - ) - .await; - - debug!("End SortExec::execute for partition {}", partition); - result + Ok(Box::pin(SortExecStream { + schema: self.schema(), + state: SortStreamState::Fetch( + do_sort( + input, + partition, + self.expr.clone(), + self.metrics_set.clone(), + context, + ) + .boxed(), + ), + })) } fn metrics(&self) -> Option { @@ -871,6 +875,60 @@ fn sort_batch( }) } +#[derive(Debug)] +struct SortExecStream { + schema: SchemaRef, + state: SortStreamState, +} + +enum SortStreamState { + Fetch(BoxFuture<'static, Result>), + Stream(SendableRecordBatchStream), + Error, +} + +impl std::fmt::Debug for SortStreamState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { + match self { + SortStreamState::Fetch(_) => write!(f, "SortStreamState::Fetch"), + SortStreamState::Stream(_) => write!(f, "SortStreamState::Stream"), + SortStreamState::Error => write!(f, "SortStreamState::Error"), + } + } +} + +impl Stream for SortExecStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + loop { + match &mut this.state { + SortStreamState::Fetch(fut) => match ready!(fut.poll_unpin(cx)) { + Ok(stream) => this.state = SortStreamState::Stream(stream), + Err(e) => { + this.state = SortStreamState::Error; + return Poll::Ready(Some(Err(ArrowError::ExternalError( + Box::new(e), + )))); + } + }, + SortStreamState::Stream(stream) => return stream.poll_next_unpin(cx), + SortStreamState::Error => return Poll::Pending, + } + } + } +} + +impl RecordBatchStream for SortExecStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + async fn do_sort( mut input: SendableRecordBatchStream, partition_id: usize, From d441c7d77ed8fac362d655518082a3dddfc73f5e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 21 Apr 2022 19:14:22 +0100 Subject: [PATCH 2/3] Use BoxStream instead --- .../core/src/physical_plan/sorts/sort.rs | 74 +++---------------- datafusion/core/src/physical_plan/stream.rs | 43 +++++++++++ 2 files changed, 53 insertions(+), 64 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 81862b752d202..609faa92e29d2 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -32,7 +32,7 @@ use crate::physical_plan::metrics::{ }; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::RecordBatchReceiverStream; +use crate::physical_plan::stream::{RecordBatchBoxStream, RecordBatchReceiverStream}; use crate::physical_plan::{ DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -46,9 +46,8 @@ use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use futures::future::BoxFuture; use futures::lock::Mutex; -use futures::{ready, FutureExt, Stream, StreamExt}; +use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use log::{debug, error}; use std::any::Any; use std::cmp::min; @@ -57,7 +56,6 @@ use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tempfile::NamedTempFile; @@ -781,9 +779,9 @@ impl ExecutionPlan for SortExec { debug!("End SortExec's input.execute for partition: {}", partition); - Ok(Box::pin(SortExecStream { - schema: self.schema(), - state: SortStreamState::Fetch( + Ok(Box::pin(RecordBatchBoxStream::new( + self.schema(), + futures::stream::once( do_sort( input, partition, @@ -791,9 +789,11 @@ impl ExecutionPlan for SortExec { self.metrics_set.clone(), context, ) - .boxed(), - ), - })) + .map_err(|e| ArrowError::ExternalError(Box::new(e))), + ) + .try_flatten() + .boxed(), + ))) } fn metrics(&self) -> Option { @@ -875,60 +875,6 @@ fn sort_batch( }) } -#[derive(Debug)] -struct SortExecStream { - schema: SchemaRef, - state: SortStreamState, -} - -enum SortStreamState { - Fetch(BoxFuture<'static, Result>), - Stream(SendableRecordBatchStream), - Error, -} - -impl std::fmt::Debug for SortStreamState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { - match self { - SortStreamState::Fetch(_) => write!(f, "SortStreamState::Fetch"), - SortStreamState::Stream(_) => write!(f, "SortStreamState::Stream"), - SortStreamState::Error => write!(f, "SortStreamState::Error"), - } - } -} - -impl Stream for SortExecStream { - type Item = ArrowResult; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = &mut *self; - loop { - match &mut this.state { - SortStreamState::Fetch(fut) => match ready!(fut.poll_unpin(cx)) { - Ok(stream) => this.state = SortStreamState::Stream(stream), - Err(e) => { - this.state = SortStreamState::Error; - return Poll::Ready(Some(Err(ArrowError::ExternalError( - Box::new(e), - )))); - } - }, - SortStreamState::Stream(stream) => return stream.poll_next_unpin(cx), - SortStreamState::Error => return Poll::Pending, - } - } - } -} - -impl RecordBatchStream for SortExecStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - async fn do_sort( mut input: SendableRecordBatchStream, partition_id: usize, diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index 67b7090406901..b8a7906364962 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -20,6 +20,7 @@ use arrow::{ datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, }; +use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -73,3 +74,45 @@ impl RecordBatchStream for RecordBatchReceiverStream { self.schema.clone() } } + +/// Combines a [`BoxStream`] with [`SchemaRef`] implementing +/// [`RecordBatchStream`] for the combination +pub(crate) struct RecordBatchBoxStream { + schema: SchemaRef, + stream: BoxStream<'static, ArrowResult>, +} + +impl RecordBatchBoxStream { + /// Creates a new [`RecordBatchBoxStream`] from the provided schema and stream + pub(crate) fn new( + schema: SchemaRef, + stream: BoxStream<'static, ArrowResult>, + ) -> Self { + Self { schema, stream } + } +} + +impl std::fmt::Debug for RecordBatchBoxStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RecordBatchBoxStream") + .field("schema", &self.schema) + .finish() + } +} + +impl Stream for RecordBatchBoxStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.stream.poll_next_unpin(cx) + } +} + +impl RecordBatchStream for RecordBatchBoxStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} From b709905edbb733dda0ef9dbdd3b5859febd36f69 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 21 Apr 2022 19:30:54 +0100 Subject: [PATCH 3/3] Eliminate double boxing of stream --- .../core/src/physical_plan/sorts/sort.rs | 7 ++- datafusion/core/src/physical_plan/stream.rs | 43 +++++++++++-------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 609faa92e29d2..b6e662a6e90a3 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -32,7 +32,7 @@ use crate::physical_plan::metrics::{ }; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::{RecordBatchBoxStream, RecordBatchReceiverStream}; +use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use crate::physical_plan::{ DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -779,7 +779,7 @@ impl ExecutionPlan for SortExec { debug!("End SortExec's input.execute for partition: {}", partition); - Ok(Box::pin(RecordBatchBoxStream::new( + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once( do_sort( @@ -791,8 +791,7 @@ impl ExecutionPlan for SortExec { ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) - .try_flatten() - .boxed(), + .try_flatten(), ))) } diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index b8a7906364962..99209121ffa7b 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -20,8 +20,8 @@ use arrow::{ datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, }; -use futures::stream::BoxStream; use futures::{Stream, StreamExt}; +use pin_project_lite::pin_project; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -75,43 +75,50 @@ impl RecordBatchStream for RecordBatchReceiverStream { } } -/// Combines a [`BoxStream`] with [`SchemaRef`] implementing -/// [`RecordBatchStream`] for the combination -pub(crate) struct RecordBatchBoxStream { - schema: SchemaRef, - stream: BoxStream<'static, ArrowResult>, +pin_project! { + /// Combines a [`Stream`] with a [`SchemaRef`] implementing + /// [`RecordBatchStream`] for the combination + pub(crate) struct RecordBatchStreamAdapter { + schema: SchemaRef, + + #[pin] + stream: S, + } } -impl RecordBatchBoxStream { - /// Creates a new [`RecordBatchBoxStream`] from the provided schema and stream - pub(crate) fn new( - schema: SchemaRef, - stream: BoxStream<'static, ArrowResult>, - ) -> Self { +impl RecordBatchStreamAdapter { + /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream + pub(crate) fn new(schema: SchemaRef, stream: S) -> Self { Self { schema, stream } } } -impl std::fmt::Debug for RecordBatchBoxStream { +impl std::fmt::Debug for RecordBatchStreamAdapter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RecordBatchBoxStream") + f.debug_struct("RecordBatchStreamAdapter") .field("schema", &self.schema) .finish() } } -impl Stream for RecordBatchBoxStream { +impl Stream for RecordBatchStreamAdapter +where + S: Stream>, +{ type Item = ArrowResult; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.stream.poll_next_unpin(cx) + self.project().stream.poll_next(cx) } } -impl RecordBatchStream for RecordBatchBoxStream { +impl RecordBatchStream for RecordBatchStreamAdapter +where + S: Stream>, +{ fn schema(&self) -> SchemaRef { self.schema.clone() }