6 changes: 3 additions & 3 deletions cpp/cmake_modules/SetupCxxFlags.cmake
@@ -50,7 +50,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
# skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2")
# Append the avx2/avx512 subset option also, fix issue ARROW-9877 for homebrew-cpp
set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2")
set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2 -mbmi2")
set(ARROW_AVX512_FLAG
"${ARROW_AVX512_FLAG} -mavx512f -mavx512cd -mavx512vl -mavx512dq -mavx512bw")
check_cxx_compiler_flag(${ARROW_SSE4_2_FLAG} CXX_SUPPORTS_SSE4_2)
@@ -68,10 +68,10 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
add_definitions(-DARROW_HAVE_RUNTIME_SSE4_2)
endif()
if(CXX_SUPPORTS_AVX2 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX2|AVX512|MAX)$")
add_definitions(-DARROW_HAVE_RUNTIME_AVX2)
add_definitions(-DARROW_HAVE_RUNTIME_AVX2 -DARROW_HAVE_RUNTIME_BMI2)
endif()
if(CXX_SUPPORTS_AVX512 AND ARROW_RUNTIME_SIMD_LEVEL MATCHES "^(AVX512|MAX)$")
add_definitions(-DARROW_HAVE_RUNTIME_AVX512)
add_definitions(-DARROW_HAVE_RUNTIME_AVX512 -DARROW_HAVE_RUNTIME_BMI2)
endif()
elseif(ARROW_CPU_FLAG STREQUAL "ppc")
# power compiler flags, gcc/clang only
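// Hedged sketch (not part of this diff): one way the ARROW_HAVE_RUNTIME_BMI2
// define added above could be consumed. The define only states that the
// toolchain can compile BMI2 code; whether the running CPU supports it still
// has to be probed at runtime. HasRuntimeBmi2 and ExtractBitsBmi2 are
// hypothetical names standing in for a CPUID probe and a kernel built with
// -mbmi2.
#include <cstdint>

extern bool HasRuntimeBmi2();                             // hypothetical probe
extern uint64_t ExtractBitsBmi2(uint64_t v, uint64_t m);  // hypothetical _pext_u64 kernel

// Portable emulation of BMI2 pext: gather the bits of `value` selected by
// `mask` and pack them contiguously into the low bits of the result.
inline uint64_t ExtractBitsScalar(uint64_t value, uint64_t mask) {
  uint64_t result = 0;
  for (uint64_t out_bit = 1; mask != 0; mask &= mask - 1, out_bit <<= 1) {
    if (value & (mask & (~mask + 1))) result |= out_bit;
  }
  return result;
}

inline uint64_t ExtractBits(uint64_t value, uint64_t mask) {
#if defined(ARROW_HAVE_RUNTIME_BMI2)
  if (HasRuntimeBmi2()) return ExtractBitsBmi2(value, mask);
#endif
  return ExtractBitsScalar(value, mask);
}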
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/cpu_info.h
@@ -106,6 +106,11 @@ class ARROW_EXPORT CpuInfo {
/// Returns the vendor of the cpu.
Vendor vendor() const { return vendor_; }

bool HasEfficientBmi2() const {
// BMI2 (pext, pdep) is only efficient on Intel X86 processors.
return vendor() == Vendor::Intel && IsSupported(BMI2);
}

private:
CpuInfo();

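// Hedged usage sketch (not in this diff): dispatch should key off the new
// HasEfficientBmi2() accessor rather than the raw BMI2 CPUID bit, because
// pdep/pext are microcoded and slow on AMD CPUs before Zen 3 even though they
// advertise the feature. UseBmi2Kernels is a hypothetical helper name.
#include "arrow/util/cpu_info.h"

inline bool UseBmi2Kernels() {
  // True only on Intel parts that report BMI2, per the vendor check above.
  return ::arrow::internal::CpuInfo::GetInstance()->HasEfficientBmi2();
}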
14 changes: 14 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -188,6 +188,7 @@ set(PARQUET_SRCS
file_writer.cc
internal_file_decryptor.cc
internal_file_encryptor.cc
level_comparison.cc
level_conversion.cc
metadata.cc
murmur3.cc
@@ -202,6 +203,19 @@ set(PARQUET_SRCS
stream_writer.cc
types.cc)

if(CXX_SUPPORTS_AVX2)
# AVX2 is used as a proxy for BMI2.
list(APPEND PARQUET_SRCS level_comparison_avx2.cc level_conversion_bmi2.cc)
set_source_files_properties(level_comparison_avx2.cc
level_conversion_bmi2.cc
PROPERTIES
SKIP_PRECOMPILE_HEADERS
ON
COMPILE_FLAGS
"${ARROW_AVX2_FLAG} -DARROW_HAVE_BMI2")

endif()

if(PARQUET_REQUIRE_ENCRYPTION)
set(PARQUET_SRCS ${PARQUET_SRCS} encryption_internal.cc)
else()
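// Illustration of the assumed dispatch pattern (not a verbatim copy of the
// sources): only level_comparison_avx2.cc and level_conversion_bmi2.cc are
// compiled with ${ARROW_AVX2_FLAG}, so an AVX2-capable compiler is treated as
// a proxy for BMI2 support. The baseline translation units keep a portable
// implementation and decide at runtime, roughly:
//
//   if the CPU reports efficient BMI2:
//     call the function exported by level_conversion_bmi2.cc
//   else:
//     run the portable fallback in level_conversion.cc
//
// Isolating the BMI2/AVX2 code in separate files keeps those instructions out
// of translation units that may execute on CPUs without the extensions.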
1 change: 0 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2994,7 +2994,6 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {
ASSERT_OK(builder.Append(value));
}
ASSERT_OK(builder.Finish(&expected_array));

AssertArraysEqual(*expected_array, *chunk);
}

7 changes: 4 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
@@ -373,13 +373,14 @@ class RowGroupReaderImpl : public RowGroupReader {
class LeafReader : public ColumnReaderImpl {
public:
LeafReader(std::shared_ptr<ReaderContext> ctx, std::shared_ptr<Field> field,
std::unique_ptr<FileColumnIterator> input)
std::unique_ptr<FileColumnIterator> input,
::parquet::internal::LevelInfo leaf_info)
: ctx_(std::move(ctx)),
field_(std::move(field)),
input_(std::move(input)),
descr_(input_->descr()) {
record_reader_ = RecordReader::Make(
descr_, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
NextRowGroup();
}

@@ -694,7 +695,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&
}
std::unique_ptr<FileColumnIterator> input(
ctx->iterator_factory(field.column_index, ctx->reader));
out->reset(new LeafReader(ctx, field.field, std::move(input)));
out->reset(new LeafReader(ctx, field.field, std::move(input), field.level_info));
} else if (type_id == ::arrow::Type::LIST) {
// We can only read lists-of-lists or structs at the moment
auto list_field = field.field;
114 changes: 71 additions & 43 deletions cpp/src/parquet/column_reader.cc
@@ -42,6 +42,7 @@
#include "parquet/encoding.h"
#include "parquet/encryption_internal.h"
#include "parquet/internal_file_decryptor.h"
#include "parquet/level_comparison.h"
#include "parquet/level_conversion.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
@@ -55,6 +56,25 @@ using arrow::internal::checked_cast;
using arrow::internal::MultiplyWithOverflow;

namespace parquet {
namespace {
inline bool HasSpacedValues(const ColumnDescriptor* descr) {
if (descr->max_repetition_level() > 0) {
// repeated+flat case
return !descr->schema_node()->is_required();
} else {
// non-repeated+nested case
// Find if a node forces nulls in the lowest level along the hierarchy
const schema::Node* node = descr->schema_node().get();
while (node) {
if (node->is_optional()) {
return true;
}
node = node->parent();
}
return false;
}
}
} // namespace
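// Worked example (explanatory, not from the patch): for a schema such as
//   required group doc { optional int32 id; }
// the leaf has max_repetition_level == 0 and the leaf node itself is optional,
// so HasSpacedValues() returns true and the reader materializes a validity
// bitmap. For `required group doc { required int32 id; }` no node on the path
// can introduce nulls, the function returns false, and values are read densely.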

LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}

@@ -63,6 +83,7 @@ LevelDecoder::~LevelDecoder() {}
int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
int num_buffered_values, const uint8_t* data,
int32_t data_size) {
max_level_ = max_level;
int32_t num_bytes = 0;
encoding_ = encoding;
num_values_remaining_ = num_buffered_values;
@@ -110,6 +131,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,

void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level,
int num_buffered_values, const uint8_t* data) {
max_level_ = max_level;
// Repetition and definition levels always uses RLE encoding
// in the DataPageV2 format.
if (num_bytes < 0) {
@@ -135,6 +157,15 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) {
} else {
num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
}
if (num_decoded > 0) {
internal::MinMax min_max = internal::FindMinMax(levels, num_decoded);
if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) {
std::stringstream ss;
ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max
<< " out of range. Max Level: " << max_level_;
throw ParquetException(ss.str());
}
}
num_values_remaining_ -= num_decoded;
return num_decoded;
}
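// Hedged sketch of what internal::FindMinMax computes (the real version in
// level_comparison.cc also carries an AVX2 variant chosen at runtime): one
// pass over the decoded levels yielding the smallest and largest value, so the
// range check above can reject malformed level data with a single branch.
// MinMaxSketch stands in for internal::MinMax.
#include <cstdint>
#include <limits>

struct MinMaxSketch {
  int16_t min;
  int16_t max;
};

inline MinMaxSketch FindMinMaxScalar(const int16_t* levels, int64_t n) {
  MinMaxSketch r{std::numeric_limits<int16_t>::max(),
                 std::numeric_limits<int16_t>::min()};
  for (int64_t i = 0; i < n; ++i) {
    if (levels[i] < r.min) r.min = levels[i];
    if (levels[i] > r.max) r.max = levels[i];
  }
  return r;
}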
@@ -880,8 +911,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
}
}

const bool has_spaced_values = internal::HasSpacedValues(this->descr_);

const bool has_spaced_values = HasSpacedValues(this->descr_);
int64_t null_count = 0;
if (!has_spaced_values) {
int values_to_read = 0;
@@ -896,9 +926,12 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
/*bits_are_set=*/true);
*values_read = total_values;
} else {
internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
this->max_rep_level_, values_read, &null_count,
valid_bits, valid_bits_offset);
internal::LevelInfo info;
info.repeated_ancestor_def_level = this->max_def_level_ - 1;
info.def_level = this->max_def_level_;
info.rep_level = this->max_rep_level_;
internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, info, values_read,
&null_count, valid_bits, valid_bits_offset);
total_values =
this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
@@ -1008,8 +1041,10 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
public:
using T = typename DType::c_type;
using BASE = ColumnReaderImplBase<DType>;
TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) {
nullable_values_ = internal::HasSpacedValues(descr);
TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool)
: BASE(descr, pool) {
leaf_info_ = leaf_info;
nullable_values_ = leaf_info.HasNullableValues();
at_record_start_ = true;
records_read_ = 0;
values_written_ = 0;
@@ -1128,7 +1163,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
}

std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
if (nullable_values_) {
if (leaf_info_.HasNullableValues()) {
auto result = valid_bits_;
PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true));
valid_bits_ = AllocateBuffer(this->pool_);
@@ -1170,25 +1205,14 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
break;
}
}
} else if (ARROW_PREDICT_FALSE(rep_level > this->max_rep_level_)) {
std::stringstream ss;
ss << "Malformed repetition levels, " << rep_level << " exceeded maximum "
<< this->max_rep_level_ << " indicated by schema";
throw ParquetException(ss.str());
}

// We have decided to consume the level at this position; therefore we
// must advance until we find another record boundary
at_record_start_ = false;

const int16_t def_level = *def_levels++;
if (def_level == this->max_def_level_) {
++values_to_read;
} else if (ARROW_PREDICT_FALSE(def_level > this->max_def_level_)) {
std::stringstream ss;
ss << "Malformed definition levels, " << def_level << " exceeded maximum "
<< this->max_def_level_ << " indicated by schema";
throw ParquetException(ss.str());
}
++levels_position_;
}
@@ -1249,7 +1273,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
}
values_capacity_ = new_values_capacity;
}
if (nullable_values_) {
if (leaf_info_.HasNullableValues()) {
int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
if (valid_bits_->size() < valid_bytes_new) {
int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
@@ -1344,20 +1368,20 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
}

int64_t null_count = 0;
if (nullable_values_) {
if (leaf_info_.HasNullableValues()) {
int64_t values_with_nulls = 0;
DefinitionLevelsToBitmap(
def_levels() + start_levels_position, levels_position_ - start_levels_position,
this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count,
valid_bits_->mutable_data(), values_written_);
DefinitionLevelsToBitmap(def_levels() + start_levels_position,
levels_position_ - start_levels_position, leaf_info_,
&values_with_nulls, &null_count,
valid_bits_->mutable_data(), values_written_);
values_to_read = values_with_nulls - null_count;
DCHECK_GE(values_to_read, 0);
ReadValuesSpaced(values_with_nulls, null_count);
} else {
DCHECK_GE(values_to_read, 0);
ReadValuesDense(values_to_read);
}
if (this->max_def_level_ > 0) {
if (this->leaf_info_.def_level > 0) {
// Optional, repeated, or some mix thereof
this->ConsumeBufferedValues(levels_position_ - start_levels_position);
} else {
@@ -1415,13 +1439,15 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
T* ValuesHead() {
return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
}
LevelInfo leaf_info_;
};

class FLBARecordReader : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
: TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool)
: TypedRecordReader<FLBAType>(descr, leaf_info, pool), builder_(nullptr) {
DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
int byte_width = descr_->type_length();
std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
@@ -1473,8 +1499,9 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
: TypedRecordReader<ByteArrayType>(descr, pool) {
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool)
: TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
}
@@ -1513,9 +1540,9 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool)
: TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {
: TypedRecordReader<ByteArrayType>(descr, leaf_info, pool), builder_(pool) {
this->read_dictionary_ = true;
}

@@ -1602,35 +1629,36 @@ template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}

std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr,
LevelInfo leaf_info,
::arrow::MemoryPool* pool,
bool read_dictionary) {
if (read_dictionary) {
return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool);
return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool);
} else {
return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool);
return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info, pool);
}
}

std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
MemoryPool* pool,
LevelInfo leaf_info, MemoryPool* pool,
const bool read_dictionary) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool);
return std::make_shared<TypedRecordReader<BooleanType>>(descr, leaf_info, pool);
case Type::INT32:
return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool);
return std::make_shared<TypedRecordReader<Int32Type>>(descr, leaf_info, pool);
case Type::INT64:
return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool);
return std::make_shared<TypedRecordReader<Int64Type>>(descr, leaf_info, pool);
case Type::INT96:
return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool);
return std::make_shared<TypedRecordReader<Int96Type>>(descr, leaf_info, pool);
case Type::FLOAT:
return std::make_shared<TypedRecordReader<FloatType>>(descr, pool);
return std::make_shared<TypedRecordReader<FloatType>>(descr, leaf_info, pool);
case Type::DOUBLE:
return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool);
return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool);
case Type::BYTE_ARRAY:
return MakeByteArrayRecordReader(descr, pool, read_dictionary);
return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FLBARecordReader>(descr, pool);
return std::make_shared<FLBARecordReader>(descr, leaf_info, pool);
default: {
// PARQUET-1481: This can occur if the file is corrupt
std::stringstream ss;