From c134f53c429789dd7d399e8b561dc7558ac82e43 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 2 Jan 2023 15:42:42 +0300 Subject: [PATCH 1/4] Separate sort rule --- datafusion/core/src/execution/context.rs | 8 +-- .../src/physical_optimizer/enforcement.rs | 55 ++++++------------- datafusion/core/src/physical_optimizer/mod.rs | 2 +- .../src/physical_optimizer/repartition.rs | 10 +++- ...{optimize_sorts.rs => sort_enforcement.rs} | 29 +++++----- datafusion/core/src/physical_plan/limit.rs | 4 ++ 6 files changed, 49 insertions(+), 59 deletions(-) rename datafusion/core/src/physical_optimizer/{optimize_sorts.rs => sort_enforcement.rs} (97%) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 4a49e8ec7000c..cc35b326b4a4a 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -77,7 +77,7 @@ use crate::config::{ OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; -use crate::physical_optimizer::enforcement::BasicEnforcement; +use crate::physical_optimizer::enforcement::EnforceDistribution; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udaf::AggregateUDF; @@ -100,9 +100,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::memory_pool::MemoryPool; use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; -use crate::physical_optimizer::optimize_sorts::OptimizeSorts; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::pipeline_fixer::PipelineFixer; +use crate::physical_optimizer::sort_enforcement::EnforceSorting; use uuid::Uuid; use super::options::{ @@ -1562,12 +1562,12 @@ impl SessionState { // It's for adding essential repartition and local sorting operator to satisfy the // required distribution and local sort. // Please make sure that the whole plan tree is determined. - physical_optimizers.push(Arc::new(BasicEnforcement::new())); + physical_optimizers.push(Arc::new(EnforceDistribution::new())); // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements. // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary. // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The // rule below performs this analysis and removes unnecessary `SortExec`s. - physical_optimizers.push(Arc::new(OptimizeSorts::new())); + physical_optimizers.push(Arc::new(EnforceSorting::new())); // It will not influence the distribution and ordering of the whole plan tree. // Therefore, to avoid influencing other rules, it should be run at last. if config.coalesce_batches() { diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index 7c55a81aa75cf..65ec5e46985bd 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering -//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]]. +//! Enforcement optimizer rules are used to make sure the plan's Distribution +//! requirements are met by inserting necessary [[RepartitionExec]]. //! use crate::config::{ ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING, }; use crate::error::Result; -use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -48,7 +47,7 @@ use datafusion_physical_expr::{ use std::collections::HashMap; use std::sync::Arc; -/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met +/// EnforceDistribution rule, it ensures the Distribution requirements are met /// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree /// and give a non-optimal plan, but it can avoid the possible data skew in joins. /// @@ -57,16 +56,16 @@ use std::sync::Arc; /// /// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c). #[derive(Default)] -pub struct BasicEnforcement {} +pub struct EnforceDistribution {} -impl BasicEnforcement { +impl EnforceDistribution { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -impl PhysicalOptimizerRule for BasicEnforcement { +impl PhysicalOptimizerRule for EnforceDistribution { fn optimize( &self, plan: Arc, @@ -93,16 +92,13 @@ impl PhysicalOptimizerRule for BasicEnforcement { } else { plan }; - Ok(Some(ensure_distribution_and_ordering( - adjusted, - target_partitions, - )?)) + Ok(Some(ensure_distribution(adjusted, target_partitions)?)) } }) } fn name(&self) -> &str { - "BasicEnforcement" + "EnforceDistribution" } fn schema_check(&self) -> bool { @@ -836,7 +832,7 @@ fn new_join_conditions( /// Within this function, it checks whether we need to add additional plan operators /// of data exchanging and data ordering to satisfy the required distribution and ordering. /// And we should avoid to manually add plan operators of data exchanging and data ordering in other places -fn ensure_distribution_and_ordering( +fn ensure_distribution( plan: Arc, target_partitions: usize, ) -> Result> { @@ -845,13 +841,11 @@ fn ensure_distribution_and_ordering( } let required_input_distributions = plan.required_input_distribution(); - let required_input_orderings = plan.required_input_ordering(); let children: Vec> = plan.children(); assert_eq!(children.len(), required_input_distributions.len()); - assert_eq!(children.len(), required_input_orderings.len()); // Add RepartitionExec to guarantee output partitioning - let children = children + let new_children: Result>> = children .into_iter() .zip(required_input_distributions.into_iter()) .map(|(child, required)| { @@ -874,24 +868,8 @@ fn ensure_distribution_and_ordering( }; new_child } - }); - - // Add local SortExec to guarantee output ordering within each partition - let new_children: Result>> = children - .zip(required_input_orderings.into_iter()) - .map(|(child_result, required)| { - let child = child_result?; - if ordering_satisfy(child.output_ordering(), required, || { - child.equivalence_properties() - }) { - Ok(child) - } else { - let sort_expr = required.unwrap().to_vec(); - add_sort_above_child(&child, sort_expr) - } }) .collect(); - with_new_children_if_necessary(plan, new_children?) } @@ -983,6 +961,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -1140,8 +1119,10 @@ mod tests { config.set_usize(OPT_TARGET_PARTITIONS, 10); // run optimizer - let optimizer = BasicEnforcement {}; + let optimizer = EnforceDistribution {}; let optimized = optimizer.optimize($PLAN, &config)?; + let optimizer = EnforceSorting {}; + let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly let plan = displayable(optimized.as_ref()).indent().to_string(); @@ -1660,7 +1641,7 @@ mod tests { Column::new_with_schema("c1", &right.schema()).unwrap(), ), ]; - let bottom_left_join = ensure_distribution_and_ordering( + let bottom_left_join = ensure_distribution( hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1690,7 +1671,7 @@ mod tests { Column::new_with_schema("a1", &right.schema()).unwrap(), ), ]; - let bottom_right_join = ensure_distribution_and_ordering( + let bottom_right_join = ensure_distribution( hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1779,7 +1760,7 @@ mod tests { Column::new_with_schema("b1", &right.schema()).unwrap(), ), ]; - let bottom_left_join = ensure_distribution_and_ordering( + let bottom_left_join = ensure_distribution( hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1809,7 +1790,7 @@ mod tests { Column::new_with_schema("a1", &right.schema()).unwrap(), ), ]; - let bottom_right_join = ensure_distribution_and_ordering( + let bottom_right_join = ensure_distribution( hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner), 10, )?; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index fb07d54b99d98..bf8df7b388dcd 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -23,11 +23,11 @@ pub mod coalesce_batches; pub mod enforcement; pub mod global_sort_selection; pub mod join_selection; -pub mod optimize_sorts; pub mod optimizer; pub mod pipeline_checker; pub mod pruning; pub mod repartition; +pub mod sort_enforcement; mod utils; pub mod pipeline_fixer; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 5bdbf59a606c8..cc01175f09030 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -240,7 +240,8 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_optimizer::enforcement::BasicEnforcement; + use crate::physical_optimizer::enforcement::EnforceDistribution; + use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -369,9 +370,12 @@ mod tests { // run optimizer let optimizers: Vec> = vec![ Arc::new(Repartition::new()), - // The `BasicEnforcement` is an essential rule to be applied. + // The `EnforceDistribution` is an essential rule to be applied. // Otherwise, the correctness of the generated optimized plan cannot be guaranteed - Arc::new(BasicEnforcement::new()), + Arc::new(EnforceDistribution::new()), + // The `EnforceSorting` is an essential rule to be applied. + // Otherwise, the correctness of the generated optimized plan cannot be guaranteed + Arc::new(EnforceSorting::new()), ]; let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| { optimizer.optimize(plan, &config).unwrap() diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs similarity index 97% rename from datafusion/core/src/physical_optimizer/optimize_sorts.rs rename to datafusion/core/src/physical_optimizer/sort_enforcement.rs index a47026cc773d4..b7524967e773e 100644 --- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! OptimizeSorts optimizer rule inspects [SortExec]s in the given physical +//! EnforceSorting optimizer rule inspects [SortExec]s in the given physical //! plan and removes the ones it can prove unnecessary. The rule can work on //! valid *and* invalid physical plans with respect to sorting requirements, //! but always produces a valid physical plan in this sense. @@ -45,16 +45,16 @@ use std::sync::Arc; /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] -pub struct OptimizeSorts {} +pub struct EnforceSorting {} -impl OptimizeSorts { +impl EnforceSorting { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -/// This is a "data class" we use within the [OptimizeSorts] rule that +/// This is a "data class" we use within the [EnforceSorting] rule that /// tracks the closest `SortExec` descendant for every child of a plan. #[derive(Debug, Clone)] struct PlanWithCorrespondingSort { @@ -118,7 +118,7 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { } } -impl PhysicalOptimizerRule for OptimizeSorts { +impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, plan: Arc, @@ -126,12 +126,12 @@ impl PhysicalOptimizerRule for OptimizeSorts { ) -> Result> { // Execute a post-order traversal to adjust input key ordering: let plan_requirements = PlanWithCorrespondingSort::new(plan); - let adjusted = plan_requirements.transform_up(&optimize_sorts)?; + let adjusted = plan_requirements.transform_up(&ensure_sorting)?; Ok(adjusted.plan) } fn name(&self) -> &str { - "OptimizeSorts" + "EnforceSorting" } fn schema_check(&self) -> bool { @@ -139,7 +139,7 @@ impl PhysicalOptimizerRule for OptimizeSorts { } } -fn optimize_sorts( +fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { // Perform naive analysis at the beginning -- remove already-satisfied sorts: @@ -170,7 +170,8 @@ fn optimize_sorts( let sort_expr = required_ordering.to_vec(); *child = add_sort_above_child(child, sort_expr)?; sort_onwards.push((idx, child.clone())) - } else if let [first, ..] = sort_onwards.as_slice() { + } + if let [first, ..] = sort_onwards.as_slice() { // The ordering requirement is met, we can analyze if there is an unnecessary sort: let sort_any = first.1.clone(); let sort_exec = convert_to_sort_exec(&sort_any)?; @@ -588,7 +589,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -687,7 +688,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -731,7 +732,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -796,7 +797,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -856,7 +857,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 3a3a4a20b167c..776fefd8bf9df 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -287,6 +287,10 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_ordering() } + fn maintains_input_order(&self) -> bool { + true + } + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } From 2260133bed235b248a907b476145578e3253d9e8 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Fri, 6 Jan 2023 16:14:39 -0600 Subject: [PATCH 2/4] Migrate to clearer file name, tidy up comments --- datafusion/core/src/execution/context.rs | 34 +++++++++---------- .../{enforcement.rs => dist_enforcement.rs} | 12 +++---- datafusion/core/src/physical_optimizer/mod.rs | 2 +- .../src/physical_optimizer/repartition.rs | 6 ++-- .../physical_optimizer/sort_enforcement.rs | 14 +++++--- datafusion/expr/src/logical_plan/builder.rs | 3 +- 6 files changed, 37 insertions(+), 34 deletions(-) rename datafusion/core/src/physical_optimizer/{enforcement.rs => dist_enforcement.rs} (99%) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 8e3dba8406abe..fcbd8571a0093 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -67,7 +67,7 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::ConfigOptions; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; -use crate::physical_optimizer::enforcement::EnforceDistribution; +use crate::physical_optimizer::dist_enforcement::EnforceDistribution; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udaf::AggregateUDF; @@ -1457,37 +1457,35 @@ impl SessionState { // output partitioning of some operators in the plan tree, which will influence // other rules. Therefore, it should run as soon as possible. It is optional because: // - It's not used for the distributed engine, Ballista. - // - It's conflicted with some parts of the BasicEnforcement, since it will - // introduce additional repartitioning while the BasicEnforcement aims at - // reducing unnecessary repartitioning. + // - It's conflicted with some parts of the EnforceDistribution, since it will + // introduce additional repartitioning while EnforceDistribution aims to + // reduce unnecessary repartitioning. Arc::new(Repartition::new()), // - Currently it will depend on the partition number to decide whether to change the // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection // should run after the Repartition. // - Since it will change the output ordering of some operators, it should run - // before JoinSelection and BasicEnforcement, which may depend on that. + // before JoinSelection and EnforceSorting, which may depend on that. Arc::new(GlobalSortSelection::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, - // like collect left, or hash join, or future sort merge join, which will - // influence the BasicEnforcement to decide whether to add additional repartition - // and local sort to meet the distribution and ordering requirements. - // Therefore, it should run before BasicEnforcement. + // like collect left, or hash join, or future sort merge join, which will influence the + // EnforceDistribution and EnforceSorting rules as they decide whether to add additional + // repartitioning and local sorting steps to meet distribution and ordering requirements. + // Therefore, it should run before EnforceDistribution and EnforceSorting. Arc::new(JoinSelection::new()), // If the query is processing infinite inputs, the PipelineFixer rule applies the // necessary transformations to make the query runnable (if it is not already runnable). // If the query can not be made runnable, the rule emits an error with a diagnostic message. // Since the transformations it applies may alter output partitioning properties of operators - // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement. + // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution. Arc::new(PipelineFixer::new()), - // EnforceDistribution is for adding essential repartition - // to satisfy the required distribution. - // Please make sure that the whole plan tree is determined. + // The EnforceDistribution rule is for adding essential repartition to satisfy the required + // distribution. Please make sure that the whole plan tree is determined before this rule. Arc::new(EnforceDistribution::new()), - // `EnforceSorting` is for adding essential local sorting - // to satisfy the required ordering. - // Please make sure that the whole plan tree is determined. Since `RepartitionExec`s - // added by `EnforceDistribution` may invalidate ordering requirement. `EnforceSorting` should run - // after `EnforceDistribution`. + // The EnforceSorting rule is for adding essential local sorting to satisfy the required + // ordering. Please make sure that the whole plan tree is determined before this rule. + // Note that one should always run this rule after running the EnforceDistribution rule + // as the latter may break local sorting requirements. Arc::new(EnforceSorting::new()), // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs similarity index 99% rename from datafusion/core/src/physical_optimizer/enforcement.rs rename to datafusion/core/src/physical_optimizer/dist_enforcement.rs index 4ff18ba3d31d0..bc2927559287d 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Enforcement optimizer rules are used to make sure the plan's Distribution -//! requirements are met by inserting necessary [[RepartitionExec]]. -//! +//! EnforceDistribution optimizer rule inspects the physical plan with respect +//! to distribution requirements and adds [RepartitionExec]s to satisfy them +//! when necessary. use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; @@ -45,8 +45,8 @@ use datafusion_physical_expr::{ use std::collections::HashMap; use std::sync::Arc; -/// EnforceDistribution rule, it ensures the Distribution requirements are met -/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree +/// The EnforceDistribution rule ensures that distribution requirements are met +/// in the strictest way. It might add additional [RepartitionExec] to the plan tree /// and give a non-optimal plan, but it can avoid the possible data skew in joins. /// /// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by @@ -80,7 +80,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { } else { plan }; - // Distribution and Ordering enforcement need to be applied bottom-up. + // Distribution enforcement needs to be applied bottom-up. new_plan.transform_up(&{ |plan| { let adjusted = if !top_down_join_key_reordering { diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index bf8df7b388dcd..3958a546a92df 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -20,7 +20,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; -pub mod enforcement; +pub mod dist_enforcement; pub mod global_sort_selection; pub mod join_selection; pub mod optimizer; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 3ad46ed9dbc7d..98ca12a9ef011 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -241,7 +241,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_optimizer::enforcement::EnforceDistribution; + use crate::physical_optimizer::dist_enforcement::EnforceDistribution; use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, @@ -371,10 +371,10 @@ mod tests { // run optimizer let optimizers: Vec> = vec![ Arc::new(Repartition::new()), - // The `EnforceDistribution` is an essential rule to be applied. + // EnforceDistribution is an essential rule to be applied. // Otherwise, the correctness of the generated optimized plan cannot be guaranteed Arc::new(EnforceDistribution::new()), - // The `EnforceSorting` is an essential rule to be applied. + // EnforceSorting is an essential rule to be applied. // Otherwise, the correctness of the generated optimized plan cannot be guaranteed Arc::new(EnforceSorting::new()), ]; diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index c3ca45fcaebe9..52463b4bdc09f 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -15,12 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! EnforceSorting optimizer rule inspects [SortExec]s in the given physical -//! plan and removes the ones it can prove unnecessary. The rule can work on -//! valid *and* invalid physical plans with respect to sorting requirements, -//! but always produces a valid physical plan in this sense. +//! EnforceSorting optimizer rule inspects the physical plan with respect +//! to local sorting requirements and does the following: +//! - Adds a [SortExec] when a requirement is not met, +//! - Removes an already-existing [SortExec] if it is possible to prove +//! that this sort is unnecessary +//! The rule can work on valid *and* invalid physical plans with respect to +//! sorting requirements, but always produces a valid physical plan in this sense. //! -//! A non-realistic but easy to follow example: Assume that we somehow get the fragment +//! A non-realistic but easy to follow example for sort removals: Assume that we +//! somehow get the fragment //! "SortExec: [nullable_col@0 ASC]", //! " SortExec: [non_nullable_col@1 ASC]", //! in the physical plan. The first sort is unnecessary since its result is overwritten diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index d49af378b16f1..3aa16ec68f8c3 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -273,7 +273,8 @@ impl LogicalPlanBuilder { }); for (_, exprs) in groups { let window_exprs = exprs.into_iter().cloned().collect::>(); - // the partition and sort itself is done at physical level, see the BasicEnforcement rule + // Partition and sorting is done at physical level, see the EnforceDistribution + // and EnforceSorting rules. plan = LogicalPlanBuilder::from(plan) .window(window_exprs)? .build()?; From 4259e543f40ff052c6620dc90151bfb0c9e41ef5 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sun, 8 Jan 2023 12:15:21 -0600 Subject: [PATCH 3/4] Add a note about tests verifying EnforceDistribution/EnforceSorting jointly --- datafusion/core/src/physical_optimizer/dist_enforcement.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index bc2927559287d..836121b07021f 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -1117,6 +1117,11 @@ mod tests { // run optimizer let optimizer = EnforceDistribution {}; let optimized = optimizer.optimize($PLAN, &config)?; + // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade + // because they were written prior to the separation of `BasicEnforcement` into + // `EnforceSorting` and `EnfoceDistribution`. + // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create + // new tests for the cascade. let optimizer = EnforceSorting {}; let optimized = optimizer.optimize(optimized, &config)?; From 849f7c6fcb0d82ba0da8f626c81be58d888530bb Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sun, 8 Jan 2023 21:20:32 -0600 Subject: [PATCH 4/4] Address review, fix the stale comment --- datafusion/core/src/physical_optimizer/dist_enforcement.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index 836121b07021f..aa8b07569cd58 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -825,9 +825,10 @@ fn new_join_conditions( new_join_on } -/// Within this function, it checks whether we need to add additional plan operators -/// of data exchanging and data ordering to satisfy the required distribution and ordering. -/// And we should avoid to manually add plan operators of data exchanging and data ordering in other places +/// This function checks whether we need to add additional data exchange +/// operators to satisfy distribution requirements. Since this function +/// takes care of such requirements, we should avoid manually adding data +/// exchange operators in other places. fn ensure_distribution( plan: Arc, target_partitions: usize,