Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
56db313
Sort Removal rule initial commit
mustafasrepo Dec 13, 2022
343fafb
move ordering satisfy to the util
mustafasrepo Dec 13, 2022
dfb6683
update test and change repartition maintain_input_order impl
mustafasrepo Dec 13, 2022
0a42315
simplifications
mustafasrepo Dec 13, 2022
c2a1593
partition by refactor (#28)
mustafasrepo Dec 15, 2022
bf7bd11
Add naive sort removal rule
mustafasrepo Dec 15, 2022
4cb7258
Add todo for finer Sort removal handling
mustafasrepo Dec 15, 2022
dbc30ab
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 16, 2022
aa4f739
Refactors to improve readability and reduce nesting
ozankabak Dec 19, 2022
6309b01
reverse expr returns Option (no need for support check)
mustafasrepo Dec 19, 2022
d0d06de
Merge branch 'master' into feature/sort_removal_rule
mustafasrepo Dec 20, 2022
91629b8
fix tests
mustafasrepo Dec 20, 2022
ae451a4
partition by and order by no longer ends up at the same window group
mustafasrepo Dec 20, 2022
94c784b
Bounded window exec
mustafasrepo Dec 15, 2022
7c4bcb9
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 20, 2022
0068566
solve merge problems
mustafasrepo Dec 20, 2022
0e73945
Refactor to simplify code
ozankabak Dec 21, 2022
4f145dd
Better comments, change method names
ozankabak Dec 21, 2022
c63057f
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 21, 2022
838972c
resolve merge conflicts
mustafasrepo Dec 21, 2022
6d9a876
Merge branch 'apache:master' into feature/sort_removal_rule
mustafasrepo Dec 22, 2022
f2c7286
Merge branch 'apache:master' into feature/bounded_window_exec
mustafasrepo Dec 22, 2022
6b07621
Resolve errors introduced by syncing
mustafasrepo Dec 22, 2022
d62bbdc
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 22, 2022
a2d2229
remove set_state, make ntile debuggable
mustafasrepo Dec 22, 2022
63d77a6
remove locked flag
mustafasrepo Dec 22, 2022
28075e6
Merge branch 'feature/sort_removal_rule' of https://github.com/synnad…
mustafasrepo Dec 23, 2022
ba388cb
address reviews
mustafasrepo Dec 23, 2022
572a1a4
address reviews
mustafasrepo Dec 23, 2022
fa30d91
Merge branch 'feature/sort_removal_rule' into feature/bounded_window_…
mustafasrepo Dec 23, 2022
af60aa9
Resolve merge conflict
mustafasrepo Dec 23, 2022
ca711e4
address reviews
mustafasrepo Dec 23, 2022
eb97a5c
address reviews
mustafasrepo Dec 23, 2022
36394c0
address reviews
mustafasrepo Dec 26, 2022
8b3d37f
Add new tests
mustafasrepo Dec 26, 2022
25af93c
Update tests
mustafasrepo Dec 26, 2022
2b2b376
add support for bounded min max
mustafasrepo Dec 26, 2022
670fe32
address reviews
mustafasrepo Dec 26, 2022
3ea9eed
rename sort rule
mustafasrepo Dec 27, 2022
3892394
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Dec 27, 2022
ca666e9
Resolve merge conflicts
mustafasrepo Dec 27, 2022
73d99c6
refactors
mustafasrepo Dec 27, 2022
09c1942
Update fuzzy tests + minor changes
mustafasrepo Dec 27, 2022
39564d4
Simplify code and improve comments
ozankabak Dec 29, 2022
9f73ba7
Merge branch 'master' into feature/bounded_window_exec
ozankabak Dec 29, 2022
8ac3847
Fix imports, make create_schema more functional
ozankabak Dec 29, 2022
3349edf
address reviews
mustafasrepo Dec 29, 2022
701c43e
undo yml change
mustafasrepo Dec 29, 2022
3b523b4
minor change to pass from CI
mustafasrepo Dec 29, 2022
28773ab
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Dec 29, 2022
15d416a
resolve merge conflicts
mustafasrepo Dec 29, 2022
9ceb137
rename some members
mustafasrepo Dec 29, 2022
8b9aa6f
Move rule to physical planning
mustafasrepo Dec 30, 2022
e13d6e0
Minor stylistic/comment changes
ozankabak Dec 30, 2022
93b8d80
Merge branch 'master' into feature/bounded_window_exec
ozankabak Dec 30, 2022
d97a1ad
Simplify batch-merging utility functions
ozankabak Dec 31, 2022
29007ea
Remove unnecessary clones, simplify code
ozankabak Jan 2, 2023
a5019c3
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Jan 3, 2023
ac2f248
update cargo lock file
mustafasrepo Jan 3, 2023
0ca3889
address reviews
mustafasrepo Jan 4, 2023
516e512
Merge branch 'master' into feature/bounded_window_exec
mustafasrepo Jan 4, 2023
1e764dd
update comments
mustafasrepo Jan 4, 2023
28d68bb
resolve linter error
mustafasrepo Jan 4, 2023
c4b61c5
Tidy up comments after final review
ozankabak Jan 4, 2023
8463418
Minor: add link to upstream ticket
alamb Jan 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
- name: Check Cargo.lock for datafusion-cli
run: |
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
# and check in the updated Cargo.lock file.
cargo check --manifest-path datafusion-cli/Cargo.toml --locked

# test the crate
Expand Down
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ flate2 = { version = "1.0.24", optional = true }
futures = "0.3"
glob = "0.3.0"
hashbrown = { version = "0.13", features = ["raw"] }
indexmap = "1.9.2"
itertools = "0.10"
lazy_static = { version = "^1.4.0" }
log = "^0.4"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ mod tests {
path: Path::new("/file"),
is_dir: true,
};
assert!(table_path.to_string().expect("table path").ends_with("/"));
assert!(table_path.to_string().expect("table path").ends_with('/'));
}

