From 78f4bcc84b8be67e9a44324a92f65b2e8d070150 Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Mon, 17 Jun 2024 12:52:50 +0200
Subject: [PATCH 1/3] feat: add data page bench

---
 datafusion/core/benches/parquet_statistic.rs | 169 +++++++++++++++----
 1 file changed, 135 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs
index 5fd6b0066eb2c..a24f1da62a50a 100644
--- a/datafusion/core/benches/parquet_statistic.rs
+++ b/datafusion/core/benches/parquet_statistic.rs
@@ -18,20 +18,26 @@
 //! Benchmarks of benchmark for extracting arrow statistics from parquet
 use arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray, UInt64Array};
-use arrow_array::{Int32Array, RecordBatch};
+use arrow_array::{Int32Array, Int64Array, RecordBatch};
 use arrow_schema::{
     DataType::{self, *},
     Field, Schema,
 };
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
 use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
-use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
-use parquet::file::properties::WriterProperties;
+use parquet::{
+    arrow::arrow_reader::ArrowReaderOptions, file::properties::WriterProperties,
+};
+use parquet::{
+    arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter},
+    file::properties::EnabledStatistics,
+};
 use std::sync::Arc;
 use tempfile::NamedTempFile;

 #[derive(Debug, Clone)]
 enum TestTypes {
     UInt64,
+    Int64,
     F64,
     String,
     Dictionary,
@@ -43,6 +49,7 @@ impl fmt::Display for TestTypes {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
             TestTypes::UInt64 => write!(f, "UInt64"),
+            TestTypes::Int64 => write!(f, "Int64"),
             TestTypes::F64 => write!(f, "F64"),
             TestTypes::String => write!(f, "String"),
             TestTypes::Dictionary => write!(f, "Dictionary(Int32, String)"),
@@ -50,11 +57,18 @@ impl fmt::Display for TestTypes {
     }
 }

-fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
+fn create_parquet_file(
+    dtype: TestTypes,
+    row_groups: usize,
+    data_page_row_count_limit: &Option<usize>,
+) -> NamedTempFile {
     let schema = match dtype {
         TestTypes::UInt64 => {
             Arc::new(Schema::new(vec![Field::new("col", DataType::UInt64, true)]))
         }
+        TestTypes::Int64 => {
+            Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, true)]))
+        }
         TestTypes::F64 => Arc::new(Schema::new(vec![Field::new(
             "col",
             DataType::Float64,
@@ -70,7 +84,14 @@ fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
         )])),
     };

-    let props = WriterProperties::builder().build();
+    let mut props = WriterProperties::builder();
+    if let Some(limit) = data_page_row_count_limit {
+        props = props
+            .set_data_page_row_count_limit(*limit)
+            .set_statistics_enabled(EnabledStatistics::Page);
+    };
+    let props = props.build();
+
     let file = tempfile::Builder::new()
         .suffix(".parquet")
         .tempfile()
@@ -82,11 +103,18 @@ fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
     for _ in 0..row_groups {
         let batch = match dtype {
             TestTypes::UInt64 => make_uint64_batch(),
+            TestTypes::Int64 => make_int64_batch(),
             TestTypes::F64 => make_f64_batch(),
             TestTypes::String => make_string_batch(),
             TestTypes::Dictionary => make_dict_batch(),
         };
-        writer.write(&batch).unwrap();
+        if data_page_row_count_limit.is_some() {
+            for i in 0..batch.num_rows() {
+                writer.write(&batch.slice(i, 1)).unwrap();
+            }
+        } else {
+            writer.write(&batch).unwrap();
+        }
     }
     writer.close().unwrap();
     file
 }
@@ -109,6 +137,23 @@ fn make_uint64_batch() -> RecordBatch {
     .unwrap()
 }

