Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 0 additions & 107 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <cstdint>
#include <utility>

#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
Expand Down Expand Up @@ -171,112 +170,6 @@ MutableBuffer::MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_
parent_ = parent;
}

// -----------------------------------------------------------------------
// Pool buffer and allocation

/// A resizable buffer whose storage is obtained from, and handed back to, a
/// specific MemoryPool.
class PoolBuffer final : public ResizableBuffer {
 public:
  explicit PoolBuffer(std::shared_ptr<MemoryManager> mm, MemoryPool* pool)
      : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {}

  /// Return the current allocation (if there is one) to the owning pool.
  ~PoolBuffer() override {
    if (uint8_t* storage = mutable_data()) {
      pool_->Free(storage, capacity_);
    }
  }

  /// Make sure at least `capacity` bytes are allocated. Never shrinks.
  ///
  /// The requested capacity is passed through BitUtil::RoundUpToMultipleOf64
  /// before allocating. Returns Status::Invalid when `capacity` is negative.
  Status Reserve(const int64_t capacity) override {
    if (capacity < 0) {
      return Status::Invalid("Negative buffer capacity: ", capacity);
    }
    uint8_t* storage = mutable_data();
    const bool has_storage = (storage != nullptr);
    if (has_storage && capacity <= capacity_) {
      // The existing allocation is already big enough.
      return Status::OK();
    }
    const int64_t rounded_capacity = BitUtil::RoundUpToMultipleOf64(capacity);
    if (has_storage) {
      RETURN_NOT_OK(pool_->Reallocate(capacity_, rounded_capacity, &storage));
    } else {
      RETURN_NOT_OK(pool_->Allocate(rounded_capacity, &storage));
    }
    data_ = storage;
    capacity_ = rounded_capacity;
    return Status::OK();
  }

  /// Set the logical size to `new_size`.
  ///
  /// When the buffer already has storage, `shrink_to_fit` is set and the size is
  /// not growing, the allocation is trimmed to the rounded-up `new_size`.
  /// In every other case the call only guarantees enough capacity via Reserve().
  /// Returns Status::Invalid when `new_size` is negative.
  Status Resize(const int64_t new_size, bool shrink_to_fit = true) override {
    if (ARROW_PREDICT_FALSE(new_size < 0)) {
      return Status::Invalid("Negative buffer resize: ", new_size);
    }
    uint8_t* storage = mutable_data();
    const bool may_shrink = (storage != nullptr) && shrink_to_fit && new_size <= size_;
    if (!may_shrink) {
      RETURN_NOT_OK(Reserve(new_size));
    } else {
      const int64_t fitted_capacity = BitUtil::RoundUpToMultipleOf64(new_size);
      if (fitted_capacity != capacity_) {
        // The allocation does not yet have exactly the fitted capacity.
        RETURN_NOT_OK(pool_->Reallocate(capacity_, fitted_capacity, &storage));
        data_ = storage;
        capacity_ = fitted_capacity;
      }
    }
    size_ = new_size;

    return Status::OK();
  }

  /// Create an empty shared PoolBuffer; a null `pool` selects the default pool
  /// together with the default CPU memory manager.
  static std::shared_ptr<PoolBuffer> MakeShared(MemoryPool* pool) {
    std::shared_ptr<MemoryManager> manager;
    if (pool != nullptr) {
      manager = CPUDevice::memory_manager(pool);
    } else {
      pool = default_memory_pool();
      manager = default_cpu_memory_manager();
    }
    return std::make_shared<PoolBuffer>(std::move(manager), pool);
  }

  /// Create an empty uniquely-owned PoolBuffer; a null `pool` selects the
  /// default pool together with the default CPU memory manager.
  static std::unique_ptr<PoolBuffer> MakeUnique(MemoryPool* pool) {
    std::shared_ptr<MemoryManager> manager;
    if (pool != nullptr) {
      manager = CPUDevice::memory_manager(pool);
    } else {
      pool = default_memory_pool();
      manager = default_cpu_memory_manager();
    }
    return std::unique_ptr<PoolBuffer>(new PoolBuffer(std::move(manager), pool));
  }

 private:
  // Pool that owns the allocation; not owned by this buffer.
  MemoryPool* pool_;
};

