From 15af5be142cd6c762ebe36e3da67d2b1a48127f6 Mon Sep 17 00:00:00 2001
From: NGA-TRAN
Date: Wed, 15 May 2024 17:38:47 -0400
Subject: [PATCH 01/11] test: some tests to write data to a parquet file and
 read its metadata

---
 .../physical_plan/parquet/arrow_statistics.rs |  43 ++
 .../datasource/physical_plan/parquet/mod.rs   |   1 +
 .../core/tests/parquet/arrow_statistics.rs    | 528 ++++++++++++++++++
 datafusion/core/tests/parquet/mod.rs          |   1 +
 4 files changed, 573 insertions(+)
 create mode 100644 datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
 create mode 100644 datafusion/core/tests/parquet/arrow_statistics.rs

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
new file mode 100644
index 0000000000000..4d380a68c5768
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
@@ -0,0 +1,43 @@
+use arrow_array::ArrayRef;
+use arrow_schema::DataType;
+use datafusion_common::Result;
+use parquet::file::statistics::Statistics as ParquetStatistics;
+
+/// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
+///
+/// # Note:
+/// If the corresponding `Statistics` is not present, or has no information for
+/// a column, a NULL is present in the corresponding array entry
+pub struct ArrowStatistics {
+    /// min values
+    min: ArrayRef,
+    /// max values
+    max: ArrayRef,
+    /// Row counts (UInt64Array)
+    row_count: ArrayRef,
+    /// Null Counts (UInt64Array)
+    null_count: ArrayRef,
+}
+
+/// Extract `ArrowStatistics` from the parquet [`Statistics`]
+pub fn parquet_stats_to_arrow<'a>(
+    arrow_datatype: &DataType,
+    statistics: impl IntoIterator<Item = Option<&'a ParquetStatistics>>,
+) -> Result<ArrowStatistics> {
+    todo!() // TODO: implement
+}
+
+// TODO: Now that I have tests that read parquet metadata, use them to implement the following:
+// struct ParquetStatisticsExtractor {
+//     ...
+// }
+
+// // create an extractor that can extract data from parquet files
+// let extractor = ParquetStatisticsExtractor::new(arrow_schema, parquet_schema)
+
+// // get parquet statistics (one for each row group) somehow:
+// let parquet_stats: Vec<&Statistics> = ...;
+
+// // extract min/max values for column "a" and "b";
+// let col_a_stats = extractor.extract("a", parquet_stats.iter());
+// let col_b_stats = extractor.extract("b", parquet_stats.iter());
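+
+// A more concrete sketch of the Int64 case (illustrative only; the function
+// name and shape here are assumptions, not the final API): collect each row
+// group's min into an Arrow array, using NULL when a row group has no usable
+// statistics.
+#[allow(dead_code)]
+fn int64_min_sketch<'a>(
+    stats: impl IntoIterator<Item = Option<&'a ParquetStatistics>>,
+) -> ArrayRef {
+    let mins = stats.into_iter().map(|s| match s {
+        Some(ParquetStatistics::Int64(s)) if s.has_min_max_set() => Some(*s.min()),
+        _ => None,
+    });
+    std::sync::Arc::new(arrow_array::Int64Array::from_iter(mins))
+}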
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a48f510adb56d..33922782fc55f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -63,6 +63,7 @@ use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
 use tokio::task::JoinSet;
 
+mod arrow_statistics;
 mod metrics;
 mod page_filter;
 mod row_filter;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
new file mode 100644
index 0000000000000..3636ca411ac57
--- /dev/null
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -0,0 +1,528 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file contains an end to end test of extracting statistics from parquet files.
+//! It writes data into a parquet file, reads statistics and verifies they are correct
+
+use std::sync::Arc;
+
+// use arrow::json::reader;
+use arrow_array::{make_array, Array, Int64Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
+use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
+use parquet::file::statistics::Statistics as ParquetFileStatistics;
+use parquet::format::Statistics as ParquetFormatStatistics;
+
+// TEST HELPERS
+
+/// Return a record batch with an i64 column that includes null values
+fn make_int64_batches_with_null(
+    null_values: usize,
+    no_null_values_start: i64,
+    no_null_values_end: i64,
+) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)]));
+
+    let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();
+
+    RecordBatch::try_new(
+        schema,
+        vec![make_array(
+            Int64Array::from_iter(
+                v64.into_iter()
+                    .map(Some)
+                    .chain(std::iter::repeat(None).take(null_values)),
+            )
+            .to_data(),
+        )],
+    )
+    .unwrap()
+}
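+
+// NOTE: the helper above emits the non-null range first and the nulls last;
+// e.g. (null_values = 2, start = -1, end = 5) yields
+// [-1, 0, 1, 2, 3, 4, NULL, NULL]: 8 rows, min -1, max 4, null_count 2.
+// A small sanity check of that shape (illustrative, not required by the
+// tests below):
+#[test]
+fn make_int64_batches_with_null_shape() {
+    let batch = make_int64_batches_with_null(2, -1, 5);
+    assert_eq!(batch.num_rows(), 8);
+    assert_eq!(batch.column(0).null_count(), 2);
+}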
+
+// TODO: Eventually, we will need only read_statistics and read_row_count,
+// but for my understanding, we will keep both write and read statistics.
+// Their values should be the same
+pub struct RowGroupsStatistics {
+    // Statistics from the file writer
+    pub write_statistics: Vec<WriteRowGroupStatistics>,
+
+    // Statistics from the file reader
+    pub read_statistics: Vec<ReadRowGroupStatistics>,
+}
+
+pub struct WriteRowGroupStatistics {
+    pub statistics: ParquetFormatStatistics,
+    pub row_count: i64,
+}
+
+pub struct ReadRowGroupStatistics {
+    pub statistics: ParquetFileStatistics,
+    pub row_count: i64,
+}
+
+// Create a parquet file with one column for data type i64
+// Data of the file include
+//   . Number of null rows is the given num_null
+//   . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row
+//   . The file is divided into row groups of size row_per_group
+pub fn parquet_row_group_statistics(
+    num_null: usize,
+    no_null_values_start: i64,
+    no_null_values_end: i64,
+    row_per_group: usize,
+) -> RowGroupsStatistics {
+    let mut output_file = tempfile::Builder::new()
+        .prefix("parquet_statistics_test")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(row_per_group)
+        .set_statistics_enabled(EnabledStatistics::Chunk)
+        .build();
+
+    let batches = vec![
+        make_int64_batches_with_null(num_null, no_null_values_start, no_null_values_end),
+        // TODO: likely make this more general using
+        // create_data_batch(scenario); where scenario is the respective enum data type
+    ];
+
+    let schema = batches[0].schema();
+
+    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+
+    //////////////// WRITE STATISTICS ///////////////////////
+    let file_meta = writer.close().unwrap();
+
+    // meta data of the row groups
+    let rg_metas = file_meta.row_groups;
+
+    // Statistics of each row group
+    let mut write_rg_statistics = Vec::new();
+    for rg_meta in rg_metas {
+        let column = &rg_meta.columns;
+        assert_eq!(column.len(), 1, "expected 1 column");
+        let col_metadata = &column[0].meta_data;
+        let col_metadata = col_metadata.as_ref().unwrap();
+        let stats = &col_metadata.statistics;
+        let stats = stats.as_ref().unwrap();
+
+        let rg_stats = WriteRowGroupStatistics {
+            row_count: rg_meta.num_rows,
+            statistics: stats.clone(),
+        };
+        write_rg_statistics.push(rg_stats);
+    }
+
+    //////////////// READ STATISTICS ///////////////////////
+    let file = output_file.reopen().unwrap();
+
+    let reader = ArrowReaderBuilder::try_new(file).unwrap();
+    let parquet_meta = reader.metadata().clone();
+
+    // meta data of the row groups
+    let rg_metas = parquet_meta.row_groups();
+
+    // Statistics of each row group
+    let mut read_rg_statistics = Vec::new();
+    for rg_meta in rg_metas {
+        let column = &rg_meta.columns();
+        assert_eq!(column.len(), 1, "expected 1 column");
+        let stats = &column[0].statistics().unwrap();
+
+        let rg_stats = ReadRowGroupStatistics {
+            row_count: rg_meta.num_rows() as i64,
+            statistics: (*stats).clone(),
+        };
+        read_rg_statistics.push(rg_stats);
+    }
+
+    RowGroupsStatistics {
+        write_statistics: write_rg_statistics,
+        read_statistics: read_rg_statistics,
+    }
+}
+
+// TESTS
+
+#[tokio::test]
+async fn test_one_row_group_without_null() {
+    let row_per_group = 20;
+    let rg_statistics = parquet_row_group_statistics(0, 4, 7, row_per_group);
+
+    //////////// Verify write row group statistics ////////////
+    let write_rg_stats = &rg_statistics.write_statistics;
+
+    // only 1 row group
+    assert_eq!(write_rg_stats.len(), 1, "expected 1 row group");
+    let rg = &write_rg_stats[0];
+
+    // 3 rows
+    assert_eq!(rg.row_count, 3, "expected 3 rows");
+
+    // no nulls
+    assert!(rg.statistics.null_count.is_none(), "expected no nulls");
+
+    let statistics = &rg.statistics;
+
+    // min is [4, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(
+        *statistics.min_value.as_ref().unwrap(),
+        [4, 0, 0, 0, 0, 0, 0, 0]
+    );
+    // max is [6, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(
+        *statistics.max_value.as_ref().unwrap(),
+        [6, 0, 0, 0, 0, 0, 0, 0]
+    );
+
+    println!("RG statistics:\n {statistics:#?}");
+    // This will display:
+    // RG statistics:
+    //  Statistics {
+    //     max: Some(
+    //         [
+    //             6,
+    //             0,
+    //             0,
+    //             0,
+    //             0,
+    //             0,
+    //             0,
+    //             0,
+    //         ],
+    //     ),
+ // min: Some( + // [ + // 4, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // ], + // ), + // null_count: None, + // distinct_count: None, + // max_value: Some( + // [ + // 6, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // ], + // ), + // min_value: Some( + // [ + // 4, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // ], + // ), + // is_max_value_exact: Some( + // true, + // ), + // is_min_value_exact: Some( + // true, + // ), + // } + + //////////// Verify read row group statistics //////////// + let read_rg_stats = &rg_statistics.read_statistics; + + // only 1 row group + assert_eq!(read_rg_stats.len(), 1, "expected 1 row group"); + let rg = &read_rg_stats[0]; + + // 3 rows + assert_eq!(rg.row_count, 3, "expected 3 rows"); + + // rg.statistics is Int64 + let stats = &rg.statistics; + match stats { + ParquetFileStatistics::Int64(int64_stats) => { + // no nulls + assert_eq!(int64_stats.null_count(), 0, "expected no nulls"); + // min is 4 + assert_eq!(*int64_stats.min(), 4); + // max is 6 + assert_eq!(*int64_stats.max(), 6); + } + _ => panic!("expected Int64"), + } + + // TODO: + // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics + // 2. compare the ArrowStatistics with the expected +} + +#[tokio::test] +async fn test_one_row_group_with_null_and_negative() { + let row_per_group = 20; + let rg_statistics = parquet_row_group_statistics(2, -1, 5, row_per_group); + + //////////// Verify write row group statistics //////////// + let write_rg_stats = &rg_statistics.write_statistics; + + // only 1 row group + assert_eq!(write_rg_stats.len(), 1, "expected 1 row group"); + let rg = &write_rg_stats[0]; + + // 8 rows + assert_eq!(rg.row_count, 8, "expected 8 rows"); + + // 2 nulls + assert_eq!(rg.statistics.null_count, Some(2), "expected 2 nulls"); + + let statistics = &rg.statistics; + + // // min of -1 is [255, 255, 255, 255, 255, 255, 255, 255] + assert_eq!( + *statistics.min_value.as_ref().unwrap(), + [255, 255, 255, 255, 255, 255, 255, 255] + ); + // max is [4, 0, 0, 0, 0, 0, 0, 0] + assert_eq!( + *statistics.max_value.as_ref().unwrap(), + [4, 0, 0, 0, 0, 0, 0, 0] + ); + + println!("RG statistics:\n {statistics:#?}"); + + //////////// Verify read row group statistics //////////// + let read_rg_stats = &rg_statistics.read_statistics; + + // only 1 row group + assert_eq!(read_rg_stats.len(), 1, "expected 1 row group"); + let rg = &read_rg_stats[0]; + + // 8 rows + assert_eq!(rg.row_count, 8, "expected 8 rows"); + + // rg.statistics is Int64 + let stats = &rg.statistics; + match stats { + ParquetFileStatistics::Int64(int64_stats) => { + // 2 nulls + assert_eq!(int64_stats.null_count(), 2, "expected 2 nulls"); + // min of -1 + assert_eq!(*int64_stats.min(), -1); + // max is 4 + assert_eq!(*int64_stats.max(), 4); + } + _ => panic!("expected Int64"), + } + + // TODO: + // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics + // 2. 
compare the ArrowStatistics with the expected
+}
+
+#[tokio::test]
+async fn test_two_row_group_with_null() {
+    let row_per_group = 10;
+    let rg_statistics = parquet_row_group_statistics(2, 4, 17, row_per_group);
+
+    //////////// Verify write row group statistics ////////////
+    let write_rg_stats = &rg_statistics.write_statistics;
+
+    // 2 row group
+    assert_eq!(write_rg_stats.len(), 2, "expected 2 row group");
+
+    // 1st row group with 10 rows (max number of rows)
+    let rg1 = &write_rg_stats[0];
+    assert_eq!(rg1.row_count, 10, "expected 10 rows");
+    let stats = &rg1.statistics;
+    // no nulls
+    assert_eq!(stats.null_count, None, "expected no nulls");
+    // min is [4, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(*stats.min_value.as_ref().unwrap(), [4, 0, 0, 0, 0, 0, 0, 0]);
+    // max is [13, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(
+        *stats.max_value.as_ref().unwrap(),
+        [13, 0, 0, 0, 0, 0, 0, 0]
+    );
+    println!("RG1 statistics:\n {stats:#?}");
+
+    // 2nd row group with 5 rows
+    let rg2 = &write_rg_stats[1];
+    assert_eq!(rg2.row_count, 5, "expected 5 rows");
+    // 2 nulls
+    assert_eq!(rg2.statistics.null_count, Some(2), "expected 2 nulls");
+    // min is [14, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(
+        *rg2.statistics.min_value.as_ref().unwrap(),
+        [14, 0, 0, 0, 0, 0, 0, 0]
+    );
+    // max is [16, 0, 0, 0, 0, 0, 0, 0]
+    assert_eq!(
+        *rg2.statistics.max_value.as_ref().unwrap(),
+        [16, 0, 0, 0, 0, 0, 0, 0]
+    );
+
+    let stats = &rg2.statistics;
+    println!("RG2 statistics:\n {stats:#?}");
+
+    //////////// Verify read row group statistics ////////////
+    let read_rg_stats = &rg_statistics.read_statistics;
+
+    // 2 row groups
+    assert_eq!(read_rg_stats.len(), 2, "expected 2 row groups");
+
+    // 1st row group with 10 rows (max number of rows)
+    let rg1 = &read_rg_stats[0];
+    assert_eq!(rg1.row_count, 10, "expected 10 rows");
+
+    // rg1.statistics is Int64
+    let stats = &rg1.statistics;
+    match stats {
+        ParquetFileStatistics::Int64(int64_stats) => {
+            // no nulls
+            assert_eq!(int64_stats.null_count(), 0, "expected no nulls");
+            // min is 4
+            assert_eq!(*int64_stats.min(), 4);
+            // max is 13
+            assert_eq!(*int64_stats.max(), 13);
+        }
+        _ => panic!("expected Int64"),
+    }
+
+    // 2nd row group with 5 rows
+    let rg2 = &read_rg_stats[1];
+    assert_eq!(rg2.row_count, 5, "expected 5 rows");
+
+    // rg2.statistics is Int64
+    let stats = &rg2.statistics;
+    match stats {
+        ParquetFileStatistics::Int64(int64_stats) => {
+            // 2 nulls
+            assert_eq!(int64_stats.null_count(), 2, "expected 2 nulls");
+            // min is 14
+            assert_eq!(*int64_stats.min(), 14);
+            // max is 16
+            assert_eq!(*int64_stats.max(), 16);
+        }
+        _ => panic!("expected Int64"),
+    }
+
+    // TODO:
+    // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics
+    // 2. 
compare the ArrowStatistics with the expected +} + +#[tokio::test] +async fn test_two_row_groups_with_all_nulls_in_one() { + let row_per_group = 5; + let rg_statistics = parquet_row_group_statistics(4, -2, 2, row_per_group); + + //////////// Verify write row group statistics //////////// + let write_rg_stats = &rg_statistics.write_statistics; + + // 2 row group + assert_eq!(write_rg_stats.len(), 2, "expected 2 row group"); + + // 1st row group with 5 rows + let rg1 = &write_rg_stats[0]; + assert_eq!(rg1.row_count, 5, "expected 5 rows"); + // 1 null + assert_eq!(rg1.statistics.null_count, Some(1), "expected 1 null"); + // min of -2 is [254, 255, 255, 255, 255, 255, 255, 255] + assert_eq!( + *rg1.statistics.min_value.as_ref().unwrap(), + [254, 255, 255, 255, 255, 255, 255, 255] + ); + // max is [1, 0, 0, 0, 0, 0, 0, 0] + assert_eq!( + *rg1.statistics.max_value.as_ref().unwrap(), + [1, 0, 0, 0, 0, 0, 0, 0] + ); + let stats = &rg1.statistics; + println!("RG1 statistics:\n {stats:#?}"); + + // 2nd row group with 3 rows + let rg2 = &write_rg_stats[1]; + assert_eq!(rg2.row_count, 3, "expected 3 rows"); + // 3 nulls + assert_eq!(rg2.statistics.null_count, Some(3), "expected 3 nulls"); + // min is None + assert_eq!(rg2.statistics.min_value, None); + // max is None + assert_eq!(rg2.statistics.max_value, None); + let stats = &rg2.statistics; + println!("RG2 statistics:\n {stats:#?}"); + + //////////// Verify read row group statistics //////////// + let read_rg_stats = &rg_statistics.read_statistics; + + // 2 row groups + assert_eq!(read_rg_stats.len(), 2, "expected 2 row groups"); + + // 1st row group with 5 rows + let rg1 = &read_rg_stats[0]; + assert_eq!(rg1.row_count, 5, "expected 5 rows"); + + // rg1.statistics is Int64 + let stats = &rg1.statistics; + match stats { + ParquetFileStatistics::Int64(int64_stats) => { + // 1 null + assert_eq!(int64_stats.null_count(), 1, "expected 1 null"); + // min of -2 + assert_eq!(*int64_stats.min(), -2); + // max is 1 + assert_eq!(*int64_stats.max(), 1); + } + _ => panic!("expected Int64"), + } + + // 2nd row group with 3 rows + let rg2 = &read_rg_stats[1]; + assert_eq!(rg2.row_count, 3, "expected 3 rows"); + + // rg2.statistics is Int64 + let stats = &rg2.statistics; + match stats { + ParquetFileStatistics::Int64(int64_stats) => { + // 3 nulls + assert_eq!(int64_stats.null_count(), 3, "expected 3 nulls"); + // min is None + assert!(!int64_stats.has_min_max_set()); + } + _ => panic!("expected Int64"), + } + + // TODO: + // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics + // 2. 
compare the ArrowStatistics with the expected +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index bb938e3af493c..edc7c641877a5 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -41,6 +41,7 @@ use parquet::file::properties::WriterProperties; use std::sync::Arc; use tempfile::NamedTempFile; +mod arrow_statistics; mod custom_reader; mod file_statistics; #[cfg(not(target_family = "windows"))] From 4b107f8f0f9184e0185c0b755ef44ab0f1373ee3 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Thu, 16 May 2024 18:25:13 -0400 Subject: [PATCH 02/11] feat: API to convert parquet stats to arrow stats --- .../physical_plan/parquet/arrow_statistics.rs | 243 +++++++- .../datasource/physical_plan/parquet/mod.rs | 4 +- .../core/tests/parquet/arrow_statistics.rs | 530 +++++------------- 3 files changed, 364 insertions(+), 413 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs index 4d380a68c5768..4a4d7cf7a33fd 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs @@ -1,13 +1,40 @@ -use arrow_array::ArrayRef; +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Extract parquet statistics and convert it to Arrow statistics
+
+use std::{fs::File, sync::Arc};
+
+use arrow_array::{ArrayRef, Int64Array, UInt64Array};
 use arrow_schema::DataType;
-use datafusion_common::Result;
-use parquet::file::statistics::Statistics as ParquetStatistics;
+use datafusion_common::{DataFusionError, Result};
+use parquet::{
+    arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
+    file::statistics::{Statistics as ParquetStatistics, ValueStatistics},
+};
+
+use super::statistics::parquet_column;
 
 /// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
 ///
 /// # Note:
 /// If the corresponding `Statistics` is not present, or has no information for
 /// a column, a NULL is present in the corresponding array entry
+#[derive(Debug)]
 pub struct ArrowStatistics {
     /// min values
     min: ArrayRef,
     /// max values
     max: ArrayRef,
     /// Row counts (UInt64Array)
     row_count: ArrayRef,
     /// Null Counts (UInt64Array)
     null_count: ArrayRef,
 }
 
+impl ArrowStatistics {
+    /// Create a new instance of `ArrowStatistics`
+    pub fn new(
+        min: ArrayRef,
+        max: ArrayRef,
+        row_count: ArrayRef,
+        null_count: ArrayRef,
+    ) -> Self {
+        Self {
+            min,
+            max,
+            row_count,
+            null_count,
+        }
+    }
+
+    /// Get the min values
+    pub fn min(&self) -> &ArrayRef {
+        &self.min
+    }
+
+    /// Get the max values
+    pub fn max(&self) -> &ArrayRef {
+        &self.max
+    }
+
+    /// Get the row counts
+    pub fn row_count(&self) -> &ArrayRef {
+        &self.row_count
+    }
+
+    /// Get the null counts
+    pub fn null_count(&self) -> &ArrayRef {
+        &self.null_count
+    }
+}
+
 /// Extract `ArrowStatistics` from the parquet [`Statistics`]
-pub fn parquet_stats_to_arrow<'a>(
+pub fn parquet_stats_to_arrow(
     arrow_datatype: &DataType,
-    statistics: impl IntoIterator<Item = Option<&'a ParquetStatistics>>,
+    statistics: &ParquetColumnStatistics,
 ) -> Result<ArrowStatistics> {
-    todo!() // TODO: implement
+    // check if the data type is Int64
+    if !matches!(arrow_datatype, DataType::Int64) {
+        return Err(DataFusionError::Internal(format!(
+            "Unsupported data type {:?} for statistics",
+            arrow_datatype
+        )));
+    }
+
+    // row counts
+    let row_count = statistics
+        .rg_statistics
+        .iter()
+        .map(|rg| rg.row_count)
+        .collect::<Vec<_>>();
+
+    // get null counts
+    let parquet_stats = statistics.rg_statistics.iter().map(|rg| rg.statistics);
+    let null_counts = parquet_stats
+        .map(|stats| stats.map(|s| s.null_count()))
+        .collect::<Vec<_>>();
+
+    // get min and max values
+    let parquet_stats_with_min_max = statistics
+        .rg_statistics
+        .iter()
+        .map(|rg| rg.get_statistics())
+        .collect::<Vec<_>>();
+
+    let mins = parquet_stats_with_min_max
+        .iter()
+        .map(|stats| {
+            stats.and_then(|s| {
+                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
+                Some(*stats.min())
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let maxs = parquet_stats_with_min_max
+        .iter()
+        .map(|stats| {
+            stats.and_then(|s| {
+                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
+                Some(*stats.max())
+            })
+        })
+        .collect::<Vec<_>>();
+
+    Ok(ArrowStatistics {
+        min: Arc::new(Int64Array::from(mins)),
+        max: Arc::new(Int64Array::from(maxs)),
+        row_count: Arc::new(UInt64Array::from(row_count)),
+        null_count: Arc::new(UInt64Array::from(null_counts)),
+    })
 }
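+
+// For example, for a file with two row groups where the second contains only
+// NULLs (see `test_two_row_groups_with_all_nulls_in_one` in the tests), the
+// arrays built above come out as (illustrative values):
+//   min:        Int64Array  [-2,   NULL]
+//   max:        Int64Array  [1,    NULL]
+//   row_count:  UInt64Array [5, 3]
+//   null_count: UInt64Array [1, 3]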
-// TODO: Now that I have tests that read parquet metadata, use them to implement the following:
-// struct ParquetStatisticsExtractor {
-//     ...
-// }
+/// All row group statistics of a file for a column
+pub struct ParquetColumnStatistics<'a> {
+    // todo: do we need this?
+    // arrow column schema
+    // column_schema: &'a FieldRef,
+    _column_name: &'a str, // todo: do we need this?
+    rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
+}
 
-// // create an extractor that can extract data from parquet files
-// let extractor = ParquetStatisticsExtractor::new(arrow_schema, parquet_schema)
-
-// // get parquet statistics (one for each row group) somehow:
-// let parquet_stats: Vec<&Statistics> = ...;
+/// Row group statistics of a column
+pub struct ParquetColumnRowGroupStatistics<'a> {
+    row_count: u64,
+    statistics: Option<&'a ParquetStatistics>,
+}
 
-// // extract min/max values for column "a" and "b";
-// let col_a_stats = extractor.extract("a", parquet_stats.iter());
-// let col_b_stats = extractor.extract("b", parquet_stats.iter());
+impl<'a> ParquetColumnRowGroupStatistics<'a> {
+    /// Create a new instance of `ParquetColumnRowGroupStatistics`
+    pub fn new(row_count: u64, statistics: Option<&'a ParquetStatistics>) -> Self {
+        Self {
+            row_count,
+            statistics,
+        }
+    }
+
+    /// Return statistics if it exists and has min max
+    /// Otherwise return None
+    pub fn get_statistics(&self) -> Option<&'a ParquetStatistics> {
+        let stats = self.statistics?;
+        if stats.has_min_max_set() {
+            Some(stats)
+        } else {
+            None
+        }
+    }
+
+    /// Return the statistics as ValueStatistics<i64> if the column is i64
+    /// Otherwise return None
+    fn try_as_i64(stats: &'a ParquetStatistics) -> Option<&'a ValueStatistics<i64>> {
+        if let parquet::file::statistics::Statistics::Int64(statistics) = stats {
+            Some(statistics)
+        } else {
+            None
+        }
+    }
+}
+
+impl<'a> ParquetColumnStatistics<'a> {
+    /// Create a new instance of `ParquetColumnStatistics`
+    pub fn new(
+        _column_name: &'a str,
+        rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
+    ) -> Self {
+        Self {
+            _column_name,
+            rg_statistics,
+        }
+    }
+
+    /// Extract statistics of all columns from a parquet file metadata
+    pub fn from_parquet_statistics(
+        reader: &'a ParquetRecordBatchReaderBuilder<File>,
+    ) -> Result<Vec<Self>> {
+        // Get metadata & schemas
+        let metadata = reader.metadata();
+        let parquet_schema = reader.parquet_schema();
+        let arrow_schema = reader.schema();
+
+        // Get column names from the arrow schema & their indexes in the parquet schema
+        let columns = arrow_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                let col_name = f.name();
+                let col_idx = parquet_column(parquet_schema, arrow_schema, col_name);
+                match col_idx {
+                    Some(idx) => Ok((col_name, idx)),
+                    None => Err(DataFusionError::Internal(format!(
+                        "Column {} in Arrow schema not found in Parquet schema",
+                        col_name,
+                    ))),
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // Get statistics for each column
+        let col_stats = columns
+            .iter()
+            .map(|(col_name, col_idx)| {
+                let rg_statistics = metadata
+                    .row_groups()
+                    .iter()
+                    .map(|rg_meta| {
+                        let row_count = rg_meta.num_rows() as u64;
+                        let statistics = rg_meta.column(col_idx.0).statistics();
+                        ParquetColumnRowGroupStatistics::new(row_count, statistics)
+                    })
+                    .collect::<Vec<_>>();
+
+                ParquetColumnStatistics::new(col_name, rg_statistics)
+            })
+            .collect::<Vec<_>>();
+
+        Ok(col_stats)
+    }
+}
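Taken together, the API this commit adds is used as in the following sketch (mirroring the tests below; `reader` is assumed to be a `ParquetRecordBatchReaderBuilder<File>` for a file whose first column is Int64, and error handling is elided):

```rust
use std::fs::File;

use arrow_schema::DataType;
use datafusion::datasource::physical_plan::parquet::arrow_statistics::{
    parquet_stats_to_arrow, ParquetColumnStatistics,
};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Print per-row-group min/max for the first column of an opened file.
fn show_min_max(reader: &ParquetRecordBatchReaderBuilder<File>) {
    // one ParquetColumnStatistics per column in the arrow schema
    let columns = ParquetColumnStatistics::from_parquet_statistics(reader).unwrap();

    // convert the first column's per-row-group statistics to Arrow arrays
    let stats = parquet_stats_to_arrow(&DataType::Int64, &columns[0]).unwrap();
    println!("mins: {:?} maxs: {:?}", stats.min(), stats.max());
}
```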
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 33922782fc55f..a9e340cce1a8a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -63,13 +63,14 @@ use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
 use tokio::task::JoinSet;
 
-mod arrow_statistics;
+pub mod arrow_statistics;
 mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
 mod statistics;
 
+// pub use arrow_statistics::ParquetColumnStatistics;
 pub use metrics::ParquetFileMetrics;
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -458,7 +459,6 @@ struct ParquetOpener {
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
-
         let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file_meta.location().as_ref(),
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 3636ca411ac57..458d4222770b0 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -18,16 +18,17 @@
 //! This file contains an end to end test of extracting statistics from parquet files.
 //! It writes data into a parquet file, reads statistics and verifies they are correct
 
+use std::fs::File;
 use std::sync::Arc;
 
-// use arrow::json::reader;
-use arrow_array::{make_array, Array, Int64Array, RecordBatch};
+use arrow_array::{make_array, Array, ArrayRef, Int64Array, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
-use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+use datafusion::datasource::physical_plan::parquet::arrow_statistics::{
+    self, parquet_stats_to_arrow,
+};
+use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
-use parquet::file::statistics::Statistics as ParquetFileStatistics;
-use parquet::format::Statistics as ParquetFormatStatistics;
 
 // TEST HELPERS
 
@@ -55,38 +56,17 @@ fn make_int64_batches_with_null(
     .unwrap()
 }
 
-// TODO: Eventually, we will need only read_statistics and read_row_count,
-// but for my understanding, we will keep both write and read statistics.
-// Their values should be the same
-pub struct RowGroupsStatistics {
-    // Statistics from the file writer
-    pub write_statistics: Vec<WriteRowGroupStatistics>,
-
-    // Statistics from the file reader
-    pub read_statistics: Vec<ReadRowGroupStatistics>,
-}
-
-pub struct WriteRowGroupStatistics {
-    pub statistics: ParquetFormatStatistics,
-    pub row_count: i64,
-}
-
-pub struct ReadRowGroupStatistics {
-    pub statistics: ParquetFileStatistics,
-    pub row_count: i64,
-}
-
 // Create a parquet file with one column for data type i64
 // Data of the file include
 //   . Number of null rows is the given num_null
 //   . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row
 //   . The file is divided into row groups of size row_per_group
-pub fn parquet_row_group_statistics(
+pub fn parquet_file(
     num_null: usize,
     no_null_values_start: i64,
     no_null_values_end: i64,
     row_per_group: usize,
-) -> RowGroupsStatistics {
+) -> ParquetRecordBatchReaderBuilder<File> {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_statistics_test")
         .suffix(".parquet")
         .tempfile()
         .expect("tempfile creation");
 
     let props = WriterProperties::builder()
         .set_max_row_group_size(row_per_group)
         .set_statistics_enabled(EnabledStatistics::Chunk)
         .build();
 
     let batches = vec![
         make_int64_batches_with_null(num_null, no_null_values_start, no_null_values_end),
         // TODO: likely make this more general using
         // create_data_batch(scenario); where scenario is the respective enum data type
     ];
 
     let schema = batches[0].schema();
 
     let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
 
     for batch in batches {
         writer.write(&batch).expect("writing batch");
     }
 
-    //////////////// WRITE STATISTICS ///////////////////////
-    let file_meta = writer.close().unwrap();
-
-    // meta data of the row groups
-    let rg_metas = file_meta.row_groups;
-
-    // Statistics of each row group
-    let mut write_rg_statistics = Vec::new();
-    for rg_meta in rg_metas {
-        let column = &rg_meta.columns;
-        assert_eq!(column.len(), 1, "expected 1 column");
-        let col_metadata = &column[0].meta_data;
-        let col_metadata = col_metadata.as_ref().unwrap();
-        let stats = &col_metadata.statistics;
-        let stats = stats.as_ref().unwrap();
-
-        let rg_stats = WriteRowGroupStatistics {
-            row_count: rg_meta.num_rows,
-            statistics: stats.clone(),
-        };
-        write_rg_statistics.push(rg_stats);
-    }
+    // close file
+    let _file_meta = writer.close().unwrap();
 
-    //////////////// READ STATISTICS ///////////////////////
+    // open the file & get the reader
     let file = output_file.reopen().unwrap();
-
-    let reader = ArrowReaderBuilder::try_new(file).unwrap();
-    let parquet_meta = reader.metadata().clone();
-
-    // meta data of the row groups
-    let rg_metas = parquet_meta.row_groups();
-
-    // Statistics of each row group
-    let mut read_rg_statistics = Vec::new();
-    for rg_meta in rg_metas {
-        let column = &rg_meta.columns();
-        assert_eq!(column.len(), 1, "expected 1 column");
-        let stats = &column[0].statistics().unwrap();
-
-        let rg_stats = ReadRowGroupStatistics {
-            row_count: rg_meta.num_rows() as i64,
-            statistics: (*stats).clone(),
-        };
-        read_rg_statistics.push(rg_stats);
-    }
-
-    RowGroupsStatistics {
-        write_statistics: write_rg_statistics,
-        read_statistics: read_rg_statistics,
-    }
+    ArrowReaderBuilder::try_new(file).unwrap()
 }
 
 // TESTS
 
 #[tokio::test]
 async fn test_one_row_group_without_null() {
     let row_per_group = 20;
-    let rg_statistics = parquet_row_group_statistics(0, 4, 7, row_per_group);
+    let reader = parquet_file(0, 4, 7, row_per_group);
 
-    //////////// Verify write row group statistics ////////////
-    let write_rg_stats = &rg_statistics.write_statistics;
+    let statistics =
+        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
+            .expect("getting statistics");
 
-    // only 1 row group
-    assert_eq!(write_rg_stats.len(), 1, "expected 1 row group");
-    let rg = &write_rg_stats[0];
+    // only 1 column
+    assert_eq!(statistics.len(), 1, "expected 1 column");
 
-    // 3 rows
-    assert_eq!(rg.row_count, 3, "expected 3 rows");
+    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
+    println!("Arrow statistics:\n {arrow_stats:#?}");
+
+    // only one row group
+    let min = arrow_stats.min();
+    assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group");
+
+    // min is 4
+    let expect = Int64Array::from(vec![4]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(min, &expect);
+
+    // max is 6
+    let expect = Int64Array::from(vec![6]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(arrow_stats.max(), &expect);
 
     // no nulls
-    assert!(rg.statistics.null_count.is_none(), "expected no nulls");
-
-    let statistics = 
&rg.statistics; - - // min is [4, 0, 0, 0, 0, 0, 0, 0] - assert_eq!( - *statistics.min_value.as_ref().unwrap(), - [4, 0, 0, 0, 0, 0, 0, 0] - ); - // max is [6, 0, 0, 0, 0, 0, 0, 0] - assert_eq!( - *statistics.max_value.as_ref().unwrap(), - [6, 0, 0, 0, 0, 0, 0, 0] - ); - - println!("RG statistics:\n {statistics:#?}"); - // This will display: - // RG statistics: - // Statistics { - // max: Some( - // [ - // 6, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // ], - // ), - // min: Some( - // [ - // 4, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // ], - // ), - // null_count: None, - // distinct_count: None, - // max_value: Some( - // [ - // 6, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // ], - // ), - // min_value: Some( - // [ - // 4, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // 0, - // ], - // ), - // is_max_value_exact: Some( - // true, - // ), - // is_min_value_exact: Some( - // true, - // ), - // } - - //////////// Verify read row group statistics //////////// - let read_rg_stats = &rg_statistics.read_statistics; - - // only 1 row group - assert_eq!(read_rg_stats.len(), 1, "expected 1 row group"); - let rg = &read_rg_stats[0]; + let expect = UInt64Array::from(vec![0]); + let expect = Arc::new(expect) as ArrayRef; + assert_eq!(arrow_stats.null_count(), &expect); // 3 rows - assert_eq!(rg.row_count, 3, "expected 3 rows"); - - // rg.statistics is Int64 - let stats = &rg.statistics; - match stats { - ParquetFileStatistics::Int64(int64_stats) => { - // no nulls - assert_eq!(int64_stats.null_count(), 0, "expected no nulls"); - // min is 4 - assert_eq!(*int64_stats.min(), 4); - // max is 6 - assert_eq!(*int64_stats.max(), 6); - } - _ => panic!("expected Int64"), - } - - // TODO: - // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics - // 2. 
compare the ArrowStatistics with the expected + let expect = UInt64Array::from(vec![3]); + let expect = Arc::new(expect) as ArrayRef; + assert_eq!(arrow_stats.row_count(), &expect); } #[tokio::test] async fn test_one_row_group_with_null_and_negative() { let row_per_group = 20; - let rg_statistics = parquet_row_group_statistics(2, -1, 5, row_per_group); - - //////////// Verify write row group statistics //////////// - let write_rg_stats = &rg_statistics.write_statistics; + let reader = parquet_file(2, -1, 5, row_per_group); - // only 1 row group - assert_eq!(write_rg_stats.len(), 1, "expected 1 row group"); - let rg = &write_rg_stats[0]; + let statistics = + arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader) + .expect("getting statistics"); - // 8 rows - assert_eq!(rg.row_count, 8, "expected 8 rows"); - - // 2 nulls - assert_eq!(rg.statistics.null_count, Some(2), "expected 2 nulls"); + // only 1 column + assert_eq!(statistics.len(), 1, "expected 1 column"); - let statistics = &rg.statistics; + let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap(); + println!("Arrow statistics:\n {arrow_stats:#?}"); - // // min of -1 is [255, 255, 255, 255, 255, 255, 255, 255] - assert_eq!( - *statistics.min_value.as_ref().unwrap(), - [255, 255, 255, 255, 255, 255, 255, 255] - ); - // max is [4, 0, 0, 0, 0, 0, 0, 0] - assert_eq!( - *statistics.max_value.as_ref().unwrap(), - [4, 0, 0, 0, 0, 0, 0, 0] - ); + // only one row group + let min = arrow_stats.min(); + assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group"); - println!("RG statistics:\n {statistics:#?}"); + // min is -1 + let expect = Int64Array::from(vec![-1]); + let expect = Arc::new(expect) as ArrayRef; + assert_eq!(min, &expect); - //////////// Verify read row group statistics //////////// - let read_rg_stats = &rg_statistics.read_statistics; + // max is 4 + let expect = Int64Array::from(vec![4]); + let expect = Arc::new(expect) as ArrayRef; + assert_eq!(arrow_stats.max(), &expect); - // only 1 row group - assert_eq!(read_rg_stats.len(), 1, "expected 1 row group"); - let rg = &read_rg_stats[0]; + // 2 nulls + let expect = UInt64Array::from(vec![2]); + let expect = Arc::new(expect) as ArrayRef; + assert_eq!(arrow_stats.null_count(), &expect); // 8 rows - assert_eq!(rg.row_count, 8, "expected 8 rows"); - - // rg.statistics is Int64 - let stats = &rg.statistics; - match stats { - ParquetFileStatistics::Int64(int64_stats) => { - // 2 nulls - assert_eq!(int64_stats.null_count(), 2, "expected 2 nulls"); - // min of -1 - assert_eq!(*int64_stats.min(), -1); - // max is 4 - assert_eq!(*int64_stats.max(), 4); - } - _ => panic!("expected Int64"), - } - - // TODO: - // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics - // 2. 
compare the ArrowStatistics with the expected
+    let expect = UInt64Array::from(vec![8]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(arrow_stats.row_count(), &expect);
 }
 
 #[tokio::test]
 async fn test_two_row_group_with_null() {
     let row_per_group = 10;
-    let rg_statistics = parquet_row_group_statistics(2, 4, 17, row_per_group);
+    let reader = parquet_file(2, 4, 17, row_per_group);
 
-    //////////// Verify write row group statistics ////////////
-    let write_rg_stats = &rg_statistics.write_statistics;
+    let statistics =
+        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
+            .expect("getting statistics");
 
-    // 2 row group
-    assert_eq!(write_rg_stats.len(), 2, "expected 2 row group");
+    // only 1 column
+    assert_eq!(statistics.len(), 1, "expected 1 column");
 
-    // 1st row group with 10 rows (max number of rows)
-    let rg1 = &write_rg_stats[0];
-    assert_eq!(rg1.row_count, 10, "expected 10 rows");
-    let stats = &rg1.statistics;
-    // no nulls
-    assert_eq!(stats.null_count, None, "expected no nulls");
-    // min is [4, 0, 0, 0, 0, 0, 0, 0]
-    assert_eq!(*stats.min_value.as_ref().unwrap(), [4, 0, 0, 0, 0, 0, 0, 0]);
-    // max is [13, 0, 0, 0, 0, 0, 0, 0]
-    assert_eq!(
-        *stats.max_value.as_ref().unwrap(),
-        [13, 0, 0, 0, 0, 0, 0, 0]
-    );
-    println!("RG1 statistics:\n {stats:#?}");
-
-    // 2nd row group with 5 rows
-    let rg2 = &write_rg_stats[1];
-    assert_eq!(rg2.row_count, 5, "expected 5 rows");
-    // 2 nulls
-    assert_eq!(rg2.statistics.null_count, Some(2), "expected 2 nulls");
-    // min is [14, 0, 0, 0, 0, 0, 0, 0]
-    assert_eq!(
-        *rg2.statistics.min_value.as_ref().unwrap(),
-        [14, 0, 0, 0, 0, 0, 0, 0]
-    );
-    // max is [16, 0, 0, 0, 0, 0, 0, 0]
-    assert_eq!(
-        *rg2.statistics.max_value.as_ref().unwrap(),
-        [16, 0, 0, 0, 0, 0, 0, 0]
-    );
-
-    let stats = &rg2.statistics;
-    println!("RG2 statistics:\n {stats:#?}");
-
-    //////////// Verify read row group statistics ////////////
-    let read_rg_stats = &rg_statistics.read_statistics;
+    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
+    println!("Arrow statistics:\n {arrow_stats:#?}");
 
     // 2 row groups
-    assert_eq!(read_rg_stats.len(), 2, "expected 2 row groups");
-
-    // 1st row group with 10 rows (max number of rows)
-    let rg1 = &read_rg_stats[0];
-    assert_eq!(rg1.row_count, 10, "expected 10 rows");
-
-    // rg1.statistics is Int64
-    let stats = &rg1.statistics;
-    match stats {
-        ParquetFileStatistics::Int64(int64_stats) => {
-            // no nulls
-            assert_eq!(int64_stats.null_count(), 0, "expected no nulls");
-            // min is 4
-            assert_eq!(*int64_stats.min(), 4);
-            // max is 13
-            assert_eq!(*int64_stats.max(), 13);
-        }
-        _ => panic!("expected Int64"),
-    }
-
-    // 2nd row group with 5 rows
-    let rg2 = &read_rg_stats[1];
-    assert_eq!(rg2.row_count, 5, "expected 5 rows");
-
-    // rg2.statistics is Int64
-    let stats = &rg2.statistics;
-    match stats {
-        ParquetFileStatistics::Int64(int64_stats) => {
-            // 2 nulls
-            assert_eq!(int64_stats.null_count(), 2, "expected 2 nulls");
-            // min is 14
-            assert_eq!(*int64_stats.min(), 14);
-            // max is 16
-            assert_eq!(*int64_stats.max(), 16);
-        }
-        _ => panic!("expected Int64"),
-    }
-
-    // TODO:
-    // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics
-    // 2. compare the ArrowStatistics with the expected
+    let mins = arrow_stats.min();
+    assert_eq!(mins.len(), 2, "expected 2 row groups");
+
+    // mins are [4, 14]
+    let expect = Int64Array::from(vec![4, 14]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(mins, &expect);
+
+    // maxs are [13, 16]
+    let maxs = arrow_stats.max();
+    let expect = Int64Array::from(vec![13, 16]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(maxs, &expect);
+
+    // nulls are [0, 2]
+    let nulls = arrow_stats.null_count();
+    let expect = UInt64Array::from(vec![0, 2]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(nulls, &expect);
+
+    // row counts are [10, 5]
+    let row_counts = arrow_stats.row_count();
+    let expect = UInt64Array::from(vec![10, 5]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(row_counts, &expect);
 }
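+
+// Note: with (num_null = 4, values -2..2, row_per_group = 5) the file in the
+// next test is [-2, -1, 0, 1, NULL | NULL, NULL, NULL]: the second row group
+// is entirely NULL, so the writer records no min/max for it at all. The
+// conversion must map that to NULL array entries rather than returning an
+// error.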
 
 #[tokio::test]
 async fn test_two_row_groups_with_all_nulls_in_one() {
     let row_per_group = 5;
-    let rg_statistics = parquet_row_group_statistics(4, -2, 2, row_per_group);
+    let reader = parquet_file(4, -2, 2, row_per_group);
 
-    //////////// Verify write row group statistics ////////////
-    let write_rg_stats = &rg_statistics.write_statistics;
-
-    // 2 row group
-    assert_eq!(write_rg_stats.len(), 2, "expected 2 row group");
-
-    // 1st row group with 5 rows
-    let rg1 = &write_rg_stats[0];
-    assert_eq!(rg1.row_count, 5, "expected 5 rows");
-    // 1 null
-    assert_eq!(rg1.statistics.null_count, Some(1), "expected 1 null");
-    // min of -2 is [254, 255, 255, 255, 255, 255, 255, 255]
-    assert_eq!(
-        *rg1.statistics.min_value.as_ref().unwrap(),
-        [254, 255, 255, 255, 255, 255, 255, 255]
-    );
-    // max is [1, 0, 0, 0, 0, 0, 0, 0]
-    assert_eq!(
-        *rg1.statistics.max_value.as_ref().unwrap(),
-        [1, 0, 0, 0, 0, 0, 0, 0]
-    );
-    let stats = &rg1.statistics;
-    println!("RG1 statistics:\n {stats:#?}");
-
-    // 2nd row group with 3 rows
-    let rg2 = &write_rg_stats[1];
-    assert_eq!(rg2.row_count, 3, "expected 3 rows");
-    // 3 nulls
-    assert_eq!(rg2.statistics.null_count, Some(3), "expected 3 nulls");
-    // min is None
-    assert_eq!(rg2.statistics.min_value, None);
-    // max is None
-    assert_eq!(rg2.statistics.max_value, None);
-    let stats = &rg2.statistics;
-    println!("RG2 statistics:\n {stats:#?}");
-
-    //////////// Verify read row group statistics ////////////
-    let read_rg_stats = &rg_statistics.read_statistics;
-
-    // 2 row groups
-    assert_eq!(read_rg_stats.len(), 2, "expected 2 row groups");
-
-    // 1st row group with 5 rows
-    let rg1 = &read_rg_stats[0];
-    assert_eq!(rg1.row_count, 5, "expected 5 rows");
-
-    // rg1.statistics is Int64
-    let stats = &rg1.statistics;
-    match stats {
-        ParquetFileStatistics::Int64(int64_stats) => {
-            // 1 null
-            assert_eq!(int64_stats.null_count(), 1, "expected 1 null");
-            // min of -2
-            assert_eq!(*int64_stats.min(), -2);
-            // max is 1
-            assert_eq!(*int64_stats.max(), 1);
-        }
-        _ => panic!("expected Int64"),
-    }
-
-    // 2nd row group with 3 rows
-    let rg2 = &read_rg_stats[1];
-    assert_eq!(rg2.row_count, 3, "expected 3 rows");
-
-    // rg2.statistics is Int64
-    let stats = &rg2.statistics;
-    match stats {
-        ParquetFileStatistics::Int64(int64_stats) => {
-            // 3 nulls
-            assert_eq!(int64_stats.null_count(), 3, "expected 3 nulls");
-            // min is None
-            assert!(!int64_stats.has_min_max_set());
-        }
-        _ => panic!("expected Int64"),
-    }
-
-    // TODO:
-    // 1. run the function (to be implemented) to convert rg_statistics to ArrowStatistics
-    // 2. compare the ArrowStatistics with the expected
+    let statistics =
+        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
+            .expect("getting statistics");
+
+    // only 1 column
+    assert_eq!(statistics.len(), 1, "expected 1 column");
+
+    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
+    println!("Arrow statistics:\n {arrow_stats:#?}");
+
+    // 2 row groups
+    let mins = arrow_stats.min();
+    assert_eq!(mins.len(), 2, "expected 2 row groups");
+
+    // mins are [-2, null]
+    assert!(mins.is_null(1));
+    // check the first value -2
+    // let expect = Int64Array::from(vec![-2]);
+    // let expect = Arc::new(expect) as ArrayRef;
+    // assert_eq!(mins, &expect);
+
+    // maxs are [1, null]
+    assert!(arrow_stats.max().is_null(1));
+    // let expect = Int64Array::from(vec![1]);
+    // let expect = Arc::new(expect) as ArrayRef;
+    // assert_eq!(arrow_stats.max(), &expect);
+
+    // nulls are [1, 3]
+    let nulls = arrow_stats.null_count();
+    let expect = UInt64Array::from(vec![1, 3]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(nulls, &expect);
+
+    // row counts are [5, 3]
+    let row_counts = arrow_stats.row_count();
+    let expect = UInt64Array::from(vec![5, 3]);
+    let expect = Arc::new(expect) as ArrayRef;
+    assert_eq!(row_counts, &expect);
 }

From ab32178726dfce13ed3333220f568bcf04818a13 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 17 May 2024 09:53:45 -0400
Subject: [PATCH 03/11] Refine statistics extraction API and tests

---
 .../physical_plan/parquet/arrow_statistics.rs | 252 ------------------
 .../datasource/physical_plan/parquet/mod.rs   |   3 +-
 .../physical_plan/parquet/statistics.rs       | 149 ++++++++++-
 .../core/tests/parquet/arrow_statistics.rs    | 107 ++++----
 4 files changed, 209 insertions(+), 302 deletions(-)
 delete mode 100644 datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
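At a high level, this commit replaces the four-array `ArrowStatistics` bundle from the previous commit with a converter that extracts one statistic for one column at a time. Roughly (a fragment using the signatures introduced below, with `schema` and `metadata` assumed in scope):

```rust
// Before (previous commit): one call returned min/max/row_count/null_count together.
//   let stats = parquet_stats_to_arrow(&DataType::Int64, &columns[0])?;
// After (this commit): one StatisticsConverter per (column, statistic),
// with row counts available separately:
let min = StatisticsConverter::try_new("i64", RequestedStatistics::Min, schema)?
    .extract(metadata)?;
let row_counts = StatisticsConverter::row_counts(metadata)?;
```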
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
deleted file mode 100644
index 4a4d7cf7a33fd..0000000000000
--- a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
+++ /dev/null
@@ -1,252 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Extract parquet statistics and convert it to Arrow statistics
-
-use std::{fs::File, sync::Arc};
-
-use arrow_array::{ArrayRef, Int64Array, UInt64Array};
-use arrow_schema::DataType;
-use datafusion_common::{DataFusionError, Result};
-use parquet::{
-    arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
-    file::statistics::{Statistics as ParquetStatistics, ValueStatistics},
-};
-
-use super::statistics::parquet_column;
-
-/// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
-///
-/// # Note:
-/// If the corresponding `Statistics` is not present, or has no information for
-/// a column, a NULL is present in the corresponding array entry
-#[derive(Debug)]
-pub struct ArrowStatistics {
-    /// min values
-    min: ArrayRef,
-    /// max values
-    max: ArrayRef,
-    /// Row counts (UInt64Array)
-    row_count: ArrayRef,
-    /// Null Counts (UInt64Array)
-    null_count: ArrayRef,
-}
-
-impl ArrowStatistics {
-    /// Create a new instance of `ArrowStatistics`
-    pub fn new(
-        min: ArrayRef,
-        max: ArrayRef,
-        row_count: ArrayRef,
-        null_count: ArrayRef,
-    ) -> Self {
-        Self {
-            min,
-            max,
-            row_count,
-            null_count,
-        }
-    }
-
-    /// Get the min values
-    pub fn min(&self) -> &ArrayRef {
-        &self.min
-    }
-
-    /// Get the max values
-    pub fn max(&self) -> &ArrayRef {
-        &self.max
-    }
-
-    /// Get the row counts
-    pub fn row_count(&self) -> &ArrayRef {
-        &self.row_count
-    }
-
-    /// Get the null counts
-    pub fn null_count(&self) -> &ArrayRef {
-        &self.null_count
-    }
-}
-
-/// Extract `ArrowStatistics` from the parquet [`Statistics`]
-pub fn parquet_stats_to_arrow(
-    arrow_datatype: &DataType,
-    statistics: &ParquetColumnStatistics,
-) -> Result<ArrowStatistics> {
-    // check if the data type is Int64
-    if !matches!(arrow_datatype, DataType::Int64) {
-        return Err(DataFusionError::Internal(format!(
-            "Unsupported data type {:?} for statistics",
-            arrow_datatype
-        )));
-    }
-
-    // row counts
-    let row_count = statistics
-        .rg_statistics
-        .iter()
-        .map(|rg| rg.row_count)
-        .collect::<Vec<_>>();
-
-    // get null counts
-    let parquet_stats = statistics.rg_statistics.iter().map(|rg| rg.statistics);
-    let null_counts = parquet_stats
-        .map(|stats| stats.map(|s| s.null_count()))
-        .collect::<Vec<_>>();
-
-    // get min and max values
-    let parquet_stats_with_min_max = statistics
-        .rg_statistics
-        .iter()
-        .map(|rg| rg.get_statistics())
-        .collect::<Vec<_>>();
-
-    let mins = parquet_stats_with_min_max
-        .iter()
-        .map(|stats| {
-            stats.and_then(|s| {
-                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
-                Some(*stats.min())
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let maxs = parquet_stats_with_min_max
-        .iter()
-        .map(|stats| {
-            stats.and_then(|s| {
-                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
-                Some(*stats.max())
-            })
-        })
-        .collect::<Vec<_>>();
-
-    Ok(ArrowStatistics {
-        min: Arc::new(Int64Array::from(mins)),
-        max: Arc::new(Int64Array::from(maxs)),
-        row_count: Arc::new(UInt64Array::from(row_count)),
-        null_count: Arc::new(UInt64Array::from(null_counts)),
-    })
-}
-
-/// All row group statistics of a file for a column
-pub struct ParquetColumnStatistics<'a> {
-    // todo: do we need this?
-    // arrow column schema
-    // column_schema: &'a FieldRef,
-    _column_name: &'a str, // todo: do we need this?
-    rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
-}
-
-/// Row group statistics of a column
-pub struct ParquetColumnRowGroupStatistics<'a> {
-    row_count: u64,
-    statistics: Option<&'a ParquetStatistics>,
-}
-
-impl<'a> ParquetColumnRowGroupStatistics<'a> {
-    /// Create a new instance of `ParquetColumnRowGroupStatistics`
-    pub fn new(row_count: u64, statistics: Option<&'a ParquetStatistics>) -> Self {
-        Self {
-            row_count,
-            statistics,
-        }
-    }
-
-    /// Return statistics if it exists and has min max
-    /// Otherwise return None
-    pub fn get_statistics(&self) -> Option<&'a ParquetStatistics> {
-        let stats = self.statistics?;
-        if stats.has_min_max_set() {
-            Some(stats)
-        } else {
-            None
-        }
-    }
-
-    /// Return the statistics as ValueStatistics<i64> if the column is i64
-    /// Otherwise return None
-    fn try_as_i64(stats: &'a ParquetStatistics) -> Option<&'a ValueStatistics<i64>> {
-        if let parquet::file::statistics::Statistics::Int64(statistics) = stats {
-            Some(statistics)
-        } else {
-            None
-        }
-    }
-}
-
-impl<'a> ParquetColumnStatistics<'a> {
-    /// Create a new instance of `ParquetColumnStatistics`
-    pub fn new(
-        _column_name: &'a str,
-        rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
-    ) -> Self {
-        Self {
-            _column_name,
-            rg_statistics,
-        }
-    }
-
-    /// Extract statistics of all columns from a parquet file metadata
-    pub fn from_parquet_statistics(
-        reader: &'a ParquetRecordBatchReaderBuilder<File>,
-    ) -> Result<Vec<Self>> {
-        // Get metadata & schemas
-        let metadata = reader.metadata();
-        let parquet_schema = reader.parquet_schema();
-        let arrow_schema = reader.schema();
-
-        // Get column names from the arrow schema & their indexes in the parquet schema
-        let columns = arrow_schema
-            .fields()
-            .iter()
-            .map(|f| {
-                let col_name = f.name();
-                let col_idx = parquet_column(parquet_schema, arrow_schema, col_name);
-                match col_idx {
-                    Some(idx) => Ok((col_name, idx)),
-                    None => Err(DataFusionError::Internal(format!(
-                        "Column {} in Arrow schema not found in Parquet schema",
-                        col_name,
-                    ))),
-                }
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-
-        // Get statistics for each column
-        let col_stats = columns
-            .iter()
-            .map(|(col_name, col_idx)| {
-                let rg_statistics = metadata
-                    .row_groups()
-                    .iter()
-                    .map(|rg_meta| {
-                        let row_count = rg_meta.num_rows() as u64;
-                        let statistics = rg_meta.column(col_idx.0).statistics();
-                        ParquetColumnRowGroupStatistics::new(row_count, statistics)
-                    })
-                    .collect::<Vec<_>>();
-
-                ParquetColumnStatistics::new(col_name, rg_statistics)
-            })
-            .collect::<Vec<_>>();
-
-        Ok(col_stats)
-    }
-}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a9e340cce1a8a..83fed1a9caabe 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -63,15 +63,14 @@ use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
 use tokio::task::JoinSet;
 
-pub mod arrow_statistics;
 mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
 mod statistics;
 
-// pub use arrow_statistics::ParquetColumnStatistics;
 pub use metrics::ParquetFileMetrics;
+pub use statistics::{RequestedStatistics, StatisticsConverter};
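The replacement API lives in `statistics.rs`, next to the existing `min_statistics` and `max_statistics` helpers that already handle converting parquet statistics values into Arrow arrays; the new `StatisticsConverter` below is a thin, schema-aware wrapper over them.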
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 8972c261b14a5..218bb0a17885a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -20,9 +20,12 @@
 // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
 
 use arrow::{array::ArrayRef, datatypes::DataType};
-use arrow_array::new_empty_array;
-use arrow_schema::{FieldRef, Schema};
-use datafusion_common::{Result, ScalarValue};
+use arrow_array::{new_empty_array, new_null_array, UInt64Array};
+use arrow_schema::{Field, FieldRef, Schema};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
+};
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
 
@@ -210,6 +213,146 @@ fn collect_scalars
 }
 
+/// What type of statistics should be extracted?
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum RequestedStatistics {
+    /// Minimum Value
+    Min,
+    /// Maximum Value
+    Max,
+}
+
+/// Extracts Parquet statistics as Arrow arrays
+///
+/// This is used to convert Parquet statistics to Arrow arrays, with proper type
+/// conversions. This information can be used for pruning parquet files or row
+/// groups based on the statistics embedded in parquet files
+///
+/// # Schemas
+///
+/// The schema of the parquet file and the arrow schema are used to convert the
+/// underlying statistics value (stored as a parquet value) into the
+/// corresponding Arrow value. For example, Decimals are stored as binary in
+/// parquet files.
+///
+/// The parquet_schema and arrow_schema do not have to be identical (for
+/// example, the columns may be in different orders and one or the other schema
+/// may have additional columns). The function [`parquet_column`] is used to
+/// match the column in the parquet file to the column in the arrow schema.
+///
+/// # Multiple parquet files
+///
+/// This API is designed to support efficiently extracting statistics from
+/// multiple parquet files (hence why the parquet schema is passed in as an
+/// argument). This is useful when building an index for a directory of parquet
+/// files.
+///
+#[derive(Debug)]
+pub struct StatisticsConverter<'a> {
+    /// The name of the column to extract statistics for
+    column_name: &'a str,
+    /// The type of statistics to extract
+    statistics_type: RequestedStatistics,
+    /// The arrow schema of the query
+    arrow_schema: &'a Schema,
+    /// The field (with data type) of the column in the arrow schema
+    arrow_field: &'a Field,
+}
+    pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
+        let row_groups = metadata.row_groups();
+        let mut builder = UInt64Array::builder(row_groups.len());
+        for row_group in row_groups {
+            let row_count = row_group.num_rows();
+            let row_count: u64 = row_count.try_into().map_err(|e| {
+                internal_datafusion_err!(
+                    "Parquet row count {row_count} too large to convert to u64: {e}"
+                )
+            })?;
+            builder.append_value(row_count);
+        }
+        Ok(builder.finish())
+    }
+
+    /// Create a new statistics converter
+    pub fn try_new(
+        column_name: &'a str,
+        statistics_type: RequestedStatistics,
+        arrow_schema: &'a Schema,
+    ) -> Result<Self> {
+        // ensure the requested column is in the arrow schema
+        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
+            return plan_err!(
+                "Column '{}' not found in schema for statistics conversion",
+                column_name
+            );
+        };
+
+        Ok(Self {
+            column_name,
+            statistics_type,
+            arrow_schema,
+            arrow_field,
+        })
+    }
+
+    /// Extract the statistics from a parquet file, given the parquet file's metadata
+    ///
+    /// The returned array contains 1 value for each row group in the parquet
+    /// file, in order
+    ///
+    /// Each value is either
+    /// * the requested statistics type for the column
+    /// * a null value, if the statistics cannot be extracted
+    ///
+    /// Note that a null value does NOT mean the min or max value was actually
+    /// `null`; it means the requested statistic is unknown
+    ///
+    /// Reasons for not being able to extract the statistics include:
+    /// * the column is not present in the parquet file
+    /// * statistics for the column are not present in the row group
+    /// * the stored statistic value cannot be converted to the requested type
+    pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
+        let data_type = self.arrow_field.data_type();
+        let num_row_groups = metadata.row_groups().len();
+
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        let row_groups = metadata.row_groups();
+
+        // find the column in the parquet schema; if it is not present, return a null array
+        let Some((parquet_idx, matched_field)) =
+            parquet_column(parquet_schema, self.arrow_schema, self.column_name)
+        else {
+            // column was in the arrow schema but not in the parquet schema, so return a null array
+            return Ok(new_null_array(data_type, num_row_groups));
+        };
+
+        // sanity check that matching field matches the arrow field
+        if matched_field.as_ref() != self.arrow_field {
+            return internal_err!(
+                "Matched column '{:?}' does not match original matched column '{:?}'",
+                matched_field,
+                self.arrow_field
+            );
+        }
+
+        // Get an iterator over the column statistics
+        let iter = row_groups
+            .iter()
+            .map(|x| x.column(parquet_idx).statistics());
+
+        match self.statistics_type {
+            RequestedStatistics::Min => min_statistics(data_type, iter),
+            RequestedStatistics::Max => max_statistics(data_type, iter),
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 458d4222770b0..3abf340070a02 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -23,8 +23,8 @@ use std::sync::Arc;
 use arrow_array::{make_array, Array, ArrayRef, Int64Array, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
-use datafusion::datasource::physical_plan::parquet::arrow_statistics::{
-    self, parquet_stats_to_arrow,
+use datafusion::datasource::physical_plan::parquet::{ +
RequestedStatistics, StatisticsConverter, }; use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; use parquet::arrow::ArrowWriter; @@ -99,63 +99,79 @@ pub fn parquet_file( ArrowReaderBuilder::try_new(file).unwrap() } +struct Test { + reader: ParquetRecordBatchReaderBuilder, + expected_min: ArrayRef, + expected_max: ArrayRef, + expected_row_counts: UInt64Array, +} + +impl Test { + fn run(self) { + let Self { + reader, + expected_min, + expected_max, + expected_row_counts, + } = self; + + let min = StatisticsConverter::try_new( + "i64", + RequestedStatistics::Min, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + assert_eq!(&min, &expected_min, "Mismatch with expected minimums"); + + let max = StatisticsConverter::try_new( + "i64", + RequestedStatistics::Max, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + assert_eq!(&max, &expected_max, "Mismatch with expected maximum"); + + let row_counts = StatisticsConverter::row_counts(reader.metadata()).unwrap(); + assert_eq!( + row_counts, expected_row_counts, + "Mismatch with expected row counts" + ); + } +} + // TESTS #[tokio::test] async fn test_one_row_group_without_null() { let row_per_group = 20; let reader = parquet_file(0, 4, 7, row_per_group); + Test { + reader, + // min is 4 + expected_min: Arc::new(Int64Array::from(vec![4])), + // max is 6 + expected_max: Arc::new(Int64Array::from(vec![6])), + // 3 rows + expected_row_counts: UInt64Array::from(vec![3]), + } + .run() - let statistics = - arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader) - .expect("getting statistics"); - - // only 1 column - assert_eq!(statistics.len(), 1, "expected 1 column"); - - let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap(); - println!("Arrow statistics:\n {arrow_stats:#?}"); - - // only one row group - let min = arrow_stats.min(); - assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group"); - - // min is 4 - let expect = Int64Array::from(vec![4]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(min, &expect); - - // max is 6 - let expect = Int64Array::from(vec![6]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.max(), &expect); - - // no nulls - let expect = UInt64Array::from(vec![0]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.null_count(), &expect); - - // 3 rows - let expect = UInt64Array::from(vec![3]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.row_count(), &expect); + // no nulls (TBD I don't think the row group metadata has null count) + //let expect = UInt64Array::from(vec![0]); + //let expect = Arc::new(expect) as ArrayRef; + //assert_eq!(arrow_stats.null_count(), &expect); } +/* #[tokio::test] async fn test_one_row_group_with_null_and_negative() { let row_per_group = 20; let reader = parquet_file(2, -1, 5, row_per_group); - let statistics = - arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader) - .expect("getting statistics"); - - // only 1 column - assert_eq!(statistics.len(), 1, "expected 1 column"); - - let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap(); - println!("Arrow statistics:\n {arrow_stats:#?}"); - // only one row group let min = arrow_stats.min(); assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group"); @@ -268,3 +284,4 @@ async fn test_two_row_groups_with_all_nulls_in_one() { let expect = Arc::new(expect) as ArrayRef; 
assert_eq!(row_counts, &expect); } +*/ From 47e9d924b2f99df330dd56d34c7ac7a91583e2bc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 17 May 2024 10:11:49 -0400 Subject: [PATCH 04/11] Implement null counts --- .../physical_plan/parquet/statistics.rs | 7 +++++ .../core/tests/parquet/arrow_statistics.rs | 29 +++++++++++++++---- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 218bb0a17885a..0ebf7dfe23842 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -28,6 +28,7 @@ use datafusion_common::{ use parquet::file::metadata::ParquetMetaData; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; +use std::sync::Arc; // Convert the bytes array to i128. // The endian of the input bytes array must be big-endian. @@ -220,6 +221,8 @@ pub enum RequestedStatistics { Min, /// Maximum Value Max, + /// Null Count, returned as a [`UInt64Array`]) + NullCount, } /// Extracts Parquet statistics as Arrow arrays @@ -349,6 +352,10 @@ impl<'a> StatisticsConverter<'a> { match self.statistics_type { RequestedStatistics::Min => min_statistics(data_type, iter), RequestedStatistics::Max => max_statistics(data_type, iter), + RequestedStatistics::NullCount => { + let null_counts = iter.map(|stats| stats.map(|s| s.null_count())); + Ok(Arc::new(UInt64Array::from_iter(null_counts))) + } } } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 3abf340070a02..c735c16b34956 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -103,6 +103,7 @@ struct Test { reader: ParquetRecordBatchReaderBuilder, expected_min: ArrayRef, expected_max: ArrayRef, + expected_null_counts: UInt64Array, expected_row_counts: UInt64Array, } @@ -112,6 +113,7 @@ impl Test { reader, expected_min, expected_max, + expected_null_counts, expected_row_counts, } = self; @@ -135,6 +137,20 @@ impl Test { .unwrap(); assert_eq!(&max, &expected_max, "Mismatch with expected maximum"); + let null_counts = StatisticsConverter::try_new( + "i64", + RequestedStatistics::NullCount, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; + assert_eq!( + &null_counts, &expected_null_counts, + "Mismatch with expected null counts" + ); + let row_counts = StatisticsConverter::row_counts(reader.metadata()).unwrap(); assert_eq!( row_counts, expected_row_counts, @@ -144,6 +160,12 @@ impl Test { } // TESTS +// +// Remaining cases +// - Create parquet files / metadata with missing statistic values +// - Create parquet files / metadata with different data types +// - Create parquet files / metadata with different row group sizes +// - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact) #[tokio::test] async fn test_one_row_group_without_null() { @@ -155,15 +177,12 @@ async fn test_one_row_group_without_null() { expected_min: Arc::new(Int64Array::from(vec![4])), // max is 6 expected_max: Arc::new(Int64Array::from(vec![6])), + // no nulls + expected_null_counts: UInt64Array::from(vec![0]), // 3 rows expected_row_counts: 
UInt64Array::from(vec![3]), } .run() - - // no nulls (TBD I don't think the row group metadata has null count) - //let expect = UInt64Array::from(vec![0]); - //let expect = Arc::new(expect) as ArrayRef; - //assert_eq!(arrow_stats.null_count(), &expect); } /* From f86841eab890412b9d00249dd6fe4c435dba8a25 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 17 May 2024 10:15:01 -0400 Subject: [PATCH 05/11] port test --- .../core/tests/parquet/arrow_statistics.rs | 135 +++++------------- 1 file changed, 36 insertions(+), 99 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index c735c16b34956..2ccd12a27dd86 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -184,36 +184,24 @@ async fn test_one_row_group_without_null() { } .run() } -/* #[tokio::test] async fn test_one_row_group_with_null_and_negative() { let row_per_group = 20; let reader = parquet_file(2, -1, 5, row_per_group); - // only one row group - let min = arrow_stats.min(); - assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group"); - - // min is -1 - let expect = Int64Array::from(vec![-1]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(min, &expect); - - // max is 4 - let expect = Int64Array::from(vec![4]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.max(), &expect); - - // 2 nulls - let expect = UInt64Array::from(vec![2]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.null_count(), &expect); - - // 8 rows - let expect = UInt64Array::from(vec![8]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(arrow_stats.row_count(), &expect); + Test { + reader, + // min is -1 + expected_min: Arc::new(Int64Array::from(vec![-1])), + // max is 4 + expected_max: Arc::new(Int64Array::from(vec![4])), + // 2 nulls + expected_null_counts: UInt64Array::from(vec![2]), + // 8 rows + expected_row_counts: UInt64Array::from(vec![8]), + } + .run() } #[tokio::test] @@ -221,42 +209,18 @@ async fn test_two_row_group_with_null() { let row_per_group = 10; let reader = parquet_file(2, 4, 17, row_per_group); - let statistics = - arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader) - .expect("getting statistics"); - - // only 1 column - assert_eq!(statistics.len(), 1, "expected 1 column"); - - let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap(); - println!("Arrow statistics:\n {arrow_stats:#?}"); - - // 2 row groups - let mins = arrow_stats.min(); - assert_eq!(mins.len(), 2, "expected 2 row groups"); - - // mins are [4, 14] - let expect = Int64Array::from(vec![4, 14]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(mins, &expect); - - // maxs are [13, 16] - let maxs = arrow_stats.max(); - let expect = Int64Array::from(vec![13, 16]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(maxs, &expect); - - // nuls are [0, 2] - let nulls = arrow_stats.null_count(); - let expect = UInt64Array::from(vec![0, 2]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(nulls, &expect); - - // row counts are [10, 5] - let row_counts = arrow_stats.row_count(); - let expect = UInt64Array::from(vec![10, 5]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(row_counts, &expect); + Test { + reader, + // mins are [4, 14] + expected_min: Arc::new(Int64Array::from(vec![4, 14])), + // maxes are [13, 16] + expected_max: Arc::new(Int64Array::from(vec![13, 16])), + // nulls are [0, 2] 
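+ // (the source data is values 4..=16 plus 2 trailing nulls, 15 rows in
+ // total, split into row groups of 10 and 5, so both nulls fall in the
+ // second row group)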
+ expected_null_counts: UInt64Array::from(vec![0, 2]), + // row counts are [10, 5] + expected_row_counts: UInt64Array::from(vec![10, 5]), + } + .run() } #[tokio::test] @@ -264,43 +228,16 @@ async fn test_two_row_groups_with_all_nulls_in_one() { let row_per_group = 5; let reader = parquet_file(4, -2, 2, row_per_group); - let statistics = - arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader) - .expect("getting statistics"); - - // only 1 column - assert_eq!(statistics.len(), 1, "expected 1 column"); - - let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap(); - println!("Arrow statistics:\n {arrow_stats:#?}"); - - // 2 row groups - let mins = arrow_stats.min(); - assert_eq!(mins.len(), 2, "expected 2 row groups"); - - // mins are [-2, null] - assert!(mins.is_null(1)); - // check the first value -2 - // let expect = Int64Array::from(vec![-2]); - // let expect = Arc::new(expect) as ArrayRef; - // assert_eq!(mins, &expect); - - // maxs are [1, null - assert!(arrow_stats.max().is_null(1)); - // let expect = Int64Array::from(vec![1]); - // let expect = Arc::new(expect) as ArrayRef; - // assert_eq!(arrow_stats.max(), &expect); - - // nuls are [1, 3] - let nulls = arrow_stats.null_count(); - let expect = UInt64Array::from(vec![1, 3]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(nulls, &expect); - - // row counts are [5, 3] - let row_counts = arrow_stats.row_count(); - let expect = UInt64Array::from(vec![5, 3]); - let expect = Arc::new(expect) as ArrayRef; - assert_eq!(row_counts, &expect); + Test { + reader, + // mins are [-2, null] + expected_min: Arc::new(Int64Array::from(vec![Some(-2), None])), + // maxes are [1, null] + expected_max: Arc::new(Int64Array::from(vec![Some(1), None])), + // nulls are [1, 3] + expected_null_counts: UInt64Array::from(vec![1, 3]), + // row counts are [5, 3] + expected_row_counts: UInt64Array::from(vec![5, 3]), + } + .run() } -*/ From a535b7561413e4055ed2cfc8cea9403abc50cf88 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 17 May 2024 17:18:02 -0400 Subject: [PATCH 06/11] test: add more tests for the arrow statistics --- .../core/tests/parquet/arrow_statistics.rs | 374 +++++++++++++++++- 1 file changed, 360 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 2ccd12a27dd86..13916d0b2057e 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -21,7 +21,7 @@ use std::fs::File; use std::sync::Arc; -use arrow_array::{make_array, Array, ArrayRef, Int64Array, RecordBatch, UInt64Array}; +use arrow_array::{make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion::datasource::physical_plan::parquet::{ RequestedStatistics, StatisticsConverter, @@ -30,6 +30,10 @@ use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderB use parquet::arrow::ArrowWriter; use parquet::file::properties::{EnabledStatistics, WriterProperties}; +use crate::parquet::Scenario; + +use super:: make_test_file_rg; + // TEST HELPERS /// Return a record batch with i64 with Null values @@ -79,8 +83,7 @@ pub fn parquet_file( .build(); let batches = vec![ - make_int64_batches_with_null(num_null, no_null_values_start, no_null_values_end), // TODO: likely make this more general using - // create_data_batch(scenario); where scenario is the respective enum data 
type + make_int64_batches_with_null(num_null, no_null_values_start, no_null_values_end), ]; let schema = batches[0].schema(); @@ -99,6 +102,23 @@ pub fn parquet_file( ArrowReaderBuilder::try_new(file).unwrap() } + +// Create a parquet file with many columns each has different data type +// - Data types are specified by the given scenario +// - Row group sizes are withe the same or different depending on the provided row_per_group & data created in the scenario +pub async fn create_parquet_file( + scenario: super::Scenario, + row_per_group: usize, +) -> ParquetRecordBatchReaderBuilder { + + let file = make_test_file_rg(scenario, row_per_group).await; + + // open the file & get the reader + let file = file.reopen().unwrap(); + ArrowReaderBuilder::try_new(file).unwrap() +} + + struct Test { reader: ParquetRecordBatchReaderBuilder, expected_min: ArrayRef, @@ -108,7 +128,7 @@ struct Test { } impl Test { - fn run(self) { + fn run(self, col_name: &str) { let Self { reader, expected_min, @@ -118,17 +138,18 @@ impl Test { } = self; let min = StatisticsConverter::try_new( - "i64", + col_name, RequestedStatistics::Min, reader.schema(), ) .unwrap() .extract(reader.metadata()) .unwrap(); + assert_eq!(&min, &expected_min, "Mismatch with expected minimums"); let max = StatisticsConverter::try_new( - "i64", + col_name, RequestedStatistics::Max, reader.schema(), ) @@ -138,7 +159,7 @@ impl Test { assert_eq!(&max, &expected_max, "Mismatch with expected maximum"); let null_counts = StatisticsConverter::try_new( - "i64", + col_name, RequestedStatistics::NullCount, reader.schema(), ) @@ -157,14 +178,32 @@ impl Test { "Mismatch with expected row counts" ); } + + fn run_col_not_found(self, col_name: &str) { + let Self { + reader, + expected_min: _, + expected_max: _, + expected_null_counts: _, + expected_row_counts: _, + } = self; + + let min = StatisticsConverter::try_new( + col_name, + RequestedStatistics::Min, + reader.schema(), + ); + + assert!(min.is_err()); + } } // TESTS // // Remaining cases // - Create parquet files / metadata with missing statistic values -// - Create parquet files / metadata with different data types -// - Create parquet files / metadata with different row group sizes +// - Create parquet files / metadata with different data types -- included but not all data types yet +// - Create parquet files / metadata with different row group sizes -- done // - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact) #[tokio::test] @@ -181,8 +220,8 @@ async fn test_one_row_group_without_null() { expected_null_counts: UInt64Array::from(vec![0]), // 3 rows expected_row_counts: UInt64Array::from(vec![3]), - } - .run() +} + .run("i64") } #[tokio::test] @@ -201,7 +240,7 @@ async fn test_one_row_group_with_null_and_negative() { // 8 rows expected_row_counts: UInt64Array::from(vec![8]), } - .run() + .run("i64") } #[tokio::test] @@ -220,7 +259,7 @@ async fn test_two_row_group_with_null() { // row counts are [10, 5] expected_row_counts: UInt64Array::from(vec![10, 5]), } - .run() + .run("i64") } #[tokio::test] @@ -239,5 +278,312 @@ async fn test_two_row_groups_with_all_nulls_in_one() { // row counts are [5, 3] expected_row_counts: UInt64Array::from(vec![5, 3]), } - .run() + .run("i64") +} + +/////////////// MORE GENERAL TESTS ////////////////////// +// . Many columns in a file +// . Differnet data types +// . 
Different row group sizes + +// Four different integer types +#[tokio::test] +async fn test_int_64() { + let row_per_group = 5; + // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" + let reader = create_parquet_file(Scenario::Int, row_per_group).await; + + Test { + reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("i64"); +} + +#[tokio::test] +async fn test_int_32() { + let row_per_group = 5; + // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" + let reader = create_parquet_file(Scenario::Int, row_per_group).await; + + Test { + reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int32Array::from(vec![-5, -4, 0, 5])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int32Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("i32"); +} + +// BUG: ignore this test for now +// Note that the file has 4 columns named "i8", "i16", "i32", "i64". +// - The tests on column i32 and i64 passed. +// - The tests on column i8 and i16 failed. +#[ignore] +#[tokio::test] +async fn test_int_16() { + let row_per_group = 5; + // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" + let reader = create_parquet_file(Scenario::Int, row_per_group).await; + + Test { + reader, + // mins are [-5, -4, 0, 5] + // BUG: not sure why this returns same data but in Int32Array type even though I debugged and the columns name is "i16" an its data is Int16 + // My debugging tells me the bug is either at: + // 1. The new code to get "iter". See the code in this PR with + // // Get an iterator over the column statistics + // let iter = row_groups + // .iter() + // .map(|x| x.column(parquet_idx).statistics()); + // OR + // 2. 
in the function (and/or its marco) `pub(crate) fn min_statistics<'a, I: Iterator>>` here + // https://github.com/apache/datafusion/blob/ea023e2d4878240eece870cf4b346c7a0667aeed/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179 + expected_min: Arc::new(Int16Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int16Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("i16"); +} + +// BUG (same as above): ignore this test for now +#[ignore] +#[tokio::test] +async fn test_int_8() { + let row_per_group = 5; + // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" + let reader = create_parquet_file(Scenario::Int, row_per_group).await; + + Test { + reader, + // mins are [-5, -4, 0, 5] + // BUG: not sure why this returns same data but in Int32Array even though I debugged and the columns name is "i8" an its data is Int8 + expected_min: Arc::new(Int8Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int8Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("i8"); +} + +// timestamp +#[tokio::test] +async fn test_timestamp() { + let row_per_group = 5; + + // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names" + // "nanos" --> TimestampNanosecondArray + // "micros" --> TimestampMicrosecondArray + // "millis" --> TimestampMillisecondArray + // "seconds" --> TimestampSecondArray + // "names" --> StringArray + // + // The file is created by 4 record batches, each has 5 rowws. 
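+ // (Each batch also contains one null row. The expected values below are the
+ // raw epoch-based integers stored in the statistics: for example, the
+ // seconds value 1577840461 is 2020-01-01T01:01:01Z, since
+ // 1577840461 - 1577836800 (2020-01-01T00:00:00Z) = 3661 s = 1h 1m 1s.)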
+ // Since the row group isze is set to 5, those 4 batches will go into 4 row groups + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + + Test { + reader, + // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,] + expected_min: Arc::new(Int64Array::from(vec![1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,])), + // maxes are [1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,] + expected_max: Arc::new(Int64Array::from(vec![1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,])), + // nulls are [1, 1, 1, 1] + expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("nanos"); + + // micros + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461000000, 1577840471000000, 1577841061000000, 1578704461000000,])), + expected_max: Arc::new(Int64Array::from(vec![1577926861000000, 1577926871000000, 1577927461000000, 1578790861000000,])), + expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("micros"); + + // millis + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461000, 1577840471000, 1577841061000, 1578704461000,])), + expected_max: Arc::new(Int64Array::from(vec![1577926861000, 1577926871000, 1577927461000, 1578790861000,])), + expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("millis"); + + // seconds + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461, 1577840471, 1577841061, 1578704461,])), + expected_max: Arc::new(Int64Array::from(vec![1577926861, 1577926871, 1577927461, 1578790861,])), + expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("seconds"); +} + + +// timestamp with different row group sizes +#[tokio::test] +async fn test_timestamp_diff_rg_sizes() { + let row_per_group = 8; + + // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names" + // "nanos" --> TimestampNanosecondArray + // "micros" --> TimestampMicrosecondArray + // "millis" --> TimestampMillisecondArray + // "seconds" --> TimestampSecondArray + // "names" --> StringArray + // + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 3 row groups with size 8, 8, 4 + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + + Test { + reader, + // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000] + expected_min: Arc::new(Int64Array::from(vec![1577840461000000000, 1577841061000000000, 1578704521000000000])), + // maxes are [1577926861000000000, 1578704461000000000, 157879086100000000] + expected_max: Arc::new(Int64Array::from(vec![1577926861000000000, 1578704461000000000, 1578790861000000000])), + // nulls are [1, 2, 1] + expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + // row counts are [8, 8, 4] + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("nanos"); + + // 
micros + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461000000, 1577841061000000, 1578704521000000])), + expected_max: Arc::new(Int64Array::from(vec![1577926861000000, 1578704461000000, 1578790861000000])), + expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("micros"); + + // millis + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461000, 1577841061000, 1578704521000])), + expected_max: Arc::new(Int64Array::from(vec![1577926861000, 1578704461000, 1578790861000])), + expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("millis"); + + // seconds + let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![1577840461, 1577841061, 1578704521])), + expected_max: Arc::new(Int64Array::from(vec![1577926861, 1578704461, 1578790861])), + expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("seconds"); +} + +// date with different row group sizes +#[tokio::test] +async fn test_dates_32_diff_rg_sizes() { + let row_per_group = 13; + + // This creates a parquet files of 3 columns named "date32", "date64", "names" + // "date32" --> Date32Array + // "date64" --> Date64Array + // "names" --> StringArray + // + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 + let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + + Test { + reader, + // mins are [18262, 18565,] + expected_min: Arc::new(Int32Array::from(vec![18262, 18565])), + // maxes are [18564, 21865,] + expected_max: Arc::new(Int32Array::from(vec![18564, 21865])), + // nulls are [2, 2] + expected_null_counts:UInt64Array::from(vec![2, 2]), + // row counts are [13, 7] + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run("date32"); +} + +// BUG: same as above. 
Expect to return Int64Array (for Date64Array) but returns Int32Array +// test date with different row group sizes +#[tokio::test] +async fn test_dates_64_diff_rg_sizes() { + let row_per_group = 13; + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 + let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panic here because the actual data is Int32Array + expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), + expected_null_counts:UInt64Array::from(vec![2, 2]), + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run("date64"); +} + +// TODO: +// Other data types to tests + // Int32Range, + // UInt, + // UInt32Range, + // Float64, + // Decimal, + // DecimalBloomFilterInt32, + // DecimalBloomFilterInt64, + // DecimalLargePrecision, + // DecimalLargePrecisionBloomFilter, + // ByteArray, + // PeriodsInColumnNames, + // WithNullValuesPageLevel, + // WITHOUT Stats + + +/////// NEGATIVE TESTS /////// +// column not found +#[tokio::test] +async fn test_column_not_found() { + let row_per_group = 5; + let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), + expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), + expected_null_counts:UInt64Array::from(vec![2, 2]), + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run_col_not_found("not_a_column"); } From 44fcb6f8177c788b457b994c112ac766670b5d05 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 17 May 2024 17:37:44 -0400 Subject: [PATCH 07/11] chore: fix format and test output --- .../datasource/physical_plan/parquet/mod.rs | 2 +- .../core/tests/parquet/arrow_statistics.rs | 183 ++++++++++++------ 2 files changed, 124 insertions(+), 61 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index d5be748169dd3..dd953878df49e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -71,8 +71,8 @@ mod schema_adapter; mod statistics; pub use metrics::ParquetFileMetrics; -pub use statistics::{RequestedStatistics, StatisticsConverter}; pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +pub use statistics::{RequestedStatistics, StatisticsConverter}; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 13916d0b2057e..9d3ac299cf15f 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -21,7 +21,10 @@ use std::fs::File; use std::sync::Arc; -use arrow_array::{make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, UInt64Array}; +use arrow_array::{ + make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, + RecordBatch, UInt64Array, +}; use arrow_schema::{DataType, Field, Schema}; use datafusion::datasource::physical_plan::parquet::{ RequestedStatistics, StatisticsConverter, @@ -32,7 +35,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties}; use crate::parquet::Scenario; -use super:: make_test_file_rg; +use super::make_test_file_rg; // TEST 
HELPERS @@ -82,9 +85,11 @@ pub fn parquet_file( .set_statistics_enabled(EnabledStatistics::Chunk) .build(); - let batches = vec![ - make_int64_batches_with_null(num_null, no_null_values_start, no_null_values_end), - ]; + let batches = vec![make_int64_batches_with_null( + num_null, + no_null_values_start, + no_null_values_end, + )]; let schema = batches[0].schema(); @@ -102,7 +107,6 @@ pub fn parquet_file( ArrowReaderBuilder::try_new(file).unwrap() } - // Create a parquet file with many columns each has different data type // - Data types are specified by the given scenario // - Row group sizes are withe the same or different depending on the provided row_per_group & data created in the scenario @@ -110,7 +114,6 @@ pub async fn create_parquet_file( scenario: super::Scenario, row_per_group: usize, ) -> ParquetRecordBatchReaderBuilder { - let file = make_test_file_rg(scenario, row_per_group).await; // open the file & get the reader @@ -118,7 +121,6 @@ pub async fn create_parquet_file( ArrowReaderBuilder::try_new(file).unwrap() } - struct Test { reader: ParquetRecordBatchReaderBuilder, expected_min: ArrayRef, @@ -220,7 +222,7 @@ async fn test_one_row_group_without_null() { expected_null_counts: UInt64Array::from(vec![0]), // 3 rows expected_row_counts: UInt64Array::from(vec![3]), -} + } .run("i64") } @@ -300,7 +302,7 @@ async fn test_int_64() { // maxes are [-1, 0, 4, 9] expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), // nulls are [0, 0, 0, 0] - expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } @@ -320,7 +322,7 @@ async fn test_int_32() { // maxes are [-1, 0, 4, 9] expected_max: Arc::new(Int32Array::from(vec![-1, 0, 4, 9])), // nulls are [0, 0, 0, 0] - expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } @@ -344,18 +346,18 @@ async fn test_int_16() { // BUG: not sure why this returns same data but in Int32Array type even though I debugged and the columns name is "i16" an its data is Int16 // My debugging tells me the bug is either at: // 1. The new code to get "iter". See the code in this PR with - // // Get an iterator over the column statistics - // let iter = row_groups - // .iter() - // .map(|x| x.column(parquet_idx).statistics()); + // // Get an iterator over the column statistics + // let iter = row_groups + // .iter() + // .map(|x| x.column(parquet_idx).statistics()); // OR // 2. 
in the function (and/or its marco) `pub(crate) fn min_statistics<'a, I: Iterator>>` here // https://github.com/apache/datafusion/blob/ea023e2d4878240eece870cf4b346c7a0667aeed/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179 - expected_min: Arc::new(Int16Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array + expected_min: Arc::new(Int16Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array // maxes are [-1, 0, 4, 9] expected_max: Arc::new(Int16Array::from(vec![-1, 0, 4, 9])), // nulls are [0, 0, 0, 0] - expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } @@ -378,7 +380,7 @@ async fn test_int_8() { // maxes are [-1, 0, 4, 9] expected_max: Arc::new(Int8Array::from(vec![-1, 0, 4, 9])), // nulls are [0, 0, 0, 0] - expected_null_counts:UInt64Array::from(vec![0, 0, 0, 0]), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } @@ -404,11 +406,21 @@ async fn test_timestamp() { Test { reader, // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,] - expected_min: Arc::new(Int64Array::from(vec![1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,])), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000000000, + 1577840471000000000, + 1577841061000000000, + 1578704461000000000, + ])), // maxes are [1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,] - expected_max: Arc::new(Int64Array::from(vec![1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000000000, + 1577926871000000000, + 1577927461000000000, + 1578790861000000000, + ])), // nulls are [1, 1, 1, 1] - expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } @@ -418,9 +430,19 @@ async fn test_timestamp() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461000000, 1577840471000000, 1577841061000000, 1578704461000000,])), - expected_max: Arc::new(Int64Array::from(vec![1577926861000000, 1577926871000000, 1577927461000000, 1578790861000000,])), - expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000000, + 1577840471000000, + 1577841061000000, + 1578704461000000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000000, + 1577926871000000, + 1577927461000000, + 1578790861000000, + ])), + expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } .run("micros"); @@ -429,9 +451,19 @@ async fn test_timestamp() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461000, 1577840471000, 1577841061000, 1578704461000,])), - expected_max: Arc::new(Int64Array::from(vec![1577926861000, 1577926871000, 1577927461000, 1578790861000,])), - expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_min: 
Arc::new(Int64Array::from(vec![ + 1577840461000, + 1577840471000, + 1577841061000, + 1578704461000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000, + 1577926871000, + 1577927461000, + 1578790861000, + ])), + expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } .run("millis"); @@ -440,15 +472,18 @@ async fn test_timestamp() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461, 1577840471, 1577841061, 1578704461,])), - expected_max: Arc::new(Int64Array::from(vec![1577926861, 1577926871, 1577927461, 1578790861,])), - expected_null_counts:UInt64Array::from(vec![1, 1, 1, 1]), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461, 1577840471, 1577841061, 1578704461, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861, 1577926871, 1577927461, 1578790861, + ])), + expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), } .run("seconds"); } - // timestamp with different row group sizes #[tokio::test] async fn test_timestamp_diff_rg_sizes() { @@ -467,11 +502,19 @@ async fn test_timestamp_diff_rg_sizes() { Test { reader, // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000] - expected_min: Arc::new(Int64Array::from(vec![1577840461000000000, 1577841061000000000, 1578704521000000000])), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000000000, + 1577841061000000000, + 1578704521000000000, + ])), // maxes are [1577926861000000000, 1578704461000000000, 157879086100000000] - expected_max: Arc::new(Int64Array::from(vec![1577926861000000000, 1578704461000000000, 1578790861000000000])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000000000, + 1578704461000000000, + 1578790861000000000, + ])), // nulls are [1, 2, 1] - expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), } @@ -481,9 +524,17 @@ async fn test_timestamp_diff_rg_sizes() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461000000, 1577841061000000, 1578704521000000])), - expected_max: Arc::new(Int64Array::from(vec![1577926861000000, 1578704461000000, 1578790861000000])), - expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000000, + 1577841061000000, + 1578704521000000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000000, + 1578704461000000, + 1578790861000000, + ])), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), } .run("micros"); @@ -492,9 +543,17 @@ async fn test_timestamp_diff_rg_sizes() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461000, 1577841061000, 1578704521000])), - expected_max: Arc::new(Int64Array::from(vec![1577926861000, 1578704461000, 1578790861000])), - expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000, + 1577841061000, + 1578704521000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000, + 1578704461000, + 1578790861000, + ])), + 
expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), } .run("millis"); @@ -503,9 +562,13 @@ async fn test_timestamp_diff_rg_sizes() { let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; Test { reader, - expected_min: Arc::new(Int64Array::from(vec![1577840461, 1577841061, 1578704521])), - expected_max: Arc::new(Int64Array::from(vec![1577926861, 1578704461, 1578790861])), - expected_null_counts:UInt64Array::from(vec![1, 2, 1]), + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461, 1577841061, 1578704521, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861, 1578704461, 1578790861, + ])), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), } .run("seconds"); @@ -531,7 +594,7 @@ async fn test_dates_32_diff_rg_sizes() { // maxes are [18564, 21865,] expected_max: Arc::new(Int32Array::from(vec![18564, 21865])), // nulls are [2, 2] - expected_null_counts:UInt64Array::from(vec![2, 2]), + expected_null_counts: UInt64Array::from(vec![2, 2]), // row counts are [13, 7] expected_row_counts: UInt64Array::from(vec![13, 7]), } @@ -540,6 +603,7 @@ async fn test_dates_32_diff_rg_sizes() { // BUG: same as above. Expect to return Int64Array (for Date64Array) but returns Int32Array // test date with different row group sizes +#[ignore] #[tokio::test] async fn test_dates_64_diff_rg_sizes() { let row_per_group = 13; @@ -549,7 +613,7 @@ async fn test_dates_64_diff_rg_sizes() { reader, expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panic here because the actual data is Int32Array expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), - expected_null_counts:UInt64Array::from(vec![2, 2]), + expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), } .run("date64"); @@ -557,20 +621,19 @@ async fn test_dates_64_diff_rg_sizes() { // TODO: // Other data types to tests - // Int32Range, - // UInt, - // UInt32Range, - // Float64, - // Decimal, - // DecimalBloomFilterInt32, - // DecimalBloomFilterInt64, - // DecimalLargePrecision, - // DecimalLargePrecisionBloomFilter, - // ByteArray, - // PeriodsInColumnNames, - // WithNullValuesPageLevel, - // WITHOUT Stats - +// Int32Range, +// UInt, +// UInt32Range, +// Float64, +// Decimal, +// DecimalBloomFilterInt32, +// DecimalBloomFilterInt64, +// DecimalLargePrecision, +// DecimalLargePrecisionBloomFilter, +// ByteArray, +// PeriodsInColumnNames, +// WithNullValuesPageLevel, +// WITHOUT Stats /////// NEGATIVE TESTS /////// // column not found @@ -582,7 +645,7 @@ async fn test_column_not_found() { reader, expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), - expected_null_counts:UInt64Array::from(vec![2, 2]), + expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), } .run_col_not_found("not_a_column"); From f3ae219d823d010262bca9b0b4ac750e1c5ce0c8 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Sat, 18 May 2024 08:09:16 -0400 Subject: [PATCH 08/11] chore: rename test helpers --- .../core/tests/parquet/arrow_statistics.rs | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 9d3ac299cf15f..b208bbb656e60 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ 
b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -68,7 +68,7 @@ fn make_int64_batches_with_null( // . Number of null rows is the given num_null // . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row // . The file is divided into row groups of size row_per_group -pub fn parquet_file( +pub fn parquet_file_one_column( num_null: usize, no_null_values_start: i64, no_null_values_end: i64, @@ -110,7 +110,7 @@ pub fn parquet_file( // Create a parquet file with many columns each has different data type // - Data types are specified by the given scenario // - Row group sizes are withe the same or different depending on the provided row_per_group & data created in the scenario -pub async fn create_parquet_file( +pub async fn parquet_file_many_columns( scenario: super::Scenario, row_per_group: usize, ) -> ParquetRecordBatchReaderBuilder { @@ -211,7 +211,7 @@ impl Test { #[tokio::test] async fn test_one_row_group_without_null() { let row_per_group = 20; - let reader = parquet_file(0, 4, 7, row_per_group); + let reader = parquet_file_one_column(0, 4, 7, row_per_group); Test { reader, // min is 4 @@ -229,7 +229,7 @@ async fn test_one_row_group_without_null() { #[tokio::test] async fn test_one_row_group_with_null_and_negative() { let row_per_group = 20; - let reader = parquet_file(2, -1, 5, row_per_group); + let reader = parquet_file_one_column(2, -1, 5, row_per_group); Test { reader, @@ -248,7 +248,7 @@ async fn test_one_row_group_with_null_and_negative() { #[tokio::test] async fn test_two_row_group_with_null() { let row_per_group = 10; - let reader = parquet_file(2, 4, 17, row_per_group); + let reader = parquet_file_one_column(2, 4, 17, row_per_group); Test { reader, @@ -267,7 +267,7 @@ async fn test_two_row_group_with_null() { #[tokio::test] async fn test_two_row_groups_with_all_nulls_in_one() { let row_per_group = 5; - let reader = parquet_file(4, -2, 2, row_per_group); + let reader = parquet_file_one_column(4, -2, 2, row_per_group); Test { reader, @@ -293,7 +293,7 @@ async fn test_two_row_groups_with_all_nulls_in_one() { async fn test_int_64() { let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = create_parquet_file(Scenario::Int, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; Test { reader, @@ -313,7 +313,7 @@ async fn test_int_64() { async fn test_int_32() { let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = create_parquet_file(Scenario::Int, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; Test { reader, @@ -338,7 +338,7 @@ async fn test_int_32() { async fn test_int_16() { let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = create_parquet_file(Scenario::Int, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; Test { reader, @@ -370,7 +370,7 @@ async fn test_int_16() { async fn test_int_8() { let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = create_parquet_file(Scenario::Int, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; Test { reader, @@ -401,7 +401,7 @@ async fn test_timestamp() { // // The file is created by 4 record batches, each has 5 rowws. 
// Since the row group isze is set to 5, those 4 batches will go into 4 row groups - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, @@ -427,7 +427,7 @@ async fn test_timestamp() { .run("nanos"); // micros - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -448,7 +448,7 @@ async fn test_timestamp() { .run("micros"); // millis - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -469,7 +469,7 @@ async fn test_timestamp() { .run("millis"); // seconds - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -497,7 +497,7 @@ async fn test_timestamp_diff_rg_sizes() { // "names" --> StringArray // // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 3 row groups with size 8, 8, 4 - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, @@ -521,7 +521,7 @@ async fn test_timestamp_diff_rg_sizes() { .run("nanos"); // micros - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -540,7 +540,7 @@ async fn test_timestamp_diff_rg_sizes() { .run("micros"); // millis - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -559,7 +559,7 @@ async fn test_timestamp_diff_rg_sizes() { .run("millis"); // seconds - let reader = create_parquet_file(Scenario::Timestamps, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![ @@ -585,7 +585,7 @@ async fn test_dates_32_diff_rg_sizes() { // "names" --> StringArray // // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 - let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; Test { reader, @@ -608,7 +608,7 @@ async fn test_dates_32_diff_rg_sizes() { async fn test_dates_64_diff_rg_sizes() { let row_per_group = 13; // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 - let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panic here because the actual data is Int32Array @@ -640,7 +640,7 @@ async fn test_dates_64_diff_rg_sizes() { #[tokio::test] async fn 
test_column_not_found() { let row_per_group = 5; - let reader = create_parquet_file(Scenario::Dates, row_per_group).await; + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; Test { reader, expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), From 5aea4bfedfe9b0f61ee44f907616acd341681a6e Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 20 May 2024 14:14:02 -0400 Subject: [PATCH 09/11] chore: Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion/core/tests/parquet/arrow_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index b208bbb656e60..190c4dc95bb48 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -621,7 +621,7 @@ async fn test_dates_64_diff_rg_sizes() { // TODO: // Other data types to tests -// Int32Range, +// `u8`, `u16`, `u32`, and `u64`, // UInt, // UInt32Range, // Float64, From 5b91f2cfc617663b9d365da27e1aed36554d8d66 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 May 2024 14:14:33 -0400 Subject: [PATCH 10/11] Apply suggestions from code review --- datafusion/core/tests/parquet/arrow_statistics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 190c4dc95bb48..1f0e42905d2f3 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -330,6 +330,7 @@ async fn test_int_32() { } // BUG: ignore this test for now +// https://github.com/apache/datafusion/issues/10585 // Note that the file has 4 columns named "i8", "i16", "i32", "i64". // - The tests on column i32 and i64 passed. // - The tests on column i8 and i16 failed. @@ -365,6 +366,7 @@ async fn test_int_16() { } // BUG (same as above): ignore this test for now +// https://github.com/apache/datafusion/issues/10585 #[ignore] #[tokio::test] async fn test_int_8() { From bd7437caf0424056f0b6814f2db436da52f21b92 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 May 2024 14:28:59 -0400 Subject: [PATCH 11/11] Apply suggestions from code review --- datafusion/core/tests/parquet/arrow_statistics.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 1f0e42905d2f3..272afea7b28a7 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -577,6 +577,8 @@ async fn test_timestamp_diff_rg_sizes() { } // date with different row group sizes +// Bug expect `Date32Array` but returns Int32Array +// https://github.com/apache/datafusion/issues/10587 #[tokio::test] async fn test_dates_32_diff_rg_sizes() { let row_per_group = 13; @@ -603,8 +605,9 @@ async fn test_dates_32_diff_rg_sizes() { .run("date32"); } -// BUG: same as above. Expect to return Int64Array (for Date64Array) but returns Int32Array +// BUG: same as above. Expect to return Date64Array but returns Int32Array // test date with different row group sizes +// https://github.com/apache/datafusion/issues/10587 #[ignore] #[tokio::test] async fn test_dates_64_diff_rg_sizes() {