diff --git a/Cargo.lock b/Cargo.lock index 5a76c063bbfad..7bc949671f4f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1198,6 +1198,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + [[package]] name = "byteorder" version = "1.5.0" @@ -2531,6 +2537,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "rand 0.9.4", + "roaring", "rstest", "rstest_reuse", "tokio", @@ -5292,6 +5299,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "roaring" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885" +dependencies = [ + "bytemuck", + "byteorder", +] + [[package]] name = "rstest" version = "0.26.1" diff --git a/Cargo.toml b/Cargo.toml index 82081c1e42930..3f595843c3a55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,7 @@ prost = "0.14.1" rand = "0.9" recursive = "0.1.1" regex = "1.12" +roaring = "0.11.3" rstest = "0.26.1" serde_json = "1" sha2 = "^0.10.9" diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 301fe0d599cd6..dbf9f9b92ffa2 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -303,6 +303,198 @@ const HASH_QUERIES: &[HashJoinQuery] = &[ build_size: "100K_(20%_dups)", probe_size: "60M", }, + // RightSemi Join benchmarks with Int32 keys + // Q16: RightSemi, 100% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q17: RightSemi, 100% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q18: RightSemi, 50% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 2 AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q19: RightSemi, 50% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 2 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 2 + 1 + ELSE l_suppkey * 2 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q20: RightSemi, 10% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 10 AS INT) as k FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // Q21: RightSemi, 10% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 10 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 10 + 1 + ELSE l_suppkey * 10 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightSemi", + }, + // RightAnti Join benchmarks with Int32 keys + // Q22: RightAnti, 100% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q23: RightAnti, 100% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 1.0, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q24: RightAnti, 50% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 2 AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q25: RightAnti, 50% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 2 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 2 + 1 + ELSE l_suppkey * 2 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 2 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.5, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q26: RightAnti, 10% Density, 100% Hit rate (no output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(l_suppkey * 10 AS INT) as k FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M_RightAnti", + }, + // Q27: RightAnti, 10% Density, 10% Hit rate (90% output) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CAST(CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 10 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 10 + 1 + ELSE l_suppkey * 10 + 1000000 + END AS INT) as k + FROM lineitem + ) l + WHERE NOT EXISTS ( + SELECT 1 FROM (SELECT CAST(s_suppkey * 10 AS INT) as k FROM supplier) s WHERE s.k = l.k + )"###, + density: 0.1, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M_RightAnti", + }, ]; impl RunOpt { diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 374fc275a06e0..d87550b412c42 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -70,6 +70,7 @@ log = { workspace = true } num-traits = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" +roaring = { workspace = true } tokio = { workspace = true } [dev-dependencies] diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 0eca270ebb06f..297821b4f0532 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, Int32Array, UInt32Array}; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -89,19 +89,19 @@ use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, l use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use super::partitioned_hash_eval::SeededRandomState; use datafusion_common::hash_utils::RandomState; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; -use super::partitioned_hash_eval::SeededRandomState; - /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. pub(crate) const HASH_JOIN_SEED: SeededRandomState = SeededRandomState::with_seed(12210250226015887276); const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; +const ROARING_MAP_CREATED_COUNT_METRIC_NAME: &str = "roaring_map_created_count"; #[expect(clippy::too_many_arguments)] fn try_create_array_map( @@ -1316,6 +1316,9 @@ impl ExecutionPlan for HashJoinExec { .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + let roaring_map_created_count = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) + .counter(ROARING_MAP_CREATED_COUNT_METRIC_NAME, partition); // Initialize build_accumulator lazily with runtime partition counts (only if enabled) // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing let repartition_random_state = REPARTITION_RANDOM_STATE; @@ -1362,6 +1365,9 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + roaring_map_created_count, + self.join_type, + self.filter.is_some(), )) })?, PartitionMode::Partitioned => { @@ -1382,6 +1388,9 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + roaring_map_created_count, + self.join_type, + self.filter.is_some(), )) } PartitionMode::Auto => { @@ -1854,6 +1863,31 @@ fn should_collect_min_max_for_perfect_hash( Ok(ArrayMap::is_supported_type(&data_type)) } +fn should_use_roaring_bitmap( + join_type: &JoinType, + on_left: &[PhysicalExprRef], + schema: &Schema, + has_filter: bool, +) -> bool { + if has_filter { + return false; + } + if !matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) { + return false; + } + if on_left.len() != 1 { + return false; + } + let left_key = &on_left[0]; + if !matches!( + left_key.data_type(schema), + Ok(DataType::Int32) | Ok(DataType::UInt32) + ) { + return false; + } + true +} + /// Collects all batches from the left (build) side stream and creates a hash map for joining. /// /// This function is responsible for: @@ -1895,6 +1929,9 @@ async fn collect_left_input( config: Arc, null_equality: NullEquality, array_map_created_count: Count, + roaring_map_created_count: Count, + join_type: JoinType, + has_filter: bool, ) -> Result { let schema = left_stream.schema(); @@ -1956,7 +1993,33 @@ async fn collect_left_input( }; let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( + if config.execution.perfect_hash_join_small_build_threshold > 0 + && should_use_roaring_bitmap(&join_type, &on_left, &schema, has_filter) + { + let batch = concat_batches(&schema, &batches)?; + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + let key_col = &left_values[0]; + let bitmap = match key_col.data_type() { + DataType::UInt32 => key_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .collect(), + DataType::Int32 => key_col + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|v| v as u32) + .collect(), + _ => return internal_err!("unsupported data type to build bitmap"), + }; + roaring_map_created_count.add(1); + (Map::RoaringMap(bitmap), batch, left_values) + } else if let Some((array_map, batch, left_value)) = try_create_array_map( &bounds, &schema, &batches, @@ -2089,21 +2152,25 @@ mod tests { use super::*; fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) { + let array_map_count = metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0); + let roaring_map_count = metrics + .sum_by_name(ROARING_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0); + if use_phj { assert!( - metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) - .expect("should have array_map_created_count metrics") - .as_usize() - >= 1 + array_map_count >= 1 || roaring_map_count >= 1, + "Expected either array_map or roaring_map to be created" ); } else { assert_eq!( - metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) - .map(|v| v.as_usize()) - .unwrap_or(0), - 0 + array_map_count + roaring_map_count, + 0, + "Expected no array_map or roaring_map to be created" ) } } diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 0daac0bb86a75..532c66592dbdb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -20,13 +20,14 @@ use std::{fmt::Display, hash::Hash, sync::Arc}; use arrow::{ - array::{ArrayRef, UInt64Array}, + array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}, + buffer::BooleanBuffer, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; use datafusion_common::hash_utils::{create_hashes, with_hashes}; +use datafusion_common::{Result, internal_err}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{ DynHash, PhysicalExpr, PhysicalExprRef, @@ -335,6 +336,52 @@ impl PhysicalExpr for HashTableLookupExpr { let array = map.contain_keys(&join_keys)?; Ok(ColumnarValue::Array(Arc::new(array))) } + Map::RoaringMap(bitmap) => { + // For roaring bitmap, check membership directly + // Roaring only supports u32 values, so we handle Int32 and UInt32 types + if join_keys.len() != 1 { + return internal_err!( + "Roaring bitmap expects 1 join key, but got {}", + join_keys.len() + ); + } + let key_col = &join_keys[0]; + let contains: BooleanArray = match key_col.data_type() { + DataType::Int32 => { + let arr = key_col + .as_any() + .downcast_ref::() + .expect("Expected Int32Array"); + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return false; + } + bitmap.contains(arr.value(i) as u32) + }); + BooleanArray::new(buffer, None) + } + DataType::UInt32 => { + let arr = key_col + .as_any() + .downcast_ref::() + .expect("Expected UInt32Array"); + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return false; + } + bitmap.contains(arr.value(i)) + }); + BooleanArray::new(buffer, None) + } + other => { + return internal_err!( + "Unsupported data type for roaring bitmap: {:?}", + other + ); + } + }; + Ok(ColumnarValue::Array(Arc::new(contains))) + } } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 9885fb5c5c70a..cfb9fec613273 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -46,9 +46,12 @@ use crate::{ }, }; -use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; +use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, UInt32Array, UInt64Array}; +use arrow::buffer::BooleanBuffer; +use arrow::compute::filter_record_batch; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use arrow_schema::DataType; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, internal_datafusion_err, internal_err, }; @@ -723,6 +726,56 @@ impl HashJoinStream { next_offset, ) } + Map::RoaringMap(bitmap) => { + let key_col = &state.values[0]; + let is_semi = matches!(self.join_type, JoinType::RightSemi); + let mask: BooleanArray = match key_col.data_type() { + DataType::Int32 => { + let arr = key_col.as_any().downcast_ref::().unwrap(); + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return !is_semi; + } + bitmap.contains(arr.value(i) as u32) == is_semi + }); + BooleanArray::new(buffer, None) + } + DataType::UInt32 => { + let arr = key_col.as_any().downcast_ref::().unwrap(); + let buffer = BooleanBuffer::collect_bool(arr.len(), |i| { + if arr.is_null(i) { + return !is_semi; + } + bitmap.contains(arr.value(i)) == is_semi + }); + BooleanArray::new(buffer, None) + } + _ => { + return internal_err!("unsupported data type for roaring bitmap"); + } + }; + let filtered_batch = filter_record_batch(&state.batch, &mask)?; + // Apply column projection to match output schema + let columns: Vec = self + .column_indices + .iter() + .map(|col_idx| { + // For RightSemi/RightAnti, all columns are from the right (probe) side + Arc::clone(filtered_batch.column(col_idx.index)) + }) + .collect(); + // Use try_new_with_options to handle empty columns case (e.g., count(*)) + let options = RecordBatchOptions::new() + .with_row_count(Some(filtered_batch.num_rows())); + let batch = RecordBatch::try_new_with_options( + Arc::clone(&self.schema), + columns, + &options, + )?; + self.output_buffer.push_batch(batch)?; + self.state = HashJoinStreamState::FetchProbeBatch; + return Ok(StatefulStreamResult::Continue); + } }; let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices); diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 2cdfa1e6ac020..ab26b347b5eac 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -25,6 +25,7 @@ pub use hash_join::{ }; pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder}; use parking_lot::Mutex; +use roaring::RoaringBitmap; // Note: SortMergeJoin is not used in plans yet pub use piecewise_merge_join::PiecewiseMergeJoinExec; pub use sort_merge_join::SortMergeJoinExec; @@ -53,6 +54,8 @@ use utils::JoinHashMapType; pub enum Map { HashMap(Box), ArrayMap(ArrayMap), + // optimized path for single int join keys + RoaringMap(RoaringBitmap), } impl Map { @@ -61,6 +64,7 @@ impl Map { match self { Map::HashMap(map) => map.len(), Map::ArrayMap(array_map) => array_map.num_of_distinct_key(), + Map::RoaringMap(bitmap) => bitmap.len() as usize, } } diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index e0be63fe71525..c49d0df706768 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5148,7 +5148,7 @@ LEFT ANTI JOIN ( ) t2 ON t1.k = t2.k; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, roaring_map_created_count=0, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] 02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, expr_0_eval_time=] 03)----FilterExec: column1@0 != 1, metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, selectivity=0% (0/1)] 04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 8469c32a17033..0b902c24202dc 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -387,7 +387,7 @@ FROM join_probe p INNER JOIN join_build AS build ON p.a = build.a AND p.b = build.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -472,8 +472,8 @@ INNER JOIN nested_t2 ON nested_t1.a = nested_t2.b INNER JOIN nested_t3 ON nested_t2.c = nested_t3.d; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.23% (144/790)] 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] 05)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] @@ -604,7 +604,7 @@ LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[e@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[e@0 < bb], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=6.7% (70/1.04 K)] 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] @@ -738,7 +738,7 @@ INNER JOIN ( ) agg ON b.a = agg.a; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=15.32% (70/457)] 03)--ProjectionExec: expr=[a@0 as a, min(join_agg_probe.value)@1 as min_value], metrics=[output_rows=2, output_batches=2] 04)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] @@ -805,7 +805,7 @@ FROM nulls_build INNER JOIN nulls_probe ON nulls_build.a = nulls_probe.a AND nulls_build.b = nulls_probe.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, roaring_map_created_count=0, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.6% (144/774)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] @@ -871,7 +871,7 @@ FROM lj_build LEFT JOIN lj_probe ON lj_build.a = lj_probe.a AND lj_build.b = lj_probe.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -887,7 +887,7 @@ WHERE EXISTS ( ); ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] @@ -957,7 +957,7 @@ FROM hl_probe p INNER JOIN hl_build AS build ON p.a = build.a AND p.b = build.b; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] @@ -1006,7 +1006,7 @@ FROM int_build b INNER JOIN int_probe p ON b.id1 = p.id1 AND b.id2 = p.id2; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, roaring_map_created_count=0, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=19.02% (222/1.17 K)] 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)]