Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

#pragma once

#include <string.h>

#include <algorithm>
#include <cstdint>
#include <cstring>

#include "arrow/util/bit_util.h"
#include "arrow/util/bpacking.h"
Expand Down Expand Up @@ -153,7 +152,7 @@ class BitReader {
/// 'num_bytes'. The value is assumed to be byte-aligned so the stream will
/// be advanced to the start of the next byte before 'v' is read. Returns
/// false if there are not enough bytes left.
/// Assume the v was stored in buffer_ as a litte-endian format
/// Assume the v was stored in buffer_ as a little-endian format
template <typename T>
bool GetAligned(int num_bytes, T* v);

Expand Down Expand Up @@ -318,7 +317,7 @@ inline bool BitReader::GetValue(int num_bits, T* v) {
template <typename T>
inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
DCHECK(buffer_ != NULL);
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8)) << "num_bits: " << num_bits;

int bit_offset = bit_offset_;
int byte_offset = byte_offset_;
Expand Down
73 changes: 44 additions & 29 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2377,8 +2377,8 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
}

int ValidValuesCount() {
// total_value_count_ in header ignores of null values
return static_cast<int>(total_value_count_);
// total_values_remaining_ in header ignores of null values
return static_cast<int>(total_values_remaining_);
}

int Decode(T* buffer, int max_values) override {
Expand Down Expand Up @@ -2420,7 +2420,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
!decoder_->GetVlqInt(&mini_blocks_per_block_) ||
!decoder_->GetVlqInt(&total_value_count_) ||
!decoder_->GetZigZagVlqInt(&last_value_)) {
ParquetException::EofException();
ParquetException::EofException("InitHeader EOF");
}

if (values_per_block_ == 0) {
Expand All @@ -2444,42 +2444,52 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
std::to_string(values_per_mini_block_));
}

total_values_remaining_ = total_value_count_;
delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
block_initialized_ = false;
values_current_mini_block_ = 0;
first_block_initialized_ = false;
values_remaining_current_mini_block_ = 0;
}

void InitBlock() {
if (!decoder_->GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
DCHECK_GT(total_values_remaining_, 0) << "InitBlock called at EOF";

if (!decoder_->GetZigZagVlqInt(&min_delta_))
ParquetException::EofException("InitBlock EOF");

// read the bitwidth of each miniblock
uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
if (!decoder_->GetAligned<uint8_t>(1, bit_width_data + i)) {
ParquetException::EofException();
}
if (bit_width_data[i] > kMaxDeltaBitWidth) {
throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
" larger than integer bit width " +
std::to_string(kMaxDeltaBitWidth));
ParquetException::EofException("Decode bit-width EOF");
}
// Note that non-conformant bitwidth entries are allowed by the Parquet spec
// for extraneous miniblocks in the last block (GH-14923), so we check
// the bitwidths when actually using them (see InitMiniBlock()).
}

mini_block_idx_ = 0;
delta_bit_width_ = bit_width_data[0];
values_current_mini_block_ = values_per_mini_block_;
block_initialized_ = true;
first_block_initialized_ = true;
InitMiniBlock(bit_width_data[0]);
}

void InitMiniBlock(int bit_width) {
if (ARROW_PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) {
throw ParquetException("delta bit width larger than integer bit width");
}
delta_bit_width_ = bit_width;
values_remaining_current_mini_block_ = values_per_mini_block_;
}

int GetInternal(T* buffer, int max_values) {
max_values = static_cast<int>(std::min<int64_t>(max_values, total_value_count_));
max_values = static_cast<int>(std::min<int64_t>(max_values, total_values_remaining_));
if (max_values == 0) {
return 0;
}

int i = 0;
while (i < max_values) {
if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!block_initialized_)) {
if (ARROW_PREDICT_FALSE(values_remaining_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!first_block_initialized_)) {
buffer[i++] = last_value_;
DCHECK_EQ(i, 1); // we're at the beginning of the page
if (ARROW_PREDICT_FALSE(i == max_values)) {
Expand All @@ -2499,16 +2509,15 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
} else {
++mini_block_idx_;
if (mini_block_idx_ < mini_blocks_per_block_) {
delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
values_current_mini_block_ = values_per_mini_block_;
InitMiniBlock(delta_bit_widths_->data()[mini_block_idx_]);
} else {
InitBlock();
}
}
}

int values_decode =
std::min(values_current_mini_block_, static_cast<uint32_t>(max_values - i));
int values_decode = std::min(values_remaining_current_mini_block_,
static_cast<uint32_t>(max_values - i));
if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
values_decode) {
ParquetException::EofException();
Expand All @@ -2520,19 +2529,19 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
static_cast<UT>(last_value_);
last_value_ = buffer[i + j];
}
values_current_mini_block_ -= values_decode;
values_remaining_current_mini_block_ -= values_decode;
i += values_decode;
}
total_value_count_ -= max_values;
total_values_remaining_ -= max_values;
this->num_values_ -= max_values;

