From c42bb7bee968985f3141759c05aac5beccc48b0b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 16:51:32 +0200 Subject: [PATCH 1/9] Add fetch, fix length --- datafusion/core/src/physical_plan/sorts/sort.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 8e8457be2549a..71eb85bd14c29 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -152,6 +152,7 @@ impl ExternalSorter { &self.expr, batch_size, tracking_metrics, + self.fetch, )?; let prev_used = self.free_all_memory(); streams.push(SortedStream::new(in_mem_stream, prev_used)); @@ -183,6 +184,7 @@ impl ExternalSorter { &self.expr, batch_size, tracking_metrics, + self.fetch, ); // Report to the memory manager we are no longer using memory self.free_all_memory(); @@ -273,6 +275,7 @@ impl MemoryConsumer for ExternalSorter { &self.expr, self.session_config.batch_size(), tracking_metrics, + self.fetch, ); spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone()) @@ -289,13 +292,14 @@ impl MemoryConsumer for ExternalSorter { } } -/// consume the non-empty `sorted_bathes` and do in_mem_sort +/// consume the non-empty `sorted_batches` and do in_mem_sort fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], batch_size: usize, tracking_metrics: MemTrackingMetrics, + fetch: Option, ) -> Result { assert_ne!(buffered_batches.len(), 0); if buffered_batches.len() == 1 { @@ -323,7 +327,7 @@ fn in_mem_partial_sort( // NB timer records time taken on drop, so there are no // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); - get_sorted_iter(&sorted_arrays, expressions, batch_size)? + get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)? 
}; Ok(Box::pin(SortedSizedRecordBatchStream::new( schema, @@ -345,6 +349,7 @@ fn get_sorted_iter( sort_arrays: &[Vec], expr: &[PhysicalSortExpr], batch_size: usize, + fetch: Option, ) -> Result { let row_indices = sort_arrays .iter() From 1b4a7d9c8b121abcc02c86ef911c7f7b2de47457 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 16:53:48 +0200 Subject: [PATCH 2/9] Add fetch, fix length --- datafusion/core/src/physical_plan/sorts/sort.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 71eb85bd14c29..27fd92c2a5ba5 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -379,7 +379,7 @@ fn get_sorted_iter( }) }) .collect::>>()?; - let indices = lexsort_to_indices(&sort_columns, None)?; + let indices = lexsort_to_indices(&sort_columns, fetch)?; Ok(SortedIterator::new(indices, row_indices, batch_size)) } @@ -403,7 +403,7 @@ impl SortedIterator { composite: Vec, batch_size: usize, ) -> Self { - let length = composite.len(); + let length = indices.len(); Self { pos: 0, indices, From b5b68121c75f5ce349589717e24548c5e88bfcf3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 17:17:15 +0200 Subject: [PATCH 3/9] Simplify implementation a bit --- .../core/src/physical_plan/sorts/sort.rs | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 27fd92c2a5ba5..e5122b154bd06 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -38,7 +38,7 @@ use crate::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionConfig; -use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; +use 
arrow::array::{make_array, Array, ArrayRef, MutableArrayData}; pub use arrow::compute::SortOptions; use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; @@ -381,42 +381,36 @@ fn get_sorted_iter( .collect::>>()?; let indices = lexsort_to_indices(&sort_columns, fetch)?; - Ok(SortedIterator::new(indices, row_indices, batch_size)) + //Only maintain composite indexes + let row_indices = indices + .values() + .into_iter() + .map(|i| row_indices[*i as usize]) + .collect(); + + Ok(SortedIterator::new(row_indices, batch_size)) } struct SortedIterator { /// Current logical position in the iterator pos: usize, - /// Indexes into the input representing the correctly sorted total output - indices: UInt32Array, /// Map each each logical input index to where it can be found in the sorted input batches composite: Vec, /// Maximum batch size to produce batch_size: usize, - /// total length of the iterator - length: usize, } impl SortedIterator { - fn new( - indices: UInt32Array, - composite: Vec, - batch_size: usize, - ) -> Self { - let length = indices.len(); + fn new(composite: Vec, batch_size: usize) -> Self { Self { pos: 0, - indices, composite, batch_size, - length, } } fn memory_size(&self) -> usize { - std::mem::size_of_val(self) - + self.indices.get_array_memory_size() - + std::mem::size_of_val(&self.composite[..]) + std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..]) } } @@ -425,11 +419,12 @@ impl Iterator for SortedIterator { /// Emit a max of `batch_size` positions each time fn next(&mut self) -> Option { - if self.pos >= self.length { + let length = self.composite.len(); + if self.pos >= length { return None; } - let current_size = min(self.batch_size, self.length - self.pos); + let current_size = min(self.batch_size, length - self.pos); // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. 
@@ -439,8 +434,7 @@ impl Iterator for SortedIterator { let mut slices = vec![]; for i in 0..current_size { let p = self.pos + i; - let c_index = self.indices.value(p) as usize; - let ci = self.composite[c_index]; + let ci = self.composite[p]; if indices_in_batch.is_empty() { last_batch_idx = ci.batch_idx; From 6157d322ebc1d3d58e5d9b4ea8aa49bfbfdb72a3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 17:43:14 +0200 Subject: [PATCH 4/9] Simplify --- datafusion/core/src/physical_plan/sorts/sort.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index e5122b154bd06..b926f7a99d667 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -381,7 +381,7 @@ fn get_sorted_iter( .collect::>>()?; let indices = lexsort_to_indices(&sort_columns, fetch)?; - //Only maintain composite indexes + // Calculate composite index based on sorted indices let row_indices = indices .values() .into_iter() @@ -394,7 +394,7 @@ fn get_sorted_iter( struct SortedIterator { /// Current logical position in the iterator pos: usize, - /// Map each each logical input index to where it can be found in the sorted input batches + /// Sorted composite index of where to find the batch composite: Vec, /// Maximum batch size to produce batch_size: usize, @@ -429,22 +429,17 @@ impl Iterator for SortedIterator { // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. 
let mut last_batch_idx = 0; - let mut indices_in_batch = vec![]; + let mut indices_in_batch = Vec::with_capacity(current_size); let mut slices = vec![]; - for i in 0..current_size { - let p = self.pos + i; - let ci = self.composite[p]; + for ci in &self.composite[self.pos..self.pos + current_size] { + indices_in_batch.push(ci.row_idx); if indices_in_batch.is_empty() { last_batch_idx = ci.batch_idx; - indices_in_batch.push(ci.row_idx); - } else if ci.batch_idx == last_batch_idx { - indices_in_batch.push(ci.row_idx); - } else { + } else if ci.batch_idx != last_batch_idx { group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; - indices_in_batch.push(ci.row_idx); } } From f3bb7427791df81eab5b2aa070381d881a32c452 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 17:46:39 +0200 Subject: [PATCH 5/9] Doc --- datafusion/core/src/physical_plan/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index b926f7a99d667..9cc3b76400702 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -394,7 +394,7 @@ fn get_sorted_iter( struct SortedIterator { /// Current logical position in the iterator pos: usize, - /// Sorted composite index of where to find the batch + /// Sorted composite index of where to find the rows in buffered batches composite: Vec, /// Maximum batch size to produce batch_size: usize, From 1a1b8e5d83ee8eb3fbf46623e7ab529b85784898 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Sep 2022 18:37:12 +0200 Subject: [PATCH 6/9] Reorder --- datafusion/core/src/physical_plan/sorts/sort.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 9cc3b76400702..9c8d22c635e70 100644 --- 
a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -384,7 +384,7 @@ fn get_sorted_iter( // Calculate composite index based on sorted indices let row_indices = indices .values() - .into_iter() + .iter() .map(|i| row_indices[*i as usize]) .collect(); @@ -433,14 +433,13 @@ impl Iterator for SortedIterator { let mut slices = vec![]; for ci in &self.composite[self.pos..self.pos + current_size] { - indices_in_batch.push(ci.row_idx); - if indices_in_batch.is_empty() { last_batch_idx = ci.batch_idx; } else if ci.batch_idx != last_batch_idx { group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; } + indices_in_batch.push(ci.row_idx); } assert!( From 6de4a217ddbbb443beb208d27e2fc5c2da68f527 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 21 Sep 2022 17:37:08 +0200 Subject: [PATCH 7/9] Move parallel sort to planner --- datafusion/core/src/execution/context.rs | 3 - .../src/physical_optimizer/parallel_sort.rs | 93 ------------------- datafusion/core/src/physical_plan/planner.rs | 18 +++- 3 files changed, 16 insertions(+), 98 deletions(-) delete mode 100644 datafusion/core/src/physical_optimizer/parallel_sort.rs diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 492782f534622..947a2adb08e04 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -40,7 +40,6 @@ use crate::{ physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, - parallel_sort::ParallelSort, }, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; @@ -1470,8 +1469,6 @@ impl SessionState { .unwrap(), ))); } - physical_optimizers.push(Arc::new(ParallelSort::new())); - physical_optimizers.push(Arc::new(Repartition::new())); physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new())); diff --git 
a/datafusion/core/src/physical_optimizer/parallel_sort.rs b/datafusion/core/src/physical_optimizer/parallel_sort.rs deleted file mode 100644 index e3ca60cb5cf72..0000000000000 --- a/datafusion/core/src/physical_optimizer/parallel_sort.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
Parralel sort parallelizes sorts if a limit is present after a sort (`ORDER BY LIMIT N`) -use crate::{ - error::Result, - physical_optimizer::PhysicalOptimizerRule, - physical_plan::{ - sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, - with_new_children_if_necessary, - }, -}; -use std::sync::Arc; - -/// Optimizer rule that makes sort parallel if a limit is used after sort (`ORDER BY LIMIT N`) -/// The plan will use `SortPreservingMergeExec` to merge the results -#[derive(Default)] -pub struct ParallelSort {} - -impl ParallelSort { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} -impl PhysicalOptimizerRule for ParallelSort { - fn optimize( - &self, - plan: Arc, - config: &crate::execution::context::SessionConfig, - ) -> Result> { - if plan.children().is_empty() { - // leaf node, children cannot be replaced - Ok(plan.clone()) - } else { - // recurse down first - let children = plan - .children() - .iter() - .map(|child| self.optimize(child.clone(), config)) - .collect::>>()?; - let plan = with_new_children_if_necessary(plan, children)?; - let plan_any = plan.as_any(); - // SortExec preserve_partitioning=False, fetch=Some(n)) - // -> SortPreservingMergeExec (SortExec preserve_partitioning=True, fetch=Some(n)) - let parallel_sort = plan_any.downcast_ref::().is_some() - && plan_any - .downcast_ref::() - .unwrap() - .fetch() - .is_some() - && !plan_any - .downcast_ref::() - .unwrap() - .preserve_partitioning(); - - Ok(if parallel_sort { - let sort = plan_any.downcast_ref::().unwrap(); - let new_sort = SortExec::new_with_partitioning( - sort.expr().to_vec(), - sort.input().clone(), - true, - sort.fetch(), - ); - let merge = SortPreservingMergeExec::new( - sort.expr().to_vec(), - Arc::new(new_sort), - ); - Arc::new(merge) - } else { - plan.clone() - }) - } - } - - fn name(&self) -> &str { - "parallel_sort" - } -} diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 
005a7943265aa..e31868ec095c6 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -18,6 +18,7 @@ //! Physical query planner use super::analyze::AnalyzeExec; +use super::sorts::sort_preserving_merge::SortPreservingMergeExec; use super::{ aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, @@ -841,8 +842,21 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)) - } + Ok(if fetch.is_some() && session_state.config.target_partitions > 1 { + let sort = SortExec::new_with_partitioning( + sort_expr, + physical_input, + true, + *fetch, + ); + let merge = SortPreservingMergeExec::new( + sort.expr().to_vec(), + Arc::new(sort), + ); + Arc::new(merge) + } else { + Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?) + }) } LogicalPlan::Join(Join { left, right, From c6bb97c25394099c7eaa8adb23352f04a48322ad Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 21 Sep 2022 17:43:08 +0200 Subject: [PATCH 8/9] Simplify a bit more --- datafusion/core/src/physical_plan/sorts/sort.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 9c8d22c635e70..fb2ad091900d3 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -428,14 +428,12 @@ impl Iterator for SortedIterator { // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. 
- let mut last_batch_idx = 0; + let mut last_batch_idx = self.composite[self.pos].batch_idx; let mut indices_in_batch = Vec::with_capacity(current_size); let mut slices = vec![]; for ci in &self.composite[self.pos..self.pos + current_size] { - if indices_in_batch.is_empty() { - last_batch_idx = ci.batch_idx; - } else if ci.batch_idx != last_batch_idx { + if ci.batch_idx != last_batch_idx { group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; } From b30dd8b3feb55b2fea2545adccab23fb75286461 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 21 Sep 2022 18:58:20 +0200 Subject: [PATCH 9/9] Update datafusion/core/src/physical_plan/planner.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/physical_plan/planner.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index e31868ec095c6..6dc1932a2b951 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -842,6 +842,7 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; + // If we have a `LIMIT` we can run sort/limits in parallel (similar to TopK) Ok(if fetch.is_some() && session_state.config.target_partitions > 1 { let sort = SortExec::new_with_partitioning( sort_expr,