From d310d5084563ec8f73a12e47a09c6a00bcd0d8a4 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 5 Nov 2021 07:58:07 -1000 Subject: [PATCH 1/8] ARROW-14577: Change the logic in the async IPC reader to first read all metadata and then use that metadata to perform partial reads when reading partial columns. --- cpp/src/arrow/io/caching.cc | 9 +- cpp/src/arrow/io/caching.h | 13 +- cpp/src/arrow/ipc/message.cc | 61 +++- cpp/src/arrow/ipc/message.h | 26 +- cpp/src/arrow/ipc/options.h | 6 + cpp/src/arrow/ipc/read_write_test.cc | 165 ++++++++- cpp/src/arrow/ipc/reader.cc | 491 +++++++++++++++++++-------- cpp/src/arrow/ipc/reader.h | 12 + 8 files changed, 618 insertions(+), 165 deletions(-) diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index 722026ccd9b..1cbebfd935e 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -144,7 +144,8 @@ struct RangeCacheEntry { }; struct ReadRangeCache::Impl { - std::shared_ptr file; + std::shared_ptr owned_file; + RandomAccessFile* file; IOContext ctx; CacheOptions options; @@ -289,10 +290,12 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl { } }; -ReadRangeCache::ReadRangeCache(std::shared_ptr file, IOContext ctx, +ReadRangeCache::ReadRangeCache(std::shared_ptr owned_file, + RandomAccessFile* file, IOContext ctx, CacheOptions options) : impl_(options.lazy ? new LazyImpl() : new Impl()) { - impl_->file = std::move(file); + impl_->owned_file = std::move(owned_file); + impl_->file = file; impl_->ctx = std::move(ctx); impl_->options = options; } diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h index 59a9b60e82f..9f047fd62fb 100644 --- a/cpp/src/arrow/io/caching.h +++ b/cpp/src/arrow/io/caching.h @@ -104,11 +104,17 @@ class ARROW_EXPORT ReadRangeCache { /// Construct a read cache with default explicit ReadRangeCache(std::shared_ptr file, IOContext ctx) - : ReadRangeCache(file, std::move(ctx), CacheOptions::Defaults()) {} + : ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {} /// Construct a read cache with given options explicit ReadRangeCache(std::shared_ptr file, IOContext ctx, - CacheOptions options); + CacheOptions options) + : ReadRangeCache(file, file.get(), ctx, options) {} + + /// Construct a read cache with an unowned file + ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options) + : ReadRangeCache(NULLPTR, file, ctx, options) {} + ~ReadRangeCache(); /// \brief Cache the given ranges in the background. @@ -130,6 +136,9 @@ class ARROW_EXPORT ReadRangeCache { struct Impl; struct LazyImpl; + ReadRangeCache(std::shared_ptr owned_file, RandomAccessFile* file, + IOContext ctx, CacheOptions options); + std::unique_ptr impl_; }; diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index c256e42c3d0..fc7e8b8c00f 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -311,6 +311,50 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length, return Status::OK(); } +Result> ReadMessage(std::shared_ptr metadata, + std::shared_ptr body) { + std::unique_ptr result; + auto listener = std::make_shared(&result); + // If the user does not pass in a body buffer then we assume they are skipping it + MessageDecoder decoder(listener, default_memory_pool(), body == nullptr); + + if (metadata->size() < decoder.next_required_size()) { + return Status::Invalid("metadata_length should be at least ", + decoder.next_required_size()); + } + + ARROW_RETURN_NOT_OK(decoder.Consume(metadata)); + + switch (decoder.state()) { + case MessageDecoder::State::INITIAL: + // Metadata did not request a body so we better not have provided one + DCHECK_EQ(body, nullptr); + return std::move(result); + case MessageDecoder::State::METADATA_LENGTH: + return Status::Invalid("metadata length is missing from the metadata buffer"); + case MessageDecoder::State::METADATA: + return Status::Invalid("flatbuffer size ", decoder.next_required_size(), + " invalid. Buffer size: ", metadata->size()); + case MessageDecoder::State::BODY: { + if (body == nullptr) { + // Caller didn't give a body so just give them a message without body + return std::move(result); + } + if (body->size() != decoder.next_required_size()) { + return Status::IOError("Expected body buffer to be ", + decoder.next_required_size(), + " bytes for message body, got ", body->size()); + } + RETURN_NOT_OK(decoder.Consume(body)); + return std::move(result); + } + case MessageDecoder::State::EOS: + return Status::Invalid("Unexpected empty message in IPC file format"); + default: + return Status::Invalid("Unexpected state: ", decoder.state()); + } +} + Result> ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { @@ -560,14 +604,15 @@ class MessageDecoder::MessageDecoderImpl { public: explicit MessageDecoderImpl(std::shared_ptr listener, State initial_state, int64_t initial_next_required_size, - MemoryPool* pool) + MemoryPool* pool, bool skip_body) : listener_(std::move(listener)), pool_(pool), state_(initial_state), next_required_size_(initial_next_required_size), chunks_(), buffered_size_(0), - metadata_(nullptr) {} + metadata_(nullptr), + skip_body_(skip_body) {} Status ConsumeData(const uint8_t* data, int64_t size) { if (buffered_size_ == 0) { @@ -798,7 +843,7 @@ class MessageDecoder::MessageDecoderImpl { RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata_, &body_length)); state_ = State::BODY; - next_required_size_ = body_length; + next_required_size_ = skip_body_ ? 0 : body_length; RETURN_NOT_OK(listener_->OnBody()); if (next_required_size_ == 0) { ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_)); @@ -894,19 +939,21 @@ class MessageDecoder::MessageDecoderImpl { std::vector> chunks_; int64_t buffered_size_; std::shared_ptr metadata_; // Must be CPU buffer + bool skip_body_; }; MessageDecoder::MessageDecoder(std::shared_ptr listener, - MemoryPool* pool) { + MemoryPool* pool, bool skip_body) { impl_.reset(new MessageDecoderImpl(std::move(listener), State::INITIAL, - kMessageDecoderNextRequiredSizeInitial, pool)); + kMessageDecoderNextRequiredSizeInitial, pool, + skip_body)); } MessageDecoder::MessageDecoder(std::shared_ptr listener, State initial_state, int64_t initial_next_required_size, - MemoryPool* pool) { + MemoryPool* pool, bool skip_body) { impl_.reset(new MessageDecoderImpl(std::move(listener), initial_state, - initial_next_required_size, pool)); + initial_next_required_size, pool, skip_body)); } MessageDecoder::~MessageDecoder() {} diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 9c0ed8ced2e..1cd72ce993e 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -266,9 +266,11 @@ class ARROW_EXPORT MessageDecoder { /// \param[in] listener a MessageDecoderListener that responds events from /// the decoder /// \param[in] pool an optional MemoryPool to copy metadata on the + /// \param[in] skip_body if true the body will be skipped even if the message has a body /// CPU, if required explicit MessageDecoder(std::shared_ptr listener, - MemoryPool* pool = default_memory_pool()); + MemoryPool* pool = default_memory_pool(), + bool skip_body = false); /// \brief Construct a message decoder with the specified state. /// @@ -282,9 +284,10 @@ class ARROW_EXPORT MessageDecoder { /// to run the next action /// \param[in] pool an optional MemoryPool to copy metadata on the /// CPU, if required + /// \param[in] skip_body if true the body will be skipped even if the message has a body MessageDecoder(std::shared_ptr listener, State initial_state, int64_t initial_next_required_size, - MemoryPool* pool = default_memory_pool()); + MemoryPool* pool = default_memory_pool(), bool skip_body = false); virtual ~MessageDecoder(); @@ -466,6 +469,25 @@ Result> ReadMessage( const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {}); +/// \brief Read encapsulated RPC message from cached buffers +/// +/// The buffers should contain an entire message. Partial reads are not handled. +/// +/// This method can be used to read just the metadata by passing in a nullptr for the +/// body. The body will then be skipped and the body size will not be validated. +/// +/// If the body buffer is provided then it must be the complete body buffer +/// +/// This is similar to Message::Open but performs slightly more validation (e.g. checks +/// to see that the metadata length is correct and that the body is the size the metadata +/// expected) +/// +/// \param metadata The bytes for the metadata +/// \param body The bytes for the body +/// \return The message represented by the buffers +ARROW_EXPORT Result> ReadMessage( + std::shared_ptr metadata, std::shared_ptr body); + ARROW_EXPORT Future> ReadMessageAsync( const int64_t offset, const int32_t metadata_length, const int64_t body_length, diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 1dbfc5d1473..396344216f1 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -20,6 +20,7 @@ #include #include +#include "arrow/io/caching.h" #include "arrow/ipc/type_fwd.h" #include "arrow/status.h" #include "arrow/type_fwd.h" @@ -148,6 +149,11 @@ struct ARROW_EXPORT IpcReadOptions { /// RecordBatchStreamReader and StreamDecoder classes. bool ensure_native_endian = true; + /// \brief Options to control caching behavior when pre-buffering is requested + /// + /// The lazy property will always be reset to true to deliver the expected behavior + io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults(); + static IpcReadOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index ee54cdaa2a0..ed43fdd9a91 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -51,6 +51,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" +#include "arrow/util/make_unique.h" #include "generated/Message_generated.h" // IWYU pragma: keep @@ -59,6 +60,7 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; using internal::GetByteWidth; +using internal::make_unique; using internal::TemporaryDir; namespace ipc { @@ -2645,33 +2647,43 @@ TEST(IoRecordedRandomAccessFile, ReadWithCurrentPosition) { ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 20})); } -Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr* out) { - // Make the schema +std::shared_ptr MakeBooleanInt32Int64Schema() { auto f0 = field("f0", boolean()); auto f1 = field("f1", int32()); auto f2 = field("f2", int64()); - auto schema = ::arrow::schema({f0, f1, f2}); + return ::arrow::schema({f0, f1, f2}); +} +Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr* out) { + auto schema_ = MakeBooleanInt32Int64Schema(); std::shared_ptr a0, a1, a2; RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a0)); RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1)); RETURN_NOT_OK(MakeRandomInt64Array(length, false, arrow::default_memory_pool(), &a2)); - *out = RecordBatch::Make(schema, length, {a0, a1, a2}); + *out = RecordBatch::Make(std::move(schema_), length, {a0, a1, a2}); return Status::OK(); } +std::shared_ptr MakeBooleanInt32Int64File(int num_rows, int num_batches) { + auto schema_ = MakeBooleanInt32Int64Schema(); + EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0)); + EXPECT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), schema_)); + + std::shared_ptr batch; + for (int i = 0; i < num_batches; i++) { + ARROW_EXPECT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch)); + ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch)); + } + + ARROW_EXPECT_OK(writer->Close()); + EXPECT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + return buffer; +} + void GetReadRecordBatchReadRanges( uint32_t num_rows, const std::vector& included_fields, const std::vector& expected_body_read_lengths) { - std::shared_ptr batch; - // [bool, int32, int64] batch - ASSERT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch)); - - ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0)); - ASSERT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema())); - ASSERT_OK(writer->WriteRecordBatch(*batch)); - ASSERT_OK(writer->Close()); - ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + auto buffer = MakeBooleanInt32Int64File(num_rows, /*num_batches=*/1); io::BufferReader buffer_reader(buffer); TrackedRandomAccessFile tracked(&buffer_reader); @@ -2770,6 +2782,133 @@ TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) { GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4}); } +constexpr static int kNumBatches = 10; +// It can be difficult to know the exact size of the schema. Instead we just make the +// row data big enough that we can easily identify if a read is for a schema or for +// row data. +// +// This needs to be large enough to space record batches kDefaultHoleSizeLimit bytes apart +// and also large enough that record batch data is more than kMaxMetadataSizeBytes bytes +constexpr static int kRowsPerBatch = 1000; +constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13; +// There are always 2 reads when the file is opened +constexpr static int kNumReadsOnOpen = 2; + +class PreBufferingTest : public ::testing::TestWithParam { + protected: + void SetUp() override { + file_buffer_ = MakeBooleanInt32Int64File(kRowsPerBatch, kNumBatches); + } + + void OpenReader() { + buffer_reader_ = make_unique(file_buffer_); + tracked_ = std::make_shared(buffer_reader_.get()); + auto read_options = IpcReadOptions::Defaults(); + if (ReadsArePlugged()) { + // This will ensure that all reads get globbed together into one large read + read_options.pre_buffer_cache_options.hole_size_limit = + std::numeric_limits::max() - 1; + read_options.pre_buffer_cache_options.range_size_limit = + std::numeric_limits::max(); + } + ASSERT_OK_AND_ASSIGN(reader_, RecordBatchFileReader::Open(tracked_, read_options)); + } + + bool ReadsArePlugged() { return GetParam(); } + + std::vector AllBatchIndices() { + std::vector all_batch_indices(kNumBatches); + std::iota(all_batch_indices.begin(), all_batch_indices.end(), 0); + return all_batch_indices; + } + + void AssertMetadataLoaded(std::vector batch_indices) { + if (batch_indices.size() == 0) { + batch_indices = AllBatchIndices(); + } + const auto& read_ranges = tracked_->get_read_ranges(); + if (ReadsArePlugged()) { + // The read should have arrived as one large read + ASSERT_EQ(kNumReadsOnOpen + 1, read_ranges.size()); + if (batch_indices.size() > 1) { + ASSERT_GT(read_ranges[kNumReadsOnOpen].length, kMaxMetadataSizeBytes); + } + } else { + // We should get many small reads of metadata only + ASSERT_EQ(batch_indices.size() + kNumReadsOnOpen, read_ranges.size()); + for (const auto& read_range : read_ranges) { + ASSERT_LT(read_range.length, kMaxMetadataSizeBytes); + } + } + } + + std::vector> LoadExpected() { + auto buffer_reader = make_unique(file_buffer_); + auto read_options = IpcReadOptions::Defaults(); + EXPECT_OK_AND_ASSIGN(auto reader, + RecordBatchFileReader::Open(buffer_reader.get(), read_options)); + std::vector> expected_batches; + for (int i = 0; i < reader->num_record_batches(); i++) { + EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i)); + expected_batches.push_back(expected_batch); + } + return expected_batches; + } + + void CheckFileRead(int num_indices_pre_buffered) { + auto expected_batches = LoadExpected(); + const std::vector& read_ranges = tracked_->get_read_ranges(); + std::size_t starting_reads = read_ranges.size(); + for (int i = 0; i < reader_->num_record_batches(); i++) { + ASSERT_OK_AND_ASSIGN(auto next_batch, reader_->ReadRecordBatch(i)); + AssertBatchesEqual(*expected_batches[i], *next_batch); + } + int metadata_reads = 0; + int data_reads = 0; + for (std::size_t i = starting_reads; i < read_ranges.size(); i++) { + if (read_ranges[i].length > kMaxMetadataSizeBytes) { + data_reads++; + } else { + metadata_reads++; + } + } + ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered); + ASSERT_EQ(data_reads, reader_->num_record_batches()); + } + + std::vector> batches_; + std::shared_ptr file_buffer_; + std::unique_ptr buffer_reader_; + std::shared_ptr tracked_; + std::shared_ptr reader_; +}; + +TEST_P(PreBufferingTest, MetadataOnlyAllBatches) { + OpenReader(); + // Should pre_buffer all metadata + ASSERT_OK(reader_->PreBufferMetadata({})); + AssertMetadataLoaded({}); + CheckFileRead(kNumBatches); +} + +TEST_P(PreBufferingTest, MetadataOnlySomeBatches) { + OpenReader(); + // Should pre_buffer all metadata + ASSERT_OK(reader_->PreBufferMetadata({1, 2, 3})); + AssertMetadataLoaded({1, 2, 3}); + CheckFileRead(3); +} + +INSTANTIATE_TEST_SUITE_P(PreBufferingTests, PreBufferingTest, + ::testing::Values(false, true), + [](const ::testing::TestParamInfo& info) { + if (info.param) { + return "plugged"; + } else { + return "not_plugged"; + } + }); + } // namespace test } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index c12254d271d..ec43ec39d94 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -18,11 +18,13 @@ #include "arrow/ipc/reader.h" #include -#include #include #include +#include #include #include +#include +#include #include #include @@ -37,7 +39,6 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata_internal.h" #include "arrow/ipc/reader_internal.h" -#include "arrow/ipc/util.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" #include "arrow/sparse_tensor.h" @@ -51,6 +52,7 @@ #include "arrow/util/endian.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" #include "arrow/util/parallel.h" #include "arrow/util/string.h" #include "arrow/util/thread_pool.h" @@ -70,6 +72,7 @@ namespace flatbuf = org::apache::arrow::flatbuf; using internal::checked_cast; using internal::checked_pointer_cast; using internal::GetByteWidth; +using internal::make_unique; namespace ipc { @@ -137,6 +140,29 @@ struct IpcReadContext { const bool swap_endian; }; +/// A collection of ranges to read and pointers to set to those ranges when they are +/// available. This allows the ArrayLoader to utilize a two pass cache-then-read +/// strategy with a ReadRangeCache +class BatchDataReadRequest { + public: + const std::vector& ranges_to_read() const { return ranges_to_read_; } + + void RequestRange(int64_t offset, int64_t length, std::shared_ptr* out) { + ranges_to_read_.push_back({offset, length}); + destinations_.push_back(out); + } + + void FulfillRequest(const std::vector>& buffers) { + for (std::size_t i = 0; i < buffers.size(); i++) { + *destinations_[i] = buffers[i]; + } + } + + private: + std::vector ranges_to_read_; + std::vector*> destinations_; +}; + /// The field_index and buffer_index are incremented based on how much of the /// batch is "consumed" (through nested data reconstruction, for example) class ArrayLoader { @@ -147,6 +173,16 @@ class ArrayLoader { : metadata_(metadata), metadata_version_(metadata_version), file_(file), + file_offset_(0), + max_recursion_depth_(options.max_recursion_depth) {} + + explicit ArrayLoader(const flatbuf::RecordBatch* metadata, + MetadataVersion metadata_version, const IpcReadOptions& options, + int64_t file_offset) + : metadata_(metadata), + metadata_version_(metadata_version), + file_(nullptr), + file_offset_(file_offset), max_recursion_depth_(options.max_recursion_depth) {} Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr* out) { @@ -164,7 +200,12 @@ class ArrayLoader { return Status::Invalid("Buffer ", buffer_index_, " did not start on 8-byte aligned offset: ", offset); } - return file_->ReadAt(offset, length).Value(out); + if (file_) { + return file_->ReadAt(offset, length).Value(out); + } else { + read_request_.RequestRange(offset + file_offset_, length, out); + return Status::OK(); + } } Status LoadType(const DataType& type) { return VisitTypeInline(type, this); } @@ -384,17 +425,21 @@ class ArrayLoader { Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); } + BatchDataReadRequest& read_request() { return read_request_; } + private: const flatbuf::RecordBatch* metadata_; const MetadataVersion metadata_version_; io::RandomAccessFile* file_; + int64_t file_offset_; int max_recursion_depth_; int buffer_index_ = 0; int field_index_ = 0; bool skip_io_ = false; - const Field* field_; - ArrayData* out_; + BatchDataReadRequest read_request_; + const Field* field_ = nullptr; + ArrayData* out_ = nullptr; }; Result> DecompressBuffer(const std::shared_ptr& buf, @@ -533,7 +578,8 @@ Result> LoadRecordBatch( if (inclusion_mask.size() > 0) { return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context, file); } else { - return LoadRecordBatchSubset(metadata, schema, /*param_name=*/nullptr, context, file); + return LoadRecordBatchSubset(metadata, schema, /*inclusion_mask=*/nullptr, context, + file); } } @@ -968,15 +1014,19 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } -static Result> ReadMessageFromBlock( - const FileBlock& block, io::RandomAccessFile* file, - const FieldsLoaderFunction& fields_loader) { +Status CheckAligned(const FileBlock& block) { if (!bit_util::IsMultipleOf8(block.offset) || !bit_util::IsMultipleOf8(block.metadata_length) || !bit_util::IsMultipleOf8(block.body_length)) { return Status::Invalid("Unaligned block in IPC file"); } + return Status::OK(); +} +static Result> ReadMessageFromBlock( + const FileBlock& block, io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader) { + RETURN_NOT_OK(CheckAligned(block)); // TODO(wesm): this breaks integration tests, see ARROW-3256 // DCHECK_EQ((*out)->body_length(), block.body_length); @@ -985,19 +1035,11 @@ static Result> ReadMessageFromBlock( return std::move(message); } -static Future> ReadMessageFromBlockAsync( - const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) { - if (!bit_util::IsMultipleOf8(block.offset) || - !bit_util::IsMultipleOf8(block.metadata_length) || - !bit_util::IsMultipleOf8(block.body_length)) { - return Status::Invalid("Unaligned block in IPC file"); - } - - // TODO(wesm): this breaks integration tests, see ARROW-3256 - // DCHECK_EQ((*out)->body_length(), block.body_length); - - return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file, - io_context); +static Result> ReadMessageFromCached( + std::shared_ptr cached_metadata, std::shared_ptr cached_data) { + ARROW_ASSIGN_OR_RAISE(auto message, + ReadMessage(std::move(cached_metadata), std::move(cached_data))); + return std::move(message); } static Status ReadOneDictionary(Message* message, const IpcReadContext& context) { @@ -1022,33 +1064,14 @@ class ARROW_EXPORT IpcFileRecordBatchGenerator { public: using Item = std::shared_ptr; - explicit IpcFileRecordBatchGenerator( - std::shared_ptr state, - std::shared_ptr cached_source, - const io::IOContext& io_context, arrow::internal::Executor* executor) - : state_(std::move(state)), - cached_source_(std::move(cached_source)), - io_context_(io_context), - executor_(executor), - index_(0) {} + explicit IpcFileRecordBatchGenerator(std::shared_ptr state) + : state_(std::move(state)), index_(0) {} Future operator()(); - Future> ReadBlock(const FileBlock& block); - - static Status ReadDictionaries( - RecordBatchFileReaderImpl* state, - std::vector> dictionary_messages); - static Result> ReadRecordBatch( - RecordBatchFileReaderImpl* state, Message* message); private: std::shared_ptr state_; - std::shared_ptr cached_source_; - io::IOContext io_context_; - arrow::internal::Executor* executor_; int index_; - // Odd Future type, but this lets us use All() easily - Future<> read_dictionaries_; }; class RecordBatchFileReaderImpl : public RecordBatchFileReader { @@ -1088,10 +1111,33 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::OK(); } + Future> ReadRecordBatchAsync(int i) { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + + auto cached_metadata = cached_metadata_.find(i); + if (cached_metadata != cached_metadata_.end()) { + return ReadCachedRecordBatch(i, cached_metadata->second); + } + + return Status::Invalid( + "Asynchronous record batch reading is only supported after a call to " + "PreBufferMetadata or PreBufferBatches"); + } + Result> ReadRecordBatch(int i) override { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); + auto cached_metadata = cached_metadata_.find(i); + if (cached_metadata != cached_metadata_.end()) { + return ReadCachedRecordBatch(i, cached_metadata->second).result(); + } + + // FIXME: What if they have prebuffered metadata and so the dictionary read has + // started but this batch wasn't prebuffered and so the dictionaries haven't been + // finished getting read yet. + if (!read_dictionaries_) { RETURN_NOT_OK(ReadDictionaries()); read_dictionaries_ = true; @@ -1143,11 +1189,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Status Open(const std::shared_ptr& file, int64_t footer_offset, const IpcReadOptions& options) { owned_file_ = file; + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); return Open(file.get(), footer_offset, options); } Status Open(io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options) { + // The metadata_cache_ may have already been constructed with an owned file in the + // owning overload of Open + if (!metadata_cache_) { + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); + } file_ = file; options_ = options; footer_offset_ = footer_offset; @@ -1164,11 +1218,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Future<> OpenAsync(const std::shared_ptr& file, int64_t footer_offset, const IpcReadOptions& options) { owned_file_ = file; + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); return OpenAsync(file.get(), footer_offset, options); } Future<> OpenAsync(io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options) { + // The metadata_cache_ may have already been constructed with an owned file in the + // owning overload of OpenAsync + if (!metadata_cache_) { + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); + } file_ = file; options_ = options; footer_offset_ = footer_offset; @@ -1195,33 +1257,41 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { const io::CacheOptions cache_options, arrow::internal::Executor* executor) override { auto state = std::dynamic_pointer_cast(shared_from_this()); - std::shared_ptr cached_source; - if (coalesce) { - if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file"); - cached_source = std::make_shared( - owned_file_, io_context, cache_options); - auto num_dictionaries = this->num_dictionaries(); - auto num_record_batches = this->num_record_batches(); - std::vector ranges(num_dictionaries + num_record_batches); - for (int i = 0; i < num_dictionaries; i++) { - auto block = FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); - ranges[i].offset = block.offset; - ranges[i].length = block.metadata_length + block.body_length; - } - for (int i = 0; i < num_record_batches; i++) { - auto block = FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); - ranges[num_dictionaries + i].offset = block.offset; - ranges[num_dictionaries + i].length = block.metadata_length + block.body_length; - } - RETURN_NOT_OK(cached_source->Cache(std::move(ranges))); + RETURN_NOT_OK(state->PreBufferMetadata({})); + return IpcFileRecordBatchGenerator(std::move(state)); + } + + Status DoPreBufferMetadata(const std::vector& indices) { + RETURN_NOT_OK(CacheMetadata(indices)); + EnsureDictionaryReadStarted(); + Future<> all_metadata_ready = WaitForMetadatas(indices); + for (int index : indices) { + Future> metadata_loaded = + all_metadata_ready.Then([this, index]() -> Result> { + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, + ReadCachedMessageFromBlock(GetRecordBatchBlock(index))); + return std::shared_ptr(std::move(message)); + }); + cached_metadata_.emplace(index, metadata_loaded); + } + return Status::OK(); + } + + std::vector AllIndices() const { + std::vector all_indices(num_record_batches()); + std::iota(all_indices.begin(), all_indices.end(), 0); + return all_indices; + } + + Status PreBufferMetadata(const std::vector& indices) override { + if (indices.size() == 0) { + return DoPreBufferMetadata(AllIndices()); + } else { + return DoPreBufferMetadata(indices); } - return IpcFileRecordBatchGenerator(std::move(state), std::move(cached_source), - io_context, executor); } private: - friend AsyncGenerator> MakeMessageGenerator( - std::shared_ptr, const io::IOContext&); friend class IpcFileRecordBatchGenerator; FileBlock GetRecordBatchBlock(int i) const { @@ -1240,6 +1310,14 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return std::move(message); } + Result> ReadCachedMessageFromBlock(const FileBlock& block) { + ++stats_.num_messages; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr metadata, + metadata_cache_->Read({block.offset, block.metadata_length})); + std::shared_ptr data; + return arrow::ipc::ReadMessageFromCached(std::move(metadata), std::move(data)); + } + Status ReadDictionaries() { // Read all the dictionaries IpcReadContext context(&dictionary_memo_, options_, swap_endian_); @@ -1251,6 +1329,206 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::OK(); } + void AddDictionaryRanges(std::vector* ranges) const { + // Adds all dictionaries to the range cache + for (int i = 0; i < num_dictionaries(); ++i) { + FileBlock block = GetDictionaryBlock(i); + ranges->push_back({block.offset, block.metadata_length + block.body_length}); + } + } + + void AddMetadataRanges(const std::vector& indices, + std::vector* ranges) { + for (int index : indices) { + FileBlock block = GetRecordBatchBlock(static_cast(index)); + ranges->push_back({block.offset, block.metadata_length}); + } + } + + Status CacheMetadata(const std::vector& indices) { + std::vector ranges; + if (!read_dictionaries_) { + AddDictionaryRanges(&ranges); + } + AddMetadataRanges(indices, &ranges); + return metadata_cache_->Cache(std::move(ranges)); + } + + void EnsureDictionaryReadStarted() { + if (!dictionary_load_finished_.is_valid()) { + read_dictionaries_ = true; + std::vector ranges; + AddDictionaryRanges(&ranges); + dictionary_load_finished_ = + metadata_cache_->WaitFor(std::move(ranges)).Then([this] { + return ReadDictionaries(); + }); + } + } + + Future<> WaitForMetadatas(const std::vector& indices) { + std::vector ranges; + AddMetadataRanges(indices, &ranges); + return metadata_cache_->WaitFor(std::move(ranges)); + } + + Result GetIpcReadContext(const flatbuf::Message* message, + const flatbuf::RecordBatch* batch) { + IpcReadContext context(&dictionary_memo_, options_, swap_endian_); + Compression::type compression; + RETURN_NOT_OK(GetCompression(batch, &compression)); + if (context.compression == Compression::UNCOMPRESSED && + message->version() == flatbuf::MetadataVersion::V4) { + // Possibly obtain codec information from experimental serialization format + // in 0.17.x + RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); + } + context.compression = compression; + context.metadata_version = internal::GetMetadataVersion(message->version()); + return std::move(context); + } + + Result GetBatchFromMessage( + const flatbuf::Message* message) { + auto batch = message->header_as_RecordBatch(); + if (batch == nullptr) { + return Status::IOError( + "Header-type of flatbuffer-encoded Message is not RecordBatch."); + } + return batch; + } + + Result GetFlatbufMessage( + const std::shared_ptr& message) { + const Buffer& metadata = *message->metadata(); + const flatbuf::Message* flatbuf_message = nullptr; + RETURN_NOT_OK( + internal::VerifyMessage(metadata.data(), metadata.size(), &flatbuf_message)); + return flatbuf_message; + } + + struct CachedRecordBatchReadContext { + CachedRecordBatchReadContext(std::shared_ptr sch, + const flatbuf::RecordBatch* batch, + IpcReadContext context, io::RandomAccessFile* file, + std::shared_ptr owned_file, + int64_t block_data_offset) + : schema(std::move(sch)), + context(std::move(context)), + file(file), + owned_file(std::move(owned_file)), + loader(batch, context.metadata_version, context.options, block_data_offset), + columns(schema->num_fields()), + cache(file, file->io_context(), io::CacheOptions::LazyDefaults()), + length(batch->length()) {} + + Status CalculateLoadRequest() { + std::shared_ptr out_schema; + RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields, + &inclusion_mask, &out_schema)); + + for (int i = 0; i < schema->num_fields(); ++i) { + const Field& field = *schema->field(i); + if (inclusion_mask.size() == 0 || inclusion_mask[i]) { + // Read field + auto column = std::make_shared(); + RETURN_NOT_OK(loader.Load(&field, column.get())); + if (length != column->length) { + return Status::IOError("Array length did not match record batch length"); + } + columns[i] = std::move(column); + if (inclusion_mask.size() > 0) { + filtered_columns.push_back(columns[i]); + filtered_fields.push_back(schema->field(i)); + } + } else { + // Skip field. This logic must be executed to advance the state of the + // loader to the next field + RETURN_NOT_OK(loader.SkipField(&field)); + } + } + if (inclusion_mask.size() > 0) { + filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata()); + } else { + filtered_schema = schema; + } + return Status::OK(); + } + + Future<> ReadAsync() { + RETURN_NOT_OK(cache.Cache(loader.read_request().ranges_to_read())); + return cache.WaitFor(loader.read_request().ranges_to_read()); + } + + Result> CreateRecordBatch() { + std::vector> buffers; + for (const auto& range_to_read : loader.read_request().ranges_to_read()) { + ARROW_ASSIGN_OR_RAISE(auto buffer, cache.Read(range_to_read)); + buffers.push_back(std::move(buffer)); + } + loader.read_request().FulfillRequest(buffers); + + // Dictionary resolution needs to happen on the unfiltered columns, + // because fields are mapped structurally (by path in the original schema). + RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo, + context.options.memory_pool)); + if (inclusion_mask.size() > 0) { + columns.clear(); + } else { + filtered_columns = std::move(columns); + } + + if (context.compression != Compression::UNCOMPRESSED) { + RETURN_NOT_OK( + DecompressBuffers(context.compression, context.options, &filtered_columns)); + } + + // swap endian in a set of ArrayData if necessary (swap_endian == true) + if (context.swap_endian) { + for (int i = 0; i < static_cast(filtered_columns.size()); ++i) { + ARROW_ASSIGN_OR_RAISE(filtered_columns[i], arrow::internal::SwapEndianArrayData( + filtered_columns[i])); + } + } + return RecordBatch::Make(std::move(filtered_schema), length, + std::move(filtered_columns)); + } + + std::shared_ptr schema; + IpcReadContext context; + io::RandomAccessFile* file; + std::shared_ptr owned_file; + + ArrayLoader loader; + ArrayDataVector columns; + io::internal::ReadRangeCache cache; + int64_t length; + ArrayDataVector filtered_columns; + FieldVector filtered_fields; + std::shared_ptr filtered_schema; + std::vector inclusion_mask; + }; + + Future> ReadCachedRecordBatch( + int index, Future> message_fut) { + ++stats_.num_record_batches; + return dictionary_load_finished_.Then([message_fut] { return message_fut; }) + .Then([this, index](const std::shared_ptr& message_obj) + -> Future> { + FileBlock block = GetRecordBatchBlock(index); + ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj)); + ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message)); + ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch)); + + auto read_context = std::make_shared( + schema_, batch, std::move(context), file_, owned_file_, + block.offset + static_cast(block.metadata_length)); + RETURN_NOT_OK(read_context->CalculateLoadRequest()); + return read_context->ReadAsync().Then( + [read_context] { return read_context->CreateRecordBatch(); }); + }); + } + Status ReadFooter() { auto fut = ReadFooterAsync(/*executor=*/nullptr); return fut.status(); @@ -1341,6 +1619,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { std::shared_ptr out_schema_; ReadStats stats_; + std::shared_ptr metadata_cache_; + std::unordered_set cached_data_blocks_; + Future<> dictionary_load_finished_; + std::unordered_map>> cached_metadata_; + std::unordered_map> cached_data_requests_; bool swap_endian_; }; @@ -1400,79 +1683,11 @@ Future> RecordBatchFileReader::OpenAsync( } Future IpcFileRecordBatchGenerator::operator()() { - auto state = state_; - if (!read_dictionaries_.is_valid()) { - std::vector>> messages(state->num_dictionaries()); - for (int i = 0; i < state->num_dictionaries(); i++) { - auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i)); - messages[i] = ReadBlock(block); - } - auto read_messages = All(std::move(messages)); - if (executor_) read_messages = executor_->Transfer(read_messages); - read_dictionaries_ = read_messages.Then( - [=](const std::vector>>& maybe_messages) - -> Status { - ARROW_ASSIGN_OR_RAISE(auto messages, - arrow::internal::UnwrapOrRaise(maybe_messages)); - return ReadDictionaries(state.get(), std::move(messages)); - }); - } - if (index_ >= state_->num_record_batches()) { - return Future::MakeFinished(IterationTraits::End()); - } - auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++)); - auto read_message = ReadBlock(block); - auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; }); - // Force transfer. This may be wasteful in some cases, but ensures we get off the - // I/O threads as soon as possible, and ensures we don't decode record batches - // synchronously in the case that the message read has already finished. - if (executor_) { - auto executor = executor_; - return read_messages.Then( - [=](const std::shared_ptr& message) -> Future { - return DeferNotOk(executor->Submit( - [=]() { return ReadRecordBatch(state.get(), message.get()); })); - }); - } - return read_messages.Then([=](const std::shared_ptr& message) -> Result { - return ReadRecordBatch(state.get(), message.get()); - }); -} - -Future> IpcFileRecordBatchGenerator::ReadBlock( - const FileBlock& block) { - if (cached_source_) { - auto cached_source = cached_source_; - io::ReadRange range{block.offset, block.metadata_length + block.body_length}; - auto pool = state_->options_.memory_pool; - return cached_source->WaitFor({range}).Then( - [cached_source, pool, range]() -> Result> { - ARROW_ASSIGN_OR_RAISE(auto buffer, cached_source->Read(range)); - io::BufferReader stream(std::move(buffer)); - return ReadMessage(&stream, pool); - }); - } else { - return ReadMessageFromBlockAsync(block, state_->file_, io_context_); + int index = index_++; + if (index >= state_->num_record_batches()) { + return IterationEnd(); } -} - -Status IpcFileRecordBatchGenerator::ReadDictionaries( - RecordBatchFileReaderImpl* state, - std::vector> dictionary_messages) { - IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); - for (const auto& message : dictionary_messages) { - RETURN_NOT_OK(ReadOneDictionary(message.get(), context)); - } - return Status::OK(); -} - -Result> IpcFileRecordBatchGenerator::ReadRecordBatch( - RecordBatchFileReaderImpl* state, Message* message) { - CHECK_HAS_BODY(*message); - ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); - IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); - return ReadRecordBatchInternal(*message->metadata(), state->schema_, - state->field_inclusion_mask_, context, reader.get()); + return state_->ReadRecordBatchAsync(index); } Status Listener::OnEOS() { return Status::OK(); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 6f2157557f3..4bdbccc5097 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -196,6 +196,18 @@ class ARROW_EXPORT RecordBatchFileReader /// \brief Computes the total number of rows in the file. virtual Result CountRows() = 0; + /// \brief Begin loading metadata for the desired batches into memory. + /// + /// This method will also begin loading all dictionaries messages into memory. + /// + /// For a regular file this will immediately begin disk I/O in the background on a + /// thread on the IOContext's thread pool. If the file is memory mapped this will + /// ensure the memory needed for the metadata is paged from disk into memory + /// + /// \param indices Indices of the batches to prefetch + /// If empty then all batches will be prefetched. + virtual Status PreBufferMetadata(const std::vector& indices) = 0; + /// \brief Get a reentrant generator of record batches. /// /// \param[in] coalesce If true, enable I/O coalescing. From 874a34b0f5deff12ca9c4a7bc40a2ef6b75a5082 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 13 Jan 2022 17:51:22 -1000 Subject: [PATCH 2/8] ARROW-14577: Getting rid of calls to make_unique that were confusing Windows --- cpp/src/arrow/ipc/read_write_test.cc | 8 +++----- cpp/src/arrow/ipc/reader.cc | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index ed43fdd9a91..117353f0e0c 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -51,7 +51,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" -#include "arrow/util/make_unique.h" #include "generated/Message_generated.h" // IWYU pragma: keep @@ -60,7 +59,6 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; using internal::GetByteWidth; -using internal::make_unique; using internal::TemporaryDir; namespace ipc { @@ -2801,7 +2799,7 @@ class PreBufferingTest : public ::testing::TestWithParam { } void OpenReader() { - buffer_reader_ = make_unique(file_buffer_); + buffer_reader_ = std::make_shared(file_buffer_); tracked_ = std::make_shared(buffer_reader_.get()); auto read_options = IpcReadOptions::Defaults(); if (ReadsArePlugged()) { @@ -2843,7 +2841,7 @@ class PreBufferingTest : public ::testing::TestWithParam { } std::vector> LoadExpected() { - auto buffer_reader = make_unique(file_buffer_); + auto buffer_reader = std::make_shared(file_buffer_); auto read_options = IpcReadOptions::Defaults(); EXPECT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(buffer_reader.get(), read_options)); @@ -2878,7 +2876,7 @@ class PreBufferingTest : public ::testing::TestWithParam { std::vector> batches_; std::shared_ptr file_buffer_; - std::unique_ptr buffer_reader_; + std::shared_ptr buffer_reader_; std::shared_ptr tracked_; std::shared_ptr reader_; }; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index ec43ec39d94..176b5c7ce1b 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -72,7 +72,6 @@ namespace flatbuf = org::apache::arrow::flatbuf; using internal::checked_cast; using internal::checked_pointer_cast; using internal::GetByteWidth; -using internal::make_unique; namespace ipc { From ae782e9f8ac473d04a3b487fd962ae4f218b50ca Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 08:22:10 -1000 Subject: [PATCH 3/8] ARROW-14577: Added back in the old style and will use it when reading the entire file. This allows us to prebuffer the entire file which is more efficient --- cpp/src/arrow/ipc/reader.cc | 165 ++++++++++++++++++++++++++++++++++-- 1 file changed, 158 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 176b5c7ce1b..ab6937830ec 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1034,6 +1034,21 @@ static Result> ReadMessageFromBlock( return std::move(message); } +static Future> ReadMessageFromBlockAsync( + const FileBlock& block, io::RandomAccessFile* file, const io::IOContext& io_context) { + if (!bit_util::IsMultipleOf8(block.offset) || + !bit_util::IsMultipleOf8(block.metadata_length) || + !bit_util::IsMultipleOf8(block.body_length)) { + return Status::Invalid("Unaligned block in IPC file"); + } + + // TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ((*out)->body_length(), block.body_length); + + return ReadMessageAsync(block.offset, block.metadata_length, block.body_length, file, + io_context); +} + static Result> ReadMessageFromCached( std::shared_ptr cached_metadata, std::shared_ptr cached_data) { ARROW_ASSIGN_OR_RAISE(auto message, @@ -1059,11 +1074,49 @@ class RecordBatchFileReaderImpl; /// A generator of record batches. /// /// All batches are yielded in order. -class ARROW_EXPORT IpcFileRecordBatchGenerator { +class ARROW_EXPORT WholeIpcFileRecordBatchGenerator { + public: + using Item = std::shared_ptr; + + explicit WholeIpcFileRecordBatchGenerator( + std::shared_ptr state, + std::shared_ptr cached_source, + const io::IOContext& io_context, arrow::internal::Executor* executor) + : state_(std::move(state)), + cached_source_(std::move(cached_source)), + io_context_(io_context), + executor_(executor), + index_(0) {} + + Future operator()(); + Future> ReadBlock(const FileBlock& block); + + static Status ReadDictionaries( + RecordBatchFileReaderImpl* state, + std::vector> dictionary_messages); + static Result> ReadRecordBatch( + RecordBatchFileReaderImpl* state, Message* message); + + private: + std::shared_ptr state_; + std::shared_ptr cached_source_; + io::IOContext io_context_; + arrow::internal::Executor* executor_; + int index_; + // Odd Future type, but this lets us use All() easily + Future<> read_dictionaries_; +}; + +/// A generator of record batches for use when reading +/// a subset of columns from the file. +/// +/// All batches are yielded in order. +class ARROW_EXPORT SelectiveIpcFileRecordBatchGenerator { public: using Item = std::shared_ptr; - explicit IpcFileRecordBatchGenerator(std::shared_ptr state) + explicit SelectiveIpcFileRecordBatchGenerator( + std::shared_ptr state) : state_(std::move(state)), index_(0) {} Future operator()(); @@ -1256,8 +1309,28 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { const io::CacheOptions cache_options, arrow::internal::Executor* executor) override { auto state = std::dynamic_pointer_cast(shared_from_this()); - RETURN_NOT_OK(state->PreBufferMetadata({})); - return IpcFileRecordBatchGenerator(std::move(state)); + // Prebuffering causes us to use a lot of futures which, at the moment, + // can only slow things down when we are doing zero-copy in-memory reads. + // + // Prebuffering's read patterns are also slightly worse than the alternative + // when doing whole-file reads because the logic is not in place to recognize + // we can just read the entire file up-front + if (options_.included_fields.size() != 0 && + options_.included_fields.size() != schema_->fields().size() && + !file_->supports_zero_copy()) { + RETURN_NOT_OK(state->PreBufferMetadata({})); + return SelectiveIpcFileRecordBatchGenerator(std::move(state)); + } + + std::shared_ptr cached_source; + if (coalesce && file_->supports_zero_copy()) { + if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file"); + // Since the user is asking for all fields then we can cache the entire + // file (up to the footer) + return cached_source->Cache({{0, footer_offset_}}); + } + return WholeIpcFileRecordBatchGenerator(std::move(state), std::move(cached_source), + io_context, executor); } Status DoPreBufferMetadata(const std::vector& indices) { @@ -1291,7 +1364,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } private: - friend class IpcFileRecordBatchGenerator; + friend class WholeIpcFileRecordBatchGenerator; FileBlock GetRecordBatchBlock(int i) const { return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); @@ -1681,14 +1754,92 @@ Future> RecordBatchFileReader::OpenAsync( .Then([=]() -> Result> { return result; }); } -Future IpcFileRecordBatchGenerator::operator()() { +Future SelectiveIpcFileRecordBatchGenerator:: +operator()() { int index = index_++; if (index >= state_->num_record_batches()) { - return IterationEnd(); + return IterationEnd(); } return state_->ReadRecordBatchAsync(index); } +Future WholeIpcFileRecordBatchGenerator:: +operator()() { + auto state = state_; + if (!read_dictionaries_.is_valid()) { + std::vector>> messages(state->num_dictionaries()); + for (int i = 0; i < state->num_dictionaries(); i++) { + auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i)); + messages[i] = ReadBlock(block); + } + auto read_messages = All(std::move(messages)); + if (executor_) read_messages = executor_->Transfer(read_messages); + read_dictionaries_ = read_messages.Then( + [=](const std::vector>>& maybe_messages) + -> Status { + ARROW_ASSIGN_OR_RAISE(auto messages, + arrow::internal::UnwrapOrRaise(maybe_messages)); + return ReadDictionaries(state.get(), std::move(messages)); + }); + } + if (index_ >= state_->num_record_batches()) { + return Future::MakeFinished(IterationTraits::End()); + } + auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++)); + auto read_message = ReadBlock(block); + auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; }); + // Force transfer. This may be wasteful in some cases, but ensures we get off the + // I/O threads as soon as possible, and ensures we don't decode record batches + // synchronously in the case that the message read has already finished. + if (executor_) { + auto executor = executor_; + return read_messages.Then( + [=](const std::shared_ptr& message) -> Future { + return DeferNotOk(executor->Submit( + [=]() { return ReadRecordBatch(state.get(), message.get()); })); + }); + } + return read_messages.Then([=](const std::shared_ptr& message) -> Result { + return ReadRecordBatch(state.get(), message.get()); + }); +} + +Future> WholeIpcFileRecordBatchGenerator::ReadBlock( + const FileBlock& block) { + if (cached_source_) { + auto cached_source = cached_source_; + io::ReadRange range{block.offset, block.metadata_length + block.body_length}; + auto pool = state_->options_.memory_pool; + return cached_source->WaitFor({range}).Then( + [cached_source, pool, range]() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto buffer, cached_source->Read(range)); + io::BufferReader stream(std::move(buffer)); + return ReadMessage(&stream, pool); + }); + } else { + return ReadMessageFromBlockAsync(block, state_->file_, io_context_); + } +} + +Status WholeIpcFileRecordBatchGenerator::ReadDictionaries( + RecordBatchFileReaderImpl* state, + std::vector> dictionary_messages) { + IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); + for (const auto& message : dictionary_messages) { + RETURN_NOT_OK(ReadOneDictionary(message.get(), context)); + } + return Status::OK(); +} + +Result> WholeIpcFileRecordBatchGenerator::ReadRecordBatch( + RecordBatchFileReaderImpl* state, Message* message) { + CHECK_HAS_BODY(*message); + ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_); + return ReadRecordBatchInternal(*message->metadata(), state->schema_, + state->field_inclusion_mask_, context, reader.get()); +} + Status Listener::OnEOS() { return Status::OK(); } Status Listener::OnSchemaDecoded(std::shared_ptr schema) { return Status::OK(); } From efce22e1940f7308f07f6b2d0520260eeb0203d9 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 09:10:19 -1000 Subject: [PATCH 4/8] ARROW-14577: Lint --- cpp/src/arrow/ipc/reader.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index ab6937830ec..17f5113d89f 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1754,8 +1754,8 @@ Future> RecordBatchFileReader::OpenAsync( .Then([=]() -> Result> { return result; }); } -Future SelectiveIpcFileRecordBatchGenerator:: -operator()() { +Future +SelectiveIpcFileRecordBatchGenerator::operator()() { int index = index_++; if (index >= state_->num_record_batches()) { return IterationEnd(); @@ -1763,8 +1763,8 @@ operator()() { return state_->ReadRecordBatchAsync(index); } -Future WholeIpcFileRecordBatchGenerator:: -operator()() { +Future +WholeIpcFileRecordBatchGenerator::operator()() { auto state = state_; if (!read_dictionaries_.is_valid()) { std::vector>> messages(state->num_dictionaries()); From 2f427be64432d69d8cfd8bd303398b284ba446c2 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 09:52:51 -1000 Subject: [PATCH 5/8] ARROW-14577: Missing a header for std::iota --- cpp/src/arrow/ipc/read_write_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 117353f0e0c..caaba421721 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include From f53fc134caa4003462eab3ecb2097f339cc5fb99 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 12:29:52 -1000 Subject: [PATCH 6/8] ARROW-14577: Removed some called-in-one-spot methods. Put in a more robust WaitForDictionaryReadFinished --- cpp/src/arrow/ipc/reader.cc | 46 ++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 17f5113d89f..22e30d76118 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1049,13 +1049,6 @@ static Future> ReadMessageFromBlockAsync( io_context); } -static Result> ReadMessageFromCached( - std::shared_ptr cached_metadata, std::shared_ptr cached_data) { - ARROW_ASSIGN_OR_RAISE(auto message, - ReadMessage(std::move(cached_metadata), std::move(cached_data))); - return std::move(message); -} - static Status ReadOneDictionary(Message* message, const IpcReadContext& context) { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); @@ -1186,14 +1179,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return ReadCachedRecordBatch(i, cached_metadata->second).result(); } - // FIXME: What if they have prebuffered metadata and so the dictionary read has - // started but this batch wasn't prebuffered and so the dictionaries haven't been - // finished getting read yet. - - if (!read_dictionaries_) { - RETURN_NOT_OK(ReadDictionaries()); - read_dictionaries_ = true; - } + RETURN_NOT_OK(WaitForDictionaryReadFinished()); FieldsLoaderFunction fields_loader = {}; if (!field_inclusion_mask_.empty()) { @@ -1340,9 +1326,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { for (int index : indices) { Future> metadata_loaded = all_metadata_ready.Then([this, index]() -> Result> { - ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, - ReadCachedMessageFromBlock(GetRecordBatchBlock(index))); - return std::shared_ptr(std::move(message)); + ++stats_.num_messages; + FileBlock block = GetRecordBatchBlock(index); + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr metadata, + metadata_cache_->Read({block.offset, block.metadata_length})); + return ReadMessage(std::move(metadata), nullptr); }); cached_metadata_.emplace(index, metadata_loaded); } @@ -1382,14 +1371,6 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return std::move(message); } - Result> ReadCachedMessageFromBlock(const FileBlock& block) { - ++stats_.num_messages; - ARROW_ASSIGN_OR_RAISE(std::shared_ptr metadata, - metadata_cache_->Read({block.offset, block.metadata_length})); - std::shared_ptr data; - return arrow::ipc::ReadMessageFromCached(std::move(metadata), std::move(data)); - } - Status ReadDictionaries() { // Read all the dictionaries IpcReadContext context(&dictionary_memo_, options_, swap_endian_); @@ -1438,6 +1419,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } } + Status WaitForDictionaryReadFinished() { + if (!read_dictionaries_) { + RETURN_NOT_OK(ReadDictionaries()); + read_dictionaries_ = true; + return Status::OK(); + } + if (dictionary_load_finished_.is_valid()) { + return dictionary_load_finished_.status(); + } + // Dictionaries were previously loaded synchronously + return Status::OK(); + } + Future<> WaitForMetadatas(const std::vector& indices) { std::vector ranges; AddMetadataRanges(indices, &ranges); From 5bc29845562350367a6e45440b11e0ef66250cb3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 13:15:09 -1000 Subject: [PATCH 7/8] ARROW-14577: Added a test for mixed access verifying the just-fixed dictionary issue --- cpp/src/arrow/ipc/read_write_test.cc | 46 ++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index caaba421721..843bf46dd4c 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -30,6 +30,8 @@ #include "arrow/array.h" #include "arrow/array/builder_primitive.h" #include "arrow/buffer_builder.h" +#include "arrow/compute/api_vector.h" +#include "arrow/filesystem/localfs.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" #include "arrow/io/test_common.h" @@ -2908,6 +2910,50 @@ INSTANTIATE_TEST_SUITE_P(PreBufferingTests, PreBufferingTest, } }); +Result> MakeBatchWithDictionaries(const int length) { + auto schema_ = ::arrow::schema({::arrow::field("i32", int32()), + ::arrow::field("i32d", dictionary(int8(), int32()))}); + std::shared_ptr a0, a1; + RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a0)); + RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1)); + EXPECT_OK_AND_ASSIGN(auto encoded_datum, compute::DictionaryEncode(a1)); + a1 = encoded_datum.make_array(); + return RecordBatch::Make(std::move(schema_), length, {a0, a1}); +} + +Result> MakeFileWithDictionaries( + const std::unique_ptr& tempdir, int rows_per_batch, int num_batches) { + EXPECT_OK_AND_ASSIGN(auto temppath, tempdir->path().Join("testfile")); + auto fs = fs::LocalFileSystem(); + EXPECT_OK_AND_ASSIGN(auto batch, MakeBatchWithDictionaries(rows_per_batch)); + EXPECT_OK_AND_ASSIGN(auto sink, fs.OpenOutputStream(temppath.ToString())); + EXPECT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema())); + + for (int i = 0; i < num_batches; i++) { + ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch)); + } + + ARROW_EXPECT_OK(writer->Close()); + ARROW_EXPECT_OK(sink->Close()); + return fs.OpenInputFile(temppath.ToString()); +} + +TEST(PreBuffering, MixedAccess) { + ASSERT_OK_AND_ASSIGN(auto tempdir, TemporaryDir::Make("arrow-ipc-read-write-test-")); + ASSERT_OK_AND_ASSIGN(auto readable_file, MakeFileWithDictionaries(tempdir, 50, 2)); + auto read_options = IpcReadOptions::Defaults(); + ASSERT_OK_AND_ASSIGN(auto reader, + RecordBatchFileReader::Open(readable_file, read_options)); + ASSERT_OK(reader->PreBufferMetadata({0})); + ASSERT_OK_AND_ASSIGN(auto batch, reader->ReadRecordBatch(1)); + ASSERT_EQ(50, batch->num_rows()); + ASSERT_OK_AND_ASSIGN(batch, reader->ReadRecordBatch(0)); + ASSERT_EQ(50, batch->num_rows()); + auto stats = reader->stats(); + ASSERT_EQ(1, stats.num_dictionary_batches); + ASSERT_EQ(2, stats.num_record_batches); +} + } // namespace test } // namespace ipc } // namespace arrow From d5c9cdf4d977ca23ddff3f2e564e2f100384ef7b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 14 Jan 2022 13:32:52 -1000 Subject: [PATCH 8/8] ARROW-14577: Removed fs and compute dependencies from IPC test --- cpp/src/arrow/ipc/read_write_test.cc | 27 ++++++++++++++------------- cpp/src/arrow/ipc/test_common.cc | 5 +++-- cpp/src/arrow/ipc/test_common.h | 3 ++- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 843bf46dd4c..a30cb40046e 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -30,8 +30,6 @@ #include "arrow/array.h" #include "arrow/array/builder_primitive.h" #include "arrow/buffer_builder.h" -#include "arrow/compute/api_vector.h" -#include "arrow/filesystem/localfs.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" #include "arrow/io/test_common.h" @@ -2911,22 +2909,25 @@ INSTANTIATE_TEST_SUITE_P(PreBufferingTests, PreBufferingTest, }); Result> MakeBatchWithDictionaries(const int length) { - auto schema_ = ::arrow::schema({::arrow::field("i32", int32()), - ::arrow::field("i32d", dictionary(int8(), int32()))}); - std::shared_ptr a0, a1; - RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a0)); - RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1)); - EXPECT_OK_AND_ASSIGN(auto encoded_datum, compute::DictionaryEncode(a1)); - a1 = encoded_datum.make_array(); - return RecordBatch::Make(std::move(schema_), length, {a0, a1}); + auto dict_type = dictionary(int32(), int32()); + auto schema_ = ::arrow::schema( + {::arrow::field("i32", int32()), ::arrow::field("i32d", dict_type)}); + std::shared_ptr i32, i32d_values, i32d_indices; + RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &i32)); + RETURN_NOT_OK( + MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &i32d_values)); + RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), + &i32d_indices, 0, 0, length)); + std::shared_ptr i32d = + std::make_shared(dict_type, i32d_indices, i32d_values); + return RecordBatch::Make(std::move(schema_), length, {i32, i32d}); } Result> MakeFileWithDictionaries( const std::unique_ptr& tempdir, int rows_per_batch, int num_batches) { EXPECT_OK_AND_ASSIGN(auto temppath, tempdir->path().Join("testfile")); - auto fs = fs::LocalFileSystem(); EXPECT_OK_AND_ASSIGN(auto batch, MakeBatchWithDictionaries(rows_per_batch)); - EXPECT_OK_AND_ASSIGN(auto sink, fs.OpenOutputStream(temppath.ToString())); + EXPECT_OK_AND_ASSIGN(auto sink, io::FileOutputStream::Open(temppath.ToString())); EXPECT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema())); for (int i = 0; i < num_batches; i++) { @@ -2935,7 +2936,7 @@ Result> MakeFileWithDictionaries( ARROW_EXPECT_OK(writer->Close()); ARROW_EXPECT_OK(sink->Close()); - return fs.OpenInputFile(temppath.ToString()); + return io::ReadableFile::Open(temppath.ToString()); } TEST(PreBuffering, MixedAccess) { diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index d7c1d852b88..e68a4332d7f 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -75,11 +75,12 @@ void CompareBatchColumnsDetailed(const RecordBatch& result, const RecordBatch& e } Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool, - std::shared_ptr* out, uint32_t seed) { + std::shared_ptr* out, uint32_t seed, int32_t min, + int32_t max) { random::RandomArrayGenerator rand(seed); const double null_probability = include_nulls ? 0.5 : 0.0; - *out = rand.Int32(length, 0, 1000, null_probability); + *out = rand.Int32(length, min, max, null_probability); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index b4c7e31c925..28aea00e30f 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -42,7 +42,8 @@ void CompareBatchColumnsDetailed(const RecordBatch& result, const RecordBatch& e ARROW_TESTING_EXPORT Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool, - std::shared_ptr* out, uint32_t seed = 0); + std::shared_ptr* out, uint32_t seed = 0, + int32_t min = 0, int32_t max = 1000); ARROW_TESTING_EXPORT Status MakeRandomInt64Array(int64_t length, bool include_nulls, MemoryPool* pool,