diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake index a3d4062f0eb..96e79c69fc4 100644 --- a/cpp/cmake_modules/SetupCxxFlags.cmake +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -50,7 +50,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86") # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2") # Append the avx2/avx512 subset option also, fix issue ARROW-9877 for homebrew-cpp - set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2") + set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2 -mbmi2") set(ARROW_AVX512_FLAG "${ARROW_AVX512_FLAG} -mavx512f -mavx512cd -mavx512vl -mavx512dq -mavx512bw") check_cxx_compiler_flag(${ARROW_SSE4_2_FLAG} CXX_SUPPORTS_SSE4_2) @@ -68,10 +68,10 @@ if(ARROW_CPU_FLAG STREQUAL "x86") add_definitions(-DARROW_HAVE_RUNTIME_SSE4_2) endif() if(CXX_SUPPORTS_AVX2 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX2|AVX512|MAX)$") - add_definitions(-DARROW_HAVE_RUNTIME_AVX2) + add_definitions(-DARROW_HAVE_RUNTIME_AVX2 -DARROW_HAVE_RUNTIME_BMI2) endif() if(CXX_SUPPORTS_AVX512 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX512|MAX)$") - add_definitions(-DARROW_HAVE_RUNTIME_AVX512) + add_definitions(-DARROW_HAVE_RUNTIME_AVX512 -DARROW_HAVE_RUNTIME_BMI2) endif() elseif(ARROW_CPU_FLAG STREQUAL "ppc") # power compiler flags, gcc/clang only diff --git a/cpp/src/arrow/util/cpu_info.h b/cpp/src/arrow/util/cpu_info.h index 73695b7742e..a57ffd29467 100644 --- a/cpp/src/arrow/util/cpu_info.h +++ b/cpp/src/arrow/util/cpu_info.h @@ -106,6 +106,11 @@ class ARROW_EXPORT CpuInfo { /// Returns the vendor of the cpu. Vendor vendor() const { return vendor_; } + bool HasEfficientBmi2() const { + // BMI2 (pext, pdep) is only efficient on Intel X86 processors. + return vendor() == Vendor::Intel && IsSupported(BMI2); + } + private: CpuInfo(); diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index b70fe6b168d..3722a229338 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -188,6 +188,7 @@ set(PARQUET_SRCS file_writer.cc internal_file_decryptor.cc internal_file_encryptor.cc + level_comparison.cc level_conversion.cc metadata.cc murmur3.cc @@ -202,6 +203,19 @@ set(PARQUET_SRCS stream_writer.cc types.cc) +if(CXX_SUPPORTS_AVX2) + # AVX2 is used as a proxy for BMI2. 
+ list(APPEND PARQUET_SRCS level_comparison_avx2.cc level_conversion_bmi2.cc) + set_source_files_properties(level_comparison_avx2.cc + level_conversion_bmi2.cc + PROPERTIES + SKIP_PRECOMPILE_HEADERS + ON + COMPILE_FLAGS + "${ARROW_AVX2_FLAG} -DARROW_HAVE_BMI2") + +endif() + if(PARQUET_REQUIRE_ENCRYPTION) set(PARQUET_SRCS ${PARQUET_SRCS} encryption_internal.cc) else() diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 476d82f7fac..280097fca66 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2994,7 +2994,6 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) { ASSERT_OK(builder.Append(value)); } ASSERT_OK(builder.Finish(&expected_array)); - AssertArraysEqual(*expected_array, *chunk); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5f13259058d..f054b8e8f38 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -373,13 +373,14 @@ class RowGroupReaderImpl : public RowGroupReader { class LeafReader : public ColumnReaderImpl { public: LeafReader(std::shared_ptr ctx, std::shared_ptr field, - std::unique_ptr input) + std::unique_ptr input, + ::parquet::internal::LevelInfo leaf_info) : ctx_(std::move(ctx)), field_(std::move(field)), input_(std::move(input)), descr_(input_->descr()) { record_reader_ = RecordReader::Make( - descr_, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); + descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); NextRowGroup(); } @@ -694,7 +695,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& } std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); - out->reset(new LeafReader(ctx, field.field, std::move(input))); + out->reset(new LeafReader(ctx, field.field, std::move(input), field.level_info)); } else if (type_id == ::arrow::Type::LIST) { // We can only read lists-of-lists or structs at the moment auto list_field = field.field; diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 672b6e3708c..6b5450312b3 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -42,6 +42,7 @@ #include "parquet/encoding.h" #include "parquet/encryption_internal.h" #include "parquet/internal_file_decryptor.h" +#include "parquet/level_comparison.h" #include "parquet/level_conversion.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -55,6 +56,25 @@ using arrow::internal::checked_cast; using arrow::internal::MultiplyWithOverflow; namespace parquet { +namespace { +inline bool HasSpacedValues(const ColumnDescriptor* descr) { + if (descr->max_repetition_level() > 0) { + // repeated+flat case + return !descr->schema_node()->is_required(); + } else { + // non-repeated+nested case + // Find if a node forces nulls in the lowest level along the hierarchy + const schema::Node* node = descr->schema_node().get(); + while (node) { + if (node->is_optional()) { + return true; + } + node = node->parent(); + } + return false; + } +} +} // namespace LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} @@ -63,6 +83,7 @@ LevelDecoder::~LevelDecoder() {} int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data, int32_t data_size) { + max_level_ = max_level; int32_t num_bytes = 0; encoding_ = encoding; num_values_remaining_ = num_buffered_values; @@ -110,6 +131,7 @@ int 
LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values, const uint8_t* data) { + max_level_ = max_level; // Repetition and definition levels always uses RLE encoding // in the DataPageV2 format. if (num_bytes < 0) { @@ -135,6 +157,15 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) { } else { num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); } + if (num_decoded > 0) { + internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); + if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { + std::stringstream ss; + ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max + << " out of range. Max Level: " << max_level_; + throw ParquetException(ss.str()); + } + } num_values_remaining_ -= num_decoded; return num_decoded; } @@ -880,8 +911,7 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( } } - const bool has_spaced_values = internal::HasSpacedValues(this->descr_); - + const bool has_spaced_values = HasSpacedValues(this->descr_); int64_t null_count = 0; if (!has_spaced_values) { int values_to_read = 0; @@ -896,9 +926,12 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( /*bits_are_set=*/true); *values_read = total_values; } else { - internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_, - this->max_rep_level_, values_read, &null_count, - valid_bits, valid_bits_offset); + internal::LevelInfo info; + info.repeated_ancestor_def_level = this->max_def_level_ - 1; + info.def_level = this->max_def_level_; + info.rep_level = this->max_rep_level_; + internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, info, values_read, + &null_count, valid_bits, valid_bits_offset); total_values = this->ReadValuesSpaced(*values_read, values, static_cast(null_count), valid_bits, valid_bits_offset); @@ -1008,8 +1041,10 @@ class TypedRecordReader : public ColumnReaderImplBase, public: using T = typename DType::c_type; using BASE = ColumnReaderImplBase; - TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) { - nullable_values_ = internal::HasSpacedValues(descr); + TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool) + : BASE(descr, pool) { + leaf_info_ = leaf_info; + nullable_values_ = leaf_info.HasNullableValues(); at_record_start_ = true; records_read_ = 0; values_written_ = 0; @@ -1128,7 +1163,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } std::shared_ptr ReleaseIsValid() override { - if (nullable_values_) { + if (leaf_info_.HasNullableValues()) { auto result = valid_bits_; PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true)); valid_bits_ = AllocateBuffer(this->pool_); @@ -1170,13 +1205,7 @@ class TypedRecordReader : public ColumnReaderImplBase, break; } } - } else if (ARROW_PREDICT_FALSE(rep_level > this->max_rep_level_)) { - std::stringstream ss; - ss << "Malformed repetition levels, " << rep_level << " exceeded maximum " - << this->max_rep_level_ << " indicated by schema"; - throw ParquetException(ss.str()); } - // We have decided to consume the level at this position; therefore we // must advance until we find another record boundary at_record_start_ = false; @@ -1184,11 +1213,6 @@ class TypedRecordReader : public ColumnReaderImplBase, const int16_t def_level = *def_levels++; if (def_level == this->max_def_level_) { ++values_to_read; - } else if (ARROW_PREDICT_FALSE(def_level > 
this->max_def_level_)) { - std::stringstream ss; - ss << "Malformed definition levels, " << def_level << " exceeded maximum " - << this->max_def_level_ << " indicated by schema"; - throw ParquetException(ss.str()); } ++levels_position_; } @@ -1249,7 +1273,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } values_capacity_ = new_values_capacity; } - if (nullable_values_) { + if (leaf_info_.HasNullableValues()) { int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); if (valid_bits_->size() < valid_bytes_new) { int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); @@ -1344,12 +1368,12 @@ class TypedRecordReader : public ColumnReaderImplBase, } int64_t null_count = 0; - if (nullable_values_) { + if (leaf_info_.HasNullableValues()) { int64_t values_with_nulls = 0; - DefinitionLevelsToBitmap( - def_levels() + start_levels_position, levels_position_ - start_levels_position, - this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count, - valid_bits_->mutable_data(), values_written_); + DefinitionLevelsToBitmap(def_levels() + start_levels_position, + levels_position_ - start_levels_position, leaf_info_, + &values_with_nulls, &null_count, + valid_bits_->mutable_data(), values_written_); values_to_read = values_with_nulls - null_count; DCHECK_GE(values_to_read, 0); ReadValuesSpaced(values_with_nulls, null_count); @@ -1357,7 +1381,7 @@ class TypedRecordReader : public ColumnReaderImplBase, DCHECK_GE(values_to_read, 0); ReadValuesDense(values_to_read); } - if (this->max_def_level_ > 0) { + if (this->leaf_info_.def_level > 0) { // Optional, repeated, or some mix thereof this->ConsumeBufferedValues(levels_position_ - start_levels_position); } else { @@ -1415,13 +1439,15 @@ class TypedRecordReader : public ColumnReaderImplBase, T* ValuesHead() { return reinterpret_cast(values_->mutable_data()) + values_written_; } + LevelInfo leaf_info_; }; class FLBARecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: - FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(nullptr) { + FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, leaf_info, pool), builder_(nullptr) { DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); int byte_width = descr_->type_length(); std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); @@ -1473,8 +1499,9 @@ class FLBARecordReader : public TypedRecordReader, class ByteArrayChunkedRecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: - ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool) { + ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); } @@ -1513,9 +1540,9 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, class ByteArrayDictionaryRecordReader : public TypedRecordReader, virtual public DictionaryRecordReader { public: - ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, + ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool) - : TypedRecordReader(descr, pool), builder_(pool) { + : TypedRecordReader(descr, leaf_info, pool), 
builder_(pool) { this->read_dictionary_ = true; } @@ -1602,35 +1629,36 @@ template <> void TypedRecordReader::DebugPrintState() {} std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* descr, + LevelInfo leaf_info, ::arrow::MemoryPool* pool, bool read_dictionary) { if (read_dictionary) { - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); } else { - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); } } std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, - MemoryPool* pool, + LevelInfo leaf_info, MemoryPool* pool, const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT32: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT64: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::INT96: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::FLOAT: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::DOUBLE: - return std::make_shared>(descr, pool); + return std::make_shared>(descr, leaf_info, pool); case Type::BYTE_ARRAY: - return MakeByteArrayRecordReader(descr, pool, read_dictionary); + return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared(descr, pool); + return std::make_shared(descr, leaf_info, pool); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 7b5ee1b722a..60c44ffa6d2 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -23,6 +23,7 @@ #include #include "parquet/exception.h" +#include "parquet/level_conversion.h" #include "parquet/platform.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -75,6 +76,7 @@ class PARQUET_EXPORT LevelDecoder { Encoding::type encoding_; std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_; std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_; + int16_t max_level_; }; struct CryptoContext { @@ -208,7 +210,7 @@ namespace internal { class RecordReader { public: static std::shared_ptr Make( - const ColumnDescriptor* descr, + const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), const bool read_dictionary = false); @@ -314,25 +316,6 @@ class DictionaryRecordReader : virtual public RecordReader { virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0; }; -// TODO(itaiin): another code path split to merge when the general case is done -static inline bool HasSpacedValues(const ColumnDescriptor* descr) { - if (descr->max_repetition_level() > 0) { - // repeated+flat case - return !descr->schema_node()->is_required(); - } else { - // non-repeated+nested case - // Find if a node forces nulls in the lowest level along the hierarchy - const schema::Node* node = descr->schema_node().get(); - while (node) { - if (node->is_optional()) { - return true; - } - node = node->parent(); - } - return false; - } -} - } // namespace internal using BoolReader = TypedColumnReader; diff --git a/cpp/src/parquet/level_comparison.cc b/cpp/src/parquet/level_comparison.cc new file mode 100644 index 00000000000..2b68fef42c1 --- 
/dev/null +++ b/cpp/src/parquet/level_comparison.cc @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#define IMPL_NAMESPACE standard +#include "parquet/level_comparison.h" + +#include + +#include "arrow/util/dispatch.h" + +namespace parquet { +namespace internal { +namespace { + +using ::arrow::internal::DispatchLevel; +using ::arrow::internal::DynamicDispatch; + +struct GreaterThanDynamicFunction { + using FunctionType = decltype(&GreaterThanBitmap); + + static std::vector> implementations() { + return { + { DispatchLevel::NONE, standard::GreaterThanBitmapImpl } +#if defined(ARROW_HAVE_RUNTIME_AVX2) + , { DispatchLevel::AVX2, GreaterThanBitmapAvx2 } +#endif + }; + } +}; + +struct MinMaxDynamicFunction { + using FunctionType = decltype(&FindMinMax); + + static std::vector> implementations() { + return { + { DispatchLevel::NONE, standard::FindMinMaxImpl } +#if defined(ARROW_HAVE_RUNTIME_AVX2) + , { DispatchLevel::AVX2, FindMinMaxAvx2 } +#endif + }; + } +}; + +} // namespace + +uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels, int16_t rhs) { + static DynamicDispatch dispatch; + return dispatch.func(levels, num_levels, rhs); +} + +MinMax FindMinMax(const int16_t* levels, int64_t num_levels) { + static DynamicDispatch dispatch; + return dispatch.func(levels, num_levels); +} + +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_comparison.h b/cpp/src/parquet/level_comparison.h new file mode 100644 index 00000000000..92e5f0d3cb3 --- /dev/null +++ b/cpp/src/parquet/level_comparison.h @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include + +#include "arrow/util/bit_util.h" + +namespace parquet { +namespace internal { + +// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code. +// They currently represent minimal functionality for vectorized computation of definition +// levels. + +/// Builds a bitmap by applying predicate to the level vector provided. 
+/// +/// \param[in] levels Rep or def level array. +/// \param[in] num_levels The number of levels to process (must be [0, 64]) +/// \param[in] predicate The predicate to apply (must have the signature `bool +/// predicate(int16_t)`. +/// \returns The bitmap using least significant "bit" ordering. +/// +/// N.B. Correct byte ordering is dependent on little-endian architectures. +/// +template +inline uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, + Predicate predicate) { + // Both clang and GCC can vectorize this automatically with SSE4/AVX2. + uint64_t mask = 0; + for (int x = 0; x < num_levels; x++) { + mask |= static_cast(predicate(levels[x]) ? 1 : 0) << x; + } + return ::arrow::BitUtil::ToLittleEndian(mask); +} + +/// Builds a bitmap where each set bit indicates the corresponding level is greater +/// than rhs. +uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels, int16_t rhs); + +#if defined(ARROW_HAVE_RUNTIME_AVX2) +uint64_t GreaterThanBitmapAvx2(const int16_t* levels, int64_t num_levels, int16_t rhs); +#endif + +struct MinMax { + int16_t min; + int16_t max; +}; + +MinMax FindMinMax(const int16_t* levels, int64_t num_levels); + +#if defined(ARROW_HAVE_RUNTIME_AVX2) +MinMax FindMinMaxAvx2(const int16_t* levels, int64_t num_levels); +#endif + +// Used to make sure ODR rule isn't violated. +namespace IMPL_NAMESPACE { +inline MinMax FindMinMaxImpl(const int16_t* levels, int64_t num_levels) { + MinMax out{std::numeric_limits::max(), std::numeric_limits::min()}; + for (int x = 0; x < num_levels; x++) { + out.min = std::min(levels[x], out.min); + out.max = std::max(levels[x], out.max); + } + return out; +} + +inline uint64_t GreaterThanBitmapImpl(const int16_t* levels, int64_t num_levels, + int16_t rhs) { + return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; }); +} + +} // namespace IMPL_NAMESPACE + +} // namespace internal + +} // namespace parquet diff --git a/cpp/src/parquet/level_comparison_avx2.cc b/cpp/src/parquet/level_comparison_avx2.cc new file mode 100644 index 00000000000..6cd6d0cc1f7 --- /dev/null +++ b/cpp/src/parquet/level_comparison_avx2.cc @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
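
As an aside on the bitmap convention used by LevelsToBitmap/GreaterThanBitmap above: each of up to 64 levels maps to one bit, least-significant bit first, so a popcount of the result gives the number of matching levels. A minimal standalone sketch of that convention (plain C++; the names below are illustrative and not part of this patch, and the ToLittleEndian conversion is omitted since it is a no-op on little-endian hosts):

#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-in for GreaterThanBitmap: bit x is set when levels[x] > rhs.
uint64_t GreaterThanBitmapSketch(const int16_t* levels, int64_t num_levels, int16_t rhs) {
  uint64_t mask = 0;
  for (int64_t x = 0; x < num_levels; ++x) {
    mask |= static_cast<uint64_t>(levels[x] > rhs ? 1 : 0) << x;
  }
  return mask;
}

int main() {
  // For a leaf whose max definition level is 2, "level > 1" means "value present".
  std::vector<int16_t> def_levels = {2, 1, 2, 2, 0, 2};
  uint64_t defined =
      GreaterThanBitmapSketch(def_levels.data(), def_levels.size(), /*rhs=*/1);
  // Bits, LSB first: 1 0 1 1 0 1 -> 0b101101 == 0x2d.
  std::cout << std::hex << defined << std::endl;  // prints 2d
  return 0;
}
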
+ +#define IMPL_NAMESPACE avx2 +#include "parquet/level_comparison.h" +#undef IMPL_NAMESPACE + +namespace parquet { +namespace internal { + +uint64_t GreaterThanBitmapAvx2(const int16_t* levels, int64_t num_levels, int16_t rhs) { + return avx2::GreaterThanBitmapImpl(levels, num_levels, rhs); +} + +MinMax FindMinMaxAvx2(const int16_t* levels, int64_t num_levels) { + return avx2::FindMinMaxImpl(levels, num_levels); +} +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion.cc b/cpp/src/parquet/level_conversion.cc index cfa5df1a7e0..bd3d280781b 100644 --- a/cpp/src/parquet/level_conversion.cc +++ b/cpp/src/parquet/level_conversion.cc @@ -18,176 +18,150 @@ #include #include -#if defined(ARROW_HAVE_BMI2) -#include -#endif +#include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_util.h" +#include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" #include "parquet/exception.h" +#include "parquet/level_comparison.h" + +#define BMI_RUNTIME_VERSION standard +#include "parquet/level_conversion_inc.h" +#undef BMI_RUNTIME_VERSION namespace parquet { namespace internal { namespace { -inline void CheckLevelRange(const int16_t* levels, int64_t num_levels, - const int16_t max_expected_level) { - int16_t min_level = std::numeric_limits::max(); - int16_t max_level = std::numeric_limits::min(); - for (int x = 0; x < num_levels; x++) { - min_level = std::min(levels[x], min_level); - max_level = std::max(levels[x], max_level); - } - if (ARROW_PREDICT_FALSE(num_levels > 0 && - (min_level < 0 || max_level > max_expected_level))) { - throw ParquetException("definition level exceeds maximum"); - } -} -#if !defined(ARROW_HAVE_AVX512) - -inline void DefinitionLevelsToBitmapScalar( - const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - // We assume here that valid_bits is large enough to accommodate the - // additional definition levels and the ones that have already been written - ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset, - num_def_levels); - - // TODO(itaiin): As an interim solution we are splitting the code path here - // between repeated+flat column reads, and non-repeated+nested reads. 
- // Those paths need to be merged in the future - for (int i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == max_definition_level) { +using ::arrow::internal::CpuInfo; + +#if !defined(ARROW_HAVE_RUNTIME_BMI2) +void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, int64_t* values_read, + int64_t* null_count, uint8_t* valid_bits, + int64_t valid_bits_offset) { + ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer( + valid_bits, + /*start_offset=*/valid_bits_offset, + /*length=*/num_def_levels); + for (int x = 0; x < num_def_levels; x++) { + if (def_levels[x] < level_info.repeated_ancestor_def_level) { + continue; + } + if (def_levels[x] >= level_info.def_level) { valid_bits_writer.Set(); - } else if (max_repetition_level > 0) { - // repetition+flat case - if (def_levels[i] == (max_definition_level - 1)) { - valid_bits_writer.Clear(); - *null_count += 1; - } else { - continue; - } } else { - // non-repeated+nested case - if (def_levels[i] < max_definition_level) { - valid_bits_writer.Clear(); - *null_count += 1; - } else { - throw ParquetException("definition level exceeds maximum"); - } + valid_bits_writer.Clear(); + *null_count += 1; } - valid_bits_writer.Next(); } valid_bits_writer.Finish(); *values_read = valid_bits_writer.position(); + if (*null_count > 0 && level_info.null_slot_usage > 1) { + throw ParquetException( + "Null values with null_slot_usage > 1 not supported." + "(i.e. FixedSizeLists with null values are not supported"); + } } #endif -template -int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size, - const int16_t required_definition_level, - ::arrow::internal::FirstTimeBitmapWriter* writer) { - CheckLevelRange(def_levels, batch_size, required_definition_level); - uint64_t defined_bitmap = - internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1); - - DCHECK_LE(batch_size, 64); - if (has_repeated_parent) { -#if defined(ARROW_HAVE_BMI2) - // This is currently a specialized code path assuming only (nested) lists - // present through the leaf (i.e. no structs). Upper level code only calls - // this method when the leaf-values are nullable (otherwise no spacing is needed), - // Because only nested lists exists it is sufficient to know that the field - // was either null or included it (i.e. definition level > max_definitation_level - // -2) If there where structs mixed in, we need to know the def_level of the - // repeated parent so we can check for def_level > "def level of repeated parent". 
- uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size, - required_definition_level - 2); - uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap); - writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap)); - return ::arrow::BitUtil::PopCount(selected_bits); -#else - assert(false && "must not execute this without BMI2"); -#endif - } else { - writer->AppendWord(defined_bitmap, batch_size); - return ::arrow::BitUtil::PopCount(defined_bitmap); +template +void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output, LengthType* lengths) { + LengthType* orig_pos = lengths; + std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer; + if (output->valid_bits) { + valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter( + output->valid_bits, output->valid_bits_offset, num_def_levels)); } -} - -template -void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels, - const int16_t required_definition_level, - int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - constexpr int64_t kBitMaskSize = 64; - ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits, - /*start_offset=*/valid_bits_offset, - /*length=*/num_def_levels); - int64_t set_count = 0; - *values_read = 0; - while (num_def_levels > kBitMaskSize) { - set_count += DefinitionLevelsBatchToBitmap( - def_levels, kBitMaskSize, required_definition_level, &writer); - def_levels += kBitMaskSize; - num_def_levels -= kBitMaskSize; + for (int x = 0; x < num_def_levels; x++) { + // Skip items that belong to empty ancenstor lists and futher nested lists. + if (def_levels[x] < level_info.repeated_ancestor_def_level || + rep_levels[x] > level_info.rep_level) { + continue; + } + if (rep_levels[x] == level_info.rep_level) { + // A continuation of an existing list. + *lengths += 1; + } else { + // current_rep < list rep_level i.e. start of a list (ancenstor empty lists are + // filtered out above). + ++lengths; + *lengths = (def_levels[x] >= level_info.def_level) ? 1 : 0; + + if (valid_bits_writer != nullptr) { + // the level_info def level for lists reflects element present level. + // the prior level distinguishes between empty lists. + if (def_levels[x] >= level_info.def_level - 1) { + valid_bits_writer->Set(); + } else { + output->null_count++; + valid_bits_writer->Clear(); + } + valid_bits_writer->Next(); + } + } } - set_count += DefinitionLevelsBatchToBitmap( - def_levels, num_def_levels, required_definition_level, &writer); - - *values_read = writer.position(); - *null_count += *values_read - set_count; - writer.Finish(); -} - -void DefinitionLevelsToBitmapLittleEndian( - const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset) { - if (max_repetition_level > 0) { -// This is a short term hack to prevent using the pext BMI2 instructions -// on non-intel platforms where performance is subpar. -// In the medium term we will hopefully be able to runtime dispatch -// to use this on intel only platforms that support pext. -#if defined(ARROW_HAVE_AVX512) - // BMI2 is required for efficient bit extraction. 
- DefinitionLevelsToBitmapSimd( - def_levels, num_def_levels, max_definition_level, values_read, null_count, - valid_bits, valid_bits_offset); -#else - DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); -#endif // ARROW_HAVE_BMI2 - - } else { - // No BMI2 intsturctions are used for non-repeated case. - DefinitionLevelsToBitmapSimd( - def_levels, num_def_levels, max_definition_level, values_read, null_count, - valid_bits, valid_bits_offset); + if (valid_bits_writer != nullptr) { + valid_bits_writer->Finish(); + } + output->values_read = lengths - orig_pos; + if (output->null_count > 0 && level_info.null_slot_usage > 1) { + throw ParquetException( + "Null values with null_slot_usage > 1 not supported." + "(i.e. FixedSizeLists with null values are not supported)"); } } } // namespace void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, - const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, + LevelInfo level_info, int64_t* values_read, int64_t* null_count, uint8_t* valid_bits, int64_t valid_bits_offset) { -#if ARROW_LITTLE_ENDIAN - DefinitionLevelsToBitmapLittleEndian(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); - + if (level_info.rep_level > 0) { +#if defined(ARROW_HAVE_RUNTIME_BMI2) + using FunctionType = decltype(&standard::DefinitionLevelsToBitmapSimd); + static FunctionType fn = + CpuInfo::GetInstance()->HasEfficientBmi2() + ? DefinitionLevelsToBitmapBmi2WithRepeatedParent + : standard::DefinitionLevelsToBitmapSimd; + fn(def_levels, num_def_levels, level_info, values_read, null_count, valid_bits, + valid_bits_offset); #else - DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level, - max_repetition_level, values_read, null_count, - valid_bits, valid_bits_offset); + DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, level_info, values_read, + null_count, valid_bits, valid_bits_offset); #endif + } else { + standard::DefinitionLevelsToBitmapSimd( + def_levels, num_def_levels, level_info, values_read, null_count, valid_bits, + valid_bits_offset); + } +} + +uint64_t RunBasedExtract(uint64_t bitmap, uint64_t select_bitmap) { + return standard::RunBasedExtractImpl(bitmap, select_bitmap); +} + +void ConvertDefRepLevelsToList(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + ValidityBitmapInputOutput* output, + ::arrow::util::variant lengths) { + if (arrow::util::holds_alternative(lengths)) { + auto int32_lengths = ::arrow::util::get(lengths); + DefRepLevelsToListInfo(def_levels, rep_levels, num_def_levels, level_info, + output, int32_lengths); + } else if (arrow::util::holds_alternative(lengths)) { + auto int64_lengths = ::arrow::util::get(lengths); + DefRepLevelsToListInfo(def_levels, rep_levels, num_def_levels, level_info, + output, int64_lengths); + } else { + throw ParquetException("Unrecognized variant"); + } } } // namespace internal diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h index dbecb3171cf..cc8112cde08 100644 --- a/cpp/src/parquet/level_conversion.h +++ b/cpp/src/parquet/level_conversion.h @@ -19,6 +19,9 @@ #include +#include "arrow/util/bitmap.h" +#include "arrow/util/optional.h" +#include "arrow/util/variant.h" #include "parquet/platform.h" #include "parquet/schema.h" @@ -41,6 +44,8 @@ struct 
PARQUET_EXPORT LevelInfo { repeated_ancestor_def_level == b.repeated_ancestor_def_level; } + bool HasNullableValues() const { return repeated_ancestor_def_level < def_level; } + // How many slots an undefined but present (i.e. null) element in // parquet consumes when decoding to Arrow. // "Slot" is used in the same context as the Arrow specification @@ -132,43 +137,41 @@ struct PARQUET_EXPORT LevelInfo { } }; -void PARQUET_EXPORT DefinitionLevelsToBitmap( - const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, - const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, - uint8_t* valid_bits, int64_t valid_bits_offset); - -// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code. -// They currently represent minimal functionality for vectorized computation of definition -// levels. - -#if defined(ARROW_LITTLE_ENDIAN) -/// Builds a bitmap by applying predicate to the level vector provided. -/// -/// \param[in] levels Rep or def level array. -/// \param[in] num_levels The number of levels to process (must be [0, 64]) -/// \param[in] predicate The predicate to apply (must have the signature `bool -/// predicate(int16_t)`. -/// \returns The bitmap using least significant "bit" ordering. -/// -/// N.B. Correct byte ordering is dependent on little-endian architectures. -/// -template -uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) { - // Both clang and GCC can vectorize this automatically with SSE4/AVX2. - uint64_t mask = 0; - for (int x = 0; x < num_levels; x++) { - mask |= static_cast(predicate(levels[x]) ? 1 : 0) << x; - } - return mask; -} +/// Converts def_levels to validity bitmaps for non-list arrays. +/// TODO: use input/output parameter below instead of individual return variables. +void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels, + int64_t num_def_levels, LevelInfo level_info, + int64_t* values_read, int64_t* null_count, + uint8_t* valid_bits, + int64_t valid_bits_offset); + +/// Input/Output structure for reconstructed validity bitmaps. +struct PARQUET_EXPORT ValidityBitmapInputOutput { + /// The number of values added to the bitmap. + int64_t values_read = 0; + /// The number of nulls encountered. + int64_t null_count = 0; + // The validity bitmp to populate. Can only be null + // for DefRepLevelsToListInfo (if all that is needed is list lengths). + uint8_t* valid_bits = nullptr; + /// Input only, offset into valid_bits to start at. + int64_t valid_bits_offset = 0; +}; + +/// Reconstructs a validity bitmap and list lengths for a ListArray based on +/// def/rep levels. +void PARQUET_EXPORT ConvertDefRepLevelsToList( + const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels, + LevelInfo level_info, ValidityBitmapInputOutput* output, + ::arrow::util::variant lengths); -/// Builds a bitmap where each set bit indicates the corresponding level is greater -/// than rhs. 
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels, - int16_t rhs) { - return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; }); -} +uint64_t RunBasedExtract(uint64_t bitmap, uint64_t selection); +#if defined(ARROW_HAVE_RUNTIME_BMI2) +void PARQUET_EXPORT DefinitionLevelsToBitmapBmi2WithRepeatedParent( + const int16_t* def_levels, int64_t num_def_levels, LevelInfo level_info, + int64_t* values_read, int64_t* null_count, uint8_t* valid_bits, + int64_t valid_bits_offset); #endif } // namespace internal diff --git a/cpp/src/parquet/level_conversion_benchmark.cc b/cpp/src/parquet/level_conversion_benchmark.cc index 4f15838d339..dccc922b53c 100644 --- a/cpp/src/parquet/level_conversion_benchmark.cc +++ b/cpp/src/parquet/level_conversion_benchmark.cc @@ -38,10 +38,12 @@ std::vector RunDefinitionLevelsToBitmap(const std::vector& def int64_t null_count = 0; std::vector bitmap(/*count=*/def_levels.size(), 0); int rep = 0; + parquet::internal::LevelInfo info; + info.def_level = kHasRepeatedElements; + info.repeated_ancestor_def_level = kPresentDefLevel; for (auto _ : *state) { parquet::internal::DefinitionLevelsToBitmap( - def_levels.data(), def_levels.size(), /*max_definition_level=*/kPresentDefLevel, - /*max_repetition_level=*/kHasRepeatedElements, &values_read, &null_count, + def_levels.data(), def_levels.size(), info, &values_read, &null_count, bitmap.data(), /*valid_bits_offset=*/(rep++ % 8) * def_levels.size()); } state->SetBytesProcessed(int64_t(state->iterations()) * def_levels.size()); diff --git a/cpp/src/parquet/level_conversion_bmi2.cc b/cpp/src/parquet/level_conversion_bmi2.cc new file mode 100644 index 00000000000..1c9a6d6c692 --- /dev/null +++ b/cpp/src/parquet/level_conversion_bmi2.cc @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "parquet/level_conversion.h" + +#define BMI_RUNTIME_VERSION bmi2 +#include "parquet/level_conversion_inc.h" +#undef BMI_RUNTIME_VERSION + +namespace parquet { +namespace internal { +void DefinitionLevelsToBitmapBmi2WithRepeatedParent( + const int16_t* def_levels, int64_t num_def_levels, LevelInfo level_info, + int64_t* values_read, int64_t* null_count, uint8_t* valid_bits, + int64_t valid_bits_offset) { + bmi2::DefinitionLevelsToBitmapSimd( + def_levels, num_def_levels, level_info, values_read, null_count, valid_bits, + valid_bits_offset); +} + +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion_inc.h b/cpp/src/parquet/level_conversion_inc.h new file mode 100644 index 00000000000..4b932580898 --- /dev/null +++ b/cpp/src/parquet/level_conversion_inc.h @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "parquet/level_conversion.h" + +#include +#include +#if defined(ARROW_HAVE_BMI2) +#include +#endif + +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "parquet/exception.h" +#include "parquet/level_comparison.h" + +namespace parquet { +namespace internal { +namespace BMI_RUNTIME_VERSION { + +using ::arrow::internal::BitRun; +using ::arrow::internal::BitRunReader; + +/// Algorithm to simulate pext using BitRunReader for cases where all bits +/// not set or set. +uint64_t RunBasedExtractMixed(uint64_t bitmap, uint64_t select_bitmap) { + bitmap = arrow::BitUtil::FromLittleEndian(bitmap); + uint64_t new_bitmap = 0; + ::arrow::internal::BitRunReader selection(reinterpret_cast(&select_bitmap), + /*start_offset=*/0, /*length=*/64); + ::arrow::internal::BitRun run = selection.NextRun(); + int64_t selected_bits = 0; + while (run.length != 0) { + if (run.set) { + new_bitmap |= (bitmap & ::arrow::BitUtil::LeastSignficantBitMask(run.length)) + << selected_bits; + selected_bits += run.length; + } + bitmap = bitmap >> run.length; + run = selection.NextRun(); + } + return arrow::BitUtil::ToLittleEndian(new_bitmap); +} + +inline uint64_t RunBasedExtractImpl(uint64_t bitmap, uint64_t select_bitmap) { + /// These checks should be inline and are likely to be common cases. + if (select_bitmap == ~uint64_t{0}) { + return bitmap; + } else if (select_bitmap == 0) { + return 0; + } + /// Fallback to the slow method. 
+ return RunBasedExtractMixed(bitmap, select_bitmap); +} + +inline uint64_t ExtractBits(uint64_t bitmap, uint64_t select_bitmap) { +#if defined(ARROW_HAVE_BMI2) + return _pext_u64(bitmap, select_bitmap); +#else + return RunBasedExtractImpl(bitmap, select_bitmap); +#endif +} + +template +int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size, + LevelInfo level_info, + ::arrow::internal::FirstTimeBitmapWriter* writer) { + // Greater than level_info.def_level - 1 implies >= the def_level + uint64_t defined_bitmap = + internal::GreaterThanBitmap(def_levels, batch_size, level_info.def_level - 1); + + DCHECK_LE(batch_size, 64); + if (has_repeated_parent) { + // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the + // repeated_ancenstor_def_level + uint64_t present_bitmap = internal::GreaterThanBitmap( + def_levels, batch_size, level_info.repeated_ancestor_def_level - 1); + uint64_t selected_bits = ExtractBits(defined_bitmap, present_bitmap); + writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap)); + return ::arrow::BitUtil::PopCount(selected_bits); + } else { + writer->AppendWord(defined_bitmap, batch_size); + return ::arrow::BitUtil::PopCount(defined_bitmap); + } +} + +template +void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels, + LevelInfo level_info, int64_t* values_read, + int64_t* null_count, uint8_t* valid_bits, + int64_t valid_bits_offset) { + constexpr int64_t kBitMaskSize = 64; + ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits, + /*start_offset=*/valid_bits_offset, + /*length=*/num_def_levels); + int64_t set_count = 0; + *values_read = 0; + while (num_def_levels > kBitMaskSize) { + set_count += DefinitionLevelsBatchToBitmap( + def_levels, kBitMaskSize, level_info, &writer); + def_levels += kBitMaskSize; + num_def_levels -= kBitMaskSize; + } + set_count += DefinitionLevelsBatchToBitmap( + def_levels, num_def_levels, level_info, &writer); + + *values_read = writer.position(); + *null_count += *values_read - set_count; + writer.Finish(); +} + + +} // namespace BMI_RUNTIME_VERSION +} // namespace internal +} // namespace parquet diff --git a/cpp/src/parquet/level_conversion_test.cc b/cpp/src/parquet/level_conversion_test.cc index d4f3719289d..c967bcf26d7 100644 --- a/cpp/src/parquet/level_conversion_test.cc +++ b/cpp/src/parquet/level_conversion_test.cc @@ -16,6 +16,7 @@ // under the License. 
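
The ExtractBits/RunBasedExtract helpers above reproduce the semantics of the BMI2 _pext_u64 instruction: the bits of the first word selected by the second word are packed contiguously into the low bits of the result, which is how defined-value bits are squeezed past the slots of empty lists. A minimal scalar emulation, shown only to make those semantics concrete (this bit-by-bit loop is an illustrative assumption, not the BitRunReader-based implementation in the patch); the constants match the RunBasedExtract.BasicTest expectations further down, viewed on a little-endian host:

#include <cassert>
#include <cstdint>

// Scalar emulation of _pext_u64: gather the bits of `value` selected by `mask`
// and pack them contiguously into the low bits of the result.
uint64_t ScalarPext(uint64_t value, uint64_t mask) {
  uint64_t result = 0;
  int out_pos = 0;
  for (int i = 0; i < 64; ++i) {
    if ((mask >> i) & 1) {
      result |= ((value >> i) & uint64_t{1}) << out_pos++;
    }
  }
  return result;
}

int main() {
  assert(ScalarPext(0xFF, 0) == 0);
  assert(ScalarPext(0xFF, ~uint64_t{0}) == 0xFF);
  assert(ScalarPext(0xFF00FF, 0xAAAA) == 0x000F);
  // In DefinitionLevelsBatchToBitmap the "value" is the defined bitmap and the
  // "mask" is the present bitmap, so null slots survive as cleared bits while
  // empty-list slots are removed entirely.
  return 0;
}
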
#include "parquet/level_conversion.h" +#include "parquet/level_comparison.h" #include #include @@ -25,10 +26,12 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap.h" +#include "arrow/util/ubsan.h" namespace parquet { namespace internal { +using ::arrow::internal::Bitmap; using ::testing::ElementsAreArray; std::string BitmapToString(const uint8_t* bitmap, int64_t bit_count) { @@ -45,13 +48,14 @@ TEST(TestColumnReader, DefinitionLevelsToBitmap) { std::vector valid_bits(2, 0); - const int max_def_level = 3; - const int max_rep_level = 1; + LevelInfo level_info; + level_info.def_level = 3; + level_info.rep_level = 1; int64_t values_read = -1; int64_t null_count = 0; - internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level, - &values_read, &null_count, valid_bits.data(), + internal::DefinitionLevelsToBitmap(def_levels.data(), 9, level_info, &values_read, + &null_count, valid_bits.data(), 0 /* valid_bits_offset */); ASSERT_EQ(9, values_read); ASSERT_EQ(1, null_count); @@ -59,8 +63,8 @@ TEST(TestColumnReader, DefinitionLevelsToBitmap) { // Call again with 0 definition levels, make sure that valid_bits is unmodified const uint8_t current_byte = valid_bits[1]; null_count = 0; - internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level, - &values_read, &null_count, valid_bits.data(), + internal::DefinitionLevelsToBitmap(def_levels.data(), 0, level_info, &values_read, + &null_count, valid_bits.data(), 9 /* valid_bits_offset */); ASSERT_EQ(0, values_read); ASSERT_EQ(0, null_count); @@ -74,16 +78,17 @@ TEST(TestColumnReader, DefinitionLevelsToBitmapPowerOfTwo) { std::vector def_levels = {3, 3, 3, 2, 3, 3, 3, 3}; std::vector valid_bits(1, 0); - const int max_def_level = 3; - const int max_rep_level = 1; + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 3; int64_t values_read = -1; int64_t null_count = 0; // Read the latter half of the validity bitmap - internal::DefinitionLevelsToBitmap(def_levels.data() + 4, 4, max_def_level, - max_rep_level, &values_read, &null_count, - valid_bits.data(), 4 /* valid_bits_offset */); + internal::DefinitionLevelsToBitmap(def_levels.data() + 4, 4, level_info, &values_read, + &null_count, valid_bits.data(), + 4 /* valid_bits_offset */); ASSERT_EQ(4, values_read); ASSERT_EQ(0, null_count); } @@ -111,12 +116,15 @@ TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) { int64_t null_count = 5; int64_t values_read = 1; + LevelInfo level_info; + level_info.repeated_ancestor_def_level = 1; + level_info.def_level = 2; + level_info.rep_level = 1; // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set. std::vector def_levels = {0, 0, 0, 2, 2, 1, 0, 2}; - DefinitionLevelsToBitmap( - def_levels.data(), def_levels.size(), /*max_definition_level=*/2, - /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(), - /*valid_bits_offset=*/1); + DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &values_read, + &null_count, validity_bitmap.data(), + /*valid_bits_offset=*/1); EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000"); for (size_t x = 1; x < validity_bitmap.size(); x++) { @@ -126,5 +134,205 @@ TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) { EXPECT_EQ(values_read, 4); // value should get overwritten. 
} +template +void DefRepLevelsToListLengths(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_def_levels, LevelInfo level_info, + LengthType* lengths) { + for (int x = 0; x < num_def_levels; x++) { + if (rep_levels[x] < level_info.rep_level) { + // A value less than the current rep_level indicates either a start of a + // new list or an empty list. + if (def_levels[x] >= level_info.repeated_ancestor_def_level) { + ++lengths; + *lengths = def_levels[x] >= level_info.def_level ? 1 : 0; + } + } else { + // The current list length only increases when the rep level is equal + // to the current one (greater rep levels belong to the next length list. + lengths += rep_levels[x] == level_info.rep_level ? 1 : 0; + } + } +} + +class MultiLevelTestData { + public: + // Triply nested list values borrow from write_path + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + std::vector def_levels_{2, 7, 6, 7, 5, 3, // first row + 5, 5, 7, 7, 2, 7, // second row + 0, // third row + 1}; + std::vector rep_levels_{0, 1, 3, 3, 2, 1, // first row + 0, 1, 2, 3, 1, 1, // second row + 0, 0}; +}; + +template +class NestedListTest : public testing::Test { + public: + MultiLevelTestData test_data_; + ConverterType converter_; +}; + +template +struct RepDefLevelConverter { + using ListLengthType = ListType; + ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data, + LevelInfo level_info, ValidityBitmapInputOutput* output, + ListType* lengths) { + ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(), + test_data.def_levels_.size(), level_info, output, lengths); + return lengths + output->values_read; + } +}; + +using ConverterTypes = + ::testing::Types, + RepDefLevelConverter>; +TYPED_TEST_CASE(NestedListTest, ConverterTypes); + +TYPED_TEST(NestedListTest, OuterMostTest) { + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> 4 outer most lists (len(3), len(4), null, len(0)) + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 2; + + std::vector lengths(5, -1); + uint64_t validity_output; + ValidityBitmapInputOutput validity_io; + validity_io.valid_bits = reinterpret_cast(&validity_output); + typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo( + this->test_data_, level_info, &validity_io, lengths.data()); + + EXPECT_THAT(next_position, lengths.data() + 4); + EXPECT_THAT(lengths, testing::ElementsAre(-1, 3, 4, 0, 0)); + + EXPECT_EQ(validity_io.values_read, 4); + EXPECT_EQ(validity_io.null_count, 1); + EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/4), "1101"); +} + +TYPED_TEST(NestedListTest, MiddleListTest) { + // [null, [[1 , null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> middle lists (null, len(2), len(0), + // len(1), len(2), null, len(1), + // N/A, + // N/A + LevelInfo level_info; + level_info.rep_level = 2; + level_info.def_level = 4; + level_info.repeated_ancestor_def_level = 2; + + std::vector lengths(8, -1); + uint64_t validity_output; + ValidityBitmapInputOutput validity_io; + validity_io.valid_bits = reinterpret_cast(&validity_output); + typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo( + this->test_data_, level_info, &validity_io, lengths.data()); + + EXPECT_THAT(next_position, lengths.data() + 7); + EXPECT_THAT(lengths, testing::ElementsAre(-1, 0, 2, 0, 1, 2, 0, 1)); + + EXPECT_EQ(validity_io.values_read, 
7); + EXPECT_EQ(validity_io.null_count, 2); + EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/7), "0111101"); +} + +TYPED_TEST(NestedListTest, InnerMostListTest) { + // [null, [[1, null, 3], []], []], + // [[[]], [[], [1, 2]], null, [[3]]], + // null, + // [] + // -> 4 inner lists (N/A, [len(3), len(0)], N/A + // len(0), [len(0), len(2)], N/A, len(1), + // N/A, + // N/A + LevelInfo level_info; + level_info.rep_level = 3; + level_info.def_level = 6; + level_info.repeated_ancestor_def_level = 4; + + std::vector lengths(7, -1); + uint64_t validity_output; + ValidityBitmapInputOutput validity_io; + validity_io.valid_bits = reinterpret_cast(&validity_output); + typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo( + this->test_data_, level_info, &validity_io, lengths.data()); + + EXPECT_THAT(next_position, lengths.data() + 6); + EXPECT_THAT(lengths, testing::ElementsAre(-1, 3, 0, 0, 0, 2, 1)); + + EXPECT_EQ(validity_io.values_read, 6); + EXPECT_EQ(validity_io.null_count, 0); + EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/6), "111111"); +} + +TYPED_TEST(NestedListTest, SimpleLongList) { + LevelInfo level_info; + level_info.rep_level = 1; + level_info.def_level = 2; + level_info.repeated_ancestor_def_level = 0; + + // No empty lists. + this->test_data_.def_levels_ = std::vector(65 * 9, 2); + this->test_data_.rep_levels_.clear(); + for (int x = 0; x < 65; x++) { + this->test_data_.rep_levels_.push_back(0); + this->test_data_.rep_levels_.insert(this->test_data_.rep_levels_.end(), 8, + /*rep_level=*/1); + } + + std::vector lengths(66, -1); + std::vector expected_lengths(66, 9); + expected_lengths[0] = -1; + std::vector validity_output(9, 0); + ValidityBitmapInputOutput validity_io; + validity_io.valid_bits = validity_output.data(); + typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo( + this->test_data_, level_info, &validity_io, lengths.data()); + + EXPECT_THAT(next_position, lengths.data() + 65); + EXPECT_THAT(lengths, testing::ElementsAreArray(expected_lengths)); + + EXPECT_EQ(validity_io.values_read, 65); + EXPECT_EQ(validity_io.null_count, 0); + EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/65), + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "11111111 " + "1"); +} + +TEST(RunBasedExtract, BasicTest) { + EXPECT_EQ(RunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF), 0), 0); + EXPECT_EQ(RunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF), ~uint64_t{0}), + arrow::BitUtil::ToLittleEndian(0xFF)); + + EXPECT_EQ(RunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF00FF), + arrow::BitUtil::ToLittleEndian(0xAAAA)), + arrow::BitUtil::ToLittleEndian(0x000F)); + EXPECT_EQ(RunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFF0AFF), + arrow::BitUtil::ToLittleEndian(0xAFAA)), + arrow::BitUtil::ToLittleEndian(0x00AF)); + EXPECT_EQ(RunBasedExtract(arrow::BitUtil::ToLittleEndian(0xFFAAFF), + arrow::BitUtil::ToLittleEndian(0xAFAA)), + arrow::BitUtil::ToLittleEndian(0x03AF)); +} + } // namespace internal } // namespace parquet
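
For completeness, a usage sketch (not part of the patch) of how a caller drives the reworked DefinitionLevelsToBitmap with an explicit LevelInfo; the signature and field names are taken from level_conversion.h in this diff, and the expected counts mirror the WithRepetitionLevelFiltersOutEmptyListValues test above:

#include <cstdint>
#include <vector>

#include "parquet/level_conversion.h"

void NullableLeafInsideListExample() {
  using parquet::internal::DefinitionLevelsToBitmap;
  using parquet::internal::LevelInfo;

  // Nullable leaf nested in a list: def_level 2 = value present,
  // def_level 1 = slot present but null, def_level 0 = the enclosing list is
  // empty/null, so no slot is produced at all.
  LevelInfo level_info;
  level_info.rep_level = 1;
  level_info.def_level = 2;
  level_info.repeated_ancestor_def_level = 1;

  std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
  std::vector<uint8_t> valid_bits(1, 0);
  int64_t values_read = 0;
  int64_t null_count = 0;
  DefinitionLevelsToBitmap(def_levels.data(),
                           static_cast<int64_t>(def_levels.size()), level_info,
                           &values_read, &null_count, valid_bits.data(),
                           /*valid_bits_offset=*/0);
  // The four zero levels produce no slots: values_read == 4, null_count == 1,
  // and the low bits of valid_bits[0] are 1, 1, 0, 1 (LSB first).
}

The same LevelInfo shape is what ConvertDefRepLevelsToList consumes for list-length and validity reconstruction elsewhere in this patch.
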