diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 9d59808508604..756eb8eb10d68 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -104,6 +104,12 @@ macro_rules! get_statistic { Some(DataType::Date64) => { Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 24 * 60 * 60 * 1000))) } + Some(DataType::Time32(TimeUnit::Second)) => { + Some(ScalarValue::Time32Second(Some((*s.$func())))) + } + Some(DataType::Time32(TimeUnit::Millisecond)) => { + Some(ScalarValue::Time32Millisecond(Some((*s.$func())))) + } _ => Some(ScalarValue::Int32(Some(*s.$func()))), } } @@ -120,6 +126,12 @@ macro_rules! get_statistic { Some(DataType::UInt64) => { Some(ScalarValue::UInt64(Some((*s.$func()) as u64))) } + Some(DataType::Time64(TimeUnit::Microsecond)) => { + Some(ScalarValue::Time64Microsecond(Some((*s.$func() as i64)))) + } + Some(DataType::Time64(TimeUnit::Nanosecond)) => { + Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as i64)))) + } Some(DataType::Timestamp(unit, timezone)) => { Some(match unit { TimeUnit::Second => ScalarValue::TimestampSecond( diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 3b7961236efa8..e58bf23705bfb 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -31,9 +31,10 @@ use arrow_array::{ make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray, - RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, - UInt64Array, UInt8Array, 
+ RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow_schema::{DataType, Field, Schema}; use datafusion::datasource::physical_plan::parquet::{ @@ -918,6 +919,101 @@ async fn test_dates_32_diff_rg_sizes() { .run(); } +#[tokio::test] +async fn test_time32_second_and_time64_nanosecond_diff_rg_sizes() { + let reader_time32 = TestReader { + scenario: Scenario::Time32Second, + row_per_group: 4, + }; + + // Test for Time32Second column + Test { + reader: reader_time32.build().await, + // Assuming specific minimum and maximum values for demonstration + expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510, 18514, 18518])), + expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513, 18517, 18521])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // No nulls in any row group + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "second", + } + .run(); + + let reader_time32_millisecond = TestReader { + scenario: Scenario::Time32Millisecond, + row_per_group: 4, + }; + + // Test for Time32Millisecond column + Test { + reader: reader_time32_millisecond.build().await, + // Assuming specific minimum and maximum values for demonstration + expected_min: Arc::new(Time32MillisecondArray::from(vec![ + 3600000, 3600004, 3600008, 3600012, + ])), + expected_max: Arc::new(Time32MillisecondArray::from(vec![ + 3600003, 3600007, 3600011, 3600015, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // No nulls in any row group + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "millisecond", + } + .run(); + + let reader_time64_micro = TestReader { + scenario: Scenario::Time64Microsecond, + row_per_group: 4, + }; + + // Test for
Time64Microsecond column + Test { + reader: reader_time64_micro.build().await, + // Assuming specific minimum and maximum values for demonstration + expected_min: Arc::new(Time64MicrosecondArray::from(vec![ + 1234567890123, + 1234567890127, + 1234567890131, + 1234567890135, + ])), + expected_max: Arc::new(Time64MicrosecondArray::from(vec![ + 1234567890126, + 1234567890130, + 1234567890134, + 1234567890138, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // No nulls in any row group + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "microsecond", + } + .run(); + + let reader_time64_nano = TestReader { + scenario: Scenario::Time64Nanosecond, + row_per_group: 4, + }; + + // Test for Time64Nanosecond column + Test { + reader: reader_time64_nano.build().await, + // Assuming specific minimum and maximum values for demonstration + expected_min: Arc::new(Time64NanosecondArray::from(vec![ + 987654321012345, + 987654321012349, + 987654321012353, + 987654321012357, + ])), + expected_max: Arc::new(Time64NanosecondArray::from(vec![ + 987654321012348, + 987654321012352, + 987654321012356, + 987654321012360, + ])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // No nulls in any row group + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + column_name: "nanosecond", + } + .run(); +} + #[tokio::test] async fn test_dates_64_diff_rg_sizes() { // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index fd2f184328cd2..72f659205a981 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -22,11 +22,12 @@ use arrow::{ make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt16Array, UInt32Array, UInt64Array, UInt8Array, + LargeStringArray, StringArray, StructArray, Time32MillisecondArray, + Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, - datatypes::{DataType, Field, Int32Type, Int8Type, Schema}, + datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit}, record_batch::RecordBatch, util::pretty::pretty_format_batches, }; @@ -73,6 +74,10 @@ enum Scenario { Int32Range, UInt, UInt32Range, + Time32Second, + Time32Millisecond, + Time64Nanosecond, + Time64Microsecond, /// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64 /// -MIN, -100, -1, 0, 1, 100, MAX NumericLimits, @@ -486,6 +491,55 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch { .unwrap() } +/// Return record batch with Time32Second, Time32Millisecond sequences +fn make_time32_batches(scenario: Scenario, v: Vec<i32>) -> RecordBatch { + match scenario { + Scenario::Time32Second => { + let schema = Arc::new(Schema::new(vec![Field::new( + "second", + DataType::Time32(TimeUnit::Second), + true, + )])); + let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + Scenario::Time32Millisecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "millisecond", + DataType::Time32(TimeUnit::Millisecond), + true, + )])); + let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + _ => panic!("Unsupported scenario for Time32"), + } +} + +/// Return record batch with Time64Microsecond, Time64Nanosecond sequences +fn
make_time64_batches(scenario: Scenario, v: Vec<i64>) -> RecordBatch { + match scenario { + Scenario::Time64Microsecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "microsecond", + DataType::Time64(TimeUnit::Microsecond), + true, + )])); + let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + Scenario::Time64Nanosecond => { + let schema = Arc::new(Schema::new(vec![Field::new( + "nanosecond", + DataType::Time64(TimeUnit::Nanosecond), + true, + )])); + let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array]).unwrap() + } + _ => panic!("Unsupported scenario for Time64"), + } +} /// Return record batch with u8, u16, u32, and u64 sequences /// /// Columns are named @@ -1121,6 +1175,106 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> { )])); vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()] } + Scenario::Time32Second => { + vec![ + make_time32_batches( + Scenario::Time32Second, + vec![18506, 18507, 18508, 18509], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18510, 18511, 18512, 18513], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18514, 18515, 18516, 18517], + ), + make_time32_batches( + Scenario::Time32Second, + vec![18518, 18519, 18520, 18521], + ), + ] + } + Scenario::Time32Millisecond => { + vec![ + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600000, 3600001, 3600002, 3600003], + ), + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600004, 3600005, 3600006, 3600007], + ), + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600008, 3600009, 3600010, 3600011], + ), + make_time32_batches( + Scenario::Time32Millisecond, + vec![3600012, 3600013, 3600014, 3600015], + ), + ] + } + Scenario::Time64Microsecond => { + vec![ + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890123, 1234567890124, 1234567890125, 1234567890126], + ), + 
make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890127, 1234567890128, 1234567890129, 1234567890130], + ), + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890131, 1234567890132, 1234567890133, 1234567890134], + ), + make_time64_batches( + Scenario::Time64Microsecond, + vec![1234567890135, 1234567890136, 1234567890137, 1234567890138], + ), + ] + } + Scenario::Time64Nanosecond => { + vec![ + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012345, + 987654321012346, + 987654321012347, + 987654321012348, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012349, + 987654321012350, + 987654321012351, + 987654321012352, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012353, + 987654321012354, + 987654321012355, + 987654321012356, + ], + ), + make_time64_batches( + Scenario::Time64Nanosecond, + vec![ + 987654321012357, + 987654321012358, + 987654321012359, + 987654321012360, + ], + ), + ] + } Scenario::UTF8 => { vec![ make_utf8_batch(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]),