diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 68e8f5a546305..c426d9611c608 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec { } fn statistics(&self) -> Statistics { - todo!() + Statistics::default() } } diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 4dabb53d24250..976b8c07376ac 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -131,6 +131,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join"; pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str = "datafusion.optimizer.hash_join_single_partition_threshold"; +/// Configuration option "datafusion.optimizer.enable_round_robin_repartition" +pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str = + "datafusion.optimizer.enable_round_robin_repartition"; + /// Definition of a configuration option pub struct ConfigDefinition { /// key used to identifier this configuration option @@ -409,6 +413,11 @@ impl BuiltInConfigs { "The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition", 1024 * 1024, ), + ConfigDefinition::new_bool( + OPT_ENABLE_ROUND_ROBIN_REPARTITION, + "When set to true, the physical plan optimizer will try to add round-robin repartitioning to increase parallelism and leverage more CPU cores", + true, + ), ] } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 978bde2a2ed8b..2164176913b1e 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -76,7 +76,8 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, - OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, + OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS, + OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; use crate::physical_optimizer::enforcement::BasicEnforcement; @@ -100,6 +101,7 @@ use url::Url; use crate::catalog::listing_schema::ListingSchemaProvider; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::memory_pool::MemoryPool; +use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; use crate::physical_optimizer::optimize_sorts::OptimizeSorts; use uuid::Uuid; @@ -1557,11 +1559,47 @@ impl SessionState { ); } - let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![ - Arc::new(AggregateStatistics::new()), - Arc::new(JoinSelection::new()), - ]; + // We need to take care of the rule ordering here, since the rules may influence each other. + let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = + vec![Arc::new(AggregateStatistics::new())]; + // - To increase parallelism, the Repartition rule changes the output partitioning + // of some operators in the plan tree, which influences other rules. + // Therefore, it should be run as early as possible. + // - The rule is optional because + // - it's not used by the distributed engine, Ballista, and + // - it conflicts with parts of the BasicEnforcement, since it + // introduces additional repartitioning while the BasicEnforcement aims at + // reducing unnecessary repartitioning. 
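+ // Illustration only (not part of this change): a user could opt out of this rule + // through the config key above. A minimal sketch, assuming the existing + // `SessionConfig::set_bool` and `SessionContext::with_config` APIs: + // let config = SessionConfig::new() + // .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, false); + // let ctx = SessionContext::with_config(config); 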
+ if config + .config_options + .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) + .unwrap_or_default() + { + physical_optimizers.push(Arc::new(Repartition::new())); + } + // - Currently this rule depends on the partition number to decide whether to change a + // single-node sort into a parallel local sort plus merge. Therefore, it should be run + // after the Repartition. + // - Since it changes the output ordering of some operators, it should be run + // before JoinSelection and BasicEnforcement, which may depend on it. + physical_optimizers.push(Arc::new(GlobalSortSelection::new())); + // Statistics-based join selection changes the Auto mode to a real join implementation, + // such as collect-left, hash join, or (in the future) sort-merge join, which + // influences how the BasicEnforcement decides whether to add additional repartitioning + // and local sorts to meet the distribution and ordering requirements. + // Therefore, it should be run before BasicEnforcement. + physical_optimizers.push(Arc::new(JoinSelection::new())); + // This rule adds the repartition and local sort operators that are essential to + // satisfy the required distribution and ordering. + // Make sure the whole plan tree is already determined before it runs. physical_optimizers.push(Arc::new(BasicEnforcement::new())); + // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements. + // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary. + // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The + // rule below performs this analysis and removes unnecessary `SortExec`s. + physical_optimizers.push(Arc::new(OptimizeSorts::new())); + // This rule does not influence the distribution and ordering of the whole plan tree. + // Therefore, to avoid influencing other rules, it should be run last. if config .config_options .get_bool(OPT_COALESCE_BATCHES) .unwrap_or_default() { @@ -1576,16 +1614,6 @@ impl SessionState { .unwrap(), ))); } - physical_optimizers.push(Arc::new(Repartition::new())); - // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning. - // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here. - physical_optimizers.push(Arc::new(BasicEnforcement::new())); - - // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements. - // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary. - // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The - // rule below performs this analysis and removes unnecessary `SortExec`s. 
- physical_optimizers.push(Arc::new(OptimizeSorts::new())); let mut this = SessionState { session_id, diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 941c5c14148ec..e0d20be16646d 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -23,7 +23,7 @@ use crate::{ physical_optimizer::PhysicalOptimizerRule, physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, - repartition::RepartitionExec, rewrite::TreeNodeRewritable, + repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning, }, }; use std::sync::Arc; @@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches { // See https://github.com/apache/arrow-datafusion/issues/139 let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some() || plan_any.downcast_ref::<HashJoinExec>().is_some() - || plan_any.downcast_ref::<RepartitionExec>().is_some(); + // There is no need to add a CoalesceBatchesExec after a round-robin RepartitionExec + || plan_any + .downcast_ref::<RepartitionExec>() + .map(|repart_exec| { + !matches!( + repart_exec.partitioning().clone(), + Partitioning::RoundRobinBatch(_) + ) + }) + .unwrap_or(false); if wrap_in_coalesce { Ok(Some(Arc::new(CoalesceBatchesExec::new( plan.clone(), diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index 4a496a3ef9f16..30a7961919538 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -30,8 +30,7 @@ use crate::physical_plan::joins::{ use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::rewrite::TreeNodeRewritable; -use crate::physical_plan::sorts::sort::{SortExec, SortOptions}; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::sorts::sort::SortOptions; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; @@ -844,36 +843,6 @@ fn ensure_distribution_and_ordering( if plan.children().is_empty() { return Ok(plan); } - // It's mainly for changing the single node global SortExec to - the SortPreservingMergeExec with multiple local SortExec. 
- // What's more, if limit exists, it can also be pushed down to the local sort - let plan = plan - .as_any() - .downcast_ref::<SortExec>() - .and_then(|sort_exec| { - // There are three situations that there's no need for this optimization - // - There's only one input partition; - // - It's already preserving the partitioning so that it can be regarded as a local sort - // - There's no limit pushed down to the local sort (It's still controversial) - if sort_exec.input().output_partitioning().partition_count() > 1 - && !sort_exec.preserve_partitioning() - && sort_exec.fetch().is_some() - { - let sort = SortExec::new_with_partitioning( - sort_exec.expr().to_vec(), - sort_exec.input().clone(), - true, - sort_exec.fetch(), - ); - Some(Arc::new(SortPreservingMergeExec::new( - sort_exec.expr().to_vec(), - Arc::new(sort), - ))) - } else { - None - } - }) - .map_or(plan, |new_plan| new_plan); let required_input_distributions = plan.required_input_distribution(); let required_input_orderings = plan.required_input_ordering(); diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs new file mode 100644 index 0000000000000..a6bb8229c05bf --- /dev/null +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Select the efficient global sort implementation based on sort details. + +use std::sync::Arc; + +use crate::error::Result; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::rewrite::TreeNodeRewritable; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::ExecutionPlan; +use crate::prelude::SessionConfig; + +/// Currently, for a sort operator, if +/// - there is more than one input partition, and +/// - there is a limit that can be pushed down to each input partition, +/// then a [SortPreservingMergeExec] over partition-local sorts (with the limit pushed down) is preferred; +/// otherwise, the normal global sort [SortExec] is used. +/// Later, more intelligent statistics-based decisions can also be introduced. 
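+/// (As a sketch of the current rewrite: a SortExec with fetch=Some(k) over more than +/// one input partition becomes a SortPreservingMergeExec on top of a partition-preserving +/// local SortExec that keeps fetch=Some(k).) 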
+/// For example, for a small data set, the global sort may be efficient enough +#[derive(Default)] +pub struct GlobalSortSelection {} + +impl GlobalSortSelection { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for GlobalSortSelection { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + _config: &SessionConfig, + ) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_up(&|plan| { + Ok(plan + .as_any() + .downcast_ref::<SortExec>() + .and_then(|sort_exec| { + if sort_exec.input().output_partitioning().partition_count() > 1 + && sort_exec.fetch().is_some() + // A sort that is already preserving the partitioning can be regarded as a local sort, so skip it + && !sort_exec.preserve_partitioning() + { + let sort = SortExec::new_with_partitioning( + sort_exec.expr().to_vec(), + sort_exec.input().clone(), + true, + sort_exec.fetch(), + ); + let global_sort: Arc<dyn ExecutionPlan> = + Arc::new(SortPreservingMergeExec::new( + sort_exec.expr().to_vec(), + Arc::new(sort), + )); + Some(global_sort) + } else { + None + } + })) + }) + } + + fn name(&self) -> &str { + "global_sort_selection" + } + + fn schema_check(&self) -> bool { + false + } +} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 0fd0600fbe678..86ec6f8464f29 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -21,6 +21,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; pub mod enforcement; +pub mod global_sort_selection; pub mod join_selection; pub mod optimize_sorts; pub mod optimizer; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 7bdff91ecd7bb..2d3f7a0e17673 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -143,7 +143,7 @@ impl Repartition { /// is an intervening node that does not `maintain_input_order` /// /// if `can_reorder` is false, means that the output of this node -/// can not be reordered as as something upstream is relying on that order +/// cannot be reordered, as the final output is relying on that order /// /// If 'would_benefit` is false, the upstream operator doesn't /// benefit from additional repartition @@ -161,26 +161,6 @@ fn optimize_partitions( // leaf node - don't replace children plan } else { - let can_reorder_children = - match (plan.relies_on_input_order(), plan.maintains_input_order()) { - (true, _) => { - // `plan` itself relies on the order of its - // children, so don't reorder them! 
false - } - (false, false) => { - // `plan` may reorder the input itself, so no need - // to preserve the order of any children - true - } - (false, true) => { - // `plan` will maintain the order, so we can only - // repartition children if it is ok to reorder the - // output of this node - can_reorder - } - }; - let children = plan .children() .iter() @@ -188,7 +168,7 @@ fn optimize_partitions( optimize_partitions( target_partitions, child.clone(), - can_reorder_children, + can_reorder || child.output_ordering().is_none(), plan.benefits_from_input_partitioning(), ) }) @@ -197,7 +177,7 @@ }; // decide if we should bother trying to repartition the output of this plan - let could_repartition = match new_plan.output_partitioning() { + let mut could_repartition = match new_plan.output_partitioning() { // Apply when underlying node has less than `self.target_partitions` amount of concurrency RoundRobinBatch(x) => x < target_partitions, UnknownPartitioning(x) => x < target_partitions, @@ -206,6 +186,13 @@ Hash(_, _) => false, }; + // There is no need to repartition when the plan returns no more than one row + let stats = new_plan.statistics(); + if stats.is_exact { + could_repartition = could_repartition + && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); + } + if would_benefit && could_repartition && can_reorder { Ok(Arc::new(RepartitionExec::try_new( new_plan, @@ -226,7 +213,12 @@ impl PhysicalOptimizerRule for Repartition { if config.target_partitions() == 1 { Ok(plan) } else { - optimize_partitions(config.target_partitions(), plan, false, false) + optimize_partitions( + config.target_partitions(), + plan.clone(), + plan.output_ordering().is_none(), + false, + ) } } @@ -246,6 +238,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_optimizer::enforcement::BasicEnforcement; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -295,12 +288,20 @@ Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) } - fn sort_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { + fn sort_exec( + input: Arc<dyn ExecutionPlan>, + preserve_partitioning: bool, + ) -> Arc<dyn ExecutionPlan> { let sort_exprs = vec![PhysicalSortExpr { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap()) + Arc::new(SortExec::new_with_partitioning( + sort_exprs, + input, + preserve_partitioning, + None, + )) } fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { @@ -360,9 +361,16 @@ let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); // run optimizer - let optimizer = Repartition {}; - let optimized = optimizer - .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?; + let config = SessionConfig::new().with_target_partitions(10); + let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![ + Arc::new(Repartition::new()), + // The `BasicEnforcement` is an essential rule to be applied. 
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed + Arc::new(BasicEnforcement::new()), + ]; + let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| { + optimizer.optimize(plan, &config).unwrap() + }); // Now format correctly let plan = displayable(optimized.as_ref()).indent().to_string(); @@ -382,6 +390,7 @@ let expected = [ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", @@ -397,6 +406,7 @@ let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", @@ -413,6 +423,7 @@ let expected = &[ "GlobalLimitExec: skip=0, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either @@ -430,6 +441,7 @@ let expected = &[ "GlobalLimitExec: skip=5, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either @@ -443,7 +455,7 @@ #[test] fn repartition_sorted_limit() -> Result<()> { - let plan = limit_exec(sort_exec(parquet_exec())); + let plan = limit_exec(sort_exec(parquet_exec(), false)); let expected = &[ "GlobalLimitExec: skip=0, fetch=100", @@ -459,7 +471,7 @@ #[test] fn repartition_sorted_limit_with_filter() -> Result<()> { - let plan = limit_exec(filter_exec(sort_exec(parquet_exec()))); + let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false))); let expected = &[ "GlobalLimitExec: skip=0, fetch=100", @@ -481,9 +493,11 @@ let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: skip=0, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // repartition should happen prior to the filter to maximize parallelism @@ -506,9 +520,11 @@ let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[]", + "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "GlobalLimitExec: skip=5, fetch=100", + "CoalescePartitionsExec", "LocalLimitExec: fetch=100", "FilterExec: c1@0", // repartition should happen prior to the filter to maximize parallelism @@ -527,7 +543,8 @@ #[test] fn repartition_ignores_union() -> Result<()> { - let plan = Arc::new(UnionExec::new(vec![parquet_exec(); 5])); + let plan: Arc<dyn ExecutionPlan> = + Arc::new(UnionExec::new(vec![parquet_exec(); 5])); let expected = &[ "UnionExec", @@ -549,7 +566,8 @@ let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - // Expect no repartition of SortPreservingMergeExec + "SortExec: [c1@0 ASC]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; @@ -563,9 +581,9 @@ let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - // Expect no repartition of SortPreservingMergeExec - // even though there is a projection exec between it + "SortExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as 
c1]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", ]; @@ -575,7 +593,8 @@ #[test] fn repartition_transitively_past_sort_with_projection() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()))); + let plan = + sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true)); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", @@ -592,7 +611,8 @@ #[test] fn repartition_transitively_past_sort_with_filter() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()))); + let plan = + sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()), true)); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", @@ -609,9 +629,10 @@ #[test] fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { - let plan = sort_preserving_merge_exec(sort_exec(projection_exec(filter_exec( - parquet_exec(), - )))); + let plan = sort_preserving_merge_exec(sort_exec( + projection_exec(filter_exec(parquet_exec())), + true, + )); let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 4751dade1ddab..b71f6739b65b6 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -24,7 +24,7 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning, }; -use arrow::array::NullArray; +use arrow::array::{ArrayRef, NullArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use log::debug; @@ -68,13 +68,25 @@ impl EmptyExec { fn data(&self) -> Result<Vec<RecordBatch>> { let batch = if self.produce_one_row { + let n_field = self.schema.fields.len(); + // hack for https://github.com/apache/arrow-datafusion/pull/3242 + let n_field = if n_field == 0 { 1 } else { n_field }; vec![RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new( - "placeholder", - DataType::Null, - true, - )])), - vec![Arc::new(NullArray::new(1))], + Arc::new(Schema::new( + (0..n_field) + .into_iter() + .map(|i| { + Field::new(format!("placeholder_{}", i), DataType::Null, true) + }) + .collect(), + )), + (0..n_field) + .into_iter() + .map(|_i| { + let ret: ArrayRef = Arc::new(NullArray::new(1)); + ret + }) + .collect(), )?] 
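+ // Illustrative note (for example, with a two-field schema): the branch above + // yields a single one-row batch whose columns, named placeholder_0 and + // placeholder_1, are both all-null. 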
} else { vec![] diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index e16c518b62d94..768c42978936b 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1792,6 +1792,11 @@ impl DefaultPhysicalPlanner { new_plan.schema() ))); } + trace!( + "Optimized physical plan by {}:\n{}\n", + optimizer.name(), + displayable(new_plan.as_ref()).indent() + ); observer(new_plan.as_ref(), optimizer.as_ref()) } debug!( diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 7e701b56dfc53..b6c78b0cf3814 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -754,7 +754,7 @@ async fn cross_join() { assert_eq!(4 * 4, actual.len()); - let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id"; let actual = execute(&ctx, sql).await; assert_eq!(4 * 4, actual.len()); @@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> { "SortExec: [t1_id@0 ASC NULLS LAST]", " CoalescePartitionsExec", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=RoundRobinBatch(2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }", " MemoryExec: partitions=1, partition_sizes=[1]", " MemoryExec: partitions=1, partition_sizes=[1]", @@ -2539,8 +2539,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> { vec![ "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=RoundRobinBatch(2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]", " CoalescePartitionsExec", " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]", diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index 33ef1e51b6649..885adda47bee4 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -557,7 +557,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------------+-------------------------------+-------------------------+", @@ -584,7 +584,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = 
table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------+----------------------------+-------------------------+", @@ -612,7 +612,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------------------+----------------------------+-------------------------+", @@ -639,7 +639,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+---------------------+-------------------------+", @@ -666,7 +666,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+----------------------------+-------------------------+", @@ -693,7 +693,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-------------------------+----------------------------+-------------------------+", @@ -720,7 +720,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+---------------------+-------------------------+", @@ -747,7 +747,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+-------------------------+-------------------------+", @@ -774,7 +774,7 @@ async fn timestamp_coercion() -> 
Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+----------------------------+-------------------------+", @@ -801,7 +801,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+---------------------+-------------------------+", @@ -828,7 +828,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+-------------------------+-------------------------+", @@ -855,7 +855,7 @@ async fn timestamp_coercion() -> Result<()> { ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; - let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+----------------------------+----------------------------+-------------------------+", diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs index d5519ce2d0024..29856a37b1a96 100644 --- a/datafusion/core/tests/sql/union.rs +++ b/datafusion/core/tests/sql/union.rs @@ -86,7 +86,7 @@ async fn union_schemas() -> Result<()> { SessionContext::with_config(SessionConfig::new().with_information_schema(true)); let result = ctx - .sql("SELECT 1 A UNION ALL SELECT 2") + .sql("SELECT 1 A UNION ALL SELECT 2 order by 1") .await .unwrap() .collect() .await .unwrap(); @@ -105,7 +105,7 @@ async fn union_schemas() -> Result<()> { assert_batches_eq!(expected, &result); let result = ctx - .sql("SELECT 1 UNION SELECT 2") + .sql("SELECT 1 UNION SELECT 2 order by 1") .await .unwrap() .collect() diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 41278e1208b78..32438b6108a31 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1633,7 +1633,10 @@ async fn test_window_frame_nth_value_aggregate() -> Result<()> { #[tokio::test] async fn test_window_agg_sort() -> Result<()> { - let ctx = SessionContext::new(); + // We need to specify the target partition number explicitly. + // Otherwise, the default value varies across environments with different + // CPU core counts, which could make this test flaky. 
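+ // (Pinning target_partitions to 2 is also why the expected plans below can + // deterministically show RepartitionExec: partitioning=RoundRobinBatch(2).) 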
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]", ] }; @@ -1681,10 +1685,11 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", - " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", - " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]" + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]", 
+ " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]" ] }; @@ -1778,7 +1783,7 @@ async fn test_window_partition_by_order_by() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1795,10 +1800,11 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1830,7 +1836,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1851,10 +1857,11 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, 
LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lag2, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as lead1, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lead2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }]", - " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(UInt32(NULL)) }]", + " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1886,7 +1893,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1903,11 +1910,12 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@1 ASC NULLS LAST]", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@1 ASC NULLS LAST]", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@0 DESC]", ] }; @@ -1939,7 +1947,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_csv(&ctx).await?; let sql = "SELECT c9, @@ -1957,12 +1965,13 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { let 
expected = { vec![ "ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]", - " GlobalLimitExec: skip=0, fetch=5", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]", - " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", - " SortExec: [c9@2 DESC,c1@0 DESC]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]", + " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]", + " SortExec: [c9@2 DESC,c1@0 DESC]", ] }; @@ -1994,7 +2003,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> { #[tokio::test] async fn test_window_agg_complex_plan() -> Result<()> { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); register_aggregate_null_cases_csv(&ctx).await?; let sql = "SELECT SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, @@ -2045,18 +2054,19 @@ async fn test_window_agg_complex_plan() -> Result<()> { let expected = { vec![ "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER 
BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11 as e, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as f, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as g, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as h, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as i, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as j, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as k, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@12 as l, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as m, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as n, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@19 as o, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as p, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@2 as a1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@2 as b1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@17 as c1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@9 as d1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@13 as e1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@17 as f1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@9 as g1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as h1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as j1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as k1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as l1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@12 as m1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as n1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as o1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@3 as h11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@3 as j11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@18 as k11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@10 as l11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@14 as m11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@18 as n11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@10 as o11]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
]
};
@@ -2074,7 +2084,9 @@ async fn test_window_agg_complex_plan() -> Result<()>
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
@@ -2092,10 +2104,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -2127,7 +2140,9 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
#[tokio::test]
async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
@@ -2145,10 +2160,11 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -2180,7 +2196,9 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT c3,
@@ -2197,10 +2215,11 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]",
- " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
]
};
@@ -2291,7 +2310,9 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT c3,
@@ -2308,10 +2329,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
let expected = {
vec![
"ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
]
};
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 3ca74588d54cc..da2db8de64def 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -120,6 +120,7 @@ datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
+datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.max_passes 3
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 039981338b6d3..1c5f086563888 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| then extract the hour. |
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
+| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan |
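
Usage note: the new flag is an ordinary boolean config entry, so it can be switched off per session when the round robin rule is unwanted, e.g. in a Ballista-style distributed setup. Below is a minimal sketch, not part of the patch, assuming the `SessionConfig::set_bool` builder available in this era of DataFusion together with the `OPT_ENABLE_ROUND_ROBIN_REPARTITION` constant added above; the helper name `context_without_round_robin` is illustrative only.

```rust
// Minimal sketch (assumptions noted above): build a session that opts out
// of the round robin repartition rule introduced by this patch.
use datafusion::config::OPT_ENABLE_ROUND_ROBIN_REPARTITION;
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_without_round_robin() -> SessionContext {
    let config = SessionConfig::new()
        // Same builder the tests above call; this value is what surfaces as
        // `RoundRobinBatch(2)` in the expected plans when the rule is enabled.
        .with_target_partitions(2)
        // Assumed setter from this era's SessionConfig: writes a boolean
        // config entry by key, here disabling the repartition rule.
        .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, false);
    SessionContext::with_config(config)
}
```

With the flag off, plans keep their single-partition shape (no `RepartitionExec: partitioning=RoundRobinBatch(n)` node), which is why the tests above pin `with_target_partitions(2)`: it makes the inserted `RepartitionExec` deterministic. When information_schema is enabled, the entry is also visible in `SHOW ALL` output, which is what the sqllogictest change asserts.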