26 changes: 26 additions & 0 deletions cpp/src/arrow/io/test_common.cc
@@ -33,8 +33,12 @@
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"

namespace arrow {

using internal::IOErrorFromErrno;

namespace io {

void AssertFileContents(const std::string& path, const std::string& contents) {
@@ -48,6 +52,28 @@ void AssertFileContents(const std::string& path, const std::string& contents) {

bool FileExists(const std::string& path) { return std::ifstream(path.c_str()).good(); }

Status PurgeLocalFileFromOsCache(const std::string& path) {
#if defined(POSIX_FADV_WILLNEED)
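// POSIX_FADV_WILLNEED serves as a proxy for posix_fadvise() support here,
// matching the guard used in file.cc (see the review discussion below).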
int fd = open(path.c_str(), O_WRONLY);
if (fd < 0) {
return IOErrorFromErrno(errno, "open on ", path,
" to clear from cache did not succeed");
}
int err = posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
[A reviewer comment appears here; its text was not captured in this view.]

[Reply from @westonpace (Member, Author), Jan 14, 2022:]

Good catch. I looked at file.cc and realized we are using:

#if defined(POSIX_FADV_WILLNEED)

which seems much better. I can't use that #if in the benchmark itself, though (unless I want to import fcntl.h, which wouldn't be the worst thing), so I changed it to SkipWithError. This has the added advantage of making it clear to the user that we aren't running all the benchmarks.

if (err != 0) {
return IOErrorFromErrno(err, "fadvise on ", path,
" to clear from cache did not succeed");
}
// close() returns -1 on failure and reports the error through errno,
// so use errno rather than the return value when building the Status.
if (close(fd) != 0) {
return IOErrorFromErrno(errno, "close on ", path, " to clear from cache did not succeed");
}
return Status::OK();
#else
return Status::NotImplemented("posix_fadvise is not implemented on this machine");
#endif
}
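
The guard pattern the comment above describes, as a minimal standalone sketch (not part of the diff; the helper name and the ENOTSUP fallback are invented for illustration):

#include <fcntl.h>   // open(), posix_fadvise(), POSIX_FADV_* constants
#include <unistd.h>  // close()
#include <cerrno>

// Hypothetical helper: returns 0 on success, an errno-style code otherwise.
int DropFromPageCache(const char* path) {
#if defined(POSIX_FADV_DONTNEED)
  int fd = open(path, O_RDONLY);
  if (fd < 0) return errno;
  // posix_fadvise() reports failure via its return value, not errno.
  int err = posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
  close(fd);
  return err;
#else
  (void)path;
  return ENOTSUP;  // lets the caller skip, much like SkipWithError below
#endif
}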

#if defined(_WIN32)
static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
const wchar_t* source_file, unsigned int source_line,
2 changes: 2 additions & 0 deletions cpp/src/arrow/io/test_common.h
@@ -34,6 +34,8 @@ void AssertFileContents(const std::string& path, const std::string& contents);

ARROW_TESTING_EXPORT bool FileExists(const std::string& path);

ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path);

ARROW_TESTING_EXPORT bool FileIsClosed(int fd);

ARROW_TESTING_EXPORT
191 changes: 149 additions & 42 deletions cpp/src/arrow/ipc/read_write_benchmark.cc
@@ -24,11 +24,13 @@
#include "arrow/buffer.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/ipc/api.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/io_util.h"

namespace arrow {

@@ -50,6 +52,27 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fields) {
return RecordBatch::Make(schema, length, arrays);
}

std::vector<int> GetIncludedFields(int64_t num_fields, int64_t is_partial_read) {
if (is_partial_read) {
std::vector<int> field_indices;
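// Select every 8th field to simulate a partial (projected) read.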
for (int i = 0; i < num_fields; i += 8) {
field_indices.push_back(i);
}
return field_indices;
} else {
return std::vector<int>();
}
}

int64_t BytesPerIteration(int64_t num_fields, int64_t is_partial_read,
int64_t total_size) {
std::size_t num_actual_fields = GetIncludedFields(num_fields, is_partial_read).size();
double selectivity = num_actual_fields / static_cast<double>(num_fields);
if (num_actual_fields == 0) selectivity = 1;
auto bytes = total_size * selectivity;
return static_cast<int64_t>(bytes);
}
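
As a quick illustration of the selectivity math above (my own example, not part of the diff): with 16 fields and a partial read, GetIncludedFields picks indices {0, 8}, so selectivity is 2/16 and only an eighth of the file counts toward bytes processed; a non-partial read counts the whole file.

#include <cassert>

// Hypothetical sanity check, assuming the two helpers above are in scope.
void CheckBytesPerIteration() {
  constexpr int64_t kTotal = int64_t{64} << 20;  // pretend a 64 MiB file
  assert(BytesPerIteration(16, /*is_partial_read=*/1, kTotal) == kTotal / 8);
  assert(BytesPerIteration(16, /*is_partial_read=*/0, kTotal) == kTotal);
}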

static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference
// 1MB
constexpr int64_t kTotalSize = 1 << 20;
Expand Down Expand Up @@ -177,7 +200,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(stream.Close()); \
}
} \
constexpr int64_t total_size = kBatchSize * kBatches;
#endif

#define GENERATE_DATA_IN_MEMORY() \
@@ -192,7 +216,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const reference
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(stream.Close()); \
}
} \
constexpr int64_t total_size = kBatchSize * kBatches;

#define GENERATE_DATA_TEMP_FILE() \
constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
@@ -202,60 +227,142 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const reference
{ \
auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
for (int64_t i = 0; i < kBatches; i++) { \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(sink->Close()); \
} \
constexpr int64_t total_size = kBatchSize * kBatches;

// Note: when working with real files we ensure each array is at least 4 MB.
// This slows things down considerably, but smaller arrays would cause the I/O
// to bottleneck on partial reads, which is not what we are trying to measure
// here (although that may be interesting to optimize someday).
#define GENERATE_DATA_REAL_FILE() \
constexpr int64_t kArraySize = (1 << 19) * sizeof(int64_t); /* 4 MB */ \
constexpr int64_t kBatches = 4; \
auto num_fields = state.range(0); \
auto options = ipc::IpcWriteOptions::Defaults(); \
ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
{ \
auto batch_size = kArraySize * num_fields; \
auto record_batch = MakeRecordBatch(batch_size, num_fields); \
auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
for (int64_t i = 0; i < kBatches; i++) { \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(sink->Close()); \
} \
int64_t total_size = kArraySize * kBatches * num_fields;
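
For a sense of scale (my arithmetic, not from the PR): kArraySize is (1 << 19) * 8 bytes = 4 MiB, so a run at num_cols = 64 writes 4 MiB * 64 fields * 4 batches = 1 GiB to /tmp/benchmark.arrow, which is why the real-file benchmarks below cap num_cols at 1 << 6.

// Illustrative only: the sizes the macro above produces.
static_assert((int64_t{1} << 19) * sizeof(int64_t) == (int64_t{4} << 20),
              "each array is 4 MiB");
// file size = kArraySize * num_fields * kBatches
//           = 4 MiB * 64 * 4 = 1 GiB at the largest num_cols used below.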

#define PURGE_OR_SKIP(FILE) \
{ \
/* Purge the OS page cache so the read below actually hits the disk. */ \
auto status = io::PurgeLocalFileFromOsCache(FILE); \
if (!status.ok()) { \
std::string err = "Cannot purge local files from cache: " + status.ToString(); \
state.SkipWithError(err.c_str()); \
} \
}

#define READ_DATA_IN_MEMORY() auto input = std::make_shared<io::BufferReader>(buffer);
#define READ_DATA_TEMP_FILE() \
ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));
// This will not be correct if your system mounts /tmp to RAM (using tmpfs
// or ramfs).
[Review thread on lines +272 to +273:]

[Member:] How are our benchmark machines set up? (Trying to find out now.)

[Member:] Seems like the benchmark machine for C++ mounts /tmp to disk, so we should be all set.

[Member, Author:] Ok, maybe this warning is superfluous. For some reason I always thought /tmp was mounted to tmpfs, but now, reading up on it, it seems that is quite rare.

[Member:] I had the opposite impression, so that's good to know! But I think we can keep it to be safe. I just wanted to double-check what Conbench was doing.
#define READ_DATA_REAL_FILE() \
PURGE_OR_SKIP("/tmp/benchmark.arrow"); \
ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));

#define READ_DATA_MMAP_FILE() \
ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
io::FileMode::type::READ));
#define READ_DATA_MMAP_REAL_FILE() \
PURGE_OR_SKIP("/tmp/benchmark.arrow"); \
ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
io::FileMode::type::READ));

#define READ_SYNC(NAME, GENERATE, READ) \
static void NAME(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
ipc::IpcReadOptions::Defaults()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *reader->ReadRecordBatch(i); \
} \
} \
state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();

#define READ_ASYNC(NAME, GENERATE, READ) \
static void NAME##Async(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
ipc::IpcReadOptions::Defaults()); \
ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *generator().result(); \
} \
} \
state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
#define READ_SYNC(NAME, GENERATE, READ) \
static void NAME(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
ipc::IpcReadOptions options; \
options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *reader->ReadRecordBatch(i); \
} \
} \
int64_t bytes_per_iter = \
BytesPerIteration(state.range(0), state.range(1), total_size); \
state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \
} \
BENCHMARK(NAME)

#define READ_ASYNC(NAME, GENERATE, READ) \
static void NAME##Async(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
ipc::IpcReadOptions options; \
options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \
ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *generator().result(); \
} \
} \
int64_t bytes_per_iter = \
BytesPerIteration(state.range(0), state.range(1), total_size); \
state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \
} \
BENCHMARK(NAME##Async)

const std::vector<std::string> kArgNames = {"num_cols", "is_partial"};

#define READ_BENCHMARK(NAME, GENERATE, READ) \
READ_SYNC(NAME, GENERATE, READ); \
READ_ASYNC(NAME, GENERATE, READ);

READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
READ_SYNC(NAME, GENERATE, READ) \
->RangeMultiplier(8) \
->Ranges({{1, 1 << 12}, {0, 1}}) \
->ArgNames(kArgNames) \
->UseRealTime(); \
READ_ASYNC(NAME, GENERATE, READ) \
->RangeMultiplier(8) \
->ArgNames(kArgNames) \
->Ranges({{1, 1 << 12}, {0, 1}}) \
->UseRealTime();

READ_BENCHMARK(ReadBuffer, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
READ_BENCHMARK(ReadCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
// We use READ_SYNC/READ_ASYNC directly here so we can reduce the parameter
// space, as real files get quite large.
READ_SYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_ASYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_BENCHMARK(ReadMmapCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
READ_SYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_ASYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
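
For readers unfamiliar with Google Benchmark's ranges (my reading of its semantics, not text from the PR): RangeMultiplier(8) with Ranges({{1, 1 << 12}, {0, 1}}) runs the cross product of num_cols in {1, 8, 64, 512, 4096} and is_partial in {0, 1}, while the real-file variants above stop at num_cols = 64. A roughly equivalent explicit registration would look like this sketch:

// Sketch only; ArgsProduct is available in Google Benchmark >= 1.5.2.
BENCHMARK(ReadBuffer)
    ->ArgsProduct({{1, 8, 64, 512, 4096}, {0, 1}})
    ->ArgNames({"num_cols", "is_partial"})
    ->UseRealTime();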
#ifdef ARROW_WITH_ZSTD
READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY,
READ_BENCHMARK(ReadCompressedBuffer, GENERATE_COMPRESSED_DATA_IN_MEMORY,
READ_DATA_IN_MEMORY);
#endif
