Skip to content
9 changes: 6 additions & 3 deletions cpp/src/arrow/io/caching.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ struct RangeCacheEntry {
};

struct ReadRangeCache::Impl {
std::shared_ptr<RandomAccessFile> file;
std::shared_ptr<RandomAccessFile> owned_file;
RandomAccessFile* file;
IOContext ctx;
CacheOptions options;

Expand Down Expand Up @@ -289,10 +290,12 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
}
};

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,
RandomAccessFile* file, IOContext ctx,
CacheOptions options)
: impl_(options.lazy ? new LazyImpl() : new Impl()) {
impl_->file = std::move(file);
impl_->owned_file = std::move(owned_file);
impl_->file = file;
impl_->ctx = std::move(ctx);
impl_->options = options;
}
Expand Down
13 changes: 11 additions & 2 deletions cpp/src/arrow/io/caching.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,17 @@ class ARROW_EXPORT ReadRangeCache {

/// Construct a read cache with default
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
: ReadRangeCache(file, std::move(ctx), CacheOptions::Defaults()) {}
: ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {}

/// Construct a read cache with given options
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
CacheOptions options);
CacheOptions options)
: ReadRangeCache(file, file.get(), ctx, options) {}

/// Construct a read cache with an unowned file
ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
: ReadRangeCache(NULLPTR, file, ctx, options) {}

~ReadRangeCache();

/// \brief Cache the given ranges in the background.
Expand All @@ -130,6 +136,9 @@ class ARROW_EXPORT ReadRangeCache {
struct Impl;
struct LazyImpl;

ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
IOContext ctx, CacheOptions options);

std::unique_ptr<Impl> impl_;
};

Expand Down
61 changes: 54 additions & 7 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,50 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
return Status::OK();
}

Result<std::unique_ptr<Message>> ReadMessage(std::shared_ptr<Buffer> metadata,
std::shared_ptr<Buffer> body) {
std::unique_ptr<Message> result;
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
// If the user does not pass in a body buffer then we assume they are skipping it
MessageDecoder decoder(listener, default_memory_pool(), body == nullptr);

if (metadata->size() < decoder.next_required_size()) {
return Status::Invalid("metadata_length should be at least ",
decoder.next_required_size());
}

ARROW_RETURN_NOT_OK(decoder.Consume(metadata));

switch (decoder.state()) {
case MessageDecoder::State::INITIAL:
// Metadata did not request a body so we better not have provided one
DCHECK_EQ(body, nullptr);
return std::move(result);
case MessageDecoder::State::METADATA_LENGTH:
return Status::Invalid("metadata length is missing from the metadata buffer");
case MessageDecoder::State::METADATA:
return Status::Invalid("flatbuffer size ", decoder.next_required_size(),
" invalid. Buffer size: ", metadata->size());
case MessageDecoder::State::BODY: {
if (body == nullptr) {
// Caller didn't give a body so just give them a message without body
return std::move(result);
}
if (body->size() != decoder.next_required_size()) {
return Status::IOError("Expected body buffer to be ",
decoder.next_required_size(),
" bytes for message body, got ", body->size());
}
RETURN_NOT_OK(decoder.Consume(body));
return std::move(result);
}
case MessageDecoder::State::EOS:
return Status::Invalid("Unexpected empty message in IPC file format");
default:
return Status::Invalid("Unexpected state: ", decoder.state());
}
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
Expand Down Expand Up @@ -560,14 +604,15 @@ class MessageDecoder::MessageDecoderImpl {
public:
explicit MessageDecoderImpl(std::shared_ptr<MessageDecoderListener> listener,
State initial_state, int64_t initial_next_required_size,
MemoryPool* pool)
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
buffered_size_(0),
metadata_(nullptr) {}
metadata_(nullptr),
skip_body_(skip_body) {}

Status ConsumeData(const uint8_t* data, int64_t size) {
if (buffered_size_ == 0) {
Expand Down Expand Up @@ -798,7 +843,7 @@ class MessageDecoder::MessageDecoderImpl {
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata_, &body_length));

state_ = State::BODY;
next_required_size_ = body_length;
next_required_size_ = skip_body_ ? 0 : body_length;
RETURN_NOT_OK(listener_->OnBody());
if (next_required_size_ == 0) {
ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_));
Expand Down Expand Up @@ -894,19 +939,21 @@ class MessageDecoder::MessageDecoderImpl {
std::vector<std::shared_ptr<Buffer>> chunks_;
int64_t buffered_size_;
std::shared_ptr<Buffer> metadata_; // Must be CPU buffer
bool skip_body_;
};

MessageDecoder::MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
MemoryPool* pool) {
MemoryPool* pool, bool skip_body) {
impl_.reset(new MessageDecoderImpl(std::move(listener), State::INITIAL,
kMessageDecoderNextRequiredSizeInitial, pool));
kMessageDecoderNextRequiredSizeInitial, pool,
skip_body));
}

