From c9bf249b2e80ea056c1f2d1509660f2d8d10b3c4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 08:53:12 -0500 Subject: [PATCH 1/2] Do not resort inputs to Union if they are already sorted --- .../physical_optimizer/sort_enforcement.rs | 125 +++++++++++++++++- datafusion/core/src/physical_plan/union.rs | 55 +++++++- 2 files changed, 172 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 2d785e920a269..11769a1f9937e 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -38,7 +38,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::{displayable, with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::window::WindowExpr; @@ -47,6 +47,8 @@ use itertools::izip; use std::iter::zip; use std::sync::Arc; +use log::trace; + /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] @@ -70,6 +72,24 @@ struct PlanWithCorrespondingSort { sort_onwards: Vec)>>, } +fn trace_sort_onwards(title: &str, requirements: &PlanWithCorrespondingSort) { + trace!( + "{} {}", + title, + displayable(requirements.plan.as_ref()).indent() + ); + trace!("requirements:"); + for (idx, children) in requirements.sort_onwards.iter().enumerate() { + trace!("children[{idx}]"); + for (idx2, (child_idx, child)) in children.iter().enumerate() { + trace!( + " child[{idx2}]: ({child_idx}, {})", + displayable(child.as_ref()).one_line() + ); + } + } +} + impl PlanWithCorrespondingSort { pub fn new(plan: Arc) -> Self { let length = plan.children().len(); @@ -147,6 +167,8 @@ impl PhysicalOptimizerRule for EnforceSorting { fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { + trace_sort_onwards("AAL ensuring sort", &requirements); + // Perform naive analysis at the beginning -- remove already-satisfied sorts: if let Some(result) = analyze_immediate_sort_removal(&requirements)? { return Ok(Some(result)); @@ -169,6 +191,13 @@ fn ensure_sorting( required_ordering, || child.equivalence_properties(), ); + trace!("is_ordering_satisfied: {is_ordering_satisfied}"); + trace!("physical_ordering: {physical_ordering:?}"); + trace!("required_ordering: {required_ordering:?}"); + trace!( + "child.equivalence_properties: {:?}", + child.equivalence_properties() + ); if !is_ordering_satisfied { // Make sure we preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards)?; @@ -223,14 +252,17 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a SortExec to the plan. + trace!("Ordering requirement is not met, adding SortExec to the plan"); let sort_expr = required.to_vec(); *child = add_sort_above_child(child, sort_expr)?; *sort_onwards = vec![(idx, child.clone())]; } (None, Some(_)) => { + trace!("SortExec effect is netralized"); // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: if !requirements.plan.maintains_input_order() { + trace!("SortExec effect is netralized and the plan does not maintain input order, removing sort"); update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -273,12 +305,23 @@ fn analyze_immediate_sort_removal( requirements: &PlanWithCorrespondingSort, ) -> Result> { if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::() { + trace_sort_onwards("AAL analyze_immediate_sort_removal", &requirements); // If this sort is unnecessary, we should remove it: + trace!("AAL analyze_immediate_sort_removal: sort_exec.input().output_ordering(): {:?}", + sort_exec.input().output_ordering()); + trace!( + "AAL analyze_immediate_sort_removal: sort_exec.output_ordering(): {:?}", + sort_exec.output_ordering() + ); + trace!("AAL analyze_immediate_sort_removal: sort_exec.input().equivalence_properties(): {:?}", + sort_exec.input().equivalence_properties()); + if ordering_satisfy( sort_exec.input().output_ordering(), sort_exec.output_ordering(), || sort_exec.input().equivalence_properties(), ) { + trace!("AAL analyze_immediate_sort_removal orderng was already satisfied, removing sort"); // Since we know that a `SortExec` has exactly one child, // we can use the zero index safely: let mut new_onwards = requirements.sort_onwards[0].to_vec(); @@ -486,15 +529,19 @@ fn check_alignment( #[cfg(test)] mod tests { use super::*; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::displayable; + use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; use crate::prelude::SessionContext; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; + use datafusion_common::{Result, Statistics}; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::{col, NotExpr}; use datafusion_physical_expr::PhysicalSortExpr; @@ -813,6 +860,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let source2 = parquet_exec_sorted(&schema, sort_exprs.clone()); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) @@ -856,4 +930,51 @@ mod tests { ) -> Arc { Arc::new(FilterExec::try_new(predicate, input).unwrap()) } + + /// Create a non sorted parquet exec + fn parquet_exec(schema: &SchemaRef) -> Arc { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + None, + None, + )) + } + + // Created a sorted parquet exec + fn parquet_exec_sorted( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 921a0d99f03ef..3b5ca6b74c717 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -34,6 +34,7 @@ use datafusion_common::{DFSchemaRef, DataFusionError}; use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; +use log::trace; use log::warn; use super::{ @@ -43,6 +44,7 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; +use crate::physical_plan::displayable; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, @@ -227,24 +229,65 @@ impl ExecutionPlan for UnionExec { // It might be too strict here in the case that the input ordering are compatible but not exactly the same. // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). - if !self.partition_aware + trace!("{}", displayable(self).indent()); + let result = if !self.partition_aware && first_input_ordering.is_some() && self .inputs .iter() + .inspect(|plan| { + trace!( + " considering input {}", + displayable(plan.as_ref()).one_line() + ) + }) .map(|plan| plan.output_ordering()) .all(|ordering| { - ordering.is_some() + trace!(" ordering {ordering:?}"); + + let strict_equal = ordering.is_some() && sort_expr_list_eq_strict_order( ordering.unwrap(), first_input_ordering.unwrap(), - ) - }) - { + ); + trace!(" strict_equal {strict_equal:?}"); + + ordering.is_some() && strict_equal + }) { first_input_ordering } else { None - } + }; + + trace!("self.partition_aware: {}", self.partition_aware); + trace!("first_input_ordering: {:?}", first_input_ordering); + trace!("output ordering: {:?}", result); + + result + } + + fn maintains_input_order(&self) -> bool { + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input + // ordering spec strictly equal with the first_input_ordering, + // then the `UnionExec` maintains the input order + // + // It might be too strict here in the case that the input + // ordering are compatible but not exactly the same. See + // comments in output_ordering + !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) } fn with_new_children( From 28df02640a7006c6bedd60c26dd11931f3265d63 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Jan 2023 09:06:22 -0500 Subject: [PATCH 2/2] Remove debugging --- .../physical_optimizer/sort_enforcement.rs | 45 +------------------ datafusion/core/src/physical_plan/union.rs | 31 +++---------- 2 files changed, 7 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 11769a1f9937e..703a13a1cb1d5 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -38,7 +38,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{displayable, with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::window::WindowExpr; @@ -47,8 +47,6 @@ use itertools::izip; use std::iter::zip; use std::sync::Arc; -use log::trace; - /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] @@ -72,24 +70,6 @@ struct PlanWithCorrespondingSort { sort_onwards: Vec)>>, } -fn trace_sort_onwards(title: &str, requirements: &PlanWithCorrespondingSort) { - trace!( - "{} {}", - title, - displayable(requirements.plan.as_ref()).indent() - ); - trace!("requirements:"); - for (idx, children) in requirements.sort_onwards.iter().enumerate() { - trace!("children[{idx}]"); - for (idx2, (child_idx, child)) in children.iter().enumerate() { - trace!( - " child[{idx2}]: ({child_idx}, {})", - displayable(child.as_ref()).one_line() - ); - } - } -} - impl PlanWithCorrespondingSort { pub fn new(plan: Arc) -> Self { let length = plan.children().len(); @@ -167,8 +147,6 @@ impl PhysicalOptimizerRule for EnforceSorting { fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { - trace_sort_onwards("AAL ensuring sort", &requirements); - // Perform naive analysis at the beginning -- remove already-satisfied sorts: if let Some(result) = analyze_immediate_sort_removal(&requirements)? { return Ok(Some(result)); @@ -191,13 +169,6 @@ fn ensure_sorting( required_ordering, || child.equivalence_properties(), ); - trace!("is_ordering_satisfied: {is_ordering_satisfied}"); - trace!("physical_ordering: {physical_ordering:?}"); - trace!("required_ordering: {required_ordering:?}"); - trace!( - "child.equivalence_properties: {:?}", - child.equivalence_properties() - ); if !is_ordering_satisfied { // Make sure we preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards)?; @@ -252,17 +223,14 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a SortExec to the plan. - trace!("Ordering requirement is not met, adding SortExec to the plan"); let sort_expr = required.to_vec(); *child = add_sort_above_child(child, sort_expr)?; *sort_onwards = vec![(idx, child.clone())]; } (None, Some(_)) => { - trace!("SortExec effect is netralized"); // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: if !requirements.plan.maintains_input_order() { - trace!("SortExec effect is netralized and the plan does not maintain input order, removing sort"); update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -305,23 +273,12 @@ fn analyze_immediate_sort_removal( requirements: &PlanWithCorrespondingSort, ) -> Result> { if let Some(sort_exec) = requirements.plan.as_any().downcast_ref::() { - trace_sort_onwards("AAL analyze_immediate_sort_removal", &requirements); // If this sort is unnecessary, we should remove it: - trace!("AAL analyze_immediate_sort_removal: sort_exec.input().output_ordering(): {:?}", - sort_exec.input().output_ordering()); - trace!( - "AAL analyze_immediate_sort_removal: sort_exec.output_ordering(): {:?}", - sort_exec.output_ordering() - ); - trace!("AAL analyze_immediate_sort_removal: sort_exec.input().equivalence_properties(): {:?}", - sort_exec.input().equivalence_properties()); - if ordering_satisfy( sort_exec.input().output_ordering(), sort_exec.output_ordering(), || sort_exec.input().equivalence_properties(), ) { - trace!("AAL analyze_immediate_sort_removal orderng was already satisfied, removing sort"); // Since we know that a `SortExec` has exactly one child, // we can use the zero index safely: let mut new_onwards = requirements.sort_onwards[0].to_vec(); diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 3b5ca6b74c717..a0fca80661158 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -34,7 +34,6 @@ use datafusion_common::{DFSchemaRef, DataFusionError}; use futures::{Stream, StreamExt}; use itertools::Itertools; use log::debug; -use log::trace; use log::warn; use super::{ @@ -44,7 +43,6 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; -use crate::physical_plan::displayable; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, @@ -229,41 +227,24 @@ impl ExecutionPlan for UnionExec { // It might be too strict here in the case that the input ordering are compatible but not exactly the same. // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). - trace!("{}", displayable(self).indent()); - let result = if !self.partition_aware + if !self.partition_aware && first_input_ordering.is_some() && self .inputs .iter() - .inspect(|plan| { - trace!( - " considering input {}", - displayable(plan.as_ref()).one_line() - ) - }) .map(|plan| plan.output_ordering()) .all(|ordering| { - trace!(" ordering {ordering:?}"); - - let strict_equal = ordering.is_some() + ordering.is_some() && sort_expr_list_eq_strict_order( ordering.unwrap(), first_input_ordering.unwrap(), - ); - trace!(" strict_equal {strict_equal:?}"); - - ordering.is_some() && strict_equal - }) { + ) + }) + { first_input_ordering } else { None - }; - - trace!("self.partition_aware: {}", self.partition_aware); - trace!("first_input_ordering: {:?}", first_input_ordering); - trace!("output ordering: {:?}", result); - - result + } } fn maintains_input_order(&self) -> bool {