From 356429b54f276eb7ce8bd93119345824bae7e5ac Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 28 Dec 2022 16:09:33 +0800 Subject: [PATCH 1/4] Remove deprecated config related code in context.rs --- datafusion/core/src/execution/context.rs | 82 +++++++----------------- 1 file changed, 22 insertions(+), 60 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index db6ab99fb5203..9a15580250cae 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1093,19 +1093,6 @@ impl QueryPlanner for DefaultQueryPlanner { } } -/// Session Configuration entry name for 'TARGET_PARTITIONS' -pub const TARGET_PARTITIONS: &str = "target_partitions"; -/// Session Configuration entry name for 'REPARTITION_JOINS' -pub const REPARTITION_JOINS: &str = "repartition_joins"; -/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS' -pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations"; -/// Session Configuration entry name for 'REPARTITION_WINDOWS' -pub const REPARTITION_WINDOWS: &str = "repartition_windows"; -/// Session Configuration entry name for 'PARQUET_PRUNING' -pub const PARQUET_PRUNING: &str = "parquet_pruning"; -/// Session Configuration entry name for 'COLLECT_STATISTICS' -pub const COLLECT_STATISTICS: &str = "collect_statistics"; - /// Map that holds opaque objects indexed by their type. /// /// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe]. @@ -1345,47 +1332,6 @@ impl SessionConfig { .unwrap() } - /// Convert configuration options to name-value pairs with values - /// converted to strings. - /// - /// Note that this method will eventually be deprecated and - /// replaced by [`config_options`]. 
- /// - /// [`config_options`]: SessionContext::config_option - pub fn to_props(&self) -> HashMap { - let mut map = HashMap::new(); - // copy configs from config_options - for (k, v) in self.config_options.options() { - map.insert(k.to_string(), format!("{}", v)); - } - map.insert( - TARGET_PARTITIONS.to_owned(), - format!("{}", self.target_partitions()), - ); - map.insert( - REPARTITION_JOINS.to_owned(), - format!("{}", self.repartition_joins()), - ); - map.insert( - REPARTITION_AGGREGATIONS.to_owned(), - format!("{}", self.repartition_aggregations()), - ); - map.insert( - REPARTITION_WINDOWS.to_owned(), - format!("{}", self.repartition_window_functions()), - ); - map.insert( - PARQUET_PRUNING.to_owned(), - format!("{}", self.parquet_pruning()), - ); - map.insert( - COLLECT_STATISTICS.to_owned(), - format!("{}", self.collect_statistics()), - ); - - map - } - /// Return a handle to the configuration options. /// /// [`config_options`]: SessionContext::config_option @@ -1916,30 +1862,46 @@ impl TaskContext { SessionConfig::new() .with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap()) .with_target_partitions( - task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(), + task_props + .get(OPT_TARGET_PARTITIONS) + .unwrap() + .parse() + .unwrap(), ) .with_repartition_joins( - task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(), + task_props + .get(OPT_REPARTITION_JOINS) + .unwrap() + .parse() + .unwrap(), ) .with_repartition_aggregations( task_props - .get(REPARTITION_AGGREGATIONS) + .get(OPT_REPARTITION_AGGREGATIONS) .unwrap() .parse() .unwrap(), ) .with_repartition_windows( task_props - .get(REPARTITION_WINDOWS) + .get(OPT_REPARTITION_WINDOWS) .unwrap() .parse() .unwrap(), ) .with_parquet_pruning( - task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(), + task_props + .get(OPT_PARQUET_ENABLE_PRUNING) + .unwrap() + .parse() + .unwrap(), ) .with_collect_statistics( - task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(), + 
task_props + .get(OPT_COLLECT_STATISTICS) + .unwrap() + .parse() + .unwrap(), ) }; From 747b72041d677cc2ce1221c901548009223e3d54 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 28 Dec 2022 16:14:52 +0800 Subject: [PATCH 2/4] Provide coalesce_batches() and round_robin_repartition() methods for SessionConfig --- datafusion/core/src/execution/context.rs | 41 ++++++++++++++++++------ 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 9a15580250cae..fad3752e131ab 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1332,6 +1332,35 @@ impl SessionConfig { .unwrap() } + /// Enables or disables the coalescence of small batches into larger batches + pub fn with_coalesce_batches(mut self, enabled: bool) -> Self { + self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled); + self + } + + /// Returns true if record batches will be examined between each operator + /// and small batches will be coalesced into larger batches. + pub fn coalesce_batches(&self) -> bool { + self.config_options + .get_bool(OPT_COALESCE_BATCHES) + .unwrap_or_default() + } + + /// Enables or disables the round robin repartition for increasing parallelism + pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self { + self.config_options + .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled); + self + } + + /// Returns true if the physical plan optimizer will try to + /// add round robin repartition to increase parallelism to leverage more CPU cores. + pub fn round_robin_repartition(&self) -> bool { + self.config_options + .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) + .unwrap_or_default() + } + /// Return a handle to the configuration options. 
/// /// [`config_options`]: SessionContext::config_option @@ -1489,11 +1518,7 @@ impl SessionState { // - it's conflicted with some parts of the BasicEnforcement, since it will // introduce additional repartitioning while the BasicEnforcement aims at // reducing unnecessary repartitioning. - if config - .config_options - .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) - .unwrap_or_default() - { + if config.round_robin_repartition() { physical_optimizers.push(Arc::new(Repartition::new())); } //- Currently it will depend on the partition number to decide whether to change the @@ -1519,11 +1544,7 @@ impl SessionState { physical_optimizers.push(Arc::new(OptimizeSorts::new())); // It will not influence the distribution and ordering of the whole plan tree. // Therefore, to avoid influencing other rules, it should be run at last. - if config - .config_options - .get_bool(OPT_COALESCE_BATCHES) - .unwrap_or_default() - { + if config.coalesce_batches() { physical_optimizers.push(Arc::new(CoalesceBatches::new( config .config_options From c37b6ad9815873648966e03e6f6a8fbae688c9a8 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 28 Dec 2022 16:54:27 +0800 Subject: [PATCH 3/4] Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead --- datafusion/core/src/config.rs | 12 +--- datafusion/core/src/execution/context.rs | 11 +--- .../src/physical_plan/coalesce_batches.rs | 6 +- datafusion/core/src/physical_plan/display.rs | 2 +- datafusion/core/src/physical_plan/mod.rs | 2 +- datafusion/core/tests/config_from_env.rs | 13 ++-- datafusion/core/tests/sql/explain_analyze.rs | 14 +++-- datafusion/core/tests/sql/mod.rs | 12 ++-- datafusion/core/tests/sql/window.rs | 7 ++- .../test_files/information_schema.slt | 1 - docs/source/user-guide/configs.md | 59 +++++++++---------- 11 files changed, 66 insertions(+), 73 deletions(-) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 976b8c07376ac..5c06e4e982b52 
100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size"; /// Configuration option "datafusion.execution.coalesce_batches" pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches"; -/// Configuration option "datafusion.execution.coalesce_target_batch_size" -pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str = - "datafusion.execution.coalesce_target_batch_size"; - /// Configuration option "datafusion.execution.collect_statistics" pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics"; @@ -317,14 +313,8 @@ impl BuiltInConfigs { small batches will be coalesced into larger batches. This is helpful when there \ are highly selective filters or joins that could produce tiny output batches. The \ target batch size is determined by the configuration setting \ - '{}'.", OPT_COALESCE_TARGET_BATCH_SIZE), + '{}'.", OPT_BATCH_SIZE), true, - ), - ConfigDefinition::new_u64( - OPT_COALESCE_TARGET_BATCH_SIZE, - format!("Target batch size when coalescing batches. 
Uses in conjunction with the \ - configuration setting '{}'.", OPT_COALESCE_BATCHES), - 4096, ), ConfigDefinition::new_string( OPT_TIME_ZONE, diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index fad3752e131ab..afdafa0a01779 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::repartition::Repartition; use crate::config::{ - ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, + ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; @@ -1545,14 +1545,7 @@ impl SessionState { // It will not influence the distribution and ordering of the whole plan tree. // Therefore, to avoid influencing other rules, it should be run at last. if config.coalesce_batches() { - physical_optimizers.push(Arc::new(CoalesceBatches::new( - config - .config_options - .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE) - .unwrap_or_default() - .try_into() - .unwrap(), - ))); + physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size()))); } SessionState { diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index afa2a85abd252..e80fff610e198 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -301,7 +301,7 @@ pub fn concat_batches( #[cfg(test)] mod tests { use super::*; - use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE}; + use crate::config::OPT_COALESCE_BATCHES; use crate::datasource::MemTable; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::projection::ProjectionExec; @@ -312,9 +312,7 @@ mod tests { #[tokio::test] async fn 
test_custom_batch_size() -> Result<()> { - let ctx = SessionContext::with_config( - SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234), - ); + let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234)); let plan = create_physical_plan(ctx).await?; let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap(); let coalesce = projection diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs index a8093d8a7f0d8..d618a13f8ea99 100644 --- a/datafusion/core/src/physical_plan/display.rs +++ b/datafusion/core/src/physical_plan/display.rs @@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> { /// /// ```text /// ProjectionExec: expr=[a] - /// CoalesceBatchesExec: target_batch_size=4096 + /// CoalesceBatchesExec: target_batch_size=8192 /// FilterExec: a < 5 /// RepartitionExec: partitioning=RoundRobinBatch(16) /// CsvExec: source=...", diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 07b4a6abb9064..d81d487cff184 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -329,7 +329,7 @@ pub fn with_new_children_if_necessary( /// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR"); /// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ -/// \n CoalesceBatchesExec: target_batch_size=4096\ +/// \n CoalesceBatchesExec: target_batch_size=8192\ /// \n FilterExec: a@0 < 5\ /// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ /// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]", diff --git a/datafusion/core/tests/config_from_env.rs b/datafusion/core/tests/config_from_env.rs index 20d153d8f83b4..3f1fcbd601d25 100644 --- a/datafusion/core/tests/config_from_env.rs +++ b/datafusion/core/tests/config_from_env.rs @@ -32,18 +32,17 @@ fn get_config_bool_from_env() { fn get_config_int_from_env() { let config_key = 
"datafusion.execution.batch_size"; let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE"; + + // for valid testing env::set_var(env_key, "4096"); let config = ConfigOptions::from_env(); - env::remove_var(env_key); assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); -} -#[test] -fn get_config_int_from_env_invalid() { - let config_key = "datafusion.execution.coalesce_target_batch_size"; - let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE"; + // for invalid testing env::set_var(env_key, "abc"); let config = ConfigOptions::from_env(); + assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value + + // To avoid influence other testing, we need to clear this environment variable env::remove_var(env_key); - assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 90fd91164fe17..1d24d868a6aba 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -25,7 +25,9 @@ use datafusion::{ async fn explain_analyze_baseline_metrics() { // This test uses the execute function to run an actual plan under EXPLAIN ANALYZE // and then validate the presence of baseline metrics for supported operators - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new() + .with_target_partitions(3) + .with_batch_size(4096); let ctx = SessionContext::with_config(config); register_aggregate_csv_by_sql(&ctx).await; // a query with as many operators as we have metrics for @@ -668,7 +670,9 @@ order by #[tokio::test] async fn test_physical_plan_display_indent() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(9000); + let config = SessionConfig::new() + .with_target_partitions(9000) + .with_batch_size(4096); let ctx = 
SessionContext::with_config(config); register_aggregate_csv(&ctx).await.unwrap(); let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \ @@ -711,7 +715,9 @@ async fn test_physical_plan_display_indent() { #[tokio::test] async fn test_physical_plan_display_indent_multi_children() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(9000); + let config = SessionConfig::new() + .with_target_partitions(9000) + .with_batch_size(4096); let ctx = SessionContext::with_config(config); // ensure indenting works for nodes with multiple children register_aggregate_csv(&ctx).await.unwrap(); @@ -760,7 +766,7 @@ async fn test_physical_plan_display_indent_multi_children() { async fn csv_explain() { // This test uses the execute function that create full plan cycle: logical, optimized logical, and physical, // then execute the physical plan and return the final explain results - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096)); register_aggregate_csv_by_sql(&ctx).await; let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)"; let actual = execute(&ctx, sql).await; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 41ccdb9a24bbb..bc4a44be432e9 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -187,7 +187,8 @@ fn create_join_context( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Arc::new(Schema::new(vec![ @@ -241,7 +242,8 @@ fn create_left_semi_anti_join_context_with_null_ids( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = 
Arc::new(Schema::new(vec![ @@ -313,7 +315,8 @@ fn create_right_semi_anti_join_context_with_null_ids( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Arc::new(Schema::new(vec![ @@ -618,7 +621,8 @@ fn create_sort_merge_join_datatype_context() -> Result { let ctx = SessionContext::with_config( SessionConfig::new() .set_bool(OPT_PREFER_HASH_JOIN, false) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Schema::new(vec![ diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 32438b6108a31..3af6e83c82c51 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1743,7 +1743,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()> #[tokio::test] async fn test_window_partition_by_order_by() -> Result<()> { - let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); + let ctx = SessionContext::with_config( + SessionConfig::new() + .with_target_partitions(2) + .with_batch_size(4096), + ); register_aggregate_csv(&ctx).await?; let sql = "SELECT \ @@ -2253,6 +2257,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> { async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> { let config = SessionConfig::new() .with_target_partitions(8) + .with_batch_size(4096) .with_repartition_windows(true); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index da2db8de64def..8eaa7dd439b84 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -108,7 +108,6 @@ datafusion.catalog.location NULL datafusion.catalog.type NULL datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true -datafusion.execution.coalesce_target_batch_size 4096 datafusion.execution.collect_statistics false datafusion.execution.parquet.enable_page_index false datafusion.execution.parquet.metadata_size_hint NULL diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1c5f086563888..22a30d8a6b104 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,34 +35,33 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | type | default | description | -| --------------------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | Boolean | true | Whether the default catalog and schema should be created automatically. 
| -| datafusion.catalog.information_schema | Boolean | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema, defaults to None | -| datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema. Defaults to None | -| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. | -| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. | -| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. | -| datafusion.execution.collect_statistics | Boolean | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | -| datafusion.execution.parquet.metadata_size_hint | UInt64 | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. 
| -| datafusion.execution.parquet.pruning | Boolean | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file. | -| datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. | -| datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. | -| datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. | -| datafusion.execution.target_partitions | UInt64 | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system. | -| datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone which some function require e.g. 
EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, | +| key | type | default | description | +| --------------------------------------------------------- | ------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | Boolean | true | Whether the default catalog and schema should be created automatically. | +| datafusion.catalog.information_schema | Boolean | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema, defaults to None | +| datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema. Defaults to None | +| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. | +| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.batch_size'. 
| +| datafusion.execution.collect_statistics | Boolean | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | +| datafusion.execution.parquet.metadata_size_hint | UInt64 | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. | +| datafusion.execution.parquet.pruning | Boolean | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file. | +| datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. | +| datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. | +| datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. | +| datafusion.execution.target_partitions | UInt64 | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system. | +| datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone which some function require e.g. 
EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, | | then extract the hour. | -| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | -| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | -| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.prefer_hash_join | Boolean | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but consumes more memory. 
Defaults to true | -| datafusion.optimizer.repartition_aggregations | Boolean | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_joins | Boolean | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_windows | Boolean | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. | -| datafusion.optimizer.top_down_join_key_reordering | Boolean | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true | +| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | +| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | +| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. 
| +| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.prefer_hash_join | Boolean | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but consumes more memory. Defaults to true | +| datafusion.optimizer.repartition_aggregations | Boolean | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_joins | Boolean | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_windows | Boolean | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | +| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. | +| datafusion.optimizer.top_down_join_key_reordering | Boolean | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. 
Defaults to true | From aa24281c5ac2857b45af3fd1c3c9d778c44743a8 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 28 Dec 2022 17:02:27 +0800 Subject: [PATCH 4/4] Provide a constructor for the ConfigOptions with a string hash map --- datafusion/core/src/config.rs | 58 ++++++++++++++++++++++++ datafusion/core/src/execution/context.rs | 8 ++++ 2 files changed, 66 insertions(+) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 5c06e4e982b52..eebeb875c9421 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -512,6 +512,34 @@ impl ConfigOptions { Self { options } } + /// Create new ConfigOptions struct, taking values from a string hash map. + /// Only the built-in configurations will be extracted from the hash map + /// and other key value pairs will be ignored. + pub fn from_string_hash_map(settings: HashMap<String, String>) -> Self { + let built_in = BuiltInConfigs::new(); + let mut options = HashMap::with_capacity(built_in.config_definitions.len()); + for config_def in &built_in.config_definitions { + let config_value = { + if let Some(v) = settings.get(&config_def.key) { + match ScalarValue::try_from_string(v.clone(), &config_def.data_type) { + Ok(parsed) => parsed, + Err(_) => { + warn!( + "Warning: could not parse setting {}={} to type {}.", + config_def.key, v, config_def.data_type + ); + config_def.default_value.clone() + } + } + } else { + config_def.default_value.clone() + } + }; + options.insert(config_def.key.clone(), config_value); + } + Self { options } + } + /// set a configuration option pub fn set(&mut self, key: &str, value: ScalarValue) { self.options.insert(key.to_string(), value); @@ -578,6 +606,7 @@ impl ConfigOptions { #[cfg(test)] mod test { use crate::config::{BuiltInConfigs, ConfigOptions}; + use std::collections::HashMap; #[test] fn docs() { @@ -619,4 +648,33 @@ mod test { assert_eq!(None, config.get_string(key)); assert!(!config.get_bool(key).unwrap_or_default()); } + + #[test] + fn 
get_config_from_string_hash_map() { + use crate::config::{OPT_COLLECT_STATISTICS, OPT_ENABLE_ROUND_ROBIN_REPARTITION}; + + let valid_built_in_setting = (OPT_ENABLE_ROUND_ROBIN_REPARTITION, "false"); + let invalid_built_in_setting = (OPT_COLLECT_STATISTICS, "true2"); + let none_built_in_setting = ("a", "false"); + + let mut settings = HashMap::new(); + settings.insert( + valid_built_in_setting.0.to_string(), + valid_built_in_setting.1.to_string(), + ); + settings.insert( + invalid_built_in_setting.0.to_string(), + invalid_built_in_setting.1.to_string(), + ); + settings.insert( + none_built_in_setting.0.to_string(), + none_built_in_setting.1.to_string(), + ); + + let config = ConfigOptions::from_string_hash_map(settings); + + assert_eq!(config.get_bool(valid_built_in_setting.0), Some(false)); + assert_eq!(config.get_bool(invalid_built_in_setting.0), Some(false)); + assert_eq!(None, config.get_bool(none_built_in_setting.0)); + } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index afdafa0a01779..85b21cdf2163b 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1167,6 +1167,14 @@ impl SessionConfig { } } + /// Create a new SessionConfig whose configuration options are taken from a string hash map. + pub fn from_string_hash_map(settings: HashMap<String, String>) -> Self { + Self { + config_options: ConfigOptions::from_string_hash_map(settings), + ..Default::default() + } + } + /// Set a configuration option pub fn set(mut self, key: &str, value: ScalarValue) -> Self { self.config_options.set(key, value);