diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 0da78264832..0d90ee6034c 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -402,6 +402,8 @@ TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);
 
 using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>;
 using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
+using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
+using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter<FLBAType>;
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
   this->TestRequiredWithEncoding(Encoding::PLAIN);
@@ -429,12 +431,16 @@ TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
 }
 
-/*
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
 }
 
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+/*
+TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
+}
+
+TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
 }
 */
@@ -692,7 +698,6 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
 
 // PARQUET-979
 // Prevent writing large MIN, MAX stats
-using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
 TEST_F(TestByteArrayValuesWriter, OmitStats) {
   int min_len = 1024 * 4;
   int max_len = 1024 * 8;
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index dc0edb1c801..f227a8d1938 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2572,6 +2572,129 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType>
 };
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
+          PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+  AssertBaseBinary(values);
+  if (::arrow::is_binary_like(values.type_id())) {
+    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+  } else {
+    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+  }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  constexpr int kBatchSize = 256;
+  std::array<int32_t, kBatchSize> lengths;
+  for (int idx = 0; idx < num_values; idx += kBatchSize) {
+    const int batch_size = std::min(kBatchSize, num_values - idx);
+    for (int j = 0; j < batch_size; ++j) {
+      const int32_t len = src[idx + j].len;
+      if (AddWithOverflow(encoded_size_, len, &encoded_size_)) {
+        throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
+      }
+      lengths[j] = len;
+    }
+    length_encoder_.Put(lengths.data(), batch_size);
+  }
+
+  PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+  for (int idx = 0; idx < num_values; idx++) {
+    sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
+  }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                                   const uint8_t* valid_bits,
+                                                   int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                 this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
+  std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
+
+  std::shared_ptr<Buffer> data;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&data));
+  sink_.Reset();
+
+  PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size()));
+  PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size()));
+  PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size()));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  encoded_size_ = 0;
+  return buffer;
+}
+
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayDecoder
+
 class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                     virtual public TypedDecoder<ByteArrayType> {
  public:
@@ -2636,13 +2759,17 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
                   typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
-    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
+                                          valid_bits_offset, out, &result));
+    return result;
   }
 
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
                   typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
-    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
+    ParquetException::NYI(
+        "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder");
   }
 
  private:
@@ -2664,6 +2791,44 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
     num_valid_values_ = num_length;
   }
 
+  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          int* out_num_values) {
+    ArrowBinaryHelper helper(out);
+
+    std::vector<ByteArray> values(num_values - null_count);
+    const int num_valid_values = Decode(values.data(), num_values - null_count);
+    if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
+      throw ParquetException("Expected to decode ", num_values - null_count,
+                             " values, but decoded ", num_valid_values, " values.");
+    }
+
+    auto values_ptr = values.data();
+    int value_idx = 0;
+
+    RETURN_NOT_OK(VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          const auto& val = values_ptr[value_idx];
+          if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
+            RETURN_NOT_OK(helper.PushChunk());
+          }
+          RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
+          ++value_idx;
+          return Status::OK();
+        },
+        [&]() {
+          RETURN_NOT_OK(helper.AppendNull());
+          --null_count;
+          return Status::OK();
+        }));
+
+    DCHECK_EQ(null_count, 0);
+    *out_num_values = num_valid_values;
+    return Status::OK();
+  }
+
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> len_decoder_;
   int num_valid_values_;
@@ -3075,7 +3240,6 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodi
         return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, pool);
       default:
         throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
-        break;
     }
   } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
     switch (type_num) {
@@ -3086,7 +3250,13 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodi
         return std::make_unique<DeltaBitPackEncoder<Int64Type>>(descr, pool);
       default:
         throw ParquetException(
            "DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
-        break;
     }
+  } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
+    switch (type_num) {
+      case Type::BYTE_ARRAY:
+        return std::make_unique<DeltaLengthByteArrayEncoder<ByteArrayType>>(descr, pool);
+      default:
+        throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
+    }
   } else {
     ParquetException::NYI("Selected encoding is not supported");
@@ -3126,7 +3296,6 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodi
         return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr);
       default:
         throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
-        break;
     }
   } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
     switch (type_num) {
@@ -3137,7 +3306,6 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodi
         return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr);
       default:
         throw ParquetException(
            "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
-        break;
     }
   } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
     if (type_num == Type::BYTE_ARRAY) {
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index a0e3fe9545d..00fffbb1e8b 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -18,7 +18,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -27,6 +26,7 @@
 #include "arrow/array.h"
 #include "arrow/array/builder_binary.h"
 #include "arrow/array/builder_dict.h"
+#include "arrow/compute/cast.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
@@ -36,7 +36,6 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/endian.h"
 #include "arrow/util/string.h"
-
 #include "parquet/encoding.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
@@ -302,7 +301,8 @@ class TestPlainEncoding : public TestEncodingBase<Type> {
   static constexpr int TYPE = Type::type_num;
 
   virtual void CheckRoundtrip() {
-    auto encoder = MakeTypedEncoder<Type>(Encoding::PLAIN, false, descr_.get());
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::PLAIN, /*use_dictionary=*/false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr_.get());
     encoder->Put(draws_, num_values_);
     encode_buffer_ = encoder->FlushValues();
@@ -315,7 +315,8 @@ class TestPlainEncoding : public TestEncodingBase<Type> {
   }
 
   void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
-    auto encoder = MakeTypedEncoder<Type>(Encoding::PLAIN, false, descr_.get());
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::PLAIN, /*use_dictionary=*/false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr_.get());
     int null_count = 0;
     for (auto i = 0; i < num_values_; i++) {
@@ -741,6 +742,35 @@ class EncodingAdHocTyped : public ::testing::Test {
     ::arrow::AssertArraysEqual(*values, *result);
   }
 
+  void DeltaBitPack(int seed) {
+    if (!std::is_same<ParquetType, Int32Type>::value &&
+        !std::is_same<ParquetType, Int64Type>::value) {
+      return;
+    }
+    auto values = GetValues(seed);
+    auto encoder = MakeTypedEncoder<ParquetType>(
+        Encoding::DELTA_BINARY_PACKED, /*use_dictionary=*/false, column_descr());
+    auto decoder =
+        MakeTypedDecoder<ParquetType>(Encoding::DELTA_BINARY_PACKED, column_descr());
+
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    BuilderType acc(arrow_type(), ::arrow::default_memory_pool());
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.Finish(&result));
+    ASSERT_EQ(50, result->length());
+    ::arrow::AssertArraysEqual(*values, *result);
+  }
+
   void Dict(int seed) {
     if (std::is_same<ParquetType, BooleanType>::value) {
       return;
     }
@@ -826,7 +856,7 @@ class EncodingAdHocTyped : public ::testing::Test {
 
  protected:
   const int64_t size_ = 50;
-  const double null_probability_ = 0.25;
+  double null_probability_ = 0.25;
 };
 
 template <typename ParquetType>
@@ -879,6 +909,14 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
   }
 }
 
+TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) {
+  // TODO: test with nulls once DeltaBitPackDecoder::DecodeArrow supports them
+  this->null_probability_ = 0;
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    this->DeltaBitPack(seed);
+  }
+}
+
 TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) {
   // Implemented as part of ARROW-3246
   const int64_t size = 50;
@@ -1068,8 +1106,8 @@ class TestByteStreamSplitEncoding : public TestEncodingBase<Type> {
   static constexpr int TYPE = Type::type_num;
 
   void CheckRoundtrip() override {
-    auto encoder =
-        MakeTypedEncoder<Type>(Encoding::BYTE_STREAM_SPLIT, false, descr_.get());
+    auto encoder = MakeTypedEncoder<Type>(Encoding::BYTE_STREAM_SPLIT,
+                                          /*use_dictionary=*/false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::BYTE_STREAM_SPLIT, descr_.get());
     encoder->Put(draws_, num_values_);
     encode_buffer_ = encoder->FlushValues();
@@ -1345,8 +1383,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
   }
 
   void CheckRoundtrip() override {
-    auto encoder =
-        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED,
+                                          /*use_dictionary=*/false, descr_.get());
     // Encode a number of times to exercise the flush logic
     for (size_t i = 0; i < kNumRoundTrips; ++i) {
       encoder->Put(draws_, num_values_);
@@ -1357,8 +1395,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
 
   void CheckRoundtripSpaced(const uint8_t* valid_bits,
                             int64_t valid_bits_offset) override {
-    auto encoder =
-        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED,
+                                          /*use_dictionary=*/false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
     int null_count = 0;
     for (auto i = 0; i < num_values_; i++) {
@@ -1451,8 +1489,8 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
   this->InitBoundData(num_values, /*repeats=*/1, /*half_range=*/31);
   ASSERT_EQ(this->num_values_, num_values);
 
-  auto encoder = MakeTypedEncoder<TypeParam>(Encoding::DELTA_BINARY_PACKED, false,
-                                             this->descr_.get());
+  auto encoder = MakeTypedEncoder<TypeParam>(
+      Encoding::DELTA_BINARY_PACKED, /*use_dictionary=*/false, this->descr_.get());
   encoder->Put(this->draws_, this->num_values_);
   auto encoded = encoder->FlushValues();
   const auto encoded_size = encoded->size();
@@ -1493,5 +1531,201 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
   }
 }
 
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_,
+                                                valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding, TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+}
+
+std::shared_ptr<::arrow::Array> CastBinaryTypesHelper(
+    std::shared_ptr<::arrow::Array> result, std::shared_ptr<::arrow::DataType> type) {
+  if (::arrow::is_large_binary_like(type->id())) {
+    ::arrow::compute::CastOptions options;
+    if (::arrow::is_string(type->id())) {
+      options.to_type = ::arrow::large_utf8();
+    } else {
+      options.to_type = ::arrow::large_binary();
+    }
+    EXPECT_OK_AND_ASSIGN(
+        auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr));
+    result = tmp.make_array();
+  }
+  return result;
+}
+
+TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);
+  CheckSeed(values);
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    rag = ::arrow::random::RandomArrayGenerator(seed);
+
+    values = rag.String(size, min_length, max_length, null_probability);
+    CheckSeed(values);
+
+    values =
+        rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability);
+    CheckSeed(values);
+  }
+}
+
+TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
+  auto CheckEncode = [](std::shared_ptr<::arrow::Array> values,
+                        std::shared_ptr<::arrow::Array> lengths) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    auto lengths_encoder = MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(lengths_encoder->Put(*lengths));
+    auto lengths_buf = lengths_encoder->FlushValues();
+
+    auto encoded_lengths_buf = SliceBuffer(buf, 0, lengths_buf->size());
+    auto encoded_values_buf = SliceBuffer(buf, lengths_buf->size());
+
+    ASSERT_TRUE(encoded_lengths_buf->Equals(*lengths_buf));
+    ASSERT_TRUE(encoded_values_buf->Equals(*values->data()->buffers[2]));
+  };
+
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
"Foobar", "ADBCEF"])"; + auto lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([5, 5, 6, 6])"); + + CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), lengths); + CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), lengths); + + const uint8_t data[] = { + 0x80, 0x01, 0x04, 0x04, 0x0a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x00, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x57, 0x6f, 0x72, 0x6c, 0x64, + 0x46, 0x6f, 0x6f, 0x62, 0x61, 0x72, 0x41, 0x44, 0x42, 0x43, 0x45, 0x46, + }; + auto encoded = Buffer::Wrap(data, sizeof(data)); + + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values)); +} + } // namespace test } // namespace parquet diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index edc42d54cff..73dcac20214 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -402,7 +402,7 @@ Encodings +--------------------------+----------+----------+---------+ | DELTA_BYTE_ARRAY | ✓ | | | +--------------------------+----------+----------+---------+ -| DELTA_LENGTH_BYTE_ARRAY | ✓ | | | +| DELTA_LENGTH_BYTE_ARRAY | ✓ | ✓ | | +--------------------------+----------+----------+---------+ * \(1) Only supported for encoding definition and repetition levels, diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 061acafd12a..04fd71d590b 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -455,7 +455,7 @@ cdef class ColumnChunkMetaData(_Weakrefable): Encodings used for column (tuple of str). One of 'PLAIN', 'BIT_PACKED', 'RLE', 'BYTE_STREAM_SPLIT', 'DELTA_BINARY_PACKED', - 'DELTA_BYTE_ARRAY'. + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY'. """ return tuple(map(encoding_name_from_enum, self.metadata.encodings())) @@ -1104,6 +1104,7 @@ cdef encoding_enum_from_name(str encoding_name): 'RLE': ParquetEncoding_RLE, 'BYTE_STREAM_SPLIT': ParquetEncoding_BYTE_STREAM_SPLIT, 'DELTA_BINARY_PACKED': ParquetEncoding_DELTA_BINARY_PACKED, + 'DELTA_LENGTH_BYTE_ARRAY': ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY, 'DELTA_BYTE_ARRAY': ParquetEncoding_DELTA_BYTE_ARRAY, 'RLE_DICTIONARY': 'dict', 'PLAIN_DICTIONARY': 'dict', diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 1ab392aed04..5c0e1a077d0 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -392,13 +392,16 @@ def test_byte_stream_split(use_legacy_dataset): def test_column_encoding(use_legacy_dataset): arr_float = pa.array(list(map(float, range(100)))) arr_int = pa.array(list(map(int, range(100)))) - mixed_table = pa.Table.from_arrays([arr_float, arr_int], - names=['a', 'b']) + arr_bin = pa.array([str(x) for x in range(100)]) + mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin], + names=['a', 'b', 'c']) # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for - # column 'b'. + # column 'b' and 'c'. 
     _check_roundtrip(mixed_table, expected=mixed_table,
                      use_dictionary=False,
-                     column_encoding={'a': "BYTE_STREAM_SPLIT", 'b': "PLAIN"},
+                     column_encoding={'a': "BYTE_STREAM_SPLIT",
+                                      'b': "PLAIN",
+                                      'c': "PLAIN"},
                      use_legacy_dataset=use_legacy_dataset)
 
     # Check "PLAIN" for all columns.
@@ -411,7 +414,16 @@ def test_column_encoding(use_legacy_dataset):
     _check_roundtrip(mixed_table, expected=mixed_table,
                      use_dictionary=False,
                      column_encoding={'a': "PLAIN",
-                                      'b': "DELTA_BINARY_PACKED"},
+                                      'b': "DELTA_BINARY_PACKED",
+                                      'c': "PLAIN"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check "DELTA_LENGTH_BYTE_ARRAY" for byte columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED",
+                                      'c': "DELTA_LENGTH_BYTE_ARRAY"},
                      use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
@@ -421,7 +433,9 @@ def test_column_encoding(use_legacy_dataset):
                        " DOUBLE"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'b': "BYTE_STREAM_SPLIT"},
+                         column_encoding={'a': "PLAIN",
+                                          'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass use "DELTA_BINARY_PACKED" encoding on float column.
@@ -429,7 +443,9 @@ def test_column_encoding(use_legacy_dataset):
     with pytest.raises(OSError):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'a': "DELTA_BINARY_PACKED"},
+                         column_encoding={'a': "DELTA_BINARY_PACKED",
+                                          'b': "PLAIN",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass "RLE_DICTIONARY".
@@ -470,7 +486,8 @@ def test_column_encoding(use_legacy_dataset):
                          use_dictionary=False,
                          use_byte_stream_split=['a'],
                          column_encoding={'a': "RLE",
-                                          'b': "BYTE_STREAM_SPLIT"},
+                                          'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass column_encoding and use_byte_stream_split=True.
@@ -480,7 +497,8 @@ def test_column_encoding(use_legacy_dataset):
                          use_dictionary=False,
                          use_byte_stream_split=True,
                          column_encoding={'a': "RLE",
-                                          'b': "BYTE_STREAM_SPLIT"},
+                                          'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass column_encoding=True.
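
For reviewers, the hand-written buffer in CheckDecode doubles as a worked example of the DELTA_LENGTH_BYTE_ARRAY layout: a DELTA_BINARY_PACKED block of value lengths, followed by the value bytes concatenated back to back. The sketch below (plain Python, not library code, and hardcoded to this one buffer's shape: a single block with only the first miniblock materialized) unpacks it step by step.

```python
# Standalone sketch of decoding the test buffer from CheckDecode above.
data = bytes([0x80, 0x01, 0x04, 0x04, 0x0a, 0x00, 0x01, 0x00, 0x00, 0x00,
              0x02, 0x00, 0x00, 0x00]) + b"HelloWorldFoobarADBCEF"


def uleb128(buf, pos):
    """Read one unsigned LEB128 varint; return (value, new_pos)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def unzigzag(n):
    """Undo zigzag encoding: 0, 1, 2, 3 ... -> 0, -1, 1, -2 ..."""
    return (n >> 1) ^ -(n & 1)


pos = 0
block_size, pos = uleb128(data, pos)      # 128 values per block
miniblocks, pos = uleb128(data, pos)      # 4 miniblocks per block
total_values, pos = uleb128(data, pos)    # 4 lengths follow
first, pos = uleb128(data, pos)
lengths = [unzigzag(first)]               # zigzag 10 -> first length 5
min_delta, pos = uleb128(data, pos)
min_delta = unzigzag(min_delta)           # 0
bit_widths = data[pos:pos + miniblocks]   # [1, 0, 0, 0]
pos += miniblocks
# First miniblock only: 32 deltas x 1 bit = 4 bytes (0x02 0x00 0x00 0x00);
# everything past the third delta is padding.
packed = int.from_bytes(data[pos:pos + 4], "little")
pos += 4
for i in range(total_values - 1):
    lengths.append(lengths[-1] + min_delta + ((packed >> i) & 1))
assert lengths == [5, 5, 6, 6]
# The rest of the buffer is the concatenated value bytes.
values = []
for n in lengths:
    values.append(data[pos:pos + n])
    pos += n
assert values == [b"Hello", b"World", b"Foobar", b"ADBCEF"]
```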
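With the pyarrow plumbing above in place, the new encoding can be requested per column through `column_encoding`; dictionary encoding has to be disabled first, exactly as test_column_encoding does. A minimal usage sketch follows (the file name and sample data are made up; only BYTE_ARRAY columns accept DELTA_LENGTH_BYTE_ARRAY):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical sample data: 'c' is a string column, stored as BYTE_ARRAY.
table = pa.table({'c': [str(x) for x in range(100)]})

# column_encoding only applies when dictionary encoding is disabled.
pq.write_table(table, 'example.parquet',
               use_dictionary=False,
               column_encoding={'c': 'DELTA_LENGTH_BYTE_ARRAY'})

# The encoding is reported back via ColumnChunkMetaData.encodings.
meta = pq.ParquetFile('example.parquet').metadata
print(meta.row_group(0).column(0).encodings)
```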