From 6e7bfbc67b4f76b12d6295787854cea893c6dd73 Mon Sep 17 00:00:00 2001 From: Yue Date: Wed, 20 Oct 2021 20:44:42 +0800 Subject: [PATCH] Support reading arrow IPC file with fine grained IO. Using a no-op random access file to record the read ranges and replay only the necessary read operation. --- cpp/src/arrow/ipc/message.cc | 47 +++++- cpp/src/arrow/ipc/message.h | 13 +- cpp/src/arrow/ipc/read_write_test.cc | 225 +++++++++++++++++++++++++++ cpp/src/arrow/ipc/reader.cc | 117 +++++++++++++- cpp/src/arrow/ipc/reader_internal.h | 84 ++++++++++ cpp/src/arrow/ipc/test_common.cc | 10 ++ cpp/src/arrow/ipc/test_common.h | 4 + 7 files changed, 486 insertions(+), 14 deletions(-) create mode 100644 cpp/src/arrow/ipc/reader_internal.h diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 197556efcea..ab20897e861 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -30,6 +30,8 @@ #include "arrow/io/interfaces.h" #include "arrow/ipc/metadata_internal.h" #include "arrow/ipc/options.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/reader_internal.h" #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/util/endian.h" @@ -279,8 +281,39 @@ std::string FormatMessageType(MessageType type) { return "unknown"; } +Status ReadFieldsSubset(int64_t offset, int32_t metadata_length, + io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader, + const std::shared_ptr& metadata, int64_t required_size, + std::shared_ptr& body) { + const flatbuf::Message* message = nullptr; + uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t); + // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length prefix) + RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + continuation_metadata_size, + metadata->size() - continuation_metadata_size, + &message)); + auto batch = message->header_as_RecordBatch(); + if (batch == nullptr) { + return Status::IOError( + "Header-type of flatbuffer-encoded Message is not RecordBatch."); + } + internal::IoRecordedRandomAccessFile io_recorded_random_access_file(required_size); + RETURN_NOT_OK(fields_loader(batch, &io_recorded_random_access_file)); + auto const& read_ranges = io_recorded_random_access_file.GetReadRanges(); + for (auto const& range : read_ranges) { + auto read_result = file->ReadAt(offset + metadata_length + range.offset, range.length, + body->mutable_data() + range.offset); + if (!read_result.ok()) { + return Status::IOError("Failed to read message body, error ", + read_result.status().ToString()); + } + } + return Status::OK(); +} + Result> ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file) { + io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader) { std::unique_ptr result; auto listener = std::make_shared(&result); MessageDecoder decoder(listener); @@ -308,8 +341,16 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le " invalid. File offset: ", offset, ", metadata length: ", metadata_length); case MessageDecoder::State::BODY: { - ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset + metadata_length, - decoder.next_required_size())); + std::shared_ptr body; + if (fields_loader) { + ARROW_ASSIGN_OR_RAISE( + body, AllocateBuffer(decoder.next_required_size(), default_memory_pool())); + RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader, + metadata, decoder.next_required_size(), body)); + } else { + ARROW_ASSIGN_OR_RAISE( + body, file->ReadAt(offset + metadata_length, decoder.next_required_size())); + } if (body->size() < decoder.next_required_size()) { return Status::IOError("Expected to be able to read ", decoder.next_required_size(), diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index b2683259cb4..9c0ed8ced2e 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include #include @@ -441,6 +442,10 @@ class ARROW_EXPORT MessageReader { virtual Result> ReadNextMessage() = 0; }; +// the first parameter of the function should be a pointer to metadata (aka. +// org::apache::arrow::flatbuf::RecordBatch*) +using FieldsLoaderFunction = std::function; + /// \brief Read encapsulated RPC message from position in file /// /// Read a length-prefixed message flatbuffer starting at the indicated file @@ -453,11 +458,13 @@ class ARROW_EXPORT MessageReader { /// first 4 bytes after the offset are the message length /// \param[in] metadata_length the total number of bytes to read from file /// \param[in] file the seekable file interface to read from +/// \param[in] fields_loader the function for loading subset of fields from the given file /// \return the message read + ARROW_EXPORT -Result> ReadMessage(const int64_t offset, - const int32_t metadata_length, - io::RandomAccessFile* file); +Result> ReadMessage( + const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader = {}); ARROW_EXPORT Future> ReadMessageAsync( diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 70edab1f6b7..4f2e51060d9 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -35,6 +35,7 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata_internal.h" #include "arrow/ipc/reader.h" +#include "arrow/ipc/reader_internal.h" #include "arrow/ipc/test_common.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" @@ -62,6 +63,7 @@ using internal::TemporaryDir; namespace ipc { using internal::FieldPosition; +using internal::IoRecordedRandomAccessFile; namespace test { @@ -1706,6 +1708,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); } TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); } +class TrackedRandomAccessFile : public io::RandomAccessFile { + public: + explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate) + : delegate_(delegate) {} + + Status Close() override { return delegate_->Close(); } + bool closed() const override { return delegate_->closed(); } + Result Tell() const override { return delegate_->Tell(); } + Status Seek(int64_t position) override { return delegate_->Seek(position); } + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); + SaveReadRange(position, nbytes); + return delegate_->Read(nbytes, out); + } + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); + SaveReadRange(position, nbytes); + return delegate_->Read(nbytes); + } + bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); } + Result GetSize() override { return delegate_->GetSize(); } + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAt(position, nbytes, out); + } + Result> ReadAt(int64_t position, int64_t nbytes) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAt(position, nbytes); + } + Future> ReadAsync(const io::IOContext& io_context, + int64_t position, int64_t nbytes) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAsync(io_context, position, nbytes); + } + + int64_t num_reads() const { return read_ranges_.size(); } + + const std::vector& get_read_ranges() const { return read_ranges_; } + + private: + io::RandomAccessFile* delegate_; + std::vector read_ranges_; + + void SaveReadRange(int64_t offset, int64_t length) { + read_ranges_.emplace_back(io::ReadRange{offset, length}); + } +}; + TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) { // ARROW-6006 auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8())); @@ -2410,6 +2460,181 @@ TEST(DictionaryMemo, AddDictionaryType) { AssertMemoDictionaryType(memo, 44, utf8()); } +TEST(IoRecordedRandomAccessFile, IoRecording) { + IoRecordedRandomAccessFile file(42); + ASSERT_TRUE(file.GetReadRanges().empty()); + + ASSERT_OK(file.ReadAt(1, 2)); + ASSERT_EQ(file.GetReadRanges().size(), 1); + ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{1, 2})); + + ASSERT_OK(file.ReadAt(5, 3)); + ASSERT_EQ(file.GetReadRanges().size(), 2); + ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 3})); + + // continuous IOs will be merged + ASSERT_OK(file.ReadAt(5 + 3, 6)); + ASSERT_EQ(file.GetReadRanges().size(), 2); + ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 3 + 6})); + + // this should not happen but reading out of bounds will do no harm + ASSERT_OK(file.ReadAt(43, 1)); +} + +TEST(IoRecordedRandomAccessFile, IoRecordingWithOutput) { + std::shared_ptr out; + IoRecordedRandomAccessFile file(42); + ASSERT_TRUE(file.GetReadRanges().empty()); + ASSERT_EQ(file.ReadAt(1, 2, &out), 2L); + ASSERT_EQ(file.GetReadRanges().size(), 1); + ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{1, 2})); + + ASSERT_EQ(file.ReadAt(5, 1, &out), 1); + ASSERT_EQ(file.GetReadRanges().size(), 2); + ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 1})); + + // continuous IOs will be merged + ASSERT_EQ(file.ReadAt(5 + 1, 6, &out), 6); + ASSERT_EQ(file.GetReadRanges().size(), 2); + ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 1 + 6})); +} + +TEST(IoRecordedRandomAccessFile, ReadWithCurrentPosition) { + IoRecordedRandomAccessFile file(42); + ASSERT_TRUE(file.GetReadRanges().empty()); + + ASSERT_OK(file.Read(10)); + ASSERT_EQ(file.GetReadRanges().size(), 1); + ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 10})); + + // the previous read should advance the position + ASSERT_OK(file.Read(10)); + ASSERT_EQ(file.GetReadRanges().size(), 1); + // the two reads are merged into single continuous IO + ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 20})); +} + +Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr* out) { + // Make the schema + auto f0 = field("f0", boolean()); + auto f1 = field("f1", int32()); + auto f2 = field("f2", int64()); + auto schema = ::arrow::schema({f0, f1, f2}); + + std::shared_ptr a0, a1, a2; + RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a0)); + RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1)); + RETURN_NOT_OK(MakeRandomInt64Array(length, false, arrow::default_memory_pool(), &a2)); + *out = RecordBatch::Make(schema, length, {a0, a1, a2}); + return Status::OK(); +} + +void GetReadRecordBatchReadRanges( + uint32_t num_rows, const std::vector& included_fields, + const std::vector& expected_body_read_lengths) { + std::shared_ptr batch; + // [bool, int32, int64] batch + ASSERT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch)); + + ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0)); + ASSERT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema())); + ASSERT_OK(writer->WriteRecordBatch(*batch)); + ASSERT_OK(writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + io::BufferReader buffer_reader(buffer); + TrackedRandomAccessFile tracked(&buffer_reader); + + auto read_options = IpcReadOptions::Defaults(); + // if empty, return all fields + read_options.included_fields = included_fields; + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(&tracked, read_options)); + ASSERT_OK_AND_ASSIGN(auto out_batch, reader->ReadRecordBatch(0)); + + ASSERT_EQ(out_batch->num_rows(), num_rows); + ASSERT_EQ(out_batch->num_columns(), + included_fields.empty() ? 3 : included_fields.size()); + + auto read_ranges = tracked.get_read_ranges(); + + // there are 3 read IOs before reading body: + // 1) read magic and footer length IO + // 2) read footer IO + // 3) read record batch metadata IO + ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); + const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes)); + // read magic and footer length IO + auto file_end_size = magic_size + sizeof(int32_t); + ASSERT_EQ(read_ranges[0].length, file_end_size); + // read footer IO + ASSERT_EQ(read_ranges[1].length, 256); + // read record batch metadata + ASSERT_EQ(read_ranges[2].length, 240); + for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { + ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); + } +} + +void GetReadRecordBatchReadRanges( + const std::vector& included_fields, + const std::vector& expected_body_read_lengths) { + return GetReadRecordBatchReadRanges(5, included_fields, expected_body_read_lengths); +} + +TEST(TestRecordBatchFileReaderIo, LoadAllFieldsShouldReadTheEntireBody) { + // read the entire record batch body in single read + // the batch has 5 * bool + 5 * int32 + 5 * int32 + // ==> + // + 5 bool: 5 bits (aligned to 8 bytes) + // + 5 int32: 5 * 4 bytes (aligned to 24 bytes) + // + 5 int64: 5 * 8 bytes (aligned to 40 bytes) + GetReadRecordBatchReadRanges({}, {8 + 24 + 40}); +} + +TEST(TestRecordBatchFileReaderIo, ReadSingleFieldAtTheStart) { + // read only the bool field + // + 5 bool: 5 bits (1 byte) + GetReadRecordBatchReadRanges({0}, {1}); +} + +TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheMiddle) { + // read only the int32 field + // + 5 int32: 5 * 4 bytes + GetReadRecordBatchReadRanges({1}, {20}); +} + +TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheEnd) { + // read only the int64 field + // + 5 int64: 5 * 8 bytes + GetReadRecordBatchReadRanges({2}, {40}); +} + +TEST(TestRecordBatchFileReaderIo, SkipTheFieldInTheMiddle) { + // read the bool field and the int64 field + // two IOs for body are expected, first for reading bool and the second for reading + // int64 + // + 5 bool: 5 bits (1 byte) + // + 5 int64: 5 * 8 bytes + GetReadRecordBatchReadRanges({0, 2}, {1, 40}); +} + +TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFields) { + // read the int32 field and the int64 field + // + 5 int32: 5 * 4 bytes + // + 5 int64: 5 * 8 bytes + GetReadRecordBatchReadRanges({1, 2}, {20, 40}); +} + +TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) { + // change the array length to 64 so that bool field and int32 are continuous without + // padding + // read the bool field and the int32 field since the bool field's aligned offset + // is continuous with next field (int32 field), two IOs are merged into one + // + 64 bool: 64 bits (8 bytes) + // + 64 int32: 64 * 4 bytes (256 bytes) + GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4}); +} + } // namespace test } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a98f844c749..e9d85ff6088 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -36,6 +36,7 @@ #include "arrow/io/memory.h" #include "arrow/ipc/message.h" #include "arrow/ipc/metadata_internal.h" +#include "arrow/ipc/reader_internal.h" #include "arrow/ipc/util.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" @@ -967,8 +968,9 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } -static Result> ReadMessageFromBlock(const FileBlock& block, - io::RandomAccessFile* file) { +static Result> ReadMessageFromBlock( + const FileBlock& block, io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader) { if (!BitUtil::IsMultipleOf8(block.offset) || !BitUtil::IsMultipleOf8(block.metadata_length) || !BitUtil::IsMultipleOf8(block.body_length)) { @@ -978,8 +980,8 @@ static Result> ReadMessageFromBlock(const FileBlock& bl // TODO(wesm): this breaks integration tests, see ARROW-3256 // DCHECK_EQ((*out)->body_length(), block.body_length); - ARROW_ASSIGN_OR_RAISE(auto message, - ReadMessage(block.offset, block.metadata_length, file)); + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, + file, fields_loader)); return std::move(message); } @@ -1061,6 +1063,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return internal::GetMetadataVersion(footer_->version()); } + static Status LoadFieldsSubset(const flatbuf::RecordBatch* metadata, + const IpcReadOptions& options, + io::RandomAccessFile* file, + const std::shared_ptr& schema, + const std::vector* inclusion_mask, + MetadataVersion metadata_version = MetadataVersion::V5) { + ArrayLoader loader(metadata, metadata_version, options, file); + for (int i = 0; i < schema->num_fields(); ++i) { + const Field& field = *schema->field(i); + if (!inclusion_mask || (*inclusion_mask)[i]) { + // Read field + ArrayData column; + RETURN_NOT_OK(loader.Load(&field, &column)); + if (metadata->length() != column.length) { + return Status::IOError("Array length did not match record batch length"); + } + } else { + // Skip field. This logic must be executed to advance the state of the + // loader to the next field + RETURN_NOT_OK(loader.SkipField(&field)); + } + } + return Status::OK(); + } + Result> ReadRecordBatch(int i) override { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); @@ -1070,7 +1097,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { read_dictionaries_ = true; } - ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetRecordBatchBlock(i))); + FieldsLoaderFunction fields_loader = {}; + if (!field_inclusion_mask_.empty()) { + auto& schema = schema_; + auto& inclusion_mask = field_inclusion_mask_; + auto& read_options = options_; + fields_loader = [schema, inclusion_mask, read_options](const void* metadata, + io::RandomAccessFile* file) { + return LoadFieldsSubset(static_cast(metadata), + read_options, file, schema, &inclusion_mask); + }; + } + ARROW_ASSIGN_OR_RAISE(auto message, + ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader)); CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); @@ -1193,8 +1232,10 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); } - Result> ReadMessageFromBlock(const FileBlock& block) { - ARROW_ASSIGN_OR_RAISE(auto message, arrow::ipc::ReadMessageFromBlock(block, file_)); + Result> ReadMessageFromBlock( + const FileBlock& block, const FieldsLoaderFunction& fields_loader = {}) { + ARROW_ASSIGN_OR_RAISE(auto message, + arrow::ipc::ReadMessageFromBlock(block, file_, fields_loader)); ++stats_.num_messages; return std::move(message); } @@ -1529,7 +1570,6 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { } Status OnRecordBatchMessageDecoded(std::unique_ptr message) { - IpcReadContext context(&dictionary_memo_, options_, swap_endian_); if (message->type() == MessageType::DICTIONARY_BATCH) { return ReadDictionary(*message); } else { @@ -2090,6 +2130,67 @@ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) { return Status::OK(); } +Result IoRecordedRandomAccessFile::GetSize() { return file_size_; } + +Result IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, + void* out) { + auto num_bytes_read = std::min(file_size_, position + nbytes) - position; + + if (!read_ranges_.empty() && + position == read_ranges_.back().offset + read_ranges_.back().length) { + // merge continuous IOs into one if possible + read_ranges_.back().length += num_bytes_read; + } else { + // no real IO is performed, it is only saved into a vector for replaying later + read_ranges_.emplace_back(io::ReadRange{position, num_bytes_read}); + } + return num_bytes_read; +} + +Result> IoRecordedRandomAccessFile::ReadAt(int64_t position, + int64_t nbytes) { + std::shared_ptr out; + auto result = ReadAt(position, nbytes, &out); + return out; +} + +Status IoRecordedRandomAccessFile::Close() { + closed_ = true; + return Status::OK(); +} + +Status IoRecordedRandomAccessFile::Abort() { return Status::OK(); } + +Result IoRecordedRandomAccessFile::Tell() const { return position_; } + +bool IoRecordedRandomAccessFile::closed() const { return closed_; } + +Status IoRecordedRandomAccessFile::Seek(int64_t position) { + position_ = position; + return Status::OK(); +} + +Result IoRecordedRandomAccessFile::Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position_, nbytes, out)); + position_ += bytes_read; + return bytes_read; +} + +Result> IoRecordedRandomAccessFile::Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, ReadAt(position_, nbytes)); + auto num_bytes_read = std::min(file_size_, position_ + nbytes) - position_; + position_ += num_bytes_read; + return std::move(buffer); +} + +const io::IOContext& IoRecordedRandomAccessFile::io_context() const { + return io_context_; +} + +const std::vector& IoRecordedRandomAccessFile::GetReadRanges() const { + return read_ranges_; +} + } // namespace internal } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader_internal.h b/cpp/src/arrow/ipc/reader_internal.h new file mode 100644 index 00000000000..a71d070bb0d --- /dev/null +++ b/cpp/src/arrow/ipc/reader_internal.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/io/type_fwd.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" + +namespace arrow { +namespace io { +struct ReadRange; +} + +namespace ipc { + +namespace internal { +/// \class IoRecordedRandomAccessFile +/// \brief An RandomAccessFile that doesn't perform real IO, but only save all the IO +/// operations it receives, including read operation's , for replaying +/// later +class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile { + public: + explicit IoRecordedRandomAccessFile(const int64_t file_size) + : file_size_(file_size), position_(0) {} + + Status Close() override; + + Status Abort() override; + + /// \brief Return the position in this stream + Result Tell() const override; + + /// \brief Return whether the stream is closed + bool closed() const override; + + Status Seek(int64_t position) override; + + Result GetSize() override; + + Result ReadAt(int64_t position, int64_t nbytes, void* out) override; + + Result> ReadAt(int64_t position, int64_t nbytes) override; + + Result Read(int64_t nbytes, void* out) override; + + Result> Read(int64_t nbytes) override; + + const io::IOContext& io_context() const override; + + /// \brief Return a vector containing all the read operations this file receives, each + /// read operation is represented as an arrow::io::ReadRange + /// + /// \return a vector + const std::vector& GetReadRanges() const; + + private: + const int64_t file_size_; + std::vector read_ranges_; + int64_t position_; + bool closed_ = false; + io::IOContext io_context_; +}; + +} // namespace internal +} // namespace ipc +} // namespace arrow diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index 5068eca001a..e31bb530c03 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -83,6 +83,16 @@ Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool return Status::OK(); } +Status MakeRandomInt64Array(int64_t length, bool include_nulls, MemoryPool* pool, + std::shared_ptr* out, uint32_t seed) { + random::RandomArrayGenerator rand(seed); + const double null_probability = include_nulls ? 0.5 : 0.0; + + *out = rand.Int64(length, 0, 1000, null_probability); + + return Status::OK(); +} + namespace { template diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index 48df28b2d5a..b4c7e31c925 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -44,6 +44,10 @@ ARROW_TESTING_EXPORT Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr* out, uint32_t seed = 0); +ARROW_TESTING_EXPORT +Status MakeRandomInt64Array(int64_t length, bool include_nulls, MemoryPool* pool, + std::shared_ptr* out, uint32_t seed = 0); + ARROW_TESTING_EXPORT Status MakeRandomListArray(const std::shared_ptr& child_array, int num_lists, bool include_nulls, MemoryPool* pool,