diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e3c8ab196f4..0564ea2b93f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3300,7 +3300,11 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); + if (decoder_) { + decoder_->Reset(data, len); + } else { + decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); + } prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths @@ -3323,7 +3327,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set // to last_value_in_previous_page_ when decoding a new page(except the first page) - last_value_ = ""; + last_value_.clear(); } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 6726810911f..717c7163305 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -737,6 +737,114 @@ static void BM_DeltaLengthDecodingSpacedByteArray(benchmark::State& state) { BENCHMARK(BM_PlainDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); BENCHMARK(BM_DeltaLengthDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); +struct DeltaByteArrayState { + int32_t min_size = 0; + int32_t max_size; + int32_t array_length; + int32_t total_data_size = 0; + double prefixed_probability; + std::vector buf; + + explicit DeltaByteArrayState(const benchmark::State& state) + : max_size(static_cast(state.range(0))), + array_length(static_cast(state.range(1))), + prefixed_probability(state.range(2) / 100.0) {} + + std::vector MakeRandomByteArray(uint32_t seed) { + std::default_random_engine gen(seed); + std::uniform_int_distribution dist_size(min_size, max_size); + std::uniform_int_distribution dist_byte(0, 255); + std::bernoulli_distribution dist_has_prefix(prefixed_probability); + std::uniform_real_distribution dist_prefix_length(0, 1); + + std::vector out(array_length); + buf.resize(max_size * array_length); + auto buf_ptr = buf.data(); + total_data_size = 0; + + for (int32_t i = 0; i < array_length; ++i) { + int len = dist_size(gen); + out[i].len = len; + out[i].ptr = buf_ptr; + + bool do_prefix = i > 0 && dist_has_prefix(gen); + int prefix_len = 0; + if (do_prefix) { + int max_prefix_len = std::min(len, static_cast(out[i - 1].len)); + prefix_len = + static_cast(std::ceil(max_prefix_len * dist_prefix_length(gen))); + } + for (int j = 0; j < prefix_len; ++j) { + buf_ptr[j] = out[i - 1].ptr[j]; + } + for (int j = prefix_len; j < len; ++j) { + buf_ptr[j] = static_cast(dist_byte(gen)); + } + buf_ptr += len; + total_data_size += len; + } + return out; + } +}; + +static void BM_DeltaEncodingByteArray(benchmark::State& state) { + DeltaByteArrayState delta_state(state); + std::vector values = delta_state.MakeRandomByteArray(/*seed=*/42); + + auto encoder = MakeTypedEncoder(Encoding::DELTA_BYTE_ARRAY); + const int64_t plain_encoded_size = + delta_state.total_data_size + 4 * delta_state.array_length; + int64_t encoded_size = 0; + + for (auto _ : state) { + encoder->Put(values.data(), static_cast(values.size())); + encoded_size = encoder->FlushValues()->size(); + } + state.SetItemsProcessed(state.iterations() * delta_state.array_length); + state.SetBytesProcessed(state.iterations() * delta_state.total_data_size); + state.counters["compression_ratio"] = + static_cast(plain_encoded_size) / encoded_size; +} + +static void BM_DeltaDecodingByteArray(benchmark::State& state) { + DeltaByteArrayState delta_state(state); + std::vector values = delta_state.MakeRandomByteArray(/*seed=*/42); + + auto encoder = MakeTypedEncoder(Encoding::DELTA_BYTE_ARRAY); + encoder->Put(values.data(), static_cast(values.size())); + std::shared_ptr buf = encoder->FlushValues(); + + const int64_t plain_encoded_size = + delta_state.total_data_size + 4 * delta_state.array_length; + const int64_t encoded_size = buf->size(); + + auto decoder = MakeTypedDecoder(Encoding::DELTA_BYTE_ARRAY); + for (auto _ : state) { + decoder->SetData(delta_state.array_length, buf->data(), + static_cast(buf->size())); + decoder->Decode(values.data(), static_cast(values.size())); + ::benchmark::DoNotOptimize(values); + } + state.SetItemsProcessed(state.iterations() * delta_state.array_length); + state.SetBytesProcessed(state.iterations() * delta_state.total_data_size); + state.counters["compression_ratio"] = + static_cast(plain_encoded_size) / encoded_size; +} + +static void ByteArrayDeltaCustomArguments(benchmark::internal::Benchmark* b) { + for (int max_string_length : {8, 64, 1024}) { + for (int batch_size : {512, 2048}) { + for (int prefixed_percent : {10, 90, 99}) { + b->Args({max_string_length, batch_size, prefixed_percent}); + } + } + } + b->ArgNames({"max-string-length", "batch-size", "prefixed-percent"}); +} + +BENCHMARK(BM_DeltaEncodingByteArray)->Apply(ByteArrayDeltaCustomArguments); +BENCHMARK(BM_DeltaDecodingByteArray)->Apply(ByteArrayDeltaCustomArguments); + static void BM_RleEncodingBoolean(benchmark::State& state) { std::vector values(state.range(0), true); auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::RLE);