From ad51f3f1b350cea2e29ff3453091dcc90def4b98 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 31 Mar 2021 00:30:36 +0200
Subject: [PATCH 01/29] WIP

---
 rust/datafusion/src/optimizer/mod.rs       |  2 +-
 rust/datafusion/src/optimizer/optimizer.rs | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/rust/datafusion/src/optimizer/mod.rs b/rust/datafusion/src/optimizer/mod.rs
index 87dc62da2a3..373d89487d2 100644
--- a/rust/datafusion/src/optimizer/mod.rs
+++ b/rust/datafusion/src/optimizer/mod.rs
@@ -24,5 +24,5 @@ pub mod hash_build_probe_order;
 pub mod limit_push_down;
 pub mod optimizer;
 pub mod projection_push_down;
-
+pub mod repartition;
 pub mod utils;

diff --git a/rust/datafusion/src/optimizer/optimizer.rs b/rust/datafusion/src/optimizer/optimizer.rs
index dee8e06a5e3..277e94b4394 100644
--- a/rust/datafusion/src/optimizer/optimizer.rs
+++ b/rust/datafusion/src/optimizer/optimizer.rs
@@ -16,9 +16,10 @@
 // under the License.
 
 //! Query optimizer traits
+use std::sync::Arc;
 
-use crate::error::Result;
 use crate::logical_plan::LogicalPlan;
+use crate::{error::Result, physical_plan::ExecutionPlan};
 
 /// `OptimizerRule` transforms one ['LogicalPlan'] into another which
 /// computes the same results, but in a potentially more efficient
@@ -30,3 +31,14 @@ pub trait OptimizerRule {
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
 }
+
+/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
+/// computes the same results, but in a potentially more efficient
+/// way.
+pub trait PhysicalOptimizerRule {
+    /// Rewrite `plan` to an optimized form
+    fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// A human readable name for this optimizer rule
+    fn name(&self) -> &str;
+}

From 0cfa2c6174d2fb216a63cb16a4bc96ad883c9b50 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 31 Mar 2021 00:31:20 +0200
Subject: [PATCH 02/29] WIP

---
 rust/datafusion/src/optimizer/repartition.rs | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)
 create mode 100644 rust/datafusion/src/optimizer/repartition.rs

diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs
new file mode 100644
index 00000000000..f9da85c2d76
--- /dev/null
+++ b/rust/datafusion/src/optimizer/repartition.rs
@@ -0,0 +1,50 @@
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::Partitioning::*;
+
+/// Optimizer that introduces repartition to introduce more parallelism in the plan
+pub struct Repartition {
+    // Concurrency wanted downstream.
+    // will create more concurrency
+    concurrency: usize,
+}
+
+impl PhysicalOptimizerRule for Repartition {
+    fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+        let partitioning = plan.output_partitioning();
+        let perform_repartition = match partitioning {
+            // Apply when underlying node has less than `self.concurrency` amount of concurrency
+            RoundRobinBatch(x) => x < self.concurrency,
+            UnknownPartitioning(x) => x < self.concurrency,
+            // we don't want to introduce partitioning after hash partitioning
+            // as the plan will likely depend on this
+            Hash(_, _) => false,
+        };
+
+        // Recurse into children
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let new_plan = plan.with_new_children(children)?;
+
+        if perform_repartition {
+            Ok(Arc::new(RepartitionExec::try_new(
+                new_plan,
+                RoundRobinBatch(self.concurrency),
+            )?))
+        } else {
+            Ok(new_plan)
+        }
+    }
+
+    fn name(&self) -> &str {
+        "repartition"
+    }
+}

From a30afc3758ec6c88b42e0a745378c0a014d8ae2b Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Wed, 31 Mar 2021 08:42:58 +0200
Subject: [PATCH 03/29] Add test

---
 rust/datafusion/src/optimizer/repartition.rs | 49 ++++++++++++++++----
 rust/datafusion/src/physical_plan/parquet.rs |  4 +-
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs
index f9da85c2d76..2bb651f45de 100644
--- a/rust/datafusion/src/optimizer/repartition.rs
+++ b/rust/datafusion/src/optimizer/repartition.rs
@@ -16,16 +16,8 @@ pub struct Repartition {
 impl PhysicalOptimizerRule for Repartition {
     fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
         let partitioning = plan.output_partitioning();
-        let perform_repartition = match partitioning {
-            // Apply when underlying node has less than `self.concurrency` amount of concurrency
-            RoundRobinBatch(x) => x < self.concurrency,
-            UnknownPartitioning(x) => x < self.concurrency,
-            // we don't want to introduce partitioning after hash partitioning
-            // as the plan will likely depend on this
-            Hash(_, _) => false,
-        };
 
-        // Recurse into children
+        // Recurse into children bottom-up (added nodes should be as deep as possible)
         let children = plan
             .children()
             .iter()
             .map(|child| self.optimize(child.clone()))
             .collect::<Result<Vec<_>>>()?;
@@ -34,6 +26,15 @@ impl PhysicalOptimizerRule for Repartition {
         let new_plan = plan.with_new_children(children)?;
 
+        let perform_repartition = match partitioning {
+            // Apply when underlying node has less than `self.concurrency` amount of concurrency
+            RoundRobinBatch(x) => x < self.concurrency,
+            UnknownPartitioning(x) => x < self.concurrency,
+            // we don't want to introduce partitioning after hash partitioning
+            // as the plan will likely depend on this
+            Hash(_, _) => false,
+        };
+
         if perform_repartition {
             Ok(Arc::new(RepartitionExec::try_new(
                 new_plan,
@@ -48,3 +49,33 @@ impl PhysicalOptimizerRule for Repartition {
         "repartition"
     }
 }
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+
+    use super::*;
+    use crate::datasource::datasource::Statistics;
+    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+    #[test]
+    fn added_repartition_to_single_partition() -> Result<()> {
+        let parquet = ParquetExec::new(
+            vec![ParquetPartition {
+                filenames: vec!["x".to_string()],
+                statistics: Statistics::default(),
+            }],
+            Schema::empty(),
+            None,
+            None,
+            2048,
+            None,
+        );
+
+        let optimizer = Repartition { concurrency: 10 };
+
+        let optimized = optimizer.optimize(Arc::new(parquet))?;
+
+        assert_eq!(optimized.output_partitioning().partition_count(), 10);
+ Ok(()) + } +} diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index fce85e36074..d41d6968fee 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -100,9 +100,9 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - filenames: Vec, + pub filenames: Vec, /// Statistics for this partition - statistics: Statistics, + pub statistics: Statistics, } impl ParquetExec { From 823cf54e985a37c19e651bab5f749842aaad917b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 09:02:34 +0200 Subject: [PATCH 04/29] WIP --- rust/datafusion/src/execution/context.rs | 11 +++++++- rust/datafusion/src/optimizer/repartition.rs | 7 +++++ rust/datafusion/src/physical_plan/planner.rs | 27 +++++++++++++++++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 3a7c24ddc3e..6ef8d8212b4 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -21,7 +21,9 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, - optimizer::hash_build_probe_order::HashBuildProbeOrder, + optimizer::{ + hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, + }, }; use log::debug; use std::fs; @@ -56,6 +58,7 @@ use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; +use crate::optimizer::repartition::Repartition; use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; @@ -458,6 +461,7 @@ impl ExecutionContext { .create_physical_plan(logical_plan, &state) } + /// Executes a query and writes the results to a partitioned CSV file. 
pub async fn write_csv( &self, @@ -581,6 +585,7 @@ impl QueryPlanner for DefaultQueryPlanner { ) -> Result> { let planner = DefaultPhysicalPlanner::default(); planner.create_physical_plan(logical_plan, ctx_state) + } } @@ -593,6 +598,8 @@ pub struct ExecutionConfig { pub batch_size: usize, /// Responsible for optimizing a logical plan optimizers: Vec>, + /// Responsible for optimizing a physical execution plan + pub physical_optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` query_planner: Arc, /// Default catalog name for table resolution @@ -609,6 +616,7 @@ pub struct ExecutionConfig { impl ExecutionConfig { /// Create an execution config with default setting pub fn new() -> Self { + let concurrency = num_cpus::get(); Self { concurrency: num_cpus::get(), batch_size: 8192, @@ -619,6 +627,7 @@ impl ExecutionConfig { Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), ], + physical_optimizers: vec![Arc::new(Repartition { concurrency })], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs index 2bb651f45de..d4201530bda 100644 --- a/rust/datafusion/src/optimizer/repartition.rs +++ b/rust/datafusion/src/optimizer/repartition.rs @@ -13,6 +13,13 @@ pub struct Repartition { concurrency: usize, } +impl Repartition { + #[allow(missing_docs)] + pub fn new(concurrency: usize) -> Self { + Self { concurrency } + } +} + impl PhysicalOptimizerRule for Repartition { fn optimize(&self, plan: Arc) -> Result> { let partitioning = plan.output_partitioning(); diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index dd4184c5e1c..5458865478e 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -109,15 +109,16 @@ impl DefaultPhysicalPlanner { plan: Arc, ctx_state: &ExecutionContextState, ) -> Result> { + // TODO: make this a optimizer rule let children = plan .children() .iter() .map(|child| self.optimize_plan(child.clone(), ctx_state)) .collect::>>()?; - if children.is_empty() { + let plan = if children.is_empty() { // leaf node, children cannot be replaced - Ok(plan.clone()) + plan.clone() } else { // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have // highly selective filters @@ -148,7 +149,9 @@ impl DefaultPhysicalPlanner { let children = plan.children().clone(); match plan.required_child_distribution() { - Distribution::UnspecifiedDistribution => plan.with_new_children(children), + Distribution::UnspecifiedDistribution => { + plan.with_new_children(children)? 
+ } Distribution::SinglePartition => plan.with_new_children( children .iter() @@ -160,9 +163,25 @@ impl DefaultPhysicalPlanner { } }) .collect(), - ), + )?, } + }; + self.optimize_physical_plan(plan, ctx_state) + } + + // Optimize physical plan given based on active optimizers + fn optimize_physical_plan( + &self, + physical_plan: Arc, + ctx_state: &ExecutionContextState, + ) -> Result> { + let optimizers = &ctx_state.config.physical_optimizers; + let mut new_plan = physical_plan.clone(); + + for optimizer in optimizers { + new_plan = optimizer.optimize(new_plan)?; } + Ok(new_plan) } /// Create a physical plan from a logical plan From c2f4de8a64538891d097c53ceb408020f4076d93 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 09:04:13 +0200 Subject: [PATCH 05/29] WIP --- rust/datafusion/src/execution/context.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 6ef8d8212b4..046d50e8c14 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -461,7 +461,6 @@ impl ExecutionContext { .create_physical_plan(logical_plan, &state) } - /// Executes a query and writes the results to a partitioned CSV file. pub async fn write_csv( &self, @@ -585,7 +584,6 @@ impl QueryPlanner for DefaultQueryPlanner { ) -> Result> { let planner = DefaultPhysicalPlanner::default(); planner.create_physical_plan(logical_plan, ctx_state) - } } @@ -627,7 +625,7 @@ impl ExecutionConfig { Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), ], - physical_optimizers: vec![Arc::new(Repartition { concurrency })], + physical_optimizers: vec![Arc::new(Repartition::new(concurrency))], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), From 065abf46af699dd54bb454acd588897f5e35cb52 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 17:43:21 +0200 Subject: [PATCH 06/29] WIP --- rust/datafusion/src/execution/context.rs | 3 +- rust/datafusion/src/optimizer/optimizer.rs | 4 +- rust/datafusion/src/optimizer/repartition.rs | 115 ++++++++++++------- rust/datafusion/src/physical_plan/mod.rs | 2 +- rust/datafusion/src/physical_plan/planner.rs | 3 +- 5 files changed, 80 insertions(+), 47 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 046d50e8c14..e1aba17e3eb 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -614,7 +614,6 @@ pub struct ExecutionConfig { impl ExecutionConfig { /// Create an execution config with default setting pub fn new() -> Self { - let concurrency = num_cpus::get(); Self { concurrency: num_cpus::get(), batch_size: 8192, @@ -625,7 +624,7 @@ impl ExecutionConfig { Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), ], - physical_optimizers: vec![Arc::new(Repartition::new(concurrency))], + physical_optimizers: vec![Arc::new(Repartition::new())], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), diff --git a/rust/datafusion/src/optimizer/optimizer.rs b/rust/datafusion/src/optimizer/optimizer.rs index 277e94b4394..ff9bb3095d8 100644 --- a/rust/datafusion/src/optimizer/optimizer.rs +++ b/rust/datafusion/src/optimizer/optimizer.rs @@ -18,7 +18,7 @@ //! 
Query optimizer traits use std::sync::Arc; -use crate::logical_plan::LogicalPlan; +use crate::{execution::context::ExecutionConfig, logical_plan::LogicalPlan}; use crate::{error::Result, physical_plan::ExecutionPlan}; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -37,7 +37,7 @@ pub trait OptimizerRule { /// way. pub trait PhysicalOptimizerRule { /// Rewrite `plan` to an optimized form - fn optimize(&self, plan: Arc) -> Result>; + fn optimize(&self, plan: Arc, config: &ExecutionConfig) -> Result>; /// A human readable name for this optimizer rule fn name(&self) -> &str; diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs index d4201530bda..a3711024814 100644 --- a/rust/datafusion/src/optimizer/repartition.rs +++ b/rust/datafusion/src/optimizer/repartition.rs @@ -1,55 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Repartition optimizer that introduces repartition nodes to increase the level of parallism available use std::sync::Arc; -use crate::error::Result; use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; +use crate::{error::Result, execution::context::ExecutionConfig}; use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::Partitioning::*; +use crate::physical_plan::{Distribution, Partitioning::*}; /// Optimizer that introduces repartition to introduce more parallelism in the plan -pub struct Repartition { - // Concurrency wanted downstream. 
- // will create more concurrency - concurrency: usize, -} +pub struct Repartition {} impl Repartition { #[allow(missing_docs)] - pub fn new(concurrency: usize) -> Self { - Self { concurrency } + pub fn new() -> Self { + Self {} + } +} + +fn optimize_concurrency( + concurrency: usize, + requires_single_partition: bool, + plan: Arc, +) -> Result> { + // Recurse into children bottom-up (added nodes should be as deep as possible) + let children = plan + .children() + .iter() + .map(|child| { + optimize_concurrency( + concurrency, + plan.required_child_distribution() == Distribution::SinglePartition, + child.clone(), + ) + }) + .collect::>()?; + + let new_plan = plan.with_new_children(children)?; + + let partitioning = plan.output_partitioning(); + + let perform_repartition = match partitioning { + // Apply when underlying node has less than `self.concurrency` amount of concurrency + RoundRobinBatch(x) => x < concurrency, + UnknownPartitioning(x) => x < concurrency, + // we don't want to introduce partitioning after hash partitioning + // as the plan will likely depend on this + Hash(_, _) => false, + }; + + if perform_repartition && !requires_single_partition { + Ok(Arc::new(RepartitionExec::try_new( + new_plan, + RoundRobinBatch(concurrency), + )?)) + } else { + Ok(new_plan) } } impl PhysicalOptimizerRule for Repartition { - fn optimize(&self, plan: Arc) -> Result> { - let partitioning = plan.output_partitioning(); - - // Recurse into children bottom-up (added nodes should be as deep as possible) - let children = plan - .children() - .iter() - .map(|child| self.optimize(child.clone())) - .collect::>()?; - - let new_plan = plan.with_new_children(children)?; - - let perform_repartition = match partitioning { - // Apply when underlying node has less than `self.concurrency` amount of concurrency - RoundRobinBatch(x) => x < self.concurrency, - UnknownPartitioning(x) => x < self.concurrency, - // we don't want to introduce partitioning after hash partitioning - // as the plan will likely depend on this - Hash(_, _) => false, - }; - - if perform_repartition { - Ok(Arc::new(RepartitionExec::try_new( - new_plan, - RoundRobinBatch(self.concurrency), - )?)) - } else { - Ok(new_plan) - } + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result> { + optimize_concurrency(config.concurrency, true, plan) } fn name(&self) -> &str { @@ -77,9 +109,12 @@ mod tests { None, ); - let optimizer = Repartition { concurrency: 10 }; + let optimizer = Repartition {}; - let optimized = optimizer.optimize(Arc::new(parquet))?; + let optimized = optimizer.optimize( + Arc::new(parquet), + &ExecutionConfig::new().with_concurrency(10), + )?; assert_eq!(optimized.output_partitioning().partition_count(), 10); diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 3d549ec9323..d529e98f75d 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -150,7 +150,7 @@ impl Partitioning { } /// Distribution schemes -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Distribution { /// Unspecified distribution UnspecifiedDistribution, diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 5458865478e..35a85f79714 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -177,9 +177,8 @@ impl DefaultPhysicalPlanner { ) -> Result> { let optimizers = &ctx_state.config.physical_optimizers; let mut 
new_plan = physical_plan.clone(); - for optimizer in optimizers { - new_plan = optimizer.optimize(new_plan)?; + new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; } Ok(new_plan) } From 69d1bd9d9a679420b532232aa25be394c8d522e9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 18:10:23 +0200 Subject: [PATCH 07/29] WIP --- rust/datafusion/src/optimizer/repartition.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs index a3711024814..aeb5a34b950 100644 --- a/rust/datafusion/src/optimizer/repartition.rs +++ b/rust/datafusion/src/optimizer/repartition.rs @@ -18,6 +18,8 @@ //! Repartition optimizer that introduces repartition nodes to increase the level of parallism available use std::sync::Arc; +use log::debug; + use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; use crate::{error::Result, execution::context::ExecutionConfig}; @@ -53,10 +55,8 @@ fn optimize_concurrency( .collect::>()?; let new_plan = plan.with_new_children(children)?; - - let partitioning = plan.output_partitioning(); - - let perform_repartition = match partitioning { + + let perform_repartition = match plan.output_partitioning() { // Apply when underlying node has less than `self.concurrency` amount of concurrency RoundRobinBatch(x) => x < concurrency, UnknownPartitioning(x) => x < concurrency, @@ -64,8 +64,9 @@ fn optimize_concurrency( // as the plan will likely depend on this Hash(_, _) => false, }; - + if perform_repartition && !requires_single_partition { + debug!("Added RepartitionExec in optimizer with concurrency {}", concurrency); Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(concurrency), From 4ce8ec606d81418139dc3bb46d3693b6a2383312 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 18:34:02 +0200 Subject: [PATCH 08/29] Fix memec --- rust/datafusion/src/optimizer/repartition.rs | 41 ++++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs index aeb5a34b950..a90d122238e 100644 --- a/rust/datafusion/src/optimizer/repartition.rs +++ b/rust/datafusion/src/optimizer/repartition.rs @@ -42,20 +42,26 @@ fn optimize_concurrency( plan: Arc, ) -> Result> { // Recurse into children bottom-up (added nodes should be as deep as possible) - let children = plan - .children() - .iter() - .map(|child| { - optimize_concurrency( - concurrency, - plan.required_child_distribution() == Distribution::SinglePartition, - child.clone(), - ) - }) - .collect::>()?; - - let new_plan = plan.with_new_children(children)?; - + + let new_plan = if plan.children().len() == 0 { + // leaf node - don't replace + plan.clone() + } else { + let children = plan + .children() + .iter() + .map(|child| { + optimize_concurrency( + concurrency, + plan.required_child_distribution() == Distribution::SinglePartition, + child.clone(), + ) + }) + .collect::>()?; + + plan.with_new_children(children)? 
+ }; + let perform_repartition = match plan.output_partitioning() { // Apply when underlying node has less than `self.concurrency` amount of concurrency RoundRobinBatch(x) => x < concurrency, @@ -64,9 +70,12 @@ fn optimize_concurrency( // as the plan will likely depend on this Hash(_, _) => false, }; - + if perform_repartition && !requires_single_partition { - debug!("Added RepartitionExec in optimizer with concurrency {}", concurrency); + debug!( + "Added RepartitionExec in optimizer with concurrency {}", + concurrency + ); Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(concurrency), From 97e071d3c6072b350d8cda1bb800892bc02def8f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 21:55:12 +0200 Subject: [PATCH 09/29] Fix test --- rust/datafusion/src/optimizer/repartition.rs | 36 +++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs index a90d122238e..b7de1b38747 100644 --- a/rust/datafusion/src/optimizer/repartition.rs +++ b/rust/datafusion/src/optimizer/repartition.rs @@ -105,28 +105,38 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; + use crate::physical_plan::projection::ProjectionExec; + #[test] fn added_repartition_to_single_partition() -> Result<()> { - let parquet = ParquetExec::new( - vec![ParquetPartition { - filenames: vec!["x".to_string()], - statistics: Statistics::default(), - }], - Schema::empty(), - None, - None, - 2048, - None, - ); + let parquet_project = ProjectionExec::try_new( + vec![], + Arc::new(ParquetExec::new( + vec![ParquetPartition { + filenames: vec!["x".to_string()], + statistics: Statistics::default(), + }], + Schema::empty(), + None, + None, + 2048, + None, + )), + )?; let optimizer = Repartition {}; let optimized = optimizer.optimize( - Arc::new(parquet), + Arc::new(parquet_project), &ExecutionConfig::new().with_concurrency(10), )?; - assert_eq!(optimized.output_partitioning().partition_count(), 10); + assert_eq!( + optimized.children()[0] + .output_partitioning() + .partition_count(), + 10 + ); Ok(()) } From 556779e44b305c04c004fc2a329986d2aa1aa348 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 21:55:28 +0200 Subject: [PATCH 10/29] Fmt --- rust/datafusion/src/optimizer/optimizer.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/optimizer/optimizer.rs b/rust/datafusion/src/optimizer/optimizer.rs index ff9bb3095d8..4c6d472d3a3 100644 --- a/rust/datafusion/src/optimizer/optimizer.rs +++ b/rust/datafusion/src/optimizer/optimizer.rs @@ -18,8 +18,8 @@ //! Query optimizer traits use std::sync::Arc; -use crate::{execution::context::ExecutionConfig, logical_plan::LogicalPlan}; use crate::{error::Result, physical_plan::ExecutionPlan}; +use crate::{execution::context::ExecutionConfig, logical_plan::LogicalPlan}; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which /// computes the same results, but in a potentially more efficient @@ -37,7 +37,11 @@ pub trait OptimizerRule { /// way. 
pub trait PhysicalOptimizerRule { /// Rewrite `plan` to an optimized form - fn optimize(&self, plan: Arc, config: &ExecutionConfig) -> Result>; + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result>; /// A human readable name for this optimizer rule fn name(&self) -> &str; From dfc8b6c6bbf55409dfeaa7709c3059fd9e25ba0c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 22:35:44 +0200 Subject: [PATCH 11/29] Reorganize --- rust/datafusion/src/lib.rs | 1 + rust/datafusion/src/optimizer/mod.rs | 1 - rust/datafusion/src/optimizer/repartition.rs | 143 ------------------- 3 files changed, 1 insertion(+), 144 deletions(-) delete mode 100644 rust/datafusion/src/optimizer/repartition.rs diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index 2733430cbe9..44a8a686a49 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -193,6 +193,7 @@ pub mod error; pub mod execution; pub mod logical_plan; pub mod optimizer; +pub mod physical_optimizer; pub mod physical_plan; pub mod prelude; pub mod scalar; diff --git a/rust/datafusion/src/optimizer/mod.rs b/rust/datafusion/src/optimizer/mod.rs index 373d89487d2..dc59b64ff46 100644 --- a/rust/datafusion/src/optimizer/mod.rs +++ b/rust/datafusion/src/optimizer/mod.rs @@ -24,5 +24,4 @@ pub mod hash_build_probe_order; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; -pub mod repartition; pub mod utils; diff --git a/rust/datafusion/src/optimizer/repartition.rs b/rust/datafusion/src/optimizer/repartition.rs deleted file mode 100644 index b7de1b38747..00000000000 --- a/rust/datafusion/src/optimizer/repartition.rs +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
Repartition optimizer that introduces repartition nodes to increase the level of parallism available -use std::sync::Arc; - -use log::debug; - -use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; -use crate::{error::Result, execution::context::ExecutionConfig}; - -use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::{Distribution, Partitioning::*}; - -/// Optimizer that introduces repartition to introduce more parallelism in the plan -pub struct Repartition {} - -impl Repartition { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -fn optimize_concurrency( - concurrency: usize, - requires_single_partition: bool, - plan: Arc, -) -> Result> { - // Recurse into children bottom-up (added nodes should be as deep as possible) - - let new_plan = if plan.children().len() == 0 { - // leaf node - don't replace - plan.clone() - } else { - let children = plan - .children() - .iter() - .map(|child| { - optimize_concurrency( - concurrency, - plan.required_child_distribution() == Distribution::SinglePartition, - child.clone(), - ) - }) - .collect::>()?; - - plan.with_new_children(children)? - }; - - let perform_repartition = match plan.output_partitioning() { - // Apply when underlying node has less than `self.concurrency` amount of concurrency - RoundRobinBatch(x) => x < concurrency, - UnknownPartitioning(x) => x < concurrency, - // we don't want to introduce partitioning after hash partitioning - // as the plan will likely depend on this - Hash(_, _) => false, - }; - - if perform_repartition && !requires_single_partition { - debug!( - "Added RepartitionExec in optimizer with concurrency {}", - concurrency - ); - Ok(Arc::new(RepartitionExec::try_new( - new_plan, - RoundRobinBatch(concurrency), - )?)) - } else { - Ok(new_plan) - } -} - -impl PhysicalOptimizerRule for Repartition { - fn optimize( - &self, - plan: Arc, - config: &ExecutionConfig, - ) -> Result> { - optimize_concurrency(config.concurrency, true, plan) - } - - fn name(&self) -> &str { - "repartition" - } -} -#[cfg(test)] -mod tests { - use arrow::datatypes::Schema; - - use super::*; - use crate::datasource::datasource::Statistics; - use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; - use crate::physical_plan::projection::ProjectionExec; - - #[test] - fn added_repartition_to_single_partition() -> Result<()> { - let parquet_project = ProjectionExec::try_new( - vec![], - Arc::new(ParquetExec::new( - vec![ParquetPartition { - filenames: vec!["x".to_string()], - statistics: Statistics::default(), - }], - Schema::empty(), - None, - None, - 2048, - None, - )), - )?; - - let optimizer = Repartition {}; - - let optimized = optimizer.optimize( - Arc::new(parquet_project), - &ExecutionConfig::new().with_concurrency(10), - )?; - - assert_eq!( - optimized.children()[0] - .output_partitioning() - .partition_count(), - 10 - ); - - Ok(()) - } -} From 5c9cadfd700d9e633b5bdc87564745eb038f8c1c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 22:39:24 +0200 Subject: [PATCH 12/29] Reorganize --- rust/datafusion/src/physical_optimizer/mod.rs | 22 +++ .../src/physical_optimizer/optimizer.rs | 39 +++++ .../src/physical_optimizer/repartition.rs | 142 ++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 rust/datafusion/src/physical_optimizer/mod.rs create mode 100644 rust/datafusion/src/physical_optimizer/optimizer.rs create mode 100644 rust/datafusion/src/physical_optimizer/repartition.rs diff --git a/rust/datafusion/src/physical_optimizer/mod.rs 
b/rust/datafusion/src/physical_optimizer/mod.rs new file mode 100644 index 00000000000..37f02ee48a4 --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains a query optimizer that operates against a physical plan and applies +//! rules to a physical plan, such as "Repartition". + +pub mod optimizer; +pub mod repartition; diff --git a/rust/datafusion/src/physical_optimizer/optimizer.rs b/rust/datafusion/src/physical_optimizer/optimizer.rs new file mode 100644 index 00000000000..e2f40ae9540 --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/optimizer.rs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical optimizer traits + +use std::sync::Arc; + +use crate::{ + error::Result, execution::context::ExecutionConfig, physical_plan::ExecutionPlan, +}; + +/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which +/// computes the same results, but in a potentially more efficient +/// way. +pub trait PhysicalOptimizerRule { + /// Rewrite `plan` to an optimized form + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result>; + + /// A human readable name for this optimizer rule + fn name(&self) -> &str; +} diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs new file mode 100644 index 00000000000..c56b7f06988 --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Repartition optimizer that introduces repartition nodes to increase the level of parallism available +use std::sync::Arc; + +use log::debug; + +use super::optimizer::PhysicalOptimizerRule; +use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; +use crate::physical_plan::{Distribution, Partitioning::*}; +use crate::{error::Result, execution::context::ExecutionConfig}; + +/// Optimizer that introduces repartition to introduce more parallelism in the plan +pub struct Repartition {} + +impl Repartition { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +fn optimize_concurrency( + concurrency: usize, + requires_single_partition: bool, + plan: Arc, +) -> Result> { + // Recurse into children bottom-up (added nodes should be as deep as possible) + + let new_plan = if plan.children().len() == 0 { + // leaf node - don't replace children + plan.clone() + } else { + let children = plan + .children() + .iter() + .map(|child| { + optimize_concurrency( + concurrency, + plan.required_child_distribution() == Distribution::SinglePartition, + child.clone(), + ) + }) + .collect::>()?; + + plan.with_new_children(children)? + }; + + let perform_repartition = match plan.output_partitioning() { + // Apply when underlying node has less than `self.concurrency` amount of concurrency + RoundRobinBatch(x) => x < concurrency, + UnknownPartitioning(x) => x < concurrency, + // we don't want to introduce partitioning after hash partitioning + // as the plan will likely depend on this + Hash(_, _) => false, + }; + + if perform_repartition && !requires_single_partition { + debug!( + "Added RepartitionExec in optimizer with concurrency {}", + concurrency + ); + Ok(Arc::new(RepartitionExec::try_new( + new_plan, + RoundRobinBatch(concurrency), + )?)) + } else { + Ok(new_plan) + } +} + +impl PhysicalOptimizerRule for Repartition { + fn optimize( + &self, + plan: Arc, + config: &ExecutionConfig, + ) -> Result> { + optimize_concurrency(config.concurrency, true, plan) + } + + fn name(&self) -> &str { + "repartition" + } +} +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + + use super::*; + use crate::datasource::datasource::Statistics; + use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; + use crate::physical_plan::projection::ProjectionExec; + + #[test] + fn added_repartition_to_single_partition() -> Result<()> { + let parquet_project = ProjectionExec::try_new( + vec![], + Arc::new(ParquetExec::new( + vec![ParquetPartition { + filenames: vec!["x".to_string()], + statistics: Statistics::default(), + }], + Schema::empty(), + None, + None, + 2048, + None, + )), + )?; + + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + Arc::new(parquet_project), + &ExecutionConfig::new().with_concurrency(10), + )?; + + assert_eq!( + optimized.children()[0] + .output_partitioning() + .partition_count(), + 10 + ); + + Ok(()) + } +} From 24c494148296685f66f2d465342a3405b4203366 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 22:48:27 +0200 Subject: [PATCH 13/29] Fix --- rust/datafusion/src/execution/context.rs | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 13b7e4f1f5b..985a78a8735 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -58,7 +58,7 @@ use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; -use crate::optimizer::repartition::Repartition; +use crate::physical_optimizer::repartition::Repartition; use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; From 0a50e915cc853501fa0274e531f61e6b65aa7e39 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 31 Mar 2021 22:51:01 +0200 Subject: [PATCH 14/29] Reorganize --- rust/datafusion/src/execution/context.rs | 5 ++--- rust/datafusion/src/optimizer/optimizer.rs | 20 ++------------------ 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 985a78a8735..00057f30d74 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -21,9 +21,8 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, - optimizer::{ - hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, - }, + optimizer::hash_build_probe_order::HashBuildProbeOrder, + physical_optimizer::optimizer::PhysicalOptimizerRule, }; use log::debug; use std::fs; diff --git a/rust/datafusion/src/optimizer/optimizer.rs b/rust/datafusion/src/optimizer/optimizer.rs index 4c6d472d3a3..dee8e06a5e3 100644 --- a/rust/datafusion/src/optimizer/optimizer.rs +++ b/rust/datafusion/src/optimizer/optimizer.rs @@ -16,10 +16,9 @@ // under the License. //! Query optimizer traits -use std::sync::Arc; -use crate::{error::Result, physical_plan::ExecutionPlan}; -use crate::{execution::context::ExecutionConfig, logical_plan::LogicalPlan}; +use crate::error::Result; +use crate::logical_plan::LogicalPlan; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which /// computes the same results, but in a potentially more efficient @@ -31,18 +30,3 @@ pub trait OptimizerRule { /// A human readable name for this optimizer rule fn name(&self) -> &str; } - -/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which -/// computes the same results, but in a potentially more efficient -/// way. 
-pub trait PhysicalOptimizerRule { - /// Rewrite `plan` to an optimized form - fn optimize( - &self, - plan: Arc, - config: &ExecutionConfig, - ) -> Result>; - - /// A human readable name for this optimizer rule - fn name(&self) -> &str; -} From 45954844c7ef32943ccb43b71d3a43aaf4032e9b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 1 Apr 2021 08:33:23 +0200 Subject: [PATCH 15/29] Update tests expectations --- rust/datafusion/src/execution/context.rs | 16 ---------------- rust/datafusion/src/physical_plan/planner.rs | 11 ++++++++--- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 00057f30d74..82186876842 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -934,10 +934,6 @@ mod tests { println!("{:?}", physical_plan); let results = collect_partitioned(physical_plan).await?; - assert_eq!(results.len(), partition_count); - - // there should be a total of 2 batches with 20 rows because the where clause filters - // out results from 2 partitions // note that the order of partitions is not deterministic let mut num_batches = 0; @@ -1744,16 +1740,8 @@ mod tests { let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; - let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum(); - let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum(); - let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum(); - let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(part0_count, 10); - assert_eq!(part1_count, 10); - assert_eq!(part2_count, 10); - assert_eq!(part3_count, 10); assert_eq!(allparts_count, 40); Ok(()) @@ -1791,10 +1779,6 @@ mod tests { let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(part0_count, 10); - assert_eq!(part1_count, 10); - assert_eq!(part2_count, 10); - assert_eq!(part3_count, 10); assert_eq!(allparts_count, 40); Ok(()) diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 35a85f79714..c878560564a 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -51,6 +51,7 @@ use arrow::compute::can_cast_types; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use expressions::col; +use log::debug; /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`]. 
pub trait ExtensionPlanner { @@ -116,7 +117,9 @@ impl DefaultPhysicalPlanner { .map(|child| self.optimize_plan(child.clone(), ctx_state)) .collect::>>()?; - let plan = if children.is_empty() { + let plan = self.optimize_physical_plan(plan, ctx_state)?; + + Ok(if children.is_empty() { // leaf node, children cannot be replaced plan.clone() } else { @@ -165,8 +168,7 @@ impl DefaultPhysicalPlanner { .collect(), )?, } - }; - self.optimize_physical_plan(plan, ctx_state) + }) } // Optimize physical plan given based on active optimizers @@ -177,9 +179,12 @@ impl DefaultPhysicalPlanner { ) -> Result> { let optimizers = &ctx_state.config.physical_optimizers; let mut new_plan = physical_plan.clone(); + debug!("Physical plan:\n{:?}", new_plan); + for optimizer in optimizers { new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; } + debug!("Optimized physical plan:\n{:?}", new_plan); Ok(new_plan) } From bd83b96c40860bde8256525890f89d9d7554f2bc Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 1 Apr 2021 08:35:03 +0200 Subject: [PATCH 16/29] Update tests expectations --- rust/datafusion/src/execution/context.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 82186876842..58bde3bea9b 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -1740,6 +1740,7 @@ mod tests { let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; + assert_eq!(part0_count + part1_count + part2_count + part3_count, 40); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); assert_eq!(allparts_count, 40); @@ -1779,6 +1780,8 @@ mod tests { let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!(part0_count + part1_count + part2_count + part3_count, 40); + assert_eq!(allparts_count, 40); Ok(()) From 5f05dae50b9c1ee167d052d08f3cad39b763c52a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 1 Apr 2021 08:51:31 +0200 Subject: [PATCH 17/29] Update tests expectations --- rust/datafusion/src/execution/context.rs | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 58bde3bea9b..8fe036fbccc 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -1729,20 +1729,15 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)?; - ctx.register_csv("part1", &format!("{}/part-1.csv", out_dir), csv_read_option)?; - ctx.register_csv("part2", &format!("{}/part-2.csv", out_dir), csv_read_option)?; - ctx.register_csv("part3", &format!("{}/part-3.csv", out_dir), csv_read_option)?; ctx.register_csv("allparts", &out_dir, csv_read_option)?; let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?; - let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?; - let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?; - let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; - 
assert_eq!(part0_count + part1_count + part2_count + part3_count, 40); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!(part0[0].schema(), allparts[0].schema()); + assert_eq!(allparts_count, 40); Ok(()) @@ -1769,18 +1764,11 @@ mod tests { ctx.register_parquet("allparts", &out_dir)?; let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?; - let part1 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part1").await?; - let part2 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part2").await?; - let part3 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part3").await?; let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?; - let part0_count: usize = part0.iter().map(|batch| batch.num_rows()).sum(); - let part1_count: usize = part1.iter().map(|batch| batch.num_rows()).sum(); - let part2_count: usize = part2.iter().map(|batch| batch.num_rows()).sum(); - let part3_count: usize = part3.iter().map(|batch| batch.num_rows()).sum(); let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(part0_count + part1_count + part2_count + part3_count, 40); + assert_eq!(part0[0].schema(), allparts[0].schema()); assert_eq!(allparts_count, 40); From 3aa5bb13e33e55d428fecc637b2e38673318ccc7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 1 Apr 2021 21:49:30 +0200 Subject: [PATCH 18/29] Add CoalesceBatches / AddMergeExec as optimizers --- rust/datafusion/src/execution/context.rs | 9 +- .../physical_optimizer/coalesce_batches.rs | 88 +++++++++++++++ .../src/physical_optimizer/merge_exec.rs | 74 +++++++++++++ rust/datafusion/src/physical_optimizer/mod.rs | 2 + .../src/physical_optimizer/repartition.rs | 11 +- rust/datafusion/src/physical_plan/mod.rs | 3 +- rust/datafusion/src/physical_plan/planner.rs | 101 +++--------------- 7 files changed, 191 insertions(+), 97 deletions(-) create mode 100644 rust/datafusion/src/physical_optimizer/coalesce_batches.rs create mode 100644 rust/datafusion/src/physical_optimizer/merge_exec.rs diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 8fe036fbccc..9531fd38520 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -57,7 +57,10 @@ use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; +use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::repartition::Repartition; +use crate::physical_optimizer::merge_exec::AddMergeExec; + use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; @@ -623,7 +626,11 @@ impl ExecutionConfig { Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), ], - physical_optimizers: vec![Arc::new(Repartition::new())], + physical_optimizers: vec![ + Arc::new(Repartition::new()), + Arc::new(CoalesceBatches::new()), + Arc::new(AddMergeExec::new()), + ], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), diff --git a/rust/datafusion/src/physical_optimizer/coalesce_batches.rs b/rust/datafusion/src/physical_optimizer/coalesce_batches.rs new file mode 100644 index 00000000000..9af8911062d --- /dev/null +++ 
b/rust/datafusion/src/physical_optimizer/coalesce_batches.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CoalesceBatches optimizer that groups batches together rows +//! in bigger batches to avoid overhead with small batches + +use super::optimizer::PhysicalOptimizerRule; +use crate::{ + error::Result, + physical_plan::{ + coalesce_batches::CoalesceBatchesExec, filter::FilterExec, + hash_join::HashJoinExec, repartition::RepartitionExec, + }, +}; +use std::sync::Arc; + +/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches +pub struct CoalesceBatches {} + +impl CoalesceBatches { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} +impl PhysicalOptimizerRule for CoalesceBatches { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::ExecutionConfig, + ) -> Result> { + // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have + // highly selective filters + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + + let plan_any = plan.as_any(); + //TODO we should do this in a more generic way either by wrapping all operators + // or having an API so that operators can declare when their inputs or outputs + // need to be wrapped in a coalesce batches operator. + // See https://issues.apache.org/jira/browse/ARROW-11068 + let wrap_in_coalesce = plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some(); + + //TODO we should also do this for HashAggregateExec but we need to update tests + // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 + // || plan_any.downcast_ref::().is_some(); + + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + let plan = plan.with_new_children(children)?; + Ok(if wrap_in_coalesce { + //TODO we should add specific configuration settings for coalescing batches and + // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is + // implemented. 
For now, we choose half the configured batch size to avoid copies + // when a small number of rows are removed from a batch + let target_batch_size = config.batch_size / 2; + Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) + } else { + plan.clone() + }) + } + } + + fn name(&self) -> &str { + "coalesce_batches" + } +} diff --git a/rust/datafusion/src/physical_optimizer/merge_exec.rs b/rust/datafusion/src/physical_optimizer/merge_exec.rs new file mode 100644 index 00000000000..fa8542f7f3d --- /dev/null +++ b/rust/datafusion/src/physical_optimizer/merge_exec.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CoalesceBatches optimizer that groups batches together rows +//! in bigger batches to avoid overhead with small batches + +use super::optimizer::PhysicalOptimizerRule; +use crate::{ + error::Result, + physical_plan::{merge::MergeExec, Distribution}, +}; +use std::sync::Arc; + +/// Introduces MergeExec +pub struct AddMergeExec {} + +impl AddMergeExec { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for AddMergeExec { + fn optimize( + &self, + plan: Arc, + config: &crate::execution::context::ExecutionConfig, + ) -> Result> { + if plan.children().is_empty() { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + let children = plan + .children() + .iter() + .map(|child| self.optimize(child.clone(), config)) + .collect::>>()?; + match plan.required_child_distribution() { + Distribution::UnspecifiedDistribution => plan.with_new_children(children), + Distribution::SinglePartition => plan.with_new_children( + children + .iter() + .map(|child| { + if child.output_partitioning().partition_count() == 1 { + child.clone() + } else { + Arc::new(MergeExec::new(child.clone())) + } + }) + .collect(), + ), + } + } + } + + fn name(&self) -> &str { + "add_merge_exec" + } +} diff --git a/rust/datafusion/src/physical_optimizer/mod.rs b/rust/datafusion/src/physical_optimizer/mod.rs index 37f02ee48a4..b7097120b95 100644 --- a/rust/datafusion/src/physical_optimizer/mod.rs +++ b/rust/datafusion/src/physical_optimizer/mod.rs @@ -20,3 +20,5 @@ pub mod optimizer; pub mod repartition; +pub mod coalesce_batches; +pub mod merge_exec; diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs index c56b7f06988..bc2c200bf7e 100644 --- a/rust/datafusion/src/physical_optimizer/repartition.rs +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -18,8 +18,6 @@ //! 
Repartition optimizer that introduces repartition nodes to increase the level of parallism available use std::sync::Arc; -use log::debug; - use super::optimizer::PhysicalOptimizerRule; use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; use crate::physical_plan::{Distribution, Partitioning::*}; @@ -42,7 +40,7 @@ fn optimize_concurrency( ) -> Result> { // Recurse into children bottom-up (added nodes should be as deep as possible) - let new_plan = if plan.children().len() == 0 { + let new_plan = if plan.children().is_empty() { // leaf node - don't replace children plan.clone() } else { @@ -57,11 +55,10 @@ fn optimize_concurrency( ) }) .collect::>()?; - plan.with_new_children(children)? }; - let perform_repartition = match plan.output_partitioning() { + let perform_repartition = match new_plan.output_partitioning() { // Apply when underlying node has less than `self.concurrency` amount of concurrency RoundRobinBatch(x) => x < concurrency, UnknownPartitioning(x) => x < concurrency, @@ -71,10 +68,6 @@ fn optimize_concurrency( }; if perform_repartition && !requires_single_partition { - debug!( - "Added RepartitionExec in optimizer with concurrency {}", - concurrency - ); Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(concurrency), diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index d529e98f75d..64c7c1b3bbd 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -54,8 +54,7 @@ pub trait PhysicalPlanner { &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, - ) -> Result>; -} + ) -> Result>;} /// Partition-aware execution plan for a relation #[async_trait] diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index c878560564a..df99c2fabaa 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -28,19 +28,17 @@ use crate::logical_plan::{ DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, UserDefinedLogicalNode, }; -use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::explain::ExplainExec; +use crate::physical_plan::expressions; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::merge::MergeExec; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sort::SortExec; use crate::physical_plan::udf; -use crate::physical_plan::{expressions, Distribution}; use crate::physical_plan::{hash_utils, Partitioning}; use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner}; use crate::prelude::JoinType; @@ -104,87 +102,20 @@ impl DefaultPhysicalPlanner { Self { extension_planners } } - /// Create a physical plan from a logical plan + /// Optimize a physical plan fn optimize_plan( &self, plan: Arc, ctx_state: &ExecutionContextState, - ) -> Result> { - // TODO: make this a optimizer rule - let children = plan - .children() - .iter() - .map(|child| self.optimize_plan(child.clone(), ctx_state)) - .collect::>>()?; - - let plan = self.optimize_physical_plan(plan, 
ctx_state)?; - - Ok(if children.is_empty() { - // leaf node, children cannot be replaced - plan.clone() - } else { - // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have - // highly selective filters - let plan_any = plan.as_any(); - //TODO we should do this in a more generic way either by wrapping all operators - // or having an API so that operators can declare when their inputs or outputs - // need to be wrapped in a coalesce batches operator. - // See https://issues.apache.org/jira/browse/ARROW-11068 - let wrap_in_coalesce = plan_any.downcast_ref::().is_some() - || plan_any.downcast_ref::().is_some() - || plan_any.downcast_ref::().is_some(); - - //TODO we should also do this for HashAggregateExec but we need to update tests - // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 - // || plan_any.downcast_ref::().is_some(); - - let plan = if wrap_in_coalesce { - //TODO we should add specific configuration settings for coalescing batches and - // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is - // implemented. For now, we choose half the configured batch size to avoid copies - // when a small number of rows are removed from a batch - let target_batch_size = ctx_state.config.batch_size / 2; - Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) - } else { - plan.clone() - }; - - let children = plan.children().clone(); - - match plan.required_child_distribution() { - Distribution::UnspecifiedDistribution => { - plan.with_new_children(children)? - } - Distribution::SinglePartition => plan.with_new_children( - children - .iter() - .map(|child| { - if child.output_partitioning().partition_count() == 1 { - child.clone() - } else { - Arc::new(MergeExec::new(child.clone())) - } - }) - .collect(), - )?, - } - }) - } - - // Optimize physical plan given based on active optimizers - fn optimize_physical_plan( - &self, - physical_plan: Arc, - ctx_state: &ExecutionContextState, ) -> Result> { let optimizers = &ctx_state.config.physical_optimizers; - let mut new_plan = physical_plan.clone(); - debug!("Physical plan:\n{:?}", new_plan); + println!("Physical plan:\n{:?}", plan); + let mut new_plan = plan; for optimizer in optimizers { new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; } - debug!("Optimized physical plan:\n{:?}", new_plan); + println!("Optimized physical plan:\n{:?}", new_plan); Ok(new_plan) } @@ -211,7 +142,7 @@ impl DefaultPhysicalPlanner { .. } => { // Initially need to perform the aggregate and then merge the partitions - let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_exec = self.create_initial_plan(input, ctx_state)?; let input_schema = input_exec.schema(); let physical_input_schema = input_exec.as_ref().schema(); let logical_input_schema = input.as_ref().schema(); @@ -267,7 +198,7 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Projection { input, expr, .. } => { - let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_exec = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let runtime_expr = expr .iter() @@ -287,7 +218,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Filter { input, predicate, .. 
} => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let runtime_expr = self.create_physical_expr(predicate, &input_schema, ctx_state)?; @@ -296,7 +227,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Union { inputs, .. } => { let physical_plans = inputs .iter() - .map(|input| self.create_physical_plan(input, ctx_state)) + .map(|input| self.create_initial_plan(input, ctx_state)) .collect::>>()?; Ok(Arc::new(UnionExec::new(physical_plans))) } @@ -304,7 +235,7 @@ impl DefaultPhysicalPlanner { input, partitioning_scheme, } => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.schema(); let physical_partitioning = match partitioning_scheme { LogicalPartitioning::RoundRobinBatch(n) => { @@ -326,7 +257,7 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Sort { expr, input, .. } => { - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; let input_schema = input.as_ref().schema(); let sort_expr = expr @@ -360,8 +291,8 @@ impl DefaultPhysicalPlanner { join_type, .. } => { - let left = self.create_physical_plan(left, ctx_state)?; - let right = self.create_physical_plan(right, ctx_state)?; + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; let physical_join_type = match join_type { JoinType::Inner => hash_utils::JoinType::Inner, JoinType::Left => hash_utils::JoinType::Left, @@ -384,7 +315,7 @@ impl DefaultPhysicalPlanner { ))), LogicalPlan::Limit { input, n, .. } => { let limit = *n; - let input = self.create_physical_plan(input, ctx_state)?; + let input = self.create_initial_plan(input, ctx_state)?; // GlobalLimitExec requires a single partition for input let input = if input.output_partitioning().partition_count() == 1 { @@ -412,7 +343,7 @@ impl DefaultPhysicalPlanner { stringified_plans, schema, } => { - let input = self.create_physical_plan(plan, ctx_state)?; + let input = self.create_initial_plan(plan, ctx_state)?; let mut stringified_plans = stringified_plans .iter() @@ -436,7 +367,7 @@ impl DefaultPhysicalPlanner { let inputs = node .inputs() .into_iter() - .map(|input_plan| self.create_physical_plan(input_plan, ctx_state)) + .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) .collect::>>()?; let maybe_plan = self.extension_planners.iter().try_fold( From 2262c398487f5f9da1db56cb8cda2a4601f1bb8c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 1 Apr 2021 22:02:47 +0200 Subject: [PATCH 19/29] Fix tests --- rust/datafusion/src/datasource/parquet.rs | 5 +---- rust/datafusion/src/execution/context.rs | 16 +--------------- rust/datafusion/src/physical_plan/planner.rs | 4 ++-- 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index ed45230c0fe..30e47df5f64 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -120,7 +120,7 @@ mod tests { let exec = table.scan(&projection, 2, &[], None)?; let stream = exec.execute(0).await?; - let count = stream + let _ = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(11, batch.num_columns()); @@ -129,9 +129,6 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; - // we should have seen 4 batches of 2 rows - assert_eq!(4, count); 
- // test metadata assert_eq!(table.statistics().num_rows, Some(8)); assert_eq!(table.statistics().total_byte_size, Some(671)); diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 9531fd38520..3286c99aff2 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -823,15 +823,6 @@ mod tests { let partition_count = 4; let results = execute("SELECT c1, c2 FROM test", partition_count).await?; - // there should be one batch per partition - assert_eq!(results.len(), partition_count); - - // each batch should contain 2 columns and 10 rows with correct field names - for batch in &results { - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 10); - } - let expected = vec![ "+----+----+", "| c1 | c2 |", @@ -943,15 +934,12 @@ mod tests { let results = collect_partitioned(physical_plan).await?; // note that the order of partitions is not deterministic - let mut num_batches = 0; let mut num_rows = 0; for partition in &results { for batch in partition { - num_batches += 1; num_rows += batch.num_rows(); } } - assert_eq!(2, num_batches); assert_eq!(20, num_rows); let results: Vec = results.into_iter().flatten().collect(); @@ -1023,9 +1011,7 @@ mod tests { assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); let batches = collect(physical_plan).await?; - assert_eq!(4, batches.len()); - assert_eq!(1, batches[0].num_columns()); - assert_eq!(10, batches[0].num_rows()); + assert_eq!(40, batches.iter().map(|x|x.num_rows()).sum::()); Ok(()) } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index df99c2fabaa..dfc11f5f347 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -109,13 +109,13 @@ impl DefaultPhysicalPlanner { ctx_state: &ExecutionContextState, ) -> Result> { let optimizers = &ctx_state.config.physical_optimizers; - println!("Physical plan:\n{:?}", plan); + debug!("Physical plan:\n{:?}", plan); let mut new_plan = plan; for optimizer in optimizers { new_plan = optimizer.optimize(new_plan, &ctx_state.config)?; } - println!("Optimized physical plan:\n{:?}", new_plan); + debug!("Optimized physical plan:\n{:?}", new_plan); Ok(new_plan) } From 15009abb859fbc94c97aa32a4a19e534e0c23175 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 2 Apr 2021 08:25:01 +0200 Subject: [PATCH 20/29] Docs, test --- .../src/physical_optimizer/merge_exec.rs | 6 +-- .../src/physical_optimizer/repartition.rs | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_optimizer/merge_exec.rs b/rust/datafusion/src/physical_optimizer/merge_exec.rs index fa8542f7f3d..255d1bc2458 100644 --- a/rust/datafusion/src/physical_optimizer/merge_exec.rs +++ b/rust/datafusion/src/physical_optimizer/merge_exec.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! CoalesceBatches optimizer that groups batches together rows -//! in bigger batches to avoid overhead with small batches - +//! AddMergeExec adds MergeExec to merge plans +//! with more partitions into one partition when the node +//! 
needs a single partition use super::optimizer::PhysicalOptimizerRule; use crate::{ error::Result, diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs index bc2c200bf7e..729dc80bf64 100644 --- a/rust/datafusion/src/physical_optimizer/repartition.rs +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -132,4 +132,44 @@ mod tests { Ok(()) } + + #[test] + fn repartition_deepest_node() -> Result<()> { + let parquet_project = ProjectionExec::try_new( + vec![], + Arc::new(ProjectionExec::try_new( + vec![], + Arc::new(ParquetExec::new( + vec![ParquetPartition { + filenames: vec!["x".to_string()], + statistics: Statistics::default(), + }], + Schema::empty(), + None, + None, + 2048, + None, + )), + )?), + )?; + + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + Arc::new(parquet_project), + &ExecutionConfig::new().with_concurrency(10), + )?; + + // RepartitionExec is added to deepest node + assert!( + optimized.children()[0].as_any() + .downcast_ref::().is_none() + ); + assert!( + optimized.children()[0].children()[0].as_any() + .downcast_ref::().is_some() + ); + + Ok(()) + } } From 86f8dda9f595a3832e99cbba1604077025d4f799 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 2 Apr 2021 09:05:44 +0200 Subject: [PATCH 21/29] Fmt --- rust/datafusion/src/execution/context.rs | 4 ++-- rust/datafusion/src/physical_optimizer/mod.rs | 4 ++-- .../src/physical_optimizer/repartition.rs | 16 ++++++++-------- rust/datafusion/src/physical_plan/mod.rs | 3 ++- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 102fb1402ae..5887ef6acb5 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -58,8 +58,8 @@ use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; -use crate::physical_optimizer::repartition::Repartition; use crate::physical_optimizer::merge_exec::AddMergeExec; +use crate::physical_optimizer::repartition::Repartition; use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; @@ -1013,7 +1013,7 @@ mod tests { assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); let batches = collect(physical_plan).await?; - assert_eq!(40, batches.iter().map(|x|x.num_rows()).sum::()); + assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::()); Ok(()) } diff --git a/rust/datafusion/src/physical_optimizer/mod.rs b/rust/datafusion/src/physical_optimizer/mod.rs index b7097120b95..eca63db9f3d 100644 --- a/rust/datafusion/src/physical_optimizer/mod.rs +++ b/rust/datafusion/src/physical_optimizer/mod.rs @@ -18,7 +18,7 @@ //! This module contains a query optimizer that operates against a physical plan and applies //! rules to a physical plan, such as "Repartition". 
-pub mod optimizer; -pub mod repartition; pub mod coalesce_batches; pub mod merge_exec; +pub mod optimizer; +pub mod repartition; diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs index 729dc80bf64..3be2871d50a 100644 --- a/rust/datafusion/src/physical_optimizer/repartition.rs +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -161,14 +161,14 @@ mod tests { )?; // RepartitionExec is added to deepest node - assert!( - optimized.children()[0].as_any() - .downcast_ref::().is_none() - ); - assert!( - optimized.children()[0].children()[0].as_any() - .downcast_ref::().is_some() - ); + assert!(optimized.children()[0] + .as_any() + .downcast_ref::() + .is_none()); + assert!(optimized.children()[0].children()[0] + .as_any() + .downcast_ref::() + .is_some()); Ok(()) } diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 64c7c1b3bbd..d529e98f75d 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -54,7 +54,8 @@ pub trait PhysicalPlanner { &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, - ) -> Result>;} + ) -> Result>; +} /// Partition-aware execution plan for a relation #[async_trait] From 691b354fb4f1463b2b0069c86c88cc1e15106439 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 2 Apr 2021 18:18:01 +0200 Subject: [PATCH 22/29] Exclude empty exec from repartition optimizer --- rust/datafusion/src/physical_optimizer/repartition.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs index 3be2871d50a..496e30b4783 100644 --- a/rust/datafusion/src/physical_optimizer/repartition.rs +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -19,7 +19,9 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; -use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan}; +use crate::physical_plan::{ + empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan, +}; use crate::physical_plan::{Distribution, Partitioning::*}; use crate::{error::Result, execution::context::ExecutionConfig}; @@ -67,7 +69,11 @@ fn optimize_concurrency( Hash(_, _) => false, }; - if perform_repartition && !requires_single_partition { + // TODO: EmptyExec causes failures with RepartitionExec + // But also not very useful to inlude + let is_empty_exec = plan.as_any().downcast_ref::().is_some(); + + if perform_repartition && !requires_single_partition && !is_empty_exec { Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(concurrency), From e5558d5413c247d85eb594667d723f376436c1f2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 2 Apr 2021 18:29:00 +0200 Subject: [PATCH 23/29] Add method to add physical optimizer rule as well --- rust/datafusion/src/execution/context.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 5887ef6acb5..0a2febbd715 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -673,6 +673,15 @@ impl ExecutionConfig { self } + /// Adds a new [`PhysicalOptimizerRule`] + pub fn add_physical_optimizer_rule( + mut self, + optimizer_rule: Arc, + ) -> Self { + self.physical_optimizers.push(optimizer_rule); + self + } + /// Selects a name for the default catalog and schema pub fn 
with_default_catalog_and_schema( mut self, From fb2183be9347e5c5d42ade5a461dc61e68ea9a74 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 3 Apr 2021 21:01:22 +0200 Subject: [PATCH 24/29] Disable rule for concurrency of 1 --- rust/datafusion/src/physical_optimizer/repartition.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_optimizer/repartition.rs b/rust/datafusion/src/physical_optimizer/repartition.rs index 496e30b4783..82f46f9cbbb 100644 --- a/rust/datafusion/src/physical_optimizer/repartition.rs +++ b/rust/datafusion/src/physical_optimizer/repartition.rs @@ -89,7 +89,12 @@ impl PhysicalOptimizerRule for Repartition { plan: Arc, config: &ExecutionConfig, ) -> Result> { - optimize_concurrency(config.concurrency, true, plan) + // Don't run optimizer if concurrency == 1 + if config.concurrency == 1 { + Ok(plan) + } else { + optimize_concurrency(config.concurrency, true, plan) + } } fn name(&self) -> &str { From bf434a9a541d27ee01a9fce4b472c69ef939ca9f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 5 Apr 2021 12:01:16 +0200 Subject: [PATCH 25/29] Change to Distribution::SinglePartition --- rust/datafusion/tests/user_defined_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 4f43c46c717..4d842da3ea4 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -353,7 +353,7 @@ impl ExecutionPlan for TopKExec { } fn required_child_distribution(&self) -> Distribution { - Distribution::UnspecifiedDistribution + Distribution::SinglePartition } fn children(&self) -> Vec> { From d9c7a2cf7a7fe4f225e32070ed3878a45ce97c17 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 5 Apr 2021 16:25:31 +0200 Subject: [PATCH 26/29] Use derive(Debug) --- rust/datafusion/tests/user_defined_plan.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 4d842da3ea4..3fa7de629ba 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -325,18 +325,13 @@ impl ExtensionPlanner for TopKPlanner { /// Physical operator that implements TopK for u64 data types. This /// code is not general and is meant as an illustration only +#[derive(Debug)] struct TopKExec { input: Arc, /// The maxium number of values k: usize, } -impl Debug for TopKExec { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "TopKExec") - } -} - #[async_trait] impl ExecutionPlan for TopKExec { /// Return a reference to Any that can be used for downcasting From 1cad621c48ffe822641792017ff13ad89f5053a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 5 Apr 2021 19:51:59 +0200 Subject: [PATCH 27/29] Revert debug implementation --- rust/datafusion/tests/user_defined_plan.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 3fa7de629ba..4d842da3ea4 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -325,13 +325,18 @@ impl ExtensionPlanner for TopKPlanner { /// Physical operator that implements TopK for u64 data types. 
This /// code is not general and is meant as an illustration only -#[derive(Debug)] struct TopKExec { input: Arc, /// The maxium number of values k: usize, } +impl Debug for TopKExec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TopKExec") + } +} + #[async_trait] impl ExecutionPlan for TopKExec { /// Return a reference to Any that can be used for downcasting From 2a1e53f31da4dec7cf82e9c79bfec2dcc3213d2b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 8 Apr 2021 21:01:11 +0200 Subject: [PATCH 28/29] Fix topkexec --- rust/datafusion/tests/user_defined_plan.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 4d842da3ea4..21b25c094f4 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -471,12 +471,10 @@ impl Stream for TopKReader { return Poll::Ready(None); } // this aggregates and thus returns a single RecordBatch. - self.done = true; // take this as immutable let k = self.k; let schema = self.schema(); - let top_values = self .input .as_mut() @@ -484,6 +482,7 @@ impl Stream for TopKReader { .try_fold( BTreeMap::::new(), move |top_values, batch| async move { + println!("xxx"); accumulate_batch(&batch, top_values, &k) .map_err(DataFusionError::into_arrow_external_error) }, @@ -508,7 +507,10 @@ impl Stream for TopKReader { }); let mut top_values = Box::pin(top_values.into_stream()); - top_values.poll_next_unpin(cx) + top_values.poll_next_unpin(cx).map(|batch| { + self.done = true; + batch + }) } } From 4bb4dc5bd41771dfbbe1b9fd567b254e764d8c24 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 8 Apr 2021 21:04:07 +0200 Subject: [PATCH 29/29] Remove print --- rust/datafusion/tests/user_defined_plan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 21b25c094f4..aae5c597d82 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -482,7 +482,6 @@ impl Stream for TopKReader { .try_fold( BTreeMap::::new(), move |top_values, batch| async move { - println!("xxx"); accumulate_batch(&batch, top_values, &k) .map_err(DataFusionError::into_arrow_external_error) },
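As a cross-check on how the new rules compose, a test in the style of the repartition tests above could run the Repartition and AddMergeExec rules back to back on the same single-partition ParquetExec plan. The sketch below is illustrative only: it reuses the ParquetExec, ParquetPartition, Statistics and ExecutionConfig constructors exactly as they appear in the diffs, assumes it sits next to the existing tests in repartition.rs (so the same test imports are in scope, plus one extra import for AddMergeExec at an assumed path), and the expectation that AddMergeExec leaves the plan unchanged rests on RepartitionExec not requiring a single-partition child.

// extra import on top of the test imports already present in this file (assumed path)
use crate::physical_optimizer::merge_exec::AddMergeExec;

#[test]
fn repartition_then_add_merge_exec() -> Result<()> {
    // single-partition parquet scan, mirroring the tests above
    let parquet = ParquetExec::new(
        vec![ParquetPartition {
            filenames: vec!["x".to_string()],
            statistics: Statistics::default(),
        }],
        Schema::empty(),
        None,
        None,
        2048,
        None,
    );

    let config = ExecutionConfig::new().with_concurrency(10);

    // the scan reports fewer than 10 partitions, so the Repartition rule is
    // expected to put a round-robin RepartitionExec on top of it
    let plan = Repartition {}.optimize(Arc::new(parquet), &config)?;
    assert_eq!(plan.output_partitioning().partition_count(), 10);

    // AddMergeExec only inserts MergeExec above children of operators that
    // require Distribution::SinglePartition; nothing in this plan does, so
    // the partitioning should be left as-is
    let plan = AddMergeExec::new().optimize(plan, &config)?;
    assert_eq!(plan.output_partitioning().partition_count(), 10);

    Ok(())
}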
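For downstream users of the crate, the new add_physical_optimizer_rule method means custom physical rules can be registered alongside the built-in Repartition, CoalesceBatches and AddMergeExec rules. The following is only a sketch of what that could look like from outside the crate: the optimize and name signatures follow the PhysicalOptimizerRule trait as it ends up in this series, but the exact module paths, the Send + Sync bounds on the trait object, and ExecutionContext::with_config are assumptions rather than something shown in the diffs above.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

/// A do-nothing rule that only recurses bottom-up, following the same
/// pattern as the Repartition and CoalesceBatches rules above: children are
/// optimized first so that any operators a real rule adds end up as deep in
/// the plan as possible.
struct NoopRule {}

impl PhysicalOptimizerRule for NoopRule {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ExecutionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if plan.children().is_empty() {
            // leaf node, nothing to rewrite
            Ok(plan)
        } else {
            // rewrite the children first, then rebuild this node on top of them
            let children = plan
                .children()
                .iter()
                .map(|child| self.optimize(child.clone(), config))
                .collect::<Result<Vec<_>>>()?;
            plan.with_new_children(children)
        }
    }

    fn name(&self) -> &str {
        "noop"
    }
}

fn context_with_custom_rule() -> ExecutionContext {
    // the built-in physical rules stay registered; this only appends one more
    let config = ExecutionConfig::new()
        .with_concurrency(8)
        .add_physical_optimizer_rule(Arc::new(NoopRule {}));
    ExecutionContext::with_config(config)
}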