Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "arrow/io/interfaces.h"
#include "arrow/ipc/metadata_internal.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/reader_internal.h"
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/endian.h"
Expand Down Expand Up @@ -279,8 +281,39 @@ std::string FormatMessageType(MessageType type) {
return "unknown";
}

Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader,
const std::shared_ptr<Buffer>& metadata, int64_t required_size,
std::shared_ptr<Buffer>& body) {
const flatbuf::Message* message = nullptr;
uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
// skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length prefix)
RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + continuation_metadata_size,
metadata->size() - continuation_metadata_size,
&message));
auto batch = message->header_as_RecordBatch();
if (batch == nullptr) {
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}
internal::IoRecordedRandomAccessFile io_recorded_random_access_file(required_size);
RETURN_NOT_OK(fields_loader(batch, &io_recorded_random_access_file));
auto const& read_ranges = io_recorded_random_access_file.GetReadRanges();
for (auto const& range : read_ranges) {
auto read_result = file->ReadAt(offset + metadata_length + range.offset, range.length,
body->mutable_data() + range.offset);
if (!read_result.ok()) {
return Status::IOError("Failed to read message body, error ",
read_result.status().ToString());
}
}
return Status::OK();
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file) {
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
std::unique_ptr<Message> result;
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
MessageDecoder decoder(listener);
Expand Down Expand Up @@ -308,8 +341,16 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
" invalid. File offset: ", offset,
", metadata length: ", metadata_length);
case MessageDecoder::State::BODY: {
ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset + metadata_length,
decoder.next_required_size()));
std::shared_ptr<Buffer> body;
if (fields_loader) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on if included_fields are used in IpcReadOptions, here either fields_loader will be used to load each field's buffers or the entire body will be loaded.

ARROW_ASSIGN_OR_RAISE(
body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
metadata, decoder.next_required_size(), body));
} else {
ARROW_ASSIGN_OR_RAISE(
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
}
if (body->size() < decoder.next_required_size()) {
return Status::IOError("Expected to be able to read ",
decoder.next_required_size(),
Expand Down
13 changes: 10 additions & 3 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -441,6 +442,10 @@ class ARROW_EXPORT MessageReader {
virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0;
};

// the first parameter of the function should be a pointer to metadata (aka.
// org::apache::arrow::flatbuf::RecordBatch*)
using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessFile*)>;

/// \brief Read encapsulated RPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
Expand All @@ -453,11 +458,13 @@ class ARROW_EXPORT MessageReader {
/// first 4 bytes after the offset are the message length
/// \param[in] metadata_length the total number of bytes to read from file
/// \param[in] file the seekable file interface to read from
/// \param[in] fields_loader the function for loading subset of fields from the given file
/// \return the message read

ARROW_EXPORT
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
const int32_t metadata_length,
io::RandomAccessFile* file);
Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});

ARROW_EXPORT
Future<std::shared_ptr<Message>> ReadMessageAsync(
Expand Down
225 changes: 225 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/ipc/message.h"
#include "arrow/ipc/metadata_internal.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/reader_internal.h"
#include "arrow/ipc/test_common.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
Expand Down Expand Up @@ -62,6 +63,7 @@ using internal::TemporaryDir;
namespace ipc {

using internal::FieldPosition;
using internal::IoRecordedRandomAccessFile;

namespace test {

Expand Down Expand Up @@ -1706,6 +1708,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }

TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }

class TrackedRandomAccessFile : public io::RandomAccessFile {
public:
explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
: delegate_(delegate) {}

Status Close() override { return delegate_->Close(); }
bool closed() const override { return delegate_->closed(); }
Result<int64_t> Tell() const override { return delegate_->Tell(); }
Status Seek(int64_t position) override { return delegate_->Seek(position); }
Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes);
}
bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); }
Result<int64_t> GetSize() override { return delegate_->GetSize(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes);
}
Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAsync(io_context, position, nbytes);
}

int64_t num_reads() const { return read_ranges_.size(); }

const std::vector<io::ReadRange>& get_read_ranges() const { return read_ranges_; }

private:
io::RandomAccessFile* delegate_;
std::vector<io::ReadRange> read_ranges_;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lidavidm I copy the TrackedRandomAccessFile into this PR, and tracking the read ranges using a vector, since I think the num_reads is just the length of this vector, I remove the read_ member variable in https://github.com/apache/arrow/pull/11535/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bL1708


void SaveReadRange(int64_t offset, int64_t length) {
read_ranges_.emplace_back(io::ReadRange{offset, length});
}
};

TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) {
// ARROW-6006
auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8()));
Expand Down Expand Up @@ -2410,6 +2460,181 @@ TEST(DictionaryMemo, AddDictionaryType) {
AssertMemoDictionaryType(memo, 44, utf8());
}

TEST(IoRecordedRandomAccessFile, IoRecording) {
IoRecordedRandomAccessFile file(42);
ASSERT_TRUE(file.GetReadRanges().empty());

ASSERT_OK(file.ReadAt(1, 2));
ASSERT_EQ(file.GetReadRanges().size(), 1);
ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{1, 2}));

ASSERT_OK(file.ReadAt(5, 3));
ASSERT_EQ(file.GetReadRanges().size(), 2);
ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 3}));

// continuous IOs will be merged
ASSERT_OK(file.ReadAt(5 + 3, 6));
ASSERT_EQ(file.GetReadRanges().size(), 2);
ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 3 + 6}));

// this should not happen but reading out of bounds will do no harm
ASSERT_OK(file.ReadAt(43, 1));
}

