Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 59 additions & 11 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
/// Configuration option "datafusion.execution.coalesce_batches"
pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.collect_statistics"
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";

Expand Down Expand Up @@ -317,14 +313,8 @@ impl BuiltInConfigs {
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
'{}'.", OPT_BATCH_SIZE),
true,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
Expand Down Expand Up @@ -522,6 +512,34 @@ impl ConfigOptions {
Self { options }
}

/// Create new ConfigOptions struct, taking values from a string hash map.
/// Only the built-in configurations will be extracted from the hash map
/// and other key value pairs will be ignored.
pub fn from_string_hash_map(settings: HashMap<String, String>) -> Self {
let built_in = BuiltInConfigs::new();
let mut options = HashMap::with_capacity(built_in.config_definitions.len());
for config_def in &built_in.config_definitions {
let config_value = {
if let Some(v) = settings.get(&config_def.key) {
match ScalarValue::try_from_string(v.clone(), &config_def.data_type) {
Ok(parsed) => parsed,
Err(_) => {
warn!(
"Warning: could not parse setting {}={} to type {}.",
config_def.key, v, config_def.data_type
);
config_def.default_value.clone()
}
}
} else {
config_def.default_value.clone()
}
};
options.insert(config_def.key.clone(), config_value);
}
Self { options }
}

/// set a configuration option
pub fn set(&mut self, key: &str, value: ScalarValue) {
self.options.insert(key.to_string(), value);
Expand Down Expand Up @@ -588,6 +606,7 @@ impl ConfigOptions {
#[cfg(test)]
mod test {
use crate::config::{BuiltInConfigs, ConfigOptions};
use std::collections::HashMap;

#[test]
fn docs() {
Expand Down Expand Up @@ -629,4 +648,33 @@ mod test {
assert_eq!(None, config.get_string(key));
assert!(!config.get_bool(key).unwrap_or_default());
}

#[test]
fn get_config_from_string_hash_map() {
use crate::config::{OPT_COLLECT_STATISTICS, OPT_ENABLE_ROUND_ROBIN_REPARTITION};

let valid_built_in_setting = (OPT_ENABLE_ROUND_ROBIN_REPARTITION, "false");
let invalid_built_in_setting = (OPT_COLLECT_STATISTICS, "true2");
let none_built_in_setting = ("a", "false");

let mut settings = HashMap::new();
settings.insert(
valid_built_in_setting.0.to_string(),
valid_built_in_setting.1.to_string(),
);
settings.insert(
invalid_built_in_setting.0.to_string(),
invalid_built_in_setting.1.to_string(),
);
settings.insert(
none_built_in_setting.0.to_string(),
none_built_in_setting.1.to_string(),
);

let config = ConfigOptions::from_string_hash_map(settings);

assert_eq!(config.get_bool(valid_built_in_setting.0), Some(false));
assert_eq!(config.get_bool(invalid_built_in_setting.0), Some(false));
assert_eq!(None, config.get_bool(none_built_in_setting.0));
}
}
136 changes: 60 additions & 76 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
Expand Down Expand Up @@ -1093,19 +1093,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Session Configuration entry name for 'TARGET_PARTITIONS'
pub const TARGET_PARTITIONS: &str = "target_partitions";
/// Session Configuration entry name for 'REPARTITION_JOINS'
pub const REPARTITION_JOINS: &str = "repartition_joins";
/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS'
pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations";
/// Session Configuration entry name for 'REPARTITION_WINDOWS'
pub const REPARTITION_WINDOWS: &str = "repartition_windows";
/// Session Configuration entry name for 'PARQUET_PRUNING'
pub const PARQUET_PRUNING: &str = "parquet_pruning";
/// Session Configuration entry name for 'COLLECT_STATISTICS'
pub const COLLECT_STATISTICS: &str = "collect_statistics";

/// Map that holds opaque objects indexed by their type.
///
/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
Expand Down Expand Up @@ -1180,6 +1167,14 @@ impl SessionConfig {
}
}

/// Create new ConfigOptions struct, taking values from a string hash map.
pub fn from_string_hash_map(settings: HashMap<String, String>) -> Self {
Self {
config_options: ConfigOptions::from_string_hash_map(settings),
..Default::default()
}
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.config_options.set(key, value);
Expand Down Expand Up @@ -1345,45 +1340,33 @@ impl SessionConfig {
.unwrap()
}

/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
/// Note that this method will eventually be deprecated and
/// replaced by [`config_options`].
///
/// [`config_options`]: SessionContext::config_option
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
TARGET_PARTITIONS.to_owned(),
format!("{}", self.target_partitions()),
);
map.insert(
REPARTITION_JOINS.to_owned(),
format!("{}", self.repartition_joins()),
);
map.insert(
REPARTITION_AGGREGATIONS.to_owned(),
format!("{}", self.repartition_aggregations()),
);
map.insert(
REPARTITION_WINDOWS.to_owned(),
format!("{}", self.repartition_window_functions()),
);
map.insert(
PARQUET_PRUNING.to_owned(),
format!("{}", self.parquet_pruning()),
);
map.insert(
COLLECT_STATISTICS.to_owned(),
format!("{}", self.collect_statistics()),
);
/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled);
self
}

map
/// Returns true if record batches will be examined between each operator
/// and small batches will be coalesced into larger batches.
pub fn coalesce_batches(&self) -> bool {
self.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
}

/// Enables or disables the round robin repartition for increasing parallelism
pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
self.config_options
.set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled);
self
}

