Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
} else {
// XXX: In the case where no outputs are omitted, is returning a 0-length
// array always the correct move?
return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
return MakeArrayOfNull(output_descr_.type, /*length=*/0, exec_ctx_->memory_pool())
.ValueOrDie();
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,8 @@ void StructFilter(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
std::shared_ptr<ArrayData> indices;
KERNEL_RETURN_IF_ERROR(
ctx,
GetTakeIndices(*batch[1].array(), FilterState::Get(ctx).null_selection_behavior)
GetTakeIndices(*batch[1].array(), FilterState::Get(ctx).null_selection_behavior,
ctx->memory_pool())
.Value(&indices));

Datum result;
Expand All @@ -1820,7 +1821,8 @@ Result<std::shared_ptr<RecordBatch>> FilterRecordBatch(const RecordBatch& batch,
const auto& filter_opts = *static_cast<const FilterOptions*>(options);
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<ArrayData> indices,
GetTakeIndices(*filter.array(), filter_opts.null_selection_behavior));
GetTakeIndices(*filter.array(), filter_opts.null_selection_behavior,
ctx->memory_pool()));
std::vector<std::shared_ptr<Array>> columns(batch.num_columns());
for (int i = 0; i < batch.num_columns(); ++i) {
ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
Result<std::shared_ptr<BufferOutputStream>> BufferOutputStream::Create(
int64_t initial_capacity, MemoryPool* pool) {
// ctor is private, so cannot use make_shared
DCHECK_NE(pool, nullptr);
auto ptr = std::shared_ptr<BufferOutputStream>(new BufferOutputStream);
RETURN_NOT_OK(ptr->Reset(initial_capacity, pool));
return ptr;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/feather.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <string>
#include <vector>

#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "arrow/util/visibility.h"

Expand Down
19 changes: 12 additions & 7 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -869,12 +869,12 @@ Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
Result<std::shared_ptr<Buffer>> WriteFBMessage(
FBB& fbb, flatbuf::MessageHeader header_type, flatbuffers::Offset<void> header,
int64_t body_length, MetadataVersion version,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata = nullptr) {
const std::shared_ptr<const KeyValueMetadata>& custom_metadata, MemoryPool* pool) {
auto message = flatbuf::CreateMessage(fbb, MetadataVersionToFlatbuffer(version),
header_type, header, body_length,
SerializeCustomMetadata(fbb, custom_metadata));
fbb.Finish(message);
return WriteFlatbufferBuilder(fbb);
return WriteFlatbufferBuilder(fbb, pool);
}

using FieldNodeVector =
Expand Down Expand Up @@ -1181,7 +1181,8 @@ Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& map
flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, mapper, &fb_schema));
return WriteFBMessage(fbb, flatbuf::MessageHeader::Schema, fb_schema.Union(),
/*body_length=*/0, options.metadata_version)
/*body_length=*/0, options.metadata_version,
/*custom_metadata=*/nullptr, options.memory_pool)
.Value(out);
}

Expand All @@ -1195,7 +1196,8 @@ Status WriteRecordBatchMessage(
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
body_length, options.metadata_version, custom_metadata)
body_length, options.metadata_version, custom_metadata,
options.memory_pool)
.Value(out);
}

Expand Down Expand Up @@ -1229,7 +1231,8 @@ Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer);

return WriteFBMessage(fbb, flatbuf::MessageHeader::Tensor, fb_tensor.Union(),
body_length, options.metadata_version);
body_length, options.metadata_version,
/*custom_metadata=*/nullptr, options.memory_pool);
}

Result<std::shared_ptr<Buffer>> WriteSparseTensorMessage(
Expand All @@ -1240,7 +1243,8 @@ Result<std::shared_ptr<Buffer>> WriteSparseTensorMessage(
RETURN_NOT_OK(
MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor));
return WriteFBMessage(fbb, flatbuf::MessageHeader::SparseTensor,
fb_sparse_tensor.Union(), body_length, options.metadata_version);
fb_sparse_tensor.Union(), body_length, options.metadata_version,
/*custom_metadata=*/nullptr, options.memory_pool);
}

