diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 188bc8c178a..7ce81651c00 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -55,6 +55,7 @@
 #include "parquet/test_util.h"
 
 using arrow::Array;
+using arrow::ArrayData;
 using arrow::ArrayVisitor;
 using arrow::Buffer;
 using arrow::ChunkedArray;
@@ -686,7 +687,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
   ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
   ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
   ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(100, out->num_rows());
+  EXPECT_EQ(100, out->num_rows());
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0);
   ASSERT_EQ(1, chunked_array->num_chunks());
@@ -1085,8 +1086,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
   }
 
   std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
-  auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
-                                                       buffers, values->null_count());
+  auto arr_data = std::make_shared<ArrayData>(::arrow::int64(), values->length(), buffers,
+                                              values->null_count());
   std::shared_ptr<Array> expected_values = MakeArray(arr_data);
   ASSERT_NE(expected_values, NULLPTR);
@@ -2360,6 +2361,39 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) {
       3);
 }
 
+TEST(ArrowReadWrite, NestedRequiredField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32(), /*nullable=*/false);
+  auto int_array = ::arrow::ArrayFromJSON(int_field->type(), "[0, 1, 2, 3, 4, 5, 7, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = ArrayData::Make(struct_field->type(), /*length=*/8,
+                                     {validity_bitmap}, {int_array->data()});
+  CheckSimpleRoundtrip(::arrow::Table::Make(::arrow::schema({struct_field}),
+                                            {::arrow::MakeArray(struct_data)}),
+                       /*row_group_size=*/8);
+}
+
+TEST(ArrowReadWrite, NestedNullableField) {
+  auto int_field = ::arrow::field("int_array", ::arrow::int32());
+  auto int_array =
+      ::arrow::ArrayFromJSON(int_field->type(), "[0, null, 2, null, 4, 5, null, 8]");
+  auto struct_field =
+      ::arrow::field("root", ::arrow::struct_({int_field}), /*nullable=*/true);
+  std::shared_ptr<Buffer> validity_bitmap;
+  ASSERT_OK_AND_ASSIGN(validity_bitmap, ::arrow::AllocateBitmap(8));
+  validity_bitmap->mutable_data()[0] = 0xCC;
+
+  auto struct_data = ArrayData::Make(struct_field->type(), /*length=*/8,
+                                     {validity_bitmap}, {int_array->data()});
+  CheckSimpleRoundtrip(::arrow::Table::Make(::arrow::schema({struct_field}),
+                                            {::arrow::MakeArray(struct_data)}),
+                       /*row_group_size=*/8);
+}
+
 TEST(TestArrowReadWrite, CanonicalNestedRoundTrip) {
   auto doc_id = field("DocId", ::arrow::int64(), /*nullable=*/false);
   auto links = field(
diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc
index e2079b71f21..0f570b7e0ca 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -525,6 +525,7 @@ struct PathInfo {
   int16_t max_def_level = 0;
   int16_t max_rep_level = 0;
   bool has_dictionary = false;
+  bool leaf_is_nullable = false;
 };
 
 /// Contains logic for writing a single leaf node to parquet.
@@ -540,6 +541,7 @@ Status WritePath(ElementRange root_range, PathInfo* path_info,
   std::vector<ElementRange> stack(path_info->path.size());
   MultipathLevelBuilderResult builder_result;
   builder_result.leaf_array = path_info->primitive_array;
+  builder_result.leaf_is_nullable = path_info->leaf_is_nullable;
 
   if (path_info->max_def_level == 0) {
     // This case only occurs when there are no nullable or repeated
@@ -706,6 +708,7 @@ class PathBuilder {
   explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
 
   template <typename T>
   void AddTerminalInfo(const T& array) {
+    info_.leaf_is_nullable = nullable_in_parent_;
     if (nullable_in_parent_) {
       info_.max_def_level++;
     }
diff --git a/cpp/src/parquet/arrow/path_internal.h b/cpp/src/parquet/arrow/path_internal.h
index 17c486dc7b2..c5b7fdfdac3 100644
--- a/cpp/src/parquet/arrow/path_internal.h
+++ b/cpp/src/parquet/arrow/path_internal.h
@@ -91,6 +91,9 @@ struct MultipathLevelBuilderResult {
   /// This allows for the parquet writing to determine which values ultimately
   /// needs to be written.
   std::vector<ElementRange> post_list_visited_elements;
+
+  /// Whether the leaf array is nullable.
+  bool leaf_is_nullable;
 };
 
 /// \brief Logic for being able to write out nesting (rep/def level) data that is
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 8b3bfcb5704..bc1ceadf5f7 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -65,13 +65,6 @@ using ParquetReader = parquet::ParquetFileReader;
 
 using parquet::internal::RecordReader;
 
-#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
-#define END_PARQUET_CATCH_EXCEPTIONS                \
-  }                                                 \
-  catch (const ::parquet::ParquetException& e) {    \
-    return ::arrow::Status::IOError(e.what());      \
-  }
-
 namespace parquet {
 namespace arrow {
 namespace {
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index b922b21728c..d42b257b38c 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -136,13 +136,12 @@ class ArrowColumnWriterV2 {
           return column_writer->WriteArrow(result.def_levels, result.rep_levels,
                                            result.def_rep_level_count, *values_array,
-                                           ctx);
+                                           ctx, result.leaf_is_nullable);
         }));
       }
 
     PARQUET_CATCH_NOT_OK(column_writer->Close());
   }
-
   return Status::OK();
 }
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 26dd1845922..0e366dc05df 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -41,6 +41,7 @@
 #include "parquet/encoding.h"
 #include "parquet/encryption_internal.h"
 #include "parquet/internal_file_encryptor.h"
+#include "parquet/level_conversion.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
@@ -49,6 +50,8 @@
 #include "parquet/thrift_internal.h"
 #include "parquet/types.h"
 
+using arrow::Array;
+using arrow::ArrayData;
 using arrow::Datum;
 using arrow::Status;
 using arrow::BitUtil::BitWriter;
@@ -59,6 +62,23 @@ namespace parquet {
 
 namespace {
 
+internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
+  internal::LevelInfo level_info;
+  level_info.def_level = descr->max_definition_level();
+  level_info.rep_level = descr->max_repetition_level();
+
+  int16_t min_spaced_def_level = descr->max_definition_level();
+  const ::parquet::schema::Node* node = descr->schema_node().get();
+  while (node != nullptr && !node->is_repeated()) {
+    if (node->is_optional()) {
+      min_spaced_def_level--;
+    }
+    node = node->parent();
+  }
+  level_info.repeated_ancestor_def_level = min_spaced_def_level;
+  return level_info;
+}
+
 inline const int16_t* AddIfNotNull(const int16_t* base, int64_t offset) {
   if (base != nullptr) {
     return base + offset;
@@ -543,6 +563,7 @@ class ColumnWriterImpl {
                    Encoding::type encoding, const WriterProperties* properties)
       : metadata_(metadata),
         descr_(metadata->descr()),
+        level_info_(ComputeLevelInfo(metadata->descr())),
         pager_(std::move(pager)),
         has_dictionary_(use_dictionary),
         encoding_(encoding),
@@ -563,6 +584,7 @@ class ColumnWriterImpl {
         std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
     uncompressed_data_ =
         std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+
     if (pager_->has_compressor()) {
       compressor_temp_buffer_ =
           std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
@@ -627,6 +649,9 @@ class ColumnWriterImpl {
   ColumnChunkMetaDataBuilder* metadata_;
   const ColumnDescriptor* descr_;
+  // scratch buffer if validity bits need to be recalculated.
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  const internal::LevelInfo level_info_;
 
   std::unique_ptr<PageWriter> pager_;
 
@@ -965,9 +990,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     // of values, the chunking will ensure the AddDataPage() is called at a reasonable
     // pagesize limit
     int64_t value_offset = 0;
+
     auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
       int64_t values_to_write = WriteLevels(batch_size, AddIfNotNull(def_levels, offset),
                                             AddIfNotNull(rep_levels, offset));
+
       // PARQUET-780
       if (values_to_write > 0) {
         DCHECK_NE(nullptr, values);
@@ -992,11 +1019,21 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
       int64_t batch_num_values = 0;
       int64_t batch_num_spaced_values = 0;
+      int64_t null_count;
+      MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                                 &batch_num_values, &batch_num_spaced_values,
+                                 &null_count);
+
       WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                        AddIfNotNull(rep_levels, offset), &batch_num_values,
-                        &batch_num_spaced_values);
-      WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
-                        valid_bits, valid_bits_offset + value_offset);
+                        AddIfNotNull(rep_levels, offset));
+      if (bits_buffer_ != nullptr) {
+        WriteValuesSpaced(values + value_offset, batch_num_values,
+                          batch_num_spaced_values, bits_buffer_->data(), /*offset=*/0);
+      } else {
+        WriteValuesSpaced(values + value_offset, batch_num_values,
+                          batch_num_spaced_values, valid_bits,
+                          valid_bits_offset + value_offset);
+      }
       CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
       value_offset += batch_num_spaced_values;
@@ -1008,13 +1045,31 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   }
 
   Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
-                    int64_t num_levels, const ::arrow::Array& array,
-                    ArrowWriteContext* ctx) override {
-    if (array.type()->id() == ::arrow::Type::DICTIONARY) {
-      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
+                    int64_t num_levels, const ::arrow::Array& leaf_array,
+                    ArrowWriteContext* ctx, bool leaf_field_nullable) override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    // Leaf nulls are canonical when there is only a single null element after a list
+    // and it is at the leaf.
+    bool single_nullable_element =
+        (level_info_.def_level == level_info_.repeated_ancestor_def_level + 1) &&
+        leaf_field_nullable;
+    bool maybe_parent_nulls = level_info_.HasNullableValues() && !single_nullable_element;
+    if (maybe_parent_nulls) {
+      ARROW_ASSIGN_OR_RAISE(
+          bits_buffer_,
+          arrow::AllocateResizableBuffer(
+              BitUtil::BytesForBits(properties_->write_batch_size()), ctx->memory_pool));
+      bits_buffer_->ZeroPadding();
+    }
+
+    if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
+      return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx,
+                                  maybe_parent_nulls);
     } else {
-      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
+      return WriteArrowDense(def_levels, rep_levels, num_levels, leaf_array, ctx,
+                             maybe_parent_nulls);
     }
+    END_PARQUET_CATCH_EXCEPTIONS
   }
 
   int64_t EstimatedBufferedValueBytes() const override {
@@ -1031,11 +1086,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // plain encoding is circumvented
   Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                               int64_t num_levels, const ::arrow::Array& array,
-                              ArrowWriteContext* context);
+                              ArrowWriteContext* context, bool maybe_parent_nulls);
 
   Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                          int64_t num_levels, const ::arrow::Array& array,
