From afb0fee1e13d9b59deece1d5eb5cd497a554f0b0 Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Mon, 12 Jul 2021 17:30:19 +0800 Subject: [PATCH 1/8] PARQUET-492: [C++][Parquet] Basic support for encoding DELTA_LENGTH_BYTE_ARRAY and DELTA_BYTE_ARRAY. TODO: read corrupted files written with bug(PARQUET-246) --- cpp/src/arrow/util/bit_stream_utils.h | 25 +- .../parquet/arrow/arrow_reader_writer_test.cc | 38 +++ cpp/src/parquet/column_reader.cc | 7 +- cpp/src/parquet/encoding.cc | 259 ++++++++++++++---- cpp/submodules/parquet-testing | 2 +- 5 files changed, 275 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 49f602ed842..d30d3d544b6 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -140,7 +140,7 @@ class BitReader { } /// Gets the next value from the buffer. Returns true if 'v' could be read or false if - /// there are not enough bytes left. num_bits must be <= 32. + /// there are not enough bytes left. template bool GetValue(int num_bits, T* v); @@ -157,6 +157,10 @@ class BitReader { template bool GetAligned(int num_bytes, T* v); + /// Advances the stream by a number of bits. Returns true if succeed or false if there + /// are not enough bits left. + bool Advance(int64_t num_bits); + /// Reads a vlq encoded int from the stream. The encoded int must start at /// the beginning of a byte. Return false if there were not enough bytes in /// the buffer. @@ -423,6 +427,25 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { return true; } +inline bool BitReader::Advance(int64_t num_bits) { + int bits_required = bit_offset_ + num_bits; + int bytes_required = static_cast(BitUtil::BytesForBits(bits_required)); + if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_required > max_bytes_)) { + return false; + } + byte_offset_ += bits_required >> 3; + bit_offset_ = bits_required & 7; + // Reset buffered_values_ + int bytes_remaining = max_bytes_ - byte_offset_; + if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); + } else { + memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + } + buffered_values_ = arrow::BitUtil::FromLittleEndian(buffered_values_); + return true; +} + inline bool BitWriter::PutVlqInt(uint32_t v) { bool result = true; while ((v & 0xFFFFFF80UL) != 0UL) { diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 6c1b9c48d0e..2524eb566c4 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4192,10 +4192,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { ::arrow::AssertTablesEqual(*table, *expect_table); } + +TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) { + auto file = test::get_data_file("delta_byte_array.parquet"); + auto expect_file = test::get_data_file("delta_byte_array_expect.csv"); + auto pool = ::arrow::default_memory_pool(); + std::unique_ptr parquet_reader; + std::shared_ptr<::arrow::Table> parquet_table; + ASSERT_OK( + FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader)); + ASSERT_OK(parquet_reader->ReadTable(&parquet_table)); + ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks()); + + ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); + auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); + std::array column_names = { + "c_customer_id", "c_salutation", "c_first_name", + "c_last_name", "c_preferred_cust_flag", "c_birth_country", + "c_login", "c_email_address", "c_last_review_date"}; + for (auto name : column_names) { + convert_options.column_types[name] = ::arrow::utf8(); + } + convert_options.strings_can_be_null = true; + ASSERT_OK_AND_ASSIGN(auto csv_reader, + ::arrow::csv::TableReader::Make( + ::arrow::io::default_io_context(), input_file, + ::arrow::csv::ReadOptions::Defaults(), + ::arrow::csv::ParseOptions::Defaults(), convert_options)); + ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read()); + ASSERT_OK_AND_ASSIGN(auto expect_table, csv_table->CombineChunks()); + + ::arrow::AssertTablesEqual(*actural_table, *expect_table); +} + #else TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { GTEST_SKIP() << "Test needs CSV reader"; } + +TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) { + GTEST_SKIP() << "Test needs CSV reader"; +} + #endif } // namespace arrow diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index c7ad78c10d1..c05f3564236 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -780,8 +780,13 @@ class ColumnReaderImplBase { decoders_[static_cast(encoding)] = std::move(decoder); break; } + case Encoding::DELTA_BYTE_ARRAY: { + auto decoder = MakeTypedDecoder(Encoding::DELTA_BYTE_ARRAY, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: ParquetException::NYI("Unsupported encoding"); default: diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index e3460144fc1..188017c9f67 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -2068,11 +2069,25 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecodernum_values_ = num_values; - decoder_ = ::arrow::BitUtil::BitReader(data, len); + decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len); InitHeader(); } + // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or + // DeltaByteArrayDecoder + void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) { + this->num_values_ = num_values; + decoder_ = decoder; + InitHeader(); + } + + int ValidValuesCount() { + // total_value_count_ in header ignores of null values + return static_cast(total_value_count_); + } + int Decode(T* buffer, int max_values) override { return GetInternal(buffer, max_values); } @@ -2106,12 +2121,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetVlqInt(&values_per_block_)) ParquetException::EofException(); + if (!decoder_->GetVlqInt(&mini_blocks_per_block_)) ParquetException::EofException(); + if (!decoder_->GetVlqInt(&total_value_count_) || (total_value_count_ & (1UL << 31))) { ParquetException::EofException(); } - if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); + if (!decoder_->GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_); values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_; @@ -2126,12 +2141,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderGetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); // read the bitwidth of each miniblock uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { - if (!decoder_.GetAligned(1, bit_width_data + i)) { + if (!decoder_->GetAligned(1, bit_width_data + i)) { ParquetException::EofException(); } } @@ -2149,7 +2164,6 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder(max_values - i)); - if (decoder_.GetBatch(delta_bit_width_, buffer + i, values_decode) != + if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) != values_decode) { ParquetException::EofException(); } @@ -2178,15 +2192,22 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecodernum_values_ -= max_values; + + if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) { + uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_; + // skip the padding bits + decoder_->Advance(padding_bits); + values_current_mini_block_ = 0; + } return max_values; } MemoryPool* pool_; - ::arrow::BitUtil::BitReader decoder_; + std::shared_ptr<::arrow::BitUtil::BitReader> decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; @@ -2194,6 +2215,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; @@ -2212,30 +2234,66 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, MemoryPool* pool = ::arrow::default_memory_pool()) : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr, pool), - pool_(pool) {} + buffered_length_(AllocateBuffer(pool, 0)), + buffered_data_(AllocateBuffer(pool, 0)) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; if (len == 0) return; - int total_lengths_len = ::arrow::util::SafeLoadAs(data); - data += 4; - this->len_decoder_.SetData(num_values, data, total_lengths_len); - data_ = data + total_lengths_len; - this->len_ = len - 4 - total_lengths_len; + + decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len); + len_decoder_.SetDecoder(num_values, decoder_); + + int num_length = len_decoder_.ValidValuesCount(); + PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); + + int ret = len_decoder_.Decode( + reinterpret_cast(buffered_length_->mutable_data()), num_length); + DCHECK_EQ(ret, num_length); + length_idx_ = 0; + num_valid_values_ = num_length; + } + + void SetData(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) { + num_values_ = num_values; + decoder_ = decoder; + + len_decoder_.SetDecoder(num_values, decoder_); + + int num_length = len_decoder_.ValidValuesCount(); + PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); + + int ret = len_decoder_.Decode( + reinterpret_cast(buffered_length_->mutable_data()), num_length); + DCHECK_EQ(ret, num_length); + length_idx_ = 0; + num_valid_values_ = num_length; } int Decode(ByteArray* buffer, int max_values) override { - using VectorT = ArrowPoolVector; - max_values = std::min(max_values, num_values_); - VectorT lengths(max_values, 0, ::arrow::stl::allocator(pool_)); - len_decoder_.Decode(lengths.data(), max_values); + max_values = std::min(max_values, num_valid_values_); + + int64_t data_size = 0; + const int32_t* length_ptr = + reinterpret_cast(buffered_length_->data()) + length_idx_; for (int i = 0; i < max_values; ++i) { - buffer[i].len = lengths[i]; - buffer[i].ptr = data_; - this->data_ += lengths[i]; - this->len_ -= lengths[i]; + int32_t len = length_ptr[i]; + buffer[i].len = len; + data_size += len; + } + length_idx_ += max_values; + + PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); + decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size); + const uint8_t* data_ptr = buffered_data_->data(); + + for (int i = 0; i < max_values; ++i) { + buffer[i].ptr = data_ptr; + std::string str((char*)buffer[i].ptr, buffer[i].len); + data_ptr += buffer[i].len; } this->num_values_ -= max_values; + num_valid_values_ -= max_values; return max_values; } @@ -2252,8 +2310,12 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, } private: + std::shared_ptr<::arrow::BitUtil::BitReader> decoder_; DeltaBitPackDecoder len_decoder_; - ::arrow::MemoryPool* pool_; + int num_valid_values_; + uint32_t length_idx_; + std::shared_ptr buffered_length_; + std::shared_ptr buffered_data_; }; // ---------------------------------------------------------------------- @@ -2267,46 +2329,131 @@ class DeltaByteArrayDecoder : public DecoderImpl, : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr, pool), suffix_decoder_(nullptr, pool), - last_value_(0, nullptr) {} + last_value_in_previous_page_(""), + buffered_prefix_length_(AllocateBuffer(pool, 0)), + buffered_data_(AllocateBuffer(pool, 0)) {} - virtual void SetData(int num_values, const uint8_t* data, int len) { + void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - if (len == 0) return; - int prefix_len_length = ::arrow::util::SafeLoadAs(data); - data += 4; - len -= 4; - prefix_len_decoder_.SetData(num_values, data, prefix_len_length); - data += prefix_len_length; - len -= prefix_len_length; - suffix_decoder_.SetData(num_values, data, len); - } - - // TODO: this doesn't work and requires memory management. We need to allocate - // new strings to store the results. - virtual int Decode(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, this->num_values_); - for (int i = 0; i < max_values; ++i) { - int prefix_len = 0; - prefix_len_decoder_.Decode(&prefix_len, 1); - ByteArray suffix = {0, nullptr}; - suffix_decoder_.Decode(&suffix, 1); - buffer[i].len = prefix_len + suffix.len; + decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len); + prefix_len_decoder_.SetDecoder(num_values, decoder_); + + int num_prefix = prefix_len_decoder_.ValidValuesCount(); + PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t))); + int ret = prefix_len_decoder_.Decode( + reinterpret_cast(buffered_prefix_length_->mutable_data()), num_prefix); + DCHECK_EQ(ret, num_prefix); + prefix_len_offset_ = 0; + num_valid_values_ = num_prefix; + + suffix_decoder_.SetData(num_values, decoder_); + // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set + // to last_value_in_previous_page_ when decoding a new page(except the first page) + last_value_ = ByteArray(0, nullptr); + } + + int Decode(ByteArray* buffer, int max_values) override { + return GetInternal(buffer, max_values); + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + valid_bits_offset, out, &result)); + return result; + } + + int DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* builder) override { + ParquetException::NYI( + "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder"); + } - uint8_t* result = reinterpret_cast(malloc(buffer[i].len)); - memcpy(result, last_value_.ptr, prefix_len); - memcpy(result + prefix_len, suffix.ptr, suffix.len); + private: + int GetInternal(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_valid_values_); + suffix_decoder_.Decode(buffer, max_values); + + int64_t data_size = 0; + const int32_t* prefix_len_ptr = + reinterpret_cast(buffered_prefix_length_->data()) + + prefix_len_offset_; + for (int i = 0; i < max_values; ++i) { + data_size += prefix_len_ptr[i] + buffer[i].len; + } + PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); - buffer[i].ptr = result; + uint8_t* data_ptr = buffered_data_->mutable_data(); + for (int i = 0; i < max_values; ++i) { + if (ARROW_PREDICT_TRUE(last_value_.ptr)) { + memcpy(data_ptr, last_value_.ptr, prefix_len_ptr[i]); + } + memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len); + buffer[i].ptr = data_ptr; + buffer[i].len += prefix_len_ptr[i]; + data_ptr += buffer[i].len; last_value_ = buffer[i]; } + prefix_len_offset_ += max_values; this->num_values_ -= max_values; + num_valid_values_ -= max_values; + + if (num_valid_values_ == 0) { + last_value_in_previous_page_ = + std::string(reinterpret_cast(last_value_.ptr), last_value_.len); + } return max_values; } - private: + Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int* out_num_values) { + ArrowBinaryHelper helper(out); + ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + + std::vector values(num_values); + int num_valid_values = GetInternal(values.data(), num_values); + DCHECK_EQ(num_values - null_count, num_valid_values); + + auto values_ptr = reinterpret_cast(values.data()); + int value_idx = 0; + + for (int i = 0; i < num_values; ++i) { + bool is_valid = bit_reader.IsSet(); + bit_reader.Next(); + + if (is_valid) { + const auto& val = values_ptr[value_idx]; + if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + RETURN_NOT_OK(helper.PushChunk()); + } + RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + ++value_idx; + } else { + RETURN_NOT_OK(helper.AppendNull()); + --null_count; + } + } + DCHECK_EQ(null_count, 0); + *out_num_values = num_valid_values; + return Status::OK(); + } + + std::shared_ptr<::arrow::BitUtil::BitReader> decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; ByteArray last_value_; + // string buffer for last value in previous page + std::string last_value_in_previous_page_; + int num_valid_values_; + uint32_t prefix_len_offset_; + std::shared_ptr buffered_prefix_length_; + std::shared_ptr buffered_data_; }; // ---------------------------------------------------------------------- @@ -2544,6 +2691,12 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64"); break; } + } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { + if (type_num == Type::BYTE_ARRAY || type_num == Type::FIXED_LEN_BYTE_ARRAY) { + return std::unique_ptr(new DeltaByteArrayDecoder(descr)); + } + throw ParquetException( + "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 600d437de0e..8c2cf340390 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 600d437de0e8b0e9927c87e76f844a1b385b02e8 +Subproject commit 8c2cf340390d70510f9ba64e5ad51e303fde13cd From 4f4300e455d27e888bf480854980ec525d562d76 Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Tue, 24 Aug 2021 09:58:06 +0800 Subject: [PATCH 2/8] Fix CI failure --- cpp/src/arrow/util/bit_stream_utils.h | 6 +++--- cpp/src/parquet/encoding.cc | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index d30d3d544b6..b309277ed64 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -428,13 +428,13 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { } inline bool BitReader::Advance(int64_t num_bits) { - int bits_required = bit_offset_ + num_bits; + int64_t bits_required = bit_offset_ + num_bits; int bytes_required = static_cast(BitUtil::BytesForBits(bits_required)); if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_required > max_bytes_)) { return false; } - byte_offset_ += bits_required >> 3; - bit_offset_ = bits_required & 7; + byte_offset_ += static_cast(bits_required >> 3); + bit_offset_ = static_cast(bits_required & 7); // Reset buffered_values_ int bytes_remaining = max_bytes_ - byte_offset_; if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 188017c9f67..efc74979cb8 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2284,7 +2284,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, length_idx_ += max_values; PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); - decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size); + if (decoder_->GetBatch(8, buffered_data_->mutable_data(), + static_cast(data_size)) != static_cast(data_size)) { + ParquetException::EofException(); + } const uint8_t* data_ptr = buffered_data_->data(); for (int i = 0; i < max_values; ++i) { From ba3d33e50bb911fed33c5fb7cd25816a8420dca3 Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Wed, 1 Sep 2021 12:11:17 +0800 Subject: [PATCH 3/8] Modify BitReader::Advance and the name of unit test --- cpp/src/arrow/util/bit_stream_utils.h | 4 ++-- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 4 ++-- cpp/src/parquet/encoding.cc | 7 ++++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index b309277ed64..2017f627c63 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -429,8 +429,8 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { inline bool BitReader::Advance(int64_t num_bits) { int64_t bits_required = bit_offset_ + num_bits; - int bytes_required = static_cast(BitUtil::BytesForBits(bits_required)); - if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_required > max_bytes_)) { + int64_t bytes_required = BitUtil::BytesForBits(bits_required); + if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) { return false; } byte_offset_ += static_cast(bits_required >> 3); diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 2524eb566c4..baa52aeed99 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4193,7 +4193,7 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { ::arrow::AssertTablesEqual(*table, *expect_table); } -TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) { +TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { auto file = test::get_data_file("delta_byte_array.parquet"); auto expect_file = test::get_data_file("delta_byte_array_expect.csv"); auto pool = ::arrow::default_memory_pool(); @@ -4230,7 +4230,7 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { GTEST_SKIP() << "Test needs CSV reader"; } -TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) { +TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { GTEST_SKIP() << "Test needs CSV reader"; } diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index efc74979cb8..b5a25934f48 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2200,7 +2200,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoderAdvance(padding_bits); + if (!decoder_->Advance(padding_bits)) { + ParquetException::EofException(); + } values_current_mini_block_ = 0; } return max_values; @@ -2372,8 +2374,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::DictAccumulator* builder) override { - ParquetException::NYI( - "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder"); + ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder"); } private: From 5c3fb367c78a02a0a4ba6892edf399d630cd682c Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Thu, 2 Sep 2021 14:05:55 +0800 Subject: [PATCH 4/8] Remove the unused code --- cpp/src/parquet/encoding.cc | 2 -- cpp/submodules/parquet-testing | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index b5a25934f48..fb7bb5ecd01 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -2217,7 +2216,6 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder delta_bit_widths_; diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 8c2cf340390..be86178aede 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 8c2cf340390d70510f9ba64e5ad51e303fde13cd +Subproject commit be86178aede115c3cdc2dfd5995839feba8748ac From 5cb1dea850fca01681c8fd1384694bd21933195e Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Wed, 8 Sep 2021 21:08:42 +0800 Subject: [PATCH 5/8] Add function "ResetBufferdValues_" in bit_stream_utils.h and modify the code --- cpp/src/arrow/util/bit_stream_utils.h | 47 +++++++------------ .../parquet/arrow/arrow_reader_writer_test.cc | 7 ++- cpp/src/parquet/encoding.cc | 5 +- cpp/submodules/parquet-testing | 2 +- 4 files changed, 23 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index 2017f627c63..f11ad322809 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -259,6 +259,16 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) { namespace detail { +inline void ResetBufferdValues_(const uint8_t* buffer, const int& byte_offset, + const int& bytes_remaining, uint64_t* buffered_values) { + if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { + memcpy(buffered_values, buffer + byte_offset, 8); + } else { + memcpy(buffered_values, buffer + byte_offset, bytes_remaining); + } + *buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values); +} + template inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer, int* bit_offset, int* byte_offset, uint64_t* buffered_values) { @@ -276,13 +286,7 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer, *byte_offset += 8; *bit_offset -= 64; - int bytes_remaining = max_bytes - *byte_offset; - if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { - memcpy(buffered_values, buffer + *byte_offset, 8); - } else { - memcpy(buffered_values, buffer + *byte_offset, bytes_remaining); - } - *buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values); + ResetBufferdValues_(buffer, *byte_offset, max_bytes - *byte_offset, buffered_values); #ifdef _MSC_VER #pragma warning(push) #pragma warning(disable : 4800 4805) @@ -378,13 +382,8 @@ inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { } } - int bytes_remaining = max_bytes - byte_offset; - if (bytes_remaining >= 8) { - memcpy(&buffered_values, buffer + byte_offset, 8); - } else { - memcpy(&buffered_values, buffer + byte_offset, bytes_remaining); - } - buffered_values = arrow::BitUtil::FromLittleEndian(buffered_values); + detail::ResetBufferdValues_(buffer, byte_offset, max_bytes - byte_offset, + &buffered_values); for (; i < batch_size; ++i) { detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, @@ -415,15 +414,9 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { *v = arrow::BitUtil::FromLittleEndian(*v); byte_offset_ += num_bytes; - // Reset buffered_values_ bit_offset_ = 0; - int bytes_remaining = max_bytes_ - byte_offset_; - if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { - memcpy(&buffered_values_, buffer_ + byte_offset_, 8); - } else { - memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); - } - buffered_values_ = arrow::BitUtil::FromLittleEndian(buffered_values_); + detail::ResetBufferdValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, + &buffered_values_); return true; } @@ -435,14 +428,8 @@ inline bool BitReader::Advance(int64_t num_bits) { } byte_offset_ += static_cast(bits_required >> 3); bit_offset_ = static_cast(bits_required & 7); - // Reset buffered_values_ - int bytes_remaining = max_bytes_ - byte_offset_; - if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { - memcpy(&buffered_values_, buffer_ + byte_offset_, 8); - } else { - memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); - } - buffered_values_ = arrow::BitUtil::FromLittleEndian(buffered_values_); + detail::ResetBufferdValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, + &buffered_values_); return true; } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index c7287072b3a..e9236352c2a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4193,11 +4193,11 @@ TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { ASSERT_OK( FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader)); ASSERT_OK(parquet_reader->ReadTable(&parquet_table)); - ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks()); + ASSERT_OK(parquet_table->ValidateFull()); ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); - std::array column_names = { + std::vector column_names = { "c_customer_id", "c_salutation", "c_first_name", "c_last_name", "c_preferred_cust_flag", "c_birth_country", "c_login", "c_email_address", "c_last_review_date"}; @@ -4211,9 +4211,8 @@ TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { ::arrow::csv::ReadOptions::Defaults(), ::arrow::csv::ParseOptions::Defaults(), convert_options)); ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read()); - ASSERT_OK_AND_ASSIGN(auto expect_table, csv_table->CombineChunks()); - ::arrow::AssertTablesEqual(*actural_table, *expect_table); + ::arrow::AssertTablesEqual(*parquet_table, *csv_table, false); } #else diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 672ba429ace..63bd83551f6 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2268,7 +2268,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, num_valid_values_ = num_length; } - void SetData(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) { + void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) { num_values_ = num_values; decoder_ = decoder; @@ -2306,7 +2306,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, for (int i = 0; i < max_values; ++i) { buffer[i].ptr = data_ptr; - std::string str((char*)buffer[i].ptr, buffer[i].len); data_ptr += buffer[i].len; } this->num_values_ -= max_values; @@ -2363,7 +2362,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, prefix_len_offset_ = 0; num_valid_values_ = num_prefix; - suffix_decoder_.SetData(num_values, decoder_); + suffix_decoder_.SetDecoder(num_values, decoder_); // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set // to last_value_in_previous_page_ when decoding a new page(except the first page) last_value_ = ByteArray(0, nullptr); diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index be86178aede..8f2a069ed2c 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit be86178aede115c3cdc2dfd5995839feba8748ac +Subproject commit 8f2a069ed2c58787e5be2a3ca8c68bc801b8eafa From d89f4726f332014c53699ae41aa866e356708dbb Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Thu, 9 Sep 2021 20:13:48 +0800 Subject: [PATCH 6/8] Add test case "IncrementalDecodeDeltaByteArray" and some notes. --- .../parquet/arrow/arrow_reader_writer_test.cc | 48 +++++++++++++++++++ cpp/src/parquet/encoding.cc | 22 +++++---- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index e9236352c2a..94102595e8a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4215,6 +4215,50 @@ TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { ::arrow::AssertTablesEqual(*parquet_table, *csv_table, false); } +TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { + auto file = test::get_data_file("delta_byte_array.parquet"); + auto expect_file = test::get_data_file("delta_byte_array_expect.csv"); + auto pool = ::arrow::default_memory_pool(); + const int64_t batch_size = 100; + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_batch_size(batch_size); + + std::unique_ptr parquet_reader; + std::shared_ptr<::arrow::RecordBatchReader> rb_reader; + ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties, + &parquet_reader)); + ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()), + &rb_reader)); + + ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); + auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); + std::vector column_names = { + "c_customer_id", "c_salutation", "c_first_name", + "c_last_name", "c_preferred_cust_flag", "c_birth_country", + "c_login", "c_email_address", "c_last_review_date"}; + for (auto name : column_names) { + convert_options.column_types[name] = ::arrow::utf8(); + } + convert_options.strings_can_be_null = true; + ASSERT_OK_AND_ASSIGN(auto csv_reader, + ::arrow::csv::TableReader::Make( + ::arrow::io::default_io_context(), input_file, + ::arrow::csv::ReadOptions::Defaults(), + ::arrow::csv::ParseOptions::Defaults(), convert_options)); + ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read()); + ::arrow::TableBatchReader table_reader(*csv_table); + table_reader.set_chunksize(batch_size); + + std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch; + for (int i = 0; i < csv_table->num_rows() / batch_size; ++i) { + ASSERT_OK(rb_reader->ReadNext(&actual_batch)); + ASSERT_OK(table_reader.ReadNext(&expected_batch)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertBatchesEqual(*expected_batch, *actual_batch)); + } + ASSERT_OK(rb_reader->ReadNext(&actual_batch)); + ASSERT_EQ(nullptr, actual_batch); +} + #else TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { GTEST_SKIP() << "Test needs CSV reader"; @@ -4224,6 +4268,10 @@ TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { GTEST_SKIP() << "Test needs CSV reader"; } +TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { + GTEST_SKIP() << "Test needs CSV reader"; +} + #endif } // namespace arrow diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 63bd83551f6..a02e92b014b 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2354,7 +2354,10 @@ class DeltaByteArrayDecoder : public DecoderImpl, decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); + // get the number of encoded prefix lengths int num_prefix = prefix_len_decoder_.ValidValuesCount(); + // call prefix_len_decoder_.Decode to decode all the prefix lengths. + // all the prefix lengths are buffered in buffered_prefix_length_. PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t))); int ret = prefix_len_decoder_.Decode( reinterpret_cast(buffered_prefix_length_->mutable_data()), num_prefix); @@ -2362,10 +2365,12 @@ class DeltaByteArrayDecoder : public DecoderImpl, prefix_len_offset_ = 0; num_valid_values_ = num_prefix; + // at this time, the decoder_ will be at the start of the encoded suffix data. suffix_decoder_.SetDecoder(num_values, decoder_); + // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set // to last_value_in_previous_page_ when decoding a new page(except the first page) - last_value_ = ByteArray(0, nullptr); + last_value_ = ""; } int Decode(ByteArray* buffer, int max_values) override { @@ -2404,22 +2409,21 @@ class DeltaByteArrayDecoder : public DecoderImpl, uint8_t* data_ptr = buffered_data_->mutable_data(); for (int i = 0; i < max_values; ++i) { - if (ARROW_PREDICT_TRUE(last_value_.ptr)) { - memcpy(data_ptr, last_value_.ptr, prefix_len_ptr[i]); - } + DCHECK_LE(static_cast(prefix_len_ptr[i]), last_value_.length()); + memcpy(data_ptr, last_value_.data(), prefix_len_ptr[i]); memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len); buffer[i].ptr = data_ptr; buffer[i].len += prefix_len_ptr[i]; data_ptr += buffer[i].len; - last_value_ = buffer[i]; + last_value_ = + std::string(reinterpret_cast(buffer[i].ptr), buffer[i].len); } prefix_len_offset_ += max_values; this->num_values_ -= max_values; num_valid_values_ -= max_values; if (num_valid_values_ == 0) { - last_value_in_previous_page_ = - std::string(reinterpret_cast(last_value_.ptr), last_value_.len); + last_value_in_previous_page_ = last_value_; } return max_values; } @@ -2432,7 +2436,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); std::vector values(num_values); - int num_valid_values = GetInternal(values.data(), num_values); + int num_valid_values = GetInternal(values.data(), num_values - null_count); DCHECK_EQ(num_values - null_count, num_valid_values); auto values_ptr = reinterpret_cast(values.data()); @@ -2462,7 +2466,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, std::shared_ptr<::arrow::BitUtil::BitReader> decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; - ByteArray last_value_; + std::string last_value_; // string buffer for last value in previous page std::string last_value_in_previous_page_; int num_valid_values_; From 7469605d26f5578becd0eaf6c2c5eeec2c1ea0dc Mon Sep 17 00:00:00 2001 From: shanhuuang Date: Thu, 23 Sep 2021 15:54:50 +0800 Subject: [PATCH 7/8] Factor out repeated code --- cpp/src/arrow/util/bit_stream_utils.h | 18 ++-- .../parquet/arrow/arrow_reader_writer_test.cc | 98 +++++++++---------- cpp/src/parquet/encoding.cc | 42 ++++---- 3 files changed, 77 insertions(+), 81 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index f11ad322809..a20c4ddbbea 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -259,8 +259,8 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) { namespace detail { -inline void ResetBufferdValues_(const uint8_t* buffer, const int& byte_offset, - const int& bytes_remaining, uint64_t* buffered_values) { +inline void ResetBufferedValues_(const uint8_t* buffer, const int& byte_offset, + const int& bytes_remaining, uint64_t* buffered_values) { if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { memcpy(buffered_values, buffer + byte_offset, 8); } else { @@ -286,7 +286,7 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer, *byte_offset += 8; *bit_offset -= 64; - ResetBufferdValues_(buffer, *byte_offset, max_bytes - *byte_offset, buffered_values); + ResetBufferedValues_(buffer, *byte_offset, max_bytes - *byte_offset, buffered_values); #ifdef _MSC_VER #pragma warning(push) #pragma warning(disable : 4800 4805) @@ -382,8 +382,8 @@ inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { } } - detail::ResetBufferdValues_(buffer, byte_offset, max_bytes - byte_offset, - &buffered_values); + detail::ResetBufferedValues_(buffer, byte_offset, max_bytes - byte_offset, + &buffered_values); for (; i < batch_size; ++i) { detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, @@ -415,8 +415,8 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { byte_offset_ += num_bytes; bit_offset_ = 0; - detail::ResetBufferdValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, - &buffered_values_); + detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, + &buffered_values_); return true; } @@ -428,8 +428,8 @@ inline bool BitReader::Advance(int64_t num_bits) { } byte_offset_ += static_cast(bits_required >> 3); bit_offset_ = static_cast(bits_required & 7); - detail::ResetBufferdValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, - &buffered_values_); + detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_, + &buffered_values_); return true; } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 4768be88e98..cec11c3cf8a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4159,45 +4159,53 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) { } #ifdef ARROW_CSV -TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { - auto file = test::get_data_file("delta_binary_packed.parquet"); - auto expect_file = test::get_data_file("delta_binary_packed_expect.csv"); - auto pool = ::arrow::default_memory_pool(); - std::unique_ptr parquet_reader; - std::shared_ptr<::arrow::Table> table; - ASSERT_OK( - FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader)); - ASSERT_OK(parquet_reader->ReadTable(&table)); - ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); +class TestArrowReadDeltaEncoding : public ::testing::Test { + public: + void ReadTableFromParquetFile(const std::string& file_name, + std::shared_ptr* out) { + auto file = test::get_data_file(file_name); + auto pool = ::arrow::default_memory_pool(); + std::unique_ptr parquet_reader; + ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), + &parquet_reader)); + ASSERT_OK(parquet_reader->ReadTable(out)); + ASSERT_OK((*out)->ValidateFull()); + } + + void ReadTableFromCSVFile(const std::string& file_name, + const ::arrow::csv::ConvertOptions& convert_options, + std::shared_ptr
* out) { + auto file = test::get_data_file(file_name); + ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(file)); + ASSERT_OK_AND_ASSIGN(auto csv_reader, + ::arrow::csv::TableReader::Make( + ::arrow::io::default_io_context(), input_file, + ::arrow::csv::ReadOptions::Defaults(), + ::arrow::csv::ParseOptions::Defaults(), convert_options)); + ASSERT_OK_AND_ASSIGN(*out, csv_reader->Read()); + } +}; + +TEST_F(TestArrowReadDeltaEncoding, DeltaBinaryPacked) { + std::shared_ptr<::arrow::Table> actual_table, expect_table; + ReadTableFromParquetFile("delta_binary_packed.parquet", &actual_table); + auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); for (int i = 0; i <= 64; ++i) { std::string column_name = "bitwidth" + std::to_string(i); convert_options.column_types[column_name] = ::arrow::int64(); } convert_options.column_types["int_value"] = ::arrow::int32(); - ASSERT_OK_AND_ASSIGN(auto csv_reader, - ::arrow::csv::TableReader::Make( - ::arrow::io::default_io_context(), input_file, - ::arrow::csv::ReadOptions::Defaults(), - ::arrow::csv::ParseOptions::Defaults(), convert_options)); - ASSERT_OK_AND_ASSIGN(auto expect_table, csv_reader->Read()); - - ::arrow::AssertTablesEqual(*table, *expect_table); + ReadTableFromCSVFile("delta_binary_packed_expect.csv", convert_options, &expect_table); + + ::arrow::AssertTablesEqual(*actual_table, *expect_table); } -TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { - auto file = test::get_data_file("delta_byte_array.parquet"); - auto expect_file = test::get_data_file("delta_byte_array_expect.csv"); - auto pool = ::arrow::default_memory_pool(); - std::unique_ptr parquet_reader; - std::shared_ptr<::arrow::Table> parquet_table; - ASSERT_OK( - FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader)); - ASSERT_OK(parquet_reader->ReadTable(&parquet_table)); - ASSERT_OK(parquet_table->ValidateFull()); +TEST_F(TestArrowReadDeltaEncoding, DeltaByteArray) { + std::shared_ptr<::arrow::Table> actual_table, expect_table; + ReadTableFromParquetFile("delta_byte_array.parquet", &actual_table); - ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); std::vector column_names = { "c_customer_id", "c_salutation", "c_first_name", @@ -4207,24 +4215,17 @@ TEST(TestArrowReadDeltaEncoding, DeltaByteArray) { convert_options.column_types[name] = ::arrow::utf8(); } convert_options.strings_can_be_null = true; - ASSERT_OK_AND_ASSIGN(auto csv_reader, - ::arrow::csv::TableReader::Make( - ::arrow::io::default_io_context(), input_file, - ::arrow::csv::ReadOptions::Defaults(), - ::arrow::csv::ParseOptions::Defaults(), convert_options)); - ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read()); - - ::arrow::AssertTablesEqual(*parquet_table, *csv_table, false); + ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &expect_table); + + ::arrow::AssertTablesEqual(*actual_table, *expect_table, false); } -TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { +TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { auto file = test::get_data_file("delta_byte_array.parquet"); - auto expect_file = test::get_data_file("delta_byte_array_expect.csv"); auto pool = ::arrow::default_memory_pool(); const int64_t batch_size = 100; ArrowReaderProperties properties = default_arrow_reader_properties(); properties.set_batch_size(batch_size); - std::unique_ptr parquet_reader; std::shared_ptr<::arrow::RecordBatchReader> rb_reader; ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties, @@ -4232,7 +4233,6 @@ TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()), &rb_reader)); - ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file)); auto convert_options = ::arrow::csv::ConvertOptions::Defaults(); std::vector column_names = { "c_customer_id", "c_salutation", "c_first_name", @@ -4242,19 +4242,17 @@ TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) { convert_options.column_types[name] = ::arrow::utf8(); } convert_options.strings_can_be_null = true; - ASSERT_OK_AND_ASSIGN(auto csv_reader, - ::arrow::csv::TableReader::Make( - ::arrow::io::default_io_context(), input_file, - ::arrow::csv::ReadOptions::Defaults(), - ::arrow::csv::ParseOptions::Defaults(), convert_options)); - ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read()); - ::arrow::TableBatchReader table_reader(*csv_table); - table_reader.set_chunksize(batch_size); + std::shared_ptr<::arrow::Table> csv_table; + ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &csv_table); + + ::arrow::TableBatchReader csv_table_reader(*csv_table); + csv_table_reader.set_chunksize(batch_size); std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch; for (int i = 0; i < csv_table->num_rows() / batch_size; ++i) { ASSERT_OK(rb_reader->ReadNext(&actual_batch)); - ASSERT_OK(table_reader.ReadNext(&expected_batch)); + ASSERT_OK(actual_batch->ValidateFull()); + ASSERT_OK(csv_table_reader.ReadNext(&expected_batch)); ASSERT_NO_FATAL_FAILURE(::arrow::AssertBatchesEqual(*expected_batch, *actual_batch)); } ASSERT_OK(rb_reader->ReadNext(&actual_batch)); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a02e92b014b..3ff59423411 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2254,34 +2254,14 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; if (len == 0) return; - decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len); - len_decoder_.SetDecoder(num_values, decoder_); - - int num_length = len_decoder_.ValidValuesCount(); - PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); - - int ret = len_decoder_.Decode( - reinterpret_cast(buffered_length_->mutable_data()), num_length); - DCHECK_EQ(ret, num_length); - length_idx_ = 0; - num_valid_values_ = num_length; + DecodeLengths(); } void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) { num_values_ = num_values; decoder_ = decoder; - - len_decoder_.SetDecoder(num_values, decoder_); - - int num_length = len_decoder_.ValidValuesCount(); - PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); - - int ret = len_decoder_.Decode( - reinterpret_cast(buffered_length_->mutable_data()), num_length); - DCHECK_EQ(ret, num_length); - length_idx_ = 0; - num_valid_values_ = num_length; + DecodeLengths(); } int Decode(ByteArray* buffer, int max_values) override { @@ -2326,6 +2306,24 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, } private: + // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data + // after that. + void DecodeLengths() { + len_decoder_.SetDecoder(num_values_, decoder_); + + // get the number of encoded lengths + int num_length = len_decoder_.ValidValuesCount(); + PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t))); + + // call len_decoder_.Decode to decode all the lengths. + // all the lengths are buffered in buffered_length_. + int ret = len_decoder_.Decode( + reinterpret_cast(buffered_length_->mutable_data()), num_length); + DCHECK_EQ(ret, num_length); + length_idx_ = 0; + num_valid_values_ = num_length; + } + std::shared_ptr<::arrow::BitUtil::BitReader> decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_; From e4f265f1ee1df916066f093b3565e5b9da88d02a Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 8 Nov 2021 18:35:40 +0100 Subject: [PATCH 8/8] Update doc for supported Parquet encodings --- cpp/src/arrow/util/bit_stream_utils.h | 4 +-- docs/source/cpp/parquet.rst | 37 +++++++++++++++------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h index a20c4ddbbea..73b063925a1 100644 --- a/cpp/src/arrow/util/bit_stream_utils.h +++ b/cpp/src/arrow/util/bit_stream_utils.h @@ -259,8 +259,8 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) { namespace detail { -inline void ResetBufferedValues_(const uint8_t* buffer, const int& byte_offset, - const int& bytes_remaining, uint64_t* buffered_values) { +inline void ResetBufferedValues_(const uint8_t* buffer, int byte_offset, + int bytes_remaining, uint64_t* buffered_values) { if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { memcpy(buffered_values, buffer + byte_offset, 8); } else { diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 88ea4e5b6c8..bacc45baa68 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -80,29 +80,32 @@ Compression Encodings --------- -+--------------------------+---------+ -| Encoding | Notes | -+==========================+=========+ -| PLAIN | | -+--------------------------+---------+ -| PLAIN_DICTIONARY | | -+--------------------------+---------+ -| BIT_PACKED | | -+--------------------------+---------+ -| RLE | \(1) | -+--------------------------+---------+ -| RLE_DICTIONARY | \(2) | -+--------------------------+---------+ -| BYTE_STREAM_SPLIT | | -+--------------------------+---------+ ++--------------------------+----------+----------+---------+ +| Encoding | Reading | Writing | Notes | ++==========================+==========+==========+=========+ +| PLAIN | ✓ | ✓ | | ++--------------------------+----------+----------+---------+ +| PLAIN_DICTIONARY | ✓ | ✓ | | ++--------------------------+----------+----------+---------+ +| BIT_PACKED | ✓ | ✓ | | ++--------------------------+----------+----------+---------+ +| RLE | ✓ | ✓ | \(1) | ++--------------------------+----------+----------+---------+ +| RLE_DICTIONARY | ✓ | ✓ | \(2) | ++--------------------------+----------+----------+---------+ +| BYTE_STREAM_SPLIT | ✓ | ✓ | | ++--------------------------+----------+----------+---------+ +| DELTA_BINARY_PACKED | ✓ | | | ++--------------------------+----------+----------+---------+ +| DELTA_BYTE_ARRAY | ✓ | | | ++--------------------------+----------+----------+---------+ * \(1) Only supported for encoding definition and repetition levels, not values. * \(2) On the write path, RLE_DICTIONARY is only enabled if Parquet format version 2.4 or greater is selected in :func:`WriterProperties::version`. -*Unsupported encodings:* DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, -DELTA_BYTE_ARRAY. +*Unsupported encoding:* DELTA_LENGTH_BYTE_ARRAY. Types -----