From b22b46f25e5d0f0be6e5a66e2c42c034294b2d9c Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 13:45:04 +0800 Subject: [PATCH 01/10] minor: enhance boundary checking in CompressedInputStream --- cpp/src/arrow/io/compressed.cc | 28 +++- cpp/src/arrow/util/deposit_bits.h | 250 ++++++++++++++++++++++++++++++ cpp/src/arrow/util/unpack8.h | 105 +++++++++++++ cpp/submodules/parquet-testing | 2 +- 4 files changed, 376 insertions(+), 9 deletions(-) create mode 100644 cpp/src/arrow/util/deposit_bits.h create mode 100644 cpp/src/arrow/util/unpack8.h diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 5faa4d095eb..f4440124b17 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -269,7 +269,7 @@ class CompressedInputStream::Impl { // Read compressed data if necessary Status EnsureCompressedData() { - int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; + int64_t compressed_avail = CompressedBufferAvailable(); if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. if (!supports_zero_copy_from_raw_) { @@ -298,9 +298,10 @@ class CompressedInputStream::Impl { } // Decompress some data from the compressed_ buffer. - // Call this function only if the decompressed_ buffer is empty. + // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - DCHECK_NE(compressed_->data(), nullptr); + DCHECK_NE(0, CompressedBufferAvailable()); + DCHECK_EQ(0, DecompressedBufferAvailable()); int64_t decompress_size = kDecompressSize; @@ -354,7 +355,7 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { // First try to read data from the decompressor - if (compressed_ && compressed_->size() != 0) { + if (CompressedBufferAvailable() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. RETURN_NOT_OK(decompressor_->Reset()); @@ -362,10 +363,12 @@ class CompressedInputStream::Impl { } RETURN_NOT_OK(DecompressData()); } - if (!decompressed_ || decompressed_->size() == 0) { - // Got nothing, need to read more compressed data + int64_t decompress_avail = DecompressedBufferAvailable(); + if (decompress_avail == 0) { + // Got nothing from existing `compressed_`, need to read + // more compressed data RETURN_NOT_OK(EnsureCompressedData()); - if (compressed_pos_ == compressed_->size()) { + if (CompressedBufferAvailable() == 0) { // No more data to decompress if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); @@ -405,13 +408,22 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - // Using std::move because the some compiler might has issue below: + // Using std::move because some compiler might has issue below: // https://wg21.cmeerw.net/cwg/issue1579 return std::move(buf); } const std::shared_ptr& raw() const { return raw_; } + private: + int64_t CompressedBufferAvailable() const { + return compressed_ ? compressed_->size() - compressed_pos_ : 0; + } + + int64_t DecompressedBufferAvailable() const { + return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0; + } + private: // Read 64 KB compressed data at a time static const int64_t kChunkSize = 64 * 1024; diff --git a/cpp/src/arrow/util/deposit_bits.h b/cpp/src/arrow/util/deposit_bits.h new file mode 100644 index 00000000000..07cd66482fc --- /dev/null +++ b/cpp/src/arrow/util/deposit_bits.h @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/bitmap_writer.h" +#include "arrow/util/logging.h" +#include "arrow/util/simd.h" + +// clang-format off +/* Python code to generate lookup table: + +kLookupBits = 5 +count = 0 +print('constexpr int kLookupBits = {};'.format(kLookupBits)) +print('constexpr uint8_t kPdepTable[1 << kLookupBits][1 << kLookupBits] = {') +print(' ', end = '') +for mask in range(1 << kLookupBits): + for data in range(1 << kLookupBits): + bit_value = 0 + bit_len = 0 + for i in range(kLookupBits): + if mask & (1 << i): + bit_value |= (((data >> i) & 1) << bit_len) + bit_len += 1 + out = '0x{:02X},'.format(bit_value) + count += 1 + if count % (1 << kLookupBits) == 1: + print(' {') + if count % 8 == 1: + print(' ', end = '') + if count % 8 == 0: + print(out, end = '\n') + else: + print(out, end = ' ') + if count % (1 << kLookupBits) == 0: + print(' },', end = '') +print('\n};') + +*/ +// clang-format on + +constexpr int kLookupBits = 5; +constexpr uint8_t kPextTable[1 << kLookupBits][1 << kLookupBits] = { + { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, + 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, + 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, + 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, + 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, + 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, + 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, + 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, + 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, + 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, + 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, + 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, + 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, + 0x01, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, + 0x03, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, + 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, + 0x03, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, + 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + 0x07, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, + 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, + 0x02, 0x03, 0x03, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, + 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, + 0x05, 0x06, 0x07, 0x06, 0x07, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, + 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, + 0x05, 0x06, 0x06, 0x07, 0x07, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, + 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, + 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, + 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, + 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, + 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, + 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, + 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, + 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, + 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, + 0x03, 0x03, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, + 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, + 0x06, 0x07, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, + 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, + 0x07, 0x07, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, + 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, + 0x0E, 0x0F, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, + 0x01, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, + 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, + 0x03, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x04, 0x05, + 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, 0x06, 0x07, 0x06, 0x07, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, + 0x03, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x04, 0x04, + 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, 0x06, 0x06, 0x07, 0x07, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + 0x07, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x08, 0x09, + 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x0C, 0x0D, 0x0E, 0x0F, + }, + { + 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, + 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x04, 0x04, 0x04, 0x05, 0x05, + 0x05, 0x05, 0x06, 0x06, 0x06, 0x06, 0x07, 0x07, 0x07, 0x07, + }, + { + 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, + 0x05, 0x06, 0x07, 0x06, 0x07, 0x08, 0x09, 0x08, 0x09, 0x0A, 0x0B, + 0x0A, 0x0B, 0x0C, 0x0D, 0x0C, 0x0D, 0x0E, 0x0F, 0x0E, 0x0F, + }, + { + 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, + 0x05, 0x06, 0x06, 0x07, 0x07, 0x08, 0x08, 0x09, 0x09, 0x0A, 0x0A, + 0x0B, 0x0B, 0x0C, 0x0C, 0x0D, 0x0D, 0x0E, 0x0E, 0x0F, 0x0F, + }, + { + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, + 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, + 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, + }, +}; + +inline uint64_t DepositBitsSoftware(uint64_t bitmap, uint64_t mask) { + // A software emulation of _pdep_u64 + + // These checks should be inline and are likely to be common cases. + if (mask == ~uint64_t{0}) { + return bitmap; + } + if (mask == 0) { + return 0; + } + + // Fallback to lookup table method + uint64_t bit_value = 0; + int bit_len = 0; + constexpr uint8_t kLookupMask = (1U << kLookupBits) - 1; + while (mask != 0) { + const uint64_t value = kPdepTable[mask & kLookupMask][bitmap & kLookupMask]; + bit_value |= (value << bit_len); + bit_len += mask_len; + bitmap >>= kLookupBits; + select_bitmap >>= kLookupBits; + } + return bit_value; +} \ No newline at end of file diff --git a/cpp/src/arrow/util/unpack8.h b/cpp/src/arrow/util/unpack8.h new file mode 100644 index 00000000000..1dc8fef74cd --- /dev/null +++ b/cpp/src/arrow/util/unpack8.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/util/endian.h" +#include "arrow/util/visibility.h" + +#include + +namespace arrow { +namespace internal { + + +static constexpr const uint64_t kPdepMask8[] = { + 0x0000000000000000, + 0x0101010101010101, + 0x0303030303030303, + 0x0707070707070707, + 0x0f0f0f0f0f0f0f0f, + 0x1f1f1f1f1f1f1f1f, + 0x3f3f3f3f3f3f3f3f, + 0x7f7f7f7f7f7f7f7f, + 0xffffffffffffffff}; + +template +static inline uint32_t unpackNaive( + const uint8_t* inputBits, + uint64_t inputBufferLen, + uint64_t numValues, + uint8_t bitWidth, + T* result) { + ARROW_CHECK(bitWidth >= 1 && bitWidth <= sizeof(T) * 8); + ARROW_CHECK(inputBufferLen * 8 >= bitWidth * numValues); + + auto mask = BITPACK_MASKS[bitWidth]; + + uint64_t bitPosition = 0; + for (uint32_t i = 0; i < numValues; i++) { + T val = (*inputBits >> bitPosition) & mask; + bitPosition += bitWidth; + while (bitPosition > 8) { + inputBits++; + val |= (*inputBits << (8 - (bitPosition - bitWidth))) & mask; + bitPosition -= 8; + } + result[i] = val; + } + return numValues; +} + +template <> +inline void unpack( + const uint8_t* inputBits, + uint64_t inputBufferLen, + uint64_t numValues, + uint8_t bitWidth, + uint8_t* result) { + ARROW_CHECK(bitWidth >= 1 && bitWidth <= 8); + ARROW_CHECK(inputBufferLen * 8 >= bitWidth * numValues); + +#if XSIMD_WITH_AVX2 + + uint64_t mask = kPdepMask8[bitWidth]; + auto writeEndOffset = result + numValues; + + // Process bitWidth bytes (8 values) a time. Note that for bitWidth 8, the + // performance of direct memcpy is about the same as this solution. + while (result + 8 <= writeEndOffset) { + // Using memcpy() here may result in non-optimized loops by clong. + uint64_t val = *reinterpret_cast(inputBits); + *(reinterpret_cast(result)) = _pdep_u64(val, mask); + inputBits += bitWidth; + result += 8; + } + + numValues = writeEndOffset - result; + unpackNaive( + inputBits, (bitWidth * numValues + 7) / 8, numValues, bitWidth, result); + +#else + + unpackNaive(inputBits, inputBufferLen, numValues, bitWidth, result); + +#endif +} + + + +} // namespace internal +} // namespace arrow diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 74278bc4a11..d69d979223e 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3 +Subproject commit d69d979223e883faef9dc6fe3cf573087243c28a From 81fe3a3a3f7c3c6442cf228564e3dc631b03f4f2 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 14:07:49 +0800 Subject: [PATCH 02/10] fix an error --- cpp/src/arrow/io/compressed.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index f4440124b17..b951d96c880 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -355,7 +356,10 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { // First try to read data from the decompressor - if (CompressedBufferAvailable() != 0) { + // This doesn't use `CompressedBufferAvailable()` because when compressed_ + // exists, and it doesn't contain any available data, it might trigger + // an empty decompress and set fresh_decompressor_ to true. + if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. RETURN_NOT_OK(decompressor_->Reset()); From cd87ec5e7ba498006eb90c979cf0e685b0b9af27 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 14:09:38 +0800 Subject: [PATCH 03/10] remove rubbish --- cpp/src/arrow/io/compressed.cc | 1 - cpp/src/arrow/util/deposit_bits.h | 250 ------------------------------ cpp/src/arrow/util/unpack8.h | 105 ------------- cpp/submodules/parquet-testing | 2 +- 4 files changed, 1 insertion(+), 357 deletions(-) delete mode 100644 cpp/src/arrow/util/deposit_bits.h delete mode 100644 cpp/src/arrow/util/unpack8.h diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index b951d96c880..0047b0be91b 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include diff --git a/cpp/src/arrow/util/deposit_bits.h b/cpp/src/arrow/util/deposit_bits.h deleted file mode 100644 index 07cd66482fc..00000000000 --- a/cpp/src/arrow/util/deposit_bits.h +++ /dev/null @@ -1,250 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "arrow/util/bit_run_reader.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/bitmap_writer.h" -#include "arrow/util/logging.h" -#include "arrow/util/simd.h" - -// clang-format off -/* Python code to generate lookup table: - -kLookupBits = 5 -count = 0 -print('constexpr int kLookupBits = {};'.format(kLookupBits)) -print('constexpr uint8_t kPdepTable[1 << kLookupBits][1 << kLookupBits] = {') -print(' ', end = '') -for mask in range(1 << kLookupBits): - for data in range(1 << kLookupBits): - bit_value = 0 - bit_len = 0 - for i in range(kLookupBits): - if mask & (1 << i): - bit_value |= (((data >> i) & 1) << bit_len) - bit_len += 1 - out = '0x{:02X},'.format(bit_value) - count += 1 - if count % (1 << kLookupBits) == 1: - print(' {') - if count % 8 == 1: - print(' ', end = '') - if count % 8 == 0: - print(out, end = '\n') - else: - print(out, end = ' ') - if count % (1 << kLookupBits) == 0: - print(' },', end = '') -print('\n};') - -*/ -// clang-format on - -constexpr int kLookupBits = 5; -constexpr uint8_t kPextTable[1 << kLookupBits][1 << kLookupBits] = { - { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, - 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, - 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, - 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, - 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, - 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, - 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, - 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, - 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, - 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, - 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, - 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, - 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, - 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, - 0x01, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, - 0x03, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, - 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, - 0x03, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, - 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, - 0x07, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, - 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, - 0x02, 0x03, 0x03, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, - 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, - 0x05, 0x06, 0x07, 0x06, 0x07, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, - 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, - 0x05, 0x06, 0x06, 0x07, 0x07, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, - 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, - 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, - 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, - 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, - 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, 0x02, 0x03, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, - 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, - 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, 0x02, 0x02, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, - 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, - 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, 0x04, 0x05, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, - 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, - 0x03, 0x03, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x00, 0x01, 0x00, - 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, - 0x06, 0x07, 0x04, 0x05, 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x00, 0x00, 0x01, - 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, - 0x07, 0x07, 0x04, 0x04, 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, 0x01, 0x02, - 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, - 0x0E, 0x0F, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, - 0x01, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, - 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, - 0x03, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, 0x05, 0x04, 0x05, - 0x04, 0x05, 0x06, 0x07, 0x06, 0x07, 0x06, 0x07, 0x06, 0x07, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, - 0x03, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, 0x05, 0x04, 0x04, - 0x05, 0x05, 0x06, 0x06, 0x07, 0x07, 0x06, 0x06, 0x07, 0x07, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, - 0x07, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x08, 0x09, - 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x0C, 0x0D, 0x0E, 0x0F, - }, - { - 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, - 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x04, 0x04, 0x04, 0x05, 0x05, - 0x05, 0x05, 0x06, 0x06, 0x06, 0x06, 0x07, 0x07, 0x07, 0x07, - }, - { - 0x00, 0x01, 0x00, 0x01, 0x02, 0x03, 0x02, 0x03, 0x04, 0x05, 0x04, - 0x05, 0x06, 0x07, 0x06, 0x07, 0x08, 0x09, 0x08, 0x09, 0x0A, 0x0B, - 0x0A, 0x0B, 0x0C, 0x0D, 0x0C, 0x0D, 0x0E, 0x0F, 0x0E, 0x0F, - }, - { - 0x00, 0x00, 0x01, 0x01, 0x02, 0x02, 0x03, 0x03, 0x04, 0x04, 0x05, - 0x05, 0x06, 0x06, 0x07, 0x07, 0x08, 0x08, 0x09, 0x09, 0x0A, 0x0A, - 0x0B, 0x0B, 0x0C, 0x0C, 0x0D, 0x0D, 0x0E, 0x0E, 0x0F, 0x0F, - }, - { - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, - 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, - 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, - }, -}; - -inline uint64_t DepositBitsSoftware(uint64_t bitmap, uint64_t mask) { - // A software emulation of _pdep_u64 - - // These checks should be inline and are likely to be common cases. - if (mask == ~uint64_t{0}) { - return bitmap; - } - if (mask == 0) { - return 0; - } - - // Fallback to lookup table method - uint64_t bit_value = 0; - int bit_len = 0; - constexpr uint8_t kLookupMask = (1U << kLookupBits) - 1; - while (mask != 0) { - const uint64_t value = kPdepTable[mask & kLookupMask][bitmap & kLookupMask]; - bit_value |= (value << bit_len); - bit_len += mask_len; - bitmap >>= kLookupBits; - select_bitmap >>= kLookupBits; - } - return bit_value; -} \ No newline at end of file diff --git a/cpp/src/arrow/util/unpack8.h b/cpp/src/arrow/util/unpack8.h deleted file mode 100644 index 1dc8fef74cd..00000000000 --- a/cpp/src/arrow/util/unpack8.h +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "arrow/util/endian.h" -#include "arrow/util/visibility.h" - -#include - -namespace arrow { -namespace internal { - - -static constexpr const uint64_t kPdepMask8[] = { - 0x0000000000000000, - 0x0101010101010101, - 0x0303030303030303, - 0x0707070707070707, - 0x0f0f0f0f0f0f0f0f, - 0x1f1f1f1f1f1f1f1f, - 0x3f3f3f3f3f3f3f3f, - 0x7f7f7f7f7f7f7f7f, - 0xffffffffffffffff}; - -template -static inline uint32_t unpackNaive( - const uint8_t* inputBits, - uint64_t inputBufferLen, - uint64_t numValues, - uint8_t bitWidth, - T* result) { - ARROW_CHECK(bitWidth >= 1 && bitWidth <= sizeof(T) * 8); - ARROW_CHECK(inputBufferLen * 8 >= bitWidth * numValues); - - auto mask = BITPACK_MASKS[bitWidth]; - - uint64_t bitPosition = 0; - for (uint32_t i = 0; i < numValues; i++) { - T val = (*inputBits >> bitPosition) & mask; - bitPosition += bitWidth; - while (bitPosition > 8) { - inputBits++; - val |= (*inputBits << (8 - (bitPosition - bitWidth))) & mask; - bitPosition -= 8; - } - result[i] = val; - } - return numValues; -} - -template <> -inline void unpack( - const uint8_t* inputBits, - uint64_t inputBufferLen, - uint64_t numValues, - uint8_t bitWidth, - uint8_t* result) { - ARROW_CHECK(bitWidth >= 1 && bitWidth <= 8); - ARROW_CHECK(inputBufferLen * 8 >= bitWidth * numValues); - -#if XSIMD_WITH_AVX2 - - uint64_t mask = kPdepMask8[bitWidth]; - auto writeEndOffset = result + numValues; - - // Process bitWidth bytes (8 values) a time. Note that for bitWidth 8, the - // performance of direct memcpy is about the same as this solution. - while (result + 8 <= writeEndOffset) { - // Using memcpy() here may result in non-optimized loops by clong. - uint64_t val = *reinterpret_cast(inputBits); - *(reinterpret_cast(result)) = _pdep_u64(val, mask); - inputBits += bitWidth; - result += 8; - } - - numValues = writeEndOffset - result; - unpackNaive( - inputBits, (bitWidth * numValues + 7) / 8, numValues, bitWidth, result); - -#else - - unpackNaive(inputBits, inputBufferLen, numValues, bitWidth, result); - -#endif -} - - - -} // namespace internal -} // namespace arrow diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index d69d979223e..74278bc4a11 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit d69d979223e883faef9dc6fe3cf573087243c28a +Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3 From 833826f8513e71a230e45f7aa82a53c2a56c5dcf Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 15:34:48 +0800 Subject: [PATCH 04/10] update comment --- cpp/src/arrow/io/compressed.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 0047b0be91b..59be4d9100c 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -354,10 +354,10 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { - // First try to read data from the decompressor + // First try to read data from the decompressor. // This doesn't use `CompressedBufferAvailable()` because when compressed_ - // exists, and it doesn't contain any available data, it might trigger - // an empty decompress and set fresh_decompressor_ to true. + // exists and available == 0, it might trigger an empty decompress and set + // `decompressor_->IsFinished()` to true. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. From 0adf757638d419a64407211fa9b3756e640a0d83 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 16:15:25 +0800 Subject: [PATCH 05/10] fix failed ci --- cpp/src/arrow/io/compressed.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 59be4d9100c..9bf3d69f2c8 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -300,7 +300,8 @@ class CompressedInputStream::Impl { // Decompress some data from the compressed_ buffer. // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - DCHECK_NE(0, CompressedBufferAvailable()); + // Currently, CompressedBufferAvailable() could be 0 in DecompressData(). + DCHECK_NE(compressed_->data(), nullptr); DCHECK_EQ(0, DecompressedBufferAvailable()); int64_t decompress_size = kDecompressSize; From 90b38ced69ca38fbbb76216c46634727da2f52b8 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 18:37:32 +0800 Subject: [PATCH 06/10] fix comment --- cpp/src/arrow/io/compressed.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 9bf3d69f2c8..28267e0c68d 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -269,7 +269,7 @@ class CompressedInputStream::Impl { // Read compressed data if necessary Status EnsureCompressedData() { - int64_t compressed_avail = CompressedBufferAvailable(); + int64_t compressed_avail = compressed_buffer_available(); if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. if (!supports_zero_copy_from_raw_) { @@ -300,9 +300,10 @@ class CompressedInputStream::Impl { // Decompress some data from the compressed_ buffer. // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - // Currently, CompressedBufferAvailable() could be 0 in DecompressData(). + // Currently, CompressedBufferAvailable() could be 0 in DecompressData() + // because `decompressor_` might have it's own internal buffer. DCHECK_NE(compressed_->data(), nullptr); - DCHECK_EQ(0, DecompressedBufferAvailable()); + DCHECK_EQ(0, decompressed_buffer_available()); int64_t decompress_size = kDecompressSize; @@ -357,8 +358,8 @@ class CompressedInputStream::Impl { Status RefillDecompressed(bool* has_data) { // First try to read data from the decompressor. // This doesn't use `CompressedBufferAvailable()` because when compressed_ - // exists and available == 0, it might trigger an empty decompress and set - // `decompressor_->IsFinished()` to true. + // exists and available == 0, the decompressor might still have data to + // be read. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. @@ -367,12 +368,12 @@ class CompressedInputStream::Impl { } RETURN_NOT_OK(DecompressData()); } - int64_t decompress_avail = DecompressedBufferAvailable(); + int64_t decompress_avail = decompressed_buffer_available(); if (decompress_avail == 0) { // Got nothing from existing `compressed_`, need to read // more compressed data RETURN_NOT_OK(EnsureCompressedData()); - if (CompressedBufferAvailable() == 0) { + if (compressed_buffer_available() == 0) { // No more data to decompress if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); @@ -420,11 +421,11 @@ class CompressedInputStream::Impl { const std::shared_ptr& raw() const { return raw_; } private: - int64_t CompressedBufferAvailable() const { + int64_t compressed_buffer_available() const { return compressed_ ? compressed_->size() - compressed_pos_ : 0; } - int64_t DecompressedBufferAvailable() const { + int64_t decompressed_buffer_available() const { return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0; } From 514403660d93487646b7df13fc5180b82a568bce Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 20:00:06 +0800 Subject: [PATCH 07/10] fix comment --- cpp/src/arrow/io/compressed.cc | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 28267e0c68d..7e4b9782aa8 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -297,11 +297,11 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Decompress some data from the compressed_ buffer. + // Decompress some data from the compressed_ buffer and decompressor_. // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - // Currently, CompressedBufferAvailable() could be 0 in DecompressData() - // because `decompressor_` might have it's own internal buffer. + // Currently, compressed_buffer_available() could be 0 in DecompressData() + // because `decompressor_` might have its own internal buffer. DCHECK_NE(compressed_->data(), nullptr); DCHECK_EQ(0, decompressed_buffer_available()); @@ -357,9 +357,8 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { // First try to read data from the decompressor. - // This doesn't use `CompressedBufferAvailable()` because when compressed_ - // exists and available == 0, the decompressor might still have data to - // be read. + // This doesn't use `compressed_buffer_available()` because when compressed_ + // exists, even the decompressor might still have data to be read. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. @@ -370,8 +369,8 @@ class CompressedInputStream::Impl { } int64_t decompress_avail = decompressed_buffer_available(); if (decompress_avail == 0) { - // Got nothing from existing `compressed_`, need to read - // more compressed data + // Got nothing from existing `compressed_` and `decompressor_`, + // need to read more compressed data. RETURN_NOT_OK(EnsureCompressedData()); if (compressed_buffer_available() == 0) { // No more data to decompress From 582e9d4c11ace27d7a4af54b0af35711e8113405 Mon Sep 17 00:00:00 2001 From: mwish <1506118561@qq.com> Date: Wed, 10 Apr 2024 23:29:37 +0800 Subject: [PATCH 08/10] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/arrow/io/compressed.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 7e4b9782aa8..386af535ea9 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -297,11 +297,12 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Decompress some data from the compressed_ buffer and decompressor_. + // Decompress some data from the compressed_ buffer into decompressor_. // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - // Currently, compressed_buffer_available() could be 0 in DecompressData() - // because `decompressor_` might have its own internal buffer. + // compressed_buffer_available() could be 0 here because there might + // still be some decompressed data left to emit even though the compressed + // data was entirely consumed (especially if the expansion factor is large) DCHECK_NE(compressed_->data(), nullptr); DCHECK_EQ(0, decompressed_buffer_available()); @@ -356,9 +357,8 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { - // First try to read data from the decompressor. - // This doesn't use `compressed_buffer_available()` because when compressed_ - // exists, even the decompressor might still have data to be read. + // First try to read data from the decompressor, unless we haven't read any + // compressed data yet. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. From a740df334244a0a43fca0c00db4740ff6756bd03 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 11 Apr 2024 23:55:34 +0800 Subject: [PATCH 09/10] use Result rather than bool* and Status --- cpp/src/arrow/io/compressed.cc | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 386af535ea9..86161d1eed2 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -356,9 +356,7 @@ class CompressedInputStream::Impl { } // Try to feed more data into the decompressed_ buffer. - Status RefillDecompressed(bool* has_data) { - // First try to read data from the decompressor, unless we haven't read any - // compressed data yet. + Result RefillDecompressed() { if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. @@ -377,13 +375,11 @@ class CompressedInputStream::Impl { if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); } - *has_data = false; - return Status::OK(); + return false; } RETURN_NOT_OK(DecompressData()); } - *has_data = true; - return Status::OK(); + return true; } Result Read(int64_t nbytes, void* out) { @@ -401,7 +397,7 @@ class CompressedInputStream::Impl { // At this point, no more decompressed data remains, so we need to // decompress more - RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data)); + ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed()); } total_pos_ += total_read; From 084d458f1c5a284596f3282fad7ca94e7359870c Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 12 Apr 2024 01:25:09 +0800 Subject: [PATCH 10/10] add comment for Refill return value --- cpp/src/arrow/io/compressed.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 86161d1eed2..6a6fbf40f96 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -356,7 +356,10 @@ class CompressedInputStream::Impl { } // Try to feed more data into the decompressed_ buffer. + // Returns whether there is more data to read. Result RefillDecompressed() { + // First try to read data from the decompressor, unless we haven't read any + // compressed data yet. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream.