Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,15 @@ class BitReader {

/// Returns the number of bytes left in the stream, not including the current
/// byte (i.e., there may be an additional fraction of a byte).
int bytes_left() {
int bytes_left() const {
return max_bytes_ -
(byte_offset_ + static_cast<int>(bit_util::BytesForBits(bit_offset_)));
}

/// Returns buffer_, the byte pointer this reader holds, without changing any
/// reader state.
const uint8_t* begins() const {
  return buffer_;
}

/// Returns the reader's current position expressed in bits, i.e.
/// byte_offset_ * 8 + bit_offset_.
///
/// byte_offset_ is widened to int64_t *before* the multiplication so the
/// product cannot overflow a narrower integer type for large byte offsets
/// (NOTE(review): byte_offset_ looks like an int, judging by bytes_left()).
int64_t sum_bit_offsets() const {
  return static_cast<int64_t>(byte_offset_) * 8 + bit_offset_;
}

/// Maximum byte length of a vlq encoded int
static constexpr int kMaxVlqByteLength = 5;

Expand Down
24 changes: 14 additions & 10 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2703,8 +2703,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool = ::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
buffered_length_(AllocateBuffer(pool, 0)),
buffered_data_(AllocateBuffer(pool, 0)) {}
buffered_length_(AllocateBuffer(pool, 0)) {}

void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
Expand All @@ -2713,9 +2712,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
DecodeLengths();
}

// SetDecoder will be used by DeltaByteArrayDecoder.
//
// Records the number of values, adopts the supplied BitReader as decoder_,
// and then calls DecodeLengths().
void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
  num_values_ = num_values;
  // `decoder` is a by-value sink parameter: move it into the member once
  // rather than copying first (a shared_ptr copy costs a refcount update).
  decoder_ = std::move(decoder);
  DecodeLengths();
}

Expand All @@ -2736,21 +2736,26 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
throw ParquetException("negative string delta length");
}
buffer[i].len = len;
if (AddWithOverflow(data_size, len, &data_size)) {
if (ARROW_PREDICT_FALSE(AddWithOverflow(data_size, len, &data_size))) {
throw ParquetException("excess expansion in DELTA_(LENGTH_)BYTE_ARRAY");
}
}
length_idx_ += max_values;

PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) {
const uint8_t* begin = decoder_->begins();
int64_t current_bits_offset = decoder_->sum_bit_offsets();
if (ARROW_PREDICT_FALSE(current_bits_offset % 8 != 0)) {
throw ParquetException("Invalid DELTA_(LENGTH_)BYTE_ARRAY");
}
int64_t current_bytes_offset = current_bits_offset / 8;
// Skip past the data_size bytes of string data; report EOF if the reader cannot advance that far
if (ARROW_PREDICT_FALSE(!decoder_->Advance(static_cast<int64_t>(data_size) * 8))) {
ParquetException::EofException();
}
const uint8_t* data_ptr = buffered_data_->data();

for (int i = 0; i < max_values; ++i) {
buffer[i].ptr = data_ptr;
data_ptr += buffer[i].len;
buffer[i].ptr = begin + current_bytes_offset;
current_bytes_offset += buffer[i].len;
}
this->num_values_ -= max_values;
num_valid_values_ -= max_values;
Expand Down Expand Up @@ -2835,7 +2840,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int num_valid_values_;
uint32_t length_idx_;
std::shared_ptr<ResizableBuffer> buffered_length_;
std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
Expand Down