From 8fe596c34c4066b39ded4110a9f65d942d47bfcb Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 22 Mar 2021 19:00:19 +0200 Subject: [PATCH 1/2] ARROW-12043: [Rust] [Parquet] Write FSB arrays --- rust/parquet/src/arrow/arrow_writer.rs | 33 ++++++++++++++++++++++---- rust/parquet/src/arrow/levels.rs | 26 +++++++++++++------- rust/parquet/src/arrow/mod.rs | 2 +- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 1ce907f81c1..506f93a30ee 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -156,6 +156,16 @@ fn write_leaves( row_group_writer.close_column(col_writer)?; Ok(()) } + ArrowDataType::FixedSizeBinary(_) => { + let mut col_writer = get_col_writer(&mut row_group_writer)?; + write_leaf( + &mut col_writer, + array, + levels.pop().expect("Levels exhausted"), + )?; + row_group_writer.close_column(col_writer)?; + Ok(()) + } ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { // write the child list let data = array.data(); @@ -189,11 +199,12 @@ fn write_leaves( ArrowDataType::Float16 => Err(ParquetError::ArrowError( "Float16 arrays not supported".to_string(), )), - ArrowDataType::FixedSizeList(_, _) - | ArrowDataType::FixedSizeBinary(_) - | ArrowDataType::Union(_) => Err(ParquetError::NYI( - "Attempting to write an Arrow type that is not yet implemented".to_string(), - )), + ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_) => { + Err(ParquetError::NYI( + "Attempting to write an Arrow type that is not yet implemented" + .to_string(), + )) + } } } @@ -1224,6 +1235,18 @@ mod tests { ); } + #[test] + fn fixed_size_binary_single_column() { + let mut builder = FixedSizeBinaryBuilder::new(16, 4); + builder.append_value(b"0123").unwrap(); + builder.append_null().unwrap(); + builder.append_value(b"8910").unwrap(); + builder.append_value(b"1112").unwrap(); + let array = Arc::new(builder.finish()); + + one_column_roundtrip("timestamp_millisecond_single_column", array, true); + } + #[test] fn string_single_column() { let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs index 641e330522d..00bb553fc1f 100644 --- a/rust/parquet/src/arrow/levels.rs +++ b/rust/parquet/src/arrow/levels.rs @@ -136,7 +136,8 @@ impl LevelInfo { | DataType::Interval(_) | DataType::Binary | DataType::LargeBinary - | DataType::Decimal(_, _) => { + | DataType::Decimal(_, _) + | DataType::FixedSizeBinary(_) => { // we return a vector of 1 value to represent the primitive vec![self.calculate_child_levels( array_offsets, @@ -145,7 +146,6 @@ impl LevelInfo { field.is_nullable(), )] } - DataType::FixedSizeBinary(_) => unimplemented!(), DataType::List(list_field) | DataType::LargeList(list_field) => { // Calculate the list level let list_level = self.calculate_child_levels( @@ -297,9 +297,10 @@ impl LevelInfo { is_list: bool, is_nullable: bool, ) -> Self { - let mut definition = vec![]; - let mut repetition = vec![]; - let mut merged_array_mask = vec![]; + let min_len = *(array_offsets.last().unwrap()) as usize; + let mut definition = Vec::with_capacity(min_len); + let mut repetition = Vec::with_capacity(min_len); + let mut merged_array_mask = Vec::with_capacity(min_len); // determine the total level increment based on data types let max_definition = match is_list { @@ -624,9 +625,18 @@ impl LevelInfo { let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect(); (offsets, masks) } - DataType::FixedSizeBinary(_) - | DataType::FixedSizeList(_, _) - | DataType::Union(_) => { + DataType::FixedSizeBinary(value_len) => { + let array_mask = match array.data().null_buffer() { + Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), + None => vec![true; array.len()], + }; + let value_len = *value_len as i64; + ( + (0..=(array.len() as i64)).map(|v| v * value_len).collect(), + array_mask, + ) + } + DataType::FixedSizeList(_, _) | DataType::Union(_) => { unimplemented!("Getting offsets not yet implemented") } } diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 9095259163f..b1aa39ebafa 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -53,7 +53,7 @@ pub(in crate::arrow) mod array_reader; pub mod arrow_reader; pub mod arrow_writer; pub(in crate::arrow) mod converter; -pub mod levels; +pub(in crate::arrow) mod levels; pub(in crate::arrow) mod record_reader; pub mod schema; From 4ecc2d7e49ceec2d690bbf83af25df0a8f87c52b Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 25 Mar 2021 00:06:17 +0200 Subject: [PATCH 2/2] review comments --- rust/parquet/src/arrow/arrow_writer.rs | 19 ++++++------------- rust/parquet/src/arrow/levels.rs | 4 ++-- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 506f93a30ee..a3577caec69 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -146,17 +146,8 @@ fn write_leaves( | ArrowDataType::Binary | ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 - | ArrowDataType::Decimal(_, _) => { - let mut col_writer = get_col_writer(&mut row_group_writer)?; - write_leaf( - &mut col_writer, - array, - levels.pop().expect("Levels exhausted"), - )?; - row_group_writer.close_column(col_writer)?; - Ok(()) - } - ArrowDataType::FixedSizeBinary(_) => { + | ArrowDataType::Decimal(_, _) + | ArrowDataType::FixedSizeBinary(_) => { let mut col_writer = get_col_writer(&mut row_group_writer)?; write_leaf( &mut col_writer, @@ -201,8 +192,10 @@ fn write_leaves( )), ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_) => { Err(ParquetError::NYI( - "Attempting to write an Arrow type that is not yet implemented" - .to_string(), + format!( + "Attempting to write an Arrow type {:?} to parquet that is not yet implemented", + array.data_type() + ) )) } } diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs index 00bb553fc1f..2168670bb59 100644 --- a/rust/parquet/src/arrow/levels.rs +++ b/rust/parquet/src/arrow/levels.rs @@ -189,7 +189,8 @@ impl LevelInfo { | DataType::Utf8 | DataType::LargeUtf8 | DataType::Dictionary(_, _) - | DataType::Decimal(_, _) => { + | DataType::Decimal(_, _) + | DataType::FixedSizeBinary(_) => { vec![list_level.calculate_child_levels( child_offsets, child_mask, @@ -197,7 +198,6 @@ impl LevelInfo { list_field.is_nullable(), )] } - DataType::FixedSizeBinary(_) => unimplemented!(), DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => { list_level.calculate_array_levels(&child_array, list_field) }