Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
#[cfg(test)]
mod tests {
use datafusion::{
datasource::listing::ListingTableUrl, logical_expr::{LogicalPlan, DdlStatement},
datasource::listing::ListingTableUrl,
logical_expr::{DdlStatement, LogicalPlan},
prelude::SessionContext,
};

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("ParquetExec: limit=Some(10)"));
assert!(formatted.contains("ParquetExec: "));
assert!(formatted.contains("projection=[bool_col, int_col], limit=10"));
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);

Expand Down Expand Up @@ -362,7 +362,7 @@ mod tests {
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];

assert_optimized!(expected, plan);
Expand Down Expand Up @@ -391,7 +391,7 @@ mod tests {
// should combine the Partial/Final AggregateExecs to tne Single AggregateExec
let expected = &[
"AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];

assert_optimized!(expected, plan);
Expand Down Expand Up @@ -425,7 +425,7 @@ mod tests {
// should combine the Partial/Final AggregateExecs to tne Single AggregateExec
let expected = &[
"AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];

assert_optimized!(expected, plan);
Expand Down
108 changes: 54 additions & 54 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs

Large diffs are not rendered by default.

84 changes: 42 additions & 42 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -592,7 +592,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -610,7 +610,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -628,7 +628,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -644,7 +644,7 @@ mod tests {
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -662,7 +662,7 @@ mod tests {
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -687,7 +687,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -714,7 +714,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -730,11 +730,11 @@ mod tests {
let expected = &[
"UnionExec",
// Expect no repartition of ParquetExec
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -751,7 +751,7 @@ mod tests {
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -766,7 +766,7 @@ mod tests {
// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan);
Expand All @@ -783,8 +783,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan);
Expand All @@ -801,7 +801,7 @@ mod tests {
// should not repartition as doing so destroys the necessary sort order
let expected = &[
"SortRequiredExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan);
Expand Down Expand Up @@ -830,11 +830,11 @@ mod tests {
"UnionExec",
// union input 1: no repartitioning
"SortRequiredExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
// union input 2: should repartition
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -852,7 +852,7 @@ mod tests {
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -869,7 +869,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan);
Expand All @@ -884,7 +884,7 @@ mod tests {
let expected = &[
"SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -902,7 +902,7 @@ mod tests {
"SortExec: expr=[c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -924,7 +924,7 @@ mod tests {
"FilterExec: c1@0",
// repartition is lowest down
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -939,7 +939,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
"ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -955,7 +955,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Plan already has two partitions
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -971,7 +971,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Multiple source files splitted across partitions
"ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
"ParquetExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 4, true, 10);
Expand All @@ -988,7 +988,7 @@ mod tests {
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
// Doesn't parallelize for SortExec without preserve_partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1007,7 +1007,7 @@ mod tests {
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
// SortExec doesn't benefit from input partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1032,7 +1032,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitionins - no parallelism
"LocalLimitExec: fetch=100",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1045,12 +1045,12 @@ mod tests {

let expected = &[
"UnionExec",
// Union doesn benefit from input partitioning - no parallelism
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
// Union doesn't benefit from input partitioning - no parallelism
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1064,7 +1064,7 @@ mod tests {

// parallelization potentially could break sort order
let expected = &[
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1081,8 +1081,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1099,7 +1099,7 @@ mod tests {
// no parallelization to preserve sort order
let expected = &[
"SortRequiredExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand All @@ -1114,7 +1114,7 @@ mod tests {
// data should not be repartitioned / resorted
let expected = &[
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];

assert_optimized!(expected, plan, 2, true, 10);
Expand Down
Loading