diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc index 0a9686a2868..2da6b4a1c5a 100644 --- a/cpp/src/arrow/io/test_common.cc +++ b/cpp/src/arrow/io/test_common.cc @@ -33,8 +33,12 @@ #include "arrow/io/memory.h" #include "arrow/memory_pool.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" namespace arrow { + +using internal::IOErrorFromErrno; + namespace io { void AssertFileContents(const std::string& path, const std::string& contents) { @@ -48,6 +52,29 @@ void AssertFileContents(const std::string& path, const std::string& contents) { bool FileExists(const std::string& path) { return std::ifstream(path.c_str()).good(); } +Status PurgeLocalFileFromOsCache(const std::string& path) { +#if defined(POSIX_FADV_DONTNEED) + int fd = open(path.c_str(), O_WRONLY); + if (fd < 0) { + return IOErrorFromErrno(errno, "open on ", path, + " to clear from cache did not succeed."); + } + int err = posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); + if (err != 0) { + close(fd); + return IOErrorFromErrno(err, "fadvise on ", path, + " to clear from cache did not succeed"); + } + err = close(fd); + if (err == 0) { + return Status::OK(); + } + return IOErrorFromErrno(errno, "close on ", path, " to clear from cache did not succeed"); +#else + return Status::NotImplemented("posix_fadvise is not implemented on this machine"); +#endif +} + #if defined(_WIN32) static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func, const wchar_t* source_file, unsigned int source_line, diff --git a/cpp/src/arrow/io/test_common.h b/cpp/src/arrow/io/test_common.h index ba263a3ad30..9b68c8104a7 100644 --- a/cpp/src/arrow/io/test_common.h +++ b/cpp/src/arrow/io/test_common.h @@ -34,6 +34,8 @@ void AssertFileContents(const std::string& path, const std::string& contents); ARROW_TESTING_EXPORT bool FileExists(const std::string& path); +ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path); + ARROW_TESTING_EXPORT bool FileIsClosed(int fd); 
ARROW_TESTING_EXPORT diff --git a/cpp/src/arrow/ipc/read_write_benchmark.cc b/cpp/src/arrow/ipc/read_write_benchmark.cc index ed00208672a..ed7e6957df1 100644 --- a/cpp/src/arrow/ipc/read_write_benchmark.cc +++ b/cpp/src/arrow/ipc/read_write_benchmark.cc @@ -24,11 +24,13 @@ #include "arrow/buffer.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" +#include "arrow/io/test_common.h" #include "arrow/ipc/api.h" #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/io_util.h" namespace arrow { @@ -50,6 +52,27 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie return RecordBatch::Make(schema, length, arrays); } +std::vector<int> GetIncludedFields(int64_t num_fields, int64_t is_partial_read) { + if (is_partial_read) { + std::vector<int> field_indices; + for (int i = 0; i < num_fields; i += 8) { + field_indices.push_back(i); + } + return field_indices; + } else { + return std::vector<int>(); + } +} + +int64_t BytesPerIteration(int64_t num_fields, int64_t is_partial_read, + int64_t total_size) { + std::size_t num_actual_fields = GetIncludedFields(num_fields, is_partial_read).size(); + double selectivity = num_actual_fields / static_cast<double>(num_fields); + if (num_actual_fields == 0) selectivity = 1; + auto bytes = total_size * selectivity; + return static_cast<int64_t>(bytes); +} + static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference // 1MB constexpr int64_t kTotalSize = 1 << 20; @@ -177,7 +200,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen } \ ABORT_NOT_OK(writer->Close()); \ ABORT_NOT_OK(stream.Close()); \ - } + } \ + constexpr int64_t total_size = kBatchSize * kBatches; #endif #define GENERATE_DATA_IN_MEMORY() \ @@ -192,7 +216,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ ABORT_NOT_OK(writer->Close()); \ 
ABORT_NOT_OK(stream.Close()); \ - } + } \ + constexpr int64_t total_size = kBatchSize * kBatches; #define GENERATE_DATA_TEMP_FILE() \ constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \ @@ -202,60 +227,142 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen { \ auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \ auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \ - ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + for (int64_t i = 0; i < kBatches; i++) { \ + ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + } \ + ABORT_NOT_OK(writer->Close()); \ + ABORT_NOT_OK(sink->Close()); \ + } \ + constexpr int64_t total_size = kBatchSize * kBatches; + +// Note: When working with real files we ensure each array is at least 4MB large +// This slows things down considerably but using smaller sized arrays will cause +// the I/O to bottleneck for partial reads which is not what we are trying to +// measure here (although this may be interesting to optimize someday) +#define GENERATE_DATA_REAL_FILE() \ + constexpr int64_t kArraySize = (1 << 19) * sizeof(int64_t); /* 4 MB */ \ + constexpr int64_t kBatches = 4; \ + auto num_fields = state.range(0); \ + auto options = ipc::IpcWriteOptions::Defaults(); \ + ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \ + { \ + auto batch_size = kArraySize * num_fields; \ + auto record_batch = MakeRecordBatch(batch_size, num_fields); \ + auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \ + for (int64_t i = 0; i < kBatches; i++) { \ + ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + } \ ABORT_NOT_OK(writer->Close()); \ ABORT_NOT_OK(sink->Close()); \ + } \ + int64_t total_size = kArraySize * kBatches * num_fields; + +#define PURGE_OR_SKIP(FILE) \ + { \ + auto status = io::PurgeLocalFileFromOsCache(FILE); \ + if (!status.ok()) { \ + std::string err = "Cannot purge local files from 
cache: " + status.ToString(); \ + state.SkipWithError(err.c_str()); \ + } \ } #define READ_DATA_IN_MEMORY() auto input = std::make_shared<io::BufferReader>(buffer); #define READ_DATA_TEMP_FILE() \ ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow")); +// This will not be correct if your system mounts /tmp to RAM (using tmpfs +// or ramfs). +#define READ_DATA_REAL_FILE() \ + PURGE_OR_SKIP("/tmp/benchmark.arrow"); \ + ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow")); + #define READ_DATA_MMAP_FILE() \ ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \ io::FileMode::type::READ)); +#define READ_DATA_MMAP_REAL_FILE() \ + PURGE_OR_SKIP("/tmp/benchmark.arrow"); \ + ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \ + io::FileMode::type::READ)); -#define READ_SYNC(NAME, GENERATE, READ) \ - static void NAME(benchmark::State& state) { \ - GENERATE(); \ - for (auto _ : state) { \ - READ(); \ - auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \ - ipc::IpcReadOptions::Defaults()); \ - const int num_batches = reader->num_record_batches(); \ - for (int i = 0; i < num_batches; ++i) { \ - auto batch = *reader->ReadRecordBatch(i); \ - } \ - } \ - state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \ - } \ - BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); - -#define READ_ASYNC(NAME, GENERATE, READ) \ - static void NAME##Async(benchmark::State& state) { \ - GENERATE(); \ - for (auto _ : state) { \ - READ(); \ - auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \ - ipc::IpcReadOptions::Defaults()); \ - ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \ - const int num_batches = reader->num_record_batches(); \ - for (int i = 0; i < num_batches; ++i) { \ - auto batch = *generator().result(); \ - } \ - } \ - state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \ - } \ - 
BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); +#define READ_SYNC(NAME, GENERATE, READ) \ + static void NAME(benchmark::State& state) { \ + GENERATE(); \ + for (auto _ : state) { \ + READ(); \ + ipc::IpcReadOptions options; \ + options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \ + auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \ + const int num_batches = reader->num_record_batches(); \ + for (int i = 0; i < num_batches; ++i) { \ + auto batch = *reader->ReadRecordBatch(i); \ + } \ + } \ + int64_t bytes_per_iter = \ + BytesPerIteration(state.range(0), state.range(1), total_size); \ + state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \ + } \ + BENCHMARK(NAME) + +#define READ_ASYNC(NAME, GENERATE, READ) \ + static void NAME##Async(benchmark::State& state) { \ + GENERATE(); \ + for (auto _ : state) { \ + READ(); \ + ipc::IpcReadOptions options; \ + options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \ + auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \ + ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \ + const int num_batches = reader->num_record_batches(); \ + for (int i = 0; i < num_batches; ++i) { \ + auto batch = *generator().result(); \ + } \ + } \ + int64_t bytes_per_iter = \ + BytesPerIteration(state.range(0), state.range(1), total_size); \ + state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \ + } \ + BENCHMARK(NAME##Async) + +const std::vector<std::string> kArgNames = {"num_cols", "is_partial"}; #define READ_BENCHMARK(NAME, GENERATE, READ) \ - READ_SYNC(NAME, GENERATE, READ); \ - READ_ASYNC(NAME, GENERATE, READ); - -READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY); -READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE); -READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE); + READ_SYNC(NAME, GENERATE, READ) \ + 
->RangeMultiplier(8) \ + ->Ranges({{1, 1 << 12}, {0, 1}}) \ + ->ArgNames(kArgNames) \ + ->UseRealTime(); \ + READ_ASYNC(NAME, GENERATE, READ) \ + ->RangeMultiplier(8) \ + ->ArgNames(kArgNames) \ + ->Ranges({{1, 1 << 12}, {0, 1}}) \ + ->UseRealTime(); + +READ_BENCHMARK(ReadBuffer, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY); +READ_BENCHMARK(ReadCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE); +// We use READ_SYNC/READ_ASYNC directly here so we can reduce the parameter +// space as real files get quite large +READ_SYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE) + ->RangeMultiplier(8) + ->ArgNames(kArgNames) + ->Ranges({{1, 1 << 6}, {0, 1}}) + ->UseRealTime(); +READ_ASYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE) + ->RangeMultiplier(8) + ->ArgNames(kArgNames) + ->Ranges({{1, 1 << 6}, {0, 1}}) + ->UseRealTime(); +READ_BENCHMARK(ReadMmapCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE); +READ_SYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE) + ->RangeMultiplier(8) + ->ArgNames(kArgNames) + ->Ranges({{1, 1 << 6}, {0, 1}}) + ->UseRealTime(); +READ_ASYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE) + ->RangeMultiplier(8) + ->ArgNames(kArgNames) + ->Ranges({{1, 1 << 6}, {0, 1}}) + ->UseRealTime(); #ifdef ARROW_WITH_ZSTD -READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY, +READ_BENCHMARK(ReadCompressedBuffer, GENERATE_COMPRESSED_DATA_IN_MEMORY, READ_DATA_IN_MEMORY); #endif