From 3df7c93a1e9e1d36ef229c1d627be0289ac01f9c Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 18 Jul 2019 14:16:34 -0500 Subject: [PATCH 1/2] Consolidate internal::RecordReader, ColumnReader files --- cpp/src/parquet/arrow/reader.cc | 2 +- cpp/src/parquet/arrow/record_reader.cc | 975 ----------------------- cpp/src/parquet/arrow/record_reader.h | 122 --- cpp/src/parquet/column_reader.cc | 1013 +++++++++++++++++++----- cpp/src/parquet/column_reader.h | 100 +++ 5 files changed, 906 insertions(+), 1306 deletions(-) delete mode 100644 cpp/src/parquet/arrow/record_reader.cc delete mode 100644 cpp/src/parquet/arrow/record_reader.h diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e789b949cea..2259f7a5da5 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1418,7 +1418,7 @@ struct TransferFunctor< typename std::enable_if::value && (std::is_same::value || std::is_same::value)>::type> { - Status operator()(RecordReader* reader, MemoryPool* pool, + Status operator()(RecordReader<* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, Datum* out) { DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc deleted file mode 100644 index 71bedec35ab..00000000000 --- a/cpp/src/parquet/arrow/record_reader.cc +++ /dev/null @@ -1,975 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/arrow/record_reader.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "arrow/array.h" -#include "arrow/buffer.h" -#include "arrow/builder.h" -#include "arrow/type.h" -#include "arrow/util/logging.h" - -#include "parquet/column_page.h" -#include "parquet/column_reader.h" -#include "parquet/encoding.h" -#include "parquet/exception.h" -#include "parquet/schema.h" -#include "parquet/types.h" - -using arrow::MemoryPool; - -namespace parquet { -namespace internal { - -// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index -// encoding. 
-static bool IsDictionaryIndexEncoding(Encoding::type e) { - return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; -} - -// The minimum number of repetition/definition levels to decode at a time, for -// better vectorized performance when doing many smaller record reads -constexpr int64_t kMinLevelBatchSize = 1024; - -class RecordReader::RecordReaderImpl { - public: - RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool) - : descr_(descr), - pool_(pool), - num_buffered_values_(0), - num_decoded_values_(0), - max_def_level_(descr->max_definition_level()), - max_rep_level_(descr->max_repetition_level()), - at_record_start_(true), - records_read_(0), - values_written_(0), - values_capacity_(0), - null_count_(0), - levels_written_(0), - levels_position_(0), - levels_capacity_(0), - uses_values_(!(descr->physical_type() == Type::BYTE_ARRAY)) { - nullable_values_ = internal::HasSpacedValues(descr); - if (uses_values_) { - values_ = AllocateBuffer(pool); - } - valid_bits_ = AllocateBuffer(pool); - def_levels_ = AllocateBuffer(pool); - rep_levels_ = AllocateBuffer(pool); - Reset(); - } - - virtual ~RecordReaderImpl() = default; - - virtual int64_t ReadRecordData(int64_t num_records) = 0; - - // Returns true if there are still values in this column. - bool HasNext() { - // Either there is no data page available yet, or the data page has been - // exhausted - if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { - if (!ReadNewPage() || num_buffered_values_ == 0) { - return false; - } - } - return true; - } - - int64_t ReadRecords(int64_t num_records) { - // Delimit records, then read values at the end - int64_t records_read = 0; - - if (levels_position_ < levels_written_) { - records_read += ReadRecordData(num_records); - } - - int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); - - // If we are in the middle of a record, we continue until reaching the - // desired number of records or the end of the current record if we've found - // enough records - while (!at_record_start_ || records_read < num_records) { - // Is there more data to read in this row group? - if (!HasNext()) { - if (!at_record_start_) { - // We ended the row group while inside a record that we haven't seen - // the end of yet. 
So increment the record count for the last record in - // the row group - ++records_read; - at_record_start_ = true; - } - break; - } - - /// We perform multiple batch reads until we either exhaust the row group - /// or observe the desired number of records - int64_t batch_size = std::min(level_batch_size, available_values_current_page()); - - // No more data in column - if (batch_size == 0) { - break; - } - - if (max_def_level_ > 0) { - ReserveLevels(batch_size); - - int16_t* def_levels = this->def_levels() + levels_written_; - int16_t* rep_levels = this->rep_levels() + levels_written_; - - // Not present for non-repeated fields - int64_t levels_read = 0; - if (max_rep_level_ > 0) { - levels_read = ReadDefinitionLevels(batch_size, def_levels); - if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { - throw ParquetException("Number of decoded rep / def levels did not match"); - } - } else if (max_def_level_ > 0) { - levels_read = ReadDefinitionLevels(batch_size, def_levels); - } - - // Exhausted column chunk - if (levels_read == 0) { - break; - } - - levels_written_ += levels_read; - records_read += ReadRecordData(num_records - records_read); - } else { - // No repetition or definition levels - batch_size = std::min(num_records - records_read, batch_size); - records_read += ReadRecordData(batch_size); - } - } - - return records_read; - } - - // Dictionary decoders must be reset when advancing row groups - virtual void ResetDecoders() = 0; - - void SetPageReader(std::unique_ptr reader) { - at_record_start_ = true; - pager_ = std::move(reader); - ResetDecoders(); - } - - bool HasMoreData() const { return pager_ != nullptr; } - - int16_t* def_levels() const { - return reinterpret_cast(def_levels_->mutable_data()); - } - - int16_t* rep_levels() { - return reinterpret_cast(rep_levels_->mutable_data()); - } - - uint8_t* values() const { return values_->mutable_data(); } - - /// \brief Number of values written including nulls (if any) - int64_t values_written() const { return values_written_; } - - int64_t levels_position() const { return levels_position_; } - int64_t levels_written() const { return levels_written_; } - - // We may outwardly have the appearance of having exhausted a column chunk - // when in fact we are in the middle of processing the last batch - bool has_values_to_process() const { return levels_position_ < levels_written_; } - - int64_t null_count() const { return null_count_; } - - bool nullable_values() const { return nullable_values_; } - - std::shared_ptr ReleaseValues() { - if (uses_values_) { - auto result = values_; - values_ = AllocateBuffer(pool_); - return result; - } else { - return nullptr; - } - } - - std::shared_ptr ReleaseIsValid() { - auto result = valid_bits_; - valid_bits_ = AllocateBuffer(pool_); - return result; - } - - // Process written repetition/definition levels to reach the end of - // records. Process no more levels than necessary to delimit the indicated - // number of logical records. 
Updates internal state of RecordReader - // - // \return Number of records delimited - int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { - int64_t values_to_read = 0; - int64_t records_read = 0; - - const int16_t* def_levels = this->def_levels() + levels_position_; - const int16_t* rep_levels = this->rep_levels() + levels_position_; - - DCHECK_GT(max_rep_level_, 0); - - // Count logical records and number of values to read - while (levels_position_ < levels_written_) { - if (*rep_levels++ == 0) { - // If at_record_start_ is true, we are seeing the start of a record - // for the second time, such as after repeated calls to - // DelimitRecords. In this case we must continue until we find - // another record start or exhausting the ColumnChunk - if (!at_record_start_) { - // We've reached the end of a record; increment the record count. - ++records_read; - if (records_read == num_records) { - // We've found the number of records we were looking for. Set - // at_record_start_ to true and break - at_record_start_ = true; - break; - } - } - } - - // We have decided to consume the level at this position; therefore we - // must advance until we find another record boundary - at_record_start_ = false; - - if (*def_levels++ == max_def_level_) { - ++values_to_read; - } - ++levels_position_; - } - *values_seen = values_to_read; - return records_read; - } - - // Read multiple definition levels into preallocated memory - // - // Returns the number of decoded definition levels - int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { - if (descr_->max_definition_level() == 0) { - return 0; - } - return definition_level_decoder_.Decode(static_cast(batch_size), levels); - } - - int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { - if (descr_->max_repetition_level() == 0) { - return 0; - } - return repetition_level_decoder_.Decode(static_cast(batch_size), levels); - } - - int64_t available_values_current_page() const { - return num_buffered_values_ - num_decoded_values_; - } - - void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } - - Type::type type() const { return descr_->physical_type(); } - - const ColumnDescriptor* descr() const { return descr_; } - - void Reserve(int64_t capacity) { - ReserveLevels(capacity); - ReserveValues(capacity); - } - - void ReserveLevels(int64_t capacity) { - if (descr_->max_definition_level() > 0 && - (levels_written_ + capacity > levels_capacity_)) { - int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); - while (levels_written_ + capacity > new_levels_capacity) { - new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); - } - PARQUET_THROW_NOT_OK( - def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); - if (descr_->max_repetition_level() > 0) { - PARQUET_THROW_NOT_OK( - rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); - } - levels_capacity_ = new_levels_capacity; - } - } - - void ReserveValues(int64_t capacity) { - if (values_written_ + capacity > values_capacity_) { - int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1); - while (values_written_ + capacity > new_values_capacity) { - new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1); - } - - int type_size = GetTypeByteSize(descr_->physical_type()); - - // XXX(wesm): A hack to avoid memory allocation when reading directly - // into builder classes - if (uses_values_) { - PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, 
false)); - } - - values_capacity_ = new_values_capacity; - } - if (nullable_values_) { - int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); - if (valid_bits_->size() < valid_bytes_new) { - int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); - PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); - - // Avoid valgrind warnings - memset(valid_bits_->mutable_data() + valid_bytes_old, 0, - valid_bytes_new - valid_bytes_old); - } - } - } - - void Reset() { - ResetValues(); - - if (levels_written_ > 0) { - const int64_t levels_remaining = levels_written_ - levels_position_; - // Shift remaining levels to beginning of buffer and trim to only the number - // of decoded levels remaining - int16_t* def_data = def_levels(); - int16_t* rep_data = rep_levels(); - - std::copy(def_data + levels_position_, def_data + levels_written_, def_data); - PARQUET_THROW_NOT_OK( - def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); - - if (max_rep_level_ > 0) { - std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); - PARQUET_THROW_NOT_OK( - rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); - } - - levels_written_ -= levels_position_; - levels_position_ = 0; - levels_capacity_ = levels_remaining; - } - - records_read_ = 0; - - // Call Finish on the binary builders to reset them - } - - void ResetValues() { - if (values_written_ > 0) { - // Resize to 0, but do not shrink to fit - if (uses_values_) { - PARQUET_THROW_NOT_OK(values_->Resize(0, false)); - } - PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); - values_written_ = 0; - values_capacity_ = 0; - null_count_ = 0; - } - } - - virtual void DebugPrintState() = 0; - - virtual std::vector> GetBuilderChunks() = 0; - - protected: - virtual bool ReadNewPage() = 0; - - const ColumnDescriptor* descr_; - ::arrow::MemoryPool* pool_; - - std::unique_ptr pager_; - std::shared_ptr current_page_; - - // Not set if full schema for this field has no optional or repeated elements - LevelDecoder definition_level_decoder_; - - // Not set for flat schemas. - LevelDecoder repetition_level_decoder_; - - // The total number of values stored in the data page. This is the maximum of - // the number of encoded definition levels or encoded values. For - // non-repeated, required columns, this is equal to the number of encoded - // values. For repeated or optional values, there may be fewer data values - // than levels, and this tells you how many encoded levels there are in that - // case. - int64_t num_buffered_values_; - - // The number of values from the current data page that have been decoded - // into memory - int64_t num_decoded_values_; - - const int16_t max_def_level_; - const int16_t max_rep_level_; - - bool nullable_values_; - - bool at_record_start_; - int64_t records_read_; - - int64_t values_written_; - int64_t values_capacity_; - int64_t null_count_; - - int64_t levels_written_; - int64_t levels_position_; - int64_t levels_capacity_; - - std::shared_ptr<::arrow::ResizableBuffer> values_; - // In the case of false, don't allocate the values buffer (when we directly read into - // builder classes). 
- bool uses_values_; - - template - T* ValuesHead() { - return reinterpret_cast(values_->mutable_data()) + values_written_; - } - - std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; - std::shared_ptr<::arrow::ResizableBuffer> def_levels_; - std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; -}; - -template -class TypedRecordReader : public RecordReader::RecordReaderImpl { - public: - using T = typename DType::c_type; - - TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {} - - void ResetDecoders() override { decoders_.clear(); } - - virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; - - int64_t num_decoded = current_decoder_->DecodeSpaced( - ValuesHead(), static_cast(values_with_nulls), - static_cast(null_count), valid_bits, valid_bits_offset); - DCHECK_EQ(num_decoded, values_with_nulls); - } - - virtual void ReadValuesDense(int64_t values_to_read) { - int64_t num_decoded = - current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); - DCHECK_EQ(num_decoded, values_to_read); - } - - // Return number of logical records read - int64_t ReadRecordData(int64_t num_records) override { - // Conservative upper bound - const int64_t possible_num_values = - std::max(num_records, levels_written_ - levels_position_); - ReserveValues(possible_num_values); - - const int64_t start_levels_position = levels_position_; - - int64_t values_to_read = 0; - int64_t records_read = 0; - if (max_rep_level_ > 0) { - records_read = DelimitRecords(num_records, &values_to_read); - } else if (max_def_level_ > 0) { - // No repetition levels, skip delimiting logic. 
Each level represents a - // null or not null entry - records_read = std::min(levels_written_ - levels_position_, num_records); - - // This is advanced by DelimitRecords, which we skipped - levels_position_ += records_read; - } else { - records_read = values_to_read = num_records; - } - - int64_t null_count = 0; - if (nullable_values_) { - int64_t values_with_nulls = 0; - internal::DefinitionLevelsToBitmap( - def_levels() + start_levels_position, levels_position_ - start_levels_position, - max_def_level_, max_rep_level_, &values_with_nulls, &null_count, - valid_bits_->mutable_data(), values_written_); - values_to_read = values_with_nulls - null_count; - ReadValuesSpaced(values_with_nulls, null_count); - ConsumeBufferedValues(levels_position_ - start_levels_position); - } else { - ReadValuesDense(values_to_read); - ConsumeBufferedValues(values_to_read); - } - // Total values, including null spaces, if any - values_written_ += values_to_read + null_count; - null_count_ += null_count; - - return records_read; - } - - void DebugPrintState() override { - const int16_t* def_levels = this->def_levels(); - const int16_t* rep_levels = this->rep_levels(); - const int64_t total_levels_read = levels_position_; - - const T* values = reinterpret_cast(this->values()); - - std::cout << "def levels: "; - for (int64_t i = 0; i < total_levels_read; ++i) { - std::cout << def_levels[i] << " "; - } - std::cout << std::endl; - - std::cout << "rep levels: "; - for (int64_t i = 0; i < total_levels_read; ++i) { - std::cout << rep_levels[i] << " "; - } - std::cout << std::endl; - - std::cout << "values: "; - for (int64_t i = 0; i < this->values_written(); ++i) { - std::cout << values[i] << " "; - } - std::cout << std::endl; - } - - std::vector> GetBuilderChunks() override { - throw ParquetException("GetChunks only implemented for binary types"); - } - - protected: - using DecoderType = typename EncodingTraits::Decoder; - - DecoderType* current_decoder_; - - private: - // Map of encoding type to the respective decoder object. For example, a - // column chunk's data pages may include both dictionary-encoded and - // plain-encoded data. - std::unordered_map> decoders_; - - // Initialize repetition and definition level decoders on the next data page. 
- int64_t InitializeLevelDecoders(const DataPage& page, - Encoding::type repetition_level_encoding, - Encoding::type definition_level_encoding); - - void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes); - - // Advance to the next data page - bool ReadNewPage() override; - - void ConfigureDictionary(const DictionaryPage* page); -}; - -class FLBARecordReader : public TypedRecordReader { - public: - FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(nullptr) { - DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); - int byte_width = descr_->type_length(); - std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); - builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); - } - - ::arrow::ArrayVector GetBuilderChunks() override { - std::shared_ptr<::arrow::Array> chunk; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); - return ::arrow::ArrayVector({chunk}); - } - - void ReadValuesDense(int64_t values_to_read) override { - auto values = ValuesHead(); - int64_t num_decoded = - current_decoder_->Decode(values, static_cast(values_to_read)); - DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); - } - ResetValues(); - } - - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; - auto values = ValuesHead(); - - int64_t num_decoded = current_decoder_->DecodeSpaced( - values, static_cast(values_to_read), static_cast(null_count), - valid_bits, valid_bits_offset); - DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { - PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); - } else { - PARQUET_THROW_NOT_OK(builder_->AppendNull()); - } - } - ResetValues(); - } - - private: - std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; -}; - -class ByteArrayChunkedRecordReader : public TypedRecordReader { - public: - ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(nullptr) { - // ARROW-4688(wesm): Using 2^31 - 1 chunks for now - constexpr int32_t kBinaryChunksize = 2147483647; - DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); - if (descr_->converted_type() == ConvertedType::UTF8) { - builder_.reset( - new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, pool_)); - } else { - builder_.reset( - new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); - } - } - - ::arrow::ArrayVector GetBuilderChunks() override { - ::arrow::ArrayVector chunks; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); - return chunks; - } - - void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), builder_.get()); - DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); - } - - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = current_decoder_->DecodeArrow( - static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, builder_.get()); - DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); - } - - private: - std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_; -}; - 
-template -class ByteArrayDictionaryRecordReader : public TypedRecordReader { - public: - ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(new BuilderType(pool)) {} - - ::arrow::ArrayVector GetBuilderChunks() override { - std::shared_ptr<::arrow::Array> chunk; - PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); - return ::arrow::ArrayVector({chunk}); - } - - void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), builder_.get()); - DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); - } - - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = current_decoder_->DecodeArrow( - static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, builder_.get()); - DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); - } - - private: - std::unique_ptr builder_; -}; - -// TODO(wesm): Implement these to some satisfaction -template <> -void TypedRecordReader::DebugPrintState() {} - -template <> -void TypedRecordReader::DebugPrintState() {} - -template <> -void TypedRecordReader::DebugPrintState() {} - -template -inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* page) { - int encoding = static_cast(page->encoding()); - if (page->encoding() == Encoding::PLAIN_DICTIONARY || - page->encoding() == Encoding::PLAIN) { - encoding = static_cast(Encoding::RLE_DICTIONARY); - } - - auto it = decoders_.find(encoding); - if (it != decoders_.end()) { - throw ParquetException("Column cannot have more than one dictionary."); - } - - if (page->encoding() == Encoding::PLAIN_DICTIONARY || - page->encoding() == Encoding::PLAIN) { - auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); - dictionary->SetData(page->num_values(), page->data(), page->size()); - - // The dictionary is fully decoded during DictionaryDecoder::Init, so the - // DictionaryPage buffer is no longer required after this step - // - // TODO(wesm): investigate whether this all-or-nothing decoding of the - // dictionary makes sense and whether performance can be improved - - std::unique_ptr> decoder = MakeDictDecoder(descr_, pool_); - decoder->SetDict(dictionary.get()); - decoders_[encoding] = - std::unique_ptr(dynamic_cast(decoder.release())); - } else { - ParquetException::NYI("only plain dictionary encoding has been implemented"); - } - - current_decoder_ = decoders_[encoding].get(); - DCHECK(current_decoder_); -} - -// If the data page includes repetition and definition levels, we -// initialize the level decoders and return the number of encoded level bytes. -// The return value helps determine the number of bytes in the encoded data. -template -int64_t TypedRecordReader::InitializeLevelDecoders( - const DataPage& page, Encoding::type repetition_level_encoding, - Encoding::type definition_level_encoding) { - // Read a data page. - num_buffered_values_ = page.num_values(); - - // Have not decoded any values from the data page yet - num_decoded_values_ = 0; - - const uint8_t* buffer = page.data(); - int64_t levels_byte_size = 0; - - // Data page Layout: Repetition Levels - Definition Levels - encoded values. - // Levels are encoded as rle or bit-packed. 
- // Init repetition levels - if (descr_->max_repetition_level() > 0) { - int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - repetition_level_encoding, descr_->max_repetition_level(), - static_cast(num_buffered_values_), buffer); - buffer += rep_levels_bytes; - levels_byte_size += rep_levels_bytes; - } - // TODO figure a way to set max_definition_level_ to 0 - // if the initial value is invalid - - // Init definition levels - if (descr_->max_definition_level() > 0) { - int64_t def_levels_bytes = definition_level_decoder_.SetData( - definition_level_encoding, descr_->max_definition_level(), - static_cast(num_buffered_values_), buffer); - levels_byte_size += def_levels_bytes; - } - - return levels_byte_size; -} - -// Get a decoder object for this page or create a new decoder if this is the -// first page with this encoding. -template -void TypedRecordReader::InitializeDataDecoder(const DataPage& page, - int64_t levels_byte_size) { - const uint8_t* buffer = page.data() + levels_byte_size; - const int64_t data_size = page.size() - levels_byte_size; - - Encoding::type encoding = page.encoding(); - - if (IsDictionaryIndexEncoding(encoding)) { - encoding = Encoding::RLE_DICTIONARY; - } - - auto it = decoders_.find(static_cast(encoding)); - if (it != decoders_.end()) { - DCHECK(it->second.get() != nullptr); - if (encoding == Encoding::RLE_DICTIONARY) { - DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); - } - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); - current_decoder_ = decoder.get(); - decoders_[static_cast(encoding)] = std::move(decoder); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); - - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); - - default: - throw ParquetException("Unknown encoding type."); - } - } - current_decoder_->SetData(static_cast(num_buffered_values_), buffer, - static_cast(data_size)); -} - -template -bool TypedRecordReader::ReadNewPage() { - // Loop until we find the next data page. - while (true) { - current_page_ = pager_->NextPage(); - if (!current_page_) { - // EOS - return false; - } - - if (current_page_->type() == PageType::DICTIONARY_PAGE) { - ConfigureDictionary(static_cast(current_page_.get())); - continue; - } else if (current_page_->type() == PageType::DATA_PAGE) { - const auto page = std::static_pointer_cast(current_page_); - const int64_t levels_byte_size = InitializeLevelDecoders( - *page, page->repetition_level_encoding(), page->definition_level_encoding()); - InitializeDataDecoder(*page, levels_byte_size); - return true; - } else if (current_page_->type() == PageType::DATA_PAGE_V2) { - const auto page = std::static_pointer_cast(current_page_); - // Repetition and definition levels are always encoded using RLE encoding - // in the DataPageV2 format. - const int64_t levels_byte_size = - InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); - InitializeDataDecoder(*page, levels_byte_size); - return true; - } else { - // We don't know what this page type is. We're allowed to skip non-data - // pages. 
- continue; - } - } - return true; -} - -std::shared_ptr RecordReader::MakeByteArrayRecordReader( - const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool read_dictionary) { - if (read_dictionary) { - if (descr->converted_type() == ConvertedType::UTF8) { - using Builder = ::arrow::StringDictionaryBuilder; - return std::shared_ptr( - new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); - } else { - using Builder = ::arrow::BinaryDictionaryBuilder; - return std::shared_ptr( - new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); - } - } else { - return std::shared_ptr( - new RecordReader(new ByteArrayChunkedRecordReader(descr, pool))); - } -} - -std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, - MemoryPool* pool, - const bool read_dictionary) { - switch (descr->physical_type()) { - case Type::BOOLEAN: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::INT32: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::INT64: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::INT96: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::FLOAT: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::DOUBLE: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); - case Type::BYTE_ARRAY: - return RecordReader::MakeByteArrayRecordReader(descr, pool, read_dictionary); - case Type::FIXED_LEN_BYTE_ARRAY: - return std::shared_ptr( - new RecordReader(new FLBARecordReader(descr, pool))); - default: { - // PARQUET-1481: This can occur if the file is corrupt - std::stringstream ss; - ss << "Invalid physical column type: " << static_cast(descr->physical_type()); - throw ParquetException(ss.str()); - } - } - // Unreachable code, but supress compiler warning - return nullptr; -} - -// ---------------------------------------------------------------------- -// Implement public API - -RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); } - -RecordReader::~RecordReader() {} - -int64_t RecordReader::ReadRecords(int64_t num_records) { - return impl_->ReadRecords(num_records); -} - -void RecordReader::Reset() { return impl_->Reset(); } - -void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); } - -const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); } - -const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); } - -const uint8_t* RecordReader::values() const { return impl_->values(); } - -std::shared_ptr RecordReader::ReleaseValues() { - return impl_->ReleaseValues(); -} - -std::shared_ptr RecordReader::ReleaseIsValid() { - return impl_->ReleaseIsValid(); -} - -int64_t RecordReader::values_written() const { return impl_->values_written(); } - -int64_t RecordReader::levels_position() const { return impl_->levels_position(); } - -int64_t RecordReader::levels_written() const { return impl_->levels_written(); } - -int64_t RecordReader::null_count() const { return impl_->null_count(); } - -bool RecordReader::nullable_values() const { return impl_->nullable_values(); } - -bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); } - -void RecordReader::SetPageReader(std::unique_ptr reader) { - impl_->SetPageReader(std::move(reader)); -} - -::arrow::ArrayVector RecordReader::GetBuilderChunks() { - return impl_->GetBuilderChunks(); -} - 
-void RecordReader::DebugPrintState() { impl_->DebugPrintState(); } - -} // namespace internal -} // namespace parquet diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h deleted file mode 100644 index 2ae26a5a47d..00000000000 --- a/cpp/src/parquet/arrow/record_reader.h +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_RECORD_READER_H -#define PARQUET_RECORD_READER_H - -#include -#include -#include - -#include "parquet/platform.h" - -namespace arrow { - -class Array; - -} // namespace arrow - -namespace parquet { - -class ColumnDescriptor; -class PageReader; - -namespace internal { - -/// \brief Stateful column reader that delimits semantic records for both flat -/// and nested columns -/// -/// \note API EXPERIMENTAL -/// \since 1.3.0 -class RecordReader { - public: - // So that we can create subclasses - class RecordReaderImpl; - - static std::shared_ptr Make( - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), - const bool read_dictionary = false); - - virtual ~RecordReader(); - - /// \brief Decoded definition levels - const int16_t* def_levels() const; - - /// \brief Decoded repetition levels - const int16_t* rep_levels() const; - - /// \brief Decoded values, including nulls, if any - const uint8_t* values() const; - - /// \brief Attempt to read indicated number of records from column chunk - /// \return number of records read - int64_t ReadRecords(int64_t num_records); - - /// \brief Pre-allocate space for data. Results in better flat read performance - void Reserve(int64_t num_values); - - /// \brief Clear consumed values and repetition/definition levels as the - /// result of calling ReadRecords - void Reset(); - - std::shared_ptr ReleaseValues(); - std::shared_ptr ReleaseIsValid(); - - /// \brief Number of values written including nulls (if any) - int64_t values_written() const; - - /// \brief Number of definition / repetition levels (from those that have - /// been decoded) that have been consumed inside the reader. 
- int64_t levels_position() const; - - /// \brief Number of definition / repetition levels that have been written - /// internally in the reader - int64_t levels_written() const; - - /// \brief Number of nulls in the leaf - int64_t null_count() const; - - /// \brief True if the leaf values are nullable - bool nullable_values() const; - - /// \brief Return true if the record reader has more internal data yet to - /// process - bool HasMoreData() const; - - /// \brief Advance record reader to the next row group - /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader - void SetPageReader(std::unique_ptr reader); - - void DebugPrintState(); - - // For BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY types that may have chunked output - std::vector> GetBuilderChunks(); - - private: - std::unique_ptr impl_; - explicit RecordReader(RecordReaderImpl* impl); - - static std::shared_ptr MakeByteArrayRecordReader( - const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, - const bool read_dictionary); -}; - -} // namespace internal -} // namespace parquet - -#endif // PARQUET_RECORD_READER_H diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 130b75a5210..bc7d255fde4 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -30,6 +30,7 @@ #include "arrow/util/ubsan.h" #include "parquet/column_page.h" +#include "parquet/column_reader_internal.h" #include "parquet/encoding.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -264,31 +265,60 @@ std::unique_ptr PageReader::Open( } // ---------------------------------------------------------------------- -// TypedColumnReader implementations +// Impl base class for TypedColumnReader and RecordReader + +// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index +// encoding. +static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; +} template -class TypedColumnReaderImpl : public TypedColumnReader { +class ColumnReaderImplBase { public: using T = typename DType::c_type; - TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr pager, - ::arrow::MemoryPool* pool) + ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : descr_(descr), - pager_(std::move(pager)), num_buffered_values_(0), num_decoded_values_(0), + max_def_level_(descr->max_definition_level()), + max_rep_level_(descr->max_repetition_level()), pool_(pool), - current_decoder_(NULLPTR) {} + current_decoder_(nullptr) {} - int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, - T* values, int64_t* values_read) override; + protected: + // Read up to batch_size values from the current data page into the + // pre-allocated memory T* + // + // @returns: the number of values read into the out buffer + int64_t ReadValues(int64_t batch_size, T* out) { + int64_t num_decoded = current_decoder_->Decode(out, static_cast(batch_size)); + return num_decoded; + } - int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, - T* values, uint8_t* valid_bits, int64_t valid_bits_offset, - int64_t* levels_read, int64_t* values_read, - int64_t* null_count) override; + // Read up to batch_size values from the current data page into the + // pre-allocated memory T*, leaving spaces for null entries according + // to the def_levels. 
+ // + // @returns: the number of values read into the out buffer + int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, + uint8_t* valid_bits, + int64_t valid_bits_offset) { + return current_decoder_->DecodeSpaced(out, static_cast(batch_size), + static_cast(null_count), valid_bits, + valid_bits_offset); + } - int64_t Skip(int64_t num_rows_to_skip) override; + // Read multiple definition levels into preallocated memory + // + // Returns the number of decoded definition levels + int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { + if (descr_->max_definition_level() == 0) { + return 0; + } + return definition_level_decoder_.Decode(static_cast(batch_size), levels); + } bool HasNext() override { // Either there is no data page available yet, or the data page has been @@ -301,26 +331,6 @@ class TypedColumnReaderImpl : public TypedColumnReader { return true; } - Type::type type() const override { return descr_->physical_type(); } - - const ColumnDescriptor* descr() const override { return descr_; } - - protected: - using DecoderType = TypedDecoder; - - // Advance to the next data page - bool ReadNewPage(); - - // Read multiple definition levels into preallocated memory - // - // Returns the number of decoded definition levels - int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { - if (descr_->max_definition_level() == 0) { - return 0; - } - return definition_level_decoder_.Decode(static_cast(batch_size), levels); - } - // Read multiple repetition levels into preallocated memory // Returns the number of decoded repetition levels int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { @@ -330,14 +340,162 @@ class TypedColumnReaderImpl : public TypedColumnReader { return repetition_level_decoder_.Decode(static_cast(batch_size), levels); } - int64_t available_values_current_page() const { - return num_buffered_values_ - num_decoded_values_; + // Advance to the next data page + bool ReadNewPage() { + // Loop until we find the next data page. + while (true) { + current_page_ = pager_->NextPage(); + if (!current_page_) { + // EOS + return false; + } + + if (current_page_->type() == PageType::DICTIONARY_PAGE) { + ConfigureDictionary(static_cast(current_page_.get())); + continue; + } else if (current_page_->type() == PageType::DATA_PAGE) { + const auto page = std::static_pointer_cast(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + *page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(current_page_); + // Repetition and definition levels are always encoded using RLE encoding + // in the DataPageV2 format. + const int64_t levels_byte_size = + InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. 
+ continue; + } + } + return true; } - void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } + void ConfigureDictionary(const DictionaryPage* page) { + int encoding = static_cast(page->encoding()); + if (page->encoding() == Encoding::PLAIN_DICTIONARY || + page->encoding() == Encoding::PLAIN) { + encoding = static_cast(Encoding::RLE_DICTIONARY); + } - const ColumnDescriptor* descr_; + auto it = decoders_.find(encoding); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); + } + + if (page->encoding() == Encoding::PLAIN_DICTIONARY || + page->encoding() == Encoding::PLAIN) { + auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); + dictionary->SetData(page->num_values(), page->data(), page->size()); + + // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // DictionaryPage buffer is no longer required after this step + // + // TODO(wesm): investigate whether this all-or-nothing decoding of the + // dictionary makes sense and whether performance can be improved + + std::unique_ptr> decoder = MakeDictDecoder(descr_, pool_); + decoder->SetDict(dictionary.get()); + decoders_[encoding] = + std::unique_ptr(dynamic_cast(decoder.release())); + } else { + ParquetException::NYI("only plain dictionary encoding has been implemented"); + } + + current_decoder_ = decoders_[encoding].get(); + DCHECK(current_decoder_); + } + // Initialize repetition and definition level decoders on the next data page. + + // If the data page includes repetition and definition levels, we + // initialize the level decoders and return the number of encoded level bytes. + // The return value helps determine the number of bytes in the encoded data. + int64_t InitializeLevelDecoders(const DataPage& page, + Encoding::type repetition_level_encoding, + Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page.data(); + int64_t levels_byte_size = 0; + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. + // Init repetition levels + if (descr_->max_repetition_level() > 0) { + int64_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, descr_->max_repetition_level(), + static_cast(num_buffered_values_), buffer); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + } + // TODO figure a way to set max_definition_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (descr_->max_definition_level() > 0) { + int64_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, descr_->max_definition_level(), + static_cast(num_buffered_values_), buffer); + levels_byte_size += def_levels_bytes; + } + + return levels_byte_size; + } + + // Get a decoder object for this page or create a new decoder if this is the + // first page with this encoding. 
+ void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) { + const uint8_t* buffer = page.data() + levels_byte_size; + const int64_t data_size = page.size() - levels_byte_size; + + Encoding::type encoding = page.encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast(encoding)); + if (it != decoders_.end()) { + DCHECK(it->second.get() != nullptr); + if (encoding == Encoding::RLE_DICTIONARY) { + DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); + } + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(static_cast(num_buffered_values_), buffer, + static_cast(data_size)); + } + + const ColumnDescriptor* descr_; std::unique_ptr pager_; std::shared_ptr current_page_; @@ -359,21 +517,12 @@ class TypedColumnReaderImpl : public TypedColumnReader { // into memory int64_t num_decoded_values_; - ::arrow::MemoryPool* pool_; + const int16_t max_def_level_; + const int16_t max_rep_level_; - // Read up to batch_size values from the current data page into the - // pre-allocated memory T* - // - // @returns: the number of values read into the out buffer - int64_t ReadValues(int64_t batch_size, T* out); + ::arrow::MemoryPool* pool_; - // Read up to batch_size values from the current data page into the - // pre-allocated memory T*, leaving spaces for null entries according - // to the def_levels. - // - // @returns: the number of values read into the out buffer - int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, - uint8_t* valid_bits, int64_t valid_bits_offset); + using DecoderType = TypedDecoder; // Map of encoding type to the respective decoder object. 
For example, a // column chunk's data pages may include both dictionary-encoded and @@ -382,23 +531,41 @@ class TypedColumnReaderImpl : public TypedColumnReader { void ConfigureDictionary(const DictionaryPage* page); DecoderType* current_decoder_; + + int64_t available_values_current_page() const { + return num_buffered_values_ - num_decoded_values_; + } + + void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } }; -template -int64_t TypedColumnReaderImpl::ReadValues(int64_t batch_size, T* out) { - int64_t num_decoded = current_decoder_->Decode(out, static_cast(batch_size)); - return num_decoded; -} +// ---------------------------------------------------------------------- +// TypedColumnReader implementations template -int64_t TypedColumnReaderImpl::ReadValuesSpaced(int64_t batch_size, T* out, - int64_t null_count, - uint8_t* valid_bits, - int64_t valid_bits_offset) { - return current_decoder_->DecodeSpaced(out, static_cast(batch_size), - static_cast(null_count), valid_bits, - valid_bits_offset); -} +class TypedColumnReaderImpl : public TypedColumnReader, + public ColumnReaderImplBase { + public: + TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr pager, + ::arrow::MemoryPool* pool) + : ColumnReaderImplBase(descr, pool) { + pager_ = std::move(pager); + } + + int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, + T* values, int64_t* values_read) override; + + int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, + T* values, uint8_t* valid_bits, int64_t valid_bits_offset, + int64_t* levels_read, int64_t* values_read, + int64_t* null_count) override; + + int64_t Skip(int64_t num_rows_to_skip) override; + + Type::type type() const override { return descr_->physical_type(); } + + const ColumnDescriptor* descr() const override { return descr_; } +}; template int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def_levels, @@ -535,20 +702,17 @@ int64_t TypedColumnReaderImpl::Skip(int64_t num_rows_to_skip) { int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint int64_t values_read = 0; - std::shared_ptr vals = AllocateBuffer( - this->pool_, batch_size * type_traits::value_byte_size); - std::shared_ptr def_levels = - AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); - - std::shared_ptr rep_levels = - AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); + std::shared_ptr scratch = + AllocateBuffer(this->pool_, batch_size * + std::max(sizeof(int16_t), + type_traits::value_byte_size)); do { batch_size = std::min(batch_size, rows_to_skip); values_read = ReadBatch(static_cast(batch_size), - reinterpret_cast(def_levels->mutable_data()), - reinterpret_cast(rep_levels->mutable_data()), - reinterpret_cast(vals->mutable_data()), &values_read); + reinterpret_cast(scratch->mutable_data()), + reinterpret_cast(scratch->mutable_data()), + reinterpret_cast(scratch->mutable_data()), &values_read); rows_to_skip -= values_read; } while (values_read > 0 && rows_to_skip > 0); } @@ -556,180 +720,613 @@ int64_t TypedColumnReaderImpl::Skip(int64_t num_rows_to_skip) { return num_rows_to_skip - rows_to_skip; } -template -void TypedColumnReaderImpl::ConfigureDictionary(const DictionaryPage* page) { - int encoding = static_cast(page->encoding()); - if (page->encoding() == Encoding::PLAIN_DICTIONARY || - page->encoding() == Encoding::PLAIN) { - encoding = static_cast(Encoding::RLE_DICTIONARY); - } - - auto it = decoders_.find(encoding); - if (it != decoders_.end()) { - 
throw ParquetException("Column cannot have more than one dictionary."); - } - - if (page->encoding() == Encoding::PLAIN_DICTIONARY || - page->encoding() == Encoding::PLAIN) { - auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); - dictionary->SetData(page->num_values(), page->data(), page->size()); - - // The dictionary is fully decoded during SetData, so the - // DictionaryPage buffer is no longer required after this step - // - // TODO(wesm): investigate whether this all-or-nothing decoding of the - // dictionary makes sense and whether performance can be improved - auto decoder = MakeDictDecoder(descr_, pool_); - decoder->SetDict(dictionary.get()); - decoders_[encoding] = std::move(decoder); - } else { - ParquetException::NYI("only plain dictionary encoding has been implemented"); - } +// ---------------------------------------------------------------------- +// Dynamic column reader constructor - current_decoder_ = decoders_[encoding].get(); +std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, + std::unique_ptr pager, + MemoryPool* pool) { + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::INT32: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::INT64: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::INT96: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::FLOAT: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::DOUBLE: + return std::make_shared>(descr, std::move(pager), + pool); + case Type::BYTE_ARRAY: + return std::make_shared>( + descr, std::move(pager), pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared>(descr, std::move(pager), + pool); + default: + ParquetException::NYI("type reader not implemented"); + } + // Unreachable code, but supress compiler warning + return std::shared_ptr(nullptr); } -// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index -// encoding. -static bool IsDictionaryIndexEncoding(const Encoding::type& e) { - return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; -} +// ---------------------------------------------------------------------- +// RecordReader + +namespace internal { + +// The minimum number of repetition/definition levels to decode at a time, for +// better vectorized performance when doing many smaller record reads +constexpr int64_t kMinLevelBatchSize = 1024; template -bool TypedColumnReaderImpl::ReadNewPage() { - // Loop until we find the next data page. 
- const uint8_t* buffer; +class TypedRecordReader : public ColumnReaderImplBase, + public RecordReader { + public: + using BASE = ColumnReaderImplBase; + TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) + : BASE(descr, pool) { + nullable_values_ = internal::HasSpacedValues(descr); + at_record_start_(true); + records_read_ = 0; + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + levels_written_ = 0; + levels_position_ = 0; + levels_capacity_ = 0; + uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY); + + if (uses_values_) { + values_ = AllocateBuffer(pool); + } + valid_bits_ = AllocateBuffer(pool); + def_levels_ = AllocateBuffer(pool); + rep_levels_ = AllocateBuffer(pool); + Reset(); + } + + int64_t ReadRecords(int64_t num_records) override { + // Delimit records, then read values at the end + int64_t records_read = 0; - while (true) { - current_page_ = pager_->NextPage(); - if (!current_page_) { - // EOS - return false; + if (levels_position_ < levels_written_) { + records_read += ReadRecordData(num_records); } - if (current_page_->type() == PageType::DICTIONARY_PAGE) { - ConfigureDictionary(static_cast(current_page_.get())); - continue; - } else if (current_page_->type() == PageType::DATA_PAGE) { - const DataPageV1& page = static_cast(*current_page_); + int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); + + // If we are in the middle of a record, we continue until reaching the + // desired number of records or the end of the current record if we've found + // enough records + while (!at_record_start_ || records_read < num_records) { + // Is there more data to read in this row group? + if (!HasNext()) { + if (!at_record_start_) { + // We ended the row group while inside a record that we haven't seen + // the end of yet. So increment the record count for the last record in + // the row group + ++records_read; + at_record_start_ = true; + } + break; + } + + /// We perform multiple batch reads until we either exhaust the row group + /// or observe the desired number of records + int64_t batch_size = std::min(level_batch_size, available_values_current_page()); + + // No more data in column + if (batch_size == 0) { + break; + } + + if (max_def_level_ > 0) { + ReserveLevels(batch_size); - // Read a data page. 
- num_buffered_values_ = page.num_values(); + int16_t* def_levels = this->def_levels() + levels_written_; + int16_t* rep_levels = this->rep_levels() + levels_written_; - // Have not decoded any values from the data page yet - num_decoded_values_ = 0; + // Not present for non-repeated fields + int64_t levels_read = 0; + if (max_rep_level_ > 0) { + levels_read = ReadDefinitionLevels(batch_size, def_levels); + if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + throw ParquetException("Number of decoded rep / def levels did not match"); + } + } else if (max_def_level_ > 0) { + levels_read = ReadDefinitionLevels(batch_size, def_levels); + } + + // Exhausted column chunk + if (levels_read == 0) { + break; + } + + levels_written_ += levels_read; + records_read += ReadRecordData(num_records - records_read); + } else { + // No repetition or definition levels + batch_size = std::min(num_records - records_read, batch_size); + records_read += ReadRecordData(batch_size); + } + } + + return records_read; + } + + // We may outwardly have the appearance of having exhausted a column chunk + // when in fact we are in the middle of processing the last batch + bool has_values_to_process() const { return levels_position_ < levels_written_; } + + std::shared_ptr ReleaseValues() override { + if (uses_values_) { + auto result = values_; + values_ = AllocateBuffer(pool_); + return result; + } else { + return nullptr; + } + } + + std::shared_ptr ReleaseIsValid() override { + auto result = valid_bits_; + valid_bits_ = AllocateBuffer(pool_); + return result; + } + + // Process written repetition/definition levels to reach the end of + // records. Process no more levels than necessary to delimit the indicated + // number of logical records. Updates internal state of RecordReader + // + // \return Number of records delimited + int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { + int64_t values_to_read = 0; + int64_t records_read = 0; + + const int16_t* def_levels = this->def_levels() + levels_position_; + const int16_t* rep_levels = this->rep_levels() + levels_position_; + + DCHECK_GT(max_rep_level_, 0); + + // Count logical records and number of values to read + while (levels_position_ < levels_written_) { + if (*rep_levels++ == 0) { + // If at_record_start_ is true, we are seeing the start of a record + // for the second time, such as after repeated calls to + // DelimitRecords. In this case we must continue until we find + // another record start or exhausting the ColumnChunk + if (!at_record_start_) { + // We've reached the end of a record; increment the record count. + ++records_read; + if (records_read == num_records) { + // We've found the number of records we were looking for. Set + // at_record_start_ to true and break + at_record_start_ = true; + break; + } + } + } + + // We have decided to consume the level at this position; therefore we + // must advance until we find another record boundary + at_record_start_ = false; + + if (*def_levels++ == max_def_level_) { + ++values_to_read; + } + ++levels_position_; + } + *values_seen = values_to_read; + return records_read; + } - buffer = page.data(); + Type::type type() const { return descr_->physical_type(); } - // If the data page includes repetition and definition levels, we - // initialize the level decoder and subtract the encoded level bytes from - // the page size to determine the number of bytes in the encoded data. 
- int64_t data_size = page.size(); + const ColumnDescriptor* descr() const { return descr_; } - // Data page Layout: Repetition Levels - Definition Levels - encoded values. - // Levels are encoded as rle or bit-packed. - // Init repetition levels + void Reserve(int64_t capacity) override { + ReserveLevels(capacity); + ReserveValues(capacity); + } + + void ReserveLevels(int64_t capacity) { + if (descr_->max_definition_level() > 0 && + (levels_written_ + capacity > levels_capacity_)) { + int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); + while (levels_written_ + capacity > new_levels_capacity) { + new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); + } + PARQUET_THROW_NOT_OK( + def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); if (descr_->max_repetition_level() > 0) { - int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - page.repetition_level_encoding(), descr_->max_repetition_level(), - static_cast(num_buffered_values_), buffer); - buffer += rep_levels_bytes; - data_size -= rep_levels_bytes; + PARQUET_THROW_NOT_OK( + rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); } - // TODO figure a way to set max_definition_level_ to 0 - // if the initial value is invalid - - // Init definition levels - if (descr_->max_definition_level() > 0) { - int64_t def_levels_bytes = definition_level_decoder_.SetData( - page.definition_level_encoding(), descr_->max_definition_level(), - static_cast(num_buffered_values_), buffer); - buffer += def_levels_bytes; - data_size -= def_levels_bytes; + levels_capacity_ = new_levels_capacity; + } + } + + void ReserveValues(int64_t capacity) { + if (values_written_ + capacity > values_capacity_) { + int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1); + while (values_written_ + capacity > new_values_capacity) { + new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1); } - // Get a decoder object for this page or create a new decoder if this is the - // first page with this encoding. 
- Encoding::type encoding = page.encoding(); + int type_size = GetTypeByteSize(descr_->physical_type()); - if (IsDictionaryIndexEncoding(encoding)) { - encoding = Encoding::RLE_DICTIONARY; + // XXX(wesm): A hack to avoid memory allocation when reading directly + // into builder classes + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); } - auto it = decoders_.find(static_cast(encoding)); - if (it != decoders_.end()) { - if (encoding == Encoding::RLE_DICTIONARY) { - DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); - } - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); - current_decoder_ = decoder.get(); - decoders_[static_cast(encoding)] = std::move(decoder); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); + values_capacity_ = new_values_capacity; + } + if (nullable_values_) { + int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); + if (valid_bits_->size() < valid_bytes_new) { + int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + + // Avoid valgrind warnings + memset(valid_bits_->mutable_data() + valid_bytes_old, 0, + valid_bytes_new - valid_bytes_old); + } + } + } - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); + void Reset() override { + ResetValues(); - default: - throw ParquetException("Unknown encoding type."); - } + if (levels_written_ > 0) { + const int64_t levels_remaining = levels_written_ - levels_position_; + // Shift remaining levels to beginning of buffer and trim to only the number + // of decoded levels remaining + int16_t* def_data = def_levels(); + int16_t* rep_data = rep_levels(); + + std::copy(def_data + levels_position_, def_data + levels_written_, def_data); + PARQUET_THROW_NOT_OK( + def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); + + if (max_rep_level_ > 0) { + std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); + PARQUET_THROW_NOT_OK( + rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); } - current_decoder_->SetData(static_cast(num_buffered_values_), buffer, - static_cast(data_size)); - return true; + + levels_written_ -= levels_position_; + levels_position_ = 0; + levels_capacity_ = levels_remaining; + } + + records_read_ = 0; + + // Call Finish on the binary builders to reset them + } + + void SetPageReader(std::unique_ptr reader) override { + at_record_start_ = true; + pager_ = std::move(reader); + ResetDecoders(); + } + + bool HasMoreData() const override { return pager_ != nullptr; } + + // Dictionary decoders must be reset when advancing row groups + void ResetDecoders() override { decoders_.clear(); } + + virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { + uint8_t* valid_bits = valid_bits_->mutable_data(); + const int64_t valid_bits_offset = values_written_; + + int64_t num_decoded = current_decoder_->DecodeSpaced( + ValuesHead(), static_cast(values_with_nulls), + static_cast(null_count), valid_bits, valid_bits_offset); + DCHECK_EQ(num_decoded, values_with_nulls); + } + + virtual void ReadValuesDense(int64_t values_to_read) { + int64_t num_decoded = + current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); + 
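The ReserveLevels and ReserveValues methods above grow their buffers to the next power of two that fits the request, so repeated small batch reads cause O(log n) reallocations rather than one per batch. The sketch below is illustrative only; NextPow2 is a stand-in for ::arrow::BitUtil::NextPower2 and the function names are hypothetical.

#include <cstdint>
#include <iostream>

int64_t NextPow2(int64_t n) {
  int64_t p = 1;
  while (p < n) p <<= 1;
  return p;
}

// Mirrors the growth rule: keep doubling until written + requested fits.
int64_t GrowCapacity(int64_t written, int64_t requested, int64_t current_capacity) {
  if (written + requested <= current_capacity) return current_capacity;
  int64_t new_capacity = NextPow2(current_capacity + 1);
  while (written + requested > new_capacity) {
    new_capacity = NextPow2(new_capacity + 1);
  }
  return new_capacity;
}

int main() {
  int64_t cap = 0;
  int64_t written = 0;
  for (int batch = 0; batch < 5; ++batch) {
    cap = GrowCapacity(written, /*requested=*/1024, cap);  // kMinLevelBatchSize-sized reads
    written += 1024;
    std::cout << "after batch " << batch << ": capacity=" << cap << "\n";
  }
  return 0;
}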
DCHECK_EQ(num_decoded, values_to_read); + } + + // Return number of logical records read + int64_t ReadRecordData(int64_t num_records) override { + // Conservative upper bound + const int64_t possible_num_values = + std::max(num_records, levels_written_ - levels_position_); + ReserveValues(possible_num_values); + + const int64_t start_levels_position = levels_position_; + + int64_t values_to_read = 0; + int64_t records_read = 0; + if (max_rep_level_ > 0) { + records_read = DelimitRecords(num_records, &values_to_read); + } else if (max_def_level_ > 0) { + // No repetition levels, skip delimiting logic. Each level represents a + // null or not null entry + records_read = std::min(levels_written_ - levels_position_, num_records); + + // This is advanced by DelimitRecords, which we skipped + levels_position_ += records_read; } else { - // We don't know what this page type is. We're allowed to skip non-data - // pages. - continue; + records_read = values_to_read = num_records; + } + + int64_t null_count = 0; + if (nullable_values_) { + int64_t values_with_nulls = 0; + internal::DefinitionLevelsToBitmap( + def_levels() + start_levels_position, levels_position_ - start_levels_position, + max_def_level_, max_rep_level_, &values_with_nulls, &null_count, + valid_bits_->mutable_data(), values_written_); + values_to_read = values_with_nulls - null_count; + ReadValuesSpaced(values_with_nulls, null_count); + ConsumeBufferedValues(levels_position_ - start_levels_position); + } else { + ReadValuesDense(values_to_read); + ConsumeBufferedValues(values_to_read); } + // Total values, including null spaces, if any + values_written_ += values_to_read + null_count; + null_count_ += null_count; + + return records_read; } - return true; -} -// ---------------------------------------------------------------------- -// Dynamic column reader constructor + void DebugPrintState() override { + const int16_t* def_levels = this->def_levels(); + const int16_t* rep_levels = this->rep_levels(); + const int64_t total_levels_read = levels_position_; -std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, - std::unique_ptr pager, - MemoryPool* pool) { + const T* values = reinterpret_cast(this->values()); + + std::cout << "def levels: "; + for (int64_t i = 0; i < total_levels_read; ++i) { + std::cout << def_levels[i] << " "; + } + std::cout << std::endl; + + std::cout << "rep levels: "; + for (int64_t i = 0; i < total_levels_read; ++i) { + std::cout << rep_levels[i] << " "; + } + std::cout << std::endl; + + std::cout << "values: "; + for (int64_t i = 0; i < this->values_written(); ++i) { + std::cout << values[i] << " "; + } + std::cout << std::endl; + } + + std::vector> GetBuilderChunks() override { + throw ParquetException("GetChunks only implemented for binary types"); + } + + void ResetValues() { + if (values_written_ > 0) { + // Resize to 0, but do not shrink to fit + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + } + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + } + } + + protected: + template + T* ValuesHead() { + return reinterpret_cast(values_->mutable_data()) + values_written_; + } +}; + +class FLBARecordReader : public TypedRecordReader { + public: + FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(nullptr) { + DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); + int byte_width = descr_->type_length(); + 
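ReadRecordData above converts decoded definition levels into an Arrow-style validity bitmap before dispatching to ReadValuesSpaced. The sketch below shows only the flat case (max repetition level of 0); the real DefinitionLevelsToBitmap helper also accounts for repetition levels on nested data. The function name here is hypothetical and the code is not part of this patch.

#include <cstdint>
#include <iostream>
#include <vector>

// For a flat nullable column: a definition level equal to the maximum marks a
// present value and sets a bit (LSB bit order, as Arrow validity bitmaps use);
// anything lower is a null slot.
void FlatDefLevelsToBitmap(const int16_t* def_levels, int64_t n,
                           int16_t max_def_level, uint8_t* valid_bits,
                           int64_t* null_count) {
  *null_count = 0;
  for (int64_t i = 0; i < n; ++i) {
    if (def_levels[i] == max_def_level) {
      valid_bits[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
    } else {
      ++(*null_count);
    }
  }
}

int main() {
  const int16_t def[] = {1, 0, 1, 1, 0};
  std::vector<uint8_t> bitmap((5 + 7) / 8, 0);
  int64_t nulls = 0;
  FlatDefLevelsToBitmap(def, 5, /*max_def_level=*/1, bitmap.data(), &nulls);
  std::cout << "null count: " << nulls << ", bitmap byte: "
            << static_cast<int>(bitmap[0]) << std::endl;  // 2 and 0b00001101 = 13
  return 0;
}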
std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); + builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); + } + + ::arrow::ArrayVector GetBuilderChunks() override { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); + return ::arrow::ArrayVector({chunk}); + } + + void ReadValuesDense(int64_t values_to_read) override { + auto values = ValuesHead(); + int64_t num_decoded = + current_decoder_->Decode(values, static_cast(values_to_read)); + DCHECK_EQ(num_decoded, values_to_read); + + for (int64_t i = 0; i < num_decoded; i++) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + uint8_t* valid_bits = valid_bits_->mutable_data(); + const int64_t valid_bits_offset = values_written_; + auto values = ValuesHead(); + + int64_t num_decoded = current_decoder_->DecodeSpaced( + values, static_cast(values_to_read), static_cast(null_count), + valid_bits, valid_bits_offset); + DCHECK_EQ(num_decoded, values_to_read); + + for (int64_t i = 0; i < num_decoded; i++) { + if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { + PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); + } else { + PARQUET_THROW_NOT_OK(builder_->AppendNull()); + } + } + ResetValues(); + } + + private: + std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; +}; + +class ByteArrayChunkedRecordReader : public TypedRecordReader { + public: + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(nullptr) { + // ARROW-4688(wesm): Using 2^31 - 1 chunks for now + constexpr int32_t kBinaryChunksize = 2147483647; + DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); + if (descr_->converted_type() == ConvertedType::UTF8) { + builder_.reset( + new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, pool_)); + } else { + builder_.reset( + new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); + } + } + + ::arrow::ArrayVector GetBuilderChunks() override { + ::arrow::ArrayVector chunks; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunks)); + return chunks; + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + private: + std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_; +}; + +template +class ByteArrayDictionaryRecordReader : public TypedRecordReader { + public: + ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, pool), builder_(new BuilderType(pool)) {} + + ::arrow::ArrayVector GetBuilderChunks() override { + std::shared_ptr<::arrow::Array> chunk; + PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); + return ::arrow::ArrayVector({chunk}); + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + 
ResetValues(); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, builder_.get()); + DCHECK_EQ(num_decoded, values_to_read); + ResetValues(); + } + + private: + std::unique_ptr builder_; +}; + +// TODO(wesm): Implement these to some satisfaction +template <> +void TypedRecordReader::DebugPrintState() {} + +template <> +void TypedRecordReader::DebugPrintState() {} + +template <> +void TypedRecordReader::DebugPrintState() {} + +std::shared_ptr MakeByteArrayRecordReader( + const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool read_dictionary) { + if (read_dictionary) { + if (descr->converted_type() == ConvertedType::UTF8) { + using Builder = ::arrow::StringDictionaryBuilder; + return std::shared_ptr( + new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + } else { + using Builder = ::arrow::BinaryDictionaryBuilder; + return std::shared_ptr( + new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + } + } else { + return std::shared_ptr( + new RecordReader(new ByteArrayChunkedRecordReader(descr, pool))); + } +} + +std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, + MemoryPool* pool, + const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::INT32: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::INT64: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::INT96: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::FLOAT: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::DOUBLE: - return std::make_shared>(descr, std::move(pager), - pool); + return std::shared_ptr( + new RecordReader(new TypedRecordReader(descr, pool))); case Type::BYTE_ARRAY: - return std::make_shared>( - descr, std::move(pager), pool); + return MakeByteArrayRecordReader(descr, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared>(descr, std::move(pager), - pool); - default: - ParquetException::NYI("type reader not implemented"); + return std::shared_ptr( + new RecordReader(new FLBARecordReader(descr, pool))); + default: { + // PARQUET-1481: This can occur if the file is corrupt + std::stringstream ss; + ss << "Invalid physical column type: " << static_cast(descr->physical_type()); + throw ParquetException(ss.str()); + } } // Unreachable code, but supress compiler warning - return std::shared_ptr(nullptr); + return nullptr; } +} // namespace internal } // namespace parquet diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 15987c5cee7..4a9257e8453 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -179,6 +179,106 @@ class TypedColumnReader : public ColumnReader { namespace internal { +/// \brief Stateful column reader that delimits semantic records for both flat +/// and nested columns +/// +/// \note 
API EXPERIMENTAL +/// \since 1.3.0 +class RecordReader { + public: + static std::shared_ptr Make( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), + const bool read_dictionary = false); + + virtual ~RecordReader() = default; + + /// \brief Attempt to read indicated number of records from column chunk + /// \return number of records read + virtual int64_t ReadRecords(int64_t num_records) = 0; + + /// \brief Pre-allocate space for data. Results in better flat read performance + virtual void Reserve(int64_t num_values) = 0; + + /// \brief Clear consumed values and repetition/definition levels as the + /// result of calling ReadRecords + virtual void Reset() = 0; + + /// \brief Transfer filled values buffer to caller. A new one will be + /// allocated in subsequent ReadRecords calls + virtual std::shared_ptr ReleaseValues() = 0; + + /// \brief Transfer filled validity bitmap buffer to caller. A new one will + /// be allocated in subsequent ReadRecords calls + virtual std::shared_ptr ReleaseIsValid() = 0; + + /// \brief Return true if the record reader has more internal data yet to + /// process + virtual bool HasMoreData() const = 0; + + /// \brief Advance record reader to the next row group + /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader + virtual void SetPageReader(std::unique_ptr reader) = 0; + + virtual void DebugPrintState() = 0; + + // For BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY types that may have chunked output + virtual std::vector> GetBuilderChunks() = 0; + + /// \brief Decoded definition levels + int16_t* def_levels() const { + return reinterpret_cast(def_levels_->mutable_data()); + } + + /// \brief Decoded repetition levels + int16_t* rep_levels() const { + return reinterpret_cast(rep_levels_->mutable_data()); + } + + /// \brief Decoded values, including nulls, if any + uint8_t* values() const { return values_->mutable_data(); } + + /// \brief Number of values written including nulls (if any) + int64_t values_written() const { return values_written_; } + + /// \brief Number of definition / repetition levels (from those that have + /// been decoded) that have been consumed inside the reader. + int64_t levels_position() const { return levels_position_; } + + /// \brief Number of definition / repetition levels that have been written + /// internally in the reader + int64_t levels_written() const { return levels_written_; } + + /// \brief Number of nulls in the leaf + int64_t null_count() const { return null_count_; } + + /// \brief True if the leaf values are nullable + bool nullable_values() const { return nullable_values_; } + + protected: + bool nullable_values_; + + bool at_record_start_; + int64_t records_read_; + + int64_t values_written_; + int64_t values_capacity_; + int64_t null_count_; + + int64_t levels_written_; + int64_t levels_position_; + int64_t levels_capacity_; + + std::shared_ptr<::arrow::ResizableBuffer> values_; + // In the case of false, don't allocate the values buffer (when we directly read into + // builder classes). 
+ bool uses_values_; + + std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; + std::shared_ptr<::arrow::ResizableBuffer> def_levels_; + std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; +}; + static inline void DefinitionLevelsToBitmap( const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, From 5eb664fe84209de4b38c2c0ca11266c76b42ff70 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 18 Jul 2019 14:58:34 -0500 Subject: [PATCH 2/2] Finish cleaning, compiles and tests pass --- cpp/src/parquet/CMakeLists.txt | 1 - cpp/src/parquet/arrow/reader.cc | 3 +- cpp/src/parquet/arrow/test-util.h | 2 +- cpp/src/parquet/column_reader.cc | 263 +++++++++++++++--------------- cpp/src/parquet/column_reader.h | 3 + 5 files changed, 133 insertions(+), 139 deletions(-) diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index cb8de1657d6..8a077985061 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -157,7 +157,6 @@ add_custom_command(OUTPUT ${THRIFT_OUTPUT_FILES} set(PARQUET_SRCS arrow/reader.cc - arrow/record_reader.cc arrow/schema.cc arrow/writer.cc bloom_filter.cc diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 2259f7a5da5..45071b5003f 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -42,7 +42,6 @@ // For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022 #include "arrow/compute/kernel.h" -#include "parquet/arrow/record_reader.h" #include "parquet/arrow/schema.h" #include "parquet/column_reader.h" #include "parquet/exception.h" @@ -1418,7 +1417,7 @@ struct TransferFunctor< typename std::enable_if::value && (std::is_same::value || std::is_same::value)>::type> { - Status operator()(RecordReader<* reader, MemoryPool* pool, + Status operator()(RecordReader* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, Datum* out) { DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h index c50dfd6fc29..8760d91f2a9 100644 --- a/cpp/src/parquet/arrow/test-util.h +++ b/cpp/src/parquet/arrow/test-util.h @@ -30,7 +30,7 @@ #include "arrow/type_traits.h" #include "arrow/util/decimal.h" -#include "parquet/arrow/record_reader.h" +#include "parquet/column_reader.h" namespace parquet { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index bc7d255fde4..6727fe6fcd0 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -21,8 +21,10 @@ #include #include #include +#include #include "arrow/buffer.h" +#include "arrow/builder.h" #include "arrow/util/bit-stream-utils.h" #include "arrow/util/compression.h" #include "arrow/util/logging.h" @@ -30,7 +32,6 @@ #include "arrow/util/ubsan.h" #include "parquet/column_page.h" -#include "parquet/column_reader_internal.h" #include "parquet/encoding.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -280,13 +281,15 @@ class ColumnReaderImplBase { ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : descr_(descr), - num_buffered_values_(0), - num_decoded_values_(0), max_def_level_(descr->max_definition_level()), max_rep_level_(descr->max_repetition_level()), + num_buffered_values_(0), + num_decoded_values_(0), pool_(pool), current_decoder_(nullptr) {} + virtual ~ColumnReaderImplBase() = default; + protected: // Read up to 
batch_size values from the current data page into the // pre-allocated memory T* @@ -303,8 +306,7 @@ class ColumnReaderImplBase { // // @returns: the number of values read into the out buffer int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, - uint8_t* valid_bits, - int64_t valid_bits_offset) { + uint8_t* valid_bits, int64_t valid_bits_offset) { return current_decoder_->DecodeSpaced(out, static_cast(batch_size), static_cast(null_count), valid_bits, valid_bits_offset); @@ -314,13 +316,13 @@ class ColumnReaderImplBase { // // Returns the number of decoded definition levels int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { - if (descr_->max_definition_level() == 0) { + if (max_def_level_ == 0) { return 0; } return definition_level_decoder_.Decode(static_cast(batch_size), levels); } - bool HasNext() override { + bool HasNextInternal() { // Either there is no data page available yet, or the data page has been // exhausted if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { @@ -334,7 +336,7 @@ class ColumnReaderImplBase { // Read multiple repetition levels into preallocated memory // Returns the number of decoded repetition levels int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { - if (descr_->max_repetition_level() == 0) { + if (max_rep_level_ == 0) { return 0; } return repetition_level_decoder_.Decode(static_cast(batch_size), levels); @@ -364,7 +366,7 @@ class ColumnReaderImplBase { // Repetition and definition levels are always encoded using RLE encoding // in the DataPageV2 format. const int64_t levels_byte_size = - InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); + InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); InitializeDataDecoder(*page, levels_byte_size); return true; } else { @@ -402,7 +404,7 @@ class ColumnReaderImplBase { std::unique_ptr> decoder = MakeDictDecoder(descr_, pool_); decoder->SetDict(dictionary.get()); decoders_[encoding] = - std::unique_ptr(dynamic_cast(decoder.release())); + std::unique_ptr(dynamic_cast(decoder.release())); } else { ParquetException::NYI("only plain dictionary encoding has been implemented"); } @@ -431,20 +433,20 @@ class ColumnReaderImplBase { // Data page Layout: Repetition Levels - Definition Levels - encoded values. // Levels are encoded as rle or bit-packed. 
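The data page layout comment above boils down to simple buffer slicing: repetition levels come first, then definition levels, and whatever remains is handed to the value decoder. The arithmetic below is illustrative only; the byte counts stand in for the sizes reported by LevelDecoder::SetData.

#include <cstdint>
#include <iostream>

int main() {
  int64_t page_size = 4096;       // total bytes in the (uncompressed) v1 data page
  int64_t rep_levels_bytes = 12;  // consumed by the repetition level decoder
  int64_t def_levels_bytes = 20;  // consumed by the definition level decoder

  const int64_t levels_byte_size = rep_levels_bytes + def_levels_bytes;
  const int64_t data_size = page_size - levels_byte_size;  // bytes of encoded values

  std::cout << "levels: " << levels_byte_size << " bytes, values: " << data_size
            << " bytes" << std::endl;
  return 0;
}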
// Init repetition levels - if (descr_->max_repetition_level() > 0) { + if (max_rep_level_ > 0) { int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - repetition_level_encoding, descr_->max_repetition_level(), + repetition_level_encoding, max_rep_level_, static_cast(num_buffered_values_), buffer); buffer += rep_levels_bytes; levels_byte_size += rep_levels_bytes; } - // TODO figure a way to set max_definition_level_ to 0 + // TODO figure a way to set max_def_level_ to 0 // if the initial value is invalid // Init definition levels - if (descr_->max_definition_level() > 0) { + if (max_def_level_ > 0) { int64_t def_levels_bytes = definition_level_decoder_.SetData( - definition_level_encoding, descr_->max_definition_level(), + definition_level_encoding, max_def_level_, static_cast(num_buffered_values_), buffer); levels_byte_size += def_levels_bytes; } @@ -496,6 +498,9 @@ class ColumnReaderImplBase { } const ColumnDescriptor* descr_; + const int16_t max_def_level_; + const int16_t max_rep_level_; + std::unique_ptr pager_; std::shared_ptr current_page_; @@ -517,25 +522,16 @@ class ColumnReaderImplBase { // into memory int64_t num_decoded_values_; - const int16_t max_def_level_; - const int16_t max_rep_level_; - ::arrow::MemoryPool* pool_; - using DecoderType = TypedDecoder; + using DecoderType = typename EncodingTraits::Decoder; + DecoderType* current_decoder_; // Map of encoding type to the respective decoder object. For example, a // column chunk's data pages may include both dictionary-encoded and // plain-encoded data. std::unordered_map> decoders_; - void ConfigureDictionary(const DictionaryPage* page); - DecoderType* current_decoder_; - - int64_t available_values_current_page() const { - return num_buffered_values_ - num_decoded_values_; - } - void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } }; @@ -546,12 +542,16 @@ template class TypedColumnReaderImpl : public TypedColumnReader, public ColumnReaderImplBase { public: + using T = typename DType::c_type; + TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr pager, ::arrow::MemoryPool* pool) - : ColumnReaderImplBase(descr, pool) { - pager_ = std::move(pager); + : ColumnReaderImplBase(descr, pool) { + this->pager_ = std::move(pager); } + bool HasNext() override { return this->HasNextInternal(); } + int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) override; @@ -562,9 +562,9 @@ class TypedColumnReaderImpl : public TypedColumnReader, int64_t Skip(int64_t num_rows_to_skip) override; - Type::type type() const override { return descr_->physical_type(); } + Type::type type() const override { return this->descr_->physical_type(); } - const ColumnDescriptor* descr() const override { return descr_; } + const ColumnDescriptor* descr() const override { return this->descr_; } }; template @@ -579,7 +579,8 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished - batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_); + batch_size = + std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); int64_t num_def_levels = 0; int64_t num_rep_levels = 0; @@ -587,12 +588,12 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def int64_t values_to_read = 0; // If the field is required and non-repeated, there are no definition levels - if 
(descr_->max_definition_level() > 0 && def_levels) { - num_def_levels = ReadDefinitionLevels(batch_size, def_levels); + if (this->max_def_level_ > 0 && def_levels) { + num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. for (int64_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == descr_->max_definition_level()) { + if (def_levels[i] == this->max_def_level_) { ++values_to_read; } } @@ -602,16 +603,16 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def } // Not present for non-repeated fields - if (descr_->max_repetition_level() > 0 && rep_levels) { - num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); + if (this->max_rep_level_ > 0 && rep_levels) { + num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (def_levels && num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } } - *values_read = ReadValues(values_to_read, values); + *values_read = this->ReadValues(values_to_read, values); int64_t total_values = std::max(num_def_levels, *values_read); - ConsumeBufferedValues(total_values); + this->ConsumeBufferedValues(total_values); return total_values; } @@ -632,50 +633,50 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( int64_t total_values; // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished - batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_); + batch_size = + std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); // If the field is required and non-repeated, there are no definition levels - if (descr_->max_definition_level() > 0) { - int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels); + if (this->max_def_level_ > 0) { + int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); // Not present for non-repeated fields - if (descr_->max_repetition_level() > 0) { - int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); + if (this->max_rep_level_ > 0) { + int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } } - const bool has_spaced_values = internal::HasSpacedValues(descr_); + const bool has_spaced_values = internal::HasSpacedValues(this->descr_); int64_t null_count = 0; if (!has_spaced_values) { int values_to_read = 0; for (int64_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == descr_->max_definition_level()) { + if (def_levels[i] == this->max_def_level_) { ++values_to_read; } } - total_values = ReadValues(values_to_read, values); + total_values = this->ReadValues(values_to_read, values); for (int64_t i = 0; i < total_values; i++) { ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); } *values_read = total_values; } else { - int16_t max_definition_level = descr_->max_definition_level(); - int16_t max_repetition_level = descr_->max_repetition_level(); - internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, &null_count, + internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_, + this->max_rep_level_, values_read, &null_count, valid_bits, valid_bits_offset); - total_values = ReadValuesSpaced(*values_read, values, 
static_cast(null_count), - valid_bits, valid_bits_offset); + total_values = + this->ReadValuesSpaced(*values_read, values, static_cast(null_count), + valid_bits, valid_bits_offset); } *levels_read = num_def_levels; *null_count_out = null_count; } else { // Required field, read all values - total_values = ReadValues(batch_size, values); + total_values = this->ReadValues(batch_size, values); for (int64_t i = 0; i < total_values; i++) { ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); } @@ -683,7 +684,7 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( *levels_read = total_values; } - ConsumeBufferedValues(*levels_read); + this->ConsumeBufferedValues(*levels_read); return total_values; } @@ -693,26 +694,27 @@ int64_t TypedColumnReaderImpl::Skip(int64_t num_rows_to_skip) { while (HasNext() && rows_to_skip > 0) { // If the number of rows to skip is more than the number of undecoded values, skip the // Page. - if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) { - rows_to_skip -= num_buffered_values_ - num_decoded_values_; - num_decoded_values_ = num_buffered_values_; + if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { + rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; + this->num_decoded_values_ = this->num_buffered_values_; } else { // We need to read this Page // Jump to the right offset in the Page int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint int64_t values_read = 0; - std::shared_ptr scratch = - AllocateBuffer(this->pool_, batch_size * - std::max(sizeof(int16_t), - type_traits::value_byte_size)); + // This will be enough scratch space to accommodate 16-bit levels or any + // value type + std::shared_ptr scratch = AllocateBuffer( + this->pool_, batch_size * type_traits::value_byte_size); do { batch_size = std::min(batch_size, rows_to_skip); - values_read = ReadBatch(static_cast(batch_size), - reinterpret_cast(scratch->mutable_data()), - reinterpret_cast(scratch->mutable_data()), - reinterpret_cast(scratch->mutable_data()), &values_read); + values_read = + ReadBatch(static_cast(batch_size), + reinterpret_cast(scratch->mutable_data()), + reinterpret_cast(scratch->mutable_data()), + reinterpret_cast(scratch->mutable_data()), &values_read); rows_to_skip -= values_read; } while (values_read > 0 && rows_to_skip > 0); } @@ -768,14 +770,13 @@ namespace internal { constexpr int64_t kMinLevelBatchSize = 1024; template -class TypedRecordReader : public ColumnReaderImplBase, - public RecordReader { +class TypedRecordReader : public ColumnReaderImplBase, public RecordReader { public: + using T = typename DType::c_type; using BASE = ColumnReaderImplBase; - TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) - : BASE(descr, pool) { + TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) { nullable_values_ = internal::HasSpacedValues(descr); - at_record_start_(true); + at_record_start_ = true; records_read_ = 0; values_written_ = 0; values_capacity_ = 0; @@ -794,6 +795,10 @@ class TypedRecordReader : public ColumnReaderImplBase, Reset(); } + int64_t available_values_current_page() const { + return this->num_buffered_values_ - this->num_decoded_values_; + } + int64_t ReadRecords(int64_t num_records) override { // Delimit records, then read values at the end int64_t records_read = 0; @@ -809,7 +814,7 @@ class TypedRecordReader : public ColumnReaderImplBase, // enough records while (!at_record_start_ || records_read < num_records) { // Is there more data to 
read in this row group? - if (!HasNext()) { + if (!this->HasNextInternal()) { if (!at_record_start_) { // We ended the row group while inside a record that we haven't seen // the end of yet. So increment the record count for the last record in @@ -829,7 +834,7 @@ class TypedRecordReader : public ColumnReaderImplBase, break; } - if (max_def_level_ > 0) { + if (this->max_def_level_ > 0) { ReserveLevels(batch_size); int16_t* def_levels = this->def_levels() + levels_written_; @@ -837,13 +842,13 @@ class TypedRecordReader : public ColumnReaderImplBase, // Not present for non-repeated fields int64_t levels_read = 0; - if (max_rep_level_ > 0) { - levels_read = ReadDefinitionLevels(batch_size, def_levels); - if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + if (this->max_rep_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { throw ParquetException("Number of decoded rep / def levels did not match"); } - } else if (max_def_level_ > 0) { - levels_read = ReadDefinitionLevels(batch_size, def_levels); + } else if (this->max_def_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); } // Exhausted column chunk @@ -870,7 +875,7 @@ class TypedRecordReader : public ColumnReaderImplBase, std::shared_ptr ReleaseValues() override { if (uses_values_) { auto result = values_; - values_ = AllocateBuffer(pool_); + values_ = AllocateBuffer(this->pool_); return result; } else { return nullptr; @@ -879,7 +884,7 @@ class TypedRecordReader : public ColumnReaderImplBase, std::shared_ptr ReleaseIsValid() override { auto result = valid_bits_; - valid_bits_ = AllocateBuffer(pool_); + valid_bits_ = AllocateBuffer(this->pool_); return result; } @@ -895,7 +900,7 @@ class TypedRecordReader : public ColumnReaderImplBase, const int16_t* def_levels = this->def_levels() + levels_position_; const int16_t* rep_levels = this->rep_levels() + levels_position_; - DCHECK_GT(max_rep_level_, 0); + DCHECK_GT(this->max_rep_level_, 0); // Count logical records and number of values to read while (levels_position_ < levels_written_) { @@ -920,7 +925,7 @@ class TypedRecordReader : public ColumnReaderImplBase, // must advance until we find another record boundary at_record_start_ = false; - if (*def_levels++ == max_def_level_) { + if (*def_levels++ == this->max_def_level_) { ++values_to_read; } ++levels_position_; @@ -929,25 +934,20 @@ class TypedRecordReader : public ColumnReaderImplBase, return records_read; } - Type::type type() const { return descr_->physical_type(); } - - const ColumnDescriptor* descr() const { return descr_; } - void Reserve(int64_t capacity) override { ReserveLevels(capacity); ReserveValues(capacity); } void ReserveLevels(int64_t capacity) { - if (descr_->max_definition_level() > 0 && - (levels_written_ + capacity > levels_capacity_)) { + if (this->max_def_level_ > 0 && (levels_written_ + capacity > levels_capacity_)) { int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); while (levels_written_ + capacity > new_levels_capacity) { new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); } PARQUET_THROW_NOT_OK( def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); - if (descr_->max_repetition_level() > 0) { + if (this->max_rep_level_ > 0) { PARQUET_THROW_NOT_OK( rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); } @@ -962,7 +962,7 @@ class TypedRecordReader : public ColumnReaderImplBase, new_values_capacity = 
BitUtil::NextPower2(new_values_capacity + 1); } - int type_size = GetTypeByteSize(descr_->physical_type()); + int type_size = GetTypeByteSize(this->descr_->physical_type()); // XXX(wesm): A hack to avoid memory allocation when reading directly // into builder classes @@ -999,7 +999,7 @@ class TypedRecordReader : public ColumnReaderImplBase, PARQUET_THROW_NOT_OK( def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); - if (max_rep_level_ > 0) { + if (this->max_rep_level_ > 0) { std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); PARQUET_THROW_NOT_OK( rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); @@ -1017,20 +1017,20 @@ class TypedRecordReader : public ColumnReaderImplBase, void SetPageReader(std::unique_ptr reader) override { at_record_start_ = true; - pager_ = std::move(reader); + this->pager_ = std::move(reader); ResetDecoders(); } - bool HasMoreData() const override { return pager_ != nullptr; } + bool HasMoreData() const override { return this->pager_ != nullptr; } // Dictionary decoders must be reset when advancing row groups - void ResetDecoders() override { decoders_.clear(); } + void ResetDecoders() { this->decoders_.clear(); } virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { uint8_t* valid_bits = valid_bits_->mutable_data(); const int64_t valid_bits_offset = values_written_; - int64_t num_decoded = current_decoder_->DecodeSpaced( + int64_t num_decoded = this->current_decoder_->DecodeSpaced( ValuesHead(), static_cast(values_with_nulls), static_cast(null_count), valid_bits, valid_bits_offset); DCHECK_EQ(num_decoded, values_with_nulls); @@ -1038,12 +1038,12 @@ class TypedRecordReader : public ColumnReaderImplBase, virtual void ReadValuesDense(int64_t values_to_read) { int64_t num_decoded = - current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); + this->current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); DCHECK_EQ(num_decoded, values_to_read); } // Return number of logical records read - int64_t ReadRecordData(int64_t num_records) override { + int64_t ReadRecordData(int64_t num_records) { // Conservative upper bound const int64_t possible_num_values = std::max(num_records, levels_written_ - levels_position_); @@ -1053,9 +1053,9 @@ class TypedRecordReader : public ColumnReaderImplBase, int64_t values_to_read = 0; int64_t records_read = 0; - if (max_rep_level_ > 0) { + if (this->max_rep_level_ > 0) { records_read = DelimitRecords(num_records, &values_to_read); - } else if (max_def_level_ > 0) { + } else if (this->max_def_level_ > 0) { // No repetition levels, skip delimiting logic. 
Each level represents a // null or not null entry records_read = std::min(levels_written_ - levels_position_, num_records); @@ -1071,14 +1071,14 @@ class TypedRecordReader : public ColumnReaderImplBase, int64_t values_with_nulls = 0; internal::DefinitionLevelsToBitmap( def_levels() + start_levels_position, levels_position_ - start_levels_position, - max_def_level_, max_rep_level_, &values_with_nulls, &null_count, + this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count, valid_bits_->mutable_data(), values_written_); values_to_read = values_with_nulls - null_count; ReadValuesSpaced(values_with_nulls, null_count); - ConsumeBufferedValues(levels_position_ - start_levels_position); + this->ConsumeBufferedValues(levels_position_ - start_levels_position); } else { ReadValuesDense(values_to_read); - ConsumeBufferedValues(values_to_read); + this->ConsumeBufferedValues(values_to_read); } // Total values, including null spaces, if any values_written_ += values_to_read + null_count; @@ -1092,7 +1092,7 @@ class TypedRecordReader : public ColumnReaderImplBase, const int16_t* rep_levels = this->rep_levels(); const int64_t total_levels_read = levels_position_; - const T* values = reinterpret_cast(this->values()); + const T* vals = reinterpret_cast(this->values()); std::cout << "def levels: "; for (int64_t i = 0; i < total_levels_read; ++i) { @@ -1108,7 +1108,7 @@ class TypedRecordReader : public ColumnReaderImplBase, std::cout << "values: "; for (int64_t i = 0; i < this->values_written(); ++i) { - std::cout << values[i] << " "; + std::cout << vals[i] << " "; } std::cout << std::endl; } @@ -1144,7 +1144,7 @@ class FLBARecordReader : public TypedRecordReader { DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); int byte_width = descr_->type_length(); std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); - builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_)); + builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_)); } ::arrow::ArrayVector GetBuilderChunks() override { @@ -1156,7 +1156,7 @@ class FLBARecordReader : public TypedRecordReader { void ReadValuesDense(int64_t values_to_read) override { auto values = ValuesHead(); int64_t num_decoded = - current_decoder_->Decode(values, static_cast(values_to_read)); + this->current_decoder_->Decode(values, static_cast(values_to_read)); DCHECK_EQ(num_decoded, values_to_read); for (int64_t i = 0; i < num_decoded; i++) { @@ -1170,7 +1170,7 @@ class FLBARecordReader : public TypedRecordReader { const int64_t valid_bits_offset = values_written_; auto values = ValuesHead(); - int64_t num_decoded = current_decoder_->DecodeSpaced( + int64_t num_decoded = this->current_decoder_->DecodeSpaced( values, static_cast(values_to_read), static_cast(null_count), valid_bits, valid_bits_offset); DCHECK_EQ(num_decoded, values_to_read); @@ -1191,6 +1191,8 @@ class FLBARecordReader : public TypedRecordReader { class ByteArrayChunkedRecordReader : public TypedRecordReader { public: + using BuilderType = ::arrow::internal::ChunkedBinaryBuilder; + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, pool), builder_(nullptr) { // ARROW-4688(wesm): Using 2^31 - 1 chunks for now @@ -1198,10 +1200,10 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); if (descr_->converted_type() == ConvertedType::UTF8) { builder_.reset( - new 
::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, pool_)); + new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, this->pool_)); } else { builder_.reset( - new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_)); + new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, this->pool_)); } } @@ -1212,14 +1214,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader { } void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( static_cast(values_to_read), builder_.get()); DCHECK_EQ(num_decoded, values_to_read); ResetValues(); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = current_decoder_->DecodeArrow( + int64_t num_decoded = this->current_decoder_->DecodeArrow( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), values_written_, builder_.get()); DCHECK_EQ(num_decoded, values_to_read); @@ -1227,7 +1229,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader { } private: - std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_; + std::unique_ptr builder_; }; template @@ -1244,14 +1246,14 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader } void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( static_cast(values_to_read), builder_.get()); DCHECK_EQ(num_decoded, values_to_read); ResetValues(); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = current_decoder_->DecodeArrow( + int64_t num_decoded = this->current_decoder_->DecodeArrow( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), values_written_, builder_.get()); DCHECK_EQ(num_decoded, values_to_read); @@ -1272,21 +1274,19 @@ void TypedRecordReader::DebugPrintState() {} template <> void TypedRecordReader::DebugPrintState() {} -std::shared_ptr MakeByteArrayRecordReader( - const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool read_dictionary) { +std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* descr, + arrow::MemoryPool* pool, + bool read_dictionary) { if (read_dictionary) { if (descr->converted_type() == ConvertedType::UTF8) { using Builder = ::arrow::StringDictionaryBuilder; - return std::shared_ptr( - new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + return std::make_shared>(descr, pool); } else { using Builder = ::arrow::BinaryDictionaryBuilder; - return std::shared_ptr( - new RecordReader(new ByteArrayDictionaryRecordReader(descr, pool))); + return std::make_shared>(descr, pool); } } else { - return std::shared_ptr( - new RecordReader(new ByteArrayChunkedRecordReader(descr, pool))); + return std::make_shared(descr, pool); } } @@ -1295,28 +1295,21 @@ std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return std::make_shared>(descr, pool); case Type::INT32: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return std::make_shared>(descr, pool); case Type::INT64: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return 
std::make_shared>(descr, pool); case Type::INT96: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return std::make_shared>(descr, pool); case Type::FLOAT: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return std::make_shared>(descr, pool); case Type::DOUBLE: - return std::shared_ptr( - new RecordReader(new TypedRecordReader(descr, pool))); + return std::make_shared>(descr, pool); case Type::BYTE_ARRAY: return MakeByteArrayRecordReader(descr, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: - return std::shared_ptr( - new RecordReader(new FLBARecordReader(descr, pool))); + return std::make_shared(descr, pool); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 4a9257e8453..461cf726733 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "parquet/encoding.h" #include "parquet/exception.h" @@ -31,6 +32,8 @@ namespace arrow { +class Array; + namespace BitUtil { class BitReader; } // namespace BitUtil
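For context, here is a hedged usage sketch of the consolidated internal::RecordReader interface that this patch series moves into column_reader.h. It is not part of the patch: "example.parquet" and the column index are placeholders, the internal:: API is marked experimental in the header, and the surrounding file-reader calls (OpenFile, RowGroup, GetColumnPageReader) are assumed from the existing parquet-cpp API of this era rather than introduced here.

#include <iostream>
#include <memory>

#include "arrow/buffer.h"
#include "parquet/column_reader.h"
#include "parquet/file_reader.h"

int main() {
  std::unique_ptr<parquet::ParquetFileReader> file =
      parquet::ParquetFileReader::OpenFile("example.parquet");

  const parquet::ColumnDescriptor* descr = file->metadata()->schema()->Column(0);
  std::shared_ptr<parquet::internal::RecordReader> reader =
      parquet::internal::RecordReader::Make(descr);

  // Drive the reader one row group at a time; record delimiting across data
  // pages is handled internally.
  for (int rg = 0; rg < file->metadata()->num_row_groups(); ++rg) {
    reader->SetPageReader(file->RowGroup(rg)->GetColumnPageReader(0));
    while (reader->HasMoreData()) {
      int64_t records_read = reader->ReadRecords(/*num_records=*/1024);
      if (records_read == 0) break;  // row group exhausted
    }
  }

  std::cout << "values written (including nulls): " << reader->values_written()
            << ", nulls: " << reader->null_count() << std::endl;

  // Transfer ownership of the decoded buffers; the reader allocates fresh
  // ones on the next ReadRecords call.
  std::shared_ptr<arrow::ResizableBuffer> values = reader->ReleaseValues();
  std::shared_ptr<arrow::ResizableBuffer> valid_bits = reader->ReleaseIsValid();
  return 0;
}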