From 6b4b0b7e1da3bdc78d43305dceb2dc419c241f66 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 13 Sep 2023 16:18:15 +0300 Subject: [PATCH 01/13] Extend capabilities of enforcedist --- .../enforce_distribution.rs | 447 +++++++++++++++--- .../src/physical_plan/coalesce_partitions.rs | 4 + datafusion/core/src/physical_plan/memory.rs | 9 +- .../core/src/physical_plan/streaming.rs | 8 +- .../sqllogictest/test_files/groupby.slt | 12 +- 5 files changed, 394 insertions(+), 86 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index a0a9bc32e975e..5935d791ccb40 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -21,6 +21,7 @@ //! according to the configuration), this rule increases partition counts in //! the physical plan. +use arrow_schema::SchemaRef; use std::fmt; use std::fmt::Formatter; use std::sync::Arc; @@ -38,26 +39,27 @@ use crate::physical_plan::joins::{ }; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortOptions; +use crate::physical_plan::sorts::sort::{SortExec, SortOptions}; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec}; use crate::physical_plan::windows::WindowAggExec; -use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; +use crate::physical_plan::{DisplayAs, DisplayFormatType, Partitioning}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::equivalence::EquivalenceProperties; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::{ - map_columns_before_projection, ordering_satisfy_requirement_concrete, + map_columns_before_projection, ordering_satisfy, + ordering_satisfy_requirement_concrete, }; use datafusion_physical_expr::{ - expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, PhysicalExpr, - PhysicalSortRequirement, + expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, + LexOrderingReq, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; -use datafusion_common::internal_err; +use datafusion_common::{internal_err, Statistics}; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -208,15 +210,14 @@ impl PhysicalOptimizerRule for EnforceDistribution { })? }; + let adjusted = require_top_ordering(adjusted)?; let distribution_context = DistributionContext::new(adjusted); // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context.transform_up(&|distribution_context| { ensure_distribution(distribution_context, config) })?; - - // If output ordering is not necessary, removes it - update_plan_to_remove_unnecessary_final_order(distribution_context) + remove_top_ordering_req(distribution_context.plan) } fn name(&self) -> &str { @@ -993,12 +994,6 @@ fn add_roundrobin_on_top( RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(n_target))? .with_preserve_order(should_preserve_ordering), ) as Arc; - if let Some(exec_tree) = dist_onward { - return internal_err!( - "ExecTree should have been empty, but got:{:?}", - exec_tree - ); - } // update distribution onward with new operator update_distribution_onward(new_plan.clone(), dist_onward, input_idx); @@ -1134,21 +1129,64 @@ fn remove_unnecessary_repartition( } = distribution_context; // Remove any redundant RoundRobin at the start: - if let Some(repartition) = plan.as_any().downcast_ref::() { - if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() { - // Repartition is useless: - if *n_out <= repartition.input().output_partitioning().partition_count() { - let mut new_distribution_onwards = - vec![None; repartition.input().children().len()]; - if let Some(exec_tree) = &distribution_onwards[0] { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - plan = repartition.input().clone(); - distribution_onwards = new_distribution_onwards; + while let Some(repartition) = plan.as_any().downcast_ref::() { + plan = repartition.input().clone(); + let mut new_distribution_onwards = vec![None; plan.children().len()]; + if let Some(exec_tree) = &distribution_onwards[0] { + for child in &exec_tree.children { + new_distribution_onwards[child.idx] = Some(child.clone()); + } + } + distribution_onwards = new_distribution_onwards; + } + + while let Some(coalesce_partition) = + plan.as_any().downcast_ref::() + { + plan = coalesce_partition.input().clone(); + let mut new_distribution_onwards = vec![None; plan.children().len()]; + if let Some(exec_tree) = &distribution_onwards[0] { + for child in &exec_tree.children { + new_distribution_onwards[child.idx] = Some(child.clone()); + } + } + distribution_onwards = new_distribution_onwards; + } + + while let Some(spm) = plan.as_any().downcast_ref::() { + let sort_expr = spm.expr().to_vec(); + let fetch = spm.fetch(); + plan = spm.input().clone(); + let mut new_distribution_onwards = vec![None; plan.children().len()]; + if let Some(exec_tree) = &distribution_onwards[0] { + for child in &exec_tree.children { + new_distribution_onwards[child.idx] = Some(child.clone()); } } + distribution_onwards = new_distribution_onwards; + // If the ordering requirement is already satisfied, do not add a sort. + if !ordering_satisfy( + plan.output_ordering(), + Some(&sort_expr), + || plan.equivalence_properties(), + || plan.ordering_equivalence_properties(), + ) { + let new_sort = SortExec::new(sort_expr, plan.clone()).with_fetch(fetch); + if distribution_onwards.is_empty() { + let mut distribution_onward = None; + update_distribution_onward(plan.clone(), &mut distribution_onward, 0); + distribution_onwards = vec![distribution_onward]; + } else { + update_distribution_onward(plan.clone(), &mut distribution_onwards[0], 0); + } + plan = Arc::new(if plan.output_partitioning().partition_count() > 1 { + new_sort.with_preserve_partitioning(true) + } else { + new_sort + }) as _; + // update_distribution_onward(plan.clone(), &mut distribution_onwards[0], 0); + } + // add_sort_above(&mut plan, sort_expr, fetch)?; } // Create a plan with the updated children: @@ -1225,13 +1263,15 @@ fn replace_order_preserving_variants_helper( .as_any() .downcast_ref::() { - return Ok(Arc::new(CoalescePartitionsExec::new(spm.input().clone()))); + return Ok(Arc::new(CoalescePartitionsExec::new( + updated_children[0].clone(), + ))); } if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::() { if repartition.preserve_order() { return Ok(Arc::new( RepartitionExec::try_new( - repartition.input().clone(), + updated_children[0].clone(), repartition.partitioning().clone(), )? .with_preserve_order(false), @@ -1271,13 +1311,11 @@ fn ensure_distribution( repartition_beneficial_stat = stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); } - // Remove unnecessary repartition from the physical plan if any let DistributionContext { plan, mut distribution_onwards, } = remove_unnecessary_repartition(dist_context)?; - let n_children = plan.children().len(); // This loop iterates over all the children to: // - Increase parallelism for every child if it is beneficial. @@ -1680,6 +1718,179 @@ impl TreeNode for PlanWithKeyRequirements { } } +fn remove_top_ordering_req( + plan: Arc, +) -> Result> { + let new_children = plan + .children() + .into_iter() + .map(|child| remove_top_ordering_req(child)) + .collect::>>()?; + if plan.as_any().is::() { + Ok(new_children[0].clone()) + } else { + plan.with_new_children(new_children) + } +} + +fn require_top_ordering(plan: Arc) -> Result> { + let (new_plan, is_changed) = require_top_ordering_helper(plan)?; + if is_changed { + Ok(new_plan) + } else { + Ok(Arc::new(SortRequiringExec::new( + new_plan, + None, + Distribution::UnspecifiedDistribution, + )) as _) + } +} +/// Helper function that adds an ancillary `SortRequiringExec` to the given plan. +fn require_top_ordering_helper( + plan: Arc, +) -> Result<(Arc, bool)> { + let mut children = plan.children(); + // Global ordering defines desired ordering in the final result. + if children.len() != 1 { + // Ok(Arc::new(SortRequiringExec::new(plan.clone(), None, Distribution::UnspecifiedDistribution)) as _) + Ok((plan, false)) + } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let req_ordering = sort_exec.output_ordering().unwrap_or(&[]); + let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + Ok(( + Arc::new(SortRequiringExec::new( + plan.clone(), + Some(reqs), + Distribution::SinglePartition, + )) as _, + true, + )) + } else if let Some(spm) = plan.as_any().downcast_ref::() { + let req_ordering = spm.expr(); + let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + Ok(( + Arc::new(SortRequiringExec::new( + plan.clone(), + Some(reqs), + Distribution::SinglePartition, + )) as _, + true, + )) + } else if plan.maintains_input_order()[0] + && plan.required_input_ordering()[0].is_none() + { + // Keep searching for a `SortExec` as long as ordering is maintained, + // and on-the-way operators do not themselves require an ordering. + // When an operator requires an ordering, any `SortExec` below can not + // be responsible for (i.e. the originator of) the global ordering. + let (new_child, is_changed) = + require_top_ordering_helper(children.swap_remove(0))?; + Ok((plan.with_new_children(vec![new_child])?, is_changed)) + } else { + // Stop searching, there is no global ordering desired for the query. + // Ok(Arc::new(SortRequiringExec::new(plan.clone(), None, Distribution::UnspecifiedDistribution)) as _) + Ok((plan, false)) + } +} + +/// Models operators like BoundedWindowExec that require an input +/// ordering but is easy to construct +#[derive(Debug)] +struct SortRequiringExec { + input: Arc, + requirements: Option, + dist_requirement: Distribution, +} + +impl SortRequiringExec { + fn new( + input: Arc, + requirements: Option, + dist_requirement: Distribution, + ) -> Self { + Self { + input, + requirements, + dist_requirement, + } + } +} + +impl DisplayAs for SortRequiringExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + if let Some(requirements) = &self.requirements { + let expr: Vec = requirements.iter().map(|e| e.to_string()).collect(); + write!(f, "SortRequiringExec: [{}]", expr.join(",")) + } else { + write!(f, "SortRequiringExec: [None]") + } + } +} + +impl ExecutionPlan for SortRequiringExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_distribution(&self) -> Vec { + vec![self.dist_requirement.clone()] + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + // model that it requires the output ordering of its input + fn required_input_ordering(&self) -> Vec>> { + vec![self.requirements.clone()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + let child = children.pop().unwrap(); + Ok(Arc::new(Self::new( + child, + self.requirements.clone(), + self.dist_requirement.clone(), + ))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } +} + #[cfg(test)] mod tests { use std::ops::Deref; @@ -1702,7 +1913,9 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; - use crate::physical_optimizer::test_utils::repartition_exec; + use crate::physical_optimizer::test_utils::{ + coalesce_partitions_exec, repartition_exec, + }; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; use arrow::compute::SortOptions; @@ -1713,7 +1926,7 @@ mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{ expressions, expressions::binary, expressions::lit, expressions::Column, - PhysicalExpr, PhysicalSortExpr, + LexOrdering, PhysicalExpr, PhysicalSortExpr, }; /// Models operators like BoundedWindowExec that require an input @@ -1721,11 +1934,13 @@ mod tests { #[derive(Debug)] struct SortRequiredExec { input: Arc, + expr: LexOrdering, } impl SortRequiredExec { fn new(input: Arc) -> Self { - Self { input } + let expr = input.output_ordering().unwrap_or(&[]).to_vec(); + Self { input, expr } } } @@ -1735,7 +1950,8 @@ mod tests { _t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - write!(f, "SortRequiredExec") + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + write!(f, "SortRequiredExec: [{}]", expr.join(",")) } } @@ -2982,38 +3198,38 @@ mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs _ => vec![ top_join_plan.as_str(), + "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), - "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -3086,38 +3302,38 @@ mod tests { JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), - "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs and 4 SortExecs _ => vec![ top_join_plan.as_str(), + "SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", "SortExec: expr=[b1@6 ASC]", - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), - "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -3300,6 +3516,12 @@ mod tests { "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; assert_optimized!(expected, exec, true); + let expected = &[ + "SortExec: expr=[a@0 ASC]", + "CoalescePartitionsExec", + "CoalesceBatchesExec: target_batch_size=4096", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; assert_optimized!(expected, exec, false); Ok(()) } @@ -3434,7 +3656,7 @@ mod tests { sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), false))); let expected = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "FilterExec: c@2 = 0", // We can use repartition here, ordering requirement by SortRequiredExec // is still satisfied. @@ -3540,6 +3762,12 @@ mod tests { ]; assert_optimized!(expected, plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, plan, false); Ok(()) } @@ -3564,6 +3792,14 @@ mod tests { ]; assert_optimized!(expected, plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, plan, false); Ok(()) } @@ -3582,7 +3818,7 @@ mod tests { // during repartitioning ordering is preserved let expected = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "FilterExec: c@2 = 0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", @@ -3618,7 +3854,7 @@ mod tests { let expected = &[ "UnionExec", // union input 1: no repartitioning - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", // union input 2: should repartition "FilterExec: c@2 = 0", @@ -3686,16 +3922,13 @@ mod tests { ("c".to_string(), "c".to_string()), ]; // sorted input - let plan = sort_preserving_merge_exec( - sort_key.clone(), - projection_exec_with_alias( - parquet_exec_multiple_sorted(vec![sort_key]), - alias, - ), - ); + let plan = sort_required_exec(projection_exec_with_alias( + parquet_exec_multiple_sorted(vec![sort_key]), + alias, + )); let expected = &[ - "SortPreservingMergeExec: [c@2 ASC]", + "SortRequiredExec: [c@2 ASC]", // Since this projection is trivial, increasing parallelism is not beneficial "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", @@ -4185,11 +4418,11 @@ mod tests { // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism let expected_parquet = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", ]; let expected_csv = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false", ]; @@ -4347,6 +4580,14 @@ mod tests { ]; assert_optimized!(expected, physical_plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, physical_plan, false); Ok(()) @@ -4376,6 +4617,15 @@ mod tests { ]; assert_optimized!(expected, physical_plan.clone(), true); + + let expected = &[ + "SortExec: expr=[a@0 ASC]", + "CoalescePartitionsExec", + "SortExec: expr=[a@0 ASC]", + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, physical_plan, false); Ok(()) @@ -4402,4 +4652,51 @@ mod tests { Ok(()) } + + #[test] + fn optimize_away_unnecessary_repartition() -> Result<()> { + let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec())); + let expected = &[ + "CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + plans_matches_expected!(expected, physical_plan.clone()); + + let expected = + &["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]"]; + + assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!(expected, physical_plan, false); + + Ok(()) + } + + #[test] + fn optimize_away_unnecessary_repartition2() -> Result<()> { + let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec( + filter_exec(repartition_exec(parquet_exec())), + ))); + let expected = &[ + "FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CoalescePartitionsExec", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + plans_matches_expected!(expected, physical_plan.clone()); + + let expected = &[ + "FilterExec: c@2 = 0", + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!(expected, physical_plan, false); + + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 78cb7b201f263..fb94be24a6355 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -106,6 +106,10 @@ impl ExecutionPlan for CoalescePartitionsExec { self.input.equivalence_properties() } + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index 877410c97ca54..65aa5a9dcd2f5 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -126,9 +126,14 @@ impl ExecutionPlan for MemoryExec { fn with_new_children( self: Arc, - _: Vec>, + children: Vec>, ) -> Result> { - internal_err!("Children cannot be replaced in {self:?}") + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index 6c33f88a3991e..399707228c5c0 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -163,9 +163,13 @@ impl ExecutionPlan for StreamingTableExec { fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> Result> { - internal_err!("Children cannot be replaced in {self:?}") + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 0c53a58ccdb1b..2d91cce232df5 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -1962,7 +1962,7 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ------SortExec: expr=[col0@0 ASC NULLS LAST] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered +------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[FIRST_VALUE(r.col1)], ordering_mode=PartiallyOrdered --------------SortExec: expr=[col0@3 ASC NULLS LAST] ----------------CoalesceBatchesExec: target_batch_size=8192 ------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] @@ -2127,9 +2127,8 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE( ----TableScan: annotated_data_infinite2 projection=[a, b, c] physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c] ---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered -----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true query III SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c @@ -2154,9 +2153,8 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a ----TableScan: annotated_data_infinite2 projection=[a, b, c] physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c] ---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered -----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true query III SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c From 8781bb92bc5504c2dfce51abfca2406971b74d56 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 14 Sep 2023 16:34:02 +0300 Subject: [PATCH 02/13] Simplifications --- .../enforce_distribution.rs | 125 +++++------------- 1 file changed, 32 insertions(+), 93 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 5935d791ccb40..4029af90579a1 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -28,9 +28,12 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::datasource::physical_plan::{CsvExec, ParquetExec}; -use crate::error::{DataFusionError, Result}; +use crate::error::Result; use crate::physical_optimizer::enforce_sorting::{unbounded_output, ExecTree}; -use crate::physical_optimizer::utils::{add_sort_above, get_plan_string}; +use crate::physical_optimizer::utils::{ + add_sort_above, get_plan_string, is_coalesce_partitions, is_repartition, + is_sort_preserving_merge, +}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -51,15 +54,14 @@ use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::equivalence::EquivalenceProperties; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::{ - map_columns_before_projection, ordering_satisfy, - ordering_satisfy_requirement_concrete, + map_columns_before_projection, ordering_satisfy_requirement_concrete, }; use datafusion_physical_expr::{ expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, LexOrderingReq, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; -use datafusion_common::{internal_err, Statistics}; +use datafusion_common::Statistics; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -958,6 +960,19 @@ fn update_distribution_onward( } } +fn delete_top_from_distribution_onward( + child_plan: &Arc, + dist_onward: &Option, +) -> Vec> { + let mut new_distribution_onwards = vec![None; child_plan.children().len()]; + if let Some(exec_tree) = &dist_onward { + for child in &exec_tree.children { + new_distribution_onwards[child.idx] = Some(child.clone()); + } + } + new_distribution_onwards +} + /// Adds RoundRobin repartition operator to the plan increase parallelism. /// /// # Arguments @@ -1120,7 +1135,7 @@ fn add_spm_on_top( /// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` -fn remove_unnecessary_repartition( +fn remove_dist_changing_operators( distribution_context: DistributionContext, ) -> Result { let DistributionContext { @@ -1128,65 +1143,16 @@ fn remove_unnecessary_repartition( mut distribution_onwards, } = distribution_context; - // Remove any redundant RoundRobin at the start: - while let Some(repartition) = plan.as_any().downcast_ref::() { - plan = repartition.input().clone(); - let mut new_distribution_onwards = vec![None; plan.children().len()]; - if let Some(exec_tree) = &distribution_onwards[0] { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - distribution_onwards = new_distribution_onwards; - } - - while let Some(coalesce_partition) = - plan.as_any().downcast_ref::() + // Remove any distribution changing operators at the beginning + // They will be re-inserted, according to requirements if absolutely necessary or helpful. + while is_repartition(&plan) + || is_coalesce_partitions(&plan) + || is_sort_preserving_merge(&plan) { - plan = coalesce_partition.input().clone(); - let mut new_distribution_onwards = vec![None; plan.children().len()]; - if let Some(exec_tree) = &distribution_onwards[0] { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - distribution_onwards = new_distribution_onwards; - } - - while let Some(spm) = plan.as_any().downcast_ref::() { - let sort_expr = spm.expr().to_vec(); - let fetch = spm.fetch(); - plan = spm.input().clone(); - let mut new_distribution_onwards = vec![None; plan.children().len()]; - if let Some(exec_tree) = &distribution_onwards[0] { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - distribution_onwards = new_distribution_onwards; - // If the ordering requirement is already satisfied, do not add a sort. - if !ordering_satisfy( - plan.output_ordering(), - Some(&sort_expr), - || plan.equivalence_properties(), - || plan.ordering_equivalence_properties(), - ) { - let new_sort = SortExec::new(sort_expr, plan.clone()).with_fetch(fetch); - if distribution_onwards.is_empty() { - let mut distribution_onward = None; - update_distribution_onward(plan.clone(), &mut distribution_onward, 0); - distribution_onwards = vec![distribution_onward]; - } else { - update_distribution_onward(plan.clone(), &mut distribution_onwards[0], 0); - } - plan = Arc::new(if plan.output_partitioning().partition_count() > 1 { - new_sort.with_preserve_partitioning(true) - } else { - new_sort - }) as _; - // update_distribution_onward(plan.clone(), &mut distribution_onwards[0], 0); - } - // add_sort_above(&mut plan, sort_expr, fetch)?; + // All of above operators have single child. when we remove top operator, we take first child. + plan = plan.children()[0].clone(); + distribution_onwards = + delete_top_from_distribution_onward(&plan, &distribution_onwards[0]); } // Create a plan with the updated children: @@ -1196,29 +1162,6 @@ fn remove_unnecessary_repartition( }) } -/// Changes each child of the `dist_context.plan` such that they no longer -/// use order preserving variants, if no ordering is required at the output -/// of the physical plan (there is no global ordering requirement by the query). -fn update_plan_to_remove_unnecessary_final_order( - dist_context: DistributionContext, -) -> Result> { - let DistributionContext { - plan, - distribution_onwards, - } = dist_context; - let new_children = izip!(plan.children(), distribution_onwards) - .map(|(mut child, mut dist_onward)| { - replace_order_preserving_variants(&mut child, &mut dist_onward)?; - Ok(child) - }) - .collect::>>()?; - if !new_children.is_empty() { - plan.with_new_children(new_children) - } else { - Ok(plan) - } -} - /// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants /// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` fn replace_order_preserving_variants( @@ -1258,11 +1201,7 @@ fn replace_order_preserving_variants_helper( for child in &exec_tree.children { updated_children[child.idx] = replace_order_preserving_variants_helper(child)?; } - if let Some(spm) = exec_tree - .plan - .as_any() - .downcast_ref::() - { + if exec_tree.plan.as_any().is::() { return Ok(Arc::new(CoalescePartitionsExec::new( updated_children[0].clone(), ))); @@ -1315,7 +1254,7 @@ fn ensure_distribution( let DistributionContext { plan, mut distribution_onwards, - } = remove_unnecessary_repartition(dist_context)?; + } = remove_dist_changing_operators(dist_context)?; let n_children = plan.children().len(); // This loop iterates over all the children to: // - Increase parallelism for every child if it is beneficial. From dcadf41f4727ecee392e3cfee30a6b0702e6ca19 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 15 Sep 2023 11:42:41 +0300 Subject: [PATCH 03/13] Fix test --- .../core/src/datasource/listing/table.rs | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e36252a99566e..c36521d76205f 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1012,9 +1012,9 @@ mod tests { use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use chrono::DateTime; - use datafusion_common::assert_contains; use datafusion_common::GetExt; - use datafusion_expr::LogicalPlanBuilder; + use datafusion_common::{assert_contains, ScalarValue}; + use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use rstest::*; use std::collections::HashMap; use std::fs::File; @@ -2081,7 +2081,9 @@ mod tests { // Create the initial context, schema, and batch. let session_ctx = match session_config_map { Some(cfg) => { - let config = SessionConfig::from_string_hash_map(cfg)?; + let mut config = SessionConfig::from_string_hash_map(cfg)?; + // Make target partition number fixed + config.options_mut().execution.target_partitions = 8; SessionContext::with_config(config) } None => SessionContext::new(), @@ -2094,6 +2096,12 @@ mod tests { false, )])); + let filter_predicate = Expr::BinaryExpr(BinaryExpr::new( + Box::new(Expr::Column("column1".into())), + Operator::GtEq, + Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))), + )); + // Create a new batch of data to insert into the table let batch = RecordBatch::try_new( schema.clone(), @@ -2173,8 +2181,10 @@ mod tests { // Convert the source table into a provider so that it can be used in a query let source = provider_as_source(source_table); // Create a table scan logical plan to read from the source table + // Since logical plan contains filter increasing parallelism is helpful, hence in the final plan we + // will have 8 partitions. let scan_plan = LogicalPlanBuilder::scan("source", source, None)? - .repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))? + .filter(filter_predicate)? .build()?; // Create an insert plan to insert the source data into the initial table let insert_into_table = @@ -2184,7 +2194,6 @@ mod tests { .state() .create_physical_plan(&insert_into_table) .await?; - // Execute the physical plan and collect the results let res = collect(plan, session_ctx.task_ctx()).await?; // Insert returns the number of rows written, in our case this would be 6. @@ -2216,9 +2225,9 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); - // Assert that 6 files were added to the table + // Assert that 8 files were added to the table (by default target partition number is 8) let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 6); + assert_eq!(num_files, 8); // Create a physical plan from the insert plan let plan = session_ctx @@ -2259,9 +2268,9 @@ mod tests { // Assert that the batches read from the file after the second append match the expected result. assert_batches_eq!(expected, &batches); - // Assert that another 6 files were added to the table + // Assert that another 8 files were added to the table let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 12); + assert_eq!(num_files, 16); // Return Ok if the function Ok(()) From 4acbc072ffb9fa6b388e3780701e492394f5809c Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 15 Sep 2023 15:39:42 +0300 Subject: [PATCH 04/13] Do not use hard coded partition number --- datafusion/core/src/datasource/listing/table.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c36521d76205f..d0efa10c91085 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -2081,13 +2081,12 @@ mod tests { // Create the initial context, schema, and batch. let session_ctx = match session_config_map { Some(cfg) => { - let mut config = SessionConfig::from_string_hash_map(cfg)?; - // Make target partition number fixed - config.options_mut().execution.target_partitions = 8; + let config = SessionConfig::from_string_hash_map(cfg)?; SessionContext::with_config(config) } None => SessionContext::new(), }; + let target_partition_number = session_ctx.state().config().target_partitions(); // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( @@ -2225,9 +2224,9 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); - // Assert that 8 files were added to the table (by default target partition number is 8) + // Assert that # of `target_partition_number` files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 8); + assert_eq!(num_files, target_partition_number); // Create a physical plan from the insert plan let plan = session_ctx @@ -2268,9 +2267,9 @@ mod tests { // Assert that the batches read from the file after the second append match the expected result. assert_batches_eq!(expected, &batches); - // Assert that another 8 files were added to the table + // Assert that another # of `target_partition_number` files were added to the table let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 16); + assert_eq!(num_files, 2 * target_partition_number); // Return Ok if the function Ok(()) From bac64fe42178881b00a0baac2885fb21893aa41f Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 18 Sep 2023 13:39:31 +0300 Subject: [PATCH 05/13] Add comments, Fix with_new_children of CustomPlan --- .../enforce_distribution.rs | 37 ++++++++++--------- .../provider_filter_pushdown.rs | 11 ++++-- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index fbbbb54df41dc..6f1aa6c0f53c1 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -946,11 +946,12 @@ fn update_distribution_onward( } } -fn delete_top_from_distribution_onward( - child_plan: &Arc, +/// Get ExecTree for each child of the plan +fn get_children_exectrees( + n_children: usize, dist_onward: &Option, ) -> Vec> { - let mut new_distribution_onwards = vec![None; child_plan.children().len()]; + let mut new_distribution_onwards = vec![None; n_children]; if let Some(exec_tree) = &dist_onward { for child in &exec_tree.children { new_distribution_onwards[child.idx] = Some(child.clone()); @@ -1105,7 +1106,8 @@ fn add_spm_on_top( } /// Updates the physical plan inside `distribution_context` if having a -/// `RepartitionExec(RoundRobin)` is not helpful. +/// so that executors that change distribution are removed from the top +/// (If they are necessary, they will be added in subsequent stages). /// /// Assume that following plan is given: /// ```text @@ -1114,12 +1116,11 @@ fn add_spm_on_top( /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` /// -/// `RepartitionExec` at the top is unnecessary. Since it doesn't help with increasing parallelism. -/// This function removes top repartition, and returns following plan. +/// Since `RepartitionExec`s changes distribution. +/// This function removes `RepartitionExec`s, and returns following plan. /// /// ```text -/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", -/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", +/// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` fn remove_dist_changing_operators( distribution_context: DistributionContext, @@ -1138,7 +1139,7 @@ fn remove_dist_changing_operators( // All of above operators have single child. when we remove top operator, we take first child. plan = plan.children()[0].clone(); distribution_onwards = - delete_top_from_distribution_onward(&plan, &distribution_onwards[0]); + get_children_exectrees(plan.children().len(), &distribution_onwards[0]); } // Create a plan with the updated children: @@ -1643,6 +1644,7 @@ impl TreeNode for PlanWithKeyRequirements { } } +/// This functions removes ancillary `SortRequiringExec` from the physical plan. fn remove_top_ordering_req( plan: Arc, ) -> Result> { @@ -1658,18 +1660,23 @@ fn remove_top_ordering_req( } } +/// This functions adds ancillary `SortRequiringExec` to the the physical plan, so that +/// global ordering requirement is not lost during optimization fn require_top_ordering(plan: Arc) -> Result> { let (new_plan, is_changed) = require_top_ordering_helper(plan)?; if is_changed { Ok(new_plan) } else { + // Add SortRequiringExec to the top. Ok(Arc::new(SortRequiringExec::new( new_plan, + // there is no ordering requirement None, Distribution::UnspecifiedDistribution, )) as _) } } + /// Helper function that adds an ancillary `SortRequiringExec` to the given plan. fn require_top_ordering_helper( plan: Arc, @@ -1677,17 +1684,13 @@ fn require_top_ordering_helper( let mut children = plan.children(); // Global ordering defines desired ordering in the final result. if children.len() != 1 { - // Ok(Arc::new(SortRequiringExec::new(plan.clone(), None, Distribution::UnspecifiedDistribution)) as _) Ok((plan, false)) } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { let req_ordering = sort_exec.output_ordering().unwrap_or(&[]); + let req_dist = sort_exec.required_input_distribution()[0].clone(); let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); Ok(( - Arc::new(SortRequiringExec::new( - plan.clone(), - Some(reqs), - Distribution::SinglePartition, - )) as _, + Arc::new(SortRequiringExec::new(plan.clone(), Some(reqs), req_dist)) as _, true, )) } else if let Some(spm) = plan.as_any().downcast_ref::() { @@ -1713,13 +1716,11 @@ fn require_top_ordering_helper( Ok((plan.with_new_children(vec![new_child])?, is_changed)) } else { // Stop searching, there is no global ordering desired for the query. - // Ok(Arc::new(SortRequiringExec::new(plan.clone(), None, Distribution::UnspecifiedDistribution)) as _) Ok((plan, false)) } } -/// Models operators like BoundedWindowExec that require an input -/// ordering but is easy to construct +/// Ancillary operator that require given ordering #[derive(Debug)] struct SortRequiringExec { input: Arc, diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 79214092fa576..9dfa77ae4e01c 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -32,7 +32,7 @@ use datafusion::physical_plan::{ use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{not_impl_err, DataFusionError}; +use datafusion_common::{internal_err, not_impl_err, DataFusionError}; use datafusion_expr::expr::{BinaryExpr, Cast}; use std::ops::Deref; use std::sync::Arc; @@ -96,9 +96,14 @@ impl ExecutionPlan for CustomPlan { fn with_new_children( self: Arc, - _: Vec>, + children: Vec>, ) -> Result> { - unreachable!() + // CustomPlan has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( From c639fd34061060bc95b621592f5bc6fb2495bb59 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 19 Sep 2023 10:54:14 +0300 Subject: [PATCH 06/13] Use sub-rule as separate rule. --- .../enforce_distribution.rs | 199 ++----------- .../physical_optimizer/global_requirements.rs | 270 ++++++++++++++++++ datafusion/core/src/physical_optimizer/mod.rs | 1 + .../core/src/physical_optimizer/optimizer.rs | 7 + .../sqllogictest/test_files/explain.slt | 4 + .../sqllogictest/test_files/groupby.slt | 2 +- 6 files changed, 299 insertions(+), 184 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/global_requirements.rs diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 6f1aa6c0f53c1..17ea9445e4874 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -21,7 +21,6 @@ //! according to the configuration), this rule increases partition counts in //! the physical plan. -use arrow_schema::SchemaRef; use std::fmt; use std::fmt::Formatter; use std::sync::Arc; @@ -41,12 +40,12 @@ use crate::physical_plan::joins::{ }; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::{SortExec, SortOptions}; +use crate::physical_plan::sorts::sort::SortOptions; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec}; use crate::physical_plan::windows::WindowAggExec; +use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; -use crate::physical_plan::{DisplayAs, DisplayFormatType, Partitioning}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_expr::logical_plan::JoinType; @@ -56,11 +55,9 @@ use datafusion_physical_expr::utils::{ map_columns_before_projection, ordering_satisfy_requirement_concrete, }; use datafusion_physical_expr::{ - expr_list_eq_strict_order, LexOrderingReq, PhysicalExpr, PhysicalSortExpr, - PhysicalSortRequirement, + expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement, }; -use datafusion_common::Statistics; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -211,14 +208,13 @@ impl PhysicalOptimizerRule for EnforceDistribution { })? }; - let adjusted = require_top_ordering(adjusted)?; let distribution_context = DistributionContext::new(adjusted); // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context.transform_up(&|distribution_context| { ensure_distribution(distribution_context, config) })?; - remove_top_ordering_req(distribution_context.plan) + Ok(distribution_context.plan) } fn name(&self) -> &str { @@ -1644,179 +1640,6 @@ impl TreeNode for PlanWithKeyRequirements { } } -/// This functions removes ancillary `SortRequiringExec` from the physical plan. -fn remove_top_ordering_req( - plan: Arc, -) -> Result> { - let new_children = plan - .children() - .into_iter() - .map(|child| remove_top_ordering_req(child)) - .collect::>>()?; - if plan.as_any().is::() { - Ok(new_children[0].clone()) - } else { - plan.with_new_children(new_children) - } -} - -/// This functions adds ancillary `SortRequiringExec` to the the physical plan, so that -/// global ordering requirement is not lost during optimization -fn require_top_ordering(plan: Arc) -> Result> { - let (new_plan, is_changed) = require_top_ordering_helper(plan)?; - if is_changed { - Ok(new_plan) - } else { - // Add SortRequiringExec to the top. - Ok(Arc::new(SortRequiringExec::new( - new_plan, - // there is no ordering requirement - None, - Distribution::UnspecifiedDistribution, - )) as _) - } -} - -/// Helper function that adds an ancillary `SortRequiringExec` to the given plan. -fn require_top_ordering_helper( - plan: Arc, -) -> Result<(Arc, bool)> { - let mut children = plan.children(); - // Global ordering defines desired ordering in the final result. - if children.len() != 1 { - Ok((plan, false)) - } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { - let req_ordering = sort_exec.output_ordering().unwrap_or(&[]); - let req_dist = sort_exec.required_input_distribution()[0].clone(); - let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); - Ok(( - Arc::new(SortRequiringExec::new(plan.clone(), Some(reqs), req_dist)) as _, - true, - )) - } else if let Some(spm) = plan.as_any().downcast_ref::() { - let req_ordering = spm.expr(); - let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); - Ok(( - Arc::new(SortRequiringExec::new( - plan.clone(), - Some(reqs), - Distribution::SinglePartition, - )) as _, - true, - )) - } else if plan.maintains_input_order()[0] - && plan.required_input_ordering()[0].is_none() - { - // Keep searching for a `SortExec` as long as ordering is maintained, - // and on-the-way operators do not themselves require an ordering. - // When an operator requires an ordering, any `SortExec` below can not - // be responsible for (i.e. the originator of) the global ordering. - let (new_child, is_changed) = - require_top_ordering_helper(children.swap_remove(0))?; - Ok((plan.with_new_children(vec![new_child])?, is_changed)) - } else { - // Stop searching, there is no global ordering desired for the query. - Ok((plan, false)) - } -} - -/// Ancillary operator that require given ordering -#[derive(Debug)] -struct SortRequiringExec { - input: Arc, - requirements: Option, - dist_requirement: Distribution, -} - -impl SortRequiringExec { - fn new( - input: Arc, - requirements: Option, - dist_requirement: Distribution, - ) -> Self { - Self { - input, - requirements, - dist_requirement, - } - } -} - -impl DisplayAs for SortRequiringExec { - fn fmt_as( - &self, - _t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - if let Some(requirements) = &self.requirements { - let expr: Vec = requirements.iter().map(|e| e.to_string()).collect(); - write!(f, "SortRequiringExec: [{}]", expr.join(",")) - } else { - write!(f, "SortRequiringExec: [None]") - } - } -} - -impl ExecutionPlan for SortRequiringExec { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn schema(&self) -> SchemaRef { - self.input.schema() - } - - fn output_partitioning(&self) -> crate::physical_plan::Partitioning { - self.input.output_partitioning() - } - - fn benefits_from_input_partitioning(&self) -> Vec { - vec![false] - } - - fn required_input_distribution(&self) -> Vec { - vec![self.dist_requirement.clone()] - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.input.output_ordering() - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - // model that it requires the output ordering of its input - fn required_input_ordering(&self) -> Vec>> { - vec![self.requirements.clone()] - } - - fn with_new_children( - self: Arc, - mut children: Vec>, - ) -> Result> { - assert_eq!(children.len(), 1); - let child = children.pop().unwrap(); - Ok(Arc::new(Self::new( - child, - self.requirements.clone(), - self.dist_requirement.clone(), - ))) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unreachable!(); - } - - fn statistics(&self) -> Statistics { - self.input.statistics() - } -} - #[cfg(test)] mod tests { use std::ops::Deref; @@ -1826,6 +1649,7 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; + use crate::physical_optimizer::global_requirements::GlobalRequirements; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -2259,10 +2083,15 @@ mod tests { // `EnforceSorting` and `EnforceDistribution`. // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create // new tests for the cascade. + + // Add global requirement at the start, of any + let optimizer = GlobalRequirements::new_add_mode(); + let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = if $FIRST_ENFORCE_DIST { // Run enforce distribution rule first: let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = optimizer.optimize(optimized, &config)?; // The rule should be idempotent. // Re-running this rule shouldn't introduce unnecessary operators. let optimizer = EnforceDistribution::new(); @@ -2274,7 +2103,7 @@ mod tests { } else { // Run the enforce sorting rule first: let optimizer = EnforceSorting::new(); - let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = optimizer.optimize(optimized, &config)?; // Run enforce distribution rule: let optimizer = EnforceDistribution::new(); let optimized = optimizer.optimize(optimized, &config)?; @@ -2285,6 +2114,10 @@ mod tests { optimized }; + // Remove ancillary global requirements at the end + let optimizer = GlobalRequirements::new_remove_mode(); + let optimized = optimizer.optimize(optimized, &config)?; + // Now format correctly let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); diff --git a/datafusion/core/src/physical_optimizer/global_requirements.rs b/datafusion/core/src/physical_optimizer/global_requirements.rs new file mode 100644 index 0000000000000..28216304d3362 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/global_requirements.rs @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The GlobalOrderRequire optimizer rule either: +//! - Adds an auxiliary `GlobalRequirementExec` operator to keep track of global +//! ordering and distribution requirement across rules, or +//! - Removes the auxiliary `GlobalRequirementExec` operator from the physical plan. +//! Since the `GlobalRequirementExec` operator is only a helper operator, it +//! shouldn't occur in the final plan (i.e. the executed plan). + +use std::sync::Arc; + +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; + +use arrow_schema::SchemaRef; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{Result, Statistics}; +use datafusion_physical_expr::{ + Distribution, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement, +}; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + +/// This rule either adds or removes [`GlobalRequirements`]s to/from the physical +/// plan according to its `mode` attribute, which is set by the constructors +/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of +/// the global requirements (ordering and distribution) across rules. +#[derive(Debug)] +pub struct GlobalRequirements { + mode: RuleMode, +} + +impl GlobalRequirements { + /// Create a new rule which works in `Add` mode; i.e. it simply adds a + /// top-level [`GlobalRequirementExec`] into the physical plan to keep track + /// of global ordering, and global distribution requirements if there is any. + /// Note that this rule should run at the beginning. + pub fn new_add_mode() -> Self { + Self { + mode: RuleMode::Add, + } + } + + /// Create a new rule which works in `Remove` mode; i.e. it simply removes + /// the top-level [`GlobalRequirementExec`] from the physical plan if there is + /// any. We do this because a `GlobalRequirementExec` is an ancillary, + /// non-executable operator whose sole purpose is to track global + /// requirements during optimization. Therefore, a + /// `GlobalRequirementExec` should not appear in the final plan. + pub fn new_remove_mode() -> Self { + Self { + mode: RuleMode::Remove, + } + } +} + +#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)] +enum RuleMode { + Add, + Remove, +} + +/// An ancillary, non-executable operator whose sole purpose is to track global +/// requirements during optimization. It imposes +/// - the ordering requirement in its `order_requirement` attribute. +/// - the distribution requirement in its `dist_requirement` attribute. +#[derive(Debug)] +struct GlobalRequirementExec { + input: Arc, + order_requirement: Option, + dist_requirement: Distribution, +} + +impl GlobalRequirementExec { + fn new( + input: Arc, + requirements: Option, + dist_requirement: Distribution, + ) -> Self { + Self { + input, + order_requirement: requirements, + dist_requirement, + } + } + + fn input(&self) -> Arc { + self.input.clone() + } +} + +impl DisplayAs for GlobalRequirementExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "GlobalRequirementExec") + } +} + +impl ExecutionPlan for GlobalRequirementExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_distribution(&self) -> Vec { + vec![self.dist_requirement.clone()] + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn required_input_ordering(&self) -> Vec>> { + vec![self.order_requirement.clone()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + let child = children.remove(0); + Ok(Arc::new(Self::new( + child, + self.order_requirement.clone(), + self.dist_requirement.clone(), + ))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } +} + +impl PhysicalOptimizerRule for GlobalRequirements { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + match self.mode { + RuleMode::Add => require_top_ordering(plan), + RuleMode::Remove => plan.transform_up(&|plan| { + if let Some(sort_req) = + plan.as_any().downcast_ref::() + { + Ok(Transformed::Yes(sort_req.input().clone())) + } else { + Ok(Transformed::No(plan)) + } + }), + } + } + + fn name(&self) -> &str { + "GlobalRequirements" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This functions adds ancillary `GlobalRequirementExec` to the the physical plan, so that +/// global requirements are not lost during optimization. +fn require_top_ordering(plan: Arc) -> Result> { + let (new_plan, is_changed) = require_top_ordering_helper(plan)?; + if is_changed { + Ok(new_plan) + } else { + // Add `GlobalRequirementExec` to the top, with no specified ordering and distribution requirement. + Ok(Arc::new(GlobalRequirementExec::new( + new_plan, + // there is no ordering requirement + None, + Distribution::UnspecifiedDistribution, + )) as _) + } +} + +/// Helper function that adds an ancillary `GlobalRequirementExec` to the given plan. +/// First entry in the tuple is resulting plan, second entry indicates whether any +/// `GlobalRequirementExec` is added to the plan. +fn require_top_ordering_helper( + plan: Arc, +) -> Result<(Arc, bool)> { + let mut children = plan.children(); + // Global ordering defines desired ordering in the final result. + if children.len() != 1 { + Ok((plan, false)) + } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let req_ordering = sort_exec.output_ordering().unwrap_or(&[]); + let req_dist = sort_exec.required_input_distribution()[0].clone(); + let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + Ok(( + Arc::new(GlobalRequirementExec::new( + plan.clone(), + Some(reqs), + req_dist, + )) as _, + true, + )) + } else if let Some(spm) = plan.as_any().downcast_ref::() { + let req_ordering = spm.expr(); + let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + Ok(( + Arc::new(GlobalRequirementExec::new( + plan.clone(), + Some(reqs), + Distribution::SinglePartition, + )) as _, + true, + )) + } else if plan.maintains_input_order()[0] + && plan.required_input_ordering()[0].is_none() + { + // Keep searching for a `SortExec` as long as ordering is maintained, + // and on-the-way operators do not themselves require an ordering. + // When an operator requires an ordering, any `SortExec` below can not + // be responsible for (i.e. the originator of) the global ordering. + let (new_child, is_changed) = + require_top_ordering_helper(children.swap_remove(0))?; + Ok((plan.with_new_children(vec![new_child])?, is_changed)) + } else { + // Stop searching, there is no global ordering desired for the query. + Ok((plan, false)) + } +} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 0801a9bc595c6..54d5ddcc6e206 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -26,6 +26,7 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod global_requirements; pub mod join_selection; pub mod optimizer; pub mod pipeline_checker; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 5de70efe3c470..ea7d3ccbeab22 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -25,6 +25,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::physical_optimizer::enforce_distribution::EnforceDistribution; use crate::physical_optimizer::enforce_sorting::EnforceSorting; +use crate::physical_optimizer::global_requirements::GlobalRequirements; use crate::physical_optimizer::join_selection::JoinSelection; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::topk_aggregation::TopKAggregation; @@ -68,6 +69,9 @@ impl PhysicalOptimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { let rules: Vec> = vec![ + // If there is a required global requirement of the query, make sure that + // this information is not lost across different rules during optimization + Arc::new(GlobalRequirements::new_add_mode()), Arc::new(AggregateStatistics::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, // like collect left, or hash join, or future sort merge join, which will influence the @@ -90,6 +94,9 @@ impl PhysicalOptimizer { // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. Arc::new(CoalesceBatches::new()), + // Remove the ancillary global requirement operator since we are done with the planning + // phase. + Arc::new(GlobalRequirements::new_remove_mode()), // The PipelineChecker rule will reject non-runnable query plans that use // pipeline-breaking operators on infinite input(s). The rule generates a // diagnostic error message when this happens. It makes no changes to the diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index b1ba1eb36d11c..303adcc1c7962 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -242,12 +242,16 @@ logical_plan after eliminate_projection SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true +physical_plan after GlobalRequirements +GlobalRequirementExec +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE +physical_plan after GlobalRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index dce041271a386..c93617f352ad7 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -1962,7 +1962,7 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ------SortExec: expr=[col0@0 ASC NULLS LAST] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[FIRST_VALUE(r.col1)], ordering_mode=PartiallyOrdered +------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered --------------SortExec: expr=[col0@3 ASC NULLS LAST] ----------------CoalesceBatchesExec: target_batch_size=8192 ------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] From 09a129c096234f256d7a46cd09c972f54c202e8e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 20 Sep 2023 09:32:32 +0300 Subject: [PATCH 07/13] Add unbounded method --- .../core/src/physical_optimizer/global_requirements.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/global_requirements.rs b/datafusion/core/src/physical_optimizer/global_requirements.rs index 28216304d3362..3cc0c4faa25ed 100644 --- a/datafusion/core/src/physical_optimizer/global_requirements.rs +++ b/datafusion/core/src/physical_optimizer/global_requirements.rs @@ -148,6 +148,11 @@ impl ExecutionPlan for GlobalRequirementExec { vec![self.order_requirement.clone()] } + fn unbounded_output(&self, children: &[bool]) -> Result { + // Has single child + Ok(children[0]) + } + fn with_new_children( self: Arc, mut children: Vec>, From 9a5dce9f792f4026cdda7b5c6d326785020e3659 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 27 Sep 2023 15:50:05 +0300 Subject: [PATCH 08/13] Final review --- .../core/src/datasource/listing/table.rs | 47 ++++++++++--------- .../enforce_distribution.rs | 24 +++++----- .../physical_optimizer/global_requirements.rs | 21 +++------ .../core/src/physical_optimizer/optimizer.rs | 2 +- .../provider_filter_pushdown.rs | 8 ++-- 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1a080b6e32f55..c0d51a18dcd82 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -20,16 +20,8 @@ use std::str::FromStr; use std::{any::Any, sync::Arc}; -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; -use arrow_schema::Schema; -use async_trait::async_trait; -use datafusion_common::FileTypeWriterOptions; -use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema}; -use datafusion_expr::expr::Sort; -use datafusion_optimizer::utils::conjunction; -use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; -use futures::{future, stream, StreamExt, TryStreamExt}; +use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use super::PartitionedFile; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::datasource::{ @@ -49,13 +41,22 @@ use crate::{ logical_expr::Expr, physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}, }; -use datafusion_common::{FileCompressionType, FileType}; + +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; +use arrow_schema::Schema; +use datafusion_common::{ + internal_err, plan_err, project_schema, FileCompressionType, FileType, + FileTypeWriterOptions, SchemaExt, ToDFSchema, +}; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_expr::expr::Sort; +use datafusion_optimizer::utils::conjunction; +use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; -use super::PartitionedFile; - -use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use async_trait::async_trait; +use futures::{future, stream, StreamExt, TryStreamExt}; /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] @@ -979,6 +980,9 @@ impl ListingTable { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs::File; + use super::*; use crate::datasource::{provider_as_source, MemTable}; use crate::execution::options::ArrowReadOptions; @@ -991,14 +995,13 @@ mod tests { logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; + use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; - use datafusion_common::GetExt; - use datafusion_common::{assert_contains, ScalarValue}; + use datafusion_common::{assert_contains, GetExt, ScalarValue}; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use rstest::*; - use std::collections::HashMap; - use std::fs::File; use tempfile::TempDir; /// It creates dummy file and checks if it can create unbounded input executors. @@ -2123,11 +2126,11 @@ mod tests { // Convert the source table into a provider so that it can be used in a query let source = provider_as_source(source_table); // Create a table scan logical plan to read from the source table - // Since logical plan contains filter increasing parallelism is helpful, hence in the final plan we - // will have 8 partitions. let scan_plan = LogicalPlanBuilder::scan("source", source, None)? .filter(filter_predicate)? .build()?; + // Since logical plan contains a filter, increasing parallelism is helpful. + // Therefore, we will have 8 partitions in the final plan. // Create an insert plan to insert the source data into the initial table let insert_into_table = LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; @@ -2167,7 +2170,7 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); - // Assert that # of `target_partition_number` files were added to the table. + // Assert that `target_partition_number` many files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); assert_eq!(num_files, target_partition_number); @@ -2210,7 +2213,7 @@ mod tests { // Assert that the batches read from the file after the second append match the expected result. assert_batches_eq!(expected, &batches); - // Assert that another # of `target_partition_number` files were added to the table + // Assert that another `target_partition_number` many files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); assert_eq!(num_files, 2 * target_partition_number); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 17ea9445e4874..2bd492493fac9 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1101,9 +1101,9 @@ fn add_spm_on_top( } } -/// Updates the physical plan inside `distribution_context` if having a -/// so that executors that change distribution are removed from the top -/// (If they are necessary, they will be added in subsequent stages). +/// Updates the physical plan inside `distribution_context` so that distribution +/// changing operators are removed from the top. If they are necessary, they will +/// be added in subsequent stages. /// /// Assume that following plan is given: /// ```text @@ -1112,8 +1112,8 @@ fn add_spm_on_top( /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` /// -/// Since `RepartitionExec`s changes distribution. -/// This function removes `RepartitionExec`s, and returns following plan. +/// Since `RepartitionExec`s change the distribution, this function removes +/// them and returns following plan: /// /// ```text /// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", @@ -1126,13 +1126,14 @@ fn remove_dist_changing_operators( mut distribution_onwards, } = distribution_context; - // Remove any distribution changing operators at the beginning - // They will be re-inserted, according to requirements if absolutely necessary or helpful. + // Remove any distribution changing operators at the beginning: + // Note that they will be re-inserted later on if necessary or helpful. while is_repartition(&plan) || is_coalesce_partitions(&plan) || is_sort_preserving_merge(&plan) { - // All of above operators have single child. when we remove top operator, we take first child. + // All of above operators have a single child. When we remove the top + // operator, we take the first child. plan = plan.children()[0].clone(); distribution_onwards = get_children_exectrees(plan.children().len(), &distribution_onwards[0]); @@ -1184,7 +1185,7 @@ fn replace_order_preserving_variants_helper( for child in &exec_tree.children { updated_children[child.idx] = replace_order_preserving_variants_helper(child)?; } - if exec_tree.plan.as_any().is::() { + if is_sort_preserving_merge(&exec_tree.plan) { return Ok(Arc::new(CoalescePartitionsExec::new( updated_children[0].clone(), ))); @@ -1668,6 +1669,7 @@ mod tests { }; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{FileCompressionType, ScalarValue}; @@ -2084,7 +2086,7 @@ mod tests { // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create // new tests for the cascade. - // Add global requirement at the start, of any + // Add the ancillary global requirements operator at the start: let optimizer = GlobalRequirements::new_add_mode(); let optimized = optimizer.optimize($PLAN.clone(), &config)?; @@ -2114,7 +2116,7 @@ mod tests { optimized }; - // Remove ancillary global requirements at the end + // Remove the ancillary global requirements operator when done: let optimizer = GlobalRequirements::new_remove_mode(); let optimized = optimizer.optimize(optimized, &config)?; diff --git a/datafusion/core/src/physical_optimizer/global_requirements.rs b/datafusion/core/src/physical_optimizer/global_requirements.rs index 3cc0c4faa25ed..ff01822c76915 100644 --- a/datafusion/core/src/physical_optimizer/global_requirements.rs +++ b/datafusion/core/src/physical_optimizer/global_requirements.rs @@ -49,7 +49,7 @@ pub struct GlobalRequirements { impl GlobalRequirements { /// Create a new rule which works in `Add` mode; i.e. it simply adds a /// top-level [`GlobalRequirementExec`] into the physical plan to keep track - /// of global ordering, and global distribution requirements if there is any. + /// of global ordering and distribution requirements if there are any. /// Note that this rule should run at the beginning. pub fn new_add_mode() -> Self { Self { @@ -149,7 +149,7 @@ impl ExecutionPlan for GlobalRequirementExec { } fn unbounded_output(&self, children: &[bool]) -> Result { - // Has single child + // Has a single child Ok(children[0]) } @@ -157,10 +157,8 @@ impl ExecutionPlan for GlobalRequirementExec { self: Arc, mut children: Vec>, ) -> Result> { - assert_eq!(children.len(), 1); - let child = children.remove(0); Ok(Arc::new(Self::new( - child, + children.remove(0), // has a single child self.order_requirement.clone(), self.dist_requirement.clone(), ))) @@ -191,7 +189,7 @@ impl PhysicalOptimizerRule for GlobalRequirements { if let Some(sort_req) = plan.as_any().downcast_ref::() { - Ok(Transformed::Yes(sort_req.input().clone())) + Ok(Transformed::Yes(sort_req.input())) } else { Ok(Transformed::No(plan)) } @@ -240,19 +238,14 @@ fn require_top_ordering_helper( let req_dist = sort_exec.required_input_distribution()[0].clone(); let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); Ok(( - Arc::new(GlobalRequirementExec::new( - plan.clone(), - Some(reqs), - req_dist, - )) as _, + Arc::new(GlobalRequirementExec::new(plan, Some(reqs), req_dist)) as _, true, )) } else if let Some(spm) = plan.as_any().downcast_ref::() { - let req_ordering = spm.expr(); - let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr()); Ok(( Arc::new(GlobalRequirementExec::new( - plan.clone(), + plan, Some(reqs), Distribution::SinglePartition, )) as _, diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index ea7d3ccbeab22..a288037d59ba4 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -70,7 +70,7 @@ impl PhysicalOptimizer { pub fn new() -> Self { let rules: Vec> = vec![ // If there is a required global requirement of the query, make sure that - // this information is not lost across different rules during optimization + // this information is not lost across different rules during optimization. Arc::new(GlobalRequirements::new_add_mode()), Arc::new(AggregateStatistics::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 9dfa77ae4e01c..73085937cbca0 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Deref; +use std::sync::Arc; + use arrow::array::{Int32Builder, Int64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use datafusion::datasource::provider::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; @@ -34,8 +36,8 @@ use datafusion::scalar::ScalarValue; use datafusion_common::cast::as_primitive_array; use datafusion_common::{internal_err, not_impl_err, DataFusionError}; use datafusion_expr::expr::{BinaryExpr, Cast}; -use std::ops::Deref; -use std::sync::Arc; + +use async_trait::async_trait; fn create_batch(value: i32, num_rows: usize) -> Result { let mut builder = Int32Builder::with_capacity(num_rows); From fc59bb8e548e27d9d84274442ca6a7487cb78345 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Sep 2023 16:04:57 +0300 Subject: [PATCH 09/13] Move util code to exectree file --- .../physical_optimizer/enforce_distribution.rs | 18 ++---------------- .../core/src/physical_optimizer/utils.rs | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 2bd492493fac9..b1f05b2bb2c1a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -29,8 +29,8 @@ use crate::config::ConfigOptions; use crate::datasource::physical_plan::{CsvExec, ParquetExec}; use crate::error::Result; use crate::physical_optimizer::utils::{ - add_sort_above, get_plan_string, is_coalesce_partitions, is_repartition, - is_sort_preserving_merge, unbounded_output, ExecTree, + add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, + is_repartition, is_sort_preserving_merge, unbounded_output, ExecTree, }; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; @@ -942,20 +942,6 @@ fn update_distribution_onward( } } -/// Get ExecTree for each child of the plan -fn get_children_exectrees( - n_children: usize, - dist_onward: &Option, -) -> Vec> { - let mut new_distribution_onwards = vec![None; n_children]; - if let Some(exec_tree) = &dist_onward { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - new_distribution_onwards -} - /// Adds RoundRobin repartition operator to the plan increase parallelism. /// /// # Arguments diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index b4dd75e5864ba..01ce4f93cd89c 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -86,6 +86,23 @@ impl ExecTree { } } +/// Get `ExecTree` for each child of the plan +/// If there is no `ExecTree` for a child of the plan, use `None`. +/// Children with `None` values are not tracked. +/// Children with `Some(ExecTree)` are tracked. +pub(crate) fn get_children_exectrees( + n_children: usize, + dist_onward: &Option, +) -> Vec> { + let mut new_distribution_onwards = vec![None; n_children]; + if let Some(exec_tree) = &dist_onward { + for child in &exec_tree.children { + new_distribution_onwards[child.idx] = Some(child.clone()); + } + } + new_distribution_onwards +} + // Get output (un)boundedness information for the given `plan`. pub(crate) fn unbounded_output(plan: &Arc) -> bool { let result = if plan.children().is_empty() { From e7450fe87f32e68612fbbea51db8cd230738d66c Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Sep 2023 16:44:10 +0300 Subject: [PATCH 10/13] Update variables and comments --- .../core/src/physical_optimizer/utils.rs | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 01ce4f93cd89c..d5f9023d148b7 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -86,21 +86,28 @@ impl ExecTree { } } -/// Get `ExecTree` for each child of the plan -/// If there is no `ExecTree` for a child of the plan, use `None`. -/// Children with `None` values are not tracked. -/// Children with `Some(ExecTree)` are tracked. +/// Get `ExecTree` for each child of the plan if they are tracked. +/// # Arguments +/// +/// * `n_children` - Children count of the plan of interest +/// * `onward` - Contains `Some(ExecTree)` of the plan tracked. +/// - Contains `None` is plan is not tracked. +/// +/// # Returns +/// +/// A `Vec>` that contains tracking information of each child. +/// If a child is `None`, it is not tracked. If `Some(ExecTree)` child is tracked also. pub(crate) fn get_children_exectrees( n_children: usize, - dist_onward: &Option, + onward: &Option, ) -> Vec> { - let mut new_distribution_onwards = vec![None; n_children]; - if let Some(exec_tree) = &dist_onward { + let mut children_onward = vec![None; n_children]; + if let Some(exec_tree) = &onward { for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); + children_onward[child.idx] = Some(child.clone()); } } - new_distribution_onwards + children_onward } // Get output (un)boundedness information for the given `plan`. From 50615f6650c038f6910c98d91c61a9714bb97526 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Fri, 29 Sep 2023 09:43:05 +0300 Subject: [PATCH 11/13] Apply suggestions from code review Update comments Co-authored-by: Andrew Lamb --- .../core/src/physical_optimizer/global_requirements.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/global_requirements.rs b/datafusion/core/src/physical_optimizer/global_requirements.rs index ff01822c76915..7ec06a1fb189b 100644 --- a/datafusion/core/src/physical_optimizer/global_requirements.rs +++ b/datafusion/core/src/physical_optimizer/global_requirements.rs @@ -41,6 +41,11 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE /// plan according to its `mode` attribute, which is set by the constructors /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of /// the global requirements (ordering and distribution) across rules. +/// +/// The primary usecase of this node and rule is to specify and preserve the desired output +/// ordering and distribution the entire plan. When sending to a single client, a single partition may +/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be +/// better. #[derive(Debug)] pub struct GlobalRequirements { mode: RuleMode, @@ -80,6 +85,8 @@ enum RuleMode { /// requirements during optimization. It imposes /// - the ordering requirement in its `order_requirement` attribute. /// - the distribution requirement in its `dist_requirement` attribute. +/// +/// See [`GlobalRequirements`] for more details #[derive(Debug)] struct GlobalRequirementExec { input: Arc, From 7320727744542ebf7a2628b58e582d6912c868dc Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 29 Sep 2023 10:28:01 +0300 Subject: [PATCH 12/13] Address reviews --- .../enforce_distribution.rs | 52 ++++++++++++----- datafusion/core/src/physical_optimizer/mod.rs | 2 +- .../core/src/physical_optimizer/optimizer.rs | 10 ++-- ...requirements.rs => output_requirements.rs} | 56 +++++++++---------- 4 files changed, 73 insertions(+), 47 deletions(-) rename datafusion/core/src/physical_optimizer/{global_requirements.rs => output_requirements.rs} (85%) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 71703d9cfd075..4a7dc10420754 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1652,7 +1652,7 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use crate::physical_optimizer::global_requirements::GlobalRequirements; + use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -2088,8 +2088,8 @@ mod tests { // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create // new tests for the cascade. - // Add the ancillary global requirements operator at the start: - let optimizer = GlobalRequirements::new_add_mode(); + // Add the ancillary output requirements operator at the start: + let optimizer = OutputRequirements::new_add_mode(); let optimized = optimizer.optimize($PLAN.clone(), &config)?; let optimized = if $FIRST_ENFORCE_DIST { @@ -2118,8 +2118,8 @@ mod tests { optimized }; - // Remove the ancillary global requirements operator when done: - let optimizer = GlobalRequirements::new_remove_mode(); + // Remove the ancillary output requirements operator when done: + let optimizer = OutputRequirements::new_remove_mode(); let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly @@ -2956,7 +2956,7 @@ mod tests { format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]"); let expected = match join_type { - // Should include 6 RepartitionExecs 3 SortExecs + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), @@ -2975,9 +2975,18 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 7 RepartitionExecs + // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RgihtSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RgihtSemi, RightAnti, Full + // - We need to add one additional Hash Repartition after SortMergeJoin in contrast the test + // cases when mode is Inner, Left, LeftSemi, LeftAnti _ => vec![ top_join_plan.as_str(), + // Below 2 operators are differences introduced, when join mode is changed "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "SortExec: expr=[a@0 ASC]", join_plan.as_str(), @@ -2999,7 +3008,7 @@ mod tests { assert_optimized!(expected, top_join.clone(), true, true); let expected_first_sort_enforcement = match join_type { - // Should include 3 RepartitionExecs 3 SortExecs + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), @@ -3018,9 +3027,18 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 8 RepartitionExecs (4 of them preserves ordering) + // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RgihtSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RgihtSemi, RightAnti, Full + // - We need to add one additional Hash Repartition and Roundrobin repartition after + // SortMergeJoin in contrast the test cases when mode is Inner, Left, LeftSemi, LeftAnti _ => vec![ top_join_plan.as_str(), + // Below 4 operators are differences introduced, when join mode is changed "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", @@ -3061,7 +3079,7 @@ mod tests { format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, c@2)]"); let expected = match join_type { - // Should include 3 RepartitionExecs and 3 SortExecs + // Should include 6 RepartitionExecs(3 hash, 3 round-robin) and 3 SortExecs JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), @@ -3079,8 +3097,8 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 4 RepartitionExecs and 4 SortExecs - _ => vec![ + // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs + JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), "SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", "SortExec: expr=[b1@6 ASC]", @@ -3099,6 +3117,8 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], + // this match arm cannot be reached + _ => unreachable!() }; assert_optimized!(expected, top_join.clone(), true, true); @@ -3122,7 +3142,7 @@ mod tests { "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs - _ => vec![ + JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), "SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", @@ -3143,6 +3163,8 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], + // this match arm cannot be reached + _ => unreachable!() }; assert_optimized!( expected_first_sort_enforcement, @@ -3279,6 +3301,10 @@ mod tests { "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; assert_optimized!(expected, exec, true); + // In this case preserving ordering through order preserving operators is not desirable + // (according to flag: bounded_order_preserving_variants) + // hence in this case ordering lost during CoalescePartitionsExec and re-introduced with + // SortExec at the top. let expected = &[ "SortExec: expr=[a@0 ASC]", "CoalescePartitionsExec", diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 54d5ddcc6e206..9e22bff340c99 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -26,9 +26,9 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; -pub mod global_requirements; pub mod join_selection; pub mod optimizer; +pub mod output_requirements; pub mod pipeline_checker; pub mod pruning; pub mod replace_with_order_preserving_variants; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index a288037d59ba4..95035e5f81a01 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -25,8 +25,8 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::physical_optimizer::enforce_distribution::EnforceDistribution; use crate::physical_optimizer::enforce_sorting::EnforceSorting; -use crate::physical_optimizer::global_requirements::GlobalRequirements; use crate::physical_optimizer::join_selection::JoinSelection; +use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::topk_aggregation::TopKAggregation; use crate::{error::Result, physical_plan::ExecutionPlan}; @@ -69,9 +69,9 @@ impl PhysicalOptimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { let rules: Vec> = vec![ - // If there is a required global requirement of the query, make sure that + // If there is a output requirement of the query, make sure that // this information is not lost across different rules during optimization. - Arc::new(GlobalRequirements::new_add_mode()), + Arc::new(OutputRequirements::new_add_mode()), Arc::new(AggregateStatistics::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, // like collect left, or hash join, or future sort merge join, which will influence the @@ -94,9 +94,9 @@ impl PhysicalOptimizer { // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. Arc::new(CoalesceBatches::new()), - // Remove the ancillary global requirement operator since we are done with the planning + // Remove the ancillary output requirement operator since we are done with the planning // phase. - Arc::new(GlobalRequirements::new_remove_mode()), + Arc::new(OutputRequirements::new_remove_mode()), // The PipelineChecker rule will reject non-runnable query plans that use // pipeline-breaking operators on infinite input(s). The rule generates a // diagnostic error message when this happens. It makes no changes to the diff --git a/datafusion/core/src/physical_optimizer/global_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs similarity index 85% rename from datafusion/core/src/physical_optimizer/global_requirements.rs rename to datafusion/core/src/physical_optimizer/output_requirements.rs index 7ec06a1fb189b..4b687d7f35361 100644 --- a/datafusion/core/src/physical_optimizer/global_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -16,10 +16,10 @@ // under the License. //! The GlobalOrderRequire optimizer rule either: -//! - Adds an auxiliary `GlobalRequirementExec` operator to keep track of global +//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global //! ordering and distribution requirement across rules, or -//! - Removes the auxiliary `GlobalRequirementExec` operator from the physical plan. -//! Since the `GlobalRequirementExec` operator is only a helper operator, it +//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan. +//! Since the `OutputRequirementExec` operator is only a helper operator, it //! shouldn't occur in the final plan (i.e. the executed plan). use std::sync::Arc; @@ -37,23 +37,23 @@ use datafusion_physical_expr::{ }; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -/// This rule either adds or removes [`GlobalRequirements`]s to/from the physical +/// This rule either adds or removes [`OutputRequirements`]s to/from the physical /// plan according to its `mode` attribute, which is set by the constructors /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of /// the global requirements (ordering and distribution) across rules. /// -/// The primary usecase of this node and rule is to specify and preserve the desired output +/// The primary usecase of this node and rule is to specify and preserve the desired output /// ordering and distribution the entire plan. When sending to a single client, a single partition may /// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be -/// better. +/// better. #[derive(Debug)] -pub struct GlobalRequirements { +pub struct OutputRequirements { mode: RuleMode, } -impl GlobalRequirements { +impl OutputRequirements { /// Create a new rule which works in `Add` mode; i.e. it simply adds a - /// top-level [`GlobalRequirementExec`] into the physical plan to keep track + /// top-level [`OutputRequirementExec`] into the physical plan to keep track /// of global ordering and distribution requirements if there are any. /// Note that this rule should run at the beginning. pub fn new_add_mode() -> Self { @@ -63,11 +63,11 @@ impl GlobalRequirements { } /// Create a new rule which works in `Remove` mode; i.e. it simply removes - /// the top-level [`GlobalRequirementExec`] from the physical plan if there is - /// any. We do this because a `GlobalRequirementExec` is an ancillary, + /// the top-level [`OutputRequirementExec`] from the physical plan if there is + /// any. We do this because a `OutputRequirementExec` is an ancillary, /// non-executable operator whose sole purpose is to track global /// requirements during optimization. Therefore, a - /// `GlobalRequirementExec` should not appear in the final plan. + /// `OutputRequirementExec` should not appear in the final plan. pub fn new_remove_mode() -> Self { Self { mode: RuleMode::Remove, @@ -86,15 +86,15 @@ enum RuleMode { /// - the ordering requirement in its `order_requirement` attribute. /// - the distribution requirement in its `dist_requirement` attribute. /// -/// See [`GlobalRequirements`] for more details +/// See [`OutputRequirements`] for more details #[derive(Debug)] -struct GlobalRequirementExec { +struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, } -impl GlobalRequirementExec { +impl OutputRequirementExec { fn new( input: Arc, requirements: Option, @@ -112,17 +112,17 @@ impl GlobalRequirementExec { } } -impl DisplayAs for GlobalRequirementExec { +impl DisplayAs for OutputRequirementExec { fn fmt_as( &self, _t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - write!(f, "GlobalRequirementExec") + write!(f, "OutputRequirementExec") } } -impl ExecutionPlan for GlobalRequirementExec { +impl ExecutionPlan for OutputRequirementExec { fn as_any(&self) -> &dyn std::any::Any { self } @@ -184,7 +184,7 @@ impl ExecutionPlan for GlobalRequirementExec { } } -impl PhysicalOptimizerRule for GlobalRequirements { +impl PhysicalOptimizerRule for OutputRequirements { fn optimize( &self, plan: Arc, @@ -194,7 +194,7 @@ impl PhysicalOptimizerRule for GlobalRequirements { RuleMode::Add => require_top_ordering(plan), RuleMode::Remove => plan.transform_up(&|plan| { if let Some(sort_req) = - plan.as_any().downcast_ref::() + plan.as_any().downcast_ref::() { Ok(Transformed::Yes(sort_req.input())) } else { @@ -205,7 +205,7 @@ impl PhysicalOptimizerRule for GlobalRequirements { } fn name(&self) -> &str { - "GlobalRequirements" + "OutputRequirements" } fn schema_check(&self) -> bool { @@ -213,15 +213,15 @@ impl PhysicalOptimizerRule for GlobalRequirements { } } -/// This functions adds ancillary `GlobalRequirementExec` to the the physical plan, so that +/// This functions adds ancillary `OutputRequirementExec` to the the physical plan, so that /// global requirements are not lost during optimization. fn require_top_ordering(plan: Arc) -> Result> { let (new_plan, is_changed) = require_top_ordering_helper(plan)?; if is_changed { Ok(new_plan) } else { - // Add `GlobalRequirementExec` to the top, with no specified ordering and distribution requirement. - Ok(Arc::new(GlobalRequirementExec::new( + // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement. + Ok(Arc::new(OutputRequirementExec::new( new_plan, // there is no ordering requirement None, @@ -230,9 +230,9 @@ fn require_top_ordering(plan: Arc) -> Result, ) -> Result<(Arc, bool)> { @@ -245,13 +245,13 @@ fn require_top_ordering_helper( let req_dist = sort_exec.required_input_distribution()[0].clone(); let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); Ok(( - Arc::new(GlobalRequirementExec::new(plan, Some(reqs), req_dist)) as _, + Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _, true, )) } else if let Some(spm) = plan.as_any().downcast_ref::() { let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr()); Ok(( - Arc::new(GlobalRequirementExec::new( + Arc::new(OutputRequirementExec::new( plan, Some(reqs), Distribution::SinglePartition, From 3b53ec9daedf21a058072ff30c2ddbdb744deeae Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 29 Sep 2023 14:19:46 +0300 Subject: [PATCH 13/13] Add new tests, do not satisfy requirement if not absolutely necessary enforce dist --- .../enforce_distribution.rs | 196 ++++++++++++++---- .../src/physical_optimizer/enforce_sorting.rs | 1 - .../sqllogictest/test_files/explain.slt | 6 +- .../sqllogictest/test_files/groupby.slt | 6 +- datafusion/sqllogictest/test_files/window.slt | 66 ++++++ 5 files changed, 232 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4a7dc10420754..b3fb41ea100fe 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -59,6 +59,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_plan::unbounded_output; +use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -1241,9 +1242,28 @@ fn ensure_distribution( } // Remove unnecessary repartition from the physical plan if any let DistributionContext { - plan, + mut plan, mut distribution_onwards, } = remove_dist_changing_operators(dist_context)?; + + if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(updated_window) = get_best_fitting_window( + exec.window_expr(), + exec.input(), + &exec.partition_keys, + )? { + plan = updated_window; + } + } else if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(updated_window) = get_best_fitting_window( + exec.window_expr(), + exec.input(), + &exec.partition_keys, + )? { + plan = updated_window; + } + }; + let n_children = plan.children().len(); // This loop iterates over all the children to: // - Increase parallelism for every child if it is beneficial. @@ -1331,19 +1351,23 @@ fn ensure_distribution( // Either: // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or // - using order preserving variant is not desirable. - if !ordering_satisfy_requirement_concrete( + let ordering_satisfied = ordering_satisfy_requirement_concrete( existing_ordering, required_input_ordering, || child.equivalence_properties(), || child.ordering_equivalence_properties(), - ) || !order_preserving_variants_desirable - { + ); + if !ordering_satisfied || !order_preserving_variants_desirable { replace_order_preserving_variants(&mut child, dist_onward)?; - let sort_expr = PhysicalSortRequirement::to_sort_exprs( - required_input_ordering.clone(), - ); - // Make sure to satisfy ordering requirement - add_sort_above(&mut child, sort_expr, None)?; + // If ordering requirements were satisfied before repartitioning, + // make sure ordering requirements are still satisfied after. + if ordering_satisfied { + // Make sure to satisfy ordering requirement: + let sort_expr = PhysicalSortRequirement::to_sort_exprs( + required_input_ordering.clone(), + ); + add_sort_above(&mut child, sort_expr, None)?; + } } // Stop tracking distribution changing operators *dist_onward = None; @@ -1696,6 +1720,16 @@ mod tests { let expr = input.output_ordering().unwrap_or(&[]).to_vec(); Self { input, expr } } + + fn new_with_requirement( + input: Arc, + requirement: Vec, + ) -> Self { + Self { + input, + expr: requirement, + } + } } impl DisplayAs for SortRequiredExec { @@ -1747,7 +1781,10 @@ mod tests { ) -> Result> { assert_eq!(children.len(), 1); let child = children.pop().unwrap(); - Ok(Arc::new(Self::new(child))) + Ok(Arc::new(Self::new_with_requirement( + child, + self.expr.clone(), + ))) } fn execute( @@ -2023,6 +2060,13 @@ mod tests { Arc::new(SortRequiredExec::new(input)) } + fn sort_required_exec_with_req( + input: Arc, + sort_exprs: LexOrdering, + ) -> Arc { + Arc::new(SortRequiredExec::new_with_requirement(input, sort_exprs)) + } + fn trim_plan_display(plan: &str) -> Vec<&str> { plan.split('\n') .map(|s| s.trim()) @@ -2961,18 +3005,18 @@ mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs @@ -2987,21 +3031,21 @@ mod tests { _ => vec![ top_join_plan.as_str(), // Below 2 operators are differences introduced, when join mode is changed - "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "SortExec: expr=[a@0 ASC]", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), - "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -3083,38 +3127,38 @@ mod tests { JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), - "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), - "SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", "SortExec: expr=[b1@6 ASC]", + "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), - "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // this match arm cannot be reached @@ -4442,6 +4486,86 @@ mod tests { Ok(()) } + #[test] + fn do_not_put_sort_when_input_is_invalid() -> Result<()> { + let schema = schema(); + let sort_key = vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]; + let input = parquet_exec(); + let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key); + let expected = &[ + // Ordering requirement of sort required exec is NOT satisfied + // by existing ordering at the source. + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_plan_txt!(expected, physical_plan); + + let expected = &[ + "SortRequiredExec: [a@0 ASC]", + // Since at the start of the rule ordering requirement is not satisfied + // EnforceDistribution rule doesn't satisfy this requirement either. + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + let mut config = ConfigOptions::new(); + config.execution.target_partitions = 10; + config.optimizer.enable_round_robin_repartition = true; + config.optimizer.bounded_order_preserving_variants = false; + let distribution_plan = + EnforceDistribution::new().optimize(physical_plan, &config)?; + assert_plan_txt!(expected, distribution_plan); + + Ok(()) + } + + #[test] + fn put_sort_when_input_is_valid() -> Result<()> { + let schema = schema(); + let sort_key = vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]; + let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); + let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key); + + let expected = &[ + // Ordering requirement of sort required exec is satisfied + // by existing ordering at the source. + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + assert_plan_txt!(expected, physical_plan); + + let expected = &[ + "SortRequiredExec: [a@0 ASC]", + // Since at the start of the rule ordering requirement is satisfied + // EnforceDistribution rule satisfy this requirement also. + // ordering is re-satisfied by introduction of SortExec. + "SortExec: expr=[a@0 ASC]", + "FilterExec: c@2 = 0", + // ordering is lost here + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + + let mut config = ConfigOptions::new(); + config.execution.target_partitions = 10; + config.optimizer.enable_round_robin_repartition = true; + config.optimizer.bounded_order_preserving_variants = false; + let distribution_plan = + EnforceDistribution::new().optimize(physical_plan, &config)?; + assert_plan_txt!(expected, distribution_plan); + + Ok(()) + } + #[test] fn do_not_add_unnecessary_hash() -> Result<()> { let schema = schema(); diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index a149330181d98..c4b72a7cb31e5 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2035,7 +2035,6 @@ mod tests { let orig_plan = Arc::new(SortExec::new(sort_exprs, repartition)) as Arc; let actual = get_plan_string(&orig_plan); - println!("{:?}", actual); let expected_input = vec![ "SortExec: expr=[nullable_col@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 303adcc1c7962..27ab8671e939d 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -242,8 +242,8 @@ logical_plan after eliminate_projection SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true -physical_plan after GlobalRequirements -GlobalRequirementExec +physical_plan after OutputRequirements +OutputRequirementExec --CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE @@ -251,7 +251,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE -physical_plan after GlobalRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true +physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index ffef93837b279..5bb0f31ed5423 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2014,9 +2014,9 @@ Sort: l.col0 ASC NULLS LAST ----------TableScan: tab0 projection=[col0, col1] physical_plan SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ---ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] -----AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered -------SortExec: expr=[col0@0 ASC NULLS LAST] +--SortExec: expr=[col0@0 ASC NULLS LAST] +----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] +------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 ------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 3d9f7511be264..b6325fd889ec1 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3197,6 +3197,72 @@ SELECT a_new, d, rn1 FROM (SELECT d, a as a_new, 0 0 4 0 1 5 +query TT +EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1, +SUM(a) OVER(partition by b, a order by c) as sum2, + SUM(a) OVER(partition by a, d order by b) as sum3, + SUM(a) OVER(partition by d order by a) as sum4 +FROM annotated_data_infinite2; +---- +logical_plan +Projection: SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum4 +--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +------------TableScan: annotated_data_infinite2 projection=[a, b, c, d] +physical_plan +ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum4] +--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Linear] +----ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] +----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true + +statement ok +set datafusion.execution.target_partitions = 2; + +# re-execute the same query in multi partitions. +# final plan should still be streamable +query TT +EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1, + SUM(a) OVER(partition by b, a order by c) as sum2, + SUM(a) OVER(partition by a, d order by b) as sum3, + SUM(a) OVER(partition by d order by a) as sum4 +FROM annotated_data_infinite2; +---- +logical_plan +Projection: SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum4 +--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +------------TableScan: annotated_data_infinite2 projection=[a, b, c, d] +physical_plan +ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum4] +--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Linear] +----CoalesceBatchesExec: target_batch_size=4096 +------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2), input_partitions=2 +--------ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------CoalesceBatchesExec: target_batch_size=4096 +--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2), input_partitions=2 +----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] +------------------CoalesceBatchesExec: target_batch_size=4096 +--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, d@3], 2), input_partitions=2 +----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------------------CoalesceBatchesExec: target_batch_size=4096 +--------------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2 +----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true + +# reset the partition number 1 again +statement ok +set datafusion.execution.target_partitions = 1; + statement ok drop table annotated_data_finite2