2 changes: 2 additions & 0 deletions cpp/src/arrow/io/CMakeLists.txt
@@ -43,5 +43,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT (${ARROW_SIMD_LEVEL} STREQU
add_arrow_benchmark(memory_benchmark PREFIX "arrow-io")
endif()

add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io")

# Headers: top level
arrow_install_all_headers("arrow/io")
63 changes: 45 additions & 18 deletions cpp/src/arrow/io/compressed.cc
@@ -201,7 +201,7 @@ Result<std::shared_ptr<CompressedOutputStream>> CompressedOutputStream::Make(
util::Codec* codec, const std::shared_ptr<OutputStream>& raw, MemoryPool* pool) {
// CAUTION: codec is not owned
std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
res->impl_.reset(new Impl(pool, std::move(raw)));
res->impl_.reset(new Impl(pool, raw));
RETURN_NOT_OK(res->impl_->Init(codec));
return res;
}
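
A side note on the `std::move(raw)` to `raw` change above (the rationale is not stated in the diff, so treat this as an assumption): `raw` is a `const std::shared_ptr<OutputStream>&`, and `std::move` on a const lvalue reference still selects the copy constructor, so the old spelling copied anyway; writing the copy explicitly is clearer. A minimal illustration with a hypothetical function:

#include <memory>
#include <utility>

void Sketch(const std::shared_ptr<int>& raw) {
  auto a = std::move(raw);  // const rvalue still binds to the copy constructor
  auto b = raw;             // the same copy, stated plainly
}
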
@@ -233,8 +233,10 @@ class CompressedInputStream::Impl {
: pool_(pool),
raw_(raw),
is_open_(true),
supports_zero_copy_from_raw_(raw_->supports_zero_copy()),
compressed_pos_(0),
decompressed_pos_(0),
fresh_decompressor_(false),
total_pos_(0) {}

Status Init(Codec* codec) {
@@ -261,16 +263,35 @@
}
}

bool closed() { return !is_open_; }
bool closed() const { return !is_open_; }

Result<int64_t> Tell() const { return total_pos_; }

// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
if (compressed_avail == 0) {
// No compressed data available, read a full chunk
ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
// Ensure compressed_ buffer is allocated with kChunkSize.
if (!supports_zero_copy_from_raw_) {
if (compressed_for_non_zero_copy_ == nullptr) {
ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_,
AllocateResizableBuffer(kChunkSize, pool_));
} else if (compressed_for_non_zero_copy_->size() != kChunkSize) {
RETURN_NOT_OK(
compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false));
}
ARROW_ASSIGN_OR_RAISE(
int64_t read_size,
raw_->Read(kChunkSize,
compressed_for_non_zero_copy_->mutable_data_as<void>()));
if (read_size != compressed_for_non_zero_copy_->size()) {
RETURN_NOT_OK(
compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false));
}
compressed_ = compressed_for_non_zero_copy_;
} else {
ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
}
compressed_pos_ = 0;
}
return Status::OK();
@@ -284,8 +305,13 @@ class CompressedInputStream::Impl {
int64_t decompress_size = kDecompressSize;

while (true) {
ARROW_ASSIGN_OR_RAISE(decompressed_,
AllocateResizableBuffer(decompress_size, pool_));
if (decompressed_ == nullptr) {
ARROW_ASSIGN_OR_RAISE(decompressed_,
AllocateResizableBuffer(decompress_size, pool_));
} else {
// Reuse the existing buffer, shrinking it if it is larger than needed
RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/true));
}
decompressed_pos_ = 0;

int64_t input_len = compressed_->size() - compressed_pos_;
@@ -300,7 +326,9 @@ class CompressedInputStream::Impl {
fresh_decompressor_ = false;
}
if (result.bytes_written > 0 || !result.need_more_output || input_len == 0) {
RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
// Not calling shrink_to_fit here because we're likely to reuse the buffer.
RETURN_NOT_OK(
decompressed_->Resize(result.bytes_written, /*shrink_to_fit=*/false));
break;
}
DCHECK_EQ(result.bytes_written, 0);
@@ -310,19 +338,14 @@
return Status::OK();
}

// Read a given number of bytes from the decompressed_ buffer.
// Copying a given number of bytes from the decompressed_ buffer.
int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
int64_t readable = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
int64_t read_bytes = std::min(readable, nbytes);

if (read_bytes > 0) {
memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
decompressed_pos_ += read_bytes;

if (decompressed_pos_ == decompressed_->size()) {
// Decompressed data is exhausted, release buffer
decompressed_.reset();
}
}

return read_bytes;
@@ -357,7 +380,7 @@ class CompressedInputStream::Impl {
}

Result<int64_t> Read(int64_t nbytes, void* out) {
auto out_data = reinterpret_cast<uint8_t*>(out);
auto* out_data = reinterpret_cast<uint8_t*>(out);

int64_t total_read = 0;
bool decompressor_has_data = true;
@@ -382,10 +405,10 @@ class CompressedInputStream::Impl {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
RETURN_NOT_OK(buf->Resize(bytes_read));
return std::move(buf);
return buf;
}

std::shared_ptr<InputStream> raw() const { return raw_; }
const std::shared_ptr<InputStream>& raw() const { return raw_; }

private:
// Read 64 KB compressed data at a time
@@ -396,7 +419,12 @@ class CompressedInputStream::Impl {
MemoryPool* pool_;
std::shared_ptr<InputStream> raw_;
bool is_open_;
const bool supports_zero_copy_from_raw_;
std::shared_ptr<Decompressor> decompressor_;
// Only used when `raw_` does not support zero-copy reads: in that case this
// buffer is allocated with `kChunkSize` bytes and `raw_` is read into it.
// When `raw_->supports_zero_copy()` is true, it stays unallocated.
std::shared_ptr<ResizableBuffer> compressed_for_non_zero_copy_;
std::shared_ptr<Buffer> compressed_;
// Position in compressed buffer
int64_t compressed_pos_;
@@ -413,10 +441,9 @@ Result<std::shared_ptr<CompressedInputStream>> CompressedInputStream::Make(
Codec* codec, const std::shared_ptr<InputStream>& raw, MemoryPool* pool) {
// CAUTION: codec is not owned
std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
res->impl_.reset(new Impl(pool, std::move(raw)));
res->impl_.reset(new Impl(pool, raw));
RETURN_NOT_OK(res->impl_->Init(codec));
return res;
return Status::OK();
}

CompressedInputStream::~CompressedInputStream() { internal::CloseFromDestructor(this); }
6 changes: 6 additions & 0 deletions cpp/src/arrow/io/compressed.h
@@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream {
~CompressedOutputStream() override;

/// \brief Create a compressed output stream wrapping the given output stream.
///
/// The codec must be capable of streaming compression. Some codecs,
/// like Snappy, are not able to do so.
static Result<std::shared_ptr<CompressedOutputStream>> Make(
util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
MemoryPool* pool = default_memory_pool());
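
For reference, a minimal usage sketch of the output side (not part of this diff; the helper name and codec choice are illustrative). It assumes a streaming-capable codec such as GZIP, and that the codec is enabled in the build; as noted above, Snappy cannot stream, so Make() is expected to fail for it:

#include <memory>
#include <string>
#include <string_view>

#include "arrow/io/compressed.h"
#include "arrow/io/file.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/compression.h"

arrow::Status WriteCompressed(const std::string& path, std::string_view payload) {
  ARROW_ASSIGN_OR_RAISE(auto codec,
                        arrow::util::Codec::Create(arrow::Compression::GZIP));
  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::FileOutputStream::Open(path));
  // CAUTION: the codec is not owned by the stream; keep it alive until Close().
  ARROW_ASSIGN_OR_RAISE(auto compressed,
                        arrow::io::CompressedOutputStream::Make(codec.get(), sink));
  ARROW_RETURN_NOT_OK(compressed->Write(payload.data(), payload.size()));
  return compressed->Close();
}
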
@@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream
~CompressedInputStream() override;

/// \brief Create a compressed input stream wrapping the given input stream.
///
/// The codec must be capable of streaming decompression. Some codecs,
/// like Snappy, are not able to do so.
static Result<std::shared_ptr<CompressedInputStream>> Make(
util::Codec* codec, const std::shared_ptr<InputStream>& raw,
MemoryPool* pool = default_memory_pool());
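
And the matching input side, again as a hedged sketch (hypothetical helper name; LZ4 frame is chosen because it supports streaming decompression and is assumed to be enabled in the build):

#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/memory.h"
#include "arrow/result.h"
#include "arrow/util/compression.h"

arrow::Result<std::shared_ptr<arrow::Buffer>> DecompressSome(
    std::shared_ptr<arrow::Buffer> compressed) {
  ARROW_ASSIGN_OR_RAISE(auto codec,
                        arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
  auto raw = std::make_shared<arrow::io::BufferReader>(std::move(compressed));
  // CAUTION: the codec is not owned by the stream and must outlive it.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        arrow::io::CompressedInputStream::Make(codec.get(), raw));
  return input->Read(1 << 20);  // read up to 1 MiB of decompressed bytes
}
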
200 changes: 200 additions & 0 deletions cpp/src/arrow/io/compressed_benchmark.cc
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "benchmark/benchmark.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/memory.h"
#include "arrow/result.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/compression.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"

namespace arrow::io {

using ::arrow::Compression;

std::vector<uint8_t> MakeCompressibleData(int data_size) {
// XXX This isn't a real-world corpus so doesn't really represent the
// comparative qualities of the algorithms

// First make highly compressible data
std::string base_data =
"Apache Arrow is a cross-language development platform for in-memory data";
int nrepeats = static_cast<int>(1 + data_size / base_data.size());

std::vector<uint8_t> data(base_data.size() * nrepeats);
for (int i = 0; i < nrepeats; ++i) {
std::memcpy(data.data() + i * base_data.size(), base_data.data(), base_data.size());
}
data.resize(data_size);

// Then randomly mutate some bytes so as to make things harder
std::mt19937 engine(42);
std::exponential_distribution<> offsets(0.05);
std::uniform_int_distribution<> values(0, 255);

int64_t pos = 0;
while (pos < data_size) {
data[pos] = static_cast<uint8_t>(values(engine));
pos += static_cast<int64_t>(offsets(engine));
}

return data;
}

// A buffer reader without zero-copy support, used to benchmark the non-zero-copy path.
class NonZeroCopyBufferReader final : public InputStream {
public:
NonZeroCopyBufferReader(std::shared_ptr<Buffer> buffer) : reader_(std::move(buffer)) {}

bool supports_zero_copy() const override { return false; }

Result<int64_t> Read(int64_t nbytes, void* out) override {
return reader_.Read(nbytes, out);
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
// Simulate the non-zero-copy path (e.g. reading from a local file or an object
// store), so we need to allocate a buffer and copy the data into it.
ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes));
ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data()));
ARROW_RETURN_NOT_OK(buf->Resize(size));
return buf;
}
Status Close() override { return reader_.Close(); }
Result<int64_t> Tell() const override { return reader_.Tell(); }
bool closed() const override { return reader_.closed(); }

private:
::arrow::io::BufferReader reader_;
};

enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee };

template <typename BufReader, BufferReadMode Mode>
static void CompressedInputStreamBenchmark(::benchmark::State& state,
Compression::type compression) {
const int64_t input_size = state.range(0);
const int64_t batch_size = state.range(1);

const std::vector<uint8_t> data = MakeCompressibleData(static_cast<int>(input_size));
auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie();
int64_t max_compress_len =
codec->MaxCompressedLen(static_cast<int64_t>(data.size()), data.data());
std::shared_ptr<::arrow::ResizableBuffer> buf =
::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie();
const int64_t compressed_length =
codec
->Compress(static_cast<int64_t>(data.size()), data.data(), max_compress_len,
buf->mutable_data())
.ValueOrDie();
ABORT_NOT_OK(buf->Resize(compressed_length));
for (auto _ : state) {
state.PauseTiming();
auto reader = std::make_shared<BufReader>(buf);
[[maybe_unused]] std::unique_ptr<Buffer> read_buffer;
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie();
}
state.ResumeTiming();
// Include `CompressedInputStream::Make` in the timed region.
auto input_stream =
::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie();
auto remaining_size = input_size;
while (remaining_size > 0) {
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
auto value = input_stream->Read(batch_size, read_buffer->mutable_data());
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie();
} else {
auto value = input_stream->Read(batch_size);
ABORT_NOT_OK(value);
remaining_size -= value.ValueOrDie()->size();
}
}
}
state.SetBytesProcessed(input_size * state.iterations());
}

template <Compression::type kCompression>
static void CompressedInputStreamZeroCopyBufferProvidedByCaller(
::benchmark::State& state) {
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
BufferReadMode::ProvidedByCaller>(state, kCompression);
}

template <Compression::type kCompression>
static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller(
::benchmark::State& state) {
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
BufferReadMode::ProvidedByCaller>(state, kCompression);
}

template <Compression::type kCompression>
static void CompressedInputStreamZeroCopyBufferReturnedByCallee(
::benchmark::State& state) {
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
BufferReadMode::ReturnedByCallee>(state, kCompression);
}

template <Compression::type kCompression>
static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee(
::benchmark::State& state) {
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
BufferReadMode::ReturnedByCallee>(state, kCompression);
}

static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
b->ArgNames({"num_bytes", "batch_size"})
->Args({8 * 1024, 8 * 1024})
->Args({64 * 1024, 8 * 1024})
->Args({64 * 1024, 64 * 1024})
->Args({1024 * 1024, 8 * 1024})
->Args({1024 * 1024, 64 * 1024})
->Args({1024 * 1024, 1024 * 1024});
}

#ifdef ARROW_WITH_LZ4
// Benchmark LZ4 because it is lightweight, which keeps the benchmark focused on
// the overhead of the compressed input stream itself.
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee,
Compression::LZ4_FRAME)
->Apply(CompressedInputArguments);
#endif

} // namespace arrow::io