-                         ArrowWriteContext* context);
+                         ArrowWriteContext* context, bool maybe_parent_nulls);
 
   void WriteDictionaryPage() override {
     // We have to dynamic cast here because of TypedEncoder<T> as
@@ -1130,37 +1185,61 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     return values_to_write;
   }
 
+  void MaybeCalculateValidityBits(const int16_t* def_levels, int64_t batch_size,
+                                  int64_t* out_values_to_write,
+                                  int64_t* out_spaced_values_to_write,
+                                  int64_t* null_count) {
+    if (bits_buffer_ == nullptr) {
+      if (!level_info_.HasNullableValues()) {
+        *out_values_to_write = batch_size;
+        *out_spaced_values_to_write = batch_size;
+        *null_count = 0;
+      } else {
+        for (int x = 0; x < batch_size; x++) {
+          *out_values_to_write += def_levels[x] == level_info_.def_level ? 1 : 0;
+          *out_spaced_values_to_write +=
+              def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
+        }
+        *null_count = *out_values_to_write - *out_spaced_values_to_write;
+      }
+      return;
+    }
+    // Shrink to fit possible causes another allocation, and would only be necessary
+    // on the last batch.
+    int64_t new_bitmap_size = BitUtil::BytesForBits(batch_size);
+    if (new_bitmap_size != bits_buffer_->size()) {
+      PARQUET_THROW_NOT_OK(
+          bits_buffer_->Resize(new_bitmap_size, /*shrink_to_fit=*/false));
+      bits_buffer_->ZeroPadding();
+    }
+    internal::ValidityBitmapInputOutput io;
+    io.valid_bits = bits_buffer_->mutable_data();
+    io.values_read_upper_bound = batch_size;
+    internal::DefLevelsToBitmap(def_levels, batch_size, level_info_, &io);
+    *out_values_to_write = io.values_read - io.null_count;
+    *out_spaced_values_to_write = io.values_read;
+    *null_count = io.null_count;
+  }
+
+  std::shared_ptr<Array> MaybeReplaceValidity(std::shared_ptr<Array> array,
+                                              int64_t new_null_count) {
+    if (bits_buffer_ == nullptr) {
+      return array;
+    }
+    std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
+    buffers[0] = bits_buffer_;
+    // Should be a leaf array.
+    DCHECK_EQ(array->num_fields(), 0);
+    return arrow::MakeArray(std::make_shared<ArrayData>(
+        array->type(), array->length(), std::move(buffers), new_null_count));
+  }
+
   void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
