diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc index b050c5d69b7..f693a4535e9 100644 --- a/cpp/src/arrow/testing/random.cc +++ b/cpp/src/arrow/testing/random.cc @@ -79,7 +79,7 @@ std::shared_ptr RandomArrayGenerator::Boolean(int64_t size, double probab // only calls the GenerateBitmap method. using GenOpt = GenerateOptions>; - std::vector> buffers{2}; + BufferVector buffers{2}; // Need 2 distinct generators such that probabilities are not shared. GenOpt value_gen(seed(), 0, 1, probability); GenOpt null_gen(seed(), 0, 1, null_probability); @@ -100,7 +100,7 @@ static std::shared_ptr> GenerateNumericArray(int64_t siz OptionType options) { using CType = typename ArrowType::c_type; auto type = TypeTraits::type_singleton(); - std::vector> buffers{2}; + BufferVector buffers{2}; int64_t null_count = 0; ABORT_NOT_OK(AllocateEmptyBitmap(size, &buffers[0])); @@ -145,5 +145,64 @@ PRIMITIVE_RAND_FLOAT_IMPL(Float64, double, DoubleType) #undef PRIMITIVE_RAND_FLOAT_IMPL #undef PRIMITIVE_RAND_IMPL +std::shared_ptr RandomArrayGenerator::String(int64_t size, + int32_t min_length, + int32_t max_length, + double null_probability) { + if (null_probability < 0 || null_probability > 1) { + ABORT_NOT_OK(Status::Invalid("null_probability must be between 0 and 1")); + } + + auto int32_lengths = Int32(size, min_length, max_length, null_probability); + auto lengths = std::dynamic_pointer_cast(int32_lengths); + + // Visual Studio does not implement uniform_int_distribution for char types. 
+ using GenOpt = GenerateOptions>; + GenOpt options(seed(), static_cast('A'), static_cast('z'), + /*null_probability=*/0); + + std::vector str_buffer(max_length); + StringBuilder builder; + + for (int64_t i = 0; i < size; ++i) { + if (lengths->IsValid(i)) { + options.GenerateData(str_buffer.data(), lengths->Value(i)); + ABORT_NOT_OK(builder.Append(str_buffer.data(), lengths->Value(i))); + } else { + ABORT_NOT_OK(builder.AppendNull()); + } + } + + std::shared_ptr result; + ABORT_NOT_OK(builder.Finish(&result)); + return result; +} + +std::shared_ptr RandomArrayGenerator::StringWithRepeats( + int64_t size, int64_t unique, int32_t min_length, int32_t max_length, + double null_probability) { + // Generate a random string dictionary without any nulls + auto array = String(unique, min_length, max_length, /*null_probability=*/0); + auto dictionary = std::dynamic_pointer_cast(array); + + // Generate random indices to sample the dictionary with + auto id_array = Int64(size, 0, unique - 1, null_probability); + auto indices = std::dynamic_pointer_cast(id_array); + StringBuilder builder; + + for (int64_t i = 0; i < size; ++i) { + if (indices->IsValid(i)) { + const auto index = indices->Value(i); + const auto value = dictionary->GetView(index); + ABORT_NOT_OK(builder.Append(value)); + } else { + ABORT_NOT_OK(builder.AppendNull()); + } + } + + std::shared_ptr result; + ABORT_NOT_OK(builder.Finish(&result)); + return result; +} } // namespace random } // namespace arrow diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h index 1ed8d03e8a1..f69b7058db3 100644 --- a/cpp/src/arrow/testing/random.h +++ b/cpp/src/arrow/testing/random.h @@ -198,6 +198,35 @@ class ARROW_EXPORT RandomArrayGenerator { } } + /// \brief Generates a random StringArray + /// + /// \param[in] size the size of the array to generate + /// \param[in] min_length the lower bound of the string length + /// determined by the uniform distribution + /// \param[in] max_length the upper bound of 
the string length + /// determined by the uniform distribution + /// \param[in] null_probability the probability of a row being null + /// + /// \return a generated Array + std::shared_ptr String(int64_t size, int32_t min_length, + int32_t max_length, double null_probability); + + /// \brief Generates a random StringArray with repeated values + /// + /// \param[in] size the size of the array to generate + /// \param[in] unique the number of unique string values used + /// to populate the array + /// \param[in] min_length the lower bound of the string length + /// determined by the uniform distribution + /// \param[in] max_length the upper bound of the string length + /// determined by the uniform distribution + /// \param[in] null_probability the probability of a row being null + /// + /// \return a generated Array + std::shared_ptr StringWithRepeats(int64_t size, int64_t unique, + int32_t min_length, int32_t max_length, + double null_probability); + private: SeedType seed() { return seed_distribution_(seed_rng_); } diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 3995d2c2407..b54c3f41a5d 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -30,6 +30,7 @@ #include #include "arrow/api.h" +#include "arrow/testing/random.h" #include "arrow/testing/util.h" #include "arrow/type_traits.h" #include "arrow/util/decimal.h" @@ -2439,6 +2440,80 @@ TEST(TestArrowWriterAdHoc, SchemaMismatch) { ASSERT_RAISES(Invalid, writer->WriteTable(*tbl, 1)); } +// ---------------------------------------------------------------------- +// Tests for directly reading DictionaryArray +class TestArrowReadDictionary : public ::testing::TestWithParam { + public: + void SetUp() override { + GenerateData(GetParam()); + ASSERT_NO_FATAL_FAILURE( + WriteTableToBuffer(expected_dense_, expected_dense_->num_rows() / 2, + default_arrow_writer_properties(), &buffer_)); 
+ + properties_ = default_arrow_reader_properties(); + } + + void GenerateData(double null_probability) { + constexpr int num_unique = 100; + constexpr int repeat = 10; + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 10; + ::arrow::random::RandomArrayGenerator rag(0); + auto dense_array = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length, + max_length, null_probability); + expected_dense_ = MakeSimpleTable(dense_array, /*nullable=*/true); + + ::arrow::StringDictionaryBuilder builder(default_memory_pool()); + const auto& string_array = static_cast(*dense_array); + ASSERT_OK(builder.AppendArray(string_array)); + + std::shared_ptr<::arrow::Array> dict_array; + ASSERT_OK(builder.Finish(&dict_array)); + expected_dict_ = MakeSimpleTable(dict_array, /*nullable=*/true); + + // TODO(hatemhelal): Figure out if we can use the following to init the expected_dict_ + // Currently fails due to DataType mismatch for indices array. + // Datum out; + // FunctionContext ctx(default_memory_pool()); + // ASSERT_OK(DictionaryEncode(&ctx, Datum(dense_array), &out)); + // expected_dict_ = MakeSimpleTable(out.make_array(), /*nullable=*/true); + } + + void TearDown() override {} + + void CheckReadWholeFile(const Table& expected) { + std::unique_ptr reader; + ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer_), + ::arrow::default_memory_pool(), properties_, &reader)); + + std::shared_ptr actual; + ASSERT_OK_NO_THROW(reader->ReadTable(&actual)); + ::arrow::AssertTablesEqual(*actual, expected, /*same_chunk_layout=*/false); + } + + static std::vector null_probabilites() { return {0.0, 0.5, 1}; } + + protected: + std::shared_ptr
expected_dense_; + std::shared_ptr
expected_dict_; + std::shared_ptr buffer_; + ArrowReaderProperties properties_; +}; + +TEST_P(TestArrowReadDictionary, ReadWholeFileDict) { + properties_.set_read_dictionary(0, true); + CheckReadWholeFile(*expected_dict_); +} + +TEST_P(TestArrowReadDictionary, ReadWholeFileDense) { + properties_.set_read_dictionary(0, false); + CheckReadWholeFile(*expected_dense_); +} + +INSTANTIATE_TEST_CASE_P( + ReadDictionary, TestArrowReadDictionary, + ::testing::ValuesIn(TestArrowReadDictionary::null_probabilites())); + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 61f5bb28b79..f891682387e 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -101,6 +101,11 @@ Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr* out) } // namespace +ArrowReaderProperties default_arrow_reader_properties() { + static ArrowReaderProperties default_reader_props; + return default_reader_props; +} + // ---------------------------------------------------------------------- // Iteration utilities @@ -236,8 +241,9 @@ using FileColumnIteratorFactory = class FileReader::Impl { public: - Impl(MemoryPool* pool, std::unique_ptr reader) - : pool_(pool), reader_(std::move(reader)), use_threads_(false) {} + Impl(MemoryPool* pool, std::unique_ptr reader, + const ArrowReaderProperties& properties) + : pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {} virtual ~Impl() {} @@ -279,14 +285,21 @@ class FileReader::Impl { int num_columns() const { return reader_->metadata()->num_columns(); } - void set_use_threads(bool use_threads) { use_threads_ = use_threads; } + void set_use_threads(bool use_threads) { + reader_properties_.set_use_threads(use_threads); + } ParquetFileReader* reader() { return reader_.get(); } + std::vector GetDictionaryIndices(const std::vector& indices); + std::shared_ptr<::arrow::Schema> FixSchema( + const ::arrow::Schema& old_schema, const 
std::vector& dict_indices, + std::vector>& columns); + private: MemoryPool* pool_; std::unique_ptr reader_; - bool use_threads_; + ArrowReaderProperties reader_properties_; }; class ColumnReader::ColumnReaderImpl { @@ -302,9 +315,10 @@ class ColumnReader::ColumnReaderImpl { // Reader implementation for primitive arrays class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl { public: - PrimitiveImpl(MemoryPool* pool, std::unique_ptr input) + PrimitiveImpl(MemoryPool* pool, std::unique_ptr input, + const bool read_dictionary) : pool_(pool), input_(std::move(input)), descr_(input_->descr()) { - record_reader_ = RecordReader::Make(descr_, pool_); + record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary); Status s = NodeToField(*input_->descr()->schema_node(), &field_); DCHECK_OK(s); NextRowGroup(); @@ -358,17 +372,19 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl { const std::vector>& children); }; -FileReader::FileReader(MemoryPool* pool, std::unique_ptr reader) - : impl_(new FileReader::Impl(pool, std::move(reader))) {} +FileReader::FileReader(MemoryPool* pool, std::unique_ptr reader, + const ArrowReaderProperties& properties) + : impl_(new FileReader::Impl(pool, std::move(reader), properties)) {} FileReader::~FileReader() {} Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, std::unique_ptr* out) { std::unique_ptr input(iterator_factory(i, reader_.get())); + bool read_dict = reader_properties_.read_dictionary(i); std::unique_ptr impl( - new PrimitiveImpl(pool_, std::move(input))); + new PrimitiveImpl(pool_, std::move(input), read_dict)); *out = std::unique_ptr(new ColumnReader(std::move(impl))); return Status::OK(); } @@ -552,7 +568,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index, return Status::OK(); }; - if (use_threads_) { + if (reader_properties_.use_threads()) { std::vector> futures; auto pool = ::arrow::internal::GetCpuThreadPool(); for (int i = 
0; i < num_fields; i++) { @@ -572,7 +588,15 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index, } } - *out = Table::Make(schema, columns); + auto dict_indices = GetDictionaryIndices(indices); + + if (!dict_indices.empty()) { + schema = FixSchema(*schema, dict_indices, columns); + } + + std::shared_ptr
table = Table::Make(schema, columns); + RETURN_NOT_OK(table->Validate()); + *out = table; return Status::OK(); } @@ -599,7 +623,7 @@ Status FileReader::Impl::ReadTable(const std::vector& indices, return Status::OK(); }; - if (use_threads_) { + if (reader_properties_.use_threads()) { std::vector> futures; auto pool = ::arrow::internal::GetCpuThreadPool(); for (int i = 0; i < num_fields; i++) { @@ -619,6 +643,12 @@ Status FileReader::Impl::ReadTable(const std::vector& indices, } } + auto dict_indices = GetDictionaryIndices(indices); + + if (!dict_indices.empty()) { + schema = FixSchema(*schema, dict_indices, columns); + } + std::shared_ptr
table = Table::Make(schema, columns); RETURN_NOT_OK(table->Validate()); *out = table; @@ -664,6 +694,32 @@ Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr
* table) { return ReadRowGroup(i, indices, table); } +std::vector FileReader::Impl::GetDictionaryIndices(const std::vector& indices) { + // Select the column indices that were read as DictionaryArray + std::vector dict_indices(indices); + auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); }; + auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func); + dict_indices.erase(it, dict_indices.end()); + return dict_indices; +} + +std::shared_ptr<::arrow::Schema> FileReader::Impl::FixSchema( + const ::arrow::Schema& old_schema, const std::vector& dict_indices, + std::vector>& columns) { + // Fix the schema with the actual DictionaryType that was read + auto fields = old_schema.fields(); + + for (int idx : dict_indices) { + auto name = columns[idx]->name(); + auto dict_array = columns[idx]->data(); + auto dict_field = std::make_shared<::arrow::Field>(name, dict_array->type()); + fields[idx] = dict_field; + columns[idx] = std::make_shared(dict_field, dict_array); + } + + return std::make_shared<::arrow::Schema>(fields, old_schema.metadata()); +} + // Static ctor Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, MemoryPool* allocator, const ReaderProperties& props, @@ -683,6 +739,18 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, reader); } +Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + ::arrow::MemoryPool* allocator, const ArrowReaderProperties& properties, + std::unique_ptr* reader) { + std::unique_ptr io_wrapper(new ArrowInputFile(file)); + std::unique_ptr pq_reader; + PARQUET_CATCH_NOT_OK( + pq_reader = ParquetReader::Open(std::move(io_wrapper), + ::parquet::default_reader_properties(), nullptr)); + reader->reset(new FileReader(allocator, std::move(pq_reader), properties)); + return Status::OK(); +} + Status FileReader::GetColumn(int i, std::unique_ptr* out) { FileColumnIteratorFactory iterator_factory = [](int i, 
ParquetFileReader* reader) { return new AllRowGroupsIterator(i, reader); @@ -1138,15 +1206,6 @@ struct TransferFunctor< Status operator()(RecordReader* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, Datum* out) { std::vector> chunks = reader->GetBuilderChunks(); - - if (type->id() == ::arrow::Type::STRING) { - // Convert from BINARY type to STRING - for (size_t i = 0; i < chunks.size(); ++i) { - auto new_data = chunks[i]->data()->Copy(); - new_data->type = type; - chunks[i] = ::arrow::MakeArray(new_data); - } - } *out = std::make_shared(chunks); return Status::OK(); } diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 7ef21fddf58..52fcec8f0d7 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "parquet/util/visibility.h" @@ -51,6 +52,42 @@ class ColumnChunkReader; class ColumnReader; class RowGroupReader; +static constexpr bool DEFAULT_USE_THREADS = false; + +/// EXPERIMENTAL: Properties for configuring FileReader behavior. 
+class PARQUET_EXPORT ArrowReaderProperties { + public: + explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS) + : use_threads_(use_threads), read_dict_indices_() {} + + void set_use_threads(bool use_threads) { use_threads_ = use_threads; } + + bool use_threads() const { return use_threads_; } + + void set_read_dictionary(int column_index, bool read_dict) { + if (read_dict) { + read_dict_indices_.insert(column_index); + } else { + read_dict_indices_.erase(column_index); + } + } + bool read_dictionary(int column_index) const { + if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) { + return true; + } else { + return false; + } + } + + private: + bool use_threads_; + std::unordered_set read_dict_indices_; +}; + +/// EXPERIMENTAL: Constructs the default ArrowReaderProperties +PARQUET_EXPORT +ArrowReaderProperties default_arrow_reader_properties(); + // Arrow read adapter class for deserializing Parquet files as Arrow row // batches. // @@ -109,7 +146,8 @@ class RowGroupReader; // arrays class PARQUET_EXPORT FileReader { public: - FileReader(::arrow::MemoryPool* pool, std::unique_ptr reader); + FileReader(::arrow::MemoryPool* pool, std::unique_ptr reader, + const ArrowReaderProperties& properties = default_arrow_reader_properties()); // Since the distribution of columns amongst a Parquet file's row groups may // be uneven (the number of values in each column chunk can be different), we @@ -291,7 +329,7 @@ class PARQUET_EXPORT ColumnReader { }; // Helper function to create a file reader from an implementation of an Arrow -// readable file +// random access file // // metadata : separately-computed file metadata, can be nullptr PARQUET_EXPORT @@ -306,6 +344,12 @@ ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& f ::arrow::MemoryPool* allocator, std::unique_ptr* reader); +PARQUET_EXPORT +::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + ::arrow::MemoryPool* allocator, + 
const ArrowReaderProperties& properties, + std::unique_ptr* reader); + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index c800f36d2ea..42334b91439 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -448,36 +448,17 @@ class RecordReader::RecordReaderImpl { std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; }; -template -struct RecordReaderTraits { - using BuilderType = ::arrow::ArrayBuilder; -}; - -template <> -struct RecordReaderTraits { - using BuilderType = ::arrow::internal::ChunkedBinaryBuilder; -}; - -template <> -struct RecordReaderTraits { - using BuilderType = ::arrow::FixedSizeBinaryBuilder; -}; - template class TypedRecordReader : public RecordReader::RecordReaderImpl { public: using T = typename DType::c_type; - using BuilderType = typename RecordReaderTraits::BuilderType; - TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) { - InitializeBuilder(); - } + : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {} void ResetDecoders() override { decoders_.clear(); } - inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { + virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { uint8_t* valid_bits = valid_bits_->mutable_data(); const int64_t valid_bits_offset = values_written_; @@ -487,7 +468,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { DCHECK_EQ(num_decoded, values_with_nulls); } - inline void ReadValuesDense(int64_t values_to_read) { + virtual void ReadValuesDense(int64_t values_to_read) { int64_t num_decoded = current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); DCHECK_EQ(num_decoded, values_to_read); @@ -568,18 +549,17 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { throw 
ParquetException("GetChunks only implemented for binary types"); } - private: + protected: using DecoderType = typename EncodingTraits::Decoder; + DecoderType* current_decoder_; + + private: // Map of encoding type to the respective decoder object. For example, a // column chunk's data pages may include both dictionary-encoded and // plain-encoded data. std::unordered_map> decoders_; - std::unique_ptr builder_; - - DecoderType* current_decoder_; - // Initialize repetition and definition level decoders on the next data page. int64_t InitializeLevelDecoders(const DataPage& page, Encoding::type repetition_level_encoding, @@ -590,103 +570,143 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { // Advance to the next data page bool ReadNewPage() override; - void InitializeBuilder() {} - void ConfigureDictionary(const DictionaryPage* page); }; -// TODO(wesm): Implement these to some satisfaction -template <> -void TypedRecordReader::DebugPrintState() {} +class FLBARecordReader : public TypedRecordReader { + public: + FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(nullptr) { + DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); + int byte_width = descr_->type_length(); + std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); + builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); + } -template <> -void TypedRecordReader::DebugPrintState() {} + ::arrow::ArrayVector GetBuilderChunks() override { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); + return ::arrow::ArrayVector({chunk}); + } -template <> -void TypedRecordReader::DebugPrintState() {} + void ReadValuesDense(int64_t values_to_read) override { + auto values = ValuesHead(); + int64_t num_decoded = + current_decoder_->Decode(values, static_cast(values_to_read)); + DCHECK_EQ(num_decoded, values_to_read); -template <> -void 
TypedRecordReader::InitializeBuilder() { - // Maximum of 16MB chunks - constexpr int32_t kBinaryChunksize = 1 << 24; - DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); -} + for (int64_t i = 0; i < num_decoded; i++) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } + ResetValues(); + } -template <> -void TypedRecordReader::InitializeBuilder() { - DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); - int byte_width = descr_->type_length(); - std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); - builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); -} + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + uint8_t* valid_bits = valid_bits_->mutable_data(); + const int64_t valid_bits_offset = values_written_; + auto values = ValuesHead(); -template <> -::arrow::ArrayVector TypedRecordReader::GetBuilderChunks() { - ::arrow::ArrayVector chunks; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); - return chunks; -} + int64_t num_decoded = current_decoder_->DecodeSpaced( + values, static_cast(values_to_read), static_cast(null_count), + valid_bits, valid_bits_offset); + DCHECK_EQ(num_decoded, values_to_read); -template <> -::arrow::ArrayVector TypedRecordReader::GetBuilderChunks() { - std::shared_ptr<::arrow::Array> chunk; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); - return ::arrow::ArrayVector({chunk}); -} + for (int64_t i = 0; i < num_decoded; i++) { + if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } else { + PARQUET_THROW_NOT_OK(builder_->AppendNull()); + } + } + ResetValues(); + } -template <> -inline void TypedRecordReader::ReadValuesDense(int64_t values_to_read) { - int64_t num_decoded = current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), builder_.get()); - DCHECK_EQ(num_decoded, 
values_to_read); - ResetValues(); -} + private: + std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; +}; -template <> -inline void TypedRecordReader::ReadValuesDense(int64_t values_to_read) { - auto values = ValuesHead(); - int64_t num_decoded = - current_decoder_->Decode(values, static_cast(values_to_read)); - DCHECK_EQ(num_decoded, values_to_read); +class ByteArrayChunkedRecordReader : public TypedRecordReader { + public: + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(nullptr) { + // Maximum of 16MB chunks + constexpr int32_t kBinaryChunksize = 1 << 24; + DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); + if (descr_->logical_type() == LogicalType::UTF8) { + builder_.reset( + new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, pool_)); + } else { + builder_.reset( + new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); + } + } - for (int64_t i = 0; i < num_decoded; i++) { - PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + ::arrow::ArrayVector GetBuilderChunks() override { + ::arrow::ArrayVector chunks; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); + return chunks; } - ResetValues(); -} + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + private: + std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_; +}; + +template +class ByteArrayDictionaryRecordReader : public TypedRecordReader { + public: + 
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(new BuilderType(pool)) {} + + ::arrow::ArrayVector GetBuilderChunks() override { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); + return ::arrow::ArrayVector({chunk}); + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + private: + std::unique_ptr builder_; +}; + +// TODO(wesm): Implement these to some satisfaction template <> -inline void TypedRecordReader::ReadValuesSpaced(int64_t values_to_read, - int64_t null_count) { - int64_t num_decoded = current_decoder_->DecodeArrow( - static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, builder_.get()); - DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); -} +void TypedRecordReader::DebugPrintState() {} template <> -inline void TypedRecordReader::ReadValuesSpaced(int64_t values_to_read, - int64_t null_count) { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; - auto values = ValuesHead(); - - int64_t num_decoded = current_decoder_->DecodeSpaced( - values, static_cast(values_to_read), static_cast(null_count), valid_bits, - valid_bits_offset); - DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { - 
PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); - } else { - PARQUET_THROW_NOT_OK(builder_->AppendNull()); - } - } - ResetValues(); -} +void TypedRecordReader::DebugPrintState() {} + +template <> +void TypedRecordReader::DebugPrintState() {} template inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* page) { @@ -845,8 +865,27 @@ bool TypedRecordReader::ReadNewPage() { return true; } +std::shared_ptr RecordReader::MakeByteArrayRecordReader( + const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool read_dictionary) { + if (read_dictionary) { + if (descr->logical_type() == LogicalType::UTF8) { + using Builder = ::arrow::StringDictionaryBuilder; + return std::shared_ptr( + new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + } else { + using Builder = ::arrow::BinaryDictionaryBuilder; + return std::shared_ptr( + new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + } + } else { + return std::shared_ptr( + new RecordReader(new ByteArrayChunkedRecordReader(descr, pool))); + } +} + std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, - MemoryPool* pool) { + MemoryPool* pool, + const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: return std::shared_ptr( @@ -867,11 +906,10 @@ std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, return std::shared_ptr( new RecordReader(new TypedRecordReader(descr, pool))); case Type::BYTE_ARRAY: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return RecordReader::MakeByteArrayRecordReader(descr, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + new RecordReader(new FLBARecordReader(descr, pool))); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h index 
cc932c28650..c999dd00ef2 100644 --- a/cpp/src/parquet/arrow/record_reader.h +++ b/cpp/src/parquet/arrow/record_reader.h @@ -51,7 +51,8 @@ class RecordReader { static std::shared_ptr Make( const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), + const bool read_dictionary = false); virtual ~RecordReader(); @@ -111,6 +112,10 @@ class RecordReader { private: std::unique_ptr impl_; explicit RecordReader(RecordReaderImpl* impl); + + static std::shared_ptr MakeByteArrayRecordReader( + const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, + const bool read_dictionary); }; } // namespace internal diff --git a/cpp/src/parquet/encoding-benchmark.cc b/cpp/src/parquet/encoding-benchmark.cc index 8031aeb7ce1..0e881903525 100644 --- a/cpp/src/parquet/encoding-benchmark.cc +++ b/cpp/src/parquet/encoding-benchmark.cc @@ -17,13 +17,29 @@ #include "benchmark/benchmark.h" +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_dict.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" + #include "parquet/encoding.h" #include "parquet/schema.h" #include "parquet/util/memory.h" +#include + using arrow::default_memory_pool; using arrow::MemoryPool; +namespace { +// The min/max number of values used to drive each family of encoding benchmarks +constexpr int MIN_RANGE = 1024; +constexpr int MAX_RANGE = 65536; +} // namespace + namespace parquet { using schema::PrimitiveNode; @@ -39,14 +55,14 @@ static void BM_PlainEncodingBoolean(benchmark::State& state) { auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::PLAIN); auto typed_encoder = dynamic_cast(encoder.get()); - while (state.KeepRunning()) { + for (auto _ : state) { typed_encoder->Put(values, static_cast(values.size())); typed_encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) 
* sizeof(bool)); } -BENCHMARK(BM_PlainEncodingBoolean)->Range(1024, 65536); +BENCHMARK(BM_PlainEncodingBoolean)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainDecodingBoolean(benchmark::State& state) { std::vector values(state.range(0), true); @@ -56,7 +72,7 @@ static void BM_PlainDecodingBoolean(benchmark::State& state) { typed_encoder->Put(values, static_cast(values.size())); std::shared_ptr buf = encoder->FlushValues(); - while (state.KeepRunning()) { + for (auto _ : state) { auto decoder = MakeTypedDecoder(Encoding::PLAIN); decoder->SetData(static_cast(values.size()), buf->data(), static_cast(buf->size())); @@ -67,19 +83,19 @@ static void BM_PlainDecodingBoolean(benchmark::State& state) { delete[] output; } -BENCHMARK(BM_PlainDecodingBoolean)->Range(1024, 65536); +BENCHMARK(BM_PlainDecodingBoolean)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainEncodingInt64(benchmark::State& state) { std::vector values(state.range(0), 64); auto encoder = MakeTypedEncoder(Encoding::PLAIN); - while (state.KeepRunning()) { + for (auto _ : state) { encoder->Put(values.data(), static_cast(values.size())); encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } -BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536); +BENCHMARK(BM_PlainEncodingInt64)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainDecodingInt64(benchmark::State& state) { std::vector values(state.range(0), 64); @@ -87,7 +103,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) { encoder->Put(values.data(), static_cast(values.size())); std::shared_ptr buf = encoder->FlushValues(); - while (state.KeepRunning()) { + for (auto _ : state) { auto decoder = MakeTypedDecoder(Encoding::PLAIN); decoder->SetData(static_cast(values.size()), buf->data(), static_cast(buf->size())); @@ -96,7 +112,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) { state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } 
-BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536); +BENCHMARK(BM_PlainDecodingInt64)->Range(MIN_RANGE, MAX_RANGE); template static void DecodeDict(std::vector& values, @@ -126,7 +142,7 @@ static void DecodeDict(std::vector& values, PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes)); - while (state.KeepRunning()) { + for (auto _ : state) { auto dict_decoder = MakeTypedDecoder(Encoding::PLAIN, descr.get()); dict_decoder->SetData(dict_traits->num_entries(), dict_buffer->data(), static_cast(dict_buffer->size())); @@ -148,7 +164,7 @@ static void BM_DictDecodingInt64_repeats(benchmark::State& state) { DecodeDict(values, state); } -BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536); +BENCHMARK(BM_DictDecodingInt64_repeats)->Range(MIN_RANGE, MAX_RANGE); static void BM_DictDecodingInt64_literals(benchmark::State& state) { typedef Int64Type Type; @@ -161,6 +177,192 @@ static void BM_DictDecodingInt64_literals(benchmark::State& state) { DecodeDict(values, state); } -BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536); +BENCHMARK(BM_DictDecodingInt64_literals)->Range(MIN_RANGE, MAX_RANGE); + +// ---------------------------------------------------------------------- +// Shared benchmarks for decoding using arrow builders +class BenchmarkDecodeArrow : public ::benchmark::Fixture { + public: + void SetUp(const ::benchmark::State& state) override { + num_values_ = static_cast(state.range()); + InitDataInputs(); + DoEncodeData(); + } + + void TearDown(const ::benchmark::State& state) override {} + + void InitDataInputs() { + // Generate a random string dictionary without any nulls so that this dataset can be + // used for benchmarking the DecodeArrowNonNull API + constexpr int repeat_factor = 8; + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 10; + ::arrow::random::RandomArrayGenerator rag(0); + input_array_ = rag.StringWithRepeats(num_values_, num_values_ / repeat_factor, + min_length, max_length, /*null_probability=*/0); + valid_bits_ = 
input_array_->null_bitmap()->data(); + values_ = std::vector(); + values_.reserve(num_values_); + total_size_ = 0; + const auto& binary_array = static_cast(*input_array_); + + for (int64_t i = 0; i < binary_array.length(); i++) { + auto view = binary_array.GetView(i); + values_.emplace_back(static_cast(view.length()), + reinterpret_cast(view.data())); + total_size_ += view.length(); + } + } + + virtual void DoEncodeData() = 0; + + virtual std::unique_ptr InitializeDecoder() = 0; + + template + std::unique_ptr CreateBuilder(); + + template + void DecodeArrowBenchmark(benchmark::State& state) { + for (auto _ : state) { + auto decoder = InitializeDecoder(); + auto builder = CreateBuilder(); + decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, builder.get()); + } + + state.SetBytesProcessed(state.iterations() * total_size_); + } + + template + void DecodeArrowNonNullBenchmark(benchmark::State& state) { + for (auto _ : state) { + auto decoder = InitializeDecoder(); + auto builder = CreateBuilder(); + decoder->DecodeArrowNonNull(num_values_, builder.get()); + } + + state.SetBytesProcessed(state.iterations() * total_size_); + } + + protected: + int num_values_; + std::shared_ptr<::arrow::Array> input_array_; + uint64_t total_size_; + std::vector values_; + const uint8_t* valid_bits_; + std::shared_ptr buffer_; +}; + +using ::arrow::BinaryDictionaryBuilder; +using ::arrow::internal::ChunkedBinaryBuilder; + +template <> +std::unique_ptr BenchmarkDecodeArrow::CreateBuilder() { + int chunk_size = static_cast(buffer_->size()); + return std::unique_ptr( + new ChunkedBinaryBuilder(chunk_size, default_memory_pool())); +} + +template <> +std::unique_ptr BenchmarkDecodeArrow::CreateBuilder() { + return std::unique_ptr( + new BinaryDictionaryBuilder(default_memory_pool())); +} + +// ---------------------------------------------------------------------- +// Benchmark Decoding from Plain Encoding +class BM_PlainDecodingByteArray : public BenchmarkDecodeArrow { + public: + void 
DoEncodeData() override { + auto encoder = MakeTypedEncoder(Encoding::PLAIN); + encoder->Put(values_.data(), num_values_); + buffer_ = encoder->FlushValues(); + } + + std::unique_ptr InitializeDecoder() override { + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + decoder->SetData(num_values_, buffer_->data(), static_cast(buffer_->size())); + return decoder; + } +}; + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dense) +(benchmark::State& state) { DecodeArrowBenchmark(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense) +(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dict) +(benchmark::State& state) { DecodeArrowBenchmark(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict) +(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +// ---------------------------------------------------------------------- +// Benchmark Decoding from Dictionary Encoding +class BM_DictDecodingByteArray : public BenchmarkDecodeArrow { + public: + void DoEncodeData() override { + auto node = schema::ByteArray("name"); + descr_ = std::unique_ptr(new ColumnDescriptor(node, 0, 0)); + auto encoder = MakeTypedEncoder(Encoding::PLAIN, + /*use_dictionary=*/true, descr_.get()); + ASSERT_NO_THROW(encoder->Put(values_.data(), num_values_)); + buffer_ = encoder->FlushValues(); + + auto dict_encoder = dynamic_cast*>(encoder.get()); + ASSERT_NE(dict_encoder, nullptr); + dict_buffer_ = + AllocateBuffer(default_memory_pool(), 
dict_encoder->dict_encoded_size()); + dict_encoder->WriteDict(dict_buffer_->mutable_data()); + num_dict_entries_ = dict_encoder->num_entries(); + } + + std::unique_ptr InitializeDecoder() override { + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); + decoder->SetData(num_dict_entries_, dict_buffer_->data(), + static_cast(dict_buffer_->size())); + auto dict_decoder = MakeDictDecoder(descr_.get()); + dict_decoder->SetDict(decoder.get()); + dict_decoder->SetData(num_values_, buffer_->data(), + static_cast(buffer_->size())); + return std::unique_ptr( + dynamic_cast(dict_decoder.release())); + } + + protected: + std::unique_ptr descr_; + std::unique_ptr> dict_decoder_; + std::shared_ptr dict_buffer_; + int num_dict_entries_; +}; + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dense)(benchmark::State& state) { + DecodeArrowBenchmark(state); +} +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense) +(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dict) +(benchmark::State& state) { DecodeArrowBenchmark(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict) +(benchmark::State& state) { DecodeArrowNonNullBenchmark(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict) + ->Range(MIN_RANGE, MAX_RANGE); } // namespace parquet diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc index 28d98126ec8..4ec537a9e84 100644 --- a/cpp/src/parquet/encoding-test.cc +++ b/cpp/src/parquet/encoding-test.cc @@ -22,6 +22,12 @@ #include #include +#include "arrow/array.h" +#include 
"arrow/compute/api.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" #include "arrow/util/bit-util.h" #include "parquet/encoding.h" @@ -36,6 +42,13 @@ using arrow::MemoryPool; using std::string; using std::vector; +// TODO(hatemhelal): investigate whether this can be replaced with GTEST_SKIP in a future +// gtest release that contains https://github.com/google/googletest/pull/1544 +#define SKIP_TEST_IF(condition) \ + if (condition) { \ + return; \ + } + namespace parquet { namespace test { @@ -314,6 +327,258 @@ TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) { ASSERT_THROW(MakeDictDecoder(nullptr), ParquetException); } +// ---------------------------------------------------------------------- +// Shared arrow builder decode tests +template +struct BuilderTraits {}; + +template <> +struct BuilderTraits<::arrow::BinaryType> { + using DenseArrayBuilder = ::arrow::internal::ChunkedBinaryBuilder; + using DictArrayBuilder = ::arrow::BinaryDictionaryBuilder; +}; + +template <> +struct BuilderTraits<::arrow::StringType> { + using DenseArrayBuilder = ::arrow::internal::ChunkedStringBuilder; + using DictArrayBuilder = ::arrow::StringDictionaryBuilder; +}; + +template +class TestArrowBuilderDecoding : public ::testing::Test { + public: + using DenseBuilder = typename BuilderTraits::DenseArrayBuilder; + using DictBuilder = typename BuilderTraits::DictArrayBuilder; + + void SetUp() override { null_probabilities_ = {0.0, 0.5, 1.0}; } + void TearDown() override {} + + void InitTestCase(double null_probability) { + GenerateInputData(null_probability); + SetupEncoderDecoder(); + } + + void GenerateInputData(double null_probability) { + constexpr int num_unique = 100; + constexpr int repeat = 10; + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 10; + ::arrow::random::RandomArrayGenerator rag(0); + expected_dense_ = rag.StringWithRepeats(repeat * num_unique, num_unique, 
min_length, + max_length, null_probability); + + std::shared_ptr<::arrow::DataType> data_type = std::make_shared(); + + if (data_type->id() == ::arrow::BinaryType::type_id) { + // TODO(hatemhelal): this is a kludge. Probably best to extend the + // RandomArrayGenerator to also generate BinaryType arrays. + auto data = expected_dense_->data()->Copy(); + data->type = data_type; + expected_dense_ = std::make_shared<::arrow::BinaryArray>(data); + } + + num_values_ = static_cast(expected_dense_->length()); + null_count_ = static_cast(expected_dense_->null_count()); + valid_bits_ = expected_dense_->null_bitmap()->data(); + + auto builder = CreateDictBuilder(); + ASSERT_OK(builder->AppendArray(*expected_dense_)); + ASSERT_OK(builder->Finish(&expected_dict_)); + + // Size input_data_ with resize(): it is assigned by index from expected_dense_ below + const auto& binary_array = static_cast(*expected_dense_); + input_data_.resize(binary_array.length()); + + for (int64_t i = 0; i < binary_array.length(); ++i) { + auto view = binary_array.GetView(i); + input_data_[i] = {static_cast(view.length()), + reinterpret_cast(view.data())}; + } + } + + std::unique_ptr CreateDenseBuilder() { + // Use same default chunk size of 16MB as used in ByteArrayChunkedRecordReader + constexpr int32_t kChunkSize = 1 << 24; + return std::unique_ptr( + new DenseBuilder(kChunkSize, default_memory_pool())); + } + + std::unique_ptr CreateDictBuilder() { + return std::unique_ptr(new DictBuilder(default_memory_pool())); + } + + // Setup encoder/decoder pair for testing with + virtual void SetupEncoderDecoder() = 0; + + template + void CheckDense(int actual_num_values, Builder& builder) { + ASSERT_EQ(actual_num_values, num_values_); + ::arrow::ArrayVector actual_vec; + ASSERT_OK(builder.Finish(&actual_vec)); + ASSERT_EQ(actual_vec.size(), 1); + ASSERT_ARRAYS_EQUAL(*actual_vec[0], *expected_dense_); + } + + template + void CheckDict(int actual_num_values, Builder& builder) { + ASSERT_EQ(actual_num_values, 
num_values_); + std::shared_ptr<::arrow::Array> actual; + ASSERT_OK(builder.Finish(&actual)); + ASSERT_ARRAYS_EQUAL(*actual, *expected_dict_); + } + + void CheckDecodeArrowUsingDenseBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + auto builder = CreateDenseBuilder(); + auto actual_num_values = + decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, builder.get()); + CheckDense(actual_num_values, *builder); + } + } + + void CheckDecodeArrowUsingDictBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + auto builder = CreateDictBuilder(); + auto actual_num_values = + decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, builder.get()); + CheckDict(actual_num_values, *builder); + } + } + + void CheckDecodeArrowNonNullUsingDenseBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + SKIP_TEST_IF(null_count_ > 0) + auto builder = CreateDenseBuilder(); + auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, builder.get()); + CheckDense(actual_num_values, *builder); + } + } + + void CheckDecodeArrowNonNullUsingDictBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + SKIP_TEST_IF(null_count_ > 0) + auto builder = CreateDictBuilder(); + auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, builder.get()); + CheckDict(actual_num_values, *builder); + } + } + + protected: + std::vector null_probabilities_; + std::shared_ptr<::arrow::Array> expected_dict_; + std::shared_ptr<::arrow::Array> expected_dense_; + int num_values_; + int null_count_; + vector input_data_; + const uint8_t* valid_bits_; + std::unique_ptr encoder_; + std::unique_ptr decoder_; + std::shared_ptr buffer_; +}; + +#define TEST_ARROW_BUILDER_BASE_MEMBERS() \ + using TestArrowBuilderDecoding::encoder_; \ + using TestArrowBuilderDecoding::decoder_; \ + using TestArrowBuilderDecoding::input_data_; \ + using TestArrowBuilderDecoding::valid_bits_; \ + using 
TestArrowBuilderDecoding::num_values_; \ + using TestArrowBuilderDecoding::buffer_ + +template +class PlainEncoding : public TestArrowBuilderDecoding { + public: + void SetupEncoderDecoder() override { + encoder_ = MakeTypedEncoder(Encoding::PLAIN); + decoder_ = MakeTypedDecoder(Encoding::PLAIN); + ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, valid_bits_, 0)); + buffer_ = encoder_->FlushValues(); + decoder_->SetData(num_values_, buffer_->data(), static_cast(buffer_->size())); + } + + protected: + TEST_ARROW_BUILDER_BASE_MEMBERS(); +}; + +using BuilderArrayTypes = ::testing::Types<::arrow::BinaryType, ::arrow::StringType>; +// using BuilderArrayTypes = ::testing::Types<::arrow::StringType>; +TYPED_TEST_CASE(PlainEncoding, BuilderArrayTypes); + +TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDenseBuilder) { + this->CheckDecodeArrowUsingDenseBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDictBuilder) { + this->CheckDecodeArrowUsingDictBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDenseBuilder) { + this->CheckDecodeArrowNonNullUsingDenseBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDictBuilder) { + this->CheckDecodeArrowNonNullUsingDictBuilder(); +} + +template +class DictEncoding : public TestArrowBuilderDecoding { + public: + void SetupEncoderDecoder() override { + auto node = schema::ByteArray("name"); + descr_ = std::unique_ptr(new ColumnDescriptor(node, 0, 0)); + encoder_ = MakeTypedEncoder(Encoding::PLAIN, /*use_dictionary=*/true, + descr_.get()); + ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, valid_bits_, 0)); + buffer_ = encoder_->FlushValues(); + + auto dict_encoder = dynamic_cast*>(encoder_.get()); + ASSERT_NE(dict_encoder, nullptr); + dict_buffer_ = + AllocateBuffer(default_memory_pool(), dict_encoder->dict_encoded_size()); + dict_encoder->WriteDict(dict_buffer_->mutable_data()); + + // Simulate reading the dictionary page followed by a data page + 
plain_decoder_ = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); + plain_decoder_->SetData(dict_encoder->num_entries(), dict_buffer_->data(), + static_cast(dict_buffer_->size())); + + dict_decoder_ = MakeDictDecoder(descr_.get()); + dict_decoder_->SetDict(plain_decoder_.get()); + dict_decoder_->SetData(num_values_, buffer_->data(), + static_cast(buffer_->size())); + decoder_ = std::unique_ptr( + dynamic_cast(dict_decoder_.release())); + } + + protected: + TEST_ARROW_BUILDER_BASE_MEMBERS(); + std::unique_ptr descr_; + std::unique_ptr plain_decoder_; + std::unique_ptr> dict_decoder_; + std::shared_ptr dict_buffer_; +}; + +TYPED_TEST_CASE(DictEncoding, BuilderArrayTypes); + +TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDenseBuilder) { + this->CheckDecodeArrowUsingDenseBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDictBuilder) { + this->CheckDecodeArrowUsingDictBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDenseBuilder) { + this->CheckDecodeArrowNonNullUsingDenseBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDictBuilder) { + this->CheckDecodeArrowNonNullUsingDictBuilder(); +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index da630671f79..217ee808e23 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -24,7 +24,6 @@ #include #include -#include "arrow/builder.h" #include "arrow/status.h" #include "arrow/util/bit-stream-utils.h" #include "arrow/util/bit-util.h" @@ -697,40 +696,12 @@ class PlainByteArrayDecoder : public PlainDecoder, using Base::DecodeSpaced; using Base::PlainDecoder; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrow(int 
num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::BinaryDictionaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); - return result; - } - private: - template ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, BuilderType* out, - int* values_decoded) { + int64_t valid_bits_offset, WrappedBuilderInterface* builder, + int* values_decoded) override { num_values = std::min(num_values, num_values_); - - ARROW_RETURN_NOT_OK(out->Reserve(num_values)); - + builder->Reserve(num_values); ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); int increment; int i = 0; @@ -744,15 +715,15 @@ class PlainByteArrayDecoder : public PlainDecoder, if (data_size < increment) { ParquetException::EofException(); } - ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + builder->Append(data + sizeof(uint32_t), len); data += increment; data_size -= increment; bytes_decoded += increment; - ++i; } else { - ARROW_RETURN_NOT_OK(out->AppendNull()); + builder->AppendNull(); } bit_reader.Next(); + ++i; } data_ += bytes_decoded; @@ -762,23 +733,24 @@ class PlainByteArrayDecoder : public PlainDecoder, return ::arrow::Status::OK(); } - ::arrow::Status DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out, - int* values_decoded) { + ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder, + int* values_decoded) override { num_values = std::min(num_values, num_values_); - ARROW_RETURN_NOT_OK(out->Reserve(num_values)); + builder->Reserve(num_values); int i = 0; const uint8_t* data = 
data_; int64_t data_size = len_; int bytes_decoded = 0; + while (i < num_values) { uint32_t len = *reinterpret_cast(data); int increment = static_cast(sizeof(uint32_t) + len); if (data_size < increment) ParquetException::EofException(); - ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + builder->Append(data + sizeof(uint32_t), len); data += increment; data_size -= increment; bytes_decoded += increment; + ++i; } data_ += bytes_decoded; @@ -916,39 +888,13 @@ class DictByteArrayDecoder : public DictDecoderImpl, using BASE = DictDecoderImpl; using BASE::DictDecoderImpl; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::BinaryDictionaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); - return result; - } - private: - template ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, BuilderType* builder, - int* out_num_values) { + int64_t valid_bits_offset, WrappedBuilderInterface* builder, + int* out_num_values) override { constexpr int32_t buffer_size = 1024; int32_t indices_buffer[buffer_size]; - + builder->Reserve(num_values); ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); int values_decoded = 0; @@ -966,10 +912,10 @@ class DictByteArrayDecoder : public DictDecoderImpl, 
// Consume all indices if (is_valid) { const auto& val = dictionary_[indices_buffer[i]]; - ARROW_RETURN_NOT_OK(builder->Append(val.ptr, val.len)); + builder->Append(val.ptr, val.len); ++i; } else { - ARROW_RETURN_NOT_OK(builder->AppendNull()); + builder->AppendNull(); --null_count; } ++values_decoded; @@ -982,7 +928,7 @@ class DictByteArrayDecoder : public DictDecoderImpl, bit_reader.Next(); } } else { - ARROW_RETURN_NOT_OK(builder->AppendNull()); + builder->AppendNull(); --null_count; ++values_decoded; } @@ -995,18 +941,20 @@ class DictByteArrayDecoder : public DictDecoderImpl, return ::arrow::Status::OK(); } - template - ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder, - int* out_num_values) { + ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder, + int* out_num_values) override { constexpr int32_t buffer_size = 2048; int32_t indices_buffer[buffer_size]; int values_decoded = 0; + builder->Reserve(num_values); + while (values_decoded < num_values) { - int num_indices = idx_decoder_.GetBatch(indices_buffer, buffer_size); + int32_t batch_size = std::min(buffer_size, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); if (num_indices == 0) break; for (int i = 0; i < num_indices; ++i) { const auto& val = dictionary_[indices_buffer[i]]; - PARQUET_THROW_NOT_OK(builder->Append(val.ptr, val.len)); + builder->Append(val.ptr, val.len); } values_decoded += num_indices; } diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 046296cdb14..09c1d0ffc77 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -32,17 +32,6 @@ #include "parquet/util/memory.h" #include "parquet/util/visibility.h" -namespace arrow { - -class BinaryDictionaryBuilder; - -namespace internal { - -class ChunkedBinaryBuilder; - -} // namespace internal -} // namespace arrow - namespace parquet { class ColumnDescriptor; @@ -207,18 +196,61 @@ using DoubleDecoder = 
TypedDecoder; class ByteArrayDecoder : virtual public TypedDecoder { public: using TypedDecoder::DecodeSpaced; - virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; - - virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::BinaryDictionaryBuilder* builder) = 0; - - // TODO(wesm): Implement DecodeArrowNonNull as part of ARROW-3325 - // See also ARROW-3772, ARROW-3769 - virtual int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; + + class WrappedBuilderInterface { + public: + virtual void Reserve(int64_t values) = 0; + virtual void Append(const uint8_t* value, uint32_t length) = 0; + virtual void AppendNull() = 0; + virtual ~WrappedBuilderInterface() = default; + }; + + template + class WrappedBuilder : public WrappedBuilderInterface { + public: + explicit WrappedBuilder(Builder* builder) : builder_(builder) {} + + void Reserve(int64_t values) override { + PARQUET_THROW_NOT_OK(builder_->Reserve(values)); + } + void Append(const uint8_t* value, uint32_t length) override { + PARQUET_THROW_NOT_OK(builder_->Append(value, length)); + } + + void AppendNull() override { PARQUET_THROW_NOT_OK(builder_->AppendNull()); } + + private: + Builder* builder_; + }; + + template + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, Builder* builder) { + int result = 0; + WrappedBuilder wrapped_builder(builder); + PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits, + valid_bits_offset, &wrapped_builder, &result)); + return result; + } + + template + int DecodeArrowNonNull(int num_values, Builder* builder) { + int result = 0; + WrappedBuilder wrapped_builder(builder); + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, &wrapped_builder, &result)); + return result; + } + + private: + virtual 
::arrow::Status DecodeArrow(int num_values, int null_count, + const uint8_t* valid_bits, + int64_t valid_bits_offset, + WrappedBuilderInterface* builder, + int* values_decoded) = 0; + + virtual ::arrow::Status DecodeArrowNonNull(int num_values, + WrappedBuilderInterface* builder, + int* values_decoded) = 0; }; class FLBADecoder : virtual public TypedDecoder {