From 76341d52caf1d29b1b57aab6bae580ef7f43f9d0 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 26 Jan 2024 22:42:22 +0800 Subject: [PATCH 01/15] Reuse same buffer in CompressedInputStream --- cpp/src/arrow/io/compressed.cc | 38 +++++++++++++++++++++++++--------- cpp/src/arrow/io/compressed.h | 6 ++++++ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 6c484242a4f..f9573613c41 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -235,6 +235,7 @@ class CompressedInputStream::Impl { is_open_(true), compressed_pos_(0), decompressed_pos_(0), + fresh_decompressor_(false), total_pos_(0) {} Status Init(Codec* codec) { @@ -261,7 +262,7 @@ class CompressedInputStream::Impl { } } - bool closed() { return !is_open_; } + bool closed() const { return !is_open_; } Result Tell() const { return total_pos_; } @@ -269,13 +270,31 @@ class CompressedInputStream::Impl { Status EnsureCompressedData() { int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; if (compressed_avail == 0) { + // Ensure compressed_ buffer is allocated with kChunkSize. 
+ if (compressed_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize, pool_)); + } else { + RETURN_NOT_OK(compressed_->Resize(kChunkSize, /*shrink_to_fit=*/false)); + } // No compressed data available, read a full chunk - ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize)); + ARROW_ASSIGN_OR_RAISE(int64_t read_size, + raw_->Read(kChunkSize, compressed_->mutable_data_as())); + RETURN_NOT_OK(compressed_->Resize(read_size, /*shrink_to_fit=*/false)); compressed_pos_ = 0; } return Status::OK(); } + Result> GetDecompressBuffer(int64_t decompress_size) { + if (decompressed_impl_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(decompressed_impl_, + AllocateResizableBuffer(decompress_size, pool_)); + } else { + RETURN_NOT_OK(decompressed_impl_->Resize(decompress_size, /*shrink_to_fit=*/false)); + } + return decompressed_impl_; + } + // Decompress some data from the compressed_ buffer. // Call this function only if the decompressed_ buffer is empty. Status DecompressData() { @@ -284,8 +303,7 @@ class CompressedInputStream::Impl { int64_t decompress_size = kDecompressSize; while (true) { - ARROW_ASSIGN_OR_RAISE(decompressed_, - AllocateResizableBuffer(decompress_size, pool_)); + ARROW_ASSIGN_OR_RAISE(decompressed_, GetDecompressBuffer(decompress_size)); decompressed_pos_ = 0; int64_t input_len = compressed_->size() - compressed_pos_; @@ -310,7 +328,7 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Read a given number of bytes from the decompressed_ buffer. + // Copying a given number of bytes from the decompressed_ buffer. int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) { int64_t readable = decompressed_ ? 
(decompressed_->size() - decompressed_pos_) : 0; int64_t read_bytes = std::min(readable, nbytes); @@ -357,7 +375,7 @@ class CompressedInputStream::Impl { } Result Read(int64_t nbytes, void* out) { - auto out_data = reinterpret_cast(out); + auto* out_data = reinterpret_cast(out); int64_t total_read = 0; bool decompressor_has_data = true; @@ -382,10 +400,10 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - return std::move(buf); + return buf; } - std::shared_ptr raw() const { return raw_; } + const std::shared_ptr& raw() const { return raw_; } private: // Read 64 KB compressed data at a time @@ -397,10 +415,11 @@ class CompressedInputStream::Impl { std::shared_ptr raw_; bool is_open_; std::shared_ptr decompressor_; - std::shared_ptr compressed_; + std::shared_ptr compressed_; // Position in compressed buffer int64_t compressed_pos_; std::shared_ptr decompressed_; + std::shared_ptr decompressed_impl_; // Position in decompressed buffer int64_t decompressed_pos_; // True if the decompressor hasn't read any data yet. @@ -416,7 +435,6 @@ Result> CompressedInputStream::Make( res->impl_.reset(new Impl(pool, std::move(raw))); RETURN_NOT_OK(res->impl_->Init(codec)); return res; - return Status::OK(); } CompressedInputStream::~CompressedInputStream() { internal::CloseFromDestructor(this); } diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h index cd1a7f673ce..1f487510537 100644 --- a/cpp/src/arrow/io/compressed.h +++ b/cpp/src/arrow/io/compressed.h @@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream { ~CompressedOutputStream() override; /// \brief Create a compressed output stream wrapping the given output stream. + /// + /// The codec must be able to streaming compress the data. Some codecs, + /// like snapppy, is not able to do so. 
static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); @@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream ~CompressedInputStream() override; /// \brief Create a compressed input stream wrapping the given input stream. + /// + /// The codec must be able to streaming decompress the data. Some codecs, + /// like snapppy, is not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); From 6f9d865df3e5e8d3d1a167ba93b92a97ac3cd4f6 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 30 Jan 2024 22:33:27 +0800 Subject: [PATCH 02/15] Resolve comments --- cpp/src/arrow/io/compressed.cc | 55 ++++++++++++++++++---------------- cpp/src/arrow/io/compressed.h | 8 ++--- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index f9573613c41..6d4c432ef71 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -233,6 +233,7 @@ class CompressedInputStream::Impl { : pool_(pool), raw_(raw), is_open_(true), + supports_zero_copy_from_raw_(raw->supports_zero_copy()), compressed_pos_(0), decompressed_pos_(0), fresh_decompressor_(false), @@ -271,30 +272,30 @@ class CompressedInputStream::Impl { int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. 
- if (compressed_ == nullptr) { - ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize, pool_)); + if (supports_zero_copy_from_raw_) { + if (compressed_for_non_zero_copy_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_, + AllocateResizableBuffer(kChunkSize, pool_)); + } else { + RETURN_NOT_OK( + compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false)); + } + compressed_ = nullptr; + ARROW_ASSIGN_OR_RAISE( + int64_t read_size, + raw_->Read(kChunkSize, + compressed_for_non_zero_copy_->mutable_data_as())); + RETURN_NOT_OK( + compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false)); + compressed_ = compressed_for_non_zero_copy_; } else { - RETURN_NOT_OK(compressed_->Resize(kChunkSize, /*shrink_to_fit=*/false)); + ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize)); } - // No compressed data available, read a full chunk - ARROW_ASSIGN_OR_RAISE(int64_t read_size, - raw_->Read(kChunkSize, compressed_->mutable_data_as())); - RETURN_NOT_OK(compressed_->Resize(read_size, /*shrink_to_fit=*/false)); compressed_pos_ = 0; } return Status::OK(); } - Result> GetDecompressBuffer(int64_t decompress_size) { - if (decompressed_impl_ == nullptr) { - ARROW_ASSIGN_OR_RAISE(decompressed_impl_, - AllocateResizableBuffer(decompress_size, pool_)); - } else { - RETURN_NOT_OK(decompressed_impl_->Resize(decompress_size, /*shrink_to_fit=*/false)); - } - return decompressed_impl_; - } - // Decompress some data from the compressed_ buffer. // Call this function only if the decompressed_ buffer is empty. 
Status DecompressData() { @@ -303,7 +304,12 @@ class CompressedInputStream::Impl { int64_t decompress_size = kDecompressSize; while (true) { - ARROW_ASSIGN_OR_RAISE(decompressed_, GetDecompressBuffer(decompress_size)); + if (decompressed_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(decompressed_, + AllocateResizableBuffer(decompress_size, pool_)); + } else { + RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/false)); + } decompressed_pos_ = 0; int64_t input_len = compressed_->size() - compressed_pos_; @@ -336,11 +342,6 @@ class CompressedInputStream::Impl { if (read_bytes > 0) { memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes); decompressed_pos_ += read_bytes; - - if (decompressed_pos_ == decompressed_->size()) { - // Decompressed data is exhausted, release buffer - decompressed_.reset(); - } } return read_bytes; @@ -414,12 +415,16 @@ class CompressedInputStream::Impl { MemoryPool* pool_; std::shared_ptr raw_; bool is_open_; + bool supports_zero_copy_from_raw_; std::shared_ptr decompressor_; - std::shared_ptr compressed_; + // If `raw_->supports_zero_copy()`, this buffer would not allocate memory. + // Otherwise, this buffer would allocate `kChunkSize` memory and read data from + // `raw_`. + std::shared_ptr compressed_for_non_zero_copy_; + std::shared_ptr compressed_; // Position in compressed buffer int64_t compressed_pos_; std::shared_ptr decompressed_; - std::shared_ptr decompressed_impl_; // Position in decompressed buffer int64_t decompressed_pos_; // True if the decompressor hasn't read any data yet. diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h index 1f487510537..470d4db8e4d 100644 --- a/cpp/src/arrow/io/compressed.h +++ b/cpp/src/arrow/io/compressed.h @@ -45,8 +45,8 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream { /// \brief Create a compressed output stream wrapping the given output stream. /// - /// The codec must be able to streaming compress the data. 
Some codecs, - /// like snapppy, is not able to do so. + /// The codec must be capable of streaming compression. Some codecs, + /// like Snappy, are not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); @@ -86,8 +86,8 @@ class ARROW_EXPORT CompressedInputStream /// \brief Create a compressed input stream wrapping the given input stream. /// - /// The codec must be able to streaming decompress the data. Some codecs, - /// like snapppy, is not able to do so. + /// The codec must be capable of streaming decompression. Some codecs, + /// like Snappy, are not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, MemoryPool* pool = default_memory_pool()); From fb2bb77ff0748c6670ab5c015a0ff5e2f0179ce3 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 31 Jan 2024 01:05:21 +0800 Subject: [PATCH 03/15] Fix inverted zero-copy condition in EnsureCompressedData --- cpp/src/arrow/io/compressed.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 6d4c432ef71..9e467f3bc8b 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -272,7 +272,7 @@ class CompressedInputStream::Impl { int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. 
- if (supports_zero_copy_from_raw_) { + if (!supports_zero_copy_from_raw_) { if (compressed_for_non_zero_copy_ == nullptr) { ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_, AllocateResizableBuffer(kChunkSize, pool_)); From de100e904381780db38f934d089037e266a71304 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 6 Feb 2024 00:53:13 +0800 Subject: [PATCH 04/15] Add benchmark for compress input --- cpp/src/arrow/io/CMakeLists.txt | 2 + cpp/src/arrow/io/compressed_benchmark.cc | 118 +++++++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 cpp/src/arrow/io/compressed_benchmark.cc diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index d8224192ce0..178258cd408 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -42,5 +42,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT (${ARROW_SIMD_LEVEL} STREQU add_arrow_benchmark(memory_benchmark PREFIX "arrow-io") endif() +add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io") + # Headers: top level arrow_install_all_headers("arrow/io") diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc new file mode 100644 index 00000000000..aeecec090c6 --- /dev/null +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/io/memory.h" +#include "arrow/result.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/compression.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "compressed.h" + +namespace arrow::io { + +std::vector MakeCompressibleData(int data_size) { + // XXX This isn't a real-world corpus so doesn't really represent the + // comparative qualities of the algorithms + + // First make highly compressible data + std::string base_data = + "Apache Arrow is a cross-language development platform for in-memory data"; + int nrepeats = static_cast(1 + data_size / base_data.size()); + + std::vector data(base_data.size() * nrepeats); + for (int i = 0; i < nrepeats; ++i) { + std::memcpy(data.data() + i * base_data.size(), base_data.data(), base_data.size()); + } + data.resize(data_size); + + // Then randomly mutate some bytes so as to make things harder + std::mt19937 engine(42); + std::exponential_distribution<> offsets(0.05); + std::uniform_int_distribution<> values(0, 255); + + int64_t pos = 0; + while (pos < data_size) { + data[pos] = static_cast(values(engine)); + pos += static_cast(offsets(engine)); + } + + return data; +} + +class NonZeroCopyBufferReader final : public BufferReader { + public: + using BufferReader::BufferReader; + + bool supports_zero_copy() const override { return false; } +}; + +template +static void CompressionInputBenchmark(::benchmark::State& state) 
{ + auto data = MakeCompressibleData(state.range(0)); + auto codec = ::arrow::util::Codec::Create(COMPRESSION).ValueOrDie(); + auto max_compress_len = codec->MaxCompressedLen(data.size(), data.data()); + std::shared_ptr<::arrow::ResizableBuffer> buf = + ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie(); + int64_t length = + codec->Compress(data.size(), data.data(), max_compress_len, buf->mutable_data()) + .ValueOrDie(); + ABORT_NOT_OK(buf->Resize(length)); + for (auto _ : state) { + state.PauseTiming(); + auto reader = std::make_shared(buf); + auto inputStream = + ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); + state.ResumeTiming(); + ABORT_NOT_OK(inputStream->Read(data.size(), data.data())); + } + state.SetBytesProcessed(length * state.iterations()); +} + +template +static void CompressionInputZeroCopyBenchmark(::benchmark::State& state) { + CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION>(state); +} + +template +static void CompressionInputNonZeroCopyBenchmark(::benchmark::State& state) { + CompressionInputBenchmark(state); +} + +static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { + b->ArgName("InputBytes")->Arg(8 * 1024)->Arg(64 * 1024)->Arg(1024 * 1024); +} + +#ifdef ARROW_WITH_ZLIB +BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::GZIP) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::GZIP) + ->Apply(CompressedInputArguments); +#endif + +} // namespace arrow::io From 4a1906146e4e1c8b72bb195faf48cf3d5c351ae3 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 6 Feb 2024 01:44:26 +0800 Subject: [PATCH 05/15] re-impl NonZeroCopyBufferReader --- cpp/src/arrow/io/compressed.cc | 2 +- cpp/src/arrow/io/compressed_benchmark.cc | 21 +++++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 
9e467f3bc8b..1ad8b15038b 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -415,7 +415,7 @@ class CompressedInputStream::Impl { MemoryPool* pool_; std::shared_ptr raw_; bool is_open_; - bool supports_zero_copy_from_raw_; + const bool supports_zero_copy_from_raw_; std::shared_ptr decompressor_; // If `raw_->supports_zero_copy()`, this buffer would not allocate memory. // Otherwise, this buffer would allocate `kChunkSize` memory and read data from diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index aeecec090c6..f6cabd19190 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -65,11 +65,28 @@ std::vector MakeCompressibleData(int data_size) { return data; } -class NonZeroCopyBufferReader final : public BufferReader { +class NonZeroCopyBufferReader final : public InputStream { public: - using BufferReader::BufferReader; + NonZeroCopyBufferReader(std::shared_ptr buffer) : reader_(std::move(buffer)) {} bool supports_zero_copy() const override { return false; } + + Result Read(int64_t nbytes, void* out) override { + return reader_.Read(nbytes, out); + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data())); + ARROW_RETURN_NOT_OK(buf->Resize(size)); + return buf; + } + Status Close() override { return reader_.Close(); } + Result Tell() const override { return reader_.Tell(); } + bool closed() const override { return reader_.closed(); } + + private: + ::arrow::io::BufferReader reader_; }; template From 86a2a622394d932886984b0dea322649c9ec9377 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 8 Feb 2024 13:10:49 +0800 Subject: [PATCH 06/15] DatasetWriter: Check num_rows() before allocate batch --- cpp/src/arrow/dataset/dataset_writer.cc | 45 +++++++++++++------------ 1 file changed, 23 insertions(+), 22 
deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index ae9fb36484b..4441a83cd2b 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -610,31 +610,32 @@ class DatasetWriter::DatasetWriterImpl { bool will_open_file = false; ARROW_ASSIGN_OR_RAISE(auto next_chunk, dir_queue->NextWritableChunk( batch, &remainder, &will_open_file)); - - backpressure = - writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); - if (!backpressure.is_finished()) { - EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); - break; - } - if (will_open_file) { - backpressure = writer_state_.open_files_throttle.Acquire(1); + if (next_chunk->num_rows() > 0) { + backpressure = + writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { - EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); - writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); - RETURN_NOT_OK(TryCloseLargestFile()); + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); break; } - } - auto s = dir_queue->StartWrite(next_chunk); - if (!s.ok()) { - // If `StartWrite` succeeded, it will Release the - // `rows_in_flight_throttle` when the write task is finished. - // - // `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue` - // so we don't need to release it here. 
- writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); - return s; + if (will_open_file) { + backpressure = writer_state_.open_files_throttle.Acquire(1); + if (!backpressure.is_finished()) { + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); + writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); + RETURN_NOT_OK(TryCloseLargestFile()); + break; + } + } + auto s = dir_queue->StartWrite(next_chunk); + if (!s.ok()) { + // If `StartWrite` succeeded, it will Release the + // `rows_in_flight_throttle` when the write task is finished. + // + // `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue` + // so we don't need to release it here. + writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); + return s; + } } batch = std::move(remainder); if (batch) { From 7cb23a315364901320b916f9b7f25037ef98d561 Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 18 Mar 2024 11:34:07 +0800 Subject: [PATCH 07/15] Update testing impl --- cpp/src/arrow/io/compressed_benchmark.cc | 40 +++++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index f6cabd19190..9385bf16a93 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -65,6 +65,7 @@ std::vector MakeCompressibleData(int data_size) { return data; } +// Using a non-zero copy buffer reader to benchmark the non-zero copy path. class NonZeroCopyBufferReader final : public InputStream { public: NonZeroCopyBufferReader(std::shared_ptr buffer) : reader_(std::move(buffer)) {} @@ -76,6 +77,8 @@ class NonZeroCopyBufferReader final : public InputStream { } Result> Read(int64_t nbytes) override { + // Testing the non-zero copy path like reading from local file or Object store, + // so we need to allocate a buffer and copy the data. 
ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes)); ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data())); ARROW_RETURN_NOT_OK(buf->Resize(size)); @@ -91,9 +94,10 @@ class NonZeroCopyBufferReader final : public InputStream { template static void CompressionInputBenchmark(::benchmark::State& state) { - auto data = MakeCompressibleData(state.range(0)); + std::vector data = MakeCompressibleData(/*data_size=*/state.range(0)); + int64_t per_read_bytes = state.range(1); auto codec = ::arrow::util::Codec::Create(COMPRESSION).ValueOrDie(); - auto max_compress_len = codec->MaxCompressedLen(data.size(), data.data()); + int64_t max_compress_len = codec->MaxCompressedLen(data.size(), data.data()); std::shared_ptr<::arrow::ResizableBuffer> buf = ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie(); int64_t length = @@ -103,10 +107,16 @@ static void CompressionInputBenchmark(::benchmark::State& state) { for (auto _ : state) { state.PauseTiming(); auto reader = std::make_shared(buf); - auto inputStream = + auto input_stream = ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); + auto read_buffer = ::arrow::AllocateBuffer(per_read_bytes).ValueOrDie(); state.ResumeTiming(); - ABORT_NOT_OK(inputStream->Read(data.size(), data.data())); + int64_t remaining_size = data.size(); + while (remaining_size > 0) { + auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data()); + ABORT_NOT_OK(value); + remaining_size -= value.ValueOrDie(); + } } state.SetBytesProcessed(length * state.iterations()); } @@ -122,7 +132,13 @@ static void CompressionInputNonZeroCopyBenchmark(::benchmark::State& state) { } static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { - b->ArgName("InputBytes")->Arg(8 * 1024)->Arg(64 * 1024)->Arg(1024 * 1024); + b->ArgNames({"InputBytes", "PerReadBytes"}) + ->Args({8 * 1024, 8 * 1024}) + ->Args({64 * 1024, 8 * 1024}) + ->Args({64 * 1024, 64 * 1024}) + ->Args({1024 
* 1024, 8 * 1024}) + ->Args({1024 * 1024, 64 * 1024}) + ->Args({1024 * 1024, 1024 * 1024}); } #ifdef ARROW_WITH_ZLIB @@ -132,4 +148,18 @@ BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::G ->Apply(CompressedInputArguments); #endif +#ifdef ARROW_WITH_ZSTD +BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::ZSTD) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::ZSTD) + ->Apply(CompressedInputArguments); +#endif + +#ifdef ARROW_WITH_LZ4 +BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME) + ->Apply(CompressedInputArguments); +#endif + } // namespace arrow::io From 0521d1fbd617354e6b5a87f79abb7085d2861644 Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 18 Mar 2024 11:50:51 +0800 Subject: [PATCH 08/15] Reducing the calling to ResizableBuffer::Resize --- cpp/src/arrow/io/compressed.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 1ad8b15038b..a526d0d8567 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -276,17 +276,21 @@ class CompressedInputStream::Impl { if (compressed_for_non_zero_copy_ == nullptr) { ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_, AllocateResizableBuffer(kChunkSize, pool_)); - } else { + } else if (compressed_for_non_zero_copy_->size() != kChunkSize) { RETURN_NOT_OK( compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false)); } + // set compressed_ to nullptr to avoid `compressed_for_non_zero_copy_` being + // referenced twice, which would making it "immutable". 
compressed_ = nullptr; ARROW_ASSIGN_OR_RAISE( int64_t read_size, raw_->Read(kChunkSize, compressed_for_non_zero_copy_->mutable_data_as())); - RETURN_NOT_OK( - compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false)); + if (read_size != compressed_for_non_zero_copy_->size()) { + RETURN_NOT_OK( + compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false)); + } compressed_ = compressed_for_non_zero_copy_; } else { ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize)); From e50a404a613022f9810f95400a3aa34af227a62f Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 18 Mar 2024 12:32:08 +0800 Subject: [PATCH 09/15] Fix lint --- cpp/src/arrow/io/compressed_benchmark.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index 9385bf16a93..a9cf11dd4de 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -26,13 +26,13 @@ #include #include "arrow/buffer.h" +#include "arrow/io/compressed.h" #include "arrow/io/memory.h" #include "arrow/result.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/compression.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" -#include "compressed.h" namespace arrow::io { From fabb5b48864eb8ba27eabbe0228f4ca11c76e9c2 Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 18 Mar 2024 18:37:21 +0800 Subject: [PATCH 10/15] add arrow/util/config.h for macro --- cpp/src/arrow/io/compressed_benchmark.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index a9cf11dd4de..d90ae1abce3 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -31,6 +31,7 @@ #include "arrow/result.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/compression.h" +#include "arrow/util/config.h" #include "arrow/util/logging.h" #include 
"arrow/util/macros.h" From 01db14c51f7be160bf2f00eb9e5177391e43b6c6 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 20 Mar 2024 22:45:17 +0800 Subject: [PATCH 11/15] resolve comment --- cpp/src/arrow/io/compressed.cc | 15 +++--- cpp/src/arrow/io/compressed_benchmark.cc | 68 +++++++++++++++--------- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index a526d0d8567..dc52e509ff4 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -233,7 +234,7 @@ class CompressedInputStream::Impl { : pool_(pool), raw_(raw), is_open_(true), - supports_zero_copy_from_raw_(raw->supports_zero_copy()), + supports_zero_copy_from_raw_(raw_->supports_zero_copy()), compressed_pos_(0), decompressed_pos_(0), fresh_decompressor_(false), @@ -280,9 +281,6 @@ class CompressedInputStream::Impl { RETURN_NOT_OK( compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false)); } - // set compressed_ to nullptr to avoid `compressed_for_non_zero_copy_` being - // referenced twice, which would making it "immutable". - compressed_ = nullptr; ARROW_ASSIGN_OR_RAISE( int64_t read_size, raw_->Read(kChunkSize, @@ -312,7 +310,8 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(decompressed_, AllocateResizableBuffer(decompress_size, pool_)); } else { - RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/false)); + // Shrinking the buffer if it's already large enough + RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/true)); } decompressed_pos_ = 0; @@ -328,7 +327,9 @@ class CompressedInputStream::Impl { fresh_decompressor_ = false; } if (result.bytes_written > 0 || !result.need_more_output || input_len == 0) { - RETURN_NOT_OK(decompressed_->Resize(result.bytes_written)); + // Not calling shrink_to_fit here because we're likely to reusing the buffer. 
+ RETURN_NOT_OK( + decompressed_->Resize(result.bytes_written, /*shrink_to_fit=*/false)); break; } DCHECK_EQ(result.bytes_written, 0); @@ -441,7 +442,7 @@ Result> CompressedInputStream::Make( Codec* codec, const std::shared_ptr& raw, MemoryPool* pool) { // CAUTION: codec is not owned std::shared_ptr res(new CompressedInputStream); - res->impl_.reset(new Impl(pool, std::move(raw))); + res->impl_.reset(new Impl(pool, raw)); RETURN_NOT_OK(res->impl_->Init(codec)); return res; } diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index d90ae1abce3..d2bf7aa9f15 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -93,7 +93,7 @@ class NonZeroCopyBufferReader final : public InputStream { ::arrow::io::BufferReader reader_; }; -template +template static void CompressionInputBenchmark(::benchmark::State& state) { std::vector data = MakeCompressibleData(/*data_size=*/state.range(0)); int64_t per_read_bytes = state.range(1); @@ -108,28 +108,50 @@ static void CompressionInputBenchmark(::benchmark::State& state) { for (auto _ : state) { state.PauseTiming(); auto reader = std::make_shared(buf); + [[maybe_unused]] auto read_buffer = + ::arrow::AllocateBuffer(per_read_bytes).ValueOrDie(); + state.ResumeTiming(); + // Put `CompressedInputStream::Make` in timing. 
auto input_stream = ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); - auto read_buffer = ::arrow::AllocateBuffer(per_read_bytes).ValueOrDie(); - state.ResumeTiming(); int64_t remaining_size = data.size(); while (remaining_size > 0) { - auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data()); - ABORT_NOT_OK(value); - remaining_size -= value.ValueOrDie(); + if constexpr (ReadIntoBuffer) { + auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data()); + ABORT_NOT_OK(value); + remaining_size -= value.ValueOrDie(); + } else { + auto value = input_stream->Read(per_read_bytes); + ABORT_NOT_OK(value); + remaining_size -= value.ValueOrDie()->size(); + } } } state.SetBytesProcessed(length * state.iterations()); } template -static void CompressionInputZeroCopyBenchmark(::benchmark::State& state) { - CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION>(state); +static void CompressionInputZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) { + CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION, + /*ReadIntoBuffer=*/true>(state); +} + +template +static void CompressionInputNonZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) { + CompressionInputBenchmark(state); +} + +template +static void CompressionInputZeroCopyBenchmarkDirectRead(::benchmark::State& state) { + CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION, + /*ReadIntoBuffer=*/false>(state); } template -static void CompressionInputNonZeroCopyBenchmark(::benchmark::State& state) { - CompressionInputBenchmark(state); +static void CompressionInputNonZeroCopyBenchmarkDirectRead(::benchmark::State& state) { + CompressionInputBenchmark(state); } static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { @@ -142,24 +164,20 @@ static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { ->Args({1024 * 1024, 1024 * 1024}); } -#ifdef ARROW_WITH_ZLIB 
-BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::GZIP) - ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::GZIP) - ->Apply(CompressedInputArguments); -#endif - -#ifdef ARROW_WITH_ZSTD -BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::ZSTD) +#ifdef ARROW_WITH_LZ4 +// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on the +// overhead of the compression input stream. +BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkIntoBuffer, + ::arrow::Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::ZSTD) +BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkIntoBuffer, + ::arrow::Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -#endif - -#ifdef ARROW_WITH_LZ4 -BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkDirectRead, + ::arrow::Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmark, ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkDirectRead, + ::arrow::Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); #endif From 441cd18c4c2a8d367b37f48513533943d5cffc51 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 20 Mar 2024 23:35:14 +0800 Subject: [PATCH 12/15] remove included header --- cpp/src/arrow/io/compressed.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index dc52e509ff4..d06101748dc 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -202,7 +201,7 @@ Result> CompressedOutputStream::Make( util::Codec* codec, const std::shared_ptr& raw, 
MemoryPool* pool) { // CAUTION: codec is not owned std::shared_ptr res(new CompressedOutputStream); - res->impl_.reset(new Impl(pool, std::move(raw))); + res->impl_.reset(new Impl(pool, raw)); RETURN_NOT_OK(res->impl_->Init(codec)); return res; } From dff8f9c7790e74cb0915a8b164055848a076c0e3 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 27 Mar 2024 01:13:59 +0800 Subject: [PATCH 13/15] fix win ci --- cpp/src/arrow/io/compressed_benchmark.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index d2bf7aa9f15..fe0ac9dbf70 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -95,15 +95,18 @@ class NonZeroCopyBufferReader final : public InputStream { template static void CompressionInputBenchmark(::benchmark::State& state) { - std::vector data = MakeCompressibleData(/*data_size=*/state.range(0)); + std::vector data = + MakeCompressibleData(/*data_size=*/static_cast(state.range(0))); int64_t per_read_bytes = state.range(1); auto codec = ::arrow::util::Codec::Create(COMPRESSION).ValueOrDie(); - int64_t max_compress_len = codec->MaxCompressedLen(data.size(), data.data()); + int64_t max_compress_len = + codec->MaxCompressedLen(static_cast(data.size()), data.data()); std::shared_ptr<::arrow::ResizableBuffer> buf = ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie(); - int64_t length = - codec->Compress(data.size(), data.data(), max_compress_len, buf->mutable_data()) - .ValueOrDie(); + int64_t length = codec + ->Compress(static_cast(data.size()), data.data(), + max_compress_len, buf->mutable_data()) + .ValueOrDie(); ABORT_NOT_OK(buf->Resize(length)); for (auto _ : state) { state.PauseTiming(); @@ -114,7 +117,7 @@ static void CompressionInputBenchmark(::benchmark::State& state) { // Put `CompressedInputStream::Make` in timing. 
auto input_stream = ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); - int64_t remaining_size = data.size(); + auto remaining_size = static_cast(data.size()); while (remaining_size > 0) { if constexpr (ReadIntoBuffer) { auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data()); From 4c9787142a333d43fb528a62567399582f2657cb Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 27 Mar 2024 11:02:57 +0100 Subject: [PATCH 14/15] Count decompressed bytes; naming nits --- cpp/src/arrow/io/compressed_benchmark.cc | 99 ++++++++++++++---------- 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index fe0ac9dbf70..edf4ec85390 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -37,6 +37,8 @@ namespace arrow::io { +using ::arrow::Compression; + std::vector MakeCompressibleData(int data_size) { // XXX This isn't a real-world corpus so doesn't really represent the // comparative qualities of the algorithms @@ -93,72 +95,83 @@ class NonZeroCopyBufferReader final : public InputStream { ::arrow::io::BufferReader reader_; }; -template -static void CompressionInputBenchmark(::benchmark::State& state) { - std::vector data = - MakeCompressibleData(/*data_size=*/static_cast(state.range(0))); - int64_t per_read_bytes = state.range(1); - auto codec = ::arrow::util::Codec::Create(COMPRESSION).ValueOrDie(); +enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee }; + +template +static void CompressedInputStreamBenchmark(::benchmark::State& state, + Compression::type compression) { + const int64_t input_size = state.range(0); + const int64_t batch_size = state.range(1); + + const std::vector data = MakeCompressibleData(input_size); + auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie(); int64_t max_compress_len = codec->MaxCompressedLen(static_cast(data.size()), 
data.data()); std::shared_ptr<::arrow::ResizableBuffer> buf = ::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie(); - int64_t length = codec - ->Compress(static_cast(data.size()), data.data(), - max_compress_len, buf->mutable_data()) - .ValueOrDie(); - ABORT_NOT_OK(buf->Resize(length)); + const int64_t compressed_length = + codec + ->Compress(static_cast(data.size()), data.data(), max_compress_len, + buf->mutable_data()) + .ValueOrDie(); + ABORT_NOT_OK(buf->Resize(compressed_length)); for (auto _ : state) { state.PauseTiming(); auto reader = std::make_shared(buf); - [[maybe_unused]] auto read_buffer = - ::arrow::AllocateBuffer(per_read_bytes).ValueOrDie(); + [[maybe_unused]] std::unique_ptr read_buffer; + if constexpr (Mode == BufferReadMode::ProvidedByCaller) { + read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie(); + } state.ResumeTiming(); // Put `CompressedInputStream::Make` in timing. auto input_stream = ::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie(); - auto remaining_size = static_cast(data.size()); + auto remaining_size = input_size; while (remaining_size > 0) { - if constexpr (ReadIntoBuffer) { - auto value = input_stream->Read(per_read_bytes, read_buffer->mutable_data()); + if constexpr (Mode == BufferReadMode::ProvidedByCaller) { + auto value = input_stream->Read(batch_size, read_buffer->mutable_data()); ABORT_NOT_OK(value); remaining_size -= value.ValueOrDie(); } else { - auto value = input_stream->Read(per_read_bytes); + auto value = input_stream->Read(batch_size); ABORT_NOT_OK(value); remaining_size -= value.ValueOrDie()->size(); } } } - state.SetBytesProcessed(length * state.iterations()); + state.SetBytesProcessed(input_size * state.iterations()); } -template -static void CompressionInputZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) { - CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION, - /*ReadIntoBuffer=*/true>(state); +template +static void 
CompressedInputStreamZeroCopyBufferProvidedByCaller( + ::benchmark::State& state) { + CompressedInputStreamBenchmark<::arrow::io::BufferReader, + BufferReadMode::ProvidedByCaller>(state, kCompression); } -template -static void CompressionInputNonZeroCopyBenchmarkIntoBuffer(::benchmark::State& state) { - CompressionInputBenchmark(state); +template +static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller( + ::benchmark::State& state) { + CompressedInputStreamBenchmark(state, kCompression); } -template -static void CompressionInputZeroCopyBenchmarkDirectRead(::benchmark::State& state) { - CompressionInputBenchmark<::arrow::io::BufferReader, COMPRESSION, - /*ReadIntoBuffer=*/false>(state); +template +static void CompressedInputStreamZeroCopyBufferReturnedByCallee( + ::benchmark::State& state) { + CompressedInputStreamBenchmark<::arrow::io::BufferReader, + BufferReadMode::ReturnedByCallee>(state, kCompression); } -template -static void CompressionInputNonZeroCopyBenchmarkDirectRead(::benchmark::State& state) { - CompressionInputBenchmark(state); +template +static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee( + ::benchmark::State& state) { + CompressedInputStreamBenchmark(state, kCompression); } static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { - b->ArgNames({"InputBytes", "PerReadBytes"}) + b->ArgNames({"num_bytes", "batch_size"}) ->Args({8 * 1024, 8 * 1024}) ->Args({64 * 1024, 8 * 1024}) ->Args({64 * 1024, 64 * 1024}) @@ -170,17 +183,17 @@ static void CompressedInputArguments(::benchmark::internal::Benchmark* b) { #ifdef ARROW_WITH_LZ4 // Benchmark LZ4 because it's lightweight, which makes benchmarking focused on the // overhead of the compression input stream. 
-BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkIntoBuffer, - ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller, + Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkIntoBuffer, - ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller, + Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputZeroCopyBenchmarkDirectRead, - ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee, + Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); -BENCHMARK_TEMPLATE(CompressionInputNonZeroCopyBenchmarkDirectRead, - ::arrow::Compression::LZ4_FRAME) +BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee, + Compression::LZ4_FRAME) ->Apply(CompressedInputArguments); #endif From 8ca0840151471c13ed7a7d2dbd24d00f47aad04d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 27 Mar 2024 11:22:09 +0100 Subject: [PATCH 15/15] Fix compile error --- cpp/src/arrow/io/compressed.h | 4 ++-- cpp/src/arrow/io/compressed_benchmark.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h index 470d4db8e4d..6b4e7ab4d72 100644 --- a/cpp/src/arrow/io/compressed.h +++ b/cpp/src/arrow/io/compressed.h @@ -45,7 +45,7 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream { /// \brief Create a compressed output stream wrapping the given output stream. /// - /// The codec must be capaable of streaming compression. Some codecs, + /// The codec must be capable of streaming compression. Some codecs, /// like Snappy, are not able to do so. 
static Result> Make( util::Codec* codec, const std::shared_ptr& raw, @@ -86,7 +86,7 @@ class ARROW_EXPORT CompressedInputStream /// \brief Create a compressed input stream wrapping the given input stream. /// - /// The codec must be capaable of streaming decompression. Some codecs, + /// The codec must be capable of streaming decompression. Some codecs, /// like Snappy, are not able to do so. static Result> Make( util::Codec* codec, const std::shared_ptr& raw, diff --git a/cpp/src/arrow/io/compressed_benchmark.cc b/cpp/src/arrow/io/compressed_benchmark.cc index edf4ec85390..52a30d8cb08 100644 --- a/cpp/src/arrow/io/compressed_benchmark.cc +++ b/cpp/src/arrow/io/compressed_benchmark.cc @@ -103,7 +103,7 @@ static void CompressedInputStreamBenchmark(::benchmark::State& state, const int64_t input_size = state.range(0); const int64_t batch_size = state.range(1); - const std::vector data = MakeCompressibleData(input_size); + const std::vector data = MakeCompressibleData(static_cast(input_size)); auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie(); int64_t max_compress_len = codec->MaxCompressedLen(static_cast(data.size()), data.data());