From b9a5bb63cc48b2d5c54e558721f5bfd7186f5264 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 13 Oct 2022 10:47:41 -0400
Subject: [PATCH 1/3] Expose parquet reader settings as DataFusion config settings

---
 benchmarks/src/bin/parquet_filter_pushdown.rs |  56 +++++--
 datafusion/core/src/config.rs                 |  78 +++++++---
 .../core/src/datasource/file_format/mod.rs    |   2 +
 .../core/src/datasource/listing/table.rs      |   1 +
 datafusion/core/src/execution/context.rs      |   9 +-
 datafusion/core/src/execution/options.rs      |  30 +++-
 .../src/physical_optimizer/repartition.rs     |   2 +
 .../src/physical_plan/file_format/avro.rs     |   4 +
 .../physical_plan/file_format/file_stream.rs  |   2 +
 .../src/physical_plan/file_format/json.rs     |   4 +
 .../core/src/physical_plan/file_format/mod.rs |  12 +-
 .../src/physical_plan/file_format/parquet.rs  | 138 +++++++++++-------
 datafusion/core/src/test/mod.rs               |   2 +
 .../core/tests/custom_parquet_reader.rs       |   2 +
 datafusion/core/tests/row.rs                  |   2 +
 15 files changed, 245 insertions(+), 99 deletions(-)

diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index e4bc9295e5abd..f77cbc8fd6802 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -23,6 +23,10 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty;
 use datafusion::common::{Result, ToDFSchema};
+use datafusion::config::{
+    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+    OPT_PARQUET_REORDER_FILTERS,
+};
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
@@ -30,9 +34,7 @@ use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{
-    FileScanConfig, ParquetExec, ParquetScanOptions,
-};
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
 use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
     Ok(())
 }

+#[derive(Debug, Clone)]
+struct ParquetScanOptions {
+    pushdown_filters: bool,
+    reorder_filters: bool,
+    enable_page_index: bool,
+}
+
 async fn run_benchmarks(
     ctx: &mut SessionContext,
     object_store_url: ObjectStoreUrl,
     debug: bool,
 ) -> Result<()> {
     let scan_options_matrix = vec![
-        ParquetScanOptions::default(),
-        ParquetScanOptions::default()
-            .with_page_index(true)
-            .with_pushdown_filters(true)
-            .with_reorder_predicates(true),
-        ParquetScanOptions::default()
-            .with_page_index(true)
-            .with_pushdown_filters(true)
-            .with_reorder_predicates(false),
+        ParquetScanOptions {
+            pushdown_filters: false,
+            reorder_filters: false,
+            enable_page_index: false,
+        },
+        ParquetScanOptions {
+            pushdown_filters: true,
+            reorder_filters: true,
+            enable_page_index: true,
+        },
+        ParquetScanOptions {
+            pushdown_filters: true,
+            reorder_filters: true,
+            enable_page_index: false,
+        },
     ];

     let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
     debug: bool,
 ) -> Result<usize> {
     let schema = BatchBuilder::schema();
+
+    let ParquetScanOptions {
+        pushdown_filters,
+        reorder_filters,
+        enable_page_index,
+    } = scan_options;
+
+    let mut config_options = ConfigOptions::new();
+    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
     let scan_config = FileScanConfig {
         object_store_url,
         file_schema: schema.clone(),
@@ -206,6 +233,7 @@
         projection: None,
         limit: None,
         table_partition_cols: vec![],
+        config_options: config_options.into_shareable(),
     };

     let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@
         &ExecutionProps::default(),
     )?;

-    let parquet_exec = Arc::new(
-        ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
-    );
+    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));

     let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 2a2139fb255cb..b95c12d3ba1a8 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use itertools::Itertools;
 use log::warn;
+use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::env;
+use std::sync::Arc;

 /// Configuration option "datafusion.optimizer.filter_null_join_keys"
 pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
 pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
     "datafusion.execution.coalesce_target_batch_size";

+/// Configuration option "datafusion.execution.time_zone"
+pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
+
+/// Configuration option "datafusion.execution.parquet.pushdown_filters"
+pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
+    "datafusion.execution.parquet.pushdown_filters";
+
+/// Configuration option "datafusion.execution.parquet.reorder_filters"
+pub const OPT_PARQUET_REORDER_FILTERS: &str =
+    "datafusion.execution.parquet.reorder_filters";
+
+/// Configuration option "datafusion.execution.parquet.enable_page_index"
+pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
+    "datafusion.execution.parquet.enable_page_index";
+
 /// Configuration option "datafusion.optimizer.skip_failed_rules"
 pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
     "datafusion.optimizer.skip_failed_rules";

-/// Configuration option "datafusion.execution.time_zone"
-pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
-
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identify this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
             false,
         ),
         ConfigDefinition::new_u64(
-            OPT_BATCH_SIZE,
-            "Default batch size while creating new batches, it's especially useful for \
-             buffer-in-memory batches since creating tiny batches would results in too much metadata \
-             memory consumption.",
-            8192,
+            OPT_BATCH_SIZE,
+            "Default batch size while creating new batches, it's especially useful for \
+             buffer-in-memory batches since creating tiny batches would result in too much metadata \
+             memory consumption.",
+            8192,
         ),
         ConfigDefinition::new_bool(
             OPT_COALESCE_BATCHES,
@@ -191,9 +205,35 @@
         ConfigDefinition::new_u64(
             OPT_COALESCE_TARGET_BATCH_SIZE,
             format!("Target batch size when coalescing batches. Used in conjunction with the \
-                         configuration setting '{}'.", OPT_COALESCE_BATCHES),
+                     configuration setting '{}'.", OPT_COALESCE_BATCHES),
             4096,
         ),
+        ConfigDefinition::new_string(
+            OPT_TIME_ZONE,
+            "The session time zone which some functions require, \
+             e.g. EXTRACT(HOUR FROM SOME_TIME) shifts the underlying datetime according to the time zone, \
+             then extracts the hour.",
+            "UTC".into()
+        ),
+        ConfigDefinition::new_bool(
+            OPT_PARQUET_PUSHDOWN_FILTERS,
+            "If true, filter expressions are applied during the parquet decoding operation to \
+             reduce the number of rows decoded.",
+            false,
+        ),
+        ConfigDefinition::new_bool(
+            OPT_PARQUET_REORDER_FILTERS,
+            "If true, filter expressions evaluated during the parquet decoding operation \
+             will be reordered heuristically to minimize the cost of evaluation. If false, \
+             the filters are applied in the same order as written in the query.",
+            false,
+        ),
+        ConfigDefinition::new_bool(
+            OPT_PARQUET_ENABLE_PAGE_INDEX,
+            "If true, uses parquet data page level metadata (Page Index) statistics \
+             to reduce the number of rows decoded.",
+            false,
+        ),
         ConfigDefinition::new_bool(
             OPT_OPTIMIZER_SKIP_FAILED_RULES,
             "When set to true, the logical plan optimizer will produce warning \
@@ -201,12 +241,6 @@
             rule. When set to false, any rules that produce errors will cause the query to fail.",
             true
         ),
-        ConfigDefinition::new_string(
-            OPT_TIME_ZONE,
-            "The session time zone which some function require \
-             e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
-             then extract the hour",
-            "UTC".into()
-        )]
+        ]
     }
 }

@@ -255,8 +289,16 @@ impl ConfigOptions {
         Self { options }
     }

-    /// Create new ConfigOptions struct, taking values from environment variables where possible.
-    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control `datafusion.execution.batch_size`.
+    /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
+    pub fn into_shareable(self) -> Arc<RwLock<ConfigOptions>> {
+        Arc::new(RwLock::new(self))
+    }
+
+    /// Create new ConfigOptions struct, taking values from
+    /// environment variables where possible.
+    ///
+    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
+    /// control `datafusion.execution.batch_size`.
     pub fn from_env() -> Self {
         let built_in = BuiltInConfigs::new();
         let mut options = HashMap::with_capacity(built_in.config_definitions.len());
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 82f5b1df8839e..6775117e2c5b1 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 #[cfg(test)]
 pub(crate) mod test_util {
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@ pub(crate) mod test_util {
                 projection,
                 limit,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             &[],
         )
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3a0c4dceea244..deaa09249e110 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
                 projection: projection.clone(),
                 limit,
                 table_partition_cols: self.options.table_partition_cols.clone(),
+                config_options: ctx.config.config_options(),
             },
             filters,
         )
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f7fb0eb901943..c50f79426c8f4 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1184,7 +1184,7 @@ impl SessionConfig {
     /// Create an execution config with config options read from the environment
     pub fn from_env() -> Self {
         Self {
-            config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
+            config_options: ConfigOptions::from_env().into_shareable(),
             ..Default::default()
         }
     }
@@ -1324,6 +1324,13 @@ impl SessionConfig {
         map
     }

+    /// Return a handle to the shared configuration options.
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.config_options.clone()
+    }
+
     /// Add extensions.
     ///
     /// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 9ddd3f1d686cc..150a20670ce2d 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -34,7 +34,12 @@ use crate::datasource::{
     listing::ListingOptions,
 };

-/// CSV file read option
+/// Options that control the reading of CSV files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct CsvReadOptions<'a> {
     /// Does the CSV file have a header?
@@ -150,7 +155,12 @@ impl<'a> CsvReadOptions<'a> {
     }
 }

-/// Parquet read options
+/// Options that control the reading of Parquet files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct ParquetReadOptions<'a> {
     /// File extension; only files with this extension are selected for data input.
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
     pub table_partition_cols: Vec<String>,
     /// Should DataFusion parquet reader use the predicate to prune data,
     /// overridden by value on execution::context::SessionConfig
+    // TODO move this into ConfigOptions
     pub parquet_pruning: bool,
     /// Tell the parquet reader to skip any metadata that may be in
     /// the file Schema. This can help avoid schema conflicts due to
     /// metadata. Defaults to true.
+    // TODO move this into ConfigOptions
     pub skip_metadata: bool,
 }

@@ -217,7 +229,12 @@ impl<'a> ParquetReadOptions<'a> {
     }
 }

-/// Avro read options
+/// Options that control the reading of Avro files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct AvroReadOptions<'a> {
     /// The data source schema.
@@ -261,7 +278,12 @@ impl<'a> AvroReadOptions<'a> {
     }
 }

-/// Line-delimited JSON read options
+/// Options that control the reading of line-delimited JSON files (NDJSON).
+///
+/// Note this structure is supplied when a datasource is created and
+/// cannot vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 1d2b259086839..839908d0659bb 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -240,6 +240,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         None,
         None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 0b7841d885d20..2aab84fadbcfc 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod private {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@ mod tests {
             projection: Some(vec![0, 1, 2]),
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         });

         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -306,6 +308,7 @@ mod tests {
             projection,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         });

         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -374,6 +377,7 @@ mod tests {
             statistics: Statistics::default(),
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
+            config_options: ConfigOptions::new().into_shareable(),
         });

         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 609e3b3a9b20c..df12f310534ec 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -321,6 +321,7 @@ mod tests {
     use futures::StreamExt;

     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use crate::prelude::SessionContext;
@@ -366,6 +367,7 @@ mod tests {
             projection: None,
             limit,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         };

         let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index d207988f4ffe1..c8c5d71bd73f2 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -256,6 +256,7 @@ mod tests {
     use object_store::local::LocalFileSystem;

     use crate::assert_batches_eq;
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::file_type::FileType;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
@@ -330,6 +331,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
@@ -405,6 +407,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
@@ -450,6 +453,7 @@ mod tests {
                 projection: Some(vec![0, 2]),
                 limit: None,
                 table_partition_cols: vec![],
+                config_options: ConfigOptions::new().into_shareable(),
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 2926f18a20235..c33e2bc14701f 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -30,9 +30,7 @@ mod row_filter;
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
-pub use self::parquet::{
-    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, ParquetScanOptions,
-};
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
     buffer::Buffer,
@@ -44,9 +42,10 @@ pub use avro::AvroExec;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
+use parking_lot::RwLock;

-use crate::datasource::listing::FileRange;
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
+use crate::{config::ConfigOptions, datasource::listing::FileRange};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -91,6 +90,8 @@ pub struct FileScanConfig {
     pub limit: Option<usize>,
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
+    /// Configuration options passed to the physical plans
+    pub config_options: Arc<RwLock<ConfigOptions>>,
 }

 impl FileScanConfig {
@@ -413,7 +414,7 @@ pub struct FileMeta {
     pub object_meta: ObjectMeta,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
-    /// An optional field for user defined per object metadata
+    /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }

@@ -698,6 +699,7 @@ mod tests {
             projection,
             statistics,
             table_partition_cols,
+            config_options: ConfigOptions::new().into_shareable(),
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a5b146dff30a8..ad8c808f0bf35 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -24,6 +24,9 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};

+use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
+use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
+use crate::config::OPT_PARQUET_REORDER_FILTERS;
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
 use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
@@ -72,43 +75,6 @@ use parquet::file::{
 use parquet::format::PageLocation;
 use parquet::schema::types::ColumnDescriptor;

-#[derive(Debug, Clone, Default)]
-/// Specify options for the parquet scan
-pub struct ParquetScanOptions {
-    /// If true, any available `pruning_predicate` will be converted to a `RowFilter`
-    /// and pushed down to the `ParquetRecordBatchStream`. This will enable row level
-    /// filter at the decoder level. Defaults to false
-    pushdown_filters: bool,
-    /// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize
-    /// the cost of filter evaluation.
-    reorder_predicates: bool,
-    /// If enabled, the reader will read the page index
-    /// This is used to optimise filter pushdown
-    /// via `RowSelector` and `RowFilter` by
-    /// eliminating unnecessary IO and decoding
-    enable_page_index: bool,
-}
-
-impl ParquetScanOptions {
-    /// Set whether to pushdown pruning predicate to the parquet scan
-    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
-        self.pushdown_filters = pushdown_filters;
-        self
-    }
-
-    /// Set whether to reorder pruning predicate expressions in order to minimize evaluation cost
-    pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self {
-        self.reorder_predicates = reorder_predicates;
-        self
-    }
-
-    /// Set whether to read page index when reading parquet
-    pub fn with_page_index(mut self, page_index: bool) -> Self {
-        self.enable_page_index = page_index;
-        self
-    }
-}
-
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -123,8 +89,6 @@ pub struct ParquetExec {
     metadata_size_hint: Option<usize>,
     /// Optional user defined parquet file reader factory
     parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
-    /// Options to specify behavior of parquet scan
-    scan_options: ParquetScanOptions,
 }

 impl ParquetExec {
@@ -165,7 +129,6 @@ impl ParquetExec {
             pruning_predicate,
             metadata_size_hint,
             parquet_file_reader_factory: None,
-            scan_options: ParquetScanOptions::default(),
         }
     }

@@ -194,15 +157,71 @@ impl ParquetExec {
         self
     }

-    /// Configure `ParquetScanOptions`
-    pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> Self {
-        self.scan_options = scan_options;
+    /// If true, any filter [`Expr`]s on the scan will be converted to a
+    /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
+    /// `ParquetRecordBatchStream`. These filters are applied by the
+    /// parquet decoder to avoid unnecessarily decoding columns that
+    /// would not pass the predicate. Defaults to false.
+    pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+        self
+    }
+
+    /// Return the value described in [`Self::with_pushdown_filters`]
+    pub fn pushdown_filters(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
+            // default to false
+            .unwrap_or_default()
+    }
+
+    /// If true, the `RowFilter` made by `pushdown_filters` may try to
+    /// minimize the cost of filter evaluation by reordering the
+    /// predicate [`Expr`]s. If false, the predicates are applied in
+    /// the same order as specified in the query. Defaults to false.
+    pub fn with_reorder_filters(self, reorder_filters: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+        self
+    }
+
+    /// Return the value described in [`Self::with_reorder_filters`]
+    pub fn reorder_filters(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_REORDER_FILTERS)
+            // default to false
+            .unwrap_or_default()
+    }
+
+    /// If enabled, the reader will read the page index.
+    /// This is used to optimize filter pushdown
+    /// via `RowSelector` and `RowFilter` by
+    /// eliminating unnecessary IO and decoding.
+    pub fn with_enable_page_index(self, enable_page_index: bool) -> Self {
+        self.base_config
+            .config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
         self
     }

-    /// Ref to the `ParquetScanOptions`
-    pub fn parquet_scan_options(&self) -> &ParquetScanOptions {
-        &self.scan_options
+    /// Return the value described in [`Self::with_enable_page_index`]
+    pub fn enable_page_index(&self) -> bool {
+        self.base_config
+            .config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
+            // default to false
+            .unwrap_or_default()
     }
 }

@@ -314,7 +333,9 @@ impl ExecutionPlan for ParquetExec {
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
             parquet_file_reader_factory,
-            scan_options: self.scan_options.clone(),
+            pushdown_filters: self.pushdown_filters(),
+            reorder_filters: self.reorder_filters(),
+            enable_page_index: self.enable_page_index(),
         };

         let stream = FileStream::new(
@@ -376,7 +397,9 @@ struct ParquetOpener {
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
-    scan_options: ParquetScanOptions,
+    pushdown_filters: bool,
+    reorder_filters: bool,
+    enable_page_index: bool,
 }

 impl FileOpener for ParquetOpener {
@@ -406,9 +429,9 @@ impl FileOpener for ParquetOpener {
         let projection = self.projection.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let table_schema = self.table_schema.clone();
-        let reorder_predicates = self.scan_options.reorder_predicates;
-        let pushdown_filters = self.scan_options.pushdown_filters;
-        let enable_page_index = self.scan_options.enable_page_index;
+        let reorder_predicates = self.reorder_filters;
+        let pushdown_filters = self.pushdown_filters;
+        let enable_page_index = self.enable_page_index;

         Ok(Box::pin(async move {
             let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
@@ -1138,6 +1161,7 @@ pub async fn plan_to_parquet(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::ConfigOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
@@ -1203,17 +1227,16 @@ mod tests {
             projection,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         predicate,
         None,
     );

     if pushdown_predicate {
-        parquet_exec = parquet_exec.with_scan_options(
-            ParquetScanOptions::default()
-                .with_pushdown_filters(true)
-                .with_reorder_predicates(true),
-        );
+        parquet_exec = parquet_exec
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true);
     }

     let session_ctx = SessionContext::new();
@@ -1695,6 +1718,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         None,
         None,
@@ -1796,6 +1820,7 @@ mod tests {
                 "month".to_owned(),
                 "day".to_owned(),
             ],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         None,
         None,
@@ -1854,6 +1879,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         None,
         None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index d6e2c05fc4eff..bce277676deab 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,7 @@
 //! Common unit test utility methods

 use crate::arrow::array::UInt32Array;
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::object_store::ObjectStoreUrl;
@@ -165,6 +166,7 @@ pub fn partitioned_csv_config(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
+        config_options: ConfigOptions::new().into_shareable(),
     })
 }

diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ac8c983814472..ded5fad022db5 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -24,6 +24,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::config::ConfigOptions;
     use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
     use datafusion::datasource::listing::PartitionedFile;
     use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -88,6 +89,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         None,
         None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 2c840321f56c7..630c28a109c22 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -105,6 +106,7 @@ async fn get_exec(
             projection: projection.clone(),
             limit,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         &[],
     )

From f1cdaba1fc7a341023c833350d5103f5407f3d9f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 18 Oct 2022 10:14:25 -0400
Subject: [PATCH 2/3] fix logical conflict

---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index ad8c808f0bf35..f5bd890591fd5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -2502,14 +2502,13 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
         },
         Some(filter),
         None,
     );

-    let parquet_exec_page_index = parquet_exec
-        .clone()
-        .with_scan_options(ParquetScanOptions::default().with_page_index(true));
+    let parquet_exec_page_index = parquet_exec.clone().with_enable_page_index(true);

     let mut results = parquet_exec_page_index.execute(0, task_ctx)?;

From 900e15fa642e16b0a1ce7aefaa53238a0884bfcb Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 18 Oct 2022 10:16:35 -0400
Subject: [PATCH 3/3] Update tests

---
 datafusion/core/tests/sql/information_schema.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index d94a9cd5680ee..873ead46258b5 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -701,6 +701,9 @@ async fn show_all() {
         "| datafusion.execution.batch_size | 8192 |",
         "| datafusion.execution.coalesce_batches | true |",
         "| datafusion.execution.coalesce_target_batch_size | 4096 |",
+        "| datafusion.execution.parquet.enable_page_index | false |",
+        "| datafusion.execution.parquet.pushdown_filters | false |",
+        "| datafusion.execution.parquet.reorder_filters | false |",
         "| datafusion.execution.time_zone | UTC |",
         "| datafusion.explain.logical_plan_only | false |",
         "| datafusion.explain.physical_plan_only | false |",
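
Taken together, this series replaces the builder-style `ParquetScanOptions` with three entries in `ConfigOptions`, so the parquet reader settings can be set, shared, and read back like any other DataFusion configuration value. Below is a minimal sketch of the resulting API, using only calls that appear in this series (`ConfigOptions::new`, `set_bool`, `get_bool`, `into_shareable`); the `shared` variable name is illustrative, and it assumes `get_bool` returns `Option<bool>`, consistent with the `unwrap_or_default()` fallbacks in the ParquetExec getters above.

use datafusion::config::{
    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
    OPT_PARQUET_REORDER_FILTERS,
};

fn main() {
    // Build an option set, as the rewritten benchmark does.
    let mut config_options = ConfigOptions::new();
    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, true);
    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, false);

    // Wrap the options in Arc<RwLock<...>> so the same values can be stored
    // in a FileScanConfig and later consulted by ParquetExec through its
    // pushdown_filters() / reorder_filters() / enable_page_index() getters.
    let shared = config_options.into_shareable();

    // Reading a value back; unset keys yield None, which the ParquetExec
    // getters map to false via unwrap_or_default().
    assert_eq!(
        shared.read().get_bool(OPT_PARQUET_PUSHDOWN_FILTERS),
        Some(true)
    );
}

As the information_schema test shows, the same three keys also surface through SHOW ALL, each defaulting to false.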