From 76cde39720cd1a3626081bc217d840a03756de3a Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 22 Dec 2022 15:36:56 +0800 Subject: [PATCH 1/7] Extract the global sort algorithm selection from the BasicEnforcement to be a separate rule, GlobalSortSelection --- datafusion/core/src/execution/context.rs | 2 + .../src/physical_optimizer/enforcement.rs | 31 ------- .../global_sort_selection.rs | 91 +++++++++++++++++++ datafusion/core/src/physical_optimizer/mod.rs | 1 + datafusion/core/src/physical_plan/planner.rs | 5 + 5 files changed, 99 insertions(+), 31 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/global_sort_selection.rs diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 58d414e3abd5f..03c0d0b78fd36 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -100,6 +100,7 @@ use url::Url; use crate::catalog::listing_schema::ListingSchemaProvider; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::memory_pool::MemoryPool; +use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; use uuid::Uuid; use super::options::{ @@ -1579,6 +1580,7 @@ impl SessionState { let mut physical_optimizers: Vec> = vec![ Arc::new(AggregateStatistics::new()), + Arc::new(GlobalSortSelection::new()), Arc::new(JoinSelection::new()), ]; physical_optimizers.push(Arc::new(BasicEnforcement::new())); diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index 3da9d24773a73..3e7a118d8e8cc 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -31,7 +31,6 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort::SortOptions; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; @@ -846,36 +845,6 @@ fn ensure_distribution_and_ordering( if plan.children().is_empty() { return Ok(plan); } - // It's mainly for changing the single node global SortExec to - // the SortPreservingMergeExec with multiple local SortExec. - // What's more, if limit exists, it can also be pushed down to the local sort - let plan = plan - .as_any() - .downcast_ref::() - .and_then(|sort_exec| { - // There are three situations that there's no need for this optimization - // - There's only one input partition; - // - It's already preserving the partitioning so that it can be regarded as a local sort - // - There's no limit pushed down to the local sort (It's still controversial) - if sort_exec.input().output_partitioning().partition_count() > 1 - && !sort_exec.preserve_partitioning() - && sort_exec.fetch().is_some() - { - let sort = SortExec::new_with_partitioning( - sort_exec.expr().to_vec(), - sort_exec.input().clone(), - true, - sort_exec.fetch(), - ); - Some(Arc::new(SortPreservingMergeExec::new( - sort_exec.expr().to_vec(), - Arc::new(sort), - ))) - } else { - None - } - }) - .map_or(plan, |new_plan| new_plan); let required_input_distributions = plan.required_input_distribution(); let required_input_orderings = plan.required_input_ordering(); diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs new file mode 100644 index 0000000000000..308f76af65f01 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Select the efficient global sort implementation based on sort details. + +use std::sync::Arc; + +use crate::error::Result; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::rewrite::TreeNodeRewritable; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::ExecutionPlan; +use crate::prelude::SessionConfig; + +/// Currently for a sort operator, if +/// - there are more than one input partitions +/// - and there's some limit which can be pushed down to each of its input partitions +/// then [SortPreservingMergeExec] with local sort with a limit pushed down will be preferred; +/// Otherwise, the normal global sort [SortExec] will be used. +/// Later more intelligent statistics-based decision can also be introduced. +/// For example, for a small data set, the global sort may be efficient enough +#[derive(Default)] +pub struct GlobalSortSelection {} + +impl GlobalSortSelection { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for GlobalSortSelection { + fn optimize( + &self, + plan: Arc, + config: &SessionConfig, + ) -> Result> { + plan.transform_up(&|plan| { + Ok(plan + .as_any() + .downcast_ref::() + .and_then(|sort_exec| { + // Temporary use the config target partition number to pass through the unit test. + // Later it will be changed to use the input output partition number. + if config.target_partitions() > 1 + && sort_exec.fetch().is_some() + // It's already preserving the partitioning so that it can be regarded as a local sort + && !sort_exec.preserve_partitioning() + { + let sort = SortExec::new_with_partitioning( + sort_exec.expr().to_vec(), + sort_exec.input().clone(), + true, + sort_exec.fetch(), + ); + let global_sort: Arc = + Arc::new(SortPreservingMergeExec::new( + sort_exec.expr().to_vec(), + Arc::new(sort), + )); + Some(global_sort) + } else { + None + } + })) + }) + } + + fn name(&self) -> &str { + "global_sort_selection" + } + + fn schema_check(&self) -> bool { + false + } +} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 36b00a0e01bcd..fdd748178e1e2 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -21,6 +21,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; pub mod enforcement; +pub mod global_sort_selection; pub mod join_selection; pub mod optimizer; pub mod pruning; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 84c75f30bc50d..50f54aee19975 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1795,6 +1795,11 @@ impl DefaultPhysicalPlanner { new_plan.schema() ))); } + trace!( + "Optimized physical plan by {}:\n{}\n", + optimizer.name(), + displayable(new_plan.as_ref()).indent() + ); observer(new_plan.as_ref(), optimizer.as_ref()) } debug!( From 46ddd99ecfd9d8393409fe617f3ee4c27ee0ea70 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 22 Dec 2022 16:13:20 +0800 Subject: [PATCH 2/7] Make the optimizer rule of Repartition optional --- datafusion/core/src/config.rs | 9 +++++++++ datafusion/core/src/execution/context.rs | 13 +++++++++++-- .../sqllogictests/test_files/information_schema.slt | 1 + docs/source/user-guide/configs.md | 1 + 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 1c98c83ca7ba4..e0c08ae13681a 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -133,6 +133,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join"; pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str = "datafusion.optimizer.hash_join_single_partition_threshold"; +/// Configuration option "datafusion.execution.round_robin_repartition" +pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str = + "datafusion.optimizer.enable_round_robin_repartition"; + /// Definition of a configuration option pub struct ConfigDefinition { /// key used to identifier this configuration option @@ -411,6 +415,11 @@ impl BuiltInConfigs { "The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition", 1024 * 1024, ), + ConfigDefinition::new_bool( + OPT_ENABLE_ROUND_ROBIN_REPARTITION, + "When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores", + true, + ), ] } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 03c0d0b78fd36..1fc7f3e0b9bb1 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -76,7 +76,8 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, - OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, + OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS, + OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; use crate::physical_optimizer::enforcement::BasicEnforcement; @@ -1600,7 +1601,15 @@ impl SessionState { .unwrap(), ))); } - physical_optimizers.push(Arc::new(Repartition::new())); + // It's for increasing the parallelism by introducing round robin repartition + if config + .config_options + .read() + .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) + .unwrap_or_default() + { + physical_optimizers.push(Arc::new(Repartition::new())); + } // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning. // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here. physical_optimizers.push(Arc::new(BasicEnforcement::new())); diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index e90fc2c12edb9..f4fc8667d9fd2 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -43,6 +43,7 @@ datafusion.execution.target_partitions 7 datafusion.execution.time_zone +00:00 datafusion.explain.logical_plan_only false datafusion.explain.physical_plan_only false +datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.max_passes 3 diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 039981338b6d3..1c5f086563888 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | then extract the hour. | | datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | | datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | +| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | | datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan | From 48503834367b60d3a368c961f8104980be478da4 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Fri, 23 Dec 2022 12:11:02 +0800 Subject: [PATCH 3/7] Fix EmptyExec data() method --- datafusion/core/src/physical_plan/empty.rs | 26 ++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 4751dade1ddab..b71f6739b65b6 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -24,7 +24,7 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning, }; -use arrow::array::NullArray; +use arrow::array::{ArrayRef, NullArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use log::debug; @@ -68,13 +68,25 @@ impl EmptyExec { fn data(&self) -> Result> { let batch = if self.produce_one_row { + let n_field = self.schema.fields.len(); + // hack for https://github.com/apache/arrow-datafusion/pull/3242 + let n_field = if n_field == 0 { 1 } else { n_field }; vec![RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new( - "placeholder", - DataType::Null, - true, - )])), - vec![Arc::new(NullArray::new(1))], + Arc::new(Schema::new( + (0..n_field) + .into_iter() + .map(|i| { + Field::new(format!("placeholder_{}", i), DataType::Null, true) + }) + .collect(), + )), + (0..n_field) + .into_iter() + .map(|_i| { + let ret: ArrayRef = Arc::new(NullArray::new(1)); + ret + }) + .collect(), )?] } else { vec![] From b95ba8676c0789b96aa17524bf427913866c39ee Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 22 Dec 2022 17:22:03 +0800 Subject: [PATCH 4/7] Reorder the physical plan optimizer rules --- datafusion/core/src/execution/context.rs | 53 +++++++++++++------ .../physical_optimizer/coalesce_batches.rs | 13 ++++- .../global_sort_selection.rs | 6 +-- .../src/physical_optimizer/repartition.rs | 49 +++++++---------- datafusion/core/tests/sql/joins.rs | 10 ++-- datafusion/core/tests/sql/timestamp.rs | 24 ++++----- datafusion/core/tests/sql/union.rs | 4 +- datafusion/core/tests/sql/window.rs | 19 ++++--- 8 files changed, 100 insertions(+), 78 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 1fc7f3e0b9bb1..7e4f515c5b70d 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1579,12 +1579,43 @@ impl SessionState { .register_catalog(config.default_catalog.clone(), default_catalog); } - let mut physical_optimizers: Vec> = vec![ - Arc::new(AggregateStatistics::new()), - Arc::new(GlobalSortSelection::new()), - Arc::new(JoinSelection::new()), - ]; + // We need to take care of the rule ordering. They may influence each other. + let mut physical_optimizers: Vec> = + vec![Arc::new(AggregateStatistics::new())]; + // - In order to increase the parallelism, it will change the output partitioning + // of some operators in the plan tree, which will influence other rules. + // Therefore, it should be run as soon as possible. + // - The reason to make it optional is + // - it's not used for the distributed engine, Ballista. + // - it's conflicted with some parts of the BasicEnforcement, since it will + // introduce additional repartitioning while the BasicEnforcement aims at + // reducing unnecessary repartitioning. + if config + .config_options + .read() + .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) + .unwrap_or_default() + { + physical_optimizers.push(Arc::new(Repartition::new())); + } + //- Currently it will depend on the partition number to decide whether to change the + // single node sort to parallel local sort and merge. Therefore, it should be run + // after the Repartition. + // - Since it will change the output ordering of some operators, it should be run + // before JoinSelection and BasicEnforcement, which may depend on that. + physical_optimizers.push(Arc::new(GlobalSortSelection::new())); + // Statistics-base join selection will change the Auto mode to real join implementation, + // like collect left, or hash join, or future sort merge join, which will + // influence the BasicEnforcement to decide whether to add additional repartition + // and local sort to meet the distribution and ordering requirements. + // Therefore, it should be run before BasicEnforcement + physical_optimizers.push(Arc::new(JoinSelection::new())); + // It's for adding essential repartition and local sorting operator to satisfy the + // required distribution and local sort. + // Please make sure that the whole plan tree is determined. physical_optimizers.push(Arc::new(BasicEnforcement::new())); + // It will not influence the distribution and ordering of the whole plan tree. + // Therefore, to avoid influencing other rules, it should be run at last. if config .config_options .read() @@ -1601,18 +1632,6 @@ impl SessionState { .unwrap(), ))); } - // It's for increasing the parallelism by introducing round robin repartition - if config - .config_options - .read() - .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) - .unwrap_or_default() - { - physical_optimizers.push(Arc::new(Repartition::new())); - } - // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning. - // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here. - physical_optimizers.push(Arc::new(BasicEnforcement::new())); SessionState { session_id, diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 941c5c14148ec..e0d20be16646d 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -23,7 +23,7 @@ use crate::{ physical_optimizer::PhysicalOptimizerRule, physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, - repartition::RepartitionExec, rewrite::TreeNodeRewritable, + repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning, }, }; use std::sync::Arc; @@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches { // See https://github.com/apache/arrow-datafusion/issues/139 let wrap_in_coalesce = plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() - || plan_any.downcast_ref::().is_some(); + // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec + || plan_any + .downcast_ref::() + .map(|repart_exec| { + !matches!( + repart_exec.partitioning().clone(), + Partitioning::RoundRobinBatch(_) + ) + }) + .unwrap_or(false); if wrap_in_coalesce { Ok(Some(Arc::new(CoalesceBatchesExec::new( plan.clone(), diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index 308f76af65f01..a6bb8229c05bf 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -48,16 +48,14 @@ impl PhysicalOptimizerRule for GlobalSortSelection { fn optimize( &self, plan: Arc, - config: &SessionConfig, + _config: &SessionConfig, ) -> Result> { plan.transform_up(&|plan| { Ok(plan .as_any() .downcast_ref::() .and_then(|sort_exec| { - // Temporary use the config target partition number to pass through the unit test. - // Later it will be changed to use the input output partition number. - if config.target_partitions() > 1 + if sort_exec.input().output_partitioning().partition_count() > 1 && sort_exec.fetch().is_some() // It's already preserving the partitioning so that it can be regarded as a local sort && !sort_exec.preserve_partitioning() diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 42c5e0c3f7436..baa812adee1a4 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -143,7 +143,7 @@ impl Repartition { /// is an intervening node that does not `maintain_input_order` /// /// if `can_reorder` is false, means that the output of this node -/// can not be reordered as as something upstream is relying on that order +/// can not be reordered as as the final output is relying on that order /// /// If 'would_benefit` is false, the upstream operator doesn't /// benefit from additional repartition @@ -161,26 +161,6 @@ fn optimize_partitions( // leaf node - don't replace children plan } else { - let can_reorder_children = - match (plan.relies_on_input_order(), plan.maintains_input_order()) { - (true, _) => { - // `plan` itself relies on the order of its - // children, so don't reorder them! - false - } - (false, false) => { - // `plan` may reorder the input itself, so no need - // to preserve the order of any children - true - } - (false, true) => { - // `plan` will maintain the order, so we can only - // repartition children if it is ok to reorder the - // output of this node - can_reorder - } - }; - let children = plan .children() .iter() @@ -188,7 +168,7 @@ fn optimize_partitions( optimize_partitions( target_partitions, child.clone(), - can_reorder_children, + can_reorder || child.output_ordering().is_none(), plan.benefits_from_input_partitioning(), ) }) @@ -197,7 +177,7 @@ fn optimize_partitions( }; // decide if we should bother trying to repartition the output of this plan - let could_repartition = match new_plan.output_partitioning() { + let mut could_repartition = match new_plan.output_partitioning() { // Apply when underlying node has less than `self.target_partitions` amount of concurrency RoundRobinBatch(x) => x < target_partitions, UnknownPartitioning(x) => x < target_partitions, @@ -206,6 +186,13 @@ fn optimize_partitions( Hash(_, _) => false, }; + // Don't need to apply when the returned row count is not greater than 1 + let stats = new_plan.statistics(); + if stats.is_exact { + could_repartition = could_repartition + && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); + } + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, @@ -226,7 +213,12 @@ impl PhysicalOptimizerRule for Repartition { if config.target_partitions() == 1 { Ok(plan) } else { - optimize_partitions(config.target_partitions(), plan, false, false) + optimize_partitions( + config.target_partitions(), + plan.clone(), + plan.output_ordering().is_none(), + false, + ) } } @@ -546,12 +538,12 @@ mod tests { } #[test] - fn repartition_ignores_sort_preserving_merge() -> Result<()> { + fn repartition_with_preserving_merge() -> Result<()> { let plan = sort_preserving_merge_exec(parquet_exec()); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - // Expect no repartition of SortPreservingMergeExec + "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; @@ -560,14 +552,13 @@ mod tests { } #[test] - fn repartition_does_not_repartition_transitively() -> Result<()> { + fn repartition_transitively() -> Result<()> { let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - // Expect no repartition of SortPreservingMergeExec - // even though there is a projection exec between it "ProjectionExec: expr=[c1@0 as c1]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 7e701b56dfc53..b6c78b0cf3814 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -754,7 +754,7 @@ async fn cross_join() { assert_eq!(4 * 4, actual.len()); - let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id"; let actual = execute(&ctx, sql).await; assert_eq!(4 * 4, actual.len()); @@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> { "SortExec: [t1_id@0 ASC NULLS LAST]", " CoalescePartitionsExec", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=RoundRobinBatch(2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }", " MemoryExec: partitions=1, partition_sizes=[1]", " MemoryExec: partitions=1, partition_sizes=[1]", @@ -2539,8 +2539,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> { vec![ "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=RoundRobinBatch(2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]", " CoalescePartitionsExec", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]", diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index cb70ab2d0adc1..c9b851c8e9805 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -557,7 +557,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------------+-------------------------------+-------------------------+", @@ -584,7 +584,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------+----------------------------+-------------------------+", @@ -612,7 +612,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------+----------------------------+-------------------------+", @@ -639,7 +639,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+---------------------+-------------------------+", @@ -666,7 +666,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+----------------------------+-------------------------+", @@ -693,7 +693,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+----------------------------+-------------------------+", @@ -720,7 +720,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+---------------------+-------------------------+", @@ -747,7 +747,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+-------------------------+-------------------------+", @@ -774,7 +774,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+----------------------------+-------------------------+", @@ -801,7 +801,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+---------------------+-------------------------+", @@ -828,7 +828,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+-------------------------+-------------------------+", @@ -855,7 +855,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+----------------------------+-------------------------+", diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs index d5519ce2d0024..29856a37b1a96 100644 --- a/datafusion/core/tests/sql/union.rs +++ b/datafusion/core/tests/sql/union.rs @@ -86,7 +86,7 @@ async fn union_schemas() -> Result<()> { SessionContext::with_config(SessionConfig::new().with_information_schema(true)); let result = ctx - .sql("SELECT 1 A UNION ALL SELECT 2") + .sql("SELECT 1 A UNION ALL SELECT 2 order by 1") .await .unwrap() .collect() @@ -105,7 +105,7 @@ async fn union_schemas() -> Result<()> { assert_batches_eq!(expected, &result); let result = ctx - .sql("SELECT 1 UNION SELECT 2") + .sql("SELECT 1 UNION SELECT 2 order by 1") .await .unwrap() .collect() diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index c9ef64212a6b5..6674856db0c42 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1633,7 +1633,10 @@ async fn test_window_frame_nth_value_aggregate() -> Result<()> { #[tokio::test] async fn test_window_agg_sort() -> Result<()> { - let ctx = SessionContext::new(); + // We need to specify the target partition number. + // Otherwise, the default value used may vary on different environment + // with different cpu core number, which may cause the UT failure. + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]", ] }; @@ -1681,10 +1685,11 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", - " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]" + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", + " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]" ] }; From 0466b8d496c962b5737fa8c9716716878c5443c0 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 27 Dec 2022 10:39:11 +0800 Subject: [PATCH 5/7] Refine the UT for the repartition rule by adding the essential BasicEnforcement rule --- .../src/physical_optimizer/repartition.rs | 60 ++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index baa812adee1a4..70ec3f36bf26c 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -239,6 +239,7 @@ mod tests { use crate::config::ConfigOptions; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_optimizer::enforcement::BasicEnforcement; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -289,12 +290,20 @@ mod tests { Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) } - fn sort_exec(input: Arc) -> Arc { + fn sort_exec( + input: Arc, + preserve_partitioning: bool, + ) -> Arc { let sort_exprs = vec![PhysicalSortExpr { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap()) + Arc::new(SortExec::new_with_partitioning( + sort_exprs, + input, + preserve_partitioning, + None, + )) } fn projection_exec(input: Arc) -> Arc { @@ -354,9 +363,16 @@ mod tests { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); // run optimizer - let optimizer = Repartition {}; - let optimized = optimizer - .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?; + let config = SessionConfig::new().with_target_partitions(10); + let optimizers: Vec> = vec![ + Arc::new(Repartition::new()), + // The `BasicEnforcement` is an essential rule to be applied. + // Otherwise, the correctness of the generated optimized plan cannot be guaranteed + Arc::new(BasicEnforcement::new()), + ]; + let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| { + optimizer.optimize(plan, &config).unwrap() + }); // Now format correctly let plan = displayable(optimized.as_ref()).indent().to_string(); @@ -376,6 +392,7 @@ mod tests { let expected = [ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", @@ -391,6 +408,7 @@ mod tests { let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", @@ -407,6 +425,7 @@ mod tests { let expected = &[ "GlobalLimitExec: skip=0, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either @@ -424,6 +443,7 @@ mod tests { let expected = &[ "GlobalLimitExec: skip=5, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either @@ -437,7 +457,7 @@ mod tests { #[test] fn repartition_sorted_limit() -> Result<()> { - let plan = limit_exec(sort_exec(parquet_exec())); + let plan = limit_exec(sort_exec(parquet_exec(), false)); let expected = &[ "GlobalLimitExec: skip=0, fetch=100", @@ -453,7 +473,7 @@ mod tests { #[test] fn repartition_sorted_limit_with_filter() -> Result<()> { - let plan = limit_exec(filter_exec(sort_exec(parquet_exec()))); + let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false))); let expected = &[ "GlobalLimitExec: skip=0, fetch=100", @@ -475,9 +495,11 @@ mod tests { let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: skip=0, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // repartition should happen prior to the filter to maximize parallelism @@ -500,9 +522,11 @@ mod tests { let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: skip=5, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // repartition should happen prior to the filter to maximize parallelism @@ -521,7 +545,8 @@ mod tests { #[test] fn repartition_ignores_union() -> Result<()> { - let plan = Arc::new(UnionExec::new(vec![parquet_exec(); 5])); + let plan: Arc = + Arc::new(UnionExec::new(vec![parquet_exec(); 5])); let expected = &[ "UnionExec", @@ -538,11 +563,12 @@ mod tests { } #[test] - fn repartition_with_preserving_merge() -> Result<()> { + fn repartition_ignores_sort_preserving_merge() -> Result<()> { let plan = sort_preserving_merge_exec(parquet_exec()); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; @@ -552,11 +578,12 @@ mod tests { } #[test] - fn repartition_transitively() -> Result<()> { + fn repartition_does_not_repartition_transitively() -> Result<()> { let plan = sort_preserving_merge_exec(projection_exec(parquet_exec())); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", @@ -568,7 +595,8 @@ mod tests { #[test] fn repartition_transitively_past_sort_with_projection() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()))); + let plan = + sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true)); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", @@ -585,7 +613,8 @@ mod tests { #[test] fn repartition_transitively_past_sort_with_filter() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()))); + let plan = + sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()), true)); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", @@ -602,9 +631,10 @@ mod tests { #[test] fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(projection_exec(filter_exec( - parquet_exec(), - )))); + let plan = sort_preserving_merge_exec(sort_exec( + projection_exec(filter_exec(parquet_exec())), + true, + )); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", From ad2d39e7757a78f659342e8e6281d141b81b48ca Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 27 Dec 2022 11:16:52 +0800 Subject: [PATCH 6/7] Fix UT failure for window function --- datafusion/core/tests/sql/window.rs | 125 ++++++++++++++++------------ 1 file changed, 71 insertions(+), 54 deletions(-) diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 3d84bc68a4467..32438b6108a31 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1783,7 +1783,7 @@ async fn test_window_partition_by_order_by() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1800,10 +1800,11 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1835,7 +1836,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1856,10 +1857,11 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lag2, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as lead1, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lead2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }]", - " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }]", + " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1891,7 +1893,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1908,11 +1910,12 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@1 ASC NULLS LAST]", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@1 ASC NULLS LAST]", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1944,7 +1947,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1962,12 +1965,13 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@2 DESC,c1@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@2 DESC,c1@0 DESC]", ] }; @@ -1999,7 +2003,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_complex_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_null_cases_csv(&ctx).await?; let sql = "SELECT SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, @@ -2050,18 +2054,19 @@ async fn test_window_agg_complex_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11 as e, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as f, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as g, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as h, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as i, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as j, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as k, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@12 as l, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as m, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as n, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@19 as o, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as p, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@2 as a1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@2 as b1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@17 as c1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@9 as d1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@13 as e1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@17 as f1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@9 as g1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as h1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as j1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as k1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as l1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@12 as m1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as n1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as o1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@3 as h11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@3 as j11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@18 as k11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@10 as l11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@14 as m11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@18 as n11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@10 as o11]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", - " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", - " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", + " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", + " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", - " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", - " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]", + " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]", + " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]", ] }; @@ -2079,7 +2084,9 @@ async fn test_window_agg_complex_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> { - let config = SessionConfig::new().with_repartition_windows(false); + let config = SessionConfig::new() + .with_repartition_windows(false) + .with_target_partitions(2); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; let sql = "SELECT @@ -2097,10 +2104,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> let expected = { vec![ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]", ] }; @@ -2132,7 +2140,9 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> #[tokio::test] async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> { - let config = SessionConfig::new().with_repartition_windows(false); + let config = SessionConfig::new() + .with_repartition_windows(false) + .with_target_partitions(2); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; let sql = "SELECT @@ -2150,10 +2160,11 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]", ] }; @@ -2185,7 +2196,9 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> { - let config = SessionConfig::new().with_repartition_windows(false); + let config = SessionConfig::new() + .with_repartition_windows(false) + .with_target_partitions(2); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; let sql = "SELECT c3, @@ -2202,10 +2215,11 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]", - " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]", + " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]", ] }; @@ -2296,7 +2310,9 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> { - let config = SessionConfig::new().with_repartition_windows(false); + let config = SessionConfig::new() + .with_repartition_windows(false) + .with_target_partitions(2); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; let sql = "SELECT c3, @@ -2313,10 +2329,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re let expected = { vec![ "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]", - " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]", + " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]", ] }; From 8eb3f568f40f851b52b572277d2661ed4c6ceed6 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 27 Dec 2022 12:57:52 +0800 Subject: [PATCH 7/7] Fix example testing --- datafusion-examples/examples/custom_datasource.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 68e8f5a546305..c426d9611c608 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec { } fn statistics(&self) -> Statistics { - todo!() + Statistics::default() } }