From b8accb7c86dddeb421504c51755bb14b274d68cc Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Sat, 19 Apr 2025 07:08:20 +0200 Subject: [PATCH 1/8] Read benchmark SessionConfig from env --- benchmarks/src/bin/external_aggr.rs | 2 +- benchmarks/src/clickbench.rs | 2 +- benchmarks/src/h2o.rs | 2 +- benchmarks/src/imdb/run.rs | 2 +- benchmarks/src/sort_tpch.rs | 2 +- benchmarks/src/tpch/run.rs | 2 +- benchmarks/src/util/options.rs | 4 ++-- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 578f71f8275d5..97cfefb0efd97 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -189,7 +189,7 @@ impl ExternalAggrConfig { ) -> Result> { let query_name = format!("Q{query_id}({})", human_readable_size(mem_limit as usize)); - let config = self.common.config(); + let config = self.common.config()?; let memory_pool: Arc = match mem_pool_type { "fair" => Arc::new(FairSpillPool::new(mem_limit as usize)), "greedy" => Arc::new(GreedyMemoryPool::new(mem_limit as usize)), diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 923c2bdd7cdf4..2e934346748e1 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -116,7 +116,7 @@ impl RunOpt { }; // configure parquet options - let mut config = self.common.config(); + let mut config = self.common.config()?; { let parquet_options = &mut config.options_mut().execution.parquet; // The hits_partitioned dataset specifies string columns diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index cc463e70d74a2..d43257ef0b690 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -77,7 +77,7 @@ impl RunOpt { None => queries.min_query_id()..=queries.max_query_id(), }; - let config = self.common.config(); + let config = self.common.config()?; let rt_builder = self.common.runtime_env_builder()?; let ctx = 
SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index d7d7a56d0540e..0aca5147a1778 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -303,7 +303,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let mut config = self .common - .config() + .config()? .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; let rt_builder = self.common.runtime_env_builder()?; diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 176234eca541c..2de2455f9f44d 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -202,7 +202,7 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result> { - let config = self.common.config(); + let config = self.common.config()?; let rt_builder = self.common.runtime_env_builder()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 752a5a1a6ba01..523d52e5b6ee9 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -123,7 +123,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let mut config = self .common - .config() + .config()? 
.with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; let rt_builder = self.common.runtime_env_builder()?; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a1cf31525dd92..b42b1456dbcaa 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -65,8 +65,8 @@ pub struct CommonOpt { impl CommonOpt { /// Return an appropriately configured `SessionConfig` - pub fn config(&self) -> SessionConfig { - self.update_config(SessionConfig::new()) + pub fn config(&self) -> Result { + SessionConfig::from_env().map(|config| self.update_config(config)) } /// Modify the existing config appropriately From cfb7800806acf791d77d68447528e1f0e443ae3e Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Sun, 20 Apr 2025 17:39:57 +0200 Subject: [PATCH 2/8] Set target partitions from env by default fix --- benchmarks/src/util/options.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b42b1456dbcaa..dba33559b898d 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -25,7 +25,7 @@ use datafusion::{ }, prelude::SessionConfig, }; -use datafusion_common::{utils::get_available_parallelism, DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result}; use structopt::StructOpt; // Common benchmark options (don't use doc comments otherwise this doc @@ -72,10 +72,12 @@ impl CommonOpt { /// Modify the existing config appropriately pub fn update_config(&self, config: SessionConfig) -> SessionConfig { let mut config = config - .with_target_partitions( - self.partitions.unwrap_or(get_available_parallelism()), - ) .with_batch_size(self.batch_size); + + if let Some(partitions) = self.partitions { + config = config.with_target_partitions(partitions) + } + if let Some(sort_spill_reservation_bytes) = 
self.sort_spill_reservation_bytes { config = config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes); From beb3a0d9848f31c14bf3b29f9b2620d9cda4434b Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Sun, 20 Apr 2025 17:46:54 +0200 Subject: [PATCH 3/8] Set batch size from env by default --- benchmarks/src/imdb/run.rs | 4 ++-- benchmarks/src/util/options.rs | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 0aca5147a1778..eaa67b98e11bc 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -514,7 +514,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, @@ -550,7 +550,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index dba33559b898d..0550fab8bbf77 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -41,8 +41,8 @@ pub struct CommonOpt { pub partitions: Option, /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - pub batch_size: usize, + #[structopt(short = "s", long = "batch-size")] + pub batch_size: Option, /// The memory pool type to use, should be one of "fair" or "greedy" #[structopt(long = "mem-pool-type", default_value = "fair")] @@ -70,9 +70,10 @@ impl CommonOpt { } /// Modify the existing config appropriately - pub fn update_config(&self, config: SessionConfig) -> SessionConfig { - let mut config = config - .with_batch_size(self.batch_size); + pub fn update_config(&self, mut config: SessionConfig) -> 
SessionConfig { + if let Some(batch_size) = self.batch_size { + config = config.with_batch_size(batch_size) + } if let Some(partitions) = self.partitions { config = config.with_target_partitions(partitions) @@ -82,6 +83,7 @@ impl CommonOpt { config = config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes); } + config } From 3e71b54296df817f4c8cf8f36d5f8278ea64d845 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Sun, 20 Apr 2025 18:48:10 +0200 Subject: [PATCH 4/8] Fix batch size option for tpch ci --- benchmarks/src/tpch/run.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 523d52e5b6ee9..d923e3e57d4f5 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -355,7 +355,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, @@ -392,7 +392,7 @@ mod tests { let common = CommonOpt { iterations: 1, partitions: Some(2), - batch_size: 8192, + batch_size: Some(8192), mem_pool_type: "fair".to_string(), memory_limit: None, sort_spill_reservation_bytes: None, From 9615753fdb08b626dfd6fcdb5ef642947285b085 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Tue, 22 Apr 2025 09:51:03 +0200 Subject: [PATCH 5/8] Log environment variable configuration --- datafusion/common/src/config.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e0f63d6d81ca..ad1e0e8b104c8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -864,7 +864,9 @@ impl ConfigOptions { for key in keys.0 { let env = key.to_uppercase().replace('.', "_"); if let Some(var) = std::env::var_os(env) { - ret.set(&key, var.to_string_lossy().as_ref())?; + let value 
= var.to_string_lossy(); + log::info!("Set {key} to {value} from the environment variable"); + ret.set(&key, value.as_ref())?; } } From e80010c04ca9d0a68eb0c1464ab8644ded34c48e Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Tue, 22 Apr 2025 10:05:59 +0200 Subject: [PATCH 6/8] Document benchmarking env variable config --- benchmarks/README.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 86b2e1b3b958f..061366460f751 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -83,13 +83,25 @@ To run for specific query, for example Q21 ./bench.sh run tpch10 21 ``` -## Select join algorithm +## Benchmark with modified configurations +### Select join algorithm The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm. To run TPCH benchmarks with join other than HASH: ```shell PREFER_HASH_JOIN=false ./bench.sh run tpch ``` +### Configure with environment variables +Any [datafusion options](https://datafusion.apache.org/user-guide/configs.html) that are provided as environment variables are +also considered by the benchmarks. +The following configuration runs the TPCH benchmark with datafusion configured to *not* repartition join keys. +```shell +DATAFUSION_OPTIMIZER_REPARTITION_JOINS=false ./bench.sh run tpch +``` +You might want to adjust the results location to avoid overwriting previous results. +Environment configuration that was picked up by datafusion is logged at `info` level. +To verify that datafusion picked up your configuration, run the benchmarks with `RUST_LOG=info` or higher. 
+ ## Comparing performance of main and a branch ```shell From 2a66e4a091b36a110ac2866dc9a0b12538a08a2b Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Tue, 22 Apr 2025 10:12:11 +0200 Subject: [PATCH 7/8] Add DATAFUSION_* env config to Error: unknown command: help Orchestrates running benchmarks against DataFusion checkouts Usage: ./bench.sh data [benchmark] [query] ./bench.sh run [benchmark] ./bench.sh compare ./bench.sh venv ********** Examples: ********** # Create the datasets for all benchmarks in /Users/christian/MA/datafusion/benchmarks/data ./bench.sh data # Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch ********** * Commands ********** data: Generates or downloads data needed for benchmarking run: Runs the named benchmark compare: Compares results from benchmark runs venv: Creates new venv (unless already exists) and installs compare's requirements into it ********** * Benchmarks ********** all(default): Data/Run/Compare for all benchmarks tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory cancellation: How long cancelling a query takes parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting speed sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark h2o_small: h2oai benchmark with 
small dataset (1e7 rows) for groupby, default file format is csv h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet ********** * Supported Configuration (Environment Variables) ********** DATA_DIR directory to store datasets CARGO_COMMAND command that runs the benchmark binary DATAFUSION_DIR directory to use (default /Users/christian/MA/datafusion/benchmarks/..) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by /bin/activate) DATAFUSION_* Set the given datafusion configuration --- benchmarks/bench.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 5d3ad3446ddb9..7788795c2e454 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -98,6 +98,7 @@ DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by /bin/activate) +DATAFUSION_* Set the given datafusion configuration " exit 1 } From e691aaac6cc0de45eae6873df129241c250f80af Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Tue, 22 Apr 2025 10:16:43 +0200 Subject: [PATCH 8/8] fmt --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ad1e0e8b104c8..6b885f7c0a894 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -864,7 +864,7 @@ impl ConfigOptions { for key in keys.0 { let env = key.to_uppercase().replace('.', "_"); if let Some(var) = std::env::var_os(env) { - let value = var.to_string_lossy(); + let value = var.to_string_lossy(); log::info!("Set {key} to {value} from the environment variable"); ret.set(&key, value.as_ref())?; }