namespace {
// A utility that does most of the work of the `AllocateBuffer` and
// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to
// a PoolBuffer.
template <typename BufferPtr, typename PoolBufferPtr>
inline Result<BufferPtr> ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) {
RETURN_NOT_OK(buffer->Resize(size));
buffer->ZeroPadding();
return std::move(buffer);
}

} // namespace

// Allocate a pool-backed buffer of `size` bytes with zeroed padding.
// A null `pool` falls back to the default memory pool.
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size, MemoryPool* pool) {
  auto pool_buffer = PoolBuffer::MakeUnique(pool);
  return ResizePoolBuffer<std::unique_ptr<Buffer>>(std::move(pool_buffer), size);
}

// Allocate a pool-backed resizable buffer of `size` bytes with zeroed padding.
// A null `pool` falls back to the default memory pool.
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(const int64_t size,
                                                                 MemoryPool* pool) {
  auto pool_buffer = PoolBuffer::MakeUnique(pool);
  return ResizePoolBuffer<std::unique_ptr<ResizableBuffer>>(std::move(pool_buffer),
                                                            size);
}

Result<std::shared_ptr<Buffer>> AllocateBitmap(int64_t length, MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateBuffer(BitUtil::BytesForBits(length), pool));
// Zero out any trailing bits
Expand Down
104 changes: 80 additions & 24 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ static inline Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
return reader;
}

/// \brief Asynchronously open an IPC file reader on `source`.
///
/// Opening the underlying input happens synchronously; a failure there is
/// returned as an already-failed future. Errors reported by the asynchronous
/// open are rewritten so that the message names the source path.
static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsync(
    const FileSource& source,
    const ipc::IpcReadOptions& options = default_read_options()) {
  using ReaderPtr = std::shared_ptr<ipc::RecordBatchFileReader>;

  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
  auto path = source.path();

  // Success: forward the reader unchanged.
  auto on_success = [](const ReaderPtr& reader) -> Result<ReaderPtr> { return reader; };
  // Failure: keep the status code but prefix the message with the source path.
  auto on_failure = [path](const Status& status) -> Result<ReaderPtr> {
    return status.WithMessage("Could not open IPC input source '", path,
                              "': ", status.message());
  };

  return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
      .Then(std::move(on_success), std::move(on_failure));
}

static inline Result<std::vector<int>> GetIncludedFields(
const Schema& schema, const std::vector<std::string>& materialized_fields) {
std::vector<int> included_fields;
Expand All @@ -73,6 +88,26 @@ static inline Result<std::vector<int>> GetIncludedFields(
return included_fields;
}

/// \brief Build the IpcReadOptions used to read one IPC file during a scan.
///
/// Starts from the user's IpcFragmentScanOptions::options when present
/// (otherwise from default_read_options()) and then overrides the fields that
/// the scan itself dictates:
///  - memory_pool is taken from `scan_options.pool`;
///  - use_threads is forced to false, as the inline option handling this helper
///    replaces did and as IpcFragmentScanOptions documents ("use_threads is
///    ignored");
///  - included_fields is derived from the fields the scan materializes; a
///    warning is logged if the user had set it.
///
/// \param[in] schema the schema of the file being read
/// \param[in] format supplies the default fragment scan options
/// \param[in] scan_options the scan's options
/// \return the effective read options, or an error if the fragment scan options
/// or the included fields cannot be resolved
static inline Result<ipc::IpcReadOptions> GetReadOptions(
    const Schema& schema, const FileFormat& format, const ScanOptions& scan_options) {
  ARROW_ASSIGN_OR_RAISE(
      auto ipc_scan_options,
      GetFragmentScanOptions<IpcFragmentScanOptions>(
          kIpcTypeName, &scan_options, format.default_fragment_scan_options));
  auto options =
      ipc_scan_options->options ? *ipc_scan_options->options : default_read_options();
  options.memory_pool = scan_options.pool;
  // User-supplied read options must not be able to turn on reader-level threading.
  options.use_threads = false;
  if (!options.included_fields.empty()) {
    // Cannot set them here
    ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
                          "but will be ignored; included_fields are derived from "
                          "fields referenced by the scan";
  }
  ARROW_ASSIGN_OR_RAISE(options.included_fields,
                        GetIncludedFields(schema, scan_options.MaterializedFields()));
  return options;
}

/// \brief A ScanTask backed by an Ipc file.
class IpcScanTask : public ScanTask {
public:
Expand All @@ -83,28 +118,11 @@ class IpcScanTask : public ScanTask {
Result<RecordBatchIterator> Execute() override {
struct Impl {
static Result<RecordBatchIterator> Make(const FileSource& source,
FileFormat* format,
const ScanOptions* scan_options) {
const FileFormat& format,
const ScanOptions& scan_options) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));

ARROW_ASSIGN_OR_RAISE(
auto ipc_scan_options,
GetFragmentScanOptions<IpcFragmentScanOptions>(
kIpcTypeName, scan_options, format->default_fragment_scan_options));
auto options = ipc_scan_options->options ? *ipc_scan_options->options
: default_read_options();
options.memory_pool = scan_options->pool;
options.use_threads = false;
if (!options.included_fields.empty()) {
// Cannot set them here
ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
"but will be ignored; included_fields are derived from "
"fields referenced by the scan";
}
ARROW_ASSIGN_OR_RAISE(
options.included_fields,
GetIncludedFields(*reader->schema(), scan_options->MaterializedFields()));

ARROW_ASSIGN_OR_RAISE(auto options,
GetReadOptions(*reader->schema(), format, scan_options));
ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
return RecordBatchIterator(Impl{std::move(reader), 0});
}
Expand All @@ -121,9 +139,9 @@ class IpcScanTask : public ScanTask {
int i_;
};

return Impl::Make(
source_, internal::checked_pointer_cast<FileFragment>(fragment_)->format().get(),
options_.get());
return Impl::Make(source_,
*internal::checked_pointer_cast<FileFragment>(fragment_)->format(),
*options_);
}

private:
Expand Down Expand Up @@ -173,6 +191,44 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
return IpcScanTaskIterator::Make(options, fragment);
}

