From de12806c0c7b95a036b2d56bb1ae8f9b0ef803b7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 16:08:07 +0200 Subject: [PATCH 1/7] Parallel sort --- .../core/benches/aggregate_query_sql.rs | 10 ++++++++++ datafusion/core/src/physical_plan/planner.rs | 17 ++++++++++++++-- .../core/src/physical_plan/sorts/sort.rs | 20 +++---------------- datafusion/core/tests/sql/explain_analyze.rs | 6 +++--- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 3734cfbe313c1..a0a0ee6c00492 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -62,6 +62,16 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("aggregate_query_no_group_by_count_distinct_utf8", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT COUNT(DISTINCT utf8) \ + FROM t", + ) + }) + }); + c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { b.iter(|| { query( diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index c875900596259..54967b875e556 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -18,6 +18,7 @@ //! Physical query planner use super::analyze::AnalyzeExec; +use super::sorts::sort_preserving_merge::SortPreservingMergeExec; use super::{ aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, @@ -841,8 +842,20 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) 
) - } + if session_state.config.target_partitions > 1 || + physical_input.output_partitioning().partition_count() > 1 { + // We benefit from executing the sort in parallel and merging the result later + let sort = Arc::new(SortExec::new_with_partitioning(sort_expr.clone(), physical_input, true)); + Ok(Arc::new( + SortPreservingMergeExec::new( + sort_expr, + sort + ) + )) + } else { + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) + } + } LogicalPlan::Join(Join { left, right, diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 64312327b4835..50ff8c069013f 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -741,10 +741,11 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(SortExec::try_new( + Ok(Arc::new(SortExec::new_with_partitioning( self.expr.clone(), children[0].clone(), - )?)) + self.preserve_partitioning, + ))) } fn execute( @@ -753,21 +754,6 @@ impl ExecutionPlan for SortExec { context: Arc, ) -> Result { debug!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), - )); - } - } debug!( "Start invoking SortExec's input.execute for partition: {}", diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 7f465c4c697fc..c82b2fa18ab78 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -94,7 +94,7 @@ 
async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "CoalescePartitionsExec", + "SortPreservingMergeExec:", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( @@ -686,8 +686,8 @@ async fn test_physical_plan_display_indent() { let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let expected = vec![ "GlobalLimitExec: skip=0, fetch=10", - " SortExec: [the_min@2 DESC]", - " CoalescePartitionsExec", + " SortPreservingMergeExec: [the_min@2 DESC]", + " SortExec: [the_min@2 DESC]", " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", From 2bf15f8e349def1ad901019b39b491874c0092fa Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 16:58:14 +0200 Subject: [PATCH 2/7] Move it to optimization rule --- datafusion/core/src/execution/context.rs | 3 +++ datafusion/core/src/physical_optimizer/mod.rs | 1 + datafusion/core/src/physical_plan/planner.rs | 17 ++--------------- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index cb0e16f545ea2..06a533f394df1 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -40,6 +40,7 @@ use crate::{ physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, + parallel_sort::ParallelSort, }, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; @@ -1469,6 +1470,8 @@ impl SessionState { .unwrap(), ))); } + physical_optimizers.push(Arc::new(ParallelSort::new())); + physical_optimizers.push(Arc::new(Repartition::new())); 
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new())); diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 55550bcd2cffc..82b7087ab2c55 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -23,6 +23,7 @@ pub mod coalesce_batches; pub mod hash_build_probe_order; pub mod merge_exec; pub mod optimizer; +pub mod parallel_sort; pub mod pruning; pub mod repartition; mod utils; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 54967b875e556..de10490ce7310 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -18,7 +18,6 @@ //! Physical query planner use super::analyze::AnalyzeExec; -use super::sorts::sort_preserving_merge::SortPreservingMergeExec; use super::{ aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, @@ -842,20 +841,8 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - if session_state.config.target_partitions > 1 || - physical_input.output_partitioning().partition_count() > 1 { - // We benefit from executing the sort in parallel and merging the result later - let sort = Arc::new(SortExec::new_with_partitioning(sort_expr.clone(), physical_input, true)); - Ok(Arc::new( - SortPreservingMergeExec::new( - sort_expr, - sort - ) - )) - } else { - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) 
) - } - } + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)) + } LogicalPlan::Join(Join { left, right, From a1ab9bdd05b53779cbd5847f4a675ce96aaa9ac0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 16:58:47 +0200 Subject: [PATCH 3/7] Add rule --- .../src/physical_optimizer/parallel_sort.rs | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 datafusion/core/src/physical_optimizer/parallel_sort.rs diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs b/datafusion/core/src/physical_optimizer/parallel_sort.rs new file mode 100644 index 0000000000000..a43c4c384cd55 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/parallel_sort.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
TopK Sort parallelizes sort operations + +use crate::{ + error::Result, + physical_optimizer::PhysicalOptimizerRule, + physical_plan::{ + limit::GlobalLimitExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, + with_new_children_if_necessary, + }, +}; +use std::sync::Arc; + +/// Optimizer rule that makes sort parallel if limit is used after sort +#[derive(Default)] +pub struct ParallelSort {} + +impl ParallelSort { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} +impl PhysicalOptimizerRule for ParallelSort { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::SessionConfig, + ) -> Result> { + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + // recurse down first + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + let plan = with_new_children_if_necessary(plan, children)?; + let children = plan.children(); + let plan_any = plan.as_any(); + // GlobalLimitExec (SortExec) + // -> GlobalLimitExec (SortExec) + let parallel_sort = plan_any.downcast_ref::().is_some() + && children.len() == 1 + && children[0].as_any().downcast_ref::().is_some(); + + Ok(if parallel_sort { + let sort = children[0].as_any().downcast_ref::().unwrap(); + let new_sort = SortExec::new_with_partitioning( + sort.expr().to_vec(), + sort.input().clone(), + true, + ); + let merge = SortPreservingMergeExec::new( + sort.expr().to_vec(), + Arc::new(new_sort), + ); + with_new_children_if_necessary(plan, vec![Arc::new(merge)])? 
+ } else { + plan.clone() + }) + } + } + + fn name(&self) -> &str { + "parallel_sort" + } +} From 6c854cc1f077a62e3f4935b5a401214168085b22 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 17:09:53 +0200 Subject: [PATCH 4/7] Improve rule --- .../src/physical_optimizer/parallel_sort.rs | 17 +++++++++++------ datafusion/core/src/physical_plan/sorts/sort.rs | 5 +++++ datafusion/core/tests/sql/explain_analyze.rs | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs b/datafusion/core/src/physical_optimizer/parallel_sort.rs index a43c4c384cd55..3361d8155f7fe 100644 --- a/datafusion/core/src/physical_optimizer/parallel_sort.rs +++ b/datafusion/core/src/physical_optimizer/parallel_sort.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! TopK Sort parallelizes sort operations - +//! Parallel sort parallelizes sorts if a limit is present after a sort (`ORDER BY LIMIT N`) use crate::{ error::Result, physical_optimizer::PhysicalOptimizerRule, @@ -28,7 +27,8 @@ use crate::{ }; use std::sync::Arc; -/// Optimizer rule that makes sort parallel if limit is used after sort +/// Optimizer rule that makes sort parallel if a limit is used after sort (`ORDER BY LIMIT N`) +/// The plan will use `SortPreservingMergeExec` to merge the results #[derive(Default)] pub struct ParallelSort {} @@ -57,11 +57,16 @@ impl PhysicalOptimizerRule for ParallelSort { let plan = with_new_children_if_necessary(plan, children)?; let children = plan.children(); let plan_any = plan.as_any(); - // GlobalLimitExec (SortExec) - // -> GlobalLimitExec (SortExec) + // GlobalLimitExec (SortExec preserve_partitioning=False) + // -> GlobalLimitExec (SortExec preserve_partitioning=True) let parallel_sort = plan_any.downcast_ref::().is_some() && children.len() == 1 - && children[0].as_any().downcast_ref::().is_some(); + && children[0].as_any().downcast_ref::().is_some() + &&
!children[0] + .as_any() + .downcast_ref::() + .unwrap() + .preserve_partitioning(); Ok(if parallel_sort { let sort = children[0].as_any().downcast_ref::().unwrap(); diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 50ff8c069013f..104d5092b7217 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -668,6 +668,11 @@ impl SortExec { Ok(Self::new_with_partitioning(expr, input, false)) } + /// Create a new sort execution plan with the option to preserve + pub fn preserve_partitioning(&self) -> bool { + self.preserve_partitioning + } + /// Create a new sort execution plan with the option to preserve /// the partitioning of the input plan pub fn new_with_partitioning( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index c82b2fa18ab78..a75e0e3fa515f 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -94,7 +94,7 @@ async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "SortPreservingMergeExec:", + "CoalescePartitionsExec", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( From e57717d58fb67a2b7f054cc82cacbf91a0a18d1f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 17:19:35 +0200 Subject: [PATCH 5/7] Remove bench --- datafusion/core/benches/aggregate_query_sql.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index a0a0ee6c00492..3734cfbe313c1 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -62,16 +62,6 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("aggregate_query_no_group_by_count_distinct_utf8", |b| { - b.iter(|| { - query( - ctx.clone(), - "SELECT COUNT(DISTINCT 
utf8) \ - FROM t", - ) - }) - }); - c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { b.iter(|| { query( From e460d955d4637d9962b2040120a2d9442fc78f2c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 17:21:08 +0200 Subject: [PATCH 6/7] Fix doc --- datafusion/core/src/physical_plan/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 104d5092b7217..cc05017853244 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -668,7 +668,7 @@ impl SortExec { Ok(Self::new_with_partitioning(expr, input, false)) } - /// Create a new sort execution plan with the option to preserve + /// Whether this `SortExec` preserves partitioning of the children pub fn preserve_partitioning(&self) -> bool { self.preserve_partitioning } From 830f0f3b7122233bf05b1d880c3c55f400902f33 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Sep 2022 17:21:50 +0200 Subject: [PATCH 7/7] Fix indent --- datafusion/core/src/physical_plan/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index de10490ce7310..20a819622d5dd 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -841,7 +841,7 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)) + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)) } LogicalPlan::Join(Join { left,