-                         const int16_t* rep_levels, int64_t* out_values_to_write,
-                         int64_t* out_spaced_values_to_write) {
-    int64_t values_to_write = 0;
-    int64_t spaced_values_to_write = 0;
+                         const int16_t* rep_levels) {
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
-      // Minimal definition level for which spaced values are written
-      int16_t min_spaced_def_level = descr_->max_definition_level();
-      const ::parquet::schema::Node* node = descr_->schema_node().get();
-      while (node != nullptr && !node->is_repeated()) {
-        if (node->is_optional()) {
-          min_spaced_def_level--;
-        }
-        node = node->parent();
-      }
-      for (int64_t i = 0; i < num_levels; ++i) {
-        if (def_levels[i] == descr_->max_definition_level()) {
-          ++values_to_write;
-        }
-        if (def_levels[i] >= min_spaced_def_level) {
-          ++spaced_values_to_write;
-        }
-      }
       WriteDefinitionLevels(num_levels, def_levels);
-    } else {
-      // Required field, write all values
-      values_to_write = num_levels;
-      spaced_values_to_write = num_levels;
     }
-
     // Not present for non-repeated fields
     if (descr_->max_repetition_level() > 0) {
       // A row could include more than one value
@@ -1170,15 +1249,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
           rows_written_++;
         }
       }
