From a982bc1f4ee93450b87f09d388945b4f59774eb1 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 7 Jan 2023 19:40:57 +0800 Subject: [PATCH 01/15] Init commit --- cpp/src/parquet/encoding.cc | 21 +++++++++++++------ cpp/src/parquet/encoding_test.cc | 35 ++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index b9472d72aeb..327ffada1c5 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2453,16 +2453,25 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); // read the bitwidth of each miniblock + uint32_t miniblock_values_sum = 0; uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { - if (!decoder_->GetAligned(1, bit_width_data + i)) { + for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { + if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { ParquetException::EofException(); } - if (bit_width_data[i] > kMaxDeltaBitWidth) { - throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) + - " larger than integer bit width " + - std::to_string(kMaxDeltaBitWidth)); + + if (bit_width_data[current_mini_block_idx] > kMaxDeltaBitWidth) { + if (miniblock_values_sum < total_value_count_) { + throw ParquetException( + "delta bit width " + + std::to_string( + bit_width_data[current_mini_block_idx * values_current_mini_block_]) + + " larger than integer bit width " + std::to_string(kMaxDeltaBitWidth)); + } else { + break; + } } + miniblock_values_sum += values_current_mini_block_; } mini_block_idx_ = 0; delta_bit_width_ = bit_width_data[0]; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b8363d29cdb..4b84cf3db49 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1427,5 +1427,40 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } +class DeltaBitPackEncoding : public TestArrowBuilderDecoding { + public: + void SetupEncoderDecoder() override {} +}; + +TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { + std::shared_ptr descr_ = ExampleDescr(); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); + using c_type = parquet::Int32Type::c_type; + + unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; + int encode_buffer_size = 273; + int num_values = 65; + std::vector output_bytes = std::vector(num_values * sizeof(c_type)); + auto decode_buf = reinterpret_cast(output_bytes.data()); + decoder->SetData(num_values, &good_data[0], encode_buffer_size); + int values_decoded = decoder->Decode(decode_buf, num_values); + ASSERT_EQ(num_values, values_decoded); + + unsigned char bad_data[] = + "\200\001\004A\237\224\316\362\r\242\220\203- " + "\245\245\304;`\210'\313\r\270D\316\306h㖀~\372\255\360A\254}\211L\343\373_" + "\034®\312Y\036\233<\203\035P\202)\307Y\356\327\024\302!\232\036," + "\271\b\331\353\037e\333\332\315Crm\203\350בOo\001\347\305Z\203G\037\263Y\254\366_" + "\"\v\276\242Y\002\374\300\226\231\252C\240\363ۙ\r\334E\314\f\002\255\227\273\307" + "\305'\"\033\235\374\250\243\244F\266\254\350\203\304U\036X\331&\210/" + "\037\322\321s.\031e/" + "\232\340\363\366\306\030\243,5\337\031\005bw\021\017wj\003#Q`\371ʉ\323\300+~=" + "\232W\232\374p\336$\022\211VQ\237>\v1gە'\224\207\262f\247h\363A!" + "\255\271f\026\274\033_\333)4"; + output_bytes = std::vector(num_values * sizeof(c_type)); + decode_buf = reinterpret_cast(output_bytes.data()); + decoder->SetData(num_values, &bad_data[0], encode_buffer_size); +} + } // namespace test } // namespace parquet From b17392e0610a7c40a85cf47bf21fd30e2e82512a Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 7 Jan 2023 22:10:30 +0800 Subject: [PATCH 02/15] Update and fix test --- cpp/src/parquet/encoding.cc | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 327ffada1c5..af3d05d6f47 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2445,7 +2445,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); // read the bitwidth of each miniblock - uint32_t miniblock_values_sum = 0; uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); + uint32_t miniblock_values_sum = value_sum_up_to_current_block_; for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { ParquetException::EofException(); } if (bit_width_data[current_mini_block_idx] > kMaxDeltaBitWidth) { - if (miniblock_values_sum < total_value_count_) { + if (miniblock_values_sum <= total_value_count_) { throw ParquetException( "delta bit width " + std::to_string( - bit_width_data[current_mini_block_idx * values_current_mini_block_]) + + bit_width_data[current_mini_block_idx]) + " larger than integer bit width " + std::to_string(kMaxDeltaBitWidth)); } else { + // according to the parquet standard, we should ignore the bit_width_data here. break; } } - miniblock_values_sum += values_current_mini_block_; + miniblock_values_sum += values_per_mini_block_; } + value_sum_up_to_current_block_ = std::min(miniblock_values_sum, total_value_count_); mini_block_idx_ = 0; delta_bit_width_ = bit_width_data[0]; values_current_mini_block_ = values_per_mini_block_; - block_initialized_ = true; + first_block_initialized_ = true; } int GetInternal(T* buffer, int max_values) { @@ -2488,9 +2491,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; From bf088a74f40f6a1e1dafb35704eb79f6bb4e2bb9 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 7 Jan 2023 22:12:51 +0800 Subject: [PATCH 03/15] add PREDICT_FALSE because it is merely touched --- cpp/src/parquet/encoding.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index af3d05d6f47..ca2b478a0af 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2456,18 +2456,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecodermutable_data(); uint32_t miniblock_values_sum = value_sum_up_to_current_block_; - for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { + for (uint32_t current_mini_block_idx = 0; + current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { ParquetException::EofException(); } - if (bit_width_data[current_mini_block_idx] > kMaxDeltaBitWidth) { + if (ARROW_PREDICT_FALSE(bit_width_data[current_mini_block_idx] > + kMaxDeltaBitWidth)) { if (miniblock_values_sum <= total_value_count_) { - throw ParquetException( - "delta bit width " + - std::to_string( - bit_width_data[current_mini_block_idx]) + - " larger than integer bit width " + std::to_string(kMaxDeltaBitWidth)); + throw ParquetException("delta bit width " + + std::to_string(bit_width_data[current_mini_block_idx]) + + " larger than integer bit width " + + std::to_string(kMaxDeltaBitWidth)); } else { // according to the parquet standard, we should ignore the bit_width_data here. break; From b21057477b744697048c526a501516578c797d44 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 7 Jan 2023 22:17:00 +0800 Subject: [PATCH 04/15] add some comments --- cpp/src/arrow/util/bit_stream_utils.h | 2 +- cpp/src/parquet/encoding.cc | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index dc9b41793cf..0b8b975d8a7 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -318,7 +318,7 @@ inline bool BitReader::GetValue(int num_bits, T* v) { template inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { DCHECK(buffer_ != NULL); - DCHECK_LE(num_bits, static_cast(sizeof(T) * 8)); + DCHECK_LE(num_bits, static_cast(sizeof(T) * 8)) << "num_bits: " << num_bits; int bit_offset = bit_offset_; int byte_offset = byte_offset_; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index ca2b478a0af..aadb87d8eb8 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2446,7 +2446,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecodermutable_data(); - uint32_t miniblock_values_sum = value_sum_up_to_current_block_; + uint32_t miniblock_values_sum = values_num_up_to_current_block_; for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { @@ -2476,7 +2476,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; From 361e84dded1a4d8b8c603d52baa26643119ce370 Mon Sep 17 00:00:00 2001 From: mwish Date: Sun, 8 Jan 2023 00:02:31 +0800 Subject: [PATCH 05/15] [Update] Comment `good_data` and fix a equal condition ( using < instead of <= for corner case ) --- cpp/src/arrow/util/bit_stream_utils.h | 5 ++--- cpp/src/parquet/encoding.cc | 7 +++---- cpp/src/parquet/encoding_test.cc | 18 +++++++++++++----- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 0b8b975d8a7..774d0df54b4 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -19,10 +19,9 @@ #pragma once -#include - #include #include +#include #include "arrow/util/bit_util.h" #include "arrow/util/bpacking.h" @@ -153,7 +152,7 @@ class BitReader { /// 'num_bytes'. The value is assumed to be byte-aligned so the stream will /// be advanced to the start of the next byte before 'v' is read. Returns /// false if there are not enough bytes left. - /// Assume the v was stored in buffer_ as a litte-endian format + /// Assume the v was stored in buffer_ as a little-endian format template bool GetAligned(int num_bytes, T* v); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index aadb87d8eb8..6c4bec89010 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2464,15 +2464,14 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder kMaxDeltaBitWidth)) { - if (miniblock_values_sum <= total_value_count_) { + if (miniblock_values_sum < total_value_count_) { throw ParquetException("delta bit width " + std::to_string(bit_width_data[current_mini_block_idx]) + " larger than integer bit width " + std::to_string(kMaxDeltaBitWidth)); - } else { - // according to the parquet standard, we should ignore the bit_width_data here. - break; } + // according to the parquet standard, we should ignore the bit_width_data here. + // cannot break because still need to read remaining bit-width from decoder. } miniblock_values_sum += values_per_mini_block_; } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 4b84cf3db49..eae86aeaf46 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1398,6 +1398,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE( this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); + ASSERT_NO_FATAL_FAILURE(this->Execute(65, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/ 0.1)); @@ -1437,14 +1438,19 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); using c_type = parquet::Int32Type::c_type; - unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; int encode_buffer_size = 273; int num_values = 65; - std::vector output_bytes = std::vector(num_values * sizeof(c_type)); - auto decode_buf = reinterpret_cast(output_bytes.data()); + c_type* decode_buf = nullptr; + std::vector output_bytes; + int values_decoded = 0; + /* + unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; + output_bytes = std::vector(num_values * sizeof(c_type)); + decode_buf = reinterpret_cast(output_bytes.data()); decoder->SetData(num_values, &good_data[0], encode_buffer_size); - int values_decoded = decoder->Decode(decode_buf, num_values); + values_decoded = decoder->Decode(decode_buf, num_values); ASSERT_EQ(num_values, values_decoded); + */ unsigned char bad_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- " @@ -1459,7 +1465,9 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { "\255\271f\026\274\033_\333)4"; output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &bad_data[0], encode_buffer_size); + decoder->SetData(num_values, &bad_data[0], encode_buffer_size); + values_decoded = decoder->Decode(decode_buf, num_values); + ASSERT_EQ(num_values, values_decoded); } } // namespace test From fcc4c8542f35e56cb5cdfa78053f919b79b389b3 Mon Sep 17 00:00:00 2001 From: mwish Date: Sun, 8 Jan 2023 00:18:28 +0800 Subject: [PATCH 06/15] [Update] update testing --- cpp/src/parquet/encoding_test.cc | 56 ++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index eae86aeaf46..52e6fe20967 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1443,31 +1443,37 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { c_type* decode_buf = nullptr; std::vector output_bytes; int values_decoded = 0; - /* - unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; - output_bytes = std::vector(num_values * sizeof(c_type)); - decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &good_data[0], encode_buffer_size); - values_decoded = decoder->Decode(decode_buf, num_values); - ASSERT_EQ(num_values, values_decoded); - */ - - unsigned char bad_data[] = - "\200\001\004A\237\224\316\362\r\242\220\203- " - "\245\245\304;`\210'\313\r\270D\316\306h㖀~\372\255\360A\254}\211L\343\373_" - "\034®\312Y\036\233<\203\035P\202)\307Y\356\327\024\302!\232\036," - "\271\b\331\353\037e\333\332\315Crm\203\350בOo\001\347\305Z\203G\037\263Y\254\366_" - "\"\v\276\242Y\002\374\300\226\231\252C\240\363ۙ\r\334E\314\f\002\255\227\273\307" - "\305'\"\033\235\374\250\243\244F\266\254\350\203\304U\036X\331&\210/" - "\037\322\321s.\031e/" - "\232\340\363\366\306\030\243,5\337\031\005bw\021\017wj\003#Q`\371ʉ\323\300+~=" - "\232W\232\374p\336$\022\211VQ\237>\v1gە'\224\207\262f\247h\363A!" - "\255\271f\026\274\033_\333)4"; - output_bytes = std::vector(num_values * sizeof(c_type)); - decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &bad_data[0], encode_buffer_size); - values_decoded = decoder->Decode(decode_buf, num_values); - ASSERT_EQ(num_values, values_decoded); + + { + unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; + unsigned char data_buf[273] = {}; + memcpy(&data_buf[0], &good_data[0], 15); + output_bytes = std::vector(num_values * sizeof(c_type)); + decode_buf = reinterpret_cast(output_bytes.data()); + decoder->SetData(num_values, &data_buf[0], encode_buffer_size); + values_decoded = decoder->Decode(decode_buf, num_values); + ASSERT_EQ(num_values, values_decoded); + } + + { + unsigned char bad_data[] = + "\200\001\004A\237\224\316\362\r\242\220\203- " + "\245\245\304;`\210'\313\r\270D\316\306h㖀~\372\255\360A\254}\211L\343\373_" + "\034®\312Y\036\233<\203\035P\202)\307Y\356\327\024\302!\232\036," + "\271\b\331\353\037e\333\332\315Crm\203\350בOo\001\347\305Z\203G\037\263Y\254\366" + "_" + "\"\v\276\242Y\002\374\300\226\231\252C\240\363ۙ\r\334E\314\f\002\255\227\273\307" + "\305'\"\033\235\374\250\243\244F\266\254\350\203\304U\036X\331&\210/" + "\037\322\321s.\031e/" + "\232\340\363\366\306\030\243,5\337\031\005bw\021\017wj\003#Q`\371ʉ\323\300+~=" + "\232W\232\374p\336$\022\211VQ\237>\v1gە'\224\207\262f\247h\363A!" + "\255\271f\026\274\033_\333)4"; + output_bytes = std::vector(num_values * sizeof(c_type)); + decode_buf = reinterpret_cast(output_bytes.data()); + decoder->SetData(num_values, &bad_data[0], encode_buffer_size); + values_decoded = decoder->Decode(decode_buf, num_values); + ASSERT_EQ(num_values, values_decoded); + } } } // namespace test From e6d4ed631902fab7a225ac0eb720eb50d70c584a Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 9 Jan 2023 04:08:58 +0800 Subject: [PATCH 07/15] [Update] Fix a internal bug - `total_value_count_` would be changed on every read - Add renaming to `total_value_count_` and `values_count_current_mini_block` --- cpp/src/parquet/encoding.cc | 61 ++++++++++++++++++++------------ cpp/src/parquet/encoding_test.cc | 7 +++- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 6c4bec89010..f63b78bc742 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2378,7 +2378,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_); + return static_cast(total_value_remaining_); } int Decode(T* buffer, int max_values) override { @@ -2420,7 +2420,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetVlqInt(&mini_blocks_per_block_) || !decoder_->GetVlqInt(&total_value_count_) || !decoder_->GetZigZagVlqInt(&last_value_)) { - ParquetException::EofException(); + ParquetException::EofException("InitHeader EOF"); } if (values_per_block_ == 0) { @@ -2444,22 +2444,24 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); + if (!decoder_->GetZigZagVlqInt(&min_delta_)) + ParquetException::EofException("InitBlock EOF"); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - uint32_t miniblock_values_sum = values_num_up_to_current_block_; + uint32_t miniblock_values_sum = values_num_up_to_current_mini_block_; for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { - ParquetException::EofException(); + ParquetException::EofException("Decode bit-width EOF"); } if (ARROW_PREDICT_FALSE(bit_width_data[current_mini_block_idx] > @@ -2475,27 +2477,31 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder total_value_count_) { + values_remaining_current_mini_block_ = + total_value_count_ - values_num_up_to_current_mini_block_; + } else { + values_remaining_current_mini_block_ = values_per_mini_block_; + } first_block_initialized_ = true; } int GetInternal(T* buffer, int max_values) { - max_values = static_cast(std::min(max_values, total_value_count_)); + max_values = static_cast(std::min(max_values, total_value_remaining_)); if (max_values == 0) { return 0; } int i = 0; while (i < max_values) { - if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { + if (ARROW_PREDICT_FALSE(values_remaining_current_mini_block_ == 0)) { if (ARROW_PREDICT_FALSE(!first_block_initialized_)) { buffer[i++] = last_value_; DCHECK_EQ(i, 1); // we're at the beginning of the page - DCHECK_EQ(values_num_up_to_current_block_, 0); - values_num_up_to_current_block_ = 1; + DCHECK_EQ(values_num_up_to_current_mini_block_, 0); + values_num_up_to_current_mini_block_ = 1; if (ARROW_PREDICT_FALSE(i == max_values)) { // When block is uninitialized and i reaches max_values we have two // different possibilities: @@ -2512,9 +2518,16 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderdata()[mini_block_idx_]; - values_current_mini_block_ = values_per_mini_block_; + if (values_num_up_to_current_mini_block_ + values_per_mini_block_ > + total_value_count_) { + values_remaining_current_mini_block_ = + total_value_count_ - values_num_up_to_current_mini_block_; + } else { + values_remaining_current_mini_block_ = values_per_mini_block_; + } } else { InitBlock(); } @@ -2522,7 +2535,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(max_values - i)); + std::min(values_remaining_current_mini_block_, static_cast(max_values - i)); if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) != values_decode) { ParquetException::EofException(); @@ -2534,19 +2547,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(last_value_); last_value_ = buffer[i + j]; } - values_current_mini_block_ -= values_decode; + values_remaining_current_mini_block_ -= values_decode; i += values_decode; } - total_value_count_ -= max_values; + total_value_remaining_ -= max_values; this->num_values_ -= max_values; - if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) { - uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_; + if (ARROW_PREDICT_FALSE(total_value_remaining_ == 0)) { + uint32_t padding_bits = values_remaining_current_mini_block_ * delta_bit_width_; // skip the padding bits if (!decoder_->Advance(padding_bits)) { ParquetException::EofException(); } - values_current_mini_block_ = 0; + values_remaining_current_mini_block_ = 0; } return max_values; } @@ -2556,17 +2569,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 52e6fe20967..fde0151edf0 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1444,10 +1444,15 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { std::vector output_bytes; int values_decoded = 0; + // Both good_data and bad_data is a DELTA_BINARY_PACKED INT32 Page with 65 values. + // For needed miniblocks, there bit-widths are all 32. + // There bit-widths for unneeded miniblocks are different: + // * good_data's unneeded bit-width is 0. + // * bad_data's unneeded bit-width is 165. { unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; unsigned char data_buf[273] = {}; - memcpy(&data_buf[0], &good_data[0], 15); + std::memcpy(&data_buf[0], &good_data[0], 15); output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); decoder->SetData(num_values, &data_buf[0], encode_buffer_size); From 72517588a885c4e44434ac337d99781c33de6459 Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 9 Jan 2023 04:27:26 +0800 Subject: [PATCH 08/15] Simplify code: making remaing-value in miniblock could be larger --- cpp/src/parquet/encoding.cc | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f63b78bc742..6afba796e37 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2479,12 +2479,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder total_value_count_) { - values_remaining_current_mini_block_ = - total_value_count_ - values_num_up_to_current_mini_block_; - } else { - values_remaining_current_mini_block_ = values_per_mini_block_; - } + values_remaining_current_mini_block_ = values_per_mini_block_; first_block_initialized_ = true; } @@ -2521,21 +2516,15 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderdata()[mini_block_idx_]; - if (values_num_up_to_current_mini_block_ + values_per_mini_block_ > - total_value_count_) { - values_remaining_current_mini_block_ = - total_value_count_ - values_num_up_to_current_mini_block_; - } else { - values_remaining_current_mini_block_ = values_per_mini_block_; - } + values_remaining_current_mini_block_ = values_per_mini_block_; } else { InitBlock(); } } } - int values_decode = - std::min(values_remaining_current_mini_block_, static_cast(max_values - i)); + int values_decode = std::min(values_remaining_current_mini_block_, + static_cast(max_values - i)); if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) != values_decode) { ParquetException::EofException(); @@ -2572,6 +2561,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder Date: Mon, 9 Jan 2023 10:08:26 +0800 Subject: [PATCH 09/15] Test: retrigger CI --- cpp/src/parquet/encoding_test.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index fde0151edf0..defd7125856 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1449,9 +1449,9 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { // There bit-widths for unneeded miniblocks are different: // * good_data's unneeded bit-width is 0. // * bad_data's unneeded bit-width is 165. + unsigned char data_buf[273] = {}; { unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; - unsigned char data_buf[273] = {}; std::memcpy(&data_buf[0], &good_data[0], 15); output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); @@ -1473,9 +1473,10 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { "\232\340\363\366\306\030\243,5\337\031\005bw\021\017wj\003#Q`\371ʉ\323\300+~=" "\232W\232\374p\336$\022\211VQ\237>\v1gە'\224\207\262f\247h\363A!" "\255\271f\026\274\033_\333)4"; + std::memcpy(&data_buf[0], &bad_data[0], 222); output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &bad_data[0], encode_buffer_size); + decoder->SetData(num_values, &data_buf[0], encode_buffer_size); values_decoded = decoder->Decode(decode_buf, num_values); ASSERT_EQ(num_values, values_decoded); } From 624f15ee7c098c82b2991f539e7c52ce995ee8ff Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 9 Jan 2023 21:43:21 +0800 Subject: [PATCH 10/15] Re-generate testing data, and represent using Hex. --- cpp/src/parquet/encoding_test.cc | 62 ++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index defd7125856..27b22507afb 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -35,6 +35,7 @@ #include "arrow/util/bitmap_writer.h" #include "arrow/util/checked_cast.h" #include "arrow/util/endian.h" +#include "arrow/util/string.h" #include "parquet/encoding.h" #include "parquet/platform.h" @@ -1428,17 +1429,12 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } -class DeltaBitPackEncoding : public TestArrowBuilderDecoding { - public: - void SetupEncoderDecoder() override {} -}; - -TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { - std::shared_ptr descr_ = ExampleDescr(); - auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); +TEST(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { + std::shared_ptr descr = ExampleDescr(); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr.get()); using c_type = parquet::Int32Type::c_type; - int encode_buffer_size = 273; + int encode_buffer_size = 274; int num_values = 65; c_type* decode_buf = nullptr; std::vector output_bytes; @@ -1449,34 +1445,46 @@ TEST_F(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { // There bit-widths for unneeded miniblocks are different: // * good_data's unneeded bit-width is 0. // * bad_data's unneeded bit-width is 165. - unsigned char data_buf[273] = {}; + // We use Hex to represent good_data and bad_data. { - unsigned char good_data[] = "\200\001\004A\237\224\316\362\r\242\220\203- "; - std::memcpy(&data_buf[0], &good_data[0], 15); + char good_data_hex[] = + "80010441BDA08DB708A488C0F1012020000089F79E53946D93CA242D48B21478591FDE6E4855CC03" + "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" + "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" + "F46D7224B3C7C271AC3220F2C6663289427E1CE3B00F8C34D5DDA845664AF36F56ACF6FED2339911" + "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" + "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" + "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; + unsigned char good_data[274]; + for (int i = 0; i < 274; ++i) { + auto s = ::arrow::ParseHexValue(&good_data_hex[i * 2], &good_data[i]); + ASSERT_TRUE(s.ok()); + } + output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &data_buf[0], encode_buffer_size); + decoder->SetData(num_values, &good_data[0], encode_buffer_size); values_decoded = decoder->Decode(decode_buf, num_values); ASSERT_EQ(num_values, values_decoded); } { - unsigned char bad_data[] = - "\200\001\004A\237\224\316\362\r\242\220\203- " - "\245\245\304;`\210'\313\r\270D\316\306h㖀~\372\255\360A\254}\211L\343\373_" - "\034®\312Y\036\233<\203\035P\202)\307Y\356\327\024\302!\232\036," - "\271\b\331\353\037e\333\332\315Crm\203\350בOo\001\347\305Z\203G\037\263Y\254\366" - "_" - "\"\v\276\242Y\002\374\300\226\231\252C\240\363ۙ\r\334E\314\f\002\255\227\273\307" - "\305'\"\033\235\374\250\243\244F\266\254\350\203\304U\036X\331&\210/" - "\037\322\321s.\031e/" - "\232\340\363\366\306\030\243,5\337\031\005bw\021\017wj\003#Q`\371ʉ\323\300+~=" - "\232W\232\374p\336$\022\211VQ\237>\v1gە'\224\207\262f\247h\363A!" - "\255\271f\026\274\033_\333)4"; - std::memcpy(&data_buf[0], &bad_data[0], 222); + char bad_data_hex[] = + "80010441BDA08DB708A488C0F1012020A5A589F79E53946D93CA242D48B21478591FDE6E4855CC03" + "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" + "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" + "F46D7224B3C7C271AC3220F2C6663289427E1CE3B00F8C34D5DDA845664AF36F56ACF6FED2339911" + "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" + "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" + "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; + unsigned char bad_data[274]; + for (int i = 0; i < 274; ++i) { + auto s = ::arrow::ParseHexValue(&bad_data_hex[i * 2], &bad_data[i]); + ASSERT_TRUE(s.ok()); + } output_bytes = std::vector(num_values * sizeof(c_type)); decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &data_buf[0], encode_buffer_size); + decoder->SetData(num_values, &bad_data[0], encode_buffer_size); values_decoded = decoder->Decode(decode_buf, num_values); ASSERT_EQ(num_values, values_decoded); } From c0bfefc13134cad721b58496d8fda04a53a45804 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 11 Jan 2023 02:02:25 +0800 Subject: [PATCH 11/15] Update testing --- cpp/src/parquet/encoding_test.cc | 59 ++++++++++++++------------------ 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 27b22507afb..8f53d84e387 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1434,20 +1434,30 @@ TEST(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr.get()); using c_type = parquet::Int32Type::c_type; - int encode_buffer_size = 274; - int num_values = 65; - c_type* decode_buf = nullptr; - std::vector output_bytes; - int values_decoded = 0; - - // Both good_data and bad_data is a DELTA_BINARY_PACKED INT32 Page with 65 values. - // For needed miniblocks, there bit-widths are all 32. - // There bit-widths for unneeded miniblocks are different: - // * good_data's unneeded bit-width is 0. - // * bad_data's unneeded bit-width is 165. + auto testHexBinary = [&decoder](std::string_view hex_string, int binary_buffer_size, + int expect_values_decoded) { + std::vector binary_data(binary_buffer_size); + for (int i = 0; i < binary_buffer_size; ++i) { + ASSERT_OK(::arrow::ParseHexValue(hex_string.data() + i * 2, &binary_data[i])); + } + + auto output_bytes = std::vector(expect_values_decoded * sizeof(c_type)); + auto decode_buf = reinterpret_cast(output_bytes.data()); + decoder->SetData(expect_values_decoded, &binary_data[0], binary_buffer_size); + int values_decoded = decoder->Decode(decode_buf, expect_values_decoded); + ASSERT_EQ(expect_values_decoded, values_decoded); + }; + + // Both good_bitwidth_data and bad_bitwidth_data is a DELTA_BINARY_PACKED INT32 Page + // with 65 values. For needed miniblocks, there bit-widths are all 32. There bit-widths + // for unneeded miniblocks are different: + // * good_bitwidth_data's unneeded bit-width is 0. + // * bad_bitwidth_data's unneeded bit-width is 165. // We use Hex to represent good_data and bad_data. + constexpr int kEncodeBufferSize = 274; + constexpr int kExpectValues = 65; { - char good_data_hex[] = + std::string_view good_bitwidth_data_hex = "80010441BDA08DB708A488C0F1012020000089F79E53946D93CA242D48B21478591FDE6E4855CC03" "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" @@ -1455,21 +1465,11 @@ TEST(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; - unsigned char good_data[274]; - for (int i = 0; i < 274; ++i) { - auto s = ::arrow::ParseHexValue(&good_data_hex[i * 2], &good_data[i]); - ASSERT_TRUE(s.ok()); - } - - output_bytes = std::vector(num_values * sizeof(c_type)); - decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &good_data[0], encode_buffer_size); - values_decoded = decoder->Decode(decode_buf, num_values); - ASSERT_EQ(num_values, values_decoded); + testHexBinary(good_bitwidth_data_hex, kEncodeBufferSize, kExpectValues); } { - char bad_data_hex[] = + std::string_view bad_bitwidth_data_hex = "80010441BDA08DB708A488C0F1012020A5A589F79E53946D93CA242D48B21478591FDE6E4855CC03" "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" @@ -1477,16 +1477,7 @@ TEST(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; - unsigned char bad_data[274]; - for (int i = 0; i < 274; ++i) { - auto s = ::arrow::ParseHexValue(&bad_data_hex[i * 2], &bad_data[i]); - ASSERT_TRUE(s.ok()); - } - output_bytes = std::vector(num_values * sizeof(c_type)); - decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(num_values, &bad_data[0], encode_buffer_size); - values_decoded = decoder->Decode(decode_buf, num_values); - ASSERT_EQ(num_values, values_decoded); + testHexBinary(bad_bitwidth_data_hex, kEncodeBufferSize, kExpectValues); } } From c2ede0fe9619b676638125b75458e0e2b454649d Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 11 Jan 2023 02:25:15 +0800 Subject: [PATCH 12/15] Fix partial comment --- cpp/src/parquet/encoding.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 6afba796e37..1f829e4d22f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2377,8 +2377,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_remaining_); + // total_values_remaining_ in header ignores of null values + return static_cast(total_values_remaining_); } int Decode(T* buffer, int max_values) override { @@ -2444,7 +2444,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder kMaxDeltaBitWidth)) { - if (miniblock_values_sum < total_value_count_) { + if (miniblock_values_sum < total_values_remaining_) { throw ParquetException("delta bit width " + std::to_string(bit_width_data[current_mini_block_idx]) + " larger than integer bit width " + @@ -2484,7 +2484,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(std::min(max_values, total_value_remaining_)); + max_values = static_cast(std::min(max_values, total_values_remaining_)); if (max_values == 0) { return 0; } @@ -2539,10 +2539,10 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecodernum_values_ -= max_values; - if (ARROW_PREDICT_FALSE(total_value_remaining_ == 0)) { + if (ARROW_PREDICT_FALSE(total_values_remaining_ == 0)) { uint32_t padding_bits = values_remaining_current_mini_block_ * delta_bit_width_; // skip the padding bits if (!decoder_->Advance(padding_bits)) { @@ -2560,9 +2560,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder Date: Wed, 11 Jan 2023 02:44:13 +0800 Subject: [PATCH 13/15] Update: using counter `i` rather than `sum` --- cpp/src/parquet/encoding.cc | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 1f829e4d22f..e9aeac6c25b 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2447,26 +2447,26 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException("InitBlock EOF"); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - uint32_t miniblock_values_sum = values_num_up_to_current_mini_block_; + // If the current block contains the last mini block, + // current_num_values may greater than total_values_remaining_. + uint32_t current_num_values = prefix_values; for (uint32_t current_mini_block_idx = 0; current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { ParquetException::EofException("Decode bit-width EOF"); } - if (ARROW_PREDICT_FALSE(bit_width_data[current_mini_block_idx] > kMaxDeltaBitWidth)) { - if (miniblock_values_sum < total_values_remaining_) { + if (current_num_values < total_values_remaining_) { throw ParquetException("delta bit width " + std::to_string(bit_width_data[current_mini_block_idx]) + " larger than integer bit width " + @@ -2475,7 +2475,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderdata()[mini_block_idx_]; values_remaining_current_mini_block_ = values_per_mini_block_; } else { - InitBlock(); + InitBlock(i); } } } @@ -2568,11 +2565,6 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; From e1650b99784c2645c89f4c4fa980765e223148e7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 16 Jan 2023 16:01:53 +0100 Subject: [PATCH 14/15] Rewrite test and implementation --- cpp/src/parquet/encoding.cc | 48 +++++------ cpp/src/parquet/encoding_test.cc | 138 +++++++++++++++++-------------- 2 files changed, 97 insertions(+), 89 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e9aeac6c25b..3c36a8bf7e8 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2450,37 +2450,34 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException("InitBlock EOF"); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - // If the current block contains the last mini block, - // current_num_values may greater than total_values_remaining_. - uint32_t current_num_values = prefix_values; - for (uint32_t current_mini_block_idx = 0; - current_mini_block_idx < mini_blocks_per_block_; ++current_mini_block_idx) { - if (!decoder_->GetAligned(1, bit_width_data + current_mini_block_idx)) { + for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { + if (!decoder_->GetAligned(1, bit_width_data + i)) { ParquetException::EofException("Decode bit-width EOF"); } - if (ARROW_PREDICT_FALSE(bit_width_data[current_mini_block_idx] > - kMaxDeltaBitWidth)) { - if (current_num_values < total_values_remaining_) { - throw ParquetException("delta bit width " + - std::to_string(bit_width_data[current_mini_block_idx]) + - " larger than integer bit width " + - std::to_string(kMaxDeltaBitWidth)); - } - // according to the parquet standard, we should ignore the bit_width_data here. - // cannot break because still need to read remaining bit-width from decoder. - } - current_num_values += values_per_mini_block_; + // Note that non-conformant bitwidth entries are allowed by the Parquet spec + // for extraneous miniblocks in the last block (GH-14923), so we check + // the bitwidths when actually using them (see InitMiniBlock()). } + mini_block_idx_ = 0; - delta_bit_width_ = bit_width_data[0]; - values_remaining_current_mini_block_ = values_per_mini_block_; first_block_initialized_ = true; + InitMiniBlock(bit_width_data[0]); + } + + void InitMiniBlock(int bit_width) { + if (ARROW_PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) { + throw ParquetException("delta bit width larger than integer bit width"); + } + delta_bit_width_ = bit_width; + values_remaining_current_mini_block_ = values_per_mini_block_; } int GetInternal(T* buffer, int max_values) { @@ -2504,18 +2501,17 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderdata()[mini_block_idx_]; - values_remaining_current_mini_block_ = values_per_mini_block_; + InitMiniBlock(delta_bit_widths_->data()[mini_block_idx_]); } else { - InitBlock(i); + InitBlock(); } } } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 8f53d84e387..840f8af09a6 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1326,29 +1326,32 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { CheckRoundtripSpaced(valid_bits, valid_bits_offset); } - void CheckRoundtrip() override { - auto encoder = - MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + void CheckDecoding() { auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); auto read_batch_sizes = kReadBatchSizes; read_batch_sizes.push_back(num_values_); + // Exercise different batch sizes + for (const int read_batch_size : read_batch_sizes) { + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + + int values_decoded = 0; + while (values_decoded < num_values_) { + values_decoded += decoder->Decode(decode_buf_ + values_decoded, read_batch_size); + } + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + } + + void CheckRoundtrip() override { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); // Encode a number of times to exercise the flush logic for (size_t i = 0; i < kNumRoundTrips; ++i) { encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); - // Exercise different batch sizes - for (const int read_batch_size : read_batch_sizes) { - decoder->SetData(num_values_, encode_buffer_->data(), - static_cast(encode_buffer_->size())); - - int values_decoded = 0; - while (values_decoded < num_values_) { - values_decoded += - decoder->Decode(decode_buf_ + values_decoded, read_batch_size); - } - ASSERT_EQ(num_values_, values_decoded); - ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); - } + CheckDecoding(); } } @@ -1429,55 +1432,64 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } -TEST(DeltaBitPackEncoding, MalfordMiniblockBitWidth) { - std::shared_ptr descr = ExampleDescr(); - auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr.get()); - using c_type = parquet::Int32Type::c_type; - - auto testHexBinary = [&decoder](std::string_view hex_string, int binary_buffer_size, - int expect_values_decoded) { - std::vector binary_data(binary_buffer_size); - for (int i = 0; i < binary_buffer_size; ++i) { - ASSERT_OK(::arrow::ParseHexValue(hex_string.data() + i * 2, &binary_data[i])); +TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { + // GH-14923: depending on the number of encoded values, some of the miniblock + // bitwidths are actually padding bytes that may take non-conformant values + // according to the Parquet spec. + + // Same values as in DeltaBitPackEncoder + constexpr int kValuesPerBlock = 128; + constexpr int kMiniBlocksPerBlock = 4; + constexpr int kValuesPerMiniBlock = kValuesPerBlock / kMiniBlocksPerBlock; + + // num_values must be kept small enough for kHeaderLength below + for (const int num_values : {2, 62, 63, 64, 65, 95, 96, 97, 127}) { + ARROW_SCOPED_TRACE("num_values = ", num_values); + + // Generate input data with a small half_range to make the header length + // deterministic (see kHeaderLength). + this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/63); + ASSERT_EQ(this->num_values_, num_values); + + auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, + this->descr_.get()); + encoder->Put(this->draws_, this->num_values_); + auto encoded = encoder->FlushValues(); + const auto encoded_size = encoded->size(); + + // Make mutable copy of encoded buffer + this->encode_buffer_ = AllocateBuffer(default_memory_pool(), encoded_size); + uint8_t* data = this->encode_buffer_->mutable_data(); + memcpy(data, encoded->data(), encoded_size); + + // The number of padding bytes at the end of the miniblock bitwidths array. + // We subtract 1 from num_values because the first data value is encoded + // in the header, thus does not participate in miniblock encoding. + const int num_padding_bytes = + (kValuesPerBlock - num_values + 1) / kValuesPerMiniBlock; + ARROW_SCOPED_TRACE("num_padding_bytes = ", num_padding_bytes); + + // The header length is: + // - 2 bytes for ULEB128-encoded block size (== kValuesPerBlock) + // - 1 byte for ULEB128-encoded miniblocks per block (== kMiniBlocksPerBlock) + // - 1 byte for ULEB128-encoded num_values + // - 1 byte for ULEB128-encoded first value + // (this assumes that num_values and the first value are narrow enough) + constexpr int kHeaderLength = 5; + // After the header, there is a ULEB128-encoded min delta for the first block, + // then the miniblock bitwidths for the first block. + // Given a narrow enough range, the ULEB128-encoded min delta is 1 byte long. + uint8_t* mini_block_bitwidths = data + kHeaderLength + 1; + + // Garble padding bytes; decoding should succeed. + for (int i = 0; i < num_padding_bytes; ++i) { + mini_block_bitwidths[kMiniBlocksPerBlock - i - 1] = 0xFFU; } + ASSERT_NO_THROW(this->CheckDecoding()); - auto output_bytes = std::vector(expect_values_decoded * sizeof(c_type)); - auto decode_buf = reinterpret_cast(output_bytes.data()); - decoder->SetData(expect_values_decoded, &binary_data[0], binary_buffer_size); - int values_decoded = decoder->Decode(decode_buf, expect_values_decoded); - ASSERT_EQ(expect_values_decoded, values_decoded); - }; - - // Both good_bitwidth_data and bad_bitwidth_data is a DELTA_BINARY_PACKED INT32 Page - // with 65 values. For needed miniblocks, there bit-widths are all 32. There bit-widths - // for unneeded miniblocks are different: - // * good_bitwidth_data's unneeded bit-width is 0. - // * bad_bitwidth_data's unneeded bit-width is 165. - // We use Hex to represent good_data and bad_data. - constexpr int kEncodeBufferSize = 274; - constexpr int kExpectValues = 65; - { - std::string_view good_bitwidth_data_hex = - "80010441BDA08DB708A488C0F1012020000089F79E53946D93CA242D48B21478591FDE6E4855CC03" - "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" - "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" - "F46D7224B3C7C271AC3220F2C6663289427E1CE3B00F8C34D5DDA845664AF36F56ACF6FED2339911" - "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" - "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" - "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; - testHexBinary(good_bitwidth_data_hex, kEncodeBufferSize, kExpectValues); - } - - { - std::string_view bad_bitwidth_data_hex = - "80010441BDA08DB708A488C0F1012020A5A589F79E53946D93CA242D48B21478591FDE6E4855CC03" - "7024A4E8E56E1D20687C8BFB4DE880E73CE2E49E873BEB0B1DAD0E0169C7951C411CA76AD949D8ED" - "FD9E07A485DF4410B4486253A6280335BB87494B7D61E40A8D487731BBC83A37E1E8EE2F5431150C" - "F46D7224B3C7C271AC3220F2C6663289427E1CE3B00F8C34D5DDA845664AF36F56ACF6FED2339911" - "7AB54F6667A80EA78D08DC61B4262CE28960B46B2D93785ED9933175A910415B50B30E9889C70ECD" - "84C8856531E78B4E05B2AB27B1233E4BAB722869D785F5B8B1049955EFE200000000C6C2201E1836" - "C0579B202A6A7395C761044AD151D5CEE992CBBA5AE937E54F29D28F86CC0627B9D1"; - testHexBinary(bad_bitwidth_data_hex, kEncodeBufferSize, kExpectValues); + // Not a padding byte but an actual miniblock bitwidth; decoding should error out. + mini_block_bitwidths[kMiniBlocksPerBlock - num_padding_bytes - 1] = 0xFFU; + EXPECT_THROW(this->CheckDecoding(), ParquetException); } } From 2995eff1668ba83347fcb2e797d799cb09e7dda6 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 17 Jan 2023 04:08:00 +0800 Subject: [PATCH 15/15] fix testing for zigzag uleb128 --- cpp/src/parquet/encoding_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 840f8af09a6..a0e3fe9545d 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1448,7 +1448,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { // Generate input data with a small half_range to make the header length // deterministic (see kHeaderLength). - this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/63); + this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31); ASSERT_EQ(this->num_values_, num_values); auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, @@ -1476,9 +1476,9 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) { // - 1 byte for ULEB128-encoded first value // (this assumes that num_values and the first value are narrow enough) constexpr int kHeaderLength = 5; - // After the header, there is a ULEB128-encoded min delta for the first block, + // After the header, there is a zigzag ULEB128-encoded min delta for the first block, // then the miniblock bitwidths for the first block. - // Given a narrow enough range, the ULEB128-encoded min delta is 1 byte long. + // Given a narrow enough range, the zigzag ULEB128-encoded min delta is 1 byte long. uint8_t* mini_block_bitwidths = data + kHeaderLength + 1; // Garble padding bytes; decoding should succeed.