From ed65e98aea116f145478c1e4d423bb28360deb68 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sun, 2 Oct 2022 23:02:18 +0200 Subject: [PATCH 01/26] Initial commit --- cpp/src/arrow/util/bit_stream_utils.h | 16 +- cpp/src/parquet/encoding.cc | 190 ++++++++++++++++++++- cpp/src/parquet/encoding_test.cc | 59 +++++++ docs/source/cpp/parquet.rst | 2 +- python/pyarrow/tests/parquet/test_basic.py | 17 +- 5 files changed, 274 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 2f70c286503..02746abc8f7 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -69,6 +69,13 @@ class BitWriter { template bool PutAligned(T v, int num_bytes); + /// Writes v to givent pointer using num_bytes. If T is larger than + /// num_bytes, the extra high-order bytes will be ignored. Returns false if + /// there was not enough space. + /// Assume the v is stored in buffer_ as a litte-endian format + template + bool PutAlignedOffset(uint8_t* ptr, T val, int num_bytes); + /// Write a Vlq encoded int to the buffer. Returns false if there was not enough /// room. The value is written byte aligned. /// For more details on vlq: @@ -249,14 +256,19 @@ inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) { } template -inline bool BitWriter::PutAligned(T val, int num_bytes) { - uint8_t* ptr = GetNextBytePtr(num_bytes); +inline bool BitWriter::PutAlignedOffset(uint8_t* ptr, T val, int num_bytes) { if (ptr == NULL) return false; val = arrow::bit_util::ToLittleEndian(val); memcpy(ptr, &val, num_bytes); return true; } +template +inline bool BitWriter::PutAligned(T val, int num_bytes) { + uint8_t* ptr = GetNextBytePtr(num_bytes); + return PutAlignedOffset(ptr, val, num_bytes); +} + namespace detail { inline void ResetBufferedValues_(const uint8_t* buffer, int byte_offset, diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 44f762d7113..db05a81e349 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2060,6 +2060,184 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, } }; +// ---------------------------------------------------------------------- +// DeltaBitPackEncoder + +template +class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { + public: + using T = typename DType::c_type; + + explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool) + : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool), + values_per_block_(128), + mini_blocks_per_block_(4), + values_per_mini_block_(values_per_block_ / mini_blocks_per_block_), + values_current_block_(0), + total_value_count_(0), + first_value_(0), + current_value_(0), + sink_(pool), + bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), + bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())), + deltas_(std::vector(values_per_block_)) {} + + std::shared_ptr FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return 4 * sizeof(uint64_t) + sink_.length(); + } + + using TypedEncoder::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + ParquetException::NYI("put spaced"); + } + + protected: + const uint32_t values_per_block_; + const uint32_t mini_blocks_per_block_; + const uint32_t values_per_mini_block_; + uint32_t values_current_block_; + uint32_t total_value_count_; + T first_value_; + T current_value_; + ::arrow::BufferBuilder sink_; + std::shared_ptr bits_buffer_; + ::arrow::bit_util::BitWriter bit_writer_; + std::vector deltas_; + + private: + void FlushBlock(); +}; + +template +void DeltaBitPackEncoder::Put(const T* src, int num_values) { + if (num_values == 0) { + return; + } + + uint32_t idx = 0; + if (total_value_count_ == 0) { + first_value_ = src[0]; + current_value_ = first_value_; + idx = 1; + } + total_value_count_ += num_values; + + int increment = total_value_count_ * sizeof(T); + if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) { + PARQUET_THROW_NOT_OK(sink_.Resize(increment, false)); + } + + while (idx < static_cast(num_values)) { + T value = src[idx]; + deltas_[values_current_block_] = value - current_value_; + current_value_ = value; + idx++; + values_current_block_++; + if (values_current_block_ == values_per_block_) { + FlushBlock(); + } + } + + if (values_current_block_ != 0) { + FlushBlock(); + } +} + +template +void DeltaBitPackEncoder::FlushBlock() { + if (values_current_block_ == 0) { + return; + } + + const auto min_delta = + static_cast(*std::min_element(deltas_.begin(), deltas_.end())); + DCHECK(bit_writer_.PutZigZagVlqInt(min_delta)); + + uint8_t* bit_width_offsets = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); + DCHECK(bit_width_offsets != NULL); + + for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { + const uint32_t n = std::min(values_per_mini_block_, values_current_block_); + if (n == 0) { + DCHECK( + bit_writer_.PutAlignedOffset(bit_width_offsets + i, uint32_t(1), 1)); + continue; + } + + const uint32_t start = i * values_per_mini_block_; + const auto max_delta = static_cast( + *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n)); + + const uint32_t num_bits = bit_util::NumRequiredBits(max_delta - min_delta); + DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); + + for (uint64_t j = start; j < start + n; j++) { + DCHECK( + bit_writer_.PutValue(static_cast(deltas_[j] - min_delta), num_bits)); + } + for (uint64_t j = n; j < values_per_mini_block_; j++) { + DCHECK(bit_writer_.PutValue(0, num_bits)); + } + values_current_block_ -= n; + } + DCHECK_EQ(values_current_block_, 0); + + PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + bit_writer_.Clear(); +} + +template +std::shared_ptr DeltaBitPackEncoder::FlushValues() { + std::shared_ptr header_buffer = AllocateBuffer(pool_, 32); + ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(), + static_cast(header_buffer->size())); + if (!header_writer.PutVlqInt(values_per_block_) || + !header_writer.PutVlqInt(mini_blocks_per_block_) || + !header_writer.PutVlqInt(total_value_count_) || + !header_writer.PutZigZagVlqInt(first_value_)) { + throw ParquetException("cannot write"); + } + header_writer.Flush(false); + + ::arrow::BufferBuilder sink; + PARQUET_THROW_NOT_OK( + sink.Append(header_writer.buffer(), header_writer.bytes_written())); + header_writer.Clear(); + + std::shared_ptr bits_buffer; + PARQUET_THROW_NOT_OK(sink_.Finish(&bits_buffer, true)); + + std::shared_ptr buffer; + PARQUET_THROW_NOT_OK(sink.Append(bits_buffer->mutable_data(), bits_buffer->size())); + PARQUET_THROW_NOT_OK(sink.Finish(&buffer, true)); + return buffer; +} + +template <> +void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + auto src = reinterpret_cast(values.data()->buffers[0]->mutable_data()); + Put(src, static_cast(values.length())); +} + +template <> +void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + auto src = reinterpret_cast(values.data()->buffers[0]->mutable_data()); + Put(src, static_cast(values.length())); +} + +template +void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + ParquetException::NYI("direct put of " + values.type()->ToString()); +} + // ---------------------------------------------------------------------- // DeltaBitPackDecoder @@ -2210,7 +2388,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(min_delta_) + static_cast(buffer[i + j]); buffer[i + j] = static_cast(delta + static_cast(last_value_)); @@ -2760,6 +2938,16 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); break; } + } else if (encoding == Encoding::DELTA_BINARY_PACKED) { + switch (type_num) { + case Type::INT32: + return std::unique_ptr(new DeltaBitPackEncoder(descr, pool)); + case Type::INT64: + return std::unique_ptr(new DeltaBitPackEncoder(descr, pool)); + default: + throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64"); + break; + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 7d42e3e8ce3..c4c740aed88 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1276,5 +1276,64 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) { ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); } +// ---------------------------------------------------------------------- +// DELTA_BINARY_PACKED encode/decode tests. + +template +class TestDeltaBitPackEncoding : public TestEncodingBase { + public: + using c_type = typename Type::c_type; + static constexpr int TYPE = Type::type_num; + + virtual void CheckRoundtrip() { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); + + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + int values_decoded = decoder->Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + + void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); + int null_count = 0; + for (auto i = 0; i < num_values_; i++) { + if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) { + null_count++; + } + } + + encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset); + encode_buffer_ = encoder->FlushValues(); + decoder->SetData(num_values_ - null_count, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count, + valid_bits, valid_bits_offset); + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_, + valid_bits, valid_bits_offset)); + } + + protected: + USING_BASE_MEMBERS(); +}; + +// TODO +// typedef ::testing::Types TestDeltaBitPackEncodingTypes; +typedef ::testing::Types TestDeltaBitPackEncodingTypes; +TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); + +TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { + ASSERT_NO_FATAL_FAILURE(this->Execute(2500, 2)); +} + } // namespace test } // namespace parquet diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 23a9657fd41..edc42d54cff 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -398,7 +398,7 @@ Encodings +--------------------------+----------+----------+---------+ | BYTE_STREAM_SPLIT | ✓ | ✓ | | +--------------------------+----------+----------+---------+ -| DELTA_BINARY_PACKED | ✓ | | | +| DELTA_BINARY_PACKED | ✓ | ✓ | | +--------------------------+----------+----------+---------+ | DELTA_BYTE_ARRAY | ✓ | | | +--------------------------+----------+----------+---------+ diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 004bbd8d77f..b2809e0f910 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -405,6 +405,13 @@ def test_column_encoding(use_legacy_dataset): column_encoding="PLAIN", use_legacy_dataset=use_legacy_dataset) + # Check "DELTA_BINARY_PACKED" for integer columns. + _check_roundtrip(mixed_table, expected=mixed_table, + use_dictionary=False, + column_encoding={'a': "PLAIN", + 'b': "DELTA_BINARY_PACKED"}, + use_legacy_dataset=use_legacy_dataset) + # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'. # This should throw an error as it is only supports FLOAT and DOUBLE. with pytest.raises(IOError, @@ -415,14 +422,12 @@ def test_column_encoding(use_legacy_dataset): column_encoding={'b': "BYTE_STREAM_SPLIT"}, use_legacy_dataset=use_legacy_dataset) - # Try to pass "DELTA_BINARY_PACKED". - # This should throw an error as it is only supported for reading. - with pytest.raises(IOError, - match="Not yet implemented: Selected encoding is" - " not supported."): + # Try to pass use "DELTA_BINARY_PACKED" encoding on float column. + # This should throw an error as only integers are supported. + with pytest.raises(OSError): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, - column_encoding={'b': "DELTA_BINARY_PACKED"}, + column_encoding={'a': "DELTA_BINARY_PACKED"}, use_legacy_dataset=use_legacy_dataset) # Try to pass "RLE_DICTIONARY". From 4b6c56264810454abe1e4bed0dc16fd239b4fbe8 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 3 Oct 2022 17:09:11 +0200 Subject: [PATCH 02/26] Review feedback --- cpp/src/parquet/encoding.cc | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index db05a81e349..15d5c71433e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2080,7 +2080,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())), - deltas_(std::vector(values_per_block_)) {} + deltas_(std::vector(values_per_block_)) {} std::shared_ptr FlushValues() override; @@ -2110,7 +2110,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; ::arrow::bit_util::BitWriter bit_writer_; - std::vector deltas_; + std::vector deltas_; private: void FlushBlock(); @@ -2129,11 +2129,7 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { idx = 1; } total_value_count_ += num_values; - - int increment = total_value_count_ * sizeof(T); - if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) { - PARQUET_THROW_NOT_OK(sink_.Resize(increment, false)); - } + PARQUET_THROW_NOT_OK(sink_.Resize(total_value_count_ * sizeof(T), false)); while (idx < static_cast(num_values)) { T value = src[idx]; @@ -2157,8 +2153,8 @@ void DeltaBitPackEncoder::FlushBlock() { return; } - const auto min_delta = - static_cast(*std::min_element(deltas_.begin(), deltas_.end())); + const int64_t min_delta = static_cast( + *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_)); DCHECK(bit_writer_.PutZigZagVlqInt(min_delta)); uint8_t* bit_width_offsets = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); @@ -2173,7 +2169,7 @@ void DeltaBitPackEncoder::FlushBlock() { } const uint32_t start = i * values_per_mini_block_; - const auto max_delta = static_cast( + const int64_t max_delta = static_cast( *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n)); const uint32_t num_bits = bit_util::NumRequiredBits(max_delta - min_delta); @@ -2223,13 +2219,13 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - auto src = reinterpret_cast(values.data()->buffers[0]->mutable_data()); + auto src = values.data()->GetValues(1); Put(src, static_cast(values.length())); } template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - auto src = reinterpret_cast(values.data()->buffers[0]->mutable_data()); + auto src = values.data()->GetValues(1); Put(src, static_cast(values.length())); } From 327e014feb0fb5a2a81ad813419beb3363ec55b2 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 4 Oct 2022 02:19:56 +0200 Subject: [PATCH 03/26] Use SubtractWithOverflow --- cpp/src/parquet/encoding.cc | 25 +++++++++++++++---------- cpp/src/parquet/encoding_test.cc | 4 ++-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 15d5c71433e..b114464249f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -55,6 +55,7 @@ namespace bit_util = arrow::bit_util; using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; +using arrow::internal::SubtractWithOverflow; using arrow::internal::checked_cast; using std::string_view; @@ -2080,7 +2081,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())), - deltas_(std::vector(values_per_block_)) {} + deltas_(std::vector(values_per_block_)) {} std::shared_ptr FlushValues() override; @@ -2110,7 +2111,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; ::arrow::bit_util::BitWriter bit_writer_; - std::vector deltas_; + std::vector deltas_; private: void FlushBlock(); @@ -2133,7 +2134,7 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { while (idx < static_cast(num_values)) { T value = src[idx]; - deltas_[values_current_block_] = value - current_value_; + SubtractWithOverflow(value, current_value_, &deltas_[values_current_block_]); current_value_ = value; idx++; values_current_block_++; @@ -2153,8 +2154,8 @@ void DeltaBitPackEncoder::FlushBlock() { return; } - const int64_t min_delta = static_cast( - *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_)); + const T min_delta = + *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); DCHECK(bit_writer_.PutZigZagVlqInt(min_delta)); uint8_t* bit_width_offsets = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); @@ -2169,15 +2170,19 @@ void DeltaBitPackEncoder::FlushBlock() { } const uint32_t start = i * values_per_mini_block_; - const int64_t max_delta = static_cast( - *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n)); + const T max_delta = + *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); - const uint32_t num_bits = bit_util::NumRequiredBits(max_delta - min_delta); + T max_delta_diff; + SubtractWithOverflow(max_delta, min_delta, &max_delta_diff); + const uint32_t num_bits = + bit_util::NumRequiredBits(static_cast(max_delta_diff)); DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); for (uint64_t j = start; j < start + n; j++) { - DCHECK( - bit_writer_.PutValue(static_cast(deltas_[j] - min_delta), num_bits)); + T value; + SubtractWithOverflow(deltas_[j], min_delta, &value); + DCHECK(bit_writer_.PutValue(static_cast(value), num_bits)); } for (uint64_t j = n; j < values_per_mini_block_; j++) { DCHECK(bit_writer_.PutValue(0, num_bits)); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index c4c740aed88..95e41ab7750 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1327,12 +1327,12 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { }; // TODO -// typedef ::testing::Types TestDeltaBitPackEncodingTypes; +// typedef ::testing::Types TestDeltaBitPackEncodingTypes; typedef ::testing::Types TestDeltaBitPackEncodingTypes; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { - ASSERT_NO_FATAL_FAILURE(this->Execute(2500, 2)); + ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200)); } } // namespace test From 43fac72a54411a7cf0a0913f8567ea5880715fdb Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 5 Oct 2022 22:47:54 +0200 Subject: [PATCH 04/26] SafeSignedAdd and SafeSignedSubtract --- cpp/src/parquet/encoding.cc | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index b114464249f..3997f55a097 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -55,8 +55,9 @@ namespace bit_util = arrow::bit_util; using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; -using arrow::internal::SubtractWithOverflow; using arrow::internal::checked_cast; +using arrow::internal::SafeSignedAdd; +using arrow::internal::SafeSignedSubtract; using std::string_view; template @@ -2134,7 +2135,8 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { while (idx < static_cast(num_values)) { T value = src[idx]; - SubtractWithOverflow(value, current_value_, &deltas_[values_current_block_]); + deltas_[values_current_block_] = + static_cast(SafeSignedSubtract(value, current_value_)); current_value_ = value; idx++; values_current_block_++; @@ -2173,15 +2175,13 @@ void DeltaBitPackEncoder::FlushBlock() { const T max_delta = *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); - T max_delta_diff; - SubtractWithOverflow(max_delta, min_delta, &max_delta_diff); + T max_delta_diff = static_cast(SafeSignedSubtract(max_delta, min_delta)); const uint32_t num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); for (uint64_t j = start; j < start + n; j++) { - T value; - SubtractWithOverflow(deltas_[j], min_delta, &value); + T value = static_cast(SafeSignedSubtract(deltas_[j], min_delta)); DCHECK(bit_writer_.PutValue(static_cast(value), num_bits)); } for (uint64_t j = n; j < values_per_mini_block_; j++) { @@ -2390,9 +2390,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(min_delta_) + static_cast(buffer[i + j]); - buffer[i + j] = static_cast(delta + static_cast(last_value_)); + const T delta = SafeSignedAdd(min_delta_, buffer[i + j]); + buffer[i + j] = SafeSignedAdd(delta, last_value_); last_value_ = buffer[i + j]; } values_current_mini_block_ -= values_decode; From 1fd82120e40704db5ded785a1772c4dcdb58ce3b Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 6 Oct 2022 02:01:39 +0200 Subject: [PATCH 05/26] PutSpaced --- cpp/src/parquet/encoding.cc | 59 ++++++++++++++++++++------------ cpp/src/parquet/encoding_test.cc | 6 ++-- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 3997f55a097..17087731fc4 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2080,7 +2080,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())), deltas_(std::vector(values_per_block_)) {} @@ -2097,9 +2097,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; @@ -2124,19 +2122,18 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { return; } - uint32_t idx = 0; + int32_t idx = 0; if (total_value_count_ == 0) { first_value_ = src[0]; - current_value_ = first_value_; + current_value_ = src[0]; idx = 1; } total_value_count_ += num_values; PARQUET_THROW_NOT_OK(sink_.Resize(total_value_count_ * sizeof(T), false)); - while (idx < static_cast(num_values)) { + while (idx < num_values) { T value = src[idx]; - deltas_[values_current_block_] = - static_cast(SafeSignedSubtract(value, current_value_)); + deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_); current_value_ = value; idx++; values_current_block_++; @@ -2166,8 +2163,7 @@ void DeltaBitPackEncoder::FlushBlock() { for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { const uint32_t n = std::min(values_per_mini_block_, values_current_block_); if (n == 0) { - DCHECK( - bit_writer_.PutAlignedOffset(bit_width_offsets + i, uint32_t(1), 1)); + DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, 1, 1)); continue; } @@ -2175,17 +2171,22 @@ void DeltaBitPackEncoder::FlushBlock() { const T max_delta = *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); - T max_delta_diff = static_cast(SafeSignedSubtract(max_delta, min_delta)); - const uint32_t num_bits = - bit_util::NumRequiredBits(static_cast(max_delta_diff)); - DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); + const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); + int64_t num_bits; + if constexpr (std::is_same::value) { + num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); + } else { + num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); + } + const int32_t num_bytes = static_cast(bit_util::CeilDiv(num_bits, 8)); + DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); - for (uint64_t j = start; j < start + n; j++) { - T value = static_cast(SafeSignedSubtract(deltas_[j], min_delta)); - DCHECK(bit_writer_.PutValue(static_cast(value), num_bits)); + for (uint32_t j = start; j < start + n; j++) { + const T value = SafeSignedSubtract(deltas_[j], min_delta); + DCHECK(bit_writer_.PutAligned(value, num_bytes)); } - for (uint64_t j = n; j < values_per_mini_block_; j++) { - DCHECK(bit_writer_.PutValue(0, num_bits)); + for (uint32_t j = n; j < values_per_mini_block_; j++) { + DCHECK(bit_writer_.PutAligned(0, num_bytes)); } values_current_block_ -= n; } @@ -2239,6 +2240,22 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { ParquetException::NYI("direct put of " + values.type()->ToString()); } +template +void DeltaBitPackEncoder::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } +} + // ---------------------------------------------------------------------- // DeltaBitPackDecoder diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 95e41ab7750..36c6a3a390b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1326,13 +1326,13 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { USING_BASE_MEMBERS(); }; -// TODO -// typedef ::testing::Types TestDeltaBitPackEncodingTypes; -typedef ::testing::Types TestDeltaBitPackEncodingTypes; +typedef ::testing::Types TestDeltaBitPackEncodingTypes; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); } } // namespace test From c3e517989bee8cafb242a04b836ccd0642b39a5e Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 10 Oct 2022 21:33:21 +0200 Subject: [PATCH 06/26] Review feedback. --- cpp/src/parquet/encoding.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 17087731fc4..5f79cb2771e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2141,10 +2141,6 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { FlushBlock(); } } - - if (values_current_block_ != 0) { - FlushBlock(); - } } template @@ -2198,6 +2194,10 @@ void DeltaBitPackEncoder::FlushBlock() { template std::shared_ptr DeltaBitPackEncoder::FlushValues() { + if (values_current_block_ != 0) { + FlushBlock(); + } + std::shared_ptr header_buffer = AllocateBuffer(pool_, 32); ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(), static_cast(header_buffer->size())); From cc1d74e06e569efdd38ab2cadc3843fd1b9b62de Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 10 Oct 2022 22:30:59 +0200 Subject: [PATCH 07/26] Fixing types --- cpp/src/arrow/util/bit_stream_utils.h | 2 +- cpp/src/parquet/encoding.cc | 34 ++++++++++++++------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 02746abc8f7..92c74db7698 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -69,7 +69,7 @@ class BitWriter { template bool PutAligned(T v, int num_bytes); - /// Writes v to givent pointer using num_bytes. If T is larger than + /// Writes v to given pointer using num_bytes. If T is larger than /// num_bytes, the extra high-order bytes will be ignored. Returns false if /// there was not enough space. /// Assume the v is stored in buffer_ as a litte-endian format diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5f79cb2771e..2733cb9585a 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2080,15 +2080,13 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())), deltas_(std::vector(values_per_block_)) {} std::shared_ptr FlushValues() override; - int64_t EstimatedDataEncodedSize() override { - return 4 * sizeof(uint64_t) + sink_.length(); - } + int64_t EstimatedDataEncodedSize() override { return 4 * sizeof(T) + sink_.length(); } using TypedEncoder::Put; @@ -2105,7 +2103,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; @@ -2122,14 +2120,13 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { return; } - int32_t idx = 0; + int idx = 0; if (total_value_count_ == 0) { - first_value_ = src[0]; current_value_ = src[0]; + first_value_ = current_value_; idx = 1; } total_value_count_ += num_values; - PARQUET_THROW_NOT_OK(sink_.Resize(total_value_count_ * sizeof(T), false)); while (idx < num_values) { T value = src[idx]; @@ -2154,12 +2151,15 @@ void DeltaBitPackEncoder::FlushBlock() { DCHECK(bit_writer_.PutZigZagVlqInt(min_delta)); uint8_t* bit_width_offsets = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); - DCHECK(bit_width_offsets != NULL); + DCHECK(bit_width_offsets != nullptr); for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { const uint32_t n = std::min(values_per_mini_block_, values_current_block_); if (n == 0) { - DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, 1, 1)); + DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets++, int8_t(32), 1)); + for (uint32_t j = 0; j < values_per_mini_block_; j++) { + DCHECK(bit_writer_.PutAligned(0, 4)); + } continue; } @@ -2168,26 +2168,28 @@ void DeltaBitPackEncoder::FlushBlock() { *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); - int64_t num_bits; + int8_t num_bits; if constexpr (std::is_same::value) { - num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); + num_bits = bit_util::NumRequiredBits(max_delta_diff); } else { num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); } - const int32_t num_bytes = static_cast(bit_util::CeilDiv(num_bits, 8)); - DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets + i, num_bits, 1)); + const int32_t num_bytes = + static_cast(bit_util::BytesForBits(num_bits)); + DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets++, num_bits, 1)); for (uint32_t j = start; j < start + n; j++) { const T value = SafeSignedSubtract(deltas_[j], min_delta); - DCHECK(bit_writer_.PutAligned(value, num_bytes)); + DCHECK(bit_writer_.PutAligned(value, num_bytes)); } for (uint32_t j = n; j < values_per_mini_block_; j++) { - DCHECK(bit_writer_.PutAligned(0, num_bytes)); + DCHECK(bit_writer_.PutAligned(0, num_bytes)); } values_current_block_ -= n; } DCHECK_EQ(values_current_block_, 0); + bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); } From 50e0b6fd54e99cbf43aca620d28c01e8db436c06 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 14 Oct 2022 22:37:03 +0200 Subject: [PATCH 08/26] Update cpp/src/arrow/util/bit_stream_utils.h Co-authored-by: Will Jones --- cpp/src/arrow/util/bit_stream_utils.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 92c74db7698..7955767adb0 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -69,10 +69,10 @@ class BitWriter { template bool PutAligned(T v, int num_bytes); - /// Writes v to given pointer using num_bytes. If T is larger than + /// Writes val to given pointer using num_bytes. If T is larger than /// num_bytes, the extra high-order bytes will be ignored. Returns false if /// there was not enough space. - /// Assume the v is stored in buffer_ as a litte-endian format + /// Assume the val is stored in buffer_ as a litte-endian format template bool PutAlignedOffset(uint8_t* ptr, T val, int num_bytes); From 4e11f4121d7a63a1d65e4887ef9d72fb0ffce688 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 10 Nov 2022 03:06:27 +0100 Subject: [PATCH 09/26] Review feedback. --- cpp/src/arrow/util/bit_stream_utils.h | 16 +--- cpp/src/parquet/encoding.cc | 123 ++++++++++++++------------ 2 files changed, 67 insertions(+), 72 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 7955767adb0..2f70c286503 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -69,13 +69,6 @@ class BitWriter { template bool PutAligned(T v, int num_bytes); - /// Writes val to given pointer using num_bytes. If T is larger than - /// num_bytes, the extra high-order bytes will be ignored. Returns false if - /// there was not enough space. - /// Assume the val is stored in buffer_ as a litte-endian format - template - bool PutAlignedOffset(uint8_t* ptr, T val, int num_bytes); - /// Write a Vlq encoded int to the buffer. Returns false if there was not enough /// room. The value is written byte aligned. /// For more details on vlq: @@ -256,19 +249,14 @@ inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) { } template -inline bool BitWriter::PutAlignedOffset(uint8_t* ptr, T val, int num_bytes) { +inline bool BitWriter::PutAligned(T val, int num_bytes) { + uint8_t* ptr = GetNextBytePtr(num_bytes); if (ptr == NULL) return false; val = arrow::bit_util::ToLittleEndian(val); memcpy(ptr, &val, num_bytes); return true; } -template -inline bool BitWriter::PutAligned(T val, int num_bytes) { - uint8_t* ptr = GetNextBytePtr(num_bytes); - return PutAlignedOffset(ptr, val, num_bytes); -} - namespace detail { inline void ResetBufferedValues_(const uint8_t* buffer, int byte_offset, diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2733cb9585a..03f7800b54a 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2065,30 +2065,34 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // ---------------------------------------------------------------------- // DeltaBitPackEncoder +constexpr uint32_t kValuesPerBlock = 128; +constexpr uint32_t kMiniBlocksPerBlock = 4; + template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { public: using T = typename DType::c_type; + using TypedEncoder::Put; - explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool) + explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool, + const uint32_t values_per_block = kValuesPerBlock, + const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock) : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool), - values_per_block_(128), - mini_blocks_per_block_(4), - values_per_mini_block_(values_per_block_ / mini_blocks_per_block_), - values_current_block_(0), - total_value_count_(0), - first_value_(0), - current_value_(0), + values_per_block_(values_per_block), + mini_blocks_per_block_(mini_blocks_per_block), + values_per_mini_block_(values_per_block / mini_blocks_per_block), + deltas_(values_per_block, ::arrow::stl::allocator(pool)), + bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))), sink_(pool), - bits_buffer_(AllocateBuffer(pool, (4 + values_per_block_) * sizeof(T))), - bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())), - deltas_(std::vector(values_per_block_)) {} + bit_writer_(bits_buffer_->mutable_data(), + static_cast(bits_buffer_->size())) { + // TODO: this does not evaluate in release build + DCHECK_EQ(values_per_mini_block_ % 32, 0); + } std::shared_ptr FlushValues() override; - int64_t EstimatedDataEncodedSize() override { return 4 * sizeof(T) + sink_.length(); } - - using TypedEncoder::Put; + int64_t EstimatedDataEncodedSize() override { return sink_.length(); } void Put(const ::arrow::Array& values) override; @@ -2097,21 +2101,20 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder deltas_; std::shared_ptr bits_buffer_; + ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; - std::vector deltas_; - - private: - void FlushBlock(); }; template @@ -2148,18 +2151,17 @@ void DeltaBitPackEncoder::FlushBlock() { const T min_delta = *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); - DCHECK(bit_writer_.PutZigZagVlqInt(min_delta)); + bit_writer_.PutZigZagVlqInt(min_delta); - uint8_t* bit_width_offsets = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); - DCHECK(bit_width_offsets != nullptr); + std::vector bit_widths(mini_blocks_per_block_); + uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); + DCHECK(bit_width_data != nullptr); for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { const uint32_t n = std::min(values_per_mini_block_, values_current_block_); - if (n == 0) { - DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets++, int8_t(32), 1)); - for (uint32_t j = 0; j < values_per_mini_block_; j++) { - DCHECK(bit_writer_.PutAligned(0, 4)); - } + + if (ARROW_PREDICT_FALSE(n == 0)) { + bit_widths[i] = 1; continue; } @@ -2168,71 +2170,76 @@ void DeltaBitPackEncoder::FlushBlock() { *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); - int8_t num_bits; + int num_bits; if constexpr (std::is_same::value) { num_bits = bit_util::NumRequiredBits(max_delta_diff); } else { num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); } - const int32_t num_bytes = - static_cast(bit_util::BytesForBits(num_bits)); - DCHECK(bit_writer_.PutAlignedOffset(bit_width_offsets++, num_bits, 1)); + const int num_bytes = static_cast(bit_util::BytesForBits(num_bits)); + bit_widths[i] = num_bits; for (uint32_t j = start; j < start + n; j++) { const T value = SafeSignedSubtract(deltas_[j], min_delta); - DCHECK(bit_writer_.PutAligned(value, num_bytes)); + bit_writer_.PutAligned(value, num_bytes); } for (uint32_t j = n; j < values_per_mini_block_; j++) { - DCHECK(bit_writer_.PutAligned(0, num_bytes)); + bit_writer_.PutAligned(0, num_bytes); } values_current_block_ -= n; } DCHECK_EQ(values_current_block_, 0); + for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { + T val = bit_util::ToLittleEndian(bit_widths[i]); + memcpy(bit_width_data + i, &val, 1); + } + bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); + bit_width_data = NULL; } template std::shared_ptr DeltaBitPackEncoder::FlushValues() { - if (values_current_block_ != 0) { + if (values_current_block_ > 0) { FlushBlock(); } - std::shared_ptr header_buffer = AllocateBuffer(pool_, 32); - ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(), - static_cast(header_buffer->size())); - if (!header_writer.PutVlqInt(values_per_block_) || - !header_writer.PutVlqInt(mini_blocks_per_block_) || - !header_writer.PutVlqInt(total_value_count_) || - !header_writer.PutZigZagVlqInt(first_value_)) { - throw ParquetException("cannot write"); - } - header_writer.Flush(false); - - ::arrow::BufferBuilder sink; - PARQUET_THROW_NOT_OK( - sink.Append(header_writer.buffer(), header_writer.bytes_written())); - header_writer.Clear(); + std::shared_ptr bit_buffer; + PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish()); + sink_.Reset(); - std::shared_ptr bits_buffer; - PARQUET_THROW_NOT_OK(sink_.Finish(&bits_buffer, true)); + if (!bit_writer_.PutVlqInt(values_per_block_) || + !bit_writer_.PutVlqInt(mini_blocks_per_block_) || + !bit_writer_.PutVlqInt(total_value_count_) || + !bit_writer_.PutZigZagVlqInt(first_value_)) { + throw ParquetException("header writing error"); + } + bit_writer_.Flush(); + PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size())); std::shared_ptr buffer; - PARQUET_THROW_NOT_OK(sink.Append(bits_buffer->mutable_data(), bits_buffer->size())); - PARQUET_THROW_NOT_OK(sink.Finish(&buffer, true)); + PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); return buffer; } template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + if (values.null_count() > 0) { + ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values"); + } auto src = values.data()->GetValues(1); Put(src, static_cast(values.length())); } template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + if (values.null_count() > 0) { + ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values"); + } auto src = values.data()->GetValues(1); Put(src, static_cast(values.length())); } From 8064b7fbbc1c81e9acd63d545173c740a9d3f790 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 11 Nov 2022 14:22:17 +0100 Subject: [PATCH 10/26] Update cpp/src/parquet/encoding.cc Co-authored-by: Gang Wu --- cpp/src/parquet/encoding.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 03f7800b54a..8601c179600 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2158,7 +2158,7 @@ void DeltaBitPackEncoder::FlushBlock() { DCHECK(bit_width_data != nullptr); for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - const uint32_t n = std::min(values_per_mini_block_, values_current_block_); + const uint32_t values_current_mini_block = std::min(values_per_mini_block_, values_current_block_); if (ARROW_PREDICT_FALSE(n == 0)) { bit_widths[i] = 1; From b29ab91517068d1452accd1a97342dd9f6084e23 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 11 Nov 2022 16:51:13 +0100 Subject: [PATCH 11/26] Review feedback --- cpp/src/parquet/encoding.cc | 57 ++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 8601c179600..5f810f43824 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2086,8 +2086,11 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())) { - // TODO: this does not evaluate in release build - DCHECK_EQ(values_per_mini_block_ % 32, 0); + if (values_per_mini_block_ % 32 != 0) { + throw ParquetException( + "the number of values in a miniblock must be multiple of 32, but it's " + + std::to_string(values_per_mini_block_)); + } } std::shared_ptr FlushValues() override; @@ -2153,21 +2156,22 @@ void DeltaBitPackEncoder::FlushBlock() { *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(min_delta); - std::vector bit_widths(mini_blocks_per_block_); + std::vector> bit_widths( + mini_blocks_per_block_, 0); uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); DCHECK(bit_width_data != nullptr); - for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - const uint32_t values_current_mini_block = std::min(values_per_mini_block_, values_current_block_); - - if (ARROW_PREDICT_FALSE(n == 0)) { - bit_widths[i] = 1; - continue; - } + uint32_t num_miniblocks = std::min( + static_cast(std::ceil(static_cast(values_current_block_) / + static_cast(values_per_mini_block_))), + mini_blocks_per_block_); + for (uint32_t i = 0; i < num_miniblocks; i++) { + const uint32_t values_current_mini_block = + std::min(values_per_mini_block_, values_current_block_); const uint32_t start = i * values_per_mini_block_; - const T max_delta = - *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); + const T max_delta = *std::max_element( + deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); int num_bits; @@ -2179,14 +2183,17 @@ void DeltaBitPackEncoder::FlushBlock() { const int num_bytes = static_cast(bit_util::BytesForBits(num_bits)); bit_widths[i] = num_bits; - for (uint32_t j = start; j < start + n; j++) { + for (uint32_t j = start; j < start + values_current_mini_block; j++) { const T value = SafeSignedSubtract(deltas_[j], min_delta); bit_writer_.PutAligned(value, num_bytes); } - for (uint32_t j = n; j < values_per_mini_block_; j++) { + // If there are not enough values to fill the last miniblock, we pad the miniblock + // with zeroes so that its length is the number of values in a full miniblock + // multiplied by the bit width. + for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { bit_writer_.PutAligned(0, num_bytes); } - values_current_block_ -= n; + values_current_block_ -= values_current_mini_block; } DCHECK_EQ(values_current_block_, 0); @@ -2228,20 +2235,24 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - if (values.null_count() > 0) { - ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values"); + const auto& data = *values.data(); + if (values.null_count() == 0) { + Put(data.GetValues(1), static_cast(values.length())); + } else { + PutSpaced(data.GetValues(1), static_cast(data.length), + data.GetValues(0, 0), data.offset); } - auto src = values.data()->GetValues(1); - Put(src, static_cast(values.length())); } template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - if (values.null_count() > 0) { - ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values"); + const auto& data = *values.data(); + if (values.null_count() == 0) { + Put(data.GetValues(1), static_cast(values.length())); + } else { + PutSpaced(data.GetValues(1), static_cast(data.length), + data.GetValues(0, 0), data.offset); } - auto src = values.data()->GetValues(1); - Put(src, static_cast(values.length())); } template From 09b8a321645dad675e867a14a99c558b2bf186b2 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 17 Nov 2022 12:41:40 +0100 Subject: [PATCH 12/26] Apply suggestions from code review Co-authored-by: Will Jones --- cpp/src/parquet/encoding.cc | 11 +++++------ cpp/src/parquet/encoding_test.cc | 2 ++ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5f810f43824..81afd066130 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2198,8 +2198,7 @@ void DeltaBitPackEncoder::FlushBlock() { DCHECK_EQ(values_current_block_, 0); for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - T val = bit_util::ToLittleEndian(bit_widths[i]); - memcpy(bit_width_data + i, &val, 1); + bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]); } bit_writer_.Flush(); @@ -2235,9 +2234,9 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - const auto& data = *values.data(); + const ArrayData& data = *values.data(); if (values.null_count() == 0) { - Put(data.GetValues(1), static_cast(values.length())); + Put(data.GetValues(1), static_cast(data.length)); } else { PutSpaced(data.GetValues(1), static_cast(data.length), data.GetValues(0, 0), data.offset); @@ -2246,9 +2245,9 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - const auto& data = *values.data(); + const ArrayData& data = *values.data(); if (values.null_count() == 0) { - Put(data.GetValues(1), static_cast(values.length())); + Put(data.GetValues(1), static_cast(data.length)); } else { PutSpaced(data.GetValues(1), static_cast(data.length), data.GetValues(0, 0), data.offset); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 36c6a3a390b..71b3122f9f8 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1331,6 +1331,8 @@ TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200)); + ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); + ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); } From a8c3d283ea5b3ce55153aca7d83e0526a9e7a3d5 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 17 Nov 2022 19:45:19 +0100 Subject: [PATCH 13/26] Review feedback --- cpp/src/parquet/encoding.cc | 60 +++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 81afd066130..cac4f0f7e2d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2065,9 +2065,44 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // ---------------------------------------------------------------------- // DeltaBitPackEncoder +constexpr uint32_t kMaxPageHeaderWriterSize = 32; constexpr uint32_t kValuesPerBlock = 128; constexpr uint32_t kMiniBlocksPerBlock = 4; +/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format +/// as per the parquet spec. See: +/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5 +/// +/// Consists of a header followed by blocks of delta encoded values binary packed. +/// +/// Format +/// [header] [block 1] [block 2] ... [block N] +/// +/// Header +/// [block size] [number of mini blocks per block] [total value count] [first value] +/// +/// Block +/// [min delta] [list of bitwidths of the mini blocks] [miniblocks] +/// +/// Sets aside bytes at the start of the internal buffer where the header will be written, +/// and only writes the header when FlushValues is called before returning it. +/// +/// To encode a block, we will: +/// +/// 1. Compute the differences between consecutive elements. For the first element in the +/// block, use the last element in the previous block or, in the case of the first block, +/// use the first value of the whole sequence, stored in the header. +/// +/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract +/// this min delta from all deltas in the block. This guarantees that all values are +/// non-negative. +/// +/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the +/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed +/// per mini block. +/// +/// Supports only INT32 and INT64. + template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { public: @@ -2082,10 +2117,15 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder(pool)), - bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))), + bits_buffer_(AllocateBuffer(pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))), sink_(pool), bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())) { + if (values_per_block_ % 32 != 0) { + throw ParquetException( + "the number of values in a block must be multiple of 128, but it's " + + std::to_string(values_per_block_)); + } if (values_per_mini_block_ % 32 != 0) { throw ParquetException( "the number of values in a miniblock must be multiple of 32, but it's " + @@ -2156,6 +2196,10 @@ void DeltaBitPackEncoder::FlushBlock() { *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(min_delta); + // If, in the last block, less than miniblocks are + // needed to store the values, the bytes storing the bit widths of the unneeded + // miniblocks are still present, their value should be zero, but readers must accept + // arbitrary values as well. std::vector> bit_widths( mini_blocks_per_block_, 0); uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); @@ -2180,6 +2224,7 @@ void DeltaBitPackEncoder::FlushBlock() { } else { num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); } + // The minimum number of bytes required to write any of value in deltas_ vector. const int num_bytes = static_cast(bit_util::BytesForBits(num_bits)); bit_widths[i] = num_bits; @@ -2187,8 +2232,8 @@ void DeltaBitPackEncoder::FlushBlock() { const T value = SafeSignedSubtract(deltas_[j], min_delta); bit_writer_.PutAligned(value, num_bytes); } - // If there are not enough values to fill the last miniblock, we pad the miniblock - // with zeroes so that its length is the number of values in a full miniblock + // If there are not enough values to fill the last mini block, we pad the mini block + // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { bit_writer_.PutAligned(0, num_bytes); @@ -2234,7 +2279,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - const ArrayData& data = *values.data(); + const ::arrow::ArrayData& data = *values.data(); if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { @@ -2245,7 +2290,7 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - const ArrayData& data = *values.data(); + const ::arrow::ArrayData& data = *values.data(); if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { @@ -2356,6 +2401,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder Date: Fri, 18 Nov 2022 18:04:29 +0100 Subject: [PATCH 14/26] Linting --- cpp/src/parquet/encoding.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index cac4f0f7e2d..5cc369561fc 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2117,7 +2117,8 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder(pool)), - bits_buffer_(AllocateBuffer(pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))), + bits_buffer_(AllocateBuffer( + pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))), sink_(pool), bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())) { From a7f1cea5a569ee740f3a5f86204905d37abdd090 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 21 Nov 2022 21:31:18 +0100 Subject: [PATCH 15/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/parquet/encoding.cc | 7 ++----- cpp/src/parquet/encoding_test.cc | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5cc369561fc..1ecbbf930cb 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2259,9 +2259,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { FlushBlock(); } - std::shared_ptr bit_buffer; - PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish()); - sink_.Reset(); + PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/ true)); if (!bit_writer_.PutVlqInt(values_per_block_) || !bit_writer_.PutVlqInt(mini_blocks_per_block_) || @@ -2273,8 +2271,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size())); - std::shared_ptr buffer; - PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/ true)); return buffer; } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 71b3122f9f8..67134535880 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1326,7 +1326,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { USING_BASE_MEMBERS(); }; -typedef ::testing::Types TestDeltaBitPackEncodingTypes; +using TestDeltaBitPackEncodingTypes = ::testing::Types; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { From 068ea81dca320af8f55742042b50b2ae802879b8 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 21 Nov 2022 21:54:01 +0100 Subject: [PATCH 16/26] Review feeback --- cpp/src/parquet/encoding.cc | 52 ++++++++++++++++++++------------ cpp/src/parquet/encoding_test.cc | 5 +-- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 1ecbbf930cb..a1b5f37c790 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2065,10 +2065,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // ---------------------------------------------------------------------- // DeltaBitPackEncoder -constexpr uint32_t kMaxPageHeaderWriterSize = 32; -constexpr uint32_t kValuesPerBlock = 128; -constexpr uint32_t kMiniBlocksPerBlock = 4; - /// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format /// as per the parquet spec. See: /// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5 @@ -2105,6 +2101,10 @@ constexpr uint32_t kMiniBlocksPerBlock = 4; template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { + static constexpr uint32_t kMaxPageHeaderWriterSize = 32; + static constexpr uint32_t kValuesPerBlock = 128; + static constexpr uint32_t kMiniBlocksPerBlock = 4; + public: using T = typename DType::c_type; using TypedEncoder::Put; @@ -2122,7 +2122,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())) { - if (values_per_block_ % 32 != 0) { + if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + std::to_string(values_per_block_)); @@ -2132,6 +2132,12 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder FlushValues() override; @@ -2177,6 +2183,11 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { while (idx < num_values) { T value = src[idx]; + // Calculate deltas. The possible overflow is handled by SafeSignedSubtract that + // internally uses unsigned integers making subtraction operations well defined + // and correct even in case of overflow. Encoded integes will wrap back around on + // decoding. + // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_); current_value_ = value; idx++; @@ -2201,15 +2212,15 @@ void DeltaBitPackEncoder::FlushBlock() { // needed to store the values, the bytes storing the bit widths of the unneeded // miniblocks are still present, their value should be zero, but readers must accept // arbitrary values as well. - std::vector> bit_widths( - mini_blocks_per_block_, 0); + std::vector bit_widths(mini_blocks_per_block_, 0); + // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write + // bit widths of miniblocks as they become known during the encoding. uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); DCHECK(bit_width_data != nullptr); - uint32_t num_miniblocks = std::min( - static_cast(std::ceil(static_cast(values_current_block_) / - static_cast(values_per_mini_block_))), - mini_blocks_per_block_); + uint32_t num_miniblocks = + static_cast(std::ceil(static_cast(values_current_block_) / + static_cast(values_per_mini_block_))); for (uint32_t i = 0; i < num_miniblocks; i++) { const uint32_t values_current_mini_block = std::min(values_per_mini_block_, values_current_block_); @@ -2218,18 +2229,16 @@ void DeltaBitPackEncoder::FlushBlock() { const T max_delta = *std::max_element( deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); + // See overflow comment above const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); - int num_bits; - if constexpr (std::is_same::value) { - num_bits = bit_util::NumRequiredBits(max_delta_diff); - } else { - num_bits = bit_util::NumRequiredBits(static_cast(max_delta_diff)); - } + const int num_bits = + bit_util::NumRequiredBits(static_cast>(max_delta_diff)); // The minimum number of bytes required to write any of value in deltas_ vector. const int num_bytes = static_cast(bit_util::BytesForBits(num_bits)); bit_widths[i] = num_bits; for (uint32_t j = start; j < start + values_current_mini_block; j++) { + // See overflow comment above const T value = SafeSignedSubtract(deltas_[j], min_delta); bit_writer_.PutAligned(value, num_bytes); } @@ -2244,7 +2253,7 @@ void DeltaBitPackEncoder::FlushBlock() { DCHECK_EQ(values_current_block_, 0); for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]); + bit_width_data[i] = bit_widths[i]; } bit_writer_.Flush(); @@ -2259,8 +2268,12 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { FlushBlock(); } - PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/ true)); + PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/true)); + if (bit_writer_.buffer_len() - bit_writer_.bytes_written() < + static_cast(kMaxPageHeaderWriterSize)) { + throw ParquetException("not enough space to write header"); + } if (!bit_writer_.PutVlqInt(values_per_block_) || !bit_writer_.PutVlqInt(mini_blocks_per_block_) || !bit_writer_.PutVlqInt(total_value_count_) || @@ -2269,6 +2282,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { } bit_writer_.Flush(); + PARQUET_THROW_NOT_OK(sink_.Reserve(bit_writer_.bytes_written() + bit_buffer->size())); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size())); PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/ true)); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 67134535880..d102abbf54a 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1285,7 +1285,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; - virtual void CheckRoundtrip() { + void CheckRoundtrip() override { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); @@ -1300,7 +1300,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); } - void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) { + void CheckRoundtripSpaced(const uint8_t* valid_bits, + int64_t valid_bits_offset) override { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); From 87eb6627660a18e66170316a637cc13fc07d2aaa Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 22 Nov 2022 16:51:35 +0100 Subject: [PATCH 17/26] Review feedback --- cpp/src/parquet/encoding.cc | 84 ++++++++++++++++---------------- cpp/src/parquet/encoding_test.cc | 51 ++++++++++++++++++- 2 files changed, 93 insertions(+), 42 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a1b5f37c790..650a289f6ba 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -56,7 +56,6 @@ using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; -using arrow::internal::SafeSignedAdd; using arrow::internal::SafeSignedSubtract; using std::string_view; @@ -2101,12 +2100,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { - static constexpr uint32_t kMaxPageHeaderWriterSize = 32; - static constexpr uint32_t kValuesPerBlock = 128; - static constexpr uint32_t kMiniBlocksPerBlock = 4; + static constexpr uint32_t kMaxPageHeaderWriterSize = 32; + static constexpr uint32_t kValuesPerBlock = 128; + static constexpr uint32_t kMiniBlocksPerBlock = 4; public: using T = typename DType::c_type; + using UT = std::make_unsigned_t; using TypedEncoder::Put; explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool, @@ -2117,11 +2117,11 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder(pool)), - bits_buffer_(AllocateBuffer( - pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))), + bits_buffer_( + AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))), sink_(pool), - bit_writer_(bits_buffer_->mutable_data(), - static_cast(bits_buffer_->size())) { + bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())), + bit_widths_(mini_blocks_per_block, 0) { if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + @@ -2138,6 +2138,8 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder FlushValues() override; @@ -2165,6 +2167,7 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; + std::vector bit_widths_; }; template @@ -2188,7 +2191,8 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { // and correct even in case of overflow. Encoded integes will wrap back around on // decoding. // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n - deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_); + deltas_[values_current_block_] = + static_cast(value) - static_cast(current_value_); current_value_ = value; idx++; values_current_block_++; @@ -2208,11 +2212,6 @@ void DeltaBitPackEncoder::FlushBlock() { *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(min_delta); - // If, in the last block, less than miniblocks are - // needed to store the values, the bytes storing the bit widths of the unneeded - // miniblocks are still present, their value should be zero, but readers must accept - // arbitrary values as well. - std::vector bit_widths(mini_blocks_per_block_, 0); // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write // bit widths of miniblocks as they become known during the encoding. uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); @@ -2231,29 +2230,30 @@ void DeltaBitPackEncoder::FlushBlock() { // See overflow comment above const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); - const int num_bits = - bit_util::NumRequiredBits(static_cast>(max_delta_diff)); - // The minimum number of bytes required to write any of value in deltas_ vector. - const int num_bytes = static_cast(bit_util::BytesForBits(num_bits)); - bit_widths[i] = num_bits; + // The minimum number of bits required to write any of values in deltas_ vector. + bit_widths_[i] = bit_util::NumRequiredBits(static_cast(max_delta_diff)); for (uint32_t j = start; j < start + values_current_mini_block; j++) { // See overflow comment above - const T value = SafeSignedSubtract(deltas_[j], min_delta); - bit_writer_.PutAligned(value, num_bytes); + auto value = static_cast(SafeSignedSubtract(deltas_[j], min_delta)); + bit_writer_.PutValue(value, bit_widths_[i]); } // If there are not enough values to fill the last mini block, we pad the mini block // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { - bit_writer_.PutAligned(0, num_bytes); + bit_writer_.PutValue(UT(0), bit_widths_[i]); } values_current_block_ -= values_current_mini_block; } DCHECK_EQ(values_current_block_, 0); + // If, in the last block, less than miniblocks are + // needed to store the values, the bytes storing the bit widths of the unneeded + // miniblocks are still present, their value should be zero, but readers must accept + // arbitrary values as well. for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - bit_width_data[i] = bit_widths[i]; + bit_width_data[i] = bit_widths_[i]; } bit_writer_.Flush(); @@ -2267,26 +2267,27 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { if (values_current_block_ > 0) { FlushBlock(); } + PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); - PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/true)); - - if (bit_writer_.buffer_len() - bit_writer_.bytes_written() < - static_cast(kMaxPageHeaderWriterSize)) { - throw ParquetException("not enough space to write header"); - } - if (!bit_writer_.PutVlqInt(values_per_block_) || - !bit_writer_.PutVlqInt(mini_blocks_per_block_) || - !bit_writer_.PutVlqInt(total_value_count_) || - !bit_writer_.PutZigZagVlqInt(first_value_)) { + uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; + bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + if (!header_writer.PutVlqInt(values_per_block_) || + !header_writer.PutVlqInt(mini_blocks_per_block_) || + !header_writer.PutVlqInt(total_value_count_) || + !header_writer.PutZigZagVlqInt(first_value_)) { throw ParquetException("header writing error"); } - bit_writer_.Flush(); + header_writer.Flush(); - PARQUET_THROW_NOT_OK(sink_.Reserve(bit_writer_.bytes_written() + bit_buffer->size())); - PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size())); - PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/ true)); - return buffer; + // We reserved enough space at the beginning of the buffer for largest possible header + // and data was written immediately after. We now write the header data immediately + // before the end of reserved space. + const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); + std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, + header_writer.bytes_written()); + + // Excess bytes at the beginning will are sliced off and ignored. + return SliceBuffer(buffer, offset_bytes); } template <> @@ -2488,8 +2489,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(min_delta_) + static_cast(buffer[i + j]); + buffer[i + j] = static_cast(delta + static_cast(last_value_)); last_value_ = buffer[i + j]; } values_current_mini_block_ -= values_decode; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index d102abbf54a..c76d3b1e08c 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -124,6 +124,12 @@ void GenerateData(int num_values, T* out, std::vector* heap) { std::numeric_limits::max(), out); } +template +void GenerateBoundData(int num_values, T* out, T min, T max, std::vector* heap) { + // seed the prng so failure is deterministic + random_numbers(num_values, 0, min, max, out); +} + template <> void GenerateData(int num_values, bool* out, std::vector* heap) { // seed the prng so failure is deterministic @@ -1285,6 +1291,40 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; + void InitBoundData(int nvalues, int repeats) { + num_values_ = nvalues * repeats; + input_bytes_.resize(num_values_ * sizeof(c_type)); + output_bytes_.resize(num_values_ * sizeof(c_type)); + draws_ = reinterpret_cast(input_bytes_.data()); + decode_buf_ = reinterpret_cast(output_bytes_.data()); + GenerateBoundData(nvalues, draws_, -10, 10, &data_buffer_); + + // add some repeated values + for (int j = 1; j < repeats; ++j) { + for (int i = 0; i < nvalues; ++i) { + draws_[nvalues * j + i] = draws_[i]; + } + } + } + + void ExecuteBound(int nvalues, int repeats) { + InitBoundData(nvalues, repeats); + CheckRoundtrip(); + } + + void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset, + double null_probability) { + InitBoundData(nvalues, repeats); + + int64_t size = num_values_ + valid_bits_offset; + auto rand = ::arrow::random::RandomArrayGenerator(1923); + const auto array = rand.UInt8(size, 0, 100, null_probability); + const auto valid_bits = array->null_bitmap_data(); + if (valid_bits) { + CheckRoundtripSpaced(valid_bits, valid_bits_offset); + } + } + void CheckRoundtrip() override { auto encoder = MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); @@ -1325,9 +1365,13 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { protected: USING_BASE_MEMBERS(); + std::vector input_bytes_; + std::vector output_bytes_; }; -using TestDeltaBitPackEncodingTypes = ::testing::Types; +// TODO: Int64Type +// using TestDeltaBitPackEncodingTypes = ::testing::Types; +using TestDeltaBitPackEncodingTypes = ::testing::Types; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { @@ -1336,6 +1380,11 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(0, 0)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); } } // namespace test From ed68e54e9c24f8fe35ed76310366be926d7dab09 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 23 Nov 2022 12:15:00 +0100 Subject: [PATCH 18/26] Enable int64 --- cpp/src/arrow/util/bit_stream_utils.h | 9 +++-- cpp/src/parquet/encoding.cc | 48 ++++++++++++--------------- cpp/src/parquet/encoding_test.cc | 4 +-- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 2f70c286503..285134a40db 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -204,8 +204,10 @@ class BitReader { inline bool BitWriter::PutValue(uint64_t v, int num_bits) { // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases) - DCHECK_LE(num_bits, 32); - DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits; + DCHECK_LE(num_bits, 64); + if (num_bits < 64) { + DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits; + } if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false; @@ -220,7 +222,8 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) { buffered_values_ = 0; byte_offset_ += 8; bit_offset_ -= 64; - buffered_values_ = v >> (num_bits - bit_offset_); + buffered_values_ = + (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_)); } DCHECK_LT(bit_offset_, 64); return true; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 650a289f6ba..156f89d18ab 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -56,7 +56,6 @@ using arrow::Status; using arrow::VisitNullBitmapInline; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; -using arrow::internal::SafeSignedSubtract; using std::string_view; template @@ -2161,9 +2160,9 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder deltas_; + UT first_value_{0}; + UT current_value_{0}; + ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; @@ -2185,14 +2184,12 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { total_value_count_ += num_values; while (idx < num_values) { - T value = src[idx]; - // Calculate deltas. The possible overflow is handled by SafeSignedSubtract that - // internally uses unsigned integers making subtraction operations well defined - // and correct even in case of overflow. Encoded integes will wrap back around on - // decoding. + UT value = static_cast(src[idx]); + // Calculate deltas. The possible overflow is handled by use of unsigned integers + // making subtraction operations well defined and correct even in case of overflow. + // Encoded integes will wrap back around on decoding. // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n - deltas_[values_current_block_] = - static_cast(value) - static_cast(current_value_); + deltas_[values_current_block_] = value - current_value_; current_value_ = value; idx++; values_current_block_++; @@ -2208,9 +2205,9 @@ void DeltaBitPackEncoder::FlushBlock() { return; } - const T min_delta = - *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); - bit_writer_.PutZigZagVlqInt(min_delta); + auto min_delta = static_cast( + *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_)); + bit_writer_.PutZigZagVlqInt(static_cast(min_delta)); // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write // bit widths of miniblocks as they become known during the encoding. @@ -2225,24 +2222,23 @@ void DeltaBitPackEncoder::FlushBlock() { std::min(values_per_mini_block_, values_current_block_); const uint32_t start = i * values_per_mini_block_; - const T max_delta = *std::max_element( - deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); + auto max_delta = static_cast(*std::max_element( + deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block)); - // See overflow comment above - const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); // The minimum number of bits required to write any of values in deltas_ vector. - bit_widths_[i] = bit_util::NumRequiredBits(static_cast(max_delta_diff)); + // See overflow comment above. + bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta); for (uint32_t j = start; j < start + values_current_mini_block; j++) { - // See overflow comment above - auto value = static_cast(SafeSignedSubtract(deltas_[j], min_delta)); + // See overflow comment above. + const UT value = deltas_[j] - min_delta; bit_writer_.PutValue(value, bit_widths_[i]); } // If there are not enough values to fill the last mini block, we pad the mini block // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { - bit_writer_.PutValue(UT(0), bit_widths_[i]); + bit_writer_.PutValue(0, bit_widths_[i]); } values_current_block_ -= values_current_mini_block; } @@ -2274,7 +2270,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || - !header_writer.PutZigZagVlqInt(first_value_)) { + !header_writer.PutZigZagVlqInt(static_cast(first_value_))) { throw ParquetException("header writing error"); } header_writer.Flush(); @@ -2340,6 +2336,7 @@ template class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder { public: typedef typename DType::c_type T; + using UT = std::make_unsigned_t; explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) @@ -2489,9 +2486,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(min_delta_) + static_cast(buffer[i + j]); - buffer[i + j] = static_cast(delta + static_cast(last_value_)); + buffer[i + j] = static_cast(min_delta_) + static_cast(buffer[i + j]) + + static_cast(last_value_); last_value_ = buffer[i + j]; } values_current_mini_block_ -= values_decode; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index c76d3b1e08c..e991d6bbbf7 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1369,9 +1369,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { std::vector output_bytes_; }; -// TODO: Int64Type -// using TestDeltaBitPackEncodingTypes = ::testing::Types; -using TestDeltaBitPackEncodingTypes = ::testing::Types; +using TestDeltaBitPackEncodingTypes = ::testing::Types; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { From 586c324d30c82092bfcf0c45e7e0288c45f88254 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 2 Dec 2022 21:01:31 +0100 Subject: [PATCH 19/26] Review feedback --- cpp/src/arrow/util/bit_stream_utils.h | 1 - cpp/src/parquet/column_writer_test.cc | 15 +++++++++++---- cpp/src/parquet/encoding.cc | 6 ++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 285134a40db..dc9b41793cf 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -203,7 +203,6 @@ class BitReader { }; inline bool BitWriter::PutValue(uint64_t v, int num_bits) { - // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases) DCHECK_LE(num_bits, 64); if (num_bits < 64) { DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits; diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 2cd21628b3f..0da78264832 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -400,7 +400,8 @@ typedef ::testing::Types; +using TestValuesWriterInt32Type = TestPrimitiveWriter; +using TestValuesWriterInt64Type = TestPrimitiveWriter; TYPED_TEST(TestPrimitiveWriter, RequiredPlain) { this->TestRequiredWithEncoding(Encoding::PLAIN); @@ -418,11 +419,17 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLE) { TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) { this->TestRequiredWithEncoding(Encoding::BIT_PACKED); } +*/ + +TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) { + this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); +} -TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) { +TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } +/* TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); } @@ -430,11 +437,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) { TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); } +*/ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY); } -*/ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, @@ -647,7 +654,7 @@ TEST(TestWriter, NullValuesBuffer) { // PARQUET-719 // Test case for NULL values -TEST_F(TestNullValuesWriter, OptionalNullValueChunk) { +TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) { this->SetUpSchema(Repetition::OPTIONAL); this->GenerateData(LARGE_SIZE); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 156f89d18ab..649dbae8b0d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3041,7 +3041,8 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin case Type::INT64: return std::unique_ptr(new DeltaBitPackEncoder(descr, pool)); default: - throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64"); + throw ParquetException( + "DELTA_BINARY_PACKED encoder only supports INT32 and INT64"); break; } } else { @@ -3091,7 +3092,8 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin case Type::INT64: return std::make_unique>(descr); default: - throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64"); + throw ParquetException( + "DELTA_BINARY_PACKED decoder only supports INT32 and INT64"); break; } } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { From c25ae527f66a3e41d78c901108d6ce5d77bf5ae0 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 6 Dec 2022 20:45:31 +0100 Subject: [PATCH 20/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/parquet/encoding.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 649dbae8b0d..dc8cc65ffc1 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2187,7 +2187,7 @@ void DeltaBitPackEncoder::Put(const T* src, int num_values) { UT value = static_cast(src[idx]); // Calculate deltas. The possible overflow is handled by use of unsigned integers // making subtraction operations well defined and correct even in case of overflow. - // Encoded integes will wrap back around on decoding. + // Encoded integers will wrap back around on decoding. // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n deltas_[values_current_block_] = value - current_value_; current_value_ = value; @@ -2214,7 +2214,7 @@ void DeltaBitPackEncoder::FlushBlock() { uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); DCHECK(bit_width_data != nullptr); - uint32_t num_miniblocks = + const uint32_t num_miniblocks = static_cast(std::ceil(static_cast(values_current_block_) / static_cast(values_per_mini_block_))); for (uint32_t i = 0; i < num_miniblocks; i++) { @@ -2227,7 +2227,7 @@ void DeltaBitPackEncoder::FlushBlock() { // The minimum number of bits required to write any of values in deltas_ vector. // See overflow comment above. - bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta); + const auto bit_width = bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta); for (uint32_t j = start; j < start + values_current_mini_block; j++) { // See overflow comment above. @@ -2282,7 +2282,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, header_writer.bytes_written()); - // Excess bytes at the beginning will are sliced off and ignored. + // Excess bytes at the beginning are sliced off and ignored. return SliceBuffer(buffer, offset_bytes); } From 62ae22565e62775451c7bd17460879661f7c69ac Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 7 Dec 2022 02:34:34 +0100 Subject: [PATCH 21/26] Review feedback. --- cpp/src/parquet/encoding.cc | 55 ++++++++++++++++++-------------- cpp/src/parquet/encoding_test.cc | 52 ++++++++++++++++++++---------- 2 files changed, 67 insertions(+), 40 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dc8cc65ffc1..bfc3c88f1f6 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2099,6 +2099,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { + // Maximum possible header size static constexpr uint32_t kMaxPageHeaderWriterSize = 32; static constexpr uint32_t kValuesPerBlock = 128; static constexpr uint32_t kMiniBlocksPerBlock = 4; @@ -2119,8 +2120,8 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncodermutable_data(), static_cast(bits_buffer_->size())), - bit_widths_(mini_blocks_per_block, 0) { + bit_writer_(bits_buffer_->mutable_data(), + static_cast(bits_buffer_->size())) { if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + @@ -2166,7 +2167,6 @@ class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder bits_buffer_; ::arrow::BufferBuilder sink_; ::arrow::bit_util::BitWriter bit_writer_; - std::vector bit_widths_; }; template @@ -2205,8 +2205,8 @@ void DeltaBitPackEncoder::FlushBlock() { return; } - auto min_delta = static_cast( - *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_)); + const UT min_delta = + *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(static_cast(min_delta)); // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write @@ -2222,40 +2222,36 @@ void DeltaBitPackEncoder::FlushBlock() { std::min(values_per_mini_block_, values_current_block_); const uint32_t start = i * values_per_mini_block_; - auto max_delta = static_cast(*std::max_element( - deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block)); + const UT max_delta = *std::max_element( + deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); // The minimum number of bits required to write any of values in deltas_ vector. // See overflow comment above. - const auto bit_width = bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta); + // If, in the last block, less than miniblocks are + // needed to store the values, the bytes storing the bit widths of the unneeded + // miniblocks are still present, their value should be zero, but readers must accept + // arbitrary values as well. + const auto bit_width = bit_width_data[i] = + bit_util::NumRequiredBits(max_delta - min_delta); for (uint32_t j = start; j < start + values_current_mini_block; j++) { // See overflow comment above. const UT value = deltas_[j] - min_delta; - bit_writer_.PutValue(value, bit_widths_[i]); + bit_writer_.PutValue(value, bit_width); } // If there are not enough values to fill the last mini block, we pad the mini block // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { - bit_writer_.PutValue(0, bit_widths_[i]); + bit_writer_.PutValue(0, bit_width); } values_current_block_ -= values_current_mini_block; } DCHECK_EQ(values_current_block_, 0); - // If, in the last block, less than miniblocks are - // needed to store the values, the bytes storing the bit widths of the unneeded - // miniblocks are still present, their value should be zero, but readers must accept - // arbitrary values as well. - for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { - bit_width_data[i] = bit_widths_[i]; - } - bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); - bit_width_data = NULL; } template @@ -2286,9 +2282,21 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { return SliceBuffer(buffer, offset_bytes); } +void AssertInteger(const ::arrow::Array& values) { + if (values.type_id() != ::arrow::Type::INT32 && + values.type_id() != ::arrow::Type::INT64) { + throw ParquetException("Delta bit pack encoding should only be for integer data."); + } +} + template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + AssertInteger(values); const ::arrow::ArrayData& data = *values.data(); + if (data.length % sizeof(int32_t) != 0) { + throw ParquetException("Data length must be multiple of length of int32."); + } + if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { @@ -2299,7 +2307,11 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { + AssertInteger(values); const ::arrow::ArrayData& data = *values.data(); + if (data.length % sizeof(int64_t) != 0) { + throw ParquetException("Data length must be multiple of length of int64."); + } if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { @@ -2308,11 +2320,6 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { } } -template -void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - ParquetException::NYI("direct put of " + values.type()->ToString()); -} - template void DeltaBitPackEncoder::PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index e991d6bbbf7..c3b793ecf60 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1291,13 +1291,13 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { using c_type = typename Type::c_type; static constexpr int TYPE = Type::type_num; - void InitBoundData(int nvalues, int repeats) { + void InitBoundData(int nvalues, int repeats, c_type half_range) { num_values_ = nvalues * repeats; input_bytes_.resize(num_values_ * sizeof(c_type)); output_bytes_.resize(num_values_ * sizeof(c_type)); draws_ = reinterpret_cast(input_bytes_.data()); decode_buf_ = reinterpret_cast(output_bytes_.data()); - GenerateBoundData(nvalues, draws_, -10, 10, &data_buffer_); + GenerateBoundData(nvalues, draws_, -half_range, half_range, &data_buffer_); // add some repeated values for (int j = 1; j < repeats; ++j) { @@ -1307,22 +1307,20 @@ class TestDeltaBitPackEncoding : public TestEncodingBase { } } - void ExecuteBound(int nvalues, int repeats) { - InitBoundData(nvalues, repeats); + void ExecuteBound(int nvalues, int repeats, c_type half_range) { + InitBoundData(nvalues, repeats, half_range); CheckRoundtrip(); } void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset, - double null_probability) { - InitBoundData(nvalues, repeats); + double null_probability, c_type half_range) { + InitBoundData(nvalues, repeats, half_range); int64_t size = num_values_ + valid_bits_offset; auto rand = ::arrow::random::RandomArrayGenerator(1923); const auto array = rand.UInt8(size, 0, 100, null_probability); const auto valid_bits = array->null_bitmap_data(); - if (valid_bits) { - CheckRoundtripSpaced(valid_bits, valid_bits_offset); - } + CheckRoundtripSpaced(valid_bits, valid_bits_offset); } void CheckRoundtrip() override { @@ -1373,16 +1371,38 @@ using TestDeltaBitPackEncodingTypes = ::testing::Types; TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { - ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200)); + using T = typename TypeParam::c_type; + int values_per_block = 128; + int values_per_mini_block = 32; + + // Size a multiple of miniblock size + ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_mini_block * 10, 10)); + // Size a multiple of block size + ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_block * 10, 20)); + // Size multiple of neither miniblock or block size + ASSERT_NO_FATAL_FAILURE( + this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); - ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced( - /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(0, 0)); - ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000)); + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, + /*null_probability*/ 0.1)); + + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000, 0)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( - /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1)); + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, + /*null_probability*/ 0.1, + /*half_range*/ 0)); + + int max_bitwidth = bit_util::NumRequiredBits(std::numeric_limits::max()) / 2; + for (int half_range_bitwidth = 0; half_range_bitwidth < max_bitwidth; + half_range_bitwidth += 8) { + T half_range = exp2(half_range_bitwidth); + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range)); + ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( + /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, + /*null_probability*/ 0.1, + /*half_range*/ half_range)); + } } } // namespace test From 90999e1b1e12d649ebef6343e05c0825df9e73c6 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 7 Dec 2022 15:46:14 +0100 Subject: [PATCH 22/26] Adding bitwidths for unpopulated blocks. --- cpp/src/parquet/encoding.cc | 12 ++++++++---- cpp/src/parquet/encoding_test.cc | 11 ++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index bfc3c88f1f6..1a0965f20ee 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2227,10 +2227,6 @@ void DeltaBitPackEncoder::FlushBlock() { // The minimum number of bits required to write any of values in deltas_ vector. // See overflow comment above. - // If, in the last block, less than miniblocks are - // needed to store the values, the bytes storing the bit widths of the unneeded - // miniblocks are still present, their value should be zero, but readers must accept - // arbitrary values as well. const auto bit_width = bit_width_data[i] = bit_util::NumRequiredBits(max_delta - min_delta); @@ -2247,6 +2243,14 @@ void DeltaBitPackEncoder::FlushBlock() { } values_current_block_ -= values_current_mini_block; } + + // If, in the last block, less than miniblocks are + // needed to store the values, the bytes storing the bit widths of the unneeded + // miniblocks are still present, their value should be zero, but readers must accept + // arbitrary values as well. + for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) { + bit_width_data[i] = 0; + } DCHECK_EQ(values_current_block_, 0); bit_writer_.Flush(); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index c3b793ecf60..2b26a13c2e7 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1378,8 +1378,8 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { // Size a multiple of miniblock size ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_mini_block * 10, 10)); // Size a multiple of block size - ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_block * 10, 20)); - // Size multiple of neither miniblock or block size + ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_block * 10, 10)); + // Size multiple of neither miniblock nor block size ASSERT_NO_FATAL_FAILURE( this->Execute((values_per_mini_block * values_per_block) + 1, 10)); ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0)); @@ -1393,10 +1393,11 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { /*null_probability*/ 0.1, /*half_range*/ 0)); - int max_bitwidth = bit_util::NumRequiredBits(std::numeric_limits::max()) / 2; - for (int half_range_bitwidth = 0; half_range_bitwidth < max_bitwidth; + auto max_bitwidth = + std::round(bit_util::NumRequiredBits(std::numeric_limits::max()) / 2); + for (T half_range_bitwidth = 0; half_range_bitwidth < max_bitwidth; half_range_bitwidth += 8) { - T half_range = exp2(half_range_bitwidth); + auto half_range = std::round(exp2(half_range_bitwidth)); ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, From e1000af3fbb1054531d8dfbdc508ac6f8246e8a6 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 12 Dec 2022 21:51:10 +0100 Subject: [PATCH 23/26] Review feedback --- cpp/src/arrow/util/rle_encoding_test.cc | 33 +++++++++++++++++++++++++ cpp/src/parquet/encoding_test.cc | 9 +++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index 52f355daf21..5a68e9a02eb 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -173,6 +173,39 @@ TEST(BitArray, TestMixed) { } } +// Writes 'num_vals' values with width 'bit_width' and reads them back. +static void TestPutValue(int bit_width, uint64_t num_vals) { + const uint64_t max = uint64_t(2) << (bit_width - 1); + num_vals = std::min(num_vals, max); + int len = static_cast(bit_util::BytesForBits(bit_width * num_vals)); + EXPECT_GT(len, 0); + + std::vector buffer(len); + bit_util::BitWriter writer(buffer.data(), len); + for (uint64_t i = max - num_vals; i < max; i++) { + bool result = writer.PutValue(i, bit_width); + EXPECT_TRUE(result); + } + writer.Flush(); + EXPECT_EQ(writer.bytes_written(), len); + + bit_util::BitReader reader(buffer.data(), len); + for (uint64_t i = max - num_vals; i < max; i++) { + int64_t val = 0; + bool result = reader.GetValue(bit_width, &val); + EXPECT_TRUE(result); + EXPECT_EQ(val, i); + } + EXPECT_EQ(reader.bytes_left(), 0); +} + +TEST(BitUtil, RoundTripIntValues) { + for (int width = 1; width < 64; width++) { + TestPutValue(width, 1); + TestPutValue(width, 1024); + } +} + // Validates encoding of values by encoding and decoding them. If // expected_encoding != NULL, also validates that the encoded buffer is // exactly 'expected_encoding'. diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 2b26a13c2e7..8cde41e09cc 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1393,11 +1393,10 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { /*null_probability*/ 0.1, /*half_range*/ 0)); - auto max_bitwidth = - std::round(bit_util::NumRequiredBits(std::numeric_limits::max()) / 2); - for (T half_range_bitwidth = 0; half_range_bitwidth < max_bitwidth; - half_range_bitwidth += 8) { - auto half_range = std::round(exp2(half_range_bitwidth)); + const uint32_t half_bitwidth = sizeof(T) * 4; + for (uint32_t bitwidth = 0; bitwidth < half_bitwidth; bitwidth += 4) { + T half_range = 2 << bitwidth; + ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, From 702b7a80857a3ffaf91c3989872c9f7f63bff621 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 13 Dec 2022 19:26:57 +0100 Subject: [PATCH 24/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/arrow/util/rle_encoding_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index 5a68e9a02eb..42edf221aa6 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -173,7 +173,7 @@ TEST(BitArray, TestMixed) { } } -// Writes 'num_vals' values with width 'bit_width' and reads them back. +// Write up to 'num_vals' values with width 'bit_width' and reads them back. static void TestPutValue(int bit_width, uint64_t num_vals) { const uint64_t max = uint64_t(2) << (bit_width - 1); num_vals = std::min(num_vals, max); @@ -200,7 +200,7 @@ static void TestPutValue(int bit_width, uint64_t num_vals) { } TEST(BitUtil, RoundTripIntValues) { - for (int width = 1; width < 64; width++) { + for (int width = 1; width <= 64; width++) { TestPutValue(width, 1); TestPutValue(width, 1024); } From d46251c394fff186f57a0027f11e090a92ae5e61 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 13 Dec 2022 19:45:09 +0100 Subject: [PATCH 25/26] Review feedback --- cpp/src/arrow/util/rle_encoding_test.cc | 2 +- cpp/src/parquet/encoding.cc | 25 ++++++++++++------------- cpp/src/parquet/encoding_test.cc | 7 ++++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index 42edf221aa6..9b904ea7b61 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -200,7 +200,7 @@ static void TestPutValue(int bit_width, uint64_t num_vals) { } TEST(BitUtil, RoundTripIntValues) { - for (int width = 1; width <= 64; width++) { + for (int width = 1; width < 64; width++) { TestPutValue(width, 1); TestPutValue(width, 1024); } diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 1a0965f20ee..4923870e9e6 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2286,19 +2286,15 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { return SliceBuffer(buffer, offset_bytes); } -void AssertInteger(const ::arrow::Array& values) { - if (values.type_id() != ::arrow::Type::INT32 && - values.type_id() != ::arrow::Type::INT64) { - throw ParquetException("Delta bit pack encoding should only be for integer data."); - } -} - template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - AssertInteger(values); const ::arrow::ArrayData& data = *values.data(); - if (data.length % sizeof(int32_t) != 0) { - throw ParquetException("Data length must be multiple of length of int32."); + if (values.type_id() != ::arrow::Type::INT32) { + throw ParquetException("Expected Int32TArray, got ", values.type()->ToString()); + } + if (data.length > std::numeric_limits::max()) { + throw ParquetException("Array cannot be longer than ", + std::numeric_limits::max()); } if (values.null_count() == 0) { @@ -2311,10 +2307,13 @@ void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { template <> void DeltaBitPackEncoder::Put(const ::arrow::Array& values) { - AssertInteger(values); const ::arrow::ArrayData& data = *values.data(); - if (data.length % sizeof(int64_t) != 0) { - throw ParquetException("Data length must be multiple of length of int64."); + if (values.type_id() != ::arrow::Type::INT64) { + throw ParquetException("Expected Int64TArray, got ", values.type()->ToString()); + } + if (data.length > std::numeric_limits::max()) { + throw ParquetException("Array cannot be longer than ", + std::numeric_limits::max()); } if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 8cde41e09cc..5f788fcd7d0 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1372,6 +1372,7 @@ TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { using T = typename TypeParam::c_type; + using UT = std::make_unsigned_t; int values_per_block = 128; int values_per_mini_block = 32; @@ -1393,9 +1394,9 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { /*null_probability*/ 0.1, /*half_range*/ 0)); - const uint32_t half_bitwidth = sizeof(T) * 4; - for (uint32_t bitwidth = 0; bitwidth < half_bitwidth; bitwidth += 4) { - T half_range = 2 << bitwidth; + const uint32_t max_bitwidth = sizeof(T) * 8; + for (uint32_t bitwidth = 4; bitwidth <= max_bitwidth; bitwidth += 4) { + UT half_range = (UT(1) << (bitwidth - 1)) / 2; ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound( From 01271e75351458aa1be6ddc5aada2552ed89d249 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 14 Dec 2022 15:08:45 +0100 Subject: [PATCH 26/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/arrow/util/rle_encoding_test.cc | 3 ++- cpp/src/parquet/encoding_test.cc | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index 9b904ea7b61..01d1ffd767f 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -175,7 +175,8 @@ TEST(BitArray, TestMixed) { // Write up to 'num_vals' values with width 'bit_width' and reads them back. static void TestPutValue(int bit_width, uint64_t num_vals) { - const uint64_t max = uint64_t(2) << (bit_width - 1); + // The max value representable in `bit_width` bits. + const uint64_t max = std::numeric_limits::max() >> (64 - bit_width); num_vals = std::min(num_vals, max); int len = static_cast(bit_util::BytesForBits(bit_width * num_vals)); EXPECT_GT(len, 0); diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 5f788fcd7d0..f0a5f32c413 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -1372,7 +1372,6 @@ TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes); TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { using T = typename TypeParam::c_type; - using UT = std::make_unsigned_t; int values_per_block = 128; int values_per_mini_block = 32; @@ -1394,9 +1393,12 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) { /*null_probability*/ 0.1, /*half_range*/ 0)); - const uint32_t max_bitwidth = sizeof(T) * 8; - for (uint32_t bitwidth = 4; bitwidth <= max_bitwidth; bitwidth += 4) { - UT half_range = (UT(1) << (bitwidth - 1)) / 2; + const int max_bitwidth = sizeof(T) * 8; + std::vector bitwidths = { + 1, 2, 3, 5, 8, 11, 16, max_bitwidth - 8, max_bitwidth - 1, max_bitwidth}; + for (int bitwidth : bitwidths) { + T half_range = + std::numeric_limits::max() >> static_cast(max_bitwidth - bitwidth); ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range)); ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(