-
       WriteRepetitionLevels(num_levels, rep_levels);
     } else {
       // Each value is exactly one row
       rows_written_ += static_cast<int>(num_levels);
     }
-
-    *out_values_to_write = values_to_write;
-    *out_spaced_values_to_write = spaced_values_to_write;
   }
 
   void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
@@ -1234,7 +1309,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset) {
-    if (descr_->schema_node()->is_optional()) {
+    if (num_values != num_spaced_values) {
       dynamic_cast<ValueEncoderType*>(current_encoder_.get())
           ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
                       valid_bits_offset);
@@ -1251,11 +1326,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 };
 
 template <typename DType>
-Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
-                                                          const int16_t* rep_levels,
-                                                          int64_t num_levels,
-                                                          const ::arrow::Array& array,
-                                                          ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   // If this is the first time writing a DictionaryArray, then there's
   // a few possible paths to take:
   //
@@ -1275,7 +1348,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_lev
     std::shared_ptr<::arrow::Array> dense_array;
     RETURN_NOT_OK(
         ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
-    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
+    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx,
+                           maybe_parent_nulls);
   };
 
   if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
@@ -1298,10 +1372,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_lev
   auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
+    int64_t null_count = arrow::kUnknownNullCount;
+    // Bits is not null for nullable values. At this point in the code we can't determine
+    // if the leaf array has the same null values as any parents it might have had so we
+    // need to recompute it from def levels.
+    MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                               &batch_num_values, &batch_num_spaced_values, &null_count);
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                      AddIfNotNull(rep_levels, offset), &batch_num_values,
-                      &batch_num_spaced_values);
-    dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
+                      AddIfNotNull(rep_levels, offset));
+    std::shared_ptr<Array> writeable_indices =
+        indices->Slice(value_offset, batch_num_spaced_values);
+    writeable_indices = MaybeReplaceValidity(writeable_indices, null_count);
+    dict_encoder->PutIndices(*writeable_indices);
     CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
     value_offset += batch_num_spaced_values;
   };
