From fb83b3813512d49295572a8cb332748778f5f3dd Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 24 Feb 2023 23:44:59 +0800 Subject: [PATCH 1/3] Parquet: Optimize Decoding DELTA_LENGTH_BYTE_ARRAY --- cpp/src/parquet/encoding.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 44f02b24261..9a4ea7e0944 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2715,7 +2715,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) { num_values_ = num_values; - decoder_ = decoder; + decoder_ = std::move(decoder); DecodeLengths(); } @@ -2736,14 +2736,14 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, throw ParquetException("negative string delta length"); } buffer[i].len = len; - if (AddWithOverflow(data_size, len, &data_size)) { + if (ARROW_PREDICT_FALSE(AddWithOverflow(data_size, len, &data_size))) { throw ParquetException("excess expansion in DELTA_(LENGTH_)BYTE_ARRAY"); } } length_idx_ += max_values; PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); - if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) { + if (ARROW_PREDICT_FALSE(!decoder_->Advance(data_size * 8))) { ParquetException::EofException(); } const uint8_t* data_ptr = buffered_data_->data(); From 620828af6fe4474861acac79157deb46f62c1635 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 25 Feb 2023 00:02:24 +0800 Subject: [PATCH 2/3] zero-copy read --- cpp/src/arrow/util/bit_stream_utils.h | 6 +++++- cpp/src/parquet/encoding.cc | 18 +++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 774d0df54b4..1e36075e8c1 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -178,11 +178,15 @@ class BitReader { /// Returns the number of bytes left in the stream, not including the current /// byte (i.e., there may be an additional fraction of a byte). - int bytes_left() { + int bytes_left() const { return max_bytes_ - (byte_offset_ + static_cast(bit_util::BytesForBits(bit_offset_))); } + const uint8_t* begins() const { return buffer_; } + + int64_t sum_bit_offsets() const { return byte_offset_ * 8 + bit_offset_; } + /// Maximum byte length of a vlq encoded int static constexpr int kMaxVlqByteLength = 5; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 9a4ea7e0944..f9ab7fa9f8e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2703,8 +2703,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr, pool), - buffered_length_(AllocateBuffer(pool, 0)), - buffered_data_(AllocateBuffer(pool, 0)) {} + buffered_length_(AllocateBuffer(pool, 0)) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -2713,6 +2712,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, DecodeLengths(); } + // SetDecoder will be used by DeltaByteArrayDecoder. void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) { num_values_ = num_values; decoder_ = std::move(decoder); @@ -2742,15 +2742,20 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, } length_idx_ += max_values; - PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); + const uint8_t* begin = decoder_->begins(); + int64_t current_bits_offset = decoder_->sum_bit_offsets(); + if (ARROW_PREDICT_FALSE(current_bits_offset % 8 != 0)) { + throw ParquetException("Invalid DELTA_(LENGTH_)BYTE_ARRAY"); + } + int64_t current_bytes_offset = current_bits_offset / 8; + // Check overflow if (ARROW_PREDICT_FALSE(!decoder_->Advance(data_size * 8))) { ParquetException::EofException(); } - const uint8_t* data_ptr = buffered_data_->data(); for (int i = 0; i < max_values; ++i) { - buffer[i].ptr = data_ptr; - data_ptr += buffer[i].len; + buffer[i].ptr = begin + current_bytes_offset; + current_bytes_offset += buffer[i].len; } this->num_values_ -= max_values; num_valid_values_ -= max_values; @@ -2835,7 +2840,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int num_valid_values_; uint32_t length_idx_; std::shared_ptr buffered_length_; - std::shared_ptr buffered_data_; }; // ---------------------------------------------------------------------- From dc33d366895e77d7956444cc12e6e60d5804275b Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 25 Feb 2023 00:25:50 +0800 Subject: [PATCH 3/3] fix ub --- cpp/src/parquet/encoding.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f9ab7fa9f8e..48af39b237c 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2749,7 +2749,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, } int64_t current_bytes_offset = current_bits_offset / 8; // Check overflow - if (ARROW_PREDICT_FALSE(!decoder_->Advance(data_size * 8))) { + if (ARROW_PREDICT_FALSE(!decoder_->Advance(static_cast(data_size) * 8))) { ParquetException::EofException(); }