From baf029e66771dfb66a6f6002f6d77b44f7a78ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Wed, 23 Nov 2022 11:20:45 -0500 Subject: [PATCH 1/3] add RandomAccessFile::ReadManyAsync --- cpp/src/arrow/io/file_test.cc | 20 ++++++++++++++++++++ cpp/src/arrow/io/interfaces.cc | 14 ++++++++++++++ cpp/src/arrow/io/interfaces.h | 21 +++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index b5c8797b0b0..c94f638ff75 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -387,6 +387,26 @@ TEST_F(TestReadableFile, ReadAsync) { AssertBufferEqual(*buf2, "test"); } +TEST_F(TestReadableFile, ReadManyAsync) { + MakeTestFile(); + OpenFile(); + + std::vector ranges = { + {1, 3}, + {2, 5}, + {4, 2} + }; + + auto futs = file_->ReadManyAsync(std::move(ranges)); + ASSERT_EQ(futs.size(), 3); + ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result()); + ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result()); + ASSERT_OK_AND_ASSIGN(auto buf3, futs[2].result()); + AssertBufferEqual(*buf1, "est"); + AssertBufferEqual(*buf2, "stdat"); + AssertBufferEqual(*buf3, "da"); +} + TEST_F(TestReadableFile, SeekingRequired) { MakeTestFile(); OpenFile(); diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index a78bc1b55c2..b4e0d8fe133 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -175,6 +175,20 @@ Future> RandomAccessFile::ReadAsync(int64_t position, return ReadAsync(io_context(), position, nbytes); } +std::vector>> RandomAccessFile::ReadManyAsync( + const IOContext&, const std::vector& ranges) { + std::vector>> ret; + for (auto r : ranges) { + ret.push_back(this->ReadAsync(r.offset, r.length)); + } + return ret; +} + +std::vector>> RandomAccessFile::ReadManyAsync( + const std::vector& ranges) { + return ReadManyAsync(io_context(), ranges); +} + // Default WillNeed() implementation: no-op Status 
RandomAccessFile::WillNeed(const std::vector& ranges) { return Status::OK(); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 86e9ad2d524..936b8c4fe94 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -305,6 +305,27 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. Future> ReadAsync(int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Explicit multi-read. + /// \brief Request multiple reads at once + /// + /// The underlying filesystem may optimize these reads by coalescing small reads into + /// large reads or by breaking up large reads into multiple parallel smaller reads. The + /// reads should be issued in parallel if it makes sense for the filesystem. + /// + /// One future will be returned for each input read range. Multiple returned futures + /// may correspond to a single read. Or, a single returned future may be a combined + /// result of several individual reads. + /// + /// \param[in] ranges The ranges to read + /// \return One future per input range; each completes once that range's data is + /// available + virtual std::vector>> ReadManyAsync( + const IOContext&, const std::vector& ranges); + + /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext. + std::vector>> ReadManyAsync( + const std::vector& ranges); + /// EXPERIMENTAL: Inform that the given ranges may be read soon. /// /// Some implementations might arrange to prefetch some of the data. 
From 7b1103fa2b58a8aed74144ce140196d0427ea45d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Wed, 23 Nov 2022 16:44:43 -0500 Subject: [PATCH 2/3] format --- cpp/src/arrow/io/file_test.cc | 8 ++------ cpp/src/arrow/io/interfaces.h | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index c94f638ff75..16989ec2707 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -391,13 +391,9 @@ TEST_F(TestReadableFile, ReadManyAsync) { MakeTestFile(); OpenFile(); - std::vector ranges = { - {1, 3}, - {2, 5}, - {4, 2} - }; - + std::vector ranges = {{1, 3}, {2, 5}, {4, 2}}; auto futs = file_->ReadManyAsync(std::move(ranges)); + ASSERT_EQ(futs.size(), 3); ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result()); ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result()); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 936b8c4fe94..c5355c94227 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -320,11 +320,11 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// \return One future per input range; each completes once that range's data is /// available virtual std::vector>> ReadManyAsync( - const IOContext&, const std::vector& ranges); + const IOContext&, const std::vector& ranges); /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext. std::vector>> ReadManyAsync( - const std::vector& ranges); + const std::vector& ranges); /// EXPERIMENTAL: Inform that the given ranges may be read soon. 
/// From 57afc25a71a79da91353dc8b43be1719387db3b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Mon, 28 Nov 2022 01:29:41 -0500 Subject: [PATCH 3/3] Use RandomAccessFile::ReadManyAsync for s3 filesystem and rename arrow::io::CacheOptions to arrow::io::CoalesceOptions --- cpp/src/arrow/filesystem/s3fs.cc | 19 +++++++++++++++---- cpp/src/arrow/filesystem/s3fs.h | 7 +++++++ cpp/src/arrow/io/caching.cc | 27 +++++++++++++-------------- cpp/src/arrow/io/caching.h | 20 +++++++++++--------- cpp/src/arrow/io/type_fwd.h | 2 +- cpp/src/arrow/ipc/options.h | 2 +- cpp/src/arrow/ipc/reader.cc | 4 ++-- cpp/src/arrow/ipc/reader.h | 2 +- 8 files changed, 51 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 16ffe25266c..11f8ecd8327 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -996,10 +996,11 @@ class ObjectInputFile final : public io::RandomAccessFile { public: ObjectInputFile(std::shared_ptr client, const io::IOContext& io_context, const S3Path& path, - int64_t size = kNoSize) + const io::CoalesceOptions& coalesceOptions, int64_t size = kNoSize) : client_(std::move(client)), io_context_(io_context), path_(path), + coalesceOptions(coalesceOptions), content_length_(size) {} Status Init() { @@ -1128,6 +1129,14 @@ class ObjectInputFile final : public io::RandomAccessFile { return bytes_read; } + std::vector>> ReadManyAsync( + const io::IOContext& ctx, const std::vector& ranges) override { + return RandomAccessFile::ReadManyAsync( + ctx, io::internal::CoalesceReadRanges(std::move(ranges), + coalesceOptions.hole_size_limit, + coalesceOptions.range_size_limit)); + } + Result> Read(int64_t nbytes) override { ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); pos_ += buffer->size(); @@ -1138,6 +1147,7 @@ class ObjectInputFile final : public io::RandomAccessFile { std::shared_ptr client_; const io::IOContext io_context_; S3Path 
path_; + io::CoalesceOptions coalesceOptions; bool closed_ = false; int64_t pos_ = 0; @@ -2178,7 +2188,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path); + auto ptr = std::make_shared(client_, fs->io_context(), path, + builder_.options().coalesceOptions); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -2196,8 +2207,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this(client_, fs->io_context(), path, info.size()); + auto ptr = std::make_shared( + client_, fs->io_context(), path, builder_.options().coalesceOptions, info.size()); RETURN_NOT_OK(ptr->Init()); return ptr; } diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index ba642ebe61c..dfe97dd4097 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -22,6 +22,7 @@ #include #include "arrow/filesystem/filesystem.h" +#include "arrow/io/caching.h" #include "arrow/util/macros.h" #include "arrow/util/uri.h" @@ -167,6 +168,12 @@ struct ARROW_EXPORT S3Options { /// delay between retries. std::shared_ptr retry_strategy; + /// Coalesce options for requesting multiple reads at once + io::CoalesceOptions coalesceOptions = io::CoalesceOptions::Defaults(); + + int64_t hole_size_limit = 1; + int64_t range_size_limit = 100; + S3Options(); /// Configure with the default AWS credentials provider chain. 
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index 1cbebfd935e..e8ad2e2e4c2 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -32,22 +32,21 @@ namespace arrow { namespace io { -CacheOptions CacheOptions::Defaults() { - return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit, - internal::ReadRangeCache::kDefaultRangeSizeLimit, - /*lazy=*/false}; +CoalesceOptions CoalesceOptions::Defaults() { + return CoalesceOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit, + internal::ReadRangeCache::kDefaultRangeSizeLimit, + /*lazy=*/false}; } -CacheOptions CacheOptions::LazyDefaults() { - return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit, - internal::ReadRangeCache::kDefaultRangeSizeLimit, - /*lazy=*/true}; +CoalesceOptions CoalesceOptions::LazyDefaults() { + return CoalesceOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit, + internal::ReadRangeCache::kDefaultRangeSizeLimit, + /*lazy=*/true}; } -CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis, - int64_t transfer_bandwidth_mib_per_sec, - double ideal_bandwidth_utilization_frac, - int64_t max_ideal_request_size_mib) { +CoalesceOptions CoalesceOptions::MakeFromNetworkMetrics( + int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec, + double ideal_bandwidth_utilization_frac, int64_t max_ideal_request_size_mib) { // // The I/O coalescing algorithm uses two parameters: // 1. 
hole_size_limit (a.k.a max_io_gap): Max I/O gap/hole size in bytes @@ -147,7 +146,7 @@ struct ReadRangeCache::Impl { std::shared_ptr owned_file; RandomAccessFile* file; IOContext ctx; - CacheOptions options; + CoalesceOptions options; // Ordered by offset (so as to find a matching region by binary search) std::vector entries; @@ -292,7 +291,7 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl { ReadRangeCache::ReadRangeCache(std::shared_ptr owned_file, RandomAccessFile* file, IOContext ctx, - CacheOptions options) + CoalesceOptions options) : impl_(options.lazy ? new LazyImpl() : new Impl()) { impl_->owned_file = std::move(owned_file); impl_->file = file; diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h index 9f047fd62fb..b2996b4a5e4 100644 --- a/cpp/src/arrow/io/caching.h +++ b/cpp/src/arrow/io/caching.h @@ -30,7 +30,7 @@ namespace arrow { namespace io { -struct ARROW_EXPORT CacheOptions { +struct ARROW_EXPORT CoalesceOptions { static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9; static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64; @@ -44,7 +44,7 @@ struct ARROW_EXPORT CacheOptions { /// \brief A lazy cache does not perform any I/O until requested. bool lazy; - bool operator==(const CacheOptions& other) const { + bool operator==(const CoalesceOptions& other) const { return hole_size_limit == other.hole_size_limit && range_size_limit == other.range_size_limit && lazy == other.lazy; } @@ -63,15 +63,17 @@ struct ARROW_EXPORT CacheOptions { /// to maximize the net data load. /// The value is a positive integer. /// \return A new instance of CacheOptions. 
- static CacheOptions MakeFromNetworkMetrics( + static CoalesceOptions MakeFromNetworkMetrics( int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec, double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac, int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib); - static CacheOptions Defaults(); - static CacheOptions LazyDefaults(); + static CoalesceOptions Defaults(); + static CoalesceOptions LazyDefaults(); }; +using CacheOptions [[deprecated]] = CoalesceOptions; + namespace internal { /// \brief A read cache designed to hide IO latencies when reading. @@ -104,15 +106,15 @@ class ARROW_EXPORT ReadRangeCache { /// Construct a read cache with default explicit ReadRangeCache(std::shared_ptr file, IOContext ctx) - : ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {} + : ReadRangeCache(file, file.get(), std::move(ctx), CoalesceOptions::Defaults()) {} /// Construct a read cache with given options explicit ReadRangeCache(std::shared_ptr file, IOContext ctx, - CacheOptions options) + CoalesceOptions options) : ReadRangeCache(file, file.get(), ctx, options) {} /// Construct a read cache with an unowned file - ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options) + ReadRangeCache(RandomAccessFile* file, IOContext ctx, CoalesceOptions options) : ReadRangeCache(NULLPTR, file, ctx, options) {} ~ReadRangeCache(); @@ -137,7 +139,7 @@ class ARROW_EXPORT ReadRangeCache { struct LazyImpl; ReadRangeCache(std::shared_ptr owned_file, RandomAccessFile* file, - IOContext ctx, CacheOptions options); + IOContext ctx, CoalesceOptions options); std::unique_ptr impl_; }; diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h index a1b9e626bba..339229fbac3 100644 --- a/cpp/src/arrow/io/type_fwd.h +++ b/cpp/src/arrow/io/type_fwd.h @@ -28,7 +28,7 @@ struct FileMode { }; struct IOContext; -struct CacheOptions; +struct CoalesceOptions; /// EXPERIMENTAL: convenience global 
singleton for default IOContext settings ARROW_EXPORT diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 4206f36195f..d1969db968c 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -148,7 +148,7 @@ struct ARROW_EXPORT IpcReadOptions { /// \brief Options to control caching behavior when pre-buffering is requested /// /// The lazy property will always be reset to true to deliver the expected behavior - io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults(); + io::CoalesceOptions pre_buffer_cache_options = io::CoalesceOptions::LazyDefaults(); static IpcReadOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a1b17afaaf9..91b9bde5116 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1308,7 +1308,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Result>> GetRecordBatchGenerator( const bool coalesce, const io::IOContext& io_context, - const io::CacheOptions cache_options, + const io::CoalesceOptions cache_options, arrow::internal::Executor* executor) override { auto state = std::dynamic_pointer_cast(shared_from_this()); // Prebuffering causes us to use a lot of futures which, at the moment, @@ -1537,7 +1537,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { owned_file(std::move(owned_file)), loader(batch, context.metadata_version, context.options, block_data_offset), columns(schema->num_fields()), - cache(file, file->io_context(), io::CacheOptions::LazyDefaults()), + cache(file, file->io_context(), io::CoalesceOptions::LazyDefaults()), length(batch->length()) {} Status CalculateLoadRequest() { diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index ad7969b31c9..475e4ed7012 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -227,7 +227,7 @@ class ARROW_EXPORT RecordBatchFileReader virtual Result>> GetRecordBatchGenerator( const bool coalesce 
= false, const io::IOContext& io_context = io::default_io_context(), - const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(), + const io::CoalesceOptions cache_options = io::CoalesceOptions::LazyDefaults(), arrow::internal::Executor* executor = NULLPTR) = 0; };