Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
std::unique_ptr<Message> result;
Expand All @@ -323,7 +324,14 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
decoder.next_required_size());
}

ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
// Don't read ahead if a fine-grained read was requested.
// TODO(ARROW-14577): this is suboptimal for column selection on
// high-latency filesystems.
int64_t to_read = metadata_length;
if (!fields_loader && body_length > 0) {
to_read += body_length;
}
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, to_read));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: metadata is a slightly inaccurate name now.

if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
" metadata bytes but got ", metadata->size());
Expand All @@ -347,6 +355,8 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
metadata, decoder.next_required_size(), body));
} else if (body_length >= 0) {
body = SliceBuffer(metadata, metadata_length, body_length);
} else {
ARROW_ASSIGN_OR_RAISE(
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
Expand All @@ -366,6 +376,11 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
}
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
                                             io::RandomAccessFile* file) {
  // Backward-compatible overload: delegate to the extended ReadMessage with a
  // sentinel body length of -1, meaning "body size unknown" (the body is then
  // fetched with a separate read instead of being coalesced with the metadata).
  constexpr int64_t kUnknownBodyLength = -1;
  return ReadMessage(offset, metadata_length, kUnknownBodyLength, file);
}

Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,24 @@ using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessF
///
/// \param[in] offset the position in the file where the message starts. The
/// first 4 bytes after the offset are the message length
/// \param[in] metadata_length the total number of bytes to read from file
/// \param[in] metadata_length the size of the message header
/// \param[in] body_length the size of the message body, if known, or
/// -1 otherwise. When provided, the entire message will be read in
/// one I/O operation, potentially reducing I/O costs on high-latency
/// filesystems.
/// \param[in] file the seekable file interface to read from
/// \param[in] fields_loader the function for loading subset of fields from the given file
/// \return the message read

ARROW_EXPORT
Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});
const int64_t offset, const int32_t metadata_length, const int64_t body_length,
io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {});

/// \brief Read encapsulated RPC message from position in file
ARROW_EXPORT
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
const int32_t metadata_length,
io::RandomAccessFile* file);

ARROW_EXPORT
Future<std::shared_ptr<Message>> ReadMessageAsync(
Expand Down
26 changes: 20 additions & 6 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2561,21 +2561,35 @@ void GetReadRecordBatchReadRanges(
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
// ARROW-14429: 1+2 are merged together for small footers (which is
// the case in all tests here)
if (included_fields.empty()) {
// ARROW-14429: The I/O for the metadata is merged with the body itself
ASSERT_EQ(1, expected_body_read_lengths.size());
ASSERT_EQ(read_ranges.size(), 2);
} else {
ASSERT_EQ(read_ranges.size(), 2 + expected_body_read_lengths.size());
}
const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
auto footer_length_offset = buffer->size() - file_end_size;
auto footer_length = BitUtil::FromLittleEndian(
util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));
ASSERT_EQ(read_ranges[0].length, file_end_size);
// read footer IO
ASSERT_EQ(read_ranges[1].length, footer_length);
// ARROW-14429: the reader eagerly reads a fixed chunk of the end of
// the file to try to avoid a separate call to read the footer, so it may read too much
ASSERT_GE(read_ranges[0].length, file_end_size + footer_length);
// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
ASSERT_LT(read_ranges[2].length, footer_length);
if (included_fields.empty()) {
// ARROW-14429: The I/O for the metadata is merged with the body itself
ASSERT_LT(read_ranges[1].length, footer_length + expected_body_read_lengths.front());
return;
}
// The metadata is read separately
ASSERT_LT(read_ranges[1].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
ASSERT_EQ(read_ranges[2 + i].length, expected_body_read_lengths[i]);
}
}

Expand Down
43 changes: 32 additions & 11 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,9 @@ static Result<std::unique_ptr<Message>> ReadMessageFromBlock(
// TODO(wesm): this breaks integration tests, see ARROW-3256
// DCHECK_EQ((*out)->body_length(), block.body_length);

ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
ARROW_ASSIGN_OR_RAISE(
auto message, ReadMessage(block.offset, block.metadata_length, block.body_length,
file, fields_loader));
return std::move(message);
}

Expand Down Expand Up @@ -1257,15 +1258,23 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}

Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
// When reading the footer, read up to this much additional data in
// an attempt to avoid a second I/O operation (which can be slow
// on a high-latency filesystem like S3)
constexpr static int kFooterReadaheadSize = 512 * 1024;

const int32_t magic_size = static_cast<int>(strlen(kArrowMagicBytes));

if (footer_offset_ <= magic_size * 2 + 4) {
return Status::Invalid("File is too small: ", footer_offset_);
}

int file_end_size = static_cast<int>(magic_size + sizeof(int32_t));
int readahead = std::min<int>(kFooterReadaheadSize,
static_cast<int>(footer_offset_ - file_end_size));
auto self = std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size);
auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size - readahead,
file_end_size + readahead);
if (executor) read_magic = executor->Transfer(std::move(read_magic));
return read_magic
.Then([=](const std::shared_ptr<Buffer>& buffer)
Expand All @@ -1276,23 +1285,35 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
"from end of file");
}

if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
const uint8_t* magic_start = buffer->data() + readahead;
if (memcmp(magic_start + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
return Status::Invalid("Not an Arrow file");
}

int32_t footer_length = BitUtil::FromLittleEndian(
*reinterpret_cast<const int32_t*>(buffer->data()));

int32_t footer_length =
BitUtil::FromLittleEndian(util::SafeLoadAs<int32_t>(magic_start));
if (footer_length <= 0 ||
footer_length > self->footer_offset_ - magic_size * 2 - 4) {
return Status::Invalid("File is smaller than indicated metadata size");
}

// Now read the footer
auto read_footer = self->file_->ReadAsync(
self->footer_offset_ - footer_length - file_end_size, footer_length);
if (executor) read_footer = executor->Transfer(std::move(read_footer));
return read_footer;
if (footer_length <= readahead) {
return SliceBuffer(buffer, buffer->size() - file_end_size - footer_length,
footer_length);
}

Comment on lines +1301 to +1305
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this condition is false would it be faster to read just the remaining portion instead of rereading a part of the file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this to just read the missing part of the footer.

const int64_t already_read = buffer->size() - file_end_size;
auto read_remainder =
self->file_->ReadAsync(self->footer_offset_ - footer_length - file_end_size,
footer_length - already_read);
auto* memory_pool = options_.memory_pool;
if (executor) read_remainder = executor->Transfer(std::move(read_remainder));
return read_remainder.Then([memory_pool, buffer, already_read](
const std::shared_ptr<Buffer>& remainder) {
return ConcatenateBuffers({remainder, SliceBuffer(buffer, 0, already_read)},
memory_pool);
});
})
.Then([=](const std::shared_ptr<Buffer>& buffer) -> Status {
self->footer_buffer_ = buffer;
Expand Down