From 6d159a9375904a6a6ed00666d48ada1a9f0cb94f Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 7 Nov 2020 15:08:31 +0200 Subject: [PATCH 1/6] ARROW-9728: [Rust] [Parquet] Nested definition & repetition for structs save progress (11/11/2020) save progress Integrating level calculations in writer Some tests are failing, still have a long way to go fix lints save progress I'm nearly able to reproduce a `>` I'm writing one level too high for nulls, so my null counts differ. Fixing this should result in nested struct roundtrip for the fully nullable case. Currently failing tests: ```rust failures: arrow::arrow_writer::tests::arrow_writer_2_level_struct arrow::arrow_writer::tests::arrow_writer_complex arrow::levels::tests::test_calculate_array_levels_2 arrow::levels::tests::test_calculate_array_levels_nested_list arrow::levels::tests::test_calculate_one_level_2 ``` They are mainly failing because we don't roundtrip lists correctly save progress 19/20-11-2020 Structs that have nulls are working (need to revert non-null logic) TODOs that need addressing later on save progress - Focused more on nested structs. 
- Confident that writes are now fine - Found issue with struct logical comparison, blocks this work add failing arrow struct array test a bit of cleanup for failing tests Also document why dictionary test is failing --- rust/parquet/src/arrow/arrow_writer.rs | 461 ++++++++++++---- rust/parquet/src/arrow/levels.rs | 692 +++++++++++++++++++++++++ rust/parquet/src/arrow/mod.rs | 1 + rust/parquet/src/column/writer.rs | 2 +- rust/parquet/src/util/bit_util.rs | 1 + 5 files changed, 1064 insertions(+), 93 deletions(-) create mode 100644 rust/parquet/src/arrow/levels.rs diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index dc9cf70a374..ae3b70420fa 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -20,11 +20,13 @@ use std::sync::Arc; use arrow::array as arrow_array; -use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; +use arrow::datatypes::{DataType as ArrowDataType, Field, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; +use super::levels::LevelInfo; use super::schema::add_encoded_arrow_schema_to_metadata; + use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; @@ -84,12 +86,28 @@ impl ArrowWriter { )); } // compute the definition and repetition levels of the batch + let num_rows = batch.num_rows(); let mut levels = vec![]; - batch.columns().iter().for_each(|array| { - let mut array_levels = - get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None); - levels.append(&mut array_levels); - }); + let batch_level = LevelInfo { + definition: vec![1; num_rows], + repetition: None, + definition_mask: vec![(true, 1); num_rows], + array_offsets: (0..=(num_rows as i64)).collect(), + array_mask: vec![true; num_rows], + max_definition: 1, + is_list: false, + is_nullable: true, // setting as null treats non-null structs correctly + }; + // TODO: between 
`max_definition` and `level` below, one might have to be 0 + batch + .columns() + .iter() + .zip(batch.schema().fields()) + .for_each(|(array, field)| { + let mut array_levels = + calculate_array_levels(array, field, 1, &batch_level); + levels.append(&mut array_levels); + }); // reverse levels so we can use Vec::pop(&mut self) levels.reverse(); @@ -125,7 +143,7 @@ fn get_col_writer( fn write_leaves( mut row_group_writer: &mut Box, array: &arrow_array::ArrayRef, - mut levels: &mut Vec, + mut levels: &mut Vec, ) -> Result<()> { match array.data_type() { ArrowDataType::Null @@ -217,6 +235,9 @@ fn write_leaves( use std::convert::TryFrom; // This removes NULL values from the keys, but // they're encoded by the levels, so that's fine. + + // nevi-me: if we materialize values by iterating on the array, can't we instead 'just' cast to the values? + // in the failing dictionary test, the materialized values here are incorrect (missing 22345678) let materialized_values: Vec<_> = keys .into_iter() .flatten() @@ -232,11 +253,17 @@ fn write_leaves( materialized_values, ); + // I added this because we need to consider dictionaries in structs correctly, + // I don't think it's the cause for the failing test though, as the materialized_p_arr + // in the test is incorrect when it gets here (missing 22345678 value) + let indices = filter_array_indices(&levels); + let values = get_numeric_array_slice::( + &materialized_primitive_array, + &indices, + ); + writer.write_batch( - get_numeric_array_slice::( - &materialized_primitive_array, - ) - .as_slice(), + values.as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )?; @@ -318,7 +345,7 @@ where fn write_dict( array: &(dyn Array + 'static), writer: &mut ColumnWriterImpl, - levels: Levels, + levels: LevelInfo, ) -> Result<()> where T: DataType, @@ -336,8 +363,9 @@ where fn write_leaf( writer: &mut ColumnWriter, column: &arrow_array::ArrayRef, - levels: Levels, + levels: LevelInfo, ) -> Result { + let indices = 
filter_array_indices(&levels); let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; @@ -345,8 +373,10 @@ fn write_leaf( .as_any() .downcast_ref::() .expect("Unable to get int32 array"); + // assigning values to make it easier to debug + let slice = get_numeric_array_slice::(&array, &indices); // TODO: this function is incomplete as it doesn't take into account the actual definition in slicing typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + slice.as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -354,7 +384,7 @@ fn write_leaf( ColumnWriter::BoolColumnWriter(ref mut typed) => { let array = arrow_array::BooleanArray::from(column.data()); typed.write_batch( - get_bool_array_slice(&array).as_slice(), + get_bool_array_slice(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -362,7 +392,7 @@ fn write_leaf( ColumnWriter::Int64ColumnWriter(ref mut typed) => { let array = arrow_array::Int64Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -373,7 +403,7 @@ fn write_leaf( ColumnWriter::FloatColumnWriter(ref mut typed) => { let array = arrow_array::Float32Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? @@ -381,7 +411,7 @@ fn write_leaf( ColumnWriter::DoubleColumnWriter(ref mut typed) => { let array = arrow_array::Float64Array::from(column.data()); typed.write_batch( - get_numeric_array_slice::(&array).as_slice(), + get_numeric_array_slice::(&array, &indices).as_slice(), Some(levels.definition.as_slice()), levels.repetition.as_deref(), )? 
@@ -428,25 +458,64 @@ fn write_leaf( Ok(written as i64) } -/// A struct that represents definition and repetition levels. -/// Repetition levels are only populated if the parent or current leaf is repeated -#[derive(Debug)] -struct Levels { - definition: Vec, - repetition: Option>, -} - /// Compute nested levels of the Arrow array, recursing into lists and structs -fn get_levels( +/// Returns a list of `LevelInfo`, where each level is for nested primitive arrays. +/// +/// The algorithm works by eagerly incrementing non-null values, and decrementing +/// when a value is null. +/// +/// *Examples:* +/// +/// A record batch always starts at a populated definition = level 1. +/// When a batch only has a primitive, i.e. `<batch<primitive[a]>>`, column `a` +/// can only have a maximum level of 1 if it is not null. +/// If it is null, we decrement by 1, such that the null slots will = level 0. +/// +/// If a batch has nested arrays (list, struct, union, etc.), then the incrementing +/// takes place. +/// A `<batch<struct[a]<primitive[b]>>>` will have up to 2 levels (if nullable). +/// When calculating levels for `a`, if the struct slot is not empty, we +/// increment by 1, such that we'd have `[2, 2, 2]` if all 3 slots are not null. +/// If there is an empty slot, we decrement, leaving us with `[2, 0, 2]` as the +/// null slot effectively means that no record is populated for the row altogether. +/// +/// *Lists* +/// +/// TODO +/// +/// *Non-nullable arrays* +/// +/// If an array is non-nullable, this is accounted for when converting the Arrow +/// schema to a Parquet schema. +/// When dealing with `<batch<primitive[_]>>` there is no issue, as the maximum +/// level will always be = 1. +/// +/// When dealing with nested types, the logic becomes a bit complicated. +/// A non-nullable struct; `<batch<struct[a]<primitive[b]>>>` will only +/// have 1 maximum level, where 0 means `b` is null, and 1 means `b` is not null. +/// +/// We account for the above by checking if the `Field` is nullable, and adjusting +/// the [inc|dec]rement accordingly.
+fn calculate_array_levels( array: &arrow_array::ArrayRef, + field: &Field, level: i16, - parent_def_levels: &[i16], - parent_rep_levels: Option<&[i16]>, -) -> Vec { + level_info: &LevelInfo, +) -> Vec { match array.data_type() { - ArrowDataType::Null => vec![Levels { - definition: parent_def_levels.iter().map(|v| (v - 1).max(0)).collect(), - repetition: None, + ArrowDataType::Null => vec![LevelInfo { + definition: level_info + .definition + .iter() + .map(|d| (d - 1).max(0)) + .collect(), + repetition: level_info.repetition.clone(), + definition_mask: level_info.definition_mask.clone(), + array_offsets: level_info.array_offsets.clone(), + array_mask: level_info.array_mask.clone(), + max_definition: level, + is_list: level_info.is_list, + is_nullable: true, // always nullable as all values are nulls }], ArrowDataType::Boolean | ArrowDataType::Int8 @@ -470,13 +539,19 @@ fn get_levels( | ArrowDataType::Duration(_) | ArrowDataType::Interval(_) | ArrowDataType::Binary - | ArrowDataType::LargeBinary => vec![Levels { - definition: get_primitive_def_levels(array, parent_def_levels), - repetition: None, + | ArrowDataType::LargeBinary => vec![LevelInfo { + definition: get_primitive_def_levels(array, field, &level_info.definition), + repetition: level_info.repetition.clone(), + definition_mask: level_info.definition_mask.clone(), + array_offsets: level_info.array_offsets.clone(), + array_mask: level_info.array_mask.clone(), + is_list: level_info.is_list, + max_definition: level, + is_nullable: field.is_nullable(), }], ArrowDataType::FixedSizeBinary(_) => unimplemented!(), ArrowDataType::Decimal(_, _) => unimplemented!(), - ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { + ArrowDataType::List(list_field) | ArrowDataType::LargeList(list_field) => { let array_data = array.data(); let child_data = array_data.child_data().get(0).unwrap(); // get offsets, accounting for large offsets if present @@ -492,10 +567,13 @@ fn get_levels( let mut list_def_levels = 
Vec::with_capacity(child_array.len()); let mut list_rep_levels = Vec::with_capacity(child_array.len()); - let rep_levels: Vec = parent_rep_levels + let rep_levels: Vec = level_info + .repetition + .clone() .map(|l| l.to_vec()) - .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]); - parent_def_levels + .unwrap_or_else(|| vec![0i16; level_info.definition.len()]); + level_info + .definition .iter() .zip(rep_levels) .zip(offsets.windows(2)) @@ -526,9 +604,15 @@ fn get_levels( // if datatype is a primitive, we can construct levels of the child array match child_array.data_type() { // TODO: The behaviour of a > is untested - ArrowDataType::Null => vec![Levels { + ArrowDataType::Null => vec![LevelInfo { definition: list_def_levels, repetition: Some(list_rep_levels), + definition_mask: level_info.definition_mask.clone(), // TODO: list mask + array_offsets: offsets, + array_mask: level_info.array_mask.clone(), // TODO: list mask + is_list: true, + is_nullable: list_field.is_nullable(), + max_definition: level + 1, // TODO: compute correctly }], ArrowDataType::Boolean | ArrowDataType::Int8 @@ -549,11 +633,20 @@ fn get_levels( | ArrowDataType::Time64(_) | ArrowDataType::Duration(_) | ArrowDataType::Interval(_) => { - let def_levels = - get_primitive_def_levels(&child_array, &list_def_levels[..]); - vec![Levels { + let def_levels = get_primitive_def_levels( + &child_array, + list_field, + &list_def_levels[..], + ); + vec![LevelInfo { definition: def_levels, repetition: Some(list_rep_levels), + array_mask: vec![], + array_offsets: vec![], + definition_mask: vec![], + is_list: true, + is_nullable: list_field.is_nullable(), + max_definition: level + 1, // TODO: update }] } ArrowDataType::Binary @@ -567,33 +660,93 @@ fn get_levels( unimplemented!() } ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(_) => get_levels( - array, - level + 1, // indicates a nesting level of 2 (list + struct) - &list_def_levels[..], - Some(&list_rep_levels[..]), - ), 
+ ArrowDataType::Struct(_) => { + let struct_level_info = LevelInfo { + definition: list_def_levels, + repetition: Some(list_rep_levels), + definition_mask: vec![], + array_offsets: vec![], + array_mask: vec![], + max_definition: level + 1, + is_list: list_field.is_nullable(), + is_nullable: true, // // indicates a nesting level of 2 (list + struct) + }; + calculate_array_levels( + array, + list_field, + level + 1, // indicates a nesting level of 2 (list + struct) + &struct_level_info, + ) + } ArrowDataType::Union(_) => unimplemented!(), ArrowDataType::Dictionary(_, _) => unimplemented!(), } } ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(_) => { + ArrowDataType::Struct(struct_fields) => { let struct_array: &arrow_array::StructArray = array .as_any() .downcast_ref::() .expect("Unable to get struct array"); - let mut struct_def_levels = Vec::with_capacity(struct_array.len()); - for i in 0..array.len() { - struct_def_levels.push(level + struct_array.is_valid(i) as i16); + let array_len = struct_array.len(); + let mut struct_def_levels = Vec::with_capacity(array_len); + let mut struct_mask = Vec::with_capacity(array_len); + // we can have a >, in which case we should check + // the parent struct in the child struct's offsets + for (i, def_level) in level_info.definition.iter().enumerate() { + if *def_level == level { + if !field.is_nullable() { + // push the level as is + struct_def_levels.push(level); + } else if struct_array.is_valid(i) { + // Increment to indicate that this value is not null + // The next level will decrement if it is null + // we can check if current value is null + struct_def_levels.push(level + 1); + } else { + // decrement to show that only the previous level is populated + // we only decrement if previous field is nullable + struct_def_levels.push(level - (level_info.is_nullable as i16)); + } + } else { + struct_def_levels.push(*def_level); + } + // TODO: is it more efficient to use `bitvec` here? 
+ struct_mask.push(struct_array.is_valid(i)); } // trying to create levels for struct's fields let mut struct_levels = vec![]; - struct_array.columns().into_iter().for_each(|col| { - let mut levels = - get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels); - struct_levels.append(&mut levels); - }); + let struct_level_info = LevelInfo { + definition: struct_def_levels, + // TODO: inherit the parent's repetition? (relevant for >) + repetition: level_info.repetition.clone(), + // Is it correct to increment this by 1 level? + definition_mask: level_info + .definition_mask + .iter() + .map(|(state, index)| (*state, index + 1)) + .collect(), + // logically, a struct should inherit its parent's offsets + array_offsets: level_info.array_offsets.clone(), + // this should be just the struct's mask, not its parent's + array_mask: struct_mask, + max_definition: level_info.max_definition + (field.is_nullable() as i16), + is_list: level_info.is_list, + is_nullable: field.is_nullable(), + }; + struct_array + .columns() + .into_iter() + .zip(struct_fields) + .for_each(|(col, struct_field)| { + let mut levels = calculate_array_levels( + col, + struct_field, + level + (field.is_nullable() as i16), + &struct_level_info, + ); + struct_levels.append(&mut levels); + }); struct_levels } ArrowDataType::Union(_) => unimplemented!(), @@ -601,9 +754,19 @@ fn get_levels( // Need to check for these cases not implemented in C++: // - "Writing DictionaryArray with nested dictionary type not yet supported" // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" - vec![Levels { - definition: get_primitive_def_levels(array, parent_def_levels), - repetition: None, + vec![LevelInfo { + definition: get_primitive_def_levels( + array, + field, + &level_info.definition, + ), + repetition: level_info.repetition.clone(), + definition_mask: level_info.definition_mask.clone(), + array_offsets: level_info.array_offsets.clone(), + array_mask: 
level_info.array_mask.clone(), + is_list: level_info.is_list, + max_definition: level, + is_nullable: field.is_nullable(), }] } } @@ -613,16 +776,31 @@ fn get_levels( /// In the case where the array in question is a child of either a list or struct, the levels /// are incremented in accordance with the `level` parameter. /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null +/// +/// TODO: (a comment to remove, note to help me reduce the mental bookkeeping) +/// We want an array's levels to be additive here, i.e. if we have an array that +/// comes from >, we should consume &[0; array.len()], so that +/// we add values to it, instead of subtract values +/// +/// An alternaitve is to pass the max level, and use it to compute whether we +/// should increment (though this is likely tricker) fn get_primitive_def_levels( array: &arrow_array::ArrayRef, + field: &Field, parent_def_levels: &[i16], ) -> Vec { let mut array_index = 0; let max_def_level = parent_def_levels.iter().max().unwrap(); let mut primitive_def_levels = vec![]; parent_def_levels.iter().for_each(|def_level| { - if def_level < max_def_level { + // TODO: if field is non-nullable, can its parent be nullable? Ideally shouldn't + // being non-null means that for a level > 1, then we should subtract 1? + if !field.is_nullable() && *max_def_level > 1 { + primitive_def_levels.push(*def_level - 1); + array_index += 1; + } else if def_level < max_def_level { primitive_def_levels.push(*def_level); + array_index += 1; } else { primitive_def_levels.push(def_level - array.is_null(array_index) as i16); array_index += 1; @@ -655,31 +833,59 @@ def_get_binary_array_fn!(get_large_string_array, arrow_array::LargeStringArray); /// Get the underlying numeric array slice, skipping any null values. /// If there are no null values, it might be quicker to get the slice directly instead of /// calling this function. 
-fn get_numeric_array_slice(array: &arrow_array::PrimitiveArray) -> Vec +fn get_numeric_array_slice( + array: &arrow_array::PrimitiveArray, + indices: &[usize], +) -> Vec where T: DataType, A: arrow::datatypes::ArrowNumericType, T::T: From, { - let mut values = Vec::with_capacity(array.len() - array.null_count()); - for i in 0..array.len() { - if array.is_valid(i) { - values.push(array.value(i).into()) - } + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + values.push(array.value(*i).into()) } values } -fn get_bool_array_slice(array: &arrow_array::BooleanArray) -> Vec { - let mut values = Vec::with_capacity(array.len() - array.null_count()); - for i in 0..array.len() { - if array.is_valid(i) { - values.push(array.value(i)) - } +fn get_bool_array_slice( + array: &arrow_array::BooleanArray, + indices: &[usize], +) -> Vec { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + values.push(array.value(*i)) } values } +/// Given a level's information, calculate the offsets required to index an array +/// correctly. 
+fn filter_array_indices(level: &LevelInfo) -> Vec { + // TODO: we don't quite get the def levels right all the time, so for now we recalculate it + // this has the downside that if no values are populated, the slicing will be wrong + + // TODO: we should reliably track this, to avoid finding the max value + let max_def = level.definition.iter().max().cloned().unwrap(); + let mut filtered = vec![]; + // remove slots that are false from definition_mask + let mut index = 0; + level + .definition + .iter() + .zip(&level.definition_mask) + .for_each(|(def, (mask, _))| { + if *mask { + if *def == max_def { + filtered.push(index); + } + index += 1; + } + }); + filtered +} + #[cfg(test)] mod tests { use super::*; @@ -687,15 +893,13 @@ mod tests { use std::io::Seek; use std::sync::Arc; - use arrow::array::*; use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; use arrow::record_batch::RecordBatch; + use arrow::{array::*, buffer::Buffer}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::file::{ - metadata::KeyValue, reader::SerializedFileReader, writer::InMemoryWriteableCursor, - }; + use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor}; use crate::util::test_common::get_temp_file; #[test] @@ -772,6 +976,25 @@ mod tests { #[test] #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + fn arrow_writer_non_null() { + // define schema + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // create some data + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + + let file = get_temp_file("test_arrow_writer_non_null.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + #[ignore = "list 
support is incomplete"] fn arrow_writer_list() { // define schema let schema = Schema::new(vec![Field::new( @@ -870,6 +1093,7 @@ mod tests { } #[test] + #[ignore = "list support is incomplete"] fn arrow_writer_complex() { // define schema let struct_field_d = Field::new("d", DataType::Float64, true); @@ -927,23 +1151,76 @@ mod tests { // build a record batch let batch = RecordBatch::try_new( - Arc::new(schema.clone()), + Arc::new(schema), vec![Arc::new(a), Arc::new(b), Arc::new(c)], ) .unwrap(); - let props = WriterProperties::builder() - .set_key_value_metadata(Some(vec![KeyValue { - key: "test_key".to_string(), - value: Some("test_value".to_string()), - }])) + roundtrip("test_arrow_writer_complex.parquet", batch); + } + + #[test] + fn arrow_writer_2_level_struct() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, true); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), true); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), true); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00100111])) + .add_child_data(c.data()) .build(); + let b = StructArray::from(b_data); + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00101111])) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); - let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); - let mut writer = - ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); + assert_eq!(a.null_count(), 1); + assert_eq!(a.column(0).null_count(), 2); + + // build a racord batch + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + 
roundtrip("test_arrow_writer_2_level_struct.parquet", batch); + } + + #[test] + fn arrow_writer_2_level_struct_non_null() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, false); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), true); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), false); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .null_bit_buffer(Buffer::from(vec![0b00100111])) + .add_child_data(c.data()) + .build(); + let b = StructArray::from(b_data); + // a intentionally has no null buffer, to test that this is handled correctly + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); + + assert_eq!(a.null_count(), 0); + assert_eq!(a.column(0).null_count(), 2); + + // build a racord batch + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch); } const SMALL_SIZE: usize = 100; @@ -1292,7 +1569,7 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + #[ignore = "list support is incomplete"] fn list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = @@ -1317,7 +1594,7 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] + #[ignore = "list support is incomplete"] fn large_list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs new file mode 100644 index 00000000000..489db17b2ac --- /dev/null +++ b/rust/parquet/src/arrow/levels.rs @@ -0,0 
+1,692 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains the logic for computing definition and repetition levels + +#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) struct LevelInfo { + /// Array's definition levels + pub definition: Vec, + /// Array's optional repetition levels + pub repetition: Option>, + /// Definition mask, to indicate null ListArray slots that should be skipped + pub definition_mask: Vec<(bool, i16)>, + /// Array's offsets, 64-bit is used to accommodate large offset arrays + pub array_offsets: Vec, + /// Array's validity mask + pub array_mask: Vec, + /// The maximum definition at this level, 0 at the root (record batch) [TODO: the 0 might be inaccurate] + pub max_definition: i16, + /// Whether this array or any of its parents is a list + pub is_list: bool, + /// Whether the array is nullable (affects definition levels) + pub is_nullable: bool, +} + +impl LevelInfo { + fn calculate_child_levels( + &self, + array_offsets: Vec, + array_mask: Vec, + is_list: bool, + is_nullable: bool, + current_def_level: i16, + ) -> Self { + let mut definition = vec![]; + let mut repetition = vec![]; + let mut definition_mask = vec![]; + let has_repetition = self.is_list || is_list; + + // keep 
track of parent definition nulls seen through the definition_mask + let mut nulls_seen = 0; + + // push any initial array slots that are null + while !self.definition_mask[nulls_seen].0 + && self.definition_mask[nulls_seen].1 + 2 < current_def_level + { + definition_mask.push(self.definition_mask[nulls_seen]); + definition.push(self.definition[nulls_seen]); + repetition.push(0); // TODO is it always 0? + nulls_seen += 1; + println!("Definition length e: {}", definition.len()); + } + + // we use this index to determine if a repetition should be populated based + // on its definition at the index. It needs to be outside of the loop + let mut def_index = 0; + + self.array_offsets.windows(2).for_each(|w| { + // the parent's index allows us to iterate through its offsets and the child's + let from = w[0] as usize; + let to = w[1] as usize; + // dbg!((from, to)); + // if the parent slot is empty, fill it once to show the nullness + if from == to { + definition.push(self.max_definition - 1); + repetition.push(0); + definition_mask.push((false, self.max_definition - 1)); + println!("Definition length d: {}", definition.len()); + } + + (from..to).for_each(|index| { + println!( + "Array level: {}, parent offset: {}", + current_def_level, index + ); + let parent_mask = &self.definition_mask[index + nulls_seen]; + // TODO: this might need to be < instead of ==, but we generate duplicates in that case + if !parent_mask.0 && parent_mask.1 == current_def_level { + println!("Parent mask c: {:?}", parent_mask); + nulls_seen += 1; + definition.push(self.max_definition); + repetition.push(1); + definition_mask.push(*parent_mask); + println!("Definition length c: {}", definition.len()); + } + let mask = array_mask[index]; + let array_from = array_offsets[index]; + let array_to = array_offsets[index + 1]; + + let parent_def_level = &self.definition[index + nulls_seen]; + + // if array_len == 0, the child is null + let array_len = array_to - array_from; + + // compute the definition 
level + // what happens if array's len is 0? + if array_len == 0 { + definition.push(self.max_definition); + repetition.push(0); // TODO: validate that this is 0 for deeply nested lists + definition_mask.push((false, current_def_level)); + println!("Definition length b: {}", definition.len()); + } + (array_from..array_to).for_each(|_| { + definition.push(if *parent_def_level == self.max_definition { + // TODO: haven't validated this in deeply-nested lists + self.max_definition + mask as i16 + } else { + *parent_def_level + }); + definition_mask.push((true, current_def_level)); + println!("Definition length a: {}", definition.len()); + }); + + // 11-11-2020 (23:57GMT) + // we are pushing defined repetitions even if a definition is < max + // I had initially separated the repetition logic here so that I + // don't perform a `has_repetition` check on each loop. + // The downside's that I now need to index into `definitions` so I + // can check if a value is defined or not. + + if has_repetition && array_len > 0 { + // compute the repetition level + + // dbg!(&definition); + // dbg!(current_def_level, parent_level.max_definition); + // dbg!(&parent_level.repetition); + match &self.repetition { + Some(rep) => { + let parent_rep = rep[index]; + // TODO(11/11/2020) need correct variable to mask repetitions correctly + if definition[def_index] == current_def_level { + repetition.push(parent_rep); + println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); + dbg!(&repetition); + def_index += 1; + (1..array_len).for_each(|_| { + println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); + repetition.push(current_def_level); // was parent_rep + 1 + def_index += 1; + }); + } else { + (0..array_len).for_each(|_| { + println!("* Index {} definition is {}, and repetition is {}. 
Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); + repetition.push(0); // TODO: should it be anything else? + // TODO: use an append instead of pushes + def_index += 1; + }); + } + } + None => { + println!("+ Index {} definition is {}, and repetition is 0. Current def: {}", def_index, definition[def_index], current_def_level); + // if definition[def_index] == current_def_level { + repetition.push(0); + def_index += 1; + (1..array_len).for_each(|_| { + repetition.push(1); // TODO: is it always 0 and 1? + def_index += 1; + }); + // } else { + // (0..array_len).for_each(|_| { + // repetition.push(0); // TODO: should it be anything else? + // // TODO: use an append instead of pushes + // def_index += 1; + // }); + // } + } + } + } + }); + }); + + let lev = LevelInfo { + definition, + repetition: if !has_repetition { + None + } else { + Some(repetition) + }, + definition_mask, + array_mask, + array_offsets, + is_list: has_repetition, + max_definition: current_def_level, + is_nullable, + }; + + println!("done"); + + lev + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_calculate_array_levels_twitter_example() { + // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html + // [[a, b, c], [d, e, f, g]], [[h], [i,j]] + let parent_levels = LevelInfo { + definition: vec![0, 0], + repetition: None, + definition_mask: vec![(true, 1), (true, 1)], + array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential + array_mask: vec![true, true], // both lists defined + max_definition: 0, // at the root, set to 0 + is_list: false, // root is never list + is_nullable: false, // root in example is non-nullable + }; + // offset into array, each level1 has 2 values + let array_offsets = vec![0, 2, 4]; + let array_mask = vec![true, true]; + + // calculate level1 levels + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + 
array_mask.clone(), + true, + false, + 1, + ); + // + let expected_levels = LevelInfo { + definition: vec![1, 1, 1, 1], + repetition: Some(vec![0, 1, 0, 1]), + definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1)], + array_offsets, + array_mask, + max_definition: 1, + is_list: true, + is_nullable: false, + }; + assert_eq!(levels, expected_levels); + + // level2 + let parent_levels = levels; + let array_offsets = vec![0, 3, 7, 8, 10]; + let array_mask = vec![true, true, true, true]; + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + true, + false, + 2, + ); + let expected_levels = LevelInfo { + definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2], + repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]), + definition_mask: vec![ + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + ], + array_offsets, + array_mask, + max_definition: 2, + is_list: true, + is_nullable: false, + }; + assert_eq!(&levels, &expected_levels); + } + + #[test] + fn test_calculate_one_level_1() { + // This test calculates the levels for a non-null primitive array + let parent_levels = LevelInfo { + definition: vec![1; 10], + repetition: None, + definition_mask: vec![(true, 1); 10], + array_offsets: (0..=10).collect(), + array_mask: vec![true; 10], + max_definition: 0, + is_list: false, + is_nullable: false, + }; + let array_offsets: Vec = (0..=10).collect(); + let array_mask = vec![true; 10]; + + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + false, + false, + 1, + ); + let expected_levels = LevelInfo { + definition: vec![1; 10], + repetition: None, + definition_mask: vec![(true, 1); 10], + array_offsets, + array_mask, + max_definition: 1, + is_list: false, + is_nullable: false, + }; + assert_eq!(&levels, &expected_levels); + } + + #[test] + #[ignore] + fn test_calculate_one_level_2() { + // This test 
calculates the levels for a non-null primitive array + let parent_levels = LevelInfo { + definition: vec![1; 5], + repetition: None, + definition_mask: vec![ + (true, 1), + (false, 1), + (true, 1), + (true, 1), + (false, 1), + ], + array_offsets: (0..=5).collect(), + array_mask: vec![true, false, true, true, false], + max_definition: 0, + is_list: false, + is_nullable: true, + }; + let array_offsets: Vec = (0..=5).collect(); + let array_mask = vec![true, false, true, true, false]; + + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + false, + false, + 1, + ); + let expected_levels = LevelInfo { + definition: vec![1; 5], + repetition: None, + definition_mask: vec![(true, 1); 5], + array_offsets, + array_mask, + max_definition: 1, + is_list: false, + is_nullable: false, + }; + assert_eq!(&levels, &expected_levels); + } + + #[test] + fn test_calculate_array_levels_1() { + // if all array values are defined (e.g. batch>) + // [[0], [1], [2], [3], [4]] + let parent_levels = LevelInfo { + definition: vec![0, 0, 0, 0, 0], + repetition: None, + definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1), (true, 1)], + array_offsets: vec![0, 1, 2, 3, 4, 5], + array_mask: vec![true, true, true, true, true], + max_definition: 0, + is_list: false, + is_nullable: false, + }; + let array_offsets = vec![0, 2, 2, 4, 8, 11]; + let array_mask = vec![true, false, true, true, true]; + + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + true, + false, + 1, + ); + // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]] + // all values are defined as we do not have nulls on the root (batch) + // repetition: + // 0: 0, 1 + // 1: + // 2: 0, 1 + // 3: 0, 1, 1, 1 + // 4: 0, 1, 1 + let expected_levels = LevelInfo { + definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1], + repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), + definition_mask: vec![ + (true, 1), + (true, 1), + 
(false, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + ], + array_offsets, + array_mask, + max_definition: 1, + is_list: true, + is_nullable: false, + }; + assert_eq!(levels, expected_levels); + } + + #[test] + #[ignore] + fn test_calculate_array_levels_2() { + // if some values are null + let parent_levels = LevelInfo { + definition: vec![0, 1, 0, 1, 1], + repetition: None, + definition_mask: vec![ + (false, 1), + (true, 1), + (false, 1), + (true, 1), + (true, 1), + ], + array_offsets: vec![0, 1, 2, 3, 4, 5], + array_mask: vec![false, true, false, true, true], + max_definition: 0, + is_list: false, + is_nullable: true, + }; + let array_offsets = vec![0, 2, 2, 4, 8, 11]; + let array_mask = vec![true, false, true, true, true]; + + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + true, + true, + 1, + ); + let expected_levels = LevelInfo { + // 0 1 [2] are 0 (not defined at level 1) + // [2] is 1, but has 0 slots so is not populated (defined at level 1 only) + // 2 3 [4] are 0 + // 4 5 6 7 [8] are 1 (defined at level 1 only) + // 8 9 10 [11] are 2 (defined at both levels) + definition: vec![0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1], + repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), + definition_mask: vec![ + (true, 1), + (true, 1), + (false, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + ], + array_offsets, + array_mask, + max_definition: 1, + is_nullable: true, + is_list: true, + }; + assert_eq!(&levels, &expected_levels); + + // nested lists (using previous test) + let _nested_parent_levels = levels; + let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]; + let array_mask = vec![ + true, true, true, true, true, true, true, true, true, true, true, + ]; + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + 
array_mask.clone(), + true, + true, + 2, + ); + let expected_levels = LevelInfo { + // (def: 0) 0 1 [2] are 0 (take parent) + // (def: 0) 2 3 [4] are 0 (take parent) + // (def: 0) 4 5 [6] are 0 (take parent) + // (def: 0) 6 7 [8] are 0 (take parent) + // (def: 1) 8 9 [10] are 1 (take parent) + // (def: 1) 10 11 [12] are 1 (take parent) + // (def: 1) 12 23 [14] are 1 (take parent) + // (def: 1) 14 15 [16] are 1 (take parent) + // (def: 2) 16 17 [18] are 2 (defined at all levels) + // (def: 2) 18 19 [20] are 2 (defined at all levels) + // (def: 2) 20 21 [22] are 2 (defined at all levels) + definition: vec![ + 0, 0, 0, 0, 0i16, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + ], + // TODO: this doesn't feel right, needs some validation + repetition: Some(vec![ + 0, 0, 0, 0, 0i16, 0, 0, 0, 0, 0, 3, 1, 3, 1, 3, 1, 3, 0, 3, 1, 3, 1, 3, + ]), + definition_mask: vec![ + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + (false, 0), + ], + array_offsets, + array_mask, + max_definition: 3, + is_nullable: true, + is_list: true, + }; + assert_eq!(levels, expected_levels); + } + + #[test] + #[ignore] + fn test_calculate_array_levels_nested_list() { + // if all array values are defined (e.g. 
batch>) + let parent_levels = LevelInfo { + definition: vec![0, 0, 0, 0], + repetition: None, + definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1)], + array_offsets: vec![0, 1, 2, 3, 4], + array_mask: vec![true, true, true, true], + max_definition: 0, + is_list: false, + is_nullable: false, + }; + let array_offsets = vec![0, 0, 3, 5, 7]; + let array_mask = vec![false, true, true, true]; + + let levels = parent_levels.calculate_child_levels( + array_offsets.clone(), + array_mask.clone(), + true, + false, + 1, + ); + let expected_levels = LevelInfo { + definition: vec![0, 1, 1, 1, 1, 1, 1, 1], + repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), + definition_mask: vec![ + (false, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + (true, 1), + ], + array_offsets, + array_mask, + max_definition: 1, + is_list: true, + is_nullable: false, + }; + assert_eq!(levels, expected_levels); + + // nested lists (using previous test) + let _nested_parent_levels = levels; + let array_offsets = vec![0, 1, 3, 3, 6, 10, 10, 15]; + let array_mask = vec![true, true, false, true, true, false, true]; + let levels = parent_levels.calculate_child_levels( + array_offsets, + array_mask, + true, + true, + 2, + ); + let expected_levels = LevelInfo { + definition: vec![0, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2], + repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), + definition_mask: vec![ + (false, 1), + (true, 2), + (true, 2), + (true, 2), + (false, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (false, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + (true, 2), + ], + array_mask: vec![true, true, false, true, true, false, true], + array_offsets: vec![0, 1, 3, 3, 6, 10, 10, 15], + is_list: true, + is_nullable: true, + max_definition: 2, + }; + assert_eq!(levels, expected_levels); + } + + #[test] + fn test_calculate_nested_struct_levels() { + // tests a > + 
// array: + // - {a: {b: {c: 1}}} + // - {a: {b: {c: null}}} + // - {a: {b: {c: 3}}} + // - {a: {b: null}} + // - {a: null}} + // - {a: {b: {c: 6}}} + let a_levels = LevelInfo { + definition: vec![1, 1, 1, 1, 0, 1], + repetition: None, + // should all be true if we haven't encountered a list + definition_mask: vec![(true, 1); 6], + array_offsets: (0..=6).collect(), + array_mask: vec![true, true, true, true, false, true], + max_definition: 1, + is_list: false, + is_nullable: true, + }; + // b's offset and mask + let b_offsets: Vec = (0..=6).collect(); + let b_mask = vec![true, true, true, false, false, true]; + // b's expected levels + let b_expected_levels = LevelInfo { + definition: vec![2, 2, 2, 1, 0, 2], + repetition: None, + definition_mask: vec![(true, 2); 6], + array_offsets: (0..=6).collect(), + array_mask: vec![true, true, true, false, false, true], + max_definition: 2, + is_list: false, + is_nullable: true, + }; + let b_levels = + a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, 2); + assert_eq!(&b_expected_levels, &b_levels); + + // c's offset and mask + let c_offsets = b_offsets; + let c_mask = vec![true, false, true, false, false, true]; + // c's expected levels + let c_expected_levels = LevelInfo { + definition: vec![3, 2, 3, 1, 0, 3], + repetition: None, + definition_mask: vec![(true, 3); 6], + array_offsets: c_offsets.clone(), + array_mask: vec![true, false, true, false, false, true], + max_definition: 3, + is_list: false, + is_nullable: true, + }; + let c_levels = b_levels.calculate_child_levels(c_offsets, c_mask, false, true, 3); + assert_eq!(&c_expected_levels, &c_levels); + } +} diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index e64a3d75d82..9095259163f 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -53,6 +53,7 @@ pub(in crate::arrow) mod array_reader; pub mod arrow_reader; pub mod arrow_writer; pub(in crate::arrow) mod converter; +pub mod levels; pub(in 
crate::arrow) mod record_reader; pub mod schema; diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index 9e1188ff8fb..c36bf946250 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -317,7 +317,7 @@ impl ColumnWriterImpl { } if let Some(nulls) = null_count { - self.num_column_nulls += nulls; + self.num_column_nulls += nulls; // TODO: null count doesn't seem to be computed } let calculate_page_stats = (min.is_none() || max.is_none()) diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index 5ccd1636b7b..a19bf108e99 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -332,6 +332,7 @@ impl BitWriter { #[inline] pub fn put_value(&mut self, v: u64, num_bits: usize) -> bool { assert!(num_bits <= 64); + // TODO:why does this cause crashes in tests? assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64 if self.byte_offset * 8 + self.bit_offset + num_bits > self.max_bytes as usize * 8 From 2231ef0b90d3b16964d254e8ee2ece5a065be3ee Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 28 Nov 2020 14:20:07 +0200 Subject: [PATCH 2/6] simplify dictionary writes --- rust/parquet/src/arrow/arrow_writer.rs | 157 ++----------------------- rust/parquet/src/column/writer.rs | 2 +- rust/parquet/src/util/bit_util.rs | 1 - 3 files changed, 10 insertions(+), 150 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index ae3b70420fa..20206219752 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -27,7 +27,7 @@ use arrow_array::Array; use super::levels::LevelInfo; use super::schema::add_encoded_arrow_schema_to_metadata; -use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; +use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; use 
crate::{ @@ -195,96 +195,17 @@ fn write_leaves( } Ok(()) } - ArrowDataType::Dictionary(key_type, value_type) => { - use arrow_array::{PrimitiveArray, StringArray}; - use ArrowDataType::*; - use ColumnWriter::*; + ArrowDataType::Dictionary(_, value_type) => { + // cast dictionary to a primitive + let array = arrow::compute::cast(array, value_type)?; - let array = &**array; let mut col_writer = get_col_writer(&mut row_group_writer)?; - let levels = levels.pop().expect("Levels exhausted"); - - macro_rules! dispatch_dictionary { - ($($kt: pat, $vt: pat, $w: ident => $kat: ty, $vat: ty,)*) => ( - match (&**key_type, &**value_type, &mut col_writer) { - $(($kt, $vt, $w(writer)) => write_dict::<$kat, $vat, _>(array, writer, levels),)* - (kt, vt, _) => unreachable!("Shouldn't be attempting to write dictionary of <{:?}, {:?}>", kt, vt), - } - ); - } - - if let (UInt8, UInt32, Int32ColumnWriter(writer)) = - (&**key_type, &**value_type, &mut col_writer) - { - let typed_array = array - .as_any() - .downcast_ref::() - .expect("Unable to get dictionary array"); - - let keys = typed_array.keys(); - - let value_buffer = typed_array.values(); - let value_array = - arrow::compute::cast(&value_buffer, &ArrowDataType::Int32)?; - - let values = value_array - .as_any() - .downcast_ref::() - .unwrap(); - - use std::convert::TryFrom; - // This removes NULL values from the keys, but - // they're encoded by the levels, so that's fine. - - // nevi-me: if we materialize values by iterating on the array, can't we instead 'just' cast to the values? 
- // in the failing dictionary test, the materialized values here are incorrect (missing 22345678) - let materialized_values: Vec<_> = keys - .into_iter() - .flatten() - .map(|key| { - usize::try_from(key) - .unwrap_or_else(|k| panic!("key {} does not fit in usize", k)) - }) - .map(|key| values.value(key)) - .collect(); - - let materialized_primitive_array = - PrimitiveArray::::from( - materialized_values, - ); - - // I added this because we need to consider dictionaries in structs correctly, - // I don't think it's the cause for the failing test though, as the materialized_p_arr - // in the test is incorrect when it gets here (missing 22345678 value) - let indices = filter_array_indices(&levels); - let values = get_numeric_array_slice::( - &materialized_primitive_array, - &indices, - ); - - writer.write_batch( - values.as_slice(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), - )?; - row_group_writer.close_column(col_writer)?; - - return Ok(()); - } - - dispatch_dictionary!( - Int8, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int8Type, StringArray, - Int16, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int16Type, StringArray, - Int32, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int32Type, StringArray, - Int64, Utf8, ByteArrayColumnWriter => arrow::datatypes::Int64Type, StringArray, - UInt8, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt8Type, StringArray, - UInt16, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt16Type, StringArray, - UInt32, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt32Type, StringArray, - UInt64, Utf8, ByteArrayColumnWriter => arrow::datatypes::UInt64Type, StringArray, + write_leaf( + &mut col_writer, + &array, + levels.pop().expect("Levels exhausted"), )?; - row_group_writer.close_column(col_writer)?; - Ok(()) } ArrowDataType::Float16 => Err(ParquetError::ArrowError( @@ -299,67 +220,6 @@ fn write_leaves( } } -trait Materialize { - type Output; - - // Materialize the packed dictionary. 
The writer will later repack it. - fn materialize(&self) -> Vec; -} - -impl Materialize for dyn Array -where - K: arrow::datatypes::ArrowDictionaryKeyType, -{ - type Output = ByteArray; - - fn materialize(&self) -> Vec { - use arrow::datatypes::ArrowNativeType; - - let typed_array = self - .as_any() - .downcast_ref::>() - .expect("Unable to get dictionary array"); - - let keys = typed_array.keys(); - - let value_buffer = typed_array.values(); - let values = value_buffer - .as_any() - .downcast_ref::() - .unwrap(); - - // This removes NULL values from the keys, but - // they're encoded by the levels, so that's fine. - keys.into_iter() - .flatten() - .map(|key| { - key.to_usize() - .unwrap_or_else(|| panic!("key {:?} does not fit in usize", key)) - }) - .map(|key| values.value(key)) - .map(ByteArray::from) - .collect() - } -} - -fn write_dict( - array: &(dyn Array + 'static), - writer: &mut ColumnWriterImpl, - levels: LevelInfo, -) -> Result<()> -where - T: DataType, - dyn Array: Materialize, -{ - writer.write_batch( - &array.materialize(), - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), - )?; - - Ok(()) -} - fn write_leaf( writer: &mut ColumnWriter, column: &arrow_array::ArrayRef, @@ -1192,6 +1052,7 @@ mod tests { } #[test] + #[ignore = "waiting on inheritance of nested structs, ARROW-10684"] fn arrow_writer_2_level_struct_non_null() { // tests writing > let field_c = Field::new("c", DataType::Int32, false); diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index c36bf946250..9e1188ff8fb 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -317,7 +317,7 @@ impl ColumnWriterImpl { } if let Some(nulls) = null_count { - self.num_column_nulls += nulls; // TODO: null count doesn't seem to be computed + self.num_column_nulls += nulls; } let calculate_page_stats = (min.is_none() || max.is_none()) diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs 
index a19bf108e99..5ccd1636b7b 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -332,7 +332,6 @@ impl BitWriter { #[inline] pub fn put_value(&mut self, v: u64, num_bits: usize) -> bool { assert!(num_bits <= 64); - // TODO:why does this cause crashes in tests? assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64 if self.byte_offset * 8 + self.bit_offset + num_bits > self.max_bytes as usize * 8 From 340079c1786f63fffcc6d0422f55a1b765ceedfd Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 28 Nov 2020 22:46:40 +0200 Subject: [PATCH 3/6] move things around strip out list support, to be worked on separately --- rust/parquet/src/arrow/arrow_writer.rs | 414 ++--------- rust/parquet/src/arrow/levels.rs | 930 ++++++++----------------- 2 files changed, 325 insertions(+), 1019 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 20206219752..41c1acb2c41 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::array as arrow_array; -use arrow::datatypes::{DataType as ArrowDataType, Field, SchemaRef}; +use arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::Array; @@ -86,26 +86,15 @@ impl ArrowWriter { )); } // compute the definition and repetition levels of the batch - let num_rows = batch.num_rows(); let mut levels = vec![]; - let batch_level = LevelInfo { - definition: vec![1; num_rows], - repetition: None, - definition_mask: vec![(true, 1); num_rows], - array_offsets: (0..=(num_rows as i64)).collect(), - array_mask: vec![true; num_rows], - max_definition: 1, - is_list: false, - is_nullable: true, // setting as null treats non-null structs correctly - }; - // TODO: between `max_definition` and `level` below, one might have to be 0 + let batch_level = LevelInfo::new_from_batch(batch); batch 
.columns() .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { let mut array_levels = - calculate_array_levels(array, field, 1, &batch_level); + batch_level.calculate_array_levels(array, field, 1); levels.append(&mut array_levels); }); // reverse levels so we can use Vec::pop(&mut self) @@ -121,7 +110,7 @@ impl ArrowWriter { self.writer.close_row_group(row_group_writer) } - /// Close and finalise the underlying Parquet writer + /// Close and finalize the underlying Parquet writer pub fn close(&mut self) -> Result<()> { self.writer.close() } @@ -318,357 +307,6 @@ fn write_leaf( Ok(written as i64) } -/// Compute nested levels of the Arrow array, recursing into lists and structs -/// Returns a list of `LevelInfo`, where each level is for nested primitive arrays. -/// -/// The algorithm works by eagerly incrementing non-null values, and decrementing -/// when a value is null. -/// -/// *Examples:* -/// -/// A record batch always starts at a populated definition = level 1. -/// When a batch only has a primitive, i.e. `>, column `a` -/// can only have a maximum level of 1 if it is not null. -/// If it is null, we decrement by 1, such that the null slots will = level 0. -/// -/// If a batch has nested arrays (list, struct, union, etc.), then the incrementing -/// takes place. -/// A `>` will have up to 2 levels (if nullable). -/// When calculating levels for `a`, if the struct slot is not empty, we -/// increment by 1, such that we'd have `[2, 2, 2]` if all 3 slots are not null. -/// If there is an empty slot, we decrement, leaving us with `[2, 0, 2]` as the -/// null slot effectively means that no record is populated for the row altogether. -/// -/// *Lists* -/// -/// TODO -/// -/// *Non-nullable arrays* -/// -/// If an array is non-nullable, this is accounted for when converting the Arrow -/// schema to a Parquet schema. -/// When dealing with `>` there is no issue, as the meximum -/// level will always be = 1. 
-/// -/// When dealing with nested types, the logic becomes a bit complicate. -/// A non-nullable struct; `>>` will only -/// have 1 maximum level, where 0 means `b` is nul, and 1 means `b` is not null. -/// -/// We account for the above by checking if the `Field` is nullable, and adjusting -/// the [inc|dec]rement accordingly. -fn calculate_array_levels( - array: &arrow_array::ArrayRef, - field: &Field, - level: i16, - level_info: &LevelInfo, -) -> Vec { - match array.data_type() { - ArrowDataType::Null => vec![LevelInfo { - definition: level_info - .definition - .iter() - .map(|d| (d - 1).max(0)) - .collect(), - repetition: level_info.repetition.clone(), - definition_mask: level_info.definition_mask.clone(), - array_offsets: level_info.array_offsets.clone(), - array_mask: level_info.array_mask.clone(), - max_definition: level, - is_list: level_info.is_list, - is_nullable: true, // always nullable as all values are nulls - }], - ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float16 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32(_) - | ArrowDataType::Date64(_) - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) - | ArrowDataType::Binary - | ArrowDataType::LargeBinary => vec![LevelInfo { - definition: get_primitive_def_levels(array, field, &level_info.definition), - repetition: level_info.repetition.clone(), - definition_mask: level_info.definition_mask.clone(), - array_offsets: level_info.array_offsets.clone(), - array_mask: level_info.array_mask.clone(), - is_list: level_info.is_list, - max_definition: level, - is_nullable: field.is_nullable(), - }], - 
ArrowDataType::FixedSizeBinary(_) => unimplemented!(), - ArrowDataType::Decimal(_, _) => unimplemented!(), - ArrowDataType::List(list_field) | ArrowDataType::LargeList(list_field) => { - let array_data = array.data(); - let child_data = array_data.child_data().get(0).unwrap(); - // get offsets, accounting for large offsets if present - let offsets: Vec = { - if let ArrowDataType::LargeList(_) = array.data_type() { - unsafe { array_data.buffers()[0].typed_data::() }.to_vec() - } else { - let offsets = unsafe { array_data.buffers()[0].typed_data::() }; - offsets.to_vec().into_iter().map(|v| v as i64).collect() - } - }; - let child_array = arrow_array::make_array(child_data.clone()); - - let mut list_def_levels = Vec::with_capacity(child_array.len()); - let mut list_rep_levels = Vec::with_capacity(child_array.len()); - let rep_levels: Vec = level_info - .repetition - .clone() - .map(|l| l.to_vec()) - .unwrap_or_else(|| vec![0i16; level_info.definition.len()]); - level_info - .definition - .iter() - .zip(rep_levels) - .zip(offsets.windows(2)) - .for_each(|((parent_def_level, parent_rep_level), window)| { - if *parent_def_level == 0 { - // parent is null, list element must also be null - list_def_levels.push(0); - list_rep_levels.push(0); - } else { - // parent is not null, check if list is empty or null - let start = window[0]; - let end = window[1]; - let len = end - start; - if len == 0 { - list_def_levels.push(*parent_def_level - 1); - list_rep_levels.push(parent_rep_level); - } else { - list_def_levels.push(*parent_def_level); - list_rep_levels.push(parent_rep_level); - for _ in 1..len { - list_def_levels.push(*parent_def_level); - list_rep_levels.push(parent_rep_level + 1); - } - } - } - }); - - // if datatype is a primitive, we can construct levels of the child array - match child_array.data_type() { - // TODO: The behaviour of a > is untested - ArrowDataType::Null => vec![LevelInfo { - definition: list_def_levels, - repetition: Some(list_rep_levels), - 
definition_mask: level_info.definition_mask.clone(), // TODO: list mask - array_offsets: offsets, - array_mask: level_info.array_mask.clone(), // TODO: list mask - is_list: true, - is_nullable: list_field.is_nullable(), - max_definition: level + 1, // TODO: compute correctly - }], - ArrowDataType::Boolean - | ArrowDataType::Int8 - | ArrowDataType::Int16 - | ArrowDataType::Int32 - | ArrowDataType::Int64 - | ArrowDataType::UInt8 - | ArrowDataType::UInt16 - | ArrowDataType::UInt32 - | ArrowDataType::UInt64 - | ArrowDataType::Float16 - | ArrowDataType::Float32 - | ArrowDataType::Float64 - | ArrowDataType::Timestamp(_, _) - | ArrowDataType::Date32(_) - | ArrowDataType::Date64(_) - | ArrowDataType::Time32(_) - | ArrowDataType::Time64(_) - | ArrowDataType::Duration(_) - | ArrowDataType::Interval(_) => { - let def_levels = get_primitive_def_levels( - &child_array, - list_field, - &list_def_levels[..], - ); - vec![LevelInfo { - definition: def_levels, - repetition: Some(list_rep_levels), - array_mask: vec![], - array_offsets: vec![], - definition_mask: vec![], - is_list: true, - is_nullable: list_field.is_nullable(), - max_definition: level + 1, // TODO: update - }] - } - ArrowDataType::Binary - | ArrowDataType::Utf8 - | ArrowDataType::LargeUtf8 => unimplemented!(), - ArrowDataType::FixedSizeBinary(_) => unimplemented!(), - ArrowDataType::Decimal(_, _) => unimplemented!(), - ArrowDataType::LargeBinary => unimplemented!(), - ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { - // nested list - unimplemented!() - } - ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(_) => { - let struct_level_info = LevelInfo { - definition: list_def_levels, - repetition: Some(list_rep_levels), - definition_mask: vec![], - array_offsets: vec![], - array_mask: vec![], - max_definition: level + 1, - is_list: list_field.is_nullable(), - is_nullable: true, // // indicates a nesting level of 2 (list + struct) - }; - calculate_array_levels( - array, - 
list_field, - level + 1, // indicates a nesting level of 2 (list + struct) - &struct_level_info, - ) - } - ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => unimplemented!(), - } - } - ArrowDataType::FixedSizeList(_, _) => unimplemented!(), - ArrowDataType::Struct(struct_fields) => { - let struct_array: &arrow_array::StructArray = array - .as_any() - .downcast_ref::() - .expect("Unable to get struct array"); - let array_len = struct_array.len(); - let mut struct_def_levels = Vec::with_capacity(array_len); - let mut struct_mask = Vec::with_capacity(array_len); - // we can have a >, in which case we should check - // the parent struct in the child struct's offsets - for (i, def_level) in level_info.definition.iter().enumerate() { - if *def_level == level { - if !field.is_nullable() { - // push the level as is - struct_def_levels.push(level); - } else if struct_array.is_valid(i) { - // Increment to indicate that this value is not null - // The next level will decrement if it is null - // we can check if current value is null - struct_def_levels.push(level + 1); - } else { - // decrement to show that only the previous level is populated - // we only decrement if previous field is nullable - struct_def_levels.push(level - (level_info.is_nullable as i16)); - } - } else { - struct_def_levels.push(*def_level); - } - // TODO: is it more efficient to use `bitvec` here? - struct_mask.push(struct_array.is_valid(i)); - } - // trying to create levels for struct's fields - let mut struct_levels = vec![]; - let struct_level_info = LevelInfo { - definition: struct_def_levels, - // TODO: inherit the parent's repetition? (relevant for >) - repetition: level_info.repetition.clone(), - // Is it correct to increment this by 1 level? 
- definition_mask: level_info - .definition_mask - .iter() - .map(|(state, index)| (*state, index + 1)) - .collect(), - // logically, a struct should inherit its parent's offsets - array_offsets: level_info.array_offsets.clone(), - // this should be just the struct's mask, not its parent's - array_mask: struct_mask, - max_definition: level_info.max_definition + (field.is_nullable() as i16), - is_list: level_info.is_list, - is_nullable: field.is_nullable(), - }; - struct_array - .columns() - .into_iter() - .zip(struct_fields) - .for_each(|(col, struct_field)| { - let mut levels = calculate_array_levels( - col, - struct_field, - level + (field.is_nullable() as i16), - &struct_level_info, - ); - struct_levels.append(&mut levels); - }); - struct_levels - } - ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => { - // Need to check for these cases not implemented in C++: - // - "Writing DictionaryArray with nested dictionary type not yet supported" - // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" - vec![LevelInfo { - definition: get_primitive_def_levels( - array, - field, - &level_info.definition, - ), - repetition: level_info.repetition.clone(), - definition_mask: level_info.definition_mask.clone(), - array_offsets: level_info.array_offsets.clone(), - array_mask: level_info.array_mask.clone(), - is_list: level_info.is_list, - max_definition: level, - is_nullable: field.is_nullable(), - }] - } - } -} - -/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null -/// In the case where the array in question is a child of either a list or struct, the levels -/// are incremented in accordance with the `level` parameter. -/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null -/// -/// TODO: (a comment to remove, note to help me reduce the mental bookkeeping) -/// We want an array's levels to be additive here, i.e. 
if we have an array that -/// comes from >, we should consume &[0; array.len()], so that -/// we add values to it, instead of subtract values -/// -/// An alternaitve is to pass the max level, and use it to compute whether we -/// should increment (though this is likely tricker) -fn get_primitive_def_levels( - array: &arrow_array::ArrayRef, - field: &Field, - parent_def_levels: &[i16], -) -> Vec { - let mut array_index = 0; - let max_def_level = parent_def_levels.iter().max().unwrap(); - let mut primitive_def_levels = vec![]; - parent_def_levels.iter().for_each(|def_level| { - // TODO: if field is non-nullable, can its parent be nullable? Ideally shouldn't - // being non-null means that for a level > 1, then we should subtract 1? - if !field.is_nullable() && *max_def_level > 1 { - primitive_def_levels.push(*def_level - 1); - array_index += 1; - } else if def_level < max_def_level { - primitive_def_levels.push(*def_level); - array_index += 1; - } else { - primitive_def_levels.push(def_level - array.is_null(array_index) as i16); - array_index += 1; - } - }); - primitive_def_levels -} - macro_rules! 
def_get_binary_array_fn { ($name:ident, $ty:ty) => { fn $name(array: &$ty) -> Vec { @@ -835,7 +473,6 @@ mod tests { } #[test] - #[ignore = "repetitions might be incorrect, will be addressed as part of ARROW-9728"] fn arrow_writer_non_null() { // define schema let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); @@ -854,7 +491,7 @@ mod tests { } #[test] - #[ignore = "list support is incomplete"] + #[ignore = "ARROW-10766: list support is incomplete"] fn arrow_writer_list() { // define schema let schema = Schema::new(vec![Field::new( @@ -880,6 +517,7 @@ mod tests { .len(5) .add_buffer(a_value_offsets) .add_child_data(a_values.data()) + .null_bit_buffer(Buffer::from(vec![0b00011011])) .build(); let a = ListArray::from(a_list_data); @@ -953,7 +591,7 @@ mod tests { } #[test] - #[ignore = "list support is incomplete"] + #[ignore = "ARROW-10766: list support is incomplete"] fn arrow_writer_complex() { // define schema let struct_field_d = Field::new("d", DataType::Float64, true); @@ -1052,8 +690,38 @@ mod tests { } #[test] - #[ignore = "waiting on inheritance of nested structs, ARROW-10684"] fn arrow_writer_2_level_struct_non_null() { + // tests writing > + let field_c = Field::new("c", DataType::Int32, false); + let field_b = Field::new("b", DataType::Struct(vec![field_c]), false); + let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()]), false); + let schema = Schema::new(vec![field_a.clone()]); + + // create data + let c = Int32Array::from(vec![1,2,3,4,5,6]); + let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) + .len(6) + .add_child_data(c.data()) + .build(); + let b = StructArray::from(b_data); + let a_data = ArrayDataBuilder::new(field_a.data_type().clone()) + .len(6) + .add_child_data(b.data()) + .build(); + let a = StructArray::from(a_data); + + assert_eq!(a.null_count(), 0); + assert_eq!(a.column(0).null_count(), 0); + + // build a racord batch + let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(a)]).unwrap(); + + roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch); + } + + #[test] + #[ignore = "waiting on inheritance of nested structs, ARROW-10684"] + fn arrow_writer_2_level_struct_mixed_null() { // tests writing > let field_c = Field::new("c", DataType::Int32, false); let field_b = Field::new("b", DataType::Struct(vec![field_c]), true); @@ -1081,7 +749,7 @@ mod tests { // build a racord batch let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); - roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch); + roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch); } const SMALL_SIZE: usize = 100; @@ -1430,7 +1098,7 @@ mod tests { } #[test] - #[ignore = "list support is incomplete"] + #[ignore = "ARROW-10766: list support is incomplete"] fn list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = @@ -1455,7 +1123,7 @@ mod tests { } #[test] - #[ignore = "list support is incomplete"] + #[ignore = "ARROW-10766: list support is incomplete"] fn large_list_single_column() { let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); let a_value_offsets = diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs index 489db17b2ac..099ef9b83eb 100644 --- a/rust/parquet/src/arrow/levels.rs +++ b/rust/parquet/src/arrow/levels.rs @@ -15,678 +15,316 @@ // specific language governing permissions and limitations // under the License. -//! Contains the logic for computing definition and repetition levels - +//! Parquet definition and repetition levels +//! +//! Contains the algorithm for computing definition and repetition levels. +//! The algorithm works by tracking the slots of an array that should ultimately be populated when +//! writing to Parquet. +//! Parquet achieves nesting through definition levels and repetition levels [1]. +//! 
Definition levels specify how many optional fields in the path for the column are defined. +//! Repetition levels specify at what repeated field (list) in the path a column is defined. +//! +//! In a nested data structure such as `a.b.c`, one can see levels as defining whether a record is +//! defined at `a`, `a.b`, or `a.b.c`. Optional fields are nullable fields, thus if all 3 fields +//! are nullable, the maximum definition will be = 3. +//! +//! The algorithm in this module computes the necessary information to enable the writer to keep +//! track of which columns are at which levels, and to ultimately extract the correct values at +//! the correct slots from Arrow arrays. +//! +//! It works by walking a record batch's arrays, keeping track of what values are non-null, their +//! positions and computing what their levels are. +//! We use an eager approach that increments definition levels where incrementable, and decrements +//! if a value being checked is null. +//! +//! [1] https://github.com/apache/parquet-format#nested-encoding + +use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::datatypes::{DataType, Field}; +use arrow::record_batch::RecordBatch; + +/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet. +/// +/// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track +/// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo] +/// is created, and this is what is used to index into the array when writing data to Parquet.
+/// +/// Note: for convenience, the final primitive array's level info can omit some values below if +/// none of that array's parents were repetitive (i.e `is_list` is false) #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) struct LevelInfo { /// Array's definition levels pub definition: Vec, /// Array's optional repetition levels pub repetition: Option>, - /// Definition mask, to indicate null ListArray slots that should be skipped - pub definition_mask: Vec<(bool, i16)>, /// Array's offsets, 64-bit is used to accommodate large offset arrays pub array_offsets: Vec, /// Array's validity mask + /// + /// While this looks like `definition_mask`, they serve different purposes. + /// This mask is for the immediate array, while the `definition_mask` tracks + /// the cumulative effect of all masks from the root (batch) to the current array. pub array_mask: Vec, - /// The maximum definition at this level, 0 at the root (record batch) [TODO: the 0 might be inaccurate] + /// Definition mask, to indicate null ListArray slots that should be skipped + pub definition_mask: Vec<(bool, i16)>, + /// The maximum definition at this level, 1 at the record batch pub max_definition: i16, - /// Whether this array or any of its parents is a list + /// Whether this array or any of its parents is a list, in which case the + /// `definition_mask` would be used to index correctly into list children. 
pub is_list: bool, - /// Whether the array is nullable (affects definition levels) + /// Whether the current array is nullable (affects definition levels) pub is_nullable: bool, } impl LevelInfo { - fn calculate_child_levels( - &self, - array_offsets: Vec, - array_mask: Vec, - is_list: bool, - is_nullable: bool, - current_def_level: i16, - ) -> Self { - let mut definition = vec![]; - let mut repetition = vec![]; - let mut definition_mask = vec![]; - let has_repetition = self.is_list || is_list; - - // keep track of parent definition nulls seen through the definition_mask - let mut nulls_seen = 0; - - // push any initial array slots that are null - while !self.definition_mask[nulls_seen].0 - && self.definition_mask[nulls_seen].1 + 2 < current_def_level - { - definition_mask.push(self.definition_mask[nulls_seen]); - definition.push(self.definition[nulls_seen]); - repetition.push(0); // TODO is it always 0? - nulls_seen += 1; - println!("Definition length e: {}", definition.len()); - } - - // we use this index to determine if a repetition should be populated based - // on its definition at the index. It needs to be outside of the loop - let mut def_index = 0; - - self.array_offsets.windows(2).for_each(|w| { - // the parent's index allows us to iterate through its offsets and the child's - let from = w[0] as usize; - let to = w[1] as usize; - // dbg!((from, to)); - // if the parent slot is empty, fill it once to show the nullness - if from == to { - definition.push(self.max_definition - 1); - repetition.push(0); - definition_mask.push((false, self.max_definition - 1)); - println!("Definition length d: {}", definition.len()); + /// Create a new [LevelInfo] from a record batch. + /// + /// This is a convenience function to populate the starting point of the traversal. 
+ pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self { + let num_rows = batch.num_rows(); + Self { + // a batch is treated as all-defined + definition: vec![1; num_rows], + // a batch has no repetition as it is not a list + repetition: None, + // all values of a batch as deemed to be defined at level 1 + definition_mask: vec![(true, 1); num_rows], + // a batch has sequential offsets, should be num_rows + 1 + array_offsets: (0..=(num_rows as i64)).collect(), + // all values at a batch-level are non-null + array_mask: vec![true; num_rows], + max_definition: 1, + is_list: false, + // a batch is treated as nullable even though it has no nulls, + // this is required to compute nested type levels correctly + is_nullable: true, } + } - (from..to).for_each(|index| { - println!( - "Array level: {}, parent offset: {}", - current_def_level, index - ); - let parent_mask = &self.definition_mask[index + nulls_seen]; - // TODO: this might need to be < instead of ==, but we generate duplicates in that case - if !parent_mask.0 && parent_mask.1 == current_def_level { - println!("Parent mask c: {:?}", parent_mask); - nulls_seen += 1; - definition.push(self.max_definition); - repetition.push(1); - definition_mask.push(*parent_mask); - println!("Definition length c: {}", definition.len()); + /// Compute nested levels of the Arrow array, recursing into lists and structs. + /// + /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays. + /// + /// The algorithm works by eagerly incrementing non-null values, and decrementing + /// when a value is null. + /// + /// *Examples:* + /// + /// A record batch always starts at a populated definition = level 1. + /// When a batch only has a primitive, i.e. `>, column `a` + /// can only have a maximum level of 1 if it is not null. + /// If it is null, we decrement by 1, such that the null slots will = level 0. 
+ /// + /// If a batch has nested arrays (list, struct, union, etc.), then the incrementing + /// takes place. + /// A `>` will have up to 2 levels (if nullable). + /// When calculating levels for `a`, we start with level 1 from the batch, + /// then if the struct slot is not empty, we increment by 1, such that we'd have `[2, 2, 2]` + /// if all 3 slots are not null. + /// If there is an empty slot, we decrement, leaving us with `[2, 0 (1-1), 2]` as the + /// null slot effectively means that no record is populated for the row altogether. + /// + /// When we encounter `b` which is primitive, we check if the supplied definition levels + /// equal the maximum level (i.e. level = 2). If the level < 2, then the parent of the + /// primitive (`a`) is already null, and `b` is kept as null. + /// If the level == 2, then we check if `b`'s slot is null, decrementing if it is null. + /// Thus we could have a final definition as: `[2, 0, 1]` indicating that only the first + /// slot is populated for `a.b`, the second one is all null, and only `a` has a value on the last. + /// + /// If expressed as JSON, this would be: + /// + /// ```json + /// {"a": {"b": 1}} + /// {"a": null} + /// {"a": {"b": null}} + /// ``` + /// + /// *Lists* + /// + /// TODO + /// + /// *Non-nullable arrays* + /// + /// If an array is non-nullable, this is accounted for when converting the Arrow schema to a + /// Parquet schema. + /// When dealing with `>` there is no issue, as the maximum + /// level will always be = 1. + /// + /// When dealing with nested types, the logic becomes a bit complicated. + /// A non-nullable struct; `>>` will only + /// have 1 maximum level, where 0 means `b` is null, and 1 means `b` is not null. + /// + /// We account for the above by checking if the `Field` is nullable, and adjusting + /// the `level` variable to determine which level the next child should increment or + /// decrement from. 
+ pub(crate) fn calculate_array_levels( + &self, + array: &ArrayRef, + field: &Field, + level: i16, + ) -> Vec { + match array.data_type() { + DataType::Null => vec![Self { + definition: self.definition.iter().map(|d| (d - 1).max(0)).collect(), + repetition: self.repetition.clone(), + definition_mask: self.definition_mask.clone(), + array_offsets: self.array_offsets.clone(), + array_mask: self.array_mask.clone(), + max_definition: level, + is_list: self.is_list, + is_nullable: true, // always nullable as all values are nulls + }], + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Timestamp(_, _) + | DataType::Date32(_) + | DataType::Date64(_) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Binary + | DataType::LargeBinary => { + // we return a vector of 1 value to represent the primitive + // it is safe to inherit the parent level's repetition, but we have to calculate + // the child's own definition levels + vec![Self { + definition: self.get_primitive_def_levels(array, field), + // TODO: if we change this when working on lists, then update the above comment + repetition: self.repetition.clone(), + definition_mask: self.definition_mask.clone(), + array_offsets: self.array_offsets.clone(), + array_mask: self.array_mask.clone(), + is_list: self.is_list, + max_definition: level, + is_nullable: field.is_nullable(), + }] } - let mask = array_mask[index]; - let array_from = array_offsets[index]; - let array_to = array_offsets[index + 1]; - - let parent_def_level = &self.definition[index + nulls_seen]; - - // if array_len == 0, the child is null - let array_len = array_to - array_from; - - // compute the definition level - // what happens if array's len is 
0? - if array_len == 0 { - definition.push(self.max_definition); - repetition.push(0); // TODO: validate that this is 0 for deeply nested lists - definition_mask.push((false, current_def_level)); - println!("Definition length b: {}", definition.len()); + DataType::FixedSizeBinary(_) => unimplemented!(), + DataType::Decimal(_, _) => unimplemented!(), + DataType::List(_list_field) | DataType::LargeList(_list_field) => { + // TODO: ARROW-10766, it is better to not write lists at all until they are correct + todo!("List writing not yet implemented, see ARROW-10766") } - (array_from..array_to).for_each(|_| { - definition.push(if *parent_def_level == self.max_definition { - // TODO: haven't validated this in deeply-nested lists - self.max_definition + mask as i16 - } else { - *parent_def_level - }); - definition_mask.push((true, current_def_level)); - println!("Definition length a: {}", definition.len()); - }); - - // 11-11-2020 (23:57GMT) - // we are pushing defined repetitions even if a definition is < max - // I had initially separated the repetition logic here so that I - // don't perform a `has_repetition` check on each loop. - // The downside's that I now need to index into `definitions` so I - // can check if a value is defined or not. - - if has_repetition && array_len > 0 { - // compute the repetition level - - // dbg!(&definition); - // dbg!(current_def_level, parent_level.max_definition); - // dbg!(&parent_level.repetition); - match &self.repetition { - Some(rep) => { - let parent_rep = rep[index]; - // TODO(11/11/2020) need correct variable to mask repetitions correctly - if definition[def_index] == current_def_level { - repetition.push(parent_rep); - println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); - dbg!(&repetition); - def_index += 1; - (1..array_len).for_each(|_| { - println!("* Index {} definition is {}, and repetition is {}. 
Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); - repetition.push(current_def_level); // was parent_rep + 1 - def_index += 1; - }); + DataType::FixedSizeList(_, _) => unimplemented!(), + DataType::Struct(struct_fields) => { + let struct_array: &StructArray = array + .as_any() + .downcast_ref::() + .expect("Unable to get struct array"); + let array_len = struct_array.len(); + let mut struct_def_levels = Vec::with_capacity(array_len); + let mut struct_mask = Vec::with_capacity(array_len); + // we can have a >, in which case we should check + // the parent struct in the child struct's offsets + for (i, def_level) in self.definition.iter().enumerate() { + if *def_level == level { + if !field.is_nullable() { + // if the field is non-nullable and current definition = parent, + // then we should neither increment nor decrement the level + struct_def_levels.push(level); + } else if struct_array.is_valid(i) { + // Increment to indicate that this value is not null + // The next level will decrement if it is null + struct_def_levels.push(level + 1); } else { - (0..array_len).for_each(|_| { - println!("* Index {} definition is {}, and repetition is {}. Current def: {}", def_index, definition[def_index], parent_rep, current_def_level); - repetition.push(0); // TODO: should it be anything else? - // TODO: use an append instead of pushes - def_index += 1; - }); + // decrement to show that only the previous level is populated + // we only decrement if previous field is nullable because if it + // was not nullable, we can't decrement beyond its level + struct_def_levels.push(level - (self.is_nullable as i16)); } + } else { + // this means that the previous level's slot was null, so we preserve it + struct_def_levels.push(*def_level); } - None => { - println!("+ Index {} definition is {}, and repetition is 0. 
Current def: {}", def_index, definition[def_index], current_def_level); - // if definition[def_index] == current_def_level { - repetition.push(0); - def_index += 1; - (1..array_len).for_each(|_| { - repetition.push(1); // TODO: is it always 0 and 1? - def_index += 1; - }); - // } else { - // (0..array_len).for_each(|_| { - // repetition.push(0); // TODO: should it be anything else? - // // TODO: use an append instead of pushes - // def_index += 1; - // }); - // } - } + // TODO: is it more efficient to use `bitvec` here? + struct_mask.push(struct_array.is_valid(i)); } + // create levels for struct's fields, we accumulate them in this vec + let mut struct_levels = vec![]; + let struct_level_info = Self { + definition: struct_def_levels, + // inherit the parent's repetition + repetition: self.repetition.clone(), + // Is it correct to increment this by 1 level? + definition_mask: self + .definition_mask + .iter() + .map(|(state, index)| (*state, index + 1)) + .collect(), + // logically, a struct should inherit its parent's offsets + array_offsets: self.array_offsets.clone(), + // this should be just the struct's mask, not its parent's + array_mask: struct_mask, + max_definition: self.max_definition + (field.is_nullable() as i16), + is_list: self.is_list, + is_nullable: field.is_nullable(), + }; + struct_array + .columns() + .into_iter() + .zip(struct_fields) + .for_each(|(col, struct_field)| { + let mut levels = struct_level_info.calculate_array_levels( + col, + struct_field, + level + (field.is_nullable() as i16), + ); + struct_levels.append(&mut levels); + }); + struct_levels } - }); - }); - - let lev = LevelInfo { - definition, - repetition: if !has_repetition { - None - } else { - Some(repetition) - }, - definition_mask, - array_mask, - array_offsets, - is_list: has_repetition, - max_definition: current_def_level, - is_nullable, - }; - - println!("done"); - - lev - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn 
test_calculate_array_levels_twitter_example() { - // based on the example at https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html - // [[a, b, c], [d, e, f, g]], [[h], [i,j]] - let parent_levels = LevelInfo { - definition: vec![0, 0], - repetition: None, - definition_mask: vec![(true, 1), (true, 1)], - array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential - array_mask: vec![true, true], // both lists defined - max_definition: 0, // at the root, set to 0 - is_list: false, // root is never list - is_nullable: false, // root in example is non-nullable - }; - // offset into array, each level1 has 2 values - let array_offsets = vec![0, 2, 4]; - let array_mask = vec![true, true]; - - // calculate level1 levels - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - false, - 1, - ); - // - let expected_levels = LevelInfo { - definition: vec![1, 1, 1, 1], - repetition: Some(vec![0, 1, 0, 1]), - definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1)], - array_offsets, - array_mask, - max_definition: 1, - is_list: true, - is_nullable: false, - }; - assert_eq!(levels, expected_levels); - - // level2 - let parent_levels = levels; - let array_offsets = vec![0, 3, 7, 8, 10]; - let array_mask = vec![true, true, true, true]; - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - false, - 2, - ); - let expected_levels = LevelInfo { - definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2], - repetition: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]), - definition_mask: vec![ - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - ], - array_offsets, - array_mask, - max_definition: 2, - is_list: true, - is_nullable: false, - }; - assert_eq!(&levels, &expected_levels); - } - - #[test] - fn test_calculate_one_level_1() { - // This test calculates the 
levels for a non-null primitive array - let parent_levels = LevelInfo { - definition: vec![1; 10], - repetition: None, - definition_mask: vec![(true, 1); 10], - array_offsets: (0..=10).collect(), - array_mask: vec![true; 10], - max_definition: 0, - is_list: false, - is_nullable: false, - }; - let array_offsets: Vec = (0..=10).collect(); - let array_mask = vec![true; 10]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - false, - false, - 1, - ); - let expected_levels = LevelInfo { - definition: vec![1; 10], - repetition: None, - definition_mask: vec![(true, 1); 10], - array_offsets, - array_mask, - max_definition: 1, - is_list: false, - is_nullable: false, - }; - assert_eq!(&levels, &expected_levels); - } - - #[test] - #[ignore] - fn test_calculate_one_level_2() { - // This test calculates the levels for a non-null primitive array - let parent_levels = LevelInfo { - definition: vec![1; 5], - repetition: None, - definition_mask: vec![ - (true, 1), - (false, 1), - (true, 1), - (true, 1), - (false, 1), - ], - array_offsets: (0..=5).collect(), - array_mask: vec![true, false, true, true, false], - max_definition: 0, - is_list: false, - is_nullable: true, - }; - let array_offsets: Vec = (0..=5).collect(); - let array_mask = vec![true, false, true, true, false]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - false, - false, - 1, - ); - let expected_levels = LevelInfo { - definition: vec![1; 5], - repetition: None, - definition_mask: vec![(true, 1); 5], - array_offsets, - array_mask, - max_definition: 1, - is_list: false, - is_nullable: false, - }; - assert_eq!(&levels, &expected_levels); - } - - #[test] - fn test_calculate_array_levels_1() { - // if all array values are defined (e.g. 
batch>) - // [[0], [1], [2], [3], [4]] - let parent_levels = LevelInfo { - definition: vec![0, 0, 0, 0, 0], - repetition: None, - definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1), (true, 1)], - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![true, true, true, true, true], - max_definition: 0, - is_list: false, - is_nullable: false, - }; - let array_offsets = vec![0, 2, 2, 4, 8, 11]; - let array_mask = vec![true, false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - false, - 1, - ); - // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]] - // all values are defined as we do not have nulls on the root (batch) - // repetition: - // 0: 0, 1 - // 1: - // 2: 0, 1 - // 3: 0, 1, 1, 1 - // 4: 0, 1, 1 - let expected_levels = LevelInfo { - definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1], - repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), - definition_mask: vec![ - (true, 1), - (true, 1), - (false, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - ], - array_offsets, - array_mask, - max_definition: 1, - is_list: true, - is_nullable: false, - }; - assert_eq!(levels, expected_levels); - } - - #[test] - #[ignore] - fn test_calculate_array_levels_2() { - // if some values are null - let parent_levels = LevelInfo { - definition: vec![0, 1, 0, 1, 1], - repetition: None, - definition_mask: vec![ - (false, 1), - (true, 1), - (false, 1), - (true, 1), - (true, 1), - ], - array_offsets: vec![0, 1, 2, 3, 4, 5], - array_mask: vec![false, true, false, true, true], - max_definition: 0, - is_list: false, - is_nullable: true, - }; - let array_offsets = vec![0, 2, 2, 4, 8, 11]; - let array_mask = vec![true, false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - true, - 1, - ); - let expected_levels = LevelInfo { - 
// 0 1 [2] are 0 (not defined at level 1) - // [2] is 1, but has 0 slots so is not populated (defined at level 1 only) - // 2 3 [4] are 0 - // 4 5 6 7 [8] are 1 (defined at level 1 only) - // 8 9 10 [11] are 2 (defined at both levels) - definition: vec![0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1], - repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), - definition_mask: vec![ - (true, 1), - (true, 1), - (false, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - ], - array_offsets, - array_mask, - max_definition: 1, - is_nullable: true, - is_list: true, - }; - assert_eq!(&levels, &expected_levels); - - // nested lists (using previous test) - let _nested_parent_levels = levels; - let array_offsets = vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]; - let array_mask = vec![ - true, true, true, true, true, true, true, true, true, true, true, - ]; - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - true, - 2, - ); - let expected_levels = LevelInfo { - // (def: 0) 0 1 [2] are 0 (take parent) - // (def: 0) 2 3 [4] are 0 (take parent) - // (def: 0) 4 5 [6] are 0 (take parent) - // (def: 0) 6 7 [8] are 0 (take parent) - // (def: 1) 8 9 [10] are 1 (take parent) - // (def: 1) 10 11 [12] are 1 (take parent) - // (def: 1) 12 23 [14] are 1 (take parent) - // (def: 1) 14 15 [16] are 1 (take parent) - // (def: 2) 16 17 [18] are 2 (defined at all levels) - // (def: 2) 18 19 [20] are 2 (defined at all levels) - // (def: 2) 20 21 [22] are 2 (defined at all levels) - definition: vec![ - 0, 0, 0, 0, 0i16, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, - ], - // TODO: this doesn't feel right, needs some validation - repetition: Some(vec![ - 0, 0, 0, 0, 0i16, 0, 0, 0, 0, 0, 3, 1, 3, 1, 3, 1, 3, 0, 3, 1, 3, 1, 3, - ]), - definition_mask: vec![ - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - 
(false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - (false, 0), - ], - array_offsets, - array_mask, - max_definition: 3, - is_nullable: true, - is_list: true, - }; - assert_eq!(levels, expected_levels); - } - - #[test] - #[ignore] - fn test_calculate_array_levels_nested_list() { - // if all array values are defined (e.g. batch>) - let parent_levels = LevelInfo { - definition: vec![0, 0, 0, 0], - repetition: None, - definition_mask: vec![(true, 1), (true, 1), (true, 1), (true, 1)], - array_offsets: vec![0, 1, 2, 3, 4], - array_mask: vec![true, true, true, true], - max_definition: 0, - is_list: false, - is_nullable: false, - }; - let array_offsets = vec![0, 0, 3, 5, 7]; - let array_mask = vec![false, true, true, true]; - - let levels = parent_levels.calculate_child_levels( - array_offsets.clone(), - array_mask.clone(), - true, - false, - 1, - ); - let expected_levels = LevelInfo { - definition: vec![0, 1, 1, 1, 1, 1, 1, 1], - repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), - definition_mask: vec![ - (false, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - (true, 1), - ], - array_offsets, - array_mask, - max_definition: 1, - is_list: true, - is_nullable: false, - }; - assert_eq!(levels, expected_levels); - - // nested lists (using previous test) - let _nested_parent_levels = levels; - let array_offsets = vec![0, 1, 3, 3, 6, 10, 10, 15]; - let array_mask = vec![true, true, false, true, true, false, true]; - let levels = parent_levels.calculate_child_levels( - array_offsets, - array_mask, - true, - true, - 2, - ); - let expected_levels = LevelInfo { - definition: vec![0, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2], - repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), - definition_mask: vec![ - (false, 1), - (true, 2), - (true, 2), - (true, 2), - (false, 2), - 
(true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (false, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - (true, 2), - ], - array_mask: vec![true, true, false, true, true, false, true], - array_offsets: vec![0, 1, 3, 3, 6, 10, 10, 15], - is_list: true, - is_nullable: true, - max_definition: 2, - }; - assert_eq!(levels, expected_levels); + DataType::Union(_) => unimplemented!(), + DataType::Dictionary(_, _) => { + // Need to check for these cases not implemented in C++: + // - "Writing DictionaryArray with nested dictionary type not yet supported" + // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" + vec![Self { + definition: self.get_primitive_def_levels(array, field), + repetition: self.repetition.clone(), + definition_mask: self.definition_mask.clone(), + array_offsets: self.array_offsets.clone(), + array_mask: self.array_mask.clone(), + is_list: self.is_list, + max_definition: level, + is_nullable: field.is_nullable(), + }] + } + } } - #[test] - fn test_calculate_nested_struct_levels() { - // tests a > - // array: - // - {a: {b: {c: 1}}} - // - {a: {b: {c: null}}} - // - {a: {b: {c: 3}}} - // - {a: {b: null}} - // - {a: null}} - // - {a: {b: {c: 6}}} - let a_levels = LevelInfo { - definition: vec![1, 1, 1, 1, 0, 1], - repetition: None, - // should all be true if we haven't encountered a list - definition_mask: vec![(true, 1); 6], - array_offsets: (0..=6).collect(), - array_mask: vec![true, true, true, true, false, true], - max_definition: 1, - is_list: false, - is_nullable: true, - }; - // b's offset and mask - let b_offsets: Vec = (0..=6).collect(); - let b_mask = vec![true, true, true, false, false, true]; - // b's expected levels - let b_expected_levels = LevelInfo { - definition: vec![2, 2, 2, 1, 0, 2], - repetition: None, - definition_mask: vec![(true, 2); 6], - array_offsets: (0..=6).collect(), - array_mask: vec![true, true, true, false, false, true], - max_definition: 
2, - is_list: false, - is_nullable: true, - }; - let b_levels = - a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, 2); - assert_eq!(&b_expected_levels, &b_levels); - - // c's offset and mask - let c_offsets = b_offsets; - let c_mask = vec![true, false, true, false, false, true]; - // c's expected levels - let c_expected_levels = LevelInfo { - definition: vec![3, 2, 3, 1, 0, 3], - repetition: None, - definition_mask: vec![(true, 3); 6], - array_offsets: c_offsets.clone(), - array_mask: vec![true, false, true, false, false, true], - max_definition: 3, - is_list: false, - is_nullable: true, - }; - let c_levels = b_levels.calculate_child_levels(c_offsets, c_mask, false, true, 3); - assert_eq!(&c_expected_levels, &c_levels); + /// Get the definition levels of the numeric array, with level 0 being null and 1 being not null + /// In the case where the array in question is a child of either a list or struct, the levels + /// are incremented in accordance with the `level` parameter. + /// Parent levels are either 0 or 1, and are used to higher (correct terminology?) 
leaves as null + fn get_primitive_def_levels(&self, array: &ArrayRef, field: &Field) -> Vec { + let mut array_index = 0; + let max_def_level = self.definition.iter().max().unwrap(); + let mut primitive_def_levels = vec![]; + self.definition.iter().for_each(|def_level| { + if !field.is_nullable() && *max_def_level > 1 { + primitive_def_levels.push(*def_level - 1); + array_index += 1; + } else if def_level < max_def_level { + primitive_def_levels.push(*def_level); + array_index += 1; + } else { + primitive_def_levels.push(def_level - array.is_null(array_index) as i16); + array_index += 1; + } + }); + primitive_def_levels } } From 2afcf7671147cd108d7adc072f0a4a3a665c3a80 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 29 Nov 2020 14:45:10 +0200 Subject: [PATCH 4/6] nested struct tests now pass --- rust/parquet/src/arrow/arrow_writer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 41c1acb2c41..9275d7d35fa 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -720,7 +720,6 @@ mod tests { } #[test] - #[ignore = "waiting on inheritance of nested structs, ARROW-10684"] fn arrow_writer_2_level_struct_mixed_null() { // tests writing > let field_c = Field::new("c", DataType::Int32, false); From 16e09431ef84ddd1ccf24d2946c0c01cc1f9dcdb Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 29 Nov 2020 15:09:40 +0200 Subject: [PATCH 5/6] minor fixes to track max_definition correctly --- rust/parquet/src/arrow/arrow_writer.rs | 9 ++------- rust/parquet/src/arrow/levels.rs | 7 +++++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 9275d7d35fa..82e1758d9bb 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -361,11 +361,6 @@ fn get_bool_array_slice( /// Given a level's information, 
calculate the offsets required to index an array /// correctly. fn filter_array_indices(level: &LevelInfo) -> Vec { - // TODO: we don't quite get the def levels right all the time, so for now we recalculate it - // this has the downside that if no values are populated, the slicing will be wrong - - // TODO: we should reliably track this, to avoid finding the max value - let max_def = level.definition.iter().max().cloned().unwrap(); let mut filtered = vec![]; // remove slots that are false from definition_mask let mut index = 0; @@ -375,7 +370,7 @@ fn filter_array_indices(level: &LevelInfo) -> Vec { .zip(&level.definition_mask) .for_each(|(def, (mask, _))| { if *mask { - if *def == max_def { + if *def == level.max_definition { filtered.push(index); } index += 1; @@ -698,7 +693,7 @@ mod tests { let schema = Schema::new(vec![field_a.clone()]); // create data - let c = Int32Array::from(vec![1,2,3,4,5,6]); + let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) .len(6) .add_child_data(c.data()) diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs index 099ef9b83eb..1c178e3a0eb 100644 --- a/rust/parquet/src/arrow/levels.rs +++ b/rust/parquet/src/arrow/levels.rs @@ -170,7 +170,8 @@ impl LevelInfo { definition_mask: self.definition_mask.clone(), array_offsets: self.array_offsets.clone(), array_mask: self.array_mask.clone(), - max_definition: level, + // nulls will have all definitions being 0, so max value is reduced + max_definition: level - 1, is_list: self.is_list, is_nullable: true, // always nullable as all values are nulls }], @@ -208,7 +209,9 @@ impl LevelInfo { array_offsets: self.array_offsets.clone(), array_mask: self.array_mask.clone(), is_list: self.is_list, - max_definition: level, + // if the current value is non-null, but it's a child of another, we reduce + // the max definition to indicate that all its applicable values can be taken + max_definition: level - 
((!field.is_nullable() && level > 1) as i16), is_nullable: field.is_nullable(), }] } From 9d83df3157fc06bd49a0aad6f846e0ee9c79e49c Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 29 Nov 2020 15:30:55 +0200 Subject: [PATCH 6/6] remove stray TODO to force CI to run --- rust/parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 82e1758d9bb..ca78ffc957b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -223,7 +223,7 @@ fn write_leaf( .downcast_ref::() .expect("Unable to get int32 array"); // assigning values to make it easier to debug - let slice = get_numeric_array_slice::(&array, &indices); // TODO: this function is incomplete as it doesn't take into account the actual definition in slicing + let slice = get_numeric_array_slice::(&array, &indices); typed.write_batch( slice.as_slice(), Some(levels.definition.as_slice()),