diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 49f602ed842..73b063925a1 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -140,7 +140,7 @@ class BitReader {
   }
 
   /// Gets the next value from the buffer. Returns true if 'v' could be read or false if
-  /// there are not enough bytes left. num_bits must be <= 32.
+  /// there are not enough bytes left.
   template <typename T>
   bool GetValue(int num_bits, T* v);
 
@@ -157,6 +157,10 @@ class BitReader {
   template <typename T>
   bool GetAligned(int num_bytes, T* v);
 
+  /// Advances the stream by a number of bits. Returns true if it succeeded or false if
+  /// there are not enough bits left.
+  bool Advance(int64_t num_bits);
+
   /// Reads a vlq encoded int from the stream. The encoded int must start at
   /// the beginning of a byte. Return false if there were not enough bytes in
   /// the buffer.
@@ -255,6 +259,16 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) {
 
 namespace detail {
 
+inline void ResetBufferedValues_(const uint8_t* buffer, int byte_offset,
+                                 int bytes_remaining, uint64_t* buffered_values) {
+  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
+    memcpy(buffered_values, buffer + byte_offset, 8);
+  } else {
+    memcpy(buffered_values, buffer + byte_offset, bytes_remaining);
+  }
+  *buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values);
+}
+
 template <typename T>
 inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
                       int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
@@ -272,13 +286,7 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
     *byte_offset += 8;
     *bit_offset -= 64;
 
-    int bytes_remaining = max_bytes - *byte_offset;
-    if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
-      memcpy(buffered_values, buffer + *byte_offset, 8);
-    } else {
-      memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
-    }
-    *buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values);
+    ResetBufferedValues_(buffer, *byte_offset, max_bytes - *byte_offset, buffered_values);
 #ifdef _MSC_VER
 #pragma warning(push)
 #pragma warning(disable : 4800 4805)
@@ -374,13 +382,8 @@ inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
       }
     }
 
-    int bytes_remaining = max_bytes - byte_offset;
-    if (bytes_remaining >= 8) {
-      memcpy(&buffered_values, buffer + byte_offset, 8);
-    } else {
-      memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
-    }
-    buffered_values = arrow::BitUtil::FromLittleEndian(buffered_values);
+    detail::ResetBufferedValues_(buffer, byte_offset, max_bytes - byte_offset,
+                                 &buffered_values);
 
     for (; i < batch_size; ++i) {
       detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
@@ -411,15 +414,22 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
   *v = arrow::BitUtil::FromLittleEndian(*v);
   byte_offset_ += num_bytes;
 
-  // Reset buffered_values_
   bit_offset_ = 0;
-  int bytes_remaining = max_bytes_ - byte_offset_;
-  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-  } else {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+  detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_,
+                               &buffered_values_);
+  return true;
+}
+
+inline bool BitReader::Advance(int64_t num_bits) {
+  int64_t bits_required = bit_offset_ + num_bits;
+  int64_t bytes_required = BitUtil::BytesForBits(bits_required);
+  if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
+    return false;
   }
-  buffered_values_ = arrow::BitUtil::FromLittleEndian(buffered_values_);
+  byte_offset_ += static_cast<int>(bits_required >> 3);
+  bit_offset_ = static_cast<int>(bits_required & 7);
+  detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_,
+                               &buffered_values_);
   return true;
 }
 
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index fa8b15d2ba7..198c3a8817d 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4158,36 +4158,119 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) {
 }
 
 #ifdef ARROW_CSV
-TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
-  auto file = test::get_data_file("delta_binary_packed.parquet");
-  auto expect_file = test::get_data_file("delta_binary_packed_expect.csv");
-  auto pool = ::arrow::default_memory_pool();
-  std::unique_ptr<FileReader> parquet_reader;
-  std::shared_ptr<::arrow::Table> table;
-  ASSERT_OK(
-      FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader));
-  ASSERT_OK(parquet_reader->ReadTable(&table));
-  ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file));
+class TestArrowReadDeltaEncoding : public ::testing::Test {
+ public:
+  void ReadTableFromParquetFile(const std::string& file_name,
+                                std::shared_ptr<Table>* out) {
+    auto file = test::get_data_file(file_name);
+    auto pool = ::arrow::default_memory_pool();
+    std::unique_ptr<FileReader> parquet_reader;
+    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+                               &parquet_reader));
+    ASSERT_OK(parquet_reader->ReadTable(out));
+    ASSERT_OK((*out)->ValidateFull());
+  }
+
+  void ReadTableFromCSVFile(const std::string& file_name,
+                            const ::arrow::csv::ConvertOptions& convert_options,
+                            std::shared_ptr<Table>* out) {
+    auto file = test::get_data_file(file_name);
+    ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(file));
+    ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                         ::arrow::csv::TableReader::Make(
+                             ::arrow::io::default_io_context(), input_file,
+                             ::arrow::csv::ReadOptions::Defaults(),
+                             ::arrow::csv::ParseOptions::Defaults(), convert_options));
+    ASSERT_OK_AND_ASSIGN(*out, csv_reader->Read());
+  }
+};
+
+TEST_F(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
+  std::shared_ptr<::arrow::Table> actual_table, expect_table;
+  ReadTableFromParquetFile("delta_binary_packed.parquet", &actual_table);
+
   auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
   for (int i = 0; i <= 64; ++i) {
     std::string column_name = "bitwidth" + std::to_string(i);
     convert_options.column_types[column_name] = ::arrow::int64();
   }
   convert_options.column_types["int_value"] = ::arrow::int32();
-  ASSERT_OK_AND_ASSIGN(auto csv_reader,
-                       ::arrow::csv::TableReader::Make(
-                           ::arrow::io::default_io_context(), input_file,
-                           ::arrow::csv::ReadOptions::Defaults(),
-                           ::arrow::csv::ParseOptions::Defaults(), convert_options));
-  ASSERT_OK_AND_ASSIGN(auto expect_table, csv_reader->Read());
+  ReadTableFromCSVFile("delta_binary_packed_expect.csv", convert_options, &expect_table);
+
+  ::arrow::AssertTablesEqual(*actual_table, *expect_table);
+}
+
+TEST_F(TestArrowReadDeltaEncoding, DeltaByteArray) {
+  std::shared_ptr<::arrow::Table> actual_table, expect_table;
+  ReadTableFromParquetFile("delta_byte_array.parquet", &actual_table);
+
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation", "c_first_name",
+      "c_last_name", "c_preferred_cust_flag", "c_birth_country",
+      "c_login", "c_email_address", "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &expect_table);
+
+  ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
+}
+
+TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto pool = ::arrow::default_memory_pool();
+  const int64_t batch_size = 100;
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  properties.set_batch_size(batch_size);
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+  ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties,
+                             &parquet_reader));
+  ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()),
+                                                 &rb_reader));
 
-  ::arrow::AssertTablesEqual(*table, *expect_table);
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation", "c_first_name",
+      "c_last_name", "c_preferred_cust_flag", "c_birth_country",
+      "c_login", "c_email_address", "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  std::shared_ptr<::arrow::Table> csv_table;
+  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &csv_table);
+
+  ::arrow::TableBatchReader csv_table_reader(*csv_table);
+  csv_table_reader.set_chunksize(batch_size);
+
+  std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
+  for (int i = 0; i < csv_table->num_rows() / batch_size; ++i) {
+    ASSERT_OK(rb_reader->ReadNext(&actual_batch));
+    ASSERT_OK(actual_batch->ValidateFull());
+    ASSERT_OK(csv_table_reader.ReadNext(&expected_batch));
+    ASSERT_NO_FATAL_FAILURE(::arrow::AssertBatchesEqual(*expected_batch, *actual_batch));
+  }
+  ASSERT_OK(rb_reader->ReadNext(&actual_batch));
+  ASSERT_EQ(nullptr, actual_batch);
 }
