From 9bd79146b76614350b698bb88cba4257fced6a48 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 5 Mar 2021 16:22:27 -0500
Subject: [PATCH 1/3] ARROW-11772: [C++] Add IPC record batch generator
---
cpp/src/arrow/ipc/message.cc | 55 ++++
cpp/src/arrow/ipc/message.h | 5 +
cpp/src/arrow/ipc/read_write_benchmark.cc | 127 ++++++---
cpp/src/arrow/ipc/read_write_test.cc | 65 +++++
cpp/src/arrow/ipc/reader.cc | 317 +++++++++++++++++-----
cpp/src/arrow/ipc/reader.h | 35 ++-
cpp/src/arrow/memory_pool.cc | 24 ++
7 files changed, 535 insertions(+), 93 deletions(-)
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 6db8a0f0d3d..197556efcea 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -33,6 +33,7 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/endian.h"
+#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"
@@ -324,6 +325,60 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le
}
}
+/// Read one IPC message with a single asynchronous read of
+/// `metadata_length + body_length` bytes starting at `offset`, then decode it.
+///
+/// Fails with Invalid if `metadata_length` is smaller than the decoder's
+/// initial required size, if fewer than `metadata_length` bytes were read, or
+/// if the decoder ends in an unexpected state; fails with IOError if fewer
+/// body bytes are available than the decoder requires.
+Future> ReadMessageAsync(int64_t offset, int32_t metadata_length,
+                         int64_t body_length,
+                         io::RandomAccessFile* file,
+                         const io::IOContext& context) {
+  // Captured by value in the continuation below, so the decoder and the slot
+  // its listener writes the message into outlive this call.
+  struct State {
+    std::unique_ptr result;
+    std::shared_ptr listener;
+    std::shared_ptr decoder;
+  };
+  auto state = std::make_shared();
+  state->listener = std::make_shared(&state->result);
+  state->decoder = std::make_shared(state->listener);
+
+  if (metadata_length < state->decoder->next_required_size()) {
+    return Status::Invalid("metadata_length should be at least ",
+                           state->decoder->next_required_size());
+  }
+  return file->ReadAsync(context, offset, metadata_length + body_length)
+      .Then([=](std::shared_ptr metadata) -> Result> {
+        if (metadata->size() < metadata_length) {
+          return Status::Invalid("Expected to read ", metadata_length,
+                                 " metadata bytes but got ", metadata->size());
+        }
+        ARROW_RETURN_NOT_OK(
+            state->decoder->Consume(SliceBuffer(metadata, 0, metadata_length)));
+        switch (state->decoder->state()) {
+          case MessageDecoder::State::INITIAL:
+            return std::move(state->result);
+          case MessageDecoder::State::METADATA_LENGTH:
+            return Status::Invalid("metadata length is missing. File offset: ", offset,
+                                   ", metadata length: ", metadata_length);
+          case MessageDecoder::State::METADATA:
+            return Status::Invalid("flatbuffer size ",
+                                   state->decoder->next_required_size(),
+                                   " invalid. File offset: ", offset,
+                                   ", metadata length: ", metadata_length);
+          case MessageDecoder::State::BODY: {
+            // The read may have returned fewer bytes than requested (e.g. a
+            // truncated file). Never slice past the bytes actually read, so
+            // that the size check below sees the real amount of body data.
+            const int64_t available = metadata->size() - metadata_length;
+            auto body =
+                SliceBuffer(metadata, metadata_length,
+                            body_length < available ? body_length : available);
+            if (body->size() < state->decoder->next_required_size()) {
+              return Status::IOError("Expected to be able to read ",
+                                     state->decoder->next_required_size(),
+                                     " bytes for message body, got ", body->size());
+            }
+            RETURN_NOT_OK(state->decoder->Consume(body));
+            return std::move(state->result);
+          }
+          case MessageDecoder::State::EOS:
+            return Status::Invalid("Unexpected empty message in IPC file format");
+          default:
+            return Status::Invalid("Unexpected state: ", state->decoder->state());
+        }
+      });
+}
+
Status AlignStream(io::InputStream* stream, int32_t alignment) {
ARROW_ASSIGN_OR_RAISE(int64_t position, stream->Tell());
return stream->Advance(PaddedLength(position, alignment) - position);
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 6a7619d31b3..b2683259cb4 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -459,6 +459,11 @@ Result> ReadMessage(const int64_t offset,
const int32_t metadata_length,
io::RandomAccessFile* file);
+/// \brief Asynchronously read a message (metadata and body) from a
+/// RandomAccessFile
+///
+/// \param[in] offset the position in the file where the message starts
+/// \param[in] metadata_length the number of metadata bytes to read
+/// \param[in] body_length the number of body bytes to read after the metadata
+/// \param[in] file the file to read from
+/// \param[in] context the IOContext passed to the file's ReadAsync
+/// \return a Future that completes with the decoded message or an error
+ARROW_EXPORT
+Future> ReadMessageAsync(
+ const int64_t offset, const int32_t metadata_length, const int64_t body_length,
+ io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context());
+
/// \brief Advance stream to an 8-byte offset if its position is not a multiple
/// of 8 already
/// \param[in] stream an input stream
diff --git a/cpp/src/arrow/ipc/read_write_benchmark.cc b/cpp/src/arrow/ipc/read_write_benchmark.cc
index a56dd3579e2..bc032f451ac 100644
--- a/cpp/src/arrow/ipc/read_write_benchmark.cc
+++ b/cpp/src/arrow/ipc/read_write_benchmark.cc
@@ -21,6 +21,7 @@
#include
#include
+#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/record_batch.h"
@@ -90,36 +91,6 @@ static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const refe
state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}
-static void ReadFile(benchmark::State& state) { // NOLINT non-const reference
- // 1MB
- constexpr int64_t kTotalSize = 1 << 20;
- auto options = ipc::IpcWriteOptions::Defaults();
-
- std::shared_ptr buffer = *AllocateResizableBuffer(1024);
- {
- // Make Arrow IPC file
- auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));
-
- io::BufferOutputStream stream(buffer);
- auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options);
- ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
- ABORT_NOT_OK(writer->Close());
- ABORT_NOT_OK(stream.Close());
- }
-
- ipc::DictionaryMemo empty_memo;
- while (state.KeepRunning()) {
- io::BufferReader input(buffer);
- auto reader =
- *ipc::RecordBatchFileReader::Open(&input, ipc::IpcReadOptions::Defaults());
- const int num_batches = reader->num_record_batches();
- for (int i = 0; i < num_batches; ++i) {
- auto batch = *reader->ReadRecordBatch(i);
- }
- }
- state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
-}
-
static void ReadStream(benchmark::State& state) { // NOLINT non-const reference
// 1MB
constexpr int64_t kTotalSize = 1 << 20;
@@ -188,9 +159,103 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}
+// Benchmark setup: declares kTotalSize, kBatches, options and buffer in the
+// enclosing function, then writes kBatches copies of the batch returned by
+// MakeRecordBatch(kTotalSize, state.range(0)) into `buffer` as an IPC file
+// with a ZSTD codec. Expects a `state` variable to be in scope.
+#define GENERATE_COMPRESSED_DATA_IN_MEMORY() \
+ constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */ \
+ constexpr int64_t kBatches = 16; \
+ auto options = ipc::IpcWriteOptions::Defaults(); \
+ ASSIGN_OR_ABORT(options.codec, \
+ arrow::util::Codec::Create(arrow::Compression::type::ZSTD)); \
+ std::shared_ptr buffer = *AllocateResizableBuffer(1024); \
+ { \
+ auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); \
+ io::BufferOutputStream stream(buffer); \
+ auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
+ for (int i = 0; i < kBatches; i++) { \
+ ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
+ } \
+ ABORT_NOT_OK(writer->Close()); \
+ ABORT_NOT_OK(stream.Close()); \
+ }
+
+// Benchmark setup: declares kTotalSize, kBatches (1), options and buffer in
+// the enclosing function, then writes a single batch returned by
+// MakeRecordBatch(kTotalSize, state.range(0)) into `buffer` as an IPC file
+// using the default write options. Expects a `state` variable to be in scope.
+#define GENERATE_DATA_IN_MEMORY() \
+ constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */ \
+ constexpr int64_t kBatches = 1; \
+ auto options = ipc::IpcWriteOptions::Defaults(); \
+ std::shared_ptr buffer = *AllocateResizableBuffer(1024); \
+ { \
+ auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); \
+ io::BufferOutputStream stream(buffer); \
+ auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
+ ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
+ ABORT_NOT_OK(writer->Close()); \
+ ABORT_NOT_OK(stream.Close()); \
+ }
+
+// Benchmark setup: declares kTotalSize, kBatches, options and sink in the
+// enclosing function, then writes kBatches copies of the batch returned by
+// MakeRecordBatch(kTotalSize, state.range(0)) to /tmp/benchmark.arrow as an
+// IPC file. Expects a `state` variable to be in scope.
+//
+// The number of batches written must equal kBatches: the READ_* benchmarks
+// report kTotalSize * kBatches bytes processed per iteration.
+#define GENERATE_DATA_TEMP_FILE()                                                 \
+  constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */                              \
+  constexpr int64_t kBatches = 16;                                                \
+  auto options = ipc::IpcWriteOptions::Defaults();                                \
+  ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
+  {                                                                               \
+    auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));              \
+    auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options);    \
+    for (int i = 0; i < kBatches; i++) {                                          \
+      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));                      \
+    }                                                                             \
+    ABORT_NOT_OK(writer->Close());                                                \
+    ABORT_NOT_OK(sink->Close());                                                  \
+  }
+
+// Input-opening snippets for the READ_* benchmarks. Each declares a local
+// `input` that the benchmark body passes to RecordBatchFileReader::Open via
+// input.get(). The in-memory variant wraps the `buffer` declared by a
+// GENERATE_*_IN_MEMORY macro; the other two open /tmp/benchmark.arrow, the
+// path written by GENERATE_DATA_TEMP_FILE, as a regular or memory-mapped file.
+#define READ_DATA_IN_MEMORY() auto input = std::make_shared(buffer);
+#define READ_DATA_TEMP_FILE() \
+ ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));
+#define READ_DATA_MMAP_FILE() \
+ ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
+ io::FileMode::type::READ));
+
+// Defines and registers a benchmark NAME that runs GENERATE() once, then on
+// every iteration runs READ(), opens a RecordBatchFileReader on `input` and
+// reads each record batch by index with ReadRecordBatch(i). Throughput is
+// reported as kTotalSize * kBatches bytes per iteration, so GENERATE must
+// declare both constants.
+#define READ_SYNC(NAME, GENERATE, READ) \
+ static void NAME(benchmark::State& state) { \
+ GENERATE(); \
+ for (auto _ : state) { \
+ READ(); \
+ auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
+ ipc::IpcReadOptions::Defaults()); \
+ const int num_batches = reader->num_record_batches(); \
+ for (int i = 0; i < num_batches; ++i) { \
+ auto batch = *reader->ReadRecordBatch(i); \
+ } \
+ } \
+ state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches); \
+ } \
+ BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
+
+// Defines and registers a benchmark NAME##Async with the same setup as
+// READ_SYNC, but reads through reader->GetRecordBatchGenerator(): the
+// generator is invoked num_record_batches() times and result() is taken from
+// each returned future before the next call is made.
+#define READ_ASYNC(NAME, GENERATE, READ) \
+ static void NAME##Async(benchmark::State& state) { \
+ GENERATE(); \
+ for (auto _ : state) { \
+ READ(); \
+ auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
+ ipc::IpcReadOptions::Defaults()); \
+ ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
+ const int num_batches = reader->num_record_batches(); \
+ for (int i = 0; i < num_batches; ++i) { \
+ auto batch = *generator().result(); \
+ } \
+ } \
+ state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches); \
+ } \
+ BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
+
+// Registers both the synchronous (NAME) and the generator-based (NAME##Async)
+// variant of a file-reading benchmark for one GENERATE/READ combination.
+#define READ_BENCHMARK(NAME, GENERATE, READ) \
+ READ_SYNC(NAME, GENERATE, READ); \
+ READ_ASYNC(NAME, GENERATE, READ);
+
+// ReadTempFile and ReadMmapFile read the same /tmp/benchmark.arrow file and
+// differ only in how the file is opened.
+READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
+READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
+READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
+READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY,
+ READ_DATA_IN_MEMORY);
+
BENCHMARK(WriteRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(ReadRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
-BENCHMARK(ReadFile)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(ReadStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
BENCHMARK(DecodeStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 2efa79de8e0..7a75a48aaeb 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -40,6 +40,7 @@
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/extension_type.h"
+#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
int64_t footer_offset_;
};
+// A FileWriterHelper whose read side goes through
+// RecordBatchFileReader::OpenAsync and GetRecordBatchGenerator instead of the
+// synchronous reader, so the shared round-trip tests also cover the generator.
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+ // Reads every written batch through the generator and appends them to
+ // out_batches in the order the generator was called. out_stats is expected
+ // to be null.
+ Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
+ ReadStats* out_stats = nullptr) override {
+ auto buf_reader = std::make_shared(buffer_);
+ AsyncGenerator> generator;
+
+ {
+ auto fut =
+ RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options);
+ // Report a failed open as a Status to the caller rather than as a test
+ // expectation failure.
+ RETURN_NOT_OK(fut.status());
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+ // Generator's lifetime is independent of the reader's
+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+ }
+
+ // Generator is async-reentrant
+ // (all futures are requested up front, before any of them is waited on)
+ std::vector>> futures;
+ for (int i = 0; i < num_batches_written_; ++i) {
+ futures.push_back(generator());
+ }
+ // One call past the last batch must yield the end-of-stream value (null).
+ auto fut = generator();
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);
+ EXPECT_EQ(nullptr, extra_read);
+ for (auto& future : futures) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
+ out_batches->push_back(batch);
+ }
+
+ // The generator doesn't track stats.
+ EXPECT_EQ(nullptr, out_stats);
+
+ return Status::OK();
+ }
+
+ // Forwards to ReadBatches with the same arguments.
+ Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches,
+ ReadStats* out_stats = nullptr) override {
+ return ReadBatches(options, out_batches, out_stats);
+ }
+};
+
struct StreamWriterHelper {
static constexpr bool kIsFileFormat = false;
@@ -1342,6 +1384,9 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
class TestFileFormat : public ReaderWriterMixin,
public ::testing::TestWithParam {};
+// Fixture for the file-format tests that read through the record batch
+// generator. NOTE(review): presumably ReaderWriterMixin is instantiated with
+// FileGeneratorWriterHelper here -- confirm against the full source.
+class TestFileFormatGenerator : public ReaderWriterMixin,
+ public ::testing::TestWithParam {};
+
class TestStreamFormat : public ReaderWriterMixin,
public ::testing::TestWithParam {};
@@ -1366,6 +1411,16 @@ TEST_P(TestFileFormat, RoundTrip) {
TestZeroLengthRoundTrip(*GetParam(), options);
}
+// Round-trips each parameterized batch (and its zero-length variant) through
+// the generator-based reader, first with the default write options and then
+// with write_legacy_ipc_format enabled.
+TEST_P(TestFileFormatGenerator, RoundTrip) {
+ TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+ TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+
+ IpcWriteOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(*GetParam(), options);
+ TestZeroLengthRoundTrip(*GetParam(), options);
+}
+
Status MakeDictionaryBatch(std::shared_ptr* out) {
auto f0_type = arrow::dictionary(int32(), utf8());
auto f1_type = arrow::dictionary(int8(), utf8());
@@ -1571,6 +1626,8 @@ INSTANTIATE_TEST_SUITE_P(GenericIpcRoundTripTests, TestIpcRoundTrip,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat,
::testing::ValuesIn(kBatchCases));
+INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator,
+ ::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamDecoderDataRoundTripTests, TestStreamDecoderData,
@@ -1635,18 +1692,26 @@ TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
+TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
+
TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); }
+TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); }
+
TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
+TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); }
+
TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
+TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
+
TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) {
// ARROW-6006
auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8()));
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 45a3d3e3cd8..faf0acd3cfd 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -51,6 +51,7 @@
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"
#include "arrow/util/string.h"
+#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
#include "arrow/visitor_inline.h"
@@ -958,10 +959,91 @@ Result> RecordBatchStreamReader::Open(
// ----------------------------------------------------------------------
// Reader implementation
+// Common functions used in both the random-access file reader and the
+// asynchronous generator
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
}
+// Check that the offset, metadata length and body length recorded for a
+// footer block are all multiples of 8; returns Invalid otherwise.
+static Status CheckAligned(const FileBlock& block) {
+  if (!BitUtil::IsMultipleOf8(block.offset) ||
+      !BitUtil::IsMultipleOf8(block.metadata_length) ||
+      !BitUtil::IsMultipleOf8(block.body_length)) {
+    return Status::Invalid("Unaligned block in IPC file");
+  }
+  return Status::OK();
+}
+
+// Synchronously read the message described by `block` from `file`.
+//
+// Returns Invalid if the block is not 8-byte aligned, or whatever error
+// ReadMessage reports.
+static Result> ReadMessageFromBlock(const FileBlock& block,
+                                    io::RandomAccessFile* file) {
+  RETURN_NOT_OK(CheckAligned(block));
+
+  // TODO(wesm): checking that the decoded message's body_length() equals
+  // block.body_length breaks integration tests, see ARROW-3256
+
+  ARROW_ASSIGN_OR_RAISE(auto message,
+                        ReadMessage(block.offset, block.metadata_length, file));
+  return std::move(message);
+}
+
+// Asynchronous counterpart of ReadMessageFromBlock: performs the same
+// alignment check up front, then delegates to ReadMessageAsync using
+// `io_context`.
+static Future> ReadMessageFromBlockAsync(
+    const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) {
+  Status aligned = CheckAligned(block);
+  if (!aligned.ok()) {
+    return aligned;
+  }
+
+  // TODO(wesm): checking that the decoded message's body_length() equals
+  // block.body_length breaks integration tests, see ARROW-3256
+
+  return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file,
+                          io_context);
+}
+
+// Decode one dictionary message using `context`.
+//
+// Requires the message to have a body (CHECK_HAS_BODY). Returns Invalid when
+// ReadDictionary reports any kind other than DictionaryKind::New, i.e.
+// replacements and deltas are rejected here.
+static Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+ CHECK_HAS_BODY(*message);
+ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+ DictionaryKind kind;
+ RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get()));
+ if (kind != DictionaryKind::New) {
+ return Status::Invalid(
+ "Unsupported dictionary replacement or "
+ "dictionary delta in IPC file");
+ }
+ return Status::OK();
+}
+
+class RecordBatchFileReaderImpl;
+
+/// A generator of record batches.
+///
+/// All batches are yielded in order.
+///
+/// Each call to operator() returns a future for the next record batch listed
+/// in the file footer; once all batches have been requested it returns the
+/// end-of-iteration value. The generator holds a shared reference to the
+/// reader state.
+class ARROW_EXPORT IpcFileRecordBatchGenerator {
+ public:
+ using Item = std::shared_ptr;
+
+ /// \param[in] state the file reader whose footer, file and dictionary memo
+ /// are used
+ /// \param[in] io_context passed to the asynchronous message reads
+ /// \param[in] executor if non-null, read futures are transferred to it
+ /// before their continuations are attached
+ explicit IpcFileRecordBatchGenerator(std::shared_ptr state,
+ const io::IOContext& io_context,
+ arrow::internal::Executor* executor)
+ : state_(std::move(state)),
+ io_context_(io_context),
+ executor_(executor),
+ index_(0) {}
+
+ Future- operator()();
+
+ /// Decode all dictionary messages into the reader's dictionary memo.
+ /// Returns a null pointer on success.
+ static Result> ReadDictionaries(
+ RecordBatchFileReaderImpl* state,
+ std::vector> dictionary_messages);
+
+ /// Decode one record batch message using the reader's schema, field
+ /// inclusion mask and dictionary memo.
+ static Result> ReadRecordBatch(
+ RecordBatchFileReaderImpl* state, Message* message);
+
+ private:
+ std::shared_ptr state_;
+ io::IOContext io_context_;
+ arrow::internal::Executor* executor_;
+ // Index of the next record batch block to request.
+ int index_;
+ // Odd Future type, but this lets us use All() easily
+ Future> read_dictionaries_;
+};
+
class RecordBatchFileReaderImpl : public RecordBatchFileReader {
public:
RecordBatchFileReaderImpl() : file_(NULLPTR), footer_offset_(0), footer_(NULLPTR) {}
@@ -1035,13 +1117,46 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return Status::OK();
}
+ // Owning variant: stores `file` in owned_file_ and then opens through the
+ // raw-pointer overload.
+ Future<> OpenAsync(const std::shared_ptr& file,
+ int64_t footer_offset, const IpcReadOptions& options) {
+ owned_file_ = file;
+ return OpenAsync(file.get(), footer_offset, options);
+ }
+
+ // Borrowing variant: records the file, options and footer offset, reads the
+ // footer asynchronously, then unpacks the schema message from it.
+ //
+ // Uses shared_from_this(), so the reader must already be owned by a
+ // shared_ptr when this is called.
+ Future<> OpenAsync(io::RandomAccessFile* file, int64_t footer_offset,
+ const IpcReadOptions& options) {
+ file_ = file;
+ options_ = options;
+ footer_offset_ = footer_offset;
+ auto cpu_executor = ::arrow::internal::GetCpuThreadPool();
+ // Captured by the continuation so the reader stays alive until it has run.
+ auto self = std::dynamic_pointer_cast(shared_from_this());
+ return ReadFooterAsync(cpu_executor).Then([self, options](...) -> Status {
+ // Get the schema and record any observed dictionaries
+ RETURN_NOT_OK(UnpackSchemaMessage(
+ self->footer_->schema(), options, &self->dictionary_memo_, &self->schema_,
+ &self->out_schema_, &self->field_inclusion_mask_, &self->swap_endian_));
+ ++self->stats_.num_messages;
+ return Status::OK();
+ });
+ }
+
std::shared_ptr schema() const override { return out_schema_; }
std::shared_ptr metadata() const override { return metadata_; }
ReadStats stats() const override { return stats_; }
+ // Builds a generator that holds a shared reference to this reader (obtained
+ // through shared_from_this()) together with the given IOContext and
+ // executor.
+ Result>> GetRecordBatchGenerator(
+ const io::IOContext& io_context, arrow::internal::Executor* executor) override {
+ auto state = std::dynamic_pointer_cast(shared_from_this());
+ return IpcFileRecordBatchGenerator(std::move(state), io_context, executor);
+ }
+
private:
+ friend AsyncGenerator> MakeMessageGenerator(
+ std::shared_ptr, const io::IOContext&);
+ friend class IpcFileRecordBatchGenerator;
+
FileBlock GetRecordBatchBlock(int i) const {
return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
}
@@ -1051,42 +1166,28 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
Result> ReadMessageFromBlock(const FileBlock& block) {
- if (!BitUtil::IsMultipleOf8(block.offset) ||
- !BitUtil::IsMultipleOf8(block.metadata_length) ||
- !BitUtil::IsMultipleOf8(block.body_length)) {
- return Status::Invalid("Unaligned block in IPC file");
- }
-
- // TODO(wesm): this breaks integration tests, see ARROW-3256
- // DCHECK_EQ((*out)->body_length(), block.body_length);
-
- ARROW_ASSIGN_OR_RAISE(auto message,
- ReadMessage(block.offset, block.metadata_length, file_));
+ ARROW_ASSIGN_OR_RAISE(auto message, arrow::ipc::ReadMessageFromBlock(block, file_));
++stats_.num_messages;
return std::move(message);
}
Status ReadDictionaries() {
// Read all the dictionaries
+ IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
for (int i = 0; i < num_dictionaries(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
-
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
- DictionaryKind kind;
- IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
- RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get()));
+ RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
++stats_.num_dictionary_batches;
- if (kind != DictionaryKind::New) {
- return Status::Invalid(
- "Unsupported dictionary replacement or "
- "dictionary delta in IPC file");
- }
}
return Status::OK();
}
Status ReadFooter() {
+ auto fut = ReadFooterAsync(/*executor=*/nullptr);
+ return fut.status();
+ }
+
+ Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
const int32_t magic_size = static_cast(strlen(kArrowMagicBytes));
if (footer_offset_ <= magic_size * 2 + 4) {
@@ -1094,45 +1195,53 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
int file_end_size = static_cast(magic_size + sizeof(int32_t));
- ARROW_ASSIGN_OR_RAISE(auto buffer,
- file_->ReadAt(footer_offset_ - file_end_size, file_end_size));
-
- const int64_t expected_footer_size = magic_size + sizeof(int32_t);
- if (buffer->size() < expected_footer_size) {
- return Status::Invalid("Unable to read ", expected_footer_size, "from end of file");
- }
-
- if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
- return Status::Invalid("Not an Arrow file");
- }
-
- int32_t footer_length =
- BitUtil::FromLittleEndian(*reinterpret_cast(buffer->data()));
-
- if (footer_length <= 0 || footer_length > footer_offset_ - magic_size * 2 - 4) {
- return Status::Invalid("File is smaller than indicated metadata size");
- }
-
- // Now read the footer
- ARROW_ASSIGN_OR_RAISE(
- footer_buffer_,
- file_->ReadAt(footer_offset_ - footer_length - file_end_size, footer_length));
-
- const auto data = footer_buffer_->data();
- const auto size = footer_buffer_->size();
- if (!internal::VerifyFlatbuffers(data, size)) {
- return Status::IOError("Verification of flatbuffer-encoded Footer failed.");
- }
- footer_ = flatbuf::GetFooter(data);
-
- auto fb_metadata = footer_->custom_metadata();
- if (fb_metadata != nullptr) {
- std::shared_ptr md;
- RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md));
- metadata_ = std::move(md); // const-ify
- }
-
- return Status::OK();
+ auto self = std::dynamic_pointer_cast(shared_from_this());
+ auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size);
+ if (executor) read_magic = executor->Transfer(std::move(read_magic));
+ return read_magic
+ .Then([=](const std::shared_ptr& buffer)
+ -> Future> {
+ const int64_t expected_footer_size = magic_size + sizeof(int32_t);
+ if (buffer->size() < expected_footer_size) {
+ return Status::Invalid("Unable to read ", expected_footer_size,
+ "from end of file");
+ }
+
+ if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+ return Status::Invalid("Not an Arrow file");
+ }
+
+ int32_t footer_length = BitUtil::FromLittleEndian(
+ *reinterpret_cast(buffer->data()));
+
+ if (footer_length <= 0 ||
+ footer_length > self->footer_offset_ - magic_size * 2 - 4) {
+ return Status::Invalid("File is smaller than indicated metadata size");
+ }
+
+ // Now read the footer
+ auto read_footer = self->file_->ReadAsync(
+ self->footer_offset_ - footer_length - file_end_size, footer_length);
+ if (executor) read_footer = executor->Transfer(std::move(read_footer));
+ return read_footer;
+ })
+ .Then([=](const std::shared_ptr& buffer) -> Status {
+ self->footer_buffer_ = buffer;
+ const auto data = self->footer_buffer_->data();
+ const auto size = self->footer_buffer_->size();
+ if (!internal::VerifyFlatbuffers(data, size)) {
+ return Status::IOError("Verification of flatbuffer-encoded Footer failed.");
+ }
+ self->footer_ = flatbuf::GetFooter(data);
+
+ auto fb_metadata = self->footer_->custom_metadata();
+ if (fb_metadata != nullptr) {
+ std::shared_ptr md;
+ RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md));
+ self->metadata_ = std::move(md); // const-ify
+ }
+ return Status::OK();
+ });
}
int num_dictionaries() const {
@@ -1194,6 +1303,92 @@ Result> RecordBatchFileReader::Open(
return result;
}
+// Owning overload without an explicit footer offset: uses the size reported
+// by the file as the footer offset. A failing GetSize() is returned as the
+// error.
+Future> RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr& file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  // `file` is a const reference, so it is forwarded as-is (it cannot be moved
+  // from).
+  return OpenAsync(file, footer_offset, options);
+}
+
+// Borrowing overload without an explicit footer offset: uses the size
+// reported by the file as the footer offset.
+Future> RecordBatchFileReader::OpenAsync(
+ io::RandomAccessFile* file, const IpcReadOptions& options) {
+ ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+ return OpenAsync(file, footer_offset, options);
+}
+
+// Owning overload: creates the reader implementation, starts its asynchronous
+// open and completes with the reader once that open has finished.
+Future> RecordBatchFileReader::OpenAsync(
+ const std::shared_ptr& file, int64_t footer_offset,
+ const IpcReadOptions& options) {
+ auto result = std::make_shared();
+ return result->OpenAsync(file, footer_offset, options)
+ .Then(
+ [=](...) -> Result> { return result; });
+}
+
+// Borrowing overload: same as the owning overload, but the reader is handed
+// only the raw file pointer.
+Future> RecordBatchFileReader::OpenAsync(
+ io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options) {
+ auto result = std::make_shared();
+ return result->OpenAsync(file, footer_offset, options)
+ .Then(
+ [=](...) -> Result> { return result; });
+}
+
+// Returns a future for the next record batch.
+//
+// The first call also starts reading every dictionary block listed in the
+// footer; the combined future is kept in read_dictionaries_ and every record
+// batch future depends on it. Once index_ reaches num_record_batches() the
+// end-of-iteration value is returned.
+Future IpcFileRecordBatchGenerator::operator()() {
+ // Local copy so the continuations below can capture the shared state by
+ // value.
+ auto state = state_;
+ if (!read_dictionaries_.is_valid()) {
+ std::vector>> messages(state->num_dictionaries());
+ for (int i = 0; i < state->num_dictionaries(); i++) {
+ auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+ messages[i] = ReadMessageFromBlockAsync(block, state->file_, io_context_);
+ }
+ auto read_messages = All(std::move(messages));
+ if (executor_) read_messages = executor_->Transfer(read_messages);
+ read_dictionaries_ = read_messages.Then(
+ [=](const std::vector>> maybe_messages)
+ -> Result> {
+ // Unwrap each read result; the first failed read aborts with its error.
+ std::vector> messages(state->num_dictionaries());
+ for (size_t i = 0; i < messages.size(); i++) {
+ ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
+ }
+ return ReadDictionaries(state.get(), std::move(messages));
+ });
+ }
+ if (index_ >= state_->num_record_batches()) {
+ return Future
- ::MakeFinished(IterationTraits
- ::End());
+ }
+ auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+ auto read_message = ReadMessageFromBlockAsync(block, state->file_, io_context_);
+ // Slot 0 is the dictionary future, slot 1 the record batch message.
+ std::vector>> dependencies{read_dictionaries_,
+ std::move(read_message)};
+ auto read_messages = All(dependencies);
+ if (executor_) read_messages = executor_->Transfer(read_messages);
+ return read_messages.Then(
+ [=](const std::vector>> maybe_messages)
+ -> Result
- {
+ RETURN_NOT_OK(maybe_messages[0]); // Make sure dictionaries were read
+ ARROW_ASSIGN_OR_RAISE(auto message, maybe_messages[1]);
+ return ReadRecordBatch(state.get(), message.get());
+ });
+}
+
+// Decodes each dictionary message, in the given order, into the reader's
+// dictionary memo and stops at the first error. On success the returned value
+// is a null pointer; only the success/failure status carries information.
+Result> IpcFileRecordBatchGenerator::ReadDictionaries(
+ RecordBatchFileReaderImpl* state,
+ std::vector> dictionary_messages) {
+ IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
+ for (const auto& message : dictionary_messages) {
+ RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
+ }
+ return nullptr;
+}
+
+// Decodes a record batch from `message`, which must have a body
+// (CHECK_HAS_BODY), using the reader's schema, field inclusion mask,
+// dictionary memo, options and endianness flag.
+Result> IpcFileRecordBatchGenerator::ReadRecordBatch(
+ RecordBatchFileReaderImpl* state, Message* message) {
+ CHECK_HAS_BODY(*message);
+ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+ IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
+ return ReadRecordBatchInternal(*message->metadata(), state->schema_,
+ state->field_inclusion_mask_, context, reader.get());
+}
+
Status Listener::OnEOS() { return Status::OK(); }
Status Listener::OnSchemaDecoded(std::shared_ptr schema) { return Status::OK(); }
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 38f7f2ed8b9..df19f58f217 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -31,6 +31,7 @@
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -99,7 +100,8 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
};
/// \brief Reads the record batch file format
-class ARROW_EXPORT RecordBatchFileReader {
+class ARROW_EXPORT RecordBatchFileReader
+ : public std::enable_shared_from_this {
public:
virtual ~RecordBatchFileReader() = default;
@@ -147,6 +149,26 @@ class ARROW_EXPORT RecordBatchFileReader {
const std::shared_ptr& file, int64_t footer_offset,
const IpcReadOptions& options = IpcReadOptions::Defaults());
+ /// \brief Open a file asynchronously (owns the file).
+ static Future> OpenAsync(
+ const std::shared_ptr& file,
+ const IpcReadOptions& options = IpcReadOptions::Defaults());
+
+ /// \brief Open a file asynchronously (borrows the file).
+ static Future> OpenAsync(
+ io::RandomAccessFile* file,
+ const IpcReadOptions& options = IpcReadOptions::Defaults());
+
+ /// \brief Open a file asynchronously (owns the file).
+ static Future> OpenAsync(
+ const std::shared_ptr& file, int64_t footer_offset,
+ const IpcReadOptions& options = IpcReadOptions::Defaults());
+
+ /// \brief Open a file asynchronously (borrows the file).
+ static Future> OpenAsync(
+ io::RandomAccessFile* file, int64_t footer_offset,
+ const IpcReadOptions& options = IpcReadOptions::Defaults());
+
/// \brief The schema read from the file
virtual std::shared_ptr schema() const = 0;
@@ -172,6 +194,17 @@ class ARROW_EXPORT RecordBatchFileReader {
/// \brief Computes the total number of rows in the file.
virtual Result CountRows() = 0;
+
+ /// \brief Get a reentrant generator of record batches.
+ ///
+ /// \param[in] io_context The IOContext to use (controls which thread pool
+ /// is used for I/O).
+ /// \param[in] executor Optionally, an executor to use for decoding record
+ /// batches. This is generally only a benefit for very wide and/or
+ /// compressed batches.
+ virtual Result>> GetRecordBatchGenerator(
+ const io::IOContext& io_context = io::default_io_context(),
+ arrow::internal::Executor* executor = NULLPTR) = 0;
};
/// \brief A general listener class to receive events.
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 63e3cb93a25..dbd6ae422b9 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -28,12 +28,14 @@
#include
#endif
+#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep
#include "arrow/util/optional.h"
#include "arrow/util/string.h"
+#include "arrow/util/thread_pool.h"
#ifdef __GLIBC__
#include
@@ -502,6 +504,28 @@ static JemallocMemoryPool jemalloc_pool;
static MimallocMemoryPool mimalloc_pool;
#endif
+/// Force shutdown of thread pools before destruction of memory pools.
+///
+/// If the program completes quickly enough, we may run the memory pool
+/// destructor before/concurrently with a thread pool task destructor; if the
+/// task holds a reference to a Buffer anywhere, when the Buffer goes to call
+/// Free, we'll get a "pure virtual method called" error.
+///
+/// By declaring this after the static memory pools, we can force shutdown of the
+/// thread pools first. This is only a workaround, and not a general solution
+/// (there could be other resources for which destruction order matters).
+#ifndef _WIN32
+class ShutdownThreadPools {
+ public:
+ ~ShutdownThreadPools() {
+ ARROW_UNUSED(io::internal::GetIOThreadPool()->Shutdown(true));
+ ARROW_UNUSED(internal::GetCpuThreadPool()->Shutdown(true));
+ }
+};
+
+static ShutdownThreadPools handle;
+#endif
+
MemoryPool* system_memory_pool() { return &system_pool; }
Status jemalloc_memory_pool(MemoryPool** out) {
From 19478e800c53d6538868a5a3cf00e77ad444bb45 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 6 Apr 2021 14:47:23 +0200
Subject: [PATCH 2/3] Only delete memory in ~PoolBuffer if not finalizing pools
---
cpp/src/arrow/buffer.cc | 107 ----------------------
cpp/src/arrow/memory_pool.cc | 167 ++++++++++++++++++++++++++++-------
2 files changed, 134 insertions(+), 140 deletions(-)
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 8275542c542..b1b2945d0f5 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -21,7 +21,6 @@
#include
#include
-#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
@@ -171,112 +170,6 @@ MutableBuffer::MutableBuffer(const std::shared_ptr& parent, const int64_
parent_ = parent;
}
-// -----------------------------------------------------------------------
-// Pool buffer and allocation
-
-/// A Buffer whose lifetime is tied to a particular MemoryPool
-class PoolBuffer final : public ResizableBuffer {
- public:
- explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool)
- : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {}
-
- ~PoolBuffer() override {
- uint8_t* ptr = mutable_data();
- if (ptr) {
- pool_->Free(ptr, capacity_);
- }
- }
-
- Status Reserve(const int64_t capacity) override {
- if (capacity < 0) {
- return Status::Invalid("Negative buffer capacity: ", capacity);
- }
- uint8_t* ptr = mutable_data();
- if (!ptr || capacity > capacity_) {
- int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity);
- if (ptr) {
- RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr));
- } else {
- RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr));
- }
- data_ = ptr;
- capacity_ = new_capacity;
- }
- return Status::OK();
- }
-
- Status Resize(const int64_t new_size, bool shrink_to_fit = true) override {
- if (ARROW_PREDICT_FALSE(new_size < 0)) {
- return Status::Invalid("Negative buffer resize: ", new_size);
- }
- uint8_t* ptr = mutable_data();
- if (ptr && shrink_to_fit && new_size <= size_) {
- // Buffer is non-null and is not growing, so shrink to the requested size without
- // excess space.
- int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(new_size);
- if (capacity_ != new_capacity) {
- // Buffer hasn't got yet the requested size.
- RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr));
- data_ = ptr;
- capacity_ = new_capacity;
- }
- } else {
- RETURN_NOT_OK(Reserve(new_size));
- }
- size_ = new_size;
-
- return Status::OK();
- }
-
- static std::shared_ptr MakeShared(MemoryPool* pool) {
- std::shared_ptr mm;
- if (pool == nullptr) {
- pool = default_memory_pool();
- mm = default_cpu_memory_manager();
- } else {
- mm = CPUDevice::memory_manager(pool);
- }
- return std::make_shared(std::move(mm), pool);
- }
-
- static std::unique_ptr MakeUnique(MemoryPool* pool) {
- std::shared_ptr mm;
- if (pool == nullptr) {
- pool = default_memory_pool();
- mm = default_cpu_memory_manager();
- } else {
- mm = CPUDevice::memory_manager(pool);
- }
- return std::unique_ptr(new PoolBuffer(std::move(mm), pool));
- }
-
- private:
- MemoryPool* pool_;
-};
-
-namespace {
-// A utility that does most of the work of the `AllocateBuffer` and
-// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to
-// a PoolBuffer.
-template
-inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) {
- RETURN_NOT_OK(buffer->Resize(size));
- buffer->ZeroPadding();
- return std::move(buffer);
-}
-
-} // namespace
-
-Result> AllocateBuffer(const int64_t size, MemoryPool* pool) {
- return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size);
-}
-
-Result> AllocateResizableBuffer(const int64_t size,
- MemoryPool* pool) {
- return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool),
- size);
-}
-
Result> AllocateBitmap(int64_t length, MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateBuffer(BitUtil::BytesForBits(length), pool));
// Zero out any trailing bits
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index dbd6ae422b9..80a05f80c69 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -18,6 +18,7 @@
#include "arrow/memory_pool.h"
#include // IWYU pragma: keep
+#include
#include // IWYU pragma: keep
#include // IWYU pragma: keep
#include // IWYU pragma: keep
@@ -28,9 +29,11 @@
#include
#endif
+#include "arrow/buffer.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/bit_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep
#include "arrow/util/optional.h"
@@ -496,41 +499,27 @@ std::unique_ptr MemoryPool::CreateDefault() {
}
}
-static SystemMemoryPool system_pool;
-#ifdef ARROW_JEMALLOC
-static JemallocMemoryPool jemalloc_pool;
-#endif
-#ifdef ARROW_MIMALLOC
-static MimallocMemoryPool mimalloc_pool;
-#endif
+static struct GlobalState {
+ ~GlobalState() { finalizing.store(true, std::memory_order_relaxed); }
-/// Force shutdown of thread pools before destruction of memory pools.
-///
-/// If the program completes quickly enough, we may run the memory pool
-/// destructor before/concurrently with a thread pool task destructor; if the
-/// task holds a reference to a Buffer anywhere, when the Buffer goes to call
-/// Free, we'll get a "pure virtual method called" error.
-///
-/// By declaring this after the static memory pools, we can force shutdown of the
-/// thread pools first. This is only a workaround, and not a general solution
-/// (there could be other resources for which destruction order matters).
-#ifndef _WIN32
-class ShutdownThreadPools {
- public:
- ~ShutdownThreadPools() {
- ARROW_UNUSED(io::internal::GetIOThreadPool()->Shutdown(true));
- ARROW_UNUSED(internal::GetCpuThreadPool()->Shutdown(true));
- }
-};
+ bool is_finalizing() const { return finalizing.load(std::memory_order_relaxed); }
-static ShutdownThreadPools handle;
-#endif
+ std::atomic finalizing{false}; // constructed first, destroyed last
+
+ SystemMemoryPool system_pool;
+ #ifdef ARROW_JEMALLOC
+ JemallocMemoryPool jemalloc_pool;
+ #endif
+ #ifdef ARROW_MIMALLOC
+ MimallocMemoryPool mimalloc_pool;
+ #endif
+} global_state;
-MemoryPool* system_memory_pool() { return &system_pool; }
+MemoryPool* system_memory_pool() { return &global_state.system_pool; }
Status jemalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_JEMALLOC
- *out = &jemalloc_pool;
+ *out = &global_state.jemalloc_pool;
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable jemalloc");
@@ -539,7 +528,7 @@ Status jemalloc_memory_pool(MemoryPool** out) {
Status mimalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_MIMALLOC
- *out = &mimalloc_pool;
+ *out = &global_state.mimalloc_pool;
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable mimalloc");
@@ -550,14 +539,14 @@ MemoryPool* default_memory_pool() {
auto backend = DefaultBackend();
switch (backend) {
case MemoryPoolBackend::System:
- return &system_pool;
+ return &global_state.system_pool;
#ifdef ARROW_JEMALLOC
case MemoryPoolBackend::Jemalloc:
- return &jemalloc_pool;
+ return &global_state.jemalloc_pool;
#endif
#ifdef ARROW_MIMALLOC
case MemoryPoolBackend::Mimalloc:
- return &mimalloc_pool;
+ return &global_state.mimalloc_pool;
#endif
default:
ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool";
@@ -693,4 +682,116 @@ std::vector SupportedMemoryBackendNames() {
return supported;
}
+// -----------------------------------------------------------------------
+// Pool buffer and allocation
+
+/// A Buffer whose lifetime is tied to a particular MemoryPool
+class PoolBuffer final : public ResizableBuffer {
+ public:
+ explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool)
+ : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {}
+
+ ~PoolBuffer() override {
+ // Avoid calling pool_->Free if the global pools are destroyed
+ // (XXX this will not work with user-defined pools)
+
+ // This can happen if a Future is being destroyed on one thread while, or
+ // after, the memory pools are destroyed on the main thread (there is no
+ // guaranteed destruction order between thread pools and memory pools)
+ uint8_t* ptr = mutable_data();
+ if (ptr && !global_state.is_finalizing()) {
+ pool_->Free(ptr, capacity_);
+ }
+ }
+
+ Status Reserve(const int64_t capacity) override {
+ if (capacity < 0) {
+ return Status::Invalid("Negative buffer capacity: ", capacity);
+ }
+ uint8_t* ptr = mutable_data();
+ if (!ptr || capacity > capacity_) {
+ int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity);
+ if (ptr) {
+ RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr));
+ } else {
+ RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr));
+ }
+ data_ = ptr;
+ capacity_ = new_capacity;
+ }
+ return Status::OK();
+ }
+
+ Status Resize(const int64_t new_size, bool shrink_to_fit = true) override {
+ if (ARROW_PREDICT_FALSE(new_size < 0)) {
+ return Status::Invalid("Negative buffer resize: ", new_size);
+ }
+ uint8_t* ptr = mutable_data();
+ if (ptr && shrink_to_fit && new_size <= size_) {
+ // Buffer is non-null and is not growing, so shrink to the requested size without
+ // excess space.
+ int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(new_size);
+ if (capacity_ != new_capacity) {
+ // Capacity does not yet match the rounded-up requested size.
+ RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr));
+ data_ = ptr;
+ capacity_ = new_capacity;
+ }
+ } else {
+ RETURN_NOT_OK(Reserve(new_size));
+ }
+ size_ = new_size;
+
+ return Status::OK();
+ }
+
+ static std::shared_ptr MakeShared(MemoryPool* pool) {
+ std::shared_ptr mm;
+ if (pool == nullptr) {
+ pool = default_memory_pool();
+ mm = default_cpu_memory_manager();
+ } else {
+ mm = CPUDevice::memory_manager(pool);
+ }
+ return std::make_shared(std::move(mm), pool);
+ }
+
+ static std::unique_ptr MakeUnique(MemoryPool* pool) {
+ std::shared_ptr mm;
+ if (pool == nullptr) {
+ pool = default_memory_pool();
+ mm = default_cpu_memory_manager();
+ } else {
+ mm = CPUDevice::memory_manager(pool);
+ }
+ return std::unique_ptr(new PoolBuffer(std::move(mm), pool));
+ }
+
+ private:
+ MemoryPool* pool_;
+};
+
+namespace {
+// A utility that does most of the work of the `AllocateBuffer` and
+// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to
+// a PoolBuffer.
+template
+inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) {
+ RETURN_NOT_OK(buffer->Resize(size));
+ buffer->ZeroPadding();
+ return std::move(buffer);
+}
+
+} // namespace
+
+Result> AllocateBuffer(const int64_t size, MemoryPool* pool) {
+ return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size);
+}
+
+Result> AllocateResizableBuffer(const int64_t size,
+ MemoryPool* pool) {
+ return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool),
+ size);
+}
+
} // namespace arrow
From c29d946e21d248d43e2a2d000cfbcc76abaec038 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 6 Apr 2021 09:35:49 -0400
Subject: [PATCH 3/3] ARROW-11772: [C++] Implement ScanBatchesAsync
---
cpp/src/arrow/dataset/file_ipc.cc | 104 ++++++++++++++-----
cpp/src/arrow/dataset/file_ipc.h | 8 ++
cpp/src/arrow/io/memory.cc | 4 +-
cpp/src/arrow/io/type_fwd.h | 1 +
cpp/src/arrow/ipc/read_write_benchmark.cc | 16 +--
cpp/src/arrow/ipc/read_write_test.cc | 33 +------
cpp/src/arrow/ipc/reader.cc | 115 ++++++++++++++++------
cpp/src/arrow/ipc/reader.h | 5 +
cpp/src/arrow/memory_pool.cc | 14 +--
cpp/src/arrow/testing/future_util.h | 11 +++
10 files changed, 210 insertions(+), 101 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index a60e31bf7d2..a8863ee0775 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -59,6 +59,21 @@ static inline Result> OpenReader(
return reader;
}
+static inline Future> OpenReaderAsync(
+ const FileSource& source,
+ const ipc::IpcReadOptions& options = default_read_options()) {
+ ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+ auto path = source.path();
+ return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
+ .Then([](const std::shared_ptr& reader)
+ -> Result> { return reader; },
+ [path](const Status& status)
+ -> Result> {
+ return status.WithMessage("Could not open IPC input source '", path,
+ "': ", status.message());
+ });
+}
+
static inline Result> GetIncludedFields(
const Schema& schema, const std::vector& materialized_fields) {
std::vector included_fields;
@@ -73,6 +88,26 @@ static inline Result> GetIncludedFields(
return included_fields;
}
+static inline Result GetReadOptions(
+ const Schema& schema, const FileFormat& format, const ScanOptions& scan_options) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto ipc_scan_options,
+ GetFragmentScanOptions(
+ kIpcTypeName, &scan_options, format.default_fragment_scan_options));
+ auto options =
+ ipc_scan_options->options ? *ipc_scan_options->options : default_read_options();
+ options.memory_pool = scan_options.pool;
+ if (!options.included_fields.empty()) {
+ // Cannot set them here
+ ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
+ "but will be ignored; included_fields are derived from "
+ "fields referenced by the scan";
+ }
+ ARROW_ASSIGN_OR_RAISE(options.included_fields,
+ GetIncludedFields(schema, scan_options.MaterializedFields()));
+ return options;
+}
+
/// \brief A ScanTask backed by an Ipc file.
class IpcScanTask : public ScanTask {
public:
@@ -83,28 +118,11 @@ class IpcScanTask : public ScanTask {
Result Execute() override {
struct Impl {
static Result Make(const FileSource& source,
- FileFormat* format,
- const ScanOptions* scan_options) {
+ const FileFormat& format,
+ const ScanOptions& scan_options) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
-
- ARROW_ASSIGN_OR_RAISE(
- auto ipc_scan_options,
- GetFragmentScanOptions(
- kIpcTypeName, scan_options, format->default_fragment_scan_options));
- auto options = ipc_scan_options->options ? *ipc_scan_options->options
- : default_read_options();
- options.memory_pool = scan_options->pool;
- options.use_threads = false;
- if (!options.included_fields.empty()) {
- // Cannot set them here
- ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
- "but will be ignored; included_fields are derived from "
- "fields referenced by the scan";
- }
- ARROW_ASSIGN_OR_RAISE(
- options.included_fields,
- GetIncludedFields(*reader->schema(), scan_options->MaterializedFields()));
-
+ ARROW_ASSIGN_OR_RAISE(auto options,
+ GetReadOptions(*reader->schema(), format, scan_options));
ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
return RecordBatchIterator(Impl{std::move(reader), 0});
}
@@ -121,9 +139,9 @@ class IpcScanTask : public ScanTask {
int i_;
};
- return Impl::Make(
- source_, internal::checked_pointer_cast(fragment_)->format().get(),
- options_.get());
+ return Impl::Make(source_,
+ *internal::checked_pointer_cast(fragment_)->format(),
+ *options_);
}
private:
@@ -173,6 +191,44 @@ Result IpcFileFormat::ScanFile(
return IpcScanTaskIterator::Make(options, fragment);
}
+Result IpcFileFormat::ScanBatchesAsync(
+ const std::shared_ptr& options,
+ const std::shared_ptr& file) const {
+ auto self = shared_from_this();
+ auto source = file->source();
+ auto open_reader = OpenReaderAsync(source);
+ auto reopen_reader = [self, options,
+ source](std::shared_ptr reader)
+ -> Future> {
+ ARROW_ASSIGN_OR_RAISE(auto options,
+ GetReadOptions(*reader->schema(), *self, *options));
+ return OpenReader(source, options);
+ };
+ auto readahead_level = options->batch_readahead;
+ auto default_fragment_scan_options = this->default_fragment_scan_options;
+ auto open_generator = [=](const std::shared_ptr& reader)
+ -> Result {
+ ARROW_ASSIGN_OR_RAISE(
+ auto ipc_scan_options,
+ GetFragmentScanOptions(kIpcTypeName, options.get(),
+ default_fragment_scan_options));
+
+ RecordBatchGenerator generator;
+ if (ipc_scan_options->cache_options) {
+ // Transferring helps performance when coalescing
+ ARROW_ASSIGN_OR_RAISE(
+ generator, reader->GetRecordBatchGenerator(
+ /*coalesce=*/true, options->io_context,
+ *ipc_scan_options->cache_options, internal::GetCpuThreadPool()));
+ } else {
+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
+ /*coalesce=*/false, options->io_context));
+ }
+ return MakeReadaheadGenerator(std::move(generator), readahead_level);
+ };
+ return MakeFromFuture(open_reader.Then(reopen_reader).Then(open_generator));
+}
+
Future> IpcFileFormat::CountRows(
const std::shared_ptr& file, compute::Expression predicate,
std::shared_ptr options) {
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index d1c16a93cf4..3888de027c5 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -25,6 +25,7 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
+#include "arrow/io/type_fwd.h"
#include "arrow/ipc/type_fwd.h"
#include "arrow/result.h"
@@ -56,6 +57,10 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
const std::shared_ptr& options,
const std::shared_ptr& fragment) const override;
+ Result ScanBatchesAsync(
+ const std::shared_ptr& options,
+ const std::shared_ptr& file) const override;
+
Future> CountRows(
const std::shared_ptr& file, compute::Expression predicate,
std::shared_ptr options) override;
@@ -75,6 +80,9 @@ class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions {
/// Options passed to the IPC file reader.
/// included_fields, memory_pool, and use_threads are ignored.
std::shared_ptr options;
+ /// If present, the async scanner will enable I/O coalescing.
+ /// This is ignored by the sync scanner.
+ std::shared_ptr cache_options;
};
class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index a953c8f28a7..7d111183635 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -344,8 +344,8 @@ Result> BufferReader::DoReadAt(int64_t position, int64_t
DCHECK_GE(nbytes, 0);
// Arrange for data to be paged in
- RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
- {{const_cast(data_ + position), static_cast(nbytes)}}));
+ // RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
+ // {{const_cast(data_ + position), static_cast(nbytes)}}));
if (nbytes > 0 && buffer_ != nullptr) {
return SliceBuffer(buffer_, position, nbytes);
diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h
index 041b825c988..d8208d39d60 100644
--- a/cpp/src/arrow/io/type_fwd.h
+++ b/cpp/src/arrow/io/type_fwd.h
@@ -27,6 +27,7 @@ struct FileMode {
};
struct IOContext;
+struct CacheOptions;
/// EXPERIMENTAL: convenience global singleton for default IOContext settings
ARROW_EXPORT
diff --git a/cpp/src/arrow/ipc/read_write_benchmark.cc b/cpp/src/arrow/ipc/read_write_benchmark.cc
index bc032f451ac..f5cc857acb0 100644
--- a/cpp/src/arrow/ipc/read_write_benchmark.cc
+++ b/cpp/src/arrow/ipc/read_write_benchmark.cc
@@ -160,14 +160,14 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
}
#define GENERATE_COMPRESSED_DATA_IN_MEMORY() \
- constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */ \
+ constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
constexpr int64_t kBatches = 16; \
auto options = ipc::IpcWriteOptions::Defaults(); \
ASSIGN_OR_ABORT(options.codec, \
arrow::util::Codec::Create(arrow::Compression::type::ZSTD)); \
std::shared_ptr buffer = *AllocateResizableBuffer(1024); \
{ \
- auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); \
+ auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
io::BufferOutputStream stream(buffer); \
auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
for (int i = 0; i < kBatches; i++) { \
@@ -178,12 +178,12 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
}
#define GENERATE_DATA_IN_MEMORY() \
- constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */ \
+ constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
constexpr int64_t kBatches = 1; \
auto options = ipc::IpcWriteOptions::Defaults(); \
std::shared_ptr buffer = *AllocateResizableBuffer(1024); \
{ \
- auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); \
+ auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
io::BufferOutputStream stream(buffer); \
auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
@@ -192,12 +192,12 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
}
#define GENERATE_DATA_TEMP_FILE() \
- constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */ \
+ constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
constexpr int64_t kBatches = 16; \
auto options = ipc::IpcWriteOptions::Defaults(); \
ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
{ \
- auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); \
+ auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
ABORT_NOT_OK(writer->Close()); \
@@ -223,7 +223,7 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
auto batch = *reader->ReadRecordBatch(i); \
} \
} \
- state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches); \
+ state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
@@ -240,7 +240,7 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
auto batch = *generator().result(); \
} \
} \
- state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches); \
+ state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 7a75a48aaeb..9f8d69d2537 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -964,24 +964,6 @@ struct FileWriterHelper {
return Status::OK();
}
- virtual Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) {
- auto buf_reader = std::make_shared(buffer_);
- ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(
- buf_reader.get(), footer_offset_, options));
-
- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
- for (int i = 0; i < num_batches_written_; ++i) {
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk,
- reader->ReadRecordBatch(i));
- out_batches->push_back(chunk);
- }
- if (out_stats) {
- *out_stats = reader->stats();
- }
- return Status::OK();
- }
-
Status ReadSchema(std::shared_ptr* out) {
return ReadSchema(ipc::IpcReadOptions::Defaults(), out);
}
@@ -1019,10 +1001,11 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
{
auto fut =
RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options);
- RETURN_NOT_OK(fut.status());
- EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+ // Do NOT assert OK since some tests check whether this fails properly
+ EXPECT_FINISHES(fut);
+ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
- // Generator's lifetime is independent of the reader's
+ // Generator will keep reader alive internally
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
}
@@ -1032,8 +1015,7 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
futures.push_back(generator());
}
auto fut = generator();
- EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);
- EXPECT_EQ(nullptr, extra_read);
+ EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
for (auto& future : futures) {
EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
out_batches->push_back(batch);
@@ -1044,11 +1026,6 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
return Status::OK();
}
-
- Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) override {
- return ReadBatches(options, out_batches, out_stats);
- }
};
struct StreamWriterHelper {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index faf0acd3cfd..7c3115b7c3f 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -31,6 +31,7 @@
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/extension_type.h"
+#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/message.h"
@@ -1018,30 +1019,33 @@ class ARROW_EXPORT IpcFileRecordBatchGenerator {
public:
using Item = std::shared_ptr;
- explicit IpcFileRecordBatchGenerator(std::shared_ptr state,
- const io::IOContext& io_context,
- arrow::internal::Executor* executor)
+ explicit IpcFileRecordBatchGenerator(
+ std::shared_ptr state,
+ std::shared_ptr cached_source,
+ const io::IOContext& io_context, arrow::internal::Executor* executor)
: state_(std::move(state)),
+ cached_source_(std::move(cached_source)),
io_context_(io_context),
executor_(executor),
index_(0) {}
Future
- operator()();
+ Future> ReadBlock(const FileBlock& block);
- static Result> ReadDictionaries(
+ static Status ReadDictionaries(
RecordBatchFileReaderImpl* state,
std::vector> dictionary_messages);
-
static Result> ReadRecordBatch(
RecordBatchFileReaderImpl* state, Message* message);
private:
std::shared_ptr state_;
+ std::shared_ptr cached_source_;
io::IOContext io_context_;
arrow::internal::Executor* executor_;
int index_;
// Odd Future type, but this lets us use All() easily
- Future> read_dictionaries_;
+ Future<> read_dictionaries_;
};
class RecordBatchFileReaderImpl : public RecordBatchFileReader {
@@ -1130,14 +1134,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
footer_offset_ = footer_offset;
auto cpu_executor = ::arrow::internal::GetCpuThreadPool();
auto self = std::dynamic_pointer_cast(shared_from_this());
- return ReadFooterAsync(cpu_executor).Then([self, options](...) -> Status {
- // Get the schema and record any observed dictionaries
- RETURN_NOT_OK(UnpackSchemaMessage(
- self->footer_->schema(), options, &self->dictionary_memo_, &self->schema_,
- &self->out_schema_, &self->field_inclusion_mask_, &self->swap_endian_));
- ++self->stats_.num_messages;
- return Status::OK();
- });
+ return ReadFooterAsync(cpu_executor)
+ .Then([self, options](const detail::Empty&) -> Status {
+ // Get the schema and record any observed dictionaries
+ RETURN_NOT_OK(UnpackSchemaMessage(
+ self->footer_->schema(), options, &self->dictionary_memo_, &self->schema_,
+ &self->out_schema_, &self->field_inclusion_mask_, &self->swap_endian_));
+ ++self->stats_.num_messages;
+ return Status::OK();
+ });
}
std::shared_ptr schema() const override { return out_schema_; }
@@ -1147,9 +1152,32 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
ReadStats stats() const override { return stats_; }
Result>> GetRecordBatchGenerator(
- const io::IOContext& io_context, arrow::internal::Executor* executor) override {
+ const bool coalesce, const io::IOContext& io_context,
+ const io::CacheOptions cache_options,
+ arrow::internal::Executor* executor) override {
auto state = std::dynamic_pointer_cast(shared_from_this());
- return IpcFileRecordBatchGenerator(std::move(state), io_context, executor);
+ std::shared_ptr cached_source;
+ if (coalesce) {
+ if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file");
+ cached_source = std::make_shared(
+ owned_file_, io_context, cache_options);
+ auto num_dictionaries = this->num_dictionaries();
+ auto num_record_batches = this->num_record_batches();
+ std::vector ranges(num_dictionaries + num_record_batches);
+ for (int i = 0; i < num_dictionaries; i++) {
+ auto block = FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+ ranges[i].offset = block.offset;
+ ranges[i].length = block.metadata_length + block.body_length;
+ }
+ for (int i = 0; i < num_record_batches; i++) {
+ auto block = FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+ ranges[num_dictionaries + i].offset = block.offset;
+ ranges[num_dictionaries + i].length = block.metadata_length + block.body_length;
+ }
+ RETURN_NOT_OK(cached_source->Cache(std::move(ranges)));
+ }
+ return IpcFileRecordBatchGenerator(std::move(state), std::move(cached_source),
+ io_context, executor);
}
private:
@@ -1338,13 +1366,13 @@ Future IpcFileRecordBatchGenerator::operator(
std::vector>> messages(state->num_dictionaries());
for (int i = 0; i < state->num_dictionaries(); i++) {
auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
- messages[i] = ReadMessageFromBlockAsync(block, state->file_, io_context_);
+ messages[i] = ReadBlock(block);
}
auto read_messages = All(std::move(messages));
if (executor_) read_messages = executor_->Transfer(read_messages);
read_dictionaries_ = read_messages.Then(
[=](const std::vector>> maybe_messages)
- -> Result> {
+ -> Status {
std::vector> messages(state->num_dictionaries());
for (size_t i = 0; i < messages.size(); i++) {
ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
@@ -1356,28 +1384,51 @@ Future IpcFileRecordBatchGenerator::operator(
return Future
- ::MakeFinished(IterationTraits
- ::End());
}
auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
- auto read_message = ReadMessageFromBlockAsync(block, state->file_, io_context_);
- std::vector>> dependencies{read_dictionaries_,
- std::move(read_message)};
- auto read_messages = All(dependencies);
- if (executor_) read_messages = executor_->Transfer(read_messages);
- return read_messages.Then(
- [=](const std::vector>> maybe_messages)
- -> Result
- {
- RETURN_NOT_OK(maybe_messages[0]); // Make sure dictionaries were read
- ARROW_ASSIGN_OR_RAISE(auto message, maybe_messages[1]);
- return ReadRecordBatch(state.get(), message.get());
- });
+ auto read_message = ReadBlock(block);
+ auto read_messages = read_dictionaries_.Then(
+ [read_message](const detail::Empty&) { return read_message; });
+ // Force transfer. This may be wasteful in some cases, but ensures we get off the
+ // I/O threads as soon as possible, and ensures we don't decode record batches
+ // synchronously in the case that the message read has already finished.
+ if (executor_) {
+ auto executor = executor_;
+ return read_messages.Then(
+ [=](const std::shared_ptr& message) -> Future
- {
+ return DeferNotOk(executor->Submit(
+ [=]() { return ReadRecordBatch(state.get(), message.get()); }));
+ });
+ }
+ return read_messages.Then([=](const std::shared_ptr& message) -> Result
- {
+ return ReadRecordBatch(state.get(), message.get());
+ });
+}
+
+Future> IpcFileRecordBatchGenerator::ReadBlock(
+ const FileBlock& block) {
+ if (cached_source_) {
+ auto cached_source = cached_source_;
+ io::ReadRange range{block.offset, block.metadata_length + block.body_length};
+ auto pool = state_->options_.memory_pool;
+ return cached_source->WaitFor({range}).Then(
+ [cached_source, pool,
+ range](const detail::Empty&) -> Result> {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, cached_source->Read(range));
+ io::BufferReader stream(std::move(buffer));
+ return ReadMessage(&stream, pool);
+ });
+ } else {
+ return ReadMessageFromBlockAsync(block, state_->file_, io_context_);
+ }
}
-Result> IpcFileRecordBatchGenerator::ReadDictionaries(
+Status IpcFileRecordBatchGenerator::ReadDictionaries(
RecordBatchFileReaderImpl* state,
std::vector> dictionary_messages) {
IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
for (const auto& message : dictionary_messages) {
RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
}
- return nullptr;
+ return Status::OK();
}
Result> IpcFileRecordBatchGenerator::ReadRecordBatch(
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index df19f58f217..6f2157557f3 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -25,6 +25,7 @@
#include
#include
+#include "arrow/io/caching.h"
#include "arrow/io/type_fwd.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/options.h"
@@ -197,13 +198,17 @@ class ARROW_EXPORT RecordBatchFileReader
/// \brief Get a reentrant generator of record batches.
///
+ /// \param[in] coalesce If true, enable I/O coalescing (tuned by cache_options).
/// \param[in] io_context The IOContext to use (controls which thread pool
/// is used for I/O).
+ /// \param[in] cache_options I/O coalescing options (used only if coalesce is true).
/// \param[in] executor Optionally, an executor to use for decoding record
/// batches. This is generally only a benefit for very wide and/or
/// compressed batches.
virtual Result>> GetRecordBatchGenerator(
+ const bool coalesce = false,
const io::IOContext& io_context = io::default_io_context(),
+ const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(),
arrow::internal::Executor* executor = NULLPTR) = 0;
};
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 80a05f80c69..c80e8f6f680 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -19,9 +19,9 @@
#include // IWYU pragma: keep
#include
-#include // IWYU pragma: keep
-#include // IWYU pragma: keep
-#include // IWYU pragma: keep
+#include // IWYU pragma: keep
+#include // IWYU pragma: keep
+#include // IWYU pragma: keep
#include
#include
@@ -507,12 +507,12 @@ static struct GlobalState {
std::atomic finalizing{false}; // constructed first, destroyed last
SystemMemoryPool system_pool;
- #ifdef ARROW_JEMALLOC
+#ifdef ARROW_JEMALLOC
JemallocMemoryPool jemalloc_pool;
- #endif
- #ifdef ARROW_MIMALLOC
+#endif
+#ifdef ARROW_MIMALLOC
MimallocMemoryPool mimalloc_pool;
- #endif
+#endif
} global_state;
MemoryPool* system_memory_pool() { return &global_state.system_pool; }
diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h
index 0a20b5f4d57..190e5839bbf 100644
--- a/cpp/src/arrow/testing/future_util.h
+++ b/cpp/src/arrow/testing/future_util.h
@@ -81,10 +81,21 @@
handle_error(future_name.status()); \
EXPECT_OK_AND_ASSIGN(lhs, future_name.result());
+#define EXPECT_FINISHES(expr) /* statement form of EXPECT_FINISHES_IMPL(expr) */ \
+ do { \
+ EXPECT_FINISHES_IMPL(expr); \
+ } while (0)
+
#define EXPECT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \
ON_FINISH_ASSIGN_OR_HANDLE_ERROR_IMPL( \
ARROW_EXPECT_OK, ARROW_ASSIGN_OR_RAISE_NAME(_fut, __COUNTER__), lhs, rexpr);
+#define EXPECT_FINISHES_OK_AND_EQ(expected, expr) /* result of `expr` must equal `expected` */ \
+ do { \
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto _actual, (expr)); /* binds the future's result */ \
+ EXPECT_EQ(expected, _actual); \
+ } while (0)
+
namespace arrow {
template