From 6f3335b20c7eb940e391b3ed4298b220c9e79760 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 3 Sep 2019 15:00:34 -0500 Subject: [PATCH 1/2] Tune performance of dense Parquet BYTE_ARRAY reads to Arrow BinaryArray --- cpp/src/arrow/array/builder_binary.cc | 11 +- cpp/src/arrow/buffer_builder.h | 2 +- cpp/src/parquet/column_reader.cc | 25 +- cpp/src/parquet/encoding.cc | 316 ++++++++++++++++++-------- cpp/src/parquet/encoding.h | 17 +- cpp/src/parquet/encoding_test.cc | 74 +++--- 6 files changed, 293 insertions(+), 152 deletions(-) diff --git a/cpp/src/arrow/array/builder_binary.cc b/cpp/src/arrow/array/builder_binary.cc index 097aa4daf59..8a9296ee624 100644 --- a/cpp/src/arrow/array/builder_binary.cc +++ b/cpp/src/arrow/array/builder_binary.cc @@ -127,14 +127,15 @@ namespace internal { ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, MemoryPool* pool) - : max_chunk_value_length_(max_chunk_value_length), - builder_(new BinaryBuilder(pool)) {} + : max_chunk_value_length_(max_chunk_value_length), builder_(new BinaryBuilder(pool)) { + DCHECK_LE(max_chunk_value_length, kBinaryMemoryLimit); +} ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_value_length, int32_t max_chunk_length, MemoryPool* pool) - : max_chunk_value_length_(max_chunk_value_length), - max_chunk_length_(max_chunk_length), - builder_(new BinaryBuilder(pool)) {} + : ChunkedBinaryBuilder(max_chunk_value_length, pool) { + max_chunk_length_ = max_chunk_length; +} Status ChunkedBinaryBuilder::Finish(ArrayVector* out) { if (builder_->length() > 0 || chunks_.size() == 0) { diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index f443c836840..427d66ca669 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -85,7 +85,7 @@ class ARROW_EXPORT BufferBuilder { return Resize(GrowByFactor(capacity_, min_capacity), false); } - /// \brief Return a capacity expanded by an unspecified growth factor + /// \brief Return a capacity expanded by the desired growth factor static int64_t GrowByFactor(int64_t current_capacity, int64_t new_capacity) { // Doubling capacity except for large Reserve requests. 2x growth strategy // (versus 1.5x) seems to have slightly better performance when using diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 5c3e9831dd4..d1dea8e9bc6 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1209,23 +1209,25 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(nullptr) { - // ARROW-4688(wesm): Using 2^31 - 1 chunks for now - constexpr int32_t kBinaryChunksize = 2147483647; + : TypedRecordReader(descr, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - builder_.reset( - new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, this->pool_)); + accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); } ::arrow::ArrayVector GetBuilderChunks() override { - ::arrow::ArrayVector chunks; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); - return chunks; + ::arrow::ArrayVector result = accumulator_.chunks; + if (result.size() == 0 || accumulator_.builder->length() > 0) { + std::shared_ptr<::arrow::Array> last_chunk; + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + result.push_back(last_chunk); + } + accumulator_.chunks = {}; + return result; } void ReadValuesDense(int64_t values_to_read) override { int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), builder_.get()); + static_cast(values_to_read), &accumulator_); DCHECK_EQ(num_decoded, values_to_read); ResetValues(); } @@ -1233,13 +1235,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { int64_t num_decoded = this->current_decoder_->DecodeArrow( static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, builder_.get()); + valid_bits_->mutable_data(), values_written_, &accumulator_); DCHECK_EQ(num_decoded, values_to_read - null_count); ResetValues(); } private: - std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_; + // Helper data structure for accumulating builder chunks + ArrowBinaryAccumulator accumulator_; }; class ByteArrayDictionaryRecordReader : public TypedRecordReader, diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e63d69f41e1..a278ff0527d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -38,6 +38,7 @@ #include "parquet/schema.h" #include "parquet/types.h" +using arrow::Status; using arrow::internal::checked_cast; namespace parquet { @@ -831,6 +832,43 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { return max_values; } +struct ArrowBinaryHelper { + explicit ArrowBinaryHelper(ArrowBinaryAccumulator* out) { + this->out = out; + this->builder = out->builder.get(); + this->chunk_space_remaining = + ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); + } + + Status PushChunk() { + std::shared_ptr<::arrow::Array> result; + RETURN_NOT_OK(builder->Finish(&result)); + out->chunks.push_back(result); + chunk_space_remaining = ::arrow::kBinaryMemoryLimit; + return Status::OK(); + } + + bool CanFit(int64_t length) const { return length <= chunk_space_remaining; } + + void UnsafeAppend(const uint8_t* data, int32_t length) { + chunk_space_remaining -= length; + builder->UnsafeAppend(data, length); + } + + void UnsafeAppendNull() { builder->UnsafeAppendNull(); } + + Status Append(const uint8_t* data, int32_t length) { + chunk_space_remaining -= length; + return builder->Append(data, length); + } + + Status AppendNull() { return builder->AppendNull(); } + + ArrowBinaryAccumulator* out; + arrow::BinaryBuilder* builder; + int64_t chunk_space_remaining; +}; + class PlainByteArrayDecoder : public PlainDecoder, virtual public ByteArrayDecoder { public: @@ -838,6 +876,9 @@ class PlainByteArrayDecoder : public PlainDecoder, using Base::DecodeSpaced; using Base::PlainDecoder; + // ---------------------------------------------------------------------- + // Dictionary read paths + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, arrow::BinaryDictionary32Builder* builder) override { @@ -847,102 +888,133 @@ class PlainByteArrayDecoder : public PlainDecoder, return result; } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - arrow::internal::ChunkedBinaryBuilder* builder) override { + int DecodeArrowNonNull(int num_values, + arrow::BinaryDictionary32Builder* builder) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, - valid_bits_offset, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); return result; } + // ---------------------------------------------------------------------- + // Optimized dense binary read paths + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, arrow::BinaryBuilder* builder) override { + int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, - valid_bits_offset, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + valid_bits_offset, out, &result)); return result; } - int DecodeArrowNonNull(int num_values, - arrow::BinaryDictionary32Builder* builder) override { + int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result)); return result; } - int DecodeArrowNonNull(int num_values, - arrow::internal::ChunkedBinaryBuilder* builder) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); - return result; + private: + Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, ArrowBinaryAccumulator* out, + int* out_values_decoded) { + ArrowBinaryHelper helper(out); + arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + int values_decoded = 0; + + RETURN_NOT_OK(helper.builder->Reserve(num_values)); + RETURN_NOT_OK(helper.builder->ReserveData( + std::min(len_, helper.chunk_space_remaining))); + for (int i = 0; i < num_values; ++i) { + if (bit_reader.IsSet()) { + auto value_len = static_cast(arrow::util::SafeLoadAs(data_)); + int increment = static_cast(sizeof(uint32_t) + value_len); + if (ARROW_PREDICT_FALSE(len_ < increment)) ParquetException::EofException(); + if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { + // This element would exceed the capacity of a chunk + RETURN_NOT_OK(helper.PushChunk()); + RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); + RETURN_NOT_OK(helper.builder->ReserveData( + std::min(len_, helper.chunk_space_remaining))); + } + helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len); + data_ += increment; + len_ -= increment; + ++values_decoded; + } else { + helper.UnsafeAppendNull(); + } + bit_reader.Next(); + } + + num_values_ -= values_decoded; + *out_values_decoded = values_decoded; + return Status::OK(); + } + + Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out, + int* values_decoded) { + ArrowBinaryHelper helper(out); + num_values = std::min(num_values, num_values_); + for (int i = 0; i < num_values; ++i) { + int32_t value_len = static_cast(arrow::util::SafeLoadAs(data_)); + int increment = static_cast(sizeof(uint32_t) + value_len); + if (ARROW_PREDICT_FALSE(len_ < increment)) ParquetException::EofException(); + if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { + // This element would exceed the capacity of a chunk + RETURN_NOT_OK(helper.PushChunk()); + } + RETURN_NOT_OK(helper.Append(data_ + sizeof(uint32_t), value_len)); + data_ += increment; + len_ -= increment; + } + + num_values_ -= num_values; + *values_decoded = num_values; + return Status::OK(); } - private: template - arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, BuilderType* builder, - int* out_values_decoded) { + Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, BuilderType* builder, + int* out_values_decoded) { RETURN_NOT_OK(builder->Reserve(num_values)); arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); - int increment; - int i = 0; - const uint8_t* data = data_; - int64_t data_size = len_; - int bytes_decoded = 0; int values_decoded = 0; - while (i < num_values) { + for (int i = 0; i < num_values; ++i) { if (bit_reader.IsSet()) { - uint32_t len = arrow::util::SafeLoadAs(data); - increment = static_cast(sizeof(uint32_t) + len); - if (data_size < increment) { + uint32_t value_len = arrow::util::SafeLoadAs(data_); + int increment = static_cast(sizeof(uint32_t) + value_len); + if (len_ < increment) { ParquetException::EofException(); } - RETURN_NOT_OK(builder->Append(data + sizeof(uint32_t), len)); - data += increment; - data_size -= increment; - bytes_decoded += increment; + RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len)); + data_ += increment; + len_ -= increment; ++values_decoded; } else { RETURN_NOT_OK(builder->AppendNull()); } bit_reader.Next(); - ++i; } - - data_ += bytes_decoded; - len_ -= bytes_decoded; num_values_ -= values_decoded; *out_values_decoded = values_decoded; - return arrow::Status::OK(); + return Status::OK(); } template - arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder, - int* values_decoded) { + Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* values_decoded) { num_values = std::min(num_values, num_values_); RETURN_NOT_OK(builder->Reserve(num_values)); - int i = 0; - const uint8_t* data = data_; - int64_t data_size = len_; - int bytes_decoded = 0; - - while (i < num_values) { - uint32_t len = arrow::util::SafeLoadAs(data); - int increment = static_cast(sizeof(uint32_t) + len); - if (data_size < increment) ParquetException::EofException(); - RETURN_NOT_OK(builder->Append(data + sizeof(uint32_t), len)); - data += increment; - data_size -= increment; - bytes_decoded += increment; - ++i; + for (int i = 0; i < num_values; ++i) { + uint32_t value_len = arrow::util::SafeLoadAs(data_); + int increment = static_cast(sizeof(uint32_t) + value_len); + if (len_ < increment) ParquetException::EofException(); + RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len)); + data_ += increment; + len_ -= increment; } - - data_ += bytes_decoded; - len_ -= bytes_decoded; num_values_ -= num_values; *values_decoded = num_values; - return arrow::Status::OK(); + return Status::OK(); } }; @@ -1184,42 +1256,114 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return result; } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - arrow::internal::ChunkedBinaryBuilder* builder) override { + int DecodeArrowNonNull(int num_values, + arrow::BinaryDictionary32Builder* builder) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, - valid_bits_offset, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); return result; } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, arrow::BinaryBuilder* builder) override { + int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, - valid_bits_offset, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + valid_bits_offset, out, &result)); return result; } - int DecodeArrowNonNull(int num_values, - arrow::BinaryDictionary32Builder* builder) override { + int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result)); return result; } - int DecodeArrowNonNull(int num_values, - arrow::internal::ChunkedBinaryBuilder* builder) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result)); - return result; + private: + Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, ArrowBinaryAccumulator* out, + int* out_num_values) { + constexpr int32_t buffer_size = 1024; + int32_t indices_buffer[buffer_size]; + + ArrowBinaryHelper helper(out); + + arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + + auto dict_values = reinterpret_cast(dictionary_->data()); + int values_decoded = 0; + int num_appended = 0; + while (num_appended < num_values) { + bool is_valid = bit_reader.IsSet(); + bit_reader.Next(); + + if (is_valid) { + int32_t batch_size = + std::min(buffer_size, num_values - num_appended - null_count); + int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); + + int i = 0; + while (true) { + // Consume all indices + if (is_valid) { + const auto& val = dict_values[indices_buffer[i]]; + if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + RETURN_NOT_OK(helper.PushChunk()); + } + RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + ++i; + ++values_decoded; + } else { + RETURN_NOT_OK(helper.AppendNull()); + --null_count; + } + ++num_appended; + if (i == num_indices) { + // Do not advance the bit_reader if we have fulfilled the decode + // request + break; + } + is_valid = bit_reader.IsSet(); + bit_reader.Next(); + } + } else { + RETURN_NOT_OK(helper.AppendNull()); + --null_count; + ++num_appended; + } + } + *out_num_values = values_decoded; + return Status::OK(); + } + + Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out, + int* out_num_values) { + constexpr int32_t buffer_size = 2048; + int32_t indices_buffer[buffer_size]; + int values_decoded = 0; + + ArrowBinaryHelper helper(out); + auto dict_values = reinterpret_cast(dictionary_->data()); + + while (values_decoded < num_values) { + int32_t batch_size = std::min(buffer_size, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); + if (num_indices == 0) ParquetException::EofException(); + for (int i = 0; i < num_indices; ++i) { + const auto& val = dict_values[indices_buffer[i]]; + if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + RETURN_NOT_OK(helper.PushChunk()); + } + RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + } + values_decoded += num_indices; + } + *out_num_values = values_decoded; + return Status::OK(); } - private: template - arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, BuilderType* builder, - int* out_num_values) { + Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, BuilderType* builder, + int* out_num_values) { constexpr int32_t buffer_size = 1024; int32_t indices_buffer[buffer_size]; @@ -1266,17 +1410,12 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, ++num_appended; } } - if (num_values != num_appended) { - return arrow::Status::IOError("Expected to dictionary-decode ", num_values, - " but only able to decode ", num_appended); - } *out_num_values = values_decoded; - return arrow::Status::OK(); + return Status::OK(); } template - arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder, - int* out_num_values) { + Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) { constexpr int32_t buffer_size = 2048; int32_t indices_buffer[buffer_size]; int values_decoded = 0; @@ -1287,18 +1426,15 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, while (values_decoded < num_values) { int32_t batch_size = std::min(buffer_size, num_values - values_decoded); int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); - if (num_indices == 0) break; + if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { const auto& val = dict_values[indices_buffer[i]]; RETURN_NOT_OK(builder->Append(val.ptr, val.len)); } values_decoded += num_indices; } - if (values_decoded != num_values) { - ParquetException::EofException(); - } *out_num_values = values_decoded; - return arrow::Status::OK(); + return Status::OK(); } }; diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 618fd1a4c0c..a839e7fab21 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -225,6 +225,13 @@ using Int96Decoder = TypedDecoder; using FloatDecoder = TypedDecoder; using DoubleDecoder = TypedDecoder; +/// \brief Internal helper class for decoding BYTE_ARRAY data where we can +/// overflow the capacity of a single arrow::BinaryArray +struct ArrowBinaryAccumulator { + std::unique_ptr<::arrow::BinaryBuilder> builder; + std::vector> chunks; +}; + class ByteArrayDecoder : virtual public TypedDecoder { public: using TypedDecoder::DecodeSpaced; @@ -239,15 +246,9 @@ class ByteArrayDecoder : virtual public TypedDecoder { /// \brief Returns number of encoded values decoded virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, ::arrow::BinaryBuilder* builder) = 0; + int64_t valid_bits_offset, ArrowBinaryAccumulator* out) = 0; - /// \brief Returns number of encoded values decoded - virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; - - virtual int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; + virtual int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) = 0; }; class FLBADecoder : virtual public TypedDecoder { diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 5ce237f6533..1a374b0c9e7 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -367,13 +367,6 @@ class TestArrowBuilderDecoding : public ::testing::Test { } } - std::unique_ptr CreateDenseBuilder() { - // Use same default chunk size of 16MB as used in ByteArrayChunkedRecordReader - constexpr int32_t kChunkSize = 1 << 24; - return std::unique_ptr( - new DenseBuilder(kChunkSize, default_memory_pool())); - } - std::unique_ptr CreateDictBuilder() { return std::unique_ptr(new DictBuilder(default_memory_pool())); } @@ -381,13 +374,9 @@ class TestArrowBuilderDecoding : public ::testing::Test { // Setup encoder/decoder pair for testing with virtual void SetupEncoderDecoder() = 0; - template - void CheckDense(int actual_num_values, Builder& builder) { + void CheckDense(int actual_num_values, const arrow::Array& chunk) { ASSERT_EQ(actual_num_values, num_values_ - null_count_); - arrow::ArrayVector actual_vec; - ASSERT_OK(builder.Finish(&actual_vec)); - ASSERT_EQ(actual_vec.size(), 1); - ASSERT_ARRAYS_EQUAL(*actual_vec[0], *expected_dense_); + ASSERT_ARRAYS_EQUAL(chunk, *expected_dense_); } template @@ -401,10 +390,15 @@ class TestArrowBuilderDecoding : public ::testing::Test { void CheckDecodeArrowUsingDenseBuilder() { for (auto np : null_probabilities_) { InitTestCase(np); - auto builder = CreateDenseBuilder(); + + ArrowBinaryAccumulator acc; + acc.builder.reset(new ::arrow::BinaryBuilder); auto actual_num_values = - decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, builder.get()); - CheckDense(actual_num_values, *builder); + decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, &acc); + + std::shared_ptr<::arrow::Array> chunk; + ASSERT_OK(acc.builder->Finish(&chunk)); + CheckDense(actual_num_values, *chunk); } } @@ -422,9 +416,12 @@ class TestArrowBuilderDecoding : public ::testing::Test { for (auto np : null_probabilities_) { InitTestCase(np); SKIP_TEST_IF(null_count_ > 0) - auto builder = CreateDenseBuilder(); - auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, builder.get()); - CheckDense(actual_num_values, *builder); + ArrowBinaryAccumulator acc; + acc.builder.reset(new ::arrow::BinaryBuilder); + auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, &acc); + std::shared_ptr<::arrow::Array> chunk; + ASSERT_OK(acc.builder->Finish(&chunk)); + CheckDense(actual_num_values, *chunk); } } @@ -502,14 +499,15 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { int num_values = static_cast(values->length() - values->null_count()); decoder->SetData(num_values, buf->data(), static_cast(buf->size())); - arrow::StringBuilder builder; - ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), - static_cast(values->null_count()), - values->null_bitmap_data(), - values->offset(), &builder)); + ArrowBinaryAccumulator acc; + acc.builder.reset(new arrow::StringBuilder); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); - std::shared_ptr result; - ASSERT_OK(builder.Finish(&result)); + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); ASSERT_EQ(50, result->length()); arrow::AssertArraysEqual(*values, *result); @@ -567,14 +565,15 @@ TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { int num_values = static_cast(values->length() - values->null_count()); GetBinaryDictDecoder(encoder, num_values, &buf, &dict_buf, &decoder); - arrow::StringBuilder builder; + ArrowBinaryAccumulator acc; + acc.builder.reset(new arrow::StringBuilder); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), - values->null_bitmap_data(), values->offset(), &builder)); + values->null_bitmap_data(), values->offset(), &acc)); - std::shared_ptr result; - ASSERT_OK(builder.Finish(&result)); + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); arrow::AssertArraysEqual(*values, *result); } @@ -607,14 +606,15 @@ TEST(DictEncodingAdHoc, PutDictionaryPutIndices) { int num_values = static_cast(expected->length() - expected->null_count()); GetBinaryDictDecoder(encoder, num_values, &buf, &dict_buf, &decoder); - arrow::BinaryBuilder builder; - ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(expected->length()), - static_cast(expected->null_count()), - expected->null_bitmap_data(), - expected->offset(), &builder)); + ArrowBinaryAccumulator acc; + acc.builder.reset(new arrow::BinaryBuilder); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(expected->length()), + static_cast(expected->null_count()), + expected->null_bitmap_data(), expected->offset(), &acc)); - std::shared_ptr result; - ASSERT_OK(builder.Finish(&result)); + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); arrow::AssertArraysEqual(*expected, *result); } From a4eab7da3e399cbc83fa986f5b2e6a57b91b37bd Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 4 Sep 2019 22:06:37 -0500 Subject: [PATCH 2/2] Fix up parquet-encoding-benchmark --- cpp/src/parquet/encoding.cc | 8 ++- cpp/src/parquet/encoding_benchmark.cc | 77 +++++++++++++-------------- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a278ff0527d..d473ac528bd 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -954,6 +954,9 @@ class PlainByteArrayDecoder : public PlainDecoder, int* values_decoded) { ArrowBinaryHelper helper(out); num_values = std::min(num_values, num_values_); + RETURN_NOT_OK(helper.builder->Reserve(num_values)); + RETURN_NOT_OK(helper.builder->ReserveData( + std::min(len_, helper.chunk_space_remaining))); for (int i = 0; i < num_values; ++i) { int32_t value_len = static_cast(arrow::util::SafeLoadAs(data_)); int increment = static_cast(sizeof(uint32_t) + value_len); @@ -961,8 +964,11 @@ class PlainByteArrayDecoder : public PlainDecoder, if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { // This element would exceed the capacity of a chunk RETURN_NOT_OK(helper.PushChunk()); + RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); + RETURN_NOT_OK(helper.builder->ReserveData( + std::min(len_, helper.chunk_space_remaining))); } - RETURN_NOT_OK(helper.Append(data_ + sizeof(uint32_t), value_len)); + helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len); data_ += increment; len_ -= increment; } diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 4424c3f82f4..82246aaa0b5 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -265,6 +265,10 @@ BENCHMARK(BM_DictDecodingInt64_literals)->Range(MIN_RANGE, MAX_RANGE); // ---------------------------------------------------------------------- // Shared benchmarks for decoding using arrow builders + +using ::arrow::BinaryBuilder; +using ::arrow::BinaryDictionary32Builder; + class BenchmarkDecodeArrow : public ::benchmark::Fixture { public: void SetUp(const ::benchmark::State& state) override { @@ -304,9 +308,6 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { virtual std::unique_ptr InitializeDecoder() = 0; - template - std::unique_ptr CreateBuilder(); - void EncodeArrowBenchmark(benchmark::State& state) { for (auto _ : state) { DoEncodeArrow(); @@ -321,23 +322,41 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { state.SetBytesProcessed(state.iterations() * total_size_); } - template - void DecodeArrowBenchmark(benchmark::State& state) { + void DecodeArrowDenseBenchmark(benchmark::State& state) { + for (auto _ : state) { + auto decoder = InitializeDecoder(); + ArrowBinaryAccumulator acc; + acc.builder.reset(new BinaryBuilder); + decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, &acc); + } + state.SetBytesProcessed(state.iterations() * total_size_); + } + + void DecodeArrowNonNullDenseBenchmark(benchmark::State& state) { + for (auto _ : state) { + auto decoder = InitializeDecoder(); + ArrowBinaryAccumulator acc; + acc.builder.reset(new BinaryBuilder); + decoder->DecodeArrowNonNull(num_values_, &acc); + } + state.SetBytesProcessed(state.iterations() * total_size_); + } + + void DecodeArrowDictBenchmark(benchmark::State& state) { for (auto _ : state) { auto decoder = InitializeDecoder(); - auto builder = CreateBuilder(); - decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, builder.get()); + BinaryDictionary32Builder builder(default_memory_pool()); + decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, &builder); } state.SetBytesProcessed(state.iterations() * total_size_); } - template - void DecodeArrowNonNullBenchmark(benchmark::State& state) { + void DecodeArrowNonNullDictBenchmark(benchmark::State& state) { for (auto _ : state) { auto decoder = InitializeDecoder(); - auto builder = CreateBuilder(); - decoder->DecodeArrowNonNull(num_values_, builder.get()); + BinaryDictionary32Builder builder(default_memory_pool()); + decoder->DecodeArrowNonNull(num_values_, &builder); } state.SetBytesProcessed(state.iterations() * total_size_); @@ -352,22 +371,6 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture { std::shared_ptr buffer_; }; -using ::arrow::BinaryDictionary32Builder; -using ::arrow::internal::ChunkedBinaryBuilder; - -template <> -std::unique_ptr BenchmarkDecodeArrow::CreateBuilder() { - int chunk_size = static_cast(buffer_->size()); - return std::unique_ptr( - new ChunkedBinaryBuilder(chunk_size, default_memory_pool())); -} - -template <> -std::unique_ptr BenchmarkDecodeArrow::CreateBuilder() { - return std::unique_ptr( - new BinaryDictionary32Builder(default_memory_pool())); -} - // ---------------------------------------------------------------------- // Benchmark Decoding from Plain Encoding class BM_ArrowBinaryPlain : public BenchmarkDecodeArrow { @@ -400,22 +403,20 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, EncodeLowLevel) BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, EncodeLowLevel)->Range(1 << 18, 1 << 20); BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dense) -(benchmark::State& state) { DecodeArrowBenchmark(state); } +(benchmark::State& state) { DecodeArrowDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dense)->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense) -(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +(benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense) ->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dict) -(benchmark::State& state) { DecodeArrowBenchmark(state); } +(benchmark::State& state) { DecodeArrowDictBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dict)->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dict) -(benchmark::State& state) { - DecodeArrowNonNullBenchmark(state); -} +(benchmark::State& state) { DecodeArrowNonNullDictBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dict) ->Range(MIN_RANGE, MAX_RANGE); @@ -525,23 +526,21 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeLowLevel) BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeLowLevel)->Range(1 << 18, 1 << 20); BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrow_Dense)(benchmark::State& state) { - DecodeArrowBenchmark(state); + DecodeArrowDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrow_Dense)->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dense) -(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +(benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dense) ->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrow_Dict) -(benchmark::State& state) { DecodeArrowBenchmark(state); } +(benchmark::State& state) { DecodeArrowDictBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrow_Dict)->Range(MIN_RANGE, MAX_RANGE); BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dict) -(benchmark::State& state) { - DecodeArrowNonNullBenchmark(state); -} +(benchmark::State& state) { DecodeArrowNonNullDictBenchmark(state); } BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dict) ->Range(MIN_RANGE, MAX_RANGE);