diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 1fd509775d59f..8d29f80a4ba6d 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -112,7 +112,8 @@ fn get_bucket_name(url: &Url) -> Result<&str> { #[cfg(test)] mod tests { use datafusion::{ - datasource::listing::ListingTableUrl, logical_expr::{LogicalPlan, DdlStatement}, + datasource::listing::ListingTableUrl, + logical_expr::{DdlStatement, LogicalPlan}, prelude::SessionContext, }; diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index dea7ecadf7f4e..391e4b93c4e57 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -473,7 +473,8 @@ mod tests { let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() .to_string(); - assert!(formatted.contains("ParquetExec: limit=Some(10)")); + assert!(formatted.contains("ParquetExec: ")); + assert!(formatted.contains("projection=[bool_col, int_col], limit=10")); Ok(()) } diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 0674b6aa31b94..59e5fc95edbb5 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -334,7 +334,7 @@ mod tests { "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", ]; assert_optimized!(expected, plan); @@ -362,7 +362,7 @@ mod tests { let expected = &[ "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]", "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", ]; assert_optimized!(expected, plan); @@ -391,7 +391,7 @@ mod tests { // should combine the Partial/Final AggregateExecs to tne Single AggregateExec let expected = &[ "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", ]; assert_optimized!(expected, plan); @@ -425,7 +425,7 @@ mod tests { // should combine the Partial/Final AggregateExecs to tne Single AggregateExec let expected = &[ "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]", ]; assert_optimized!(expected, plan); diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index f74a7f2e93e35..e6fd15b7ce88d 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -1273,12 +1273,12 @@ mod tests { top_join_plan.as_str(), join_plan.as_str(), "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", 
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs _ => vec![ @@ -1286,12 +1286,12 @@ mod tests { "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10", join_plan.as_str(), "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; assert_optimized!(expected, top_join); @@ -1329,12 +1329,12 @@ mod tests { top_join_plan.as_str(), join_plan.as_str(), "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs _ => @@ -1343,12 +1343,12 @@ mod tests { "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10", join_plan.as_str(), "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - 
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; assert_optimized!(expected, top_join); @@ -1398,11 +1398,11 @@ mod tests { "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, top_join); @@ -1420,11 +1420,11 @@ mod tests { "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, top_join); @@ -1471,11 +1471,11 @@ mod tests { "ProjectionExec: expr=[c@2 as c1]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, top_join); @@ -1508,11 +1508,11 @@ mod tests { "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "ParquetExec: 
limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, join); Ok(()) @@ -1557,11 +1557,11 @@ mod tests { "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, join); Ok(()) @@ -1663,16 +1663,16 @@ mod tests { "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: 
file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, filter_top_join); Ok(()) @@ -1785,16 +1785,16 @@ mod tests { "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_plan_txt!(expected, reordered); @@ -1906,16 +1906,16 @@ mod tests { "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 })]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column 
{ name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_plan_txt!(expected, reordered); @@ -1980,14 +1980,14 @@ mod tests { join_plan.as_str(), "SortExec: expr=[a@0 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs _ => vec![ @@ -1997,14 +1997,14 @@ mod tests { join_plan.as_str(), "SortExec: expr=[a@0 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; assert_optimized!(expected, top_join); @@ -2033,14 +2033,14 @@ mod tests { join_plan.as_str(), "SortExec: expr=[a@0 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 
10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs and 4 SortExecs _ => vec![ @@ -2050,14 +2050,14 @@ mod tests { join_plan.as_str(), "SortExec: expr=[a@0 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; assert_optimized!(expected, top_join); @@ -2124,13 +2124,13 @@ mod tests { "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b2@1 ASC,a2@0 ASC]", "ProjectionExec: expr=[a@1 as a2, b@0 as b2]", "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, join); Ok(()) @@ -2159,7 +2159,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", "CoalesceBatchesExec: target_batch_size=4096", - "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; assert_optimized!(expected, exec); Ok(()) @@ -2193,11 +2193,11 @@ mod tests { "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1", "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", 
]; assert_optimized!(expected, plan); Ok(()) diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 1db61e379e892..e7c72398fd09f 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -575,7 +575,7 @@ mod tests { "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -592,7 +592,7 @@ mod tests { "AggregateExec: mode=Partial, gby=[], aggr=[]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -610,7 +610,7 @@ mod tests { "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -628,7 +628,7 @@ mod tests { "FilterExec: c1@0", // nothing sorts the data, so the local limit doesn't require sorted data either "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -644,7 +644,7 @@ mod tests { "LocalLimitExec: fetch=100", // data is sorted so can't repartition here "SortExec: expr=[c1@0 ASC]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -662,7 +662,7 @@ mod tests { // data is sorted so can't repartition here even though // filter would benefit from parallelism, the answers might be wrong "SortExec: expr=[c1@0 ASC]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -687,7 +687,7 @@ mod tests { "GlobalLimitExec: skip=0, fetch=100", "LocalLimitExec: fetch=100", // Expect no repartition to happen for local limit - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -714,7 +714,7 @@ mod tests { "GlobalLimitExec: skip=0, fetch=100", "LocalLimitExec: fetch=100", // Expect no repartition to happen for local limit - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -730,11 +730,11 @@ mod tests { let expected = &[ "UnionExec", // Expect no repartition of ParquetExec - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: 
limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -751,7 +751,7 @@ mod tests { "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: expr=[c1@0 ASC]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -766,7 +766,7 @@ mod tests { // should not repartition / sort (as the data was already sorted) let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan); @@ -783,8 +783,8 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "UnionExec", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan); @@ -801,7 +801,7 @@ mod tests { // should not repartition as doing so destroys the necessary sort order let expected = &[ "SortRequiredExec", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan); @@ -830,11 +830,11 @@ mod tests { "UnionExec", // union input 1: no repartitioning "SortRequiredExec", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", // union input 2: should repartition "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -852,7 +852,7 @@ mod tests { "SortExec: expr=[c1@0 ASC]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[c1@0 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -869,7 +869,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", - "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan); @@ -884,7 +884,7 @@ mod tests { let expected = &[ "SortExec: expr=[c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", - "ParquetExec: limit=None, partitions={1 
group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -902,7 +902,7 @@ mod tests { "SortExec: expr=[c1@0 ASC]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -924,7 +924,7 @@ mod tests { "FilterExec: c1@0", // repartition is lowest down "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan); @@ -939,7 +939,7 @@ mod tests { "AggregateExec: mode=Final, gby=[], aggr=[]", "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", - "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]", + "ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -955,7 +955,7 @@ mod tests { "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", // Plan already has two partitions - "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -971,7 +971,7 @@ mod tests { "CoalescePartitionsExec", "AggregateExec: mode=Partial, gby=[], aggr=[]", // Multiple source files splitted across partitions - "ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]", + "ParquetExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 4, true, 10); @@ -988,7 +988,7 @@ mod tests { // data is sorted so can't repartition here "SortExec: expr=[c1@0 ASC]", // Doesn't parallelize for SortExec without preserve_partitioning - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1007,7 +1007,7 @@ mod tests { // filter would benefit from parallelism, the answers might be wrong "SortExec: expr=[c1@0 ASC]", // SortExec doesn't benefit from input partitioning - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1032,7 +1032,7 @@ mod tests { "GlobalLimitExec: skip=0, fetch=100", // Limit doesn't benefit from input partitionins - no parallelism "LocalLimitExec: fetch=100", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1045,12 +1045,12 @@ mod tests { let expected = &[ "UnionExec", - // Union doesn benefit from input partitioning - no parallelism - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", - "ParquetExec: limit=None, partitions={1 
group: [[x]]}, projection=[c1]", + // Union doesn't benefit from input partitioning - no parallelism + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1064,7 +1064,7 @@ mod tests { // parallelization potentially could break sort order let expected = &[ - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1081,8 +1081,8 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "UnionExec", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1099,7 +1099,7 @@ mod tests { // no parallelization to preserve sort order let expected = &[ "SortRequiredExec", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan, 2, true, 10); @@ -1114,7 +1114,7 @@ mod tests { // data should not be repartitioned / resorted let expected = &[ "ProjectionExec: expr=[c1@0 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]", ]; assert_optimized!(expected, plan, 2, true, 10); diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 1032a16c7fd9a..713eaf288dfce 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -1748,11 +1748,11 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // We should keep the bottom `SortExec`. 
@@ -1761,11 +1761,11 @@ mod tests { " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1838,9 +1838,9 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // should not add a sort at the output of the union, input plan should not be changed let expected_optimized = expected_input.clone(); @@ -1869,9 +1869,9 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // should not add a sort at the output of the union, input plan should not be changed let expected_optimized = expected_input.clone(); @@ -1902,18 +1902,18 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, 
non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1945,20 +1945,20 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // should adjust sorting in the first input of the union such that it is not unnecessarily fine let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1990,20 +1990,20 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: 
expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2043,17 +2043,17 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2089,11 +2089,11 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // Should adjust the requirement in the third input of the union so // that it is not unnecessarily fine. 
@@ -2101,11 +2101,11 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2132,18 +2132,18 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec let expected_output = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_output, physical_plan); Ok(()) @@ -2186,16 +2186,16 @@ mod tests { let expected_input = vec![ "UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[nullable_col@0 DESC NULLS LAST,non_nullable_col@1 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; // Since `UnionExec` doesn't preserve ordering in the plan above. // We shouldn't keep SortExecs in the plan. 
let expected_optimized = vec![ "UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2237,16 +2237,16 @@ mod tests { " SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]", " UnionExec", " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]", " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", ]; let expected_optimized = vec![ "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", " SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2277,16 +2277,16 @@ mod tests { " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", ]; let expected_optimized = vec![ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]", " SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2327,21 +2327,21 @@ mod tests { "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -2385,8 +2385,8 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", join_plan2.as_str(), - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; let expected_optimized = match join_type { JoinType::Inner @@ -2397,9 +2397,9 @@ mod tests { vec![ join_plan.as_str(), " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ] } _ => { @@ -2408,9 +2408,9 @@ mod tests { "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", join_plan2.as_str(), " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC]", - " ParquetExec: limit=None, partitions={1 
group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ] } }; @@ -2462,8 +2462,8 @@ mod tests { let expected_input = vec![ spm_plan, join_plan2.as_str(), - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; let expected_optimized = match join_type { JoinType::Inner | JoinType::Right | JoinType::RightAnti => { @@ -2471,9 +2471,9 @@ mod tests { vec![ join_plan.as_str(), " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC,col_b@1 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ] } _ => { @@ -2482,9 +2482,9 @@ mod tests { "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]", join_plan2.as_str(), " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ] } }; @@ -2519,8 +2519,8 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]", " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; // can not push down the sort requirements, need to add SortExec @@ -2528,9 +2528,9 @@ mod tests { "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]", " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -2545,8 +2545,8 @@ mod tests { let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]", " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", 
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; // can not push down the sort requirements, need to add SortExec @@ -2554,9 +2554,9 @@ mod tests { "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]", " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", " SortExec: expr=[nullable_col@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " SortExec: expr=[col_a@0 ASC]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -2627,14 +2627,14 @@ mod tests { " FilterExec: NOT non_nullable_col@1", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", " SortExec: expr=[nullable_col@0 ASC]", " FilterExec: NOT non_nullable_col@1", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 87d3ca1ec7505..16dc146750948 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -136,12 +136,7 @@ impl ExecutionPlan for AvroExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default => { - write!( - f, - "AvroExec: files={}, limit={:?}", - super::FileGroupsDisplay(&self.base_config.file_groups), - self.base_config.limit, - ) + write!(f, "AvroExec: {}", self.base_config) } } } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 886c609e72e97..e2d2bb8ef7bf1 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -172,11 +172,8 @@ impl ExecutionPlan for CsvExec { DisplayFormatType::Default => { write!( f, - "CsvExec: files={}, has_header={}, limit={:?}, projection={}", - super::FileGroupsDisplay(&self.base_config.file_groups), - self.has_header, - self.base_config.limit, - super::ProjectSchemaDisplay(&self.projected_schema), + "CsvExec: {}, has_header={}", + self.base_config, self.has_header, ) } } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index cef69883e99c2..d122fd78b4b24 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -146,12 +146,7 @@ impl ExecutionPlan for NdJsonExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default => { - write!( - f, - 
"JsonExec: limit={:?}, files={}", - self.base_config.limit, - super::FileGroupsDisplay(&self.base_config.file_groups), - ) + write!(f, "JsonExec: {}", self.base_config) } } } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 4f85f54113d9d..6591d8346558e 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -234,6 +234,29 @@ impl FileScanConfig { } } +impl Display for FileScanConfig { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + let (schema, _, ordering) = self.project(); + + write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?; + + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } + + if let Some(limit) = self.limit { + write!(f, ", limit={}", limit)?; + } + + if let Some(orders) = ordering { + if !orders.is_empty() { + write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?; + } + } + Ok(()) + } +} + /// A wrapper to customize partitioned file display /// /// Prints in the format: @@ -291,6 +314,23 @@ impl<'a> Display for ProjectSchemaDisplay<'a> { } } +/// A wrapper to customize output ordering display. +#[derive(Debug)] +struct OutputOrderingDisplay<'a>(&'a [PhysicalSortExpr]); + +impl<'a> Display for OutputOrderingDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "[")?; + for (i, e) in self.0.iter().enumerate() { + if i > 0 { + write!(f, ", ")? + } + write!(f, "{e}")?; + } + write!(f, "]") + } +} + /// A utility which can adapt file-level record batches to a table schema which may have a schema /// obtained from merging multiple file-level schemas. /// diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index c18b8fab74b93..0afb819d43efa 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -86,7 +86,7 @@ pub struct ParquetExec { /// Override for `Self::with_enable_page_index`. 
If None, uses /// values from base_config enable_page_index: Option<bool>, - /// Base configuraton for this scan + /// Base configuration for this scan base_config: FileScanConfig, projected_statistics: Statistics, projected_schema: SchemaRef, @@ -419,20 +419,10 @@ impl ExecutionPlan for ParquetExec { .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr())) .unwrap_or_default(); - let output_ordering_string = self - .output_ordering() - .map(make_output_ordering_string) - .unwrap_or_default(); - write!( f, - "ParquetExec: limit={:?}, partitions={}{}{}{}, projection={}", - self.base_config.limit, - super::FileGroupsDisplay(&self.base_config.file_groups), - predicate_string, - pruning_predicate_string, - output_ordering_string, - super::ProjectSchemaDisplay(&self.projected_schema), + "ParquetExec: {}{}{}", + self.base_config, predicate_string, pruning_predicate_string, ) } } @@ -447,20 +437,6 @@ impl ExecutionPlan for ParquetExec { } } -fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String { - use std::fmt::Write; - let mut w: String = ", output_ordering=[".into(); - - for (i, e) in ordering.iter().enumerate() { - if i > 0 { - write!(&mut w, ", ").unwrap() - } - write!(&mut w, "{e}").unwrap() - } - write!(&mut w, "]").unwrap(); - w -} - /// Implements [`FileOpener`] for a parquet file struct ParquetOpener { partition_index: usize, diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 4356d4aa85e06..9013fb269243d 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -2454,10 +2454,10 @@ mod tests { "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1", - // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]", + // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1", - // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]" + // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false" ] }; let mut actual: Vec<&str> = formatted.trim().lines().collect(); @@ -2507,10 +2507,10 @@ mod tests { "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: 
Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1", - // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]", + // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1", - // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]" + // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false" ] }; let mut actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 00850f3237bca..c8edf701cf05c 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -328,7 +328,7 @@ pub fn with_new_children_if_necessary( /// assert_eq!("CoalesceBatchesExec: target_batch_size=8192\ /// \n FilterExec: a@0 < 5\ /// \n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\ -/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]", +/// \n CsvExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.csv]]}, projection=[a], has_header=true", /// plan_string.trim()); /// /// let one_line = format!("{}", displayable_plan.one_line()); diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs index d933db067d6d2..85ed30044c178 100644 --- a/datafusion/core/tests/sql/avro.rs +++ b/datafusion/core/tests/sql/avro.rs @@ -149,7 +149,7 @@ async fn avro_explain() { \n CoalescePartitionsExec\ \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\ + \n AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\ \n", ], ]; diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 6f7150d2a53b6..1ab933022d321 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -609,7 +609,7 @@ async fn test_physical_plan_display_indent() { " CoalesceBatchesExec: target_batch_size=4096", " FilterExec: c12@1 < 10", " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", - " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]", + " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], 
has_header=true", ]; let normalizer = ExplainNormalizer::new(); @@ -650,12 +650,12 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000", " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", - " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]", + " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000", " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", " ProjectionExec: expr=[c1@0 as c2]", - " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]", + " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true", ]; let normalizer = ExplainNormalizer::new(); diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index 965a9c14fc985..10fcdfda20deb 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -92,7 +92,7 @@ async fn json_explain() { \n CoalescePartitionsExec\ \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n", + \n JsonExec: file_groups={1 group: [[WORKING_DIR/tests/jsons/2.json]]}, projection=[a]\n", ], ]; assert_eq!(expected, actual); diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt index fe1d3ac2e4c47..0fdc9bf61a8fe 100644 --- a/datafusion/core/tests/sqllogictests/test_files/explain.slt +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -47,7 +47,41 @@ ProjectionExec: expr=[c1@0 as c1] CoalesceBatchesExec: target_batch_size=8192 FilterExec: c2@1 > 10 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true + +# explain_csv_exec_scan_config + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100_with_order ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (c1 ASC) +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query TT +explain SELECT c1 FROM aggregate_test_100_with_order order by c1 ASC limit 10 +---- +logical_plan +Limit: skip=0, fetch=10 + Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10 + TableScan: aggregate_test_100_with_order projection=[c1] +physical_plan +GlobalLimitExec: skip=0, fetch=10 + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC 
NULLS LAST], has_header=true ## explain_physical_plan_only diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt index d42d2cf62f1f2..3b77319c0cb30 100644 --- a/datafusion/core/tests/sqllogictests/test_files/order.slt +++ b/datafusion/core/tests/sqllogictests/test_files/order.slt @@ -165,7 +165,7 @@ Projection: aggregate_test_100.c1, aggregate_test_100.c2 physical_plan ProjectionExec: expr=[c1@0 as c1, c2@1 as c2] SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true query II SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2 diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt index ff523d713da2d..26db6cc059851 100644 --- a/datafusion/core/tests/sqllogictests/test_files/select.slt +++ b/datafusion/core/tests/sqllogictests/test_files/select.slt @@ -661,9 +661,8 @@ select * from largeutf8_data where large_str != '1' statement ok CREATE TABLE empty_table; -query +statement ok SELECT * FROM empty_table ----- # TODO: boolean_literal diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 5ba3644923ea5..66b011eeb93d9 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -1217,7 +1217,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c8, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, c9], has_header=true # over_order_by_sort_keys_sorting_prefix_compacting @@ -1238,7 +1238,7 @@ ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted] BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, 
has_header=true, limit=None, projection=[c2, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true # FIXME: for now we are not detecting prefix of sorting keys in order to re-arrange with global and save one SortExec @@ -1263,7 +1263,7 @@ SortExec: expr=[c2@0 ASC NULLS LAST] SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST] BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true # test_window_partition_by_order_by statement ok @@ -1293,7 +1293,7 @@ ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_10 CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c4] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], has_header=true # test_window_agg_sort_reversed_plan @@ -1318,7 +1318,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query III SELECT @@ -1359,7 +1359,7 @@ ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [a BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)) }], mode=[Sorted] BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query IIIIIII SELECT @@ -1403,7 +1403,7 @@ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 A SortExec: expr=[c9@0 ASC NULLS LAST] BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query III @@ -1446,7 +1446,7 @@ ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] SortExec: expr=[c9@2 DESC,c1@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9], has_header=true query IIII SELECT @@ -1537,7 +1537,7 @@ ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: 
Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }] BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2, c3], has_header=true query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII SELECT @@ -1612,7 +1612,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true query III @@ -1656,7 +1656,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggre BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], 
has_header=true query III SELECT @@ -1702,7 +1702,7 @@ ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c4, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true query III @@ -1756,7 +1756,7 @@ ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count] CoalesceBatchesExec: target_batch_size=4096 FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c13] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true query I SELECT count(*) as global_count FROM @@ -1802,7 +1802,7 @@ GlobalLimitExec: skip=0, fetch=5 ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true @@ -1841,7 +1841,7 @@ SortPreservingMergeExec: [c1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true query TI SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC @@ -1968,7 +1968,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 - 
CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true statement ok set datafusion.optimizer.repartition_sorts = true; @@ -1997,7 +1997,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST] CoalesceBatchesExec: target_batch_size=4096 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true # test_window_agg_with_global_limit statement ok @@ -2020,7 +2020,7 @@ ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1] RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 GlobalLimitExec: skip=0, fetch=1 SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c13] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true query ? @@ -2087,7 +2087,7 @@ GlobalLimitExec: skip=0, fetch=5 BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted] WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }] SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true @@ -2146,7 +2146,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER B WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }] SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST] ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true @@ -2192,7 +2192,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2] ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 
PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)] BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9, c12] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9, c12], has_header=true query RR SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1, @@ -2228,7 +2228,7 @@ GlobalLimitExec: skip=0, fetch=5 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c9@0 ASC NULLS LAST] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query II SELECT c9, rn1 FROM (SELECT c9, @@ -2267,7 +2267,7 @@ GlobalLimitExec: skip=0, fetch=5 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query II SELECT c9, rn1 FROM (SELECT c9, @@ -2307,7 +2307,7 @@ GlobalLimitExec: skip=0, fetch=5 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query II SELECT c9, rn1 FROM (SELECT c9, @@ -2350,7 +2350,7 @@ GlobalLimitExec: skip=0, fetch=5 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1] BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] SortExec: expr=[c9@0 DESC] - CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9] + CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true query II SELECT c9, rn1 FROM (SELECT c9, diff --git a/docs/source/user-guide/sql/explain.md b/docs/source/user-guide/sql/explain.md index ca4169d5773cc..b240b14eb5816 100644 --- a/docs/source/user-guide/sql/explain.md +++ b/docs/source/user-guide/sql/explain.md @@ -44,7 +44,7 @@ EXPLAIN SELECT SUM(x) FROM table GROUP BY b; | | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16) | | | AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(table.x)] | | | RepartitionExec: partitioning=RoundRobinBatch(16) | -| | CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, limit=None, projection=[x, b] | +| | CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, projection=[x, b], has_header=false | | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+ ``` @@ -66,6 +66,6 @@ EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b; | | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] | | | HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2] | | | RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012] | -| | CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, metrics=[] | +| | CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, has_header=false, metrics=[] | +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+ ```