From 590082470f4b4837d80302de662accaaf583441e Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 20:10:09 -0800 Subject: [PATCH 1/5] refactor: update_coalesce_ctx_children to more tightly define the context, selectively indicating when coalesce should be removed --- .../src/enforce_sorting/mod.rs | 138 +++++++++++------- 1 file changed, 88 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 11f1d8751d83c..44228521a92e0 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -56,7 +56,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; -use datafusion_physical_expr::{Distribution, Partitioning}; +use datafusion_physical_expr::Distribution; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -138,29 +138,67 @@ fn update_sort_ctx_children_data( /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data /// attribute stores whether the plan is a `CoalescePartitionsExec` or is /// connected to a `CoalescePartitionsExec` via its children. +/// +/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce). +/// +/// This requires a bottom-up traversal was previously performed, updating the +/// children previously. pub type PlanWithCorrespondingCoalescePartitions = PlanContext; +/// Determines if the coalesce may be safely removed. +fn is_coalesce_to_remove( + node: &Arc, + parent: &Arc, +) -> bool { + node.as_any() + .downcast_ref::() + .map(|_coalesce| { + // TODO(wiedld): find a more generalized approach that does not rely on + // pattern matching the structure of the DAG + // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of: + // * Repartition -> Coalesce -> Repartition + + let parent_req_single_partition = matches!( + parent.required_input_distribution()[0], + Distribution::SinglePartition + ); + + // node above does not require single distribution + !parent_req_single_partition + // it doesn't immediately repartition + || is_repartition(parent) + // any adjacent Coalesce->Sort can be replaced + || is_sort(parent) + }) + .unwrap_or(false) +} + fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { - let children = &coalesce_context.children; - coalesce_context.data = if children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&coalesce_context.plan) { - // Initiate a connection: - true - } else { - children.iter().enumerate().any(|(idx, node)| { - // Only consider operators that don't require a single partition, - // and connected to some `CoalescePartitionsExec`: - node.data - && !matches!( - coalesce_context.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - }) - }; + // perform lookahead(1) during bottom up traversal + // since we are checking distribution requirements after the coalesce occurs + let parent = &coalesce_context.plan; + + for child_context in coalesce_context.children.iter_mut() { + // determine if child, or it's descendents, are a coalesce to be removed + child_context.data = if child_context.children.is_empty() { + // Plan has no children, it cannot be a `CoalescePartitionsExec`. + false + } else if is_coalesce_to_remove(&child_context.plan, parent) { + // Initiate a connection: + true + } else if is_sort(&child_context.plan) { + // halt coalesce removals at the sort + false + } else { + // propogate + child_context + .children + .iter() + .any(|grandchild| grandchild.data) + }; + } } /// Performs optimizations based upon a series of subrules. @@ -316,14 +354,35 @@ fn replace_with_partial_sort( /// are transformed into /// ```text /// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " ...nodes..." -/// " SortExec: expr=\[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", +/// " ...nodes..." +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. +/// By performing sorting in parallel, we can increase performance in some scenarios. +/// +/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`] +/// which require single partitioning. Do not parallelize when the following scenario occurs: +/// ```text +/// "SortExec: expr=\[a@0 ASC\]", +/// " ...nodes requiring single partitioning..." +/// " CoalescePartitionsExec", /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { + requirements = requirements.update_plan_from_children()?; update_coalesce_ctx_children(&mut requirements); + let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data); + + let should_parallelize_sort = (is_sort(&requirements.plan) + || is_sort_preserving_merge(&requirements.plan)) + && requirements.plan.output_partitioning().partition_count() <= 1 + && coalesce_can_be_removed; + + // Repartition -> Coalesce -> Repartition + let unneeded_coalesce = is_repartition(&requirements.plan) && coalesce_can_be_removed; if requirements.children.is_empty() || !requirements.children[0].data { // We only take an action when the plan is either a `SortExec`, a @@ -331,10 +390,7 @@ pub fn parallelize_sorts( // all have a single child. Therefore, if the first child has no // connection, we can return immediately. Ok(Transformed::no(requirements)) - } else if (is_sort(&requirements.plan) - || is_sort_preserving_merge(&requirements.plan)) - && requirements.plan.output_partitioning().partition_count() <= 1 - { + } else if should_parallelize_sort { // Take the initial sort expressions and requirements let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); @@ -349,8 +405,11 @@ pub fn parallelize_sorts( // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan` // deals with the children and their children and so on. requirements = requirements.children.swap_remove(0); + // sync the requirements.plan.children with the mutated requirements.children + requirements = requirements.update_plan_from_children()?; requirements = add_sort_above_with_check(requirements, sort_reqs, fetch); + requirements = requirements.update_plan_from_children()?; let spm = SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan)); @@ -361,20 +420,11 @@ pub fn parallelize_sorts( vec![requirements], ), )) - } else if is_coalesce_partitions(&requirements.plan) { - // There is an unnecessary `CoalescePartitionsExec` in the plan. - // This will handle the recursive `CoalescePartitionsExec` plans. + } else if unneeded_coalesce { requirements = remove_bottleneck_in_subplan(requirements)?; - // For the removal of self node which is also a `CoalescePartitionsExec`. - requirements = requirements.children.swap_remove(0); + requirements = requirements.update_plan_from_children()?; - Ok(Transformed::yes( - PlanWithCorrespondingCoalescePartitions::new( - Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))), - false, - vec![requirements], - ), - )) + Ok(Transformed::yes(requirements)) } else { Ok(Transformed::yes(requirements)) } @@ -609,19 +659,7 @@ fn remove_bottleneck_in_subplan( }) .collect::>()?; } - let mut new_reqs = requirements.update_plan_from_children()?; - if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::() { - let input_partitioning = repartition.input().output_partitioning(); - // We can remove this repartitioning operator if it is now a no-op: - let mut can_remove = input_partitioning.eq(repartition.partitioning()); - // We can also remove it if we ended up with an ineffective RR: - if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() { - can_remove |= *n_out == input_partitioning.partition_count(); - } - if can_remove { - new_reqs = new_reqs.children.swap_remove(0) - } - } + let new_reqs = requirements.update_plan_from_children()?; Ok(new_reqs) } From 1e7bc76133b174862fed7052cceaabc143238680 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 18 Feb 2025 09:26:13 -0800 Subject: [PATCH 2/5] chore: fix typo --- datafusion/physical-optimizer/src/enforce_sorting/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 44228521a92e0..e5690ba04cff2 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -192,7 +192,7 @@ fn update_coalesce_ctx_children( // halt coalesce removals at the sort false } else { - // propogate + // propagate child_context .children .iter() From 77f2dd290ea2b45764bacd59126a4b00ec20a4d6 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 26 Feb 2025 16:38:44 -0800 Subject: [PATCH 3/5] chore: add docs to update_coalesce_ctx_children --- .../src/enforce_sorting/mod.rs | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index e5690ba04cff2..6ea2a73af8113 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -153,10 +153,8 @@ fn is_coalesce_to_remove( node.as_any() .downcast_ref::() .map(|_coalesce| { - // TODO(wiedld): find a more generalized approach that does not rely on - // pattern matching the structure of the DAG // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of: - // * Repartition -> Coalesce -> Repartition + // * Repartition -> Coalesce -> Repartition let parent_req_single_partition = matches!( parent.required_input_distribution()[0], @@ -173,6 +171,28 @@ fn is_coalesce_to_remove( .unwrap_or(false) } +/// Discovers the linked Coalesce->Sort cascades. +/// +/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively +/// remove the linked coalesces in the subplan. Then afterwards, an SPM is added +/// at the root of the subplan (just after the sort) in order to parallelize sorts. +/// Refer to the [`parallelize_sorts`] for more details on sort parallelization. +/// +/// Example of linked Coalesce->Sort: +/// ```text +/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan) +/// ...nodes... ctx.data=true (e.g. are linked in cascade) +/// Coalesce ctx.data=true (e.g. is a coalesce) +/// ``` +/// +/// The link should not be continued (and the coalesce not removed) if the distribution +/// is changed between the Coalesce->Sort cascade. Example: +/// ```text +/// SortExec ctx.data=false, to halt remove_bottleneck_in_subplan) +/// AggregateExec ctx.data=false, to stop the link +/// ...nodes... ctx.data=true (e.g. are linked in cascade) +/// Coalesce ctx.data=true (e.g. is a coalesce) +/// ``` fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { From 535e74f06e072a0228c524fc5277e825645e2520 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 26 Feb 2025 20:08:49 -0800 Subject: [PATCH 4/5] refactor: cleanup is_coalesce_to_remove() --- .../src/enforce_sorting/mod.rs | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 6ea2a73af8113..06d29578721d8 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -150,25 +150,20 @@ fn is_coalesce_to_remove( node: &Arc, parent: &Arc, ) -> bool { - node.as_any() - .downcast_ref::() - .map(|_coalesce| { - // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of: - // * Repartition -> Coalesce -> Repartition - - let parent_req_single_partition = matches!( - parent.required_input_distribution()[0], - Distribution::SinglePartition - ); + let parent_req_single_partition = matches!( + parent.required_input_distribution()[0], + Distribution::SinglePartition + ); + is_coalesce_partitions(node) + && ( // node above does not require single distribution !parent_req_single_partition - // it doesn't immediately repartition - || is_repartition(parent) - // any adjacent Coalesce->Sort can be replaced - || is_sort(parent) - }) - .unwrap_or(false) + // it doesn't immediately repartition + || is_repartition(parent) + // any adjacent Coalesce->Sort can be replaced + || is_sort(parent) + ) } /// Discovers the linked Coalesce->Sort cascades. From 68c8201b8097e8c87430bfb12d06a53f27e1a194 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 27 Feb 2025 09:16:39 -0800 Subject: [PATCH 5/5] refactor: make the change only be docs --- .../src/enforce_sorting/mod.rs | 111 ++++++++---------- 1 file changed, 48 insertions(+), 63 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 06d29578721d8..20733b65692fc 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -56,7 +56,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; -use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::{Distribution, Partitioning}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -145,27 +145,6 @@ fn update_sort_ctx_children_data( /// children previously. pub type PlanWithCorrespondingCoalescePartitions = PlanContext; -/// Determines if the coalesce may be safely removed. -fn is_coalesce_to_remove( - node: &Arc, - parent: &Arc, -) -> bool { - let parent_req_single_partition = matches!( - parent.required_input_distribution()[0], - Distribution::SinglePartition - ); - - is_coalesce_partitions(node) - && ( - // node above does not require single distribution - !parent_req_single_partition - // it doesn't immediately repartition - || is_repartition(parent) - // any adjacent Coalesce->Sort can be replaced - || is_sort(parent) - ) -} - /// Discovers the linked Coalesce->Sort cascades. /// /// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively @@ -191,29 +170,24 @@ fn is_coalesce_to_remove( fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { - // perform lookahead(1) during bottom up traversal - // since we are checking distribution requirements after the coalesce occurs - let parent = &coalesce_context.plan; - - for child_context in coalesce_context.children.iter_mut() { - // determine if child, or it's descendents, are a coalesce to be removed - child_context.data = if child_context.children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_to_remove(&child_context.plan, parent) { - // Initiate a connection: - true - } else if is_sort(&child_context.plan) { - // halt coalesce removals at the sort - false - } else { - // propagate - child_context - .children - .iter() - .any(|grandchild| grandchild.data) - }; - } + let children = &coalesce_context.children; + coalesce_context.data = if children.is_empty() { + // Plan has no children, it cannot be a `CoalescePartitionsExec`. + false + } else if is_coalesce_partitions(&coalesce_context.plan) { + // Initiate a connection: + true + } else { + children.iter().enumerate().any(|(idx, node)| { + // Only consider operators that don't require a single partition, + // and connected to some `CoalescePartitionsExec`: + node.data + && !matches!( + coalesce_context.plan.required_input_distribution()[idx], + Distribution::SinglePartition + ) + }) + }; } /// Performs optimizations based upon a series of subrules. @@ -387,17 +361,7 @@ fn replace_with_partial_sort( pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { - requirements = requirements.update_plan_from_children()?; update_coalesce_ctx_children(&mut requirements); - let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data); - - let should_parallelize_sort = (is_sort(&requirements.plan) - || is_sort_preserving_merge(&requirements.plan)) - && requirements.plan.output_partitioning().partition_count() <= 1 - && coalesce_can_be_removed; - - // Repartition -> Coalesce -> Repartition - let unneeded_coalesce = is_repartition(&requirements.plan) && coalesce_can_be_removed; if requirements.children.is_empty() || !requirements.children[0].data { // We only take an action when the plan is either a `SortExec`, a @@ -405,7 +369,10 @@ pub fn parallelize_sorts( // all have a single child. Therefore, if the first child has no // connection, we can return immediately. Ok(Transformed::no(requirements)) - } else if should_parallelize_sort { + } else if (is_sort(&requirements.plan) + || is_sort_preserving_merge(&requirements.plan)) + && requirements.plan.output_partitioning().partition_count() <= 1 + { // Take the initial sort expressions and requirements let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); @@ -420,11 +387,8 @@ pub fn parallelize_sorts( // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan` // deals with the children and their children and so on. requirements = requirements.children.swap_remove(0); - // sync the requirements.plan.children with the mutated requirements.children - requirements = requirements.update_plan_from_children()?; requirements = add_sort_above_with_check(requirements, sort_reqs, fetch); - requirements = requirements.update_plan_from_children()?; let spm = SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan)); @@ -435,11 +399,20 @@ pub fn parallelize_sorts( vec![requirements], ), )) - } else if unneeded_coalesce { + } else if is_coalesce_partitions(&requirements.plan) { + // There is an unnecessary `CoalescePartitionsExec` in the plan. + // This will handle the recursive `CoalescePartitionsExec` plans. requirements = remove_bottleneck_in_subplan(requirements)?; - requirements = requirements.update_plan_from_children()?; + // For the removal of self node which is also a `CoalescePartitionsExec`. + requirements = requirements.children.swap_remove(0); - Ok(Transformed::yes(requirements)) + Ok(Transformed::yes( + PlanWithCorrespondingCoalescePartitions::new( + Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))), + false, + vec![requirements], + ), + )) } else { Ok(Transformed::yes(requirements)) } @@ -674,7 +647,19 @@ fn remove_bottleneck_in_subplan( }) .collect::>()?; } - let new_reqs = requirements.update_plan_from_children()?; + let mut new_reqs = requirements.update_plan_from_children()?; + if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::() { + let input_partitioning = repartition.input().output_partitioning(); + // We can remove this repartitioning operator if it is now a no-op: + let mut can_remove = input_partitioning.eq(repartition.partitioning()); + // We can also remove it if we ended up with an ineffective RR: + if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() { + can_remove |= *n_out == input_partitioning.partition_count(); + } + if can_remove { + new_reqs = new_reqs.children.swap_remove(0) + } + } Ok(new_reqs) }