From 03ce3c89b193575b7b6dad52b4ccfafc6c2a9e88 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 23 Apr 2021 15:56:08 -0400
Subject: [PATCH 1/3] ARROW-12522: [C++] Add ReadRangeCache::WaitFor
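A lazy ReadRangeCache coalesces ranges when Cache() is called, but
defers the actual ReadAsync() calls until a range is first touched by
Read() or the new WaitFor(), which returns a Future that completes once
a given subset of the cached ranges is available.

A minimal usage sketch (not part of this patch; assumes a
RandomAccessFile `file` obtained elsewhere, a calling function where the
usual Arrow status macros apply, and illustrative offsets):

    io::internal::ReadRangeCache cache(file, {},
                                       io::CacheOptions::LazyDefaults());
    // Declare intent up front; the lazy cache issues no I/O here.
    RETURN_NOT_OK(cache.Cache({{0, 1024}, {4096, 1024}}));
    // WaitFor() kicks off the (coalesced) reads and signals completion.
    Future<> ready = cache.WaitFor({{0, 1024}});
    ready.Wait();
    // Read() then returns the cached slice without further I/O.
    ARROW_ASSIGN_OR_RAISE(auto buf, cache.Read({0, 1024}));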
---
cpp/src/arrow/filesystem/s3fs_benchmark.cc | 6 +-
cpp/src/arrow/io/caching.cc | 182 ++++++++++++++++-----
cpp/src/arrow/io/caching.h | 39 ++++-
cpp/src/arrow/io/memory_test.cc | 101 +++++++++++-
4 files changed, 279 insertions(+), 49 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs_benchmark.cc b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
index 36564a70d29..869601b844e 100644
--- a/cpp/src/arrow/filesystem/s3fs_benchmark.cc
+++ b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
@@ -260,8 +260,10 @@ static void CoalescedRead(benchmark::State& st, S3FileSystem* fs,
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
total_items += 1;
- io::internal::ReadRangeCache cache(file, {},
- io::CacheOptions{8192, 64 * 1024 * 1024});
+ io::internal::ReadRangeCache cache(
+ file, {},
+ io::CacheOptions{/*hole_size_limit=*/8192, /*range_size_limit=*/64 * 1024 * 1024,
+ /*lazy=*/false});
std::vector<io::ReadRange> ranges;
int64_t offset = 0;
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 1a7a55cd1b2..85e21833627 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -18,6 +18,7 @@
#include <algorithm>
#include <atomic>
#include <cmath>
+#include <mutex>
#include <utility>
#include <vector>
@@ -33,7 +34,14 @@ namespace io {
CacheOptions CacheOptions::Defaults() {
return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
- internal::ReadRangeCache::kDefaultRangeSizeLimit};
+ internal::ReadRangeCache::kDefaultRangeSizeLimit,
+ /*lazy=*/false};
+}
+
+CacheOptions CacheOptions::LazyDefaults() {
+ return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
+ internal::ReadRangeCache::kDefaultRangeSizeLimit,
+ /*lazy=*/true};
}
CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
@@ -117,7 +125,7 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil
(1 - ideal_bandwidth_utilization_frac))));
DCHECK_GT(range_size_limit, 0) << "Computed range_size_limit must be > 0";
- return {hole_size_limit, range_size_limit};
+ return {hole_size_limit, range_size_limit, false};
}
namespace internal {
@@ -139,8 +147,28 @@ struct ReadRangeCache::Impl {
// Ordered by offset (so as to find a matching region by binary search)
std::vector<RangeCacheEntry> entries;
- // Add new entries, themselves ordered by offset
- void AddEntries(std::vector<RangeCacheEntry> new_entries) {
+ virtual ~Impl() = default;
+
+ // Get the future corresponding to a range
+ virtual Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) {
+ return entry->future;
+ }
+
+ // Make a cache entry for a range
+ virtual RangeCacheEntry MakeCacheEntry(const ReadRange& range) {
+ return {range, file->ReadAsync(ctx, range.offset, range.length)};
+ }
+
+ // Add the given ranges to the cache, coalescing them where possible
+ virtual Status Cache(std::vector<ReadRange> ranges) {
+ ranges = internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
+ options.range_size_limit);
+ std::vector<RangeCacheEntry> new_entries;
+ new_entries.reserve(ranges.size());
+ for (const auto& range : ranges) {
+ new_entries.push_back(MakeCacheEntry(range));
+ }
+ // Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
@@ -149,12 +177,117 @@ struct ReadRangeCache::Impl {
} else {
entries = std::move(new_entries);
}
+ // Prefetch immediately, regardless of executor availability, if possible
+ return file->WillNeed(ranges);
+ }
+
+ // Read the given range from the cache, blocking if needed. Cannot read a range
+ // that spans cache entries.
+ virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+ if (range.length == 0) {
+ static const uint8_t byte = 0;
+ return std::make_shared<Buffer>(&byte, 0);
+ }
+
+ const auto it = std::lower_bound(
+ entries.begin(), entries.end(), range,
+ [](const RangeCacheEntry& entry, const ReadRange& range) {
+ return entry.range.offset + entry.range.length < range.offset + range.length;
+ });
+ if (it != entries.end() && it->range.Contains(range)) {
+ auto fut = MaybeRead(&*it);
+ ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+ return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+ }
+ return Status::Invalid("ReadRangeCache did not find matching cache entry");
+ }
+
+ virtual Future<> Wait() {
+ std::vector<Future<>> futures;
+ for (auto& entry : entries) {
+ futures.emplace_back(MaybeRead(&entry));
+ }
+ return AllComplete(futures);
+ }
+
+ // Return a Future that completes when the given ranges have been read.
+ virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+ auto end = std::remove_if(ranges.begin(), ranges.end(),
+ [](const ReadRange& range) { return range.length == 0; });
+ ranges.resize(end - ranges.begin());
+ // Sort in reverse position order
+ std::sort(ranges.begin(), ranges.end(),
+ [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+ std::vector<Future<>> futures;
+ for (auto& entry : entries) {
+ bool include = false;
+ while (!ranges.empty()) {
+ const auto& next = ranges.back();
+ if (next.offset >= entry.range.offset &&
+ next.offset + next.length <= entry.range.offset + entry.range.length) {
+ include = true;
+ ranges.pop_back();
+ } else {
+ break;
+ }
+ }
+ if (include) futures.emplace_back(MaybeRead(&entry));
+ if (ranges.empty()) break;
+ }
+ if (!ranges.empty()) {
+ return Status::Invalid("Given ranges were not previously requested for caching");
+ }
+ return AllComplete(futures);
+ }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+ // Protect against concurrent modification of entries[i]->future
+ std::mutex entry_mutex;
+
+ virtual ~LazyImpl() = default;
+
+ Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+ // Called by superclass Read()/WaitFor() so we have the lock
+ if (!entry->future.is_valid()) {
+ entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
+ }
+ return entry->future;
+ }
+
+ RangeCacheEntry MakeCacheEntry(const ReadRange& range) override {
+ // In the lazy variant, don't read data here - later, a call to Read or WaitFor
+ // will call back to MaybeRead (under the lock) which will fill the future.
+ return {range, Future<std::shared_ptr<Buffer>>()};
+ }
+
+ Status Cache(std::vector<ReadRange> ranges) override {
+ std::unique_lock<std::mutex> guard(entry_mutex);
+ return ReadRangeCache::Impl::Cache(std::move(ranges));
+ }
+
+ Result<std::shared_ptr<Buffer>> Read(ReadRange range) override {
+ std::unique_lock<std::mutex> guard(entry_mutex);
+ return ReadRangeCache::Impl::Read(range);
+ }
+
+ Future<> Wait() override {
+ std::unique_lock<std::mutex> guard(entry_mutex);
+ return ReadRangeCache::Impl::Wait();
+ }
+
+ Future<> WaitFor(std::vector<ReadRange> ranges) override {
+ std::unique_lock<std::mutex> guard(entry_mutex);
+ return ReadRangeCache::Impl::WaitFor(std::move(ranges));
}
};
ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
CacheOptions options)
- : impl_(new Impl()) {
+ : impl_(options.lazy ? new LazyImpl() : new Impl()) {
impl_->file = std::move(file);
impl_->ctx = std::move(ctx);
impl_->options = options;
@@ -163,44 +296,17 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext
ReadRangeCache::~ReadRangeCache() = default;
Status ReadRangeCache::Cache(std::vector<ReadRange> ranges) {
- ranges = internal::CoalesceReadRanges(std::move(ranges), impl_->options.hole_size_limit,
- impl_->options.range_size_limit);
- std::vector<RangeCacheEntry> entries;
- entries.reserve(ranges.size());
- for (const auto& range : ranges) {
- auto fut = impl_->file->ReadAsync(impl_->ctx, range.offset, range.length);
- entries.push_back({range, std::move(fut)});
- }
-
- impl_->AddEntries(std::move(entries));
- // Prefetch immediately, regardless of executor availability, if possible
- return impl_->file->WillNeed(ranges);
+ return impl_->Cache(std::move(ranges));
}
Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
- if (range.length == 0) {
- static const uint8_t byte = 0;
- return std::make_shared<Buffer>(&byte, 0);
- }
-
- const auto it = std::lower_bound(
- impl_->entries.begin(), impl_->entries.end(), range,
- [](const RangeCacheEntry& entry, const ReadRange& range) {
- return entry.range.offset + entry.range.length < range.offset + range.length;
- });
- if (it != impl_->entries.end() && it->range.Contains(range)) {
- ARROW_ASSIGN_OR_RAISE(auto buf, it->future.result());
- return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
- }
- return Status::Invalid("ReadRangeCache did not find matching cache entry");
+ return impl_->Read(range);
}
-Future<> ReadRangeCache::Wait() {
- std::vector<Future<>> futures;
- for (const auto& entry : impl_->entries) {
- futures.emplace_back(entry.future);
- }
- return AllComplete(futures);
+Future<> ReadRangeCache::Wait() { return impl_->Wait(); }
+
+Future<> ReadRangeCache::WaitFor(std::vector<ReadRange> ranges) {
+ return impl_->WaitFor(std::move(ranges));
}
} // namespace internal
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index a5b48dd885e..59a9b60e82f 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -34,17 +34,19 @@ struct ARROW_EXPORT CacheOptions {
static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;
- /// /brief The maximum distance in bytes between two consecutive
+ /// \brief The maximum distance in bytes between two consecutive
/// ranges; beyond this value, ranges are not combined
int64_t hole_size_limit;
- /// /brief The maximum size in bytes of a combined range; if
+ /// \brief The maximum size in bytes of a combined range; if
/// combining two consecutive ranges would produce a range of a
/// size greater than this, they are not combined
int64_t range_size_limit;
+ /// \brief A lazy cache does not perform any I/O until requested.
+ bool lazy;
bool operator==(const CacheOptions& other) const {
return hole_size_limit == other.hole_size_limit &&
- range_size_limit == other.range_size_limit;
+ range_size_limit == other.range_size_limit && lazy == other.lazy;
}
/// \brief Construct CacheOptions from network storage metrics (e.g. S3).
@@ -67,16 +69,34 @@ struct ARROW_EXPORT CacheOptions {
int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);
static CacheOptions Defaults();
+ static CacheOptions LazyDefaults();
};
namespace internal {
/// \brief A read cache designed to hide IO latencies when reading.
///
-/// To use this, you must first pass it the ranges you'll need in the future.
-/// The cache will combine those ranges according to parameters (see constructor)
-/// and start fetching the combined ranges in the background.
-/// You can then individually fetch them using Read().
+/// This class takes multiple byte ranges that an application expects to read, and
+/// coalesces them into fewer, larger read requests, which benefits performance on some
+/// filesystems, particularly remote ones like Amazon S3. By default, it also issues
+/// these read requests in parallel up front.
+///
+/// To use:
+/// 1. Cache() the ranges you expect to read in the future. Ideally, these ranges have
+/// the exact offset and length that will later be read. The cache will combine those
+/// ranges according to parameters (see constructor).
+///
+/// By default, the cache will also start fetching the combined ranges in parallel in
+/// the background, unless CacheOptions.lazy is set.
+///
+/// 2. Call WaitFor() to be notified when the given ranges have been read. If
+/// CacheOptions.lazy is set, I/O will be triggered in the background here instead.
+/// This can be done in parallel (e.g. if parsing a file, call WaitFor() for each
+/// chunk of the file that can be parsed in parallel).
+///
+/// 3. Call Read() to retrieve the actual data for the given ranges.
+/// A synchronous application may skip WaitFor() and just call Read() - it will still
+/// benefit from coalescing and parallel fetching.
class ARROW_EXPORT ReadRangeCache {
public:
static constexpr int64_t kDefaultHoleSizeLimit = 8192;
@@ -103,8 +123,13 @@ class ARROW_EXPORT ReadRangeCache {
/// \brief Wait until all ranges added so far have been cached.
Future<> Wait();
+ /// \brief Wait until all given ranges have been cached.
+ Future<> WaitFor(std::vector<ReadRange> ranges);
+
protected:
struct Impl;
+ struct LazyImpl;
+
std::unique_ptr<Impl> impl_;
};
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 00a1dcedb55..988f7821537 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -37,6 +37,7 @@
#include "arrow/io/transform.h"
#include "arrow/io/util_internal.h"
#include "arrow/status.h"
+#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/util/bit_util.h"
@@ -44,6 +45,7 @@
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
+#include "arrow/util/parallel.h"
namespace arrow {
@@ -692,10 +694,24 @@ TEST(CoalesceReadRanges, Basics) {
{{110, 21}, {140, 100}, {240, 31}});
}
+class CountingBufferReader : public BufferReader {
+ public:
+ using BufferReader::BufferReader;
+ Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext& context, int64_t position,
+ int64_t nbytes) override {
+ read_count_++;
+ return BufferReader::ReadAsync(context, position, nbytes);
+ }
+ int64_t read_count() const { return read_count_; }
+
+ private:
+ int64_t read_count_ = 0;
+};
+
TEST(RangeReadCache, Basics) {
std::string data = "abcdefghijklmnopqrstuvwxyz";
- auto file = std::make_shared<BufferReader>(Buffer(data));
+ auto file = std::make_shared<CountingBufferReader>(Buffer(data));
CacheOptions options = CacheOptions::Defaults();
options.hole_size_limit = 2;
options.range_size_limit = 10;
@@ -727,6 +743,86 @@ TEST(RangeReadCache, Basics) {
ASSERT_RAISES(Invalid, cache.Read({19, 3}));
ASSERT_RAISES(Invalid, cache.Read({0, 3}));
ASSERT_RAISES(Invalid, cache.Read({25, 2}));
+
+ ASSERT_FINISHES_OK(cache.Wait());
+ // 8 ranges should lead to less than 8 reads
+ ASSERT_LT(file->read_count(), 8);
+}
+
+TEST(RangeReadCache, Concurrency) {
+ std::string data = "abcdefghijklmnopqrstuvwxyz";
+
+ auto file = std::make_shared<CountingBufferReader>(Buffer(data));
+ std::vector<ReadRange> ranges{{1, 2}, {3, 2}, {8, 2}, {20, 2},
+ {25, 0}, {10, 4}, {14, 0}, {15, 4}};
+
+ for (auto lazy : std::vector<bool>{false, true}) {
+ CacheOptions options = CacheOptions::Defaults();
+ options.hole_size_limit = 2;
+ options.range_size_limit = 10;
+ options.lazy = lazy;
+
+ {
+ internal::ReadRangeCache cache(file, {}, options);
+ ASSERT_OK(cache.Cache(ranges));
+ std::vector<Future<std::shared_ptr<Buffer>>> futures;
+ for (const auto& range : ranges) {
+ futures.push_back(cache.WaitFor({range}).Then(
+ [&cache, range](const detail::Empty&) { return cache.Read(range); }));
+ }
+ for (auto fut : futures) {
+ ASSERT_FINISHES_OK(fut);
+ }
+ }
+ {
+ internal::ReadRangeCache cache(file, {}, options);
+ ASSERT_OK(cache.Cache(ranges));
+ ASSERT_OK(arrow::internal::ParallelFor(
+ static_cast<int>(ranges.size()),
+ [&](int index) { return cache.Read(ranges[index]).status(); }));
+ }
+ }
+}
+
+TEST(RangeReadCache, Lazy) {
+ std::string data = "abcdefghijklmnopqrstuvwxyz";
+
+ auto file = std::make_shared<CountingBufferReader>(Buffer(data));
+ CacheOptions options = CacheOptions::LazyDefaults();
+ options.hole_size_limit = 2;
+ options.range_size_limit = 10;
+ internal::ReadRangeCache cache(file, {}, options);
+
+ ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
+ ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}}));
+
+ // Lazy cache doesn't fetch ranges until requested
+ ASSERT_EQ(0, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2}));
+ AssertBufferEqual(*buf, "uv");
+ ASSERT_EQ(1, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4}));
+ AssertBufferEqual(*buf, "bcde");
+ ASSERT_EQ(2, file->read_count());
+
+ // Requested ranges are still cached
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4}));
+ ASSERT_EQ(2, file->read_count());
+
+ // Non-cached ranges
+ ASSERT_RAISES(Invalid, cache.Read({20, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({19, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({0, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({25, 2}));
+
+ // Can asynchronously kick off a read (though BufferReader::ReadAsync is synchronous so
+ // it will increment the read count here)
+ ASSERT_FINISHES_OK(cache.WaitFor({{10, 2}, {15, 4}}));
+ ASSERT_EQ(3, file->read_count());
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 2}));
+ ASSERT_EQ(3, file->read_count());
}
TEST(CacheOptions, Basics) {
@@ -734,7 +830,8 @@ TEST(CacheOptions, Basics) {
const double expected_range_size_limit_MiB) -> void {
const CacheOptions expected = {
static_cast<int64_t>(std::round(expected_hole_size_limit_MiB * 1024 * 1024)),
- static_cast<int64_t>(std::round(expected_range_size_limit_MiB * 1024 * 1024))};
+ static_cast<int64_t>(std::round(expected_range_size_limit_MiB * 1024 * 1024)),
+ /*lazy=*/false};
ASSERT_EQ(actual, expected);
};
From 49a9dfe5cbcdb6f512abbb3779ecd8b7502b5531 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 28 Apr 2021 08:46:54 -0400
Subject: [PATCH 2/3] ARROW-12522: [C++] Improve ReadRangeCache::WaitFor
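WaitFor() now binary-searches for each requested range directly,
mirroring Read(), instead of sorting the requests in reverse order and
walking every entry; the error message also names the offending range.
RangeCacheEntry gains explicit constructors, and MakeCacheEntry()
becomes MakeCacheEntries() so each variant builds a whole batch at once.

Why one std::lower_bound per range suffices, as a standalone sketch
(hypothetical snippet, not the Arrow code; ReadRange/Entry/Contains are
local stand-ins for the real types): entries are ordered by offset and
pairwise disjoint after coalescing, so the first entry whose end reaches
the query's end is the only one that can contain it.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct ReadRange { int64_t offset, length; };
    struct Entry { ReadRange range; };

    // True iff `inner` lies entirely within `outer`.
    bool Contains(const ReadRange& outer, const ReadRange& inner) {
      return inner.offset >= outer.offset &&
             inner.offset + inner.length <= outer.offset + outer.length;
    }

    int main() {
      // Coalesced cache entries: sorted by offset, non-overlapping.
      std::vector<Entry> entries = {{{0, 10}}, {{20, 10}}, {{40, 10}}};
      ReadRange range{22, 5};
      // First entry whose end is >= the query's end; disjointness makes
      // it the unique candidate.
      auto it = std::lower_bound(
          entries.begin(), entries.end(), range,
          [](const Entry& e, const ReadRange& r) {
            return e.range.offset + e.range.length < r.offset + r.length;
          });
      bool cached = it != entries.end() && Contains(it->range, range);
      return cached ? 0 : 1;  // {22, 5} is inside {20, 10}, so cached
    }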
---
cpp/src/arrow/io/caching.cc | 68 +++++++++++++++---------------
cpp/src/arrow/io/memory_test.cc | 73 ++++++++++++++++++---------------
2 files changed, 77 insertions(+), 64 deletions(-)
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 85e21833627..722026ccd9b 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -134,6 +134,10 @@ struct RangeCacheEntry {
ReadRange range;
Future<std::shared_ptr<Buffer>> future;
+ RangeCacheEntry() = default;
+ RangeCacheEntry(const ReadRange& range_, Future<std::shared_ptr<Buffer>> future_)
+ : range(range_), future(std::move(future_)) {}
+
friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
return left.range.offset < right.range.offset;
}
@@ -154,20 +158,22 @@ struct ReadRangeCache::Impl {
return entry->future;
}
- // Make a cache entry for a range
- virtual RangeCacheEntry MakeCacheEntry(const ReadRange& range) {
- return {range, file->ReadAsync(ctx, range.offset, range.length)};
+ // Make cache entries for ranges
+ virtual std::vector<RangeCacheEntry> MakeCacheEntries(
+ const std::vector<ReadRange>& ranges) {
+ std::vector<RangeCacheEntry> new_entries;
+ new_entries.reserve(ranges.size());
+ for (const auto& range : ranges) {
+ new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length));
+ }
+ return new_entries;
}
// Add the given ranges to the cache, coalescing them where possible
virtual Status Cache(std::vector<ReadRange> ranges) {
ranges = internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
options.range_size_limit);
- std::vector<RangeCacheEntry> new_entries;
- new_entries.reserve(ranges.size());
- for (const auto& range : ranges) {
- new_entries.push_back(MakeCacheEntry(range));
- }
+ std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
// Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
@@ -215,28 +221,20 @@ struct ReadRangeCache::Impl {
auto end = std::remove_if(ranges.begin(), ranges.end(),
[](const ReadRange& range) { return range.length == 0; });
ranges.resize(end - ranges.begin());
- // Sort in reverse position order
- std::sort(ranges.begin(), ranges.end(),
- [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
-
std::vector<Future<>> futures;
- for (auto& entry : entries) {
- bool include = false;
- while (!ranges.empty()) {
- const auto& next = ranges.back();
- if (next.offset >= entry.range.offset &&
- next.offset + next.length <= entry.range.offset + entry.range.length) {
- include = true;
- ranges.pop_back();
- } else {
- break;
- }
+ futures.reserve(ranges.size());
+ for (auto& range : ranges) {
+ const auto it = std::lower_bound(
+ entries.begin(), entries.end(), range,
+ [](const RangeCacheEntry& entry, const ReadRange& range) {
+ return entry.range.offset + entry.range.length < range.offset + range.length;
+ });
+ if (it != entries.end() && it->range.Contains(range)) {
+ futures.push_back(Future<>(MaybeRead(&*it)));
+ } else {
+ return Status::Invalid("Range was not requested for caching: offset=",
+ range.offset, " length=", range.length);
}
- if (include) futures.emplace_back(MaybeRead(&entry));
- if (ranges.empty()) break;
- }
- if (!ranges.empty()) {
- return Status::Invalid("Given ranges were not previously requested for caching");
}
return AllComplete(futures);
}
@@ -258,10 +256,16 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
return entry->future;
}
- RangeCacheEntry MakeCacheEntry(const ReadRange& range) override {
- // In the lazy variant, don't read data here - later, a call to Read or WaitFor
- // will call back to MaybeRead (under the lock) which will fill the future.
- return {range, Future<std::shared_ptr<Buffer>>()};
+ std::vector<RangeCacheEntry> MakeCacheEntries(
+ const std::vector<ReadRange>& ranges) override {
+ std::vector<RangeCacheEntry> new_entries;
+ new_entries.reserve(ranges.size());
+ for (const auto& range : ranges) {
+ // In the lazy variant, don't read data here - later, a call to Read or WaitFor
+ // will call back to MaybeRead (under the lock) which will fill the future.
+ new_entries.emplace_back(range, Future<std::shared_ptr<Buffer>>());
+ }
+ return new_entries;
}
Status Cache(std::vector<ReadRange> ranges) override {
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 988f7821537..b6e78fbad7c 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -711,42 +711,50 @@ class CountingBufferReader : public BufferReader {
TEST(RangeReadCache, Basics) {
std::string data = "abcdefghijklmnopqrstuvwxyz";
- auto file = std::make_shared<CountingBufferReader>(Buffer(data));
CacheOptions options = CacheOptions::Defaults();
options.hole_size_limit = 2;
options.range_size_limit = 10;
- internal::ReadRangeCache cache(file, {}, options);
-
- ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
- ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}}));
- ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2}));
- AssertBufferEqual(*buf, "uv");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 2}));
- AssertBufferEqual(*buf, "bc");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 2}));
- AssertBufferEqual(*buf, "de");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({8, 2}));
- AssertBufferEqual(*buf, "ij");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 4}));
- AssertBufferEqual(*buf, "klmn");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({15, 4}));
- AssertBufferEqual(*buf, "pqrs");
- // Zero-sized
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({14, 0}));
- AssertBufferEqual(*buf, "");
- ASSERT_OK_AND_ASSIGN(buf, cache.Read({25, 0}));
- AssertBufferEqual(*buf, "");
-
- // Non-cached ranges
- ASSERT_RAISES(Invalid, cache.Read({20, 3}));
- ASSERT_RAISES(Invalid, cache.Read({19, 3}));
- ASSERT_RAISES(Invalid, cache.Read({0, 3}));
- ASSERT_RAISES(Invalid, cache.Read({25, 2}));
-
- ASSERT_FINISHES_OK(cache.Wait());
- // 8 ranges should lead to less than 8 reads
- ASSERT_LT(file->read_count(), 8);
+ for (auto lazy : std::vector<bool>{false, true}) {
+ SCOPED_TRACE(lazy);
+ options.lazy = lazy;
+ auto file = std::make_shared<CountingBufferReader>(Buffer(data));
+ internal::ReadRangeCache cache(file, {}, options);
+
+ ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
+ ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}}));
+
+ ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2}));
+ AssertBufferEqual(*buf, "uv");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 2}));
+ AssertBufferEqual(*buf, "bc");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 2}));
+ AssertBufferEqual(*buf, "de");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({8, 2}));
+ AssertBufferEqual(*buf, "ij");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 4}));
+ AssertBufferEqual(*buf, "klmn");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({15, 4}));
+ AssertBufferEqual(*buf, "pqrs");
+ ASSERT_FINISHES_OK(cache.WaitFor({{15, 1}, {16, 3}, {25, 0}, {1, 2}}));
+ // Zero-sized
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({14, 0}));
+ AssertBufferEqual(*buf, "");
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({25, 0}));
+ AssertBufferEqual(*buf, "");
+
+ // Non-cached ranges
+ ASSERT_RAISES(Invalid, cache.Read({20, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({19, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({0, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({25, 2}));
+ ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{25, 2}}));
+ ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{1, 2}, {25, 2}}));
+
+ ASSERT_FINISHES_OK(cache.Wait());
+ // 8 ranges should lead to less than 8 reads
+ ASSERT_LT(file->read_count(), 8);
+ }
}
TEST(RangeReadCache, Concurrency) {
@@ -757,6 +765,7 @@ TEST(RangeReadCache, Concurrency) {
{25, 0}, {10, 4}, {14, 0}, {15, 4}};
for (auto lazy : std::vector<bool>{false, true}) {
+ SCOPED_TRACE(lazy);
CacheOptions options = CacheOptions::Defaults();
options.hole_size_limit = 2;
options.range_size_limit = 10;
From c84d6093eab5ba77a4a89c6759f06e3489a40624 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 28 Apr 2021 09:01:15 -0400
Subject: [PATCH 3/3] ARROW-12522: [C++] Coalesce completely-overlapping ranges
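Fully contained duplicates (the same range requested twice, or a small
range inside a larger one) previously survived coalescing and could
trigger redundant reads. After sorting by offset, a std::unique pass now
drops any range completely covered by its predecessor.

A self-contained illustration of the dedup step (hypothetical snippet,
not the Arrow code itself; R mirrors ReadRange):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct R { int64_t offset, length; };

    int main() {
      std::vector<R> rs = {{21, 2}, {20, 5}, {20, 5}};
      std::sort(rs.begin(), rs.end(),
                [](const R& a, const R& b) { return a.offset < b.offset; });
      // std::unique compares each element against the last one it kept,
      // so every range fully contained in the surviving predecessor is
      // removed; partial overlaps are left for the later coalescing step.
      auto last = std::unique(rs.begin(), rs.end(), [](const R& l, const R& r) {
        return r.offset >= l.offset && r.offset + r.length <= l.offset + l.length;
      });
      rs.erase(last, rs.end());  // rs == {{20, 5}}
      return 0;
    }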
---
cpp/src/arrow/io/interfaces.cc | 10 ++++++++--
cpp/src/arrow/io/memory_test.cc | 3 +++
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index d052c016837..670fab415d7 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -357,10 +357,16 @@ struct ReadRangeCombiner {
// Remove zero-sized ranges
auto end = std::remove_if(ranges.begin(), ranges.end(),
[](const ReadRange& range) { return range.length == 0; });
- ranges.resize(end - ranges.begin());
// Sort in position order
- std::sort(ranges.begin(), ranges.end(),
+ std::sort(ranges.begin(), end,
[](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });
+ // Remove ranges that overlap 100%
+ end = std::unique(ranges.begin(), end,
+ [](const ReadRange& left, const ReadRange& right) {
+ return right.offset >= left.offset &&
+ right.offset + right.length <= left.offset + left.length;
+ });
+ ranges.resize(end - ranges.begin());
// Skip further processing if ranges is empty after removing zero-sized ranges.
if (ranges.empty()) {
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index b6e78fbad7c..6a24b0c764f 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -692,6 +692,9 @@ TEST(CoalesceReadRanges, Basics) {
// Same as (*) but unsorted
check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}},
{{110, 21}, {140, 100}, {240, 31}});
+
+ // Completely overlapping ranges should be eliminated
+ check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}});
}
class CountingBufferReader : public BufferReader {