From e4c05578f2cb4bb0d5667fd438f1290c85cb4fda Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 18 Jan 2023 14:17:12 +0300 Subject: [PATCH 1/9] Initial commit --- .../src/physical_optimizer/repartition.rs | 5 ++-- .../physical_optimizer/sort_enforcement.rs | 4 +-- datafusion/core/src/physical_plan/filter.rs | 4 +-- datafusion/core/src/physical_plan/limit.rs | 8 +++--- datafusion/core/src/physical_plan/mod.rs | 4 +-- .../core/src/physical_plan/projection.rs | 4 +-- .../core/src/physical_plan/repartition/mod.rs | 6 ++-- datafusion/core/src/physical_plan/union.rs | 28 +++++++++---------- .../windows/bounded_window_agg_exec.rs | 4 +-- .../physical_plan/windows/window_agg_exec.rs | 4 +-- .../core/src/scheduler/pipeline/execution.rs | 2 +- 11 files changed, 37 insertions(+), 36 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 81f6b2edac13f..2416d8e9fc055 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -177,7 +177,8 @@ fn optimize_partitions( let children = plan .children() .iter() - .map(|child| { + .enumerate() + .map(|(idx, child)| { // does plan itelf (not parent) require its input to // be sorted in some way? let required_input_ordering = @@ -196,7 +197,7 @@ fn optimize_partitions( // if `plan` doesn't maintain the input order and // doesn't need the child's output order itself - (!plan.maintains_input_order() && !required_input_ordering) || + (!plan.maintains_input_order()[idx] && !required_input_ordering) || // child has no ordering to preserve child.output_ordering().is_none() }; diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 703a13a1cb1d5..edab3014439e5 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -230,7 +230,7 @@ fn ensure_sorting( (None, Some(_)) => { // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: - if !requirements.plan.maintains_input_order() { + if !requirements.plan.maintains_input_order()[idx] { update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -248,7 +248,7 @@ fn ensure_sorting( .take(new_plan.children().len()) { // TODO: When `maintains_input_order` returns a `Vec`, use corresponding index. - if new_plan.maintains_input_order() + if new_plan.maintains_input_order()[idx] && required_ordering.is_none() && !trace.is_empty() { diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 35c20c4cc6cb5..45c1cc25d61dd 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -118,9 +118,9 @@ impl ExecutionPlan for FilterExec { self.input.output_ordering() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // tell optimizer this operator doesn't reorder its input - true + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index c7299fea3ee07..3fa900ca1518b 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -105,8 +105,8 @@ impl ExecutionPlan for GlobalLimitExec { Partitioning::UnknownPartitioning(1) } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn benefits_from_input_partitioning(&self) -> bool { @@ -293,8 +293,8 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_ordering() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 0417814f3c80a..59b08cd14138a 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -162,8 +162,8 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// WARNING: if you override this default, you *MUST* ensure that /// the operator's maintains the ordering invariant or else /// DataFusion may produce incorrect results. - fn maintains_input_order(&self) -> bool { - false + fn maintains_input_order(&self) -> Vec { + vec![false; self.children().len()] } /// Returns `true` if this operator would benefit from diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index e418a8f934442..f699a760b4772 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -196,9 +196,9 @@ impl ExecutionPlan for ProjectionExec { self.output_ordering.as_deref() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // tell optimizer this operator doesn't reorder its input - true + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 1d0f1fe5cc5dc..ce87e78d6a4a0 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -331,16 +331,16 @@ impl ExecutionPlan for RepartitionExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - if self.maintains_input_order() { + if self.maintains_input_order()[0] { self.input().output_ordering() } else { None } } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // We preserve ordering when input partitioning is 1 - self.input().output_partitioning().partition_count() <= 1 + vec![self.input().output_partitioning().partition_count() <= 1] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index a0fca80661158..d220a094d1352 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -247,7 +247,7 @@ impl ExecutionPlan for UnionExec { } } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { let first_input_ordering = self.inputs[0].output_ordering(); // If the Union is not partition aware and all the input // ordering spec strictly equal with the first_input_ordering, @@ -256,19 +256,19 @@ impl ExecutionPlan for UnionExec { // It might be too strict here in the case that the input // ordering are compatible but not exactly the same. See // comments in output_ordering - !self.partition_aware - && first_input_ordering.is_some() - && self - .inputs - .iter() - .map(|plan| plan.output_ordering()) - .all(|ordering| { - ordering.is_some() - && sort_expr_list_eq_strict_order( - ordering.unwrap(), - first_input_ordering.unwrap(), - ) - }) + let res = + !self.partition_aware + && first_input_ordering.is_some() + && self.inputs.iter().map(|plan| plan.output_ordering()).all( + |ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }, + ); + vec![res; self.inputs.len()] } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 5ed6a112c82f5..b3b4609818fcf 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -186,8 +186,8 @@ impl ExecutionPlan for BoundedWindowAggExec { self.input().equivalence_properties() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index bd413ad8eac9a..fbd05fa884857 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -168,8 +168,8 @@ impl ExecutionPlan for WindowAggExec { self.input().output_ordering() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn required_input_ordering(&self) -> Vec> { diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index b5aa9aff6dde8..18d1aa2aa2454 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -239,7 +239,7 @@ impl ExecutionPlan for ProxyExecutionPlan { self.inner.required_input_distribution() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { self.inner.maintains_input_order() } From 147c99b0e4f619de359b1f9632cf6b6acabb7a1b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 18 Jan 2023 16:32:14 +0300 Subject: [PATCH 2/9] minor changes --- .../physical_optimizer/sort_enforcement.rs | 84 +++++++++++++++- .../core/src/physical_optimizer/utils.rs | 55 +---------- datafusion/core/src/physical_plan/union.rs | 95 ++++++++++--------- datafusion/physical-expr/src/utils.rs | 84 +++++++++++++++- 4 files changed, 214 insertions(+), 104 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index edab3014439e5..077b1e0aac860 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -31,9 +31,7 @@ //! by another SortExec. Therefore, this rule removes it from the physical plan. use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::utils::{ - add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete, -}; +use crate::physical_optimizer::utils::add_sort_above_child; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; @@ -41,6 +39,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; +use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete}; use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use itertools::izip; @@ -113,7 +112,16 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { } else { // TODO: When `maintains_input_order` returns Vec, // pass the order-enforcing sort upwards. - item.sort_onwards[0].clone() + let mut res = item.sort_onwards[0].clone(); + for (idx, is_maintained) in + item.plan.maintains_input_order().into_iter().enumerate() + { + if is_maintained { + res = item.sort_onwards[idx].clone(); + break; + } + } + res } }) .collect::>(); @@ -844,6 +852,74 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_different_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let parquet_sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted2() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(sort_exprs, source1); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone()); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should remove finer sorting from below and move it to top where sorting is not unnecessarily finer + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index ae1b58815fdaf..13e04bbc2ae83 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -23,10 +23,7 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use datafusion_physical_expr::{ - normalize_sort_expr_with_equivalence_properties, EquivalenceProperties, - PhysicalSortExpr, -}; +use datafusion_physical_expr::PhysicalSortExpr; use std::sync::Arc; /// Convenience rule for writing optimizers: recursively invoke @@ -51,56 +48,6 @@ pub fn optimize_children( } } -/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. -pub fn ordering_satisfy EquivalenceProperties>( - provided: Option<&[PhysicalSortExpr]>, - required: Option<&[PhysicalSortExpr]>, - equal_properties: F, -) -> bool { - match (provided, required) { - (_, None) => true, - (None, Some(_)) => false, - (Some(provided), Some(required)) => { - ordering_satisfy_concrete(provided, required, equal_properties) - } - } -} - -pub fn ordering_satisfy_concrete EquivalenceProperties>( - provided: &[PhysicalSortExpr], - required: &[PhysicalSortExpr], - equal_properties: F, -) -> bool { - if required.len() > provided.len() { - false - } else if required - .iter() - .zip(provided.iter()) - .all(|(order1, order2)| order1.eq(order2)) - { - true - } else if let eq_classes @ [_, ..] = equal_properties().classes() { - let normalized_required_exprs = required - .iter() - .map(|e| { - normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) - }) - .collect::>(); - let normalized_provided_exprs = provided - .iter() - .map(|e| { - normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) - }) - .collect::>(); - normalized_required_exprs - .iter() - .zip(normalized_provided_exprs.iter()) - .all(|(order1, order2)| order1.eq(order2)) - } else { - false - } -} - /// Util function to add SortExec above child /// preserving the original partitioning pub fn add_sort_above_child( diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index d220a094d1352..9a5c2a2dbe793 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -47,7 +47,7 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; -use datafusion_physical_expr::sort_expr_list_eq_strict_order; +use datafusion_physical_expr::utils::ordering_satisfy; use tokio::macros::support::thread_rng_n; /// `UnionExec`: `UNION ALL` execution plan. @@ -220,57 +220,62 @@ impl ExecutionPlan for UnionExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - let first_input_ordering = self.inputs[0].output_ordering(); - // If the Union is not partition aware and all the input ordering spec strictly equal with the first_input_ordering - // Return the first_input_ordering as the output_ordering - // - // It might be too strict here in the case that the input ordering are compatible but not exactly the same. - // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering - // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). + // Return the output ordering of first child where maintains_input_order is `true`. + // If none of the children preserves ordering. Return `None`. + let mut res = None; + for (idx, is_maintained) in self.maintains_input_order().into_iter().enumerate() { + if is_maintained { + res = self.inputs[idx].output_ordering(); + break; + } + } + res + } + + fn maintains_input_order(&self) -> Vec { + // If the Union is not partition aware and output is sorted at least one of the children + // maintains ordering. + // For instance assume: child1 is SortExpr('a','b','c'), child2 is SortExpr('a','b') and + // child3 is SortExpr('a','b'). Output ordering would be + // SortExpr('a','b') (Common subset of all input orderings). In this scheme, this function will return + // vec![false, true, true]. Indicating ordering for 2nd and 3rd child is preserved but 1st child is not. + let mut smallest: Option<&[PhysicalSortExpr]> = None; + for elem in self.inputs.iter() { + match (elem.output_ordering(), smallest) { + (Some(elem_ordering), Some(smallest_ordering)) => { + if elem_ordering.len() < smallest_ordering.len() { + smallest = Some(elem_ordering); + } + } + (Some(elem_ordering), None) => { + smallest = Some(elem_ordering); + } + (None, _) => { + smallest = None; + break; + } + } + } if !self.partition_aware - && first_input_ordering.is_some() - && self - .inputs - .iter() - .map(|plan| plan.output_ordering()) - .all(|ordering| { - ordering.is_some() - && sort_expr_list_eq_strict_order( - ordering.unwrap(), - first_input_ordering.unwrap(), - ) + && self.inputs.iter().all(|child| { + ordering_satisfy(child.output_ordering(), smallest, || { + child.equivalence_properties() }) + }) { - first_input_ordering + // smallest is the output_ordering + let mut res = vec![]; + for child in &self.inputs { + res.push(ordering_satisfy(smallest, child.output_ordering(), || { + child.equivalence_properties() + })) + } + res } else { - None + vec![false; self.inputs.len()] } } - fn maintains_input_order(&self) -> Vec { - let first_input_ordering = self.inputs[0].output_ordering(); - // If the Union is not partition aware and all the input - // ordering spec strictly equal with the first_input_ordering, - // then the `UnionExec` maintains the input order - // - // It might be too strict here in the case that the input - // ordering are compatible but not exactly the same. See - // comments in output_ordering - let res = - !self.partition_aware - && first_input_ordering.is_some() - && self.inputs.iter().map(|plan| plan.output_ordering()).all( - |ordering| { - ordering.is_some() - && sort_expr_list_eq_strict_order( - ordering.unwrap(), - first_input_ordering.unwrap(), - ) - }, - ); - vec![res; self.inputs.len()] - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index ea64d739769f6..d6d5054ffef03 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -20,8 +20,8 @@ use crate::expressions::BinaryExpr; use crate::expressions::Column; use crate::expressions::UnKnownColumn; use crate::rewrite::TreeNodeRewritable; -use crate::PhysicalExpr; use crate::PhysicalSortExpr; +use crate::{EquivalenceProperties, PhysicalExpr}; use datafusion_expr::Operator; use arrow::datatypes::SchemaRef; @@ -185,6 +185,56 @@ pub fn normalize_sort_expr_with_equivalence_properties( } } +/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. +pub fn ordering_satisfy EquivalenceProperties>( + provided: Option<&[PhysicalSortExpr]>, + required: Option<&[PhysicalSortExpr]>, + equal_properties: F, +) -> bool { + match (provided, required) { + (_, None) => true, + (None, Some(_)) => false, + (Some(provided), Some(required)) => { + ordering_satisfy_concrete(provided, required, equal_properties) + } + } +} + +pub fn ordering_satisfy_concrete EquivalenceProperties>( + provided: &[PhysicalSortExpr], + required: &[PhysicalSortExpr], + equal_properties: F, +) -> bool { + if required.len() > provided.len() { + false + } else if required + .iter() + .zip(provided.iter()) + .all(|(order1, order2)| order1.eq(order2)) + { + true + } else if let eq_classes @ [_, ..] = equal_properties().classes() { + let normalized_required_exprs = required + .iter() + .map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + }) + .collect::>(); + let normalized_provided_exprs = provided + .iter() + .map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + }) + .collect::>(); + normalized_required_exprs + .iter() + .zip(normalized_provided_exprs.iter()) + .all(|(order1, order2)| order1.eq(order2)) + } else { + false + } +} + #[cfg(test)] mod tests { @@ -194,6 +244,7 @@ mod tests { use arrow::compute::SortOptions; use datafusion_common::Result; + use arrow_schema::Schema; use std::sync::Arc; #[test] @@ -342,4 +393,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ordering_satisfy() -> Result<()> { + let crude = vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }]; + let crude = Some(&crude[..]); + let finer = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + ]; + let finer = Some(&finer[..]); + let empty_schema = &Arc::new(Schema { + fields: vec![], + metadata: Default::default(), + }); + assert!(ordering_satisfy(finer, crude, || { + EquivalenceProperties::new(empty_schema.clone()) + })); + assert!(!ordering_satisfy(crude, finer, || { + EquivalenceProperties::new(empty_schema.clone()) + })); + Ok(()) + } } From d970ea1abbd1a91306bb250d1bb50d04a14f7f3c Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 18 Jan 2023 17:24:54 +0300 Subject: [PATCH 3/9] remove some todos --- .../core/src/physical_optimizer/sort_enforcement.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 077b1e0aac860..2bff32b870b18 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -110,13 +110,12 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { if item.sort_onwards.is_empty() { vec![] } else { - // TODO: When `maintains_input_order` returns Vec, - // pass the order-enforcing sort upwards. - let mut res = item.sort_onwards[0].clone(); + let is_sort = item.plan.as_any().is::(); + let mut res = vec![]; for (idx, is_maintained) in item.plan.maintains_input_order().into_iter().enumerate() { - if is_maintained { + if is_maintained || is_sort { res = item.sort_onwards[idx].clone(); break; } @@ -255,7 +254,6 @@ fn ensure_sorting( .enumerate() .take(new_plan.children().len()) { - // TODO: When `maintains_input_order` returns a `Vec`, use corresponding index. if new_plan.maintains_input_order()[idx] && required_ordering.is_none() && !trace.is_empty() From 5a7cbf8c0f4c8101ef2bf76497640c3bdb789f6f Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 19 Jan 2023 13:35:35 +0300 Subject: [PATCH 4/9] minor changes --- .../physical_optimizer/sort_enforcement.rs | 4 +-- datafusion/core/src/physical_plan/union.rs | 28 ++++++++----------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 2bff32b870b18..55e6143277070 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -112,10 +112,10 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { } else { let is_sort = item.plan.as_any().is::(); let mut res = vec![]; - for (idx, is_maintained) in + for (idx, maintains) in item.plan.maintains_input_order().into_iter().enumerate() { - if is_maintained || is_sort { + if maintains || is_sort { res = item.sort_onwards[idx].clone(); break; } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 9a5c2a2dbe793..a6b341bb83d77 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -222,14 +222,12 @@ impl ExecutionPlan for UnionExec { fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { // Return the output ordering of first child where maintains_input_order is `true`. // If none of the children preserves ordering. Return `None`. - let mut res = None; - for (idx, is_maintained) in self.maintains_input_order().into_iter().enumerate() { - if is_maintained { - res = self.inputs[idx].output_ordering(); - break; + for (idx, maintains) in self.maintains_input_order().into_iter().enumerate() { + if maintains { + return self.inputs[idx].output_ordering(); } } - res + None } fn maintains_input_order(&self) -> Vec { @@ -241,19 +239,15 @@ impl ExecutionPlan for UnionExec { // vec![false, true, true]. Indicating ordering for 2nd and 3rd child is preserved but 1st child is not. let mut smallest: Option<&[PhysicalSortExpr]> = None; for elem in self.inputs.iter() { - match (elem.output_ordering(), smallest) { - (Some(elem_ordering), Some(smallest_ordering)) => { - if elem_ordering.len() < smallest_ordering.len() { - smallest = Some(elem_ordering); - } - } - (Some(elem_ordering), None) => { + if let Some(elem_ordering) = elem.output_ordering() { + if smallest.is_none() + || smallest.is_some() && elem_ordering.len() < smallest.unwrap().len() + { smallest = Some(elem_ordering); } - (None, _) => { - smallest = None; - break; - } + } else { + smallest = None; + break; } } if !self.partition_aware From 5c967f4557d9c78190930d3ed67eb9d3f75ecc73 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 20 Jan 2023 11:28:19 +0300 Subject: [PATCH 5/9] add new test --- .../physical_optimizer/sort_enforcement.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 55e6143277070..45ce721139276 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -918,6 +918,43 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_different_sorted3() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should remove unnecessary sorting from below and move it to top + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) From 0a6827e6cb3767d3b05e0c3afb62a7a51a547d53 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 23 Jan 2023 17:55:29 +0300 Subject: [PATCH 6/9] minor changes --- .../core/src/physical_optimizer/sort_enforcement.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 95e417082dc5a..331adc9b7fbd4 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -115,7 +115,9 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { for (idx, maintains) in item.plan.maintains_input_order().into_iter().enumerate() { - if maintains || is_sort { + if (maintains || is_sort) + && !item.sort_onwards[idx].is_empty() + { res = item.sort_onwards[idx].clone(); break; } @@ -384,17 +386,15 @@ fn convert_to_sort_exec(sort_any: &Arc) -> Result<&SortExec> fn remove_corresponding_sort_from_sub_plan( sort_onwards: &mut Vec<(usize, Arc)>, ) -> Result> { - let (sort_child_idx, sort_any) = sort_onwards[0].clone(); + let (_, sort_any) = sort_onwards[0].clone(); let sort_exec = convert_to_sort_exec(&sort_any)?; let mut prev_layer = sort_exec.input().clone(); - let mut prev_child_idx = sort_child_idx; // In the loop below, se start from 1 as the first one is a SortExec // and we are removing it from the plan. for (child_idx, layer) in sort_onwards.iter().skip(1) { let mut children = layer.children(); - children[prev_child_idx] = prev_layer; + children[*child_idx] = prev_layer; prev_layer = layer.clone().with_new_children(children)?; - prev_child_idx = *child_idx; } // We have removed the sort, hence empty the sort_onwards: sort_onwards.clear(); From 674f6e524e29e33304a91e169cad4a0e4fb625e2 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 23 Jan 2023 19:51:08 +0300 Subject: [PATCH 7/9] update tests --- .../physical_optimizer/sort_enforcement.rs | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 331adc9b7fbd4..7c7712b83795d 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -857,43 +857,6 @@ mod tests { async fn test_union_inputs_different_sorted2() -> Result<()> { let schema = create_test_schema()?; - let source1 = parquet_exec(&schema); - let sort_exprs = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - let sort = sort_exec(sort_exprs, source1); - - let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; - let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs.clone()); - - let union = union_exec(vec![source2, sort]); - let physical_plan = sort_preserving_merge_exec(parquet_sort_exprs, union); - - // one input to the union is already sorted, one is not. - let expected_input = vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", - " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - ]; - // should remove finer sorting from below and move it to top where sorting is not unnecessarily finer - let expected_optimized = vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: [nullable_col@0 ASC]", - " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); - Ok(()) - } - - #[tokio::test] - async fn test_union_inputs_different_sorted3() -> Result<()> { - let schema = create_test_schema()?; - let source1 = parquet_exec(&schema); let sort_exprs = vec![ sort_expr("nullable_col", &schema), From d8bf0898fea49d7f4e4ce1536851332613215319 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Mon, 23 Jan 2023 20:16:33 -0600 Subject: [PATCH 8/9] Refactor and simplify logic, add comments --- .../src/physical_optimizer/repartition.rs | 27 +++---- .../physical_optimizer/sort_enforcement.rs | 19 ++--- datafusion/core/src/physical_plan/union.rs | 79 ++++++++++--------- 3 files changed, 56 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index cf5ad132fcdb2..1285b9089c8d8 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -179,28 +179,19 @@ fn optimize_partitions( .iter() .enumerate() .map(|(idx, child)| { - // does plan itelf (not parent) require its input to + // Does plan itself (not its parent) require its input to // be sorted in some way? let required_input_ordering = plan_has_required_input_ordering(plan.as_ref()); - let can_reorder_child = if can_reorder { - // parent of `plan` will not use any particular order - - // if `plan` itself doesn't need order OR - !required_input_ordering || - // child has no order to preserve - child.output_ordering().is_none() - } else { - // parent would like to use the `plan`'s output - // order. - - // if `plan` doesn't maintain the input order and - // doesn't need the child's output order itself - (!plan.maintains_input_order()[idx] && !required_input_ordering) || - // child has no ordering to preserve - child.output_ordering().is_none() - }; + // We can reorder a child if: + // - It has no ordering to preserve, or + // - Its parent has no required input ordering and does not + // maintain input ordering. + // Check if this condition holds: + let can_reorder_child = child.output_ordering().is_none() + || (!required_input_ordering + && (can_reorder || !plan.maintains_input_order()[idx])); optimize_partitions( target_partitions, diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 7c7712b83795d..692c81c623030 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -107,23 +107,18 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { let sort_onwards = children_requirements .iter() .map(|item| { - if item.sort_onwards.is_empty() { - vec![] - } else { + let onwards = &item.sort_onwards; + if !onwards.is_empty() { let is_sort = item.plan.as_any().is::(); - let mut res = vec![]; - for (idx, maintains) in - item.plan.maintains_input_order().into_iter().enumerate() + let flags = item.plan.maintains_input_order(); + for (maintains, element) in flags.into_iter().zip(onwards.iter()) { - if (maintains || is_sort) - && !item.sort_onwards[idx].is_empty() - { - res = item.sort_onwards[idx].clone(); - break; + if (maintains || is_sort) && !element.is_empty() { + return element.clone(); } } - res } + vec![] }) .collect::>(); let plan = with_new_children_if_necessary(self.plan, children_plans)?; diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index a6b341bb83d77..d685d2a56d7f2 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -220,56 +220,57 @@ impl ExecutionPlan for UnionExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - // Return the output ordering of first child where maintains_input_order is `true`. - // If none of the children preserves ordering. Return `None`. - for (idx, maintains) in self.maintains_input_order().into_iter().enumerate() { - if maintains { - return self.inputs[idx].output_ordering(); - } + // If the Union is partition aware, there is no output ordering. + // Otherwise, the output ordering is the "meet" of its input orderings. + // The meet is the finest ordering that satisfied by all the input + // orderings, see https://en.wikipedia.org/wiki/Join_and_meet. + if self.partition_aware { + return None; } - None - } - - fn maintains_input_order(&self) -> Vec { - // If the Union is not partition aware and output is sorted at least one of the children - // maintains ordering. - // For instance assume: child1 is SortExpr('a','b','c'), child2 is SortExpr('a','b') and - // child3 is SortExpr('a','b'). Output ordering would be - // SortExpr('a','b') (Common subset of all input orderings). In this scheme, this function will return - // vec![false, true, true]. Indicating ordering for 2nd and 3rd child is preserved but 1st child is not. + // To find the meet, we first find the smallest input ordering. let mut smallest: Option<&[PhysicalSortExpr]> = None; - for elem in self.inputs.iter() { - if let Some(elem_ordering) = elem.output_ordering() { - if smallest.is_none() - || smallest.is_some() && elem_ordering.len() < smallest.unwrap().len() - { - smallest = Some(elem_ordering); + for item in self.inputs.iter() { + if let Some(ordering) = item.output_ordering() { + smallest = match smallest { + None => Some(ordering), + Some(expr) if ordering.len() < expr.len() => Some(ordering), + _ => continue, } } else { - smallest = None; - break; + return None; } } - if !self.partition_aware - && self.inputs.iter().all(|child| { - ordering_satisfy(child.output_ordering(), smallest, || { - child.equivalence_properties() - }) + // Check if the smallest ordering is a meet or not: + if self.inputs.iter().all(|child| { + ordering_satisfy(child.output_ordering(), smallest, || { + child.equivalence_properties() }) - { - // smallest is the output_ordering - let mut res = vec![]; - for child in &self.inputs { - res.push(ordering_satisfy(smallest, child.output_ordering(), || { - child.equivalence_properties() - })) - } - res + }) { + smallest } else { - vec![false; self.inputs.len()] + None } } + fn maintains_input_order(&self) -> Vec { + // If the Union has an output ordering, it maintains at least one + // child's ordering (i.e. the meet). + // For instance, assume that the first child is SortExpr('a','b','c'), + // the second child is SortExpr('a','b') and the third child is + // SortExpr('a','b'). The output ordering would be SortExpr('a','b'), + // which is the "meet" of all input orderings. In this example, this + // function will return vec![false, true, true], indicating that we + // preserve the orderings for the 2nd and the 3rd children. + self.inputs() + .iter() + .map(|child| { + ordering_satisfy(self.output_ordering(), child.output_ordering(), || { + child.equivalence_properties() + }) + }) + .collect() + } + fn with_new_children( self: Arc, children: Vec>, From 02e5b58c50f5ead5d425f29555aab60c3a30408e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 24 Jan 2023 17:51:21 +0300 Subject: [PATCH 9/9] Address reviews --- .../physical_optimizer/sort_enforcement.rs | 20 ++++-- datafusion/core/src/physical_plan/common.rs | 63 +++++++++++++++++++ datafusion/core/src/physical_plan/union.rs | 25 +------- 3 files changed, 81 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 692c81c623030..c9a3c8fec293e 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -109,11 +109,14 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { .map(|item| { let onwards = &item.sort_onwards; if !onwards.is_empty() { - let is_sort = item.plan.as_any().is::(); let flags = item.plan.maintains_input_order(); + // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor + // if the executors in between maintain input ordering. If we are at + // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering). + // However, we want to propagate them above anyway. for (maintains, element) in flags.into_iter().zip(onwards.iter()) { - if (maintains || is_sort) && !element.is_empty() { + if (maintains || is_sort(&item.plan)) && !element.is_empty() { return element.clone(); } } @@ -148,6 +151,12 @@ impl PhysicalOptimizerRule for EnforceSorting { } } +// Checks whether executor is Sort +// TODO: Add support for SortPreservingMergeExec also. +fn is_sort(plan: &Arc) -> bool { + plan.as_any().is::() +} + fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { @@ -258,7 +267,7 @@ fn ensure_sorting( trace.push((idx, new_plan.clone())); } else { trace.clear(); - if new_plan.as_any().is::() { + if is_sort(&new_plan) { trace.push((idx, new_plan.clone())); } } @@ -865,7 +874,10 @@ mod tests { let union = union_exec(vec![source2, sort]); let physical_plan = sort_preserving_merge_exec(sort_exprs, union); - // one input to the union is already sorted, one is not. + // Input is an invalid plan. In this case rule should add required sorting in appropriate places. + // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy required ordering + // of SortPreservingMergeExec. Hence rule should remove unnecessary sort for second child of the UnionExec + // and put a sort above Union to satisfy required ordering. let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 2f5f5e5670fec..c3e243d83904a 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -28,6 +28,8 @@ use arrow::error::ArrowError; use arrow::error::Result as ArrowResult; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::utils::ordering_satisfy; +use datafusion_physical_expr::PhysicalSortExpr; use futures::{Future, Stream, StreamExt, TryStreamExt}; use log::debug; use pin_project_lite::pin_project; @@ -322,15 +324,76 @@ pub fn transpose(original: Vec>) -> Vec> { } } +/// Calculates the "meet" of children orderings +/// The meet is the finest ordering that satisfied by all the input +/// orderings, see https://en.wikipedia.org/wiki/Join_and_meet. +pub fn get_meet_of_orderings( + children: &[Arc], +) -> Option<&[PhysicalSortExpr]> { + // To find the meet, we first find the smallest input ordering. + let mut smallest: Option<&[PhysicalSortExpr]> = None; + for item in children.iter() { + if let Some(ordering) = item.output_ordering() { + smallest = match smallest { + None => Some(ordering), + Some(expr) if ordering.len() < expr.len() => Some(ordering), + _ => continue, + } + } else { + return None; + } + } + // Check if the smallest ordering is a meet or not: + if children.iter().all(|child| { + ordering_satisfy(child.output_ordering(), smallest, || { + child.equivalence_properties() + }) + }) { + smallest + } else { + None + } +} + #[cfg(test)] mod tests { use super::*; use crate::from_slice::FromSlice; + use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::sorts::sort::SortExec; + use crate::physical_plan::union::UnionExec; + use arrow::compute::SortOptions; use arrow::{ array::{Float32Array, Float64Array}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use datafusion_physical_expr::expressions::col; + + #[test] + fn test_meet_of_orderings() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("f32", DataType::Float32, false), + Field::new("f64", DataType::Float64, false), + ])); + let sort_expr = vec![PhysicalSortExpr { + expr: col("f32", &schema).unwrap(), + options: SortOptions::default(), + }]; + let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _; + let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?) + as Arc; + let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _; + // memory_exec2 doesn't have output ordering + let union_exec = UnionExec::new(vec![sort_exec.clone(), memory_exec2]); + let res = get_meet_of_orderings(union_exec.inputs()); + assert!(res.is_none()); + + let union_exec = UnionExec::new(vec![sort_exec.clone(), sort_exec]); + let res = get_meet_of_orderings(union_exec.inputs()); + assert_eq!(res, Some(&sort_expr[..])); + Ok(()) + } #[test] fn test_compute_record_batch_statistics_empty() -> Result<()> { diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index d685d2a56d7f2..df78058082f59 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -43,6 +43,7 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; +use crate::physical_plan::common::get_meet_of_orderings; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, @@ -227,29 +228,7 @@ impl ExecutionPlan for UnionExec { if self.partition_aware { return None; } - // To find the meet, we first find the smallest input ordering. - let mut smallest: Option<&[PhysicalSortExpr]> = None; - for item in self.inputs.iter() { - if let Some(ordering) = item.output_ordering() { - smallest = match smallest { - None => Some(ordering), - Some(expr) if ordering.len() < expr.len() => Some(ordering), - _ => continue, - } - } else { - return None; - } - } - // Check if the smallest ordering is a meet or not: - if self.inputs.iter().all(|child| { - ordering_satisfy(child.output_ordering(), smallest, || { - child.equivalence_properties() - }) - }) { - smallest - } else { - None - } + get_meet_of_orderings(&self.inputs) } fn maintains_input_order(&self) -> Vec {