Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async fn get_table(
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
.with_collect_stat(ctx.config.collect_statistics);
.with_collect_stat(ctx.config.collect_statistics());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this illustrates the major API change for DataFusion users


let table_path = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(table_path).with_listing_options(options);
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};

use datafusion::prelude::SessionConfig;
use parking_lot::Mutex;
use std::sync::Arc;

Expand Down Expand Up @@ -85,8 +86,8 @@ fn create_context() -> Arc<Mutex<SessionContext>> {

rt.block_on(async {
// create local session context
let ctx = SessionContext::new();
ctx.state.write().config.target_partitions = 1;
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(1));

let table_provider = Arc::new(csv.await);
let mem_table = MemTable::load(table_provider, Some(partitions), &ctx.state())
Expand Down
98 changes: 93 additions & 5 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ use std::env;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/// Configuration option "datafusion.execution.target_partitions"
pub const OPT_TARGET_PARTITIONS: &str = "datafusion.execution.target_partitions";

/// Configuration option "datafusion.catalog.create_default_catalog_and_schema"
pub const OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA: &str =
    "datafusion.catalog.create_default_catalog_and_schema";

/// Configuration option "datafusion.catalog.information_schema"
pub const OPT_INFORMATION_SCHEMA: &str = "datafusion.catalog.information_schema";

/// Configuration option "datafusion.optimizer.repartition_joins"
pub const OPT_REPARTITION_JOINS: &str = "datafusion.optimizer.repartition_joins";

/// Configuration option "datafusion.optimizer.repartition_aggregations"
pub const OPT_REPARTITION_AGGREGATIONS: &str =
    "datafusion.optimizer.repartition_aggregations";

/// Configuration option "datafusion.optimizer.repartition_windows"
pub const OPT_REPARTITION_WINDOWS: &str = "datafusion.optimizer.repartition_windows";

/// Configuration option "datafusion.execution.collect_statistics"
// Key follows the same `datafusion.<section>.<name>` layout as the other
// options in this list (previously misspelled as "execuction_collect_statistics").
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
Comment on lines +31 to 53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we couldn't just make these an enum? Then we could make the set_x and get_x methods type safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 That is an excellent idea -- I don't think there is any reason we could not do so. I will file a follow-on ticket to do so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Expand Down Expand Up @@ -199,7 +221,54 @@ impl BuiltInConfigs {
/// configuration options
pub fn new() -> Self {
Self {
config_definitions: vec![ConfigDefinition::new_bool(
config_definitions: vec![ConfigDefinition::new_u64(
OPT_TARGET_PARTITIONS,
"Number of partitions for query execution. Increasing partitions can increase \
concurrency. Defaults to the number of cpu cores on the system.",
num_cpus::get() as u64,
),

ConfigDefinition::new_bool(
OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
"Whether the default catalog and schema should be created automatically.",
true
),

ConfigDefinition::new_bool(
OPT_INFORMATION_SCHEMA,
"Should DataFusion provide access to `information_schema` \
virtual tables for displaying schema information",
false
),

ConfigDefinition::new_bool(
OPT_REPARTITION_JOINS,
"Should DataFusion repartition data using the join keys to execute joins in parallel \
using the provided `target_partitions` level",
true
),

ConfigDefinition::new_bool(
OPT_REPARTITION_AGGREGATIONS,
"Should DataFusion repartition data using the aggregate keys to execute aggregates \
in parallel using the provided `target_partitions` level",
true
),

// Window repartitioning: description must talk about window functions,
// not statistics (the two descriptions were previously swapped).
ConfigDefinition::new_bool(
    OPT_REPARTITION_WINDOWS,
    "Should DataFusion repartition data using the partitions keys to execute window \
     functions in parallel using the provided `target_partitions` level",
    true
),

// Statistics collection for listed files.
ConfigDefinition::new_bool(
    OPT_COLLECT_STATISTICS,
    "Should DataFusion collect statistics after listing files",
    false
),

ConfigDefinition::new_bool(
OPT_FILTER_NULL_JOIN_KEYS,
"When set to true, the optimizer will insert filters before a join between \
a nullable and non-nullable column to filter out nulls on the nullable side. This \
Expand Down Expand Up @@ -336,11 +405,14 @@ impl BuiltInConfigs {
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
for config in configs

let config_definitions: Vec<_> = configs
.config_definitions
.iter()
.sorted_by_key(|c| c.key.as_str())
{
.into_iter()
.map(normalize_for_display)
.collect();

for config in config_definitions.iter().sorted_by_key(|c| c.key.as_str()) {
let _ = writeln!(
&mut docs,
"| {} | {} | {} | {} |",
Expand All @@ -351,6 +423,16 @@ impl BuiltInConfigs {
}
}

/// Adjusts a config definition so that it renders reproducibly in the
/// generated markdown documentation.
///
/// The definition keyed by `OPT_TARGET_PARTITIONS` has its default value
/// replaced with `UInt64(0)`; every other definition is returned unchanged.
fn normalize_for_display(mut v: ConfigDefinition) -> ConfigDefinition {
    // The real default for target_partitions is derived from the number of
    // cores, so the docs show a fixed 0 rather than a machine-specific value.
    let has_machine_dependent_default = v.key == OPT_TARGET_PARTITIONS;
    if has_machine_dependent_default {
        v.default_value = ScalarValue::UInt64(Some(0));
    }
    v
}

/// Configuration options struct. This can contain values for built-in and custom options
#[derive(Clone)]
pub struct ConfigOptions {
Expand Down Expand Up @@ -437,6 +519,12 @@ impl ConfigOptions {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// Set a `usize` configuration option.
///
/// The value is converted to `u64` and stored under `key` as a
/// `ScalarValue::UInt64`.
///
/// # Panics
///
/// Panics if `value` cannot be represented as a `u64`.
pub fn set_usize(&mut self, key: &str, value: usize) {
    // The conversion goes from usize to u64; the panic message states that direction.
    let value: u64 = value.try_into().expect("convert usize to u64");
    self.set(key, ScalarValue::UInt64(Some(value)))
}

/// set a `String` configuration option
pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
self.set(key, ScalarValue::Utf8(Some(value.into())))
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ mod tests {
use std::vec;

use super::*;
use crate::execution::context::SessionConfig;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::physical_plan::Partitioning;
Expand Down Expand Up @@ -1541,8 +1542,7 @@ mod tests {
assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

// For partition aware union, the output partition count should not be changed.
assert_eq!(
Expand Down Expand Up @@ -1597,8 +1597,7 @@ mod tests {
assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

// For non-partition aware union, the output partitioning count should be the combination of all output partitions count
assert!(matches!(
Expand Down Expand Up @@ -1627,8 +1626,7 @@ mod tests {
JoinType::RightAnti,
];

let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

for join_type in all_join_types {
let join = left.join(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ListingTableConfig {

let listing_options = ListingOptions::new(format)
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions);
.with_target_partitions(state.config.target_partitions());

Ok(Self {
table_paths: self.table_paths,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl TableProviderFactory for ListingTableFactory {
};

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config.collect_statistics)
.with_collect_stat(state.config.collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions)
.with_target_partitions(state.config.target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);

Expand Down
Loading