diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index 041d5110834..f7afbca5580 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -43,5 +43,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT (${ARROW_SIMD_LEVEL} STREQU add_arrow_benchmark(memory_benchmark PREFIX "arrow-io") endif() +add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io") + # Headers: top level arrow_install_all_headers("arrow/io") diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 6c484242a4f..d06101748dc 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -201,7 +201,7 @@ Result> CompressedOutputStream::Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool) { // CAUTION: codec is not owned std::shared_ptr res(new CompressedOutputStream); - res->impl_.reset(new Impl(pool, std::move(raw))); + res->impl_.reset(new Impl(pool, raw)); RETURN_NOT_OK(res->impl_->Init(codec)); return res; } @@ -233,8 +233,10 @@ class CompressedInputStream::Impl { : pool_(pool), raw_(raw), is_open_(true), + supports_zero_copy_from_raw_(raw_->supports_zero_copy()), compressed_pos_(0), decompressed_pos_(0), + fresh_decompressor_(false), total_pos_(0) {} Status Init(Codec* codec) { @@ -261,7 +263,7 @@ class CompressedInputStream::Impl { } } - bool closed() { return !is_open_; } + bool closed() const { return !is_open_; } Result Tell() const { return total_pos_; } @@ -269,8 +271,27 @@ class CompressedInputStream::Impl { Status EnsureCompressedData() { int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; if (compressed_avail == 0) { - // No compressed data available, read a full chunk - ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize)); + // Ensure compressed_ buffer is allocated with kChunkSize. + if (!supports_zero_copy_from_raw_) { + if (compressed_for_non_zero_copy_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_, + AllocateResizableBuffer(kChunkSize, pool_)); + } else if (compressed_for_non_zero_copy_->size() != kChunkSize) { + RETURN_NOT_OK( + compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false)); + } + ARROW_ASSIGN_OR_RAISE( + int64_t read_size, + raw_->Read(kChunkSize, + compressed_for_non_zero_copy_->mutable_data_as())); + if (read_size != compressed_for_non_zero_copy_->size()) { + RETURN_NOT_OK( + compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false)); + } + compressed_ = compressed_for_non_zero_copy_; + } else { + ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize)); + } compressed_pos_ = 0; } return Status::OK(); @@ -284,8 +305,13 @@ class CompressedInputStream::Impl { int64_t decompress_size = kDecompressSize; while (true) { - ARROW_ASSIGN_OR_RAISE(decompressed_, - AllocateResizableBuffer(decompress_size, pool_)); + if (decompressed_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(decompressed_, + AllocateResizableBuffer(decompress_size, pool_)); + } else { + // Shrinking the buffer if it's already large enough + RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/true)); + } decompressed_pos_ = 0; int64_t input_len = compressed_->size() - compressed_pos_; @@ -300,7 +326,9 @@ class CompressedInputStream::Impl { fresh_decompressor_ = false; } if (result.bytes_written > 0 || !result.need_more_output || input_len == 0) { - RETURN_NOT_OK(decompressed_->Resize(result.bytes_written)); + // Not calling shrink_to_fit here because we're likely to reusing the buffer. + RETURN_NOT_OK( + decompressed_->Resize(result.bytes_written, /*shrink_to_fit=*/false)); break; } DCHECK_EQ(result.bytes_written, 0); @@ -310,7 +338,7 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Read a given number of bytes from the decompressed_ buffer. + // Copying a given number of bytes from the decompressed_ buffer. int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) { int64_t readable = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0; int64_t read_bytes = std::min(readable, nbytes); @@ -318,11 +346,6 @@ class CompressedInputStream::Impl { if (read_bytes > 0) { memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes); decompressed_pos_ += read_bytes; - - if (decompressed_pos_ == decompressed_->size()) { - // Decompressed data is exhausted, release buffer - decompressed_.reset(); - } } return read_bytes; @@ -357,7 +380,7 @@ class CompressedInputStream::Impl { } Result Read(int64_t nbytes, void* out) { - auto out_data = reinterpret_cast(out); + auto* out_data = reinterpret_cast(out); int64_t total_read = 0; bool decompressor_has_data = true; @@ -382,10 +405,10 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - return std::move(buf); + return buf; } - std::shared_ptr raw() const { return raw_; } + const std::shared_ptr& raw() const { return raw_; } private: // Read 64 KB compressed data at a time @@ -396,7 +419,12 @@ class CompressedInputStream::Impl { MemoryPool* pool_; std::shared_ptr raw_; bool is_open_; + const bool supports_zero_copy_from_raw_; std::shared_ptr decompressor_; + // If `raw_->supports_zero_copy()`, this buffer would not allocate memory. + // Otherwise, this buffer would allocate `kChunkSize` memory and read data from + // `raw_`. + std::shared_ptr compressed_for_non_zero_copy_; std::shared_ptr compressed_; // Position in compressed buffer int64_t compressed_pos_; @@ -413,10 +441,9 @@ Result> CompressedInputStream::Make( Codec* codec, const std::shared_ptr& raw, MemoryPool* pool) { // CAUTION: codec is not owned std::shared_ptr res(new CompressedInputStream); - res->impl_.reset(new Impl(pool, std::move(raw))); + res->impl_.reset(new Impl(pool, raw)); RETURN_NOT_OK(res->impl_->Init(codec)); return res; - return Status::OK(); } CompressedInputStream::~CompressedInputStream() { internal::CloseFromDestructor(this); } diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h index cd1a7f673ce..6b4e7ab4d72 100644 --- a/cpp/src/arrow/io/compressed.h +++ b/cpp/src/arrow/io/compressed.h @@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream { ~CompressedOutputStream() override; /// \brief Create a compressed output stream wrapping the given output stream. + /// + /// The codec must be capable of streaming compression. Some codecs, + /// like Snappy, are not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); @@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream ~CompressedInputStream() override; /// \brief Create a compressed input stream wrapping the given input stream. + /// + /// The codec must be capable of streaming decompression. Some codecs, + /// like Snappy, are not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc new file mode 100644 index 00000000000..52a30d8cb08 --- /dev/null +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/io/compressed.h" +#include "arrow/io/memory.h" +#include "arrow/result.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/compression.h" +#include "arrow/util/config.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" + +namespace arrow::io { + +using ::arrow::Compression; + +std::vector MakeCompressibleData(int data_size) { + // XXX This isn't a real-world corpus so doesn't really represent the + // comparative qualities of the algorithms + + // First make highly compressible data + std::string base_data = + "Apache Arrow is a cross-language development platform for in-memory data"; + int nrepeats = static_cast(1 + data_size / base_data.size()); + + std::vector data(base_data.size() * nrepeats); + for (int i = 0; i < nrepeats; ++i) { + std::memcpy(data.data() + i * base_data.size(), base_data.data(), base_data.size()); + } + data.resize(data_size); + + // Then randomly mutate some bytes so as to make things harder + std::mt19937 engine(42); + std::exponential_distribution<> offsets(0.05); + std::uniform_int_distribution<> values(0, 255); + + int64_t pos = 0; + while (pos < data_size) { + data[pos] = static_cast(values(engine)); + pos += static_cast(offsets(engine)); + } + + return data; +} + +// Using a non-zero copy buffer reader to benchmark the non-zero copy path. +class NonZeroCopyBufferReader final : public InputStream { + public: + NonZeroCopyBufferReader(std::shared_ptr buffer) : reader_(std::move(buffer)) {} + + bool supports_zero_copy() const override { return false; } + + Result Read(int64_t nbytes, void* out) override { + return reader_.Read(nbytes, out); + } + + Result> Read(int64_t nbytes) override { + // Testing the non-zero copy path like reading from local file or Object store, + // so we need to allocate a buffer and copy the data. + ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data())); + ARROW_RETURN_NOT_OK(buf->Resize(size)); + return buf; + } + Status Close() override { return reader_.Close(); } + Result Tell() const override { return reader_.Tell(); } + bool closed() const override { return reader_.closed(); } + + private: + ::arrow::io::BufferReader reader_; +}; + +enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee }; + +template +static void CompressedInputStreamBenchmark(::benchmark::State& state, + Compression::type compression) { + const int64_t input_size = state.range(0); + const int64_t batch_size = state.range(1); + + const std::vector data = MakeCompressibleData(static_cast(input_size)); + auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie(); + int64_t max_compress_len = + codec->MaxCompressedLen(static_cast(data.size()), data.data()); + std::shared_ptr<::arrow::ResizableBuffer> buf = + ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie(); + const int64_t compressed_length = + codec + ->Compress(static_cast(data.size()), data.data(), max_compress_len, + buf->mutable_data()) + .ValueOrDie(); + ABORT_NOT_OK(buf->Resize(compressed_length)); + for (auto _ : state) { + state.PauseTiming(); + auto reader = std::make_shared(buf); + [[maybe_unused]] std::unique_ptr read_buffer; + if constexpr (Mode == BufferReadMode::ProvidedByCaller) { + read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie(); + } + state.ResumeTiming(); + // Put `CompressedInputStream::Make` in timing. + auto input_stream = + ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); + auto remaining_size = input_size; + while (remaining_size > 0) { + if constexpr (Mode == BufferReadMode::ProvidedByCaller) { + auto value = input_stream->Read(batch_size, read_buffer->mutable_data()); + ABORT_NOT_OK(value); + remaining_size -= value.ValueOrDie(); + } else { + auto value = input_stream->Read(batch_size); + ABORT_NOT_OK(value); + remaining_size -= value.ValueOrDie()->size(); + } + } + } + state.SetBytesProcessed(input_size * state.iterations()); +} + +template +static void CompressedInputStreamZeroCopyBufferProvidedByCaller( + ::benchmark::State& state) { + CompressedInputStreamBenchmark<::arrow::io::BufferReader, + BufferReadMode::ProvidedByCaller>(state, kCompression); +} + +template +static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller( + ::benchmark::State& state) { + CompressedInputStreamBenchmark(state, kCompression); +} + +template +static void CompressedInputStreamZeroCopyBufferReturnedByCallee( + ::benchmark::State& state) { + CompressedInputStreamBenchmark<::arrow::io::BufferReader, + BufferReadMode::ReturnedByCallee>(state, kCompression); +} + +template +static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee( + ::benchmark::State& state) { + CompressedInputStreamBenchmark(state, kCompression); +} + +static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { + b->ArgNames({"num_bytes", "batch_size"}) + ->Args({8 * 1024, 8 * 1024}) + ->Args({64 * 1024, 8 * 1024}) + ->Args({64 * 1024, 64 * 1024}) + ->Args({1024 * 1024, 8 * 1024}) + ->Args({1024 * 1024, 64 * 1024}) + ->Args({1024 * 1024, 1024 * 1024}); +} + +#ifdef ARROW_WITH_LZ4 +// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on the +// overhead of the compression input stream. +BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller, + Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller, + Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee, + Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee, + Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +#endif + +} // namespace arrow::io