Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec {
}

fn statistics(&self) -> Statistics {
todo!()
Statistics::default()
}
}
9 changes: 9 additions & 0 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
"datafusion.optimizer.hash_join_single_partition_threshold";

/// Configuration option "datafusion.execution.round_robin_repartition"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str =
"datafusion.optimizer.enable_round_robin_repartition";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
Expand Down Expand Up @@ -409,6 +413,11 @@ impl BuiltInConfigs {
"The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
1024 * 1024,
),
ConfigDefinition::new_bool(
OPT_ENABLE_ROUND_ROBIN_REPARTITION,
"When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores",
true,
),
]
}
}
Expand Down
58 changes: 43 additions & 15 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
Expand All @@ -100,6 +101,7 @@ use url::Url;
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use uuid::Uuid;

Expand Down Expand Up @@ -1557,11 +1559,47 @@ impl SessionState {
);
}

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(JoinSelection::new()),
];
// We need to take care of the rule ordering. They may influence each other.
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
vec![Arc::new(AggregateStatistics::new())];
// - In order to increase the parallelism, it will change the output partitioning
// of some operators in the plan tree, which will influence other rules.
// Therefore, it should be run as soon as possible.
// - The reason to make it optional is
// - it's not used for the distributed engine, Ballista.
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config
.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(Repartition::new()));
}
//- Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, it should be run
// after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
// before JoinSelection and BasicEnforcement, which may depend on that.
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
// Statistics-base join selection will change the Auto mode to real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
physical_optimizers.push(Arc::new(JoinSelection::new()));
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// Please make sure that the whole plan tree is determined.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
Expand All @@ -1576,16 +1614,6 @@ impl SessionState {
.unwrap(),
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
// Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
// To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));

let mut this = SessionState {
session_id,
Expand Down
13 changes: 11 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, rewrite::TreeNodeRewritable,
repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
},
};
use std::sync::Arc;
Expand Down Expand Up @@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/arrow-datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 makes sense to me -- to confirm the rationale for this change is that RoundRobinRepartition simply shuffles batches around to different partitions, but doesn't actually change (grow or shrink) the actual RecordBatches

|| plan_any
.downcast_ref::<RepartitionExec>()
.map(|repart_exec| {
!matches!(
repart_exec.partitioning().clone(),
Partitioning::RoundRobinBatch(_)
)
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
Expand Down
33 changes: 1 addition & 32 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
Expand Down Expand Up @@ -844,36 +843,6 @@ fn ensure_distribution_and_ordering(
if plan.children().is_empty() {
return Ok(plan);
}
// It's mainly for changing the single node global SortExec to
// the SortPreservingMergeExec with multiple local SortExec.
// What's more, if limit exists, it can also be pushed down to the local sort
let plan = plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
// There are three situations that there's no need for this optimization
// - There's only one input partition;
// - It's already preserving the partitioning so that it can be regarded as a local sort
// - There's no limit pushed down to the local sort (It's still controversial)
if sort_exec.input().output_partitioning().partition_count() > 1
&& !sort_exec.preserve_partitioning()
&& sort_exec.fetch().is_some()
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
true,
sort_exec.fetch(),
);
Some(Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
Arc::new(sort),
)))
} else {
None
}
})
.map_or(plan, |new_plan| new_plan);

let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
Expand Down
89 changes: 89 additions & 0 deletions datafusion/core/src/physical_optimizer/global_sort_selection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Select the efficient global sort implementation based on sort details.
use std::sync::Arc;

use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::SessionConfig;

/// Currently for a sort operator, if
/// - there are more than one input partitions
/// - and there's some limit which can be pushed down to each of its input partitions
/// then [SortPreservingMergeExec] with local sort with a limit pushed down will be preferred;
/// Otherwise, the normal global sort [SortExec] will be used.
/// Later more intelligent statistics-based decision can also be introduced.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also #4691

/// For example, for a small data set, the global sort may be efficient enough
#[derive(Default)]
pub struct GlobalSortSelection {}

impl GlobalSortSelection {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &SessionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
if sort_exec.input().output_partitioning().partition_count() > 1
&& sort_exec.fetch().is_some()
// It's already preserving the partitioning so that it can be regarded as a local sort
&& !sort_exec.preserve_partitioning()
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
true,
sort_exec.fetch(),
);
let global_sort: Arc<dyn ExecutionPlan> =
Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
Arc::new(sort),
));
Some(global_sort)
} else {
None
}
}))
})
}

fn name(&self) -> &str {
"global_sort_selection"
}

fn schema_check(&self) -> bool {
false
}
}
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
Expand Down
Loading