From baf029e66771dfb66a6f6002f6d77b44f7a78ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Wed, 23 Nov 2022 11:20:45 -0500 Subject: [PATCH 1/3] add RandomAccessFile::ReadManyAsync --- cpp/src/arrow/io/file_test.cc | 20 ++++++++++++++++++++ cpp/src/arrow/io/interfaces.cc | 14 ++++++++++++++ cpp/src/arrow/io/interfaces.h | 21 +++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index b5c8797b0b0..c94f638ff75 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -387,6 +387,26 @@ TEST_F(TestReadableFile, ReadAsync) { AssertBufferEqual(*buf2, "test"); } +TEST_F(TestReadableFile, ReadManyAsync) { + MakeTestFile(); + OpenFile(); + + std::vector ranges = { + {1, 3}, + {2, 5}, + {4, 2} + }; + + auto futs = file_->ReadManyAsync(std::move(ranges)); + ASSERT_EQ(futs.size(), 3); + ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result()); + ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result()); + ASSERT_OK_AND_ASSIGN(auto buf3, futs[2].result()); + AssertBufferEqual(*buf1, "est"); + AssertBufferEqual(*buf2, "stdat"); + AssertBufferEqual(*buf3, "da"); +} + TEST_F(TestReadableFile, SeekingRequired) { MakeTestFile(); OpenFile(); diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index a78bc1b55c2..b4e0d8fe133 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -175,6 +175,20 @@ Future> RandomAccessFile::ReadAsync(int64_t position, return ReadAsync(io_context(), position, nbytes); } +std::vector>> RandomAccessFile::ReadManyAsync( + const IOContext&, const std::vector& ranges) { + std::vector>> ret; + for (auto r : ranges) { + ret.push_back(this->ReadAsync(r.offset, r.length)); + } + return ret; +} + +std::vector>> RandomAccessFile::ReadManyAsync( + const std::vector& ranges) { + return ReadManyAsync(io_context(), ranges); +} + // Default WillNeed() implementation: no-op Status RandomAccessFile::WillNeed(const std::vector& ranges) { return Status::OK(); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 86e9ad2d524..936b8c4fe94 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -305,6 +305,27 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. Future> ReadAsync(int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Explicit multi-read. + /// \brief Request multiple reads at once + /// + /// The underlying filesystem may optimize these reads by coalescing small reads into + /// large reads or by breaking up large reads into multiple parallel smaller reads. The + /// reads should be issued in parallel if it makes sense for the filesystem. + /// + /// One future will be returned for each input read range. Multiple returned futures + /// may correspond to a single read. Or, a single returned future may be a combined + /// result of several individual reads. + /// + /// \param[in] ranges The ranges to read + /// \return A future that will complete with the data from the requested range is + /// available + virtual std::vector>> ReadManyAsync( + const IOContext&, const std::vector& ranges); + + /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext. + std::vector>> ReadManyAsync( + const std::vector& ranges); + /// EXPERIMENTAL: Inform that the given ranges may be read soon. /// /// Some implementations might arrange to prefetch some of the data. From 7b1103fa2b58a8aed74144ce140196d0427ea45d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Wed, 23 Nov 2022 16:44:43 -0500 Subject: [PATCH 2/3] format --- cpp/src/arrow/io/file_test.cc | 8 ++------ cpp/src/arrow/io/interfaces.h | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index c94f638ff75..16989ec2707 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -391,13 +391,9 @@ TEST_F(TestReadableFile, ReadManyAsync) { MakeTestFile(); OpenFile(); - std::vector ranges = { - {1, 3}, - {2, 5}, - {4, 2} - }; - + std::vector ranges = {{1, 3}, {2, 5}, {4, 2}}; auto futs = file_->ReadManyAsync(std::move(ranges)); + ASSERT_EQ(futs.size(), 3); ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result()); ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result()); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 936b8c4fe94..c5355c94227 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -320,11 +320,11 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// \return A future that will complete with the data from the requested range is /// available virtual std::vector>> ReadManyAsync( - const IOContext&, const std::vector& ranges); + const IOContext&, const std::vector& ranges); /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext. std::vector>> ReadManyAsync( - const std::vector& ranges); + const std::vector& ranges); /// EXPERIMENTAL: Inform that the given ranges may be read soon. /// From 4c8a81d460f55553332af0683abcb0c3eaed996e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Mon, 28 Nov 2022 00:16:43 -0500 Subject: [PATCH 3/3] pass io_context --- cpp/src/arrow/io/interfaces.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index b4e0d8fe133..e7819e139f6 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -176,10 +176,10 @@ Future> RandomAccessFile::ReadAsync(int64_t position, } std::vector>> RandomAccessFile::ReadManyAsync( - const IOContext&, const std::vector& ranges) { + const IOContext& ctx, const std::vector& ranges) { std::vector>> ret; for (auto r : ranges) { - ret.push_back(this->ReadAsync(r.offset, r.length)); + ret.push_back(this->ReadAsync(ctx, r.offset, r.length)); } return ret; }