Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,12 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::BYTE_STREAM_SPLIT: {
auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

Expand Down
188 changes: 188 additions & 0 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,104 @@ void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
}
}

// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations

template <typename DType>
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
using TypedEncoder<DType>::Put;

explicit ByteStreamSplitEncoder(
const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;

void Put(const T* buffer, int num_values) override;
void Put(const arrow::Array& values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override;

protected:
arrow::TypedBufferBuilder<T> values_;

private:
void PutArrowArray(const arrow::Array& values);
};

template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {}

template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
return values_.length() * sizeof(T);
}

template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
constexpr size_t num_streams = sizeof(T);
std::shared_ptr<ResizableBuffer> output_buffer =
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const size_t num_values = values_.length();
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
for (size_t i = 0; i < num_values; ++i) {
for (size_t j = 0U; j < num_streams; ++j) {
const uint8_t byte_in_value = raw_values[i * num_streams + j];
output_buffer_raw[j * num_values + i] = byte_in_value;
}
}
values_.Reset();
return std::move(output_buffer);
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values));
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const ::arrow::Array& values) {
PutArrowArray(values);
}

template <>
void ByteStreamSplitEncoder<FloatType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::FloatArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}

template <>
void ByteStreamSplitEncoder<DoubleType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::DoubleArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
const uint8_t* valid_bits,
int64_t valid_bits_offset) {
std::shared_ptr<ResizableBuffer> buffer;
PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
num_values * sizeof(T), &buffer));
int32_t num_valid_values = 0;
arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
T* data = reinterpret_cast<T*>(buffer->mutable_data());
for (int32_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
data[num_valid_values++] = src[i];
}
valid_bits_reader.Next();
}
Put(data, num_valid_values);
}

// ----------------------------------------------------------------------
// Encoder and decoder factory functions

Expand Down Expand Up @@ -863,6 +961,18 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<DoubleType>(descr, pool));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down Expand Up @@ -2236,6 +2346,74 @@ class DeltaByteArrayDecoder : public DecoderImpl,
ByteArray last_value_;
};

// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT

template <typename DType>
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
using T = typename DType::c_type;
explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);

int Decode(T* buffer, int max_values) override;

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) override;

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) override;

void SetData(int num_values, const uint8_t* data, int len) override;

private:
int num_values_in_buffer{0U};
};

template <typename DType>
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}