Status WriteDictionaryMessage(
Expand All @@ -1255,7 +1259,8 @@ Status WriteDictionaryMessage(
auto dictionary_batch =
flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
body_length, options.metadata_version, custom_metadata)
body_length, options.metadata_version, custom_metadata,
options.memory_pool)
.Value(out);
}

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ Status WriteDictionaryMessage(
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
flatbuffers::FlatBufferBuilder& fbb) {
flatbuffers::FlatBufferBuilder& fbb, // NOLINT non-const reference
MemoryPool* pool = default_memory_pool()) {
int32_t size = fbb.GetSize();

ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(size));
ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(size, pool));

uint8_t* dst = result->mutable_data();
memcpy(dst, fbb.GetBufferPointer(), size);
Expand Down
4 changes: 0 additions & 4 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,6 @@ class MimallocAllocator {

} // namespace

MemoryPool::MemoryPool() {}

MemoryPool::~MemoryPool() {}

int64_t MemoryPool::max_memory() const { return -1; }

///////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MemoryPoolStats {
/// take care of the required 64-byte alignment.
class ARROW_EXPORT MemoryPool {
public:
virtual ~MemoryPool();
virtual ~MemoryPool() = default;

/// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool
static std::unique_ptr<MemoryPool> CreateDefault();
Expand Down Expand Up @@ -101,7 +101,7 @@ class ARROW_EXPORT MemoryPool {
virtual std::string backend_name() const = 0;

protected:
MemoryPool();
MemoryPool() = default;
};

class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
Expand Down
2 changes: 1 addition & 1 deletion r/man/enums.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions r/src/array_from_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ struct VectorToArrayConverter {

static std::shared_ptr<Array> Visit(SEXP x, const std::shared_ptr<DataType>& type) {
std::unique_ptr<ArrayBuilder> builder;
StopIfNotOk(MakeBuilder(arrow::default_memory_pool(), type, &builder));
StopIfNotOk(MakeBuilder(gc_memory_pool(), type, &builder));

VectorToArrayConverter converter{x, builder.get()};
StopIfNotOk(arrow::VisitTypeInline(*type, &converter));
Expand All @@ -342,7 +342,7 @@ std::shared_ptr<Array> MakeFactorArrayImpl(cpp11::integers factor,
auto n = factor.size();

std::shared_ptr<Buffer> indices_buffer =
ValueOrStop(AllocateBuffer(n * sizeof(value_type)));
ValueOrStop(AllocateBuffer(n * sizeof(value_type), gc_memory_pool()));

std::vector<std::shared_ptr<Buffer>> buffers{nullptr, indices_buffer};

Expand All @@ -357,7 +357,8 @@ std::shared_ptr<Array> MakeFactorArrayImpl(cpp11::integers factor,

if (i < n) {
// there are NA's so we need a null buffer
auto null_buffer = ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n)));
auto null_buffer =
ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool()));
internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n);

// catch up
Expand Down Expand Up @@ -1367,7 +1368,8 @@ std::shared_ptr<Array> MakeSimpleArray(SEXP x) {

auto first_na = std::find_if(p_vec_start, p_vec_end, is_na<value_type>);
if (first_na < p_vec_end) {
auto null_bitmap = ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n)));
auto null_bitmap =
ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool()));
internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n);

// first loop to clear all the bits before the first NA
Expand Down Expand Up @@ -1512,7 +1514,7 @@ std::shared_ptr<arrow::Array> Array__from_vector(

// Create ArrayBuilder for type
std::unique_ptr<arrow::ArrayBuilder> type_builder;
StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder));
StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder));
StopIfNotOk(converter->Init(type_builder.get()));

