diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 037a881c7e1..f4d2a79fd8d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -33,14 +33,14 @@ use arrow::array::{ use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, - Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, + Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DateUnit, DurationMicrosecondType as ArrowDurationMicrosecondType, DurationMillisecondType as ArrowDurationMillisecondType, DurationNanosecondType as ArrowDurationNanosecondType, DurationSecondType as ArrowDurationSecondType, Field, Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type, - Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, Schema, + Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema, Time32MillisecondType as ArrowTime32MillisecondType, Time32SecondType as ArrowTime32SecondType, Time64MicrosecondType as ArrowTime64MicrosecondType, @@ -57,7 +57,9 @@ use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, - Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, + Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, + IntervalDayTimeConverter, IntervalYearMonthArrayConverter, + IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; @@ -333,11 +335,23 @@ impl ArrayReader for PrimitiveArrayReader { }; // cast to Arrow type - // TODO: we need to check if it's fine for this to be fallible. - // My assumption is that we can't get to an illegal cast as we can only - // generate types that are supported, because we'd have gotten them from - // the metadata which was written to the Parquet sink - let array = arrow::compute::cast(&array, self.get_data_type())?; + // We make a strong assumption here that the casts should be infallible. + // If the cast fails because of incompatible datatypes, then there might + // be a bigger problem with how Arrow schemas are converted to Parquet. + // + // As there is not always a 1:1 mapping between Arrow and Parquet, there + // are datatypes which we must convert explicitly. + // These are: + // - date64: we should cast int32 to date32, then date32 to date64. + let target_type = self.get_data_type(); + let array = match target_type { + ArrowType::Date64(_) => { + // this is cheap as it internally reinterprets the data + let a = arrow::compute::cast(&array, &ArrowType::Date32(DateUnit::Day))?; + arrow::compute::cast(&a, target_type)? + } + _ => arrow::compute::cast(&array, target_type)?, + }; // save definition and repetition buffers self.def_levels_buffer = self.record_reader.consume_def_levels()?; @@ -1570,28 +1584,92 @@ impl<'a> ArrayReaderBuilder { )?)) } PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let byte_width = match *cur_type { - Type::PrimitiveType { - ref type_length, .. - } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) + if cur_type.get_basic_info().logical_type() == LogicalType::INTERVAL { + let byte_width = match *cur_type { + Type::PrimitiveType { + ref type_length, .. + } => *type_length, + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + if byte_width != 12 { + return Err(ArrowError(format!( + "Parquet interval type should have length of 12, found {}", + byte_width + ))); } - }; - let converter = FixedLenBinaryConverter::new( - FixedSizeArrayConverter::new(byte_width), - ); - Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - FixedLenBinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) + match arrow_type { + Some(ArrowType::Interval(IntervalUnit::DayTime)) => { + let converter = IntervalDayTimeConverter::new( + IntervalDayTimeArrayConverter {}, + ); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + IntervalDayTimeConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + Some(ArrowType::Interval(IntervalUnit::YearMonth)) => { + let converter = IntervalYearMonthConverter::new( + IntervalYearMonthArrayConverter {}, + ); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + IntervalYearMonthConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + Some(t) => Err(ArrowError(format!( + "Cannot write a Parquet interval to {:?}", + t + ))), + None => { + // we do not support an interval not matched to an Arrow type, + // because we risk data loss as we won't know which of the 12 bytes + // are or should be populated + Err(ArrowError( + "Cannot write a Parquet interval with no Arrow type specified. + There is a risk of data loss as Arrow either supports YearMonth or + DayTime precision. Without the Arrow type, we cannot infer the type. + ".to_string() + )) + } + } + } else { + let byte_width = match *cur_type { + Type::PrimitiveType { + ref type_length, .. + } => *type_length, + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + let converter = FixedLenBinaryConverter::new( + FixedSizeArrayConverter::new(byte_width), + ); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + FixedLenBinaryConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } } } } diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index 5aba18ea7fa..127ab832183 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -235,7 +235,8 @@ impl ParquetRecordBatchReader { mod tests { use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader}; use crate::arrow::converter::{ - Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter, + Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, + Utf8ArrayConverter, }; use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ @@ -249,9 +250,7 @@ mod tests { use crate::schema::parser::parse_message_type; use crate::schema::types::TypePtr; use crate::util::test_common::{get_temp_filename, RandGen}; - use arrow::array::{ - Array, BooleanArray, FixedSizeBinaryArray, StringArray, StructArray, - }; + use arrow::array::*; use arrow::record_batch::RecordBatchReader; use rand::RngCore; use serde_json::json; @@ -362,6 +361,23 @@ mod tests { >(20, message_type, &converter); } + #[test] + fn test_interval_day_time_column_reader() { + let message_type = " + message test_schema { + REQUIRED FIXED_LEN_BYTE_ARRAY (12) leaf (INTERVAL); + } + "; + + let converter = IntervalDayTimeArrayConverter {}; + run_single_column_reader_tests::< + FixedLenByteArrayType, + IntervalDayTimeArray, + IntervalDayTimeArrayConverter, + RandFixedLenGen, + >(12, message_type, &converter); + } + struct RandUtf8Gen {} impl RandGen for RandUtf8Gen { diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index ca78ffc957b..7dd2da2153a 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::array as arrow_array; -use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; +use arrow::datatypes::{DataType as ArrowDataType, DateUnit, IntervalUnit, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; @@ -217,15 +217,20 @@ fn write_leaf( let indices = filter_array_indices(&levels); let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { - let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; + // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32 + let array = if let ArrowDataType::Date64(_) = column.data_type() { + let array = + arrow::compute::cast(column, &ArrowDataType::Date32(DateUnit::Day))?; + Arc::new(arrow_array::Int32Array::from(array.data())) + } else { + arrow::compute::cast(column, &ArrowDataType::Int32)? + }; let array = array .as_any() .downcast_ref::() .expect("Unable to get int32 array"); - // assigning values to make it easier to debug - let slice = get_numeric_array_slice::(&array, &indices); typed.write_batch( - slice.as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -300,8 +305,43 @@ fn write_leaf( } _ => unreachable!("Currently unreachable because data type not supported"), }, - ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => { - unreachable!("Currently unreachable because data type not supported") + ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => { + let bytes = match column.data_type() { + ArrowDataType::Interval(interval_unit) => match interval_unit { + IntervalUnit::YearMonth => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + get_interval_ym_array_slice(&array, &indices) + } + IntervalUnit::DayTime => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + get_interval_dt_array_slice(&array, &indices) + } + }, + ArrowDataType::FixedSizeBinary(_) => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + get_fsb_array_slice(&array, &indices) + } + _ => { + return Err(ParquetError::NYI( + "Attempting to write an Arrow type that is not yet implemented" + .to_string(), + )); + } + }; + typed.write_batch( + bytes.as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? } }; Ok(written as i64) @@ -358,6 +398,51 @@ fn get_bool_array_slice( values } +/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each). +/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated. +fn get_interval_ym_array_slice( + array: &arrow_array::IntervalYearMonthArray, + indices: &[usize], +) -> Vec { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + let mut value = array.value(*i).to_le_bytes().to_vec(); + let mut suffix = vec![0; 8]; + value.append(&mut suffix); + values.push(FixedLenByteArray::from(ByteArray::from(value))) + } + values +} + +/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each). +/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated. +fn get_interval_dt_array_slice( + array: &arrow_array::IntervalDayTimeArray, + indices: &[usize], +) -> Vec { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + let mut prefix = vec![0; 4]; + let mut value = array.value(*i).to_le_bytes().to_vec(); + prefix.append(&mut value); + debug_assert_eq!(prefix.len(), 12); + values.push(FixedLenByteArray::from(ByteArray::from(prefix))); + } + values +} + +fn get_fsb_array_slice( + array: &arrow_array::FixedSizeBinaryArray, + indices: &[usize], +) -> Vec { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + let value = array.value(*i).to_vec(); + values.push(FixedLenByteArray::from(ByteArray::from(value))) + } + values +} + /// Given a level's information, calculate the offsets required to index an array /// correctly. fn filter_array_indices(level: &LevelInfo) -> Vec { @@ -955,10 +1040,10 @@ mod tests { } #[test] - #[ignore] // Date support isn't correct yet fn date64_single_column() { + // Date64 must be a multiple of 86400000, see ARROW-10925 required_and_optional::( - 0..SMALL_SIZE as i64, + (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000), "date64_single_column", ); } @@ -1032,7 +1117,6 @@ mod tests { } #[test] - #[should_panic(expected = "Currently unreachable because data type not supported")] fn interval_year_month_single_column() { required_and_optional::( 0..SMALL_SIZE as i32, @@ -1041,7 +1125,6 @@ mod tests { } #[test] - #[should_panic(expected = "Currently unreachable because data type not supported")] fn interval_day_time_single_column() { required_and_optional::( 0..SMALL_SIZE as i64, diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 2c109bcfbb5..47ed88189d5 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -19,11 +19,13 @@ use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96}; // TODO: clean up imports (best done when there are few moving parts) use arrow::array::{ Array, ArrayRef, BinaryBuilder, DecimalBuilder, FixedSizeBinaryBuilder, - LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, - StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, + IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray, + IntervalYearMonthBuilder, LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder, + PrimitiveDictionaryBuilder, StringBuilder, StringDictionaryBuilder, + TimestampNanosecondBuilder, }; use arrow::compute::cast; -use std::convert::From; +use std::convert::{From, TryInto}; use std::sync::Arc; use crate::errors::Result; @@ -116,6 +118,55 @@ impl Converter>, DecimalArray> for DecimalArrayCon Ok(builder.finish()) } } +/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval, +/// and interprets it as an i32 value representing the Arrow YearMonth value +pub struct IntervalYearMonthArrayConverter {} + +impl Converter>, IntervalYearMonthArray> + for IntervalYearMonthArrayConverter +{ + fn convert( + &self, + source: Vec>, + ) -> Result { + let mut builder = IntervalYearMonthBuilder::new(source.len()); + for v in source { + match v { + Some(array) => builder.append_value(i32::from_le_bytes( + array.data()[0..4].try_into().unwrap(), + )), + None => builder.append_null(), + }? + } + + Ok(builder.finish()) + } +} + +/// An Arrow Interval converter, which reads the last 8 bytes of a Parquet interval, +/// and interprets it as an i32 value representing the Arrow DayTime value +pub struct IntervalDayTimeArrayConverter {} + +impl Converter>, IntervalDayTimeArray> + for IntervalDayTimeArrayConverter +{ + fn convert( + &self, + source: Vec>, + ) -> Result { + let mut builder = IntervalDayTimeBuilder::new(source.len()); + for v in source { + match v { + Some(array) => builder.append_value(i64::from_le_bytes( + array.data()[4..12].try_into().unwrap(), + )), + None => builder.append_null(), + }? + } + + Ok(builder.finish()) + } +} pub struct Int96ArrayConverter {} @@ -322,11 +373,22 @@ pub type PrimitiveDictionaryConverter = ArrayRefConverter< pub type Int96Converter = ArrayRefConverter>, TimestampNanosecondArray, Int96ArrayConverter>; + pub type FixedLenBinaryConverter = ArrayRefConverter< Vec>, FixedSizeBinaryArray, FixedSizeArrayConverter, >; +pub type IntervalYearMonthConverter = ArrayRefConverter< + Vec>, + IntervalYearMonthArray, + IntervalYearMonthArrayConverter, +>; +pub type IntervalDayTimeConverter = ArrayRefConverter< + Vec>, + IntervalDayTimeArray, + IntervalDayTimeArrayConverter, +>; pub type DecimalConverter = ArrayRefConverter< Vec>, diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index f8bda5e5b59..6a30539e2ec 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -26,7 +26,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit}; +use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, Schema, TimeUnit}; use arrow::ipc::writer; use crate::basic::{LogicalType, Repetition, Type as PhysicalType}; @@ -368,6 +368,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_logical_type(LogicalType::DATE) .with_repetition(repetition) .build(), + // date64 is cast to date32 DataType::Date64(_) => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(LogicalType::DATE) .with_repetition(repetition) @@ -606,33 +607,43 @@ impl ParquetTypeConverter<'_> { } fn from_fixed_len_byte_array(&self) -> Result { - if self.schema.get_basic_info().logical_type() == LogicalType::DECIMAL { - let (precision, scale) = match self.schema { - Type::PrimitiveType { - ref precision, - ref scale, - .. - } => (*precision, *scale), - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) - } - }; - return Ok(DataType::Decimal(precision as usize, scale as usize)); - } - let byte_width = match self.schema { - Type::PrimitiveType { - ref type_length, .. - } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) + match self.schema.get_basic_info().logical_type() { + LogicalType::DECIMAL => { + let (precision, scale) = match self.schema { + Type::PrimitiveType { + ref precision, + ref scale, + .. + } => (*precision, *scale), + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + Ok(DataType::Decimal(precision as usize, scale as usize)) + } + LogicalType::INTERVAL => { + // There is currently no reliable way of determining which IntervalUnit + // to return. Thus without the original Arrow schema, the results + // would be incorrect if all 12 bytes of the interval are populated + Ok(DataType::Interval(IntervalUnit::DayTime)) } - }; + _ => { + let byte_width = match self.schema { + Type::PrimitiveType { + ref type_length, .. + } => *type_length, + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; - Ok(DataType::FixedSizeBinary(byte_width)) + Ok(DataType::FixedSizeBinary(byte_width)) + } + } } fn from_byte_array(&self) -> Result {