diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index dded399240e..3cfb8938175 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -577,6 +577,12 @@ class ColumnReaderImplBase { decoders_[static_cast(encoding)] = std::move(decoder); break; } + case Encoding::BYTE_STREAM_SPLIT: { + auto decoder = MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } case Encoding::RLE_DICTIONARY: throw ParquetException("Dictionary page must be before data page."); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a61bcecc974..dc06436ff9c 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -815,6 +815,104 @@ void DictEncoderImpl::PutDictionary(const arrow::Array& values) { } } +// ---------------------------------------------------------------------- +// ByteStreamSplitEncoder implementations + +template +class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder { + public: + using T = typename DType::c_type; + using TypedEncoder::Put; + + explicit ByteStreamSplitEncoder( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + int64_t EstimatedDataEncodedSize() override; + std::shared_ptr FlushValues() override; + + void Put(const T* buffer, int num_values) override; + void Put(const arrow::Array& values) override; + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override; + + protected: + arrow::TypedBufferBuilder values_; + + private: + void PutArrowArray(const arrow::Array& values); +}; + +template +ByteStreamSplitEncoder::ByteStreamSplitEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) + : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {} + +template +int64_t ByteStreamSplitEncoder::EstimatedDataEncodedSize() { + return values_.length() * sizeof(T); +} + +template +std::shared_ptr ByteStreamSplitEncoder::FlushValues() { + constexpr size_t num_streams = sizeof(T); + std::shared_ptr output_buffer = + AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); + uint8_t* output_buffer_raw = output_buffer->mutable_data(); + const size_t num_values = values_.length(); + const uint8_t* raw_values = reinterpret_cast(values_.data()); + for (size_t i = 0; i < num_values; ++i) { + for (size_t j = 0U; j < num_streams; ++j) { + const uint8_t byte_in_value = raw_values[i * num_streams + j]; + output_buffer_raw[j * num_values + i] = byte_in_value; + } + } + values_.Reset(); + return std::move(output_buffer); +} + +template +void ByteStreamSplitEncoder::Put(const T* buffer, int num_values) { + PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values)); +} + +template +void ByteStreamSplitEncoder::Put(const ::arrow::Array& values) { + PutArrowArray(values); +} + +template <> +void ByteStreamSplitEncoder::PutArrowArray(const ::arrow::Array& values) { + DirectPutImpl(values, + reinterpret_cast(&values_)); +} + +template <> +void ByteStreamSplitEncoder::PutArrowArray(const ::arrow::Array& values) { + DirectPutImpl(values, + reinterpret_cast(&values_)); +} + +template +void ByteStreamSplitEncoder::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + std::shared_ptr buffer; + PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(), + num_values * sizeof(T), &buffer)); + int32_t num_valid_values = 0; + arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, + num_values); + T* data = reinterpret_cast(buffer->mutable_data()); + for (int32_t i = 0; i < num_values; i++) { + if (valid_bits_reader.IsSet()) { + data[num_valid_values++] = src[i]; + } + valid_bits_reader.Next(); + } + Put(data, num_valid_values); +} + // ---------------------------------------------------------------------- // Encoder and decoder factory functions @@ -863,6 +961,18 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin DCHECK(false) << "Encoder not implemented"; break; } + } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { + switch (type_num) { + case Type::FLOAT: + return std::unique_ptr( + new ByteStreamSplitEncoder(descr, pool)); + case Type::DOUBLE: + return std::unique_ptr( + new ByteStreamSplitEncoder(descr, pool)); + default: + throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); + break; + } } else { ParquetException::NYI("Selected encoding is not supported"); } @@ -2236,6 +2346,74 @@ class DeltaByteArrayDecoder : public DecoderImpl, ByteArray last_value_; }; +// ---------------------------------------------------------------------- +// BYTE_STREAM_SPLIT + +template +class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder { + public: + using T = typename DType::c_type; + explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr); + + int Decode(T* buffer, int max_values) override; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* builder) override; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* builder) override; + + void SetData(int num_values, const uint8_t* data, int len) override; + + private: + int num_values_in_buffer{0U}; +}; + +template +ByteStreamSplitDecoder::ByteStreamSplitDecoder(const ColumnDescriptor* descr) + : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {} + +template +void ByteStreamSplitDecoder::SetData(int num_values, const uint8_t* data, + int len) { + DecoderImpl::SetData(num_values, data, len); + num_values_in_buffer = num_values; +} + +template +int ByteStreamSplitDecoder::Decode(T* buffer, int max_values) { + constexpr size_t num_streams = sizeof(T); + const int values_to_decode = std::min(num_values_, max_values); + const int num_decoded_previously = num_values_in_buffer - num_values_; + for (int i = 0; i < values_to_decode; ++i) { + uint8_t gathered_byte_data[num_streams]; + for (size_t b = 0; b < num_streams; ++b) { + const size_t byte_index = b * num_values_in_buffer + num_decoded_previously + i; + gathered_byte_data[b] = data_[byte_index]; + } + buffer[i] = arrow::util::SafeLoadAs(&gathered_byte_data[0]); + } + num_values_ -= values_to_decode; + len_ -= sizeof(T) * values_to_decode; + return values_to_decode; +} + +template +int ByteStreamSplitDecoder::DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* builder) { + ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder"); +} + +template +int ByteStreamSplitDecoder::DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* builder) { + ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder"); +} + // ---------------------------------------------------------------------- std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encoding, @@ -2261,6 +2439,16 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin default: break; } + } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { + switch (type_num) { + case Type::FLOAT: + return std::unique_ptr(new ByteStreamSplitDecoder(descr)); + case Type::DOUBLE: + return std::unique_ptr(new ByteStreamSplitDecoder(descr)); + default: + throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); + break; + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 473b985b35a..4a40d0162f8 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -29,6 +29,7 @@ #include "arrow/testing/random.h" #include "arrow/testing/util.h" #include "arrow/type.h" +#include "arrow/util/checked_cast.h" #include "parquet/encoding.h" #include "parquet/platform.h" @@ -38,6 +39,7 @@ using arrow::default_memory_pool; using arrow::MemoryPool; +using arrow::internal::checked_cast; // TODO(hatemhelal): investigate whether this can be replaced with GTEST_SKIP in a future // gtest release that contains https://github.com/google/googletest/pull/1544 @@ -862,5 +864,190 @@ TEST_F(DictEncoding, CheckDecodeIndicesNoNulls) { CheckDict(actual_num_values, *builder); } +// ---------------------------------------------------------------------- +// BYTE_STREAM_SPLIT encode/decode tests. + +template +void TestByteStreamSplitDecodePath(const uint8_t* encoded_data, + const int64_t encoded_data_size, + const typename DType::c_type* expected_decoded_data, + const int num_elements, + const bool request_more_values) { + std::unique_ptr> decoder = + MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT); + decoder->SetData(num_elements, encoded_data, static_cast(encoded_data_size)); + std::vector decoded_data(num_elements); + int num_elements_to_decode = num_elements; + if (request_more_values) { + num_elements_to_decode += 100; + } + int num_decoded_elements = decoder->Decode(decoded_data.data(), num_elements_to_decode); + ASSERT_EQ(num_elements, num_decoded_elements); + for (size_t i = 0U; i < decoded_data.size(); ++i) { + ASSERT_EQ(expected_decoded_data[i], decoded_data[i]); + } + ASSERT_EQ(0, decoder->values_left()); +} + +template +void TestByteStreamSplitRoundTrip(const typename DType::c_type* input_data, const int n) { + auto encoder = MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT); + encoder->Put(input_data, n); + const int64_t estimated_num_bytes = encoder->EstimatedDataEncodedSize(); + ASSERT_EQ(static_cast(n) * sizeof(typename DType::c_type), + estimated_num_bytes); + std::shared_ptr buffer = encoder->FlushValues(); + TestByteStreamSplitDecodePath(buffer->data(), buffer->size(), input_data, n, + false); +} + +template +void TestEncodeDecodeWithBigInput() { + const int nvalues = 10000; + using T = typename DType::c_type; + std::vector data(nvalues); + GenerateData(nvalues, data.data(), nullptr); + TestByteStreamSplitRoundTrip(data.data(), nvalues); +} + +// Check that the encoder can handle input with one element. +TEST(ByteStreamSplitEncodeDecode, EncodeOneLenInput) { + const float value = 1.0f; + TestByteStreamSplitRoundTrip(&value, 1); +} + +// Check that the decoder can handle empty input. +TEST(ByteStreamSplitEncodeDecode, DecodeZeroLenInput) { + std::unique_ptr> decoder = + MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT); + decoder->SetData(0, NULL, 0); + ASSERT_EQ(0U, decoder->Decode(NULL, 0)); +} + +TEST(ByteStreamSplitEncodeDecode, DecodeOneLenInput) { + const uint8_t data[] = {0x47U, 0x24U, 0xa7U, 0x44U}; + TestByteStreamSplitDecodePath( + data, 4, reinterpret_cast(&data[0]), 1, false); +} + +// Check that requesting to decode more elements than is available in the storage +// of the decoder works correctly. +TEST(ByteStreamSplitEncodeDecode, DecodeLargerPortion) { + const uint8_t data[] = {0xDEU, 0xC0U, 0x37U, 0x13U, 0x11U, 0x22U, 0x33U, 0x44U, + 0xAAU, 0xBBU, 0xCCU, 0xDDU, 0x55U, 0x66U, 0x77U, 0x88U}; + const uint64_t expected_output[2] = {0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL}; + TestByteStreamSplitDecodePath( + data, sizeof(data), reinterpret_cast(&expected_output[0]), 2, true); +} + +// Check that the decoder can decode the input in smaller steps. +TEST(ByteStreamSplitEncodeDecode, DecodeMultipleTimes) { + std::unique_ptr> decoder = + MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT); + const int num_values = 100; + std::vector data(num_values * 4); + for (size_t i = 0; i < data.size(); ++i) { + data[i] = static_cast(i & 0xFFU); + } + decoder->SetData(num_values, data.data(), num_values * 4); + + const int step = 25; + std::vector decoded_data(step); + for (int i = 0; i < num_values; i += step) { + int num_decoded = decoder->Decode(decoded_data.data(), step); + ASSERT_EQ(step, num_decoded); + for (int j = 0; j < step; ++j) { + const uint32_t assembled_value = + static_cast(data[i + j]) | + (static_cast(data[(i + j) + num_values]) << 8U) | + (static_cast(data[(i + j) + num_values * 2]) << 16U) | + (static_cast(data[(i + j) + num_values * 3]) << 24U); + const float assembled_value_as_float = + *reinterpret_cast(&assembled_value); + ASSERT_EQ(assembled_value_as_float, decoded_data[j]); + } + } +} + +// Check that an encode-decode pipeline produces the original small input. +// This small-input test is added to ease debugging in case of changes to +// the encoder/decoder implementation. +TEST(ByteStreamSplitEncodeDecode, SmallInput) { + const float data[] = {-166.166f, -0.2566f, .0f, 322.0f, 178888.189f}; + const int num_values = sizeof(data) / sizeof(data[0U]); + TestByteStreamSplitRoundTrip(data, num_values); +} + +TEST(ByteStreamSplitEncodeDecode, PutSpaced) { + const float data[] = {-1.0f, .0f, .0f, 3.0f, .0f, 22.1234f, + .0f, 198891.0f, .0f, -223345.4455f, 24443.124f}; + const float valid_data[] = {-1.0f, 3.0f, 22.1234f, + 198891.0f, -223345.4455f, 24443.124f}; + // The valid ones are the ones which are non-zero. + // The enable bits are: 10010101 011. + const uint8_t valid_bits[2] = {0xA9U, 0x6U}; + const int num_values = sizeof(data) / sizeof(data[0U]); + const int num_valid_values = sizeof(valid_data) / sizeof(valid_data[0U]); + std::unique_ptr> encoder = + MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT); + encoder->PutSpaced(data, num_values, valid_bits, 0); + std::shared_ptr buffer = encoder->FlushValues(); + + TestByteStreamSplitDecodePath(buffer->data(), buffer->size(), valid_data, + num_valid_values, false); +} + +TEST(ByteStreamSplitEncodeDecode, PutArrow) { + arrow::random::RandomArrayGenerator rag{1337}; + const int num_values = 123; + auto arr = rag.Float32(num_values, -2048.0f, 2048.0f, 0); + std::unique_ptr> encoder = + MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT); + encoder->Put(*arr); + std::shared_ptr buffer = encoder->FlushValues(); + + auto raw_values = checked_cast(*arr).raw_values(); + TestByteStreamSplitDecodePath(buffer->data(), buffer->size(), raw_values, + num_values, false); +} + +// Test that the encode-decode pipeline can handle big 32-bit FP input. +TEST(ByteStreamSplitEncodeDecode, BigInputFloat) { + TestEncodeDecodeWithBigInput(); +} + +// Test that the encode-decode pipeline can handle big 64-bit FP input. +TEST(ByteStreamSplitEncodeDecode, BigInputDouble) { + TestEncodeDecodeWithBigInput(); +} + +TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) { + // First check encoders. + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); + + // Then check decoders. + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), + ParquetException); + ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index e7efe6a263c..9dc13e42191 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -157,6 +157,8 @@ std::string EncodingToString(Encoding::type t) { return "DELTA_BYTE_ARRAY"; case Encoding::RLE_DICTIONARY: return "RLE_DICTIONARY"; + case Encoding::BYTE_STREAM_SPLIT: + return "BYTE_STREAM_SPLIT"; default: return "UNKNOWN"; } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index ebb8c2446e3..420494a8951 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -452,6 +452,7 @@ struct Encoding { DELTA_LENGTH_BYTE_ARRAY = 6, DELTA_BYTE_ARRAY = 7, RLE_DICTIONARY = 8, + BYTE_STREAM_SPLIT = 9, UNKNOWN = 999 }; };