#[test]
Expand All @@ -246,6 +246,6 @@ mod tests {
path: Path::new("/file"),
is_dir: false,
};
assert!(!table_path.to_string().expect("table_path").ends_with("/"));
assert!(!table_path.to_string().expect("table_path").ends_with('/'));
}
}
43 changes: 21 additions & 22 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,44 +1453,43 @@ impl SessionState {
// We need to take care of the rule ordering. They may influence each other.
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
// - In order to increase the parallelism, it will change the output partitioning
// of some operators in the plan tree, which will influence other rules.
// Therefore, it should be run as soon as possible.
// - The reason to make it optional is
// - it's not used for the distributed engine, Ballista.
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
// In order to increase the parallelism, the Repartition rule will change the
// output partitioning of some operators in the plan tree, which will influence
// other rules. Therefore, it should run as soon as possible. It is optional because:
// - It's not used for the distributed engine, Ballista.
// - It's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
Arc::new(Repartition::new()),
//- Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, it should be run
// after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
// - Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
// should run after the Repartition.
// - Since it will change the output ordering of some operators, it should run
// before JoinSelection and BasicEnforcement, which may depend on that.
Arc::new(GlobalSortSelection::new()),
// Statistics-base join selection will change the Auto mode to real join implementation,
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
// Therefore, it should run before BasicEnforcement.
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
Arc::new(PipelineFixer::new()),
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// BasicEnforcement is for adding essential repartition and local sorting operators
// to satisfy the required distribution and local sort requirements.
// Please make sure that the whole plan tree is determined.
Arc::new(BasicEnforcement::new()),
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
// These cases typically arise when we have reversible window expressions or deep subqueries.
// The rule below performs this analysis and removes unnecessary sorts.
Arc::new(OptimizeSorts::new()),
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
Expand Down
64 changes: 47 additions & 17 deletions datafusion/core/src/physical_optimizer/optimize_sorts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use crate::physical_optimizer::utils::{
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::izip;
use std::iter::zip;
Expand Down Expand Up @@ -181,17 +182,32 @@ fn optimize_sorts(
sort_exec.input().equivalence_properties()
}) {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
} else if let Some(window_agg_exec) =
}
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
else if let Some(exec) =
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
{
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
if let Some(res) = analyze_window_sort_removal(
window_agg_exec,
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(result));
}
} else if let Some(exec) = requirements
.plan
.as_any()
.downcast_ref::<BoundedWindowAggExec>()
{
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(res));
return Ok(Some(result));
}
}
// TODO: Once we can ensure that required ordering information propagates with
Expand Down Expand Up @@ -273,9 +289,11 @@ fn analyze_immediate_sort_removal(
Ok(None)
}

