From 889143f407896e50ea0dc883ad0aeb44b069be8d Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Sun, 2 Jun 2024 12:06:36 +0800
Subject: [PATCH] refactor: handle LargeUtf8 statistics and add tests for UTF8 and LargeUTF8

---
 .../physical_plan/parquet/statistics.rs       | 16 +++++----
 .../core/tests/parquet/arrow_statistics.rs    | 36 +++++++++++++++++--
 datafusion/core/tests/parquet/mod.rs          | 27 +++++++++++++-
 3 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ae8395aef6a41..d65b2c0530b9e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -132,16 +132,18 @@ macro_rules! get_statistic {
                 Some(DataType::Binary) => {
                     Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
                 }
-                _ => {
-                    let s = std::str::from_utf8(s.$bytes_func())
+                Some(DataType::LargeUtf8) | _ => {
+                    let utf8_value = std::str::from_utf8(s.$bytes_func())
                         .map(|s| s.to_string())
                         .ok();
-                    if s.is_none() {
-                        log::debug!(
-                            "Utf8 statistics is a non-UTF8 value, ignoring it."
-                        );
+                    if utf8_value.is_none() {
+                        log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
+                    }
+
+                    match $target_arrow_type {
+                        Some(DataType::LargeUtf8) => Some(ScalarValue::LargeUtf8(utf8_value)),
+                        _ => Some(ScalarValue::Utf8(utf8_value)),
                     }
-                    Some(ScalarValue::Utf8(s))
                 }
             }
         }
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index eebf3447cbe9f..7daec1bef9f5a 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -26,8 +26,8 @@ use arrow::datatypes::{Date32Type, Date64Type};
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
-    UInt32Array, UInt64Array, UInt8Array,
+    Int32Array, Int64Array, Int8Array, LargeStringArray, RecordBatch, StringArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -1174,6 +1174,38 @@ async fn test_struct() {
     }
     .run();
 }
+
+// UTF8
+#[tokio::test]
+async fn test_utf8() {
+    let reader = TestReader {
+        scenario: Scenario::UTF8,
+        row_per_group: 5,
+    };
+
+    // test for utf8
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(StringArray::from(vec!["a", "e"])),
+        expected_max: Arc::new(StringArray::from(vec!["d", "i"])),
+        expected_null_counts: UInt64Array::from(vec![1, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5]),
+        column_name: "utf8",
+    }
+    .run();
+
+    // test for large_utf8
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(LargeStringArray::from(vec!["a", "e"])),
+        expected_max: Arc::new(LargeStringArray::from(vec!["d", "i"])),
+        expected_null_counts: UInt64Array::from(vec![1, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5]),
+        column_name: "large_utf8",
+    }
+    .run();
+}
+
 ////// Files with missing statistics ///////

 #[tokio::test]
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 94ae9ff601ecf..665e0c1caf3fe 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,9 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use arrow_array::{make_array, BooleanArray, Float32Array, StructArray};
+use arrow_array::{
+    make_array, BooleanArray, Float32Array, LargeStringArray, StructArray,
+};
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -86,6 +88,7 @@ enum Scenario {
     WithNullValues,
     WithNullValuesPageLevel,
     StructArray,
+    UTF8,
 }

 enum Unit {
@@ -745,6 +748,16 @@ fn make_numeric_limit_batch() -> RecordBatch {
     .unwrap()
 }

+fn make_utf8_batch(value: Vec<Option<&str>>) -> RecordBatch {
+    let utf8 = StringArray::from(value.clone());
+    let large_utf8 = LargeStringArray::from(value);
+    RecordBatch::try_from_iter(vec![
+        ("utf8", Arc::new(utf8) as _),
+        ("large_utf8", Arc::new(large_utf8) as _),
+    ])
+    .unwrap()
+}
+
 fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
     match scenario {
         Scenario::Boolean => {
@@ -964,6 +977,18 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
             )]));
             vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
         }
+        Scenario::UTF8 => {
+            vec![
+                make_utf8_batch(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]),
+                make_utf8_batch(vec![
+                    Some("e"),
+                    Some("f"),
+                    Some("g"),
+                    Some("h"),
+                    Some("i"),
+                ]),
+            ]
+        }
     }
 }
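
Note for reviewers (not part of the patch): the new match arm boils down to "validate the byte-array statistic as UTF-8, then wrap it in the ScalarValue variant that matches the column's Arrow type". Below is a minimal standalone sketch of that rule. The helper name utf8_statistic_to_scalar and its free-standing signature are hypothetical; in the patch the logic lives inside the get_statistic! macro and receives the bytes from s.$bytes_func() and the target type from $target_arrow_type. The ScalarValue and DataType APIs used are the existing datafusion_common / arrow ones.

    use arrow_array::Array;
    use arrow_schema::DataType;
    use datafusion_common::ScalarValue;

    /// Map a parquet byte-array statistic to a string scalar of the requested Arrow type.
    /// Invalid UTF-8 becomes a NULL scalar, mirroring the `log::debug!` + `None` path in the patch.
    fn utf8_statistic_to_scalar(bytes: &[u8], target: Option<&DataType>) -> ScalarValue {
        let value = std::str::from_utf8(bytes).map(|s| s.to_string()).ok();
        match target {
            Some(DataType::LargeUtf8) => ScalarValue::LargeUtf8(value),
            _ => ScalarValue::Utf8(value),
        }
    }

    fn main() -> datafusion_common::Result<()> {
        // The same statistic bytes produce different scalar variants depending on the target type.
        assert_eq!(
            utf8_statistic_to_scalar(b"a", Some(&DataType::LargeUtf8)),
            ScalarValue::LargeUtf8(Some("a".to_string()))
        );
        assert_eq!(
            utf8_statistic_to_scalar(b"a", Some(&DataType::Utf8)),
            ScalarValue::Utf8(Some("a".to_string()))
        );
        // Non-UTF-8 statistics are ignored (NULL scalar) rather than treated as an error.
        assert_eq!(
            utf8_statistic_to_scalar(&[0xff, 0xfe], None),
            ScalarValue::Utf8(None)
        );

        // Collecting the scalars into an array yields the matching Arrow type, which is what
        // the new test_utf8 assertions rely on (StringArray for "utf8", LargeStringArray for
        // "large_utf8").
        let mins = ScalarValue::iter_to_array(vec![
            utf8_statistic_to_scalar(b"a", Some(&DataType::LargeUtf8)),
            utf8_statistic_to_scalar(b"e", Some(&DataType::LargeUtf8)),
        ])?;
        assert_eq!(mins.data_type(), &DataType::LargeUtf8);
        Ok(())
    }

Without the LargeUtf8 arm, min/max scalars for a LargeUtf8 column would come back as Utf8 and fail to line up with the column's Arrow type, which is exactly the mismatch the new test_utf8 large_utf8 case guards against.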