TEST(IoRecordedRandomAccessFile, IoRecordingWithOutput) {
std::shared_ptr<Buffer> out;
IoRecordedRandomAccessFile file(42);
ASSERT_TRUE(file.GetReadRanges().empty());
ASSERT_EQ(file.ReadAt(1, 2, &out), 2L);
ASSERT_EQ(file.GetReadRanges().size(), 1);
ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{1, 2}));

ASSERT_EQ(file.ReadAt(5, 1, &out), 1);
ASSERT_EQ(file.GetReadRanges().size(), 2);
ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 1}));

// continuous IOs will be merged
ASSERT_EQ(file.ReadAt(5 + 1, 6, &out), 6);
ASSERT_EQ(file.GetReadRanges().size(), 2);
ASSERT_EQ(file.GetReadRanges()[1], (io::ReadRange{5, 1 + 6}));
}

TEST(IoRecordedRandomAccessFile, ReadWithCurrentPosition) {
IoRecordedRandomAccessFile file(42);
ASSERT_TRUE(file.GetReadRanges().empty());

ASSERT_OK(file.Read(10));
ASSERT_EQ(file.GetReadRanges().size(), 1);
ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 10}));

// the previous read should advance the position
ASSERT_OK(file.Read(10));
ASSERT_EQ(file.GetReadRanges().size(), 1);
// the two reads are merged into single continuous IO
ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 20}));
}

Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = field("f0", boolean());
auto f1 = field("f1", int32());
auto f2 = field("f2", int64());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A record batch with 3 fields (bool/int32/int64) are used for IO testing below.

auto schema = ::arrow::schema({f0, f1, f2});

std::shared_ptr<Array> a0, a1, a2;
RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a0));
RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1));
RETURN_NOT_OK(MakeRandomInt64Array(length, false, arrow::default_memory_pool(), &a2));
*out = RecordBatch::Make(schema, length, {a0, a1, a2});
return Status::OK();
}

void GetReadRecordBatchReadRanges(
uint32_t num_rows, const std::vector<int>& included_fields,
const std::vector<int64_t>& expected_body_read_lengths) {
std::shared_ptr<RecordBatch> batch;
// [bool, int32, int64] batch
ASSERT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch));

ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0));
ASSERT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema()));
ASSERT_OK(writer->WriteRecordBatch(*batch));
ASSERT_OK(writer->Close());
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

io::BufferReader buffer_reader(buffer);
TrackedRandomAccessFile tracked(&buffer_reader);

auto read_options = IpcReadOptions::Defaults();
// if empty, return all fields
read_options.included_fields = included_fields;
ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(&tracked, read_options));
ASSERT_OK_AND_ASSIGN(auto out_batch, reader->ReadRecordBatch(0));

ASSERT_EQ(out_batch->num_rows(), num_rows);
ASSERT_EQ(out_batch->num_columns(),
included_fields.empty() ? 3 : included_fields.size());

auto read_ranges = tracked.get_read_ranges();

// there are 3 read IOs before reading body:
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
ASSERT_EQ(read_ranges[0].length, file_end_size);
// read footer IO
ASSERT_EQ(read_ranges[1].length, 256);
// read record batch metadata
ASSERT_EQ(read_ranges[2].length, 240);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
}
}

void GetReadRecordBatchReadRanges(
const std::vector<int>& included_fields,
const std::vector<int64_t>& expected_body_read_lengths) {
return GetReadRecordBatchReadRanges(5, included_fields, expected_body_read_lengths);
}

TEST(TestRecordBatchFileReaderIo, LoadAllFieldsShouldReadTheEntireBody) {
// read the entire record batch body in single read
// the batch has 5 * bool + 5 * int32 + 5 * int32
// ==>
// + 5 bool: 5 bits (aligned to 8 bytes)
// + 5 int32: 5 * 4 bytes (aligned to 24 bytes)
// + 5 int64: 5 * 8 bytes (aligned to 40 bytes)
GetReadRecordBatchReadRanges({}, {8 + 24 + 40});
}

TEST(TestRecordBatchFileReaderIo, ReadSingleFieldAtTheStart) {
// read only the bool field
// + 5 bool: 5 bits (1 byte)
GetReadRecordBatchReadRanges({0}, {1});
}

TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheMiddle) {
// read only the int32 field
// + 5 int32: 5 * 4 bytes
GetReadRecordBatchReadRanges({1}, {20});
}

TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheEnd) {
// read only the int64 field
// + 5 int64: 5 * 8 bytes
GetReadRecordBatchReadRanges({2}, {40});
}

TEST(TestRecordBatchFileReaderIo, SkipTheFieldInTheMiddle) {
// read the bool field and the int64 field
// two IOs for body are expected, first for reading bool and the second for reading
// int64
// + 5 bool: 5 bits (1 byte)
// + 5 int64: 5 * 8 bytes
GetReadRecordBatchReadRanges({0, 2}, {1, 40});
}

TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFields) {
// read the int32 field and the int64 field
// + 5 int32: 5 * 4 bytes
// + 5 int64: 5 * 8 bytes
GetReadRecordBatchReadRanges({1, 2}, {20, 40});
}

TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) {
// change the array length to 64 so that bool field and int32 are continuous without
// padding
// read the bool field and the int32 field since the bool field's aligned offset
// is continuous with next field (int32 field), two IOs are merged into one
// + 64 bool: 64 bits (8 bytes)
// + 64 int32: 64 * 4 bytes (256 bytes)
GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several RecordBatchFileReader IO related tests are added above


} // namespace test
} // namespace ipc
} // namespace arrow
Loading