diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index ab20897e861..496c8f92ec2 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -312,6 +312,7 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
 }
 
 Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
+                                             int64_t body_length,
                                              io::RandomAccessFile* file,
                                              const FieldsLoaderFunction& fields_loader) {
   std::unique_ptr<Message> result;
@@ -323,7 +324,14 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
                            decoder.next_required_size());
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
+  // Don't read ahead if a fine-grained read was requested.
+  // TODO(ARROW-14577): this is suboptimal for column selection on
+  // high-latency filesystems.
+  int64_t to_read = metadata_length;
+  if (!fields_loader && body_length > 0) {
+    to_read += body_length;
+  }
+  ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, to_read));
   if (metadata->size() < metadata_length) {
     return Status::Invalid("Expected to read ", metadata_length,
                            " metadata bytes but got ", metadata->size());
@@ -347,6 +355,8 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
         body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
     RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
                                    metadata, decoder.next_required_size(), body));
+  } else if (body_length >= 0) {
+    body = SliceBuffer(metadata, metadata_length, body_length);
   } else {
     ARROW_ASSIGN_OR_RAISE(
         body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
@@ -366,6 +376,11 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
   }
 }
 
+Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
+                                             io::RandomAccessFile* file) {
+  return ReadMessage(offset, metadata_length, -1, file);
+}
+
 Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
                                                   int64_t body_length,
                                                   io::RandomAccessFile* file,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c0ed8ced2e..5c735590307 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -456,15 +456,24 @@ using FieldsLoaderFunction = std::function<
 ARROW_EXPORT
 Result<std::unique_ptr<Message>> ReadMessage(
-    const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
-    const FieldsLoaderFunction& fields_loader = {});
+    const int64_t offset, const int32_t metadata_length, const int64_t body_length,
+    io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {});
+
+/// \brief Read encapsulated RPC message from position in file
+ARROW_EXPORT
+Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
+                                             const int32_t metadata_length,
+                                             io::RandomAccessFile* file);
 
 ARROW_EXPORT
 Future<std::shared_ptr<Message>> ReadMessageAsync(
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 7bff394bcc7..be5f09d3530 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -2561,21 +2561,35 @@ void GetReadRecordBatchReadRanges(
   // 1) read magic and footer length IO
   // 2) read footer IO
   // 3) read record batch metadata IO
-  ASSERT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+  // ARROW-14429: 1+2 are merged together for small footers (which is
+  // the case in all tests here)
+  if (included_fields.empty()) {
+    // ARROW-14429: The I/O for the metadata is merged with the body itself
+    ASSERT_EQ(1, expected_body_read_lengths.size());
+    ASSERT_EQ(read_ranges.size(), 2);
+  } else {
+    ASSERT_EQ(read_ranges.size(), 2 + expected_body_read_lengths.size());
+  }
   const int32_t magic_size =
       static_cast<int32_t>(strlen(ipc::internal::kArrowMagicBytes));
   // read magic and footer length IO
   auto file_end_size = magic_size + sizeof(int32_t);
   auto footer_length_offset = buffer->size() - file_end_size;
   auto footer_length = BitUtil::FromLittleEndian(
       util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));
-  ASSERT_EQ(read_ranges[0].length, file_end_size);
-  // read footer IO
-  ASSERT_EQ(read_ranges[1].length, footer_length);
+  // ARROW-14429: the reader eagerly reads a fixed chunk of the end of
+  // the file to try to avoid a separate call to read the footer, so it may read too much
+  ASSERT_GE(read_ranges[0].length, file_end_size + footer_length);
   // read record batch metadata. The exact size is tricky to determine but it doesn't
   // matter for this test and it should be smaller than the footer.
-  ASSERT_LT(read_ranges[2].length, footer_length);
+  if (included_fields.empty()) {
+    // ARROW-14429: The I/O for the metadata is merged with the body itself
+    ASSERT_LT(read_ranges[1].length,
+              footer_length + expected_body_read_lengths.front());
+    return;
+  }
+  // The metadata is read separately
+  ASSERT_LT(read_ranges[1].length, footer_length);
   for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
-    ASSERT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+    ASSERT_EQ(read_ranges[2 + i].length, expected_body_read_lengths[i]);
   }
 }
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index e9d85ff6088..4741d91485d 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -980,8 +980,9 @@ static Result<std::unique_ptr<Message>> ReadMessageFromBlock(
   // TODO(wesm): this breaks integration tests, see ARROW-3256
   // DCHECK_EQ((*out)->body_length(), block.body_length);
 
-  ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
-                                                  file, fields_loader));
+  ARROW_ASSIGN_OR_RAISE(
+      auto message, ReadMessage(block.offset, block.metadata_length, block.body_length,
+                                file, fields_loader));
   return std::move(message);
 }
 
@@ -1257,6 +1258,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
   }
 
   Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
+    // When reading the footer, read up to this much additional data in
+    // an attempt to avoid a second I/O operation (which can be slow
+    // on a high-latency filesystem like S3)
+    constexpr static int kFooterReadaheadSize = 512 * 1024;
+
     const int32_t magic_size = static_cast<int32_t>(strlen(kArrowMagicBytes));
 
     if (footer_offset_ <= magic_size * 2 + 4) {
@@ -1264,8 +1270,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     }
 
     int file_end_size = static_cast<int>(magic_size + sizeof(int32_t));
+    int readahead = std::min(kFooterReadaheadSize,
+                             static_cast<int>(footer_offset_ - file_end_size));
     auto self = std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
-    auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size);
+    auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size - readahead,
+                                       file_end_size + readahead);
     if (executor) read_magic = executor->Transfer(std::move(read_magic));
     return read_magic
         .Then([=](const std::shared_ptr<Buffer>& buffer)
@@ -1276,23 +1285,35 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
                       "from end of file");
           }
 
-          if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+          const uint8_t* magic_start = buffer->data() + readahead;
+          if (memcmp(magic_start + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
            return Status::Invalid("Not an Arrow file");
           }
 
-          int32_t footer_length = BitUtil::FromLittleEndian(
-              *reinterpret_cast<const int32_t*>(buffer->data()));
-
+          int32_t footer_length =
+              BitUtil::FromLittleEndian(util::SafeLoadAs<int32_t>(magic_start));
           if (footer_length <= 0 ||
               footer_length > self->footer_offset_ - magic_size * 2 - 4) {
            return Status::Invalid("File is smaller than indicated metadata size");
           }
 
           // Now read the footer
-          auto read_footer = self->file_->ReadAsync(
-              self->footer_offset_ - footer_length - file_end_size, footer_length);
-          if (executor) read_footer = executor->Transfer(std::move(read_footer));
-          return read_footer;
+          if (footer_length <= readahead) {
+            return SliceBuffer(buffer, buffer->size() - file_end_size - footer_length,
+                               footer_length);
+          }
+
+          const int64_t already_read = buffer->size() - file_end_size;
+          auto read_remainder =
+              self->file_->ReadAsync(self->footer_offset_ - footer_length - file_end_size,
+                                     footer_length - already_read);
+          auto* memory_pool = options_.memory_pool;
+          if (executor) read_remainder = executor->Transfer(std::move(read_remainder));
+          return read_remainder.Then([memory_pool, buffer, already_read](
+                                         const std::shared_ptr<Buffer>& remainder) {
+            return ConcatenateBuffers({remainder, SliceBuffer(buffer, 0, already_read)},
+                                      memory_pool);
+          });
         })
         .Then([=](const std::shared_ptr<Buffer>& buffer) -> Status {
           self->footer_buffer_ = buffer;