diff --git a/benchmarks/README.md b/benchmarks/README.md index 0399f8a7ffe2b..8fed85fa02b80 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -760,6 +760,20 @@ Different queries are included to test nested loop joins under various workloads ./bench.sh run nlj ``` +## Hash Join + +This benchmark focuses on the performance of queries with hash joins, minimizing other overheads such as scanning data sources or evaluating predicates. + +Several queries are included to test hash joins under various workloads. + +### Example Run + +```bash +# No need to generate data: this benchmark uses table function `range()` as the data source + +./bench.sh run hj +``` + ## Cancellation Test performance of cancelling queries. diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 56af6b458c34a..dbfd319dd9ad4 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -125,8 +125,10 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver # Micro-Benchmarks (specific operators and features) cancellation: How long cancelling a query takes nlj: Benchmark for simple nested loop joins, testing various join scenarios +hj: Benchmark for simple hash joins, testing various join scenarios compile_profile: Compile and execute TPC-H across selected Cargo profiles, reporting timing and binary size + ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Supported Configuration (Environment Variables) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @@ -305,6 +307,10 @@ main() { # nlj uses range() function, no data generation needed echo "NLJ benchmark does not require data generation" ;; + hj) + # hj uses range() function, no data generation needed + echo "HJ benchmark does not require data generation" + ;; compile_profile) data_tpch "1" ;; @@ -377,6 +383,7 @@ main() { run_imdb run_external_aggr run_nlj + run_hj ;; tpch) run_tpch "1" "parquet" @@ -484,6 +491,9 @@ main() { nlj) run_nlj ;; + hj) + run_hj + ;; compile_profile) run_compile_profile 
"${PROFILE_ARGS[@]}" ;; @@ -1136,6 +1146,14 @@ run_nlj() { debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} } +# Runs the hj benchmark +run_hj() { + RESULTS_FILE="${RESULTS_DIR}/hj.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running hj benchmark..." + debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 88378492b7267..816cae0e38555 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,9 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch}; +use datafusion_benchmarks::{ + cancellation, clickbench, h2o, hj, imdb, nlj, sort_tpch, tpch, +}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -41,6 +43,7 @@ enum Options { Cancellation(cancellation::RunOpt), Clickbench(clickbench::RunOpt), H2o(h2o::RunOpt), + HJ(hj::RunOpt), Imdb(imdb::RunOpt), Nlj(nlj::RunOpt), SortTpch(sort_tpch::RunOpt), @@ -57,6 +60,7 @@ pub async fn main() -> Result<()> { Options::Cancellation(opt) => opt.run().await, Options::Clickbench(opt) => opt.run().await, Options::H2o(opt) => opt.run().await, + Options::HJ(opt) => opt.run().await, Options::Imdb(opt) => Box::pin(opt.run()).await, Options::Nlj(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs new file mode 100644 index 0000000000000..505b322745485 --- /dev/null +++ b/benchmarks/src/hj.rs @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; +use datafusion::physical_plan::execute_stream; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::instant::Instant; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError}; +use structopt::StructOpt; + +use futures::StreamExt; + +// TODO: Add existence joins + +/// Run the Hash Join benchmark +/// +/// This micro-benchmark focuses on the performance characteristics of Hash Joins. +/// It uses simple equality predicates to ensure a hash join is selected. +/// Where we vary selectivity, we do so with additional cheap predicates that +/// do not change the join key (so the physical operator remains HashJoin). +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number (between 1 and 12). If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Common options (iterations, batch size, target_partitions, etc.) 
+ #[structopt(flatten)] + common: CommonOpt, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, +} + +/// Inline SQL queries for Hash Join benchmarks +/// +/// Each query's comment includes: +/// - Left row count × Right row count +/// - Join predicate selectivity (approximate output fraction). +/// - Q11 and Q12 selectivity is relative to cartesian product while the others are +/// relative to probe side. +const HASH_QUERIES: &[&str] = &[ + // Q1: INNER 10 x 10K | LOW ~0.1% + // equality on key + cheap filter to downselect + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 9000, 1000) AS t1(value) + JOIN range(10000) AS t2 + ON t1.value = t2.value; + "#, + // Q2: INNER 10 x 10K | LOW ~0.1% + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 9000, 1000) AS t1 + JOIN range(10000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 5 = 0 + "#, + // Q3: INNER 10K x 10K | HIGH ~90% + r#" + SELECT t1.value, t2.value + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 10 <> 0 + "#, + // Q4: INNER 30 x 30K | LOW ~0.1% + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 29000, 1000) AS t1 + JOIN range(30000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 5 = 0 + "#, + // Q5: INNER 10 x 200K | VERY LOW ~0.005% (small to large) + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 9000, 1000) AS t1 + JOIN range(200000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 1000 = 0 + "#, + // Q6: INNER 200K x 10 | VERY LOW ~0.005% (large to small) + r#" + SELECT t1.value, t2.value + FROM range(200000) AS t1 + JOIN generate_series(0, 9000, 1000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 1000 = 0 + "#, + // Q7: RIGHT OUTER 10 x 200K | LOW ~0.1% + // Outer join still uses HashJoin for equi-keys; the extra filter reduces matches + r#" + SELECT t1.value AS l, t2.value AS r + FROM generate_series(0, 9000, 1000) AS t1 + 
RIGHT JOIN range(200000) AS t2 + ON t1.value = t2.value + WHERE t2.value % 1000 = 0 + "#, + // Q8: LEFT OUTER 200K x 10 | LOW ~0.1% + r#" + SELECT t1.value AS l, t2.value AS r + FROM range(200000) AS t1 + LEFT JOIN generate_series(0, 9000, 1000) AS t2 + ON t1.value = t2.value + WHERE t1.value % 1000 = 0 + "#, + // Q9: FULL OUTER 30 x 30K | LOW ~0.1% + r#" + SELECT t1.value AS l, t2.value AS r + FROM generate_series(0, 29000, 1000) AS t1 + FULL JOIN range(30000) AS t2 + ON t1.value = t2.value + WHERE COALESCE(t1.value, t2.value) % 1000 = 0 + "#, + // Q10: FULL OUTER 30 x 30K | HIGH ~90% + r#" + SELECT t1.value AS l, t2.value AS r + FROM generate_series(0, 29000, 1000) AS t1 + FULL JOIN range(30000) AS t2 + ON t1.value = t2.value + WHERE COALESCE(t1.value, t2.value) % 10 <> 0 + "#, + // Q11: INNER 30 x 30K | MEDIUM ~50% | cheap predicate on parity + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 29000, 1000) AS t1 + INNER JOIN range(30000) AS t2 + ON (t1.value % 2) = (t2.value % 2) + "#, + // Q12: FULL OUTER 30 x 30K | MEDIUM ~50% | expression key + r#" + SELECT t1.value AS l, t2.value AS r + FROM generate_series(0, 29000, 1000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value % 2) = (t2.value % 2) + "#, + // Q13: INNER 30 x 30K | LOW 0.1% | modulo with adding values + r#" + SELECT t1.value, t2.value + FROM generate_series(0, 29000, 1000) AS t1 + INNER JOIN range(30000) AS t2 + ON (t1.value = t2.value) AND ((t1.value + t2.value) % 10 < 1) + "#, + // Q14: FULL OUTER 30 x 30K | ALL ~100% | modulo + r#" + SELECT t1.value AS l, t2.value AS r + FROM generate_series(0, 29000, 1000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value = t2.value) AND ((t1.value + t2.value) % 10 = 0) + "#, +]; + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running Hash Join benchmarks with the following options: {self:#?}\n"); + + let query_range = match self.query { + Some(query_id) => { + if query_id >= 1 && query_id <= HASH_QUERIES.len() { + 
query_id..=query_id + } else { + return exec_err!( + "Query {query_id} not found. Available queries: 1 to {}", + HASH_QUERIES.len() + ); + } + } + None => 1..=HASH_QUERIES.len(), + }; + + let config = self.common.config()?; + let rt_builder = self.common.runtime_env_builder()?; + let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + + let mut benchmark_run = BenchmarkRun::new(); + + for query_id in query_range { + let query_index = query_id - 1; + let sql = HASH_QUERIES[query_index]; + + benchmark_run.start_new_case(&format!("Query {query_id}")); + let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await; + match query_run { + Ok(query_results) => { + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + Err(e) => { + return Err(DataFusionError::Context( + format!("Hash Join benchmark Q{query_id} failed with error:"), + Box::new(e), + )); + } + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + Ok(()) + } + + /// Validates that the physical plan uses a HashJoin, then executes. + async fn benchmark_query( + &self, + sql: &str, + query_name: &str, + ctx: &SessionContext, + ) -> Result> { + let mut query_results = vec![]; + + // Build/validate plan + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let plan_string = format!("{physical_plan:#?}"); + + if !plan_string.contains("HashJoinExec") { + return Err(exec_datafusion_err!( + "Query {query_name} does not use Hash Join. 
Physical plan: {plan_string}" + )); + } + + // Execute without buffering + for i in 0..self.common.iterations { + let start = Instant::now(); + let row_count = Self::execute_sql_without_result_buffering(sql, ctx).await?; + let elapsed = start.elapsed(); + + println!( + "Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}" + ); + + query_results.push(QueryResult { elapsed, row_count }); + } + + Ok(query_results) + } + + /// Executes the SQL query and drops each batch to avoid result buffering. + async fn execute_sql_without_result_buffering( + sql: &str, + ctx: &SessionContext, + ) -> Result { + let mut row_count = 0; + + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let mut stream = execute_stream(physical_plan, ctx.task_ctx())?; + + while let Some(batch) = stream.next().await { + row_count += batch?.num_rows(); + // Drop batches immediately to minimize memory pressure + } + + Ok(row_count) + } +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 5d982fad6f77f..07cffa5ae468e 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -19,6 +19,7 @@ pub mod cancellation; pub mod clickbench; pub mod h2o; +pub mod hj; pub mod imdb; pub mod nlj; pub mod sort_tpch;