MessageDecoder::MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
State initial_state, int64_t initial_next_required_size,
MemoryPool* pool) {
MemoryPool* pool, bool skip_body) {
impl_.reset(new MessageDecoderImpl(std::move(listener), initial_state,
initial_next_required_size, pool));
initial_next_required_size, pool, skip_body));
}

MessageDecoder::~MessageDecoder() {}
Expand Down
26 changes: 24 additions & 2 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,11 @@ class ARROW_EXPORT MessageDecoder {
/// \param[in] listener a MessageDecoderListener that responds events from
/// the decoder
/// \param[in] pool an optional MemoryPool to copy metadata on the
/// \param[in] skip_body if true the body will be skipped even if the message has a body
/// CPU, if required
explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
MemoryPool* pool = default_memory_pool());
MemoryPool* pool = default_memory_pool(),
bool skip_body = false);

/// \brief Construct a message decoder with the specified state.
///
Expand All @@ -282,9 +284,10 @@ class ARROW_EXPORT MessageDecoder {
/// to run the next action
/// \param[in] pool an optional MemoryPool to copy metadata on the
/// CPU, if required
/// \param[in] skip_body if true the body will be skipped even if the message has a body
MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state,
int64_t initial_next_required_size,
MemoryPool* pool = default_memory_pool());
MemoryPool* pool = default_memory_pool(), bool skip_body = false);

virtual ~MessageDecoder();

Expand Down Expand Up @@ -466,6 +469,25 @@ Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});

/// \brief Read encapsulated RPC message from cached buffers
///
/// The buffers should contain an entire message. Partial reads are not handled.
///
/// This method can be used to read just the metadata by passing in a nullptr for the
/// body. The body will then be skipped and the body size will not be validated.
///
/// If the body buffer is provided then it must be the complete body buffer
///
/// This is similar to Message::Open but performs slightly more validation (e.g. checks
/// to see that the metadata length is correct and that the body is the size the metadata
/// expected)
///
/// \param metadata The bytes for the metadata
/// \param body The bytes for the body
/// \return The message represented by the buffers
ARROW_EXPORT Result<std::unique_ptr<Message>> ReadMessage(
std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);

ARROW_EXPORT
Future<std::shared_ptr<Message>> ReadMessageAsync(
const int64_t offset, const int32_t metadata_length, const int64_t body_length,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <vector>

#include "arrow/io/caching.h"
#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
Expand Down Expand Up @@ -148,6 +149,11 @@ struct ARROW_EXPORT IpcReadOptions {
/// RecordBatchStreamReader and StreamDecoder classes.
bool ensure_native_endian = true;

/// \brief Options to control caching behavior when pre-buffering is requested
///
/// The lazy property will always be reset to true to deliver the expected behavior
io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults();

static IpcReadOptions Defaults();
};

Expand Down
Loading