diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index cb0e16f545ea2..06a533f394df1 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -40,6 +40,7 @@ use crate::{ physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, + parallel_sort::ParallelSort, }, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; @@ -1469,6 +1470,8 @@ impl SessionState { .unwrap(), ))); } + physical_optimizers.push(Arc::new(ParallelSort::new())); + physical_optimizers.push(Arc::new(Repartition::new())); physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new())); diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 55550bcd2cffc..82b7087ab2c55 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -23,6 +23,7 @@ pub mod coalesce_batches; pub mod hash_build_probe_order; pub mod merge_exec; pub mod optimizer; +pub mod parallel_sort; pub mod pruning; pub mod repartition; mod utils; diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs b/datafusion/core/src/physical_optimizer/parallel_sort.rs new file mode 100644 index 0000000000000..3361d8155f7fe --- /dev/null +++ b/datafusion/core/src/physical_optimizer/parallel_sort.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Parralel sort parallelizes sorts if a limit is present after a sort (`ORDER BY LIMIT N`) +use crate::{ + error::Result, + physical_optimizer::PhysicalOptimizerRule, + physical_plan::{ + limit::GlobalLimitExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, + with_new_children_if_necessary, + }, +}; +use std::sync::Arc; + +/// Optimizer rule that makes sort parallel if a limit is used after sort (`ORDER BY LIMIT N`) +/// The plan will use `SortPreservingMergeExec` to merge the results +#[derive(Default)] +pub struct ParallelSort {} + +impl ParallelSort { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} +impl PhysicalOptimizerRule for ParallelSort { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::SessionConfig, + ) -> Result> { + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + // recurse down first + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + let plan = with_new_children_if_necessary(plan, children)?; + let children = plan.children(); + let plan_any = plan.as_any(); + // GlobalLimitExec (SortExec preserve_partitioning=False) + // -> GlobalLimitExec (SortExec preserve_partitioning=True) + let parallel_sort = plan_any.downcast_ref::().is_some() + && children.len() == 1 + && children[0].as_any().downcast_ref::().is_some() + && !children[0] + .as_any() + .downcast_ref::() + .unwrap() + .preserve_partitioning(); + + Ok(if parallel_sort { + let sort = children[0].as_any().downcast_ref::().unwrap(); + let new_sort = SortExec::new_with_partitioning( + sort.expr().to_vec(), + sort.input().clone(), + true, + ); + let merge = SortPreservingMergeExec::new( + sort.expr().to_vec(), + Arc::new(new_sort), + ); + with_new_children_if_necessary(plan, vec![Arc::new(merge)])? + } else { + plan.clone() + }) + } + } + + fn name(&self) -> &str { + "parallel_sort" + } +} diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index c875900596259..20a819622d5dd 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -841,7 +841,7 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)) } LogicalPlan::Join(Join { left, diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 64312327b4835..cc05017853244 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -668,6 +668,11 @@ impl SortExec { Ok(Self::new_with_partitioning(expr, input, false)) } + /// Whether this `SortExec` preserves partitioning of the children + pub fn preserve_partitioning(&self) -> bool { + self.preserve_partitioning + } + /// Create a new sort execution plan with the option to preserve /// the partitioning of the input plan pub fn new_with_partitioning( @@ -741,10 +746,11 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(SortExec::try_new( + Ok(Arc::new(SortExec::new_with_partitioning( self.expr.clone(), children[0].clone(), - )?)) + self.preserve_partitioning, + ))) } fn execute( @@ -753,21 +759,6 @@ impl ExecutionPlan for SortExec { context: Arc, ) -> Result { debug!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), - )); - } - } debug!( "Start invoking SortExec's input.execute for partition: {}", diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 7f465c4c697fc..a75e0e3fa515f 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -686,8 +686,8 @@ async fn test_physical_plan_display_indent() { let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let expected = vec![ "GlobalLimitExec: skip=0, fetch=10", - " SortExec: [the_min@2 DESC]", - " CoalescePartitionsExec", + " SortPreservingMergeExec: [the_min@2 DESC]", + " SortExec: [the_min@2 DESC]", " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096",