template <typename DType>
void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
int len) {
DecoderImpl::SetData(num_values, data, len);
num_values_in_buffer = num_values;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
constexpr size_t num_streams = sizeof(T);
const int values_to_decode = std::min(num_values_, max_values);
const int num_decoded_previously = num_values_in_buffer - num_values_;
for (int i = 0; i < values_to_decode; ++i) {
uint8_t gathered_byte_data[num_streams];
for (size_t b = 0; b < num_streams; ++b) {
const size_t byte_index = b * num_values_in_buffer + num_decoded_previously + i;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... do you think the compiler is able to unroll the inner loop? Otherwise, perhaps you can precompute the stream pointers outside of the loop:

  const uint8_t* stream_data[num_streams];
  for (size_t b = 0; b < num_streams; ++b) {
    stream_data[b] = data_ + b * num_values_in_buffer + num_decoded_previously;
  }
  for (int i = 0; i < values_to_decode; ++i) {
    uint8_t gathered_byte_data[num_streams];
    for (size_t b = 0; b < num_streams; ++b) {
      gathered_byte_data[b] = stream_data[b][byte_index];
    }
    // ...

Copy link
Contributor Author

@martinradev martinradev Jan 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the loop is unrolled for the decoder and encoder for both float and double types:

Encoder<float>
   0x00007ffff736f060 <+624>:	movzx  r10d,BYTE PTR [rax+r9*4]
   0x00007ffff736f065 <+629>:	mov    BYTE PTR [r8+r9*1],r10b
   0x00007ffff736f069 <+633>:	movzx  r10d,BYTE PTR [rax+r9*4+0x1]
   0x00007ffff736f06f <+639>:	mov    BYTE PTR [rdx+r9*1],r10b
   0x00007ffff736f073 <+643>:	movzx  r10d,BYTE PTR [rax+r9*4+0x2]
   0x00007ffff736f079 <+649>:	mov    BYTE PTR [rdi+r9*1],r10b
   0x00007ffff736f07d <+653>:	movzx  r10d,BYTE PTR [rax+r9*4+0x3]
   0x00007ffff736f083 <+659>:	mov    BYTE PTR [rsi+r9*1],r10b
   0x00007ffff736f087 <+663>:	add    r9,0x1
   0x00007ffff736f08b <+667>:	cmp    rcx,r9
   0x00007ffff736f08e <+670>:	ja     0x7ffff736f060 <_ZN7parquet22ByteStreamSplitEncoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE11FlushValuesEv+624>

Decoder<float>
   0x7ffff736eca8 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+104>:	movzx  ecx,BYTE PTR [r11+rdx*1]
   0x7ffff736ecad <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+109>:	mov    al,BYTE PTR [rbx+rdx*1]
   0x7ffff736ecb0 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+112>:	movzx  edi,BYTE PTR [r10+rdx*1]
   0x7ffff736ecb5 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+117>:	mov    ah,cl
   0x7ffff736ecb7 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+119>:	movzx  ecx,BYTE PTR [r9+rdx*1]
   0x7ffff736ecbc <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+124>:	shl    edi,0x10
   0x7ffff736ecbf <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+127>:	movzx  eax,ax
   0x7ffff736ecc2 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+130>:	or     eax,edi
   0x7ffff736ecc4 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+132>:	shl    ecx,0x18
   0x7ffff736ecc7 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+135>:	or     eax,ecx
   0x7ffff736ecc9 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+137>:	mov    DWORD PTR [rsi+rdx*4],eax
   0x7ffff736eccc <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+140>:	add    rdx,0x1
   0x7ffff736ecd0 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+144>:	cmp    r8,rdx
   0x7ffff736ecd3 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+147>:	jne    0x7ffff736eca8 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE4EEEE6DecodeEPfi+104>

Encoder<double>
   0x00007ffff736ee70 <+128>:	movzx  esi,BYTE PTR [rdx]
   0x00007ffff736ee73 <+131>:	add    rax,0x1
   0x00007ffff736ee77 <+135>:	add    rdx,0x8
   0x00007ffff736ee7b <+139>:	mov    BYTE PTR [rax-0x1],sil
   0x00007ffff736ee7f <+143>:	movzx  esi,BYTE PTR [rdx-0x7]
   0x00007ffff736ee83 <+147>:	mov    BYTE PTR [rax+rcx*1-0x1],sil
   0x00007ffff736ee88 <+152>:	movzx  esi,BYTE PTR [rdx-0x6]
   0x00007ffff736ee8c <+156>:	mov    BYTE PTR [rax+r10*1-0x1],sil
   0x00007ffff736ee91 <+161>:	movzx  esi,BYTE PTR [rdx-0x5]
   0x00007ffff736ee95 <+165>:	mov    BYTE PTR [rax+r8*1-0x1],sil
   0x00007ffff736ee9a <+170>:	movzx  esi,BYTE PTR [rdx-0x4]
   0x00007ffff736ee9e <+174>:	mov    BYTE PTR [rax+r9*1-0x1],sil
   0x00007ffff736eea3 <+179>:	movzx  esi,BYTE PTR [rdx-0x3]
   0x00007ffff736eea7 <+183>:	mov    BYTE PTR [rax+rbp*1-0x1],sil
   0x00007ffff736eeac <+188>:	movzx  esi,BYTE PTR [rdx-0x2]
   0x00007ffff736eeb0 <+192>:	mov    BYTE PTR [rax+rbx*1-0x1],sil
   0x00007ffff736eeb5 <+197>:	movzx  esi,BYTE PTR [rdx-0x1]
   0x00007ffff736eeb9 <+201>:	mov    BYTE PTR [rax+rdi*1-0x1],sil
   0x00007ffff736eebe <+206>:	cmp    r11,rax
   0x00007ffff736eec1 <+209>:	jne    0x7ffff736ee70 <_ZN7parquet22ByteStreamSplitEncoderINS_12PhysicalTypeILNS_4Type4typeE5EEEE11FlushValuesEv+128>

Decoder<double>:
   0x00007ffff736ecc8 <+184>:	movzx  r15d,BYTE PTR [r13+rdx*1+0x0]
   0x00007ffff736ecce <+190>:	mov    al,BYTE PTR [r14+rdx*1]
   0x00007ffff736ecd2 <+194>:	mov    rcx,r15
   0x00007ffff736ecd5 <+197>:	movzx  r15d,BYTE PTR [r12+rdx*1]
   0x00007ffff736ecda <+202>:	mov    ah,cl
   0x00007ffff736ecdc <+204>:	movabs rcx,0xffff00ffffffffff
   0x00007ffff736ece6 <+214>:	and    rax,0xffffffffff00ffff
   0x00007ffff736ecec <+220>:	shl    r15,0x10
   0x00007ffff736ecf0 <+224>:	or     rax,r15
   0x00007ffff736ecf3 <+227>:	movzx  r15d,BYTE PTR [rbp+rdx*1+0x0]
   0x00007ffff736ecf9 <+233>:	and    rax,r8
   0x00007ffff736ecfc <+236>:	shl    r15,0x18
   0x00007ffff736ed00 <+240>:	or     rax,r15
   0x00007ffff736ed03 <+243>:	movzx  r15d,BYTE PTR [rbx+rdx*1]
   0x00007ffff736ed08 <+248>:	and    rax,rdi
   0x00007ffff736ed0b <+251>:	shl    r15,0x20
   0x00007ffff736ed0f <+255>:	or     rax,r15
   0x00007ffff736ed12 <+258>:	movzx  r15d,BYTE PTR [r11+rdx*1]
   0x00007ffff736ed17 <+263>:	and    rax,rcx
   0x00007ffff736ed1a <+266>:	movabs rcx,0xff00ffffffffffff
   0x00007ffff736ed24 <+276>:	shl    r15,0x28
   0x00007ffff736ed28 <+280>:	or     rax,r15
   0x00007ffff736ed2b <+283>:	movzx  r15d,BYTE PTR [r10+rdx*1]
   0x00007ffff736ed30 <+288>:	and    rax,rcx
   0x00007ffff736ed33 <+291>:	movabs rcx,0xffffffffffffff
   0x00007ffff736ed3d <+301>:	shl    r15,0x30
   0x00007ffff736ed41 <+305>:	or     rax,r15
   0x00007ffff736ed44 <+308>:	movzx  r15d,BYTE PTR [r9+rdx*1]
   0x00007ffff736ed49 <+313>:	and    rax,rcx
   0x00007ffff736ed4c <+316>:	shl    r15,0x38
   0x00007ffff736ed50 <+320>:	or     rax,r15
   0x00007ffff736ed53 <+323>:	mov    QWORD PTR [rsi+rdx*8],rax
   0x00007ffff736ed57 <+327>:	add    rdx,0x1
   0x00007ffff736ed5b <+331>:	cmp    QWORD PTR [rsp+0x8],rdx
   0x00007ffff736ed60 <+336>:	jne    0x7ffff736ecc8 <_ZTv0_n64_N7parquet22ByteStreamSplitDecoderINS_12PhysicalTypeILNS_4Type4typeE5EEEE6DecodeEPdi+184>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the compiler does not decide to simd-vectorize any of the loops.
In my experiments, it does make a difference to use simd -> https://github.com/martinradev/arrow-fp-compression-bench/blob/master/optimize_byte_stream_split/report_final.pdf
I have an implementation for the encoder and decoder, and I plan to submit a PR once we get this patch in.
Do you think this makes sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably makes sense, yes.

gathered_byte_data[b] = data_[byte_index];
}
buffer[i] = arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]);
}
num_values_ -= values_to_decode;
len_ -= sizeof(T) * values_to_decode;
return values_to_decode;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) {
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}

// ----------------------------------------------------------------------

std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
Expand All @@ -2261,6 +2439,16 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
default:
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
case Type::DOUBLE:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down
Loading