45 changes: 43 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -160,6 +160,9 @@ macro_rules! get_statistic {
Some(DataType::Binary) => {
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
}
Some(DataType::LargeBinary) => {
Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec())))
}
Some(DataType::LargeUtf8) | _ => {
let utf8_value = std::str::from_utf8(s.$bytes_func())
.map(|s| s.to_string())
@@ -427,8 +430,9 @@ mod test {
use arrow_array::{
new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
@@ -965,6 +969,34 @@ mod test {
.run()
}

#[test]
fn roundtrip_large_binary_array() {
let input: Vec<Option<&[u8]>> = vec![
// row group 1
Some(b"A"),
None,
Some(b"Q"),
// row group 2
Some(b"ZZ"),
Some(b"AA"),
None,
// row group 3
None,
None,
None,
];

let expected_min: Vec<Option<&[u8]>> = vec![Some(b"A"), Some(b"AA"), None];
let expected_max: Vec<Option<&[u8]>> = vec![Some(b"Q"), Some(b"ZZ"), None];

Test {
input: large_binary_array(input),
expected_min: large_binary_array(expected_min),
expected_max: large_binary_array(expected_max),
}
.run();
}

#[test]
fn struct_and_non_struct() {
// Ensures that statistics for an array that appears *after* a struct
@@ -1439,4 +1471,13 @@ mod test {
);
Arc::new(array)
}

fn large_binary_array<'a>(
input: impl IntoIterator<Item = Option<&'a [u8]>>,
) -> ArrayRef {
let array =
LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>());

Arc::new(array)
}
}
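
For context, the roundtrip test added above exercises this whole path: LargeBinary batches are written to parquet, and each row group's byte statistics are then mapped through the new `DataType::LargeBinary` arm into `ScalarValue::LargeBinary`. Below is a minimal self-contained sketch of that flow, assuming the `arrow-array`, `parquet`, and `bytes` crates already used in this diff; the single-batch data and the in-memory buffer are illustrative, not taken from the PR.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, LargeBinaryArray, RecordBatch};
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() {
    // A LargeBinary column with a null, like row group 1 in the test above.
    let values: LargeBinaryArray = [Some(b"A".as_ref()), None, Some(b"Q".as_ref())]
        .into_iter()
        .collect();
    let batch = RecordBatch::try_from_iter([(
        "service_large_binary",
        Arc::new(values) as ArrayRef,
    )])
    .unwrap();

    // Write the batch to an in-memory parquet file.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Read back the per-row-group byte statistics that the new
    // DataType::LargeBinary arm converts into ScalarValue::LargeBinary.
    let reader = SerializedFileReader::new(Bytes::from(buf)).unwrap();
    for rg in reader.metadata().row_groups() {
        // Expected for this single row group: min b"A", max b"Q", one null.
        println!("{:?}", rg.column(0).statistics());
    }
}
```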
28 changes: 24 additions & 4 deletions datafusion/core/tests/parquet/arrow_statistics.rs
@@ -30,8 +30,8 @@ use arrow::datatypes::{
use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, RecordBatch,
StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray,
RecordBatch, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
@@ -1261,7 +1261,6 @@ async fn test_decimal() {
}
.run();
}

#[tokio::test]
async fn test_dictionary() {
let reader = TestReader {
@@ -1302,11 +1301,12 @@ async fn test_dictionary() {

#[tokio::test]
async fn test_byte() {
// This creates a parquet file of 4 columns
// This creates a parquet file of 5 columns
// "name"
// "service_string"
// "service_binary"
// "service_fixedsize"
// "service_large_binary"

// file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
let reader = TestReader {
@@ -1389,6 +1389,26 @@ async fn test_byte() {
column_name: "service_fixedsize",
}
.run();

let expected_service_large_binary_min_values: Vec<&[u8]> =
vec![b"frontend five", b"backend one", b"backend eight"];

let expected_service_large_binary_max_values: Vec<&[u8]> =
vec![b"frontend two", b"frontend six", b"backend six"];

Test {
reader: reader.build().await,
expected_min: Arc::new(LargeBinaryArray::from(
expected_service_large_binary_min_values,
)),
expected_max: Arc::new(LargeBinaryArray::from(
expected_service_large_binary_max_values,
)),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
column_name: "service_large_binary",
}
.run();
}

// PeriodsInColumnNames
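
One note on the expected values above: parquet computes binary min/max by unsigned byte-wise (lexicographic) comparison, which is why row group 3's minimum is `backend eight` and its maximum is `backend six`. A small sketch using the row-group-3 values that `create_data_batch` writes (shown in the next file); plain `&[u8]` ordering in Rust matches this:

```rust
fn main() {
    // The five values written into row group 3 below; &[u8] sorts
    // byte-wise, the same ordering parquet uses for binary statistics.
    let mut values: Vec<&[u8]> = vec![
        b"backend four",
        b"backend five",
        b"backend six",
        b"backend seven",
        b"backend eight",
    ];
    values.sort();
    assert_eq!(values[0], b"backend eight".as_ref()); // min
    assert_eq!(values[4], b"backend six".as_ref()); // max
}
```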
39 changes: 35 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
@@ -21,10 +21,10 @@ use arrow::{
array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray,
StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, StringArray, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::{DataType, Field, Int32Type, Int8Type, Schema},
record_batch::RecordBatch,
@@ -648,6 +648,8 @@ fn make_bytearray_batch(
string_values: Vec<&str>,
binary_values: Vec<&[u8]>,
fixedsize_values: Vec<&[u8; 3]>,
// i64 offset.
large_binary_values: Vec<&[u8]>,
) -> RecordBatch {
let num_rows = string_values.len();
let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect();
@@ -658,6 +660,8 @@ fn make_bytearray_batch(
.map(|value| Some(value.as_slice()))
.collect::<Vec<_>>()
.into();
let service_large_binary: LargeBinaryArray =
large_binary_values.iter().map(Some).collect();

let schema = Schema::new(vec![
Field::new("name", name.data_type().clone(), true),
@@ -669,6 +673,11 @@
service_fixedsize.data_type().clone(),
true,
),
Field::new(
"service_large_binary",
service_large_binary.data_type().clone(),
true,
),
]);
let schema = Arc::new(schema);

@@ -679,6 +688,7 @@
Arc::new(service_string),
Arc::new(service_binary),
Arc::new(service_fixedsize),
Arc::new(service_large_binary),
],
)
.unwrap()
Expand Down Expand Up @@ -1000,6 +1010,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
b"frontend five",
],
vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
vec![
b"frontend one",
b"frontend two",
b"frontend three",
b"frontend seven",
b"frontend five",
],
),
make_bytearray_batch(
"mixed",
@@ -1018,6 +1035,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
b"backend three",
],
vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
vec![
b"frontend six",
b"frontend four",
b"backend one",
b"backend two",
b"backend three",
],
),
make_bytearray_batch(
"all backends",
@@ -1036,6 +1060,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
b"backend eight",
],
vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
vec![
b"backend four",
b"backend five",
b"backend six",
b"backend seven",
b"backend eight",
],
),
]
}
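
The helper change above threads the fifth column through both the schema and the column list. Here is a reduced sketch of the same construction with only the new column; the two rows are illustrative, and the field name is reused from the test:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, LargeBinaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // LargeBinary differs from Binary only in offset width (i64 vs i32),
    // so the field must be declared as DataType::LargeBinary.
    let large_binary_values: Vec<&[u8]> = vec![b"frontend one", b"frontend two"];
    let service_large_binary: LargeBinaryArray =
        large_binary_values.into_iter().map(Some).collect();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "service_large_binary",
        DataType::LargeBinary,
        true,
    )]));

    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(service_large_binary) as ArrayRef])
            .unwrap();
    assert_eq!(batch.num_rows(), 2);
}
```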