39 changes: 19 additions & 20 deletions datafusion/core/src/execution/context.rs
@@ -67,7 +67,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::config::ConfigOptions;
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
@@ -90,9 +90,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion_optimizer::OptimizerConfig;
use uuid::Uuid;

@@ -1457,37 +1457,36 @@ impl SessionState {
// output partitioning of some operators in the plan tree, which will influence
// other rules. Therefore, it should run as soon as possible. It is optional because:
// - It's not used for the distributed engine, Ballista.
// - It's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
// - It conflicts with some parts of EnforceDistribution, since it will
// introduce additional repartitioning while EnforceDistribution aims to
// reduce unnecessary repartitioning.
Arc::new(Repartition::new()),
// - Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
// should run after the Repartition.
// - Since it will change the output ordering of some operators, it should run
// before JoinSelection and BasicEnforcement, which may depend on that.
// before JoinSelection and EnforceSorting, which may depend on that.
Arc::new(GlobalSortSelection::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should run before BasicEnforcement.
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
// (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
Arc::new(PipelineFixer::new()),
// BasicEnforcement is for adding essential repartition and local sorting operators
// to satisfy the required distribution and local sort requirements.
// Please make sure that the whole plan tree is determined.
Arc::new(BasicEnforcement::new()),
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
// These cases typically arise when we have reversible window expressions or deep subqueries.
// The rule below performs this analysis and removes unnecessary sorts.
Arc::new(OptimizeSorts::new()),
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
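(Editorial aside, not part of the diff.) The comments above pin down a strict ordering between the physical optimizer rules. A minimal sketch of applying such a rule list in sequence, mirroring the fold used in the repartition.rs tests further down; `physical_plan` and `config` are assumed to already exist, and several rules in the real default list are omitted:

// Editorial sketch only: apply the physical optimizer rules in the documented order.
// `physical_plan: Arc<dyn ExecutionPlan>` and `config` are assumed from surrounding
// session setup (hypothetical here).
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
    Arc::new(Repartition::new()),
    Arc::new(GlobalSortSelection::new()),
    Arc::new(JoinSelection::new()),
    Arc::new(PipelineFixer::new()),
    // EnforceDistribution must precede EnforceSorting: the RepartitionExec nodes it
    // inserts can break orderings that were previously satisfied.
    Arc::new(EnforceDistribution::new()),
    Arc::new(EnforceSorting::new()),
    Arc::new(CoalesceBatches::new()),
];
// Run each rule over the plan in order, threading the result through.
let optimized = rules
    .into_iter()
    .try_fold(physical_plan, |plan, rule| rule.optimize(plan, &config))?;

The repartition.rs test changes below restate the same requirement: after Repartition, both EnforceDistribution and EnforceSorting must run for the optimized plan to be correct.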
datafusion/core/src/physical_optimizer/dist_enforcement.rs (renamed from enforcement.rs)
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.

//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
//! EnforceDistribution optimizer rule inspects the physical plan with respect
//! to distribution requirements and adds [RepartitionExec]s to satisfy them
//! when necessary.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,25 +45,25 @@ use datafusion_physical_expr::{
use std::collections::HashMap;
use std::sync::Arc;

/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
/// The EnforceDistribution rule ensures that distribution requirements are met
/// in the strictest way. It might add additional [RepartitionExec] to the plan tree
/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
///
/// For example, for a HashJoin with keys (a, b, c), the required Distribution(a, b, c) can be satisfied by
/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
///
/// This rule only chooses the exact match and satisfies the Distribution(a, b, c) with a HashPartition(a, b, c).
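///
/// Editorial illustration (hypothetical plan, not part of this diff): if a join requires
/// `Distribution::HashPartitioned([a, b, c])` while its input is only hash-partitioned
/// on `(a, b)`, this rule still inserts a `RepartitionExec` with
/// `Partitioning::Hash([a, b, c], n)` rather than accepting the coarser `(a, b)`
/// partitioning, trading possible extra repartitioning for protection against skewed
/// join keys.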
#[derive(Default)]
pub struct BasicEnforcement {}
pub struct EnforceDistribution {}

impl BasicEnforcement {
impl EnforceDistribution {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for BasicEnforcement {
impl PhysicalOptimizerRule for EnforceDistribution {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
@@ -81,24 +80,21 @@ impl PhysicalOptimizerRule for EnforceDistribution {
} else {
plan
};
// Distribution and Ordering enforcement need to be applied bottom-up.
// Distribution enforcement needs to be applied bottom-up.
new_plan.transform_up(&{
|plan| {
let adjusted = if !top_down_join_key_reordering {
reorder_join_keys_to_inputs(plan)?
} else {
plan
};
Ok(Some(ensure_distribution_and_ordering(
adjusted,
target_partitions,
)?))
Ok(Some(ensure_distribution(adjusted, target_partitions)?))
}
})
}

fn name(&self) -> &str {
"BasicEnforcement"
"EnforceDistribution"
}

fn schema_check(&self) -> bool {
@@ -829,10 +825,11 @@ fn new_join_conditions(
new_join_on
}

/// Within this function, it checks whether we need to add additional plan operators
/// of data exchanging and data ordering to satisfy the required distribution and ordering.
/// And we should avoid to manually add plan operators of data exchanging and data ordering in other places
fn ensure_distribution_and_ordering(
/// This function checks whether we need to add additional data exchange
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
fn ensure_distribution(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
@@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
}

let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
assert_eq!(children.len(), required_input_distributions.len());
assert_eq!(children.len(), required_input_orderings.len());

// Add RepartitionExec to guarantee output partitioning
let children = children
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.into_iter()
.zip(required_input_distributions.into_iter())
.map(|(child, required)| {
@@ -870,24 +865,8 @@ };
};
new_child
}
});

// Add local SortExec to guarantee output ordering within each partition
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.zip(required_input_orderings.into_iter())
.map(|(child_result, required)| {
let child = child_result?;
if ordering_satisfy(child.output_ordering(), required, || {
child.equivalence_properties()
}) {
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
add_sort_above_child(&child, sort_expr)
}
})
.collect();

with_new_children_if_necessary(plan, new_children?)
}
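(Editorial aside, not part of the diff.) With ordering concerns removed, the narrowed `ensure_distribution` can be driven directly, as the tests below do; `left`, `right`, `join_on`, and `hash_join_exec` are assumed from the test helpers:

// Enforce only the distribution requirements of a hash join, with 10 target partitions.
let join = ensure_distribution(
    hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
    10,
)?;
// Local sort requirements are no longer handled here; run the EnforceSorting rule
// afterwards if the plan has ordering requirements to satisfy.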

@@ -979,6 +958,7 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -1136,8 +1116,15 @@ mod tests {
config.execution.target_partitions = 10;

// run optimizer
let optimizer = BasicEnforcement {};
let optimizer = EnforceDistribution {};
let optimized = optimizer.optimize($PLAN, &config)?;
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
// `EnforceSorting` and `EnforceDistribution`.
// TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create
// new tests for the cascade.
let optimizer = EnforceSorting {};
Contributor
it is slightly confusing that the tests in EnforceDistribution also rely on EnforceSorting though I understand the reason for this given they both started in the same pass

Maybe we can add a comment like:

// These tests also ensure `EnforceSorting` because they were written prior to the
// separation of `EnforceSorting` and `EnforceDistribution`

Or something like that to give future readers a clue about the rationale for this seemingly strange choice

Contributor
I agree -- just sent a commit with a NOTE and a TODO.

let optimized = optimizer.optimize(optimized, &config)?;

// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -1656,7 +1643,7 @@ mod tests {
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
];
let bottom_left_join = ensure_distribution_and_ordering(
let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
)?;
@@ -1686,7 +1673,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
let bottom_right_join = ensure_distribution_and_ordering(
let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
@@ -1775,7 +1762,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
];
let bottom_left_join = ensure_distribution_and_ordering(
let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
)?;
@@ -1805,7 +1792,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
let bottom_right_join = ensure_distribution_and_ordering(
let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -20,14 +20,14 @@

pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod dist_enforcement;
pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
pub mod pipeline_checker;
pub mod pruning;
pub mod repartition;
pub mod sort_enforcement;
mod utils;

pub mod pipeline_fixer;
10 changes: 7 additions & 3 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,7 +241,8 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -370,9 +371,12 @@
// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
// The `BasicEnforcement` is an essential rule to be applied.
// EnforceDistribution is an essential rule to be applied.
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
Arc::new(BasicEnforcement::new()),
Arc::new(EnforceDistribution::new()),
// EnforceSorting is an essential rule to be applied.
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
Arc::new(EnforceSorting::new()),
];
let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
optimizer.optimize(plan, &config).unwrap()