From a7b9ed775b250a306fe5a35d376ec6371ab1b4ec Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sun, 1 Feb 2026 12:10:47 +0100 Subject: [PATCH] Add optional hash join buffering --- benchmarks/src/imdb/run.rs | 8 ++ benchmarks/src/tpcds/run.rs | 6 + benchmarks/src/tpch/run.rs | 8 ++ datafusion/common/src/config.rs | 15 +++ .../src/hash_join_buffering.rs | 103 ++++++++++++++++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 5 + .../sqllogictest/test_files/explain.slt | 4 + .../test_files/information_schema.slt | 2 + datafusion/sqllogictest/test_files/struct.slt | 2 +- docs/source/user-guide/configs.md | 1 + 11 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 datafusion/physical-optimizer/src/hash_join_buffering.rs diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 9ddea67148efd..29ca5249aa5b3 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -92,6 +92,10 @@ pub struct RunOpt { /// True by default. #[arg(short = 'j', long = "prefer_hash_join", default_value = "true")] prefer_hash_join: BoolDefaultTrue, + + /// How many bytes to buffer on the probe side of hash joins. + #[arg(long, default_value = "0")] + hash_join_buffering_capacity: usize, } fn map_query_id_to_str(query_id: usize) -> &'static str { @@ -306,6 +310,8 @@ impl RunOpt { .config()? 
.with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + config.options_mut().execution.hash_join_buffering_capacity = + self.hash_join_buffering_capacity; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); @@ -527,6 +533,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(map_query_id_to_str(query))?; @@ -563,6 +570,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(map_query_id_to_str(query))?; diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs index 586ee754d2114..8e24b121b2f93 100644 --- a/benchmarks/src/tpcds/run.rs +++ b/benchmarks/src/tpcds/run.rs @@ -144,6 +144,10 @@ pub struct RunOpt { /// The tables should have been created with the `--sort` option for this to have any effect. #[arg(short = 't', long = "sorted")] sorted: bool, + + /// How many bytes to buffer on the probe side of hash joins. 
+ #[arg(long, default_value = "0")] + hash_join_buffering_capacity: usize, } impl RunOpt { @@ -162,6 +166,8 @@ impl RunOpt { config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; config.options_mut().optimizer.enable_piecewise_merge_join = self.enable_piecewise_merge_join; + config.options_mut().execution.hash_join_buffering_capacity = + self.hash_join_buffering_capacity; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); // register tables diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 9706296feae61..392e02f8478b7 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -105,6 +105,10 @@ pub struct RunOpt { /// The tables should have been created with the `--sort` option for this to have any effect. #[arg(short = 't', long = "sorted")] sorted: bool, + + /// How many bytes to buffer on the probe side of hash joins. + #[arg(long, default_value = "0")] + hash_join_buffering_capacity: usize, } impl RunOpt { @@ -123,6 +127,8 @@ impl RunOpt { config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; config.options_mut().optimizer.enable_piecewise_merge_join = self.enable_piecewise_merge_join; + config.options_mut().execution.hash_join_buffering_capacity = + self.hash_join_buffering_capacity; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); // register tables @@ -392,6 +398,7 @@ mod tests { prefer_hash_join: true, enable_piecewise_merge_join: false, sorted: false, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; @@ -430,6 +437,7 @@ mod tests { prefer_hash_join: true, enable_piecewise_merge_join: false, sorted: false, + hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; diff --git a/datafusion/common/src/config.rs 
b/datafusion/common/src/config.rs index d71af206c78d5..1c249cc61ff4e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -669,6 +669,21 @@ config_namespace! { /// # Default /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + + /// How many bytes to buffer in the probe side of hash joins while the build side is + /// concurrently being built. + /// + /// Without this, hash joins will wait until the full materialization of the build side + /// before polling the probe side. This is useful in scenarios where the query is not + /// completely CPU bounded, allowing to do some early work concurrently and reducing the + /// latency of the query. + /// + /// Note that when hash join buffering is enabled, the probe side will start eagerly + /// polling data, not giving time for the producer side of dynamic filters to produce any + /// meaningful predicate. Queries with dynamic filters might see performance degradation. + /// + /// Disabled by default, set to a number greater than 0 for enabling it. + pub hash_join_buffering_capacity: usize, default = 0 } } diff --git a/datafusion/physical-optimizer/src/hash_join_buffering.rs b/datafusion/physical-optimizer/src/hash_join_buffering.rs new file mode 100644 index 0000000000000..3c29b46c0fa64 --- /dev/null +++ b/datafusion/physical-optimizer/src/hash_join_buffering.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PhysicalOptimizerRule; +use datafusion_common::JoinSide; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::buffer::BufferExec; +use datafusion_physical_plan::joins::HashJoinExec; +use std::sync::Arc; + +/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] node with the +/// configured capacity in the probe side: +/// +/// ```text +/// ┌───────────────────┐ +/// │ HashJoinExec │ +/// └─────▲────────▲────┘ +/// ┌───────┘ └─────────┐ +/// │ │ +/// ┌────────────────┐ ┌─────────────────┐ +/// │ Build side │ + │ BufferExec │ +/// └────────────────┘ └────────▲────────┘ +/// │ +/// ┌────────┴────────┐ +/// │ Probe side │ +/// └─────────────────┘ +/// ``` +/// +/// Which allows eagerly pulling it even before the build side has completely finished. 
+#[derive(Debug, Default)] +pub struct HashJoinBuffering {} + +impl HashJoinBuffering { + pub fn new() -> Self { + Self::default() + } +} + +impl PhysicalOptimizerRule for HashJoinBuffering { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> datafusion_common::Result> { + let capacity = config.execution.hash_join_buffering_capacity; + if capacity == 0 { + return Ok(plan); + } + + plan.transform_down(|plan| { + let Some(node) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + let plan = Arc::clone(&plan); + Ok(Transformed::yes( + if HashJoinExec::probe_side() == JoinSide::Left { + // Do not stack BufferExec nodes together. + if node.left.as_any().downcast_ref::().is_some() { + return Ok(Transformed::no(plan)); + } + plan.with_new_children(vec![ + Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)), + Arc::clone(&node.right), + ])? + } else { + // Do not stack BufferExec nodes together. + if node.right.as_any().downcast_ref::().is_some() { + return Ok(Transformed::no(plan)); + } + plan.with_new_children(vec![ + Arc::clone(&node.left), + Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)), + ])? 
+ }, + )) + }) + .data() + } + + fn name(&self) -> &str { + "HashJoinBuffering" + } + + fn schema_check(&self) -> bool { + true + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 3a0d79ae2d234..98331a94e31c6 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -39,6 +39,7 @@ pub mod optimizer; pub mod output_requirements; pub mod projection_pushdown; pub use datafusion_pruning as pruning; +pub mod hash_join_buffering; pub mod pushdown_sort; pub mod sanity_checker; pub mod topk_aggregation; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 49225db03ac48..f31a1f981f876 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan; use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; +use crate::hash_join_buffering::HashJoinBuffering; use crate::limit_pushdown_past_window::LimitPushPastWindows; use crate::pushdown_sort::PushdownSort; use datafusion_common::Result; @@ -137,6 +138,10 @@ impl PhysicalOptimizer { // This can possibly be combined with [LimitPushdown] // It needs to come after [EnforceSorting] Arc::new(LimitPushPastWindows::new()), + // The HashJoinBuffering rule adds a BufferExec node with the configured capacity + // in the probe side of hash joins. That way, the probe side gets eagerly polled before + // the build side is completely finished. + Arc::new(HashJoinBuffering::new()), // The LimitPushdown rule tries to push limits down as far as possible, // replacing operators with fetching variants, or adding limits // past operators that support limit pushdown.
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index c5907d497500e..6b50a1d1cbcd4 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -243,6 +243,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -323,6 +324,7 @@ physical_plan after OutputRequirements 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, 
date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -367,6 +369,7 @@ physical_plan after OutputRequirements 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE @@ -608,6 +611,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE +physical_plan after HashJoinBuffering SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE 
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after PushdownSort SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b61ceecb24fc0..18f6db9cdcf47 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true datafusion.execution.enable_ansi_mode false datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false +datafusion.execution.hash_join_buffering_capacity 0 datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true @@ -358,6 +359,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). 
- Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. +datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). 
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index e20815a58c765..53a1bb4ec6751 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -1666,4 +1666,4 @@ order by id; 3 2 150 statement ok -drop table t_agg_window; \ No newline at end of file +drop table t_agg_window; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e48f0a7c92276..95bdb0da35332 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -133,6 +133,7 @@ The following configuration settings are available: | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. 
This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | +| datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
| | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |