From f11b3222035d7533f11555417a0102a365445504 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 13 Aug 2020 18:47:34 +0200 Subject: [PATCH 1/5] ARROW-8289: [Rust] Parquet Arrow writer with nested support **Note**: I started making changes to #6785, and ended up deviating a lot, so I opted for making a new draft PR in case my approach is not suitable. ___ This is a draft to implement an arrow writer for parquet. It supports the following (no complete test coverage yet): * writing primitives except for booleans and binary * nested structs * null values (via definition levels) It does not yet support: - Boolean arrays (have to be handled differently from numeric values) - Binary arrays - Dictionary arrays - Union arrays (are they even possible?) I have only added a test by creating a nested schema, which I tested on pyarrow. ```jupyter # schema of test_complex.parquet a: int32 not null b: int32 c: struct> not null child 0, d: double child 1, e: struct child 0, f: float ``` This PR potentially addresses: * https://issues.apache.org/jira/browse/ARROW-8289 * https://issues.apache.org/jira/browse/ARROW-8423 * https://issues.apache.org/jira/browse/ARROW-8424 * https://issues.apache.org/jira/browse/ARROW-8425 And I would like to propose either opening new JIRAs for the above incomplete items, or renaming the last 3 above. ___ **Help Needed** I'm implementing the definition and repetition levels on first principle from an old Parquet blog post from the Twitter engineering blog. It's likely that I'm not getting some concepts correct, so I would appreciate help with: * Checking if my logic is correct * Guidance or suggestions on how to more efficiently extract levels from arrays * Adding tests - I suspect we might need a lot of tests, so far we only test writing 1 batch, so I don't know how paging would work when writing a large enough file I also don't know if the various encoding levels (dictionary, RLE, etc.) and compression levels are applied automagically, or if that'd be something we need to explicitly enable. CC @sunchao @sadikovi @andygrove @paddyhoran Might be of interest to @mcassels @maxburke Closes #7319 from nevi-me/arrow-parquet-writer Lead-authored-by: Neville Dipale Co-authored-by: Max Burke Co-authored-by: Andy Grove Co-authored-by: Max Burke Signed-off-by: Neville Dipale --- rust/arrow/src/array/mod.rs | 2 +- rust/parquet/src/arrow/arrow_writer.rs | 682 +++++++++++++++++++++++++ rust/parquet/src/arrow/mod.rs | 5 +- rust/parquet/src/column/writer.rs | 2 - rust/parquet/src/schema/types.rs | 6 +- 5 files changed, 692 insertions(+), 5 deletions(-) create mode 100644 rust/parquet/src/arrow/arrow_writer.rs diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index 3abc1423e4a..68a2dd602d0 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -115,7 +115,7 @@ pub use self::array::StructArray; pub use self::null::NullArray; pub use self::union::UnionArray; -pub(crate) use self::array::make_array; +pub use self::array::make_array; pub type BooleanArray = PrimitiveArray; pub type Int8Array = PrimitiveArray; diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs new file mode 100644 index 00000000000..0c1c4903d16 --- /dev/null +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -0,0 +1,682 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains writer which writes arrow data into parquet data. + +use std::rc::Rc; + +use arrow::array as arrow_array; +use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; +use arrow::record_batch::RecordBatch; +use arrow_array::Array; + +use crate::column::writer::ColumnWriter; +use crate::errors::{ParquetError, Result}; +use crate::file::properties::WriterProperties; +use crate::{ + data_type::*, + file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, +}; + +/// Arrow writer +/// +/// Writes Arrow `RecordBatch`es to a Parquet writer +pub struct ArrowWriter { + /// Underlying Parquet writer + writer: SerializedFileWriter, + /// A copy of the Arrow schema. + /// + /// The schema is used to verify that each record batch written has the correct schema + arrow_schema: SchemaRef, +} + +impl ArrowWriter { + /// Try to create a new Arrow writer + /// + /// The writer will fail if: + /// * a `SerializedFileWriter` cannot be created from the ParquetWriter + /// * the Arrow schema contains unsupported datatypes such as Unions + pub fn try_new( + writer: W, + arrow_schema: SchemaRef, + props: Option>, + ) -> Result { + let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; + let props = match props { + Some(props) => props, + None => Rc::new(WriterProperties::builder().build()), + }; + let file_writer = SerializedFileWriter::new( + writer.try_clone()?, + schema.root_schema_ptr(), + props, + )?; + + Ok(Self { + writer: file_writer, + arrow_schema, + }) + } + + /// Write a RecordBatch to writer + /// + /// *NOTE:* The writer currently does not support all Arrow data types + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + // validate batch schema against writer's supplied schema + if self.arrow_schema != batch.schema() { + return Err(ParquetError::ArrowError( + "Record batch schema does not match writer schema".to_string(), + )); + } + // compute the definition and repetition levels of the batch + let mut levels = vec![]; + batch.columns().iter().for_each(|array| { + let mut array_levels = + get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None); + levels.append(&mut array_levels); + }); + // reverse levels so we can use Vec::pop(&mut self) + levels.reverse(); + + let mut row_group_writer = self.writer.next_row_group()?; + + // write leaves + for column in batch.columns() { + write_leaves(&mut row_group_writer, column, &mut levels)?; + } + + self.writer.close_row_group(row_group_writer) + } + + /// Close and finalise the underlying Parquet writer + pub fn close(&mut self) -> Result<()> { + self.writer.close() + } +} + +/// Convenience method to get the next ColumnWriter from the RowGroupWriter +#[inline] +#[allow(clippy::borrowed_box)] +fn get_col_writer( + row_group_writer: &mut Box, +) -> Result { + let col_writer = row_group_writer + .next_column()? 
+ .expect("Unable to get column writer"); + Ok(col_writer) +} + +#[allow(clippy::borrowed_box)] +fn write_leaves( + mut row_group_writer: &mut Box, + array: &arrow_array::ArrayRef, + mut levels: &mut Vec, +) -> Result<()> { + match array.data_type() { + ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) + | ArrowDataType::LargeBinary + | ArrowDataType::Binary + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 => { + let mut col_writer = get_col_writer(&mut row_group_writer)?; + write_leaf( + &mut col_writer, + array, + levels.pop().expect("Levels exhausted"), + )?; + row_group_writer.close_column(col_writer)?; + Ok(()) + } + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + // write the child list + let data = array.data(); + let child_array = arrow_array::make_array(data.child_data()[0].clone()); + write_leaves(&mut row_group_writer, &child_array, &mut levels)?; + Ok(()) + } + ArrowDataType::Struct(_) => { + let struct_array: &arrow_array::StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + for field in struct_array.columns() { + write_leaves(&mut row_group_writer, field, &mut levels)?; + } + Ok(()) + } + ArrowDataType::FixedSizeList(_, _) + | ArrowDataType::Null + | ArrowDataType::Boolean + | ArrowDataType::FixedSizeBinary(_) + | ArrowDataType::Union(_) + | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI( + "Attempting to write an Arrow type that is not yet implemented".to_string(), + )), + } +} + +fn write_leaf( + writer: &mut ColumnWriter, + column: &arrow_array::ArrayRef, + levels: Levels, +) -> Result { + let written = match writer { + ColumnWriter::Int32ColumnWriter(ref mut typed) => { + let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; + let array = array + .as_any() + .downcast_ref::() + .expect("Unable to get int32 array"); + typed.write_batch( + get_numeric_array_slice::(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ColumnWriter::BoolColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") + } + ColumnWriter::Int64ColumnWriter(ref mut typed) => { + let array = arrow_array::Int64Array::from(column.data()); + typed.write_batch( + get_numeric_array_slice::(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ColumnWriter::Int96ColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") + } + ColumnWriter::FloatColumnWriter(ref mut typed) => { + let array = arrow_array::Float32Array::from(column.data()); + typed.write_batch( + get_numeric_array_slice::(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ColumnWriter::DoubleColumnWriter(ref mut typed) => { + let array = arrow_array::Float64Array::from(column.data()); + typed.write_batch( + get_numeric_array_slice::(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? 
+ } + ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() { + ArrowDataType::Binary | ArrowDataType::Utf8 => { + let array = arrow_array::BinaryArray::from(column.data()); + typed.write_batch( + get_binary_array(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ArrowDataType::LargeBinary | ArrowDataType::LargeUtf8 => { + let array = arrow_array::LargeBinaryArray::from(column.data()); + typed.write_batch( + get_large_binary_array(&array).as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + _ => unreachable!("Currently unreachable because data type not supported"), + }, + ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") + } + }; + Ok(written as i64) +} + +/// A struct that represents definition and repetition levels. +/// Repetition levels are only populated if the parent or current leaf is repeated +#[derive(Debug)] +struct Levels { + definition: Vec, + repetition: Option>, +} + +/// Compute nested levels of the Arrow array, recursing into lists and structs +fn get_levels( + array: &arrow_array::ArrayRef, + level: i16, + parent_def_levels: &[i16], + parent_rep_levels: Option<&[i16]>, +) -> Vec { + match array.data_type() { + ArrowDataType::Null => unimplemented!(), + ArrowDataType::Boolean + | ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) + | ArrowDataType::Binary + | ArrowDataType::LargeBinary => vec![Levels { + definition: get_primitive_def_levels(array, parent_def_levels), + repetition: None, + }], + ArrowDataType::FixedSizeBinary(_) => unimplemented!(), + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + let array_data = array.data(); + let child_data = array_data.child_data().get(0).unwrap(); + // get offsets, accounting for large offsets if present + let offsets: Vec = { + if let ArrowDataType::LargeList(_) = array.data_type() { + unsafe { array_data.buffers()[0].typed_data::() }.to_vec() + } else { + let offsets = unsafe { array_data.buffers()[0].typed_data::() }; + offsets.to_vec().into_iter().map(|v| v as i64).collect() + } + }; + let child_array = arrow_array::make_array(child_data.clone()); + + let mut list_def_levels = Vec::with_capacity(child_array.len()); + let mut list_rep_levels = Vec::with_capacity(child_array.len()); + let rep_levels: Vec = parent_rep_levels + .map(|l| l.to_vec()) + .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]); + parent_def_levels + .iter() + .zip(rep_levels) + .zip(offsets.windows(2)) + .for_each(|((parent_def_level, parent_rep_level), window)| { + if *parent_def_level == 0 { + // parent is null, list element must also be null + list_def_levels.push(0); + list_rep_levels.push(0); + } else { + // parent is not null, check if list is empty or null + let start = window[0]; + let end = window[1]; + let len = end - start; + if len == 0 { + list_def_levels.push(*parent_def_level - 1); + list_rep_levels.push(parent_rep_level); + } else { + 
list_def_levels.push(*parent_def_level); + list_rep_levels.push(parent_rep_level); + for _ in 1..len { + list_def_levels.push(*parent_def_level); + list_rep_levels.push(parent_rep_level + 1); + } + } + } + }); + + // if datatype is a primitive, we can construct levels of the child array + match child_array.data_type() { + ArrowDataType::Null => unimplemented!(), + ArrowDataType::Boolean => unimplemented!(), + ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) => { + let def_levels = + get_primitive_def_levels(&child_array, &list_def_levels[..]); + vec![Levels { + definition: def_levels, + repetition: Some(list_rep_levels), + }] + } + ArrowDataType::Binary + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 => unimplemented!(), + ArrowDataType::FixedSizeBinary(_) => unimplemented!(), + ArrowDataType::LargeBinary => unimplemented!(), + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + // nested list + unimplemented!() + } + ArrowDataType::FixedSizeList(_, _) => unimplemented!(), + ArrowDataType::Struct(_) => get_levels( + array, + level + 1, // indicates a nesting level of 2 (list + struct) + &list_def_levels[..], + Some(&list_rep_levels[..]), + ), + ArrowDataType::Union(_) => unimplemented!(), + ArrowDataType::Dictionary(_, _) => unimplemented!(), + } + } + ArrowDataType::FixedSizeList(_, _) => unimplemented!(), + ArrowDataType::Struct(_) => { + let struct_array: &arrow_array::StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + let mut struct_def_levels = Vec::with_capacity(struct_array.len()); + for i in 0..array.len() { + struct_def_levels.push(level + struct_array.is_valid(i) as i16); + } + // trying to create levels for struct's fields + let mut struct_levels = vec![]; + struct_array.columns().into_iter().for_each(|col| { + let mut levels = + get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels); + struct_levels.append(&mut levels); + }); + struct_levels + } + ArrowDataType::Union(_) => unimplemented!(), + ArrowDataType::Dictionary(_, _) => unimplemented!(), + } +} + +/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null +/// In the case where the array in question is a child of either a list or struct, the levels +/// are incremented in accordance with the `level` parameter. +/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null +fn get_primitive_def_levels( + array: &arrow_array::ArrayRef, + parent_def_levels: &[i16], +) -> Vec { + let mut array_index = 0; + let max_def_level = parent_def_levels.iter().max().unwrap(); + let mut primitive_def_levels = vec![]; + parent_def_levels.iter().for_each(|def_level| { + if def_level < max_def_level { + primitive_def_levels.push(*def_level); + } else { + primitive_def_levels.push(def_level - array.is_null(array_index) as i16); + array_index += 1; + } + }); + primitive_def_levels +} + +macro_rules! 
def_get_binary_array_fn { + ($name:ident, $ty:ty) => { + fn $name(array: &$ty) -> Vec { + let mut values = Vec::with_capacity(array.len() - array.null_count()); + for i in 0..array.len() { + if array.is_valid(i) { + let bytes = ByteArray::from(array.value(i).to_vec()); + values.push(bytes); + } + } + values + } + }; +} + +def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray); +def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray); + +/// Get the underlying numeric array slice, skipping any null values. +/// If there are no null values, it might be quicker to get the slice directly instead of +/// calling this function. +fn get_numeric_array_slice(array: &arrow_array::PrimitiveArray) -> Vec +where + T: DataType, + A: arrow::datatypes::ArrowNumericType, + T::T: From, +{ + let mut values = Vec::with_capacity(array.len() - array.null_count()); + for i in 0..array.len() { + if array.is_valid(i) { + values.push(array.value(i).into()) + } + } + values +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::io::Seek; + use std::sync::Arc; + + use arrow::array::*; + use arrow::datatypes::ToByteSlice; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::{RecordBatch, RecordBatchReader}; + + use crate::arrow::{ArrowReader, ParquetFileArrowReader}; + use crate::file::reader::SerializedFileReader; + use crate::util::test_common::get_temp_file; + + #[test] + fn arrow_writer() { + // define schema + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, true), + ]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]); + + // build a record batch + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + ) + .unwrap(); + + let file = get_temp_file("test_arrow_writer.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + fn arrow_writer_list() { + // define schema + let schema = Schema::new(vec![Field::new( + "a", + DataType::List(Box::new(DataType::Int32)), + false, + )]); + + // create some data + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + // Construct a buffer for value offsets, for the nested array: + // [[false], [true, false], null, [true, false, true], [false, true, false, true]] + let a_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + + // Construct a list array from the above two + let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = ListArray::from(a_list_data); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + + let file = get_temp_file("test_arrow_writer_list.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + fn arrow_writer_binary() { + let string_field = Field::new("a", DataType::Utf8, false); + let binary_field = Field::new("b", DataType::Binary, false); + let schema = Schema::new(vec![string_field, binary_field]); + + let raw_string_values = vec!["foo", "bar", "baz", "quux"]; + let raw_binary_values = vec![ + b"foo".to_vec(), + 
b"bar".to_vec(), + b"baz".to_vec(), + b"quux".to_vec(), + ]; + let raw_binary_value_refs = raw_binary_values + .iter() + .map(|x| x.as_slice()) + .collect::>(); + + let string_values = StringArray::from(raw_string_values.clone()); + let binary_values = BinaryArray::from(raw_binary_value_refs); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(string_values), Arc::new(binary_values)], + ) + .unwrap(); + + let mut file = get_temp_file("test_arrow_writer.parquet", &[]); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + file.seek(std::io::SeekFrom::Start(0)).unwrap(); + let file_reader = SerializedFileReader::new(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader)); + let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + + let batch = record_batch_reader.next_batch().unwrap().unwrap(); + let string_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let binary_col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..batch.num_rows() { + assert_eq!(string_col.value(i), raw_string_values[i]); + assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice()); + } + } + + #[test] + fn arrow_writer_complex() { + // define schema + let struct_field_d = Field::new("d", DataType::Float64, true); + let struct_field_f = Field::new("f", DataType::Float32, true); + let struct_field_g = + Field::new("g", DataType::List(Box::new(DataType::Int16)), false); + let struct_field_e = Field::new( + "e", + DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]), + true, + ); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, true), + Field::new( + "c", + DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]), + false, + ), + ]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]); + let d = Float64Array::from(vec![None, None, None, Some(1.0), None]); + let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]); + + let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + // Construct a buffer for value offsets, for the nested array: + // [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]] + let g_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + + // Construct a list array from the above two + let g_list_data = ArrayData::builder(struct_field_g.data_type().clone()) + .len(5) + .add_buffer(g_value_offsets) + .add_child_data(g_value.data()) + .build(); + let g = ListArray::from(g_list_data); + + let e = StructArray::from(vec![ + (struct_field_f, Arc::new(f) as ArrayRef), + (struct_field_g, Arc::new(g) as ArrayRef), + ]); + + let c = StructArray::from(vec![ + (struct_field_d, Arc::new(d) as ArrayRef), + (struct_field_e, Arc::new(e) as ArrayRef), + ]); + + // build a record batch + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b), Arc::new(c)], + ) + .unwrap(); + + let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } +} diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 
02f50fd3a90..c8739c2e1f8 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -51,10 +51,13 @@ pub(in crate::arrow) mod array_reader; pub mod arrow_reader; +pub mod arrow_writer; pub(in crate::arrow) mod converter; pub(in crate::arrow) mod record_reader; pub mod schema; pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; -pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns}; +pub use self::schema::{ + arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, +}; diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index f26c37bc2a4..c4f60eed187 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -57,7 +57,6 @@ pub enum Level { macro_rules! gen_stats_section { ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{ let min = $min.as_ref().and_then(|v| { - println!("min: {:?} {}", &v.as_bytes(), v.as_bytes().len()); Some(read_num_bytes!( $physical_ty, v.as_bytes().len(), @@ -65,7 +64,6 @@ macro_rules! gen_stats_section { )) }); let max = $max.as_ref().and_then(|v| { - println!("max: {:?} {}", &v.as_bytes(), v.as_bytes().len()); Some(read_num_bytes!( $physical_ty, v.as_bytes().len(), diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs index e1227c283dc..cd748243772 100644 --- a/rust/parquet/src/schema/types.rs +++ b/rust/parquet/src/schema/types.rs @@ -788,7 +788,7 @@ impl SchemaDescriptor { result.clone() } - fn column_root_of(&self, i: usize) -> &Rc { + fn column_root_of(&self, i: usize) -> &TypePtr { assert!( i < self.leaves.len(), "Index out of bound: {} not in [0, {})", @@ -810,6 +810,10 @@ impl SchemaDescriptor { self.schema.as_ref() } + pub fn root_schema_ptr(&self) -> TypePtr { + self.schema.clone() + } + /// Returns schema name. pub fn name(&self) -> &str { self.schema.name() From 96a83e6097ca3228ed5b0aa5fc04ef37db8c9fb2 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 8 Aug 2020 19:52:51 +0200 Subject: [PATCH 2/5] ARROW-8243: [Rust] [Parquet] Serialize Arrow schema metadata This will allow preserving Arrow-specific metadata when writing or reading Parquet files created from C++ or Rust. If the schema can't be deserialised, the normal Parquet > Arrow schema conversion is performed. 
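For context, the encoding used here follows the convention shared with the C++ implementation: the Arrow schema is serialized as an IPC flatbuffer message, framed with the 4-byte continuation marker and a little-endian length prefix, base64-encoded, and stored in the file's key-value metadata under the `ARROW:schema` key. A minimal standalone sketch of that framing is below; the helper name is illustrative and not part of this patch (the patch builds the same bytes inline in `ArrowWriter::try_new`, and a later commit moves it into `add_encoded_arrow_schema_to_metadata`), while `schema_to_bytes`, `base64::encode`, and `KeyValue` are the same calls the patch uses:

```rust
use arrow::datatypes::Schema;
use parquet::file::metadata::KeyValue;

/// Illustrative helper: encode an Arrow schema for storage in Parquet
/// key-value metadata under the "ARROW:schema" key.
fn encode_arrow_schema(schema: &Schema) -> KeyValue {
    // Serialize the schema to an Arrow IPC flatbuffer.
    let mut serialized = arrow::ipc::writer::schema_to_bytes(schema);
    let len = serialized.len();
    // Frame it as an encapsulated IPC message: 0xFFFFFFFF continuation
    // marker, little-endian message length, then the flatbuffer bytes.
    let mut framed = Vec::with_capacity(len + 8);
    framed.extend_from_slice(&[255u8, 255, 255, 255]);
    framed.extend_from_slice(&(len as u32).to_le_bytes());
    framed.append(&mut serialized);
    // Base64-encode so the bytes survive as a UTF-8 metadata value.
    KeyValue {
        key: "ARROW:schema".to_string(),
        value: Some(base64::encode(&framed)),
    }
}
```

On the read side, `get_arrow_schema_from_metadata` (added in the schema.rs changes below) base64-decodes the value, checks for the `0xFFFFFFFF` marker, skips the 8-byte prefix, and parses the remainder with `arrow::ipc::get_root_as_message`; if decoding fails, the reader falls back to the normal column-wise Parquet-to-Arrow schema conversion.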
--- rust/parquet/Cargo.toml | 3 +- rust/parquet/src/arrow/arrow_reader.rs | 165 ++++++++++++++++++++++++- rust/parquet/src/arrow/arrow_writer.rs | 56 ++++++++- rust/parquet/src/arrow/mod.rs | 4 + rust/parquet/src/arrow/schema.rs | 137 ++++++++++++-------- rust/parquet/src/file/properties.rs | 6 +- 6 files changed, 305 insertions(+), 66 deletions(-) diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml index 7fc5b8ac271..bec0c93a2dd 100644 --- a/rust/parquet/Cargo.toml +++ b/rust/parquet/Cargo.toml @@ -40,6 +40,7 @@ zstd = { version = "0.5", optional = true } chrono = "0.4" num-bigint = "0.3" arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT", optional = true } +base64 = { version = "*", optional = true } serde_json = { version = "1.0", features = ["preserve_order"] } [dev-dependencies] @@ -52,4 +53,4 @@ zstd = "0.5" arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT" } [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"] +default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index f052d0f36e5..c6d46f45235 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -204,8 +204,11 @@ impl ParquetRecordBatchReader { #[cfg(test)] mod tests { use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader}; - use crate::arrow::converter::{ - Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter, + use crate::arrow::{ + converter::{ + Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter, + }, + ArrowWriter, }; use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ @@ -217,11 +220,11 @@ mod tests { use crate::file::writer::{FileWriter, SerializedFileWriter}; use crate::schema::parser::parse_message_type; use crate::schema::types::TypePtr; - use crate::util::test_common::{get_temp_filename, RandGen}; + use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen}; use arrow::array::{ Array, BooleanArray, FixedSizeBinaryArray, StringArray, StructArray, }; - use arrow::record_batch::RecordBatchReader; + use arrow::{datatypes, record_batch::RecordBatchReader}; use rand::RngCore; use serde_json::json; use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject}; @@ -230,7 +233,7 @@ mod tests { use std::env; use std::fs::File; use std::path::{Path, PathBuf}; - use std::rc::Rc; + use std::{collections::HashMap, rc::Rc, sync::Arc}; #[test] fn test_arrow_reader_all_columns() { @@ -304,6 +307,158 @@ mod tests { >(2, 100, 2, message_type, 15, 50, converter); } + #[test] + fn test_arrow_schema_roundtrip() -> Result<()> { + // This tests the roundtrip of an Arrow schema + // Fields that are commented out fail roundtrip tests or are unsupported by the writer + let metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); + + let schema = datatypes::Schema::new_with_metadata( + vec![ + datatypes::Field::new("c1", datatypes::DataType::Utf8, false), + datatypes::Field::new("c2", datatypes::DataType::Binary, false), + datatypes::Field::new( + "c3", + datatypes::DataType::FixedSizeBinary(3), + false, + ), + datatypes::Field::new("c4", datatypes::DataType::Boolean, false), + datatypes::Field::new( + "c5", + datatypes::DataType::Date32(datatypes::DateUnit::Day), + false, + ), + datatypes::Field::new( + "c6", + datatypes::DataType::Date64(datatypes::DateUnit::Millisecond), + false, + ), + 
datatypes::Field::new( + "c7", + datatypes::DataType::Time32(datatypes::TimeUnit::Second), + false, + ), + datatypes::Field::new( + "c8", + datatypes::DataType::Time32(datatypes::TimeUnit::Millisecond), + false, + ), + datatypes::Field::new( + "c13", + datatypes::DataType::Time64(datatypes::TimeUnit::Microsecond), + false, + ), + datatypes::Field::new( + "c14", + datatypes::DataType::Time64(datatypes::TimeUnit::Nanosecond), + false, + ), + datatypes::Field::new( + "c15", + datatypes::DataType::Timestamp(datatypes::TimeUnit::Second, None), + false, + ), + datatypes::Field::new( + "c16", + datatypes::DataType::Timestamp( + datatypes::TimeUnit::Millisecond, + Some(Arc::new("UTC".to_string())), + ), + false, + ), + datatypes::Field::new( + "c17", + datatypes::DataType::Timestamp( + datatypes::TimeUnit::Microsecond, + Some(Arc::new("Africa/Johannesburg".to_string())), + ), + false, + ), + datatypes::Field::new( + "c18", + datatypes::DataType::Timestamp(datatypes::TimeUnit::Nanosecond, None), + false, + ), + // datatypes::Field::new("c19", datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), false), + // datatypes::Field::new("c20", datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), false), + datatypes::Field::new( + "c21", + datatypes::DataType::List(Box::new(datatypes::DataType::Boolean)), + false, + ), + datatypes::Field::new( + "c22", + datatypes::DataType::FixedSizeList( + Box::new(datatypes::DataType::Boolean), + 5, + ), + false, + ), + // datatypes::Field::new( + // "c23", + // datatypes::DataType::List(Box::new(datatypes::DataType::List(Box::new(datatypes::DataType::Struct( + // vec![], + // ))))), + // true, + // ), + datatypes::Field::new( + "c24", + datatypes::DataType::Struct(vec![ + datatypes::Field::new("a", datatypes::DataType::Utf8, false), + datatypes::Field::new("b", datatypes::DataType::UInt16, false), + ]), + false, + ), + // datatypes::Field::new("c25", datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), true), + // datatypes::Field::new("c26", datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), true), + // datatypes::Field::new("c27", datatypes::DataType::Duration(datatypes::TimeUnit::Second), false), + // datatypes::Field::new("c28", datatypes::DataType::Duration(datatypes::TimeUnit::Millisecond), false), + // datatypes::Field::new("c29", datatypes::DataType::Duration(datatypes::TimeUnit::Microsecond), false), + // datatypes::Field::new("c30", datatypes::DataType::Duration(datatypes::TimeUnit::Nanosecond), false), + // datatypes::Field::new_dict( + // "c31", + // datatypes::DataType::Dictionary( + // Box::new(datatypes::DataType::Int32), + // Box::new(datatypes::DataType::Utf8), + // ), + // true, + // 123, + // true, + // ), + datatypes::Field::new("c32", datatypes::DataType::LargeBinary, true), + datatypes::Field::new("c33", datatypes::DataType::LargeUtf8, true), + // datatypes::Field::new( + // "c34", + // datatypes::DataType::LargeList(Box::new(datatypes::DataType::LargeList(Box::new( + // datatypes::DataType::Struct(vec![]), + // )))), + // true, + // ), + ], + metadata, + ); + + // write to an empty parquet file so that schema is serialized + let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; + writer.close()?; + + // read file back + let parquet_reader = SerializedFileReader::try_from(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); + let 
read_schema = arrow_reader.get_schema()?; + assert_eq!(schema, read_schema); + Ok(()) + } + struct RandFixedLenGen {} impl RandGen for RandFixedLenGen { diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 0c1c4903d16..d8531430090 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -29,7 +29,10 @@ use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; use crate::{ data_type::*, - file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, + file::{ + metadata::KeyValue, + writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, + }, }; /// Arrow writer @@ -53,13 +56,48 @@ impl ArrowWriter { pub fn try_new( writer: W, arrow_schema: SchemaRef, - props: Option>, + props: Option, ) -> Result { let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; + // add serialized arrow schema + let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&arrow_schema); + let schema_len = serialized_schema.len(); + let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); + len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); + len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); + len_prefix_schema.append(&mut serialized_schema); + let encoded = base64::encode(&len_prefix_schema); + let schema_kv = KeyValue { + key: super::ARROW_SCHEMA_META_KEY.to_string(), + value: Some(encoded), + }; let props = match props { - Some(props) => props, - None => Rc::new(WriterProperties::builder().build()), + Some(mut props) => { + let mut meta = props.key_value_metadata.clone().unwrap_or_default(); + // check if ARROW:schema exists, and overwrite it + let schema_meta = meta + .iter() + .enumerate() + .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY); + match schema_meta { + Some((i, _)) => { + meta.remove(i); + meta.push(schema_kv); + } + None => { + meta.push(schema_kv); + } + } + props.key_value_metadata = Some(meta); + Rc::new(props) + } + None => Rc::new( + WriterProperties::builder() + .set_key_value_metadata(Some(vec![schema_kv])) + .build(), + ), }; + let file_writer = SerializedFileWriter::new( writer.try_clone()?, schema.root_schema_ptr(), @@ -674,8 +712,16 @@ mod tests { ) .unwrap(); + let props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![KeyValue { + key: "test_key".to_string(), + value: Some("test_value".to_string()), + }])) + .build(); + let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); - let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + let mut writer = + ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index c8739c2e1f8..2bdb07cfbbb 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -58,6 +58,10 @@ pub mod schema; pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; +pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, }; + +/// Schema metadata key used to store serialized Arrow IPC schema +pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema"; diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index c31f9db3583..dee6cb10fb7 100644 --- 
a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -55,32 +55,67 @@ pub fn parquet_to_arrow_schema_by_columns( where T: IntoIterator, { - let mut base_nodes = Vec::new(); - let mut base_nodes_set = HashSet::new(); - let mut leaves = HashSet::new(); - - for c in column_indices { - let column = parquet_schema.column(c).self_type() as *const Type; - let root = parquet_schema.get_column_root(c); - let root_raw_ptr = root as *const Type; - - leaves.insert(column); - if !base_nodes_set.contains(&root_raw_ptr) { - base_nodes.push(root); - base_nodes_set.insert(root_raw_ptr); + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let arrow_schema_metadata = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|encoded| get_arrow_schema_from_metadata(&encoded)); + + match arrow_schema_metadata { + Some(Some(schema)) => Ok(schema), + _ => { + let mut base_nodes = Vec::new(); + let mut base_nodes_set = HashSet::new(); + let mut leaves = HashSet::new(); + + for c in column_indices { + let column = parquet_schema.column(c).self_type() as *const Type; + let root = parquet_schema.get_column_root(c); + let root_raw_ptr = root as *const Type; + + leaves.insert(column); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(root); + base_nodes_set.insert(root_raw_ptr); + } + } + base_nodes + .into_iter() + .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) + .collect::>>>() + .map(|result| { + result.into_iter().filter_map(|f| f).collect::>() + }) + .map(|fields| Schema::new_with_metadata(fields, metadata)) } } +} - let metadata = parse_key_value_metadata(key_value_metadata) - .map(|m| m.clone()) - .unwrap_or(HashMap::default()); - - base_nodes - .into_iter() - .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) - .collect::>>>() - .map(|result| result.into_iter().filter_map(|f| f).collect::>()) - .map(|fields| Schema::new_with_metadata(fields, metadata)) +/// Try to convert to Arrow schema +fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { + let decoded = base64::decode(encoded_meta); + match decoded { + Ok(bytes) => { + let slice = if bytes[0..4] == [255u8; 4] { + &bytes[8..] + } else { + bytes.as_slice() + }; + let message = arrow::ipc::get_root_as_message(slice); + message + .header_as_schema() + .map(arrow::ipc::convert::fb_to_schema) + } + Err(err) => { + // The C++ implementation returns an error if the schema can't be parsed. + // To prevent this, we explicitly log this, then compute the schema without the metadata + eprintln!( + "Unable to decode the encoded schema stored in {}, {:?}", + super::ARROW_SCHEMA_META_KEY, + err + ); + None + } + } } /// Convert arrow schema to parquet schema @@ -88,7 +123,7 @@ pub fn arrow_to_parquet_schema(schema: &Schema) -> Result { let fields: Result> = schema .fields() .iter() - .map(|field| arrow_to_parquet_type(field).map(|f| Rc::new(f))) + .map(|field| arrow_to_parquet_type(field).map(Rc::new)) .collect(); let group = Type::group_type_builder("arrow_schema") .with_fields(&mut fields?) 
@@ -220,42 +255,43 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_length(3) .build() } - DataType::Binary => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) - .with_repetition(repetition) - .build(), + DataType::Binary | DataType::LargeBinary => { + Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) + .with_repetition(repetition) + .build() + } DataType::FixedSizeBinary(length) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_repetition(repetition) .with_length(*length) .build() } - DataType::Utf8 => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) - .with_logical_type(LogicalType::UTF8) - .with_repetition(repetition) - .build(), - DataType::List(dtype) | DataType::FixedSizeList(dtype, _) => { - Type::group_type_builder(name) - .with_fields(&mut vec![Rc::new( - Type::group_type_builder("list") - .with_fields(&mut vec![Rc::new({ - let list_field = Field::new( - "element", - *dtype.clone(), - field.is_nullable(), - ); - arrow_to_parquet_type(&list_field)? - })]) - .with_repetition(Repetition::REPEATED) - .build()?, - )]) - .with_logical_type(LogicalType::LIST) - .with_repetition(Repetition::REQUIRED) + DataType::Utf8 | DataType::LargeUtf8 => { + Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) + .with_logical_type(LogicalType::UTF8) + .with_repetition(repetition) .build() } + DataType::List(dtype) + | DataType::FixedSizeList(dtype, _) + | DataType::LargeList(dtype) => Type::group_type_builder(name) + .with_fields(&mut vec![Rc::new( + Type::group_type_builder("list") + .with_fields(&mut vec![Rc::new({ + let list_field = + Field::new("element", *dtype.clone(), field.is_nullable()); + arrow_to_parquet_type(&list_field)? + })]) + .with_repetition(Repetition::REPEATED) + .build()?, + )]) + .with_logical_type(LogicalType::LIST) + .with_repetition(Repetition::REQUIRED) + .build(), DataType::Struct(fields) => { // recursively convert children to types/nodes let fields: Result> = fields - .into_iter() + .iter() .map(|f| arrow_to_parquet_type(f).map(Rc::new)) .collect(); Type::group_type_builder(name) @@ -269,9 +305,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result { let dict_field = Field::new(name, *value.clone(), field.is_nullable()); arrow_to_parquet_type(&dict_field) } - DataType::LargeUtf8 | DataType::LargeBinary | DataType::LargeList(_) => { - Err(ArrowError("Large arrays not supported".to_string())) - } } } /// This struct is used to group methods and data structures used to convert parquet diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs index 81d739b3b09..5eff262b2f1 100644 --- a/rust/parquet/src/file/properties.rs +++ b/rust/parquet/src/file/properties.rs @@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc; /// Writer properties. /// -/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to -/// assemble the properties. +/// All properties except the key-value metadata are immutable, +/// use [`WriterPropertiesBuilder`] to assemble these properties. 
#[derive(Debug, Clone)] pub struct WriterProperties { data_pagesize_limit: usize, @@ -99,7 +99,7 @@ pub struct WriterProperties { max_row_group_size: usize, writer_version: WriterVersion, created_by: String, - key_value_metadata: Option>, + pub(crate) key_value_metadata: Option>, default_column_properties: ColumnProperties, column_properties: HashMap, } From 57628f4f6e6d1096b218ba9f6f4ca8a16b8a5331 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Fri, 14 Aug 2020 16:11:31 +0200 Subject: [PATCH 3/5] various changes - fix failing writer test - only convert schema if there is no projection - fix fsl conversion error - return descriptive error if empty arrow struct encountered --- rust/parquet/src/arrow/arrow_reader.rs | 72 ++++++++++++----- rust/parquet/src/arrow/arrow_writer.rs | 2 +- rust/parquet/src/arrow/schema.rs | 103 ++++++++++++++----------- 3 files changed, 115 insertions(+), 62 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index c6d46f45235..eeb94e826e0 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -383,8 +383,16 @@ mod tests { datatypes::DataType::Timestamp(datatypes::TimeUnit::Nanosecond, None), false, ), - // datatypes::Field::new("c19", datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), false), - // datatypes::Field::new("c20", datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), false), + datatypes::Field::new( + "c19", + datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), + false, + ), + datatypes::Field::new( + "c20", + datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), + false, + ), datatypes::Field::new( "c21", datatypes::DataType::List(Box::new(datatypes::DataType::Boolean)), @@ -398,13 +406,20 @@ mod tests { ), false, ), - // datatypes::Field::new( - // "c23", - // datatypes::DataType::List(Box::new(datatypes::DataType::List(Box::new(datatypes::DataType::Struct( - // vec![], - // ))))), - // true, - // ), + datatypes::Field::new( + "c23", + datatypes::DataType::List(Box::new(datatypes::DataType::List( + Box::new(datatypes::DataType::Struct(vec![ + datatypes::Field::new("a", datatypes::DataType::Int16, true), + datatypes::Field::new( + "b", + datatypes::DataType::Float64, + false, + ), + ])), + ))), + true, + ), datatypes::Field::new( "c24", datatypes::DataType::Struct(vec![ @@ -413,8 +428,16 @@ mod tests { ]), false, ), - // datatypes::Field::new("c25", datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), true), - // datatypes::Field::new("c26", datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), true), + datatypes::Field::new( + "c25", + datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), + true, + ), + datatypes::Field::new( + "c26", + datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), + true, + ), // datatypes::Field::new("c27", datatypes::DataType::Duration(datatypes::TimeUnit::Second), false), // datatypes::Field::new("c28", datatypes::DataType::Duration(datatypes::TimeUnit::Millisecond), false), // datatypes::Field::new("c29", datatypes::DataType::Duration(datatypes::TimeUnit::Microsecond), false), @@ -431,13 +454,26 @@ mod tests { // ), datatypes::Field::new("c32", datatypes::DataType::LargeBinary, true), datatypes::Field::new("c33", datatypes::DataType::LargeUtf8, true), - // datatypes::Field::new( - // "c34", - // datatypes::DataType::LargeList(Box::new(datatypes::DataType::LargeList(Box::new( - // datatypes::DataType::Struct(vec![]), - 
// )))), - // true, - // ), + datatypes::Field::new( + "c34", + datatypes::DataType::LargeList(Box::new( + datatypes::DataType::LargeList(Box::new( + datatypes::DataType::Struct(vec![ + datatypes::Field::new( + "a", + datatypes::DataType::Int16, + true, + ), + datatypes::Field::new( + "b", + datatypes::DataType::Float64, + true, + ), + ]), + )), + )), + true, + ), ], metadata, ); diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index d8531430090..cf05158593a 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -622,7 +622,7 @@ mod tests { ) .unwrap(); - let mut file = get_temp_file("test_arrow_writer.parquet", &[]); + let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]); let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None) .unwrap(); diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index dee6cb10fb7..590bce10069 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -34,16 +34,26 @@ use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr}; use arrow::datatypes::TimeUnit; use arrow::datatypes::{DataType, DateUnit, Field, Schema}; -/// Convert parquet schema to arrow schema including optional metadata. +/// Convert Parquet schema to Arrow schema including optional metadata. +/// Attempts to decode any existing Arrow shcema metadata, falling back +/// to converting the Parquet schema column-wise pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, - metadata: &Option>, + key_value_metadata: &Option>, ) -> Result { - parquet_to_arrow_schema_by_columns( - parquet_schema, - 0..parquet_schema.columns().len(), - metadata, - ) + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let arrow_schema_metadata = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|encoded| get_arrow_schema_from_metadata(&encoded)); + + match arrow_schema_metadata { + Some(Some(schema)) => Ok(schema), + _ => parquet_to_arrow_schema_by_columns( + parquet_schema, + 0..parquet_schema.columns().len(), + key_value_metadata, + ), + } } /// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns. 
@@ -55,42 +65,33 @@ pub fn parquet_to_arrow_schema_by_columns( where T: IntoIterator, { - let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - let arrow_schema_metadata = metadata - .remove(super::ARROW_SCHEMA_META_KEY) - .map(|encoded| get_arrow_schema_from_metadata(&encoded)); + let mut base_nodes = Vec::new(); + let mut base_nodes_set = HashSet::new(); + let mut leaves = HashSet::new(); - match arrow_schema_metadata { - Some(Some(schema)) => Ok(schema), - _ => { - let mut base_nodes = Vec::new(); - let mut base_nodes_set = HashSet::new(); - let mut leaves = HashSet::new(); - - for c in column_indices { - let column = parquet_schema.column(c).self_type() as *const Type; - let root = parquet_schema.get_column_root(c); - let root_raw_ptr = root as *const Type; - - leaves.insert(column); - if !base_nodes_set.contains(&root_raw_ptr) { - base_nodes.push(root); - base_nodes_set.insert(root_raw_ptr); - } - } - base_nodes - .into_iter() - .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) - .collect::>>>() - .map(|result| { - result.into_iter().filter_map(|f| f).collect::>() - }) - .map(|fields| Schema::new_with_metadata(fields, metadata)) + for c in column_indices { + let column = parquet_schema.column(c).self_type() as *const Type; + let root = parquet_schema.get_column_root(c); + let root_raw_ptr = root as *const Type; + + leaves.insert(column); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(root); + base_nodes_set.insert(root_raw_ptr); } } + + let metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + + base_nodes + .into_iter() + .map(|t| ParquetTypeConverter::new(t, &leaves).to_field()) + .collect::>>>() + .map(|result| result.into_iter().filter_map(|f| f).collect::>()) + .map(|fields| Schema::new_with_metadata(fields, metadata)) } -/// Try to convert to Arrow schema +/// Try to convert Arrow schema metadata into a schema fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { let decoded = base64::decode(encoded_meta); match decoded { @@ -252,7 +253,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_logical_type(LogicalType::INTERVAL) .with_repetition(repetition) - .with_length(3) + .with_length(12) .build() } DataType::Binary | DataType::LargeBinary => { @@ -289,6 +290,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(Repetition::REQUIRED) .build(), DataType::Struct(fields) => { + if fields.is_empty() { + return Err(ArrowError( + "Parquet does not support writing empty structs".to_string(), + )); + } // recursively convert children to types/nodes let fields: Result> = fields .iter() @@ -464,9 +470,9 @@ impl ParquetTypeConverter<'_> { ref type_length, .. } => *type_length, _ => { - return Err(ArrowError(format!( - "Expected a physical type, not a group type" - ))) + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) } }; @@ -548,7 +554,7 @@ impl ParquetTypeConverter<'_> { let item_type = match list_item.as_ref() { Type::PrimitiveType { .. } => { if item_converter.is_repeated() { - item_converter.to_primitive_type_inner().map(|dt| Some(dt)) + item_converter.to_primitive_type_inner().map(Some) } else { Err(ArrowError( "Primitive element type of list must be repeated." 
@@ -1229,6 +1235,17 @@ mod tests { }); } + #[test] + #[should_panic(expected = "Parquet does not support writing empty structs")] + fn test_empty_struct_field() { + let arrow_fields = vec![Field::new("struct", DataType::Struct(vec![]), false)]; + let arrow_schema = Schema::new(arrow_fields); + let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema); + + assert!(converted_arrow_schema.is_err()); + converted_arrow_schema.unwrap(); + } + #[test] fn test_metadata() { let message_type = " From e8a43e0909884d7e5d67bb42d40e917cd6b2354d Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Fri, 14 Aug 2020 19:55:31 +0200 Subject: [PATCH 4/5] move changes made to schema.rs --- rust/parquet/src/arrow/arrow_reader.rs | 201 +------------------------ rust/parquet/src/arrow/arrow_writer.rs | 49 +----- rust/parquet/src/arrow/mod.rs | 3 +- rust/parquet/src/arrow/schema.rs | 175 ++++++++++++++++++++- 4 files changed, 180 insertions(+), 248 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index eeb94e826e0..f052d0f36e5 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -204,11 +204,8 @@ impl ParquetRecordBatchReader { #[cfg(test)] mod tests { use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader}; - use crate::arrow::{ - converter::{ - Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter, - }, - ArrowWriter, + use crate::arrow::converter::{ + Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter, }; use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ @@ -220,11 +217,11 @@ mod tests { use crate::file::writer::{FileWriter, SerializedFileWriter}; use crate::schema::parser::parse_message_type; use crate::schema::types::TypePtr; - use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen}; + use crate::util::test_common::{get_temp_filename, RandGen}; use arrow::array::{ Array, BooleanArray, FixedSizeBinaryArray, StringArray, StructArray, }; - use arrow::{datatypes, record_batch::RecordBatchReader}; + use arrow::record_batch::RecordBatchReader; use rand::RngCore; use serde_json::json; use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject}; @@ -233,7 +230,7 @@ mod tests { use std::env; use std::fs::File; use std::path::{Path, PathBuf}; - use std::{collections::HashMap, rc::Rc, sync::Arc}; + use std::rc::Rc; #[test] fn test_arrow_reader_all_columns() { @@ -307,194 +304,6 @@ mod tests { >(2, 100, 2, message_type, 15, 50, converter); } - #[test] - fn test_arrow_schema_roundtrip() -> Result<()> { - // This tests the roundtrip of an Arrow schema - // Fields that are commented out fail roundtrip tests or are unsupported by the writer - let metadata: HashMap = - [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); - - let schema = datatypes::Schema::new_with_metadata( - vec![ - datatypes::Field::new("c1", datatypes::DataType::Utf8, false), - datatypes::Field::new("c2", datatypes::DataType::Binary, false), - datatypes::Field::new( - "c3", - datatypes::DataType::FixedSizeBinary(3), - false, - ), - datatypes::Field::new("c4", datatypes::DataType::Boolean, false), - datatypes::Field::new( - "c5", - datatypes::DataType::Date32(datatypes::DateUnit::Day), - false, - ), - datatypes::Field::new( - "c6", - datatypes::DataType::Date64(datatypes::DateUnit::Millisecond), - false, - ), - datatypes::Field::new( - "c7", - datatypes::DataType::Time32(datatypes::TimeUnit::Second), - false, 
- ), - datatypes::Field::new( - "c8", - datatypes::DataType::Time32(datatypes::TimeUnit::Millisecond), - false, - ), - datatypes::Field::new( - "c13", - datatypes::DataType::Time64(datatypes::TimeUnit::Microsecond), - false, - ), - datatypes::Field::new( - "c14", - datatypes::DataType::Time64(datatypes::TimeUnit::Nanosecond), - false, - ), - datatypes::Field::new( - "c15", - datatypes::DataType::Timestamp(datatypes::TimeUnit::Second, None), - false, - ), - datatypes::Field::new( - "c16", - datatypes::DataType::Timestamp( - datatypes::TimeUnit::Millisecond, - Some(Arc::new("UTC".to_string())), - ), - false, - ), - datatypes::Field::new( - "c17", - datatypes::DataType::Timestamp( - datatypes::TimeUnit::Microsecond, - Some(Arc::new("Africa/Johannesburg".to_string())), - ), - false, - ), - datatypes::Field::new( - "c18", - datatypes::DataType::Timestamp(datatypes::TimeUnit::Nanosecond, None), - false, - ), - datatypes::Field::new( - "c19", - datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), - false, - ), - datatypes::Field::new( - "c20", - datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), - false, - ), - datatypes::Field::new( - "c21", - datatypes::DataType::List(Box::new(datatypes::DataType::Boolean)), - false, - ), - datatypes::Field::new( - "c22", - datatypes::DataType::FixedSizeList( - Box::new(datatypes::DataType::Boolean), - 5, - ), - false, - ), - datatypes::Field::new( - "c23", - datatypes::DataType::List(Box::new(datatypes::DataType::List( - Box::new(datatypes::DataType::Struct(vec![ - datatypes::Field::new("a", datatypes::DataType::Int16, true), - datatypes::Field::new( - "b", - datatypes::DataType::Float64, - false, - ), - ])), - ))), - true, - ), - datatypes::Field::new( - "c24", - datatypes::DataType::Struct(vec![ - datatypes::Field::new("a", datatypes::DataType::Utf8, false), - datatypes::Field::new("b", datatypes::DataType::UInt16, false), - ]), - false, - ), - datatypes::Field::new( - "c25", - datatypes::DataType::Interval(datatypes::IntervalUnit::YearMonth), - true, - ), - datatypes::Field::new( - "c26", - datatypes::DataType::Interval(datatypes::IntervalUnit::DayTime), - true, - ), - // datatypes::Field::new("c27", datatypes::DataType::Duration(datatypes::TimeUnit::Second), false), - // datatypes::Field::new("c28", datatypes::DataType::Duration(datatypes::TimeUnit::Millisecond), false), - // datatypes::Field::new("c29", datatypes::DataType::Duration(datatypes::TimeUnit::Microsecond), false), - // datatypes::Field::new("c30", datatypes::DataType::Duration(datatypes::TimeUnit::Nanosecond), false), - // datatypes::Field::new_dict( - // "c31", - // datatypes::DataType::Dictionary( - // Box::new(datatypes::DataType::Int32), - // Box::new(datatypes::DataType::Utf8), - // ), - // true, - // 123, - // true, - // ), - datatypes::Field::new("c32", datatypes::DataType::LargeBinary, true), - datatypes::Field::new("c33", datatypes::DataType::LargeUtf8, true), - datatypes::Field::new( - "c34", - datatypes::DataType::LargeList(Box::new( - datatypes::DataType::LargeList(Box::new( - datatypes::DataType::Struct(vec![ - datatypes::Field::new( - "a", - datatypes::DataType::Int16, - true, - ), - datatypes::Field::new( - "b", - datatypes::DataType::Float64, - true, - ), - ]), - )), - )), - true, - ), - ], - metadata, - ); - - // write to an empty parquet file so that schema is serialized - let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); - let mut writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - 
)?; - writer.close()?; - - // read file back - let parquet_reader = SerializedFileReader::try_from(file)?; - let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); - let read_schema = arrow_reader.get_schema()?; - assert_eq!(schema, read_schema); - Ok(()) - } - struct RandFixedLenGen {} impl RandGen for RandFixedLenGen { diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index cf05158593a..2e01c4a6ba2 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -24,15 +24,13 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; +use super::add_encoded_arrow_schema_to_metadata; use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; use crate::{ data_type::*, - file::{ - metadata::KeyValue, - writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, - }, + file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, }; /// Arrow writer @@ -60,48 +58,13 @@ impl ArrowWriter { ) -> Result { let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; // add serialized arrow schema - let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&arrow_schema); - let schema_len = serialized_schema.len(); - let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); - len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); - len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); - len_prefix_schema.append(&mut serialized_schema); - let encoded = base64::encode(&len_prefix_schema); - let schema_kv = KeyValue { - key: super::ARROW_SCHEMA_META_KEY.to_string(), - value: Some(encoded), - }; - let props = match props { - Some(mut props) => { - let mut meta = props.key_value_metadata.clone().unwrap_or_default(); - // check if ARROW:schema exists, and overwrite it - let schema_meta = meta - .iter() - .enumerate() - .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY); - match schema_meta { - Some((i, _)) => { - meta.remove(i); - meta.push(schema_kv); - } - None => { - meta.push(schema_kv); - } - } - props.key_value_metadata = Some(meta); - Rc::new(props) - } - None => Rc::new( - WriterProperties::builder() - .set_key_value_metadata(Some(vec![schema_kv])) - .build(), - ), - }; + let mut props = props.unwrap_or_else(|| WriterProperties::builder().build()); + add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); let file_writer = SerializedFileWriter::new( writer.try_clone()?, schema.root_schema_ptr(), - props, + Rc::new(props), )?; Ok(Self { @@ -533,7 +496,7 @@ mod tests { use arrow::record_batch::{RecordBatch, RecordBatchReader}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::file::reader::SerializedFileReader; + use crate::file::{metadata::KeyValue, reader::SerializedFileReader}; use crate::util::test_common::get_temp_file; #[test] diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 2bdb07cfbbb..14531e55f15 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -60,7 +60,8 @@ pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ - arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, + 
parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 590bce10069..50500f15b88 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -26,14 +26,13 @@ use std::collections::{HashMap, HashSet}; use std::rc::Rc; +use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit}; + use crate::basic::{LogicalType, Repetition, Type as PhysicalType}; use crate::errors::{ParquetError::ArrowError, Result}; -use crate::file::metadata::KeyValue; +use crate::file::{metadata::KeyValue, properties::WriterProperties}; use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr}; -use arrow::datatypes::TimeUnit; -use arrow::datatypes::{DataType, DateUnit, Field, Schema}; - /// Convert Parquet schema to Arrow schema including optional metadata. /// Attempts to decode any existing Arrow shcema metadata, falling back /// to converting the Parquet schema column-wise @@ -119,6 +118,43 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { } } +/// Mutates writer metadata by encoding the Arrow schema and storing it in the metadata. +/// If there is an existing Arrow schema metadata, it is replaced. +pub fn add_encoded_arrow_schema_to_metadata( + schema: &Schema, + props: &mut WriterProperties, +) { + let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema); + let schema_len = serialized_schema.len(); + let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); + len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); + len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); + len_prefix_schema.append(&mut serialized_schema); + let encoded = base64::encode(&len_prefix_schema); + + let schema_kv = KeyValue { + key: super::ARROW_SCHEMA_META_KEY.to_string(), + value: Some(encoded), + }; + + let mut meta = props.key_value_metadata.clone().unwrap_or_default(); + // check if ARROW:schema exists, and overwrite it + let schema_meta = meta + .iter() + .enumerate() + .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY); + match schema_meta { + Some((i, _)) => { + meta.remove(i); + meta.push(schema_kv); + } + None => { + meta.push(schema_kv); + } + } + props.key_value_metadata = Some(meta); +} + /// Convert arrow schema to parquet schema pub fn arrow_to_parquet_schema(schema: &Schema) -> Result { let fields: Result> = schema @@ -596,12 +632,16 @@ impl ParquetTypeConverter<'_> { mod tests { use super::*; - use std::collections::HashMap; + use std::{collections::HashMap, convert::TryFrom, sync::Arc}; - use arrow::datatypes::{DataType, DateUnit, Field, TimeUnit}; + use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, TimeUnit}; - use crate::file::metadata::KeyValue; - use crate::schema::{parser::parse_message_type, types::SchemaDescriptor}; + use crate::file::{metadata::KeyValue, reader::SerializedFileReader}; + use crate::{ + arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}, + schema::{parser::parse_message_type, types::SchemaDescriptor}, + util::test_common::get_temp_file, + }; #[test] fn test_flat_primitives() { @@ -1268,4 +1308,123 @@ mod tests { assert_eq!(converted_arrow_schema.metadata(), &expected_metadata); } + + #[test] + fn test_arrow_schema_roundtrip() -> Result<()> { + // This tests the roundtrip of an Arrow schema + // Fields that are commented out fail roundtrip tests or are unsupported by the writer + let 
metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); + + let schema = Schema::new_with_metadata( + vec![ + Field::new("c1", DataType::Utf8, false), + Field::new("c2", DataType::Binary, false), + Field::new("c3", DataType::FixedSizeBinary(3), false), + Field::new("c4", DataType::Boolean, false), + Field::new("c5", DataType::Date32(DateUnit::Day), false), + Field::new("c6", DataType::Date64(DateUnit::Millisecond), false), + Field::new("c7", DataType::Time32(TimeUnit::Second), false), + Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false), + Field::new( + "c16", + DataType::Timestamp( + TimeUnit::Millisecond, + Some(Arc::new("UTC".to_string())), + ), + false, + ), + Field::new( + "c17", + DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::new("Africa/Johannesburg".to_string())), + ), + false, + ), + Field::new( + "c18", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), + Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), + Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), + Field::new( + "c22", + DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + false, + ), + Field::new( + "c23", + DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( + vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, false), + ], + ))))), + true, + ), + Field::new( + "c24", + DataType::Struct(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::UInt16, false), + ]), + false, + ), + Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true), + Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true), + // Field::new("c27", DataType::Duration(TimeUnit::Second), false), + // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false), + // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false), + // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), + // Field::new_dict( + // "c31", + // DataType::Dictionary( + // Box::new(DataType::Int32), + // Box::new(DataType::Utf8), + // ), + // true, + // 123, + // true, + // ), + Field::new("c32", DataType::LargeBinary, true), + Field::new("c33", DataType::LargeUtf8, true), + Field::new( + "c34", + DataType::LargeList(Box::new(DataType::LargeList(Box::new( + DataType::Struct(vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, true), + ]), + )))), + true, + ), + ], + metadata, + ); + + // write to an empty parquet file so that schema is serialized + let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; + writer.close()?; + + // read file back + let parquet_reader = SerializedFileReader::try_from(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); + let read_schema = arrow_reader.get_schema()?; + assert_eq!(schema, read_schema); + Ok(()) + } } From 8abf5cf7597472554050ea30ac01286a6e07c891 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 18 Aug 2020 13:39:38 +0200 Subject: [PATCH 5/5] address review feedback --- rust/parquet/src/arrow/arrow_writer.rs | 2 +- 
rust/parquet/src/arrow/mod.rs | 3 +-- rust/parquet/src/arrow/schema.rs | 23 ++++++++++++++++------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 2e01c4a6ba2..1ca8d50fed0 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -24,7 +24,7 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; -use super::add_encoded_arrow_schema_to_metadata; +use super::schema::add_encoded_arrow_schema_to_metadata; use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 14531e55f15..2bdb07cfbbb 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -60,8 +60,7 @@ pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ - add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, - parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 50500f15b88..d4cfe1f4772 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -118,19 +118,28 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { } } -/// Mutates writer metadata by encoding the Arrow schema and storing it in the metadata. -/// If there is an existing Arrow schema metadata, it is replaced. -pub fn add_encoded_arrow_schema_to_metadata( - schema: &Schema, - props: &mut WriterProperties, -) { +/// Encodes the Arrow schema into the IPC format, and base64 encodes it +fn encode_arrow_schema(schema: &Schema) -> String { let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema); + + // manually prepending the length to the schema as arrow uses the legacy IPC format + // TODO: change after addressing ARROW-9777 let schema_len = serialized_schema.len(); let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); len_prefix_schema.append(&mut serialized_schema); - let encoded = base64::encode(&len_prefix_schema); + + base64::encode(&len_prefix_schema) +} + +/// Mutates writer metadata by storing the encoded Arrow schema. +/// If there is an existing Arrow schema metadata, it is replaced. +pub(crate) fn add_encoded_arrow_schema_to_metadata( + schema: &Schema, + props: &mut WriterProperties, +) { + let encoded = encode_arrow_schema(schema); let schema_kv = KeyValue { key: super::ARROW_SCHEMA_META_KEY.to_string(),