diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 492782f534622..947a2adb08e04 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -40,7 +40,6 @@ use crate::{ physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, - parallel_sort::ParallelSort, }, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; @@ -1470,8 +1469,6 @@ impl SessionState { .unwrap(), ))); } - physical_optimizers.push(Arc::new(ParallelSort::new())); - physical_optimizers.push(Arc::new(Repartition::new())); physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new())); diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs b/datafusion/core/src/physical_optimizer/parallel_sort.rs deleted file mode 100644 index e3ca60cb5cf72..0000000000000 --- a/datafusion/core/src/physical_optimizer/parallel_sort.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
Parralel sort parallelizes sorts if a limit is present after a sort (`ORDER BY LIMIT N`) -use crate::{ - error::Result, - physical_optimizer::PhysicalOptimizerRule, - physical_plan::{ - sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, - with_new_children_if_necessary, - }, -}; -use std::sync::Arc; - -/// Optimizer rule that makes sort parallel if a limit is used after sort (`ORDER BY LIMIT N`) -/// The plan will use `SortPreservingMergeExec` to merge the results -#[derive(Default)] -pub struct ParallelSort {} - -impl ParallelSort { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} -impl PhysicalOptimizerRule for ParallelSort { - fn optimize( - &self, - plan: Arc, - config: &crate::execution::context::SessionConfig, - ) -> Result> { - if plan.children().is_empty() { - // leaf node, children cannot be replaced - Ok(plan.clone()) - } else { - // recurse down first - let children = plan - .children() - .iter() - .map(|child| self.optimize(child.clone(), config)) - .collect::>>()?; - let plan = with_new_children_if_necessary(plan, children)?; - let plan_any = plan.as_any(); - // SortExec preserve_partitioning=False, fetch=Some(n)) - // -> SortPreservingMergeExec (SortExec preserve_partitioning=True, fetch=Some(n)) - let parallel_sort = plan_any.downcast_ref::().is_some() - && plan_any - .downcast_ref::() - .unwrap() - .fetch() - .is_some() - && !plan_any - .downcast_ref::() - .unwrap() - .preserve_partitioning(); - - Ok(if parallel_sort { - let sort = plan_any.downcast_ref::().unwrap(); - let new_sort = SortExec::new_with_partitioning( - sort.expr().to_vec(), - sort.input().clone(), - true, - sort.fetch(), - ); - let merge = SortPreservingMergeExec::new( - sort.expr().to_vec(), - Arc::new(new_sort), - ); - Arc::new(merge) - } else { - plan.clone() - }) - } - } - - fn name(&self) -> &str { - "parallel_sort" - } -} diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 
005a7943265aa..6dc1932a2b951 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -18,6 +18,7 @@ //! Physical query planner use super::analyze::AnalyzeExec; +use super::sorts::sort_preserving_merge::SortPreservingMergeExec; use super::{ aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, @@ -841,8 +842,22 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)) - } + // If we have a `LIMIT`, we can run the sort/limit in parallel (similar to TopK) + Ok(if fetch.is_some() && session_state.config.target_partitions > 1 { + let sort = SortExec::new_with_partitioning( + sort_expr, + physical_input, + true, + *fetch, + ); + let merge = SortPreservingMergeExec::new( + sort.expr().to_vec(), + Arc::new(sort), + ); + Arc::new(merge) + } else { + Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?) + }) } LogicalPlan::Join(Join { left, right, diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 8e8457be2549a..fb2ad091900d3 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -38,7 +38,7 @@ use crate::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionConfig; -use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; +use arrow::array::{make_array, Array, ArrayRef, MutableArrayData}; pub use arrow::compute::SortOptions; use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; @@ -152,6 +152,7 @@ impl ExternalSorter { &self.expr, batch_size, tracking_metrics, + self.fetch, )?; let prev_used = self.free_all_memory(); streams.push(SortedStream::new(in_mem_stream, prev_used)); @@ -183,6 +184,7 @@ impl ExternalSorter { &self.expr, 
batch_size, tracking_metrics, + self.fetch, ); // Report to the memory manager we are no longer using memory self.free_all_memory(); @@ -273,6 +275,7 @@ impl MemoryConsumer for ExternalSorter { &self.expr, self.session_config.batch_size(), tracking_metrics, + self.fetch, ); spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone()) @@ -289,13 +292,14 @@ impl MemoryConsumer for ExternalSorter { } } -/// consume the non-empty `sorted_bathes` and do in_mem_sort +/// consume the non-empty `sorted_batches` and do in_mem_sort fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], batch_size: usize, tracking_metrics: MemTrackingMetrics, + fetch: Option, ) -> Result { assert_ne!(buffered_batches.len(), 0); if buffered_batches.len() == 1 { @@ -323,7 +327,7 @@ fn in_mem_partial_sort( // NB timer records time taken on drop, so there are no // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); - get_sorted_iter(&sorted_arrays, expressions, batch_size)? + get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)? 
}; Ok(Box::pin(SortedSizedRecordBatchStream::new( schema, @@ -345,6 +349,7 @@ fn get_sorted_iter( sort_arrays: &[Vec], expr: &[PhysicalSortExpr], batch_size: usize, + fetch: Option, ) -> Result { let row_indices = sort_arrays .iter() @@ -374,44 +379,38 @@ fn get_sorted_iter( }) }) .collect::>>()?; - let indices = lexsort_to_indices(&sort_columns, None)?; + let indices = lexsort_to_indices(&sort_columns, fetch)?; - Ok(SortedIterator::new(indices, row_indices, batch_size)) + // Calculate composite index based on sorted indices + let row_indices = indices + .values() + .iter() + .map(|i| row_indices[*i as usize]) + .collect(); + + Ok(SortedIterator::new(row_indices, batch_size)) } struct SortedIterator { /// Current logical position in the iterator pos: usize, - /// Indexes into the input representing the correctly sorted total output - indices: UInt32Array, - /// Map each each logical input index to where it can be found in the sorted input batches + /// Sorted composite index of where to find the rows in buffered batches composite: Vec, /// Maximum batch size to produce batch_size: usize, - /// total length of the iterator - length: usize, } impl SortedIterator { - fn new( - indices: UInt32Array, - composite: Vec, - batch_size: usize, - ) -> Self { - let length = composite.len(); + fn new(composite: Vec, batch_size: usize) -> Self { Self { pos: 0, - indices, composite, batch_size, - length, } } fn memory_size(&self) -> usize { - std::mem::size_of_val(self) - + self.indices.get_array_memory_size() - + std::mem::size_of_val(&self.composite[..]) + std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..]) } } @@ -420,33 +419,25 @@ impl Iterator for SortedIterator { /// Emit a max of `batch_size` positions each time fn next(&mut self) -> Option { - if self.pos >= self.length { + let length = self.composite.len(); + if self.pos >= length { return None; } - let current_size = min(self.batch_size, self.length - self.pos); + let current_size = 
min(self.batch_size, length - self.pos); // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. - let mut last_batch_idx = 0; - let mut indices_in_batch = vec![]; + let mut last_batch_idx = self.composite[self.pos].batch_idx; + let mut indices_in_batch = Vec::with_capacity(current_size); let mut slices = vec![]; - for i in 0..current_size { - let p = self.pos + i; - let c_index = self.indices.value(p) as usize; - let ci = self.composite[c_index]; - - if indices_in_batch.is_empty() { - last_batch_idx = ci.batch_idx; - indices_in_batch.push(ci.row_idx); - } else if ci.batch_idx == last_batch_idx { - indices_in_batch.push(ci.row_idx); - } else { + for ci in &self.composite[self.pos..self.pos + current_size] { + if ci.batch_idx != last_batch_idx { group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; - indices_in_batch.push(ci.row_idx); } + indices_in_batch.push(ci.row_idx); } assert!(