Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ macro_rules! get_statistic {
Some(DataType::Date64) => {
Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 24 * 60 * 60 * 1000)))
}
Some(DataType::Time32(TimeUnit::Second)) => {
Some(ScalarValue::Time32Second(Some((*s.$func()))))
}
Some(DataType::Time32(TimeUnit::Millisecond)) => {
Some(ScalarValue::Time32Millisecond(Some((*s.$func()))))
}
_ => Some(ScalarValue::Int32(Some(*s.$func()))),
}
}
Expand All @@ -120,6 +126,12 @@ macro_rules! get_statistic {
Some(DataType::UInt64) => {
Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
}
Some(DataType::Time64(TimeUnit::Microsecond)) => {
Some(ScalarValue::Time64Microsecond(Some((*s.$func() as i64))))
}
Some(DataType::Time64(TimeUnit::Nanosecond)) => {
Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as i64))))
}
Some(DataType::Timestamp(unit, timezone)) => {
Some(match unit {
TimeUnit::Second => ScalarValue::TimestampSecond(
Expand Down
102 changes: 99 additions & 3 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray,
RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
Expand Down Expand Up @@ -918,6 +919,101 @@ async fn test_dates_32_diff_rg_sizes() {
.run();
}

#[tokio::test]
async fn test_time32_second_and_time64_nanosecond_diff_rg_sizes() {
    // Statistics-extraction test covering all four Arrow time types:
    // Time32(Second), Time32(Millisecond), Time64(Microsecond), Time64(Nanosecond).
    // Each scenario's file holds 16 strictly increasing values split into 4 row
    // groups of 4 rows, so per row group the min is the first value and the max
    // is the last, with no nulls anywhere.
    let reader_time32 = TestReader {
        scenario: Scenario::Time32Second,
        row_per_group: 4,
    };

    // Test for Time32Second column
    Test {
        reader: reader_time32.build().await,
        // Per-row-group mins/maxes of the sequence 18506..=18521 (4 values per group)
        expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510, 18514, 18518])),
        expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513, 18517, 18521])),
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls in any row group
        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
        column_name: "second",
    }
    .run();

    let reader_time32_millisecond = TestReader {
        scenario: Scenario::Time32Millisecond,
        row_per_group: 4,
    };

    // Test for Time32Millisecond column
    Test {
        reader: reader_time32_millisecond.build().await,
        // Per-row-group mins/maxes of the sequence 3600000..=3600015 (4 values per group)
        expected_min: Arc::new(Time32MillisecondArray::from(vec![
            3600000, 3600004, 3600008, 3600012,
        ])),
        expected_max: Arc::new(Time32MillisecondArray::from(vec![
            3600003, 3600007, 3600011, 3600015,
        ])),
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls in any row group
        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
        column_name: "millisecond",
    }
    .run();

    let reader_time64_micro = TestReader {
        scenario: Scenario::Time64Microsecond,
        row_per_group: 4,
    };

    // Test for Time64Microsecond column
    Test {
        reader: reader_time64_micro.build().await,
        // Per-row-group mins/maxes of the sequence 1234567890123..=1234567890138
        expected_min: Arc::new(Time64MicrosecondArray::from(vec![
            1234567890123,
            1234567890127,
            1234567890131,
            1234567890135,
        ])),
        expected_max: Arc::new(Time64MicrosecondArray::from(vec![
            1234567890126,
            1234567890130,
            1234567890134,
            1234567890138,
        ])),
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls in any row group
        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
        column_name: "microsecond",
    }
    .run();

    let reader_time64_nano = TestReader {
        scenario: Scenario::Time64Nanosecond,
        row_per_group: 4,
    };

    // Test for Time64Nanosecond column
    Test {
        reader: reader_time64_nano.build().await,
        // Per-row-group mins/maxes of the sequence 987654321012345..=987654321012360
        expected_min: Arc::new(Time64NanosecondArray::from(vec![
            987654321012345,
            987654321012349,
            987654321012353,
            987654321012357,
        ])),
        expected_max: Arc::new(Time64NanosecondArray::from(vec![
            987654321012348,
            987654321012352,
            987654321012356,
            987654321012360,
        ])),
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // no nulls in any row group
        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]),
        column_name: "nanosecond",
    }
    .run();
}

