From e79c52d16428e8ad5e444fcfbf71fac04936ecbe Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sun, 2 Oct 2022 23:02:18 +0200 Subject: [PATCH 01/22] Initial commit --- cpp/src/parquet/column_writer_test.cc | 16 ++-- cpp/src/parquet/encoding.cc | 105 +++++++++++++++++++++ cpp/src/parquet/encoding_test.cc | 62 ++++++++++++ docs/source/cpp/parquet.rst | 2 +- python/pyarrow/_parquet.pyx | 3 +- python/pyarrow/tests/parquet/test_basic.py | 37 ++++++-- 6 files changed, 206 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 0da78264832..d2938443fe3 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -400,8 +400,9 @@ typedef ::testing::Types; -using TestValuesWriterInt64Type = TestPrimitiveWriter; +using TestInt32TypeValuesWriter = TestPrimitiveWriter; +using TestInt64TypeValuesWriter = TestPrimitiveWriter; +using TestByteArrayValuesWriter = TestPrimitiveWriter; TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { this->TestRequiredWithEncoding(Encoding::PLAIN); @@ -421,19 +422,19 @@ TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) { } */ -TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) { +TEST_F(TestInt32TypeValuesWriter, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } -TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) { +TEST_F(TestInt64TypeValuesWriter, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } -/* -TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) { +TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); } +/* TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); } @@ -654,7 +655,7 @@ TEST(TestWriter, NullValuesBuffer) { // PARQUET-719 // Test case for NULL values -TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) { +TEST_F(TestInt32TypeValuesWriter, OptionalNullValueChunk) { this->SetUpSchema(Repetition::OPTIONAL); this->GenerateData(LARGE_SIZE); @@ -692,7 +693,6 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { // PARQUET-979 // Prevent writing large MIN, MAX stats -using TestByteArrayValuesWriter = TestPrimitiveWriter; TEST_F(TestByteArrayValuesWriter, OmitStats) { int min_len = 1024 * 4; int max_len = 1024 * 8; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dc0edb1c801..adca1b714c1 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2572,6 +2572,102 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder +class DeltaLengthByteArrayEncoder : public EncoderImpl, + virtual public TypedEncoder { + public: + using T = typename DType::c_type; + + explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool) + : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, + pool = ::arrow::default_memory_pool()), + sink_(pool), + length_encoder_(nullptr, pool), + encoded_size_{0} {} + + std::shared_ptr FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { return encoded_size_; } + + using TypedEncoder::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override; + + protected: + ::arrow::BufferBuilder sink_; + DeltaBitPackEncoder length_encoder_; + uint32_t encoded_size_; +}; + +template <> +void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { + auto src = values.data()->GetValues(1); + Put(src, static_cast(values.length())); +} + +template +void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { + if (num_values == 0) { + return; + } + + std::vector lengths(num_values); + for (int idx = 0; idx < num_values; idx++) { + auto len = static_cast(src[idx].len); + lengths[idx] = len; + encoded_size_ += len; + } + length_encoder_.Put(lengths.data(), num_values); + PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); + + for (int idx = 0; idx < num_values; idx++) { + sink_.UnsafeAppend(src[idx].ptr, lengths[idx]); + } +} + +template +void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } +} + +template +std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { + std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); + + std::shared_ptr data; + PARQUET_THROW_NOT_OK(sink_.Finish(&data)); + sink_.Reset(); + + PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); + PARQUET_THROW_NOT_OK(sink_.Append(data->mutable_data(), data->size())); + + std::shared_ptr buffer; + PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + return buffer; +} + +// ---------------------------------------------------------------------- +// DeltaByteArrayDecoder + class DeltaLengthByteArrayDecoder : public DecoderImpl, virtual public TypedDecoder { public: @@ -3088,6 +3184,15 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin "DELTA_BINARY_PACKED encoder only supports INT32 and INT64"); break; } + } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { + switch (type_num) { + case Type::BYTE_ARRAY: + return std::unique_ptr( + new DeltaLengthByteArrayEncoder(descr, pool)); + default: + throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); + break; + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index a0e3fe9545d..b4d0d97d3cd 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1493,5 +1493,67 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { } } +// ---------------------------------------------------------------------- +// DELTA_LENGTH_BYTE_ARRAY encode/decode tests. + +template +class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { + public: + using c_type = typename Type::c_type; + static constexpr int TYPE = Type::type_num; + + virtual void CheckRoundtrip() { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get()); + auto decoder = + MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); + + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + int values_decoded = decoder->Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + + void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get()); + auto decoder = + MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); + int null_count = 0; + for (auto i = 0; i < num_values_; i++) { + if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) { + null_count++; + } + } + + encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset); + encode_buffer_ = encoder->FlushValues(); + decoder->SetData(num_values_ - null_count, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count, + valid_bits, valid_bits_offset); + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_, + valid_bits, valid_bits_offset)); + } + + protected: + USING_BASE_MEMBERS(); +}; + +typedef ::testing::Types TestDeltaLengthByteArrayEncodingTypes; +TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding, TestDeltaLengthByteArrayEncodingTypes); + +TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) { + ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, + /*null_probability*/ 0.1)); +} + } // namespace test } // namespace parquet diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index edc42d54cff..73dcac20214 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -402,7 +402,7 @@ Encodings +--------------------------+----------+----------+---------+ | DELTA_BYTE_ARRAY | ✓ | | | +--------------------------+----------+----------+---------+ -| DELTA_LENGTH_BYTE_ARRAY | ✓ | | | +| DELTA_LENGTH_BYTE_ARRAY | ✓ | ✓ | | +--------------------------+----------+----------+---------+ * \(1) Only supported for encoding definition and repetition levels, diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 061acafd12a..04fd71d590b 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -455,7 +455,7 @@ cdef class ColumnChunkMetaData(_Weakrefable): Encodings used for column (tuple of str). One of 'PLAIN', 'BIT_PACKED', 'RLE', 'BYTE_STREAM_SPLIT', 'DELTA_BINARY_PACKED', - 'DELTA_BYTE_ARRAY'. + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY'. """ return tuple(map(encoding_name_from_enum, self.metadata.encodings())) @@ -1104,6 +1104,7 @@ cdef encoding_enum_from_name(str encoding_name): 'RLE': ParquetEncoding_RLE, 'BYTE_STREAM_SPLIT': ParquetEncoding_BYTE_STREAM_SPLIT, 'DELTA_BINARY_PACKED': ParquetEncoding_DELTA_BINARY_PACKED, + 'DELTA_LENGTH_BYTE_ARRAY': ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY, 'DELTA_BYTE_ARRAY': ParquetEncoding_DELTA_BYTE_ARRAY, 'RLE_DICTIONARY': 'dict', 'PLAIN_DICTIONARY': 'dict', diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 1ab392aed04..0fe0e47ac8b 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -392,13 +392,17 @@ def test_byte_stream_split(use_legacy_dataset): def test_column_encoding(use_legacy_dataset): arr_float = pa.array(list(map(float, range(100)))) arr_int = pa.array(list(map(int, range(100)))) - mixed_table = pa.Table.from_arrays([arr_float, arr_int], - names=['a', 'b']) + arr_bin = pa.array(list(map( + lambda x: bytes(str(x).zfill(8), "utf-8"), range(100))), pa.binary()) + mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin], + names=['a', 'b', 'c']) # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for - # column 'b'. + # column 'b' and 'c'. _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, - column_encoding={'a': "BYTE_STREAM_SPLIT", 'b': "PLAIN"}, + column_encoding={'a': "BYTE_STREAM_SPLIT", + 'b': "PLAIN", + 'c': "PLAIN"}, use_legacy_dataset=use_legacy_dataset) # Check "PLAIN" for all columns. @@ -411,7 +415,16 @@ def test_column_encoding(use_legacy_dataset): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, column_encoding={'a': "PLAIN", - 'b': "DELTA_BINARY_PACKED"}, + 'b': "DELTA_BINARY_PACKED", + 'c': "PLAIN"}, + use_legacy_dataset=use_legacy_dataset) + + # Check "DELTA_LENGTH_BYTE_ARRAY" for byte columns. + _check_roundtrip(mixed_table, expected=mixed_table, + use_dictionary=False, + column_encoding={'a': "PLAIN", + 'b': "DELTA_BINARY_PACKED", + 'c': "DELTA_LENGTH_BYTE_ARRAY"}, use_legacy_dataset=use_legacy_dataset) # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'. @@ -421,7 +434,9 @@ def test_column_encoding(use_legacy_dataset): " DOUBLE"): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, - column_encoding={'b': "BYTE_STREAM_SPLIT"}, + column_encoding={'a': "PLAIN", + 'b': "BYTE_STREAM_SPLIT", + 'c': "PLAIN"}, use_legacy_dataset=use_legacy_dataset) # Try to pass use "DELTA_BINARY_PACKED" encoding on float column. @@ -429,7 +444,9 @@ def test_column_encoding(use_legacy_dataset): with pytest.raises(OSError): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, - column_encoding={'a': "DELTA_BINARY_PACKED"}, + column_encoding={'a': "DELTA_BINARY_PACKED", + 'b': "PLAIN", + 'c': "PLAIN"}, use_legacy_dataset=use_legacy_dataset) # Try to pass "RLE_DICTIONARY". @@ -470,7 +487,8 @@ def test_column_encoding(use_legacy_dataset): use_dictionary=False, use_byte_stream_split=['a'], column_encoding={'a': "RLE", - 'b': "BYTE_STREAM_SPLIT"}, + 'b': "BYTE_STREAM_SPLIT", + 'c': "PLAIN"}, use_legacy_dataset=use_legacy_dataset) # Try to pass column_encoding and use_byte_stream_split=True. @@ -480,7 +498,8 @@ def test_column_encoding(use_legacy_dataset): use_dictionary=False, use_byte_stream_split=True, column_encoding={'a': "RLE", - 'b': "BYTE_STREAM_SPLIT"}, + 'b': "BYTE_STREAM_SPLIT", + 'c': "PLAIN"}, use_legacy_dataset=use_legacy_dataset) # Try to pass column_encoding=True. From 7d6b25a4ea78ef15e39fd99de163001c1bd74d8e Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 19 Dec 2022 21:41:01 +0100 Subject: [PATCH 02/22] Review feedback --- cpp/src/parquet/encoding.cc | 39 +++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index adca1b714c1..91a6cef0ea2 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2575,24 +2575,24 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder class DeltaLengthByteArrayEncoder : public EncoderImpl, - virtual public TypedEncoder { + virtual public TypedEncoder { public: - using T = typename DType::c_type; + using T = typename ByteArrayType::c_type; explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool) : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, pool = ::arrow::default_memory_pool()), sink_(pool), length_encoder_(nullptr, pool), - encoded_size_{0} {} + encoded_size_{0}, + lengths_{0} {} std::shared_ptr FlushValues() override; int64_t EstimatedDataEncodedSize() override { return encoded_size_; } - using TypedEncoder::Put; + using TypedEncoder::Put; void Put(const ::arrow::Array& values) override; @@ -2605,38 +2605,36 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, ::arrow::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; uint32_t encoded_size_; + std::vector lengths_; }; -template <> -void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { +void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { auto src = values.data()->GetValues(1); Put(src, static_cast(values.length())); } -template -void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { +void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { if (num_values == 0) { return; } + lengths_.resize(num_values); - std::vector lengths(num_values); for (int idx = 0; idx < num_values; idx++) { auto len = static_cast(src[idx].len); - lengths[idx] = len; + lengths_[idx] = len; encoded_size_ += len; } - length_encoder_.Put(lengths.data(), num_values); + length_encoder_.Put(lengths_.data(), num_values); PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); for (int idx = 0; idx < num_values; idx++) { - sink_.UnsafeAppend(src[idx].ptr, lengths[idx]); + sink_.UnsafeAppend(src[idx].ptr, lengths_[idx]); } } -template -void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset) { +void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); @@ -2649,8 +2647,7 @@ void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, } } -template -std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { +std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); std::shared_ptr data; @@ -2662,6 +2659,7 @@ std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { std::shared_ptr buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + encoded_size_ = 0; return buffer; } @@ -3187,8 +3185,7 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: - return std::unique_ptr( - new DeltaLengthByteArrayEncoder(descr, pool)); + return std::unique_ptr(new DeltaLengthByteArrayEncoder(descr, pool)); default: throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); break; From 04464b3690edf3180cf654ad3b24801da3091a8a Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 20 Dec 2022 17:20:31 +0100 Subject: [PATCH 03/22] Review feedback --- cpp/src/parquet/encoding.cc | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 91a6cef0ea2..2a3a0d53c03 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2586,11 +2586,13 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, sink_(pool), length_encoder_(nullptr, pool), encoded_size_{0}, - lengths_{0} {} + lengths_(0, ::arrow::stl::allocator(pool)) {} std::shared_ptr FlushValues() override; - int64_t EstimatedDataEncodedSize() override { return encoded_size_; } + int64_t EstimatedDataEncodedSize() override { + return encoded_size_ + length_encoder_.EstimatedDataEncodedSize(); + } using TypedEncoder::Put; @@ -2605,12 +2607,24 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, ::arrow::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; uint32_t encoded_size_; - std::vector lengths_; + ArrowPoolVector lengths_; }; void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { - auto src = values.data()->GetValues(1); - Put(src, static_cast(values.length())); + const ::arrow::ArrayData& data = *values.data(); + if (values.type_id() != ::arrow::Type::BINARY) { + throw ParquetException("Expected ByteArrayType, got ", values.type()->ToString()); + } + if (data.length > std::numeric_limits::max()) { + throw ParquetException("Array cannot be longer than ", + std::numeric_limits::max()); + } + if (values.null_count() == 0) { + Put(data.GetValues(1), static_cast(data.length)); + } else { + PutSpaced(data.GetValues(1), static_cast(data.length), + data.GetValues(0, 0), data.offset); + } } void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { From 24a566bd485c13ae66b4aa2ac3fbd2d1e1f94bbf Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 20 Dec 2022 18:30:18 +0100 Subject: [PATCH 04/22] Review feedback --- cpp/src/parquet/encoding.cc | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2a3a0d53c03..5432e33299f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2585,8 +2585,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, pool = ::arrow::default_memory_pool()), sink_(pool), length_encoder_(nullptr, pool), - encoded_size_{0}, - lengths_(0, ::arrow::stl::allocator(pool)) {} + encoded_size_{0} {} std::shared_ptr FlushValues() override; @@ -2607,7 +2606,6 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, ::arrow::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; uint32_t encoded_size_; - ArrowPoolVector lengths_; }; void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { @@ -2631,18 +2629,22 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { if (num_values == 0) { return; } - lengths_.resize(num_values); - for (int idx = 0; idx < num_values; idx++) { - auto len = static_cast(src[idx].len); - lengths_[idx] = len; - encoded_size_ += len; + constexpr int kBatchSize = 256; + std::array lengths; + for (int idx = 0; idx < num_values; idx += kBatchSize) { + const int batch_size = std::min(kBatchSize, num_values - idx); + for (int j = 0; j < batch_size; ++j) { + const int32_t len = src[idx + j].len; + encoded_size_ += len; + lengths[j] = len; + } + length_encoder_.Put(lengths.data(), batch_size); } - length_encoder_.Put(lengths_.data(), num_values); - PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); + PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); for (int idx = 0; idx < num_values; idx++) { - sink_.UnsafeAppend(src[idx].ptr, lengths_[idx]); + sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } } From f5f241500f112179ed0b1df144ca402701f18003 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 20 Jan 2023 20:35:15 +0100 Subject: [PATCH 05/22] work --- cpp/src/parquet/column_writer_test.cc | 16 +++-- cpp/src/parquet/encoding.cc | 96 +++++++++++++++++++-------- cpp/src/parquet/encoding_test.cc | 74 ++++++++++++++++++++- 3 files changed, 152 insertions(+), 34 deletions(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index d2938443fe3..fc6fc3677d2 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -400,9 +400,10 @@ typedef ::testing::Types; -using TestInt64TypeValuesWriter = TestPrimitiveWriter; +using TestValuesWriterInt32Type = TestPrimitiveWriter; +using TestValuesWriterInt64Type = TestPrimitiveWriter; using TestByteArrayValuesWriter = TestPrimitiveWriter; +using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter; TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { this->TestRequiredWithEncoding(Encoding::PLAIN); @@ -422,11 +423,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) { } */ -TEST_F(TestInt32TypeValuesWriter, RequiredDeltaBinaryPacked) { +TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } -TEST_F(TestInt64TypeValuesWriter, RequiredDeltaBinaryPacked) { +TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } @@ -434,6 +435,11 @@ TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); } +// TODO +//TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaLengthByteArray) { +// this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); +//} + /* TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); @@ -655,7 +661,7 @@ TEST(TestWriter, NullValuesBuffer) { // PARQUET-719 // Test case for NULL values -TEST_F(TestInt32TypeValuesWriter, OptionalNullValueChunk) { +TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) { this->SetUpSchema(Repetition::OPTIONAL); this->GenerateData(LARGE_SIZE); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5432e33299f..856dc02a23d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2575,11 +2575,10 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder class DeltaLengthByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder { public: - using T = typename ByteArrayType::c_type; - explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool) : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, pool = ::arrow::default_memory_pool()), @@ -2602,30 +2601,64 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; + void UnsafePutByteArray(const void* data, uint32_t length) { + DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; + sink_.UnsafeAppend(&length, sizeof(uint32_t)); + sink_.UnsafeAppend(data, static_cast(length)); + } + protected: + template + void PutBinaryArray(const ArrayType& array) { + const int64_t total_bytes = + array.value_offset(array.length()) - array.value_offset(0); + PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t))); + + PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline( + *array.data(), + [&](::std::string_view view) { + if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) { + return Status::Invalid("Parquet cannot store strings with size 2GB or more"); + } + UnsafePutByteArray(view.data(), static_cast(view.size())); + return Status::OK(); + }, + []() { return Status::OK(); })); + } + + template + void PutBinaryArray2(const ArrayType& array) { + ARROW_LOG(INFO) << "PutBinaryArray2"; + } + ::arrow::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; uint32_t encoded_size_; }; -void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { - const ::arrow::ArrayData& data = *values.data(); - if (values.type_id() != ::arrow::Type::BINARY) { - throw ParquetException("Expected ByteArrayType, got ", values.type()->ToString()); - } - if (data.length > std::numeric_limits::max()) { - throw ParquetException("Array cannot be longer than ", - std::numeric_limits::max()); - } - if (values.null_count() == 0) { - Put(data.GetValues(1), static_cast(data.length)); - } else { - PutSpaced(data.GetValues(1), static_cast(data.length), - data.GetValues(0, 0), data.offset); +template +void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { + ARROW_LOG(INFO) << "DeltaLengthByteArrayEncoder::Put"; + AssertBaseBinary(values); + switch (values.type_id()) { + case ::arrow::Type::STRING: + PutBinaryArray(checked_cast(values)); + case ::arrow::Type::LARGE_STRING: + PutBinaryArray(checked_cast(values)); + case ::arrow::Type::BINARY: + PutBinaryArray(static_cast(values)); + case ::arrow::Type::LARGE_BINARY: + PutBinaryArray(static_cast(values)); + case ::arrow::Type::FIXED_SIZE_BINARY: + PutBinaryArray2(static_cast(values)); + default: + throw ParquetException("Expected ByteArray-like type, got ", + values.type()->ToString()); } } -void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { +template +void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { if (num_values == 0) { return; } @@ -2648,9 +2681,10 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { } } -void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset) { +template +void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); @@ -2663,7 +2697,8 @@ void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, } } -std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { +template +std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); std::shared_ptr data; @@ -3201,13 +3236,16 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: - return std::unique_ptr(new DeltaLengthByteArrayEncoder(descr, pool)); + return std::unique_ptr( + new DeltaLengthByteArrayEncoder(descr, pool)); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::unique_ptr( + new DeltaLengthByteArrayEncoder(descr, pool)); default: throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); - break; } } else { - ParquetException::NYI("Selected encoding is not supported"); + ParquetException::NYI("Selected encoding is not supported1"); } DCHECK(false) << "Should not be able to reach this code"; return nullptr; @@ -3263,10 +3301,14 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin } throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { - if (type_num == Type::BYTE_ARRAY) { - return std::make_unique(descr); + switch (type_num) { + case Type::BYTE_ARRAY: + return std::make_unique(descr); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique(descr); + default: + throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } - throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::RLE) { if (type_num == Type::BOOLEAN) { return std::make_unique(descr); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b4d0d97d3cd..78886dc7759 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -741,6 +740,63 @@ class EncodingAdHocTyped : public ::testing::Test { ::arrow::AssertArraysEqual(*values, *result); } + void DeltaBitPack(int seed) { + if (!std::is_same::value && + !std::is_same::value) { + return; + } + auto values = GetValues(seed); + auto encoder = MakeTypedEncoder( + Encoding::DELTA_BINARY_PACKED, /*use_dictionary=*/false, column_descr()); + auto decoder = + MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, column_descr()); + + ASSERT_NO_THROW(encoder->Put(*values)); + auto buf = encoder->FlushValues(); + + int num_values = static_cast(values->length() - values->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.Finish(&result)); + ASSERT_EQ(50, result->length()); + ::arrow::AssertArraysEqual(*values, *result); + } + + void DeltaLengthByte(int seed) { + if (!std::is_same::value) { + return; + } + auto values = GetValues(seed); + auto encoder = MakeTypedEncoder( + Encoding::DELTA_LENGTH_BYTE_ARRAY, /*use_dictionary=*/false, column_descr()); + auto decoder = + MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, column_descr()); + + ASSERT_NO_THROW(encoder->Put(*values)); + auto buf = encoder->FlushValues(); + + int num_values = static_cast(values->length() - values->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.Finish(&result)); + ASSERT_EQ(50, result->length()); + ::arrow::AssertArraysEqual(*values, *result); + } + void Dict(int seed) { if (std::is_same::value) { return; @@ -826,7 +882,7 @@ class EncodingAdHocTyped : public ::testing::Test { protected: const int64_t size_ = 50; - const double null_probability_ = 0.25; + double null_probability_ = 0.25; }; template @@ -879,6 +935,20 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { } } +TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) { + this->null_probability_ = 0; + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + this->DeltaBitPack(seed); + } +} + +// TODO +TYPED_TEST(EncodingAdHocTyped, DeltaLengthByteArrowDirectPut) { + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + this->DeltaLengthByte(seed); + } +} + TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { // Implemented as part of ARROW-3246 const int64_t size = 50; From 7de26ec937615397f17da297d5eeeabd605ee439 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 24 Jan 2023 19:14:38 +0100 Subject: [PATCH 06/22] work --- cpp/src/parquet/column_reader.cc | 6 +++--- cpp/src/parquet/encoding.cc | 6 +----- cpp/src/parquet/encoding.h | 4 +++- cpp/src/parquet/encoding_test.cc | 20 ++++++++++---------- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 3670af49fbf..9ab96656d89 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1967,14 +1967,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool); + accumulator_.offsets_builder = std::make_unique<::arrow::BinaryBuilder>(pool); } ::arrow::ArrayVector GetBuilderChunks() override { ::arrow::ArrayVector result = accumulator_.chunks; - if (result.size() == 0 || accumulator_.builder->length() > 0) { + if (result.size() == 0 || accumulator_.offsets_builder->length() > 0) { std::shared_ptr<::arrow::Array> last_chunk; - PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + PARQUET_THROW_NOT_OK(accumulator_.offsets_builder->Finish(&last_chunk)); result.push_back(std::move(last_chunk)); } accumulator_.chunks = {}; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 856dc02a23d..ffbcd4617f0 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1233,7 +1233,7 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { struct ArrowBinaryHelper { explicit ArrowBinaryHelper(typename EncodingTraits::Accumulator* out) { this->out = out; - this->builder = out->builder.get(); + this->builder = out->offsets_builder.get(); this->chunk_space_remaining = ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); } @@ -3220,7 +3220,6 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin return std::make_unique>(descr, pool); default: throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); - break; } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { @@ -3231,7 +3230,6 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin default: throw ParquetException( "DELTA_BINARY_PACKED encoder only supports INT32 and INT64"); - break; } } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { @@ -3282,7 +3280,6 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin return std::make_unique>(descr); default: throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); - break; } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { @@ -3293,7 +3290,6 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin default: throw ParquetException( "DELTA_BINARY_PACKED decoder only supports INT32 and INT64"); - break; } } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { if (type_num == Type::BYTE_ARRAY) { diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 374a02cf491..5c04142ae41 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/buffer_builder.h" #include "arrow/util/spaced.h" #include "parquet/exception.h" @@ -144,7 +145,8 @@ struct EncodingTraits { /// \brief Internal helper class for decoding BYTE_ARRAY data where we can /// overflow the capacity of a single arrow::BinaryArray struct Accumulator { - std::unique_ptr<::arrow::BinaryBuilder> builder; + std::unique_ptr<::arrow::BinaryBuilder> offsets_builder; + std::unique_ptr<::arrow::BufferBuilder> data_builder; std::vector> chunks; }; using ArrowType = ::arrow::BinaryType; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 78886dc7759..126c3fe7dab 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -509,12 +509,12 @@ class TestArrowBuilderDecoding : public ::testing::Test { InitTestCase(np); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::BinaryBuilder); + acc.offsets_builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.builder->Finish(&chunk)); + ASSERT_OK(acc.offsets_builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -536,10 +536,10 @@ class TestArrowBuilderDecoding : public ::testing::Test { continue; } typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::BinaryBuilder); + acc.offsets_builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.builder->Finish(&chunk)); + ASSERT_OK(acc.offsets_builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -626,14 +626,14 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::StringBuilder); + acc.offsets_builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_OK(acc.offsets_builder->Finish(&result)); ASSERT_EQ(50, result->length()); ::arrow::AssertArraysEqual(*values, *result); }; @@ -971,14 +971,14 @@ TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::StringBuilder); + acc.offsets_builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_OK(acc.offsets_builder->Finish(&result)); ::arrow::AssertArraysEqual(*values, *result); } @@ -1017,14 +1017,14 @@ TEST(DictEncodingAdHoc, PutDictionaryPutIndices) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::BinaryBuilder); + acc.offsets_builder.reset(new ::arrow::BinaryBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(expected->length()), static_cast(expected->null_count()), expected->null_bitmap_data(), expected->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_OK(acc.offsets_builder->Finish(&result)); ::arrow::AssertArraysEqual(*expected, *result); }; From e31b69bacfce2a4c6f29581b47712cc9d418a00c Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 25 Jan 2023 22:41:53 +0100 Subject: [PATCH 07/22] work --- cpp/src/parquet/column_reader.cc | 6 +- cpp/src/parquet/column_writer_test.cc | 2 +- cpp/src/parquet/encoding.cc | 115 ++++++++++++++++---------- cpp/src/parquet/encoding.h | 4 +- cpp/src/parquet/encoding_benchmark.cc | 4 +- cpp/src/parquet/encoding_test.cc | 92 +++++++++++---------- 6 files changed, 126 insertions(+), 97 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 9ab96656d89..2691be588bd 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1967,14 +1967,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - accumulator_.offsets_builder = std::make_unique<::arrow::BinaryBuilder>(pool); + accumulator_.data_builder = std::make_unique<::arrow::BinaryBuilder>(pool); } ::arrow::ArrayVector GetBuilderChunks() override { ::arrow::ArrayVector result = accumulator_.chunks; - if (result.size() == 0 || accumulator_.offsets_builder->length() > 0) { + if (result.size() == 0 || accumulator_.data_builder->length() > 0) { std::shared_ptr<::arrow::Array> last_chunk; - PARQUET_THROW_NOT_OK(accumulator_.offsets_builder->Finish(&last_chunk)); + PARQUET_THROW_NOT_OK(accumulator_.data_builder->Finish(&last_chunk)); result.push_back(std::move(last_chunk)); } accumulator_.chunks = {}; diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index fc6fc3677d2..6ea88a19179 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -436,7 +436,7 @@ TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { } // TODO -//TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaLengthByteArray) { +// TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaLengthByteArray) { // this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); //} diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index ffbcd4617f0..9af152816f6 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1233,14 +1233,14 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { struct ArrowBinaryHelper { explicit ArrowBinaryHelper(typename EncodingTraits::Accumulator* out) { this->out = out; - this->builder = out->offsets_builder.get(); + this->data_builder = out->data_builder.get(); this->chunk_space_remaining = - ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); + ::arrow::kBinaryMemoryLimit - this->data_builder->value_data_length(); } Status PushChunk() { std::shared_ptr<::arrow::Array> result; - RETURN_NOT_OK(builder->Finish(&result)); + RETURN_NOT_OK(data_builder->Finish(&result)); out->chunks.push_back(result); chunk_space_remaining = ::arrow::kBinaryMemoryLimit; return Status::OK(); @@ -1250,20 +1250,20 @@ struct ArrowBinaryHelper { void UnsafeAppend(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; - builder->UnsafeAppend(data, length); + data_builder->UnsafeAppend(data, length); } - void UnsafeAppendNull() { builder->UnsafeAppendNull(); } + void UnsafeAppendNull() { data_builder->UnsafeAppendNull(); } Status Append(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; - return builder->Append(data, length); + return data_builder->Append(data, length); } - Status AppendNull() { return builder->AppendNull(); } + Status AppendNull() { return data_builder->AppendNull(); } typename EncodingTraits::Accumulator* out; - ::arrow::BinaryBuilder* builder; + ::arrow::BinaryBuilder* data_builder; int64_t chunk_space_remaining; }; @@ -1368,8 +1368,8 @@ class PlainByteArrayDecoder : public PlainDecoder, ArrowBinaryHelper helper(out); int values_decoded = 0; - RETURN_NOT_OK(helper.builder->Reserve(num_values)); - RETURN_NOT_OK(helper.builder->ReserveData( + RETURN_NOT_OK(helper.data_builder->Reserve(num_values)); + RETURN_NOT_OK(helper.data_builder->ReserveData( std::min(len_, helper.chunk_space_remaining))); int i = 0; @@ -1390,8 +1390,8 @@ class PlainByteArrayDecoder : public PlainDecoder, if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { // This element would exceed the capacity of a chunk RETURN_NOT_OK(helper.PushChunk()); - RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); - RETURN_NOT_OK(helper.builder->ReserveData( + RETURN_NOT_OK(helper.data_builder->Reserve(num_values - i)); + RETURN_NOT_OK(helper.data_builder->ReserveData( std::min(len_, helper.chunk_space_remaining))); } helper.UnsafeAppend(data_ + 4, value_len); @@ -2601,36 +2601,32 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; - void UnsafePutByteArray(const void* data, uint32_t length) { - DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; - sink_.UnsafeAppend(&length, sizeof(uint32_t)); - sink_.UnsafeAppend(data, static_cast(length)); + void Put(const ByteArray& val) { + // Write the result to the output stream + const int64_t increment = static_cast(val.len + sizeof(uint32_t)); + if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) { + PARQUET_THROW_NOT_OK(sink_.Reserve(increment)); + } + DCHECK(val.len == 0 || val.ptr != nullptr) << "Value ptr cannot be NULL"; + sink_.UnsafeAppend(val.ptr, val.len); } protected: template void PutBinaryArray(const ArrayType& array) { - const int64_t total_bytes = - array.value_offset(array.length()) - array.value_offset(0); - PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t))); - PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline( *array.data(), [&](::std::string_view view) { if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) { return Status::Invalid("Parquet cannot store strings with size 2GB or more"); } - UnsafePutByteArray(view.data(), static_cast(view.size())); + length_encoder_.Put({static_cast(view.length())}, 1); + Put(view); return Status::OK(); }, []() { return Status::OK(); })); } - template - void PutBinaryArray2(const ArrayType& array) { - ARROW_LOG(INFO) << "PutBinaryArray2"; - } - ::arrow::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; uint32_t encoded_size_; @@ -2638,22 +2634,12 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, template void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { - ARROW_LOG(INFO) << "DeltaLengthByteArrayEncoder::Put"; AssertBaseBinary(values); - switch (values.type_id()) { - case ::arrow::Type::STRING: - PutBinaryArray(checked_cast(values)); - case ::arrow::Type::LARGE_STRING: - PutBinaryArray(checked_cast(values)); - case ::arrow::Type::BINARY: - PutBinaryArray(static_cast(values)); - case ::arrow::Type::LARGE_BINARY: - PutBinaryArray(static_cast(values)); - case ::arrow::Type::FIXED_SIZE_BINARY: - PutBinaryArray2(static_cast(values)); - default: - throw ParquetException("Expected ByteArray-like type, got ", - values.type()->ToString()); + if (::arrow::is_binary_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); + } else { + DCHECK(::arrow::is_large_binary_like(values.type_id())); + PutBinaryArray(checked_cast(values)); } } @@ -2669,7 +2655,9 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { const int batch_size = std::min(kBatchSize, num_values - idx); for (int j = 0; j < batch_size; ++j) { const int32_t len = src[idx + j].len; - encoded_size_ += len; + if (AddWithOverflow(encoded_size_, len, &encoded_size_)) { + throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY"); + } lengths[j] = len; } length_encoder_.Put(lengths.data(), batch_size); @@ -2781,13 +2769,17 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out) override { - ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder"); + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + valid_bits_offset, out, &result)); + return result; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::DictAccumulator* out) override { - ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder"); + ParquetException::NYI( + "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder"); } private: @@ -2809,6 +2801,41 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, num_valid_values_ = num_length; } + Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int* out_num_values) { + ArrowBinaryHelper helper(out); + + std::vector values(num_values); + const int num_valid_values = Decode(values.data(), num_values - null_count); + DCHECK_EQ(num_values - null_count, num_valid_values); + + auto values_ptr = reinterpret_cast(values.data()); + int value_idx = 0; + + RETURN_NOT_OK(VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + const auto& val = values_ptr[value_idx]; + if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + RETURN_NOT_OK(helper.PushChunk()); + } + RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + ++value_idx; + return Status::OK(); + }, + [&]() { + RETURN_NOT_OK(helper.AppendNull()); + --null_count; + return Status::OK(); + })); + + DCHECK_EQ(null_count, 0); + *out_num_values = num_valid_values; + return Status::OK(); + } + std::shared_ptr<::arrow::bit_util::BitReader> decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_; diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 5c04142ae41..1505c0c3dae 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -145,8 +145,8 @@ struct EncodingTraits { /// \brief Internal helper class for decoding BYTE_ARRAY data where we can /// overflow the capacity of a single arrow::BinaryArray struct Accumulator { - std::unique_ptr<::arrow::BinaryBuilder> offsets_builder; - std::unique_ptr<::arrow::BufferBuilder> data_builder; + std::unique_ptr<::arrow::Int32Builder> offsets_builder; + std::unique_ptr<::arrow::BinaryBuilder> data_builder; std::vector> chunks; }; using ArrowType = ::arrow::BinaryType; diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index e6a3c2c58ca..fc37f326618 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -698,7 +698,7 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { for (auto _ : state) { auto decoder = InitializeDecoder(); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new BinaryBuilder); + acc.data_builder.reset(new BinaryBuilder); decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, &acc); } state.SetBytesProcessed(state.iterations() * total_size_); @@ -708,7 +708,7 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { for (auto _ : state) { auto decoder = InitializeDecoder(); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new BinaryBuilder); + acc.data_builder.reset(new BinaryBuilder); decoder->DecodeArrowNonNull(num_values_, &acc); } state.SetBytesProcessed(state.iterations() * total_size_); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 126c3fe7dab..66888d9485b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -509,12 +509,12 @@ class TestArrowBuilderDecoding : public ::testing::Test { InitTestCase(np); typename EncodingTraits::Accumulator acc; - acc.offsets_builder.reset(new ::arrow::BinaryBuilder); + acc.data_builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.offsets_builder->Finish(&chunk)); + ASSERT_OK(acc.data_builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -536,10 +536,10 @@ class TestArrowBuilderDecoding : public ::testing::Test { continue; } typename EncodingTraits::Accumulator acc; - acc.offsets_builder.reset(new ::arrow::BinaryBuilder); + acc.data_builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.offsets_builder->Finish(&chunk)); + ASSERT_OK(acc.data_builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -626,14 +626,14 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.offsets_builder.reset(new ::arrow::StringBuilder); + acc.data_builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.offsets_builder->Finish(&result)); + ASSERT_OK(acc.data_builder->Finish(&result)); ASSERT_EQ(50, result->length()); ::arrow::AssertArraysEqual(*values, *result); }; @@ -769,34 +769,6 @@ class EncodingAdHocTyped : public ::testing::Test { ::arrow::AssertArraysEqual(*values, *result); } - void DeltaLengthByte(int seed) { - if (!std::is_same::value) { - return; - } - auto values = GetValues(seed); - auto encoder = MakeTypedEncoder( - Encoding::DELTA_LENGTH_BYTE_ARRAY, /*use_dictionary=*/false, column_descr()); - auto decoder = - MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, column_descr()); - - ASSERT_NO_THROW(encoder->Put(*values)); - auto buf = encoder->FlushValues(); - - int num_values = static_cast(values->length() - values->null_count()); - decoder->SetData(num_values, buf->data(), static_cast(buf->size())); - - BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); - ASSERT_EQ(num_values, - decoder->DecodeArrow(static_cast(values->length()), - static_cast(values->null_count()), - values->null_bitmap_data(), values->offset(), &acc)); - - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.Finish(&result)); - ASSERT_EQ(50, result->length()); - ::arrow::AssertArraysEqual(*values, *result); - } - void Dict(int seed) { if (std::is_same::value) { return; @@ -942,13 +914,6 @@ TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) { } } -// TODO -TYPED_TEST(EncodingAdHocTyped, DeltaLengthByteArrowDirectPut) { - for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - this->DeltaLengthByte(seed); - } -} - TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { // Implemented as part of ARROW-3246 const int64_t size = 50; @@ -971,14 +936,14 @@ TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.offsets_builder.reset(new ::arrow::StringBuilder); + acc.data_builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.offsets_builder->Finish(&result)); + ASSERT_OK(acc.data_builder->Finish(&result)); ::arrow::AssertArraysEqual(*values, *result); } @@ -1017,14 +982,14 @@ TEST(DictEncodingAdHoc, PutDictionaryPutIndices) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.offsets_builder.reset(new ::arrow::BinaryBuilder); + acc.data_builder.reset(new ::arrow::BinaryBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(expected->length()), static_cast(expected->null_count()), expected->null_bitmap_data(), expected->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.offsets_builder->Finish(&result)); + ASSERT_OK(acc.data_builder->Finish(&result)); ::arrow::AssertArraysEqual(*expected, *result); }; @@ -1625,5 +1590,42 @@ TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) { /*null_probability*/ 0.1)); } +TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { + const int64_t size = 50; + const int32_t min_length = 0; + const int32_t max_length = 10; + const double null_probability = 0.25; + + auto CheckSeed = [&](int seed) { + ::arrow::random::RandomArrayGenerator rag(seed); + auto values = rag.String(size, min_length, max_length, null_probability); + + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + + ASSERT_NO_THROW(encoder->Put(*values)); + auto buf = encoder->FlushValues(); + + int num_values = static_cast(values->length() - values->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + typename EncodingTraits::Accumulator acc; + acc.data_builder.reset(new ::arrow::StringBuilder); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.data_builder->Finish(&result)); + ASSERT_EQ(50, result->length()); + ::arrow::AssertArraysEqual(*values, *result); + }; + + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + CheckSeed(seed); + } +} + } // namespace test } // namespace parquet From c94ad8879effc1b7e1e12d30209de798640289aa Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 26 Jan 2023 13:41:16 +0100 Subject: [PATCH 08/22] work --- cpp/src/parquet/column_writer_test.cc | 11 +++++------ cpp/src/parquet/encoding.cc | 12 ++++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 6ea88a19179..0d90ee6034c 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -435,13 +435,12 @@ TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); } -// TODO -// TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaLengthByteArray) { -// this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); -//} - /* -TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) { +TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) { + this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); +} + +TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); } */ diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 9af152816f6..2d65ffe6299 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3270,7 +3270,7 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } } else { - ParquetException::NYI("Selected encoding is not supported1"); + ParquetException::NYI("Selected encoding is not supported"); } DCHECK(false) << "Should not be able to reach this code"; return nullptr; @@ -3324,14 +3324,10 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin } throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { - switch (type_num) { - case Type::BYTE_ARRAY: - return std::make_unique(descr); - case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_unique(descr); - default: - throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); + if (type_num == Type::BYTE_ARRAY || type_num == Type::FIXED_LEN_BYTE_ARRAY) { + return std::make_unique(descr); } + throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::RLE) { if (type_num == Type::BOOLEAN) { return std::make_unique(descr); From 5adb9230009ea536af72a3fe74ab8f8bbf2058b5 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sun, 29 Jan 2023 03:01:24 +0100 Subject: [PATCH 09/22] Review feedback --- cpp/src/parquet/column_reader.cc | 6 ++-- cpp/src/parquet/encoding.cc | 51 +++++++++++---------------- cpp/src/parquet/encoding.h | 3 +- cpp/src/parquet/encoding_benchmark.cc | 4 +-- cpp/src/parquet/encoding_test.cc | 24 ++++++------- 5 files changed, 39 insertions(+), 49 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 2691be588bd..3670af49fbf 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1967,14 +1967,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - accumulator_.data_builder = std::make_unique<::arrow::BinaryBuilder>(pool); + accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool); } ::arrow::ArrayVector GetBuilderChunks() override { ::arrow::ArrayVector result = accumulator_.chunks; - if (result.size() == 0 || accumulator_.data_builder->length() > 0) { + if (result.size() == 0 || accumulator_.builder->length() > 0) { std::shared_ptr<::arrow::Array> last_chunk; - PARQUET_THROW_NOT_OK(accumulator_.data_builder->Finish(&last_chunk)); + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); result.push_back(std::move(last_chunk)); } accumulator_.chunks = {}; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2d65ffe6299..b8344bbed99 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1233,14 +1233,14 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { struct ArrowBinaryHelper { explicit ArrowBinaryHelper(typename EncodingTraits::Accumulator* out) { this->out = out; - this->data_builder = out->data_builder.get(); + this->builder = out->builder.get(); this->chunk_space_remaining = - ::arrow::kBinaryMemoryLimit - this->data_builder->value_data_length(); + ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); } Status PushChunk() { std::shared_ptr<::arrow::Array> result; - RETURN_NOT_OK(data_builder->Finish(&result)); + RETURN_NOT_OK(builder->Finish(&result)); out->chunks.push_back(result); chunk_space_remaining = ::arrow::kBinaryMemoryLimit; return Status::OK(); @@ -1250,20 +1250,20 @@ struct ArrowBinaryHelper { void UnsafeAppend(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; - data_builder->UnsafeAppend(data, length); + builder->UnsafeAppend(data, length); } - void UnsafeAppendNull() { data_builder->UnsafeAppendNull(); } + void UnsafeAppendNull() { builder->UnsafeAppendNull(); } Status Append(const uint8_t* data, int32_t length) { chunk_space_remaining -= length; - return data_builder->Append(data, length); + return builder->Append(data, length); } - Status AppendNull() { return data_builder->AppendNull(); } + Status AppendNull() { return builder->AppendNull(); } typename EncodingTraits::Accumulator* out; - ::arrow::BinaryBuilder* data_builder; + ::arrow::BinaryBuilder* builder; int64_t chunk_space_remaining; }; @@ -1368,8 +1368,8 @@ class PlainByteArrayDecoder : public PlainDecoder, ArrowBinaryHelper helper(out); int values_decoded = 0; - RETURN_NOT_OK(helper.data_builder->Reserve(num_values)); - RETURN_NOT_OK(helper.data_builder->ReserveData( + RETURN_NOT_OK(helper.builder->Reserve(num_values)); + RETURN_NOT_OK(helper.builder->ReserveData( std::min(len_, helper.chunk_space_remaining))); int i = 0; @@ -1390,8 +1390,8 @@ class PlainByteArrayDecoder : public PlainDecoder, if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { // This element would exceed the capacity of a chunk RETURN_NOT_OK(helper.PushChunk()); - RETURN_NOT_OK(helper.data_builder->Reserve(num_values - i)); - RETURN_NOT_OK(helper.data_builder->ReserveData( + RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); + RETURN_NOT_OK(helper.builder->ReserveData( std::min(len_, helper.chunk_space_remaining))); } helper.UnsafeAppend(data_ + 4, value_len); @@ -2601,16 +2601,6 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; - void Put(const ByteArray& val) { - // Write the result to the output stream - const int64_t increment = static_cast(val.len + sizeof(uint32_t)); - if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) { - PARQUET_THROW_NOT_OK(sink_.Reserve(increment)); - } - DCHECK(val.len == 0 || val.ptr != nullptr) << "Value ptr cannot be NULL"; - sink_.UnsafeAppend(val.ptr, val.len); - } - protected: template void PutBinaryArray(const ArrayType& array) { @@ -2621,7 +2611,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, return Status::Invalid("Parquet cannot store strings with size 2GB or more"); } length_encoder_.Put({static_cast(view.length())}, 1); - Put(view); + PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length())); return Status::OK(); }, []() { return Status::OK(); })); @@ -2664,6 +2654,8 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { } PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); + // TODO: replace UnsafeAppend with memcpy? + // memcpy(sink_.mutable_data() + sink_.length(), src, encoded_size_); for (int idx = 0; idx < num_values; idx++) { sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } @@ -2694,7 +2686,7 @@ std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { sink_.Reset(); PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); - PARQUET_THROW_NOT_OK(sink_.Append(data->mutable_data(), data->size())); + PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size())); std::shared_ptr buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); @@ -2807,7 +2799,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int* out_num_values) { ArrowBinaryHelper helper(out); - std::vector values(num_values); + std::vector values(num_values - null_count); const int num_valid_values = Decode(values.data(), num_values - null_count); DCHECK_EQ(num_values - null_count, num_valid_values); @@ -3261,13 +3253,12 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: - return std::unique_ptr( - new DeltaLengthByteArrayEncoder(descr, pool)); + return std::make_unique>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: - return std::unique_ptr( - new DeltaLengthByteArrayEncoder(descr, pool)); + return std::make_unique>(descr, pool); default: - throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); + throw ParquetException( + "DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } } else { ParquetException::NYI("Selected encoding is not supported"); diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 1505c0c3dae..8537d5b65b8 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -145,8 +145,7 @@ struct EncodingTraits { /// \brief Internal helper class for decoding BYTE_ARRAY data where we can /// overflow the capacity of a single arrow::BinaryArray struct Accumulator { - std::unique_ptr<::arrow::Int32Builder> offsets_builder; - std::unique_ptr<::arrow::BinaryBuilder> data_builder; + std::unique_ptr<::arrow::BinaryBuilder> builder; std::vector> chunks; }; using ArrowType = ::arrow::BinaryType; diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index fc37f326618..e6a3c2c58ca 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -698,7 +698,7 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { for (auto _ : state) { auto decoder = InitializeDecoder(); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new BinaryBuilder); + acc.builder.reset(new BinaryBuilder); decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, &acc); } state.SetBytesProcessed(state.iterations() * total_size_); @@ -708,7 +708,7 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { for (auto _ : state) { auto decoder = InitializeDecoder(); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new BinaryBuilder); + acc.builder.reset(new BinaryBuilder); decoder->DecodeArrowNonNull(num_values_, &acc); } state.SetBytesProcessed(state.iterations() * total_size_); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 66888d9485b..0cbbcc6956a 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -509,12 +509,12 @@ class TestArrowBuilderDecoding : public ::testing::Test { InitTestCase(np); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::BinaryBuilder); + acc.builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.data_builder->Finish(&chunk)); + ASSERT_OK(acc.builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -536,10 +536,10 @@ class TestArrowBuilderDecoding : public ::testing::Test { continue; } typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::BinaryBuilder); + acc.builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, &acc); std::shared_ptr<::arrow::Array> chunk; - ASSERT_OK(acc.data_builder->Finish(&chunk)); + ASSERT_OK(acc.builder->Finish(&chunk)); CheckDense(actual_num_values, *chunk); } } @@ -626,14 +626,14 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::StringBuilder); + acc.builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.data_builder->Finish(&result)); + ASSERT_OK(acc.builder->Finish(&result)); ASSERT_EQ(50, result->length()); ::arrow::AssertArraysEqual(*values, *result); }; @@ -936,14 +936,14 @@ TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::StringBuilder); + acc.builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.data_builder->Finish(&result)); + ASSERT_OK(acc.builder->Finish(&result)); ::arrow::AssertArraysEqual(*values, *result); } @@ -982,14 +982,14 @@ TEST(DictEncodingAdHoc, PutDictionaryPutIndices) { GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::BinaryBuilder); + acc.builder.reset(new ::arrow::BinaryBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(expected->length()), static_cast(expected->null_count()), expected->null_bitmap_data(), expected->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.data_builder->Finish(&result)); + ASSERT_OK(acc.builder->Finish(&result)); ::arrow::AssertArraysEqual(*expected, *result); }; @@ -1610,14 +1610,14 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.data_builder.reset(new ::arrow::StringBuilder); + acc.builder.reset(new ::arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.data_builder->Finish(&result)); + ASSERT_OK(acc.builder->Finish(&result)); ASSERT_EQ(50, result->length()); ::arrow::AssertArraysEqual(*values, *result); }; From 2469f71c90af7f7ea5763d896504ce86af728396 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 01:39:14 +0100 Subject: [PATCH 10/22] Review feedback --- cpp/src/parquet/encoding.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 8537d5b65b8..374a02cf491 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -22,7 +22,6 @@ #include #include -#include "arrow/buffer_builder.h" #include "arrow/util/spaced.h" #include "parquet/exception.h" From 112699a2aeff157c3f60d116bcd70310ccff5ab9 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 12:04:42 +0100 Subject: [PATCH 11/22] Update python/pyarrow/tests/parquet/test_basic.py Co-authored-by: Antoine Pitrou --- python/pyarrow/tests/parquet/test_basic.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 0fe0e47ac8b..5c0e1a077d0 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -392,8 +392,7 @@ def test_byte_stream_split(use_legacy_dataset): def test_column_encoding(use_legacy_dataset): arr_float = pa.array(list(map(float, range(100)))) arr_int = pa.array(list(map(int, range(100)))) - arr_bin = pa.array(list(map( - lambda x: bytes(str(x).zfill(8), "utf-8"), range(100))), pa.binary()) + arr_bin = pa.array([str(x) for x in range(100)]) mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin], names=['a', 'b', 'c']) From 1508a84b33c0acce2b181b7e05e3031cb38a8bdb Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 13:57:25 +0100 Subject: [PATCH 12/22] Review feedback --- cpp/src/parquet/encoding.cc | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index b8344bbed99..6bedf26dda0 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2628,7 +2628,6 @@ void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); } else { - DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryArray(checked_cast(values)); } } @@ -2654,8 +2653,6 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { } PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_)); - // TODO: replace UnsafeAppend with memcpy? - // memcpy(sink_.mutable_data() + sink_.length(), src, encoded_size_); for (int idx = 0; idx < num_values; idx++) { sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } @@ -2695,7 +2692,7 @@ std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { } // ---------------------------------------------------------------------- -// DeltaByteArrayDecoder +// DeltaLengthByteArrayDecoder class DeltaLengthByteArrayDecoder : public DecoderImpl, virtual public TypedDecoder { @@ -2803,7 +2800,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, const int num_valid_values = Decode(values.data(), num_values - null_count); DCHECK_EQ(num_values - null_count, num_valid_values); - auto values_ptr = reinterpret_cast(values.data()); + auto values_ptr = values.data(); int value_idx = 0; RETURN_NOT_OK(VisitNullBitmapInline( @@ -3315,7 +3312,7 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin } throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY"); } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { - if (type_num == Type::BYTE_ARRAY || type_num == Type::FIXED_LEN_BYTE_ARRAY) { + if (type_num == Type::BYTE_ARRAY) { return std::make_unique(descr); } throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); From c2fb6f36e7f0c179053050c18756667651842a28 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 14:14:56 +0100 Subject: [PATCH 13/22] Review feedback --- cpp/src/parquet/encoding.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 6bedf26dda0..417e59f1664 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2798,7 +2798,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, std::vector values(num_values - null_count); const int num_valid_values = Decode(values.data(), num_values - null_count); - DCHECK_EQ(num_values - null_count, num_valid_values); + if (ARROW_PREDICT_TRUE(num_values - null_count != num_valid_values)) { + throw ParquetException("Expected to decode ", num_values - null_count, + " values, but decoded ", num_valid_values, " values."); + } auto values_ptr = values.data(); int value_idx = 0; From 3eab4394ec8c7c09000a7c509a85d833a2d970a4 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 15:54:09 +0100 Subject: [PATCH 14/22] Review feedback --- cpp/src/parquet/encoding.cc | 5 +---- cpp/src/parquet/encoding_test.cc | 5 ++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 417e59f1664..d435e149528 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3254,11 +3254,8 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin switch (type_num) { case Type::BYTE_ARRAY: return std::make_unique>(descr, pool); - case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_unique>(descr, pool); default: - throw ParquetException( - "DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); + throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } } else { ParquetException::NYI("Selected encoding is not supported"); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 0cbbcc6956a..781e9a0accf 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1595,14 +1595,13 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { const int32_t min_length = 0; const int32_t max_length = 10; const double null_probability = 0.25; + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); auto CheckSeed = [&](int seed) { ::arrow::random::RandomArrayGenerator rag(seed); auto values = rag.String(size, min_length, max_length, null_probability); - auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); - auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); - ASSERT_NO_THROW(encoder->Put(*values)); auto buf = encoder->FlushValues(); From aeafc86aaffb8a0a2d97432b3057e8bb5713d425 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 15:54:09 +0100 Subject: [PATCH 15/22] Review feedback --- cpp/src/parquet/encoding_test.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 781e9a0accf..8c7dba67fdc 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1584,6 +1584,7 @@ typedef ::testing::Types TestDeltaLengthByteArrayEncodingTypes; TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding, TestDeltaLengthByteArrayEncodingTypes); TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) { + ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, @@ -1598,7 +1599,7 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); - auto CheckSeed = [&](int seed) { + auto CheckSeed = [&](int seed, int64_t size) { ::arrow::random::RandomArrayGenerator rag(seed); auto values = rag.String(size, min_length, max_length, null_probability); @@ -1617,12 +1618,13 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { std::shared_ptr<::arrow::Array> result; ASSERT_OK(acc.builder->Finish(&result)); - ASSERT_EQ(50, result->length()); + ASSERT_EQ(size, result->length()); ::arrow::AssertArraysEqual(*values, *result); }; + CheckSeed(/*seed=*/42, /*size=*/0); for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - CheckSeed(seed); + CheckSeed(seed, size); } } From bb95765473797fbd57d89da3629a019e39fd03a1 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jan 2023 19:52:41 +0100 Subject: [PATCH 16/22] Review feedback --- cpp/src/parquet/encoding.cc | 50 +++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index d435e149528..60aff65b5fd 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2143,6 +2143,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder FlushValues() override; + std::shared_ptr FlushValuesInternal(size_t& offset_bytes); int64_t EstimatedDataEncodedSize() override { return sink_.length(); } @@ -2291,6 +2292,38 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { return SliceBuffer(buffer, offset_bytes); } +template +std::shared_ptr DeltaBitPackEncoder::FlushValuesInternal( + size_t& offset_bytes) { + if (values_current_block_ > 0) { + FlushBlock(); + } + PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); + + uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; + bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + if (!header_writer.PutVlqInt(values_per_block_) || + !header_writer.PutVlqInt(mini_blocks_per_block_) || + !header_writer.PutVlqInt(total_value_count_) || + !header_writer.PutZigZagVlqInt(static_cast(first_value_))) { + throw ParquetException("header writing error"); + } + header_writer.Flush(); + + // We reserved enough space at the beginning of the buffer for largest possible header + // and data was written immediately after. We now write the header data immediately + // before the end of reserved space. + offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); + std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, + header_writer.bytes_written()); + + // Reset counter of cached values + total_value_count_ = 0; + // Reserve enough space at the beginning of the buffer for largest possible header. + PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); + return reinterpret_cast&>(buffer); +} + template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { const ::arrow::ArrayData& data = *values.data(); @@ -2676,19 +2709,22 @@ void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, template std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { - std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); + size_t offset_bytes = 0; + std::shared_ptr encoded_lengths = + length_encoder_.FlushValuesInternal(offset_bytes); std::shared_ptr data; PARQUET_THROW_NOT_OK(sink_.Finish(&data)); sink_.Reset(); - PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); - PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size())); - - std::shared_ptr buffer; - PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + const int64_t encoded_lengths_size = encoded_lengths->size(); + PARQUET_THROW_NOT_OK(encoded_lengths->Resize(encoded_lengths->size() + data->size())); + memcpy(encoded_lengths->mutable_data() + encoded_lengths_size, data->data(), + data->size()); encoded_size_ = 0; - return buffer; + + // Excess bytes at the beginning are sliced off and ignored. + return SliceBuffer(encoded_lengths, offset_bytes); } // ---------------------------------------------------------------------- From 3650276806f04b4c6c63b26c5d8c731ba1dfa177 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 1 Feb 2023 17:04:00 +0100 Subject: [PATCH 17/22] Update cpp/src/parquet/encoding.cc Co-authored-by: Gang Wu --- cpp/src/parquet/encoding.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 60aff65b5fd..28d479b6c09 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2143,7 +2143,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder FlushValues() override; - std::shared_ptr FlushValuesInternal(size_t& offset_bytes); + std::shared_ptr FlushValuesInternal(size_t* offset_bytes); int64_t EstimatedDataEncodedSize() override { return sink_.length(); } @@ -2294,7 +2294,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { template std::shared_ptr DeltaBitPackEncoder::FlushValuesInternal( - size_t& offset_bytes) { + size_t* offset_bytes) { if (values_current_block_ > 0) { FlushBlock(); } @@ -2313,8 +2313,8 @@ std::shared_ptr DeltaBitPackEncoder::FlushValuesInternal // We reserved enough space at the beginning of the buffer for largest possible header // and data was written immediately after. We now write the header data immediately // before the end of reserved space. - offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); - std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, + *offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); + std::memcpy(buffer->mutable_data() + *offset_bytes, header_buffer_, header_writer.bytes_written()); // Reset counter of cached values @@ -2711,7 +2711,7 @@ template std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { size_t offset_bytes = 0; std::shared_ptr encoded_lengths = - length_encoder_.FlushValuesInternal(offset_bytes); + length_encoder_.FlushValuesInternal(&offset_bytes); std::shared_ptr data; PARQUET_THROW_NOT_OK(sink_.Finish(&data)); From 621385861ac19b05fc3890a57e31d25bc0726df3 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 1 Feb 2023 18:31:49 +0100 Subject: [PATCH 18/22] Reverting FlushValuesInternal --- cpp/src/parquet/encoding.cc | 50 ++++++------------------------------- 1 file changed, 7 insertions(+), 43 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 28d479b6c09..d435e149528 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2143,7 +2143,6 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder FlushValues() override; - std::shared_ptr FlushValuesInternal(size_t* offset_bytes); int64_t EstimatedDataEncodedSize() override { return sink_.length(); } @@ -2292,38 +2291,6 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { return SliceBuffer(buffer, offset_bytes); } -template -std::shared_ptr DeltaBitPackEncoder::FlushValuesInternal( - size_t* offset_bytes) { - if (values_current_block_ > 0) { - FlushBlock(); - } - PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); - - uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; - bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); - if (!header_writer.PutVlqInt(values_per_block_) || - !header_writer.PutVlqInt(mini_blocks_per_block_) || - !header_writer.PutVlqInt(total_value_count_) || - !header_writer.PutZigZagVlqInt(static_cast(first_value_))) { - throw ParquetException("header writing error"); - } - header_writer.Flush(); - - // We reserved enough space at the beginning of the buffer for largest possible header - // and data was written immediately after. We now write the header data immediately - // before the end of reserved space. - *offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); - std::memcpy(buffer->mutable_data() + *offset_bytes, header_buffer_, - header_writer.bytes_written()); - - // Reset counter of cached values - total_value_count_ = 0; - // Reserve enough space at the beginning of the buffer for largest possible header. - PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); - return reinterpret_cast&>(buffer); -} - template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { const ::arrow::ArrayData& data = *values.data(); @@ -2709,22 +2676,19 @@ void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, template std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { - size_t offset_bytes = 0; - std::shared_ptr encoded_lengths = - length_encoder_.FlushValuesInternal(&offset_bytes); + std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); std::shared_ptr data; PARQUET_THROW_NOT_OK(sink_.Finish(&data)); sink_.Reset(); - const int64_t encoded_lengths_size = encoded_lengths->size(); - PARQUET_THROW_NOT_OK(encoded_lengths->Resize(encoded_lengths->size() + data->size())); - memcpy(encoded_lengths->mutable_data() + encoded_lengths_size, data->data(), - data->size()); - encoded_size_ = 0; + PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); + PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size())); - // Excess bytes at the beginning are sliced off and ignored. - return SliceBuffer(encoded_lengths, offset_bytes); + std::shared_ptr buffer; + PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + encoded_size_ = 0; + return buffer; } // ---------------------------------------------------------------------- From b8e1dc00a013e8f1c53ba071ac901c96fa57c045 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 2 Feb 2023 19:17:11 +0100 Subject: [PATCH 19/22] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/parquet/encoding.cc | 1 + cpp/src/parquet/encoding_test.cc | 1 + 2 files changed, 2 insertions(+) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index d435e149528..7077e4e7632 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2682,6 +2682,7 @@ std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { PARQUET_THROW_NOT_OK(sink_.Finish(&data)); sink_.Reset(); + PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size())); PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size())); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 8c7dba67fdc..e4102a9052f 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1619,6 +1619,7 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { std::shared_ptr<::arrow::Array> result; ASSERT_OK(acc.builder->Finish(&result)); ASSERT_EQ(size, result->length()); + ASSERT_OK(result->ValidateFull()); ::arrow::AssertArraysEqual(*values, *result); }; From eebc0b0d9a7f77ab897662e56cd29b1d12501062 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 2 Feb 2023 19:32:11 +0100 Subject: [PATCH 20/22] Review feedback Co-authored-by: Will Jones --- cpp/src/parquet/encoding_test.cc | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index e4102a9052f..94421713c91 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -301,7 +301,8 @@ class TestPlainEncoding : public TestEncodingBase { static constexpr int TYPE = Type::type_num; virtual void CheckRoundtrip() { - auto encoder = MakeTypedEncoder(Encoding::PLAIN, false, descr_.get()); + auto encoder = + MakeTypedEncoder(Encoding::PLAIN, /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); @@ -314,7 +315,8 @@ class TestPlainEncoding : public TestEncodingBase { } void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { - auto encoder = MakeTypedEncoder(Encoding::PLAIN, false, descr_.get()); + auto encoder = + MakeTypedEncoder(Encoding::PLAIN, /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); int null_count = 0; for (auto i = 0; i < num_values_; i++) { @@ -1103,8 +1105,8 @@ class TestByteStreamSplitEncoding : public TestEncodingBase { static constexpr int TYPE = Type::type_num; void CheckRoundtrip() override { - auto encoder = - MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT, false, descr_.get()); + auto encoder = MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT, + /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT, descr_.get()); encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); @@ -1380,8 +1382,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { } void CheckRoundtrip() override { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, + /*use_dictionary=*/false, descr_.get()); // Encode a number of times to exercise the flush logic for (size_t i = 0; i < kNumRoundTrips; ++i) { encoder->Put(draws_, num_values_); @@ -1392,8 +1394,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) override { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, + /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); int null_count = 0; for (auto i = 0; i < num_values_; i++) { @@ -1486,8 +1488,8 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31); ASSERT_EQ(this->num_values_, num_values); - auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, - this->descr_.get()); + auto encoder = MakeTypedEncoder( + Encoding::DELTA_BINARY_PACKED, /*use_dictionary=*/false, this->descr_.get()); encoder->Put(this->draws_, this->num_values_); auto encoded = encoder->FlushValues(); const auto encoded_size = encoded->size(); @@ -1538,8 +1540,8 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { static constexpr int TYPE = Type::type_num; virtual void CheckRoundtrip() { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get()); + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, + /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); @@ -1554,8 +1556,8 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { } void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get()); + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, + /*use_dictionary=*/false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get()); int null_count = 0; From 44282c4cd2bb82f3429b758cf3020f03c2df141a Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 10 Feb 2023 04:43:59 +0100 Subject: [PATCH 21/22] New tests for string and binary types --- cpp/src/parquet/encoding.cc | 2 +- cpp/src/parquet/encoding_test.cc | 114 ++++++++++++++++++++++++++++--- 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 7077e4e7632..75c579c08ce 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3254,7 +3254,7 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: - return std::make_unique>(descr, pool); + return std::make_unique>(descr, pool); default: throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 94421713c91..00fffbb1e8b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -26,6 +26,7 @@ #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" +#include "arrow/compute/cast.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/testing/util.h" @@ -35,7 +36,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/endian.h" #include "arrow/util/string.h" - #include "parquet/encoding.h" #include "parquet/platform.h" #include "parquet/schema.h" @@ -910,6 +910,7 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { } TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) { + // TODO: test with nulls once DeltaBitPackDecoder::DecodeArrow supports them this->null_probability_ = 0; for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->DeltaBitPack(seed); @@ -1593,18 +1594,32 @@ TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) { /*null_probability*/ 0.1)); } +std::shared_ptr<::arrow::Array> CastBinaryTypesHelper( + std::shared_ptr<::arrow::Array> result, std::shared_ptr<::arrow::DataType> type) { + if (::arrow::is_large_binary_like(type->id())) { + ::arrow::compute::CastOptions options; + if (::arrow::is_string(type->id())) { + options.to_type = ::arrow::large_utf8(); + } else { + options.to_type = ::arrow::large_binary(); + } + EXPECT_OK_AND_ASSIGN( + auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); + result = tmp.make_array(); + } + return result; +} + TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { const int64_t size = 50; const int32_t min_length = 0; const int32_t max_length = 10; + const int32_t num_unique = 10; const double null_probability = 0.25; auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); - auto CheckSeed = [&](int seed, int64_t size) { - ::arrow::random::RandomArrayGenerator rag(seed); - auto values = rag.String(size, min_length, max_length, null_probability); - + auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) { ASSERT_NO_THROW(encoder->Put(*values)); auto buf = encoder->FlushValues(); @@ -1612,7 +1627,11 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::StringBuilder); + if (::arrow::is_string(values->type()->id())) { + acc.builder = std::make_unique<::arrow::StringBuilder>(); + } else { + acc.builder = std::make_unique<::arrow::BinaryBuilder>(); + } ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), @@ -1620,16 +1639,93 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { std::shared_ptr<::arrow::Array> result; ASSERT_OK(acc.builder->Finish(&result)); - ASSERT_EQ(size, result->length()); + ASSERT_EQ(values->length(), result->length()); ASSERT_OK(result->ValidateFull()); + + auto upcast_result = CastBinaryTypesHelper(result, values->type()); ::arrow::AssertArraysEqual(*values, *result); }; - CheckSeed(/*seed=*/42, /*size=*/0); + ::arrow::random::RandomArrayGenerator rag(42); + auto values = rag.String(0, min_length, max_length, null_probability); + CheckSeed(values); for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - CheckSeed(seed, size); + rag = ::arrow::random::RandomArrayGenerator(seed); + + values = rag.String(size, min_length, max_length, null_probability); + CheckSeed(values); + + values = + rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability); + CheckSeed(values); } } +TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) { + auto CheckEncode = [](std::shared_ptr<::arrow::Array> values, + std::shared_ptr<::arrow::Array> lengths) { + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + ASSERT_NO_THROW(encoder->Put(*values)); + auto buf = encoder->FlushValues(); + + auto lengths_encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED); + ASSERT_NO_THROW(lengths_encoder->Put(*lengths)); + auto lengths_buf = lengths_encoder->FlushValues(); + + auto encoded_lengths_buf = SliceBuffer(buf, 0, lengths_buf->size()); + auto encoded_values_buf = SliceBuffer(buf, lengths_buf->size()); + + ASSERT_TRUE(encoded_lengths_buf->Equals(*lengths_buf)); + ASSERT_TRUE(encoded_values_buf->Equals(*values->data()->buffers[2])); + }; + + auto CheckDecode = [](std::shared_ptr buf, + std::shared_ptr<::arrow::Array> values) { + int num_values = static_cast(values->length()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + typename EncodingTraits::Accumulator acc; + if (::arrow::is_string(values->type()->id())) { + acc.builder = std::make_unique<::arrow::StringBuilder>(); + } else { + acc.builder = std::make_unique<::arrow::BinaryBuilder>(); + } + + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_EQ(num_values, result->length()); + ASSERT_OK(result->ValidateFull()); + + auto upcast_result = CastBinaryTypesHelper(result, values->type()); + ::arrow::AssertArraysEqual(*values, *upcast_result); + }; + + auto values = R"(["Hello", "World", "Foobar", "ADBCEF"])"; + auto lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([5, 5, 6, 6])"); + + CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), lengths); + + const uint8_t data[] = { + 0x80, 0x01, 0x04, 0x04, 0x0a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x00, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x57, 0x6f, 0x72, 0x6c, 0x64, + 0x46, 0x6f, 0x6f, 0x62, 0x61, 0x72, 0x41, 0x44, 0x42, 0x43, 0x45, 0x46, + }; + auto encoded = Buffer::Wrap(data, sizeof(data)); + + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values)); +} + } // namespace test } // namespace parquet From b3cf11e96fa42c3d7dba611c9316e6ad99d597fe Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 16 Feb 2023 13:32:35 +0100 Subject: [PATCH 22/22] Update cpp/src/parquet/encoding.cc Co-authored-by: Gang Wu --- cpp/src/parquet/encoding.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 75c579c08ce..f227a8d1938 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2799,7 +2799,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, std::vector values(num_values - null_count); const int num_valid_values = Decode(values.data(), num_values - null_count); - if (ARROW_PREDICT_TRUE(num_values - null_count != num_valid_values)) { + if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) { throw ParquetException("Expected to decode ", num_values - null_count, " values, but decoded ", num_valid_values, " values."); }