// ingest R data and grab the result array
Expand Down Expand Up @@ -1568,7 +1570,7 @@ std::shared_ptr<arrow::ChunkedArray> ChunkedArray__from_list(cpp11::list chunks,
if (n == 0) {
std::shared_ptr<arrow::Array> array;
std::unique_ptr<arrow::ArrayBuilder> type_builder;
StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder));
StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder));
StopIfNotOk(type_builder->Finish(&array));
vec.push_back(array);
} else {
Expand Down
6 changes: 3 additions & 3 deletions r/src/array_to_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ class Converter_Struct : public Converter {
auto struct_array = checked_cast<const arrow::StructArray*>(array.get());
int nf = converters.size();
// Flatten() deals with merging of nulls
auto arrays = ValueOrStop(struct_array->Flatten(default_memory_pool()));
auto arrays = ValueOrStop(struct_array->Flatten(gc_memory_pool()));
for (int i = 0; i < nf; i++) {
StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start,
n, chunk_index));
Expand Down Expand Up @@ -818,7 +818,7 @@ class Converter_List : public Converter {

// Build an empty array to match value_type
std::unique_ptr<arrow::ArrayBuilder> builder;
StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder));
StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder));

std::shared_ptr<arrow::Array> array;
StopIfNotOk(builder->Finish(&array));
Expand Down Expand Up @@ -869,7 +869,7 @@ class Converter_FixedSizeList : public Converter {

// Build an empty array to match value_type
std::unique_ptr<arrow::ArrayBuilder> builder;
StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder));
StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder));

std::shared_ptr<arrow::Array> array;
StopIfNotOk(builder->Finish(&array));
Expand Down
4 changes: 3 additions & 1 deletion r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked
SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
std::shared_ptr<arrow::Array> Array__from_vector(SEXP x, SEXP type);
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP, SEXP);
arrow::MemoryPool* gc_memory_pool();

