From 076b982ec3cf9e939c304933fae8b820f9a76157 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Sun, 2 Jun 2024 21:59:40 -0700 Subject: [PATCH] Extract parquet statistics from Time32 and Time64 columns --- .../physical_plan/parquet/statistics.rs | 13 ++ .../core/tests/parquet/arrow_statistics.rs | 98 ++++++++++- datafusion/core/tests/parquet/mod.rs | 159 +++++++++++++++++- 3 files changed, 268 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index ae8395aef6a41..48e1887822efa 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -21,6 +21,7 @@ use arrow::{array::ArrayRef, datatypes::DataType}; use arrow_array::{new_empty_array, new_null_array, UInt64Array}; +use arrow_schema::TimeUnit; use arrow_schema::{Field, FieldRef, Schema}; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, @@ -96,6 +97,12 @@ macro_rules! get_statistic { Some(DataType::Date64) => { Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 24 * 60 * 60 * 1000))) } + Some(DataType::Time32(TimeUnit::Second)) => { + Some(ScalarValue::Time32Second(Some((*s.$func())))) + } + Some(DataType::Time32(TimeUnit::Millisecond)) => { + Some(ScalarValue::Time32Millisecond(Some((*s.$func())))) + } _ => Some(ScalarValue::Int32(Some(*s.$func()))), } } @@ -112,6 +119,12 @@ macro_rules! 
get_statistic { Some(DataType::UInt64) => { Some(ScalarValue::UInt64(Some((*s.$func()) as u64))) } + Some(DataType::Time64(TimeUnit::Microsecond)) => { + Some(ScalarValue::Time64Microsecond(Some((*s.$func() as i64)))) + } + Some(DataType::Time64(TimeUnit::Nanosecond)) => { + Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as i64)))) + } _ => Some(ScalarValue::Int64(Some(*s.$func()))), } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index eebf3447cbe9f..ee3dba79c12e7 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -26,7 +26,8 @@ use arrow::datatypes::{Date32Type, Date64Type}; use arrow_array::{ make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array, + Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, Time32MillisecondArray, + Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow_schema::{DataType, Field, Schema}; @@ -680,6 +681,101 @@ async fn test_dates_32_diff_rg_sizes() { .run(); } +#[tokio::test] +async fn test_time32_second_and_time64_nanosecond_diff_rg_sizes() { + let reader_time32 = TestReader { + scenario: Scenario::Time32Second, + row_per_group: 4, + }; + + // Test for Time32Second column + Test { + reader: reader_time32.build().await, + // expected per-row-group min/max values taken from the generated batches + expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510, 18514, 18518])), + expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513, 18517, 18521])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no null values in these batches + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), +
column_name: "second", + } + .run(); + + let reader_time32_millisecond = TestReader { + scenario: Scenario::Time32Millisecond, + row_per_group: 4, + }; + + // Test for Time32Millisecond column + Test { + reader: reader_time32_millisecond.build().await, + // expected per-row-group min/max values taken from the generated batches + expected_min: Arc::new(Time32MillisecondArray::from(vec![ + 3600000, 3600004, 3600008, 3600012, + ])), + expected_max: Arc::new(Time32MillisecondArray::from(vec![ + 3600003, 3600007, 3600011, 3600015, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no null values in these batches + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "millisecond", + } + .run(); + + let reader_time64_micro = TestReader { + scenario: Scenario::Time64Microsecond, + row_per_group: 4, + }; + + // Test for Time64Microsecond column + Test { + reader: reader_time64_micro.build().await, + // expected per-row-group min/max values taken from the generated batches + expected_min: Arc::new(Time64MicrosecondArray::from(vec![ + 1234567890123, + 1234567890127, + 1234567890131, + 1234567890135, + ])), + expected_max: Arc::new(Time64MicrosecondArray::from(vec![ + 1234567890126, + 1234567890130, + 1234567890134, + 1234567890138, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no null values in these batches + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "microsecond", + } + .run(); + + let reader_time64_nano = TestReader { + scenario: Scenario::Time64Nanosecond, + row_per_group: 4, + }; + + // Test for Time64Nanosecond column + Test { + reader: reader_time64_nano.build().await, + // expected per-row-group min/max values taken from the generated batches + expected_min: Arc::new(Time64NanosecondArray::from(vec![ + 987654321012345, + 987654321012349, + 987654321012353, + 987654321012357, + ])), + expected_max: Arc::new(Time64NanosecondArray::from(vec![ + 987654321012348, +
987654321012352, + 987654321012356, + 987654321012360, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no null values in these batches + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "nanosecond", + } + .run(); +} + #[tokio::test] async fn test_dates_64_diff_rg_sizes() { // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 94ae9ff601ecf..5555d5acdcd0f 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,7 +28,11 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; -use arrow_array::{make_array, BooleanArray, Float32Array, StructArray}; +use arrow_array::{ + make_array, BooleanArray, Float32Array, StructArray, Time32MillisecondArray, + Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, +}; +use arrow_schema::TimeUnit; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, @@ -72,6 +76,10 @@ enum Scenario { Int32Range, UInt, UInt32Range, + Time32Second, + Time32Millisecond, + Time64Nanosecond, + Time64Microsecond, /// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64 /// -MIN, -100, -1, 0, 1, 100, MAX NumericLimits, @@ -442,6 +450,55 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch { .unwrap() } +/// Return record batch with Time32Second, Time32Millisecond sequences +fn make_time32_batches(scenario: Scenario, v: Vec<i32>) -> RecordBatch { + match scenario { + Scenario::Time32Second => { + let schema = Arc::new(Schema::new(vec![Field::new( + "second", + DataType::Time32(TimeUnit::Second), + true, + )])); + let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } +
Scenario::Time32Millisecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "millisecond", + DataType::Time32(TimeUnit::Millisecond), + true, + )])); + let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + _ => panic!("Unsupported scenario for Time32"), + } +} + +/// Return record batch with Time64Microsecond, Time64Nanosecond sequences +fn make_time64_batches(scenario: Scenario, v: Vec<i64>) -> RecordBatch { + match scenario { + Scenario::Time64Microsecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "microsecond", + DataType::Time64(TimeUnit::Microsecond), + true, + )])); + let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + Scenario::Time64Nanosecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "nanosecond", + DataType::Time64(TimeUnit::Nanosecond), + true, + )])); + let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + _ => panic!("Unsupported scenario for Time64"), + } +} /// Return record batch with u8, u16, u32, and u64 sequences /// /// Columns are named @@ -964,6 +1021,106 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> { )])); vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()] } + Scenario::Time32Second => { + vec![ + make_time32_batches( + Scenario::Time32Second, + vec![18506, 18507, 18508, 18509], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18510, 18511, 18512, 18513], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18514, 18515, 18516, 18517], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18518, 18519, 18520, 18521], + ), + ] + } + Scenario::Time32Millisecond => { + vec![ + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600000, 3600001, 3600002, 3600003], + ), + make_time32_batches( + Scenario::Time32Millisecond, +
vec![3600004, 3600005, 3600006, 3600007], + ), + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600008, 3600009, 3600010, 3600011], + ), + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600012, 3600013, 3600014, 3600015], + ), + ] + } + Scenario::Time64Microsecond => { + vec![ + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890123, 1234567890124, 1234567890125, 1234567890126], + ), + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890127, 1234567890128, 1234567890129, 1234567890130], + ), + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890131, 1234567890132, 1234567890133, 1234567890134], + ), + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890135, 1234567890136, 1234567890137, 1234567890138], + ), + ] + } + Scenario::Time64Nanosecond => { + vec![ + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012345, + 987654321012346, + 987654321012347, + 987654321012348, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012349, + 987654321012350, + 987654321012351, + 987654321012352, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012353, + 987654321012354, + 987654321012355, + 987654321012356, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012357, + 987654321012358, + 987654321012359, + 987654321012360, + ], + ), + ] + } } }