From f2830a9ee0ad371955e77ebec6e99b39c6c00cb9 Mon Sep 17 00:00:00 2001
From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
Date: Thu, 26 Mar 2026 10:56:00 -0400
Subject: [PATCH 1/2] commit benchmarks

---
 datafusion/core/Cargo.toml                     |   5 +
 .../core/benches/parquet_struct_projection.rs  | 403 ++++++++++++++++++
 2 files changed, 408 insertions(+)
 create mode 100644 datafusion/core/benches/parquet_struct_projection.rs

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 326b791a2f624..e3864e4bff685 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -247,6 +247,11 @@ harness = false
 name = "parquet_struct_query"
 required-features = ["parquet"]
 
+[[bench]]
+harness = false
+name = "parquet_struct_projection"
+required-features = ["parquet"]
+
 [[bench]]
 harness = false
 name = "range_and_generate_series"
diff --git a/datafusion/core/benches/parquet_struct_projection.rs b/datafusion/core/benches/parquet_struct_projection.rs
new file mode 100644
index 0000000000000..d4e86be2a1843
--- /dev/null
+++ b/datafusion/core/benches/parquet_struct_projection.rs
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for struct leaf-level projection pruning in Parquet.
+//!
+//! Measures the benefit of reading only the needed leaf columns from a
+//! struct column. Three dataset shapes are tested:
+//!
+//! 1. **Narrow struct** (2 leaves): one 8 KiB UTF-8 field + one INT field
+//! 2. **Wide struct** (5 leaves): four 8 KiB UTF-8 fields + one INT field
+//! 3. **Nested struct** (3 leaves): `STRUCT<STRUCT<large_string, small_int>, extra_string>`
+//!
+//! In all cases, projecting just the small integer should skip decoding
+//! all of the large string leaves, including through nested struct levels.
+
+use arrow::array::{ArrayRef, Int32Array, StringBuilder, StructArray};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use criterion::{Criterion, criterion_group, criterion_main};
+use datafusion::prelude::SessionContext;
+use datafusion_common::instant::Instant;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use std::hint::black_box;
+use std::path::Path;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+const NUM_BATCHES: usize = 64;
+const WRITE_RECORD_BATCH_SIZE: usize = 4096;
+const ROW_GROUP_ROW_COUNT: usize = 65536;
+const EXPECTED_ROW_GROUPS: usize = 4;
+const LARGE_STRING_LEN: usize = 8 * 1024;
+
+fn narrow_schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(struct_fields), false),
+    ]))
+}
+
+fn narrow_batch(batch_id: usize) -> RecordBatch {
+    let schema = narrow_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+    let mut string_builder = StringBuilder::new();
+    for _ in 0..len {
+        string_builder.append_value(&large_string);
+    }
+    let large_string_array = Arc::new(string_builder.finish());
+
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("large_string", DataType::Utf8, false)),
+            large_string_array as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("small_int", DataType::Int32, false)),
+            small_int_array as ArrayRef,
+        ),
+    ]);
+
+    RecordBatch::try_new(schema, vec![id_array, Arc::new(struct_array)]).unwrap()
+}
+
+fn wide_schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("str_a", DataType::Utf8, false),
+        Field::new("str_b", DataType::Utf8, false),
+        Field::new("str_c", DataType::Utf8, false),
+        Field::new("str_d", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(struct_fields), false),
+    ]))
+}
+
+fn wide_batch(batch_id: usize) -> RecordBatch {
+    let schema = wide_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+    let mut string_fields: Vec<(Arc<Field>, ArrayRef)> = Vec::new();
+    for name in &["str_a", "str_b", "str_c", "str_d"] {
+        let mut sb = StringBuilder::new();
+        for _ in 0..len {
+            sb.append_value(&large_string);
+        }
+        string_fields.push((
+            Arc::new(Field::new(*name, DataType::Utf8, false)),
+            Arc::new(sb.finish()) as ArrayRef,
+        ));
+    }
+    string_fields.push((
+        Arc::new(Field::new("small_int", DataType::Int32, false)),
+        small_int_array as ArrayRef,
+    ));
+
+    let struct_array = StructArray::from(string_fields);
+    RecordBatch::try_new(schema, vec![id_array, Arc::new(struct_array)]).unwrap()
+}
+
+fn generate_file(
+    schema: SchemaRef,
+    batch_fn: fn(usize) -> RecordBatch,
+    prefix: &str,
+) -> NamedTempFile {
+    let now = Instant::now();
+    let mut named_file = tempfile::Builder::new()
+        .prefix(prefix)
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
+        .build();
+
+    let mut writer =
+        ArrowWriter::try_new(&mut named_file, schema, Some(properties)).unwrap();
+
+    for batch_id in 0..NUM_BATCHES {
+        let batch = batch_fn(batch_id);
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    let file_metadata = metadata.file_metadata();
+    let expected_rows = WRITE_RECORD_BATCH_SIZE * NUM_BATCHES;
+    assert_eq!(
+        file_metadata.num_rows() as usize,
+        expected_rows,
+        "Expected {expected_rows} rows but got {}",
+        file_metadata.num_rows()
+    );
+    assert_eq!(
+        metadata.row_groups().len(),
+        EXPECTED_ROW_GROUPS,
+        "Expected {EXPECTED_ROW_GROUPS} row groups but got {}",
+        metadata.row_groups().len()
+    );
+
+    println!(
+        "Generated parquet file with {} rows and {} row groups in {:.2}s",
+        file_metadata.num_rows(),
+        metadata.row_groups().len(),
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn create_context(rt: &Runtime, file_path: &str, table: &str) -> SessionContext {
+    let ctx = SessionContext::new();
+    rt.block_on(ctx.register_parquet(table, file_path, Default::default()))
+        .unwrap();
+    ctx
+}
+
+fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
+    let ctx = ctx.clone();
+    let sql = sql.to_string();
+    let df = rt.block_on(ctx.sql(&sql)).unwrap();
+    black_box(rt.block_on(df.collect()).unwrap());
+}
+
+fn narrow_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(narrow_schema(), narrow_batch, "narrow_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("narrow_struct");
+
+    // baseline: full struct, must decode both leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned: skip large_string, read only small_int
+    group.bench_function("select_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['small_int'] FROM t"))
+    });
+
+    // pruned: skip small_int, read only large_string
+    group.bench_function("select_large_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['large_string'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // top-level column + pruned struct sub-field
+    group.bench_function("select_id_and_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT id, s['small_int'] FROM t"))
+    });
+
+    // aggregation on pruned sub-field, realistic analytical pattern
+    group.bench_function("sum_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['small_int']) FROM t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+fn wide_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(wide_schema(), wide_batch, "wide_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("wide_struct");
+
+    // baseline: full struct, must decode all 5 leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned: skip all 4 large string leaves
+    group.bench_function("select_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['small_int'] FROM t"))
+    });
+
+    // pruned: read 1 of 4 string leaves + skip the rest
+    group.bench_function("select_one_string_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['str_a'] FROM t"))
+    });
+
+    // pruned: read 2 of 4 string leaves
+    group.bench_function("select_two_string_fields", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['str_a'], s['str_b'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // aggregation on pruned sub-field, skips all 4 large leaves
+    group.bench_function("sum_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['small_int']) FROM t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+fn nested_schema() -> SchemaRef {
+    let inner_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    let outer_fields = Fields::from(vec![
+        Field::new("inner", DataType::Struct(inner_fields), false),
+        Field::new("extra_string", DataType::Utf8, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(outer_fields), false),
+    ]))
+}
+
+fn nested_batch(batch_id: usize) -> RecordBatch {
+    let schema = nested_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+
+    let mut sb1 = StringBuilder::new();
+    let mut sb2 = StringBuilder::new();
+    for _ in 0..len {
+        sb1.append_value(&large_string);
+        sb2.append_value(&large_string);
+    }
+
+    let inner_struct = StructArray::from(vec![
+        (
+            Arc::new(Field::new("large_string", DataType::Utf8, false)),
+            Arc::new(sb1.finish()) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("small_int", DataType::Int32, false)),
+            small_int_array as ArrayRef,
+        ),
+    ]);
+
+    let inner_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    let outer_struct = StructArray::from(vec![
+        (
+            Arc::new(Field::new("inner", DataType::Struct(inner_fields), false)),
+            Arc::new(inner_struct) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("extra_string", DataType::Utf8, false)),
+            Arc::new(sb2.finish()) as ArrayRef,
+        ),
+    ]);
+
+    RecordBatch::try_new(schema, vec![id_array, Arc::new(outer_struct)]).unwrap()
+}
+
+fn nested_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(nested_schema(), nested_batch, "nested_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("nested_struct");
+
+    // baseline: full outer struct, decode all 3 leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned outer: read only inner struct, skip extra_string
+    group.bench_function("select_inner_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['inner'] FROM t"))
+    });
+
+    // pruned both levels: reach through outer + inner, skip both large strings
+    group.bench_function("select_inner_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['inner']['small_int'] FROM t"))
+    });
+
+    // pruned outer only: skip inner struct entirely, read extra_string
+    group.bench_function("select_extra_string", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['extra_string'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // aggregation reaching through two levels of nesting
+    group.bench_function("sum_inner_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['inner']['small_int']) FROM t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+criterion_group!(
+    benches,
+    narrow_benchmarks,
+    wide_benchmarks,
+    nested_benchmarks
+);
+criterion_main!(benches);

From 41a823842f42e02f3278f4edaf0d3f7e1759bdcc Mon Sep 17 00:00:00 2001
From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
Date: Thu, 26 Mar 2026 11:49:56 -0400
Subject: [PATCH 2/2] try 128kb string

---
 datafusion/core/benches/parquet_struct_projection.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/benches/parquet_struct_projection.rs b/datafusion/core/benches/parquet_struct_projection.rs
index d4e86be2a1843..d6cf86a91c86b 100644
--- a/datafusion/core/benches/parquet_struct_projection.rs
+++ b/datafusion/core/benches/parquet_struct_projection.rs
@@ -20,8 +20,8 @@
 //! Measures the benefit of reading only the needed leaf columns from a
 //! struct column. Three dataset shapes are tested:
 //!
-//! 1. **Narrow struct** (2 leaves): one 8 KiB UTF-8 field + one INT field
-//! 2. **Wide struct** (5 leaves): four 8 KiB UTF-8 fields + one INT field
+//! 1. **Narrow struct** (2 leaves): one 128 KiB UTF-8 field + one INT field
+//! 2. **Wide struct** (5 leaves): four 128 KiB UTF-8 fields + one INT field
 //! 3. **Nested struct** (3 leaves): `STRUCT<STRUCT<large_string, small_int>, extra_string>`
 //!
 //! In all cases, projecting just the small integer should skip decoding
@@ -45,7 +45,7 @@ const NUM_BATCHES: usize = 64;
 const WRITE_RECORD_BATCH_SIZE: usize = 4096;
 const ROW_GROUP_ROW_COUNT: usize = 65536;
 const EXPECTED_ROW_GROUPS: usize = 4;
-const LARGE_STRING_LEN: usize = 8 * 1024;
+const LARGE_STRING_LEN: usize = 128 * 1024;
 
 fn narrow_schema() -> SchemaRef {
     let struct_fields = Fields::from(vec![
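Reading note (not part of either patch): the SQL queries above exercise DataFusion's struct-field pruning end to end, but the saving being measured ultimately comes from ordinary Parquet leaf projection. Below is a minimal standalone sketch of that effect written directly against the parquet crate's ProjectionMask API. The file path and the leaf index are illustrative assumptions only; for the narrow-struct file the leaves are expected to be, in order, id, s.large_string, s.small_int.

use std::fs::File;

use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical path; in the benchmark the data lives in a NamedTempFile.
    let file = File::open("/tmp/narrow_struct.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Assumed leaf order for the narrow schema: 0 = id, 1 = s.large_string,
    // 2 = s.small_int. Selecting only leaf 2 skips the large string pages,
    // which is the saving `SELECT s['small_int'] FROM t` should see.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
    let reader = builder.with_projection(mask).build()?;

    let mut rows = 0;
    for batch in reader {
        let batch = batch?;
        // Only the struct column `s` survives, now containing just `small_int`.
        assert_eq!(batch.num_columns(), 1);
        rows += batch.num_rows();
    }
    println!("read {rows} rows touching only the small_int leaf");
    Ok(())
}

To reproduce the Criterion numbers themselves, something like `cargo bench --bench parquet_struct_projection` run from datafusion/core should work, since the `parquet` feature required by the new [[bench]] entry is enabled there by default.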