From 57c1b232408922cdb66bf619b906da069979849e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 Mar 2020 08:36:35 -0600 Subject: [PATCH 01/11] PoC of Arrow writer for Parquet --- rust/parquet/src/file/writer.rs | 86 +++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs index ede6ce47a2e..1a49025ee48 100644 --- a/rust/parquet/src/file/writer.rs +++ b/rust/parquet/src/file/writer.rs @@ -538,6 +538,92 @@ mod tests { use crate::record::RowAccessor; use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file}; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + #[test] + fn arrow_writer() { + // define schema + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) + .unwrap(); + + let file = File::create("test.parquet").unwrap(); + //let file = File::create(path)?; + + //pub fn new() { + //TODO convert Arrow schema to Parquet schema + let schema = Rc::new( + types::Type::group_type_builder("schema") + .with_fields(&mut vec![ + Rc::new( + types::Type::primitive_type_builder("a", Type::INT32) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ), + Rc::new( + types::Type::primitive_type_builder("b", Type::INT32) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ), + ]) + .build() + .unwrap(), + ); + let props = Rc::new(WriterProperties::builder().build()); + let mut file_writer = + SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); + + //Self { file_writer: file_writer, rows: 0 } + //} + + //pub fn write(&mut self, batch: &RecordBatch) { + let mut _rows: i64 = 0; + let mut row_group_writer = file_writer.next_row_group().unwrap(); + for i in 0..batch.schema().fields().len() { + let col_writer = row_group_writer.next_column().unwrap(); + if let Some(mut writer) = col_writer { + match writer { + ColumnWriter::Int32ColumnWriter(ref mut typed) => { + let array = batch + .column(i) + .as_any() + .downcast_ref::() + .unwrap(); + _rows += typed + .write_batch(array.value_slice(0, array.len()), None, None) + .unwrap() as i64; + } + //TODO add other types + _ => { + unimplemented!(); + } + } + row_group_writer.close_column(writer).unwrap(); + } + } + file_writer.close_row_group(row_group_writer).unwrap(); + //} + + //pub fn close(&mut self) { + file_writer.close().unwrap(); + //} + } + #[test] fn test_file_writer_error_after_close() { let file = get_temp_file("test_file_writer_error_after_close", &[]); From 41337e4cc50470524f571d823f0d3025dbdda0d1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 Mar 2020 08:49:42 -0600 Subject: [PATCH 02/11] move parquet writer to struct --- rust/parquet/src/file/writer.rs | 130 ++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 56 deletions(-) diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs index 1a49025ee48..8b8ed6dcb09 100644 --- a/rust/parquet/src/file/writer.rs +++ b/rust/parquet/src/file/writer.rs @@ -18,27 +18,32 @@ //! Contains file writer API, and provides methods to write row groups and columns by //! 
using row group writers and column writers respectively. +use std::fs::File; use std::{ io::{Seek, SeekFrom, Write}, rc::Rc, }; +use arrow::array; +use arrow::datatypes::Schema; use byteorder::{ByteOrder, LittleEndian}; use parquet_format as parquet; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; -use crate::basic::PageType; +use crate::basic::{PageType, Repetition, Type}; use crate::column::{ page::{CompressedPage, Page, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; use crate::errors::{ParquetError, Result}; +use crate::file::properties::WriterProperties; use crate::file::{ metadata::*, properties::WriterPropertiesPtr, reader::TryClone, statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC, }; use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::util::io::{FileSink, Position}; +use arrow::record_batch::RecordBatch; // ---------------------------------------------------------------------- // APIs for file & row group writers @@ -521,49 +526,13 @@ impl PageWriter for SerializedPageWriter { } } -#[cfg(test)] -mod tests { - use super::*; - - use std::{fs::File, io::Cursor}; - - use crate::basic::{Compression, Encoding, Repetition, Type}; - use crate::column::page::PageReader; - use crate::compression::{create_codec, Codec}; - use crate::file::{ - properties::WriterProperties, - reader::{FileReader, SerializedFileReader, SerializedPageReader}, - statistics::{from_thrift, to_thrift, Statistics}, - }; - use crate::record::RowAccessor; - use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file}; - - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use std::sync::Arc; - - #[test] - fn arrow_writer() { - // define schema - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ]); - - // create some data - let a = Int32Array::from(vec![1, 2, 3, 4, 5]); - let b = Int32Array::from(vec![1, 2, 3, 4, 5]); - - // build a record batch - let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) - .unwrap(); - - let file = File::create("test.parquet").unwrap(); - //let file = File::create(path)?; +struct ArrowWriter { + writer: SerializedFileWriter, + rows: i64, +} - //pub fn new() { +impl ArrowWriter { + pub fn new(file: File, _arrow_schema: &Schema) -> Self { //TODO convert Arrow schema to Parquet schema let schema = Rc::new( types::Type::group_type_builder("schema") @@ -585,15 +554,17 @@ mod tests { .unwrap(), ); let props = Rc::new(WriterProperties::builder().build()); - let mut file_writer = + let file_writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); - //Self { file_writer: file_writer, rows: 0 } - //} + Self { + writer: file_writer, + rows: 0, + } + } - //pub fn write(&mut self, batch: &RecordBatch) { - let mut _rows: i64 = 0; - let mut row_group_writer = file_writer.next_row_group().unwrap(); + pub fn write(&mut self, batch: &RecordBatch) { + let mut row_group_writer = self.writer.next_row_group().unwrap(); for i in 0..batch.schema().fields().len() { let col_writer = row_group_writer.next_column().unwrap(); if let Some(mut writer) = col_writer { @@ -602,9 +573,9 @@ mod tests { let array = batch .column(i) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); - _rows += typed + self.rows += typed .write_batch(array.value_slice(0, array.len()), None, None) .unwrap() as i64; } @@ -616,12 
+587,59 @@ mod tests { row_group_writer.close_column(writer).unwrap(); } } - file_writer.close_row_group(row_group_writer).unwrap(); - //} + self.writer.close_row_group(row_group_writer).unwrap(); + } - //pub fn close(&mut self) { - file_writer.close().unwrap(); - //} + pub fn close(&mut self) { + self.writer.close().unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::{fs::File, io::Cursor}; + + use crate::basic::{Compression, Encoding, Repetition, Type}; + use crate::column::page::PageReader; + use crate::compression::{create_codec, Codec}; + use crate::file::{ + properties::WriterProperties, + reader::{FileReader, SerializedFileReader, SerializedPageReader}, + statistics::{from_thrift, to_thrift, Statistics}, + }; + use crate::record::RowAccessor; + use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file}; + + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + #[test] + fn arrow_writer() { + // define schema + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + ) + .unwrap(); + + let file = File::create("test.parquet").unwrap(); + let mut writer = ArrowWriter::new(file, &schema); + writer.write(&batch); + writer.close(); } #[test] From 89fab669d814830ce581aa3c93068d151dc95a57 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Fri, 15 May 2020 11:33:51 +0200 Subject: [PATCH 03/11] WIP: parquet writer updates --- rust/parquet/src/arrow/arrow_writer.rs | 120 +++++++++++++++++++++++++ rust/parquet/src/arrow/mod.rs | 5 +- rust/parquet/src/file/writer.rs | 106 +--------------------- rust/parquet/src/schema/types.rs | 6 +- 4 files changed, 130 insertions(+), 107 deletions(-) create mode 100644 rust/parquet/src/arrow/arrow_writer.rs diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs new file mode 100644 index 00000000000..c88faa330c5 --- /dev/null +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains writer which writes arrow data into parquet data. 
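The key change in this patch is that the writer now derives its Parquet schema from the Arrow schema through `arrow_to_parquet_schema` (exported from `parquet::arrow` in the `mod.rs` hunk below) rather than the hard-coded group type of the PoC. A minimal sketch of exercising that conversion on its own, not part of the patch; the `print_schema` helper is assumed from the crate's `schema::printer` module, and the root message name is whatever the conversion chooses:

    use arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::arrow_to_parquet_schema;
    use parquet::schema::printer::print_schema;

    fn show_converted_schema() -> parquet::errors::Result<()> {
        let arrow_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]);
        // the resulting SchemaDescriptor is what ArrowWriter::try_new
        // hands to SerializedFileWriter::new via root_schema_ptr()
        let descriptor = arrow_to_parquet_schema(&arrow_schema)?;
        // prints the converted Parquet message type to stdout
        print_schema(&mut std::io::stdout(), descriptor.root_schema());
        Ok(())
    }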
+ +use std::fs::File; +use std::rc::Rc; + +use arrow::array; +use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; + +use crate::column::writer::ColumnWriter; +use crate::errors::Result; +use crate::file::properties::WriterProperties; +use crate::file::writer::{FileWriter, SerializedFileWriter}; + +struct ArrowWriter { + writer: SerializedFileWriter, + rows: i64, +} + +impl ArrowWriter { + pub fn try_new(file: File, arrow_schema: &Schema) -> Result { + let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?; + let props = Rc::new(WriterProperties::builder().build()); + let file_writer = SerializedFileWriter::new( + file.try_clone()?, + schema.root_schema_ptr(), + props, + )?; + + Ok(Self { + writer: file_writer, + rows: 0, + }) + } + + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + let mut row_group_writer = self.writer.next_row_group()?; + for i in 0..batch.schema().fields().len() { + let col_writer = row_group_writer.next_column()?; + if let Some(mut writer) = col_writer { + match writer { + ColumnWriter::Int32ColumnWriter(ref mut typed) => { + let array = batch + .column(i) + .as_any() + .downcast_ref::() + .expect("Unable to downcast to Int32Array"); + self.rows += typed.write_batch( + array.value_slice(0, array.len()), + None, + None, + )? as i64; + } + //TODO add other types + _ => { + unimplemented!(); + } + } + row_group_writer.close_column(writer)?; + } + } + self.writer.close_row_group(row_group_writer) + } + + pub fn close(&mut self) -> Result<()> { + self.writer.close() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + #[test] + fn arrow_writer() { + // define schema + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + ) + .unwrap(); + + let file = File::create("test.parquet").unwrap(); + let mut writer = ArrowWriter::try_new(file, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } +} diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 02f50fd3a90..c8739c2e1f8 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -51,10 +51,13 @@ pub(in crate::arrow) mod array_reader; pub mod arrow_reader; +pub mod arrow_writer; pub(in crate::arrow) mod converter; pub(in crate::arrow) mod record_reader; pub mod schema; pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; -pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns}; +pub use self::schema::{ + arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, +}; diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs index 8b8ed6dcb09..ede6ce47a2e 100644 --- a/rust/parquet/src/file/writer.rs +++ b/rust/parquet/src/file/writer.rs @@ -18,32 +18,27 @@ //! Contains file writer API, and provides methods to write row groups and columns by //! using row group writers and column writers respectively. 
-use std::fs::File; use std::{ io::{Seek, SeekFrom, Write}, rc::Rc, }; -use arrow::array; -use arrow::datatypes::Schema; use byteorder::{ByteOrder, LittleEndian}; use parquet_format as parquet; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; -use crate::basic::{PageType, Repetition, Type}; +use crate::basic::PageType; use crate::column::{ page::{CompressedPage, Page, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; use crate::errors::{ParquetError, Result}; -use crate::file::properties::WriterProperties; use crate::file::{ metadata::*, properties::WriterPropertiesPtr, reader::TryClone, statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC, }; use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::util::io::{FileSink, Position}; -use arrow::record_batch::RecordBatch; // ---------------------------------------------------------------------- // APIs for file & row group writers @@ -526,75 +521,6 @@ impl PageWriter for SerializedPageWriter { } } -struct ArrowWriter { - writer: SerializedFileWriter, - rows: i64, -} - -impl ArrowWriter { - pub fn new(file: File, _arrow_schema: &Schema) -> Self { - //TODO convert Arrow schema to Parquet schema - let schema = Rc::new( - types::Type::group_type_builder("schema") - .with_fields(&mut vec![ - Rc::new( - types::Type::primitive_type_builder("a", Type::INT32) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ), - Rc::new( - types::Type::primitive_type_builder("b", Type::INT32) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - ), - ]) - .build() - .unwrap(), - ); - let props = Rc::new(WriterProperties::builder().build()); - let file_writer = - SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); - - Self { - writer: file_writer, - rows: 0, - } - } - - pub fn write(&mut self, batch: &RecordBatch) { - let mut row_group_writer = self.writer.next_row_group().unwrap(); - for i in 0..batch.schema().fields().len() { - let col_writer = row_group_writer.next_column().unwrap(); - if let Some(mut writer) = col_writer { - match writer { - ColumnWriter::Int32ColumnWriter(ref mut typed) => { - let array = batch - .column(i) - .as_any() - .downcast_ref::() - .unwrap(); - self.rows += typed - .write_batch(array.value_slice(0, array.len()), None, None) - .unwrap() as i64; - } - //TODO add other types - _ => { - unimplemented!(); - } - } - row_group_writer.close_column(writer).unwrap(); - } - } - self.writer.close_row_group(row_group_writer).unwrap(); - } - - pub fn close(&mut self) { - self.writer.close().unwrap(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -612,36 +538,6 @@ mod tests { use crate::record::RowAccessor; use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file}; - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use std::sync::Arc; - - #[test] - fn arrow_writer() { - // define schema - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ]); - - // create some data - let a = Int32Array::from(vec![1, 2, 3, 4, 5]); - let b = Int32Array::from(vec![1, 2, 3, 4, 5]); - - // build a record batch - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(a), Arc::new(b)], - ) - .unwrap(); - - let file = File::create("test.parquet").unwrap(); - let mut writer = ArrowWriter::new(file, &schema); - writer.write(&batch); - writer.close(); - } - 
#[test] fn test_file_writer_error_after_close() { let file = get_temp_file("test_file_writer_error_after_close", &[]); diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs index e1227c283dc..cd748243772 100644 --- a/rust/parquet/src/schema/types.rs +++ b/rust/parquet/src/schema/types.rs @@ -788,7 +788,7 @@ impl SchemaDescriptor { result.clone() } - fn column_root_of(&self, i: usize) -> &Rc { + fn column_root_of(&self, i: usize) -> &TypePtr { assert!( i < self.leaves.len(), "Index out of bound: {} not in [0, {})", @@ -810,6 +810,10 @@ impl SchemaDescriptor { self.schema.as_ref() } + pub fn root_schema_ptr(&self) -> TypePtr { + self.schema.clone() + } + /// Returns schema name. pub fn name(&self) -> &str { self.schema.name() From 6543ef1fd7115905dda724687d8ec91aea34dd12 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 1 Jun 2020 22:32:08 +0200 Subject: [PATCH 04/11] more progress on parquet arrow writer --- rust/parquet/src/arrow/arrow_writer.rs | 288 ++++++++++++++++++++++--- 1 file changed, 258 insertions(+), 30 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index c88faa330c5..fe59f80ed1b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -20,14 +20,18 @@ use std::fs::File; use std::rc::Rc; +use array::Array; use arrow::array; -use arrow::datatypes::Schema; +use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; use arrow::record_batch::RecordBatch; use crate::column::writer::ColumnWriter; use crate::errors::Result; use crate::file::properties::WriterProperties; -use crate::file::writer::{FileWriter, SerializedFileWriter}; +use crate::{ + data_type::*, + file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter}, +}; struct ArrowWriter { writer: SerializedFileWriter, @@ -52,30 +56,13 @@ impl ArrowWriter { pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { let mut row_group_writer = self.writer.next_row_group()?; - for i in 0..batch.schema().fields().len() { - let col_writer = row_group_writer.next_column()?; - if let Some(mut writer) = col_writer { - match writer { - ColumnWriter::Int32ColumnWriter(ref mut typed) => { - let array = batch - .column(i) - .as_any() - .downcast_ref::() - .expect("Unable to downcast to Int32Array"); - self.rows += typed.write_batch( - array.value_slice(0, array.len()), - None, - None, - )? as i64; - } - //TODO add other types - _ => { - unimplemented!(); - } - } - row_group_writer.close_column(writer)?; - } - } + self.rows += unnest_arrays_to_leaves( + &mut row_group_writer, + batch.schema().fields(), + batch.columns(), + &vec![1i16; batch.num_rows()][..], + 0, + )?; self.writer.close_row_group(row_group_writer) } @@ -84,26 +71,196 @@ impl ArrowWriter { } } +/// Write nested arrays by traversing into structs and lists until primitive +/// arrays are found. +fn unnest_arrays_to_leaves( + row_group_writer: &mut Box, + // The fields from the record batch or struct + fields: &Vec, + // The columns from record batch or struct, must have same length as fields + columns: &[array::ArrayRef], + // The parent mask, in the case of a struct, this represents which values + // of the struct are true (1) or false(0). 
+    // This is useful to respect the definition level of structs where all values are null in a row
+    parent_mask: &[i16],
+    // The current level that is being read at
+    level: i16,
+) -> Result<i64> {
+    let mut rows_written = 0;
+    for (field, column) in fields.iter().zip(columns) {
+        match field.data_type() {
+            ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"),
+            ArrowDataType::FixedSizeList(_, _) => {
+                unimplemented!("fsl not yet implemented")
+            }
+            ArrowDataType::Struct(fields) => {
+                // fields in a struct should recursively be written out
+                let array = column
+                    .as_any()
+                    .downcast_ref::<array::StructArray>()
+                    .expect("Unable to get struct array");
+                let mut null_mask = Vec::with_capacity(array.len());
+                for i in 0..array.len() {
+                    null_mask.push(array.is_valid(i) as i16);
+                }
+                rows_written += unnest_arrays_to_leaves(
+                    row_group_writer,
+                    fields,
+                    &array.columns_ref()[..],
+                    &null_mask[..],
+                    // if the field is nullable, we have to increment level
+                    level + field.is_nullable() as i16,
+                )?;
+            }
+            ArrowDataType::Null => unimplemented!(),
+            ArrowDataType::Boolean
+            | ArrowDataType::Int8
+            | ArrowDataType::Int16
+            | ArrowDataType::Int32
+            | ArrowDataType::Int64
+            | ArrowDataType::UInt8
+            | ArrowDataType::UInt16
+            | ArrowDataType::UInt32
+            | ArrowDataType::UInt64
+            | ArrowDataType::Float16
+            | ArrowDataType::Float32
+            | ArrowDataType::Float64
+            | ArrowDataType::Timestamp(_, _)
+            | ArrowDataType::Date32(_)
+            | ArrowDataType::Date64(_)
+            | ArrowDataType::Time32(_)
+            | ArrowDataType::Time64(_)
+            | ArrowDataType::Duration(_)
+            | ArrowDataType::Interval(_)
+            | ArrowDataType::Binary
+            | ArrowDataType::FixedSizeBinary(_)
+            | ArrowDataType::Utf8 => {
+                let col_writer = row_group_writer.next_column()?;
+                if let Some(mut writer) = col_writer {
+                    // write_column
+                    rows_written +=
+                        write_column(&mut writer, column, level, parent_mask)? as i64;
+                    row_group_writer.close_column(writer)?;
+                } else {
+                    panic!("No writer found")
+                }
+            }
+            ArrowDataType::Union(_) => unimplemented!(),
+            ArrowDataType::Dictionary(_, _) => unimplemented!(),
+        }
+    }
+    Ok(rows_written)
+}
+
+/// Write column to writer
+fn write_column(
+    writer: &mut ColumnWriter,
+    column: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Result<usize> {
+    match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = array::Int32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+    }
+}
+
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Vec<i16> {
+    // convince the compiler that bounds are fine
+    let len = array.len();
+    assert_eq!(
+        len,
+        parent_levels.len(),
+        "Parent definition levels must equal array length"
+    );
+    let levels = (0..len)
+        .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index])
+        .collect();
+    levels
+}
+
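The single expression in `get_primitive_def_levels` above packs the whole null-propagation rule together, so a worked example helps. This is a sketch, not part of the patch; the expected values follow directly from the formula `(array.is_valid(i) as i16 + level) * parent_levels[i]`:

    #[test]
    fn def_level_arithmetic() {
        // A nullable Float32 leaf inside a nullable struct is written at
        // level = 1, with the struct's validity supplying the parent levels.
        let values = [Some(0.0f32), None, Some(333.3)];
        let parent_levels = [1i16, 1, 0]; // the struct itself is null in row 2
        let level = 1i16;
        let def_levels: Vec<i16> = values
            .iter()
            .zip(parent_levels.iter())
            .map(|(v, p)| (v.is_some() as i16 + level) * p)
            .collect();
        // 2 = value present, 1 = null value in a valid struct, 0 = null struct
        assert_eq!(def_levels, vec![2, 1, 0]);
    }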
+/// Get the underlying numeric array slice, skipping any null values.
+/// If there are no null values, the entire slice is returned,
+/// thus this should only be called when there are null values.
+fn get_numeric_array_slice<T, A>(array: &array::PrimitiveArray<A>) -> Vec<T::T>
+where
+    T: DataType,
+    A: arrow::datatypes::ArrowNumericType,
+    T::T: From<A::Native>,
+{
+    let mut values = Vec::with_capacity(array.len() - array.null_count());
+    for i in 0..array.len() {
+        if array.is_valid(i) {
+            values.push(array.value(i).into())
+        }
+    }
+    values
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use arrow::array::Int32Array;
+    use std::sync::Arc;
+
+    use arrow::array::*;
+    use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
-    use std::sync::Arc;
 
     #[test]
     fn arrow_writer() {
         // define schema
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
         ]);
 
         // create some data
         let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
-        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
 
         // build a record batch
         let batch = RecordBatch::try_new(
@@ -117,4 +274,75 @@ mod tests {
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
+
+    #[test]
+    fn arrow_writer_complex() {
+        // define schema
+        let struct_field_d = Field::new("d", DataType::Float64, true);
+        let struct_field_f = Field::new("f", DataType::Float32, true);
+        let struct_field_g =
+            Field::new("g", DataType::List(Box::new(DataType::Boolean)), false);
+        let struct_field_e = Field::new(
+            "e",
+            DataType::Struct(vec![
+                struct_field_f.clone(),
+                // struct_field_g.clone()
+            ]),
+            true,
+        );
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+            Field::new(
+                "c",
+                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
+                false,
+            ),
+        ]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
+        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
+
+        let g_value = BooleanArray::from(vec![
+            false, true, false, true, false, true, false, true, false, true,
+        ]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        // [[false], [true, false], null, [true, false, true], [false, true, false, true]]
+        let g_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
+            .len(5)
+            .add_buffer(g_value_offsets.clone())
+            .add_child_data(g_value.data())
+            .build();
+        let _g = ListArray::from(g_list_data);
+
+        let e = StructArray::from(vec![
+            (struct_field_f, Arc::new(f) as ArrayRef),
+            // (struct_field_g, Arc::new(g) as ArrayRef),
+        ]);
+
+        let c = StructArray::from(vec![
+            (struct_field_d, Arc::new(d) as ArrayRef),
+            (struct_field_e, Arc::new(e) as ArrayRef),
+        ]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+        )
+        .unwrap();
+
+        let file = File::create("test_complex.parquet").unwrap();
+        let mut writer = ArrowWriter::try_new(file, &schema).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
 }
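The next patch wires the 64-bit-offset "Large" variants into the same match arms. For orientation: these arrays differ from their plain counterparts only in the width of their value offsets (i64 rather than i32), which is also why the list-offset handling later in the series reads offsets at two widths. A hedged sketch, assuming arrow's `LargeStringArray` with the usual `From<Vec<&str>>` constructor:

    use arrow::array::{Array, LargeStringArray, StringArray};

    fn large_types_differ_only_in_offset_width() {
        let small = StringArray::from(vec!["a", "bb"]); // i32 value offsets
        let large = LargeStringArray::from(vec!["a", "bb"]); // i64 value offsets
        assert_eq!(small.value(1), large.value(1));
    }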
From 9056b76c71288e38be19c539e5c775f6999922e1 Mon Sep 17 00:00:00 2001
From: Max Burke
Date: Tue, 4 Aug 2020 09:49:57 -0700
Subject: [PATCH 05/11] Need support for LargeX types

---
 rust/parquet/src/arrow/arrow_writer.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index fe59f80ed1b..a3565e385ae 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -90,6 +90,7 @@ fn unnest_arrays_to_leaves(
     for (field, column) in fields.iter().zip(columns) {
         match field.data_type() {
             ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"),
+            ArrowDataType::LargeList(_dtype) => unimplemented!("largelist not yet implemented"),
             ArrowDataType::FixedSizeList(_, _) => {
                 unimplemented!("fsl not yet implemented")
             }
@@ -134,7 +135,9 @@ fn unnest_arrays_to_leaves(
             | ArrowDataType::Interval(_)
             | ArrowDataType::Binary
             | ArrowDataType::FixedSizeBinary(_)
-            | ArrowDataType::Utf8 => {
+            | ArrowDataType::Utf8
+            | ArrowDataType::LargeBinary
+            | ArrowDataType::LargeUtf8 => {
                 let col_writer = row_group_writer.next_column()?;
                 if let Some(mut writer) = col_writer {
                     // write_column

From 6e9f7131cd30ec79dcad06ad90a231bb896d1c6d Mon Sep 17 00:00:00 2001
From: Max Burke
Date: Tue, 4 Aug 2020 10:31:44 -0700
Subject: [PATCH 06/11] ArrowWriter needs to be public, allow passing in of
 props

---
 rust/parquet/src/arrow/arrow_writer.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index a3565e385ae..00f9ed36aee 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -33,15 +33,18 @@ use crate::{
     file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
 };
 
-struct ArrowWriter {
+pub struct ArrowWriter {
     writer: SerializedFileWriter,
     rows: i64,
 }
 
 impl ArrowWriter {
-    pub fn try_new(file: File, arrow_schema: &Schema) -> Result<Self> {
+    pub fn try_new(file: File, arrow_schema: &Schema, props: Option<Rc<WriterProperties>>) -> Result<Self> {
         let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?;
-        let props = Rc::new(WriterProperties::builder().build());
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
         let file_writer = SerializedFileWriter::new(
             file.try_clone()?,
             schema.root_schema_ptr(),
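With `props` exposed, callers can tune encoding, compression, and statistics instead of always getting the defaults. A usage sketch under the API at this point in the series (the module path is assumed, since the patches do not re-export `ArrowWriter` at the crate root, and the file name is illustrative):

    use std::fs::File;
    use std::rc::Rc;

    use arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::arrow_writer::ArrowWriter;
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;

    fn snappy_writer() -> parquet::errors::Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let props = Rc::new(
            WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build(),
        );
        let file = File::create("example.parquet")?;
        let mut writer = ArrowWriter::try_new(file, &schema, Some(props))?;
        writer.close()
    }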
From 3c8f105b8405dfa415a46afc7fe912a8ce72dd4d Mon Sep 17 00:00:00 2001
From: Max Burke
Date: Tue, 4 Aug 2020 13:40:46 -0700
Subject: [PATCH 07/11] Use ParquetWriter interface instead of std::fs::File

---
 rust/parquet/src/arrow/arrow_writer.rs | 21 +++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 00f9ed36aee..1d7ee2e5b8e 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -17,7 +17,6 @@
 
 //! Contains writer which writes arrow data into parquet data.
 
-use std::fs::File;
 use std::rc::Rc;
 
 use array::Array;
@@ -30,23 +29,27 @@
 use crate::errors::Result;
 use crate::file::properties::WriterProperties;
 use crate::{
     data_type::*,
-    file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
 };
 
-pub struct ArrowWriter {
-    writer: SerializedFileWriter,
+pub struct ArrowWriter<W: ParquetWriter> {
+    writer: SerializedFileWriter<W>,
     rows: i64,
 }
 
-impl ArrowWriter {
-    pub fn try_new(file: File, arrow_schema: &Schema, props: Option<Rc<WriterProperties>>) -> Result<Self> {
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    pub fn try_new(
+        writer: W,
+        arrow_schema: &Schema,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
         let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?;
         let props = match props {
             Some(props) => props,
             None => Rc::new(WriterProperties::builder().build()),
         };
         let file_writer = SerializedFileWriter::new(
-            file.try_clone()?,
+            writer.try_clone()?,
             schema.root_schema_ptr(),
             props,
         )?;
@@ -93,7 +96,9 @@ fn unnest_arrays_to_leaves(
     for (field, column) in fields.iter().zip(columns) {
         match field.data_type() {
             ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"),
-            ArrowDataType::LargeList(_dtype) => unimplemented!("largelist not yet implemented"),
+            ArrowDataType::LargeList(_dtype) => {
+                unimplemented!("largelist not yet implemented")
+            }
             ArrowDataType::FixedSizeList(_, _) => {
                 unimplemented!("fsl not yet implemented")
             }

From a5a80718b63f66865f7fc43a08599552a7427633 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Fri, 7 Aug 2020 12:44:14 +0200
Subject: [PATCH 08/11] fix tests

---
 rust/parquet/src/arrow/arrow_writer.rs | 20 +++++++++++---------
 rust/parquet/src/column/writer.rs      |  2 --
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 1d7ee2e5b8e..127b629e2b3 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -82,7 +82,7 @@ fn unnest_arrays_to_leaves(
 fn unnest_arrays_to_leaves(
     row_group_writer: &mut Box<dyn RowGroupWriter>,
     // The fields from the record batch or struct
-    fields: &Vec<Field>,
+    fields: &[Field],
     // The columns from record batch or struct, must have same length as fields
     columns: &[array::ArrayRef],
     // The parent mask, in the case of a struct, this represents which values
@@ -226,10 +226,10 @@ fn get_primitive_def_levels(
         parent_levels.len(),
         "Parent definition levels must equal array length"
     );
-    let levels = (0..len)
+
+    (0..len)
         .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index])
-        .collect();
-    levels
+        .collect()
 }
 
 /// Get the underlying numeric array slice, skipping any null values.
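PATCH 07 above decoupled the writer from `std::fs::File`: any sink satisfying the `ParquetWriter` bounds (`Write + Seek + TryClone`) now works. A hedged sketch of a helper generic over the sink; the `'static` bound mirrors what `SerializedFileWriter` is assumed to require at this point in the series:

    use std::rc::Rc;

    use arrow::datatypes::Schema;
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::arrow_writer::ArrowWriter;
    use parquet::errors::Result;
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::ParquetWriter;

    fn write_batches<W: 'static + ParquetWriter>(
        sink: W,
        schema: &Schema,
        batches: &[RecordBatch],
        props: Option<Rc<WriterProperties>>,
    ) -> Result<()> {
        let mut writer = ArrowWriter::try_new(sink, schema, props)?;
        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()
    }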
@@ -261,6 +261,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; + use crate::util::test_common::get_temp_file; + #[test] fn arrow_writer() { // define schema @@ -280,8 +282,8 @@ mod tests { ) .unwrap(); - let file = File::create("test.parquet").unwrap(); - let mut writer = ArrowWriter::try_new(file, &schema).unwrap(); + let file = get_temp_file("test_arrow_writer.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, &schema, None).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } @@ -329,7 +331,7 @@ mod tests { // Construct a list array from the above two let g_list_data = ArrayData::builder(struct_field_g.data_type().clone()) .len(5) - .add_buffer(g_value_offsets.clone()) + .add_buffer(g_value_offsets) .add_child_data(g_value.data()) .build(); let _g = ListArray::from(g_list_data); @@ -351,8 +353,8 @@ mod tests { ) .unwrap(); - let file = File::create("test_complex.parquet").unwrap(); - let mut writer = ArrowWriter::try_new(file, &schema).unwrap(); + let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, &schema, None).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index f26c37bc2a4..c4f60eed187 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -57,7 +57,6 @@ pub enum Level { macro_rules! gen_stats_section { ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{ let min = $min.as_ref().and_then(|v| { - println!("min: {:?} {}", &v.as_bytes(), v.as_bytes().len()); Some(read_num_bytes!( $physical_ty, v.as_bytes().len(), @@ -65,7 +64,6 @@ macro_rules! 
gen_stats_section {
             ))
         });
         let max = $max.as_ref().and_then(|v| {
-            println!("max: {:?} {}", &v.as_bytes(), v.as_bytes().len());
             Some(read_num_bytes!(
                 $physical_ty,
                 v.as_bytes().len(),

From 2573302cfbc113f4f0d5d9c10b503bf7fea76471 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Sat, 8 Aug 2020 12:49:47 +0200
Subject: [PATCH 09/11] Various changes

* compute def and rep levels for all arrays in a batch at once
* support arbitrary nesting of lists and structs
* add doc comments on structs and functions
---
 rust/arrow/src/array/mod.rs            |   2 +-
 rust/parquet/src/arrow/arrow_writer.rs | 538 ++++++++++++++++++-------
 2 files changed, 387 insertions(+), 153 deletions(-)

diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs
index 3abc1423e4a..68a2dd602d0 100644
--- a/rust/arrow/src/array/mod.rs
+++ b/rust/arrow/src/array/mod.rs
@@ -115,7 +115,7 @@ pub use self::array::StructArray;
 pub use self::null::NullArray;
 pub use self::union::UnionArray;
 
-pub(crate) use self::array::make_array;
+pub use self::array::make_array;
 
 pub type BooleanArray = PrimitiveArray<BooleanType>;
 pub type Int8Array = PrimitiveArray<Int8Type>;

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 127b629e2b3..1d6126997c7 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -19,31 +19,43 @@
 
 use std::rc::Rc;
 
-use array::Array;
-use arrow::array;
-use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
 
 use crate::column::writer::ColumnWriter;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
 use crate::{
     data_type::*,
     file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
 };
 
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
 pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
     writer: SerializedFileWriter<W>,
-    rows: i64,
+    /// A copy of the Arrow schema.
+ /// + /// The schema is used to verify that each record batch written has the correct schema + arrow_schema: SchemaRef, } impl ArrowWriter { + /// Try to create a new Arrow writer + /// + /// The writer will fail if: + /// * a `SerializedFileWriter` cannot be created from the ParquetWriter + /// * the Arrow schema contains unsupported datatypes such as Unions pub fn try_new( writer: W, - arrow_schema: &Schema, + arrow_schema: SchemaRef, props: Option>, ) -> Result { - let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?; + let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; let props = match props { Some(props) => props, None => Rc::new(WriterProperties::builder().build()), @@ -56,157 +68,349 @@ impl ArrowWriter { Ok(Self { writer: file_writer, - rows: 0, + arrow_schema, }) } + /// Write a RecordBatch to writer + /// + /// *NOTE:* The writer currently does not support all Arrow data types pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + // validate batch schema against writer's supplied schema + if self.arrow_schema != batch.schema() { + return Err(ParquetError::ArrowError( + "Record batch schema does not match writer schema".to_string(), + )); + } + // compute the definition and repetition levels of the batch + let mut levels = vec![]; + batch.columns().iter().for_each(|array| { + let mut array_levels = + get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None); + levels.append(&mut array_levels); + }); + // reverse levels so we can use Vec::pop(&mut self) + levels.reverse(); + let mut row_group_writer = self.writer.next_row_group()?; - self.rows += unnest_arrays_to_leaves( - &mut row_group_writer, - batch.schema().fields(), - batch.columns(), - &vec![1i16; batch.num_rows()][..], - 0, - )?; + + // write leaves + for column in batch.columns() { + write_leaves(&mut row_group_writer, column, &mut levels)?; + } + self.writer.close_row_group(row_group_writer) } + /// Close and finalise the underlying Parquet writer pub fn close(&mut self) -> Result<()> { self.writer.close() } } -/// Write nested arrays by traversing into structs and lists until primitive -/// arrays are found. -fn unnest_arrays_to_leaves( +/// Convenience method to get the next ColumnWriter from the RowGroupWriter +#[inline] +#[allow(clippy::borrowed_box)] +fn get_col_writer( row_group_writer: &mut Box, - // The fields from the record batch or struct - fields: &[Field], - // The columns from record batch or struct, must have same length as fields - columns: &[array::ArrayRef], - // The parent mask, in the case of a struct, this represents which values - // of the struct are true (1) or false(0). 
- // This is useful to respect the definition level of structs where all values are null in a row - parent_mask: &[i16], - // The current level that is being read at - level: i16, -) -> Result { - let mut rows_written = 0; - for (field, column) in fields.iter().zip(columns) { - match field.data_type() { - ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"), - ArrowDataType::LargeList(_dtype) => { - unimplemented!("largelist not yet implemented") - } - ArrowDataType::FixedSizeList(_, _) => { - unimplemented!("fsl not yet implemented") - } - ArrowDataType::Struct(fields) => { - // fields in a struct should recursively be written out - let array = column - .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); - let mut null_mask = Vec::with_capacity(array.len()); - for i in 0..array.len() { - null_mask.push(array.is_valid(i) as i16); - } - rows_written += unnest_arrays_to_leaves( - row_group_writer, - fields, - &array.columns_ref()[..], - &null_mask[..], - // if the field is nullable, we have to increment level - level + field.is_nullable() as i16, - )?; - } - ArrowDataType::Null => unimplemented!(), - ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float16 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32(_) - | ArrowDataType::Date64(_) - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) - | ArrowDataType::Binary - | ArrowDataType::FixedSizeBinary(_) - | ArrowDataType::Utf8 - | ArrowDataType::LargeBinary - | ArrowDataType::LargeUtf8 => { - let col_writer = row_group_writer.next_column()?; - if let Some(mut writer) = col_writer { - // write_column - rows_written += - write_column(&mut writer, column, level, parent_mask)? as i64; - row_group_writer.close_column(writer)?; - } else { - panic!("No writer found") - } +) -> Result { + let col_writer = row_group_writer + .next_column()? 
+ .expect("Unable to get column writer"); + Ok(col_writer) +} + +#[allow(clippy::borrowed_box)] +fn write_leaves( + mut row_group_writer: &mut Box, + array: &arrow_array::ArrayRef, + mut levels: &mut Vec, +) -> Result<()> { + match array.data_type() { + ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) => { + let mut col_writer = get_col_writer(&mut row_group_writer)?; + write_leaf( + &mut col_writer, + array, + levels.pop().expect("Levels exhausted"), + )?; + row_group_writer.close_column(col_writer)?; + Ok(()) + } + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + // write the child list + let data = array.data(); + let child_array = arrow_array::make_array(data.child_data()[0].clone()); + write_leaves(&mut row_group_writer, &child_array, &mut levels)?; + Ok(()) + } + ArrowDataType::Struct(_) => { + let struct_array: &arrow_array::StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + for field in struct_array.columns() { + write_leaves(&mut row_group_writer, field, &mut levels)?; } - ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => unimplemented!(), + Ok(()) } + ArrowDataType::FixedSizeList(_, _) + | ArrowDataType::Null + | ArrowDataType::Boolean + | ArrowDataType::FixedSizeBinary(_) + | ArrowDataType::LargeBinary + | ArrowDataType::Binary + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 + | ArrowDataType::Union(_) + | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI( + "Attempting to write an Arrow type that is not yet implemented".to_string(), + )), } - Ok(rows_written) } -/// Write column to writer -fn write_column( +fn write_leaf( writer: &mut ColumnWriter, - column: &array::ArrayRef, - level: i16, - parent_levels: &[i16], -) -> Result { - match writer { + column: &arrow_array::ArrayRef, + levels: Levels, +) -> Result { + let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { - let array = array::Int32Array::from(column.data()); + let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; + let array = array + .as_any() + .downcast_ref::() + .expect("Unable to get int32 array"); typed.write_batch( get_numeric_array_slice::(&array).as_slice(), - Some(get_primitive_def_levels(column, level, parent_levels).as_slice()), - None, - ) + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ColumnWriter::BoolColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") } - ColumnWriter::BoolColumnWriter(ref mut _typed) => unimplemented!(), ColumnWriter::Int64ColumnWriter(ref mut typed) => { - let array = array::Int64Array::from(column.data()); + let array = arrow_array::Int64Array::from(column.data()); typed.write_batch( get_numeric_array_slice::(&array).as_slice(), - Some(get_primitive_def_levels(column, level, parent_levels).as_slice()), - None, - ) + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? 
+ } + ColumnWriter::Int96ColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") } - ColumnWriter::Int96ColumnWriter(ref mut _typed) => unimplemented!(), ColumnWriter::FloatColumnWriter(ref mut typed) => { - let array = array::Float32Array::from(column.data()); + let array = arrow_array::Float32Array::from(column.data()); typed.write_batch( get_numeric_array_slice::(&array).as_slice(), - Some(get_primitive_def_levels(column, level, parent_levels).as_slice()), - None, - ) + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? } ColumnWriter::DoubleColumnWriter(ref mut typed) => { - let array = array::Float64Array::from(column.data()); + let array = arrow_array::Float64Array::from(column.data()); typed.write_batch( get_numeric_array_slice::(&array).as_slice(), - Some(get_primitive_def_levels(column, level, parent_levels).as_slice()), - None, - ) + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )? + } + ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") + } + ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => { + unreachable!("Currently unreachable because data type not supported") + } + }; + Ok(written as i64) +} + +/// A struct that repreesnts definition and repetition levels. +/// Repetition levels are only populated if the parent or current leaf is repeated +#[derive(Debug)] +struct Levels { + definition: Vec, + repetition: Option>, +} + +/// Compute nested levels of the Arrow array, recursing into lists and structs +fn get_levels( + array: &arrow_array::ArrayRef, + level: i16, + parent_def_levels: &[i16], + parent_rep_levels: Option<&[i16]>, +) -> Vec { + match array.data_type() { + ArrowDataType::Null => unimplemented!(), + ArrowDataType::Boolean + | ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) => vec![Levels { + definition: get_primitive_def_levels(array, parent_def_levels), + repetition: None, + }], + ArrowDataType::Binary => unimplemented!(), + ArrowDataType::FixedSizeBinary(_) => unimplemented!(), + ArrowDataType::LargeBinary => unimplemented!(), + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + // a list can either be nested or flat. 
If it is flat, def and rep lengths will be the length of the list's items + let array_data = array.data(); + let child_data = array_data.child_data().get(0).unwrap(); + // get offsets, accounting for large offsets if present + let offsets: Vec = { + if let ArrowDataType::LargeList(_) = array.data_type() { + unsafe { + array_data.buffers()[0].typed_data::() // TODO: offsets + } + .to_vec() + } else { + let offsets = unsafe { + array_data.buffers()[0].typed_data::() // TODO: offsets + }; + offsets.to_vec().into_iter().map(|v| v as i64).collect() + } + }; + let child_array = arrow_array::make_array(child_data.clone()); + + let mut list_def_levels = Vec::with_capacity(child_array.len()); + let mut list_rep_levels = Vec::with_capacity(child_array.len()); + let rep_levels: Vec = parent_rep_levels + .map(|l| l.to_vec()) + .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]); + parent_def_levels + .iter() + .zip(rep_levels) + .zip(offsets.windows(2)) + .for_each(|((parent_def_level, parent_rep_level), window)| { + if *parent_def_level == 0 { + // parent is null, list element must also be null + list_def_levels.push(0); + list_rep_levels.push(0); + } else { + // parent is not null, check if list is empty or null + let start = window[0]; + let end = window[1]; + let len = end - start; + if len == 0 { + list_def_levels.push(*parent_def_level - 1); + list_rep_levels.push(parent_rep_level); + } else { + list_def_levels.push(*parent_def_level); + list_rep_levels.push(parent_rep_level); + for _ in 1..len { + list_def_levels.push(*parent_def_level); + list_rep_levels.push(parent_rep_level + 1); + } + } + } + }); + + // if datatype is a primitive, we can construct levels of the child array + match child_array.data_type() { + ArrowDataType::Null => unimplemented!(), + ArrowDataType::Boolean => unimplemented!(), + ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::UInt8 + | ArrowDataType::UInt16 + | ArrowDataType::UInt32 + | ArrowDataType::UInt64 + | ArrowDataType::Float16 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Timestamp(_, _) + | ArrowDataType::Date32(_) + | ArrowDataType::Date64(_) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) => { + let def_levels = + get_primitive_def_levels(&child_array, &list_def_levels[..]); + vec![Levels { + definition: def_levels, + repetition: Some(list_rep_levels), + }] + } + ArrowDataType::Binary + | ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 => unimplemented!(), + ArrowDataType::FixedSizeBinary(_) => unimplemented!(), + ArrowDataType::LargeBinary => unimplemented!(), + ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + // nested list + unimplemented!() + } + ArrowDataType::FixedSizeList(_, _) => unimplemented!(), + ArrowDataType::Struct(_) => get_levels( + array, + level + 1, // indicates a nesting level of 2 (list + struct) + &list_def_levels[..], + Some(&list_rep_levels[..]), + ), + ArrowDataType::Union(_) => unimplemented!(), + ArrowDataType::Dictionary(_, _) => unimplemented!(), + } + } + ArrowDataType::FixedSizeList(_, _) => unimplemented!(), + ArrowDataType::Struct(_) => { + let struct_array: &arrow_array::StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + let mut struct_def_levels = Vec::with_capacity(struct_array.len()); + for i in 0..array.len() { + struct_def_levels.push(level + struct_array.is_valid(i) as i16); + } + // trying to create 
levels for struct's fields + let mut struct_levels = vec![]; + struct_array.columns().into_iter().for_each(|col| { + let mut levels = + get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels); + struct_levels.append(&mut levels); + }); + struct_levels } - ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => unimplemented!(), - ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => unimplemented!(), + ArrowDataType::Union(_) => unimplemented!(), + ArrowDataType::Dictionary(_, _) => unimplemented!(), } } @@ -215,27 +419,27 @@ fn write_column( /// are incremented in accordance with the `level` parameter. /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null fn get_primitive_def_levels( - array: &array::ArrayRef, - level: i16, - parent_levels: &[i16], + array: &arrow_array::ArrayRef, + parent_def_levels: &[i16], ) -> Vec { - // convince the compiler that bounds are fine - let len = array.len(); - assert_eq!( - len, - parent_levels.len(), - "Parent definition levels must equal array length" - ); - - (0..len) - .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index]) - .collect() + let mut array_index = 0; + let max_def_level = parent_def_levels.iter().max().unwrap(); + let mut primitive_def_levels = vec![]; + parent_def_levels.iter().for_each(|def_level| { + if def_level < max_def_level { + primitive_def_levels.push(*def_level); + } else { + primitive_def_levels.push(def_level - array.is_null(array_index) as i16); + array_index += 1; + } + }); + primitive_def_levels } /// Get the underlying numeric array slice, skipping any null values. -/// If there are no null values, the entire slice is returned, -/// thus this should only be called when there are null values. -fn get_numeric_array_slice(array: &array::PrimitiveArray) -> Vec +/// If there are no null values, it might be quicker to get the slice directly instead of +/// calling this function. 
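The `offsets.windows(2)` loop in `get_levels` above is the crux of the list support and is easiest to follow on concrete data. A sketch, not part of the patch, mirroring that loop for the offsets `[0, 1, 3, 3, 6, 10]` that the tests below also use, i.e. the list `[[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]` under non-null top-level rows (parent definition levels all 1, parent repetitions all 0):

    #[test]
    fn list_level_arithmetic() {
        let offsets = [0i64, 1, 3, 3, 6, 10];
        let (mut def, mut rep) = (vec![], vec![]);
        for w in offsets.windows(2) {
            let len = w[1] - w[0];
            if len == 0 {
                def.push(0); // empty list: parent definition level - 1
                rep.push(0);
            } else {
                def.push(1);
                rep.push(0); // the first value opens a new row
                for _ in 1..len {
                    def.push(1);
                    rep.push(1); // further values repeat within the row
                }
            }
        }
        assert_eq!(def, [1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1]);
        assert_eq!(rep, [0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1]);
    }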
+fn get_numeric_array_slice(array: &arrow_array::PrimitiveArray) -> Vec where T: DataType, A: arrow::datatypes::ArrowNumericType, @@ -283,7 +487,42 @@ mod tests { .unwrap(); let file = get_temp_file("test_arrow_writer.parquet", &[]); - let mut writer = ArrowWriter::try_new(file, &schema, None).unwrap(); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + fn arrow_writer_list() { + // define schema + let schema = Schema::new(vec![Field::new( + "a", + DataType::List(Box::new(DataType::Int32)), + false, + )]); + + // create some data + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + // Construct a buffer for value offsets, for the nested array: + // [[false], [true, false], null, [true, false, true], [false, true, false, true]] + let a_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + + // Construct a list array from the above two + let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data()) + .build(); + let a = ListArray::from(a_list_data); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + + let file = get_temp_file("test_arrow_writer_list.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } @@ -294,13 +533,10 @@ mod tests { let struct_field_d = Field::new("d", DataType::Float64, true); let struct_field_f = Field::new("f", DataType::Float32, true); let struct_field_g = - Field::new("g", DataType::List(Box::new(DataType::Boolean)), false); + Field::new("g", DataType::List(Box::new(DataType::Int16)), false); let struct_field_e = Field::new( "e", - DataType::Struct(vec![ - struct_field_f.clone(), - // struct_field_g.clone() - ]), + DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]), true, ); let schema = Schema::new(vec![ @@ -319,12 +555,10 @@ mod tests { let d = Float64Array::from(vec![None, None, None, Some(1.0), None]); let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]); - let g_value = BooleanArray::from(vec![ - false, true, false, true, false, true, false, true, false, true, - ]); + let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); // Construct a buffer for value offsets, for the nested array: - // [[false], [true, false], null, [true, false, true], [false, true, false, true]] + // [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]] let g_value_offsets = arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); @@ -334,11 +568,11 @@ mod tests { .add_buffer(g_value_offsets) .add_child_data(g_value.data()) .build(); - let _g = ListArray::from(g_list_data); + let g = ListArray::from(g_list_data); let e = StructArray::from(vec![ (struct_field_f, Arc::new(f) as ArrayRef), - // (struct_field_g, Arc::new(g) as ArrayRef), + (struct_field_g, Arc::new(g) as ArrayRef), ]); let c = StructArray::from(vec![ @@ -354,7 +588,7 @@ mod tests { .unwrap(); let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); - let mut writer = ArrowWriter::try_new(file, &schema, None).unwrap(); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } From 869dd8e016f9f0d54d337ce4739e0bf6321a1681 Mon Sep 17 00:00:00 2001 From: 
Date: Sat, 8 Aug 2020 22:29:13 +0200
Subject: [PATCH 10/11] remove redundant comment

---
 rust/parquet/src/arrow/arrow_writer.rs | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 1d6126997c7..9b3e4505d34 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -244,7 +244,7 @@ fn write_leaf(
     Ok(written as i64)
 }
 
-/// A struct that repreesnts definition and repetition levels.
+/// A struct that represents definition and repetition levels.
 /// Repetition levels are only populated if the parent or current leaf is repeated
 #[derive(Debug)]
 struct Levels {
@@ -289,20 +289,14 @@ fn get_levels(
         ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
         ArrowDataType::LargeBinary => unimplemented!(),
         ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
-            // a list can either be nested or flat. If it is flat, def and rep lengths will be the length of the list's items
             let array_data = array.data();
             let child_data = array_data.child_data().get(0).unwrap();
             // get offsets, accounting for large offsets if present
             let offsets: Vec<i64> = {
                 if let ArrowDataType::LargeList(_) = array.data_type() {
-                    unsafe {
-                        array_data.buffers()[0].typed_data::<i64>() // TODO: offsets
-                    }
-                    .to_vec()
+                    unsafe { array_data.buffers()[0].typed_data::<i64>() }.to_vec()
                 } else {
-                    let offsets = unsafe {
-                        array_data.buffers()[0].typed_data::<i32>() // TODO: offsets
-                    };
+                    let offsets = unsafe { array_data.buffers()[0].typed_data::<i32>() };
                     offsets.to_vec().into_iter().map(|v| v as i64).collect()
                 }
             };

From 86a2386db36a51a99c71ae491c23fff0b4e67a0c Mon Sep 17 00:00:00 2001
From: Max Burke
Date: Thu, 13 Aug 2020 08:57:10 -0700
Subject: [PATCH 11/11] Preliminary fleshing out of writers for binary/string
 types

* Preliminary fleshing out of writers for binary/string types

* Add test for writing string and binary data to parquet from arrow

* Incorporate feedback
---
 rust/parquet/src/arrow/arrow_writer.rs | 117 ++++++++++++++++++++++---
 1 file changed, 105 insertions(+), 12 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 9b3e4505d34..0c1c4903d16 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -144,7 +144,11 @@ fn write_leaves(
         | ArrowDataType::Time32(_)
         | ArrowDataType::Time64(_)
         | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_) => {
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => {
             let mut col_writer = get_col_writer(&mut row_group_writer)?;
             write_leaf(
                 &mut col_writer,
@@ -175,10 +179,6 @@ fn write_leaves(
         | ArrowDataType::Null
         | ArrowDataType::Boolean
         | ArrowDataType::FixedSizeBinary(_)
-        | ArrowDataType::LargeBinary
-        | ArrowDataType::Binary
-        | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8
         | ArrowDataType::Union(_)
         | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
             "Attempting to write an Arrow type that is not yet implemented".to_string(),
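With `Utf8`/`Binary` now routed to `write_leaf` instead of the `NYI` arm, a string column is written like any primitive column. A minimal usage sketch under the API shown in this series; the `parquet::arrow::ArrowWriter` path and the output file name are assumptions for the sake of a standalone example:

    use std::sync::Arc;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() {
        // Build a single-column Utf8 batch.
        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
        let values = StringArray::from(vec!["foo", "bar", "baz"]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();

        // try_new takes the destination file, the Arrow schema, and optional
        // writer properties, matching the signature used by the tests in this series.
        let file = std::fs::File::create("strings.parquet").unwrap();
        let mut writer = parquet::arrow::ArrowWriter::try_new(file, schema, None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }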
@@ -234,9 +234,25 @@ fn write_leaf(
                 levels.repetition.as_deref(),
             )?
         }
-        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => {
-            unreachable!("Currently unreachable because data type not supported")
-        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
+            ArrowDataType::Binary | ArrowDataType::Utf8 => {
+                let array = arrow_array::BinaryArray::from(column.data());
+                typed.write_batch(
+                    get_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            ArrowDataType::LargeBinary | ArrowDataType::LargeUtf8 => {
+                let array = arrow_array::LargeBinaryArray::from(column.data());
+                typed.write_batch(
+                    get_large_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            _ => unreachable!("Currently unreachable because data type not supported"),
+        },
         ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
             unreachable!("Currently unreachable because data type not supported")
         }
@@ -281,13 +297,13 @@ fn get_levels(
         | ArrowDataType::Time32(_)
         | ArrowDataType::Time64(_)
         | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_) => vec![Levels {
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::Binary
+        | ArrowDataType::LargeBinary => vec![Levels {
             definition: get_primitive_def_levels(array, parent_def_levels),
             repetition: None,
         }],
-        ArrowDataType::Binary => unimplemented!(),
         ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
-        ArrowDataType::LargeBinary => unimplemented!(),
         ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
             let array_data = array.data();
             let child_data = array_data.child_data().get(0).unwrap();
@@ -430,6 +446,24 @@ fn get_primitive_def_levels(
     primitive_def_levels
 }
 
+macro_rules! def_get_binary_array_fn {
+    ($name:ident, $ty:ty) => {
+        fn $name(array: &$ty) -> Vec<ByteArray> {
+            let mut values = Vec::with_capacity(array.len() - array.null_count());
+            for i in 0..array.len() {
+                if array.is_valid(i) {
+                    let bytes = ByteArray::from(array.value(i).to_vec());
+                    values.push(bytes);
+                }
+            }
+            values
+        }
+    };
+}
+
+def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
+def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
+
 /// Get the underlying numeric array slice, skipping any null values.
 /// If there are no null values, it might be quicker to get the slice directly instead of
 /// calling this function.
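For readers unfamiliar with `macro_rules!`, the first invocation above expands to roughly the following hand-written function (a sketch of the expansion; `ByteArray` is the parquet-side owned byte buffer). Only non-null values are collected, because nulls reach `write_batch` through the definition levels rather than through placeholder values:

    fn get_binary_array(array: &arrow_array::BinaryArray) -> Vec<ByteArray> {
        // Pre-size for the non-null values only.
        let mut values = Vec::with_capacity(array.len() - array.null_count());
        for i in 0..array.len() {
            if array.is_valid(i) {
                // Copy the value bytes out of the Arrow buffer into an owned ByteArray.
                values.push(ByteArray::from(array.value(i).to_vec()));
            }
        }
        values
    }

The macro exists because `BinaryArray` and `LargeBinaryArray` differ only in offset width (`i32` vs `i64`), so one body serves both through a second invocation.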
@@ -452,13 +486,16 @@ where
 mod tests {
     use super::*;
 
+    use std::io::Seek;
     use std::sync::Arc;
 
     use arrow::array::*;
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch::RecordBatch;
+    use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::reader::SerializedFileReader;
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -521,6 +558,62 @@ mod tests {
         writer.close().unwrap();
     }
 
+    #[test]
+    fn arrow_writer_binary() {
+        let string_field = Field::new("a", DataType::Utf8, false);
+        let binary_field = Field::new("b", DataType::Binary, false);
+        let schema = Schema::new(vec![string_field, binary_field]);
+
+        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
+        let raw_binary_values = vec![
+            b"foo".to_vec(),
+            b"bar".to_vec(),
+            b"baz".to_vec(),
+            b"quux".to_vec(),
+        ];
+        let raw_binary_value_refs = raw_binary_values
+            .iter()
+            .map(|x| x.as_slice())
+            .collect::<Vec<_>>();
+
+        let string_values = StringArray::from(raw_string_values.clone());
+        let binary_values = BinaryArray::from(raw_binary_value_refs);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(string_values), Arc::new(binary_values)],
+        )
+        .unwrap();
+
+        let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+                .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        file.seek(std::io::SeekFrom::Start(0)).unwrap();
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let batch = record_batch_reader.next_batch().unwrap().unwrap();
+        let string_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let binary_col = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            assert_eq!(string_col.value(i), raw_string_values[i]);
+            assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice());
+        }
+    }
+
     #[test]
     fn arrow_writer_complex() {
         // define schema