if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) {
uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_;
if (ARROW_PREDICT_FALSE(total_values_remaining_ == 0)) {
uint32_t padding_bits = values_remaining_current_mini_block_ * delta_bit_width_;
// skip the padding bits
if (!decoder_->Advance(padding_bits)) {
ParquetException::EofException();
}
values_current_mini_block_ = 0;
values_remaining_current_mini_block_ = 0;
}
return max_values;
}
Expand All @@ -2542,10 +2551,16 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
uint32_t values_per_block_;
uint32_t mini_blocks_per_block_;
uint32_t values_per_mini_block_;
uint32_t values_current_mini_block_;
uint32_t total_value_count_;

bool block_initialized_;
uint32_t total_values_remaining_;
// Remaining values in current mini block. If the current block is the last mini block,
// values_remaining_current_mini_block_ may greater than total_values_remaining_.
uint32_t values_remaining_current_mini_block_;

// If the page doesn't contain any block, `first_block_initialized_` will
// always be false. Otherwise, it will be true when first block initialized.
bool first_block_initialized_;
T min_delta_;
uint32_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
Expand Down
98 changes: 82 additions & 16 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/string.h"

#include "parquet/encoding.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -1325,29 +1326,32 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
CheckRoundtripSpaced(valid_bits, valid_bits_offset);
}

void CheckRoundtrip() override {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
void CheckDecoding() {
auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
auto read_batch_sizes = kReadBatchSizes;
read_batch_sizes.push_back(num_values_);
// Exercise different batch sizes
for (const int read_batch_size : read_batch_sizes) {
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));

int values_decoded = 0;
while (values_decoded < num_values_) {
values_decoded += decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
}
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
}

void CheckRoundtrip() override {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
// Encode a number of times to exercise the flush logic
for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
// Exercise different batch sizes
for (const int read_batch_size : read_batch_sizes) {
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));

int values_decoded = 0;
while (values_decoded < num_values_) {
values_decoded +=
decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
}
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
CheckDecoding();
}
}

Expand Down Expand Up @@ -1398,6 +1402,7 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
ASSERT_NO_FATAL_FAILURE(
this->Execute((values_per_mini_block * values_per_block) + 1, 10));
ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
ASSERT_NO_FATAL_FAILURE(this->Execute(65, 1));
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
/*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
/*null_probability*/ 0.1));
Expand Down Expand Up @@ -1427,5 +1432,66 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
}
}

TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
// GH-14923: depending on the number of encoded values, some of the miniblock
// bitwidths are actually padding bytes that may take non-conformant values
// according to the Parquet spec.

// Same values as in DeltaBitPackEncoder
constexpr int kValuesPerBlock = 128;
constexpr int kMiniBlocksPerBlock = 4;
constexpr int kValuesPerMiniBlock = kValuesPerBlock / kMiniBlocksPerBlock;

// num_values must be kept small enough for kHeaderLength below
for (const int num_values : {2, 62, 63, 64, 65, 95, 96, 97, 127}) {
ARROW_SCOPED_TRACE("num_values = ", num_values);

// Generate input data with a small half_range to make the header length
// deterministic (see kHeaderLength).
this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31);
ASSERT_EQ(this->num_values_, num_values);

auto encoder = MakeTypedEncoder<TypeParam>(Encoding::DELTA_BINARY_PACKED, false,
this->descr_.get());
encoder->Put(this->draws_, this->num_values_);
auto encoded = encoder->FlushValues();
const auto encoded_size = encoded->size();

// Make mutable copy of encoded buffer
this->encode_buffer_ = AllocateBuffer(default_memory_pool(), encoded_size);
uint8_t* data = this->encode_buffer_->mutable_data();
memcpy(data, encoded->data(), encoded_size);

// The number of padding bytes at the end of the miniblock bitwidths array.
// We subtract 1 from num_values because the first data value is encoded
// in the header, thus does not participate in miniblock encoding.
const int num_padding_bytes =
(kValuesPerBlock - num_values + 1) / kValuesPerMiniBlock;
ARROW_SCOPED_TRACE("num_padding_bytes = ", num_padding_bytes);

// The header length is:
// - 2 bytes for ULEB128-encoded block size (== kValuesPerBlock)
// - 1 byte for ULEB128-encoded miniblocks per block (== kMiniBlocksPerBlock)
// - 1 byte for ULEB128-encoded num_values
// - 1 byte for ULEB128-encoded first value
// (this assumes that num_values and the first value are narrow enough)
constexpr int kHeaderLength = 5;
// After the header, there is a zigzag ULEB128-encoded min delta for the first block,
// then the miniblock bitwidths for the first block.
// Given a narrow enough range, the zigzag ULEB128-encoded min delta is 1 byte long.
uint8_t* mini_block_bitwidths = data + kHeaderLength + 1;

// Garble padding bytes; decoding should succeed.
for (int i = 0; i < num_padding_bytes; ++i) {
mini_block_bitwidths[kMiniBlocksPerBlock - i - 1] = 0xFFU;
}
ASSERT_NO_THROW(this->CheckDecoding());

// Not a padding byte but an actual miniblock bitwidth; decoding should error out.
mini_block_bitwidths[kMiniBlocksPerBlock - num_padding_bytes - 1] = 0xFFU;
EXPECT_THROW(this->CheckDecoding(), ParquetException);
}
}

} // namespace test
} // namespace parquet