@@ -1352,21 +1434,19 @@ struct SerializeFunctor {
 template <typename ParquetType, typename ArrowType>
 Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
-                           ArrowWriteContext* ctx,
-                           TypedColumnWriter<ParquetType>* writer) {
+                           ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
+                           bool maybe_parent_nulls) {
   using ParquetCType = typename ParquetType::c_type;
   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
   ParquetCType* buffer = nullptr;
   PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
 
-  bool no_nulls =
-      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
-
   SerializeFunctor<ParquetType, ArrowType> functor;
   RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
-
-  if (no_nulls) {
+  bool no_nulls =
+      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+  if (!maybe_parent_nulls && no_nulls) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1379,8 +1459,8 @@
 template <typename ParquetType>
 Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
-                          ArrowWriteContext* ctx,
-                          TypedColumnWriter<ParquetType>* writer) {
+                          ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
+                          bool maybe_parent_nulls) {
   using T = typename ParquetType::c_type;
   const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
   const T* values = nullptr;
@@ -1390,7 +1470,10 @@
   } else {
     DCHECK_EQ(data.length(), 0);
   }
-  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+  bool no_nulls =
+      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+
+  if (!maybe_parent_nulls && no_nulls) {
     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
   } else {
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
@@ -1403,12 +1486,12 @@
 #define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)   \
   case ::arrow::Type::ArrowEnum:                                  \
     return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>(  \
-        array, num_levels, def_levels, rep_levels, ctx, this);
+        array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 
 #define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
   case ::arrow::Type::ArrowEnum:                                                      \
     return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
-                                           ctx, this);
+                                           ctx, this, maybe_parent_nulls);
 
 #define ARROW_UNSUPPORTED()    \
   std::stringstream ss;        \
@@ -1430,16 +1513,14 @@ struct SerializeFunctor {
 };
 
 template <>
-Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
-                                                           const int16_t* rep_levels,
-                                                           int64_t num_levels,
-                                                           const ::arrow::Array& array,
-                                                           ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::BOOL) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
-      array, num_levels, def_levels, rep_levels, ctx, this);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -1473,11 +1554,9 @@ struct SerializeFunctor {
 };
 
 template <>
-Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::NA: {
       PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
@@ -1599,14 +1678,16 @@ struct SerializeFunctor
 Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
                        const int16_t* def_levels, const int16_t* rep_levels,
-                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
+                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer,
+                       bool maybe_parent_nulls) {
   const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
     ArrowWriteContext temp_ctx = *ctx;
     temp_ctx.properties = properties;
     return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
-        values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
+        values, num_levels, def_levels, rep_levels, &temp_ctx, writer,
+        maybe_parent_nulls);
   };
 
   if (ctx->properties->coerce_timestamps_enabled()) {
@@ -1614,7 +1695,7 @@
     if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
       // No data conversion necessary
       return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
-                                           ctx, writer);
+                                           ctx, writer, maybe_parent_nulls);
     } else {
       return WriteCoerce(ctx->properties);
     }
@@ -1639,19 +1720,18 @@
   } else {
     // No data conversion necessary
     return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
-                                         writer);
+                                         writer, maybe_parent_nulls);
   }
 }
 
 template <>
-Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::TIMESTAMP:
-      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
+      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this,
+                             maybe_parent_nulls);
       WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
       WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
       WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
@@ -1662,56 +1742,48 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_leve
 }
 
 template <>
-Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::TIMESTAMP) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
-      array, num_levels, def_levels, rep_levels, ctx, this);
+      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Floating point types
 
 template <>
-Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels,
-                                                         const int16_t* rep_levels,
-                                                         int64_t num_levels,
-                                                         const ::arrow::Array& array,
-                                                         ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::FLOAT) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
-                                       this);
+                                       this, maybe_parent_nulls);
 }
 
 template <>
-Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels,
-                                                          const int16_t* rep_levels,
-                                                          int64_t num_levels,
-                                                          const ::arrow::Array& array,
-                                                          ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type_id() != ::arrow::Type::DOUBLE) {
     ARROW_UNSUPPORTED();
   }
   return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
-                                        this);
+                                        this, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
 // Write Arrow to BYTE_ARRAY
 
 template <>
-Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels,
-                                                             const int16_t* rep_levels,
-                                                             int64_t num_levels,
-                                                             const ::arrow::Array& array,
-                                                             ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   if (array.type()->id() != ::arrow::Type::BINARY &&
       array.type()->id() != ::arrow::Type::STRING) {
     ARROW_UNSUPPORTED();
@@ -1721,11 +1793,16 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_
   auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
     int64_t batch_num_values = 0;
     int64_t batch_num_spaced_values = 0;
+    int64_t null_count = 0;
+
+    MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
+                               &batch_num_values, &batch_num_spaced_values, &null_count);
     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
-                      AddIfNotNull(rep_levels, offset), &batch_num_values,
-                      &batch_num_spaced_values);
-    std::shared_ptr<::arrow::Array> data_slice =
+                      AddIfNotNull(rep_levels, offset));
+    std::shared_ptr<Array> data_slice =
         array.Slice(value_offset, batch_num_spaced_values);
+    data_slice = MaybeReplaceValidity(data_slice, null_count);
+
     current_encoder_->Put(*data_slice);
     if (page_statistics_ != nullptr) {
       page_statistics_->Update(*data_slice);
@@ -1824,11 +1901,9 @@ struct SerializeFunctor
 template <>
-Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels,
-                                                        const int16_t* rep_levels,
-                                                        int64_t num_levels,
-                                                        const ::arrow::Array& array,
-                                                        ArrowWriteContext* ctx) {
+Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
     WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 02f4725754e..57f98533a72 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -141,10 +141,15 @@ class PARQUET_EXPORT ColumnWriter {
 
   /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
   /// error status if the array data type is not compatible with the concrete
-  /// writer type
+  /// writer type.
+  ///
+  /// leaf_array is always a primitive (possibly dictionary encoded) type.
+  /// leaf_field_nullable indicates whether the leaf array is considered nullable
+  /// according to its schema in a Table or its parent array.
   virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
-                                     int64_t num_levels, const ::arrow::Array& array,
-                                     ArrowWriteContext* ctx) = 0;
+                                     int64_t num_levels, const ::arrow::Array& leaf_array,
+                                     ArrowWriteContext* ctx,
+                                     bool leaf_field_nullable) = 0;
 };
 
 // API to write values to a single column. This is the main client facing API.
diff --git a/cpp/src/parquet/exception.h b/cpp/src/parquet/exception.h
index 3f97c1e9267..1640decf0a3 100644
--- a/cpp/src/parquet/exception.h
+++ b/cpp/src/parquet/exception.h
@@ -140,4 +140,11 @@ void ThrowNotOk(StatusReturnBlock&& b) {
   PARQUET_THROW_NOT_OK(b());
 }
 
+#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
+#define END_PARQUET_CATCH_EXCEPTIONS              \
+  }                                               \
+  catch (const ::parquet::ParquetException& e) {  \
+    return ::arrow::Status::IOError(e.what());    \
+  }
+
 }  // namespace parquet