From 4b29c4c00d2fbc3dd51dfb7bf49f67669c75baba Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 23 May 2024 18:10:28 -0400
Subject: [PATCH] Simplify ParquetExec::new()

---
 .../src/datasource/file_format/parquet.rs     |  11 +-
 .../datasource/physical_plan/parquet/mod.rs   | 256 ++++++++++--------
 .../combine_partial_final_agg.rs              |  25 +-
 .../enforce_distribution.rs                   |  56 ++--
 .../core/src/physical_optimizer/test_utils.rs |  50 ++--
 datafusion/core/src/test_util/parquet.rs      |  14 +-
 6 files changed, 207 insertions(+), 205 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 7fcd41049cb4e..50f3447c4900f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -258,12 +258,11 @@ impl FileFormat for ParquetFormat {
         // will not prune data based on the statistics.
         let predicate = self.enable_pruning().then(|| filters.cloned()).flatten();
 
-        Ok(Arc::new(ParquetExec::new(
-            conf,
-            predicate,
-            self.metadata_size_hint(),
-            self.options.clone(),
-        )))
+        Ok(Arc::new(
+            ParquetExec::new_with_options(conf, self.options.clone())
+                .with_predicate(predicate)
+                .with_metadata_size_hint(self.metadata_size_hint()),
+        ))
     }
 
     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 410413ebd71b4..dab36a8edb09e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -102,48 +102,27 @@ pub struct ParquetExec {
 }
 
 impl ParquetExec {
-    /// Create a new Parquet reader execution plan provided file list and schema.
-    pub fn new(
+    /// Create a `ParquetExec` to read the provided file list.
+    ///
+    /// # Example
+    /// ```
+    /// TODO
+    /// ```
+    pub fn new(base_config: FileScanConfig) -> Self {
+        Self::new_with_options(base_config, TableParquetOptions::default())
+    }
+
+    /// Create a `ParquetExec` to read the provided file list with options.
+    pub fn new_with_options(
         base_config: FileScanConfig,
-        predicate: Option<Arc<dyn PhysicalExpr>>,
-        metadata_size_hint: Option<usize>,
         table_parquet_options: TableParquetOptions,
     ) -> Self {
-        debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
-        base_config.file_groups, base_config.projection, predicate, base_config.limit);
+        debug!(
+            "Creating ParquetExec, files: {:?}, projection {:?}, limit: {:?}",
+            base_config.file_groups, base_config.projection, base_config.limit
+        );
         let metrics = ExecutionPlanMetricsSet::new();
-        let predicate_creation_errors =
-            MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
-
-        let file_schema = &base_config.file_schema;
-        let pruning_predicate = predicate
-            .clone()
-            .and_then(|predicate_expr| {
-                match PruningPredicate::try_new(predicate_expr, file_schema.clone()) {
-                    Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
-                    Err(e) => {
-                        debug!("Could not create pruning predicate for: {e}");
-                        predicate_creation_errors.add(1);
-                        None
-                    }
-                }
-            })
-            .filter(|p| !p.always_true());
-
-        let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
-            match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
-                Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
-                Err(e) => {
-                    debug!(
-                        "Could not create page pruning predicate for '{:?}': {}",
-                        pruning_predicate, e
-                    );
-                    predicate_creation_errors.add(1);
-                    None
-                }
-            }
-        });
 
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
@@ -156,10 +135,10 @@ impl ParquetExec {
             base_config,
             projected_statistics,
             metrics,
-            predicate,
-            pruning_predicate,
-            page_pruning_predicate,
-            metadata_size_hint,
+            predicate: None,
+            pruning_predicate: None,
+            page_pruning_predicate: None,
+            metadata_size_hint: None,
             parquet_file_reader_factory: None,
             cache,
             table_parquet_options,
@@ -177,6 +156,78 @@ impl ParquetExec {
         &self.table_parquet_options
     }
 
+    /// Set the predicate (and [`PruningPredicate`]) that will be used to prune
+    /// row groups and pages
+    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalExpr>>) -> Self {
+        debug!("Setting ParquetExec predicate: {:?}", predicate);
+        let Some(predicate) = predicate else {
+            self.predicate = None;
+            self.pruning_predicate = None;
+            self.page_pruning_predicate = None;
+            return self;
+        };
+
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let file_schema = &self.base_config.file_schema;
+
+        self.pruning_predicate =
+            match PruningPredicate::try_new(predicate.clone(), file_schema.clone()) {
+                Ok(pruning_predicate) if !pruning_predicate.always_true() => {
+                    Some(Arc::new(pruning_predicate))
+                }
+                Ok(_pruning_predicate) => {
+                    debug!(
+                        "Pruning predicate is always true: {:?}, skipping",
+                        predicate
+                    );
+                    None
+                }
+                Err(e) => {
+                    debug!("Could not create pruning predicate: {e}");
+                    predicate_creation_errors.add(1);
+                    None
+                }
+            };
+
+        self.page_pruning_predicate =
+            match PagePruningPredicate::try_new(&predicate, file_schema.clone()) {
+                Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
+                Err(e) => {
+                    debug!(
+                        "Could not create page pruning predicate for '{:?}': {}",
+                        self.pruning_predicate, e
+                    );
+                    predicate_creation_errors.add(1);
+                    None
+                }
+            };
+
+        self.predicate = Some(predicate);
+
+        self
+    }
+
+    /// Set the options for reading Parquet files
+    pub fn with_table_parquet_options(
+        mut self,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        self.table_parquet_options = table_parquet_options;
+        self
+    }
+
+    /// Set the metadata size hint for the parquet reader
+    ///
+    /// This value determines how many bytes at the end of the file the
+    /// ParquetExec will request in the initial IO. If this is too small, the
+    /// ParquetExec will need to make additional IO requests to read the footer.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
+        self.metadata_size_hint = metadata_size_hint;
+        self
+    }
+
     /// Optional predicate.
     pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
         self.predicate.as_ref()
@@ -931,21 +982,17 @@ mod tests {
         let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
 
         // prepare the scan
-        let mut parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![file_groups],
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            predicate,
-            None,
-            Default::default(),
-        );
+        let mut parquet_exec = ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![file_groups],
+            statistics: Statistics::new_unknown(&file_schema),
+            file_schema,
+            projection,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        })
+        .with_predicate(predicate);
 
         if pushdown_predicate {
             parquet_exec = parquet_exec
@@ -1589,21 +1636,16 @@ mod tests {
         expected_row_num: Option<usize>,
         file_schema: SchemaRef,
     ) -> Result<()> {
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        );
+        let parquet_exec = ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups,
+            statistics: Statistics::new_unknown(&file_schema),
+            file_schema,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        });
         assert_eq!(
             parquet_exec
                 .properties()
@@ -1699,33 +1741,28 @@ mod tests {
             ),
         ]);
 
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: schema.clone(),
-                statistics: Statistics::new_unknown(&schema),
-                // file has 10 cols so index 12 should be month and 13 should be day
-                projection: Some(vec![0, 1, 2, 12, 13]),
-                limit: None,
-                table_partition_cols: vec![
-                    Field::new("year", DataType::Utf8, false),
-                    Field::new("month", DataType::UInt8, false),
-                    Field::new(
-                        "day",
-                        DataType::Dictionary(
-                            Box::new(DataType::UInt16),
-                            Box::new(DataType::Utf8),
-                        ),
-                        false,
+        let parquet_exec = ParquetExec::new(FileScanConfig {
+            object_store_url,
+            file_groups: vec![vec![partitioned_file]],
+            file_schema: schema.clone(),
+            statistics: Statistics::new_unknown(&schema),
+            // file has 10 cols so index 12 should be month and 13 should be day
+            projection: Some(vec![0, 1, 2, 12, 13]),
+            limit: None,
+            table_partition_cols: vec![
+                Field::new("year", DataType::Utf8, false),
+                Field::new("month", DataType::UInt8, false),
+                Field::new(
+                    "day",
+                    DataType::Dictionary(
+                        Box::new(DataType::UInt16),
+                        Box::new(DataType::Utf8),
                     ),
-                ],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        );
+                    false,
+                ),
+            ],
+            output_ordering: vec![],
+        });
         assert_eq!(
             parquet_exec.cache.output_partitioning().partition_count(),
             1
@@ -1779,21 +1816,16 @@ mod tests {
             extensions: None,
         };
 
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::new_unknown(&Schema::empty()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        );
+        let parquet_exec = ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![vec![partitioned_file]],
+            file_schema: Arc::new(Schema::empty()),
+            statistics: Statistics::new_unknown(&Schema::empty()),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        });
 
         let mut results = parquet_exec.execute(0, state.task_ctx())?;
         let batch = results.next().await.unwrap();
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index e41e4dd316479..f324b95a550e8 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -245,21 +245,16 @@ mod tests {
     }
 
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(schema),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        ))
+        Arc::new(ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema: schema.clone(),
+            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+            statistics: Statistics::new_unknown(schema),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }))
     }
 
     fn partial_aggregate_exec(
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index cd84e911d381a..51a53253666dd 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1431,21 +1431,16 @@ pub(crate) mod tests {
     pub(crate) fn parquet_exec_with_sort(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
-            None,
-            None,
-            Default::default(),
-        ))
+        Arc::new(ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema: schema(),
+            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+            statistics: Statistics::new_unknown(&schema()),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering,
+        }))
     }
 
     fn parquet_exec_multiple() -> Arc<ParquetExec> {
@@ -1456,24 +1451,19 @@ fn parquet_exec_multiple_sorted(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![
-                    vec![PartitionedFile::new("x".to_string(), 100)],
-                    vec![PartitionedFile::new("y".to_string(), 100)],
-                ],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
-            None,
-            None,
-            Default::default(),
-        ))
+        Arc::new(ParquetExec::new(FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema: schema(),
+            file_groups: vec![
+                vec![PartitionedFile::new("x".to_string(), 100)],
+                vec![PartitionedFile::new("y".to_string(), 100)],
+            ],
+            statistics: Statistics::new_unknown(&schema()),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering,
+        }))
     }
 
     fn csv_exec() -> Arc<CsvExec> {
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 7bc1eeb7c4a5a..580b06936cd4e 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -274,21 +274,16 @@ pub fn sort_preserving_merge_exec(
 
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-    Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
-        None,
-        None,
-        Default::default(),
-    ))
+    Arc::new(ParquetExec::new(FileScanConfig {
+        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+        statistics: Statistics::new_unknown(schema),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+        output_ordering: vec![],
+    }))
 }
 
 // Created a sorted parquet exec
@@ -298,21 +293,16 @@ pub fn parquet_exec_sorted(
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs = sort_exprs.into_iter().collect();
 
-    Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
-        None,
-        None,
-        Default::default(),
-    ))
+    Arc::new(ParquetExec::new(FileScanConfig {
+        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+        statistics: Statistics::new_unknown(schema),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+        output_ordering: vec![sort_exprs],
+    }))
 }
 
 pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 1d5668c7ec55e..3bdc04ce973cf 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -172,20 +172,16 @@ impl TestParquetFile {
             let filter = simplifier.coerce(filter, &df_schema).unwrap();
             let physical_filter_expr =
                 create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
 
-            let parquet_exec = Arc::new(ParquetExec::new(
-                scan_config,
-                Some(physical_filter_expr.clone()),
-                None,
-                parquet_options,
-            ));
+            let parquet_exec = Arc::new(
+                ParquetExec::new_with_options(scan_config, parquet_options)
+                    .with_predicate(Some(physical_filter_expr.clone())),
+            );
 
             let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
             Ok(exec)
         } else {
-            Ok(Arc::new(ParquetExec::new(
+            Ok(Arc::new(ParquetExec::new_with_options(
                 scan_config,
-                None,
-                None,
                 parquet_options,
             )))
         }
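
Editor's note: the `# Example` doc comment in this patch is left as `TODO`. For reference, here is a minimal, self-contained sketch of what a call site looks like after this change. It is not part of the patch: it mirrors the struct-literal form used in the tests above, the file name, size, empty schema, and 64 KiB metadata hint are placeholder values, and the import paths assume the DataFusion version this patch targets.

```rust
// Sketch only (not from the patch): constructing a ParquetExec with the new
// builder-style API introduced here. All concrete values are placeholders.
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::common::Statistics;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

fn main() {
    let file_schema = Arc::new(Schema::empty());
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::local_filesystem(),
        // "data.parquet" with a size of 100 bytes is a placeholder file entry
        file_groups: vec![vec![PartitionedFile::new("data.parquet".to_string(), 100)]],
        statistics: Statistics::new_unknown(&file_schema),
        file_schema,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: vec![],
    };

    // Before this patch (all optional arguments were positional):
    //     ParquetExec::new(scan_config, None, None, TableParquetOptions::default())
    //
    // After: required parts up front, optional parts via `with_*` methods.
    let exec = ParquetExec::new_with_options(scan_config, TableParquetOptions::default())
        // e.g. request the trailing 64 KiB in the initial IO to cover the footer
        .with_metadata_size_hint(Some(64 * 1024));
    let _ = exec;
}
```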