/// Asynchronously scan one IPC file fragment as a generator of record batches.
///
/// The file is opened once to learn its schema, reopened with read options
/// derived from that schema and the scan options, and then wrapped in a
/// (optionally coalescing) record batch generator with readahead applied.
Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& options,
    const std::shared_ptr<FileFragment>& file) const {
  using ReaderPtr = std::shared_ptr<ipc::RecordBatchFileReader>;

  auto self = shared_from_this();
  auto source = file->source();
  auto open_reader = OpenReaderAsync(source);

  // Second open: now that the schema is known, compute the effective read
  // options (notably the included fields) and open the file again with them.
  auto reopen_reader = [self, options, source](ReaderPtr reader) -> Future<ReaderPtr> {
    ARROW_ASSIGN_OR_RAISE(auto read_options,
                          GetReadOptions(*reader->schema(), *self, *options));
    return OpenReader(source, read_options);
  };

  auto readahead_level = options->batch_readahead;
  auto default_fragment_scan_options = this->default_fragment_scan_options;

  // Turn the reopened reader into a batch generator.
  auto open_generator = [options, default_fragment_scan_options, readahead_level](
                            const ReaderPtr& reader) -> Result<RecordBatchGenerator> {
    ARROW_ASSIGN_OR_RAISE(
        auto ipc_scan_options,
        GetFragmentScanOptions<IpcFragmentScanOptions>(kIpcTypeName, options.get(),
                                                       default_fragment_scan_options));

    RecordBatchGenerator generator;
    const bool coalesce_reads = (ipc_scan_options->cache_options != nullptr);
    if (!coalesce_reads) {
      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
                                           /*coalesce=*/false, options->io_context));
    } else {
      // Transferring helps performance when coalescing
      ARROW_ASSIGN_OR_RAISE(
          generator, reader->GetRecordBatchGenerator(
                         /*coalesce=*/true, options->io_context,
                         *ipc_scan_options->cache_options, internal::GetCpuThreadPool()));
    }
    return MakeReadaheadGenerator(std::move(generator), readahead_level);
  };

  auto reopened = open_reader.Then(reopen_reader);
  return MakeFromFuture(reopened.Then(open_generator));
}

Future<util::optional<int64_t>> IpcFileFormat::CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
std::shared_ptr<ScanOptions> options) {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/type_fwd.h"
#include "arrow/ipc/type_fwd.h"
#include "arrow/result.h"

Expand Down Expand Up @@ -56,6 +57,10 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const override;

Future<util::optional<int64_t>> CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
std::shared_ptr<ScanOptions> options) override;
Expand All @@ -75,6 +80,9 @@ class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions {
/// Options passed to the IPC file reader.
/// included_fields, memory_pool, and use_threads are ignored.
std::shared_ptr<ipc::IpcReadOptions> options;
/// If present, the async scanner will enable I/O coalescing.
/// This is ignored by the sync scanner.
std::shared_ptr<io::CacheOptions> cache_options;
};

