From eaa8349501c345c3ade05782849cdaeb3679ccc1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 12 Jan 2023 11:37:41 +0100 Subject: [PATCH 1/5] Do not repartition inputs whose sort order is required fix case where repartitioning destroys output order add tests coverage --- .../src/physical_optimizer/repartition.rs | 301 +++++++++++++++++- datafusion/core/src/physical_plan/mod.rs | 7 +- datafusion/core/tests/sql/explain_analyze.rs | 3 +- 3 files changed, 291 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 806ed1d4b45ec..81f6b2edac13f 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -129,32 +129,47 @@ impl Repartition { } } -/// Recursively visits all `plan`s puts and then optionally adds a -/// `RepartitionExec` at the output of `plan` to match -/// `target_partitions` in an attempt to increase the overall parallelism. +/// Recursively attempts to increase the overall parallelism of the +/// plan, while respecting ordering, by adding a `RepartitionExec` at +/// the output of `plan` if it would help parallelism and not destroy +/// any possibly useful ordering. /// -/// It does so using depth first scan of the tree, and repartitions +/// It does so using a depth first scan of the tree, and repartitions /// any plan that: /// /// 1. Has fewer partitions than `target_partitions` /// /// 2. Has a direct parent that `benefits_from_input_partitioning` /// -/// if `can_reorder` is false, means that the output of this node -/// can not be reordered as as the final output is relying on that order +/// 3. Does not destroy any existing sort order if the parent is +/// relying on it. /// -/// If 'would_benefit` is false, the upstream operator doesn't -/// benefit from additional repartition +/// if `can_reorder` is false, it means the parent node of `plan` is +/// trying to take advantage of the output sort order of plan, so it +/// should not be repartitioned if doing so would destroy the output +/// sort order. /// +/// (Parent) - If can_reorder is false, means this parent node is +/// trying to use the sort ouder order this plan. If true +/// means parent doesn't care about sort order +/// +/// (plan) - We are deciding to add a partition above here +/// +/// (children) - Recursively visit all children first +/// +/// If 'would_benefit` is true, the upstream operator would benefit +/// from additional partitions and thus repatitioning is considered. +/// +/// if `is_root` is true, no repartition is added. fn optimize_partitions( target_partitions: usize, plan: Arc, + is_root: bool, can_reorder: bool, would_benefit: bool, ) -> Result> { // Recurse into children bottom-up (attempt to repartition as // early as possible) - let new_plan = if plan.children().is_empty() { // leaf node - don't replace children plan @@ -163,10 +178,34 @@ fn optimize_partitions( .children() .iter() .map(|child| { + // does plan itelf (not parent) require its input to + // be sorted in some way? + let required_input_ordering = + plan_has_required_input_ordering(plan.as_ref()); + + let can_reorder_child = if can_reorder { + // parent of `plan` will not use any particular order + + // if `plan` itself doesn't need order OR + !required_input_ordering || + // child has no order to preserve + child.output_ordering().is_none() + } else { + // parent would like to use the `plan`'s output + // order. + + // if `plan` doesn't maintain the input order and + // doesn't need the child's output order itself + (!plan.maintains_input_order() && !required_input_ordering) || + // child has no ordering to preserve + child.output_ordering().is_none() + }; + optimize_partitions( target_partitions, child.clone(), - can_reorder || child.output_ordering().is_none(), + false, // child is not root + can_reorder_child, plan.benefits_from_input_partitioning(), ) }) @@ -191,6 +230,11 @@ fn optimize_partitions( && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); } + // don't reparititon root of the plan + if is_root { + could_repartition = false; + } + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, @@ -201,6 +245,14 @@ fn optimize_partitions( } } +/// Returns true if `plan` requires any of inputs to be sorted in some +/// way for correctness. If this is true, its output should not be +/// repartitioned if it would destroy the required order. +fn plan_has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool { + // NB: checking `is_empty()` is not the right check! + plan.required_input_ordering().iter().any(Option::is_some) +} + impl PhysicalOptimizerRule for Repartition { fn optimize( &self, @@ -213,11 +265,15 @@ impl PhysicalOptimizerRule for Repartition { if !enabled || target_partitions == 1 { Ok(plan) } else { + let is_root = true; + let can_reorder = plan.output_ordering().is_none(); + let would_benefit = false; optimize_partitions( target_partitions, plan.clone(), - plan.output_ordering().is_none(), - false, + is_root, + can_reorder, + would_benefit, ) } } @@ -230,6 +286,13 @@ impl PhysicalOptimizerRule for Repartition { true } } + +#[cfg(test)] +#[ctor::ctor] +fn init() { + let _ = env_logger::try_init(); +} + #[cfg(test)] mod tests { use arrow::compute::SortOptions; @@ -251,12 +314,13 @@ mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; - use crate::physical_plan::{displayable, Statistics}; + use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; fn schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) } + /// Create a non sorted parquet exec fn parquet_exec() -> Arc { Arc::new(ParquetExec::new( FileScanConfig { @@ -275,6 +339,30 @@ mod tests { )) } + // Created a sorted parquet exec + fn parquet_exec_sorted() -> Arc { + let sort_exprs = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: SortOptions::default(), + }]; + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + fn sort_preserving_merge_exec( input: Arc, ) -> Arc { @@ -350,6 +438,14 @@ mod tests { )) } + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } + + fn sort_required_exec(input: Arc) -> Arc { + Arc::new(SortRequiredExec::new(input)) + } + fn trim_plan_display(plan: &str) -> Vec<&str> { plan.split('\n') .map(|s| s.trim()) @@ -550,8 +646,7 @@ mod tests { #[test] fn repartition_ignores_union() -> Result<()> { - let plan: Arc = - Arc::new(UnionExec::new(vec![parquet_exec(); 5])); + let plan = union_exec(vec![parquet_exec(); 5]); let expected = &[ "UnionExec", @@ -568,9 +663,11 @@ mod tests { } #[test] - fn repartition_ignores_sort_preserving_merge() -> Result<()> { + fn repartition_through_sort_preserving_merge() -> Result<()> { + // sort preserving merge with non-sorted input let plan = sort_preserving_merge_exec(parquet_exec()); + // need repartiton and resort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: [c1@0 ASC]", @@ -583,9 +680,94 @@ mod tests { } #[test] - fn repartition_does_not_repartition_transitively() -> Result<()> { + fn repartition_ignores_sort_preserving_merge() -> Result<()> { + // sort preserving merge already sorted input, + let plan = sort_preserving_merge_exec(parquet_exec_sorted()); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { + // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) + let input = union_exec(vec![parquet_exec_sorted(); 2]); + let plan = sort_preserving_merge_exec(input); + + // should not repartition / sort (as the data was already sorted) + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "UnionExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort() -> Result<()> { + // SortRequired + // Parquet(sorted) + + let plan = sort_required_exec(parquet_exec_sorted()); + + // should not repartition as doing so destroys the necessary sort order + let expected = &[ + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { + // model a more complicated scenario where one child of a union can be repartitioned for performance + // but the other can not be + // + // Union + // SortRequired + // Parquet(sorted) + // Filter + // Parquet(unsorted) + + let input1 = sort_required_exec(parquet_exec_sorted()); + let input2 = filter_exec(parquet_exec()); + let plan = union_exec(vec![input1, input2]); + + // should not repartition below the SortRequired as that + // destroys the sort order but should still repartition for + // FilterExec + let expected = &[ + "UnionExec", + // union input 1: no repartitioning + "SortRequiredExec", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + // union input 2: should repartition + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + + #[test] + fn repartition_transitively_with_projection() -> Result<()> { + // non sorted input let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); + // needs to repartition / sort as the data was not sorted correctly let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: [c1@0 ASC]", @@ -598,6 +780,22 @@ mod tests { Ok(()) } + #[test] + fn repartition_ignores_transitively_with_projection() -> Result<()> { + // sorted input + let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted())); + + // data should not be repartitioned / resorted + let expected = &[ + "SortPreservingMergeExec: [c1@0 ASC]", + "ProjectionExec: expr=[c1@0 as c1]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + ]; + + assert_optimized!(expected, plan); + Ok(()) + } + #[test] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let plan = @@ -655,4 +853,73 @@ mod tests { assert_optimized!(expected, plan); Ok(()) } + + /// Models operators like BoundedWindowExec that require an input + /// ordering but is easy to construct + #[derive(Debug)] + struct SortRequiredExec { + input: Arc, + } + + impl SortRequiredExec { + fn new(input: Arc) -> Self { + Self { input } + } + } + + impl ExecutionPlan for SortRequiredExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + // model that it requires the output ordering of its input + fn required_input_ordering(&self) -> Vec> { + vec![self.input.output_ordering()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + let child = children.pop().unwrap(); + Ok(Arc::new(Self::new(child))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } + + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "SortRequiredExec") + } + } } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index b9a0d9707ee14..0417814f3c80a 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -136,7 +136,12 @@ pub trait ExecutionPlan: Debug + Send + Sync { } /// Specifies the ordering requirements for all of the children - /// For each child, it's the local ordering requirement within each partition rather than the global ordering + /// For each child, it's the local ordering requirement within + /// each partition rather than the global ordering + /// + /// NOTE that checking `!is_empty()` does **not** check for a + /// required input ordering. Instead, the correct check is that at + /// least one entry must be `Some` fn required_input_ordering(&self) -> Vec> { vec![None; self.children().len()] } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 0cc65abf08502..1e4f081f47899 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -86,11 +86,10 @@ async fn explain_analyze_baseline_metrics() { "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); - // The number of output rows becomes less after changing the global sort to the local sort with limit push down assert_metrics!( &formatted, "CoalescePartitionsExec", - "metrics=[output_rows=3, elapsed_compute=" + "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( &formatted, From c5e3c8d809669f1ce8b79d6aa0927e306e4964a0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 12:12:54 +0100 Subject: [PATCH 2/5] Do not repartition inputs whose sort order is required (#4885) fix case where repartitioning destroys output order add tests coverage From 91ac6b2fa110c5886398a16a4a1c3f4ad35cb8f6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 13:14:09 +0100 Subject: [PATCH 3/5] Reduce redundancy in sort_enforcement tests (#4928) --- .../physical_optimizer/sort_enforcement.rs | 453 ++++++++---------- 1 file changed, 200 insertions(+), 253 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 52463b4bdc09f..2d785e920a269 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -589,71 +589,84 @@ mod tests { Ok(()) } + /// Runs the sort enforcement optimizer and asserts the plan + /// against the original and expected plans + /// + /// `$EXPECTED_PLAN_LINES`: input plan + /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan + /// `$PLAN`: the plan to optimized + /// + macro_rules! assert_optimized { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + let physical_plan = $PLAN; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES + .iter().map(|s| *s).collect(); + + assert_eq!( + expected_plan_lines, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES + .iter().map(|s| *s).collect(); + + // Run the actual optimizer + let optimized_physical_plan = + EnforceSorting::new().optimize(physical_plan, state.config_options())?; + + let formatted = displayable(optimized_physical_plan.as_ref()) + .indent() + .to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected_optimized_lines, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + }; + } + #[tokio::test] async fn test_remove_unnecessary_sort() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs, source, None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let physical_plan = Arc::new(SortExec::try_new(sort_exprs, sort_exec, None)?) - as Arc; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortExec: [nullable_col@0 ASC]", - " SortExec: [non_nullable_col@1 ASC]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { vec!["SortExec: [nullable_col@0 ASC]"] }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); + let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], input); + + let expected_input = vec![ + "SortExec: [nullable_col@0 ASC]", + " SortExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", source.schema().as_ref()).unwrap(), - options: SortOptions { + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &source.schema(), + SortOptions { descending: true, nulls_first: true, }, - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs.clone(), source, None)?) - as Arc; + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let window_agg_exec = Arc::new(WindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), @@ -664,32 +677,33 @@ mod tests { Arc::new(WindowFrame::new(true)), schema.as_ref(), )?], - sort_exec.clone(), - sort_exec.schema(), + sort.clone(), + sort.schema(), vec![], Some(sort_exprs), )?) as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("non_nullable_col", window_agg_exec.schema().as_ref()).unwrap(), - options: SortOptions { + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &window_agg_exec.schema(), + SortOptions { descending: false, nulls_first: false, }, - }]; - let sort_exec = Arc::new(SortExec::try_new( - sort_exprs.clone(), - window_agg_exec, - None, - )?) as Arc; + )]; + + let sort = sort_exec(sort_exprs.clone(), window_agg_exec); + // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before - let filter_exec = Arc::new(FilterExec::try_new( + let filter = filter_exec( Arc::new(NotExpr::new( col("non_nullable_col", schema.as_ref()).unwrap(), )), - sort_exec, - )?) as Arc; + sort, + ); + // let filter_exec = sort_exec; - let window_agg_exec = Arc::new(WindowAggExec::try_new( + let physical_plan = Arc::new(WindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), "count".to_owned(), @@ -699,214 +713,147 @@ mod tests { Arc::new(WindowFrame::new(true)), schema.as_ref(), )?], - filter_exec.clone(), - filter_exec.schema(), + filter.clone(), + filter.schema(), vec![], Some(sort_exprs), )?) as Arc; - let physical_plan = window_agg_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " FilterExec: NOT non_nullable_col@1", - " SortExec: [non_nullable_col@1 ASC NULLS LAST]", - " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " SortExec: [non_nullable_col@1 DESC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", - " FilterExec: NOT non_nullable_col@1", - " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " SortExec: [non_nullable_col@1 DESC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + + let expected_input = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " FilterExec: NOT non_nullable_col@1", + " SortExec: [non_nullable_col@1 ASC NULLS LAST]", + " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortExec: [non_nullable_col@1 DESC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + + let expected_optimized = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", + " FilterExec: NOT non_nullable_col@1", + " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortExec: [non_nullable_col@1 DESC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_add_required_sort() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let physical_plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, source)) - as Arc; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { vec!["SortPreservingMergeExec: [nullable_col@0 ASC]"] }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - let actual_len = actual.len(); - let actual_trim_last = &actual[..actual_len - 1]; - assert_eq!( - expected, actual_trim_last, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + + let physical_plan = sort_preserving_merge_exec(sort_exprs, source); + + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_remove_unnecessary_sort1() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new(sort_exprs.clone(), source, None)?) - as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let sort_exprs = vec![PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }]; - let sort_exec = Arc::new(SortExec::try_new( - sort_exprs.clone(), - sort_preserving_merge_exec, - None, - )?) as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let physical_plan = sort_preserving_merge_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), spm); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } #[tokio::test] async fn test_change_wrong_sorting() -> Result<()> { - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); let schema = create_test_schema()?; - let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) - as Arc; + let source = memory_exec(&schema); let sort_exprs = vec![ - PhysicalSortExpr { - expr: col("nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: col("non_nullable_col", schema.as_ref()).unwrap(), - options: SortOptions::default(), - }, + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), ]; - let sort_exec = Arc::new(SortExec::try_new( - vec![sort_exprs[0].clone()], - source, - None, - )?) as Arc; - let sort_preserving_merge_exec = - Arc::new(SortPreservingMergeExec::new(sort_exprs, sort_exec)) - as Arc; - let physical_plan = sort_preserving_merge_exec; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, state.config_options())?; - let formatted = displayable(optimized_physical_plan.as_ref()) - .indent() - .to_string(); - let expected = { - vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", - ] - }; - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); + let sort = sort_exec(vec![sort_exprs[0].clone()], source); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) } + + /// make PhysicalSortExpr with default options + fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { + sort_expr_options(name, schema, SortOptions::default()) + } + + /// PhysicalSortExpr with specified options + fn sort_expr_options( + name: &str, + schema: &Schema, + options: SortOptions, + ) -> PhysicalSortExpr { + PhysicalSortExpr { + expr: col(name, schema).unwrap(), + options, + } + } + + fn memory_exec(schema: &SchemaRef) -> Arc { + Arc::new(MemoryExec::try_new(&[], schema.clone(), None).unwrap()) + } + + fn sort_exec( + sort_exprs: impl IntoIterator, + input: Arc, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap()) + } + + fn sort_preserving_merge_exec( + sort_exprs: impl IntoIterator, + input: Arc, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortPreservingMergeExec::new(sort_exprs, input)) + } + + fn filter_exec( + predicate: Arc, + input: Arc, + ) -> Arc { + Arc::new(FilterExec::try_new(predicate, input).unwrap()) + } } From d06a9226caea1476371df78ed1af9d29a13072e0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 08:53:12 -0500 Subject: [PATCH 4/5] Do not resort inputs to Union if they are already sorted --- .../physical_optimizer/sort_enforcement.rs | 125 +++++++++++++++++- datafusion/core/src/physical_plan/union.rs | 55 +++++++- 2 files changed, 172 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 2d785e920a269..11769a1f9937e 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -38,7 +38,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::{displayable, with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::window::WindowExpr; @@ -47,6 +47,8 @@ use itertools::izip; use std::iter::zip; use std::sync::Arc; +use log::trace; + /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] @@ -70,6 +72,24 @@ struct PlanWithCorrespondingSort { sort_onwards: Vec)>>, } +fn trace_sort_onwards(title: &str, requirements: &PlanWithCorrespondingSort) { + trace!( + "{} {}", + title, + displayable(requirements.plan.as_ref()).indent() + ); + trace!("requirements:"); + for (idx, children) in requirements.sort_onwards.iter().enumerate() { + trace!("children[{idx}]"); + for (idx2, (child_idx, child)) in children.iter().enumerate() { + trace!( + " child[{idx2}]: ({child_idx}, {})", + displayable(child.as_ref()).one_line() + ); + } + } +} + impl PlanWithCorrespondingSort { pub fn new(plan: Arc) -> Self { let length = plan.children().len(); @@ -147,6 +167,8 @@ impl PhysicalOptimizerRule for EnforceSorting { fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { + trace_sort_onwards("AAL ensuring sort", &requirements); + // Perform naive analysis at the beginning -- remove already-satisfied sorts: if let Some(result) = analyze_immediate_sort_removal(&requirements)? { return Ok(Some(result)); @@ -169,6 +191,13 @@ fn ensure_sorting( required_ordering, || child.equivalence_properties(), ); + trace!("is_ordering_satisfied: {is_ordering_satisfied}"); + trace!("physical_ordering: {physical_ordering:?}"); + trace!("required_ordering: {required_ordering:?}"); + trace!( + "child.equivalence_properties: {:?}", + child.equivalence_properties() + ); if !is_ordering_satisfied { // Make sure we preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards)?; @@ -223,14 +252,17 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a SortExec to the plan. + trace!("Ordering requirement is not met, adding SortExec to the plan"); let sort_expr = required.to_vec(); *child = add_sort_above_child(child, sort_expr)?; *sort_onwards = vec![(idx, child.clone())]; } (None, Some(_)) => { + trace!("SortExec effect is netralized"); // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: if !requirements.plan.maintains_input_order() { + trace!("SortExec effect is netralized and the plan does not maintain input order, removing sort"); update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -273,12 +305,23 @@ fn analyze_immediate_sort_removal( requirements: &PlanWithCorrespondingSort, ) -> Result> { if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::() { + trace_sort_onwards("AAL analyze_immediate_sort_removal", &requirements); // If this sort is unnecessary, we should remove it: + trace!("AAL analyze_immediate_sort_removal: sort_exec.input().output_ordering(): {:?}", + sort_exec.input().output_ordering()); + trace!( + "AAL analyze_immediate_sort_removal: sort_exec.output_ordering(): {:?}", + sort_exec.output_ordering() + ); + trace!("AAL analyze_immediate_sort_removal: sort_exec.input().equivalence_properties(): {:?}", + sort_exec.input().equivalence_properties()); + if ordering_satisfy( sort_exec.input().output_ordering(), sort_exec.output_ordering(), || sort_exec.input().equivalence_properties(), ) { + trace!("AAL analyze_immediate_sort_removal orderng was already satisfied, removing sort"); // Since we know that a `SortExec` has exactly one child, // we can use the zero index safely: let mut new_onwards = requirements.sort_onwards[0].to_vec(); @@ -486,15 +529,19 @@ fn check_alignment( #[cfg(test)] mod tests { use super::*; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::displayable; + use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; use crate::prelude::SessionContext; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; + use datafusion_common::{Result, Statistics}; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::{col, NotExpr}; use datafusion_physical_expr::PhysicalSortExpr; @@ -813,6 +860,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let source2 = parquet_exec_sorted(&schema, sort_exprs.clone()); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) @@ -856,4 +930,51 @@ mod tests { ) -> Arc { Arc::new(FilterExec::try_new(predicate, input).unwrap()) } + + /// Create a non sorted parquet exec + fn parquet_exec(schema: &SchemaRef) -> Arc { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + None, + None, + )) + } + + // Created a sorted parquet exec + fn parquet_exec_sorted( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 921a0d99f03ef..3b5ca6b74c717 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -34,6 +34,7 @@ use datafusion_common::{DFSchemaRef, DataFusionError}; use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; +use log::trace; use log::warn; use super::{ @@ -43,6 +44,7 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; +use crate::physical_plan::displayable; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, @@ -227,24 +229,65 @@ impl ExecutionPlan for UnionExec { // It might be too strict here in the case that the input ordering are compatible but not exactly the same. // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). - if !self.partition_aware + trace!("{}", displayable(self).indent()); + let result = if !self.partition_aware && first_input_ordering.is_some() && self .inputs .iter() + .inspect(|plan| { + trace!( + " considering input {}", + displayable(plan.as_ref()).one_line() + ) + }) .map(|plan| plan.output_ordering()) .all(|ordering| { - ordering.is_some() + trace!(" ordering {ordering:?}"); + + let strict_equal = ordering.is_some() && sort_expr_list_eq_strict_order( ordering.unwrap(), first_input_ordering.unwrap(), - ) - }) - { + ); + trace!(" strict_equal {strict_equal:?}"); + + ordering.is_some() && strict_equal + }) { first_input_ordering } else { None - } + }; + + trace!("self.partition_aware: {}", self.partition_aware); + trace!("first_input_ordering: {:?}", first_input_ordering); + trace!("output ordering: {:?}", result); + + result + } + + fn maintains_input_order(&self) -> bool { + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input + // ordering spec strictly equal with the first_input_ordering, + // then the `UnionExec` maintains the input order + // + // It might be too strict here in the case that the input + // ordering are compatible but not exactly the same. See + // comments in output_ordering + !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) } fn with_new_children( From c21d391bba9f795d93c5bdc787b9014c41305dfb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 09:06:22 -0500 Subject: [PATCH 5/5] Remove debugging --- .../physical_optimizer/sort_enforcement.rs | 45 +------------------ datafusion/core/src/physical_plan/union.rs | 31 +++---------- 2 files changed, 7 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 11769a1f9937e..703a13a1cb1d5 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -38,7 +38,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{displayable, with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::window::WindowExpr; @@ -47,8 +47,6 @@ use itertools::izip; use std::iter::zip; use std::sync::Arc; -use log::trace; - /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] @@ -72,24 +70,6 @@ struct PlanWithCorrespondingSort { sort_onwards: Vec)>>, } -fn trace_sort_onwards(title: &str, requirements: &PlanWithCorrespondingSort) { - trace!( - "{} {}", - title, - displayable(requirements.plan.as_ref()).indent() - ); - trace!("requirements:"); - for (idx, children) in requirements.sort_onwards.iter().enumerate() { - trace!("children[{idx}]"); - for (idx2, (child_idx, child)) in children.iter().enumerate() { - trace!( - " child[{idx2}]: ({child_idx}, {})", - displayable(child.as_ref()).one_line() - ); - } - } -} - impl PlanWithCorrespondingSort { pub fn new(plan: Arc) -> Self { let length = plan.children().len(); @@ -167,8 +147,6 @@ impl PhysicalOptimizerRule for EnforceSorting { fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { - trace_sort_onwards("AAL ensuring sort", &requirements); - // Perform naive analysis at the beginning -- remove already-satisfied sorts: if let Some(result) = analyze_immediate_sort_removal(&requirements)? { return Ok(Some(result)); @@ -191,13 +169,6 @@ fn ensure_sorting( required_ordering, || child.equivalence_properties(), ); - trace!("is_ordering_satisfied: {is_ordering_satisfied}"); - trace!("physical_ordering: {physical_ordering:?}"); - trace!("required_ordering: {required_ordering:?}"); - trace!( - "child.equivalence_properties: {:?}", - child.equivalence_properties() - ); if !is_ordering_satisfied { // Make sure we preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards)?; @@ -252,17 +223,14 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a SortExec to the plan. - trace!("Ordering requirement is not met, adding SortExec to the plan"); let sort_expr = required.to_vec(); *child = add_sort_above_child(child, sort_expr)?; *sort_onwards = vec![(idx, child.clone())]; } (None, Some(_)) => { - trace!("SortExec effect is netralized"); // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: if !requirements.plan.maintains_input_order() { - trace!("SortExec effect is netralized and the plan does not maintain input order, removing sort"); update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -305,23 +273,12 @@ fn analyze_immediate_sort_removal( requirements: &PlanWithCorrespondingSort, ) -> Result> { if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::() { - trace_sort_onwards("AAL analyze_immediate_sort_removal", &requirements); // If this sort is unnecessary, we should remove it: - trace!("AAL analyze_immediate_sort_removal: sort_exec.input().output_ordering(): {:?}", - sort_exec.input().output_ordering()); - trace!( - "AAL analyze_immediate_sort_removal: sort_exec.output_ordering(): {:?}", - sort_exec.output_ordering() - ); - trace!("AAL analyze_immediate_sort_removal: sort_exec.input().equivalence_properties(): {:?}", - sort_exec.input().equivalence_properties()); - if ordering_satisfy( sort_exec.input().output_ordering(), sort_exec.output_ordering(), || sort_exec.input().equivalence_properties(), ) { - trace!("AAL analyze_immediate_sort_removal orderng was already satisfied, removing sort"); // Since we know that a `SortExec` has exactly one child, // we can use the zero index safely: let mut new_onwards = requirements.sort_onwards[0].to_vec(); diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 3b5ca6b74c717..a0fca80661158 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -34,7 +34,6 @@ use datafusion_common::{DFSchemaRef, DataFusionError}; use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; -use log::trace; use log::warn; use super::{ @@ -44,7 +43,6 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; -use crate::physical_plan::displayable; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, @@ -229,41 +227,24 @@ impl ExecutionPlan for UnionExec { // It might be too strict here in the case that the input ordering are compatible but not exactly the same. // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). - trace!("{}", displayable(self).indent()); - let result = if !self.partition_aware + if !self.partition_aware && first_input_ordering.is_some() && self .inputs .iter() - .inspect(|plan| { - trace!( - " considering input {}", - displayable(plan.as_ref()).one_line() - ) - }) .map(|plan| plan.output_ordering()) .all(|ordering| { - trace!(" ordering {ordering:?}"); - - let strict_equal = ordering.is_some() + ordering.is_some() && sort_expr_list_eq_strict_order( ordering.unwrap(), first_input_ordering.unwrap(), - ); - trace!(" strict_equal {strict_equal:?}"); - - ordering.is_some() && strict_equal - }) { + ) + }) + { first_input_ordering } else { None - }; - - trace!("self.partition_aware: {}", self.partition_aware); - trace!("first_input_ordering: {:?}", first_input_ordering); - trace!("output ordering: {:?}", result); - - result + } } fn maintains_input_order(&self) -> bool {