Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3300,7 +3300,11 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode

void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
if (decoder_) {
decoder_->Reset(data, len);
} else {
decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
}
prefix_len_decoder_.SetDecoder(num_values, decoder_);

// get the number of encoded prefix lengths
Expand All @@ -3323,7 +3327,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode

// TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set
// to last_value_in_previous_page_ when decoding a new page(except the first page)
last_value_ = "";
last_value_.clear();
}

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
Expand Down
108 changes: 108 additions & 0 deletions cpp/src/parquet/encoding_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,114 @@ static void BM_DeltaLengthDecodingSpacedByteArray(benchmark::State& state) {
// Register the spaced BYTE_ARRAY decoding benchmarks. Each one takes its argument
// sets from ByteArrayCustomArguments (defined outside this view).
BENCHMARK(BM_PlainDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments);
BENCHMARK(BM_DeltaLengthDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments);

struct DeltaByteArrayState {
int32_t min_size = 0;
int32_t max_size;
int32_t array_length;
int32_t total_data_size = 0;
double prefixed_probability;
std::vector<uint8_t> buf;

explicit DeltaByteArrayState(const benchmark::State& state)
: max_size(static_cast<int32_t>(state.range(0))),
array_length(static_cast<int32_t>(state.range(1))),
prefixed_probability(state.range(2) / 100.0) {}

std::vector<ByteArray> MakeRandomByteArray(uint32_t seed) {
std::default_random_engine gen(seed);
std::uniform_int_distribution<int> dist_size(min_size, max_size);
std::uniform_int_distribution<int> dist_byte(0, 255);
std::bernoulli_distribution dist_has_prefix(prefixed_probability);
std::uniform_real_distribution<double> dist_prefix_length(0, 1);

std::vector<ByteArray> out(array_length);
buf.resize(max_size * array_length);
auto buf_ptr = buf.data();
total_data_size = 0;

for (int32_t i = 0; i < array_length; ++i) {
int len = dist_size(gen);
out[i].len = len;
out[i].ptr = buf_ptr;

bool do_prefix = i > 0 && dist_has_prefix(gen);
int prefix_len = 0;
if (do_prefix) {
int max_prefix_len = std::min(len, static_cast<int>(out[i - 1].len));
prefix_len =
static_cast<int>(std::ceil(max_prefix_len * dist_prefix_length(gen)));
}
for (int j = 0; j < prefix_len; ++j) {
buf_ptr[j] = out[i - 1].ptr[j];
}
for (int j = prefix_len; j < len; ++j) {
buf_ptr[j] = static_cast<uint8_t>(dist_byte(gen));
}
buf_ptr += len;
total_data_size += len;
}
return out;
}
};

// Benchmarks DELTA_BYTE_ARRAY encoding of randomly generated byte arrays.
// Also reports a "compression_ratio" counter: (raw bytes + 4 bytes per value)
// divided by the size of the encoded output.
static void BM_DeltaEncodingByteArray(benchmark::State& state) {
  DeltaByteArrayState generator(state);
  std::vector<ByteArray> input = generator.MakeRandomByteArray(/*seed=*/42);

  const int64_t plain_encoded_size =
      generator.total_data_size + 4 * generator.array_length;
  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);

  // Overwritten on every iteration; the value from the last one feeds the counter.
  int64_t encoded_size = 0;
  for (auto _ : state) {
    encoder->Put(input.data(), static_cast<int>(input.size()));
    encoded_size = encoder->FlushValues()->size();
  }

  const auto iteration_count = state.iterations();
  state.SetItemsProcessed(iteration_count * generator.array_length);
  state.SetBytesProcessed(iteration_count * generator.total_data_size);

  const double ratio = static_cast<double>(plain_encoded_size) / encoded_size;
  state.counters["compression_ratio"] = ratio;
}

// Benchmarks DELTA_BYTE_ARRAY decoding. The input is encoded once before the
// measured loop, so only SetData + Decode are timed. Also reports a
// "compression_ratio" counter: (raw bytes + 4 bytes per value) divided by the
// size of the encoded buffer.
static void BM_DeltaDecodingByteArray(benchmark::State& state) {
  DeltaByteArrayState generator(state);
  std::vector<ByteArray> byte_arrays = generator.MakeRandomByteArray(/*seed=*/42);

  // Build the encoded page that every iteration decodes.
  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
  encoder->Put(byte_arrays.data(), static_cast<int>(byte_arrays.size()));
  std::shared_ptr<Buffer> encoded = encoder->FlushValues();

  const int64_t encoded_size = encoded->size();
  const int64_t plain_encoded_size =
      generator.total_data_size + 4 * generator.array_length;

  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
  for (auto _ : state) {
    decoder->SetData(generator.array_length, encoded->data(),
                     static_cast<int>(encoded->size()));
    // The input vector doubles as the decode destination.
    decoder->Decode(byte_arrays.data(), static_cast<int>(byte_arrays.size()));
    ::benchmark::DoNotOptimize(byte_arrays);
  }

  const auto iteration_count = state.iterations();
  state.SetItemsProcessed(iteration_count * generator.array_length);
  state.SetBytesProcessed(iteration_count * generator.total_data_size);

  const double ratio = static_cast<double>(plain_encoded_size) / encoded_size;
  state.counters["compression_ratio"] = ratio;
}

// Registers the argument grid for the DELTA_BYTE_ARRAY benchmarks: every
// combination of maximum string length, batch size and prefix percentage,
// with the prefix percentage varying fastest.
static void ByteArrayDeltaCustomArguments(benchmark::internal::Benchmark* b) {
  static constexpr int kMaxStringLengths[] = {8, 64, 1024};
  static constexpr int kBatchSizes[] = {512, 2048};
  static constexpr int kPrefixedPercents[] = {10, 90, 99};

  for (const int string_len : kMaxStringLengths) {
    for (const int batch : kBatchSizes) {
      for (const int percent : kPrefixedPercents) {
        b->Args({string_len, batch, percent});
      }
    }
  }
  b->ArgNames({"max-string-length", "batch-size", "prefixed-percent"});
}

// Register the DELTA_BYTE_ARRAY encode/decode benchmarks over the
// (max-string-length, batch-size, prefixed-percent) grid built by
// ByteArrayDeltaCustomArguments.
BENCHMARK(BM_DeltaEncodingByteArray)->Apply(ByteArrayDeltaCustomArguments);
BENCHMARK(BM_DeltaDecodingByteArray)->Apply(ByteArrayDeltaCustomArguments);

static void BM_RleEncodingBoolean(benchmark::State& state) {
std::vector<bool> values(state.range(0), true);
auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::RLE);
Expand Down