From 86e988386a48103ceb46cf58e2aee934f238fa2b Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 25 Oct 2021 09:26:07 -0400
Subject: [PATCH 1/6] ARROW-14429: [C++] Read entire IPC message at once
---
cpp/src/arrow/ipc/message.cc | 17 ++++++++++++++++-
cpp/src/arrow/ipc/message.h | 17 +++++++++++++----
cpp/src/arrow/ipc/read_write_test.cc | 14 +++++++++++++-
cpp/src/arrow/ipc/reader.cc | 5 +++--
4 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index ab20897e861..496c8f92ec2 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -312,6 +312,7 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
}
Result> ReadMessage(int64_t offset, int32_t metadata_length,
+ int64_t body_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
std::unique_ptr result;
@@ -323,7 +324,14 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le
decoder.next_required_size());
}
- ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
+ // Don't read ahead if a fine-grained read was requested.
+ // TODO(ARROW-14577): this is suboptimal for column selection on
+ // high-latency filesystems.
+ int64_t to_read = metadata_length;
+ if (!fields_loader && body_length > 0) {
+ to_read += body_length;
+ }
+ ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, to_read));
if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
" metadata bytes but got ", metadata->size());
@@ -347,6 +355,8 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le
body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
metadata, decoder.next_required_size(), body));
+ } else if (body_length >= 0) {
+ body = SliceBuffer(metadata, metadata_length, body_length);
} else {
ARROW_ASSIGN_OR_RAISE(
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
@@ -366,6 +376,11 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le
}
}
+Result> ReadMessage(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file) {
+ return ReadMessage(offset, metadata_length, -1, file);
+}
+
Future> ReadMessageAsync(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c0ed8ced2e..5c735590307 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -456,15 +456,24 @@ using FieldsLoaderFunction = std::function> ReadMessage(
- const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
- const FieldsLoaderFunction& fields_loader = {});
+ const int64_t offset, const int32_t metadata_length, const int64_t body_length,
+ io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {});
+
+/// \brief Read encapsulated RPC message from position in file
+ARROW_EXPORT
+Result> ReadMessage(const int64_t offset,
+ const int32_t metadata_length,
+ io::RandomAccessFile* file);
ARROW_EXPORT
Future> ReadMessageAsync(
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 7bff394bcc7..9c55d4ba7cb 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -2561,7 +2561,13 @@ void GetReadRecordBatchReadRanges(
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
- ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+ if (included_fields.empty()) {
+ // ARROW-14429: The I/O for the metadata is merged with the body itself
+ ASSERT_EQ(1, expected_body_read_lengths.size());
+ ASSERT_EQ(read_ranges.size(), 3);
+ } else {
+ ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+ }
const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
@@ -2573,6 +2579,12 @@ void GetReadRecordBatchReadRanges(
ASSERT_EQ(read_ranges[1].length, footer_length);
// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
+ if (included_fields.empty()) {
+ // ARROW-14429: The I/O for the metadata is merged with the body itself
+ ASSERT_LT(read_ranges[2].length, footer_length + expected_body_read_lengths.front());
+ return;
+ }
+ // The metadata is read separately
ASSERT_LT(read_ranges[2].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index e9d85ff6088..b8574f4575a 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -980,8 +980,9 @@ static Result> ReadMessageFromBlock(
// TODO(wesm): this breaks integration tests, see ARROW-3256
// DCHECK_EQ((*out)->body_length(), block.body_length);
- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
- file, fields_loader));
+ ARROW_ASSIGN_OR_RAISE(
+ auto message, ReadMessage(block.offset, block.metadata_length, block.body_length,
+ file, fields_loader));
return std::move(message);
}
From 99ac26f4a4b8c8b0760a70ad3a345f5da7ebcb8c Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 25 Oct 2021 09:54:40 -0400
Subject: [PATCH 2/6] ARROW-14429: [C++] Try to avoid two calls to read footer
---
cpp/src/arrow/ipc/reader.cc | 24 +++++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index b8574f4575a..8dedb7e8ede 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1258,6 +1258,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
+ // When reading the footer, read this much additional data in an
+ // attempt to avoid a second I/O operation (which can be slow on a
+ // high-latency filesystem like S3)
+ constexpr static int kFooterReadaheadSize = 512 * 1024;
+
const int32_t magic_size = static_cast(strlen(kArrowMagicBytes));
if (footer_offset_ <= magic_size * 2 + 4) {
@@ -1265,8 +1270,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
int file_end_size = static_cast(magic_size + sizeof(int32_t));
+ int readahead = std::min(kFooterReadaheadSize,
+ static_cast(footer_offset_ - file_end_size));
+ file_end_size = file_end_size;
auto self = std::dynamic_pointer_cast(shared_from_this());
- auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size);
+ auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size - readahead,
+ file_end_size + readahead);
if (executor) read_magic = executor->Transfer(std::move(read_magic));
return read_magic
.Then([=](const std::shared_ptr& buffer)
@@ -1277,19 +1286,24 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
"from end of file");
}
- if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+ const uint8_t* magic_start = buffer->data() + readahead;
+ if (memcmp(magic_start + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
return Status::Invalid("Not an Arrow file");
}
- int32_t footer_length = BitUtil::FromLittleEndian(
- *reinterpret_cast(buffer->data()));
-
+ int32_t footer_length =
+ BitUtil::FromLittleEndian(util::SafeLoadAs(magic_start));
if (footer_length <= 0 ||
footer_length > self->footer_offset_ - magic_size * 2 - 4) {
return Status::Invalid("File is smaller than indicated metadata size");
}
// Now read the footer
+ if (footer_length <= readahead) {
+ return SliceBuffer(buffer, buffer->size() - file_end_size - footer_length,
+ footer_length);
+ }
+
auto read_footer = self->file_->ReadAsync(
self->footer_offset_ - footer_length - file_end_size, footer_length);
if (executor) read_footer = executor->Transfer(std::move(read_footer));
From fd1902cee0347ddaae9ed687588a283a70ba28c6 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 25 Oct 2021 12:02:45 -0400
Subject: [PATCH 3/6] ARROW-14429: [C++] Remove extraneous line
---
cpp/src/arrow/ipc/reader.cc | 1 -
1 file changed, 1 deletion(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8dedb7e8ede..fd833f5ff94 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1272,7 +1272,6 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
int file_end_size = static_cast(magic_size + sizeof(int32_t));
int readahead = std::min(kFooterReadaheadSize,
static_cast(footer_offset_ - file_end_size));
- file_end_size = file_end_size;
auto self = std::dynamic_pointer_cast(shared_from_this());
auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size - readahead,
file_end_size + readahead);
From c3758b11141a25be4d1e526a396d1d48a094a252 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 29 Oct 2021 12:03:46 -0400
Subject: [PATCH 4/6] ARROW-14429: [C++] Add regression test
---
cpp/src/arrow/ipc/read_write_test.cc | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9c55d4ba7cb..be5f09d3530 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -2561,12 +2561,14 @@ void GetReadRecordBatchReadRanges(
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
+ // ARROW-14429: 1+2 are merged together for small footers (which is
+ // the case in all tests here)
if (included_fields.empty()) {
// ARROW-14429: The I/O for the metadata is merged with the body itself
ASSERT_EQ(1, expected_body_read_lengths.size());
- ASSERT_EQ(read_ranges.size(), 3);
+ ASSERT_EQ(read_ranges.size(), 2);
} else {
- ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+ ASSERT_EQ(read_ranges.size(), 2 + expected_body_read_lengths.size());
}
const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
@@ -2574,20 +2576,20 @@ void GetReadRecordBatchReadRanges(
auto footer_length_offset = buffer->size() - file_end_size;
auto footer_length = BitUtil::FromLittleEndian(
util::SafeLoadAs(buffer->data() + footer_length_offset));
- ASSERT_EQ(read_ranges[0].length, file_end_size);
- // read footer IO
- ASSERT_EQ(read_ranges[1].length, footer_length);
+ // ARROW-14429: the reader eagerly reads a fixed chunk of the end of
+ // the file to try to avoid a separate call to read the footer, so it may read too much
+ ASSERT_GE(read_ranges[0].length, file_end_size + footer_length);
// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
if (included_fields.empty()) {
// ARROW-14429: The I/O for the metadata is merged with the body itself
- ASSERT_LT(read_ranges[2].length, footer_length + expected_body_read_lengths.front());
+ ASSERT_LT(read_ranges[1].length, footer_length + expected_body_read_lengths.front());
return;
}
// The metadata is read separately
- ASSERT_LT(read_ranges[2].length, footer_length);
+ ASSERT_LT(read_ranges[1].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
- ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+ ASSERT_EQ(read_ranges[2 + i].length, expected_body_read_lengths[i]);
}
}
From 860a14d99670ed0068d55c539676e124706a860e Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 29 Oct 2021 21:07:53 -0400
Subject: [PATCH 5/6] Update cpp/src/arrow/ipc/reader.cc
Co-authored-by: Weston Pace
---
cpp/src/arrow/ipc/reader.cc | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index fd833f5ff94..6d5bb4d4757 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1258,9 +1258,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
- // When reading the footer, read this much additional data in an
- // attempt to avoid a second I/O operation (which can be slow on a
- // high-latency filesystem like S3)
+ // When reading the footer, read up to this much additional data in
+ // an attempt to avoid a second I/O operation (which can be slow
+ // on a high-latency filesystem like S3)
constexpr static int kFooterReadaheadSize = 512 * 1024;
const int32_t magic_size = static_cast(strlen(kArrowMagicBytes));
From 3c2ae3d89f5bc05119a4e41990cc456028230274 Mon Sep 17 00:00:00 2001
From: David Li
Date: Sat, 30 Oct 2021 14:32:10 -0400
Subject: [PATCH 6/6] ARROW-14429: [C++] Address feedback
---
cpp/src/arrow/ipc/reader.cc | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 6d5bb4d4757..4741d91485d 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1303,10 +1303,17 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
footer_length);
}
- auto read_footer = self->file_->ReadAsync(
- self->footer_offset_ - footer_length - file_end_size, footer_length);
- if (executor) read_footer = executor->Transfer(std::move(read_footer));
- return read_footer;
+ const int64_t already_read = buffer->size() - file_end_size;
+ auto read_remainder =
+ self->file_->ReadAsync(self->footer_offset_ - footer_length - file_end_size,
+ footer_length - already_read);
+ auto* memory_pool = options_.memory_pool;
+ if (executor) read_remainder = executor->Transfer(std::move(read_remainder));
+ return read_remainder.Then([memory_pool, buffer, already_read](
+ const std::shared_ptr& remainder) {
+ return ConcatenateBuffers({remainder, SliceBuffer(buffer, 0, already_read)},
+ memory_pool);
+ });
})
.Then([=](const std::shared_ptr& buffer) -> Status {
self->footer_buffer_ = buffer;