diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 8595aa8ab135a..1285b9089c8d8 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -177,29 +177,21 @@ fn optimize_partitions( let children = plan .children() .iter() - .map(|child| { - // does plan itelf (not parent) require its input to + .enumerate() + .map(|(idx, child)| { + // Does plan itself (not its parent) require its input to // be sorted in some way? let required_input_ordering = plan_has_required_input_ordering(plan.as_ref()); - let can_reorder_child = if can_reorder { - // parent of `plan` will not use any particular order - - // if `plan` itself doesn't need order OR - !required_input_ordering || - // child has no order to preserve - child.output_ordering().is_none() - } else { - // parent would like to use the `plan`'s output - // order. - - // if `plan` doesn't maintain the input order and - // doesn't need the child's output order itself - (!plan.maintains_input_order() && !required_input_ordering) || - // child has no ordering to preserve - child.output_ordering().is_none() - }; + // We can reorder a child if: + // - It has no ordering to preserve, or + // - Its parent has no required input ordering and does not + // maintain input ordering. + // Check if this condition holds: + let can_reorder_child = child.output_ordering().is_none() + || (!required_input_ordering + && (can_reorder || !plan.maintains_input_order()[idx])); optimize_partitions( target_partitions, diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index c17192cad1b59..c9a3c8fec293e 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -31,9 +31,7 @@ //! by another SortExec. Therefore, this rule removes it from the physical plan. use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::utils::{ - add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete, -}; +use crate::physical_optimizer::utils::add_sort_above_child; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; @@ -41,6 +39,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; +use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete}; use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use itertools::izip; @@ -108,13 +107,21 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { let sort_onwards = children_requirements .iter() .map(|item| { - if item.sort_onwards.is_empty() { - vec![] - } else { - // TODO: When `maintains_input_order` returns Vec, - // pass the order-enforcing sort upwards. - item.sort_onwards[0].clone() + let onwards = &item.sort_onwards; + if !onwards.is_empty() { + let flags = item.plan.maintains_input_order(); + // `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor + // if the executors in between maintain input ordering. If we are at + // the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering). + // However, we want to propagate them above anyway. + for (maintains, element) in flags.into_iter().zip(onwards.iter()) + { + if (maintains || is_sort(&item.plan)) && !element.is_empty() { + return element.clone(); + } + } } + vec![] }) .collect::>(); let plan = with_new_children_if_necessary(self.plan, children_plans)?; @@ -144,6 +151,12 @@ impl PhysicalOptimizerRule for EnforceSorting { } } +// Checks whether executor is Sort +// TODO: Add support for SortPreservingMergeExec also. +fn is_sort(plan: &Arc) -> bool { + plan.as_any().is::() +} + fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { @@ -230,7 +243,7 @@ fn ensure_sorting( (None, Some(_)) => { // We have a SortExec whose effect may be neutralized by a order-imposing // operator. In this case, remove this sort: - if !requirements.plan.maintains_input_order() { + if !requirements.plan.maintains_input_order()[idx] { update_child_to_remove_unnecessary_sort(child, sort_onwards)?; } } @@ -247,15 +260,14 @@ fn ensure_sorting( .enumerate() .take(new_plan.children().len()) { - // TODO: When `maintains_input_order` returns a `Vec`, use corresponding index. - if new_plan.maintains_input_order() + if new_plan.maintains_input_order()[idx] && required_ordering.is_none() && !trace.is_empty() { trace.push((idx, new_plan.clone())); } else { trace.clear(); - if new_plan.as_any().is::() { + if is_sort(&new_plan) { trace.push((idx, new_plan.clone())); } } @@ -378,17 +390,15 @@ fn convert_to_sort_exec(sort_any: &Arc) -> Result<&SortExec> fn remove_corresponding_sort_from_sub_plan( sort_onwards: &mut Vec<(usize, Arc)>, ) -> Result> { - let (sort_child_idx, sort_any) = sort_onwards[0].clone(); + let (_, sort_any) = sort_onwards[0].clone(); let sort_exec = convert_to_sort_exec(&sort_any)?; let mut prev_layer = sort_exec.input().clone(); - let mut prev_child_idx = sort_child_idx; // In the loop below, se start from 1 as the first one is a SortExec // and we are removing it from the plan. for (child_idx, layer) in sort_onwards.iter().skip(1) { let mut children = layer.children(); - children[prev_child_idx] = prev_layer; + children[*child_idx] = prev_layer; prev_layer = layer.clone().with_new_children(children)?; - prev_child_idx = *child_idx; } // We have removed the sort, hence empty the sort_onwards: sort_onwards.clear(); @@ -816,6 +826,77 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_different_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let parquet_sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted2() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // Input is an invalid plan. In this case rule should add required sorting in appropriate places. + // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy required ordering + // of SortPreservingMergeExec. Hence rule should remove unnecessary sort for second child of the UnionExec + // and put a sort above Union to satisfy required ordering. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should remove unnecessary sorting from below and move it to top + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index ae1b58815fdaf..13e04bbc2ae83 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -23,10 +23,7 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use datafusion_physical_expr::{ - normalize_sort_expr_with_equivalence_properties, EquivalenceProperties, - PhysicalSortExpr, -}; +use datafusion_physical_expr::PhysicalSortExpr; use std::sync::Arc; /// Convenience rule for writing optimizers: recursively invoke @@ -51,56 +48,6 @@ pub fn optimize_children( } } -/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. -pub fn ordering_satisfy EquivalenceProperties>( - provided: Option<&[PhysicalSortExpr]>, - required: Option<&[PhysicalSortExpr]>, - equal_properties: F, -) -> bool { - match (provided, required) { - (_, None) => true, - (None, Some(_)) => false, - (Some(provided), Some(required)) => { - ordering_satisfy_concrete(provided, required, equal_properties) - } - } -} - -pub fn ordering_satisfy_concrete EquivalenceProperties>( - provided: &[PhysicalSortExpr], - required: &[PhysicalSortExpr], - equal_properties: F, -) -> bool { - if required.len() > provided.len() { - false - } else if required - .iter() - .zip(provided.iter()) - .all(|(order1, order2)| order1.eq(order2)) - { - true - } else if let eq_classes @ [_, ..] = equal_properties().classes() { - let normalized_required_exprs = required - .iter() - .map(|e| { - normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) - }) - .collect::>(); - let normalized_provided_exprs = provided - .iter() - .map(|e| { - normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) - }) - .collect::>(); - normalized_required_exprs - .iter() - .zip(normalized_provided_exprs.iter()) - .all(|(order1, order2)| order1.eq(order2)) - } else { - false - } -} - /// Util function to add SortExec above child /// preserving the original partitioning pub fn add_sort_above_child( diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 2f5f5e5670fec..c3e243d83904a 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -28,6 +28,8 @@ use arrow::error::ArrowError; use arrow::error::Result as ArrowResult; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::utils::ordering_satisfy; +use datafusion_physical_expr::PhysicalSortExpr; use futures::{Future, Stream, StreamExt, TryStreamExt}; use log::debug; use pin_project_lite::pin_project; @@ -322,15 +324,76 @@ pub fn transpose(original: Vec>) -> Vec> { } } +/// Calculates the "meet" of children orderings +/// The meet is the finest ordering that satisfied by all the input +/// orderings, see https://en.wikipedia.org/wiki/Join_and_meet. +pub fn get_meet_of_orderings( + children: &[Arc], +) -> Option<&[PhysicalSortExpr]> { + // To find the meet, we first find the smallest input ordering. + let mut smallest: Option<&[PhysicalSortExpr]> = None; + for item in children.iter() { + if let Some(ordering) = item.output_ordering() { + smallest = match smallest { + None => Some(ordering), + Some(expr) if ordering.len() < expr.len() => Some(ordering), + _ => continue, + } + } else { + return None; + } + } + // Check if the smallest ordering is a meet or not: + if children.iter().all(|child| { + ordering_satisfy(child.output_ordering(), smallest, || { + child.equivalence_properties() + }) + }) { + smallest + } else { + None + } +} + #[cfg(test)] mod tests { use super::*; use crate::from_slice::FromSlice; + use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::sorts::sort::SortExec; + use crate::physical_plan::union::UnionExec; + use arrow::compute::SortOptions; use arrow::{ array::{Float32Array, Float64Array}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use datafusion_physical_expr::expressions::col; + + #[test] + fn test_meet_of_orderings() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("f32", DataType::Float32, false), + Field::new("f64", DataType::Float64, false), + ])); + let sort_expr = vec![PhysicalSortExpr { + expr: col("f32", &schema).unwrap(), + options: SortOptions::default(), + }]; + let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _; + let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?) + as Arc; + let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _; + // memory_exec2 doesn't have output ordering + let union_exec = UnionExec::new(vec![sort_exec.clone(), memory_exec2]); + let res = get_meet_of_orderings(union_exec.inputs()); + assert!(res.is_none()); + + let union_exec = UnionExec::new(vec![sort_exec.clone(), sort_exec]); + let res = get_meet_of_orderings(union_exec.inputs()); + assert_eq!(res, Some(&sort_expr[..])); + Ok(()) + } #[test] fn test_compute_record_batch_statistics_empty() -> Result<()> { diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 35c20c4cc6cb5..45c1cc25d61dd 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -118,9 +118,9 @@ impl ExecutionPlan for FilterExec { self.input.output_ordering() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // tell optimizer this operator doesn't reorder its input - true + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index c7299fea3ee07..3fa900ca1518b 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -105,8 +105,8 @@ impl ExecutionPlan for GlobalLimitExec { Partitioning::UnknownPartitioning(1) } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn benefits_from_input_partitioning(&self) -> bool { @@ -293,8 +293,8 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_ordering() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 72af7b16c6eac..0ec7b16ef1316 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -162,8 +162,8 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// WARNING: if you override this default, you *MUST* ensure that /// the operator's maintains the ordering invariant or else /// DataFusion may produce incorrect results. - fn maintains_input_order(&self) -> bool { - false + fn maintains_input_order(&self) -> Vec { + vec![false; self.children().len()] } /// Returns `true` if this operator would benefit from diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index e418a8f934442..f699a760b4772 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -196,9 +196,9 @@ impl ExecutionPlan for ProjectionExec { self.output_ordering.as_deref() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // tell optimizer this operator doesn't reorder its input - true + vec![true] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 8b9d44a99b651..4965979951edc 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -331,16 +331,16 @@ impl ExecutionPlan for RepartitionExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - if self.maintains_input_order() { + if self.maintains_input_order()[0] { self.input().output_ordering() } else { None } } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { // We preserve ordering when input partitioning is 1 - self.input().output_partitioning().partition_count() <= 1 + vec![self.input().output_partitioning().partition_count() <= 1] } fn equivalence_properties(&self) -> EquivalenceProperties { diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index a0fca80661158..df78058082f59 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -43,11 +43,12 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution::context::TaskContext; +use crate::physical_plan::common::get_meet_of_orderings; use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; -use datafusion_physical_expr::sort_expr_list_eq_strict_order; +use datafusion_physical_expr::utils::ordering_satisfy; use tokio::macros::support::thread_rng_n; /// `UnionExec`: `UNION ALL` execution plan. @@ -220,55 +221,33 @@ impl ExecutionPlan for UnionExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - let first_input_ordering = self.inputs[0].output_ordering(); - // If the Union is not partition aware and all the input ordering spec strictly equal with the first_input_ordering - // Return the first_input_ordering as the output_ordering - // - // It might be too strict here in the case that the input ordering are compatible but not exactly the same. - // For example one input ordering has the ordering spec SortExpr('a','b','c') and the other has the ordering - // spec SortExpr('a'), It is safe to derive the out ordering with the spec SortExpr('a'). - if !self.partition_aware - && first_input_ordering.is_some() - && self - .inputs - .iter() - .map(|plan| plan.output_ordering()) - .all(|ordering| { - ordering.is_some() - && sort_expr_list_eq_strict_order( - ordering.unwrap(), - first_input_ordering.unwrap(), - ) - }) - { - first_input_ordering - } else { - None + // If the Union is partition aware, there is no output ordering. + // Otherwise, the output ordering is the "meet" of its input orderings. + // The meet is the finest ordering that satisfied by all the input + // orderings, see https://en.wikipedia.org/wiki/Join_and_meet. + if self.partition_aware { + return None; } + get_meet_of_orderings(&self.inputs) } - fn maintains_input_order(&self) -> bool { - let first_input_ordering = self.inputs[0].output_ordering(); - // If the Union is not partition aware and all the input - // ordering spec strictly equal with the first_input_ordering, - // then the `UnionExec` maintains the input order - // - // It might be too strict here in the case that the input - // ordering are compatible but not exactly the same. See - // comments in output_ordering - !self.partition_aware - && first_input_ordering.is_some() - && self - .inputs - .iter() - .map(|plan| plan.output_ordering()) - .all(|ordering| { - ordering.is_some() - && sort_expr_list_eq_strict_order( - ordering.unwrap(), - first_input_ordering.unwrap(), - ) + fn maintains_input_order(&self) -> Vec { + // If the Union has an output ordering, it maintains at least one + // child's ordering (i.e. the meet). + // For instance, assume that the first child is SortExpr('a','b','c'), + // the second child is SortExpr('a','b') and the third child is + // SortExpr('a','b'). The output ordering would be SortExpr('a','b'), + // which is the "meet" of all input orderings. In this example, this + // function will return vec![false, true, true], indicating that we + // preserve the orderings for the 2nd and the 3rd children. + self.inputs() + .iter() + .map(|child| { + ordering_satisfy(self.output_ordering(), child.output_ordering(), || { + child.equivalence_properties() }) + }) + .collect() } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 13b6d88da18e3..39660742373b0 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -186,8 +186,8 @@ impl ExecutionPlan for BoundedWindowAggExec { self.input().equivalence_properties() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn with_new_children( diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index bd413ad8eac9a..fbd05fa884857 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -168,8 +168,8 @@ impl ExecutionPlan for WindowAggExec { self.input().output_ordering() } - fn maintains_input_order(&self) -> bool { - true + fn maintains_input_order(&self) -> Vec { + vec![true] } fn required_input_ordering(&self) -> Vec> { diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index b5aa9aff6dde8..18d1aa2aa2454 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -239,7 +239,7 @@ impl ExecutionPlan for ProxyExecutionPlan { self.inner.required_input_distribution() } - fn maintains_input_order(&self) -> bool { + fn maintains_input_order(&self) -> Vec { self.inner.maintains_input_order() } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index ea64d739769f6..d6d5054ffef03 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -20,8 +20,8 @@ use crate::expressions::BinaryExpr; use crate::expressions::Column; use crate::expressions::UnKnownColumn; use crate::rewrite::TreeNodeRewritable; -use crate::PhysicalExpr; use crate::PhysicalSortExpr; +use crate::{EquivalenceProperties, PhysicalExpr}; use datafusion_expr::Operator; use arrow::datatypes::SchemaRef; @@ -185,6 +185,56 @@ pub fn normalize_sort_expr_with_equivalence_properties( } } +/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. +pub fn ordering_satisfy EquivalenceProperties>( + provided: Option<&[PhysicalSortExpr]>, + required: Option<&[PhysicalSortExpr]>, + equal_properties: F, +) -> bool { + match (provided, required) { + (_, None) => true, + (None, Some(_)) => false, + (Some(provided), Some(required)) => { + ordering_satisfy_concrete(provided, required, equal_properties) + } + } +} + +pub fn ordering_satisfy_concrete EquivalenceProperties>( + provided: &[PhysicalSortExpr], + required: &[PhysicalSortExpr], + equal_properties: F, +) -> bool { + if required.len() > provided.len() { + false + } else if required + .iter() + .zip(provided.iter()) + .all(|(order1, order2)| order1.eq(order2)) + { + true + } else if let eq_classes @ [_, ..] = equal_properties().classes() { + let normalized_required_exprs = required + .iter() + .map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + }) + .collect::>(); + let normalized_provided_exprs = provided + .iter() + .map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + }) + .collect::>(); + normalized_required_exprs + .iter() + .zip(normalized_provided_exprs.iter()) + .all(|(order1, order2)| order1.eq(order2)) + } else { + false + } +} + #[cfg(test)] mod tests { @@ -194,6 +244,7 @@ mod tests { use arrow::compute::SortOptions; use datafusion_common::Result; + use arrow_schema::Schema; use std::sync::Arc; #[test] @@ -342,4 +393,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ordering_satisfy() -> Result<()> { + let crude = vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }]; + let crude = Some(&crude[..]); + let finer = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + ]; + let finer = Some(&finer[..]); + let empty_schema = &Arc::new(Schema { + fields: vec![], + metadata: Default::default(), + }); + assert!(ordering_satisfy(finer, crude, || { + EquivalenceProperties::new(empty_schema.clone()) + })); + assert!(!ordering_satisfy(crude, finer, || { + EquivalenceProperties::new(empty_schema.clone()) + })); + Ok(()) + } }