diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index d8cb480a80c..a1142b481d5 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -327,11 +327,27 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { // temporal casts (Int32, Date32(_)) => cast_array_data::(array, to_type.clone()), - (Int32, Time32(_)) => cast_array_data::(array, to_type.clone()), + (Int32, Time32(unit)) => match unit { + TimeUnit::Second => { + cast_array_data::(array, to_type.clone()) + } + TimeUnit::Millisecond => { + cast_array_data::(array, to_type.clone()) + } + _ => unreachable!(), + }, (Date32(_), Int32) => cast_array_data::(array, to_type.clone()), (Time32(_), Int32) => cast_array_data::(array, to_type.clone()), (Int64, Date64(_)) => cast_array_data::(array, to_type.clone()), - (Int64, Time64(_)) => cast_array_data::(array, to_type.clone()), + (Int64, Time64(unit)) => match unit { + TimeUnit::Microsecond => { + cast_array_data::(array, to_type.clone()) + } + TimeUnit::Nanosecond => { + cast_array_data::(array, to_type.clone()) + } + _ => unreachable!(), + }, (Date64(_), Int64) => cast_array_data::(array, to_type.clone()), (Time64(_), Int64) => cast_array_data::(array, to_type.clone()), (Date32(DateUnit::Day), Date64(DateUnit::Millisecond)) => { diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 8f429bf1eb2..a02b6c44dd9 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -334,7 +334,9 @@ pub(crate) fn build_field<'a: 'b, 'b>( let mut field_builder = ipc::FieldBuilder::new(fbb); field_builder.add_name(fb_field_name); - fb_dictionary.map(|dictionary| field_builder.add_dictionary(dictionary)); + if let Some(dictionary) = fb_dictionary { + field_builder.add_dictionary(dictionary) + } field_builder.add_type_type(field_type.type_type); field_builder.add_nullable(field.is_nullable()); match field_type.children { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 14bf7d287a3..4fbc54d209d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -35,9 +35,10 @@ use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, Converter, Date32Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter, - Int8Converter, Int96ArrayConverter, Int96Converter, TimestampMicrosecondConverter, - TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter, - UInt8Converter, Utf8ArrayConverter, Utf8Converter, + Int8Converter, Int96ArrayConverter, Int96Converter, Time32MillisecondConverter, + Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter, + TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter, + UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -196,11 +197,27 @@ impl ArrayReader for PrimitiveArrayReader { .convert(self.record_reader.cast::()), _ => Err(general_err!("No conversion from parquet type to arrow type for date with unit {:?}", unit)), } - (ArrowType::Time32(_), PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::()) + (ArrowType::Time32(unit), PhysicalType::INT32) => { + match unit { + TimeUnit::Second => { 
+ Time32SecondConverter::new().convert(self.record_reader.cast::()) + } + TimeUnit::Millisecond => { + Time32MillisecondConverter::new().convert(self.record_reader.cast::()) + } + _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) + } } - (ArrowType::Time64(_), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::()) + (ArrowType::Time64(unit), PhysicalType::INT64) => { + match unit { + TimeUnit::Microsecond => { + Time64MicrosecondConverter::new().convert(self.record_reader.cast::()) + } + TimeUnit::Nanosecond => { + Time64NanosecondConverter::new().convert(self.record_reader.cast::()) + } + _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) + } } (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { UInt32Converter::new().convert(self.record_reader.cast::()) @@ -941,10 +958,12 @@ mod tests { use crate::util::test_common::{get_test_file, make_pages}; use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; use arrow::datatypes::{ - DataType as ArrowType, Date32Type as ArrowDate32, Field, Int32Type as ArrowInt32, + ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, + Int32Type as ArrowInt32, Int64Type as ArrowInt64, + Time32MillisecondType as ArrowTime32MillisecondArray, + Time64MicrosecondType as ArrowTime64MicrosecondArray, TimestampMicrosecondType as ArrowTimestampMicrosecondType, TimestampMillisecondType as ArrowTimestampMillisecondType, - UInt32Type as ArrowUInt32, UInt64Type as ArrowUInt64, }; use rand::distributions::uniform::SampleUniform; use rand::{thread_rng, Rng}; @@ -1101,7 +1120,7 @@ mod tests { } macro_rules! test_primitive_array_reader_one_type { - ($arrow_parquet_type:ty, $physical_type:expr, $logical_type_str:expr, $result_arrow_type:ty, $result_primitive_type:ty) => {{ + ($arrow_parquet_type:ty, $physical_type:expr, $logical_type_str:expr, $result_arrow_type:ty, $result_arrow_cast_type:ty, $result_primitive_type:ty) => {{ let message_type = format!( " message test_schema {{ @@ -1112,7 +1131,7 @@ mod tests { ); let schema = parse_message_type(&message_type) .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) - .unwrap(); + .expect("Unable to parse message type into a schema descriptor"); let column_desc = schema.column(0); @@ -1142,24 +1161,48 @@ mod tests { Box::new(page_iterator), column_desc.clone(), ) - .unwrap(); + .expect("Unable to get array reader"); - let array = array_reader.next_batch(50).unwrap(); + let array = array_reader + .next_batch(50) + .expect("Unable to get batch from reader"); + let result_data_type = <$result_arrow_type>::get_data_type(); let array = array .as_any() .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::<$result_arrow_type>::from( - data[0..50] - .iter() - .map(|x| *x as $result_primitive_type) - .collect::>() - ), - array + .expect( + format!( + "Unable to downcast {:?} to {:?}", + array.data_type(), + result_data_type + ) + .as_str(), + ); + + // create expected array as primitive, and cast to result type + let expected = PrimitiveArray::<$result_arrow_cast_type>::from( + data[0..50] + .iter() + .map(|x| *x as $result_primitive_type) + .collect::>(), ); + let expected = Arc::new(expected) as ArrayRef; + let expected = arrow::compute::cast(&expected, &result_data_type) + .expect("Unable to cast expected array"); + assert_eq!(expected.data_type(), &result_data_type); + let expected = expected + .as_any() + 
.downcast_ref::>() + .expect( + format!( + "Unable to downcast expected {:?} to {:?}", + expected.data_type(), + result_data_type + ) + .as_str(), + ); + assert_eq!(expected, array); } }}; } @@ -1171,27 +1214,31 @@ mod tests { PhysicalType::INT32, "DATE", ArrowDate32, + ArrowInt32, i32 ); test_primitive_array_reader_one_type!( Int32Type, PhysicalType::INT32, "TIME_MILLIS", - ArrowUInt32, - u32 + ArrowTime32MillisecondArray, + ArrowInt32, + i32 ); test_primitive_array_reader_one_type!( Int64Type, PhysicalType::INT64, "TIME_MICROS", - ArrowUInt64, - u64 + ArrowTime64MicrosecondArray, + ArrowInt64, + i64 ); test_primitive_array_reader_one_type!( Int64Type, PhysicalType::INT64, "TIMESTAMP_MILLIS", ArrowTimestampMillisecondType, + ArrowInt64, i64 ); test_primitive_array_reader_one_type!( @@ -1199,6 +1246,7 @@ mod tests { PhysicalType::INT64, "TIMESTAMP_MICROS", ArrowTimestampMicrosecondType, + ArrowInt64, i64 ); } diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index b654de1ad0a..d3922ecaba2 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -19,7 +19,9 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader}; use crate::arrow::schema::parquet_to_arrow_schema; -use crate::arrow::schema::parquet_to_arrow_schema_by_columns; +use crate::arrow::schema::{ + parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns, +}; use crate::errors::{ParquetError, Result}; use crate::file::reader::FileReader; use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; @@ -40,7 +42,12 @@ pub trait ArrowReader { /// Read parquet schema and convert it into arrow schema. /// This schema only includes columns identified by `column_indices`. - fn get_schema_by_columns(&mut self, column_indices: T) -> Result + /// To select leaf columns (i.e. 
`a.b.c` instead of `a`), set `leaf_columns = true` + fn get_schema_by_columns( + &mut self, + column_indices: T, + leaf_columns: bool, + ) -> Result where T: IntoIterator; @@ -84,16 +91,28 @@ impl ArrowReader for ParquetFileArrowReader { ) } - fn get_schema_by_columns(&mut self, column_indices: T) -> Result + fn get_schema_by_columns( + &mut self, + column_indices: T, + leaf_columns: bool, + ) -> Result where T: IntoIterator, { let file_metadata = self.file_reader.metadata().file_metadata(); - parquet_to_arrow_schema_by_columns( - file_metadata.schema_descr(), - column_indices, - file_metadata.key_value_metadata(), - ) + if leaf_columns { + parquet_to_arrow_schema_by_columns( + file_metadata.schema_descr(), + column_indices, + file_metadata.key_value_metadata(), + ) + } else { + parquet_to_arrow_schema_by_root_columns( + file_metadata.schema_descr(), + column_indices, + file_metadata.key_value_metadata(), + ) + } } fn get_record_reader( diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index e0ad207b4dc..cf7b9a22a5c 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -136,7 +136,6 @@ fn write_leaves( | ArrowDataType::UInt16 | ArrowDataType::UInt32 | ArrowDataType::UInt64 - | ArrowDataType::Float16 | ArrowDataType::Float32 | ArrowDataType::Float64 | ArrowDataType::Timestamp(_, _) @@ -176,6 +175,9 @@ fn write_leaves( } Ok(()) } + ArrowDataType::Float16 => Err(ParquetError::ArrowError( + "Float16 arrays not supported".to_string(), + )), ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Null | ArrowDataType::Boolean @@ -493,7 +495,7 @@ mod tests { use arrow::array::*; use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::{RecordBatch, RecordBatchReader}; + use arrow::record_batch::RecordBatch; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; use crate::file::{metadata::KeyValue, reader::SerializedFileReader}; @@ -597,7 +599,7 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); - let batch = record_batch_reader.next_batch().unwrap().unwrap(); + let batch = record_batch_reader.next().unwrap().unwrap(); let string_col = batch .column(0) .as_any() @@ -688,4 +690,409 @@ mod tests { writer.write(&batch).unwrap(); writer.close().unwrap(); } + + const SMALL_SIZE: usize = 100; + + fn roundtrip(filename: &str, expected_batch: RecordBatch) { + let file = get_temp_file(filename, &[]); + + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + expected_batch.schema(), + None, + ) + .unwrap(); + writer.write(&expected_batch).unwrap(); + writer.close().unwrap(); + + let reader = SerializedFileReader::new(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); + let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + + let actual_batch = record_batch_reader.next().unwrap().unwrap(); + + assert_eq!(expected_batch.schema(), actual_batch.schema()); + assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); + assert_eq!(expected_batch.num_rows(), actual_batch.num_rows()); + for i in 0..expected_batch.num_columns() { + let expected_data = expected_batch.column(i).data(); + let actual_data = actual_batch.column(i).data(); + + assert_eq!(expected_data.data_type(), actual_data.data_type()); + assert_eq!(expected_data.len(), actual_data.len()); + 
assert_eq!(expected_data.null_count(), actual_data.null_count()); + assert_eq!(expected_data.offset(), actual_data.offset()); + assert_eq!(expected_data.buffers(), actual_data.buffers()); + assert_eq!(expected_data.child_data(), actual_data.child_data()); + assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap()); + } + } + + fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) { + let schema = Schema::new(vec![Field::new( + "col", + values.data_type().clone(), + nullable, + )]); + let expected_batch = + RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); + + roundtrip(filename, expected_batch); + } + + fn values_required(iter: I, filename: &str) + where + A: From> + Array + 'static, + I: IntoIterator, + { + let raw_values: Vec<_> = iter.into_iter().collect(); + let values = Arc::new(A::from(raw_values)); + one_column_roundtrip(filename, values, false); + } + + fn values_optional(iter: I, filename: &str) + where + A: From>> + Array + 'static, + I: IntoIterator, + { + let optional_raw_values: Vec<_> = iter + .into_iter() + .enumerate() + .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) }) + .collect(); + let optional_values = Arc::new(A::from(optional_raw_values)); + one_column_roundtrip(filename, optional_values, true); + } + + fn required_and_optional(iter: I, filename: &str) + where + A: From> + From>> + Array + 'static, + I: IntoIterator + Clone, + { + values_required::(iter.clone(), filename); + values_optional::(iter, filename); + } + + #[test] + #[should_panic(expected = "Null arrays not supported")] + fn null_single_column() { + let values = Arc::new(NullArray::new(SMALL_SIZE)); + one_column_roundtrip("null_single_column", values.clone(), true); + one_column_roundtrip("null_single_column", values, false); + } + + #[test] + #[should_panic( + expected = "Attempting to write an Arrow type that is not yet implemented" + )] + fn bool_single_column() { + required_and_optional::( + [true, false].iter().cycle().copied().take(SMALL_SIZE), + "bool_single_column", + ); + } + + #[test] + fn i8_single_column() { + required_and_optional::(0..SMALL_SIZE as i8, "i8_single_column"); + } + + #[test] + fn i16_single_column() { + required_and_optional::(0..SMALL_SIZE as i16, "i16_single_column"); + } + + #[test] + fn i32_single_column() { + required_and_optional::(0..SMALL_SIZE as i32, "i32_single_column"); + } + + #[test] + fn i64_single_column() { + required_and_optional::(0..SMALL_SIZE as i64, "i64_single_column"); + } + + #[test] + fn u8_single_column() { + required_and_optional::(0..SMALL_SIZE as u8, "u8_single_column"); + } + + #[test] + fn u16_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u16, + "u16_single_column", + ); + } + + #[test] + fn u32_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u32, + "u32_single_column", + ); + } + + #[test] + fn u64_single_column() { + required_and_optional::( + 0..SMALL_SIZE as u64, + "u64_single_column", + ); + } + + #[test] + fn f32_single_column() { + required_and_optional::( + (0..SMALL_SIZE).map(|i| i as f32), + "f32_single_column", + ); + } + + #[test] + fn f64_single_column() { + required_and_optional::( + (0..SMALL_SIZE).map(|i| i as f64), + "f64_single_column", + ); + } + + // The timestamp array types don't implement From> because they need the timezone + // argument, and they also doesn't support building from a Vec>, so call + // one_column_roundtrip manually instead of calling required_and_optional for these tests. 
+ + #[test] + #[ignore] // Timestamp support isn't correct yet + fn timestamp_second_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_second_single_column", values, false); + } + + #[test] + fn timestamp_millisecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_millisecond_single_column", values, false); + } + + #[test] + fn timestamp_microsecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_microsecond_single_column", values, false); + } + + #[test] + #[ignore] // Timestamp support isn't correct yet + fn timestamp_nanosecond_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); + let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None)); + + one_column_roundtrip("timestamp_nanosecond_single_column", values, false); + } + + #[test] + fn date32_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "date32_single_column", + ); + } + + #[test] + #[ignore] // Date support isn't correct yet + fn date64_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "date64_single_column", + ); + } + + #[test] + #[ignore] // DateUnit resolution mismatch + fn time32_second_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "time32_second_single_column", + ); + } + + #[test] + fn time32_millisecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "time32_millisecond_single_column", + ); + } + + #[test] + fn time64_microsecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "time64_microsecond_single_column", + ); + } + + #[test] + #[ignore] // DateUnit resolution mismatch + fn time64_nanosecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "time64_nanosecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_second_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_second_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_millisecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_millisecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_microsecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_microsecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Converting Duration to parquet not supported")] + fn duration_nanosecond_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + "duration_nanosecond_single_column", + ); + } + + #[test] + #[should_panic(expected = "Currently unreachable because data type not supported")] + fn interval_year_month_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i32, + "interval_year_month_single_column", + ); + } + + #[test] + #[should_panic(expected = "Currently unreachable because data type not supported")] + fn interval_day_time_single_column() { + required_and_optional::( + 0..SMALL_SIZE as i64, + 
"interval_day_time_single_column", + ); + } + + #[test] + #[ignore] // Binary support isn't correct yet - null_bitmap doesn't match + fn binary_single_column() { + let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); + let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); + let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); + + // BinaryArrays can't be built from Vec>, so only call `values_required` + values_required::(many_vecs_iter, "binary_single_column"); + } + + #[test] + #[ignore] // Large Binary support isn't correct yet + fn large_binary_single_column() { + let one_vec: Vec = (0..SMALL_SIZE as u8).collect(); + let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); + let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); + + // LargeBinaryArrays can't be built from Vec>, so only call `values_required` + values_required::( + many_vecs_iter, + "large_binary_single_column", + ); + } + + #[test] + #[ignore] // String support isn't correct yet - null_bitmap doesn't match + fn string_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); + let raw_strs = raw_values.iter().map(|s| s.as_str()); + + required_and_optional::(raw_strs, "string_single_column"); + } + + #[test] + #[ignore] // Large String support isn't correct yet - null_bitmap and buffers don't match + fn large_string_single_column() { + let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); + let raw_strs = raw_values.iter().map(|s| s.as_str()); + + required_and_optional::( + raw_strs, + "large_string_single_column", + ); + } + + #[test] + #[should_panic( + expected = "Reading parquet list array into arrow is not supported yet!" + )] + fn list_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let a_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = ListArray::from(a_list_data); + + let values = Arc::new(a); + one_column_roundtrip("list_single_column", values, false); + } + + #[test] + #[should_panic( + expected = "Reading parquet list array into arrow is not supported yet!" 
+ )] + fn large_list_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let a_value_offsets = + arrow::buffer::Buffer::from(&[0i64, 1, 3, 3, 6, 10].to_byte_slice()); + let a_list_data = + ArrayData::builder(DataType::LargeList(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = LargeListArray::from(a_list_data); + + let values = Arc::new(a); + one_column_roundtrip("large_list_single_column", values, false); + } + + #[test] + #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match + fn struct_single_column() { + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let struct_field_a = Field::new("f", DataType::Int32, false); + let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]); + + let values = Arc::new(s); + one_column_roundtrip("struct_single_column", values, false); + } } diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 9fbfa339168..c988aaeacfc 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -17,12 +17,19 @@ use crate::arrow::record_reader::RecordReader; use crate::data_type::{ByteArray, DataType, Int96}; -use arrow::array::{ - Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, - BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder, - TimestampNanosecondBuilder, +// TODO: clean up imports (best done when there are few moving parts) +use arrow::{ + array::{ + Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, + BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder, + TimestampNanosecondBuilder, + }, + datatypes::Time32MillisecondType, +}; +use arrow::{ + compute::cast, datatypes::Time32SecondType, datatypes::Time64MicrosecondType, + datatypes::Time64NanosecondType, }; -use arrow::compute::cast; use std::convert::From; use std::sync::Arc; @@ -226,6 +233,14 @@ pub type TimestampMillisecondConverter = CastConverter; pub type TimestampMicrosecondConverter = CastConverter; +pub type Time32SecondConverter = + CastConverter; +pub type Time32MillisecondConverter = + CastConverter; +pub type Time64MicrosecondConverter = + CastConverter; +pub type Time64NanosecondConverter = + CastConverter; pub type UInt64Converter = CastConverter; pub type Float32Converter = CastConverter; pub type Float64Converter = CastConverter; diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 2b012fb777e..979345722d2 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -35,7 +35,7 @@ //! //! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap()); //! println!("Arrow schema after projection is: {}", -//! arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap()); +//! arrow_reader.get_schema_by_columns(vec![2, 4, 6], true).unwrap()); //! //! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap(); //! 
@@ -61,6 +61,7 @@ pub use self::arrow_reader::ParquetFileArrowReader; pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + parquet_to_arrow_schema_by_root_columns, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index ccfdaf8f0e5..b30ab7760b2 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -86,6 +86,7 @@ impl<'a, T> FatPtr<'a, T> { self.ptr } + #[allow(clippy::wrong_self_convention)] fn to_slice_mut(&mut self) -> &mut [T] { self.ptr } diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 4a92a4642ef..0cd41fe5925 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -56,7 +56,61 @@ pub fn parquet_to_arrow_schema( } } -/// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns. +/// Convert parquet schema to arrow schema including optional metadata, +/// only preserving some root columns. +/// This is useful if we have columns `a.b`, `a.c.e` and `a.d`, +/// and want `a` with all its child fields +pub fn parquet_to_arrow_schema_by_root_columns( + parquet_schema: &SchemaDescriptor, + column_indices: T, + key_value_metadata: &Option>, +) -> Result +where + T: IntoIterator, +{ + // Reconstruct the index ranges of the parent columns + // An Arrow struct gets represented by 1+ columns based on how many child fields the + // struct has. This means that getting fields 1 and 2 might return the struct twice, + // if field 1 is the struct having say 3 fields, and field 2 is a primitive. + // + // The below gets the parent columns, and counts the number of child fields in each parent, + // such that we would end up with: + // - field 1 - columns: [0, 1, 2] + // - field 2 - columns: [3] + let mut parent_columns = vec![]; + let mut curr_name = ""; + let mut prev_name = ""; + let mut indices = vec![]; + (0..(parquet_schema.num_columns())).for_each(|i| { + let p_type = parquet_schema.get_column_root(i); + curr_name = p_type.get_basic_info().name(); + if prev_name == "" { + // first index + indices.push(i); + prev_name = curr_name; + } else if curr_name != prev_name { + prev_name = curr_name; + parent_columns.push((curr_name.to_string(), indices.clone())); + indices = vec![i]; + } else { + indices.push(i); + } + }); + // push the last column if indices has values + if !indices.is_empty() { + parent_columns.push((curr_name.to_string(), indices)); + } + + // gather the required leaf columns + let leaf_columns = column_indices + .into_iter() + .flat_map(|i| parent_columns[i].1.clone()); + + parquet_to_arrow_schema_by_columns(parquet_schema, leaf_columns, key_value_metadata) +} + +/// Convert parquet schema to arrow schema including optional metadata, +/// only preserving some leaf columns. 
pub fn parquet_to_arrow_schema_by_columns( parquet_schema: &SchemaDescriptor, column_indices: T, @@ -65,27 +119,56 @@ pub fn parquet_to_arrow_schema_by_columns( where T: IntoIterator, { + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let arrow_schema_metadata = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|encoded| get_arrow_schema_from_metadata(&encoded)) + .unwrap_or_default(); + + // add the Arrow metadata to the Parquet metadata + if let Some(arrow_schema) = &arrow_schema_metadata { + arrow_schema.metadata().iter().for_each(|(k, v)| { + metadata.insert(k.clone(), v.clone()); + }); + } + let mut base_nodes = Vec::new(); let mut base_nodes_set = HashSet::new(); let mut leaves = HashSet::new(); + enum FieldType<'a> { + Parquet(&'a Type), + Arrow(Field), + } + for c in column_indices { - let column = parquet_schema.column(c).self_type() as *const Type; - let root = parquet_schema.get_column_root(c); - let root_raw_ptr = root as *const Type; - - leaves.insert(column); - if !base_nodes_set.contains(&root_raw_ptr) { - base_nodes.push(root); - base_nodes_set.insert(root_raw_ptr); + let column = parquet_schema.column(c); + let name = column.name(); + + if let Some(field) = arrow_schema_metadata + .as_ref() + .and_then(|schema| schema.field_with_name(name).ok().cloned()) + { + base_nodes.push(FieldType::Arrow(field)); + } else { + let column = column.self_type() as *const Type; + let root = parquet_schema.get_column_root(c); + let root_raw_ptr = root as *const Type; + + leaves.insert(column); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(FieldType::Parquet(root)); + base_nodes_set.insert(root_raw_ptr); + } } } - let metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - base_nodes .into_iter() - .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) + .map(|t| match t { + FieldType::Parquet(t) => ParquetTypeConverter::new(t, &leaves).to_field(), + FieldType::Arrow(f) => Ok(Some(f)), + }) .collect::>>>() .map(|result| result.into_iter().filter_map(|f| f).collect::>()) .map(|fields| Schema::new_with_metadata(fields, metadata)) @@ -1367,21 +1450,21 @@ mod tests { Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), - Field::new( - "c22", - DataType::FixedSizeList(Box::new(DataType::Boolean), 5), - false, - ), - Field::new( - "c23", - DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( - vec![ - Field::new("a", DataType::Int16, true), - Field::new("b", DataType::Float64, false), - ], - ))))), - true, - ), + // Field::new( + // "c22", + // DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + // false, + // ), + // Field::new( + // "c23", + // DataType::List(Box::new(DataType::LargeList(Box::new( + // DataType::Struct(vec![ + // Field::new("a", DataType::Int16, true), + // Field::new("b", DataType::Float64, false), + // ]), + // )))), + // true, + // ), Field::new( "c24", DataType::Struct(vec![ @@ -1408,12 +1491,66 @@ mod tests { ), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), + // Field::new( + // "c34", + // DataType::LargeList(Box::new(DataType::List(Box::new( + // DataType::Struct(vec![ + // Field::new("a", DataType::Int16, true), + // Field::new("b", DataType::Float64, true), + // ]), + // )))), + // true, + // ), + ], + metadata, + ); + + // write to an empty parquet file so 
that schema is serialized + let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; + writer.close()?; + + // read file back + let parquet_reader = SerializedFileReader::try_from(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); + let read_schema = arrow_reader.get_schema()?; + assert_eq!(schema, read_schema); + + // read all fields by columns + let partial_read_schema = + arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?; + assert_eq!(schema, partial_read_schema); + + Ok(()) + } + + #[test] + #[ignore = "Roundtrip of lists currently fails because we don't check their types correctly in the Arrow schema"] + fn test_arrow_schema_roundtrip_lists() -> Result<()> { + let metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); + + let schema = Schema::new_with_metadata( + vec![ + Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), Field::new( - "c34", - DataType::LargeList(Box::new(DataType::LargeList(Box::new( + "c22", + DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + false, + ), + Field::new( + "c23", + DataType::List(Box::new(DataType::LargeList(Box::new( DataType::Struct(vec![ Field::new("a", DataType::Int16, true), - Field::new("b", DataType::Float64, true), + Field::new("b", DataType::Float64, false), ]), )))), true, @@ -1423,7 +1560,7 @@ mod tests { ); // write to an empty parquet file so that schema is serialized - let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let file = get_temp_file("test_arrow_schema_roundtrip_lists.parquet", &[]); let mut writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), @@ -1436,6 +1573,12 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); let read_schema = arrow_reader.get_schema()?; assert_eq!(schema, read_schema); + + // read all fields by columns + let partial_read_schema = + arrow_reader.get_schema_by_columns(0..(schema.fields().len()), false)?; + assert_eq!(schema, partial_read_schema); + Ok(()) } }
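
Both the cast.rs hunk and the PrimitiveArrayReader hunk above dispatch on TimeUnit so that 32-bit time columns are only ever produced with second/millisecond units and 64-bit time columns only with microsecond/nanosecond units, with every other pairing rejected rather than silently reinterpreted. A minimal, self-contained sketch of that dispatch (plain Rust, no arrow crate types; Storage and pick_time_type are illustrative names, not items from this diff):

    // Toy mirror of the unit dispatch: 32-bit storage may only carry
    // second/millisecond times, 64-bit storage only micro/nanosecond times.
    #[derive(Debug, Clone, Copy)]
    enum TimeUnit {
        Second,
        Millisecond,
        Microsecond,
        Nanosecond,
    }

    /// Width of the physical storage backing the time column.
    #[derive(Debug, Clone, Copy)]
    enum Storage {
        Int32,
        Int64,
    }

    /// Pick the concrete Arrow time type for a storage width + unit,
    /// or report the invalid combination.
    fn pick_time_type(storage: Storage, unit: TimeUnit) -> Result<&'static str, String> {
        match (storage, unit) {
            (Storage::Int32, TimeUnit::Second) => Ok("Time32(Second)"),
            (Storage::Int32, TimeUnit::Millisecond) => Ok("Time32(Millisecond)"),
            (Storage::Int64, TimeUnit::Microsecond) => Ok("Time64(Microsecond)"),
            (Storage::Int64, TimeUnit::Nanosecond) => Ok("Time64(Nanosecond)"),
            (s, u) => Err(format!("unsupported time unit {:?} for {:?} storage", u, s)),
        }
    }

    fn main() {
        for unit in [TimeUnit::Second, TimeUnit::Millisecond] {
            assert!(pick_time_type(Storage::Int32, unit).unwrap().starts_with("Time32"));
        }
        for unit in [TimeUnit::Microsecond, TimeUnit::Nanosecond] {
            assert!(pick_time_type(Storage::Int64, unit).unwrap().starts_with("Time64"));
        }
        // A nanosecond unit on 32-bit storage is exactly the pairing the
        // kernel refuses (`_ => unreachable!()` in cast.rs, general_err! in
        // the parquet array reader) instead of reinterpreting the values.
        assert!(pick_time_type(Storage::Int32, TimeUnit::Nanosecond).is_err());
    }

This is the same reason the new converters (Time32SecondConverter, Time32MillisecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter) come in one flavour per unit rather than one per storage width.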
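
parquet_to_arrow_schema_by_root_columns groups the Parquet leaf columns by their root field and then expands a selection of root indices into leaf indices before delegating to the by-leaf-columns conversion. A rough sketch of that grouping, assuming (as the loop in the diff also does) that leaves belonging to the same root are contiguous in schema order; group_by_root and leaves_for_roots are illustrative helpers, not functions from the crate:

    // Group leaf columns (given here just by the name of their root field,
    // in schema order) into (root_name, leaf_indices) pairs.
    fn group_by_root(leaf_roots: &[&str]) -> Vec<(String, Vec<usize>)> {
        let mut groups: Vec<(String, Vec<usize>)> = Vec::new();
        for (i, root) in leaf_roots.iter().copied().enumerate() {
            let starts_new_group = groups
                .last()
                .map_or(true, |(name, _)| name.as_str() != root);
            if starts_new_group {
                groups.push((root.to_string(), vec![i]));
            } else {
                groups.last_mut().unwrap().1.push(i);
            }
        }
        groups
    }

    /// Expand selected root-field indices into the underlying leaf-column indices.
    fn leaves_for_roots(
        groups: &[(String, Vec<usize>)],
        roots: impl IntoIterator<Item = usize>,
    ) -> Vec<usize> {
        roots.into_iter().flat_map(|r| groups[r].1.clone()).collect()
    }

    fn main() {
        // Root "a" is a struct backed by three leaves (a.b, a.c.e, a.d);
        // root "x" is a primitive backed by a single leaf.
        let groups = group_by_root(&["a", "a", "a", "x"]);
        assert_eq!(groups[0], ("a".to_string(), vec![0, 1, 2]));
        assert_eq!(groups[1], ("x".to_string(), vec![3]));

        // Selecting root field 0 ("a") pulls in all of its leaf columns,
        // which is what the by-leaf-columns conversion ultimately receives.
        assert_eq!(leaves_for_roots(&groups, vec![0]), vec![0, 1, 2]);
    }

This matches the example given in the function's comment: selecting field 1 (a struct with three children) and field 2 (a primitive) yields leaf columns [0, 1, 2] and [3] respectively.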
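
The build_field change in rust/arrow/src/ipc/convert.rs swaps fb_dictionary.map(...) for an if let. A small stand-alone illustration of the pattern, assuming the motivation is clippy's option_map_unit_fn lint (the diff itself does not state it); FieldBuilder here is a toy stand-in, not the flatbuffers-generated builder:

    // Stand-in builder with a side-effecting setter.
    struct FieldBuilder {
        dictionary: Option<u32>,
    }

    impl FieldBuilder {
        fn add_dictionary(&mut self, dictionary: u32) {
            self.dictionary = Some(dictionary);
        }
    }

    fn main() {
        let fb_dictionary: Option<u32> = Some(42);
        let mut field_builder = FieldBuilder { dictionary: None };

        // Before: `fb_dictionary.map(|d| field_builder.add_dictionary(d));`
        // builds an Option<()> only to discard it, which clippy flags.
        // After: the side effect is explicit and nothing is thrown away.
        if let Some(dictionary) = fb_dictionary {
            field_builder.add_dictionary(dictionary);
        }

        assert_eq!(field_builder.dictionary, Some(42));
    }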