diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index dc9cf70a374..ca78ffc957b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -24,8 +24,10 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; +use super::levels::LevelInfo; use super::schema::add_encoded_arrow_schema_to_metadata; -use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; + +use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; use crate::{ @@ -85,11 +87,16 @@ impl ArrowWriter { } // compute the definition and repetition levels of the batch let mut levels = vec![]; - batch.columns().iter().for_each(|array| { - let mut array_levels = - get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None); - levels.append(&mut array_levels); - }); + let batch_level = LevelInfo::new_from_batch(batch); + batch + .columns() + .iter() + .zip(batch.schema().fields()) + .for_each(|(array, field)| { + let mut array_levels = + batch_level.calculate_array_levels(array, field, 1); + levels.append(&mut array_levels); + }); // reverse levels so we can use Vec::pop(&mut self) levels.reverse(); @@ -103,7 +110,7 @@ impl ArrowWriter { self.writer.close_row_group(row_group_writer) } - /// Close and finalise the underlying Parquet writer + /// Close and finalize the underlying Parquet writer pub fn close(&mut self) -> Result<()> { self.writer.close() } @@ -125,7 +132,7 @@ fn get_col_writer( fn write_leaves( mut row_group_writer: &mut Box, array: &arrow_array::ArrayRef, - mut levels: &mut Vec, + mut levels: &mut Vec, ) -> Result<()> { match array.data_type() { ArrowDataType::Null @@ -177,87 +184,17 @@ fn write_leaves( } Ok(()) } - ArrowDataType::Dictionary(key_type, value_type) => { - use arrow_array::{PrimitiveArray, StringArray}; - use ArrowDataType::*; - use ColumnWriter::*; + 
ArrowDataType::Dictionary(_, value_type) => { + // cast dictionary to a primitive + let array = arrow::compute::cast(array, value_type)?; - let array = &**array; let mut col_writer = get_col_writer(&mut row_group_writer)?; - let levels = levels.pop().expect("Levels exhausted"); - - macro_rules! dispatch_dictionary { - ($($kt: pat, $vt: pat, $w: ident => $kat: ty, $vat: ty,)*) => ( - match (&**key_type, &**value_type, &mut col_writer) { - $(($kt, $vt, $w(writer)) => write_dict::<$kat, $vat, _>(array, writer, levels),)* - (kt, vt, _) => unreachable!("Shouldn't be attempting to write dictionary of <{:?}, {:?}>", kt, vt), - } - ); - } - - if let (UInt8, UInt32, Int32ColumnWriter(writer)) = - (&**key_type, &**value_type, &mut col_writer) - { - let typed_array = array - .as_any() - .downcast_ref::() - .expect("Unable to get dictionary array"); - - let keys = typed_array.keys(); - - let value_buffer = typed_array.values(); - let value_array = - arrow::compute::cast(&value_buffer, &ArrowDataType::Int32)?; - - let values = value_array - .as_any() - .downcast_ref::() - .unwrap(); - - use std::convert::TryFrom; - // This removes NULL values from the keys, but - // they're encoded by the levels, so that's fine. 
- let materialized_values: Vec<_> = keys - .into_iter() - .flatten() - .map(|key| { - usize::try_from(key) - .unwrap_or_else(|k| panic!("key {} does not fit in usize", k)) - }) - .map(|key| values.value(key)) - .collect(); - - let materialized_primitive_array = - PrimitiveArray::::from( - materialized_values, - ); - - writer.write_batch( - get_numeric_array_slice::( - &materialized_primitive_array, - ) - .as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), - )?; - row_group_writer.close_column(col_writer)?; - - return Ok(()); - } - - dispatch_dictionary!( - Int8, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int8Type, StringArray, - Int16, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int16Type, StringArray, - Int32, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int32Type, StringArray, - Int64, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int64Type, StringArray, - UInt8, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt8Type, StringArray, - UInt16, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt16Type, StringArray, - UInt32, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt32Type, StringArray, - UInt64, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt64Type, StringArray, + write_leaf( + &mut col_writer, + &array, + levels.pop().expect("Levels exhausted"), )?; - row_group_writer.close_column(col_writer)?; - Ok(()) } ArrowDataType::Float16 => Err(ParquetError::ArrowError( @@ -272,72 +209,12 @@ fn write_leaves( } } -trait Materialize { - type Output; - - // Materialize the packed dictionary. The writer will later repack it. 
- fn materialize(&self) -> Vec; -} - -impl Materialize for dyn Array -where - K: arrow::datatypes::ArrowDictionaryKeyType, -{ - type Output = ByteArray; - - fn materialize(&self) -> Vec { - use arrow::datatypes::ArrowNativeType; - - let typed_array = self - .as_any() - .downcast_ref::>() - .expect("Unable to get dictionary array"); - - let keys = typed_array.keys(); - - let value_buffer = typed_array.values(); - let values = value_buffer - .as_any() - .downcast_ref::() - .unwrap(); - - // This removes NULL values from the keys, but - // they're encoded by the levels, so that's fine. - keys.into_iter() - .flatten() - .map(|key| { - key.to_usize() - .unwrap_or_else(|| panic!("key {:?} does not fit in usize", key)) - }) - .map(|key| values.value(key)) - .map(ByteArray::from) - .collect() - } -} - -fn write_dict( - array: &(dyn Array + 'static), - writer: &mut ColumnWriterImpl, - levels: Levels, -) -> Result<()> -where - T: DataType, - dyn Array: Materialize, -{ - writer.write_batch( - &array.materialize(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), - )?; - - Ok(()) -} - fn write_leaf( writer: &mut ColumnWriter, column: &arrow_array::ArrayRef, - levels: Levels, + levels: LevelInfo, ) -> Result { + let indices = filter_array_indices(&levels); let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; @@ -345,8 +222,10 @@ fn write_leaf( .as_any() .downcast_ref::() .expect("Unable to get int32 array"); + // assigning values to make it easier to debug + let slice = get_numeric_array_slice::(&array, &indices); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + slice.as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? 
@@ -354,7 +233,7 @@ fn write_leaf( ColumnWriter::BoolColumnWriter(ref mut typed) => { let array = arrow_array::BooleanArray::from(column.data()); typed.write_batch( - get_bool_array_slice(&array).as_slice(), + get_bool_array_slice(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -362,7 +241,7 @@ fn write_leaf( ColumnWriter::Int64ColumnWriter(ref mut typed) => { let array = arrow_array::Int64Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -373,7 +252,7 @@ fn write_leaf( ColumnWriter::FloatColumnWriter(ref mut typed) => { let array = arrow_array::Float32Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -381,7 +260,7 @@ fn write_leaf( ColumnWriter::DoubleColumnWriter(ref mut typed) => { let array = arrow_array::Float64Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -428,209 +307,6 @@ fn write_leaf( Ok(written as i64) } -/// A struct that represents definition and repetition levels. 
-/// Repetition levels are only populated if the parent or current leaf is repeated -#[derive(Debug)] -struct Levels { - definition: Vec, - repetition: Option>, -} - -/// Compute nested levels of the Arrow array, recursing into lists and structs -fn get_levels( - array: &arrow_array::ArrayRef, - level: i16, - parent_def_levels: &[i16], - parent_rep_levels: Option<&[i16]>, -) -> Vec { - match array.data_type() { - ArrowDataType::Null => vec![Levels { - definition: parent_def_levels.iter().map(|v| (v - 1).max(0)).collect(), - repetition: None, - }], - ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float16 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32(_) - | ArrowDataType::Date64(_) - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) - | ArrowDataType::Binary - | ArrowDataType::LargeBinary => vec![Levels { - definition: get_primitive_def_levels(array, parent_def_levels), - repetition: None, - }], - ArrowDataType::FixedSizeBinary(_) => unimplemented!(), - ArrowDataType::Decimal(_, _) => unimplemented!(), - ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { - let array_data = array.data(); - let child_data = array_data.child_data().get(0).unwrap(); - // get offsets, accounting for large offsets if present - let offsets: Vec = { - if let ArrowDataType::LargeList(_) = array.data_type() { - unsafe { array_data.buffers()[0].typed_data::() }.to_vec() - } else { - let offsets = unsafe { array_data.buffers()[0].typed_data::() }; - offsets.to_vec().into_iter().map(|v| v as i64).collect() - } - }; - let child_array = arrow_array::make_array(child_data.clone()); - - let mut list_def_levels = 
Vec::with_capacity(child_array.len()); - let mut list_rep_levels = Vec::with_capacity(child_array.len()); - let rep_levels: Vec = parent_rep_levels - .map(|l| l.to_vec()) - .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]); - parent_def_levels - .iter() - .zip(rep_levels) - .zip(offsets.windows(2)) - .for_each(|((parent_def_level, parent_rep_level), window)| { - if *parent_def_level == 0 { - // parent is null, list element must also be null - list_def_levels.push(0); - list_rep_levels.push(0); - } else { - // parent is not null, check if list is empty or null - let start = window[0]; - let end = window[1]; - let len = end - start; - if len == 0 { - list_def_levels.push(*parent_def_level - 1); - list_rep_levels.push(parent_rep_level); - } else { - list_def_levels.push(*parent_def_level); - list_rep_levels.push(parent_rep_level); - for _ in 1..len { - list_def_levels.push(*parent_def_level); - list_rep_levels.push(parent_rep_level + 1); - } - } - } - }); - - // if datatype is a primitive, we can construct levels of the child array - match child_array.data_type() { - // TODO: The behaviour of a > is untested - ArrowDataType::Null => vec![Levels { - definition: list_def_levels, - repetition: Some(list_rep_levels), - }], - ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float16 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32(_) - | ArrowDataType::Date64(_) - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) => { - let def_levels = - get_primitive_def_levels(&child_array, &list_def_levels[..]); - vec![Levels { - definition: def_levels, - repetition: Some(list_rep_levels), - }] - } - ArrowDataType::Binary - | ArrowDataType::Utf8 - | 
ArrowDataType::LargeUtf8 => unimplemented!(), - ArrowDataType::FixedSizeBinary(_) => unimplemented!(), - ArrowDataType::Decimal(_, _) => unimplemented!(), - ArrowDataType::LargeBinary => unimplemented!(), - ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { - // nested list - unimplemented!() - } - ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(_) => get_levels( - array, - level + 1, // indicates a nesting level of 2 (list + struct) - &list_def_levels[..], - Some(&list_rep_levels[..]), - ), - ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => unimplemented!(), - } - } - ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(_) => { - let struct_array: &arrow_array::StructArray = array - .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); - let mut struct_def_levels = Vec::with_capacity(struct_array.len()); - for i in 0..array.len() { - struct_def_levels.push(level + struct_array.is_valid(i) as i16); - } - // trying to create levels for struct's fields - let mut struct_levels = vec![]; - struct_array.columns().into_iter().for_each(|col| { - let mut levels = - get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels); - struct_levels.append(&mut levels); - }); - struct_levels - } - ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => { - // Need to check for these cases not implemented in C++: - // - "Writing DictionaryArray with nested dictionary type not yet supported" - // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" - vec![Levels { - definition: get_primitive_def_levels(array, parent_def_levels), - repetition: None, - }] - } - } -} - -/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null -/// In the case where the array in question is a child of either a list or struct, the levels -/// are incremented in accordance with the 
`level` parameter. -/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null -fn get_primitive_def_levels( - array: &arrow_array::ArrayRef, - parent_def_levels: &[i16], -) -> Vec { - let mut array_index = 0; - let max_def_level = parent_def_levels.iter().max().unwrap(); - let mut primitive_def_levels = vec![]; - parent_def_levels.iter().for_each(|def_level| { - if def_level < max_def_level { - primitive_def_levels.push(*def_level); - } else { - primitive_def_levels.push(def_level - array.is_null(array_index) as i16); - array_index += 1; - } - }); - primitive_def_levels -} - macro_rules! def_get_binary_array_fn { ($name:ident, $ty:ty) => { fn $name(array: &$ty) -> Vec { @@ -655,31 +331,54 @@ def_get_binary_array_fn!(get_large_string_array, arrow_array::LargeStringArray); /// Get the underlying numeric array slice, skipping any null values. /// If there are no null values, it might be quicker to get the slice directly instead of /// calling this function. 
-fn get_numeric_array_slice(array: &arrow_array::PrimitiveArray) -> Vec +fn get_numeric_array_slice( + array: &arrow_array::PrimitiveArray, + indices: &[usize], +) -> Vec where T: DataType, A: arrow::datatypes::ArrowNumericType, T::T: From, { - let mut values = Vec::with_capacity(array.len() - array.null_count()); - for i in 0..array.len() { - if array.is_valid(i) { - values.push(array.value(i).into()) - } + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + values.push(array.value(*i).into()) } values } -fn get_bool_array_slice(array: &arrow_array::BooleanArray) -> Vec { - let mut values = Vec::with_capacity(array.len() - array.null_count()); - for i in 0..array.len() { - if array.is_valid(i) { - values.push(array.value(i)) - } +fn get_bool_array_slice( + array: &arrow_array::BooleanArray, + indices: &[usize], +) -> Vec { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + values.push(array.value(*i)) } values } +/// Given a level's information, calculate the offsets required to index an array +/// correctly. 
+fn filter_array_indices(level: &LevelInfo) -> Vec { + let mut filtered = vec![]; + // remove slots that are false from definition_mask + let mut index = 0; + level + .definition + .iter() + .zip(&level.definition_mask) + .for_each(|(def, (mask, _))| { + if *mask { + if *def == level.max_definition { + filtered.push(index); + } + index += 1; + } + }); + filtered +} + #[cfg(test)] mod tests { use super::*; @@ -687,15 +386,13 @@ mod tests { use std::io::Seek; use std::sync::Arc; - use arrow::array::*; use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; use arrow::record_batch::RecordBatch; + use arrow::{array::*, buffer::Buffer}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::file::{ - metadata::KeyValue, reader::SerializedFileReader, writer::InMemoryWriteableCursor, - }; + use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor}; use crate::util::test_common::get_temp_file; #[test] @@ -771,7 +468,25 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + fn arrow_writer_non_null() { + // define schema + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + + let file = get_temp_file("test_arrow_writer_non_null.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + #[ignore = "ARROW-10766: list support is incomplete"] fn arrow_writer_list() { // define schema let schema = Schema::new(vec![Field::new( @@ -797,6 +512,7 @@ mod tests { .len(5) .add_buffer(a_value_offsets) .add_child_data(a_values.data()) + .null_bit_buffer(Buffer::from(vec![0b00011011])) .build(); let a = 
ListArray::from(a_list_data); @@ -870,6 +586,7 @@ mod tests { } #[test] + #[ignore = "ARROW-10766: list support is incomplete"] fn arrow_writer_complex() { // define schema let struct_field_d = Field::new("d", DataType::Float64, true); @@ -927,23 +644,106 @@ mod tests { // build a record batch let batch = RecordBatch::try_new( - Arc::new(schema.clone()), + Arc::new(schema), vec![Arc::new(a), Arc::new(b), Arc::new(c)], ) .unwrap(); - let props = WriterProperties::builder() - .set_key_value_metadata(Some(vec![KeyValue { - key: "test_key".to_string(), - value: Some("test_value".to_string()), - }])) + roundtrip("test_arrow_writer_complex.parquet", batch); + } + + #[test] + fn arrow_writer_2_level_struct() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, true); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), true); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), true); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00100111])) + .add_child_data(c.data()) .build(); + let b = StructArray::from(b_data); + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00101111])) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); - let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); - let mut writer = - ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); + assert_eq!(a.null_count(), 1); + assert_eq!(a.column(0).null_count(), 2); + + // build a record batch + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + roundtrip("test_arrow_writer_2_level_struct.parquet", batch); + } + + #[test] + fn 
arrow_writer_2_level_struct_non_null() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, false); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), false); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), false); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .add_child_data(c.data()) + .build(); + let b = StructArray::from(b_data); + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); + + assert_eq!(a.null_count(), 0); + assert_eq!(a.column(0).null_count(), 0); + + // build a record batch + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch); + } + + #[test] + fn arrow_writer_2_level_struct_mixed_null() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, false); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), true); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), false); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00100111])) + .add_child_data(c.data()) + .build(); + let b = StructArray::from(b_data); + // a intentionally has no null buffer, to test that this is handled correctly + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); + + assert_eq!(a.null_count(), 0); + assert_eq!(a.column(0).null_count(), 2); + + // build a record batch + let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(a)]).unwrap(); + + roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch); } const SMALL_SIZE: usize = 100; @@ -1292,7 +1092,7 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + #[ignore = "ARROW-10766: list support is incomplete"] fn list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = @@ -1317,7 +1117,7 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + #[ignore = "ARROW-10766: list support is incomplete"] fn large_list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs new file mode 100644 index 00000000000..1c178e3a0eb --- /dev/null +++ b/rust/parquet/src/arrow/levels.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Parquet definition and repetition levels +//! +//! Contains the algorithm for computing definition and repetition levels. +//! 
The algorithm works by tracking the slots of an array that should ultimately be populated when +//! writing to Parquet. +//! Parquet achieves nesting through definition levels and repetition levels [1]. +//! Definition levels specify how many optional fields in the path for the column are defined. +//! Repetition levels specify at what repeated field (list) in the path a column is defined. +//! +//! In a nested data structure such as `a.b.c`, one can see levels as defining whether a record is +//! defined at `a`, `a.b`, or `a.b.c`. Optional fields are nullable fields, thus if all 3 fields +//! are nullable, the maximum definition will be = 3. +//! +//! The algorithm in this module computes the necessary information to enable the writer to keep +//! track of which columns are at which levels, and to ultimately extract the correct values at +//! the correct slots from Arrow arrays. +//! +//! It works by walking a record batch's arrays, keeping track of what values are non-null, their +//! positions and computing what their levels are. +//! We use an eager approach that increments definition levels where incrementable, and decrements +//! if a value being checked is null. +//! +//! [1] https://github.com/apache/parquet-format#nested-encoding + +use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::datatypes::{DataType, Field}; +use arrow::record_batch::RecordBatch; + +/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet. +/// +/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track +/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo] +/// is created, and this is what is used to index into the array when writing data to Parquet. 
+/// +/// Note: for convenience, the final primitive array's level info can omit some values below if +/// none of that array's parents were repetitive (i.e `is_list` is false) +#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) struct LevelInfo { + /// Array's definition levels + pub definition: Vec, + /// Array's optional repetition levels + pub repetition: Option>, + /// Array's offsets, 64-bit is used to accommodate large offset arrays + pub array_offsets: Vec, + /// Array's validity mask + /// + /// While this looks like `definition_mask`, they serve different purposes. + /// This mask is for the immediate array, while the `definition_mask` tracks + /// the cumulative effect of all masks from the root (batch) to the current array. + pub array_mask: Vec, + /// Definition mask, to indicate null ListArray slots that should be skipped + pub definition_mask: Vec<(bool, i16)>, + /// The maximum definition at this level, 1 at the record batch + pub max_definition: i16, + /// Whether this array or any of its parents is a list, in which case the + /// `definition_mask` would be used to index correctly into list children. + pub is_list: bool, + /// Whether the current array is nullable (affects definition levels) + pub is_nullable: bool, +} + +impl LevelInfo { + /// Create a new [LevelInfo] from a record batch. + /// + /// This is a convenience function to populate the starting point of the traversal. 
+ pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self { + let num_rows = batch.num_rows(); + Self { + // a batch is treated as all-defined + definition: vec![1; num_rows], + // a batch has no repetition as it is not a list + repetition: None, + // all values of a batch are deemed to be defined at level 1 + definition_mask: vec![(true, 1); num_rows], + // a batch has sequential offsets, should be num_rows + 1 + array_offsets: (0..=(num_rows as i64)).collect(), + // all values at a batch-level are non-null + array_mask: vec![true; num_rows], + max_definition: 1, + is_list: false, + // a batch is treated as nullable even though it has no nulls, + // this is required to compute nested type levels correctly + is_nullable: true, + } + } + + /// Compute nested levels of the Arrow array, recursing into lists and structs. + /// + /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays. + /// + /// The algorithm works by eagerly incrementing non-null values, and decrementing + /// when a value is null. + /// + /// *Examples:* + /// + /// A record batch always starts at a populated definition = level 1. + /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>`, column `a` + /// can only have a maximum level of 1 if it is not null. + /// If it is null, we decrement by 1, such that the null slots will = level 0. + /// + /// If a batch has nested arrays (list, struct, union, etc.), then the incrementing + /// takes place. + /// A `<batch<struct[a]<primitive[b]>>>` will have up to 2 levels (if nullable). + /// When calculating levels for `a`, we start with level 1 from the batch, + /// then if the struct slot is not empty, we increment by 1, such that we'd have `[2, 2, 2]` + /// if all 3 slots are not null. + /// If there is an empty slot, we decrement, leaving us with `[2, 0 (1-1), 2]` as the + /// null slot effectively means that no record is populated for the row altogether. 
+ /// + /// When we encounter `b` which is primitive, we check if the supplied definition levels + /// equal the maximum level (i.e. level = 2). If the level < 2, then the parent of the + /// primitive (`a`) is already null, and `b` is kept as null. + /// If the level == 2, then we check if `b`'s slot is null, decrementing if it is null. + /// Thus we could have a final definition as: `[2, 0, 1]` indicating that only the first + /// slot is populated for `a.b`, the second one is all null, and only `a` has a value on the last. + /// + /// If expressed as JSON, this would be: + /// + /// ```json + /// {"a": {"b": 1}} + /// {"a": null} + /// {"a": {"b": null}} + /// ``` + /// + /// *Lists* + /// + /// TODO + /// + /// *Non-nullable arrays* + /// + /// If an array is non-nullable, this is accounted for when converting the Arrow schema to a + /// Parquet schema. + /// When dealing with `<batch<primitive[_]>>` there is no issue, as the maximum + /// level will always be = 1. + /// + /// When dealing with nested types, the logic becomes a bit complicated. + /// A non-nullable struct; `<batch<struct[a]<primitive[b]>>>` will only + /// have 1 maximum level, where 0 means `b` is null, and 1 means `b` is not null. + /// + /// We account for the above by checking if the `Field` is nullable, and adjusting + /// the `level` variable to determine which level the next child should increment or + /// decrement from. 
+    ///
+    /// # Panics
+    ///
+    /// Panics via `unimplemented!`/`todo!` for `FixedSizeBinary`, `Decimal`, `List`,
+    /// `LargeList`, `FixedSizeList` and `Union` arrays, and if an array whose data type is
+    /// `Struct` cannot be downcast to a `StructArray`.
+    pub(crate) fn calculate_array_levels(
+        &self,
+        array: &ArrayRef,
+        field: &Field,
+        level: i16,
+    ) -> Vec<Self> {
+        match array.data_type() {
+            DataType::Null => vec![Self {
+                // every slot is null, so each parent level drops by one (never below 0)
+                definition: self.definition.iter().map(|d| (d - 1).max(0)).collect(),
+                repetition: self.repetition.clone(),
+                definition_mask: self.definition_mask.clone(),
+                array_offsets: self.array_offsets.clone(),
+                array_mask: self.array_mask.clone(),
+                // nulls will have all definitions being 0, so max value is reduced
+                max_definition: level - 1,
+                is_list: self.is_list,
+                is_nullable: true, // always nullable as all values are nulls
+            }],
+            DataType::Boolean
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Float16
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::LargeUtf8
+            | DataType::Timestamp(_, _)
+            | DataType::Date32(_)
+            | DataType::Date64(_)
+            | DataType::Time32(_)
+            | DataType::Time64(_)
+            | DataType::Duration(_)
+            | DataType::Interval(_)
+            | DataType::Binary
+            | DataType::LargeBinary => {
+                // we return a vector of 1 value to represent the primitive
+                // it is safe to inherit the parent level's repetition, but we have to calculate
+                // the child's own definition levels
+                vec![Self {
+                    definition: self.get_primitive_def_levels(array, field),
+                    // TODO: if we change this when working on lists, then update the above comment
+                    repetition: self.repetition.clone(),
+                    definition_mask: self.definition_mask.clone(),
+                    array_offsets: self.array_offsets.clone(),
+                    array_mask: self.array_mask.clone(),
+                    is_list: self.is_list,
+                    // a non-nullable field below the top level (`level > 1`) does not get a
+                    // definition level of its own, so the maximum is reduced by one
+                    max_definition: level - ((!field.is_nullable() && level > 1) as i16),
+                    is_nullable: field.is_nullable(),
+                }]
+            }
+            DataType::FixedSizeBinary(_) => unimplemented!(),
+            DataType::Decimal(_, _) => unimplemented!(),
+            DataType::List(_list_field) | DataType::LargeList(_list_field) => {
+                // TODO: ARROW-10766, it is better to not write lists at all until they are correct
+                todo!("List writing not yet implemented, see ARROW-10766")
+            }
+            DataType::FixedSizeList(_, _) => unimplemented!(),
+            DataType::Struct(struct_fields) => {
+                let struct_array: &StructArray = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .expect("Unable to get struct array");
+                let array_len = struct_array.len();
+                let mut struct_def_levels = Vec::with_capacity(array_len);
+                let mut struct_mask = Vec::with_capacity(array_len);
+                // we can have a nested `<struct<struct<_>>>`, in which case we should check
+                // the parent struct in the child struct's offsets
+                for (i, def_level) in self.definition.iter().enumerate() {
+                    // the validity of slot `i` feeds both the level and the struct's own mask
+                    let is_valid = struct_array.is_valid(i);
+                    if *def_level == level {
+                        if !field.is_nullable() {
+                            // if the field is non-nullable and current definition = parent,
+                            // then we should neither increment nor decrement the level
+                            struct_def_levels.push(level);
+                        } else if is_valid {
+                            // Increment to indicate that this value is not null
+                            // The next level will decrement if it is null
+                            struct_def_levels.push(level + 1);
+                        } else {
+                            // decrement to show that only the previous level is populated
+                            // we only decrement if previous field is nullable because if it
+                            // was not nullable, we can't decrement beyond its level
+                            struct_def_levels.push(level - (self.is_nullable as i16));
+                        }
+                    } else {
+                        // this means that the previous level's slot was null, so we preserve it
+                        struct_def_levels.push(*def_level);
+                    }
+                    // TODO: is it more efficient to use `bitvec` here?
+                    struct_mask.push(is_valid);
+                }
+                // a nullable struct adds one definition level for its children to build on
+                let child_level = level + (field.is_nullable() as i16);
+                let struct_level_info = Self {
+                    definition: struct_def_levels,
+                    // inherit the parent's repetition
+                    repetition: self.repetition.clone(),
+                    // Is it correct to increment this by 1 level?
+                    definition_mask: self
+                        .definition_mask
+                        .iter()
+                        .map(|(state, index)| (*state, index + 1))
+                        .collect(),
+                    // logically, a struct should inherit its parent's offsets
+                    array_offsets: self.array_offsets.clone(),
+                    // this should be just the struct's mask, not its parent's
+                    array_mask: struct_mask,
+                    max_definition: self.max_definition + (field.is_nullable() as i16),
+                    is_list: self.is_list,
+                    is_nullable: field.is_nullable(),
+                };
+                // create levels for struct's fields, we accumulate them in this vec
+                let mut struct_levels = vec![];
+                struct_array
+                    .columns()
+                    .into_iter()
+                    .zip(struct_fields)
+                    .for_each(|(col, struct_field)| {
+                        let mut child_levels = struct_level_info.calculate_array_levels(
+                            col,
+                            struct_field,
+                            child_level,
+                        );
+                        struct_levels.append(&mut child_levels);
+                    });
+                struct_levels
+            }
+            DataType::Union(_) => unimplemented!(),
+            DataType::Dictionary(_, _) => {
+                // Need to check for these cases not implemented in C++:
+                // - "Writing DictionaryArray with nested dictionary type not yet supported"
+                // - "Writing DictionaryArray with null encoded in dictionary type not yet supported"
+                vec![Self {
+                    definition: self.get_primitive_def_levels(array, field),
+                    repetition: self.repetition.clone(),
+                    definition_mask: self.definition_mask.clone(),
+                    array_offsets: self.array_offsets.clone(),
+                    array_mask: self.array_mask.clone(),
+                    is_list: self.is_list,
+                    max_definition: level,
+                    is_nullable: field.is_nullable(),
+                }]
+            }
+        }
+    }
+
+    /// Compute the definition levels of a leaf array, one per entry of `self.definition`.
+    ///
+    /// With `max` being the largest level in `self.definition`:
+    /// - if `field` is non-nullable and `max > 1`, every parent level is reduced by one;
+    /// - otherwise a slot whose parent level is below `max` keeps the parent's level, and
+    ///   a slot at `max` is reduced by one when the corresponding slot of `array` is null.
+    ///
+    /// Returns an empty vector when `self.definition` is empty, instead of panicking on the
+    /// missing maximum.
+    fn get_primitive_def_levels(&self, array: &ArrayRef, field: &Field) -> Vec<i16> {
+        // no parent levels means there is no maximum to compare against and nothing to emit
+        let max_def_level = match self.definition.iter().max() {
+            Some(max) => *max,
+            None => return Vec::new(),
+        };
+        // loop-invariant: decided once for the whole array
+        let non_nullable_child = !field.is_nullable() && max_def_level > 1;
+        self.definition
+            .iter()
+            .enumerate()
+            .map(|(array_index, def_level)| {
+                if non_nullable_child {
+                    *def_level - 1
+                } else if *def_level < max_def_level {
+                    *def_level
+                } else {
+                    *def_level - array.is_null(array_index) as i16
+                }
+            })
+            .collect()
+    }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index e64a3d75d82..9095259163f 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -53,6 +53,7 @@ pub(in crate::arrow) mod array_reader; pub mod arrow_reader; pub mod arrow_writer; pub(in crate::arrow) mod converter; +pub mod levels; pub(in crate::arrow) mod record_reader; pub mod schema;