diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6c738cfe03a95..9d59808508604 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -160,6 +160,9 @@ macro_rules! get_statistic {
            Some(DataType::Binary) => {
                Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
            }
+           Some(DataType::LargeBinary) => {
+               Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
+           }
            Some(DataType::LargeUtf8) | _ => {
                let utf8_value = std::str::from_utf8(s.$bytes_func())
                    .map(|s| s.to_string())
@@ -427,8 +430,9 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+        Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
+        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+        TimestampSecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -965,6 +969,34 @@ mod test {
         .run()
     }
 
+    #[test]
+    fn roundtrip_large_binary_array() {
+        let input: Vec<Option<&[u8]>> = vec![
+            // row group 1
+            Some(b"A"),
+            None,
+            Some(b"Q"),
+            // row group 2
+            Some(b"ZZ"),
+            Some(b"AA"),
+            None,
+            // row group 3
+            None,
+            None,
+            None,
+        ];
+
+        let expected_min: Vec<Option<&[u8]>> = vec![Some(b"A"), Some(b"AA"), None];
+        let expected_max: Vec<Option<&[u8]>> = vec![Some(b"Q"), Some(b"ZZ"), None];
+
+        Test {
+            input: large_binary_array(input),
+            expected_min: large_binary_array(expected_min),
+            expected_max: large_binary_array(expected_max),
+        }
+        .run();
+    }
+
     #[test]
     fn struct_and_non_struct() {
         // Ensures that statistics for an array that appears *after* a struct
@@ -1439,4 +1471,13 @@ mod test {
         );
         Arc::new(array)
     }
+
+    fn large_binary_array<'a>(
+        input: impl IntoIterator<Item = Option<&'a [u8]>>,
+    ) -> ArrayRef {
+        let array =
+            LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&'a [u8]>>>());
+
+        Arc::new(array)
+    }
 }
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index c2bf75c8f0896..3b7961236efa8 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -30,8 +30,8 @@ use arrow::datatypes::{
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, RecordBatch,
-    StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray,
+    RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
     TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
     UInt64Array, UInt8Array,
 };
@@ -1261,7 +1261,6 @@ async fn test_decimal() {
     }
     .run();
 }
-
 #[tokio::test]
 async fn test_dictionary() {
     let reader = TestReader {
@@ -1302,11 +1301,12 @@
 
 #[tokio::test]
 async fn test_byte() {
-    // This creates a parquet file of 4 columns
+    // This creates a parquet file of 5 columns
     // "name"
     // "service_string"
     // "service_binary"
     // "service_fixedsize"
+    // "service_large_binary"
     // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
     let reader = TestReader {
@@ -1389,6 +1389,26 @@
         column_name: "service_fixedsize",
     }
     .run();
+
+    let expected_service_large_binary_min_values: Vec<&[u8]> =
+        vec![b"frontend five", b"backend one", b"backend eight"];
+
+    let expected_service_large_binary_max_values: Vec<&[u8]> =
+        vec![b"frontend two", b"frontend six", b"backend six"];
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_min_values,
+        )),
+        expected_max: Arc::new(LargeBinaryArray::from(
+            expected_service_large_binary_max_values,
+        )),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_large_binary",
+    }
+    .run();
 }
 
 // PeriodsInColumnNames
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index e951644f2cbfd..fd2f184328cd2 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -21,10 +21,10 @@ use arrow::{
     array::{
         make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
         DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
-        Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray,
-        StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-        TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
-        UInt64Array, UInt8Array,
+        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
+        LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+        UInt16Array, UInt32Array, UInt64Array, UInt8Array,
     },
     datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
     record_batch::RecordBatch,
@@ -648,6 +648,8 @@ fn make_bytearray_batch(
     string_values: Vec<&str>,
     binary_values: Vec<&[u8]>,
     fixedsize_values: Vec<&[u8; 3]>,
+    // i64 offset.
+    large_binary_values: Vec<&[u8]>,
 ) -> RecordBatch {
     let num_rows = string_values.len();
     let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect();
@@ -658,6 +660,8 @@ fn make_bytearray_batch(
         .map(|value| Some(value.as_slice()))
         .collect::<Vec<_>>()
         .into();
+    let service_large_binary: LargeBinaryArray =
+        large_binary_values.iter().map(Some).collect();
 
     let schema = Schema::new(vec![
         Field::new("name", name.data_type().clone(), true),
@@ -669,6 +673,11 @@ fn make_bytearray_batch(
             service_fixedsize.data_type().clone(),
             true,
         ),
+        Field::new(
+            "service_large_binary",
+            service_large_binary.data_type().clone(),
+            true,
+        ),
     ]);
 
     let schema = Arc::new(schema);
@@ -679,6 +688,7 @@ fn make_bytearray_batch(
             Arc::new(service_string),
             Arc::new(service_binary),
             Arc::new(service_fixedsize),
+            Arc::new(service_large_binary),
         ],
     )
     .unwrap()
@@ -1000,6 +1010,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"frontend five",
             ],
             vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
+            vec![
+                b"frontend one",
+                b"frontend two",
+                b"frontend three",
+                b"frontend seven",
+                b"frontend five",
+            ],
         ),
         make_bytearray_batch(
             "mixed",
@@ -1018,6 +1035,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"backend three",
             ],
             vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
+            vec![
+                b"frontend six",
+                b"frontend four",
+                b"backend one",
+                b"backend two",
+                b"backend three",
+            ],
         ),
         make_bytearray_batch(
             "all backends",
@@ -1036,6 +1060,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 b"backend eight",
             ],
             vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
+            vec![
+                b"backend four",
+                b"backend five",
+                b"backend six",
+                b"backend seven",
+                b"backend eight",
+            ],
         ),
     ]
 }
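
For reviewers who want to see the behavior outside the test harness, here is a minimal, self-contained sketch (not part of this patch) of the round trip the new tests exercise: it writes a `LargeBinaryArray` to an in-memory Parquet buffer and reads back the per-row-group byte-array statistics whose `min_bytes`/`max_bytes` accessors the patched `get_statistic!` macro maps to `ScalarValue::LargeBinary`. The column name and values are illustrative only; it assumes the `arrow_array`, `parquet`, and `bytes` crates already used by these tests.

```rust
// Hypothetical sketch, not part of the patch: round-trip LargeBinary
// statistics through Parquet.
use std::sync::Arc;

use arrow_array::{ArrayRef, LargeBinaryArray, RecordBatch};
use parquet::arrow::ArrowWriter;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::statistics::Statistics;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One row group; nulls are ignored, so min is b"A" and max is b"Q".
    let array: ArrayRef = Arc::new(LargeBinaryArray::from(vec![
        Some(b"A".as_slice()),
        None,
        Some(b"Q".as_slice()),
    ]));
    let batch = RecordBatch::try_from_iter(vec![("service_large_binary", array)])?;

    // Write the batch to an in-memory Parquet buffer; default writer
    // properties keep column statistics enabled.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // LargeBinary is stored as the BYTE_ARRAY physical type, so the row-group
    // statistics come back as Statistics::ByteArray; min_bytes/max_bytes are
    // the same accessors the patched get_statistic! macro reads.
    let reader = SerializedFileReader::new(bytes::Bytes::from(buf))?;
    let column = reader.metadata().row_group(0).column(0);
    if let Some(Statistics::ByteArray(s)) = column.statistics() {
        assert_eq!(s.min_bytes(), b"A".as_slice());
        assert_eq!(s.max_bytes(), b"Q".as_slice());
    }
    Ok(())
}
```

This also shows why the read-side change is a single new match arm: Parquet stores both `Binary` and `LargeBinary` columns as `BYTE_ARRAY`, so the statistics bytes are identical and only the declared Arrow type decides which `ScalarValue` variant to build.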