+
 #else
 TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
   GTEST_SKIP() << "Test needs CSV reader";
 }
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+  GTEST_SKIP() << "Test needs CSV reader";
+}
+
+TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
+  GTEST_SKIP() << "Test needs CSV reader";
+}
+
 #endif
 
 struct NestedFilterTestCase {
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index c7ad78c10d1..c05f3564236 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -780,8 +780,13 @@ class ColumnReaderImplBase {
         decoders_[static_cast<int>(encoding)] = std::move(decoder);
         break;
       }
+      case Encoding::DELTA_BYTE_ARRAY: {
+        auto decoder = MakeTypedDecoder<DType>(Encoding::DELTA_BYTE_ARRAY, descr_);
+        current_decoder_ = decoder.get();
+        decoders_[static_cast<int>(encoding)] = std::move(decoder);
+        break;
+      }
       case Encoding::DELTA_LENGTH_BYTE_ARRAY:
-      case Encoding::DELTA_BYTE_ARRAY:
         ParquetException::NYI("Unsupported encoding");
 
       default:
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 2639c3dd4aa..3ff59423411 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2068,11 +2068,25 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 
   void SetData(int num_values, const uint8_t* data, int len) override {
     this->num_values_ = num_values;
-    decoder_ = ::arrow::BitUtil::BitReader(data, len);
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
     InitHeader();
   }
 
+  // Set a BitReader that has already been initialized by DeltaLengthByteArrayDecoder or
+  // DeltaByteArrayDecoder
+  void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) {
+    this->num_values_ = num_values;
+    decoder_ = decoder;
+    InitHeader();
+  }
+
+  int ValidValuesCount() {
+    // total_value_count_ in the header does not include null values
+    return static_cast<int>(total_value_count_);
+  }
+
   int Decode(T* buffer, int max_values) override {
     return GetInternal(buffer, max_values);
   }
@@ -2108,10 +2122,10 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
   static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8);
 
   void InitHeader() {
-    if (!decoder_.GetVlqInt(&values_per_block_) ||
-        !decoder_.GetVlqInt(&mini_blocks_per_block_) ||
-        !decoder_.GetVlqInt(&total_value_count_) ||
-        !decoder_.GetZigZagVlqInt(&last_value_)) {
+    if (!decoder_->GetVlqInt(&values_per_block_) ||
+        !decoder_->GetVlqInt(&mini_blocks_per_block_) ||
+        !decoder_->GetVlqInt(&total_value_count_) ||
+        !decoder_->GetZigZagVlqInt(&last_value_)) {
       ParquetException::EofException();
     }
 
@@ -2137,12 +2151,12 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 
   void InitBlock() {
-    if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
+    if (!decoder_->GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
 
     // read the bitwidth of each miniblock
     uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
     for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
-      if (!decoder_.GetAligned(1, bit_width_data + i)) {
+      if (!decoder_->GetAligned(1, bit_width_data + i)) {
         ParquetException::EofException();
       }
       if (bit_width_data[i] > kMaxDeltaBitWidth) {
@@ -2163,7 +2177,6 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
       int values_decode = std::min(values_current_mini_block_,
                                    static_cast<uint32_t>(max_values - i));
-      if (decoder_.GetBatch(delta_bit_width_, buffer + i, values_decode) !=
+      if (decoder_->GetBatch(delta_bit_width_, buffer + i, values_decode) !=
           values_decode) {
         ParquetException::EofException();
       }
@@ -2192,15 +2205,24 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
     this->num_values_ -= max_values;
+
+    if (ARROW_PREDICT_FALSE(total_value_count_ == 0)) {
+      uint32_t padding_bits = values_current_mini_block_ * delta_bit_width_;
+      // skip the padding bits
+      if (!decoder_->Advance(padding_bits)) {
+        ParquetException::EofException();
+      }
+      values_current_mini_block_ = 0;
+    }
     return max_values;
   }
 
   MemoryPool* pool_;
-  ::arrow::BitUtil::BitReader decoder_;
+  std::shared_ptr<::arrow::BitUtil::BitReader> decoder_;
   uint32_t values_per_block_;
   uint32_t mini_blocks_per_block_;
   uint32_t values_per_mini_block_;
@@ -2226,30 +2248,48 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                       MemoryPool* pool = ::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
         len_decoder_(nullptr, pool),
-        pool_(pool) {}
+        buffered_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}
 
   void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
     if (len == 0) return;
-    int total_lengths_len = ::arrow::util::SafeLoadAs<int32_t>(data);
-    data += 4;
-    this->len_decoder_.SetData(num_values, data, total_lengths_len);
-    data_ = data + total_lengths_len;
-    this->len_ = len - 4 - total_lengths_len;
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+    DecodeLengths();
+  }
+
+  void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) {
+    num_values_ = num_values;
+    decoder_ = decoder;
+    DecodeLengths();
   }
 
   int Decode(ByteArray* buffer, int max_values) override {
-    using VectorT = ArrowPoolVector<int32_t>;
-    max_values = std::min(max_values, num_values_);
-    VectorT lengths(max_values, 0, ::arrow::stl::allocator<int32_t>(pool_));
-    len_decoder_.Decode(lengths.data(), max_values);
+    max_values = std::min(max_values, num_valid_values_);
+
+    int64_t data_size = 0;
+    const int32_t* length_ptr =
+        reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
+    for (int i = 0; i < max_values; ++i) {
+      int32_t len = length_ptr[i];
+      buffer[i].len = len;
+      data_size += len;
+    }
+    length_idx_ += max_values;
+
+    PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
+    if (decoder_->GetBatch(8, buffered_data_->mutable_data(),
+                           static_cast<int>(data_size)) != static_cast<int>(data_size)) {
+      ParquetException::EofException();
+    }
+    const uint8_t* data_ptr = buffered_data_->data();
+
     for (int i = 0; i < max_values; ++i) {
-      buffer[i].len = lengths[i];
-      buffer[i].ptr = data_;
-      this->data_ += lengths[i];
-      this->len_ -= lengths[i];
+      buffer[i].ptr = data_ptr;
+      data_ptr += buffer[i].len;
     }
     this->num_values_ -= max_values;
+    num_valid_values_ -= max_values;
    return max_values;
   }
 
@@ -2266,8 +2306,30 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   }
 
  private:
+  // Decode all the encoded lengths. decoder_ will be positioned at the start of the
+  // encoded data afterwards.
+  void DecodeLengths() {
+    len_decoder_.SetDecoder(num_values_, decoder_);
+
+    // get the number of encoded lengths
+    int num_length = len_decoder_.ValidValuesCount();
+    PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t)));
+
+    // call len_decoder_.Decode to decode all the lengths.
+    // all the lengths are buffered in buffered_length_.
+    int ret = len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
+    DCHECK_EQ(ret, num_length);
+    length_idx_ = 0;
+    num_valid_values_ = num_length;
+  }
+
+  std::shared_ptr<::arrow::BitUtil::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> len_decoder_;
-  ::arrow::MemoryPool* pool_;
+  int num_valid_values_;
+  uint32_t length_idx_;
+  std::shared_ptr<ResizableBuffer> buffered_length_;
+  std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
 // ----------------------------------------------------------------------
@@ -2281,46 +2343,134 @@ class DeltaByteArrayDecoder : public DecoderImpl,
       : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
         suffix_decoder_(nullptr, pool),
-        last_value_(0, nullptr) {}
+        last_value_in_previous_page_(""),
+        buffered_prefix_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}
 
