Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,22 @@ TEST_F(TestReadableFile, ReadAsync) {
AssertBufferEqual(*buf2, "test");
}

TEST_F(TestReadableFile, ReadManyAsync) {
MakeTestFile();
OpenFile();

std::vector<ReadRange> ranges = {{1, 3}, {2, 5}, {4, 2}};
auto futs = file_->ReadManyAsync(std::move(ranges));

ASSERT_EQ(futs.size(), 3);
ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result());
ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result());
ASSERT_OK_AND_ASSIGN(auto buf3, futs[2].result());
AssertBufferEqual(*buf1, "est");
AssertBufferEqual(*buf2, "stdat");
AssertBufferEqual(*buf3, "da");
}

TEST_F(TestReadableFile, SeekingRequired) {
MakeTestFile();
OpenFile();
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
return ReadAsync(io_context(), position, nbytes);
}

std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const IOContext& ctx, const std::vector<ReadRange>& ranges) {
std::vector<Future<std::shared_ptr<Buffer>>> ret;
for (auto r : ranges) {
ret.push_back(this->ReadAsync(ctx, r.offset, r.length));
}
return ret;
}

std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const std::vector<ReadRange>& ranges) {
return ReadManyAsync(io_context(), ranges);
}

// Default WillNeed() implementation: no-op
Status RandomAccessFile::WillNeed(const std::vector<ReadRange>& ranges) {
return Status::OK();
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,27 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Explicit multi-read.
/// \brief Request multiple reads at once
///
/// The underlying filesystem may optimize these reads by coalescing small reads into
/// large reads or by breaking up large reads into multiple parallel smaller reads. The
/// reads should be issued in parallel if it makes sense for the filesystem.
///
/// One future will be returned for each input read range. Multiple returned futures
/// may correspond to a single read. Or, a single returned future may be a combined
/// result of several individual reads.
///
/// \param[in] ranges The ranges to read
/// \return A future that will complete with the data from the requested range is
/// available
virtual std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
const IOContext&, const std::vector<ReadRange>& ranges);

/// EXPERIMENTAL: Explicit multi-read, using the file's IOContext.
std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
const std::vector<ReadRange>& ranges);

/// EXPERIMENTAL: Inform that the given ranges may be read soon.
///
/// Some implementations might arrange to prefetch some of the data.
Expand Down