diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 112e9f3e20b07..6ce3f64eeb665 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -290,6 +290,18 @@ config_namespace! { /// functions in parallel using the provided `target_partitions` level" pub repartition_windows: bool, default = true + /// Should DataFusion execute sorts in a per-partition fashion and merge + /// afterwards instead of coalescing first and sorting globally + /// With this flag is enabled, plans in the form below + /// "SortExec: [a@0 ASC]", + /// " CoalescePartitionsExec", + /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + /// would turn into the plan below which performs better in multithreaded environments + /// "SortPreservingMergeExec: [a@0 ASC]", + /// " SortExec: [a@0 ASC]", + /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + pub repartition_sorts: bool, default = true + /// When set to true, the logical plan optimizer will produce warning /// messages if any optimization rules produce errors and then proceed to the next /// rule. When set to false, any rules that produce errors will cause the query to fail diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 132d5711a519f..a053f640fb3bf 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1232,6 +1232,12 @@ impl SessionConfig { self.options.optimizer.repartition_windows } + /// Do we execute sorts in a per-partition fashion and merge afterwards, + /// or do we coalesce partitions first and sort globally? + pub fn repartition_sorts(&self) -> bool { + self.options.optimizer.repartition_sorts + } + /// Are statistics collected during execution? pub fn collect_statistics(&self) -> bool { self.options.execution.collect_statistics @@ -1290,6 +1296,12 @@ impl SessionConfig { self } + /// Enables or disables the use of per-partition sorting to improve parallelism + pub fn with_repartition_sorts(mut self, enabled: bool) -> Self { + self.options.optimizer.repartition_sorts = enabled; + self + } + /// Enables or disables the use of pruning predicate for parquet readers to skip row groups pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { self.options.execution.parquet.pruning = enabled; diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index c6c2bd40e39f7..0dbba2c319b4d 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -1134,7 +1134,7 @@ mod tests { // `EnforceSorting` and `EnfoceDistribution`. // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create // new tests for the cascade. 
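A brief aside on the new option introduced above: a minimal, illustrative sketch (not part of this patch) of toggling `repartition_sorts` on a session. It assumes the `SessionConfig` / `SessionContext` APIs re-exported from `datafusion::prelude` in this version of DataFusion, plus the getter/builder added in the `context.rs` hunk above.

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Turn the new behaviour off; the optimizer then falls back to
        // coalescing partitions first and sorting globally, as before.
        let config = SessionConfig::new().with_repartition_sorts(false);
        assert!(!config.repartition_sorts());
        let _ctx = SessionContext::with_config(config);
    }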
- let optimizer = EnforceSorting {}; + let optimizer = EnforceSorting::new(); let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index c9a3c8fec293e..0eadfb7c2dea5 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -33,16 +33,19 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::add_sort_above_child; use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete}; -use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use itertools::izip; +use itertools::{concat, izip}; use std::iter::zip; use std::sync::Arc; @@ -58,15 +61,39 @@ impl EnforceSorting { } } -/// This is a "data class" we use within the [EnforceSorting] rule that -/// tracks the closest `SortExec` descendant for every child of a plan. +/// This object implements a tree that we use while keeping track of paths +/// leading to `SortExec`s. +#[derive(Debug, Clone)] +struct ExecTree { + /// Child index of the plan in its parent + pub idx: usize, + /// Children of the plan that would need updating if we remove leaf executors + pub children: Vec, + /// The `ExecutionPlan` associated with this node + pub plan: Arc, +} + +impl ExecTree { + /// This function returns the executors at the leaves of the tree. + fn get_leaves(&self) -> Vec> { + if self.children.is_empty() { + vec![self.plan.clone()] + } else { + concat(self.children.iter().map(|e| e.get_leaves())) + } + } +} + +/// This object is used within the [EnforceSorting] rule to track the closest +/// `SortExec` descendant(s) for every child of a plan. #[derive(Debug, Clone)] struct PlanWithCorrespondingSort { plan: Arc, - // For every child, keep a vector of `ExecutionPlan`s starting from the - // closest `SortExec` till the current plan. The first index of the tuple is - // the child index of the plan -- we need this information as we make updates. - sort_onwards: Vec)>>, + // For every child, keep a subtree of `ExecutionPlan`s starting from the + // child until the `SortExec`(s) -- could be multiple for n-ary plans like + // Union -- that determine the output ordering of the child. If the child + // has no connection to any sort, simpliy store None (and not a subtree). 
+ sort_onwards: Vec>, } impl PlanWithCorrespondingSort { @@ -74,10 +101,76 @@ impl PlanWithCorrespondingSort { let length = plan.children().len(); PlanWithCorrespondingSort { plan, - sort_onwards: vec![vec![]; length], + sort_onwards: vec![None; length], } } + pub fn new_from_children_nodes( + children_nodes: Vec, + parent_plan: Arc, + ) -> Result { + let children_plans = children_nodes + .iter() + .map(|item| item.plan.clone()) + .collect::>(); + let sort_onwards = children_nodes + .into_iter() + .enumerate() + .map(|(idx, item)| { + let plan = &item.plan; + // Leaves of `sort_onwards` are `SortExec` operators, which impose + // an ordering. This tree collects all the intermediate executors + // that maintain this ordering. If we just saw a order imposing + // operator, we reset the tree and start accumulating. + if is_sort(plan) { + return Some(ExecTree { + idx, + plan: item.plan, + children: vec![], + }); + } else if is_limit(plan) { + // There is no sort linkage for this path, it starts at a limit. + return None; + } + let is_spm = is_sort_preserving_merge(plan); + let is_union = plan.as_any().is::(); + // If the executor is a `UnionExec`, and it has an output ordering; + // then it at least partially maintains some child's output ordering. + // Therefore, we propagate this information upwards. + let partially_maintains = is_union && plan.output_ordering().is_some(); + let required_orderings = plan.required_input_ordering(); + let flags = plan.maintains_input_order(); + let children = izip!(flags, item.sort_onwards, required_orderings) + .filter_map(|(maintains, element, required_ordering)| { + if (required_ordering.is_none() + && (maintains || partially_maintains)) + || is_spm + { + element + } else { + None + } + }) + .collect::>(); + if !children.is_empty() { + // Add parent node to the tree if there is at least one + // child with a subtree: + Some(ExecTree { + idx, + plan: item.plan, + children, + }) + } else { + // There is no sort linkage for this child, do nothing. + None + } + }) + .collect(); + + let plan = with_new_children_if_necessary(parent_plan, children_plans)?; + Ok(PlanWithCorrespondingSort { plan, sort_onwards }) + } + pub fn children(&self) -> Vec { self.plan .children() @@ -96,50 +189,147 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { if children.is_empty() { Ok(self) } else { - let children_requirements = children + let children_nodes = children .into_iter() .map(transform) .collect::>>()?; - let children_plans = children_requirements - .iter() - .map(|elem| elem.plan.clone()) - .collect::>(); - let sort_onwards = children_requirements - .iter() - .map(|item| { - let onwards = &item.sort_onwards; - if !onwards.is_empty() { - let flags = item.plan.maintains_input_order(); - // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor - // if the executors in between maintain input ordering. If we are at - // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering). - // However, we want to propagate them above anyway. - for (maintains, element) in flags.into_iter().zip(onwards.iter()) - { - if (maintains || is_sort(&item.plan)) && !element.is_empty() { - return element.clone(); - } - } + PlanWithCorrespondingSort::new_from_children_nodes(children_nodes, self.plan) + } + } +} + +/// This object is used within the [EnforceSorting] rule to track the closest +/// `CoalescePartitionsExec` descendant(s) for every child of a plan. 
+#[derive(Debug, Clone)] +struct PlanWithCorrespondingCoalescePartitions { + plan: Arc, + // For every child, keep a subtree of `ExecutionPlan`s starting from the + // child until the `CoalescePartitionsExec`(s) -- could be multiple for + // n-ary plans like Union -- that affect the output partitioning of the + // child. If the child has no connection to any `CoalescePartitionsExec`, + // simplify store None (and not a subtree). + coalesce_onwards: Vec>, +} + +impl PlanWithCorrespondingCoalescePartitions { + pub fn new(plan: Arc) -> Self { + let length = plan.children().len(); + PlanWithCorrespondingCoalescePartitions { + plan, + coalesce_onwards: vec![None; length], + } + } + + pub fn new_from_children_nodes( + children_nodes: Vec, + parent_plan: Arc, + ) -> Result { + let children_plans = children_nodes + .iter() + .map(|item| item.plan.clone()) + .collect(); + let coalesce_onwards = children_nodes + .into_iter() + .enumerate() + .map(|(idx, item)| { + // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec` + // operators. This tree collects all the intermediate executors that + // maintain a single partition. If we just saw a `CoalescePartitionsExec` + // operator, we reset the tree and start accumulating. + let plan = item.plan; + if plan.as_any().is::() { + Some(ExecTree { + idx, + plan, + children: vec![], + }) + } else if plan.children().is_empty() { + // Plan has no children, there is nothing to propagate. + None + } else { + let children = item + .coalesce_onwards + .into_iter() + .flatten() + .filter(|item| { + // Only consider operators that don't require a + // single partition. + !matches!( + plan.required_input_distribution()[item.idx], + Distribution::SinglePartition + ) + }) + .collect::>(); + if children.is_empty() { + None + } else { + Some(ExecTree { + idx, + plan, + children, + }) } - vec![] - }) - .collect::>(); - let plan = with_new_children_if_necessary(self.plan, children_plans)?; - Ok(PlanWithCorrespondingSort { plan, sort_onwards }) + } + }) + .collect(); + let plan = with_new_children_if_necessary(parent_plan, children_plans)?; + Ok(PlanWithCorrespondingCoalescePartitions { + plan, + coalesce_onwards, + }) + } + + pub fn children(&self) -> Vec { + self.plan + .children() + .into_iter() + .map(|child| PlanWithCorrespondingCoalescePartitions::new(child)) + .collect() + } +} + +impl TreeNodeRewritable for PlanWithCorrespondingCoalescePartitions { + fn map_children(self, transform: F) -> Result + where + F: FnMut(Self) -> Result, + { + let children = self.children(); + if children.is_empty() { + Ok(self) + } else { + let children_nodes = children + .into_iter() + .map(transform) + .collect::>>()?; + PlanWithCorrespondingCoalescePartitions::new_from_children_nodes( + children_nodes, + self.plan, + ) } } } +/// The boolean flag `repartition_sorts` defined in the config indicates +/// whether we elect to transform CoalescePartitionsExec + SortExec cascades +/// into SortExec + SortPreservingMergeExec cascades, which enables us to +/// perform sorting in parallel. 
impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result> { - // Execute a post-order traversal to adjust input key ordering: let plan_requirements = PlanWithCorrespondingSort::new(plan); let adjusted = plan_requirements.transform_up(&ensure_sorting)?; - Ok(adjusted.plan) + if config.optimizer.repartition_sorts { + let plan_with_coalesce_partitions = + PlanWithCorrespondingCoalescePartitions::new(adjusted.plan); + let parallel = + plan_with_coalesce_partitions.transform_up(¶llelize_sorts)?; + Ok(parallel.plan) + } else { + Ok(adjusted.plan) + } } fn name(&self) -> &str { @@ -151,25 +341,98 @@ impl PhysicalOptimizerRule for EnforceSorting { } } -// Checks whether executor is Sort -// TODO: Add support for SortPreservingMergeExec also. +/// This function turns plans of the form +/// "SortExec: [a@0 ASC]", +/// " CoalescePartitionsExec", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// to +/// "SortPreservingMergeExec: [a@0 ASC]", +/// " SortExec: [a@0 ASC]", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// by following connections from `CoalescePartitionsExec`s to `SortExec`s. +/// By performing sorting in parallel, we can increase performance in some scenarios. +fn parallelize_sorts( + requirements: PlanWithCorrespondingCoalescePartitions, +) -> Result> { + let plan = requirements.plan; + if plan.children().is_empty() { + return Ok(None); + } + let mut coalesce_onwards = requirements.coalesce_onwards; + // We know that `plan` has children, so `coalesce_onwards` is non-empty. + if coalesce_onwards[0].is_some() { + if let Some(sort_exec) = plan.as_any().downcast_ref::() { + // If there is a connection between a `CoalescePartitionsExec` and a + // `SortExec` that satisfy the requirements (i.e. they don't require a + // single partition), then we can replace the `CoalescePartitionsExec` + // + `SortExec` cascade with a `SortExec` + `SortPreservingMergeExec` + // cascade to parallelize sorting. + let mut prev_layer = plan.clone(); + update_child_to_change_coalesce( + &mut prev_layer, + &mut coalesce_onwards[0], + Some(sort_exec), + )?; + let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), prev_layer); + return Ok(Some(PlanWithCorrespondingCoalescePartitions { + plan: Arc::new(spm), + coalesce_onwards: vec![None], + })); + } else if plan.as_any().is::() { + // There is an unnecessary `CoalescePartitionExec` in the plan. + let mut prev_layer = plan.clone(); + update_child_to_change_coalesce( + &mut prev_layer, + &mut coalesce_onwards[0], + None, + )?; + let new_plan = plan.with_new_children(vec![prev_layer])?; + return Ok(Some(PlanWithCorrespondingCoalescePartitions { + plan: new_plan, + coalesce_onwards: vec![None], + })); + } + } + Ok(Some(PlanWithCorrespondingCoalescePartitions { + plan, + coalesce_onwards, + })) +} + +/// Checks whether the given executor is a limit; +/// i.e. either a `LocalLimitExec` or a `GlobalLimitExec`. +fn is_limit(plan: &Arc) -> bool { + plan.as_any().is::() || plan.as_any().is::() +} + +/// Checks whether the given executor is a `SortExec`. fn is_sort(plan: &Arc) -> bool { plan.as_any().is::() } +/// Checks whether the given executor is a `SortPreservingMergeExec`. +fn is_sort_preserving_merge(plan: &Arc) -> bool { + plan.as_any().is::() +} + +/// This function enforces sorting requirements and makes optimizations without +/// violating these requirements whenever possible. 
fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { // Perform naive analysis at the beginning -- remove already-satisfied sorts: - if let Some(result) = analyze_immediate_sort_removal(&requirements)? { + let plan = requirements.plan; + let mut children = plan.children(); + if children.is_empty() { + return Ok(None); + } + let mut sort_onwards = requirements.sort_onwards; + if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards) { return Ok(Some(result)); } - let plan = &requirements.plan; - let mut new_children = plan.children().clone(); - let mut new_onwards = requirements.sort_onwards.clone(); for (idx, (child, sort_onwards, required_ordering)) in izip!( - new_children.iter_mut(), - new_onwards.iter_mut(), + children.iter_mut(), + sort_onwards.iter_mut(), plan.required_input_ordering() ) .enumerate() @@ -184,225 +447,380 @@ fn ensure_sorting( ); if !is_ordering_satisfied { // Make sure we preserve the ordering requirements: - update_child_to_remove_unnecessary_sort(child, sort_onwards)?; + update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; let sort_expr = required_ordering.to_vec(); *child = add_sort_above_child(child, sort_expr)?; - sort_onwards.push((idx, child.clone())) + *sort_onwards = Some(ExecTree { + idx, + plan: child.clone(), + children: vec![], + }) } - if let [first, ..] = sort_onwards.as_slice() { - // The ordering requirement is met, we can analyze if there is an unnecessary sort: - let sort_any = first.1.clone(); - let sort_exec = convert_to_sort_exec(&sort_any)?; - let sort_output_ordering = sort_exec.output_ordering(); - let sort_input_ordering = sort_exec.input().output_ordering(); - // Simple analysis: Does the input of the sort in question already satisfy the ordering requirements? - if ordering_satisfy(sort_input_ordering, sort_output_ordering, || { - sort_exec.input().equivalence_properties() - }) { - update_child_to_remove_unnecessary_sort(child, sort_onwards)?; - } + if let Some(tree) = sort_onwards { // For window expressions, we can remove some sorts when we can // calculate the result in reverse: - else if let Some(exec) = - requirements.plan.as_any().downcast_ref::() - { - if let Some(result) = analyze_window_sort_removal( - exec.window_expr(), - &exec.partition_keys, - sort_exec, - sort_onwards, - )? { - return Ok(Some(result)); - } - } else if let Some(exec) = requirements - .plan - .as_any() - .downcast_ref::() + if plan.as_any().is::() + || plan.as_any().is::() { - if let Some(result) = analyze_window_sort_removal( - exec.window_expr(), - &exec.partition_keys, - sort_exec, - sort_onwards, - )? { + if let Some(result) = analyze_window_sort_removal(tree, &plan)? { return Ok(Some(result)); } } - // TODO: Once we can ensure that required ordering information propagates with - // necessary lineage information, compare `sort_input_ordering` and `required_ordering`. - // This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b). - // Currently, we can not remove such sorts. } } (Some(required), None) => { - // Ordering requirement is not met, we should add a SortExec to the plan. - let sort_expr = required.to_vec(); - *child = add_sort_above_child(child, sort_expr)?; - *sort_onwards = vec![(idx, child.clone())]; + // Ordering requirement is not met, we should add a `SortExec` to the plan. 
+ *child = add_sort_above_child(child, required.to_vec())?; + *sort_onwards = Some(ExecTree { + idx, + plan: child.clone(), + children: vec![], + }) } (None, Some(_)) => { - // We have a SortExec whose effect may be neutralized by a order-imposing - // operator. In this case, remove this sort: - if !requirements.plan.maintains_input_order()[idx] { - update_child_to_remove_unnecessary_sort(child, sort_onwards)?; + // We have a `SortExec` whose effect may be neutralized by + // another order-imposing operator. Remove or update this sort: + if !plan.maintains_input_order()[idx] { + let count = plan.output_ordering().map_or(0, |e| e.len()); + if (count > 0) && !is_sort(&plan) { + update_child_to_change_finer_sort(child, sort_onwards, count)?; + } else { + update_child_to_remove_unnecessary_sort( + child, + sort_onwards, + &plan, + )?; + } } } (None, None) => {} } } - if plan.children().is_empty() { - Ok(Some(requirements)) - } else { - let new_plan = requirements.plan.with_new_children(new_children)?; - for (idx, (trace, required_ordering)) in new_onwards - .iter_mut() - .zip(new_plan.required_input_ordering()) - .enumerate() - .take(new_plan.children().len()) - { - if new_plan.maintains_input_order()[idx] - && required_ordering.is_none() - && !trace.is_empty() - { - trace.push((idx, new_plan.clone())); - } else { - trace.clear(); - if is_sort(&new_plan) { - trace.push((idx, new_plan.clone())); - } - } - } - Ok(Some(PlanWithCorrespondingSort { - plan: new_plan, - sort_onwards: new_onwards, - })) - } + Ok(Some(PlanWithCorrespondingSort { + plan: plan.with_new_children(children)?, + sort_onwards, + })) } -/// Analyzes a given `SortExec` to determine whether its input already has -/// a finer ordering than this `SortExec` enforces. +/// Analyzes a given `SortExec` (`plan`) to determine whether its input already +/// has a finer ordering than this `SortExec` enforces. 
fn analyze_immediate_sort_removal( - requirements: &PlanWithCorrespondingSort, -) -> Result> { - if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::() { + plan: &Arc, + sort_onwards: &[Option], +) -> Option { + if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let sort_input = sort_exec.input().clone(); // If this sort is unnecessary, we should remove it: if ordering_satisfy( - sort_exec.input().output_ordering(), + sort_input.output_ordering(), sort_exec.output_ordering(), - || sort_exec.input().equivalence_properties(), + || sort_input.equivalence_properties(), ) { // Since we know that a `SortExec` has exactly one child, // we can use the zero index safely: - let mut new_onwards = requirements.sort_onwards[0].to_vec(); - if !new_onwards.is_empty() { - new_onwards.pop(); - } - return Ok(Some(PlanWithCorrespondingSort { - plan: sort_exec.input().clone(), - sort_onwards: vec![new_onwards], - })); + return Some( + if !sort_exec.preserve_partitioning() + && sort_input.output_partitioning().partition_count() > 1 + { + // Replace the sort with a sort-preserving merge: + let new_plan: Arc = + Arc::new(SortPreservingMergeExec::new( + sort_exec.expr().to_vec(), + sort_input, + )); + let new_tree = ExecTree { + idx: 0, + plan: new_plan.clone(), + children: sort_onwards.iter().flat_map(|e| e.clone()).collect(), + }; + PlanWithCorrespondingSort { + plan: new_plan, + sort_onwards: vec![Some(new_tree)], + } + } else { + // Remove the sort: + PlanWithCorrespondingSort { + plan: sort_input, + sort_onwards: sort_onwards.to_vec(), + } + }, + ); } } - Ok(None) + None } /// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether /// it may allow removing a sort. fn analyze_window_sort_removal( - window_expr: &[Arc], - partition_keys: &[Arc], - sort_exec: &SortExec, - sort_onward: &mut Vec<(usize, Arc)>, + sort_tree: &mut ExecTree, + window_exec: &Arc, ) -> Result> { - let required_ordering = sort_exec.output_ordering().ok_or_else(|| { - DataFusionError::Plan("A SortExec should have output ordering".to_string()) - })?; - let physical_ordering = sort_exec.input().output_ordering(); - let physical_ordering = if let Some(physical_ordering) = physical_ordering { - physical_ordering + let (window_expr, partition_keys) = if let Some(exec) = + window_exec.as_any().downcast_ref::() + { + (exec.window_expr(), &exec.partition_keys) + } else if let Some(exec) = window_exec.as_any().downcast_ref::() { + (exec.window_expr(), &exec.partition_keys) } else { - // If there is no physical ordering, there is no way to remove a sort -- immediately return: - return Ok(None); + return Err(DataFusionError::Plan( + "Expects to receive either WindowAggExec of BoundedWindowAggExec".to_string(), + )); }; - let (can_skip_sorting, should_reverse) = can_skip_sort( - window_expr[0].partition_by(), - required_ordering, - &sort_exec.input().schema(), - physical_ordering, - )?; - if can_skip_sorting { - let new_window_expr = if should_reverse { - window_expr - .iter() - .map(|e| e.get_reverse_expr()) - .collect::>>() - } else { - Some(window_expr.to_vec()) - }; - if let Some(window_expr) = new_window_expr { - let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?; - let new_schema = new_child.schema(); - - let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory()); - // If all window exprs can run with bounded memory choose bounded window variant - let new_plan = if uses_bounded_memory { - Arc::new(BoundedWindowAggExec::try_new( - window_expr, - new_child, - 
new_schema, - partition_keys.to_vec(), - Some(physical_ordering.to_vec()), - )?) as _ + + let mut first_should_reverse = None; + let mut physical_ordering_common = vec![]; + for sort_any in sort_tree.get_leaves() { + let sort_output_ordering = sort_any.output_ordering(); + // Variable `sort_any` will either be a `SortExec` or a + // `SortPreservingMergeExec`, and both have a single child. + // Therefore, we can use the 0th index without loss of generality. + let sort_input = sort_any.children()[0].clone(); + let physical_ordering = sort_input.output_ordering(); + // TODO: Once we can ensure that required ordering information propagates with + // the necessary lineage information, compare `physical_ordering` and the + // ordering required by the window executor instead of `sort_output_ordering`. + // This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b). + // Currently, we can not remove such sorts. + let required_ordering = sort_output_ordering.ok_or_else(|| { + DataFusionError::Plan("A SortExec should have output ordering".to_string()) + })?; + if let Some(physical_ordering) = physical_ordering { + if physical_ordering_common.is_empty() + || physical_ordering.len() < physical_ordering_common.len() + { + physical_ordering_common = physical_ordering.to_vec(); + } + let (can_skip_sorting, should_reverse) = can_skip_sort( + window_expr[0].partition_by(), + required_ordering, + &sort_input.schema(), + physical_ordering, + )?; + if !can_skip_sorting { + return Ok(None); + } + if let Some(first_should_reverse) = first_should_reverse { + if first_should_reverse != should_reverse { + return Ok(None); + } } else { - Arc::new(WindowAggExec::try_new( - window_expr, - new_child, - new_schema, - partition_keys.to_vec(), - Some(physical_ordering.to_vec()), - )?) as _ - }; - return Ok(Some(PlanWithCorrespondingSort::new(new_plan))); + first_should_reverse = Some(should_reverse); + } + } else { + // If there is no physical ordering, there is no way to remove a + // sort, so immediately return. + return Ok(None); } } + let new_window_expr = if first_should_reverse.unwrap() { + window_expr + .iter() + .map(|e| e.get_reverse_expr()) + .collect::>>() + } else { + Some(window_expr.to_vec()) + }; + if let Some(window_expr) = new_window_expr { + let requires_single_partition = matches!( + window_exec.required_input_distribution()[sort_tree.idx], + Distribution::SinglePartition + ); + let new_child = remove_corresponding_sort_from_sub_plan( + sort_tree, + requires_single_partition, + )?; + let new_schema = new_child.schema(); + + let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory()); + // If all window expressions can run with bounded memory, choose the + // bounded window variant: + let new_plan = if uses_bounded_memory { + Arc::new(BoundedWindowAggExec::try_new( + window_expr, + new_child, + new_schema, + partition_keys.to_vec(), + Some(physical_ordering_common), + )?) as _ + } else { + Arc::new(WindowAggExec::try_new( + window_expr, + new_child, + new_schema, + partition_keys.to_vec(), + Some(physical_ordering_common), + )?) as _ + }; + return Ok(Some(PlanWithCorrespondingSort::new(new_plan))); + } Ok(None) } -/// Updates child to remove the unnecessary sorting below it. -fn update_child_to_remove_unnecessary_sort( +/// Updates child to remove the unnecessary `CoalescePartitions` below it. 
+fn update_child_to_change_coalesce( child: &mut Arc, - sort_onwards: &mut Vec<(usize, Arc)>, + coalesce_onwards: &mut Option, + sort_exec: Option<&SortExec>, ) -> Result<()> { - if !sort_onwards.is_empty() { - *child = remove_corresponding_sort_from_sub_plan(sort_onwards)?; + if let Some(coalesce_onwards) = coalesce_onwards { + *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?; } Ok(()) } -/// Converts an [ExecutionPlan] trait object to a [SortExec] when possible. -fn convert_to_sort_exec(sort_any: &Arc) -> Result<&SortExec> { - sort_any.as_any().downcast_ref::().ok_or_else(|| { - DataFusionError::Plan("Given ExecutionPlan is not a SortExec".to_string()) - }) +/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`. +fn change_corresponding_coalesce_in_sub_plan( + coalesce_onwards: &mut ExecTree, + sort_exec: Option<&SortExec>, +) -> Result> { + Ok( + if coalesce_onwards + .plan + .as_any() + .is::() + { + // We can safely use the 0th index since we have a `CoalescePartitionsExec`. + let coalesce_input = coalesce_onwards.plan.children()[0].clone(); + if let Some(sort_exec) = sort_exec { + let sort_expr = sort_exec.expr(); + if !ordering_satisfy( + coalesce_input.output_ordering(), + Some(sort_expr), + || coalesce_input.equivalence_properties(), + ) { + return add_sort_above_child(&coalesce_input, sort_expr.to_vec()); + } + } + coalesce_input + } else { + let plan = coalesce_onwards.plan.clone(); + let mut children = plan.children(); + for item in &mut coalesce_onwards.children { + children[item.idx] = + change_corresponding_coalesce_in_sub_plan(item, sort_exec)?; + } + plan.with_new_children(children)? + }, + ) +} + +/// Updates child to remove the unnecessary sorting below it. +fn update_child_to_remove_unnecessary_sort( + child: &mut Arc, + sort_onwards: &mut Option, + parent: &Arc, +) -> Result<()> { + if let Some(sort_onwards) = sort_onwards { + let requires_single_partition = matches!( + parent.required_input_distribution()[sort_onwards.idx], + Distribution::SinglePartition + ); + *child = remove_corresponding_sort_from_sub_plan( + sort_onwards, + requires_single_partition, + )?; + } + *sort_onwards = None; + Ok(()) } /// Removes the sort from the plan in `sort_onwards`. fn remove_corresponding_sort_from_sub_plan( - sort_onwards: &mut Vec<(usize, Arc)>, + sort_onwards: &mut ExecTree, + requires_single_partition: bool, +) -> Result> { + // A `SortExec` is always at the bottom of the tree. + if is_sort(&sort_onwards.plan) { + Ok(sort_onwards.plan.children()[0].clone()) + } else { + let plan = &sort_onwards.plan; + let mut children = plan.children(); + for item in &mut sort_onwards.children { + let requires_single_partition = matches!( + plan.required_input_distribution()[item.idx], + Distribution::SinglePartition + ); + children[item.idx] = + remove_corresponding_sort_from_sub_plan(item, requires_single_partition)?; + } + if is_sort_preserving_merge(plan) { + let child = &children[0]; + if requires_single_partition + && child.output_partitioning().partition_count() > 1 + { + Ok(Arc::new(CoalescePartitionsExec::new(child.clone()))) + } else { + Ok(child.clone()) + } + } else { + plan.clone().with_new_children(children) + } + } +} + +/// Updates child to modify the unnecessarily fine sorting below it. 
+fn update_child_to_change_finer_sort( + child: &mut Arc, + sort_onwards: &mut Option, + n_sort_expr: usize, +) -> Result<()> { + if let Some(sort_onwards) = sort_onwards { + *child = change_finer_sort_in_sub_plan(sort_onwards, n_sort_expr)?; + } + Ok(()) +} + +/// Change the unnecessarily fine sort in `sort_onwards`. +fn change_finer_sort_in_sub_plan( + sort_onwards: &mut ExecTree, + n_sort_expr: usize, ) -> Result> { - let (_, sort_any) = sort_onwards[0].clone(); - let sort_exec = convert_to_sort_exec(&sort_any)?; - let mut prev_layer = sort_exec.input().clone(); - // In the loop below, se start from 1 as the first one is a SortExec - // and we are removing it from the plan. - for (child_idx, layer) in sort_onwards.iter().skip(1) { - let mut children = layer.children(); - children[*child_idx] = prev_layer; - prev_layer = layer.clone().with_new_children(children)?; - } - // We have removed the sort, hence empty the sort_onwards: - sort_onwards.clear(); - Ok(prev_layer) + let plan = &sort_onwards.plan; + // A `SortExec` is always at the bottom of the tree. + if is_sort(plan) { + let prev_layer = plan.children()[0].clone(); + let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec(); + let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?; + *sort_onwards = ExecTree { + idx: sort_onwards.idx, + children: vec![], + plan: updated_plan.clone(), + }; + Ok(updated_plan) + } else { + let mut children = plan.children(); + for item in &mut sort_onwards.children { + children[item.idx] = change_finer_sort_in_sub_plan(item, n_sort_expr)?; + } + if is_sort_preserving_merge(plan) { + let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec(); + let updated_plan = Arc::new(SortPreservingMergeExec::new( + new_sort_expr, + children[0].clone(), + )) as Arc; + sort_onwards.plan = updated_plan.clone(); + Ok(updated_plan) + } else { + plan.clone().with_new_children(children) + } + } +} + +/// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible. 
+fn get_sort_exprs(sort_any: &Arc) -> Result<&[PhysicalSortExpr]> { + if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { + Ok(sort_exec.expr()) + } else if let Some(sort_preserving_merge_exec) = + sort_any.as_any().downcast_ref::() + { + Ok(sort_preserving_merge_exec.expr()) + } else { + Err(DataFusionError::Plan( + "Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec" + .to_string(), + )) + } } #[derive(Debug)] @@ -498,13 +916,17 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_plan::displayable; + use crate::physical_optimizer::dist_enforcement::EnforceDistribution; + use crate::physical_plan::aggregates::PhysicalGroupBy; + use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; + use crate::physical_plan::{displayable, Partitioning}; use crate::prelude::SessionContext; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -522,6 +944,13 @@ mod tests { Ok(schema) } + // Util function to get string representation of a physical plan + fn get_plan_string(plan: &Arc) -> Vec { + let formatted = displayable(plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + actual.iter().map(|elem| elem.to_string()).collect() + } + #[tokio::test] async fn test_is_column_aligned_nullable() -> Result<()> { let schema = create_test_schema()?; @@ -633,11 +1062,8 @@ mod tests { // Run the actual optimizer let optimized_physical_plan = EnforceSorting::new().optimize(physical_plan, state.config_options())?; - - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); + // Get string representation of the plan + let actual = get_plan_string(&optimized_physical_plan); assert_eq!( expected_optimized_lines, actual, "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" @@ -775,6 +1201,133 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_remove_unnecessary_sort2() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort2 = sort_exec(sort_exprs.clone(), spm); + let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort3 = sort_exec(sort_exprs, spm2); + let physical_plan = repartition_exec(repartition_exec(sort3)); + + let expected_input = vec![ + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + + let expected_optimized = vec![ + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_remove_unnecessary_sort3() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let repartition_exec = repartition_exec(spm); + let sort2 = sort_exec(sort_exprs.clone(), repartition_exec); + let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); + + let physical_plan = aggregate_exec(spm2); + + // When removing a `SortPreservingMergeExec`, make sure that partitioning + // requirements are not violated. In some cases, we may need to replace + // it with a `CoalescePartitionsExec` instead of directly removing it. + let expected_input = vec![ + "AggregateExec: mode=Final, gby=[], aggr=[]", + " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + + let expected_optimized = vec![ + "AggregateExec: mode=Final, gby=[], aggr=[]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_do_not_remove_sort_with_limit() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(sort_exprs.clone(), source1); + let limit = local_limit_exec(sort); + let limit = global_limit_exec(limit); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); + + let union = union_exec(vec![source2, limit]); + let repartition = repartition_exec(union); + let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition); + + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " GlobalLimitExec: skip=0, fetch=100", + " LocalLimitExec: fetch=100", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + + // We should keep the bottom `SortExec`. 
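+        // (A limit sits between that sort and the merge above; the new `is_limit` check
+        // breaks the sort linkage there, since removing the sort would change which rows
+        // the limit keeps.)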
+ let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " GlobalLimitExec: skip=0, fetch=100", + " LocalLimitExec: fetch=100", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + #[tokio::test] async fn test_change_wrong_sorting() -> Result<()> { let schema = create_test_schema()?; @@ -897,6 +1450,332 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_different_sorted3() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort1 = sort_exec(sort_exprs1, source1.clone()); + let sort_exprs2 = vec![sort_expr("nullable_col", &schema)]; + let sort2 = sort_exec(sort_exprs2, source1); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone()); + + let union = union_exec(vec![sort1, source2, sort2]); + let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, union); + + // First input to the union is not Sorted (SortExec is finer than required ordering by the SortPreservingMergeExec above). + // Second input to the union is already Sorted (matches with the required ordering by the SortPreservingMergeExec above). + // Third input to the union is not Sorted (SortExec is matches required ordering by the SortPreservingMergeExec above). 
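+        // (In other words: the first sort is finer than the merge requires and can be
+        // relaxed, while the third sort already matches the required ordering and is kept.)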
+ let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should adjust sorting in the first input of the union such that it is not unnecessarily fine + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted4() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![sort_expr("nullable_col", &schema)]; + let sort1 = sort_exec(sort_exprs2.clone(), source1.clone()); + let sort2 = sort_exec(sort_exprs2.clone(), source1); + + let source2 = parquet_exec_sorted(&schema, sort_exprs2); + + let union = union_exec(vec![sort1, source2, sort2]); + let physical_plan = sort_preserving_merge_exec(sort_exprs1, union); + + // Ordering requirement of the `SortPreservingMergeExec` is not met. + // Should modify the plan to ensure that all three inputs to the + // `UnionExec` satisfy the ordering, OR add a single sort after + // the `UnionExec` (both of which are equally good for this example). 
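+        // (The optimizer currently takes the second alternative: the per-input sorts are
+        // dropped and a single two-column sort is placed above the union, as shown in
+        // `expected_optimized` below.)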
+ let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted5() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![ + sort_expr("nullable_col", &schema), + sort_expr_options( + "non_nullable_col", + &schema, + SortOptions { + descending: true, + nulls_first: false, + }, + ), + ]; + let sort_exprs3 = vec![sort_expr("nullable_col", &schema)]; + let sort1 = sort_exec(sort_exprs1, source1.clone()); + let sort2 = sort_exec(sort_exprs2, source1); + + let union = union_exec(vec![sort1, sort2]); + let physical_plan = sort_preserving_merge_exec(sort_exprs3, union); + + // The `UnionExec` doesn't preserve any of the inputs ordering in the + // example below. However, we should be able to change the unnecessarily + // fine `SortExec`s below with required `SortExec`s that are absolutely necessary. 
+ let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted6() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![sort_expr("nullable_col", &schema)]; + let sort1 = sort_exec(sort_exprs1, source1.clone()); + let sort_exprs2 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let repartition = repartition_exec(source1); + let spm = sort_preserving_merge_exec(sort_exprs2, repartition); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone()); + + let union = union_exec(vec![sort1, source2, spm]); + let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, union); + + // The plan is not valid as it is -- the input ordering requirement + // of the `SortPreservingMergeExec` under the third child of the + // `UnionExec` is not met. We should add a `SortExec` below it. + // At the same time, this ordering requirement is unnecessarily fine. + // The final plan should be valid AND the ordering of the third child + // shouldn't be finer than necessary. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // Should adjust the requirement in the third input of the union so + // that it is not unnecessarily fine. 
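+        // (Concretely: the inner `SortPreservingMergeExec`'s two-column ordering is relaxed
+        // to the single `nullable_col` ordering needed above it, and a matching one-column
+        // `SortExec` is inserted under it, as `expected_optimized` shows.)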
+ let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_window_multi_path_sort() -> Result<()> { + let schema = create_test_schema()?; + + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![sort_expr("nullable_col", &schema)]; + // reverse sorting of sort_exprs2 + let sort_exprs3 = vec![sort_expr_options( + "nullable_col", + &schema, + SortOptions { + descending: true, + nulls_first: false, + }, + )]; + let source1 = parquet_exec_sorted(&schema, sort_exprs1); + let source2 = parquet_exec_sorted(&schema, sort_exprs2); + let sort1 = sort_exec(sort_exprs3.clone(), source1); + let sort2 = sort_exec(sort_exprs3.clone(), source2); + + let union = union_exec(vec![sort1, sort2]); + let physical_plan = window_exec("nullable_col", sort_exprs3, union); + + // The `WindowAggExec` gets its sorting from multiple children jointly. + // During the removal of `SortExec`s, it should be able to remove the + // corresponding SortExecs together. Also, the inputs of these `SortExec`s + // are not necessarily the same to be able to remove them. 
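+        // (Both requested sorts are descending while both inputs are already ascending on
+        // `nullable_col`; because the window expression is reversible, the rule removes both
+        // sorts and reverses the window frame instead, as `expected_optimized` shows.)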
+ let expected_input = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " UnionExec", + " SortExec: [nullable_col@0 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_multilayer_coalesce_partitions() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let repartition = repartition_exec(source1); + let coalesce = Arc::new(CoalescePartitionsExec::new(repartition)) as _; + // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before + let filter = filter_exec( + Arc::new(NotExpr::new( + col("non_nullable_col", schema.as_ref()).unwrap(), + )), + coalesce, + ); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let physical_plan = sort_exec(sort_exprs, filter); + + // CoalescePartitionsExec and SortExec are not directly consecutive. In this case + // we should be able to parallelize Sorting also (given that executors in between don't require) + // single partition. + let expected_input = vec![ + "SortExec: [nullable_col@0 ASC]", + " FilterExec: NOT non_nullable_col@1", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " FilterExec: NOT non_nullable_col@1", + " SortExec: [nullable_col@0 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + // With new change in SortEnforcement EnforceSorting->EnforceDistribution->EnforceSorting + // should produce same result with EnforceDistribution+EnforceSorting + // This enables us to use EnforceSorting possibly before EnforceDistribution + // Given that it will be called at least once after last EnforceDistribution. The reason is that + // EnforceDistribution may invalidate ordering invariant. 
+ async fn test_commutativity() -> Result<()> { + let schema = create_test_schema()?; + + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + let memory_exec = memory_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let window = window_exec("nullable_col", sort_exprs.clone(), memory_exec); + let repartition = repartition_exec(window); + + let orig_plan = Arc::new(SortExec::new_with_partitioning( + sort_exprs, + repartition, + false, + None, + )) as Arc; + + let mut plan = orig_plan.clone(); + let rules = vec![ + Arc::new(EnforceDistribution::new()) as Arc, + Arc::new(EnforceSorting::new()) as Arc, + ]; + for rule in rules { + plan = rule.optimize(plan, state.config_options())?; + } + let first_plan = plan.clone(); + + let mut plan = orig_plan.clone(); + let rules = vec![ + Arc::new(EnforceSorting::new()) as Arc, + Arc::new(EnforceDistribution::new()) as Arc, + Arc::new(EnforceSorting::new()) as Arc, + ]; + for rule in rules { + plan = rule.optimize(plan, state.config_options())?; + } + let second_plan = plan.clone(); + + assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan)); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) @@ -1016,4 +1895,32 @@ mod tests { fn union_exec(input: Vec>) -> Arc { Arc::new(UnionExec::new(input)) } + + fn local_limit_exec(input: Arc) -> Arc { + Arc::new(LocalLimitExec::new(input, 100)) + } + + fn global_limit_exec(input: Arc) -> Arc { + Arc::new(GlobalLimitExec::new(input, 0, Some(100))) + } + + fn repartition_exec(input: Arc) -> Arc { + Arc::new( + RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(), + ) + } + + fn aggregate_exec(input: Arc) -> Arc { + let schema = input.schema(); + Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![], + input, + schema, + ) + .unwrap(), + ) + } } diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 1404dfa20c30c..8689b016b01c1 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -118,7 +118,12 @@ impl QueryCase { if error.is_some() { let plan_error = plan.unwrap_err(); let initial = error.unwrap().to_string(); - assert!(plan_error.to_string().contains(initial.as_str())); + assert!( + plan_error.to_string().contains(initial.as_str()), + "plan_error: {:?} doesn't contain message: {:?}", + plan_error, + initial.as_str() + ); } else { assert!(plan.is_ok()) } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 01bd94e8e4bbe..5fc877a2c8e65 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -86,11 +86,6 @@ async fn explain_analyze_baseline_metrics() { "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); - assert_metrics!( - &formatted, - "CoalescePartitionsExec", - "metrics=[output_rows=5, elapsed_compute=" - ); assert_metrics!( &formatted, "UnionExec", diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index e0bd1a523c4ad..37b662a284c9b 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1980,8 +1980,8 @@ async fn left_semi_join() -> Result<()> { let physical_plan = 
dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", @@ -1997,8 +1997,8 @@ async fn left_semi_join() -> Result<()> { ] } else { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", @@ -2062,8 +2062,8 @@ async fn left_semi_join() -> Result<()> { let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", @@ -2078,8 +2078,8 @@ async fn left_semi_join() -> Result<()> { ] } else { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", " CoalesceBatchesExec: target_batch_size=4096", @@ -2259,8 +2259,8 @@ async fn right_semi_join() -> Result<()> { let dataframe = ctx.sql(sql).await.expect(&msg); let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { - vec![ "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }", @@ -2275,8 +2275,8 @@ async fn right_semi_join() -> Result<()> { ] } else { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", " CoalesceBatchesExec: target_batch_size=4096", @@ -2307,8 +2307,8 @@ async fn right_semi_join() -> Result<()> { let dataframe = ctx.sql(sql).await.expect(&msg); let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { - vec![ "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + 
vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }", @@ -2323,8 +2323,8 @@ async fn right_semi_join() -> Result<()> { ] } else { vec![ - "SortExec: [t1_id@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", + " SortExec: [t1_id@0 ASC NULLS LAST]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", " CoalesceBatchesExec: target_batch_size=4096", diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index da682abb85099..21a6062b8cd7a 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1747,8 +1747,8 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()> // 3 SortExec are added let expected = { vec![ - "SortExec: [c2@0 ASC NULLS LAST]", - " CoalescePartitionsExec", + "SortPreservingMergeExec: [c2@0 ASC NULLS LAST]", + " SortExec: [c2@0 ASC NULLS LAST]", " ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]", " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", @@ -2298,15 +2298,14 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> { " AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]", " CoalescePartitionsExec", " AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CoalescePartitionsExec", - " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8", - " AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]", - " CoalesceBatchesExec: target_batch_size=4096", - " FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=8", + " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8", + " 
AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]", + " CoalesceBatchesExec: target_batch_size=4096", + " FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ] }; @@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re Ok(()) } +#[tokio::test] +async fn test_window_agg_global_sort() -> Result<()> { + let config = SessionConfig::new() + .with_repartition_windows(true) + .with_target_partitions(2) + .with_repartition_sorts(true); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + // Only 1 SortExec was added + let expected = { + vec![ + "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]", + " ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]", + " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", + " SortExec: [c1@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> { + let config = SessionConfig::new() + .with_repartition_windows(true) + .with_target_partitions(2) + .with_repartition_sorts(false); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + // Only 1 SortExec was added + let expected = { + vec![ + "SortExec: [c1@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]", + " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", + " SortExec: [c1@0 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", + 
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> { + let config = SessionConfig::new() + .with_repartition_windows(true) + .with_target_partitions(2) + .with_repartition_sorts(true); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT c1, \ + SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \ + SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \ + FROM aggregate_test_100 ORDER BY c1 ASC"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + // Only 1 SortExec was added + let expected = { + vec![ + "SortExec: [c1@0 ASC NULLS LAST]", + " ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]", + " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortPreservingMergeExec: [c9@1 ASC NULLS LAST]", + " SortExec: [c9@1 ASC NULLS LAST]", + " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]", + " SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_with_global_limit() -> Result<()> { + let config = SessionConfig::new() + .with_repartition_windows(false) + .with_target_partitions(1); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 1)"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + // Only 1 SortExec was added + let expected = { + 
vec![ + "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]", + " AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]", + " AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]", + " GlobalLimitExec: skip=0, fetch=1", + " SortExec: [c13@0 ASC NULLS LAST]", + " ProjectionExec: expr=[c13@0 as c13]", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------------+", + "| array_agg1 |", + "+----------------------------------+", + "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |", + "+----------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + Ok(()) +} + #[tokio::test] async fn test_window_agg_low_cardinality() -> Result<()> { let config = SessionConfig::new().with_target_partitions(32); diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 4b1c0bc0e2092..75eca2ddff46d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -134,6 +134,7 @@ datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 datafusion.optimizer.repartition_file_scans false datafusion.optimizer.repartition_joins true +datafusion.optimizer.repartition_sorts true datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules true datafusion.optimizer.top_down_join_key_reordering true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index bd5b69467ebd5..ddfbc4e6e56f3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,39 +35,40 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | default | description | -| --------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. 
| -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | false | If the file has a header | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | -| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | -| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level" | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level" | -| datafusion.optimizer.repartition_file_scans | false | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format in which case multiple row groups from the same file may be read concurrently. If false then each row group is read serially, though different files may be read in parallel. | -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level" | -| datafusion.optimizer.skip_failed_rules | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. 
HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, sql parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, sql parser will normalize ident(convert ident to lowercase when not quoted) | +| key | default | description | +| --------------------------------------------------------- | ---------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | false | If the file has a header | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system | +| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour | +| datafusion.execution.parquet.enable_page_index | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. 
| +| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | +| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level" | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | +| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level" | +| datafusion.optimizer.repartition_file_scans | false | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format in which case multiple row groups from the same file may be read concurrently. If false then each row group is read serially, though different files may be read in parallel. 
| +| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level" | +| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally With this flag is enabled, plans in the form below "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", would turn into the plan below which performs better in multithreaded environments "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", | +| datafusion.optimizer.skip_failed_rules | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | +| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | +| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, sql parser will parse float as decimal type | +| datafusion.sql_parser.enable_ident_normalization | true | When set to true, sql parser will normalize ident(convert ident to lowercase when not quoted) |
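
For reference, here is a minimal usage sketch (not part of the diff above) showing how the new `repartition_sorts` setting can be toggled on a `SessionConfig`. It assumes the `datafusion` and `tokio` crates; `register_csv` with a placeholder path stands in for the `register_aggregate_csv` test helper used in the diff, and the configuration and query mirror `test_window_agg_global_sort` above.

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable per-partition sorting followed by a merge (the default behavior).
    let config = SessionConfig::new()
        .with_target_partitions(2)
        .with_repartition_sorts(true);
    let ctx = SessionContext::with_config(config);

    // Placeholder path; point this at a local copy of the aggregate_test_100 data.
    ctx.register_csv(
        "aggregate_test_100",
        "/path/to/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    let dataframe = ctx
        .sql("SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC")
        .await?;
    let physical_plan = dataframe.create_physical_plan().await?;

    // With repartition_sorts enabled, the plan should start with a
    // SortPreservingMergeExec over per-partition SortExecs instead of a global
    // SortExec over a CoalescePartitionsExec.
    println!("{}", displayable(physical_plan.as_ref()).indent());
    Ok(())
}

Running the same program with `.with_repartition_sorts(false)` should instead produce the `SortExec` over `CoalescePartitionsExec` shape shown in `test_window_agg_global_sort_parallelize_sort_disabled`.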