From 1c773e1622b0a8e90dbf1543d81e14375e232707 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 21 Apr 2026 01:00:43 -0700 Subject: [PATCH 01/12] roaring_bitmap_for_blazingly_fast_semi_joins_int32_join_key --- Cargo.lock | 17 ++++ Cargo.toml | 1 + datafusion/physical-plan/Cargo.toml | 5 + .../benches/roaring_semi_join.rs | 43 ++++++++ .../physical-plan/src/joins/hash_join/exec.rs | 97 ++++++++++++++++--- .../joins/hash_join/partitioned_hash_eval.rs | 7 +- .../src/joins/hash_join/stream.rs | 47 ++++++++- datafusion/physical-plan/src/joins/mod.rs | 4 + 8 files changed, 205 insertions(+), 16 deletions(-) create mode 100644 datafusion/physical-plan/benches/roaring_semi_join.rs diff --git a/Cargo.lock b/Cargo.lock index ca2554e99d88..29a3d6eeb02c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1186,6 +1186,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + [[package]] name = "byteorder" version = "1.5.0" @@ -2501,6 +2507,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "rand 0.9.2", + "roaring", "rstest", "rstest_reuse", "tokio", @@ -5253,6 +5260,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "roaring" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885" +dependencies = [ + "bytemuck", + "byteorder", +] + [[package]] name = "rstest" version = "0.26.1" diff --git a/Cargo.toml b/Cargo.toml index 8d90a11858a4..9d33c0f15dba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,7 @@ prost = "0.14.1" rand = "0.9" recursive = "0.1.1" regex = "1.12" +roaring = "0.11.3" rstest = "0.26.1" serde_json = "1" sha2 = "^0.10.9" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 374fc275a06e..28705a06823f 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -70,6 +70,7 @@ log = { workspace = true } num-traits = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" +roaring = { workspace = true } tokio = { workspace = true } [dev-dependencies] @@ -108,3 +109,7 @@ required-features = ["test_utils"] harness = false name = "aggregate_vectorized" required-features = ["test_utils"] + +[[bench]] +harness = false +name = "roaring_semi_join" diff --git a/datafusion/physical-plan/benches/roaring_semi_join.rs b/datafusion/physical-plan/benches/roaring_semi_join.rs new file mode 100644 index 000000000000..e4a7d2dae0ad --- /dev/null +++ b/datafusion/physical-plan/benches/roaring_semi_join.rs @@ -0,0 +1,43 @@ +use criterion::{Criterion, black_box, criterion_group, criterion_main}; +use roaring::RoaringBitmap; +use std::collections::HashSet; + +fn bench_semi_join(c: &mut Criterion) { + let build_size = 100_000u32; + let probe_size = 1_000_000; + + let build_keys: Vec = (0..build_size).collect(); + let probe_keys: Vec = (0..probe_size) + .map(|i| (i as u32 * 7) % (build_size * 2)) + .collect(); + + let hashset: HashSet = build_keys.iter().copied().collect(); + let roaring: RoaringBitmap = build_keys.iter().copied().collect(); + + c.bench_function("hashset_probe_100k", |b| { + b.iter(|| { + let mut hits = 0u64; + for &key in &probe_keys { + if hashset.contains(&key) { + hits += 1; + } + } + black_box(hits) + }) + }); + + c.bench_function("roaring_probe_100k", |b| { + b.iter(|| { + let mut hits = 0u64; + for &key in &probe_keys { + if roaring.contains(key) { + hits += 1; + } + } + black_box(hits) + }) + }); +} + +criterion_group!(benches, bench_semi_join); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 735375441f54..05b693f8c36a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, Int16Array, Int32Array, UInt32Array}; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -89,19 +89,19 @@ use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, l use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use super::partitioned_hash_eval::SeededRandomState; use datafusion_common::hash_utils::RandomState; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; -use super::partitioned_hash_eval::SeededRandomState; - /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. pub(crate) const HASH_JOIN_SEED: SeededRandomState = SeededRandomState::with_seed(12210250226015887276); const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; +const ROARING_MAP_CREATED_COUNT_METRIC_NAME: &str = "roaring_map_created_count"; #[expect(clippy::too_many_arguments)] fn try_create_array_map( @@ -1316,6 +1316,10 @@ impl ExecutionPlan for HashJoinExec { .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + let roaring_map_created_count = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) + .counter(ROARING_MAP_CREATED_COUNT_METRIC_NAME, partition); + let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -1335,6 +1339,9 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + roaring_map_created_count, + self.join_type, + self.filter.is_some(), )) })?, PartitionMode::Partitioned => { @@ -1356,6 +1363,9 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + roaring_map_created_count, + self.join_type, + self.filter.is_some(), )) } PartitionMode::Auto => { @@ -1855,6 +1865,31 @@ fn should_collect_min_max_for_perfect_hash( Ok(ArrayMap::is_supported_type(&data_type)) } +fn should_use_roaring_bitmap( + join_type: &JoinType, + on_left: &[PhysicalExprRef], + schema: &Schema, + has_filter: bool, +) -> bool { + if has_filter { + return false; + } + if !matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) { + return false; + } + if on_left.len() != 1 { + return false; + } + let left_key = &on_left[0]; + if !matches!( + left_key.data_type(schema), + Ok(DataType::Int32) | Ok(DataType::UInt32) + ) { + return false; + } + true +} + /// Collects all batches from the left (build) side stream and creates a hash map for joining. /// /// This function is responsible for: @@ -1896,6 +1931,9 @@ async fn collect_left_input( config: Arc, null_equality: NullEquality, array_map_created_count: Count, + roaring_map_created_count: Count, + join_type: JoinType, + has_filter: bool, ) -> Result { let schema = left_stream.schema(); @@ -1957,7 +1995,34 @@ async fn collect_left_input( }; let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( + if config.execution.perfect_hash_join_small_build_threshold > 0 + && should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) + { + let batch = concat_batches(&schema, &batches)?; + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + let key_col = &left_values[0]; + let bitmap = match key_col.data_type() { + DataType::UInt32 => key_col + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .map(|&v| v as u32) + .collect(), + DataType::Int32 => key_col + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .map(|&v| v as u32) + .collect(), + _ => return internal_err!("unsupported data type to build bitmap"), + }; + roaring_map_created_count.add(1); + (Map::RoaringMap(bitmap), batch, left_values) + } else if let Some((array_map, batch, left_value)) = try_create_array_map( &bounds, &schema, &batches, @@ -2090,21 +2155,25 @@ mod tests { use super::*; fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) { + let array_map_count = metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0); + let roaring_map_count = metrics + .sum_by_name(ROARING_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0); + if use_phj { assert!( - metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) - .expect("should have array_map_created_count metrics") - .as_usize() - >= 1 + array_map_count >= 1 || roaring_map_count >= 1, + "Expected either array_map or roaring_map to be created" ); } else { assert_eq!( - metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) - .map(|v| v.as_usize()) - .unwrap_or(0), - 0 + array_map_count + roaring_map_count, + 0, + "Expected no array_map or roaring_map to be created" ) } } diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 0daac0bb86a7..353c67af3802 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -24,9 +24,9 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; use datafusion_common::hash_utils::{create_hashes, with_hashes}; +use datafusion_common::{Result, internal_err}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ DynHash, PhysicalExpr, PhysicalExprRef, @@ -335,6 +335,11 @@ impl PhysicalExpr for HashTableLookupExpr { let array = map.contain_keys(&join_keys)?; Ok(ColumnarValue::Array(Arc::new(array))) } + Map::RoaringMap(_bimap) => { + internal_err!( + "Roaringbitmap is not support for partitioned hash evaluation" + ) + } } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f4..7a89a163831a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -46,9 +46,10 @@ use crate::{ }, }; -use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; +use arrow::array::{Array, ArrayRef, Int32Array, UInt32Array, UInt64Array}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use arrow_schema::DataType; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, internal_datafusion_err, internal_err, }; @@ -711,6 +712,50 @@ impl HashJoinStream { next_offset, ) } + Map::RoaringMap(bitmap) => { + let key_col = &state.values[0]; + let is_semi = matches!(self.join_type, JoinType::RightSemi); + let right_indices = match key_col.data_type() { + DataType::Int32 => { + let arr = key_col.as_any().downcast_ref::().unwrap(); + arr.values() + .iter() + .enumerate() + .filter_map(|(i, v)| { + let contains = bitmap.contains(*v as u32); + let emit = if is_semi { contains } else { !contains }; + emit.then_some(i as u32) + }) + .collect::>() + } + DataType::UInt32 => { + let arr = key_col.as_any().downcast_ref::().unwrap(); + arr.values() + .iter() + .enumerate() + .filter_map(|(i, v)| { + let contains = bitmap.contains(*v); + let emit = if is_semi { contains } else { !contains }; + emit.then_some(i as u32) + }) + .collect::>() + } + _ => { + return internal_err!("unsupported data type for roaring bitmap"); + } + }; + let indices = UInt32Array::from(right_indices); + let columns: Vec = state.batch + .columns() + .iter() + .map(|col| arrow::compute::take(col, &indices, None)) + .collect::, _>>()?; + + let batch = RecordBatch::try_new(self.schema.clone(), columns)?; + self.output_buffer.push_batch(batch)?; + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } }; let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices); diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 2cdfa1e6ac02..ab26b347b5ea 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -25,6 +25,7 @@ pub use hash_join::{ }; pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder}; use parking_lot::Mutex; +use roaring::RoaringBitmap; // Note: SortMergeJoin is not used in plans yet pub use piecewise_merge_join::PiecewiseMergeJoinExec; pub use sort_merge_join::SortMergeJoinExec; @@ -53,6 +54,8 @@ use utils::JoinHashMapType; pub enum Map { HashMap(Box), ArrayMap(ArrayMap), + // optimized path for single int join keys + RoaringMap(RoaringBitmap), } impl Map { @@ -61,6 +64,7 @@ impl Map { match self { Map::HashMap(map) => map.len(), Map::ArrayMap(array_map) => array_map.num_of_distinct_key(), + Map::RoaringMap(bitmap) => bitmap.len() as usize, } } From 40bea15899ef80408702ff741234c5db7fd57b43 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 16:52:12 -0700 Subject: [PATCH 02/12] rebase_main --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 ++-- datafusion/physical-plan/src/joins/hash_join/stream.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 05b693f8c36a..dd31a7735e08 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, Int16Array, Int32Array, UInt32Array}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, Int32Array, UInt32Array}; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -2008,7 +2008,7 @@ async fn collect_left_input( .unwrap() .values() .iter() - .map(|&v| v as u32) + .map(|&v| v) .collect(), DataType::Int32 => key_col .as_any() diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 7a89a163831a..38c926f14c59 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -745,7 +745,8 @@ impl HashJoinStream { } }; let indices = UInt32Array::from(right_indices); - let columns: Vec = state.batch + let columns: Vec = state + .batch .columns() .iter() .map(|col| arrow::compute::take(col, &indices, None)) From 5a75ee084e0460d0b1bef72546a4a4e027d1ca90 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 16:53:50 -0700 Subject: [PATCH 03/12] rebase_main --- datafusion/physical-plan/Cargo.toml | 4 -- .../benches/roaring_semi_join.rs | 43 ------------------- 2 files changed, 47 deletions(-) delete mode 100644 datafusion/physical-plan/benches/roaring_semi_join.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 28705a06823f..d87550b412c4 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -109,7 +109,3 @@ required-features = ["test_utils"] harness = false name = "aggregate_vectorized" required-features = ["test_utils"] - -[[bench]] -harness = false -name = "roaring_semi_join" diff --git a/datafusion/physical-plan/benches/roaring_semi_join.rs b/datafusion/physical-plan/benches/roaring_semi_join.rs deleted file mode 100644 index e4a7d2dae0ad..000000000000 --- a/datafusion/physical-plan/benches/roaring_semi_join.rs +++ /dev/null @@ -1,43 +0,0 @@ -use criterion::{Criterion, black_box, criterion_group, criterion_main}; -use roaring::RoaringBitmap; -use std::collections::HashSet; - -fn bench_semi_join(c: &mut Criterion) { - let build_size = 100_000u32; - let probe_size = 1_000_000; - - let build_keys: Vec = (0..build_size).collect(); - let probe_keys: Vec = (0..probe_size) - .map(|i| (i as u32 * 7) % (build_size * 2)) - .collect(); - - let hashset: HashSet = build_keys.iter().copied().collect(); - let roaring: RoaringBitmap = build_keys.iter().copied().collect(); - - c.bench_function("hashset_probe_100k", |b| { - b.iter(|| { - let mut hits = 0u64; - for &key in &probe_keys { - if hashset.contains(&key) { - hits += 1; - } - } - black_box(hits) - }) - }); - - c.bench_function("roaring_probe_100k", |b| { - b.iter(|| { - let mut hits = 0u64; - for &key in &probe_keys { - if roaring.contains(key) { - hits += 1; - } - } - black_box(hits) - }) - }); -} - -criterion_group!(benches, bench_semi_join); -criterion_main!(benches); From cc6607d73e5dca0308b617fdb91f9b254680f202 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 16:59:20 -0700 Subject: [PATCH 04/12] rebase_main --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 695881c509f0..7bc949671f4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2536,8 +2536,8 @@ dependencies = [ "num-traits", "parking_lot", "pin-project-lite", - "roaring", "rand 0.9.4", + "roaring", "rstest", "rstest_reuse", "tokio", From 86c0f304af9816f04d4be51fa40375d4b1eb9527 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 17:19:28 -0700 Subject: [PATCH 05/12] null_handling --- .../physical-plan/src/joins/hash_join/exec.rs | 11 ++--- .../src/joins/hash_join/stream.rs | 41 +++++++------------ 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 5d260716ded2..f312e9bad7ea 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1993,9 +1993,7 @@ async fn collect_left_input( }; let (join_hash_map, batch, left_values) = - if config.execution.perfect_hash_join_small_build_threshold > 0 - && should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) - { + if should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) { let batch = concat_batches(&schema, &batches)?; let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; let key_col = &left_values[0]; @@ -2004,17 +2002,16 @@ async fn collect_left_input( .as_any() .downcast_ref::() .unwrap() - .values() .iter() - .map(|&v| v) + .flatten() .collect(), DataType::Int32 => key_col .as_any() .downcast_ref::() .unwrap() - .values() .iter() - .map(|&v| v as u32) + .flatten() + .map(|v| v as u32) .collect(), _ => return internal_err!("unsupported data type to build bitmap"), }; diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 280046a11b9c..0d4d1dbe5899 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -46,7 +46,8 @@ use crate::{ }, }; -use arrow::array::{Array, ArrayRef, Int32Array, UInt32Array, UInt64Array}; +use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}; +use arrow::compute::filter_record_batch; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::DataType; @@ -727,44 +728,30 @@ impl HashJoinStream { Map::RoaringMap(bitmap) => { let key_col = &state.values[0]; let is_semi = matches!(self.join_type, JoinType::RightSemi); - let right_indices = match key_col.data_type() { + let mask: BooleanArray = match key_col.data_type() { DataType::Int32 => { let arr = key_col.as_any().downcast_ref::().unwrap(); - arr.values() - .iter() - .enumerate() - .filter_map(|(i, v)| { - let contains = bitmap.contains(*v as u32); - let emit = if is_semi { contains } else { !contains }; - emit.then_some(i as u32) + arr.iter() + .map(|v| match v { + Some(v) => bitmap.contains(v as u32) == is_semi, + None => !is_semi, }) - .collect::>() + .collect() } DataType::UInt32 => { let arr = key_col.as_any().downcast_ref::().unwrap(); - arr.values() - .iter() - .enumerate() - .filter_map(|(i, v)| { - let contains = bitmap.contains(*v); - let emit = if is_semi { contains } else { !contains }; - emit.then_some(i as u32) + arr.iter() + .map(|v| match v { + Some(v) => bitmap.contains(v) == is_semi, + None => !is_semi, }) - .collect::>() + .collect() } _ => { return internal_err!("unsupported data type for roaring bitmap"); } }; - let indices = UInt32Array::from(right_indices); - let columns: Vec = state - .batch - .columns() - .iter() - .map(|col| arrow::compute::take(col, &indices, None)) - .collect::, _>>()?; - - let batch = RecordBatch::try_new(self.schema.clone(), columns)?; + let batch = filter_record_batch(&state.batch, &mask)?; self.output_buffer.push_batch(batch)?; self.state = HashJoinStreamState::FetchProbeBatch; return Ok(StatefulStreamResult::Continue); From d1094170f678fe7107cb0d4fe8369bc12e98afa9 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 17:39:14 -0700 Subject: [PATCH 06/12] null_handling --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index f312e9bad7ea..297821b4f053 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1993,7 +1993,9 @@ async fn collect_left_input( }; let (join_hash_map, batch, left_values) = - if should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) { + if config.execution.perfect_hash_join_small_build_threshold > 0 + && should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) + { let batch = concat_batches(&schema, &batches)?; let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; let key_col = &left_values[0]; From 31f2afc3044ef252862a1a92d320902626bfe508 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 17:55:03 -0700 Subject: [PATCH 07/12] update_slt_to_reflect_roaring_bitmaps --- datafusion/sqllogictest/test_files/joins.slt | 2 +- .../test_files/push_down_filter_parquet.slt | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index e0be63fe7152..c49d0df70676 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5148,7 +5148,7 @@ LEFT ANTI JOIN ( ) t2 ON t1.k = t2.k; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, roaring_map_created_count=0, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] 02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, expr_0_eval_time=] 03)----FilterExec: column1@0 != 1, metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, selectivity=0% (0/1)] 04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 8469c32a1703..0b902c24202d 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -387,7 +387,7 @@ FROM join_probe p INNER JOIN join_build AS build ON p.a = build.a AND p.b = build.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -472,8 +472,8 @@ INNER JOIN nested_t2 ON nested_t1.a = nested_t2.b INNER JOIN nested_t3 ON nested_t2.c = nested_t3.d; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.23% (144/790)] 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] 05)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] @@ -604,7 +604,7 @@ LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[e@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[e@0 < bb], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=6.7% (70/1.04 K)] 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] @@ -738,7 +738,7 @@ INNER JOIN ( ) agg ON b.a = agg.a; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=15.32% (70/457)] 03)--ProjectionExec: expr=[a@0 as a, min(join_agg_probe.value)@1 as min_value], metrics=[output_rows=2, output_batches=2] 04)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] @@ -805,7 +805,7 @@ FROM nulls_build INNER JOIN nulls_probe ON nulls_build.a = nulls_probe.a AND nulls_build.b = nulls_probe.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, roaring_map_created_count=0, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.6% (144/774)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] @@ -871,7 +871,7 @@ FROM lj_build LEFT JOIN lj_probe ON lj_build.a = lj_probe.a AND lj_build.b = lj_probe.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -887,7 +887,7 @@ WHERE EXISTS ( ); ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] @@ -957,7 +957,7 @@ FROM hl_probe p INNER JOIN hl_build AS build ON p.a = build.a AND p.b = build.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -1006,7 +1006,7 @@ FROM int_build b INNER JOIN int_probe p ON b.id1 = p.id1 AND b.id2 = p.id2; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=19.02% (222/1.17 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)] From a470f37e6418c6ac4efe75eeb07434c6398d8719 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 18:09:33 -0700 Subject: [PATCH 08/12] fix_tests --- .../physical-plan/src/joins/hash_join/stream.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 0d4d1dbe5899..f82f40eb5f36 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -49,7 +49,7 @@ use crate::{ use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}; use arrow::compute::filter_record_batch; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use arrow_schema::DataType; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, internal_datafusion_err, internal_err, @@ -751,7 +751,19 @@ impl HashJoinStream { return internal_err!("unsupported data type for roaring bitmap"); } }; - let batch = filter_record_batch(&state.batch, &mask)?; + let filtered_batch = filter_record_batch(&state.batch, &mask)?; + // Apply column projection to match output schema + let columns: Vec = self + .column_indices + .iter() + .map(|col_idx| { + // For RightSemi/RightAnti, all columns are from the right (probe) side + Arc::clone(filtered_batch.column(col_idx.index)) + }) + .collect(); + // Use try_new_with_options to handle empty columns case (e.g., count(*)) + let options = RecordBatchOptions::new().with_row_count(Some(filtered_batch.num_rows())); + let batch = RecordBatch::try_new_with_options(Arc::clone(&self.schema), columns, &options)?; self.output_buffer.push_batch(batch)?; self.state = HashJoinStreamState::FetchProbeBatch; return Ok(StatefulStreamResult::Continue); From fb3f2143d904525e37fd5db2d1022e492833bcb8 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 18:14:31 -0700 Subject: [PATCH 09/12] fix_tests --- datafusion/physical-plan/src/joins/hash_join/stream.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index f82f40eb5f36..5b8a6a8fb1f6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -762,8 +762,13 @@ impl HashJoinStream { }) .collect(); // Use try_new_with_options to handle empty columns case (e.g., count(*)) - let options = RecordBatchOptions::new().with_row_count(Some(filtered_batch.num_rows())); - let batch = RecordBatch::try_new_with_options(Arc::clone(&self.schema), columns, &options)?; + let options = RecordBatchOptions::new() + .with_row_count(Some(filtered_batch.num_rows())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&self.schema), + columns, + &options, + )?; self.output_buffer.push_batch(batch)?; self.state = HashJoinStreamState::FetchProbeBatch; return Ok(StatefulStreamResult::Continue); From 698dcbb42c72f9989ff9c7c434e6a42dca530732 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 21:53:01 -0700 Subject: [PATCH 10/12] support_partition_eval_mode --- .../joins/hash_join/partitioned_hash_eval.rs | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 353c67af3802..43409f95868e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -20,7 +20,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc}; use arrow::{ - array::{ArrayRef, UInt64Array}, + array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -335,10 +335,43 @@ impl PhysicalExpr for HashTableLookupExpr { let array = map.contain_keys(&join_keys)?; Ok(ColumnarValue::Array(Arc::new(array))) } - Map::RoaringMap(_bimap) => { - internal_err!( - "Roaringbitmap is not support for partitioned hash evaluation" - ) + Map::RoaringMap(bitmap) => { + // For roaring bitmap, check membership directly + // Roaring only supports u32 values, so we handle Int32 and UInt32 types + if join_keys.len() != 1 { + return internal_err!( + "Roaring bitmap expects 1 join key, but got {}", + join_keys.len() + ); + } + let key_col = &join_keys[0]; + let contains: BooleanArray = match key_col.data_type() { + DataType::Int32 => { + let arr = key_col + .as_any() + .downcast_ref::() + .expect("Expected Int32Array"); + arr.iter() + .map(|v| v.map(|val| bitmap.contains(val as u32))) + .collect() + } + DataType::UInt32 => { + let arr = key_col + .as_any() + .downcast_ref::() + .expect("Expected UInt32Array"); + arr.iter() + .map(|v| v.map(|val| bitmap.contains(val))) + .collect() + } + other => { + return internal_err!( + "Unsupported data type for roaring bitmap: {:?}", + other + ); + } + }; + Ok(ColumnarValue::Array(Arc::new(contains))) } } } From 16c82bb5ba553c63c9c18a55790da31bea8babca Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 22:34:20 -0700 Subject: [PATCH 11/12] add_bool_buffers_memebership_test --- .../joins/hash_join/partitioned_hash_eval.rs | 21 ++++++++++----- .../src/joins/hash_join/stream.rs | 27 ++++++++++--------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 43409f95868e..532c66592dbd 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -21,6 +21,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc}; use arrow::{ array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}, + buffer::BooleanBuffer, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -351,18 +352,26 @@ impl PhysicalExpr for HashTableLookupExpr { .as_any() .downcast_ref::() .expect("Expected Int32Array"); - arr.iter() - .map(|v| v.map(|val| bitmap.contains(val as u32))) - .collect() + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return false; + } + bitmap.contains(arr.value(i) as u32) + }); + BooleanArray::new(buffer, None) } DataType::UInt32 => { let arr = key_col .as_any() .downcast_ref::() .expect("Expected UInt32Array"); - arr.iter() - .map(|v| v.map(|val| bitmap.contains(val))) - .collect() + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return false; + } + bitmap.contains(arr.value(i)) + }); + BooleanArray::new(buffer, None) } other => { return internal_err!( diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 5b8a6a8fb1f6..cfb9fec61327 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -47,6 +47,7 @@ use crate::{ }; use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}; +use arrow::buffer::BooleanBuffer; use arrow::compute::filter_record_batch; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; @@ -731,21 +732,23 @@ impl HashJoinStream { let mask: BooleanArray = match key_col.data_type() { DataType::Int32 => { let arr = key_col.as_any().downcast_ref::().unwrap(); - arr.iter() - .map(|v| match v { - Some(v) => bitmap.contains(v as u32) == is_semi, - None => !is_semi, - }) - .collect() + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return !is_semi; + } + bitmap.contains(arr.value(i) as u32) == is_semi + }); + BooleanArray::new(buffer, None) } DataType::UInt32 => { let arr = key_col.as_any().downcast_ref::().unwrap(); - arr.iter() - .map(|v| match v { - Some(v) => bitmap.contains(v) == is_semi, - None => !is_semi, - }) - .collect() + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return !is_semi; + } + bitmap.contains(arr.value(i)) == is_semi + }); + BooleanArray::new(buffer, None) } _ => { return internal_err!("unsupported data type for roaring bitmap"); From 173bd10c13a94487e9784872c578658bfac9e004 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 23 Apr 2026 23:53:44 -0700 Subject: [PATCH 12/12] add_existence_benchmarks_hashjoin --- benchmarks/src/hj.rs | 192 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 301fe0d599cd..dbf9f9b92ffa 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -303,6 +303,198 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ build_size: "100K_(20%_dups)", probe_size: "60M", }, + // RightSemi Join benchmarks with Int32 keys + // Q16: RightSemi, 100% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q17: RightSemi, 100% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q18: RightSemi, 50% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 2 AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q19: RightSemi, 50% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 2 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 2 + 1 + ELSE l_suppkey * 2 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q20: RightSemi, 10% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 10 AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q21: RightSemi, 10% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 10 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 10 + 1 + ELSE l_suppkey * 10 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // RightAnti Join benchmarks with Int32 keys + // Q22: RightAnti, 100% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q23: RightAnti, 100% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q24: RightAnti, 50% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 2 AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q25: RightAnti, 50% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 2 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 2 + 1 + ELSE l_suppkey * 2 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q26: RightAnti, 10% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 10 AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q27: RightAnti, 10% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 10 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 10 + 1 + ELSE l_suppkey * 10 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, ]; impl RunOpt {