class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ Result<std::shared_ptr<Buffer>> BufferReader::DoReadAt(int64_t position, int64_t
DCHECK_GE(nbytes, 0);

// Arrange for data to be paged in
RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
{{const_cast<uint8_t*>(data_ + position), static_cast<size_t>(nbytes)}}));
// RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
// {{const_cast<uint8_t*>(data_ + position), static_cast<size_t>(nbytes)}}));

if (nbytes > 0 && buffer_ != nullptr) {
return SliceBuffer(buffer_, position, nbytes);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct FileMode {
};

struct IOContext;
struct CacheOptions;

/// EXPERIMENTAL: convenience global singleton for default IOContext settings
ARROW_EXPORT
Expand Down
55 changes: 55 additions & 0 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/endian.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"

Expand Down Expand Up @@ -324,6 +325,60 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
}
}

Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
const io::IOContext& context) {
struct State {
std::unique_ptr<Message> result;
std::shared_ptr<MessageDecoderListener> listener;
std::shared_ptr<MessageDecoder> decoder;
};
auto state = std::make_shared<State>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since State is already held in a shared_ptr, do result, listener, and decoder need to be separate allocations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately yes, since MessageDecoder takes shared_ptr/unique_ptr directly.

state->listener = std::make_shared<AssignMessageDecoderListener>(&state->result);
state->decoder = std::make_shared<MessageDecoder>(state->listener);

if (metadata_length < state->decoder->next_required_size()) {
return Status::Invalid("metadata_length should be at least ",
state->decoder->next_required_size());
}
return file->ReadAsync(context, offset, metadata_length + body_length)
.Then([=](std::shared_ptr<Buffer> metadata) -> Result<std::shared_ptr<Message>> {
if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
" metadata bytes but got ", metadata->size());
}
ARROW_RETURN_NOT_OK(
state->decoder->Consume(SliceBuffer(metadata, 0, metadata_length)));
switch (state->decoder->state()) {
case MessageDecoder::State::INITIAL:
return std::move(state->result);
case MessageDecoder::State::METADATA_LENGTH:
return Status::Invalid("metadata length is missing. File offset: ", offset,
", metadata length: ", metadata_length);
case MessageDecoder::State::METADATA:
return Status::Invalid("flatbuffer size ",
state->decoder->next_required_size(),
" invalid. File offset: ", offset,
", metadata length: ", metadata_length);
case MessageDecoder::State::BODY: {
auto body = SliceBuffer(metadata, metadata_length, body_length);
if (body->size() < state->decoder->next_required_size()) {
return Status::IOError("Expected to be able to read ",
state->decoder->next_required_size(),
" bytes for message body, got ", body->size());
}
RETURN_NOT_OK(state->decoder->Consume(body));
return std::move(state->result);
}
case MessageDecoder::State::EOS:
return Status::Invalid("Unexpected empty message in IPC file format");
default:
return Status::Invalid("Unexpected state: ", state->decoder->state());
}
});
}

Status AlignStream(io::InputStream* stream, int32_t alignment) {
ARROW_ASSIGN_OR_RAISE(int64_t position, stream->Tell());
return stream->Advance(PaddedLength(position, alignment) - position);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,11 @@ Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
const int32_t metadata_length,
io::RandomAccessFile* file);

/// \brief Asynchronously read an IPC message (metadata and body) from a
/// random-access file
///
/// Issues a single asynchronous read of metadata_length + body_length bytes
/// starting at offset, then decodes the message from the returned buffer.
///
/// \param[in] offset the position in the file where the message starts
/// \param[in] metadata_length the number of bytes of message metadata at offset
/// \param[in] body_length the number of bytes of message body that follow the
/// metadata
/// \param[in] file the random-access file to read from
/// \param[in] context the IOContext passed to the file's ReadAsync
/// \return a Future that yields the decoded Message, or an error Status if the
/// read is short or the message cannot be decoded
ARROW_EXPORT
Future<std::shared_ptr<Message>> ReadMessageAsync(
const int64_t offset, const int32_t metadata_length, const int64_t body_length,
io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context());

/// \brief Advance stream to an 8-byte offset if its position is not a multiple
/// of 8 already
/// \param[in] stream an input stream
Expand Down
Loading