namespace arrow {

Expand Down Expand Up @@ -71,7 +72,8 @@ class RBuffer : public MutableBuffer {
public:
explicit RBuffer(RVector vec)
: MutableBuffer(reinterpret_cast<uint8_t*>(DATAPTR(vec)),
vec.size() * sizeof(typename RVector::value_type)),
vec.size() * sizeof(typename RVector::value_type),
arrow::CPUDevice::memory_manager(gc_memory_pool())),
vec_(vec) {}

private:
Expand Down
6 changes: 4 additions & 2 deletions r/src/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ bool util___Codec__IsAvailable(arrow::Compression::type codec) {
std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(
const std::shared_ptr<arrow::util::Codec>& codec,
const std::shared_ptr<arrow::io::OutputStream>& raw) {
return ValueOrStop(arrow::io::CompressedOutputStream::Make(codec.get(), raw));
return ValueOrStop(
arrow::io::CompressedOutputStream::Make(codec.get(), raw, gc_memory_pool()));
}

// [[arrow::export]]
std::shared_ptr<arrow::io::CompressedInputStream> io___CompressedInputStream__Make(
const std::shared_ptr<arrow::util::Codec>& codec,
const std::shared_ptr<arrow::io::InputStream>& raw) {
return ValueOrStop(arrow::io::CompressedInputStream::Make(codec.get(), raw));
return ValueOrStop(
arrow::io::CompressedInputStream::Make(codec.get(), raw, gc_memory_pool()));
}

#endif
13 changes: 10 additions & 3 deletions r/src/compute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
#include <arrow/record_batch.h>
#include <arrow/table.h>

arrow::compute::ExecContext* gc_context() {
static arrow::compute::ExecContext context(gc_memory_pool());
return &context;
}

// [[arrow::export]]
std::shared_ptr<arrow::compute::CastOptions> compute___CastOptions__initialize(
bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate) {
Expand All @@ -38,7 +43,7 @@ std::shared_ptr<arrow::Array> Array__cast(
const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::DataType>& target_type,
const std::shared_ptr<arrow::compute::CastOptions>& options) {
return ValueOrStop(arrow::compute::Cast(*array, target_type, *options));
return ValueOrStop(arrow::compute::Cast(*array, target_type, *options, gc_context()));
}

// [[arrow::export]]
Expand All @@ -47,7 +52,8 @@ std::shared_ptr<arrow::ChunkedArray> ChunkedArray__cast(
const std::shared_ptr<arrow::DataType>& target_type,
const std::shared_ptr<arrow::compute::CastOptions>& options) {
arrow::Datum value(chunked_array);
arrow::Datum out = ValueOrStop(arrow::compute::Cast(value, target_type, *options));
arrow::Datum out =
ValueOrStop(arrow::compute::Cast(value, target_type, *options, gc_context()));
return out.chunked_array();
}

Expand Down Expand Up @@ -179,7 +185,8 @@ std::shared_ptr<arrow::compute::FunctionOptions> make_compute_options(
SEXP compute__CallFunction(std::string func_name, cpp11::list args, cpp11::list options) {
auto opts = make_compute_options(func_name, options);
auto datum_args = arrow::r::from_r_list<arrow::Datum>(args);
auto out = ValueOrStop(arrow::compute::CallFunction(func_name, datum_args, opts.get()));
auto out = ValueOrStop(
arrow::compute::CallFunction(func_name, datum_args, opts.get(), gc_context()));
return from_datum(out);
}

Expand Down
5 changes: 2 additions & 3 deletions r/src/csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
const std::shared_ptr<arrow::csv::ReadOptions>& read_options,
const std::shared_ptr<arrow::csv::ParseOptions>& parse_options,
const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options) {
return ValueOrStop(arrow::csv::TableReader::Make(arrow::default_memory_pool(), input,
*read_options, *parse_options,
*convert_options));
return ValueOrStop(arrow::csv::TableReader::Make(gc_memory_pool(), input, *read_options,
*parse_options, *convert_options));
}

// [[arrow::export]]
Expand Down
4 changes: 3 additions & 1 deletion r/src/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ namespace fs = ::arrow::fs;
// [[arrow::export]]
std::shared_ptr<ds::ScannerBuilder> dataset___Dataset__NewScan(
const std::shared_ptr<ds::Dataset>& ds) {
return ValueOrStop(ds->NewScan());
auto context = std::make_shared<ds::ScanContext>();
context->pool = gc_memory_pool();
return ValueOrStop(ds->NewScan(std::move(context)));
}

// [[arrow::export]]
Expand Down
6 changes: 3 additions & 3 deletions r/src/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void io___MemoryMappedFile__Resize(const std::shared_ptr<arrow::io::MemoryMapped
// [[arrow::export]]
std::shared_ptr<arrow::io::ReadableFile> io___ReadableFile__Open(
const std::string& path) {
return ValueOrStop(arrow::io::ReadableFile::Open(path));
return ValueOrStop(arrow::io::ReadableFile::Open(path, gc_memory_pool()));
}

// ------ arrow::io::BufferReader
Expand Down Expand Up @@ -150,8 +150,8 @@ std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(
// [[arrow::export]]
std::shared_ptr<arrow::io::BufferOutputStream> io___BufferOutputStream__Create(
int64_t initial_capacity) {
return ValueOrStop(arrow::io::BufferOutputStream::Create(initial_capacity,
arrow::default_memory_pool()));
return ValueOrStop(
arrow::io::BufferOutputStream::Create(initial_capacity, gc_memory_pool()));
}

// [[arrow::export]]
Expand Down
2 changes: 1 addition & 1 deletion r/src/json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ std::shared_ptr<arrow::json::TableReader> json___TableReader__Make(
const std::shared_ptr<arrow::io::InputStream>& input,
const std::shared_ptr<arrow::json::ReadOptions>& read_options,
const std::shared_ptr<arrow::json::ParseOptions>& parse_options) {
return ValueOrStop(arrow::json::TableReader::Make(arrow::default_memory_pool(), input,
return ValueOrStop(arrow::json::TableReader::Make(gc_memory_pool(), input,
*read_options, *parse_options));
}

Expand Down
Loading