/// Analyzes a `WindowAggExec` to determine whether it may allow removing a sort.
/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
/// it may allow removing a sort.
fn analyze_window_sort_removal(
window_agg_exec: &WindowAggExec,
window_expr: &[Arc<dyn WindowExpr>],
partition_keys: &[Arc<dyn PhysicalExpr>],
sort_exec: &SortExec,
sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Option<PlanWithCorrespondingSort>> {
Expand All @@ -289,7 +307,6 @@ fn analyze_window_sort_removal(
// If there is no physical ordering, there is no way to remove a sort -- immediately return:
return Ok(None);
};
let window_expr = window_agg_exec.window_expr();
let (can_skip_sorting, should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
Expand All @@ -308,13 +325,26 @@ fn analyze_window_sort_removal(
if let Some(window_expr) = new_window_expr {
let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
let new_schema = new_child.schema();
let new_plan = Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
window_agg_exec.partition_keys.clone(),
Some(physical_ordering.to_vec()),
)?);

let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
// If all window exprs can run with bounded memory choose bounded window variant
let new_plan = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering.to_vec()),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering.to_vec()),
)?) as _
};
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mod sql_tests {
let case = QueryCase {
sql: "SELECT
c9,
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
Expand All @@ -325,7 +325,7 @@ mod sql_tests {
let case = QueryCase {
sql: "SELECT
c9,
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
Expand Down
46 changes: 46 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -95,6 +96,51 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
.map_err(DataFusionError::from)
}

/// Merge two record batch references into a single record batch.
/// All the record batches inside the slice must have the same schema.
///
/// Can use concat_batches after https://github.com/apache/arrow-rs/issues/3456
pub fn merge_batches(
first: &RecordBatch,
second: &RecordBatch,
schema: SchemaRef,
) -> ArrowResult<RecordBatch> {
let columns = (0..schema.fields.len())
.map(|index| {
let first_column = first.column(index).as_ref();
let second_column = second.column(index).as_ref();
concat(&[first_column, second_column])
})
.collect::<ArrowResult<Vec<_>>>()?;
RecordBatch::try_new(schema, columns)
}

/// Merge a slice of record batch references into a single record batch, or
/// return None if the slice itself is empty. All the record batches inside the
/// slice must have the same schema.
///
/// Can use concat_batches after https://github.com/apache/arrow-rs/issues/3456
pub fn merge_multiple_batches(
batches: &[&RecordBatch],
schema: SchemaRef,
) -> ArrowResult<Option<RecordBatch>> {
Ok(if batches.is_empty() {
None
} else {
let columns = (0..schema.fields.len())
.map(|index| {
concat(
&batches
.iter()
.map(|batch| batch.column(index).as_ref())
.collect::<Vec<_>>(),
)
})
.collect::<ArrowResult<Vec<_>>>()?;
Some(RecordBatch::try_new(schema, columns)?)
})
}

/// Recursively builds a list of files in a directory with a given extension
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
Expand Down
31 changes: 23 additions & 8 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{joins::utils as join_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use crate::{
Expand Down Expand Up @@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?))
let uses_bounded_memory = window_expr
.iter()
.all(|e| e.uses_bounded_memory());
// If all window expressions can run with bounded memory,
// choose the bounded window variant:
Ok(if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
})
}
LogicalPlan::Aggregate(Aggregate {
input,
Expand Down
Loading