#[tokio::test]
async fn test_dates_64_diff_rg_sizes() {
// The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7
Expand Down
162 changes: 158 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use arrow::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
LargeStringArray, StringArray, StructArray, Time32MillisecondArray,
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit},
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
Expand Down Expand Up @@ -73,6 +74,10 @@ enum Scenario {
Int32Range,
UInt,
UInt32Range,
Time32Second,
Time32Millisecond,
Time64Nanosecond,
Time64Microsecond,
/// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
Expand Down Expand Up @@ -486,6 +491,55 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch {
.unwrap()
}

/// Return a single-column `RecordBatch` holding the given `Time32` values.
///
/// The column is nullable and its name/unit depend on `scenario`:
/// * `Scenario::Time32Second`      -> column "second",      `Time32(Second)`
/// * `Scenario::Time32Millisecond` -> column "millisecond", `Time32(Millisecond)`
///
/// # Panics
/// Panics for any other scenario, or if `RecordBatch::try_new` rejects the
/// schema/array pair (it cannot here: the array type always matches the field).
fn make_time32_batches(scenario: Scenario, v: Vec<i32>) -> RecordBatch {
    match scenario {
        Scenario::Time32Second => {
            let schema = Arc::new(Schema::new(vec![Field::new(
                "second",
                DataType::Time32(TimeUnit::Second),
                true,
            )]));
            let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef;
            RecordBatch::try_new(schema, vec![array]).unwrap()
        }
        Scenario::Time32Millisecond => {
            let schema = Arc::new(Schema::new(vec![Field::new(
                "millisecond",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )]));
            let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef;
            RecordBatch::try_new(schema, vec![array]).unwrap()
        }
        _ => panic!("Unsupported scenario for Time32"),
    }
}

/// Return record batch with Time64Microsecond, Time64Nanosecond sequences
fn make_time64_batches(scenario: Scenario, v: Vec<i64>) -> RecordBatch {
match scenario {
Scenario::Time64Microsecond => {
let schema = Arc::new(Schema::new(vec![Field::new(
"microsecond",
DataType::Time64(TimeUnit::Microsecond),
true,
)]));
let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array]).unwrap()
}
Scenario::Time64Nanosecond => {
let schema = Arc::new(Schema::new(vec![Field::new(
"nanosecond",
DataType::Time64(TimeUnit::Nanosecond),
true,
)]));
let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array]).unwrap()
}
_ => panic!("Unsupported scenario for Time64"),
}
}
/// Return record batch with u8, u16, u32, and u64 sequences
///
/// Columns are named
Expand Down Expand Up @@ -1121,6 +1175,106 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
)]));
vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
}
Scenario::Time32Second => {
vec![
make_time32_batches(
Scenario::Time32Second,
vec![18506, 18507, 18508, 18509],
),
make_time32_batches(
Scenario::Time32Second,
vec![18510, 18511, 18512, 18513],
),
make_time32_batches(
Scenario::Time32Second,
vec![18514, 18515, 18516, 18517],
),
make_time32_batches(
Scenario::Time32Second,
vec![18518, 18519, 18520, 18521],
),
]
}
Scenario::Time32Millisecond => {
vec![
make_time32_batches(
Scenario::Time32Millisecond,
vec![3600000, 3600001, 3600002, 3600003],
),
make_time32_batches(
Scenario::Time32Millisecond,
vec![3600004, 3600005, 3600006, 3600007],
),
make_time32_batches(
Scenario::Time32Millisecond,
vec![3600008, 3600009, 3600010, 3600011],
),
make_time32_batches(
Scenario::Time32Millisecond,
vec![3600012, 3600013, 3600014, 3600015],
),
]
}
Scenario::Time64Microsecond => {
vec![
make_time64_batches(
Scenario::Time64Microsecond,
vec![1234567890123, 1234567890124, 1234567890125, 1234567890126],
),
make_time64_batches(
Scenario::Time64Microsecond,
vec![1234567890127, 1234567890128, 1234567890129, 1234567890130],
),
make_time64_batches(
Scenario::Time64Microsecond,
vec![1234567890131, 1234567890132, 1234567890133, 1234567890134],
),
make_time64_batches(
Scenario::Time64Microsecond,
vec![1234567890135, 1234567890136, 1234567890137, 1234567890138],
),
]
}
Scenario::Time64Nanosecond => {
vec![
make_time64_batches(
Scenario::Time64Nanosecond,
vec![
987654321012345,
987654321012346,
987654321012347,
987654321012348,
],
),
make_time64_batches(
Scenario::Time64Nanosecond,
vec![
987654321012349,
987654321012350,
987654321012351,
987654321012352,
],
),
make_time64_batches(
Scenario::Time64Nanosecond,
vec![
987654321012353,
987654321012354,
987654321012355,
987654321012356,
],
),
make_time64_batches(
Scenario::Time64Nanosecond,
vec![
987654321012357,
987654321012358,
987654321012359,
987654321012360,
],
),
]
}
Scenario::UTF8 => {
vec![
make_utf8_batch(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]),
Expand Down