diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 774d0df54b4..1e36075e8c1 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -178,11 +178,15 @@ class BitReader { /// Returns the number of bytes left in the stream, not including the current /// byte (i.e., there may be an additional fraction of a byte). - int bytes_left() { + int bytes_left() const { return max_bytes_ - (byte_offset_ + static_cast(bit_util::BytesForBits(bit_offset_))); } + const uint8_t* begins() const { return buffer_; } + + int64_t sum_bit_offsets() const { return byte_offset_ * 8 + bit_offset_; } + /// Maximum byte length of a vlq encoded int static constexpr int kMaxVlqByteLength = 5; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 44f02b24261..48af39b237c 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2703,8 +2703,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr, pool), - buffered_length_(AllocateBuffer(pool, 0)), - buffered_data_(AllocateBuffer(pool, 0)) {} + buffered_length_(AllocateBuffer(pool, 0)) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -2713,9 +2712,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, DecodeLengths(); } + // SetDecoder will be used by DeltaByteArrayDecoder. void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) { num_values_ = num_values; - decoder_ = decoder; + decoder_ = std::move(decoder); DecodeLengths(); } @@ -2736,21 +2736,26 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, throw ParquetException("negative string delta length"); } buffer[i].len = len; - if (AddWithOverflow(data_size, len, &data_size)) { + if (ARROW_PREDICT_FALSE(AddWithOverflow(data_size, len, &data_size))) { throw ParquetException("excess expansion in DELTA_(LENGTH_)BYTE_ARRAY"); } } length_idx_ += max_values; - PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); - if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) { + const uint8_t* begin = decoder_->begins(); + int64_t current_bits_offset = decoder_->sum_bit_offsets(); + if (ARROW_PREDICT_FALSE(current_bits_offset % 8 != 0)) { + throw ParquetException("Invalid DELTA_(LENGTH_)BYTE_ARRAY"); + } + int64_t current_bytes_offset = current_bits_offset / 8; + // Check overflow + if (ARROW_PREDICT_FALSE(!decoder_->Advance(static_cast(data_size) * 8))) { ParquetException::EofException(); } - const uint8_t* data_ptr = buffered_data_->data(); for (int i = 0; i < max_values; ++i) { - buffer[i].ptr = data_ptr; - data_ptr += buffer[i].len; + buffer[i].ptr = begin + current_bytes_offset; + current_bytes_offset += buffer[i].len; } this->num_values_ -= max_values; num_valid_values_ -= max_values; @@ -2835,7 +2840,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int num_valid_values_; uint32_t length_idx_; std::shared_ptr buffered_length_; - std::shared_ptr buffered_data_; }; // ----------------------------------------------------------------------