56 changes: 41 additions & 15 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -23,16 +23,18 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
OPT_PARQUET_REORDER_FILTERS,
};
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{
FileScanConfig, ParquetExec, ParquetScanOptions,
};
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
Ok(())
}

#[derive(Debug, Clone)]
struct ParquetScanOptions {
Contributor Author: this was replicated for the benchmark code as I felt such a struct was the easiest to understand for this matrix strategy

pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
}

async fn run_benchmarks(
ctx: &mut SessionContext,
object_store_url: ObjectStoreUrl,
@@ -117,15 +126,21 @@
debug: bool,
) -> Result<()> {
let scan_options_matrix = vec![
ParquetScanOptions::default(),
ParquetScanOptions::default()
.with_page_index(true)
.with_pushdown_filters(true)
.with_reorder_predicates(true),
ParquetScanOptions::default()
.with_page_index(true)
.with_pushdown_filters(true)
.with_reorder_predicates(false),
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
},
ParquetScanOptions {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: true,
},
ParquetScanOptions {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: false,
},
];
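
The comment on `ParquetScanOptions` above refers to a matrix strategy: every scan-option combination is crossed with every filter expression and executed. A minimal sketch of that loop shape, with an abbreviated argument list (the exact `exec_scan` signature appears later in this file):

```rust
// Cross each scan-option combination with each filter and run the scan.
for scan_options in &scan_options_matrix {
    for filter_expr in &filter_matrix {
        let start = std::time::Instant::now();
        // exec_scan builds a ParquetExec honoring `scan_options` and returns
        // the number of rows that survived the filter (arguments abbreviated).
        let rows = exec_scan(
            ctx,
            object_store_url.clone(),
            scan_options.clone(),
            filter_expr.clone(),
            debug,
        )
        .await?;
        println!(
            "{} rows matched in {:?} with {:?}",
            rows,
            start.elapsed(),
            scan_options
        );
    }
}
```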

let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
debug: bool,
) -> Result<usize> {
let schema = BatchBuilder::schema();

let ParquetScanOptions {
pushdown_filters,
reorder_filters,
enable_page_index,
} = scan_options;

let mut config_options = ConfigOptions::new();
config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);

let scan_config = FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -206,6 +233,7 @@
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: config_options.into_shareable(),
};

let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@
&ExecutionProps::default(),
)?;

let parquet_exec = Arc::new(
ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
);
let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));

let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);

78 changes: 60 additions & 18 deletions datafusion/core/src/config.rs
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.time_zone"
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";

/// Configuration option "datafusion.execution.parquet.pushdown_filters"
pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
"datafusion.execution.parquet.pushdown_filters";

/// Configuration option "datafusion.execution.parquet.reorder_filters"
pub const OPT_PARQUET_REORDER_FILTERS: &str =
"datafusion.execution.parquet.reorder_filters";

/// Configuration option "datafusion.execution.parquet.enable_page_index"
pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
"datafusion.execution.parquet.enable_page_index";

/// Configuration option "datafusion.optimizer.skip_failed_rules"
pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
"datafusion.optimizer.skip_failed_rules";

/// Configuration option "datafusion.execution.time_zone"
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
false,
),
ConfigDefinition::new_u64(
OPT_BATCH_SIZE,
"Default batch size while creating new batches, it's especially useful for \
buffer-in-memory batches since creating tiny batches would results in too much metadata \
memory consumption.",
8192,
OPT_BATCH_SIZE,
"Default batch size while creating new batches, it's especially useful for \
buffer-in-memory batches since creating tiny batches would results in too much metadata \
memory consumption.",
8192,
),
ConfigDefinition::new_bool(
OPT_COALESCE_BATCHES,
@@ -191,23 +205,43 @@
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
ConfigDefinition::new_string(
Contributor Author: I moved this to be with the other settings

OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
then extract the hour.",
"UTC".into()
),
ConfigDefinition::new_bool(
OPT_PARQUET_PUSHDOWN_FILTERS,
"If true, filter expressions are be applied during the parquet decoding operation to \
reduce the number of rows decoded.",
false,
),
ConfigDefinition::new_bool(
OPT_PARQUET_REORDER_FILTERS,
"If true, filter expressions evaluated during the parquet decoding opearation \
will be reordered heuristically to minimize the cost of evaluation. If false, \
the filters are applied in the same order as written in the query.",
false,
),
ConfigDefinition::new_bool(
OPT_PARQUET_ENABLE_PAGE_INDEX,
"If true, uses parquet data page level metadata (Page Index) statistics \
to reduce the number of rows decoded.",
false,
),
ConfigDefinition::new_bool(
OPT_OPTIMIZER_SKIP_FAILED_RULES,
"When set to true, the logical plan optimizer will produce warning \
messages if any optimization rules produce errors and then proceed to the next \
rule. When set to false, any rules that produce errors will cause the query to fail.",
true
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
then extract the hour",
"UTC".into()
)]
]
}
}

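With the three parquet definitions registered above, the options can be toggled per session. A minimal sketch, assuming the crate's builder-style `SessionConfig::set_bool`:

```rust
use datafusion::config::{OPT_PARQUET_PUSHDOWN_FILTERS, OPT_PARQUET_REORDER_FILTERS};
use datafusion::prelude::{SessionConfig, SessionContext};

// Enable filter pushdown into the parquet decoder, plus heuristic filter
// reordering, for all statements executed on this context.
let config = SessionConfig::new()
    .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true)
    .set_bool(OPT_PARQUET_REORDER_FILTERS, true);
let ctx = SessionContext::with_config(config);
```
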
@@ -255,8 +289,16 @@
Self { options }
}

/// Create new ConfigOptions struct, taking values from environment variables where possible.
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control `datafusion.execution.batch_size`.
/// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
pub fn into_shareable(self) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(self))
}
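
A minimal sketch of the sharing semantics this helper enables, assuming the crate's `get_bool` accessor: every clone of the `Arc` observes writes made through any other handle, which is what lets a later configuration change affect an already-constructed scan config.

```rust
use datafusion::config::{ConfigOptions, OPT_PARQUET_PUSHDOWN_FILTERS};
use std::sync::Arc;

let shared = ConfigOptions::new().into_shareable();
let handle = Arc::clone(&shared); // e.g. stored in a FileScanConfig

// A later update through one handle...
shared.write().set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);

// ...is visible through every other handle at execution time.
assert!(handle.read().get_bool(OPT_PARQUET_PUSHDOWN_FILTERS));
```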

/// Create new ConfigOptions struct, taking values from
/// environment variables where possible.
///
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
Contributor Author: I will add some documentation about this to the datafusion-cli docs as I couldn't find it when I was looking

/// control `datafusion.execution.batch_size`.
pub fn from_env() -> Self {
let built_in = BuiltInConfigs::new();
let mut options = HashMap::with_capacity(built_in.config_definitions.len());
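A short sketch of the environment-variable path described above; setting the variable from inside the process here is only to keep the example self-contained:

```rust
use datafusion::config::ConfigOptions;
use std::env;

// DATAFUSION_EXECUTION_BATCH_SIZE maps to "datafusion.execution.batch_size".
env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");

let options = ConfigOptions::from_env();
// The shareable handle can then be plumbed into a FileScanConfig.
let shared = options.into_shareable();
```
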
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@
projection,
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
},
&[],
)
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
projection: projection.clone(),
limit,
table_partition_cols: self.options.table_partition_cols.clone(),
config_options: ctx.config.config_options(),
},
filters,
)
9 changes: 8 additions & 1 deletion datafusion/core/src/execution/context.rs
@@ -1184,7 +1184,7 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
config_options: ConfigOptions::from_env().into_shareable(),
..Default::default()
}
}
@@ -1324,6 +1324,13 @@
map
}

/// Return a handle to the shared configuration options.
///
/// [`config_options`]: SessionContext::config_options
pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
self.config_options.clone()
}
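
A minimal sketch of what this handle allows: because the options live behind `Arc<RwLock<...>>`, they can be adjusted after the `SessionContext` is built, and scans that read the shared options at execution time pick up the change.

```rust
use datafusion::config::OPT_PARQUET_PUSHDOWN_FILTERS;
use datafusion::prelude::{SessionConfig, SessionContext};

let config = SessionConfig::new();
let shared = config.config_options(); // Arc<RwLock<ConfigOptions>>
let ctx = SessionContext::with_config(config);

// Flip a setting mid-session; subsequent statements observe the new value.
shared.write().set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
```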

/// Add extensions.
///
/// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
30 changes: 26 additions & 4 deletions datafusion/core/src/execution/options.rs
@@ -34,7 +34,12 @@ use crate::datasource::{
listing::ListingOptions,
};

/// CSV file read option
/// Options that control the reading of CSV files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement, see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct CsvReadOptions<'a> {
/// Does the CSV file have a header?
@@ -150,7 +155,12 @@
}
}

/// Parquet read options
/// Options that control the reading of Parquet files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement, see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
/// File extension; only files with this extension are selected for data input.
@@ -160,10 +170,12 @@
pub table_partition_cols: Vec<String>,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
// TODO move this into ConfigOptions
Contributor Author: I will do this as a follow-on PR

pub parquet_pruning: bool,
/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
// TODO move this into ConfigOptions
pub skip_metadata: bool,
}

@@ -217,7 +229,12 @@
}
}

/// Avro read options
/// Options that control the reading of Avro files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement, see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
/// The data source schema.
@@ -261,7 +278,12 @@
}
}

/// Line-delimited JSON read options
/// Options that control the reading of line-delimited JSON files (NDJson).
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement, see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct NdJsonReadOptions<'a> {
/// The data source schema.
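To make the split between the two kinds of settings concrete, a sketch showing both knobs together; the file path is illustrative and `set_bool` is assumed to be the crate's builder-style setter:

```rust
use datafusion::config::OPT_PARQUET_PUSHDOWN_FILTERS;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Statement-to-statement settings travel through ConfigOptions.
    let config = SessionConfig::new().set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
    let ctx = SessionContext::with_config(config);

    // Datasource-creation settings are fixed here for the table's lifetime.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}
```
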
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -240,6 +240,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use super::*;
use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod private {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
use crate::config::ConfigOptions;
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

@@ -306,6 +308,7 @@
projection,
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

@@ -374,6 +377,7 @@
statistics: Statistics::default(),
limit: None,
table_partition_cols: vec!["date".to_owned()],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
