diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index 8275542c542..b1b2945d0f5 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -21,7 +21,6 @@ #include #include -#include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/bit_util.h" @@ -171,112 +170,6 @@ MutableBuffer::MutableBuffer(const std::shared_ptr& parent, const int64_ parent_ = parent; } -// ----------------------------------------------------------------------- -// Pool buffer and allocation - -/// A Buffer whose lifetime is tied to a particular MemoryPool -class PoolBuffer final : public ResizableBuffer { - public: - explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool) - : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {} - - ~PoolBuffer() override { - uint8_t* ptr = mutable_data(); - if (ptr) { - pool_->Free(ptr, capacity_); - } - } - - Status Reserve(const int64_t capacity) override { - if (capacity < 0) { - return Status::Invalid("Negative buffer capacity: ", capacity); - } - uint8_t* ptr = mutable_data(); - if (!ptr || capacity > capacity_) { - int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity); - if (ptr) { - RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); - } else { - RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr)); - } - data_ = ptr; - capacity_ = new_capacity; - } - return Status::OK(); - } - - Status Resize(const int64_t new_size, bool shrink_to_fit = true) override { - if (ARROW_PREDICT_FALSE(new_size < 0)) { - return Status::Invalid("Negative buffer resize: ", new_size); - } - uint8_t* ptr = mutable_data(); - if (ptr && shrink_to_fit && new_size <= size_) { - // Buffer is non-null and is not growing, so shrink to the requested size without - // excess space. - int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(new_size); - if (capacity_ != new_capacity) { - // Buffer hasn't got yet the requested size. - RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); - data_ = ptr; - capacity_ = new_capacity; - } - } else { - RETURN_NOT_OK(Reserve(new_size)); - } - size_ = new_size; - - return Status::OK(); - } - - static std::shared_ptr MakeShared(MemoryPool* pool) { - std::shared_ptr mm; - if (pool == nullptr) { - pool = default_memory_pool(); - mm = default_cpu_memory_manager(); - } else { - mm = CPUDevice::memory_manager(pool); - } - return std::make_shared(std::move(mm), pool); - } - - static std::unique_ptr MakeUnique(MemoryPool* pool) { - std::shared_ptr mm; - if (pool == nullptr) { - pool = default_memory_pool(); - mm = default_cpu_memory_manager(); - } else { - mm = CPUDevice::memory_manager(pool); - } - return std::unique_ptr(new PoolBuffer(std::move(mm), pool)); - } - - private: - MemoryPool* pool_; -}; - -namespace { -// A utility that does most of the work of the `AllocateBuffer` and -// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to -// a PoolBuffer. -template -inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) { - RETURN_NOT_OK(buffer->Resize(size)); - buffer->ZeroPadding(); - return std::move(buffer); -} - -} // namespace - -Result> AllocateBuffer(const int64_t size, MemoryPool* pool) { - return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size); -} - -Result> AllocateResizableBuffer(const int64_t size, - MemoryPool* pool) { - return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), - size); -} - Result> AllocateBitmap(int64_t length, MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateBuffer(BitUtil::BytesForBits(length), pool)); // Zero out any trailing bits diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index a60e31bf7d2..a8863ee0775 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -59,6 +59,21 @@ static inline Result> OpenReader( return reader; } +static inline Future> OpenReaderAsync( + const FileSource& source, + const ipc::IpcReadOptions& options = default_read_options()) { + ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + auto path = source.path(); + return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options) + .Then([](const std::shared_ptr& reader) + -> Result> { return reader; }, + [path](const Status& status) + -> Result> { + return status.WithMessage("Could not open IPC input source '", path, + "': ", status.message()); + }); +} + static inline Result> GetIncludedFields( const Schema& schema, const std::vector& materialized_fields) { std::vector included_fields; @@ -73,6 +88,26 @@ static inline Result> GetIncludedFields( return included_fields; } +static inline Result GetReadOptions( + const Schema& schema, const FileFormat& format, const ScanOptions& scan_options) { + ARROW_ASSIGN_OR_RAISE( + auto ipc_scan_options, + GetFragmentScanOptions( + kIpcTypeName, &scan_options, format.default_fragment_scan_options)); + auto options = + ipc_scan_options->options ? *ipc_scan_options->options : default_read_options(); + options.memory_pool = scan_options.pool; + if (!options.included_fields.empty()) { + // Cannot set them here + ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set " + "but will be ignored; included_fields are derived from " + "fields referenced by the scan"; + } + ARROW_ASSIGN_OR_RAISE(options.included_fields, + GetIncludedFields(schema, scan_options.MaterializedFields())); + return options; +} + /// \brief A ScanTask backed by an Ipc file. class IpcScanTask : public ScanTask { public: @@ -83,28 +118,11 @@ class IpcScanTask : public ScanTask { Result Execute() override { struct Impl { static Result Make(const FileSource& source, - FileFormat* format, - const ScanOptions* scan_options) { + const FileFormat& format, + const ScanOptions& scan_options) { ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); - - ARROW_ASSIGN_OR_RAISE( - auto ipc_scan_options, - GetFragmentScanOptions( - kIpcTypeName, scan_options, format->default_fragment_scan_options)); - auto options = ipc_scan_options->options ? *ipc_scan_options->options - : default_read_options(); - options.memory_pool = scan_options->pool; - options.use_threads = false; - if (!options.included_fields.empty()) { - // Cannot set them here - ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set " - "but will be ignored; included_fields are derived from " - "fields referenced by the scan"; - } - ARROW_ASSIGN_OR_RAISE( - options.included_fields, - GetIncludedFields(*reader->schema(), scan_options->MaterializedFields())); - + ARROW_ASSIGN_OR_RAISE(auto options, + GetReadOptions(*reader->schema(), format, scan_options)); ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options)); return RecordBatchIterator(Impl{std::move(reader), 0}); } @@ -121,9 +139,9 @@ class IpcScanTask : public ScanTask { int i_; }; - return Impl::Make( - source_, internal::checked_pointer_cast(fragment_)->format().get(), - options_.get()); + return Impl::Make(source_, + *internal::checked_pointer_cast(fragment_)->format(), + *options_); } private: @@ -173,6 +191,44 @@ Result IpcFileFormat::ScanFile( return IpcScanTaskIterator::Make(options, fragment); } +Result IpcFileFormat::ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& file) const { + auto self = shared_from_this(); + auto source = file->source(); + auto open_reader = OpenReaderAsync(source); + auto reopen_reader = [self, options, + source](std::shared_ptr reader) + -> Future> { + ARROW_ASSIGN_OR_RAISE(auto options, + GetReadOptions(*reader->schema(), *self, *options)); + return OpenReader(source, options); + }; + auto readahead_level = options->batch_readahead; + auto default_fragment_scan_options = this->default_fragment_scan_options; + auto open_generator = [=](const std::shared_ptr& reader) + -> Result { + ARROW_ASSIGN_OR_RAISE( + auto ipc_scan_options, + GetFragmentScanOptions(kIpcTypeName, options.get(), + default_fragment_scan_options)); + + RecordBatchGenerator generator; + if (ipc_scan_options->cache_options) { + // Transferring helps performance when coalescing + ARROW_ASSIGN_OR_RAISE( + generator, reader->GetRecordBatchGenerator( + /*coalesce=*/true, options->io_context, + *ipc_scan_options->cache_options, internal::GetCpuThreadPool())); + } else { + ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( + /*coalesce=*/false, options->io_context)); + } + return MakeReadaheadGenerator(std::move(generator), readahead_level); + }; + return MakeFromFuture(open_reader.Then(reopen_reader).Then(open_generator)); +} + Future> IpcFileFormat::CountRows( const std::shared_ptr& file, compute::Expression predicate, std::shared_ptr options) { diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index d1c16a93cf4..3888de027c5 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -25,6 +25,7 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/io/type_fwd.h" #include "arrow/ipc/type_fwd.h" #include "arrow/result.h" @@ -56,6 +57,10 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { const std::shared_ptr& options, const std::shared_ptr& fragment) const override; + Result ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& file) const override; + Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, std::shared_ptr options) override; @@ -75,6 +80,9 @@ class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions { /// Options passed to the IPC file reader. /// included_fields, memory_pool, and use_threads are ignored. std::shared_ptr options; + /// If present, the async scanner will enable I/O coalescing. + /// This is ignored by the sync scanner. + std::shared_ptr cache_options; }; class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions { diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index a953c8f28a7..7d111183635 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -344,8 +344,8 @@ Result> BufferReader::DoReadAt(int64_t position, int64_t DCHECK_GE(nbytes, 0); // Arrange for data to be paged in - RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed( - {{const_cast(data_ + position), static_cast(nbytes)}})); + // RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed( + // {{const_cast(data_ + position), static_cast(nbytes)}})); if (nbytes > 0 && buffer_ != nullptr) { return SliceBuffer(buffer_, position, nbytes); diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h index 041b825c988..d8208d39d60 100644 --- a/cpp/src/arrow/io/type_fwd.h +++ b/cpp/src/arrow/io/type_fwd.h @@ -27,6 +27,7 @@ struct FileMode { }; struct IOContext; +struct CacheOptions; /// EXPERIMENTAL: convenience global singleton for default IOContext settings ARROW_EXPORT diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 6db8a0f0d3d..197556efcea 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -33,6 +33,7 @@ #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/util/endian.h" +#include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" @@ -324,6 +325,60 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le } } +Future> ReadMessageAsync(int64_t offset, int32_t metadata_length, + int64_t body_length, + io::RandomAccessFile* file, + const io::IOContext& context) { + struct State { + std::unique_ptr result; + std::shared_ptr listener; + std::shared_ptr decoder; + }; + auto state = std::make_shared(); + state->listener = std::make_shared(&state->result); + state->decoder = std::make_shared(state->listener); + + if (metadata_length < state->decoder->next_required_size()) { + return Status::Invalid("metadata_length should be at least ", + state->decoder->next_required_size()); + } + return file->ReadAsync(context, offset, metadata_length + body_length) + .Then([=](std::shared_ptr metadata) -> Result> { + if (metadata->size() < metadata_length) { + return Status::Invalid("Expected to read ", metadata_length, + " metadata bytes but got ", metadata->size()); + } + ARROW_RETURN_NOT_OK( + state->decoder->Consume(SliceBuffer(metadata, 0, metadata_length))); + switch (state->decoder->state()) { + case MessageDecoder::State::INITIAL: + return std::move(state->result); + case MessageDecoder::State::METADATA_LENGTH: + return Status::Invalid("metadata length is missing. File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::METADATA: + return Status::Invalid("flatbuffer size ", + state->decoder->next_required_size(), + " invalid. File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::BODY: { + auto body = SliceBuffer(metadata, metadata_length, body_length); + if (body->size() < state->decoder->next_required_size()) { + return Status::IOError("Expected to be able to read ", + state->decoder->next_required_size(), + " bytes for message body, got ", body->size()); + } + RETURN_NOT_OK(state->decoder->Consume(body)); + return std::move(state->result); + } + case MessageDecoder::State::EOS: + return Status::Invalid("Unexpected empty message in IPC file format"); + default: + return Status::Invalid("Unexpected state: ", state->decoder->state()); + } + }); +} + Status AlignStream(io::InputStream* stream, int32_t alignment) { ARROW_ASSIGN_OR_RAISE(int64_t position, stream->Tell()); return stream->Advance(PaddedLength(position, alignment) - position); diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 6a7619d31b3..b2683259cb4 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -459,6 +459,11 @@ Result> ReadMessage(const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file); +ARROW_EXPORT +Future> ReadMessageAsync( + const int64_t offset, const int32_t metadata_length, const int64_t body_length, + io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context()); + /// \brief Advance stream to an 8-byte offset if its position is not a multiple /// of 8 already /// \param[in] stream an input stream diff --git a/cpp/src/arrow/ipc/read_write_benchmark.cc b/cpp/src/arrow/ipc/read_write_benchmark.cc index a56dd3579e2..f5cc857acb0 100644 --- a/cpp/src/arrow/ipc/read_write_benchmark.cc +++ b/cpp/src/arrow/ipc/read_write_benchmark.cc @@ -21,6 +21,7 @@ #include #include +#include "arrow/io/file.h" #include "arrow/io/memory.h" #include "arrow/ipc/api.h" #include "arrow/record_batch.h" @@ -90,36 +91,6 @@ static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const refe state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize); } -static void ReadFile(benchmark::State& state) { // NOLINT non-const reference - // 1MB - constexpr int64_t kTotalSize = 1 << 20; - auto options = ipc::IpcWriteOptions::Defaults(); - - std::shared_ptr buffer = *AllocateResizableBuffer(1024); - { - // Make Arrow IPC file - auto record_batch = MakeRecordBatch(kTotalSize, state.range(0)); - - io::BufferOutputStream stream(buffer); - auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); - ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); - ABORT_NOT_OK(writer->Close()); - ABORT_NOT_OK(stream.Close()); - } - - ipc::DictionaryMemo empty_memo; - while (state.KeepRunning()) { - io::BufferReader input(buffer); - auto reader = - *ipc::RecordBatchFileReader::Open(&input, ipc::IpcReadOptions::Defaults()); - const int num_batches = reader->num_record_batches(); - for (int i = 0; i < num_batches; ++i) { - auto batch = *reader->ReadRecordBatch(i); - } - } - state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize); -} - static void ReadStream(benchmark::State& state) { // NOLINT non-const reference // 1MB constexpr int64_t kTotalSize = 1 << 20; @@ -188,9 +159,103 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize); } +#define GENERATE_COMPRESSED_DATA_IN_MEMORY() \ + constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \ + constexpr int64_t kBatches = 16; \ + auto options = ipc::IpcWriteOptions::Defaults(); \ + ASSIGN_OR_ABORT(options.codec, \ + arrow::util::Codec::Create(arrow::Compression::type::ZSTD)); \ + std::shared_ptr buffer = *AllocateResizableBuffer(1024); \ + { \ + auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \ + io::BufferOutputStream stream(buffer); \ + auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \ + for (int i = 0; i < kBatches; i++) { \ + ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + } \ + ABORT_NOT_OK(writer->Close()); \ + ABORT_NOT_OK(stream.Close()); \ + } + +#define GENERATE_DATA_IN_MEMORY() \ + constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \ + constexpr int64_t kBatches = 1; \ + auto options = ipc::IpcWriteOptions::Defaults(); \ + std::shared_ptr buffer = *AllocateResizableBuffer(1024); \ + { \ + auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \ + io::BufferOutputStream stream(buffer); \ + auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options); \ + ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + ABORT_NOT_OK(writer->Close()); \ + ABORT_NOT_OK(stream.Close()); \ + } + +#define GENERATE_DATA_TEMP_FILE() \ + constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \ + constexpr int64_t kBatches = 16; \ + auto options = ipc::IpcWriteOptions::Defaults(); \ + ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \ + { \ + auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \ + auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \ + ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \ + ABORT_NOT_OK(writer->Close()); \ + ABORT_NOT_OK(sink->Close()); \ + } + +#define READ_DATA_IN_MEMORY() auto input = std::make_shared(buffer); +#define READ_DATA_TEMP_FILE() \ + ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow")); +#define READ_DATA_MMAP_FILE() \ + ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \ + io::FileMode::type::READ)); + +#define READ_SYNC(NAME, GENERATE, READ) \ + static void NAME(benchmark::State& state) { \ + GENERATE(); \ + for (auto _ : state) { \ + READ(); \ + auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \ + ipc::IpcReadOptions::Defaults()); \ + const int num_batches = reader->num_record_batches(); \ + for (int i = 0; i < num_batches; ++i) { \ + auto batch = *reader->ReadRecordBatch(i); \ + } \ + } \ + state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \ + } \ + BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); + +#define READ_ASYNC(NAME, GENERATE, READ) \ + static void NAME##Async(benchmark::State& state) { \ + GENERATE(); \ + for (auto _ : state) { \ + READ(); \ + auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \ + ipc::IpcReadOptions::Defaults()); \ + ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \ + const int num_batches = reader->num_record_batches(); \ + for (int i = 0; i < num_batches; ++i) { \ + auto batch = *generator().result(); \ + } \ + } \ + state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \ + } \ + BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); + +#define READ_BENCHMARK(NAME, GENERATE, READ) \ + READ_SYNC(NAME, GENERATE, READ); \ + READ_ASYNC(NAME, GENERATE, READ); + +READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY); +READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE); +READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE); +READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY, + READ_DATA_IN_MEMORY); + BENCHMARK(WriteRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); BENCHMARK(ReadRecordBatch)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); -BENCHMARK(ReadFile)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); BENCHMARK(ReadStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); BENCHMARK(DecodeStream)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime(); diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 2efa79de8e0..9f8d69d2537 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -40,6 +40,7 @@ #include "arrow/status.h" #include "arrow/table.h" #include "arrow/testing/extension_type.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/testing/util.h" @@ -963,24 +964,6 @@ struct FileWriterHelper { return Status::OK(); } - virtual Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches, - ReadStats* out_stats = nullptr) { - auto buf_reader = std::make_shared(buffer_); - ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open( - buf_reader.get(), footer_offset_, options)); - - EXPECT_EQ(num_batches_written_, reader->num_record_batches()); - for (int i = 0; i < num_batches_written_; ++i) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, - reader->ReadRecordBatch(i)); - out_batches->push_back(chunk); - } - if (out_stats) { - *out_stats = reader->stats(); - } - return Status::OK(); - } - Status ReadSchema(std::shared_ptr* out) { return ReadSchema(ipc::IpcReadOptions::Defaults(), out); } @@ -1009,6 +992,42 @@ struct FileWriterHelper { int64_t footer_offset_; }; +struct FileGeneratorWriterHelper : public FileWriterHelper { + Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches, + ReadStats* out_stats = nullptr) override { + auto buf_reader = std::make_shared(buffer_); + AsyncGenerator> generator; + + { + auto fut = + RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options); + // Do NOT assert OK since some tests check whether this fails properly + EXPECT_FINISHES(fut); + ARROW_ASSIGN_OR_RAISE(auto reader, fut.result()); + EXPECT_EQ(num_batches_written_, reader->num_record_batches()); + // Generator will keep reader alive internally + ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator()); + } + + // Generator is async-reentrant + std::vector>> futures; + for (int i = 0; i < num_batches_written_; ++i) { + futures.push_back(generator()); + } + auto fut = generator(); + EXPECT_FINISHES_OK_AND_EQ(nullptr, fut); + for (auto& future : futures) { + EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future); + out_batches->push_back(batch); + } + + // The generator doesn't track stats. + EXPECT_EQ(nullptr, out_stats); + + return Status::OK(); + } +}; + struct StreamWriterHelper { static constexpr bool kIsFileFormat = false; @@ -1342,6 +1361,9 @@ class ReaderWriterMixin : public ExtensionTypesMixin { class TestFileFormat : public ReaderWriterMixin, public ::testing::TestWithParam {}; +class TestFileFormatGenerator : public ReaderWriterMixin, + public ::testing::TestWithParam {}; + class TestStreamFormat : public ReaderWriterMixin, public ::testing::TestWithParam {}; @@ -1366,6 +1388,16 @@ TEST_P(TestFileFormat, RoundTrip) { TestZeroLengthRoundTrip(*GetParam(), options); } +TEST_P(TestFileFormatGenerator, RoundTrip) { + TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); + TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); + + IpcWriteOptions options; + options.write_legacy_ipc_format = true; + TestRoundTrip(*GetParam(), options); + TestZeroLengthRoundTrip(*GetParam(), options); +} + Status MakeDictionaryBatch(std::shared_ptr* out) { auto f0_type = arrow::dictionary(int32(), utf8()); auto f1_type = arrow::dictionary(int8(), utf8()); @@ -1571,6 +1603,8 @@ INSTANTIATE_TEST_SUITE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat, ::testing::ValuesIn(kBatchCases)); +INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator, + ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat, ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(StreamDecoderDataRoundTripTests, TestStreamDecoderData, @@ -1635,18 +1669,26 @@ TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } +TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } + TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); } TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); } +TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); } + TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); } TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); } +TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); } + TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); } TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); } +TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); } + TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) { // ARROW-6006 auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8())); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 45a3d3e3cd8..7c3115b7c3f 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -31,6 +31,7 @@ #include "arrow/array.h" #include "arrow/buffer.h" #include "arrow/extension_type.h" +#include "arrow/io/caching.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/message.h" @@ -51,6 +52,7 @@ #include "arrow/util/logging.h" #include "arrow/util/parallel.h" #include "arrow/util/string.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/ubsan.h" #include "arrow/visitor_inline.h" @@ -958,10 +960,94 @@ Result> RecordBatchStreamReader::Open( // ---------------------------------------------------------------------- // Reader implementation +// Common functions used in both the random-access file reader and the +// asynchronous generator static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } +static Result> ReadMessageFromBlock(const FileBlock& block, + io::RandomAccessFile* file) { + if (!BitUtil::IsMultipleOf8(block.offset) || + !BitUtil::IsMultipleOf8(block.metadata_length) || + !BitUtil::IsMultipleOf8(block.body_length)) { + return Status::Invalid("Unaligned block in IPC file"); + } + + // TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ((*out)->body_length(), block.body_length); + + ARROW_ASSIGN_OR_RAISE(auto message, + ReadMessage(block.offset, block.metadata_length, file)); + return std::move(message); +} + +static Future> ReadMessageFromBlockAsync( + const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) { + if (!BitUtil::IsMultipleOf8(block.offset) || + !BitUtil::IsMultipleOf8(block.metadata_length) || + !BitUtil::IsMultipleOf8(block.body_length)) { + return Status::Invalid("Unaligned block in IPC file"); + } + + // TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ((*out)->body_length(), block.body_length); + + return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file, + io_context); +} + +static Status ReadOneDictionary(Message* message, const IpcReadContext& context) { + CHECK_HAS_BODY(*message); + ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + DictionaryKind kind; + RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get())); + if (kind != DictionaryKind::New) { + return Status::Invalid( + "Unsupported dictionary replacement or " + "dictionary delta in IPC file"); + } + return Status::OK(); +} + +class RecordBatchFileReaderImpl; + +/// A generator of record batches. +/// +/// All batches are yielded in order. +class ARROW_EXPORT IpcFileRecordBatchGenerator { + public: + using Item = std::shared_ptr; + + explicit IpcFileRecordBatchGenerator( + std::shared_ptr state, + std::shared_ptr cached_source, + const io::IOContext& io_context, arrow::internal::Executor* executor) + : state_(std::move(state)), + cached_source_(std::move(cached_source)), + io_context_(io_context), + executor_(executor), + index_(0) {} + + Future operator()(); + Future> ReadBlock(const FileBlock& block); + + static Status ReadDictionaries( + RecordBatchFileReaderImpl* state, + std::vector> dictionary_messages); + static Result> ReadRecordBatch( + RecordBatchFileReaderImpl* state, Message* message); + + private: + std::shared_ptr state_; + std::shared_ptr cached_source_; + io::IOContext io_context_; + arrow::internal::Executor* executor_; + int index_; + // Odd Future type, but this lets us use All() easily + Future<> read_dictionaries_; +}; + class RecordBatchFileReaderImpl : public RecordBatchFileReader { public: RecordBatchFileReaderImpl() : file_(NULLPTR), footer_offset_(0), footer_(NULLPTR) {} @@ -1035,13 +1121,70 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::OK(); } + Future<> OpenAsync(const std::shared_ptr& file, + int64_t footer_offset, const IpcReadOptions& options) { + owned_file_ = file; + return OpenAsync(file.get(), footer_offset, options); + } + + Future<> OpenAsync(io::RandomAccessFile* file, int64_t footer_offset, + const IpcReadOptions& options) { + file_ = file; + options_ = options; + footer_offset_ = footer_offset; + auto cpu_executor = ::arrow::internal::GetCpuThreadPool(); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return ReadFooterAsync(cpu_executor) + .Then([self, options](const detail::Empty&) -> Status { + // Get the schema and record any observed dictionaries + RETURN_NOT_OK(UnpackSchemaMessage( + self->footer_->schema(), options, &self->dictionary_memo_, &self->schema_, + &self->out_schema_, &self->field_inclusion_mask_, &self->swap_endian_)); + ++self->stats_.num_messages; + return Status::OK(); + }); + } + std::shared_ptr schema() const override { return out_schema_; } std::shared_ptr metadata() const override { return metadata_; } ReadStats stats() const override { return stats_; } + Result>> GetRecordBatchGenerator( + const bool coalesce, const io::IOContext& io_context, + const io::CacheOptions cache_options, + arrow::internal::Executor* executor) override { + auto state = std::dynamic_pointer_cast(shared_from_this()); + std::shared_ptr cached_source; + if (coalesce) { + if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file"); + cached_source = std::make_shared( + owned_file_, io_context, cache_options); + auto num_dictionaries = this->num_dictionaries(); + auto num_record_batches = this->num_record_batches(); + std::vector ranges(num_dictionaries + num_record_batches); + for (int i = 0; i < num_dictionaries; i++) { + auto block = FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + ranges[i].offset = block.offset; + ranges[i].length = block.metadata_length + block.body_length; + } + for (int i = 0; i < num_record_batches; i++) { + auto block = FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + ranges[num_dictionaries + i].offset = block.offset; + ranges[num_dictionaries + i].length = block.metadata_length + block.body_length; + } + RETURN_NOT_OK(cached_source->Cache(std::move(ranges))); + } + return IpcFileRecordBatchGenerator(std::move(state), std::move(cached_source), + io_context, executor); + } + private: + friend AsyncGenerator> MakeMessageGenerator( + std::shared_ptr, const io::IOContext&); + friend class IpcFileRecordBatchGenerator; + FileBlock GetRecordBatchBlock(int i) const { return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); } @@ -1051,42 +1194,28 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } Result> ReadMessageFromBlock(const FileBlock& block) { - if (!BitUtil::IsMultipleOf8(block.offset) || - !BitUtil::IsMultipleOf8(block.metadata_length) || - !BitUtil::IsMultipleOf8(block.body_length)) { - return Status::Invalid("Unaligned block in IPC file"); - } - - // TODO(wesm): this breaks integration tests, see ARROW-3256 - // DCHECK_EQ((*out)->body_length(), block.body_length); - - ARROW_ASSIGN_OR_RAISE(auto message, - ReadMessage(block.offset, block.metadata_length, file_)); + ARROW_ASSIGN_OR_RAISE(auto message, arrow::ipc::ReadMessageFromBlock(block, file_)); ++stats_.num_messages; return std::move(message); } Status ReadDictionaries() { // Read all the dictionaries + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); for (int i = 0; i < num_dictionaries(); ++i) { ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i))); - - CHECK_HAS_BODY(*message); - ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); - DictionaryKind kind; - IpcReadContext context(&dictionary_memo_, options_, swap_endian_); - RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind, reader.get())); + RETURN_NOT_OK(ReadOneDictionary(message.get(), context)); ++stats_.num_dictionary_batches; - if (kind != DictionaryKind::New) { - return Status::Invalid( - "Unsupported dictionary replacement or " - "dictionary delta in IPC file"); - } } return Status::OK(); } Status ReadFooter() { + auto fut = ReadFooterAsync(/*executor=*/nullptr); + return fut.status(); + } + + Future<> ReadFooterAsync(arrow::internal::Executor* executor) { const int32_t magic_size = static_cast(strlen(kArrowMagicBytes)); if (footer_offset_ <= magic_size * 2 + 4) { @@ -1094,45 +1223,53 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } int file_end_size = static_cast(magic_size + sizeof(int32_t)); - ARROW_ASSIGN_OR_RAISE(auto buffer, - file_->ReadAt(footer_offset_ - file_end_size, file_end_size)); - - const int64_t expected_footer_size = magic_size + sizeof(int32_t); - if (buffer->size() < expected_footer_size) { - return Status::Invalid("Unable to read ", expected_footer_size, "from end of file"); - } - - if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { - return Status::Invalid("Not an Arrow file"); - } - - int32_t footer_length = - BitUtil::FromLittleEndian(*reinterpret_cast(buffer->data())); - - if (footer_length <= 0 || footer_length > footer_offset_ - magic_size * 2 - 4) { - return Status::Invalid("File is smaller than indicated metadata size"); - } - - // Now read the footer - ARROW_ASSIGN_OR_RAISE( - footer_buffer_, - file_->ReadAt(footer_offset_ - footer_length - file_end_size, footer_length)); - - const auto data = footer_buffer_->data(); - const auto size = footer_buffer_->size(); - if (!internal::VerifyFlatbuffers(data, size)) { - return Status::IOError("Verification of flatbuffer-encoded Footer failed."); - } - footer_ = flatbuf::GetFooter(data); - - auto fb_metadata = footer_->custom_metadata(); - if (fb_metadata != nullptr) { - std::shared_ptr md; - RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md)); - metadata_ = std::move(md); // const-ify - } - - return Status::OK(); + auto self = std::dynamic_pointer_cast(shared_from_this()); + auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size); + if (executor) read_magic = executor->Transfer(std::move(read_magic)); + return read_magic + .Then([=](const std::shared_ptr& buffer) + -> Future> { + const int64_t expected_footer_size = magic_size + sizeof(int32_t); + if (buffer->size() < expected_footer_size) { + return Status::Invalid("Unable to read ", expected_footer_size, + "from end of file"); + } + + if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { + return Status::Invalid("Not an Arrow file"); + } + + int32_t footer_length = BitUtil::FromLittleEndian( + *reinterpret_cast(buffer->data())); + + if (footer_length <= 0 || + footer_length > self->footer_offset_ - magic_size * 2 - 4) { + return Status::Invalid("File is smaller than indicated metadata size"); + } + + // Now read the footer + auto read_footer = self->file_->ReadAsync( + self->footer_offset_ - footer_length - file_end_size, footer_length); + if (executor) read_footer = executor->Transfer(std::move(read_footer)); + return read_footer; + }) + .Then([=](const std::shared_ptr& buffer) -> Status { + self->footer_buffer_ = buffer; + const auto data = self->footer_buffer_->data(); + const auto size = self->footer_buffer_->size(); + if (!internal::VerifyFlatbuffers(data, size)) { + return Status::IOError("Verification of flatbuffer-encoded Footer failed."); + } + self->footer_ = flatbuf::GetFooter(data); + + auto fb_metadata = self->footer_->custom_metadata(); + if (fb_metadata != nullptr) { + std::shared_ptr md; + RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md)); + self->metadata_ = std::move(md); // const-ify + } + return Status::OK(); + }); } int num_dictionaries() const { @@ -1194,6 +1331,115 @@ Result> RecordBatchFileReader::Open( return result; } +Future> RecordBatchFileReader::OpenAsync( + const std::shared_ptr& file, const IpcReadOptions& options) { + ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize()); + return OpenAsync(std::move(file), footer_offset, options); +} + +Future> RecordBatchFileReader::OpenAsync( + io::RandomAccessFile* file, const IpcReadOptions& options) { + ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize()); + return OpenAsync(file, footer_offset, options); +} + +Future> RecordBatchFileReader::OpenAsync( + const std::shared_ptr& file, int64_t footer_offset, + const IpcReadOptions& options) { + auto result = std::make_shared(); + return result->OpenAsync(file, footer_offset, options) + .Then( + [=](...) -> Result> { return result; }); +} + +Future> RecordBatchFileReader::OpenAsync( + io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options) { + auto result = std::make_shared(); + return result->OpenAsync(file, footer_offset, options) + .Then( + [=](...) -> Result> { return result; }); +} + +Future IpcFileRecordBatchGenerator::operator()() { + auto state = state_; + if (!read_dictionaries_.is_valid()) { + std::vector>> messages(state->num_dictionaries()); + for (int i = 0; i < state->num_dictionaries(); i++) { + auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i)); + messages[i] = ReadBlock(block); + } + auto read_messages = All(std::move(messages)); + if (executor_) read_messages = executor_->Transfer(read_messages); + read_dictionaries_ = read_messages.Then( + [=](const std::vector>> maybe_messages) + -> Status { + std::vector> messages(state->num_dictionaries()); + for (size_t i = 0; i < messages.size(); i++) { + ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]); + } + return ReadDictionaries(state.get(), std::move(messages)); + }); + } + if (index_ >= state_->num_record_batches()) { + return Future::MakeFinished(IterationTraits::End()); + } + auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++)); + auto read_message = ReadBlock(block); + auto read_messages = read_dictionaries_.Then( + [read_message](const detail::Empty&) { return read_message; }); + // Force transfer. This may be wasteful in some cases, but ensures we get off the + // I/O threads as soon as possible, and ensures we don't decode record batches + // synchronously in the case that the message read has already finished. + if (executor_) { + auto executor = executor_; + return read_messages.Then( + [=](const std::shared_ptr& message) -> Future { + return DeferNotOk(executor->Submit( + [=]() { return ReadRecordBatch(state.get(), message.get()); })); + }); + } + return read_messages.Then([=](const std::shared_ptr& message) -> Result { + return ReadRecordBatch(state.get(), message.get()); + }); +} + +Future> IpcFileRecordBatchGenerator::ReadBlock( + const FileBlock& block) { + if (cached_source_) { + auto cached_source = cached_source_; + io::ReadRange range{block.offset, block.metadata_length + block.body_length}; + auto pool = state_->options_.memory_pool; + return cached_source->WaitFor({range}).Then( + [cached_source, pool, + range](const detail::Empty&) -> Result> { + ARROW_ASSIGN_OR_RAISE(auto buffer, cached_source->Read(range)); + io::BufferReader stream(std::move(buffer)); + return ReadMessage(&stream, pool); + }); + } else { + return ReadMessageFromBlockAsync(block, state_->file_, io_context_); + } +} + +Status IpcFileRecordBatchGenerator::ReadDictionaries( + RecordBatchFileReaderImpl* state, + std::vector> dictionary_messages) { + IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); + for (const auto& message : dictionary_messages) { + RETURN_NOT_OK(ReadOneDictionary(message.get(), context)); + } + return Status::OK(); +} + +Result> IpcFileRecordBatchGenerator::ReadRecordBatch( + RecordBatchFileReaderImpl* state, Message* message) { + CHECK_HAS_BODY(*message); + ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); + return ReadRecordBatchInternal(*message->metadata(), state->schema_, + state->field_inclusion_mask_, context, reader.get()); +} + Status Listener::OnEOS() { return Status::OK(); } Status Listener::OnSchemaDecoded(std::shared_ptr schema) { return Status::OK(); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 38f7f2ed8b9..6f2157557f3 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -25,12 +25,14 @@ #include #include +#include "arrow/io/caching.h" #include "arrow/io/type_fwd.h" #include "arrow/ipc/message.h" #include "arrow/ipc/options.h" #include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/type_fwd.h" +#include "arrow/util/async_generator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -99,7 +101,8 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { }; /// \brief Reads the record batch file format -class ARROW_EXPORT RecordBatchFileReader { +class ARROW_EXPORT RecordBatchFileReader + : public std::enable_shared_from_this { public: virtual ~RecordBatchFileReader() = default; @@ -147,6 +150,26 @@ class ARROW_EXPORT RecordBatchFileReader { const std::shared_ptr& file, int64_t footer_offset, const IpcReadOptions& options = IpcReadOptions::Defaults()); + /// \brief Open a file asynchronously (owns the file). + static Future> OpenAsync( + const std::shared_ptr& file, + const IpcReadOptions& options = IpcReadOptions::Defaults()); + + /// \brief Open a file asynchronously (borrows the file). + static Future> OpenAsync( + io::RandomAccessFile* file, + const IpcReadOptions& options = IpcReadOptions::Defaults()); + + /// \brief Open a file asynchronously (owns the file). + static Future> OpenAsync( + const std::shared_ptr& file, int64_t footer_offset, + const IpcReadOptions& options = IpcReadOptions::Defaults()); + + /// \brief Open a file asynchronously (borrows the file). + static Future> OpenAsync( + io::RandomAccessFile* file, int64_t footer_offset, + const IpcReadOptions& options = IpcReadOptions::Defaults()); + /// \brief The schema read from the file virtual std::shared_ptr schema() const = 0; @@ -172,6 +195,21 @@ class ARROW_EXPORT RecordBatchFileReader { /// \brief Computes the total number of rows in the file. virtual Result CountRows() = 0; + + /// \brief Get a reentrant generator of record batches. + /// + /// \param[in] coalesce If true, enable I/O coalescing. + /// \param[in] io_context The IOContext to use (controls which thread pool + /// is used for I/O). + /// \param[in] cache_options Options for coalescing (if enabled). + /// \param[in] executor Optionally, an executor to use for decoding record + /// batches. This is generally only a benefit for very wide and/or + /// compressed batches. + virtual Result>> GetRecordBatchGenerator( + const bool coalesce = false, + const io::IOContext& io_context = io::default_io_context(), + const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(), + arrow::internal::Executor* executor = NULLPTR) = 0; }; /// \brief A general listener class to receive events. diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 63e3cb93a25..c80e8f6f680 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -18,9 +18,10 @@ #include "arrow/memory_pool.h" #include // IWYU pragma: keep -#include // IWYU pragma: keep -#include // IWYU pragma: keep -#include // IWYU pragma: keep +#include +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // IWYU pragma: keep #include #include @@ -28,12 +29,16 @@ #include #endif +#include "arrow/buffer.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/bit_util.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" // IWYU pragma: keep #include "arrow/util/optional.h" #include "arrow/util/string.h" +#include "arrow/util/thread_pool.h" #ifdef __GLIBC__ #include @@ -494,19 +499,27 @@ std::unique_ptr MemoryPool::CreateDefault() { } } -static SystemMemoryPool system_pool; +static struct GlobalState { + ~GlobalState() { finalizing.store(true, std::memory_order_relaxed); } + + bool is_finalizing() const { return finalizing.load(std::memory_order_relaxed); } + + std::atomic finalizing{false}; // constructed first, destroyed last + + SystemMemoryPool system_pool; #ifdef ARROW_JEMALLOC -static JemallocMemoryPool jemalloc_pool; + JemallocMemoryPool jemalloc_pool; #endif #ifdef ARROW_MIMALLOC -static MimallocMemoryPool mimalloc_pool; + MimallocMemoryPool mimalloc_pool; #endif +} global_state; -MemoryPool* system_memory_pool() { return &system_pool; } +MemoryPool* system_memory_pool() { return &global_state.system_pool; } Status jemalloc_memory_pool(MemoryPool** out) { #ifdef ARROW_JEMALLOC - *out = &jemalloc_pool; + *out = &global_state.jemalloc_pool; return Status::OK(); #else return Status::NotImplemented("This Arrow build does not enable jemalloc"); @@ -515,7 +528,7 @@ Status jemalloc_memory_pool(MemoryPool** out) { Status mimalloc_memory_pool(MemoryPool** out) { #ifdef ARROW_MIMALLOC - *out = &mimalloc_pool; + *out = &global_state.mimalloc_pool; return Status::OK(); #else return Status::NotImplemented("This Arrow build does not enable mimalloc"); @@ -526,14 +539,14 @@ MemoryPool* default_memory_pool() { auto backend = DefaultBackend(); switch (backend) { case MemoryPoolBackend::System: - return &system_pool; + return &global_state.system_pool; #ifdef ARROW_JEMALLOC case MemoryPoolBackend::Jemalloc: - return &jemalloc_pool; + return &global_state.jemalloc_pool; #endif #ifdef ARROW_MIMALLOC case MemoryPoolBackend::Mimalloc: - return &mimalloc_pool; + return &global_state.mimalloc_pool; #endif default: ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool"; @@ -669,4 +682,116 @@ std::vector SupportedMemoryBackendNames() { return supported; } +// ----------------------------------------------------------------------- +// Pool buffer and allocation + +/// A Buffer whose lifetime is tied to a particular MemoryPool +class PoolBuffer final : public ResizableBuffer { + public: + explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool) + : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {} + + ~PoolBuffer() override { + // Avoid calling pool_->Free if the global pools are destroyed + // (XXX this will not work with user-defined pools) + + // This can happen if a Future is destructing on one thread while or + // after memory pools are destructed on the main thread (as there is + // no guarantee of destructor order between thread/memory pools) + uint8_t* ptr = mutable_data(); + if (ptr && !global_state.is_finalizing()) { + pool_->Free(ptr, capacity_); + } + } + + Status Reserve(const int64_t capacity) override { + if (capacity < 0) { + return Status::Invalid("Negative buffer capacity: ", capacity); + } + uint8_t* ptr = mutable_data(); + if (!ptr || capacity > capacity_) { + int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity); + if (ptr) { + RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); + } else { + RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr)); + } + data_ = ptr; + capacity_ = new_capacity; + } + return Status::OK(); + } + + Status Resize(const int64_t new_size, bool shrink_to_fit = true) override { + if (ARROW_PREDICT_FALSE(new_size < 0)) { + return Status::Invalid("Negative buffer resize: ", new_size); + } + uint8_t* ptr = mutable_data(); + if (ptr && shrink_to_fit && new_size <= size_) { + // Buffer is non-null and is not growing, so shrink to the requested size without + // excess space. + int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(new_size); + if (capacity_ != new_capacity) { + // Buffer hasn't got yet the requested size. + RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); + data_ = ptr; + capacity_ = new_capacity; + } + } else { + RETURN_NOT_OK(Reserve(new_size)); + } + size_ = new_size; + + return Status::OK(); + } + + static std::shared_ptr MakeShared(MemoryPool* pool) { + std::shared_ptr mm; + if (pool == nullptr) { + pool = default_memory_pool(); + mm = default_cpu_memory_manager(); + } else { + mm = CPUDevice::memory_manager(pool); + } + return std::make_shared(std::move(mm), pool); + } + + static std::unique_ptr MakeUnique(MemoryPool* pool) { + std::shared_ptr mm; + if (pool == nullptr) { + pool = default_memory_pool(); + mm = default_cpu_memory_manager(); + } else { + mm = CPUDevice::memory_manager(pool); + } + return std::unique_ptr(new PoolBuffer(std::move(mm), pool)); + } + + private: + MemoryPool* pool_; +}; + +namespace { +// A utility that does most of the work of the `AllocateBuffer` and +// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to +// a PoolBuffer. +template +inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) { + RETURN_NOT_OK(buffer->Resize(size)); + buffer->ZeroPadding(); + return std::move(buffer); +} + +} // namespace + +Result> AllocateBuffer(const int64_t size, MemoryPool* pool) { + return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size); +} + +Result> AllocateResizableBuffer(const int64_t size, + MemoryPool* pool) { + return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), + size); +} + } // namespace arrow diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h index 0a20b5f4d57..190e5839bbf 100644 --- a/cpp/src/arrow/testing/future_util.h +++ b/cpp/src/arrow/testing/future_util.h @@ -81,10 +81,21 @@ handle_error(future_name.status()); \ EXPECT_OK_AND_ASSIGN(lhs, future_name.result()); +#define EXPECT_FINISHES(expr) \ + do { \ + EXPECT_FINISHES_IMPL(expr); \ + } while (0) + #define EXPECT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \ ON_FINISH_ASSIGN_OR_HANDLE_ERROR_IMPL( \ ARROW_EXPECT_OK, ARROW_ASSIGN_OR_RAISE_NAME(_fut, __COUNTER__), lhs, rexpr); +#define EXPECT_FINISHES_OK_AND_EQ(expected, expr) \ + do { \ + EXPECT_FINISHES_OK_AND_ASSIGN(auto _actual, (expr)); \ + EXPECT_EQ(expected, _actual); \ + } while (0) + namespace arrow { template