From fe6ceea7b67e9aa0dc82d909396c1a1bb7522d35 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 30 Dec 2022 08:17:05 +0800 Subject: [PATCH 1/8] Move InitBlock ahead --- cpp/src/parquet/encoding.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 9761dfd3013..82721514e4f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2478,9 +2478,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder Date: Fri, 30 Dec 2022 21:45:41 +0800 Subject: [PATCH 2/8] [Update] Handling the case for only one value --- cpp/src/parquet/encoding.cc | 10 ++++++---- cpp/src/parquet/encoding_test.cc | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 82721514e4f..731d719c755 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2372,7 +2372,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder decoder) { this->num_values_ = num_values; - decoder_ = decoder; + decoder_ = std::move(decoder); InitHeader(); } @@ -2459,7 +2459,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder kMaxDeltaBitWidth) { - throw ParquetException("delta bit width larger than integer bit width"); + throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) + + " larger than integer bit width " + + std::to_string(kMaxDeltaBitWidth)); } } mini_block_idx_ = 0; @@ -2478,9 +2480,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_))) break; + InitBlock(); } else { ++mini_block_idx_; if (mini_block_idx_ < mini_blocks_per_block_) { diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 3b4cafab829..c8c1d79c919 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1388,6 +1388,8 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE( this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); + ASSERT_NO_FATAL_FAILURE(this->Execute(1, 1)); + ASSERT_NO_FATAL_FAILURE(this->Execute(1, 10)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/ 0.1)); From f978e25a4fcb009200aa333d521b6542949e9bd5 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 30 Dec 2022 22:09:56 +0800 Subject: [PATCH 3/8] [ADD] add some test with different steps --- cpp/src/parquet/encoding.cc | 7 ++++++- cpp/src/parquet/encoding_test.cc | 36 ++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 731d719c755..2a9acafff53 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2481,7 +2481,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_))) break; + if (ARROW_PREDICT_FALSE(i == max_values)) { + if (i != static_cast(total_value_count_)) { + InitBlock(); + } + break; + } InitBlock(); } else { ++mini_block_idx_; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index c8c1d79c919..93be29cf695 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1324,6 +1324,29 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { CheckRoundtripSpaced(valid_bits, valid_bits_offset); } + void ExecuteSteps(int nvalues, int repeats, int read_batch) { + this->InitData(nvalues, repeats); + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); + + for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) { + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + int values_decoded_sum = 0; + while (values_decoded_sum < num_values_) { + int values_decoded = + decoder->Decode(decode_buf_ + values_decoded_sum, read_batch); + values_decoded_sum += values_decoded; + } + ASSERT_EQ(num_values_, values_decoded_sum); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + } + void CheckRoundtrip() override { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); @@ -1388,8 +1411,6 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE( this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); - ASSERT_NO_FATAL_FAILURE(this->Execute(1, 1)); - ASSERT_NO_FATAL_FAILURE(this->Execute(1, 10)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/ 0.1)); @@ -1415,5 +1436,16 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } +TYPED_TEST(TestDeltaBitPackEncoding, ZeroRoundTrip) { + ASSERT_NO_FATAL_FAILURE(this->Execute(1, 1)); + ASSERT_NO_FATAL_FAILURE(this->Execute(1, 2)); + ASSERT_NO_FATAL_FAILURE(this->Execute(2, 2)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 1, 1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 2, 1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(2, 2, 1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 3)); +} + } // namespace test } // namespace parquet From 0abb8638c23500059e44727ba832b5d8bd4c9339 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 30 Dec 2022 22:56:17 +0800 Subject: [PATCH 4/8] address comment --- cpp/src/parquet/encoding.cc | 6 ++++++ cpp/src/parquet/encoding_test.cc | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2a9acafff53..28f1f65cb33 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2482,6 +2482,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_)) { InitBlock(); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 93be29cf695..1d89fd81aab 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1436,15 +1436,17 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } -TYPED_TEST(TestDeltaBitPackEncoding, ZeroRoundTrip) { +TYPED_TEST(TestDeltaBitPackEncoding, SmallBatchRoundTrip) { ASSERT_NO_FATAL_FAILURE(this->Execute(1, 1)); ASSERT_NO_FATAL_FAILURE(this->Execute(1, 2)); ASSERT_NO_FATAL_FAILURE(this->Execute(2, 2)); + ASSERT_NO_FATAL_FAILURE(this->Execute(1, 10)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 1, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 2, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(2, 2, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 1)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 3)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 10, 1)); } } // namespace test From 3c0a611f860ad14aff54d0db21ab7e8f4202a9b6 Mon Sep 17 00:00:00 2001 From: mwish <1506118561@qq.com> Date: Sat, 31 Dec 2022 04:01:38 +0800 Subject: [PATCH 5/8] Update commit in cpp/src/parquet/encoding.cc Change the comments for fixing Co-authored-by: Rok Mihevc --- cpp/src/parquet/encoding.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 28f1f65cb33..8abb6e3a277 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2482,12 +2482,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_)) { InitBlock(); } From 7125483c42eaa1cb837c9390a3d57529e13207cb Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 3 Jan 2023 19:53:29 +0800 Subject: [PATCH 6/8] address comment --- cpp/src/parquet/encoding.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e6be56e63ad..5d0b5ad9c87 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2481,14 +2481,15 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_)) { + if (1 != static_cast(total_value_count_)) { InitBlock(); } break; From 1cf2511b3bdd7af92fc7cf66def7a8ba20486489 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 3 Jan 2023 23:46:06 +0800 Subject: [PATCH 7/8] add extra_batch_size, and remove ExecuteSteps --- cpp/src/parquet/encoding_test.cc | 64 ++++++++++---------------------- 1 file changed, 19 insertions(+), 45 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 1d89fd81aab..9fa1a0d52db 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1291,6 +1291,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; static constexpr size_t ROUND_TRIP_TIMES = 3; + const std::vector EXTRA_READ_BATCH_SIZES = {1, 10}; void InitBoundData(int nvalues, int repeats, c_type half_range) { num_values_ = nvalues * repeats; @@ -1324,43 +1325,29 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { CheckRoundtripSpaced(valid_bits, valid_bits_offset); } - void ExecuteSteps(int nvalues, int repeats, int read_batch) { - this->InitData(nvalues, repeats); - auto encoder = - MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); - auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); - - for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) { - encoder->Put(draws_, num_values_); - encode_buffer_ = encoder->FlushValues(); - - decoder->SetData(num_values_, encode_buffer_->data(), - static_cast(encode_buffer_->size())); - int values_decoded_sum = 0; - while (values_decoded_sum < num_values_) { - int values_decoded = - decoder->Decode(decode_buf_ + values_decoded_sum, read_batch); - values_decoded_sum += values_decoded; - } - ASSERT_EQ(num_values_, values_decoded_sum); - ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); - } - } - void CheckRoundtrip() override { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); - + auto read_batch_sizes = EXTRA_READ_BATCH_SIZES; + read_batch_sizes.push_back(num_values_); for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) { - encoder->Put(draws_, num_values_); - encode_buffer_ = encoder->FlushValues(); - - decoder->SetData(num_values_, encode_buffer_->data(), - static_cast(encode_buffer_->size())); - int values_decoded = decoder->Decode(decode_buf_, num_values_); - ASSERT_EQ(num_values_, values_decoded); - ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + for (int read_batch_size : read_batch_sizes) { + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + + int values_decoded_sum = 0; + while (values_decoded_sum < num_values_) { + int values_decoded = + decoder->Decode(decode_buf_ + values_decoded_sum, read_batch_size); + values_decoded_sum += values_decoded; + } + ASSERT_EQ(num_values_, values_decoded_sum); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } } } @@ -1436,18 +1423,5 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { } } -TYPED_TEST(TestDeltaBitPackEncoding, SmallBatchRoundTrip) { - ASSERT_NO_FATAL_FAILURE(this->Execute(1, 1)); - ASSERT_NO_FATAL_FAILURE(this->Execute(1, 2)); - ASSERT_NO_FATAL_FAILURE(this->Execute(2, 2)); - ASSERT_NO_FATAL_FAILURE(this->Execute(1, 10)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 1, 1)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 2, 1)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(2, 2, 1)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 1)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(10, 10, 3)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteSteps(1, 10, 1)); -} - } // namespace test } // namespace parquet From be8436cb32a2fb90e88e00b5a29bff50d3ad2460 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 4 Jan 2023 14:43:15 +0100 Subject: [PATCH 8/8] Nits --- cpp/src/parquet/encoding.cc | 11 ++++++----- cpp/src/parquet/encoding_test.cc | 30 +++++++++++++++--------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5d0b5ad9c87..b9472d72aeb 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2485,11 +2485,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(total_value_count_)) { + // 1. total_value_count_ == 1, which means that the page may have only + // one value (encoded in the header), and we should not initialize + // any block. + // 2. total_value_count_ != 1, which means we should initialize the + // incoming block for subsequent reads. + if (total_value_count_ != 1) { InitBlock(); } break; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 9e478a126e3..b8363d29cdb 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1290,8 +1290,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { public: using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; - static constexpr size_t ROUND_TRIP_TIMES = 3; - const std::vector EXTRA_READ_BATCH_SIZES = {1, 10}; + static constexpr size_t kNumRoundTrips = 3; + const std::vector kReadBatchSizes = {1, 11}; void InitBoundData(int nvalues, int repeats, c_type half_range) { num_values_ = nvalues * repeats; @@ -1329,23 +1329,23 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); - auto read_batch_sizes = EXTRA_READ_BATCH_SIZES; + auto read_batch_sizes = kReadBatchSizes; read_batch_sizes.push_back(num_values_); - for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) { - for (int read_batch_size : read_batch_sizes) { - encoder->Put(draws_, num_values_); - encode_buffer_ = encoder->FlushValues(); - + // Encode a number of times to exercise the flush logic + for (size_t i = 0; i < kNumRoundTrips; ++i) { + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + // Exercise different batch sizes + for (const int read_batch_size : read_batch_sizes) { decoder->SetData(num_values_, encode_buffer_->data(), static_cast(encode_buffer_->size())); - int values_decoded_sum = 0; - while (values_decoded_sum < num_values_) { - int values_decoded = - decoder->Decode(decode_buf_ + values_decoded_sum, read_batch_size); - values_decoded_sum += values_decoded; + int values_decoded = 0; + while (values_decoded < num_values_) { + values_decoded += + decoder->Decode(decode_buf_ + values_decoded, read_batch_size); } - ASSERT_EQ(num_values_, values_decoded_sum); + ASSERT_EQ(num_values_, values_decoded); ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); } } @@ -1363,7 +1363,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { } } - for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) { + for (size_t i = 0; i < kNumRoundTrips; ++i) { encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset); encode_buffer_ = encoder->FlushValues(); decoder->SetData(num_values_ - null_count, encode_buffer_->data(),