/// Returns true if the physical plan optimizer will try to
/// add round robin repartition to increase parallelism to leverage more CPU cores.
pub fn round_robin_repartition(&self) -> bool {
self.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
}

/// Return a handle to the configuration options.
Expand Down Expand Up @@ -1543,11 +1526,7 @@ impl SessionState {
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config
.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
if config.round_robin_repartition() {
physical_optimizers.push(Arc::new(Repartition::new()));
}
//- Currently it will depend on the partition number to decide whether to change the
Expand All @@ -1573,19 +1552,8 @@ impl SessionState {
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.unwrap_or_default()
.try_into()
.unwrap(),
)));
if config.coalesce_batches() {
physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size())));
}

SessionState {
Expand Down Expand Up @@ -1916,30 +1884,46 @@ impl TaskContext {
SessionConfig::new()
.with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap())
.with_target_partitions(
task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
task_props
.get(OPT_TARGET_PARTITIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_joins(
task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
task_props
.get(OPT_REPARTITION_JOINS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_aggregations(
task_props
.get(REPARTITION_AGGREGATIONS)
.get(OPT_REPARTITION_AGGREGATIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_windows(
task_props
.get(REPARTITION_WINDOWS)
.get(OPT_REPARTITION_WINDOWS)
.unwrap()
.parse()
.unwrap(),
)
.with_parquet_pruning(
task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
task_props
.get(OPT_PARQUET_ENABLE_PRUNING)
.unwrap()
.parse()
.unwrap(),
)
.with_collect_statistics(
task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(),
task_props
.get(OPT_COLLECT_STATISTICS)
.unwrap()
.parse()
.unwrap(),
)
};

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
use crate::config::OPT_COALESCE_BATCHES;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
Expand All @@ -312,9 +312,7 @@ mod tests {

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
);
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234));
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=4096
/// CoalesceBatchesExec: target_batch_size=8192
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ pub fn with_new_children_if_necessary(
/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]",
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@ fn get_config_bool_from_env() {
fn get_config_int_from_env() {
let config_key = "datafusion.execution.batch_size";
let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

// for valid testing
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096);
}

#[test]
fn get_config_int_from_env_invalid() {
let config_key = "datafusion.execution.coalesce_target_batch_size";
let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE";
// for invalid testing
env::set_var(env_key, "abc");
let config = ConfigOptions::from_env();
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value

// To avoid influence other testing, we need to clear this environment variable
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value
}
Loading