diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 2d785e920a269..703a13a1cb1d5 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -486,15 +486,19 @@ fn check_alignment( #[cfg(test)] mod tests { use super::*; + use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::displayable; + use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; use crate::prelude::SessionContext; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; + use datafusion_common::{Result, Statistics}; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::{col, NotExpr}; use datafusion_physical_expr::PhysicalSortExpr; @@ -813,6 +817,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_union_inputs_sorted() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source1); + + let source2 = parquet_exec_sorted(&schema, sort_exprs.clone()); + + let union = union_exec(vec![source2, sort]); + let physical_plan = sort_preserving_merge_exec(sort_exprs, union); + + // one input to the union is already sorted, one is not. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: [nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // should not add a sort at the output of the union, input plan should not be changed + let expected_optimized = expected_input.clone(); + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) @@ -856,4 +887,51 @@ mod tests { ) -> Arc { Arc::new(FilterExec::try_new(predicate, input).unwrap()) } + + /// Create a non sorted parquet exec + fn parquet_exec(schema: &SchemaRef) -> Arc { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + None, + None, + )) + } + + // Created a sorted parquet exec + fn parquet_exec_sorted( + schema: &SchemaRef, + sort_exprs: impl IntoIterator, + ) -> Arc { + let sort_exprs = sort_exprs.into_iter().collect(); + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + + fn union_exec(input: Vec>) -> Arc { + Arc::new(UnionExec::new(input)) + } } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 921a0d99f03ef..a0fca80661158 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -247,6 +247,30 @@ impl ExecutionPlan for UnionExec { } } + fn maintains_input_order(&self) -> bool { + let first_input_ordering = self.inputs[0].output_ordering(); + // If the Union is not partition aware and all the input + // ordering spec strictly equal with the first_input_ordering, + // then the `UnionExec` maintains the input order + // + // It might be too strict here in the case that the input + // ordering are compatible but not exactly the same. See + // comments in output_ordering + !self.partition_aware + && first_input_ordering.is_some() + && self + .inputs + .iter() + .map(|plan| plan.output_ordering()) + .all(|ordering| { + ordering.is_some() + && sort_expr_list_eq_strict_order( + ordering.unwrap(), + first_input_ordering.unwrap(), + ) + }) + } + fn with_new_children( self: Arc, children: Vec>,