From 08469ffd017d258a1501a2a5173830e04739eda0 Mon Sep 17 00:00:00 2001
From: Micah Kornfield <emkornfield@gmail.com>
Date: Fri, 18 Sep 2020 09:19:57 +0000
Subject: [PATCH 1/4] ARROW-9603: Don't rely on nested fields matching
 nullability of their parents

---
 .../parquet/arrow/arrow_reader_writer_test.cc |  50 ++-
 cpp/src/parquet/arrow/path_internal.cc        |   8 +
 cpp/src/parquet/arrow/path_internal.h         |   5 +
 cpp/src/parquet/arrow/writer.cc               |   7 +-
 cpp/src/parquet/column_writer.cc              | 299 +++++++++++-------
 cpp/src/parquet/column_writer.h               |   3 +-
 6 files changed, 251 insertions(+), 121 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 188bc8c178a..408ba8da02a 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -55,6 +55,7 @@
 #include "parquet/test_util.h"
 
 using arrow::Array;
+using arrow::ArrayData;
 using arrow::ArrayVisitor;
 using arrow::Buffer;
 using arrow::ChunkedArray;
@@ -686,7 +687,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
   ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
   ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
   ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(100, out->num_rows());
+  EXPECT_EQ(100, out->num_rows());
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0);
   ASSERT_EQ(1, chunked_array->num_chunks());
@@ -1085,8 +1086,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
   }
 
   std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
-  auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
-                                                       buffers, values->null_count());
+  auto arr_data = std::make_shared<ArrayData>(::arrow::int64(), values->length(), buffers,
+                                              values->null_count());
   std::shared_ptr<Array> expected_values = MakeArray(arr_data);
   ASSERT_NE(expected_values, NULLPTR);
@@ -2360,6 +2361,49 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) {
                        3);
 }
 
+TEST(ArrowReadWrite, DisagreeingValidityBitmap) {}
+
+TEST(ArrowReadWrite, NestedRequiredField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32(), /*nullable=*/false);
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 5, 7, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = std::make_shared<ArrayData>(
+      struct_field->type(), /*length=*/8,
+      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
+      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
+  CheckSimpleRoundtrip(
+      ::arrow::Table::Make(
+          ::arrow::schema({struct_field}),
+          {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
+      /*row_group_size=*/8);
+}
+
+TEST(ArrowReadWrite, NestedNullableField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32());
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, null, 4, 5, null, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = std::make_shared<ArrayData>(
+      struct_field->type(), /*length=*/8,
+      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
+      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
+  CheckSimpleRoundtrip(
+      ::arrow::Table::Make(
+          ::arrow::schema({struct_field}),
+          {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
+      /*row_group_size=*/8);
+}
+
 TEST(TestArrowReadWrite, CanonicalNestedRoundTrip) {
   auto doc_id = field("DocId", ::arrow::int64(), /*nullable=*/false);
   auto links = field(
diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc
index e2079b71f21..fbdbdb903fa 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -525,6 +525,7 @@ struct PathInfo {
   int16_t max_def_level = 0;
   int16_t max_rep_level = 0;
   bool has_dictionary = false;
+  bool leaf_is_nullable = false;
 };
 
 /// Contains logic for writing a single leaf node to parquet.
@@ -540,6 +541,7 @@ Status WritePath(ElementRange root_range, PathInfo* path_info,
   std::vector<ElementRange> stack(path_info->path.size());
   MultipathLevelBuilderResult builder_result;
   builder_result.leaf_array = path_info->primitive_array;
+  builder_result.leaf_is_nullable = path_info->leaf_is_nullable;
 
   if (path_info->max_def_level == 0) {
     // This case only occurs when there are no nullable or repeated
@@ -706,6 +708,7 @@ class PathBuilder {
   explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
 
   template <typename T>
   void AddTerminalInfo(const T& array) {
+    info_.leaf_is_nullable = nullable_in_parent_;
     if (nullable_in_parent_) {
       info_.max_def_level++;
     }
@@ -838,10 +841,13 @@ class PathBuilder {
 #undef NOT_IMPLEMENTED_VISIT
 
   std::vector<PathInfo>& paths() { return paths_; }
 
+  bool root_is_nullable() const { return root_is_nullable_; }
+
  private:
   PathInfo info_;
   std::vector<PathInfo> paths_;
   bool nullable_in_parent_;
+  bool root_is_nullable_;
 };
 
 Status PathBuilder::VisitInline(const Array& array) {
@@ -871,6 +877,8 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
                                      std::move(write_leaf_callback));
   }
 
+  bool Nested() const override { return !data_->child_data.empty(); }
+
  private:
   ElementRange root_range_;
   // Reference holder to ensure the data stays valid.
diff --git a/cpp/src/parquet/arrow/path_internal.h b/cpp/src/parquet/arrow/path_internal.h
index 17c486dc7b2..8c8a82238cb 100644
--- a/cpp/src/parquet/arrow/path_internal.h
+++ b/cpp/src/parquet/arrow/path_internal.h
@@ -91,6 +91,9 @@ struct MultipathLevelBuilderResult {
   /// This allows for the parquet writing to determine which values ultimately
   /// needs to be written.
   std::vector<ElementRange> post_list_visited_elements;
+
+  /// Whether the leaf array is nullable.
+  bool leaf_is_nullable;
 };
 
 /// \brief Logic for being able to write out nesting (rep/def level) data that is
@@ -146,6 +149,8 @@ class PARQUET_EXPORT MultipathLevelBuilder {
   /// \param[out] write_leaf_callback Callback to receive the result.
   virtual ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                                 CallbackFunction write_leaf_callback) = 0;
+
+  virtual bool Nested() const = 0;
 };
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index b922b21728c..fb5250c9f3f 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -134,15 +134,14 @@ class ArrowColumnWriterV2 {
           std::shared_ptr<Array> values_array =
               result.leaf_array->Slice(range.start, range.Size());
 
-          return column_writer->WriteArrow(result.def_levels, result.rep_levels,
-                                           result.def_rep_level_count, *values_array,
-                                           ctx);
+          PARQUET_CATCH_AND_RETURN(column_writer->WriteArrow(
+              result.def_levels, result.rep_levels, result.def_rep_level_count,
+              *values_array, ctx, level_builder->Nested(), result.leaf_is_nullable));
         }));
   }
   PARQUET_CATCH_NOT_OK(column_writer->Close());
-
   return Status::OK();
 }

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 26dd1845922..82072506f06 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -41,6 +41,7 @@
 #include "parquet/encoding.h"
 #include "parquet/encryption_internal.h"
 #include "parquet/internal_file_encryptor.h"
+#include "parquet/level_conversion.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
@@ -49,6 +50,8 @@
 #include "parquet/thrift_internal.h"
 #include "parquet/types.h"
 
+using arrow::Array;
+using arrow::ArrayData;
 using arrow::Datum;
 using arrow::Status;
 using arrow::BitUtil::BitWriter;
@@ -59,6 +62,23 @@ namespace parquet {
 
 namespace {
 
+internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
+  internal::LevelInfo level_info;
+  level_info.def_level = descr->max_definition_level();
+  level_info.rep_level = descr->max_repetition_level();
+
+  int16_t min_spaced_def_level = descr->max_definition_level();
+  const ::parquet::schema::Node* node = descr->schema_node().get();
+  while (node != nullptr && !node->is_repeated()) {
+    if (node->is_optional()) {
+      min_spaced_def_level--;
+    }
+    node = node->parent();
+  }
+  level_info.repeated_ancestor_def_level = min_spaced_def_level;
+  return level_info;
+}
+
 inline const int16_t* AddIfNotNull(const int16_t* base, int64_t offset) {
   if (base != nullptr) {
     return base + offset;
@@ -543,6 +563,7 @@ class ColumnWriterImpl {
                    Encoding::type encoding, const WriterProperties* properties)
       : metadata_(metadata),
         descr_(metadata->descr()),
+        level_info_(ComputeLevelInfo(metadata->descr())),
         pager_(std::move(pager)),
         has_dictionary_(use_dictionary),
         encoding_(encoding),
@@ -563,6 +584,7 @@ class ColumnWriterImpl {
         std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
     uncompressed_data_ =
         std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+
     if (pager_->has_compressor()) {
       compressor_temp_buffer_ =
          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
@@ -627,6 +649,9 @@ class ColumnWriterImpl {
   ColumnChunkMetaDataBuilder* metadata_;
   const ColumnDescriptor* descr_;
+  // scratch buffer if validity bits need to be recalculated.
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  const internal::LevelInfo level_info_;
 
   std::unique_ptr<PageWriter> pager_;
@@ -965,9 +990,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // of values, the chunking will ensure the AddDataPage() is called at a reasonable
   // pagesize limit
   int64_t value_offset = 0;
+
   auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t values_to_write = WriteLevels(batch_size, AddIfNotNull(def_levels, offset),
                                           AddIfNotNull(rep_levels, offset));
+
     // PARQUET-780
     if (values_to_write > 0) {
       DCHECK_NE(nullptr, values);
@@ -992,11 +1019,21 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
+    int64_t null_count;
+    MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                               &batch_num_values, &batch_num_spaced_values,
+                               &null_count);
+
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                      AddIfNotNull(rep_levels, offset), &batch_num_values,
-                      &batch_num_spaced_values);
-    WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
-                      valid_bits, valid_bits_offset + value_offset);
+                      AddIfNotNull(rep_levels, offset));
+    if (bits_buffer_ != nullptr) {
+      WriteValuesSpaced(values + value_offset, batch_num_values,
+                        batch_num_spaced_values, bits_buffer_->data(), /*offset=*/0);
+    } else {
+      WriteValuesSpaced(values + value_offset, batch_num_values,
+                        batch_num_spaced_values, valid_bits,
+                        valid_bits_offset + value_offset);
+    }
     CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
     value_offset += batch_num_spaced_values;
@@ -1009,11 +1046,29 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
+                    ArrowWriteContext* ctx, bool nested, bool array_nullable) override {
+    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
+    // Leaf nulls are canonical when there is only a single null element and it is at
+    // the leaf.
+    bool leaf_nulls_are_canonical =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) &&
+        array_nullable;
+    bool maybe_has_nulls = nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);
+    if (maybe_has_nulls) {
+      ARROW_ASSIGN_OR_RAISE(
+          bits_buffer_,
+          arrow::AllocateResizableBuffer(
+              BitUtil::BytesForBits(properties_->write_batch_size()), ctx->memory_pool));
+      bits_buffer_->ZeroPadding();
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+    }
+
     if (array.type()->id() == ::arrow::Type::DICTIONARY) {
-      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
+      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx,
+                                  maybe_has_nulls);
     } else {
-      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
+      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx,
+                             maybe_has_nulls);
     }
   }
@@ -1031,11 +1086,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // plain encoding is circumvented
   Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                               int64_t num_levels, const ::arrow::Array& array,
-                              ArrowWriteContext* context);
+                              ArrowWriteContext* context, bool maybe_has_nulls);
 
   Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                          int64_t num_levels, const ::arrow::Array& array,
-                         ArrowWriteContext* context);
+                         ArrowWriteContext* context, bool maybe_has_nulls);
 
   void WriteDictionaryPage() override {
     // We have to dynamic cast here because of TypedEncoder<T> as
@@ -1130,37 +1185,60 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     return values_to_write;
   }
 
+  void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size,
+                                  int64_t* out_values_to_write,
+                                  int64_t* out_spaced_values_to_write,
+                                  int64_t* null_count) {
+    if (bits_buffer_ == nullptr) {
+      if (!level_info_.HasNullableValues()) {
+        *out_values_to_write = batch_size;
+        *out_spaced_values_to_write = batch_size;
+        *null_count = 0;
+      } else {
+        for (int x = 0; x < batch_size; x++) {
+          *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0;
+          *out_spaced_values_to_write +=
+              def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
+        }
+        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+      }
+      return;
+    }
+    // Shrink-to-fit possibly causes another allocation, and would only be necessary
+    // on the last batch.
+    int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size);
+    if (new_bitmap_size != bits_buffer_->size()) {
+      PARQUET_THROW_NOT_OK(
+          bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
+      bits_buffer_->ZeroPadding();
+    }
+    internal::ValidityBitmapInputOutput io;
+    io.valid_bits = bits_buffer_->mutable_data();
+    io.values_read_upper_bound = batch_size;
+    internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);
+    *out_values_to_write = io.values_read - io.null_count;
+    *out_spaced_values_to_write = io.values_read;
+    *null_count = io.null_count;
+  }
+
+  std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array,
+                                          int64_t new_null_count) {
+    if (bits_buffer_ == nullptr) {
+      return array;
+    }
+    std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
+    buffers[0] = bits_buffer_;
+    DCHECK(array->num_fields() == 0);
+    return arrow::MakeArray(std::make_shared<ArrayData>(
+        array->type(), array->length(), std::move(buffers), new_null_count));
+  }
+
   void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
-                         const int16_t* rep_levels, int64_t* out_values_to_write,
-                         int64_t* out_spaced_values_to_write) {
-    int64_t values_to_write = 0;
-    int64_t spaced_values_to_write = 0;
+                         const int16_t* rep_levels) {
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
-      // Minimal definition level for which spaced values are written
-      int16_t min_spaced_def_level = descr_->max_definition_level();
-      const ::parquet::schema::Node* node = descr_->schema_node().get();
-      while (node != nullptr && !node->is_repeated()) {
-        if (node->is_optional()) {
-          min_spaced_def_level--;
-        }
-        node = node->parent();
-      }
-      for (int64_t i = 0; i < num_levels; ++i) {
-        if (def_levels[i] == descr_->max_definition_level()) {
-          ++values_to_write;
-        }
-        if (def_levels[i] >= min_spaced_def_level) {
-          ++spaced_values_to_write;
-        }
-      }
       WriteDefinitionLevels(num_levels, def_levels);
-    } else {
-      // Required field, write all values
-      values_to_write = num_levels;
-      spaced_values_to_write = num_levels;
     }
-
     // Not present for non-repeated fields
     if (descr_->max_repetition_level() > 0) {
       // A row could include more than one value
@@ -1170,15 +1248,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
         rows_written_++;
       }
     }
-
     WriteRepetitionLevels(num_levels, rep_levels);
   } else {
     // Each value is exactly one row
     rows_written_ += static_cast<int>(num_levels);
   }
-
-  *out_values_to_write = values_to_write;
-  *out_spaced_values_to_write = spaced_values_to_write;
 }
 
 void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
@@ -1234,7 +1308,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset) {
-    if (descr_->schema_node()->is_optional()) {
+    if (num_values != num_spaced_values) {
       dynamic_cast<ValueEncoderType*>(current_encoder_.get())
           ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
                       valid_bits_offset);
@@ -1251,11 +1325,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 };
 
 template <typename DType>
-Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
-                                                          const int16_t* rep_levels,
-                                                          int64_t num_levels,
-                                                          const ::arrow::Array& array,
-                                                          ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array,
+    ArrowWriteContext* ctx, bool maybe_has_nulls) {
   // If this is the first time writing a DictionaryArray, then there's
   // a few possible paths to take:
   //
@@ -1275,7 +1347,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_lev
     std::shared_ptr<::arrow::Array> dense_array;
     RETURN_NOT_OK(
         ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
-    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
+    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx,
+                           maybe_has_nulls);
   };
 
   if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
@@ -1298,10 +1371,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_lev
   auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
+    int64_t null_count = arrow::kUnknownNullCount;
+    // Bits is not null for nullable values. At this point in the code we can't determine
+    // if the leaf array has the same null values as any parents it might have had so we
+    // need to recompute it from def levels.
+    MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                               &batch_num_values, &batch_num_spaced_values, &null_count);
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                      AddIfNotNull(rep_levels, offset), &batch_num_values,
-                      &batch_num_spaced_values);
-    dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
+                      AddIfNotNull(rep_levels, offset));
+    std::shared_ptr<Array> writeable_indices =
+        indices->Slice(value_offset, batch_num_spaced_values);
+    writeable_indices = MaybeUpdateArray(writeable_indices, null_count);
+    dict_encoder->PutIndices(*writeable_indices);
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
     value_offset += batch_num_spaced_values;
   };
@@ -1352,21 +1433,19 @@ struct SerializeFunctor {
 template <typename ParquetType, typename ArrowType>
 Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
-                           ArrowWriteContext* ctx,
-                           TypedColumnWriter<ParquetType>* writer) {
+                           ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
+                           bool maybe_has_nulls) {
   using ParquetCType = typename ParquetType::c_type;
   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
   ParquetCType* buffer = nullptr;
   PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
 
-  bool no_nulls =
-      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
-
   SerializeFunctor<ParquetType, ArrowType> functor;
   RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
-
-  if (no_nulls) {
+  bool no_nulls =
+      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+  if (!maybe_has_nulls && no_nulls) {
     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1379,8 +1458,8 @@ Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
 template <typename ParquetType>
 Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
-                          ArrowWriteContext* ctx,
-                          TypedColumnWriter<ParquetType>* writer) {
+                          ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
+                          bool maybe_has_nulls) {
   using T = typename ParquetType::c_type;
   const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
   const T* values = nullptr;
@@ -1390,7 +1469,10 @@ Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
   } else {
     DCHECK_EQ(data.length(), 0);
   }
-  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+  bool no_nulls =
+      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+
+  if (!maybe_has_nulls && no_nulls) {
     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1403,12 +1485,12 @@ Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
 
 #define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)  \
   case ::arrow::Type::ArrowEnum:                                 \
     return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
-        array, num_levels, def_levels, rep_levels, ctx, this);
+        array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
 
 #define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
   case ::arrow::Type::ArrowEnum:                                                      \
     return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
-                                           ctx, this);
+                                           ctx, this, maybe_has_nulls);
 
 #define ARROW_UNSUPPORTED() \
   std::stringstream ss;     \
@@ -1430,16 +1512,14 @@ struct SerializeFunctor {
 };
 
 template <>
-Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
-                                                           const int16_t* rep_levels,
-                                                           int64_t num_levels,
-                                                           const ::arrow::Array& array,
-                                                           ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   if (array.type_id() != ::arrow::Type::BOOL) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
-      array, num_levels, def_levels, rep_levels, ctx, this);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -1473,11 +1553,9 @@ struct SerializeFunctor {
 };
 
 template <>
-Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::NA: {
      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
@@ -1599,14 +1677,15 @@ struct SerializeFunctor {
 
 Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
                        const int16_t* def_levels, const int16_t* rep_levels,
-                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
+                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer,
+                       bool maybe_has_nulls) {
   const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
     ArrowWriteContext temp_ctx = *ctx;
     temp_ctx.properties = properties;
     return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
-        values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
+        values, num_levels, def_levels, rep_levels, &temp_ctx, writer, maybe_has_nulls);
   };
 
   if (ctx->properties->coerce_timestamps_enabled()) {
     if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
       // No data conversion necessary
       return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
-                                           ctx, writer);
+                                           ctx, writer, maybe_has_nulls);
     } else {
       return WriteCoerce(ctx->properties);
     }
@@ -1639,19 +1718,18 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
   } else {
     // No data conversion necessary
     return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
-                                         writer);
+                                         writer, maybe_has_nulls);
   }
 }
 
 template <>
-Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::TIMESTAMP:
-      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
+      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this,
+                             maybe_has_nulls);
     WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
     WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
     WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
@@ -1662,56 +1740,48 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_leve
 }
 
 template <>
-Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   if (array.type_id() != ::arrow::Type::TIMESTAMP) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
-      array, num_levels, def_levels, rep_levels, ctx, this);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Floating point types
 
 template <>
-Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   if (array.type_id() != ::arrow::Type::FLOAT) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
-                                       this);
+                                       this, maybe_has_nulls);
 }
 
 template <>
-Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels,
-                                                          const int16_t* rep_levels,
-                                                          int64_t num_levels,
-                                                          const ::arrow::Array& array,
-                                                          ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   if (array.type_id() != ::arrow::Type::DOUBLE) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
-                                        this);
+                                        this, maybe_has_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Write Arrow to BYTE_ARRAY
 
 template <>
-Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels,
-                                                             const int16_t* rep_levels,
-                                                             int64_t num_levels,
-                                                             const ::arrow::Array& array,
-                                                             ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   if (array.type()->id() != ::arrow::Type::BINARY &&
       array.type()->id() != ::arrow::Type::STRING) {
     ARROW_UNSUPPORTED();
@@ -1721,11 +1791,16 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_
   auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
+    int64_t null_count = 0;
+
+    MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                               &batch_num_values, &batch_num_spaced_values, &null_count);
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                      AddIfNotNull(rep_levels, offset), &batch_num_values,
-                      &batch_num_spaced_values);
-    std::shared_ptr<::arrow::Array> data_slice =
+                      AddIfNotNull(rep_levels, offset));
+    std::shared_ptr<Array> data_slice =
         array.Slice(value_offset, batch_num_spaced_values);
+    data_slice = MaybeUpdateArray(data_slice, null_count);
+
     current_encoder_->Put(*data_slice);
     if (page_statistics_ != nullptr) {
       page_statistics_->Update(*data_slice);
@@ -1824,11 +1899,9 @@ struct SerializeFunctor
 
 template <>
-Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels,
-                                                        const int16_t* rep_levels,
-                                                        int64_t num_levels,
-                                                        const ::arrow::Array& array,
-                                                        ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
   switch (array.type()->id()) {
     WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
     WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 02f4725754e..f831ca51d4b 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -144,7 +144,8 @@ class PARQUET_EXPORT ColumnWriter {
   /// writer type
   virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                                      int64_t num_levels, const ::arrow::Array& array,
-                                     ArrowWriteContext* ctx) = 0;
+                                     ArrowWriteContext* ctx, bool nested_array,
+                                     bool array_is_nullable) = 0;
 };
 
 // API to write values to a single column. This is the main client facing API.
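[Editor's note: the sketch below is not part of the patch series. It restates, in standalone C++ with illustrative names, the level arithmetic that patch 1's MaybeCalculateValidityBits delegates to internal::DefLevelsToBitmap: a leaf slot exists only when the definition level reaches repeated_ancestor_def_level, and the slot holds a non-null value only when it reaches the maximum definition level.]

#include <cstdint>
#include <vector>

// Simplified model of def-level to validity-bitmap conversion (illustrative only).
struct ValidityResult {
  std::vector<uint8_t> valid_bits;  // least-significant-bit order, as in Arrow
  int64_t values_read = 0;          // "spaced" leaf slots: valid values plus nulls
  int64_t null_count = 0;
};

ValidityResult DefLevelsToValidity(const int16_t* def_levels, int64_t num_levels,
                                   int16_t max_def_level,
                                   int16_t repeated_ancestor_def_level) {
  ValidityResult out;
  out.valid_bits.assign(static_cast<size_t>((num_levels + 7) / 8), 0);
  for (int64_t i = 0; i < num_levels; ++i) {
    // A level below repeated_ancestor_def_level means an empty or null ancestor
    // already consumed this position; no slot exists at the leaf at all.
    if (def_levels[i] < repeated_ancestor_def_level) continue;
    if (def_levels[i] == max_def_level) {
      out.valid_bits[out.values_read / 8] |=
          static_cast<uint8_t>(1u << (out.values_read % 8));
    } else {
      ++out.null_count;  // slot exists but is null, possibly due to a null parent
    }
    ++out.values_read;
  }
  return out;
}

This is why the writer no longer trusts the leaf array's own bitmap: for a nested column, the definition levels, not the leaf buffer, are the source of truth for which slots are null.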
From 96d2ad507dcdd38e64f2250b5ad4ef0b32d79cf2 Mon Sep 17 00:00:00 2001
From: Micah Kornfield <emkornfield@gmail.com>
Date: Tue, 22 Sep 2020 04:32:37 +0000
Subject: [PATCH 2/4] address PR comments

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 34 +++-----
 cpp/src/parquet/arrow/path_internal.cc        |  5 +-
 cpp/src/parquet/arrow/path_internal.h         |  2 +-
 cpp/src/parquet/arrow/reader.cc               |  7 --
 cpp/src/parquet/arrow/writer.cc               |  4 +-
 cpp/src/parquet/column_writer.cc              | 77 ++++++++++---------
 cpp/src/parquet/exception.h                   |  7 ++
 7 files changed, 64 insertions(+), 72 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 408ba8da02a..7ce81651c00 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2361,8 +2361,6 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) {
                        3);
 }
 
-TEST(ArrowReadWrite, DisagreeingValidityBitmap) {}
-
 TEST(ArrowReadWrite, NestedRequiredField) {
   auto int_field = ::arrow::field("int_array", ::arrow::int32(), /*nullable=*/false);
   auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 5, 7, 8]");
@@ -2372,38 +2370,30 @@ TEST(ArrowReadWrite, NestedRequiredField) {
   ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
   validity_bitmap->mutable_data()[0] = 0xCC;
 
-  auto struct_data = std::make_shared<ArrayData>(
-      struct_field->type(), /*length=*/8,
-      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
-      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
-  CheckSimpleRoundtrip(
-      ::arrow::Table::Make(
-          ::arrow::schema({struct_field}),
-          {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
-      /*row_group_size=*/8);
+  auto struct_data = ArrayData::Make(struct_field->type(), /*length=*/8,
+                                     {validity_bitmap}, {int_array->data()});
+  CheckSimpleRoundtrip(::arrow::Table::Make(::arrow::schema({struct_field}),
+                                            {::arrow::MakeArray(struct_data)}),
+                       /*row_group_size=*/8);
 }
 
 TEST(ArrowReadWrite, NestedNullableField) {
   auto int_field = ::arrow::field("int_array", ::arrow::int32());
-  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, null, 4, 5, null, 8]");
+  auto int_array =
+      ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, null, 4, 5, null, 8]");
   auto struct_field =
       ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
   std::shared_ptr<Buffer> validity_bitmap;
   ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
   validity_bitmap->mutable_data()[0] = 0xCC;
 
-  auto struct_data = std::make_shared<ArrayData>(
-      struct_field->type(), /*length=*/8,
-      std::vector<std::shared_ptr<Buffer>>{validity_bitmap},
-      std::vector<std::shared_ptr<ArrayData>>{int_array->data()});
-  CheckSimpleRoundtrip(
-      ::arrow::Table::Make(
-          ::arrow::schema({struct_field}),
-          {std::make_shared<::arrow::ChunkedArray>(::arrow::MakeArray(struct_data))}),
-      /*row_group_size=*/8);
+  auto struct_data = ArrayData::Make(struct_field->type(), /*length=*/8,
+                                     {validity_bitmap}, {int_array->data()});
+  CheckSimpleRoundtrip(::arrow::Table::Make(::arrow::schema({struct_field}),
+                                            {::arrow::MakeArray(struct_data)}),
+                       /*row_group_size=*/8);
 }
 
-
 TEST(TestArrowReadWrite, CanonicalNestedRoundTrip) {
   auto doc_id = field("DocId", ::arrow::int64(), /*nullable=*/false);
   auto links = field(
diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc
index fbdbdb903fa..16db68654b2 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -841,13 +841,10 @@ class PathBuilder {
 #undef NOT_IMPLEMENTED_VISIT
 
   std::vector<PathInfo>& paths() { return paths_; }
 
-  bool root_is_nullable() const { return root_is_nullable_; }
-
  private:
   PathInfo info_;
   std::vector<PathInfo> paths_;
   bool nullable_in_parent_;
-  bool root_is_nullable_;
 };
 
 Status PathBuilder::VisitInline(const Array& array) {
@@ -874,7 +871,7 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
                                      std::move(write_leaf_callback));
   }
 
-  bool Nested() const override { return !data_->child_data.empty(); }
+  bool IsNested() const override { return !data_->child_data.empty(); }
 
  private:
   ElementRange root_range_;
diff --git a/cpp/src/parquet/arrow/path_internal.h b/cpp/src/parquet/arrow/path_internal.h
index 8c8a82238cb..cbc525da21f 100644
--- a/cpp/src/parquet/arrow/path_internal.h
+++ b/cpp/src/parquet/arrow/path_internal.h
@@ -150,7 +150,7 @@ class PARQUET_EXPORT MultipathLevelBuilder {
   virtual ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                                 CallbackFunction write_leaf_callback) = 0;
 
-  virtual bool Nested() const = 0;
+  virtual bool IsNested() const = 0;
 };
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 8b3bfcb5704..bc1ceadf5f7 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -65,13 +65,6 @@ using ParquetReader = parquet::ParquetFileReader;
 
 using parquet::internal::RecordReader;
 
-#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
-#define END_PARQUET_CATCH_EXCEPTIONS \
-  } \
-  catch (const ::parquet::ParquetException& e) { \
-    return ::arrow::Status::IOError(e.what()); \
-  }
-
 namespace parquet {
 namespace arrow {
 namespace {
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index fb5250c9f3f..d7468ef2022 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -134,9 +134,9 @@ class ArrowColumnWriterV2 {
           std::shared_ptr<Array> values_array =
               result.leaf_array->Slice(range.start, range.Size());
 
-          PARQUET_CATCH_AND_RETURN(column_writer->WriteArrow(
+          return column_writer->WriteArrow(
               result.def_levels, result.rep_levels, result.def_rep_level_count,
-              *values_array, ctx, level_builder->Nested(), result.leaf_is_nullable));
+              *values_array, ctx, level_builder->IsNested(), result.leaf_is_nullable);
         }));
   }
 
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 82072506f06..d71e98c7383 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1047,14 +1047,16 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& array,
                     ArrowWriteContext* ctx, bool nested, bool array_nullable) override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
     bool leaf_is_not_nullable = !level_info_.HasNullableValues();
     // Leaf nulls are canonical when there is only a single null element and it is at the
     // leaf.
     bool leaf_nulls_are_canonical =
         (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) &&
         array_nullable;
-    bool maybe_has_nulls = nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);
-    if (maybe_has_nulls) {
+    bool maybe_parent_nulls =
+        nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);
+    if (maybe_parent_nulls) {
       ARROW_ASSIGN_OR_RAISE(
           bits_buffer_,
           arrow::AllocateResizableBuffer(
              BitUtil::BytesForBits(properties_->write_batch_size()), ctx->memory_pool));
       bits_buffer_->ZeroPadding();
     }
 
     if (array.type()->id() == ::arrow::Type::DICTIONARY) {
       return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx,
-                                  maybe_has_nulls);
+                                  maybe_parent_nulls);
     } else {
       return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx,
-                             maybe_has_nulls);
+                             maybe_parent_nulls);
     }
+    END_PARQUET_CATCH_EXCEPTIONS
   }
 
   int64_t EstimatedBufferedValueBytes() const override {
@@ -1086,11 +1089,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // plain encoding is circumvented
   Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                               int64_t num_levels, const ::arrow::Array& array,
-                              ArrowWriteContext* context, bool maybe_has_nulls);
+                              ArrowWriteContext* context, bool maybe_parent_nulls);
 
   Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                          int64_t num_levels, const ::arrow::Array& array,
-                         ArrowWriteContext* context, bool maybe_has_nulls);
+                         ArrowWriteContext* context, bool maybe_parent_nulls);
 
   void WriteDictionaryPage() override {
     // We have to dynamic cast here because of TypedEncoder<T> as
@@ -1221,14 +1224,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     *null_count = io.null_count;
   }
 
-  std::shared_ptr<Array> MaybeUpdateArray(std::shared_ptr<Array> array,
-                                          int64_t new_null_count) {
+  std::shared_ptr<Array> MaybeReplaceValidity(std::shared_ptr<Array> array,
+                                              int64_t new_null_count) {
     if (bits_buffer_ == nullptr) {
       return array;
     }
     std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
     buffers[0] = bits_buffer_;
-    DCHECK(array->num_fields() == 0);
+    // Should be a leaf array.
+    DCHECK_EQ(array->num_fields(), 0);
     return arrow::MakeArray(std::make_shared<ArrayData>(
         array->type(), array->length(), std::move(buffers), new_null_count));
   }
@@ -1327,7 +1331,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 template <typename DType>
 Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   // If this is the first time writing a DictionaryArray, then there's
   // a few possible paths to take:
   //
@@ -1348,7 +1352,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     RETURN_NOT_OK(
         ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
     return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx,
-                           maybe_has_nulls);
+                           maybe_parent_nulls);
   };
 
   if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
@@ -1381,7 +1385,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
                       AddIfNotNull(rep_levels, offset));
     std::shared_ptr<Array> writeable_indices =
         indices->Slice(value_offset, batch_num_spaced_values);
-    writeable_indices = MaybeUpdateArray(writeable_indices, null_count);
+    writeable_indices = MaybeReplaceValidity(writeable_indices, null_count);
     dict_encoder->PutIndices(*writeable_indices);
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
     value_offset += batch_num_spaced_values;
@@ -1434,7 +1438,7 @@ template <typename ParquetType, typename ArrowType>
 Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
                            ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
-                           bool maybe_has_nulls) {
+                           bool maybe_parent_nulls) {
   using ParquetCType = typename ParquetType::c_type;
   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
@@ -1445,7 +1449,7 @@ Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
   RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
   bool no_nulls =
       writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
-  if (!maybe_has_nulls && no_nulls) {
+  if (!maybe_parent_nulls && no_nulls) {
     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1463,7 +1467,7 @@ template <typename ParquetType>
 Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
                           ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
-                          bool maybe_has_nulls) {
+                          bool maybe_parent_nulls) {
   using T = typename ParquetType::c_type;
   const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
   const T* values = nullptr;
@@ -1472,7 +1476,7 @@ Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
 
   bool no_nulls =
       writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
-  if (!maybe_has_nulls && no_nulls) {
+  if (!maybe_parent_nulls && no_nulls) {
     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1485,12 +1489,12 @@ Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
 
 #define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)  \
   case ::arrow::Type::ArrowEnum:                                 \
     return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
-        array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
+        array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 
 #define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
   case ::arrow::Type::ArrowEnum:                                                      \
     return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
-                                           ctx, this, maybe_has_nulls);
+                                           ctx, this, maybe_parent_nulls);
 
 #define ARROW_UNSUPPORTED() \
   std::stringstream ss;     \
@@ -1514,12 +1518,12 @@ struct SerializeFunctor {
 template <>
 Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::BOOL) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
-      array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -1555,7 +1559,7 @@ struct SerializeFunctor {
 template <>
 Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::NA: {
       PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
@@ -1678,14 +1682,15 @@ struct SerializeFunctor {
 Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
                        const int16_t* def_levels, const int16_t* rep_levels,
                        ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer,
-                       bool maybe_has_nulls) {
+                       bool maybe_parent_nulls) {
   const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
     ArrowWriteContext temp_ctx = *ctx;
     temp_ctx.properties = properties;
     return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
-        values, num_levels, def_levels, rep_levels, &temp_ctx, writer, maybe_has_nulls);
+        values, num_levels, def_levels, rep_levels, &temp_ctx, writer,
+        maybe_parent_nulls);
   };
 
   if (ctx->properties->coerce_timestamps_enabled()) {
@@ -1693,7 +1698,7 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
     if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
       // No data conversion necessary
       return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
-                                           ctx, writer, maybe_has_nulls);
+                                           ctx, writer, maybe_parent_nulls);
     } else {
       return WriteCoerce(ctx->properties);
     }
@@ -1718,18 +1723,18 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
   } else {
     // No data conversion necessary
     return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
-                                         writer, maybe_has_nulls);
+                                         writer, maybe_parent_nulls);
   }
 }
 
 template <>
 Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::TIMESTAMP:
       return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this,
-                             maybe_has_nulls);
+                             maybe_parent_nulls);
     WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
     WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
     WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
@@ -1742,12 +1747,12 @@ Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
 template <>
 Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::TIMESTAMP) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
-      array, num_levels, def_levels, rep_levels, ctx, this, maybe_has_nulls);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Floating point types
 
@@ -1756,23 +1761,23 @@ Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
 template <>
 Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::FLOAT) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
-                                       this, maybe_has_nulls);
+                                       this, maybe_parent_nulls);
 }
 
 template <>
 Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::DOUBLE) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
-                                        this, maybe_has_nulls);
+                                        this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Write Arrow to BYTE_ARRAY
 
@@ -1781,7 +1786,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
 template <>
 Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type()->id() != ::arrow::Type::BINARY &&
       array.type()->id() != ::arrow::Type::STRING) {
     ARROW_UNSUPPORTED();
@@ -1799,7 +1804,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
                       AddIfNotNull(rep_levels, offset));
     std::shared_ptr<Array> data_slice =
         array.Slice(value_offset, batch_num_spaced_values);
-    data_slice = MaybeUpdateArray(data_slice, null_count);
+    data_slice = MaybeReplaceValidity(data_slice, null_count);
     current_encoder_->Put(*data_slice);
     if (page_statistics_ != nullptr) {
@@ -1901,7 +1906,7 @@ struct SerializeFunctor
 template <>
 Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
-    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_has_nulls) {
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
     WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
diff --git a/cpp/src/parquet/exception.h b/cpp/src/parquet/exception.h
index 3f97c1e9267..1640decf0a3 100644
--- a/cpp/src/parquet/exception.h
+++ b/cpp/src/parquet/exception.h
@@ -140,4 +140,11 @@ void ThrowNotOk(StatusReturnBlock&& b) {
   PARQUET_THROW_NOT_OK(b());
 }
 
+#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
+#define END_PARQUET_CATCH_EXCEPTIONS \
+  } \
+  catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::IOError(e.what()); \
+  }
+
 }  // namespace parquet

From 5f9fff115ab178a943ba02f5f3915e27a51392ec Mon Sep 17 00:00:00 2001
From: Micah Kornfield <emkornfield@gmail.com>
Date: Tue, 22 Sep 2020 15:55:57 +0000
Subject: [PATCH 3/4] cleanup

---
 cpp/src/parquet/arrow/path_internal.cc |  2 --
 cpp/src/parquet/arrow/path_internal.h  |  2 --
 cpp/src/parquet/arrow/writer.cc        |  6 +++---
 cpp/src/parquet/column_writer.cc       | 23 ++++++++++-------------
 cpp/src/parquet/column_writer.h        | 10 ++++++----
 5 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc
index 16db68654b2..0f570b7e0ca 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -874,8 +874,6 @@ class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
                                      std::move(write_leaf_callback));
   }
 
-  bool IsNested() const override { return !data_->child_data.empty(); }
-
  private:
   ElementRange root_range_;
   // Reference holder to ensure the data stays valid.
diff --git a/cpp/src/parquet/arrow/path_internal.h b/cpp/src/parquet/arrow/path_internal.h
index cbc525da21f..c5b7fdfdac3 100644
--- a/cpp/src/parquet/arrow/path_internal.h
+++ b/cpp/src/parquet/arrow/path_internal.h
@@ -149,8 +149,6 @@ class PARQUET_EXPORT MultipathLevelBuilder {
   /// \param[out] write_leaf_callback Callback to receive the result.
   virtual ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                                 CallbackFunction write_leaf_callback) = 0;
-
-  virtual bool IsNested() const = 0;
 };
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index d7468ef2022..d42b257b38c 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -134,9 +134,9 @@ class ArrowColumnWriterV2 {
           std::shared_ptr<Array> values_array =
               result.leaf_array->Slice(range.start, range.Size());
 
-          return column_writer->WriteArrow(
-              result.def_levels, result.rep_levels, result.def_rep_level_count,
-              *values_array, ctx, level_builder->IsNested(), result.leaf_is_nullable);
+          return column_writer->WriteArrow(result.def_levels, result.rep_levels,
+                                           result.def_rep_level_count, *values_array,
+                                           ctx, result.leaf_is_nullable);
         }));
   }
 
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index d71e98c7383..0e366dc05df 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1045,31 +1045,28 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   }
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
-                    int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx, bool nested, bool array_nullable) override {
+                    int64_t num_levels, const ::arrow::Array& leaf_array,
+                    ArrowWriteContext* ctx, bool leaf_field_nullable) override {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
-    bool leaf_is_not_nullable = !level_info_.HasNullableValues();
-    // Leaf nulls are canonical when there is only a single null element and it is at the
-    // leaf.
+    // Leaf nulls are canonical when there is only a single null element after a list
+    // and it is at the leaf.
-    bool leaf_nulls_are_canonical =
+    bool single_nullable_element =
         (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) &&
-        array_nullable;
-    bool maybe_parent_nulls =
-        nested && !(leaf_is_not_nullable || leaf_nulls_are_canonical);
+        leaf_field_nullable;
+    bool maybe_parent_nulls = level_info_.HasNullableValues() && !single_nullable_element;
     if (maybe_parent_nulls) {
       ARROW_ASSIGN_OR_RAISE(
           bits_buffer_,
          arrow::AllocateResizableBuffer(
               BitUtil::BytesForBits(properties_->write_batch_size()), ctx->memory_pool));
       bits_buffer_->ZeroPadding();
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
     }
 
-    if (array.type()->id() == ::arrow::Type::DICTIONARY) {
-      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx,
+    if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
+      return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx,
                                   maybe_parent_nulls);
     } else {
-      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx,
+      return WriteArrowDense(def_levels, rep_levels, num_levels, leaf_array, ctx,
                              maybe_parent_nulls);
     }
     END_PARQUET_CATCH_EXCEPTIONS
   }
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index f831ca51d4b..5e70a28e570 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -141,11 +141,13 @@ class PARQUET_EXPORT ColumnWriter {
 
   /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
   /// error status if the array data type is not compatible with the concrete
-  /// writer type
+  /// writer type.
+  ///
+  /// leaf_array is always a primitive (possibly dictionary encoded) type.
   virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
-                                     int64_t num_levels, const ::arrow::Array& array,
-                                     ArrowWriteContext* ctx, bool nested_array,
-                                     bool array_is_nullable) = 0;
+                                     int64_t num_levels, const ::arrow::Array& leaf_array,
+                                     ArrowWriteContext* ctx,
+                                     bool leaf_field_nullable) = 0;
 };
 
 // API to write values to a single column. This is the main client facing API.

From 40e30bdd26a3fbb8c0fef620593972900b28f51e Mon Sep 17 00:00:00 2001
From: Micah Kornfield <emkornfield@gmail.com>
Date: Tue, 22 Sep 2020 15:58:51 +0000
Subject: [PATCH 4/4] add more documentation

---
 cpp/src/parquet/column_writer.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 5e70a28e570..57f98533a72 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -144,6 +144,8 @@ class PARQUET_EXPORT ColumnWriter {
   /// writer type.
   ///
   /// leaf_array is always a primitive (possibly dictionary encoded) type.
+  /// leaf_field_nullable indicates whether the leaf array is considered nullable
+  /// according to its schema in a Table or its parent array.
   virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                                      int64_t num_levels, const ::arrow::Array& leaf_array,
                                      ArrowWriteContext* ctx,
                                      bool leaf_field_nullable) = 0;
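[Editor's note: illustrative usage, not part of the patch series. The snippet below rebuilds the shape of the NestedRequiredField test with the public builder API instead of the test-only ArrayFromJSON helper: a nullable struct whose validity byte 0xCC marks rows 0, 1, 4 and 5 null, wrapping a non-nullable int32 child that carries no validity bitmap of its own. This is exactly the case where the writer must not take the child's declared nullability at face value; as the patches above show, it recomputes leaf validity from definition levels instead.]

#include <arrow/api.h>
#include <memory>

std::shared_ptr<arrow::Array> MakeStructWithRequiredChild() {
  // Non-nullable child: eight values and no validity bitmap of its own.
  arrow::Int32Builder child_builder;
  if (!child_builder.AppendValues({0, 1, 2, 3, 4, 5, 7, 8}).ok()) return nullptr;
  std::shared_ptr<arrow::Array> child;
  if (!child_builder.Finish(&child).ok()) return nullptr;

  // Parent validity 0xCC = 0b11001100: rows 0, 1, 4 and 5 are null even though
  // the child array has no nulls.
  auto maybe_validity = arrow::AllocateBitmap(8);
  if (!maybe_validity.ok()) return nullptr;
  std::shared_ptr<arrow::Buffer> validity = *maybe_validity;
  validity->mutable_data()[0] = 0xCC;

  auto type = arrow::struct_(
      {arrow::field("int_array", arrow::int32(), /*nullable=*/false)});
  auto data = arrow::ArrayData::Make(type, /*length=*/8, {validity},
                                     {child->data()});
  // When written to Parquet, this column must get definition levels
  // [0,0,1,1,0,0,1,1] and only the four leaf values 2, 3, 7, 8.
  return arrow::MakeArray(data);
}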