From 275f20c659e21853ffca677f5226c23ff94e16b8 Mon Sep 17 00:00:00 2001
From: Xin Li
Date: Mon, 3 Jun 2024 08:48:38 +0000
Subject: [PATCH 1/2] fix: Extract parquet statistics from LargeBinary columns

---
 .../physical_plan/parquet/statistics.rs      | 43 ++++++++++++++++++-
 .../core/tests/parquet/arrow_statistics.rs   | 30 ++++++++++---
 datafusion/core/tests/parquet/mod.rs         | 35 ++++++++++++++-
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ae8395aef6a41..e3d1e214c62a5 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -132,6 +132,9 @@ macro_rules! get_statistic {
                 Some(DataType::Binary) => {
                     Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
                 }
+                Some(DataType::LargeBinary) => {
+                    Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+                }
                 _ => {
                     let s = std::str::from_utf8(s.$bytes_func())
                         .map(|s| s.to_string())
@@ -395,7 +398,8 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
+        Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
+        TimestampNanosecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -751,6 +755,34 @@ mod test {
         .run()
     }
 
+    #[test]
+    fn roundtrip_large_binary_array() {
+        let input: Vec<Option<&[u8]>> = vec![
+            // row group 1
+            Some(b"A"),
+            None,
+            Some(b"Q"),
+            // row group 2
+            Some(b"ZZ"),
+            Some(b"AA"),
+            None,
+            // row group 3
+            None,
+            None,
+            None,
+        ];
+
+        let expected_min: Vec<Option<&[u8]>> = vec![Some(b"A"), Some(b"AA"), None];
+        let expected_max: Vec<Option<&[u8]>> = vec![Some(b"Q"), Some(b"ZZ"), None];
+
+        Test {
+            input: large_binary_array(input),
+            expected_min: large_binary_array(expected_min),
+            expected_max: large_binary_array(expected_max),
+        }
+        .run();
+    }
+
     #[test]
     fn struct_and_non_struct() {
         // Ensures that statistics for an array that appears *after* a struct
@@ -1186,4 +1218,13 @@ mod test {
         );
         Arc::new(array)
     }
+
+    fn large_binary_array<'a>(
+        input: impl IntoIterator<Item = Option<&'a [u8]>>,
+    ) -> ArrayRef {
+        let array =
+            LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>());
+
+        Arc::new(array)
+    }
 }
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index eebf3447cbe9f..04a755d62143c 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -26,8 +26,8 @@ use arrow::datatypes::{Date32Type, Date64Type};
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
-    UInt32Array, UInt64Array, UInt8Array,
+    Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -994,15 +994,14 @@ async fn test_decimal() {
     .run();
 }
 
-// BUG: not convert BinaryArray to StringArray
-// https://github.com/apache/datafusion/issues/10605
 #[tokio::test]
 async fn test_byte() {
-    // This creates a parquet file of 4 columns
+    // This creates a parquet file of 5 columns
     // "name"
     // "service_string"
     // "service_binary"
     // "service_fixedsize"
+    // "service_large_binary"
     // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
 
     let reader = TestReader {
@@ -1085,6 +1084,26 @@ async fn test_byte() {
         column_name: "service_fixedsize",
     }
     .run();
+
+    let expected_service_large_binary_min_values: Vec<&[u8]> =
+        vec![b"frontend five", b"backend one", b"backend eight"];
+
+    let expected_service_large_binary_max_values: Vec<&[u8]> =
+        vec![b"frontend two", b"frontend six", b"backend six"];
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_min_values,
+        )),
+        expected_max: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_max_values,
+        )),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_large_binary",
+    }
+    .run();
 }
 
 // PeriodsInColumnNames
@@ -1174,6 +1193,7 @@ async fn test_struct() {
     }
     .run();
 }
+
 ////// Files with missing statistics ///////
 
 #[tokio::test]
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 94ae9ff601ecf..949963e348137 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,9 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use arrow_array::{make_array, BooleanArray, Float32Array, StructArray};
+use arrow_array::{
+    make_array, BooleanArray, Float32Array, LargeBinaryArray, StructArray,
+};
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -598,6 +600,8 @@ fn make_bytearray_batch(
     string_values: Vec<&str>,
     binary_values: Vec<&[u8]>,
     fixedsize_values: Vec<&[u8; 3]>,
+    // i64 offset (LargeBinary) values
+    large_binary_values: Vec<&[u8]>,
 ) -> RecordBatch {
     let num_rows = string_values.len();
     let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect();
@@ -608,6 +612,8 @@ fn make_bytearray_batch(
         .map(|value| Some(value.as_slice()))
         .collect::<Vec<_>>()
         .into();
+    let service_large_binary: LargeBinaryArray =
+        large_binary_values.iter().map(Some).collect();
 
     let schema = Schema::new(vec![
         Field::new("name", name.data_type().clone(), true),
@@ -619,6 +625,11 @@ fn make_bytearray_batch(
             service_fixedsize.data_type().clone(),
             true,
         ),
+        Field::new(
+            "service_large_binary",
+            service_large_binary.data_type().clone(),
+            true,
+        ),
     ]);
 
     let schema = Arc::new(schema);
@@ -629,6 +640,7 @@ fn make_bytearray_batch(
             Arc::new(service_string),
             Arc::new(service_binary),
             Arc::new(service_fixedsize),
+            Arc::new(service_large_binary),
         ],
     )
     .unwrap()
@@ -877,6 +889,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"frontend five",
             ],
             vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
+            vec![
+                b"frontend one",
+                b"frontend two",
+                b"frontend three",
+                b"frontend seven",
+                b"frontend five",
+            ],
         ),
         make_bytearray_batch(
             "mixed",
@@ -895,6 +914,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"backend three",
             ],
             vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
+            vec![
+                b"frontend six",
+                b"frontend four",
+                b"backend one",
+                b"backend two",
+                b"backend three",
+            ],
         ),
         make_bytearray_batch(
             "all backends",
@@ -913,6 +939,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"backend eight",
             ],
             vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
+            vec![
+                b"backend four",
+                b"backend five",
+                b"backend six",
+                b"backend seven",
+                b"backend eight",
+            ],
         ),
     ]
 }

From 7d81466557f3783c33d7a15a5bf647ef91f2a42c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 3 Jun 2024 20:46:13 -0400
Subject: [PATCH 2/2] fix

---
 .../core/src/datasource/physical_plan/parquet/statistics.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 4ab79fc6ad0bb..9d59808508604 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -430,8 +430,9 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+        Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
+        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+        TimestampSecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
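
Note: the sketch below is not part of the patches above. It is a minimal
standalone illustration of the mapping [PATCH 1/2] adds to the
`get_statistic!` macro. The free function `bytes_stat_to_scalar` and its
signature are invented for illustration; in the patched code the same match
runs inside the macro against `s.$bytes_func()` and the column's target
Arrow type, and the fallthrough branch here is a sketch, not a verbatim copy.

use arrow_schema::DataType;
use datafusion_common::ScalarValue;

// Map the raw bytes of a parquet byte-array statistic to a ScalarValue of
// the Arrow type the column is read as. Before [PATCH 1/2], LargeBinary had
// no dedicated arm and fell through to the UTF-8 branch, so min/max for
// non-UTF-8 LargeBinary data could not be extracted with the right type.
fn bytes_stat_to_scalar(raw: &[u8], target: Option<&DataType>) -> Option<ScalarValue> {
    match target {
        Some(DataType::Binary) => Some(ScalarValue::Binary(Some(raw.to_vec()))),
        // The new arm: keep LargeBinary statistics as LargeBinary scalars.
        Some(DataType::LargeBinary) => {
            Some(ScalarValue::LargeBinary(Some(raw.to_vec())))
        }
        // Fallback (sketched): treat the bytes as UTF-8 and produce a Utf8
        // scalar only when they are valid.
        _ => std::str::from_utf8(raw)
            .ok()
            .map(|v| ScalarValue::Utf8(Some(v.to_string()))),
    }
}

// For example, with the new arm in place:
//   bytes_stat_to_scalar(b"AA", Some(&DataType::LargeBinary))
// yields Some(ScalarValue::LargeBinary(Some(vec![b'A', b'A']))), matching
// the row-group minimum asserted by `roundtrip_large_binary_array` above.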