From 395c3764389dddfc2d77b613ebf72f7e33328cc8 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 8 Jun 2023 16:34:30 -0700 Subject: [PATCH 1/3] expose streaming_merge --- datafusion/core/src/physical_plan/sorts/merge.rs | 2 +- datafusion/core/src/physical_plan/sorts/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index e191c044b9040..734c269c0cc61 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -52,7 +52,7 @@ macro_rules! merge_helper { } /// Perform a streaming merge of [`SendableRecordBatchStream`] -pub(crate) fn streaming_merge( +pub fn streaming_merge( streams: Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/mod.rs index 567de96c1cfdf..dff39db423f0a 100644 --- a/datafusion/core/src/physical_plan/sorts/mod.rs +++ b/datafusion/core/src/physical_plan/sorts/mod.rs @@ -20,7 +20,7 @@ mod builder; mod cursor; mod index; -mod merge; +pub mod merge; pub mod sort; pub mod sort_preserving_merge; mod stream; From ab1fca79f1dff0512b97394476266c3d581aa1bb Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 7 Jul 2023 11:12:13 -0700 Subject: [PATCH 2/3] address review comments --- datafusion/core/src/physical_plan/sorts/merge.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index 734c269c0cc61..5427d0cddeac8 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +//! Merge that deals with an arbitrary size of streaming inputs. +//! This is an order-preserving merge. + use crate::physical_plan::metrics::BaselineMetrics; use crate::physical_plan::sorts::builder::BatchBuilder; use crate::physical_plan::sorts::cursor::Cursor; @@ -51,7 +54,9 @@ macro_rules! merge_helper { }}; } -/// Perform a streaming merge of [`SendableRecordBatchStream`] +/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions +/// while preserving order. This is a convenience wrapper for [`SortPreservingMergeStream`] and +/// chooses a right cursor for the expressions and the data type pub fn streaming_merge( streams: Vec, schema: SchemaRef, From cab8e0b2d8ffc99a7a821ea3995d50aeaad74bb8 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 7 Jul 2023 15:19:30 -0700 Subject: [PATCH 3/3] address review comments --- datafusion/core/src/physical_plan/sorts/merge.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index 5427d0cddeac8..736df7dbe81a1 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -55,8 +55,7 @@ macro_rules! merge_helper { } /// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions -/// while preserving order. This is a convenience wrapper for [`SortPreservingMergeStream`] and -/// chooses a right cursor for the expressions and the data type +/// while preserving order. pub fn streaming_merge( streams: Vec, schema: SchemaRef,