+fn make_int64_batch() -> RecordBatch {
+    let array: ArrayRef = Arc::new(Int64Array::from(vec![
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+    ]));
+    RecordBatch::try_new(
+        Arc::new(arrow::datatypes::Schema::new(vec![
+            arrow::datatypes::Field::new("col", Int64, false),
+        ])),
+        vec![array],
+    )
+    .unwrap()
+}
+
 fn make_f64_batch() -> RecordBatch {
     let array: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]));
     RecordBatch::try_new(
@@ -150,37 +195,93 @@ fn make_dict_batch() -> RecordBatch {
 fn criterion_benchmark(c: &mut Criterion) {
     let row_groups = 100;
     use TestTypes::*;
-    let types = vec![UInt64, F64, String, Dictionary];
+    let types = vec![Int64, UInt64, F64, String, Dictionary];
+    let data_page_row_count_limits = vec![None, Some(1)];

     for dtype in types {
-        let file = create_parquet_file(dtype.clone(), row_groups);
-        let file = file.reopen().unwrap();
-        let reader = ArrowReaderBuilder::try_new(file).unwrap();
-        let metadata = reader.metadata();
-        let row_groups = metadata.row_groups();
-
-        let mut group =
-            c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
-        group.bench_function(
-            BenchmarkId::new("extract_statistics", dtype.clone()),
-            |b| {
-                b.iter(|| {
-                    let converter = StatisticsConverter::try_new(
-                        "col",
-                        reader.schema(),
-                        reader.parquet_schema(),
-                    )
-                    .unwrap();
-
-                    let _ = converter.row_group_mins(row_groups.iter()).unwrap();
-                    let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
-                    let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
-                    let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
-                        .unwrap();
-                })
-            },
-        );
-        group.finish();
+        for data_page_row_count_limit in &data_page_row_count_limits {
+            let file =
+                create_parquet_file(dtype.clone(), row_groups, data_page_row_count_limit);
+            let file = file.reopen().unwrap();
+            let options = ArrowReaderOptions::new().with_page_index(true);
+            let reader = ArrowReaderBuilder::try_new_with_options(file, options).unwrap();
+            let metadata = reader.metadata();
+            let row_groups = metadata.row_groups();
+            let row_group_indices = row_groups
+                .iter()
+                .enumerate()
+                .map(|(i, _)| i)
+                .collect::<Vec<_>>();
+
+            let statistic_type = if data_page_row_count_limit.is_some() {
+                "data page"
+            } else {
+                "row group"
+            };
+
+            let mut group = c.benchmark_group(format!(
+                "Extract {} statistics for {}",
+                statistic_type,
+                dtype.clone()
+            ));
+            group.bench_function(
+                BenchmarkId::new("extract_statistics", dtype.clone()),
+                |b| {
+                    b.iter(|| {
+                        let converter = StatisticsConverter::try_new(
+                            "col",
+                            reader.schema(),
+                            reader.parquet_schema(),
+                        )
+                        .unwrap();
+
+                        if data_page_row_count_limit.is_some() {
+                            let column_page_index = reader
+                                .metadata()
+                                .column_index()
+                                .expect("File should have column page indices");
+
+                            let column_offset_index = reader
+                                .metadata()
+                                .offset_index()
+                                .expect("File should have column offset indices");
+
+                            let _ = converter.data_page_mins(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_maxes(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_null_counts(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_row_counts(
+                                column_offset_index,
+                                row_groups,
+                                &row_group_indices,
+                            );
+                        } else {
+                            let _ = converter.row_group_mins(row_groups.iter()).unwrap();
+                            let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
+                            let _ = converter
+                                .row_group_null_counts(row_groups.iter())
+                                .unwrap();
+                            let _ = StatisticsConverter::row_group_row_counts(
+                                row_groups.iter(),
+                            )
+                            .unwrap();
+                        }
+                    })
+                },
+            );
+            group.finish();
+        }
     }
 }

From bcd1407629380173e42bcfc77ab3a5ef73a88ca8 Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Mon, 17 Jun 2024 16:14:55 +0200
Subject: [PATCH 2/3] chore: add comment

---
 datafusion/core/benches/parquet_statistic.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs
index a24f1da62a50a..d7c4da0d62580 100644
--- a/datafusion/core/benches/parquet_statistic.rs
+++ b/datafusion/core/benches/parquet_statistic.rs
@@ -109,6 +109,9 @@ fn create_parquet_file(
             TestTypes::Dictionary => make_dict_batch(),
         };
         if data_page_row_count_limit.is_some() {
+            // Send batches one at a time. This allows the
+            // writer to apply the page limit, which is only
+            // checked on RecordBatch boundaries.
             for i in 0..batch.num_rows() {
                 writer.write(&batch.slice(i, 1)).unwrap();
             }

From 1f8f2a4ea8bea71c200d6530be594101a8c3cdff Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Mon, 17 Jun 2024 16:24:09 +0200
Subject: [PATCH 3/3] fix: row_groups + shorten row_group_indices

---
 datafusion/core/benches/parquet_statistic.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/benches/parquet_statistic.rs b/datafusion/core/benches/parquet_statistic.rs
index d7c4da0d62580..d67a03d1e3acd 100644
--- a/datafusion/core/benches/parquet_statistic.rs
+++ b/datafusion/core/benches/parquet_statistic.rs
@@ -84,7 +84,7 @@ fn create_parquet_file(
         )])),
     };

-    let mut props = WriterProperties::builder();
+    let mut props = WriterProperties::builder().set_max_row_group_size(row_groups);
     if let Some(limit) = data_page_row_count_limit {
         props = props
             .set_data_page_row_count_limit(*limit)
@@ -210,11 +210,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             let reader = ArrowReaderBuilder::try_new_with_options(file, options).unwrap();
             let metadata = reader.metadata();
             let row_groups = metadata.row_groups();
-            let row_group_indices = row_groups
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect::<Vec<_>>();
+            let row_group_indices: Vec<_> = (0..row_groups.len()).collect();

             let statistic_type = if data_page_row_count_limit.is_some() {
                 "data page"