-  virtual void SetData(int num_values, const uint8_t* data, int len) {
+  void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
-    if (len == 0) return;
-    int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
-    data += 4;
-    len -= 4;
-    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
-    data += prefix_len_length;
-    len -= prefix_len_length;
-    suffix_decoder_.SetData(num_values, data, len);
-  }
-
-  // TODO: this doesn't work and requires memory management. We need to allocate
-  // new strings to store the results.
-  virtual int Decode(ByteArray* buffer, int max_values) {
-    max_values = std::min(max_values, this->num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      int prefix_len = 0;
-      prefix_len_decoder_.Decode(&prefix_len, 1);
-      ByteArray suffix = {0, nullptr};
-      suffix_decoder_.Decode(&suffix, 1);
-      buffer[i].len = prefix_len + suffix.len;
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+    prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+    // get the number of encoded prefix lengths
+    int num_prefix = prefix_len_decoder_.ValidValuesCount();
+    // call prefix_len_decoder_.Decode to decode all the prefix lengths.
+    // all the prefix lengths are buffered in buffered_prefix_length_.
+    PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t)));
+    int ret = prefix_len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix);
+    DCHECK_EQ(ret, num_prefix);
+    prefix_len_offset_ = 0;
+    num_valid_values_ = num_prefix;
+
+    // at this time, decoder_ is at the start of the encoded suffix data.
+    suffix_decoder_.SetDecoder(num_values, decoder_);
+
+    // TODO: read corrupted files written with bug (PARQUET-246). last_value_ should be
+    // set to last_value_in_previous_page_ when decoding a new page (except the first page)
+    last_value_ = "";
+  }
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
+                                          valid_bits_offset, out, &result));
+    return result;
+  }
 
-      uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
-      memcpy(result, last_value_.ptr, prefix_len);
-      memcpy(result + prefix_len, suffix.ptr, suffix.len);
+  int DecodeArrow(
+      int num_values, int null_count, const uint8_t* valid_bits,
+      int64_t valid_bits_offset,
+      typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) override {
+    ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder");
+  }
+
+ private:
+  int GetInternal(ByteArray* buffer, int max_values) {
+    max_values = std::min(max_values, num_valid_values_);
+    suffix_decoder_.Decode(buffer, max_values);
+
+    int64_t data_size = 0;
+    const int32_t* prefix_len_ptr =
+        reinterpret_cast<const int32_t*>(buffered_prefix_length_->data()) +
+        prefix_len_offset_;
+    for (int i = 0; i < max_values; ++i) {
+      data_size += prefix_len_ptr[i] + buffer[i].len;
+    }
+    PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
 
-      buffer[i].ptr = result;
-      last_value_ = buffer[i];
+    uint8_t* data_ptr = buffered_data_->mutable_data();
+    for (int i = 0; i < max_values; ++i) {
+      DCHECK_LE(static_cast<size_t>(prefix_len_ptr[i]), last_value_.length());
+      memcpy(data_ptr, last_value_.data(), prefix_len_ptr[i]);
+      memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len);
+      buffer[i].ptr = data_ptr;
+      buffer[i].len += prefix_len_ptr[i];
+      data_ptr += buffer[i].len;
+      last_value_ =
+          std::string(reinterpret_cast<const char*>(buffer[i].ptr), buffer[i].len);
     }
+    prefix_len_offset_ += max_values;
     this->num_values_ -= max_values;
+    num_valid_values_ -= max_values;
+
+    if (num_valid_values_ == 0) {
+      last_value_in_previous_page_ = last_value_;
+    }
     return max_values;
   }
 
