diff --git a/benchmarks/README.md b/benchmarks/README.md index 86b2e1b3b958f..061366460f751 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -83,13 +83,25 @@ To run for specific query, for example Q21 ./bench.sh run tpch10 21 ``` -## Select join algorithm +## Benchmark with modified configurations +### Select join algorithm The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm. To run TPCH benchmarks with join other than HASH: ```shell PREFER_HASH_JOIN=false ./bench.sh run tpch ``` +### Configure with environment variables +Any [datafusion options](https://datafusion.apache.org/user-guide/configs.html) that are provided environment variables are +also considered by the benchmarks. +The following configuration runs the TPCH benchmark with datafusion configured to *not* repartition join keys. +```shell +DATAFUSION_OPTIMIZER_REPARTITION_JOINS=false ./bench.sh run tpch +``` +You might want to adjust the results location to avoid overwriting previous results. +Environment configuration that was picked up by datafusion is logged at `info` level. +To verify that datafusion picked up your configuration, run the benchmarks with `RUST_LOG=info` or higher. + ## Comparing performance of main and a branch ```shell diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 5d3ad3446ddb9..7788795c2e454 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -98,6 +98,7 @@ DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by /bin/activate) +DATAFUSION_* Set the given datafusion configuration " exit 1 } diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 578f71f8275d5..97cfefb0efd97 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -189,7 +189,7 @@ impl ExternalAggrConfig { ) -> Result> { let query_name = format!("Q{query_id}({})", human_readable_size(mem_limit as usize)); - let config = self.common.config(); + let config = self.common.config()?; let memory_pool: Arc = match mem_pool_type { "fair" => Arc::new(FairSpillPool::new(mem_limit as usize)), "greedy" => Arc::new(GreedyMemoryPool::new(mem_limit as usize)), diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 923c2bdd7cdf4..2e934346748e1 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -116,7 +116,7 @@ impl RunOpt { }; // configure parquet options - let mut config = self.common.config(); + let mut config = self.common.config()?; { let parquet_options = &mut config.options_mut().execution.parquet; // The hits_partitioned dataset specifies string columns diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index cc463e70d74a2..d43257ef0b690 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -77,7 +77,7 @@ impl RunOpt { None => queries.min_query_id()..=queries.max_query_id(), }; - let config = self.common.config(); + let config = self.common.config()?; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index d7d7a56d0540e..eaa67b98e11bc 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -303,7 +303,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let mut config = self .common - .config() + .config()? .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; let rt_builder = self.common.runtime_env_builder()?; @@ -514,7 +514,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, @@ -550,7 +550,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 176234eca541c..2de2455f9f44d 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -202,7 +202,7 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result> { - let config = self.common.config(); + let config = self.common.config()?; let rt_builder = self.common.runtime_env_builder()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 752a5a1a6ba01..d923e3e57d4f5 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -123,7 +123,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let mut config = self .common - .config() + .config()? .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; let rt_builder = self.common.runtime_env_builder()?; @@ -355,7 +355,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, @@ -392,7 +392,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a1cf31525dd92..0550fab8bbf77 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -25,7 +25,7 @@ use datafusion::{ }, prelude::SessionConfig, }; -use datafusion_common::{utils::get_available_parallelism, DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result}; use structopt::StructOpt; // Common benchmark options (don't use doc comments otherwise this doc @@ -41,8 +41,8 @@ pub struct CommonOpt { pub partitions: Option, /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - pub batch_size: usize, + #[structopt(short = "s", long = "batch-size")] + pub batch_size: Option, /// The memory pool type to use, should be one of "fair" or "greedy" #[structopt(long = "mem-pool-type", default_value = "fair")] @@ -65,21 +65,25 @@ pub struct CommonOpt { impl CommonOpt { /// Return an appropriately configured `SessionConfig` - pub fn config(&self) -> SessionConfig { - self.update_config(SessionConfig::new()) + pub fn config(&self) -> Result { + SessionConfig::from_env().map(|config| self.update_config(config)) } /// Modify the existing config appropriately - pub fn update_config(&self, config: SessionConfig) -> SessionConfig { - let mut config = config - .with_target_partitions( - self.partitions.unwrap_or(get_available_parallelism()), - ) - .with_batch_size(self.batch_size); + pub fn update_config(&self, mut config: SessionConfig) -> SessionConfig { + if let Some(batch_size) = self.batch_size { + config = config.with_batch_size(batch_size) + } + + if let Some(partitions) = self.partitions { + config = config.with_target_partitions(partitions) + } + if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes { config = config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes); } + config } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e0f63d6d81ca..6b885f7c0a894 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -864,7 +864,9 @@ impl ConfigOptions { for key in keys.0 { let env = key.to_uppercase().replace('.', "_"); if let Some(var) = std::env::var_os(env) { - ret.set(&key, var.to_string_lossy().as_ref())?; + let value = var.to_string_lossy(); + log::info!("Set {key} to {value} from the environment variable"); + ret.set(&key, value.as_ref())?; } }