Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
56db313
Sort Removal rule initial commit
mustafasrepo Dec 13, 2022
343fafb
move ordering satisfy to the util
mustafasrepo Dec 13, 2022
dfb6683
update test and change repartition maintain_input_order impl
mustafasrepo Dec 13, 2022
0a42315
simplifications
mustafasrepo Dec 13, 2022
c2a1593
partition by refactor (#28)
mustafasrepo Dec 15, 2022
bf7bd11
Add naive sort removal rule
mustafasrepo Dec 15, 2022
4cb7258
Add todo for finer Sort removal handling
mustafasrepo Dec 15, 2022
dbc30ab
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 16, 2022
aa4f739
Refactors to improve readability and reduce nesting
ozankabak Dec 19, 2022
6309b01
reverse expr returns Option (no need for support check)
mustafasrepo Dec 19, 2022
d0d06de
Merge branch 'master' into feature/sort_removal_rule
mustafasrepo Dec 20, 2022
91629b8
fix tests
mustafasrepo Dec 20, 2022
ae451a4
partition by and order by no longer ends up at the same window group
mustafasrepo Dec 20, 2022
0e73945
Refactor to simplify code
ozankabak Dec 21, 2022
4f145dd
Better comments, change method names
ozankabak Dec 21, 2022
6d9a876
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 22, 2022
6b07621
Resolve errors introduced by syncing
mustafasrepo Dec 22, 2022
28075e6
Merge branch 'feature/sort_removal_rule' of https://github.com/synnad…
mustafasrepo Dec 23, 2022
ba388cb
address reviews
mustafasrepo Dec 23, 2022
572a1a4
address reviews
mustafasrepo Dec 23, 2022
717a68b
Merge branch 'master' into feature/sort_removal_rule
ozankabak Dec 26, 2022
ef92d46
Rename to less confusing OptimizeSorts
ozankabak Dec 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod stats;
mod table_reference;
pub mod test_util;

use arrow::compute::SortOptions;
pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
Expand Down Expand Up @@ -63,3 +64,12 @@ macro_rules! downcast_value {
})?
}};
}

/// Returns the `SortOptions` describing the opposite ordering of `options`.
///
/// Both flags are inverted: `descending` is flipped and `nulls_first` is
/// flipped, so the result is never equal to the input.
// TODO: If/when arrow supports `!` for `SortOptions`, we can remove this.
pub fn reverse_sort_options(options: SortOptions) -> SortOptions {
    // Pull the two flags out by name so each one is negated exactly once.
    let SortOptions {
        descending,
        nulls_first,
    } = options;
    SortOptions {
        descending: !descending,
        nulls_first: !nulls_first,
    }
}
7 changes: 7 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ use url::Url;
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -1580,6 +1581,12 @@ impl SessionState {
// To make sure the SinglePartition requirement is satisfied, run BasicEnforcement again. Originally, AddCoalescePartitionsExec was run here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));

let mut this = SessionState {
session_id,
optimizer: Optimizer::new(),
Expand Down
76 changes: 9 additions & 67 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//!
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -29,8 +30,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
Expand All @@ -42,9 +42,8 @@ use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
normalize_sort_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr,
PhysicalSortExpr,
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
PhysicalExpr,
};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -919,71 +918,14 @@ fn ensure_distribution_and_ordering(
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
Ok(Arc::new(SortExec::new_with_partitioning(
sort_expr, child, true, None,
)) as Arc<dyn ExecutionPlan>)
add_sort_above_child(&child, sort_expr)
}
})
.collect();

with_new_children_if_necessary(plan, new_children?)
}

/// Checks whether the `provided` sort expressions satisfy the `required` ones.
///
/// * No requirement (`required` is `None`) is always satisfied.
/// * A requirement with nothing provided is never satisfied.
/// * Otherwise `required` must not be longer than `provided`, and each
///   required expression must equal the provided expression at the same
///   position — either directly, or after both sides are normalized with the
///   equivalence classes obtained from `equal_properties`.
///
/// `equal_properties` is only invoked when the direct comparison fails, so
/// the equivalence properties are not computed when they are not needed.
fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
    provided: Option<&[PhysicalSortExpr]>,
    required: Option<&[PhysicalSortExpr]>,
    equal_properties: F,
) -> bool {
    let (given, wanted) = match (provided, required) {
        (_, None) => return true,
        (None, Some(_)) => return false,
        (Some(given), Some(wanted)) => (given, wanted),
    };

    // More required sort keys than provided ones can never match.
    if wanted.len() > given.len() {
        return false;
    }

    // Fast path: position-wise equality without consulting equivalences.
    let direct_match = wanted.iter().zip(given.iter()).all(|(req, have)| req == have);
    if direct_match {
        return true;
    }

    // Slow path: retry after normalizing both sides. With no equivalence
    // classes available, normalization cannot help, so the mismatch stands.
    let eq_properties = equal_properties();
    let eq_classes = eq_properties.classes();
    if eq_classes.is_empty() {
        return false;
    }

    wanted.iter().zip(given.iter()).all(|(req, have)| {
        let normalized_req =
            normalize_sort_expr_with_equivalence_properties(req.clone(), eq_classes);
        let normalized_have =
            normalize_sort_expr_with_equivalence_properties(have.clone(), eq_classes);
        normalized_req == normalized_have
    })
}

#[derive(Debug, Clone)]
struct JoinKeyPairs {
left_keys: Vec<Arc<dyn PhysicalExpr>>,
Expand Down Expand Up @@ -1063,10 +1005,10 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_expr::logical_plan::JoinType;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, expressions::Column,
PhysicalExpr, PhysicalSortExpr,
};
use std::ops::Deref;

use super::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
pub mod pruning;
pub mod repartition;
Expand Down
Loading