6 changes: 4 additions & 2 deletions cpp/src/arrow/filesystem/s3fs_benchmark.cc
@@ -260,8 +260,10 @@ static void CoalescedRead(benchmark::State& st, S3FileSystem* fs,
ASSERT_OK_AND_ASSIGN(size, file->GetSize());
total_items += 1;

-    io::internal::ReadRangeCache cache(file, {},
-                                       io::CacheOptions{8192, 64 * 1024 * 1024});
+    io::internal::ReadRangeCache cache(
+        file, {},
+        io::CacheOptions{/*hole_size_limit=*/8192, /*range_size_limit=*/64 * 1024 * 1024,
+                         /*lazy=*/false});
std::vector<io::ReadRange> ranges;

int64_t offset = 0;
186 changes: 148 additions & 38 deletions cpp/src/arrow/io/caching.cc
@@ -18,6 +18,7 @@
#include <algorithm>
#include <atomic>
#include <cmath>
#include <mutex>
#include <utility>
#include <vector>

@@ -33,7 +34,14 @@ namespace io {

CacheOptions CacheOptions::Defaults() {
  return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
-                     internal::ReadRangeCache::kDefaultRangeSizeLimit};
+                     internal::ReadRangeCache::kDefaultRangeSizeLimit,
+                     /*lazy=*/false};
}

CacheOptions CacheOptions::LazyDefaults() {
  return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
                      internal::ReadRangeCache::kDefaultRangeSizeLimit,
                      /*lazy=*/true};
}

CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
@@ -117,7 +125,7 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil
(1 - ideal_bandwidth_utilization_frac))));
DCHECK_GT(range_size_limit, 0) << "Computed range_size_limit must be > 0";

-  return {hole_size_limit, range_size_limit};
+  return {hole_size_limit, range_size_limit, false};
}

namespace internal {
@@ -126,6 +134,10 @@ struct RangeCacheEntry {
ReadRange range;
Future<std::shared_ptr<Buffer>> future;

RangeCacheEntry() = default;
RangeCacheEntry(const ReadRange& range_, Future<std::shared_ptr<Buffer>> future_)
: range(range_), future(std::move(future_)) {}

friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
return left.range.offset < right.range.offset;
}
@@ -139,8 +151,30 @@ struct ReadRangeCache::Impl {
// Ordered by offset (so as to find a matching region by binary search)
std::vector<RangeCacheEntry> entries;

-  // Add new entries, themselves ordered by offset
-  void AddEntries(std::vector<RangeCacheEntry> new_entries) {
virtual ~Impl() = default;

// Get the future corresponding to a range
virtual Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) {
return entry->future;
}

// Make cache entries for ranges
virtual std::vector<RangeCacheEntry> MakeCacheEntries(
const std::vector<ReadRange>& ranges) {
std::vector<RangeCacheEntry> new_entries;
new_entries.reserve(ranges.size());
for (const auto& range : ranges) {
new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length));
}
return new_entries;
}

// Add the given ranges to the cache, coalescing them where possible
virtual Status Cache(std::vector<ReadRange> ranges) {
ranges = internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
options.range_size_limit);
std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
// Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
@@ -149,12 +183,115 @@
} else {
entries = std::move(new_entries);
}
// Prefetch immediately, regardless of executor availability, if possible
return file->WillNeed(ranges);
}

// Read the given range from the cache, blocking if needed. Cannot read a range
// that spans cache entries.
virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
if (range.length == 0) {
static const uint8_t byte = 0;
return std::make_shared<Buffer>(&byte, 0);
}

const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it != entries.end() && it->range.Contains(range)) {
auto fut = MaybeRead(&*it);
ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
}
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}

virtual Future<> Wait() {
std::vector<Future<>> futures;
for (auto& entry : entries) {
futures.emplace_back(MaybeRead(&entry));
}
return AllComplete(futures);
}

// Return a Future that completes when the given ranges have been read.
virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
auto end = std::remove_if(ranges.begin(), ranges.end(),
[](const ReadRange& range) { return range.length == 0; });
ranges.resize(end - ranges.begin());
std::vector<Future<>> futures;
futures.reserve(ranges.size());
for (auto& range : ranges) {
const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it != entries.end() && it->range.Contains(range)) {
futures.push_back(Future<>(MaybeRead(&*it)));
} else {
return Status::Invalid("Range was not requested for caching: offset=",
range.offset, " length=", range.length);
}
}
return AllComplete(futures);
}
};

// Don't read ranges when they're first added. Instead, wait until they're requested
// (either through Read or WaitFor).
struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
// Protect against concurrent modification of entries[i]->future
std::mutex entry_mutex;

virtual ~LazyImpl() = default;

Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
// Called by superclass Read()/WaitFor() so we have the lock
if (!entry->future.is_valid()) {
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
}
return entry->future;
}

std::vector<RangeCacheEntry> MakeCacheEntries(
const std::vector<ReadRange>& ranges) override {
std::vector<RangeCacheEntry> new_entries;
new_entries.reserve(ranges.size());
for (const auto& range : ranges) {
// In the lazy variant, don't read data here - later, a call to Read or WaitFor
// will call back to MaybeRead (under the lock) which will fill the future.
new_entries.emplace_back(range, Future<std::shared_ptr<Buffer>>());
}
return new_entries;
}

Status Cache(std::vector<ReadRange> ranges) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Cache(std::move(ranges));
Review comment (Member):
The current file->ReadAsync has some leeway in it which allows the method to be synchronous if needed. If that is the case, this could end up holding onto the lock for a while. Actually, it looks like you have guards on the Wait/WaitFor methods as well, so perhaps this isn't intended to be consumed by multiple threads?

Could you maybe add a short comment explaining how you expect this class to be used (e.g. first a thread does a bunch of Cache() calls and then a bunch of Read() calls? Or maybe there are multiple threads calling Cache or Read?)

Review comment (Member):
Adding to this, could you create a simple test case around whatever type of multithreading you expect to guard against with these mutexes?

Reply (Member, author):
Hmm. I feel that if ReadAsync is synchronous, that's because it's also very fast (e.g. an in-memory copy), in which case it's not a concern. I'll document the usage pattern and both variants.

Reply (Member, author):
Or, put another way, (when lazy == true) this 'passes through' the synchronicity of ReadAsync (an oxymoron if there ever was one), which is the intent.

}

Result<std::shared_ptr<Buffer>> Read(ReadRange range) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Read(range);
}

Future<> Wait() override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Wait();
}

Future<> WaitFor(std::vector<ReadRange> ranges) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::WaitFor(std::move(ranges));
}
};

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
CacheOptions options)
-    : impl_(new Impl()) {
+    : impl_(options.lazy ? new LazyImpl() : new Impl()) {
impl_->file = std::move(file);
impl_->ctx = std::move(ctx);
impl_->options = options;
@@ -163,44 +300,17 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext
ReadRangeCache::~ReadRangeCache() = default;

Status ReadRangeCache::Cache(std::vector<ReadRange> ranges) {
-  ranges = internal::CoalesceReadRanges(std::move(ranges), impl_->options.hole_size_limit,
-                                        impl_->options.range_size_limit);
-  std::vector<RangeCacheEntry> entries;
-  entries.reserve(ranges.size());
-  for (const auto& range : ranges) {
-    auto fut = impl_->file->ReadAsync(impl_->ctx, range.offset, range.length);
-    entries.push_back({range, std::move(fut)});
-  }
-
-  impl_->AddEntries(std::move(entries));
-  // Prefetch immediately, regardless of executor availability, if possible
-  return impl_->file->WillNeed(ranges);
+  return impl_->Cache(std::move(ranges));
}

Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
-  if (range.length == 0) {
-    static const uint8_t byte = 0;
-    return std::make_shared<Buffer>(&byte, 0);
-  }
-
-  const auto it = std::lower_bound(
-      impl_->entries.begin(), impl_->entries.end(), range,
-      [](const RangeCacheEntry& entry, const ReadRange& range) {
-        return entry.range.offset + entry.range.length < range.offset + range.length;
-      });
-  if (it != impl_->entries.end() && it->range.Contains(range)) {
-    ARROW_ASSIGN_OR_RAISE(auto buf, it->future.result());
-    return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
-  }
-  return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  return impl_->Read(range);
}

-Future<> ReadRangeCache::Wait() {
-  std::vector<Future<>> futures;
-  for (const auto& entry : impl_->entries) {
-    futures.emplace_back(entry.future);
-  }
-  return AllComplete(futures);
+Future<> ReadRangeCache::Wait() { return impl_->Wait(); }
+
+Future<> ReadRangeCache::WaitFor(std::vector<ReadRange> ranges) {
+  return impl_->WaitFor(std::move(ranges));
}

} // namespace internal
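To make the usage pattern discussed in the review thread above concrete, here is a minimal sketch (not part of the patch) of the single-threaded flow the author describes: one thread calls Cache(), then WaitFor() and Read(). The function name and the byte ranges are invented for illustration.

// Minimal sketch of the lazy usage pattern: one thread calls Cache(), then
// WaitFor()/Read(). The ranges below are arbitrary illustration values.
#include <memory>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ReadWithLazyCache(std::shared_ptr<arrow::io::RandomAccessFile> file) {
  arrow::io::internal::ReadRangeCache cache(file, arrow::io::IOContext{},
                                            arrow::io::CacheOptions::LazyDefaults());

  // Declare the ranges we expect to read; they are coalesced now, but with
  // lazy=true no I/O is issued yet.
  ARROW_RETURN_NOT_OK(cache.Cache({{0, 1024}, {1024, 4096}, {1 << 20, 8192}}));

  // Trigger and await the I/O for the first range only.
  ARROW_RETURN_NOT_OK(cache.WaitFor({{0, 1024}}).status());

  // Retrieve the data; a Read() must fall within a single cached entry.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> buf, cache.Read({0, 1024}));
  (void)buf;
  return arrow::Status::OK();
}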
39 changes: 32 additions & 7 deletions cpp/src/arrow/io/caching.h
@@ -34,17 +34,19 @@ struct ARROW_EXPORT CacheOptions {
static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;

-  /// /brief The maximum distance in bytes between two consecutive
+  /// \brief The maximum distance in bytes between two consecutive
  ///   ranges; beyond this value, ranges are not combined
  int64_t hole_size_limit;
-  /// /brief The maximum size in bytes of a combined range; if
+  /// \brief The maximum size in bytes of a combined range; if
  ///   combining two consecutive ranges would produce a range of a
  ///   size greater than this, they are not combined
  int64_t range_size_limit;
+  /// \brief A lazy cache does not perform any I/O until requested.
+  bool lazy;

bool operator==(const CacheOptions& other) const {
return hole_size_limit == other.hole_size_limit &&
-           range_size_limit == other.range_size_limit;
+           range_size_limit == other.range_size_limit && lazy == other.lazy;
}

/// \brief Construct CacheOptions from network storage metrics (e.g. S3).
@@ -67,16 +69,34 @@
int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);

static CacheOptions Defaults();
static CacheOptions LazyDefaults();
};

namespace internal {

/// \brief A read cache designed to hide IO latencies when reading.
///
-/// To use this, you must first pass it the ranges you'll need in the future.
-/// The cache will combine those ranges according to parameters (see constructor)
-/// and start fetching the combined ranges in the background.
-/// You can then individually fetch them using Read().
+/// This class takes multiple byte ranges that an application expects to read, and
+/// coalesces them into fewer, larger read requests, which benefits performance on some
+/// filesystems, particularly remote ones like Amazon S3. By default, it also issues
+/// these read requests in parallel up front.
+///
+/// To use:
+/// 1. Cache() the ranges you expect to read in the future. Ideally, these ranges have
+///    the exact offset and length that will later be read. The cache will combine those
+///    ranges according to parameters (see constructor).
+///
+///    By default, the cache will also start fetching the combined ranges in parallel in
+///    the background, unless CacheOptions.lazy is set.
+///
+/// 2. Call WaitFor() to be notified when the given ranges have been read. If
+///    CacheOptions.lazy is set, I/O will be triggered in the background here instead.
+///    This can be done in parallel (e.g. if parsing a file, call WaitFor() for each
+///    chunk of the file that can be parsed in parallel).
+///
+/// 3. Call Read() to retrieve the actual data for the given ranges.
+///    A synchronous application may skip WaitFor() and just call Read() - it will still
+///    benefit from coalescing and parallel fetching.
class ARROW_EXPORT ReadRangeCache {
public:
static constexpr int64_t kDefaultHoleSizeLimit = 8192;
@@ -103,8 +123,13 @@ class ARROW_EXPORT ReadRangeCache {
/// \brief Wait until all ranges added so far have been cached.
Future<> Wait();

/// \brief Wait until all given ranges have been cached.
Future<> WaitFor(std::vector<ReadRange> ranges);

protected:
struct Impl;
struct LazyImpl;

std::unique_ptr<Impl> impl_;
};

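For reference, a short sketch (again, not part of the patch) of the three ways to obtain CacheOptions after this change; the metric values passed to MakeFromNetworkMetrics are invented.

// Sketch of the three ways to construct CacheOptions after this change.
#include "arrow/io/caching.h"

void ConfigureCacheOptions() {
  // Eager defaults: reads start as soon as Cache() is called.
  arrow::io::CacheOptions eager = arrow::io::CacheOptions::Defaults();

  // Lazy defaults: same coalescing limits, but I/O is deferred until a
  // Read()/WaitFor() touches the entry.
  arrow::io::CacheOptions lazy = arrow::io::CacheOptions::LazyDefaults();

  // Derive coalescing limits from storage metrics, e.g. a remote store with
  // roughly 50 ms time-to-first-byte and 100 MiB/s bandwidth (made-up values).
  arrow::io::CacheOptions tuned = arrow::io::CacheOptions::MakeFromNetworkMetrics(
      /*time_to_first_byte_millis=*/50, /*transfer_bandwidth_mib_per_sec=*/100);

  (void)eager;
  (void)lazy;
  (void)tuned;
}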
10 changes: 8 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
@@ -357,10 +357,16 @@ struct ReadRangeCombiner {
    // Remove zero-sized ranges
    auto end = std::remove_if(ranges.begin(), ranges.end(),
                              [](const ReadRange& range) { return range.length == 0; });
-    ranges.resize(end - ranges.begin());
    // Sort in position order
-    std::sort(ranges.begin(), ranges.end(),
+    std::sort(ranges.begin(), end,
              [](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });
+    // Remove ranges that overlap 100%
+    end = std::unique(ranges.begin(), end,
+                      [](const ReadRange& left, const ReadRange& right) {
+                        return right.offset >= left.offset &&
+                               right.offset + right.length <= left.offset + left.length;
+                      });
+    ranges.resize(end - ranges.begin());

// Skip further processing if ranges is empty after removing zero-sized ranges.
if (ranges.empty()) {
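The new deduplication pass relies only on the standard library, so its effect can be shown with a self-contained demo that uses a simplified stand-in for arrow::io::ReadRange and hypothetical offsets:

// Self-contained demo (standard C++ only) of the updated pipeline above:
// drop empty ranges, sort by offset, then drop any range wholly contained
// in the previously kept one.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Range { int64_t offset; int64_t length; };  // simplified ReadRange stand-in

int main() {
  std::vector<Range> ranges = {{10, 5}, {0, 0}, {0, 10}, {12, 2}, {100, 10}};

  // Remove zero-sized ranges
  auto end = std::remove_if(ranges.begin(), ranges.end(),
                            [](const Range& r) { return r.length == 0; });
  // Sort in position order
  std::sort(ranges.begin(), end,
            [](const Range& a, const Range& b) { return a.offset < b.offset; });
  // Remove ranges that overlap 100% (right nested inside the last kept range)
  end = std::unique(ranges.begin(), end, [](const Range& left, const Range& right) {
    return right.offset >= left.offset &&
           right.offset + right.length <= left.offset + left.length;
  });
  ranges.resize(end - ranges.begin());

  // Prints: [0,10) [10,15) [100,110)  -- {0,0} was empty and {12,2} was
  // nested inside {10,5}, so both were dropped.
  for (const auto& r : ranges) {
    std::printf("[%lld,%lld) ", static_cast<long long>(r.offset),
                static_cast<long long>(r.offset + r.length));
  }
  return 0;
}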