diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index a3d4062f0eb..1606199c406 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -68,10 +68,10 @@ if(ARROW_CPU_FLAG STREQUAL "x86") add_definitions(-DARROW_HAVE_RUNTIME_SSE4_2) endif() if(CXX_SUPPORTS_AVX2 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX2|AVX512|MAX)$") - add_definitions(-DARROW_HAVE_RUNTIME_AVX2) + add_definitions(-DARROW_HAVE_RUNTIME_AVX2 -DARROW_HAVE_RUNTIME_BMI2) endif() if(CXX_SUPPORTS_AVX512 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX512|MAX)$") - add_definitions(-DARROW_HAVE_RUNTIME_AVX512) + add_definitions(-DARROW_HAVE_RUNTIME_AVX512 -DARROW_HAVE_RUNTIME_BMI2) endif() elseif(ARROW_CPU_FLAG STREQUAL "ppc") # power compiler flags, gcc/clang only diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index 17884db9476..9215d9ab544 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -206,7 +206,7 @@ class PoolBuffer : public ResizableBuffer { } Status Resize(const int64_t new_size, bool shrink_to_fit = true) override { - if (new_size < 0) { + if (ARROW_PREDICT_FALSE(new_size < 0)) { return Status::Invalid("Negative buffer resize: ", new_size); } if (mutable_data_ && shrink_to_fit && new_size <= size_) { diff --git a/cpp/src/arrow/util/cpu_info.h b/cpp/src/arrow/util/cpu_info.h index 73695b7742e..a57ffd29467 100644 --- a/cpp/src/arrow/util/cpu_info.h +++ b/cpp/src/arrow/util/cpu_info.h @@ -106,6 +106,11 @@ class ARROW_EXPORT CpuInfo { /// Returns the vendor of the cpu. Vendor vendor() const { return vendor_; } + bool HasEfficientBmi2() const { + // BMI2 (pext, pdep) is only efficient on Intel X86 processors. 
+ return vendor() == Vendor::Intel && IsSupported(BMI2); + } + private: CpuInfo(); diff --git a/cpp/src/arrow/util/simd.h b/cpp/src/arrow/util/simd.h index 84c93a825cf..259641dd456 100644 --- a/cpp/src/arrow/util/simd.h +++ b/cpp/src/arrow/util/simd.h @@ -29,6 +29,10 @@ #else // gcc/clang (possibly others) +#if defined(ARROW_HAVE_BMI2) +#include +#endif + #if defined(ARROW_HAVE_AVX2) || defined(ARROW_HAVE_AVX512) #include #elif defined(ARROW_HAVE_SSE4_2) diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index b70fe6b168d..22ad69219a3 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -188,6 +188,7 @@ set(PARQUET_SRCS file_writer.cc internal_file_decryptor.cc internal_file_encryptor.cc + level_comparison.cc level_conversion.cc metadata.cc murmur3.cc @@ -202,6 +203,27 @@ set(PARQUET_SRCS stream_writer.cc types.cc) +if(CXX_SUPPORTS_AVX2) + # AVX2 is used as a proxy for BMI2. + list(APPEND PARQUET_SRCS level_comparison_avx2.cc level_conversion_bmi2.cc) + set_source_files_properties(level_comparison_avx2.cc + PROPERTIES + SKIP_PRECOMPILE_HEADERS + ON + COMPILE_FLAGS + "${ARROW_AVX2_FLAG}") + # WARNING: DO NOT BLINDLY COPY THIS CODE FOR OTHER BMI2 USE CASES. + # This code is always guarded by runtime dispatch which verifies + # BMI2 is present. For a very small number of CPUs AVX2 does not + # imply BMI2. 
+ set_source_files_properties(level_conversion_bmi2.cc + PROPERTIES + SKIP_PRECOMPILE_HEADERS + ON + COMPILE_FLAGS + "${ARROW_AVX2_FLAG} -DARROW_HAVE_BMI2 -mbmi2") +endif() + if(PARQUET_REQUIRE_ENCRYPTION) set(PARQUET_SRCS ${PARQUET_SRCS} encryption_internal.cc) else() diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 476d82f7fac..188bc8c178a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -416,7 +416,6 @@ void DoSimpleRoundtrip(const std::shared_ptr& table, bool use_threads, ::arrow::default_memory_pool(), &reader)); reader->set_use_threads(use_threads); - if (column_subset.size() > 0) { ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); } else { @@ -2361,8 +2360,7 @@ TEST(ArrowReadWrite, SingleColumnNullableStruct) { 3); } -// Disabled until implementation can be finished. -TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) { +TEST(TestArrowReadWrite, CanonicalNestedRoundTrip) { auto doc_id = field("DocId", ::arrow::int64(), /*nullable=*/false); auto links = field( "Links", @@ -2391,7 +2389,7 @@ TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) { // string literals implemented properly auto name_array = ::arrow::ArrayFromJSON( name->type(), - "([[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"}," + "[[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"}," "{\"Code\": \"en_us\", \"Country\": null}]," "\"Url\": \"http://A\"}," "{\"Url\": \"http://B\"}," @@ -2810,12 +2808,6 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) { ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); } -TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) { - ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::REPEATED)); - std::shared_ptr
table; - ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table)); -} - TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) { #ifdef PARQUET_VALGRIND const int num_trees = 3; @@ -2994,7 +2986,6 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) { ASSERT_OK(builder.Append(value)); } ASSERT_OK(builder.Finish(&expected_array)); - AssertArraysEqual(*expected_array, *chunk); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5f13259058d..8b3bfcb5704 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -43,6 +43,7 @@ #include "parquet/schema.h" using arrow::Array; +using arrow::ArrayData; using arrow::BooleanArray; using arrow::ChunkedArray; using arrow::DataType; @@ -59,10 +60,6 @@ using arrow::Table; using arrow::TimestampArray; using arrow::internal::Iota; -using parquet::schema::GroupNode; -using parquet::schema::Node; -using parquet::schema::PrimitiveNode; - // Help reduce verbosity using ParquetReader = parquet::ParquetFileReader; @@ -77,18 +74,47 @@ using parquet::internal::RecordReader; namespace parquet { namespace arrow { +namespace { + +::arrow::Result> ChunksToSingle(const ChunkedArray& chunked) { + switch (chunked.num_chunks()) { + case 0: { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr array, + ::arrow::MakeArrayOfNull(chunked.type(), 0)); + return array->data(); + } + case 1: + return chunked.chunk(0)->data(); + default: + // ARROW-3762(wesm): If item reader yields a chunked array, we reject as + // this is not yet implemented + return Status::NotImplemented( + "Nested data conversions not implemented for chunked array outputs"); + } +} + +} // namespace class ColumnReaderImpl : public ColumnReader { public: - enum ReaderType { PRIMITIVE, LIST, STRUCT }; - virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0; virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0; virtual const std::shared_ptr field() = 0; + ::arrow::Status NextBatch(int64_t batch_size, + 
std::shared_ptr<::arrow::ChunkedArray>* out) final { + RETURN_NOT_OK(LoadBatch(batch_size)); + RETURN_NOT_OK(BuildArray(batch_size, out)); + for (int x = 0; x < (*out)->num_chunks(); x++) { + RETURN_NOT_OK((*out)->chunk(x)->Validate()); + } + return Status::OK(); + } - virtual const ColumnDescriptor* descr() const = 0; + virtual ::arrow::Status LoadBatch(int64_t num_records) = 0; - virtual ReaderType type() const = 0; + virtual ::arrow::Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<::arrow::ChunkedArray>* out) = 0; + virtual bool IsOrHasRepeatedChild() const = 0; }; std::shared_ptr> VectorToSharedSet( @@ -373,30 +399,34 @@ class RowGroupReaderImpl : public RowGroupReader { class LeafReader : public ColumnReaderImpl { public: LeafReader(std::shared_ptr ctx, std::shared_ptr field, - std::unique_ptr input) + std::unique_ptr input, + ::parquet::internal::LevelInfo leaf_info) : ctx_(std::move(ctx)), field_(std::move(field)), input_(std::move(input)), descr_(input_->descr()) { record_reader_ = RecordReader::Make( - descr_, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); + descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); NextRowGroup(); } - Status GetDefLevels(const int16_t** data, int64_t* length) override { + Status GetDefLevels(const int16_t** data, int64_t* length) final { *data = record_reader_->def_levels(); *length = record_reader_->levels_position(); return Status::OK(); } - Status GetRepLevels(const int16_t** data, int64_t* length) override { + Status GetRepLevels(const int16_t** data, int64_t* length) final { *data = record_reader_->rep_levels(); *length = record_reader_->levels_position(); return Status::OK(); } - Status NextBatch(int64_t records_to_read, std::shared_ptr* out) override { + bool IsOrHasRepeatedChild() const final { return false; } + + Status LoadBatch(int64_t records_to_read) final { BEGIN_PARQUET_CATCH_EXCEPTIONS + out_ = nullptr; record_reader_->Reset(); // Pre-allocation 
gives much better performance for flat columns record_reader_->Reserve(records_to_read); @@ -411,17 +441,21 @@ class LeafReader : public ColumnReaderImpl { } } RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_, - ctx_->pool, out)); + ctx_->pool, &out_)); return Status::OK(); END_PARQUET_CATCH_EXCEPTIONS } - const std::shared_ptr field() override { return field_; } - const ColumnDescriptor* descr() const override { return descr_; } + ::arrow::Status BuildArray(int64_t length_upper_bound, + std::shared_ptr<::arrow::ChunkedArray>* out) final { + *out = out_; + return Status::OK(); + } - ReaderType type() const override { return PRIMITIVE; } + const std::shared_ptr field() override { return field_; } private: + std::shared_ptr out_; void NextRowGroup() { std::unique_ptr page_reader = input_->NextChunk(); record_reader_->SetPageReader(std::move(page_reader)); @@ -434,16 +468,16 @@ class LeafReader : public ColumnReaderImpl { std::shared_ptr record_reader_; }; -class NestedListReader : public ColumnReaderImpl { +template +class ListReader : public ColumnReaderImpl { public: - NestedListReader(std::shared_ptr ctx, std::shared_ptr field, - int16_t max_definition_level, int16_t max_repetition_level, - std::unique_ptr item_reader) + ListReader(std::shared_ptr ctx, std::shared_ptr field, + ::parquet::internal::LevelInfo level_info, + std::unique_ptr child_reader) : ctx_(std::move(ctx)), field_(std::move(field)), - max_definition_level_(max_definition_level), - max_repetition_level_(max_repetition_level), - item_reader_(std::move(item_reader)) {} + level_info_(level_info), + item_reader_(std::move(child_reader)) {} Status GetDefLevels(const int16_t** data, int64_t* length) override { return item_reader_->GetDefLevels(data, length); @@ -453,229 +487,211 @@ class NestedListReader : public ColumnReaderImpl { return item_reader_->GetRepLevels(data, length); } - Status NextBatch(int64_t records_to_read, std::shared_ptr* out) override { - if 
(item_reader_->type() == ColumnReaderImpl::STRUCT) { - return Status::Invalid("Mix of struct and list types not yet supported"); - } + bool IsOrHasRepeatedChild() const final { return true; } - RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out)); - - std::shared_ptr item_chunk; - switch ((*out)->num_chunks()) { - case 0: { - ARROW_ASSIGN_OR_RAISE(item_chunk, ::arrow::MakeArrayOfNull((*out)->type(), 0)); - break; - } - case 1: - item_chunk = (*out)->chunk(0); - break; - default: - // ARROW-3762(wesm): If item reader yields a chunked array, we reject as - // this is not yet implemented - return Status::NotImplemented( - "Nested data conversions not implemented for chunked array outputs"); - } + Status LoadBatch(int64_t number_of_records) final { + return item_reader_->LoadBatch(number_of_records); + } + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr* out) override { const int16_t* def_levels; const int16_t* rep_levels; int64_t num_levels; RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels)); RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels)); - std::shared_ptr result; - RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, max_definition_level_, - max_repetition_level_, def_levels, rep_levels, - num_levels, ctx_->pool, &result)); + std::shared_ptr validity_buffer; + ::parquet::internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = length_upper_bound; + if (field_->nullable()) { + ARROW_ASSIGN_OR_RAISE( + validity_buffer, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = validity_buffer->mutable_data(); + } + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr offsets_buffer, + AllocateResizableBuffer( + sizeof(IndexType) * std::max(int64_t{1}, length_upper_bound + 1), + ctx_->pool)); + // Ensure zero initialization in case we have reached a zero length list (and + // because first entry is always zero). 
+ IndexType* offset_data = reinterpret_cast(offsets_buffer->mutable_data()); + offset_data[0] = 0; + BEGIN_PARQUET_CATCH_EXCEPTIONS + ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels, + level_info_, &validity_io, offset_data); + END_PARQUET_CATCH_EXCEPTIONS + + RETURN_NOT_OK(item_reader_->BuildArray(offset_data[validity_io.values_read], out)); + + // Resize to actual number of elements returned. + RETURN_NOT_OK( + offsets_buffer->Resize((validity_io.values_read + 1) * sizeof(IndexType))); + if (validity_buffer != nullptr) { + RETURN_NOT_OK( + validity_buffer->Resize(BitUtil::BytesForBits(validity_io.values_read))); + } + ARROW_ASSIGN_OR_RAISE(std::shared_ptr item_chunk, ChunksToSingle(**out)); + + std::vector> buffers{ + validity_io.null_count > 0 ? validity_buffer : nullptr, offsets_buffer}; + auto data = std::make_shared( + field_->type(), + /*length=*/validity_io.values_read, std::move(buffers), + std::vector>{item_chunk}, validity_io.null_count); + + std::shared_ptr result = ::arrow::MakeArray(data); *out = std::make_shared(result); return Status::OK(); } const std::shared_ptr field() override { return field_; } - const ColumnDescriptor* descr() const override { return nullptr; } - - ReaderType type() const override { return LIST; } - private: std::shared_ptr ctx_; std::shared_ptr field_; - int16_t max_definition_level_; - int16_t max_repetition_level_; + ::parquet::internal::LevelInfo level_info_; std::unique_ptr item_reader_; }; class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl { public: explicit StructReader(std::shared_ptr ctx, - const SchemaField& schema_field, std::shared_ptr filtered_field, - std::vector>&& children) + ::parquet::internal::LevelInfo level_info, + std::vector> children) : ctx_(std::move(ctx)), - schema_field_(schema_field), filtered_field_(std::move(filtered_field)), - struct_def_level_(schema_field.level_info.def_level), - children_(std::move(children)) {} + level_info_(level_info), + 
children_(std::move(children)) { + // There could be a mix of children some might be repeated some might not be. + // If possible use one that isn't since that will be guaranteed to have the least + // number of levels to reconstruct a nullable bitmap. + auto result = std::find_if(children_.begin(), children_.end(), + [](const std::unique_ptr& child) { + return !child->IsOrHasRepeatedChild(); + }); + if (result != children_.end()) { + def_rep_level_child_ = result->get(); + has_repeated_child_ = false; + } else if (!children_.empty()) { + def_rep_level_child_ = children_.front().get(); + has_repeated_child_ = true; + } + } - Status NextBatch(int64_t records_to_read, std::shared_ptr* out) override; + bool IsOrHasRepeatedChild() const final { return has_repeated_child_; } + + Status LoadBatch(int64_t records_to_read) override { + for (const std::unique_ptr& reader : children_) { + RETURN_NOT_OK(reader->LoadBatch(records_to_read)); + } + return Status::OK(); + } + Status BuildArray(int64_t length_upper_bound, + std::shared_ptr* out) override; Status GetDefLevels(const int16_t** data, int64_t* length) override; Status GetRepLevels(const int16_t** data, int64_t* length) override; const std::shared_ptr field() override { return filtered_field_; } - const ColumnDescriptor* descr() const override { return nullptr; } - ReaderType type() const override { return STRUCT; } private: - std::shared_ptr ctx_; - SchemaField schema_field_; - std::shared_ptr filtered_field_; - int16_t struct_def_level_; - std::vector> children_; - std::shared_ptr def_levels_buffer_; - Status DefLevelsToNullArray(std::shared_ptr* null_bitmap, int64_t* null_count); + const std::shared_ptr ctx_; + const std::shared_ptr filtered_field_; + const ::parquet::internal::LevelInfo level_info_; + const std::vector> children_; + ColumnReaderImpl* def_rep_level_child_ = nullptr; + bool has_repeated_child_; }; -Status StructReader::DefLevelsToNullArray(std::shared_ptr* null_bitmap_out, - int64_t* null_count_out) { 
- auto null_count = 0; - const int16_t* def_levels_data; - int64_t def_levels_length; - RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length)); - ARROW_ASSIGN_OR_RAISE(auto null_bitmap, - AllocateEmptyBitmap(def_levels_length, ctx_->pool)); - uint8_t* null_bitmap_ptr = null_bitmap->mutable_data(); - for (int64_t i = 0; i < def_levels_length; i++) { - if (def_levels_data[i] < struct_def_level_) { - // Mark null - null_count += 1; - } else { - DCHECK_EQ(def_levels_data[i], struct_def_level_); - ::arrow::BitUtil::SetBit(null_bitmap_ptr, i); - } - } - - *null_count_out = null_count; - *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap; - return Status::OK(); -} - -// TODO(itaiin): Consider caching the results of this calculation - -// note that this is only used once for each read for now Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) { *data = nullptr; if (children_.size() == 0) { - // Empty struct *length = 0; - return Status::OK(); + return Status::Invalid("StructReader had no children"); } - // We have at least one child - const int16_t* child_def_levels; - int64_t child_length = 0; - bool found_nullable_child = false; - int16_t* result_levels = nullptr; + // This method should only be called when this struct or one of its parents + // are optional/repeated or it has a repeated child. + // Meaning all children must have rep/def levels associated + // with them. 
+ RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length)); + return Status::OK(); +} - int child_index = 0; - while (child_index < static_cast(children_.size())) { - if (!children_[child_index]->field()->nullable()) { - ++child_index; - continue; - } - RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, &child_length)); - auto size = child_length * sizeof(int16_t); - ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, ctx_->pool)); - // Initialize with the minimal def level - std::memset(def_levels_buffer_->mutable_data(), -1, size); - result_levels = reinterpret_cast(def_levels_buffer_->mutable_data()); - found_nullable_child = true; - break; - } - - if (!found_nullable_child) { - *data = nullptr; +Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) { + *data = nullptr; + if (children_.size() == 0) { *length = 0; - return Status::OK(); + return Status::Invalid("StructReader had no childre"); } - // Look at the rest of the children - - // When a struct is defined, all of its children def levels are at least at - // nesting level, and def level equals nesting level. - // When a struct is not defined, all of its children def levels are less than - // the nesting level, and the def level equals max(children def levels) - // All other possibilities are malformed definition data. - for (; child_index < static_cast(children_.size()); ++child_index) { - // Child is non-nullable, and therefore has no definition levels - if (!children_[child_index]->field()->nullable()) { - continue; - } - - auto& child = children_[child_index]; - int64_t current_child_length; - RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, ¤t_child_length)); - - if (child_length != current_child_length) { - std::stringstream ss; - ss << "Parquet struct decoding error. 
Expected to decode " << child_length - << " definition levels" - << " from child field \"" << child->field()->ToString() << "\" in parent \"" - << this->field()->ToString() << "\" but was only able to decode " - << current_child_length; - return Status::IOError(ss.str()); - } - - DCHECK_EQ(child_length, current_child_length); - for (int64_t i = 0; i < child_length; i++) { - // Check that value is either uninitialized, or current - // and previous children def levels agree on the struct level - DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) == - (child_def_levels[i] >= struct_def_level_))); - result_levels[i] = - std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_)); - } - } - *data = reinterpret_cast(def_levels_buffer_->data()); - *length = static_cast(child_length); + // This method should only be called when this struct or one of its parents + // are optional/repeated or it has repeated child. + // Meaning all children must have rep/def levels associated + // with them. + RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length)); return Status::OK(); } -Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) { - return Status::NotImplemented("GetRepLevels is not implemented for struct"); -} +Status StructReader::BuildArray(int64_t length_upper_bound, + std::shared_ptr* out) { + std::vector> children_array_data; + std::shared_ptr null_bitmap; -Status StructReader::NextBatch(int64_t records_to_read, - std::shared_ptr* out) { - std::vector> children_arrays; - std::shared_ptr null_bitmap; - int64_t null_count; + ::parquet::internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = length_upper_bound; + // This simplifies accounting below. 
+ validity_io.values_read = length_upper_bound; + + BEGIN_PARQUET_CATCH_EXCEPTIONS + const int16_t* def_levels; + const int16_t* rep_levels; + int64_t num_levels; + + if (has_repeated_child_) { + ARROW_ASSIGN_OR_RAISE( + null_bitmap, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = null_bitmap->mutable_data(); + RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels)); + RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels)); + DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, &validity_io); + } else if (filtered_field_->nullable()) { + ARROW_ASSIGN_OR_RAISE( + null_bitmap, + AllocateResizableBuffer(BitUtil::BytesForBits(length_upper_bound), ctx_->pool)); + validity_io.valid_bits = null_bitmap->mutable_data(); + RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels)); + DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io); + } + + // Ensure all values are initialized. + if (null_bitmap) { + RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read))); + } + END_PARQUET_CATCH_EXCEPTIONS // Gather children arrays and def levels for (auto& child : children_) { - if (child->type() == ColumnReaderImpl::LIST) { - return Status::NotImplemented( - "Reading structs of lists from Parquet files not yet supported: ", - field()->ToString()); - } - std::shared_ptr field; - RETURN_NOT_OK(child->NextBatch(records_to_read, &field)); - - if (field->num_chunks() > 1) { - return Status::Invalid("Chunked field reads not yet supported with StructArray"); - } - children_arrays.push_back(field->chunk(0)); + RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr array_data, ChunksToSingle(*field)); + children_array_data.push_back(std::move(array_data)); } - RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count)); - - int64_t struct_length = children_arrays[0]->length(); - for (size_t i = 1; i < children_arrays.size(); 
++i) { - if (children_arrays[i]->length() != struct_length) { - // TODO(wesm): This should really only occur if the Parquet file is - // malformed. Should this be a DCHECK? - return Status::Invalid("Struct children had different lengths"); - } + if (!filtered_field_->nullable() && !has_repeated_child_) { + validity_io.values_read = children_array_data.front()->length; } - auto result = std::make_shared(field()->type(), struct_length, - children_arrays, null_bitmap, null_count); + std::vector> buffers{validity_io.null_count > 0 ? null_bitmap + : nullptr}; + auto data = + std::make_shared(filtered_field_->type(), + /*length=*/validity_io.values_read, std::move(buffers), + std::move(children_array_data)); + std::shared_ptr result = ::arrow::MakeArray(data); + *out = std::make_shared(result); return Status::OK(); } @@ -692,40 +708,28 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& if (!field.is_leaf()) { return Status::Invalid("Parquet non-leaf node has no children"); } + if (!ctx->IncludesLeaf(field.column_index)) { + *out = nullptr; + return Status::OK(); + } std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); - out->reset(new LeafReader(ctx, field.field, std::move(input))); + out->reset(new LeafReader(ctx, field.field, std::move(input), field.level_info)); } else if (type_id == ::arrow::Type::LIST) { - // We can only read lists-of-lists or structs at the moment auto list_field = field.field; auto child = &field.children[0]; - while (child->field->type()->id() == ::arrow::Type::LIST) { - child = &child->children[0]; - } - if (child->field->type()->id() == ::arrow::Type::STRUCT) { - return Status::NotImplemented( - "Reading lists of structs from Parquet files " - "not yet supported: ", - field.field->ToString()); - } - if (!ctx->IncludesLeaf(child->column_index)) { + std::unique_ptr child_reader; + RETURN_NOT_OK(GetReader(*child, ctx, &child_reader)); + if (child_reader == nullptr) { *out = nullptr; return Status::OK(); 
} - std::unique_ptr child_reader; - RETURN_NOT_OK(GetReader(*child, ctx, &child_reader)); - // Use the max definition/repetition level of the leaf here - out->reset(new NestedListReader(ctx, list_field, child->level_info.def_level, - child->level_info.rep_level, - std::move(child_reader))); + out->reset(new ListReader(ctx, list_field, field.level_info, + std::move(child_reader))); } else if (type_id == ::arrow::Type::STRUCT) { std::vector> child_fields; std::vector> child_readers; for (const auto& child : field.children) { - if (child.is_leaf() && !ctx->IncludesLeaf(child.column_index)) { - // Excluded leaf - continue; - } std::unique_ptr child_reader; RETURN_NOT_OK(GetReader(child, ctx, &child_reader)); if (!child_reader) { @@ -742,7 +746,8 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& auto filtered_field = ::arrow::field(field.field->name(), ::arrow::struct_(child_fields), field.field->nullable(), field.field->metadata()); - out->reset(new StructReader(ctx, field, filtered_field, std::move(child_readers))); + out->reset(new StructReader(ctx, filtered_field, field.level_info, + std::move(child_readers))); } else { return Status::Invalid("Unsupported nested type: ", field.field->ToString()); } diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 903cbabaae2..7ab401bd9cf 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -795,118 +795,5 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ return Status::OK(); } -Status ReconstructNestedList(const std::shared_ptr& arr, - std::shared_ptr field, int16_t max_def_level, - int16_t max_rep_level, const int16_t* def_levels, - const int16_t* rep_levels, int64_t total_levels, - ::arrow::MemoryPool* pool, std::shared_ptr* out) { - // Walk downwards to extract nullability - std::vector item_names; - std::vector nullable; - std::vector> field_metadata; - std::vector> offset_builders; 
- std::vector> valid_bits_builders; - nullable.push_back(field->nullable()); - while (field->type()->num_fields() > 0) { - if (field->type()->num_fields() > 1) { - return Status::NotImplemented("Fields with more than one child are not supported."); - } else { - if (field->type()->id() != ::arrow::Type::LIST) { - return Status::NotImplemented("Currently only nesting with Lists is supported."); - } - field = field->type()->field(0); - } - item_names.push_back(field->name()); - offset_builders.emplace_back( - std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool)); - valid_bits_builders.emplace_back( - std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool)); - nullable.push_back(field->nullable()); - field_metadata.push_back(field->metadata()); - } - - int64_t list_depth = offset_builders.size(); - // This describes the minimal definition that describes a level that - // reflects a value in the primitive values array. - int16_t values_def_level = max_def_level; - if (nullable[nullable.size() - 1]) { - values_def_level--; - } - - // The definition levels that are needed so that a list is declared - // as empty and not null. 
- std::vector empty_def_level(list_depth); - int def_level = 0; - for (int i = 0; i < list_depth; i++) { - if (nullable[i]) { - def_level++; - } - empty_def_level[i] = static_cast(def_level); - def_level++; - } - - int32_t values_offset = 0; - std::vector null_counts(list_depth, 0); - for (int64_t i = 0; i < total_levels; i++) { - int16_t rep_level = rep_levels[i]; - if (rep_level < max_rep_level) { - for (int64_t j = rep_level; j < list_depth; j++) { - if (j == (list_depth - 1)) { - RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); - } else { - RETURN_NOT_OK(offset_builders[j]->Append( - static_cast(offset_builders[j + 1]->length()))); - } - - if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) { - RETURN_NOT_OK(valid_bits_builders[j]->Append(false)); - null_counts[j]++; - break; - } else { - RETURN_NOT_OK(valid_bits_builders[j]->Append(true)); - if (empty_def_level[j] == def_levels[i]) { - break; - } - } - } - } - if (def_levels[i] >= values_def_level) { - values_offset++; - } - } - // Add the final offset to all lists - for (int64_t j = 0; j < list_depth; j++) { - if (j == (list_depth - 1)) { - RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); - } else { - RETURN_NOT_OK(offset_builders[j]->Append( - static_cast(offset_builders[j + 1]->length()))); - } - } - - std::vector> offsets; - std::vector> valid_bits; - std::vector list_lengths; - for (int64_t j = 0; j < list_depth; j++) { - list_lengths.push_back(offset_builders[j]->length() - 1); - std::shared_ptr array; - RETURN_NOT_OK(offset_builders[j]->Finish(&array)); - offsets.emplace_back(std::static_pointer_cast(array)->values()); - RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array)); - valid_bits.emplace_back(std::static_pointer_cast(array)->values()); - } - - *out = arr; - - // TODO(wesm): Use passed-in field - for (int64_t j = list_depth - 1; j >= 0; j--) { - auto list_type = ::arrow::list(::arrow::field(item_names[j], (*out)->type(), - nullable[j + 1], field_metadata[j])); - 
*out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j], - *out, valid_bits[j], null_counts[j]); - } - return Status::OK(); -} - } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index 62eb166da01..8dd439f4fbb 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -104,13 +104,6 @@ Status TransferColumnData(::parquet::internal::RecordReader* reader, const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, std::shared_ptr<::arrow::ChunkedArray>* out); -Status ReconstructNestedList(const std::shared_ptr<::arrow::Array>& arr, - std::shared_ptr<::arrow::Field> field, int16_t max_def_level, - int16_t max_rep_level, const int16_t* def_levels, - const int16_t* rep_levels, int64_t total_levels, - ::arrow::MemoryPool* pool, - std::shared_ptr<::arrow::Array>* out); - struct ReaderContext { ParquetFileReader* reader; ::arrow::MemoryPool* pool; diff --git a/cpp/src/parquet/arrow/reconstruct_internal_test.cc b/cpp/src/parquet/arrow/reconstruct_internal_test.cc index 6faa5a5c70a..495b69f9eab 100644 --- a/cpp/src/parquet/arrow/reconstruct_internal_test.cc +++ b/cpp/src/parquet/arrow/reconstruct_internal_test.cc @@ -40,15 +40,6 @@ #include "parquet/file_writer.h" #include "parquet/properties.h" -// Set to 1 to see failures in failing tests -#define RUN_FAILING_TESTS 0 - -#if RUN_FAILING_TESTS -#define FAILING(test_name) test_name -#else -#define FAILING(test_name) DISABLED_##test_name -#endif - using arrow::Array; using arrow::ArrayFromJSON; using arrow::AssertArraysEqual; @@ -316,7 +307,7 @@ TEST_F(TestReconstructColumn, PrimitiveRequired) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(PrimitiveRepeated)) { +TEST_F(TestReconstructColumn, PrimitiveRepeated) { // Arrow schema: list(int32 not null) not null this->SetParquetSchema( PrimitiveNode::Make("node_name", 
Repetition::REPEATED, ParquetType::INT32)); @@ -349,7 +340,7 @@ TEST_F(TestReconstructColumn, NestedRequiredRequired) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedOptionalRequired)) { +TEST_F(TestReconstructColumn, NestedOptionalRequired) { // Arrow schema: struct(a: int32 not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -419,7 +410,7 @@ TEST_F(TestReconstructColumn, NestedRequiredRequiredRequired) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedRequiredOptionalRequired)) { +TEST_F(TestReconstructColumn, NestedRequiredOptionalRequired) { // Arrow schema: struct(a: struct(b: int32 not null)) not null SetParquetSchema(GroupNode::Make( "parent", Repetition::REQUIRED, @@ -440,7 +431,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedRequiredOptionalRequired)) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedOptionalRequiredOptional)) { +TEST_F(TestReconstructColumn, NestedOptionalRequiredOptional) { // Arrow schema: struct(a: struct(b: int32) not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -526,7 +517,7 @@ TEST_F(TestReconstructColumn, NestedTwoFields2) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(NestedTwoFields3)) { +TEST_F(TestReconstructColumn, NestedTwoFields3) { // Arrow schema: struct(a: int32 not null, b: int64 not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -847,7 +838,7 @@ TEST_F(TestReconstructColumn, ThreeLevelListOptionalOptional) { // Legacy list encodings // -TEST_F(TestReconstructColumn, FAILING(TwoLevelListRequired)) { +TEST_F(TestReconstructColumn, TwoLevelListRequired) { // Arrow schema: list(int32 not null) not null SetParquetSchema(GroupNode::Make( "parent", Repetition::REQUIRED, @@ -860,11 +851,11 @@ TEST_F(TestReconstructColumn, 
FAILING(TwoLevelListRequired)) { // TODO should field name "element" (Parquet convention for List nodes) // be changed to "item" (Arrow convention for List types)? - auto expected = ArrayFromJSON(List(int32()), "[[], [4, 5], [6]]"); + auto expected = ArrayFromJSON(List(int32(), /*nullable=*/false), "[[], [4, 5], [6]]"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(TwoLevelListOptional)) { +TEST_F(TestReconstructColumn, TwoLevelListOptional) { // Arrow schema: list(int32 not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -884,12 +875,12 @@ TEST_F(TestReconstructColumn, FAILING(TwoLevelListOptional)) { // List-in-struct // -TEST_F(TestReconstructColumn, FAILING(NestedList1)) { +TEST_F(TestReconstructColumn, NestedList1) { // Arrow schema: struct(a: list(int32 not null) not null) not null SetParquetSchema(GroupNode::Make( "a", Repetition::REQUIRED, {GroupNode::Make( - "parent", Repetition::REQUIRED, + "p", Repetition::REQUIRED, {GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::REQUIRED, ParquetType::INT32)})}, @@ -899,20 +890,20 @@ TEST_F(TestReconstructColumn, FAILING(NestedList1)) { LevelVector rep_levels = {0, 0, 1, 0}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32(), /*nullable=*/false), + auto type = OneFieldStruct("p", List(int32(), /*nullable=*/false), /*nullable=*/false); - auto expected = ArrayFromJSON(type, R"([{"a": []}, - {"a": [4, 5]}, - {"a": [6]}])"); + auto expected = ArrayFromJSON(type, R"([{"p": []}, + {"p": [4, 5]}, + {"p": [6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedList2)) { +TEST_F(TestReconstructColumn, NestedList2) { // Arrow schema: struct(a: list(int32 not null) not null) SetParquetSchema(GroupNode::Make( "a", Repetition::OPTIONAL, {GroupNode::Make( - "parent", Repetition::REQUIRED, + "p", Repetition::REQUIRED, 
{GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::REQUIRED, ParquetType::INT32)})}, @@ -922,21 +913,21 @@ TEST_F(TestReconstructColumn, FAILING(NestedList2)) { LevelVector rep_levels = {0, 0, 0, 1, 0}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32(), /*nullable=*/false), + auto type = OneFieldStruct("p", List(int32(), /*nullable=*/false), /*nullable=*/false); auto expected = ArrayFromJSON(type, R"([null, - {"a": []}, - {"a": [4, 5]}, - {"a": [6]}])"); + {"p": []}, + {"p": [4, 5]}, + {"p": [6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedList3)) { +TEST_F(TestReconstructColumn, NestedList3) { // Arrow schema: struct(a: list(int32 not null)) not null SetParquetSchema(GroupNode::Make( - "a", Repetition::REQUIRED, + "a", Repetition::REQUIRED, // column name (column a is a struct of) {GroupNode::Make( - "parent", Repetition::OPTIONAL, + "p", Repetition::OPTIONAL, // name in struct {GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::REQUIRED, ParquetType::INT32)})}, @@ -946,20 +937,20 @@ TEST_F(TestReconstructColumn, FAILING(NestedList3)) { LevelVector rep_levels = {0, 0, 0, 1, 0}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32())); - auto expected = ArrayFromJSON(type, R"([{"a": null}, - {"a": []}, - {"a": [4, 5]}, - {"a": [6]}])"); + auto type = OneFieldStruct("p", List(int32(), /*nullable=*/false)); + auto expected = ArrayFromJSON(type, R"([{"p": null}, + {"p": []}, + {"p": [4, 5]}, + {"p": [6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedList4)) { +TEST_F(TestReconstructColumn, NestedList4) { // Arrow schema: struct(a: list(int32 not null)) SetParquetSchema(GroupNode::Make( "a", Repetition::OPTIONAL, {GroupNode::Make( - "parent", Repetition::OPTIONAL, + "p", Repetition::OPTIONAL, 
{GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::REQUIRED, ParquetType::INT32)})}, @@ -969,21 +960,21 @@ TEST_F(TestReconstructColumn, FAILING(NestedList4)) { LevelVector rep_levels = {0, 0, 0, 0, 1, 0}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32())); + auto type = OneFieldStruct("p", List(int32(), /*nullable=*/false)); auto expected = ArrayFromJSON(type, R"([null, - {"a": null}, - {"a": []}, - {"a": [4, 5]}, - {"a": [6]}])"); + {"p": null}, + {"p": []}, + {"p": [4, 5]}, + {"p": [6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedList5)) { +TEST_F(TestReconstructColumn, NestedList5) { // Arrow schema: struct(a: list(int32) not null) SetParquetSchema(GroupNode::Make( "a", Repetition::OPTIONAL, {GroupNode::Make( - "parent", Repetition::REQUIRED, + "p", Repetition::REQUIRED, {GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::INT32)})}, @@ -993,20 +984,20 @@ TEST_F(TestReconstructColumn, FAILING(NestedList5)) { LevelVector rep_levels = {0, 0, 0, 1, 0, 1}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32()), /*nullable=*/false); + auto type = OneFieldStruct("p", List(int32()), /*nullable=*/false); auto expected = ArrayFromJSON(type, R"([null, - {"a": []}, - {"a": [4, null]}, - {"a": [5, 6]}])"); + {"p": []}, + {"p": [4, null]}, + {"p": [5, 6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(NestedList6)) { +TEST_F(TestReconstructColumn, NestedList6) { // Arrow schema: struct(a: list(int32)) SetParquetSchema(GroupNode::Make( "a", Repetition::OPTIONAL, {GroupNode::Make( - "parent", Repetition::OPTIONAL, + "p", Repetition::OPTIONAL, {GroupNode::Make("list", Repetition::REPEATED, {PrimitiveNode::Make("element", Repetition::OPTIONAL, ParquetType::INT32)})}, @@ -1016,12 +1007,12 @@ 
TEST_F(TestReconstructColumn, FAILING(NestedList6)) { LevelVector rep_levels = {0, 0, 0, 0, 1, 0, 1}; std::vector values = {4, 5, 6}; - auto type = OneFieldStruct("a", List(int32())); + auto type = OneFieldStruct("p", List(int32())); auto expected = ArrayFromJSON(type, R"([null, - {"a": null}, - {"a": []}, - {"a": [4, null]}, - {"a": [5, 6]}])"); + {"p": null}, + {"p": []}, + {"p": [4, null]}, + {"p": [5, 6]}])"); AssertReconstruct(*expected, def_levels, rep_levels, values); } @@ -1029,7 +1020,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedList6)) { // Struct-in-list // -TEST_F(TestReconstructColumn, FAILING(ListNested1)) { +TEST_F(TestReconstructColumn, ListNested1) { // Arrow schema: list(struct(a: int32 not null) not null) not null SetParquetSchema(GroupNode::Make( "parent", Repetition::REQUIRED, @@ -1052,7 +1043,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested1)) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(ListNested2)) { +TEST_F(TestReconstructColumn, ListNested2) { // Arrow schema: list(struct(a: int32 not null) not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -1076,7 +1067,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested2)) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(ListNested3)) { +TEST_F(TestReconstructColumn, ListNested3) { // Arrow schema: list(struct(a: int32 not null)) not null SetParquetSchema(GroupNode::Make( "parent", Repetition::REQUIRED, @@ -1098,7 +1089,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested3)) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(ListNested4)) { +TEST_F(TestReconstructColumn, ListNested4) { // Arrow schema: list(struct(a: int32 not null)) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -1121,7 +1112,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested4)) { 
AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(ListNested5)) { +TEST_F(TestReconstructColumn, ListNested5) { // Arrow schema: list(struct(a: int32) not null) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -1145,7 +1136,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested5)) { AssertReconstruct(*expected, def_levels, rep_levels, values); } -TEST_F(TestReconstructColumn, FAILING(ListNested6)) { +TEST_F(TestReconstructColumn, ListNested6) { // Arrow schema: list(struct(a: int32)) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -1172,7 +1163,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNested6)) { // Struct (two fields)-in-list // -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields1)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields1) { // Arrow schema: list(struct(a: int32 not null, // b: int64 not null) not null) not null SetParquetSchema(GroupNode::Make( @@ -1202,7 +1193,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields1)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields2)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields2) { // Arrow schema: list(struct(a: int32, // b: int64 not null) not null) not null SetParquetSchema(GroupNode::Make( @@ -1232,7 +1223,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields2)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields3)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields3) { // Arrow schema: list(struct(a: int32 not null, // b: int64 not null)) not null SetParquetSchema(GroupNode::Make( @@ -1261,7 +1252,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields3)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields4)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields4) { // Arrow schema: list(struct(a: int32, // 
b: int64 not null) not null) SetParquetSchema(GroupNode::Make( @@ -1292,7 +1283,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields4)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields5)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields5) { // Arrow schema: list(struct(a: int32, // b: int64 not null)) SetParquetSchema(GroupNode::Make( @@ -1313,8 +1304,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields5)) { Int64Vector{7, 8})); auto type = - List(struct_({field("a", int32()), field("b", int64(), /*nullable=*/false)}), - /*nullable=*/false); + List(struct_({field("a", int32()), field("b", int64(), /*nullable=*/false)})); auto expected = ArrayFromJSON(type, R"([null, [], @@ -1323,7 +1313,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields5)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields6)) { +TEST_F(TestReconstructColumn, ListNestedTwoFields6) { // Arrow schema: list(struct(a: int32, // b: int64)) SetParquetSchema(GroupNode::Make( @@ -1343,9 +1333,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields6)) { ASSERT_OK(WriteInt64Column(DefLevels{0, 1, 3, 2, 4}, RepLevels{0, 0, 0, 1, 0}, Int64Vector{7})); - auto type = - List(struct_({field("a", int32()), field("b", int64(), /*nullable=*/false)}), - /*nullable=*/false); + auto type = List(struct_({field("a", int32()), field("b", int64())})); auto expected = ArrayFromJSON(type, R"([null, [], @@ -1358,7 +1346,7 @@ TEST_F(TestReconstructColumn, FAILING(ListNestedTwoFields6)) { // List-in-struct (two fields) // -TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList1)) { +TEST_F(TestReconstructColumn, NestedTwoFieldsList1) { // Arrow schema: struct(a: int64 not null, // b: list(int32 not null) not null // ) not null @@ -1388,7 +1376,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList1)) { CheckColumn(/*column_index=*/0, *expected); } 
-TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList2)) { +TEST_F(TestReconstructColumn, NestedTwoFieldsList2) { // Arrow schema: struct(a: int64 not null, // b: list(int32 not null) // ) not null @@ -1418,7 +1406,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList2)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList3)) { +TEST_F(TestReconstructColumn, NestedTwoFieldsList3) { // Arrow schema: struct(a: int64, // b: list(int32 not null) // ) not null @@ -1448,7 +1436,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList3)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList4)) { +TEST_F(TestReconstructColumn, NestedTwoFieldsList4) { // Arrow schema: struct(a: int64, // b: list(int32 not null) // ) @@ -1480,7 +1468,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList4)) { CheckColumn(/*column_index=*/0, *expected); } -TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList5)) { +TEST_F(TestReconstructColumn, NestedTwoFieldsList5) { // Arrow schema: struct(a: int64, b: list(int32)) SetParquetSchema(GroupNode::Make( "parent", Repetition::OPTIONAL, @@ -1499,8 +1487,7 @@ TEST_F(TestReconstructColumn, FAILING(NestedTwoFieldsList5)) { ASSERT_OK(WriteInt32Column(DefLevels{0, 1, 2, 4, 3, 4}, RepLevels{0, 0, 0, 0, 1, 0}, Int32Vector{7, 8})); - auto type = - struct_({field("a", int64()), field("b", List(int32(), /*nullable=*/false))}); + auto type = struct_({field("a", int64()), field("b", List(int32()))}); auto expected = ArrayFromJSON(type, R"([null, {"a": 4, "b": null}, diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index a25704a7115..8b2faddf2b4 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -447,7 +447,6 @@ Status GroupToStruct(const GroupNode& node, LevelInfo current_levels, SchemaField* out) { std::vector> arrow_fields; 
out->children.resize(node.field_count()); - // All level increments for the node are expected to happen by callers. // This is required because repeated elements need to have there own // SchemaField. diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 672b6e3708c..2d64422fa4b 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -42,6 +42,7 @@ #include "parquet/encoding.h" #include "parquet/encryption_internal.h" #include "parquet/internal_file_decryptor.h" +#include "parquet/level_comparison.h" #include "parquet/level_conversion.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -55,6 +56,25 @@ using arrow::internal::checked_cast; using arrow::internal::MultiplyWithOverflow; namespace parquet { +namespace { +inline bool HasSpacedValues(const ColumnDescriptor* descr) { + if (descr->max_repetition_level() > 0) { + // repeated+flat case + return !descr->schema_node()->is_required(); + } else { + // non-repeated+nested case + // Find if a node forces nulls in the lowest level along the hierarchy + const schema::Node* node = descr->schema_node().get(); + while (node) { + if (node->is_optional()) { + return true; + } + node = node->parent(); + } + return false; + } +} +} // namespace LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} @@ -63,6 +83,7 @@ LevelDecoder::~LevelDecoder() {} int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data, int32_t data_size) { + max_level_ = max_level; int32_t num_bytes = 0; encoding_ = encoding; num_values_remaining_ = num_buffered_values; @@ -110,6 +131,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values, const uint8_t* data) { + max_level_ = max_level; // Repetition and definition levels always uses RLE encoding // in the DataPageV2 format. 
if (num_bytes < 0) { @@ -135,6 +157,15 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) { } else { num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); } + if (num_decoded > 0) { + internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); + if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { + std::stringstream ss; + ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max + << " out of range. Max Level: " << max_level_; + throw ParquetException(ss.str()); + } + } num_values_remaining_ -= num_decoded; return num_decoded; } @@ -880,8 +911,7 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( } } - const bool has_spaced_values = internal::HasSpacedValues(this->descr_); - + const bool has_spaced_values = HasSpacedValues(this->descr_); int64_t null_count = 0; if (!has_spaced_values) { int values_to_read = 0; @@ -896,9 +926,21 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( /*bits_are_set=*/true); *values_read = total_values; } else { - internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_, - this->max_rep_level_, values_read, &null_count, - valid_bits, valid_bits_offset); + internal::LevelInfo info; + info.repeated_ancestor_def_level = this->max_def_level_ - 1; + info.def_level = this->max_def_level_; + info.rep_level = this->max_rep_level_; + internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = num_def_levels; + validity_io.valid_bits = valid_bits; + validity_io.valid_bits_offset = valid_bits_offset; + validity_io.null_count = null_count; + validity_io.values_read = *values_read; + + internal::DefLevelsToBitmap(def_levels, num_def_levels, info, &validity_io); + null_count = validity_io.null_count; + *values_read = validity_io.values_read; + total_values = this->ReadValuesSpaced(*values_read, values, static_cast(null_count), valid_bits, valid_bits_offset); @@ -1008,8 +1050,10 @@ class TypedRecordReader : public 
ColumnReaderImplBase, public: using T = typename DType::c_type; using BASE = ColumnReaderImplBase; - TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) { - nullable_values_ = internal::HasSpacedValues(descr); + TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool) + : BASE(descr, pool) { + leaf_info_ = leaf_info; + nullable_values_ = leaf_info.HasNullableValues(); at_record_start_ = true; records_read_ = 0; values_written_ = 0; @@ -1128,7 +1172,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } std::shared_ptr ReleaseIsValid() override { - if (nullable_values_) { + if (leaf_info_.HasNullableValues()) { auto result = valid_bits_; PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true)); valid_bits_ = AllocateBuffer(this->pool_); @@ -1170,13 +1214,7 @@ class TypedRecordReader : public ColumnReaderImplBase, break; } } - } else if (ARROW_PREDICT_FALSE(rep_level > this->max_rep_level_)) { - std::stringstream ss; - ss << "Malformed repetition levels, " << rep_level << " exceeded maximum " - << this->max_rep_level_ << " indicated by schema"; - throw ParquetException(ss.str()); } - // We have decided to consume the level at this position; therefore we // must advance until we find another record boundary at_record_start_ = false; @@ -1184,11 +1222,6 @@ class TypedRecordReader : public ColumnReaderImplBase, const int16_t def_level = *def_levels++; if (def_level == this->max_def_level_) { ++values_to_read; - } else if (ARROW_PREDICT_FALSE(def_level > this->max_def_level_)) { - std::stringstream ss; - ss << "Malformed definition levels, " << def_level << " exceeded maximum " - << this->max_def_level_ << " indicated by schema"; - throw ParquetException(ss.str()); } ++levels_position_; } @@ -1249,7 +1282,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } values_capacity_ = new_values_capacity; } - if (nullable_values_) { + if (leaf_info_.HasNullableValues()) { 
int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); if (valid_bits_->size() < valid_bytes_new) { int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); @@ -1344,20 +1377,24 @@ class TypedRecordReader : public ColumnReaderImplBase, } int64_t null_count = 0; - if (nullable_values_) { - int64_t values_with_nulls = 0; - DefinitionLevelsToBitmap( - def_levels() + start_levels_position, levels_position_ - start_levels_position, - this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count, - valid_bits_->mutable_data(), values_written_); - values_to_read = values_with_nulls - null_count; + if (leaf_info_.HasNullableValues()) { + ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = levels_position_ - start_levels_position; + validity_io.valid_bits = valid_bits_->mutable_data(); + validity_io.valid_bits_offset = values_written_; + + DefLevelsToBitmap(def_levels() + start_levels_position, + levels_position_ - start_levels_position, leaf_info_, + &validity_io); + values_to_read = validity_io.values_read - validity_io.null_count; + null_count = validity_io.null_count; DCHECK_GE(values_to_read, 0); - ReadValuesSpaced(values_with_nulls, null_count); + ReadValuesSpaced(validity_io.values_read, null_count); } else { DCHECK_GE(values_to_read, 0); ReadValuesDense(values_to_read); } - if (this->max_def_level_ > 0) { + if (this->leaf_info_.def_level > 0) { // Optional, repeated, or some mix thereof this->ConsumeBufferedValues(levels_position_ - start_levels_position); } else { @@ -1415,13 +1452,15 @@ class TypedRecordReader : public ColumnReaderImplBase, T* ValuesHead() { return reinterpret_cast(values_->mutable_data()) + values_written_; } + LevelInfo leaf_info_; }; class FLBARecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: - FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(nullptr) { + FLBARecordReader(const 
ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, leaf_info, pool), builder_(nullptr) { DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); int byte_width = descr_->type_length(); std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); @@ -1473,8 +1512,9 @@ class FLBARecordReader : public TypedRecordReader, class ByteArrayChunkedRecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: - ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool) { + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); } @@ -1513,9 +1553,9 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, class ByteArrayDictionaryRecordReader : public TypedRecordReader, virtual public DictionaryRecordReader { public: - ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, + ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(pool) { + : TypedRecordReader(descr, leaf_info, pool), builder_(pool) { this->read_dictionary_ = true; } @@ -1602,35 +1642,36 @@ template <> void TypedRecordReader::DebugPrintState() {} std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* descr, + LevelInfo leaf_info, ::arrow::MemoryPool* pool, bool read_dictionary) { if (read_dictionary) { - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); } else { - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); } } std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, - MemoryPool* pool, + LevelInfo leaf_info, 
MemoryPool* pool, const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT32: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT64: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT96: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::FLOAT: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::DOUBLE: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::BYTE_ARRAY: - return MakeByteArrayRecordReader(descr, pool, read_dictionary); + return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 7b5ee1b722a..60c44ffa6d2 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -23,6 +23,7 @@ #include #include "parquet/exception.h" +#include "parquet/level_conversion.h" #include "parquet/platform.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -75,6 +76,7 @@ class PARQUET_EXPORT LevelDecoder { Encoding::type encoding_; std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_; std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_; + int16_t max_level_; }; struct CryptoContext { @@ -208,7 +210,7 @@ namespace internal { class RecordReader { public: static std::shared_ptr Make( - const ColumnDescriptor* descr, + const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool(), const bool read_dictionary = false); @@ -314,25 +316,6 @@ class DictionaryRecordReader : virtual public RecordReader { virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0; }; -// TODO(itaiin): another code path split to merge when the general case is done -static inline bool HasSpacedValues(const ColumnDescriptor* descr) { - if (descr->max_repetition_level() > 0) { - // repeated+flat case - return !descr->schema_node()->is_required(); - } else { - // non-repeated+nested case - // Find if a node forces nulls in the lowest level along the hierarchy - const schema::Node* node = descr->schema_node().get(); - while (node) { - if (node->is_optional()) { - return true; - } - node = node->parent(); - } - return false; - } -} - } // namespace internal using BoolReader = TypedColumnReader; diff --git a/cpp/src/parquet/level_comparison.cc b/cpp/src/parquet/level_comparison.cc new file mode 100644 index 00000000000..30614ae61fb --- /dev/null +++ b/cpp/src/parquet/level_comparison.cc @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "parquet/level_comparison.h"
+
+#define PARQUET_IMPL_NAMESPACE standard
+#include "parquet/level_comparison_inc.h"
+#undef PARQUET_IMPL_NAMESPACE
+
+#include <vector>
+
+#include "arrow/util/dispatch.h"
+
+namespace parquet {
+namespace internal {
+
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+MinMax FindMinMaxAvx2(const int16_t* levels, int64_t num_levels);
+uint64_t GreaterThanBitmapAvx2(const int16_t* levels, int64_t num_levels, int16_t rhs);
+#endif
+
+namespace {
+
+using ::arrow::internal::DispatchLevel;
+using ::arrow::internal::DynamicDispatch;
+
+// defined in level_comparison_avx2.cc
+
+struct GreaterThanDynamicFunction {
+  using FunctionType = decltype(&GreaterThanBitmap);
+
+  static std::vector<std::pair<DispatchLevel, FunctionType>> implementations() {
+    return {
+      { DispatchLevel::NONE, standard::GreaterThanBitmapImpl }
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+      , { DispatchLevel::AVX2, GreaterThanBitmapAvx2 }
+#endif
+    };
+  }
+};
+
+struct MinMaxDynamicFunction {
+  using FunctionType = decltype(&FindMinMax);
+
+  static std::vector<std::pair<DispatchLevel, FunctionType>> implementations() {
+    return {
+      { DispatchLevel::NONE, standard::FindMinMaxImpl }
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+      , { DispatchLevel::AVX2, FindMinMaxAvx2 }
+#endif
+    };
+  }
+};
+
+} // namespace
+
+uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels, int16_t rhs) {
+  static DynamicDispatch<GreaterThanDynamicFunction> dispatch;
+  return dispatch.func(levels, num_levels, rhs);
+}
+
+MinMax FindMinMax(const int16_t* levels, int64_t num_levels) {
+  static DynamicDispatch<MinMaxDynamicFunction> dispatch;
+  return dispatch.func(levels, num_levels);
+}
+
+} // namespace internal
+} // namespace parquet
diff --git a/cpp/src/parquet/level_comparison.h b/cpp/src/parquet/level_comparison.h
new file mode 100644
index 00000000000..38e7ef8e2ec
--- /dev/null
+++ b/cpp/src/parquet/level_comparison.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include + +#include "parquet/platform.h" + +namespace parquet { +namespace internal { + +/// Builds a bitmap where each set bit indicates the corresponding level is greater +/// than rhs. +uint64_t PARQUET_EXPORT GreaterThanBitmap(const int16_t* levels, int64_t num_levels, + int16_t rhs); + +struct MinMax { + int16_t min; + int16_t max; +}; + +MinMax FindMinMax(const int16_t* levels, int64_t num_levels); + +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_comparison_avx2.cc b/cpp/src/parquet/level_comparison_avx2.cc new file mode 100644 index 00000000000..b33eb2e2953 --- /dev/null +++ b/cpp/src/parquet/level_comparison_avx2.cc @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#define PARQUET_IMPL_NAMESPACE avx2 +#include "parquet/level_comparison_inc.h" +#undef PARQUET_IMPL_NAMESPACE + +namespace parquet { +namespace internal { + +uint64_t GreaterThanBitmapAvx2(const int16_t* levels, int64_t num_levels, int16_t rhs) { + return avx2::GreaterThanBitmapImpl(levels, num_levels, rhs); +} + +MinMax FindMinMaxAvx2(const int16_t* levels, int64_t num_levels) { + return avx2::FindMinMaxImpl(levels, num_levels); +} + +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_comparison_inc.h b/cpp/src/parquet/level_comparison_inc.h new file mode 100644 index 00000000000..f4cf7ab48e7 --- /dev/null +++ b/cpp/src/parquet/level_comparison_inc.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include "arrow/util/bit_util.h" +#include "parquet/level_comparison.h" + +// Used to make sure ODR rule isn't violated. +#ifndef PARQUET_IMPL_NAMESPACE +#error "PARQUET_IMPL_NAMESPACE must be defined" +#endif +namespace parquet { +namespace internal { +namespace PARQUET_IMPL_NAMESPACE { +/// Builds a bitmap by applying predicate to the level vector provided. +/// +/// \param[in] levels Rep or def level array. +/// \param[in] num_levels The number of levels to process (must be [0, 64]) +/// \param[in] predicate The predicate to apply (must have the signature `bool +/// predicate(int16_t)`. +/// \returns The bitmap using least significant "bit" ordering. +/// +template +inline uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, + Predicate predicate) { + // Both clang and GCC can vectorize this automatically with SSE4/AVX2. + uint64_t mask = 0; + for (int x = 0; x < num_levels; x++) { + mask |= static_cast(predicate(levels[x]) ? 1 : 0) << x; + } + return ::arrow::BitUtil::ToLittleEndian(mask); +} + +inline MinMax FindMinMaxImpl(const int16_t* levels, int64_t num_levels) { + MinMax out{std::numeric_limits::max(), std::numeric_limits::min()}; + for (int x = 0; x < num_levels; x++) { + out.min = std::min(levels[x], out.min); + out.max = std::max(levels[x], out.max); + } + return out; +} + +inline uint64_t GreaterThanBitmapImpl(const int16_t* levels, int64_t num_levels, + int16_t rhs) { + return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; }); +} + +} // namespace PARQUET_IMPL_NAMESPACE +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion.cc b/cpp/src/parquet/level_conversion.cc index cfa5df1a7e0..3f57be39e72 100644 --- a/cpp/src/parquet/level_conversion.cc +++ b/cpp/src/parquet/level_conversion.cc @@ -18,176 +18,204 @@ #include #include -#if defined(ARROW_HAVE_BMI2) -#include -#endif +#include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_util.h" 
+#include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" #include "parquet/exception.h" +#include "parquet/level_comparison.h" +#define PARQUET_IMPL_NAMESPACE standard +#include "parquet/level_conversion_inc.h" +#undef PARQUET_IMPL_NAMESPACE + namespace parquet { namespace internal { namespace { -inline void CheckLevelRange(const int16_t* levels, int64_t num_levels, - const int16_t max_expected_level) { - int16_t min_level = std::numeric_limits::max(); - int16_t max_level = std::numeric_limits::min(); - for (int x = 0; x < num_levels; x++) { - min_level = std::min(levels[x], min_level); - max_level = std::max(levels[x], max_level); + +using ::arrow::internal::CpuInfo; + +void DefLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, ValidityBitmapInputOutput* output) { + ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer( + output->valid_bits, + /*start_offset=*/output->valid_bits_offset, + /*length=*/num_def_levels); + for (int x = 0; x < num_def_levels; x++) { + // This indicates that a parent repeated element has zero + // length so the def level is not applicable to this column. 
+ if (def_levels[x] < level_info.repeated_ancestor_def_level) { + continue; + } + if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >= + output->values_read_upper_bound)) { + std::stringstream ss; + ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound; + throw ParquetException(ss.str()); + } + if (def_levels[x] >= level_info.def_level) { + valid_bits_writer.Set(); + } else { + valid_bits_writer.Clear(); + output->null_count += 1; + } + valid_bits_writer.Next(); } - if (ARROW_PREDICT_FALSE(num_levels > 0 && - (min_level < 0 || max_level > max_expected_level))) { - throw ParquetException("definition level exceeds maximum"); + valid_bits_writer.Finish(); + output->values_read = valid_bits_writer.position(); + if (output->null_count > 0 && level_info.null_slot_usage > 1) { + throw ParquetException( + "Null values with null_slot_usage > 1 not supported." + "(i.e. FixedSizeLists with null values are not supported"); } } -#if !defined(ARROW_HAVE_AVX512) - -inline void DefinitionLevelsToBitmapScalar( - const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - // We assume here that valid_bits is large enough to accommodate the - // additional definition levels and the ones that have already been written - ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset, - num_def_levels); - - // TODO(itaiin): As an interim solution we are splitting the code path here - // between repeated+flat column reads, and non-repeated+nested reads. 
- // Those paths need to be merged in the future - for (int i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == max_definition_level) { - valid_bits_writer.Set(); - } else if (max_repetition_level > 0) { - // repetition+flat case - if (def_levels[i] == (max_definition_level - 1)) { - valid_bits_writer.Clear(); - *null_count += 1; - } else { - continue; +template +void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output, OffsetType* offsets) { + OffsetType* orig_pos = offsets; + std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer; + if (output->valid_bits) { + valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter( + output->valid_bits, output->valid_bits_offset, num_def_levels)); + } + for (int x = 0; x < num_def_levels; x++) { + // Skip items that belong to empty or null ancestor lists and further nested lists. + if (def_levels[x] < level_info.repeated_ancestor_def_level || + rep_levels[x] > level_info.rep_level) { + continue; + } + + if (rep_levels[x] == level_info.rep_level) { + // A continuation of an existing list. + // offsets can be null for structs with repeated children (we don't need to know + // offsets until we get to the children). 
+ if (offsets != nullptr) { + if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits::max())) { + throw ParquetException("List index overflow."); + } + *offsets += 1; } } else { - // non-repeated+nested case - if (def_levels[i] < max_definition_level) { - valid_bits_writer.Clear(); - *null_count += 1; - } else { - throw ParquetException("definition level exceeds maximum"); + if (ARROW_PREDICT_FALSE( + (valid_bits_writer != nullptr && + valid_bits_writer->position() >= output->values_read_upper_bound) || + (offsets - orig_pos) >= output->values_read_upper_bound)) { + std::stringstream ss; + ss << "Definition levels exceeded upper bound: " + << output->values_read_upper_bound; + throw ParquetException(ss.str()); } - } - valid_bits_writer.Next(); + // current_rep < list rep_level i.e. start of a list (ancestor empty lists are + // filtered out above). + // offsets can be null for structs with repeated children (we don't need to know + // offsets until we get to the children). + if (offsets != nullptr) { + ++offsets; + // Use cumulative offsets because variable size lists are more common then + // fixed size lists so it should be cheaper to make these cumulative and + // subtract when validating fixed size lists. + *offsets = *(offsets - 1); + if (def_levels[x] >= level_info.def_level) { + if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits::max())) { + throw ParquetException("List index overflow."); + } + *offsets += 1; + } + } + + if (valid_bits_writer != nullptr) { + // the level_info def level for lists reflects element present level. + // the prior level distinguishes between empty lists. 
+ if (def_levels[x] >= level_info.def_level - 1) { + valid_bits_writer->Set(); + } else { + output->null_count++; + valid_bits_writer->Clear(); + } + valid_bits_writer->Next(); + } + } + } + if (valid_bits_writer != nullptr) { + valid_bits_writer->Finish(); + } + if (offsets != nullptr) { + output->values_read = offsets - orig_pos; + } else if (valid_bits_writer != nullptr) { + output->values_read = valid_bits_writer->position(); + } + if (output->null_count > 0 && level_info.null_slot_usage > 1) { + throw ParquetException( + "Null values with null_slot_usage > 1 not supported." + "(i.e. FixedSizeLists with null values are not supported)"); } - valid_bits_writer.Finish(); - *values_read = valid_bits_writer.position(); } + +} // namespace + +#if defined(ARROW_HAVE_RUNTIME_BMI2) +// defined in level_conversion_bmi2.cc for dynamic dispatch. +void DefLevelsToBitmapBmi2WithRepeatedParent(const int16_t* def_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output); #endif -template -int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size, - const int16_t required_definition_level, - ::arrow::internal::FirstTimeBitmapWriter* writer) { - CheckLevelRange(def_levels, batch_size, required_definition_level); - uint64_t defined_bitmap = - internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1); - - DCHECK_LE(batch_size, 64); - if (has_repeated_parent) { -#if defined(ARROW_HAVE_BMI2) - // This is currently a specialized code path assuming only (nested) lists - // present through the leaf (i.e. no structs). Upper level code only calls - // this method when the leaf-values are nullable (otherwise no spacing is needed), - // Because only nested lists exists it is sufficient to know that the field - // was either null or included it (i.e. 
definition level > max_definitation_level - // -2) If there where structs mixed in, we need to know the def_level of the - // repeated parent so we can check for def_level > "def level of repeated parent". - uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size, - required_definition_level - 2); - uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap); - writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap)); - return ::arrow::BitUtil::PopCount(selected_bits); +void DefLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, ValidityBitmapInputOutput* output) { + // It is simpler to rely on rep_level here until PARQUET-1899 is done and the code + // is deleted in a follow-up release. + if (level_info.rep_level > 0) { +#if defined(ARROW_HAVE_RUNTIME_BMI2) + using FunctionType = decltype(&standard::DefLevelsToBitmapSimd); + // DefLevelsToBitmapSimd with emulated PEXT would be slow, so use the + // scalar version if BMI2 is unavailable. + static FunctionType fn = CpuInfo::GetInstance()->HasEfficientBmi2() + ? DefLevelsToBitmapBmi2WithRepeatedParent + : DefLevelsToBitmapScalar; + fn(def_levels, num_def_levels, level_info, output); #else - assert(false && "must not execute this without BMI2"); + DefLevelsToBitmapScalar(def_levels, num_def_levels, level_info, output); #endif } else { - writer->AppendWord(defined_bitmap, batch_size); - return ::arrow::BitUtil::PopCount(defined_bitmap); + // SIMD here applies to all platforms because the only operation that + // happens is def_levels->bitmap which should have good SIMD options + // on all platforms. 
+ standard::DefLevelsToBitmapSimd( + def_levels, num_def_levels, level_info, output); } } -template -void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels, - const int16_t required_definition_level, - int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - constexpr int64_t kBitMaskSize = 64; - ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits, - /*start_offset=*/valid_bits_offset, - /*length=*/num_def_levels); - int64_t set_count = 0; - *values_read = 0; - while (num_def_levels > kBitMaskSize) { - set_count += DefinitionLevelsBatchToBitmap( - def_levels, kBitMaskSize, required_definition_level, &writer); - def_levels += kBitMaskSize; - num_def_levels -= kBitMaskSize; - } - set_count += DefinitionLevelsBatchToBitmap( - def_levels, num_def_levels, required_definition_level, &writer); - - *values_read = writer.position(); - *null_count += *values_read - set_count; - writer.Finish(); +uint64_t TestOnlyRunBasedExtract(uint64_t bitmap, uint64_t select_bitmap) { + return standard::RunBasedExtractImpl(bitmap, select_bitmap); } -void DefinitionLevelsToBitmapLittleEndian( - const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - if (max_repetition_level > 0) { -// This is a short term hack to prevent using the pext BMI2 instructions -// on non-intel platforms where performance is subpar. -// In the medium term we will hopefully be able to runtime dispatch -// to use this on intel only platforms that support pext. -#if defined(ARROW_HAVE_AVX512) - // BMI2 is required for efficient bit extraction. 
- DefinitionLevelsToBitmapSimd( - def_levels, num_def_levels, max_definition_level, values_read, null_count, - valid_bits, valid_bits_offset); -#else - DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); -#endif // ARROW_HAVE_BMI2 - - } else { - // No BMI2 intsturctions are used for non-repeated case. - DefinitionLevelsToBitmapSimd( - def_levels, num_def_levels, max_definition_level, values_read, null_count, - valid_bits, valid_bits_offset); - } +void DefRepLevelsToList(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output, int32_t* offsets) { + DefRepLevelsToListInfo(def_levels, rep_levels, num_def_levels, level_info, + output, offsets); } -} // namespace - -void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, - const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, - int64_t* null_count, uint8_t* valid_bits, - int64_t valid_bits_offset) { -#if ARROW_LITTLE_ENDIAN - DefinitionLevelsToBitmapLittleEndian(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); - -#else - DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); +void DefRepLevelsToList(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output, int64_t* offsets) { + DefRepLevelsToListInfo(def_levels, rep_levels, num_def_levels, level_info, + output, offsets); +} -#endif +void DefRepLevelsToBitmap(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output) { + // DefReplevelsToListInfo assumes it for the 
actual list method and this + // method is for parent structs, so we need to bump def and rep level. + level_info.rep_level += 1; + level_info.def_level += 1; + DefRepLevelsToListInfo(def_levels, rep_levels, num_def_levels, level_info, + output, /*offsets=*/nullptr); } } // namespace internal diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h index dbecb3171cf..c664cbae4cb 100644 --- a/cpp/src/parquet/level_conversion.h +++ b/cpp/src/parquet/level_conversion.h @@ -41,6 +41,8 @@ struct PARQUET_EXPORT LevelInfo { repeated_ancestor_def_level == b.repeated_ancestor_def_level; } + bool HasNullableValues() const { return repeated_ancestor_def_level < def_level; } + // How many slots an undefined but present (i.e. null) element in // parquet consumes when decoding to Arrow. // "Slot" is used in the same context as the Arrow specification @@ -132,44 +134,65 @@ struct PARQUET_EXPORT LevelInfo { } };
-/// -template -uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) { - // Both clang and GCC can vectorize this automatically with SSE4/AVX2. - uint64_t mask = 0; - for (int x = 0; x < num_levels; x++) { - mask |= static_cast(predicate(levels[x]) ? 1 : 0) << x; - } - return mask; -} - -/// Builds a bitmap where each set bit indicates the corresponding level is greater -/// than rhs. -static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels, - int16_t rhs) { - return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; }); -} +// Input/Output structure for reconstructed validity bitmaps. +struct PARQUET_EXPORT ValidityBitmapInputOutput { + // Input only. + // The maximum number of values_read expected (actual + // values read must be less than or equal to this value). + // If this number is exceeded methods will throw a + // ParquetException. Exceeding this limit indicates + // either a corrupt or incorrectly written file. + int64_t values_read_upper_bound = 0; + // Output only. The number of values added to the encountered + // (this is logically the count + // of the number of elements + // for an Arrow array). + int64_t values_read = 0; + // Input/Output. The number of nulls encountered. + int64_t null_count = 0; + // Output only. The validity bitmap to populate. May be null only + // for DefRepLevelsToListInfo (if all that is needed is list offsets). + uint8_t* valid_bits = NULLPTR; + // Input only, offset into valid_bits to start at. + int64_t valid_bits_offset = 0; +}; -#endif +// Converts def_levels to validity bitmaps for non-list arrays and structs that have +// at least one member that is not a list and has no list descendants. +// For lists use DefRepLevelsToList and structs where all descendants contain +// a list use DefRepLevelsToBitmap. 
+void PARQUET_EXPORT DefLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, + ValidityBitmapInputOutput* output); + +// Reconstructs a validity bitmap and list offsets for list arrays based on +// def/rep levels. The first element of offsets will not be modified if rep_levels +// starts with a new list. The first element of offsets will be used when calculating +// the next offset. See documentation on DefLevelsToBitmap for when to use this +// method vs the other ones in this file for reconstruction. +// +// Offsets must be sized to 1 + values_read_upper_bound. +void PARQUET_EXPORT DefRepLevelsToList(const int16_t* def_levels, + const int16_t* rep_levels, int64_t num_def_levels, + LevelInfo level_info, + ValidityBitmapInputOutput* output, + int32_t* offsets); +void PARQUET_EXPORT DefRepLevelsToList(const int16_t* def_levels, + const int16_t* rep_levels, int64_t num_def_levels, + LevelInfo level_info, + ValidityBitmapInputOutput* output, + int64_t* offsets); + +// Reconstructs a validity bitmap for a struct where every member is a list or has +// a list descendant. See documentation on DefLevelsToBitmap for more +// details on this method compared to the other ones defined above. +void PARQUET_EXPORT DefRepLevelsToBitmap(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output); + +// This is exposed to ensure we can properly test a software simulated pext function +// (i.e. it isn't hidden by runtime dispatch). 
+uint64_t PARQUET_EXPORT TestOnlyRunBasedExtract(uint64_t bitmap, uint64_t selection); } // namespace internal } // namespace parquet diff --git a/cpp/src/parquet/level_conversion_benchmark.cc b/cpp/src/parquet/level_conversion_benchmark.cc index 4f15838d339..f9e91c4820f 100644 --- a/cpp/src/parquet/level_conversion_benchmark.cc +++ b/cpp/src/parquet/level_conversion_benchmark.cc @@ -34,15 +34,17 @@ constexpr int16_t kHasRepeatedElements = 1; std::vector RunDefinitionLevelsToBitmap(const std::vector& def_levels, ::benchmark::State* state) { - int64_t values_read = 0; - int64_t null_count = 0; std::vector bitmap(/*count=*/def_levels.size(), 0); - int rep = 0; + parquet::internal::LevelInfo info; + info.def_level = kHasRepeatedElements; + info.repeated_ancestor_def_level = kPresentDefLevel; + info.rep_level = 1; + parquet::internal::ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = def_levels.size(); + validity_io.valid_bits = bitmap.data(); for (auto _ : *state) { - parquet::internal::DefinitionLevelsToBitmap( - def_levels.data(), def_levels.size(), /*max_definition_level=*/kPresentDefLevel, - /*max_repetition_level=*/kHasRepeatedElements, &values_read, &null_count, - bitmap.data(), /*valid_bits_offset=*/(rep++ % 8) * def_levels.size()); + parquet::internal::DefLevelsToBitmap(def_levels.data(), def_levels.size(), info, + &validity_io); } state->SetBytesProcessed(int64_t(state->iterations()) * def_levels.size()); return bitmap; diff --git a/cpp/src/parquet/level_conversion_bmi2.cc b/cpp/src/parquet/level_conversion_bmi2.cc new file mode 100644 index 00000000000..274d54e503c --- /dev/null +++ b/cpp/src/parquet/level_conversion_bmi2.cc @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "parquet/level_conversion.h" + +#define PARQUET_IMPL_NAMESPACE bmi2 +#include "parquet/level_conversion_inc.h" +#undef PARQUET_IMPL_NAMESPACE + +namespace parquet { +namespace internal { +void DefLevelsToBitmapBmi2WithRepeatedParent(const int16_t* def_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output) { + bmi2::DefLevelsToBitmapSimd(def_levels, num_def_levels, + level_info, output); +} + +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion_inc.h b/cpp/src/parquet/level_conversion_inc.h new file mode 100644 index 00000000000..c688f748043 --- /dev/null +++ b/cpp/src/parquet/level_conversion_inc.h @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "parquet/level_conversion.h" + +#include +#include + +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/simd.h" +#include "parquet/exception.h" +#include "parquet/level_comparison.h" + +namespace parquet { +namespace internal { +#ifndef PARQUET_IMPL_NAMESPACE +#error "PARQUET_IMPL_NAMESPACE must be defined" +#endif +namespace PARQUET_IMPL_NAMESPACE { + +/// Algorithm to simulate pext using BitRunReader for cases where all bits +/// not set or set. +uint64_t RunBasedExtractMixed(uint64_t bitmap, uint64_t select_bitmap) { + bitmap = arrow::BitUtil::FromLittleEndian(bitmap); + uint64_t new_bitmap = 0; + ::arrow::internal::BitRunReader selection(reinterpret_cast(&select_bitmap), + /*start_offset=*/0, /*length=*/64); + ::arrow::internal::BitRun run = selection.NextRun(); + int64_t selected_bits = 0; + while (run.length != 0) { + if (run.set) { + new_bitmap |= (bitmap & ::arrow::BitUtil::LeastSignficantBitMask(run.length)) + << selected_bits; + selected_bits += run.length; + } + bitmap = bitmap >> run.length; + run = selection.NextRun(); + } + return arrow::BitUtil::ToLittleEndian(new_bitmap); +} + +inline uint64_t RunBasedExtractImpl(uint64_t bitmap, uint64_t select_bitmap) { + /// These checks should be inline and are likely to be common cases. + if (select_bitmap == ~uint64_t{0}) { + return bitmap; + } else if (select_bitmap == 0) { + return 0; + } + /// Fallback to the slow method. 
+ return RunBasedExtractMixed(bitmap, select_bitmap); +} + +inline uint64_t ExtractBits(uint64_t bitmap, uint64_t select_bitmap) { +// MING32 doesn't support 64-bit pext. +#if defined(ARROW_HAVE_BMI2) && !defined(__MINGW32__) + return _pext_u64(bitmap, select_bitmap); +#else + return RunBasedExtractImpl(bitmap, select_bitmap); +#endif +} + +template +int64_t DefLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size, + int64_t upper_bound_remaining, LevelInfo level_info, + ::arrow::internal::FirstTimeBitmapWriter* writer) { + // Greater than level_info.def_level - 1 implies >= the def_level + uint64_t defined_bitmap = + internal::GreaterThanBitmap(def_levels, batch_size, level_info.def_level - 1); + + DCHECK_LE(batch_size, 64); + if (has_repeated_parent) { + // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the + // repeated_ancestor_def_level + uint64_t present_bitmap = internal::GreaterThanBitmap( + def_levels, batch_size, level_info.repeated_ancestor_def_level - 1); + uint64_t selected_bits = ExtractBits(defined_bitmap, present_bitmap); + int64_t selected_count = ::arrow::BitUtil::PopCount(present_bitmap); + if (ARROW_PREDICT_FALSE(selected_count > upper_bound_remaining)) { + throw ParquetException("Values read exceeded upper bound"); + } + writer->AppendWord(selected_bits, selected_count); + return ::arrow::BitUtil::PopCount(selected_bits); + } else { + if (ARROW_PREDICT_FALSE(batch_size > upper_bound_remaining)) { + std::stringstream ss; + ss << "Values read exceeded upper bound"; + throw ParquetException(ss.str()); + } + + writer->AppendWord(defined_bitmap, batch_size); + return ::arrow::BitUtil::PopCount(defined_bitmap); + } +} + +template +void DefLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, ValidityBitmapInputOutput* output) { + constexpr int64_t kBitMaskSize = 64; + ::arrow::internal::FirstTimeBitmapWriter writer( + output->valid_bits, + 
/*start_offset=*/output->valid_bits_offset, + /*length=*/num_def_levels); + int64_t set_count = 0; + output->values_read = 0; + int64_t values_read_remaining = output->values_read_upper_bound; + while (num_def_levels > kBitMaskSize) { + set_count += DefLevelsBatchToBitmap( + def_levels, kBitMaskSize, values_read_remaining, level_info, &writer); + def_levels += kBitMaskSize; + num_def_levels -= kBitMaskSize; + values_read_remaining = output->values_read_upper_bound - writer.position(); + } + set_count += DefLevelsBatchToBitmap( + def_levels, num_def_levels, values_read_remaining, level_info, &writer); + + output->values_read = writer.position(); + output->null_count += output->values_read - set_count; + writer.Finish(); +} + +} // namespace PARQUET_IMPL_NAMESPACE +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion_test.cc b/cpp/src/parquet/level_conversion_test.cc index d4f3719289d..b4f2d3ad5d1 100644 --- a/cpp/src/parquet/level_conversion_test.cc +++ b/cpp/src/parquet/level_conversion_test.cc @@ -17,18 +17,24 @@ #include "parquet/level_conversion.h" +#include "parquet/level_comparison.h" +#include "parquet/test_util.h" + #include #include #include #include +#include "arrow/testing/gtest_compat.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap.h" +#include "arrow/util/ubsan.h" namespace parquet { namespace internal { +using ::arrow::internal::Bitmap; using ::testing::ElementsAreArray; std::string BitmapToString(const uint8_t* bitmap, int64_t bit_count) { @@ -39,53 +45,55 @@ std::string BitmapToString(const std::vector& bitmap, int64_t bit_count return BitmapToString(bitmap.data(), bit_count); } -TEST(TestColumnReader, DefinitionLevelsToBitmap) { +TEST(TestColumnReader, DefLevelsToBitmap) { // Bugs in this function were exposed in ARROW-3930 std::vector def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3}; std::vector valid_bits(2, 0); - const int max_def_level = 3; - const int max_rep_level = 1; + LevelInfo level_info; + 
level_info.def_level = 3; + level_info.rep_level = 1; + + ValidityBitmapInputOutput io; + io.values_read_upper_bound = def_levels.size(); + io.values_read = -1; + io.valid_bits = valid_bits.data(); - int64_t values_read = -1; - int64_t null_count = 0; - internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level, - &values_read, &null_count, valid_bits.data(), - 0 /* valid_bits_offset */); - ASSERT_EQ(9, values_read); - ASSERT_EQ(1, null_count); + internal::DefLevelsToBitmap(def_levels.data(), 9, level_info, &io); + ASSERT_EQ(9, io.values_read); + ASSERT_EQ(1, io.null_count); // Call again with 0 definition levels, make sure that valid_bits is unmodified const uint8_t current_byte = valid_bits[1]; - null_count = 0; - internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level, - &values_read, &null_count, valid_bits.data(), - 9 /* valid_bits_offset */); - ASSERT_EQ(0, values_read); - ASSERT_EQ(0, null_count); + io.null_count = 0; + internal::DefLevelsToBitmap(def_levels.data(), 0, level_info, &io); + + ASSERT_EQ(0, io.values_read); + ASSERT_EQ(0, io.null_count); ASSERT_EQ(current_byte, valid_bits[1]); } -TEST(TestColumnReader, DefinitionLevelsToBitmapPowerOfTwo) { +TEST(TestColumnReader, DefLevelsToBitmapPowerOfTwo) { // PARQUET-1623: Invalid memory access when decoding a valid bits vector that has a // length equal to a power of two and also using a non-zero valid_bits_offset. This // should not fail when run with ASAN or valgrind. 
std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3}; std::vector<uint8_t> valid_bits(1, 0); - const int max_def_level = 3; - const int max_rep_level = 1; + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 3; - int64_t values_read = -1; - int64_t null_count = 0; + ValidityBitmapInputOutput io; + io.values_read_upper_bound = def_levels.size(); + io.values_read = -1; + io.valid_bits = valid_bits.data(); // Read the latter half of the validity bitmap - internal::DefinitionLevelsToBitmap(def_levels.data() + 4, 4, max_def_level, - max_rep_level, &values_read, &null_count, - valid_bits.data(), 4 /* valid_bits_offset */); - ASSERT_EQ(4, values_read); - ASSERT_EQ(0, null_count); + internal::DefLevelsToBitmap(def_levels.data() + 4, 4, level_info, &io); + ASSERT_EQ(4, io.values_read); + ASSERT_EQ(0, io.null_count); } #if defined(ARROW_LITTLE_ENDIAN) @@ -106,24 +114,251 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) { } #endif -TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) { +TEST(DefLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) { std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0); - int64_t null_count = 5; - int64_t values_read = 1; + ValidityBitmapInputOutput io; + io.values_read_upper_bound = 64; + io.values_read = 1; + io.null_count = 5; + io.valid_bits = validity_bitmap.data(); + io.valid_bits_offset = 1; + + LevelInfo level_info; + level_info.repeated_ancestor_def_level = 1; + level_info.def_level = 2; + level_info.rep_level = 1; // All zeros should be ignored, ones should be unset in the bitmap and 2 should be set. 
std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2}; - DefinitionLevelsToBitmap( - def_levels.data(), def_levels.size(), /*max_definition_level=*/2, - /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(), - /*valid_bits_offset=*/1); + DefLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io); EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000"); for (size_t x = 1; x < validity_bitmap.size(); x++) { EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x; } - EXPECT_EQ(null_count, /*5 + 1 =*/6); - EXPECT_EQ(values_read, 4); // value should get overwritten. + EXPECT_EQ(io.null_count, /*5 + 1 =*/6); + EXPECT_EQ(io.values_read, 4); // value should get overwritten. +} + +struct MultiLevelTestData { + public: + std::vector<int16_t> def_levels; + std::vector<int16_t> rep_levels; +}; + +MultiLevelTestData TriplyNestedList() { + // Triply nested list values borrow from write_path + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + return MultiLevelTestData{ + /*def_levels=*/std::vector<int16_t>{2, 7, 6, 7, 5, 3, // first row + 5, 5, 7, 7, 2, 7, // second row + 0, // third row + 1}, + /*rep_levels=*/std::vector<int16_t>{0, 1, 3, 3, 2, 1, // first row + 0, 1, 2, 3, 1, 1, // second row + 0, 0}}; +} + +template <typename ConverterType> +class NestedListTest : public testing::Test { + public: + void InitForLength(int length) { + this->validity_bits_.clear(); + this->validity_bits_.insert(this->validity_bits_.end(), length, 0); + validity_io_.valid_bits = validity_bits_.data(); + validity_io_.values_read_upper_bound = length; + offsets_.clear(); + offsets_.insert(offsets_.end(), length + 1, 0); + } + + typename ConverterType::OffsetsType* Run(const MultiLevelTestData& test_data, + LevelInfo level_info) { + return this->converter_.ComputeListInfo(test_data, level_info, &validity_io_, + offsets_.data()); + } + + ConverterType converter_; + ValidityBitmapInputOutput validity_io_; + std::vector<uint8_t> validity_bits_; + std::vector<typename ConverterType::OffsetsType> offsets_; +}; + 
+template <typename IndexType> +struct RepDefLevelConverter { + using OffsetsType = IndexType; + OffsetsType* ComputeListInfo(const MultiLevelTestData& test_data, LevelInfo level_info, + ValidityBitmapInputOutput* output, IndexType* offsets) { + DefRepLevelsToList(test_data.def_levels.data(), test_data.rep_levels.data(), + test_data.def_levels.size(), level_info, output, offsets); + return offsets + output->values_read; + } +}; + +using ConverterTypes = + ::testing::Types<RepDefLevelConverter<int32_t>, + RepDefLevelConverter<int64_t>>; +TYPED_TEST_SUITE(NestedListTest, ConverterTypes); + +TYPED_TEST(NestedListTest, OuterMostTest) { + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> 4 outer most lists (len(3), len(4), null, len(0)) + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 2; + + this->InitForLength(4); + typename TypeParam::OffsetsType* next_position = + this->Run(TriplyNestedList(), level_info); + + EXPECT_EQ(next_position, this->offsets_.data() + 4); + EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 3, 7, 7, 7)); + + EXPECT_EQ(this->validity_io_.values_read, 4); + EXPECT_EQ(this->validity_io_.null_count, 1); + EXPECT_EQ(BitmapToString(this->validity_io_.valid_bits, /*length=*/4), "1101"); +} + +TYPED_TEST(NestedListTest, MiddleListTest) { + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> middle lists (null, len(2), len(0), + // len(1), len(2), null, len(1), + // N/A, + // N/A + LevelInfo level_info; + level_info.rep_level = 2; + level_info.def_level = 4; + level_info.repeated_ancestor_def_level = 2; + + this->InitForLength(7); + typename TypeParam::OffsetsType* next_position = + this->Run(TriplyNestedList(), level_info); + + EXPECT_EQ(next_position, this->offsets_.data() + 7); + EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6)); + + EXPECT_EQ(this->validity_io_.values_read, 7); + EXPECT_EQ(this->validity_io_.null_count, 2); + 
EXPECT_EQ(BitmapToString(this->validity_io_.valid_bits, /*length=*/7), "0111101"); +} + +TYPED_TEST(NestedListTest, InnerMostListTest) { + // [null, [[1, null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> 6 inner lists (N/A, [len(3), len(0)], N/A + // len(0), [len(0), len(2)], N/A, len(1), + // N/A, + // N/A + LevelInfo level_info; + level_info.rep_level = 3; + level_info.def_level = 6; + level_info.repeated_ancestor_def_level = 4; + + this->InitForLength(6); + typename TypeParam::OffsetsType* next_position = + this->Run(TriplyNestedList(), level_info); + + EXPECT_EQ(next_position, this->offsets_.data() + 6); + EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 3, 3, 3, 3, 5, 6)); + + EXPECT_EQ(this->validity_io_.values_read, 6); + EXPECT_EQ(this->validity_io_.null_count, 0); + EXPECT_EQ(BitmapToString(this->validity_io_.valid_bits, /*length=*/6), "111111"); +} + +TYPED_TEST(NestedListTest, SimpleLongList) { + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 2; + level_info.repeated_ancestor_def_level = 0; + + MultiLevelTestData test_data; + // No empty lists. 
+ test_data.def_levels = std::vector<int16_t>(65 * 9, 2); + for (int x = 0; x < 65; x++) { + test_data.rep_levels.push_back(0); + test_data.rep_levels.insert(test_data.rep_levels.end(), 8, + /*rep_level=*/1); + } + + std::vector<typename TypeParam::OffsetsType> expected_offsets(66, 0); + for (size_t x = 1; x < expected_offsets.size(); x++) { + expected_offsets[x] = static_cast<typename TypeParam::OffsetsType>(x) * 9; + } + this->InitForLength(65); + typename TypeParam::OffsetsType* next_position = this->Run(test_data, level_info); + + EXPECT_EQ(next_position, this->offsets_.data() + 65); + EXPECT_THAT(this->offsets_, testing::ElementsAreArray(expected_offsets)); + + EXPECT_EQ(this->validity_io_.values_read, 65); + EXPECT_EQ(this->validity_io_.null_count, 0); + EXPECT_EQ(BitmapToString(this->validity_io_.valid_bits, /*length=*/65), + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "1"); +} + +TYPED_TEST(NestedListTest, TestOverflow) { + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 2; + level_info.repeated_ancestor_def_level = 0; + + MultiLevelTestData test_data; + test_data.def_levels = std::vector<int16_t>{2}; + test_data.rep_levels = std::vector<int16_t>{0}; + + this->InitForLength(2); + // Offsets is populated as the cumulative sum of all elements, + // so populating the offsets[0] with max-value impacts the + // other values populated. + this->offsets_[0] = std::numeric_limits<typename TypeParam::OffsetsType>::max(); + this->offsets_[1] = std::numeric_limits<typename TypeParam::OffsetsType>::max(); + ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + + ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + + // Same thing should happen if the list already existed. + test_data.rep_levels = std::vector<int16_t>{1}; + ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + + // Should be OK because it shouldn't increment. 
+ test_data.def_levels = std::vector<int16_t>{0}; + test_data.rep_levels = std::vector<int16_t>{0}; + this->Run(test_data, level_info); +} + +TEST(TestOnlyRunBasedExtract, BasicTest) { + EXPECT_EQ(TestOnlyRunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF), 0), 0); + EXPECT_EQ(TestOnlyRunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF), ~uint64_t{0}), + arrow::BitUtil::ToLittleEndian(0xFF)); + + EXPECT_EQ(TestOnlyRunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF00FF), + arrow::BitUtil::ToLittleEndian(0xAAAA)), + arrow::BitUtil::ToLittleEndian(0x000F)); + EXPECT_EQ(TestOnlyRunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF0AFF), + arrow::BitUtil::ToLittleEndian(0xAFAA)), + arrow::BitUtil::ToLittleEndian(0x00AF)); + EXPECT_EQ(TestOnlyRunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFFAAFF), + arrow::BitUtil::ToLittleEndian(0xAFAA)), + arrow::BitUtil::ToLittleEndian(0x03AF)); } } // namespace internal