Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -996,10 +996,11 @@ class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
int64_t size = kNoSize)
const io::CoalesceOptions& coalesceOptions, int64_t size = kNoSize)
: client_(std::move(client)),
io_context_(io_context),
path_(path),
coalesceOptions(coalesceOptions),
content_length_(size) {}

Status Init() {
Expand Down Expand Up @@ -1128,6 +1129,14 @@ class ObjectInputFile final : public io::RandomAccessFile {
return bytes_read;
}

std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
const io::IOContext& ctx, const std::vector<io::ReadRange>& ranges) override {
return RandomAccessFile::ReadManyAsync(
ctx, io::internal::CoalesceReadRanges(std::move(ranges),
coalesceOptions.hole_size_limit,
coalesceOptions.range_size_limit));
Comment on lines +1135 to +1137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since coalescing may merge ranges, you'll need to post-process things so that the returned futures map 1:1 with the original ranges. (May need to refactor things so you don't have to do the coalescing step twice?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks David, I'm writing a test for verifying these cases.

}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
pos_ += buffer->size();
Expand All @@ -1138,6 +1147,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
std::shared_ptr<Aws::S3::S3Client> client_;
const io::IOContext io_context_;
S3Path path_;
io::CoalesceOptions coalesceOptions;

bool closed_ = false;
int64_t pos_ = 0;
Expand Down Expand Up @@ -2178,7 +2188,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path,
builder_.options().coalesceOptions);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand All @@ -2196,8 +2207,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
auto ptr = std::make_shared<ObjectInputFile>(
client_, fs->io_context(), path, builder_.options().coalesceOptions, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "arrow/filesystem/filesystem.h"
#include "arrow/io/caching.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

Expand Down Expand Up @@ -167,6 +168,12 @@ struct ARROW_EXPORT S3Options {
/// delay between retries.
std::shared_ptr<S3RetryStrategy> retry_strategy;

/// Coalesce options for requesting multiple reads at once
io::CoalesceOptions coalesceOptions = io::CoalesceOptions::Defaults();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: follow the naming conventions (snake_case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, btw I'll use here io::CoalesceOptions::MakeFromNetworkMetrics(5, 500) as default value (i.e. hole_size_limit = 2.5 MiB and range_size_limit = 22.5 MiB)


int64_t hole_size_limit = 1;
int64_t range_size_limit = 100;
Comment on lines +174 to +175
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need these separately?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo, I forgot to delete those fields


S3Options();

/// Configure with the default AWS credentials provider chain.
Expand Down
27 changes: 13 additions & 14 deletions cpp/src/arrow/io/caching.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,21 @@
namespace arrow {
namespace io {

CacheOptions CacheOptions::Defaults() {
return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
/*lazy=*/false};
CoalesceOptions CoalesceOptions::Defaults() {
return CoalesceOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
/*lazy=*/false};
}

CacheOptions CacheOptions::LazyDefaults() {
return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
/*lazy=*/true};
CoalesceOptions CoalesceOptions::LazyDefaults() {
return CoalesceOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
/*lazy=*/true};
}

CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
int64_t transfer_bandwidth_mib_per_sec,
double ideal_bandwidth_utilization_frac,
int64_t max_ideal_request_size_mib) {
CoalesceOptions CoalesceOptions::MakeFromNetworkMetrics(
int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
double ideal_bandwidth_utilization_frac, int64_t max_ideal_request_size_mib) {
//
// The I/O coalescing algorithm uses two parameters:
// 1. hole_size_limit (a.k.a max_io_gap): Max I/O gap/hole size in bytes
Expand Down Expand Up @@ -147,7 +146,7 @@ struct ReadRangeCache::Impl {
std::shared_ptr<RandomAccessFile> owned_file;
RandomAccessFile* file;
IOContext ctx;
CacheOptions options;
CoalesceOptions options;

// Ordered by offset (so as to find a matching region by binary search)
std::vector<RangeCacheEntry> entries;
Expand Down Expand Up @@ -292,7 +291,7 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,
RandomAccessFile* file, IOContext ctx,
CacheOptions options)
CoalesceOptions options)
: impl_(options.lazy ? new LazyImpl() : new Impl()) {
impl_->owned_file = std::move(owned_file);
impl_->file = file;
Expand Down
20 changes: 11 additions & 9 deletions cpp/src/arrow/io/caching.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace arrow {
namespace io {

struct ARROW_EXPORT CacheOptions {
struct ARROW_EXPORT CoalesceOptions {
static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;

Expand All @@ -44,7 +44,7 @@ struct ARROW_EXPORT CacheOptions {
/// \brief A lazy cache does not perform any I/O until requested.
bool lazy;

bool operator==(const CacheOptions& other) const {
bool operator==(const CoalesceOptions& other) const {
return hole_size_limit == other.hole_size_limit &&
range_size_limit == other.range_size_limit && lazy == other.lazy;
}
Expand All @@ -63,15 +63,17 @@ struct ARROW_EXPORT CacheOptions {
/// to maximize the net data load.
/// The value is a positive integer.
/// \return A new instance of CacheOptions.
static CacheOptions MakeFromNetworkMetrics(
static CoalesceOptions MakeFromNetworkMetrics(
int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac,
int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);

static CacheOptions Defaults();
static CacheOptions LazyDefaults();
static CoalesceOptions Defaults();
static CoalesceOptions LazyDefaults();
};

using CacheOptions [[deprecated]] = CoalesceOptions;

namespace internal {

/// \brief A read cache designed to hide IO latencies when reading.
Expand Down Expand Up @@ -104,15 +106,15 @@ class ARROW_EXPORT ReadRangeCache {

/// Construct a read cache with default
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
: ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {}
: ReadRangeCache(file, file.get(), std::move(ctx), CoalesceOptions::Defaults()) {}

/// Construct a read cache with given options
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
CacheOptions options)
CoalesceOptions options)
: ReadRangeCache(file, file.get(), ctx, options) {}

/// Construct a read cache with an unowned file
ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
ReadRangeCache(RandomAccessFile* file, IOContext ctx, CoalesceOptions options)
: ReadRangeCache(NULLPTR, file, ctx, options) {}

~ReadRangeCache();
Expand All @@ -137,7 +139,7 @@ class ARROW_EXPORT ReadRangeCache {
struct LazyImpl;

ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
IOContext ctx, CacheOptions options);
IOContext ctx, CoalesceOptions options);

std::unique_ptr<Impl> impl_;
};
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,22 @@ TEST_F(TestReadableFile, ReadAsync) {
AssertBufferEqual(*buf2, "test");
}

TEST_F(TestReadableFile, ReadManyAsync) {
MakeTestFile();
OpenFile();

std::vector<ReadRange> ranges = {{1, 3}, {2, 5}, {4, 2}};
auto futs = file_->ReadManyAsync(std::move(ranges));

ASSERT_EQ(futs.size(), 3);
ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result());
ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result());
ASSERT_OK_AND_ASSIGN(auto buf3, futs[2].result());
AssertBufferEqual(*buf1, "est");
AssertBufferEqual(*buf2, "stdat");
AssertBufferEqual(*buf3, "da");
}

TEST_F(TestReadableFile, SeekingRequired) {
MakeTestFile();
OpenFile();
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
return ReadAsync(io_context(), position, nbytes);
}

std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const IOContext&, const std::vector<ReadRange>& ranges) {
std::vector<Future<std::shared_ptr<Buffer>>> ret;
for (auto r : ranges) {
ret.push_back(this->ReadAsync(r.offset, r.length));
}
return ret;
}

std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const std::vector<ReadRange>& ranges) {
return ReadManyAsync(io_context(), ranges);
}

// Default WillNeed() implementation: no-op
Status RandomAccessFile::WillNeed(const std::vector<ReadRange>& ranges) {
return Status::OK();
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,27 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Explicit multi-read.
/// \brief Request multiple reads at once
///
/// The underlying filesystem may optimize these reads by coalescing small reads into
/// large reads or by breaking up large reads into multiple parallel smaller reads. The
/// reads should be issued in parallel if it makes sense for the filesystem.
///
/// One future will be returned for each input read range. Multiple returned futures
/// may correspond to a single read. Or, a single returned future may be a combined
/// result of several individual reads.
///
/// \param[in] ranges The ranges to read
/// \return A future that will complete with the data from the requested range is
/// available
virtual std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
const IOContext&, const std::vector<ReadRange>& ranges);

/// EXPERIMENTAL: Explicit multi-read, using the file's IOContext.
std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
const std::vector<ReadRange>& ranges);

/// EXPERIMENTAL: Inform that the given ranges may be read soon.
///
/// Some implementations might arrange to prefetch some of the data.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct FileMode {
};

struct IOContext;
struct CacheOptions;
struct CoalesceOptions;

/// EXPERIMENTAL: convenience global singleton for default IOContext settings
ARROW_EXPORT
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ struct ARROW_EXPORT IpcReadOptions {
/// \brief Options to control caching behavior when pre-buffering is requested
///
/// The lazy property will always be reset to true to deliver the expected behavior
io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults();
io::CoalesceOptions pre_buffer_cache_options = io::CoalesceOptions::LazyDefaults();

static IpcReadOptions Defaults();
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {

Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
const bool coalesce, const io::IOContext& io_context,
const io::CacheOptions cache_options,
const io::CoalesceOptions cache_options,
arrow::internal::Executor* executor) override {
auto state = std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
// Prebuffering causes us to use a lot of futures which, at the moment,
Expand Down Expand Up @@ -1537,7 +1537,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
owned_file(std::move(owned_file)),
loader(batch, context.metadata_version, context.options, block_data_offset),
columns(schema->num_fields()),
cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
cache(file, file->io_context(), io::CoalesceOptions::LazyDefaults()),
length(batch->length()) {}

Status CalculateLoadRequest() {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class ARROW_EXPORT RecordBatchFileReader
virtual Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
const bool coalesce = false,
const io::IOContext& io_context = io::default_io_context(),
const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(),
const io::CoalesceOptions cache_options = io::CoalesceOptions::LazyDefaults(),
arrow::internal::Executor* executor = NULLPTR) = 0;
};

Expand Down