- private:
+  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          int* out_num_values) {
+    ArrowBinaryHelper helper(out);
+    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
+
+    std::vector<ByteArray> values(num_values);
+    int num_valid_values = GetInternal(values.data(), num_values - null_count);
+    DCHECK_EQ(num_values - null_count, num_valid_values);
+
+    auto values_ptr = reinterpret_cast<ByteArray*>(values.data());
+    int value_idx = 0;
+
+    for (int i = 0; i < num_values; ++i) {
+      bool is_valid = bit_reader.IsSet();
+      bit_reader.Next();
+
+      if (is_valid) {
+        const auto& val = values_ptr[value_idx];
+        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
+          RETURN_NOT_OK(helper.PushChunk());
+        }
+        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
+        ++value_idx;
+      } else {
+        RETURN_NOT_OK(helper.AppendNull());
+        --null_count;
+      }
+    }
+    DCHECK_EQ(null_count, 0);
+    *out_num_values = num_valid_values;
+    return Status::OK();
+  }
+
+  std::shared_ptr<::arrow::BitUtil::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
   DeltaLengthByteArrayDecoder suffix_decoder_;
-  ByteArray last_value_;
+  std::string last_value_;
+  // string buffer for last value in previous page
+  std::string last_value_in_previous_page_;
+  int num_valid_values_;
+  uint32_t prefix_len_offset_;
+  std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
+  std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
 // ----------------------------------------------------------------------
@@ -2558,6 +2708,12 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
         throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
         break;
     }
+  } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
+    if (type_num == Type::BYTE_ARRAY || type_num == Type::FIXED_LEN_BYTE_ARRAY) {
+      return std::unique_ptr<Decoder>(new DeltaByteArrayDecoder(descr));
+    }
+    throw ParquetException(
+        "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY");
   } else {
     ParquetException::NYI("Selected encoding is not supported");
   }
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 600d437de0e..8f2a069ed2c 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 600d437de0e8b0e9927c87e76f844a1b385b02e8
+Subproject commit 8f2a069ed2c58787e5be2a3ca8c68bc801b8eafa
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 88ea4e5b6c8..bacc45baa68 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -80,29 +80,32 @@ Compression
 Encodings
 ---------
 
-+--------------------------+---------+
-| Encoding                 | Notes   |
-+==========================+=========+
-| PLAIN                    |         |
-+--------------------------+---------+
-| PLAIN_DICTIONARY         |         |
-+--------------------------+---------+
-| BIT_PACKED               |         |
-+--------------------------+---------+
-| RLE                      | \(1)    |
-+--------------------------+---------+
-| RLE_DICTIONARY           | \(2)    |
-+--------------------------+---------+
-| BYTE_STREAM_SPLIT        |         |
-+--------------------------+---------+
++--------------------------+----------+----------+---------+
+| Encoding                 | Reading  | Writing  | Notes   |
++==========================+==========+==========+=========+
+| PLAIN                    | ✓        | ✓        |         |
++--------------------------+----------+----------+---------+
+| PLAIN_DICTIONARY         | ✓        | ✓        |         |
++--------------------------+----------+----------+---------+
+| BIT_PACKED               | ✓        | ✓        |         |
++--------------------------+----------+----------+---------+
+| RLE                      | ✓        | ✓        | \(1)    |
++--------------------------+----------+----------+---------+
+| RLE_DICTIONARY           | ✓        | ✓        | \(2)    |
++--------------------------+----------+----------+---------+
+| BYTE_STREAM_SPLIT        | ✓        | ✓        |         |
++--------------------------+----------+----------+---------+
+| DELTA_BINARY_PACKED      | ✓        |          |         |
++--------------------------+----------+----------+---------+
+| DELTA_BYTE_ARRAY         | ✓        |          |         |
++--------------------------+----------+----------+---------+
 
 * \(1) Only supported for encoding definition and repetition levels, not values.
 
 * \(2) On the write path, RLE_DICTIONARY is only enabled if Parquet format version
   2.4 or greater is selected in :func:`WriterProperties::version`.
 
-*Unsupported encodings:* DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,
-DELTA_BYTE_ARRAY.
+*Unsupported encoding:* DELTA_LENGTH_BYTE_ARRAY.
 
 Types
 -----